Mercurial > personal > weatherlog
view weatherlog/logger.py @ 14:c01f9929ae38
Make logger and HTTP writer more general and resilient.
This makes the logger and HTTP writer more general, by removing
any dependency upon the exact data type they are writing.
They can now handle any type of BSON-serializable dict,
and track what they have sent by keeping track of the last *byte*,
not the last timestamp.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Tue, 15 Oct 2019 22:40:24 -0400 |
parents | 8a350ec1aa78 |
children | 770215590d80 |
line wrap: on
line source
"""Module to log temperatures to a directory, and an external website."""

import abc
import collections
import datetime
import fcntl
import os
import pathlib
import threading
import time
import typing as t

import attr
import bson
import pytz

# File (inside the log directory) that BSON-encoded readings are appended to.
BSON_FILENAME = "temps.bson"
# Legacy marker file holding a Unix timestamp; upgraded to START_BYTE on open.
OLD_LAST_TS = "last-sent"
# Marker file holding the byte offset in BSON_FILENAME of the first reading
# that has not yet been acknowledged by the remote writer.
START_BYTE = "start-byte"


class RemoteWriter(metaclass=abc.ABCMeta):
    """Interface for a remote sink that receives batches of readings."""

    # Upper bound on the number of readings BufferedLogger accumulates
    # before handing a batch to write().
    BATCH_SIZE = 1000

    @abc.abstractmethod
    def write(self, readings: t.Iterable[t.Dict[str, object]]) -> None:
        """Sends a batch of readings to the remote end.

        Implementations signal a retryable failure by raising
        RemoteWriteError; BufferedLogger keeps the batch and retries it.

        NOTE(review): BufferedLogger._send_internal passes the BSON-encoded
        ``bytes`` of each reading here, not dicts -- confirm which one the
        concrete writers expect and align this annotation.
        """
        raise NotImplementedError()


class RemoteWriteError(Exception):
    """Error to be raised by RemoteWriter.write."""
    pass


@attr.s(auto_attribs=True, frozen=True, slots=True)
class ReadingPosition:
    """An encoded reading together with where it ends in the log file."""

    # The encoded reading that was written.
    data: bytes

    # The index of the byte immediately following this one.
    end: int


class BufferedLogger:
    """A resilient logger which logs to a local file and a RemoteWriter."""

    # Seconds the sender thread sleeps between attempts to drain the queue.
    WAIT_TIME = 2

    def __init__(self, directory: str, writer: RemoteWriter):
        """Opens (and exclusively locks) the log file under ``directory``.

        Any readings already in the file past the recorded start byte are
        queued for sending. Raises OSError if another process holds the lock.
        """
        self._writer = writer
        self._path = pathlib.Path(directory)
        self._file = _open_exclusive(self._path / BSON_FILENAME)
        self._old_last_sent = self._path / OLD_LAST_TS
        self._start_byte = self._path / START_BYTE
        unsent = _read_unsent_and_upgrade(
            self._file, self._old_last_sent, self._start_byte)
        self._send_queue = collections.deque(unsent)
        self._running = False
        self._remote_thread: t.Optional[threading.Thread] = None

    def start(self) -> None:
        """Starts the bit which logs to HTTP."""
        self._running = True
        self._remote_thread = threading.Thread(target=self._send_internal)
        self._remote_thread.start()

    def close(self) -> None:
        """Stops the logger, closes all files, and stops writing to HTTP."""
        # Stop the sender thread first: if releasing the lock or closing the
        # file fails, the (non-daemon) thread must not be left running.
        self._running = False
        try:
            if self._remote_thread:
                self._remote_thread.join()
        finally:
            fcntl.flock(self._file, fcntl.LOCK_UN)
            self._file.close()

    def write(self, reading: t.Dict[str, object]) -> None:
        """Appends a reading to the local file and queues it for sending."""
        encoded = bson_encode(reading)
        self._file.write(encoded)
        self._file.flush()
        # Position after the flush is the end offset of this reading.
        byte = self._file.tell()
        self._send_queue.append(ReadingPosition(encoded, byte))

    def _send_internal(self) -> None:
        """Body of the sender thread: batches queued readings to the writer."""
        to_send: t.List[ReadingPosition] = []
        while True:
            # Wait for multiple entries to build up in the queue.
            time.sleep(self.WAIT_TIME)
            while len(to_send) < self._writer.BATCH_SIZE:
                # Pop all the values we can off the queue.
                try:
                    to_send.append(self._send_queue.popleft())
                except IndexError:
                    break
            if not self._running:
                # Stop if we've been asked to stop.
                break
            if not to_send:
                # If there's nothing to send, don't try to send anything.
                continue
            try:
                # Try writing out the values.
                self._writer.write(e.data for e in to_send)
            except RemoteWriteError:
                pass  # If it fails, just try again next time.
            else:
                # If we succeeded, record our success.
                last_sent = to_send[-1]
                self._update_start_byte(last_sent.end)
                to_send.clear()

    def _update_start_byte(self, byte: int) -> None:
        """Atomically records ``byte`` as the first unsent offset."""
        _atomic_write(self._start_byte, str(byte))


def _atomic_write(file: pathlib.Path, contents: str) -> None:
    """Writes a string to a file, atomically."""
    # Write to a sibling "<name>.new" file, then rename it over the target.
    new_name = file.with_name(file.name + '.new')
    with new_name.open('w') as outfile:
        outfile.write(contents)
    new_name.rename(file)


def _open_or_create(path: pathlib.Path) -> t.BinaryIO:
    """Opens ``path`` for binary read/write, creating it if it is missing."""
    # Loop because the file may appear or vanish between the two attempts.
    while True:
        try:
            return path.open('r+b')
        except FileNotFoundError:
            pass
        try:
            return path.open('x+b')
        except FileExistsError:
            pass


def _open_exclusive(path: pathlib.Path) -> t.BinaryIO:
    """Opens ``path`` and takes a non-blocking exclusive flock on it.

    Raises OSError if the lock is already held elsewhere.
    """
    file = _open_or_create(path)
    try:
        fcntl.flock(file, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError as ex:
        file.close()
        raise OSError('Another copy of the logger is running.') from ex
    return file


def _read_unsent_and_upgrade(
    infile: t.BinaryIO,
    last_sent_file: pathlib.Path,
    start_byte_file: pathlib.Path,
) -> t.List[ReadingPosition]:
    """Returns the readings in ``infile`` from the recorded start byte on.

    First upgrades a legacy last-sent file (if any) to a start-byte file.
    If undecodable BSON is hit, the file is truncated back to the end of the
    last entry that decoded cleanly.
    """
    _maybe_upgrade_last_sent(infile, last_sent_file, start_byte_file)
    start_byte = _read_start_byte(start_byte_file)
    infile.seek(start_byte, os.SEEK_SET)
    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
    readings: t.List[ReadingPosition] = []
    end_pos = infile.tell()
    try:
        for entry in reader:
            data = bson_encode(entry)
            end_pos = infile.tell()
            readings.append(ReadingPosition(data, end_pos))
    except bson.InvalidBSON:
        # Drop everything after the last cleanly decoded entry.
        infile.seek(end_pos, os.SEEK_SET)
        infile.truncate(end_pos)
    return readings


def _read_start_byte(path: pathlib.Path) -> int:
    """Reads the start byte from ``path``; 0 if missing or unparseable."""
    try:
        with path.open('r') as infile:
            return int(infile.read())
    except (OSError, ValueError):
        return 0


def _maybe_upgrade_last_sent(
    infile: t.BinaryIO,
    last_sent_file: pathlib.Path,
    start_byte_file: pathlib.Path,
) -> None:
    """If there's a last-sent file, upgrades it to start-byte."""
    last_sent = _read_last_sent(last_sent_file)
    if last_sent is None:
        return
    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
    # Offset just past the last entry known to have been sent.
    last_good = infile.tell()
    try:
        for entry in reader:
            try:
                timestamp: datetime.datetime = entry['sample_time']
            except KeyError:
                continue  # Invalid entry; skip it.
            if last_sent < timestamp:
                # First entry newer than the last one sent; stop before it.
                break
            last_good = infile.tell()
    except bson.InvalidBSON:
        infile.seek(last_good, os.SEEK_SET)
        infile.truncate(last_good)

    _atomic_write(start_byte_file, str(last_good))
    last_sent_file.unlink()


def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]:
    """Reads the legacy last-sent Unix timestamp as a UTC-aware datetime.

    Returns None when the file is missing, unreadable, or does not contain a
    timestamp that can be represented as a datetime.
    """
    try:
        with path.open('r') as infile:
            unix_ts = float(infile.read())
        # Convert inside the try: "nan", "inf" and out-of-range values parse
        # as floats but cannot be turned into a datetime.
        return datetime.datetime.fromtimestamp(unix_ts, tz=pytz.UTC)
    except (OSError, ValueError, OverflowError):
        # If the last-written file is missing or corrupt, assume it is dead.
        return None


# Codec options used for every encode/decode: datetimes are decoded as
# timezone-aware values in UTC.
BSON_OPTIONS = bson.DEFAULT_CODEC_OPTIONS.with_options(
    tz_aware=True, tzinfo=pytz.UTC)


def bson_encode(data: t.Dict[str, object]) -> bytes:
    """Encodes ``data`` as a BSON document using BSON_OPTIONS."""
    return bson.BSON.encode(data, codec_options=BSON_OPTIONS)