Sandboxes
Clusterfudge sandboxes provide isolated, interactive desktop environments that can be controlled programmatically using the Clusterfudge Python SDK. They are designed to be driven by agentic systems that need full access to an entire operating system to maximise their capabilities.
Sandboxes are useful for a variety of tasks, including:
- Automating the benchmarking of AI capabilities
- Running large-scale synthetic data generation/processing workflows
- Testing agentic behaviour in realistic simulated environments
- Creating secure, isolated execution environments for agents, at scale
Quickstart
To begin launching sandboxes, first install the Clusterfudge SDK:
pip install clusterfudge
Next, grab a Client instance from the SDK; use this client to create new sandboxes.
Creating a sandbox client
from clusterfudge import Client, SandboxClient, SandboxParams
# Initialise the main Clusterfudge client
cf_client = Client()
# Create a new sandbox
sandbox_id = await cf_client.create_sandbox(
    params=SandboxParams(
        display_name="my-first-sandbox"
    )
)
# Initialise the SandboxClient
sandbox_client = SandboxClient(sandbox_id, cf_client)
print(f"Sandbox created with ID: {sandbox_id}")
# You can now interact with the sandbox using sandbox_client
# ...
The create_sandbox method is asynchronous and returns the ID of the newly created sandbox once it's ready. Make sure to run this in an async context.
Once you have a SandboxClient instance, you can access various tools to interact with the sandbox.
The SandboxClient provides access to several sub-clients, each designed for a specific type of interaction:
- Computer client: For simulating UI interactions like mouse movements, keyboard inputs, and taking screenshots.
- File editor client: For performing file operations like viewing, creating, and modifying files.
- File manager client: For uploading and downloading files and folders.
- Process client: For interacting with running processes (e.g., writing to stdin, reading stdout/stderr).
Full example
Full sandbox interaction example
import asyncio
import os
from clusterfudge import Client, SandboxParams, SandboxClient
async def run_sandbox_example():
    cf_client: Client | None = None
    sandbox_id: str | None = None
    try:
        # 1. Initialise the main Clusterfudge client
        # Ensure you have logged in via CLI: `clusterfudge login`
        # Alternatively, you can pass your API key directly: Client(api_key="CFSK-...")
        print("Initialising Clusterfudge client...")
        cf_client = Client()
        # 2. Define optional sandbox parameters
        sandbox_params = SandboxParams(display_name="My Automated Sandbox")
        # 3. Create a new sandbox instance
        print("Creating sandbox...")
        sandbox_id = await cf_client.create_sandbox(params=sandbox_params)
        print(f"Sandbox '{sandbox_id}' is ready.")
        # 4. Get the SandboxClient for interaction
        sandbox_client = SandboxClient(sandbox_id, cf_client)
        # 5. --- Interact with the sandbox ---
        # Example: Take a screenshot using the ComputerClient
        computer = sandbox_client.computer()
        print("Taking screenshot...")
        screenshot_bytes = await computer.screenshot()
        screenshot_path = "sandbox_screenshot.png"
        with open(screenshot_path, "wb") as f:
            f.write(screenshot_bytes)
        print(f"Screenshot saved to '{os.path.abspath(screenshot_path)}'")
        # Add more interactions here using other clients...
        # e.g., await sandbox_client.file_editor().create(...)
        # e.g., await sandbox_client.process().write_to_process(...)
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # 6. Clean up the sandbox
        if cf_client and sandbox_id:
            try:
                print(f"Deleting sandbox '{sandbox_id}'...")
                await cf_client.delete_sandbox(sandbox_id)
            except Exception as e:
                print(f"Error deleting sandbox '{sandbox_id}': {e}")

if __name__ == "__main__":
    asyncio.run(run_sandbox_example())
Authentication
To interact with our sandboxes API, you will need valid Clusterfudge credentials, either a session token or an API key.
By default, we will look for an active auth token in the ~/.clusterfudge/config.json file. To get an auth token, run clusterfudge login.
Alternatively, you can manually set an API key when instantiating a client. Your Clusterfudge API key is available in our dashboard (click on the user icon in the bottom left => org settings => API keys).
from clusterfudge import Client
cf_client = Client(api_key="CFSK-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX")
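To keep the key out of source code, one option is to read it from an environment variable and fall back to the default lookup when it is unset. The sketch below is only an illustration: the variable name CLUSTERFUDGE_API_KEY and the helper client_kwargs are assumptions made here, not something the SDK defines or reads on its own.

```python
import os
from typing import Mapping


def client_kwargs(env: Mapping[str, str] = os.environ) -> dict:
    """Build keyword arguments for Client() from the environment.

    CLUSTERFUDGE_API_KEY is a hypothetical variable name chosen for this
    sketch. When it is unset or blank, an empty dict is returned so that
    Client() uses its default lookup (the token saved by `clusterfudge login`).
    """
    api_key = env.get("CLUSTERFUDGE_API_KEY", "").strip()
    return {"api_key": api_key} if api_key else {}


# Usage (requires the SDK to be installed):
#   from clusterfudge import Client
#   cf_client = Client(**client_kwargs())
```

Returning keyword arguments rather than a client keeps the helper independent of the SDK, and avoids passing api_key=None, whose behaviour is not described on this page.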
Computer client
The ComputerClient allows you to simulate human interaction with a graphical user interface within the sandbox. This includes actions like mouse movements, clicks, keyboard inputs, and taking screenshots.
To access the ComputerClient:
computer = sandbox_client.computer()
screenshot()
Takes a screenshot of the entire sandbox screen.
- Returns: bytes - The image data of the screenshot.
image_bytes = await computer.screenshot()
with open("screenshot.png", "wb") as f:
    f.write(image_bytes)
key(text: str)
Presses a key or key-combination on the keyboard. This supports xdotool's key syntax.
- Parameters:
  - text (str): The key or key-combination to press (e.g., "a", "Return", "alt+Tab", "ctrl+s").
- Returns: str - A confirmation or result message.
await computer.key("ctrl+c")
type(text: str)
Types a string of text on the keyboard.
- Parameters:
  - text (str): The string to type.
- Returns: str - A confirmation or result message.
await computer.type("Hello, world!")
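A common pattern is to type some text and then press a key to submit it. The sketch below combines type() and key() for that purpose; type_and_submit is a hypothetical helper name, and it works with any object that exposes the two methods documented above.

```python
from typing import Any


async def type_and_submit(computer: Any, text: str) -> None:
    """Type `text`, then press Return to submit it.

    "Return" is the xdotool key name for the Enter key, as used in the
    key() examples above. This helper is illustrative, not part of the SDK.
    """
    await computer.type(text)
    await computer.key("Return")
```

For example, await type_and_submit(computer, "ls -la") would type the command into whichever window currently has keyboard focus and press Enter.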
mouse_move(x: int, y: int)
Moves the mouse cursor to the specified (x, y) pixel coordinate on the screen.
- Parameters:
  - x (int): The x-coordinate (pixels from the left edge).
  - y (int): The y-coordinate (pixels from the top edge).
await computer.mouse_move(100, 200)
left_click_drag(x: int, y: int)
Simulates a left-click and drag operation. The drag starts at the current mouse position and ends at the specified (x, y) coordinate.
- Parameters:
  - x (int): The x-coordinate to drag to.
  - y (int): The y-coordinate to drag to.
# The drag begins wherever the cursor currently is, so set the start
# position explicitly with mouse_move() before dragging.
await computer.mouse_move(50, 50) # Move to start position
await computer.left_click_drag(300, 400) # Drag to end position
cursor_position()
Gets the current (x, y) pixel coordinate of the mouse cursor.
- Returns: tuple[int, int] - A tuple (x, y) representing the cursor's current position.
pos_x, pos_y = await computer.cursor_position()
print(f"Cursor is at: ({pos_x}, {pos_y})")
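Because mouse_move() takes absolute coordinates, a relative move has to be built from cursor_position(). The following is a minimal sketch of that idea; move_relative is a hypothetical helper. It clamps at 0 because coordinates are measured from the left and top edges, but it does not clamp against the screen size, which this page does not document.

```python
from typing import Any


async def move_relative(computer: Any, dx: int, dy: int) -> tuple:
    """Move the cursor by (dx, dy) pixels from its current position.

    Illustrative helper, not part of the SDK. Returns the absolute (x, y)
    target that was passed to mouse_move().
    """
    x, y = await computer.cursor_position()
    # Coordinates are offsets from the left/top edges, so never go below 0.
    target = (max(0, x + dx), max(0, y + dy))
    await computer.mouse_move(*target)
    return target
```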
click(action: str, x: int | None = None, y: int | None = None, key: str | None = None)
Performs a click action (e.g., left_click, right_click) at the specified coordinates, or at the current cursor position if no coordinates are given. Can also simulate holding a key during the click.
- Parameters:
  - action (str): The type of click action. Valid actions are left_click, right_click, middle_click, double_click, triple_click.
  - x (int, optional): The x-coordinate for the click. Required if y is provided.
  - y (int, optional): The y-coordinate for the click. Required if x is provided.
  - key (str, optional): A key to hold down during the click (e.g., "shift").
# Left click at current position
await computer.click("left_click")
# Right click at (150, 250)
await computer.click("right_click", x=150, y=250)
# Shift + left_click at (50, 50)
await computer.click("left_click", x=50, y=50, key="shift")
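The argument rules above (a fixed set of actions, and x and y supplied together or not at all) can be checked locally before a request is sent. This is a sketch under that reading of the parameter list; checked_click and VALID_CLICK_ACTIONS are hypothetical names, and the SDK may well perform its own validation.

```python
from typing import Any, Optional

# Action names copied from the parameter list above.
VALID_CLICK_ACTIONS = frozenset(
    {"left_click", "right_click", "middle_click", "double_click", "triple_click"}
)


async def checked_click(
    computer: Any,
    action: str,
    x: Optional[int] = None,
    y: Optional[int] = None,
    key: Optional[str] = None,
) -> None:
    """Validate click arguments locally, then forward them to computer.click()."""
    if action not in VALID_CLICK_ACTIONS:
        raise ValueError(f"Unknown click action: {action!r}")
    # x and y must be given together or both omitted.
    if (x is None) != (y is None):
        raise ValueError("x and y must be provided together")
    await computer.click(action, x=x, y=y, key=key)
```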
left_mouse_down()
Presses and holds the left mouse button down.
await computer.left_mouse_down()
# ... perform other actions while mouse button is held ...
left_mouse_up()
Releases the left mouse button.
# await computer.left_mouse_down() # (if previously pressed and held)
await computer.left_mouse_up()
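Holding the button manually is useful when a drag needs intermediate waypoints, which left_click_drag() does not offer. The sketch below shows one way to combine left_mouse_down(), mouse_move() and left_mouse_up(); drag_path is a hypothetical helper, and the try/finally is there so the button is released even if a move fails partway through.

```python
from typing import Any, Sequence, Tuple


async def drag_path(computer: Any, points: Sequence[Tuple[int, int]]) -> None:
    """Drag with the left button held through a sequence of (x, y) points.

    Illustrative helper, not part of the SDK. The first point is where the
    button is pressed; the last point is where it is released.
    """
    if len(points) < 2:
        raise ValueError("A drag needs at least a start and an end point")
    start, *rest = points
    await computer.mouse_move(*start)
    await computer.left_mouse_down()
    try:
        for x, y in rest:
            await computer.mouse_move(x, y)
    finally:
        # Always release the button so the sandbox is not left mid-drag.
        await computer.left_mouse_up()
```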
scroll(direction: str, amount: int, x: int | None = None, y: int | None = None, key: str | None = None)
Scrolls the screen in a specified direction by a certain amount (number of scroll wheel clicks).
- Parameters:
  - direction (str): The direction to scroll. Valid directions are up, down, left, right.
  - amount (int): The number of 'clicks' to scroll.
  - x (int, optional): The x-coordinate to scroll at. If provided, y must also be provided.
  - y (int, optional): The y-coordinate to scroll at. If provided, x must also be provided.
  - key (str, optional): A key to hold during the scroll.
# Scroll down by 5 clicks
await computer.scroll("down", 5)
# Scroll right by 2 clicks at (100,100)
await computer.scroll("right", 2, x=100, y=100)
hold_key(text: str, duration: float)
Holds down a key or key-combination for a specified duration.
- Parameters:
  - text (str): The key or key-combination to hold (e.g., "shift", "ctrl+a").
  - duration (float): The duration in seconds to hold the key.
# Hold the shift key for 2.5 seconds
await computer.hold_key("shift", 2.5)
wait(duration: float)
Pauses execution for a specified duration and returns a screenshot taken after the wait.
- Parameters:
  - duration (float): The time to wait in seconds.
- Returns: bytes - The image data of the screenshot taken after the wait.
image_after_wait = await computer.wait(5.0) # Wait for 5 seconds
with open("screenshot_after_wait.png", "wb") as f:
    f.write(image_after_wait)
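Since wait() returns a screenshot, it can be used to poll until the screen stops changing, for example after launching an application. The following is a rough sketch of that idea; wait_until_stable is a hypothetical helper. It compares raw bytes, which assumes the screenshot encoding is deterministic, and it will never settle if something on screen keeps changing (a clock or a blinking cursor, say).

```python
from typing import Any


async def wait_until_stable(
    computer: Any, interval: float = 1.0, max_checks: int = 10
) -> bytes:
    """Return the first screenshot that is identical to the one before it.

    Illustrative helper, not part of the SDK. Raises TimeoutError if the
    screen is still changing after `max_checks` waits.
    """
    previous = await computer.screenshot()
    for _ in range(max_checks):
        current = await computer.wait(interval)
        if current == previous:
            return current
        previous = current
    raise TimeoutError(f"Screen still changing after {max_checks} checks")
```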
Process client
The ProcessClient lets you create and interact with individual processes running within the sandbox.
The process_id used in the methods below is a user-defined identifier for the process within the sandbox session. It does not correspond to a system-level Process ID (PID). We recommend using a UUID or a similarly unique random string for this ID to avoid collisions, especially if you plan to manage multiple concurrent processes.
To access the ProcessClient:
process_client = sandbox_client.process()
Data structure: ProcessResponse
Several ProcessClient methods return a ProcessResponse object (or a dictionary with similar fields). It has the following attributes:
- stdin (list[str]): A list of strings representing the history of data written to the process's stdin via this client.
- stdout (str): The latest standard output from the process.
- stderr (str): The latest standard error output from the process.
- terminal_output (str): Consolidated terminal output, potentially including both stdout and stderr, or other terminal control sequences.
- process_error (str | None): An error message specific to the process itself (e.g., if the process crashed internally).
- exit_code (int | None): The exit code of the process if it has terminated. None if still running.
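To make the shape of these fields concrete, they can be mirrored in a small dataclass. This is only a sketch for illustration: ProcessResponseSketch and its is_running property are hypothetical and are not the SDK's own class; the field names and types are taken from the list above, and the defaults are assumptions.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ProcessResponseSketch:
    """Local stand-in mirroring the documented ProcessResponse fields."""

    stdin: List[str] = field(default_factory=list)  # history of writes to stdin
    stdout: str = ""
    stderr: str = ""
    terminal_output: str = ""
    process_error: Optional[str] = None
    exit_code: Optional[int] = None  # None while the process is still running

    @property
    def is_running(self) -> bool:
        """Hypothetical convenience: a process with no exit code is running."""
        return self.exit_code is None
```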
write_to_process(process_id: str, input_bytes: bytes | str, wait_for_response_ms: int = 300)
Writes bytes (or a string, which will be UTF-8 encoded) to the standard input (stdin) of a specified process and waits for a response.
- Parameters:
  - process_id (str): The user-defined unique identifier of the process to interact with.
  - input_bytes (bytes | str): The data to write to the process's stdin.
  - wait_for_response_ms (int, optional): Time in milliseconds to wait for the process to respond after writing. Defaults to 300ms.
- Returns: ProcessResponse - An object containing details about the process state after the write.
import uuid
# Example: Sending a string to an interactive script
my_interactive_script_id = str(uuid.uuid4())
response = await process_client.write_to_process(
    my_interactive_script_id,
    "send this command\n",
    wait_for_response_ms=500
)
print(f"STDOUT after write: {response.stdout}")
if response.exit_code is not None:
    print(f"Process exited with code: {response.exit_code}")
# Example: Sending bytes to a process expecting binary data
my_binary_protocol_id = str(uuid.uuid4())
byte_command = b"\x01\x02\x03\n"
response_bytes = await process_client.write_to_process(
    my_binary_protocol_id,
    byte_command,
    wait_for_response_ms=500
)
print(f"Binary process STDOUT: {response_bytes.stdout}")
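When driving an interactive program, it is often convenient to send several lines in turn and stop as soon as the process exits. The sketch below wraps write_to_process() for that case; send_lines is a hypothetical helper. It records whatever stdout each response reports and makes no assumption about whether that value is cumulative.

```python
from typing import Any, Iterable, List


async def send_lines(
    process_client: Any,
    process_id: str,
    lines: Iterable[str],
    wait_for_response_ms: int = 300,
) -> List[str]:
    """Write each line to the process and collect the stdout of each response.

    Illustrative helper, not part of the SDK. A trailing newline is added to
    any line that lacks one. Stops early once the process reports an exit code.
    """
    outputs: List[str] = []
    for line in lines:
        payload = line if line.endswith("\n") else line + "\n"
        response = await process_client.write_to_process(
            process_id, payload, wait_for_response_ms=wait_for_response_ms
        )
        outputs.append(response.stdout)
        if response.exit_code is not None:
            break  # the process has terminated; further writes are pointless
    return outputs
```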
kill_process(process_id: str)
Terminates a specified process running within the sandbox.
- Parameters:
  - process_id (str): The user-defined ID of the process to kill.
- Returns: dict - A dictionary typically containing:
  - success (bool): True if the kill signal was successfully sent.
  - sandbox_error (str | None): An error message from the sandbox system if the operation failed (e.g., process not found).
import uuid
process_to_terminate_id = str(uuid.uuid4()) # Example ID of the process to kill
result = await process_client.kill_process(process_to_terminate_id)
if result.get("success"):
    print(f"Process {process_to_terminate_id} kill signal sent successfully.")
elif result.get("sandbox_error"):
    print(f"Error killing process {process_to_terminate_id}: {result['sandbox_error']}")
else:
    print(f"Kill process response for {process_to_terminate_id}: {result}")
get_process(process_id: str)
Retrieves the current state and information about a specified process.
- Parameters:
  - process_id (str): The user-defined ID of the process to query.
- Returns: dict - A dictionary that is structurally similar to the ProcessResponse object (containing fields like stdin, stdout, stderr, exit_code, etc.), plus a potential sandbox_error field.
import uuid
long_running_task_id = str(uuid.uuid4()) # Example ID of the process to query
process_info = await process_client.get_process(long_running_task_id)
print(f"Current STDOUT for {long_running_task_id}: {process_info.get('stdout')}")
if process_info.get('exit_code') is not None:
    print(f"Process {long_running_task_id} has exited with code: {process_info['exit_code']}")
else:
    print(f"Process {long_running_task_id} is still running.")
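For long-running tasks, get_process() can be polled until an exit code appears. The following is a minimal sketch of such a loop; wait_for_exit and its timeout parameters are hypothetical and chosen for illustration, and it treats any sandbox_error in the response as fatal.

```python
import asyncio
import time
from typing import Any


async def wait_for_exit(
    process_client: Any,
    process_id: str,
    timeout_s: float = 60.0,
    poll_interval_s: float = 1.0,
) -> int:
    """Poll get_process() until the process exits, then return its exit code.

    Illustrative helper, not part of the SDK. Raises RuntimeError if the
    sandbox reports an error, and TimeoutError if the deadline passes first.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        info = await process_client.get_process(process_id)
        if info.get("sandbox_error"):
            raise RuntimeError(f"Sandbox error: {info['sandbox_error']}")
        exit_code = info.get("exit_code")
        if exit_code is not None:
            return exit_code
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Process {process_id} still running after {timeout_s}s")
        await asyncio.sleep(poll_interval_s)
```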
File editor client
The FileEditorClient provides tools for interacting with files within the sandbox, such as viewing, creating, and modifying their content.
To access the FileEditorClient:
file_editor = sandbox_client.file_editor()
view(path: str, view_range: list[int] | None = None)
Views the content of a file. Optionally, a specific range of lines can be specified.
- Parameters:
  - path (str): Absolute path to the file (e.g., /repo/file.py).
  - view_range (list[int], optional): A list specifying the start and end line numbers to view (1-indexed, inclusive). E.g., [10, 20] views lines 10 to 20. [10, -1] views from line 10 to the end.
- Returns: str - The content of the file or the specified lines.
# View the entire file
content = await file_editor.view("/repo/my_script.py")
print(content)
# View lines 5 to 10
lines_5_to_10 = await file_editor.view("/repo/my_script.py", view_range=[5, 10])
print(lines_5_to_10)
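The view_range convention (1-indexed, inclusive, with -1 meaning "to the end") is easy to get off by one. The local function below reproduces that convention on an ordinary string so it can be tried without a sandbox. It is an illustration of the documented semantics only: select_lines is a hypothetical name, and the SDK's actual output formatting may differ.

```python
from typing import List, Optional


def select_lines(text: str, view_range: Optional[List[int]] = None) -> str:
    """Return the lines of `text` selected by a [start, end] view range.

    Line numbers are 1-indexed and inclusive; an end of -1 means the last line.
    """
    lines = text.splitlines()
    if view_range is None:
        return "\n".join(lines)
    start, end = view_range
    if end == -1:
        end = len(lines)
    if start < 1 or end < start:
        raise ValueError(f"Invalid view_range: {view_range}")
    # Convert the 1-indexed inclusive range to a 0-indexed half-open slice.
    return "\n".join(lines[start - 1:end])
```

For a four-line file, [2, 3] selects the second and third lines, and [3, -1] selects the third line through the last.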
str_replace(path: str, old_str: str, new_str: str)
Replaces all occurrences of a specific string (old_str) with another string (new_str) within a file.
- Parameters:
  - path (str): Absolute path to the file.
  - old_str (str): The string to be replaced.
  - new_str (str): The string to replace with.
- Returns: str - A result message indicating the outcome of the operation.
result_msg = await file_editor.str_replace("/repo/config.txt", "old_value", "new_value")
print(result_msg)
insert(path: str, insert_line: int, new_str: str)
Inserts a string (new_str) into a file after a specified line number (insert_line).
- Parameters:
  - path (str): Absolute path to the file.
  - insert_line (int): The 1-indexed line number after which new_str will be inserted.
  - new_str (str): The string to insert.
- Returns: str - A result message.
# Insert "new_line_content" after line 3
result_msg = await file_editor.insert("/repo/data.txt", 3, "new_line_content")
print(result_msg)
undo_edit(path: str)
Undoes the last modification made to the specified file by the FileEditorClient.
- Parameters:
  - path (str): Absolute path to the file.
- Returns: str - A result message.
# Assume an edit was just made to /repo/my_file.txt
result_msg = await file_editor.undo_edit("/repo/my_file.txt")
print(result_msg)
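One use for undo_edit() is rolling back a replacement that turns out to be wrong. The sketch below applies str_replace(), re-reads the file with view(), and undoes the edit if a caller-supplied check fails. replace_with_check and is_valid are hypothetical names. The before/after comparison guards against calling undo_edit() when the replacement changed nothing, which could otherwise revert an earlier, unrelated edit; it assumes view() returns the same text for an unchanged file.

```python
from typing import Any, Callable


async def replace_with_check(
    file_editor: Any,
    path: str,
    old_str: str,
    new_str: str,
    is_valid: Callable[[str], bool],
) -> bool:
    """Replace old_str with new_str, keeping the edit only if is_valid passes.

    Illustrative helper, not part of the SDK. Returns True if the edit was
    applied and kept, False if nothing changed or the edit was undone.
    """
    before = await file_editor.view(path)
    await file_editor.str_replace(path, old_str, new_str)
    after = await file_editor.view(path)
    if after == before:
        return False  # no edit happened, so there is nothing to undo
    if is_valid(after):
        return True
    await file_editor.undo_edit(path)
    return False
```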
create(path: str, file_text: str)
Creates a new file with the specified content.
- Parameters:
  - path (str): Absolute path where the new file will be created (e.g., /repo/new_file.txt).
  - file_text (str): The content to write into the new file.
- Returns: str - A result message.
file_content = "This is a new file.\nHello from Clusterfudge!"
result_msg = await file_editor.create("/repo/sample.txt", file_content)
print(result_msg)
File manager client
The FileManagerClient is responsible for transferring files and folders between your local machine and the sandbox environment.
To access the FileManagerClient:
file_manager = sandbox_client.file_manager()
download_file(absolute_file_path: str)
Downloads a specific file from the sandbox.
- Parameters:
  - absolute_file_path (str): The absolute path to the file within the sandbox (e.g., /sandbox/data/output.txt).
- Returns: tuple[bytes, str | None] - A tuple containing:
  - bytes: The content of the file as bytes.
  - str | None: An error message if the download failed, otherwise None.
file_bytes, error = await file_manager.download_file("/sandbox/results.csv")
if error:
    print(f"Error downloading file: {error}")
else:
    with open("local_results.csv", "wb") as f:
        f.write(file_bytes)
    print("File downloaded successfully.")
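Because download_file() reports failure through its second return value rather than by raising, callers may prefer a wrapper that converts the error into an exception and writes the bytes to disk. A minimal sketch, with download_to as a hypothetical helper name:

```python
from pathlib import Path
from typing import Any, Union


async def download_to(
    file_manager: Any, sandbox_path: str, local_path: Union[str, Path]
) -> Path:
    """Download a sandbox file and save it locally, raising on failure.

    Illustrative helper, not part of the SDK. Parent directories of
    `local_path` are created if they do not exist.
    """
    data, error = await file_manager.download_file(sandbox_path)
    if error:
        raise RuntimeError(f"Download of {sandbox_path} failed: {error}")
    target = Path(local_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(data)
    return target
```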
download_folder(absolute_folder_path: str)
Downloads an entire folder from the sandbox as a zip archive.
- Parameters:
  - absolute_folder_path (str): The absolute path to the folder within the sandbox (e.g., /sandbox/logs/).
- Returns: tuple[bytes, str | None] - A tuple containing:
  - bytes: The content of the zip archive as bytes.
  - str | None: An error message if the download failed, otherwise None.
zip_bytes, error = await file_manager.download_folder("/sandbox/all_outputs")
if error:
    print(f"Error downloading folder: {error}")
else:
    with open("downloaded_folder.zip", "wb") as f:
        f.write(zip_bytes)
    print("Folder downloaded successfully as a zip file.")
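The zip bytes do not have to be written to disk before being unpacked. As a sketch, Python's standard zipfile module can read the archive straight from memory; extract_zip_bytes is a hypothetical helper, and the layout of paths inside the archive is not specified on this page, so the returned member names are worth inspecting.

```python
import io
import zipfile
from pathlib import Path
from typing import List, Union


def extract_zip_bytes(zip_bytes: bytes, dest_dir: Union[str, Path]) -> List[str]:
    """Extract an in-memory zip archive into dest_dir and list its members.

    Illustrative helper, not part of the SDK. The destination directory is
    created if it does not exist.
    """
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        archive.extractall(dest)
        return archive.namelist()
```

With the values from the example above, extract_zip_bytes(zip_bytes, "all_outputs") would unpack the downloaded folder into a local all_outputs directory.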
create_file(absolute_file_path: str, contents: bytes, overwrite_existing: bool = False)
Creates a new file in the sandbox with the specified binary content. This is effectively an upload operation for a single file.
- Parameters:
  - absolute_file_path (str): The absolute path where the file will be created in the sandbox (e.g., /sandbox/input_data.bin).
  - contents (bytes): The binary content to write to the new file.
  - overwrite_existing (bool, optional): If True, an existing file at the path will be overwritten. Defaults to False.
- Returns: dict - A dictionary containing:
  - sandbox_error (str | None): An error message if the file creation failed, otherwise None.
my_data = b"This is some binary data for the sandbox."
response = await file_manager.create_file("/sandbox/my_binary_file.dat", my_data, overwrite_existing=True)
if response.get("sandbox_error"):
    print(f"Error creating file: {response['sandbox_error']}")
else:
    print("File created successfully in the sandbox.")
# To upload a text file, encode it to bytes first:
text_content = "Hello, Sandbox!"
response_text = await file_manager.create_file(
    "/sandbox/greeting.txt",
    text_content.encode('utf-8'),
    overwrite_existing=False
)
if response_text.get("sandbox_error"):
    print(f"Error creating text file: {response_text['sandbox_error']}")
else:
    print("Text file created successfully.")
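To upload a file that already exists on the local machine, its bytes can be read with pathlib and passed to create_file(). The sketch below does that and raises if the sandbox reports an error; upload_file is a hypothetical helper name, and it reads the whole file into memory, so it is best suited to small files.

```python
from pathlib import Path
from typing import Any, Union


async def upload_file(
    file_manager: Any,
    local_path: Union[str, Path],
    sandbox_path: str,
    overwrite_existing: bool = False,
) -> None:
    """Upload a local file to the sandbox via create_file(), raising on failure.

    Illustrative helper, not part of the SDK.
    """
    contents = Path(local_path).read_bytes()
    response = await file_manager.create_file(
        sandbox_path, contents, overwrite_existing=overwrite_existing
    )
    error = response.get("sandbox_error")
    if error:
        raise RuntimeError(f"Upload to {sandbox_path} failed: {error}")
```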