Python 并发与多进程
大约 11 分钟 · 约 3177 字
Python 并发与多进程
简介
Python 的 GIL(全局解释器锁)限制了多线程的 CPU 并行能力。理解多线程、多进程、线程池和进程池的选择策略,以及 multiprocessing、concurrent.futures 的使用,有助于选择正确的并发方案。
特点
GIL 与并发选择
选择策略
# GIL(Global Interpreter Lock):
# 同一时刻只有一个线程执行 Python 字节码
# 影响:CPU 密集任务无法真正并行
# 并发方案选择:
# IO 密集 → asyncio(最佳)、多线程(次选)
# CPU 密集 → 多进程、C 扩展
# 混合型 → 多进程 + 每进程内 asyncio
import time
import threading
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# CPU 密集任务对比
def cpu_bound(n):
    """Sum of squares of 0..n-1 -- a pure CPU-bound workload for benchmarking."""
    total = 0
    for i in range(n):
        total += i * i
    return total
def benchmark_cpu():
    """Time the same CPU-bound work run serially, in threads, and in processes."""
    n = 10_000_000
    tasks = 4

    def timed(run):
        # Wall-clock time of a zero-argument callable.
        t0 = time.perf_counter()
        run()
        return time.perf_counter() - t0

    def run_serial():
        for _ in range(tasks):
            cpu_bound(n)

    def run_threads():
        # GIL: only one thread executes bytecode at a time -> no speed-up here.
        with ThreadPoolExecutor(max_workers=tasks) as pool:
            list(pool.map(cpu_bound, [n] * tasks))

    def run_processes():
        # Each worker process has its own interpreter -> true parallelism.
        with ProcessPoolExecutor(max_workers=tasks) as pool:
            list(pool.map(cpu_bound, [n] * tasks))

    serial_time = timed(run_serial)
    thread_time = timed(run_threads)
    process_time = timed(run_processes)

    print(f"串行: {serial_time:.2f}s")
    print(f"多线程: {thread_time:.2f}s")
    print(f"多进程: {process_time:.2f}s")
多线程
线程安全与同步
import threading
from queue import Queue
# 1. 线程安全的计数器
class AtomicCounter:
    """A thread-safe integer counter guarded by a mutex."""

    def __init__(self, initial=0):
        self._value = initial
        self._lock = threading.Lock()

    def increment(self):
        """Atomically add one and return the updated count."""
        with self._lock:
            self._value += 1
            new_value = self._value
        return new_value

    @property
    def value(self):
        # Unlocked read; fine for monitoring, increments are still serialized.
        return self._value
# 2. 生产者-消费者
class ProducerConsumer:
    """Bounded-queue producer/consumer demo.

    Bug fixed: the original had each producer enqueue a single ``None``
    sentinel while *every* consumer waited for ``num_producers`` sentinels.
    With more than one consumer the sentinels are split between them, so no
    consumer ever sees enough of them and ``run()`` deadlocks on ``join()``.
    Shutdown is now coordinated by ``run()``: it joins the producers first,
    then enqueues exactly one sentinel per consumer.
    """

    def __init__(self, buffer_size=100):
        # maxsize gives back-pressure: producers block when the buffer is full.
        self.queue = Queue(maxsize=buffer_size)
        self.counter = AtomicCounter()

    def producer(self, producer_id: int, count: int):
        """Enqueue `count` items; shutdown sentinels are handled by run()."""
        for i in range(count):
            self.queue.put(f"P{producer_id}-{i}")
            time.sleep(0.01)

    def consumer(self, consumer_id: int, num_producers: int):
        """Consume items until a single None sentinel arrives.

        `num_producers` is kept for interface compatibility; shutdown no
        longer depends on it.
        """
        while True:
            item = self.queue.get()
            try:
                if item is None:
                    break
                self.counter.increment()
            finally:
                self.queue.task_done()

    def run(self, num_producers=3, num_consumers=2):
        """Run the demo and print how many items were processed."""
        producers = [
            threading.Thread(target=self.producer, args=(i, 10))
            for i in range(num_producers)
        ]
        consumers = [
            threading.Thread(target=self.consumer, args=(i, num_producers))
            for i in range(num_consumers)
        ]
        for t in producers + consumers:
            t.start()
        for t in producers:
            t.join()
        # One sentinel per consumer guarantees every consumer terminates.
        for _ in range(num_consumers):
            self.queue.put(None)
        for t in consumers:
            t.join()
        print(f"处理了 {self.counter.value} 个项目")
