批量操作与大数据处理
阅读时间:大约 10 分钟 · 约 2910 字
批量操作与大数据处理
简介
批量操作是提升数据处理效率的关键技术。当需要插入、更新或处理大量数据时,逐条操作效率极低。.NET 提供了多种批量处理方案:EF Core 批量扩展、Dapper 批量执行、SqlBulkCopy 高速导入、分片并行处理。掌握这些技术可以处理百万级甚至千万级数据。
特点
SqlBulkCopy
高速批量导入
using Microsoft.Data.SqlClient;

/// <summary>
/// SqlBulkCopy-based bulk import for SQL Server — the fastest option for raw inserts.
/// </summary>
public class BulkImportService
{
    private readonly string _connectionString;

    public BulkImportService(IConfiguration config)
    {
        _connectionString = config.GetConnectionString("Default")!;
    }

    /// <summary>
    /// Bulk-inserts every row of <paramref name="table"/> into <paramref name="destinationTable"/>.
    /// </summary>
    /// <returns>The number of rows sent to the server.</returns>
    public async Task<int> BulkInsertFromDataTableAsync(DataTable table, string destinationTable)
    {
        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync();
        using var bulkCopy = new SqlBulkCopy(connection)
        {
            DestinationTableName = destinationTable,
            BatchSize = 10000,      // rows per server round trip
            BulkCopyTimeout = 300,  // seconds; large imports outlive the 30s default
            EnableStreaming = true  // stream rows instead of buffering the whole source
        };
        // Map each source column to the identically named destination column.
        foreach (DataColumn column in table.Columns)
        {
            bulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
        }
        await bulkCopy.WriteToServerAsync(table);
        return table.Rows.Count;
    }

    /// <summary>Converts an entity list to a DataTable and bulk-inserts it.</summary>
    public async Task BulkInsertAsync<T>(IEnumerable<T> items, string tableName)
    {
        var table = ConvertToDataTable(items);
        await BulkInsertFromDataTableAsync(table, tableName);
    }

    // Builds a DataTable whose columns mirror T's readable, non-indexer public properties.
    private DataTable ConvertToDataTable<T>(IEnumerable<T> items)
    {
        // BUG FIX: filter out indexers and write-only properties — GetValue on an
        // indexer (or a setter-only property) throws at runtime.
        var properties = typeof(T)
            .GetProperties(BindingFlags.Public | BindingFlags.Instance)
            .Where(p => p.CanRead && p.GetIndexParameters().Length == 0)
            .ToArray();
        var table = new DataTable();
        foreach (var prop in properties)
            // Unwrap Nullable<T>: DataTable models nullability with DBNull, not Nullable<T>.
            table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
        foreach (var item in items)
        {
            var row = table.NewRow();
            foreach (var prop in properties)
                row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;
            table.Rows.Add(row);
        }
        return table;
    }
}

EF Core 批量扩展
EFCore.BulkExtensions
using EFCore.BulkExtensions;

/// <summary>
/// EF Core bulk operations via EFCore.BulkExtensions, plus EF Core 7+ native
/// set-based commands.
/// </summary>
public class OrderBulkService
{
    private readonly AppDbContext _db;

    public OrderBulkService(AppDbContext db)
    {
        _db = db;
    }

    /// <summary>Bulk insert that bypasses the ChangeTracker entirely.</summary>
    public async Task BulkInsertOrdersAsync(List<Order> orders)
    {
        await _db.BulkInsertAsync(orders, options =>
        {
            options.BatchSize = 5000;
            options.SetOutputIdentity = true; // write generated identity values back to the entities
            options.UseTempDB = true;         // stage via tempdb to shorten locks on the target table
        });
    }

    /// <summary>Bulk update restricted to the listed columns.</summary>
    public async Task BulkUpdatePricesAsync(List<Product> products)
    {
        await _db.BulkUpdateAsync(products, options =>
        {
            options.PropertiesToInclude = new List<string> { "Id", "Price", "UpdatedAt" };
            options.BatchSize = 5000;
        });
    }

