view weather_server/logfile.py @ 35:2f3473416c11

Fix format string in repr(Location).
author Paul Fisher <paul@pfish.zone>
date Fri, 02 Jul 2021 20:36:56 -0400
parents 9bc3687e1e5e
children
line wrap: on
line source

"""The part which handles writing things out and reading things in from CSV.
"""

import attr
import collections
import concurrent.futures as futures
import contextlib
import fcntl
import os
import queue
import threading
import typing as t

import bson

from . import common


# The number of entries to keep in memory without reading from disk.
CACHED_ENTRIES = 16384

# How many entries to read before creating a new index entry.
INDEX_GAP = 4096


class _WriteRequest:

    def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]):
        """Creates a request to write the given data to the log."""
        # The data to be written.  We take ownership of all the dicts!
        self.entries = entries
        # Once written, a future that will resolve to None if successful.
        self.future = futures.Future()


class _ReadRequest:

    def __init__(self):
        # The future that will be set with the log's contnets.
        self.future = futures.Future()
        # TODO(pfish): Make it possible to read from a starting point.


# Poison pill to tell a logger thread to stop.
_POISON = object()


class Logger:
    """Logger which handles reading/writing one temperature log file."""

    _file: t.BinaryIO
    _index_file: t.BinaryIO

    def __init__(
            self,
            filename: str,
            *,
            sample_field: str,
            cached_entries: int = CACHED_ENTRIES,
            index_gap: int = INDEX_GAP):
        """Creates a new Logger for the given file.

        Args:
            filename: The filename to open, or create if not already there.
            sample_field: The field name to use as the strictly-increasing
                value to ensure that no duplicate writes occur.
        """

        self._filename = filename
        self._cached_entries = cached_entries
        self._index_gap = index_gap

        self._queue = queue.SimpleQueue()
        self._sample_field = sample_field

        self._start_future = futures.Future()
        self._thread = threading.Thread(
            name=f'{filename!r} writer thread',
            target=self._writer_thread,
            daemon=True)
        self._thread.start()

        self._data = collections.deque(maxlen=self._cached_entries)
        self._index: t.List[IndexEntry] = []
        self._message_count = 0
        self._start_future.result()

    @property
    def cached_data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
        req = _ReadRequest()
        self._queue.put(req)
        return req.future.result()

    def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]):
        req = _WriteRequest(entries)
        self._queue.put(req)
        return req.future.result()

    def _writer_thread(self) -> None:
        try:
            self._file = _open_or_create(self._filename)
            self._index_file = _open_or_create(_index_filename(self._filename))
        except BaseException as e:
            self._start_future.set_exception(e)
            return
        self._start_future.set_result(None)

        with self._file, self._index_file:
            running = True
            while running:
                item = self._queue.get()
                if item is _POISON:
                    # None is the poison pill that makes us stop.
                    running = False
                elif isinstance(item, _ReadRequest):
                    if not item.future.set_running_or_notify_cancel():
                        continue
                    with _file_lock(self._file, fcntl.LOCK_SH):
                        try:
                            self._catch_up()
                        except BaseException as x:
                            item.future.set_exception(x)
                        else:
                            item.future.set_result(tuple(self._data))
                elif isinstance(item, _WriteRequest):
                    if not item.future.set_running_or_notify_cancel():
                        continue
                    try:
                        with _file_lock(self._file, fcntl.LOCK_EX):
                            self._catch_up()
                            current_ptr = self._file.tell()
                            self._file.truncate(current_ptr)
                            if not self._data:
                                prev_key = None
                            else:
                                prev_key = self._data[-1][self._sample_field]
                            for entry in item.entries:
                                entry_key = entry[self._sample_field]
                                if prev_key is None or prev_key < entry_key:
                                    self._data.append(entry)
                                    self._file.write(common.bson_encode(entry))
                                    prev_key = entry_key
                            self._file.flush()
                            self._update_index()
                    except BaseException as x:
                        item.future.set_exception(x)
                    else:
                        item.future.set_result(None)
                else:
                    raise AssertionError(
                        f'Unexpected item {item!r} in the queue')

    def _catch_up(self) -> None:
        # Preconditions:
        # - At least a read lock is held.
        # - File pointer is at the end of the last-read entry.
        self._catch_up_index()
        if self._index:
            # Since we have an index, use it to find a starting place.
            last_idx = self._index[-1]
            # Figure out, based on the number of entries we want to keep
            # in memory, where we should start reading from.
            read_start_count = max(
                0, last_idx.entry_count - self._cached_entries)
            if self._message_count < read_start_count:
                # If we've already read past that starting point, we're OK.
                for idx_entry in self._index:
                    if read_start_count <= idx_entry.entry_count:
                        break
                    starting_point = idx_entry
                self._data.clear()
                self._file.seek(starting_point.byte, os.SEEK_SET)
                self._message_count = starting_point.entry_count
        pointer = self._file.tell()
        try:
            items = bson.decode_file_iter(
                self._file, codec_options=common.BSON_OPTIONS)
            for item in items:
                pointer = self._file.tell()
                self._data.append(item)
                self._message_count += 1
        except bson.InvalidBSON:
            pass  # We have reached the last valid document. Bail.
        # Seek back to immediately after the end of the last valid doc.
        self._file.seek(pointer, os.SEEK_SET)

    def _update_index(self) -> None:
        # Preconditions:
        # - File pointer is at the end of the last-written message.
        # - Index pointer is at the last-read index value.
        # - Exclusive lock is held.
        with _dup_file(self._file) as update_fp:
            if self._index:
                last_idx = self._index[-1]
                update_fp.seek(last_idx.byte)
                current_count = last_idx.entry_count
                decoder = bson.decode_file_iter(
                    update_fp, codec_options=common.BSON_OPTIONS)
                try:
                    # Skip the current index entry.
                    next(decoder)
                    current_count += 1
                    current_byte = update_fp.tell()
                except StopIteration:
                    # If there are no more entries, don't update the index.
                    return
            else:
                current_byte = 0
                update_fp.seek(current_byte, os.SEEK_SET)
                current_count = 0
            entries = bson.decode_file_iter(
                update_fp, codec_options=common.BSON_OPTIONS)
            for entry in entries:
                if current_count % self._index_gap == 0:
                    idx_entry = IndexEntry(
                        entry_count=current_count,
                        entry_key=entry[self._sample_field],
                        byte=current_byte)
                    self._index.append(idx_entry)
                    self._index_file.truncate()
                    self._index_file.write(
                        common.bson_encode(idx_entry.to_dict()))
                current_count += 1
                current_byte = update_fp.tell()
        self._index_file.flush()

    def _catch_up_index(self) -> None:
        # Preconditions:
        # - At least a read lock is held.
        pointer = self._index_file.tell()
        try:
            index_entries = bson.decode_file_iter(
                self._index_file, codec_options=common.BSON_OPTIONS)
            for entry_dict in index_entries:
                entry = IndexEntry.from_dict(entry_dict)
                self._index.append(entry)
                pointer = self._index_file.tell()
        except bson.InvalidBSON:
            pass  # We have reached the last valid BSON document. Bail.
        self._index_file.seek(pointer, os.SEEK_SET)

    def __del__(self) -> None:
        self.close()

    def close(self) -> None:
        self._queue.put(_POISON)
        self._thread.join()


