當前位置:首頁 » 編程語言 » python多進程循環

python多進程循環

發布時間: 2022-10-20 11:50:10

1. python的多進程模塊multiprocessing

眾所周知,Python中不存在真正的多線程,Python中的多線程是一個並發過程。如果想要並行的執行程序,充分的利用cpu資源(cpu核心),還是需要使用多進程解決的。其中multiprocessing模塊應該是Python中最常用的多進程模塊了。

基本上multiprocessing這個模塊和threading這個模塊用法是相同的,也是可以通過函數和類創建進程。

上述案例基本上就是筆者搬用了上篇文章多線程的案例,可見其使用的相似之處。導入multiprocessing後實例化Process就可以創建一個進程,參數的話也是和多線程一樣,target放置進程執行函數,args存放該函數的參數。

使用類來創建進程也是需要先繼承multiprocessing.Process並且實現其init方法。

Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求。

但如果池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,才會創建新的進程。

需要注意的是,在調用join方法阻塞進程前,需要先調用close方法,,否則程序會出錯。

在上述案例中,提到了非阻塞,當把創建進程的方法換為pool.apply(func, (msg,))時,就會阻塞進程,出現下面的狀況。

在multiprocessing模塊中還存在Queue對象,這是一個進程的安全隊列,近似queue.Queue。隊列一般也是需要配合多線程或者多進程使用。

下列案例是一個使用進程隊列實現的生產者消費者模式。

multiprocessing支持兩種進程間的通信,其中一種便是上述案例的隊列,另一種則稱作管道。在官方文檔的描述中,multiprocessing中的隊列是基於管道實現的,並且擁有更高的讀寫效率。

管道可以理解為進程間的通道,使用Pipe([plex])創建,並返回一個元組(conn1,conn2)。如果plex被置為True(默認值),那麼該管道是雙向的,如果plex被置為False,那麼該管道是單向的,即conn1隻能用於接收消息,而conn2僅能用於發送消息。

其中conn1、conn2表示管道兩端的連接對象,每個連接對象都有send()和recv()方法。send和recv方法分別是發送和接受消息的方法。例如,可以調用conn1.send發送消息,conn1.recv接收消息。如果沒有消息可接收,recv方法會一直阻塞。如果管道已經被關閉,那麼recv方法會拋出EOFError。

關於multiprocessing模塊其實還有很多實用的類和方法,由於篇幅有限(懶),筆者就先寫到這里。該模塊其實用起來很像threading模塊,像鎖對象和守護線程(進程)等multiprocessing模塊也是有的,使用方法也近乎相同。

如果想要更加詳細的了解multiprocessing模塊,請參考官方文檔。

2. Python 多進程內存佔用問題

當我們有一個很長很長的任務隊列(mission_list)和閾值對應的一個處理函數(missionFunction)時,我們一般採用如下的方式進行處理:

但是,如果這任務列表很長很長,處理函數很復雜(佔用cpu)時,單核往往需要很長的時間進行處理,此時,Multiprocess便可以極大的提高我們程序的運行速度,相關內容請借鑒 multiprocessing --- 基於進程的並行 — Python 3.10.4 文檔。

以上這種場景下,推薦大家採用最簡單的進程池+map的方法進行處理,標準的寫法, chunksize要借鑒官方的說法,最好大一點

但是!!!! 如果我們的任務列表非常的長,這會導致多進程還沒跑起來之前,內存已經撐爆了,任務自然沒法完成,此時我們有幾種辦法進行優化:

進程的啟動方法有三種,可參考官方文檔:

[圖片上傳失敗...(image-48cd3c-1650511153989)]

linux環境下,使用forkserver可以節省很多的內存空間, 因為進程啟動的是一個服務,不會把主進程的數據全部復制

採用imap會極大的節省空間,它返回的是一個迭代器,也就是結果列表:

但注意,以上寫法中,你寫的結果迭代部分必須寫在with下面。或者採用另一種寫法:

還有最後一種,當你的mission list實在太大了,導致你在生成 mission list的時候已經把內存撐爆了,這個時候就得優化 mission_list了,如果你的mission_list是通過一個for循環生成的,你可以使用yield欄位,將其封裝為一個迭代器,傳入進程池:

這樣子,我們就封裝好了mission_list,它是一個可迭代對象,在取數據的時候才會將數據拉到內存

我在項目中結合了後兩種方法,原本256G的內存都不夠用,但在修改後內存只佔用了不到10G。希望能夠幫助到你

3. python怎麼多進程

