Skip to main content
TODO

BoltzPlatformClient

The main client class for interacting with the Boltz Lab API.

Initialization

from boltz_api import BoltzPlatformClient

# Using environment variables / config file (recommended)
client = BoltzPlatformClient()

# With explicit API key
client = BoltzPlatformClient(api_key="sk-...")

# With custom timeout
client = BoltzPlatformClient(timeout=60.0)  # seconds

Parameters

| Parameter  | Type          | Default | Description |
| ---------- | ------------- | ------- | ----------- |
| api_key    | `str \| None` | `None`  | API key for authentication. If not provided, uses config/env var (BOLTZ_API_KEY) |
| base_url   | `str \| None` | `None`  | API base URL. If not provided, uses config/env var (BOLTZ_API_ENDPOINT) or https://app.boltz.bio |
| signup_url | `str \| None` | `None`  | Signup URL used in interactive prompts (config/env var: BOLTZ_SIGNUP_URL) |
| timeout    | `float`       | `30.0`  | Default request timeout in seconds |

Context Manager

Use the client as an async context manager to ensure the underlying HTTP connection pool is cleaned up:
async with BoltzPlatformClient() as client:
    status = await client.get_prediction_status("…")
If you don’t use the context manager, call await client.close() when you’re done.

Job Submission

submit_job_from_yaml()

Submit a prediction job from a YAML file path or URL.
job = await client.submit_job_from_yaml(
    "path/to/job.yaml",
    prediction_name="My Prediction",
    priority="low",
)

Parameters

| Parameter       | Type          | Default  | Description |
| --------------- | ------------- | -------- | ----------- |
| yaml_path       | `str`         | Required | Path to YAML file or URL |
| prediction_name | `str \| None` | `None`   | Human-readable name for the prediction |
| priority        | `str`         | `"low"`  | Priority level: `"low"` or `"high"` |
| flags           | `dict \| None`| `None`   | Optional prediction flags (see below) |

Returns

PredictionJob

submit_job_from_dict()

Submit a prediction job from a Python dictionary (already-parsed YAML/JSON).
job_spec = {
    "sequences": [
        {
            "protein": {
                "id": ["A"],
                "sequence": "MVTPEGNVS...",
                "modifications": []
            }
        },
        {
            "ligand": {
                "id": ["B"],
                "smiles": "N[C@@H](Cc1ccc(O)cc1)C(=O)O"
            }
        }
    ],
    "constraints": [],
    "properties": [{"affinity": {"binder": "B"}}],
}

job = await client.submit_job_from_dict(job_spec, prediction_name="Dict Prediction")

Parameters

| Parameter       | Type          | Default  | Description |
| --------------- | ------------- | -------- | ----------- |
| job_data        | `dict`        | Required | Job specification as a Python dictionary |
| prediction_name | `str \| None` | `None`   | Human-readable name for the prediction |
| priority        | `str`         | `"low"`  | Priority level: `"low"` or `"high"` |
| flags           | `dict \| None`| `None`   | Optional prediction flags (see below) |

Returns

PredictionJob

submit_prediction()

Lower-level method used by the helpers above. It submits a complex_data dictionary directly.
job = await client.submit_prediction(
    complex_data={"sequences": [...], "constraints": []},
    prediction_name="Raw Submit",
    priority="low",
)

Prediction flags

All submission methods accept a flags dict. Supported keys include:
  • use_potentials (bool, default: False)
  • recycling_steps (int, default: 3)
  • diffusion_samples (int, default: 1)
  • sampling_steps (int, default: 200)
  • step_scale (float, default: 1.5)
  • subsample_msa (bool, default: True)
  • num_subsampled_msa (int, default: 1024)
Example:
job = await client.submit_job_from_yaml(
    "affinity.yaml",
    flags={"recycling_steps": 5, "diffusion_samples": 2},
)

Status and Retrieval

get_prediction_status()

Get the current status of a prediction job.
status = await client.get_prediction_status(prediction_id)
print(status.prediction_status)
print(status.prediction_stage_description)

Returns

PredictionStatus

Notes on status values

PredictionStatus.prediction_status is a string returned by the API. For filtering and common terminal-state logic, the SDK defines a JobStatus enum:
from boltz_api.models import JobStatus

print([s.value for s in JobStatus])
# ['PENDING', 'CREATED', 'RUNNING', 'COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT']

list_predictions()

List predictions with optional filtering.
from boltz_api.models import JobStatus

# List all predictions (default limit=20, offset=0)
resp = await client.list_predictions()

# Filter by status
completed = await client.list_predictions(status=JobStatus.COMPLETED)

# Pagination
page1 = await client.list_predictions(limit=10, offset=0)
page2 = await client.list_predictions(limit=10, offset=10)

Parameters

| Parameter | Type                | Default | Description |
| --------- | ------------------- | ------- | ----------- |
| status    | `JobStatus \| None` | `None`  | Filter by status |
| limit     | `int`               | `20`    | Maximum number of results to return |
| offset    | `int`               | `0`     | Number of results to skip for pagination |

