Python 日志与监控
大约 10 分钟 · 约 2905 字
Python 日志与监控
简介
生产级 Python 应用需要完善的日志记录和监控体系。理解 logging 模块的高级配置、结构化日志、APM 集成和告警机制,有助于快速定位和解决线上问题。
特点
logging 高级配置
结构化日志
import json
import logging
import logging.config
import os
import sys
from datetime import datetime, timezone

from pythonjsonlogger import jsonlogger
# 1. JSON 格式日志(推荐用于生产)
# 1. JSON-formatted logs (recommended for production)
class CustomJsonFormatter(jsonlogger.JsonFormatter):
    """JSON formatter that adds service metadata and per-request context.

    Emits timestamp/level/logger/service/environment on every record, plus
    request_id / user_id / duration_ms when the LogRecord carries them
    (e.g. set by a request-logging middleware).
    """

    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        # datetime.utcnow() is deprecated (3.12+); use timezone-aware UTC.
        log_record["timestamp"] = datetime.now(timezone.utc).isoformat()
        log_record["level"] = record.levelname
        log_record["logger"] = record.name
        log_record["service"] = "my-service"
        log_record["environment"] = "production"
        # Copy request context over only when present on the record.
        for attr in ("request_id", "user_id", "duration_ms"):
            if hasattr(record, attr):
                log_record[attr] = getattr(record, attr)
# 2. Central dictConfig (recommended; the original comment said "YAML"
# but this is a logging.config dictionary schema)
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json": {
            # Plain python-json-logger formatter. Format fields must be real
            # LogRecord attributes -- the original "%(timestamp)s %(level)s"
            # are not record attributes and would serialize as empty values.
            # Point "()" at CustomJsonFormatter to get the enriched fields.
            "()": "pythonjsonlogger.jsonlogger.JsonFormatter",
            "format": "%(asctime)s %(levelname)s %(name)s %(message)s"
        },
        "console": {
            "format": "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            # dictConfig convention for referencing external objects.
            "stream": "ext://sys.stdout",
            "formatter": "console",
            "level": "INFO"
        },
        "file": {
            "class": "logging.handlers.RotatingFileHandler",
            # NOTE: the logs/ directory must exist before dictConfig runs,
            # otherwise handler construction raises FileNotFoundError.
            "filename": "logs/app.log",
            "maxBytes": 10485760,  # 10MB
            "backupCount": 10,
            "formatter": "json",
            "level": "INFO"
        },
        "error_file": {
            "class": "logging.handlers.RotatingFileHandler",
            "filename": "logs/error.log",
            "maxBytes": 10485760,
            "backupCount": 5,
            "formatter": "json",
            "level": "ERROR"
        }
    },
    "loggers": {
        "myapp": {
            "handlers": ["console", "file", "error_file"],
            "level": "INFO",
            "propagate": False
        },
        "sqlalchemy.engine": {
            "handlers": ["file"],
            "level": "WARNING"
        }
    },
    "root": {
        "handlers": ["console"],
        "level": "INFO"
    }
}

# Apply the configuration and grab the application logger
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("myapp")
# 3. 请求日志中间件(FastAPI)
from fastapi import Request
import time
import uuid
async def logging_middleware(request: Request, call_next):
"""请求日志中间件"""
request_id = str(uuid.uuid4())[:8]
start_time = time.perf_counter()
# 绑定请求上下文
old_factory = logging.getLogRecordFactory()
def record_factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
record.request_id = request_id
record.user_id = getattr(request.state, "user_id", None)
return record
logging.setLogRecordFactory(record_factory)
logger.info(f"→ {request.method} {request.url.path}",
extra={"method": request.method, "path": str(request.url.path)})
try:
response = await call_next(request)
duration = (time.perf_counter() - start_time) * 1000
logger.info(f"← {response.status_code} ({duration:.1f}ms)",
extra={
"status_code": response.status_code,
"duration_ms": round(duration, 2)
})
response.headers["X-Request-Id"] = request_id
return response
except Exception as e:
duration = (time.perf_counter() - start_time) * 1000
logger.error(f"✗ {request.method} {request.url.path} - {e}",
extra={"duration_ms": round(duration, 2)},
exc_info=True)
raise
finally:
        logging.setLogRecordFactory(old_factory)

