Skip to content

Tracing

ExecutionRun dataclass

Data class for execution run details.

Attributes:

Name Type Description
id UUID

Execution run ID.

start_time datetime

Start time of the execution.

end_time datetime | None

End time of the execution.

status RunStatus | None

Status of the execution.

input Any | None

Input data for the execution.

output Any | None

Output data from the execution.

error Any | None

Error details if any.

metadata dict

Additional metadata.

Source code in dynamiq/callbacks/tracing.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@dataclass
class ExecutionRun:
    """Data class for execution run details.

    Attributes:
        id (UUID): Execution run ID.
        start_time (datetime): Start time of the execution.
        end_time (datetime | None): End time of the execution.
        status (RunStatus | None): Status of the execution.
        input (Any | None): Input data for the execution.
        output (Any | None): Output data from the execution.
        error (Any | None): Error details if any.
        metadata (dict): Additional metadata.
    """
    id: UUID
    start_time: datetime
    end_time: datetime | None = None
    status: RunStatus | None = None
    input: Any | None = None
    output: Any | None = None
    error: Any | None = None
    metadata: dict = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Convert ExecutionRun to dictionary.

        Returns:
            dict: Dictionary representation of ExecutionRun.
        """
        return asdict(self)

to_dict()

Convert ExecutionRun to dictionary.

Returns:

Name Type Description
dict dict

Dictionary representation of ExecutionRun.

Source code in dynamiq/callbacks/tracing.py
58
59
60
61
62
63
64
def to_dict(self) -> dict:
    """Convert ExecutionRun to dictionary.

    Returns:
        dict: Dictionary representation of ExecutionRun.
    """
    return asdict(self)

Run dataclass

Data class for run details.

Attributes:

Name Type Description
id UUID

Run ID.

name str

Name of the run.

type RunType

Type of the run.

trace_id UUID | str

Trace ID.

source_id UUID | str

Source ID.

session_id UUID | str

Session ID.

start_time datetime

Start time of the run.

end_time datetime

End time of the run.

parent_run_id UUID

Parent run ID.

status RunStatus

Status of the run.

input Any

Input data for the run.

output Any

Output data from the run.

metadata Any

Additional metadata.

error Any

Error details if any.

executions list[ExecutionRun]

List of execution runs.

tags list[str]

List of tags.

Source code in dynamiq/callbacks/tracing.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@dataclass
class Run:
    """Data class for run details.

    Attributes:
        id (UUID): Run ID.
        name (str): Name of the run.
        type (RunType): Type of the run.
        trace_id (UUID | str): Trace ID.
        source_id (UUID | str): Source ID.
        session_id (UUID | str): Session ID.
        start_time (datetime): Start time of the run.
        end_time (datetime): End time of the run.
        parent_run_id (UUID): Parent run ID.
        status (RunStatus): Status of the run.
        input (Any): Input data for the run.
        output (Any): Output data from the run.
        metadata (Any): Additional metadata.
        error (Any): Error details if any.
        executions (list[ExecutionRun]): List of execution runs.
        tags (list[str]): List of tags.
    """
    id: UUID
    name: str
    type: RunType
    trace_id: UUID | str
    source_id: UUID | str
    session_id: UUID | str
    start_time: datetime
    end_time: datetime = None
    parent_run_id: UUID = None
    status: RunStatus = None
    input: Any = None
    output: Any = None
    metadata: Any = None
    error: Any = None
    executions: list[ExecutionRun] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Convert Run to dictionary.

        Returns:
            dict: Dictionary representation of Run.
        """
        return asdict(self)

    def to_json(self) -> str:
        """Convert Run to JSON string.

        Returns:
            str: JSON string representation of Run.
        """
        return json.dumps(self.to_dict(), cls=JsonWorkflowEncoder)

to_dict()

Convert Run to dictionary.

Returns:

Name Type Description
dict dict

Dictionary representation of Run.

