Advanced Python Logging
Introduction
Python's logging module is the standard library's flexible logging framework. It supports hierarchical loggers combined with multiple handlers and formatters. Advanced usage covers structured logging, log rotation, distributed tracing, and performance optimization, and it is a foundational capability for building observable systems.
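Before the advanced recipes, here is a minimal sketch of how those three building blocks fit together; the logger name app.payments is just an illustrative placeholder.

import logging

# Logger (hierarchical name), Handler (destination), Formatter (layout)
logger = logging.getLogger("app.payments")
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("payment accepted")  # routed through the handler and rendered by the formatter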
Implementation
Structured Logging with JSON Output
import logging
import json
import sys
from datetime import datetime, timezone
from typing import Any
class JsonFormatter(logging.Formatter):
"""JSON 结构化日志格式化器"""
def __init__(self, service_name: str = "app", version: str = "1.0.0"):
super().__init__()
self.service_name = service_name
self.version = version
def format(self, record: logging.LogRecord) -> str:
log_entry = {
"timestamp": datetime.fromtimestamp(
record.created, tz=timezone.utc
).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"service": self.service_name,
"version": self.version,
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
        # Merge extra fields passed via the "extra" argument
if hasattr(record, "extra_data"):
log_entry["data"] = record.extra_data
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry, ensure_ascii=False, default=str)
def setup_logging(level: str = "INFO", json_output: bool = False):
"""配置结构化日志"""
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, level.upper()))
handler = logging.StreamHandler(sys.stdout)
if json_output:
handler.setFormatter(JsonFormatter(service_name="my-service"))
else:
fmt = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
handler.setFormatter(logging.Formatter(fmt, datefmt="%Y-%m-%d %H:%M:%S"))
root_logger.addHandler(handler)
# Usage
setup_logging(level="DEBUG", json_output=True)
logger = logging.getLogger("api.order")
logger.info("订单创建", extra={
"extra_data": {"order_id": "ORD-001", "amount": 99.9, "user": "张三"}
})
# {"timestamp":"2024-...","level":"INFO","message":"订单创建","data":{"order_id":"ORD-001",...}}日志轮转与多 Handler 配置
import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
import os
def setup_production_logging(log_dir: str = "logs", app_name: str = "app"):
"""生产环境日志配置"""
os.makedirs(log_dir, exist_ok=True)
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
    # 1. Console handler (INFO and above)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_fmt = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
console_handler.setFormatter(logging.Formatter(console_fmt))
    # 2. Size-based rotating file handler (DEBUG and above, 10MB per file, keep 5 backups)
file_handler = RotatingFileHandler(
f"{log_dir}/{app_name}.log",
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5,
encoding="utf-8",
)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
"%(asctime)s | %(levelname)-8s | %(name)s:%(lineno)d | %(message)s"
))
    # 3. Time-based rotating file handler (one file per day, keep 30 days)
daily_handler = TimedRotatingFileHandler(
f"{log_dir}/{app_name}_daily.log",
when="midnight",
interval=1,
backupCount=30,
encoding="utf-8",
)
daily_handler.setLevel(logging.INFO)
daily_handler.setFormatter(logging.Formatter(
"%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
))
    # 4. Separate file for error logs
error_handler = RotatingFileHandler(
f"{log_dir}/{app_name}_error.log",
maxBytes=10 * 1024 * 1024,
backupCount=10,
encoding="utf-8",
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(logging.Formatter(
"%(asctime)s | %(levelname)-8s | %(name)s:%(lineno)d | %(message)s | %(exc_info)s"
))
root_logger.addHandler(console_handler)
root_logger.addHandler(file_handler)
root_logger.addHandler(daily_handler)
root_logger.addHandler(error_handler)
return root_logger
# Usage
logger = setup_production_logging(app_name="order-service")
logger.info("服务启动")
logger.error("数据库连接失败", exc_info=True)请求追踪与上下文日志
import logging
import contextvars
import uuid
from typing import Any
# Request-scoped context variables
request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("request_id", default="-")
user_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("user_id", default="-")
class ContextFilter(logging.Filter):
"""将上下文变量注入日志记录"""
def filter(self, record: logging.LogRecord) -> bool:
record.request_id = request_id_var.get()
record.user_id = user_id_var.get()
return True
def setup_context_logging():
"""配置带上下文的日志"""
logger = logging.getLogger("app")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.addFilter(ContextFilter())
fmt = (
"%(asctime)s | %(levelname)-8s | [req:%(request_id)s] "
"[user:%(user_id)s] | %(name)s | %(message)s"
)
handler.setFormatter(logging.Formatter(fmt))
logger.addHandler(handler)
return logger
# Usage example
import functools
def with_request_context(func):
"""为请求生成追踪 ID"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
request_id_var.set(str(uuid.uuid4())[:8])
return func(*args, **kwargs)
return wrapper
@with_request_context
def process_order(order_id: str, user_id: str):
user_id_var.set(user_id)
logger = logging.getLogger("app.order")
logger.info(f"开始处理订单: {order_id}")
logger.info("库存检查通过")
logger.info(f"订单处理完成: {order_id}")
# 初始化后调用
# setup_context_logging()
# process_order("ORD-001", "U-12345")日志性能优化与防敏感信息泄漏
import logging
import re
from typing import Any
class SensitiveDataFilter(logging.Filter):
"""敏感信息脱敏过滤器"""
SENSITIVE_PATTERNS = {
r'password["\s:=]+(\S+)': 'password=***',
r'token["\s:=]+(\S+)': 'token=***',
r'secret["\s:=]+(\S+)': 'secret=***',
r'(api[_-]?key["\s:=]+)\S+': r'\1***',
        r'(\d{3})\d{4}(\d{4})': r'\1****\2',  # mask phone numbers
        r'(\w{2})\w+(@\S+)': r'\1***\2',  # mask email addresses
}
def filter(self, record: logging.LogRecord) -> bool:
if isinstance(record.msg, str):
for pattern, replacement in self.SENSITIVE_PATTERNS.items():
record.msg = re.sub(pattern, replacement, record.msg, flags=re.IGNORECASE)
return True
class RateLimitedLogger:
"""限速日志器:防止高频日志导致性能问题"""
def __init__(self, logger: logging.Logger, min_interval: float = 1.0):
self._logger = logger
self._min_interval = min_interval
self._last_log_time: dict[str, float] = {}
def _should_log(self, key: str) -> bool:
import time
now = time.time()
last = self._last_log_time.get(key, 0)
if now - last >= self._min_interval:
self._last_log_time[key] = now
return True
return False
def info(self, msg: str, *args, **kwargs):
if self._should_log(msg[:50]):
self._logger.info(msg, *args, **kwargs)
def warning(self, msg: str, *args, **kwargs):
if self._should_log(msg[:50]):
self._logger.warning(msg, *args, **kwargs)
def error(self, msg: str, *args, **kwargs):
        # Error logs are never rate-limited
self._logger.error(msg, *args, **kwargs)
# Usage
logger = logging.getLogger("app")
logger.addFilter(SensitiveDataFilter())
rate_logger = RateLimitedLogger(logger, min_interval=5.0)
# Sensitive data is masked automatically in the output:
# logger.info("User login: password=secret123, token=abc123")
# Output: User login: password=***, token=***
# Rate-limiting high-frequency logs:
# for i in range(10000):
#     rate_logger.info(f"Processing request {i}")  # emitted at most once every 5 seconds

Multiprocess-Safe Logging and File Locking
import logging
import logging.handlers
import os
from typing import Optional
def setup_multiprocess_logging(log_dir: str = "logs", app_name: str = "app"):
"""多进程安全日志配置"""
os.makedirs(log_dir, exist_ok=True)
    # Alternatives: concurrent-log-handler's ConcurrentRotatingFileHandler, or a SocketHandler
    # Option 1: delay opening the log file to reduce cross-process conflicts
file_handler = logging.handlers.RotatingFileHandler(
f"{log_dir}/{app_name}.log",
maxBytes=50 * 1024 * 1024, # 50MB
backupCount=10,
encoding="utf-8",
        delay=True,  # open the file lazily to reduce cross-process conflicts
)
formatter = logging.Formatter(
"%(asctime)s | %(processName)s | %(levelname)-8s | %(name)s | %(message)s"
)
file_handler.setFormatter(formatter)
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(file_handler)
return root_logger
# Option 2: QueueHandler + QueueListener (recommended)
import multiprocessing
from logging.handlers import QueueHandler, QueueListener
def setup_queue_logging(log_queue: multiprocessing.Queue):
"""主进程设置日志监听器"""
listener = QueueListener(
log_queue,
*setup_handlers(),
respect_handler_level=True,
)
listener.start()
return listener
def setup_handlers():
"""创建日志 handlers"""
handlers = []
    # Console handler
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(logging.Formatter(
"%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
))
handlers.append(console)
    # File handler
file_h = logging.handlers.RotatingFileHandler(
"logs/app.log", maxBytes=50 * 1024 * 1024, backupCount=10,
encoding="utf-8",
)
file_h.setLevel(logging.DEBUG)
file_h.setFormatter(logging.Formatter(
"%(asctime)s | %(processName)s | %(levelname)-8s | %(name)s:%(lineno)d | %(message)s"
))
handlers.append(file_h)
return handlers
def worker_setup(log_queue: multiprocessing.Queue):
"""子进程配置:将日志发送到队列"""
root = logging.getLogger()
    root.handlers.clear()  # remove any inherited handlers
root.addHandler(QueueHandler(log_queue))
    root.setLevel(logging.DEBUG)

Dynamic Log Level Adjustment and Runtime Control
import logging
import logging.config
from typing import Optional
class DynamicLogLevelManager:
"""运行时动态调整日志级别"""
def __init__(self):
self._loggers: dict[str, logging.Logger] = {}
self._default_level = logging.INFO
def get_logger(self, name: str) -> logging.Logger:
logger = logging.getLogger(name)
self._loggers[name] = logger
return logger
def set_level(self, logger_name: str, level: str):
"""设置指定 logger 的级别"""
level_map = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
}
log_level = level_map.get(level.upper())
if log_level is None:
raise ValueError(f"无效的日志级别: {level}")
if logger_name == "root":
logging.getLogger().setLevel(log_level)
elif logger_name in self._loggers:
self._loggers[logger_name].setLevel(log_level)
else:
logging.getLogger(logger_name).setLevel(log_level)
def set_all_levels(self, level: str):
"""设置所有 logger 的级别"""
for name in self._loggers:
self.set_level(name, level)
def get_status(self) -> dict:
"""获取所有 logger 的当前级别"""
return {
name: logging.getLevelName(logger.level)
for name, logger in self._loggers.items()
}
# Usage
manager = DynamicLogLevelManager()
app_logger = manager.get_logger("app.api")
db_logger = manager.get_logger("app.database")
# Adjust at runtime (e.g. from an API endpoint):
# manager.set_level("app.database", "DEBUG")
# print(manager.get_status())

Logging Performance Tips
import logging
# 1. Lazy formatting: use %-style placeholders instead of f-strings
# Good: the message is only formatted when the log level is enabled
logger.debug("Processing user %s, data: %r", user_id, data)
# Bad: an f-string is always formatted, regardless of the log level
# logger.debug(f"Processing user {user_id}, data: {data}")
# 2. Guard expensive computations with a level check
if logger.isEnabledFor(logging.DEBUG):
    expensive_data = compute_expensive_debug_info()
    logger.debug("Debug data: %s", expensive_data)
# 3. Use logger.exception to capture the stack trace automatically
try:
result = risky_operation()
except Exception:
logger.exception("操作失败")
# 等价于 logger.error("操作失败", exc_info=True)
# 4. 自定义日志级别(谨慎使用)
VERBOSE = 5 # 比 DEBUG 更详细
logging.addLevelName(VERBOSE, "VERBOSE")
def verbose(self, msg, *args, **kwargs):
if self.isEnabledFor(VERBOSE):
self._log(VERBOSE, msg, args, **kwargs)
logging.Logger.verbose = verbose

Audit Logging and Compliance Records
import logging
import logging.handlers
import os
from datetime import datetime, timezone
from typing import Any, Optional
class AuditLogger:
"""审计日志记录器(不可篡改的合规日志)"""
def __init__(self, log_file: str = "logs/audit.log"):
self.logger = logging.getLogger("audit")
self.logger.setLevel(logging.INFO)
handler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=100 * 1024 * 1024, backupCount=30,
encoding="utf-8",
)
handler.setFormatter(logging.Formatter(
"%(asctime)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
))
self.logger.addHandler(handler)
def log_action(self, user_id: str, action: str, resource: str,
resource_id: str, details: Optional[dict] = None,
ip_address: str = "-", result: str = "success"):
"""记录审计事件"""
import json
entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"user_id": user_id,
"action": action,
"resource": resource,
"resource_id": resource_id,
"ip": ip_address,
"result": result,
}
if details:
entry["details"] = details
self.logger.info(json.dumps(entry, ensure_ascii=False))
def log_login(self, user_id: str, ip: str, success: bool):
self.log_action(
user_id=user_id, action="login", resource="session",
resource_id="-", ip_address=ip,
result="success" if success else "failed"
)
def log_data_change(self, user_id: str, table: str, record_id: str,
changes: dict, ip: str = "-"):
self.log_action(
user_id=user_id, action="data_change", resource=table,
resource_id=record_id, details=changes, ip_address=ip
)
def log_permission_check(self, user_id: str, permission: str,
granted: bool, ip: str = "-"):
self.log_action(
user_id=user_id, action="permission_check",
resource=permission, resource_id="-",
ip_address=ip, result="granted" if granted else "denied"
)
# Usage
audit = AuditLogger()
audit.log_login("U-12345", "192.168.1.100", success=True)
audit.log_data_change("U-12345", "users", "U-67890",
{"field": "role", "old": "viewer", "new": "editor"})优点
Summary
The core of advanced logging is building a log pipeline that is structured, traceable, and masked. In production, standardize on JSON output, attach a request ID to every record for trace correlation, control disk usage with rotation policies, and use masking filters to prevent sensitive-data leaks. It is recommended to wrap the logging configuration in a project-level utility function so that every service emits the same log format.
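One way to sketch such a project-level utility is with logging.config.dictConfig, which keeps format, level, and rotation policy in a single declarative place. The function name setup_service_logging and the paths below are illustrative assumptions, not a fixed convention.

import logging.config

def setup_service_logging(service: str, level: str = "INFO") -> None:
    """Hypothetical project-wide helper: one declarative config for every service."""
    logging.config.dictConfig({
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "std": {"format": "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"},
        },
        "handlers": {
            "console": {"class": "logging.StreamHandler", "formatter": "std"},
            "file": {
                "class": "logging.handlers.RotatingFileHandler",
                "filename": f"logs/{service}.log",  # assumes the logs/ directory exists
                "maxBytes": 50 * 1024 * 1024,
                "backupCount": 7,
                "encoding": "utf-8",
                "formatter": "std",
            },
        },
        "root": {"level": level, "handlers": ["console", "file"]},
    })

# setup_service_logging("order-service", level="INFO")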
Key Points
- Logger hierarchy: a child logger's records propagate up to the handlers of its ancestors (see the sketch after this list)
- logging.getLogger(name) is the standard way to obtain a module-level logger
- Log levels from low to high: DEBUG < INFO < WARNING < ERROR < CRITICAL
- RotatingFileHandler rotates by size; TimedRotatingFileHandler rotates by time
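A quick illustration of the propagation rule in the first bullet; the logger names are arbitrary.

import logging

parent = logging.getLogger("shop")
parent.addHandler(logging.StreamHandler())
parent.setLevel(logging.INFO)

child = logging.getLogger("shop.orders")  # child of "shop", no handler of its own
child.info("emitted by the parent's handler")  # record propagates up to "shop"

child.propagate = False  # switch propagation off
child.info("now dropped: the child has no handler and no longer propagates")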
Project Practice
- Use JSON-structured logs across all services so a log platform can collect and analyze them uniformly
- Give every request a unique request_id that runs through logs, monitoring, and alerting
- Configure rotation: keep individual files under 50MB and retain the last 7-30 days
- Mask sensitive fields (passwords, tokens, phone numbers) before they reach the log output
Common Pitfalls
- Emitting large volumes of DEBUG logs inside loops, filling the disk and hurting performance
- Printing passwords, API keys, and other secrets directly into the logs
- Using print() instead of logging, leaving no way to control output in production
- Adding handlers to the same logger repeatedly, so each record is emitted multiple times (see the guard sketch after this list)
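For the last pitfall, a common guard (a sketch, not the only fix) is to configure a logger only if it has no handlers yet, or to rebuild the root configuration with basicConfig(force=True) on Python 3.8+.

import logging
import sys

def get_app_logger(name: str) -> logging.Logger:
    logger = logging.getLogger(name)
    if not logger.handlers:  # configure only once per process
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s"))
        logger.addHandler(handler)
    return logger

# Alternative: wipe existing root handlers before re-configuring (Python 3.8+)
# logging.basicConfig(force=True, level=logging.INFO)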
Further Learning
- Learn the structlog library for a more modern structured-logging API (a minimal sketch follows this list)
- Study how logging integrates with OpenTelemetry tracing
- Read up on best practices for log-collection platforms (ELK, Loki)
- Explore asynchronous logging (aiologger) for high-concurrency scenarios
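As a taste of the structlog item above, a minimal configuration that renders events as JSON; this assumes structlog is installed and only scratches the surface of its processor pipeline.

import structlog

structlog.configure(processors=[
    structlog.processors.TimeStamper(fmt="iso"),  # add an ISO-8601 timestamp
    structlog.processors.add_log_level,           # add the level as a field
    structlog.processors.JSONRenderer(),          # render the event dict as one JSON line
])

log = structlog.get_logger("api.order")
log.info("order_created", order_id="ORD-001", amount=99.9)
# {"order_id": "ORD-001", "amount": 99.9, "timestamp": "...", "level": "info", "event": "order_created"}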
When to Apply
- Any production service that needs runtime diagnostics and troubleshooting
- Distributed microservice architectures that need request tracing
- Finance, healthcare, and other sensitive domains that require audit and compliance logs
Adoption Tips
- Wrap the project-level logging setup in one function that standardizes format, rotation policy, and masking rules
- Check logging configuration in CI/CD and forbid print() for output
- Define standard log fields and formats for key business operations so the logs stay queryable
Troubleshooting Checklist
- Check whether a logger has duplicate handlers causing records to be emitted more than once
- Confirm that log rotation is actually in effect and that disk space is sufficient
- Verify that sensitive information in the logs has been properly masked
Review Questions
- How much log volume does your service produce at peak, and has logging ever caused a performance incident?
- Does the request_id run through the entire request chain, and can you trace a complete call from it?
- Does your log-retention policy meet business and compliance requirements, and has missing log data ever blocked an investigation?
