NDJSON 与 JSON Lines:在日志采集和 ETL 中的最佳实践

JsonTool
订阅
充电
|

适用对象:后端/平台工程师、数据工程师、SRE
场景:Web 访问日志、API 网关、容器 stdout、实时/离线 ETL

先给结论

  • 每行一个 JSON 对象(NDJSON/JSON Lines) 是面向流的天然格式:边界清晰、内存友好、易切分、易回放
  • 对比「漂亮打印的多行 JSON」或「一大坨 JSON 数组」:NDJSON 更抗截断粘包日志轮转单条体积异常等生产环境常见问题。
  • 在 Nginx 上启用 escape=jsonlog_format 配置,配合 Fluent Bit 的 tail + json parser,可以无痛接入 ES/OpenSearch、Kafka、S3/Loki 等下游。

为什么 NDJSON 更好用

  1. 记录边界天然可见\n 即一条。切分、并行处理、断点续传都简单(tail -F | split | parallel)。
  2. 流式处理:一次一条进内存,ETL/校验/聚合对超大日志文件更友好,不用整块解析。
  3. 容错性:单条损坏不拖累整批(不像数组里某一条坏数据会让整体 JSON 失效)。
  4. 生态配套jq | awk | sed | grep 直接上手;Fluent Bit / Logstash / Vector / Beam / Spark 都天然支持行分隔输入。
  5. 平台现实:日志轮转、容器重启、Sidecar 复位时,按行 checkpoint 更稳(Fluent Bit 自带 offset DB)。

类比:持续出厂的零件“单件质检”比“最后把所有件焊起来再一起质检”更容易定位问题与回滚。

字段与约定(推荐模板)

  • 时间戳ts_unix(秒)或 ts_unix_ms(毫秒,数字)——避免字符串解析差异。
  • 幂等/追踪request_idtrace_idspan_id
  • 请求上下文methodpathstatuslatency_msbytes_sentupstream_addr
  • 环境/归属serviceenvhostpod
  • 类型稳定:同一字段永远一种类型(避免 ES 映射冲突)。
  • 编码:UTF-8、LF(\n),不要 BOM
  • 最大行长:限制在可接收上限内(如 32KB/64KB),异常字段做截断与 truncated=true 标记。

示例(单行):

1
{"ts_unix_ms": 1724554834123, "service":"web", "env":"prod", "request_id":"b0f9...", "method":"GET", "path":"/api/v1/items", "status":200, "latency_ms":12.7, "bytes_sent":5321, "upstream_addr":"10.0.0.12:8080", "ip":"203.0.113.10", "ua":"Mozilla/5.0"}

在 Nginx 落地:开启 NDJSON 访问日志

需求:一条请求产出一行合法 JSON,自动转义引号/换行,字段类型尽量是数字。

1)定义 NDJSON 日志格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# nginx.conf/http{}
log_format ndjson escape=json
'{'
'"ts_unix_ms":$msec*1000,'
'"service":"$server_name",'
'"env":"$hostname",'
'"request_id":"$request_id",'
'"method":"$request_method",'
'"path":"$uri",'
'"query":"$args",'
'"protocol":"$server_protocol",'
'"status":$status,'
'"bytes_sent":$bytes_sent,'
'"latency_ms":$request_time*1000,'
'"upstream_time":"$upstream_response_time",'
'"upstream_addr":"$upstream_addr",'
'"ip":"$remote_addr",'
'"xff":"$http_x_forwarded_for",'
'"referer":"$http_referer",'
'"ua":"$http_user_agent"'
'}';

access_log /var/log/nginx/access.ndjson ndjson;

说明:

  • escape=json 会把引号/换行等危险字符转义,确保一行一个对象
  • $msec(秒.毫秒)乘以 1000 得到 ts_unix_ms(数字)。
  • 业务可按需加字段,如 routetenant_idtraceparent$http_traceparent)。

2)确保 request_id

1
2
3
4
5
6
# 在 server 或 http 作用域
map $http_x_request_id $req_id {
default $request_id; # 无上游X-Request-ID则用nginx自带
"~.+ " $http_x_request_id;
}
# 然后把 "request_id":"$req_id" 写入 log_format

在 Fluent Bit 落地:采集、解析、投递

1)输入:Tail NDJSON 文件

1
2
3
4
5
6
7
8
9
10
[INPUT]
Name tail
Path /var/log/nginx/access.ndjson
Tag nginx.access
Parser ndjson
DB /var/log/flb_nginx.db
Mem_Buf_Limit 20MB
Skip_Long_Lines On
Refresh_Interval 5
Rotate_Wait 10

2)解析器:JSON + 时间

如果你用了 ts_unix_ms(数字),推荐直接沿用日志里的时间,不做字符串解析。

1
2
3
4
5
6
[PARSER]
Name ndjson
Format json
Time_Key ts_unix_ms
Time_Format %s.%L # 支持秒.毫秒
Time_Keep On

