📋 概述
Python的线程与并发系统是解释器的重要组成部分,它涉及全局解释器锁(GIL)、线程管理、同步原语、以及各种并发编程模型的实现。本文档将深入分析CPython中线程与并发机制的源码实现,包括底层线程接口、GIL机制、threading模块、以及现代并发编程模式。
🎯 线程与并发系统架构
graph TB
subgraph "Python应用层"
A[threading模块] --> B[Thread类]
B --> C[Lock/RLock]
C --> D[Condition/Event]
end
subgraph "Python C API层"
E[PyThread_*] --> F[PyGILState_*]
F --> G[PyMutex/PyMutex2]
end
subgraph "GIL层"
H[GIL获取/释放] --> I[线程调度]
I --> J[信号处理]
end
subgraph "操作系统层"
K[pthread/WinThread] --> L[内核调度器]
L --> M[CPU核心]
end
A --> E
E --> H
H --> K
1. 底层线程实现
1.1 跨平台线程抽象
/* Python/thread.c - 跨平台线程抽象层 */
/* 线程初始化 */
void
PyThread_init_thread(void)
{
if (initialized) {
return;
}
initialized = 1;
PyThread__init_thread(); /* 调用平台特定的初始化 */
}
/* 线程锁分配 */
PyThread_type_lock
PyThread_allocate_lock(void)
{
if (!initialized) {
PyThread_init_thread();
}
/* 分配PyMutex结构 */
PyMutex *lock = (PyMutex *)PyMem_RawMalloc(sizeof(PyMutex));
if (lock) {
*lock = (PyMutex){0}; /* 零初始化 */
}
return (PyThread_type_lock)lock;
}
/* 带超时的锁获取 */
PyLockStatus
PyThread_acquire_lock_timed(PyThread_type_lock lock, PY_TIMEOUT_T microseconds,
int intr_flag)
{
PyTime_t timeout; /* 相对超时时间 */
if (microseconds >= 0) {
/* 防止超时溢出,限制在合理范围内 */
timeout = _PyTime_FromMicrosecondsClamp(microseconds);
}
else {
timeout = -1; /* 无限等待 */
}
/* 设置锁标志 */
_PyLockFlags flags = _Py_LOCK_DONT_DETACH;
if (intr_flag) {
flags |= _PY_FAIL_IF_INTERRUPTED; /* 可被信号中断 */
}
/* 调用底层互斥锁实现 */
return _PyMutex_LockTimed((PyMutex *)lock, timeout, flags);
}
/* 带重试的锁获取(处理信号中断) */
PyLockStatus
PyThread_acquire_lock_timed_with_retries(PyThread_type_lock lock,
PY_TIMEOUT_T timeout)
{
PyThreadState *tstate = _PyThreadState_GET();
PyTime_t endtime = 0;
if (timeout > 0) {
endtime = _PyDeadline_Init(timeout); /* 计算截止时间 */
}
PyLockStatus r;
do {
PyTime_t microseconds;
microseconds = _PyTime_AsMicroseconds(timeout, _PyTime_ROUND_CEILING);
/* 首先尝试非阻塞获取,不释放GIL */
r = PyThread_acquire_lock_timed(lock, 0, 0);
if (r == PY_LOCK_FAILURE && microseconds != 0) {
/* 阻塞获取,释放GIL */
Py_BEGIN_ALLOW_THREADS
r = PyThread_acquire_lock_timed(lock, microseconds, 1);
Py_END_ALLOW_THREADS
}
if (r == PY_LOCK_INTR) {
/* 被信号中断,处理挂起的信号 */
if (_PyEval_MakePendingCalls(tstate) < 0) {
return PY_LOCK_INTR; /* 传播异常 */
}
/* 重新计算剩余超时时间 */
if (timeout > 0) {
timeout = _PyDeadline_Get(endtime);
if (timeout < 0) {
r = PY_LOCK_FAILURE; /* 超时 */
}
}
}
} while (r == PY_LOCK_INTR); /* 如果被中断则重试 */
return r;
}
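在Python层面,这条带超时、可被信号中断的加锁路径最终通过threading.Lock的lock.acquire(timeout=...)暴露出来。下面是一个最小示意(假设运行在标准CPython上,线程名等仅为演示用途),用来观察超时获取的两种结果:
import threading
import time
lock = threading.Lock()
def hold_lock(duration: float) -> None:
    # 先持有锁一段时间,迫使另一线程的acquire走阻塞/超时路径
    with lock:
        time.sleep(duration)
holder = threading.Thread(target=hold_lock, args=(0.5,), name="holder")
holder.start()
time.sleep(0.1)  # 等待holder真正拿到锁
# 底层对应PyThread_acquire_lock_timed:阻塞等待前会先释放GIL
print("0.2秒内获取:", lock.acquire(timeout=0.2))  # 预期False(超时)
acquired = lock.acquire(timeout=2.0)  # 预期True(holder释放后获取成功)
print("2.0秒内获取:", acquired)
if acquired:
    lock.release()
holder.join()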
1.2 线程特定存储(TSS)
/* Thread Specific Storage (TSS) API实现 */
/* TSS键分配 */
Py_tss_t *
PyThread_tss_alloc(void)
{
Py_tss_t *new_key = (Py_tss_t *)PyMem_RawMalloc(sizeof(Py_tss_t));
if (new_key == NULL) {
return NULL;
}
new_key->_is_initialized = 0; /* 标记为未初始化 */
return new_key;
}
/* TSS键释放 */
void
PyThread_tss_free(Py_tss_t *key)
{
if (key == NULL) {
return;
}
/* 如果已初始化,先删除键 */
if (key->_is_initialized) {
PyThread_tss_delete(key);
}
PyMem_RawFree((void *)key);
}
/* TSS键创建 */
int
PyThread_tss_create(Py_tss_t *key)
{
assert(key != NULL);
/* 调用平台特定的实现 */
int fail = pthread_key_create(&(key->_key), NULL);
if (fail) {
return -1;
}
key->_is_initialized = 1;
return 0;
}
/* TSS值设置 */
int
PyThread_tss_set(Py_tss_t *key, void *value)
{
assert(key != NULL);
assert(key->_is_initialized);
/* 调用平台特定的实现 */
return pthread_setspecific(key->_key, value) ? -1 : 0;
}
/* TSS值获取 */
void *
PyThread_tss_get(Py_tss_t *key)
{
assert(key != NULL);
assert(key->_is_initialized);
/* 调用平台特定的实现 */
return pthread_getspecific(key->_key);
}
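TSS是C层的线程局部存储接口;在Python层,功能上对应的公开抽象是threading.local()。下面的示意例子(纯Python,不直接调用上述C API)演示各线程写入的值互不可见:
import threading
tls = threading.local()  # 每个线程拥有独立的属性命名空间
def worker(value: int) -> None:
    tls.value = value  # 仅对当前线程可见
    print(threading.current_thread().name, "看到的值:", tls.value)
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("主线程是否有tls.value:", hasattr(tls, "value"))  # 预期False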
2. GIL (全局解释器锁) 深度分析
2.1 GIL核心实现
/* Python/ceval_gil.c - GIL实现 */
/* GIL状态结构(简化示意;实际定义见Include/internal/pycore_gil.h) */
struct _gil_runtime_state {
unsigned long interval; /* GIL检查间隔 */
_Py_atomic_int gil_drop_request; /* GIL释放请求 */
_Py_atomic_int gil; /* GIL状态 */
PyMutex mutex; /* 互斥锁保护GIL状态 */
PyMutex2 cond; /* 条件变量用于线程等待 */
PyThreadState *holder; /* 当前持有GIL的线程 */
int locked; /* GIL是否被锁定 */
unsigned long switch_number; /* GIL切换次数 */
};
/* 获取GIL */
void
take_gil(PyThreadState *tstate)
{
int err = errno;
if (tstate_must_exit(tstate)) {
    /* 解释器正在关闭,当前线程不再获取GIL,直接退出 */
    PyThread_exit_thread();
}
assert(is_tstate_valid(tstate));
PyMutex_Lock(&gil->mutex);
if (!_Py_atomic_load_int_relaxed(&gil->gil)) {
/* GIL未被持有,直接获取 */
goto _ready;
}
/* GIL被其他线程持有,等待释放 */
while (_Py_atomic_load_int_relaxed(&gil->gil)) {
_Py_atomic_store_int_relaxed(&gil->gil_drop_request, 1);
/* 等待条件变量信号 */
PyMutex2_Wait(&gil->cond, &gil->mutex);
if (tstate_must_exit(tstate)) {
PyMutex_Unlock(&gil->mutex);
PyThread_exit_thread();
}
}
_ready:
/* 设置GIL状态 */
_Py_atomic_store_int_relaxed(&gil->gil_drop_request, 0);
_Py_atomic_store_int_relaxed(&gil->gil, 1);
gil->holder = tstate;
gil->locked = 1;
PyMutex_Unlock(&gil->mutex);
if (_Py_atomic_load_int_relaxed(&tstate->eval_breaker)) {
/* 有挂起的信号或异步任务 */
_Py_FinishPendingCalls(tstate);
}
errno = err;
}
/* 释放GIL */
void
drop_gil(PyInterpreterState *interp, PyThreadState *tstate, int final)
{
/* 检查是否持有GIL */
if (!gil->locked) {
Py_FatalError("drop_gil: GIL is not locked");
}
if (gil->holder != tstate) {
Py_FatalError("drop_gil: wrong thread state");
}
PyMutex_Lock(&gil->mutex);
_Py_atomic_store_int_relaxed(&gil->gil, 0);
gil->holder = NULL;
gil->locked = 0;
/* 通知等待的线程 */
PyMutex2_Notify(&gil->cond);
PyMutex_Unlock(&gil->mutex);
#ifdef FORCE_SWITCHING
if (!final) {
/* 强制线程切换,给其他线程机会 */
PyThread_yield();
}
#endif
}
/* GIL切换检查(简化示意;对应字节码执行循环中对eval breaker/gil_drop_request的处理) */
int
_Py_MakePendingCalls(PyThreadState *tstate)
{
/* 检查是否有GIL释放请求 */
if (_Py_atomic_load_int_relaxed(&gil->gil_drop_request)) {
/* 释放并重新获取GIL,给其他线程机会 */
if (gil->holder == tstate) {
drop_gil(_PyInterpreterState_GET(), tstate, 0);
take_gil(tstate);
}
}
/* 处理挂起的异步调用 */
if (gil->pending.calls_to_do) {
if (make_pending_calls(tstate) != 0) {
return -1;
}
}
return 0;
}
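gil_drop_request的触发频率由gil->interval控制,Python层可以通过sys.getswitchinterval()/sys.setswitchinterval()读取和调整(单位为秒)。下面是一个简单示意:
import sys
print("当前GIL切换间隔:", sys.getswitchinterval())  # 默认约0.005秒(5毫秒)
sys.setswitchinterval(0.01)  # 调大间隔:减少切换开销,但降低多线程响应性
print("调整后:", sys.getswitchinterval())
sys.setswitchinterval(0.005)  # 恢复默认值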
2.2 GIL性能优化
/* GIL性能优化机制(以下为示意性代码,用于说明思路,并非CPython源码中的现成实现) */
/* 自适应GIL间隔调整(示意) */
static void
update_gil_interval(struct _gil_runtime_state *gil)
{
/* 测量GIL争用情况 */
unsigned long current_time = PyThread_get_time_ns();
unsigned long switch_time = current_time - gil->last_switch_time;
if (switch_time < gil->interval * 1000) {
/* 切换太频繁,增加间隔 */
if (gil->interval < 10000) { /* 最大10ms */
gil->interval *= 2;
}
}
else if (switch_time > gil->interval * 5000) {
/* 切换太慢,减少间隔 */
if (gil->interval > 5) { /* 最小5μs */
gil->interval /= 2;
}
}
gil->last_switch_time = current_time;
gil->switch_number++;
}
/* GIL争用监控 */
typedef struct {
unsigned long contention_count;
unsigned long total_wait_time;
unsigned long max_wait_time;
unsigned long switches_per_second;
} GILStats;
static GILStats gil_stats = {0};
static void
record_gil_contention(unsigned long wait_time)
{
gil_stats.contention_count++;
gil_stats.total_wait_time += wait_time;
if (wait_time > gil_stats.max_wait_time) {
gil_stats.max_wait_time = wait_time;
}
}
/* 获取GIL统计信息 */
PyObject *
_PyGIL_GetStats(void)
{
PyObject *stats = PyDict_New();
if (stats == NULL) {
return NULL;
}
PyDict_SetItemString(stats, "contention_count",
PyLong_FromUnsignedLong(gil_stats.contention_count));
PyDict_SetItemString(stats, "total_wait_time",
PyLong_FromUnsignedLong(gil_stats.total_wait_time));
PyDict_SetItemString(stats, "max_wait_time",
PyLong_FromUnsignedLong(gil_stats.max_wait_time));
PyDict_SetItemString(stats, "switch_number",
PyLong_FromUnsignedLong(gil->switch_number));
PyDict_SetItemString(stats, "current_interval",
PyLong_FromUnsignedLong(gil->interval));
return stats;
}
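需要说明的是,上面的update_gil_interval与_PyGIL_GetStats属于示意性代码;不过GIL争用的实际影响可以直接在Python层测量——CPU密集型任务用多线程并不会加速。下面是一个粗略的对比基准(耗时数值依机器而定):
import threading
import time
def count_down(n: int) -> None:
    # 纯CPU密集型循环,执行期间始终需要持有GIL
    while n > 0:
        n -= 1
N = 5_000_000
start = time.perf_counter()
count_down(N)
count_down(N)
sequential = time.perf_counter() - start
start = time.perf_counter()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
threaded = time.perf_counter() - start
print(f"单线程顺序执行: {sequential:.2f}s")
print(f"双线程并发执行: {threaded:.2f}s (受GIL限制,通常不会更快)")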
3. threading模块深度实现
3.1 Thread类核心实现
# Lib/threading.py - Thread类实现分析
import _thread
import sys
import weakref
from time import sleep as _sleep
class Thread:
"""线程类的核心实现"""
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
"""
初始化线程对象
参数说明:
- group: 保留参数,必须为None
- target: 线程执行的目标函数
- name: 线程名称
- args: 目标函数的位置参数
- kwargs: 目标函数的关键字参数
- daemon: 是否为守护线程
"""
assert group is None, "group argument must be None for now"
if kwargs is None:
kwargs = {}
self._target = target
self._name = str(name or _newname())
self._args = args
self._kwargs = kwargs
self._daemonic = daemon
# 线程状态管理
self._ident = None # 线程ID
self._native_id = None # 原生线程ID(操作系统级别)
self._tstate_lock = None # 线程状态锁
self._started = _Event() # 启动事件
self._is_stopped = False # 停止标志
self._initialized = True
# 线程在start()时才注册到全局_limbo列表
def start(self):
"""启动线程"""
if not self._initialized:
raise RuntimeError("thread.__init__() not called")
if self._started.is_set():
raise RuntimeError("threads can only be started once")
# 设置守护线程状态
if self._daemonic is None:
self._daemonic = current_thread().daemon
# 先注册到_limbo(等待启动的线程列表),再创建底层线程
with _active_limbo_lock:
    _limbo[self] = self
try:
    _thread.start_new_thread(self._bootstrap, ())
except Exception:
# 启动失败,从limbo中移除
with _active_limbo_lock:
try:
del _limbo[self]
except KeyError:
pass
raise
# 等待线程实际启动
self._started.wait()
def _bootstrap(self):
"""线程启动引导函数(在新线程中执行)"""
try:
self._bootstrap_inner()
except:
# 处理启动过程中的异常
if self._daemonic and _is_main_interpreter():
return
else:
raise
def _bootstrap_inner(self):
"""线程启动的核心逻辑"""
try:
# 获取线程ID、原生线程ID和状态锁
self._ident = _thread.get_ident()
self._native_id = _thread.get_native_id()
self._tstate_lock = _thread.allocate_lock()
self._tstate_lock.acquire()
# 将线程从limbo移动到active
with _active_limbo_lock:
try:
del _limbo[self]
except KeyError:
pass
_active[self._ident] = self
# 设置线程名称
try:
_thread._set_name(self._name)
except AttributeError:
pass
# 通知线程已启动
self._started.set()
finally:
    # 启动阶段无需额外清理;_tstate_lock会在线程结束时由_delete()释放
    pass
# 执行线程主体
try:
self.run()
finally:
# 线程结束清理
self._delete()
def run(self):
"""线程的主执行函数(可以被子类重写)"""
try:
if self._target:
self._target(*self._args, **self._kwargs)
finally:
# 避免在线程结束后保持对象引用
del self._target, self._args, self._kwargs
def join(self, timeout=None):
"""等待线程结束"""
if not self._initialized:
raise RuntimeError("Thread.__init__() not called")
if not self._started.is_set():
raise RuntimeError("cannot join thread before it is started")
if self is current_thread():
raise RuntimeError("cannot join current thread")
if timeout is None:
# 无限等待
self._wait_for_tstate_lock()
else:
# 带超时等待
self._wait_for_tstate_lock(timeout=max(timeout, 0))
def _wait_for_tstate_lock(self, block=True, timeout=-1):
"""等待线程状态锁(表示线程结束)"""
lock = self._tstate_lock
if lock is None:
# 线程从未启动或已经结束
assert self._is_stopped
return
try:
if lock.acquire(block, timeout):
lock.release()
self._stop()
except:
# 超时或被中断
pass
def _delete(self):
"""线程结束时的清理工作"""
with _active_limbo_lock:
try:
# 从活动线程列表中移除
del _active[self._ident]
except KeyError:
pass
# 释放线程状态锁
try:
self._tstate_lock.release()
except:
pass
@property
def ident(self):
"""线程标识符"""
assert self._initialized
return self._ident
@property
def native_id(self):
    """原生线程ID(操作系统级别),在_bootstrap_inner中通过_thread.get_native_id()记录"""
    assert self._initialized
    return self._native_id
def is_alive(self):
"""检查线程是否存活"""
assert self._initialized
if self._is_stopped or not self._started.is_set():
return False
# 检查线程状态锁
if self._tstate_lock is None:
return False
# 尝试非阻塞获取锁
if self._tstate_lock.acquire(False):
self._tstate_lock.release()
self._stop()
return False
else:
return True
# 线程管理全局变量
_active_limbo_lock = _RLock() # 保护_active和_limbo的锁
_active = {} # 活动线程字典 {线程ID: Thread对象}
_limbo = {} # 等待启动的线程
_counter = 0 # 线程计数器
def _newname(template="Thread-%d"):
"""生成新线程名称"""
global _counter
_counter += 1
return template % _counter
def current_thread():
"""获取当前线程对象"""
try:
return _active[_thread.get_ident()]
except KeyError:
# 主线程或未通过Thread类创建的线程
return _DummyThread()
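以上是threading.Thread的简化源码骨架;日常使用时只需面向其公开接口。下面是一个最小使用示例(用time.sleep模拟I/O等待,等待期间线程会释放GIL):
import threading
import time
def download(url: str, delay: float) -> None:
    time.sleep(delay)  # 模拟I/O等待
    print(f"完成: {url}")
threads = []
for i, url in enumerate(["url-a", "url-b", "url-c"]):
    t = threading.Thread(target=download, args=(url, 0.1 * (i + 1)),
                         name=f"downloader-{i}")
    t.start()
    threads.append(t)
for t in threads:
    t.join(timeout=5.0)
    print(t.name, "ident:", t.ident, "alive:", t.is_alive())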
3.2 同步原语实现
# 同步原语的详细实现
import _thread
from collections import deque
import warnings
class Lock:
"""互斥锁实现"""
def __init__(self):
self._lock = _thread.allocate_lock()
def acquire(self, blocking=True, timeout=-1):
"""
获取锁
参数:
- blocking: 是否阻塞
- timeout: 超时时间(秒)
返回: 是否成功获取锁
"""
return self._lock.acquire(blocking, timeout)
def release(self):
"""释放锁"""
try:
self._lock.release()
except RuntimeError:
raise RuntimeError("release unlocked lock")
def locked(self):
"""检查锁是否被持有"""
return self._lock.locked()
def __enter__(self):
self._lock.acquire()
return self
def __exit__(self, t, v, tb):
self._lock.release()
def __repr__(self):
status = "locked" if self._lock.locked() else "unlocked"
return f"<{self.__class__.__name__} object at {hex(id(self))}: {status}>"
class RLock:
"""可重入锁实现"""
def __init__(self):
self._block = _thread.allocate_lock() # 底层锁
self._owner = None # 锁的持有者
self._count = 0 # 重入计数
def acquire(self, blocking=True, timeout=-1):
"""获取可重入锁"""
me = _thread.get_ident()
if self._owner == me:
# 同一线程重入
self._count += 1
return True
# 尝试获取底层锁
rc = self._block.acquire(blocking, timeout)
if rc:
self._owner = me
self._count = 1
return rc
def release(self):
"""释放可重入锁"""
if self._owner != _thread.get_ident():
raise RuntimeError("cannot release un-acquired lock")
self._count -= 1
if self._count == 0:
self._owner = None
self._block.release()
def _is_owned(self):
"""检查当前线程是否持有锁"""
return self._owner == _thread.get_ident()
def __enter__(self):
self.acquire()
return self
def __exit__(self, t, v, tb):
self.release()
class Condition:
"""条件变量实现"""
def __init__(self, lock=None):
if lock is None:
lock = RLock()
self._lock = lock
# 获取锁的底层acquire和release方法
self.acquire = lock.acquire
self.release = lock.release
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
# 等待线程队列
self._waiters = []
# 底层锁未提供以下方法时的默认实现(与标准库threading.Condition的做法一致)
def _release_save(self):
    self._lock.release()  # 没有需要保存的状态
    return None
def _acquire_restore(self, x):
    self._lock.acquire()  # 忽略保存的状态
def _is_owned(self):
    # 通过非阻塞获取来判断当前线程是否持有锁
    if self._lock.acquire(False):
        self._lock.release()
        return False
    return True
def wait(self, timeout=None):
"""等待条件满足"""
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
# 创建等待锁
waiter = _thread.allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
# 释放主锁
saved_state = self._release_save()
gotit = False
try:
# 等待通知
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
return gotit
finally:
# 恢复主锁
self._acquire_restore(saved_state)
if not gotit:
# 超时或被中断,从等待队列移除
try:
self._waiters.remove(waiter)
except ValueError:
pass
def notify(self, n=1):
"""通知等待的线程"""
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self._waiters
waiters_to_notify = all_waiters[:n]
if not waiters_to_notify:
return
for waiter in waiters_to_notify:
waiter.release()
try:
all_waiters.remove(waiter)
except ValueError:
pass
def notify_all(self):
"""通知所有等待的线程"""
self.notify(len(self._waiters))
def wait_for(self, predicate, timeout=None):
"""等待直到谓词为真"""
endtime = None
waittime = timeout
result = predicate()
while not result:
if waittime is not None:
if endtime is None:
endtime = _time() + waittime
else:
waittime = endtime - _time()
if waittime <= 0:
break
self.wait(waittime)
result = predicate()
return result
class Semaphore:
"""信号量实现"""
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
self._value = value
def acquire(self, blocking=True, timeout=None):
"""获取信号量"""
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
endtime = None
with self._cond:
while self._value == 0:
if not blocking:
break
if timeout is not None:
if endtime is None:
endtime = _time() + timeout
else:
timeout = endtime - _time()
if timeout <= 0:
break
self._cond.wait(timeout)
else:
self._value -= 1
rc = True
return rc
def release(self):
"""释放信号量"""
with self._cond:
self._value += 1
self._cond.notify()
def __enter__(self):
self.acquire()
return self
def __exit__(self, t, v, tb):
self.release()
class BoundedSemaphore(Semaphore):
"""有界信号量"""
def __init__(self, value=1):
super().__init__(value)
self._initial_value = value
def release(self):
"""释放有界信号量"""
with self._cond:
if self._value >= self._initial_value:
raise ValueError("Semaphore released too many times")
self._value += 1
self._cond.notify()
class Event:
"""事件对象实现"""
def __init__(self):
self._cond = Condition(Lock())
self._flag = False
def is_set(self):
"""检查事件是否被设置"""
return self._flag
def set(self):
"""设置事件"""
with self._cond:
self._flag = True
self._cond.notify_all()
def clear(self):
"""清除事件"""
with self._cond:
self._flag = False
def wait(self, timeout=None):
"""等待事件被设置"""
with self._cond:
signaled = self._flag
if not signaled:
signaled = self._cond.wait_for(lambda: self._flag, timeout)
return signaled
# 工具函数
def _time():
    """获取单调时钟时间(用于超时计算,不受系统时间调整影响)"""
    from time import monotonic
    return monotonic()
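上面是各同步原语的简化实现;实际编程中直接使用标准库threading模块中的同名类即可。下面用标准Condition演示wait_for/notify的典型配合方式:
import threading
items = []
cond = threading.Condition()
def consumer() -> None:
    with cond:
        # wait_for在谓词为假时循环wait(),被notify唤醒后重新检查谓词
        cond.wait_for(lambda: len(items) > 0)
        print("消费:", items.pop(0))
def producer() -> None:
    with cond:
        items.append("task-1")
        cond.notify()  # 唤醒一个等待线程
c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)
c.start()
p.start()
c.join()
p.join()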
4. 线程并发编程模式
4.1 生产者-消费者模式
# 高级并发编程模式实现
import threading
import queue
import time
import random
from typing import Optional, Callable, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
class ThreadingPatterns:
"""线程并发编程模式演示"""
def __init__(self):
self.results = []
self.stats = {}
def demonstrate_producer_consumer(self):
"""演示生产者-消费者模式"""
print("=== 生产者-消费者模式演示 ===")
# 共享队列
buffer = queue.Queue(maxsize=10)
# 统计信息
stats_lock = threading.Lock()
stats = {
'produced': 0,
'consumed': 0,
'max_queue_size': 0
}
def producer(producer_id: int, items_count: int):
"""生产者函数"""
for i in range(items_count):
item = f"Producer-{producer_id}-Item-{i}"
# 模拟生产时间
time.sleep(random.uniform(0.01, 0.05))
# 放入队列
buffer.put(item)
# 更新统计
with stats_lock:
stats['produced'] += 1
current_size = buffer.qsize()
if current_size > stats['max_queue_size']:
stats['max_queue_size'] = current_size
print(f" 生产: {item} (队列大小: {buffer.qsize()})")
print(f"生产者 {producer_id} 完成")
def consumer(consumer_id: int, stop_event: threading.Event):
"""消费者函数"""
while not stop_event.is_set():
try:
# 从队列获取项目(带超时)
item = buffer.get(timeout=0.1)
# 模拟消费时间
time.sleep(random.uniform(0.02, 0.08))
# 更新统计
with stats_lock:
stats['consumed'] += 1
print(f" 消费: {item} (队列大小: {buffer.qsize()})")
# 标记任务完成
buffer.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"消费者 {consumer_id} 错误: {e}")
print(f"消费者 {consumer_id} 退出")
# 创建并启动线程
stop_event = threading.Event()
# 启动3个生产者
producers = []
for i in range(3):
t = threading.Thread(target=producer, args=(i, 5))
t.start()
producers.append(t)
# 启动2个消费者
consumers = []
for i in range(2):
t = threading.Thread(target=consumer, args=(i, stop_event))
t.daemon = True # 守护线程
t.start()
consumers.append(t)
# 等待所有生产者完成
for t in producers:
t.join()
# 等待队列清空
buffer.join()
# 停止消费者
stop_event.set()
# 打印统计信息
print(f"\n生产者-消费者统计:")
print(f" 生产项目: {stats['produced']}")
print(f" 消费项目: {stats['consumed']}")
print(f" 最大队列大小: {stats['max_queue_size']}")
def demonstrate_worker_pool(self):
"""演示工作线程池模式"""
print(f"\n=== 工作线程池模式演示 ===")
class WorkerPool:
def __init__(self, num_workers: int = 4):
self.num_workers = num_workers
self.task_queue = queue.Queue()
self.result_queue = queue.Queue()
self.workers = []
self.stop_event = threading.Event()
self.stats_lock = threading.Lock()
self.stats = {
'tasks_submitted': 0,
'tasks_completed': 0,
'total_processing_time': 0.0
}
def _worker(self, worker_id: int):
"""工作线程函数"""
print(f" 工作线程 {worker_id} 启动")
while not self.stop_event.is_set():
try:
# 获取任务
task_func, args, kwargs = self.task_queue.get(timeout=0.1)
# 执行任务
start_time = time.time()
try:
result = task_func(*args, **kwargs)
end_time = time.time()
# 记录结果
self.result_queue.put(('success', result))
# 更新统计
with self.stats_lock:
self.stats['tasks_completed'] += 1
self.stats['total_processing_time'] += (end_time - start_time)
print(f" 工作线程 {worker_id} 完成任务")
except Exception as e:
self.result_queue.put(('error', e))
finally:
self.task_queue.task_done()
except queue.Empty:
continue
print(f" 工作线程 {worker_id} 退出")
def start(self):
"""启动工作线程池"""
for i in range(self.num_workers):
worker = threading.Thread(target=self._worker, args=(i,))
worker.daemon = True
worker.start()
self.workers.append(worker)
print(f"工作线程池启动,{self.num_workers} 个工作线程")
def submit_task(self, func: Callable, *args, **kwargs):
"""提交任务"""
self.task_queue.put((func, args, kwargs))
with self.stats_lock:
self.stats['tasks_submitted'] += 1
def get_result(self, timeout: Optional[float] = None):
"""获取结果"""
try:
return self.result_queue.get(timeout=timeout)
except queue.Empty:
return None
def shutdown(self, wait: bool = True):
"""关闭线程池"""
if wait:
self.task_queue.join() # 等待所有任务完成
self.stop_event.set()
if wait:
for worker in self.workers:
worker.join()
print("工作线程池已关闭")
def get_stats(self):
"""获取统计信息"""
with self.stats_lock:
return self.stats.copy()
# 定义测试任务
def cpu_intensive_task(n: int) -> int:
"""CPU密集型任务"""
result = 0
for i in range(n):
result += i * i
return result
def io_intensive_task(duration: float) -> str:
"""I/O密集型任务"""
time.sleep(duration)
return f"Task completed in {duration:.2f}s"
# 使用工作线程池
pool = WorkerPool(num_workers=4)
pool.start()
# 提交任务
print("提交CPU密集型任务:")
for i in range(8):
pool.submit_task(cpu_intensive_task, 50000)
print("提交I/O密集型任务:")
for i in range(5):
pool.submit_task(io_intensive_task, 0.1)
# 收集结果
print(f"\n收集结果:")
completed = 0
total_tasks = 13
while completed < total_tasks:
result = pool.get_result(timeout=1.0)
if result:
status, value = result
if status == 'success':
print(f" 任务成功: {type(value).__name__}")
else:
print(f" 任务失败: {value}")
completed += 1
# 获取统计信息
stats = pool.get_stats()
print(f"\n线程池统计:")
print(f" 提交任务: {stats['tasks_submitted']}")
print(f" 完成任务: {stats['tasks_completed']}")
print(f" 平均处理时间: {stats['total_processing_time'] / stats['tasks_completed']:.4f}s")
# 关闭线程池
pool.shutdown()
def demonstrate_read_write_lock(self):
"""演示读写锁模式"""
print(f"\n=== 读写锁模式演示 ===")
class ReadWriteLock:
"""读写锁实现"""
def __init__(self):
self._read_ready = threading.Condition(threading.RLock())
self._readers = 0
def acquire_read(self):
"""获取读锁"""
self._read_ready.acquire()
try:
self._readers += 1
finally:
self._read_ready.release()
def release_read(self):
"""释放读锁"""
self._read_ready.acquire()
try:
self._readers -= 1
if self._readers == 0:
self._read_ready.notify_all()
finally:
self._read_ready.release()
def acquire_write(self):
"""获取写锁"""
self._read_ready.acquire()
while self._readers > 0:
self._read_ready.wait()
def release_write(self):
"""释放写锁"""
self._read_ready.release()
# 共享数据
shared_data = {"counter": 0, "data": []}
rw_lock = ReadWriteLock()
def reader(reader_id: int, read_count: int):
"""读者函数"""
for i in range(read_count):
rw_lock.acquire_read()
try:
# 读取数据
counter = shared_data["counter"]
data_len = len(shared_data["data"])
print(f" 读者 {reader_id}: counter={counter}, data_len={data_len}")
# 模拟读取时间
time.sleep(0.01)
finally:
rw_lock.release_read()
time.sleep(0.02) # 读取间隔
def writer(writer_id: int, write_count: int):
"""写者函数"""
for i in range(write_count):
rw_lock.acquire_write()
try:
# 修改数据
shared_data["counter"] += 1
shared_data["data"].append(f"writer-{writer_id}-item-{i}")
print(f" 写者 {writer_id}: 写入数据, counter={shared_data['counter']}")
# 模拟写入时间
time.sleep(0.05)
finally:
rw_lock.release_write()
time.sleep(0.1) # 写入间隔
# 创建读者和写者线程
threads = []
# 启动3个读者
for i in range(3):
t = threading.Thread(target=reader, args=(i, 5))
threads.append(t)
# 启动2个写者
for i in range(2):
t = threading.Thread(target=writer, args=(i, 3))
threads.append(t)
# 启动所有线程
for t in threads:
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print(f"最终数据状态:")
print(f" counter: {shared_data['counter']}")
print(f" data length: {len(shared_data['data'])}")
def run_all_patterns(self):
"""运行所有并发模式演示"""
print("线程并发编程模式演示\n")
self.demonstrate_producer_consumer()
self.demonstrate_worker_pool()
self.demonstrate_read_write_lock()
print(f"\n{'='*50}")
print("并发模式演示完成")
print(f"{'='*50}")
# 运行并发模式演示
if __name__ == "__main__":
patterns = ThreadingPatterns()
patterns.run_all_patterns()
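上面的WorkerPool是手工实现,便于理解内部机制;实际项目中通常直接使用标准库concurrent.futures.ThreadPoolExecutor(前文已导入),接口更简洁且行为经过充分测试。示意如下:
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
def io_task(duration: float) -> str:
    time.sleep(duration)  # 模拟I/O等待
    return f"done in {duration:.2f}s"
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(io_task, 0.1 * i) for i in range(1, 6)]
    for future in as_completed(futures):
        print(future.result())
# with块退出时自动调用executor.shutdown(wait=True)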
5. 线程安全与同步机制
5.1 原子操作与内存模型
/* Include/cpython/pyatomic.h - 原子操作实现(简化示意) */
/* 原子整数操作 */
static inline int
_Py_atomic_load_int(const int *obj)
{
#if defined(_MSC_VER)
return _InterlockedOr((volatile long*)obj, 0);
#elif defined(__GNUC__)
return __atomic_load_n(obj, __ATOMIC_SEQ_CST);
#else
/* 回退到非原子操作(假设单线程或有其他同步) */
return *obj;
#endif
}
static inline void
_Py_atomic_store_int(int *obj, int value)
{
#if defined(_MSC_VER)
_InterlockedExchange((volatile long*)obj, value);
#elif defined(__GNUC__)
__atomic_store_n(obj, value, __ATOMIC_SEQ_CST);
#else
*obj = value;
#endif
}
static inline int
_Py_atomic_add_int(int *obj, int value)
{
#if defined(_MSC_VER)
return _InterlockedExchangeAdd((volatile long*)obj, value);
#elif defined(__GNUC__)
return __atomic_fetch_add(obj, value, __ATOMIC_SEQ_CST);
#else
int old = *obj;
*obj += value;
return old;
#endif
}
/* 原子指针操作 */
static inline void*
_Py_atomic_load_ptr(const void **obj)
{
#if defined(_MSC_VER)
return (void*)_InterlockedOrPtr((volatile LONG_PTR*)obj, 0);
#elif defined(__GNUC__)
return __atomic_load_n(obj, __ATOMIC_SEQ_CST);
#else
return (void*)*obj;
#endif
}
static inline void
_Py_atomic_store_ptr(void **obj, void *value)
{
#if defined(_MSC_VER)
_InterlockedExchangePointer((volatile PVOID*)obj, value);
#elif defined(__GNUC__)
__atomic_store_n(obj, value, __ATOMIC_SEQ_CST);
#else
*obj = value;
#endif
}
/* 比较并交换 */
static inline int
_Py_atomic_compare_exchange_ptr(void **obj, void **expected, void *desired)
{
#if defined(_MSC_VER)
void *old = _InterlockedCompareExchangePointer((volatile PVOID*)obj, desired, *expected);
if (old == *expected) {
return 1;
} else {
*expected = old;
return 0;
}
#elif defined(__GNUC__)
return __atomic_compare_exchange_n(obj, expected, desired, 0,
__ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
#else
if (*obj == *expected) {
*obj = desired;
return 1;
} else {
*expected = *obj;
return 0;
}
#endif
}
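这些原子操作保护的是解释器内部的共享状态;而Python层的普通读-改-写(如counter += 1)在字节码层面并不是原子的,跨线程共享时仍需要加锁。下面的示意例子对比无锁与加锁累加(无锁版本在多数环境下会丢失部分更新,具体取决于线程切换时机):
import threading
counter = 0
lock = threading.Lock()
def unsafe_add(n: int) -> None:
    global counter
    for _ in range(n):
        tmp = counter  # 读
        tmp += 1       # 改
        counter = tmp  # 写:三步之间可能被其他线程打断
def safe_add(n: int) -> None:
    global counter
    for _ in range(n):
        with lock:  # 临界区内的读-改-写不会被交错
            counter += 1
def run(target) -> int:
    global counter
    counter = 0
    threads = [threading.Thread(target=target, args=(100_000,)) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter
print("无锁累加结果(可能小于400000):", run(unsafe_add))
print("加锁累加结果(应等于400000):", run(safe_add))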
5.2 高级同步原语
# 高级同步原语实现
import threading
import time
import collections
from typing import Optional, Any, Callable
class AdvancedSynchronization:
"""高级同步原语演示"""
def demonstrate_barrier(self):
"""演示屏障同步"""
print("=== 屏障同步演示 ===")
class Barrier:
"""自定义屏障实现"""
def __init__(self, parties: int, action: Optional[Callable] = None):
self._parties = parties
self._action = action
self._lock = threading.Lock()
self._condition = threading.Condition(self._lock)
self._count = 0
self._generation = 0
def wait(self, timeout: Optional[float] = None) -> int:
"""等待所有线程到达屏障"""
with self._condition:
generation = self._generation
index = self._count  # 本线程的到达序号(0..parties-1)
self._count += 1
if self._count == self._parties:
    # 最后一个到达的线程:重置计数并进入下一代
    self._count = 0
    self._generation += 1
    # 执行屏障动作
    if self._action:
        try:
            self._action()
        except Exception as e:
            print(f"屏障动作异常: {e}")
    # 唤醒所有等待的线程
    self._condition.notify_all()
    return index
# 等待本代的其余线程到达
while generation == self._generation:
    if not self._condition.wait(timeout):
        # 超时
        raise threading.BrokenBarrierError("等待屏障超时")
return index
def barrier_action():
"""屏障动作:所有线程到达时执行"""
print(f" >>> 所有线程已到达屏障,执行同步动作 <<<")
def worker(worker_id: int, barrier: Barrier, phases: int):
"""工作线程函数"""
for phase in range(phases):
# 模拟工作
work_time = 0.1 + (worker_id * 0.05)
print(f" 工作线程 {worker_id} 阶段 {phase}: 工作 {work_time:.2f}s")
time.sleep(work_time)
# 到达屏障
print(f" 工作线程 {worker_id} 到达屏障 (阶段 {phase})")
try:
index = barrier.wait(timeout=2.0)
print(f" 工作线程 {worker_id} 通过屏障 (索引: {index})")
except threading.BrokenBarrierError as e:
print(f" 工作线程 {worker_id} 屏障错误: {e}")
break
# 使用屏障同步
num_workers = 4
phases = 3
barrier = Barrier(num_workers, barrier_action)
threads = []
for i in range(num_workers):
t = threading.Thread(target=worker, args=(i, barrier, phases))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print("屏障同步演示完成")
def demonstrate_countdown_latch(self):
"""演示倒计时锁"""
print(f"\n=== 倒计时锁演示 ===")
class CountDownLatch:
"""倒计时锁实现"""
def __init__(self, count: int):
if count < 0:
raise ValueError("count must be >= 0")
self._count = count
self._lock = threading.Lock()
self._condition = threading.Condition(self._lock)
def count_down(self):
"""减少计数"""
with self._condition:
if self._count > 0:
self._count -= 1
if self._count == 0:
self._condition.notify_all()
def wait(self, timeout: Optional[float] = None) -> bool:
"""等待计数归零"""
with self._condition:
while self._count > 0:
if not self._condition.wait(timeout):
return False # 超时
return True
def get_count(self) -> int:
"""获取当前计数"""
with self._lock:
return self._count
def initialization_task(task_id: int, latch: CountDownLatch):
"""初始化任务"""
print(f" 初始化任务 {task_id} 开始")
# 模拟初始化工作
init_time = 0.2 + (task_id * 0.1)
time.sleep(init_time)
print(f" 初始化任务 {task_id} 完成")
latch.count_down()
def main_task(latch: CountDownLatch):
"""主任务:等待所有初始化完成"""
print("主任务等待所有初始化任务完成...")
start_time = time.time()
if latch.wait(timeout=5.0):
elapsed = time.time() - start_time
print(f"所有初始化任务完成,耗时 {elapsed:.2f}s")
print("主任务开始执行")
else:
print("等待初始化任务超时")
# 使用倒计时锁
num_init_tasks = 5
latch = CountDownLatch(num_init_tasks)
# 启动主任务
main_thread = threading.Thread(target=main_task, args=(latch,))
main_thread.start()
# 启动初始化任务
init_threads = []
for i in range(num_init_tasks):
t = threading.Thread(target=initialization_task, args=(i, latch))
init_threads.append(t)
t.start()
# 等待所有线程完成
for t in init_threads:
t.join()
main_thread.join()
print("倒计时锁演示完成")
def demonstrate_future_promise(self):
"""演示Future/Promise模式"""
print(f"\n=== Future/Promise模式演示 ===")
class Future:
"""Future实现"""
def __init__(self):
self._lock = threading.Lock()
self._condition = threading.Condition(self._lock)
self._state = 'PENDING' # PENDING, COMPLETED, CANCELLED
self._result = None
self._exception = None
self._callbacks = []
def set_result(self, result: Any):
    """设置结果"""
    with self._condition:
        if self._state != 'PENDING':
            raise RuntimeError("Future已完成")
        self._result = result
        self._state = 'COMPLETED'
        self._condition.notify_all()
        callbacks = list(self._callbacks)
    # 在持有锁之外执行回调,避免回调中调用get()等方法时死锁
    for callback in callbacks:
        try:
            callback(self)
        except Exception as e:
            print(f"回调异常: {e}")
def set_exception(self, exception: Exception):
    """设置异常"""
    with self._condition:
        if self._state != 'PENDING':
            raise RuntimeError("Future已完成")
        self._exception = exception
        self._state = 'COMPLETED'
        self._condition.notify_all()
        callbacks = list(self._callbacks)
    # 同样在锁外执行回调
    for callback in callbacks:
        try:
            callback(self)
        except Exception as e:
            print(f"回调异常: {e}")
def get(self, timeout: Optional[float] = None) -> Any:
"""获取结果"""
with self._condition:
while self._state == 'PENDING':
if not self._condition.wait(timeout):
raise TimeoutError("获取结果超时")
if self._exception:
raise self._exception
return self._result
def add_done_callback(self, callback: Callable):
"""添加完成回调"""
with self._lock:
if self._state == 'COMPLETED':
# 已完成,立即执行回调
try:
callback(self)
except Exception as e:
print(f"回调异常: {e}")
else:
# 添加到回调列表
self._callbacks.append(callback)
def is_done(self) -> bool:
"""检查是否完成"""
with self._lock:
return self._state != 'PENDING'
def async_computation(future: Future, computation_id: int):
"""异步计算任务"""
try:
print(f" 异步计算 {computation_id} 开始")
# 模拟计算
compute_time = 0.5 + (computation_id * 0.2)
time.sleep(compute_time)
# 模拟可能的错误
if computation_id == 2:
raise ValueError(f"计算 {computation_id} 失败")
result = computation_id * computation_id
print(f" 异步计算 {computation_id} 完成,结果: {result}")
future.set_result(result)
except Exception as e:
print(f" 异步计算 {computation_id} 异常: {e}")
future.set_exception(e)
def result_callback(future: Future):
"""结果回调"""
try:
result = future.get()
print(f" 回调收到结果: {result}")
except Exception as e:
print(f" 回调收到异常: {e}")
# 使用Future/Promise
futures = []
for i in range(4):
future = Future()
future.add_done_callback(result_callback)
# 启动异步计算
t = threading.Thread(target=async_computation, args=(future, i))
t.start()
futures.append((future, t))
# 等待并获取结果
print("等待异步计算结果:")
for i, (future, thread) in enumerate(futures):
try:
result = future.get(timeout=2.0)
print(f" 计算 {i} 结果: {result}")
except Exception as e:
print(f" 计算 {i} 异常: {e}")
thread.join()
print("Future/Promise演示完成")
def run_all_synchronization(self):
"""运行所有同步原语演示"""
print("高级同步原语演示\n")
self.demonstrate_barrier()
self.demonstrate_countdown_latch()
self.demonstrate_future_promise()
print(f"\n{'='*50}")
print("同步原语演示完成")
print(f"{'='*50}")
# 运行同步原语演示
if __name__ == "__main__":
sync_demo = AdvancedSynchronization()
sync_demo.run_all_synchronization()
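上面的Barrier、CountDownLatch与Future均为教学用的自定义实现;标准库中已有成熟的对应物:threading.Barrier和concurrent.futures.Future(后者一般由Executor创建)。下面是使用标准库等价物的简短示意:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
barrier = threading.Barrier(3, action=lambda: print(">>> 全部线程到达屏障 <<<"))
def phase_worker(worker_id: int) -> None:
    time.sleep(0.05 * worker_id)
    index = barrier.wait(timeout=2.0)  # 返回0..parties-1之间的唯一索引
    print(f"工作线程 {worker_id} 通过屏障, 索引 {index}")
workers = [threading.Thread(target=phase_worker, args=(i,)) for i in range(3)]
for t in workers:
    t.start()
for t in workers:
    t.join()
with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(lambda: 6 * 7)
    future.add_done_callback(lambda f: print("回调收到结果:", f.result()))
    print("阻塞获取结果:", future.result(timeout=2.0))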
6. 线程与并发时序图
sequenceDiagram
participant App as 应用程序
participant Thread as Thread类
participant GIL as GIL系统
participant OS as 操作系统
Note over App,OS: 线程创建和启动
App->>Thread: Thread(target=func)
Thread->>Thread: 初始化线程对象
App->>Thread: start()
Thread->>OS: _thread.start_new_thread()
OS->>OS: 创建原生线程
Thread->>GIL: take_gil()
GIL->>Thread: 获取GIL成功
Thread->>Thread: _bootstrap_inner()
Thread->>App: 通知线程已启动
Note over App,OS: 线程执行和GIL管理
Thread->>Thread: run()执行用户代码
Thread->>GIL: 字节码执行检查
GIL->>GIL: 检查gil_drop_request
alt 有其他线程等待
GIL->>Thread: 释放GIL
GIL->>OS: 通知等待线程
OS->>GIL: 调度到其他线程
GIL->>Thread: 其他线程获取GIL
end
Note over App,OS: 同步操作
Thread->>Thread: 调用Lock.acquire()
Thread->>GIL: 释放GIL
Thread->>OS: 系统级锁等待
OS->>Thread: 锁可用通知
Thread->>GIL: 重新获取GIL
Note over App,OS: 线程结束
Thread->>Thread: run()执行完成
Thread->>Thread: _delete()清理
Thread->>OS: 线程退出
App->>Thread: join()等待
Thread->>App: 线程已结束
7. 总结
Python的线程与并发系统展现了复杂而高效的设计:
7.1 核心特点
- GIL机制: 全局解释器锁确保线程安全
- 跨平台抽象: 统一的线程接口适配不同操作系统
- 丰富的同步原语: 支持各种并发编程模式
- 高级抽象: threading模块提供面向对象的接口
7.2 设计权衡
- 简化vs性能: GIL简化了实现但限制了并行性
- 安全vs效率: 引用计数需要GIL保护
- 兼容性vs现代性: 保持向后兼容的同时支持新特性
7.3 应用指导
- I/O密集型: 线程模型适合I/O等待场景
- CPU密集型: 考虑多进程或异步模型
- 混合场景: 使用concurrent.futures统一接口(见本节后的示意代码)
- 同步需求: 选择合适的同步原语
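下面用concurrent.futures的统一接口示意上述选型:I/O密集型用ThreadPoolExecutor,CPU密集型切换到ProcessPoolExecutor即可,提交任务的代码保持不变(任务函数与参数均为演示用途):
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def io_bound(x: int) -> int:
    time.sleep(0.1)  # I/O等待期间释放GIL,线程池即可并发
    return x
def cpu_bound(n: int) -> int:
    return sum(i * i for i in range(n))  # 纯计算受GIL限制,需要进程池才能并行
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=8) as pool:
        print(list(pool.map(io_bound, range(8))))
    with ProcessPoolExecutor(max_workers=4) as pool:
        print(list(pool.map(cpu_bound, [200_000] * 4)))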
7.4 最佳实践
- 避免竞态条件: 正确使用锁和同步原语
- 防止死锁: 注意锁的获取顺序
- 资源管理: 使用上下文管理器
- 性能监控: 关注GIL争用情况
Python的线程与并发系统为开发者提供了强大而灵活的并发编程工具,理解其实现原理有助于编写高效的并发程序。