Mercurial > personal > weather-server
view weather_server/logfile.py @ 34:8d3f32455575
Bump version.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Sat, 12 Jun 2021 20:24:30 +0000 |
parents | 9bc3687e1e5e |
children |
line wrap: on
line source
"""The part which handles writing things out and reading things in from CSV. """ import attr import collections import concurrent.futures as futures import contextlib import fcntl import os import queue import threading import typing as t import bson from . import common # The number of entries to keep in memory without reading from disk. CACHED_ENTRIES = 16384 # How many entries to read before creating a new index entry. INDEX_GAP = 4096 class _WriteRequest: def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]): """Creates a request to write the given data to the log.""" # The data to be written. We take ownership of all the dicts! self.entries = entries # Once written, a future that will resolve to None if successful. self.future = futures.Future() class _ReadRequest: def __init__(self): # The future that will be set with the log's contnets. self.future = futures.Future() # TODO(pfish): Make it possible to read from a starting point. # Poison pill to tell a logger thread to stop. _POISON = object() class Logger: """Logger which handles reading/writing one temperature log file.""" _file: t.BinaryIO _index_file: t.BinaryIO def __init__( self, filename: str, *, sample_field: str, cached_entries: int = CACHED_ENTRIES, index_gap: int = INDEX_GAP): """Creates a new Logger for the given file. Args: filename: The filename to open, or create if not already there. sample_field: The field name to use as the strictly-increasing value to ensure that no duplicate writes occur. """ self._filename = filename self._cached_entries = cached_entries self._index_gap = index_gap self._queue = queue.SimpleQueue() self._sample_field = sample_field self._start_future = futures.Future() self._thread = threading.Thread( name=f'{filename!r} writer thread', target=self._writer_thread, daemon=True) self._thread.start() self._data = collections.deque(maxlen=self._cached_entries) self._index: t.List[IndexEntry] = [] self._message_count = 0 self._start_future.result() @property def cached_data(self) -> t.Tuple[t.Dict[str, t.Any], ...]: req = _ReadRequest() self._queue.put(req) return req.future.result() def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]): req = _WriteRequest(entries) self._queue.put(req) return req.future.result() def _writer_thread(self) -> None: try: self._file = _open_or_create(self._filename) self._index_file = _open_or_create(_index_filename(self._filename)) except BaseException as e: self._start_future.set_exception(e) return self._start_future.set_result(None) with self._file, self._index_file: running = True while running: item = self._queue.get() if item is _POISON: # None is the poison pill that makes us stop. running = False elif isinstance(item, _ReadRequest): if not item.future.set_running_or_notify_cancel(): continue with _file_lock(self._file, fcntl.LOCK_SH): try: self._catch_up() except BaseException as x: item.future.set_exception(x) else: item.future.set_result(tuple(self._data)) elif isinstance(item, _WriteRequest): if not item.future.set_running_or_notify_cancel(): continue try: with _file_lock(self._file, fcntl.LOCK_EX): self._catch_up() current_ptr = self._file.tell() self._file.truncate(current_ptr) if not self._data: prev_key = None else: prev_key = self._data[-1][self._sample_field] for entry in item.entries: entry_key = entry[self._sample_field] if prev_key is None or prev_key < entry_key: self._data.append(entry) self._file.write(common.bson_encode(entry)) prev_key = entry_key self._file.flush() self._update_index() except BaseException as x: item.future.set_exception(x) else: item.future.set_result(None) else: raise AssertionError( f'Unexpected item {item!r} in the queue') def _catch_up(self) -> None: # Preconditions: # - At least a read lock is held. # - File pointer is at the end of the last-read entry. self._catch_up_index() if self._index: # Since we have an index, use it to find a starting place. last_idx = self._index[-1] # Figure out, based on the number of entries we want to keep # in memory, where we should start reading from. read_start_count = max( 0, last_idx.entry_count - self._cached_entries) if self._message_count < read_start_count: # If we've already read past that starting point, we're OK. for idx_entry in self._index: if read_start_count <= idx_entry.entry_count: break starting_point = idx_entry self._data.clear() self._file.seek(starting_point.byte, os.SEEK_SET) self._message_count = starting_point.entry_count pointer = self._file.tell() try: items = bson.decode_file_iter( self._file, codec_options=common.BSON_OPTIONS) for item in items: pointer = self._file.tell() self._data.append(item) self._message_count += 1 except bson.InvalidBSON: pass # We have reached the last valid document. Bail. # Seek back to immediately after the end of the last valid doc. self._file.seek(pointer, os.SEEK_SET) def _update_index(self) -> None: # Preconditions: # - File pointer is at the end of the last-written message. # - Index pointer is at the last-read index value. # - Exclusive lock is held. with _dup_file(self._file) as update_fp: if self._index: last_idx = self._index[-1] update_fp.seek(last_idx.byte) current_count = last_idx.entry_count decoder = bson.decode_file_iter( update_fp, codec_options=common.BSON_OPTIONS) try: # Skip the current index entry. next(decoder) current_count += 1 current_byte = update_fp.tell() except StopIteration: # If there are no more entries, don't update the index. return else: current_byte = 0 update_fp.seek(current_byte, os.SEEK_SET) current_count = 0 entries = bson.decode_file_iter( update_fp, codec_options=common.BSON_OPTIONS) for entry in entries: if current_count % self._index_gap == 0: idx_entry = IndexEntry( entry_count=current_count, entry_key=entry[self._sample_field], byte=current_byte) self._index.append(idx_entry) self._index_file.truncate() self._index_file.write( common.bson_encode(idx_entry.to_dict())) current_count += 1 current_byte = update_fp.tell() self._index_file.flush() def _catch_up_index(self) -> None: # Preconditions: # - At least a read lock is held. pointer = self._index_file.tell() try: index_entries = bson.decode_file_iter( self._index_file, codec_options=common.BSON_OPTIONS) for entry_dict in index_entries: entry = IndexEntry.from_dict(entry_dict) self._index.append(entry) pointer = self._index_file.tell() except bson.InvalidBSON: pass # We have reached the last valid BSON document. Bail. self._index_file.seek(pointer, os.SEEK_SET) def __del__(self) -> None: self.close() def close(self) -> None: self._queue.put(_POISON) self._thread.join() def _index_filename(filename: str) -> str: return filename + '.index.bson' @contextlib.contextmanager def _file_lock(file: t.BinaryIO, operation: int) -> t.Iterator[None]: assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.' fcntl.flock(file, operation) try: yield finally: fcntl.flock(file, fcntl.LOCK_UN) def _size(file: t.BinaryIO) -> int: return os.stat(file.fileno()).st_size T = t.TypeVar('T') @attr.s(auto_attribs=True, frozen=True, slots=True) class IndexEntry: entry_count: int entry_key: object byte: int @classmethod def from_dict(cls: t.Type[T], d: t.Dict[str, t.Any]) -> T: return cls(**{f.name: d[f.name] for f in attr.fields(cls)}) def to_dict(self) -> t.Dict[str, object]: return attr.asdict(self, recurse=False) def _open_or_create(path: str) -> t.BinaryIO: while True: try: return open(path, 'r+b') except FileNotFoundError: pass try: return open(path, 'x+b') except FileExistsError: pass def _dup_file(file: t.BinaryIO) -> t.BinaryIO: duplicate = os.dup(file.fileno()) return os.fdopen(duplicate, 'r+b')