changeset 15:b5625b531d2d

Add support for BME280 temperature sensor over i2c.
author Paul Fisher <paul@pfish.zone>
date Tue, 15 Oct 2019 23:05:10 -0400
parents c01f9929ae38
children 770215590d80
files requirements.txt setup.py weatherlog/reader.py weatherlog/types.py
diffstat 4 files changed, 38 insertions(+), 6 deletions(-) [+]
line wrap: on
line diff
--- a/requirements.txt	Tue Oct 15 22:40:24 2019 -0400
+++ b/requirements.txt	Tue Oct 15 23:05:10 2019 -0400
@@ -4,3 +4,5 @@
 pymongo
 pytz
 requests
+RPi.bme280
+smbus2
--- a/setup.py	Tue Oct 15 22:40:24 2019 -0400
+++ b/setup.py	Tue Oct 15 23:05:10 2019 -0400
@@ -12,6 +12,8 @@
         'pymongo',
         'pytz',
         'requests',
+        'RPi.bme280',
+        'smbus2',
     ],
     setup_requires=['wheel'],
 )
--- a/weatherlog/reader.py	Tue Oct 15 22:40:24 2019 -0400
+++ b/weatherlog/reader.py	Tue Oct 15 23:05:10 2019 -0400
@@ -8,7 +8,9 @@
 import typing as t
 
 import adafruit_dht
+import bme280
 import board
+import smbus2
 
 from . import types
 
@@ -66,6 +68,21 @@
             time.sleep(self._RETRY_WAIT_SECS)
 
 
+class BME280Reader:
+
+    def __init__(self, bus_id: int = 1, address: int = 0x77):
+        self.bus = smbus2.SMBus(bus_id)
+        self.address = address
+        self.calibration = bme280.load_calibration_params(self.bus, address)
+
+    def read(self) -> types.Reading:
+        reading = bme280.sample(self.bus, self.address, self.calibration)
+        return types.Reading.from_now(
+            temp_c=reading.temperature,
+            rh_pct=reading.humidity,
+            pressure_kpa=reading.pressure / 10)
+
+
 def _last_stable(
         vals: t.Iterable[float],
         *,
--- a/weatherlog/types.py	Tue Oct 15 22:40:24 2019 -0400
+++ b/weatherlog/types.py	Tue Oct 15 23:05:10 2019 -0400
@@ -2,6 +2,7 @@
 """
 
 import datetime
+import typing as t
 
 import attr
 import pytz
@@ -12,23 +13,33 @@
     return datetime.datetime.now(tz=pytz.UTC)
 
 
-@attr.s(frozen=True, slots=True)
+T = t.TypeVar('T')
+
+
+@attr.s(auto_attribs=True, frozen=True, slots=True)
 class Reading(object):
     """A single reading from a temperature/humidity sensor."""
 
     # The timestamp of the reading.
-    sample_time = attr.ib(type=datetime.datetime)
+    sample_time: datetime.datetime
 
     # The temperature, in degrees Celsius.
-    temp_c = attr.ib(type=float)
+    temp_c: float
 
     # The relative humidity, in percent.
-    rh_pct = attr.ib(type=float)
+    rh_pct: float
+
+    # The atmospheric pressure in kilopascals, if known.
+    pressure_kpa: t.Optional[float] = None
 
     @classmethod
-    def from_now(cls, **kwargs) -> 'Reading':
+    def from_now(cls: t.Type[T], **kwargs) -> T:
         """Creates a new reading taken at the current time."""
         return cls(sample_time=_utc_now(), **kwargs)
 
     def as_dict(self):
-        return attr.asdict(self, recurse=False)
+        return filter_nones(attr.asdict(self, recurse=False))
+
+
+def filter_nones(d: t.Dict[str, t.Optional[object]]) -> t.Dict[str, object]:
+    return {k: v for (k, v) in d.items() if v is not None}