当 10 万个定时任务同时敲门:MaaS 平台调度优化实战

从整点风暴到分布式调度——平台视角的六个关键策略

上周五下午 3 点,告警群炸了:MaaS 层的 GPU 推理集群 QPS 在 60 秒内从 1200 飙到 18000,p99 延迟从 800ms 打到 45 秒,大量请求 429。

排查发现原因很"朴素"——大约 3 万个 OpenClaw 实例的定时任务都跑在整点。每个实例可能只有 1-3 个 cron job(数据摘要、定时巡检、报表生成),但所有人的 cron 都写着 0 * * * *0 0 * * *。三万乘以三,就是整点瞬间涌来的近十万个 LLM 推理请求。

这不是应用层的 bug,而是平台设计的缺陷。当你的平台承载成千上万个租户的定时任务时,“整点风暴"不是意外——它是必然。问题是:作为平台设计者,你该怎么办?

一、问题本质:整点惊群的数学必然性

先理解为什么整点风暴必然发生。

人类设置 cron 表达式有强烈的"整数偏好”:研究表明,超过 80% 的 cron 任务被设置在 :00:30 分。在单机或小规模部署中,这无所谓——几十个任务的叠加不构成威胁。但在多租户 MaaS 平台上,这个"无所谓"被放大了数万倍。

请求分布(未优化):

QPS
18000│          ╱╲
     │         ╱  ╲
     │        ╱    ╲
     │       ╱      ╲
 1200│──────╱        ╲──────────
     └──────┬──┬──┬──┬──────── 时间
          :58 :00 :02 :04

峰值 = 基线 × 15,持续约 2-3 分钟

更棘手的是,MaaS 层的资源是刚性的——GPU 算力不像 CPU 那样可以瞬间弹性扩展。一张 A100 同时能处理的推理请求是有上限的,排队就意味着超时,超时就意味着用户侧任务失败。

以下是我们实施的六个策略,按优先级排序。

二、策略一:平台侧请求打散(Jitter 注入)

这是投入产出比最高的一招,不需要用户做任何改变。

核心思路:平台在调度层为每个租户的 cron 触发时间自动注入一个确定性的偏移量(deterministic jitter),将整点的流量尖峰摊平到一个时间窗口内。

import hashlib

def calculate_jitter(tenant_id: str, task_id: str, window_seconds: int = 300) -> int:
    """
    为每个租户的每个任务计算一个确定性的延迟偏移量。
    
    使用 hash 而非 random,保证:
    1. 同一个任务每次触发的偏移量相同(可预期性)
    2. 不同租户的偏移量均匀分布(打散效果)
    3. 无需持久化存储偏移量
    """
    seed = f"{tenant_id}:{task_id}"
    hash_value = int(hashlib.sha256(seed.encode()).hexdigest(), 16)
    jitter = hash_value % window_seconds
    return jitter

def schedule_with_jitter(tenant_id: str, task_id: str, cron_expr: str):
    """调度时自动注入 jitter,用户无感知"""
    jitter_seconds = calculate_jitter(tenant_id, task_id, window_seconds=300)
    
    # 在 cron 触发后延迟 jitter_seconds 再实际执行
    scheduler.add_job(
        execute_task,
        trigger=CronTrigger.from_crontab(cron_expr),
        args=[tenant_id, task_id],
        misfire_grace_time=600,
        jitter=jitter_seconds  # 平台自动注入
    )
    # generated by hugo's coding agent

为什么用确定性 hash 而不是随机数?三个原因:

  1. 可调试:用户问"我的任务为什么比整点晚了 47 秒",你能精确解释
  2. 幂等性:重启调度器后偏移量不变,不会导致任务时间漂移
  3. 均匀性:SHA256 的输出分布足够均匀,大量租户的 jitter 天然均匀分布

效果实测:

指标优化前5 分钟窗口打散改善
整点峰值 QPS18,0003,800-79%
p99 延迟45s2.1s-95%
429 错误率12%0.3%-97%

五分钟的打散窗口就把问题解决了大半。 但仅此不够——打散只解决了"同时来"的问题,没有解决"来太多"的问题。

三、策略二:用户分层与差异化配额

不是所有租户都应该获得相同的调度待遇。平台需要按用户层级设计不同的调度策略。

