python concurrent模块实现多线程

天天365彩票软件官方下载3D ⌚ 2025-11-22 01:06:46 👤 admin 👁️ 8284 ❤️ 579
python concurrent模块实现多线程

引言

之前也写过多线程的博客,用的是 threading ,今天来讲下 python 的另外一个自带库 concurrent 。concurrent 是在 Python3.2 中引入的,只用几行代码就可以编写出线程池/进程池,而且在 IO 型任务由于引入了 Future 的概念(异步)效率要高数倍。而 threading 的话还要自己维护相关的队列防止死锁,代码的可读性也会下降,相反 concurrent 提供的线程池却非常的便捷,不用自己操心死锁以及编写线程池代码,由于异步的概念 IO 型任务也更有优势。

concurrent 的优势在于提供了 ThreadPoolExecutor 和 ProcessPoolExecutor ,一个多线程,一个多进程。但 concurrent 本质上都是对 threading 和 multiprocessing 的封装。看它的源码可以知道,所以最底层并没有异步。ThreadPoolExecutor 自己提供了任务队列,不需要自己写了。而所谓的线程池,它只是简单的比较当前的 threads 数量和定义的 max_workers 的大小,小于 max_workers 就允许任务创建线程执行任务。

操作多线程/多进程

1、创建线程池

通过 ThreadPoolExecutor 类创建线程池对象,max_workers 设置最大运行线程数数。使用 ThreadPoolExecutor 的好处是不用担心线程死锁问题,让多线程编程更简洁。

from concurrent import futures

pool = futures.ThreadPoolExecutor(max_workers = 2)

2、submit

submit(self, fn, *args, **kwargs):

fn:需要异步执行的函数

*args,**kwargs:fn 接受的参数

该方法的作用就是提交一个可执行的回调task,它返回一个Future对象,可以用 result() 获取方法调用的返回值,最好配合 wait 使用。可以看出此方法不会阻塞主线程的执行。

import requests,datetime,time

from concurrent import futures

def get_request(url):

r = requests.get(url)

print('{}:{} {}'.format(datetime.datetime.now(),url,r.status_code))

urls = ['https://www.baidu.com','https://www.tmall.com','https://www.jd.com']

pool = futures.ThreadPoolExecutor(max_workers = 2)

for url in urls:

task = pool.submit(get_request,url)

print('{}主线程'.format(datetime.datetime.now()))

time.sleep(2)

# 输出结果

2021-03-12 15:29:10.780141:主线程

2021-03-12 15:29:10.865425:https://www.baidu.com 200

2021-03-12 15:29:10.923062:https://www.tmall.com 200

2021-03-12 15:29:10.940930:https://www.jd.com 200

3、map

map(self, fn, *iterables, timeout=None, chunksize=1):

fn:需要异步执行的函数

*iterables:可迭代对象

map 第二个参数是可迭代对象,比如 list、tuple 等,写法相对简单,它返回一个生成器,包含调用方法的返回值。map 方法也不会阻塞主线程的执行。

import requests,datetime,time

from concurrent import futures

def get_request(url):

r = requests.get(url)

print('{}:{} {}'.format(datetime.datetime.now(),url,r.status_code))

# return 100

urls = ['https://www.baidu.com','https://www.tmall.com','https://www.jd.com']

pool = futures.ThreadPoolExecutor(max_workers = 2)

tasks = pool.map(get_request,urls)

# tasks是一个生成器,若调用方法有返回值,可用以下方法获取

# for result in tasks:

# print(result)

print('{}:主线程'.format(datetime.datetime.now()))

time.sleep(2)

# 输出结果

2021-03-12 16:14:04.854452:主线程

2021-03-12 16:14:04.938870:https://www.baidu.com 200

2021-03-12 16:14:05.033849:https://www.jd.com 200

2021-03-12 16:14:05.048952:https://www.tmall.com 200

4、wait

如果要等待子线程执行完之后再执行主线程要怎么办呢,可以通过 wait 。

wait(fs, timeout=None, return_when=ALL_COMPLETED):

fs:所有任务 tasks

return_when:有三个参数 FIRST_COMPLETED:只要有一个子线程完成则返回结果。 FIRST_EXCEPTION:只要有一个子线程抛异常则返回结果,若没有异常则等同于ALL_COMPLETED。 ALL_COMPLETED:默认参数,等待所有子线程完成。

import requests,datetime,time

from concurrent import futures

def get_request(url):

r = requests.get(url)

