#!/usr/bin/env python
#
# This code runs on the host and implements the serial protocol
# used to control the display connected to the MCU.
#
# On the MCU side, the serial protocol is either implemented under Arduino
# or under MicroPython.
#
import serial


class ArduinoSerialHAL:
    """
    ArduinoSerialHAL handles the serial protocol (API) used to control
    the display connected to the MCU.

    Every command is a short binary frame whose first byte is an ASCII
    command character; multi-byte integers are sent least-significant
    byte first.
    """

    def __init__(self, config):
        """
        Store the connection settings and open the serial link.

        config is a mapping that must provide the keys 'port' and
        'baudrate'.
        """
        self.port = config['port']
        self.baudrate = config['baudrate']
        self.ser = None  # initialized in reset()
        self.reset()

    def readline(self):
        """
        Read output from MCU over the serial link.

        Returns None when no input is waiting, otherwise whatever
        self.ser.readline() returns.
        """
        if not self.ser.in_waiting:
            return None
        line = self.ser.readline()
        return line

    def reset(self):
        """
        (Re-)open serial ports and resynchronize the protocol
        """
        if self.ser is not None:
            print('SerialProtocol: closing serial link')
            self.ser.close()
        print('SerialProtocol: opening port {} @ {} baud'.format(self.port, self.baudrate))
        self.ser = serial.Serial(self.port,
                                 baudrate=self.baudrate,
                                 rtscts=True,
                                 timeout=0.1,
                                 write_timeout=0.5)
        self.resynchronize_protocol()

    def resynchronize_protocol(self):
        """
        Resynchronize the protocol by writing a string of zeroes.
        """
        data = bytearray(10)
        self.ser.write(data)

    def safe_write(self, data):
        """
        Write data to the serial link and attempt to handle write timeouts.

        On a write timeout the link is reset and the write is retried
        exactly once; an exception raised by the retry is not caught here
        and propagates to the caller.
        """
        try:
            self.ser.write(data)
            return
        except serial.SerialTimeoutException:
            print('SerialProtocol: write timeout, attempting reset..')
            print('WARN: Serial write timed out, attempting reset')
        self.reset()
        print('SerialProtocol: retrying send of {} bytes'.format(len(data)))
        self.ser.write(data)

    def init_display(self, num_pixels=256):
        """
        Send the 'i' command: set up the display for num_pixels pixels.

        Frame: 'i', num_pixels low byte, num_pixels high byte.
        """
        # Setup FastLED library
        data = bytearray(3)
        data[0] = ord('i')
        data[1] = num_pixels & 0xff
        data[2] = (num_pixels >> 8) & 0xff
        self.safe_write(data)

    def clear_display(self):
        """
        Send the 'c' command.

        Frame: 'c' followed by one zero byte.
        """
        # NOTE(review): the second byte is never set and stays zero;
        # presumably the firmware expects a two-byte frame - confirm.
        data = bytearray(2)
        data[0] = ord('c')
        self.safe_write(data)

    def update_display(self, num_modified_pixels=None):
        """
        Send the 's' command.

        Frame: 's' followed by one zero byte. num_modified_pixels is
        accepted for interface compatibility but is not used.
        """
        data = bytearray(2)
        data[0] = ord('s')
        self.safe_write(data)

    def put_pixel(self, addr, r, g, b):
        """
        Send the 'l' command: set the pixel at addr to colour (r, g, b).

        Frame: 'l', addr low byte, addr high byte, r, g, b.
        r, g and b are stored unmasked, so each must be an integer in
        0..255 (bytearray raises ValueError otherwise).
        """
        data = bytearray(6)
        data[0] = ord('l')
        data[1] = (addr >> 0) & 0xff
        data[2] = (addr >> 8) & 0xff
        data[3] = r
        data[4] = g
        data[5] = b
        self.safe_write(data)

    def set_rtc(self, t):
        """
        Send the '@' command: resynchronize the RTC to time t.

        Frame: '@' followed by int(t) as four bytes, least-significant
        byte first.
        """
        # Resynchronize RTC
        data = bytearray(5)
        data[0] = ord('@')
        t = int(t)
        data[1] = (t >> 0) & 0xff
        data[2] = (t >> 8) & 0xff
        data[3] = (t >> 16) & 0xff
        data[4] = (t >> 24) & 0xff
        self.safe_write(data)

    def set_auto_time(self, enable=True):
        """
        Send the 't' command: enable or disable automatic rendering of
        the current time.

        Frame: 't', int(enable).
        """
        # Enable or disable automatic rendering of current time
        data = bytearray(2)
        data[0] = ord('t')
        data[1] = int(enable)
        self.safe_write(data)

    def suspend_host(self, restart_timeout_seconds):
        """
        Send the 'S' command with a restart timeout in seconds.

        Frame: 'S', timeout low byte, timeout high byte. Only the low
        16 bits of the timeout are transmitted.
        """
        # Coerce to int so a float timeout does not break the shifts
        # below (same treatment as set_rtc()).
        restart_timeout_seconds = int(restart_timeout_seconds)
        data = bytearray(3)
        data[0] = ord('S')
        data[1] = (restart_timeout_seconds >> 0) & 0xff
        data[2] = (restart_timeout_seconds >> 8) & 0xff
        self.safe_write(data)


if __name__ == '__main__':
    import os
    import time

    port = '/dev/tty.usbmodem575711'
    if not os.path.exists(port):
        port = '/dev/ttyACM0'
    # The class takes a config mapping, not positional port/baudrate.
    p = ArduinoSerialHAL({'port': port, 'baudrate': 115200})
    p.init_display(256)
    p.clear_display()
    p.put_pixel(0, 8, 0, 0)
    p.put_pixel(8, 0, 8, 0)
    p.put_pixel(16, 0, 0, 8)
    p.update_display()
    time.sleep(1)
    p.clear_display()