當前位置:首頁 » 編程語言 » pythonqueue多線程

pythonqueue多線程

發布時間: 2023-01-17 21:21:29

python 多線程

python支持多線程效果還不錯,很多方面都用到了python 多線程的知識,我前段時間用python 多線程寫了個處理生產者和消費者的問題,把代碼貼出來給你看下:
#encoding=utf-8
import threading
import random
import time
from Queue import Queue

class Procer(threading.Thread):

def __init__(self, threadname, queue):
threading.Thread.__init__(self, name = threadname)
self.sharedata = queue

def run(self):
for i in range(20):
print self.getName(),'adding',i,'to queue'
self.sharedata.put(i)
time.sleep(random.randrange(10)/10.0)
print self.getName(),'Finished'

# Consumer thread

class Consumer(threading.Thread):

def __init__(self, threadname, queue):
threading.Thread.__init__(self, name = threadname)
self.sharedata = queue

def run(self):

for i in range(20):
print self.getName(),'got a value:',self.sharedata.get()
time.sleep(random.randrange(10)/10.0)
print self.getName(),'Finished'

# Main thread

def main():

queue = Queue()
procer = Procer('Procer', queue)
consumer = Consumer('Consumer', queue)
print 'Starting threads ...'
procer.start()
consumer.start()
procer.join()
consumer.join()
print 'All threads have terminated.'
if __name__ == '__main__':
main()

如果你想要了解更多的python 多線程知識可以點下面的參考資料的地址,希望對有幫助!

㈡ python queue 為什麼線程安全

Queue模塊提供了一個適用於多線程編程的先進先出數據結構,可以用來安全的傳遞多線程信息。
它本身就是線程安全的,使用put和get來處理數據,不會產生對一個數據同時讀寫的問題,所以是安全的。

㈢ Python實現簡單多線程任務隊列

Python實現簡單多線程任務隊列
最近我在用梯度下降演算法繪制神經網路的數據時,遇到了一些演算法性能的問題。梯度下降演算法的代碼如下(偽代碼):
defgradient_descent(): # the gradient descent code plotly.write(X, Y)
一般來說,當網路請求 plot.ly 繪圖時會阻塞等待返回,於是也會影響到其他的梯度下降函數的執行速度。
一種解決辦法是每調用一次 plotly.write 函數就開啟一個新的線程,但是這種方法感覺不是很好。 我不想用一個像 cerely(一種分布式任務隊列)一樣大而全的任務隊列框架,因為框架對於我的這點需求來說太重了,並且我的繪圖也並不需要 redis 來持久化數據。
那用什麼辦法解決呢?我在 python 中寫了一個很小的任務隊列,它可以在一個單獨的線程中調用 plotly.write函數。下面是程序代碼。
classTaskQueue(Queue.Queue):
首先我們繼承 Queue.Queue 類。從 Queue.Queue 類可以繼承 get 和 put 方法,以及隊列的行為。
def__init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers=num_workers self.start_workers()
初始化的時候,我們可以不用考慮工作線程的數量。
defadd_task(self, task,*args,**kwargs): args=argsor() kwargs=kwargsor{} self.put((task, args, kwargs))
我們把 task, args, kwargs 以元組的形式存儲在隊列中。*args 可以傳遞數量不等的參數,**kwargs 可以傳遞命名參數。
defstart_workers(self): foriinrange(self.num_workers): t=Thread(target=self.worker) t.daemon=True t.start()
我們為每個 worker 創建一個線程,然後在後台刪除。
下面是 worker 函數的代碼:
defworker(self): whileTrue: tupl=self.get() item, args, kwargs=self.get() item(*args,**kwargs) self.task_done()
worker 函數獲取隊列頂端的任務,並根據輸入參數運行,除此之外,沒有其他的功能。下面是隊列的代碼:
我們可以通過下面的代碼測試:
defblokkah(*args,**kwargs): time.sleep(5) print「Blokkah mofo!」 q=TaskQueue(num_workers=5) foriteminrange(1): q.add_task(blokkah) q.join()# wait for all the tasks to finish. print「Alldone!」
Blokkah 是我們要做的任務名稱。隊列已經緩存在內存中,並且沒有執行很多任務。下面的步驟是把主隊列當做單獨的進程來運行,這樣主程序退出以及執行資料庫持久化時,隊列任務不會停止運行。但是這個例子很好地展示了如何從一個很簡單的小任務寫成像工作隊列這樣復雜的程序。
defgradient_descent(): # the gradient descent code queue.add_task(plotly.write, x=X, y=Y)
修改之後,我的梯度下降演算法工作效率似乎更高了。如果你很感興趣的話,可以參考下面的代碼。 classTaskQueue(Queue.Queue): def__init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers=num_workersself.start_workers() defadd_task(self, task,*args,**kwargs):args=argsor()kwargs=kwargsor{}self.put((task, args, kwargs)) defstart_workers(self):foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start() defworker(self):whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done() deftests():defblokkah(*args,**kwargs):time.sleep(5)print"Blokkah mofo!" q=TaskQueue(num_workers=5) foriteminrange(10):q.add_task(blokkah) q.join()# block until all tasks are doneprint"All done!" if__name__=="__main__":tests()

