view weatherlog/logger.py @ 14:c01f9929ae38

Make logger and HTTP writer more general and resilient. This makes the logger and HTTP writer more general, by removing any dependency upon the exact data type they are writing. They can now handle any type of BSON-serializable dict, and track what they have sent by keeping track of the last *byte*, not the last timestamp.
author Paul Fisher <paul@pfish.zone>
date Tue, 15 Oct 2019 22:40:24 -0400
parents 8a350ec1aa78
children 770215590d80
line wrap: on
line source

"""Module to log temperatures to a directory, and an external website."""

import abc
import collections
import datetime
import fcntl
import os
import pathlib
import threading
import time
import typing as t

import attr
import bson
import pytz

BSON_FILENAME = "temps.bson"
OLD_LAST_TS = "last-sent"
START_BYTE = "start-byte"


class RemoteWriter(metaclass=abc.ABCMeta):
    """Destination that BufferedLogger forwards readings to.

    BufferedLogger hands over readings that are already BSON-encoded, so
    implementations receive raw ``bytes`` and need not know the schema of
    the documents they are sending.
    """

    # Maximum number of readings BufferedLogger passes to one write() call.
    BATCH_SIZE = 1000

    @abc.abstractmethod
    def write(self, readings: t.Iterable[bytes]) -> None:
        """Sends a batch of BSON-encoded readings.

        Args:
            readings: The encoded readings, in the order they were logged.

        Raises:
            RemoteWriteError: If the batch could not be sent. BufferedLogger
                keeps the batch and retries it on a later pass.
        """
        raise NotImplementedError()


class RemoteWriteError(Exception):
    """Raised by a RemoteWriter when a batch of readings could not be sent.

    BufferedLogger catches this and retries the same batch on its next pass.
    """


@attr.s(auto_attribs=True, frozen=True, slots=True)
class ReadingPosition:
    """A reading queued for sending, plus where it ends in the local log."""

    # The encoded reading that was written.
    data: bytes

    # The file offset just past this reading's last byte. Once this reading
    # has been sent, the offset is saved as the new start byte.
    end: int


class BufferedLogger:
    """A resilient logger which logs to a local file and a RemoteWriter.

    Every reading is first written to a local BSON file, then queued for a
    background thread that forwards batches to the RemoteWriter. After each
    successful batch, the file offset just past the last sent reading is
    saved, so readings that were never sent are re-queued on the next start.
    """

    # Seconds the sender thread sleeps before gathering each batch.
    WAIT_TIME = 2

    def __init__(self, directory: str, writer: RemoteWriter):
        self._writer = writer
        self._path = pathlib.Path(directory)
        # Holding an exclusive lock on the log keeps a second logger out.
        self._file = _open_exclusive(self._path / BSON_FILENAME)
        self._old_last_sent = self._path / OLD_LAST_TS
        self._start_byte = self._path / START_BYTE
        unsent = _read_unsent_and_upgrade(
            self._file, self._old_last_sent, self._start_byte)
        self._send_queue = collections.deque(unsent)
        self._running = False
        self._remote_thread: t.Optional[threading.Thread] = None

    def start(self) -> None:
        """Starts the background thread that forwards to the RemoteWriter."""
        self._running = True
        self._remote_thread = threading.Thread(target=self._send_internal)
        self._remote_thread.start()

    def close(self) -> None:
        """Stops the logger, closes all files, and stops writing to HTTP."""
        # Stop the sender thread *before* releasing the lock. The thread may
        # still rewrite the start-byte file, and once the lock is released a
        # new logger could start and read or rewrite that same file.
        self._running = False
        if self._remote_thread:
            self._remote_thread.join()
        fcntl.flock(self._file, fcntl.LOCK_UN)
        self._file.close()

    def write(self, reading: t.Dict[str, object]) -> None:
        """Writes a reading to the local file, then queues it for sending.

        The encoded reading is flushed to the OS before it is queued, so the
        sender never sees a reading that has not been written out.
        """
        encoded = bson_encode(reading)
        self._file.write(encoded)
        self._file.flush()
        # Offset just past this reading; saved as the start byte once sent.
        byte = self._file.tell()
        self._send_queue.append(ReadingPosition(encoded, byte))

    def _send_internal(self) -> None:
        """Sender-thread loop: forwards queued readings in batches."""
        to_send: t.List[ReadingPosition] = []
        while True:
            # Wait for multiple entries to build up in the queue.
            time.sleep(self.WAIT_TIME)
            while len(to_send) < self._writer.BATCH_SIZE:
                # Pop all the values we can off the queue.
                try:
                    to_send.append(self._send_queue.popleft())
                except IndexError:
                    break
            if not self._running:
                # Stop if we've been asked to stop. Unsent readings remain in
                # the local file past the saved start byte, so they are
                # re-read on the next startup.
                break
            if not to_send:
                # If there's nothing to send, don't try to send anything.
                continue
            try:
                # Pass a list rather than a one-shot generator so the writer
                # may iterate the batch more than once.
                self._writer.write([e.data for e in to_send])
            except RemoteWriteError:
                pass  # Keep the batch in to_send and retry it next time.
            else:
                # If we succeeded, record our success.
                self._update_start_byte(to_send[-1].end)
                to_send.clear()

    def _update_start_byte(self, byte: int) -> None:
        """Atomically saves ``byte`` as the offset of the first unsent reading."""
        _atomic_write(self._start_byte, str(byte))


