8.2 从零设计一个 Mini Ingestion Infra
学习目标
完成本节后,你将能够:
- 用 Langfuse 的设计模式搭一个简化版高吞吐事件写入系统。
- 理解为什么要拆 schema、raw storage、queue、worker、writer、query builder。
- 把每一步映射回本 repo 的真实实现。
8.2.1 教程路线
本节不是让你真的在当前 repo 新建一个 mini 系统,而是用“可实现的伪代码 + Langfuse 源码对照”训练 infra 拆解能力。
阅读时请固定一个动作:每看到一个缩小版模块,就立刻回答它在 Langfuse 里对应哪个文件、哪个契约、哪个运行时边界。
如果只记住文件名,这节就失败了;如果能解释“为什么要有这一层”,这节才有效。
8.2.2 需求:一个最小可用的事件平台
假设你要做一个简化版平台,支持:
- SDK 上报 event;
- UI 查询 event 列表;
- event 可能很大,包含 input/output/metadata;
- 写入量高,不能让 HTTP 请求等待 OLAP 数据库;
- worker 可以重试;
- 查询时默认不读完整大字段;
- 多租户隔离必须可靠。
这已经足够接近 Langfuse 的核心 infra。
8.2.3 第一步:定义事件契约
缩小版可以这样定义:
const EventSchema = z.object({
id: z.string(),
projectId: z.string(),
type: z.enum(["trace", "observation", "score"]),
timestamp: z.coerce.date(),
body: z.record(z.string(), z.unknown()),
});Langfuse 对照:
- ingestion event schema 在
packages/shared/src/server/ingestion/types.ts; - domain schema 在
packages/shared/src/domain/**; - Public API request/response schema 在
web/src/features/public-api/types/**。
设计要点:
| 要点 | 为什么 |
|---|---|
| runtime schema | SDK payload 是外部输入,TypeScript 类型不够。 |
| projectId / auth scope | 多租户系统必须让每条写入带租户边界。 |
| type | worker 需要按实体类型分派 merge 逻辑。 |
| timestamp | 排序、合并、查询窗口都依赖它。 |
这一步对应的是“系统边界里的语言”。如果这层没有 runtime schema,后面每层都要处理脏数据;如果这层没有 tenant scope,后面每层都可能漏隔离。
8.2.4 第二步:HTTP 入口只做轻工作
缩小版入口不要直接写 OLAP:
POST /events
-> verify api key
-> parse body with Zod
-> group by entity id
-> upload raw JSON to blob storage
-> enqueue job with pointer
-> return accepted resultLangfuse 对照:
web/src/pages/api/public/ingestion.tsweb/src/features/public-api/server/withMiddlewares.tsweb/src/features/public-api/server/createAuthedProjectAPIRoute.tspackages/shared/src/server/ingestion/processEventBatch.ts
关键设计:
| 不要做 | 应该做 |
|---|---|
| HTTP 中同步 insert ClickHouse | 先写 raw payload,再投递 job。 |
| queue payload 放完整 event body | queue 只放 S3 pointer 和 scope。 |
| 每条 event 一个 S3 文件 | 同一 entity 的同批 event 合并为一个对象。 |
入口层的原则是“少做、但必须做对”:auth、schema、scope、采样、幂等线索、raw payload 指针。这些是请求侧才能可靠建立的信息。
8.2.5 第三步:queue payload 是跨进程协议
缩小版 payload:
const EventJob = z.object({
data: z.object({
type: z.enum(["trace", "observation", "score"]),
entityId: z.string(),
fileKey: z.string(),
bucketPrefix: z.string().optional(),
}),
authCheck: z.object({
scope: z.object({
projectId: z.string(),
}),
}),
});Langfuse 对照:
packages/shared/src/server/queues.tsIngestionEvent.data.bucketPrefixQueueName.IngestionQueueQueueJobs.IngestionJobTQueueJobTypes[QueueName.IngestionQueue]
为什么 bucketPrefix optional?因为 Redis 里可能有旧版本 producer 产生的 job。新 worker 必须能消费旧 payload。这就是 rolling deploy 兼容。
你做自己的 infra 时,queue payload 要当成外部 API 对待:它虽然只在内部 Redis 里流动,但它跨进程、跨版本、跨部署窗口。
8.2.6 第四步:worker 做协调,service 做业务
缩小版 worker:
consume job
-> resolve bucketPrefix
-> read raw JSON
-> dedupe recently processed file
-> maybe redirect to secondary queue
-> call EventMergeService
-> call BatchWriter.addToQueueLangfuse 对照:
worker/src/app.ts注册消费者;worker/src/queues/workerManager.ts统一 metrics/error/stalled;worker/src/queues/ingestionQueue.ts协调 S3、Redis、secondary queue;worker/src/services/IngestionService/index.ts做实体合并;worker/src/services/ClickhouseWriter/index.ts做批量写入。
这里的分层很关键:processor 不应该变成所有业务规则的垃圾桶。它应该处理运行时协调;业务转换进入 service;写入策略进入 writer。
Langfuse 的 ingestion processor 就是这个模式:它处理 S3、Redis、secondary queue、span attributes 和错误标记;真正的 trace/observation/score merge 进入 IngestionService。
8.2.7 第五步:写入器集中处理 OLAP 写入策略
缩小版 writer 至少要支持:
- 按表维护队列;
- batch size 触发 flush;
- interval 触发 flush;
- retry/backoff;
- oversized payload 处理;
- 失败指标。
Langfuse 对照 ClickhouseWriter:
| 能力 | 真实实现 |
|---|---|
| 单例 | ClickhouseWriter.getInstance() |
| 按表队列 | queue[TableName.Traces]、queue[TableName.EventsFull] |
| 定时 flush | setInterval(... flushAll()) |
| 批量 insert | format: "JSONEachRow" |
| retry | exponential-backoff |
| 大字段保护 | string length split、size error truncation |
| 成本字段保护 | Decimal64 clamp |
| 失败可见 | rows_dropped metrics 和 dropped ids log |
如果你把这些写在每个 worker processor 里,后续每个后台任务都会重复踩坑。
写入器是系统的“写入策略集中点”。这里决定的是吞吐、延迟、失败隔离和数据丢弃可见性,不是普通 DAO。
8.2.8 第六步:查询默认走轻量投影
缩小版查询不要默认读完整 input/output/metadata:
list page query
-> filter/order/page on lightweight table
-> return list rows
-> when detail opens, fetch full IO by idsLangfuse 对照:
events_core:轻量查询投影;events_full:完整 I/O 和 metadata;EventsQueryBuilder.needsFullTable():决定读 core 还是 full;buildEventsFullTableSplitQuery():先用 core filter/order/limit,再回 full 拿大字段;eventsService.getEventList():列表默认selectIOAndMetadata: false。
这一步能直接降低列表页查询成本。
对初学者来说,这也是最容易忽略的一点:写入表结构不等于查询结构。高吞吐 infra 常常需要“写入完整事实 + 查询轻量投影”两套形状。
8.2.9 第七步:把 UI filter 做成共享语言
缩小版不要让前端拼 SQL,也不要让每个页面发明自己的 filter 形状。
Langfuse 的做法是:
- UI / Search Bar 产生
FilterState; FilterState由 Zod schema 校验;- 后端
createFilterFromFilterStatelower 成 ClickHouse SQL; - column mapping 决定 UI column 对应哪个物理字段。
Langfuse 对照:
packages/shared/src/interfaces/filters.tspackages/shared/src/server/queries/clickhouse-sql/factory.tsweb/src/features/search-bar/README.mdweb/src/features/events/config/filter-config.ts
这层是“查询语言契约”。它把 UI 列、URL 状态、后端 operator、ClickHouse 物理字段连接起来。没有这层,功能会在多个页面里复制,tenant filter 和字段映射也更容易漏。
8.2.10 最小验收清单
如果你照这个模式做自己的 infra,至少要能回答:
- 外部请求在哪里完成 auth 和 tenant scope?
- 外部 payload 是否经过 runtime schema?
- queue payload 是否能兼容 rolling deploy?
- worker 失败是 retry、DLQ、降级还是记录状态?
- raw payload 是否可重放?
- OLAP 写入是否批量化?
- list query 是否避免默认读取大字段?
- filter 是否是共享语言,而不是页面私有状态?
能回答这些问题,才说明你学到的是 infra 设计,而不是复制了 Langfuse 的目录。
8.2.11 反向对照:为什么 Langfuse 更复杂
mini infra 只保留一条 event 链路;Langfuse 真实 repo 还要处理更多维度:
| 复杂度 | Langfuse 为什么需要 |
|---|---|
| 多实体类型 | trace、observation、score、dataset_run_item 的 merge 规则不同。 |
| legacy + v4 双路径 | 迁移期间要同时支持 legacy ClickHouse 表和 v4 events 表。 |
| OTel 摄取 | OTel span 的输入语义和普通 SDK event 不完全一样。 |
| secondary queue | 高吞吐项目或 S3 SlowDown 场景需要隔离,避免拖慢主队列。 |
| Search Bar 语法 | 分析型 UI 需要 URL 可分享、可校验、可 lower 的筛选语言。 |
| Public API contract | 外部用户依赖稳定字段,所以 Fern/generated client 要同步。 |
| 多租户权限 | UI session、API key、worker job 都必须携带 project scope。 |
这也是学习这个 repo 的价值:它不是一个玩具 CRUD,而是一个真实的多运行时、多存储、多租户、高吞吐 observability infra。