Python Performance Profiling and Optimization
Introduction
Python performance optimization is the process of improving execution speed, without sacrificing readability or maintainability, by profiling bottlenecks, choosing better algorithms and data structures, and leveraging built-in functions and C extensions. Because of its dynamic typing and interpreted execution, pure Python typically runs at roughly 1/50 to 1/10 the speed of C. In most business applications, however, the bottleneck is concentrated in a handful of hot functions, and optimizing just those yields significant gains.
The core principle is: measure first, then optimize. Premature optimization is the root of all evil, and guessing at bottlenecks usually guesses wrong. Pinpoint hotspots precisely with tools such as cProfile, line_profiler, and memory_profiler, then optimize them specifically for the best return on effort.
Common Python performance problems include inefficient loops and algorithms, poor data-structure choices, excessive function-call overhead, the GIL preventing multithreaded code from using multiple cores, and memory leaks that trigger frequent GC. Each has a corresponding diagnostic tool and remedy.
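As a minimal illustration of the "measure first" principle, the standard-library `timeit` module gives quick, repeatable micro-benchmarks. This is a sketch with illustrative function names (`slow_lookup`, `fast_lookup` are not from any library):

```python
import timeit

def slow_lookup(values, haystack_list):
    # O(n) membership test per value
    return [v for v in values if v in haystack_list]

def fast_lookup(values, haystack_set):
    # O(1) average-case membership test per value
    return [v for v in values if v in haystack_set]

haystack_list = list(range(10000))
haystack_set = set(haystack_list)
values = list(range(5000, 5100))

# repeat() returns several timings; take the minimum as the least-noisy estimate
t_list = min(timeit.repeat(lambda: slow_lookup(values, haystack_list),
                           number=50, repeat=3))
t_set = min(timeit.repeat(lambda: fast_lookup(values, haystack_set),
                          number=50, repeat=3))
print(f"list: {t_list:.4f}s  set: {t_set:.4f}s")
```

Only after a measurement like this confirms where the time goes is it worth changing the code.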
Features
cProfile Profiling
Basic profiling
import cProfile
import pstats
import io
import time

def slow_function():
    """Simulate a slow function"""
    total = 0
    for i in range(1000000):
        total += i
    return total

def fast_function():
    """Optimized version"""
    return sum(range(1000000))

def mixed_workload():
    """Simulate a mixed workload"""
    # String concatenation (slow)
    result = ""
    for i in range(10000):
        result += str(i)
    # List comprehension (fast)
    squares = [x ** 2 for x in range(10000)]
    # Dict operations
    lookup = {i: i * 2 for i in range(10000)}
    values = [lookup.get(i, 0) for i in range(5000)]
    return result, squares, values

def main():
    slow_function()
    fast_function()
    mixed_workload()

# Option 1: profile from the command line
# python -m cProfile -s cumulative my_script.py

# Option 2: profile in code
pr = cProfile.Profile()
pr.enable()
main()
pr.disable()

# Format the output
s = io.StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats(20)  # show the top 20 functions
print(s.getvalue())

# Option 3: save the profile to disk
pr.dump_stats('profile_output.prof')
# Visualize with snakeviz: pip install snakeviz && snakeviz profile_output.prof

# Option 4: as a decorator
def profile_func(func):
    """Profiling decorator"""
    def wrapper(*args, **kwargs):
        pr = cProfile.Profile()
        pr.enable()
        result = func(*args, **kwargs)
        pr.disable()
        ps = pstats.Stats(pr).sort_stats('cumulative')
        print(f"\n=== Profile of {func.__name__} ===")
        ps.print_stats(10)
        return result
    return wrapper

@profile_func
def my_heavy_function():
    return [x ** 2 for x in range(100000)]

Reading the profile output
"""
cProfile 输出解读:
1000004 function calls in 0.134 seconds
Ordered by: cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 0.134 0.134 script.py:1(main)
1 0.041 0.041 0.092 0.092 script.py:5(slow_function)
1 0.051 0.051 0.051 0.051 script.py:10(fast_function)
1 0.015 0.015 0.041 0.041 script.py:13(mixed_workload)
字段说明:
- ncalls: 调用次数
- tottime: 函数自身执行时间(不含子函数)
- percall: tottime / ncalls
- cumtime: 函数累计执行时间(含子函数)
- percall: cumtime / ncalls
优化方向:
1. 找 tottime 最高的函数 -> 这是 CPU 热点
2. 找 ncalls 异常高的函数 -> 可能有冗余调用
3. 找 cumtime 高但 tottime 低的函数 -> 子函数是瓶颈
"""line_profiler 逐行剖析
# pip install line_profiler
# Line-by-line profiling: finer-grained than cProfile — shows the time spent on each line
from line_profiler import LineProfiler

def process_data(data: list) -> dict:
    """Data-processing function to be optimized"""
    result = {}
    # Step 1: filter
    filtered = []
    for item in data:
        if item['value'] > 100:  # potential hot line
            filtered.append(item)
    # Step 2: group and aggregate
    for item in filtered:
        category = item['category']
        if category not in result:
            result[category] = {'count': 0, 'total': 0}
        result[category]['count'] += 1
        result[category]['total'] += item['value']
    # Step 3: compute averages
    for cat in result:
        result[cat]['average'] = (
            result[cat]['total'] / result[cat]['count']
        )
    return result

# Use line_profiler
lp = LineProfiler()
lp_wrapper = lp(process_data)

# Prepare test data
import random
test_data = [
    {'value': random.randint(50, 200), 'category': random.choice(['A', 'B', 'C'])}
    for _ in range(100000)
]
lp_wrapper(test_data)
lp.print_stats()
"""
输出示例:
Timer unit: 1e-06 s
Total time: 0.0523 s
File: script.py
Function: process_data at line 5
Line # Hits Time Per Hit % Time Line Contents
==============================================================
5 def process_data(data):
6 1 2 2.0 0.0 result = {}
8 1 1 1.0 0.0 filtered = []
9 100001 45000 0.5 86.1 for item in data:
10 100001 15000 0.2 28.7 if item['value'] > 100:
11 50000 8000 0.2 15.3 filtered.append(item)
...
分析: 第9行的循环消耗了86%的时间,是优化重点
"""memory_profiler 内存剖析
# pip install memory_profiler
from memory_profiler import profile
from itertools import islice

@profile
def memory_intensive_function():
    """Example of a memory-intensive function"""
    # Large list
    data_list = [x * 2 for x in range(1000000)]
    print(f"List length: {len(data_list)}")
    # Convert to a set
    data_set = set(data_list)
    print(f"Set length: {len(data_set)}")
    # Convert to a dict
    data_dict = {x: x ** 2 for x in range(500000)}
    print(f"Dict length: {len(data_dict)}")
    # Generator (lazy — uses almost no memory)
    data_gen = (x * 2 for x in range(1000000))
    first_10 = list(islice(data_gen, 10))
    print(f"First 10 from generator: {first_10}")
    # Clean up
    del data_list
    del data_set
    return data_dict
# Run with: python -m memory_profiler script.py
"""
Sample output:
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     3     50.1 MiB     50.1 MiB           1   def memory_intensive_function():
     5     75.3 MiB     25.2 MiB           1       data_list = [x*2 for x in range(1000000)]
     8     95.7 MiB     20.4 MiB           1       data_set = set(data_list)
    11    120.2 MiB     24.5 MiB           1       data_dict = {x: x**2 for x in range(500000)}
    14    120.2 MiB      0.0 MiB           1       data_gen = (x*2 for x in range(1000000))
    19     95.4 MiB    -24.8 MiB           1       del data_list
"""
# Comparing memory snapshots
import tracemalloc

def compare_memory_usage():
    """Compare the memory use of different implementations"""
    tracemalloc.start()
    # Approach 1: list
    snapshot1 = tracemalloc.take_snapshot()
    data_list = list(range(1000000))
    snapshot2 = tracemalloc.take_snapshot()
    stats = snapshot2.compare_to(snapshot1, 'lineno')
    for stat in stats[:3]:
        print(stat)
    del data_list
    # Approach 2: generator
    snapshot3 = tracemalloc.take_snapshot()
    data_gen = (x for x in range(1000000))
    snapshot4 = tracemalloc.take_snapshot()
    stats = snapshot4.compare_to(snapshot3, 'lineno')
    for stat in stats[:3]:
        print(stat)

py-spy: Sampling Profiler
# py-spy: a sampling profiler that requires no code changes
# Safe for production use; overhead is minimal
"""
# Install
pip install py-spy

# 1. Attach to a running process
py-spy top --pid 12345

# 2. Generate a flame graph
py-spy record --pid 12345 -o flamegraph.svg --duration 60

# 3. Dump the call stacks
py-spy dump --pid 12345

# 4. Run a script under the profiler (flags go before the `--` separator)
py-spy record -o profile.svg -- python my_script.py

Reading a flame graph:
- X axis: share of samples (not chronological time)
- Y axis: call-stack depth
- Colors: random (no special meaning)
- Wide block = function burns more CPU = optimization target
"""

Algorithm and Data Structure Optimization
import time
from collections import defaultdict, Counter

class PerformanceOptimization:
    """Common optimization patterns"""

    # ============================================
    # 1. Choosing the right data structure
    # ============================================
    @staticmethod
    def lookup_comparison():
        """Lookup performance: list vs set vs dict"""
        data_size = 1000000
        data = list(range(data_size))
        data_set = set(data)
        data_dict = {x: x for x in data}
        lookup_values = list(range(data_size // 2, data_size // 2 + 10000))
        # List lookup: O(n)
        start = time.time()
        found = [v for v in lookup_values if v in data]
        list_time = time.time() - start
        # Set lookup: O(1)
        start = time.time()
        found = [v for v in lookup_values if v in data_set]
        set_time = time.time() - start
        # Dict lookup: O(1)
        start = time.time()
        found = [v for v in lookup_values if v in data_dict]
        dict_time = time.time() - start
        print(f"list lookup: {list_time:.4f}s")
        print(f"set lookup:  {set_time:.4f}s")
        print(f"dict lookup: {dict_time:.4f}s")
        print(f"set is {list_time/set_time:.0f}x faster than list")

    # ============================================
    # 2. Loop optimization
    # ============================================
    @staticmethod
    def loop_optimization():
        """Comparing loop styles"""
        data = list(range(100000))
        # Slow: Python loop + append
        start = time.time()
        result = []
        for x in data:
            result.append(x * 2 + 1)
        loop_time = time.time() - start
        # Fast: list comprehension
        start = time.time()
        result = [x * 2 + 1 for x in data]
        list_comp_time = time.time() - start
        # map + lambda (the per-element lambda call adds overhead,
        # so this does not necessarily beat the comprehension)
        start = time.time()
        result = list(map(lambda x: x * 2 + 1, data))
        map_time = time.time() - start
        # Fastest: numpy (vectorized)
        import numpy as np
        arr = np.array(data)
        start = time.time()
        result = arr * 2 + 1
        numpy_time = time.time() - start
        print(f"for loop:           {loop_time:.4f}s")
        print(f"list comprehension: {list_comp_time:.4f}s ({loop_time/list_comp_time:.1f}x)")
        print(f"map:                {map_time:.4f}s ({loop_time/map_time:.1f}x)")
        print(f"numpy:              {numpy_time:.6f}s ({loop_time/numpy_time:.1f}x)")

    # ============================================
    # 3. String concatenation
    # ============================================
    @staticmethod
    def string_concat():
        """String concatenation performance"""
        parts = [f"item_{i}" for i in range(100000)]
        # Slow: += (creates a new string on every iteration)
        start = time.time()
        result = ""
        for p in parts:
            result += p + ", "
        concat_time = time.time() - start
        # Fast: join
        start = time.time()
        result = ", ".join(parts)
        join_time = time.time() - start
        print(f"+= concat: {concat_time:.4f}s")
        print(f"join:      {join_time:.4f}s ({concat_time/join_time:.0f}x faster)")

    # ============================================
    # 4. Prefer built-in functions
    # ============================================
    @staticmethod
    def builtin_functions():
        """Built-ins vs hand-written implementations"""
        data = list(range(100000))
        # Sum: hand-written loop
        start = time.time()
        total = 0
        for x in data:
            total += x
        loop_sum_time = time.time() - start
        # Sum: sum()
        start = time.time()
        total = sum(data)
        sum_time = time.time() - start
        # Counting: Counter
        words = ["hello", "world", "hello", "python"] * 25000
        start = time.time()
        counts = {}
        for w in words:
            counts[w] = counts.get(w, 0) + 1
        manual_count_time = time.time() - start
        start = time.time()
        counts = Counter(words)
        counter_time = time.time() - start
        print(f"manual sum:   {loop_sum_time:.4f}s")
        print(f"sum():        {sum_time:.4f}s ({loop_sum_time/sum_time:.1f}x faster)")
        print(f"manual count: {manual_count_time:.4f}s")
        print(f"Counter:      {counter_time:.4f}s ({manual_count_time/counter_time:.1f}x faster)")

    # ============================================
    # 5. Dict operations
    # ============================================
    @staticmethod
    def dict_optimization():
        data = [('a', 1), ('b', 2), ('c', 3), ('a', 4), ('b', 5)]
        # Slow: manual grouping
        start = time.time()
        grouped = {}
        for key, val in data * 100000:
            if key not in grouped:
                grouped[key] = []
            grouped[key].append(val)
        manual_time = time.time() - start
        # Fast: defaultdict
        start = time.time()
        grouped = defaultdict(list)
        for key, val in data * 100000:
            grouped[key].append(val)
        dd_time = time.time() - start
        print(f"manual grouping: {manual_time:.4f}s")
        print(f"defaultdict:     {dd_time:.4f}s ({manual_time/dd_time:.1f}x faster)")
        # setdefault vs defaultdict
        d = {}
        # Slower
        d.setdefault('key', []).append('value')
        # Faster
        dd = defaultdict(list)
        dd['key'].append('value')

Cython Acceleration
# Cython: compiles Python code into a C extension
# 1. Create a .pyx file: fast_math.pyx
"""
# fast_math.pyx
# cython: language_level=3
import cython

# Typed fast sum
@cython.boundscheck(False)
@cython.wraparound(False)
def fast_sum(double[:] data):
    cdef long i
    cdef long n = data.shape[0]
    cdef double total = 0.0
    for i in range(n):
        total += data[i]
    return total

# Fibonacci at C-level speed
def fibonacci(int n):
    if n <= 1:
        return n
    cdef int a = 0, b = 1, temp
    cdef int i
    for i in range(2, n + 1):
        temp = a + b
        a = b
        b = temp
    return b

# Matrix multiplication
import numpy as np
cimport numpy as np

def matrix_multiply(
    np.ndarray[np.float64_t, ndim=2] a,
    np.ndarray[np.float64_t, ndim=2] b
):
    cdef int m = a.shape[0]
    cdef int n = b.shape[1]
    cdef int k = a.shape[1]
    cdef np.ndarray[np.float64_t, ndim=2] c = np.zeros((m, n))
    cdef int i, j, p
    cdef double s
    for i in range(m):
        for j in range(n):
            s = 0.0
            for p in range(k):
                s += a[i, p] * b[p, j]
            c[i, j] = s
    return c
"""
# 2. Build configuration: setup.py
"""
from setuptools import setup
from Cython.Build import cythonize
import numpy as np

setup(
    ext_modules=cythonize("fast_math.pyx"),
    include_dirs=[np.get_include()],
)
# Build with: python setup.py build_ext --inplace
"""
# 3. Use the compiled module
"""
import numpy as np
from fast_math import fast_sum, fibonacci, matrix_multiply

# 10-100x faster than pure Python
data = np.random.rand(10000000)
result = fast_sum(data)
print(f"Sum: {result}")
"""

numba JIT Compilation
# pip install numba
from numba import njit, prange
import numpy as np
import time

# 1. Basic JIT acceleration
@njit(fastmath=True)
def monte_carlo_pi(n: int) -> float:
    """Monte Carlo estimate of pi — JIT compiled"""
    count = 0
    for i in range(n):
        x = np.random.random()
        y = np.random.random()
        if x * x + y * y <= 1.0:
            count += 1
    return 4.0 * count / n

# The first call triggers compilation (slow); later calls are fast
start = time.time()
result = monte_carlo_pi(10_000_000)
print(f"Pi ≈ {result}, elapsed: {time.time() - start:.3f}s")

# 2. Parallel JIT
@njit(parallel=True)
def parallel_computation(data: np.ndarray) -> np.ndarray:
    """Parallel elementwise computation"""
    n = len(data)
    result = np.empty(n)
    for i in prange(n):
        result[i] = data[i] ** 2 + np.sin(data[i]) * np.cos(data[i])
    return result

data = np.random.rand(10_000_000)
start = time.time()
result = parallel_computation(data)
print(f"parallel computation: {time.time() - start:.3f}s")

# 3. Pure Python vs numba
def pure_python_sum(arr):
    """Pure-Python sum"""
    total = 0.0
    for x in arr:
        total += x
    return total

@njit
def numba_sum(arr):
    """numba-accelerated sum"""
    total = 0.0
    for x in arr:
        total += x
    return total

data = np.random.rand(10_000_000)
# Pure Python
start = time.time()
result = pure_python_sum(data)
python_time = time.time() - start
# numba (first call, includes compilation)
start = time.time()
result = numba_sum(data)
numba_first_time = time.time() - start
# numba (compiled and cached)
start = time.time()
result = numba_sum(data)
numba_cached_time = time.time() - start
print(f"pure Python:    {python_time:.4f}s")
print(f"numba (first):  {numba_first_time:.4f}s (includes compilation)")
print(f"numba (cached): {numba_cached_time:.6f}s")
print(f"speedup: {python_time/numba_cached_time:.0f}x")

asyncio Performance Optimization
import asyncio
import aiohttp
import time
from concurrent.futures import ThreadPoolExecutor

class AsyncPerformance:
    """asyncio optimization patterns"""

    @staticmethod
    async def fetch_urls(urls: list) -> list:
        """Concurrent HTTP requests — serial vs concurrent"""
        async with aiohttp.ClientSession() as session:
            # Option 1: serial (slow)
            async def fetch_one(url):
                async with session.get(url) as resp:
                    return await resp.text()
            # Option 2: fully concurrent (may trigger rate limits)
            # results = await asyncio.gather(*[fetch_one(u) for u in urls])
            # Option 3: bounded concurrency (recommended)
            semaphore = asyncio.Semaphore(10)
            async def fetch_with_limit(url):
                async with semaphore:
                    return await fetch_one(url)
            start = time.time()
            results = await asyncio.gather(
                *[fetch_with_limit(u) for u in urls],
                return_exceptions=True
            )
            elapsed = time.time() - start
            success = sum(1 for r in results if not isinstance(r, Exception))
            print(f"{success}/{len(urls)} requests completed in {elapsed:.2f}s")
            return results

    @staticmethod
    async def producer_consumer():
        """Producer-consumer pattern"""
        queue = asyncio.Queue(maxsize=100)

        async def producer(pid):
            for i in range(50):
                await queue.put(f"item-{pid}-{i}")
                await asyncio.sleep(0.001)
            print(f"producer {pid} done")

        async def consumer(cid):
            processed = 0
            while True:
                try:
                    item = await asyncio.wait_for(queue.get(), timeout=1.0)
                    # Simulate processing
                    await asyncio.sleep(0.002)
                    processed += 1
                    queue.task_done()
                except asyncio.TimeoutError:
                    break
            print(f"consumer {cid} processed {processed} items")

        # Start producers and consumers
        producers = [asyncio.create_task(producer(i)) for i in range(3)]
        consumers = [asyncio.create_task(consumer(i)) for i in range(5)]
        await asyncio.gather(*producers)
        await queue.join()
        for c in consumers:
            c.cancel()

    @staticmethod
    def cpu_bound_async():
        """CPU-bound work alongside asyncio"""
        import math

        def cpu_heavy(n):
            """CPU-intensive computation"""
            return sum(math.isqrt(i) for i in range(n))

        async def run_cpu_tasks():
            loop = asyncio.get_running_loop()
            executor = ThreadPoolExecutor(max_workers=4)
            # Off-load CPU work to a pool so the event loop stays responsive.
            # Note: pure-Python CPU work in threads is still GIL-bound; use a
            # ProcessPoolExecutor for true parallelism.
            tasks = [
                loop.run_in_executor(executor, cpu_heavy, 500000)
                for _ in range(4)
            ]
            start = time.time()
            results = await asyncio.gather(*tasks)
            elapsed = time.time() - start
            print(f"4 CPU tasks finished in {elapsed:.2f}s")
            return results

        asyncio.run(run_cpu_tasks())

The GIL: Impact and Workarounds
"""
Python GIL (Global Interpreter Lock)
- 保证同一时刻只有一个线程执行 Python 字节码
- 影响: CPU 密集型多线程无法利用多核
- 不影响: IO 密集型任务(线程在等待 IO 时释放 GIL)
应对策略:
1. CPU 密集 -> multiprocessing(多进程)
2. IO 密集 -> threading / asyncio
3. C 扩展 -> Cython 释放 GIL
4. numpy -> 底层 C 实现,不受 GIL 影响
"""
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
import math
def cpu_task(n: int) -> int:
"""CPU 密集任务"""
return sum(math.isqrt(i) for i in range(n))
def compare_parallel_strategies():
"""对比并行策略"""
task_size = 1000000
task_count = 4
tasks = [task_size] * task_count
# 1. 串行
start = time.time()
results = [cpu_task(n) for n in tasks]
serial_time = time.time() - start
# 2. 多线程(受 GIL 限制,可能更慢)
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
results = list(pool.map(cpu_task, tasks))
thread_time = time.time() - start
# 3. 多进程(真正并行)
start = time.time()
with ProcessPoolExecutor(max_workers=4) as pool:
results = list(pool.map(cpu_task, tasks))
process_time = time.time() - start
print(f"串行: {serial_time:.2f}s")
print(f"多线程: {thread_time:.2f}s (受 GIL 限制)")
print(f"多进程: {process_time:.2f}s (绕过 GIL)")
print(f"多进程加速比: {serial_time/process_time:.1f}x")内存优化技巧
import sys
import array
from dataclasses import dataclass

class MemoryOptimization:
    """Memory optimization techniques"""

    @staticmethod
    def compare_memory_usage():
        """Compare the memory footprint of different containers"""
        n = 1000000
        # 1. list vs array
        int_list = list(range(n))
        int_array = array.array('i', range(n))
        print(f"list:  {sys.getsizeof(int_list) / 1024 / 1024:.2f} MB")
        print(f"array: {sys.getsizeof(int_array) / 1024 / 1024:.2f} MB")
        # 2. dict vs namedtuple vs __slots__ vs dataclass
        # Plain dict
        d = {'x': 1, 'y': 2, 'z': 3}
        dict_size = sys.getsizeof(d)
        # namedtuple
        from collections import namedtuple
        Point = namedtuple('Point', ['x', 'y', 'z'])
        p = Point(1, 2, 3)
        nt_size = sys.getsizeof(p)
        # __slots__
        class PointSlots:
            __slots__ = ['x', 'y', 'z']
            def __init__(self, x, y, z):
                self.x, self.y, self.z = x, y, z
        ps = PointSlots(1, 2, 3)
        slots_size = sys.getsizeof(ps)
        # dataclass with slots (Python 3.10+)
        @dataclass(slots=True)
        class PointDataclass:
            x: int
            y: int
            z: int
        pd = PointDataclass(1, 2, 3)
        dc_size = sys.getsizeof(pd)
        print(f"\nPer-object memory:")
        print(f"dict:       {dict_size} bytes")
        print(f"namedtuple: {nt_size} bytes")
        print(f"__slots__:  {slots_size} bytes")
        print(f"dataclass:  {dc_size} bytes")

    @staticmethod
    def generator_vs_list():
        """Generator vs list memory usage"""
        import tracemalloc
        # List: materializes everything up front
        tracemalloc.start()
        data_list = [x ** 2 for x in range(1000000)]
        _, list_peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        # Generator: produces values on demand
        tracemalloc.start()
        data_gen = (x ** 2 for x in range(1000000))
        # Take only the first 10
        first_10 = [next(data_gen) for _ in range(10)]
        _, gen_peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        print(f"list peak:      {list_peak / 1024 / 1024:.2f} MB")
        print(f"generator peak: {gen_peak / 1024 / 1024:.2f} MB")
        print(f"saved:          {(list_peak - gen_peak) / 1024 / 1024:.2f} MB")

    @staticmethod
    def intern_strings():
        """String interning"""
        # Scenario: many duplicate strings (e.g. log processing)
        import sys
        # Without intern: build distinct (but equal) string objects.
        # A repeated literal would share a single object and hide the effect,
        # so construct each string at runtime.
        words = ["".join(["status_", "code"]) for _ in range(100000)]
        total_no_intern = sum(sys.getsizeof(w) for w in words)
        # With intern: all copies collapse to one shared object
        interned_words = [sys.intern(w) for w in words]
        total_with_intern = sum(
            sys.getsizeof(w) for w in set(interned_words)
        )
        print(f"without intern: {total_no_intern / 1024 / 1024:.2f} MB")
        print(f"with intern:    {total_with_intern} bytes (one shared copy)")
Performance Considerations
- Profile first: always profile before optimizing to confirm where the bottleneck is
- Algorithmic complexity: going from O(n) to O(log n) beats any micro-optimization
- I/O bottlenecks: often the real bottleneck is the database or network, not Python code
- Cache results: use lru_cache for repeated computations to avoid redundant work
- Batch operations: use bulk interfaces for database/file access to cut round trips
- Warm up the JIT: numba's first call pays a compilation cost; warm up before serving production traffic
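The caching advice above can be sketched with `functools.lru_cache`; a minimal example, assuming a pure function whose results are worth memoizing:

```python
from functools import lru_cache

@lru_cache(maxsize=None)
def fib(n: int) -> int:
    """Naive recursive Fibonacci: exponential time without the cache, linear with it."""
    if n < 2:
        return n
    return fib(n - 1) + fib(n - 2)

print(fib(100))          # instant with the cache; infeasible without it
print(fib.cache_info())  # hits/misses confirm the cache is doing the work
```

The same decorator applies to any deterministic function with hashable arguments; for values that should expire, pair it with a timestamp argument or use a TTL cache instead.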
Summary
Python performance optimization follows a closed loop: measure -> locate -> optimize -> verify. In most cases, choosing the right data structure (set/dict instead of list), using built-ins (sum/join/sorted), and replacing explicit loops with comprehensions already yields significant gains. Only when those are not enough should you reach for heavier tools such as numba JIT, Cython compilation, or multiprocessing.
Key Points
- cProfile — function-level profiling; find the most expensive functions
- line_profiler — line-by-line profiling; pinpoint hot lines exactly
- Data structure choice — use set/dict for lookups (O(1)), not list (O(n))
- List comprehensions — typically 30-50% faster than an equivalent for loop
- numba @njit — compile Python to machine code with a single decorator
- Cython — type declarations + compilation for near-C performance
- GIL — use multiprocessing for CPU-bound work, asyncio for IO-bound work
- Generators — save memory when processing large datasets
Common Pitfalls
- Premature optimization: optimizing without profiling, and so optimizing code that doesn't matter
- Overusing Cython: numba covers most cases; Cython adds build complexity
- Ignoring I/O bottlenecks: often the database and network are the bottleneck, not Python code
- Treating threads as a cure-all: CPU-bound multithreading can be slower than serial (the GIL)
- Ignoring memory: watching only CPU while memory leaks and GC pressure go unnoticed
- Micro-optimization obsession: spending hours to save 1ms instead of fixing the algorithm to save 1000ms
Learning Path
- Beginner: master cProfile, list comprehensions, and built-in-function optimizations
- Intermediate: line_profiler, memory_profiler, data-structure selection
- Advanced: numba JIT, Cython, asyncio concurrency optimization
- Expert: custom C extensions, performance modeling, end-to-end optimization
Use Cases
- Speeding up data-processing and ETL pipelines
- Reducing Web API response times
- Accelerating compute-heavy scientific workloads
- Cutting memory usage in large-scale data processing
- Choosing a concurrency model for high-traffic services
Adoption Advice
- Step 1: run cProfile on the critical path and find the top 5 hot functions
- Step 2: analyze each hot function line by line with line_profiler
- Step 3: optimize algorithms and data structures first
- Step 4: accelerate numeric hotspots with numba
- Step 5: convert IO-bound code to asyncio
- Ongoing: establish a performance baseline and compare before and after each release
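One lightweight way to implement the "establish a baseline" step above is to record benchmark timings to a file and compare on the next run. A sketch using only the standard library; the file name, metric key, and regression threshold are illustrative choices:

```python
import json
import timeit
from pathlib import Path

BASELINE_FILE = Path("perf_baseline.json")  # illustrative location

def benchmark() -> float:
    """Time a representative critical-path operation; take the minimum of several runs."""
    return min(timeit.repeat(lambda: sorted(range(10000), reverse=True),
                             number=20, repeat=3))

current = benchmark()
if BASELINE_FILE.exists():
    baseline = json.loads(BASELINE_FILE.read_text())["sorted_10k"]
    change = (current - baseline) / baseline * 100
    print(f"baseline {baseline:.4f}s -> current {current:.4f}s ({change:+.1f}%)")
    if change > 20:  # illustrative regression threshold
        print("WARNING: possible performance regression")
BASELINE_FILE.write_text(json.dumps({"sorted_10k": current}))
```

In a real pipeline the benchmark would exercise actual critical-path code and run in CI, but the record-and-compare loop is the same.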
Retrospective Questions
- What is the current performance baseline? What is the P99 latency on the critical path?
- How effective was the last optimization round? How much did it improve things?
- Are there known, unresolved bottlenecks? What is blocking them?
- How is memory usage trending? Are there leaks?
- Can the current concurrency model handle peak load?
- What is the next optimization priority?
