AI 数据预处理
大约 11 分钟约 3152 字
AI 数据预处理
简介
数据预处理通常决定了模型训练的上限和下限:原始数据脏、标签乱、字段不一致、切分泄漏,再复杂的模型也很难救回来。AI 数据预处理不只是"清洗一下数据",而是把采集、清洗、标准化、增强、切分和特征准备做成一条可复现的数据管线。
数据预处理在 AI 项目中的重要性可以用"Garbage In, Garbage Out"来概括。统计表明,数据科学家和 ML 工程师通常花费 60-80% 的时间在数据准备上。这不仅仅是因为数据清洗本身耗时,更是因为数据质量直接影响模型的泛化能力、线上稳定性和业务可靠性。一个设计良好的数据管线应该具备:可复现性(每次运行结果一致)、可审计性(知道每步做了什么)、可回滚性(能回到之前的版本)和可扩展性(适应数据规模增长)。
特点
实现
结构化数据清洗与标准化
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder, MinMaxScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
# 模拟原始数据
raw_df = pd.DataFrame({
"age": [25, 32, None, 29, 300],
"income": [8000, 12000, 10000, None, 15000],
"city": ["上海", "北京", "北京", None, "上海"],
"label": [1, 0, 1, 0, 1]
})
# 过滤明显异常值
clean_df = raw_df[(raw_df["age"].isna()) | ((raw_df["age"] >= 18) & (raw_df["age"] <= 80))].copy()
print(clean_df)数据质量检查
class DataQualityChecker:
"""自动化数据质量检查工具"""
def __init__(self, df: pd.DataFrame):
self.df = df
self.report = {}
def check_missing(self, threshold=0.3):
"""检查缺失值比例"""
missing = self.df.isnull().mean()
problematic = missing[missing > threshold].to_dict()
self.report["missing_ratio"] = missing.to_dict()
self.report["high_missing_cols"] = problematic
return problematic
def check_duplicates(self):
"""检查重复行"""
dup_count = self.df.duplicated().sum()
self.report["duplicate_rows"] = int(dup_count)
return dup_count
def check_distribution(self, column):
"""检查数值列的分布异常"""
col = self.df[column].dropna()
stats = {
"mean": float(col.mean()),
"std": float(col.std()),
"min": float(col.min()),
"max": float(col.max()),
"median": float(col.median()),
"skewness": float(col.skew()),
"kurtosis": float(col.kurtosis()),
}
# 简单异常值检测(IQR 方法)
q1, q3 = col.quantile(0.25), col.quantile(0.75)
iqr = q3 - q1
outliers = ((col < q1 - 1.5 * iqr) | (col > q3 + 1.5 * iqr)).sum()
stats["outlier_count"] = int(outliers)
self.report[f"distribution_{column}"] = stats
return stats
def check_label_balance(self, label_col):
"""检查标签分布均衡性"""
dist = self.df[label_col].value_counts(normalize=True).to_dict()
self.report["label_distribution"] = dist
return dist
def summary(self):
"""生成质量报告"""
return self.report
checker = DataQualityChecker(pd.DataFrame({
"age": [25, 32, None, 29, 300, 28, 31, None, 27, 26],
"label": [1, 0, 1, 0, 1, 1, 1, 0, 0, 0]
}))
checker.check_missing()
checker.check_duplicates()
checker.check_distribution("age")
checker.check_label_balance("label")
print(checker.summary())Pipeline 构建
X = clean_df[["age", "income", "city"]]
y = clean_df["label"]
numeric_features = ["age", "income"]
categorical_features = ["city"]
numeric_pipeline = Pipeline([
("imputer", SimpleImputer(strategy="median")),
("scaler", StandardScaler())
])
categorical_pipeline = Pipeline([
("imputer", SimpleImputer(strategy="most_frequent")),
("encoder", OneHotEncoder(handle_unknown="ignore"))
])
preprocessor = ColumnTransformer([
("num", numeric_pipeline, numeric_features),
("cat", categorical_pipeline, categorical_features)
])
X_processed = preprocessor.fit_transform(X)
print(X_processed.shape)不同缩放策略对比
import numpy as np
data = np.array([[100], [200], [300], [400], [500]])
# StandardScaler: 均值0,方差1(适合正态分布)
from sklearn.preprocessing import StandardScaler
standard = StandardScaler().fit_transform(data)
print(f"StandardScaler: {standard.flatten().round(3)}")
# MinMaxScaler: 缩放到[0,1](适合有明确边界的特征)
from sklearn.preprocessing import MinMaxScaler
minmax = MinMaxScaler().fit_transform(data)
print(f"MinMaxScaler: {minmax.flatten().round(3)}")
# RobustScaler: 使用中位数和四分位数(适合有异常值的数据)
from sklearn.preprocessing import RobustScaler
robust = RobustScaler().fit_transform(data)
print(f"RobustScaler: {robust.flatten().round(3)}")
print("""
选择建议:
- StandardScaler:大多数机器学习模型的默认选择
- MinMaxScaler:神经网络、图像像素值、明确有界的特征
- RobustScaler:数据中有大量异常值时
- MaxAbsScaler:稀疏数据(保留零值)
""")# 去重与时间字段规范化
orders = pd.DataFrame([
{"order_id": 1, "user_id": 101, "created_at": "2026/04/01 10:00:00"},
{"order_id": 1, "user_id": 101, "created_at": "2026/04/01 10:00:00"},
{"order_id": 2, "user_id": 102, "created_at": "2026-04-02T11:30:00"}
])
orders = orders.drop_duplicates(subset=["order_id"])
orders["created_at"] = pd.to_datetime(orders["created_at"], errors="coerce")
orders["weekday"] = orders["created_at"].dt.weekday
orders["hour"] = orders["created_at"].dt.hour
print(orders)# 标签映射统一
label_map = {
"positive": 1,
"negative": 0,
"spam": 2
}
text_df = pd.DataFrame({
"text": ["很好用", "太差了", "点击领取优惠"],
"label": ["positive", "negative", "spam"]
})
text_df["label_id"] = text_df["label"].map(label_map)
print(text_df)文本与图像数据预处理
import re
import jieba
stopwords = {"的", "了", "是", "在"}
def normalize_text(text: str) -> str:
text = text.lower().strip()
text = re.sub(r"http\S+", " ", text)
text = re.sub(r"[^\u4e00-\u9fa5a-z0-9 ]+", " ", text)
tokens = [w for w in jieba.cut(text) if w not in stopwords and w.strip()]
return " ".join(tokens)
samples = [
"这个产品真的很好用!!!",
"点击 http://example.com 领取优惠",
"客服处理速度一般。"
]
normalized = [normalize_text(s) for s in samples]
print(normalized)高级文本清洗
class TextPreprocessor:
"""生产级文本预处理器"""
def __init__(self, language='zh', remove_stopwords=True, min_token_length=1):
self.language = language
self.remove_stopwords = remove_stopwords
self.min_token_length = min_token_length
self.stopwords = {'的', '了', '是', '在', '我', '有', '和', '就', '不', '人',
'都', '一', '一个', '上', '也', '很', '到', '说', '要', '去',
'你', '会', '着', '没有', '看', '好', '自己', '这'} if language == 'zh' else set()
def clean(self, text: str) -> str:
if not text:
return ""
# 去除特殊字符和多余空白
text = re.sub(r'[^\w\s\u4e00-\u9fff]', ' ', text)
text = re.sub(r'\s+', ' ', text).strip()
return text
def tokenize(self, text: str) -> list[str]:
text = self.clean(text)
if self.language == 'zh':
tokens = jieba.lcut(text)
else:
tokens = text.split()
if self.remove_stopwords:
tokens = [t for t in tokens if t not in self.stopwords]
tokens = [t for t in tokens if len(t) >= self.min_token_length]
return tokens
def preprocess(self, text: str) -> str:
return ' '.join(self.tokenize(text))
processor = TextPreprocessor()
print(processor.preprocess("这个产品真的很好用!推荐购买!"))
print(processor.preprocess("Click https://example.com for details"))from PIL import Image
from torchvision import transforms
train_transform = transforms.Compose([
transforms.Resize((256, 256)),
transforms.RandomHorizontalFlip(p=0.5),
transforms.RandomRotation(10),
transforms.ColorJitter(brightness=0.2, contrast=0.2),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
val_transform = transforms.Compose([
transforms.Resize((256, 256)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
image = Image.new("RGB", (512, 512), color="white")
train_tensor = train_transform(image)
print(train_tensor.shape)# 数据增强要区分训练和验证/测试集
# 训练集:可以随机翻转、裁剪、颜色扰动
# 验证/测试集:只做确定性 resize / normalize数据集切分与防泄漏
from sklearn.model_selection import train_test_split
records = pd.DataFrame({
"user_id": [1, 1, 2, 2, 3, 3, 4, 4],
"feature": [10, 11, 12, 14, 9, 8, 15, 16],
"label": [1, 1, 0, 0, 1, 1, 0, 0]
})
# 简单分层切分
train_df, temp_df = train_test_split(records, test_size=0.4, stratify=records["label"], random_state=42)
valid_df, test_df = train_test_split(temp_df, test_size=0.5, stratify=temp_df["label"], random_state=42)
print("train:", len(train_df), "valid:", len(valid_df), "test:", len(test_df))# 按用户分组切分,避免用户级泄漏
from sklearn.model_selection import GroupShuffleSplit
splitter = GroupShuffleSplit(test_size=0.2, n_splits=1, random_state=42)
train_idx, test_idx = next(splitter.split(records, groups=records["user_id"]))
train_group_df = records.iloc[train_idx]
test_group_df = records.iloc[test_idx]
print("train users:", sorted(train_group_df["user_id"].unique()))
print("test users:", sorted(test_group_df["user_id"].unique()))# 时间序列切分(按时间而非随机)
time_df = pd.DataFrame({
"date": pd.date_range("2026-01-01", periods=100),
"value": np.random.randn(100),
"label": np.random.randint(0, 2, 100)
})
# 按时间切分:前70%训练,中间15%验证,后15%测试
n = len(time_df)
train_end = int(n * 0.7)
val_end = int(n * 0.85)
train_time = time_df.iloc[:train_end]
val_time = time_df.iloc[train_end:val_end]
test_time = time_df.iloc[val_end:]
print(f"时间切分: train={len(train_time)}, val={len(val_time)}, test={len(test_time)}")
print("注意:时间序列不能随机打乱,否则会造成未来信息泄漏")# 保存预处理产物与元数据,保证训练/推理一致
import joblib
joblib.dump(preprocessor, "artifacts/preprocessor.joblib")
metadata = {
"label_map": label_map,
"numeric_features": numeric_features,
"categorical_features": categorical_features,
"version": "2026-04-12"
}
print(metadata)数据增强策略
import random
class TextAugmenter:
"""文本数据增强器"""
def __init__(self):
self.synonym_dict = {
"好": ["优秀", "棒", "不错"],
"差": ["糟糕", "不好", "不行"],
"快速": ["迅速", "及时", "高效"],
}
def synonym_replacement(self, text: str, n: int = 1) -> str:
"""同义词替换"""
words = list(jieba.cut(text))
augmented = words.copy()
replaceable = [w for w in words if w in self.synonym_dict]
for _ in range(min(n, len(replaceable))):
word = random.choice(replaceable)
synonym = random.choice(self.synonym_dict[word])
idx = augmented.index(word)
augmented[idx] = synonym
return ''.join(augmented)
def random_deletion(self, text: str, p: float = 0.1) -> str:
"""随机删除词"""
words = list(jieba.cut(text))
if len(words) <= 1:
return text
augmented = [w for w in words if random.random() > p]
return ''.join(augmented) if augmented else random.choice(words)
def random_swap(self, text: str, n: int = 1) -> str:
"""随机交换相邻词"""
words = list(jieba.cut(text))
for _ in range(n):
if len(words) < 2:
break
idx = random.randint(0, len(words) - 2)
words[idx], words[idx + 1] = words[idx + 1], words[idx]
return ''.join(words)
augmenter = TextAugmenter()
text = "这个产品质量很好,发货速度很快"
print(f"原始: {text}")
print(f"同义词替换: {augmenter.synonym_replacement(text)}")
print(f"随机删除: {augmenter.random_deletion(text)}")
print(f"随机交换: {augmenter.random_swap(text)}")数据管线序列化与复现
class DataPipeline:
"""
数据管线:保证训练和推理使用完全一致的处理逻辑
"""
def __init__(self, name, version):
self.name = name
self.version = version
self.steps = []
self.fitted_artifacts = {}
def add_step(self, name, transform_fn):
"""添加处理步骤"""
self.steps.append({"name": name, "fn": transform_fn})
return self
def fit_transform(self, data):
"""拟合并转换(训练阶段)"""
result = data.copy() if hasattr(data, 'copy') else data
for step in self.steps:
result, artifact = step["fn"](result, fit=True)
self.fitted_artifacts[step["name"]] = artifact
return result
def transform(self, data):
"""只转换(推理阶段)"""
result = data.copy() if hasattr(data, 'copy') else data
for step in self.steps:
artifact = self.fitted_artifacts.get(step["name"])
result, _ = step["fn"](result, fit=False, artifact=artifact)
return result
def save(self, path):
"""保存管线配置和拟合产物"""
import json, joblib
config = {"name": self.name, "version": self.version, "steps": [s["name"] for s in self.steps]}
json.dump(config, open(f"{path}/config.json", "w"), ensure_ascii=False, indent=2)
joblib.dump(self.fitted_artifacts, f"{path}/artifacts.joblib")
print("数据管线框架已定义——核心思想:训练和推理必须使用同一个 pipeline 实例")优点
缺点
总结
数据预处理不是辅助步骤,而是 AI 项目最核心的基础设施之一。无论是结构化数据、文本还是图像,只有把清洗、标准化、切分和增强流程做成可复现、可审计、可回放的流水线,模型结果才真正具备可解释性和可落地性。
关键知识点
- 先治理数据质量,再谈模型结构优化。
- 训练/验证/测试切分必须防止样本泄漏。
- 增强策略只应作用于训练集,不应污染评估集。
- 训练与推理必须复用同一套预处理规则。
- 数据管线应该纳入版本管理,与模型版本联动。
项目落地视角
- 推荐系统要重点处理缺失值、异常点击和时间字段。
- NLP 项目要统一文本清洗、分词和标签映射规则。
- 视觉项目要区分训练增强与验证标准化流程。
- 生产上线前必须固化 preprocessing artifact 与版本号。
常见误区
- 数据切分后才做归一化之外的统计,结果把测试集信息泄漏进训练。
- 只看整体准确率,不看脏标签和异常样本比例。
- 训练和推理使用了两套不同清洗逻辑。
- 图像增强太重,生成了脱离真实业务场景的样本。
- 时间序列数据随机打乱,造成未来信息泄漏。
进阶路线
- 使用 Great Expectations / Pandera 做数据质量校验。
- 引入 Feature Store 管理特征一致性。
- 建立数据漂移检测和标签漂移监控。
- 为多模态项目建立统一 preprocessing pipeline。
适用场景
- 机器学习结构化数据建模。
- NLP 文本分类、实体识别、检索任务。
- 计算机视觉分类、检测、分割训练。
- 生产级模型训练与推理服务。
落地建议
- 每个数据集都建立字段说明、标签规范和清洗规则文档。
- 固定 random seed 与 split 方案,保证实验可比。
- 把 preprocessing 也纳入版本管理,而不只是模型权重。
- 为异常值、缺失值、类别爆炸等问题建立自动检查脚本。
排错清单
- 检查是否存在重复样本、异常值和标签映射错误。
- 检查切分方式是否导致用户级、时间级数据泄漏。
- 检查训练和推理是否使用同一套 scaler / encoder / tokenizer。
- 检查增强操作是否改变了标签语义或破坏分布。
复盘问题
- 你现在的模型问题,究竟有多少来自模型本身,又有多少来自数据?
- 数据集切分是否真正反映了线上场景?
- 哪些预处理规则应该固化成流水线,哪些应该保留可实验空间?
- 如果明天重新训练一次,你能否得到一致的输入分布和结果趋势?
