生產者消費者python
A. python協程gevent怎麼用
在學習gevent之前,你肯定要知道你學的這個東西是什麼。
官方描述gevent
gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.
翻譯:gevent是一個基於協程的Python網路庫。我們先理解這句,也是這次學習的重點——協程。
wiki描述協程
與子常式一樣,協程也是一種程序組件。相對子常式而言,協程更為一般和靈活,但在實踐中使用沒有子常式那樣廣泛。子常式的起始處是惟一的入口點,一旦退出即完成了子常式的執行,子常式的一個實例只會返回一次;協程可以通過yield來調用其它協程。通過yield方式轉移執行權的協程之間不是調用者與被調用者的關系,而是彼此對稱、平等的。協程允許多個入口點,可以在指定位置掛起和恢復執行。
沒看懂?沒關系,我也沒看懂,不過算是有點線索:子常式。
子常式
過程有兩種,一種叫子常式(Subroutine),通常叫Sub;另一種叫函數(Function)。底層實現機制是一樣的,區別在於,Sub只執行操作,沒有返回值;Function不但執行操作,並且有返回值。用過VB的應該會比較清楚這點。(原諒我用了網路)說到底子常式就是過程,我們一般叫它函數。
說到函數,我就想吐槽了,不明白為什麼要叫函數。很多時候我們寫一個函數是為了封裝、模塊化某個功能,它是一個功能、或者說是一個過程。因為它包含的是類似於流程圖那樣的具體邏輯,先怎樣做,然後怎樣做;如果遇到A情況則怎樣,如果遇到B情況又怎樣。個人覺得還是叫過程比較好,叫做函數就讓人很糾結了,難道因為回歸到底層還是計算問題,出於數學的角度把它稱為函數?這個略坑啊!為了符合大家的口味,我還是稱之為函數好了(其實我也習慣叫函數了%>_
講到函數,我們就往底層深入一點,看看下面的代碼:
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def a():
print "a start"
b()
print "a end"
def b():
print "b start"
c()
print "b end"
def c():
print "c start"
print "c end"
if __name__ == "__main__":
a()
a start
b start
c start
c end
b end
a end
對於這樣的結果大家肯定不會意外的。每當函數被調用,就會在棧中開辟一個棧空間,調用結束後再回收該空間。
假設一個這樣的場景:有個講台,每個人都可以上去發表言論,但是每次講台只能站一個人。現在a在上面演講,當他說到「大家好!」的時候,b有個緊急通知要告訴大家,所以a就先下來讓b講完通知,然後a再上講台繼續演講。如果用函數的思想模擬這個問題,堆棧示意圖是這樣的:
那什麼東西有這樣的能力呢?我們很快就可以想到進程、線程,但是你真的想使用進程、線程如此重量級的東西在這么簡單的程序上嗎?野蠻的搶占式機制和笨重的上下文切換!
還有一種程序組件,那就是協程。它能保留上一次調用時的狀態,每次重新進入該過程的時候,就相當於回到上一次離開時所處邏輯流的位置。協程的起始處是第一個入口點,在協程里,返回點之後是接下來的入口點。協程的生命期完全由他們的使用的需要決定。每個協程在用yield命令向另一個協程交出控制時都盡可能做了更多的工作,放棄控制使得另一個協程從這個協程停止的地方開始,接下來的每次協程被調用時,都是從協程返回(或yield)的位置接著執行。
從上面這些你就可以知道其實協程是模擬了多線程(或多進程)的操作,多線程在切換的時候都會有一個上下文切換,在退出的時候將現場保存起來,等到下一次進入的時候從保存的現場開始,繼續執行。
看下協程是怎樣實現的:
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import random
from time import sleep
from greenlet import greenlet
from Queue import Queue
queue = Queue(1)
@greenlet
def procer():
chars = ['a', 'b', 'c', 'd', 'e']
global queue
while True:
char = random.choice(chars)
queue.put(char)
print "Proced: ", char
sleep(1)
consumer.switch()
@greenlet
def consumer():
global queue
while True:
char = queue.get()
print "Consumed: ", char
sleep(1)
procer.switch()
if __name__ == "__main__":
procer.run()
consumer.run()
應用場景
我們一直都在大談協程是什麼樣一個東西,卻從沒有提起協程用來幹嘛,這個其實大家分析一下就能夠知道。從上面的生產者——消費者問題應該能看出,它分別有兩個任務,假設交給兩個人去執行,但每次只能允許一個人行動。當緩沖區滿的時候,生產者是出於等待狀態的,這個時候可以將執行任務的權利轉交給消費者,當緩沖區空得時候,消費者是出於等待狀態的,這個時候可以將執行任務的權利轉交給生產者,是不是很容易聯想到多任務切換?然後想到線程?最後想到高並發?
但同學們又會問,既然有了線程為什麼還要協程呢?因為線程是系統級別的,在做切換的時候消耗是特別大的,具體為什麼這么大等我研究好了再告訴你;同時線程的切換是由CPU決定的,可能你剛好執行到一個地方的時候就要被迫終止,這個時候你需要用各種措施來保證你的數據不出錯,所以線程對於數據安全的操作是比較復雜的。而協程是用戶級別的切換,且切換是由自己控制,不受外力終止。
總結
協程其實模擬了人類活動的一種過程。例如:你准備先寫文檔,然後修復bug。這時候接到電話說這個bug很嚴重,必須立即修復(可以看作CPU通知)。於是你暫停寫文檔,開始去填坑,終於你把坑填完了,你回來寫文檔,這個時候你肯定是接著之前寫的文檔繼續,難道你要把之前寫的給刪了,重新寫?這就是協程。那如果是子常式呢?那你就必須重新寫了,因為退出之後,棧幀就會被彈出銷毀,再次調用就是開辟新的棧空間了。
總結:協程就是用戶態下的線程,是人們在有了進程、線程之後仍覺得效率不夠,而追求的又一種高並發解決方案。為什麼說是用戶態,是因為操作系統並不知道它的存在,它是由程序員自己控制、互相協作的讓出控制權而不是像進程、線程那樣由操作系統調度決定是否讓出控制權。
B. python生產者消費者問題
這個程序里可能有很多問題。
1.變數傳遞的問題,這個可能會有問題。
2.condition的用法問題。太復雜了。condition.release少加了一個(),這可能是關鍵。
3.isEmpty的判斷問題。
goods建議用Queue,這樣你就省去了condition, 也不用擔心isEmpty的邏輯問題了。
如果你用進程模型,則復雜的多。線程是共享同一個內存空間的。這與GIL沒有關系。
生產消費者模型經常用於任務分發型程序。 比如爬行器,一個線程或者是進程給URL,其它的下載。結果再合並。
或者是WEB SERVER,一個程序accept, 其它的線程進程只是recv, process,send
C. Python多線程中線程之間的通信(生產者和模型消費者)如何使消費者的速度大於等於生產者。
管道流是為也實現多個線程之間的I/O通信。用於在一個(或多個)線程發送數據,另一個線程(或多個)接收數據。這也類似於Procer/Consumer模式。 它可以用於實現unix下的管道,只不過是unix下管道連接的是進程,而java下連接的是線程。
D. 什麼是生產者,消費者,分解者
生產者是是指綠色植物,它們能進行光合作用將太陽能轉變為化學能,將無機物轉化為有機物,不僅供自身生長發育的需要,也是其他生物類群的食物和能源的提供者。
消費者是指直接或間接利用生產者所製造的有機物質為食物和能量來源的生物,主要指動物,也包括某些寄生的菌類等。根據食性的不同可分為一級消費者、二級消費者等。
分解者是指生態系統中細菌、真菌和放線菌等具有分解能力的生物,也包括某些原生動物和腐食性動物。它們能把動植物殘體中復雜的有機物,分解成簡單的無機物,釋放到環境中,供生產者再一次利用。
(4)生產者消費者python擴展閱讀:
生產者的作用:
一、光合作用
即光能合成作用,是植物、藻類和某些細菌,在可見光的照射下,經過光反應和碳反應,利用光合色素,將二氧化碳(或硫化氫)和水轉化為有機物,並釋放出氧氣(或氫氣)的生化過程。光合作用是一系列復雜的代謝反應的總和,是生物界賴以生存的基礎,也是地球碳氧循環的重要媒介
二、化能合成作用
自然界中存在某些微生物,它們能以二氧化碳為主要碳源,以無機含氮化合物為氮源,合成細胞物質,並通過氧化外界無機物獲得生長所需要的能量。這些微生物進行的營養方式稱為化能合成作用。
參考資料來源:
網路—生產者
網路—消費者
網路—分解者
E. python 多線程
python支持多線程效果還不錯,很多方面都用到了python 多線程的知識,我前段時間用python 多線程寫了個處理生產者和消費者的問題,把代碼貼出來給你看下:
#encoding=utf-8
import threading
import random
import time
from Queue import Queue
class Procer(threading.Thread):
def __init__(self, threadname, queue):
threading.Thread.__init__(self, name = threadname)
self.sharedata = queue
def run(self):
for i in range(20):
print self.getName(),'adding',i,'to queue'
self.sharedata.put(i)
time.sleep(random.randrange(10)/10.0)
print self.getName(),'Finished'
# Consumer thread
class Consumer(threading.Thread):
def __init__(self, threadname, queue):
threading.Thread.__init__(self, name = threadname)
self.sharedata = queue
def run(self):
for i in range(20):
print self.getName(),'got a value:',self.sharedata.get()
time.sleep(random.randrange(10)/10.0)
print self.getName(),'Finished'
# Main thread
def main():
queue = Queue()
procer = Procer('Procer', queue)
consumer = Consumer('Consumer', queue)
print 'Starting threads ...'
procer.start()
consumer.start()
procer.join()
consumer.join()
print 'All threads have terminated.'
if __name__ == '__main__':
main()
如果你想要了解更多的python 多線程知識可以點下面的參考資料的地址,希望對有幫助!
F. 如何用 Python 構建一個簡單的分布式系統
分布式爬蟲概覽
何謂分布式爬蟲?
通俗的講,分布式爬蟲就是多台機器多個
spider
對多個
url
的同時處理問題,分布式的方式可以極大提高程序的抓取效率。
構建分布式爬蟲通暢需要考慮的問題
(1)如何能保證多台機器同時抓取同一個URL?
(2)如果某個節點掛掉,會不會影響其它節點,任務如何繼續?
(3)既然是分布式,如何保證架構的可伸縮性和可擴展性?不同優先順序的抓取任務如何進行資源分配和調度?
基於上述問題,我選擇使用celery作為分布式任務調度工具,是分布式爬蟲中任務和資源調度的核心模塊。它會把所有任務都通過消息隊列發送給各個分布式節點進行執行,所以可以很好的保證url不會被重復抓取;它在檢測到worker掛掉的情況下,會嘗試向其他的worker重新發送這個任務信息,這樣第二個問題也可以得到解決;celery自帶任務路由,我們可以根據實際情況在不同的節點上運行不同的抓取任務(在實戰篇我會講到)。本文主要就是帶大家了解一下celery的方方面面(有celery相關經驗的同學和大牛可以直接跳過了)
Celery知識儲備
celery基礎講解
按celery官網的介紹來說
Celery
是一個簡單、靈活且可靠的,處理大量消息的分布式系統,並且提供維護這樣一個系統的必需工具。它是一個專注於實時處理的任務隊列,同時也支持任務調度。
下面幾個關於celery的核心知識點
broker:翻譯過來叫做中間人。它是一個消息傳輸的中間件,可以理解為一個郵箱。每當應用程序調用celery的非同步任務的時候,會向broker傳遞消息,而後celery的worker將會取到消息,執行相應程序。這其實就是消費者和生產者之間的橋梁。
backend:
通常程序發送的消息,發完就完了,可能都不知道對方時候接受了。為此,celery實現了一個backend,用於存儲這些消息以及celery執行的一些消息和結果。
worker:
Celery類的實例,作用就是執行各種任務。注意在celery3.1.25後windows是不支持celery
worker的!
procer:
發送任務,將其傳遞給broker
beat:
celery實現的定時任務。可以將其理解為一個procer,因為它也是通過網路調用定時將任務發送給worker執行。注意在windows上celery是不支持定時任務的!
下面是關於celery的架構示意圖,結合上面文字的話應該會更好理解
由於celery只是任務隊列,而不是真正意義上的消息隊列,它自身不具有存儲數據的功能,所以broker和backend需要通過第三方工具來存儲信息,celery官方推薦的是
RabbitMQ和Redis,另外mongodb等也可以作為broker或者backend,可能不會很穩定,我們這里選擇Redis作為broker兼backend。
實際例子
先安裝celery
pip
install
celery
我們以官網給出的例子來做說明,並對其進行擴展。首先在項目根目錄下,這里我新建一個項目叫做celerystudy,然後切換到該項目目錄下,新建文件tasks.py,然後在其中輸入下面代碼
這里我詳細講一下代碼:我們先通過app=Celery()來實例化一個celery對象,在這個過程中,我們指定了它的broker,是redis的db
2,也指定了它的backend,是redis的db3,
broker和backend的連接形式大概是這樣
redis://:password@hostname:port/db_number
然後定義了一個add函數,重點是@app.task,它的作用在我看來就是將add()
注冊為一個類似服務的東西,本來只能通過本地調用的函數被它裝飾後,就可以通過網路來調用。這個tasks.py中的app就是一個worker。它可以有很多任務,比如這里的任務函數add。我們再通過在命令行切換到項目根目錄,執行
celery
-A
tasks
worker
-l
info
啟動成功後就是下圖所示的樣子
這里我說一下各個參數的意思,-A指定的是app(即Celery實例)所在的文件模塊,我們的app是放在tasks.py中,所以這里是
tasks;worker表示當前以worker的方式運行,難道還有別的方式?對的,比如運行定時任務就不用指定worker這個關鍵字;
-l
info表示該worker節點的日誌等級是info,更多關於啟動worker的參數(比如-c、-Q等常用的)請使用
celery
worker
--help
進行查看
將worker啟動起來後,我們就可以通過網路來調用add函數了。我們在後面的分布式爬蟲構建中也是採用這種方式分發和消費url的。在命令行先切換到項目根目錄,然後打開python交互端
from
tasks
import
addrs
=
add.delay(2,
2)
這里的add.delay就是通過網路調用將任務發送給add所在的worker執行,這個時候我們可以在worker的界面看到接收的任務和計算的結果。
這里是非同步調用,如果我們需要返回的結果,那麼要等rs的ready狀態true才行。這里add看不出效果,不過試想一下,如果我們是調用的比較占時間的io任務,那麼非同步任務就比較有價值了
上面講的是從Python交互終端中調用add函數,如果我們要從另外一個py文件調用呢?除了通過import然後add.delay()這種方式,我們還可以通過send_task()這種方式,我們在項目根目錄另外新建一個py文件叫做
excute_tasks.py,在其中寫下如下的代碼
from
tasks
import
addif
__name__
==
'__main__':
add.delay(5,
10)
這時候可以在celery的worker界面看到執行的結果
此外,我們還可以通過send_task()來調用,將excute_tasks.py改成這樣
這種方式也是可以的。send_task()還可能接收到為注冊(即通過@app.task裝飾)的任務,這個時候worker會忽略這個消息
定時任務
上面部分講了怎麼啟動worker和調用worker的相關函數,這里再講一下celery的定時任務。
爬蟲由於其特殊性,可能需要定時做增量抓取,也可能需要定時做模擬登陸,以防止cookie過期,而celery恰恰就實現了定時任務的功能。在上述基礎上,我們將tasks.py文件改成如下內容
然後先通過ctrl+c停掉前一個worker,因為我們代碼改了,需要重啟worker才會生效。我們再次以celery
-A
tasks
worker
-l
info這個命令開啟worker。
這個時候我們只是開啟了worker,如果要讓worker執行任務,那麼還需要通過beat給它定時發送,我們再開一個命令行,切換到項目根目錄,通過
這樣就表示定時任務已經開始運行了。
眼尖的同學可能看到我這里celery的版本是3.1.25,這是因為celery支持的windows最高版本是3.1.25。由於我的分布式微博爬蟲的worker也同時部署在了windows上,所以我選擇了使用
3.1.25。如果全是linux系統,建議使用celery4。
此外,還有一點需要注意,在celery4後,定時任務(通過schele調度的會這樣,通過crontab調度的會馬上執行)會在當前時間再過定時間隔執行第一次任務,比如我這里設置的是60秒的間隔,那麼第一次執行add會在我們通過celery
beat
-A
tasks
-l
info啟動定時任務後60秒才執行;celery3.1.25則會馬上執行該任務
G. 使用Python多線程如何實現生產者消費者模式
在enqueue和dequeue方法內部,只有隊列的大小等於上限(limit)或者下限(0)時,才調用notifyAll方法。
如果隊列的大小既不等於上限,也不等於下限,任何線程調用enqueue或者dequeue方法時,都不會阻塞,都能夠正常的往隊列中添加或者移除元素。
H. 我想讓系統一邊採集數據一邊處理,python多線程怎樣弄
查一下生產者消費者模式,python的生產者消費者模式的框架,在框架上改改應該就可以滿足你的需求。