4.5 Worker 队列
学习目标
完成本节后,你将能够:
- 理解
worker/src/app.ts如何把 queue contract、processor 和 BullMQ options 组装起来。 - 读懂
WorkerManager.register的运行时职责。 - 判断一个后台任务应该如何定义 payload、producer、consumer、失败语义和验证。
4.5.1 先给结论
Worker 队列链路不是“写一个 processor”就结束。完整工作模型是:
shared queue contract -> producer add job -> Redis/BullMQ -> worker app registers consumer -> WorkerManager wraps metrics/error -> processor -> service/side effect在这个模型里:
packages/shared/src/server/queues.ts定义 job 的名字和 payload;packages/shared/src/server/redis/**提供 typed queue class 和 sharding;- producer 调用 queue
.add(...); worker/src/app.ts根据 env flag 注册 processor;WorkerManager创建 BullMQWorker,包装 metrics、OpenTelemetry、error/stalled hooks;- processor 只做 orchestration,复杂业务应下沉到 service。
4.5.2 总链路图
4.5.3 queues.ts:队列协议源头
源码:packages/shared/src/server/queues.ts
它分三层:
| 层 | 作用 |
|---|---|
| payload schema | 用 Zod 描述 job data,例如 IngestionEvent、OtelIngestionEvent、DatasetQueueEventSchema。 |
QueueName / QueueJobs | 固定 Redis/BullMQ 队列名和 job 名。 |
TQueueJobTypes | 从 queue name 映射到 { timestamp, id, payload, name, retryBaggage }。 |
为什么集中定义?因为 producer 和 consumer 不在同一个运行时:
如果 producer 和 consumer 各自定义 payload,滚动部署时就容易出现新旧版本漂移。
4.5.4 shared Redis queue class:队列实例和 sharding
源码:packages/shared/src/server/redis/**
队列 class 负责创建 typed BullMQ queue,例如 ingestion:
IngestionQueue.getShardNames()生成 shard queue names;IngestionQueue.getInstance({ shardingKey })根据 sharding key 选择 shard;SecondaryIngestionQueue提供隔离队列;- OTel、eval、LLM-as-judge、code eval 等队列也有类似模式。
这层的意义是:producer 不直接 new Queue(...),而是通过 shared queue class 使用统一 Redis options、queue name 和 sharding 规则。
4.5.5 worker/src/app.ts:运行时装配
worker/src/app.ts 的核心不是业务处理,而是装配:
| 装配项 | 例子 |
|---|---|
| env gate | QUEUE_CONSUMER_INGESTION_QUEUE_IS_ENABLED、QUEUE_CONSUMER_EVAL_EXECUTION_QUEUE_IS_ENABLED。 |
| queue shard | IngestionQueue.getShardNames()、EvalExecutionQueue.getShardNames()。 |
| processor builder | ingestionQueueProcessorBuilder(true)、evalJobExecutorQueueProcessorBuilder(...)。 |
| BullMQ options | concurrency、limiter、lockDuration、stalledInterval、maxStalledCount。 |
| scheduled queue instantiation | export、metering、retention 等定时任务会先 getInstance()。 |
| long-running cleaners | project cleaner、retention cleaner、deleted mask cleaner、queue metrics runner、monitor runner。 |
所以代码存在不代表当前部署一定在消费。是否运行要看 env flag。
4.5.6 WorkerManager.register 做了什么
源码:worker/src/queues/workerManager.ts
WorkerManager.register(queueName, processor, options) 做几件事:
- 防止重复注册同名 worker;
- 用
createBullMQWorkerOptionsWithRedis获取 Redis-backed worker options; - 创建 BullMQ
Worker; - 包装 processor,记录 wait time、processing time、queue depth、failed count、active count;
- 从 job payload 里提取 projectId,放入 OpenTelemetry / ClickHouse context;
- 监听
failed、error、stalled事件,记录日志、trace exception 和 metrics; - 暴露
closeWorkers、getWorker、getRegisteredQueueNames。
这说明 WorkerManager 是队列运行时的统一控制面,不是普通 factory。
4.5.7 concurrency、limiter、lock 的含义
| 配置 | 作用 | 何时重要 |
|---|---|---|
concurrency | 单个 worker 同时处理多少 job | 控制吞吐。 |
limiter.max/duration | 全局速率限制 | 防止外部 API、ClickHouse delete、heavy job 被打爆。 |
lockDuration | job lock 多久没续约算可能 stalled | 长任务、CPU 抖动、外部调用慢时重要。 |
stalledInterval | 多久检查 stalled job | 降低频繁检查带来的噪声。 |
maxStalledCount | stalled 后最多重试次数 | 防止锁过期导致无限重复。 |
例如 eval execution、PostHog/Mixpanel integration、trace delete 等长任务会显式设置 lockDuration、stalledInterval 和 maxStalledCount。
4.5.8 失败语义
Worker 里的错误不能一概 throw,也不能一概吞掉。
| 情况 | 倾向处理 |
|---|---|
| 临时网络错误、S3 SlowDown、外部服务短暂不可用 | throw,让 BullMQ retry;必要时标记 secondary queue。 |
| 用户配置错误、API key 无效、目标资源不存在 | 记录业务失败状态,job 可结束。 |
| 永久非法 payload | 不要无限 retry,记录错误并进入可观察状态。 |
| 大规模删除、导出、retention | 使用 limiter 和低 concurrency,保护 DB/ClickHouse。 |
| job lock 可能过期 | 调整 lockDuration、stalledInterval、maxStalledCount。 |
以 ingestion 为例,S3 SlowDown 会 markProjectS3Slowdown 并让项目进入 secondary queue 路径;普通处理错误会 markProjectIngestFailure 后 throw。
4.5.9 新增 queue 的标准路线
- 在
packages/shared/src/server/queues.ts增加 payload schema、QueueName、QueueJobs、TQueueJobTypes。 - 如需 producer 统一创建 queue,在
packages/shared/src/server/redis/**增加 queue class。 - 在 producer 处调用
.add(QueueJobs.X, { ... }),job id 和 sharding key 要稳定。 - 在
worker/src/queues/**增加 processor,复杂逻辑放 service。 - 在
worker/src/app.ts按 env flag 注册,设置 concurrency/limiter/locks。 - 为 rolling deploy 考虑 optional 字段和 fallback。
- 写测试覆盖 schema、producer、processor 成功/失败路径。