需要借用庫,來進行多進程,
threading
可以去了解熟悉這個庫,這個可以實現多進程並發

4. python 多進程

基於官方文檔:
https://docs.python.org/zh-cn/3/library/multiprocessing.html
日樂購,剛才看到的一個博客,寫的都不太對,還是基於官方的比較穩妥
我就是喜歡抄官方的,哈哈

通常我們使用Process實例化一個進程,並調用 他的 start() 方法啟動它。
這種方法和 Thread 是一樣的。

上圖中,我寫了 p.join() 所以主進程是 等待 子進程執行完後,才執行 print("運行結束")
否則就是反過來了(這個不一定,看你的語句了,順序其實是隨機的)例如:

主進加個 sleep

所以不加join() ,其實子進程和主進程是各干各的,誰也不等誰。都執行完後,文件運行就結束了

上面我們用了 os.getpid() 和 os.getppid() 獲取 當前進程,和父進程的id
下面就講一下,這兩個函數的用法:
os.getpid()
返回當前進程的id
os.getppid()
返回父進程的id。 父進程退出後,unix 返回初始化進程(1)中的一個
windows返回相同的id (可能被其他進程使用了)
這也就解釋了,為啥我上面 的程序運行多次, 第一次列印的parentid 都是 14212 了。
而子進程的父級 process id 是調用他的那個進程的 id : 1940

視頻筆記:
多進程:使用大致方法:

參考: 進程通信(pipe和queue)

pool.map (函數可以有return 也可以共享內存或queue) 結果直接是個列表

poll.apply_async() (同map,只不過是一個進程,返回結果用 xx.get() 獲得)

報錯:

參考 : https://blog.csdn.net/xiemanR/article/details/71700531

把 pool = Pool() 放到 if name == " main ": 下面初始化搞定。
結果:

這個肯定有解釋的

測試多進程計算效果:
進程池運行:

結果:

普通計算:

我們同樣傳入 1 2 10 三個參數測試:

其實對比下來開始快了一半的;
我們把循環里的數字去掉一個 0;
單進程:

多進程:

兩次測試 單進程/進程池 分別為 0.669 和 0.772 幾乎成正比的。
問題 二:
視圖:
post 視圖裡面

Music 類:

直接報錯:

寫在 類裡面也 在函數里用 self.pool 調用也不行,也是相同的錯誤。

最後 把 pool = Pool 直接寫在 search 函數裡面,奇跡出現了:

前台也能顯示搜索的音樂結果了

總結一點,進程這個東西,最好 寫在 直接運行的函數裡面,而不是 一個函數跳來跳去。因為最後可能 是在子進程的子進程運行的,這是不許的,會報錯。
還有一點,多進程運行的函數對象,不能是 lambda 函數。也許lambda 虛擬,在內存??

使用 pool.map 子進程 函數報錯,導致整個 pool 掛了:
參考: https://blog.csdn.net/hedongho/article/details/79139606
主要你要,對函數內部捕獲錯誤,而不能讓異常拋出就可以了。

關於map 傳多個函數參數
我一開始,就是正常思維,多個參數,搞個元祖,讓參數一一對應不就行了:

報錯:

參考:
https://blog.csdn.net/qq_15969343/article/details/84672527
普通的 process 當讓可以穿多個參數,map 卻不知道咋傳的。
apply_async 和map 一樣,不知道咋傳的。

最簡單的方法:
使用 starmap 而不是 map

結果:
子進程結束
1.8399453163146973
成功拿到結果了

關於map 和 starmap 不同的地方看源碼

關於apply_async() ,我沒找到多參數的方法,大不了用 一個迭代的 starmap 實現。哈哈

關於 上面源碼裡面有 itertools.starmap
itertools 用法參考:
https://docs.python.org/zh-cn/3/library/itertools.html#itertool-functions

有個問題,多進程最好不要使用全部的 cpu , 因為這樣可能影響其他任務,所以 在進程池 添加 process 參數 指定,cpu 個數:

上面就是預留了 一個cpu 干其他事的

後面直接使用 Queue 遇到這個問題:

解決:
Manager().Queue() 代替 Queue()

因為 queue.get() 是堵塞型的,所以可以提前判斷是不是 空的,以免堵塞進程。比如下面這樣:
使用 queue.empty() 空為True

5. 如何使用Python實現多進程編程

