當前位置:首頁 » 編程語言 » python阻塞

python阻塞

發布時間: 2025-05-12 02:09:20

1. pythonsocketrecv阻塞

在Python中,socket可以分為阻塞和非阻塞兩種類型。設置方式可以通過setsockopt、setblocking或者settimeout實現。阻塞型socket的recv遵循規則:緩沖區有數據時立即返回所有數據;緩沖區無數據時,阻塞至有數據。而非阻塞型socket的recv遵循規則:緩沖區有數據時立即返回所有數據;緩沖區無數據時產生EAGAIN錯誤並返回(在Python中表現為異常)。兩種情況下都不會返回空字元串,返回空數據的結果是對方關閉連接後才會出現。

由於TCP socket是一個流,所以不存在"讀完了對方發送的數據"這一概念。每次讀取數據後,需根據數據本身判斷是否已收到全部所需數據,以決定是否執行下一個recv操作。

以hiredis庫的介面設計為例,該庫中的Reader包含兩個介面:feed和gets。feed介面用於輸入部分數據,無需確保數據分片正確;gets介面返回已獲取的完整結果,若返回False表示無新結果。TCP socket編程通常遵循此方法:讀取新數據;判斷是否存在完整新消息;處理新消息或等待更多數據。

2. python:如何以非阻塞的方式讀

代碼是這樣的:
subp = subprocess.Popen(["d:/T1.exe"], shell=True, stdout=subprocess.PIPE, bufsize=0)
subp.stdout.read()

但是發現read和readline函數是阻塞方式調用的,一定要subprocess運行結束才能返回數據。

3. python中join如果加在列表下面,是對列表的阻塞還是列表裡面子線程的阻塞

t.join會等待這個t退出後才繼續運行,因為t.join是運行在主線程中,因此會阻塞主線程,即阻塞整個for循環。只有t.join的線程退出後才會繼續執行下一個for循環。在主線程阻塞期間,子線程不會被阻塞,依然會繼續運行。

4. python多進程中隊列不空時阻塞,求解為什麼

最近接觸一個項目,要在多個虛擬機中運行任務,參考別人之前項目的代碼,採用了多進程來處理,於是上網查了查python中的多進程

一、先說說Queue(隊列對象)

Queue是python中的標准庫,可以直接import 引用,之前學習的時候有聽過著名的「先吃先拉」與「後吃先吐」,其實就是這里說的隊列,隊列的構造的時候可以定義它的容量,別吃撐了,吃多了,就會報錯,構造的時候不寫或者寫個小於1的數則表示無限多

import Queue

q = Queue.Queue(10)

向隊列中放值(put)

