Skip to content

Base

BaseLLM

Bases: ConnectionNode

Base class for all LLM nodes.

Attributes:

Name Type Description
MODEL_PREFIX ClassVar[str | None]

Optional model prefix.

name str | None

Name of the LLM node. Defaults to "LLM".

model str

Model to use for the LLM.

prompt Prompt | None

Prompt to use for the LLM.

connection BaseConnection

Connection to use for the LLM.

group Literal[NodeGroup.LLMS]

Group for the node. Defaults to NodeGroup.LLMS.

temperature float | None

Temperature for the LLM.

max_tokens int | None

Maximum number of tokens for the LLM.

stop list[str]

List of tokens to stop at for the LLM.

error_handling ErrorHandling

Error handling config. Defaults to ErrorHandling(timeout_seconds=600).

top_p float | None

Value to consider tokens with top_p probability.

seed int | None

Seed for generating the same result for repeated requests.

presence_penalty float | None

Penalize new tokens based on their existence in the text.

frequency_penalty float | None

Penalize new tokens based on their frequency in the text.

tool_choice str | None

Value to control which function is called by the model.

thinking_enabled bool | None

Enables advanced reasoning if set to True.

budget_tokens int

Maximum number of tokens allocated for thinking.

response_format dict[str, Any]

JSON schema that specifies the structure of the llm's output.

tools list[Tool | dict] | None

List of tools that llm can call.

fallback FallbackConfig

Configuration for fallback behavior.

