view weatherlog/ @ 5:885bff085edf

Add remote logger class for eventual HTTP writer.
author Paul Fisher <>
date Sat, 28 Sep 2019 19:28:22 -0400
parents d961c8a93d6b
children b5625b531d2d
line wrap: on
line source

"""A module for reading data from the actual temperature sensor.

You should probably assume that this is wildly thread-unsafe.
"""

import abc
import time
import typing as t

import adafruit_dht
import board

from . import types

class Reader(metaclass=abc.ABCMeta):
    """Interface for a thing which reads temperatures.

    Concrete readers must override `read`.  Because `read` is declared
    abstract, a subclass that forgets to override it fails when it is
    instantiated rather than on the first call.
    """

    @abc.abstractmethod
    def read(self) -> types.Reading:
        """Reads a value from the weather sensor.

        Returns:
            The reading produced by the concrete reader.

        Raises:
            NotImplementedError: if a subclass delegates to this base
                implementation (e.g. via `super().read()`).
        """
        raise NotImplementedError()

class DHT22Reader(Reader):
    """A reader for the DHT22 sensor, which attempts to make good readings.

    Raw samples are collected until `_last_stable` finds a stable value for
    both temperature and relative humidity; that pair is what `read` returns.
    """

    # Tolerances handed to `_last_stable` as `epsilon` when grouping samples:
    # one for temperature (passed on as `temp_c`), one for relative humidity
    # (passed on as `rh_pct`).
    _TEMP_C_EPSILON = 0.51
    _RH_PCT_EPSILON = 1.01

    # Pause, in seconds, between successive attempts to talk to the sensor.
    _RETRY_WAIT_SECS = 0.5

    def __init__(self, pin: t.Any = board.D4):
        """Initializes a DHT22 reader connected to the specified pin.

        Args:
            pin: The pin object handed to `adafruit_dht.DHT22`; defaults to
                `board.D4`.
        """
        self.device = adafruit_dht.DHT22(pin)

    def read(self) -> types.Reading:
        """Reads a value from the sensor.  This will block until done.

        Samples are taken repeatedly.  Each sample's temperature and humidity
        are kept only if they pass the corresponding plausibility check, and
        the loop ends once `_last_stable` succeeds for both series.

        Returns:
            A `types.Reading` built with `types.Reading.from_now` from the
            stable temperature and relative humidity.
        """
        temps: t.List[float] = []
        humids: t.List[float] = []
        while True:
            temp, rh = self._read_once()
            # Implausible values are dropped so they can never contribute to
            # a "stable" group.
            if _is_reasonable_temp(temp):
                temps.append(temp)
            if _is_reasonable_rh(rh):
                humids.append(rh)
            try:
                return types.Reading.from_now(
                    temp_c=_last_stable(temps, epsilon=self._TEMP_C_EPSILON),
                    rh_pct=_last_stable(humids, epsilon=self._RH_PCT_EPSILON))
            except ValueError:
                # There is not yet enough data to see if the value is stable.
                # Wait for a new reading.
                # NOTE(review): the pause length is assumed to be
                # _RETRY_WAIT_SECS -- confirm against the upstream revision.
                time.sleep(self._RETRY_WAIT_SECS)

    def _read_once(self) -> t.Tuple[float, float]:
        """Tries very very hard to read a single value from the sensor.

        Loops until the device yields both a temperature and a humidity that
        are not None, pausing `_RETRY_WAIT_SECS` between attempts.

        Returns:
            A `(temperature, humidity)` tuple as reported by the device.
        """
        while True:
            try:
                temp, rh = self.device.temperature, self.device.humidity
                if None not in {temp, rh}:
                    return temp, rh
            except RuntimeError:
                pass  # This will happen a lot.  Just try again.
            # Reached only after a failed or incomplete attempt; pause instead
            # of spinning on the device.
            time.sleep(self._RETRY_WAIT_SECS)

def _last_stable(
        vals: t.Iterable[float],
        epsilon: float,
        count: int = 3) -> float:
    """Returns the last stable value from the given sequence.

    The stable value is defined as the last value for which there are `count`
    values in the sequence within `epsilon` of some value.  This is used to
    eliminate misread values in the sequence, usually due to timing issues.
    The `count` values need not be in sequence.

    The value returned is the median value of the group (or the next-to-median
    in the case of an even count).
    buckets: t.Dict[float, t.List[float]] = {}
    for v in vals:
        buckets.setdefault(v, [])
        for bucket, bucket_vals in buckets.items():
            if abs(v - bucket) <= epsilon:
            if len(bucket_vals) == count:
                return sorted(bucket_vals)[count // 2]
    raise ValueError('No stable value.')

def _is_reasonable_temp(temp: float) -> bool:
    """True if the temperature is a reasonable level for the boathouse."""
    return -40 <= temp <= 60

def _is_reasonable_rh(rh: float) -> bool:
    """True if the relative humidity is plausible."""
    return 0 <= rh <= 100