python分布式調度
A. python爬蟲技術有哪些做的比較好的
基礎爬蟲:(1)基礎庫:urllib模塊/requests第三方模塊首先爬蟲就是要從網頁上把我們需要的信息抓取下來的,那麼我們就要學習urllib/requests模塊,這兩種模塊是負責爬取網頁的。這里大家覺得哪一種用的習慣就用哪一種,選擇一種精通就好了。我推薦讀者使用使用requests模塊,因為這一種簡便很多,容易操作、容易理解,所以requests被稱為「人性化模塊」。(2)多進程、多線程、協程和分布式進程:為什麼要學著四個知識呢?假如你要爬取200萬條的數據,使用一般的單進程或者單線程的話,你爬取下載這些數據,也許要一個星期或是更久。試問這是你想要看到的結果嗎?顯然單進程和單線程不要滿足我們追求的高效率,太浪費時間了。只要設置好多進程和多線程,爬取數據的速度可以提高10倍甚至更高的效率。(3)網頁解析提取庫:xpath/BeautifulSoup4/正則表達式通過前面的(1)和(2)爬取下來的是網頁源代碼,這里有很多並不是我們想要的信息,所以需要將沒用的信息過濾掉,留下對我們有價值的信息。這里有三種解析器,三種在不同的場景各有特色也各有不足,總的來說,學會這三種靈活運用會很方便的。推薦理解能力不是很強的朋友或是剛入門爬蟲的朋友,學習BeautifulSoup4是很容易掌握並能夠快速應用實戰的,功能也非常強大。(4)反屏蔽:請求頭/代理伺服器/cookie在爬取網頁的時候有時會失敗,因為別人網站設置了反爬蟲措施了,這個時候就需要我們去偽裝自己的行為,讓對方網站察覺不到我們就是爬蟲方。請求頭設置,主要是模擬成瀏覽器的行為;IP被屏蔽了,就需要使用代理伺服器來破解;而cookie是模擬成登錄的行為進入網站。
B. python分布式爬蟲是什麼意思
一、分布式爬蟲架構
在了解分布式爬蟲架構之前,首先回顧一下Scrapy的架構,如下圖所示。
我們需要做的就是在多台主機上同時運行爬蟲任務協同爬取,而協同爬取的前提就是共享爬取隊列。這樣各台主機就不需要各自維護爬取隊列,而是從共享爬取隊列存取Request。但是各台主機還是有各自的Scheler和Downloader,所以調度和下載功能分別完成。如果不考慮隊列存取性能消耗,爬取效率還是會成倍提高。
二、維護爬取隊列
那麼這個隊列用什麼來維護?首先需要考慮的就是性能問題。我們自然想到的是基於內存存儲的Redis,它支持多種數據結構,例如列表(List)、集合(Set)、有序集合(Sorted Set)等,存取的操作也非常簡單。
Redis支持的這幾種數據結構存儲各有優點。
列表有lpush()、lpop()、rpush()、rpop()方法,我們可以用它來實現先進先出式爬取隊列,也可以實現先進後出棧式爬取隊列。
集合的元素是無序的且不重復的,這樣我們可以非常方便地實現隨機排序且不重復的爬取隊列。
有序集合帶有分數表示,而Scrapy的Request也有優先順序的控制,我們可以用它來實現帶優先順序調度的隊列。
我們需要根據具體爬蟲的需求來靈活選擇不同的隊列。
三、如何去重
Scrapy有自動去重,它的去重使用了Python中的集合。這個集合記錄了Scrapy中每個Request的指紋,這個指紋實際上就是Request的散列值。我們可以看看Scrapy的源代碼,如下所示:
importhashlib
defrequest_fingerprint(request, include_headers=None):
ifinclude_headers:
include_headers = tuple(to_bytes(h.lower())
forhinsorted(include_headers))
cache = _fingerprint_cache.setdefault(request, {})
ifinclude_headersnotincache:
fp = hashlib.sha1()
fp.update(to_bytes(request.method))
fp.update(to_bytes(canonicalize_url(request.url)))
fp.update(request.bodyorb'')
ifinclude_headers:
forhdrininclude_headers:
ifhdrinrequest.headers:
fp.update(hdr)
forvinrequest.headers.getlist(hdr):
fp.update(v)
cache[include_headers] = fp.hexdigest()
returncache[include_headers]
request_fingerprint()就是計算Request指紋的方法,其方法內部使用的是hashlib的sha1()方法。計算的欄位包括Request的Method、URL、Body、Headers這幾部分內容,這里只要有一點不同,那麼計算的結果就不同。計算得到的結果是加密後的字元串,也就是指紋。每個Request都有獨有的指紋,指紋就是一個字元串,判定字元串是否重復比判定Request對象是否重復容易得多,所以指紋可以作為判定Request是否重復的依據。
那麼我們如何判定重復呢?Scrapy是這樣實現的,如下所示:
def__init__(self):
self.fingerprints = set()
defrequest_seen(self, request):
fp = self.request_fingerprint(request)
iffpinself.fingerprints:
returnTrue
self.fingerprints.add(fp)
在去重的類RFPDupeFilter中,有一個request_seen()方法,這個方法有一個參數request,它的作用就是檢測該Request對象是否重復。這個方法調用request_fingerprint()獲取該Request的指紋,檢測這個指紋是否存在於fingerprints變數中,而fingerprints是一個集合,集合的元素都是不重復的。如果指紋存在,那麼就返回True,說明該Request是重復的,否則這個指紋加入到集合中。如果下次還有相同的Request傳遞過來,指紋也是相同的,那麼這時指紋就已經存在於集合中,Request對象就會直接判定為重復。這樣去重的目的就實現了。
Scrapy的去重過程就是,利用集合元素的不重復特性來實現Request的去重。
對於分布式爬蟲來說,我們肯定不能再用每個爬蟲各自的集合來去重了。因為這樣還是每個主機單獨維護自己的集合,不能做到共享。多台主機如果生成了相同的Request,只能各自去重,各個主機之間就無法做到去重了。
那麼要實現去重,這個指紋集合也需要是共享的,Redis正好有集合的存儲數據結構,我們可以利用Redis的集合作為指紋集合,那麼這樣去重集合也是利用Redis共享的。每台主機新生成Request之後,把該Request的指紋與集合比對,如果指紋已經存在,說明該Request是重復的,否則將Request的指紋加入到這個集合中即可。利用同樣的原理不同的存儲結構我們也實現了分布式Reqeust的去重。
四、防止中斷
在Scrapy中,爬蟲運行時的Request隊列放在內存中。爬蟲運行中斷後,這個隊列的空間就被釋放,此隊列就被銷毀了。所以一旦爬蟲運行中斷,爬蟲再次運行就相當於全新的爬取過程。
要做到中斷後繼續爬取,我們可以將隊列中的Request保存起來,下次爬取直接讀取保存數據即可獲取上次爬取的隊列。我們在Scrapy中指定一個爬取隊列的存儲路徑即可,這個路徑使用JOB_DIR變數來標識,我們可以用如下命令來實現:
scrapy crawl spider -s JOB_DIR=crawls/spider
更加詳細的使用方法可以參見官方文檔,鏈接為:https://doc.scrapy.org/en/latest/topics/jobs.html。
在Scrapy中,我們實際是把爬取隊列保存到本地,第二次爬取直接讀取並恢復隊列即可。那麼在分布式架構中我們還用擔心這個問題嗎?不需要。因為爬取隊列本身就是用資料庫保存的,如果爬蟲中斷了,資料庫中的Request依然是存在的,下次啟動就會接著上次中斷的地方繼續爬取。
所以,當Redis的隊列為空時,爬蟲會重新爬取;當Redis的隊列不為空時,爬蟲便會接著上次中斷之處繼續爬取。
五、架構實現
我們接下來就需要在程序中實現這個架構了。首先實現一個共享的爬取隊列,還要實現去重的功能。另外,重寫一個Scheer的實現,使之可以從共享的爬取隊列存取Request。
幸運的是,已經有人實現了這些邏輯和架構,並發布成叫Scrapy-Redis的Python包。接下來,我們看看Scrapy-Redis的源碼實現,以及它的詳細工作原理
C. python rq 的這個庫,可以用於分布式嗎
只能應付簡單的非同步計算需求而已。復雜的分布式計算不是很靠譜(因為 rq 實在是太簡單了,個人認為它缺少資源調度、穩定性保障和監控機制),而且對中文支持有 bug ,提交過 patch 忘記作者合了沒有。
D. 如何用 Python 構建一個簡單的分布式系統
分布式爬蟲概覽
何謂分布式爬蟲?
通俗的講,分布式爬蟲就是多台機器多個
spider
對多個
url
的同時處理問題,分布式的方式可以極大提高程序的抓取效率。
構建分布式爬蟲通暢需要考慮的問題
(1)如何能保證多台機器同時抓取同一個URL?
(2)如果某個節點掛掉,會不會影響其它節點,任務如何繼續?
(3)既然是分布式,如何保證架構的可伸縮性和可擴展性?不同優先順序的抓取任務如何進行資源分配和調度?
基於上述問題,我選擇使用celery作為分布式任務調度工具,是分布式爬蟲中任務和資源調度的核心模塊。它會把所有任務都通過消息隊列發送給各個分布式節點進行執行,所以可以很好的保證url不會被重復抓取;它在檢測到worker掛掉的情況下,會嘗試向其他的worker重新發送這個任務信息,這樣第二個問題也可以得到解決;celery自帶任務路由,我們可以根據實際情況在不同的節點上運行不同的抓取任務(在實戰篇我會講到)。本文主要就是帶大家了解一下celery的方方面面(有celery相關經驗的同學和大牛可以直接跳過了)
Celery知識儲備
celery基礎講解
按celery官網的介紹來說
Celery
是一個簡單、靈活且可靠的,處理大量消息的分布式系統,並且提供維護這樣一個系統的必需工具。它是一個專注於實時處理的任務隊列,同時也支持任務調度。
下面幾個關於celery的核心知識點
broker:翻譯過來叫做中間人。它是一個消息傳輸的中間件,可以理解為一個郵箱。每當應用程序調用celery的非同步任務的時候,會向broker傳遞消息,而後celery的worker將會取到消息,執行相應程序。這其實就是消費者和生產者之間的橋梁。
backend:
通常程序發送的消息,發完就完了,可能都不知道對方時候接受了。為此,celery實現了一個backend,用於存儲這些消息以及celery執行的一些消息和結果。
worker:
Celery類的實例,作用就是執行各種任務。注意在celery3.1.25後windows是不支持celery
worker的!
procer:
發送任務,將其傳遞給broker
beat:
celery實現的定時任務。可以將其理解為一個procer,因為它也是通過網路調用定時將任務發送給worker執行。注意在windows上celery是不支持定時任務的!
下面是關於celery的架構示意圖,結合上面文字的話應該會更好理解
由於celery只是任務隊列,而不是真正意義上的消息隊列,它自身不具有存儲數據的功能,所以broker和backend需要通過第三方工具來存儲信息,celery官方推薦的是
RabbitMQ和Redis,另外mongodb等也可以作為broker或者backend,可能不會很穩定,我們這里選擇Redis作為broker兼backend。
實際例子
先安裝celery
pip
install
celery
我們以官網給出的例子來做說明,並對其進行擴展。首先在項目根目錄下,這里我新建一個項目叫做celerystudy,然後切換到該項目目錄下,新建文件tasks.py,然後在其中輸入下面代碼
這里我詳細講一下代碼:我們先通過app=Celery()來實例化一個celery對象,在這個過程中,我們指定了它的broker,是redis的db
2,也指定了它的backend,是redis的db3,
broker和backend的連接形式大概是這樣
redis://:password@hostname:port/db_number
然後定義了一個add函數,重點是@app.task,它的作用在我看來就是將add()
注冊為一個類似服務的東西,本來只能通過本地調用的函數被它裝飾後,就可以通過網路來調用。這個tasks.py中的app就是一個worker。它可以有很多任務,比如這里的任務函數add。我們再通過在命令行切換到項目根目錄,執行
celery
-A
tasks
worker
-l
info
啟動成功後就是下圖所示的樣子
這里我說一下各個參數的意思,-A指定的是app(即Celery實例)所在的文件模塊,我們的app是放在tasks.py中,所以這里是
tasks;worker表示當前以worker的方式運行,難道還有別的方式?對的,比如運行定時任務就不用指定worker這個關鍵字;
-l
info表示該worker節點的日誌等級是info,更多關於啟動worker的參數(比如-c、-Q等常用的)請使用
celery
worker
--help
進行查看
將worker啟動起來後,我們就可以通過網路來調用add函數了。我們在後面的分布式爬蟲構建中也是採用這種方式分發和消費url的。在命令行先切換到項目根目錄,然後打開python交互端
from
tasks
import
addrs
=
add.delay(2,
2)
這里的add.delay就是通過網路調用將任務發送給add所在的worker執行,這個時候我們可以在worker的界面看到接收的任務和計算的結果。
這里是非同步調用,如果我們需要返回的結果,那麼要等rs的ready狀態true才行。這里add看不出效果,不過試想一下,如果我們是調用的比較占時間的io任務,那麼非同步任務就比較有價值了
上面講的是從Python交互終端中調用add函數,如果我們要從另外一個py文件調用呢?除了通過import然後add.delay()這種方式,我們還可以通過send_task()這種方式,我們在項目根目錄另外新建一個py文件叫做
excute_tasks.py,在其中寫下如下的代碼
from
tasks
import
addif
__name__
==
'__main__':
add.delay(5,
10)
這時候可以在celery的worker界面看到執行的結果
此外,我們還可以通過send_task()來調用,將excute_tasks.py改成這樣
這種方式也是可以的。send_task()還可能接收到為注冊(即通過@app.task裝飾)的任務,這個時候worker會忽略這個消息
定時任務
上面部分講了怎麼啟動worker和調用worker的相關函數,這里再講一下celery的定時任務。
爬蟲由於其特殊性,可能需要定時做增量抓取,也可能需要定時做模擬登陸,以防止cookie過期,而celery恰恰就實現了定時任務的功能。在上述基礎上,我們將tasks.py文件改成如下內容
然後先通過ctrl+c停掉前一個worker,因為我們代碼改了,需要重啟worker才會生效。我們再次以celery
-A
tasks
worker
-l
info這個命令開啟worker。
這個時候我們只是開啟了worker,如果要讓worker執行任務,那麼還需要通過beat給它定時發送,我們再開一個命令行,切換到項目根目錄,通過
這樣就表示定時任務已經開始運行了。
眼尖的同學可能看到我這里celery的版本是3.1.25,這是因為celery支持的windows最高版本是3.1.25。由於我的分布式微博爬蟲的worker也同時部署在了windows上,所以我選擇了使用
3.1.25。如果全是linux系統,建議使用celery4。
此外,還有一點需要注意,在celery4後,定時任務(通過schele調度的會這樣,通過crontab調度的會馬上執行)會在當前時間再過定時間隔執行第一次任務,比如我這里設置的是60秒的間隔,那麼第一次執行add會在我們通過celery
beat
-A
tasks
-l
info啟動定時任務後60秒才執行;celery3.1.25則會馬上執行該任務