Source code in dynamiq/nodes/llms/base.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
class BaseLLM(ConnectionNode):
    """Base class for all LLM nodes.

    Attributes:
        MODEL_PREFIX (ClassVar[str | None]): Optional model prefix.
        name (str | None): Name of the LLM node. Defaults to "LLM".
        model (str): Model to use for the LLM.
        prompt (Prompt | None): Prompt to use for the LLM.
        connection (BaseConnection): Connection to use for the LLM.
        group (Literal[NodeGroup.LLMS]): Group for the node. Defaults to NodeGroup.LLMS.
        temperature (float | None): Temperature for the LLM.
        max_tokens (int | None): Maximum number of tokens for the LLM.
        stop (list[str] | None): List of tokens to stop at for the LLM.
        error_handling (ErrorHandling): Error handling config. Defaults to ErrorHandling(timeout_seconds=600).
        top_p (float | None): Value to consider tokens with top_p probability.
        seed (int | None): Seed for generating the same result for repeated requests.
        presence_penalty (float | None): Penalize new tokens based on their existence in the text.
        frequency_penalty (float | None): Penalize new tokens based on their frequency in the text.
        tool_choice (str | None): Value to control which function is called by the model.
        thinking_enabled (bool | None): Enables advanced reasoning if set to True.
        budget_tokens (int): Maximum number of tokens allocated for thinking. Defaults to 1024.
        response_format (dict[str, Any] | None): JSON schema that specifies the structure of the llm's output.
        tools (list[Tool | dict] | None): List of tools that llm can call.
        fallback (FallbackConfig | None): Configuration for fallback behavior.
    """

    MODEL_PREFIX: ClassVar[str | None] = None
    name: str | None = "LLM"
    model: str
    prompt: Prompt | None = None
    connection: BaseConnection
    group: Literal[NodeGroup.LLMS] = NodeGroup.LLMS
    temperature: float | None = None
    max_tokens: int | None = None
    stop: list[str] | None = None
    error_handling: ErrorHandling = Field(default_factory=lambda: ErrorHandling(timeout_seconds=600))
    top_p: float | None = None
    seed: int | None = None
    presence_penalty: float | None = None
    frequency_penalty: float | None = None
    tool_choice: str | None = None
    thinking_enabled: bool | None = None
    budget_tokens: int = 1024
    response_format: dict[str, Any] | None = None
    tools: list[Tool | dict] | None = None
    input_schema: ClassVar[type[BaseLLMInputSchema]] = BaseLLMInputSchema
    inference_mode: InferenceMode = Field(
        default=InferenceMode.DEFAULT,
        deprecated="Please use `tools` and `response_format` parameters "
        "for selecting between function calling and structured output.",
    )
    schema_: dict[str, Any] | type[BaseModel] | None = Field(
        None,
        description="Schema for structured output or function calling.",
        alias="schema",
        deprecated="Please use `tools` and `response_format` parameters "
        "for function calling and structured output respectively.",
    )
    fallback: FallbackConfig | None = Field(
        default=None,
        description="Configuration for fallback behavior including the fallback LLM.",
    )

    # extra="allow": provider-specific keyword arguments are kept in __pydantic_extra__
    # and forwarded to litellm in execute().
    model_config = ConfigDict(extra="allow", arbitrary_types_allowed=True)

    # Bound lazily in __init__ to avoid the cost of importing litellm at module load.
    _completion: Callable = PrivateAttr()
    _stream_chunk_builder: Callable = PrivateAttr()
    _is_fallback_run: bool = PrivateAttr(default=False)
    # Parameter names exposed in the JSON schema generated for the YAML workflow parser.
    _json_schema_fields: ClassVar[list[str]] = ["model", "temperature", "max_tokens", "prompt"]

    @classmethod
    def _generate_json_schema(cls, models: list[str], **kwargs) -> dict[str, Any]:
        """
        Generates the full JSON schema of the BaseLLM node.

        This schema is designed for compatibility with the WorkflowYamlParser,
        containing enough partial information to instantiate a BaseLLM.
        Parameter names to be included in the schema are either defined in the
        _json_schema_fields class variable or passed via the fields parameter.

        The provided models become the enum of allowed values for the "model" property.

        Args:
            models (list[str]): List of available models.
            **kwargs: Additional keyword arguments.

        Returns:
            dict[str, Any]: Generated json schema.
        """
        schema = super()._generate_json_schema(**kwargs)
        schema["properties"]["model"]["enum"] = models
        return schema

    @field_validator("model")
    @classmethod
    def set_model(cls, value: str | None) -> str:
        """Set the model with the appropriate prefix.

        Args:
            value (str | None): The model value.

        Returns:
            str: The model value with the prefix.
        """
        # NOTE(review): a None value would raise on .startswith when MODEL_PREFIX is
        # set — presumably "model" is always a str by this point; confirm upstream.
        if cls.MODEL_PREFIX is not None and not value.startswith(cls.MODEL_PREFIX):
            value = f"{cls.MODEL_PREFIX}{value}"
        return value

    def __init__(self, **kwargs):
        """Initialize the BaseLLM instance.

        Args:
            **kwargs: Additional keyword arguments.
        """
        super().__init__(**kwargs)

        # Save a bit of loading time as litellm is slow
        from litellm import completion, stream_chunk_builder

        # Avoid the same imports multiple times and for future usage in execute
        self._completion = completion
        self._stream_chunk_builder = stream_chunk_builder

    def init_components(self, connection_manager=None):
        """Initialize components including fallback LLM if configured.

        Args:
            connection_manager: The connection manager for initializing connections.
        """
        super().init_components(connection_manager)
        # The fallback LLM is not part of the normal node graph, so initialize it here.
        if self.fallback and self.fallback.llm and self.fallback.llm.is_postponed_component_init:
            self.fallback.llm.init_components(connection_manager)

    @property
    def to_dict_exclude_params(self) -> dict:
        """Exclude fallback configuration during serialization."""
        return super().to_dict_exclude_params | {"fallback": True}

    def to_dict(self, **kwargs) -> dict:
        """Convert to dictionary representation.

        The fallback config is serialized manually (excluded above) so that the
        nested fallback LLM is serialized via its own to_dict().
        """
        data = super().to_dict(**kwargs)
        if self.fallback:
            data["fallback"] = self.fallback.model_dump(exclude={"llm": True})
            data["fallback"]["llm"] = self.fallback.llm.to_dict(**kwargs) if self.fallback.llm else None
        if self._is_fallback_run:
            data["is_fallback"] = True
        return data

    def reset_run_state(self):
        """Reset the run state of the LLM."""
        self._is_fallback_run = False

    def get_context_for_input_schema(self) -> dict:
        """Provides context for input schema that is required for proper validation."""
        return {"instance_prompt": self.prompt}

    def _get_litellm_model_info(self) -> dict[str, Any] | None:
        """Return litellm model info dict, or ``None`` if the model is unknown to litellm."""
        try:
            return get_model_info(model=self.model)
        except Exception:
            return None

    def get_token_limit(self) -> int:
        """Returns token limits of a llm.

        Resolution order: litellm model info -> litellm get_max_tokens ->
        custom model registry -> LLM_DEFAULT_MAX_TOKENS.

        Returns:
            int: Number of tokens.
        """
        info = self._get_litellm_model_info()
        if info is not None:
            max_input = info.get("max_input_tokens")
            if max_input:
                return max_input

        try:
            return get_max_tokens(self.model)
        except Exception:
            logger.debug("Model %s not found in litellm, falling back to custom registry.", self.model)

        custom_max = model_registry.get_max_tokens(self.model)
        if custom_max is not None:
            return custom_max

        logger.warning(f"Model {self.model} not found in litellm or custom registry. Using default token limit.")
        return LLM_DEFAULT_MAX_TOKENS

    @property
    def is_vision_supported(self) -> bool:
        """Check if the LLM supports vision/image processing."""
        # Prefer litellm's knowledge when the model is known to it; otherwise fall
        # back to the custom registry (which may return None for "unknown").
        if self._get_litellm_model_info() is not None:
            try:
                return supports_vision(self.model)
            except Exception:
                return False
        custom = model_registry.supports_vision(self.model)
        return custom if custom is not None else False

    @property
    def is_pdf_input_supported(self) -> bool:
        """Check if the LLM supports PDF input."""
        if self._get_litellm_model_info() is not None:
            try:
                return supports_pdf_input(self.model)
            except Exception:
                return False
        custom = model_registry.supports_pdf_input(self.model)
        return custom if custom is not None else False

    def get_messages(
        self,
        prompt,
        input_data,
    ) -> list[dict]:
        """
        Format and filter message parameters based on provider requirements.
        Override this in provider-specific subclasses.
        """
        messages = prompt.format_messages(**dict(input_data))
        return messages

    @staticmethod
    def _extract_usage(completion: "ModelResponse") -> Any:
        """Extract usage payload from completion across response shapes."""
        # Usage may live on the attribute, in model_extra, or behind a dict-like get().
        model_extra = getattr(completion, "model_extra", None) or {}
        usage = getattr(completion, "usage", None) or model_extra.get("usage")
        if usage is None and hasattr(completion, "get"):
            usage = completion.get("usage")
        return usage

    @staticmethod
    def _usage_value(usage: Any, key: str, default: Any = None) -> Any:
        """Read a usage value from dict-like or object-like usage payloads."""
        if isinstance(usage, dict):
            return usage.get(key, default)
        return getattr(usage, key, default)

    @classmethod
    def get_usage_data(
        cls,
        model: str,
        completion: "ModelResponse",
    ) -> BaseLLMUsageData:
        """Get usage data for the LLM.

        This method generates usage data for the LLM based on the provided messages.

        Args:
            model (str): The model to use for generating the usage data.
            completion (ModelResponse): The completion response from the LLM.

        Returns:
            BaseLLMUsageData: A model containing the usage data for the LLM.
        """
        from litellm import cost_per_token

        usage = cls._extract_usage(completion=completion)
        # "or 0" also normalizes explicit None values coming from the provider.
        prompt_tokens = cls._usage_value(usage, "prompt_tokens", 0) or 0
        completion_tokens = cls._usage_value(usage, "completion_tokens", 0) or 0
        total_tokens = (
            cls._usage_value(usage, "total_tokens", prompt_tokens + completion_tokens)
            or prompt_tokens + completion_tokens
        )
        cache_read_input_tokens = cls._usage_value(usage, "cache_read_input_tokens")
        cache_creation_input_tokens = cls._usage_value(usage, "cache_creation_input_tokens")

        try:
            cost_kwargs: dict[str, Any] = {
                "model": model,
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
            }
            if cache_read_input_tokens is not None:
                cost_kwargs["cache_read_input_tokens"] = cache_read_input_tokens
            if cache_creation_input_tokens is not None:
                cost_kwargs["cache_creation_input_tokens"] = cache_creation_input_tokens

            prompt_tokens_cost_usd, completion_tokens_cost_usd = cost_per_token(**cost_kwargs)
            total_tokens_cost_usd = prompt_tokens_cost_usd + completion_tokens_cost_usd
        except Exception:
            # Cost lookup is best-effort: unknown models simply report no cost.
            prompt_tokens_cost_usd, completion_tokens_cost_usd, total_tokens_cost_usd = None, None, None

        return BaseLLMUsageData(
            prompt_tokens=prompt_tokens,
            prompt_tokens_cost_usd=prompt_tokens_cost_usd,
            completion_tokens=completion_tokens,
            completion_tokens_cost_usd=completion_tokens_cost_usd,
            total_tokens=total_tokens,
            total_tokens_cost_usd=total_tokens_cost_usd,
            cache_read_input_tokens=cache_read_input_tokens,
            cache_creation_input_tokens=cache_creation_input_tokens,
        )

    def _handle_completion_response(
        self,
        response: Union["ModelResponse", "CustomStreamWrapper"],
        config: RunnableConfig = None,
        **kwargs,
    ) -> dict:
        """Handle completion response.

        Args:
            response (ModelResponse | CustomStreamWrapper): The response from the LLM.
            config (RunnableConfig, optional): The configuration for the execution. Defaults to None.
            **kwargs: Additional keyword arguments.

        Returns:
            dict: A dictionary containing the generated content and tool calls if present.
        """
        content = response.choices[0].message.content
        result = {"content": content}
        if tool_calls := response.choices[0].message.tool_calls:
            tool_calls_parsed = []
            for tc in tool_calls:
                call = tc.model_dump()
                # Providers return tool arguments as a JSON string; decode for callers.
                call["function"]["arguments"] = json.loads(call["function"]["arguments"])
                tool_calls_parsed.append(call)
            result["tool_calls"] = tool_calls_parsed

        usage_data = self.get_usage_data(model=self.model, completion=response).model_dump()
        self.run_on_node_execute_run(callbacks=config.callbacks, usage_data=usage_data, **kwargs)

        return result

    def _handle_streaming_completion_response(
        self,
        response: Union["ModelResponse", "CustomStreamWrapper"],
        messages: list[dict],
        config: RunnableConfig = None,
        **kwargs,
    ):
        """Handle streaming completion response.

        Streams each chunk to the registered callbacks, then rebuilds the full
        response and delegates to _handle_completion_response.

        Args:
            response (ModelResponse | CustomStreamWrapper): The response from the LLM.
            messages (list[dict]): The messages used for the LLM.
            config (RunnableConfig, optional): The configuration for the execution. Defaults to None.
            **kwargs: Additional keyword arguments.

        Returns:
            dict: A dictionary containing the generated content and tool calls.
        """
        chunks = []
        for chunk in response:
            chunks.append(chunk)

            self.run_on_node_execute_stream(
                config.callbacks,
                chunk.model_dump(),
                **kwargs,
            )

        full_response = self._stream_chunk_builder(chunks=chunks, messages=messages)
        return self._handle_completion_response(response=full_response, config=config, **kwargs)

    def _get_response_format_and_tools(
        self,
        prompt: Prompt | None = None,
        tools: list[Tool | dict] | None = None,
        response_format: dict[str, Any] | None = None,
    ) -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
        """Resolve the effective response format and tools for a completion call.

        Precedence for each value: explicit argument > instance attribute > prompt
        attribute. Falls back to the deprecated ``inference_mode``/``schema_``
        parameters when neither modern parameter fully covers the request.

        Args:
            prompt (Prompt | None): The prompt to use. Its attributes are read
                unconditionally, so callers must pass a non-None prompt.
            tools (list[Tool | dict] | None): The tools to use.
            response_format (dict[str, Any] | None): The response format to use.
        Returns:
            tuple[dict[str, Any] | None, dict[str, Any] | None]: Response format and tools.
        Raises:
            ValueError: If schema is None when using STRUCTURED_OUTPUT or FUNCTION_CALLING modes.
        """
        response_format = response_format or self.response_format or prompt.response_format
        tools = tools or self.tools or prompt.tools

        # Suppress DeprecationWarning if deprecated parameters are not set
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=DeprecationWarning)
            use_inference_mode = (not response_format or not tools) and self.inference_mode != InferenceMode.DEFAULT

        if use_inference_mode:
            schema = self.schema_
            match self.inference_mode:
                case InferenceMode.STRUCTURED_OUTPUT:
                    if schema is None:
                        raise ValueError("Schema must be provided when using STRUCTURED_OUTPUT inference mode")
                    response_format = response_format or schema
                case InferenceMode.FUNCTION_CALLING:
                    if schema is None:
                        raise ValueError("Schema must be provided when using FUNCTION_CALLING inference mode")
                    tools = tools or schema

        if tools:
            # litellm expects plain dicts; serialize Tool models, pass dicts through.
            tools = [tool.model_dump() if isinstance(tool, Tool) else tool for tool in tools]

        return response_format, tools

    def update_completion_params(self, params: dict[str, Any]) -> dict[str, Any]:
        """
        Updates or modifies the parameters for the completion method.

        This method can be overridden by subclasses to customize the parameters
        passed to the completion method. By default, it enables usage information
        in streaming mode if streaming is enabled and include_usage is set.
        Args:
            params (dict[str, Any]): The parameters to be updated.

        Returns:
            dict[str, Any]: The updated parameters.
        """
        if self.streaming and self.streaming.enabled and self.streaming.include_usage and params.get("stream", False):
            params.setdefault("stream_options", {})
            params["stream_options"]["include_usage"] = True
        return params

    def execute(
        self,
        input_data: BaseLLMInputSchema,
        config: RunnableConfig = None,
        prompt: Prompt | None = None,
        tools: list[Tool | dict] | None = None,
        response_format: dict[str, Any] | None = None,
        parallel_tool_calls: bool | None = None,
        **kwargs,
    ):
        """Execute the LLM node.

        This method processes the input data, formats the prompt, and generates a response using
        the configured LLM.

        Args:
            input_data (BaseLLMInputSchema): The input data for the LLM.
            config (RunnableConfig, optional): The configuration for the execution. Defaults to None.
            prompt (Prompt, optional): The prompt to use for this execution. Defaults to None.
            tools (list[Tool|dict]): List of tools that llm can call.
            response_format (dict[str, Any]): JSON schema that specifies the structure of the llm's output
            parallel_tool_calls (bool | None): Whether to allow the LLM to return multiple tool calls
                in a single response. None means provider decides.
            **kwargs: Additional keyword arguments.

        Returns:
            dict: A dictionary containing the generated content and tool calls.
        """
        config = ensure_config(config)
        self.reset_run_state()
        prompt = prompt or self.prompt or Prompt(messages=[], tools=None, response_format=None)
        messages = self.get_messages(prompt, input_data)
        self.run_on_node_execute_run(callbacks=config.callbacks, prompt_messages=messages, **kwargs)

        # Copy extras so provider-specific kwargs can't be mutated on the instance.
        extra = copy.deepcopy(self.__pydantic_extra__)
        params = self.connection.conn_params.copy()
        if self.client and not isinstance(self.connection, HttpApiKey):
            params.update({"client": self.client})
        if self.thinking_enabled:
            params.update({"thinking": {"type": "enabled", "budget_tokens": self.budget_tokens}})
        if extra:
            params.update(extra)

        response_format, tools = self._get_response_format_and_tools(
            prompt=prompt,
            tools=tools,
            response_format=response_format,
        )
        # Check if a streaming callback is available in the config and enable streaming only if it is
        # This is to avoid unnecessary streaming to reduce CPU usage
        is_streaming_callback_available = any(
            isinstance(callback, BaseStreamingCallbackHandler) for callback in config.callbacks
        )
        common_params: dict[str, Any] = {
            "model": self.model,
            "messages": messages,
            "stream": self.streaming.enabled and is_streaming_callback_available,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            "tools": tools,
            "tool_choice": self.tool_choice,
            "stop": self.stop if self.stop else None,
            "top_p": self.top_p,
            "seed": self.seed,
            "presence_penalty": self.presence_penalty,
            "frequency_penalty": self.frequency_penalty,
            "response_format": response_format,
            # drop_params tells litellm to silently drop params a provider rejects.
            "drop_params": True,
            **params,
        }
        if parallel_tool_calls is not None:
            common_params["parallel_tool_calls"] = parallel_tool_calls

        common_params = self.update_completion_params(common_params)

        response = self._completion(**common_params)
        handle_completion = (
            self._handle_streaming_completion_response
            if self.streaming.enabled and is_streaming_callback_available
            else self._handle_completion_response
        )

        return handle_completion(
            response=response, messages=messages, config=config, input_data=dict(input_data), **kwargs
        )

    def _is_rate_limit_error(self, exception_type: type[Exception], error_str: str) -> bool:
        """Check if the error is a rate limit error.

        Args:
            exception_type: The type of exception.
            error_str: Lowercase error message string.

        Returns:
            bool: True if it's a rate limit error.
        """
        if issubclass(exception_type, (RateLimitError, BudgetExceededError)):
            return True
        return any(indicator in error_str for indicator in LLM_RATE_LIMIT_ERROR_INDICATORS)

    def _is_connection_error(self, exception_type: type[Exception], error_str: str) -> bool:
        """Check if the error is a connection error.

        Args:
            exception_type: The type of exception.
            error_str: Lowercase error message string.

        Returns:
            bool: True if it's a connection error.
        """
        if issubclass(exception_type, (APIConnectionError, Timeout, ServiceUnavailableError, InternalServerError)):
            return True
        if issubclass(exception_type, (ConnectionError, TimeoutError, OSError)):
            return True
        return any(indicator in error_str for indicator in LLM_CONNECTION_ERROR_INDICATORS)

    def _should_trigger_fallback(self, exception_type: type[Exception], exception_message: str | None = None) -> bool:
        """Determine if exception should trigger fallback to secondary LLM.

        Args:
            exception_type: The type of exception that caused the primary LLM to fail.
            exception_message: The exception message string for string-based detection.

        Returns:
            bool: True if fallback should be triggered, False otherwise.
        """
        if not self.fallback or not self.fallback.enabled or not self.fallback.llm:
            return False

        triggers = set(self.fallback.triggers)
        if FallbackTrigger.ANY in triggers:
            return True

        error_str = (exception_message or "").lower()

        if FallbackTrigger.RATE_LIMIT in triggers and self._is_rate_limit_error(exception_type, error_str):
            return True
        if FallbackTrigger.CONNECTION in triggers and self._is_connection_error(exception_type, error_str):
            return True

        return False

    def run_sync(
        self,
        input_data: dict,
        config: RunnableConfig = None,
        depends_result: dict = None,
        **kwargs,
    ) -> RunnableResult:
        """Run the LLM with fallback support.

        If the primary LLM fails and a fallback is configured, the primary failure
        is traced first, then the fallback LLM is executed separately.

        The fallback receives the same transformed input that the primary received,
        and the primary's output_transformer is applied to the fallback's output.

        Args:
            input_data: Input data for the LLM.
            config: Configuration for the run.
            depends_result: Results of dependent nodes.
            **kwargs: Additional keyword arguments.

        Returns:
            RunnableResult: Result of the LLM execution.
        """
        result = super().run_sync(input_data=input_data, config=config, depends_result=depends_result, **kwargs)

        if result.status != RunnableStatus.FAILURE:
            return result

        if not self.fallback or not self.fallback.llm:
            return result

        if not result.error:
            return result

        if not self._should_trigger_fallback(result.error.type, result.error.message):
            return result

        fallback_llm = self.fallback.llm
        fallback_llm._is_fallback_run = True
        logger.warning(
            f"LLM {self.name} - {self.id}: Primary LLM ({self.model}) failed. "
            f"Error: {result.error.type.__name__}: {result.error.message}. "
            f"Attempting fallback to {fallback_llm.name} - {fallback_llm.id}"
        )

        # Use the primary's already transformed input for fallback
        # This ensures fallback works with the same prepared input as primary
        fallback_kwargs = {k: v for k, v in kwargs.items() if k != "run_depends"}
        fallback_kwargs["parent_run_id"] = kwargs.get("parent_run_id")

        fallback_input = result.input.model_dump() if hasattr(result.input, "model_dump") else result.input
        fallback_result = fallback_llm.run_sync(
            input_data=fallback_input,
            config=config,
            depends_result=None,  # Input is already transformed, no need to merge depends
            **fallback_kwargs,
        )

        if fallback_result.status == RunnableStatus.SUCCESS:
            logger.info(f"LLM {self.name} - {self.id}: Fallback LLM ({fallback_llm.model}) succeeded")
            # Apply primary node's output_transformer to fallback result
            transformed_output = self.transform_output(fallback_result.output, config=config, **kwargs)
            return RunnableResult(
                status=RunnableStatus.SUCCESS,
                input=result.input,
                output=transformed_output,
            )

        logger.error(f"LLM {self.name} - {self.id}: Fallback LLM ({fallback_llm.model}) failed.")
        return result