    /// <summary>
    /// Deletes products inactive for over a year using EF Core 7+'s set-based
    /// ExecuteDeleteAsync (no entities are loaded into memory).
    /// </summary>
    public async Task BulkDeleteInactiveAsync()
    {
        // Compute the cutoff once so the SQL gets a stable parameter instead of a
        // per-row date expression. Uses UtcNow — assumes UpdatedAt is stored in
        // UTC; confirm against the write side.
        var cutoff = DateTime.UtcNow.AddYears(-1);
        await _db.Products
            .Where(p => p.Status == "Inactive" && p.UpdatedAt < cutoff)
            .ExecuteDeleteAsync();
    }

    /// <summary>Upsert keyed on Code; change detection compares Name/Price/Stock only.</summary>
    public async Task BulkSyncAsync(List<Product> products)
    {
        await _db.BulkInsertOrUpdateAsync(products, options =>
        {
            options.UpdateByProperties = new List<string> { "Code" };
            options.PropertiesToIncludeOnCompare = new List<string> { "Name", "Price", "Stock" };
        });
    }

    /// <summary>
    /// Streams every product in Id order without loading the whole table.
    /// BUG FIX: the original used Skip/Take paging, which re-scans all skipped rows
    /// on every page (quadratic work overall) and can skip or repeat rows when data
    /// changes between pages. Keyset pagination ("Id &gt; last seen") is stable and linear.
    /// </summary>
    public async IAsyncEnumerable<Product> StreamAllProductsAsync(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        const int batchSize = 5000;
        var lastId = 0; // assumes Product.Id is a positive int identity — confirm
        while (!ct.IsCancellationRequested)
        {
            var batch = await _db.Products
                .Where(p => p.Id > lastId)
                .OrderBy(p => p.Id)
                .Take(batchSize)
                .ToListAsync(ct);
            if (batch.Count == 0) yield break;
            foreach (var product in batch)
                yield return product;
            lastId = batch[^1].Id;
        }
    }
}

分批处理
大数据分批处理
/// <summary>
/// Processes a large data set in fixed-size batches to bound memory use and keep
/// transactions small.
/// </summary>
public class BatchProcessor
{
    private readonly ILogger<BatchProcessor> _logger;
    // BUG FIX: ImportLargeDatasetAsync referenced _db, but the field was never
    // declared or injected, so the class did not compile.
    private readonly AppDbContext _db;

    public BatchProcessor(ILogger<BatchProcessor> logger)
    {
        _logger = logger;
        _db = null!; // database-backed helpers require the two-argument constructor
    }

    public BatchProcessor(ILogger<BatchProcessor> logger, AppDbContext db)
    {
        _logger = logger;
        _db = db;
    }

    /// <summary>
    /// Feeds <paramref name="items"/> to <paramref name="processBatch"/> in chunks of
    /// <paramref name="batchSize"/>, logging cumulative progress after each chunk.
    /// </summary>
    public async Task ProcessInBatchesAsync<T>(
        IEnumerable<T> items,
        int batchSize,
        Func<List<T>, Task> processBatch)
    {
        var batch = new List<T>(batchSize);
        int totalProcessed = 0;
        foreach (var item in items)
        {
            batch.Add(item);
            if (batch.Count >= batchSize)
            {
                await processBatch(batch);
                totalProcessed += batch.Count;
                _logger.LogInformation("已处理 {Count} 条", totalProcessed);
                batch.Clear();
            }
        }
        // Flush the final partial batch.
        if (batch.Count > 0)
        {
            await processBatch(batch);
            totalProcessed += batch.Count;
        }
        _logger.LogInformation("全部处理完成:{Total}", totalProcessed);
    }

    /// <summary>Example: bulk-import a large record set 5000 rows at a time.</summary>
    public async Task ImportLargeDatasetAsync(IEnumerable<Record> records)
    {
        await ProcessInBatchesAsync(records, 5000, async batch =>
        {
            await _db.BulkInsertAsync(batch);
        });
    }
}