1.Process
創建進程的類:Process([group[,target[,name[,args[,kwargs]]]]]),target表示調用對象,args表示調用對象的位置參數元組。kwargs表示調用對象的字典。name為別名。group實質上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啟動某個進程。
屬性:authkey、daemon(要通過start()設置)、exitcode(進程在運行時為None、如果為–N,表示被信號N結束)、name、pid。其中daemon是父進程終止後自動終止,且自己不能產生新進程,必須在start()之前設置。

例1.1:創建函數並將其作為單個進程
importmultiprocessing
importtime

defworker(interval):
n=5
whilen>0:
print("Thetimeis{0}".format(time.ctime()))
time.sleep(interval)
n-=1

if__name__=="__main__":
p=multiprocessing.Process(target=worker,args=(3,))
p.start()
print"p.pid:",p.pid
print"p.name:",p.name
print"p.is_alive:",p.is_alive()
結果
12345678p.pid:8736p.name:Process-1p.is_alive:TrueThetimeisTueApr2120:55:122015ThetimeisTueApr2120:55:152015ThetimeisTueApr2120:55:182015ThetimeisTueApr2120:55:212015ThetimeisTueApr2120:55:242015

例1.2:創建函數並將其作為多個進程
importmultiprocessing
importtime

defworker_1(interval):
print"worker_1"
time.sleep(interval)
print"endworker_1"

defworker_2(interval):
print"worker_2"
time.sleep(interval)
print"endworker_2"

defworker_3(interval):
print"worker_3"
time.sleep(interval)
print"endworker_3"

if__name__=="__main__":
p1=multiprocessing.Process(target=worker_1,args=(2,))
p2=multiprocessing.Process(target=worker_2,args=(3,))
p3=multiprocessing.Process(target=worker_3,args=(4,))

p1.start()
p2.start()
p3.start()

print("ThenumberofCPUis:"+str(multiprocessing.cpu_count()))
forpinmultiprocessing.active_children():
print("childp.name:"+p.name+" p.id"+str(p.pid))
print"END!!!!!!!!!!!!!!!!!"
結果
1234567891011ThenumberofCPUis:4childp.name:Process-3p.id7992childp.name:Process-2p.id4204childp.name:Process-1p.id6380END!!!!!!!!!!!!!!!!!worker_1worker_3worker_2endworker_1endworker_2endworker_3

例1.3:將進程定義為類
importmultiprocessing
importtime

classClockProcess(multiprocessing.Process):
def__init__(self,interval):
multiprocessing.Process.__init__(self)
self.interval=interval

defrun(self):
n=5
whilen>0:
print("thetimeis{0}".format(time.ctime()))
time.sleep(self.interval)
n-=1

if__name__=='__main__':
p=ClockProcess(3)
p.start()
註:進程p調用start()時,自動調用run()
結果
12345thetimeisTueApr2120:31:302015thetimeisTueApr2120:31:332015thetimeisTueApr2120:31:362015thetimeisTueApr2120:31:392015thetimeisTueApr2120:31:422015

6. python Windows下的多進程式控制制問題

windows的python多進程確實比較特殊,不過通過main入口是可以解決的,我平常都是這樣用。像下面這樣的結構

A文件:

importmultiprocessing


defmain():
p=multiprocessing.Process(target=work)
p.start()


defwork():
print('work')

B文件

importa

if__name__=='__main__':
a.main()

如果你的結構和我的一樣還是會發生循環調用的情況,那方便把關鍵結構的代碼貼一下嗎,我看一下哪裡的問題

7. python多進程為什麼一定要