is_pdf_input_supported: bool property

Check if the LLM supports PDF input.

is_vision_supported: bool property

Check if the LLM supports vision/image processing.

to_dict_exclude_params: dict property

Exclude fallback configuration during serialization.

__init__(**kwargs)

Initialize the BaseLLM instance.

Parameters:

Name Type Description Default
**kwargs

Additional keyword arguments.

{}
Source code in dynamiq/nodes/llms/base.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def __init__(self, **kwargs):
    """Initialize the BaseLLM instance.

    Args:
        **kwargs: Additional keyword arguments forwarded to the parent node.
    """
    super().__init__(**kwargs)

    # litellm is slow to import, so load it lazily here instead of at module import time.
    from litellm import completion, stream_chunk_builder

    # Cache the callables once so execute() can reuse them without re-importing.
    self._completion, self._stream_chunk_builder = completion, stream_chunk_builder

execute(input_data, config=None, prompt=None, tools=None, response_format=None, parallel_tool_calls=None, **kwargs)

Execute the LLM node.

This method processes the input data, formats the prompt, and generates a response using the configured LLM.

Parameters:

Name Type Description Default
input_data BaseLLMInputSchema

The input data for the LLM.

required
config RunnableConfig

The configuration for the execution. Defaults to None.

None
prompt Prompt

