view weather_server/logfile.py @ 24:20c8ec56e447

logfile: Pull logfile thread out of Logger. This enables automatic garbage collection of Logger instances, since a running thread no longer has a reference to a Logger's self. It separates exclusive management of logfile state into the _writer_thread function, which now opens the file and writes it until it is told to stop by receiving the poison pill.
author Paul Fisher <paul@pfish.zone>
date Sun, 10 Nov 2019 23:07:11 -0500
parents beb42c835c52
children 9bc3687e1e5e
line wrap: on
line source

"""Reads and writes log entries stored as a sequence of BSON documents.
"""

import concurrent.futures as futures
import contextlib
import fcntl
import os
import queue
import threading
import typing as t

import bson

from . import common


class _WriteRequest:
    """A queued request asking the writer thread to append entries."""

    def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]):
        """Creates a request to write the given data to the log.

        Args:
            entries: The entries to append.  Ownership of every dict passes
                to the logger; callers must not mutate them afterwards.
        """
        # Resolved by the writer thread: None on success, otherwise the
        # exception raised while writing.
        self.future: futures.Future = futures.Future()
        self.entries = entries


class _ReadRequest:
    """A queued request for a snapshot of every entry in the log."""

    def __init__(self):
        # Resolved by the writer thread with a tuple of the log's contents,
        # or with the exception raised while reading them.
        self.future: futures.Future = futures.Future()


# Sentinel queued to a writer thread to make it shut down.  It is compared
# by identity, so a bare object() can never be confused with a request.
_POISON: object = object()


class Logger:
    """Logger which handles reading/writing one temperature log file.

    All file access is delegated to a daemon writer thread.  The public
    methods enqueue a request and block until that thread answers it.  The
    thread holds no reference to the Logger, so an abandoned Logger can be
    garbage collected, and its ``__del__`` then stops the thread.
    """

    def __init__(self, filename: str, *, sample_field: str):
        """Creates a new Logger for the given file.

        Args:
            filename: The filename to open, or create if not already there.
            sample_field: The field name to use as the strictly-increasing
                value to ensure that no duplicate writes occur.

        Raises:
            Whatever exception was raised while opening or creating the file.
        """
        self._sample_field = sample_field
        self._queue = queue.SimpleQueue()
        # Create a Future that will be resolved once the file is opened
        # (or fails to be opened).
        writer_started = futures.Future()
        # Pass plain arguments instead of a closure over ``self``.  Thread.run
        # keeps its target alive while it runs, so a lambda capturing ``self``
        # would pin this Logger in memory for the thread's whole lifetime.
        # That would mean __del__ never runs and the thread/file leak.
        self._writer_thread = threading.Thread(
            name=f'{filename!r} writer thread',
            target=_writer_thread,
            args=(filename, self._queue, sample_field, writer_started),
            daemon=True)
        self._writer_thread.start()
        writer_started.result()

    @property
    def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
        """Every entry in the log file, in file order.

        Anything appended to the file since the previous request is read
        in first.

        Raises:
            ValueError: if the Logger is closed.
            Whatever exception was raised while reading the file.
        """
        return self._submit(_ReadRequest())

    def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]):
        """Appends ``entries`` to the log and flushes them.

        An entry is written only if its ``sample_field`` value is strictly
        greater than that of the last entry in the log.  Other entries are
        silently skipped.  The Logger takes ownership of the dicts.

        Returns:
            None, once the writer thread has processed the request.

        Raises:
            ValueError: if the Logger is closed.
            Whatever exception was raised while writing.
        """
        return self._submit(_WriteRequest(entries))

    def _submit(self, req):
        """Hands ``req`` to the writer thread and waits for its result."""
        if not self._writer_thread.is_alive():
            # Nobody would ever resolve the future; fail instead of hanging.
            raise ValueError('Logger is closed.')
        self._queue.put(req)
        return req.future.result()

    def __del__(self):
        # __init__ may have failed before the writer thread was created.
        if hasattr(self, '_writer_thread'):
            self.close()

    def close(self):
        """Stops the writer thread, which closes the file, and waits for it.

        Calling this more than once (or after a failed start) is harmless.
        """
        thread = self._writer_thread
        if thread.is_alive():
            self._queue.put(_POISON)
            thread.join()


def _writer_thread(
    filename: str,
    q: queue.SimpleQueue,
    sample_field: str,
    started: futures.Future,
) -> None:
    """Body of a Logger's writer thread; the only code that touches the file.

    Opens (or creates) the file.  Then serves _ReadRequest and _WriteRequest
    items from ``q`` one at a time, until it receives the _POISON pill.  At
    that point the file is closed and the function returns.

    Args:
        filename: Path of the log file to open or create.
        q: The Logger's request queue.
        sample_field: Key whose value must strictly increase from one logged
            entry to the next; see _serve_write.
        started: Resolved with None once the file is open, or with the
            exception raised while opening it.

    Raises:
        AssertionError: if ``q`` yields something that is neither a request
            nor the poison pill.
    """
    if not started.set_running_or_notify_cancel():
        return
    try:
        file = _open_or_create(filename)
    except BaseException as e:
        started.set_exception(e)
        return
    with file:
        started.set_result(None)
        # Every entry read from or written to the file so far, in file order.
        data: t.List[t.Dict[str, object]] = []
        while True:
            item = q.get()
            if item is _POISON:
                # The poison pill makes us stop; leaving ``with`` closes file.
                return
            if isinstance(item, _ReadRequest):
                _serve_read(file, data, item)
            elif isinstance(item, _WriteRequest):
                _serve_write(file, data, item, sample_field)
            else:
                raise AssertionError(
                    'Unexpected item {!r} in the queue'.format(item))


