15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
class E2BInterpreterTool(BaseCodeInterpreterTool):
    """
    E2B-specific sandbox interpreter tool.

    Thin adapter over BaseCodeInterpreterTool that delegates to the E2B SDK:
    each hook below maps one sandbox operation (create, reconnect, run code,
    run a command, transfer files, destroy) onto the matching call of an E2B
    ``Sandbox`` object.
    """

    name: str = "e2b-code-interpreter-tool"
    description: str = DESCRIPTION_E2B
    # Supplies the API key and domain used to create / reconnect sandboxes.
    connection: E2BConnection
    # Sandbox handle acted on by set_timeout() and close() in persistent mode.
    _sandbox: Sandbox | None = None
    # Not read in this class; presumably consumed by the base class - TODO confirm.
    _rate_limit_exception: ClassVar[type[Exception]] = E2BRateLimitException

    def _create_sandbox(self) -> Sandbox:
        """Create a new E2B sandbox using the connection credentials and the tool timeout."""
        return Sandbox.create(
            api_key=self.connection.api_key,
            timeout=self.timeout,
            domain=self.connection.domain,
        )

    def _get_sandbox_id(self, sandbox: Sandbox) -> str:
        """Return the identifier of ``sandbox``."""
        return sandbox.sandbox_id

    def _get_sandbox_host(self, sandbox: Sandbox) -> str | None:
        """Return the host of the sandbox's envd port, or ``None`` if it cannot be resolved.

        Resolution is best-effort: any failure is logged at debug level and
        reported to the caller as ``None`` instead of being raised.
        """
        try:
            return sandbox.get_host(port=sandbox.connection_config.envd_port)
        except Exception as e:
            logger.debug(f"Tool {self.name} - {self.id}: Failed to resolve sandbox host: {e}")
            return None

    def _execute_python_code(
        self, code: str, sandbox: Any, params: dict | None = None, timeout: int | None = None
    ) -> str:
        """Run Python ``code`` in ``sandbox`` and return its combined textual output.

        Args:
            code: Python source to execute.
            sandbox: Sandbox exposing ``run_code``.
            params: Optional mapping injected ahead of ``code`` as
                ``name = repr(value)`` assignments (values are first passed
                through ``self._resolve_param_value``).
            timeout: Forwarded to ``sandbox.run_code``.

        Returns:
            The execution's text result, stdout lines and ``[stderr]``-prefixed
            stderr lines joined by newlines; an empty string when there is no output.

        Raises:
            ValueError: If ``sandbox`` is falsy.
            ToolExecutionException: If the execution reports an error or the
                SDK call fails (always marked recoverable).
        """
        if not sandbox:
            raise ValueError("Sandbox instance is required for code execution.")

        if params:
            assignments = [
                f"{key} = {repr(self._resolve_param_value(value, sandbox))}\n" for key, value in params.items()
            ]
            vars_code = "\n# Tool params variables\n" + "".join(assignments)
            code = vars_code + "\n" + code

        try:
            logger.info(f"Executing Python code: {code}")
            execution = sandbox.run_code(code, timeout=timeout)

            output_parts = []
            if execution.text:
                output_parts.append(execution.text)

            if execution.error:
                if "NameError" in str(execution.error) and self.persistent_sandbox:
                    logger.debug(f"Tool {self.name}: Recoverable NameError in persistent session: {execution.error}")
                raise ToolExecutionException(f"Error during Python code execution: {execution.error}", recoverable=True)

            logs = getattr(execution, "logs", None)
            if logs:
                stdout_lines = getattr(logs, "stdout", None)
                if stdout_lines:
                    output_parts.extend(stdout_lines)
                stderr_lines = getattr(logs, "stderr", None)
                if stderr_lines:
                    output_parts.extend(f"[stderr] {line}" for line in stderr_lines)

            return "\n".join(output_parts)
        except ToolExecutionException:
            # Already in the shape callers expect; do not re-wrap.
            raise
        except Exception as e:
            raise ToolExecutionException(f"Error during Python code execution: {e}", recoverable=True) from e

    def _execute_shell_command(
        self,
        command: str,
        sandbox: Any,
        env: dict | None = None,
        cwd: str | None = None,
        timeout: int | None = None,
    ) -> str:
        """Run a shell ``command`` in ``sandbox`` and return its stdout.

        The command is started in the background and then awaited.

        Args:
            command: Shell command line to run.
            sandbox: Sandbox exposing ``commands.run``.
            env: Environment variables for the command (``{}`` when omitted).
            cwd: Working directory for the command.
            timeout: Passed to ``commands.run`` only when truthy.

        Returns:
            The command's stdout.

        Raises:
            ValueError: If ``sandbox`` is falsy.
            ToolExecutionException: If starting or awaiting the command fails,
                or it finishes with a non-zero exit code (always recoverable).
        """
        if not sandbox:
            raise ValueError("Sandbox instance is required for command execution.")

        run_kwargs: dict[str, Any] = {"background": True, "envs": env or {}, "cwd": cwd}
        if timeout:
            run_kwargs["timeout"] = timeout

        try:
            process = sandbox.commands.run(command, **run_kwargs)
            # wait() is kept inside the try: a failure while awaiting the command must be
            # surfaced as ToolExecutionException exactly like a failure to start it.
            output = process.wait()
        except Exception as e:
            raise ToolExecutionException(f"Error during shell command execution: {e}", recoverable=True) from e

        if output.exit_code != 0:
            raise ToolExecutionException(f"Error during shell command execution: {output.stderr}", recoverable=True)
        return output.stdout

    def _install_packages(self, sandbox: Any, packages: str) -> None:
        """Install ``packages`` into ``sandbox`` with ``pip install -qq``.

        Args:
            sandbox: Sandbox exposing ``commands.run``.
            packages: Requirement strings separated by commas and/or whitespace.
                Nothing is run when it is empty or contains no requirement.

        Raises:
            ToolExecutionException: If the requirement list cannot be parsed,
                or pip cannot be run or exits non-zero (always recoverable).
        """
        import shlex  # local import keeps this block self-contained

        if not packages:
            return

        try:
            # shlex.split honours quotes a caller may already have put around a specifier.
            requirements = shlex.split(packages.replace(",", " "))
        except ValueError as e:
            raise ToolExecutionException(f"Error during package installation: {e}", recoverable=True) from e
        if not requirements:
            return

        logger.debug(f"Tool {self.name} - {self.id}: Installing packages: {packages}")
        # Quote every requirement: unquoted, the shell would treat "pkg>=1.0" as an output
        # redirection and glob "pkg[extra]", so pip would not receive the intended specifier.
        install_command = "pip install -qq " + " ".join(shlex.quote(req) for req in requirements)
        try:
            process = sandbox.commands.run(install_command)
        except Exception as e:
            raise ToolExecutionException(f"Error during package installation: {e}", recoverable=True) from e

        if process.exit_code != 0:
            raise ToolExecutionException(f"Error during package installation: {process.stderr}", recoverable=True)

    def _upload_file_to_sandbox(self, file: io.BytesIO, target_path: str, sandbox: Any) -> str:
        """Write ``file`` to ``target_path`` inside ``sandbox`` and return the path reported back."""
        uploaded_info = sandbox.files.write(target_path, file)
        logger.debug(f"Tool {self.name} - {self.id}: Uploaded file info: {uploaded_info}")
        return uploaded_info.path

    def _download_file_bytes(self, file_path: str, sandbox: Any) -> bytes:
        """Read ``file_path`` from ``sandbox`` in ``"bytes"`` format and return it as ``bytes``."""
        return bytes(sandbox.files.read(file_path, "bytes"))

    def _run_shell_command(self, command: str, sandbox: Any) -> tuple[int, str]:
        """Run ``command`` in ``sandbox`` and return ``(exit_code, stdout)``.

        Accepts either a finished result or a handle that still has to be
        awaited (detected by the presence of a ``wait`` attribute).
        """
        # NOTE(review): SDK exceptions are not caught here and propagate to the caller.
        result = sandbox.commands.run(command)
        finished = result.wait() if hasattr(result, "wait") else result
        return finished.exit_code, finished.stdout

    def _reconnect_sandbox(self, sandbox_id: str) -> Sandbox:
        """Connect to the existing sandbox ``sandbox_id`` using the connection credentials."""
        return Sandbox.connect(
            sandbox_id,
            api_key=self.connection.api_key,
            domain=self.connection.domain,
        )

    def _destroy_sandbox(self, sandbox: Any) -> None:
        """Kill ``sandbox``."""
        sandbox.kill()

    def set_timeout(self, timeout: int) -> None:
        """Update the tool timeout and, in persistent mode, the live sandbox's timeout.

        A failure to update the live sandbox is logged as a warning and not raised.
        """
        super().set_timeout(timeout)
        if self._sandbox and self.persistent_sandbox:
            try:
                self._sandbox.set_timeout(timeout)
                logger.debug(f"Tool {self.name} - {self.id}: Updated sandbox timeout to {timeout}s")
            except Exception as e:
                logger.warning(f"Tool {self.name} - {self.id}: Failed to update sandbox timeout: {e}")

    def close(self) -> None:
        """Close the persistent sandbox if it exists."""
        if self._sandbox and self.persistent_sandbox:
            logger.debug(f"Tool {self.name} - {self.id}: Closing Sandbox")
            try:
                self._sandbox.kill()
            finally:
                # Drop the handle even if kill() raised, so a later close() does not
                # retry against a sandbox that is already gone.
                self._sandbox = None
|