㈣ 多線程和隊列

1、python提供兩種方式使用多線程:一個是基於函數:_thread模塊或者threading模塊。一個是基於類:theading.Thread

使用多線程函數包裝線程對象:_thread

_thead.start_new_thead(func,*args,**kwargs)

args,**kwargs是被包裝函數的入參,必須傳入元祖或字典

使用多線程函數包裝線程對象:threading

threading._start_new_thread(func,*args,**kwargs):開啟線程,帶元祖或字典

threading.currentThread():返回當前線程變數

threading.enumerate():正在運行的線程列表,不含未啟動和已結束線程

threading.activeCount():返回正在運行的線程數量

threading.settrace(func):為所有threading模塊啟動的線程設置追蹤函數,在調用run方法之前,func會被傳給追蹤函數

threading.setprofile(func):為所有threading模塊啟動的線程設置性能測試函數,也是在run方法調用前就傳遞給性能測試函數

使用多線程類包裝線程對象:threading.Thread

Thread類提供以下方法:

run():表示線程活動的方法,線程需要控制些什麼活動都在這裡面定義。當線程對象一但被創建,其活動一定會因調用線程的 start() 方法開始。這會在獨立的控制線程調用 run() 方法。

start():開啟線程活動

join():等待線程中止,阻塞當前線程直到被調用join方法的線程中止。線程A調用線程B的join方法,那線程A將會被阻塞至線程B中止。

isAlive():返回線程是否還活動

getName():獲取線程名字

setName():設置線程名字

Lock對象:實例化線程鎖,包含acquire方法獲取鎖 和 release 方法釋放鎖,在最開始創建鎖的時候,鎖為未鎖定狀態,調用acquire方法後鎖置為鎖定狀態,此時其他線程再調用acquire方法就將會被阻塞至其他線程調用release方法釋放鎖,如果釋放一個並未被鎖定的鎖將會拋出異常。支持上下文管理協議,直接with lock 無需調用鎖定,釋放方法

Rlock對象:重入鎖,相比lock增加了線程和遞歸的概念。比如:線程目標函數F,在獲得鎖之後執行函數G,但函數G也需要先獲得鎖,此時同一線程,F獲得鎖,G等待,F等待G執行,就造成了死鎖,此時使用rlock可避免。一旦線程獲得了重入鎖,同一個線程再次獲取它將不阻塞;但線程必須在每次獲取它時釋放一次。

daemon屬性:設置該線程是否是守護線程,默認為none,需要在調用start方法之前設置好

事件對象:一個線程發出事件信號 ,其他線程收到信號後作出對應活動。實例化事件對象後,初始事件標志為flase。調用其wait方法將阻塞當前所屬線程,至事件標志為true時。調用set方法可將事件標志置為true,被阻塞的線程將被執行。調用clear方法可將事件標志置為flase

