Python API
Introduction
Developers will call our SDK in the Scriptlet Block to interact with the OOCANA
workflow engine. This document provides detailed instructions on how to use the related APIs.
Quick Start
How to use the SDK in the Scriptlet Block. You need to import the Context
object and use it to call the related APIs.
Scriptlet
Template
Below is the template function for a Scriptlet. You need to write your execution code within the function body. The input and output types can be customized, and third-party libraries are supported. To pass data to subsequent blocks, simply return a dictionary as you would in a normal function. OOCANA will use the top-level keys as the corresponding output handles, and the values as the output for those handles.
from oocana import Context
def main(params: dict, context: Context):
# Your code will be executed here
return { "output": None }
Common APIs
def finish(self, *, result: Dict[str, Any] | None = None, error: str | None = None):
"""
finish the block, and send the result to oocana.
if error is not None, the block will be finished with error.
then if result is not None, the block will be finished with result.
lastly, if both error and result are None, the block will be finished without any result.
"""
def outputs(self, outputs: Dict[str, Any]):
"""
output the value to the next block
map: Dict[str, Any], the key of the output, should be defined in the block schema output defs, the field name is handle
"""
def output(self, key: str, value: Any):
"""
output the value to the next block
key: str, the key of the output, should be defined in the block schema output defs, the field name is handle
value: Any, the value of the output
"""
# Enable preview
def preview(self, payload: PreviewPayload):
# Temporary directory for the entire session, users can use it to store temporary files
def session_dir(self) -> str:
# Report progress, progress 0~100, this function is throttled to trigger every 300ms
def report_progress(self, progress: float | int):
"""report progress
This api is used to report the progress of the block. but it just effect the ui progress not the real progress.
This api is throttled. the minimum interval is 0.3s.
When you first call this api, it will report the progress immediately. After it invoked once, it will report the progress at the end of the throttling period.
| 0.25 s | 0.2 s |
first call second call third call 4 5 6 7's calls
| | | | | | |
| -------- 0.3 s -------- | -------- 0.3 s -------- |
invoke invoke invoke
:param float | int progress: the progress of the block, the value should be in [0, 100].
"""
Context.preview
Types
context.preview
is a core API. Its main function is to help developers preview data. We support previews for almost all common data types.
PreviewPayload
Types
Function name: context.preview
def preview(self, payload: PreviewPayload):
# Only supports python
payload = self.__dataframe(payload)
# Only supports python
payload = self.__matplotlib(payload)
self.__mainframe.report(
self.block_info,
{
"type": "BlockPreview",
"payload": payload,
},
)
Parameter type: PreviewPayload
PreviewPayload: TypeAlias = Union[
TablePreviewPayload,
TextPreviewPayload,
JSONPreviewPayload,
ImagePreviewPayload,
MediaPreviewPayload,
DataFrame,
PandasPreviewPayload,
DefaultPreviewPayload
]
class TablePreviewData(TypedDict):
columns: List[str | int | float]
rows: List[List[str | int | float | bool]]
row_count: int | None
class TablePreviewPayload(TypedDict):
type: Literal['table']
data: TablePreviewData | Any
class TextPreviewPayload(TypedDict):
type: Literal["text"]
data: Any
class JSONPreviewPayload(TypedDict):
type: Literal["json"]
data: Any
class ImagePreviewPayload(TypedDict):
type: Literal['image']
data: str | List[str]
class MediaPreviewPayload(TypedDict):
type: Literal["image", 'video', 'audio', 'markdown', "iframe", "html"]
data: str
class PandasPreviewPayload(TypedDict):
type: Literal['table']
data: DataFrame
class DefaultPreviewPayload:
type: str
data: Any
Preview
Examples
context.preview({
# Type can be: "image" | "video" | "audio" | "markdown" | "iframe" | "html" | "json" | "text" | `text/${string}` | "table";
"type": "video",
# Type can be: file path, base64, pandas dataframe
"data": "https://www.w3schools.com/html/mov_bbb.mp4"
})
Only supports Python
import pandas as pd
df = pd.DataFrame({
'A': [1, 2, 3],
'B': [4, 5, 6],
'C': [7, 8, 9]
})
context.preview(df)

