跳到主要内容

Python 线程通信

在编写多线程程序时,线程之间的通信是一个不可避免的话题。良好的线程通信机制能够让多个线程协同工作,提高程序的效率和可靠性。本文将介绍Python中常用的线程通信机制,并通过实例讲解其应用场景。

线程通信的基本概念

线程通信是指多个线程之间传递信息的过程。在Python中,由于所有线程共享同一个进程的内存空间,因此线程通信相对简单。但同时也带来了数据竞争和同步问题。

常见的线程通信机制包括:

  1. 共享变量
  2. 队列(Queue)
  3. 事件(Event)
  4. 条件变量(Condition)
  5. 信号量(Semaphore)

接下来我们将详细介绍每种机制。

1. 共享变量

最基本的线程通信方式是通过共享变量。但使用共享变量时需要注意线程安全问题,通常需要加锁保护。

使用Lock保护共享变量

python
import threading
import time

# 共享变量
counter = 0
# 创建锁
lock = threading.Lock()

def increment():
global counter
for _ in range(100000):
# 获取锁
lock.acquire()
try:
# 修改共享变量
counter += 1
finally:
# 释放锁
lock.release()

# 创建两个线程
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)

# 启动线程
t1.start()
t2.start()

# 等待线程结束
t1.join()
t2.join()

print(f"最终计数: {counter}") # 输出: 最终计数: 200000
注意

不使用锁保护共享变量可能导致数据竞争,结果将是不可预测的。共享变量是最简单但也是最容易出错的通信方式。

2. 使用Queue进行线程通信

Queue模块提供了一个线程安全的FIFO队列实现,非常适合用于线程间通信。

生产者-消费者模式

python
import threading
import queue
import time
import random

# 创建队列
q = queue.Queue(maxsize=10)

def producer():
"""生产者:产生数据放入队列"""
for i in range(5):
item = f"item-{i}"
# 将数据放入队列
q.put(item)
print(f"生产: {item}")
# 随机暂停一段时间
time.sleep(random.random())

def consumer():
"""消费者:从队列获取数据处理"""
while True:
# 从队列获取数据
item = q.get()
print(f"消费: {item}")
# 随机暂停一段时间
time.sleep(random.random() * 2)
# 通知队列任务已完成
q.task_done()

# 创建生产者线程
producer_thread = threading.Thread(target=producer)
# 创建消费者线程
consumer_thread = threading.Thread(target=consumer, daemon=True)

# 启动线程
consumer_thread.start()
producer_thread.start()

# 等待生产者完成
producer_thread.join()
# 等待所有任务处理完成
q.join()
print("所有工作已完成")

输出示例:

生产: item-0
消费: item-0
生产: item-1
消费: item-1
生产: item-2
消费: item-2
生产: item-3
消费: item-3
生产: item-4
消费: item-4
所有工作已完成
提示

Queue是线程安全的,内部已经实现了锁机制,不需要额外的同步操作。queue.Queue是最推荐的线程间通信方式之一。

3. 使用Event进行线程通信

Event对象提供了一种简单的线程通信机制,用于通知事件的发生。

python
import threading
import time

# 创建事件对象
event = threading.Event()

def waiter():
"""等待事件被设置"""
print("等待事件...")
# 阻塞直到事件被设置
event.wait()
print("事件已被设置,继续执行")

def setter():
"""设置事件"""
print("准备设置事件")
time.sleep(3) # 模拟一些操作
print("设置事件")
# 设置事件
event.set()

# 创建线程
waiter_thread = threading.Thread(target=waiter)
setter_thread = threading.Thread(target=setter)

# 启动线程
waiter_thread.start()
setter_thread.start()

# 等待线程结束
waiter_thread.join()
setter_thread.join()

输出:

等待事件...
准备设置事件
设置事件
事件已被设置,继续执行

Event对象有四个主要方法:

  • set(): 设置事件,使等待的线程继续执行
  • clear(): 清除事件,恢复到未设置状态
  • is_set(): 检查事件是否已设置
  • wait(): 阻塞直到事件被设置

4. 使用Condition实现线程通信

Condition对象提供了比Event更复杂的线程同步功能,允许线程等待特定条件满足。

实现生产者-消费者模式

python
import threading
import time
import random

# 创建条件变量
condition = threading.Condition()
# 共享缓冲区
buffer = []
MAX_SIZE = 5

def producer():
"""生产者:当缓冲区未满时添加数据"""
for i in range(10):
# 获取条件变量锁
with condition:
# 当缓冲区已满时等待
while len(buffer) == MAX_SIZE:
print("缓冲区已满,生产者等待...")
condition.wait()

# 添加数据
item = f"item-{i}"
buffer.append(item)
print(f"生产: {item}")

# 通知消费者可以获取数据
condition.notify()

# 模拟生产耗时
time.sleep(random.random())

def consumer():
"""消费者:当缓冲区有数据时获取数据"""
while True:
# 获取条件变量锁
with condition:
# 当缓冲区为空时等待
while not buffer:
print("缓冲区为空,消费者等待...")
condition.wait()

# 获取数据
item = buffer.pop(0)
print(f"消费: {item}")

# 通知生产者可以添加数据
condition.notify()

# 模拟消费耗时
time.sleep(random.random() * 2)

# 当消费10个数据后退出
if "item-9" in item:
break

# 创建线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

# 启动线程
producer_thread.start()
consumer_thread.start()

# 等待线程结束
producer_thread.join()
consumer_thread.join()

