# HG changeset patch # User Paul Fisher # Date 1569713302 14400 # Node ID 885bff085edf18040fff9cfd030b7267cf8a2fed # Parent 1abd0cc4caa492a4efcbfd6befd2175ea90d114f Add remote logger class for eventual HTTP writer. diff -r 1abd0cc4caa4 -r 885bff085edf __init__.py diff -r 1abd0cc4caa4 -r 885bff085edf weatherlog/logger.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/weatherlog/logger.py Sat Sep 28 19:28:22 2019 -0400 @@ -0,0 +1,167 @@ +"""Module to log temperatures to a directory, and an external website.""" + +import abc +import collections +import datetime +import fcntl +import os +import pathlib +import threading +import time +import typing as t + +import bson +import pytz + +from . import types + +BSON_FILENAME = "temps.bson" +LAST_SENT_FILENAME = "last-sent" + + +class RemoteWriter(metaclass=abc.ABCMeta): + + BATCH_SIZE = 1000 + + @abc.abstractmethod + def write(self, readings: t.Sequence[types.Reading]) -> None: + raise NotImplementedError() + + +class RemoteWriteError(Exception): + """Error to be raised by RemoteWriter.write.""" + pass + + +class BufferedLogger: + """A resilient logger which logs to a local file and a RemoteWriter.""" + + WAIT_TIME = 2 + + def __init__(self, directory: str, writer: RemoteWriter): + self._writer = writer + self._path = pathlib.Path(directory) + self._file = _open_exclusive(self._path / BSON_FILENAME) + self._last_sent_path = self._path / LAST_SENT_FILENAME + last_sent = _read_last_sent(self._last_sent_path) + unsent = _read_unsent_and_advance( + self._file, last_sent) + self._send_queue = collections.deque(unsent) + self._running = False + self._remote_thread: t.Optional[threading.Thread] = None + + def start(self) -> None: + """Starts the bit which logs to HTTP.""" + self._running = True + self._remote_thread = threading.Thread(target=self._send_internal) + self._remote_thread.start() + + def close(self) -> None: + """Stops the logger, closes all files, and stops writing to HTTP.""" + fcntl.flock(self._file, fcntl.LOCK_UN) + self._file.close() + self._running = False + if self._remote_thread: + self._remote_thread.join() + + def write(self, reading: types.Reading): + self._file.write(bson_encode(reading.as_dict())) + self._file.flush() + self._send_queue.append(reading) + + def _send_internal(self) -> None: + to_send: t.List[types.Reading] = [] + while True: + # Wait for multiple entries to build up in the queue. + time.sleep(self.WAIT_TIME) + while len(to_send) < self._writer.BATCH_SIZE: + # Pop all the values we can off the queue. + try: + to_send.append(self._send_queue.popleft()) + except IndexError: + break + if not self._running: + # Stop if we've been asked to stop. + break + if not to_send: + # If there's nothing to send, don't try to send anything. + continue + try: + # Try writing out the values. + self._writer.write(to_send) + except RemoteWriteError: + pass # If it fails, just try again next time. + else: + # If we succeeded, record our success. + last_sent = to_send[-1] + self._update_last_sent(last_sent.sample_time) + to_send.clear() + + def _update_last_sent(self, timestamp: datetime.datetime) -> None: + last_sent_name = self._path / (LAST_SENT_FILENAME + ".new") + with last_sent_name.open('w') as outfile: + outfile.write(str(timestamp.timestamp())) + last_sent_name.rename(self._last_sent_path) + + +def _open_or_create(path: pathlib.Path) -> t.BinaryIO: + while True: + try: + return path.open('r+b') + except FileNotFoundError: + pass + try: + return path.open('x+b') + except FileExistsError: + pass + + +def _open_exclusive(path: pathlib.Path) -> t.BinaryIO: + file = _open_or_create(path) + try: + fcntl.flock(file, fcntl.LOCK_EX | fcntl.LOCK_NB) + except BlockingIOError as ex: + raise OSError('Another copy of the logger is running.') from ex + return file + + +def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]: + try: + with path.open('r') as infile: + unix_ts = float(infile.read()) + except (OSError, ValueError): + # If the last-written file is missing or corrupt, assume it is dead. + return None + return datetime.datetime.utcfromtimestamp(unix_ts).replace( + tzinfo=pytz.UTC) + + +BSON_OPTIONS = bson.DEFAULT_CODEC_OPTIONS.with_options( + tz_aware=True, tzinfo=pytz.UTC) + + +def bson_encode(data: t.Dict[str, t.Any]) -> bytes: + return bson.BSON.encode(data, codec_options=BSON_OPTIONS) + + +def _read_unsent_and_advance( + infile: t.BinaryIO, last_sent: t.Optional[datetime.datetime], +) -> t.List[types.Reading]: + """Reads all the unsent Readings and advances the file pointer to the end. + """ + reader = bson.decode_file_iter(infile, BSON_OPTIONS) + last_good = infile.tell() + unsent: t.List[types.Reading] = [] + try: + for entry in reader: + last_good = infile.tell() + try: + reading = types.Reading(**entry) + except TypeError: + continue # Invalid entry; skip it. + if not last_sent or last_sent < reading.sample_time: + unsent.append(reading) + except bson.InvalidBSON: + infile.seek(last_good, os.SEEK_SET) + infile.truncate(last_good) + return unsent diff -r 1abd0cc4caa4 -r 885bff085edf weatherlog/logger_test.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/weatherlog/logger_test.py Sat Sep 28 19:28:22 2019 -0400 @@ -0,0 +1,167 @@ +import contextlib +import datetime +import itertools +import pathlib +import tempfile +import time +import unittest + +import bson +import pytz + +from . import logger +from . import types + + +class FakeWriter(logger.RemoteWriter): + + BATCH_SIZE = 2 + + def __init__(self): + self.writes = [] + + def write(self, readings): + self.writes.append(tuple(readings)) + + +class FlakyWriter(logger.RemoteWriter): + is_bad = False + + def __init__(self): + self.writer = FakeWriter() + + def write(self, readings): + if self.is_bad: + raise logger.RemoteWriteError('I am bad!') + self.writer.write(readings) + + @property + def writes(self): + return self.writer.writes + + +def ts(t): + return datetime.datetime.utcfromtimestamp(t).replace(tzinfo=pytz.UTC) + + +class LoggerTest(unittest.TestCase): + + maxDiff = None + + def setUp(self): + super().setUp() + self.temp_dir = tempfile.TemporaryDirectory() + self.temp_path = pathlib.Path(self.temp_dir.name) + + def tearDown(self): + self.temp_dir.cleanup() + super().tearDown() + + def test_from_nothing(self): + writer = FakeWriter() + with contextlib.closing( + logger.BufferedLogger(self.temp_dir.name, writer) + ) as bl: + bl.WAIT_TIME = 0.2 + bl.write(types.Reading(ts(3), 1, 2)) + bl.write(types.Reading(ts(6), 4, 5)) + bl.write(types.Reading(ts(8), 10, 9)) + bl.start() + time.sleep(1) # Ensure that we get an entire logger cycle in. + + self.assertEqual( + writer.writes, + [ + (types.Reading(ts(3), 1, 2), types.Reading(ts(6), 4, 5)), + (types.Reading(ts(8), 10, 9),), + ], + ) + self.assertEqual(self._read_last_sent(), '8.0') + self.assertEqual(self._read_bsons(), [ + dict(sample_time=ts(3), temp_c=1, rh_pct=2), + dict(sample_time=ts(6), temp_c=4, rh_pct=5), + dict(sample_time=ts(8), temp_c=10, rh_pct=9), + ]) + + def test_append_and_resume(self): + existing_values = [ + dict(sample_time=ts(10), temp_c=20, rh_pct=30), + dict(sample_time=ts(60), temp_c=10, rh_pct=99), + ] + with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile: + for value in existing_values: + outfile.write(logger.bson_encode(value)) + outfile.write(b'non-BSON garbage') + + with (self.temp_path / logger.LAST_SENT_FILENAME).open('w') as outfile: + outfile.write('10') + + writer = FakeWriter() + with contextlib.closing( + logger.BufferedLogger(str(self.temp_path), writer) + ) as bl: + bl.WAIT_TIME = 0.2 + bl.start() + time.sleep(0.5) + bl.write(types.Reading(ts(99), temp_c=-40, rh_pct=2)) + time.sleep(0.5) + + self.assertEqual(self._read_last_sent(), '99.0') + self.assertEqual(self._read_bsons(), [ + dict(sample_time=ts(10), temp_c=20, rh_pct=30), + dict(sample_time=ts(60), temp_c=10, rh_pct=99), + dict(sample_time=ts(99), temp_c=-40, rh_pct=2), + ]) + self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [ + types.Reading(ts(60), 10, 99), + types.Reading(ts(99), -40, 2), + ]) + + def test_send_failure(self): + writer = FlakyWriter() + with contextlib.closing( + logger.BufferedLogger(str(self.temp_path), writer) + ) as bl: + bl.WAIT_TIME = 0.2 + bl.start() + bl.write(types.Reading(ts(1337), 420, 69)) + time.sleep(0.5) + writer.is_bad = True + bl.write(types.Reading(ts(31337), 666, 999)) + time.sleep(0.5) + + self.assertEqual(self._read_last_sent(), '1337.0') + self.assertEqual( + writer.writes, [(types.Reading(ts(1337), 420, 69),)]) + self.assertEqual(self._read_bsons(), [ + dict(sample_time=ts(1337), temp_c=420, rh_pct=69), + dict(sample_time=ts(31337), temp_c=666, rh_pct=999), + ]) + + # Ensure that we resume writing again when the condition clears. + + writer.is_bad = False + time.sleep(0.5) + self.assertEqual(self._read_last_sent(), '31337.0') + self.assertEqual( + writer.writes, + [ + (types.Reading(ts(1337), 420, 69),), + (types.Reading(ts(31337), 666, 999),), + ]) + self.assertEqual(self._read_bsons(), [ + dict(sample_time=ts(1337), temp_c=420, rh_pct=69), + dict(sample_time=ts(31337), temp_c=666, rh_pct=999), + ]) + + def _read_last_sent(self): + with (self.temp_path / logger.LAST_SENT_FILENAME).open('r') as infile: + return infile.read() + + def _read_bsons(self): + with (self.temp_path / logger.BSON_FILENAME).open('rb') as infile: + return bson.decode_all(infile.read(), logger.BSON_OPTIONS) + + +if __name__ == '__main__': + unittest.main() diff -r 1abd0cc4caa4 -r 885bff085edf weatherlog/reader.py --- a/weatherlog/reader.py Fri Sep 27 21:42:47 2019 -0400 +++ b/weatherlog/reader.py Sat Sep 28 19:28:22 2019 -0400 @@ -4,40 +4,20 @@ """ import abc -import datetime import time import typing as t import adafruit_dht -import attr import board -import pytz - - -def _utc_now() -> datetime.datetime: - """utcnow, but timezone-aware.""" - return datetime.datetime.utcnow().replace(tzinfo=pytz.UTC) - -@attr.s(frozen=True, slots=True) -class Reading(object): - """A single reading from a temperature/humidity sensor.""" - - # The temperature, in degrees Celsius. - temp_c = attr.ib(type=float) - - # The relative humidity, in percent. - rh_pct = attr.ib(type=float) - - # The timestamp of the reading. - timestamp = attr.ib(type=datetime.datetime, factory=_utc_now) +from . import types class Reader(metaclass=abc.ABCMeta): """Interface for a thing which reads temperatures.""" @abc.abstractmethod - def read(self) -> Reading: + def read(self) -> types.Reading: """Reads a value from the weather sensor.""" raise NotImplementedError() @@ -53,7 +33,7 @@ _RH_PCT_EPSILON = 1.01 _NEW_READING_SECS = 2.5 - def read(self) -> Reading: + def read(self) -> types.Reading: """Reads a value from the sensor. This will block until done.""" temps: t.List[float] = [] humids: t.List[float] = [] @@ -64,7 +44,7 @@ if _is_reasonable_rh(rh): humids.append(rh) try: - return Reading( + return types.Reading.from_now( temp_c=_last_stable(temps, epsilon=self._TEMP_C_EPSILON), rh_pct=_last_stable(humids, epsilon=self._RH_PCT_EPSILON)) except ValueError: diff -r 1abd0cc4caa4 -r 885bff085edf weatherlog/types.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/weatherlog/types.py Sat Sep 28 19:28:22 2019 -0400 @@ -0,0 +1,34 @@ +"""Basic datatypes for the weather station. +""" + +import datetime + +import attr +import pytz + + +def _utc_now() -> datetime.datetime: + """utcnow, but timezone-aware.""" + return datetime.datetime.utcnow().replace(tzinfo=pytz.UTC) + + +@attr.s(frozen=True, slots=True) +class Reading(object): + """A single reading from a temperature/humidity sensor.""" + + # The timestamp of the reading. + sample_time = attr.ib(type=datetime.datetime) + + # The temperature, in degrees Celsius. + temp_c = attr.ib(type=float) + + # The relative humidity, in percent. + rh_pct = attr.ib(type=float) + + @classmethod + def from_now(cls, **kwargs) -> 'Reading': + """Creates a new reading taken at the current time.""" + return cls(sample_time=_utc_now(), **kwargs) + + def as_dict(self): + return attr.asdict(self, recurse=False)