日志轮转与归档
文件管理
import logging.handlers
import gzip
import shutil
# 1. Time-based rotation (daily at midnight, keep 30 days)
def setup_timed_rotation(filename="logs/app.log"):
    """Build a daily-rotating log file handler.

    Args:
        filename: target log file; the parent directory must already exist.

    Returns:
        A configured logging.handlers.TimedRotatingFileHandler.
    """
    import re  # local: only needed for the rotation-suffix pattern

    handler = logging.handlers.TimedRotatingFileHandler(
        filename=filename,
        when="midnight",  # rotate once per day
        interval=1,
        backupCount=30,   # keep 30 days of history
        encoding="utf-8",
    )
    handler.suffix = "%Y-%m-%d"
    # BUG FIX: extMatch must be a *compiled* pattern -- the handler's
    # getFilesToDelete() calls self.extMatch.match(); assigning a plain
    # string raises AttributeError when old backups are pruned.
    handler.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}$")
    return handler
# 2. Custom rotating handler that gzips its backups
class GzipRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """RotatingFileHandler that compresses rotated backups with gzip.

    Backups live alongside the base file as <base>.1.gz .. <base>.N.gz,
    with .1.gz being the most recent.
    """

    def doRollover(self):
        if self.stream:
            self.stream.close()
            # Unbind the closed stream; FileHandler.emit() reopens lazily.
            self.stream = None
        if self.backupCount > 0:
            # Shift existing compressed backups: .i.gz -> .(i+1).gz
            for i in range(self.backupCount - 1, 0, -1):
                src = f"{self.baseFilename}.{i}.gz"
                dst = f"{self.baseFilename}.{i + 1}.gz"
                if os.path.exists(src):
                    if os.path.exists(dst):
                        os.remove(dst)
                    os.rename(src, dst)
            # BUG FIX: the original never rotated the live log file to ".1",
            # so no backup was ever produced or compressed. Rotate it first,
            # then gzip the rotated copy and remove the uncompressed file.
            backup = f"{self.baseFilename}.1"
            if os.path.exists(backup):
                os.remove(backup)
            if os.path.exists(self.baseFilename):
                os.rename(self.baseFilename, backup)
                with open(backup, "rb") as f_in, \
                        gzip.open(f"{backup}.gz", "wb") as f_out:
                    shutil.copyfileobj(f_in, f_out)
                os.remove(backup)
        self.stream = open(self.baseFilename, "a", encoding=self.encoding)

OpenTelemetry 集成
分布式追踪
# pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation-fastapi
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
def setup_telemetry(service_name="my-service"):
    """Configure OpenTelemetry tracing and metrics for this service.

    Returns:
        (tracer, request_counter, request_duration_histogram)
    """
    # Tracing: batch-export spans to the OTLP collector over gRPC.
    provider = TracerProvider()
    exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317")
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    tracer = trace.get_tracer(service_name)

    # Metrics: request counter plus latency histogram.
    meter = metrics.get_meter(service_name)
    request_counter = meter.create_counter("app.requests")
    request_duration = meter.create_histogram("app.request.duration")
    return tracer, request_counter, request_duration
# FastAPI auto-instrumentation: wraps every route so requests get spans.
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
# NOTE(review): `FastAPI` itself is never imported in this snippet (only
# Request, above) -- confirm `from fastapi import FastAPI` exists upstream.
app = FastAPI()
FastAPIInstrumentor.instrument_app(app)
# Manual span creation
tracer = trace.get_tracer("myapp")

async def process_order(order_id: str):
    """Process a single order under nested tracing spans."""
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)
        with tracer.start_as_current_span("validate_order"):
            validate(order_id)
        with tracer.start_as_current_span("save_order"):
            save(order_id)
        span.add_event("order_processed", {"order_id": order_id})

