Skip to content

Base

BaseCallbackHandler

Bases: NodeCallbackHandler, ABC

Abstract base class for general callback handlers.

Source code in dynamiq/callbacks/base.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
class BaseCallbackHandler(NodeCallbackHandler, ABC):
    """Abstract base class for general callback handlers."""

    def on_workflow_start(
        self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any
    ):
        """Called when the workflow starts.

        Args:
            serialized (dict[str, Any]): Serialized workflow data.
            input_data (dict[str, Any]): Input data for the workflow.
            **kwargs (Any): Additional arguments.
        """
        pass

    def on_workflow_end(
        self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any
    ):
        """Called when the workflow ends.

        Args:
            serialized (dict[str, Any]): Serialized workflow data.
            output_data (dict[str, Any]): Output data from the workflow.
            **kwargs (Any): Additional arguments.
        """
        pass

    def on_workflow_error(
        self, serialized: dict[str, Any], error: BaseException, **kwargs: Any
    ):
        """Called when the workflow errors.

        Args:
            serialized (dict[str, Any]): Serialized workflow data.
            error (BaseException): Error encountered.
            **kwargs (Any): Additional arguments.
        """
        pass

    def on_flow_start(
        self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any
    ):
        """Called when the flow starts.

        Args:
            serialized (dict[str, Any]): Serialized flow data.
            input_data (dict[str, Any]): Input data for the flow.
            **kwargs (Any): Additional arguments.
        """
        pass

    def on_flow_end(
        self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any
    ):
        """Called when the flow ends.

        Args:
            serialized (dict[str, Any]): Serialized flow data.
            output_data (dict[str, Any]): Output data from the flow.
            **kwargs (Any): Additional arguments.
        """
        pass

    def on_flow_error(
        self, serialized: dict[str, Any], error: BaseException, **kwargs: Any
    ):
        """Called when the flow errors.

        Args:
            serialized (dict[str, Any]): Serialized flow data.
            error (BaseException): Error encountered.
            **kwargs (Any): Additional arguments.
        """
        pass

on_flow_end(serialized, output_data, **kwargs)

Called when the flow ends.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized flow data.

required
output_data dict[str, Any]

Output data from the flow.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
153
154
155
156
157
158
159
160
161
162
163
def on_flow_end(
    self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any
):
    """Called when the flow ends.

    Args:
        serialized (dict[str, Any]): Serialized flow data.
        output_data (dict[str, Any]): Output data from the flow.
        **kwargs (Any): Additional arguments.
    """
    pass

on_flow_error(serialized, error, **kwargs)

Called when the flow errors.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized flow data.

required
error BaseException

Error encountered.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
165
166
167
168
169
170
171
172
173
174
175
def on_flow_error(
    self, serialized: dict[str, Any], error: BaseException, **kwargs: Any
):
    """Called when the flow errors.

    Args:
        serialized (dict[str, Any]): Serialized flow data.
        error (BaseException): Error encountered.
        **kwargs (Any): Additional arguments.
    """
    pass

on_flow_start(serialized, input_data, **kwargs)

Called when the flow starts.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized flow data.

required
input_data dict[str, Any]

Input data for the flow.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
141
142
143
144
145
146
147
148
149
150
151
def on_flow_start(
    self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any
):
    """Called when the flow starts.

    Args:
        serialized (dict[str, Any]): Serialized flow data.
        input_data (dict[str, Any]): Input data for the flow.
        **kwargs (Any): Additional arguments.
    """
    pass

on_workflow_end(serialized, output_data, **kwargs)

Called when the workflow ends.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized workflow data.

required
output_data dict[str, Any]

Output data from the workflow.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
117
118
119
120
121
122
123
124
125
126
127
def on_workflow_end(
    self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any
):
    """Called when the workflow ends.

    Args:
        serialized (dict[str, Any]): Serialized workflow data.
        output_data (dict[str, Any]): Output data from the workflow.
        **kwargs (Any): Additional arguments.
    """
    pass

on_workflow_error(serialized, error, **kwargs)

Called when the workflow errors.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized workflow data.

required
error BaseException

Error encountered.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
129
130
131
132
133
134
135
136
137
138
139
def on_workflow_error(
    self, serialized: dict[str, Any], error: BaseException, **kwargs: Any
):
    """Called when the workflow errors.

    Args:
        serialized (dict[str, Any]): Serialized workflow data.
        error (BaseException): Error encountered.
        **kwargs (Any): Additional arguments.
    """
    pass

on_workflow_start(serialized, input_data, **kwargs)

Called when the workflow starts.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized workflow data.