def _index_filename(filename: str) -> str:
    return filename + '.index.bson'


@contextlib.contextmanager
def _file_lock(file: t.BinaryIO, operation: int) -> t.Iterator[None]:
    assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.'
    fcntl.flock(file, operation)
    try:
        yield
    finally:
        fcntl.flock(file, fcntl.LOCK_UN)


def _size(file: t.BinaryIO) -> int:
    return os.stat(file.fileno()).st_size


T = t.TypeVar('T')


@attr.s(auto_attribs=True, frozen=True, slots=True)
class IndexEntry:
    entry_count: int
    entry_key: object
    byte: int

    @classmethod
    def from_dict(cls: t.Type[T], d: t.Dict[str, t.Any]) -> T:
        return cls(**{f.name: d[f.name] for f in attr.fields(cls)})

    def to_dict(self) -> t.Dict[str, object]:
        return attr.asdict(self, recurse=False)


def _open_or_create(path: str) -> t.BinaryIO:
    while True:
        try:
            return open(path, 'r+b')
        except FileNotFoundError:
            pass
        try:
            return open(path, 'x+b')
        except FileExistsError:
            pass


def _dup_file(file: t.BinaryIO) -> t.BinaryIO:
    duplicate = os.dup(file.fileno())
    return os.fdopen(duplicate, 'r+b')