The prompt to use for this execution. Defaults to None.

None
tools list[Tool | dict]

List of tools that llm can call.

None
response_format dict[str, Any]

JSON schema that specifies the structure of the llm's output

None
parallel_tool_calls bool | None

Whether to allow the LLM to return multiple tool calls in a single response. None means provider decides.

None
**kwargs

Additional keyword arguments.

{}

Returns:

Name Type Description
dict

A dictionary containing the generated content and tool calls.

Source code in dynamiq/nodes/llms/base.py
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
def execute(
    self,
    input_data: BaseLLMInputSchema,
    config: RunnableConfig = None,
    prompt: Prompt | None = None,
    tools: list[Tool | dict] | None = None,
    response_format: dict[str, Any] | None = None,
    parallel_tool_calls: bool | None = None,
    **kwargs,
):
    """Execute the LLM node.

    This method processes the input data, formats the prompt, and generates a response using
    the configured LLM.

    Args:
        input_data (BaseLLMInputSchema): The input data for the LLM.
        config (RunnableConfig, optional): The configuration for the execution. Defaults to None.
        prompt (Prompt, optional): The prompt to use for this execution. Defaults to None.
        tools (list[Tool|dict]): List of tools that llm can call.
        response_format (dict[str, Any]): JSON schema that specifies the structure of the llm's output.
        parallel_tool_calls (bool | None): Whether to allow the LLM to return multiple tool calls
            in a single response. None means provider decides.
        **kwargs: Additional keyword arguments.

    Returns:
        dict: A dictionary containing the generated content and tool calls.
    """
    config = ensure_config(config)
    self.reset_run_state()
    # Prompt resolution order: per-call argument, node-level prompt, then an empty Prompt.
    prompt = prompt or self.prompt or Prompt(messages=[], tools=None, response_format=None)
    messages = self.get_messages(prompt, input_data)
    self.run_on_node_execute_run(callbacks=config.callbacks, prompt_messages=messages, **kwargs)

    # Extra (undeclared) pydantic fields are forwarded to the completion call verbatim.
    extra = copy.deepcopy(self.__pydantic_extra__)
    params = self.connection.conn_params.copy()
    # NOTE(review): the client object is skipped for HttpApiKey connections —
    # presumably those build requests from conn_params alone; confirm.
    if self.client and not isinstance(self.connection, HttpApiKey):
        params.update({"client": self.client})
    if self.thinking_enabled:
        params.update({"thinking": {"type": "enabled", "budget_tokens": self.budget_tokens}})
    # Applied last so extra fields can override any of the params set above.
    if extra:
        params.update(extra)

    response_format, tools = self._get_response_format_and_tools(
        prompt=prompt,
        tools=tools,
        response_format=response_format,
    )
    # Check if a streaming callback is available in the config and enable streaming only if it is
    # This is to avoid unnecessary streaming to reduce CPU usage
    is_streaming_callback_available = any(
        isinstance(callback, BaseStreamingCallbackHandler) for callback in config.callbacks
    )
    common_params: dict[str, Any] = {
        "model": self.model,
        "messages": messages,
        "stream": self.streaming.enabled and is_streaming_callback_available,
        "temperature": self.temperature,
        "max_tokens": self.max_tokens,
        "tools": tools,
        "tool_choice": self.tool_choice,
        "stop": self.stop if self.stop else None,
        "top_p": self.top_p,
        "seed": self.seed,
        "presence_penalty": self.presence_penalty,
        "frequency_penalty": self.frequency_penalty,
        "response_format": response_format,
        # drop_params makes litellm drop provider-unsupported params instead of raising.
        "drop_params": True,
        **params,
    }
    if parallel_tool_calls is not None:
        common_params["parallel_tool_calls"] = parallel_tool_calls

    # Subclass hook: last chance to adjust params (e.g. stream_options) before the call.
    common_params = self.update_completion_params(common_params)

    response = self._completion(**common_params)
    handle_completion = (
        self._handle_streaming_completion_response
        if self.streaming.enabled and is_streaming_callback_available
        else self._handle_completion_response
    )

    return handle_completion(
        response=response, messages=messages, config=config, input_data=dict(input_data), **kwargs
    )

