AUN SDK Python - E2EE 加密通信
默认行为
SDK 默认开启端到端加密:
- P2P 消息(
message.send)和群组消息(group.send)默认加密发送,无需显式传encrypt=True - 群组 E2EE 是必选能力,当前 Python SDK 固定启用;群组密钥的创建、分发、轮换、恢复均自动完成
- 接收端(推送和 pull)均自动解密,无需额外操作
如需发送明文消息,显式传入 encrypt=False:
# 发送明文 P2P 消息
await client.call("message.send", {
"to": "bob.agentid.pub",
"payload": {"type": "text", "text": "这是明文"},
"encrypt": False,
})
# 发送明文群消息
await client.call("group.send", {
"group_id": "g-abc123.agentid.pub",
"payload": {"type": "text", "text": "这是明文"},
"encrypt": False,
})发送加密消息
message.send 默认加密发送,SDK 自动完成加密:
await client.call("message.send", {
"to": "bob.agentid.pub",
"payload": {"type": "text", "text": "秘密消息"},
})SDK 优先使用 prekey_ecdh_v2,并默认要求前向保密:
- prekey_ecdh_v2 — 对方有预上传的 prekey,四路 ECDH(ephemeral×prekey + ephemeral×identity + sender×prekey + sender×identity),前向安全,附带发送方签名
- long_term_key — 对方无 prekey,双路 ECDH(ephemeral×recipient_identity + sender×recipient_identity)+ HKDF 派生密钥(降级模式),附带发送方签名
Python SDK 默认
require_forward_secrecy=true,无 prekey 时拒绝 long_term_key 降级并抛出错误。需显式配置require_forward_secrecy=false才允许降级。
每条消息独立生成临时 ECDH 密钥对,实现一消息一密钥。
群组加密消息
SDK 的群组 E2EE 编排对使用者透明:
- 建群:
group.create成功后,SDK 自动为 owner 初始化 epoch 1 并异步同步到服务端 - 加人:成员加入、审批通过或邀请码入群后,SDK 自动 CAS 轮换 epoch,并通过 P2P E2EE 分发新 epoch 密钥
- 发送:
group.send默认加密,建群后即可立即发送
前置配置
群组 E2EE 是必选能力,所有客户端必须支持。当前 Python SDK 固定启用,无需也不能通过配置关闭;成员加入时也固定轮换 epoch,无应用层开关:
client = AUNClient({})重要:所有客户端必须声明群组 E2EE 能力。服务端当前仅对在线客户端做能力校验;离线客户端入群时不做强制检查。消息是否加密由发送者自主决定。
发送
group.send 默认加密发送:
await client.call("group.send", {
"group_id": "g-abc123.agentid.pub",
"payload": {"type": "text", "text": "群组加密消息"},
})SDK 自动处理:
- 建群后自动创建群组密钥(
group.create返回后) - 加人后自动 CAS 轮换密钥并分发给当前成员(含新成员)(
group.add_member返回后) - 踢人后自动 CAS 轮换密钥并分发给剩余成员(
group.kick返回后) - 成员退群后由剩余在线 admin/owner 收到
group.changed事件后自动 CAS 轮换密钥(离开者自身不执行轮换) - 审批通过后自动 CAS 轮换密钥并分发给当前成员(含新成员)(
group.review_join_request返回后) - 批量审批通过后自动 CAS 轮换密钥并分发给当前成员(含新成员)(
group.batch_review_join_request返回后) - 通过邀请码入群(
group.use_invite_code)后,SDK 自动向群内 admin/owner 发起密钥恢复请求;恢复是异步过程,后续 pull 或再次收到群消息时才能成功解密
接收
推送和 pull 均自动解密:
# 推送
client.on("group.message_created", lambda msg: print(msg["payload"]))
# Pull
result = await client.call("group.pull", {"group_id": "g-abc123.agentid.pub", "after_message_seq": 0})
for msg in result["messages"]:
if msg.get("e2ee"):
print(f"加密消息: {msg['payload']}")手动操作(通常不需要)
rotate_epoch() 是纯本地密码学 API。多客户端并发环境下,应通过服务端 CAS(group.e2ee.get_epoch + group.e2ee.rotate_epoch)确保原子性,SDK 自动编排已使用 CAS 路径。
# 手动轮换 epoch(纯本地,不经过服务端 CAS 校验)
info = client.group_e2ee.rotate_epoch("g-abc123.agentid.pub", member_aids)
# 查询状态
client.group_e2ee.has_secret("g-abc123.agentid.pub") # 是否持有密钥
client.group_e2ee.current_epoch("g-abc123.agentid.pub") # 当前 epoch
client.group_e2ee.get_member_aids("g-abc123.agentid.pub") # 已知成员列表接收加密消息
推送接收
def handler(msg):
if msg.get("encrypted"):
print(f"加密消息: {msg['payload']}")
client.on("message.received", handler)Pull 接收
message.pull 返回的消息已自动解密:
result = await client.call("message.pull", {"after_seq": 0, "limit": 50})
for msg in result["messages"]:
if msg.get("encrypted"):
print(f"加密消息: {msg['payload']}")
print(f"加密模式: {msg['e2ee']['encryption_mode']}")Prekey 管理
连接时 SDK 自动上传 prekey,并定时轮换(默认每小时)。一般无需手动管理。
手动上传 prekey:
# 通过 AUNClient 上传(生成 + RPC)
await client._upload_prekey()底层 API(E2EEManager 只生成材料,不做 RPC):
prekey_material = client.e2ee.generate_prekey()
# 返回 {"prekey_id": "...", "public_key": "...", "signature": "...", "created_at": ...}
# 需要自行上传:await transport.call("message.e2ee.put_prekey", prekey_material)
---
## 裸 WebSocket 开发者指南
> 本节面向不使用 AUNClient 连接管理、自行管理 WebSocket 连接的开发者。
### 架构说明
`E2EEManager` 是纯密码学工具类,无 I/O 依赖。构造函数只需:
- `identity_fn` — 返回当前身份信息的函数
- `keystore` — 密钥存储实现
所有 I/O(获取证书、获取 prekey、RPC 调用)由调用方自行处理。
### 实例化 E2EEManager
```python
from aun_core.e2ee import E2EEManager
from aun_core.keystore.file import FileKeyStore
e2ee = E2EEManager(
identity_fn=lambda: my_identity,
keystore=FileKeyStore("~/.aun/myapp"),
)加密消息
# 1. 获取对方证书(HTTP)和 prekey(RPC,可选)
peer_cert_pem = await fetch_cert(peer_aid) # 调用方实现
prekey = await fetch_prekey(peer_aid) # 调用方实现,可能为 None
# 2. 加密(传入 prekey 会自动缓存,后续可传 None 复用缓存)
envelope, ok = e2ee.encrypt_message(
to_aid="bob.agentid.pub",
payload={"type": "text", "text": "秘密消息"},
peer_cert_pem=peer_cert_pem,
prekey=prekey,
)
# 3. 通过自己的 WebSocket 发送
aad = envelope["aad"]
await my_rpc_call("message.send", {
"to": "bob.agentid.pub",
"payload": envelope,
"type": "e2ee.encrypted",
"encrypted": True,
"message_id": aad["message_id"],
"timestamp": aad["timestamp"],
})P2P 消息的投递语义来自连接阶段声明的 delivery_mode:
fanout:广播到在线实例,并保留离线历史queue:只做单实例实时消费,不进入历史
解密消息
# 解密单条消息(内置本地防重放)
decrypted = e2ee.decrypt_message(raw_message)
if decrypted is not None:
print(decrypted["payload"]) # 明文Prekey 缓存
E2EEManager 内置 prekey 缓存(TTL 默认 1 小时):
# 首次传入 prekey → 自动缓存
envelope, ok = e2ee.encrypt_message(..., prekey=fetched_prekey)
# 后续传 None → 自动复用缓存
envelope, ok = e2ee.encrypt_message(..., prekey=None)
# 手动管理
e2ee.cache_prekey("bob.agentid.pub", prekey_dict)
cached = e2ee.get_cached_prekey("bob.agentid.pub")
e2ee.invalidate_prekey_cache("bob.agentid.pub")生成 Prekey
prekey_material = e2ee.generate_prekey()
# 返回 {"prekey_id": "...", "public_key": "...", "signature": "...", "created_at": ...}
# 自行上传到服务端
await my_rpc_call("message.e2ee.put_prekey", prekey_material)群组 E2EE(GroupE2EEManager)
GroupE2EEManager 是群组 E2EE 的纯工具类,与 E2EEManager 平行。密码学和状态管理全自动,网络发送由调用方负责。
实例化
from aun_core.e2ee import GroupE2EEManager
from aun_core.keystore.file import FileKeyStore
group_e2ee = GroupE2EEManager(
identity_fn=lambda: my_identity,
keystore=FileKeyStore("~/.aun/myapp"),
)建群后创建 epoch 并分发
info = group_e2ee.create_epoch(group_id, member_aids)
# info = {epoch, commitment, distributions: [{to, payload}, ...]}
for dist in info["distributions"]:
# 通过 P2P E2EE 发送密钥分发消息
envelope, _ = e2ee.encrypt_message(
to_aid=dist["to"], payload=dist["payload"],
peer_cert_pem=fetch_cert(dist["to"]),
)
await my_rpc_call("message.send", {
"to": dist["to"], "payload": envelope,
"type": "e2ee.encrypted", "encrypted": True,
...
})加密群消息
envelope = group_e2ee.encrypt(group_id, {"type": "text", "text": "hello"})
await my_rpc_call("group.send", {
"group_id": group_id,
"payload": envelope,
"type": "e2ee.group_encrypted",
"encrypted": True,
})解密群消息
decrypted = group_e2ee.decrypt(raw_group_message)
# 内置防重放 + 外层 group_id/from/sender_aid 校验
# 非加密消息原样返回,解密失败返回 None
# 批量解密(用于 group.pull)
results = group_e2ee.decrypt_batch(messages)处理 P2P 密钥消息
所有密钥协议消息(分发/请求/响应)通过 P2P E2EE 传输。收到后先 P2P 解密,再交给 handle_incoming:
inner = e2ee.decrypt_message(p2p_message) # P2P 层解密
result = group_e2ee.handle_incoming(inner["payload"])
if result == "distribution":
pass # 密钥已自动存储
elif result == "distribution_rejected":
pass # epoch 降级被拒
elif result == "response":
pass # 密钥恢复响应已存储
elif result == "request":
# 需要回复:查成员列表 → 构建响应 → P2P 发送
members = get_group_members(group_id)
response = group_e2ee.handle_key_request_msg(inner["payload"], members)
if response:
p2p_e2ee_send(inner["payload"]["requester_aid"], response)踢人后轮换
info = group_e2ee.rotate_epoch(group_id, remaining_member_aids)
for dist in info["distributions"]:
p2p_e2ee_send(dist["to"], dist["payload"])配合服务端 CAS 防脑裂(推荐):
# 1. 读当前 epoch
status = await my_rpc_call("group.e2ee.get_epoch", {"group_id": group_id})
# 2. CAS 递增(服务端校验 admin/owner 角色 + 原子递增 + rotation_signature 验签)
cas = await my_rpc_call("group.e2ee.rotate_epoch", {
"group_id": group_id, "current_epoch": status["epoch"],
# Python SDK 自动附加 rotation_signature 和 rotation_timestamp
# 裸客户端必须自行签名:sign("{group_id}|{current_epoch}|{new_epoch}|{aid}|{timestamp}")
})
if cas["success"]:
info = group_e2ee.rotate_epoch_to(group_id, cas["epoch"], member_aids)
for dist in info["distributions"]:
p2p_e2ee_send(dist["to"], dist["payload"])
# CAS 失败说明别的 admin 先轮换了,放弃即可加人后轮换并分发密钥
# 成员加入改变成员集,推荐先通过服务端 CAS 推进 epoch。
status = await my_rpc_call("group.e2ee.get_epoch", {"group_id": group_id})
cas = await my_rpc_call("group.e2ee.rotate_epoch", {"group_id": group_id, "current_epoch": status["epoch"], ...})
if cas["success"]:
info = group_e2ee.rotate_epoch_to(group_id, cas["epoch"], updated_member_aids)
for dist in info["distributions"]:
p2p_e2ee_send(dist["to"], dist["payload"])解密失败时请求恢复
当前 Python SDK 优先请求本地成员列表中的第一个候选者;零状态时退化为请求当前消息发送者。
recovery = group_e2ee.build_recovery_request(
group_id, epoch, sender_aid=msg.get("sender_aid"),
)
if recovery:
p2p_e2ee_send(recovery["to"], recovery["payload"])
# 频率限制:同群同 epoch 30 秒内不重复请求状态查询
group_e2ee.has_secret(group_id) # 是否持有密钥
group_e2ee.current_epoch(group_id) # 当前 epoch,无密钥返回 None
group_e2ee.get_member_aids(group_id) # 已知成员列表
group_e2ee.load_all_secrets(group_id) # {epoch: secret_bytes} 映射
group_e2ee.cleanup(group_id) # 清理过期旧 epoch(默认保留 7 天)自定义密钥存储
实现 KeyStore Protocol,可替换默认的文件存储:
class MyKeyStore:
def load_key_pair(self, aid: str) -> dict | None: ...
def save_key_pair(self, aid: str, key_pair: dict) -> None: ...
def load_cert(self, aid: str) -> str | None: ...
def save_cert(self, aid: str, cert_pem: str) -> None: ...
def load_identity(self, aid: str) -> dict | None: ...
def save_identity(self, aid: str, identity: dict) -> None: ...
def load_metadata(self, aid: str) -> dict | None: ...
def save_metadata(self, aid: str, metadata: dict) -> None: ...
client = AUNClient({"aun_path": "..."})AUNClient 不接收外部 keystore 参数。若需替换 KeyStore,应按 SDK 内部接口规范扩展实现,再由对应语言 SDK 的内部装配层接入。
自定义敏感数据存储
实现 SecretStore Protocol。默认行为:Windows 使用 DPAPI,其他平台使用内存存储。
class MySecretStore:
def protect(self, scope: str, name: str, plaintext: bytes) -> dict: ...
def reveal(self, scope: str, name: str, record: dict) -> bytes | None: ...
def clear(self, scope: str, name: str) -> None: ...
client = AUNClient({"aun_path": "..."})AUNClient 构造函数只保留 aun_path、root_ca_path、seed_password。SecretStore / KeyStore / SQLiteBackup 属于 SDK 内部基础设施,不作为应用层构造参数暴露。
当前安全默认值
| 默认行为 | 说明 |
|---|---|
| P2P 消息默认要求发送方签名 | 无 sender_signature 的消息被拒绝 |
| 群组消息默认要求发送方签名 | require_signature=True,无签名或无发送方证书的消息被拒绝 |
| 群组 E2EE 为固定启用能力 | group_e2ee=true,不可关闭 |
| 默认要求前向保密 | require_forward_secrecy=true,无 prekey 时拒绝 long_term_key 降级 |
| 客户端操作签名 | group.send/group.kick/group.add_member/group.leave 等操作自动附加 client_signature,服务端强制验签 |

