diff weather_server/logfile.py @ 0:efe7a1eff167

Create initial logger for weather server.
author Paul Fisher <paul@pfish.zone>
date Sat, 28 Sep 2019 23:17:21 -0400
parents
children beb42c835c52
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/weather_server/logfile.py	Sat Sep 28 23:17:21 2019 -0400
@@ -0,0 +1,120 @@
+"""The part which handles writing things out and reading things in from CSV.
+"""
+
+import fcntl
+import os
+import threading
+import typing as t
+
+import bson
+
+from . import common
+from . import types
+
+
+class Logger:
+    """Logger which handles reading/writing a temperature log for one process.
+    """
+
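+    # Process-wide registry of shared Logger instances, guarded by
+    # instance_lock so only one Logger exists per log file.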
+    instance_lock = threading.Lock()
+    instances: t.Dict[str, 'Logger'] = {}
+
+    @classmethod
+    def create(cls, filename: str) -> 'Logger':
+        """Creates a single shared instance of a logger for the given file."""
+        try:
+            return cls.instances[filename]
+        except KeyError:
+            with cls.instance_lock:
+                try:
+                    return cls.instances[filename]
+                except KeyError:
+                    cls.instances[filename] = Logger(filename)
+                    return cls.instances[filename]
+
+    def __init__(self, filename: str):
+        """You should probably call .create() instead."""
+        self._file = _open_or_create(filename)
+        self._data: t.Tuple[types.Reading, ...] = ()
+        self._last_size = 0
+        self._lock = threading.Lock()
+        self._maybe_read_data()
+
+    def _maybe_read_data(self) -> None:
+        """Reads data and advances the file pointer to the end of the file."""
+        # This must be called with both the file lock and _lock held.
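+        # Only bytes appended since the last read are decoded; the early
+        # return below skips the work when the file has not grown.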
+        size = self._size()
+        if size == self._last_size:
+            return
+        last_good = self._file.tell()
+        data = list(self._data)
+        try:
+            items = bson.decode_file_iter(
+                self._file, codec_options=common.BSON_OPTIONS)
+            for item in items:
+                last_good = self._file.tell()
+                try:
+                    data.append(types.Reading(**item))
+                except TypeError:
+                    pass  # Skip this item.
+        except bson.InvalidBSON:
+            pass  # We have reached the last valid document.  Bail.
+        # Seek back to immediately after the end of the last valid doc.
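+        # Truncating at last_good also drops any partial trailing document so
+        # the next append starts at a clean document boundary.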
+        self._data = tuple(data)
+        self._file.truncate(last_good)
+        self._last_size = last_good
+        self._file.seek(last_good, os.SEEK_SET)
+
+    def write_rows(self, readings: t.Iterable[types.Reading]) -> None:
+        """Write a sorted series of readings, ignoring old ones."""
+        with self._lock:
+            fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
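+            # _lock serializes threads in this process; the exclusive flock
+            # serializes writers across processes sharing the log file.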
+            try:
+                self._maybe_read_data()
+                self._file.truncate(self._file.tell())
+                data = list(self._data)
+                if not data:
+                    last_time = None
+                else:
+                    last_time = data[-1].sample_time
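+                # Only append readings strictly newer than the newest one
+                # already recorded; older ones are silently dropped.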
+                for reading in readings:
+                    if not last_time or last_time < reading.sample_time:
+                        self._file.write(common.bson_encode(reading.as_dict()))
+                        data.append(reading)
+                self._data = tuple(data)
+            finally:
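+                # Record the flushed size so the .data property can skip
+                # re-reading a file that has not changed since this write.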
+                self._file.flush()
+                self._last_size = self._size()
+                fcntl.flock(self._file.fileno(), fcntl.LOCK_UN)
+
+    def fileno(self) -> int:
+        return self._file.fileno()
+
+    def close(self) -> None:
+        self._file.close()
+
+    @property
+    def data(self) -> t.Tuple[types.Reading, ...]:
+        if self._size() != self._last_size:
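+            # The file has changed on disk since our last read, so take a
+            # shared lock and pull in the new readings before returning.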
+            fcntl.flock(self._file.fileno(), fcntl.LOCK_SH)
+            try:
+                with self._lock:
+                    self._maybe_read_data()
+            finally:
+                fcntl.flock(self._file.fileno(), fcntl.LOCK_UN)
+        return self._data
+
+    def _size(self) -> int:
+        return os.stat(self.fileno()).st_size
+
+
+def _open_or_create(path: str) -> t.BinaryIO:
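+    # Retry: another process may create the file between the failed 'r+b'
+    # open and the 'x+b' attempt, or remove it between the opposite pair.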
+    while True:
+        try:
+            return open(path, 'r+b')
+        except FileNotFoundError:
+            pass
+        try:
+            return open(path, 'x+b')
+        except FileExistsError:
+            pass