get_context_for_input_schema()

Provides context for input schema that is required for proper validation.

Source code in dynamiq/nodes/llms/base.py
299
300
301
def get_context_for_input_schema(self) -> dict:
    """Return the validation context, exposing this node's prompt to the input schema."""
    context = {"instance_prompt": self.prompt}
    return context

get_messages(prompt, input_data)

Format and filter message parameters based on provider requirements. Override this in provider-specific subclasses.

Source code in dynamiq/nodes/llms/base.py
356
357
358
359
360
361
362
363
364
365
366
def get_messages(
    self,
    prompt,
    input_data,
) -> list[dict]:
    """
    Render the prompt with the given input data into a provider-ready message list.
    Provider-specific subclasses override this to filter or reshape the messages.
    """
    return prompt.format_messages(**dict(input_data))

get_token_limit()

Returns token limits of a llm.

Returns:

Name Type Description
int int

Number of tokens.

Source code in dynamiq/nodes/llms/base.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def get_token_limit(self) -> int:
    """Returns token limits of a llm.

    Resolution order: litellm model info, litellm's get_max_tokens, the custom
    model registry, and finally a library-wide default.

    Returns:
        int: Number of tokens.
    """
    model_info = self._get_litellm_model_info()
    max_input = model_info.get("max_input_tokens") if model_info is not None else None
    if max_input:
        return max_input

    try:
        return get_max_tokens(self.model)
    except Exception:
        logger.debug("Model %s not found in litellm, falling back to custom registry.", self.model)

    registry_limit = model_registry.get_max_tokens(self.model)
    if registry_limit is not None:
        return registry_limit

    logger.warning(f"Model {self.model} not found in litellm or custom registry. Using default token limit.")
    return LLM_DEFAULT_MAX_TOKENS

