pythonqueue多线程
㈠ python 多线程
python支持多线程效果还不错,很多方面都用到了python 多线程的知识,我前段时间用python 多线程写了个处理生产者和消费者的问题,把代码贴出来给你看下:
#encoding=utf-8
import threading
import random
import time
from Queue import Queue
class Procer(threading.Thread):
def __init__(self, threadname, queue):
threading.Thread.__init__(self, name = threadname)
self.sharedata = queue
def run(self):
for i in range(20):
print self.getName(),'adding',i,'to queue'
self.sharedata.put(i)
time.sleep(random.randrange(10)/10.0)
print self.getName(),'Finished'
# Consumer thread
class Consumer(threading.Thread):
def __init__(self, threadname, queue):
threading.Thread.__init__(self, name = threadname)
self.sharedata = queue
def run(self):
for i in range(20):
print self.getName(),'got a value:',self.sharedata.get()
time.sleep(random.randrange(10)/10.0)
print self.getName(),'Finished'
# Main thread
def main():
queue = Queue()
procer = Procer('Procer', queue)
consumer = Consumer('Consumer', queue)
print 'Starting threads ...'
procer.start()
consumer.start()
procer.join()
consumer.join()
print 'All threads have terminated.'
if __name__ == '__main__':
main()
如果你想要了解更多的python 多线程知识可以点下面的参考资料的地址,希望对有帮助!
㈡ python queue 为什么线程安全
Queue模块提供了一个适用于多线程编程的先进先出数据结构,可以用来安全的传递多线程信息。
它本身就是线程安全的,使用put和get来处理数据,不会产生对一个数据同时读写的问题,所以是安全的。
㈢ Python实现简单多线程任务队列
Python实现简单多线程任务队列
最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码):
defgradient_descent(): # the gradient descent code plotly.write(X, Y)
一般来说,当网络请求 plot.ly 绘图时会阻塞等待返回,于是也会影响到其他的梯度下降函数的执行速度。
一种解决办法是每调用一次 plotly.write 函数就开启一个新的线程,但是这种方法感觉不是很好。 我不想用一个像 cerely(一种分布式任务队列)一样大而全的任务队列框架,因为框架对于我的这点需求来说太重了,并且我的绘图也并不需要 redis 来持久化数据。
那用什么办法解决呢?我在 python 中写了一个很小的任务队列,它可以在一个单独的线程中调用 plotly.write函数。下面是程序代码。
classTaskQueue(Queue.Queue):
首先我们继承 Queue.Queue 类。从 Queue.Queue 类可以继承 get 和 put 方法,以及队列的行为。
def__init__(self, num_workers=1): Queue.Queue.__init__(self) self.num_workers=num_workers self.start_workers()
初始化的时候,我们可以不用考虑工作线程的数量。
defadd_task(self, task,*args,**kwargs): args=argsor() kwargs=kwargsor{} self.put((task, args, kwargs))
我们把 task, args, kwargs 以元组的形式存储在队列中。*args 可以传递数量不等的参数,**kwargs 可以传递命名参数。
defstart_workers(self): foriinrange(self.num_workers): t=Thread(target=self.worker) t.daemon=True t.start()
我们为每个 worker 创建一个线程,然后在后台删除。
下面是 worker 函数的代码:
defworker(self): whileTrue: tupl=self.get() item, args, kwargs=self.get() item(*args,**kwargs) self.task_done()
worker 函数获取队列顶端的任务,并根据输入参数运行,除此之外,没有其他的功能。下面是队列的代码:
我们可以通过下面的代码测试:
defblokkah(*args,**kwargs): time.sleep(5) print“Blokkah mofo!” q=TaskQueue(num_workers=5) foriteminrange(1): q.add_task(blokkah) q.join()# wait for all the tasks to finish. print“Alldone!”
Blokkah 是我们要做的任务名称。队列已经缓存在内存中,并且没有执行很多任务。下面的步骤是把主队列当做单独的进程来运行,这样主程序退出以及执行数据库持久化时,队列任务不会停止运行。但是这个例子很好地展示了如何从一个很简单的小任务写成像工作队列这样复杂的程序。
defgradient_descent(): # the gradient descent code queue.add_task(plotly.write, x=X, y=Y)
修改之后,我的梯度下降算法工作效率似乎更高了。如果你很感兴趣的话,可以参考下面的代码。 classTaskQueue(Queue.Queue): def__init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers=num_workersself.start_workers() defadd_task(self, task,*args,**kwargs):args=argsor()kwargs=kwargsor{}self.put((task, args, kwargs)) defstart_workers(self):foriinrange(self.num_workers):t=Thread(target=self.worker)t.daemon=Truet.start() defworker(self):whileTrue:tupl=self.get()item, args, kwargs=self.get()item(*args,**kwargs)self.task_done() deftests():defblokkah(*args,**kwargs):time.sleep(5)print"Blokkah mofo!" q=TaskQueue(num_workers=5) foriteminrange(10):q.add_task(blokkah) q.join()# block until all tasks are doneprint"All done!" if__name__=="__main__":tests()
㈣ 多线程和队列
1、python提供两种方式使用多线程:一个是基于函数:_thread模块或者threading模块。一个是基于类:theading.Thread
使用多线程函数包装线程对象:_thread
_thead.start_new_thead(func,*args,**kwargs)
args,**kwargs是被包装函数的入参,必须传入元祖或字典
使用多线程函数包装线程对象:threading
threading._start_new_thread(func,*args,**kwargs):开启线程,带元祖或字典
threading.currentThread():返回当前线程变量
threading.enumerate():正在运行的线程列表,不含未启动和已结束线程
threading.activeCount():返回正在运行的线程数量
threading.settrace(func):为所有threading模块启动的线程设置追踪函数,在调用run方法之前,func会被传给追踪函数
threading.setprofile(func):为所有threading模块启动的线程设置性能测试函数,也是在run方法调用前就传递给性能测试函数
使用多线程类包装线程对象:threading.Thread
Thread类提供以下方法:
run():表示线程活动的方法,线程需要控制些什么活动都在这里面定义。当线程对象一但被创建,其活动一定会因调用线程的 start() 方法开始。这会在独立的控制线程调用 run() 方法。
start():开启线程活动
join():等待线程中止,阻塞当前线程直到被调用join方法的线程中止。线程A调用线程B的join方法,那线程A将会被阻塞至线程B中止。
isAlive():返回线程是否还活动
getName():获取线程名字
setName():设置线程名字
Lock对象:实例化线程锁,包含acquire方法获取锁 和 release 方法释放锁,在最开始创建锁的时候,锁为未锁定状态,调用acquire方法后锁置为锁定状态,此时其他线程再调用acquire方法就将会被阻塞至其他线程调用release方法释放锁,如果释放一个并未被锁定的锁将会抛出异常。支持上下文管理协议,直接with lock 无需调用锁定,释放方法
Rlock对象:重入锁,相比lock增加了线程和递归的概念。比如:线程目标函数F,在获得锁之后执行函数G,但函数G也需要先获得锁,此时同一线程,F获得锁,G等待,F等待G执行,就造成了死锁,此时使用rlock可避免。一旦线程获得了重入锁,同一个线程再次获取它将不阻塞;但线程必须在每次获取它时释放一次。
daemon属性:设置该线程是否是守护线程,默认为none,需要在调用start方法之前设置好
事件对象:一个线程发出事件信号 ,其他线程收到信号后作出对应活动。实例化事件对象后,初始事件标志为flase。调用其wait方法将阻塞当前所属线程,至事件标志为true时。调用set方法可将事件标志置为true,被阻塞的线程将被执行。调用clear方法可将事件标志置为flase
注意点:
1、继承threading.Thread类,初始化时要记得继承父类的__init__方法
2、run()方法只能有一个入参,故尽量把启动线程时的参数入参到初始化的时候
3、锁要设定全局的,一个子线程获得一个锁没有意义
以下实例:有一个列表,线程A从尾到头遍历元素,线程B从头到尾将元素值重置为1,设置线程锁之前线程A遍历到头部的数据已经被修改,设置线程锁之后不会再有数据不一致的情况
import threading,time
class tt(threading.Thread):
def __init__(self,name,func,ll):
threading.Thread.__init__(self) #继承父级的初始化方法
self.name=name
self.func=func #run方法只能带一个入参,故把方法入参到初始化的时候
self.ll=ll
def run(self):
print(self.name)
threadlock.acquire() #获得锁
self.func(self.ll)
threadlock.release() #释放锁
def readd(x):
a=len(x)
while a>0:
print(x[a-1])
a-=1
def sett(x):
for i in range(len(x)):
x[i]=1
print(x)
if __name__=="__main__":
l = [0,0,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
threadlock=threading.Lock() #实例化全局锁
th1=tt("read",readd,l)
th2=tt("set",sett,l)
th1.start()
th2.start()
th_list=[]
th_list.append(th1)
th_list.append(th2)
for li in th_list:
li.join() #主线程被阻塞,直到两个子线程处理结束
print("主线程结束")
2、队列
queue模块包含queue.Queue(maxsize=0)先入先出队列,queue.LifoQueue()先入后出队列,和queue.PriorityQueue()优先级可设置的队列
Queue 模块中的常用方法:
Queue.qsize() 返回队列的大小,获取的数据不可靠,因为一直有线程在操作队列,数据一直变化
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.put(block=true,timeout=none) 将item数据写入队列,block=True,设置线程是否阻塞,设置阻塞当队列数据满了之后就会阻塞,一直到队列数据不满时继续添加,如果设置不阻塞,当队列满了就会一直到timeout到后报错
Queue.get([block[, timeout]]) 取出队列数据,block=True,设置线程是否阻塞。设置阻塞,将会等待直到队列不为空有数据可取出,设置不阻塞直到超过timeout等待时间后报错
Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
Queue.join() 实际上意味着等到队列为空,再执行别的操作。会在队列有未完成时阻塞,等待队列无未完成的任务,取出数据get()之后还需要配置task_done使用才能让等待队列数-1
import queue,time
import threading
q=queue.Queue(maxsize=5)
def sett():
a=0
while a<20:
q.put(a,True)
print("%d被put"%a)
a+=1
def gett():
time.sleep(1)
while not q.empty(): #只要队列没空,一直取数据
print("%d被取出"%q.get(True))
q.task_done() #取出一次数据,将未完成任务-1,不然使用join方法线程会一直阻塞
if __name__=="__main__":
th1=threading._start_new_thread(sett,()) #不带参数也要传入空元祖不然会报错
th2=threading._start_new_thread(gett,())
time.sleep(1) #延时主线程1S,等待put线程已经put部分数据到队列
q.join()#阻塞主线程,直到未完成任务为0
㈤ python多线程并发数量控制
python多线程如果不进行并发数量控制,在启动线程数量多到一定程度后,会造成线程无法启动的错误。
控制多线程并发数量的方法有好几钟,下面介绍用queue控制多线程并发数量的方法。python3
㈥ python3没有queue模块吗
有的,直接使用就可以了。
import queue
lr = queue.Queue()
㈦ 关于python多进程使用(Queue、生产者和消费者)
关于 的生产者和消费者的实现,刚好最近有用到,简单总结记录下:
是系统独立调度核分配系统资源(CPU、内存)的基本单位,进程之间是相互独立的,每启动一个新的进程相当于把数据进行了一次克隆。
python提供了多种方法实现了多进程中间的 (可以修改同一份数据)。
GIL 的全称是 Global Interpreter Lock(全局解释器锁),来源是 Python 设计之初的考虑,为了数据安全所做的决定。
某个线程想要执行,必须先拿到 GIL,我们可以把 GIL 看作是“通行证”,并且在一个 Python 进程中,GIL 只有一个,这就导致了多线程抢占GIL耗时。这就是为什么在多核CPU上,Python 的多线程效率并不高的根本原因。
所以有必要学习下多进程的使用。
㈧ python queue是多线程么
是的。pythonqueue主要就是为多线程生产值、消费者之间线程通信提供服务,具有先进先出的数据结构。
㈨ Python Queue 入门
Queue 叫队列,是数据结构中的一种,基本上所有成熟的编程语言都内置了对 Queue 的支持。
Python 中的 Queue 模块实现了多生产者和多消费者模型,当需要在多线程编程中非常实用。而且该模块中的 Queue 类实现了锁原语,不需要再考虑多线程安全问题。
该模块内置了三种类型的 Queue,分别是 class queue.Queue(maxsize=0) , class queue.LifoQueue(maxsize=0) 和 class queue.PriorityQueue(maxsize=0) 。它们三个的区别仅仅是取出时的顺序不一致而已。
Queue 是一个 FIFO 队列,任务按照添加的顺序被取出。
LifoQueue 是一个 LIFO 队列,类似堆栈,后添加的任务先被取出。
PriorityQueue 是一个优先级队列,队列里面的任务按照优先级排序,优先级高的先被取出。
如你所见,就是上面所说的三种不同类型的内置队列,其中 maxsize 是个整数,用于设置可以放入队列中的任务数的上限。当达到这个大小的时候,插入操作将阻塞至队列中的任务被消费掉。如果 maxsize 小于等于零,则队列尺寸为无限大。
向队列中添加任务,直接调用 put() 函数即可
put() 函数完整的函数签名如下 Queue.put(item, block=True, timeout=None) ,如你所见,该函数有两个可选参数。
默认情况下,在队列满时,该函数会一直阻塞,直到队列中有空余的位置可以添加任务为止。如果 timeout 是正数,则最多阻塞 timeout 秒,如果这段时间内还没有空余的位置出来,则会引发 Full 异常。
当 block 为 false 时,timeout 参数将失效。同时如果队列中没有空余的位置可添加任务则会引发 Full 异常,否则会直接把任务放入队列并返回,不会阻塞。
另外,还可以通过 Queue.put_nowait(item) 来添加任务,相当于 Queue.put(item, False) ,不再赘述。同样,在队列满时,该操作会引发 Full 异常。
从队列中获取任务,直接调用 get() 函数即可。
与 put() 函数一样, get() 函数也有两个可选参数,完整签名如下 Queue.get(block=True, timeout=None) 。
默认情况下,当队列空时调用该函数会一直阻塞,直到队列中有任务可获取为止。如果 timeout 是正数,则最多阻塞 timeout 秒,如果这段时间内还没有任务可获取,则会引发 Empty 异常。
当 block 为 false 时,timeout 参数将失效。同时如果队列中没有任务可获取则会立刻引发 Empty 异常,否则会直接获取一个任务并返回,不会阻塞。
另外,还可以通过 Queue.get_nowait() 来获取任务,相当于 Queue.get(False) ,不再赘述。同样,在队列为空时,该操作会引发 Empty 异常。
Queue.qsize() 函数返回队列的大小。注意这个大小不是精确的,qsize() > 0 不保证后续的 get() 不被阻塞,同样 qsize() < maxsize 也不保证 put() 不被阻塞。
如果队列为空,返回 True ,否则返回 False 。如果 empty() 返回 True ,不保证后续调用的 put() 不被阻塞。类似的,如果 empty() 返回 False ,也不保证后续调用的 get() 不被阻塞。
如果队列是满的返回 True ,否则返回 False 。如果 full() 返回 True 不保证后续调用的 get() 不被阻塞。类似的,如果 full() 返回 False 也不保证后续调用的 put() 不被阻塞。
queue.Queue() 是 FIFO 队列,出队顺序跟入队顺序是一致的。
queue.LifoQueue() 是 LIFO 队列,出队顺序跟入队顺序是完全相反的,类似于栈。
优先级队列中的任务顺序跟放入时的顺序是无关的,而是按照任务的大小来排序,最小值先被取出。那任务比较大小的规则是怎么样的呢。
注意,因为列表的比较对规则是按照下标顺序来比较的,所以在没有比较出大小之前 ,队列中所有列表对应下标位置的元素类型要一致。
好比 [2,1] 和 ["1","b"] 因为第一个位置的元素类型不一样,所以是没有办法比较大小的,所以也就放入不了优先级队列。
然而对于 [2,1] 和 [1,"b"] 来说即使第二个元素的类型不一致也是可以放入优先级队列的,因为只需要比较第一个位置元素的大小就可以比较出结果了,就不需要比较第二个位置元素的大小了。
但是对于 [2,1] 和 1 [2,"b"] 来说,则同样不可以放入优先级队列,因为需要比较第二个位置的元素才可以比较出结果,然而第二个位置的元素类型是不一致的,无法比较大小。
综上,也就是说, 直到在比较出结果之前,对应下标位置的元素类型都是需要一致的 。
下面我们自定义一个动物类型,希望按照年龄大小来做优先级排序。年龄越小优先级越高。
本章节介绍了队列以及其常用操作。因为队列默认实现了锁原语,因此在多线程编程中就不需要再考虑多线程安全问题了,对于程序员来说相当友好了。
㈩ python_队列
1.队列是先进先出,列表可以读取某个指定数据
2.队列如果将储存的数据都读完就结束,列表可以反复读取
例如:
二、具体介绍一下queue
在使用queue的时候要先引入queue模块,创建对象~
其中queue可以创建出三种对象分别是
1.先进先出行Queue(maxsize = ?)
通过上面的例子我们能发现,put 方法是往队列放数据,但是队列跟列表不同取完之后数据就没有了,如果取的数据大于列表存放的数据就会卡住这时候有两种解决办法,第一种调用get_nowait()方法,这时候就会报异常queue.Empty,第二种就是从get自身解决,get(block = False),默认的时候block是True。
2.后进先出LifeQueue()是个缩写是Last in first out
3.priorityQueue可以理解成vip,看你的心情让那先出就先出
三、利用queue和多线程写一个生产者消费者