def _serve_read(
    file: t.BinaryIO,
    data: t.List[t.Dict[str, object]],
    req: '_ReadRequest',
) -> None:
    """Catches up on the file under a shared lock and answers ``req``.

    On success ``req.future`` gets a tuple snapshot of ``data``; on failure
    it gets the exception.  Cancelled requests are ignored.
    """
    if not req.future.set_running_or_notify_cancel():
        return
    try:
        with _file_lock(file, fcntl.LOCK_SH):
            data.extend(_catch_up(file))
    except BaseException as x:
        req.future.set_exception(x)
    else:
        req.future.set_result(tuple(data))


def _serve_write(
    file: t.BinaryIO,
    data: t.List[t.Dict[str, object]],
    req: '_WriteRequest',
    sample_field: str,
) -> None:
    """Appends the entries of ``req`` under an exclusive lock.

    Entries whose ``sample_field`` value is not strictly greater than the
    last value already in the log are skipped.  ``req.future`` is resolved
    with None on success, or with the exception on failure.  Cancelled
    requests are ignored.
    """
    if not req.future.set_running_or_notify_cancel():
        return
    try:
        with _file_lock(file, fcntl.LOCK_EX):
            data.extend(_catch_up(file))
            # Since we're at the last good point, truncate after.
            file.truncate(file.tell())
            last = data[-1][sample_field] if data else None
            try:
                for entry in req.entries:
                    entry_key = entry[sample_field]
                    if last is None or last < entry_key:
                        file.write(common.bson_encode(entry))
                        data.append(entry)
                        last = entry_key
            finally:
                # Flush even if an entry failed part way.  Entries already
                # added to ``data`` must reach the file before the lock is
                # released.  Otherwise the buffered bytes could be written
                # later, on top of data another writer appended meanwhile.
                file.flush()
    except BaseException as x:
        req.future.set_exception(x)
    else:
        req.future.set_result(None)


@contextlib.contextmanager
def _file_lock(file: t.BinaryIO, operation: int) -> t.Iterator[None]:
    """Holds an flock() on ``file`` for the duration of a ``with`` body.

    Args:
        file: An open file object; it is passed straight to fcntl.flock.
        operation: fcntl.LOCK_SH for a shared lock or fcntl.LOCK_EX for an
            exclusive one; anything else fails an assertion.
    """
    assert operation in {fcntl.LOCK_SH, fcntl.LOCK_EX}, 'Invalid operation.'
    fcntl.flock(file, operation)
    with contextlib.ExitStack() as cleanup:
        # Registered right after acquiring, so the lock is dropped however
        # the body exits.
        cleanup.callback(fcntl.flock, file, fcntl.LOCK_UN)
        yield


def _size(file: t.BinaryIO) -> int:
    """Returns the size in bytes the OS reports for the open ``file``.

    Bytes still sitting in Python's write buffer are not counted.
    """
    descriptor = file.fileno()
    return os.fstat(descriptor).st_size


def _catch_up(file: t.BinaryIO) -> t.Iterable[t.Dict[str, object]]:
    """Reads data and advances the file pointer to the end of the file.

    Decodes every complete BSON document from the current position onward.
    The pointer is then left just after the last one that decoded cleanly,
    so any trailing partial document is re-read on the next call.

    Returns:
        The newly decoded documents (empty if nothing new was present).
    """
    file_size = _size(file)
    resume_at = file.tell()
    if file_size == resume_at:
        return ()
    fresh: t.List[t.Dict[str, object]] = []
    # An InvalidBSON error means we hit bytes that are not a complete
    # document; everything decoded before that point is still good.
    with contextlib.suppress(bson.InvalidBSON):
        for doc in bson.decode_file_iter(
                file, codec_options=common.BSON_OPTIONS):
            resume_at = file.tell()
            fresh.append(doc)
    # Rewind to just past the last document that decoded cleanly.
    file.seek(resume_at, os.SEEK_SET)
    return fresh


def _open_or_create(path: str) -> t.BinaryIO:
    """Opens ``path`` for binary read/write, creating it if it is missing.

    If the file vanishes or appears between the open attempt and the
    create attempt, both are simply retried.  So this never fails with
    FileNotFoundError or FileExistsError.
    """
    attempts = (('r+b', FileNotFoundError), ('x+b', FileExistsError))
    while True:
        for mode, lost_race in attempts:
            with contextlib.suppress(lost_race):
                return open(path, mode)