changeset 0:efe7a1eff167

Create initial logger for weather server.
author Paul Fisher <paul@pfish.zone>
date Sat, 28 Sep 2019 23:17:21 -0400
parents
children f66df122f18d
files weather_server/__init__.py weather_server/common.py weather_server/logfile.py weather_server/logfile_test.py weather_server/types.py
diffstat 4 files changed, 312 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/weather_server/common.py	Sat Sep 28 23:17:21 2019 -0400
@@ -0,0 +1,11 @@
+import typing as t
+
+import bson
+import pytz
+
# Decode datetimes as timezone-aware UTC values instead of naive datetimes.
BSON_OPTIONS = bson.DEFAULT_CODEC_OPTIONS.with_options(
    tz_aware=True, tzinfo=pytz.UTC)
+
+
def bson_encode(data: t.Dict[str, t.Any]) -> bytes:
    """Serialize *data* to BSON bytes using the shared codec options."""
    encoded: bytes = bson.BSON.encode(data, codec_options=BSON_OPTIONS)
    return encoded
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/weather_server/logfile.py	Sat Sep 28 23:17:21 2019 -0400
@@ -0,0 +1,120 @@
+"""The part which handles writing things out and reading things in from CSV.
+"""
+
+import fcntl
+import os
+import threading
+import typing as t
+
+import bson
+
+from . import common
+from . import types
+
+
class Logger:
    """Logger which handles reading/writing a temperature log for one process.

    Concurrency: an intra-process mutex (``self._lock``) plus an advisory
    inter-process ``flock`` on the backing file.  Lock order is always
    ``self._lock`` first, then the flock — both ``write_rows`` and ``data``
    follow it, so a reader thread and a writer thread cannot deadlock.
    """

    # Guards the shared per-filename instance registry below.
    instance_lock = threading.Lock()
    instances: t.Dict[str, 'Logger'] = {}

    @classmethod
    def create(cls, filename: str) -> 'Logger':
        """Creates a single shared instance of a logger for the given file."""
        # Double-checked locking: unlocked read as a fast path, then a
        # locked re-check so only one Logger is ever built per filename.
        try:
            return cls.instances[filename]
        except KeyError:
            with cls.instance_lock:
                try:
                    return cls.instances[filename]
                except KeyError:
                    cls.instances[filename] = Logger(filename)
                    return cls.instances[filename]

    def __init__(self, filename: str):
        """You should probably call .create() instead."""
        self._file = _open_or_create(filename)
        self._data: t.Tuple[types.Reading, ...] = ()
        self._last_size = 0
        # The mutex must exist before the initial read so that
        # _maybe_read_data's locking precondition is actually satisfied.
        self._lock = threading.Lock()
        with self._lock:
            # Exclusive flock: the initial read may truncate trailing
            # garbage left behind by an interrupted writer.
            fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
            try:
                self._maybe_read_data()
            finally:
                fcntl.flock(self._file.fileno(), fcntl.LOCK_UN)

    def _maybe_read_data(self) -> None:
        """Reads data and advances the file pointer to the end of the file."""
        # This must be called with both the file lock and _lock held.
        size = self._size()
        if size == self._last_size:
            return
        last_good = self._file.tell()
        data = list(self._data)
        try:
            items = bson.decode_file_iter(
                self._file, codec_options=common.BSON_OPTIONS)
            for item in items:
                last_good = self._file.tell()
                try:
                    data.append(types.Reading(**item))
                except TypeError:
                    pass  # Skip documents whose fields don't match Reading.
        except bson.InvalidBSON:
            pass  # We have reached the last valid document.  Bail.
        # Seek back to immediately after the end of the last valid doc,
        # discarding any trailing partial/corrupt document.
        self._data = tuple(data)
        self._file.truncate(last_good)
        self._last_size = last_good
        self._file.seek(last_good, os.SEEK_SET)

    def write_rows(self, readings: t.Iterable[types.Reading]) -> None:
        """Write a sorted series of readings, ignoring old ones."""
        with self._lock:
            fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
            try:
                self._maybe_read_data()
                self._file.truncate(self._file.tell())
                data = list(self._data)
                last_time = data[-1].sample_time if data else None
                for reading in readings:
                    # Only readings newer than the newest already on disk
                    # are appended; older ones are silently dropped.
                    if not last_time or last_time < reading.sample_time:
                        self._file.write(common.bson_encode(reading.as_dict()))
                        data.append(reading)
                self._data = tuple(data)
            finally:
                self._file.flush()
                self._last_size = self._size()
                fcntl.flock(self._file.fileno(), fcntl.LOCK_UN)

    def fileno(self) -> int:
        """The underlying file descriptor (also makes flock(self) work)."""
        return self._file.fileno()

    def close(self) -> None:
        """Closes the backing file; the Logger is unusable afterwards."""
        self._file.close()

    @property
    def data(self) -> t.Tuple[types.Reading, ...]:
        """All known readings, refreshed if the file grew behind our back."""
        if self._size() != self._last_size:
            # Same lock order as write_rows (_lock, then flock) to avoid a
            # lock-order inversion.  The flock is exclusive because
            # _maybe_read_data may truncate trailing garbage.
            with self._lock:
                fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
                try:
                    self._maybe_read_data()
                finally:
                    fcntl.flock(self._file.fileno(), fcntl.LOCK_UN)
        return self._data

    def _size(self) -> int:
        """Size in bytes of the backing file, per fstat."""
        return os.stat(self.fileno()).st_size
