diff weather_server/logfile.py @ 21:beb42c835c52

Make weather server handle arbitrary data:
- Make logfile record arbitrary BSONs
- Make server handlers OK with same
- Make location type a normal class rather than attrs; have it handle its own logger.
- Bump version number.
author Paul Fisher <paul@pfish.zone>
date Sat, 19 Oct 2019 18:40:48 -0400
parents efe7a1eff167
children 20c8ec56e447
line wrap: on
line diff
--- a/weather_server/logfile.py	Sun Oct 13 18:44:12 2019 -0400
+++ b/weather_server/logfile.py	Sat Oct 19 18:40:48 2019 -0400
@@ -1,16 +1,37 @@
 """The part which handles writing things out and reading things in from CSV.
 """
 
+import concurrent.futures as futures
+import contextlib
 import fcntl
 import os
+import queue
 import threading
 import typing as t
 
 import bson
 
 from . import common
-from . import types
+
+
+class _WriteRequest:
 
+    def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]):
+        """Creates a request to write the given data to the log."""
+        # The data to be written.  We take ownership of all the dicts!
+        self.entries = entries
+        # Once written, a future that will resolve to None if successful.
+        self.future = futures.Future()
+
+
+class _ReadRequest:
+
+    def __init__(self):
+        # The future that will be set with the log's contents.
+        self.future = futures.Future()
+
+
+# File access is serialized through a per-file writer thread fed by a queue.
 
 class Logger:
     """Logger which handles reading/writing a temperature log for one process.
@@ -20,93 +41,140 @@
     instances: t.Dict[str, 'Logger'] = {}
 
     @classmethod
-    def create(cls, filename: str) -> 'Logger':
+    def create(
+        cls,
+        filename: str,
+        *,
+        sample_field: str,
+    ) -> 'Logger':
         """Creates a single shared instance of a logger for the given file."""
         try:
-            return cls.instances[filename]
+            instance = cls.instances[filename]
         except KeyError:
             with cls.instance_lock:
                 try:
-                    return cls.instances[filename]
+                    instance = cls.instances[filename]
                 except KeyError:
-                    cls.instances[filename] = Logger(filename)
-                    return cls.instances[filename]
+                    cls.instances[filename] = Logger(
+                        filename,
+                        sample_field=sample_field)
+                    instance = cls.instances[filename]
+        if instance._sample_field != sample_field:
+            raise ValueError(
+                'Existing instance has different sample field: '
+                '{!r} != {!r}'.format(instance._sample_field, sample_field))
+        return instance
 
-    def __init__(self, filename: str):
+    def __init__(self, filename: str, *, sample_field: str):
         """You should probably call .create() instead."""
+        self._sample_field = sample_field
         self._file = _open_or_create(filename)
-        self._data: t.Tuple[types.Reading] = ()
+        self._data: t.List[t.Dict[str, t.Any]] = []
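+        # All reads and writes are funneled through this queue to a single
+        # writer thread, which serializes access to the file.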
+        self._queue = queue.SimpleQueue()
         self._last_size = 0
-        self._maybe_read_data()
-        self._lock = threading.Lock()
+        self._lock_status: t.Optional[int] = None
+        self._writer_thread = threading.Thread(target=self._writer)
+        self._writer_thread.start()
+
+    @property
+    def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
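+        """Returns a snapshot of every entry currently in the log."""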
+        req = _ReadRequest()
+        self._queue.put(req)
+        return req.future.result()
+
+    def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]) -> None:
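+        """Writes entries to the log, blocking until the write completes."""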
+        req = _WriteRequest(entries)
+        self._queue.put(req)
+        return req.future.result()
+
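+    # Sentinel enqueued by close() to tell the writer thread to shut down.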
+    _POISON = object()
+
+    def close(self):
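+        """Stops the writer thread and closes the underlying file."""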
+        self._queue.put(self._POISON)
+        self._writer_thread.join()
 
-    def _maybe_read_data(self) -> None:
+    def _writer(self) -> None:
+        running = True
+        while running:
+            item = self._queue.get()
+            if item is self._POISON:
+                # _POISON is the sentinel that tells us to stop.
+                running = False
+            elif isinstance(item, _ReadRequest):
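+                # Skip requests whose futures were cancelled before we ran.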
+                if not item.future.set_running_or_notify_cancel():
+                    continue
+                try:
+                    with self._file_lock(fcntl.LOCK_SH):
+                        self._catch_up()
+                except BaseException as x:
+                    item.future.set_exception(x)
+                else:
+                    item.future.set_result(tuple(self._data))
+            elif isinstance(item, _WriteRequest):
+                if not item.future.set_running_or_notify_cancel():
+                    continue
+                try:
+                    with self._file_lock(fcntl.LOCK_EX):
+                        self._catch_up()
+                        # Since we're at the last good point, truncate after.
+                        self._file.truncate(self._file.tell())
+                        if not self._data:
+                            last = None
+                        else:
+                            last = self._data[-1][self._sample_field]
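+                        # Append only entries strictly newer than the last
+                        # recorded sample, keeping the log monotonic.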
+                        for entry in item.entries:
+                            entry_key = entry[self._sample_field]
+                            if last is None or last < entry_key:
+                                self._file.write(common.bson_encode(entry))
+                                self._data.append(entry)
+                                last = entry_key
+                        self._file.flush()
+                        self._last_size = self._file.tell()
+                except BaseException as x:
+                    item.future.set_exception(x)
+                else:
+                    item.future.set_result(None)
+            else:
+                raise AssertionError(
+                    'Unexpected item {!r} in the queue'.format(item))
+        self._file.close()
+
+    def _catch_up(self) -> None:
         """Reads data and advances the file pointer to the end of the file."""
-        # This must be called with both the file lock and _lock held.
+        assert self._lock_status is not None, 'The lock must be held.'
         size = self._size()
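+        # Nothing new has been appended since the last read; bail early.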
         if size == self._last_size:
             return
         last_good = self._file.tell()
-        data = list(self._data)
         try:
             items = bson.decode_file_iter(
                 self._file, codec_options=common.BSON_OPTIONS)
             for item in items:
                 last_good = self._file.tell()
-                try:
-                    data.append(types.Reading(**item))
-                except TypeError:
-                    pass  # Skip this item.
+                self._data.append(item)
         except bson.InvalidBSON:
             pass  # We have reached the last valid document.  Bail.
         # Seek back to immediately after the end of the last valid doc.
-        self._data = tuple(data)
-        self._file.truncate(last_good)
         self._last_size = last_good
         self._file.seek(last_good, os.SEEK_SET)
 
-    def write_rows(self, readings: t.Iterable[types.Reading]) -> None:
-        """Write a sorted series of readings, ignoring old ones."""
-        with self._lock:
-            fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
-            try:
-                self._maybe_read_data()
-                self._file.truncate(self._file.tell())
-                data = list(self._data)
-                if not data:
-                    last_time = None
-                else:
-                    last_time = data[-1].sample_time
-                for reading in readings:
-                    if not last_time or last_time < reading.sample_time:
-                        self._file.write(common.bson_encode(reading.as_dict()))
-                        data.append(reading)
-                self._data = tuple(data)
-            finally:
-                self._file.flush()
-                self._last_size = self._size()
-                fcntl.flock(self, fcntl.LOCK_UN)
-
     def fileno(self) -> int:
         return self._file.fileno()
 
-    def close(self):
-        self._file.close()
-
-    @property
-    def data(self) -> t.Tuple[types.Reading, ...]:
-        if self._size() != self._last_size:
-            fcntl.flock(self, fcntl.LOCK_SH)
-            try:
-                with self._lock:
-                    self._maybe_read_data()
-            finally:
-                fcntl.flock(self, fcntl.LOCK_UN)
-        return self._data
-
     def _size(self) -> int:
         return os.stat(self.fileno()).st_size
 
+    @contextlib.contextmanager
+    def _file_lock(self, operation: int):
+        assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.'
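+        # flock() accepts any object with a fileno() method, so pass self.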
+        fcntl.flock(self, operation)
+        self._lock_status = operation
+        try:
+            yield
+        finally:
+            self._lock_status = None
+            fcntl.flock(self, fcntl.LOCK_UN)
+
 
 def _open_or_create(path: str) -> t.BinaryIO:
     while True: