python多进程循环
1. python的多进程模块multiprocessing
众所周知,Python中不存在真正的多线程,Python中的多线程是一个并发过程。如果想要并行的执行程序,充分的利用cpu资源(cpu核心),还是需要使用多进程解决的。其中multiprocessing模块应该是Python中最常用的多进程模块了。
基本上multiprocessing这个模块和threading这个模块用法是相同的,也是可以通过函数和类创建进程。
上述案例基本上就是笔者搬用了上篇文章多线程的案例,可见其使用的相似之处。导入multiprocessing后实例化Process就可以创建一个进程,参数的话也是和多线程一样,target放置进程执行函数,args存放该函数的参数。
使用类来创建进程也是需要先继承multiprocessing.Process并且实现其init方法。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求。
但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程。
需要注意的是,在调用join方法阻塞进程前,需要先调用close方法,,否则程序会出错。
在上述案例中,提到了非阻塞,当把创建进程的方法换为pool.apply(func, (msg,))时,就会阻塞进程,出现下面的状况。
在multiprocessing模块中还存在Queue对象,这是一个进程的安全队列,近似queue.Queue。队列一般也是需要配合多线程或者多进程使用。
下列案例是一个使用进程队列实现的生产者消费者模式。
multiprocessing支持两种进程间的通信,其中一种便是上述案例的队列,另一种则称作管道。在官方文档的描述中,multiprocessing中的队列是基于管道实现的,并且拥有更高的读写效率。
管道可以理解为进程间的通道,使用Pipe([plex])创建,并返回一个元组(conn1,conn2)。如果plex被置为True(默认值),那么该管道是双向的,如果plex被置为False,那么该管道是单向的,即conn1只能用于接收消息,而conn2仅能用于发送消息。
其中conn1、conn2表示管道两端的连接对象,每个连接对象都有send()和recv()方法。send和recv方法分别是发送和接受消息的方法。例如,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。
关于multiprocessing模块其实还有很多实用的类和方法,由于篇幅有限(懒),笔者就先写到这里。该模块其实用起来很像threading模块,像锁对象和守护线程(进程)等multiprocessing模块也是有的,使用方法也近乎相同。
如果想要更加详细的了解multiprocessing模块,请参考官方文档。
2. Python 多进程内存占用问题
当我们有一个很长很长的任务队列(mission_list)和阈值对应的一个处理函数(missionFunction)时,我们一般采用如下的方式进行处理:
但是,如果这任务列表很长很长,处理函数很复杂(占用cpu)时,单核往往需要很长的时间进行处理,此时,Multiprocess便可以极大的提高我们程序的运行速度,相关内容请借鉴 multiprocessing --- 基于进程的并行 — Python 3.10.4 文档。
以上这种场景下,推荐大家采用最简单的进程池+map的方法进行处理,标准的写法, chunksize要借鉴官方的说法,最好大一点 :
但是!!!! 如果我们的任务列表非常的长,这会导致多进程还没跑起来之前,内存已经撑爆了,任务自然没法完成,此时我们有几种办法进行优化:
进程的启动方法有三种,可参考官方文档:
[图片上传失败...(image-48cd3c-1650511153989)]
在linux环境下,使用forkserver可以节省很多的内存空间, 因为进程启动的是一个服务,不会把主进程的数据全部复制
采用imap会极大的节省空间,它返回的是一个迭代器,也就是结果列表:
但注意,以上写法中,你写的结果迭代部分必须写在with下面。或者采用另一种写法:
还有最后一种,当你的mission list实在太大了,导致你在生成 mission list的时候已经把内存撑爆了,这个时候就得优化 mission_list了,如果你的mission_list是通过一个for循环生成的,你可以使用yield字段,将其封装为一个迭代器,传入进程池:
这样子,我们就封装好了mission_list,它是一个可迭代对象,在取数据的时候才会将数据拉到内存
我在项目中结合了后两种方法,原本256G的内存都不够用,但在修改后内存只占用了不到10G。希望能够帮助到你
3. python怎么多进程
需要借用库,来进行多进程,
threading
可以去了解熟悉这个库,这个可以实现多进程并发
4. python 多进程
基于官方文档:
https://docs.python.org/zh-cn/3/library/multiprocessing.html
日乐购,刚才看到的一个博客,写的都不太对,还是基于官方的比较稳妥
我就是喜欢抄官方的,哈哈
通常我们使用Process实例化一个进程,并调用 他的 start() 方法启动它。
这种方法和 Thread 是一样的。
上图中,我写了 p.join() 所以主进程是 等待 子进程执行完后,才执行 print("运行结束")
否则就是反过来了(这个不一定,看你的语句了,顺序其实是随机的)例如:
主进加个 sleep
所以不加join() ,其实子进程和主进程是各干各的,谁也不等谁。都执行完后,文件运行就结束了
上面我们用了 os.getpid() 和 os.getppid() 获取 当前进程,和父进程的id
下面就讲一下,这两个函数的用法:
os.getpid()
返回当前进程的id
os.getppid()
返回父进程的id。 父进程退出后,unix 返回初始化进程(1)中的一个
windows返回相同的id (可能被其他进程使用了)
这也就解释了,为啥我上面 的程序运行多次, 第一次打印的parentid 都是 14212 了。
而子进程的父级 process id 是调用他的那个进程的 id : 1940
视频笔记:
多进程:使用大致方法:
参考: 进程通信(pipe和queue)
pool.map (函数可以有return 也可以共享内存或queue) 结果直接是个列表
poll.apply_async() (同map,只不过是一个进程,返回结果用 xx.get() 获得)
报错:
参考 : https://blog.csdn.net/xiemanR/article/details/71700531
把 pool = Pool() 放到 if name == " main ": 下面初始化搞定。
结果:
这个肯定有解释的
测试多进程计算效果:
进程池运行:
结果:
普通计算:
我们同样传入 1 2 10 三个参数测试:
其实对比下来开始快了一半的;
我们把循环里的数字去掉一个 0;
单进程:
多进程:
两次测试 单进程/进程池 分别为 0.669 和 0.772 几乎成正比的。
问题 二:
视图:
post 视图里面
Music 类:
直接报错:
写在 类里面也 在函数里用 self.pool 调用也不行,也是相同的错误。
最后 把 pool = Pool 直接写在 search 函数里面,奇迹出现了:
前台也能显示搜索的音乐结果了
总结一点,进程这个东西,最好 写在 直接运行的函数里面,而不是 一个函数跳来跳去。因为最后可能 是在子进程的子进程运行的,这是不许的,会报错。
还有一点,多进程运行的函数对象,不能是 lambda 函数。也许lambda 虚拟,在内存??
使用 pool.map 子进程 函数报错,导致整个 pool 挂了:
参考: https://blog.csdn.net/hedongho/article/details/79139606
主要你要,对函数内部捕获错误,而不能让异常抛出就可以了。
关于map 传多个函数参数
我一开始,就是正常思维,多个参数,搞个元祖,让参数一一对应不就行了:
报错:
参考:
https://blog.csdn.net/qq_15969343/article/details/84672527
普通的 process 当让可以穿多个参数,map 却不知道咋传的。
apply_async 和map 一样,不知道咋传的。
最简单的方法:
使用 starmap 而不是 map
结果:
子进程结束
1.8399453163146973
成功拿到结果了
关于map 和 starmap 不同的地方看源码:
关于apply_async() ,我没找到多参数的方法,大不了用 一个迭代的 starmap 实现。哈哈
关于 上面源码里面有 itertools.starmap
itertools 用法参考:
https://docs.python.org/zh-cn/3/library/itertools.html#itertool-functions
有个问题,多进程最好不要使用全部的 cpu , 因为这样可能影响其他任务,所以 在进程池 添加 process 参数 指定,cpu 个数:
上面就是预留了 一个cpu 干其他事的
后面直接使用 Queue 遇到这个问题:
解决:
Manager().Queue() 代替 Queue()
因为 queue.get() 是堵塞型的,所以可以提前判断是不是 空的,以免堵塞进程。比如下面这样:
使用 queue.empty() 空为True
5. 如何使用Python实现多进程编程
1.Process
创建进程的类:Process([group[,target[,name[,args[,kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。
属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。
例1.1:创建函数并将其作为单个进程
importmultiprocessing
importtime
defworker(interval):
n=5
whilen>0:
print("Thetimeis{0}".format(time.ctime()))
time.sleep(interval)
n-=1
if__name__=="__main__":
p=multiprocessing.Process(target=worker,args=(3,))
p.start()
print"p.pid:",p.pid
print"p.name:",p.name
print"p.is_alive:",p.is_alive()
结果
12345678p.pid:8736p.name:Process-1p.is_alive:TrueThetimeisTueApr2120:55:122015ThetimeisTueApr2120:55:152015ThetimeisTueApr2120:55:182015ThetimeisTueApr2120:55:212015ThetimeisTueApr2120:55:242015
例1.2:创建函数并将其作为多个进程
importmultiprocessing
importtime
defworker_1(interval):
print"worker_1"
time.sleep(interval)
print"endworker_1"
defworker_2(interval):
print"worker_2"
time.sleep(interval)
print"endworker_2"
defworker_3(interval):
print"worker_3"
time.sleep(interval)
print"endworker_3"
if__name__=="__main__":
p1=multiprocessing.Process(target=worker_1,args=(2,))
p2=multiprocessing.Process(target=worker_2,args=(3,))
p3=multiprocessing.Process(target=worker_3,args=(4,))
p1.start()
p2.start()
p3.start()
print("ThenumberofCPUis:"+str(multiprocessing.cpu_count()))
forpinmultiprocessing.active_children():
print("childp.name:"+p.name+" p.id"+str(p.pid))
print"END!!!!!!!!!!!!!!!!!"
结果
1234567891011ThenumberofCPUis:4childp.name:Process-3p.id7992childp.name:Process-2p.id4204childp.name:Process-1p.id6380END!!!!!!!!!!!!!!!!!worker_1worker_3worker_2endworker_1endworker_2endworker_3
例1.3:将进程定义为类
importmultiprocessing
importtime
classClockProcess(multiprocessing.Process):
def__init__(self,interval):
multiprocessing.Process.__init__(self)
self.interval=interval
defrun(self):
n=5
whilen>0:
print("thetimeis{0}".format(time.ctime()))
time.sleep(self.interval)
n-=1
if__name__=='__main__':
p=ClockProcess(3)
p.start()
注:进程p调用start()时,自动调用run()
结果
12345thetimeisTueApr2120:31:302015thetimeisTueApr2120:31:332015thetimeisTueApr2120:31:362015thetimeisTueApr2120:31:392015thetimeisTueApr2120:31:422015
6. python Windows下的多进程控制问题
windows的python多进程确实比较特殊,不过通过main入口是可以解决的,我平常都是这样用。像下面这样的结构
A文件:
importmultiprocessing
defmain():
p=multiprocessing.Process(target=work)
p.start()
defwork():
print('work')
B文件
importa
if__name__=='__main__':
a.main()
如果你的结构和我的一样还是会发生循环调用的情况,那方便把关键结构的代码贴一下吗,我看一下哪里的问题
7. python多进程为什么一定要
前面讲了为什么Python里推荐用多进程而不是多线程,但是多进程也有其自己的限制:相比线程更加笨重、切换耗时更长,并且在python的多进程下,进程数量不推荐超过CPU核心数(一个进程只有一个GIL,所以一个进程只能跑满一个CPU),因为一个进程占用一个CPU时能充分利用机器的性能,但是进程多了就会出现频繁的进程切换,反而得不偿失。
不过特殊情况(特指IO密集型任务)下,多线程是比多进程好用的。
举个例子:给你200W条url,需要你把每个url对应的页面抓取保存起来,这种时候,单单使用多进程,效果肯定是很差的。为什么呢?
例如每次请求的等待时间是2秒,那么如下(忽略cpu计算时间):
1、单进程+单线程:需要2秒*200W=400W秒==1111.11个小时==46.3天,这个速度明显是不能接受的2、单进程+多线程:例如我们在这个进程中开了10个多线程,比1中能够提升10倍速度,也就是大约4.63天能够完成200W条抓取,请注意,这里的实际执行是:线程1遇见了阻塞,CPU切换到线程2去执行,遇见阻塞又切换到线程3等等,10个线程都阻塞后,这个进程就阻塞了,而直到某个线程阻塞完成后,这个进程才能继续执行,所以速度上提升大约能到10倍(这里忽略了线程切换带来的开销,实际上的提升应该是不能达到10倍的),但是需要考虑的是线程的切换也是有开销的,所以不能无限的启动多线程(开200W个线程肯定是不靠谱的)3、多进程+多线程:这里就厉害了,一般来说也有很多人用这个方法,多进程下,每个进程都能占一个cpu,而多线程从一定程度上绕过了阻塞的等待,所以比单进程下的多线程又更好使了,例如我们开10个进程,每个进程里开20W个线程,执行的速度理论上是比单进程开200W个线程快10倍以上的(为什么是10倍以上而不是10倍,主要是cpu切换200W个线程的消耗肯定比切换20W个进程大得多,考虑到这部分开销,所以是10倍以上)。
还有更好的方法吗?答案是肯定的,它就是:
4、协程,使用它之前我们先讲讲what/why/how(它是什么/为什么用它/怎么使用它)what:
协程是一种用户级的轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
在并发编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。
why:
目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。
不管是进程还是线程,每次阻塞、切换都需要陷入系统调用(system call),先让CPU跑操作系统的调度程序,然后再由调度程序决定该跑哪一个进程(线程)。
而且由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点)。
因为协程是用户自己来编写调度逻辑的,对CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。
how:
python里面怎么使用协程?答案是使用gevent,使用方法:看这里使用协程,可以不受线程开销的限制,我尝试过一次把20W条url放在单进程的协程里执行,完全没问题。
所以最推荐的方法,是多进程+协程(可以看作是每个进程里都是单线程,而这个单线程是协程化的)多进程+协程下,避开了CPU切换的开销,又能把多个CPU充分利用起来,这种方式对于数据量较大的爬虫还有文件读写之类的效率提升是巨大的。
小例子:
#-*- coding=utf-8 -*-
import requests
from multiprocessing import Process
import gevent
from gevent import monkey; monkey.patch_all()import sys
reload(sys)
sys.setdefaultencoding('utf8')
def fetch(url):
try:
s = requests.Session()
r = s.get(url,timeout=1)#在这里抓取页面
except Exception,e:
print e
return ''
def process_start(tasks):
gevent.joinall(tasks)#使用协程来执行
def task_start(filepath,flag = 100000):#每10W条url启动一个进程with open(filepath,'r') as reader:#从给定的文件中读取urlurl = reader.readline().strip()
task_list = []#这个list用于存放协程任务
i = 0 #计数器,记录添加了多少个url到协程队列while url!='':
i += 1
task_list.append(gevent.spawn(fetch,url,queue))#每次读取出url,将任务添加到协程队列if i == flag:#一定数量的url就启动一个进程并执行p = Process(target=process_start,args=(task_list,))p.start()
task_list = [] #重置协程队列
i = 0 #重置计数器
url = reader.readline().strip()
if task_list not []:#若退出循环后任务队列里还有url剩余p = Process(target=process_start,args=(task_list,))#把剩余的url全都放到最后这个进程来执行p.start()
if __name__ == '__main__':
task_start('./testData.txt')#读取指定文件细心的同学会发现:上面的例子中隐藏了一个问题:进程的数量会随着url数量的增加而不断增加,我们在这里不使用进程池multiprocessing.Pool来控制进程数量的原因是multiprocessing.Pool和gevent有冲突不能同时使用,但是有兴趣的同学可以研究一下gevent.pool这个协程池。
另外还有一个问题:每个进程处理的url是累积的而不是独立的,例如第一个进程会处理10W个,第二个进程会变成20W个,以此类推。最后定位到问题是gevent.joinall()导致的问题,有兴趣的同学可以研究一下为什么会这样。不过这个问题的处理方案是:主进程只负责读取url然后写入到list中,在创建子进程的时候直接把list传给子进程,由子进程自己去构建协程。这样就不会出现累加的问题
8. 如何多线程(多进程)加速while循环(语言-python)
import numpy as np
import os
import sys
import multiprocessing as mp
import time
def MCS(input_data, med):
#t1 = time.perf_counter()
left = 0
lp = 0
while True:
lp = lp + 1
data_pool = input_data + left
output_data = med * 0.05 * data_pool / (10000 + med)
output_data = np.where(output_data > data_pool, data_pool, output_data)
left = data_pool - output_data
cri = (input_data - output_data) / input_data * 100
#print(lp, data_pool, output_data, cri)
if cri <= 1:
break
t2 = time.perf_counter()
#print(f'Finished in {t2 - t1} seconds')
if __name__ == "__main__":
pool = mp.Pool(processes=5)
tasks = []
for i in np.linspace(0.4, 0.6, num = 10):
tasks.append([100, i])
t1 = time.perf_counter()
pool.starmap(MCS, tasks)
#pool.apply_async(MCS, args=(100, 0.4))
t2 = time.perf_counter()
#pool.join()
#pool.close()
for i in np.linspace(0.4, 0.6, num = 10):
MCS(100, i)
t3 = time.perf_counter()
print(f'Finished in {t2 - t1} seconds')
print(f'Finished in {t3 - t2} seconds')
原因可能是只运行了一个例子,
如图测试了10个例子,测试结果如下
Finished in 15.062450630997773 seconds
Finished in 73.1936681799998 seconds
并行确实有一定的加速。
9. python 多进程读取同一个循环处理、可以用multiprocessing
可以每个在func中加上一个参数data,data是这个线程处理的数据;
多线程处理的时候,给每个线程分配相应的data就可以了。
给个示例:
#-*-coding:utf-8-*-
importthread,threading
importtime
defFuncTest(tdata):
printtdata
classmythread(threading.Thread):
def__init__(self,threadname):
threading.Thread.__init__(self)
defrun(self):
lock.acquire()
FuncTest(ft)
lock.release()
defMutiThread(num):
threads=[]
i=0
globalft
forxinxrange(num):
threads.append(mythread(num))
fortinthreads:
time.sleep(0.5)
lock.acquire()
ft=GetThreadParam(datafile,num,i)
#print'[%s]Thread:%s,Testdata:%s'%(time.ctime(),t,ft)
i=i+1
t.start()
lock.release()
fortinthreads:
t.join()
defGetThreadParam(datafile,num,curthread):
#线程数需要小于文件行数
f=open(datafile,'r')
lines=f.readlines()
divres=divmod(len(lines),num)
ifcurthread<(num-1):
res=lines[curthread*divres[0]:(curthread+1)*divres[0]]
elifcurthread==(num-1):
res=lines[curthread*divres[0]:((curthread+1)*divres[0]+divres[1])]
returnres
f.close()
if__name__=='__main__':
globalnum,lock
datafile='a.txt'
num=3#num并发数
lock=threading.Lock()
MutiThread(num)
a.txt文件内容如下
1
2
3
4
5
6
7
8
9
10
3个线程并发时,运行结果:
>>>
['1 ', '2 ', '3 ']
['4 ', '5 ', '6 ']
['7 ', '8 ', '9 ', '10']
10. 一个for循环的Python脚本程序中如何加入多进程(并发进程)呢,急急急,在线等
简单的如下
defps(i):
print(str(i))
defrun():
foriinrange(5):
Process(target=ps,args=(i,)).start()
if__name__=="__main__":
run()