Agent 如何同时活在钉钉、Telegram、Discord 和微信里?

How Hermes Agent Gateway Unifies 18 IM Platforms with a Single Codebase

上周团队在规划 Agent 的多渠道接入方案。有人说"每个 IM 写一套 adapter",有人说"统一用 Webhook 接收然后标准化"。

我打开 Hermes Agent 的代码仓库,gateway/platforms/ 目录下躺着 18 个平台适配器——从 Telegram、Discord 到钉钉、飞书、企业微信、QQ 机器人,甚至还有 iMessage(BlueBubbles)、Signal 和 Home Assistant。

“所有平台共享同一个 Agent Loop,同一套 Session 管理,同一套工具调用。”

他们问:“那 18 个 adapter 之间代码复用率有多少?”

我给他们看了一张图。

整体架构:一个 Agent,多个嘴巴

Hermes Agent 的 Gateway 架构可以抽象为一个四层模型:

┌──────────────────────────────────────────────────────────────────────┐
│  Layer 1: Platform Adapters (gateway/platforms/*.py)                  │
│  ┌─────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐           │
│  │Telegram │ │Discord │ │钉钉    │ │飞书    │ │WeCom   │  ... ×18   │
│  │Bot API  │ │Bot API │ │Stream  │ │Callback│ │Callback│           │
│  └────┬────┘ └───┬────┘ └───┬────┘ └───┬────┘ └───┬────┘           │
│       │           │          │          │          │                 │
│       └───────────┴──────────┴──────────┴──────────┘                 │
│                        │                                            │
│                   MessageEvent (标准化消息)                            │
│                        │                                            │
├────────────────────────┼────────────────────────────────────────────┤
│  Layer 2: Gateway Runner (gateway/run.py)                           │
│  ┌──────────────────────┐  ┌──────────────────────┐                │
│  │  SessionStore        │  │  DeliveryRouter       │                │
│  │  - 会话生命周期       │  │  - 消息路由           │                │
│  │  - 会话重置策略       │  │  - 跨平台投递         │                │
│  │  - AIAgent 缓存      │  │  - origin/local/显式   │                │
│  └──────────┬───────────┘  └───────────┬───────────┘                │
│             │                          │                             │
├─────────────┼──────────────────────────┼────────────────────────────┤
│  Layer 3: AIAgent (run_agent.py)       │                             │
│  ┌─────────────────────────────────┐  │                             │
│  │  run_conversation()             │  │                             │
│  │  - LLM API 调用                 │  │                             │
│  │  - 工具发现与执行               │  │                             │
│  │  - 上下文压缩                    │  │                             │
│  │  - 回调通知 → Layer 2           │  │                             │
│  └─────────────────────────────────┘  │                             │
│                                       │                             │
├───────────────────────────────────────┼────────────────────────────┤
│  Layer 4: Tools & Infrastructure      │                             │
│  terminal / web_search / file / MCP   │  ← 所有平台共享              │
│  delegate_task / cronjob / skill      │                             │
└───────────────────────────────────────┴────────────────────────────┘

核心思想:Layer 1 负责"翻译"——把各平台原生消息格式翻译成统一的 MessageEvent,把 Agent 输出翻译成平台原生 API 调用。Layer 2-4 完全不知道消息来自哪里,它们只看到一个标准化的对话请求。

Layer 1:平台适配器基类

所有 18 个适配器都继承自同一个抽象基类 BasePlatformAdapter

class BasePlatformAdapter(ABC):
    """所有平台适配器的基类。
    
    子类实现平台特定的连接、认证、收发消息逻辑。
    """
    def __init__(self, config: PlatformConfig, platform: Platform):
        self.config = config
        self.platform = platform
        self._message_handler = None       # 消息处理回调(指向 GatewayRunner)
        self._busy_session_handler = None  # 会话忙碌时的处理回调
        self._active_sessions = {}         # {session_key: asyncio.Event()}
        self._pending_messages = {}        # 等待处理的消息队列
        self._background_tasks = set()     # 后台处理任务
# generated by hugo AI

每个子类只需要实现以下几个方法:

方法职责示例
connect()连接平台 APITelegram: aiogram Bot polling
disconnect()断开连接关闭 polling / webhook
send()发送消息调用平台 sendMessage API
send_typing()发送"正在输入"状态Telegram: sendChatAction
send_image()发送图片上传并发送文件
send_voice()发送语音TTS 音频发送
send_document()发送文件文档/代码块发送

这就是复用率高的秘诀——基类已经实现了消息处理、会话管理、重试、流式输出、打字指示器、图片缓存等所有通用逻辑,子类只需要"填空"实现平台特定的 API 调用。

消息标准化:MessageEvent

每个平台收到原始消息后,统一包装成 MessageEvent

class MessageType(Enum):
    TEXT = "text"
    PHOTO = "photo"
    VOICE = "voice"
    VIDEO = "video"
    DOCUMENT = "document"
    AUDIO = "audio"
    STICKER = "sticker"
    CONTACT = "contact"
    LOCATION = "location"

@dataclass
class MessageEvent:
    """标准化消息事件——所有平台统一格式"""
    source: SessionSource     # 来源:平台、chat_id、user_id、线程 ID
    text: str                 # 消息文本
    message_type: MessageType # 消息类型
    message_id: str           # 平台消息 ID(用于回复)
    user_id: str              # 发送者 ID
    user_name: Optional[str]  # 发送者名称
    media_urls: List[str]     # 媒体文件 URL
    media_types: List[str]    # 媒体 MIME 类型
    reply_to_message_id: Optional[str]  # 回复的消息 ID
    metadata: Dict[str, Any]  # 平台扩展字段
    
    def is_command(self) -> bool:
        """是否是斜杠命令"""
        return self.text.startswith('/')
    
    def get_command(self) -> Optional[str]:
        """提取命令名:/model → model"""
        if not self.is_command():
            return None
        return self.text.split()[0][1:].split('@')[0]
    
    def get_command_args(self) -> str:
        """提取命令参数:/model claude-sonnet → claude-sonnet"""
# generated by hugo AI

无论消息来自 Telegram 的 Message 对象、Discord 的 on_message 事件、还是钉钉的 Stream 回调,最终都归一化为这个 MessageEvent。这就是为什么 Gateway Runner 不需要关心"消息从哪来"。

Layer 2:Gateway Runner——大脑

GatewayRunner 是整个 Gateway 的控制中心,负责:

1. 适配器生命周期管理

class GatewayRunner:
    def __init__(self, config: GatewayConfig):
        self.config = config
        self.adapters: Dict[Platform, BasePlatformAdapter] = {}
        self.session_store = SessionStore(...)
        self.delivery_router = DeliveryRouter(self.config)
        self._running_agents: Dict[str, AIAgent] = {}  # 会话 → Agent
        self._agent_cache: OrderedDict = OrderedDict()  # Agent 缓存
# generated by hugo AI

启动时,GatewayRunner 遍历配置中启用的平台,动态加载对应的适配器:

async def start_all_platforms(self):
    for platform in self.config.get_connected_platforms():
        adapter = await self._create_adapter(platform)
        adapter.set_message_handler(self._handle_message)
        adapter.set_busy_session_handler(self._handle_busy_session)
        adapter.set_session_store(self.session_store)
        await adapter.connect()
        self.adapters[platform] = adapter
# generated by hugo AI

2. Session Key——会话身份

这是 Gateway 最精妙的设计之一。不同平台的"会话"概念差异巨大:

  • DM(私聊):一对一对话,每个用户一个会话
  • Group(群聊):多人共享,默认每用户隔离会话
  • Thread(线程):Discord/Slack 的线程,默认共享会话
  • Forum Topic:Telegram 论坛话题,类似 thread

Session Key 是一个确定性的字符串,格式为:

agent:main:{platform}:{chat_type}:{chat_id}[:{user_id}]

具体规则由 build_session_key() 统一控制:

def build_session_key(source, group_sessions_per_user=True, thread_sessions_per_user=False):
    platform = source.platform.value
    
    if source.chat_type == "dm":
        # 私聊:每个 DM 独立会话
        return f"agent:main:{platform}:dm:{source.chat_id}"
    
    # 群聊:默认按用户隔离
    if group_sessions_per_user and source.user_id:
        return f"agent:main:{platform}:group:{source.chat_id}:{source.user_id}"
    
    # 线程:默认共享(所有参与者看到同一上下文)
    if not thread_sessions_per_user:
        return f"agent:main:{platform}:thread:{source.chat_id}:{source.thread_id}"
    
    # 线程按用户隔离
    return f"agent:main:{platform}:thread:{source.chat_id}:{source.thread_id}:{source.user_id}"
# generated by hugo AI

这个设计解决了一个实际问题:同一个群里,不同用户和 Agent 对话,上下文不串。同时,Discord 线程里的所有参与者看到的是同一段对话。

3. 消息处理流水线

MessageEvent 到达
adapter.handle_message(event)
    ├─ 是斜杠命令?→ 命令旁路(/new, /stop, /approve 等)
    │   └─ 不走 Agent Loop,直接由 GatewayRunner 处理
    ├─ 会话正在运行?
    │   ├─ 是 → 消息进入 _pending_messages(队列)
    │   │       触发 interrupt(通知 Agent 有新消息)
    │   │
    │   └─ 否 → 创建新会话
_process_message_background(event, session_key)
    ├─ 1. 预处理:图片缓存、STT 语音转文字、文档下载
    ├─ 2. 获取或创建 AIAgent(从缓存中取,保持 prompt caching)
    ├─ 3. 构建会话上下文(SessionSource → 系统提示注入)
    ├─ 4. 加载历史消息(从 SessionStore)
    ├─ 5. 调用 AIAgent.run_conversation()
    ├─ 6. Agent 输出通过回调 → adapter.send() 回平台
    └─ 7. 清理会话标记、处理 pending messages

4. Agent 缓存:保持 Prompt Caching

这是 Gateway 和 CLI 的关键区别——Gateway 是长寿命进程,可能同时服务成百上千个会话。如果每条消息都新建 AIAgent 实例:

  1. Prompt Caching 全部失效——Anthropic/Claude 的 prefix caching 要求系统提示和历史消息的前缀不变
  2. 内存泄漏——每个 Agent 持有 LLM client、tool schemas、memory providers
  3. 启动延迟——每次初始化都要重新发现工具、加载 skills

解决方案:按 session_key 缓存 AIAgent 实例:

# OrderedDict 实现 LRU 淘汰
self._agent_cache: OrderedDict[str, tuple] = OrderedDict()

def _get_or_create_agent(self, session_key, ...):
    if session_key in self._agent_cache:
        # 缓存命中——保留 prompt caching
        agent, signature = self._agent_cache[session_key]
        self._agent_cache.move_to_end(session_key)  # LRU 更新
        return agent
    
    # 缓存未命中——新建
    agent = AIAgent(...)
    self._agent_cache[session_key] = (agent, signature)
    
    # 超过容量?淘汰最老的
    if len(self._agent_cache) > _AGENT_CACHE_MAX_SIZE:
        oldest_key, _ = self._agent_cache.popitem(last=False)
    
    return agent
# generated by hugo AI

默认容量 128,空闲 TTL 1 小时。超出容量的 Agent 会被安全清理(关闭连接、保存状态)。

5. 中断机制

和 CLI 一样,Gateway 支持运行中打断。但实现方式不同:

# BasePlatformAdapter.handle_message()
async def handle_message(self, event: MessageEvent):
    session_key = build_session_key(event.source)
    
    if session_key in self._active_sessions:
        # 会话正在运行——新消息触发中断
        self._pending_messages[session_key] = event
        self._active_sessions[session_key].set()  # 触发 Event
        return
    
    # 会话空闲——创建新任务
    self._active_sessions[session_key] = asyncio.Event()
    task = asyncio.create_task(self._process_message_background(event, session_key))
# generated by hugo AI

关键设计_active_sessions 是一个 asyncio.Event 字典。当新消息到达时,set() 触发 Event,后台处理任务检测到信号后安全退出(保存进度、清理状态),然后新的消息接管处理。

Layer 3:AIAgent——共享大脑

无论消息来自哪个平台,最终都汇聚到同一个 AIAgent.run_conversation()。这个函数不关心:

  • 消息是 Telegram 用户发的还是钉钉群里来的
  • 用户是用中文还是英文提问
  • 附件是图片、语音还是文档

它只接收:

result = agent.run_conversation(
    user_message=processed_text,        # 处理后的文本
    conversation_history=history,       # 该 session 的历史
    stream_callback=stream_cb,          # 流式输出回调
    task_id=session_key,                # 会话标识
)
# generated by hugo AI

Agent 内部的回调(stream_callbackstatus_callbacktool_progress_callback)由 Gateway Runner 注入,把输出路由回正确的平台:

def _make_stream_callback(adapter, event):
    """为流式输出创建平台特定的回调"""
    async def callback(delta: str):
        # 累积 delta,按间隔编辑消息
        await adapter.send_typing(event.source.chat_id)
        # ... 流式更新逻辑
    return callback
# generated by hugo AI

消息回流:Delivery Router

Agent 的输出不一定发回原处。Gateway 的 DeliveryRouter 支持灵活的路由:

# 投递目标格式:
# "origin"          → 回消息来源
# "local"           → 只保存本地文件
# "telegram"        → Telegram 的 home channel
# "telegram:123456" → 指定的 Telegram 群
# "dingtalk:#dev"   → 钉钉的 #dev 频道

class DeliveryTarget:
    @classmethod
    def parse(cls, target: str, origin: SessionSource):
        if target == "origin":
            return cls(platform=origin.platform, chat_id=origin.chat_id)
        if target == "local":
            return cls(platform=Platform.LOCAL)
        if ":" in target:
            # platform:chat_id 或 platform:chat_id:thread_id
            parts = target.split(":", 2)
            platform = Platform(parts[0])
            return cls(platform=platform, chat_id=parts[1])
        return cls(platform=Platform(target))  # 平台名 → home channel
# generated by hugo AI

这在 Cron 任务中特别有用——一个定时任务可以在服务器上运行,但输出投递到多个频道:

# Cron job 配置
deliver: ["origin", "dingtalk:#ops-team", "local"]
# generated by hugo AI

流式输出:跨平台的渐进式更新

IM 平台通常不支持"打字机效果",但 Gateway 实现了类似的体验:

# StreamingConfig
enabled: bool = True
transport: str = "edit"        # Telegram: editMessageText
edit_interval: float = 1.0     # 每秒更新一次(Telegram 限速)
buffer_threshold: int = 40     # 积累 40 字符才发
cursor: str = " ▉"            # 流式光标
# generated by hugo AI
Token 1 → buffer: "H" → 不足 40,等待
Token 2 → buffer: "He" → 等待
...
Token 40 → buffer: "Hello, 我是 Hermes Agent ▉" → 发送 editMessageText
Token 41 → buffer: "!" → 等待
Token 80 → buffer: "! 有什么可以帮你的 ▉" → 发送 editMessageText
...
完成 → 移除光标,最终编辑

不同平台的速率限制不同——Telegram 限制每秒一次消息编辑,Discord 可以连续发多条消息。BasePlatformAdapter 通过 _send_with_retry 统一处理重试和速率限制。

媒体处理流水线

IM 平台收到的图片和语音是临时 URL(Telegram 文件 URL 约 1 小时过期),需要本地缓存:

用户发送图片 (Telegram Photo)
adapter 下载 → cache_image_from_url()
    │            ├─ SSRF 防护(is_safe_url)
    │            ├─ 重试 + 指数退避
    │            └─ 保存到 ~/.hermes/cache/images/
构建文本占位符: "[User sent an image: /path/to/cache/img_abc123.jpg]"
AIAgent.run_conversation() 收到文本
vision_analyze 工具自动识别图片 → 生成描述
Agent 回复 → adapter.send_image() 发回平台

语音消息类似——下载 → 缓存 → STT 转文字 → Agent 处理。如果用户开启了 /voice 模式,Agent 的回复还会经过 TTS 转为语音发回。

会话生命周期管理

Gateway 的会话不是永恒的——它有一套完整生命周期:

创建 → 活跃 → (可选: 重置) → 归档

触发重置的条件:
├─ 用户主动: /new, /reset
├─ 每日重置: 凌晨 4 点自动清理
├─ 空闲超时: 24 小时无活动
└─ 上下文溢出: 压缩后仍然超限
@dataclass
class SessionResetPolicy:
    mode: str = "both"      # "daily", "idle", "both", "none"
    at_hour: int = 4        # 每日重置时间点
    idle_minutes: int = 1440  # 24 小时空闲重置
    notify: bool = True     # 重置时通知用户
# generated by hugo AI

策略可以按平台、会话类型分别配置。比如:

# 群聊会话每天重置,DM 永不重置
session_reset:
  mode: "both"
  at_hour: 4
  idle_minutes: 1440

# 钉钉 DM 会话永不重置
reset_by_type:
  dm:
    mode: "none"

安全机制

Gateway 作为公网暴露的服务,安全是头等大事:

1. SSRF 防护

# 下载媒体文件前检查 URL 安全性
from tools.url_safety import is_safe_url
if not is_safe_url(url):
    raise ValueError(f"Blocked unsafe URL (SSRF protection)")

# 重定向也要检查——防止 302 跳转到内网
async with httpx.AsyncClient(
    follow_redirects=True,
    event_hooks={"response": [_ssrf_redirect_guard]},
) as client:
    response = await client.get(url)
# generated by hugo AI

2. DM 配对机制

未经授权的 DM 消息会被自动处理:

unauthorized_dm_behavior: str = "pair"  # "pair" or "ignore"

# "pair" 模式:用户发送 /pair <code> 进行配对
# "ignore" 模式:直接忽略未授权消息
# generated by hugo AI

配对码通过 PairingStore 管理,确保只有授权用户才能与 Agent 对话。

3. UTF-16 长度计算

Telegram 的消息长度限制是 4096 UTF-16 code units,不是 Python 的 len()

def utf16_len(s: str) -> int:
    """计算 UTF-16 code units 数量"""
    return len(s.encode("utf-16-le")) // 2

def truncate_message(text: str, limit: int = 4000) -> str:
    """安全截断,不切坏 surrogate pair"""
    if utf16_len(text) <= limit:
        return text
    # 二分查找最长安全前缀
    lo, hi = 0, len(text)
    while lo < hi:
        mid = (lo + hi + 1) // 2
        if utf16_len(text[:mid]) <= limit:
            lo = mid
        else:
            hi = mid - 1
    return text[:lo]
# generated by hugo AI

这个细节很多开源项目都踩过坑——Python 的 len("😀") 返回 1,但 Telegram 认为它是 2 个 code unit。

Hook 系统:扩展点

Gateway 内置了一套事件钩子系统,允许在不修改核心代码的情况下扩展行为:

# HookRegistry 管理所有钩子
class HookRegistry:
    def register(self, event_name: str, handler: Callable):
        ...
    
    async def emit(self, event_name: str, *args, **kwargs):
        for handler in self._handlers.get(event_name, []):
            await handler(*args, **kwargs)

# 内置钩子:
# - message.received: 消息到达时
# - message.sent: 消息发出时
# - agent.start: Agent 开始处理时
# - agent.complete: Agent 完成时
# - session.reset: 会话重置时
# generated by hugo AI

比如 boot_md 钩子在 Gateway 启动时自动生成 ~/.hermes/boot.md 文件,记录当前运行状态,方便重启后恢复上下文。

架构决策总结

决策选择原因
通信模式长连接(Bot polling/Stream)实时性好,适合 IM 场景
适配器模式基类 + 子类继承90% 逻辑复用,各平台只需实现 API 调用
消息格式统一 MessageEventAgent 不需要知道平台细节
会话管理确定性 Session Key可重现、可调试、支持重置
Agent 实例缓存 + LRU 淘汰保持 Prompt Caching,控制内存
流式输出editMessageText 渐进更新适应 IM 平台限制
媒体处理本地缓存 + 过期清理解决临时 URL 过期问题
安全SSRF 防护 + DM 配对公网暴露必须有边界

一句话总结

Gateway 的本质是协议翻译器——18 个平台的差异被完全隔离在 Adapter 层,Agent 只看到一个标准化的对话流。新平台接入只需要实现 6 个方法(connect/send/send_typing/send_image/send_voice/disconnect),剩下的会话管理、Agent 缓存、流式输出、安全机制全部由基类和 Runner 提供。

你在实际项目中接入过几个 IM 平台?最大的坑是什么?欢迎留言讨论。


See also