# 3. 读写锁
class ReadWriteLock:
    """Reader-preference read/write lock built on a Condition.

    Any number of readers may hold the lock at once; a writer holds the
    Condition's underlying mutex exclusively and waits until the reader
    count drops to zero.  NOTE(review): readers that keep arriving can
    starve a waiting writer -- acceptable for read-heavy workloads only.
    """

    def __init__(self):
        # The Condition's lock doubles as the writer-exclusion mutex.
        self._read_ready = threading.Condition(threading.Lock())
        self._readers = 0  # count of readers currently inside the lock

    def acquire_read(self):
        # Readers only bump the count; they do not hold the mutex while reading.
        with self._read_ready:
            self._readers += 1

    def release_read(self):
        with self._read_ready:
            self._readers -= 1
            if self._readers == 0:
                # Last reader out wakes any writer blocked in acquire_write().
                self._read_ready.notify_all()

    def acquire_write(self):
        # Take the mutex, then wait for in-flight readers to drain.
        self._read_ready.acquire()
        while self._readers > 0:
            self._read_ready.wait()

    def release_write(self):
        self._read_ready.release()
多进程
进程池与共享状态
import multiprocessing as mp
from multiprocessing import Pool, Manager
# 1. 进程池
def process_file(filename: str) -> dict:
    """Count the lines of one file; meant to run in a worker process."""
    with open(filename) as fh:
        line_count = sum(1 for _ in fh)
    return {"file": filename, "lines": line_count}
def parallel_file_processing(filenames: list[str], workers: int = 4):
    """Fan the files out over a process pool and collect per-file stats in order."""
    with Pool(processes=workers) as pool:
        # pool.map is eager, so returning inside the with-block is safe.
        return pool.map(process_file, filenames)
# 2. 进程间共享状态
def worker_with_shared(queue, counter_dict, lock, worker_id):
    """Push 100 labelled items onto `queue` and tally them in `counter_dict`.

    Works with multiprocessing Manager proxies as well as plain
    queue.Queue / dict / threading.Lock objects.
    """
    for seq in range(100):
        queue.put(f"Worker-{worker_id}: item-{seq}")
        # get-then-set on a Manager dict proxy is not atomic across
        # processes, so the read-modify-write is guarded by the shared lock.
        with lock:
            current = counter_dict.get(worker_id, 0)
            counter_dict[worker_id] = current + 1
def shared_state_example():
    """Spawn 4 processes that share a queue and a counting dict via a Manager."""
    with Manager() as manager:
        shared_queue = manager.Queue()
        shared_dict = manager.dict()
        lock = manager.Lock()
        workers = [
            mp.Process(
                target=worker_with_shared,
                args=(shared_queue, shared_dict, lock, worker_id),
            )
            for worker_id in range(4)
        ]
        for proc in workers:
            proc.start()
        for proc in workers:
            proc.join()
        print(f"各 Worker 处理数: {dict(shared_dict)}")
# 3. 共享内存(高性能)
def worker_shared_memory(shared_arr, index, value):
    """Write `value` into slot `index` of a shared array (in place, no pickling)."""
    shared_arr[index] = value
def shared_memory_example():
    """Have 10 processes each fill one slot of a shared double array.

    Uses mp.Array (raw shared memory plus a lock) so the workers mutate the
    same buffer rather than pickled copies.  Fix: the original also created
    an ``mp.Value('i', 0)`` that was never used -- dead code removed.
    """
    arr = mp.Array('d', [0.0] * 10)  # 'd' = C double, one slot per worker
    workers = [
        mp.Process(target=worker_shared_memory, args=(arr, i, i * 10.0))
        for i in range(10)
    ]
    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()
    print(f"共享数组: {list(arr)}")