┌────────────────────────────────────────────────────────────┐
│                    调度请求入口                              │
└──────────────────────┬─────────────────────────────────────┘
                ┌──────▼──────┐
                │  租户分层路由  │
                └──┬───┬───┬──┘
                   │   │   │
        ┌──────────┘   │   └──────────┐
        │              │              │
  ┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
  │  企业用户   │ │  付费用户   │ │  免费用户   │
  │            │ │            │ │            │
  │ • 专属队列  │ │ • 高优队列  │ │ • 共享队列  │
  │ • 无 jitter│ │ • 小 jitter│ │ • 大 jitter│
  │ • 预留容量  │ │ • 突发容量  │ │ • 尽力调度  │
  │ • SLA 保证  │ │ • 有限重试  │ │ • 可降级    │
  └─────┬─────┘ └─────┬─────┘ └─────┬─────┘
        │              │              │
  ┌─────▼──────────────▼──────────────▼─────┐
  │            MaaS 推理集群                   │
  │  ┌──────┐  ┌──────────┐  ┌──────────┐   │
  │  │预留池 │  │ 弹性池    │  │ 共享池    │   │
  │  └──────┘  └──────────┘  └──────────┘   │
  └─────────────────────────────────────────┘

分层配额设计

from dataclasses import dataclass
from enum import Enum

class TierLevel(Enum):
    FREE = "free"
    PRO = "pro"
    ENTERPRISE = "enterprise"

@dataclass
class SchedulingPolicy:
    max_concurrent_tasks: int     # 最大并发任务数
    jitter_window_seconds: int    # 打散窗口大小
    max_tasks_per_hour: int       # 每小时最大任务数
    priority: int                 # 队列优先级 (1=最高)
    retry_budget: int             # 失败重试次数
    can_degrade: bool             # 是否可降级
    reserved_capacity: float      # 预留容量比例

TIER_POLICIES = {
    TierLevel.FREE: SchedulingPolicy(
        max_concurrent_tasks=2,
        jitter_window_seconds=600,    # 10 分钟打散窗口
        max_tasks_per_hour=10,
        priority=3,
        retry_budget=1,
        can_degrade=True,             # 高峰期可降级
        reserved_capacity=0.0,
    ),
    TierLevel.PRO: SchedulingPolicy(
        max_concurrent_tasks=10,
        jitter_window_seconds=120,    # 2 分钟打散窗口
        max_tasks_per_hour=60,
        priority=2,
        retry_budget=3,
        can_degrade=False,
        reserved_capacity=0.0,
    ),
    TierLevel.ENTERPRISE: SchedulingPolicy(
        max_concurrent_tasks=50,
        jitter_window_seconds=0,      # 无打散,按需调度
        max_tasks_per_hour=500,
        priority=1,
        retry_budget=5,
        can_degrade=False,
        reserved_capacity=0.2,        # 预留 20% 容量
    ),
}
# generated by hugo's coding agent

关键设计决策:

  • 免费用户用大窗口打散:10 分钟的 jitter 窗口不影响"每小时跑一次"的语义,但极大缓解了峰值
  • 企业用户不打散:企业为精确调度付了费,他们的 SLA 承诺了时间精度
  • 付费用户折中:2 分钟窗口,在精度和平滑之间取平衡

四、策略三:多级优先级队列

打散解决的是时间维度的问题,优先级队列解决的是资源分配的问题:当请求量超过容量时,谁先被服务?

import heapq
import time
from threading import Lock

class PriorityTaskQueue:
    """
    多级优先级任务队列。
    
    优先级规则:
    1. 企业用户 > 付费用户 > 免费用户
    2. 同级内,等待时间长的优先(防饥饿)
    3. 带 SLA deadline 的任务优先级动态提升
    """
    def __init__(self):
        self._queue = []  # min-heap: (priority_score, enqueue_time, task)
        self._lock = Lock()
    
    def enqueue(self, task: dict):
        with self._lock:
            score = self._calculate_priority(task)
            heapq.heappush(self._queue, (score, time.monotonic(), task))
    
    def dequeue(self) -> dict | None:
        with self._lock:
            if not self._queue:
                return None
            # 出队前重新计算优先级(处理动态提升)
            self._rebalance()
            _, _, task = heapq.heappop(self._queue)
            return task
    
    def _calculate_priority(self, task: dict) -> float:
        tier = task["tier"]
        base_priority = {"enterprise": 0, "pro": 1000, "free": 2000}[tier]
        
        # SLA deadline 临近时动态提升优先级
        if "deadline" in task:
            remaining = task["deadline"] - time.time()
            if remaining < 60:  # 不到 1 分钟就要超 SLA 了
                base_priority = max(0, base_priority - 1500)
        
        return base_priority
    
    def _rebalance(self):
        """定期重新计算,防止低优先级任务饿死"""
        now = time.monotonic()
        new_queue = []
        for score, enqueue_time, task in self._queue:
            wait_time = now - enqueue_time
            # 每等待 60 秒,优先级提升 100 分
            age_bonus = min(wait_time / 60 * 100, 800)
            adjusted = self._calculate_priority(task) - age_bonus
            heapq.heappush(new_queue, (adjusted, enqueue_time, task))
        self._queue = new_queue
    # generated by hugo's coding agent

