view weatherlog/logger.py @ 6:8a350ec1aa78

logger_test: Ensure it crashes when the file is locked.
author Paul Fisher <paul@pfish.zone>
date Sat, 28 Sep 2019 19:55:27 -0400
parents 885bff085edf
children c01f9929ae38
line wrap: on
line source

"""Module to log temperatures to a directory, and an external website."""

import abc
import collections
import datetime
import fcntl
import os
import pathlib
import threading
import time
import typing as t

import bson
import pytz

from . import types

# Name of the append-only BSON log file inside the logger's directory.
BSON_FILENAME = "temps.bson"
# Marker file holding the Unix timestamp of the newest successfully-sent
# reading; used on startup to decide which logged readings still need sending.
LAST_SENT_FILENAME = "last-sent"


class RemoteWriter(metaclass=abc.ABCMeta):
    """Abstract sink that receives batches of readings for remote upload."""

    # Upper bound on how many readings BufferedLogger hands to write() at once.
    BATCH_SIZE = 1000

    @abc.abstractmethod
    def write(self, readings: t.Sequence[types.Reading]) -> None:
        """Sends the given readings to the remote endpoint.

        Implementations should raise RemoteWriteError on failure so the
        caller can retry later.
        """
        raise NotImplementedError()


class RemoteWriteError(Exception):
    """Raised by RemoteWriter.write when sending readings upstream fails."""


class BufferedLogger:
    """A resilient logger which logs to a local file and a RemoteWriter.

    Every write() appends the reading to an on-disk BSON file and queues it
    for a background thread, which pushes readings to the RemoteWriter in
    batches.  Remote failures are simply retried on the next cycle; because
    readings are persisted before queueing and the "last-sent" marker is
    only advanced after a successful remote write, a restarted logger can
    resume from where it left off.
    """

    # Seconds the sender thread sleeps each cycle, letting several readings
    # accumulate into a single remote batch.
    WAIT_TIME = 2

    def __init__(self, directory: str, writer: RemoteWriter):
        """Opens and locks the log file, then loads any unsent backlog.

        Raises OSError if another logger instance already holds the lock
        on this directory's log file.
        """
        self._writer = writer
        self._path = pathlib.Path(directory)
        self._file = _open_exclusive(self._path / BSON_FILENAME)
        self._last_sent_path = self._path / LAST_SENT_FILENAME
        last_sent = _read_last_sent(self._last_sent_path)
        unsent = _read_unsent_and_advance(
            self._file, last_sent)
        self._send_queue = collections.deque(unsent)
        self._running = False
        self._remote_thread: t.Optional[threading.Thread] = None

    def start(self) -> None:
        """Starts the bit which logs to HTTP."""
        self._running = True
        self._remote_thread = threading.Thread(target=self._send_internal)
        self._remote_thread.start()

    def close(self) -> None:
        """Stops the logger, closes all files, and stops writing to HTTP."""
        # Stop and join the sender thread *before* releasing the lock and
        # closing the file.  Previously the lock was dropped first, leaving
        # a window in which a replacement logger process could start up and
        # race our still-running thread's final last-sent update.
        self._running = False
        if self._remote_thread:
            self._remote_thread.join()
        fcntl.flock(self._file, fcntl.LOCK_UN)
        self._file.close()

    def write(self, reading: types.Reading):
        """Durably records one reading and queues it for remote upload."""
        self._file.write(bson_encode(reading.as_dict()))
        self._file.flush()
        self._send_queue.append(reading)

    def _send_internal(self) -> None:
        """Background loop: drains the queue in batches and uploads them."""
        to_send: t.List[types.Reading] = []
        while True:
            # Wait for multiple entries to build up in the queue.
            time.sleep(self.WAIT_TIME)
            while len(to_send) < self._writer.BATCH_SIZE:
                # Pop all the values we can off the queue.
                try:
                    to_send.append(self._send_queue.popleft())
                except IndexError:
                    break
            if not self._running:
                # Stop if we've been asked to stop.  Any readings still in
                # to_send are safe: they are on disk and last-sent was not
                # advanced, so the next start() re-reads and re-sends them.
                break
            if not to_send:
                # If there's nothing to send, don't try to send anything.
                continue
            try:
                # Try writing out the values.
                self._writer.write(to_send)
            except RemoteWriteError:
                pass  # If it fails, just try again next time.
            else:
                # If we succeeded, record our success.
                last_sent = to_send[-1]
                self._update_last_sent(last_sent.sample_time)
                to_send.clear()

    def _update_last_sent(self, timestamp: datetime.datetime) -> None:
        """Atomically records timestamp as the newest successfully-sent time.

        Writes a sibling ".new" file first, then renames it over the marker,
        so a crash mid-write can never leave a truncated marker behind.
        """
        last_sent_name = self._path / (LAST_SENT_FILENAME + ".new")
        with last_sent_name.open('w') as outfile:
            outfile.write(str(timestamp.timestamp()))
        last_sent_name.rename(self._last_sent_path)


