Mercurial > personal > weatherlog
view weatherlog/reader.py @ 15:b5625b531d2d
Add support for BME280 temperature sensor over i2c.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Tue, 15 Oct 2019 23:05:10 -0400 |
parents | 885bff085edf |
children | 92367b644e29 |
line wrap: on
line source
"""A module for reading data from the actual temperature sensor. You should probably assume that this is wildly thread-unsafe. """ import abc import time import typing as t import adafruit_dht import bme280 import board import smbus2 from . import types class Reader(metaclass=abc.ABCMeta): """Interface for a thing which reads temperatures.""" @abc.abstractmethod def read(self) -> types.Reading: """Reads a value from the weather sensor.""" raise NotImplementedError() class DHT22Reader(Reader): """A reader for the DHT22 sensor, which attempts to make good readings.""" def __init__(self, pin: t.Any = board.D4): """Initializes a DHT22 reader connected to the specified pin.""" self.device = adafruit_dht.DHT22(pin) _TEMP_C_EPSILON = 0.51 _RH_PCT_EPSILON = 1.01 _NEW_READING_SECS = 2.5 def read(self) -> types.Reading: """Reads a value from the sensor. This will block until done.""" temps: t.List[float] = [] humids: t.List[float] = [] while True: temp, rh = self._read_once() if _is_reasonable_temp(temp): temps.append(temp) if _is_reasonable_rh(rh): humids.append(rh) try: return types.Reading.from_now( temp_c=_last_stable(temps, epsilon=self._TEMP_C_EPSILON), rh_pct=_last_stable(humids, epsilon=self._RH_PCT_EPSILON)) except ValueError: # There is not yet enough data to see if the value is stable. # Wait for a new reading. time.sleep(self._NEW_READING_SECS) _RETRY_WAIT_SECS = 0.5 def _read_once(self) -> t.Tuple[float, float]: """Tries very very hard to read a single value from the sensor.""" while True: try: temp, rh = self.device.temperature, self.device.humidity if None not in {temp, rh}: return temp, rh except RuntimeError: pass # This will happen a lot. Just try again. time.sleep(self._RETRY_WAIT_SECS) class BME280Reader: def __init__(self, bus_id: int = 1, address: int = 0x77): self.bus = smbus2.SMBus(bus_id) self.address = address self.calibration = bme280.load_calibration_params(self.bus, address) def read(self) -> types.Reading: reading = bme280.sample(self.bus, self.address, self.calibration) return types.Reading.from_now( temp_c=reading.temperature, rh_pct=reading.humidity, pressure_kpa=reading.pressure / 10) def _last_stable( vals: t.Iterable[float], *, epsilon: float, count: int = 3) -> float: """Returns the last stable value from the given sequence. The stable value is defined as the last value for which there are `count` values in the sequence within `epsilon` of some value. This is used to eliminate misread values in the sequence, usually due to timing issues. The `count` values need not be in sequence. The value returned is the median value of the group (or the next-to-median in the case of an even count). """ buckets: t.Dict[float, t.List[float]] = {} for v in vals: buckets.setdefault(v, []) for bucket, bucket_vals in buckets.items(): if abs(v - bucket) <= epsilon: bucket_vals.append(v) if len(bucket_vals) == count: return sorted(bucket_vals)[count // 2] raise ValueError('No stable value.') def _is_reasonable_temp(temp: float) -> bool: """True if the temperature is a reasonable level for the boathouse.""" return -40 <= temp <= 60 def _is_reasonable_rh(rh: float) -> bool: """True if the relative humidity is plausible.""" return 0 <= rh <= 100