Source code in dynamiq/callbacks/tracing.py
106
107
108
109
110
111
112
def to_dict(self) -> dict:
    """Convert Run to dictionary.

    Returns:
        dict: Dictionary representation of Run.
    """
    return asdict(self)

to_json()

Convert Run to JSON string.

Returns:

Name Type Description
str str

JSON string representation of Run.

Source code in dynamiq/callbacks/tracing.py
114
115
116
117
118
119
120
def to_json(self) -> str:
    """Convert Run to JSON string.

    Returns:
        str: JSON string representation of Run.
    """
    return json.dumps(self.to_dict(), cls=JsonWorkflowEncoder)

RunStatus

Bases: str, Enum

Enumeration for run statuses.

Source code in dynamiq/callbacks/tracing.py
21
22
23
24
25
class RunStatus(str, Enum):
    """Enumeration for run statuses."""
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    SKIPPED = "skipped"

RunType

Bases: str, Enum

Enumeration for run types.

Source code in dynamiq/callbacks/tracing.py
28
29
30
31
32
class RunType(str, Enum):
    """Enumeration for run types."""
    WORKFLOW = "workflow"
    FLOW = "flow"
    NODE = "node"

TracingCallbackHandler

Bases: BaseModel, BaseCallbackHandler

Callback handler for tracing workflow events.

Attributes:

Name Type Description
source_id str | None

Source ID.

trace_id str | None

Trace ID.

session_id str | None

Session ID.

client BaseTracingClient | None

Tracing client.

runs dict[UUID, Run]

Dictionary of runs.

tags list[str]

List of tags.

installed_pkgs list[str]

List of installed packages.

