view weather_server/logfile.py @ 21:beb42c835c52

Make weather server handle arbitrary data: - Make logfile record arbitrary BSONs - Make server handlers OK with same - Make location type a normal class rather than attrs; have it handle its own logger. - Bump version number.
author Paul Fisher <paul@pfish.zone>
date Sat, 19 Oct 2019 18:40:48 -0400
parents efe7a1eff167
children 20c8ec56e447
line wrap: on
line source

"""The part which handles writing things out and reading things in from CSV.
"""

import concurrent.futures as futures
import contextlib
import fcntl
import os
import queue
import threading
import typing as t

import bson

from . import common


class _WriteRequest:

    def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]):
        """Creates a request to write the given data to the log."""
        # The data to be written.  We take ownership of all the dicts!
        self.entries = entries
        # Once written, a future that will resolve to None if successful.
        self.future = futures.Future()


class _ReadRequest:

    def __init__(self):
        # The future that will be set with the log's contnets.
        self.future = futures.Future()


# File access is funneled through a single writer thread that services a
# queue of read/write requests, reporting results back through futures.

class Logger:
    """Logger which handles reading/writing a temperature log for one process.

    All file access happens on a single background writer thread, so
    in-process callers are serialized; cross-process coordination uses
    flock() on the log file itself.
    """

    # Guards creation of new entries in `instances`.
    instance_lock = threading.Lock()
    # One shared Logger per filename (the exact string passed to create()).
    instances: t.Dict[str, 'Logger'] = {}

    @classmethod
    def create(
        cls,
        filename: str,
        *,
        sample_field: str,
    ) -> 'Logger':
        """Creates a single shared instance of a logger for the given file.

        Args:
            filename: Path of the BSON log file.
            sample_field: The entry key whose value orders samples; writes
                only append entries whose key exceeds the last logged one.

        Raises:
            ValueError: If a Logger already exists for this filename with a
                different sample_field.
        """
        # Double-checked locking: the unlocked lookup is cheap and safe
        # under the GIL; the locked re-check avoids racing another creator.
        try:
            instance = cls.instances[filename]
        except KeyError:
            with cls.instance_lock:
                try:
                    instance = cls.instances[filename]
                except KeyError:
                    cls.instances[filename] = Logger(
                        filename,
                        sample_field=sample_field)
                    instance = cls.instances[filename]
        if instance._sample_field != sample_field:
            raise ValueError(
                'Existing instance has different sample field: '
                '{!r} != {!r}'.format(instance._sample_field, sample_field))
        return instance

    def __init__(self, filename: str, *, sample_field: str):
        """You should probably call .create() instead."""
        self._sample_field = sample_field
        self._file = _open_or_create(filename)
        # In-memory mirror of every entry in the log, in file order.
        # BUG FIX: this annotation was `t.List[t.Dict[str, t.Any], ...]`;
        # List takes a single parameter, and annotations on attribute
        # targets are evaluated at runtime, so that raised TypeError here.
        self._data: t.List[t.Dict[str, t.Any]] = []
        self._queue = queue.SimpleQueue()
        # File size up to the end of the last BSON document known valid.
        self._last_size = 0
        # The flock() operation currently held, or None when unlocked.
        self._lock_status: t.Optional[int] = None
        self._writer_thread = threading.Thread(target=self._writer)
        self._writer_thread.start()

    @property
    def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
        """Every entry currently in the log.  Blocks on the writer thread."""
        req = _ReadRequest()
        self._queue.put(req)
        return req.future.result()

    def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]):
        """Appends entries to the log, blocking until they are durable.

        Entries whose sample_field value is not strictly greater than the
        last logged entry's are silently skipped.
        """
        req = _WriteRequest(entries)
        self._queue.put(req)
        return req.future.result()

    # Sentinel placed on the queue to tell the writer thread to stop.
    _POISON = object()

    def close(self):
        """Stops the writer thread and closes the file.

        NOTE(review): the instance is not removed from `instances`, so a
        later create() for the same filename returns this closed logger —
        confirm whether that is intended.
        """
        self._queue.put(self._POISON)
        self._writer_thread.join()

    def _writer(self) -> None:
        """Services the request queue until poisoned.  Runs in its thread."""
        running = True
        while running:
            item = self._queue.get()
            if item is self._POISON:
                # _POISON is the sentinel that makes us stop.
                running = False
            elif isinstance(item, _ReadRequest):
                if not item.future.set_running_or_notify_cancel():
                    continue
                try:
                    with self._file_lock(fcntl.LOCK_SH):
                        self._catch_up()
                # BaseException on purpose: any failure must reach the
                # waiting caller rather than dying in this thread.
                except BaseException as x:
                    item.future.set_exception(x)
                else:
                    item.future.set_result(tuple(self._data))
            elif isinstance(item, _WriteRequest):
                if not item.future.set_running_or_notify_cancel():
                    continue
                try:
                    with self._file_lock(fcntl.LOCK_EX):
                        self._catch_up()
                        # Since we're at the last good point, truncate after.
                        self._file.truncate(self._file.tell())
                        if not self._data:
                            last = None
                        else:
                            last = self._data[-1][self._sample_field]
                        for entry in item.entries:
                            entry_key = entry[self._sample_field]
                            # Only append monotonically increasing samples.
                            if last is None or last < entry_key:
                                self._file.write(common.bson_encode(entry))
                                self._data.append(entry)
                                last = entry_key
                        self._file.flush()
                        self._last_size = self._file.tell()
                except BaseException as x:
                    item.future.set_exception(x)
                else:
                    item.future.set_result(None)
            else:
                raise AssertionError(
                    'Unexpected item {!r} in the queue'.format(item))
        self._file.close()

    def _catch_up(self) -> None:
        """Reads data and advances the file pointer to the end of the file."""
        assert self._lock_status is not None, 'The lock must be held.'
        size = self._size()
        if size == self._last_size:
            # Nothing new was written by another process; pointer is good.
            return
        last_good = self._file.tell()
        try:
            items = bson.decode_file_iter(
                self._file, codec_options=common.BSON_OPTIONS)
            for item in items:
                last_good = self._file.tell()
                self._data.append(item)
        except bson.InvalidBSON:
            pass  # We have reached the last valid document.  Bail.
        # Seek back to immediately after the end of the last valid doc.
        self._last_size = last_good
        self._file.seek(last_good, os.SEEK_SET)

    def fileno(self) -> int:
        """File descriptor of the log; lets flock() accept `self` directly."""
        return self._file.fileno()

    def _size(self) -> int:
        """Current on-disk size of the log file, in bytes."""
        return os.stat(self.fileno()).st_size

    @contextlib.contextmanager
    def _file_lock(self, operation: int):
        """Holds an flock() of the given type on the log file for the body."""
        assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.'
        fcntl.flock(self, operation)
        self._lock_status = operation
        try:
            yield
        finally:
            self._lock_status = None
            fcntl.flock(self, fcntl.LOCK_UN)


def _open_or_create(path: str) -> t.BinaryIO:
    while True:
        try:
            return open(path, 'r+b')
        except FileNotFoundError:
            pass
        try:
            return open(path, 'x+b')
        except FileExistsError:
            pass