Mercurial > personal > weatherlog
view weatherlog/logger.py @ 6:8a350ec1aa78
logger_test: Ensure it crashes when the file is locked.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Sat, 28 Sep 2019 19:55:27 -0400 |
parents | 885bff085edf |
children | c01f9929ae38 |
line wrap: on
line source
"""Module to log temperatures to a directory, and an external website."""

import abc
import collections
import datetime
import fcntl
import os
import pathlib
import threading
import time
import typing as t

import bson
import pytz

from . import types

# On-disk names inside the logger's directory.
BSON_FILENAME = "temps.bson"
LAST_SENT_FILENAME = "last-sent"


class RemoteWriter(metaclass=abc.ABCMeta):
    """Interface for the remote (e.g. HTTP) sink of readings."""

    # Maximum number of readings handed to write() in one call.
    BATCH_SIZE = 1000

    @abc.abstractmethod
    def write(self, readings: t.Sequence[types.Reading]) -> None:
        """Sends the readings to the remote end.

        Raises:
            RemoteWriteError: If the batch could not be delivered.
        """
        raise NotImplementedError()


class RemoteWriteError(Exception):
    """Error to be raised by RemoteWriter.write."""
    pass


class BufferedLogger:
    """A resilient logger which logs to a local file and a RemoteWriter.

    Readings are always appended to a local BSON file first; a background
    thread then drains them to the RemoteWriter in batches, recording the
    timestamp of the last successfully-sent reading so that unsent readings
    survive restarts and remote outages.
    """

    # Seconds the sender thread sleeps between batch attempts, letting
    # several readings accumulate in the queue.
    WAIT_TIME = 2

    def __init__(self, directory: str, writer: RemoteWriter):
        """Opens (and locks) the local log and queues any unsent readings.

        Args:
            directory: Directory holding the BSON log and last-sent marker.
            writer: Remote sink for batches of readings.

        Raises:
            OSError: If another logger already holds the lock on the file.
        """
        self._writer = writer
        self._path = pathlib.Path(directory)
        # Exclusive flock ensures only one logger writes this file at a time.
        self._file = _open_exclusive(self._path / BSON_FILENAME)
        self._last_sent_path = self._path / LAST_SENT_FILENAME
        last_sent = _read_last_sent(self._last_sent_path)
        # Replay readings newer than the last-sent marker into the queue.
        unsent = _read_unsent_and_advance(
            self._file, last_sent)
        self._send_queue = collections.deque(unsent)
        self._running = False
        self._remote_thread: t.Optional[threading.Thread] = None

    def start(self) -> None:
        """Starts the bit which logs to HTTP."""
        self._running = True
        self._remote_thread = threading.Thread(target=self._send_internal)
        self._remote_thread.start()

    def close(self) -> None:
        """Stops the logger, closes all files, and stops writing to HTTP.

        May block for up to WAIT_TIME seconds while the sender thread
        notices the stop flag and exits.
        """
        fcntl.flock(self._file, fcntl.LOCK_UN)
        self._file.close()
        self._running = False
        if self._remote_thread:
            self._remote_thread.join()

    def write(self, reading: types.Reading) -> None:
        """Durably records one reading, then queues it for remote sending."""
        # Local write + flush happens before enqueueing so a crash can never
        # lose a reading that the sender thread has seen.
        self._file.write(bson_encode(reading.as_dict()))
        self._file.flush()
        self._send_queue.append(reading)

    def _send_internal(self) -> None:
        """Sender-thread loop: drain the queue to the RemoteWriter in batches."""
        to_send: t.List[types.Reading] = []
        while True:
            # Wait for multiple entries to build up in the queue.
            time.sleep(self.WAIT_TIME)
            while len(to_send) < self._writer.BATCH_SIZE:
                # Pop all the values we can off the queue.
                try:
                    to_send.append(self._send_queue.popleft())
                except IndexError:
                    break
            if not self._running:
                # Stop if we've been asked to stop.  Anything still unsent
                # remains in the on-disk log and is replayed on next start.
                break
            if not to_send:
                # If there's nothing to send, don't try to send anything.
                continue
            try:
                # Try writing out the values.
                self._writer.write(to_send)
            except RemoteWriteError:
                pass  # If it fails, just try again next time.
            else:
                # If we succeeded, record our success.
                last_sent = to_send[-1]
                self._update_last_sent(last_sent.sample_time)
                to_send.clear()

    def _update_last_sent(self, timestamp: datetime.datetime) -> None:
        """Atomically persists the timestamp of the last-sent reading.

        Writes to a ".new" sibling first, then renames over the marker file,
        so a crash mid-write never leaves a corrupt marker.
        """
        last_sent_name = self._path / (LAST_SENT_FILENAME + ".new")
        with last_sent_name.open('w') as outfile:
            outfile.write(str(timestamp.timestamp()))
        last_sent_name.rename(self._last_sent_path)


def _open_or_create(path: pathlib.Path) -> t.BinaryIO:
    """Opens *path* read/write, creating it if needed.

    Retries on races: if the file vanishes between the existence check and
    open, or appears between the failed open and the exclusive create, we
    simply loop and try again.
    """
    while True:
        try:
            return path.open('r+b')
        except FileNotFoundError:
            pass
        try:
            return path.open('x+b')
        except FileExistsError:
            pass


def _open_exclusive(path: pathlib.Path) -> t.BinaryIO:
    """Opens *path* read/write with an exclusive, non-blocking flock.

    Raises:
        OSError: If another process already holds the lock.
    """
    file = _open_or_create(path)
    try:
        fcntl.flock(file, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError as ex:
        file.close()
        raise OSError('Another copy of the logger is running.') from ex
    return file


def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]:
    """Reads the UTC timestamp of the last successfully-sent reading.

    Returns None if the marker file is missing or unparseable.
    """
    try:
        with path.open('r') as infile:
            unix_ts = float(infile.read())
    except (OSError, ValueError):
        # If the last-written file is missing or corrupt, assume it is dead.
        return None
    # fromtimestamp(tz=...) replaces the deprecated
    # utcfromtimestamp(...).replace(tzinfo=...) pattern; same result.
    return datetime.datetime.fromtimestamp(unix_ts, tz=pytz.UTC)


# Decode timestamps as timezone-aware UTC datetimes.
BSON_OPTIONS = bson.DEFAULT_CODEC_OPTIONS.with_options(
    tz_aware=True, tzinfo=pytz.UTC)


def bson_encode(data: t.Dict[str, t.Any]) -> bytes:
    """Encodes a dict to BSON using this module's codec options."""
    return bson.BSON.encode(data, codec_options=BSON_OPTIONS)


def _read_unsent_and_advance(
        infile: t.BinaryIO,
        last_sent: t.Optional[datetime.datetime],
) -> t.List[types.Reading]:
    """Reads all the unsent Readings and advances the file pointer to the end.

    Entries that fail to construct a Reading are skipped; a trailing
    partially-written (invalid BSON) record is truncated away so future
    appends start from the last good record.
    """
    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
    last_good = infile.tell()
    unsent: t.List[types.Reading] = []
    try:
        for entry in reader:
            last_good = infile.tell()
            try:
                reading = types.Reading(**entry)
            except TypeError:
                continue  # Invalid entry; skip it.
            if not last_sent or last_sent < reading.sample_time:
                unsent.append(reading)
    except bson.InvalidBSON:
        # Torn final write: rewind and chop off the corrupt tail.
        infile.seek(last_good, os.SEEK_SET)
        infile.truncate(last_good)
    return unsent