Source code in dynamiq/callbacks/tracing.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
class TracingCallbackHandler(BaseModel, BaseCallbackHandler):
    """Callback handler for tracing workflow events.

    Attributes:
        source_id (str | None): Source ID.
        trace_id (str | None): Trace ID.
        session_id (str | None): Session ID.
        client (BaseTracingClient | None): Tracing client.
        runs (dict[UUID, Run]): Dictionary of runs.
        tags (list[str]): List of tags.
        installed_pkgs (list[str]): List of installed packages.
    """
    source_id: str | None = Field(default_factory=generate_uuid)
    trace_id: str | None = Field(default_factory=generate_uuid)
    session_id: str | None = Field(default_factory=generate_uuid)
    client: BaseTracingClient | None = None
    runs: dict[UUID, Run] = {}
    tags: list[str] = []
    metadata: dict = {}

    installed_pkgs: list[str] = Field(
        ["dynamiq"],
        description="List of installed packages to include in the host information.",
    )

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @cached_property
    def host(self) -> dict:
        """Get host information.

        Returns:
            dict: Host information including installed packages.
        """
        return {
            "installed_pkgs": [
                {"name": dist.metadata["Name"], "version": dist.version}
                for dist in distributions()
                if dist.metadata.get("Name") in self.installed_pkgs
            ],
        }

    def _get_node_base_run(self, serialized: dict[str, Any], **kwargs: Any) -> Run:
        """Get base run details for a node.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            **kwargs (Any): Additional arguments.

        Returns:
            Run: Base run details for the node.
        """
        run_id = get_run_id(kwargs)
        parent_run_id = get_parent_run_id(kwargs)

        from dynamiq.nodes import NodeGroup

        # Handle runtime LLM prompt override
        if serialized.get("group") == NodeGroup.LLMS:
            prompt = kwargs.get("prompt") or serialized.get("prompt")
            if isinstance(prompt, BaseModel):
                prompt = prompt.model_dump()
            serialized["prompt"] = prompt

        truncate_metadata = {}
        formatted_input, truncate_metadata.setdefault("truncated", {})["input"] = format_value(
            kwargs.get("input_data"), truncate_enabled=True
        )
        run = Run(
            id=run_id,
            name=serialized.get("name"),
            type=RunType.NODE,
            trace_id=self.trace_id,
            source_id=self.source_id,
            session_id=self.session_id,
            start_time=datetime.now(UTC),
            parent_run_id=parent_run_id,
            metadata={
                "node": serialized,
                "run_depends": kwargs.get("run_depends", []),
                "host": self.host,
                **self.metadata,
                **truncate_metadata,
            },
            tags=self.tags,
            input=formatted_input,
        )
        return run

    def on_workflow_start(
        self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any
    ):
        """Called when the workflow starts.

        Args:
            serialized (dict[str, Any]): Serialized workflow data.
            input_data (dict[str, Any]): Input data for the workflow.
            **kwargs (Any): Additional arguments.
        """
        run_id = get_run_id(kwargs)

        truncate_metadata = {}
        formatted_input, truncate_metadata.setdefault("truncated", {})["input"] = format_value(
            input_data, truncate_enabled=True
        )
        self.runs[run_id] = Run(
            id=run_id,
            name="Workflow",
            type=RunType.WORKFLOW,
            trace_id=self.trace_id,
            source_id=self.source_id,
            session_id=self.session_id,
            start_time=datetime.now(UTC),
            input=formatted_input,
            metadata={
                "workflow": {"id": serialized.get("id"), "version": serialized.get("version")},
                "host": self.host,
                **self.metadata,
                **truncate_metadata,
            },
            tags=self.tags,
        )

    def on_workflow_end(
        self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any
    ):
        """Called when the workflow ends.

        Args:
            serialized (dict[str, Any]): Serialized workflow data.
            output_data (dict[str, Any]): Output data from the workflow.
            **kwargs (Any): Additional arguments.
        """
        run = ensure_run(get_run_id(kwargs), self.runs)
        run.end_time = datetime.now(UTC)
        run.output, run.metadata.setdefault("truncated", {})["output"] = format_value(
            output_data, truncate_enabled=True
        )
        run.status = RunStatus.SUCCEEDED

        self.flush()

    def on_workflow_error(
        self, serialized: dict[str, Any], error: BaseException, **kwargs: Any
    ):
        """Called when the workflow errors.

        Args:
            serialized (dict[str, Any]): Serialized workflow data.
            error (BaseException): Error encountered.
            **kwargs (Any): Additional arguments.
        """
        run = ensure_run(get_run_id(kwargs), self.runs)
        run.end_time = datetime.now(UTC)
        run.status = RunStatus.FAILED
        run.error = {
            "message": str(error),
            "traceback": traceback.format_exc(),
        }

    def on_flow_start(
        self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any
    ):
        """Called when the flow starts.

        Args:
            serialized (dict[str, Any]): Serialized flow data.
            input_data (dict[str, Any]): Input data for the flow.
            **kwargs (Any): Additional arguments.
        """
        run_id = get_run_id(kwargs)
        parent_run_id = get_parent_run_id(kwargs)
        truncate_metadata = {}
        formatted_input, truncate_metadata.setdefault("truncated", {})["input"] = format_value(
            input_data, truncate_enabled=True
        )

        self.runs[run_id] = Run(
            id=run_id,
            name="Flow",
            type=RunType.FLOW,
            trace_id=self.trace_id,
            source_id=self.source_id,
            session_id=self.session_id,
            start_time=datetime.now(UTC),
            parent_run_id=parent_run_id,
            input=formatted_input,
            metadata={"flow": {"id": serialized.get("id")}, "host": self.host, **self.metadata, **truncate_metadata},
            tags=self.tags,
        )

    def on_flow_end(
        self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any
    ):
        """Called when the flow ends.

        Args:
            serialized (dict[str, Any]): Serialized flow data.
            output_data (dict[str, Any]): Output data from the flow.
            **kwargs (Any): Additional arguments.
        """
        run = ensure_run(get_run_id(kwargs), self.runs)
        run.end_time = datetime.now(UTC)
        run.output, run.metadata.setdefault("truncated", {})["output"] = format_value(
            output_data, truncate_enabled=True
        )
        run.status = RunStatus.SUCCEEDED
        # If parent_run_id is None, the run is the highest in the execution tree
        if run.parent_run_id is None:
            self.flush()

    def on_flow_error(
        self, serialized: dict[str, Any], error: BaseException, **kwargs: Any
    ):
        """Called when the flow errors.

        Args:
            serialized (dict[str, Any]): Serialized flow data.
            error (BaseException): Error encountered.
            **kwargs (Any): Additional arguments.
        """
        run = ensure_run(get_run_id(kwargs), self.runs)
        run.end_time = datetime.now(UTC)
        run.status = RunStatus.FAILED
        run.error = {
            "message": str(error),
            "traceback": traceback.format_exc(),
        }

    def on_node_start(
        self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any
    ):
        """Called when the node starts.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            input_data (dict[str, Any]): Input data for the node.
            **kwargs (Any): Additional arguments.
        """
        run_id = get_run_id(kwargs)
        run = self._get_node_base_run(serialized, **kwargs)
        run.input, run.metadata.setdefault("truncated", {})["input"] = format_value(input_data, truncate_enabled=True)
        self.runs[run_id] = run

    def on_node_end(
        self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any
    ):
        """Called when the node ends.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            output_data (dict[str, Any]): Output data from the node.
            **kwargs (Any): Additional arguments.
        """
        run = ensure_run(get_run_id(kwargs), self.runs)
        run.end_time = datetime.now(UTC)
        run.output, run.metadata.setdefault("truncated", {})["output"] = format_value(
            output_data, truncate_enabled=True
        )
        run.status = RunStatus.SUCCEEDED
        run.metadata["is_output_from_cache"] = kwargs.get("is_output_from_cache", False)
        # If parent_run_id is None, the run is the highest in the execution tree
        if run.parent_run_id is None:
            self.flush()

    def on_node_error(
        self, serialized: dict[str, Any], error: BaseException, **kwargs: Any
    ):
        """Called when the node errors.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            error (BaseException): Error encountered.
            **kwargs (Any): Additional arguments.
        """
        run_id = get_run_id(kwargs)
        if (run := self.runs.get(run_id)) is None:
            run = self._get_node_base_run(serialized, **kwargs)
            self.runs[run_id] = run

        run.end_time = datetime.now(UTC)
        run.status = RunStatus.FAILED
        run.error = {
            "message": str(error),
            "traceback": traceback.format_exc(),
        }

    def on_node_skip(
        self,
        serialized: dict[str, Any],
        skip_data: dict[str, Any],
        input_data: dict[str, Any],
        **kwargs: Any,
    ):
        """Called when the node skips.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            skip_data (dict[str, Any]): Data related to the skip.
            input_data (dict[str, Any]): Input data for the node.
            **kwargs (Any): Additional arguments.
        """
        run_id = get_run_id(kwargs)
        if (run := self.runs.get(run_id)) is None:
            run = self._get_node_base_run(serialized, **kwargs)
            self.runs[run_id] = run

        run.input, run.metadata.setdefault("truncated", {})["input"] = format_value(input_data, truncate_enabled=True)
        run.end_time = run.start_time
        run.status = RunStatus.SKIPPED
        run.metadata["skip"] = format_value(skip_data)[0]

    def on_node_execute_start(
        self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any
    ):
        """Called when the node execute starts.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            input_data (dict[str, Any]): Input data for the node.
            **kwargs (Any): Additional arguments.
        """
        run = ensure_run(get_run_id(kwargs), self.runs)
        execution_run_id = get_execution_run_id(kwargs)
        truncate_metadata = {}
        formatted_input, truncate_metadata.setdefault("truncated", {})["input"] = format_value(
            input_data, truncate_enabled=True
        )
        execution = ExecutionRun(
            id=execution_run_id,
            start_time=datetime.now(UTC),
            input=formatted_input,
            metadata=truncate_metadata,
        )
        run.executions.append(execution)

    def on_node_execute_end(
        self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any
    ):
        """Called when the node execute ends.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            output_data (dict[str, Any]): Output data from the node.
            **kwargs (Any): Additional arguments.
        """
        run = ensure_run(get_run_id(kwargs), self.runs)
        execution = ensure_execution_run(get_execution_run_id(kwargs), run.executions)
        execution.end_time = datetime.now(UTC)
        execution.output, execution.metadata.setdefault("truncated", {})["output"] = format_value(
            output_data, truncate_enabled=True
        )
        execution.status = RunStatus.SUCCEEDED

    def on_node_execute_error(
        self, serialized: dict[str, Any], error: BaseException, **kwargs: Any
    ):
        """Called when the node execute errors.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            error (BaseException): Error encountered.
            **kwargs (Any): Additional arguments.
        """
        run = ensure_run(get_run_id(kwargs), self.runs)
        execution = ensure_execution_run(get_execution_run_id(kwargs), run.executions)
        execution.end_time = datetime.now(UTC)
        execution.status = RunStatus.FAILED
        execution.error = {
            "message": str(error),
            "traceback": traceback.format_exc(),
        }

    def on_node_execute_run(self, serialized: dict[str, Any], **kwargs: Any):
        """Called when the node execute runs.

        Args:
            serialized (dict[str, Any]): Serialized node data.
            **kwargs (Any): Additional arguments.
        """
        run = ensure_run(get_run_id(kwargs), self.runs)
        if usage := kwargs.get("usage_data"):
            run.metadata["usage"] = usage

        if prompt_messages := kwargs.get("prompt_messages"):
            run.metadata["node"]["prompt"]["messages"] = prompt_messages

    def flush(self):
        """Flush the runs to the tracing client."""
        if self.client:
            self.client.trace([run for run in self.runs.values()])

