| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- import serial
- import datetime
- import time
- class SerialPort():
- def __init__(self,PortName,BaudRate=115200,TimeOut=6):
- try:
- self.port = serial.Serial(PortName, baudrate=BaudRate,timeout=TimeOut)
- except:
- raise RuntimeError("Open serial port error: %s", PortName)
- def close(self):
- self.port.flush()
- self.port.close()
- def sendcmd(self, cmd):
- if self.port.is_open:
- self.port.flushInput()
- self.port.flushOutput()
- print datetime.datetime.now().strftime("\n%Y-%m-%d %H:%M:%S.%f"), ' ', cmd,
- self.port.write(cmd + '\n')
- else:
- raise RuntimeError("Cmd send error, port not open: %s", self.port.name)
- def sendcmd_bybyte(self, cmd,interval=0.020):
- if self.port.is_open:
- self.port.flushInput()
- self.port.flushOutput()
- print datetime.datetime.now().strftime("\n%Y-%m-%d %H:%M:%S.%f"), ' ', cmd,
- for sc in cmd:
- self.port.write(sc)
- time.sleep(interval)
- self.port.write("\n")
- else:
- raise RuntimeError("Cmd send error, port not open: %s", self.port.name)
- def readResponse(self,terminator=']'):
- res = self.port.read_until(terminator)
- print datetime.datetime.now().strftime("\n%Y-%m-%d %H:%M:%S.%f"), ' ', res,
- if terminator in res:
- return res
- else:
- raise RuntimeError("Error: timeout %s",res)
- def send_read(self,cmd,terminator=']'):
- self.read_existing()
- self.sendcmd(cmd)
- return self.readResponse(terminator)
- def read_existing(self):
- # res = self.port.read(self.port.in_waiting)
- res = self.port.read_all()
- if res != "":
- print datetime.datetime.now().strftime("\n%Y-%m-%d %H:%M:%S.%f"), ' ', res,
- return res
- def setTimeout(self,timeOut=6):
- self.port.timeout = timeOut
|