# 4. ProcessPoolExecutor(推荐)
from concurrent.futures import ProcessPoolExecutor, as_completed
def parallel_map(func, items, max_workers=None):
    """Apply `func` to each element of `items` in a process pool.

    Results come back in input order even though workers may finish out of
    order: each future remembers the index of the item it was given.
    """
    results = [None] * len(items)
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        index_of = {
            executor.submit(func, item): position
            for position, item in enumerate(items)
        }
        for fut in as_completed(index_of):
            results[index_of[fut]] = fut.result()
    return results
# 5. 进程初始化
def init_worker(shared_state):
    """Pool initializer: stash the shared Manager dict in a module-level global."""
    global _shared_state
    _shared_state = shared_state


def process_with_init(data):
    """Double `data`, counting each call in the state set by init_worker.

    NOTE: the += on a Manager dict proxy is not atomic across processes.
    """
    _shared_state["processed"] += 1
    return data * 2
def pool_with_initializer():
    """Demo: give every pool worker access to one Manager dict via an initializer."""
    with Manager() as manager:
        shared = manager.dict(processed=0)
        # initializer/initargs run once in each worker process, seeding the
        # module-level _shared_state that process_with_init reads.
        with Pool(processes=4, initializer=init_worker, initargs=(shared,)) as pool:
            results = pool.map(process_with_init, range(20))
        print(f"结果: {results}")
        print(f"处理数: {shared['processed']}")
异步并发(asyncio)
asyncio 基础与并发 HTTP 请求
import asyncio
import aiohttp
import time
from typing import Optional
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    """GET one URL, returning either status/size or an error record."""
    try:
        budget = aiohttp.ClientTimeout(total=10)
        async with session.get(url, timeout=budget) as resp:
            body = await resp.read()
            return {"url": url, "status": resp.status, "size": len(body)}
    except Exception as e:
        # Collapse network/timeout failures into data instead of raising,
        # so one bad URL cannot sink a whole gather().
        return {"url": url, "error": str(e)}
async def fetch_all(urls: list[str], max_concurrent: int = 10) -> list[dict]:
    """Fetch every URL concurrently, with at most `max_concurrent` in flight."""
    gate = asyncio.Semaphore(max_concurrent)

    async def guarded(session: aiohttp.ClientSession, url: str) -> dict:
        # The semaphore caps concurrency; the session is shared for keep-alive.
        async with gate:
            return await fetch_url(session, url)

    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(guarded(session, u) for u in urls))
# 使用
async def main():
    """Fire 20 requests at httpbin and report the success count and latency."""
    urls = [f"https://httpbin.org/get?id={i}" for i in range(20)]
    t0 = time.perf_counter()
    results = await fetch_all(urls, max_concurrent=5)
    elapsed = time.perf_counter() - t0
    ok = len([r for r in results if "error" not in r])
    print(f"完成 {ok}/{len(urls)} 个请求,耗时 {elapsed:.2f}s")
asyncio.run(main())
asyncio 任务编排与超时控制
import asyncio
from typing import Any
async def worker(name: str, duration: float) -> str:
    """Pretend to work for `duration` seconds, then return a completion message."""
    await asyncio.sleep(duration)
    message = f"{name} 完成(耗时 {duration}s)"
    return message
async def timeout_example():
    """Show asyncio.wait_for cancelling a task that exceeds its deadline."""
    try:
        # 5s of work against a 2s budget: wait_for cancels the coroutine
        # and raises TimeoutError here.
        await asyncio.wait_for(worker("慢任务", 5.0), timeout=2.0)
    except asyncio.TimeoutError:
        print("任务超时!")
async def gather_with_timeout():
    """Run three workers concurrently under one shared 3-second deadline."""
    coros = [
        worker("任务A", 1.0),
        worker("任务B", 2.0),
        worker("任务C", 0.5),
    ]
    try:
        # One deadline for the whole batch: if any task is still running at
        # 3s, everything inside gather() is cancelled.
        results = await asyncio.wait_for(asyncio.gather(*coros), timeout=3.0)
    except asyncio.TimeoutError:
        print("整体超时!")
    else:
        for line in results:
            print(f" {line}")
