ELK 与 Filebeat 日志采集
简介
在生产环境里,日志采集的重点不是"把文件传走",而是让结构化日志、采集规则、字段标准、索引生命周期和排障链路形成一套完整体系。Filebeat 作为轻量级采集器,通常部署在应用节点、容器宿主机或边缘服务器上,把日志可靠地送入 Elasticsearch / Logstash / Kafka,再配合 Kibana 做查询与可视化分析。
一个成熟的日志体系需要解决以下核心问题:
- 结构化:日志是否为 JSON 或其他可解析格式,还是无结构的纯文本?
- 标准化:不同服务的日志是否遵循统一的字段命名规范?
- 可靠性:采集链路是否有背压保护、重试机制和监控告警?
- 成本控制:索引生命周期管理(ILM)是否合理?存储成本是否可控?
- 可检索性:关键字段是否建立了合适的索引模板?查询效率如何?
特点
日志架构设计
常见日志架构模式
# 模式一:Filebeat -> Elasticsearch(简单直接)
# 适合:小规模、日志量不大、不需要复杂处理
[应用] -> [Filebeat] -> [Elasticsearch] -> [Kibana]
# 模式二:Filebeat -> Logstash -> Elasticsearch(推荐)
# 适合:中等规模、需要日志解析转换
[应用] -> [Filebeat] -> [Logstash] -> [Elasticsearch] -> [Kibana]
# 模式三:Filebeat -> Kafka -> Logstash -> Elasticsearch(大规模)
# 适合:大规模、高吞吐、多消费方
[应用] -> [Filebeat] -> [Kafka] -> [Logstash] -> [Elasticsearch] -> [Kibana]
                           |
                           v
                      [其他消费方]
# 模式四:Filebeat -> Elasticsearch(K8s 内置模式)
# 适合:K8s 环境、使用 Elasticsearch Operator
[Pod] -> [Filebeat Sidecar/DaemonSet] -> [Elasticsearch] -> [Kibana]
日志格式规范
// 推荐的 JSON 日志格式
{
"timestamp": "2026-04-12T10:00:00.000+08:00",
"level": "INFO",
"service": "order-api",
"env": "production",
"traceId": "abc123def456",
"spanId": "789xyz",
"userId": "user-10001",
"method": "POST",
"path": "/api/v1/orders",
"statusCode": 200,
"durationMs": 45,
"message": "order created successfully",
"host": "order-api-pod-7d8f9",
"version": "v2.3.1"
}
// 错误日志格式
{
"timestamp": "2026-04-12T10:00:03.000+08:00",
"level": "ERROR",
"service": "payment-api",
"env": "production",
"traceId": "abc124def457",
"spanId": "790xyz",
"error": {
"type": "DBConnectionTimeout",
"message": "database connection timeout after 3000ms",
"stackTrace": "com.example.db.ConnectionPool.acquire(ConnectionPool.java:123)\n..."
},
"message": "payment processing failed",
"host": "payment-api-pod-3a2b1"
}
# 推荐的文本日志格式(不适合 JSON 的场景)
2026-04-12T10:00:00.000+08:00 INFO [http-nio-8080-exec-1] c.e.order.OrderService - order created orderId=10001 userId=user-10001 durationMs=45 traceId=abc123
2026-04-12T10:00:03.000+08:00 ERROR [http-nio-8080-exec-2] c.e.payment.PaymentService - db timeout errorType=DBConnectionTimeout traceId=abc124
java.sql.SQLTimeoutException: Timeout after 3000ms
at com.example.db.ConnectionPool.acquire(ConnectionPool.java:123)
at com.example.payment.PaymentService.process(PaymentService.java:456)
实现
Filebeat 基础配置与应用日志采集
# /etc/filebeat/filebeat.yml
# 全局配置
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: true
reload.period: 10s
# 日志输入配置
filebeat.inputs:
- type: filestream
id: app-log
enabled: true
paths:
- /var/log/myapp/*.log
# 多行日志处理(Java 异常堆栈、Go panic 等)
parsers:
- multiline:
type: pattern
pattern: '^\d{4}-\d{2}-\d{2}'
negate: true
match: after
# 排除不需要的日志文件
exclude_files: ['.gz$', '.tmp$', 'debug-.*']
# 清理过期文件
clean_inactive: 72h
ignore_older: 24h
# 自定义字段
fields:
service: order-api
env: prod
team: backend
region: cn-east
fields_under_root: true
# 处理器链
processors:
# 添加主机元数据
- add_host_metadata:
when.not.contains.tags: forwarded
# 添加云元数据(AWS/GCP/Azure)
- add_cloud_metadata: ~
# 添加 Docker 元数据
- add_docker_metadata: ~
# 删除不需要的字段(节省存储)
- drop_fields:
fields: ["agent.ephemeral_id", "agent.id", "agent.type", "agent.version", "ecs.version"]
ignore_missing: true
# 设置时间戳
- timestamp:
field: timestamp
layouts:
- '2006-01-02T15:04:05.000Z07:00'
- '2006-01-02 15:04:05'
test:
- '2026-04-12T10:00:00.000+08:00'
# 添加日志来源标记
- add_tags:
tags: ["app-log", "production"]
# Elasticsearch 输出
output.elasticsearch:
hosts: ["http://10.0.0.11:9200", "http://10.0.0.12:9200"]
# 负载均衡
loadbalance: true
# 索引命名(注意:ilm.enabled 为 true 时 index 设置会被忽略,实际写入由 rollover_alias 决定)
index: "app-logs-%{[service]}-%{[env]}-%{+yyyy.MM.dd}"
# ILM 策略
ilm.enabled: true
ilm.rollover_alias: "app-logs"
ilm.pattern: "{now/d}-000001"
ilm.policy_name: "app-logs-policy"
# 认证
username: "filebeat_internal"
password: "${ELASTICSEARCH_PASSWORD}"
# 性能调优
worker: 4
bulk_max_size: 2048
flush_interval: 5s
# 压缩
compression_level: 3
# Kibana 连接配置
setup.kibana:
host: "http://10.0.0.13:5601"
# 索引模板配置
setup.template.settings:
index.number_of_shards: 3
index.number_of_replicas: 1
index.codec: best_compression
# 日志级别
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
keepfiles: 7
permissions: 0640
# 应用日志建议为 JSON,便于直接解析
filebeat.inputs:
- type: filestream
id: app-json-log
paths:
- /var/log/myapp/json/*.log
# JSON 解析配置(filestream 输入使用 ndjson parser;旧版 json.* 选项属于已废弃的 log 输入)
parsers:
  - ndjson:
      target: ""             # 解析后的字段放到事件顶层
      add_error_key: true    # 解析失败时写入 error.message,并保留原始内容
      message_key: message
      overwrite_keys: true
      expand_keys: true      # 将带点的键展开为嵌套对象
fields:
service: payment-api
env: prod
team: payment
fields_under_root: true
# 按 service 路由到不同索引
// JSON 日志示例
{"timestamp":"2026-04-12T10:00:00+08:00","level":"INFO","traceId":"abc123","message":"order created","orderId":10001}
{"timestamp":"2026-04-12T10:00:03+08:00","level":"ERROR","traceId":"abc124","message":"db timeout","durationMs":3200}# 校验配置
filebeat test config -c /etc/filebeat/filebeat.yml
# 测试输出连接
filebeat test output -c /etc/filebeat/filebeat.yml
# 启动 Filebeat
systemctl enable --now filebeat
systemctl status filebeat
# 查看 Filebeat 注册表(文件读取位置)
cat /var/lib/filebeat/registry/filebeat/log.json | python3 -m json.tool
# 手动触发模块加载
filebeat modules list
filebeat modules enable nginx mysql
# 查看 Filebeat 版本
filebeat version
容器日志采集与 processors 处理
# 采集 Docker / containerd 容器日志
filebeat.inputs:
- type: container
enabled: true
paths:
- /var/log/containers/*.log
stream: all # all = stdout + stderr
# 多行处理(容器日志通常单行输出,但某些框架仍有多行)
parsers:
- multiline:
type: pattern
pattern: '^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}'
negate: true
match: after
processors:
# 添加 Kubernetes 元数据
- add_kubernetes_metadata:
host: ${NODE_NAME}
matchers:
- logs_path:
logs_path: "/var/log/containers/"
# 启用默认的 indexers 与 matchers
default_indexers_enabled: true
default_matchers_enabled: true
# 添加容器元数据
- add_docker_metadata:
host: "unix:///var/run/docker.sock"
match_fields: ["systemd_unit"]
match_source: true
# 添加自定义字段
- add_fields:
target: ''
fields:
source_type: kubernetes
cluster: production-cluster
# 重新映射字段(将 kubernetes.labels 下的关键标签提升为顶层字段)
- rename:
fields:
- from: "kubernetes.labels.app"
to: "service"
- from: "kubernetes.labels.env"
to: "env"
ignore_missing: true
fail_on_error: false
# Kubernetes DaemonSet 部署 Filebeat
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: filebeat
namespace: kube-system
labels:
k8s-app: filebeat
spec:
selector:
matchLabels:
k8s-app: filebeat
template:
metadata:
labels:
k8s-app: filebeat
spec:
serviceAccountName: filebeat
terminationGracePeriodSeconds: 30
containers:
- name: filebeat
image: docker.elastic.co/beats/filebeat:8.12.0
args: [
"-c", "/etc/filebeat.yml",
"-e",
]
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: ELASTICSEARCH_PASSWORD
valueFrom:
secretKeyRef:
name: elasticsearch-secrets
key: password
securityContext:
runAsUser: 0
resources:
limits:
memory: 500Mi
cpu: 500m
requests:
memory: 200Mi
cpu: 100m
volumeMounts:
- name: config
mountPath: /etc/filebeat.yml
readOnly: true
subPath: filebeat.yml
- name: data
mountPath: /usr/share/filebeat/data
- name: varlog
mountPath: /var/log
readOnly: true
- name: containers
mountPath: /var/lib/docker/containers
readOnly: true
- name: podlogs
mountPath: /var/log/containers
readOnly: true
volumes:
- name: config
configMap:
name: filebeat-config
- name: data
hostPath:
path: /var/lib/filebeat-data
- name: varlog
hostPath:
path: /var/log
- name: containers
hostPath:
path: /var/lib/docker/containers
- name: podlogs
hostPath:
path: /var/log/containers
处理器详解
# 使用 dissect 从文本日志中提取字段
# dissect 比 grok 更快,适合固定格式的日志
processors:
- dissect:
tokenizer: "%{log_time} %{level} [%{thread}] %{logger} - %{message}"
field: "message"
target_prefix: "parsed"
- rename:
fields:
- from: "parsed.level"
to: "log.level"
- from: "parsed.logger"
to: "log.logger"
- from: "parsed.thread"
to: "log.thread"
ignore_missing: true
- convert:
fields:
- { from: "log.level", to: "log.level", type: "string" }
ignore_missing: true
# Filebeat 本身没有 grok processor;复杂格式建议交给 Logstash filter 或 Elasticsearch ingest pipeline(grok 比 dissect 灵活但更慢)
filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{DATA:thread}\] %{JAVACLASS:logger} - %{GREEDYDATA:message}" }
    overwrite => ["message"]
  }
}
# 使用 script 处理器进行自定义逻辑(性能较差,谨慎使用)
processors:
  # script 处理器必须定义 process(event) 函数,通过 event.Get / event.Tag 读写事件
  - script:
      lang: javascript
      source: >
        function process(event) {
          var msg = event.Get("message");
          if (msg && msg.indexOf("health check") >= 0) {
            event.Tag("health-check");
          }
        }
# 过滤无价值日志,减少 ES 压力
processors:
# 丢弃健康检查日志
- drop_event:
when:
or:
- contains:
message: "health check ok"
- contains:
message: " readiness probe"
- equals:
log.level: "DEBUG"
- equals:
log.level: "TRACE"
# 丢弃特定服务的低价值日志
- drop_event:
when:
and:
- equals:
service: "metrics-exporter"
- contains:
message: "scrape completed"# PII 脱敏处理器
processors:
  # 手机号脱敏(同样需要 process(event) 函数,按匹配到的号码本身做掩码)
  - script:
      lang: javascript
      source: >
        function maskPhone(text) {
          if (!text) return text;
          return text.replace(/1[3-9]\d{9}/g, function (m) {
            return m.slice(0, 3) + "****" + m.slice(-4);
          });
        }
        function process(event) {
          var msg = event.Get("message");
          if (msg) event.Put("message", maskPhone(msg));
          var body = event.Get("request.body");
          if (body) event.Put("request.body", maskPhone(body));
        }
Logstash 处理管道
# /etc/logstash/conf.d/app.conf
# 输入
input {
beats {
port => 5044
# SSL 配置
ssl => true
ssl_certificate => "/etc/logstash/ssl/cert.pem"
ssl_key => "/etc/logstash/ssl/key.pem"
}
}
# 过滤
filter {
# 按 service 路由
if [service] == "order-api" {
mutate {
add_field => { "[@metadata][target_index]" => "order-api-%{+YYYY.MM.dd}" }
}
# 解析 User-Agent
useragent {
source => "http_user_agent"
target => "user_agent"
}
}
if [service] == "payment-api" {
mutate {
add_field => { "[@metadata][target_index]" => "payment-api-%{+YYYY.MM.dd}" }
}
# 解析持续时间
grok {
match => { "durationMs" => "%{NUMBER:duration:int}" }
}
}
# GeoIP 解析(安全审计场景)
if [client_ip] {
geoip {
source => "client_ip"
target => "geoip"
}
}
# 时间戳解析
date {
match => ["timestamp", "ISO8601", "yyyy-MM-dd HH:mm:ss.SSS"]
target => "@timestamp"
}
# 删除不需要的字段
mutate {
remove_field => ["headers", "host"]
}
}
# 输出
output {
elasticsearch {
hosts => ["http://10.0.0.11:9200"]
index => "%{[@metadata][target_index]}"
# ILM 配置
ilm_enabled => true
ilm_rollover_alias => "app-logs"
ilm_pattern => "{now/d}-000001"
ilm_policy => "app-logs-policy"
}
# 调试输出(生产环境关闭)
# stdout { codec => rubydebug }
}
# /etc/logstash/conf.d/nginx.conf — Nginx 日志处理
input {
beats {
port => 5045
}
}
filter {
if [type] == "nginx-access" {
grok {
match => { "message" => '%{IPORHOST:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{DATA:path} HTTP/%{NUMBER:http_version}" %{NUMBER:status} %{NUMBER:bytes} "%{DATA:referer}" "%{DATA:user_agent}" %{NUMBER:request_time}' }
}
mutate {
convert => { "bytes" => "integer" }
convert => { "request_time" => "float" }
convert => { "status" => "integer" }
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
# 状态码分类
if [status] >= 500 {
mutate { add_tag => ["server_error"] }
} else if [status] >= 400 {
mutate { add_tag => ["client_error"] }
}
}
}
output {
elasticsearch {
hosts => ["http://10.0.0.11:9200"]
index => "nginx-access-%{+YYYY.MM.dd}"
}
}
Kafka 输出与日志总线
# Filebeat 输出到 Kafka
output.kafka:
hosts: ["10.0.0.31:9092", "10.0.0.32:9092", "10.0.0.33:9092"]
topic: "logs-%{[service]}"
# 消息 Key(用于分区路由)
key: "%{[service]}"
# JSON 编码
codec.json:
pretty: false
# Kafka 生产者配置
required_acks: 1
max_message_bytes: 1000000
# 背压配置
bulk_max_size: 2048
worker: 4
# SSL 配置
ssl.certificate_authorities: ["/etc/filebeat/kafka-ca.crt"]
ssl.certificate: "/etc/filebeat/kafka-client.crt"
ssl.key: "/etc/filebeat/kafka-client.key"
# SASL 认证
username: "filebeat"
password: "${KAFKA_PASSWORD}"
sasl.mechanism: PLAIN
# Logstash 从 Kafka 消费
input {
kafka {
bootstrap_servers => "10.0.0.31:9092,10.0.0.32:9092"
topics => ["logs-order-api", "logs-payment-api"]
group_id => "logstash-consumer"
consumer_threads => 4
codec => json
# 新消费者组从哪里开始消费(earliest = 从最早,latest = 只消费新消息)
auto_offset_reset => "latest"
# SSL 配置
security_protocol => "SSL"
ssl_truststore_location => "/etc/logstash/kafka.truststore.jks"
ssl_truststore_password => "changeit"
}
}
索引模板与 ILM 生命周期管理
// Elasticsearch 索引模板
PUT _index_template/app-logs-template
{
"index_patterns": ["app-logs-*"],
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"index.lifecycle.name": "app-logs-policy",
"index.lifecycle.rollover_alias": "app-logs",
"index.codec": "best_compression",
"refresh_interval": "5s"
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"timestamp": { "type": "date" },
"level": { "type": "keyword" },
"service": { "type": "keyword" },
"env": { "type": "keyword" },
"team": { "type": "keyword" },
"traceId": { "type": "keyword" },
"spanId": { "type": "keyword" },
"userId": { "type": "keyword" },
"host": { "type": "keyword" },
"message": { "type": "text", "analyzer": "ik_max_word" },
"statusCode": { "type": "integer" },
"durationMs": { "type": "float" },
"error": {
"properties": {
"type": { "type": "keyword" },
"message": { "type": "text" },
"stackTrace": { "type": "text" }
}
},
"geoip": {
"properties": {
"city_name": { "type": "keyword" },
"country_name": { "type": "keyword" },
"location": { "type": "geo_point" }
}
}
}
}
},
"composed_of": ["ecs-log"],
"priority": 200
}
// ILM 生命周期策略
PUT _ilm/policy/app-logs-policy
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_primary_shard_size": "30gb",
"max_age": "1d",
"max_docs": 10000000
},
"set_priority": {
"priority": 100
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"allocate": {
"number_of_replicas": 1,
"require": {
"data": "warm"
}
},
"shrink": {
"number_of_shards": 1
},
"forcemerge": {
"max_num_segments": 1
},
"set_priority": {
"priority": 50
}
}
},
"cold": {
"min_age": "30d",
"actions": {
"searchable_snapshot": {
"snapshot_repository": "logs-backup"
},
"set_priority": {
"priority": 0
}
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}
# 创建 ILM 策略
curl -X PUT "http://localhost:9200/_ilm/policy/app-logs-policy" \
-H "Content-Type: application/json" \
-d @ilm-policy.json
# 查看 ILM 策略状态
curl -s "http://localhost:9200/_ilm/policy/app-logs-policy?pretty"
# 查看索引的 ILM 状态(_cat/indices 没有 ILM 列,使用 _ilm/explain)
curl -s "http://localhost:9200/app-logs-*/_ilm/explain?pretty"
# 查看索引大小
curl -s "http://localhost:9200/_cat/indices?v&h=index,store.size&s=store.size:desc" | head -20
# 强制 Rollover
curl -X POST "http://localhost:9200/app-logs/_rollover"Filebeat 模块配置
# 启用 Nginx 模块
filebeat.modules:
- module: nginx
access:
enabled: true
var.paths: ["/var/log/nginx/access.log*"]
error:
enabled: true
var.paths: ["/var/log/nginx/error.log*"]
# 启用 MySQL 模块
- module: mysql
error:
enabled: true
var.paths: ["/var/log/mysql/error.log*"]
slowlog:
enabled: true
var.paths: ["/var/log/mysql/mysql-slow.log*"]
# 启用 Kubernetes 模块
- module: kubernetes
enabled: true
# 管理模块
filebeat modules list
filebeat modules enable nginx
filebeat modules disable mysql
# 模块配置文件位置
ls /etc/filebeat/modules.d/
# nginx.yml.disabled -> nginx.yml(启用后去掉 .disabled 后缀)
监控与告警
# Filebeat 内置监控
# 在 filebeat.yml 中添加(新版本使用 monitoring.* 配置,xpack.monitoring.* 已废弃)
monitoring.enabled: true
monitoring.cluster_uuid: ${CLUSTER_UUID}
monitoring.elasticsearch:
  hosts: ["http://10.0.0.11:9200"]
# 检查 Filebeat 健康状态(需在 filebeat.yml 中开启 http.enabled: true,默认端口 5066)
curl -s http://localhost:5066/stats | python3 -m json.tool
# 测试输出端连通性
filebeat test output
# 检查 Elasticsearch 索引健康
curl -s "http://localhost:9200/_cluster/health?pretty"
curl -s "http://localhost:9200/_cat/indices?v&h=index,health,status,pri,rep,docs.count,store.size"# Prometheus 监控 Filebeat(需要 Metricbeat 或 Elastic Agent)
# 或者通过 Elasticsearch 的监控接口
curl -s "http://localhost:9200/_nodes/stats/indices,os,process?pretty" | python3 -m json.tool优点
缺点
总结
ELK + Filebeat 的核心不只是"收日志",而是让日志具备统一字段、可靠传输、可检索分析和生命周期治理能力。真正落地时,建议先统一应用日志格式和关键字段,再逐步引入多行处理、容器元数据、索引模板和 ILM,而不是一开始就堆复杂链路。
关键知识点
- 日志结构化比单纯全文采集更重要
- multiline 规则错误会直接导致堆栈日志拆裂或粘连
- 关键字段建议统一:timestamp、level、service、env、traceId、host
- 索引命名和 ILM 策略决定了长期成本与查询效率
- Filebeat 注册表(registry)记录了每个文件的读取位置
- dissect 不用正则、开销远低于 grok,固定格式日志优先使用 dissect
- ECS(Elastic Common Schema)是推荐的日志字段命名规范(字段映射示例见本节末尾)
- Kafka 日志总线适合解耦采集和处理,支持多消费方
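下面给出一个按 ECS 重命名自定义字段的 Filebeat processors 片段,仅作示意:源字段名取自前文的 JSON 日志格式,目标字段为 ECS 中对应的标准名称,实际落地前需与索引模板一起调整。
# 示意:把自定义字段重命名为 ECS 标准字段
processors:
  - rename:
      fields:
        - from: "level"
          to: "log.level"
        - from: "service"
          to: "service.name"
        - from: "traceId"
          to: "trace.id"
        - from: "spanId"
          to: "span.id"
        - from: "statusCode"
          to: "http.response.status_code"
      ignore_missing: true
      fail_on_error: false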
项目落地视角
- Java 服务优先输出 JSON 日志,减少 Logstash 解析成本
- K8s 集群统一从 /var/log/containers/*.log 采集容器日志
- 关键服务日志进入热索引,普通日志按策略快速冷存或删除
- 故障排查时通过 traceId 串联网关、应用、数据库日志(查询示例见本节末尾)
- 不同环境(dev/staging/prod)使用不同的 ILM 策略
- 高敏感日志(含 PII)需要在采集层进行脱敏处理
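按 traceId 回放请求链路时,可以直接对日志索引做 term 查询。下面是一个最小示例,沿用前文索引模板中的 traceId keyword 字段和 app-logs-* 索引命名,traceId 取值仅为示意:
# 按 traceId 查询并按时间排序,串联一次请求在各服务上的日志
curl -s "http://localhost:9200/app-logs-*/_search?pretty" \
  -H "Content-Type: application/json" \
  -d '{
    "query": { "term": { "traceId": "abc123def456" } },
    "sort": [{ "@timestamp": { "order": "asc" } }],
    "size": 200
  }'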
常见误区
- 直接把所有日志原样扔进 ES,不做字段治理
- 把 DEBUG 日志长期保留在生产热索引里,导致成本飙升
- multiline 配置不匹配 Java/Go/Python 异常堆栈格式(参考本节末尾的堆栈 multiline 示例)
- 只看采集成功,不关注 ES 索引膨胀与查询性能
- Filebeat 和 Elasticsearch 版本不匹配导致兼容性问题
- 忽略 Filebeat 注册表清理,导致磁盘空间被占满
- 容器日志不添加 Kubernetes 元数据,导致无法关联 Pod/Service 信息
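针对 multiline 这条误区,下面给出 Filebeat 文档中 Java 堆栈的 multiline 写法作为参考;Go panic、Python traceback 通常更适合沿用前文"以时间戳开头 + negate: true"的规则,具体模式需用真实日志验证:
parsers:
  - multiline:
      type: pattern
      # 续行:以空白 + at/... 开头,或以 "Caused by:" 开头的行并入上一条事件
      pattern: '^[[:space:]]+(at|\.{3})[[:space:]]+\b|^Caused by:'
      negate: false
      match: after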
进阶路线
- 使用 ECS(Elastic Common Schema)统一日志字段规范
- 为高价值日志建立专门的 ingest pipeline 与告警规则
- 接入 Kafka 做日志总线解耦
- 建立索引模板、ILM、冷热分层和成本监控体系
- 学习 Elasticsearch 的 Transform 和 Data Rollup 功能
- 评估 Elastic Agent 作为 Filebeat 的替代方案
适用场景
- 微服务应用日志集中检索与排障
- K8s / Docker 容器日志采集
- 安全审计、访问日志、错误日志统一归集
- 需要全文搜索和可视化分析的运维平台
- 多集群、多环境的统一日志管理
落地建议
- 先统一日志格式,再统一采集配置,最后再做高级分析
- 关键字段命名统一,不同服务不要各自为政
- 为高噪音日志设置过滤规则,避免无效数据堆积
- 为 Filebeat、Logstash、Elasticsearch 各层都加健康检查与容量监控
- 为 ES 存储设置配额告警,避免索引膨胀导致集群不可用(磁盘水位设置示例见本节末尾)
- 定期审查 ILM 策略,根据实际使用情况调整保留周期
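针对"为 ES 存储设置配额告警"这一条,一个最小的集群磁盘水位设置示例如下(阈值为示意值,需按集群容量调整;超过 flood_stage 后索引会被置为只读):
curl -X PUT "http://localhost:9200/_cluster/settings" \
  -H "Content-Type: application/json" \
  -d '{
    "persistent": {
      "cluster.routing.allocation.disk.watermark.low": "80%",
      "cluster.routing.allocation.disk.watermark.high": "85%",
      "cluster.routing.allocation.disk.watermark.flood_stage": "90%"
    }
  }'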
排错清单
- 检查 Filebeat input 路径、权限和 multiline 规则是否正确
- 检查输出目标(ES/Logstash/Kafka)连通性与认证配置
- 检查文档字段是否爆炸、mapping 是否冲突
- 检查索引模板、ILM、时间字段和时区是否符合预期
- 检查 Filebeat 注册表是否损坏(可尝试删除后重建)
- 检查 Logstash JVM 堆内存是否足够
- 检查 Kafka Topic 分区数和消费者 Lag
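检查消费者 Lag 可以用 Kafka 自带的 kafka-consumer-groups.sh,消费者组名沿用前文 Logstash 配置中的 logstash-consumer(broker 地址按实际环境替换):
# 查看 Logstash 消费者组在各 Topic 分区上的积压(LAG 列)
kafka-consumer-groups.sh --bootstrap-server 10.0.0.31:9092 \
  --describe --group logstash-consumer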
复盘问题
- 你的日志链路里,最先需要保证的是"收得到"还是"查得准"?
- 哪些字段是团队排障真正依赖的?是否已经强制统一?
- 目前 ES 的成本主要来自数据量、字段数还是保留周期?
- 如果今天要根据 traceId 回放一条请求链路,现有日志体系够不够?
- 日志采集链路是否有端到端的监控和告警?
- 团队是否定期审查索引模板和 ILM 策略的效果?
