Data Warehouse Fundamentals
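Introduction
A data warehouse is built for analytical queries, not for high-concurrency transactional writes. It uses subject-area modeling, fact and dimension tables, ETL/ELT pipelines and layered storage. Together these turn data scattered across business systems into analytical data assets that can be aggregated, traced back to their sources, and revisited later.
The core idea is "separate the data from the business systems and reorganize it around analytical needs." A warehouse does not aim for real-time freshness or high write concurrency. It aims to run complex multidimensional analytical queries quickly over very large data volumes. A well-designed warehouse lets business analysts, managers and data scientists do their own analysis, instead of asking the development team for an ad hoc query every time.
Characteristics
Core Concepts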
OLTP vs OLAP
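OLTP (Online Transaction Processing)     OLAP (Online Analytical Processing)
──────────────────────────────────────   ──────────────────────────────────────
Operation-oriented                       Analysis-oriented
High-concurrency reads and writes        High-throughput queries
Normalized to 3NF                        Often moderately denormalized
Each operation touches few rows          Aggregates over large data volumes
Millisecond response times               Response times of seconds to minutes
MySQL, PostgreSQL, SQL Server            ClickHouse, Snowflake, BigQuery
Data Warehouse Layered Architecture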
┌──────────────────────────────────────────────────────┐
│ ADS (Application Data Store) - application layer     │ → queried directly by reports, dashboards, and APIs
│   ads_sales_monthly_trend, ads_top10_products        │
├──────────────────────────────────────────────────────┤
│ DWS (Data Warehouse Summary) - summary layer         │ → wide tables aggregated by subject
│   dws_sales_daily, dws_user_active_monthly           │
├──────────────────────────────────────────────────────┤
│ DWD (Data Warehouse Detail) - detail layer           │ → cleaned, standardized detail data
│   dwd_orders, dwd_payments, dwd_shipments            │
├──────────────────────────────────────────────────────┤
│ ODS (Operational Data Store) - source-aligned layer  │ → raw copy of the business data
│   ods_orders, ods_products, ods_users                │
└──────────────────────────────────────────────────────┘
                           ↑ ETL/ELT extraction from business systems
            ┌──────────────────────────────────┐
            │ Business systems (OLTP)          │
            │ order DB, user DB, inventory DB  │
            └──────────────────────────────────┘
Implementation
Star Schema and Dimensional Modeling
Dimension Table Design
-- Dimension table: date (pre-populated with 5 years of history and 10 future years)
CREATE TABLE dim_date (
date_key INT PRIMARY KEY, -- format YYYYMMDD, e.g. 20240315
full_date DATE NOT NULL,
year_num INT NOT NULL, -- 2024
quarter_num INT NOT NULL, -- 1-4
month_num INT NOT NULL, -- 1-12
month_name VARCHAR(20) NOT NULL, -- 'March'
month_name_cn VARCHAR(10) NOT NULL, -- '三月'
week_of_year INT NOT NULL, -- 1-53 (ISO weeks; some years have 53)
day_of_month INT NOT NULL, -- 1-31
day_of_week INT NOT NULL, -- 1-7 (1=Monday)
is_weekend BOOLEAN NOT NULL,
is_holiday BOOLEAN NOT NULL DEFAULT FALSE,
fiscal_year INT NOT NULL, -- fiscal year
fiscal_quarter INT NOT NULL -- fiscal quarter
);
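The date dimension is pre-populated rather than loaded from a source system. Below is a minimal Python sketch of generating its rows. It makes three assumptions:

- The fiscal year starts on April 1. This is a hypothetical choice; adjust `FISCAL_START_MONTH` to your calendar.
- Weeks follow ISO 8601, which is why `week_of_year` can reach 53.
- `is_holiday` is filled in later from a holiday calendar.

The helper names are illustrative.

```python
from datetime import date, timedelta

# Hypothetical fiscal calendar: the fiscal year starts on April 1 and is named
# after the calendar year in which it starts. Adjust to your organization.
FISCAL_START_MONTH = 4

MONTH_NAMES = ["January", "February", "March", "April", "May", "June", "July",
               "August", "September", "October", "November", "December"]
MONTH_NAMES_CN = ["一月", "二月", "三月", "四月", "五月", "六月",
                  "七月", "八月", "九月", "十月", "十一月", "十二月"]


def dim_date_row(d: date) -> dict:
    """Build one dim_date row for calendar day d."""
    _, iso_week, iso_weekday = d.isocalendar()
    # Fiscal month 1 is FISCAL_START_MONTH; wrap around the calendar year.
    fiscal_month = (d.month - FISCAL_START_MONTH) % 12 + 1
    return {
        "date_key": d.year * 10000 + d.month * 100 + d.day,  # YYYYMMDD, e.g. 20240315
        "full_date": d,
        "year_num": d.year,
        "quarter_num": (d.month - 1) // 3 + 1,
        "month_num": d.month,
        "month_name": MONTH_NAMES[d.month - 1],
        "month_name_cn": MONTH_NAMES_CN[d.month - 1],
        "week_of_year": iso_week,        # ISO week number, 1-53
        "day_of_month": d.day,
        "day_of_week": iso_weekday,      # 1 = Monday ... 7 = Sunday
        "is_weekend": iso_weekday >= 6,
        "is_holiday": False,             # to be filled from a holiday calendar
        "fiscal_year": d.year if d.month >= FISCAL_START_MONTH else d.year - 1,
        "fiscal_quarter": (fiscal_month - 1) // 3 + 1,
    }


def dim_date_rows(start: date, end: date):
    """Yield one row per day in the inclusive range [start, end]."""
    d = start
    while d <= end:
        yield dim_date_row(d)
        d += timedelta(days=1)
```

The rows can then be bulk-inserted once, for example with a driver's `executemany`, and extended each year.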
-- Dimension table: product (supports SCD Type 2)
CREATE TABLE dim_product (
product_key BIGSERIAL PRIMARY KEY,
product_id BIGINT NOT NULL, -- product ID in the business system (natural key)
product_name VARCHAR(100) NOT NULL,
category_l1 VARCHAR(50) NOT NULL, -- level-1 category
category_l2 VARCHAR(50), -- level-2 category
brand_name VARCHAR(50),
supplier_name VARCHAR(100),
price NUMERIC(18,2),
cost NUMERIC(18,2),
status VARCHAR(20) NOT NULL, -- 'active', 'inactive', 'discontinued'
effective_from TIMESTAMP NOT NULL,
effective_to TIMESTAMP, -- NULL means this is the current version
is_current BOOLEAN NOT NULL DEFAULT TRUE,
UNIQUE (product_id, effective_from)
);
-- Dimension table: customer (supports SCD Type 2)
CREATE TABLE dim_customer (
customer_key BIGSERIAL PRIMARY KEY,
customer_id BIGINT NOT NULL,
customer_name VARCHAR(100) NOT NULL,
gender VARCHAR(10),
age_group VARCHAR(20), -- '18-24', '25-34', '35-44'...
city VARCHAR(50),
province VARCHAR(50),
registration_date DATE,
member_level VARCHAR(20), -- 'bronze', 'silver', 'gold', 'platinum'
effective_from TIMESTAMP NOT NULL,
effective_to TIMESTAMP,
is_current BOOLEAN NOT NULL DEFAULT TRUE,
UNIQUE (customer_id, effective_from)
);
-- Dimension table: store
CREATE TABLE dim_store (
store_key BIGSERIAL PRIMARY KEY,
store_id BIGINT NOT NULL,
store_name VARCHAR(100) NOT NULL,
region VARCHAR(50), -- e.g. '华东' (East China), '华南' (South China), '华北' (North China)
city VARCHAR(50),
store_type VARCHAR(20), -- 'flagship', 'standard', 'online'
opening_date DATE,
is_current BOOLEAN NOT NULL DEFAULT TRUE
);
Fact Table Design
-- Fact table: sales (grain: one row per product line of each order)
CREATE TABLE fact_sales (
sales_key BIGSERIAL PRIMARY KEY,
date_key INT NOT NULL, -- references dim_date
product_key BIGINT NOT NULL, -- references dim_product
customer_key BIGINT NOT NULL, -- references dim_customer
store_key BIGINT NOT NULL, -- references dim_store
order_id BIGINT NOT NULL, -- business order number
line_number INT NOT NULL, -- order line number
quantity INT NOT NULL,
unit_price NUMERIC(18,2) NOT NULL,
amount NUMERIC(18,2) NOT NULL, -- quantity * unit_price
discount_amount NUMERIC(18,2) NOT NULL DEFAULT 0,
cost_amount NUMERIC(18,2), -- cost amount (for profit analysis); profit_amount is NULL when this is NULL
profit_amount NUMERIC(18,2) GENERATED ALWAYS AS
(amount - discount_amount - cost_amount) STORED,
is_returned BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMP NOT NULL,
FOREIGN KEY (date_key) REFERENCES dim_date(date_key),
FOREIGN KEY (product_key) REFERENCES dim_product(product_key),
FOREIGN KEY (customer_key) REFERENCES dim_customer(customer_key),
FOREIGN KEY (store_key) REFERENCES dim_store(store_key)
);
-- Indexes on the fact table
CREATE INDEX idx_fact_sales_date ON fact_sales(date_key);
CREATE INDEX idx_fact_sales_product ON fact_sales(product_key);
CREATE INDEX idx_fact_sales_customer ON fact_sales(customer_key);
CREATE INDEX idx_fact_sales_store ON fact_sales(store_key);
CREATE INDEX idx_fact_sales_dt_amount ON fact_sales(date_key, amount);
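`fact_sales` stores surrogate keys. The fact load therefore has to translate each business `product_id`, together with the sale timestamp, into the `dim_product` version that was valid at that moment. Below is a minimal sketch of that lookup. It assumes the half-open validity interval [effective_from, effective_to) implied by the SCD Type 2 columns above. `ProductVersion` and `lookup_product_key` are illustrative names. In SQL the same predicate would normally sit in the join condition of the fact load.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional


@dataclass
class ProductVersion:
    product_key: int                  # surrogate key stored in fact_sales
    product_id: int                   # business key from the source system
    effective_from: datetime
    effective_to: Optional[datetime]  # None means the version is still current


def lookup_product_key(versions: Iterable[ProductVersion], product_id: int,
                       event_time: datetime) -> Optional[int]:
    """Return the surrogate key whose [effective_from, effective_to) covers event_time."""
    for v in versions:
        if (v.product_id == product_id
                and v.effective_from <= event_time
                and (v.effective_to is None or event_time < v.effective_to)):
            return v.product_key
    # Unknown or late-arriving product: route the fact row to error handling
    # (or to an "unknown" dimension member) instead of guessing a key.
    return None
```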
-- Fact table: inventory snapshot (daily stock level per product and store)
CREATE TABLE fact_inventory_snapshot (
snapshot_key BIGSERIAL PRIMARY KEY,
date_key INT NOT NULL,
product_key BIGINT NOT NULL,
store_key BIGINT NOT NULL,
quantity_on_hand INT NOT NULL,
quantity_reserved INT NOT NULL DEFAULT 0,
quantity_available INT GENERATED ALWAYS AS
(quantity_on_hand - quantity_reserved) STORED,
unit_cost NUMERIC(18,2),
total_value NUMERIC(18,2) GENERATED ALWAYS AS
(quantity_on_hand * unit_cost) STORED,
FOREIGN KEY (date_key) REFERENCES dim_date(date_key),
FOREIGN KEY (product_key) REFERENCES dim_product(product_key),
FOREIGN KEY (store_key) REFERENCES dim_store(store_key)
);
Star Schema Query Examples
-- Net revenue, quantity, and profit by month and level-1 category
SELECT
d.year_num,
d.month_num,
d.month_name_cn,
p.category_l1,
SUM(f.amount - f.discount_amount) AS net_revenue,
SUM(f.quantity) AS total_qty,
SUM(f.profit_amount) AS total_profit,
ROUND(SUM(f.profit_amount) / NULLIF(SUM(f.amount - f.discount_amount), 0) * 100, 2)
AS profit_margin_pct
FROM fact_sales f
JOIN dim_date d ON f.date_key = d.date_key
JOIN dim_product p ON f.product_key = p.product_key
WHERE f.is_returned = FALSE
AND d.year_num = 2024
GROUP BY d.year_num, d.month_num, d.month_name_cn, p.category_l1
ORDER BY d.month_num, net_revenue DESC;
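To see how the star join above behaves on concrete rows, here is a small self-contained reproduction using Python's built-in `sqlite3` module. It is a sketch, not the production schema:

- Only the columns the query needs are created.
- REAL stands in for NUMERIC, and 0/1 stands in for BOOLEAN.
- The sample rows are made up for illustration.

```python
import sqlite3


def monthly_category_revenue(conn: sqlite3.Connection):
    """Star join modeled on the query above, in the SQLite dialect."""
    return conn.execute("""
        SELECT d.year_num, d.month_num, p.category_l1,
               SUM(f.amount - f.discount_amount) AS net_revenue,
               SUM(f.quantity) AS total_qty
        FROM fact_sales f
        JOIN dim_date d ON f.date_key = d.date_key
        JOIN dim_product p ON f.product_key = p.product_key
        WHERE f.is_returned = 0 AND d.year_num = 2024
        GROUP BY d.year_num, d.month_num, p.category_l1
        ORDER BY d.month_num, net_revenue DESC
    """).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_date (date_key INTEGER PRIMARY KEY, year_num INT, month_num INT);
    CREATE TABLE dim_product (product_key INTEGER PRIMARY KEY, category_l1 TEXT);
    CREATE TABLE fact_sales (date_key INT, product_key INT, quantity INT,
                             amount REAL, discount_amount REAL, is_returned INT);
    INSERT INTO dim_date VALUES (20240115, 2024, 1), (20240210, 2024, 2);
    INSERT INTO dim_product VALUES (1, 'headphones'), (2, 'speakers');
    INSERT INTO fact_sales VALUES
        (20240115, 1, 2, 200.0, 20.0, 0),
        (20240115, 2, 1, 500.0,  0.0, 0),
        (20240115, 1, 1, 100.0,  0.0, 1),  -- returned line, filtered out
        (20240210, 1, 3, 300.0, 30.0, 0);
""")
```

Calling `monthly_category_revenue(conn)` returns one row per (month, category). The returned line is excluded by the `is_returned` filter.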
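-- Spending by customer membership level
-- Each fact row points to the customer version valid at the time of sale, so this groups
-- by the level at purchase time. Filtering on c.is_current = TRUE would silently drop
-- sales recorded against earlier versions of the same customer.
SELECT
c.member_level,
COUNT(DISTINCT c.customer_id) AS customer_count, -- customers, not SCD versions
COUNT(*) AS order_lines,
SUM(f.amount) AS total_revenue,
ROUND(SUM(f.amount) / NULLIF(COUNT(DISTINCT f.order_id), 0), 2) AS avg_order_value,
MAX(f.amount) AS max_line_amount -- fact rows are order lines, not whole orders
FROM fact_sales f
JOIN dim_customer c ON f.customer_key = c.customer_key
WHERE f.is_returned = FALSE
GROUP BY c.member_level
ORDER BY total_revenue DESC;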
-- Sales by region and store type
SELECT
s.region,
s.store_type,
COUNT(DISTINCT f.store_key) AS store_count,
SUM(f.amount) AS total_revenue,
SUM(f.quantity) AS total_qty,
SUM(f.profit_amount) AS total_profit
FROM fact_sales f
JOIN dim_store s ON f.store_key = s.store_key
WHERE f.is_returned = FALSE -- no is_current filter: keeps 2024 sales of stores that have since closed
AND f.date_key BETWEEN 20240101 AND 20241231
GROUP BY s.region, s.store_type
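ORDER BY s.region, total_revenue DESC;
Schema Comparison
Star schema
- The fact table sits at the center, with the dimension tables arranged around it.
- Dimension tables are flat (denormalized).
- Pros: simple queries (each dimension is a single join away from the fact table), BI-tool friendly, good performance.
- Cons: dimension tables may carry redundant attributes.
- Best for: most warehouse workloads, especially BI reporting.
Snowflake schema
- Dimensions are further split along their hierarchies, e.g. category, brand, and region each get their own table.
- Dimension tables are normalized (3NF).
- Pros: higher normalization, less redundancy, more flexible dimension maintenance.
- Cons: more joins (often 3 to 5 per query), more complex modeling and queries.
- Best for: deep dimension hierarchies that change frequently.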
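A small `sqlite3` sketch makes the snowflake trade-off concrete. Here the category is split into its own table; `dim_category` and `dim_product_sf` are hypothetical names. Reaching `category_l1` from the fact table now takes two joins instead of one, but renaming a category touches a single row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Snowflaked product dimension: the category level has its own table.
    CREATE TABLE dim_category (category_key INTEGER PRIMARY KEY, category_l1 TEXT);
    CREATE TABLE dim_product_sf (product_key INTEGER PRIMARY KEY,
                                 product_name TEXT, category_key INT);
    CREATE TABLE fact_sales (product_key INT, amount REAL);
    INSERT INTO dim_category VALUES (10, 'audio'), (20, 'wearables');
    INSERT INTO dim_product_sf VALUES (1, 'headphones', 10), (2, 'speaker', 10), (3, 'watch', 20);
    INSERT INTO fact_sales VALUES (1, 100.0), (2, 50.0), (3, 80.0);
""")

# fact -> product -> category: two joins to reach one hierarchy attribute
# (a star schema would store category_l1 directly on the product row).
REVENUE_BY_CATEGORY = """
    SELECT c.category_l1, SUM(f.amount) AS revenue
    FROM fact_sales f
    JOIN dim_product_sf p ON f.product_key = p.product_key
    JOIN dim_category c ON p.category_key = c.category_key
    GROUP BY c.category_l1
    ORDER BY c.category_l1
"""
before = conn.execute(REVENUE_BY_CATEGORY).fetchall()

# Renaming a category updates one row, however many products belong to it.
conn.execute("UPDATE dim_category SET category_l1 = 'sound' WHERE category_key = 10")
after = conn.execute(REVENUE_BY_CATEGORY).fetchall()
```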
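Fact constellation (Constellation Schema / Fact Constellation)
- Multiple fact tables share dimension tables.
- Pros: supports unified analysis across fact tables.
- Cons: a more complex model that needs more joins.
- Best for: large enterprise warehouses where several business subject areas must be analyzed together.
ETL / ELT and Warehouse Layers
ODS Layer (Source-Aligned Layer)
-- ODS layer: keeps a raw copy of the business data without modifying it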
CREATE TABLE ods_orders (
order_id BIGINT,
customer_id BIGINT,
product_id BIGINT,
order_status VARCHAR(20),
order_amount NUMERIC(18,2),
discount_amount NUMERIC(18,2),
payment_method VARCHAR(30),
created_at TIMESTAMP,
updated_at TIMESTAMP,
-- ETL metadata
etl_batch_id VARCHAR(50) NOT NULL, -- batch ID
etl_loaded_at TIMESTAMP DEFAULT NOW(), -- load time
etl_source VARCHAR(50) NOT NULL -- source system
);
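-- ODS incremental loading strategies
-- Option 1: timestamp-based incremental extraction
-- (@batch_id and @last_extract_time are placeholders filled in by the scheduler;
-- the exact parameter syntax depends on the engine and driver)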
INSERT INTO ods_orders (
order_id, customer_id, product_id, order_status,
order_amount, discount_amount, payment_method,
created_at, updated_at, etl_batch_id, etl_source
)
SELECT
order_id, customer_id, product_id, order_status,
order_amount, discount_amount, payment_method,
created_at, updated_at, @batch_id, 'order_db'
FROM source_orders
WHERE updated_at > @last_extract_time;
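The timestamp-based option can be sketched end to end with `sqlite3`. The watermark is read from the warehouse side (the ODS table), not from the source, and each batch is committed atomically. This is a simplified sketch with hypothetical, trimmed-down tables.

```python
import sqlite3


def extract_increment(src: sqlite3.Connection, dw: sqlite3.Connection, batch_id: str) -> int:
    """Append rows changed since the last load from source_orders to ods_orders."""
    # The watermark is read from the warehouse side, not from the source system.
    (watermark,) = dw.execute(
        "SELECT COALESCE(MAX(updated_at), '1900-01-01 00:00:00') FROM ods_orders"
    ).fetchone()
    rows = src.execute(
        "SELECT order_id, order_status, updated_at FROM source_orders "
        "WHERE updated_at > ? ORDER BY updated_at",
        (watermark,),
    ).fetchall()
    with dw:  # the whole batch is committed together, or rolled back on error
        dw.executemany(
            "INSERT INTO ods_orders (order_id, order_status, updated_at, etl_batch_id, etl_source) "
            "VALUES (?, ?, ?, ?, 'order_db')",
            [(*row, batch_id) for row in rows],
        )
    return len(rows)
```

Timestamps are stored as ISO-8601 text, so string comparison orders them correctly. An order updated after a previous run is appended again as a new ODS row, which is why the DWD load has to pick the latest version per order. A strict `>` comparison can miss rows whose `updated_at` equals the watermark but that are committed after the extraction ran. For that reason, some pipelines re-read a small overlap window and deduplicate downstream.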
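-- Option 2: full reload (suitable for small tables)
-- List the columns explicitly: ods_orders has extra ETL metadata columns, and
-- etl_batch_id / etl_source are NOT NULL, so SELECT * would not line up
TRUNCATE TABLE ods_orders;
INSERT INTO ods_orders (
order_id, customer_id, product_id, order_status,
order_amount, discount_amount, payment_method,
created_at, updated_at, etl_batch_id, etl_source
)
SELECT
order_id, customer_id, product_id, order_status,
order_amount, discount_amount, payment_method,
created_at, updated_at, @batch_id, 'order_db'
FROM source_orders;
DWD Layer (Detail Layer)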
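DWD tables are typically range-partitioned by date, as in the DDL that follows. The monthly partitions are usually created ahead of time by a scheduled job rather than by hand. Below is a minimal sketch of generating that DDL in Python. The function names are hypothetical, and the parent table name is interpolated directly into SQL, so it must come from trusted configuration.

```python
from datetime import date


def monthly_partition_ddl(parent: str, year: int, month: int) -> str:
    """Return DDL for one monthly range partition, e.g. dwd_orders_202403."""
    start = date(year, month, 1)
    # The upper bound is exclusive, so it is the first day of the next month.
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return (
        f"CREATE TABLE IF NOT EXISTS {parent}_{start:%Y%m} PARTITION OF {parent}\n"
        f"    FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
    )


def partitions_for_year(parent: str, year: int) -> list[str]:
    """DDL statements for all twelve monthly partitions of one year."""
    return [monthly_partition_ddl(parent, year, m) for m in range(1, 13)]
```

Running the generated statements a few months ahead of the data keeps inserts from failing on a missing partition.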
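-- DWD layer: cleaned, standardized detail data
CREATE TABLE dwd_orders (
order_id BIGINT NOT NULL,
customer_id BIGINT, -- nullable so that invalid rows can be kept and flagged via is_valid
product_id BIGINT NOT NULL,
order_status VARCHAR(20) NOT NULL,
order_amount NUMERIC(18,2) NOT NULL,
discount_amount NUMERIC(18,2) NOT NULL DEFAULT 0,
payment_method VARCHAR(30),
created_at TIMESTAMP NOT NULL,
paid_at TIMESTAMP,
shipped_at TIMESTAMP,
completed_at TIMESTAMP,
cancelled_at TIMESTAMP,
dt DATE NOT NULL, -- partition key (order creation date)
-- data quality flags
is_valid BOOLEAN NOT NULL DEFAULT TRUE,
dq_check_time TIMESTAMP,
-- metadata
etl_updated_at TIMESTAMP DEFAULT NOW(),
-- On a partitioned table, PostgreSQL requires unique keys to include the partition key
PRIMARY KEY (order_id, dt)
) PARTITION BY RANGE (dt); -- PostgreSQL declarative range partitioning by date
-- Create monthly partitions (lower bound inclusive, upper bound exclusive)
CREATE TABLE dwd_orders_202401 PARTITION OF dwd_orders
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE dwd_orders_202402 PARTITION OF dwd_orders
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- Inserts whose dt matches no partition fail, so create partitions ahead of time and/or
-- add a catch-all default partition (PostgreSQL 11+); rows that land in it must be moved
-- out before a partition covering their range can be created
CREATE TABLE dwd_orders_default PARTITION OF dwd_orders DEFAULT;
-- Clean ODS into DWD
-- ods_orders is append-only, so one order can appear in several batches. DISTINCT ON keeps
-- only its latest version; otherwise ON CONFLICT would hit the same row twice and fail.
INSERT INTO dwd_orders(
order_id, customer_id, product_id, order_status,
order_amount, discount_amount, payment_method,
created_at, paid_at, shipped_at, completed_at,
cancelled_at, dt, is_valid, dq_check_time
)
SELECT
order_id,
customer_id,
product_id,
order_status,
GREATEST(order_amount, 0), -- amounts cannot be negative
COALESCE(discount_amount, 0), -- replace NULL with 0
payment_method,
created_at,
-- Derived fields: ODS has no per-status timestamps, so updated_at approximates the time
-- the order reached its current status (NOW() would record the load time instead)
CASE WHEN order_status = 'paid' THEN updated_at END,
NULL,
CASE WHEN order_status = 'completed' THEN updated_at END,
CASE WHEN order_status = 'cancelled' THEN updated_at END,
CAST(created_at AS DATE), -- partition key
(customer_id IS NOT NULL AND order_amount >= 0), -- data quality check
NOW()
FROM (
SELECT DISTINCT ON (order_id)
order_id, customer_id, product_id,
COALESCE(LOWER(TRIM(order_status)), 'unknown') AS order_status, -- standardize status values
order_amount, discount_amount, payment_method,
created_at, updated_at
FROM ods_orders
WHERE order_id IS NOT NULL
AND product_id IS NOT NULL
AND created_at IS NOT NULL
AND order_amount >= 0
ORDER BY order_id, updated_at DESC NULLS LAST
) latest
ON CONFLICT (order_id, dt) DO UPDATE SET
order_status = EXCLUDED.order_status,
paid_at = COALESCE(dwd_orders.paid_at, EXCLUDED.paid_at),
completed_at = COALESCE(dwd_orders.completed_at, EXCLUDED.completed_at),
cancelled_at = COALESCE(dwd_orders.cancelled_at, EXCLUDED.cancelled_at),
is_valid = EXCLUDED.is_valid,
dq_check_time = EXCLUDED.dq_check_time,
etl_updated_at = NOW();
DWS Layer (Summary Layer)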
-- DWS layer: sales statistics aggregated by day and product
CREATE TABLE dws_sales_daily (
dt DATE NOT NULL,
product_id BIGINT NOT NULL,
category_l1 VARCHAR(50) NOT NULL,
brand_name VARCHAR(50),
order_count INT NOT NULL DEFAULT 0,
order_customer_count INT NOT NULL DEFAULT 0, -- distinct customers
total_quantity INT NOT NULL DEFAULT 0,
total_amount NUMERIC(18,2) NOT NULL DEFAULT 0, -- gross amount before discounts
total_discount NUMERIC(18,2) NOT NULL DEFAULT 0,
total_profit NUMERIC(18,2),
return_count INT NOT NULL DEFAULT 0,
return_amount NUMERIC(18,2) NOT NULL DEFAULT 0,
-- day-over-day metrics
qty_vs_yesterday NUMERIC(10,2), -- quantity growth vs. the previous day, in %
amt_vs_yesterday NUMERIC(10,2), -- amount growth vs. the previous day, in %
-- metadata
etl_updated_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (dt, product_id)
);
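-- Daily aggregation SQL
INSERT INTO dws_sales_daily(
dt, product_id, category_l1, brand_name,
order_count, order_customer_count, total_quantity,
total_amount, total_discount, total_profit,
return_count, return_amount
)
SELECT
o.dt,
o.product_id,
p.category_l1,
p.brand_name,
COUNT(DISTINCT o.order_id),
COUNT(DISTINCT o.customer_id),
COUNT(*), -- dwd_orders has no quantity column; each order line counts as one unit
SUM(o.order_amount), -- gross amount; net revenue = total_amount - total_discount
SUM(o.discount_amount),
SUM(o.order_amount * 0.3), -- assumes a 30% gross margin
SUM(CASE WHEN o.order_status = 'returned' THEN 1 ELSE 0 END),
SUM(CASE WHEN o.order_status = 'returned' THEN o.order_amount ELSE 0 END)
FROM dwd_orders o
JOIN dim_product p ON o.product_id = p.product_id AND p.is_current = TRUE
WHERE o.is_valid = TRUE
AND o.dt = CURRENT_DATE - 1 -- process the previous day's data
GROUP BY o.dt, o.product_id, p.category_l1, p.brand_name
ON CONFLICT (dt, product_id) DO UPDATE SET
category_l1 = EXCLUDED.category_l1,
brand_name = EXCLUDED.brand_name,
order_count = EXCLUDED.order_count,
order_customer_count = EXCLUDED.order_customer_count,
total_quantity = EXCLUDED.total_quantity,
total_amount = EXCLUDED.total_amount,
total_discount = EXCLUDED.total_discount,
total_profit = EXCLUDED.total_profit,
return_count = EXCLUDED.return_count,
return_amount = EXCLUDED.return_amount,
etl_updated_at = NOW();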
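The day-over-day growth columns are easy to get subtly wrong. In PostgreSQL, dividing one INT by another truncates toward zero. A growth rate computed from INT columns such as `total_quantity` without a cast therefore collapses to a multiple of 100 percent; 25% growth comes out as 0.

Below is a small Python sketch of the intended calculation. It uses `Decimal` to mirror NUMERIC and returns `None` where SQL's `NULLIF(prev, 0)` would produce NULL. `growth_pct` is an illustrative name.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional, Union

Number = Union[int, Decimal]


def growth_pct(curr: Number, prev: Number) -> Optional[Decimal]:
    """(curr - prev) / prev * 100, rounded to 2 places; None when prev is 0."""
    if prev == 0:
        return None  # same effect as dividing by NULLIF(prev, 0) in SQL
    # Decimal division keeps the fractional part; INT / INT in PostgreSQL would not.
    pct = (Decimal(curr) - Decimal(prev)) / Decimal(prev) * 100
    return pct.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
```

For example, `growth_pct(150, 120)` gives `Decimal('25.00')`, whereas integer arithmetic in the SQL would have produced 0.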
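-- Day-over-day growth (cast to NUMERIC: INT / INT truncates to an integer in PostgreSQL)
UPDATE dws_sales_daily curr
SET
qty_vs_yesterday = ROUND(
(curr.total_quantity - prev.total_quantity)::NUMERIC
/ NULLIF(prev.total_quantity, 0) * 100, 2),
amt_vs_yesterday = ROUND(
(curr.total_amount - prev.total_amount)
/ NULLIF(prev.total_amount, 0) * 100, 2)
FROM dws_sales_daily prev
WHERE prev.dt = curr.dt - 1
AND prev.product_id = curr.product_id
AND curr.dt = CURRENT_DATE - 1; -- only recompute the day that was just loaded
ADS Layer (Application Layer)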
-- ADS layer: views or materialized tables that serve reports and dashboards directly
-- Monthly sales trend
CREATE TABLE ads_sales_monthly_trend (
year_month VARCHAR(7) NOT NULL, -- '2024-03'
total_revenue NUMERIC(18,2),
total_orders INT,
avg_order_value NUMERIC(18,2),
total_profit NUMERIC(18,2),
profit_margin NUMERIC(5,2),
revenue_vs_last_month NUMERIC(10,2),
PRIMARY KEY (year_month)
);
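-- Monthly trend aggregation
INSERT INTO ads_sales_monthly_trend
SELECT
TO_CHAR(dt, 'YYYY-MM') AS year_month,
SUM(total_amount - total_discount) AS total_revenue,
SUM(order_count) AS total_orders,
-- ratio of sums (total revenue / total orders), not an average of per-row averages
ROUND(SUM(total_amount - total_discount) / NULLIF(SUM(order_count), 0), 2) AS avg_order_value,
SUM(total_profit) AS total_profit,
ROUND(SUM(total_profit) / NULLIF(SUM(total_amount - total_discount), 0) * 100, 2)
AS profit_margin,
NULL::NUMERIC AS revenue_vs_last_month -- filled in by a later step
FROM dws_sales_daily
GROUP BY TO_CHAR(dt, 'YYYY-MM')
ON CONFLICT (year_month) DO UPDATE SET
total_revenue = EXCLUDED.total_revenue,
total_orders = EXCLUDED.total_orders,
avg_order_value = EXCLUDED.avg_order_value,
total_profit = EXCLUDED.total_profit,
profit_margin = EXCLUDED.profit_margin;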
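Average order value is a ratio, and averaging per-row ratios (an expression such as `AVG(total_amount / NULLIF(order_count, 0))`) gives a different answer. With that approach, every (dt, product_id) row of `dws_sales_daily` counts equally, whether it represents one order or a thousand. A tiny Python illustration with made-up numbers:

```python
# Each tuple mimics one dws_sales_daily row: (net_amount, order_count). Made-up numbers.
rows = [(1000.0, 10), (50.0, 1)]


def aov_ratio_of_sums(rows):
    """SUM(net_amount) / SUM(order_count): every order weighs the same."""
    total_orders = sum(n for _, n in rows)
    return sum(a for a, _ in rows) / total_orders if total_orders else None


def aov_average_of_ratios(rows):
    """AVG(net_amount / order_count): every row weighs the same, whatever its size."""
    ratios = [a / n for a, n in rows if n]
    return sum(ratios) / len(ratios) if ratios else None


print(round(aov_ratio_of_sums(rows), 2))  # 95.45
print(aov_average_of_ratios(rows))        # 75.0
```

Only the ratio of sums matches "total revenue divided by total orders" for the month.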
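-- Top 10 products per month (over the last 90 days)
-- Aggregate per product and month first, then rank within each month and keep ranks 1-10
CREATE MATERIALIZED VIEW ads_top10_products AS
WITH monthly AS (
SELECT
DATE_TRUNC('month', dt)::DATE AS month_start,
product_id,
MAX(category_l1) AS category_l1, -- picks one value if the category changed within the month
SUM(total_amount - total_discount) AS total_revenue,
SUM(total_quantity) AS total_qty,
COUNT(DISTINCT dt) AS active_days
FROM dws_sales_daily
WHERE dt >= CURRENT_DATE - INTERVAL '90 days'
GROUP BY DATE_TRUNC('month', dt)::DATE, product_id
),
ranked AS (
SELECT
monthly.*,
DENSE_RANK() OVER (
PARTITION BY month_start
ORDER BY total_revenue DESC
) AS rank_in_month
FROM monthly
)
SELECT * FROM ranked
WHERE rank_in_month <= 10
WITH DATA;
-- REFRESH ... CONCURRENTLY requires a unique index on the materialized view
CREATE UNIQUE INDEX ux_ads_top10_products ON ads_top10_products (month_start, product_id);
-- Refresh the materialized view periodically (e.g. after the DWS load)
REFRESH MATERIALIZED VIEW CONCURRENTLY ads_top10_products;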
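The ranking logic of the view (dense rank within each month, keep ranks 1 to 10) can be mirrored in plain Python. This is handy for unit-testing the expected contents of the view against a small fixture. Below is a sketch; `top_n_per_month` is a hypothetical helper. Because the rank is dense, ties share a rank, and a month can return more than `n` products.

```python
from collections import defaultdict


def top_n_per_month(rows, n=10):
    """rows: (month, product_id, revenue) tuples.

    Returns {month: [(rank, product_id, revenue), ...]} using dense ranking.
    """
    by_month = defaultdict(list)
    for month, product_id, revenue in rows:
        by_month[month].append((product_id, revenue))

    result = {}
    for month, items in by_month.items():
        items.sort(key=lambda item: item[1], reverse=True)  # stable: ties keep input order
        ranked, rank, prev_revenue = [], 0, None
        for product_id, revenue in items:
            if revenue != prev_revenue:  # dense rank: only a new value advances the rank
                rank += 1
                prev_revenue = revenue
            if rank > n:
                break
            ranked.append((rank, product_id, revenue))
        result[month] = ranked
    return result
```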
-- Customer segmentation
-- Join the facts to every historical version of a customer, then roll up to the current
-- version, so purchases made under earlier versions are included
CREATE TABLE ads_customer_segment AS
SELECT
c.customer_id,
c.customer_name,
c.member_level,
c.province,
COUNT(DISTINCT f.order_id) AS order_count,
SUM(f.amount) AS total_spent,
MIN(f.created_at) AS first_order_date,
MAX(f.created_at) AS last_order_date,
CASE
WHEN SUM(f.amount) >= 10000 THEN '高价值' -- high value
WHEN SUM(f.amount) >= 3000 THEN '中价值' -- medium value
WHEN SUM(f.amount) >= 500 THEN '低价值' -- low value
ELSE '待激活' -- to be activated
END AS value_segment,
CASE
WHEN MAX(f.created_at) >= CURRENT_DATE - INTERVAL '30 days' THEN '活跃' -- active
WHEN MAX(f.created_at) >= CURRENT_DATE - INTERVAL '90 days' THEN '沉睡' -- dormant
ELSE '流失' -- churned
END AS activity_segment
FROM fact_sales f
JOIN dim_customer hist ON f.customer_key = hist.customer_key
JOIN dim_customer c ON c.customer_id = hist.customer_id AND c.is_current = TRUE
WHERE f.is_returned = FALSE
GROUP BY c.customer_id, c.customer_name, c.member_level, c.province;
Slowly Changing Dimensions (SCD) and History Tracking
SCD Type 1: Overwrite in Place
-- SCD Type 1: update in place, keeping no history
-- Use for: correcting bad data or changes to unimportant attributes
UPDATE dim_product
SET product_name = '旗舰耳机 Pro Max',
price = 1299.00
WHERE product_id = 1001
AND is_current = TRUE;
SCD Type 2: Add a New Version
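-- SCD Type 2: insert a new row and keep the full history
-- Use for: price changes, category changes, and other changes whose history must be tracked
-- Run both steps in one transaction. In PostgreSQL NOW() returns the transaction start
-- time, so the closed version's effective_to equals the new version's effective_from.
BEGIN;
-- Step 1: close the current version
UPDATE dim_product
SET effective_to = NOW(),
is_current = FALSE
WHERE product_id = 1001
AND is_current = TRUE;
-- Step 2: insert the new current version
INSERT INTO dim_product(
product_id, product_name, category_l1, brand_name,
price, cost, status, effective_from, effective_to, is_current
)
VALUES (
1001, '旗舰耳机 Pro Max', '耳机', 'SunnyAudio',
1299.00, 650.00, 'active', NOW(), NULL, TRUE
);
COMMIT;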
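The two SCD Type 2 steps can be wrapped in one function. The function below also skips creating a new version when nothing tracked has changed. It is a minimal `sqlite3` sketch under simplifying assumptions:

- Only `product_name` and `price` are tracked.
- Timestamps are ISO-8601 strings, which compare correctly as text.
- `apply_scd2_change` and `version_at` are illustrative names.

The `with conn:` block commits both statements together or rolls both back.

```python
import sqlite3


def apply_scd2_change(conn: sqlite3.Connection, product_id: int,
                      new_name: str, new_price: float, changed_at: str) -> bool:
    """Close the current version and insert a new one; return False if nothing changed."""
    with conn:  # commits on success, rolls back if either statement fails
        current = conn.execute(
            "SELECT product_name, price FROM dim_product "
            "WHERE product_id = ? AND is_current = 1",
            (product_id,),
        ).fetchone()
        if current == (new_name, new_price):
            return False  # no tracked attribute changed, so no new version
        conn.execute(
            "UPDATE dim_product SET effective_to = ?, is_current = 0 "
            "WHERE product_id = ? AND is_current = 1",
            (changed_at, product_id),
        )
        conn.execute(
            "INSERT INTO dim_product (product_id, product_name, price, "
            "effective_from, effective_to, is_current) VALUES (?, ?, ?, ?, NULL, 1)",
            (product_id, new_name, new_price, changed_at),
        )
    return True


def version_at(conn: sqlite3.Connection, product_id: int, as_of: str):
    """Point-in-time lookup over the half-open interval [effective_from, effective_to)."""
    return conn.execute(
        "SELECT product_name, price FROM dim_product WHERE product_id = ? "
        "AND effective_from <= ? AND (effective_to IS NULL OR effective_to > ?)",
        (product_id, as_of, as_of),
    ).fetchone()
```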
-- Query: the product's attributes as of a given date ([effective_from, effective_to) is half-open)
SELECT product_name, price, category_l1
FROM dim_product
WHERE product_id = 1001
AND effective_from <= '2024-06-15'
AND (effective_to IS NULL OR effective_to > '2024-06-15');
SCD Type 3: Limited History
-- SCD Type 3: add "previous value" columns to the dimension table
-- Use for: cases where only the most recent change needs to be kept
CREATE TABLE dim_product_v3 (
product_key BIGSERIAL PRIMARY KEY,
product_id BIGINT NOT NULL,
product_name VARCHAR(100) NOT NULL,
previous_name VARCHAR(100), -- previous product name
price NUMERIC(18,2) NOT NULL,
previous_price NUMERIC(18,2), -- previous price
category_name VARCHAR(50) NOT NULL,
effective_from TIMESTAMP NOT NULL,
is_current BOOLEAN NOT NULL DEFAULT TRUE
);
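-- On update, copy the current values into the previous-value columns
-- (in PostgreSQL every SET expression reads the pre-update row values)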
UPDATE dim_product_v3
SET previous_name = product_name,
previous_price = price,
product_name = '旗舰耳机 Pro Max',
price = 1299.00
WHERE product_id = 1001
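AND is_current = TRUE;
Choosing an SCD strategy:
- Type 1: correcting bad data and changes to unimportant attributes.
- Type 2: changes to important attributes such as price, category, or status (the most common choice).
- Type 3: simple cases where only the most recent change needs to be kept.
- Hybrid Type 2 + Type 1: most attributes use Type 2, a few unimportant ones use Type 1.
Practical Patterns
Pattern 1: An End-to-End E-commerce Warehouse Design
-- Subject area: orders
-- Fact table grain: one row per product line of each order
-- Dimensions: date, product, customer, store, promotion
-- Promotion dimension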
CREATE TABLE dim_promotion (
promotion_key BIGSERIAL PRIMARY KEY,
promotion_id BIGINT NOT NULL,
promotion_name VARCHAR(100) NOT NULL,
promotion_type VARCHAR(50), -- 'discount', 'coupon', 'flash_sale'
discount_rate NUMERIC(5,2),
effective_from TIMESTAMP NOT NULL,
effective_to TIMESTAMP,
is_current BOOLEAN NOT NULL DEFAULT TRUE
);
-- Sales facts with a promotion dimension
CREATE TABLE fact_sales_with_promo (
sales_key BIGSERIAL PRIMARY KEY,
date_key INT NOT NULL,
product_key BIGINT NOT NULL,
customer_key BIGINT NOT NULL,
store_key BIGINT NOT NULL,
promotion_key BIGINT, -- NULL when no promotion applies
order_id BIGINT NOT NULL,
quantity INT NOT NULL,
unit_price NUMERIC(18,2) NOT NULL,
original_amount NUMERIC(18,2) NOT NULL, -- amount at the original price
discount_amount NUMERIC(18,2) NOT NULL DEFAULT 0,
final_amount NUMERIC(18,2) NOT NULL, -- amount actually paid
FOREIGN KEY (date_key) REFERENCES dim_date(date_key),
FOREIGN KEY (product_key) REFERENCES dim_product(product_key),
FOREIGN KEY (customer_key) REFERENCES dim_customer(customer_key),
FOREIGN KEY (store_key) REFERENCES dim_store(store_key),
FOREIGN KEY (promotion_key) REFERENCES dim_promotion(promotion_key) -- NULL keys are not checked
);
-- Promotion effectiveness (the inner join leaves out sales without a promotion)
SELECT
p.promotion_type,
p.promotion_name,
COUNT(DISTINCT f.order_id) AS affected_orders,
SUM(f.original_amount) AS original_revenue,
SUM(f.final_amount) AS actual_revenue,
SUM(f.discount_amount) AS total_discount,
ROUND(SUM(f.discount_amount) / NULLIF(SUM(f.original_amount), 0) * 100, 2)
AS discount_rate_pct,
COUNT(DISTINCT f.customer_key) AS affected_customers
FROM fact_sales_with_promo f
JOIN dim_promotion p ON f.promotion_key = p.promotion_key
WHERE f.date_key BETWEEN 20240101 AND 20241231
GROUP BY p.promotion_type, p.promotion_name
ORDER BY total_discount DESC;
Pattern 2: Data Quality Checks
-- Data quality check results
CREATE TABLE dq_check_results (
check_id BIGSERIAL PRIMARY KEY,
table_name VARCHAR(100) NOT NULL,
check_name VARCHAR(100) NOT NULL,
check_type VARCHAR(50) NOT NULL, -- 'completeness', 'accuracy', 'consistency'
expected_count NUMERIC(18,2), -- row count or amount, depending on the check
actual_count NUMERIC(18,2),
pass_flag BOOLEAN NOT NULL,
details TEXT,
checked_at TIMESTAMP DEFAULT NOW()
);
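Each of the checks below boils down to comparing an expected value with an actual one, optionally within a tolerance. The same rule can be written as a small Python helper. This is a sketch of one way to evaluate checks outside the database before writing them to `dq_check_results`; `CheckResult` and `compare` are hypothetical names. A missing value, such as a SUM over zero rows (which SQL returns as NULL), is treated as a failure rather than a pass.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional, Union

# Pass ints, strings, or Decimals; floats are avoided because Decimal(0.1) is not exactly 0.1.
Value = Union[int, str, Decimal, None]


@dataclass
class CheckResult:
    table_name: str
    check_name: str
    check_type: str              # 'completeness', 'accuracy', 'consistency'
    expected: Optional[Decimal]
    actual: Optional[Decimal]
    pass_flag: bool
    details: str = ""


def compare(table_name: str, check_name: str, check_type: str,
            expected: Value, actual: Value,
            tolerance: Decimal = Decimal("0"), details: str = "") -> CheckResult:
    """Pass when both values are present and differ by at most `tolerance`."""
    exp = None if expected is None else Decimal(expected)
    act = None if actual is None else Decimal(actual)
    passed = exp is not None and act is not None and abs(exp - act) <= tolerance
    return CheckResult(table_name, check_name, check_type, exp, act, passed, details)
```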
-- Completeness check: does the ODS order batch contain NULL order_id values?
INSERT INTO dq_check_results (table_name, check_name, check_type, expected_count, actual_count, pass_flag, details)
SELECT
'ods_orders',
'order_id_not_null',
'completeness',
COUNT(*),
COUNT(order_id),
COUNT(*) = COUNT(order_id),
'Check whether order_id contains NULL values'
FROM ods_orders
WHERE etl_batch_id = @batch_id;
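-- Consistency check: does the DWD row count match the ODS batch?
-- (a rough check that assumes each batch holds exactly the previous day's orders)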
INSERT INTO dq_check_results (table_name, check_name, check_type, expected_count, actual_count, pass_flag, details)
SELECT
'dwd_orders',
'row_count_consistency',
'consistency',
(SELECT COUNT(*) FROM ods_orders WHERE etl_batch_id = @batch_id),
(SELECT COUNT(*) FROM dwd_orders WHERE dt = CURRENT_DATE - 1),
(SELECT COUNT(*) FROM ods_orders WHERE etl_batch_id = @batch_id)
= (SELECT COUNT(*) FROM dwd_orders WHERE dt = CURRENT_DATE - 1),
'Row count consistency between DWD and ODS';
-- Accuracy check: does the DWS total match the DWD detail amount?
-- Both sides count only valid rows; COALESCE turns "no rows" (SUM = NULL) into a failure
-- instead of a NULL that would violate pass_flag NOT NULL
INSERT INTO dq_check_results (table_name, check_name, check_type, expected_count, actual_count, pass_flag, details)
SELECT
'dws_sales_daily',
'amount_consistency',
'accuracy',
dws_total,
dwd_total,
COALESCE(ABS(dws_total - dwd_total) < 0.01, FALSE),
'Deviation between the DWS summary amount and the DWD detail amount'
FROM (
SELECT
(SELECT SUM(total_amount) FROM dws_sales_daily WHERE dt = CURRENT_DATE - 1) AS dws_total,
(SELECT SUM(order_amount) FROM dwd_orders WHERE dt = CURRENT_DATE - 1 AND is_valid = TRUE) AS dwd_total
) t;
Pattern 3: A Python ETL Script
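from datetime import date, timedelta
import logging

import pandas as pd
from sqlalchemy import create_engine, text

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ETLPipeline:
    """ETL pipeline: ODS -> DWD -> DWS -> ADS"""

    def __init__(self, dw_url: str, oltp_url: str):
        # The warehouse and the business database are separate systems; read the
        # connection strings from configuration instead of hard-coding credentials
        self.engine = create_engine(dw_url)
        self.oltp_engine = create_engine(oltp_url)

    def extract_from_oltp(self, batch_id: str):
        """Extract changed rows from the business system into the ODS layer."""
        # The watermark lives in the warehouse, so read it there;
        # ods_orders does not exist in the OLTP database
        with self.engine.connect() as conn:
            watermark = conn.execute(text(
                "SELECT COALESCE(MAX(updated_at), TIMESTAMP '1900-01-01') FROM ods_orders"
            )).scalar()

        # Incremental extraction of exactly the columns ods_orders expects
        query = text("""
            SELECT order_id, customer_id, product_id, order_status,
                   order_amount, discount_amount, payment_method,
                   created_at, updated_at
            FROM orders
            WHERE updated_at > :watermark
        """)
        df = pd.read_sql(query, self.oltp_engine, params={"watermark": watermark})
        if not df.empty:
            df["etl_batch_id"] = batch_id
            df["etl_source"] = "order_db"  # NOT NULL in ods_orders; etl_loaded_at uses its DEFAULT
            df.to_sql("ods_orders", self.engine, if_exists="append", index=False)
        logger.info(f"ODS extraction finished: {len(df)} rows")

    def transform_ods_to_dwd(self):
        """Clean ODS into DWD, inserting orders that are not in DWD yet."""
        sql = text("""
            INSERT INTO dwd_orders(
                order_id, customer_id, product_id, order_status,
                order_amount, discount_amount, payment_method,
                created_at, paid_at, dt, is_valid
            )
            SELECT DISTINCT ON (o.order_id)  -- keep only the latest version of each order
                o.order_id,
                o.customer_id,
                o.product_id,
                COALESCE(LOWER(TRIM(o.order_status)), 'unknown'),
                GREATEST(o.order_amount, 0),
                COALESCE(o.discount_amount, 0),
                o.payment_method,
                o.created_at,
                CASE WHEN LOWER(TRIM(o.order_status)) = 'paid' THEN o.updated_at END,
                CAST(o.created_at AS DATE),
                (o.customer_id IS NOT NULL AND o.order_amount >= 0)
            FROM ods_orders o
            WHERE o.order_id IS NOT NULL
              AND o.product_id IS NOT NULL
              AND o.created_at IS NOT NULL
              AND o.order_amount >= 0
              -- NOT EXISTS rather than NOT IN, which returns no rows if the subquery yields a NULL
              AND NOT EXISTS (SELECT 1 FROM dwd_orders d WHERE d.order_id = o.order_id)
            ORDER BY o.order_id, o.updated_at DESC NULLS LAST
        """)
        with self.engine.begin() as conn:  # commits on success, rolls back on error
            inserted = conn.execute(sql).rowcount
        logger.info(f"DWD cleaning finished: {inserted} rows")

    def aggregate_dwd_to_dws(self, dt: date):
        """Aggregate one day of DWD data into DWS."""
        # category_l1 comes from dim_product (dwd_orders has no such column), and
        # dt is a bind parameter rather than text formatted into the SQL string
        sql = text("""
            INSERT INTO dws_sales_daily(
                dt, product_id, category_l1, order_count,
                total_quantity, total_amount, total_discount
            )
            SELECT
                o.dt, o.product_id, p.category_l1,
                COUNT(DISTINCT o.order_id),
                COUNT(*),  -- dwd_orders has no quantity column
                SUM(o.order_amount),
                SUM(o.discount_amount)
            FROM dwd_orders o
            JOIN dim_product p ON p.product_id = o.product_id AND p.is_current = TRUE
            WHERE o.dt = :dt AND o.is_valid = TRUE
            GROUP BY o.dt, o.product_id, p.category_l1
            ON CONFLICT (dt, product_id) DO UPDATE SET
                category_l1 = EXCLUDED.category_l1,
                order_count = EXCLUDED.order_count,
                total_quantity = EXCLUDED.total_quantity,
                total_amount = EXCLUDED.total_amount,
                total_discount = EXCLUDED.total_discount
        """)
        with self.engine.begin() as conn:
            upserted = conn.execute(sql, {"dt": dt}).rowcount
        logger.info(f"DWS aggregation finished: {upserted} rows")

    def run_daily(self, batch_id: str):
        """Run the daily ETL pipeline."""
        logger.info(f"=== Starting daily ETL: {batch_id} ===")
        try:
            self.extract_from_oltp(batch_id)
            self.transform_ods_to_dwd()
            # Pass an actual date; the string "CURRENT_DATE - 1" would be quoted as a literal
            self.aggregate_dwd_to_dws(date.today() - timedelta(days=1))
            logger.info("=== Daily ETL finished ===")
        except Exception as e:
            logger.error(f"ETL failed: {e}")
            raise
Advantages
Disadvantages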
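Summary
The point of a data warehouse is not to "store more" data but to make data analyzable, governable, and reviewable. In practice, first pin down the subject areas, metric definitions, layer boundaries, and dimensional modeling conventions. Then add ETL, data quality checks, and BI outputs step by step; the result is far more stable. Do not try to build every layer at once: start from the most urgent analysis scenario and iterate.
Key Points
- Transactional databases focus on write consistency; warehouses focus on analytical efficiency and historical traceability.
- Star schemas suit BI queries better; snowflake schemas are more normalized but more complex.
- Layering into ODS/DWD/DWS/ADS helps with governance and troubleshooting.
- SCD Type 2 is the usual way to handle historical changes in dimensions.
- The fact table's grain sets the finest level of detail any analysis can reach; the finer the grain, the more flexible the analysis.
- Data quality checks are an indispensable part of the ETL process.
- Partitioned tables and materialized views are important tools for warehouse query performance.
From a Project Delivery Perspective
- An e-commerce business can build subject areas around orders, products, users, and inventory.
- Financial reporting usually needs stable historical metric definitions and snapshot capability.
- Operational analysis often serves dashboards and reports directly from the DWS/ADS layers.
- The warehouse should work together with a data quality platform and a job scheduling platform.
- Do not build too many layers early on; starting with three (ODS + DWD + ADS) is enough.
- Metric definitions must be agreed on before tables are built; fixing them later is very expensive.
Common Pitfalls
- Using business OLTP tables directly for BI reports, which leads to slow queries and inconsistent metric definitions.
- Building tables without defining metrics, so that different teams interpret them differently.
- Creating many layers without clear responsibilities, until the ETL pipeline becomes unmanageable.
- Not keeping dimension history, so historical reports cannot be reproduced.
- Choosing a fact table grain too coarse to support detailed analysis.
- Skipping data quality checks, so ETL errors are only discovered once they reach the reporting layer.
- Over-engineering a fact constellation whose maintenance cost far exceeds the analytical need.
Further Learning
- Study Kimball dimensional modeling and Inmon's Corporate Information Factory.
- Explore data lineage, data quality checks, and metadata management.
- Adopt analytical engines such as ClickHouse, Snowflake, and BigQuery.
- Learn real-time data warehousing and the Lambda/Kappa architectures.
- Master modern warehouse tooling such as dbt (Data Build Tool).
- Explore data lakes and lakehouse architectures.
Use Cases
- Sales, order, inventory, and user behavior analysis.
- Enterprise management reporting and multidimensional BI analysis.
- Business reviews, financial audits, and subject-oriented data marts.
- Scenarios that require unified analysis across multiple business systems.
- Long-term trend analysis and historical comparisons.
- Aligning metric definitions across departments.
Implementation Advice
- Agree on metric definitions first, then design tables and develop the ETL.
- For every layer, document field descriptions, refresh times, and owners.
- Fact tables capture business events; dimension tables provide the context that explains them.
- Set up reconciliation checks and anomaly alerts for key metrics.
- Use partitioned tables for large tables; partitioning by date is a common default.
- Refresh materialized views on a schedule to keep the ADS layer fresh.
- Keep ETL scripts under version control and test changes before release.
Troubleshooting Checklist
- Check that the fact table grain matches the analysis goal.
- Check that dimension history is complete and that SCDs are updated as expected.
- Check the ETL for duplicate loads, missing data, or metric-definition drift.
- Check whether slow BI queries come from the model design or from the execution engine.
- Check that data quality checks cover completeness and consistency.
- Check that the partition ranges of partitioned tables cover all the data.
- Check that the materialized view refresh strategy meets the business's freshness requirements.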
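For the "duplicate loads" item, one quick check is to confirm that the fact table's declared grain is actually unique. `fact_sales` is defined at one row per order line, so (order_id, line_number) should never repeat. Below is a minimal `sqlite3` sketch. `duplicate_grain_rows` is a hypothetical helper. The table and column names are interpolated into SQL, so they must come from trusted configuration, not user input.

```python
import sqlite3


def duplicate_grain_rows(conn: sqlite3.Connection, table: str, grain_cols: list[str]):
    """Return (grain key..., count) for every grain key that occurs more than once."""
    cols = ", ".join(grain_cols)
    sql = (f"SELECT {cols}, COUNT(*) AS n FROM {table} "
           f"GROUP BY {cols} HAVING COUNT(*) > 1 ORDER BY {cols}")
    return conn.execute(sql).fetchall()
```

An empty result means the grain holds. Any rows returned point to a batch that was loaded twice or to a join that fanned out.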
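Review Questions
- Are your reporting problems more about "missing data" or "inconsistent metric definitions"?
- Is the fact table grain fine enough to support the analyses you expect to need?
- Which subject area is most worth building first?
- Can historical dimension changes already be traced and explained?
- Does the ETL pipeline's latency meet business needs?
- Do the data quality checks cover the key metrics?
- Is the investment in the warehouse proportionate to its business value?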
