import datetime
import sys
import time

import serial


class SerialPort(object):
    """Thin convenience wrapper around a pyserial ``serial.Serial`` port.

    Every command sent and every response read is echoed to standard output
    with a timestamp, so a session can be followed (or captured) as a trace.

    Text passed in (commands, terminators) is converted to bytes with
    ``encoding`` before it reaches the port; values that are already
    ``bytes``/``bytearray`` are sent unchanged. Data read from the port is
    returned exactly as pyserial delivers it.
    """

    # Encoding used to turn text commands/terminators into bytes and to render
    # received bytes in the trace. Override on the class or on an instance.
    encoding = "utf-8"

    # Timestamp layout used for every trace line.
    _STAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

    def __init__(self, PortName, BaudRate=115200, TimeOut=6):
        """Open the serial port.

        Args:
            PortName: Port identifier handed to ``serial.Serial``.
            BaudRate: Passed to ``serial.Serial`` as ``baudrate``.
            TimeOut: Passed to ``serial.Serial`` as ``timeout``.

        Raises:
            RuntimeError: If the port cannot be opened. The message contains
                the port name and the text of the underlying error.
        """
        try:
            self.port = serial.Serial(PortName, baudrate=BaudRate, timeout=TimeOut)
        except Exception as err:
            # Callers rely on RuntimeError for any open failure, so translate
            # every ordinary exception; unlike a bare ``except`` this lets
            # KeyboardInterrupt / SystemExit propagate.
            raise RuntimeError(
                "Open serial port error: %s (%s)" % (PortName, err)
            )

    def close(self):
        """Flush the port and then close it."""
        self.port.flush()
        self.port.close()

    def sendcmd(self, cmd):
        """Send ``cmd`` followed by a newline in a single write.

        Both port buffers are reset before sending, so any unread input is
        discarded.

        Raises:
            RuntimeError: If the port is not open.
        """
        payload = self._begin_send(cmd)
        self.port.write(payload + b"\n")

    def sendcmd_bybyte(self, cmd, interval=0.020):
        """Send ``cmd`` one byte at a time, then a newline.

        Args:
            cmd: Command to send (text or bytes).
            interval: Seconds to sleep after each byte of ``cmd``.

        Raises:
            RuntimeError: If the port is not open.
        """
        payload = self._begin_send(cmd)
        # Slice instead of iterating: iterating bytes yields ints on Python 3,
        # which ``write`` does not accept.
        for index in range(len(payload)):
            self.port.write(payload[index:index + 1])
            time.sleep(interval)
        self.port.write(b"\n")

    def readResponse(self, terminator=']'):
        """Read until ``terminator`` is seen and return everything read.

        Args:
            terminator: Marker (text or bytes) that ends a response.

        Returns:
            The data read from the port, including the terminator.

        Raises:
            RuntimeError: If the data returned by the port does not contain
                the terminator (the read gave up before the marker arrived).
                The message includes whatever was received.
        """
        marker = self._to_bytes(terminator)
        # Positional argument on purpose: the keyword name of this parameter
        # differs between pyserial releases.
        res = self.port.read_until(marker)
        self._trace(res)
        if marker in res:
            return res
        raise RuntimeError("Error: timeout %s" % (self._to_text(res),))

    def send_read(self, cmd, terminator=']'):
        """Drain pending input, send ``cmd`` and return the response.

        Raises:
            RuntimeError: If the port is not open or the response does not
                contain ``terminator``.
        """
        self.read_existing()
        self.sendcmd(cmd)
        return self.readResponse(terminator)

    def read_existing(self):
        """Read and return whatever is currently available on the port.

        Non-empty data is echoed to the trace; empty reads are silent.
        """
        res = self.port.read_all()
        # Truthiness rather than ``!= ""``: on Python 3 an empty read is
        # ``b""``, which never compares equal to ``""``.
        if res:
            self._trace(res)
        return res

    def setTimeout(self, timeOut=6):
        """Set the timeout attribute of the underlying port to ``timeOut``."""
        self.port.timeout = timeOut

    def _begin_send(self, cmd):
        """Check the port, reset its buffers, trace ``cmd``; return it as bytes."""
        if not self.port.is_open:
            raise RuntimeError(
                "Cmd send error, port not open: %s" % (self.port.name,)
            )
        # reset_*_buffer are the pyserial 3.x names for the deprecated
        # flushInput / flushOutput.
        self.port.reset_input_buffer()
        self.port.reset_output_buffer()
        self._trace(cmd)
        return self._to_bytes(cmd)

    def _to_bytes(self, data):
        """Return ``data`` as bytes, encoding text with ``self.encoding``."""
        if isinstance(data, (bytes, bytearray)):
            return bytes(data)
        return data.encode(self.encoding)

    def _to_text(self, data):
        """Return ``data`` in a printable form, decoding raw bytes if needed."""
        # On Python 2 ``str`` is ``bytes`` and is already printable, so only
        # decode byte types that are not ``str``.
        if isinstance(data, (bytes, bytearray)) and not isinstance(data, str):
            return bytes(data).decode(self.encoding, "replace")
        return data

    def _trace(self, data):
        """Write a timestamped trace entry for ``data`` to standard output."""
        stamp = datetime.datetime.now().strftime(self._STAMP_FORMAT)
        # Each entry starts on a new line and is not newline-terminated.
        sys.stdout.write("\n%s   %s" % (stamp, self._to_text(data)))