async def race_example():
    """First-completed race: keep the winner's result, cancel the loser.

    Fix: asyncio.wait() no longer accepts bare coroutines (deprecated in
    Python 3.8, removed in 3.11 -- it raises TypeError) -- the competitors
    must be wrapped in Tasks first.
    """
    racers = [
        asyncio.create_task(worker("快速", 0.5)),
        asyncio.create_task(worker("慢速", 3.0)),
    ]
    done, pending = await asyncio.wait(racers, return_when=asyncio.FIRST_COMPLETED)
    for task in done:
        print(f"胜出: {task.result()}")
    for task in pending:
        task.cancel()
asyncio.run(race_example())
异步队列与生产者-消费者
import asyncio
import random
import time
async def producer(queue: asyncio.Queue, producer_id: int, count: int):
    """Put `count` labelled items on the queue with small random pauses."""
    for seq in range(count):
        await queue.put(f"P{producer_id}-{seq}")
        # Jitter makes the producers interleave realistically.
        await asyncio.sleep(random.uniform(0.01, 0.05))
    print(f"生产者 P{producer_id} 完成")
async def consumer(queue: asyncio.Queue, consumer_id: int):
    """Drain the queue forever; runs until the enclosing task is cancelled."""
    while True:
        item = await queue.get()
        # Simulate a variable amount of per-item processing work.
        await asyncio.sleep(random.uniform(0.01, 0.03))
        queue.task_done()
async def async_producer_consumer():
    """Wire 2 producers and 3 consumers together through one bounded queue.

    Improvement over the original: the cancelled consumer tasks are awaited,
    so their CancelledError is retired and no "Task was destroyed but it is
    pending!" warnings appear at event-loop shutdown.
    """
    queue = asyncio.Queue(maxsize=100)
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
    producers = [asyncio.create_task(producer(queue, i, 20)) for i in range(2)]
    # Wait until every producer has enqueued all of its items.
    await asyncio.gather(*producers)
    # queue.join() returns once consumers have task_done()'d every item.
    await queue.join()
    for task in consumers:
        task.cancel()
    # Reap the cancelled tasks; return_exceptions swallows CancelledError.
    await asyncio.gather(*consumers, return_exceptions=True)
asyncio.run(async_producer_consumer())
线程池与进程池实战
并发数据下载
import concurrent.futures
import time
import urllib.request
from typing import List, Dict
def download_file(url: str, save_path: str) -> dict:
    """Fetch `url` into `save_path`, reporting success + timing or the error."""
    t0 = time.time()
    try:
        urllib.request.urlretrieve(url, save_path)
    except Exception as e:
        return {"url": url, "status": "error", "error": str(e)}
    return {"url": url, "status": "success", "elapsed": round(time.time() - t0, 3)}
def batch_download(urls: List[str], output_dir: str, max_workers: int = 5):
    """Download `urls` concurrently into `output_dir` with a thread pool.

    Bug fixed: save_path was the literal f"{output_dir}/(unknown)" -- the
    computed `filename` was never used, so every download clobbered the
    same file.  Each URL now saves to its own file.
    """
    from pathlib import Path
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    tasks = []
    for i, url in enumerate(urls):
        # Last path segment, or a synthetic name for URLs ending in '/'.
        filename = url.split("/")[-1] or f"file_{i}"
        tasks.append((url, f"{output_dir}/{filename}"))

    start = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(download_file, url, path): url
            for url, path in tasks
        }
        results = [f.result() for f in concurrent.futures.as_completed(futures)]
    elapsed = time.time() - start

    success = sum(1 for r in results if r["status"] == "success")
    print(f"下载完成: {success}/{len(urls)},总耗时 {elapsed:.2f}s")
    return results