+
+
+def _open_or_create(path: str) -> t.BinaryIO:
+    while True:
+        try:
+            return open(path, 'r+b')
+        except FileNotFoundError:
+            pass
+        try:
+            return open(path, 'x+b')
+        except FileExistsError:
+            pass
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/weather_server/logfile_test.py	Sat Sep 28 23:17:21 2019 -0400
@@ -0,0 +1,113 @@
+import contextlib
+import datetime
+import pathlib
+import tempfile
+import unittest
+
+import bson
+import pytz
+
+from . import common
+from . import logfile
+from . import types
+
+
def ts(n):
    """Return a timezone-aware UTC datetime for POSIX timestamp *n*."""
    naive = datetime.datetime.utcfromtimestamp(n)
    return naive.replace(tzinfo=pytz.UTC)
+
+
class LoggerTest(unittest.TestCase):
    """Integration tests for logfile.Logger against a real temporary file."""

    maxDiff = None

    def setUp(self):
        super().setUp()
        # Fresh directory per test; the log file may or may not exist yet.
        self.temp_dir = tempfile.TemporaryDirectory()
        self.log_path = pathlib.Path(self.temp_dir.name) / 'test.bson'

    def tearDown(self):
        self.temp_dir.cleanup()
        super().tearDown()

    def test_empty(self):
        # Opening a nonexistent log creates it and yields no readings.
        with contextlib.closing(logfile.Logger(str(self.log_path))) as logger:
            self.assertEqual(logger.data, ())

    def test_loading(self):
        # Pre-populate the file with one valid BSON document followed by
        # trailing garbage; the garbage must be discarded on load.
        with self.log_path.open('wb') as outfile:
            outfile.write(common.bson_encode(dict(
                sample_time=ts(123),
                temp_c=420,
                rh_pct=69,
                ingest_time=ts(125),
            )))
            outfile.write(b'garbage to ignore')
        with contextlib.closing(logfile.Logger(str(self.log_path))) as logger:
            self.assertEqual(
                logger.data,
                (types.Reading(ts(123), 420, 69, ts(125)),))

    def test_writing(self):
        with self.log_path.open('wb') as outfile:
            outfile.write(common.bson_encode(dict(
                sample_time=ts(123),
                temp_c=420,
                rh_pct=69,
                ingest_time=ts(125),
            )))
        with contextlib.closing(logfile.Logger(str(self.log_path))) as logger:
            # The ts(100) reading is older than the newest already on disk
            # (ts(123)) and should be dropped; ts(125) should be appended.
            logger.write_rows([
                types.Reading(ts(100), 999, 666, ts(101)),
                types.Reading(ts(125), 333, 777, ts(200)),
            ])
            self.assertEqual(
                logger.data,
                (
                    types.Reading(ts(123), 420, 69, ts(125)),
                    types.Reading(ts(125), 333, 777, ts(200)),
                )
            )

        # The on-disk file should contain exactly the kept readings, in order.
        self.assertEqual(self.read_bsons(), [
            dict(
                sample_time=ts(123),
                temp_c=420,
                rh_pct=69,
                ingest_time=ts(125),
            ),
            dict(
                sample_time=ts(125),
                temp_c=333,
                rh_pct=777,
                ingest_time=ts(200),
            ),
        ])

    def test_outside_writes(self):
        # Readings appended to the file by another writer should be picked
        # up the next time .data is read.
        with contextlib.closing(logfile.Logger(str(self.log_path))) as logger:
            logger.write_rows([
                types.Reading(ts(100), 999, 666, ts(101)),
                types.Reading(ts(125), 333, 777, ts(200)),
            ])
            with self.log_path.open('ab') as outfile:
                outfile.write(common.bson_encode(dict(
                    sample_time=ts(1024),
                    temp_c=256,
                    rh_pct=128,
                    ingest_time=ts(4096),
                )))
                outfile.flush()
            self.assertEqual(logger.data, (
                types.Reading(ts(100), 999, 666, ts(101)),
                types.Reading(ts(125), 333, 777, ts(200)),
                types.Reading(ts(1024), 256, 128, ts(4096)),
            ))

    def read_bsons(self):
        """Decode and return every BSON document currently in the log file."""
        with self.log_path.open('rb') as infile:
            return bson.decode_all(
                infile.read(), common.BSON_OPTIONS)