前面講了為什麼Python里推薦用多進程而不是多線程,但是多進程也有其自己的限制:相比線程更加笨重、切換耗時更長,並且在python的多進程下,進程數量不推薦超過CPU核心數(一個進程只有一個GIL,所以一個進程只能跑滿一個CPU),因為一個進程佔用一個CPU時能充分利用機器的性能,但是進程多了就會出現頻繁的進程切換,反而得不償失。
不過特殊情況(特指IO密集型任務)下,多線程是比多進程好用的。
舉個例子:給你200W條url,需要你把每個url對應的頁面抓取保存起來,這種時候,單單使用多進程,效果肯定是很差的。為什麼呢?
例如每次請求的等待時間是2秒,那麼如下(忽略cpu計算時間):
1、單進程+單線程:需要2秒*200W=400W秒==1111.11個小時==46.3天,這個速度明顯是不能接受的2、單進程+多線程:例如我們在這個進程中開了10個多線程,比1中能夠提升10倍速度,也就是大約4.63天能夠完成200W條抓取,請注意,這里的實際執行是:線程1遇見了阻塞,CPU切換到線程2去執行,遇見阻塞又切換到線程3等等,10個線程都阻塞後,這個進程就阻塞了,而直到某個線程阻塞完成後,這個進程才能繼續執行,所以速度上提升大約能到10倍(這里忽略了線程切換帶來的開銷,實際上的提升應該是不能達到10倍的),但是需要考慮的是線程的切換也是有開銷的,所以不能無限的啟動多線程(開200W個線程肯定是不靠譜的)3、多進程+多線程:這里就厲害了,一般來說也有很多人用這個方法,多進程下,每個進程都能佔一個cpu,而多線程從一定程度上繞過了阻塞的等待,所以比單進程下的多線程又更好使了,例如我們開10個進程,每個進程里開20W個線程,執行的速度理論上是比單進程開200W個線程快10倍以上的(為什麼是10倍以上而不是10倍,主要是cpu切換200W個線程的消耗肯定比切換20W個進程大得多,考慮到這部分開銷,所以是10倍以上)。
還有更好的方法嗎?答案是肯定的,它就是:
4、協程,使用它之前我們先講講what/why/how(它是什麼/為什麼用它/怎麼使用它)what:
協程是一種用戶級的輕量級線程。協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:
協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
在並發編程中,協程與線程類似,每個協程表示一個執行單元,有自己的本地數據,與其它協程共享全局數據和其它資源。
why:
目前主流語言基本上都選擇了多線程作為並發設施,與線程相關的概念是搶占式多任務(Preemptive multitasking),而與協程相關的是協作式多任務。
不管是進程還是線程,每次阻塞、切換都需要陷入系統調用(system call),先讓CPU跑操作系統的調度程序,然後再由調度程序決定該跑哪一個進程(線程)。
而且由於搶占式調度執行順序無法確定的特點,使用線程時需要非常小心地處理同步問題,而協程完全不存在這個問題(事件驅動和非同步程序也有同樣的優點)。
因為協程是用戶自己來編寫調度邏輯的,對CPU來說,協程其實是單線程,所以CPU不用去考慮怎麼調度、切換上下文,這就省去了CPU的切換開銷,所以協程在一定程度上又好於多線程。
how:
python裡面怎麼使用協程?答案是使用gevent,使用方法:看這里使用協程,可以不受線程開銷的限制,我嘗試過一次把20W條url放在單進程的協程里執行,完全沒問題。
所以最推薦的方法,是多進程+協程(可以看作是每個進程里都是單線程,而這個單線程是協程化的)多進程+協程下,避開了CPU切換的開銷,又能把多個CPU充分利用起來,這種方式對於數據量較大的爬蟲還有文件讀寫之類的效率提升是巨大的。
小例子:
#-*- coding=utf-8 -*-
import requests
from multiprocessing import Process
import gevent
from gevent import monkey; monkey.patch_all()import sys
reload(sys)
sys.setdefaultencoding('utf8')
def fetch(url):
try:
s = requests.Session()
r = s.get(url,timeout=1)#在這里抓取頁面
except Exception,e:
print e
return ''
def process_start(tasks):
gevent.joinall(tasks)#使用協程來執行
def task_start(filepath,flag = 100000):#每10W條url啟動一個進程with open(filepath,'r') as reader:#從給定的文件中讀取urlurl = reader.readline().strip()
task_list = []#這個list用於存放協程任務
i = 0 #計數器,記錄添加了多少個url到協程隊列while url!='':
i += 1
task_list.append(gevent.spawn(fetch,url,queue))#每次讀取出url,將任務添加到協程隊列if i == flag:#一定數量的url就啟動一個進程並執行p = Process(target=process_start,args=(task_list,))p.start()
task_list = [] #重置協程隊列
i = 0 #重置計數器
url = reader.readline().strip()
if task_list not []:#若退出循環後任務隊列里還有url剩餘p = Process(target=process_start,args=(task_list,))#把剩餘的url全都放到最後這個進程來執行p.start()
if __name__ == '__main__':
task_start('./testData.txt')#讀取指定文件細心的同學會發現:上面的例子中隱藏了一個問題:進程的數量會隨著url數量的增加而不斷增加,我們在這里不使用進程池multiprocessing.Pool來控制進程數量的原因是multiprocessing.Pool和gevent有沖突不能同時使用,但是有興趣的同學可以研究一下gevent.pool這個協程池。
另外還有一個問題:每個進程處理的url是累積的而不是獨立的,例如第一個進程會處理10W個,第二個進程會變成20W個,以此類推。最後定位到問題是gevent.joinall()導致的問題,有興趣的同學可以研究一下為什麼會這樣。不過這個問題的處理方案是:主進程只負責讀取url然後寫入到list中,在創建子進程的時候直接把list傳給子進程,由子進程自己去構建協程。這樣就不會出現累加的問題

