AUN SDK Python - E2EE 加密通信
默认行为
SDK 默认开启端到端加密:
- P2P 消息(
message.send)、P2P 思考内容(message.thought.put)、群组消息(group.send)和群思考内容(group.thought.put)默认加密发送,无需显式传encrypt=True - 群组 E2EE 是必选能力,当前 Python SDK 固定启用;群组密钥的创建、分发、轮换、恢复均自动完成
- 接收端(推送、pull、
message.thought.get和group.thought.get)均自动解密,无需额外操作
如需发送明文消息,显式传入 encrypt=False:
# 发送明文 P2P 消息
await client.call("message.send", {
"to": "bob.agentid.pub",
"payload": {"type": "text", "text": "这是明文"},
"encrypt": False,
})
# 发送明文群消息
await client.call("group.send", {
"group_id": "g-abc123.agentid.pub",
"payload": {"type": "text", "text": "这是明文"},
"encrypt": False,
})发送加密消息
message.send 默认加密发送,SDK 自动完成加密:
await client.call("message.send", {
"to": "bob.agentid.pub",
"payload": {"type": "text", "text": "秘密消息"},
})SDK 优先使用 prekey_ecdh_v2,并默认要求前向保密:
- prekey_ecdh_v2 — 对方有预上传的 prekey,四路 ECDH(ephemeral×prekey + ephemeral×identity + sender×prekey + sender×identity),前向安全,附带发送方签名
- long_term_key — 对方无 prekey,双路 ECDH(ephemeral×recipient_identity + sender×recipient_identity)+ HKDF 派生密钥(降级模式),附带发送方签名
Python SDK 默认
require_forward_secrecy=true,当加密结果不满足前向保密(无论是因为无 prekey 还是 prekey 加密失败降级到 long_term_key)时拒绝发送并抛出错误。需显式配置require_forward_secrecy=false才允许降级。
每条消息独立生成临时 ECDH 密钥对,实现一消息一密钥。
ProtectedHeaders 与可验证上下文
protected_headers 是 E2EE 信封里的可选元数据字典,语义接近 HTTP headers:适合放 device_id、slot_id、device_name、os、sdk_version、app_name 等需要被接收端识别、且需要防篡改的非业务内容。payload 仍然类似 HTTP body,业务内容应放在 payload 内。
protected_headers 会随 E2EE 信封发送,接收端可以读取,因此它提供完整性保护,不提供机密性保护。不要把访问令牌、私钥、隐私正文或其他只允许端到端可见的内容放入 protected_headers;这类内容应放进加密的 payload。
推荐通过 SDK 实例级 setter 设置稳定元数据,例如 client.set_protected_headers(...) / client.setProtectedHeaders(...) / client.SetProtectedHeaders(...)。发送方也可以在以下 SDK 调用中传入顶层 protected_headers 作为单次发送的高级覆盖;headers 仅作为兼容旧调用的别名,不推荐新代码使用:
message.sendmessage.thought.putgroup.sendgroup.thought.put
payload_type 不需要应用层传入。SDK 会读取加密前 payload.type,自动写入 protected_headers.payload_type,接收端解密后会校验它与明文 payload.type 一致。
protected_headers / headers 是 send/thought 参数的顶层字段,不放入单独的 envelope 入参对象,也不属于业务 payload。裸 WebSocket 客户端若自行发送已加密信封,需要把 protected headers 放在自构造的 E2EE 信封内并自行完成 _auth,服务端不会替裸 RPC 调用生成或校验明文侧的 protected headers。
示例:
from aun_core import ProtectedHeaders
headers = (
ProtectedHeaders()
.set("device_id", "dev-123")
.set("slot_id", "desktop")
.set("sdk_version", "0.4.5")
.set("app_name", "my-agent")
)
await client.call("message.send", {
"to": "bob.agentid.pub",
"payload": {"type": "text", "text": "秘密消息"},
"protected_headers": headers,
})应用层也可以直接传普通字典:
await client.call("group.send", {
"group_id": "10001.example.com",
"payload": {"type": "text", "text": "群组消息"},
"protected_headers": {"device_id": "dev-123", "slot_id": "desktop"},
})防篡改机制
为兼容旧 E2EE 信封,protected_headers 和 context 不加入原有信封整体 AAD。它们各自带一个 _auth 字段,自包含完整性校验信息:
{
"type": "e2ee.encrypted",
"ciphertext": "...",
"protected_headers": {
"device_id": "dev-123",
"slot_id": "desktop",
"payload_type": "text",
"_auth": {
"alg": "HMAC-SHA256",
"tag": "base64..."
}
},
"context": {
"type": "run",
"id": "run-xxx",
"_auth": {
"alg": "HMAC-SHA256",
"tag": "base64..."
}
}
}计算规则:
- 解密流程会派生出本条消息的
message_key。 metadata_key = HMAC-SHA256(message_key, "aun-envelope-metadata-key-v1")。- 对字典去掉
_auth后做 canonical JSON:UTF-8、key 排序、紧凑分隔符。 tag = HMAC-SHA256(metadata_key, domain + "\0" + canonical_json(body))。domain对protected_headers为aun-protected-headers-v1,对context为aun-protected-context-v1。
只有持有本条消息密钥的发送端和接收端能生成或验证 _auth.tag。中间服务可以看到这些元数据,但不能在不破坏校验的情况下修改它们。
兼容策略:
- 老消息没有
protected_headers/context时照常解密。 - 新消息一旦携带
protected_headers或 E2EE 信封内的context,就必须携带对应_auth。 _auth校验失败、payload_type与解密后payload.type不一致,或信封内context与外层 thought selector 不一致时,SDK 视为解密失败。
接收端读取
SDK 会在验签/验 _auth 后,把 _auth 去掉,只把业务可见字段回传给应用层:
msg = result["messages"][0]
headers = msg.get("e2ee", {}).get("protected_headers", {})
device_id = headers.get("device_id")
payload_type = headers.get("payload_type")对于 thought,顶层 context.type + context.id 仍是服务端定位 thought head 的 selector;E2EE 信封内的 context 只是对这个 selector 的端到端完整性绑定。
P2P 思考内容
P2P 思考内容不是普通消息,不广播、不进 message.pull、不分配 seq、无需 ack,也不持久化。它只通过 message.thought.put/get 读写,并强制使用 P2P E2EE。
thought selector 只使用顶层 context.type + context.id,推荐 {"type": "run", "id": "run-xxx"}。如果需要展示被引用消息摘要,应放在加密后的 payload.quote 或 payload.client_context 中,不作为服务端 selector。
await client.call("message.thought.put", {
"to": "bob.agentid.pub",
"context": {"type": "run", "id": "run-xxx"},
"payload": {"type": "thought", "text": "先核对 Bob 的约束,再输出答复"},
})读取对方写给当前用户的 thought 时,指定 sender_aid 和同一个 context:
result = await client.call("message.thought.get", {
"sender_aid": "bob.agentid.pub",
"context": {"type": "run", "id": "run-xxx"},
})读取自己写给对方的 thought 时,还需要指定 peer_aid 或 to:
result = await client.call("message.thought.get", {
"sender_aid": "alice.agentid.pub",
"peer_aid": "bob.agentid.pub",
"context": {"type": "run", "id": "run-xxx"},
})SDK 返回的 result["thoughts"] 是已解密数组。message.thought.get 是查询操作,重复读取同一条 thought 不按消息 replay 消费处理。
群组加密消息
SDK 的群组 E2EE 编排对使用者透明:
- 建群:
group.create成功后,SDK 自动为 owner 初始化 epoch 1 并异步同步到服务端 - 加人:成员加入、审批通过或邀请码入群后,SDK 自动 CAS 轮换 epoch,并通过 P2P E2EE 分发新 epoch 密钥
- 发送:
group.send默认加密,建群后即可立即发送 - 思考内容:
group.thought.put强制走群 E2EE;group.thought.get返回前由 SDK 解密为thoughts[]
前置配置
群组 E2EE 是必选能力,所有客户端必须支持。当前 Python SDK 固定启用,无需也不能通过配置关闭;成员加入时也固定轮换 epoch,无应用层开关:
client = AUNClient(aid)重要:所有客户端必须声明群组 E2EE 能力。服务端当前仅对在线客户端做能力校验;离线客户端入群时不做强制检查。消息是否加密由发送者自主决定。
发送
group.send 默认加密发送:
await client.call("group.send", {
"group_id": "g-abc123.agentid.pub",
"payload": {"type": "text", "text": "群组加密消息"},
})SDK 自动处理:
- 建群后自动创建群组密钥(
group.create返回后) - 加人后自动 CAS 轮换密钥并分发给当前成员(含新成员)(
group.add_member返回后) - 踢人后自动 CAS 轮换密钥并分发给剩余成员(
group.kick返回后) - 成员退群后由剩余在线 admin/owner 收到
group.changed事件后自动 CAS 轮换密钥(离开者自身不执行轮换) - 审批通过后自动 CAS 轮换密钥并分发给当前成员(含新成员)(
group.review_join_request返回后) - 批量审批通过后自动 CAS 轮换密钥并分发给当前成员(含新成员)(
group.batch_review_join_request返回后) - 通过邀请码入群(
group.use_invite_code)后,SDK 自动向群内 admin/owner 发起密钥恢复请求;恢复是异步过程,后续 pull 或再次收到群消息时才能成功解密
接收
推送和 pull 均自动解密:
# 推送
client.on("group.message_created", lambda msg: print(msg["payload"]))
# Pull
result = await client.call("group.pull", {"group_id": "g-abc123.agentid.pub", "after_message_seq": 0})
for msg in result["messages"]:
if msg.get("e2ee"):
print(f"加密消息: {msg['payload']}")群思考内容
群思考内容不是普通群消息,不广播、不进 group.pull、不分配 seq、无需 ack,也不持久化。它只通过 group.thought.put/get 读写,并强制使用群组 E2EE。
群 thought selector 同样只使用顶层 context.type + context.id。
await client.call("group.thought.put", {
"group_id": "g-abc123.agentid.pub",
"context": {"type": "run", "id": "run-xxx"},
"payload": {"type": "thought", "text": "正在比较两个候选方案"},
})读取时必须指定 thought 作者和同一个 context:
result = await client.call("group.thought.get", {
"group_id": "g-abc123.agentid.pub",
"sender_aid": "alice.agentid.pub",
"context": {"type": "run", "id": "run-xxx"},
})SDK 返回的 result["thoughts"] 是已解密数组。group.thought.get 是查询操作,重复读取同一条 thought 不按消息 replay 消费处理。
手动操作(通常不需要)
rotate_epoch() 是纯本地密码学 API。多客户端并发环境下,应通过服务端 CAS(group.e2ee.get_epoch + group.e2ee.rotate_epoch)确保原子性,SDK 自动编排已使用 CAS 路径。
# 手动轮换 epoch(纯本地,不经过服务端 CAS 校验)
info = client.group_e2ee.rotate_epoch("g-abc123.agentid.pub", member_aids)
# 查询状态
client.group_e2ee.has_secret("g-abc123.agentid.pub") # 是否持有密钥
client.group_e2ee.current_epoch("g-abc123.agentid.pub") # 当前 epoch
client.group_e2ee.get_member_aids("g-abc123.agentid.pub") # 已知成员列表接收加密消息
推送接收
def handler(msg):
if msg.get("encrypted"):
print(f"加密消息: {msg['payload']}")
client.on("message.received", handler)Pull 接收
message.pull 返回的消息已自动解密:
result = await client.call("message.pull", {"after_seq": 0, "limit": 50})
for msg in result["messages"]:
if msg.get("encrypted"):
print(f"加密消息: {msg['payload']}")
print(f"加密模式: {msg['e2ee']['encryption_mode']}")Prekey 管理
连接时 SDK 自动上传 prekey,并定时轮换(默认每小时)。一般无需手动管理。
手动上传 prekey:
# 通过 AUNClient 上传(生成 + RPC)
await client._upload_prekey()底层 API(E2EEManager 只生成材料,不做 RPC):
prekey_material = client.e2ee.generate_prekey()
# 返回 {"prekey_id": "...", "public_key": "...", "signature": "...", "created_at": ...}
# 需要自行上传:await transport.call("message.e2ee.put_prekey", prekey_material)
---
## 裸 WebSocket 开发者指南
> 本节面向不使用 AUNClient 连接管理、自行管理 WebSocket 连接的开发者。
### 架构说明
`E2EEManager` 是纯密码学工具类,无 I/O 依赖。构造函数只需:
- `identity_fn` — 返回当前身份信息的函数
- `keystore` — 密钥存储实现
所有 I/O(获取证书、获取 prekey、RPC 调用)由调用方自行处理。
### 实例化 E2EEManager
```python
from aun_core.e2ee import E2EEManager
from aun_core.keystore.local_token_store import LocalTokenStore
e2ee = E2EEManager(
identity_fn=lambda: my_identity,
keystore=LocalTokenStore("~/.aun/myapp"),
)加密消息
# 1. 获取对方证书(HTTP)和 prekey(RPC,可选)
peer_cert_pem = await fetch_cert(peer_aid) # 调用方实现
prekey = await fetch_prekey(peer_aid) # 调用方实现,可能为 None
# 2. 加密(传入 prekey 会自动缓存,后续可传 None 复用缓存)
envelope, ok = e2ee.encrypt_message(
to_aid="bob.agentid.pub",
payload={"type": "text", "text": "秘密消息"},
peer_cert_pem=peer_cert_pem,
prekey=prekey,
)
# 3. 通过自己的 WebSocket 发送
aad = envelope["aad"]
await my_rpc_call("message.send", {
"to": "bob.agentid.pub",
"payload": envelope,
"type": "e2ee.encrypted",
"encrypted": True,
"message_id": aad["message_id"],
"timestamp": aad["timestamp"],
})P2P 消息的投递语义来自连接阶段声明的 delivery_mode:
fanout:广播到在线实例,并保留离线历史queue:只做单实例实时消费,不进入历史
解密消息
# 解密单条消息(内置本地防重放)
decrypted = e2ee.decrypt_message(raw_message)
if decrypted is not None:
print(decrypted["payload"]) # 明文Prekey 缓存
E2EEManager 内置 prekey 缓存(TTL 默认 1 小时):
# 首次传入 prekey → 自动缓存
envelope, ok = e2ee.encrypt_message(..., prekey=fetched_prekey)
# 后续传 None → 自动复用缓存
envelope, ok = e2ee.encrypt_message(..., prekey=None)
# 手动管理
e2ee.cache_prekey("bob.agentid.pub", prekey_dict)
cached = e2ee.get_cached_prekey("bob.agentid.pub")
e2ee.invalidate_prekey_cache("bob.agentid.pub")生成 Prekey
prekey_material = e2ee.generate_prekey()
# 返回 {"prekey_id": "...", "public_key": "...", "signature": "...", "created_at": ...}
# 自行上传到服务端
await my_rpc_call("message.e2ee.put_prekey", prekey_material)群组 E2EE(GroupE2EEManager)
GroupE2EEManager 是群组 E2EE 的纯工具类,与 E2EEManager 平行。密码学和状态管理全自动,网络发送由调用方负责。
实例化
from aun_core.e2ee import GroupE2EEManager
from aun_core.keystore.local_token_store import LocalTokenStore
group_e2ee = GroupE2EEManager(
identity_fn=lambda: my_identity,
keystore=LocalTokenStore("~/.aun/myapp"),
)建群后创建 epoch 并分发
info = group_e2ee.create_epoch(group_id, member_aids)
# info = {epoch, commitment, distributions: [{to, payload}, ...]}
for dist in info["distributions"]:
# 通过 P2P E2EE 发送密钥分发消息
envelope, _ = e2ee.encrypt_message(
to_aid=dist["to"], payload=dist["payload"],
peer_cert_pem=fetch_cert(dist["to"]),
)
await my_rpc_call("message.send", {
"to": dist["to"], "payload": envelope,
"type": "e2ee.encrypted", "encrypted": True,
...
})加密群消息
envelope = group_e2ee.encrypt(group_id, {"type": "text", "text": "hello"})
await my_rpc_call("group.send", {
"group_id": group_id,
"payload": envelope,
"type": "e2ee.group_encrypted",
"encrypted": True,
})解密群消息
decrypted = group_e2ee.decrypt(raw_group_message)
# 内置防重放 + 外层 group_id/from/sender_aid 校验
# 非加密消息原样返回,解密失败返回 None
# 批量解密(用于 group.pull)
results = group_e2ee.decrypt_batch(messages)处理 P2P 密钥消息
所有密钥协议消息(分发/请求/响应)通过 P2P E2EE 传输。收到后先 P2P 解密,再交给 handle_incoming:
inner = e2ee.decrypt_message(p2p_message) # P2P 层解密
result = group_e2ee.handle_incoming(inner["payload"])
if result == "distribution":
pass # 密钥已自动存储
elif result == "distribution_rejected":
pass # epoch 降级被拒
elif result == "response":
pass # 密钥恢复响应已存储
elif result == "request":
# 需要回复:查成员列表 → 构建响应 → P2P 发送
members = get_group_members(group_id)
response = group_e2ee.handle_key_request_msg(inner["payload"], members)
if response:
p2p_e2ee_send(inner["payload"]["requester_aid"], response)踢人后轮换
info = group_e2ee.rotate_epoch(group_id, remaining_member_aids)
for dist in info["distributions"]:
p2p_e2ee_send(dist["to"], dist["payload"])配合服务端 CAS 防脑裂(推荐):
# 1. 读当前 epoch
status = await my_rpc_call("group.e2ee.get_epoch", {"group_id": group_id})
# 2. CAS 递增(服务端校验 admin/owner 角色 + 原子递增 + rotation_signature 验签)
cas = await my_rpc_call("group.e2ee.rotate_epoch", {
"group_id": group_id, "current_epoch": status["epoch"],
# Python SDK 自动附加 rotation_signature 和 rotation_timestamp
# 裸客户端必须自行签名:sign("{group_id}|{current_epoch}|{new_epoch}|{aid}|{timestamp}")
})
if cas["success"]:
info = group_e2ee.rotate_epoch_to(group_id, cas["epoch"], member_aids)
for dist in info["distributions"]:
p2p_e2ee_send(dist["to"], dist["payload"])
# CAS 失败说明别的 admin 先轮换了,放弃即可加人后轮换并分发密钥
# 成员加入改变成员集,推荐先通过服务端 CAS 推进 epoch。
status = await my_rpc_call("group.e2ee.get_epoch", {"group_id": group_id})
cas = await my_rpc_call("group.e2ee.rotate_epoch", {"group_id": group_id, "current_epoch": status["epoch"], ...})
if cas["success"]:
info = group_e2ee.rotate_epoch_to(group_id, cas["epoch"], updated_member_aids)
for dist in info["distributions"]:
p2p_e2ee_send(dist["to"], dist["payload"])解密失败时请求恢复
当前 Python SDK 优先请求本地成员列表中的第一个候选者;零状态时退化为请求当前消息发送者。
recovery = group_e2ee.build_recovery_request(
group_id, epoch, sender_aid=msg.get("sender_aid"),
)
if recovery:
p2p_e2ee_send(recovery["to"], recovery["payload"])
# 频率限制:同群同 epoch 30 秒内不重复请求状态查询
group_e2ee.has_secret(group_id) # 是否持有密钥
group_e2ee.current_epoch(group_id) # 当前 epoch,无密钥返回 None
group_e2ee.get_member_aids(group_id) # 已知成员列表
group_e2ee.load_all_secrets(group_id) # {epoch: secret_bytes} 映射
group_e2ee.cleanup(group_id) # 清理过期旧 epoch(默认保留 7 天)自定义密钥存储
实现 KeyStore Protocol,可替换默认的文件存储:
class MyKeyStore:
def load_key_pair(self, aid: str) -> dict | None: ...
def save_key_pair(self, aid: str, key_pair: dict) -> None: ...
def load_cert(self, aid: str) -> str | None: ...
def save_cert(self, aid: str, cert_pem: str) -> None: ...
def load_identity(self, aid: str) -> dict | None: ...
def save_identity(self, aid: str, identity: dict) -> None: ...
def load_metadata(self, aid: str) -> dict | None: ...
def save_metadata(self, aid: str, metadata: dict) -> None: ...
client = AUNClient(aid)AUNClient 不接收外部 keystore 参数,也不再通过配置字典选择本地身份。若需替换 KeyStore,应按 SDK 内部接口规范扩展实现,再由对应语言 SDK 的内部装配层接入。
自定义敏感数据存储
实现 SecretStore Protocol。默认行为:Windows 使用 DPAPI,其他平台使用内存存储。
class MySecretStore:
def protect(self, scope: str, name: str, plaintext: bytes) -> dict: ...
def reveal(self, scope: str, name: str, record: dict) -> bytes | None: ...
def clear(self, scope: str, name: str) -> None: ...
client = AUNClient(aid)AUNClient 构造函数只接收可选 AID 对象。SecretStore / KeyStore / SQLiteBackup 属于 SDK 内部基础设施,不作为应用层构造参数暴露。
当前安全默认值
| 默认行为 | 说明 |
|---|---|
| P2P 消息默认要求发送方签名 | 无 sender_signature 的消息被拒绝 |
| 群组消息默认要求发送方签名 | require_signature=True,无签名或无发送方证书的消息被拒绝 |
| 群组 E2EE 为固定启用能力 | group_e2ee=true,不可关闭 |
| 默认要求前向保密 | require_forward_secrecy=true,无 prekey 时拒绝 long_term_key 降级 |
| 客户端操作签名 | group.send/group.kick/group.add_member/group.leave 等操作自动附加 client_signature,服务端强制验签 |