防饥饿(starvation prevention)是优先级队列设计中最容易被忽略的点。如果企业用户的请求永远排在前面,免费用户在高峰期可能等到超时也执行不了。等待时间越长、优先级越高的"老化"机制可以兜底。

五、策略四:容量规划——从"猜"到"算"

前三个策略是"需求侧"的优化——减少峰值、排优先级。但真正要回答的根本问题是:我到底需要多少容量?

容量建模

@dataclass
class CapacityModel:
    """
    MaaS 层容量规划模型。
    
    核心等式:
    所需容量 = 峰值 QPS × 单请求处理时间 × 安全系数
    """
    total_tenants: int             # 总租户数
    avg_tasks_per_tenant: float    # 每租户平均定时任务数
    peak_multiplier: float         # 峰值 / 均值 比率
    avg_inference_time_ms: float   # 单次推理平均耗时
    target_p99_latency_ms: float   # p99 延迟目标
    safety_factor: float = 1.5    # 安全系数
    
    def calculate_baseline_qps(self) -> float:
        """计算小时级平均 QPS"""
        total_tasks_per_hour = self.total_tenants * self.avg_tasks_per_tenant
        return total_tasks_per_hour / 3600
    
    def calculate_peak_qps(self, jitter_window: int = 300) -> float:
        """
        计算峰值 QPS(考虑 jitter 打散效果)。
        jitter_window: 打散窗口大小(秒),窗口越大峰值越平
        """
        total_tasks = self.total_tenants * self.avg_tasks_per_tenant
        # 打散后,任务均匀分布在 jitter_window 内
        peak_qps = total_tasks / jitter_window
        return peak_qps * self.peak_multiplier
    
    def required_gpu_instances(self, jitter_window: int = 300) -> int:
        """计算所需的 GPU 推理实例数"""
        peak_qps = self.calculate_peak_qps(jitter_window)
        # 每个实例每秒能处理的请求数
        instance_throughput = 1000 / self.avg_inference_time_ms
        raw_count = peak_qps / instance_throughput
        return int(raw_count * self.safety_factor) + 1

# 示例:3 万租户,平均 3 个定时任务/小时
model = CapacityModel(
    total_tenants=30000,
    avg_tasks_per_tenant=3.0,
    peak_multiplier=2.0,
    avg_inference_time_ms=800,
    target_p99_latency_ms=3000,
)

print(f"基线 QPS: {model.calculate_baseline_qps():.0f}")
print(f"峰值 QPS (无打散): {model.calculate_peak_qps(jitter_window=1):.0f}")
print(f"峰值 QPS (5分钟打散): {model.calculate_peak_qps(jitter_window=300):.0f}")
print(f"所需 GPU 实例 (无打散): {model.required_gpu_instances(jitter_window=1)}")
print(f"所需 GPU 实例 (5分钟打散): {model.required_gpu_instances(jitter_window=300)}")
# generated by hugo's coding agent

跑一下输出:

基线 QPS: 25
峰值 QPS (无打散): 180000
峰值 QPS (5分钟打散): 600
所需 GPU 实例 (无打散): 216001
所需 GPU 实例 (5分钟打散): 721

没有打散时,峰值是基线的 7200 倍。 5 分钟打散窗口把这个数字压缩到 24 倍。这就是为什么 jitter 是第一优先级——它直接改变了容量规划的量级。

六、策略五:弹性扩容——为峰值做好准备

即使做了打散,峰值仍然是基线的数倍到数十倍。为峰值预留固定容量太浪费,需要弹性扩容。