告警机制
错误率监控
class ErrorRateMonitor:
    """Sliding-window error-rate monitor with threshold alerting.

    record() appends (timestamp, is_error) samples; when the error
    percentage over the window exceeds the threshold, all registered
    callbacks are invoked with the current rate.
    """

    def __init__(self, threshold_percent=5.0, window_seconds=60):
        self.threshold = threshold_percent
        self.window = window_seconds
        self.requests = []  # list of (timestamp, is_error) tuples
        self._lock = threading.Lock()
        self._callbacks = []

    def record(self, is_error: bool):
        """Record one request outcome and fire alerts if the rate is high.

        BUG FIX: the original called get_error_rate() while already holding
        self._lock (a non-reentrant threading.Lock), so the very first
        record() deadlocked. The rate is now computed inside the same
        critical section, and callbacks run outside the lock.
        """
        now = time.time()
        with self._lock:
            self.requests.append((now, is_error))
            self._cleanup(now)
            rate = self._current_rate()
        if rate > self.threshold:
            for callback in self._callbacks:
                callback(rate)

    def _cleanup(self, now):
        # Drop samples older than the window. Caller must hold the lock.
        cutoff = now - self.window
        self.requests = [(t, e) for t, e in self.requests if t > cutoff]

    def _current_rate(self) -> float:
        # Error percentage over the current window. Caller must hold the lock.
        if not self.requests:
            return 0.0
        errors = sum(1 for _, e in self.requests if e)
        return errors / len(self.requests) * 100

    def get_error_rate(self) -> float:
        """Return the error percentage over the sliding window."""
        now = time.time()
        with self._lock:
            self._cleanup(now)
            return self._current_rate()

    def on_alert(self, callback):
        """Register a callback invoked with the rate on threshold breach."""
        self._callbacks.append(callback)
# Usage: log critically and notify when the error rate crosses 5%.
monitor = ErrorRateMonitor(threshold_percent=5.0)
monitor.on_alert(lambda rate: logger.critical(f"错误率告警: {rate:.1f}%"))
monitor.on_alert(lambda rate: send_alert_notification(f"错误率 {rate:.1f}%"))

性能指标收集
import time
import threading
from collections import deque
from typing import Dict, Callable, Optional
from dataclasses import dataclass, field
@dataclass
class MetricPoint:
    """A single timestamped metric observation with optional tags."""
    timestamp: float
    value: float
    tags: Dict[str, str] = field(default_factory=dict)

class MetricsCollector:
    """Thread-safe collector of time-series points, counters and gauges."""

    def __init__(self, retention_seconds: int = 3600, max_points: int = 10000):
        self._metrics: Dict[str, deque] = {}
        self._lock = threading.Lock()
        self._retention = retention_seconds
        self._max_points = max_points
        self._counters: Dict[str, float] = {}
        self._gauges: Dict[str, float] = {}

    def record(self, metric_name: str, value: float, tags: Optional[Dict] = None):
        """Append one data point to the named time series."""
        point = MetricPoint(timestamp=time.time(), value=value, tags=tags or {})
        with self._lock:
            series = self._metrics.get(metric_name)
            if series is None:
                series = self._metrics[metric_name] = deque(maxlen=self._max_points)
            series.append(point)

    def increment(self, counter_name: str, value: float = 1.0):
        """Add `value` to a monotonically increasing counter."""
        with self._lock:
            self._counters[counter_name] = self._counters.get(counter_name, 0) + value

    def set_gauge(self, gauge_name: str, value: float):
        """Overwrite a gauge with its latest value."""
        with self._lock:
            self._gauges[gauge_name] = value

    def get_counter(self, counter_name: str) -> float:
        return self._counters.get(counter_name, 0)

    def get_gauge(self, gauge_name: str) -> float:
        return self._gauges.get(gauge_name, 0)

    def get_summary(self, metric_name: str, window_seconds: int = 60) -> dict:
        """Summarize points recorded within the last `window_seconds`."""
        cutoff = time.time() - window_seconds
        with self._lock:
            series = self._metrics.get(metric_name, deque())
            samples = sorted(p.value for p in series if p.timestamp >= cutoff)
        if not samples:
            return {"count": 0, "min": 0, "max": 0, "avg": 0, "p95": 0, "p99": 0}
        n = len(samples)
        # Nearest-rank percentiles; approximate for small sample counts.
        p95 = samples[int(n * 0.95)] if n > 1 else samples[0]
        p99 = samples[int(n * 0.99)] if n > 1 else samples[0]
        return {
            "count": n,
            "min": samples[0],
            "max": samples[-1],
            "avg": sum(samples) / n,
            "p95": p95,
            "p99": p99,
        }

    def get_all_counters(self) -> dict:
        return dict(self._counters)

    def get_all_gauges(self) -> dict:
        return dict(self._gauges)