输出示例(每次运行略有不同):

生产: item-0
消费: item-0
缓冲区为空,消费者等待...
生产: item-1
消费: item-1
缓冲区为空,消费者等待...
生产: item-2
消费: item-2
...
生产: item-9
消费: item-9

5. 使用Semaphore控制并发访问

信号量用于控制对共享资源的并发访问数量。

python
import threading
import time
import random

# 创建信号量,允许最多3个线程同时访问资源
semaphore = threading.Semaphore(3)

def worker(id):
"""模拟工作线程"""
print(f"工作线程 {id} 尝试访问资源")
# 获取信号量
semaphore.acquire()

try:
print(f"工作线程 {id} 正在使用资源")
# 模拟使用资源
time.sleep(random.random() * 3)
finally:
print(f"工作线程 {id} 释放资源")
# 释放信号量
semaphore.release()

# 创建多个线程
threads = []
for i in range(10):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()

# 等待所有线程结束
for t in threads:
t.join()

输出示例(部分):

工作线程 0 尝试访问资源
工作线程 0 正在使用资源
工作线程 1 尝试访问资源
工作线程 1 正在使用资源
工作线程 2 尝试访问资源
工作线程 2 正在使用资源
工作线程 3 尝试访问资源
...

实际应用案例:网页爬虫

下面是一个使用多线程和队列实现的简单网页爬虫,演示了线程通信的实际应用:

python
import threading
import queue
import time
import random
import requests
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup

# 创建URL队列和已访问集合
url_queue = queue.Queue()
visited_urls = set()
lock = threading.Lock()

# 模拟要爬取的初始URL
base_url = "https://example.com"
url_queue.put(base_url)

def get_links(html, base_url):
"""从HTML中提取链接"""
soup = BeautifulSoup(html, 'html.parser')
links = []
for link in soup.find_all('a'):
href = link.get('href')
if href:
url = urljoin(base_url, href)
# 只保留相同域名的链接
if urlparse(url).netloc == urlparse(base_url).netloc:
links.append(url)
return links

def worker(worker_id):
"""工作线程:从队列获取URL并处理"""
while True:
try:
# 从队列获取URL,5秒超时
url = url_queue.get(timeout=5)

# 检查URL是否已访问
with lock:
if url in visited_urls:
url_queue.task_done()
continue
visited_urls.add(url)

print(f"线程 {worker_id} 正在处理: {url}")

try:
# 获取页面内容
response = requests.get(url, timeout=3)

# 模拟处理页面内容
time.sleep(0.5)

# 如果请求成功,提取链接并添加到队列
if response.status_code == 200:
new_links = get_links(response.text, url)
for link in new_links:
with lock:
if link not in visited_urls:
url_queue.put(link)

print(f"线程 {worker_id} 完成: {url}")

except Exception as e:
print(f"线程 {worker_id} 处理 {url} 时出错: {e}")

finally:
# 通知队列任务完成
url_queue.task_done()

except queue.Empty:
# 队列为空时退出
print(f"线程 {worker_id} 退出,队列为空")
break

# 创建工作线程
NUM_THREADS = 3
threads = []
for i in range(NUM_THREADS):
t = threading.Thread(target=worker, args=(i,))
t.daemon = True # 设置为守护线程
threads.append(t)
t.start()

# 等待队列中的所有URL处理完毕
try:
url_queue.join()
except KeyboardInterrupt:
print("爬虫被中断")

print(f"爬虫完成,共爬取 {len(visited_urls)} 个链接")
备注

上面的代码示例中,我们使用了队列(Queue)进行线程通信,锁(Lock)保护共享资源(已访问URL集合)。这个案例展示了在实际应用中如何结合多种线程通信机制。

线程通信的最佳实践

  1. 优先使用QueueQueue是线程安全的,并且提供了阻塞操作,是线程通信的首选方式。

  2. 避免直接操作共享变量:如果必须使用共享变量,确保使用适当的锁机制保护。

  3. 使用with语句:使用with语句管理锁、条件变量等资源,确保资源正确释放。

  4. 避免过度同步:过度同步会降低并发性能,在设计时应权衡安全性和性能。

  5. 使用适当的超时机制:在等待操作中设置超时,避免程序永久阻塞。

总结

Python线程通信是多线程编程中的重要概念。本文介绍了Python中常用的线程通信机制:共享变量、队列、事件、条件变量和信号量,并通过代码示例展示了它们的用法。

在实际应用中,队列(Queue)通常是最简单、最安全的线程通信方式。根据具体需求,可能需要结合多种通信机制来构建复杂的线程间协作关系。

正确使用这些线程通信机制,可以编写出高效、安全的多线程程序,充分利用多核CPU的处理能力,提高程序性能。

练习题

  1. 编写一个多线程程序,使用Queue实现一个工作池,处理一组任务。

  2. 使用Condition实现一个有限缓冲区,支持多个生产者和多个消费者。

  3. 实现一个线程安全的计数器类,使用适当的锁机制保护共享状态。

  4. 使用Event实现一个线程协调机制,让多个线程在特定条件下同步执行。

扩展阅读

  • Python官方文档中的threading模块
  • Python官方文档中的queue模块
  • 《Python Cookbook》第12章:并发编程
  • 《Fluent Python》中关于线程和并发的章节

掌握线程通信技术是成为Python高级开发者的重要一步。通过不断练习和实践,你将能够设计出更加高效、安全的多线程程序。