注意點:

1、繼承threading.Thread類,初始化時要記得繼承父類的__init__方法

2、run()方法只能有一個入參,故盡量把啟動線程時的參數入參到初始化的時候

3、鎖要設定全局的,一個子線程獲得一個鎖沒有意義

以下實例:有一個列表,線程A從尾到頭遍歷元素,線程B從頭到尾將元素值重置為1,設置線程鎖之前線程A遍歷到頭部的數據已經被修改,設置線程鎖之後不會再有數據不一致的情況

import threading,time

class tt(threading.Thread):

    def __init__(self,name,func,ll):

        threading.Thread.__init__(self) #繼承父級的初始化方法

        self.name=name

        self.func=func  #run方法只能帶一個入參,故把方法入參到初始化的時候

        self.ll=ll

    def run(self):

        print(self.name)

        threadlock.acquire() #獲得鎖

        self.func(self.ll)

        threadlock.release() #釋放鎖

def readd(x):

    a=len(x)

    while a>0:

        print(x[a-1])

        a-=1

def sett(x):

    for i in range(len(x)):

        x[i]=1

    print(x)

if __name__=="__main__":

    l = [0,0,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]

    threadlock=threading.Lock() #實例化全局鎖

    th1=tt("read",readd,l)

    th2=tt("set",sett,l)

    th1.start()

    th2.start()

    th_list=[] 

    th_list.append(th1)

    th_list.append(th2)

    for li in th_list:

        li.join()        #主線程被阻塞,直到兩個子線程處理結束

    print("主線程結束")

2、隊列

queue模塊包含queue.Queue(maxsize=0)先入先出隊列,queue.LifoQueue()先入後出隊列,和queue.PriorityQueue()優先順序可設置的隊列

Queue 模塊中的常用方法:

Queue.qsize() 返回隊列的大小,獲取的數據不可靠,因為一直有線程在操作隊列,數據一直變化

Queue.empty() 如果隊列為空,返回True,反之False

Queue.full() 如果隊列滿了,返回True,反之False

Queue.full 與 maxsize 大小對應

Queue.put(block=true,timeout=none) 將item數據寫入隊列,block=True,設置線程是否阻塞,設置阻塞當隊列數據滿了之後就會阻塞,一直到隊列數據不滿時繼續添加,如果設置不阻塞,當隊列滿了就會一直到timeout到後報錯

Queue.get([block[, timeout]]) 取出隊列數據,block=True,設置線程是否阻塞。設置阻塞,將會等待直到隊列不為空有數據可取出,設置不阻塞直到超過timeout等待時間後報錯

Queue.task_done() 在完成一項工作之後,Queue.task_done()函數向任務已經完成的隊列發送一個信號

Queue.join() 實際上意味著等到隊列為空,再執行別的操作。會在隊列有未完成時阻塞,等待隊列無未完成的任務,取出數據get()之後還需要配置task_done使用才能讓等待隊列數-1

import queue,time

import threading

q=queue.Queue(maxsize=5)

def sett():

    a=0

    while a<20:

        q.put(a,True)

        print("%d被put"%a)

        a+=1

def gett():

    time.sleep(1)

    while not q.empty(): #只要隊列沒空,一直取數據

        print("%d被取出"%q.get(True))

        q.task_done() #取出一次數據,將未完成任務-1,不然使用join方法線程會一直阻塞

if __name__=="__main__":

    th1=threading._start_new_thread(sett,()) #不帶參數也要傳入空元祖不然會報錯

    th2=threading._start_new_thread(gett,())

    time.sleep(1) #延時主線程1S,等待put線程已經put部分數據到隊列

    q.join()#阻塞主線程,直到未完成任務為0

㈤ python多線程並發數量控制

python多線程如果不進行並發數量控制,在啟動線程數量多到一定程度後,會造成線程無法啟動的錯誤。
控制多線程並發數量的方法有好幾鍾,下面介紹用queue控制多線程並發數量的方法。python3

㈥ python3沒有queue模塊嗎

