5.2 Domain + Zod
学习目标
完成本节后,你将能够:
- 区分 domain schema、API schema、queue schema、DB record schema。
- 理解 Zod 在 Langfuse 里为什么是运行时边界,不只是 TypeScript 类型来源。
- 判断数据转换应该发生在哪个边界。
5.2.1 先给结论
Langfuse 的 Zod schema 不只是为了 z.infer。它承担的是跨边界数据校验:
外部 JSON -> API/ingestion schema -> domain schema -> DB record schema -> API/UI response在平台型系统里,TypeScript 类型只能保护编译期。SDK payload、HTTP body、Redis job、ClickHouse row、Postgres row 都是在运行时到达的,所以需要 Zod 把“不可信输入”变成“内部可处理数据”。
5.2.2 四类 schema
| 类型 | 位置 | 例子 | 作用 |
|---|---|---|---|
| Domain schema | packages/shared/src/domain/** | TraceDomain、ObservationSchema、ScoreSchema | 表示业务概念的稳定形状。 |
| API schema | web/src/features/public-api/types/**、route config | query/body/response schema | 保护外部 API contract。 |
| Queue schema | packages/shared/src/server/queues.ts | IngestionEvent、OtelIngestionEvent | 保护 producer/consumer 跨进程协议。 |
| DB record schema | packages/shared/src/server/repositories/definitions.ts | observationRecordInsertSchema、scoreRecordReadSchema | 保护 ClickHouse row 的读写形状。 |
这些 schema 解决的是不同层的问题,不应该互相替代。
5.2.3 Domain schema:业务语义
源码:
packages/shared/src/domain/traces.tspackages/shared/src/domain/observations.tspackages/shared/src/domain/scores.ts
典型模式:
export const TraceDomain = z.object({
id: z.string(),
name: z.string().nullable(),
timestamp: z.date(),
environment: z.string(),
tags: z.array(z.string()),
projectId: z.string(),
});
export type TraceDomain = z.infer<typeof TraceDomain>;domain schema 表达的是产品概念,例如:
- observation 有
type、level、startTime、endTime、input、output、usage/cost; - score 有 source、dataType、trace/session/dataset/observation 关联;
- trace 有 session、user、metadata、tags、bookmarked/public。
注意:domain schema 不等于 DB row。domain 用 camelCase 和 JS Date;ClickHouse record 用 snake_case、epoch number 或 ClickHouse date string。
5.2.4 API schema:外部承诺
Public API 的 schema 包含 query/body/response。它们的职责是:
- 过滤掉外部请求里不合法的字段;
- 给 SDK 和用户稳定响应;
- 在 development 环境发现 response schema drift;
- 和 Fern contract 对齐。
例如 createAuthedProjectAPIRoute 要求每条 route 显式传 responseSchema。这不是多余校验,而是外部 contract 的运行时保护。
5.2.5 Queue schema:跨进程协议
packages/shared/src/server/queues.ts 里的 schema 面向 Redis/BullMQ job。
以 IngestionEvent 为例:
data.type
data.eventBodyId
data.fileKey
data.skipS3List
data.forwardToEventsTable
data.bucketPrefix
authCheck.scope.projectIdproducer 在 request side 或 shared service;consumer 在 worker。它们不共享内存,只共享 payload。所以 queue schema 必须能回答:
- consumer 如何找到 S3 文件?
- tenant scope 从哪里来?
- 是否跳过 S3 list?
- 是否写入 v4 events table?
- rolling deploy 中旧 job 没有新字段时怎么办?
这就是为什么 queue 字段新增常常先 optional(),再在 consumer 写 fallback。
5.2.6 DB record schema:读写形状
源码:packages/shared/src/server/repositories/definitions.ts
ClickHouse record schema 常分成 read 和 insert:
| schema | 形状 | 原因 |
|---|---|---|
observationRecordInsertSchema | timestamp 字段是 number,适合写入 | worker/ClickhouseWriter 插入 ClickHouse。 |
observationRecordReadSchema | timestamp 字段是 ClickHouse string,再 transform 成 ISO datetime | ClickHouse client 读取时返回字符串。 |
scoreRecordInsertSchema / scoreRecordReadSchema | score row 写入/读取形状不同 | 读写边界分离。 |
eventsObservationRecordReadSchema | events table 额外带 user/session/trace denormalized 字段 | v4 events 查询需要更多上下文。 |
这说明 Zod schema 也在处理物理存储差异。不要假设 ClickHouse row 和 domain object 一样。
5.2.7 数据转换边界
常见转换:
| 转换 | 发生位置 |
|---|---|
| 外部 JSON -> ingestion event | processEventBatch、ingestion schema。 |
| Public API body -> route input | createAuthedProjectAPIRoute。 |
| loose event list -> strict ClickHouse record | IngestionService。 |
| ClickHouse row -> domain/read model | repositories 和 read schema。 |
| domain/read model -> client-safe response | public API response schema 或 UI domain converter。 |
边界越清楚,越容易测试和迁移。
5.2.8 改 schema 时的风险面
| 改动 | 要同步检查 |
|---|---|
| 新增 domain 字段 | API response、UI display、DB record、tests。 |
| 新增 ingestion 字段 | SDK payload、Zod schema、S3 raw event、worker merge、ClickHouse writer。 |
| 新增 queue 字段 | producer、consumer、old jobs、secondary queue、DLQ retry。 |
| 新增 ClickHouse 列 | migrations、insert schema、read schema、query field map。 |
| 新增 Public API response field | responseSchema、Fern、generated client、docs。 |
| 改 score/observation enum | filters、search-bar field registry、query builder、UI labels。 |
5.2.9 自检清单
- 这个 schema 是 API、domain、queue 还是 DB record?
- 输入是否来自不可信边界?
- 是否需要 runtime parse,而不只是 TS type?
- 是否跨进程或跨部署版本?
- 是否有旧数据或旧 job 缺字段?
- 读 schema 和写 schema 是否应该分开?
- Public API 是否需要同步 Fern?