Python SDK
oomol-cloud-task-sdk provides the Python client for OOMOL Cloud Task API v3.
Install
pip install oomol-cloud-task-sdk
Runtime Requirements
- Python `>=3.7`
- `requests>=2.25.0`
- Python 3.7 additionally requires `typing_extensions>=4.0.0`
Authentication
from oomol_cloud_task import OomolTaskClient

client = OomolTaskClient(
    api_key="your-api-key",
    default_headers={"x-client": "my-app"},
)
- If `api_key` is set, the client sends `Authorization: Bearer <api_key>`
- If `api_key` is omitted, you can attach cookies through a custom `requests.Session`
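The header rule above can be pictured with a small standalone sketch. `build_headers` is a hypothetical helper written for illustration only; it is not part of the SDK, and the way the real client merges `default_headers` with the `Authorization` header is an assumption here.

```python
from typing import Dict, Optional


def build_headers(
    api_key: Optional[str] = None,
    default_headers: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Hypothetical helper (not SDK code): combine default headers with a Bearer token."""
    headers = dict(default_headers or {})
    if api_key:
        # Documented behavior: a configured api_key is sent as a Bearer token.
        headers["Authorization"] = f"Bearer {api_key}"
    # Without an api_key no Authorization header is added; authentication
    # would then rely on cookies carried by a custom requests.Session.
    return headers


print(build_headers("your-api-key", {"x-client": "my-app"}))
print(build_headers(default_headers={"x-client": "my-app"}))
```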
Quick Start
from oomol_cloud_task import BackoffStrategy, OomolTaskClient

client = OomolTaskClient(api_key="YOUR_API_KEY")

response = client.create_and_wait(
    {
        "packageName": "@oomol/my-package",
        "packageVersion": "1.0.0",
        "blockName": "main",
        "inputValues": {"text": "hello"},
    },
    interval_ms=2000,
    timeout_ms=10 * 60 * 1000,
    backoff_strategy=BackoffStrategy.EXPONENTIAL,
    max_interval_ms=10000,
    on_progress=lambda progress, status: print("progress:", progress, "status:", status),
)

print("taskID:", response.taskID)
if response.result["status"] == "success":
    print("resultURL:", response.result.get("resultURL"))
    print("resultData:", response.result.get("resultData"))
API Overview
| Method | Description |
|---|---|
| `create_task(request)` | Create a task |
| `create_and_wait(request, ...)` | Create a task and wait for completion |
| `list_tasks(query=None)` | List the current user's tasks |
| `get_latest_tasks(workload_ids)` | Get the latest task for one or more workloads |
| `get_task(task_id)` | Get task details |
| `get_task_detail(task_id)` | Alias of `get_task(task_id)` |
| `get_task_result(task_id)` | Get the current task result state |
| `await_result(task_id, ...)` | Poll until the task succeeds or fails |
| `get_dashboard()` | Get limits, queue counts, and pause state |
| `set_tasks_pause(paused)` | Pause or resume the current user's queue |
| `pause_user_queue()` | Pause the current user's queue |
| `resume_user_queue()` | Resume the current user's queue |
| `upload_file(file, ...)` | Upload a file and return a remote URL |
Common Patterns
Create a task
client.create_task(
    {
        "packageName": "@oomol/my-package",
        "packageVersion": "1.0.0",
        "blockName": "main",
        "inputValues": {"foo": "bar"},
    }
)
The client normalizes task creation requests to `type: "serverless"`.
`webhookUrl` and `metadata` remain in the request type only for backward compatibility. They are ignored before the request is sent.
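As a rough illustration of the normalization described above, the sketch below builds the payload that would be sent. `normalize_create_request` and `IGNORED_FIELDS` are hypothetical names for this example; the SDK's internal implementation may differ.

```python
from typing import Any, Dict

# Fields kept in the request type for backward compatibility only.
IGNORED_FIELDS = ("webhookUrl", "metadata")


def normalize_create_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper (not SDK code): drop ignored fields and force the task type."""
    payload = {key: value for key, value in request.items() if key not in IGNORED_FIELDS}
    payload["type"] = "serverless"
    return payload


print(
    normalize_create_request(
        {
            "packageName": "@oomol/my-package",
            "packageVersion": "1.0.0",
            "blockName": "main",
            "webhookUrl": "https://example.com/hook",
        }
    )
)
```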
Query tasks
page = client.list_tasks(
    {
        "size": 20,
        "status": "running",
        "taskType": "user",
    }
)

latest = client.get_latest_tasks(
    [
        "550e8400-e29b-41d4-a716-446655440022",
        "550e8400-e29b-41d4-a716-446655440023",
    ]
)

detail = client.get_task("019234a5-b678-7def-8123-456789abcdef")
result = client.get_task_result("019234a5-b678-7def-8123-456789abcdef")
dashboard = client.get_dashboard()
Pause or resume the queue
client.pause_user_queue()
client.resume_user_queue()
client.set_tasks_pause(True)
client.set_tasks_pause(False)
Upload a file
url = client.upload_file(
    "/path/to/file.pdf",
    retries=3,
    on_progress=lambda progress: print("upload:", progress),
)
upload_file() accepts:
- a filesystem path
- a seekable binary file object
Request And Polling Behavior
list_tasks(query=None)
- `size` must be an integer from `1` to `100`
- Supported filters: `nextToken`, `status`, `taskType`, `workload`, `workloadID`, `packageID`
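The constraints above could be checked before calling the client, roughly as follows. `check_list_query` is a hypothetical helper, and rejecting unknown keys with `ValueError` is an assumption made for this sketch; the SDK may handle unsupported filters differently.

```python
from typing import Any, Dict, Optional

SUPPORTED_FILTERS = {"nextToken", "status", "taskType", "workload", "workloadID", "packageID"}


def check_list_query(query: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Hypothetical pre-flight check (not SDK code) for a list_tasks query."""
    query = dict(query or {})
    size = query.get("size")
    if size is not None:
        # bool is a subclass of int, so it is rejected explicitly.
        if isinstance(size, bool) or not isinstance(size, int) or not 1 <= size <= 100:
            raise ValueError("size must be an integer from 1 to 100")
    unknown = set(query) - SUPPORTED_FILTERS - {"size"}
    if unknown:
        # Assumption: this sketch treats unsupported keys as an error.
        raise ValueError(f"unsupported filters: {sorted(unknown)}")
    return query


print(check_list_query({"size": 20, "status": "running", "taskType": "user"}))
```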
get_latest_tasks(workload_ids)
- Accepts `list[str]` or a comma-separated `str`
- Arrays must contain `1` to `50` workload IDs
- Empty strings or empty items raise an error before the request is sent
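A minimal sketch of these input rules is shown below. `normalize_workload_ids` is a hypothetical helper; the exception type and the whitespace trimming are assumptions, since the text only says that an error is raised before the request is sent.

```python
from typing import List, Union


def normalize_workload_ids(workload_ids: Union[str, List[str]]) -> List[str]:
    """Hypothetical helper (not SDK code): accept a list or a comma-separated string."""
    if isinstance(workload_ids, str):
        items = workload_ids.split(",")
    else:
        items = list(workload_ids)
    # Assumption: surrounding whitespace is trimmed before validation.
    items = [item.strip() for item in items]
    if any(not item for item in items):
        raise ValueError("workload IDs must not be empty")
    if not 1 <= len(items) <= 50:
        raise ValueError("expected 1 to 50 workload IDs")
    return items


print(
    normalize_workload_ids(
        "550e8400-e29b-41d4-a716-446655440022,550e8400-e29b-41d4-a716-446655440023"
    )
)
```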
await_result(task_id, ...)
- Default polling interval: `3000ms`
- Default backoff strategy: `BackoffStrategy.EXPONENTIAL`
- Default max polling interval: `3000ms`
- If `timeout_ms` is omitted, polling continues until success or failure
- Temporary request failures during polling are retried automatically
- Task failure raises `TaskFailedError`
- Timeout raises `TimeoutError`
- If polling had request errors before timeout, the timeout message includes the latest polling error
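To see how the interval and the max interval interact, here is a standalone sketch of a capped exponential schedule. `polling_intervals` is a hypothetical generator, and the doubling factor is an assumption; the multiplier the SDK actually uses is not stated here.

```python
from itertools import islice
from typing import Iterator


def polling_intervals(
    interval_ms: int = 3000,
    max_interval_ms: int = 3000,
    factor: float = 2.0,  # assumption: the delay doubles after each poll
) -> Iterator[int]:
    """Hypothetical sketch (not SDK code): yield successive polling delays in milliseconds."""
    current = float(interval_ms)
    while True:
        yield int(min(current, max_interval_ms))
        # Grow the delay, but never beyond the configured cap.
        current = min(current * factor, max_interval_ms)


# With the documented defaults the cap equals the start value, so the delay stays flat.
print(list(islice(polling_intervals(), 4)))
# With the Quick Start values the delay grows until it reaches the 10000 ms cap.
print(list(islice(polling_intervals(2000, 10000), 5)))
```

Under this assumption, the documented defaults (3000 ms interval, 3000 ms cap) produce a constant delay; backoff only becomes visible when `max_interval_ms` is raised above `interval_ms`.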
create_and_wait(request, ...)
- Returns a `CreateAndWaitResponse` named tuple
- You can access `response.taskID` and `response.result`
- Tuple unpacking also works: `task_id, result = response`
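Both access styles can be tried without a network call by using a stand-in type. The class below is an illustrative stand-in with the two documented fields, not the class exported by the SDK, and the field types are assumptions.

```python
from typing import Any, Dict, NamedTuple


class CreateAndWaitResponse(NamedTuple):
    """Stand-in for illustration; the SDK exports its own CreateAndWaitResponse."""

    taskID: str
    result: Dict[str, Any]


response = CreateAndWaitResponse(
    taskID="019234a5-b678-7def-8123-456789abcdef",
    result={"status": "success", "resultURL": None, "resultData": []},
)

# Attribute access and tuple unpacking refer to the same values.
task_id, result = response
print(task_id == response.taskID, result is response.result)
```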
Key Types
CreateTaskRequest
Only serverless tasks are supported:
CreateTaskRequest = {
    "type": "serverless",  # optional
    "packageName": str,
    "packageVersion": str,
    "blockName": str,
    "inputValues": dict,  # optional
    "webhookUrl": str,  # optional, ignored by the client
    "metadata": dict,  # optional, ignored by the client
}
TaskResultResponse
Three result shapes are possible:
- `{"status": "queued" | "scheduling" | "scheduled" | "running", "progress": float}`
- `{"status": "success", "resultURL": str | None, "resultData": list}`
- `{"status": "failed", "error": str | None}`
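A caller will usually branch on `status` to handle the three shapes. The sketch below shows one way to do that with plain dictionaries; `describe_result` is a hypothetical helper, and the fallback values it uses for missing keys are assumptions.

```python
from typing import Any, Dict

IN_PROGRESS_STATUSES = {"queued", "scheduling", "scheduled", "running"}


def describe_result(result: Dict[str, Any]) -> str:
    """Hypothetical helper (not SDK code): summarize a TaskResultResponse-shaped dict."""
    status = result["status"]
    if status in IN_PROGRESS_STATUSES:
        return f"in progress: {status}, progress={result.get('progress')}"
    if status == "success":
        # resultURL may be None; resultData is a list.
        return f"success: url={result.get('resultURL')}, items={len(result.get('resultData') or [])}"
    if status == "failed":
        return f"failed: {result.get('error') or 'unknown error'}"
    raise ValueError(f"unexpected status: {status!r}")


print(describe_result({"status": "running", "progress": 0.5}))
print(describe_result({"status": "success", "resultURL": None, "resultData": [1, 2]}))
print(describe_result({"status": "failed", "error": None}))
```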
Constructor Arguments
| Argument | Description |
|---|---|
| `api_key` | Bearer token |
| `base_url` | Task API base URL, default `https://cloud-task.oomol.com` |
| `default_headers` | Extra headers attached to every request |
| `session` | Custom `requests.Session` |
Public Type Helpers
The package also exports:
- `AwaitOptions`
- `BackoffOptions`
- `BackoffStrategy`
- `ClientOptions`
- `UploadOptions`
- `TaskStatus`
- `TaskTerminalStatus`
- `RunTaskErrorCode`
Error Handling
from oomol_cloud_task import (
    ApiError,
    RunTaskErrorCode,
    TaskFailedError,
    TimeoutError,
    UploadError,
)

try:
    response = client.create_and_wait(
        {
            "packageName": "@oomol/my-package",
            "packageVersion": "1.0.0",
            "blockName": "main",
        },
        timeout_ms=60_000,
    )
    print(response)
except TaskFailedError as error:
    print(error.task_id, error.code, error.status_code, error.message, error.detail)
except TimeoutError as error:
    print(error)
except ApiError as error:
    print(error.status, error.body)
except UploadError as error:
    print(error.status_code, error.code, error)

print(RunTaskErrorCode.INSUFFICIENT_QUOTA)