get_usage_data(model, completion) classmethod

Get usage data for the LLM.

This method generates usage data for the LLM based on the provided messages.

Parameters:

Name Type Description Default
model str

The model to use for generating the usage data.

required
completion ModelResponse

The completion response from the LLM.

required

Returns:

Name Type Description
BaseLLMUsageData BaseLLMUsageData

A model containing the usage data for the LLM.

Source code in dynamiq/nodes/llms/base.py
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
@classmethod
def get_usage_data(
    cls,
    model: str,
    completion: "ModelResponse",
) -> BaseLLMUsageData:
    """Get usage data for the LLM.

    Extracts token counts from the completion response and prices them via
    litellm. Cost fields are None when pricing fails (e.g. unknown model).

    Args:
        model (str): The model to use for generating the usage data.
        completion (ModelResponse): The completion response from the LLM.

    Returns:
        BaseLLMUsageData: A model containing the usage data for the LLM.
    """
    from litellm import cost_per_token

    usage = cls._extract_usage(completion=completion)
    # Falsy/missing counts are normalized to 0 so arithmetic below is safe.
    prompt_tokens = cls._usage_value(usage, "prompt_tokens", 0) or 0
    completion_tokens = cls._usage_value(usage, "completion_tokens", 0) or 0
    token_sum = prompt_tokens + completion_tokens
    total_tokens = cls._usage_value(usage, "total_tokens", token_sum) or token_sum
    cache_read = cls._usage_value(usage, "cache_read_input_tokens")
    cache_creation = cls._usage_value(usage, "cache_creation_input_tokens")

    try:
        cost_kwargs: dict[str, Any] = {
            "model": model,
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
        }
        # Cache counts are passed through only when the provider reported them.
        if cache_read is not None:
            cost_kwargs["cache_read_input_tokens"] = cache_read
        if cache_creation is not None:
            cost_kwargs["cache_creation_input_tokens"] = cache_creation

        prompt_cost, completion_cost = cost_per_token(**cost_kwargs)
        total_cost = prompt_cost + completion_cost
    except Exception:
        # Pricing is best-effort; unknown models simply get no cost figures.
        prompt_cost, completion_cost, total_cost = None, None, None

    return BaseLLMUsageData(
        prompt_tokens=prompt_tokens,
        prompt_tokens_cost_usd=prompt_cost,
        completion_tokens=completion_tokens,
        completion_tokens_cost_usd=completion_cost,
        total_tokens=total_tokens,
        total_tokens_cost_usd=total_cost,
        cache_read_input_tokens=cache_read,
        cache_creation_input_tokens=cache_creation,
    )