def _atomic_write(file: pathlib.Path, contents: str) -> None:
    """Writes a string to a file, atomically."""
    new_name = file.with_name(file.name + '.new')
    with new_name.open('w') as outfile:
        outfile.write(contents)
    new_name.rename(file)


def _open_or_create(path: pathlib.Path) -> t.BinaryIO:
    while True:
        try:
            return path.open('r+b')
        except FileNotFoundError:
            pass
        try:
            return path.open('x+b')
        except FileExistsError:
            pass


def _open_exclusive(path: pathlib.Path) -> t.BinaryIO:
    """Opens (creating if needed) ``path`` and takes an exclusive flock on it.

    Raises:
        OSError: If another process already holds the lock.
    """
    handle = _open_or_create(path)
    lock_flags = fcntl.LOCK_EX | fcntl.LOCK_NB  # fail fast rather than wait
    try:
        fcntl.flock(handle, lock_flags)
    except BlockingIOError as ex:
        handle.close()
        raise OSError('Another copy of the logger is running.') from ex
    else:
        return handle


def _read_unsent_and_upgrade(
    infile: t.BinaryIO,
    last_sent_file: pathlib.Path,
    start_byte_file: pathlib.Path,
) -> t.List[ReadingPosition]:
    """Loads the readings that have not been sent yet.

    Any legacy last-sent file is upgraded first. Then every reading from the
    saved start byte to the end of ``infile`` is decoded. If a corrupt or
    cut-off document is found, the file is truncated just before it.

    On return, ``infile`` is positioned at the end of its valid data, so
    later writes append to the log.

    Returns:
        Each unsent reading, re-encoded, with the offset just past it.
    """
    _maybe_upgrade_last_sent(infile, last_sent_file, start_byte_file)
    # Clamp the saved offset to the file. A negative offset cannot be seeked
    # to. An offset past EOF would make the next write leave a zero-filled
    # hole, which the decoder would later reject as invalid BSON and
    # truncate away, together with every reading after it.
    file_size = infile.seek(0, os.SEEK_END)
    start_byte = min(max(_read_start_byte(start_byte_file), 0), file_size)
    infile.seek(start_byte, os.SEEK_SET)
    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
    readings: t.List[ReadingPosition] = []
    end_pos = infile.tell()
    try:
        for entry in reader:
            data = bson_encode(entry)
            end_pos = infile.tell()
            readings.append(ReadingPosition(data, end_pos))
    except bson.InvalidBSON:
        # Drop the corrupt tail so new readings follow valid data.
        infile.seek(end_pos, os.SEEK_SET)
        infile.truncate(end_pos)
    return readings


def _read_start_byte(path: pathlib.Path) -> int:
    try:
        with path.open('r') as infile:
            return int(infile.read())
    except (OSError, ValueError):
        return 0


def _maybe_upgrade_last_sent(
    infile: t.BinaryIO,
    last_sent_file: pathlib.Path,
    start_byte_file: pathlib.Path,
) -> None:
    """If there's a last-sent file, upgrades it to start-byte.

    The legacy file stores the timestamp of the last reading sent. This scans
    ``infile`` from the beginning up to the first reading whose
    ``sample_time`` is newer than that timestamp. It saves the offset just
    past the last reading before that point as the start byte, then deletes
    the legacy file. This assumes readings are stored in time order.

    If corrupt BSON is found first, ``infile`` is truncated at the last good
    offset.
    """
    last_sent = _read_last_sent(last_sent_file)
    if last_sent is None:
        return
    # Scan from the start regardless of where the caller left the file.
    infile.seek(0, os.SEEK_SET)
    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
    last_good = infile.tell()
    try:
        for entry in reader:
            timestamp = entry.get('sample_time')
            if not isinstance(timestamp, datetime.datetime):
                # Missing or non-datetime timestamp: skip it instead of
                # crashing on the comparison below.
                continue
            if last_sent < timestamp:
                break
            last_good = infile.tell()
    except bson.InvalidBSON:
        infile.seek(last_good, os.SEEK_SET)
        infile.truncate(last_good)
    _atomic_write(start_byte_file, str(last_good))
    last_sent_file.unlink()


def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]:
    """Reads the legacy last-sent file as a UTC datetime.

    The file holds a Unix timestamp written as a decimal number. Returns None
    if the file is missing or unreadable, or does not hold a usable
    timestamp.
    """
    try:
        with path.open('r') as infile:
            unix_ts = float(infile.read())
        # fromtimestamp rejects NaN, infinities and out-of-range values
        # (ValueError/OverflowError/OSError); treat those as corruption too,
        # instead of letting them abort startup.
        return datetime.datetime.fromtimestamp(unix_ts, tz=pytz.UTC)
    except (OSError, OverflowError, ValueError):
        # If the last-written file is missing or corrupt, assume it is dead.
        return None


# Codec options used for every BSON encode and decode in this module. Decoded
# datetimes come back timezone-aware, in UTC.
BSON_OPTIONS = bson.DEFAULT_CODEC_OPTIONS.with_options(
    tz_aware=True, tzinfo=pytz.UTC)


def bson_encode(data: t.Dict[str, object]) -> bytes:
    """Returns the BSON encoding of ``data``, using BSON_OPTIONS."""
    encoded = bson.BSON.encode(data, codec_options=BSON_OPTIONS)
    return encoded