Pandas 数据分析
大约 16 分钟约 4880 字
Pandas 数据分析
简介
Pandas 是 Python 最流行的数据分析库,提供 DataFrame 和 Series 两种核心数据结构,支持数据清洗、转换、聚合和可视化。是数据科学家和分析师的必备工具。
Pandas 的核心设计理念是"关系型数据操作"——它借鉴了 SQL 的表格操作思维,同时融合了 NumPy 的向量计算能力和 R 语言的 data.frame 设计。DataFrame 本质上是一个带有标签轴的二维数组,每一列是一个 Series,拥有统一的 dtype。这种设计使得 Pandas 在处理结构化数据时既灵活又高效。
从工程角度看,Pandas 是数据分析管道的核心环节:上游接收来自 CSV、数据库、API 的原始数据,下游将清洗后的数据输送给机器学习模型、可视化工具或报表系统。理解 Pandas 的内存模型和性能特征,对于构建稳定的数据处理流程至关重要。
特点
DataFrame 基础
创建和查看
import pandas as pd
import numpy as np
# 从字典创建
df = pd.DataFrame({
"name": ["张三", "李四", "王五", "赵六", "钱七"],
"age": [28, 35, 42, 25, 31],
"city": ["深圳", "北京", "上海", "深圳", "广州"],
"salary": [15000, 22000, 18000, 12000, 20000],
"department": ["技术", "产品", "技术", "设计", "技术"]
})
# 基本信息
print(df.head(3)) # 前3行
print(df.shape) # (5, 5)
print(df.dtypes) # 数据类型
print(df.describe()) # 统计摘要
print(df.info()) # 详细信息
# 选择数据
print(df["name"]) # 单列 Series
print(df[["name", "age"]]) # 多列
print(df.iloc[0:2]) # 按位置
print(df.loc[df["age"] > 30]) # 按条件深入理解 DataFrame 内存模型
import pandas as pd
import numpy as np
# DataFrame 的内存占用取决于 dtype
df = pd.DataFrame({
"id": range(100000),
"value": np.random.randn(100000),
"category": np.random.choice(["A", "B", "C"], 100000),
"flag": np.random.choice([True, False], 100000),
})
# 查看内存占用
print(df.memory_usage(deep=True))
print(f"总内存: {df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")
# 优化内存:使用更小的 dtype
df["id"] = df["id"].astype(np.int32) # int64 -> int32 节省一半
df["flag"] = df["flag"].astype(bool) # 确保使用 bool 类型
df["category"] = df["category"].astype("category") # 分类类型大幅节省
print(f"\n优化后总内存: {df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")
# 使用 info(memory_usage="deep") 查看详细内存信息
df.info(memory_usage="deep")创建 DataFrame 的多种方式
import pandas as pd
import numpy as np
# 方式一:从字典创建(最常用)
df1 = pd.DataFrame({
"name": ["Alice", "Bob"],
"age": [25, 30]
})
# 方式二:从列表的列表创建
df2 = pd.DataFrame(
[["Alice", 25], ["Bob", 30]],
columns=["name", "age"]
)
# 方式三:从 NumPy 数组创建
data = np.random.randn(5, 3)
df3 = pd.DataFrame(data, columns=["A", "B", "C"])
# 方式四:从 Series 字典创建
s1 = pd.Series([1, 2, 3], index=["a", "b", "c"])
s2 = pd.Series([4, 5, 6], index=["a", "b", "c"])
df4 = pd.DataFrame({"x": s1, "y": s2})
# 方式五:从字典列表创建(类似 JSON 记录)
records = [
{"name": "Alice", "age": 25, "city": "北京"},
{"name": "Bob", "age": 30, "city": "上海"},
{"name": "Charlie", "age": 35, "city": "深圳"},
]
df5 = pd.DataFrame(records)
# 方式六:从读取器直接创建
import io
csv_data = """name,age,city
Alice,25,北京
Bob,30,上海
Charlie,35,深圳"""
df6 = pd.read_csv(io.StringIO(csv_data))
# 方式七:设置索引
df7 = pd.DataFrame(
{"value": [10, 20, 30]},
index=pd.Index(["row1", "row2", "row3"], name="idx")
)数据筛选
# 条件筛选
tech_staff = df[df["department"] == "技术"]
senior = df[(df["age"] > 30) & (df["salary"] > 15000)]
high_salary = df[df["salary"].isin([15000, 22000])]
# 字符串筛选
name_zhang = df[df["name"].str.startswith("张")]
# 排序
df_sorted = df.sort_values("salary", ascending=False)
df_multi_sort = df.sort_values(["department", "salary"], ascending=[True, False])
# 去重
df_unique = df.drop_duplicates(subset=["department"])
# 添加计算列
df["annual_salary"] = df["salary"] * 12
df["age_group"] = pd.cut(df["age"], bins=[0, 25, 35, 50],
labels=["青年", "中年", "资深"])高级筛选与查询
import pandas as pd
import numpy as np
df = pd.DataFrame({
"name": ["张三", "李四", "王五", "赵六", "钱七"],
"age": [28, 35, 42, 25, 31],
"salary": [15000, 22000, 18000, 12000, 20000],
"department": ["技术", "产品", "技术", "设计", "技术"]
})
# query() 方法 —— 更可读的查询语法
result = df.query("age > 30 and salary > 15000")
print(result)
# 使用变量
min_age = 30
result = df.query("age > @min_age")
print(result)
# between() —— 范围筛选
result = df[df["age"].between(25, 35)]
print(result)
# nlargest / nsmallest —— 取 Top N
top_3 = df.nlargest(3, "salary")
print(top_3)
# sample() —— 随机采样
sampled = df.sample(n=3, random_state=42)
print(sampled)
# where() —— 条件保留(不满足条件的设为 NaN)
df_copy = df.copy()
df_copy["salary"] = df_copy["salary"].where(df_copy["salary"] > 15000, other=15000)
print(df_copy)
# mask() —— 与 where 相反
df_copy["salary"] = df_copy["salary"].mask(df_copy["salary"] > 18000, other=18000)
print(df_copy)loc vs iloc 完全指南
import pandas as pd
df = pd.DataFrame({
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35],
"city": ["北京", "上海", "深圳"]
}, index=["row_a", "row_b", "row_c"])
# loc:基于标签(标签索引)
print(df.loc["row_a"]) # 单行
print(df.loc["row_a", "name"]) # 单个值
print(df.loc[["row_a", "row_c"]]) # 多行
print(df.loc[:, "name"]) # 单列
print(df.loc["row_a":"row_c", "name":"age"]) # 切片(包含两端)
# iloc:基于位置(整数索引)
print(df.iloc[0]) # 第一行
print(df.iloc[0, 1]) # 第一行第二列
print(df.iloc[[0, 2]]) # 第1和第3行
print(df.iloc[:, 0]) # 第一列
print(df.iloc[0:2, 0:2]) # 切片(不包含右端)
# 布尔索引
print(df.loc[df["age"] > 28])
# 设置值
df.loc["row_a", "age"] = 26
df.iloc[1, 1] = 31
# 关键区别:loc 的切片包含两端,iloc 的切片不包含右端数据清洗
缺失值处理
# 创建含缺失值的数据
df = pd.DataFrame({
"A": [1, 2, np.nan, 4, 5],
"B": [10, np.nan, 30, np.nan, 50],
"C": ["x", "y", np.nan, "z", "w"]
})
# 检查缺失值
print(df.isnull().sum()) # 每列缺失数
print(df.isnull().any()) # 哪些列有缺失
# 处理缺失值
df_dropna = df.dropna() # 删除含缺失的行
df_fill = df.fillna({"A": 0, "B": df["B"].mean(), "C": "unknown"})
df_ffill = df.ffill() # 前向填充
# 替换值
df_replaced = df.replace({np.nan: -1})深入缺失值处理策略
import pandas as pd
import numpy as np
# 创建模拟数据
np.random.seed(42)
df = pd.DataFrame({
"user_id": range(1, 101),
"age": np.where(np.random.random(100) > 0.1, np.random.randint(18, 65, 100), np.nan),
"income": np.where(np.random.random(100) > 0.15, np.random.uniform(3000, 50000, 100), np.nan),
"score": np.where(np.random.random(100) > 0.05, np.random.uniform(0, 100, 100), np.nan),
})
# 1. 缺失值诊断报告
def missing_report(df: pd.DataFrame) -> pd.DataFrame:
"""生成缺失值诊断报告"""
report = pd.DataFrame({
"总行数": [len(df)] * len(df.columns),
"缺失数": df.isnull().sum().values,
"缺失比例": (df.isnull().sum() / len(df) * 100).values,
"数据类型": df.dtypes.values,
}, index=df.columns)
return report.sort_values("缺失比例", ascending=False)
print("缺失值诊断报告:")
print(missing_report(df))
# 2. 按列采用不同策略
df_cleaned = df.copy()
# 数值列:用中位数填充(比均值更抗异常值)
df_cleaned["age"] = df_cleaned["age"].fillna(df_cleaned["age"].median())
df_cleaned["income"] = df_cleaned["income"].fillna(df_cleaned["income"].median())
# 分数列:用列均值填充
df_cleaned["score"] = df_cleaned["score"].fillna(df_cleaned["score"].mean())
# 3. 分组填充 —— 用同组的中位数填充
df_grouped_fill = df.copy()
# 假设按年龄段分组填充收入
df_grouped_fill["income"] = df_grouped_fill.groupby(
pd.cut(df_grouped_fill["age"], bins=[0, 25, 35, 45, 100])
)["income"].transform(lambda x: x.fillna(x.median()))
# 4. 插值填充(适用于时间序列)
ts = pd.DataFrame({
"date": pd.date_range("2024-01-01", periods=20),
"value": [10, np.nan, 15, np.nan, np.nan, 20, 22, np.nan, 25, 28,
np.nan, 30, 35, np.nan, np.nan, 40, 42, np.nan, 45, 50]
})
ts["linear"] = ts["value"].interpolate(method="linear")
ts["time"] = ts["value"].interpolate(method="time")
print(ts)
# 5. 阈值删除 —— 缺失超过一定比例的行或列
threshold = 0.3 # 缺失超过 30% 则删除
df_filtered = df.dropna(thresh=int(len(df) * (1 - threshold)), axis=1)
print(f"\n删除高缺失列后: {df_filtered.columns.tolist()}")数据类型转换
# 类型转换
df = pd.DataFrame({
"price": ["12.5", "23.8", "45.0"],
"date": ["2024-01-15", "2024-02-20", "2024-03-10"],
"flag": ["1", "0", "1"]
})
df["price"] = df["price"].astype(float)
df["date"] = pd.to_datetime(df["date"])
df["flag"] = df["flag"].astype(bool)
# 日期操作
df["year"] = df["date"].dt.year
df["month"] = df["date"].dt.month
df["weekday"] = df["date"].dt.day_name()字符串处理
import pandas as pd
df = pd.DataFrame({
"name": [" Alice ", "bob SMITH", "Charlie Brown"],
"email": ["alice@example.com", "bob@test.org", "charlie@example.com"],
"phone": ["138-1234-5678", "139-8765-4321", "invalid"],
"address": ["北京市朝阳区", "上海市浦东新区", "深圳市南山区"]
})
# 基础字符串操作(通过 .str 访问器)
df["name_clean"] = df["name"].str.strip().str.title()
# "Alice", "Bob Smith", "Charlie Brown"
# 字符串匹配
df["is_gmail"] = df["email"].str.contains("example.com")
df["domain"] = df["email"].str.split("@").str.get(1)
# 正则表达式
df["phone_valid"] = df["phone"].str.match(r"\d{3}-\d{4}-\d{4}")
df["phone_clean"] = df["phone"].str.replace(r"[^\d]", "", regex=True)
# 字符串长度
df["name_length"] = df["name_clean"].str.len()
# 条件替换
df["category"] = df["address"].str.extract(r"(北京|上海|深圳)")
# 批量处理
df["email_upper"] = df["email"].str.upper()
df["initial"] = df["name_clean"].str[0]
print(df[["name_clean", "is_gmail", "domain", "phone_valid", "category"]])分组聚合
GroupBy 操作
# 销售数据
sales = pd.DataFrame({
"date": pd.date_range("2024-01-01", periods=100, freq="D"),
"product": np.random.choice(["A", "B", "C"], 100),
"region": np.random.choice(["华东", "华南", "华北"], 100),
"amount": np.random.randint(100, 1000, 100),
"quantity": np.random.randint(1, 20, 100)
})
# 基本分组
by_product = sales.groupby("product")["amount"].sum()
by_region = sales.groupby("region").agg({
"amount": ["sum", "mean", "count"],
"quantity": ["sum", "mean"]
})
# 多级分组
multi_group = sales.groupby(["region", "product"]).agg(
total_sales=("amount", "sum"),
avg_sales=("amount", "mean"),
order_count=("amount", "count")
).round(2)
print(multi_group)
# 透视表
pivot = pd.pivot_table(
sales,
values="amount",
index="region",
columns="product",
aggfunc="sum",
fill_value=0,
margins=True # 汇总行列
)
print(pivot)高级分组技巧
import pandas as pd
import numpy as np
# 创建示例数据
np.random.seed(42)
sales = pd.DataFrame({
"date": pd.date_range("2024-01-01", periods=365),
"product": np.random.choice(["手机", "电脑", "平板", "耳机"], 365),
"region": np.random.choice(["华东", "华南", "华北", "华西"], 365),
"amount": np.random.lognormal(mean=7, sigma=0.5, size=365).round(2),
"quantity": np.random.randint(1, 50, 365),
})
# 1. transform —— 保持原始形状的聚合
# 每个产品在其类别中的占比
sales["category_total"] = sales.groupby("product")["amount"].transform("sum")
sales["category_pct"] = sales["amount"] / sales["category_total"]
print(sales[["product", "amount", "category_pct"]].head(10))
# 2. filter —— 按组条件过滤
# 只保留总销售额超过 50 万的产品
high_sales = sales.groupby("product").filter(lambda x: x["amount"].sum() > 500000)
print(f"高销售额产品: {high_sales['product'].unique()}")
# 3. apply —— 自定义聚合函数
def top_n_per_group(group, n=3):
"""每组取销售额最高的 N 条记录"""
return group.nlargest(n, "amount")
top_records = sales.groupby("product").apply(top_n_per_group, n=3)
print(top_records)
# 4. 多重聚合并重命名
result = sales.groupby("region").agg(
总销售额=("amount", "sum"),
平均销售额=("amount", "mean"),
订单数量=("amount", "count"),
最高单笔=("amount", "max"),
总销量=("quantity", "sum"),
).round(2)
print(result)
# 5. 按时间分组
sales["month"] = sales["date"].dt.to_period("M")
monthly = sales.groupby("month")["amount"].sum()
print(monthly)
# 6. rolling + groupby —— 分组滚动统计
sales["rolling_avg"] = sales.groupby("product")["amount"].transform(
lambda x: x.rolling(window=7, min_periods=1).mean()
)数据合并与连接
Merge(类似 SQL JOIN)
import pandas as pd
# 员工表
employees = pd.DataFrame({
"emp_id": [1, 2, 3, 4, 5],
"name": ["张三", "李四", "王五", "赵六", "钱七"],
"dept_id": [101, 102, 101, 103, 102]
})
# 部门表
departments = pd.DataFrame({
"dept_id": [101, 102, 103, 104],
"dept_name": ["技术部", "产品部", "设计部", "市场部"],
"budget": [500000, 300000, 200000, 400000]
})
# 内连接(只保留匹配的行)
inner = pd.merge(employees, departments, on="dept_id", how="inner")
print("内连接:")
print(inner)
# 左连接(保留左表所有行)
left = pd.merge(employees, departments, on="dept_id", how="left")
print("\n左连接:")
print(left)
# 外连接(保留所有行)
outer = pd.merge(employees, departments, on="dept_id", how="outer")
print("\n外连接:")
print(outer)
# 多键连接
left_df = pd.DataFrame({
"key1": ["A", "B", "C"],
"key2": [1, 2, 1],
"value": [10, 20, 30]
})
right_df = pd.DataFrame({
"key1": ["A", "B", "A"],
"key2": [1, 2, 2],
"score": [100, 200, 300]
})
multi_merge = pd.merge(left_df, right_df, on=["key1", "key2"], how="left")
print("\n多键连接:")
print(multi_merge)Concat 与 Combine
import pandas as pd
# concat:纵向或横向拼接
df1 = pd.DataFrame({"A": [1, 2], "B": [3, 4]})
df2 = pd.DataFrame({"A": [5, 6], "B": [7, 8]})
# 纵向拼接
vertical = pd.concat([df1, df2], axis=0, ignore_index=True)
print("纵向拼接:")
print(vertical)
# 横向拼接
df3 = pd.DataFrame({"C": [9, 10], "D": [11, 12]})
horizontal = pd.concat([df1, df3], axis=1)
print("\n横向拼接:")
print(horizontal)
# 带标签的拼接
combined = pd.concat([df1, df2], keys=["表1", "表2"])
print("\n带标签:")
print(combined)
print(combined.loc["表1"])
# combine_first —— 用另一个 DataFrame 填充缺失值
df_a = pd.DataFrame({"A": [1, np.nan, 3], "B": [4, 5, np.nan]})
df_b = pd.DataFrame({"A": [10, 20, 30], "B": [40, 50, 60]})
filled = df_a.combine_first(df_b)
print("\ncombine_first:")
print(filled)
# A 列: [1, 20, 3] (NaN 被 df_b 的值填充)
# B 列: [4, 5, 60]数据 IO
读写文件
# CSV
df.to_csv("output.csv", index=False, encoding="utf-8-sig")
df = pd.read_csv("data.csv", encoding="utf-8")
# Excel(需要 openpyxl)
df.to_excel("output.xlsx", sheet_name="Sheet1", index=False)
df = pd.read_excel("data.xlsx", sheet_name="Sheet1")
# SQL(需要 sqlalchemy)
from sqlalchemy import create_engine
engine = create_engine("mssql+pyodbc://server/database?driver=ODBC+Driver+17+for+SQL+Server")
# df.to_sql("users", engine, if_exists="replace", index=False)
# df = pd.read_sql("SELECT * FROM users WHERE age > 25", engine)
# JSON
df.to_json("output.json", orient="records", force_ascii=False)
df = pd.read_json("data.json")
# Parquet(高性能列式存储)
df.to_parquet("output.parquet")
df = pd.read_parquet("data.parquet")高级 IO 技巧
import pandas as pd
import numpy as np
# 1. 分块读取大文件(避免内存溢出)
chunk_iter = pd.read_csv("large_file.csv", chunksize=10000)
results = []
for chunk in chunk_iter:
# 对每个分块进行处理
filtered = chunk[chunk["amount"] > 100]
results.append(filtered)
combined = pd.concat(results, ignore_index=True)
print(f"处理完成,共 {len(combined)} 条记录")
# 2. 指定列读取(减少内存使用)
df = pd.read_csv("data.csv", usecols=["name", "age", "salary"])
print(df.columns.tolist())
# 3. 指定数据类型(避免自动推断的开销)
dtypes = {
"id": np.int32,
"name": "category",
"amount": np.float32,
"flag": bool,
}
df = pd.read_csv("data.csv", dtype=dtypes)
print(df.dtypes)
# 4. 解析日期列(读取时直接转换)
df = pd.read_csv("data.csv", parse_dates=["date", "created_at"])
print(df.dtypes)
# 5. 处理 CSV 编码问题
# utf-8-sig 处理带 BOM 的文件
# latin1 作为 fallback
try:
df = pd.read_csv("data.csv", encoding="utf-8")
except UnicodeDecodeError:
df = pd.read_csv("data.csv", encoding="latin1")
# 6. Parquet 分区存储
df = pd.DataFrame({
"year": [2023, 2023, 2024, 2024],
"month": [1, 2, 1, 2],
"value": [10, 20, 30, 40]
})
df.to_parquet("partitioned_data", partition_cols=["year"])
# 7. Excel 多 Sheet 操作
with pd.ExcelWriter("multi_sheet.xlsx") as writer:
df1.to_excel(writer, sheet_name="摘要", index=False)
df2.to_excel(writer, sheet_name="明细", index=False)
# 读取指定 Sheet
sheet1 = pd.read_excel("multi_sheet.xlsx", sheet_name="摘要")
all_sheets = pd.read_excel("multi_sheet.xlsx", sheet_name=None) # 字典形式时间序列处理
import pandas as pd
import numpy as np
# 创建时间序列
dates = pd.date_range("2024-01-01", periods=365, freq="D")
ts = pd.Series(np.random.randn(365).cumsum(), index=dates)
# 重采样(改变频率)
monthly = ts.resample("M").mean() # 月均值
weekly = ts.resample("W").sum() # 周总和
quarterly = ts.resample("QE").ohlc() # 季度 OHLC
# 移动窗口
rolling_mean = ts.rolling(window=7).mean() # 7日移动平均
rolling_std = ts.rolling(window=7).std() # 7日移动标准差
exp_mean = ts.ewm(span=7).mean() # 指数加权移动平均
# 时间偏移
ts_shifted = ts.shift(1) # 前移1天
ts_diff = ts.diff(1) # 一阶差分
ts_pct = ts.pct_change(1) # 百分比变化
# 时间筛选
mask = (ts.index >= "2024-03-01") & (ts.index <= "2024-03-31")
march_data = ts[mask]
# 时区处理
ts_utc = ts.tz_localize("UTC")
ts_beijing = ts_utc.tz_convert("Asia/Shanghai")
print(f"UTC 时间: {ts_utc.index[0]}")
print(f"北京时间: {ts_beijing.index[0]}")性能优化
避免循环,使用向量化
import pandas as pd
import numpy as np
df = pd.DataFrame({
"price": np.random.uniform(1, 100, 100000),
"quantity": np.random.randint(1, 20, 100000),
"category": np.random.choice(["A", "B", "C", "D"], 100000),
})
# 慢:逐行循环
# def calc_total(row):
# if row["category"] == "A":
# return row["price"] * row["quantity"] * 0.9
# return row["price"] * row["quantity"]
# df["total"] = df.apply(calc_total, axis=1)
# 快:向量化操作
discount_map = {"A": 0.9, "B": 0.95, "C": 1.0, "D": 1.0}
df["total"] = df["price"] * df["quantity"] * df["category"].map(discount_map)
# eval() —— 复杂表达式的优化
df.eval("total_v2 = price * quantity * 1.1", inplace=True)
# query() —— 快速筛选
result = df.query("price > 50 and quantity > 10")使用 category 类型优化
import pandas as pd
df = pd.DataFrame({
"city": ["北京"] * 50000 + ["上海"] * 50000 + ["深圳"] * 30000,
"product": ["A", "B", "C", "D", "E"] * 26000,
"value": range(130000)
})
# 优化前
print(f"优化前内存: {df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")
# 优化后
df["city"] = df["city"].astype("category")
df["product"] = df["product"].astype("category")
print(f"优化后内存: {df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")
# category 类型不会降低计算速度,反而可能提升分组聚合速度
result = df.groupby("city")["value"].agg(["sum", "mean", "count"])实用模式
模式一:数据清洗管道
import pandas as pd
import numpy as np
def clean_data_pipeline(
df: pd.DataFrame,
drop_threshold: float = 0.5,
dedup_cols: list | None = None
) -> pd.DataFrame:
"""通用数据清洗管道
Args:
df: 原始数据
drop_threshold: 列缺失比例超过此值则删除
dedup_cols: 去重的列名列表
Returns:
清洗后的 DataFrame
"""
print(f"原始数据: {df.shape}")
# 1. 删除高缺失列
missing_pct = df.isnull().sum() / len(df)
cols_to_drop = missing_pct[missing_pct > drop_threshold].index.tolist()
if cols_to_drop:
df = df.drop(columns=cols_to_drop)
print(f"删除高缺失列: {cols_to_drop}")
# 2. 去重
if dedup_cols:
before = len(df)
df = df.drop_duplicates(subset=dedup_cols)
print(f"去重: {before} -> {len(df)}")
# 3. 去除全为空值的行
df = df.dropna(how="all")
# 4. 字符串列统一清理
str_cols = df.select_dtypes(include=["object"]).columns
for col in str_cols:
df[col] = df[col].str.strip()
# 5. 数值列填充中位数
num_cols = df.select_dtypes(include=[np.number]).columns
for col in num_cols:
median = df[col].median()
if pd.isna(median):
median = 0
df[col] = df[col].fillna(median)
print(f"清洗后数据: {df.shape}")
return df.reset_index(drop=True)
# 使用示例
raw = pd.DataFrame({
"id": [1, 2, 2, 3, 4],
"name": [" Alice", "Bob", "Bob", "Charlie", " David"],
"score": [85, np.nan, np.nan, 92, 78],
"empty_col": [np.nan] * 5,
})
cleaned = clean_data_pipeline(raw, dedup_cols=["id"])模式二:多表关联分析
import pandas as pd
import numpy as np
# 模拟业务数据
orders = pd.DataFrame({
"order_id": range(1001, 1101),
"customer_id": np.random.randint(1, 21, 100),
"product_id": np.random.randint(1, 11, 100),
"amount": np.random.lognormal(6, 0.8, 100).round(2),
"order_date": pd.date_range("2024-01-01", periods=100, freq="D"),
})
customers = pd.DataFrame({
"customer_id": range(1, 21),
"name": [f"客户{i}" for i in range(1, 21)],
"city": np.random.choice(["北京", "上海", "深圳", "广州"], 20),
"segment": np.random.choice(["VIP", "普通", "新客"], 20),
})
products = pd.DataFrame({
"product_id": range(1, 11),
"product_name": [f"产品{i}" for i in range(1, 11)],
"category": np.random.choice(["电子产品", "家居", "食品"], 10),
})
# 三表关联
full_data = orders.merge(customers, on="customer_id", how="left")
full_data = full_data.merge(products, on="product_id", how="left")
# 客户分析
customer_analysis = full_data.groupby(["segment", "city"]).agg(
客户数=("customer_id", "nunique"),
总消费=("amount", "sum"),
平均客单价=("amount", "mean"),
订单数=("order_id", "count"),
).round(2)
print("客户分群分析:")
print(customer_analysis)
# 产品分析
product_analysis = full_data.groupby("category").agg(
总销售额=("amount", "sum"),
订单量=("order_id", "count"),
热门产品=("product_name", lambda x: x.mode().iloc[0] if len(x.mode()) > 0 else "N/A"),
).round(2)
print("\n产品品类分析:")
print(product_analysis)优点
缺点
总结
Pandas 核心数据结构:DataFrame(二维表格)和 Series(一维数组)。数据筛选用布尔索引和 query。清洗用 dropna/fillna/replace。聚合用 groupby + agg。透视表用 pivot_table。IO 支持 CSV/Excel/SQL/JSON。大数据场景(>1GB)考虑使用 Polars 替代。实际分析流程:读取数据 -> 查看信息 -> 清洗处理 -> 分组聚合 -> 导出结果。
关键知识点
- loc 基于标签索引(包含切片右端),iloc 基于位置索引(不包含切片右端)
- category 类型可以大幅降低字符串列的内存占用
- SettingWithCopyWarning 是因为对视图做了修改,用 .copy() 解决
- groupby 后的 transform 保持原始形状,agg 改变形状
- 分块读取大文件用 read_csv(chunksize=N)
项目落地视角
- 对 DataFrame 操作函数编写单元测试,验证列名、类型和空值处理
- 大数据处理使用分块读取或 Dask/Polars 替代
- 数据清洗管道模块化,便于复用和测试
- 统一项目中的编码(utf-8)、日期格式和时区处理
常见误区
- 把临时脚本直接当生产代码使用。
- 忽略依赖版本、编码、路径和时区差异。
- 只会写 happy path,没有补超时、重试和资源释放。
- 把 notebook 或脚本风格直接带入长期维护项目。
- 使用链式索引 df[df["x"] > 0]["y"] = 1 导致 SettingWithCopyWarning
- 在循环中使用 df.append()(已废弃),应使用 pd.concat()
- 忽视 DataFrame 的内存占用,处理大数据时未使用分块或优化 dtype
进阶路线
- 学习 Polars 作为高性能 Pandas 替代品
- 掌握 Dask 实现分布式 DataFrame 计算
- 深入理解 Pandas 内部的 BlockManager 内存管理
- 学习 Modin/Ray 实现多核并行 Pandas
适用场景
- 当你准备把《Pandas 数据分析》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合脚本自动化、数据处理、Web 开发和测试工具建设。
- 当需求强调快速迭代和丰富生态时,Python 往往能快速起步。
- 数据清洗、报表生成、特征工程、数据探索性分析等场景。
落地建议
- 统一使用虚拟环境与依赖锁定,避免环境漂移。
- 对核心函数补类型注解、异常处理和日志,减少"脚本黑盒"。
- 一旦脚本进入生产链路,及时补测试和监控。
- 数据清洗逻辑封装为可测试的函数,避免内联的复杂链式操作。
排错清单
- 先确认当前解释器、虚拟环境和依赖版本是否正确。
- 检查编码、路径、时区和第三方库行为差异。
- 排查同步阻塞、数据库连接未释放或网络请求无超时。
- 检查是否有 SettingWithCopyWarning,用 .copy() 或 .loc[] 修复
- 确认日期解析是否正确,时区是否统一
复盘问题
- 如果把《Pandas 数据分析》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《Pandas 数据分析》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《Pandas 数据分析》最大的收益和代价分别是什么?