有的,直接使用就可以了。
import queue
lr = queue.Queue()

㈦ 關於python多進程使用(Queue、生產者和消費者)

關於 的生產者和消費者的實現,剛好最近有用到,簡單總結記錄下:

是系統獨立調度核分配系統資源(CPU、內存)的基本單位,進程之間是相互獨立的,每啟動一個新的進程相當於把數據進行了一次克隆。
python提供了多種方法實現了多進程中間的 (可以修改同一份數據)。

GIL 的全稱是 Global Interpreter Lock(全局解釋器鎖),來源是 Python 設計之初的考慮,為了數據安全所做的決定。
某個線程想要執行,必須先拿到 GIL,我們可以把 GIL 看作是「通行證」,並且在一個 Python 進程中,GIL 只有一個,這就導致了多線程搶佔GIL耗時。這就是為什麼在多核CPU上,Python 的多線程效率並不高的根本原因。
所以有必要學習下多進程的使用。

㈧ python queue是多線程么

是的。pythonqueue主要就是為多線程生產值、消費者之間線程通信提供服務,具有先進先出的數據結構。

㈨ Python Queue 入門

Queue 叫隊列,是數據結構中的一種,基本上所有成熟的編程語言都內置了對 Queue 的支持。

Python 中的 Queue 模塊實現了多生產者和多消費者模型,當需要在多線程編程中非常實用。而且該模塊中的 Queue 類實現了鎖原語,不需要再考慮多線程安全問題。

該模塊內置了三種類型的 Queue,分別是 class queue.Queue(maxsize=0) , class queue.LifoQueue(maxsize=0) 和 class queue.PriorityQueue(maxsize=0) 。它們三個的區別僅僅是取出時的順序不一致而已。

Queue 是一個 FIFO 隊列,任務按照添加的順序被取出。

LifoQueue 是一個 LIFO 隊列,類似堆棧,後添加的任務先被取出。

PriorityQueue 是一個優先順序隊列,隊列裡面的任務按照優先順序排序,優先順序高的先被取出。

如你所見,就是上面所說的三種不同類型的內置隊列,其中 maxsize 是個整數,用於設置可以放入隊列中的任務數的上限。當達到這個大小的時候,插入操作將阻塞至隊列中的任務被消費掉。如果 maxsize 小於等於零,則隊列尺寸為無限大。

向隊列中添加任務,直接調用 put() 函數即可

put() 函數完整的函數簽名如下 Queue.put(item, block=True, timeout=None) ,如你所見,該函數有兩個可選參數。

默認情況下,在隊列滿時,該函數會一直阻塞,直到隊列中有空餘的位置可以添加任務為止。如果 timeout 是正數,則最多阻塞 timeout 秒,如果這段時間內還沒有空餘的位置出來,則會引發 Full 異常。

當 block 為 false 時,timeout 參數將失效。同時如果隊列中沒有空餘的位置可添加任務則會引發 Full 異常,否則會直接把任務放入隊列並返回,不會阻塞。

另外,還可以通過 Queue.put_nowait(item) 來添加任務,相當於 Queue.put(item, False) ,不再贅述。同樣,在隊列滿時,該操作會引發 Full 異常。

從隊列中獲取任務,直接調用 get() 函數即可。

與 put() 函數一樣, get() 函數也有兩個可選參數,完整簽名如下 Queue.get(block=True, timeout=None) 。

默認情況下,當隊列空時調用該函數會一直阻塞,直到隊列中有任務可獲取為止。如果 timeout 是正數,則最多阻塞 timeout 秒,如果這段時間內還沒有任務可獲取,則會引發 Empty 異常。

當 block 為 false 時,timeout 參數將失效。同時如果隊列中沒有任務可獲取則會立刻引發 Empty 異常,否則會直接獲取一個任務並返回,不會阻塞。

另外,還可以通過 Queue.get_nowait() 來獲取任務,相當於 Queue.get(False) ,不再贅述。同樣,在隊列為空時,該操作會引發 Empty 異常。