import matplotlib.pyplot as plt
import numpy as np
x = np.linspace(0, 10, 100)
y = np.sin(x)
plt.plot(x, y)
plt.show()

import plotly.graph_objects as go
fig = go.Figure(data=[go.Bar(y=[2, 3, 1])])
fig.show()

Context
Types
API type descriptions provided by the OOCANA
SDK to help users understand how to use these APIs accurately.
class Context:
__inputs: Dict[str, Any]
__block_info: BlockInfo
__outputs_def: Dict[str, HandleDef]
__store: Any
__is_done: bool = False
__keep_alive: OnlyEqualSelf = OnlyEqualSelf()
__session_dir: str
def __init__(
self, inputs: Dict[str, Any], blockInfo: BlockInfo, mainframe: Mainframe, store, outputs, session_dir: str
) -> None:
self.__block_info = blockInfo
self.__mainframe = mainframe
self.__store = store
self.__inputs = inputs
outputs_defs = {}
if outputs is not None:
for k, v in outputs.items():
outputs_defs[k] = HandleDef(**v)
self.__outputs_def = outputs_defs
self.__session_dir = session_dir
@property
def session_dir(self) -> str:
"""Temporary directory for the current session, all blocks in a session will share the same directory."""
return self.__session_dir
@property
def keepAlive(self):
return self.__keep_alive
@property
def inputs(self):
return self.__inputs
@property
def session_id(self):
return self.__block_info.session_id
@property
def job_id(self):
return self.__block_info.job_id
@property
def job_info(self) -> JobDict:
return self.__block_info.job_info()
@property
def block_info(self) -> BlockDict:
return self.__block_info.block_dict()
@property
def node_id(self) -> str:
return self.__block_info.stacks[-1].get("node_id", None)
def __store_ref(self, handle: str):
return StoreKey(
executor=EXECUTOR_NAME,
handle=handle,
job_id=self.job_id,
session_id=self.session_id,
)
def __is_basic_type(self, value: Any) -> bool:
return isinstance(value, (int, float, str, bool))
def output(self, key: str, value: Any, done: bool = False):
"""
Output the value to the next block
key: str, the output key, should be defined in the block mode output definition, field name is handle
value: Any, the output value
done: bool, whether to mark the current block as completed
"""
v = value
if self.__outputs_def is not None:
output_def = self.__outputs_def.get(key)
if (
output_def is not None and output_def.is_var_handle() and not self.__is_basic_type(value) # Basic types are not stored in the store even if they are variables, they are directly passed as json content
):
ref = self.__store_ref(key)
self.__store[ref] = value
v = asdict(ref)
elif output_def is not None and output_def.is_bin_handle():
if not isinstance(value, bytes):
self.send_warning(
f"Output handle key: [{key}] is defined as binary, but the value is not bytes."
)
return
bin_file = f"{self.session_dir}/binary/{self.session_id}/{key}"
os.makedirs(os.path.dirname(bin_file), exist_ok=True)
try:
with open(bin_file, "wb") as f:
f.write(value)
except IOError as e:
self.send_warning(
f"Output handle key: [{key}] is defined as binary, but an error occurred while writing the file: {e}"
)
return
if os.path.exists(bin_file):
v = bin_file
else:
self.send_warning(
f"Output handle key: [{key}] is defined as binary, but the file was not written."
)
return
# If the passed key does not exist in the output definition, ignore it and do not send data. But done still takes effect.
if self.__outputs_def is not None and self.__outputs_def.get(key) is None:
self.send_warning(
f"Output handle key: [{key}] is not defined in the block output mode."
)
if done:
self.done()
return
node_result = {
"type": "BlockOutput",
"handle": key,
"output": v,
"done": done,
}
self.__mainframe.send(self.job_info, node_result)
if done:
self.done()
def done(self, error: str | None = None):
if self.__is_done:
self.send_warning("done has been called multiple times and will be ignored.")
return
self.__is_done = True
if error is None:
self.__mainframe.send(self.job_info, {"type": "BlockFinished"})
else:
self.__mainframe.send(
self.job_info, {"type": "BlockFinished", "error": error}
)
def send_message(self, payload):
self.__mainframe.report(
self.block_info,
{
"type": "BlockMessage",
"payload": payload,
},
)
def __dataframe(self, payload: PreviewPayload) -> PreviewPayload:
if isinstance(payload, DataFrame):
payload = { "type": "table", "data": payload }
if isinstance(payload, dict) and payload.get("type") == "table":
df = payload.get("data")
if isinstance(df, ShapeDataFrame):
row_count = df.shape[0]
if row_count <= 10:
data = df.to_dict(orient='split')
columns = data.get("columns", [])
rows = data.get("data", [])
elif isinstance(df, PartialDataFrame):
data_columns = loads(df.head(5).to_json(orient='split'))
columns = data_columns.get("columns", [])
rows_head = data_columns.get("data", [])
data_tail = loads(df.tail(5).to_json(orient='split'))
rows_tail = data_tail.get("data", [])
rows_dots = [["..."] * len(columns)]
rows = rows_head + rows_dots + rows_tail
else:
print("dataframe exceeds 10 rows but does not support head and tail")
return payload
data: TablePreviewData = { "rows": rows, "columns": columns, "row_count": row_count }
payload = { "type": "table", "data": data }
else:
print("dataframe does not support shape attribute")
return payload
def __matplotlib(self, payload: PreviewPayload) -> PreviewPayload:
# payload is a matplotlib Figure
if hasattr(payload, 'savefig'):
fig: Any = payload
buffer = BytesIO()
fig.savefig(buffer, format='png')
buffer.seek(0)
png = buffer.getvalue()
buffer.close()
url = f'data:image/png;base64,{b64encode(png).decode("utf-8")}'
payload = { "type": "image", "data": url }
return payload
def preview(self, payload: PreviewPayload):
payload = self.__dataframe(payload)
payload = self.__matplotlib(payload)
self.__mainframe.report(
self.block_info,
{
"type": "BlockPreview",
"payload": payload,
},
)
@throttle(0.3)
def report_progress(self, progress: float | int):
"""Report progress
This API is used to report the progress of the block, but it only affects the UI progress and not the real progress.
This API is throttled with a minimum interval of 0.3 seconds.
When you call this API for the first time, it will report the progress immediately. After calling it once, it will report the progress at the end of the throttle cycle.
| 0.25 s | 0.2 s |
First call Second call Third call 4 5 6 7 calls
| | | | | | |
| -------- 0.3 s -------- | -------- 0.3 s -------- |
Call Call Call
:param float | int progress: The progress of the block, the value should be between [0, 100].
"""
self.__mainframe.report(
self.block_info,
{
"type": "BlockProgress",
"rate": progress,
}
)
def report_log(self, line: str, stdio: str = "stdout"):
self.__mainframe.report(
self.block_info,
{
"type": "BlockLog",
"log": line,
stdio: stdio,
},
)
def log_json(self, payload):
self.__mainframe.report(
self.block_info,
{
"type": "BlockLogJSON",
"json": payload,
},
)
def send_warning(self, warning: str):
self.__mainframe.report(self.block_info, {"type": "BlockWarning", "warning": warning})
def send_error(self, error: str):
'''
Deprecated, please use error(error) instead.
Consider removing in the future.
'''
self.error(error)
def error(self, error: str):
self.__mainframe.send(self.job_info, {"type": "BlockError", "error": error})