并行批量处理
并行分片
/// <summary>
/// Splits a list into batches and processes them concurrently with a bounded
/// degree of parallelism.
/// </summary>
public class ParallelBatchProcessor
{
    // BUG FIX: _serviceProvider was used but never declared or injected, so the
    // class did not compile.
    private readonly IServiceProvider _serviceProvider;

    public ParallelBatchProcessor(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Runs <paramref name="processBatch"/> over <paramref name="items"/> in chunks of
    /// <paramref name="batchSize"/>, with at most <paramref name="maxDegree"/> chunks in flight.
    /// </summary>
    public async Task ParallelProcessAsync<T>(
        IReadOnlyList<T> items,
        int batchSize,
        int maxDegree,
        Func<List<T>, Task> processBatch)
    {
        // Chunk (LINQ, .NET 6+) replaces the original Select/GroupBy index dance:
        // a single pass with no hashing.
        var batches = items.Chunk(batchSize).Select(chunk => chunk.ToList()).ToList();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegree };
        await Parallel.ForEachAsync(batches, options, async (batch, ct) =>
        {
            await processBatch(batch);
        });
    }

    /// <summary>
    /// Example: parallel import. Each worker resolves its own AppDbContext from a
    /// fresh DI scope — DbContext is not thread-safe and must never be shared
    /// between the parallel lambdas.
    /// </summary>
    public async Task ParallelImportAsync(List<Record> records)
    {
        await ParallelProcessAsync(records, 10000, 4, async batch =>
        {
            using var scope = _serviceProvider.CreateScope();
            var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
            await db.BulkInsertAsync(batch);
        });
    }
}

Dapper 批量操作
/// <summary>
/// High-performance bulk operations with Dapper: table-valued parameters,
/// temp-table joins, and multi-result-set paging.
/// </summary>
// dotnet add package Dapper
// dotnet add package Microsoft.Data.SqlClient
public class DapperBulkService
{
    private readonly string _connectionString;

    public DapperBulkService(IConfiguration config)
    {
        _connectionString = config.GetConnectionString("Default")!;
    }

    /// <summary>
    /// Bulk insert via a table-valued parameter: the server performs one set-based
    /// INSERT ... SELECT over the TVP. Assumes a user-defined table type named
    /// ProductTableType exists in the database — confirm before deploying.
    /// </summary>
    public async Task<int> BulkInsertAsync(IEnumerable<Product> products)
    {
        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync();

        var rows = new DataTable();
        rows.Columns.Add("Name", typeof(string));
        rows.Columns.Add("Price", typeof(decimal));
        rows.Columns.Add("Category", typeof(string));
        foreach (var product in products)
            rows.Rows.Add(product.Name, product.Price, product.Category);

        var args = new { Products = rows.AsTableValuedParameter("ProductTableType") };
        return await connection.ExecuteAsync(
            "INSERT INTO Products (Name, Price, Category) SELECT Name, Price, Category FROM @Products",
            args);
    }

    /// <summary>
    /// Applies many (Id, Price) updates in one round trip by staging them in a
    /// temp table and joining it against Products.
    /// </summary>
    public async Task BulkUpdatePricesAsync(Dictionary<int, decimal> priceUpdates)
    {
        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync();

        var rows = new DataTable();
        rows.Columns.Add("Id", typeof(int));
        rows.Columns.Add("Price", typeof(decimal));
        foreach (var (id, price) in priceUpdates)
            rows.Rows.Add(id, price);

        await connection.ExecuteAsync(@"
CREATE TABLE #PriceUpdates (Id INT, Price DECIMAL(18,2));
INSERT INTO #PriceUpdates SELECT * FROM @Updates;
UPDATE p SET p.Price = u.Price
FROM Products p INNER JOIN #PriceUpdates u ON p.Id = u.Id;
DROP TABLE #PriceUpdates;",
            new { Updates = rows.AsTableValuedParameter("IdPriceTableType") });
    }

