Rewrite to reduce memory usage and add more features
In addition to the clock scene, both the animation scene and the weather scene should now work under MicroPython on devices with 520kBytes of RAM (e.g. LoPy 1, WiPy 2) after: - combating heap fragmentation during initialization by temporarily allocating a large chunk of RAM in the beginning of main.py and freeing it after all modules have been imported and initialized - stream parsing the JSON response from the weather API - converting animations to binary and streaming them from the flash file system (additionally, older ESP8266 modules with 4MB flash have been found working under some circumstances with MicroPython 1.9.4 and an 8x8 LED matrix) - 3D parts: add diffuser grid and frame for square LED matrix displays - Arduino projects needs to be in a folder with the same name as the .ino file - config: allow multiple WiFi networks to be configured - config: add support for debug flags - config: add intensity configuration - HAL: unify serial input processing for Arduino and Pycom devices - HAL: handle UART write failures on Pycom devices - HAL: drop garbage collection from .update_display() because it takes several hundred milliseconds on 4MB devices - MCU: clear display when enabling/disabling MCU independence from host - PixelFont: move data to class attributes to reduce memory usage - PixelFont: add more characters - PixelFont: move data generation to scripts/generate-pixelfont.py - LedMatrix: support LED matrixes with strides other than 8 (e.g. as 16x16 matrices) - LedMatrix: add method to render text - LedMatrix: let consumers handle brightness themselves - AnimationScene: MicroPython does not implement bytearray.find - AnimationScene: ensure minimum on-screen time - BootScene: wifi connection and RTC sync progress for Pycom devices - ClockScene: delete unused code, switch to generic text rendering method - FireScene: classical fire effect - WeatherScene: bug fixes, switch to generic text rendering method - WeatherScene: ensure minimum on-screen time - WeatherScene: use custom JSON parsing to reduce memory usage
This commit is contained in:
312
main.py
312
main.py
@@ -1,157 +1,52 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# This is a project to drive a 8x32 (or 8x8) LED matrix based on the
|
||||
# popular WS2812 RGB LEDs using a microcontroller (e.g. a Teensy 3.x
|
||||
# or a Pycom module with 4MB RAM) and optionally control them both
|
||||
# using a more powerful host computer, such as a Raspberry Pi Zero W.
|
||||
# This is a project to drive a 32x8 or 16x16 LED matrix based on the popular
|
||||
# WS2812 RGB LEDs using a microcontroller running MicroPython (preferrably
|
||||
# one with 4MB RAM although modules with 1MB also work).
|
||||
#
|
||||
# -- noah@hack.se, 2018
|
||||
#
|
||||
import sys
|
||||
import time
|
||||
import gc
|
||||
from math import ceil
|
||||
from ledmatrix import LedMatrix
|
||||
# This is to make sure we have a large contiguous block of RAM on devices with
|
||||
# 520kB RAM after all modules and modules have been compiled and instantiated.
|
||||
#
|
||||
# In the weather scene, the ussl module needs a large chunk of around 1850
|
||||
# bytes, and without the dummy allocation below the heap will be too
|
||||
# fragmented after all the initial processing to find such a large chunk.
|
||||
large_temp_chunk = bytearray(3400)
|
||||
|
||||
pycom_board = False
|
||||
esp8266_board = False
|
||||
if hasattr(sys,'implementation') and sys.implementation.name == 'micropython':
|
||||
pycom_board = True
|
||||
import ujson as json
|
||||
import machine
|
||||
from network import WLAN
|
||||
# Local imports
|
||||
from pycomhal import PycomHAL
|
||||
from os import uname
|
||||
tmp = uname()
|
||||
if tmp.sysname == 'esp8266':
|
||||
esp8266_board = True
|
||||
from upyhal import uPyHAL as HAL
|
||||
else:
|
||||
pycom_board = True
|
||||
from pycomhal import PycomHAL as HAL
|
||||
tmp = None
|
||||
del uname
|
||||
else:
|
||||
pycom_board = False
|
||||
# Emulate https://docs.pycom.io/firmwareapi/micropython/utime.html
|
||||
time.ticks_ms = lambda: int(time.time() * 1000)
|
||||
time.sleep_ms = lambda x: time.sleep(x/1000.0)
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import signal
|
||||
# Local imports
|
||||
from arduinoserialhal import ArduinoSerialHAL
|
||||
from arduinoserialhal import ArduinoSerialHAL as HAL
|
||||
|
||||
# Local imports
|
||||
from ledmatrix import LedMatrix
|
||||
from clockscene import ClockScene
|
||||
gc.collect()
|
||||
from renderloop import RenderLoop
|
||||
|
||||
|
||||
class RenderLoop:
|
||||
def __init__(self, display = None, fps=10):
|
||||
self.display = display
|
||||
self.fps = fps
|
||||
self.t_next_frame = None
|
||||
self.prev_frame = 0
|
||||
self.frame = 1
|
||||
self.t_init = time.ticks_ms() / 1000.0
|
||||
self.debug = 1
|
||||
self.scenes = []
|
||||
self.scene_index = 0
|
||||
self.scene_switch_effect = 0
|
||||
self.reset_scene_switch_counter()
|
||||
|
||||
def reset_scene_switch_counter(self):
|
||||
"""
|
||||
Reset counter used to automatically switch scenes.
|
||||
The counter is decreased in .next_frame()
|
||||
"""
|
||||
self.scene_switch_countdown = 45 * self.fps
|
||||
|
||||
def add_scene(self, scene):
|
||||
"""
|
||||
Add new scene to the render loop
|
||||
"""
|
||||
self.scenes.append(scene)
|
||||
|
||||
def next_scene(self):
|
||||
"""
|
||||
Transition to a new scene and re-initialize the scene
|
||||
"""
|
||||
print('RenderLoop: next_scene: transitioning scene')
|
||||
# Fade out current scene
|
||||
effect = self.scene_switch_effect
|
||||
self.scene_switch_effect = (effect + 1) % 3
|
||||
if effect == 0:
|
||||
self.display.dissolve()
|
||||
elif effect == 1:
|
||||
self.display.fade()
|
||||
else:
|
||||
self.display.scrollout()
|
||||
|
||||
self.scene_index += 1
|
||||
if self.scene_index == len(self.scenes):
|
||||
self.scene_index = 0
|
||||
i = self.scene_index
|
||||
print('RenderLoop: next_scene: selected {}'.format(self.scenes[i].__class__.__name__))
|
||||
# (Re-)initialize scene
|
||||
self.scenes[i].reset()
|
||||
|
||||
def next_frame(self, button_pressed=0):
|
||||
"""
|
||||
Display next frame, possibly after a delay to ensure we meet the FPS target
|
||||
"""
|
||||
|
||||
scene = self.scenes[self.scene_index]
|
||||
if button_pressed:
|
||||
# Let the scene handle input
|
||||
if scene.input(0, button_pressed):
|
||||
# The scene handled the input itself so ignore it
|
||||
button_pressed = 0
|
||||
|
||||
t_now = time.ticks_ms() / 1000.0 - self.t_init
|
||||
if not self.t_next_frame:
|
||||
self.t_next_frame = t_now
|
||||
|
||||
delay = self.t_next_frame - t_now
|
||||
if delay >= 0:
|
||||
# Wait until we can display next frame
|
||||
x = time.ticks_ms() / 1000.0
|
||||
time.sleep_ms(int(1000 * delay))
|
||||
x = time.ticks_ms() / 1000.0 - x
|
||||
if x-delay > 0.01:
|
||||
print('RenderLoop: WARN: Overslept when sleeping for {}s, slept {}s more'.format(delay, round(x-delay, 6)))
|
||||
else:
|
||||
if self.debug:
|
||||
print('RenderLoop: WARN: FPS {} might be too high, {}s behind and missed {} frames'.format(self.fps, -delay, round(-delay*self.fps, 2)))
|
||||
# Resynchronize
|
||||
t_diff = self.fps * (t_now-self.t_next_frame)/self.fps - delay
|
||||
if self.debug:
|
||||
print('RenderLoop: Should have rendered frame {} at {} but was {}s late'.format(self.frame, self.t_next_frame, t_diff))
|
||||
t_diff += 1./self.fps
|
||||
self.frame += int(round(self.fps * t_diff))
|
||||
self.t_next_frame += t_diff
|
||||
if self.debug:
|
||||
print('RenderLoop: Will instead render frame {} at {}'.format(self.frame, self.t_next_frame))
|
||||
|
||||
if self.debug:
|
||||
print('RenderLoop: Rendering frame {}, next frame at {}'.format(self.frame, round(self.t_next_frame+1./self.fps, 4)))
|
||||
|
||||
# Render current scene
|
||||
t = time.ticks_ms() / 1000.0
|
||||
loop_again = scene.render(self.frame, self.frame - self.prev_frame - 1, self.fps)
|
||||
t = time.ticks_ms() / 1000.0 - t
|
||||
if t > 0.1:
|
||||
print('RenderLoop: WARN: Spent {}s rendering'.format(t))
|
||||
|
||||
self.scene_switch_countdown -= 1
|
||||
if button_pressed or not loop_again or not self.scene_switch_countdown:
|
||||
self.reset_scene_switch_counter()
|
||||
if not loop_again:
|
||||
print('RenderLoop: scene "{}" signalled completion'.format(self.scenes[self.scene_index].__class__.__name__))
|
||||
else:
|
||||
print('RenderLoop: forcefully switching scenes (button: {}, timer: {}'.format(button_pressed, self.scene_switch_countdown))
|
||||
# Transition to next scene
|
||||
self.next_scene()
|
||||
# Account for time wasted above
|
||||
t_new = time.ticks_ms() / 1000.0 - self.t_init
|
||||
t_diff = t_new - t_now
|
||||
frames_wasted = ceil(t_diff * self.fps)
|
||||
#print('RenderLoop: setup: scene switch took {}s, original t {}s, new t {}s, spent {} frames'.format(t_diff, t_now,t_new, self.fps*t_diff))
|
||||
self.frame += int(frames_wasted)
|
||||
self.t_next_frame += frames_wasted / self.fps
|
||||
|
||||
self.prev_frame = self.frame
|
||||
self.frame += 1
|
||||
self.t_next_frame += 1./self.fps
|
||||
|
||||
def sigint_handler(sig, frame):
|
||||
"""
|
||||
Clear display when the program is terminated by Ctrl-C or SIGTERM
|
||||
@@ -166,19 +61,11 @@ if __name__ == '__main__':
|
||||
f = open('config.json')
|
||||
config = json.loads(f.read())
|
||||
f.close()
|
||||
del json
|
||||
|
||||
# Initialize HAL
|
||||
if pycom_board:
|
||||
# We're running under MCU here
|
||||
print('WLAN: Connecting')
|
||||
wlan = WLAN(mode=WLAN.STA)
|
||||
wlan.connect(config['ssid'], auth=(WLAN.WPA2, config['password']))
|
||||
while not wlan.isconnected():
|
||||
machine.idle() # save power while waiting
|
||||
time.sleep(1)
|
||||
print('WLAN: Connected with IP: {}'.format(wlan.ifconfig()[0]))
|
||||
driver = PycomHAL(config)
|
||||
else:
|
||||
driver = HAL(config)
|
||||
if not esp8266_board and not pycom_board:
|
||||
# We're running on the host computer here
|
||||
ports = [
|
||||
'/dev/tty.usbmodem575711', # Teensy 3.x on macOS
|
||||
@@ -188,57 +75,122 @@ if __name__ == '__main__':
|
||||
]
|
||||
for port in ports:
|
||||
if os.path.exists(port):
|
||||
config['port'] = port
|
||||
break
|
||||
driver = ArduinoSerialHAL(config)
|
||||
driver.set_rtc(time.time() + config['tzOffsetSeconds'])
|
||||
# Disable automatic rendering of time
|
||||
driver.set_auto_time(False)
|
||||
# Trap Ctrl-C and service termination
|
||||
signal.signal(signal.SIGINT, sigint_handler)
|
||||
signal.signal(signal.SIGTERM, sigint_handler)
|
||||
|
||||
|
||||
# Initialize led matrix framebuffer on top of HAL
|
||||
num_leds = 256
|
||||
rows = 8
|
||||
cols = num_leds // rows
|
||||
display = LedMatrix(driver, cols, rows, rotation=0)
|
||||
display = LedMatrix(driver, config['LedMatrix'])
|
||||
driver.clear_display()
|
||||
|
||||
if pycom_board:
|
||||
# If we're running on the MCU then loop forever
|
||||
while True:
|
||||
driver.serial_loop(display)
|
||||
# We're running under MCU here
|
||||
from bootscene import BootScene
|
||||
scene = BootScene(display, config['Boot'])
|
||||
wlan = WLAN(mode=WLAN.STA)
|
||||
if not wlan.isconnected():
|
||||
print('WLAN: Scanning for networks')
|
||||
scene.render(0,0,0)
|
||||
default_ssid, default_auth = wlan.ssid(), wlan.auth()
|
||||
candidates = wlan.scan()
|
||||
for conf in config['networks']:
|
||||
nets = [candidate for candidate in candidates if candidate.ssid == conf['ssid']]
|
||||
if not nets:
|
||||
continue
|
||||
print('WLAN: Connecting to known network: {}'.format(nets[0].ssid))
|
||||
wlan.connect(nets[0].ssid, auth=(nets[0].sec, conf['password']))
|
||||
for i in range(1,40):
|
||||
scene.render(i, 0, 0)
|
||||
time.sleep(0.2)
|
||||
if wlan.isconnected():
|
||||
break
|
||||
if wlan.isconnected():
|
||||
break
|
||||
scene.render(0, 0, 0)
|
||||
if not wlan.isconnected():
|
||||
# TODO: This will only use SSID/auth settings from NVRAM during cold boots
|
||||
print('WLAN: No known networks, enabling AP with ssid={}, pwd={}'.format(default_ssid, default_auth[1]))
|
||||
wlan.init(mode=WLAN.AP, ssid=default_ssid, auth=default_auth, channel=6)
|
||||
else:
|
||||
display.clear()
|
||||
print('WLAN: Connected with IP: {}'.format(wlan.ifconfig()[0]))
|
||||
# Initialize RTC now that we're connected
|
||||
driver.set_rtc(scene)
|
||||
scene.render(0,0,0)
|
||||
scene = None
|
||||
del BootScene
|
||||
elif esp8266_board:
|
||||
pass
|
||||
|
||||
|
||||
# This is where it all begins
|
||||
r = RenderLoop(display, fps=10)
|
||||
r = RenderLoop(display, config)
|
||||
|
||||
scene = ClockScene(display, config['ClockScene'])
|
||||
r.add_scene(scene)
|
||||
if 'Clock' in config:
|
||||
from clockscene import ClockScene
|
||||
scene = ClockScene(display, config['Clock'])
|
||||
r.add_scene(scene)
|
||||
gc.collect()
|
||||
|
||||
from weatherscene import WeatherScene
|
||||
scene = WeatherScene(display, config['WeatherScene'])
|
||||
r.add_scene(scene)
|
||||
if 'Demo' in config:
|
||||
from demoscene import DemoScene
|
||||
scene = DemoScene(display, config['Demo'])
|
||||
r.add_scene(scene)
|
||||
gc.collect()
|
||||
|
||||
from animationscene import AnimationScene
|
||||
scene = AnimationScene(display, config['AnimationScene'])
|
||||
r.add_scene(scene)
|
||||
if 'Weather' in config:
|
||||
from weatherscene import WeatherScene
|
||||
scene = WeatherScene(display, config['Weather'])
|
||||
r.add_scene(scene)
|
||||
gc.collect()
|
||||
|
||||
if 'Fire' in config:
|
||||
from firescene import FireScene
|
||||
scene = FireScene(display, config['Fire'])
|
||||
r.add_scene(scene)
|
||||
gc.collect()
|
||||
|
||||
if 'Animation' in config:
|
||||
from animationscene import AnimationScene
|
||||
scene = AnimationScene(display, config['Animation'])
|
||||
r.add_scene(scene)
|
||||
gc.collect()
|
||||
|
||||
# Now that we're all setup, release the large chunk
|
||||
large_temp_chunk = None
|
||||
|
||||
# Render scenes forever
|
||||
while True:
|
||||
button_pressed = 0
|
||||
while True:
|
||||
# Drain output from MCU and detect button presses
|
||||
line = driver.readline()
|
||||
if not line:
|
||||
break
|
||||
event = line.strip()
|
||||
if event == 'BUTTON_SHRT_PRESS':
|
||||
button_pressed = 1
|
||||
elif event == 'BUTTON_LONG_PRESS':
|
||||
button_pressed = 2
|
||||
else:
|
||||
print('MCU: {}'.format(event))
|
||||
|
||||
r.next_frame(button_pressed)
|
||||
if button_pressed:
|
||||
button_state = 0
|
||||
# Process input
|
||||
button_state = 0
|
||||
if pycom_board:
|
||||
# When running under MicroPython on the MCU we need to deal with
|
||||
# any input coming from the host over the serial link
|
||||
while True:
|
||||
button_state = driver.process_input()
|
||||
if driver.enable_auto_time:
|
||||
# If the host disconnected we'll hand over control to the
|
||||
# game loop which will take care of updating the display
|
||||
break
|
||||
else:
|
||||
# When running under regular Python on the host computer we need
|
||||
# to pick up any button presses sent over the serial link from
|
||||
# the Arduino firmware
|
||||
n = 0
|
||||
while True:
|
||||
# Drain output from MCU and detect button presses
|
||||
line = driver.process_input()
|
||||
if not line:
|
||||
break
|
||||
event = line.strip()
|
||||
if event == 'LEFTB_SHRT_PRESS':
|
||||
button_state = 1
|
||||
elif event == 'LEFTB_LONG_PRESS':
|
||||
button_state = 2
|
||||
else:
|
||||
print('MCU: {}'.format(event))
|
||||
r.next_frame(button_state)
|
||||
|
||||
Reference in New Issue
Block a user