+
+
+if __name__ == '__main__':
+    unittest.main()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/weather_server/types.py	Sat Sep 28 23:17:21 2019 -0400
@@ -0,0 +1,68 @@
+"""Basic data types for the weather server."""
+
+import datetime
+import math
+import typing as t
+
+import attr
+import pytz
+
+
def c_to_f(c: float) -> float:
    """Return *c* degrees Celsius expressed in degrees Fahrenheit."""
    scaled = c * 9 / 5
    return scaled + 32
+
+
+# Values from Sontag1990 via Wikipedia:
+# https://en.wikipedia.org/wiki/Dew_point
_MAGNUS_B = 17.62   # dimensionless (multiplies T/(C+T) in _gamma)
_MAGNUS_C = 243.12  # degrees Celsius (added to temp_c, so shares its units)
+
+
@attr.s(frozen=True, slots=True)
class Reading:
    """A single reading from a weather thingy.

    Field order is important, as it is used in CSV files.
    """

    # The Unix timestamp of the reading.
    sample_time = attr.ib(type=datetime.datetime)

    # The temperature, in degrees Celsius.
    temp_c = attr.ib(type=float)

    # The relative humidity, in percent.
    rh_pct = attr.ib(type=float)

    # The Unix timestamp when the reading was received.
    ingest_time = attr.ib(type=datetime.datetime)

    @property
    def temp_f(self) -> float:
        """The temperature, in degrees Fahrenheit."""
        return c_to_f(self.temp_c)

    @property
    def dew_point_c(self) -> float:
        """The dew point, in degrees Celsius, via the Magnus formula."""
        gamma = self._gamma
        return _MAGNUS_C * gamma / (_MAGNUS_B - gamma)

    @property
    def dew_point_f(self) -> float:
        """The dew point, in degrees Fahrenheit."""
        return c_to_f(self.dew_point_c)

    def as_dict(self) -> t.Dict[str, t.Any]:
        """This reading's fields as a dict, e.g. for BSON encoding."""
        return attr.asdict(self, recurse=False)

    @classmethod
    def from_now(cls, **kwargs) -> 'Reading':
        """Builds a Reading whose ingest_time is the current UTC time."""
        return cls(ingest_time=_utc_now(), **kwargs)

    @property
    def _gamma(self) -> float:
        # Magnus-formula intermediate term gamma(T, RH).  NOTE(review):
        # rh_pct must be > 0 or math.log raises ValueError — confirm callers
        # guarantee this.
        return (
            math.log(self.rh_pct / 100) +
            _MAGNUS_B * self.temp_c / (_MAGNUS_C + self.temp_c))
+
+
def _utc_now():
    """The current moment as a timezone-aware UTC datetime."""
    now = datetime.datetime.utcnow()
    return now.replace(tzinfo=pytz.UTC)