Skip to content

Human feedback

HumanFeedbackTool

Bases: Node

A tool for gathering user information through human feedback.

This tool prompts the user for input and returns the response. It should be used to check actual information from the user or to gather additional input during a process.

Attributes:

Name Type Description
group Literal[TOOLS]

The group the node belongs to.

name str

The name of the tool.

description str

A brief description of the tool's purpose.

msg_template str

Template of message to send.

input_method FeedbackMethod | InputMethodCallable

The method used to gather user input.

Source code in dynamiq/nodes/tools/human_feedback.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
class HumanFeedbackTool(Node):
    """
    A tool for gathering user information through human feedback.

    This tool prompts the user for input and returns the response. It should be used to check actual
    information from the user or to gather additional input during a process.

    Attributes:
        group (Literal[NodeGroup.TOOLS]): The group the node belongs to.
        name (str): The name of the tool.
        description (str): A brief description of the tool's purpose.
        msg_template (str): Template of message to send.
        input_method (FeedbackMethod | InputMethodCallable): The method used to gather user input.
    """

    group: Literal[NodeGroup.TOOLS] = NodeGroup.TOOLS
    name: str = "Human Feedback Tool"
    description: str = """Collects human feedback during workflow execution for validation and decision-making.

Key Capabilities:
- Workflow pause for human input collection
- Console and streaming interface support
- Customizable message templates with parameter substitution
- Integration with workflow orchestrators and streaming systems

Usage Strategy:
- Use for content validation before publication
- Implement decision confirmation for high-stakes operations
- Create quality assurance checkpoints in automated workflows
- Collect user preferences and customization inputs

Parameter Guide:
- msg_template: Jinja2 template for message formatting

Examples:
- {"msg_template": "Please review this email draft before sending"}"""
    input_method: FeedbackMethod | InputMethodCallable = FeedbackMethod.CONSOLE
    input_schema: ClassVar[type[HumanFeedbackInputSchema]] = HumanFeedbackInputSchema
    msg_template: str = "{{input}}"
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_validator(mode="after")
    def update_description(self):
        msg_template = self.msg_template
        self.description += (
            f"\nThis is the template of message to send: '{msg_template}'."
            " Parameters will be substituted based on the provided input data."
        )
        return self

    def input_method_console(self, prompt: str) -> str:
        """
        Get input from the user using the console input method.

        Args:
            prompt (str): The prompt to display to the user.

        Returns:
            str: The user's input.
        """
        return input(prompt)

    def input_method_streaming(self, prompt: str, config: RunnableConfig, **kwargs) -> str:
        """
        Get input from the user using the queue streaming input method.

        Args:
            prompt (str): The prompt to display to the user.
            config (RunnableConfig, optional): The configuration for the runnable. Defaults to None.

        Returns:
            str: The user's input.
        """
        logger.debug(f"Tool {self.name} - {self.id}: started with prompt {prompt}")

        streaming = getattr(config.nodes_override.get(self.id), "streaming", None) or self.streaming

        event = HFStreamingOutputEventMessage(
            wf_run_id=config.run_id,
            entity_id=self.id,
            data=HFStreamingOutputEventMessageData(prompt=prompt),
            event=streaming.event,
            source=StreamingEntitySource(
                name=self.name,
                group=self.group,
                type=self.type,
            ),
        )
        logger.debug(f"Tool {self.name} - {self.id}: sending output event {event}")
        self.run_on_node_execute_stream(callbacks=config.callbacks, event=event, **kwargs)
        event = self.get_input_streaming_event(
            event_msg_type=HFStreamingInputEventMessage,
            event=streaming.event,
            config=config,
        )
        logger.debug(f"Tool {self.name} - {self.id}: received input event {event}")

        return event.data.content

    def execute(
        self, input_data: HumanFeedbackInputSchema, config: RunnableConfig | None = None, **kwargs
    ) -> dict[str, Any]:
        """
        Execute the tool with the provided input data and configuration.

        This method prompts the user for input using the specified input method and returns the result.

        Args:
            input_data (dict[str, Any]): The input data containing the prompt for the user.
            config (RunnableConfig, optional): The configuration for the runnable. Defaults to None.
            **kwargs: Additional keyword arguments to be passed to the node execute run.

        Returns:
            dict[str, Any]: A dictionary containing the user's input under the 'content' key.

        Raises:
            ValueError: If the input_data does not contain an 'input' key.
        """
        logger.debug(f"Tool {self.name} - {self.id}: started with input data {input_data.model_dump()}")
        config = ensure_config(config)
        self.run_on_node_execute_run(config.callbacks, **kwargs)

        input_text = Template(self.msg_template).render(input_data.model_dump())

        if isinstance(self.input_method, FeedbackMethod):
            if self.input_method == FeedbackMethod.CONSOLE:
                result = self.input_method_console(input_text)
            elif self.input_method == FeedbackMethod.STREAM:
                streaming = getattr(config.nodes_override.get(self.id), "streaming", None) or self.streaming
                if not streaming.input_streaming_enabled:
                    raise ValueError(
                        f"'{FeedbackMethod.STREAM}' input method requires enabled input and output streaming."
                    )

                result = self.input_method_streaming(prompt=input_text, config=config, **kwargs)
            else:
                raise ValueError(f"Unsupported input method: {self.input_method}")
        else:
            result = self.input_method.get_input(input_text)

        logger.debug(f"Tool {self.name} - {self.id}: finished with result {result}")
        return {"content": result}

