Mercurial > personal > weatherlog
diff weatherlog/logger.py @ 14:c01f9929ae38
Make logger and HTTP writer more general and resilient.
This makes the logger and HTTP writer more general, by removing
any dependency upon the exact data type they are writing.
They can now handle any type of BSON-serializable dict,
and they record their progress by the position of the last *byte*
sent, rather than by the last timestamp.
author   | Paul Fisher <paul@pfish.zone>
date     | Tue, 15 Oct 2019 22:40:24 -0400
parents  | 8a350ec1aa78
children | 770215590d80
line wrap: on
line diff
--- a/weatherlog/logger.py Sun Sep 29 12:11:16 2019 -0400 +++ b/weatherlog/logger.py Tue Oct 15 22:40:24 2019 -0400 @@ -10,13 +10,13 @@ import time import typing as t +import attr import bson import pytz -from . import types - BSON_FILENAME = "temps.bson" -LAST_SENT_FILENAME = "last-sent" +OLD_LAST_TS = "last-sent" +START_BYTE = "start-byte" class RemoteWriter(metaclass=abc.ABCMeta): @@ -24,7 +24,7 @@ BATCH_SIZE = 1000 @abc.abstractmethod - def write(self, readings: t.Sequence[types.Reading]) -> None: + def write(self, readings: t.Iterable[t.Dict[str, object]]) -> None: raise NotImplementedError() @@ -33,6 +33,15 @@ pass +@attr.s(auto_attribs=True, frozen=True, slots=True) +class ReadingPosition: + # The encoded reading that was written. + data: bytes + + # The index of the byte immediately following this one. + end: int + + class BufferedLogger: """A resilient logger which logs to a local file and a RemoteWriter.""" @@ -42,10 +51,10 @@ self._writer = writer self._path = pathlib.Path(directory) self._file = _open_exclusive(self._path / BSON_FILENAME) - self._last_sent_path = self._path / LAST_SENT_FILENAME - last_sent = _read_last_sent(self._last_sent_path) - unsent = _read_unsent_and_advance( - self._file, last_sent) + self._old_last_sent = self._path / OLD_LAST_TS + self._start_byte = self._path / START_BYTE + unsent = _read_unsent_and_upgrade( + self._file, self._old_last_sent, self._start_byte) self._send_queue = collections.deque(unsent) self._running = False self._remote_thread: t.Optional[threading.Thread] = None @@ -64,13 +73,15 @@ if self._remote_thread: self._remote_thread.join() - def write(self, reading: types.Reading): - self._file.write(bson_encode(reading.as_dict())) + def write(self, reading: t.Dict[str, object]) -> None: + encoded = bson_encode(reading) + self._file.write(encoded) self._file.flush() - self._send_queue.append(reading) + byte = self._file.tell() + self._send_queue.append(ReadingPosition(encoded, byte)) def _send_internal(self) -> 
None: - to_send: t.List[types.Reading] = [] + to_send: t.List[ReadingPosition] = [] while True: # Wait for multiple entries to build up in the queue. time.sleep(self.WAIT_TIME) @@ -88,20 +99,28 @@ continue try: # Try writing out the values. - self._writer.write(to_send) + self._writer.write(e.data for e in to_send) except RemoteWriteError: pass # If it fails, just try again next time. else: # If we succeeded, record our success. last_sent = to_send[-1] - self._update_last_sent(last_sent.sample_time) + self._update_start_byte(last_sent.end) to_send.clear() - def _update_last_sent(self, timestamp: datetime.datetime) -> None: - last_sent_name = self._path / (LAST_SENT_FILENAME + ".new") - with last_sent_name.open('w') as outfile: - outfile.write(str(timestamp.timestamp())) - last_sent_name.rename(self._last_sent_path) + def _update_start_byte(self, byte: int) -> None: + start_byte_name = self._path / (START_BYTE + ".new") + with start_byte_name.open('w') as outfile: + outfile.write(str(byte)) + start_byte_name.rename(self._start_byte) + + +def _atomic_write(file: pathlib.Path, contents: str) -> None: + """Writes a string to a file, atomically.""" + new_name = file.with_name(file.name + '.new') + with new_name.open('w') as outfile: + outfile.write(contents) + new_name.rename(file) def _open_or_create(path: pathlib.Path) -> t.BinaryIO: @@ -126,6 +145,63 @@ return file +def _read_unsent_and_upgrade( + infile: t.BinaryIO, + last_sent_file: pathlib.Path, + start_byte_file: pathlib.Path, +) -> t.List[ReadingPosition]: + _maybe_upgrade_last_sent(infile, last_sent_file, start_byte_file) + start_byte = _read_start_byte(start_byte_file) + infile.seek(start_byte, os.SEEK_SET) + reader = bson.decode_file_iter(infile, BSON_OPTIONS) + readings: t.List[ReadingPosition] = [] + end_pos = infile.tell() + try: + for entry in reader: + data = bson_encode(entry) + end_pos = infile.tell() + readings.append(ReadingPosition(data, end_pos)) + except bson.InvalidBSON: + infile.seek(end_pos, 
os.SEEK_SET) + infile.truncate(end_pos) + return readings + + +def _read_start_byte(path: pathlib.Path) -> int: + try: + with path.open('r') as infile: + return int(infile.read()) + except (OSError, ValueError): + return 0 + + +def _maybe_upgrade_last_sent( + infile: t.BinaryIO, + last_sent_file: pathlib.Path, + start_byte_file: pathlib.Path, +) -> None: + """If there's a last-sent file, upgrades it to start-byte.""" + last_sent = _read_last_sent(last_sent_file) + if not last_sent: + return + reader = bson.decode_file_iter(infile, BSON_OPTIONS) + last_good = infile.tell() + try: + for entry in reader: + try: + timestamp: datetime.datetime = entry['sample_time'] + except KeyError: + continue # Invalid entry; skip it. + if last_sent < timestamp: + break + last_good = infile.tell() + except bson.InvalidBSON: + infile.seek(last_good, os.SEEK_SET) + infile.truncate(last_good) + _atomic_write(start_byte_file, str(last_good)) + last_sent_file.unlink() + + def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]: try: with path.open('r') as infile: @@ -141,28 +217,5 @@ tz_aware=True, tzinfo=pytz.UTC) -def bson_encode(data: t.Dict[str, t.Any]) -> bytes: +def bson_encode(data: t.Dict[str, object]) -> bytes: return bson.BSON.encode(data, codec_options=BSON_OPTIONS) - - -def _read_unsent_and_advance( - infile: t.BinaryIO, last_sent: t.Optional[datetime.datetime], -) -> t.List[types.Reading]: - """Reads all the unsent Readings and advances the file pointer to the end. - """ - reader = bson.decode_file_iter(infile, BSON_OPTIONS) - last_good = infile.tell() - unsent: t.List[types.Reading] = [] - try: - for entry in reader: - last_good = infile.tell() - try: - reading = types.Reading(**entry) - except TypeError: - continue # Invalid entry; skip it. - if not last_sent or last_sent < reading.sample_time: - unsent.append(reading) - except bson.InvalidBSON: - infile.seek(last_good, os.SEEK_SET) - infile.truncate(last_good) - return unsent