Skip to content

Orchestrator

ActionParseError

Bases: OrchestratorError

Exception raised when an error occurs during action parsing.

Source code in dynamiq/nodes/agents/orchestrators/orchestrator.py
19
20
class ActionParseError(OrchestratorError):
    """Signal that an action could not be parsed; a specialised OrchestratorError."""

Orchestrator

Bases: Node, ABC

Orchestrates the execution of complex tasks using multiple specialized agents.

This abstract base class provides the framework for orchestrating complex tasks through multiple agents. It manages the execution flow and communication between different specialized agents.

Attributes:

Name Type Description
manager AgentManager

The managing agent responsible for overseeing the orchestration process.

objective str

The main objective of the orchestration.

Abstract Methods

run_flow: Processes the given task and returns the result.
setup_streaming: Configures streaming functionality for the orchestrator.

Source code in dynamiq/nodes/agents/orchestrators/orchestrator.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
class Orchestrator(Node, ABC):
    """
    Orchestrates the execution of complex tasks using multiple specialized agents.

    This abstract base class provides the framework for orchestrating complex tasks
    through multiple agents. It manages the execution flow and communication between
    different specialized agents.

    Attributes:
        name (str | None): Display name of the node; defaults to "Orchestrator".
        group (NodeGroup): Node group; fixed to NodeGroup.AGENTS.
        input_schema (type[OrchestratorInputSchema]): Schema class for the execution input.
        manager (AgentManager): The managing agent responsible for overseeing the orchestration process.
        objective (str): The main objective of the orchestration; used as the task when the
            input carries none. Defaults to an empty string.

    Abstract Methods:
        run_flow: Processes the given task and returns the result.
        setup_streaming: Configures streaming functionality for the orchestrator.
    """

    name: str | None = "Orchestrator"
    group: NodeGroup = NodeGroup.AGENTS
    input_schema: ClassVar[type[OrchestratorInputSchema]] = OrchestratorInputSchema
    manager: AgentManager
    objective: str = ""

    def __init__(self, **kwargs):
        """
        Initialize the orchestrator with given parameters.

        Args:
            **kwargs: Arbitrary keyword arguments, forwarded to the parent initializer.
        """
        super().__init__(**kwargs)
        # Per-run state; cleared again by reset_run_state() at the start of execute().
        self._run_depends = []
        self._chat_history = []

    def get_final_result(
        self,
        input_data: dict[str, str],
        config: RunnableConfig = None,
        **kwargs,
    ) -> str:
        """
        Generate a comprehensive final result based on the provided data.

        Runs the manager with the "final" action and returns the "result" entry of the
        content it produced.

        Args:
            input_data (dict[str, str]): Input data for the manager.
            config (RunnableConfig): Configuration for the runnable.
            **kwargs: Additional keyword arguments passed through to the manager run.

        Returns:
            str: The final comprehensive result.

        Raises:
            OrchestratorError: If the manager run does not succeed, or if it succeeds but
                its content does not support a "result" lookup.
        """
        logger.debug(f"Orchestrator {self.name} - {self.id}: Running final summarizer")
        manager_result = self.manager.run(
            input_data={"action": "final", **input_data},
            config=config,
            run_depends=self._run_depends,
            **kwargs,
        )
        # Whatever runs next is recorded as depending on this manager run.
        self._run_depends = [NodeDependency(node=self.manager).to_dict()]

        if manager_result.status != RunnableStatus.SUCCESS:
            error_message = f"Manager '{self.manager.name}' failed: {manager_result.output.get('content')}"
            failure = f"Orchestrator {self.name} - {self.id}: Error generating final, due to error: {error_message}"
            logger.error(failure)
            raise OrchestratorError(failure)

        content = manager_result.output.get("content")
        try:
            return content.get("result")
        except AttributeError as exc:
            # A successful run whose content is None or a plain string used to leak an
            # AttributeError; surface it as the documented OrchestratorError instead.
            failure = (
                f"Orchestrator {self.name} - {self.id}: Error generating final, due to error: "
                f"Manager '{self.manager.name}' returned unexpected content: {content!r}"
            )
            logger.error(failure)
            raise OrchestratorError(failure) from exc

    def reset_run_state(self):
        """Clear the per-run dependency list and chat history."""
        self._run_depends = []
        self._chat_history = []

    @abstractmethod
    def run_flow(self, input_task: str, config: RunnableConfig = None, **kwargs) -> dict[str, Any]:
        """
        Process the given task.

        Args:
            input_task (str): The task to be processed.
            config (RunnableConfig): Configuration for the runnable.
            **kwargs: Additional keyword arguments.

        Returns:
            dict[str, Any]: The final output generated after processing the task.
        """
        pass

    @abstractmethod
    def setup_streaming(self) -> None:
        """Set up streaming for the orchestrator."""
        pass

    def execute(self, input_data: OrchestratorInputSchema, config: RunnableConfig = None, **kwargs) -> dict:
        """
        Execute orchestrator flow.

        Args:
            input_data (OrchestratorInputSchema): The input data containing the objective.
            config (Optional[RunnableConfig]): Configuration for the runnable.
            **kwargs: Additional keyword arguments.

        Returns:
            dict[str, Any]: The result of the orchestration process, as returned by run_flow.

        Raises:
            OrchestratorError: If the orchestration process fails (nothing is raised here
                directly; errors come from the calls made by this method).
        """
        logger.info(f"Orchestrator {self.name} - {self.id}: started with INPUT DATA:\n{input_data}")
        self.reset_run_state()
        config = ensure_config(config)
        self.run_on_node_execute_run(config.callbacks, **kwargs)

        # Fall back to the configured objective when the input carries no task.
        input_task = input_data.input or self.objective

        # The flow runs as a child of this run and must not inherit upstream dependencies.
        kwargs = kwargs | {"parent_run_id": kwargs.get("run_id")}
        kwargs.pop("run_depends", None)

        if self.streaming.enabled:
            self.setup_streaming()

        output = self.run_flow(
            input_task=input_task,
            config=config,
            **kwargs,
        )

        logger.info(f"Orchestrator {self.name} - {self.id}: finished with RESULT:\n{str(output)[:200]}...")
        return output

    def _extract_output_content(self, text: str) -> str:
        """
        Extracts the content of the <output> tag. If a properly closed tag is not found,
        fall back to extracting everything after the first occurrence of <output>.

        Args:
            text (str): The response text to search.

        Returns:
            str: The stripped tag content, or the stripped remainder of the text after an
                unclosed <output> tag.

        Raises:
            ActionParseError: If there is no <output> tag, or an unclosed tag is followed
                only by whitespace.
        """
        match = re.search(r"<output>(.*?)</output>", text, re.DOTALL)
        if match:
            return match.group(1).strip()

        start = text.find("<output>")
        if start != -1:
            fallback_content = text[start + len("<output>") :].strip()
            if fallback_content:
                return fallback_content
        raise ActionParseError("No <output> tags found in the response.")

    def _clean_content(self, content: str) -> LET.Element:
        """
        Clean and parse XML content by removing code block markers and wrapping in a root element.

        Args:
            content (str): The input string containing XML content, possibly with code block markers.

        Returns:
            LET.Element: A parsed XML element tree with the cleaned content wrapped in a root element.

        Note:
            - Removes triple backticks (```) from the content
            - Wraps the content in a <root> element for proper XML parsing
            - Uses a lenient XML parser that attempts to recover from malformed XML
        """
        cleaned_content = content.replace("```", "").strip()
        wrapped_content = f"<root>{cleaned_content}</root>"
        parser = LET.XMLParser(recover=True)  # nosec B320
        return LET.fromstring(wrapped_content, parser=parser)  # nosec B320

