日志聚合与分析体系
大约 17 分钟约 5017 字
日志聚合与分析体系
简介
日志是系统运维和故障排查的核心数据来源。在分布式系统和微服务架构中,应用日志分散在数十甚至数百台服务器上,传统的 SSH 登录到每台服务器查看日志的方式已经无法满足运维需求。日志聚合与分析体系通过集中收集、结构化存储、高效检索和智能分析,帮助运维团队快速定位问题、监控系统状态、满足合规要求。
本文将系统性地介绍日志聚合与分析的完整技术栈,包括 rsyslog 本地日志管理、logrotate 日志轮转、ELK(Elasticsearch + Logstash + Kibana)集中式日志平台、Filebeat/Fluentd 轻量级日志采集、结构化日志规范以及日志告警和留存策略。
核心特点
| 特点 | 说明 |
|---|---|
| 集中收集 | 将分散在各节点的日志统一收集到中心节点 |
| 结构化存储 | 将非结构化日志转为可检索的结构化数据 |
| 全文检索 | 支持对日志内容进行全文搜索和聚合分析 |
| 实时告警 | 基于日志模式匹配触发告警通知 |
| 可视化分析 | 通过图表直观展示系统运行状态 |
| 合规审计 | 满足日志留存和安全审计要求 |
日志架构概览
+------------------+ +------------------+ +------------------+
| 应用服务器 A | | 应用服务器 B | | 应用服务器 C |
| Filebeat/Fluentd| | Filebeat/Fluentd| | Filebeat/Fluentd|
+--------+---------+ +--------+---------+ +--------+---------+
| | |
+------------------------+------------------------+
|
+--------+---------+
| Logstash / |
| Fluentd (Agg) |
+--------+---------+
|
+-------------+-------------+
| |
+--------+---------+ +---------+--------+
| Elasticsearch | | Elasticsearch |
| (数据节点) | | (主节点) |
+--------+---------+ +---------+--------+
| |
+-------------+-------------+
|
+--------+---------+
| Kibana |
| (可视化面板) |
+------------------+日志级别规范
日志级别定义
| 级别 | 值 | 说明 | 使用场景 |
|---|---|---|---|
| FATAL | 0 | 系统不可用 | 数据库连接完全失败、核心服务启动失败 |
| ERROR | 1 | 错误但不影响系统运行 | 请求处理失败、外部服务调用超时 |
| WARN | 2 | 警告信息 | 接近资源上限、降级处理、重试成功 |
| INFO | 3 | 关键业务信息 | 请求处理完成、用户登录、定时任务执行 |
| DEBUG | 4 | 调试信息 | SQL 语句、方法参数、中间变量 |
| TRACE | 5 | 详细追踪信息 | 完整的请求响应内容、函数调用链 |
日志级别配置建议
# 生产环境:INFO 级别
# 测试环境:DEBUG 级别
# 开发环境:DEBUG 或 TRACE 级别
# 故障排查:临时调整为 DEBUG 或 TRACErsyslog 配置
rsyslog 基础配置
# 安装 rsyslog(通常已预装)
yum install -y rsyslog
# 查看当前配置
cat /etc/rsyslog.conf
# rsyslog 配置格式说明
# facility.priority action
# facility: kern, user, mail, daemon, auth, syslog, lpr, news, cron, local0-7
# priority: emerg, alert, crit, err, warning, notice, info, debug
# action: 文件路径, 远程服务器, 用户, 程序主配置文件
cat > /etc/rsyslog.conf << 'EOF'
# 全局配置
global(
workDirectory="/var/lib/rsyslog"
maxMessageSize="64k"
)
# 加载模块
module(load="imuxsock") # 本地日志
module(load="imklog") # 内核日志
module(load="imfile") # 文件日志输入
module(load="imudp") # UDP 输入
module(load="imtcp") # TCP 输入
# UDP 接收(性能好,可能丢包)
input(type="imudp" port="514")
# TCP 接收(可靠,性能略低)
input(type="imtcp" port="514" MaxSessions="500")
# 规则配置
# 内核日志
kern.* /var/log/kernel.log
# 认证相关
auth.* /var/log/secure
authpriv.* /var/log/secure
# Cron 日志
cron.* /var/log/cron
# 邮件日志
mail.info -/var/log/maillog
mail.warning /var/log/maillog
# 系统日志
*.info;mail.none;authpriv.none;cron.none /var/log/messages
*.warning /var/log/warnings.log
*.err /var/log/errors.log
# 所有日志发送到远程日志服务器
*.* @@192.168.1.200:514
# 紧急消息发送到所有在线用户
*.emerg :omusrmsg:*
EOF
systemctl restart rsyslog
systemctl enable rsyslog应用日志转发配置
cat > /etc/rsyslog.d/app.conf << 'EOF'
# 转发 Nginx 访问日志
input(type="imfile"
File="/var/log/nginx/access.log"
Tag="nginx_access:"
Severity="info"
Facility="local1"
PersistStateInterval="100"
startmsg.regex="^\\S+"
)
# 转发 Nginx 错误日志
input(type="imfile"
File="/var/log/nginx/error.log"
Tag="nginx_error:"
Severity="error"
Facility="local1"
PersistStateInterval="100"
)
# 转发应用日志
input(type="imfile"
File="/opt/myapp/logs/app.log"
Tag="myapp:"
Severity="info"
Facility="local2"
PersistStateInterval="50"
)
# 转发多行日志(如 Java 堆栈)
input(type="imfile"
File="/opt/myapp/logs/app-error.log"
Tag="myapp_error:"
Severity="error"
Facility="local2"
startmsg.regex="^\\d{4}-\\d{2}-\\d{2}"
)
# local1(Nginx)转发到远程并保存本地
local1.* /var/log/nginx_rsyslog.log
local1.* @@192.168.1.200:514
# local2(应用)转发到远程并保存本地
local2.* /var/log/myapp_rsyslog.log
local2.* @@192.168.1.200:514
# 使用模板增强日志格式
template(name="JsonFormat" type="list") {
constant(value="{")
constant(value="\"timestamp\":\"")
property(name="timereported" dateFormat="rfc3339")
constant(value="\",\"host\":\"")
property(name="hostname")
constant(value="\",\"severity\":\"")
property(name="syslogseverity-text")
constant(value="\",\"facility\":\"")
property(name="syslogfacility-text")
constant(value="\",\"tag\":\"")
property(name="syslogtag")
constant(value="\",\"message\":\"")
property(name="msg" format="jsonf")
constant(value="\"}\n")
}
# 使用 JSON 格式发送到远程
local2.* action(type="omfwd"
target="192.168.1.200"
port="514"
protocol="tcp"
template="JsonFormat"
queue.type="LinkedList"
queue.size="10000"
queue.dequeuebatchsize="100"
action.resumeRetryCount="-1")
EOF
systemctl restart rsysloglogrotate 日志轮转
基础配置
# 全局配置文件
cat /etc/logrotate.conf
# 常用参数说明
# daily/weekly/monthly 轮转周期
# rotate N 保留 N 个轮转文件
# compress 压缩旧日志
# delaycompress 延迟一个周期再压缩
# missingok 日志文件不存在不报错
# notifempty 空文件不轮转
# create 0644 user group 创建新日志文件的权限和属主
# sharedscripts 多个日志文件共用脚本
# postrotate/endscript 轮转后执行的脚本
# size 100M 超过 100M 触发轮转
# maxsize 200M 最大不超过 200M
# dateext 使用日期作为扩展名
# dateformat 日期格式Nginx 日志轮转
cat > /etc/logrotate.d/nginx << 'EOF'
/var/log/nginx/*.log {
daily
rotate 30
compress
delaycompress
missingok
notifempty
create 0644 nginx nginx
dateext
dateformat -%Y%m%d
sharedscripts
postrotate
[ -f /var/run/nginx.pid ] && kill -USR1 $(cat /var/run/nginx.pid)
endscript
}
EOF应用日志轮转
cat > /etc/logrotate.d/myapp << 'EOF'
/opt/myapp/logs/*.log {
daily
rotate 60
compress
delaycompress
missingok
notifempty
create 0644 app app
dateext
dateformat -%Y%m%d
size 200M
maxsize 500M
sharedscripts
postrotate
# 通知应用重新打开日志文件
pid=$(pgrep -f myapp.jar)
if [ -n "${pid}" ]; then
kill -USR1 ${pid} 2>/dev/null || true
fi
endscript
}
EOF手动测试轮转
# 调试模式(不实际执行)
logrotate -d /etc/logrotate.d/nginx
# 强制执行一次
logrotate -f /etc/logrotate.d/nginx
# 查看轮转状态
cat /var/lib/logrotate/logrotate.status
# 查看轮转文件
ls -la /var/log/nginx/ELK Stack 部署
Elasticsearch 部署
# 安装 JDK
yum install -y java-11-openjdk java-11-openjdk-devel
# 配置 JAVA_HOME
cat >> /etc/profile << 'EOF'
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk
export PATH=$JAVA_HOME/bin:$PATH
EOF
source /etc/profile
# 添加 Elasticsearch YUM 源
cat > /etc/yum.repos.d/elasticsearch.repo << 'EOF'
[elasticsearch-7.x]
name=Elasticsearch repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
# 安装 Elasticsearch
yum install -y elasticsearch-7.17.18
# 配置 Elasticsearch
cat > /etc/elasticsearch/elasticsearch.yml << 'EOF'
cluster.name: log-cluster
node.name: node-1
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
# 网络配置
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300
# 集群发现
discovery.seed_hosts: ["192.168.1.200", "192.168.1.201"]
cluster.initial_master_nodes: ["node-1"]
# 内存设置
bootstrap.memory_lock: true
# 索引配置
action.destructive_requires_name: true
# 安全配置(生产建议开启)
xpack.security.enabled: false
EOF
# JVM 堆内存配置
cat > /etc/elasticsearch/jvm.options.d/heap.options << 'EOF'
-Xms4g
-Xmx4g
-XX:+UseG1GC
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30
EOF
# 系统参数优化
cat >> /etc/sysctl.conf << 'EOF'
vm.swappiness=1
vm.max_map_count=262144
EOF
sysctl -p
cat >> /etc/security/limits.conf << 'EOF'
elasticsearch soft memlock unlimited
elasticsearch hard memlock unlimited
elasticsearch soft nofile 65536
elasticsearch hard nofile 65536
EOF
# 创建数据目录并设置权限
mkdir -p /var/lib/elasticsearch
chown -R elasticsearch:elasticsearch /var/lib/elasticsearch
# 启动服务
systemctl daemon-reload
systemctl start elasticsearch
systemctl enable elasticsearch
# 验证
curl -s http://localhost:9200 | python3 -m json.tool
curl -s http://localhost:9200/_cluster/health?prettyLogstash 部署
# 安装 Logstash
yum install -y logstash-7.17.18
# 输入配置 - Beats 接收
cat > /etc/logstash/conf.d/01-inputs.conf << 'EOF'
input {
beats {
port => 5044
host => "0.0.0.0"
codec => plain {
charset => "UTF-8"
}
}
}
EOF
# 过滤配置 - Nginx 日志解析
cat > /etc/logstash/conf.d/10-nginx.conf << 'EOF'
filter {
if "nginx" in [tags] or [type] == "nginx" {
grok {
match => {
"message" => '%{IPORHOST:client_ip} - %{USERNAME:remote_user} \[%{HTTPDATE:timestamp}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:http_version}" %{NUMBER:status} %{NUMBER:bytes} "%{DATA:referer}" "%{DATA:user_agent}" %{NUMBER:response_time}'
}
overwrite => ["message"]
}
# 提取地理位置
geoip {
source => "client_ip"
target => "geoip"
fields => ["city_name", "country_name", "region_name"]
}
# 提取 UA 信息
useragent {
source => "user_agent"
target => "ua"
}
# 转换数值类型
mutate {
convert => {
"status" => "integer"
"bytes" => "integer"
"response_time" => "float"
}
}
# 时间戳处理
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
}
}
EOF
# 过滤配置 - 应用日志解析
cat > /etc/logstash/conf.d/11-app.conf << 'EOF'
filter {
if "myapp" in [tags] or [type] == "myapp" {
# 尝试 JSON 解析
json {
source => "message"
target => "parsed"
skip_on_invalid_json => true
}
# 如果不是 JSON,使用 grok
if ![parsed] {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:log_time} \[%{DATA:thread}\] %{LOGLEVEL:level} %{JAVACLASS:class} - %{GREEDYDATA:log_message}"
}
overwrite => ["message"]
}
}
# 提取错误堆栈
if [level] == "ERROR" or [level] == "FATAL" {
mutate {
add_field => { "alert_level" => "critical" }
}
}
}
}
EOF
# 输出配置
cat > /etc/logstash/conf.d/99-outputs.conf << 'EOF'
output {
# 输出到 Elasticsearch
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[type]}-%{+YYYY.MM.dd}"
template_name => "logs"
template_overwrite => true
}
# 调试输出(生产环境注释掉)
# stdout {
# codec => rubydebug
# }
}
EOF
# JVM 配置
cat > /etc/logstash/jvm.options << 'EOF'
-Xms2g
-Xmx2g
-XX:+UseG1GC
EOF
# 启动
systemctl start logstash
systemctl enable logstash
# 验证配置
/usr/share/logstash/bin/logstash --path.settings /etc/logstash -tKibana 部署
# 安装 Kibana
yum install -y kibana-7.17.18
# 配置 Kibana
cat > /etc/kibana/kibana.yml << 'EOF'
server.port: 5601
server.host: "0.0.0.0"
server.name: "log-kibana"
elasticsearch.hosts: ["http://localhost:9200"]
elasticsearch.requestTimeout: 30000
# 中文界面
i18n.locale: "zh-CN"
# 日志
logging.dest: /var/log/kibana/kibana.log
logging.verbose: false
# 安全
xpack.security.enabled: false
EOF
# 创建日志目录
mkdir -p /var/log/kibana
chown -R kibana:kibana /var/log/kibana
# 启动
systemctl start kibana
systemctl enable kibana
# 验证
curl -s http://localhost:5601/api/status | python3 -m json.toolFilebeat 日志采集
安装与配置
# 安装 Filebeat
yum install -y filebeat-7.17.18
# 主配置文件
cat > /etc/filebeat/filebeat.yml << 'EOF'
# =================== Filebeat 输入 ===================
filebeat.inputs:
# Nginx 访问日志
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
fields:
type: nginx-access
env: production
service: web
fields_under_root: true
tags: ["nginx", "web"]
multiline:
pattern: '^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
negate: true
match: after
# Nginx 错误日志
- type: log
enabled: true
paths:
- /var/log/nginx/error.log
fields:
type: nginx-error
env: production
fields_under_root: true
tags: ["nginx", "error"]
# 应用日志
- type: log
enabled: true
paths:
- /opt/myapp/logs/app.log
- /opt/myapp/logs/app-*.log
fields:
type: myapp
env: production
service: myapp
fields_under_root: true
tags: ["myapp", "java"]
multiline:
pattern: '^\d{4}-\d{2}-\d{2}'
negate: true
match: after
multiline.max_lines: 500
multiline.timeout: 5s
# MySQL 慢查询日志
- type: log
enabled: true
paths:
- /var/log/mysql/slow.log
fields:
type: mysql-slow
env: production
fields_under_root: true
tags: ["mysql", "database"]
multiline:
pattern: '^# Time:'
negate: true
match: after
# 系统日志
- type: log
enabled: true
paths:
- /var/log/messages
- /var/log/secure
fields:
type: syslog
env: production
fields_under_root: true
tags: ["system"]
# =================== Filebeat 模块 ===================
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
# 启用 Nginx 模块
# filebeat modules enable nginx
# =================== 输出配置 ===================
# 方案一:直连 Elasticsearch(简单场景)
# output.elasticsearch:
# hosts: ["192.168.1.200:9200"]
# indices:
# - index: "nginx-access-%{+yyyy.MM.dd}"
# when.contains:
# type: "nginx-access"
# - index: "nginx-error-%{+yyyy.MM.dd}"
# when.contains:
# type: "nginx-error"
# - index: "myapp-%{+yyyy.MM.dd}"
# when.contains:
# type: "myapp"
# 方案二:发送到 Logstash(推荐)
output.logstash:
hosts: ["192.168.1.200:5044"]
loadbalance: true
worker: 2
bulk_max_size: 2048
# =================== 日志输出 ===================
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
keepfiles: 7
permissions: 0644
# =================== 高级配置 ===================
# 注册表文件(记录已读取位置)
path.data: /var/lib/filebeat
# 关闭主机信息采集(减少数据量)
# processors:
# - drop_fields:
# fields: ["agent", "ecs", "host"]
# Queue 配置
queue.mem:
events: 4096
flush.min_events: 2048
flush.timeout: 1s
EOF
systemctl start filebeat
systemctl enable filebeatFilebeat 处理器配置
# 在 filebeat.yml 中添加处理器
processors:
# 添加主机信息
- add_host_metadata:
when.not.contains.tags: forwarded
# 添加 Docker 元数据
- add_docker_metadata:
host: "unix:///var/run/docker.sock"
# 添加 Kubernetes 元数据
# - add_kubernetes_metadata:
# host: ${NODE_NAME}
# matchers:
# - logs_path:
# path: "/var/log/containers/"
# 删除不需要的字段(减少存储)
- drop_fields:
fields: ["agent.ephemeral_id", "agent.id", "agent.type", "agent.version",
"ecs.version", "input.type", "log.offset"]
ignore_missing: true
# 解析 JSON 格式日志
- decode_json_fields:
fields: ["message"]
target: "json"
overwrite_keys: true
when.contains:
type: "myapp-json"
# 重命名字段
- rename:
fields:
- from: "json.level"
to: "log_level"
- from: "json.traceId"
to: "trace_id"
ignore_missing: trueFluentd 日志采集(替代方案)
安装 Fluentd
# 使用 td-agent(Treasure Data 维护的稳定版本)
curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent4.sh | sh
# 启动
systemctl start td-agent
systemctl enable td-agentFluentd 配置
cat > /etc/td-agent/td-agent.conf << 'EOF'
# =================== 数据源 ===================
# 监听端口接收日志
<source>
@type forward
port 24224
bind 0.0.0.0
</source>
# 采集 Nginx 访问日志
<source>
@type tail
path /var/log/nginx/access.log
pos_file /var/log/td-agent/nginx-access.pos
tag nginx.access
read_from_head false
<parse>
@type regexp
expression /^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)"? (?<response_time>[0-9.]+))?$/
time_key time
time_format %d/%b/%Y:%H:%M:%S %z
</parse>
</source>
# 采集应用日志(JSON 格式)
<source>
@type tail
path /opt/myapp/logs/app.log
pos_file /var/log/td-agent/myapp.pos
tag myapp
read_from_head false
<parse>
@type json
time_key timestamp
time_format %Y-%m-%dT%H:%M:%S.%NZ
</parse>
</source>
# =================== 过滤处理 ===================
# 添加主机信息
<filter **>
@type record_transformer
<record>
hostname "#{Socket.gethostname}"
</record>
</filter>
# 解析地理位置
<filter nginx.access>
@type geoip
geoip_lookup_key remote
<record>
city ${city.names.en["remote"]}
country ${country.names.en["remote"]}
latitude ${location.latitude["remote"]}
longitude ${location.longitude["remote"]}
</record>
</filter>
# =================== 输出 ===================
# 输出到 Elasticsearch
<match **>
@type elasticsearch
host 192.168.1.200
port 9200
logstash_format true
logstash_prefix ${tag}
logstash_dateformat %Y.%m.%d
include_tag_key true
tag_key @log_name
flush_interval 5s
retry_max_interval 30
retry_forever true
<buffer>
@type file
path /var/log/td-agent/buffer
flush_mode interval
flush_interval 5s
chunk_limit_size 16MB
total_limit_size 8GB
overflow_action block
</buffer>
</match>
# 错误日志额外输出到文件
<match nginx.access>
@type copy
<store>
@type elasticsearch
host 192.168.1.200
port 9200
logstash_format true
logstash_prefix nginx-access
</store>
</match>
EOF
systemctl restart td-agent结构化日志规范
JSON 日志格式
{
"timestamp": "2024-01-15T10:30:00.123+08:00",
"level": "INFO",
"service": "user-service",
"traceId": "abc123def456",
"spanId": "span789",
"thread": "http-nio-8080-exec-1",
"class": "com.example.user.controller.UserController",
"method": "getUser",
"message": "Query user by id",
"userId": "12345",
"requestId": "req-uuid-001",
"clientIp": "192.168.1.50",
"duration": 45,
"status": "success",
"tags": {
"env": "production",
"version": "2.1.0",
"region": "cn-east"
}
}Logback 配置(Java 应用)
<!-- logback-spring.xml -->
<configuration>
<springProperty scope="context" name="APP_NAME" source="spring.application.name"/>
<springProperty scope="context" name="ENV" source="spring.profiles.active"/>
<!-- JSON 格式输出 -->
<appender name="JSON_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>/opt/myapp/logs/app.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>/opt/myapp/logs/app.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>200MB</maxFileSize>
<maxHistory>30</maxHistory>
<totalSizeCap>5GB</totalSizeCap>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<customFields>{
"service":"${APP_NAME}",
"env":"${ENV}"
}</customFields>
<includeMdc>true</includeMdc>
<includeCallerData>true</includeCallerData>
<fieldNames>
<timestamp>timestamp</timestamp>
<levelValue>[ignore]</levelValue>
</fieldNames>
</encoder>
</appender>
<!-- 错误日志单独输出 -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
<file>/opt/myapp/logs/app-error.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>/opt/myapp/logs/app-error.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<maxFileSize>100MB</maxFileSize>
<maxHistory>90</maxHistory>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<root level="INFO">
<appender-ref ref="JSON_FILE"/>
<appender-ref ref="ERROR_FILE"/>
</root>
</configuration>日志搜索与分析
Kibana 查询语法
# 基础全文搜索
error message
"connection timeout"
# 字段搜索
level:ERROR
status:500
service:"user-service"
# 范围搜索
response_time:>1000
status:[400 TO 599]
@timestamp:[now-1h TO now]
# 布尔组合
level:ERROR AND service:"order-service"
level:ERROR OR level:FATAL
NOT level:DEBUG
level:WARN AND NOT service:"health-check"
# 通配符
message:Connection*
user_id:user_12?3
# 正则表达式
message:/exception:\s*\w+/
# 聚合查询
# 在 Kibana 中使用 Aggregation 构建图表常用 KQL 查询场景
# 查找错误日志
level: "ERROR" or level: "FATAL"
# 查找特定时间段的慢请求
response_time > 3000 and @timestamp >= "2024-01-15T00:00:00"
# 查找特定用户的操作轨迹
userId: "12345"
# 查找特定接口的错误
service: "order-service" and method: "createOrder" and level: "ERROR"
# 查找包含异常堆栈的日志
message: *Exception* and not level: "INFO"
# 通过 traceId 追踪完整请求链路
traceId: "abc123def456"日志告警
Elasticsearch Watcher 告警
{
"trigger": {
"schedule": {
"interval": "5m"
}
},
"input": {
"search": {
"request": {
"indices": ["myapp-*"],
"body": {
"query": {
"bool": {
"must": [
{ "match": { "level": "ERROR" } },
{ "range": { "@timestamp": { "gte": "now-5m" } } }
]
}
},
"aggs": {
"by_service": {
"terms": { "field": "service.keyword" }
}
}
}
}
}
},
"condition": {
"compare": {
"ctx.payload.hits.total.value": {
"gt": 10
}
}
},
"actions": {
"send_email": {
"email": {
"to": "ops@example.com",
"subject": "Alert: {{ctx.payload.hits.total.value}} errors in last 5 minutes",
"body": "Error count: {{ctx.payload.hits.total.value}}"
}
}
}
}ElastAlert 告警(开源方案)
# 安装 ElastAlert
pip3 install elastalert
# 创建配置文件
cat > /opt/elastalert/config.yaml << 'EOF'
rules_folder: /opt/elastalert/rules
run_every:
minutes: 1
buffer_time:
minutes: 5
es_host: 192.168.1.200
es_port: 9200
writeback_index: elastalert_status
alert_time_limit:
days: 2
EOF
# 创建告警规则
cat > /opt/elastalert/rules/error_alert.yaml << 'EOF'
name: Application Error Alert
type: frequency
index: myapp-*
num_events: 10
timeframe:
minutes: 5
filter:
- term:
level: "ERROR"
alert:
- "email"
- "slack"
email:
- "ops@example.com"
slack:
slack_webhook_url: "https://hooks.slack.com/services/xxx"
alert_subject: "Alert: {0} errors detected in {1}"
alert_subject_args:
- num_events
- index
alert_text_type: alert_text_jinja
alert_text: |
错误数量: {{ num_events }}
时间范围: {{ timeframe }}
最近错误:
{% for match in matches %}
- {{ match['@timestamp'] }}: {{ match['message'][:100] }}
{% endfor %}
EOF基于日志模式的告警脚本
#!/bin/bash
# /opt/scripts/log_alert.sh
# 定期检查日志中的异常模式
LOG_INDEX="myapp-$(date +%Y.%m.%d)"
ES_URL="http://192.168.1.200:9200"
ALERT_WEBHOOK="https://hooks.slack.com/services/xxx"
# 检查错误率
ERROR_COUNT=$(curl -s "${ES_URL}/${LOG_INDEX}/_count" \
-H 'Content-Type: application/json' \
-d '{
"query": {
"bool": {
"must": [
{"match": {"level": "ERROR"}},
{"range": {"@timestamp": {"gte": "now-5m"}}}
]
}
}
}' | python3 -c "import sys,json; print(json.load(sys.stdin)['count'])")
THRESHOLD=50
if [ "${ERROR_COUNT}" -gt "${THRESHOLD}" ]; then
# 发送告警
curl -s -X POST "${ALERT_WEBHOOK}" \
-H 'Content-Type: application/json' \
-d "{
\"text\": \"[ALERT] 错误日志数量异常\",
\"blocks\": [{
\"type\": \"section\",
\"text\": {
\"type\": \"mrkdwn\",
\"text\": \"*错误日志告警*\n最近 5 分钟错误数: ${ERROR_COUNT} (阈值: ${THRESHOLD})\n索引: ${LOG_INDEX}\"
}
}]
}"
echo "$(date) - Alert sent: ${ERROR_COUNT} errors" >> /var/log/log_alert.log
fi存储与留存策略
ILM(Index Lifecycle Management)
# 创建 ILM 策略
curl -X PUT "http://localhost:9200/_ilm/policy/logs-policy" \
-H 'Content-Type: application/json' \
-d '{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_size": "50gb",
"max_age": "1d"
},
"set_priority": {
"priority": 100
}
}
},
"warm": {
"min_age": "3d",
"actions": {
"forcemerge": {
"max_num_segments": 1
},
"shrink": {
"number_of_shards": 1
},
"allocate": {
"require": {
"data": "warm"
}
},
"set_priority": {
"priority": 50
}
}
},
"cold": {
"min_age": "30d",
"actions": {
"freeze": {},
"allocate": {
"require": {
"data": "cold"
}
}
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}'
# 创建索引模板关联 ILM 策略
curl -X PUT "http://localhost:9200/_template/logs-template" \
-H 'Content-Type: application/json' \
-d '{
"index_patterns": ["nginx-*", "myapp-*", "syslog-*"],
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"index.lifecycle.name": "logs-policy",
"index.lifecycle.rollover_alias": "logs"
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"message": { "type": "text", "analyzer": "standard" },
"level": { "type": "keyword" },
"service": { "type": "keyword" },
"host": { "type": "keyword" },
"traceId": { "type": "keyword" }
}
}
}'留存策略参考
| 日志类型 | 保留周期 | 存储层级 | 说明 |
|---|---|---|---|
| 访问日志 | 30 天 | hot -> warm | 用于日常分析 |
| 应用日志 | 90 天 | hot -> warm -> cold | 用于故障排查 |
| 错误日志 | 180 天 | hot -> warm | 用于问题分析 |
| 审计日志 | 365 天+ | warm -> cold | 合规要求 |
| 安全日志 | 365 天+ | cold | 安全审计 |
| 调试日志 | 7 天 | hot | 仅用于开发调试 |
优点
- 集中管理:所有日志统一收集、存储和检索
- 快速定位:全文搜索和聚合分析大幅缩短排障时间
- 可视化:Kibana 图表直观展示系统状态和趋势
- 实时告警:基于日志模式自动触发告警
- 合规支持:满足日志留存和安全审计要求
- 水平扩展:Elasticsearch 集群可随数据量扩展
缺点
- 资源消耗大:ELK Stack 对内存和磁盘要求高
- 运维复杂:Elasticsearch 集群管理和调优需要经验
- 学习成本:KQL 查询语法和 Lucene 语法需要学习
- 存储成本:大量日志的存储成本不容忽视
- 延迟问题:日志从产生到可搜索有数秒延迟
性能注意事项
- 索引分片规划:按日志量合理设置分片数,一般每个分片 30-50GB
- 批量写入:使用 bulk API 批量写入,减少网络开销
- 字段类型:合理设置字段类型,text vs keyword 避免不必要的分词
- 冷热架构:使用 ILM 策略将冷数据迁移到低成本存储
- Filebeat 性能:调整 bulk_max_size 和 worker 数量优化吞吐
- JVM 调优:Elasticsearch 堆内存不超过物理内存的 50%
总结
日志聚合与分析体系是现代运维基础设施的核心组成部分。从 rsyslog 本地管理到 ELK 集中化平台,从结构化日志规范到智能告警,构建一个完善的日志体系需要综合考虑采集效率、存储成本、检索性能和合规要求。选择合适的技术栈(Filebeat vs Fluentd、Logstash vs 直连 ES),制定合理的留存策略,才能在保障运维效率的同时控制总体成本。
关键知识点
- rsyslog 的工作原理和配置语法
- logrotate 日志轮转策略和触发机制
- ELK Stack 各组件的职责和交互方式
- Filebeat/Fluentd 日志采集的工作原理
- 结构化 JSON 日志规范和最佳实践
- Elasticsearch ILM 索引生命周期管理
- Kibana KQL 查询语法和聚合分析
- 基于日志模式的告警配置
常见误区
- 日志越多越好:过多无用日志增加存储成本和检索噪音
- 忽略日志格式统一:不同服务使用不同日志格式增加解析难度
- 不做日志分级:所有日志同等对待,无法快速定位关键信息
- 忽视索引管理:不设置 ILM 策略导致磁盘写满
- Filebeat 直连 ES:复杂场景应该使用 Logstash 做中间处理
- 忽略链路追踪:不记录 traceId 导致无法追踪跨服务调用
进阶路线
- EFK 方案:学习 Fluentd + Elasticsearch + Kibana 替代方案
- Loki + Grafana:学习 Grafana Labs 的轻量级日志方案
- ClickHouse 日志分析:了解基于列存储的日志分析方案
- OpenTelemetry:学习统一的可观测性标准(日志+指标+追踪)
- 机器学习异常检测:使用 Elasticsearch ML 进行自动异常检测
- 日志脱敏:在采集阶段自动识别和脱敏敏感信息
适用场景
- 分布式系统的集中式日志管理
- 微服务架构的链路追踪和问题定位
- Web 应用的访问分析和用户行为分析
- 安全审计和合规日志管理
- 数据库慢查询分析和性能优化
- 容器化环境(Docker/K8s)的日志收集
落地建议
- 先统一格式:推动所有服务使用 JSON 结构化日志输出
- 分级实施:先收集关键服务日志,再逐步覆盖全部服务
- 控制成本:合理设置 ILM 策略,避免无限膨胀
- 制定规范:建立日志命名、格式、级别的团队规范
- 告警先行:在日志平台上线前完成关键告警配置
- 定期演练:模拟故障场景,验证日志可检索性和告警有效性
排错清单
| 现象 | 可能原因 | 排查方法 |
|---|---|---|
| Filebeat 不发送日志 | 配置错误/路径错误 | filebeat test config, filebeat test output |
| Logstash 解析失败 | grok 正则不匹配 | 使用 Grok Debugger 调试 |
| ES 写入被拒绝 | 队列满/磁盘满 | 检查 _cat/thread_pool |
| Kibana 查询慢 | 索引分片不合理 | 优化 mappings 和分片数 |
| 日志丢失 | Filebeat 缓冲区满 | 增大 queue.mem.events |
| 磁盘增长过快 | ILM 未配置 | 配置 ILM 自动清理 |
| 日志延迟大 | 采集/传输瓶颈 | 检查 Filebeat 和网络 |
| 搜索无结果 | 索引模式不匹配 | 检查 Kibana Index Pattern |
复盘问题
- Filebeat 和 Fluentd 各自的优劣是什么?在什么场景下应该选择哪个?
- 如何设计一个支持每天 100TB 日志量的日志架构?
- Elasticsearch 的冷热架构是如何工作的?如何合理规划 hot/warm/cold 节点?
- 如何在日志采集阶段实现敏感信息(如密码、手机号)的自动脱敏?
- Logstash 的 grok 性能瓶颈如何解决?有哪些替代方案?
- 如何评估日志系统的存储成本并制定合理的压缩和清理策略?