# Process-wide shared metrics collector
metrics = MetricsCollector()

# Usage example
class RequestMetrics:
    """Static helpers that funnel request/DB measurements into `metrics`."""

    @staticmethod
    def record_request(duration_ms: float, method: str, path: str, status_code: int):
        metrics.record("request.duration", duration_ms, {"method": method, "path": path})
        metrics.increment("request.total")
        if status_code >= 400:
            metrics.increment("request.errors")
        if duration_ms > 1000:
            metrics.increment("request.slow")

    @staticmethod
    def record_db_query(duration_ms: float, query_type: str):
        metrics.record("db.query_duration", duration_ms, {"type": query_type})

    @staticmethod
    def update_gauge_connections(active: int, idle: int):
        metrics.set_gauge("db.connections.active", active)
        metrics.set_gauge("db.connections.idle", idle)
# Inspect the collected metrics
summary = metrics.get_summary("request.duration", window_seconds=300)
print(f"请求延迟: avg={summary['avg']:.1f}ms, p95={summary['p95']:.1f}ms")
print(f"总请求数: {metrics.get_counter('request.total')}")
print(f"错误数: {metrics.get_counter('request.errors')}")

健康检查与就绪探针
import time
from dataclasses import dataclass
from typing import Dict, List, Callable, Optional
@dataclass
class HealthCheckResult:
    """Outcome of one health-check probe."""
    name: str
    status: str  # "healthy", "degraded", "unhealthy"
    message: str = ""
    duration_ms: float = 0
    timestamp: float = 0

class HealthChecker:
    """Registry of named health probes, executed together via check_all()."""

    def __init__(self):
        self._checks: Dict[str, Callable] = {}

    def register(self, name: str, check_fn: Callable):
        """Register a probe; it returns a status message or raises on failure."""
        self._checks[name] = check_fn

    def check_all(self) -> dict:
        """Run every probe and aggregate an overall service status."""
        outcomes = []
        overall = "healthy"
        for name, probe in self._checks.items():
            began = time.perf_counter()
            try:
                detail = probe()
                state = "healthy"
            except Exception as exc:
                detail = str(exc)
                state = "unhealthy"
                overall = "unhealthy"
            finally:
                elapsed_ms = (time.perf_counter() - began) * 1000
                outcomes.append(HealthCheckResult(
                    name=name, status=state, message=detail,
                    duration_ms=round(elapsed_ms, 2), timestamp=time.time(),
                ))
        return {
            "status": overall,
            "timestamp": time.time(),
            "checks": [
                {"name": o.name, "status": o.status, "message": o.message,
                 "duration_ms": o.duration_ms}
                for o in outcomes
            ],
        }
# Usage: build one shared checker for the service
health = HealthChecker()
def check_database():
    """Health probe: verify database connectivity (placeholder)."""
    # e.g. db.execute("SELECT 1")
    return "OK"
def check_redis():
    """Health probe: verify Redis connectivity (placeholder)."""
    # e.g. redis.ping()
    return "OK"