__init__(**kwargs)

Initialize the orchestrator with given parameters.

Parameters:

Name Type Description Default
**kwargs

Arbitrary keyword arguments.

{}
Source code in dynamiq/nodes/agents/orchestrators/orchestrator.py
50
51
52
53
54
55
56
57
58
59
def __init__(self, **kwargs):
    """
    Build the orchestrator and start it with empty per-run state.

    Args:
        **kwargs: Arbitrary keyword arguments, forwarded to the parent initializer.
    """
    super().__init__(**kwargs)
    # Both collections start out empty.
    self._run_depends, self._chat_history = [], []

execute(input_data, config=None, **kwargs)

Execute orchestrator flow.

Parameters:

Name Type Description Default
input_data OrchestratorInputSchema

The input data containing the objective.

required
config Optional[RunnableConfig]

Configuration for the runnable.

None
**kwargs

Additional keyword arguments.

{}

Returns:

Type Description
dict

dict[str, Any]: The result of the orchestration process.

Raises:

Type Description
OrchestratorError

If the orchestration process fails.

Source code in dynamiq/nodes/agents/orchestrators/orchestrator.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def execute(self, input_data: OrchestratorInputSchema, config: RunnableConfig = None, **kwargs) -> dict:
    """
    Run the orchestration flow for one request.

    Args:
        input_data (OrchestratorInputSchema): The input data containing the objective.
        config (Optional[RunnableConfig]): Configuration for the runnable.
        **kwargs: Additional keyword arguments.

    Returns:
        dict[str, Any]: The result of the orchestration process, as returned by run_flow.

    Raises:
        OrchestratorError: If the orchestration process fails (nothing is raised here
            directly; errors come from the calls made by this function).
    """
    logger.info(f"Orchestrator {self.name} - {self.id}: started with INPUT DATA:\n{input_data}")

    # Drop state left over from a previous run before anything else happens.
    self.reset_run_state()
    config = ensure_config(config)
    self.run_on_node_execute_run(config.callbacks, **kwargs)

    # An empty input falls back to the configured objective.
    task = input_data.input or self.objective

    # The flow runs as a child of this run and must not inherit upstream dependencies.
    flow_kwargs = {**kwargs, "parent_run_id": kwargs.get("run_id")}
    flow_kwargs.pop("run_depends", None)

    if self.streaming.enabled:
        self.setup_streaming()

    result = self.run_flow(input_task=task, config=config, **flow_kwargs)

    logger.info(f"Orchestrator {self.name} - {self.id}: finished with RESULT:\n{str(result)[:200]}...")
    return result