预测性扩容 vs 响应式扩容

                ┌─────────────────────────────┐
                │        调度引擎              │
                └──────────┬──────────────────┘
                    ┌──────▼──────┐
                    │  容量决策器   │
                    └──┬──────┬──┘
                       │      │
            ┌──────────┘      └──────────┐
            │                            │
     ┌──────▼──────┐              ┌──────▼──────┐
     │ 预测性扩容    │              │ 响应式扩容   │
     │              │              │             │
     │ 基于历史模式   │              │ 基于实时指标  │
     │ 提前 5-10 分  │              │ 秒级触发     │
     │ 钟扩容        │              │ 应对突发     │
     └──────┬──────┘              └──────┬──────┘
            │                            │
            └────────────┬───────────────┘
                  ┌──────▼──────┐
                  │  GPU 推理池   │
                  │ ┌──┐┌──┐┌──┐│
                  │ │常│││弹│││预││
                  │ │驻│││性│││热││
                  │ └──┘└──┘└──┘│
                  └─────────────┘

关键代码逻辑:

class ElasticScaler:
    """
    弹性扩容控制器。
    结合预测性扩容和响应式扩容两种策略。
    """
    
    def __init__(self, min_instances: int, max_instances: int):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = min_instances
    
    def predictive_scale(self, current_time: datetime) -> int:
        """
        预测性扩容:基于历史数据预测未来 10 分钟的负载。
        每天的流量模式高度相似——利用这个规律提前扩容。
        """
        # 查询过去 7 天同一时段的平均峰值
        historical_peak = self._get_historical_peak(
            time_of_day=current_time.time(),
            lookback_days=7,
            percentile=95  # 用 P95 而非平均值,留出余量
        )
        
        needed = self._qps_to_instances(historical_peak)
        target = min(max(needed, self.min_instances), self.max_instances)
        
        # 提前 5 分钟扩容(GPU 实例冷启动需要时间)
        if target > self.current_instances:
            self._scale_up(target)
        
        return target
    
    def reactive_scale(self, current_qps: float, queue_depth: int) -> int:
        """
        响应式扩容:基于实时指标快速反应。
        当队列深度超过阈值时立即扩容。
        """
        utilization = current_qps / self._total_capacity()
        
        if utilization > 0.8 or queue_depth > 1000:
            # 快速扩容:增加 50% 容量
            scale_factor = 1.5
            target = min(
                int(self.current_instances * scale_factor),
                self.max_instances
            )
            self._scale_up(target)
            return target
        
        if utilization < 0.3 and queue_depth == 0:
            # 缩容:但要缓慢缩容,防止抖动
            target = max(
                int(self.current_instances * 0.8),
                self.min_instances
            )
            self._scale_down_gradually(target)
            return target
        
        return self.current_instances
    
    def _scale_up(self, target: int):
        """扩容是激进的——宁可多不可少"""
        self.current_instances = target
        gpu_pool.add_instances(target - self.current_instances)
    
    def _scale_down_gradually(self, target: int):
        """缩容是保守的——等待 10 分钟稳定后再缩"""
        if self._stable_for(minutes=10):
            self.current_instances = target
            gpu_pool.remove_instances(self.current_instances - target)
    # generated by hugo's coding agent

扩缩容的核心原则:扩容要快、缩容要慢(scale up fast, scale down slow)。GPU 实例的冷启动时间可能是几十秒到几分钟,如果等到来不及了才扩容,用户已经超时了。

另一个实战经验:保持一个"温池"(warm pool)。始终有 2-3 个空闲实例处于预热状态,模型已加载到显存,收到请求就能立即推理。这比从零启动快 10 倍以上。

七、策略六:降级策略——最后一道防线

当所有措施都拦不住、流量真的超过了总容量时,你需要降级策略。降级不是"出了故障",而是有计划地放弃一部分请求质量来保住整体可用性