execute(input_data, config=None, **kwargs)

Execute the tool with the provided input data and configuration.

This method prompts the user for input using the specified input method and returns the result.

Parameters:

Name Type Description Default
input_data dict[str, Any]

The input data containing the prompt for the user.

required
config RunnableConfig

The configuration for the runnable. Defaults to None.

None
**kwargs

Additional keyword arguments to be passed to the node execute run.

{}

Returns:

Type Description
dict[str, Any]

dict[str, Any]: A dictionary containing the user's input under the 'content' key.

Raises:

Type Description
ValueError

If the input_data does not contain an 'input' key.

Source code in dynamiq/nodes/tools/human_feedback.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def execute(
    self, input_data: HumanFeedbackInputSchema, config: RunnableConfig | None = None, **kwargs
) -> dict[str, Any]:
    """
    Execute the tool with the provided input data and configuration.

    This method prompts the user for input using the specified input method and returns the result.

    Args:
        input_data (dict[str, Any]): The input data containing the prompt for the user.
        config (RunnableConfig, optional): The configuration for the runnable. Defaults to None.
        **kwargs: Additional keyword arguments to be passed to the node execute run.

    Returns:
        dict[str, Any]: A dictionary containing the user's input under the 'content' key.

    Raises:
        ValueError: If the input_data does not contain an 'input' key.
    """
    logger.debug(f"Tool {self.name} - {self.id}: started with input data {input_data.model_dump()}")
    config = ensure_config(config)
    self.run_on_node_execute_run(config.callbacks, **kwargs)

    input_text = Template(self.msg_template).render(input_data.model_dump())

    if isinstance(self.input_method, FeedbackMethod):
        if self.input_method == FeedbackMethod.CONSOLE:
            result = self.input_method_console(input_text)
        elif self.input_method == FeedbackMethod.STREAM:
            streaming = getattr(config.nodes_override.get(self.id), "streaming", None) or self.streaming
            if not streaming.input_streaming_enabled:
                raise ValueError(
                    f"'{FeedbackMethod.STREAM}' input method requires enabled input and output streaming."
                )

            result = self.input_method_streaming(prompt=input_text, config=config, **kwargs)
        else:
            raise ValueError(f"Unsupported input method: {self.input_method}")
    else:
        result = self.input_method.get_input(input_text)

    logger.debug(f"Tool {self.name} - {self.id}: finished with result {result}")
    return {"content": result}

input_method_console(prompt)

Get input from the user using the console input method.

Parameters:

Name Type Description Default
prompt str

The prompt to display to the user.

required

Returns:

Name Type Description
str str

The user's input.