get_final_result(input_data, config=None, **kwargs)

Generate a comprehensive final result based on the provided data.

Parameters:

Name Type Description Default
input_data dict[str, str]

Input data for the manager.

required
config RunnableConfig

Configuration for the runnable.

None
**kwargs

Additional keyword arguments.

{}

Returns:

Name Type Description
str str

The final comprehensive result.

Raises:

Type Description
OrchestratorError

If an error occurs while generating the final answer.

Source code in dynamiq/nodes/agents/orchestrators/orchestrator.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def get_final_result(
    self,
    input_data: dict[str, str],
    config: RunnableConfig = None,
    **kwargs,
) -> str:
    """
    Generate a comprehensive final result based on the provided data.

    Runs the manager with the "final" action and returns the "result" entry of the
    content it produced.

    Args:
        input_data (dict[str, str]): Input data for the manager.
        config (RunnableConfig): Configuration for the runnable.
        **kwargs: Additional keyword arguments passed through to the manager run.

    Returns:
        str: The final comprehensive result.

    Raises:
        OrchestratorError: If the manager run does not succeed, or if it succeeds but
            its content does not support a "result" lookup.
    """
    logger.debug(f"Orchestrator {self.name} - {self.id}: Running final summarizer")
    manager_result = self.manager.run(
        input_data={"action": "final", **input_data},
        config=config,
        run_depends=self._run_depends,
        **kwargs,
    )
    # Whatever runs next is recorded as depending on this manager run.
    self._run_depends = [NodeDependency(node=self.manager).to_dict()]

    if manager_result.status != RunnableStatus.SUCCESS:
        error_message = f"Manager '{self.manager.name}' failed: {manager_result.output.get('content')}"
        failure = f"Orchestrator {self.name} - {self.id}: Error generating final, due to error: {error_message}"
        logger.error(failure)
        raise OrchestratorError(failure)

    content = manager_result.output.get("content")
    try:
        return content.get("result")
    except AttributeError as exc:
        # A successful run whose content is None or a plain string used to leak an
        # AttributeError; surface it as the documented OrchestratorError instead.
        failure = (
            f"Orchestrator {self.name} - {self.id}: Error generating final, due to error: "
            f"Manager '{self.manager.name}' returned unexpected content: {content!r}"
        )
        logger.error(failure)
        raise OrchestratorError(failure) from exc

run_flow(input_task, config=None, **kwargs) abstractmethod

Process the given task.

Parameters:

Name Type Description Default
input_task str

The task to be processed.

required
config RunnableConfig

Configuration for the runnable.

None

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The final output generated after processing the task.

Source code in dynamiq/nodes/agents/orchestrators/orchestrator.py
103
104
105
106
107
108
109
110
111
112
113
114
115
@abstractmethod
def run_flow(self, input_task: str, config: RunnableConfig = None, **kwargs) -> dict[str, Any]:
    """
    Carry out the given task; concrete orchestrators supply the implementation.

    Args:
        input_task (str): The task to be processed.
        config (RunnableConfig): Configuration for the runnable.
        **kwargs: Additional keyword arguments.

    Returns:
        dict[str, Any]: The final output generated after processing the task.
    """
    return None

setup_streaming() abstractmethod

Sets up streaming for the orchestrator.

Source code in dynamiq/nodes/agents/orchestrators/orchestrator.py
117
118
119
120
@abstractmethod
def setup_streaming(self) -> None:
    """Set up streaming for the orchestrator; concrete orchestrators supply the implementation."""
    return None

OrchestratorError

Bases: Exception

Base exception for Orchestrator errors.

Source code in dynamiq/nodes/agents/orchestrators/orchestrator.py
15
16
class OrchestratorError(Exception):
    """Root of the exception hierarchy for Orchestrator failures."""