Documentation Index
Fetch the complete documentation index at: https://docs.boltz.bio/llms.txt
Use this file to discover all available pages before exploring further.
BoltzPlatformClient
The main client class for interacting with the Boltz Lab API.
Initialization
```python
from boltz_lab import BoltzPlatformClient

# Using environment variables / config file (recommended)
client = BoltzPlatformClient()

# With explicit API key
client = BoltzPlatformClient(api_key="sk-...")

# With custom timeout
client = BoltzPlatformClient(timeout=60.0)  # seconds
```
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| api_key | `str \| None` | `None` | API key for authentication. If not provided, uses config/env var (`BOLTZ_API_KEY`) |
| base_url | `str \| None` | `None` | API base URL. If not provided, uses config/env var (`BOLTZ_API_ENDPOINT`) or https://app.boltz.bio |
| signup_url | `str \| None` | `None` | Signup URL used in interactive prompts (config/env var: `BOLTZ_SIGNUP_URL`) |
| timeout | `float` | `30.0` | Default request timeout in seconds |
Context Manager
Use the client as an async context manager to ensure the underlying HTTP connection pool is cleaned up:
```python
async with BoltzPlatformClient() as client:
    status = await client.get_prediction_status("…")
```
If you don’t use the context manager, call await client.close() when you’re done.
Job Submission
submit_job_from_yaml()
Submit a prediction job from a YAML file path or URL.
```python
job = await client.submit_job_from_yaml(
    "path/to/job.yaml",
    prediction_name="My Prediction",
)
```
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| yaml_path | `str` | Required | Path to YAML file or URL |
| prediction_name | `str \| None` | `None` | Human-readable name for the prediction |
| flags | `dict \| None` | `None` | Optional prediction flags (see below) |
Returns
PredictionJob
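For reference, a minimal job YAML might look like the following. This is an illustrative sketch that mirrors the dictionary form accepted by `submit_job_from_dict()`, not a schema guaranteed by the API:

```yaml
sequences:
  - protein:
      id: ["A"]
      sequence: "MVTPEGNVS..."
      modifications: []
  - ligand:
      id: ["B"]
      smiles: "N[C@@H](Cc1ccc(O)cc1)C(=O)O"
constraints: []
properties:
  - affinity:
      binder: "B"
```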
submit_job_from_dict()
Submit a prediction job from a Python dictionary (already-parsed YAML/JSON).
```python
job_spec = {
    "sequences": [
        {
            "protein": {
                "id": ["A"],
                "sequence": "MVTPEGNVS...",
                "modifications": [],
            }
        },
        {
            "ligand": {
                "id": ["B"],
                "smiles": "N[C@@H](Cc1ccc(O)cc1)C(=O)O",
            }
        },
    ],
    "constraints": [],
    "properties": [{"affinity": {"binder": "B"}}],
}

job = await client.submit_job_from_dict(job_spec, prediction_name="Dict Prediction")
```
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| job_data | `dict` | Required | Job specification as a Python dictionary |
| prediction_name | `str \| None` | `None` | Human-readable name for the prediction |
| flags | `dict \| None` | `None` | Optional prediction flags (see below) |
Returns
PredictionJob
submit_prediction()
Lower-level method used by the helpers above; it submits a `complex_data` dictionary directly.
```python
job = await client.submit_prediction(
    complex_data={"sequences": [...], "constraints": []},
    prediction_name="Raw Submit",
)
```
Prediction flags
All submission methods accept a `flags` dict. Supported keys include:
- `use_potentials` (`bool`, default: `False`)
- `recycling_steps` (`int`, default: `3`)
- `diffusion_samples` (`int`, default: `1`)
- `sampling_steps` (`int`, default: `200`)
- `step_scale` (`float`, default: `1.5`)
- `subsample_msa` (`bool`, default: `True`)
- `num_subsampled_msa` (`int`, default: `1024`)
Example:
```python
job = await client.submit_job_from_yaml(
    "affinity.yaml",
    flags={"recycling_steps": 5, "diffusion_samples": 2},
)
```
Status and Retrieval
get_prediction_status()
Get the current status of a prediction job.
```python
status = await client.get_prediction_status(prediction_id)
print(status.prediction_status)
print(status.prediction_stage_description)
```
Returns
PredictionStatus
Notes on status values
`PredictionStatus.prediction_status` is a string returned by the API.
For filtering and common terminal-state logic, the SDK defines a `JobStatus` enum:
```python
from boltz_lab.models import JobStatus

print([s.value for s in JobStatus])
# ['PENDING', 'CREATED', 'RUNNING', 'COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT']
```
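When polling manually, it helps to know when a prediction can no longer change state. A minimal sketch, assuming the last four enum values above are the terminal ones (`is_terminal` is a hypothetical helper, not part of the SDK):

```python
# Assumption: these four JobStatus values are terminal; the SDK does not
# document this set explicitly, so verify against your SDK version.
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "CANCELLED", "TIMED_OUT"}

def is_terminal(status: str) -> bool:
    """Return True once a prediction can no longer change state."""
    return status in TERMINAL_STATUSES
```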
list_predictions()
List predictions with optional filtering.
```python
from boltz_lab.models import JobStatus

# List all predictions (default limit=20, offset=0)
resp = await client.list_predictions()

# Filter by status
completed = await client.list_predictions(status=JobStatus.COMPLETED)

# Pagination
page1 = await client.list_predictions(limit=10, offset=0)
page2 = await client.list_predictions(limit=10, offset=10)
```
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| status | `JobStatus \| None` | `None` | Filter by status |
| limit | `int` | `20` | Maximum number of results to return |
| offset | `int` | `0` | Number of results to skip for pagination |
Returns
PredictionListResponse
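To walk every page, `limit`/`offset` can be advanced until `total` is exhausted. A sketch, assuming `resp.total` reflects the count after any status filter (`list_all` is a hypothetical helper, not an SDK method):

```python
async def list_all(client, page_size=20):
    """Collect predictions page by page until `total` is exhausted.

    Hypothetical helper: `client` is any object exposing
    list_predictions(limit=..., offset=...) as documented above.
    """
    results = []
    offset = 0
    while True:
        resp = await client.list_predictions(limit=page_size, offset=offset)
        results.extend(resp.predictions)
        offset += page_size
        # Stop when we have paged past the reported total, or the API
        # returns an empty page (defensive against a stale total).
        if offset >= resp.total or not resp.predictions:
            break
    return results
```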
get_prediction_output_url()
Get a presigned URL (or similar) for the archived prediction output, if available.
```python
url = await client.get_prediction_output_url(prediction_id)
```
download_results()
Download prediction results to disk.
```python
# Download as tar.gz archive (default)
path = await client.download_results(prediction_id)

# Download into a directory
path = await client.download_results(prediction_id, output_dir="results/")

# Download as JSON (writes prediction_results to a .json file)
json_path = await client.download_results(
    prediction_id,
    output_dir="results/",
    output_format="json",
)

# Custom filename (without extension)
custom_path = await client.download_results(
    prediction_id,
    output_dir="results/",
    output_filename="my_results",
)
```
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| prediction_id | `str` | Required | Prediction identifier |
| output_dir | `str` | `"."` | Directory to save results |
| output_format | `str` | `"archive"` | `"archive"` (tar.gz) or `"json"` |
| output_filename | `str \| None` | `None` | Custom filename (without extension) |
Returns
`str` path to the downloaded file.
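The default `"archive"` format is a tar.gz; its internal layout isn't specified here, but the file can be unpacked with the standard library. A sketch (`extract_results` is a hypothetical helper, not part of the SDK):

```python
import tarfile

def extract_results(archive_path: str, dest: str = "results_extracted") -> str:
    """Unpack a downloaded tar.gz results archive into `dest`."""
    with tarfile.open(archive_path, "r:gz") as tar:
        # On Python 3.12+, consider tar.extractall(dest, filter="data")
        # to reject unsafe archive members.
        tar.extractall(dest)
    return dest
```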
wait_for_prediction()
Wait for a prediction to reach a terminal state.
```python
def on_progress(s):
    print(s.prediction_status, s.prediction_stage_description)

final_status = await client.wait_for_prediction(
    prediction_id,
    polling_interval=5,
    timeout=600,
    progress_callback=on_progress,
)
```
PredictionJob
`PredictionJob` is returned by the submission methods and provides convenience wrappers around the client.
Properties
| Property | Type | Description |
|---|---|---|
| prediction_id | `str` | Unique identifier for the job |
Methods
get_status()
```python
status = await job.get_status()
```
wait_for_completion()
```python
await job.wait_for_completion(
    polling_interval=10,
    timeout=600,
    progress_callback=lambda s: print(s.prediction_status),
)
```
download_results()
```python
path = await job.download_results(
    output_dir="results/",
    output_format="archive",
)
```
Data Models
PredictionStatus
Fields returned by the API are mapped into `PredictionStatus`. Common fields include:
| Field | Type | Notes |
|---|---|---|
| prediction_id | `str` | ID |
| prediction_name | `str` | Name (may be empty) |
| prediction_type | `str` | Type string from the API |
| prediction_status | `str` | Status string from the API |
| prediction_stage_description | `str` | Human-readable stage |
| created_at | `datetime` | Always present |
| started_at | `datetime \| None` | Present once started |
| completed_at | `datetime \| None` | Present once finished |
| estimated_completion_time | `datetime \| None` | Optional ETA |
| prediction_results | `dict \| None` | Results (when embedded in the status response) |
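The timestamp fields combine naturally into derived metrics. A sketch computing wall-clock runtime from the fields above (`runtime_seconds` is a hypothetical helper, not part of the SDK):

```python
def runtime_seconds(started_at, completed_at):
    """Wall-clock runtime in seconds, or None if either timestamp is missing.

    Both arguments are datetime objects, matching the started_at and
    completed_at fields documented above.
    """
    if started_at is None or completed_at is None:
        return None
    return (completed_at - started_at).total_seconds()
```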
PredictionListResponse
| Field | Type | Description |
|---|---|---|
| predictions | `list[PredictionStatus]` | List of predictions |
| total | `int` | Total number of predictions |
Exception Handling
The SDK raises exceptions from `boltz_lab.exceptions`.
Common exceptions:
```python
from boltz_lab import BoltzAPIError, BoltzAuthenticationError, BoltzNotFoundError, BoltzTimeoutError
from boltz_lab.exceptions import BoltzValidationError, BoltzConnectionError
```
Example:
```python
try:
    job = await client.submit_job_from_yaml("job.yaml")
    await job.wait_for_completion(timeout=300)
except BoltzAuthenticationError:
    print("Invalid API key")
except BoltzValidationError as e:
    print(f"Invalid job specification: {e}")
except BoltzTimeoutError:
    print("Job timed out")
except BoltzAPIError as e:
    print(f"API error: {e}")
    if e.response_data:
        print(f"Details: {e.response_data}")
```
Complete Example
```python
import asyncio
from pathlib import Path

from boltz_lab import BoltzPlatformClient, BoltzAPIError


async def run_prediction():
    try:
        async with BoltzPlatformClient() as client:
            # Submit job
            print("Submitting job...")
            job = await client.submit_job_from_yaml(
                "affinity.yaml",
                prediction_name="Tyrosine Affinity Prediction",
            )
            print(f"Job ID: {job.prediction_id}")

            # Wait with progress updates
            print("Waiting for completion...")

            def progress(status):
                print(f"  → {status.prediction_stage_description}")

            await job.wait_for_completion(
                polling_interval=5,
                timeout=600,
                progress_callback=progress,
            )

            # Download results
            print("Downloading results...")
            output_dir = Path("results")
            output_dir.mkdir(exist_ok=True)
            path = await job.download_results(str(output_dir))
            print(f"✓ Results saved to: {path}")
    except BoltzAPIError as e:
        print(f"Error: {e}")
        if e.response_data:
            print(f"Details: {e.response_data}")


if __name__ == "__main__":
    asyncio.run(run_prediction())
```