host: dict cached property

Get host information.

Returns:

Name Type Description
dict dict

Host information including installed packages.

flush()

Flush the runs to the tracing client.

Source code in dynamiq/callbacks/tracing.py
510
511
512
513
def flush(self):
    """Flush the runs to the tracing client."""
    if self.client:
        self.client.trace([run for run in self.runs.values()])

on_flow_end(serialized, output_data, **kwargs)

Called when the flow ends.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized flow data.

required
output_data dict[str, Any]

Output data from the flow.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/tracing.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def on_flow_end(
    self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any
):
    """Called when the flow ends.

    Args:
        serialized (dict[str, Any]): Serialized flow data.
        output_data (dict[str, Any]): Output data from the flow.
        **kwargs (Any): Additional arguments.
    """
    run = ensure_run(get_run_id(kwargs), self.runs)
    run.end_time = datetime.now(UTC)
    run.output, run.metadata.setdefault("truncated", {})["output"] = format_value(
        output_data, truncate_enabled=True
    )
    run.status = RunStatus.SUCCEEDED
    # If parent_run_id is None, the run is the highest in the execution tree
    if run.parent_run_id is None:
        self.flush()

on_flow_error(serialized, error, **kwargs)

Called when the flow errors.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized flow data.

required
error BaseException