并行数据处理
import concurrent.futures
import os
import time
from pathlib import Path
from typing import List, Dict, Callable, Any
def process_file(filepath: str) -> Dict[str, Any]:
    """Collect size, line and word counts for one file (process-pool worker)."""
    size = os.stat(filepath).st_size
    line_count = 0
    word_count = 0
    # Stream line by line instead of materializing the whole file.
    with open(filepath, "r", encoding="utf-8", errors="ignore") as fh:
        for line in fh:
            line_count += 1
            word_count += len(line.split())
    return {
        "file": filepath,
        "size": size,
        "lines": line_count,
        "words": word_count,
    }
def parallel_file_stats(directory: str, pattern: str = "*.py") -> List[Dict]:
    """Recursively stat every `pattern` file under `directory` using all cores."""
    files = list(Path(directory).rglob(pattern))
    print(f"找到 {len(files)} 个文件")
    start = time.time()
    # One worker per CPU core; executor.map preserves input order.
    with concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        results = list(executor.map(process_file, [str(f) for f in files]))
    elapsed = time.time() - start
    total_lines = sum(r["lines"] for r in results)
    total_words = sum(r["words"] for r in results)
    print(f"处理完成,耗时 {elapsed:.2f}s")
    print(f"总行数: {total_lines},总词数: {total_words}")
    return results
并发性能调优
并发方案选择决策树
# === 选择指南 ===
#
# 1. IO 密集 + 少量任务(< 100)
# -> asyncio(最佳,零开销)
#
# 2. IO 密集 + 大量任务 + 需要兼容同步库
# -> ThreadPoolExecutor
#
# 3. CPU 密集 + 可并行
# -> ProcessPoolExecutor
#
# 4. CPU 密集 + IO 密集 混合
# -> ProcessPoolExecutor + 每进程内 asyncio
#
# 5. 高并发网络服务
# -> asyncio + uvloop(生产级事件循环)
import os
import time
def calculate_optimal_workers():
    """Print and return recommended worker counts per workload type.

    Improvement: the recommendations are now returned as a dict (keys:
    "cores", "cpu", "io", "mixed") instead of only printed, so callers can
    use them programmatically.  Existing callers that ignore the return
    value are unaffected.
    """
    cpu_count = os.cpu_count() or 4  # fall back to 4 if undetectable
    workers = {
        "cores": cpu_count,
        "cpu": cpu_count,            # CPU-bound: one worker per core
        "io": cpu_count * 5,         # IO-bound: workers mostly wait, oversubscribe
        "mixed": cpu_count * 2 + 1,  # mixed: a common rule of thumb
    }
    print(f"CPU 核数: {cpu_count}")
    print(f" CPU 密集任务推荐: {workers['cpu']} workers")
    print(f" IO 密集任务推荐: {workers['io']} workers")
    print(f" 混合型任务推荐: {workers['mixed']} workers")
    return workers
calculate_optimal_workers()
常见并发陷阱与解决方案
import threading
import multiprocessing as mp
# 陷阱 1:死锁
class DeadlockExample:
    """Two tasks that grab two locks in opposite orders -- a deadlock recipe."""

    def __init__(self):
        self.lock_a = threading.Lock()
        self.lock_b = threading.Lock()

    def task1(self):
        # Acquisition order: a, then b.
        with self.lock_a:
            time.sleep(0.01)
            # Deadlocks if task2 already holds lock_b and is waiting on lock_a.
            with self.lock_b:
                print("task1 done")

    def task2(self):
        # Acquisition order: b, then a -- the opposite of task1; that
        # inversion is exactly what makes the circular wait possible.
        with self.lock_b:
            time.sleep(0.01)
            with self.lock_a:
                print("task2 done")
# 解决方案:统一加锁顺序
class SafeLockOrder:
    """Deadlock-free variant: every task acquires lock_a before lock_b."""

    def __init__(self):
        self.lock_a = threading.Lock()
        self.lock_b = threading.Lock()

    def task1(self):
        # A single global lock order (a, then b) makes circular waits impossible.
        with self.lock_a, self.lock_b:
            print("task1 done")

    def task2(self):
        # Same order as task1 by design.
        with self.lock_a, self.lock_b:
            print("task2 done")
