进程、线程、同步/异步 完全指南
一、进程(Process)
什么是进程?
进程是操作系统进行资源分配和调度的基本单位。每个进程都有自己独立的内存空间、文件描述符等资源。
import os
from multiprocessing import Process
# 每个进程有独立的 PID
print(f"主进程 PID: {os.getpid()}")
def worker(name):
print(f"子进程 '{name}' PID: {os.getpid()}")
# 子进程有自己的内存空间
x = 100 # 这个变量只属于这个子进程
# 创建新进程
p = Process(target=worker, args=("Worker1",))
p.start()
p.join()
进程的特点
# 进程间内存隔离的演示
from multiprocessing import Process
shared_list = [] # 主进程的列表
def add_item():
shared_list.append(1) # 子进程会复制一份列表,不影响主进程
print(f"子进程中的列表: {shared_list}")
p = Process(target=add_item)
p.start()
p.join()
print(f"主进程中的列表: {shared_list}") # 仍然是 [],互不影响
# 输出:
# 子进程中的列表: [1]
# 主进程中的列表: []
关键特性:
- ✅ 资源独立,一个进程崩溃不影响其他进程
- ✅ 真正并行(多核 CPU 可同时运行多个进程)
- ❌ 创建/销毁开销大
- ❌ 进程间通信复杂(需要 IPC:管道、队列、共享内存等)
- ❌ 切换开销大
二、线程(Thread)
什么是线程?
线程是操作系统进行调度的基本单位。一个进程内的多个线程共享进程的内存空间和资源。
import threading
import os
print(f"主线程 PID: {os.getpid()}, 线程ID: {threading.current_thread().name}")
def worker(name):
print(f"子线程 '{name}' PID: {os.getpid()}, 线程ID: {threading.current_thread().name}")
# 线程共享进程的内存空间
shared_list.append(1) # 可以访问并修改主线程的变量
shared_list = [] # 所有线程共享这个列表
t = threading.Thread(target=worker, args=("Worker1",))
t.start()
t.join()
print(f"主线程中的列表: {shared_list}") # [1],被子线程修改了
# 输出:
# 主线程 PID: 12345, 线程ID: MainThread
# 子线程 'Worker1' PID: 12345, 线程ID: Thread-1
# 主线程中的列表: [1]
线程的特点
import threading
import time
# 多个线程共享全局变量
counter = 0
def increment():
global counter
for _ in range(1000000):
counter += 1 # 这个操作不是原子的,需要加锁
def increment_with_lock():
global counter
for _ in range(1000000):
with lock:
counter += 1
# 无锁的情况(会有竞态条件)
counter = 0
threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"无锁结果: {counter}") # 结果不确定,通常小于 4000000
# 有锁的情况
counter = 0
lock = threading.Lock()
threads = [threading.Thread(target=increment_with_lock) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"有锁结果: {counter}") # 正好 4000000
关键特性:
- ✅ 创建/销毁开销小
- ✅ 切换开销小
- ✅ 线程间通信简单(共享内存)
- ❌ 需要处理数据同步(锁、信号量等)
- ❌ 一个线程崩溃可能影响整个进程
- ❌ 在 CPython 中受 GIL 限制
三、进程 vs 线程 对比
import threading
import multiprocessing
import time
import os
def cpu_intensive(n):
"""CPU 密集型任务"""
total = 0
for i in range(n):
total += i ** 2
return total
def compare_performance():
"""对比进程和线程在 CPU 密集型任务上的表现"""
n_workers = 4
n_iterations = 10_000_000
# 多线程(受 GIL 影响)
start = time.time()
threads = [threading.Thread(target=cpu_intensive, args=(n_iterations,))
for _ in range(n_workers)]
for t in threads: t.start()
for t in threads: t.join()
thread_time = time.time() - start
# 多进程(不受 GIL 影响)
start = time.time()
processes = [multiprocessing.Process(target=cpu_intensive, args=(n_iterations,))
for _ in range(n_workers)]
for p in processes: p.start()
for p in processes: p.join()
process_time = time.time() - start
print(f"多线程耗时: {thread_time:.2f}s (受到 GIL 限制)")
print(f"多进程耗时: {process_time:.2f}s (真正的并行)")
# 输出示例(4核CPU):
# 多线程耗时: 8.5s (串行执行)
# 多进程耗时: 2.3s (并行执行)
# 内存隔离演示
def memory_isolation():
shared = [] # 主进程的列表
def modify_list():
shared.append("item")
print(f"子进程看到: {shared}")
p = multiprocessing.Process(target=modify_list)
p.start()
p.join()
print(f"主进程看到: {shared}") # 还是 [],因为内存隔离
四、同步 vs 异步
同步(Synchronous)
任务按顺序执行,必须等待当前任务完成后才能继续下一个任务。
import time
def sync_example():
"""同步执行:等一个完成再做下一个"""
print("开始同步任务")
def task(name, duration):
print(f"{name} 开始")
time.sleep(duration) # 模拟耗时操作
print(f"{name} 完成")
return f"{name} 结果"
# 串行执行
result1 = task("任务A", 2)
result2 = task("任务B", 1)
result3 = task("任务C", 1.5)
print(f"所有结果: {result1}, {result2}, {result3}")
# 总耗时: 2 + 1 + 1.5 = 4.5 秒
sync_example()
异步(Asynchronous)
发起任务后不等待完成,可以继续执行其他任务,完成后通过回调或轮询获取结果。
import asyncio
import time
async def async_example():
"""异步执行:并发等待,充分利用等待时间"""
print("开始异步任务")
async def task(name, duration):
print(f"{name} 开始")
await asyncio.sleep(duration) # 模拟异步 I/O 操作
print(f"{name} 完成")
return f"{name} 结果"
# 并发执行
tasks = [
task("任务A", 2),
task("任务B", 1),
task("任务C", 1.5)
]
results = await asyncio.gather(*tasks)
print(f"所有结果: {results}")
# 总耗时: max(2, 1, 1.5) = 2 秒(并发执行)
# 运行异步代码
asyncio.run(async_example())
同步 vs 异步 直观对比
import asyncio
import threading
import time
# 模拟网络请求
def sync_network_request(url):
"""同步网络请求"""
print(f"同步请求 {url} 开始")
time.sleep(1) # 模拟网络延迟
print(f"同步请求 {url} 完成")
return f"{url} 数据"
async def async_network_request(url):
"""异步网络请求"""
print(f"异步请求 {url} 开始")
await asyncio.sleep(1) # 模拟异步网络延迟
print(f"异步请求 {url} 完成")
return f"{url} 数据"
def sync_demo():
"""同步方式:串行等待"""
start = time.time()
urls = ["url1", "url2", "url3"]
results = []
for url in urls:
results.append(sync_network_request(url))
print(f"同步总耗时: {time.time() - start:.2f}s")
# 输出: 同步总耗时: 3.01s
async def async_demo():
"""异步方式:并发等待"""
start = time.time()
urls = ["url1", "url2", "url3"]
tasks = [async_network_request(url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"异步总耗时: {time.time() - start:.2f}s")
# 输出: 异步总耗时: 1.01s
# 执行对比
sync_demo()
asyncio.run(async_demo())
五、阻塞 vs 非阻塞
阻塞(Blocking)
调用函数后,当前线程被挂起,等待操作完成。
import time
import socket
def blocking_example():
"""阻塞 I/O 示例"""
print("1. 开始阻塞连接")
# socket.connect() 会阻塞直到连接成功
sock = socket.socket()
sock.connect(("www.python.org", 80)) # 阻塞在这里
print("2. 连接成功")
# time.sleep() 也是阻塞的
time.sleep(2) # 线程被挂起 2 秒
print("3. 睡眠结束")
# input() 也是阻塞的
user_input = input("4. 等待输入: ") # 阻塞等待用户输入
print(f"5. 收到输入: {user_input}")
# 阻塞期间,线程不能做任何其他事情
非阻塞(Non-blocking)
调用函数后立即返回,不等待操作完成。
import socket
def nonblocking_example():
"""非阻塞 I/O 示例(设置套接字为非阻塞)"""
sock = socket.socket()
sock.setblocking(False) # 设置为非阻塞模式
try:
# 不会阻塞,但可能立即抛出错误
sock.connect(("www.python.org", 80))
except BlockingIOError:
print("连接正在进行中(非阻塞模式)")
# 可以在这里做其他事情,稍后再检查连接状态
# 立即返回,不需要等待
print("可以执行其他代码")
# 注意:非阻塞模式通常需要配合 select/poll/epoll 使用
六、并发 vs 并行
这是最容易被混淆的两个概念:
并发(Concurrency)
逻辑上同时处理多件事情,但不一定在同一时刻执行。
import threading
import time
def concurrency_demo():
"""并发:多个线程交替执行"""
def task(name):
for i in range(3):
print(f"{name} 执行步骤 {i}")
time.sleep(0.1) # 主动让出 CPU
# 两个线程并发执行(在单核 CPU 上)
t1 = threading.Thread(target=task, args=("任务A",))
t2 = threading.Thread(target=task, args=("任务B",))
t1.start()
t2.start()
t1.join()
t2.join()
# 可能的输出(交错执行):
# 任务A 执行步骤 0
# 任务B 执行步骤 0
# 任务A 执行步骤 1
# 任务B 执行步骤 1
# 任务A 执行步骤 2
# 任务B 执行步骤 2
并行(Parallelism)
物理上同时执行多个任务(需要多核 CPU)。
from multiprocessing import Pool
import time
def parallel_demo():
"""并行:多个进程同时执行"""
def heavy_computation(n):
"""CPU 密集型计算"""
total = 0
for i in range(n):
total += i ** 2
return total
# 使用进程池,在多个 CPU 核心上并行执行
with Pool(processes=4) as pool:
results = pool.map(heavy_computation, [10_000_000] * 4)
# 4 个任务真正地同时在不同核心上运行
print(f"计算结果: {results}")
# 并发的比喻:一个人同时煮咖啡和烤面包(交替进行)
# 并行的比喻:两个人,一个煮咖啡,一个烤面包(同时进行)
七、GIL 到底影响哪部分?
这是最关键的部分!GIL 不影响所有代码。
GIL 影响的代码
import threading
import time
import numpy as np
# ❌ GIL 影响:纯 Python 的 CPU 密集型代码
def pure_python_cpu():
"""纯 Python 计算 - 受 GIL 严重影响"""
total = 0
for i in range(100_000_000): # 纯 Python 循环
total += i ** 2
return total
# 多线程执行纯 Python 计算
def test_gil_effect():
start = time.time()
threads = []
for _ in range(4):
t = threading.Thread(target=pure_python_cpu)
t.start()
threads.append(t)
for t in threads:
t.join()
print(f"纯 Python 多线程耗时: {time.time() - start:.2f}s")
# 输出: ~8s (几乎等于单线程 × 4,因为 GIL 串行化)
GIL 不影响的代码
# ✅ GIL 不影响 1: I/O 操作
def io_operation():
"""I/O 操作会自动释放 GIL"""
import requests
response = requests.get("https://www.python.org") # 释放 GIL
return response.status_code
# ✅ GIL 不影响 2: NumPy/SciPy 等 C 扩展
def numpy_operation():
"""NumPy 的底层 C 代码会释放 GIL"""
import numpy as np
a = np.random.rand(1000, 1000)
b = np.random.rand(1000, 1000)
# 这个矩阵乘法在 C 层执行,会释放 GIL
c = np.dot(a, b) # 多核并行!
return c
# ✅ GIL 不影响 3: 系统调用
def system_call():
"""大多数系统调用会释放 GIL"""
import os
# 文件读写会释放 GIL
with open("/dev/null", "w") as f:
f.write("x" * 1000000)
# time.sleep 也会释放 GIL
time.sleep(1)
# ✅ GIL 不影响 4: 加密/压缩等 C 库
def c_library_operation():
"""调用 C 库通常会释放 GIL"""
import hashlib
data = b"x" * 10000000
# hashlib 的底层 C 实现会释放 GIL
hash_obj = hashlib.sha256(data) # 多线程中也能并行
return hash_obj.hexdigest()
实际测试:GIL 的边界
import threading
import time
import numpy as np
def benchmark_gil_boundary():
"""测试 GIL 影响边界"""
# 1. 纯 Python 循环(受 GIL 影响)
def python_loop():
total = 0
for i in range(50_000_000):
total += i
# 2. NumPy 操作(不受 GIL 影响)
def numpy_ops():
arr = np.random.rand(1000, 1000)
result = np.dot(arr, arr.T)
# 3. 混合操作(部分受 GIL 影响)
def mixed_ops():
arr = np.random.rand(1000, 1000)
result = np.dot(arr, arr.T) # 无 GIL
total = 0
for i in range(10_000_000): # 有 GIL
total += i
# 测试纯 Python
start = time.time()
threads = [threading.Thread(target=python_loop) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
python_time = time.time() - start
# 测试 NumPy
start = time.time()
threads = [threading.Thread(target=numpy_ops) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
numpy_time = time.time() - start
print(f"纯 Python 循环(受 GIL 限制): {python_time:.2f}s")
print(f"NumPy 操作(突破 GIL): {numpy_time:.2f}s")
# benchmark_gil_boundary()
八、总结对比表
| 特性 | 进程 | 线程(CPython) | 异步 |
|---|---|---|---|
| 资源开销 | 很大 | 较小 | 极小 |
| 切换开销 | 大 | 小 | 几乎无 |
| 数据共享 | 困难(IPC) | 容易(共享内存) | 容易 |
| GIL 影响 | 无影响 | CPU 密集型受影响 | I/O 密集型受影响 |
| 真正并行 | ✅ 是 | ❌ 否(GIL限制) | ❌ 否 |
| 适用场景 | CPU 密集型 | I/O 密集型 | 高并发 I/O |
| 调试难度 | 中等 | 困难(竞态条件) | 较难 |
| 崩溃影响 | 隔离 | 可能影响整个进程 | 影响当前任务 |
九、实战选择指南
def choose_concurrency_model(task_type, requirements):
"""根据场景选择合适的并发模型"""
if task_type == "cpu_intensive":
if requirements.get("need_shared_memory", False):
return "使用 multiprocessing + 共享内存"
else:
return "使用 ProcessPoolExecutor"
elif task_type == "io_intensive":
if requirements.get("high_concurrency", 0) > 1000:
return "使用 asyncio(高并发 I/O)"
else:
return "使用 ThreadPoolExecutor(简单场景)"
elif task_type == "mixed":
return "组合使用:asyncio + ProcessPoolExecutor"
elif task_type == "real_time":
return "考虑使用 C 扩展或 PyPy"
else:
return "单线程足够,不要过度设计"
# 使用示例
print(choose_concurrency_model("cpu_intensive", {}))
# 输出: 使用 ProcessPoolExecutor
print(choose_concurrency_model("io_intensive", {"high_concurrency": 10000}))
# 输出: 使用 asyncio(高并发 I/O)
理解了这些基础概念后,GIL 的影响范围就清晰了:
- GIL 只影响纯 Python 代码的 CPU 密集型任务
- I/O 操作、C 扩展、NumPy 等都不受 GIL 限制
- 多进程是绕过 GIL 的最简单方式
评论区