def check_disk_space():
    """Health probe: fail when the root filesystem has under 1 GB free."""
    import shutil

    usage_stats = shutil.disk_usage("/")
    free_gb = usage_stats.free / (1024 ** 3)
    if free_gb < 1:
        raise RuntimeError(f"磁盘空间不足: {free_gb:.1f}GB")
    return f"{free_gb:.1f}GB free"
# Wire the probes into the shared checker
health.register("database", check_database)
health.register("redis", check_redis)
health.register("disk", check_disk_space)
# result = health.check_all()

Prometheus 指标暴露
# pip install prometheus-client
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
import random
# Prometheus metric definitions
REQUEST_COUNT = Counter(
    "app_http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"],
)

REQUEST_DURATION = Histogram(
    "app_http_request_duration_seconds",
    "HTTP request duration",
    ["method", "endpoint"],
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)

ACTIVE_CONNECTIONS = Gauge(
    "app_active_connections",
    "Active connections",
    ["type"],
)

ERROR_COUNT = Counter(
    "app_errors_total",
    "Total errors",
    ["type", "module"],
)

# Expose the metrics endpoint at http://localhost:9090/metrics
start_http_server(9090)
# Record metrics from business code
def handle_request(method: str, endpoint: str):
    """Simulate handling one request while recording Prometheus metrics.

    Latency is captured by the REQUEST_DURATION histogram's .time() context
    manager; request/error counters are incremented in finally/except.
    (The original also set an unused `start = time.time()` -- removed, the
    histogram already times the block.)
    """
    with REQUEST_DURATION.labels(method=method, endpoint=endpoint).time():
        try:
            # Simulated request processing
            time.sleep(random.uniform(0.01, 0.1))
            status = "200"
        except Exception:
            status = "500"
            ERROR_COUNT.labels(type="http", module="api").inc()
        finally:
            REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
# Prometheus 配置示例(prometheus.yml)
# scrape_configs:
# - job_name: 'my-python-app'
# static_configs:
#     - targets: ['localhost:9090']

日志与告警集成实战
import logging
import smtplib
from email.mime.text import MIMEText
from typing import List, Optional
from dataclasses import dataclass
from datetime import datetime
logger = logging.getLogger("monitoring")

@dataclass
class AlertRule:
    """Threshold rule evaluated against a rolling metric summary."""
    name: str
    metric: str
    threshold: float
    operator: str  # "gt", "lt", "gte", "lte"
    window_seconds: int = 60
    severity: str = "warning"  # "warning", "critical"

class AlertManager:
    """Evaluates AlertRules against a metrics collector and fans out alerts.

    The collector must expose get_summary(metric, window_seconds) -> dict
    with an "avg" key (see MetricsCollector).
    """

    # Comparison dispatch table (replaces the original if/elif chain).
    _OPERATORS = {
        "gt": lambda value, threshold: value > threshold,
        "lt": lambda value, threshold: value < threshold,
        "gte": lambda value, threshold: value >= threshold,
        "lte": lambda value, threshold: value <= threshold,
    }

    def __init__(self, metrics_collector):
        self._rules: List[AlertRule] = []
        self._metrics = metrics_collector
        # FIX: the original annotated this List[Callable], but Callable was
        # never imported from typing; a plain list avoids the undefined name.
        self._handlers: list = []
        self._alert_history: List[dict] = []

    def add_rule(self, rule: AlertRule):
        self._rules.append(rule)

    def add_handler(self, handler):
        """Register an alert sink (email, Slack, webhook, ...)."""
        self._handlers.append(handler)

    def evaluate(self):
        """Evaluate every rule; record and dispatch triggered alerts."""
        for rule in self._rules:
            summary = self._metrics.get_summary(rule.metric, rule.window_seconds)
            value = summary.get("avg", 0)
            compare = self._OPERATORS.get(rule.operator)
            # Unknown operators never trigger (same as the original chain).
            if compare is None or not compare(value, rule.threshold):
                continue
            alert = {
                "rule": rule.name,
                "metric": rule.metric,
                "value": value,
                "threshold": rule.threshold,
                "severity": rule.severity,
                "timestamp": datetime.now().isoformat(),
            }
            self._alert_history.append(alert)
            logger.warning(f"告警触发: {alert}")
            for handler in self._handlers:
                try:
                    handler(alert)
                except Exception as e:
                    # A broken sink must not stop the other sinks.
                    logger.error(f"告警处理器异常: {e}")
