Skip to content

E2B sandbox

E2BInterpreterTool

Bases: BaseCodeInterpreterTool

E2B-specific sandbox interpreter tool.

Thin adapter over BaseCodeInterpreterTool that delegates to the E2B SDK.

Source code in dynamiq/nodes/tools/e2b_sandbox.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
class E2BInterpreterTool(BaseCodeInterpreterTool):
    """
    E2B-specific sandbox interpreter tool.

    Thin adapter over BaseCodeInterpreterTool that delegates to the E2B SDK.
    Each method below maps one sandbox operation (create, run code, run a
    command, install packages, transfer files, reconnect, destroy) onto the
    corresponding call on an E2B ``Sandbox`` object.
    """

    name: str = "e2b-code-interpreter-tool"
    description: str = DESCRIPTION_E2B
    connection: E2BConnection
    # Sandbox handle consulted by set_timeout() and close() when persistent_sandbox is set.
    _sandbox: Sandbox | None = None
    _rate_limit_exception: ClassVar[type[Exception]] = E2BRateLimitException

    def _create_sandbox(self) -> Sandbox:
        """Create a new E2B sandbox using the connection's API key and domain and this tool's timeout."""
        return Sandbox.create(
            api_key=self.connection.api_key,
            timeout=self.timeout,
            domain=self.connection.domain,
        )

    def _get_sandbox_id(self, sandbox: Sandbox) -> str:
        """Return the identifier E2B assigned to ``sandbox``."""
        return sandbox.sandbox_id

    def _get_sandbox_host(self, sandbox: Sandbox) -> str | None:
        """Return the host for the sandbox's envd port, or ``None`` if it cannot be resolved.

        This is best-effort: any failure is logged at debug level and reported
        as ``None`` instead of being raised.
        """
        try:
            return sandbox.get_host(port=sandbox.connection_config.envd_port)
        except Exception as e:
            logger.debug(f"Tool {self.name}: Unable to resolve sandbox host: {e}")
            return None

    def _execute_python_code(
        self, code: str, sandbox: Any, params: dict | None = None, timeout: int | None = None
    ) -> str:
        """Run Python ``code`` in ``sandbox`` and return its combined textual output.

        Args:
            code: Python source to execute.
            sandbox: Sandbox instance exposing ``run_code``.
            params: Optional mapping of variable name to value. Each entry is
                prepended to ``code`` as a ``name = repr(value)`` assignment after
                the value has been passed through ``_resolve_param_value``.
            timeout: Forwarded to ``sandbox.run_code``.

        Returns:
            The execution's text result, stdout lines and ``[stderr]``-prefixed
            stderr lines joined with newlines, or ``""`` when there is no output.

        Raises:
            ValueError: If ``sandbox`` is falsy.
            ToolExecutionException: If the execution reports an error or the SDK
                call itself fails (always marked recoverable).
        """
        if not sandbox:
            raise ValueError("Sandbox instance is required for code execution.")

        if params:
            # Inject the params as plain assignments ahead of the user code.
            assignments = "".join(
                f"{key} = {self._resolve_param_value(value, sandbox)!r}\n" for key, value in params.items()
            )
            code = f"\n# Tool params variables\n{assignments}\n{code}"

        try:
            logger.info(f"Executing Python code: {code}")
            execution = sandbox.run_code(code, timeout=timeout)
            output_parts = []

            if execution.text:
                output_parts.append(execution.text)

            if execution.error:
                if "NameError" in str(execution.error) and self.persistent_sandbox:
                    logger.debug(f"Tool {self.name}: Recoverable NameError in persistent session: {execution.error}")
                raise ToolExecutionException(f"Error during Python code execution: {execution.error}", recoverable=True)

            # getattr with a None default tolerates result objects that lack these attributes.
            logs = getattr(execution, "logs", None)
            if logs:
                output_parts.extend(getattr(logs, "stdout", None) or [])
                output_parts.extend(f"[stderr] {line}" for line in getattr(logs, "stderr", None) or [])

            return "\n".join(output_parts) if output_parts else ""

        except ToolExecutionException:
            # Already in the tool's error format; do not wrap it a second time.
            raise
        except Exception as e:
            raise ToolExecutionException(f"Error during Python code execution: {e}", recoverable=True) from e

    def _execute_shell_command(
        self,
        command: str,
        sandbox: Any,
        env: dict | None = None,
        cwd: str | None = None,
        timeout: int | None = None,
    ) -> str:
        """Run a shell ``command`` in ``sandbox`` and return its stdout.

        The command is started in the background and then awaited.

        Args:
            command: Shell command line to run.
            sandbox: Sandbox instance exposing ``commands.run``.
            env: Environment variables for the command; defaults to an empty dict.
            cwd: Working directory for the command.
            timeout: Forwarded to the SDK only when truthy.

        Returns:
            The command's stdout.

        Raises:
            ValueError: If ``sandbox`` is falsy.
            ToolExecutionException: If starting or awaiting the command fails, or
                it finishes with a non-zero exit code (always marked recoverable).
        """
        if not sandbox:
            raise ValueError("Sandbox instance is required for command execution.")

        run_kwargs: dict[str, Any] = {"background": True, "envs": env or {}, "cwd": cwd}
        if timeout:
            run_kwargs["timeout"] = timeout

        try:
            process = sandbox.commands.run(command, **run_kwargs)
            # wait() must be guarded too: the E2B SDK raises from wait() when the
            # command exits non-zero, and that has to surface as a tool error.
            output = process.wait()
        except Exception as e:
            raise ToolExecutionException(f"Error during shell command execution: {e}", recoverable=True) from e

        # Kept for SDK versions whose wait() returns a result instead of raising.
        if output.exit_code != 0:
            raise ToolExecutionException(f"Error during shell command execution: {output.stderr}", recoverable=True)
        return output.stdout

    def _install_packages(self, sandbox: Any, packages: str) -> None:
        """Install pip packages inside ``sandbox``.

        Args:
            sandbox: Sandbox instance exposing ``commands.run``.
            packages: Comma- and/or whitespace-separated requirement strings.
                Nothing is run when this is empty or contains only separators.

        Raises:
            ToolExecutionException: If ``packages`` cannot be tokenised, the
                command cannot be run, or pip exits with a non-zero code
                (always marked recoverable).
        """
        if not packages:
            return

        import shlex

        logger.debug(f"Tool {self.name} - {self.id}: Installing packages: {packages}")
        try:
            # shlex.split honours quoting the caller may already have applied.
            requirements = shlex.split(packages.replace(",", " "))
        except ValueError as e:
            raise ToolExecutionException(f"Error during package installation: {e}", recoverable=True) from e
        if not requirements:
            return

        # Re-quote every token so specifiers such as "numpy>=1.0" reach pip intact
        # instead of being read by the shell as a redirection to a file named "=1.0".
        install_command = f"pip install -qq {shlex.join(requirements)}"
        try:
            process = sandbox.commands.run(install_command)
        except Exception as e:
            raise ToolExecutionException(f"Error during package installation: {e}", recoverable=True) from e

        if process.exit_code != 0:
            raise ToolExecutionException(f"Error during package installation: {process.stderr}", recoverable=True)

    def _upload_file_to_sandbox(self, file: io.BytesIO, target_path: str, sandbox: Any) -> str:
        """Write ``file`` to ``target_path`` in ``sandbox`` and return the path reported by the SDK."""
        uploaded_info = sandbox.files.write(target_path, file)
        logger.debug(f"Tool {self.name} - {self.id}: Uploaded file info: {uploaded_info}")
        return uploaded_info.path

    def _download_file_bytes(self, file_path: str, sandbox: Any) -> bytes:
        """Read ``file_path`` from ``sandbox`` in "bytes" format and return it coerced to ``bytes``."""
        return bytes(sandbox.files.read(file_path, "bytes"))

    def _run_shell_command(self, command: str, sandbox: Any) -> tuple[int, str]:
        """Run ``command`` in ``sandbox`` and return ``(exit_code, stdout)``.

        Accepts either a handle exposing ``wait()`` or a finished result object
        from ``sandbox.commands.run``.
        """
        # NOTE(review): a foreground run() in the E2B SDK appears to raise on a
        # non-zero exit rather than return it - confirm callers expect that.
        res = sandbox.commands.run(command)
        out = res.wait() if hasattr(res, "wait") else res
        return out.exit_code, out.stdout

    def _reconnect_sandbox(self, sandbox_id: str) -> Sandbox:
        """Connect to the existing sandbox ``sandbox_id`` using the connection's API key and domain."""
        return Sandbox.connect(
            sandbox_id,
            api_key=self.connection.api_key,
            domain=self.connection.domain,
        )

    def _destroy_sandbox(self, sandbox: Any) -> None:
        """Kill ``sandbox``."""
        sandbox.kill()

    def set_timeout(self, timeout: int) -> None:
        """Update the tool timeout and, for a live persistent sandbox, the sandbox timeout as well.

        A failure to update the sandbox is logged as a warning and not raised.
        """
        super().set_timeout(timeout)
        if self._sandbox and self.persistent_sandbox:
            try:
                self._sandbox.set_timeout(timeout)
                logger.debug(f"Tool {self.name} - {self.id}: Updated sandbox timeout to {timeout}s")
            except Exception as e:
                logger.warning(f"Tool {self.name} - {self.id}: Failed to update sandbox timeout: {e}")

    def close(self) -> None:
        """Close the persistent sandbox if it exists."""
        if self._sandbox and self.persistent_sandbox:
            logger.debug(f"Tool {self.name} - {self.id}: Closing Sandbox")
            self._sandbox.kill()
            # Drop the handle so later calls do not reuse a killed sandbox.
            self._sandbox = None

close()

Close the persistent sandbox if it exists.

Source code in dynamiq/nodes/tools/e2b_sandbox.py
157
158
159
160
161
162
def close(self) -> None:
    """Close the persistent sandbox if it exists.

    Does nothing unless a sandbox handle is held and ``persistent_sandbox`` is
    set; otherwise kills the sandbox and clears the stored handle.
    """
    holds_persistent_sandbox = self._sandbox and self.persistent_sandbox
    if not holds_persistent_sandbox:
        return

    logger.debug(f"Tool {self.name} - {self.id}: Closing Sandbox")
    self._sandbox.kill()
    self._sandbox = None