diff weatherlog/logger.py @ 14:c01f9929ae38

Make logger and HTTP writer more general and resilient. This makes the logger and HTTP writer more general, by removing any dependency upon the exact data type they are writing. They can now handle any type of BSON-serializable dict, and track what they have sent by keeping track of the last *byte*, not the last timestamp.
author Paul Fisher <paul@pfish.zone>
date Tue, 15 Oct 2019 22:40:24 -0400
parents 8a350ec1aa78
children 770215590d80
line wrap: on
line diff
--- a/weatherlog/logger.py	Sun Sep 29 12:11:16 2019 -0400
+++ b/weatherlog/logger.py	Tue Oct 15 22:40:24 2019 -0400
@@ -10,13 +10,13 @@
 import time
 import typing as t
 
+import attr
 import bson
 import pytz
 
-from . import types
-
 BSON_FILENAME = "temps.bson"
-LAST_SENT_FILENAME = "last-sent"
+OLD_LAST_TS = "last-sent"
+START_BYTE = "start-byte"
 
 
 class RemoteWriter(metaclass=abc.ABCMeta):
@@ -24,7 +24,7 @@
     BATCH_SIZE = 1000
 
     @abc.abstractmethod
-    def write(self, readings: t.Sequence[types.Reading]) -> None:
+    def write(self, readings: t.Iterable[t.Dict[str, object]]) -> None:
         raise NotImplementedError()
 
 
@@ -33,6 +33,15 @@
     pass
 
 
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class ReadingPosition:
+    # The encoded reading that was written.
+    data: bytes
+
+    # The file offset of the first byte after this reading's encoded data.
+    end: int
+
+
 class BufferedLogger:
     """A resilient logger which logs to a local file and a RemoteWriter."""
 
@@ -42,10 +51,10 @@
         self._writer = writer
         self._path = pathlib.Path(directory)
         self._file = _open_exclusive(self._path / BSON_FILENAME)
-        self._last_sent_path = self._path / LAST_SENT_FILENAME
-        last_sent = _read_last_sent(self._last_sent_path)
-        unsent = _read_unsent_and_advance(
-            self._file, last_sent)
+        self._old_last_sent = self._path / OLD_LAST_TS
+        self._start_byte = self._path / START_BYTE
+        unsent = _read_unsent_and_upgrade(
+            self._file, self._old_last_sent, self._start_byte)
         self._send_queue = collections.deque(unsent)
         self._running = False
         self._remote_thread: t.Optional[threading.Thread] = None
@@ -64,13 +73,15 @@
         if self._remote_thread:
             self._remote_thread.join()
 
-    def write(self, reading: types.Reading):
-        self._file.write(bson_encode(reading.as_dict()))
+    def write(self, reading: t.Dict[str, object]) -> None:
+        encoded = bson_encode(reading)
+        self._file.write(encoded)
         self._file.flush()
-        self._send_queue.append(reading)
+        byte = self._file.tell()
+        self._send_queue.append(ReadingPosition(encoded, byte))
 
     def _send_internal(self) -> None:
-        to_send: t.List[types.Reading] = []
+        to_send: t.List[ReadingPosition] = []
         while True:
             # Wait for multiple entries to build up in the queue.
             time.sleep(self.WAIT_TIME)
@@ -88,20 +99,28 @@
                 continue
             try:
                 # Try writing out the values.
-                self._writer.write(to_send)
+                self._writer.write(e.data for e in to_send)
             except RemoteWriteError:
                 pass  # If it fails, just try again next time.
             else:
                 # If we succeeded, record our success.
                 last_sent = to_send[-1]
-                self._update_last_sent(last_sent.sample_time)
+                self._update_start_byte(last_sent.end)
                 to_send.clear()
 
