Python如何创建线程池?
在 Python 中,可以使用 concurrent.futures 或 multiprocessing 模块来创建线程池。以下是常用的方法:
1. 使用 concurrent.futures.ThreadPoolExecutor
这是 Python 标准库中最方便创建线程池的方法之一,它提供了易用的线程池接口,可以简单地提交任务,并等待它们完成。
from concurrent.futures import ThreadPoolExecutor
# 定义一个任务函数
def task_function(x):
return x * x
# 使用线程池
def main():
# 创建一个线程池,指定线程数
with ThreadPoolExecutor(max_workers=4) as executor:
# 提交任务到线程池,返回一个 Future 对象
future = executor.submit(task_function, 5)
print(f"Result: {future.result()}") # 获取任务结果
# 多个任务提交方法1:使用 submit()
futures = [executor.submit(task_function, i) for i in range(10)]
# 输出结果
for future in futures:
print(future.result())
# 多个任务提交方法2:使用 map()
results = executor.map(task_function, range(10))
print(list(results)) # 输出所有结果
if __name__ == '__main__':
main()
说明:
max_workers:指定线程池中的最大线程数。.submit():向线程池中提交一个任务,返回一个Future对象。.map():批量提交任务,返回结果的迭代器。- 使用
with语法创建线程池,线程池会自动关闭,避免资源泄露。
2. 使用 multiprocessing.dummy.Pool
multiprocessing.dummy 是一个基于线程的实现,与 multiprocessing 的 API 一致。使用 Pool 可以很方便地创建线程池并执行并发任务。
from multiprocessing.dummy import Pool
# 定义任务函数
def task_function(x):
return x * x
# 使用线程池
def main():
# 创建一个线程池,指定线程数
pool = Pool(4)
# 使用 map 方法分发任务
results = pool.map(task_function, range(10))
print(results)
# 使用 apply_async 方法提交单个任务
async_result = pool.apply_async(task_function, (5,))
print(async_result.get()) # 获取任务结果
# 关闭线程池
pool.close()
pool.join()
if __name__ == '__main__':
main()
说明:
Pool(4):创建一个最多包含 4 个线程的线程池。.map():将函数应用于每个输入(可迭代),返回结果列表。.apply_async():提交异步任务。.close()和.join():关闭线程池并等待所有线程完成。
3. 手动创建线程池(不推荐)
如果不使用库,也可以手动创建线程池。这种方式需要更多处理,比如线程的启动、任务队列管理、回调和终止线程。
import threading
import queue
class ThreadPool:
def __init__(self, num_threads):
self.tasks = queue.Queue()
self.threads = []
# 创建线程
for _ in range(num_threads):
thread = threading.Thread(target=self.worker)
thread.daemon = True # 确保主线程退出时自动回收
thread.start()
self.threads.append(thread)
def worker(self):
while True:
func, args, kwargs = self.tasks.get()
try:
func(*args, **kwargs)
finally:
self.tasks.task_done()
def submit(self, func, *args, **kwargs):
self.tasks.put((func, args, kwargs))
def wait(self):
self.tasks.join()
# 使用线程池
def task_function(x):
print(f"Task {x} completed in thread {threading.current_thread().name}")
if __name__ == '__main__':
pool = ThreadPool(num_threads=4)
for i in range(10):
pool.submit(task_function, i)
pool.wait()
说明:
- 手动实现线程池需要管理任务队列和线程生命周期。
- 不推荐这种方法,因为标准库已经封装好这些功能。
比较与总结
| 方法 | 优点 | 缺点 |
|---|---|---|
concurrent.futures.ThreadPoolExecutor | 最易用,支持 submit() 和 map() 函数 | 缺乏复杂任务队列管理能力 |
multiprocessing.dummy.Pool | 与 multiprocessing.Pool API 一致 | 较少高级功能 |
| 手动实现线程池 | 灵活,可完全控制线程和任务逻辑 | 代码复杂,容易出错 |
在大多数情况下,推荐使用 concurrent.futures.ThreadPoolExecutor,它易用且功能全面,适合现代 Python 开发。

关注公众号:程序新视界,一个让你软实力、硬技术同步提升的平台
除非注明,否则均为程序新视界原创文章,转载必须以链接形式标明本文链接
本文链接:http://choupangxia.com/2025/08/02/python-create-theadpool/