Mercurial > personal > weatherlog
view weatherlog/logger.py @ 16:770215590d80
logger: Actually pass the right type.
The logger was improperly giving the writer already-encoded values.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Thu, 17 Oct 2019 22:19:33 -0400 |
parents | c01f9929ae38 |
children |
line wrap: on
line source
"""Module to log temperatures to a directory, and an external website."""

import abc
import collections
import datetime
import fcntl
import os
import pathlib
import threading
import time
import typing as t

import attr
import bson
import pytz

# Append-only local log of BSON-encoded readings.
BSON_FILENAME = "temps.bson"
# Legacy marker file (UNIX timestamp of the last reading sent upstream).
OLD_LAST_TS = "last-sent"
# Current marker file (byte offset into the BSON file of the first unsent
# reading).  Replaces OLD_LAST_TS; see _maybe_upgrade_last_sent.
START_BYTE = "start-byte"


class RemoteWriter(metaclass=abc.ABCMeta):
    """Interface for sending batches of readings to a remote endpoint."""

    # Maximum number of readings handed to write() in one call.
    BATCH_SIZE = 1000

    @abc.abstractmethod
    def write(self, readings: t.Iterable[t.Dict[str, object]]) -> None:
        """Sends the given decoded readings upstream.

        Raises:
            RemoteWriteError: If the batch could not be delivered.
        """
        raise NotImplementedError()


class RemoteWriteError(Exception):
    """Error to be raised by RemoteWriter.write."""
    pass


@attr.s(auto_attribs=True, frozen=True, slots=True)
class ReadingPosition:
    """A decoded reading plus its end offset in the local BSON file."""

    # The decoded reading that was written.
    data: t.Dict[str, object]

    # The index of the byte immediately following this reading in the file.
    # Persisting this offset after a successful send lets us resume from
    # exactly the first unsent reading after a restart.
    end: int


class BufferedLogger:
    """A resilient logger which logs to a local file and a RemoteWriter.

    Readings are appended durably to a local BSON file first; a background
    thread then ships them to the RemoteWriter in batches, recording its
    progress as a byte offset so unsent readings survive restarts.
    """

    # Seconds the sender thread sleeps between batch attempts, letting
    # multiple readings accumulate in the queue.
    WAIT_TIME = 2

    def __init__(self, directory: str, writer: RemoteWriter):
        self._writer = writer
        self._path = pathlib.Path(directory)
        # Holds an exclusive flock for the lifetime of this logger, so two
        # logger processes cannot interleave writes to the same file.
        self._file = _open_exclusive(self._path / BSON_FILENAME)
        self._old_last_sent = self._path / OLD_LAST_TS
        self._start_byte = self._path / START_BYTE
        # Anything after the recorded start byte was never confirmed sent;
        # queue it for the sender thread.  Also migrates the legacy
        # last-sent timestamp file to the start-byte format if present.
        unsent = _read_unsent_and_upgrade(
            self._file, self._old_last_sent, self._start_byte)
        self._send_queue = collections.deque(unsent)
        self._running = False
        self._remote_thread: t.Optional[threading.Thread] = None

    def start(self) -> None:
        """Starts the bit which logs to HTTP."""
        self._running = True
        self._remote_thread = threading.Thread(target=self._send_internal)
        self._remote_thread.start()

    def close(self) -> None:
        """Stops the logger, closes all files, and stops writing to HTTP."""
        fcntl.flock(self._file, fcntl.LOCK_UN)
        self._file.close()
        self._running = False
        if self._remote_thread:
            # The sender re-checks _running after each sleep, so this join
            # returns within roughly WAIT_TIME seconds.
            self._remote_thread.join()

    def write(self, reading: t.Dict[str, object]) -> None:
        """Durably appends one reading locally and queues it for sending."""
        encoded = bson_encode(reading)
        self._file.write(encoded)
        self._file.flush()
        # tell() after the flushed write is the offset just past this
        # reading — the value to persist once it has been sent.
        byte = self._file.tell()
        self._send_queue.append(ReadingPosition(reading, byte))

    def _send_internal(self) -> None:
        """Background loop: batch readings off the queue and send them."""
        to_send: t.List[ReadingPosition] = []
        while True:
            # Wait for multiple entries to build up in the queue.
            time.sleep(self.WAIT_TIME)
            while len(to_send) < self._writer.BATCH_SIZE:
                # Pop all the values we can off the queue.
                try:
                    to_send.append(self._send_queue.popleft())
                except IndexError:
                    break
            if not self._running:
                # Stop if we've been asked to stop.
                break
            if not to_send:
                # If there's nothing to send, don't try to send anything.
                continue
            try:
                # Try writing out the values.
                self._writer.write(e.data for e in to_send)
            except RemoteWriteError:
                pass  # If it fails, just try again next time.
            else:
                # If we succeeded, record our success.
                last_sent = to_send[-1]
                self._update_start_byte(last_sent.end)
                to_send.clear()

    def _update_start_byte(self, byte: int) -> None:
        """Atomically persists the offset of the first unsent reading."""
        # Reuse the module's atomic-write helper rather than duplicating
        # its write-to-temp-then-rename logic inline (was duplicated here).
        _atomic_write(self._start_byte, str(byte))