def email_alert_handler(alert: dict):
    """Alert sink stub: would deliver the alert by email; currently logs."""
    logger.info(f"发送邮件告警: {alert['rule']}")

def webhook_alert_handler(alert: dict, webhook_url: str):
    """Alert sink: POST the alert payload as JSON to a webhook URL."""
    import requests

    try:
        requests.post(webhook_url, json=alert, timeout=5)
    except Exception as e:
        logger.error(f"Webhook 发送失败: {e}")
# Usage: wire rules and sinks into the manager
alert_manager = AlertManager(metrics)
alert_manager.add_rule(AlertRule(
name="高错误率", metric="request.error_rate",
threshold=5.0, operator="gt", severity="critical",
))
alert_manager.add_rule(AlertRule(
name="高延迟", metric="request.duration",
threshold=1000.0, operator="gt", severity="warning",
))
alert_manager.add_handler(email_alert_handler)

优点
缺点
总结
生产级日志使用 JSON 结构化格式,通过 python-json-logger 或 structlog 库实现。日志轮转使用 RotatingFileHandler(按大小)或 TimedRotatingFileHandler(按时间)。FastAPI 请求日志中间件记录方法、路径、状态码和耗时。OpenTelemetry 提供分布式追踪和指标收集。告警机制通过滑动窗口计算错误率,超过阈值触发通知。建议使用 logging.config.dictConfig 集中管理配置,区分 console(开发)和 JSON(生产)格式。
关键知识点
- 先区分这篇内容属于语法能力、工程能力,还是生态工具能力。
- Python 的开发效率来自生态,但可维护性来自结构、测试和规范。
- 脚本一旦进入长期维护,就必须按项目来治理。
- 框架与语言特性类主题要同时理解运行方式和工程组织方式。
项目落地视角
- 统一虚拟环境、依赖锁定、格式化和日志方案。
- 把入口、配置、业务逻辑和工具函数拆开,避免单文件膨胀。
- 对网络请求、文件读写和数据处理结果做异常与样本校验。
- 明确项目入口、配置管理、依赖管理、日志和测试策略。
常见误区
- 把临时脚本直接当生产代码使用。
- 忽略依赖版本、编码、路径和时区差异。
- 只会写 happy path,没有补超时、重试和资源释放。
- 把 notebook 或脚本风格直接带入长期维护项目。
进阶路线
- 把类型注解、测试、打包和部署纳入统一工程流程。
- 继续向异步、性能、数据管线和框架源码层深入。
- 把常用脚本抽成可复用库或 CLI 工具,而不是复制粘贴。
- 继续补齐部署、打包、监控和性能调优能力。
适用场景
- 当你准备把《Python 日志与监控》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合脚本自动化、数据处理、Web 开发和测试工具建设。
- 当需求强调快速迭代和丰富生态时,Python 往往能快速起步。
落地建议
- 统一使用虚拟环境与依赖锁定,避免环境漂移。
- 对核心函数补类型注解、异常处理和日志,减少“脚本黑盒”。
- 一旦脚本进入生产链路,及时补测试和监控。
排错清单
- 先确认当前解释器、虚拟环境和依赖版本是否正确。
- 检查编码、路径、时区和第三方库行为差异。
- 排查同步阻塞、数据库连接未释放或网络请求无超时。
复盘问题
- 如果把《Python 日志与监控》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《Python 日志与监控》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《Python 日志与监控》最大的收益和代价分别是什么?