Source code in dynamiq/nodes/tools/human_feedback.py
127
128
129
130
131
132
133
134
135
136
137
def input_method_console(self, prompt: str) -> str:
    """
    Get input from the user using the console input method.

    Args:
        prompt (str): The prompt to display to the user.

    Returns:
        str: The user's input.
    """
    return input(prompt)

input_method_streaming(prompt, config, **kwargs)

Get input from the user using the queue streaming input method.

Parameters:

Name Type Description Default
prompt str

The prompt to display to the user.

required
config RunnableConfig

The configuration for the runnable. Defaults to None.

required

Returns:

Name Type Description
str str

The user's input.

Source code in dynamiq/nodes/tools/human_feedback.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def input_method_streaming(self, prompt: str, config: RunnableConfig, **kwargs) -> str:
    """
    Get input from the user using the queue streaming input method.

    Args:
        prompt (str): The prompt to display to the user.
        config (RunnableConfig, optional): The configuration for the runnable. Defaults to None.

    Returns:
        str: The user's input.
    """
    logger.debug(f"Tool {self.name} - {self.id}: started with prompt {prompt}")

    streaming = getattr(config.nodes_override.get(self.id), "streaming", None) or self.streaming

    event = HFStreamingOutputEventMessage(
        wf_run_id=config.run_id,
        entity_id=self.id,
        data=HFStreamingOutputEventMessageData(prompt=prompt),
        event=streaming.event,
        source=StreamingEntitySource(
            name=self.name,
            group=self.group,
            type=self.type,
        ),
    )
    logger.debug(f"Tool {self.name} - {self.id}: sending output event {event}")
    self.run_on_node_execute_stream(callbacks=config.callbacks, event=event, **kwargs)
    event = self.get_input_streaming_event(
        event_msg_type=HFStreamingInputEventMessage,
        event=streaming.event,
        config=config,
    )
    logger.debug(f"Tool {self.name} - {self.id}: received input event {event}")

    return event.data.content

InputMethodCallable

Bases: ABC

Abstract base class for input methods.

This class defines the interface for various input methods that can be used to gather user input in the HumanFeedbackTool.

Source code in dynamiq/nodes/tools/human_feedback.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class InputMethodCallable(ABC):
    """
    Abstract base class for input methods.

    This class defines the interface for various input methods that can be used
    to gather user input in the HumanFeedbackTool.
    """

    @abstractmethod
    def get_input(self, prompt: str, **kwargs) -> str:
        """
        Get input from the user.

        Args:
            prompt (str): The prompt to display to the user.

        Returns:
            str: The user's input.
        """
        pass

get_input(prompt, **kwargs) abstractmethod

Get input from the user.

Parameters:

Name Type Description Default
prompt str

The prompt to display to the user.

required

Returns:

Name Type Description
str str

The user's input.

Source code in dynamiq/nodes/tools/human_feedback.py
39
40
41
42
43
44
45
46
47
48
49
50
@abstractmethod
def get_input(self, prompt: str, **kwargs) -> str:
    """
    Get input from the user.

    Args:
        prompt (str): The prompt to display to the user.

    Returns:
        str: The user's input.
    """
    pass

MessageSenderTool

Bases: Node

A tool for sending messages.

Attributes:

Name Type Description
group Literal[TOOLS]

The group the node belongs to.

name str

The name of the tool.

description str

A brief description of the tool's purpose.

msg_template str

Template of message to send.

output_method FeedbackMethod | InputMethodCallable

The method used to gather user input.

