changeset 21:beb42c835c52

Make weather server handle arbitrary data:

- Make logfile record arbitrary BSONs
- Make server handlers OK with same
- Make location type a normal class rather than attrs; have it handle
  its own logger
- Bump version number
author Paul Fisher <paul@pfish.zone>
date Sat, 19 Oct 2019 18:40:48 -0400
parents a7fe635d1c88
children e229afdd447b
files setup.py weather_server/locations.py weather_server/logfile.py weather_server/logfile_test.py weather_server/server.py weather_server/types.py
diffstat 6 files changed, 242 insertions(+), 126 deletions(-) [+]
line wrap: on
line diff
--- a/setup.py	Sun Oct 13 18:44:12 2019 -0400
+++ b/setup.py	Sat Oct 19 18:40:48 2019 -0400
@@ -2,7 +2,7 @@
 
 setuptools.setup(
     name='weather-server',
-    version='0.0.5',
+    version='0.0.6',
     packages=setuptools.find_packages(),
     python_requires='>=3.7',
     install_requires=[
--- a/weather_server/locations.py	Sun Oct 13 18:44:12 2019 -0400
+++ b/weather_server/locations.py	Sat Oct 19 18:40:48 2019 -0400
@@ -1,53 +1,95 @@
 """Manages the directory containing the various logs."""
 
 import configparser
+import datetime
 import pathlib
 import typing as t
 
-import attr
 import pytz
 
 from . import logfile
 from . import types
 
-
 CONFIG_FILE = 'config.ini'
 LOG = 'log.bson'
 
 
-@attr.s(frozen=True, slots=True)
-class LocationInfo:
-    name = attr.ib(type=str)
-    tz_name = attr.ib(type=str)
-    password = attr.ib(type=str)
+class ConfigError(Exception):
+    """Raised when a location can't be loaded."""
+
+
+class Location:
+
+    def __init__(self, root: pathlib.Path):
+        parser = configparser.ConfigParser(interpolation=None)
+        self.root = root
+        config_file = root / CONFIG_FILE
+        try:
+            with open(config_file, 'r') as infile:
+                parser.read_file(infile)
+            self.location = parser.get(
+                'location', 'name', fallback='Weather station')
+            self.tz_name = parser.get('location', 'timezone', fallback='UTC')
+            self.password = parser.get('location', 'password')
+            self.logger = logfile.Logger.create(
+                str(root / LOG), sample_field='sample_time')
+        except (IOError, KeyError, configparser.Error) as e:
+            raise ConfigError("Couldn't load location info.") from e
 
-    def timezone(self):
+    def record(
+        self,
+        entries: t.Iterable[t.Dict[str, object]],
+        timestamp: datetime.datetime,
+    ) -> None:
+        for e in entries:
+            e['ingest_time'] = timestamp
+        self.logger.write_rows(entries)
+
+    @property
+    def entries(self) -> t.Iterable[t.Dict[str, object]]:
+        return self.logger.data
+
+    def latest(self) -> t.Optional[types.Reading]:
+        most_recent = reversed(self.logger.data)
+        for entry in most_recent:
+            try:
+                return types.Reading.from_dict(entry)
+            except KeyError:
+                pass  # go to the older one.
+        return None
+
+    def timezone(self) -> datetime.tzinfo:
         try:
             return pytz.timezone(self.tz_name)
         except pytz.UnknownTimeZoneError:
             return pytz.UTC
 
-    @classmethod
-    def load(cls, config_file: pathlib.Path) -> 'LocationInfo':
-        parser = configparser.ConfigParser(interpolation=None)
-        parser.read(config_file)
-        return LocationInfo(
-            name=parser.get('location', 'name', fallback='Weather station'),
-            tz_name=parser.get('location', 'timezone', fallback='UTC'),
-            password=parser.get('location', 'password'))
+    def __repr__(self) -> str:
+        return '<Location in {!r}>'.format(self.root)
 
 
-class Locations:
-    def __init__(self, base: str):
-        self._path = pathlib.Path(base)
+class LocationFolder:
 
-    def paths(self) -> t.Tuple[str, ...]:
-        return tuple(sorted(f.name for f in self._path.iterdir()))
+    def __init__(self, root: pathlib.Path):
+        self.root = root
+        # locations, mtime
+        self.info: t.Tuple[t.Dict[str, Location], t.Optional[int]] = ({}, None)
+        self._maybe_reload()
+
+    def get(self, name: str) -> Location:
+        self._maybe_reload()
+        locs, _ = self.info
+        return locs[name]
 
-    def get(self, name) -> t.Tuple[LocationInfo, logfile.Logger]:
-        try:
-            directory = self._path / name
-            logger = logfile.Logger.create(str(directory / LOG))
-            return (LocationInfo.load(directory / CONFIG_FILE), logger)
-        except OSError:
-            raise KeyError(name)
+    def _maybe_reload(self) -> None:
+        new_mtime = self.root.stat().st_mtime_ns
+        _, old_mtime = self.info
+        if old_mtime == new_mtime:
+            return
+        locations = {}
+        for child in self.root.iterdir():
+            try:
+                locations[child.name] = Location(child)
+            except ConfigError:
+                pass  # It's OK. Skip this.
+        self.info = locations, new_mtime
--- a/weather_server/logfile.py	Sun Oct 13 18:44:12 2019 -0400
+++ b/weather_server/logfile.py	Sat Oct 19 18:40:48 2019 -0400
@@ -1,16 +1,37 @@
 """The part which handles writing things out and reading things in from CSV.
 """
 
+import concurrent.futures as futures
+import contextlib
 import fcntl
 import os
+import queue
 import threading
 import typing as t
 
 import bson
 
 from . import common
-from . import types
+
+
+class _WriteRequest:
 
+    def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]):
+        """Creates a request to write the given data to the log."""
+        # The data to be written.  We take ownership of all the dicts!
+        self.entries = entries
+        # Once written, a future that will resolve to None if successful.
+        self.future = futures.Future()
+
+
+class _ReadRequest:
+
+    def __init__(self):
+        # The future that will be set with the log's contents.
+        self.future = futures.Future()
+
+
+# All file access is serialized through a single writer thread fed by a queue.
 
 class Logger:
     """Logger which handles reading/writing a temperature log for one process.
@@ -20,93 +41,140 @@
     instances: t.Dict[str, 'Logger'] = {}
 
     @classmethod
-    def create(cls, filename: str) -> 'Logger':
+    def create(
+        cls,
+        filename: str,
+        *,
+        sample_field: str,
+    ) -> 'Logger':
         """Creates a single shared instance of a logger for the given file."""
         try:
-            return cls.instances[filename]
+            instance = cls.instances[filename]
         except KeyError:
             with cls.instance_lock:
                 try:
-                    return cls.instances[filename]
+                    instance = cls.instances[filename]
                 except KeyError:
-                    cls.instances[filename] = Logger(filename)
-                    return cls.instances[filename]
+                    cls.instances[filename] = Logger(
+                        filename,
+                        sample_field=sample_field)
+                    instance = cls.instances[filename]
+        if instance._sample_field != sample_field:
+            raise ValueError(
+                'Existing instance has different sample field: '
+                '{!r} != {!r}'.format(instance._sample_field, sample_field))
+        return instance
 
-    def __init__(self, filename: str):
+    def __init__(self, filename: str, *, sample_field: str):
         """You should probably call .create() instead."""
+        self._sample_field = sample_field
         self._file = _open_or_create(filename)
-        self._data: t.Tuple[types.Reading] = ()
+        self._data: t.List[t.Dict[str, t.Any]] = []
+        self._queue = queue.SimpleQueue()
         self._last_size = 0
-        self._maybe_read_data()
-        self._lock = threading.Lock()
+        self._lock_status: t.Optional[int] = None
+        self._writer_thread = threading.Thread(target=self._writer)
+        self._writer_thread.start()
+
+    @property
+    def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
+        req = _ReadRequest()
+        self._queue.put(req)
+        return req.future.result()
+
+    def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]):
+        req = _WriteRequest(entries)
+        self._queue.put(req)
+        return req.future.result()
+
+    _POISON = object()
+
+    def close(self):
+        self._queue.put(self._POISON)
+        self._writer_thread.join()
 
-    def _maybe_read_data(self) -> None:
+    def _writer(self) -> None:
+        running = True
+        while running:
+            item = self._queue.get()
+            if item is self._POISON:
+                # _POISON is the sentinel that makes us stop.
+                running = False
+            elif isinstance(item, _ReadRequest):
+                if not item.future.set_running_or_notify_cancel():
+                    continue
+                try:
+                    with self._file_lock(fcntl.LOCK_SH):
+                        self._catch_up()
+                except BaseException as x:
+                    item.future.set_exception(x)
+                else:
+                    item.future.set_result(tuple(self._data))
+            elif isinstance(item, _WriteRequest):
+                if not item.future.set_running_or_notify_cancel():
+                    continue
+                try:
+                    with self._file_lock(fcntl.LOCK_EX):
+                        self._catch_up()
+                        # Since we're at the last good point, truncate after.
+                        self._file.truncate(self._file.tell())
+                        if not self._data:
+                            last = None
+                        else:
+                            last = self._data[-1][self._sample_field]
+                        for entry in item.entries:
+                            entry_key = entry[self._sample_field]
+                            if last is None or last < entry_key:
+                                self._file.write(common.bson_encode(entry))
+                                self._data.append(entry)
+                                last = entry_key
+                        self._file.flush()
+                        self._last_size = self._file.tell()
+                except BaseException as x:
+                    item.future.set_exception(x)
+                else:
+                    item.future.set_result(None)
+            else:
+                raise AssertionError(
+                    'Unexpected item {!r} in the queue'.format(item))
+        self._file.close()
+
+    def _catch_up(self) -> None:
         """Reads data and advances the file pointer to the end of the file."""
-        # This must be called with both the file lock and _lock held.
+        assert self._lock_status is not None, 'The lock must be held.'
         size = self._size()
         if size == self._last_size:
             return
         last_good = self._file.tell()
-        data = list(self._data)
         try:
             items = bson.decode_file_iter(
                 self._file, codec_options=common.BSON_OPTIONS)
             for item in items:
                 last_good = self._file.tell()
-                try:
-                    data.append(types.Reading(**item))
-                except TypeError:
-                    pass  # Skip this item.
+                self._data.append(item)
         except bson.InvalidBSON:
             pass  # We have reached the last valid document.  Bail.
         # Seek back to immediately after the end of the last valid doc.
-        self._data = tuple(data)
-        self._file.truncate(last_good)
         self._last_size = last_good
         self._file.seek(last_good, os.SEEK_SET)
 
-    def write_rows(self, readings: t.Iterable[types.Reading]) -> None:
-        """Write a sorted series of readings, ignoring old ones."""
-        with self._lock:
-            fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
-            try:
-                self._maybe_read_data()
-                self._file.truncate(self._file.tell())
-                data = list(self._data)
-                if not data:
-                    last_time = None
-                else:
-                    last_time = data[-1].sample_time
-                for reading in readings:
-                    if not last_time or last_time < reading.sample_time:
-                        self._file.write(common.bson_encode(reading.as_dict()))
-                        data.append(reading)
-                self._data = tuple(data)
-            finally:
-                self._file.flush()
-                self._last_size = self._size()
-                fcntl.flock(self, fcntl.LOCK_UN)
-
     def fileno(self) -> int:
         return self._file.fileno()
 
-    def close(self):
-        self._file.close()
-
-    @property
-    def data(self) -> t.Tuple[types.Reading, ...]:
-        if self._size() != self._last_size:
-            fcntl.flock(self, fcntl.LOCK_SH)
-            try:
-                with self._lock:
-                    self._maybe_read_data()
-            finally:
-                fcntl.flock(self, fcntl.LOCK_UN)
-        return self._data
-
     def _size(self) -> int:
         return os.stat(self.fileno()).st_size
 
+    @contextlib.contextmanager
+    def _file_lock(self, operation: int):
+        assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.'
+        fcntl.flock(self, operation)
+        self._lock_status = operation
+        try:
+            yield
+        finally:
+            self._lock_status = None
+            fcntl.flock(self, fcntl.LOCK_UN)
+
 
 def _open_or_create(path: str) -> t.BinaryIO:
     while True:
--- a/weather_server/logfile_test.py	Sun Oct 13 18:44:12 2019 -0400
+++ b/weather_server/logfile_test.py	Sat Oct 19 18:40:48 2019 -0400
@@ -30,7 +30,9 @@
         super().tearDown()
 
     def test_empty(self):
-        with contextlib.closing(logfile.Logger(str(self.log_path))) as logger:
+        lg = logfile.Logger(
+            str(self.log_path), sample_field='x')
+        with contextlib.closing(lg) as logger:
             self.assertEqual(logger.data, ())
 
     def test_loading(self):
@@ -42,10 +44,19 @@
                 ingest_time=ts(125),
             )))
             outfile.write(b'garbage to ignore')
-        with contextlib.closing(logfile.Logger(str(self.log_path))) as logger:
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='sample_time',
+        )) as logger:
             self.assertEqual(
                 logger.data,
-                (types.Reading(ts(123), 420, 69, ts(125)),))
+                (
+                    dict(
+                        sample_time=ts(123),
+                        temp_c=420,
+                        rh_pct=69,
+                        ingest_time=ts(125)),
+                ))
 
     def test_writing(self):
         with self.log_path.open('wb') as outfile:
@@ -55,16 +66,20 @@
                 rh_pct=69,
                 ingest_time=ts(125),
             )))
-        with contextlib.closing(logfile.Logger(str(self.log_path))) as logger:
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='sample_time',
+        )) as logger:
             logger.write_rows([
-                types.Reading(ts(100), 999, 666, ts(101)),
-                types.Reading(ts(125), 333, 777, ts(200)),
+                # Ignored, since it's older than the newest entry.
+                types.Reading(ts(100), 999, 666, ts(101)).as_dict(),
+                types.Reading(ts(125), 333, 777, ts(200)).as_dict(),
             ])
             self.assertEqual(
                 logger.data,
                 (
-                    types.Reading(ts(123), 420, 69, ts(125)),
-                    types.Reading(ts(125), 333, 777, ts(200)),
+                    types.Reading(ts(123), 420, 69, ts(125)).as_dict(),
+                    types.Reading(ts(125), 333, 777, ts(200)).as_dict(),
                 )
             )
 
@@ -84,10 +99,13 @@
         ])
 
     def test_outside_writes(self):
-        with contextlib.closing(logfile.Logger(str(self.log_path))) as logger:
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='sample_time',
+        )) as logger:
             logger.write_rows([
-                types.Reading(ts(100), 999, 666, ts(101)),
-                types.Reading(ts(125), 333, 777, ts(200)),
+                types.Reading(ts(100), 999, 666, ts(101)).as_dict(),
+                types.Reading(ts(125), 333, 777, ts(200)).as_dict(),
             ])
             with self.log_path.open('ab') as outfile:
                 outfile.write(common.bson_encode(dict(
@@ -98,9 +116,9 @@
                 )))
                 outfile.flush()
             self.assertEqual(logger.data, (
-                types.Reading(ts(100), 999, 666, ts(101)),
-                types.Reading(ts(125), 333, 777, ts(200)),
-                types.Reading(ts(1024), 256, 128, ts(4096)),
+                types.Reading(ts(100), 999, 666, ts(101)).as_dict(),
+                types.Reading(ts(125), 333, 777, ts(200)).as_dict(),
+                types.Reading(ts(1024), 256, 128, ts(4096)).as_dict(),
             ))
 
     def read_bsons(self):
--- a/weather_server/server.py	Sun Oct 13 18:44:12 2019 -0400
+++ b/weather_server/server.py	Sat Oct 19 18:40:48 2019 -0400
@@ -1,17 +1,18 @@
 import datetime
 import hmac
+import pathlib
 import sys
 
 import bson
 import flask
+import pytz
 
 from . import common
 from . import locations
-from . import types
 
 
 def build_app(root_directory: str) -> flask.Flask:
-    locs = locations.Locations(root_directory)
+    locs = locations.LocationFolder(pathlib.Path(root_directory))
     app = flask.Flask(__name__)
     app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0
 
@@ -32,35 +33,27 @@
             preamble = next(reader)
             loc_name = preamble['location']
             password = str(preamble['password'])
-            loc, logger = locs.get(loc_name)
+            loc = locs.get(loc_name)
             if not hmac.compare_digest(password, loc.password):
                 flask.abort(400)
-            entries = [
-                types.Reading.from_now(
-                    sample_time=item['sample_time'],
-                    temp_c=item['temp_c'],
-                    rh_pct=item['rh_pct'],
-                )
-                for item in reader
-            ]
+            entries = tuple(reader)
         except (KeyError, bson.InvalidBSON):
             flask.abort(400)
-        logger.write_rows(entries)
+        now = datetime.datetime.now(tz=pytz.UTC)
+        loc.record(entries, now)
         return flask.jsonify({'status': 'OK'})
 
     @app.route('/<location>')
     def show(location: str):
         try:
-            loc, logger = locs.get(location)
+            loc = locs.get(location)
         except KeyError:
             flask.abort(404)
-        data = logger.data
-        if data:
-            last_reading = data[-1]
+        last_reading = loc.latest()
+        if last_reading:
             tz = loc.timezone()
             date = tz.normalize(last_reading.sample_time.astimezone(tz))
         else:
-            last_reading = None
             date = None
         return flask.render_template(
             'location.html',
@@ -71,7 +64,7 @@
     @app.route('/<location>/recent')
     def recent(location: str):
         try:
-            loc, logger = locs.get(location)
+            loc = locs.get(location)
         except KeyError:
             flask.abort(404)
         req = flask.request
@@ -83,10 +76,7 @@
 
         start = common.utc_now() - datetime.timedelta(seconds=seconds)
 
-        readings = [
-            r.as_dict() for r in logger.data
-            if start < r.sample_time
-        ]
+        readings = [r for r in loc.entries if start < r['sample_time']]
         resp = flask.Response()
         resp.content_type = 'application/json'
         resp.data = common.json_dumps({
--- a/weather_server/types.py	Sun Oct 13 18:44:12 2019 -0400
+++ b/weather_server/types.py	Sat Oct 19 18:40:48 2019 -0400
@@ -6,7 +6,8 @@
 
 import attr
 
-from . import common
+
+T = t.TypeVar('T')
 
 
 def c_to_f(c: float) -> float:
@@ -51,12 +52,9 @@
     def dew_point_f(self) -> float:
         return c_to_f(self.dew_point_c)
 
-    def as_dict(self) -> t.Dict[str, t.Any]:
-        return attr.asdict(self, recurse=False)
-
     @classmethod
-    def from_now(cls, **kwargs) -> 'Reading':
-        return cls(ingest_time=common.utc_now(), **kwargs)
+    def from_dict(cls: t.Type[T], d: t.Dict[str, t.Any]) -> T:
+        return cls(**{f.name: d[f.name] for f in attr.fields(cls)})
 
     @property
     def _gamma(self) -> float: