python串口通信
1. python 如何防止串口通信失败
Python中串口出现异常通常有:1.打开串口时,串口不存在,2.写串帆答口时,3.读串口时。这几个异常是经常会碰到培宏的(有经验的人就深有体会),一旦异常出现了,整个程序很可能会因此就运行不下去了。避免因为这些异常的出现而导致程序死机的方法是对这些可能存在的异常进行态中慧捕捉。举一个例子:
try:
ComDev.read(1)
print "read Com ok!"
except:
print "read Com error!"
上面的代码意思是:对ComDev这个串口对象读取一个字节,如果读成功,就接着执行print "read Com ok!"而不执行except以下的语句,如果读出现异常,就执行print "read Com error"而不执行
print "read Com ok!"
当然系统还会抛出异常信息,只是我这里没有进行接收,个人觉得很多异常不必接收其信息。
2. python3.5 读取串口中的数据怎么解码
1、安装串口库
2、采用默认通信参数
import serial
t = serial.Serial('com1',9600)
n = t.write('you are my world')print t.portstrprint n
str = t.read(n)print str
3、可以自己设置参数
import timeimport serialser = serial.Serial( #下面这些参数根据情况修改 port='COM1', baudrate=9600, parity=serial.PARITY_ODD, stopbits=serial.STOPBITS_TWO, bytesize=serial.SEVENBITS)data = ''while ser.inWaiting() > 0: data += ser.read(1)if data != '': print data
注意:Python与多个串口通信的时候,要确定端口号。如果有时间的话,可以自己写一个查询所有端口的信息,不想这么麻烦的话,下载个串口助手,自己看端口信息,在py文件中修改serial.Serial()里面的端口号。
3. 调用python模块串口的write写的数据与串口实际写的数
1、写入的数据长度不一致。在使用Python的`serial`模散旅汪块进行串口通信时,使用`write`函数写入数据时需要指定写入数据的长度,如果指定的长度与实际写入的数据长度不一致,就会导致写入的数据与实际写入的数据不同。
2、串口参数设置不正确。在使用Python的`serial`模块进行串口通信时,需要设置镇缓串口的通信参数,如波特率、数据位、校验位、停止位等。如果设置不正确,就会导致写入的数据与实际写入的数据不同。
3、串口通信过程中出现错误。在串口通信过程中,可能会出现一些错误,如数据丢失、数据重复等问题,这也会导致写入的数据与实际写入的数据不同。冲仔
4. Python如何进行多串口通信一个串口控制电机 一个串口采集数据
下载 pyserial包
def OpenCom(self,*args): #设置端口和波特率 selComPort =‘com2’ #波特率 selBaudRate =9600 #奇偶校验 selParity = 'N' try: if(not self.mySerial): self.mySerial = serial.Serial(port=selComPort, baudrate=selBaudRate,bytesize=8,parity=selParity,stopbits=1,timeout=5) else: if(self.mySerial.isOpen()): self.mySerial.close() self.mySerial = serial.Serial(port=selComPort, baudrate=selBaudRate, bytesize=8, parity=selParity, stopbits=1, timeout=5) self.lblInfo['text'] = '打开成功!' except Exception as ex: self.lblInfo['text'] = '打开失败!'
#使用com口发送modbus协议给终端设备。
def btnEmId_Click(self):
barray = bytearray([0x05, 0x03, 0xA#, 0x54, 0x00, 0x08])
vOldEmId = self.txbOldEmId.get()
vNewEmId = self.txbNewEmId.get()
barray[0] = int(vOldEmId)
barray[5] = int(vNewEmId)
#crc校验
strInput = utils.crc16_append(barray)
print(barray)
n = self.mySerial.write(barray)
if(n > 0):
str = self.mySerial.readall()
self.lblInfo['text'] = 'success!'
# for s in str:
# print (hex(s))
else:
self.lblInfo['text'] = 'error!'
5. python - serial communication(串口通信)
由于测试工作的需要,在C端产品上经常使用串口进行通信,而测试脚本大部分时候又采用python编写,于是就不得不了解并熟悉python下的串口通信实现方法了,整理如下以备随时使用:
一、说明
pyserial封装了python环境下对串口的访问,其兼容各种平台,并有统一的操作接口。通过python属性访问串口设置,并可对串口的各种配置参数(如串口名,波特率、停止校验位、流控、超时等等)做修改,再进行串口通信的类与接口封装后,非常方便地被调用和移植。
二、模块安装
pip insatll pyserial
三、初始化与参数说明
import serial
ser = serial.Serial('COM3', 115200, timeout=0.5, ....................)
下面看看 serial.Serial 原生类
四、不同平台下初始化
ser=serial.Serial("/dev/ttyUSB0",9600,timeout=0.5)#使用USB连接串行口ser=serial.Serial("/dev/ttyAMA0",9600,timeout=0.5)#使用树莓派的GPIO口连接串行口ser=serial.Serial(1,9600,timeout=0.5)#winsows系统使用COM1口连接串行口ser=serial.Serial("COM1",9600,timeout=0.5)#winsows系统使用COM1口连接串行口ser=serial.Serial("/dev/ttyS1",9600,timeout=0.5)#Linux系统使用COM1口连接串行口
五、串口属性
ser.name #串口名称
ser.port #端口号
ser.baudrate #波特率
ser.bytesize #字节大小
ser.parity #校验位N-无校验,E-偶校验,O-奇校验
ser.stopbits #停止位
ser.timeout #读超时设置
ser.writeTimeout #写超时
ser.xonxoff #软件流控
ser.rtscts #硬件流控
ser.dsrdtr #硬件流控
ser.interCharTimeout #字符间隔超时
六、串口常用方法
isOpen():查看端口是否被打开。
open() :打开端口‘。
close():关闭端口。
read(size=1):从端口读字节数据。默认1个字节。
read_all():从端口接收全部数据。
write(data):向端口写数据。
readline():读一行数据。
readlines():读多行数据。
in_waiting():返回输入缓存中的字节数。
out_waiting():返回输出缓存中的字节数。
flush():等待所有数据写出。
flushInput():丢弃接收缓存中的所有数据。
flushOutput():终止当前写操作,并丢弃发送缓存中的数据。
sendBreadk(ration=0.25):发送BREAK条件,并于ration时间之后返回IDLE
setBreak(level=True):根据level设置break条件。
setRTS(level=True):设置请求发送(RTS)的控制信号
setDTR(level=True):设置数据终端准备就绪的控制信号
七、类与接口封装
import time
import serial
import serial.tools.list_ports
# 串口操作类
class serialCommunication(object):
def __init__(self, port, bps, timeout):# 可配置更多参数
port_list =self.show_usable_com()
if len(port_list) >0:
if portnot in port_list:
self.port = port_list[0]
else:
self.port = port
else:
print("no usable serial, please plugin your serial board")
return
self.bps = bps
self.timeout = timeout
try:
# 初始化串口,并得到串口对象,根据需要可拓展更多参数
self.ser = serial.Serial(self.port, self.bps, 8, 'N', 1, timeout=self.timeout, write_timeout=self.timeout)
except Exception as e:# 抛出异常
print("Exception={}".format(e))
# 显示可用串口列表
@staticmethod
def show_usable_com():
serialport_list = []
portInfo_list =list(serial.tools.list_ports.comports())
if len(portInfo_list) <=0:
print("can not find any serial port!")
else:
print(portInfo_list)
for i in range(len(portInfo_list)):
plist =list(portInfo_list[i])
print(plist)
serialport_list.append(plist[0])
print(serialport_list)
return serialport_list
# 输出串口基本信息
def serial_infor(self):
print(self.ser.name)# 设备名字
print(self.ser.port)# 读或者写端口
print(self.ser.baudrate)# 波特率
print(self.ser.bytesize)# 字节大小
print(self.ser.parity)# 校验位
print(self.ser.stopbits)# 停止位
print(self.ser.timeout)# 读超时设置
print(self.ser.writeTimeout)# 写超时
print(self.ser.xonxoff)# 软件流控
print(self.ser.rtscts)# 软件流控
print(self.ser.dsrdtr)# 硬件流控
print(self.ser.interCharTimeout)# 字符间隔超时
# 打开串口
def serial_open(self):
try:
if not self.ser.isOpen():
self.ser.open()
except Exception as e:# 抛出异常
print("serial_open Exception={}".format(e))
self.ser.close()
# 读取指定大小的数据
# 从串口读size个字节。如果指定超时,则可能在超时后返回较少的字节;如果没有指定超时,则会一直等到收完指定的字节数。
def serial_read_with_size(self, size):
try:
self.serial_open()
return self.ser.read(size).decode("utf-8")
except Exception as e:
print("serial_read_all Exception={}".format(e))
self.ser.close()
# 读取当前串口缓存中的所有数据
def serial_read_data(self):
try:
self.serial_open()
datalen =self.ser.inWaiting()
if datalen ==0:
return None
return self.ser.read(datalen).decode("utf-8")
except Exception as e:
print("serial_read_data Exception={}".format(e))
self.ser.close()
# 读串口全部数据,注意timeout的设置
# 在设定的timeout时间范围内,如果读取的字节数据是有效的(就是非空)那就直接返回,
# 否则一直会等到设定的timeout时间并返回这段时间所读的全部字节数据。
def serial_read_all(self):
try:
self.serial_open()
return self.ser.read_all().decode("utf-8")
except Exception as e:
print("serial_read_all Exception={}".format(e))
self.ser.close()
# 读一行数据
# 使用readline()时应该注意:打开串口时应该指定超时,否则如果串口没有收到新行,则会一直等待。
# 如果没有超时,readline会报异常。
def serial_read_line(self):
try:
self.serial_open()
return self.ser.readline().decode("utf-8")
except Exception as e:
print("serial_read_line Exception={}".format(e))
self.ser.close()
# 读多行数据,返回行列表
def serial_read_lines(self):
try:
self.serial_open()
return self.ser.readlines().decode("utf-8")
except Exception as e:
print("serial_read_lines Exception={}".format(e))
self.ser.close()
# 写数据
def serial_write_data(self, data):
try:
self.serial_open()
self.ser.flushOutput()
data_len =self.ser.write(data.encode('utf-8'))
return data_len
except Exception as e:
print("serial_write_data Exception={}".format(e))
return 0
# 写行数据,注意参数差异
def serial_write_lines(self, lines):
self.ser.writelines(lines)
# 清除串口缓存
def serial_clean(self):
try:
if self.ser.isOpen():
self.ser.flush()
except Exception as e:
print("serial_clean Exception={}".format(e))
# 关闭串口
def serial_close(self):
try:
if self.ser.isOpen():
self.ser.close()
except Exception as e:
print("serial_clean Exception={}".format(e))
if __name__ =='__main__':
testSerial = serialCommunication("COM10", 1500000, 0.5)
testSerial.serial_open()
testSerial.serial_infor()
testSerial.serial_write_data("ifconfig eth0\n")
time.sleep(0.1)
data = testSerial.serial_read_all()
print(data)
testSerial.serial_close()
八、其他
1)ser.VERSION表示pyserial版本; 另外,ser.name表示设备名称
2)端口设置可以被读入字典,也可从字典加载设置:
getSettingDict():返回当前串口设置的字典
applySettingDict(d):应用字典到串口设置
3) Readline()是读一行,以/n结束,要是没有/n就一直读,阻塞。注意:打开串口时应该指定超时,否则如果串口没有收到新行,则会一直等待。
4)serial.read_all 与 serial.read_all()区别
serial.read_all:读取串口所有的参数信息
serial.read_all():超时时间内从串口读取的所有数据
5) 异常信息
exception serial.SerialException
exception serial.SerialTimeoutException
6. python中Pyserial如何实现RS485串口通讯
RS485 的数据线要交叉才能通讯,如果你是两台电脑通讯测试,另外一台要有返回才行。就好像你给人家说话,人家听到了重复一遍你才能听到。不然就是单向传输,收不到任何回应。
7. plc与python之间实现通信
python语言与plc建立串口通信时,无法直接读写。
用到python的serial 模块
在创建串口对象时需要定义其属性与plc一致:
self.main_engine.baudrate = self.bps
self.main_engine.bytesize = 7
self.main_engine.parity = 'E'
self.main_engine.stopbits = 2
重点:在写入数据后,需要写16进制数"0D",结束标志位
self.main_engine.write("@00WD1010000152*".encode())
self.main_engine.write(binascii.a2b_hex("0D"))
8. 如何用python实现串口通信
Python非常适合写一些测试的脚本,如快速的串口通信测试等。如果使用VC++ QT开发,可能用时较多,使用python,如果掌握使用方法,可以直接读写测试,配合设备或是串口助手,很快验证与实现。
Python有没有现成的串口API直接调用呢?经过实践验证,需要安装一个叫 Pyserial的组件即可。这个可以在github上下载。
在windows 7 64bit 上可以使用吗?当然可以使用,我安装的python3.5为64位的。把下载后的文件,其中有一个serial的文件夹,拷贝到python35安装路径, C:\Python35\Lib\site-packages\serial
网上可以搜一下windows的安装包,安装完也是:C:\Python35\Lib\site-packages\serial ,可以用最新的版本,替换即可。
测试的方法:在python IDE里测试:
>>> import serial
这里如果报错,是python版本与pyserial版本没有配合好。如果正常,不返回,即可以导入serial模块。
>>> ser=serial.Serial("COM5",115200)
这里为COM5,115200的波特率。如果打不开,请检查安装环境。
>>> ser.write('hello,serial test'.encode())
17
发送测试(如果返回字节数,说明返回成功),这里需要转换一个编码为字节。
以上测试,可以使用现在的设备或是串口助手,如安装Virtual Serial Port Driver 7.2 虚拟串口软件,设置一对串口,进行自发自收的测试。
>>> print(ser.readline())
b'abcdefg\r\n'
这里是串口接收,有接收的超时。设备或是串口助手发送一个字符串,以回车换行结束,这里就可以收到打印出来。
也可以用ser.read(),这里只接收一个字符来实现。
上面已经实现了基本的串口操作。
关闭串口为:
>>> ser.close()
如果使用python,一般写个py文件,就像windows bat 批处理一样,这是python强大的地方。如果写一个py脚本呢?其实只要把上面的命令,一条条写下来,就是一个脚本,测试如下:
import serialser=serial.Serial("COM5",115200,timeout=0.5)for i in range(0,100-1):ser.write('hello\r\n'.encode())print(ser.readline());ser.close()
9. 如何用python写个串口通信的程序
使用 pyserial 就可以处理串口通信,这个包是跨平台的。
http://pyserial.sourceforge.net/
示例程序在这里:
https://pyserial.readthedocs.io/en/latest/examples.html#wxpython-examples
importserial
#创建serial实例
serialport=serial.Serial()
serialport.port='COM1'
serialport.baudrate=9600
serialport.parity='N'
serialport.bytesize=8
serialport.stopbits=1
serialport.timeout=0.6
try:
serialport.open()
serialport.setDTR(True)
serialport.setRTS(True)
exceptException,ex:
printex
#发送数据
serialport.write(raw_data)
#根据项目要求,可以开一个线程扫描接收数据
10. 如何用python写个串口通信的程序
就是打开串口后,启动一个线程来监听串口数据的进入,有数据时,就做数据的处理(也可以发送一个事件,并携带接收到的数据)。
我没有用到串口处理太深的东西。
客户的原程序不能给你,不过我给你改一下吧。
里面的一些东西,已经经过了处理,要运行,可能你要自己改一下,把没有用的东西去掉。
我这里已经没有串口设备了,不能调了,你自己处理一下吧,不过基本的东西已经有了。
=================================================================
#coding=gb18030
import sys,threading,time;
import serial;
import binascii,encodings;
import re;
import socket;
class ReadThread:
def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):
self.l_serial = None;
self.alive = False;
self.waitEnd = None;
self.bFirstMethod = i_FirstMethod;
self.sendport = '';
self.log = Log;
self.output = Output;
self.port = Port;
self.re_num = None;
def waiting(self):
if not self.waitEnd is None:
self.waitEnd.wait();
def SetStopEvent(self):
if not self.waitEnd is None:
self.waitEnd.set();
self.alive = False;
self.stop();
def start(self):
self.l_serial = serial.Serial();
self.l_serial.port = self.port;
self.l_serial.baudrate = 9600;
self.l_serial.timeout = 2;
self.re_num = re.compile('\d');
try:
if not self.output is None:
self.output.WriteText(u'打开通讯端口\r\n');
if not self.log is None:
self.log.info(u'打开通讯端口');
self.l_serial.open();
except Exception, ex:
if self.l_serial.isOpen():
self.l_serial.close();
self.l_serial = None;
if not self.output is None:
self.output.WriteText(u'出错:\r\n %s\r\n' % ex);
if not self.log is None:
self.log.error(u'%s' % ex);
return False;
if self.l_serial.isOpen():
if not self.output is None:
self.output.WriteText(u'创建接收任务\r\n');
if not self.log is None:
self.log.info(u'创建接收任务');
self.waitEnd = threading.Event();
self.alive = True;
self.thread_read = None;
self.thread_read = threading.Thread(target=self.FirstReader);
self.thread_read.setDaemon(1);
self.thread_read.start();
return True;
else:
if not self.output is None:
self.output.WriteText(u'通讯端口未打开\r\n');
if not self.log is None:
self.log.info(u'通讯端口未打开');
return False;
def InitHead(self):
#串口的其它的一些处理
try:
time.sleep(3);
if not self.output is None:
self.output.WriteText(u'数据接收任务开始连接网络\r\n');
if not self.log is None:
self.log.info(u'数据接收任务开始连接网络');
self.l_serial.flushInput();
self.l_serial.write('\x11');
data1 = self.l_serial.read(1024);
except ValueError,ex:
if not self.output is None:
self.output.WriteText(u'出错:\r\n %s\r\n' % ex);
if not self.log is None:
self.log.error(u'%s' % ex);
self.SetStopEvent();
return;
if not self.output is None:
self.output.WriteText(u'开始接收数据\r\n');
if not self.log is None:
self.log.info(u'开始接收数据');
self.output.WriteText(u'===================================\r\n');
def SendData(self, i_msg):
lmsg = '';
isOK = False;
if isinstance(i_msg, unicode):
lmsg = i_msg.encode('gb18030');
else:
lmsg = i_msg;
try:
#发送数据到相应的处理组件
pass
except Exception, ex:
pass;
return isOK;
def FirstReader(self):
data1 = '';
isQuanJiao = True;
isFirstMethod = True;
isEnd = True;
readCount = 0;
saveCount = 0;
RepPos = 0;
#read Head Infor content
self.InitHead();
while self.alive:
try:
data = '';
n = self.l_serial.inWaiting();
if n:
data = data + self.l_serial.read(n);
#print binascii.b2a_hex(data),
for l in xrange(len(data)):
if ord(data[l])==0x8E:
isQuanJiao = True;
continue;
if ord(data[l])==0x8F:
isQuanJiao = False;
continue;
if ord(data[l]) == 0x80 or ord(data[l]) == 0x00:
if len(data1)>10:
if not self.re_num.search(data1,1) is None:
saveCount = saveCount + 1;
if RepPos==0:
RepPos = self.output.GetInsertionPoint();
self.output.Remove(RepPos,self.output.GetLastPosition());
self.SendData(data1);
data1 = '';
continue;
except Exception, ex:
if not self.log is None:
self.log.error(u'%s' % ex);
self.waitEnd.set();
self.alive = False;
def stop(self):
self.alive = False;
self.thread_read.join();
if self.l_serial.isOpen():
self.l_serial.close();
if not self.output is None:
self.output.WriteText(u'关闭通迅端口:[%d] \r\n' % self.port);
if not self.log is None:
self.log.info(u'关闭通迅端口:[%d]' % self.port);
def printHex(self, s):
s1 = binascii.b2a_hex(s);
print s1;
#测试用部分
if __name__ == '__main__':
rt = ReadThread();
f = open("sendport.cfg", "r")
rt.sendport = f.read()
f.close()
try:
if rt.start():
rt.waiting();
rt.stop();
else:
pass;
except Exception,se:
print str(se);
if rt.alive:
rt.stop();
print 'End OK .';
del rt;