    /// <summary>Reads all active products — no change tracking, minimal overhead.</summary>
    public async Task<List<Product>> GetProductsAsync()
    {
        await using var connection = new SqlConnection(_connectionString);
        var sql = "SELECT Id, Name, Price, Category FROM Products WHERE IsActive = 1";
        var rows = await connection.QueryAsync<Product>(sql);
        return rows.ToList();
    }

    /// <summary>Returns one page of products plus the total count in a single round trip.</summary>
    public async Task<(List<Product> Products, int Total)> GetPagedAsync(
        int page, int pageSize, string category)
    {
        await using var connection = new SqlConnection(_connectionString);
        using var multi = await connection.QueryMultipleAsync(@"
SELECT COUNT(*) FROM Products WHERE Category = @Category;
SELECT Id, Name, Price FROM Products
WHERE Category = @Category
ORDER BY Id OFFSET @Skip ROWS FETCH NEXT @Take ROWS ONLY;",
            new { Category = category, Skip = (page - 1) * pageSize, Take = pageSize });
        var total = await multi.ReadFirstAsync<int>();
        var products = (await multi.ReadAsync<Product>()).ToList();
        return (products, total);
    }
}

大文件导入与流式处理
/// <summary>
/// Streaming import of large files: reads line by line, saves in fixed-size
/// batches, and records per-line parse failures instead of aborting the run.
/// </summary>
public class FileImportService
{
    private readonly ILogger<FileImportService> _logger;
    private readonly IServiceScopeFactory _scopeFactory;

    public FileImportService(
        ILogger<FileImportService> logger,
        IServiceScopeFactory scopeFactory)
    {
        _logger = logger;
        _scopeFactory = scopeFactory;
    }

    /// <summary>
    /// Imports a CSV stream of products (columns: Name, Price, Category).
    /// The stream is never fully buffered, so memory use is independent of file size.
    /// </summary>
    public async Task<ImportResult> ImportCsvAsync(
        Stream fileStream,
        int batchSize = 5000,
        CancellationToken cancellationToken = default)
    {
        var result = new ImportResult();
        var batch = new List<Product>(batchSize);
        var lineNumber = 0;
        using var reader = new StreamReader(fileStream);
        _ = await reader.ReadLineAsync(cancellationToken); // skip the header row
        lineNumber++;
        while (await reader.ReadLineAsync(cancellationToken) is { } line)
        {
            lineNumber++;
            try
            {
                // NOTE(review): naive split — quoted fields containing commas will be
                // mis-parsed, and decimal.Parse uses the current culture; use a real
                // CSV parser and an invariant culture for untrusted input.
                var parts = line.Split(',');
                batch.Add(new Product
                {
                    Name = parts[0],
                    Price = decimal.Parse(parts[1]),
                    Category = parts[2]
                });
            }
            catch (Exception ex)
            {
                // BUG FIX: parse failures were appended to Errors but never counted,
                // so the final success/failure log understated the failure total.
                result.ErrorCount++;
                result.Errors.Add(new ImportError(lineNumber, ex.Message));
                _logger.LogWarning(ex, "第 {Line} 行解析失败", lineNumber);
            }
            if (batch.Count >= batchSize)
            {
                await SaveBatchAsync(batch, result, cancellationToken);
                batch.Clear();
            }
        }
        // Flush the final partial batch.
        if (batch.Count > 0)
        {
            await SaveBatchAsync(batch, result, cancellationToken);
        }
        _logger.LogInformation("导入完成: 成功 {Success}, 失败 {Failed}",
            result.SuccessCount, result.ErrorCount);
        return result;
    }

