Mercurial > personal > weatherlog
view weatherlog/reader.py @ 3:d961c8a93d6b
reader: Use floor-division for list index.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Fri, 27 Sep 2019 21:38:23 -0400 |
parents | 9310e2ff7e17 |
children | 885bff085edf |
line wrap: on
line source
"""A module for reading data from the actual temperature sensor.

You should probably assume that this is wildly thread-unsafe.
"""

import abc
import datetime
import time
import typing as t

import adafruit_dht
import attr
import board
import pytz


def _utc_now() -> datetime.datetime:
    """Returns the current time as a timezone-aware UTC datetime."""
    # datetime.utcnow() yields a naive value and is deprecated in newer
    # Pythons; asking for "now" in UTC directly gives the same aware result.
    return datetime.datetime.now(tz=pytz.UTC)


@attr.s(frozen=True, slots=True)
class Reading:
    """A single reading from a temperature/humidity sensor."""

    # The temperature, in degrees Celsius.
    temp_c = attr.ib(type=float)

    # The relative humidity, in percent.
    rh_pct = attr.ib(type=float)

    # The timestamp of the reading.  Defaults to the (UTC) time at which
    # the Reading object is constructed.
    timestamp = attr.ib(type=datetime.datetime, factory=_utc_now)


class Reader(metaclass=abc.ABCMeta):
    """Interface for a thing which reads temperatures."""

    @abc.abstractmethod
    def read(self) -> Reading:
        """Reads a value from the weather sensor."""
        raise NotImplementedError()


class DHT22Reader(Reader):
    """A reader for the DHT22 sensor, which attempts to make good readings."""

    # How close (in degrees Celsius) temperature samples must be to count
    # as agreeing with each other.
    _TEMP_C_EPSILON = 0.51

    # How close (in percentage points) humidity samples must be to count
    # as agreeing with each other.
    _RH_PCT_EPSILON = 1.01

    # How long to sleep, in seconds, before taking another sample when the
    # samples collected so far are not yet stable.
    _NEW_READING_SECS = 2.5

    # How long to sleep, in seconds, before retrying a failed sensor read.
    _RETRY_WAIT_SECS = 0.5

    def __init__(self, pin: t.Any = board.D4):
        """Initializes a DHT22 reader connected to the specified pin."""
        self.device = adafruit_dht.DHT22(pin)

    def read(self) -> Reading:
        """Reads a value from the sensor.  This will block until done.

        Samples are collected repeatedly until both the temperature and
        the humidity series contain a stable value (see `_last_stable`).
        Samples outside the plausible range are discarded.

        Returns:
            A Reading holding the stable temperature and humidity.
        """
        temps: t.List[float] = []
        humids: t.List[float] = []
        while True:
            temp, rh = self._read_once()
            if _is_reasonable_temp(temp):
                temps.append(temp)
            if _is_reasonable_rh(rh):
                humids.append(rh)
            try:
                # Only the stability checks belong in the try: ValueError
                # from them is the "not enough data yet" signal.
                temp_c = _last_stable(temps, epsilon=self._TEMP_C_EPSILON)
                rh_pct = _last_stable(humids, epsilon=self._RH_PCT_EPSILON)
            except ValueError:
                # There is not yet enough data to see if the value is
                # stable.  Wait for a new reading.
                time.sleep(self._NEW_READING_SECS)
            else:
                return Reading(temp_c=temp_c, rh_pct=rh_pct)

    def _read_once(self) -> t.Tuple[float, float]:
        """Tries very very hard to read a single value from the sensor.

        Loops until the device reports both a temperature and a humidity,
        sleeping `_RETRY_WAIT_SECS` between attempts.

        Returns:
            A (temperature, relative humidity) tuple, neither of them None.
        """
        while True:
            try:
                temp, rh = self.device.temperature, self.device.humidity
                if temp is not None and rh is not None:
                    return temp, rh
            except RuntimeError:
                pass  # This will happen a lot.  Just try again.
            time.sleep(self._RETRY_WAIT_SECS)


def _last_stable(
        vals: t.Iterable[float], *, epsilon: float, count: int = 3) -> float:
    """Returns a stable value from the given sequence.

    Every distinct value seen becomes the centre of a group; each value is
    added to every existing group whose centre is within `epsilon` of it.
    Values are scanned in order, and as soon as any group collects `count`
    members, the scan stops and that group's median is returned (or the
    next-to-median in the case of an even count).  The `count` values need
    not be adjacent in the sequence.

    This is used to eliminate misread values in the sequence, usually due
    to timing issues.

    Args:
        vals: The samples, in the order they were taken.
        epsilon: Maximum distance from a group's centre for a sample to
            be counted as a member of that group.
        count: Number of agreeing samples required for stability.

    Returns:
        The element at index `count // 2` of the sorted winning group.

    Raises:
        ValueError: If no group reaches `count` members.
    """
    buckets: t.Dict[float, t.List[float]] = {}
    for v in vals:
        # Make sure this value has its own group before matching, so that
        # it is counted as a member of the group it defines.
        buckets.setdefault(v, [])
        for bucket, bucket_vals in buckets.items():
            if abs(v - bucket) <= epsilon:
                bucket_vals.append(v)
                if len(bucket_vals) == count:
                    return sorted(bucket_vals)[count // 2]
    raise ValueError('No stable value.')


def _is_reasonable_temp(temp: float) -> bool:
    """True if the temperature is a reasonable level for the boathouse."""
    return -40 <= temp <= 60


def _is_reasonable_rh(rh: float) -> bool:
    """True if the relative humidity is plausible."""
    return 0 <= rh <= 100