q.put(『yang')

q.put(4)

q.put([『yan','xing'])

在隊列中取值get()

默認的隊列是先進先出的

>>> q.get()
『yang'
>>> q.get()
4
>>> q.get()
[『yan', 『xing']

當一個隊列為空的時候如果再用get取則會堵塞,所以取隊列的時候一般是用到

get_nowait()方法,這種方法在向一個空隊列取值的時候會拋一個Empty異常

所以更常用的方法是先判斷一個隊列是否為空,如果不為空則取值

隊列中常用的方法

Queue.qsize() 返回隊列的大小
Queue.empty() 如果隊列為空,返回True,反之False
Queue.full() 如果隊列滿了,返回True,反之False
Queue.get([block[, timeout]]) 獲取隊列,timeout等待時間
Queue.get_nowait() 相當Queue.get(False)
非阻塞 Queue.put(item) 寫入隊列,timeout等待時間
Queue.put_nowait(item) 相當Queue.put(item, False)

二、multiprocessing中使用子進程概念

from multiprocessing import Process

可以通過Process來構造一個子進程

p = Process(target=fun,args=(args))

再通過p.start()來啟動子進程

再通過p.join()方法來使得子進程運行結束後再執行父進程

from multiprocessing import Process
import os

# 子進程要執行的代碼
def run_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':
print 'Parent process %s.' % os.getpid()
p = Process(target=run_proc, args=('test',))
print 'Process will start.'
p.start()
p.join()
print 'Process end.'

上面的程序運行後的結果其實是按照上圖中1,2,3分開進行的,先列印1,3秒後列印2,再3秒後列印3

代碼中的p.close()是關掉進程池子,是不再向裡面添加進程了,對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之後就不能繼續添加新的Process了。

當時也可以是實例pool的時候給它定義一個進程的多少

如果上面的代碼中p=Pool(5)那麼所有的子進程就可以同時進行

三、多個子進程間的通信

多個子進程間的通信就要採用第一步中說到的Queue,比如有以下的需求,一個子進程向隊列中寫數據,另外一個進程從隊列中取數據,

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 寫數據進程執行的代碼:
def write(q):
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
time.sleep(random.random())

# 讀數據進程執行的代碼:
def read(q):
while True:
if not q.empty():
value = q.get(True)
print 'Get %s from queue.' % value
time.sleep(random.random())
else:
break

if __name__=='__main__':
# 父進程創建Queue,並傳給各個子進程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子進程pw,寫入:
pw.start()
# 等待pw結束:
pw.join()
# 啟動子進程pr,讀取:
pr.start()
pr.join()
# pr進程里是死循環,無法等待其結束,只能強行終止:
print
print '所有數據都寫入並且讀完'

四、關於上面代碼的幾個有趣的問題

if __name__=='__main__':
# 父進程創建Queue,並傳給各個子進程:
q = Queue()
p = Pool()
pw = p.apply_async(write,args=(q,))
pr = p.apply_async(read,args=(q,))
p.close()
p.join()

print
print '所有數據都寫入並且讀完'

如果main函數寫成上面的樣本,本來我想要的是將會得到一個隊列,將其作為參數傳入進程池子里的每個子進程,但是卻得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的錯誤,查了下,大意是隊列對象不能在父進程與子進程間通信,這個如果想要使用進程池中使用隊列則要使用multiprocess的Manager類

if __name__=='__main__':
manager = multiprocessing.Manager()
# 父進程創建Queue,並傳給各個子進程:
q = manager.Queue()
p = Pool()
pw = p.apply_async(write,args=(q,))
time.sleep(0.5)
pr = p.apply_async(read,args=(q,))
p.close()
p.join()

print
print '所有數據都寫入並且讀完'

這樣這個隊列對象就可以在父進程與子進程間通信,不用池則不需要Manager,以後再擴展multiprocess中的Manager類吧

關於鎖的應用,在不同程序間如果有同時對同一個隊列操作的時候,為了避免錯誤,可以在某個函數操作隊列的時候給它加把鎖,這樣在同一個時間內則只能有一個子進程對隊列進行操作,鎖也要在manager對象中的鎖

#coding:gbk

from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random

# 寫數據進程執行的代碼:
def write(q,lock):
lock.acquire() #加上鎖
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
lock.release() #釋放鎖

# 讀數據進程執行的代碼:
def read(q):
while True:
if not q.empty():
value = q.get(False)
print 'Get %s from queue.' % value
time.sleep(random.random())
else:
break

if __name__=='__main__':
manager = multiprocessing.Manager()
# 父進程創建Queue,並傳給各個子進程:
q = manager.Queue()
lock = manager.Lock() #初始化一把鎖
p = Pool()
pw = p.apply_async(write,args=(q,lock))
pr = p.apply_async(read,args=(q,))
p.close()
p.join()

print
print '所有數據都寫入並且讀完'

熱點內容
app什麼情況下找不到伺服器 發布:2025-05-12 15:46:25 瀏覽:713
php跳過if 發布:2025-05-12 15:34:29 瀏覽:466
不定時演算法 發布:2025-05-12 15:30:16 瀏覽:129
c語言延時1ms程序 發布:2025-05-12 15:01:30 瀏覽:163
動物園靈長類動物配置什麼植物 發布:2025-05-12 14:49:59 瀏覽:732
wifi密碼設置什麼好 發布:2025-05-12 14:49:17 瀏覽:147
三位數乘兩位數速演算法 發布:2025-05-12 13:05:48 瀏覽:396
暴風影音緩存在哪裡 發布:2025-05-12 12:42:03 瀏覽:539
access資料庫exe 發布:2025-05-12 12:39:04 瀏覽:627
五開的配置是什麼 發布:2025-05-12 12:36:37 瀏覽:363