Source code in dynamiq/nodes/tools/human_feedback.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
class MessageSenderTool(Node):
    """
    A tool for sending messages.

    Attributes:
        group (Literal[NodeGroup.TOOLS]): The group the node belongs to.
        name (str): The name of the tool.
        description (str): A brief description of the tool's purpose.
        msg_template (str): Template of message to send.
        output_method (FeedbackMethod | InputMethodCallable): The method used to gather user input.
    """

    group: Literal[NodeGroup.TOOLS] = NodeGroup.TOOLS
    name: str = "Message Sender Tool"
    description: str = """Sends messages to users through console or streaming output methods.

Key Capabilities:
- Message delivery via console or streaming interfaces
- Customizable message templates with parameter substitution
- Real-time status updates and notifications
- Integration with workflow callback systems

Usage Strategy:
- Send status updates during long-running processes
- Provide user notifications for completed operations
- Display error messages with context and details
- Broadcast information to connected clients

Parameter Guide:
- msg_template: Jinja2 template for message formatting
- Dynamic parameters: Content substituted into template

Examples:
- {"msg_template": "Process completed successfully"}
"""
    msg_template: str = "{{input}}"
    output_method: FeedbackMethod | OutputMethodCallable = FeedbackMethod.CONSOLE
    input_schema: ClassVar[type[MessageSenderInputSchema]] = MessageSenderInputSchema
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_validator(mode="after")
    def update_description(self):
        msg_template = self.msg_template
        self.description += (
            f"\nThis is the template of message to send: '{msg_template}'."
            " Parameters will be substituted based on the provided input data."
        )
        return self

    def output_method_console(self, prompt: str) -> None:
        """
        Sends message to console.

        Args:
            prompt (str): The prompt to display to the user.
        """
        print(prompt)

    def output_method_streaming(self, prompt: str, config: RunnableConfig, **kwargs) -> None:
        """
        Sends message using streaming method.

        Args:
            prompt (str): The prompt to display to the user.
            config (RunnableConfig, optional): The configuration for the runnable. Defaults to None.
        """
        event = HFStreamingOutputEventMessage(
            wf_run_id=config.run_id,
            entity_id=self.id,
            data=HFStreamingOutputEventMessageData(prompt=prompt),
            event=self.streaming.event,
            source=StreamingEntitySource(
                name=self.name,
                group=self.group,
                type=self.type,
            ),
        )
        logger.debug(f"Tool {self.name} - {self.id}: sending output event {event}")
        self.run_on_node_execute_stream(callbacks=config.callbacks, event=event, **kwargs)

    def execute(
        self, input_data: MessageSenderInputSchema, config: RunnableConfig | None = None, **kwargs
    ) -> dict[str, Any]:
        """
        Execute the tool with the provided input data and configuration.

        This method prompts the user for input using the specified input method and returns the result.

        Args:
            input_data (dict[str, Any]): The input data containing the prompt for the user.
            config (RunnableConfig, optional): The configuration for the runnable. Defaults to None.
            **kwargs: Additional keyword arguments to be passed to the node execute run.

        Returns:
            dict[str, Any]: A dictionary containing the user's input under the 'content' key.

        Raises:
            ValueError: If the input_data does not contain an 'input' key.
        """
        logger.debug(f"Tool {self.name} - {self.id}: started with input data {input_data.model_dump()}")
        config = ensure_config(config)
        self.run_on_node_execute_run(config.callbacks, **kwargs)
        input_text = Template(self.msg_template).render(input_data.model_dump())

        if isinstance(self.output_method, FeedbackMethod):
            if self.output_method == FeedbackMethod.CONSOLE:
                self.output_method_console(input_text)
            elif self.output_method == FeedbackMethod.STREAM:
                self.output_method_streaming(prompt=input_text, config=config, **kwargs)
            else:
                raise ValueError(f"Unsupported feedback method: {self.output_method}")
        else:
            self.output_method.send_message(input_text)

        logger.debug(f"Tool {self.name} - {self.id}: finished")
        return {"content": input_text}

execute(input_data, config=None, **kwargs)

Execute the tool with the provided input data and configuration.

This method prompts the user for input using the specified input method and returns the result.

Parameters:

Name Type Description Default
input_data dict[str, Any]

The input data containing the prompt for the user.

required
config RunnableConfig

The configuration for the runnable. Defaults to None.

None
**kwargs

Additional keyword arguments to be passed to the node execute run.

{}

Returns:

Type Description
dict[str, Any]

dict[str, Any]: A dictionary containing the user's input under the 'content' key.

Raises:

Type Description
ValueError

