4.4 Ingestion 链路
学习目标
完成本节后,你将能够:
- 从 SDK/OTel 上报追到 ClickHouse 写入。
- 理解为什么 ingestion 不同步写 ClickHouse。
- 说清楚 S3、Redis、worker、
IngestionService、ClickhouseWriter如何组合。
4.4.1 先给结论
Langfuse ingestion 的设计不是“收到 event -> 直接 insert”。真实链路是:
- 请求侧只做校验、排序、分组、采样、raw event 上传和轻量 job 投递;
- worker 侧做 S3 读取、去重、secondary queue 分流、实体合并、数据补全;
- ClickHouse 写入侧再做按表缓冲、批量 insert、重试、截断和失败计数。
这个拆分是为了高吞吐、可重试、可批量、可重放,并避免用户请求直接承担 ClickHouse 写入延迟和失败。
4.4.2 链路图
4.4.3 请求侧:processEventBatch 做什么
源码:packages/shared/src/server/ingestion/processEventBatch.ts
| 阶段 | 代码意图 | 为什么重要 |
|---|---|---|
| 记录 metrics/span | recordIncrement、recordDistribution、span attributes | ingestion 是高吞吐入口,必须能观察 batch size 和 project scope。 |
| 校验 project scope | authCheck.scope.projectId | 防止无租户归属的 event 进入数据面。 |
| Zod 校验 event | createIngestionEventSchema | SDK payload 是外部输入,必须先变成内部可处理事件。 |
| 权限判断 | isAuthorized | score-only key 只能创建 score,project key 才能写完整事件。 |
| 排序 | sortBatch | create 事件先于 update 事件,降低同批次 merge 的歧义。 |
| 分组 | entityType + event.body.id | 同一 trace/observation/score 的多个事件合并成一个 S3 对象和一个 job。 |
| 路径安全 | safeBlobKeySegment、safeBlobFilenameStem、buildEventBucketPrefix | 防止用户 id 影响 S3 路径结构,也让 producer/consumer 对路径有明确契约。 |
| raw upload | uploadJson(bucketPath, data) | 保存原始事件,worker 可以重放、合并、批量处理。 |
| enqueue | IngestionQueue.getInstance({ shardingKey }).add(...) | job 只带指针和轻量 metadata,不塞完整 body。 |
这里最值得注意的是 bucketPrefix。它由 producer 计算后放入 queue payload,consumer 优先使用它,而不是用 worker 本地 env 重新拼路径。这是跨进程协议设计,不是普通字段。
4.4.4 队列契约:producer 和 consumer 怎么对齐
源码:packages/shared/src/server/queues.ts
IngestionEvent 定义了 producer 和 consumer 共同理解的 payload:
| 字段 | 作用 |
|---|---|
data.type | 判断 trace、observation、score、dataset_run_item 等实体类型。 |
data.eventBodyId | 实体 id,用于 sharding、dedupe 和 merge。 |
data.fileKey | S3 文件名 stem,让 worker 找到 raw JSON。 |
data.skipS3List | OTel observation 或特定项目可以直接读单文件,减少 list 成本。 |
data.forwardToEventsTable | 控制是否写入 v4 events staging/full path。 |
data.bucketPrefix | producer 写 S3 的绝对前缀,避免 consumer 重构路径漂移。 |
authCheck.scope.projectId | tenant scope,worker 后续写入必须带它。 |
这类字段一旦变更,就不是“改一个类型”那么简单。它影响 rolling deploy、历史 job、secondary queue 和 worker 兼容。
4.4.5 worker 侧:ingestionQueueProcessorBuilder 做什么
源码:worker/src/queues/ingestionQueue.ts
| 阶段 | 代码意图 | 设计原因 |
|---|---|---|
| span attributes | 把 job id、projectId、eventBodyId、type、fileKey 放入 tracing | 失败时能从 job 追到租户和实体。 |
获取 ClickhouseWriter | ClickhouseWriter.getInstance() | worker 内共享批量写入器。 |
| 解析 S3 prefix | 优先 payload.data.bucketPrefix,旧 job fallback 到 raw prefix | 兼容 rolling deploy 和老队列。 |
| Redis seen cache | langfuse:ingestion:recently-processed:* | 防止短时间重复处理同一文件。 |
| secondary queue 分流 | env allowlist 或 S3 SlowDown flag | 高吞吐/异常项目可以隔离到 secondary queue。 |
| S3 读取 | skipS3List 走单文件,否则 list prefix 后并发下载 | 在成本和完整性之间取舍。 |
| 设置 seen keys | 下载成功后写 Redis 5 分钟窗口 | 避免快速更新导致重复消费。 |
计算 forwardToEventsTable | payload 优先,env fallback | 支持迁移期开关和双写。 |
| 恢复 canonical id | 从 event body 恢复 raw id,而不是用 sanitized path id | 防止 S3 安全路径影响 ClickHouse row id。 |
调用 IngestionService | mergeAndWrite(...) | processor 不直接做业务合并。 |
这里的 processor 像“运行时协调器”:它处理 job、S3、Redis、secondary queue 和错误语义,但不应该把 ClickHouse record 合并逻辑散落在 processor 里。
4.4.6 业务合并:IngestionService 才是数据转换核心
源码:worker/src/services/IngestionService/index.ts
IngestionService.mergeAndWrite 按实体类型分派:
| 实体 | 方法 | 关键行为 |
|---|---|---|
| trace | processTraceEventList | 合并 trace create/update;补 input/output;写 traces;必要时写 staging observation;触发 TraceUpsertQueue 给 eval 使用。 |
| observation | processObservationEventList | 合并 observation;补 prompt、tool calls、usage、cost;必要时创建 wrapper trace;写 observations 和 staging。 |
| score | processScoreEventList | 校验/补全 score;读取旧 ClickHouse record;合并后写 scores。 |
| dataset_run_item | processDatasetRunItemEventList | 从 Postgres 补 dataset run/item 信息;写 ClickHouse dataset run item 表。 |
它的核心模式是:
- 将同一实体的事件按时间排序;
- 读取 ClickHouse 中可能已经存在的旧 record;
- 用不可变字段规则保护
id、project_id、timestamp 等字段; - 让新事件覆盖可变字段;
- 做模型、usage、cost、prompt、tool metadata 等补全;
- 用 Zod/record schema 校验最终 insert record;
- 交给
ClickhouseWriter,而不是自己 insert。
这就是 ingestion 的“业务内核”。
4.4.7 写入器:ClickhouseWriter 为什么单独存在
源码:worker/src/services/ClickhouseWriter/index.ts
ClickhouseWriter 是 worker 内的批量写入缓冲层:
| 机制 | 代码行为 | 设计原因 |
|---|---|---|
| 单例 | ClickhouseWriter.getInstance() | 多个 processor 共用同一批量写入队列。 |
| 按表队列 | queue[TableName.Traces] 等 | 不同表有不同 record schema,批量写入要分表。 |
| 双触发 flush | batch size 到达或 interval 到达 | 在吞吐和延迟之间折中。 |
JSONEachRow | clickhouseClient().insert({ format: "JSONEachRow" }) | ClickHouse 批量写入友好。 |
| retry/backoff | socket hang up 等可重试错误会 retry | 网络抖动不应直接丢数据。 |
| 超大字段保护 | string length / size error 时拆批或截断 | 防止单条异常 payload 拖垮整批。 |
| Decimal clamp | cost 字段超出 Decimal64 时限幅 | 防止非法成本值导致整批失败。 |
| drop metrics | max attempts 后记录 dropped rows | 失败不是静默发生。 |
如果把这些逻辑写在每个 processor 里,所有后台任务都会重复处理批量、重试、截断和指标。独立的 writer 是为了把 ClickHouse 写入策略集中起来。
4.4.8 为什么是 S3 + Redis + worker + ClickHouse
| 方案 | 问题 |
|---|---|
| HTTP 请求里直接写 ClickHouse | 请求延迟和 ClickHouse 抖动直接暴露给 SDK;无法批量;难重试。 |
| Redis job 里塞完整 event body | payload 大、成本高、队列内存压力大、重放不方便。 |
| 每个 event 单独写 S3 和 job | 小文件和 job 数过多,S3/list/queue 成本高。 |
| worker 直接 insert 单条 | ClickHouse 写入吞吐低,失败处理散落。 |
当前设计的组合是:
- S3/blob 保存原始事件正文;
- Redis/BullMQ 只保存轻量 job 和调度状态;
- worker 做异步合并、重试和隔离;
IngestionService做业务转换;ClickhouseWriter做统一批量写入;- ClickHouse 承载最终分析查询。
这是一条面向高吞吐事件平台的写入路径。
4.4.9 关键不变量
- producer 和 consumer 必须共享同一 queue payload schema。
- worker 不能随意重构 producer 的 S3 路径;优先使用 payload 里的
bucketPrefix。 - ClickHouse row id 应来自 event body 的 canonical id,而不是 sanitized S3 path。
IngestionService负责实体级合并,processor 只做运行时协调。- ClickHouse 写入必须走
ClickhouseWriter,不要在 processor 里散落 insert。 - tenant scope 必须从 authCheck 带到每一次写入。
- 采样、secondary queue、skip S3 list、v4 dual-write 都是运行时策略,不是独立业务分支。