    // Persists one batch inside its own DI scope so every save gets a fresh DbContext.
    private async Task SaveBatchAsync(
        List<Product> batch,
        ImportResult result,
        CancellationToken cancellationToken)
    {
        using var scope = _scopeFactory.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        try
        {
            await db.BulkInsertAsync(batch, options =>
            {
                options.BatchSize = batch.Count;
                options.SetOutputIdentity = true;
            });
            result.SuccessCount += batch.Count;
        }
        catch (Exception ex)
        {
            // Best-effort import: a failed batch is logged and counted, not rethrown.
            _logger.LogError(ex, "批量保存失败");
            result.ErrorCount += batch.Count;
        }
    }
}
/// <summary>
/// Mutable accumulator for an import run. Deliberately a class rather than a
/// positional record: the import loop increments the counters and appends to
/// Errors, which init-only record properties forbid (CS8852), and the original
/// record declared no Errors collection at all even though callers used one.
/// </summary>
public class ImportResult
{
    public int SuccessCount { get; set; }        // rows persisted successfully
    public int ErrorCount { get; set; }          // parse failures + rows in failed batches
    public List<ImportError> Errors { get; } = new(); // per-line failure details
}

/// <summary>One failed input line: its 1-based line number and the parser message.</summary>
public record ImportError(int LineNumber, string Message);
// 使用
/// <summary>
/// POST /import — streams an uploaded CSV into the product import service and
/// returns success/failure counts plus a capped sample of errors.
/// </summary>
[HttpPost("import")]
[RequestSizeLimit(100 * 1024 * 1024)] // reject uploads over 100 MB
public async Task<IActionResult> ImportProducts(IFormFile file)
{
    if (file.Length == 0) return BadRequest("文件为空");

    await using var stream = file.OpenReadStream();
    var result = await _importService.ImportCsvAsync(stream);

    // Cap the error list so a pathological file cannot bloat the response.
    var sampleErrors = result.Errors.Take(10).ToList();
    return Ok(new
    {
        success = result.SuccessCount,
        failed = result.ErrorCount,
        errors = sampleErrors
    });
}

大数据导出
/// <summary>
/// Large-data export — streams CSV/JSON output to avoid holding the full result
/// set in memory.
/// </summary>
public class ExportService
{
    private readonly AppDbContext _db;

    // BUG FIX: _db was declared but never assigned (no constructor existed), so
    // every method would have thrown NullReferenceException.
    public ExportService(AppDbContext db)
    {
        _db = db;
    }

    /// <summary>Streams matching products to <paramref name="outputStream"/> as CSV, one row at a time.</summary>
    public async Task ExportToCsvAsync(
        Stream outputStream,
        Expression<Func<Product, bool>> filter,
        CancellationToken cancellationToken = default)
    {
        // leaveOpen: the caller owns outputStream (e.g. the HTTP response body);
        // disposing this writer must not close it.
        await using var writer = new StreamWriter(outputStream, leaveOpen: true);
        await writer.WriteLineAsync("Id,Name,Price,Category,CreatedAt");
        // AsNoTracking + AsAsyncEnumerable: rows materialize one at a time, so
        // memory stays flat regardless of result size.
        await foreach (var product in _db.Products
            .AsNoTracking()
            .Where(filter)
            .OrderBy(p => p.Id)
            .AsAsyncEnumerable()
            .WithCancellation(cancellationToken))
        {
            var line = $"{product.Id},{EscapeCsv(product.Name)},{product.Price},{product.Category},{product.CreatedAt:yyyy-MM-dd}";
            await writer.WriteLineAsync(line);
        }
        await writer.FlushAsync();
    }

    /// <summary>Streams all products to <paramref name="outputStream"/> as a JSON array.</summary>
    public async Task ExportToJsonAsync(
        Stream outputStream,
        CancellationToken cancellationToken = default)
    {
        await using var writer = new Utf8JsonWriter(outputStream);
        writer.WriteStartArray();
        // BUG FIX: the original emitted WriteRawValue(",") between elements, but
        // Utf8JsonWriter inserts separators itself and a bare comma is not a valid
        // JSON value, so that call throws. No manual separator handling is needed.
        await foreach (var product in _db.Products
            .AsNoTracking()
            .OrderBy(p => p.Id)
            .AsAsyncEnumerable()
            .WithCancellation(cancellationToken))
        {
            writer.WriteStartObject();
            writer.WriteNumber("id", product.Id);
            writer.WriteString("name", product.Name);
            writer.WriteNumber("price", product.Price);
            writer.WriteString("category", product.Category);
            writer.WriteEndObject();
            // Utf8JsonWriter buffers everything until flushed — flush periodically
            // so a large export actually streams instead of growing in memory.
            if (writer.BytesPending > 64 * 1024)
                await writer.FlushAsync(cancellationToken);
        }
        writer.WriteEndArray();
        await writer.FlushAsync(cancellationToken);
    }