Queue.qsize() 函數返回隊列的大小。注意這個大小不是精確的,qsize() > 0 不保證後續的 get() 不被阻塞,同樣 qsize() < maxsize 也不保證 put() 不被阻塞。

如果隊列為空,返回 True ,否則返回 False 。如果 empty() 返回 True ,不保證後續調用的 put() 不被阻塞。類似的,如果 empty() 返回 False ,也不保證後續調用的 get() 不被阻塞。

如果隊列是滿的返回 True ,否則返回 False 。如果 full() 返回 True 不保證後續調用的 get() 不被阻塞。類似的,如果 full() 返回 False 也不保證後續調用的 put() 不被阻塞。

queue.Queue() 是 FIFO 隊列,出隊順序跟入隊順序是一致的。

queue.LifoQueue() 是 LIFO 隊列,出隊順序跟入隊順序是完全相反的,類似於棧。

優先順序隊列中的任務順序跟放入時的順序是無關的,而是按照任務的大小來排序,最小值先被取出。那任務比較大小的規則是怎麼樣的呢。

注意,因為列表的比較對規則是按照下標順序來比較的,所以在沒有比較出大小之前 ,隊列中所有列表對應下標位置的元素類型要一致。

好比 [2,1] 和 ["1","b"] 因為第一個位置的元素類型不一樣,所以是沒有辦法比較大小的,所以也就放入不了優先順序隊列。

然而對於 [2,1] 和 [1,"b"] 來說即使第二個元素的類型不一致也是可以放入優先順序隊列的,因為只需要比較第一個位置元素的大小就可以比較出結果了,就不需要比較第二個位置元素的大小了。

但是對於 [2,1] 和 1 [2,"b"] 來說,則同樣不可以放入優先順序隊列,因為需要比較第二個位置的元素才可以比較出結果,然而第二個位置的元素類型是不一致的,無法比較大小。

綜上,也就是說, 直到在比較出結果之前,對應下標位置的元素類型都是需要一致的

下面我們自定義一個動物類型,希望按照年齡大小來做優先順序排序。年齡越小優先順序越高。

本章節介紹了隊列以及其常用操作。因為隊列默認實現了鎖原語,因此在多線程編程中就不需要再考慮多線程安全問題了,對於程序員來說相當友好了。

㈩ python_隊列

1.隊列是先進先出,列表可以讀取某個指定數據
2.隊列如果將儲存的數據都讀完就結束,列表可以反復讀取
例如:

二、具體介紹一下queue
在使用queue的時候要先引入queue模塊,創建對象~
其中queue可以創建出三種對象分別是
1.先進先出行Queue(maxsize = ?)

通過上面的例子我們能發現,put 方法是往隊列放數據,但是隊列跟列表不同取完之後數據就沒有了,如果取的數據大於列表存放的數據就會卡住這時候有兩種解決辦法,第一種調用get_nowait()方法,這時候就會報異常queue.Empty,第二種就是從get自身解決,get(block = False),默認的時候block是True。
2.後進先出LifeQueue()是個縮寫是Last in first out

3.priorityQueue可以理解成vip,看你的心情讓那先出就先出

三、利用queue和多線程寫一個生產者消費者

熱點內容
c語言期末試卷 發布:2025-07-17 05:49:58 瀏覽:404
64位access資料庫 發布:2025-07-17 05:35:58 瀏覽:374
php文件的相對路徑 發布:2025-07-17 05:34:22 瀏覽:711
矢量的叉乘運演算法則 發布:2025-07-17 05:29:41 瀏覽:661
dell雲存儲伺服器 發布:2025-07-17 05:21:06 瀏覽:255
銑床怎麼編程 發布:2025-07-17 05:20:29 瀏覽:776
sql11oracle 發布:2025-07-17 05:15:39 瀏覽:744
全國各地移動dns伺服器ip地址 發布:2025-07-17 05:07:47 瀏覽:312
sdvn加密 發布:2025-07-17 05:01:36 瀏覽:739
怎麼提取電腦緩存中的音樂 發布:2025-07-17 04:53:14 瀏覽:27