生产者消费者python
A. python协程gevent怎么用
在学习gevent之前,你肯定要知道你学的这个东西是什么。
官方描述gevent
gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.
翻译:gevent是一个基于协程的Python网络库。我们先理解这句,也是这次学习的重点——协程。
wiki描述协程
与子例程一样,协程也是一种程序组件。相对子例程而言,协程更为一般和灵活,但在实践中使用没有子例程那样广泛。子例程的起始处是惟一的入口点,一旦退出即完成了子例程的执行,子例程的一个实例只会返回一次;协程可以通过yield来调用其它协程。通过yield方式转移执行权的协程之间不是调用者与被调用者的关系,而是彼此对称、平等的。协程允许多个入口点,可以在指定位置挂起和恢复执行。
没看懂?没关系,我也没看懂,不过算是有点线索:子例程。
子例程
过程有两种,一种叫子例程(Subroutine),通常叫Sub;另一种叫函数(Function)。底层实现机制是一样的,区别在于,Sub只执行操作,没有返回值;Function不但执行操作,并且有返回值。用过VB的应该会比较清楚这点。(原谅我用了网络)说到底子例程就是过程,我们一般叫它函数。
说到函数,我就想吐槽了,不明白为什么要叫函数。很多时候我们写一个函数是为了封装、模块化某个功能,它是一个功能、或者说是一个过程。因为它包含的是类似于流程图那样的具体逻辑,先怎样做,然后怎样做;如果遇到A情况则怎样,如果遇到B情况又怎样。个人觉得还是叫过程比较好,叫做函数就让人很纠结了,难道因为回归到底层还是计算问题,出于数学的角度把它称为函数?这个略坑啊!为了符合大家的口味,我还是称之为函数好了(其实我也习惯叫函数了%>_
讲到函数,我们就往底层深入一点,看看下面的代码:
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def a():
print "a start"
b()
print "a end"
def b():
print "b start"
c()
print "b end"
def c():
print "c start"
print "c end"
if __name__ == "__main__":
a()
a start
b start
c start
c end
b end
a end
对于这样的结果大家肯定不会意外的。每当函数被调用,就会在栈中开辟一个栈空间,调用结束后再回收该空间。
假设一个这样的场景:有个讲台,每个人都可以上去发表言论,但是每次讲台只能站一个人。现在a在上面演讲,当他说到“大家好!”的时候,b有个紧急通知要告诉大家,所以a就先下来让b讲完通知,然后a再上讲台继续演讲。如果用函数的思想模拟这个问题,堆栈示意图是这样的:
那什么东西有这样的能力呢?我们很快就可以想到进程、线程,但是你真的想使用进程、线程如此重量级的东西在这么简单的程序上吗?野蛮的抢占式机制和笨重的上下文切换!
还有一种程序组件,那就是协程。它能保留上一次调用时的状态,每次重新进入该过程的时候,就相当于回到上一次离开时所处逻辑流的位置。协程的起始处是第一个入口点,在协程里,返回点之后是接下来的入口点。协程的生命期完全由他们的使用的需要决定。每个协程在用yield命令向另一个协程交出控制时都尽可能做了更多的工作,放弃控制使得另一个协程从这个协程停止的地方开始,接下来的每次协程被调用时,都是从协程返回(或yield)的位置接着执行。
从上面这些你就可以知道其实协程是模拟了多线程(或多进程)的操作,多线程在切换的时候都会有一个上下文切换,在退出的时候将现场保存起来,等到下一次进入的时候从保存的现场开始,继续执行。
看下协程是怎样实现的:
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import random
from time import sleep
from greenlet import greenlet
from Queue import Queue
queue = Queue(1)
@greenlet
def procer():
chars = ['a', 'b', 'c', 'd', 'e']
global queue
while True:
char = random.choice(chars)
queue.put(char)
print "Proced: ", char
sleep(1)
consumer.switch()
@greenlet
def consumer():
global queue
while True:
char = queue.get()
print "Consumed: ", char
sleep(1)
procer.switch()
if __name__ == "__main__":
procer.run()
consumer.run()
应用场景
我们一直都在大谈协程是什么样一个东西,却从没有提起协程用来干嘛,这个其实大家分析一下就能够知道。从上面的生产者——消费者问题应该能看出,它分别有两个任务,假设交给两个人去执行,但每次只能允许一个人行动。当缓冲区满的时候,生产者是出于等待状态的,这个时候可以将执行任务的权利转交给消费者,当缓冲区空得时候,消费者是出于等待状态的,这个时候可以将执行任务的权利转交给生产者,是不是很容易联想到多任务切换?然后想到线程?最后想到高并发?
但同学们又会问,既然有了线程为什么还要协程呢?因为线程是系统级别的,在做切换的时候消耗是特别大的,具体为什么这么大等我研究好了再告诉你;同时线程的切换是由CPU决定的,可能你刚好执行到一个地方的时候就要被迫终止,这个时候你需要用各种措施来保证你的数据不出错,所以线程对于数据安全的操作是比较复杂的。而协程是用户级别的切换,且切换是由自己控制,不受外力终止。
总结
协程其实模拟了人类活动的一种过程。例如:你准备先写文档,然后修复bug。这时候接到电话说这个bug很严重,必须立即修复(可以看作CPU通知)。于是你暂停写文档,开始去填坑,终于你把坑填完了,你回来写文档,这个时候你肯定是接着之前写的文档继续,难道你要把之前写的给删了,重新写?这就是协程。那如果是子例程呢?那你就必须重新写了,因为退出之后,栈帧就会被弹出销毁,再次调用就是开辟新的栈空间了。
总结:协程就是用户态下的线程,是人们在有了进程、线程之后仍觉得效率不够,而追求的又一种高并发解决方案。为什么说是用户态,是因为操作系统并不知道它的存在,它是由程序员自己控制、互相协作的让出控制权而不是像进程、线程那样由操作系统调度决定是否让出控制权。
B. python生产者消费者问题
这个程序里可能有很多问题。
1.变量传递的问题,这个可能会有问题。
2.condition的用法问题。太复杂了。condition.release少加了一个(),这可能是关键。
3.isEmpty的判断问题。
goods建议用Queue,这样你就省去了condition, 也不用担心isEmpty的逻辑问题了。
如果你用进程模型,则复杂的多。线程是共享同一个内存空间的。这与GIL没有关系。
生产消费者模型经常用于任务分发型程序。 比如爬行器,一个线程或者是进程给URL,其它的下载。结果再合并。
或者是WEB SERVER,一个程序accept, 其它的线程进程只是recv, process,send
C. Python多线程中线程之间的通信(生产者和模型消费者)如何使消费者的速度大于等于生产者。
管道流是为也实现多个线程之间的I/O通信。用于在一个(或多个)线程发送数据,另一个线程(或多个)接收数据。这也类似于Procer/Consumer模式。 它可以用于实现unix下的管道,只不过是unix下管道连接的是进程,而java下连接的是线程。
D. 什么是生产者,消费者,分解者
生产者是是指绿色植物,它们能进行光合作用将太阳能转变为化学能,将无机物转化为有机物,不仅供自身生长发育的需要,也是其他生物类群的食物和能源的提供者。
消费者是指直接或间接利用生产者所制造的有机物质为食物和能量来源的生物,主要指动物,也包括某些寄生的菌类等。根据食性的不同可分为一级消费者、二级消费者等。
分解者是指生态系统中细菌、真菌和放线菌等具有分解能力的生物,也包括某些原生动物和腐食性动物。它们能把动植物残体中复杂的有机物,分解成简单的无机物,释放到环境中,供生产者再一次利用。
(4)生产者消费者python扩展阅读:
生产者的作用:
一、光合作用
即光能合成作用,是植物、藻类和某些细菌,在可见光的照射下,经过光反应和碳反应,利用光合色素,将二氧化碳(或硫化氢)和水转化为有机物,并释放出氧气(或氢气)的生化过程。光合作用是一系列复杂的代谢反应的总和,是生物界赖以生存的基础,也是地球碳氧循环的重要媒介
二、化能合成作用
自然界中存在某些微生物,它们能以二氧化碳为主要碳源,以无机含氮化合物为氮源,合成细胞物质,并通过氧化外界无机物获得生长所需要的能量。这些微生物进行的营养方式称为化能合成作用。
参考资料来源:
网络—生产者
网络—消费者
网络—分解者
E. python 多线程
python支持多线程效果还不错,很多方面都用到了python 多线程的知识,我前段时间用python 多线程写了个处理生产者和消费者的问题,把代码贴出来给你看下:
#encoding=utf-8
import threading
import random
import time
from Queue import Queue
class Procer(threading.Thread):
def __init__(self, threadname, queue):
threading.Thread.__init__(self, name = threadname)
self.sharedata = queue
def run(self):
for i in range(20):
print self.getName(),'adding',i,'to queue'
self.sharedata.put(i)
time.sleep(random.randrange(10)/10.0)
print self.getName(),'Finished'
# Consumer thread
class Consumer(threading.Thread):
def __init__(self, threadname, queue):
threading.Thread.__init__(self, name = threadname)
self.sharedata = queue
def run(self):
for i in range(20):
print self.getName(),'got a value:',self.sharedata.get()
time.sleep(random.randrange(10)/10.0)
print self.getName(),'Finished'
# Main thread
def main():
queue = Queue()
procer = Procer('Procer', queue)
consumer = Consumer('Consumer', queue)
print 'Starting threads ...'
procer.start()
consumer.start()
procer.join()
consumer.join()
print 'All threads have terminated.'
if __name__ == '__main__':
main()
如果你想要了解更多的python 多线程知识可以点下面的参考资料的地址,希望对有帮助!
F. 如何用 Python 构建一个简单的分布式系统
分布式爬虫概览
何谓分布式爬虫?
通俗的讲,分布式爬虫就是多台机器多个
spider
对多个
url
的同时处理问题,分布式的方式可以极大提高程序的抓取效率。
构建分布式爬虫通畅需要考虑的问题
(1)如何能保证多台机器同时抓取同一个URL?
(2)如果某个节点挂掉,会不会影响其它节点,任务如何继续?
(3)既然是分布式,如何保证架构的可伸缩性和可扩展性?不同优先级的抓取任务如何进行资源分配和调度?
基于上述问题,我选择使用celery作为分布式任务调度工具,是分布式爬虫中任务和资源调度的核心模块。它会把所有任务都通过消息队列发送给各个分布式节点进行执行,所以可以很好的保证url不会被重复抓取;它在检测到worker挂掉的情况下,会尝试向其他的worker重新发送这个任务信息,这样第二个问题也可以得到解决;celery自带任务路由,我们可以根据实际情况在不同的节点上运行不同的抓取任务(在实战篇我会讲到)。本文主要就是带大家了解一下celery的方方面面(有celery相关经验的同学和大牛可以直接跳过了)
Celery知识储备
celery基础讲解
按celery官网的介绍来说
Celery
是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。
下面几个关于celery的核心知识点
broker:翻译过来叫做中间人。它是一个消息传输的中间件,可以理解为一个邮箱。每当应用程序调用celery的异步任务的时候,会向broker传递消息,而后celery的worker将会取到消息,执行相应程序。这其实就是消费者和生产者之间的桥梁。
backend:
通常程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果。
worker:
Celery类的实例,作用就是执行各种任务。注意在celery3.1.25后windows是不支持celery
worker的!
procer:
发送任务,将其传递给broker
beat:
celery实现的定时任务。可以将其理解为一个procer,因为它也是通过网络调用定时将任务发送给worker执行。注意在windows上celery是不支持定时任务的!
下面是关于celery的架构示意图,结合上面文字的话应该会更好理解
由于celery只是任务队列,而不是真正意义上的消息队列,它自身不具有存储数据的功能,所以broker和backend需要通过第三方工具来存储信息,celery官方推荐的是
RabbitMQ和Redis,另外mongodb等也可以作为broker或者backend,可能不会很稳定,我们这里选择Redis作为broker兼backend。
实际例子
先安装celery
pip
install
celery
我们以官网给出的例子来做说明,并对其进行扩展。首先在项目根目录下,这里我新建一个项目叫做celerystudy,然后切换到该项目目录下,新建文件tasks.py,然后在其中输入下面代码
这里我详细讲一下代码:我们先通过app=Celery()来实例化一个celery对象,在这个过程中,我们指定了它的broker,是redis的db
2,也指定了它的backend,是redis的db3,
broker和backend的连接形式大概是这样
redis://:password@hostname:port/db_number
然后定义了一个add函数,重点是@app.task,它的作用在我看来就是将add()
注册为一个类似服务的东西,本来只能通过本地调用的函数被它装饰后,就可以通过网络来调用。这个tasks.py中的app就是一个worker。它可以有很多任务,比如这里的任务函数add。我们再通过在命令行切换到项目根目录,执行
celery
-A
tasks
worker
-l
info
启动成功后就是下图所示的样子
这里我说一下各个参数的意思,-A指定的是app(即Celery实例)所在的文件模块,我们的app是放在tasks.py中,所以这里是
tasks;worker表示当前以worker的方式运行,难道还有别的方式?对的,比如运行定时任务就不用指定worker这个关键字;
-l
info表示该worker节点的日志等级是info,更多关于启动worker的参数(比如-c、-Q等常用的)请使用
celery
worker
--help
进行查看
将worker启动起来后,我们就可以通过网络来调用add函数了。我们在后面的分布式爬虫构建中也是采用这种方式分发和消费url的。在命令行先切换到项目根目录,然后打开python交互端
from
tasks
import
addrs
=
add.delay(2,
2)
这里的add.delay就是通过网络调用将任务发送给add所在的worker执行,这个时候我们可以在worker的界面看到接收的任务和计算的结果。
这里是异步调用,如果我们需要返回的结果,那么要等rs的ready状态true才行。这里add看不出效果,不过试想一下,如果我们是调用的比较占时间的io任务,那么异步任务就比较有价值了
上面讲的是从Python交互终端中调用add函数,如果我们要从另外一个py文件调用呢?除了通过import然后add.delay()这种方式,我们还可以通过send_task()这种方式,我们在项目根目录另外新建一个py文件叫做
excute_tasks.py,在其中写下如下的代码
from
tasks
import
addif
__name__
==
'__main__':
add.delay(5,
10)
这时候可以在celery的worker界面看到执行的结果
此外,我们还可以通过send_task()来调用,将excute_tasks.py改成这样
这种方式也是可以的。send_task()还可能接收到为注册(即通过@app.task装饰)的任务,这个时候worker会忽略这个消息
定时任务
上面部分讲了怎么启动worker和调用worker的相关函数,这里再讲一下celery的定时任务。
爬虫由于其特殊性,可能需要定时做增量抓取,也可能需要定时做模拟登陆,以防止cookie过期,而celery恰恰就实现了定时任务的功能。在上述基础上,我们将tasks.py文件改成如下内容
然后先通过ctrl+c停掉前一个worker,因为我们代码改了,需要重启worker才会生效。我们再次以celery
-A
tasks
worker
-l
info这个命令开启worker。
这个时候我们只是开启了worker,如果要让worker执行任务,那么还需要通过beat给它定时发送,我们再开一个命令行,切换到项目根目录,通过
这样就表示定时任务已经开始运行了。
眼尖的同学可能看到我这里celery的版本是3.1.25,这是因为celery支持的windows最高版本是3.1.25。由于我的分布式微博爬虫的worker也同时部署在了windows上,所以我选择了使用
3.1.25。如果全是linux系统,建议使用celery4。
此外,还有一点需要注意,在celery4后,定时任务(通过schele调度的会这样,通过crontab调度的会马上执行)会在当前时间再过定时间隔执行第一次任务,比如我这里设置的是60秒的间隔,那么第一次执行add会在我们通过celery
beat
-A
tasks
-l
info启动定时任务后60秒才执行;celery3.1.25则会马上执行该任务
G. 使用Python多线程如何实现生产者消费者模式
在enqueue和dequeue方法内部,只有队列的大小等于上限(limit)或者下限(0)时,才调用notifyAll方法。
如果队列的大小既不等于上限,也不等于下限,任何线程调用enqueue或者dequeue方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。
H. 我想让系统一边采集数据一边处理,python多线程怎样弄
查一下生产者消费者模式,python的生产者消费者模式的框架,在框架上改改应该就可以满足你的需求。