-    def _update_last_sent(self, timestamp: datetime.datetime) -> None:
-        last_sent_name = self._path / (LAST_SENT_FILENAME + ".new")
-        with last_sent_name.open('w') as outfile:
-            outfile.write(str(timestamp.timestamp()))
-        last_sent_name.rename(self._last_sent_path)
+    def _update_start_byte(self, byte: int) -> None:
+        start_byte_name = self._path / (START_BYTE + ".new")
+        with start_byte_name.open('w') as outfile:
+            outfile.write(str(byte))
+        start_byte_name.rename(self._start_byte)
+
+
+def _atomic_write(file: pathlib.Path, contents: str) -> None:
+    """Writes a string to a file, atomically."""
+    new_name = file.with_name(file.name + '.new')
+    with new_name.open('w') as outfile:
+        outfile.write(contents)
+    new_name.rename(file)
 
 
 def _open_or_create(path: pathlib.Path) -> t.BinaryIO:
@@ -126,6 +145,63 @@
     return file
 
 
+def _read_unsent_and_upgrade(
+    infile: t.BinaryIO,
+    last_sent_file: pathlib.Path,
+    start_byte_file: pathlib.Path,
+) -> t.List[ReadingPosition]:
+    _maybe_upgrade_last_sent(infile, last_sent_file, start_byte_file)
+    start_byte = _read_start_byte(start_byte_file)
+    infile.seek(start_byte, os.SEEK_SET)
+    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
+    readings: t.List[ReadingPosition] = []
+    end_pos = infile.tell()
+    try:
+        for entry in reader:
+            data = bson_encode(entry)
+            end_pos = infile.tell()
+            readings.append(ReadingPosition(data, end_pos))
+    except bson.InvalidBSON:
+        infile.seek(end_pos, os.SEEK_SET)
+        infile.truncate(end_pos)
+    return readings
+
+
+def _read_start_byte(path: pathlib.Path) -> int:
+    try:
+        with path.open('r') as infile:
+            return int(infile.read())
+    except (OSError, ValueError):
+        return 0
+
+
+def _maybe_upgrade_last_sent(
+    infile: t.BinaryIO,
+    last_sent_file: pathlib.Path,
+    start_byte_file: pathlib.Path,
+) -> None:
+    """If there's a last-sent file, upgrades it to start-byte."""
+    last_sent = _read_last_sent(last_sent_file)
+    if not last_sent:
+        return
+    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
+    last_good = infile.tell()
+    try:
+        for entry in reader:
+            try:
+                timestamp: datetime.datetime = entry['sample_time']
+            except KeyError:
+                continue  # Invalid entry; skip it.
+            if last_sent < timestamp:
+                break
+            last_good = infile.tell()
+    except bson.InvalidBSON:
+        infile.seek(last_good, os.SEEK_SET)
+        infile.truncate(last_good)
+    _atomic_write(start_byte_file, str(last_good))
+    last_sent_file.unlink()
+
+
 def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]:
     try:
         with path.open('r') as infile:
@@ -141,28 +217,5 @@
     tz_aware=True, tzinfo=pytz.UTC)
 
 
-def bson_encode(data: t.Dict[str, t.Any]) -> bytes:
+def bson_encode(data: t.Dict[str, object]) -> bytes:
     return bson.BSON.encode(data, codec_options=BSON_OPTIONS)
-
-
-def _read_unsent_and_advance(
-    infile: t.BinaryIO, last_sent: t.Optional[datetime.datetime],
-) -> t.List[types.Reading]:
-    """Reads all the unsent Readings and advances the file pointer to the end.
-    """
-    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
-    last_good = infile.tell()
-    unsent: t.List[types.Reading] = []
-    try:
-        for entry in reader:
-            last_good = infile.tell()
-            try:
-                reading = types.Reading(**entry)
-            except TypeError:
-                continue  # Invalid entry; skip it.
-            if not last_sent or last_sent < reading.sample_time:
-                unsent.append(reading)
-    except bson.InvalidBSON:
-        infile.seek(last_good, os.SEEK_SET)
-        infile.truncate(last_good)
-    return unsent