print('{}:{} {}'.format(datetime.datetime.now(),url,r.status_code))

# return 100

urls = ['https://www.baidu.com','https://www.tmall.com','https://www.jd.com']

pool = futures.ThreadPoolExecutor(max_workers = 2)

tasks =[]

for url in urls:

task = pool.submit(get_request,url)

tasks.append(task)

futures.wait(tasks)

# 等待子线程完成后,获取调用方法返回值。

# for result in tasks:

# print(result.result())

print('{}:主线程'.format(datetime.datetime.now()))

time.sleep(2)

# 输出结果

2021-03-12 16:30:13.437042:https://www.baidu.com 200

2021-03-12 16:30:13.552700:https://www.jd.com 200

2021-03-12 16:30:14.117325:https://www.tmall.com 200

2021-03-12 16:30:14.118284:主线程

5、异常处理

as_completed(fs, timeout=None)

所有任务 tasks

使用 concurrent.futures 操作 多线程/多进程 过程中,很多函数报错并不会直接终止程序,而是什么都没发生。使用 as_completed 可以捕获异常,代码如下

import requests,datetime,time

from concurrent import futures

def get_request(url):

r = requests.get(url)

print('{}:{} {}'.format(datetime.datetime.now(),url,r.status_code))

urls = ['www.baidu.com','https://www.tmall.com','https://www.jd.com']

# 创建线程池

pool = futures.ThreadPoolExecutor(max_workers = 2)

tasks =[]

for url in urls:

task = pool.submit(get_request,url)

tasks.append(task)

# 异常捕获

errors = futures.as_completed(tasks)

for error in errors:

# error.result() 等待子线程都完成,并抛出异常,中断主线程

# 捕获子线程异常,不会终止主线程继续运行

print(error.exception())

futures.wait(tasks)

print('{}:主线程'.format(datetime.datetime.now()))

time.sleep(2)

# 输出结果

Invalid URL 'www.baidu.com': No schema supplied. Perhaps you meant http://www.baidu.com?

2021-03-12 17:24:26.984933:https://www.tmall.com 200

None

2021-03-12 17:24:26.993939:https://www.jd.com 200

None

2021-03-12 17:24:26.994937:主线程

多进程编程也类似,将 ThreadPoolExecutor 替换成 ProcessPoolExecutor 。注意 多进程时加上 if __name=='__main__',和 ProcessPoolExecutor 使用的位置,否则会造成多进程循环调用,会报concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending 错误。

相关数据

男鞋十大品牌排行榜

男鞋十大品牌排行榜

综合榜 1)评选介绍:综合榜基于五个主要维度和七项加权项目进行评比。 2)算法简述:综合评分根据品牌行业等级、销量、好评率、品牌历史

09-27 天天365彩票软件官方下载3D
网咖系统管理软件哪个好 全面评测助您精准选择

网咖系统管理软件哪个好 全面评测助您精准选择

网咖系统管理软件哪个好 全面评测助您精准选择林睿诚·2025-06-17 00:49:42阅读5分钟已读1030次网咖系统管理软件哪个好?本文从功能完整性、稳定

07-23 天天365彩票软件官方下载3D
韩国综艺排行榜

韩国综艺排行榜

郑重声明: i韩迷韩剧所有播放资源均由机器人收集于互联网,旨在为韩剧爱好者方便查找韩剧,本站不参与任何影视资源制作与存储,不承担

07-11 mobile3656
热血传奇

热血传奇

本文目录导读: 1.传奇世界石墓阵走法 2.传奇石墓阵走法 3.热血传奇石墓阵走法 4.热血传奇石墓阵走法 5.传奇小巴双传势石墓阵走法 传奇世界石

07-17 天天365彩票软件官方下载3D
钓包头鲢鱼的都是什么饵料

钓包头鲢鱼的都是什么饵料

奢求726 发表于 2017-08-19 03:46:28 桶装料+鲢鳙香精1包,海杆,纺车轮,8.0尼龙线(8.0大力马),水怪,同心铅,太空豆,大肚漂。全套装备我都

07-05 电视直播网365
「屌你老母 / 𨳒你老母」

「屌你老母 / 𨳒你老母」

「屌你老母 / 𨳒你老母」 第 #1 條 寫法、讀音: 屌你老母 diu2 nei5 lou5 mou2 𨳒你老母 diu2 nei5 lou5 mou2 詞性: 語句 標籤: 粗俗 解釋: (廣東話)

11-01 电视直播网365