# HG changeset patch # User Paul Fisher # Date 1571195110 14400 # Node ID b5625b531d2dea33bb24da54dd36215cc49e539c # Parent c01f9929ae38a9d926daa1a77670c0de9cf418bc Add support for BME280 temperature sensor over i2c. diff -r c01f9929ae38 -r b5625b531d2d requirements.txt --- a/requirements.txt Tue Oct 15 22:40:24 2019 -0400 +++ b/requirements.txt Tue Oct 15 23:05:10 2019 -0400 @@ -4,3 +4,5 @@ pymongo pytz requests +RPi.bme280 +smbus2 diff -r c01f9929ae38 -r b5625b531d2d setup.py --- a/setup.py Tue Oct 15 22:40:24 2019 -0400 +++ b/setup.py Tue Oct 15 23:05:10 2019 -0400 @@ -12,6 +12,8 @@ 'pymongo', 'pytz', 'requests', + 'RPi.bme280', + 'smbus2', ], setup_requires=['wheel'], ) diff -r c01f9929ae38 -r b5625b531d2d weatherlog/reader.py --- a/weatherlog/reader.py Tue Oct 15 22:40:24 2019 -0400 +++ b/weatherlog/reader.py Tue Oct 15 23:05:10 2019 -0400 @@ -8,7 +8,9 @@ import typing as t import adafruit_dht +import bme280 import board +import smbus2 from . import types @@ -66,6 +68,21 @@ time.sleep(self._RETRY_WAIT_SECS) +class BME280Reader: + + def __init__(self, bus_id: int = 1, address: int = 0x77): + self.bus = smbus2.SMBus(bus_id) + self.address = address + self.calibration = bme280.load_calibration_params(self.bus, address) + + def read(self) -> types.Reading: + reading = bme280.sample(self.bus, self.address, self.calibration) + return types.Reading.from_now( + temp_c=reading.temperature, + rh_pct=reading.humidity, + pressure_kpa=reading.pressure / 10) + + def _last_stable( vals: t.Iterable[float], *, diff -r c01f9929ae38 -r b5625b531d2d weatherlog/types.py --- a/weatherlog/types.py Tue Oct 15 22:40:24 2019 -0400 +++ b/weatherlog/types.py Tue Oct 15 23:05:10 2019 -0400 @@ -2,6 +2,7 @@ """ import datetime +import typing as t import attr import pytz @@ -12,23 +13,33 @@ return datetime.datetime.now(tz=pytz.UTC) -@attr.s(frozen=True, slots=True) +T = t.TypeVar('T') + + +@attr.s(auto_attribs=True, frozen=True, slots=True) class Reading(object): """A single reading from a temperature/humidity sensor.""" # The timestamp of the reading. - sample_time = attr.ib(type=datetime.datetime) + sample_time: datetime.datetime # The temperature, in degrees Celsius. - temp_c = attr.ib(type=float) + temp_c: float # The relative humidity, in percent. - rh_pct = attr.ib(type=float) + rh_pct: float + + # The atmospheric pressure in kilopascals, if known. + pressure_kpa: t.Optional[float] = None @classmethod - def from_now(cls, **kwargs) -> 'Reading': + def from_now(cls: t.Type[T], **kwargs) -> T: """Creates a new reading taken at the current time.""" return cls(sample_time=_utc_now(), **kwargs) def as_dict(self): - return attr.asdict(self, recurse=False) + return filter_nones(attr.asdict(self, recurse=False)) + + +def filter_nones(d: t.Dict[str, t.Optional[object]]) -> t.Dict[str, object]: + return {k: v for (k, v) in d.items() if v is not None}