init_components(connection_manager=None)

Initialize components including fallback LLM if configured.

Parameters:

Name Type Description Default
connection_manager

The connection manager for initializing connections.

None
Source code in dynamiq/nodes/llms/base.py
270
271
272
273
274
275
276
277
278
def init_components(self, connection_manager=None):
    """Initialize components, including the fallback LLM when one is configured.

    Args:
        connection_manager: The connection manager for initializing connections.
    """
    super().init_components(connection_manager)
    fallback_llm = self.fallback.llm if self.fallback else None
    # The fallback node may defer its own component setup; finish it here.
    if fallback_llm and fallback_llm.is_postponed_component_init:
        fallback_llm.init_components(connection_manager)

reset_run_state()

Reset the run state of the LLM.

Source code in dynamiq/nodes/llms/base.py
295
296
297
def reset_run_state(self):
    """Clear per-run flags so each execution starts from a clean state."""
    # Only a parent node's fallback path flips this to True; reset it before every run.
    self._is_fallback_run = False

run_sync(input_data, config=None, depends_result=None, **kwargs)

Run the LLM with fallback support.

If the primary LLM fails and a fallback is configured, the primary failure is traced first, then the fallback LLM is executed separately.

The fallback receives the same transformed input that the primary received, and the primary's output_transformer is applied to the fallback's output.

Parameters:

Name Type Description Default
input_data dict

Input data for the LLM.

required
config RunnableConfig

Configuration for the run.

None
depends_result dict

Results of dependent nodes.

None
**kwargs

Additional keyword arguments.

{}

Returns:

Name Type Description
RunnableResult RunnableResult

Result of the LLM execution.

Source code in dynamiq/nodes/llms/base.py
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
def run_sync(
    self,
    input_data: dict,
    config: RunnableConfig = None,
    depends_result: dict = None,
    **kwargs,
) -> RunnableResult:
    """Run the LLM with fallback support.

    If the primary LLM fails and a fallback is configured, the primary failure
    is traced first, then the fallback LLM is executed separately.

    The fallback receives the same transformed input that the primary received,
    and the primary's output_transformer is applied to the fallback's output.

    Args:
        input_data: Input data for the LLM.
        config: Configuration for the run.
        depends_result: Results of dependent nodes.
        **kwargs: Additional keyword arguments.

    Returns:
        RunnableResult: Result of the LLM execution.
    """
    result = super().run_sync(input_data=input_data, config=config, depends_result=depends_result, **kwargs)

    # Fast path: primary succeeded (or was skipped) — nothing more to do.
    if result.status != RunnableStatus.FAILURE:
        return result

    # No fallback configured: surface the primary failure unchanged.
    if not self.fallback or not self.fallback.llm:
        return result

    # Without error details we cannot evaluate the fallback triggers.
    if not result.error:
        return result

    # Only errors matching the configured trigger conditions activate the fallback.
    if not self._should_trigger_fallback(result.error.type, result.error.message):
        return result

    fallback_llm = self.fallback.llm
    # Mark the fallback node so its serialized form (to_dict) reflects a fallback run.
    fallback_llm._is_fallback_run = True
    logger.warning(
        f"LLM {self.name} - {self.id}: Primary LLM ({self.model}) failed. "
        f"Error: {result.error.type.__name__}: {result.error.message}. "
        f"Attempting fallback to {fallback_llm.name} - {fallback_llm.id}"
    )

    # Use the primary's already transformed input for fallback
    # This ensures fallback works with the same prepared input as primary
    # run_depends is dropped: dependency results were already merged into result.input.
    fallback_kwargs = {k: v for k, v in kwargs.items() if k != "run_depends"}
    fallback_kwargs["parent_run_id"] = kwargs.get("parent_run_id")

    fallback_input = result.input.model_dump() if hasattr(result.input, "model_dump") else result.input
    fallback_result = fallback_llm.run_sync(
        input_data=fallback_input,
        config=config,
        depends_result=None,  # Input is already transformed, no need to merge depends
        **fallback_kwargs,
    )

    if fallback_result.status == RunnableStatus.SUCCESS:
        logger.info(f"LLM {self.name} - {self.id}: Fallback LLM ({fallback_llm.model}) succeeded")
        # Apply primary node's output_transformer to fallback result
        transformed_output = self.transform_output(fallback_result.output, config=config, **kwargs)
        return RunnableResult(
            status=RunnableStatus.SUCCESS,
            input=result.input,
            output=transformed_output,
        )

    # Both attempts failed: report the PRIMARY failure, not the fallback's.
    logger.error(f"LLM {self.name} - {self.id}: Fallback LLM ({fallback_llm.model}) failed.")
    return result

set_model(value) classmethod

Set the model with the appropriate prefix.

Parameters:

Name Type Description Default
value str | None

The model value.

required

Returns:

Name Type Description
str str

The model value with the prefix.

Source code in dynamiq/nodes/llms/base.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
@field_validator("model")
@classmethod
def set_model(cls, value: str | None) -> str:
    """Set the model with the appropriate prefix.

    Args:
        value (str | None): The model value.

    Returns:
        str: The model value with the prefix.
    """
    prefix = cls.MODEL_PREFIX
    if prefix is None or value.startswith(prefix):
        return value
    return f"{prefix}{value}"

to_dict(**kwargs)

Convert to dictionary representation.