def _open_or_create(path: pathlib.Path) -> t.BinaryIO:
    while True:
        try:
            return path.open('r+b')
        except FileNotFoundError:
            pass
        try:
            return path.open('x+b')
        except FileExistsError:
            pass


def _open_exclusive(path: pathlib.Path) -> t.BinaryIO:
    """Opens path (creating it if missing) and takes an exclusive flock.

    Raises OSError when another process already holds the lock, i.e.
    another copy of the logger is running against the same file.
    """
    handle = _open_or_create(path)
    try:
        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return handle
    except BlockingIOError as ex:
        handle.close()
        raise OSError('Another copy of the logger is running.') from ex


def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]:
    """Reads the UTC timestamp of the last successfully-sent reading.

    Returns None if the marker file is missing or its contents don't parse
    as a float, in which case everything on disk is treated as unsent.
    """
    try:
        with path.open('r') as infile:
            unix_ts = float(infile.read())
    except (OSError, ValueError):
        # If the last-written file is missing or corrupt, assume it is dead.
        return None
    # fromtimestamp(..., tz=...) is the supported replacement for the
    # deprecated utcfromtimestamp().replace(tzinfo=...) pattern and yields
    # an identical aware datetime.
    return datetime.datetime.fromtimestamp(unix_ts, tz=pytz.UTC)


# Codec options shared by all BSON encode/decode calls in this module:
# decode datetimes as timezone-aware values pinned to UTC.
BSON_OPTIONS = bson.DEFAULT_CODEC_OPTIONS.with_options(
    tz_aware=True, tzinfo=pytz.UTC)


def bson_encode(data: t.Dict[str, t.Any]) -> bytes:
    """Serializes data to BSON bytes using this module's codec options."""
    document = bson.BSON.encode(data, codec_options=BSON_OPTIONS)
    return document


def _read_unsent_and_advance(
    infile: t.BinaryIO, last_sent: t.Optional[datetime.datetime],
) -> t.List[types.Reading]:
    """Reads all the unsent Readings and advances the file pointer to the end.

    Readings whose sample_time is at or before last_sent (when given) are
    skipped.  If the file ends in bytes that don't decode as BSON — e.g. a
    torn write from a crash — the file is truncated back to the end of the
    last document that decoded cleanly, so future appends start at a valid
    document boundary.
    """
    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
    # File offset just past the most recent successfully-decoded document.
    last_good = infile.tell()
    unsent: t.List[types.Reading] = []
    try:
        for entry in reader:
            # Note: recorded before validating the entry, so entries that
            # decode as BSON but aren't valid Readings are kept on disk.
            last_good = infile.tell()
            try:
                reading = types.Reading(**entry)
            except TypeError:
                continue  # Invalid entry; skip it.
            if not last_sent or last_sent < reading.sample_time:
                unsent.append(reading)
    except bson.InvalidBSON:
        # Corrupt tail: rewind and drop it so the file ends on a clean
        # document boundary.
        infile.seek(last_good, os.SEEK_SET)
        infile.truncate(last_good)
    return unsent