required
input_data dict[str, Any]

Input data for the workflow.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
105
106
107
108
109
110
111
112
113
114
115
def on_workflow_start(
    self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any
):
    """Called when the workflow starts.

    Args:
        serialized (dict[str, Any]): Serialized workflow data.
        input_data (dict[str, Any]): Input data for the workflow.
        **kwargs (Any): Additional arguments.
    """
    pass

NodeCallbackHandler

Bases: ABC

Abstract class for node callback handlers.

Source code in dynamiq/callbacks/base.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class NodeCallbackHandler(ABC):
    """Abstract class for node callback handlers."""

    def on_node_start(self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any):
        """Called when the node starts.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            input_data (dict[str, Any]): Input data for the node.
            **kwargs (Any): Additional arguments.
        """
        pass

    def on_node_end(self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any):
        """Called when the node ends.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            output_data (dict[str, Any]): Output data from the node.
            **kwargs (Any): Additional arguments.
        """
        pass

    def on_node_error(self, serialized: dict[str, Any], error: BaseException, **kwargs: Any):
        """Called when the node errors.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            error (BaseException): Error encountered.
            **kwargs (Any): Additional arguments.
        """
        pass

    def on_node_execute_start(self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any):
        """Called when the node execute starts.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            input_data (dict[str, Any]): Input data for the node.
            **kwargs (Any): Additional arguments.
        """
        pass

    def on_node_execute_end(self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any):
        """Called when the node execute ends.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            output_data (dict[str, Any]): Output data from the node.
            **kwargs (Any): Additional arguments.
        """
        pass

    def on_node_execute_error(self, serialized: dict[str, Any], error: BaseException, **kwargs: Any):
        """Called when the node execute errors.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            error (BaseException): Error encountered.
            **kwargs (Any): Additional arguments.
        """
        pass

    def on_node_execute_run(self, serialized: dict[str, Any], **kwargs: Any):
        """Called when the node execute runs.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            **kwargs (Any): Additional arguments.
        """
        pass

    def on_node_execute_stream(self, serialized: dict[str, Any], chunk: dict[str, Any] | None = None, **kwargs: Any):
        """Called when the node execute streams.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            chunk (dict[str, Any] | None): Stream chunk data.
            **kwargs (Any): Additional arguments.
        """
        pass

    def on_node_skip(
        self, serialized: dict[str, Any], skip_data: dict[str, Any], input_data: dict[str, Any], **kwargs: Any
    ):
        """Called when the node skips.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            skip_data (dict[str, Any]): Data related to the skip.
            input_data (dict[str, Any]): Input data for the node.
            **kwargs (Any): Additional arguments.
        """
        pass

on_node_end(serialized, output_data, **kwargs)

Called when the node ends.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
output_data dict[str, Any]

Output data from the node.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
19
20
21
22
23
24
25
26
27
def on_node_end(self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any):
    """Called when the node ends.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        output_data (dict[str, Any]): Output data from the node.
        **kwargs (Any): Additional arguments.
    """
    pass

on_node_error(serialized, error, **kwargs)

Called when the node errors.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
error BaseException

Error encountered.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
29
30
31
32
33
34
35
36
37
def on_node_error(self, serialized: dict[str, Any], error: BaseException, **kwargs: Any):
    """Called when the node errors.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        error (BaseException): Error encountered.
        **kwargs (Any): Additional arguments.
    """
    pass

on_node_execute_end(serialized, output_data, **kwargs)

Called when the node execute ends.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
output_data dict[str, Any]

Output data from the node.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
49
50
51
52
53
54
55
56
57
def on_node_execute_end(self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any):
    """Called when the node execute ends.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        output_data (dict[str, Any]): Output data from the node.
        **kwargs (Any): Additional arguments.
    """
    pass

on_node_execute_error(serialized, error, **kwargs)

Called when the node execute errors.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
error BaseException

Error encountered.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
59
60
61
62
63
64
65
66
67
def on_node_execute_error(self, serialized: dict[str, Any], error: BaseException, **kwargs: Any):
    """Called when the node execute errors.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        error (BaseException): Error encountered.
        **kwargs (Any): Additional arguments.
    """
    pass

on_node_execute_run(serialized, **kwargs)

Called when the node execute runs.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
69
70
71
72
73
74
75
76
def on_node_execute_run(self, serialized: dict[str, Any], **kwargs: Any):
    """Called when the node execute runs.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        **kwargs (Any): Additional arguments.
    """
    pass

on_node_execute_start(serialized, input_data, **kwargs)

Called when the node execute starts.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
input_data dict[str, Any]