8. 如何多線程(多進程)加速while循環(語言-python)

import numpy as np
import os
import sys
import multiprocessing as mp
import time

def MCS(input_data, med):
#t1 = time.perf_counter()
left = 0
lp = 0

while True:
lp = lp + 1
data_pool = input_data + left
output_data = med * 0.05 * data_pool / (10000 + med)
output_data = np.where(output_data > data_pool, data_pool, output_data)
left = data_pool - output_data
cri = (input_data - output_data) / input_data * 100
#print(lp, data_pool, output_data, cri)
if cri <= 1:
break
t2 = time.perf_counter()
#print(f'Finished in {t2 - t1} seconds')

if __name__ == "__main__":
pool = mp.Pool(processes=5)
tasks = []
for i in np.linspace(0.4, 0.6, num = 10):
tasks.append([100, i])
t1 = time.perf_counter()
pool.starmap(MCS, tasks)
#pool.apply_async(MCS, args=(100, 0.4))
t2 = time.perf_counter()
#pool.join()
#pool.close()
for i in np.linspace(0.4, 0.6, num = 10):
MCS(100, i)
t3 = time.perf_counter()

print(f'Finished in {t2 - t1} seconds')
print(f'Finished in {t3 - t2} seconds')

原因可能是只運行了一個例子,
如圖測試了10個例子,測試結果如下
Finished in 15.062450630997773 seconds
Finished in 73.1936681799998 seconds
並行確實有一定的加速。

9. python 多進程讀取同一個循環處理、可以用multiprocessing

可以每個在func中加上一個參數data,data是這個線程處理的數據;

多線程處理的時候,給每個線程分配相應的data就可以了。


給個示例:

#-*-coding:utf-8-*-
importthread,threading
importtime

defFuncTest(tdata):
printtdata

classmythread(threading.Thread):
def__init__(self,threadname):
threading.Thread.__init__(self)

defrun(self):
lock.acquire()
FuncTest(ft)
lock.release()

defMutiThread(num):
threads=[]
i=0
globalft
forxinxrange(num):
threads.append(mythread(num))
fortinthreads:
time.sleep(0.5)
lock.acquire()
ft=GetThreadParam(datafile,num,i)
#print'[%s]Thread:%s,Testdata:%s'%(time.ctime(),t,ft)
i=i+1
t.start()
lock.release()
fortinthreads:
t.join()

defGetThreadParam(datafile,num,curthread):
#線程數需要小於文件行數
f=open(datafile,'r')
lines=f.readlines()
divres=divmod(len(lines),num)
ifcurthread<(num-1):
res=lines[curthread*divres[0]:(curthread+1)*divres[0]]
elifcurthread==(num-1):
res=lines[curthread*divres[0]:((curthread+1)*divres[0]+divres[1])]
returnres
f.close()

if__name__=='__main__':

globalnum,lock
datafile='a.txt'

num=3#num並發數

lock=threading.Lock()
MutiThread(num)

a.txt文件內容如下

1

2

3

4

5

6

7

8

9

10


3個線程並發時,運行結果:

>>>

['1 ', '2 ', '3 ']

['4 ', '5 ', '6 ']

['7 ', '8 ', '9 ', '10']

10. 一個for循環的Python腳本程序中如何加入多進程(並發進程)呢,急急急,在線等

簡單的如下


defps(i):
print(str(i))
defrun():
foriinrange(5):
Process(target=ps,args=(i,)).start()
if__name__=="__main__":
run()
熱點內容
隨機啟動腳本 發布:2025-07-05 16:10:30 瀏覽:515
微博資料庫設計 發布:2025-07-05 15:30:55 瀏覽:19
linux485 發布:2025-07-05 14:38:28 瀏覽:299
php用的軟體 發布:2025-07-05 14:06:22 瀏覽:750
沒有許可權訪問計算機 發布:2025-07-05 13:29:11 瀏覽:425
javaweb開發教程視頻教程 發布:2025-07-05 13:24:41 瀏覽:684
康師傅控流腳本破解 發布:2025-07-05 13:17:27 瀏覽:233
java的開發流程 發布:2025-07-05 12:45:11 瀏覽:678
怎麼看內存卡配置 發布:2025-07-05 12:29:19 瀏覽:277
訪問學者英文個人簡歷 發布:2025-07-05 12:29:17 瀏覽:828