Python Data Processing Pipelines
Introduction
A data pipeline chains data collection, cleaning, transformation, and storage into an automated processing flow. Understanding the ETL process, data validation, incremental processing, and task scheduling helps you build reliable data processing systems.
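Before the full ETL class below, it helps to see the core idea in miniature: a pipeline is just a chain of small, testable stage functions. This is an illustrative sketch only; the stage names and the record structure are made up for the example.

def collect() -> list[dict]:
    # Pretend data source; in practice this would read an API, file, or database.
    return [{"id": 1, "value": " 42 "}, {"id": 2, "value": None}]

def clean(records: list[dict]) -> list[dict]:
    # Drop rows that cannot be used downstream.
    return [r for r in records if r["value"] is not None]

def enrich(records: list[dict]) -> list[dict]:
    # Normalize the payload.
    return [{**r, "value": int(str(r["value"]).strip())} for r in records]

def store(records: list[dict]) -> None:
    print(f"stored {len(records)} records")

# The pipeline is the composition of its stages.
store(enrich(clean(collect())))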
ETL Pipelines
Extract, transform, load
import pandas as pd
from dataclasses import dataclass
from datetime import datetime
from typing import Generator


@dataclass
class DataPipelineConfig:
    source_path: str
    output_path: str
    batch_size: int = 10000
    date_column: str = "created_at"


class ETLPipeline:
    """ETL data pipeline."""

    def __init__(self, config: DataPipelineConfig):
        self.config = config
        self.metrics = {
            "extracted": 0,
            "transformed": 0,
            "loaded": 0,
            "errors": 0,
        }

    def extract(self) -> Generator[pd.DataFrame, None, None]:
        """Extract -- read the source data in batches."""
        print(f"[Extract] Reading: {self.config.source_path}")
        # Read the CSV in chunks
        reader = pd.read_csv(
            self.config.source_path,
            chunksize=self.config.batch_size,
            parse_dates=[self.config.date_column],
            encoding="utf-8",
        )
        for i, chunk in enumerate(reader):
            self.metrics["extracted"] += len(chunk)
            print(f"  Batch {i}: {len(chunk)} rows")
            yield chunk

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform -- data cleaning and feature engineering."""
        original_count = len(df)
        # 1. Drop duplicates
        df = df.drop_duplicates()
        # 2. Handle missing values
        numeric_cols = df.select_dtypes(include=["number"]).columns
        df[numeric_cols] = df[numeric_cols].fillna(0)
        text_cols = df.select_dtypes(include=["object"]).columns
        df[text_cols] = df[text_cols].fillna("")
        # 3. Cap outliers (IQR method)
        for col in numeric_cols:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower = Q1 - 1.5 * IQR
            upper = Q3 + 1.5 * IQR
            df[col] = df[col].clip(lower, upper)
        # 4. Normalize text
        for col in text_cols:
            df[col] = df[col].str.strip().str.lower()
        # 5. Feature engineering
        if self.config.date_column in df.columns:
            dt = df[self.config.date_column]
            df["year"] = dt.dt.year
            df["month"] = dt.dt.month
            df["day_of_week"] = dt.dt.dayofweek
            df["hour"] = dt.dt.hour
        # 6. Type conversion
        df = df.convert_dtypes()
        self.metrics["transformed"] += len(df)
        removed = original_count - len(df)
        if removed > 0:
            print(f"  Deduplication: removed {removed} rows")
        return df

    def load(self, df: pd.DataFrame):
        """Load -- write to the target store."""
        output_path = self.config.output_path
        self.metrics["loaded"] += len(df)
        # Write the batch as Parquet (an efficient columnar format).
        # Note: DataFrame.to_parquet overwrites the target file, so when the
        # pipeline processes multiple batches each batch should go to its own
        # file (or a partitioned dataset) rather than a single fixed path.
        df.to_parquet(
            output_path,
            engine="pyarrow",
            compression="snappy",
            index=False,
        )
        print(f"  Wrote {len(df)} rows to {output_path}")

    def run(self):
        """Run the full pipeline."""
        start = datetime.now()
        print(f"=== ETL Pipeline Start: {start} ===")
        for chunk in self.extract():
            transformed = self.transform(chunk)
            self.load(transformed)
        elapsed = (datetime.now() - start).total_seconds()
        print(f"=== ETL Pipeline Complete: {elapsed:.2f}s ===")
        print(f"Metrics: {self.metrics}")
        return self.metrics
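A minimal usage sketch of the class above. The file paths, batch size, and column name are placeholders; adjust them to your own data.

# Hypothetical usage: process events.csv in batches of 5,000 rows.
config = DataPipelineConfig(
    source_path="data/events.csv",
    output_path="data/events.parquet",
    batch_size=5000,
    date_column="created_at",
)
metrics = ETLPipeline(config).run()
print(metrics)  # e.g. {"extracted": ..., "transformed": ..., "loaded": ..., "errors": 0}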
Data Validation
Validating data with Pydantic
from pydantic import BaseModel, validator  # Pydantic v1-style validators (v2 uses field_validator)
from typing import Optional
from datetime import datetime


class RawUserRecord(BaseModel):
    """Raw input record."""

    user_id: str
    name: str = ""
    email: Optional[str] = None
    age: int = 0
    created_at: Optional[str] = None
    status: str = "active"

    @validator("email")
    def validate_email(cls, v):
        if v and "@" not in v:
            raise ValueError(f"Invalid email: {v}")
        return v

    @validator("age")
    def validate_age(cls, v):
        if v < 0 or v > 150:
            raise ValueError(f"Invalid age: {v}")
        return v


class CleanedUserRecord(BaseModel):
    """Cleaned record."""

    user_id: int
    name: str
    email: Optional[str]
    age: int
    status: str
    created_at: Optional[datetime]

    @validator("user_id", pre=True)
    def parse_user_id(cls, v):
        return int(str(v).strip())


class DataValidator:
    """Batch data validator."""

    def __init__(self):
        self.errors = []
        self.valid_records = []
        self.invalid_records = []

    def validate_batch(self, records: list[dict]) -> list[CleanedUserRecord]:
        """Validate a batch of records."""
        for i, record in enumerate(records):
            try:
                # Validate the raw record
                raw = RawUserRecord(**record)
                # Convert to the cleaned model
                cleaned = CleanedUserRecord(
                    user_id=raw.user_id,
                    name=raw.name.title(),  # normalize the name
                    email=raw.email,
                    age=raw.age,
                    status=raw.status,
                    created_at=datetime.fromisoformat(raw.created_at) if raw.created_at else None,
                )
                self.valid_records.append(cleaned)
            except Exception as e:
                self.errors.append({"row": i, "error": str(e), "data": record})
                self.invalid_records.append(record)
        print(f"Validation: {len(self.valid_records)} valid, {len(self.invalid_records)} invalid")
        return self.valid_records
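A small usage sketch. The two input records are invented for illustration: one passes, one fails the email validator.

# Hypothetical input: one clean row, one with a bad email.
records = [
    {"user_id": " 1001 ", "name": "ada lovelace", "email": "ada@example.com",
     "age": 36, "created_at": "2024-03-01T10:00:00"},
    {"user_id": "1002", "name": "bob", "email": "not-an-email", "age": 30},
]

validator_ = DataValidator()
valid = validator_.validate_batch(records)    # prints: Validation: 1 valid, 1 invalid
print(valid[0].user_id, valid[0].name)        # 1001 Ada Lovelace
print(validator_.errors[0]["error"])          # mentions the invalid email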
Incremental Processing
Change data capture
import json

import pandas as pd


class IncrementalProcessor:
    """Incremental data processing."""

    def __init__(self, state_path: str = "pipeline_state.json"):
        self.state_path = state_path
        self.state = self._load_state()

    def _load_state(self) -> dict:
        """Load the processing state."""
        try:
            with open(self.state_path) as f:
                return json.load(f)
        except FileNotFoundError:
            return {"last_processed_id": 0, "last_processed_time": None}

    def _save_state(self):
        """Persist the processing state."""
        with open(self.state_path, "w") as f:
            json.dump(self.state, f, indent=2)

    def get_new_records(self, df: pd.DataFrame, id_column: str = "id",
                        time_column: str = "updated_at") -> pd.DataFrame:
        """Return records that are new or updated since the last run."""
        last_id = self.state.get("last_processed_id", 0)
        last_time = self.state.get("last_processed_time")
        # Filter by ID, and additionally by timestamp when one is available
        id_mask = df[id_column] > last_id
        new_records = df[id_mask]
        if last_time and time_column in df.columns:
            time_mask = pd.to_datetime(df[time_column]) > pd.Timestamp(last_time)
            new_records = df[id_mask | time_mask]
        print(f"Incremental records: {len(new_records)} (of {len(df)} total)")
        return new_records

    def mark_processed(self, df: pd.DataFrame, id_column: str = "id",
                       time_column: str = "updated_at"):
        """Record the high-water mark of what has been processed."""
        if len(df) > 0:
            self.state["last_processed_id"] = int(df[id_column].max())
            if time_column in df.columns:
                self.state["last_processed_time"] = str(df[time_column].max())
            self._save_state()

# Usage
processor = IncrementalProcessor()

# Each run processes only the new data
full_data = pd.read_csv("data.csv")
new_data = processor.get_new_records(full_data)

if len(new_data) > 0:
    transformed = transform(new_data)   # transform/load stand in for your own pipeline steps
    load(transformed)
    processor.mark_processed(new_data)
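The id/timestamp high-water mark misses rows that change without getting a newer id or updated_at. A complementary change-data-capture idea is to hash each row's content and compare against the hashes from the previous run. The sketch below is illustrative, not a drop-in addition to the class above; the key column name and the helper functions are assumptions.

import hashlib
import json

import pandas as pd


def row_hashes(df: pd.DataFrame, key: str = "id") -> dict[str, str]:
    """Map each key to an MD5 hash of the full row content."""
    return {
        str(row[key]): hashlib.md5(
            json.dumps(row.to_dict(), sort_keys=True, default=str).encode()
        ).hexdigest()
        for _, row in df.iterrows()
    }


def changed_rows(df: pd.DataFrame, previous: dict[str, str], key: str = "id") -> pd.DataFrame:
    """Rows whose content hash is new or differs from the previous run."""
    current = row_hashes(df, key)
    changed_keys = {k for k, h in current.items() if previous.get(k) != h}
    return df[df[key].astype(str).isin(changed_keys)]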
Task Scheduling
A simple task scheduler
import time
import threading

import schedule


class DataPipelineScheduler:
    """Data pipeline scheduler."""

    def __init__(self):
        self.pipelines = {}
        self._running = False

    def register(self, name: str, pipeline_func, schedule_expr: str):
        """Register a pipeline and its schedule."""
        self.pipelines[name] = {
            "func": pipeline_func,
            "schedule": schedule_expr,
        }
        # Configure the schedule
        parts = schedule_expr.split()
        if len(parts) == 2 and parts[1] == "minutes":
            schedule.every(int(parts[0])).minutes.do(pipeline_func)
        elif schedule_expr == "hourly":
            schedule.every().hour.do(pipeline_func)
        elif schedule_expr == "daily":
            schedule.every().day.at("02:00").do(pipeline_func)

    def run_pipeline(self, name: str):
        """Trigger a pipeline manually."""
        if name in self.pipelines:
            print(f"Manual trigger: {name}")
            self.pipelines[name]["func"]()

    def start(self):
        """Start the scheduler in a background thread."""
        self._running = True

        def run_scheduler():
            while self._running:
                schedule.run_pending()
                time.sleep(1)

        thread = threading.Thread(target=run_scheduler, daemon=True)
        thread.start()
        print("Scheduler started")

    def stop(self):
        self._running = False
        schedule.clear()
        print("Scheduler stopped")

# Usage (config, compute_metrics, and check_alerts are assumed to be defined elsewhere)
scheduler = DataPipelineScheduler()
scheduler.register("daily_sync", lambda: ETLPipeline(config).run(), "daily")
scheduler.register("hourly_metrics", compute_metrics, "hourly")
scheduler.register("realtime_alerts", check_alerts, "5 minutes")
scheduler.start()
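Two practical caveats with this scheduler: the schedule library does not catch exceptions, so a failing job would escape `schedule.run_pending()` and kill the background loop, and a manual `run_pipeline()` call from another thread can overlap with a scheduled run of the same job. A minimal guard, sketched below under those assumptions, wraps the pipeline function before registering it; the wrapper itself is not part of the schedule library.

# Skip a trigger if the previous run is still going, and log failures instead
# of letting them escape into the scheduler thread.
import logging
import threading
from functools import wraps

logger = logging.getLogger("scheduler")


def guarded(pipeline_func):
    lock = threading.Lock()

    @wraps(pipeline_func)
    def wrapper(*args, **kwargs):
        if not lock.acquire(blocking=False):
            logger.warning("Previous run still in progress, skipping this trigger")
            return None
        try:
            return pipeline_func(*args, **kwargs)
        except Exception:
            logger.exception("Scheduled pipeline failed")
            return None
        finally:
            lock.release()

    return wrapper

# Register the guarded version instead of the raw function, e.g.:
# scheduler.register("daily_sync", guarded(lambda: ETLPipeline(config).run()), "daily")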
Airflow DAG Orchestration
Apache Airflow is a production-grade orchestration tool for data pipelines; task dependencies are defined as a DAG (directed acyclic graph).
"""Airflow DAG 示例 — 用户数据日终处理"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
# 默认参数
default_args = {
"owner": "data-team",
"depends_on_past": False,
"email": ["data-team@example.com"],
"email_on_failure": True,
"email_on_retry": False,
"retries": 2,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="user_data_daily_pipeline",
default_args=default_args,
description="用户数据日终处理管线",
schedule_interval="0 2 * * *", # 每天凌晨 2 点
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["etl", "user"],
) as dag:
# 任务 1:等待源数据文件就绪
wait_for_file = FileSensor(
task_id="wait_for_source_file",
filepath="/data/incoming/users_{{ ds }}.csv",
poke_interval=60, # 每 60 秒检查一次
timeout=3600, # 超时 1 小时
mode="poke",
)
# 任务 2:抽取数据
def extract_data(**context):
import pandas as pd
ds = context["ds"]
df = pd.read_csv(f"/data/incoming/users_{ds}.csv")
df.to_parquet(f"/data/staging/users_{ds}.parquet", index=False)
return len(df)
extract = PythonOperator(
task_id="extract",
python_callable=extract_data,
)
# 任务 3:数据清洗
def transform_data(**context):
import pandas as pd
ds = context["ds"]
df = pd.read_parquet(f"/data/staging/users_{ds}.parquet")
df = df.drop_duplicates()
df = df.dropna(subset=["user_id", "email"])
df["processed_at"] = datetime.now()
df.to_parquet(f"/data/processed/users_{ds}.parquet", index=False)
return len(df)
transform = PythonOperator(
task_id="transform",
python_callable=transform_data,
)
# 任务 4:加载数据到数据仓库
def load_data(**context):
import pandas as pd
ds = context["ds"]
df = pd.read_parquet(f"/data/processed/users_{ds}.parquet")
# 写入数据库(伪代码)
# df.to_sql("users", engine, if_exists="append", index=False)
print(f"已加载 {len(df)} 条记录")
load = PythonOperator(
task_id="load",
python_callable=load_data,
)
# 任务 5:数据质量检查
def quality_check(**context):
import pandas as pd
ds = context["ds"]
df = pd.read_parquet(f"/data/processed/users_{ds}.parquet")
checks = {
"row_count": len(df) > 0,
"no_null_ids": df["user_id"].notna().all(),
"valid_emails": df["email"].str.contains("@").all(),
}
failed = [k for k, v in checks.items() if not v]
if failed:
raise ValueError(f"数据质量检查失败: {failed}")
return checks
quality = PythonOperator(
task_id="quality_check",
python_callable=quality_check,
)
# 任务 6:发送通知
notify = BashOperator(
task_id="notify",
bash_command='echo "Pipeline completed for {{ ds }}"',
)
# 定义任务依赖
wait_for_file >> extract >> transform >> load >> quality >> notify数据管线监控与告警
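In this DAG the intermediate data travels through staging files, while the small values returned by extract_data and transform_data (the row counts) are also pushed to XCom automatically. A downstream task could pull them for a cheap reconciliation check. The sketch below is not part of the DAG above; it assumes it is defined inside the same `with DAG(...)` block, and the 50% threshold is an invented example value.

# Sketch of an extra task that reconciles row counts via XCom.
def reconcile_counts(**context):
    ti = context["ti"]
    extracted = ti.xcom_pull(task_ids="extract") or 0      # return value of extract_data
    transformed = ti.xcom_pull(task_ids="transform") or 0  # return value of transform_data
    dropped = extracted - transformed
    print(f"extracted={extracted}, kept={transformed}, dropped={dropped}")
    if extracted and dropped / extracted > 0.5:
        raise ValueError("More than half of the rows were dropped during transform")

# reconcile = PythonOperator(task_id="reconcile_counts", python_callable=reconcile_counts)
# transform >> reconcile  # would hang off the existing chain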
"""数据管线监控"""
import logging
import smtplib
from datetime import datetime
from dataclasses import dataclass, field
from typing import Callable
logger = logging.getLogger("pipeline")
@dataclass
class PipelineMetrics:
"""管线运行指标"""
pipeline_name: str
start_time: datetime = field(default_factory=datetime.now)
end_time: datetime | None = None
records_read: int = 0
records_written: int = 0
records_failed: int = 0
errors: list[str] = field(default_factory=list)
@property
def duration_seconds(self) -> float:
end = self.end_time or datetime.now()
return (end - self.start_time).total_seconds()
@property
def success_rate(self) -> float:
total = self.records_read
return (self.records_written / total * 100) if total > 0 else 0
def summary(self) -> str:
return (
f"管线: {self.pipeline_name}\n"
f"耗时: {self.duration_seconds:.1f}s\n"
f"读取: {self.records_read}, 写入: {self.records_written}, "
f"失败: {self.records_failed}\n"
f"成功率: {self.success_rate:.1f}%"
)
class PipelineMonitor:
"""管线监控器"""
def __init__(self):
self.alert_rules: list[dict] = []
self.history: list[PipelineMetrics] = []
def add_alert_rule(self, name: str, condition: Callable[[PipelineMetrics], bool],
message: str):
"""添加告警规则"""
self.alert_rules.append({
"name": name, "condition": condition, "message": message
})
def check_alerts(self, metrics: PipelineMetrics):
"""检查告警规则"""
for rule in self.alert_rules:
if rule["condition"](metrics):
self._send_alert(rule["name"], rule["message"].format(metrics=metrics))
def _send_alert(self, rule_name: str, message: str):
"""发送告警"""
logger.warning(f"[ALERT] {rule_name}: {message}")
# 实际场景中对接邮件/Slack/钉钉等
def record(self, metrics: PipelineMetrics):
"""记录运行指标"""
self.history.append(metrics)
self.check_alerts(metrics)
# 预定义告警规则
monitor = PipelineMonitor()
monitor.add_alert_rule(
"high_failure_rate",
lambda m: m.success_rate < 95,
"成功率低于 95%: {metrics.success_rate:.1f}%"
)
monitor.add_alert_rule(
"slow_pipeline",
lambda m: m.duration_seconds > 3600,
"管线执行超过 1 小时: {metrics.duration_seconds:.0f}s"
)
monitor.add_alert_rule(
"zero_records",
lambda m: m.records_read == 0,
"管线未读取到任何记录"
)Parquet 与数据格式选型
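A sketch of how the monitor plugs into a run, reusing the ETLPipeline and config from the earlier section (both assumed to be in scope) and the monitor defined above.

# Hypothetical end-to-end use of PipelineMetrics + PipelineMonitor.
metrics = PipelineMetrics(pipeline_name="user_daily_etl")
try:
    result = ETLPipeline(config).run()          # ETLPipeline/config from the ETL section
    metrics.records_read = result["extracted"]
    metrics.records_written = result["loaded"]
    metrics.records_failed = result["errors"]
except Exception as exc:
    metrics.errors.append(str(exc))
finally:
    metrics.end_time = datetime.now()
    monitor.record(metrics)                     # evaluates the three alert rules above
    print(metrics.summary())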
"""数据存储格式对比与选型"""
import pandas as pd
import time
# 常见数据格式对比:
# +----------+--------+----------+-----------+----------+
# | 格式 | 压缩率 | 读取速度 | 列式/行式 | Schema |
# +----------+--------+----------+-----------+----------+
# | CSV | 无 | 慢 | 行式 | 无 |
# | JSON | 无 | 慢 | 行式/嵌套 | 有(弱) |
# | Parquet | 高 | 快(列) | 列式 | 有 |
# | Avro | 中 | 中 | 行式 | 有 |
# | Feather | 低 | 最快 | 列式 | 有 |
# +----------+--------+----------+-----------+----------+
# Parquet 最佳实践
def write_partitioned_parquet(df: pd.DataFrame, output_dir: str,
partition_cols: list[str]):
"""分区写入 Parquet 文件"""
df.to_parquet(
output_dir,
engine="pyarrow",
compression="snappy", # snappy 速度快,gzip 压缩率高
index=False,
partition_cols=partition_cols # 按指定列分区
)
# 示例:按年月分区
# write_partitioned_parquet(df, "/data/output", ["year", "month"])
# 生成目录结构:
# /data/output/year=2024/month=01/part-0.parquet
# /data/output/year=2024/month=02/part-0.parquet
# 读取分区数据(自动过滤)
# df = pd.read_parquet("/data/output", filters=[("year", "==", 2024)])
# 性能对比
def benchmark_formats(df: pd.DataFrame):
"""对比不同格式的写入和读取性能"""
formats = {
"CSV": {"write": "to_csv", "read": "read_csv", "ext": ".csv"},
"Parquet (snappy)": {"write": "to_parquet", "read": "read_parquet",
"ext": ".parquet", "compression": "snappy"},
"Parquet (gzip)": {"write": "to_parquet", "read": "read_parquet",
"ext": ".parquet.gz", "compression": "gzip"},
"Feather": {"write": "to_feather", "read": "read_feather", "ext": ".feather"},
}
results = {}
for name, fmt in formats.items():
file_path = f"benchmark{fmt['ext']}"
# 写入测试
start = time.perf_counter()
write_func = getattr(df, fmt["write"])
if "compression" in fmt:
write_func(file_path, compression=fmt["compression"], index=False)
else:
write_func(file_path, index=False)
write_time = time.perf_counter() - start
# 读取测试
start = time.perf_counter()
read_func = getattr(pd, fmt["read"])
_ = read_func(file_path)
read_time = time.perf_counter() - start
# 文件大小
import os
file_size = os.path.getsize(file_path) / 1024 / 1024
results[name] = {
"write_seconds": round(write_time, 3),
"read_seconds": round(read_time, 3),
"file_size_mb": round(file_size, 2),
}
return pd.DataFrame(results).T优点
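A usage sketch for the benchmark on a synthetic one-million-row frame; the columns and sizes are invented for illustration, and the actual numbers depend entirely on your data and machine.

# Hypothetical benchmark run on synthetic data.
import numpy as np

rng = np.random.default_rng(42)
demo = pd.DataFrame({
    "id": np.arange(1_000_000),
    "value": rng.random(1_000_000),
    "category": rng.choice(["a", "b", "c"], size=1_000_000),
    "created_at": pd.date_range("2024-01-01", periods=1_000_000, freq="s"),
})

print(benchmark_formats(demo))
# Typically Parquet/Feather read much faster and produce far smaller files
# than CSV, but verify on your own workload.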
Summary
An ETL pipeline has three stages: extract, transform, and load; with pandas, large inputs are processed in batches. Data validation with Pydantic models protects data quality through type checks, range validation, and format normalization. Incremental processing records a high-water mark (last_id / last_time) so that each run only touches new or changed data. Scheduling can be handled by the schedule library for simple cases or by Apache Airflow in production. Prefer the columnar Parquet format for storage, and add checkpoint/resume and error-retry mechanisms for robustness.
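The summary recommends retry mechanisms without showing one, so here is a minimal retry-with-backoff sketch for flaky pipeline steps such as network extracts or warehouse loads. The attempt count, delays, and the load_to_warehouse name are illustrative assumptions, not part of any library.

import time
from functools import wraps


def with_retry(max_attempts: int = 3, base_delay: float = 2.0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise
                    delay = base_delay * 2 ** (attempt - 1)  # 2s, 4s, 8s, ...
                    print(f"Attempt {attempt} failed, retrying in {delay:.0f}s")
                    time.sleep(delay)
        return wrapper
    return decorator


@with_retry(max_attempts=3)
def load_to_warehouse(df):
    ...  # e.g. df.to_sql(...) against a sometimes-flaky connection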
Key Takeaways
- Start by classifying this topic: is it about language syntax, engineering practice, or ecosystem tooling?
- Python's development speed comes from its ecosystem; its maintainability comes from structure, tests, and conventions.
- Once a script enters long-term maintenance, it has to be governed like a project.
- For framework and language-feature topics, understand both how the code runs and how the project is organized.
Project Perspective
- Standardize the virtual environment, dependency locking, formatting, and logging setup.
- Separate entry points, configuration, business logic, and utility functions so no single file balloons.
- Add exception handling and sample-based checks around network requests, file I/O, and data processing results.
- Be explicit about the project entry point, configuration management, dependency management, logging, and testing strategy.
Common Pitfalls
- Treating throwaway scripts as production code.
- Ignoring differences in dependency versions, encodings, paths, and time zones.
- Writing only the happy path, without timeouts, retries, or resource cleanup.
- Carrying notebook or script style directly into a long-lived project.
Where to Go Next
- Fold type annotations, testing, packaging, and deployment into one unified engineering workflow.
- Go deeper into async, performance, data pipelines, and framework internals.
- Extract frequently used scripts into reusable libraries or CLI tools instead of copy-pasting them.
- Keep filling gaps in deployment, packaging, monitoring, and performance tuning.
When This Applies
- When you are ready to bring "Python Data Processing Pipelines" into a real project, validate the critical path first in a standalone module or a minimal example.
- Well suited to scripting and automation, data processing, web development, and building test tooling.
- When requirements emphasize fast iteration and a rich ecosystem, Python usually gets you started quickly.
Practical Recommendations
- Use virtual environments and dependency locking consistently to avoid environment drift.
- Add type annotations, exception handling, and logging to core functions so they stop being script black boxes.
- As soon as a script enters the production path, add tests and monitoring.
Troubleshooting Checklist
- First confirm that the interpreter, virtual environment, and dependency versions are correct.
- Check for differences in encodings, paths, time zones, and third-party library behavior.
- Look for synchronous blocking, unreleased database connections, and network requests without timeouts.
Review Questions
- If you brought "Python Data Processing Pipelines" into your current project, which inputs, outputs, and failure paths would you verify first?
- At what scale and under which edge conditions is "Python Data Processing Pipelines" most likely to expose problems, and which metrics or logs would you use to confirm that?
- Compared with a default implementation or an alternative, what are the biggest gains and costs of adopting "Python Data Processing Pipelines"?