Returns

PredictionListResponse

get_prediction_output_url()

Get a presigned URL (or similar) for the archived prediction output, if available.
url = await client.get_prediction_output_url(prediction_id)

download_results()

Download prediction results to disk.
# Download as tar.gz archive (default)
path = await client.download_results(prediction_id)

# Download into a directory
path = await client.download_results(prediction_id, output_dir="results/")

# Download as JSON (writes prediction_results to a .json file)
json_path = await client.download_results(
    prediction_id,
    output_dir="results/",
    output_format="json",
)

# Custom filename (without extension)
custom_path = await client.download_results(
    prediction_id,
    output_dir="results/",
    output_filename="my_results",
)

Parameters

| Parameter       | Type          | Default     | Description |
| --------------- | ------------- | ----------- | ----------- |
| prediction_id   | `str`         | Required    | Prediction identifier |
| output_dir      | `str`         | `"."`       | Directory to save results |
| output_format   | `str`         | `"archive"` | `"archive"` (tar.gz) or `"json"` |
| output_filename | `str \| None` | `None`      | Custom filename (without extension) |

Returns

str path to the downloaded file.

wait_for_prediction()

Wait for a prediction to reach a terminal state.
def on_progress(s):
    print(s.prediction_status, s.prediction_stage_description)

final_status = await client.wait_for_prediction(
    prediction_id,
    polling_interval=5,
    timeout=600,
    progress_callback=on_progress,
)

PredictionJob

PredictionJob is returned by submission methods and provides convenience wrappers around the client.

Properties

| Property      | Type  | Description |
| ------------- | ----- | ----------- |
| prediction_id | `str` | Unique identifier for the job |

Methods

get_status()

status = await job.get_status()

wait_for_completion()

await job.wait_for_completion(
    polling_interval=10,
    timeout=600,
    progress_callback=lambda s: print(s.prediction_status),
)

download_results()

path = await job.download_results(
    output_dir="results/",
    output_format="archive",
)

Data Models

PredictionStatus

Fields returned by the API are mapped into PredictionStatus. Common fields include:
| Field                        | Type               | Notes |
| ---------------------------- | ------------------ | ----- |
| prediction_id                | `str`              | ID |
| prediction_name              | `str`              | Name (may be empty) |
| prediction_type              | `str`              | Type string from the API |
| prediction_status            | `str`              | Status string from the API |
| prediction_stage_description | `str`              | Human-readable stage |
| created_at                   | `datetime`         | Always present |
| started_at                   | `datetime \| None` | Present once started |
| completed_at                 | `datetime \| None` | Present once finished |
| estimated_completion_time    | `datetime \| None` | Optional ETA |
| prediction_results           | `dict \| None`     | Results (when embedded in the status response) |

PredictionListResponse

| Field       | Type                     | Description |
| ----------- | ------------------------ | ----------- |
| predictions | `list[PredictionStatus]` | List of predictions |
| total       | `int`                    | Total number of predictions |

Exception Handling

The SDK raises exceptions from boltz_api.exceptions. Common exceptions:
from boltz_api import BoltzAPIError, BoltzAuthenticationError, BoltzNotFoundError, BoltzTimeoutError
from boltz_api.exceptions import BoltzValidationError, BoltzConnectionError
Example:
try:
    job = await client.submit_job_from_yaml("job.yaml")
    await job.wait_for_completion(timeout=300)
except BoltzAuthenticationError:
    print("Invalid API key")
except BoltzValidationError as e:
    print(f"Invalid job specification: {e}")
except BoltzTimeoutError:
    print("Job timed out")
except BoltzAPIError as e:
    print(f"API error: {e}")
    if e.response_data:
        print(f"Details: {e.response_data}")

Complete Example

import asyncio
from pathlib import Path
from boltz_api import BoltzPlatformClient, BoltzAPIError

async def run_prediction():
    try:
        async with BoltzPlatformClient() as client:
            # Submit job
            print("Submitting job...")
            job = await client.submit_job_from_yaml(
                "affinity.yaml",
                prediction_name="Tyrosine Affinity Prediction",
                priority="high",
            )
            print(f"Job ID: {job.prediction_id}")

            # Wait with progress updates
            print("Waiting for completion...")
            def progress(status):
                print(f"  → {status.prediction_stage_description}")

            await job.wait_for_completion(
                polling_interval=5,
                timeout=600,
                progress_callback=progress
            )

            # Download results
            print("Downloading results...")
            output_dir = Path("results")
            output_dir.mkdir(exist_ok=True)

            path = await job.download_results(str(output_dir))
            print(f"✓ Results saved to: {path}")

    except BoltzAPIError as e:
        print(f"Error: {e}")
        if e.response_data:
            print(f"Details: {e.response_data}")

if __name__ == "__main__":
    asyncio.run(run_prediction())