def _atomic_write(file: pathlib.Path, contents: str) -> None:
    """Writes a string to a file, atomically."""
    # Write a sibling temp file, then rename over the target: readers see
    # either the old contents or the new, never a partial write.
    new_name = file.with_name(file.name + '.new')
    with new_name.open('w') as outfile:
        outfile.write(contents)
    new_name.rename(file)


def _open_or_create(path: pathlib.Path) -> t.BinaryIO:
    """Opens path read/write, creating it if needed, without truncating.

    Loops because each branch can race with another process: the file may
    be deleted between the failed 'r+b' and our 'x+b', or created between
    the failed 'x+b' and our next 'r+b'.
    """
    while True:
        try:
            return path.open('r+b')
        except FileNotFoundError:
            pass
        try:
            return path.open('x+b')
        except FileExistsError:
            pass


def _open_exclusive(path: pathlib.Path) -> t.BinaryIO:
    """Opens path and takes an exclusive flock, or raises OSError."""
    file = _open_or_create(path)
    try:
        # Non-blocking: fail fast if another logger holds the lock.
        fcntl.flock(file, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError as ex:
        file.close()
        raise OSError('Another copy of the logger is running.') from ex
    return file


def _read_unsent_and_upgrade(
    infile: t.BinaryIO,
    last_sent_file: pathlib.Path,
    start_byte_file: pathlib.Path,
) -> t.List[ReadingPosition]:
    """Loads all not-yet-sent readings from the local BSON file.

    First migrates any legacy last-sent timestamp file to the start-byte
    format, then decodes every reading from the recorded start byte to the
    end of the file.  A trailing partial/corrupt BSON document (e.g. from a
    crash mid-write) is truncated away so future appends start clean.

    Returns:
        The unsent readings, each paired with its end offset.
    """
    _maybe_upgrade_last_sent(infile, last_sent_file, start_byte_file)
    start_byte = _read_start_byte(start_byte_file)
    infile.seek(start_byte, os.SEEK_SET)
    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
    readings: t.List[ReadingPosition] = []
    end_pos = infile.tell()
    try:
        for entry in reader:
            end_pos = infile.tell()
            readings.append(ReadingPosition(entry, end_pos))
    except bson.InvalidBSON:
        # Drop the trailing corrupt document and everything after it.
        infile.seek(end_pos, os.SEEK_SET)
        infile.truncate(end_pos)
    return readings


def _read_start_byte(path: pathlib.Path) -> int:
    """Reads the persisted start byte; 0 if missing or unparseable."""
    try:
        with path.open('r') as infile:
            return int(infile.read())
    except (OSError, ValueError):
        return 0


def _maybe_upgrade_last_sent(
    infile: t.BinaryIO,
    last_sent_file: pathlib.Path,
    start_byte_file: pathlib.Path,
) -> None:
    """If there's a last-sent file, upgrades it to start-byte."""
    last_sent = _read_last_sent(last_sent_file)
    if not last_sent:
        return
    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
    # Offset just past the last reading known to have been sent.
    last_good = infile.tell()
    try:
        for entry in reader:
            try:
                timestamp: datetime.datetime = entry['sample_time']
            except KeyError:
                continue  # Invalid entry; skip it.
            if last_sent < timestamp:
                # First reading newer than the recorded timestamp: it (and
                # everything after) is unsent, so stop advancing.
                break
            last_good = infile.tell()
    except bson.InvalidBSON:
        # Truncate a trailing corrupt document, as elsewhere.
        infile.seek(last_good, os.SEEK_SET)
        infile.truncate(last_good)
    _atomic_write(start_byte_file, str(last_good))
    last_sent_file.unlink()


def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]:
    """Reads the legacy last-sent UNIX timestamp as an aware UTC datetime."""
    try:
        with path.open('r') as infile:
            unix_ts = float(infile.read())
    except (OSError, ValueError):
        # If the last-written file is missing or corrupt, assume it is dead.
        return None
    # fromtimestamp with an explicit tz yields an aware datetime directly
    # (utcfromtimestamp + replace is deprecated and returns naive first).
    return datetime.datetime.fromtimestamp(unix_ts, tz=pytz.UTC)


BSON_OPTIONS = bson.DEFAULT_CODEC_OPTIONS.with_options(
    tz_aware=True, tzinfo=pytz.UTC)


def bson_encode(data: t.Dict[str, object]) -> bytes:
    """Encodes a dict to BSON bytes using this module's codec options."""
    return bson.BSON.encode(data, codec_options=BSON_OPTIONS)