Skip to content

实时流 — RPC Manual

方法索引

控制面方法(通过 Gateway JSON-RPC)

方法说明
stream.create创建流,返回推流/拉流 URL
stream.close关闭流(仅创建者)
stream.get_info获取流状态和统计
stream.list_active列出当前 AID 的活跃流

数据面端点(独立端口,默认 9490)

端点协议说明
/push/{stream_id}WebSocket推流。推荐使用 Authorization: Bearer {push_token},query token 仅为兼容旧客户端
/pull/{stream_id}HTTP SSE拉流。推荐使用 Authorization: Bearer {pull_token},可跨域
/healthHTTP GET健康检查

stream.create

创建一条新的流,返回推流和拉流地址。推流方通过 WebSocket 连接 push_url 发送数据帧,拉流方通过 HTTP SSE 连接 pull_url 接收数据。

参数

参数类型必填说明
content_typestring内容类型,默认 "text/plain"。常用值:text/plain(文本流)、application/json-stream(JSON 对象流)、text/event-stream(SSE 风格事件流)
metadataobject自定义元数据,如 {"model": "gpt-4", "task_id": "xxx"}
target_aidstring绑定拉流方 AID。当前实现中,只有拉流方显式提供 aid 时才会做该匹配校验

响应

字段类型说明
stream_idstring流唯一 ID(16 位 hex)
push_urlstring推流 WebSocket URL(含 push_token)
pull_urlstring拉流 HTTP SSE URL(含 pull_token)
push_tokenstring推流凭证
pull_tokenstring拉流凭证,便于通过 message.send 单独传递
push_headersobject推流 Authorization header({"Authorization": "Bearer {push_token}"})
pull_headersobject拉流 Authorization header({"Authorization": "Bearer {pull_token}"})

调用示例

python
result = await client.call("stream.create", {
    "content_type": "text/plain",
    "metadata": {"model": "gpt-4"},
})
# result = {
#   "stream_id": "4d5067f203cf42ba",
#   "push_url": "wss://stream.aid.com:9490/push/4d5067f203cf42ba?token=ec80...",
#   "pull_url": "https://stream.aid.com:9490/pull/4d5067f203cf42ba?token=c438...",
#   "push_token": "ec80...",
#   "pull_token": "c438953be0ca887b...",
#   "push_headers": {"Authorization": "Bearer ec80..."},
#   "pull_headers": {"Authorization": "Bearer c438..."}
# }

当前实现中,push_url / pull_url 仍会附带 query token 以兼容旧客户端;新客户端优先使用返回的 push_headers / pull_headers

错误

codemessage原因
-33402Stream limit exceeded活跃流数超过服务配置上限

stream.close

关闭流。仅流的创建者可调用。关闭后所有拉流端收到 SSE event: done

参数

参数类型必填说明
stream_idstring要关闭的流 ID

响应

字段类型说明
successbooleantrue

调用示例

python
await client.call("stream.close", {"stream_id": "4d5067f203cf42ba"})

错误

codemessage原因
-33405Stream invalid params缺少 stream_id
-33403Stream permission denied非创建者调用

流不存在时幂等返回 {"success": true},不抛错误。


stream.get_info

获取流的状态和统计信息。仅流的创建者或 target_aid 可调用。

参数

参数类型必填说明
stream_idstring流 ID

响应

字段类型说明
stream_idstring流 ID
creator_aidstring创建者 AID
content_typestring内容类型
metadataobject自定义元数据
statusstring"waiting" / "active" / "done"
is_onlineboolean推流端是否在线
seqinteger当前最大序列号
frames_pushedinteger已推送帧数
bytes_pushedinteger已推送字节数
puller_countinteger当前拉流端数量
age_secondsfloat流存活时间(秒)
idle_secondsfloat距最近活动的秒数

调用示例

python
info = await client.call("stream.get_info", {"stream_id": "4d5067f203cf42ba"})
# info["status"] == "active"
# info["frames_pushed"] == 42

stream.list_active

列出当前 AID 创建的所有活跃流。需要有效认证身份。

参数

无。

响应

字段类型说明
streamsarrayStreamInfo 对象数组(同 get_info 响应格式)

调用示例

python
result = await client.call("stream.list_active", {})
for s in result["streams"]:
    print(f"{s['stream_id']}: {s['status']}, {s['frames_pushed']} frames")

数据面:推流 WebSocket

连接 stream.create 返回的 push_url,通过 WebSocket 发送 JSON 帧。当前实现优先从 Authorization: Bearer 读取 token,query string 仅作兼容回退。

帧格式

数据帧

json
{"cmd": "data", "data": "chunk内容", "seq": 1}
字段类型必填说明
cmdstring固定 "data"
datastring数据内容,无大小限制(WS 帧上限 64MB)
seqinteger序列号,不提供则服务端自增

关闭帧

json
{"cmd": "close"}

完整示例

python
import websockets, json

async with websockets.connect(push_url, ssl=ssl_ctx) as ws:
    for i, token in enumerate(llm_tokens, 1):
        await ws.send(json.dumps({"cmd": "data", "data": token, "seq": i}))
    await ws.send(json.dumps({"cmd": "close"}))

断线重连

WebSocket 断开后,服务端保留流状态最多 120 秒。重连后继续从断点 seq 推送即可。


数据面:拉流 HTTP SSE

连接 stream.create 返回的 pull_url,接收标准 SSE 流。

当前实现优先从 Authorization: Bearer 读取 pull token;若提供拉流方 AID,优先使用 X-Stream-AID 请求头,query 参数仅作兼容回退。

SSE 格式

data: Hello 

data: World

event: done
data: {}
  • data: — 原始数据内容
  • event: done — 流结束信号
  • : keep-alive — 心跳注释(每 10 秒)

断线续拉

当前实现支持标准 SSE 的 Last-Event-ID 续拉:

  • 服务端在每个 SSE 数据块中写入 id: {seq}
  • 客户端重连时可携带 Last-Event-ID
  • 服务端会跳过 seq <= Last-Event-ID 的缓冲数据,再继续实时推送

注意:这只覆盖仍保留在当前流内存缓冲中的历史数据,不是持久化重放。 如果请求的 seq 早于当前内存缓冲最小值,服务端不会返回显式 gap 错误,而是从仍保留的最早帧继续推送。

完整示例

python
import aiohttp

async with aiohttp.ClientSession() as session:
    async with session.get(pull_url, headers={"Accept": "text/event-stream"}) as resp:
        buffer = ""
        async for chunk in resp.content.iter_any():
            buffer += chunk.decode()
            # 解析 SSE 帧...

HTTP 错误码

状态码说明
403pull_token 无效,或显式提供的 aid 与 target_aid 不匹配
404流不存在
410流已关闭
429拉流端数量已达上限

控制面错误码汇总

codemessage说明
-33401Stream not foundstream_id 无效或流已被清理
-33402Stream limit exceeded活跃流数超过上限
-33403Stream permission denied非创建者执行受限操作
-33404Stream already closed流已关闭
-33405Stream invalid params参数无效(如缺少 stream_id)
-33406Stream rate limited速率限制
-33407Stream internal error服务内部错误

AUN Protocol Documentation