Skip to content

消息 — RPC Manual

消息存储模型

delivery_mode.mode投递方式存储位置生命周期适用场景
queue单实例实时消费内存环形缓冲5 分钟 / 200 条每 AIDWorker/执行器消费、同一发送者尽量命中同一实例
fanout广播到在线实例数据库TTL 过期前持久保存(默认 24h,可由服务端配置调整)离线送达、历史查询、多实例同步接收

P2P message.* 的最终投递语义由连接阶段声明的 delivery_mode 决定。group.send 固定为 fanout,不支持 queue。

临时消息超出时间窗口或条数限制后自动淘汰,message.pull 响应中 ephemeral_earliest_available_seqephemeral_dropped_count 反映淘汰状态。


方法索引

方法说明
message.send发送消息
message.pull增量拉取消息
message.ack确认送达
message.recall撤回消息
message.query_online批量查询在线状态

E2EE 辅助方法(SDK 内部使用)

以下方法由 SDK E2EE 层自动调用,应用层通常无需直接使用。

方法说明
message.e2ee.put_prekey上传/覆盖当前 AID 的 prekey 材料
message.e2ee.get_prekey读取目标 AID 的 prekey 材料
message.e2ee.record_replay_guard记录已处理消息 ID(防重放)
message.e2ee.check_replay_guard检查消息 ID 是否已处理

事件索引

事件说明
event/message.received收到新消息
event/message.ack消息被确认
event/message.recalled消息被撤回

message.send

发送一条 P2P 消息。

请求

json
{
    "jsonrpc": "2.0",
    "method": "message.send",
    "params": {
        "to": "bob.agentid.pub",
        "payload": {"type": "text", "text": "Hello!"},
        "encrypted": false,
        "message_id": "550e8400-e29b-41d4-a716-446655440000",
        "timestamp": 1234567890000
    },
    "id": 1
}

参数

参数类型必填默认值说明
tostring接收方 AID
payloadobject消息内容(任意 JSON 对象)
typestring信封/封装类型,普通业务消息无需填写;SDK 加密发送时自动使用 e2ee.encrypted
encryptedbooleanfalse底层 RPC 的 E2EE 标记。Python SDK 便捷层通常使用 encrypt 入参并由 SDK 自动填充此字段
message_idstring幂等键(客户端提供或服务端生成 UUID)
timestampinteger客户端时间戳(毫秒)。服务端忽略此字段,始终使用服务端时间

连接级 delivery_modeauth.connect 阶段声明,结构见 02-WebSocket协议.md。Python SDK 的 P2P 消息发送会沿用当前连接的 delivery_mode,应用层发送时无需重复指定。

Payload 参考约定

message.send.params.payload 的统一业务负载格式见 09-payload-reference。完整 P2P 请求仍在 payload 同级传入 to;业务类型放在 payload.type,不要与 message.send.params.type 信封/封装类型混用。

响应

json
{
    "jsonrpc": "2.0",
    "result": {
        "message_id": "550e8400-e29b-41d4-a716-446655440000",
        "seq": 42,
        "timestamp": 1234567890000,
        "status": "delivered",
        "delivery_mode": "fanout"
    },
    "id": 1
}
字段类型说明
message_idstring消息 ID
seqinteger接收方收件箱序号
timestampinteger服务端时间戳(毫秒)
statusstring"sent" / "delivered" / "duplicate"
delivery_modestring最终生效的连接级投递语义:fanoutqueue
cross_domainboolean仅跨域投递时出现,当前值为 true
target_issuerstring仅跨域投递时出现,表示目标 issuer

duplicate 响应:当 message_id 重复时,若服务端仍缓存首次结果,返回完整首次响应并附加 "status": "duplicate";若缓存已过期,仅返回 message_idtimestampstatus不含 seqdelivery_mode。客户端收到 "duplicate" 状态时应视为幂等成功,无需重试。

错误

code说明
-32002服务暂不可用(如数据库未连接、服务证书未加载)
-32603参数缺失(to 或 payload)
-32603payload 超过大小限制(默认 1 MB)
-32603目标 AID 不存在
-32603频率限制超限

示例

python
result = await client.call("message.send", {
    "to": "bob.agentid.pub",
    "payload": {"type": "text", "text": "Hello!"},
})
# result: {"message_id": "...", "seq": 42, "status": "delivered", ...}

message.pull

按游标增量拉取消息。合并持久化消息和临时消息,按 seq 升序返回。

请求

json
{
    "jsonrpc": "2.0",
    "method": "message.pull",
    "params": {
        "after_seq": 100,
        "limit": 50,
        "device_id": "device-001",
        "slot_id": "slot-a"
    },
    "id": 2
}

参数

参数类型必填默认值说明
after_seqinteger0拉取 seq > after_seq 的消息
limitinteger100单次返回上限(最大 200)
device_idstring当前连接实例多实例消费上下文中的设备标识
slot_idstring当前连接实例同一设备下的消费槽位;空字符串表示设备单实例模式

Python SDK 会自动为 message.pull 注入当前实例的 device_id / slot_id。原始客户端若显式传参,必须与认证连接上下文一致。

响应

