view weatherlog/logger.py @ 16:770215590d80

logger: Actually pass the right type. The logger was improperly giving the writer already-encoded values.
author Paul Fisher <paul@pfish.zone>
date Thu, 17 Oct 2019 22:19:33 -0400
parents c01f9929ae38

"""Module to log temperatures to a directory, and an external website."""

import abc
import collections
import datetime
import fcntl
import os
import pathlib
import threading
import time
import typing as t

import attr
import bson
import pytz

# Name of the local BSON log of readings, inside the log directory.
BSON_FILENAME = "temps.bson"
# Legacy metadata file holding the timestamp of the last reading sent.
OLD_LAST_TS = "last-sent"
# Metadata file holding the byte offset of the first unsent reading.
START_BYTE = "start-byte"


class RemoteWriter(metaclass=abc.ABCMeta):
    """Interface for sending batches of readings to a remote endpoint."""

    # Maximum number of readings to send in a single write call.
    BATCH_SIZE = 1000

    @abc.abstractmethod
    def write(self, readings: t.Iterable[t.Dict[str, object]]) -> None:
        """Sends readings remotely; raises RemoteWriteError on failure."""
        raise NotImplementedError()


class RemoteWriteError(Exception):
    """Error to be raised by RemoteWriter.write."""
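

# Illustrative sketch, not part of the original module: a minimal in-memory
# RemoteWriter showing the expected contract. A real writer would upload each
# batch (for example over HTTP) and translate transport failures into
# RemoteWriteError so that BufferedLogger retries the batch later. The class
# name and its `fail` flag are hypothetical.
class InMemoryWriter(RemoteWriter):
    """Collects readings in memory; handy for tests and as an API example."""

    def __init__(self) -> None:
        self.readings: t.List[t.Dict[str, object]] = []
        self.fail = False  # Flip to True to simulate a remote outage.

    def write(self, readings: t.Iterable[t.Dict[str, object]]) -> None:
        if self.fail:
            # Signal a retryable failure exactly as a real writer would.
            raise RemoteWriteError('simulated outage')
        self.readings.extend(readings)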


@attr.s(auto_attribs=True, frozen=True, slots=True)
class ReadingPosition:
    """A decoded reading paired with its end offset in the local log."""

    # The decoded reading that was written to the local log.
    data: t.Dict[str, object]

    # The offset of the byte in the log file immediately after this
    # reading's encoded bytes.
    end: int


class BufferedLogger:
    """A resilient logger which logs to a local file and a RemoteWriter.

    An illustrative usage sketch appears at the bottom of this file.
    """

    # Seconds to wait between attempts to send a batch to the writer.
    WAIT_TIME = 2

    def __init__(self, directory: str, writer: RemoteWriter):
        self._writer = writer
        self._path = pathlib.Path(directory)
        self._file = _open_exclusive(self._path / BSON_FILENAME)
        self._old_last_sent = self._path / OLD_LAST_TS
        self._start_byte = self._path / START_BYTE
        unsent = _read_unsent_and_upgrade(
            self._file, self._old_last_sent, self._start_byte)
        self._send_queue = collections.deque(unsent)
        self._running = False
        self._remote_thread: t.Optional[threading.Thread] = None

    def start(self) -> None:
        """Starts the bit which logs to HTTP."""
        self._running = True
        self._remote_thread = threading.Thread(target=self._send_internal)
        self._remote_thread.start()

    def close(self) -> None:
        """Stops the logger, closes all files, and stops writing to HTTP."""
        fcntl.flock(self._file, fcntl.LOCK_UN)
        self._file.close()
        self._running = False
        if self._remote_thread:
            self._remote_thread.join()

    def write(self, reading: t.Dict[str, object]) -> None:
        """Writes a reading to the local log and queues it for sending."""
        encoded = bson_encode(reading)
        self._file.write(encoded)
        self._file.flush()
        byte = self._file.tell()
        self._send_queue.append(ReadingPosition(reading, byte))

    def _send_internal(self) -> None:
        to_send: t.List[ReadingPosition] = []
        while True:
            # Wait for multiple entries to build up in the queue.
            time.sleep(self.WAIT_TIME)
            while len(to_send) < self._writer.BATCH_SIZE:
                # Pop all the values we can off the queue.
                try:
                    to_send.append(self._send_queue.popleft())
                except IndexError:
                    break
            if not self._running:
                # Stop if we've been asked to stop.
                break
            if not to_send:
                # If there's nothing to send, don't try to send anything.
                continue
            try:
                # Try writing out the values.
                self._writer.write(e.data for e in to_send)
            except RemoteWriteError:
                pass  # If it fails, just try again next time.
            else:
                # If we succeeded, record our success.
                last_sent = to_send[-1]
                self._update_start_byte(last_sent.end)
                to_send.clear()

    def _update_start_byte(self, byte: int) -> None:
        """Atomically records the offset of the first unsent reading."""
        _atomic_write(self._start_byte, str(byte))


def _atomic_write(file: pathlib.Path, contents: str) -> None:
    """Writes a string to a file, atomically."""
    new_name = file.with_name(file.name + '.new')
    with new_name.open('w') as outfile:
        outfile.write(contents)
    new_name.rename(file)


def _open_or_create(path: pathlib.Path) -> t.BinaryIO:
    """Opens path for read/write, creating it if needed, tolerating races."""
    while True:
        try:
            return path.open('r+b')
        except FileNotFoundError:
            pass
        try:
            return path.open('x+b')
        except FileExistsError:
            pass


def _open_exclusive(path: pathlib.Path) -> t.BinaryIO:
    """Opens path with an exclusive lock; raises if another copy holds it."""
    file = _open_or_create(path)
    try:
        fcntl.flock(file, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError as ex:
        file.close()
        raise OSError('Another copy of the logger is running.') from ex
    return file


def _read_unsent_and_upgrade(
    infile: t.BinaryIO,
    last_sent_file: pathlib.Path,
    start_byte_file: pathlib.Path,
) -> t.List[ReadingPosition]:
    """Reads all not-yet-sent readings, upgrading old last-sent metadata.

    Trailing partial or corrupt BSON is truncated so the log ends on a
    complete document.
    """
    _maybe_upgrade_last_sent(infile, last_sent_file, start_byte_file)
    start_byte = _read_start_byte(start_byte_file)
    infile.seek(start_byte, os.SEEK_SET)
    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
    readings: t.List[ReadingPosition] = []
    end_pos = infile.tell()
    try:
        for entry in reader:
            end_pos = infile.tell()
            readings.append(ReadingPosition(entry, end_pos))
    except bson.InvalidBSON:
        infile.seek(end_pos, os.SEEK_SET)
        infile.truncate(end_pos)
    return readings


def _read_start_byte(path: pathlib.Path) -> int:
    """Reads the start-byte offset, or 0 if missing or unparseable."""
    try:
        with path.open('r') as infile:
            return int(infile.read())
    except (OSError, ValueError):
        return 0


def _maybe_upgrade_last_sent(
    infile: t.BinaryIO,
    last_sent_file: pathlib.Path,
    start_byte_file: pathlib.Path,
) -> None:
    """If there's a last-sent file, upgrades it to start-byte."""
    last_sent = _read_last_sent(last_sent_file)
    if not last_sent:
        return
    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
    last_good = infile.tell()
    try:
        for entry in reader:
            try:
                timestamp: datetime.datetime = entry['sample_time']
            except KeyError:
                continue  # Invalid entry; skip it.
            if last_sent < timestamp:
                break
            last_good = infile.tell()
    except bson.InvalidBSON:
        infile.seek(last_good, os.SEEK_SET)
        infile.truncate(last_good)
    _atomic_write(start_byte_file, str(last_good))
    last_sent_file.unlink()


def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]:
    """Reads the legacy last-sent timestamp, or None if missing or corrupt."""
    try:
        with path.open('r') as infile:
            unix_ts = float(infile.read())
    except (OSError, ValueError):
        # If the last-sent file is missing or unreadable, treat it as absent.
        return None
    return datetime.datetime.utcfromtimestamp(unix_ts).replace(
        tzinfo=pytz.UTC)


BSON_OPTIONS = bson.DEFAULT_CODEC_OPTIONS.with_options(
    tz_aware=True, tzinfo=pytz.UTC)


def bson_encode(data: t.Dict[str, object]) -> bytes:
    """Encodes a reading as BSON using the module's codec options."""
    return bson.BSON.encode(data, codec_options=BSON_OPTIONS)
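

# Illustrative usage sketch, not part of the original module: exercises the
# BufferedLogger lifecycle against the InMemoryWriter sketch above, logging
# into a throwaway temporary directory. Field names other than 'sample_time'
# are made up for the example.
if __name__ == '__main__':
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        writer = InMemoryWriter()
        logger = BufferedLogger(tmp, writer)
        logger.start()
        logger.write({
            'sample_time': datetime.datetime.now(tz=pytz.UTC),
            'temp_c': 21.5,
        })
        # Give the background sender a chance to flush the queued reading.
        time.sleep(BufferedLogger.WAIT_TIME + 1)
        logger.close()
        print('sent readings:', writer.readings)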