Source code in dynamiq/nodes/llms/base.py
285
286
287
288
289
290
291
292
293
def to_dict(self, **kwargs) -> dict:
    """Convert to dictionary representation, re-attaching fallback data."""
    data = super().to_dict(**kwargs)
    # The fallback config is excluded from base serialization, so serialize it here.
    if self.fallback:
        fallback_data = self.fallback.model_dump(exclude={"llm": True})
        fallback_data["llm"] = self.fallback.llm.to_dict(**kwargs) if self.fallback.llm else None
        data["fallback"] = fallback_data
    if self._is_fallback_run:
        data["is_fallback"] = True
    return data

update_completion_params(params)

Updates or modifies the parameters for the completion method.

This method can be overridden by subclasses to customize the parameters passed to the completion method. By default, it enables usage information in streaming mode if streaming is enabled and include_usage is set.

Parameters:

Name Type Description Default
params dict[str, Any]

The parameters to be updated.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The updated parameters.

Source code in dynamiq/nodes/llms/base.py
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
def update_completion_params(self, params: dict[str, Any]) -> dict[str, Any]:
    """
    Updates or modifies the parameters for the completion method.

    Subclasses may override this to customize the parameters passed to the
    completion call. The default implementation requests usage information in
    streaming responses when streaming is active and include_usage is set.

    Args:
        params (dict[str, Any]): The parameters to be updated.

    Returns:
        dict[str, Any]: The updated parameters.
    """
    streaming = self.streaming
    wants_usage = bool(streaming) and streaming.enabled and streaming.include_usage
    if wants_usage and params.get("stream", False):
        stream_options = params.setdefault("stream_options", {})
        stream_options["include_usage"] = True
    return params

BaseLLMUsageData

Bases: BaseModel

Model for LLM usage data.

Attributes:

Name Type Description
prompt_tokens int

Number of prompt tokens.

prompt_tokens_cost_usd float | None

Cost of prompt tokens in USD.

completion_tokens int

Number of completion tokens.

completion_tokens_cost_usd float | None

Cost of completion tokens in USD.

total_tokens int

Total number of tokens.

total_tokens_cost_usd float | None

Total cost of tokens in USD.

cache_read_input_tokens int | None

Number of cache read input tokens.

cache_creation_input_tokens int | None

Number of cache creation input tokens.

Source code in dynamiq/nodes/llms/base.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class BaseLLMUsageData(BaseModel):
    """Model for LLM usage data.

    Attributes:
        prompt_tokens (int): Number of prompt tokens.
        prompt_tokens_cost_usd (float | None): Cost of prompt tokens in USD.
        completion_tokens (int): Number of completion tokens.
        completion_tokens_cost_usd (float | None): Cost of completion tokens in USD.
        total_tokens (int): Total number of tokens.
        total_tokens_cost_usd (float | None): Total cost of tokens in USD.
        cache_read_input_tokens (int | None): Number of cache read input tokens.
        cache_creation_input_tokens (int | None): Number of cache creation input tokens.
    """
    prompt_tokens: int
    prompt_tokens_cost_usd: float | None  # None when pricing is unavailable for the model
    completion_tokens: int
    completion_tokens_cost_usd: float | None  # None when pricing is unavailable for the model
    total_tokens: int
    total_tokens_cost_usd: float | None  # None when pricing is unavailable for the model
    cache_read_input_tokens: int | None = None  # None when no cache-read count was reported
    cache_creation_input_tokens: int | None = None  # None when no cache-creation count was reported

FallbackConfig

Bases: BaseModel

Configuration for LLM fallback behavior.

Attributes:

Name Type Description
llm BaseLLM | None

The fallback LLM to use when the primary LLM fails. Required when enabled=True.

enabled bool

Whether fallback is enabled. Defaults to False.

triggers list[FallbackTrigger]

List of trigger conditions that will activate the fallback. Use FallbackTrigger.ANY to trigger on any error.

Examples:

Single trigger

FallbackConfig(llm=my_llm, enabled=True, triggers=[FallbackTrigger.RATE_LIMIT])

Multiple triggers

FallbackConfig(llm=my_llm, enabled=True, triggers=[FallbackTrigger.RATE_LIMIT, FallbackTrigger.CONNECTION])

Source code in dynamiq/nodes/llms/base.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
class FallbackConfig(BaseModel):
    """Settings that control when and how a fallback LLM is used.

    Attributes:
        llm: LLM to run when the primary one fails; must be set when enabled=True.
        enabled: Turns the fallback mechanism on. Defaults to False.
        triggers: Error conditions that activate the fallback;
            FallbackTrigger.ANY matches every error.

    Examples:
        # Single trigger
        FallbackConfig(llm=my_llm, enabled=True, triggers=[FallbackTrigger.RATE_LIMIT])

        # Multiple triggers
        FallbackConfig(llm=my_llm, enabled=True, triggers=[FallbackTrigger.RATE_LIMIT, FallbackTrigger.CONNECTION])
    """

    llm: "BaseLLM | None" = None
    enabled: bool = False
    triggers: list[FallbackTrigger] = Field(default_factory=lambda: [FallbackTrigger.ANY])

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_validator(mode="after")
    def validate_llm_required_when_enabled(self) -> "FallbackConfig":
        """Ensure a fallback LLM is set whenever the fallback is enabled."""
        if not self.enabled or self.llm is not None:
            return self
        raise ValueError("FallbackConfig requires 'llm' when 'enabled' is True")

validate_llm_required_when_enabled()

Validate that llm is provided when fallback is enabled.

Source code in dynamiq/nodes/llms/base.py
 96
 97
 98
 99
100
101
@model_validator(mode="after")
def validate_llm_required_when_enabled(self) -> "FallbackConfig":
    """Ensure a fallback LLM is set whenever the fallback is enabled."""
    if not self.enabled or self.llm is not None:
        return self
    raise ValueError("FallbackConfig requires 'llm' when 'enabled' is True")