Span 与 Memory 高级用法
大约 17 分钟约 5101 字
Span 与 Memory 高级用法
简介
在掌握了 Span<T> 和 Memory<T> 的基础知识后,本文将深入探讨其内部实现机制、内存池化技术、零拷贝解析、二进制协议处理等高级场景。这些技术在高性能网络服务、游戏引擎、实时数据处理等领域有广泛应用,是 .NET 高级开发者的必备技能。
特点
Span 内部实现
ref struct 底层结构
/// <summary>
/// Span 的简化内部表示(伪代码,帮助理解)
/// Span 本质上是一个值类型,包含两个字段:引用 + 长度
/// </summary>
public readonly ref struct Span<T>
{
// 内部的 byref-like 引用,指向数据起始位置
private readonly ref T _reference;
private readonly int _length;
public ref T this[int index]
{
get
{
// 边界检查( release 模式下可能被 JIT 消除)
if ((uint)index >= (uint)_length)
ThrowHelper.ThrowIndexOutOfRangeException();
return ref Unsafe.Add(ref _reference, (nint)(uint)index);
}
}
}
/// <summary>
/// 理解 Span 的内存布局
/// </summary>
public class SpanInternalsDemo
{
public void DemonstrateLayout()
{
// Span 在栈上的大小(x64 下为 16 字节:指针 8 + 长度 4 + padding)
Console.WriteLine($"Span<int> 大小: {System.Runtime.CompilerServices.Unsafe.SizeOf<Span<int>>()}");
// ReadOnlySpan 与 Span 的关系
int[] arr = { 1, 2, 3, 4, 5 };
Span<int> span = arr.AsSpan();
ReadOnlySpan<int> readOnly = span;
// span 可以修改原始数据
span[0] = 100;
Console.WriteLine(string.Join(", ", arr)); // 100, 2, 3, 4, 5
// ReadOnlySpan 不允许修改
// readOnly[0] = 200; // 编译错误
}
}Span 的安全约束
/// <summary>
/// Span 的编译期安全约束演示
/// </summary>
public class SpanSafetyConstraints
{
// 错误:Span 不能作为类的字段
// private Span<int> _data; // CS8345: 不能在 async/字段中使用 ref struct
// 错误:Span 不能在 async 方法中使用(跨 await 边界)
// public async Task ProcessAsync()
// {
// Span<int> data = stackalloc int[100];
// await Task.Delay(1); // 编译错误
// _ = data.Length;
// }
// 正确:在同步代码中使用
public void ProcessSync()
{
Span<int> data = stackalloc int[100];
data.Fill(42);
ProcessSpan(data);
}
private void ProcessSpan(Span<int> data)
{
for (int i = 0; i < data.Length; i++)
{
data[i] = i * 2;
}
}
// 正确:在 async 方法中使用 Memory<T> 替代
public async Task ProcessAsync()
{
Memory<int> data = new int[100];
await Task.Delay(1);
ProcessMemory(data);
}
private void ProcessMemory(Memory<int> data)
{
Span<int> span = data.Span;
for (int i = 0; i < span.Length; i++)
{
span[i] = i * 2;
}
}
// 错误:Span 不能装箱
// object box = span; // 编译错误
// 错误:Span 不能用于泛型约束不匹配的场景
// List<Span<int>> list = new(); // 编译错误
// 错误:Span 不能捕获到 Lambda
// public void LambdaCapture()
// {
// Span<int> data = stackalloc int[10];
// Func<int> fn = () => data[0]; // 编译错误
// }
}Memory 与 Span 的选择策略
核心区别与使用场景
/// <summary>
/// Memory<T> vs Span<T> 详细对比
/// </summary>
public class MemoryVsSpan
{
/// <summary>
/// 何时使用 Span<T>
/// - 同步处理,短生命周期
/// - 方法参数传递
/// - 栈上临时计算
/// </summary>
public int SumWithSpan(ReadOnlySpan<int> data)
{
int sum = 0;
foreach (int item in data)
{
sum += item;
}
return sum;
}
/// <summary>
/// 何时使用 Memory<T>
/// - 需要跨 await
/// - 需要存储为字段
/// - 需要在队列/缓存中保存
/// </summary>
private Memory<byte> _buffer = new byte[4096];
public async Task<int> ReadFromNetworkAsync(Stream stream)
{
// Memory 可以跨 await 使用
int bytesRead = await stream.ReadAsync(_buffer);
return bytesRead;
}
/// <summary>
/// Memory 的所有权模型
/// </summary>
public void OwnershipModel()
{
// Memory 本身不拥有数据,只是引用
var array = new byte[1024];
Memory<byte> memory1 = array; // 引用同一个数组
Memory<byte> memory2 = memory1.Slice(0, 512); // 引用同一数组的一部分
// 通过 memory2 修改会影响 memory1
memory2.Span.Fill(0xAB);
// 使用 IMemoryOwner 明确所有权(见下文)
}
}IMemoryOwner 资源管理
using System.Buffers;
/// <summary>
/// IMemoryOwner<T> 实现明确的内存所有权管理
/// 谁拥有 IMemoryOwner,谁负责 Dispose
/// </summary>
public class MemoryOwnerDemo : IDisposable
{
private IMemoryOwner<byte>? _owner;
/// <summary>
/// 使用 MemoryPool 租用内存
/// </summary>
public void RentFromPool(int size)
{
_owner = MemoryPool<byte>.Shared.Rent(size);
Console.WriteLine($"租用大小: {_owner.Memory.Length}"); // 可能大于请求的 size
}
/// <summary>
/// 处理租用的内存
/// </summary>
public void ProcessData()
{
if (_owner == null) return;
// 获取 Span 进行操作
Span<byte> span = _owner.Memory.Span;
for (int i = 0; i < span.Length; i++)
{
span[i] = (byte)(i % 256);
}
}
public void Dispose()
{
// 归还内存到池
_owner?.Dispose();
_owner = null;
}
}
/// <summary>
/// 使用 IMemoryOwner 的工厂模式
/// </summary>
public static class DataProcessor
{
public static IMemoryOwner<byte> Process(ReadOnlySpan<byte> input)
{
// 租用输出缓冲区
var owner = MemoryPool<byte>.Shared.Rent(input.Length);
var output = owner.Memory.Span;
// 处理数据(例如:简单翻转)
input.CopyTo(output);
output.Reverse();
return owner; // 所有权转移给调用者
}
}
/// <summary>
/// 使用示例
/// </summary>
public class OwnerUsageExample
{
public void Run()
{
ReadOnlySpan<byte> input = stackalloc byte[] { 1, 2, 3, 4, 5 };
using (IMemoryOwner<byte> result = DataProcessor.Process(input))
{
// 使用结果
ReadOnlySpan<byte> data = result.Memory.Span;
Console.WriteLine($"处理结果: {data[0]}, {data[^1]}"); // 5, 1
} // 自动 Dispose,归还内存
}
}ArrayPool 深入
ArrayPool 高级用法
using System.Buffers;
/// <summary>
/// ArrayPool<T> 的高级使用模式
/// </summary>
public class ArrayPoolAdvanced
{
/// <summary>
/// 基本租借与归还
/// </summary>
public void BasicUsage()
{
int[] buffer = ArrayPool<int>.Shared.Rent(1024);
try
{
// 注意:Rent 返回的数组可能大于请求的大小
Console.WriteLine($"请求 1024, 实际: {buffer.Length}");
// 只使用你需要的部分
var span = buffer.AsSpan(0, 1024);
span.Fill(42);
ProcessData(span);
}
finally
{
// 归还时可以清除数据(防止信息泄露)
ArrayPool<int>.Shared.Return(buffer, clearArray: true);
}
}
/// <summary>
/// 使用 MemoryOwner 简化 ArrayPool 管理
/// </summary>
public void WithMemoryOwner()
{
using var owner = MemoryPool<byte>.Shared.Rent(4096);
Span<byte> buffer = owner.Memory.Span;
// 使用 buffer...
buffer.Fill(0xFF);
// 自动归还,无需手动 Return
}
/// <summary>
/// 自定义 ArrayPool(控制最大数组大小)
/// </summary>
public void CustomPool()
{
// maxArrayLength: 最大数组长度(2的幂,1-30 对应 2^1 到 2^30)
// maxArraysPerBucket: 每个桶中最大缓存数组数
var pool = ArrayPool<byte>.Create(
maxArrayLength: 1024 * 1024, // 1MB
maxArraysPerBucket: 50
);
byte[] buffer = pool.Rent(512);
try
{
// 使用 buffer
}
finally
{
pool.Return(buffer);
}
}
private void ProcessData(Span<int> data) { }
}
/// <summary>
/// 高频场景:网络包处理中的 ArrayPool 使用
/// </summary>
public class NetworkPacketProcessor
{
private readonly ArrayPool<byte> _pool = ArrayPool<byte>.Shared;
public async Task ProcessStreamAsync(Stream stream, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
byte[] buffer = _pool.Rent(8192);
try
{
int bytesRead = await stream.ReadAsync(buffer, 0, 8192, ct);
if (bytesRead == 0) break;
// 只处理实际读取的字节
ProcessPacket(buffer.AsSpan(0, bytesRead));
}
finally
{
_pool.Return(buffer, clearArray: true);
}
}
}
private void ProcessPacket(Span<byte> packet)
{
// 解析网络包...
}
}流水线式数据处理(Pipelined Processing)
使用 Pipe 实现零拷贝流水线
using System.IO.Pipelines;
/// <summary>
/// 使用 System.IO.Pipelines 实现高性能流水线处理
/// </.combine Pipe + Span 实现零拷贝解析
/// </summary>
public class PipelinedProcessor
{
/// <summary>
/// 从流中读取并流水线处理
/// </summary>
public async Task ProcessStreamAsync(Stream stream, CancellationToken ct)
{
var pipe = new Pipe();
// 生产者任务:从流填充 Pipe
Task fillTask = FillPipeAsync(stream, pipe.Writer, ct);
// 消费者任务:从 Pipe 读取并处理
Task consumeTask = ConsumePipeAsync(pipe.Reader, ct);
await Task.WhenAll(fillTask, consumeTask);
}
private async Task FillPipeAsync(Stream stream, PipeWriter writer, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
// 从 writer 获取至少 512 字节的 Span
Memory<byte> memory = writer.GetMemory(512);
try
{
int bytesRead = await stream.ReadAsync(memory, ct);
if (bytesRead == 0) break;
// 告知 PipeWriter 已写入的数据量
writer.Advance(bytesRead);
}
catch (Exception ex)
{
Console.WriteLine($"读取异常: {ex.Message}");
break;
}
// 刷新,让消费者可以读取
FlushResult result = await writer.FlushAsync(ct);
if (result.IsCompleted) break;
}
await writer.CompleteAsync();
}
private async Task ConsumePipeAsync(PipeReader reader, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
ReadResult result = await reader.ReadAsync(ct);
ReadOnlySequence<byte> buffer = result.Buffer;
// 尝试解析完整的消息
SequencePosition consumed = ParseMessages(buffer);
// 标记已消费的部分
reader.AdvanceTo(consumed, buffer.End);
if (result.IsCompleted) break;
}
await reader.CompleteAsync();
}
private SequencePosition ParseMessages(ReadOnlySequence<byte> buffer)
{
SequenceReader<byte> reader = new SequenceReader<byte>(buffer);
while (!reader.End)
{
// 假设消息格式:[4字节长度][N字节数据]
if (reader.Remaining < 4) break;
// 读取长度(不移动位置)
if (!reader.TryReadLittleEndian(out int length)) break;
if (reader.Remaining < length)
{
reader.Rewind(4); // 长度不够,回退
break;
}
// 读取消息体
ReadOnlySpan<byte> messageBody = stackalloc byte[length]; // 小消息用栈
if (length <= 256)
{
Span<byte> temp = stackalloc byte[length];
reader.TryCopyTo(temp);
ProcessMessage(temp);
}
else
{
// 大消息用池
byte[] rented = ArrayPool<byte>.Shared.Rent(length);
try
{
Span<byte> temp = rented.AsSpan(0, length);
reader.TryCopyTo(temp);
ProcessMessage(temp);
}
finally
{
ArrayPool<byte>.Shared.Return(rented);
}
}
}
return reader.Position;
}
private void ProcessMessage(ReadOnlySpan<byte> message)
{
// 处理单条消息
}
}零拷贝解析实战
二进制协议解析
using System.Buffers.Binary;
/// <summary>
/// 自定义二进制协议解析器
/// 协议格式:
/// [Magic: 2B][Version: 1B][Flags: 1B][PayloadLen: 4B][Payload: NB][CRC32: 4B]
/// </summary>
public ref struct BinaryProtocolParser
{
private ReadOnlySpan<byte> _buffer;
public BinaryProtocolParser(ReadOnlySpan<byte> buffer)
{
_buffer = buffer;
}
public bool IsValid => _buffer.Length >= 12; // 最小头部大小
public ushort Magic => BinaryPrimitives.ReadUInt16BigEndian(_buffer);
public byte Version => _buffer[2];
public byte Flags => _buffer[3];
public int PayloadLength => BinaryPrimitives.ReadInt32BigEndian(_buffer.Slice(4, 4));
public ReadOnlySpan<byte> Payload => _buffer.Slice(8, PayloadLength);
public uint Crc32 => BinaryPrimitives.ReadUInt32BigEndian(_buffer.Slice(8 + PayloadLength, 4));
/// <summary>
/// 验证 CRC
/// </summary>
public bool ValidateChecksum()
{
// 计算 0 到 Payload 末尾的 CRC
uint computed = ComputeCrc32(_buffer.Slice(0, 8 + PayloadLength));
return computed == Crc32;
}
/// <summary>
/// 获取完整消息所需的字节数(用于粘包处理)
/// </summary>
public static int GetMessageLength(ReadOnlySpan<byte> header)
{
if (header.Length < 8) return -1; // 头部不完整
return 12 + BinaryPrimitives.ReadInt32BigEndian(header.Slice(4, 4));
}
private static uint ComputeCrc32(ReadOnlySpan<byte> data)
{
// 简化的 CRC32 实现
uint crc = 0xFFFFFFFF;
foreach (byte b in data)
{
crc ^= b;
for (int i = 0; i < 8; i++)
{
crc = (crc >> 1) ^ (crc & 1) * 0xEDB88320;
}
}
return ~crc;
}
}
/// <summary>
/// 二进制协议解析使用示例
/// </summary>
public class ProtocolUsage
{
public void ParseIncomingData(ReadOnlySpan<byte> data)
{
while (data.Length > 0)
{
int requiredLength = BinaryProtocolParser.GetMessageLength(data);
if (requiredLength < 0 || data.Length < requiredLength)
break; // 数据不完整,等待更多数据
var parser = new BinaryProtocolParser(data.Slice(0, requiredLength));
if (parser.Magic != 0xABCD)
{
Console.WriteLine("无效 Magic");
break;
}
if (!parser.ValidateChecksum())
{
Console.WriteLine("CRC 校验失败");
break;
}
// 处理 payload
HandlePayload(parser.Version, parser.Flags, parser.Payload);
// 移动到下一条消息
data = data.Slice(requiredLength);
}
}
private void HandlePayload(byte version, byte flags, ReadOnlySpan<byte> payload) { }
}零拷贝文本解析
/// <summary>
/// 使用 Span 进行零拷贝 CSV 解析
/// </summary>
public ref struct CsvSpanParser
{
private ReadOnlySpan<char> _line;
private readonly char _separator;
public CsvSpanParser(ReadOnlySpan<char> line, char separator = ',')
{
_line = line;
_separator = separator;
}
public bool HasMore => !_line.IsEmpty;
public ReadOnlySpan<char> ReadNext()
{
if (_line.IsEmpty) return ReadOnlySpan<char>.Empty;
int sepIndex = _line.IndexOf(_separator);
if (sepIndex < 0)
{
// 最后一个字段
var field = _line;
_line = ReadOnlySpan<char>.Empty;
return Unescape(field);
}
var result = _line.Slice(0, sepIndex);
_line = _line.Slice(sepIndex + 1);
return Unescape(result);
}
private ReadOnlySpan<char> Unescape(ReadOnlySpan<char> field)
{
// 处理引号包裹的字段
if (field.Length >= 2 && field[0] == '"' && field[^1] == '"')
{
return field.Slice(1, field.Length - 2);
}
return field;
}
}
/// <summary>
/// CSV 解析示例
/// </summary>
public class CsvParserExample
{
public void ParseCsv(ReadOnlySpan<char> csvData)
{
int lineStart = 0;
while (lineStart < csvData.Length)
{
int lineEnd = csvData.Slice(lineStart).IndexOf('\n');
if (lineEnd < 0) lineEnd = csvData.Length - lineStart;
var line = csvData.Slice(lineStart, lineEnd).TrimEnd('\r');
if (!line.IsEmpty)
{
ProcessLine(line);
}
lineStart += lineEnd + 1;
}
}
private void ProcessLine(ReadOnlySpan<char> line)
{
var parser = new CsvSpanParser(line);
int col = 0;
while (parser.HasMore)
{
ReadOnlySpan<char> field = parser.ReadNext();
// 零拷贝:field 直接引用原始字符串,没有新分配
ProcessField(col, field);
col++;
}
}
private void ProcessField(int column, ReadOnlySpan<char> value) { }
}性能基准测试
BenchmarkDotNet 示例
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
/// <summary>
/// Span vs 传统方式的性能对比
/// </summary>
[MemoryDiagnoser]
public class SpanBenchmarks
{
private byte[] _data = null!;
private string _text = null!;
[GlobalSetup]
public void Setup()
{
_data = new byte[1024 * 1024]; // 1MB
Random.Shared.NextBytes(_data);
_text = new string('a', 1_000_000);
}
// 场景1:数组求和
[Benchmark(Baseline = true)]
public int SumWithArray()
{
int sum = 0;
for (int i = 0; i < _data.Length; i++)
{
sum += _data[i];
}
return sum;
}
[Benchmark]
public int SumWithSpan()
{
int sum = 0;
Span<byte> span = _data.AsSpan();
for (int i = 0; i < span.Length; i++)
{
sum += span[i];
}
return sum;
}
// 场景2:字符串分割
[Benchmark(Baseline = true)]
public int CountWordsWithString()
{
return _text.Split(' ').Length;
}
[Benchmark]
public int CountWordsWithSpan()
{
int count = 0;
ReadOnlySpan<char> span = _text.AsSpan();
foreach (var range in span.Split(' '))
{
count++;
}
return count;
}
// 场景3:字节数组复制
[Benchmark(Baseline = true)]
public byte[] CopyWithArray()
{
var copy = new byte[_data.Length];
Array.Copy(_data, copy, _data.Length);
return copy;
}
[Benchmark]
public byte[] CopyWithSpan()
{
var copy = new byte[_data.Length];
_data.AsSpan().CopyTo(copy);
return copy;
}
}
/// <summary>
/// 内存池 vs 直接分配对比
/// </summary>
[MemoryDiagnoser]
public class PoolBenchmarks
{
[Benchmark(Baseline = true)]
public void DirectAllocation()
{
for (int i = 0; i < 1000; i++)
{
var buffer = new byte[8192];
buffer[0] = 1; // 模拟使用
}
}
[Benchmark]
public void ArrayPoolRent()
{
for (int i = 0; i < 1000; i++)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(8192);
buffer[0] = 1;
ArrayPool<byte>.Shared.Return(buffer);
}
}
}高级 stackalloc 技巧
动态栈分配与安全模式
using System.Runtime.CompilerServices;
/// <summary>
/// 高级 stackalloc 使用模式
/// </summary>
public class AdvancedStackalloc
{
/// <summary>
/// 模式1:条件选择栈或堆分配
/// </summary>
public void ConditionalAlloc(int size)
{
// 小数据用栈,大数据用堆
if (size <= 256)
{
Span<byte> buffer = stackalloc byte[size];
ProcessSmall(buffer);
}
else
{
Span<byte> buffer = new byte[size];
ProcessLarge(buffer);
}
}
/// <summary>
/// 模式2:使用stackalloc初始化的技巧
/// </summary>
public void InitPatterns()
{
// C# 7.2+ 支持在表达式中使用 stackalloc
ReadOnlySpan<int> numbers = stackalloc int[] { 1, 2, 3, 4, 5 };
// 在 Span 方法调用中直接使用
int index = stackalloc byte[128].IndexOf((byte)0);
// 利用 Span 构建临时查找表
Span<int> lookup = stackalloc int[256];
lookup.Fill(-1);
for (int i = 0; i < 256; i++)
{
lookup[i] = i * 2;
}
}
/// <summary>
/// 模式3:避免栈溢出的安全封装
/// </summary>
public static Span<byte> SafeBuffer(int size)
{
if (size <= 0)
throw new ArgumentOutOfRangeException(nameof(size));
// 默认最大栈分配 1MB
const int MaxStackSize = 1024 * 1024;
if (size <= MaxStackSize)
{
return Span<byte>.Create(ref Unsafe.AsRef<byte>(
(void*)System.Runtime.InteropServices.NativeMemory.AllocStack((nuint)size)),
size);
}
// 超过限制,使用 ArrayPool
return ArrayPool<byte>.Shared.Rent(size);
}
/// <summary>
/// 模式4:递归中使用 stackalloc 的注意事项
/// </summary>
public int RecursiveProcess(ReadOnlySpan<int> data, int depth = 0)
{
if (data.IsEmpty || depth > 100) return 0;
// 每次递归分配少量栈空间
Span<int> temp = stackalloc int[16]; // 固定小尺寸,避免栈溢出
int mid = data.Length / 2;
return data[mid]
+ RecursiveProcess(data.Slice(0, mid), depth + 1)
+ RecursiveProcess(data.Slice(mid + 1), depth + 1);
}
private void ProcessSmall(Span<byte> buffer) { }
private void ProcessLarge(Span<byte> buffer) { }
}使用 MemoryMarshal 进行高级操作
using System.Runtime.InteropServices;
/// <summary>
/// MemoryMarshal 高级操作
/// </summary>
public class MemoryMarshalAdvanced
{
/// <summary>
/// 类型重新解释(Cast)
/// </summary>
public void CastBetweenTypes()
{
// byte[] -> int[] 的零拷贝视图
Span<byte> bytes = stackalloc byte[16];
for (int i = 0; i < bytes.Length; i++) bytes[i] = (byte)i;
// 将 byte 视图转为 int 视图
Span<int> ints = MemoryMarshal.Cast<byte, int>(bytes);
Console.WriteLine($"16 bytes = {ints.Length} ints"); // 4
// ints 和 bytes 引用同一块内存
ints[0] = 0x01020304;
// bytes[0..4] 现在是小端序的 0x01020304
}
/// <summary>
/// 获取数组的引用
/// </summary>
public void GetArrayDataReference()
{
int[] array = { 10, 20, 30, 40, 50 };
// 获取第一个元素的引用(即使数组为空也安全)
ref int reference = ref MemoryMarshal.GetArrayDataReference(array);
Console.WriteLine(reference); // 10
// 通过引用修改
reference = 100;
Console.WriteLine(array[0]); // 100
}
/// <summary>
/// 创建只读引用
/// </summary>
public void CreateReadOnlyFromNullable()
{
string? str = "Hello, Span!";
if (str != null)
{
// 从字符串获取只读 span
ReadOnlySpan<char> span = MemoryMarshal.AsReadOnlySpan(str);
Console.WriteLine(span.Length);
}
}
/// <summary>
/// 将 struct 视为 byte Span
/// </summary>
[StructLayout(LayoutKind.Sequential, Pack = 1)]
public struct NetworkHeader
{
public ushort SourcePort;
public ushort DestPort;
public uint SequenceNumber;
public uint AckNumber;
}
public void StructToBytes()
{
var header = new NetworkHeader
{
SourcePort = 8080,
DestPort = 443,
SequenceNumber = 1000,
AckNumber = 2000
};
// 将 struct 转为 byte Span
ReadOnlySpan<byte> bytes = MemoryMarshal.AsBytes(
MemoryMarshal.CreateReadOnlySpan(ref header, 1));
Console.WriteLine($"Header 大小: {bytes.Length} bytes"); // 12
// 反向:从 byte Span 读取 struct
ref NetworkHeader parsed = ref MemoryMarshal.Cast<byte, NetworkHeader>(
MemoryMarshal.AsBytes(MemoryMarshal.CreateReadOnlySpan(ref header, 1)))[0];
}
}何时不应使用 Span
Span 的局限与替代方案
/// <summary>
/// Span 不适用的场景及替代方案
/// </summary>
public class SpanLimitations
{
/// <summary>
/// 场景1:需要跨 async 边界传递
/// 使用 Memory<T> 替代
/// </summary>
public async Task<byte[]> AsyncProcess(Stream stream)
{
// 错误:Span 不能跨 await
// Span<byte> buffer = stackalloc byte[1024];
// 正确:使用 Memory<T>
Memory<byte> buffer = new byte[1024];
int read = await stream.ReadAsync(buffer);
// 在同步部分可以使用 Span
ProcessBuffer(buffer.Span.Slice(0, read));
return buffer.Span.Slice(0, read).ToArray();
}
/// <summary>
/// 场景2:需要存储在集合中
/// 使用 Memory<T> 或 ReadOnlyMemory<T>
/// </summary>
public void StoreInCollection()
{
// 错误:List<Span<byte>> 不合法
// 正确:使用 Memory
var chunks = new List<Memory<byte>>();
byte[] data = new byte[1024];
chunks.Add(data.AsMemory(0, 256));
chunks.Add(data.AsMemory(256, 256));
chunks.Add(data.AsMemory(512, 512));
}
/// <summary>
/// 场景3:需要作为接口返回值
/// 使用 Memory<T> 或 IMemoryOwner<T>
/// </summary>
public interface IBufferProvider
{
// 不行:Span 不能作为接口方法的返回值(某些场景可 ref return)
// Span<byte> GetBuffer();
// 正确
Memory<byte> GetBuffer();
IMemoryOwner<byte> RentBuffer(int size);
}
/// <summary>
/// 场景4:超大数据处理
/// 使用 ReadOnlySequence<T> 替代
/// </summary>
public void LargeDataProcessing()
{
// 当数据分散在多个缓冲区时,使用 ReadOnlySequence
var segments = new List<byte[]>
{
new byte[1024],
new byte[2048],
new byte[1024]
};
// 构建连续视图
var sequence = new ReadOnlySequence<byte>(
new FirstSegment(segments[0]),
0,
new LastSegment(segments[^1]),
segments[^1].Length
);
// 可以像操作单个 Span 一样操作
long totalLength = sequence.Length;
}
private void ProcessBuffer(Span<byte> buffer) { }
}
/// <summary>
/// 辅助类:ReadOnlySequence 的 Segment
/// </summary>
internal class FirstSegment : ReadOnlySequenceSegment<byte>
{
public FirstSegment(byte[] data)
{
Memory = data;
RunningIndex = 0;
}
}
internal class LastSegment : ReadOnlySequenceSegment<byte>
{
public LastSegment(byte[] data)
{
Memory = data;
}
}实战:高性能消息解析器
完整示例
using System.Buffers;
using System.IO.Pipelines;
/// <summary>
/// 完整的高性能消息解析器示例
/// 整合 ArrayPool + Pipe + Span
/// </summary>
public class HighPerformanceMessageParser : IAsyncDisposable
{
private readonly Pipe _pipe = new(new PipeOptions(
pauseWriterThreshold: 1_000_000,
resumeWriterThreshold: 500_000,
minimumSegmentSize: 8192
));
private int _messageCount;
public async Task ReadFromStreamAsync(Stream stream, CancellationToken ct)
{
var writer = _pipe.Writer;
while (!ct.IsCancellationRequested)
{
Memory<byte> memory = writer.GetMemory(8192);
int bytesRead = await stream.ReadAsync(memory, ct);
if (bytesRead == 0) break;
writer.Advance(bytesRead);
FlushResult flushResult = await writer.FlushAsync(ct);
if (flushResult.IsCompleted) break;
}
await writer.CompleteAsync();
}
public async Task ProcessMessagesAsync(
Func<ReadOnlyMemory<byte>, CancellationToken, ValueTask> handler,
CancellationToken ct)
{
var reader = _pipe.Reader;
while (!ct.IsCancellationRequested)
{
ReadResult result = await reader.ReadAsync(ct);
ReadOnlySequence<byte> buffer = result.Buffer;
SequencePosition consumed = await ProcessBufferAsync(buffer, handler, ct);
reader.AdvanceTo(consumed, buffer.End);
if (result.IsCompleted) break;
}
await reader.CompleteAsync();
}
private async Task<SequencePosition> ProcessBufferAsync(
ReadOnlySequence<byte> buffer,
Func<ReadOnlyMemory<byte>, CancellationToken, ValueTask> handler,
CancellationToken ct)
{
SequenceReader<byte> reader = new SequenceReader<byte>(buffer);
while (!reader.End)
{
if (reader.Remaining < 4)
break;
if (!reader.TryReadLittleEndian(out int length) || length <= 0)
continue;
if (reader.Remaining < length)
{
reader.Rewind(4);
break;
}
// 使用 ArrayPool 处理大消息
if (length <= 256)
{
byte[] rented = ArrayPool<byte>.Shared.Rent(length);
try
{
reader.TryCopyTo(rented);
reader.Advance(length);
await handler(rented.AsMemory(0, length), ct);
}
finally
{
ArrayPool<byte>.Shared.Return(rented);
}
}
else
{
// 小消息直接从 Sequence 中切片
var slice = buffer.Slice(reader.Position, length);
byte[] message = slice.ToArray();
reader.Advance(length);
await handler(message, ct);
}
Interlocked.Increment(ref _messageCount);
}
return reader.Position;
}
public int MessageCount => Volatile.Read(ref _messageCount);
public async ValueTask DisposeAsync()
{
_pipe.Reader.Complete();
_pipe.Writer.Complete();
}
}性能注意事项
- 栈分配限制:
stackalloc默认线程栈大小约 1MB(Windows),避免在递归或循环中大量使用 - ArrayPool 泄漏:忘记 Return 会导致池耗尽,务必使用 try/finally 或 using 模式
- Span 边界检查:Release 模式 JIT 可能消除部分检查,但复杂索引仍会产生开销
- Memory 转 Span:
memory.Span有获取锁开销,在热路径中应缓存结果 - Pipe 反压:设置合理的 pauseWriterThreshold,避免生产者过度填充导致内存膨胀
- ReadOnlySequence 切片:多段 Sequence 的 Slice 比单段慢,尽量保持单段
总结
Span 和 Memory 的高级用法核心在于:理解内存所有权、选择合适的池化策略、避免不必要的分配。在流水线处理和二进制解析场景中,System.IO.Pipelines + Span + ArrayPool 的组合可以提供接近 C 语言级别的性能,同时保持 C# 的安全性。
关键知识点
- Span 是 ref struct,只能在栈上,不能跨 async/存储
- Memory 可以跨 async,但 Span.Memory 属性访问有微小开销
- ArrayPool.Rent 返回的数组可能比请求的大
- IMemoryOwner 是明确内存所有权的推荐模式
- System.IO.Pipelines 提供了生产者-消费者模式的零拷贝管道
- MemoryMarshal.Cast 允许零拷贝类型转换,但要注意对齐和平台字节序
- ReadOnlySequence 用于处理分散在多个缓冲区的数据
常见误区
| 误区 | 正确理解 |
|---|---|
| Span 可以替代所有数组操作 | Span 适用于短生命周期的热路径,长期持有数据用 Memory |
| ArrayPool.Rent 恰好返回请求的大小 | 实际返回的是 2 的幂次大小,可能更大 |
| stackalloc 可以分配任意大小 | 受线程栈大小限制(默认约 1MB),过大导致 StackOverflow |
| Memory.Span 属性无开销 | Memory.Span 内部有同步访问开销,热路径应缓存 |
| Span 比数组索引更快 | 在简单遍历中差异很小,优势在于避免分配 |
| ReadOnlySequence 等同于 Span | Sequence 可能跨多个段,访问模式不同 |
进阶路线
- 入门阶段:掌握 Span 基本操作、创建方式和安全约束
- 进阶阶段:理解 ArrayPool/MemoryPool 池化、IMemoryOwner 所有权
- 高级阶段:使用 System.IO.Pipelines 构建流水线、二进制协议解析
- 专家阶段:MemoryMarshal 底层操作、Unsafe 高性能技巧、自定义 Pipe 调度
- 极致优化:SIMD + Span 向量化处理、NativeMemory 与 Span 结合
适用场景
- 高频网络包解析和处理
- 二进制协议(Protobuf、MessagePack 等)的底层序列化
- 实时数据流处理(股票行情、IoT 传感器)
- 大文件分块读写
- 图像像素处理
- 文本解析(CSV、JSON、日志文件)
- 游戏引擎中的内存管理
落地建议
- 先在关键热路径中引入 Span,不要全面重构
- ArrayPool 使用必须配合 try/finally 或 using,确保归还
- 使用 BenchmarkDotNet 验证优化效果,避免过早优化
- 在 CI 中加入 Span 安全规则(如禁止在 async 方法中使用 stackalloc)
- 封装内部使用 Span 的公共 API,对外暴露 Memory 或数组
排错清单
复盘问题
- 你的项目中哪些场景可以受益于 Span/Memory 的零拷贝特性?
- 是否存在 ArrayPool 租借未归还的情况?如何系统性地防止?
- 在 async/await 为主的代码中,如何优雅地结合 Span 和 Memory?
- Pipe 的反压机制如何与上游生产者协调?
- 如何在不牺牲安全性的前提下最大化 Span 的性能优势?
