数据迁移实战
大约 11 分钟约 3182 字
数据迁移实战
简介
数据迁移是数据库运维中最具挑战性的操作之一,涉及将数据从源系统安全、完整、高效地迁移到目标系统。无论是数据库版本升级、架构调整、还是系统整合,数据迁移都需要严密的计划和可靠的执行方案。本文将从迁移策略制定、ETL 工具选型、增量迁移实现和回滚方案设计四个方面,系统地介绍数据迁移的实战方法和最佳实践。
特点
迁移策略
数据迁移策略的选择取决于业务允许的停机时间、数据量和数据一致性要求。
迁移策略对比
| 策略 | 停机时间 | 数据量 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 全量一次性迁移 | 数小时到数天 | 中小规模 | 低 | 非核心系统、新系统上线 |
| 全量 + 增量迁移 | 分钟级 | 大规模 | 高 | 核心业务系统 |
| 双写迁移 | 零停机 | 任意规模 | 很高 | 交易系统、支付系统 |
| 分批迁移 | 分钟级 | 大规模 | 中等 | 历史数据归档 |
迁移前检查清单
-- MySQL 迁移前源库检查
-- 1. 检查数据库大小
SELECT
table_schema AS database_name,
ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS size_mb,
COUNT(*) AS table_count
FROM information_schema.tables
WHERE table_schema NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys')
GROUP BY table_schema
ORDER BY size_mb DESC;
-- 2. 检查表数据量和行数
SELECT
table_name,
table_rows,
ROUND(data_length / 1024 / 1024, 2) AS data_mb,
ROUND(index_length / 1024 / 1024, 2) AS index_mb,
engine
FROM information_schema.tables
WHERE table_schema = 'mydb'
ORDER BY data_length DESC;
-- 3. 检查字符集和排序规则
SELECT table_name, column_name, character_set_name, collation_name
FROM information_schema.columns
WHERE table_schema = 'mydb'
AND character_set_name IS NOT NULL
AND character_set_name != 'utf8mb4';
-- 4. 检查外键依赖关系
SELECT
CONSTRAINT_NAME,
TABLE_NAME,
COLUMN_NAME,
REFERENCED_TABLE_NAME,
REFERENCED_COLUMN_NAME
FROM information_schema.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = 'mydb'
AND REFERENCED_TABLE_NAME IS NOT NULL
ORDER BY REFERENCED_TABLE_NAME;数据一致性校验
-- 使用 CHECKSUM TABLE 进行数据校验
CHECKSUM TABLE mydb.orders EXTENDED;
CHECKSUM TABLE mydb.order_details EXTENDED;
-- 自定义数据校验脚本
SELECT
'orders' AS table_name,
COUNT(*) AS row_count,
MD5(GROUP_CONCAT(
MD5(CONCAT(
COALESCE(order_id, ''),
COALESCE(order_no, ''),
COALESCE(customer_id, ''),
COALESCE(total_amount, '')
))
ORDER BY order_id
)) AS data_checksum
FROM mydb.orders;ETL 工具
DataX 配置示例
DataX 是阿里巴巴开源的离线数据同步工具,支持多种数据源之间的数据迁移。
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "source_user",
"password": "source_password",
"connection": [
{
"jdbcUrl": ["jdbc:mysql://source-host:3306/mydb"],
"query": [
"SELECT order_id, order_no, customer_id, order_date, total_amount, status FROM orders WHERE order_date >= '2024-01-01'"
]
}
],
"column": ["order_id", "order_no", "customer_id", "order_date", "total_amount", "status"],
"splitPk": "order_id",
"fetchSize": 5000
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "target_user",
"password": "target_password",
"connection": [
{
"jdbcUrl": "jdbc:mysql://target-host:3306/mydb",
"table": ["orders"]
}
],
"column": ["order_id", "order_no", "customer_id", "order_date", "total_amount", "status"],
"preSql": ["TRUNCATE TABLE orders"],
"batchSize": 5000,
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": 4,
"byte": 10485760
},
"errorLimit": {
"record": 100,
"percentage": 0.01
}
}
}
}Python ETL 脚本
import pymysql
import logging
from datetime import datetime
from typing import List, Tuple, Optional
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class DataMigrator:
"""通用数据迁移工具"""
def __init__(self, source_config: dict, target_config: dict):
self.source_config = source_config
self.target_config = target_config
self.batch_size = 5000
self.stats = {"total": 0, "success": 0, "failed": 0, "start_time": None}
def migrate_table(self, table_name: str, query: str, insert_sql: str,
transform=None) -> dict:
"""迁移单张表"""
self.stats["start_time"] = datetime.now()
logger.info(f"开始迁移表: {table_name}")
source_conn = pymysql.connect(**self.source_config)
target_conn = pymysql.connect(**self.target_config)
try:
source_cursor = source_conn.cursor(pymysql.cursors.SSCursor)
target_cursor = target_conn.cursor()
source_cursor.execute(query)
batch: List[Tuple] = []
processed = 0
while True:
rows = source_cursor.fetchmany(self.batch_size)
if not rows:
break
for row in rows:
try:
transformed_row = transform(row) if transform else row
batch.append(transformed_row)
except Exception as e:
logger.warning(f"数据转换失败: {row}, 错误: {e}")
self.stats["failed"] += 1
continue
if len(batch) >= self.batch_size:
self._execute_batch(target_cursor, insert_sql, batch)
target_conn.commit()
processed += len(batch)
logger.info(f"[{table_name}] 已迁移 {processed} 条记录")
batch = []
self.stats["total"] += len(rows)
# 处理剩余数据
if batch:
self._execute_batch(target_cursor, insert_sql, batch)
target_conn.commit()
processed += len(batch)
self.stats["success"] = processed
elapsed = (datetime.now() - self.stats["start_time"]).total_seconds()
logger.info(
f"迁移完成: {table_name}, "
f"成功: {processed}, 失败: {self.stats['failed']}, "
f"耗时: {elapsed:.1f}秒"
)
return self.stats
except Exception as e:
target_conn.rollback()
logger.error(f"迁移失败: {table_name}, 错误: {e}")
raise
finally:
source_cursor.close()
target_cursor.close()
source_conn.close()
target_conn.close()
def _execute_batch(self, cursor, sql: str, batch: List[Tuple]):
cursor.executemany(sql, batch)
self.stats["success"] += len(batch)
# 使用示例
if __name__ == "__main__":
source = {"host": "source-host", "port": 3306, "user": "reader",
"password": "pwd", "database": "source_db", "charset": "utf8mb4"}
target = {"host": "target-host", "port": 3306, "user": "writer",
"password": "pwd", "database": "target_db", "charset": "utf8mb4"}
migrator = DataMigrator(source, target)
# 迁移订单表
migrator.migrate_table(
table_name="orders",
query="SELECT * FROM orders WHERE create_time >= '2024-01-01'",
insert_sql="INSERT INTO orders (id, order_no, customer_id, amount, status) VALUES (%s,%s,%s,%s,%s)"
)增量迁移
增量迁移通过 CDC(Change Data Capture)技术捕获源库的数据变更,实现准实时的数据同步。
基于 MySQL Binlog 的增量迁移
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
import json
import logging
logger = logging.getLogger(__name__)
class BinlogMigrator:
"""基于 Binlog 的增量迁移工具"""
def __init__(self, source_config: dict, target_config: dict):
self.source_config = source_config
self.target_config = target_config
self.monitored_tables = {"mydb.orders", "mydb.order_details", "mydb.customers"}
def start_sync(self, resume_binlog_file: str = None, resume_binlog_pos: int = None):
"""启动增量同步"""
stream_settings = {
"connection_settings": {
"host": self.source_config["host"],
"port": self.source_config["port"],
"user": self.source_config["user"],
"passwd": self.source_config["password"]
},
"server_id": 100,
"blocking": True,
"resume_stream": True,
"only_events": [WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent]
}
if resume_binlog_file and resume_binlog_pos:
stream_settings["log_file"] = resume_binlog_file
stream_settings["log_pos"] = resume_binlog_pos
stream = BinLogStreamReader(**stream_settings)
try:
for binlog_event in stream:
schema = binlog_event.schema
table = binlog_event.table
full_table = f"{schema}.{table}"
if full_table not in self.monitored_tables:
continue
if isinstance(binlog_event, WriteRowsEvent):
for row in binlog_event.rows:
self._handle_insert(schema, table, row["values"])
elif isinstance(binlog_event, UpdateRowsEvent):
for row in binlog_event.rows:
self._handle_update(schema, table, row["before_values"], row["after_values"])
elif isinstance(binlog_event, DeleteRowsEvent):
for row in binlog_event.rows:
self._handle_delete(schema, table, row["values"])
# 记录当前 binlog 位置
logger.info(
f"已同步到 binlog: {stream.log_file}:{stream.log_pos}"
)
except KeyboardInterrupt:
logger.info(f"停止同步,当前位置: {stream.log_file}:{stream.log_pos}")
finally:
stream.close()
def _handle_insert(self, schema, table, values):
"""处理 INSERT 事件"""
columns = ", ".join(values.keys())
placeholders = ", ".join(["%s"] * len(values))
sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
# 执行插入到目标库
def _handle_update(self, schema, table, before, after):
"""处理 UPDATE 事件"""
set_clause = ", ".join([f"{k} = %s" for k in after.keys() if k in self._get_pk(table)])
where_clause = " AND ".join([f"{k} = %s" for k in before.keys() if k in self._get_pk(table)])
sql = f"UPDATE {table} SET {set_clause} WHERE {where_clause}"
# 执行更新到目标库
def _handle_delete(self, schema, table, values):
"""处理 DELETE 事件"""
where_clause = " AND ".join([f"{k} = %s" for k in values.keys() if k in self._get_pk(table)])
sql = f"DELETE FROM {table} WHERE {where_clause}"
# 执行删除到目标库
def _get_pk(self, table):
"""获取表的主键列"""
pk_map = {
"orders": ["order_id"],
"order_details": ["detail_id"],
"customers": ["customer_id"]
}
return pk_map.get(table, ["id"])增量迁移时间线
| 阶段 | 操作 | 持续时间 | 说明 |
|---|---|---|---|
| 准备阶段 | 环境准备、工具部署 | 1-2 天 | 部署 CDC 工具,创建目标库表 |
| 全量迁移 | 历史数据全量同步 | 数小时到数天 | 取决于数据量 |
| 增量同步 | 启动 CDC 同步变更 | 持续运行 | 追赶全量迁移期间的新增数据 |
| 数据校验 | 全量数据对比校验 | 2-4 小时 | 确保源和目标数据一致 |
| 流量切换 | 应用切换到新库 | 分钟级 | DNS 切换或修改连接配置 |
| 观察期 | 双向同步监控 | 3-7 天 | 确认新库稳定运行 |
回滚方案
回滚方案是数据迁移的安全保障,确保在迁移失败时能够安全恢复。
回滚方案设计
-- 1. 迁移前备份关键表
CREATE TABLE orders_backup_20240615 AS SELECT * FROM orders;
CREATE TABLE order_details_backup_20240615 AS SELECT * FROM order_details;
CREATE TABLE customers_backup_20240615 AS SELECT * FROM customers;
-- 2. 记录迁移前的 binlog 位置
SHOW MASTER STATUS;
-- 记录 File 和 Position 值
-- 3. 如果迁移失败,执行回滚
START TRANSACTION;
-- 恢复备份表数据
TRUNCATE TABLE orders;
INSERT INTO orders SELECT * FROM orders_backup_20240615;
TRUNCATE TABLE order_details;
INSERT INTO order_details SELECT * FROM order_details_backup_20240615;
TRUNCATE TABLE customers;
INSERT INTO customers SELECT * FROM customers_backup_20240615;
-- 验证数据一致性
SELECT COUNT(*) AS orders_count FROM orders;
SELECT COUNT(*) AS details_count FROM order_details;
SELECT COUNT(*) AS customers_count FROM customers;
-- 确认无误后提交
COMMIT;
-- 如有问题可以 ROLLBACK迁移检查与回滚脚本
import pymysql
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class MigrationRollback:
"""迁移回滚管理器"""
def __init__(self, source_config: dict, target_config: dict):
self.source = pymysql.connect(**source_config)
self.target = pymysql.connect(**target_config)
def validate_migration(self, tables: list) -> dict:
"""验证迁移结果"""
results = {}
for table in tables:
source_cursor = self.source.cursor()
target_cursor = self.target.cursor()
source_cursor.execute(f"SELECT COUNT(*) FROM {table}")
source_count = source_cursor.fetchone()[0]
target_cursor.execute(f"SELECT COUNT(*) FROM {table}")
target_count = target_cursor.fetchone()[0]
# 校验数据 checksum
source_cursor.execute(f"CHECKSUM TABLE {table} EXTENDED")
source_checksum = source_cursor.fetchone()[1]
target_cursor.execute(f"CHECKSUM TABLE {table} EXTENDED")
target_checksum = target_cursor.fetchone()[1]
is_valid = (source_count == target_count and source_checksum == target_checksum)
results[table] = {
"source_count": source_count,
"target_count": target_count,
"source_checksum": source_checksum,
"target_checksum": target_checksum,
"is_valid": is_valid
}
status = "通过" if is_valid else "失败"
logger.info(f"校验 {table}: {status} (源: {source_count}, 目标: {target_count})")
source_cursor.close()
target_cursor.close()
return results
def execute_rollback(self, tables: list, backup_suffix: str):
"""执行回滚操作"""
logger.warning(f"开始回滚,备份表后缀: {backup_suffix}")
cursor = self.target.cursor()
try:
cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
for table in tables:
backup_table = f"{table}_backup_{backup_suffix}"
logger.info(f"回滚表 {table},从备份 {backup_table} 恢复")
cursor.execute(f"TRUNCATE TABLE {table}")
cursor.execute(f"INSERT INTO {table} SELECT * FROM {backup_table}")
# 验证恢复结果
cursor.execute(f"SELECT COUNT(*) FROM {table}")
count = cursor.fetchone()[0]
logger.info(f"表 {table} 已恢复,记录数: {count}")
cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
self.target.commit()
logger.info("回滚完成")
except Exception as e:
self.target.rollback()
logger.error(f"回滚失败: {e}")
raise
finally:
cursor.close()优点
缺点
总结
数据迁移是一项需要严谨计划和充分测试的系统工程。对于允许停机的非核心系统,全量一次性迁移是最简单的选择;对于需要零停机的核心业务系统,全量加增量的 CDC 迁移方案是最佳实践。无论选择哪种策略,都必须建立完善的数据校验机制和可靠的回滚方案。建议在正式迁移前进行至少两轮完整的演练,记录每个步骤的耗时和可能的问题点,确保正式迁移时能够顺利执行。同时,迁移完成后应保留一段观察期,通过双向同步或读写回源的方式确保新系统的稳定运行。
关键知识点
- 数据库主题一定要同时看数据模型、读写模式和执行代价。
- 很多性能问题不是 SQL 语法问题,而是索引、统计信息、事务和数据分布问题。
- 高可用、备份、迁移和治理与查询优化同样重要。
- 先把数据模型、访问模式和执行代价绑定起来理解。
项目落地视角
- 所有优化前后都保留执行计划、样本 SQL 和关键指标对比。
- 上线前准备回滚脚本、备份点和校验方案。
- 把连接池、锁等待、慢查询和容量增长纳入日常巡检。
- 保留执行计划、样本 SQL、索引定义和优化前后指标。
常见误区
- 脱离真实数据分布讨论索引或分片。
- 只看单条 SQL,不看整条业务链路的事务和锁。
- 把测试环境结论直接等同于生产环境结论。
- 脱离真实数据分布设计索引。
进阶路线
- 继续向执行计划、存储引擎、复制机制和数据治理层深入。
- 把主题与 ORM、缓存、消息队列和归档策略联动起来思考。
- 沉淀成数据库设计规范、SQL 审核规则和变更流程。
- 继续深入存储引擎、复制机制、归档与冷热分层治理。
适用场景
- 当你准备把《数据迁移实战》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合数据建模、查询优化、事务控制、高可用和迁移治理。
- 当系统开始遇到慢查询、锁冲突、热点数据或容量增长时,这类主题价值最高。
落地建议
- 先分析真实查询模式、数据量级和写入特征,再决定索引或分片策略。
- 所有优化结论都结合执行计划、样本数据和监控指标验证。
- 高风险操作前准备备份、回滚脚本与校验 SQL。
排错清单
- 先确认瓶颈在 CPU、I/O、锁等待、网络还是 SQL 本身。
- 检查执行计划是否走错索引、是否发生排序或全表扫描。
- 排查长事务、隐式类型转换、统计信息过期和参数嗅探。
复盘问题
- 如果把《数据迁移实战》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《数据迁移实战》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《数据迁移实战》最大的收益和代价分别是什么?
