python串口通信
1. python 如何防止串口通信失敗
Python中串口出現異常通常有:1.打開串口時,串口不存在,2.寫串帆答口時,3.讀串口時。這幾個異常是經常會碰到培宏的(有經驗的人就深有體會),一旦異常出現了,整個程序很可能會因此就運行不下去了。避免因為這些異常的出現而導致程序死機的方法是對這些可能存在的異常進行態中慧捕捉。舉一個例子:
try:
ComDev.read(1)
print "read Com ok!"
except:
print "read Com error!"
上面的代碼意思是:對ComDev這個串口對象讀取一個位元組,如果讀成功,就接著執行print "read Com ok!"而不執行except以下的語句,如果讀出現異常,就執行print "read Com error"而不執行
print "read Com ok!"
當然系統還會拋出異常信息,只是我這里沒有進行接收,個人覺得很多異常不必接收其信息。
2. python3.5 讀取串口中的數據怎麼解碼
1、安裝串口庫
2、採用默認通信參數
import serial
t = serial.Serial('com1',9600)
n = t.write('you are my world')print t.portstrprint n
str = t.read(n)print str
3、可以自己設置參數
import timeimport serialser = serial.Serial( #下面這些參數根據情況修改 port='COM1', baudrate=9600, parity=serial.PARITY_ODD, stopbits=serial.STOPBITS_TWO, bytesize=serial.SEVENBITS)data = ''while ser.inWaiting() > 0: data += ser.read(1)if data != '': print data
注意:Python與多個串口通信的時候,要確定埠號。如果有時間的話,可以自己寫一個查詢所有埠的信息,不想這么麻煩的話,下載個串口助手,自己看埠信息,在py文件中修改serial.Serial()裡面的埠號。
3. 調用python模塊串口的write寫的數據與串口實際寫的數
1、寫入的數據長度不一致。在使用Python的`serial`模散旅汪塊進行串口通信時,使用`write`函數寫入數據時需要指定寫入數據的長度,如果指定的長度與實際寫入的數據長度不一致,就會導致寫入的數據與實際寫入的數據不同。
2、串口參數設置不正確。在使用Python的`serial`模塊進行串口通信時,需要設置鎮緩串口的通信參數,如波特率、數據位、校驗位、停止位等。如果設置不正確,就會導致寫入的數據與實際寫入的數據不同。
3、串口通信過程中出現錯誤。在串口通信過程中,可能會出現一些錯誤,如數據丟失、數據重復等問題,這也會導致寫入的數據與實際寫入的數據不同。沖仔
4. Python如何進行多串口通信一個串口控制電機 一個串口採集數據
下載 pyserial包
def OpenCom(self,*args): #設置埠和波特率 selComPort =『com2』 #波特率 selBaudRate =9600 #奇偶校驗 selParity = 'N' try: if(not self.mySerial): self.mySerial = serial.Serial(port=selComPort, baudrate=selBaudRate,bytesize=8,parity=selParity,stopbits=1,timeout=5) else: if(self.mySerial.isOpen()): self.mySerial.close() self.mySerial = serial.Serial(port=selComPort, baudrate=selBaudRate, bytesize=8, parity=selParity, stopbits=1, timeout=5) self.lblInfo['text'] = '打開成功!' except Exception as ex: self.lblInfo['text'] = '打開失敗!'
#使用com口發送modbus協議給終端設備。
def btnEmId_Click(self):
barray = bytearray([0x05, 0x03, 0xA#, 0x54, 0x00, 0x08])
vOldEmId = self.txbOldEmId.get()
vNewEmId = self.txbNewEmId.get()
barray[0] = int(vOldEmId)
barray[5] = int(vNewEmId)
#crc校驗
strInput = utils.crc16_append(barray)
print(barray)
n = self.mySerial.write(barray)
if(n > 0):
str = self.mySerial.readall()
self.lblInfo['text'] = 'success!'
# for s in str:
# print (hex(s))
else:
self.lblInfo['text'] = 'error!'
5. python - serial communication(串口通信)
由於測試工作的需要,在C端產品上經常使用串口進行通信,而測試腳本大部分時候又採用python編寫,於是就不得不了解並熟悉python下的串口通信實現方法了,整理如下以備隨時使用:
一、說明
pyserial封裝了python環境下對串口的訪問,其兼容各種平台,並有統一的操作介面。通過python屬性訪問串口設置,並可對串口的各種配置參數(如串口名,波特率、停止校驗位、流控、超時等等)做修改,再進行串口通信的類與介面封裝後,非常方便地被調用和移植。
二、模塊安裝
pip insatll pyserial
三、初始化與參數說明
import serial
ser = serial.Serial('COM3', 115200, timeout=0.5, ....................)
下面看看 serial.Serial 原生類
四、不同平台下初始化
ser=serial.Serial("/dev/ttyUSB0",9600,timeout=0.5)#使用USB連接串列口ser=serial.Serial("/dev/ttyAMA0",9600,timeout=0.5)#使用樹莓派的GPIO口連接串列口ser=serial.Serial(1,9600,timeout=0.5)#winsows系統使用COM1口連接串列口ser=serial.Serial("COM1",9600,timeout=0.5)#winsows系統使用COM1口連接串列口ser=serial.Serial("/dev/ttyS1",9600,timeout=0.5)#Linux系統使用COM1口連接串列口
五、串口屬性
ser.name #串口名稱
ser.port #埠號
ser.baudrate #波特率
ser.bytesize #位元組大小
ser.parity #校驗位N-無校驗,E-偶校驗,O-奇校驗
ser.stopbits #停止位
ser.timeout #讀超時設置
ser.writeTimeout #寫超時
ser.xonxoff #軟體流控
ser.rtscts #硬體流控
ser.dsrdtr #硬體流控
ser.interCharTimeout #字元間隔超時
六、串口常用方法
isOpen():查看埠是否被打開。
open() :打開埠『。
close():關閉埠。
read(size=1):從埠讀位元組數據。默認1個位元組。
read_all():從埠接收全部數據。
write(data):向埠寫數據。
readline():讀一行數據。
readlines():讀多行數據。
in_waiting():返回輸入緩存中的位元組數。
out_waiting():返回輸出緩存中的位元組數。
flush():等待所有數據寫出。
flushInput():丟棄接收緩存中的所有數據。
flushOutput():終止當前寫操作,並丟棄發送緩存中的數據。
sendBreadk(ration=0.25):發送BREAK條件,並於ration時間之後返回IDLE
setBreak(level=True):根據level設置break條件。
setRTS(level=True):設置請求發送(RTS)的控制信號
setDTR(level=True):設置數據終端准備就緒的控制信號
七、類與介面封裝
import time
import serial
import serial.tools.list_ports
# 串口操作類
class serialCommunication(object):
def __init__(self, port, bps, timeout):# 可配置更多參數
port_list =self.show_usable_com()
if len(port_list) >0:
if portnot in port_list:
self.port = port_list[0]
else:
self.port = port
else:
print("no usable serial, please plugin your serial board")
return
self.bps = bps
self.timeout = timeout
try:
# 初始化串口,並得到串口對象,根據需要可拓展更多參數
self.ser = serial.Serial(self.port, self.bps, 8, 'N', 1, timeout=self.timeout, write_timeout=self.timeout)
except Exception as e:# 拋出異常
print("Exception={}".format(e))
# 顯示可用串口列表
@staticmethod
def show_usable_com():
serialport_list = []
portInfo_list =list(serial.tools.list_ports.comports())
if len(portInfo_list) <=0:
print("can not find any serial port!")
else:
print(portInfo_list)
for i in range(len(portInfo_list)):
plist =list(portInfo_list[i])
print(plist)
serialport_list.append(plist[0])
print(serialport_list)
return serialport_list
# 輸出串口基本信息
def serial_infor(self):
print(self.ser.name)# 設備名字
print(self.ser.port)# 讀或者寫埠
print(self.ser.baudrate)# 波特率
print(self.ser.bytesize)# 位元組大小
print(self.ser.parity)# 校驗位
print(self.ser.stopbits)# 停止位
print(self.ser.timeout)# 讀超時設置
print(self.ser.writeTimeout)# 寫超時
print(self.ser.xonxoff)# 軟體流控
print(self.ser.rtscts)# 軟體流控
print(self.ser.dsrdtr)# 硬體流控
print(self.ser.interCharTimeout)# 字元間隔超時
# 打開串口
def serial_open(self):
try:
if not self.ser.isOpen():
self.ser.open()
except Exception as e:# 拋出異常
print("serial_open Exception={}".format(e))
self.ser.close()
# 讀取指定大小的數據
# 從串口讀size個位元組。如果指定超時,則可能在超時後返回較少的位元組;如果沒有指定超時,則會一直等到收完指定的位元組數。
def serial_read_with_size(self, size):
try:
self.serial_open()
return self.ser.read(size).decode("utf-8")
except Exception as e:
print("serial_read_all Exception={}".format(e))
self.ser.close()
# 讀取當前串口緩存中的所有數據
def serial_read_data(self):
try:
self.serial_open()
datalen =self.ser.inWaiting()
if datalen ==0:
return None
return self.ser.read(datalen).decode("utf-8")
except Exception as e:
print("serial_read_data Exception={}".format(e))
self.ser.close()
# 讀串口全部數據,注意timeout的設置
# 在設定的timeout時間范圍內,如果讀取的位元組數據是有效的(就是非空)那就直接返回,
# 否則一直會等到設定的timeout時間並返回這段時間所讀的全部位元組數據。
def serial_read_all(self):
try:
self.serial_open()
return self.ser.read_all().decode("utf-8")
except Exception as e:
print("serial_read_all Exception={}".format(e))
self.ser.close()
# 讀一行數據
# 使用readline()時應該注意:打開串口時應該指定超時,否則如果串口沒有收到新行,則會一直等待。
# 如果沒有超時,readline會報異常。
def serial_read_line(self):
try:
self.serial_open()
return self.ser.readline().decode("utf-8")
except Exception as e:
print("serial_read_line Exception={}".format(e))
self.ser.close()
# 讀多行數據,返回行列表
def serial_read_lines(self):
try:
self.serial_open()
return self.ser.readlines().decode("utf-8")
except Exception as e:
print("serial_read_lines Exception={}".format(e))
self.ser.close()
# 寫數據
def serial_write_data(self, data):
try:
self.serial_open()
self.ser.flushOutput()
data_len =self.ser.write(data.encode('utf-8'))
return data_len
except Exception as e:
print("serial_write_data Exception={}".format(e))
return 0
# 寫行數據,注意參數差異
def serial_write_lines(self, lines):
self.ser.writelines(lines)
# 清除串口緩存
def serial_clean(self):
try:
if self.ser.isOpen():
self.ser.flush()
except Exception as e:
print("serial_clean Exception={}".format(e))
# 關閉串口
def serial_close(self):
try:
if self.ser.isOpen():
self.ser.close()
except Exception as e:
print("serial_clean Exception={}".format(e))
if __name__ =='__main__':
testSerial = serialCommunication("COM10", 1500000, 0.5)
testSerial.serial_open()
testSerial.serial_infor()
testSerial.serial_write_data("ifconfig eth0\n")
time.sleep(0.1)
data = testSerial.serial_read_all()
print(data)
testSerial.serial_close()
八、其他
1)ser.VERSION表示pyserial版本; 另外,ser.name表示設備名稱
2)埠設置可以被讀入字典,也可從字典載入設置:
getSettingDict():返回當前串口設置的字典
applySettingDict(d):應用字典到串口設置
3) Readline()是讀一行,以/n結束,要是沒有/n就一直讀,阻塞。注意:打開串口時應該指定超時,否則如果串口沒有收到新行,則會一直等待。
4)serial.read_all 與 serial.read_all()區別
serial.read_all:讀取串口所有的參數信息
serial.read_all():超時時間內從串口讀取的所有數據
5) 異常信息
exception serial.SerialException
exception serial.SerialTimeoutException
6. python中Pyserial如何實現RS485串口通訊
RS485 的數據線要交叉才能通訊,如果你是兩台電腦通訊測試,另外一台要有返回才行。就好像你給人家說話,人家聽到了重復一遍你才能聽到。不然就是單向傳輸,收不到任何回應。
7. plc與python之間實現通信
python語言與plc建立串口通信時,無法直接讀寫。
用到python的serial 模塊
在創建串口對象時需要定義其屬性與plc一致:
self.main_engine.baudrate = self.bps
self.main_engine.bytesize = 7
self.main_engine.parity = 'E'
self.main_engine.stopbits = 2
重點:在寫入數據後,需要寫16進制數"0D",結束標志位
self.main_engine.write("@00WD1010000152*".encode())
self.main_engine.write(binascii.a2b_hex("0D"))
8. 如何用python實現串口通信
Python非常適合寫一些測試的腳本,如快速的串口通信測試等。如果使用VC++ QT開發,可能用時較多,使用python,如果掌握使用方法,可以直接讀寫測試,配合設備或是串口助手,很快驗證與實現。
Python有沒有現成的串口API直接調用呢?經過實踐驗證,需要安裝一個叫 Pyserial的組件即可。這個可以在github上下載。
在windows 7 64bit 上可以使用嗎?當然可以使用,我安裝的python3.5為64位的。把下載後的文件,其中有一個serial的文件夾,拷貝到python35安裝路徑, C:\Python35\Lib\site-packages\serial
網上可以搜一下windows的安裝包,安裝完也是:C:\Python35\Lib\site-packages\serial ,可以用最新的版本,替換即可。
測試的方法:在python IDE里測試:
>>> import serial
這里如果報錯,是python版本與pyserial版本沒有配合好。如果正常,不返回,即可以導入serial模塊。
>>> ser=serial.Serial("COM5",115200)
這里為COM5,115200的波特率。如果打不開,請檢查安裝環境。
>>> ser.write('hello,serial test'.encode())
17
發送測試(如果返回位元組數,說明返回成功),這里需要轉換一個編碼為位元組。
以上測試,可以使用現在的設備或是串口助手,如安裝Virtual Serial Port Driver 7.2 虛擬串口軟體,設置一對串口,進行自發自收的測試。
>>> print(ser.readline())
b'abcdefg\r\n'
這里是串口接收,有接收的超時。設備或是串口助手發送一個字元串,以回車換行結束,這里就可以收到列印出來。
也可以用ser.read(),這里只接收一個字元來實現。
上面已經實現了基本的串口操作。
關閉串口為:
>>> ser.close()
如果使用python,一般寫個py文件,就像windows bat 批處理一樣,這是python強大的地方。如果寫一個py腳本呢?其實只要把上面的命令,一條條寫下來,就是一個腳本,測試如下:
import serialser=serial.Serial("COM5",115200,timeout=0.5)for i in range(0,100-1):ser.write('hello\r\n'.encode())print(ser.readline());ser.close()
9. 如何用python寫個串口通信的程序
使用 pyserial 就可以處理串口通信,這個包是跨平台的。
http://pyserial.sourceforge.net/
示常式序在這里:
https://pyserial.readthedocs.io/en/latest/examples.html#wxpython-examples
importserial
#創建serial實例
serialport=serial.Serial()
serialport.port='COM1'
serialport.baudrate=9600
serialport.parity='N'
serialport.bytesize=8
serialport.stopbits=1
serialport.timeout=0.6
try:
serialport.open()
serialport.setDTR(True)
serialport.setRTS(True)
exceptException,ex:
printex
#發送數據
serialport.write(raw_data)
#根據項目要求,可以開一個線程掃描接收數據
10. 如何用python寫個串口通信的程序
就是打開串口後,啟動一個線程來監聽串口數據的進入,有數據時,就做數據的處理(也可以發送一個事件,並攜帶接收到的數據)。
我沒有用到串口處理太深的東西。
客戶的原程序不能給你,不過我給你改一下吧。
裡面的一些東西,已經經過了處理,要運行,可能你要自己改一下,把沒有用的東西去掉。
我這里已經沒有串口設備了,不能調了,你自己處理一下吧,不過基本的東西已經有了。
=================================================================
#coding=gb18030
import sys,threading,time;
import serial;
import binascii,encodings;
import re;
import socket;
class ReadThread:
def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):
self.l_serial = None;
self.alive = False;
self.waitEnd = None;
self.bFirstMethod = i_FirstMethod;
self.sendport = '';
self.log = Log;
self.output = Output;
self.port = Port;
self.re_num = None;
def waiting(self):
if not self.waitEnd is None:
self.waitEnd.wait();
def SetStopEvent(self):
if not self.waitEnd is None:
self.waitEnd.set();
self.alive = False;
self.stop();
def start(self):
self.l_serial = serial.Serial();
self.l_serial.port = self.port;
self.l_serial.baudrate = 9600;
self.l_serial.timeout = 2;
self.re_num = re.compile('\d');
try:
if not self.output is None:
self.output.WriteText(u'打開通訊埠\r\n');
if not self.log is None:
self.log.info(u'打開通訊埠');
self.l_serial.open();
except Exception, ex:
if self.l_serial.isOpen():
self.l_serial.close();
self.l_serial = None;
if not self.output is None:
self.output.WriteText(u'出錯:\r\n %s\r\n' % ex);
if not self.log is None:
self.log.error(u'%s' % ex);
return False;
if self.l_serial.isOpen():
if not self.output is None:
self.output.WriteText(u'創建接收任務\r\n');
if not self.log is None:
self.log.info(u'創建接收任務');
self.waitEnd = threading.Event();
self.alive = True;
self.thread_read = None;
self.thread_read = threading.Thread(target=self.FirstReader);
self.thread_read.setDaemon(1);
self.thread_read.start();
return True;
else:
if not self.output is None:
self.output.WriteText(u'通訊埠未打開\r\n');
if not self.log is None:
self.log.info(u'通訊埠未打開');
return False;
def InitHead(self):
#串口的其它的一些處理
try:
time.sleep(3);
if not self.output is None:
self.output.WriteText(u'數據接收任務開始連接網路\r\n');
if not self.log is None:
self.log.info(u'數據接收任務開始連接網路');
self.l_serial.flushInput();
self.l_serial.write('\x11');
data1 = self.l_serial.read(1024);
except ValueError,ex:
if not self.output is None:
self.output.WriteText(u'出錯:\r\n %s\r\n' % ex);
if not self.log is None:
self.log.error(u'%s' % ex);
self.SetStopEvent();
return;
if not self.output is None:
self.output.WriteText(u'開始接收數據\r\n');
if not self.log is None:
self.log.info(u'開始接收數據');
self.output.WriteText(u'===================================\r\n');
def SendData(self, i_msg):
lmsg = '';
isOK = False;
if isinstance(i_msg, unicode):
lmsg = i_msg.encode('gb18030');
else:
lmsg = i_msg;
try:
#發送數據到相應的處理組件
pass
except Exception, ex:
pass;
return isOK;
def FirstReader(self):
data1 = '';
isQuanJiao = True;
isFirstMethod = True;
isEnd = True;
readCount = 0;
saveCount = 0;
RepPos = 0;
#read Head Infor content
self.InitHead();
while self.alive:
try:
data = '';
n = self.l_serial.inWaiting();
if n:
data = data + self.l_serial.read(n);
#print binascii.b2a_hex(data),
for l in xrange(len(data)):
if ord(data[l])==0x8E:
isQuanJiao = True;
continue;
if ord(data[l])==0x8F:
isQuanJiao = False;
continue;
if ord(data[l]) == 0x80 or ord(data[l]) == 0x00:
if len(data1)>10:
if not self.re_num.search(data1,1) is None:
saveCount = saveCount + 1;
if RepPos==0:
RepPos = self.output.GetInsertionPoint();
self.output.Remove(RepPos,self.output.GetLastPosition());
self.SendData(data1);
data1 = '';
continue;
except Exception, ex:
if not self.log is None:
self.log.error(u'%s' % ex);
self.waitEnd.set();
self.alive = False;
def stop(self):
self.alive = False;
self.thread_read.join();
if self.l_serial.isOpen():
self.l_serial.close();
if not self.output is None:
self.output.WriteText(u'關閉通迅埠:[%d] \r\n' % self.port);
if not self.log is None:
self.log.info(u'關閉通迅埠:[%d]' % self.port);
def printHex(self, s):
s1 = binascii.b2a_hex(s);
print s1;
#測試用部分
if __name__ == '__main__':
rt = ReadThread();
f = open("sendport.cfg", "r")
rt.sendport = f.read()
f.close()
try:
if rt.start():
rt.waiting();
rt.stop();
else:
pass;
except Exception,se:
print str(se);
if rt.alive:
rt.stop();
print 'End OK .';
del rt;