Error encountered.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/tracing.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
def on_flow_error(
    self, serialized: dict[str, Any], error: BaseException, **kwargs: Any
):
    """Called when the flow errors.

    Args:
        serialized (dict[str, Any]): Serialized flow data.
        error (BaseException): Error encountered.
        **kwargs (Any): Additional arguments.
    """
    run = ensure_run(get_run_id(kwargs), self.runs)
    run.end_time = datetime.now(UTC)
    run.status = RunStatus.FAILED
    run.error = {
        "message": str(error),
        "traceback": traceback.format_exc(),
    }

on_flow_start(serialized, input_data, **kwargs)

Called when the flow starts.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized flow data.

required
input_data dict[str, Any]

Input data for the flow.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/tracing.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
def on_flow_start(
    self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any
):
    """Called when the flow starts.

    Args:
        serialized (dict[str, Any]): Serialized flow data.
        input_data (dict[str, Any]): Input data for the flow.
        **kwargs (Any): Additional arguments.
    """
    run_id = get_run_id(kwargs)
    parent_run_id = get_parent_run_id(kwargs)
    truncate_metadata = {}
    formatted_input, truncate_metadata.setdefault("truncated", {})["input"] = format_value(
        input_data, truncate_enabled=True
    )

    self.runs[run_id] = Run(
        id=run_id,
        name="Flow",
        type=RunType.FLOW,
        trace_id=self.trace_id,
        source_id=self.source_id,
        session_id=self.session_id,
        start_time=datetime.now(UTC),
        parent_run_id=parent_run_id,
        input=formatted_input,
        metadata={"flow": {"id": serialized.get("id")}, "host": self.host, **self.metadata, **truncate_metadata},
        tags=self.tags,
    )