    /// <summary>
    /// Exports matching products to an .xlsx workbook (ClosedXML). Capped at
    /// 100,000 rows; the whole workbook is built in memory, so this path is for
    /// moderate result sizes only.
    /// </summary>
    public async Task<byte[]> ExportToExcelAsync(
        Expression<Func<Product, bool>> filter)
    {
        var products = await _db.Products
            .AsNoTracking()
            .Where(filter)
            .OrderBy(p => p.Id)
            .Take(100000) // stay well below Excel's worksheet row limit
            .ToListAsync();
        using var workbook = new XLWorkbook();
        var worksheet = workbook.Worksheets.Add("Products");
        // Header row.
        worksheet.Cell(1, 1).Value = "ID";
        worksheet.Cell(1, 2).Value = "名称";
        worksheet.Cell(1, 3).Value = "价格";
        worksheet.Cell(1, 4).Value = "分类";
        // Data rows start at row 2.
        for (int i = 0; i < products.Count; i++)
        {
            var row = i + 2;
            worksheet.Cell(row, 1).Value = products[i].Id;
            worksheet.Cell(row, 2).Value = products[i].Name;
            worksheet.Cell(row, 3).Value = products[i].Price;
            worksheet.Cell(row, 4).Value = products[i].Category;
        }
        using var stream = new MemoryStream();
        workbook.SaveAs(stream);
        return stream.ToArray();
    }

    // Quotes a CSV field when it contains a delimiter, quote, or line break (RFC 4180).
    // BUG FIX: also treat '\r' as a line break — bare CR previously went unquoted.
    private static string EscapeCsv(string value)
    {
        if (value.Contains(',') || value.Contains('"') || value.Contains('\n') || value.Contains('\r'))
        {
            return $"\"{value.Replace("\"", "\"\"")}\"";
        }
        return value;
    }
}

批量操作性能对比
批量插入 10,000 条数据的性能对比:
| 方式 | 耗时 | 内存占用 | 特点 |
|--------------------------|---------|----------|------------------------|
| EF Core 逐条 Add | ~30s | 高 | 每条一次 INSERT |
| EF Core AddRange | ~8s | 高 | 批量 INSERT,有追踪开销 |
| EFCore.BulkExtensions | ~1.2s | 中 | 绕过 ChangeTracker |
| SqlBulkCopy | ~0.5s | 中 | SQL Server 最高效 |
| Dapper + TVP | ~0.8s | 低 | 灵活,支持复杂 SQL |
选择建议:
- 日常 CRUD(<1000条): EF Core AddRange 即可
- 批量导入(1000-100万): EFCore.BulkExtensions 或 SqlBulkCopy
- 复杂 SQL + 批量: Dapper + 表值参数
- 需要跨数据库: EFCore.BulkExtensions(支持多种数据库)
- 纯数据迁移: SqlBulkCopy(最快)
注意事项:
1. 批量操作注意事务大小,太大可能导致锁表
2. 每批次 5000-10000 条是较优选择
3. 并行操作注意连接池大小(默认 100)
4. 长时间运行的任务考虑 CancellationToken
5. 大文件导入使用流式处理,避免一次性加载到内存
总结
批量操作选择:SQL Server 用 SqlBulkCopy(最快),EF Core 用 EFCore.BulkExtensions(方便),通用场景用分批处理。核心原则:大批拆小批(5000-10000 条/批),分批提交事务,内存不够用流式处理。并行处理注意每个线程用独立的 DbContext,避免上下文冲突。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
适用场景
- 当你准备把《批量操作与大数据处理》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《批量操作与大数据处理》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《批量操作与大数据处理》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《批量操作与大数据处理》最大的收益和代价分别是什么?