# 陷阱 2:进程池中的全局变量不共享
# 错误示范
counter = 0  # NOTE: each forked worker process gets its own independent copy


def increment_wrong():
    """Anti-example: increments a module global, which is per-process state."""
    global counter
    counter += 1
# 正确做法:使用 Manager 或返回值聚合
def increment_correct(result_dict, worker_id):
    """Tally one unit of work for `worker_id` in a (Manager-backed) dict."""
    previous = result_dict.get(worker_id, 0)
    result_dict[worker_id] = previous + 1
def safe_counter_example():
    """Count per-worker work in a Manager dict shared across a Pool."""
    with mp.Manager() as manager:
        shared_dict = manager.dict()
        jobs = [(shared_dict, worker_id) for worker_id in range(4)]
        with mp.Pool(4) as pool:
            pool.starmap(increment_correct, jobs)
        print(f"各 worker 计数: {dict(shared_dict)}")
# 陷阱 3:大对象在进程间传递的序列化开销
def process_large_data_wrong(large_list):
    """Anti-example: the whole list gets pickled into the worker process.

    The sum itself is cheap; the hidden cost is serialising `large_list`
    across the process boundary on every call.
    """
    result = 0
    for value in large_list:
        result += value
    return result
def process_large_data_correct(shared_array, start, end):
    """Sum shared_array[start:end] by direct indexing -- shared memory, no pickling."""
    total = 0
    # Index the shared buffer in place rather than copying a slice out.
    for i in range(start, end):
        total += shared_array[i]
    return total
优点
缺点
总结
Python 并发方案根据任务类型选择:IO 密集用 asyncio 或多线程,CPU 密集用多进程。GIL 限制了多线程的 CPU 并行能力,多进程是唯一真正并行方案。concurrent.futures 提供 ThreadPoolExecutor 和 ProcessPoolExecutor 统一接口。进程间通信使用 Queue(简单)、Manager(共享状态)和共享内存(高性能)。建议使用 ProcessPoolExecutor + map/as_completed 模式,设置合理的 max_workers(通常等于 CPU 核数)。
关键知识点
- 先区分这篇内容属于语法能力、工程能力,还是生态工具能力。
- Python 的开发效率来自生态,但可维护性来自结构、测试和规范。
- 脚本一旦进入长期维护,就必须按项目来治理。
- 框架与语言特性类主题要同时理解运行方式和工程组织方式。
项目落地视角
- 统一虚拟环境、依赖锁定、格式化和日志方案。
- 把入口、配置、业务逻辑和工具函数拆开,避免单文件膨胀。
- 对网络请求、文件读写和数据处理结果做异常与样本校验。
- 明确项目入口、配置管理、依赖管理、日志和测试策略。
常见误区
- 把临时脚本直接当生产代码使用。
- 忽略依赖版本、编码、路径和时区差异。
- 只会写 happy path,没有补超时、重试和资源释放。
- 把 notebook 或脚本风格直接带入长期维护项目。
进阶路线
- 把类型注解、测试、打包和部署纳入统一工程流程。
- 继续向异步、性能、数据管线和框架源码层深入。
- 把常用脚本抽成可复用库或 CLI 工具,而不是复制粘贴。
- 继续补齐部署、打包、监控和性能调优能力。
适用场景
- 当你准备把《Python 并发与多进程》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合脚本自动化、数据处理、Web 开发和测试工具建设。
- 当需求强调快速迭代和丰富生态时,Python 往往能快速起步。
落地建议
- 统一使用虚拟环境与依赖锁定,避免环境漂移。
- 对核心函数补类型注解、异常处理和日志,减少“脚本黑盒”。
- 一旦脚本进入生产链路,及时补测试和监控。
排错清单
- 先确认当前解释器、虚拟环境和依赖版本是否正确。
- 检查编码、路径、时区和第三方库行为差异。
- 排查同步阻塞、数据库连接未释放或网络请求无超时。
复盘问题
- 如果把《Python 并发与多进程》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《Python 并发与多进程》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《Python 并发与多进程》最大的收益和代价分别是什么?