on_node_end(serialized, output_data, **kwargs)

Called when the node ends.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
output_data dict[str, Any]

Output data from the node.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/tracing.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
def on_node_end(
    self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any
):
    """Called when the node ends.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        output_data (dict[str, Any]): Output data from the node.
        **kwargs (Any): Additional arguments.
    """
    run = ensure_run(get_run_id(kwargs), self.runs)
    run.end_time = datetime.now(UTC)
    run.output, run.metadata.setdefault("truncated", {})["output"] = format_value(
        output_data, truncate_enabled=True
    )
    run.status = RunStatus.SUCCEEDED
    run.metadata["is_output_from_cache"] = kwargs.get("is_output_from_cache", False)
    # If parent_run_id is None, the run is the highest in the execution tree
    if run.parent_run_id is None:
        self.flush()

on_node_error(serialized, error, **kwargs)

Called when the node errors.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
error BaseException

Error encountered.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/tracing.py
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
def on_node_error(
    self, serialized: dict[str, Any], error: BaseException, **kwargs: Any
):
    """Called when the node errors.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        error (BaseException): Error encountered.
        **kwargs (Any): Additional arguments.
    """
    run_id = get_run_id(kwargs)
    if (run := self.runs.get(run_id)) is None:
        run = self._get_node_base_run(serialized, **kwargs)
        self.runs[run_id] = run

    run.end_time = datetime.now(UTC)
    run.status = RunStatus.FAILED
    run.error = {
        "message": str(error),
        "traceback": traceback.format_exc(),
    }

on_node_execute_end(serialized, output_data, **kwargs)

Called when the node execute ends.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
output_data dict[str, Any]

Output data from the node.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/tracing.py
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
def on_node_execute_end(
    self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any
):
    """Called when the node execute ends.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        output_data (dict[str, Any]): Output data from the node.
        **kwargs (Any): Additional arguments.
    """
    run = ensure_run(get_run_id(kwargs), self.runs)
    execution = ensure_execution_run(get_execution_run_id(kwargs), run.executions)
    execution.end_time = datetime.now(UTC)
    execution.output, execution.metadata.setdefault("truncated", {})["output"] = format_value(
        output_data, truncate_enabled=True
    )
    execution.status = RunStatus.SUCCEEDED

on_node_execute_error(serialized, error, **kwargs)

Called when the node execute errors.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
error BaseException

Error encountered.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/tracing.py
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
def on_node_execute_error(
    self, serialized: dict[str, Any], error: BaseException, **kwargs: Any
):
    """Called when the node execute errors.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        error (BaseException): Error encountered.
        **kwargs (Any): Additional arguments.
    """
    run = ensure_run(get_run_id(kwargs), self.runs)
    execution = ensure_execution_run(get_execution_run_id(kwargs), run.executions)
    execution.end_time = datetime.now(UTC)
    execution.status = RunStatus.FAILED
    execution.error = {
        "message": str(error),
        "traceback": traceback.format_exc(),
    }

on_node_execute_run(serialized, **kwargs)

