changeset 5:885bff085edf

Add remote logger class for eventual HTTP writer.
author Paul Fisher <paul@pfish.zone>
date Sat, 28 Sep 2019 19:28:22 -0400
parents 1abd0cc4caa4
children 8a350ec1aa78
files __init__.py weatherlog/logger.py weatherlog/logger_test.py weatherlog/reader.py weatherlog/types.py
diffstat 4 files changed, 372 insertions(+), 24 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/weatherlog/logger.py	Sat Sep 28 19:28:22 2019 -0400
@@ -0,0 +1,167 @@
+"""Module to log temperatures to a directory, and an external website."""
+
+import abc
+import collections
+import datetime
+import fcntl
+import os
+import pathlib
+import threading
+import time
+import typing as t
+
+import bson
+import pytz
+
+from . import types
+
+BSON_FILENAME = "temps.bson"
+LAST_SENT_FILENAME = "last-sent"
+
+
+class RemoteWriter(metaclass=abc.ABCMeta):
+
+    BATCH_SIZE = 1000
+
+    @abc.abstractmethod
+    def write(self, readings: t.Sequence[types.Reading]) -> None:
+        raise NotImplementedError()
+
+
+class RemoteWriteError(Exception):
+    """Error to be raised by RemoteWriter.write."""
+    pass
+
+
+class BufferedLogger:
+    """A resilient logger which logs to a local file and a RemoteWriter."""
+
+    WAIT_TIME = 2
+
+    def __init__(self, directory: str, writer: RemoteWriter):
+        self._writer = writer
+        self._path = pathlib.Path(directory)
+        self._file = _open_exclusive(self._path / BSON_FILENAME)
+        self._last_sent_path = self._path / LAST_SENT_FILENAME
+        last_sent = _read_last_sent(self._last_sent_path)
+        unsent = _read_unsent_and_advance(
+            self._file, last_sent)
+        self._send_queue = collections.deque(unsent)
+        self._running = False
+        self._remote_thread: t.Optional[threading.Thread] = None
+
+    def start(self) -> None:
+        """Starts the bit which logs to HTTP."""
+        self._running = True
+        self._remote_thread = threading.Thread(target=self._send_internal)
+        self._remote_thread.start()
+
+    def close(self) -> None:
+        """Stops the logger, closes all files, and stops writing to HTTP."""
+        fcntl.flock(self._file, fcntl.LOCK_UN)
+        self._file.close()
+        self._running = False
+        if self._remote_thread:
+            self._remote_thread.join()
+
+    def write(self, reading: types.Reading):
+        self._file.write(bson_encode(reading.as_dict()))
+        self._file.flush()
+        self._send_queue.append(reading)
+
+    def _send_internal(self) -> None:
+        to_send: t.List[types.Reading] = []
+        while True:
+            # Wait for multiple entries to build up in the queue.
+            time.sleep(self.WAIT_TIME)
+            while len(to_send) < self._writer.BATCH_SIZE:
+                # Pop all the values we can off the queue.
+                try:
+                    to_send.append(self._send_queue.popleft())
+                except IndexError:
+                    break
+            if not self._running:
+                # Stop if we've been asked to stop.
+                break
+            if not to_send:
+                # If there's nothing to send, don't try to send anything.
+                continue
+            try:
+                # Try writing out the values.
+                self._writer.write(to_send)
+            except RemoteWriteError:
+                pass  # If it fails, just try again next time.
+            else:
+                # If we succeeded, record our success.
+                last_sent = to_send[-1]
+                self._update_last_sent(last_sent.sample_time)
+                to_send.clear()
+
+    def _update_last_sent(self, timestamp: datetime.datetime) -> None:
+        last_sent_name = self._path / (LAST_SENT_FILENAME + ".new")
+        with last_sent_name.open('w') as outfile:
+            outfile.write(str(timestamp.timestamp()))
+        last_sent_name.rename(self._last_sent_path)
+
+
+def _open_or_create(path: pathlib.Path) -> t.BinaryIO:
+    while True:
+        try:
+            return path.open('r+b')
+        except FileNotFoundError:
+            pass
+        try:
+            return path.open('x+b')
+        except FileExistsError:
+            pass
+
+
+def _open_exclusive(path: pathlib.Path) -> t.BinaryIO:
+    file = _open_or_create(path)
+    try:
+        fcntl.flock(file, fcntl.LOCK_EX | fcntl.LOCK_NB)
+    except BlockingIOError as ex:
+        raise OSError('Another copy of the logger is running.') from ex
+    return file
+
+
+def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]:
+    try:
+        with path.open('r') as infile:
+            unix_ts = float(infile.read())
+    except (OSError, ValueError):
+        # If the last-written file is missing or corrupt, assume it is dead.
+        return None
+    return datetime.datetime.utcfromtimestamp(unix_ts).replace(
+        tzinfo=pytz.UTC)
+
+
+BSON_OPTIONS = bson.DEFAULT_CODEC_OPTIONS.with_options(
+    tz_aware=True, tzinfo=pytz.UTC)
+
+
+def bson_encode(data: t.Dict[str, t.Any]) -> bytes:
+    return bson.BSON.encode(data, codec_options=BSON_OPTIONS)
+
+
+def _read_unsent_and_advance(
+    infile: t.BinaryIO, last_sent: t.Optional[datetime.datetime],
+) -> t.List[types.Reading]:
+    """Reads all the unsent Readings and advances the file pointer to the end.
+    """
+    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
+    last_good = infile.tell()
+    unsent: t.List[types.Reading] = []
+    try:
+        for entry in reader:
+            last_good = infile.tell()
+            try:
+                reading = types.Reading(**entry)
+            except TypeError:
+                continue  # Invalid entry; skip it.
+            if not last_sent or last_sent < reading.sample_time:
+                unsent.append(reading)
+    except bson.InvalidBSON:
+        infile.seek(last_good, os.SEEK_SET)
+        infile.truncate(last_good)
+    return unsent
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/weatherlog/logger_test.py	Sat Sep 28 19:28:22 2019 -0400
@@ -0,0 +1,167 @@
+import contextlib
+import datetime
+import itertools
+import pathlib
+import tempfile
+import time
+import unittest
+
+import bson
+import pytz
+
+from . import logger
+from . import types
+
+
+class FakeWriter(logger.RemoteWriter):
+
+    BATCH_SIZE = 2
+
+    def __init__(self):
+        self.writes = []
+
+    def write(self, readings):
+        self.writes.append(tuple(readings))
+
+
+class FlakyWriter(logger.RemoteWriter):
+    is_bad = False
+
+    def __init__(self):
+        self.writer = FakeWriter()
+
+    def write(self, readings):
+        if self.is_bad:
+            raise logger.RemoteWriteError('I am bad!')
+        self.writer.write(readings)
+
+    @property
+    def writes(self):
+        return self.writer.writes
+
+
+def ts(t):
+    return datetime.datetime.utcfromtimestamp(t).replace(tzinfo=pytz.UTC)
+
+
+class LoggerTest(unittest.TestCase):
+
+    maxDiff = None
+
+    def setUp(self):
+        super().setUp()
+        self.temp_dir = tempfile.TemporaryDirectory()
+        self.temp_path = pathlib.Path(self.temp_dir.name)
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+        super().tearDown()
+
+    def test_from_nothing(self):
+        writer = FakeWriter()
+        with contextlib.closing(
+            logger.BufferedLogger(self.temp_dir.name, writer)
+        ) as bl:
+            bl.WAIT_TIME = 0.2
+            bl.write(types.Reading(ts(3), 1, 2))
+            bl.write(types.Reading(ts(6), 4, 5))
+            bl.write(types.Reading(ts(8), 10, 9))
+            bl.start()
+            time.sleep(1)  # Ensure that we get an entire logger cycle in.
+
+        self.assertEqual(
+            writer.writes,
+            [
+                (types.Reading(ts(3), 1, 2), types.Reading(ts(6), 4, 5)),
+                (types.Reading(ts(8), 10, 9),),
+            ],
+        )
+        self.assertEqual(self._read_last_sent(), '8.0')
+        self.assertEqual(self._read_bsons(), [
+            dict(sample_time=ts(3), temp_c=1, rh_pct=2),
+            dict(sample_time=ts(6), temp_c=4, rh_pct=5),
+            dict(sample_time=ts(8), temp_c=10, rh_pct=9),
+        ])
+
+    def test_append_and_resume(self):
+        existing_values = [
+            dict(sample_time=ts(10), temp_c=20, rh_pct=30),
+            dict(sample_time=ts(60), temp_c=10, rh_pct=99),
+        ]
+        with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile:
+            for value in existing_values:
+                outfile.write(logger.bson_encode(value))
+            outfile.write(b'non-BSON garbage')
+
+        with (self.temp_path / logger.LAST_SENT_FILENAME).open('w') as outfile:
+            outfile.write('10')
+
+        writer = FakeWriter()
+        with contextlib.closing(
+            logger.BufferedLogger(str(self.temp_path), writer)
+        ) as bl:
+            bl.WAIT_TIME = 0.2
+            bl.start()
+            time.sleep(0.5)
+            bl.write(types.Reading(ts(99), temp_c=-40, rh_pct=2))
+            time.sleep(0.5)
+
+        self.assertEqual(self._read_last_sent(), '99.0')
+        self.assertEqual(self._read_bsons(), [
+            dict(sample_time=ts(10), temp_c=20, rh_pct=30),
+            dict(sample_time=ts(60), temp_c=10, rh_pct=99),
+            dict(sample_time=ts(99), temp_c=-40, rh_pct=2),
+        ])
+        self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [
+            types.Reading(ts(60), 10, 99),
+            types.Reading(ts(99), -40, 2),
+        ])
+
+    def test_send_failure(self):
+        writer = FlakyWriter()
+        with contextlib.closing(
+            logger.BufferedLogger(str(self.temp_path), writer)
+        ) as bl:
+            bl.WAIT_TIME = 0.2
+            bl.start()
+            bl.write(types.Reading(ts(1337), 420, 69))
+            time.sleep(0.5)
+            writer.is_bad = True
+            bl.write(types.Reading(ts(31337), 666, 999))
+            time.sleep(0.5)
+
+            self.assertEqual(self._read_last_sent(), '1337.0')
+            self.assertEqual(
+                writer.writes, [(types.Reading(ts(1337), 420, 69),)])
+            self.assertEqual(self._read_bsons(), [
+                dict(sample_time=ts(1337), temp_c=420, rh_pct=69),
+                dict(sample_time=ts(31337), temp_c=666, rh_pct=999),
+            ])
+
+            # Ensure that we resume writing again when the condition clears.
+
+            writer.is_bad = False
+            time.sleep(0.5)
+        self.assertEqual(self._read_last_sent(), '31337.0')
+        self.assertEqual(
+            writer.writes,
+            [
+                (types.Reading(ts(1337), 420, 69),),
+                (types.Reading(ts(31337), 666, 999),),
+            ])
+        self.assertEqual(self._read_bsons(), [
+            dict(sample_time=ts(1337), temp_c=420, rh_pct=69),
+            dict(sample_time=ts(31337), temp_c=666, rh_pct=999),
+        ])
+
+    def _read_last_sent(self):
+        with (self.temp_path / logger.LAST_SENT_FILENAME).open('r') as infile:
+            return infile.read()
+
+    def _read_bsons(self):
+        with (self.temp_path / logger.BSON_FILENAME).open('rb') as infile:
+            return bson.decode_all(infile.read(), logger.BSON_OPTIONS)
+
+
+if __name__ == '__main__':
+    unittest.main()
--- a/weatherlog/reader.py	Fri Sep 27 21:42:47 2019 -0400
+++ b/weatherlog/reader.py	Sat Sep 28 19:28:22 2019 -0400
@@ -4,40 +4,20 @@
 """
 
 import abc
-import datetime
 import time
 import typing as t
 
 import adafruit_dht
-import attr
 import board
-import pytz
-
-
-def _utc_now() -> datetime.datetime:
-    """utcnow, but timezone-aware."""
-    return datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
-
 
-@attr.s(frozen=True, slots=True)
-class Reading(object):
-    """A single reading from a temperature/humidity sensor."""
-
-    # The temperature, in degrees Celsius.
-    temp_c = attr.ib(type=float)
-
-    # The relative humidity, in percent.
-    rh_pct = attr.ib(type=float)
-
-    # The timestamp of the reading.
-    timestamp = attr.ib(type=datetime.datetime, factory=_utc_now)
+from . import types
 
 
 class Reader(metaclass=abc.ABCMeta):
     """Interface for a thing which reads temperatures."""
 
     @abc.abstractmethod
-    def read(self) -> Reading:
+    def read(self) -> types.Reading:
         """Reads a value from the weather sensor."""
         raise NotImplementedError()
 
@@ -53,7 +33,7 @@
     _RH_PCT_EPSILON = 1.01
     _NEW_READING_SECS = 2.5
 
-    def read(self) -> Reading:
+    def read(self) -> types.Reading:
         """Reads a value from the sensor.  This will block until done."""
         temps: t.List[float] = []
         humids: t.List[float] = []
@@ -64,7 +44,7 @@
             if _is_reasonable_rh(rh):
                 humids.append(rh)
             try:
-                return Reading(
+                return types.Reading.from_now(
                     temp_c=_last_stable(temps, epsilon=self._TEMP_C_EPSILON),
                     rh_pct=_last_stable(humids, epsilon=self._RH_PCT_EPSILON))
             except ValueError:
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/weatherlog/types.py	Sat Sep 28 19:28:22 2019 -0400
@@ -0,0 +1,34 @@
+"""Basic datatypes for the weather station.
+"""
+
+import datetime
+
+import attr
+import pytz
+
+
+def _utc_now() -> datetime.datetime:
+    """utcnow, but timezone-aware."""
+    return datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
+
+
+@attr.s(frozen=True, slots=True)
+class Reading(object):
+    """A single reading from a temperature/humidity sensor."""
+
+    # The timestamp of the reading.
+    sample_time = attr.ib(type=datetime.datetime)
+
+    # The temperature, in degrees Celsius.
+    temp_c = attr.ib(type=float)
+
+    # The relative humidity, in percent.
+    rh_pct = attr.ib(type=float)
+
+    @classmethod
+    def from_now(cls, **kwargs) -> 'Reading':
+        """Creates a new reading taken at the current time."""
+        return cls(sample_time=_utc_now(), **kwargs)
+
+    def as_dict(self):
+        return attr.asdict(self, recurse=False)