diff weather_server/logfile.py @ 31:9bc3687e1e5e
logfile: Add an index, and don't keep everything in RAM.
- Adds index BSON file, updated upon writing.
- Limits amount of data in RAM.
- Gracefully handles writes that don't update index.
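
For orientation before the diff itself: the new index file sits next to the log (the log's filename plus '.index.bson') and is nothing more than a flat run of BSON documents, roughly one per INDEX_GAP (4096) log entries, each carrying entry_count, entry_key, and byte (the offset of that entry in the log). The sketch below shows how such an index lets a reader land near the tail of a large log instead of decoding it from byte zero. It is an illustration only, not code from this changeset; the helper name tail_of_log and the standalone use of pymongo's bson module outside the package are assumptions made for the example.

    import bson  # pymongo's bson module, the same one logfile.py uses

    CACHED_ENTRIES = 16384  # mirrors the constant introduced in this change


    def tail_of_log(log_path: str, index_path: str) -> list:
        """Returns roughly the last CACHED_ENTRIES entries of the log."""
        with open(index_path, 'rb') as index_file:
            index = list(bson.decode_file_iter(index_file))
        with open(log_path, 'rb') as log_file:
            if index:
                # Find the last index entry that still leaves CACHED_ENTRIES
                # messages after it, and jump straight to its byte offset.
                start_count = max(0, index[-1]['entry_count'] - CACHED_ENTRIES)
                start = index[0]
                for entry in index:
                    if entry['entry_count'] >= start_count:
                        break
                    start = entry
                log_file.seek(start['byte'])
            entries = []
            try:
                for doc in bson.decode_file_iter(log_file):
                    entries.append(doc)
            except bson.InvalidBSON:
                pass  # A torn final write; stop at the last whole document.
        return entries[-CACHED_ENTRIES:]

Logger._catch_up() in the diff does essentially the same dance, with the extra care needed because other processes may be appending to the log and to the index at the same time.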
author   | Paul Fisher <paul@pfish.zone>
date     | Tue, 07 Jul 2020 19:51:30 -0400
parents  | 20c8ec56e447
children | b77c8e7d2742
--- a/weather_server/logfile.py	Tue May 19 10:15:29 2020 -0400
+++ b/weather_server/logfile.py	Tue Jul 07 19:51:30 2020 -0400
@@ -1,6 +1,8 @@
 """The part which handles writing things out and reading things in from CSV.
 """
 
+import attr
+import collections
 import concurrent.futures as futures
 import contextlib
 import fcntl
@@ -14,6 +16,13 @@
 
 from . import common
 
+# The number of entries to keep in memory without reading from disk.
+CACHED_ENTRIES = 16384
+
+# How many entries to read before creating a new index entry.
+INDEX_GAP = 4096
+
+
 class _WriteRequest:
 
     def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]):
@@ -29,6 +38,7 @@
     def __init__(self):
         # The future that will be set with the log's contnets.
         self.future = futures.Future()
+        # TODO(pfish): Make it possible to read from a starting point.
 
 
 # Poison pill to tell a logger thread to stop.
@@ -38,7 +48,16 @@
 class Logger:
     """Logger which handles reading/writing one temperature log file."""
 
-    def __init__(self, filename: str, *, sample_field: str):
+    _file: t.BinaryIO
+    _index_file: t.BinaryIO
+
+    def __init__(
+            self,
+            filename: str,
+            *,
+            sample_field: str,
+            cached_entries: int = CACHED_ENTRIES,
+            index_gap: int = INDEX_GAP):
         """Creates a new Logger for the given file.
 
         Args:
@@ -46,21 +65,28 @@
             sample_field: The field name to use as the strictly-increasing
                 value to ensure that no duplicate writes occur.
         """
-        self._sample_field = sample_field
+
+        self._filename = filename
+        self._cached_entries = cached_entries
+        self._index_gap = index_gap
+
         self._queue = queue.SimpleQueue()
-        # Create a Future that will be resolved once the file is opened
-        # (or fails to be opened).
-        writer_started = futures.Future()
-        self._writer_thread = threading.Thread(
+        self._sample_field = sample_field
+
+        self._start_future = futures.Future()
+        self._thread = threading.Thread(
             name=f'{filename!r} writer thread',
-            target=lambda: _writer_thread(
-                filename, self._queue, sample_field, writer_started),
+            target=self._writer_thread,
             daemon=True)
-        self._writer_thread.start()
-        writer_started.result()
+        self._thread.start()
+
+        self._data = collections.deque(maxlen=self._cached_entries)
+        self._index: t.List[IndexEntry] = []
+        self._message_count = 0
+        self._start_future.result()
 
     @property
-    def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
+    def cached_data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
         req = _ReadRequest()
         self._queue.put(req)
         return req.future.result()
@@ -70,72 +96,159 @@
         self._queue.put(req)
         return req.future.result()
 
-    def __del__(self):
+    def _writer_thread(self) -> None:
+        try:
+            self._file = _open_or_create(self._filename)
+            self._index_file = _open_or_create(_index_filename(self._filename))
+        except BaseException as e:
+            self._start_future.set_exception(e)
+            return
+        self._start_future.set_result(None)
+
+        with self._file, self._index_file:
+            running = True
+            while running:
+                item = self._queue.get()
+                if item is _POISON:
+                    # None is the poison pill that makes us stop.
+                    running = False
+                elif isinstance(item, _ReadRequest):
+                    if not item.future.set_running_or_notify_cancel():
+                        continue
+                    with _file_lock(self._file, fcntl.LOCK_SH):
+                        try:
+                            self._catch_up()
+                        except BaseException as x:
+                            item.future.set_exception(x)
+                        else:
+                            item.future.set_result(tuple(self._data))
+                elif isinstance(item, _WriteRequest):
+                    if not item.future.set_running_or_notify_cancel():
+                        continue
+                    try:
+                        with _file_lock(self._file, fcntl.LOCK_EX):
+                            self._catch_up()
+                            current_ptr = self._file.tell()
+                            self._file.truncate(current_ptr)
+                            if not self._data:
+                                prev_key = None
+                            else:
+                                prev_key = self._data[-1][self._sample_field]
+                            for entry in item.entries:
+                                entry_key = entry[self._sample_field]
+                                if prev_key is None or prev_key < entry_key:
+                                    self._data.append(entry)
+                                    self._file.write(common.bson_encode(entry))
+                                    prev_key = entry_key
+                            self._file.flush()
+                            self._update_index()
+                    except BaseException as x:
+                        item.future.set_exception(x)
+                    else:
+                        item.future.set_result(None)
+                else:
+                    raise AssertionError(
+                        f'Unexpected item {item!r} in the queue')
+
+    def _catch_up(self) -> None:
+        # Preconditions:
+        # - At least a read lock is held.
+        # - File pointer is at the end of the last-read entry.
+        self._catch_up_index()
+        if self._index:
+            # Since we have an index, use it to find a starting place.
+            last_idx = self._index[-1]
+            # Figure out, based on the number of entries we want to keep
+            # in memory, where we should start reading from.
+            read_start_count = max(
+                0, last_idx.entry_count - self._cached_entries)
+            if self._message_count < read_start_count:
+                # If we've already read past that starting point, we're OK.
+                for idx_entry in self._index:
+                    if read_start_count <= idx_entry.entry_count:
+                        break
+                    starting_point = idx_entry
+                self._data.clear()
+                self._file.seek(starting_point.byte, os.SEEK_SET)
+                self._message_count = starting_point.entry_count
+        pointer = self._file.tell()
+        try:
+            items = bson.decode_file_iter(
+                self._file, codec_options=common.BSON_OPTIONS)
+            for item in items:
+                pointer = self._file.tell()
+                self._data.append(item)
+                self._message_count += 1
+        except bson.InvalidBSON:
+            pass  # We have reached the last valid document.  Bail.
+        # Seek back to immediately after the end of the last valid doc.
+        self._file.seek(pointer, os.SEEK_SET)
+
+    def _update_index(self) -> None:
+        # Preconditions:
+        # - File pointer is at the end of the last-written message.
+        # - Index pointer is at the last-read index value.
+        # - Exclusive lock is held.
+        with _dup_file(self._file) as update_fp:
+            if self._index:
+                last_idx = self._index[-1]
+                update_fp.seek(last_idx.byte)
+                current_count = last_idx.entry_count
+                decoder = bson.decode_file_iter(
+                    update_fp, codec_options=common.BSON_OPTIONS)
+                try:
+                    # Skip the current index entry.
+                    next(decoder)
+                    current_count += 1
+                    current_byte = update_fp.tell()
+                except StopIteration:
+                    # If there are no more entries, don't update the index.
+                    return
+            else:
+                current_byte = 0
+                update_fp.seek(current_byte, os.SEEK_SET)
+                current_count = 0
+            entries = bson.decode_file_iter(
+                update_fp, codec_options=common.BSON_OPTIONS)
+            for entry in entries:
+                if current_count % self._index_gap == 0:
+                    idx_entry = IndexEntry(
+                        entry_count=current_count,
+                        entry_key=entry[self._sample_field],
+                        byte=current_byte)
+                    self._index.append(idx_entry)
+                    self._index_file.truncate()
+                    self._index_file.write(
+                        common.bson_encode(idx_entry.to_dict()))
+                current_count += 1
+                current_byte = update_fp.tell()
+            self._index_file.flush()
+
+    def _catch_up_index(self) -> None:
+        # Preconditions:
+        # - At least a read lock is held.
+        pointer = self._index_file.tell()
+        try:
+            index_entries = bson.decode_file_iter(
+                self._index_file, codec_options=common.BSON_OPTIONS)
+            for entry_dict in index_entries:
+                entry = IndexEntry.from_dict(entry_dict)
+                self._index.append(entry)
+                pointer = self._index_file.tell()
+        except bson.InvalidBSON:
+            pass  # We have reached the last valid BSON document.  Bail.
+        self._index_file.seek(pointer, os.SEEK_SET)
+
+    def __del__(self) -> None:
         self.close()
 
-    def close(self):
+    def close(self) -> None:
         self._queue.put(_POISON)
-        self._writer_thread.join()
+        self._thread.join()
 
 
-def _writer_thread(
-        filename: str,
-        q: queue.Queue,
-        sample_field: str,
-        started: futures.Future,
-) -> None:
-    if not started.set_running_or_notify_cancel():
-        return
-    try:
-        file = _open_or_create(filename)
-        started.set_result(None)
-    except BaseException as e:
-        started.set_exception(e)
-        return
-    with file:
-        running = True
-        data: t.List[t.Dict[str, object]] = []
-        while running:
-            item = q.get()
-            if item is _POISON:
-                # None is the poison pill that makes us stop.
-                running = False
-            elif isinstance(item, _ReadRequest):
-                if not item.future.set_running_or_notify_cancel():
-                    continue
-                try:
-                    with _file_lock(file, fcntl.LOCK_SH):
-                        data.extend(_catch_up(file))
-                except BaseException as x:
-                    item.future.set_exception(x)
-                else:
-                    item.future.set_result(tuple(data))
-            elif isinstance(item, _WriteRequest):
-                if not item.future.set_running_or_notify_cancel():
-                    continue
-                try:
-                    with _file_lock(file, fcntl.LOCK_EX):
-                        data.extend(_catch_up(file))
-                        # Since we're at the last good point, truncate after.
-                        file.truncate(file.tell())
-                        if not data:
-                            last = None
-                        else:
-                            last = data[-1][sample_field]
-                        for entry in item.entries:
-                            entry_key = entry[sample_field]
-                            if last is None or last < entry_key:
-                                file.write(common.bson_encode(entry))
-                                data.append(entry)
-                                last = entry_key
-                        file.flush()
-                except BaseException as x:
-                    item.future.set_exception(x)
-                else:
-                    item.future.set_result(None)
-            else:
-                raise AssertionError(
-                    'Unexpected item {!r} in the queue'.format(item))
+def _index_filename(filename: str) -> str:
+    return filename + '.index.bson'
 
 
 @contextlib.contextmanager
@@ -152,24 +265,21 @@
     return os.stat(file.fileno()).st_size
 
 
-def _catch_up(file: t.BinaryIO) -> t.Iterable[t.Dict[str, object]]:
-    """Reads data and advances the file pointer to the end of the file."""
-    size = _size(file)
-    pointer = file.tell()
-    if size == pointer:
-        return ()
-    output: t.List[t.Dict[str, object]] = []
-    try:
-        items = bson.decode_file_iter(
-            file, codec_options=common.BSON_OPTIONS)
-        for item in items:
-            pointer = file.tell()
-            output.append(item)
-    except bson.InvalidBSON:
-        pass  # We have reached the last valid document.  Bail.
-    # Seek back to immediately after the end of the last valid doc.
-    file.seek(pointer, os.SEEK_SET)
-    return output
+T = t.TypeVar('T')
+
+
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class IndexEntry:
+    entry_count: int
+    entry_key: object
+    byte: int
+
+    @classmethod
+    def from_dict(cls: t.Type[T], d: t.Dict[str, t.Any]) -> T:
+        return cls(**{f.name: d[f.name] for f in attr.fields(cls)})
+
+    def to_dict(self) -> t.Dict[str, object]:
+        return attr.asdict(self, recurse=False)
 
 
 def _open_or_create(path: str) -> t.BinaryIO:
@@ -182,3 +292,8 @@
             return open(path, 'x+b')
         except FileExistsError:
             pass
+
+
+def _dup_file(file: t.BinaryIO) -> t.BinaryIO:
+    duplicate = os.dup(file.fileno())
+    return os.fdopen(duplicate, 'r+b')
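
A note on the locking the new writer thread depends on: _file_lock itself is untouched by this change (only its @contextlib.contextmanager decorator and its call sites appear in the diff), but every _catch_up() now runs under at least a shared lock (fcntl.LOCK_SH) and every append plus _update_index() under an exclusive one (fcntl.LOCK_EX), so several processes can share one log file. For readers who don't have the rest of the file handy, a plausible shape for that helper is the flock() wrapper sketched below; the parameter name and exact behaviour are assumptions, not the project's actual code.

    import contextlib
    import fcntl
    import typing as t


    @contextlib.contextmanager
    def _file_lock(file: t.BinaryIO, operation: int) -> t.Iterator[None]:
        """Holds an advisory flock() on the file for the duration of the block."""
        # operation is fcntl.LOCK_SH for readers or fcntl.LOCK_EX for writers.
        fcntl.flock(file.fileno(), operation)
        try:
            yield
        finally:
            fcntl.flock(file.fileno(), fcntl.LOCK_UN)

If the helper really is flock()-based, the os.dup() in _dup_file() fits neatly: the duplicated descriptor shares the original open file description, so the scan inside _update_index() happens under the same exclusive lock the writer already holds.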