Called when the node execute runs.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/tracing.py
496
497
498
499
500
501
502
503
504
505
506
507
508
def on_node_execute_run(self, serialized: dict[str, Any], **kwargs: Any):
    """Called when the node execute runs.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        **kwargs (Any): Additional arguments.
    """
    run = ensure_run(get_run_id(kwargs), self.runs)
    if usage := kwargs.get("usage_data"):
        run.metadata["usage"] = usage

    if prompt_messages := kwargs.get("prompt_messages"):
        run.metadata["node"]["prompt"]["messages"] = prompt_messages

on_node_execute_start(serialized, input_data, **kwargs)

Called when the node execute starts.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
input_data dict[str, Any]

Input data for the node.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/tracing.py
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
def on_node_execute_start(
    self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any
):
    """Called when the node execute starts.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        input_data (dict[str, Any]): Input data for the node.
        **kwargs (Any): Additional arguments.
    """
    run = ensure_run(get_run_id(kwargs), self.runs)
    execution_run_id = get_execution_run_id(kwargs)
    truncate_metadata = {}
    formatted_input, truncate_metadata.setdefault("truncated", {})["input"] = format_value(
        input_data, truncate_enabled=True
    )
    execution = ExecutionRun(
        id=execution_run_id,
        start_time=datetime.now(UTC),
        input=formatted_input,
        metadata=truncate_metadata,
    )
    run.executions.append(execution)

on_node_skip(serialized, skip_data, input_data, **kwargs)

Called when the node skips.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
skip_data dict[str, Any]

Data related to the skip.

required
input_data dict[str, Any]

Input data for the node.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/tracing.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
def on_node_skip(
    self,
    serialized: dict[str, Any],
    skip_data: dict[str, Any],
    input_data: dict[str, Any],
    **kwargs: Any,
):
    """Called when the node skips.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        skip_data (dict[str, Any]): Data related to the skip.
        input_data (dict[str, Any]): Input data for the node.
        **kwargs (Any): Additional arguments.
    """
    run_id = get_run_id(kwargs)
    if (run := self.runs.get(run_id)) is None:
        run = self._get_node_base_run(serialized, **kwargs)
        self.runs[run_id] = run

    run.input, run.metadata.setdefault("truncated", {})["input"] = format_value(input_data, truncate_enabled=True)
    run.end_time = run.start_time
    run.status = RunStatus.SKIPPED
    run.metadata["skip"] = format_value(skip_data)[0]

on_node_start(serialized, input_data, **kwargs)

Called when the node starts.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized node data.

required
input_data dict[str, Any]

Input data for the node.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/tracing.py
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def on_node_start(
    self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any
):
    """Called when the node starts.

    Args:
        serialized (dict[str, Any]): Serialized node data.
        input_data (dict[str, Any]): Input data for the node.
        **kwargs (Any): Additional arguments.
    """
    run_id = get_run_id(kwargs)
    run = self._get_node_base_run(serialized, **kwargs)
    run.input, run.metadata.setdefault("truncated", {})["input"] = format_value(input_data, truncate_enabled=True)
    self.runs[run_id] = run

on_workflow_end(serialized, output_data, **kwargs)

Called when the workflow ends.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized workflow data.

required
output_data dict[str, Any]

Output data from the workflow.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/tracing.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def on_workflow_end(
    self, serialized: dict[str, Any], output_data: dict[str, Any], **kwargs: Any
):
    """Called when the workflow ends.

    Args:
        serialized (dict[str, Any]): Serialized workflow data.
        output_data (dict[str, Any]): Output data from the workflow.
        **kwargs (Any): Additional arguments.
    """
    run = ensure_run(get_run_id(kwargs), self.runs)
    run.end_time = datetime.now(UTC)
    run.output, run.metadata.setdefault("truncated", {})["output"] = format_value(
        output_data, truncate_enabled=True
    )
    run.status = RunStatus.SUCCEEDED

    self.flush()

on_workflow_error(serialized, error, **kwargs)