class DegradationController:
    """
    分级降级控制器。
    
    降级等级:
    L0: 正常 —— 全功能服务
    L1: 轻度降级 —— 限制免费用户并发
    L2: 中度降级 —— 免费用户排队,降低模型质量
    L3: 重度降级 —— 仅服务企业用户,其余排队
    L4: 熔断 —— 所有新请求排队,保护存量请求完成
    """
    
    def __init__(self):
        self.current_level = 0
    
    def evaluate(self, metrics: dict) -> int:
        """根据实时指标判断降级等级"""
        utilization = metrics["gpu_utilization"]
        queue_depth = metrics["queue_depth"]
        error_rate = metrics["error_rate"]
        
        if error_rate > 0.1 or queue_depth > 10000:
            return 4  # 熔断
        if utilization > 0.95:
            return 3
        if utilization > 0.85:
            return 2
        if utilization > 0.75:
            return 1
        return 0
    
    def apply_degradation(self, task: dict, level: int) -> dict:
        """根据降级等级调整任务执行策略"""
        tier = task["tier"]
        
        if level == 0:
            return {"action": "execute", "model": task["model"]}
        
        if level == 1:
            if tier == "free":
                return {"action": "execute", "model": task["model"],
                        "max_tokens": min(task.get("max_tokens", 4096), 2048)}
            return {"action": "execute", "model": task["model"]}
        
        if level == 2:
            if tier == "free":
                # 降级到更小的模型
                fallback_model = self._get_fallback_model(task["model"])
                return {"action": "execute", "model": fallback_model,
                        "max_tokens": 1024}
            if tier == "pro":
                return {"action": "execute", "model": task["model"],
                        "max_tokens": min(task.get("max_tokens", 4096), 2048)}
            return {"action": "execute", "model": task["model"]}
        
        if level == 3:
            if tier == "enterprise":
                return {"action": "execute", "model": task["model"]}
            # 非企业用户延迟执行
            return {"action": "defer", "retry_after": 300,
                    "reason": "capacity_pressure"}
        
        # level 4: 全部排队
        if tier == "enterprise":
            return {"action": "execute", "model": task["model"],
                    "max_tokens": min(task.get("max_tokens", 4096), 2048)}
        return {"action": "defer", "retry_after": 600,
                "reason": "system_overload"}
    
    def _get_fallback_model(self, original_model: str) -> str:
        """模型降级映射"""
        fallback_map = {
            "claude-sonnet-4-6": "claude-haiku-4-5-20251001",
            "gpt-4o": "gpt-4o-mini",
            "deepseek-r1": "deepseek-v3",
        }
        return fallback_map.get(original_model, original_model)
    # generated by hugo's coding agent

降级策略的关键原则

  1. 降级要对用户透明:返回 retry_afterreason,让调用方知道发生了什么,而不是静默丢弃
  2. 降级要可恢复:负载下降后自动恢复,不需要人工介入
  3. 降级要有层次:不是"全开或全关",而是逐级降级,影响面从小到大
  4. 企业用户最后降级:他们付了 SLA 的钱,平台要兑现承诺

八、串联整体:调度引擎架构

把六个策略串起来,完整的调度引擎架构如下:

用户 cron 配置
┌──────────────┐    ┌────────────────┐
│  Jitter 注入  │───▶│  分层配额检查    │
│  (策略一)     │    │  (策略二)        │
└──────────────┘    └───────┬────────┘
                    ┌───────▼────────┐
                    │  优先级队列入队   │
                    │  (策略三)        │
                    └───────┬────────┘
              ┌─────────────▼──────────────┐
              │        容量决策器            │
              │  ┌─────────┐ ┌───────────┐ │
              │  │容量规划   │ │弹性扩缩容  │ │
              │  │(策略四)   │ │(策略五)    │ │
              │  └─────────┘ └───────────┘ │
              └─────────────┬──────────────┘
                    ┌───────▼────────┐
                    │  降级决策器      │
                    │  (策略六)        │
                    └───────┬────────┘
                    ┌───────▼────────┐
                    │  MaaS 推理执行   │
                    └────────────────┘

各策略的生效阶段:

阶段策略作用生效条件
入口Jitter 注入时间维度打散所有请求
准入分层配额按层级限流超配额时
排队优先级队列资源分配排序队列有积压时
供给容量规划 + 弹性扩容匹配供需持续运行
兜底降级策略保全局稳定容量不足时

九、总结:平台思维 vs 应用思维

回顾这六个策略,它们有一个共同特征:都是从平台视角出发的系统性设计,而非单个应用的局部优化。

如果你是应用开发者,你的关注点是"我的任务跑得对不对"。但如果你是平台设计者,你的关注点是"十万个任务同时跑的时候,系统还能不能撑住"。

策略核心收益实施难度建议优先级
Jitter 打散削峰 79%+P0,立即做
用户分层差异化服务P0,和打散一起做
优先级队列公平调度P1,有排队就需要
容量规划精准预算P1,省真金白银
弹性扩容应对峰值P1,但收益最大
降级策略最后防线P2,兜底方案

最后说一个经常被忽略的点:给用户可见性。在 OpenClaw 的控制台上,让用户看到自己的任务被打散到了什么时间、当前处于什么优先级、系统的健康状态如何。透明度是信任的基础——用户可以接受"你的任务晚了 2 分钟执行",但无法接受"我的任务不知道为什么没跑"。

好的平台设计不是让问题不发生,而是让问题发生时,影响面最小、恢复最快、用户最不意外。


See also