Input data for the node.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
39
40
41
42
43
44
45
46
47
def on_node_execute_start(self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any):
    """Called when the node execute starts.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        input_data (dict[str, Any]): Input data for the node.
        **kwargs (Any): Additional arguments.
    """
    pass

on_node_execute_stream(serialized, chunk=None, **kwargs)

Called when the node execute streams.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
chunk dict[str, Any] | None

Stream chunk data.

None
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
78
79
80
81
82
83
84
85
86
def on_node_execute_stream(self, serialized: dict[str, Any], chunk: dict[str, Any] | None = None, **kwargs: Any):
    """Called when the node execute streams.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        chunk (dict[str, Any] | None): Stream chunk data.
        **kwargs (Any): Additional arguments.
    """
    pass

on_node_skip(serialized, skip_data, input_data, **kwargs)

Called when the node skips.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
skip_data dict[str, Any]

Data related to the skip.

required
input_data dict[str, Any]

Input data for the node.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
88
89
90
91
92
93
94
95
96
97
98
99
def on_node_skip(
    self, serialized: dict[str, Any], skip_data: dict[str, Any], input_data: dict[str, Any], **kwargs: Any
):
    """Called when the node skips.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        skip_data (dict[str, Any]): Data related to the skip.
        input_data (dict[str, Any]): Input data for the node.
        **kwargs (Any): Additional arguments.
    """
    pass

on_node_start(serialized, input_data, **kwargs)

Called when the node starts.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
input_data dict[str, Any]

Input data for the node.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/base.py
 9
10
11
12
13
14
15
16
17
def on_node_start(self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any):
    """Called when the node starts.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        input_data (dict[str, Any]): Input data for the node.
        **kwargs (Any): Additional arguments.
    """
    pass

get_entity_id(entity_name, kwargs)

Retrieve entity ID from kwargs.

Parameters:

Name Type Description Default
entity_name str

Name of the entity.

required
kwargs dict

Keyword arguments.

required

Returns:

Name Type Description
UUID UUID

Entity ID.

Raises:

Type Description
ValueError

If entity ID is not found or invalid.

Source code in dynamiq/callbacks/base.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def get_entity_id(entity_name: str, kwargs: dict) -> UUID:
    """Retrieve entity ID from kwargs.

    Args:
        entity_name (str): Name of the entity.
        kwargs (dict): Keyword arguments.

    Returns:
        UUID: Entity ID.

    Raises:
        ValueError: If entity ID is not found or invalid.
    """
    entity_id = kwargs.get(entity_name)
    if not entity_id:
        raise ValueError(f"{entity_name} not found")

    if isinstance(entity_id, UUID):
        return entity_id
    elif isinstance(entity_id, str):
        return UUID(entity_id)

    raise ValueError(f"{entity_name} is not UUID or str")

get_execution_run_id(kwargs)

Retrieve execution run ID from kwargs.

Parameters:

Name Type Description Default
kwargs dict

Keyword arguments.

required

Returns:

Name Type Description
UUID UUID

Execution run ID.

Source code in dynamiq/callbacks/base.py
227
228
229
230
231
232
233
234
235
236
def get_execution_run_id(kwargs: dict) -> UUID:
    """Retrieve execution run ID from kwargs.

    Args:
        kwargs (dict): Keyword arguments.

    Returns:
        UUID: Execution run ID.
    """
    return get_entity_id("execution_run_id", kwargs)

get_parent_run_id(kwargs)

Retrieve parent run ID from kwargs.

Parameters:

Name Type Description Default
kwargs dict

Keyword arguments.

required

Returns:

Name Type Description
UUID UUID

Parent run ID.

Source code in dynamiq/callbacks/base.py
215
216
217
218
219
220
221
222
223
224
def get_parent_run_id(kwargs: dict) -> UUID:
    """Retrieve parent run ID from kwargs.

    Args:
        kwargs (dict): Keyword arguments.

    Returns:
        UUID: Parent run ID.
    """
    return get_entity_id("parent_run_id", kwargs)

get_run_id(kwargs)

Retrieve run ID from kwargs.

Parameters:

Name Type Description Default
kwargs dict

Keyword arguments.

required

Returns:

Name Type Description
UUID UUID

Run ID.

Source code in dynamiq/callbacks/base.py
203
204
205
206
207
208
209
210
211
212
def get_run_id(kwargs: dict) -> UUID:
    """Retrieve run ID from kwargs.

    Args:
        kwargs (dict): Keyword arguments.

    Returns:
        UUID: Run ID.
    """
    return get_entity_id("run_id", kwargs)