Called when the workflow errors.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized workflow data.

required
error BaseException

Error encountered.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/tracing.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def on_workflow_error(
    self, serialized: dict[str, Any], error: BaseException, **kwargs: Any
):
    """Called when the workflow errors.

    Args:
        serialized (dict[str, Any]): Serialized workflow data.
        error (BaseException): Error encountered.
        **kwargs (Any): Additional arguments.
    """
    run = ensure_run(get_run_id(kwargs), self.runs)
    run.end_time = datetime.now(UTC)
    run.status = RunStatus.FAILED
    run.error = {
        "message": str(error),
        "traceback": traceback.format_exc(),
    }

on_workflow_start(serialized, input_data, **kwargs)

Called when the workflow starts.

Parameters:

Name Type Description Default
serialized dict[str, Any]

Serialized workflow data.

required
input_data dict[str, Any]

Input data for the workflow.

required
**kwargs Any

Additional arguments.

{}
Source code in dynamiq/callbacks/tracing.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def on_workflow_start(
    self, serialized: dict[str, Any], input_data: dict[str, Any], **kwargs: Any
):
    """Called when the workflow starts.

    Args:
        serialized (dict[str, Any]): Serialized workflow data.
        input_data (dict[str, Any]): Input data for the workflow.
        **kwargs (Any): Additional arguments.
    """
    run_id = get_run_id(kwargs)

    truncate_metadata = {}
    formatted_input, truncate_metadata.setdefault("truncated", {})["input"] = format_value(
        input_data, truncate_enabled=True
    )
    self.runs[run_id] = Run(
        id=run_id,
        name="Workflow",
        type=RunType.WORKFLOW,
        trace_id=self.trace_id,
        source_id=self.source_id,
        session_id=self.session_id,
        start_time=datetime.now(UTC),
        input=formatted_input,
        metadata={
            "workflow": {"id": serialized.get("id"), "version": serialized.get("version")},
            "host": self.host,
            **self.metadata,
            **truncate_metadata,
        },
        tags=self.tags,
    )

ensure_execution_run(execution_run_id, executions)

Ensure the execution run exists in the executions list.

Parameters:

Name Type Description Default
execution_run_id UUID

Execution run ID.

required
executions list[ExecutionRun]

List of execution runs.

required

Returns:

Name Type Description
ExecutionRun ExecutionRun

The execution run corresponding to the execution run ID.

Raises:

Type Description
ValueError

If the execution run is not found.

Source code in dynamiq/callbacks/tracing.py
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
def ensure_execution_run(execution_run_id: UUID, executions: list[ExecutionRun]) -> ExecutionRun:
    """Ensure the execution run exists in the executions list.

    Args:
        execution_run_id (UUID): Execution run ID.
        executions (list[ExecutionRun]): List of execution runs.

    Returns:
        ExecutionRun: The execution run corresponding to the execution run ID.

    Raises:
        ValueError: If the execution run is not found.
    """
    for execution in executions:
        if execution.id == execution_run_id:
            return execution

    raise ValueError(f"execution run {execution_run_id} not found")

ensure_run(run_id, runs)

Ensure the run exists in the runs dictionary.

Parameters:

Name Type Description Default
run_id UUID

Run ID.

required
runs dict[UUID, Run]

Dictionary of runs.

required

Returns:

Name Type Description
Run Run

The run corresponding to the run ID.

Raises:

Type Description
ValueError

If the run is not found.

Source code in dynamiq/callbacks/tracing.py
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
def ensure_run(run_id: UUID, runs: dict[UUID, Run]) -> Run:
    """Ensure the run exists in the runs dictionary.

    Args:
        run_id (UUID): Run ID.
        runs (dict[UUID, Run]): Dictionary of runs.

    Returns:
        Run: The run corresponding to the run ID.

    Raises:
        ValueError: If the run is not found.
    """
    run = runs.get(run_id)
    if not run:
        raise ValueError(f"run {run_id} not found")

    return runs[run_id]