说明:ts_unix_ms 是“毫秒整数”,而 %s.%L 期望“秒.毫秒”。两种方式二选一:

  • 方式 A(简单):把 Nginx 写成 "ts":"$msec"(例如 1724554834.123),解析用 %s.%L
  • 方式 B(整数毫秒):继续写 "ts_unix_ms":$msec*1000,并 不设置 Time_Key(由 Fluent Bit 赋值采集时间),或使用 Lua Filter把毫秒转为秒.毫秒再赋给 time。多数场景方式 A 更省事

示例(方式 A,更推荐):

1
2
3
4
5
6
7
"log_format" 里写 ->  '"ts":"$msec", ...'
[PARSER]
Name ndjson
Format json
Time_Key ts
Time_Format %s.%L
Time_Keep On

3)可选:规范/补充字段

1
2
3
4
5
6
[FILTER]
Name modify
Match nginx.access
Add service web
Add env prod
Rename upstream_time latency_upstream

4)输出:示例到 OpenSearch / Kafka / Loki

OpenSearch/Elasticsearch

1
2
3
4
5
6
7
8
9
[OUTPUT]
Name es
Match nginx.access
Host opensearch.logging.svc
Port 9200
Index nginx-access-%Y-%m-%d
Logstash_Format On
Replace_Dots On
Suppress_Type_Name On

Kafka

1
2
3
4
5
6
[OUTPUT]
Name kafka
Match nginx.access
Brokers kafka-0:9092,kafka-1:9092
Topics nginx.access
Format json

Loki

1
2
3
4
5
6
7
[OUTPUT]
Name loki
Match nginx.access
Host loki.logging.svc
Port 3100
Labels job=nginx,service=web,env=prod
Line_Format json

ETL 实战:一条命令解决 80% 需求

筛选 5xx

1
cat access.ndjson | jq -c 'select(.status >= 500)'

按 path 聚合 P95 延迟(示意,小文件可用)

1
2
3
4
5
6
7
8
cat access.ndjson \
| jq -r '[.path, (.latency_ms|tonumber)] | @tsv' \
| awk -F'\t' '{k[$1]=k[$1] $2 " " } END { for (p in k) {
n=split(k[p],a," ");
asort(a);
idx=int(0.95*n); if(idx<1) idx=1;
print p, a[idx]
}}' | column -t

裁剪字段/脱敏

1
2
jq -c '{ts, method, path, status, latency_ms, request_id}'
jq -c 'del(.ua, .referer, .xff)'

重放到 Kafka

1
cat access.ndjson | kafka-console-producer --broker-list ... --topic nginx.access

常见问题

  1. 多行内容(如后端异常栈)
    • 在 Nginx 访问日志基本无此问题;应用日志务必把换行转义为 \n
    • 若必须多行,使用 Fluent Bit Multiline Parser,但那就不再是 NDJSON 了。
  2. 字段类型不稳定
    • status/bytes_sent/latency_ms 均应为数字。不要今天 "200" 明天 200
    • 为 ES/OpenSearch 提前准备 Index Template(禁用 dynamic 或显式 mapping)。
  3. 时间解析失败
    • 最稳妥:日志里放 "$msec"(秒.毫秒),解析用 %s.%L
    • ISO 8601 的 +08:00 时区冒号有时会踩 parser 兼容坑。
  4. 超长行
    • Nginx 层截断异常字段并打 truncated=true 标签;Fluent Bit 开启 Skip_Long_Lines On + 监控告警。
  5. CRLF/编码问题
    • 统一 LF 与 UTF-8,无 BOM。跨平台复制文件要注意 \r
  6. 日志轮转与丢失
    • 使用 Fluent Bit DB 持久化 offset;Rotate_Wait 给下游一点时间;避免 copytruncate 带来的竞争,优先 create 模式。

用 JSON Schema 校验格式

示例(片段):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"$schema":"https://json-schema.org/draft/2020-12/schema",
"type":"object",
"required":["ts","method","path","status","latency_ms"],
"properties":{
"ts":{"type":"string","pattern":"^\\d+\\.\\d{3}$"},
"method":{"type":"string"},
"path":{"type":"string"},
"status":{"type":"integer"},
"latency_ms":{"type":"number","minimum":0},
"bytes_sent":{"type":"integer","minimum":0}
},
"additionalProperties":true
}

在 CI/ETL 入口用 ajv/jsonschema 批量校验,避免“幽灵字段”与类型漂移。

小结

  • NDJSON 把「日志是流」这件事落到了最朴素的工程抽象:一行一个事件
  • Nginx escape=json + Fluent Bit tail + json parser 的组合,既能快速上线,又为后续扩展(ES/Kafka/S3/Loki)留足余地。
  • 把时间与类型定死、限制行长、规范字段命名,你的日志与 ETL 就会少大半“灵异事件”。