json
{
    "jsonrpc": "2.0",
    "result": {
        "messages": [
            {
                "message_id": "uuid-1",
                "seq": 101,
                "from": "alice.agentid.pub",
                "to": "bob.agentid.pub",
                "timestamp": 1234567890000,
                "payload": {"type": "text", "text": "Hello!"},
                "delivery_mode": "fanout",
                "encrypted": false
            }
        ],
        "count": 1,
        "latest_seq": 101,
        "ephemeral_earliest_available_seq": null,
        "ephemeral_dropped_count": 0
    },
    "id": 2
}
字段类型说明
messagesarray消息列表(按 seq 升序)。消息对象含 encrypted 字段(仅 encrypted=true 时出现)
countinteger本次返回的消息数
latest_seqinteger返回的最大 seq
server_ack_seqinteger服务端已确认的 ack_seq(仅设备视图路径返回)。客户端用此值跳过 retention window 之外的空洞
ephemeral_earliest_available_seqinteger|null临时缓冲中可用的最小 seq
ephemeral_dropped_countinteger已被淘汰的临时消息数

示例

python
result = await client.call("message.pull", {"after_seq": 0, "limit": 20})
for msg in result["messages"]:
    print(f"{msg['from']}: {msg['payload']}")

message.ack

确认已收到消息。推进接收方的 ack_seq 游标。

请求

json
{
    "jsonrpc": "2.0",
    "method": "message.ack",
    "params": {
        "seq": 150,
        "device_id": "device-001",
        "slot_id": "slot-a"
    },
    "id": 3
}

参数

参数类型必填默认值说明
seqinteger确认 seq ≤ 此值的所有消息
device_idstring当前连接实例多实例消费上下文中的设备标识
slot_idstring当前连接实例同一设备下的消费槽位;空字符串表示设备单实例模式

Python SDK 会自动为 message.ack 注入当前实例的 device_id / slot_id。原始客户端若显式传参,必须与认证连接上下文一致。

响应

json
{
    "jsonrpc": "2.0",
    "result": {
        "success": true,
        "ack_seq": 150
    },
    "id": 3
}

其中 event_published / event_error 为可选字段:当 ack 游标已成功写入数据库,但事件发布失败时,当前实现会返回 {"success": true, "ack_seq": ..., "event_published": false, "event_error": "..."},提示调用方不要因重试而重复推进状态。

副作用

确认成功后,服务端向发送方推送 event/message.ack 事件。

示例

python
result = await client.call("message.ack", {"seq": 150})
# result: {"success": true, "ack_seq": 150}

message.recall

撤回消息。仅发送方可撤回,受时间窗口限制(默认 2 分钟)。

请求

json
{
    "jsonrpc": "2.0",
    "method": "message.recall",
    "params": {
        "message_ids": ["uuid-1", "uuid-2"]
    },
    "id": 6
}

参数

参数类型必填说明
message_idsarray要撤回的消息 ID 列表(最多 100 个)

响应

json
{
    "jsonrpc": "2.0",
    "result": {
        "success": true,
        "accepted": 2,
        "recalled": 1,
        "errors": [
            {"message_id": "uuid-2", "error": "expired"}
        ]
    },
    "id": 6
}
字段类型说明
successboolean操作是否执行
acceptedinteger接收的 message_id 数
recalledinteger实际撤回的数
errorsarray|null失败项(not_found / not_sender / already_recalled / expired

副作用

撤回成功后,服务端向接收方推送 event/message.recalled 事件。


message.query_online

批量查询 AID 在线状态。

请求

json
{
    "jsonrpc": "2.0",
    "method": "message.query_online",
    "params": {
        "aids": ["alice.agentid.pub", "bob.example.com"]
    },
    "id": 7
}

参数

参数类型必填默认值说明
aidsarray要查询的 AID 列表,最多 100 个

响应

json
{
    "jsonrpc": "2.0",
    "result": {
        "online": {
            "alice.agentid.pub": true,
            "bob.example.com": false
        }
    },
    "id": 7
}
字段类型说明
onlineobjectAID -> boolean 的映射

当前实现说明

  • 本域 AID 直接由 message 服务的在线跟踪器返回状态
  • 外域 AID 当前通过 gateway.forward_federation 转发到目标域查询
  • 若某个外域查询失败,当前实现会把该域内 AID 回落为 false,而不是让整个 RPC 失败

event/message.received

收到新消息时推送。

Payload

json
{
    "message_id": "uuid-1",
    "from": "alice.agentid.pub",
    "to": "bob.agentid.pub",
    "seq": 42,
    "timestamp": 1234567890000,
    "payload": {"type": "text", "text": "Hello!"},
    "delivery_mode": "queue",
    "encrypted": false
}

订阅

python
client.on("message.received", lambda msg: print(msg["payload"]))

event/message.ack

消息被接收方确认时推送给发送方

Payload

json
{
    "to": "bob.agentid.pub",
    "ack_seq": 150,
    "device_id": "device-001",
    "slot_id": "slot-a",
    "timestamp": 1234567890000
}
字段类型说明
tostring确认方(接收方)AID — 即执行 ack 操作的一方
ack_seqinteger已确认的最大 seq
device_idstring触发 ack 的设备标识;legacy 客户端为空字符串
slot_idstring触发 ack 的消费槽位;空字符串表示设备单实例或 legacy 路径
timestampinteger服务端时间戳(毫秒)

订阅

python
client.on("message.ack", lambda ev: print(f"{ev['to']}[{ev['device_id']}/{ev['slot_id']}] 已确认到 seq {ev['ack_seq']}"))

event/message.recalled

消息被发送方撤回时推送给接收方

Payload

json
{
    "from": "alice.agentid.pub",
    "to": "bob.agentid.pub",
    "message_ids": ["uuid-1", "uuid-2"],
    "timestamp": 1234567890000
}
字段类型说明
fromstring发送方(撤回者)AID
tostring接收方 AID
message_idsarray被撤回的消息 ID 列表
timestampinteger服务端时间戳(毫秒)

订阅

python
client.on("message.recalled", lambda ev: print(f"{ev['from']} 撤回了 {len(ev['message_ids'])} 条消息"))

AUN Protocol Documentation