view weather_server/logfile.py @ 23:88249e451566

server: show date when last report was >12h ago.
author Paul Fisher <paul@pfish.zone>
date Sun, 10 Nov 2019 19:42:04 -0500
parents beb42c835c52
children 20c8ec56e447
line wrap: on
line source

"""The part which handles writing things out and reading things in from CSV.
"""

import concurrent.futures as futures
import contextlib
import fcntl
import os
import queue
import threading
import typing as t

import bson

from . import common


class _WriteRequest:

    def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]):
        """Creates a request to write the given data to the log."""
        # The data to be written.  We take ownership of all the dicts!
        self.entries = entries
        # Once written, a future that will resolve to None if successful.
        self.future = futures.Future()


class _ReadRequest:

    def __init__(self):
        # The future that will be set with the log's contnets.
        self.future = futures.Future()


# Design note: all file access is funneled through a single writer thread
# fed by a queue; each request reports completion back via its Future.

class Logger:
    """Logger which handles reading/writing a temperature log for one process.

    All file access is serialized through a single writer thread; cross-process
    coordination uses flock(2) advisory locks on the log file.
    """

    # Guards lazy insertion into `instances`.
    instance_lock = threading.Lock()
    # One shared Logger per log filename.
    instances: t.Dict[str, 'Logger'] = {}

    @classmethod
    def create(
        cls,
        filename: str,
        *,
        sample_field: str,
    ) -> 'Logger':
        """Creates a single shared instance of a logger for the given file.

        Args:
            filename: Path to the BSON log file.
            sample_field: Name of the increasing field used to skip duplicate
                or out-of-order entries when writing.

        Raises:
            ValueError: If a Logger for `filename` already exists with a
                different `sample_field`.
        """
        try:
            # Racy unlocked fast path; safe because instances are only ever
            # added to the dict, never replaced or removed.
            instance = cls.instances[filename]
        except KeyError:
            with cls.instance_lock:
                # Double-checked: another thread may have created the
                # instance while we were waiting for the lock.
                try:
                    instance = cls.instances[filename]
                except KeyError:
                    cls.instances[filename] = Logger(
                        filename,
                        sample_field=sample_field)
                    instance = cls.instances[filename]
        if instance._sample_field != sample_field:
            raise ValueError(
                'Existing instance has different sample field: '
                '{!r} != {!r}'.format(instance._sample_field, sample_field))
        return instance

    def __init__(self, filename: str, *, sample_field: str):
        """You should probably call .create() instead."""
        self._sample_field = sample_field
        self._file = _open_or_create(filename)
        # In-memory cache of every valid document read from the file, oldest
        # first.  (Annotation fixed: t.List takes a single type parameter;
        # the previous `t.List[..., ...]` was invalid Tuple-style syntax.)
        self._data: t.List[t.Dict[str, t.Any]] = []
        self._queue = queue.SimpleQueue()
        # File offset just past the last known-good document.
        self._last_size = 0
        # The flock operation currently held, or None when unlocked.
        self._lock_status: t.Optional[int] = None
        self._writer_thread = threading.Thread(target=self._writer)
        self._writer_thread.start()

    @property
    def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
        """All entries currently in the log, oldest first.

        Blocks until the writer thread has re-read the file.
        """
        req = _ReadRequest()
        self._queue.put(req)
        return req.future.result()

    def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]):
        """Appends entries to the log, skipping non-increasing ones.

        Blocks until the write has been flushed; re-raises any error that
        prevented it.
        """
        req = _WriteRequest(entries)
        self._queue.put(req)
        return req.future.result()

    # Sentinel placed on the queue to shut down the writer thread.
    _POISON = object()

    def close(self):
        """Stops the writer thread and closes the underlying file."""
        self._queue.put(self._POISON)
        self._writer_thread.join()

    def _writer(self) -> None:
        """Writer-thread main loop: services queued read/write requests."""
        running = True
        while running:
            item = self._queue.get()
            if item is self._POISON:
                # _POISON is the sentinel that makes us stop.
                # (The old comment said "None" here, which was wrong.)
                running = False
            elif isinstance(item, _ReadRequest):
                if not item.future.set_running_or_notify_cancel():
                    continue
                try:
                    with self._file_lock(fcntl.LOCK_SH):
                        self._catch_up()
                except BaseException as x:
                    item.future.set_exception(x)
                else:
                    item.future.set_result(tuple(self._data))
            elif isinstance(item, _WriteRequest):
                if not item.future.set_running_or_notify_cancel():
                    continue
                try:
                    with self._file_lock(fcntl.LOCK_EX):
                        self._catch_up()
                        # Since we're at the last good point, truncate after.
                        self._file.truncate(self._file.tell())
                        if not self._data:
                            last = None
                        else:
                            last = self._data[-1][self._sample_field]
                        for entry in item.entries:
                            entry_key = entry[self._sample_field]
                            # Only append entries strictly newer than the
                            # newest one already in the log.
                            if last is None or last < entry_key:
                                self._file.write(common.bson_encode(entry))
                                self._data.append(entry)
                                last = entry_key
                        self._file.flush()
                        self._last_size = self._file.tell()
                except BaseException as x:
                    item.future.set_exception(x)
                else:
                    item.future.set_result(None)
            else:
                raise AssertionError(
                    'Unexpected item {!r} in the queue'.format(item))
        self._file.close()

    def _catch_up(self) -> None:
        """Reads data and advances the file pointer to the end of the file."""
        assert self._lock_status is not None, 'The lock must be held.'
        size = self._size()
        if size == self._last_size:
            return
        last_good = self._file.tell()
        try:
            items = bson.decode_file_iter(
                self._file, codec_options=common.BSON_OPTIONS)
            for item in items:
                last_good = self._file.tell()
                self._data.append(item)
        except bson.InvalidBSON:
            pass  # We have reached the last valid document.  Bail.
        # Seek back to immediately after the end of the last valid doc.
        self._last_size = last_good
        self._file.seek(last_good, os.SEEK_SET)

    def fileno(self) -> int:
        """File descriptor of the log file (lets flock() accept `self`)."""
        return self._file.fileno()

    def _size(self) -> int:
        """Current on-disk size of the log file in bytes."""
        return os.stat(self.fileno()).st_size

    @contextlib.contextmanager
    def _file_lock(self, operation: int):
        """Holds an flock of the given type for the duration of the block."""
        assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.'
        fcntl.flock(self, operation)
        self._lock_status = operation
        try:
            yield
        finally:
            self._lock_status = None
            fcntl.flock(self, fcntl.LOCK_UN)


def _open_or_create(path: str) -> t.BinaryIO:
    while True:
        try:
            return open(path, 'r+b')
        except FileNotFoundError:
            pass
        try:
            return open(path, 'x+b')
        except FileExistsError:
            pass