270 lines
7.4 KiB
Python
Executable File
270 lines
7.4 KiB
Python
Executable File
# Render the current weather forecast from SMHI.se
|
|
#
|
|
import time
|
|
import gc
|
|
try:
|
|
import urequests as requests
|
|
except ImportError:
|
|
import requests
|
|
|
|
# Local imports
|
|
from pixelfont import PixelFont
|
|
from icon import Icon
|
|
|
|
# Based on demoscene.py
|
|
class WeatherScene:
|
|
"""
|
|
This module displays a weather forecast from SMHI (Sweden)
|
|
"""
|
|
|
|
dir_prefix = 'weather/'
|
|
|
|
def __init__(self, display, config):
|
|
"""
|
|
Initialize the module.
|
|
`display` is saved as an instance variable because it is needed to
|
|
update the display via self.display.put_pixel() and .render()
|
|
"""
|
|
self.display = display
|
|
self.icon = None
|
|
self.debug = False
|
|
self.intensity = 16
|
|
self.lat = 51.666664
|
|
self.lon = 5.3666652
|
|
self.api_url = 'https://opendata-download-metfcst.smhi.se'
|
|
self.headers = {
|
|
'User-Agent':'weatherscene.py/1.0 (+https://github.com/noahwilliamsson/lamatrix)',
|
|
'Accept-Encoding': 'identity',
|
|
}
|
|
self.temperature = 0
|
|
self.wind_speed = 0
|
|
self.last_refreshed_at = 0
|
|
# http://opendata.smhi.se/apidocs/metfcst/parameters.html#parameter-wsymb
|
|
self.symbol = None
|
|
self.symbol_to_icon = [
|
|
None,
|
|
['moon-stars.bin', 'sunny.bin'], # clear sky
|
|
['moon-stars.bin', 'sunny-with-clouds.bin'], # nearly clear sky
|
|
'cloud-partly.bin', # variable cloudiness
|
|
'sunny-with-clouds.bin', # halfclear sky
|
|
'cloudy.bin', # cloudy sky
|
|
'cloudy.bin', # overcast
|
|
'fog.bin', # fog
|
|
'rain.bin', # light rain showers
|
|
'rain.bin', # medium rain showers
|
|
'rain.bin', # heavy rain showers
|
|
'rain.bin', # thunderstorm
|
|
'rain-snow.bin', # light sleet showers
|
|
'rain-snow.bin', # medium sleet showers
|
|
'rain-snow.bin', # heavy sleet showers
|
|
'rain-snow.bin', # light snow showers
|
|
'rain-snow.bin', # medium snow showers
|
|
'rain-snow.bin', # heavy snow showers
|
|
'rain.bin', # light rain
|
|
'rain.bin', # medium rain
|
|
'rain.bin', # heavy rain
|
|
'thunderstorm.bin', # thunder
|
|
'rain-snowy.bin', # light sleet
|
|
'rain-snowy.bin', # medium sleet
|
|
'rain-snowy.bin', # heavy sleet
|
|
'snow-house.bin', # light snowfall
|
|
'snow-house.bin', # medium snowfall
|
|
'snow-house.bin', # heavy snowfall
|
|
]
|
|
self.num_frames = 1
|
|
self.remaining_frames = 1
|
|
self.next_frame_at = 0
|
|
if not config:
|
|
return
|
|
if 'debug' in config:
|
|
self.debug = config['debug']
|
|
if 'intensity' in config:
|
|
self.intensity = int(round(config['intensity']*255))
|
|
if 'lat' in config:
|
|
self.lat = config['lat']
|
|
if 'lon' in config:
|
|
self.lon = config['lon']
|
|
|
|
|
|
def reset(self):
|
|
"""
|
|
This method is called before transitioning to this scene.
|
|
Use it to (re-)initialize any state necessary for your scene.
|
|
"""
|
|
self.next_frame_at = 0
|
|
self.reset_icon()
|
|
t = time.time()
|
|
if t < self.last_refreshed_at + 1800:
|
|
return
|
|
|
|
# fetch a new forecast from SMHI
|
|
url = '{}/api/category/pmp3g/version/2/geotype/point/lon/{}/lat/{}/data.json'.format(self.api_url, self.lon, self.lat)
|
|
print('WeatherScene: reset called, requesting weather forecast from: {}'.format(url))
|
|
|
|
r = requests.get(url, headers=self.headers, stream=True)
|
|
if r.status_code != 200:
|
|
print('WeatherScene: failed to request {}: status {}'.format(url, r.status_code))
|
|
return
|
|
|
|
print('WeatherScene: parsing weather forecast')
|
|
next_hour = int(time.time())
|
|
next_hour = next_hour - next_hour%3600 + 3600
|
|
expected_timestamp = '{:04d}-{:02d}-{:02d}T{:02d}'.format(*time.gmtime(next_hour))
|
|
temp, ws, symb = self.get_forecast(r.raw, expected_timestamp)
|
|
# Close socket and free up RAM
|
|
r.close()
|
|
r = None
|
|
gc.collect()
|
|
|
|
if temp == None:
|
|
print('WeatherScene: failed to find forecast for timestamp prefix: {}'.format(expected_timestamp))
|
|
return
|
|
self.temperature = float(temp.decode())
|
|
self.wind_speed = float(ws.decode())
|
|
self.symbol = int(symb.decode())
|
|
self.last_refreshed_at = t
|
|
|
|
filename = self.symbol_to_icon[self.symbol]
|
|
if not filename:
|
|
return
|
|
|
|
if type(filename) == list:
|
|
lt = time.localtime(next_hour)
|
|
if lt[3] < 7 or lt[3] > 21:
|
|
# Assume night icon
|
|
filename = filename[0]
|
|
else:
|
|
filename = filename[1]
|
|
if self.icon:
|
|
# MicroPython does not support destructors so we need to manually
|
|
# close the file we have opened
|
|
self.icon.close() # Close icon file
|
|
self.icon = Icon(self.dir_prefix + filename)
|
|
self.icon.set_intensity(self.intensity)
|
|
self.reset_icon()
|
|
|
|
def input(self, button_state):
|
|
"""
|
|
Handle button inputs
|
|
"""
|
|
return 0 # signal that we did not handle the input
|
|
|
|
def set_intensity(self, value=None):
|
|
if value is not None:
|
|
self.intensity -= 1
|
|
if not self.intensity:
|
|
self.intensity = 16
|
|
if self.icon:
|
|
self.icon.set_intensity(self.intensity)
|
|
return self.intensity
|
|
|
|
def render(self, frame, dropped_frames, fps):
|
|
"""
|
|
Render the scene.
|
|
This method is called by the render loop with the current frame number,
|
|
the number of dropped frames since the previous invocation and the
|
|
requested frames per second (FPS).
|
|
"""
|
|
|
|
if frame < self.next_frame_at:
|
|
return True
|
|
|
|
self.remaining_frames -= 1
|
|
n = self.num_frames
|
|
index = n - (self.remaining_frames % n) - 1
|
|
# Calculate next frame number
|
|
self.next_frame_at = frame + int(fps * self.icon.frame_length()/1000)
|
|
# Render frame
|
|
display = self.display
|
|
intensity = self.intensity
|
|
self.icon.blit(display, 0 if display.columns == 32 else 4, 0)
|
|
|
|
# Render text
|
|
if self.remaining_frames >= n:
|
|
text = '{:.2g}\'c'.format(self.temperature)
|
|
else:
|
|
text = '{:.2g}m/s'.format(self.wind_speed)
|
|
if display.columns <= 16:
|
|
text = '{:.1g}m/s'.format(self.wind_speed)
|
|
display.render_text(PixelFont, text, 9 if display.columns == 32 else 0, 1 if display.columns == 32 else 10, intensity)
|
|
|
|
display.render()
|
|
if self.remaining_frames == 0:
|
|
return False
|
|
return True
|
|
|
|
def reset_icon(self):
|
|
if not self.icon:
|
|
return
|
|
self.icon.reset()
|
|
self.num_frames = self.icon.num_frames
|
|
self.remaining_frames = self.num_frames*2
|
|
t_icon = self.icon.length_total()
|
|
# Ensure a minimum display time
|
|
for i in range(1,6):
|
|
if t_icon*i >= 4000:
|
|
break
|
|
self.remaining_frames += self.num_frames
|
|
|
|
def get_forecast(self, f, validTime):
|
|
"""
|
|
Extract temperature, windspeed, weather symbol from JSON response
|
|
"""
|
|
timeStr = None
|
|
tempStr = None
|
|
wsStr = None
|
|
symbStr = None
|
|
while True:
|
|
v = f.read(1)
|
|
if v != b'"':
|
|
continue
|
|
s = self.next_string(f)
|
|
if not timeStr:
|
|
if not s.startswith(b'validTime'):
|
|
continue
|
|
timeStr = self.next_string(f, 2)
|
|
if not timeStr.startswith(validTime):
|
|
timeStr = None
|
|
continue
|
|
elif not tempStr and s == b't':
|
|
tempStr = self.next_array_entry(f)
|
|
elif not wsStr and s == b'ws':
|
|
wsStr = self.next_array_entry(f)
|
|
elif not symbStr and s == b'Wsymb2':
|
|
symbStr = self.next_array_entry(f)
|
|
elif timeStr and tempStr and wsStr and symbStr:
|
|
break
|
|
return (tempStr, wsStr, symbStr)
|
|
|
|
def next_string(self, f, start_at = 0):
|
|
"""
|
|
Extract string value from JSON
|
|
"""
|
|
if start_at:
|
|
f.read(start_at)
|
|
stash = bytearray()
|
|
while True:
|
|
c = f.read(1)
|
|
if c == b'"':
|
|
break
|
|
stash.append(c[0])
|
|
return bytes(stash)
|
|
|
|
def next_array_entry(self, f):
|
|
"""
|
|
Extract first array entry from JSON
|
|
"""
|
|
in_array = False
|
|
stash = bytearray()
|
|
while True:
|
|
c = f.read(1)
|
|
if not in_array:
|
|
if c != b'[':
|
|
continue
|
|
in_array = True
|
|
continue
|
|
if c == b']' or c == b' ' or c == b',':
|
|
break
|
|
stash.append(c[0])
|
|
return bytes(stash)
|