If the input_data does not contain an 'input' key.

Source code in dynamiq/nodes/tools/human_feedback.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
def execute(
    self, input_data: MessageSenderInputSchema, config: RunnableConfig | None = None, **kwargs
) -> dict[str, Any]:
    """
    Execute the tool with the provided input data and configuration.

    This method prompts the user for input using the specified input method and returns the result.

    Args:
        input_data (dict[str, Any]): The input data containing the prompt for the user.
        config (RunnableConfig, optional): The configuration for the runnable. Defaults to None.
        **kwargs: Additional keyword arguments to be passed to the node execute run.

    Returns:
        dict[str, Any]: A dictionary containing the user's input under the 'content' key.

    Raises:
        ValueError: If the input_data does not contain an 'input' key.
    """
    logger.debug(f"Tool {self.name} - {self.id}: started with input data {input_data.model_dump()}")
    config = ensure_config(config)
    self.run_on_node_execute_run(config.callbacks, **kwargs)
    input_text = Template(self.msg_template).render(input_data.model_dump())

    if isinstance(self.output_method, FeedbackMethod):
        if self.output_method == FeedbackMethod.CONSOLE:
            self.output_method_console(input_text)
        elif self.output_method == FeedbackMethod.STREAM:
            self.output_method_streaming(prompt=input_text, config=config, **kwargs)
        else:
            raise ValueError(f"Unsupported feedback method: {self.output_method}")
    else:
        self.output_method.send_message(input_text)

    logger.debug(f"Tool {self.name} - {self.id}: finished")
    return {"content": input_text}

output_method_console(prompt)

Sends message to console.

Parameters:

Name Type Description Default
prompt str

The prompt to display to the user.

required
Source code in dynamiq/nodes/tools/human_feedback.py
274
275
276
277
278
279
280
281
def output_method_console(self, prompt: str) -> None:
    """
    Sends message to console.

    Args:
        prompt (str): The prompt to display to the user.
    """
    print(prompt)

output_method_streaming(prompt, config, **kwargs)

Sends message using streaming method.

Parameters:

Name Type Description Default
prompt str

The prompt to display to the user.

required
config RunnableConfig

The configuration for the runnable. Defaults to None.

required
Source code in dynamiq/nodes/tools/human_feedback.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def output_method_streaming(self, prompt: str, config: RunnableConfig, **kwargs) -> None:
    """
    Sends message using streaming method.

    Args:
        prompt (str): The prompt to display to the user.
        config (RunnableConfig, optional): The configuration for the runnable. Defaults to None.
    """
    event = HFStreamingOutputEventMessage(
        wf_run_id=config.run_id,
        entity_id=self.id,
        data=HFStreamingOutputEventMessageData(prompt=prompt),
        event=self.streaming.event,
        source=StreamingEntitySource(
            name=self.name,
            group=self.group,
            type=self.type,
        ),
    )
    logger.debug(f"Tool {self.name} - {self.id}: sending output event {event}")
    self.run_on_node_execute_stream(callbacks=config.callbacks, event=event, **kwargs)

OutputMethodCallable

Bases: ABC

Abstract base class for sending message.

This class defines the interface for various output methods that can be used to send user information in the MessageSenderTool.

Source code in dynamiq/nodes/tools/human_feedback.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class OutputMethodCallable(ABC):
    """
    Abstract base class for sending message.

    This class defines the interface for various output methods that can be used
    to send user information in the MessageSenderTool.
    """

    @abstractmethod
    def send_message(self, message: str, **kwargs) -> None:
        """
        Sends message to the user

        Args:
            message (str): The message to send to the user.
        """

        pass

send_message(message, **kwargs) abstractmethod

Sends message to the user

Parameters:

Name Type Description Default
message str

The message to send to the user.

required
Source code in dynamiq/nodes/tools/human_feedback.py
61
62
63
64
65
66
67
68
69
70
@abstractmethod
def send_message(self, message: str, **kwargs) -> None:
    """
    Sends message to the user

    Args:
        message (str): The message to send to the user.
    """

    pass