Python API

简介

开发者需要在 Scriptlet Block 中调用 SDK 与 OOCANA 工作流引擎进行交互。本文档详细介绍了相关 API 的使用方法。

快速开始

在 Scriptlet Block 中使用 SDK 时,需要导入 Context 对象,并通过该对象调用相关 API。

Scriptlet 模板

下面是 Scriptlet 的模板函数,需要在函数体内编写执行代码。函数的输入和输出类型可自定义,并且支持导入第三方库。 向后续 block 传递数据,可以像普通函数一样 return 一个字典即可。oocana 会将第一级的 key 作为对应的 handle 输出,value 做为这个 handle 的值进行输出。

from oocana import Context

def main(params: dict, context: Context):

# 此处编写需要执行的代码
# 例如,向下游传递一个 output 数据,值为 None。
return { "output": None }

核心 API

    def finish(self, *, result: Dict[str, Any] | None = None, error: str | None = None):
"""
结束当前块的执行,并将结果发送给 oocana。
如果 error 不为 None,则以错误状态结束当前块。
如果 result 不为 None,则以指定结果结束当前块。
如果 error 和 result 均为 None,则正常结束当前块且不返回结果。
"""

def output(self, key: str, value: Any):
"""
将数据输出到下游块。

key: str,输出的键名,需在块的输出定义(handle 字段)中声明
value: Any,输出的值
"""

def outputs(self, outputs: Dict[str, Any]):
"""
批量将数据输出到下游块。

outputs: Dict[str, Any],输出的键值对,键需在块的输出定义(handle 字段)中声明
"""

# 启用数据预览功能
def preview(self, payload: PreviewPayload):
# 获取会话临时目录路径,可用于存储临时文件
def session_dir(self) -> str:
# 报告执行进度(0-100),内置节流机制,每300ms触发一次
def report_progress(self, progress: float | int):

Context.preview 类型

context.preview 是一个核心 API,用于预览各种类型的数据。该 API 支持几乎所有常见数据格式的预览。

PreviewPayload 类型定义

函数签名:context.preview

    def preview(self, payload: PreviewPayload):
# 仅支持 Python
payload = self.__dataframe(payload)
# 仅支持 Python
payload = self.__matplotlib(payload)

self.__mainframe.report(
self.block_info,
{
"type": "BlockPreview",
"payload": payload,
},
)

参数类型:PreviewPayload


PreviewPayload: TypeAlias = Union[
TablePreviewPayload,
TextPreviewPayload,
JSONPreviewPayload,
ImagePreviewPayload,
MediaPreviewPayload,
DataFrame,
PandasPreviewPayload,
DefaultPreviewPayload
]

class TablePreviewData(TypedDict):
columns: List[str | int | float]
rows: List[List[str | int | float | bool]]
row_count: int | None

class TablePreviewPayload(TypedDict):
type: Literal['table']
data: TablePreviewData | Any

class TextPreviewPayload(TypedDict):
type: Literal["text"]
data: Any

class JSONPreviewPayload(TypedDict):
type: Literal["json"]
data: Any

class ImagePreviewPayload(TypedDict):
type: Literal['image']
data: str | List[str]

class MediaPreviewPayload(TypedDict):
type: Literal["image", 'video', 'audio', 'markdown', "iframe", "html"]
data: str

class PandasPreviewPayload(TypedDict):
type: Literal['table']
data: DataFrame

class DefaultPreviewPayload:
type: str
data: Any

Preview 使用示例

  context.preview({
# 支持的类型: "image" | "video" | "audio" | "markdown" | "iframe" | "html" | "json" | "text" | `text/${string}` | "table"
"type": "video",
# 数据可以是: 文件路径、base64 编码或 pandas DataFrame
"data": "https://www.w3schools.com/html/mov_bbb.mp4"
})
提示

以下功能仅支持 Python

    import pandas as pd

df = pd.DataFrame({
'A': [1, 2, 3],
'B': [4, 5, 6],
'C': [7, 8, 9]
})
context.preview(df)
    import matplotlib.pyplot as plt
import numpy as np

x = np.linspace(0, 10, 100)
y = np.sin(x)
plt.plot(x, y)
plt.show()
    import plotly.graph_objects as go

fig = go.Figure(data=[go.Bar(y=[2, 3, 1])])
fig.show()

Context 类型定义

OOCANA SDK 提供的完整 API 类型描述,帮助开发者准确理解和使用这些接口。

class OOMOL_LLM_ENV(TypedDict):
base_url: str
base_url_v1: str
api_key: str
models: list[str]

class HostInfo(TypedDict):
gpu_vendor: str
gpu_renderer: str

class Context:
def __init__(self, inputs: dict[str, Any], blockInfo: BlockInfo, mainframe: Mainframe, store, outputs, session_dir: str, tmp_dir: str, package_name: str, pkg_dir: str) -> None: ...
@property
def logger(self) -> logging.Logger: ...
@property
def session_dir(self) -> str: ...
@property
def tmp_dir(self) -> str: ...
@property
def tmp_pkg_dir(self) -> str: ...
@property
def pkg_dir(self) -> str: ...
@property
def keepAlive(self): ...
@property
def inputs(self): ...
@property
def session_id(self): ...
@property
def job_id(self): ...
@property
def job_info(self) -> JobDict: ...
@property
def block_info(self) -> BlockDict: ...
@property
def node_id(self) -> str: ...
@property
def oomol_llm_env(self) -> OOMOL_LLM_ENV: ...
@property
def host_info(self) -> HostInfo: ...
@property
def host_endpoint(self) -> str | None: ...
def output(self, key: str, value: Any): ...
"""
将数据输出到下游块

key: str,输出的键名,需在块的输出定义(handle 字段)中声明
value: Any,输出的值
"""
def outputs(self, outputs: dict[str, Any]): ...
"""
批量将数据输出到下游块

outputs: Dict[str, Any],输出的键值对,键需在块的输出定义(handle 字段)中声明
"""
def finish(self, *, result: dict[str, Any] | None = None, error: str | None = None): ...
"""
结束当前块的执行,并将结果发送给 oocana。
如果 error 不为 None,则以错误状态结束当前块。
如果 result 不为 None,则以指定结果结束当前块。
如果 error 和 result 均为 None,则正常结束当前块且不返回结果。
"""
def send_message(self, payload) -> None: ...
def preview(self, payload: PreviewPayload): ...
def report_progress(self, progress: float | int): ...
"""报告进度

此 API 用于报告块的执行进度,仅影响 UI 进度条,不影响实际进度。
此 API 内置节流机制,最小间隔为 0.3 秒。
第一次调用时会立即上报进度,之后每隔节流周期结束时才会上报一次。

| 0.25 秒 | 0.2 秒 |
第一次调用 第二次调用 第三次调用 第四五六七次调用
| | | | | | |
| -------- 0.3 秒 -------- | -------- 0.3 秒 -------- |
上报 上报 上报
:param float | int progress: 块的进度值,取值范围应为 [0, 100]。
"""
def report_log(self, line: str, stdio: str = 'stdout'): ...
def log_json(self, payload) -> None: ...
def send_warning(self, warning: str): ...
def send_error(self, error: str): ...
def error(self, error: str): ...