# HG changeset patch
# User Paul Fisher
# Date 1594165890 14400
# Node ID 9bc3687e1e5e86fbda3a3091cffe6e53efe544d3
# Parent c760ab7f93c2bcc3379fb66187e67e8e611d3b6c
logfile: Add an index, and don't keep everything in RAM.

- Adds index BSON file, updated upon writing.
- Limits amount of data in RAM.
- Gracefully handles writes that don't update index.

diff -r c760ab7f93c2 -r 9bc3687e1e5e setup.py
--- a/setup.py	Tue May 19 10:15:29 2020 -0400
+++ b/setup.py	Tue Jul 07 19:51:30 2020 -0400
@@ -2,7 +2,7 @@
 
 setuptools.setup(
     name='weather-server',
-    version='0.1.2',
+    version='0.1.3',
     packages=setuptools.find_packages(),
     python_requires='>=3.7',
     install_requires=[
diff -r c760ab7f93c2 -r 9bc3687e1e5e weather_server/locations.py
--- a/weather_server/locations.py	Tue May 19 10:15:29 2020 -0400
+++ b/weather_server/locations.py	Tue Jul 07 19:51:30 2020 -0400
@@ -48,10 +48,10 @@
 
     @property
     def entries(self) -> t.Iterable[t.Dict[str, object]]:
-        return self.logger.data
+        return self.logger.cached_data
 
     def latest(self) -> t.Optional[types.Reading]:
-        most_recent = reversed(self.logger.data)
+        most_recent = reversed(self.logger.cached_data)
         for entry in most_recent:
             try:
                 return types.Reading.from_dict(entry)
diff -r c760ab7f93c2 -r 9bc3687e1e5e weather_server/logfile.py
--- a/weather_server/logfile.py	Tue May 19 10:15:29 2020 -0400
+++ b/weather_server/logfile.py	Tue Jul 07 19:51:30 2020 -0400
@@ -1,6 +1,8 @@
 """The part which handles writing things out and reading things in from CSV.
 """
 
+import attr
+import collections
 import concurrent.futures as futures
 import contextlib
 import fcntl
@@ -14,6 +16,13 @@
 from . import common
 
 
+# The number of entries to keep in memory without reading from disk.
+CACHED_ENTRIES = 16384
+
+# How many entries to read before creating a new index entry.
+INDEX_GAP = 4096
+
+
 class _WriteRequest:
 
     def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]):
@@ -29,6 +38,7 @@
     def __init__(self):
         # The future that will be set with the log's contnets.
         self.future = futures.Future()
+        # TODO(pfish): Make it possible to read from a starting point.
 
 
 # Poison pill to tell a logger thread to stop.
@@ -38,7 +48,16 @@
 class Logger:
     """Logger which handles reading/writing one temperature log file."""
 
-    def __init__(self, filename: str, *, sample_field: str):
+    _file: t.BinaryIO
+    _index_file: t.BinaryIO
+
+    def __init__(
+            self,
+            filename: str,
+            *,
+            sample_field: str,
+            cached_entries: int = CACHED_ENTRIES,
+            index_gap: int = INDEX_GAP):
         """Creates a new Logger for the given file.
 
         Args:
@@ -46,21 +65,28 @@
             sample_field: The field name to use as the strictly-increasing
                 value to ensure that no duplicate writes occur.
         """
-        self._sample_field = sample_field
+
+        self._filename = filename
+        self._cached_entries = cached_entries
+        self._index_gap = index_gap
+
         self._queue = queue.SimpleQueue()
-        # Create a Future that will be resolved once the file is opened
-        # (or fails to be opened).
-        writer_started = futures.Future()
-        self._writer_thread = threading.Thread(
+        self._sample_field = sample_field
+
+        self._start_future = futures.Future()
+        self._thread = threading.Thread(
             name=f'{filename!r} writer thread',
-            target=lambda: _writer_thread(
-                filename, self._queue, sample_field, writer_started),
+            target=self._writer_thread,
             daemon=True)
-        self._writer_thread.start()
-        writer_started.result()
+        self._thread.start()
+
+        self._data = collections.deque(maxlen=self._cached_entries)
+        self._index: t.List[IndexEntry] = []
+        self._message_count = 0
+        self._start_future.result()
 
     @property
-    def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
+    def cached_data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
         req = _ReadRequest()
         self._queue.put(req)
         return req.future.result()
@@ -70,72 +96,159 @@
         self._queue.put(req)
         return req.future.result()
 
-    def __del__(self):
+    def _writer_thread(self) -> None:
+        try:
+            self._file = _open_or_create(self._filename)
+            self._index_file = _open_or_create(_index_filename(self._filename))
+        except BaseException as e:
+            self._start_future.set_exception(e)
+            return
+        self._start_future.set_result(None)
+
+        with self._file, self._index_file:
+            running = True
+            while running:
+                item = self._queue.get()
+                if item is _POISON:
+                    # _POISON is the poison pill that tells us to stop.
+                    running = False
+                elif isinstance(item, _ReadRequest):
+                    if not item.future.set_running_or_notify_cancel():
+                        continue
+                    with _file_lock(self._file, fcntl.LOCK_SH):
+                        try:
+                            self._catch_up()
+                        except BaseException as x:
+                            item.future.set_exception(x)
+                        else:
+                            item.future.set_result(tuple(self._data))
+                elif isinstance(item, _WriteRequest):
+                    if not item.future.set_running_or_notify_cancel():
+                        continue
+                    try:
+                        with _file_lock(self._file, fcntl.LOCK_EX):
+                            self._catch_up()
+                            current_ptr = self._file.tell()
+                            self._file.truncate(current_ptr)
+                            if not self._data:
+                                prev_key = None
+                            else:
+                                prev_key = self._data[-1][self._sample_field]
+                            for entry in item.entries:
+                                entry_key = entry[self._sample_field]
+                                if prev_key is None or prev_key < entry_key:
+                                    self._data.append(entry)
+                                    self._file.write(common.bson_encode(entry))
+                                    prev_key = entry_key
+                            self._file.flush()
+                            self._update_index()
+                    except BaseException as x:
+                        item.future.set_exception(x)
+                    else:
+                        item.future.set_result(None)
+                else:
+                    raise AssertionError(
+                        f'Unexpected item {item!r} in the queue')
+
+    def _catch_up(self) -> None:
+        # Preconditions:
+        # - At least a read lock is held.
+        # - File pointer is at the end of the last-read entry.
+        self._catch_up_index()
+        if self._index:
+            # Since we have an index, use it to find a starting place.
+            last_idx = self._index[-1]
+            # Figure out, based on the number of entries we want to keep
+            # in memory, where we should start reading from.
+            read_start_count = max(
+                0, last_idx.entry_count - self._cached_entries)
+            if self._message_count < read_start_count:
+                # We haven't read that far yet; skip ahead via the index.
+                for idx_entry in self._index:
+                    if read_start_count <= idx_entry.entry_count:
+                        break
+                    starting_point = idx_entry
+                self._data.clear()
+                self._file.seek(starting_point.byte, os.SEEK_SET)
+                self._message_count = starting_point.entry_count
+        pointer = self._file.tell()
+        try:
+            items = bson.decode_file_iter(
+                self._file, codec_options=common.BSON_OPTIONS)
+            for item in items:
+                pointer = self._file.tell()
+                self._data.append(item)
+                self._message_count += 1
+        except bson.InvalidBSON:
+            pass  # We have reached the last valid document.  Bail.
+        # Seek back to immediately after the end of the last valid doc.
+        self._file.seek(pointer, os.SEEK_SET)
+
+    def _update_index(self) -> None:
+        # Preconditions:
+        # - File pointer is at the end of the last-written message.
+        # - Index pointer is at the last-read index value.
+        # - Exclusive lock is held.
+        with _dup_file(self._file) as update_fp:
+            if self._index:
+                last_idx = self._index[-1]
+                update_fp.seek(last_idx.byte)
+                current_count = last_idx.entry_count
+                decoder = bson.decode_file_iter(
+                    update_fp, codec_options=common.BSON_OPTIONS)
+                try:
+                    # Skip the current index entry.
+                    next(decoder)
+                    current_count += 1
+                    current_byte = update_fp.tell()
+                except StopIteration:
+                    # If there are no more entries, don't update the index.
+                    return
+            else:
+                current_byte = 0
+                update_fp.seek(current_byte, os.SEEK_SET)
+                current_count = 0
+            entries = bson.decode_file_iter(
+                update_fp, codec_options=common.BSON_OPTIONS)
+            for entry in entries:
+                if current_count % self._index_gap == 0:
+                    idx_entry = IndexEntry(
+                        entry_count=current_count,
+                        entry_key=entry[self._sample_field],
+                        byte=current_byte)
+                    self._index.append(idx_entry)
+                    self._index_file.truncate()
+                    self._index_file.write(
+                        common.bson_encode(idx_entry.to_dict()))
+                current_count += 1
+                current_byte = update_fp.tell()
+            self._index_file.flush()
+
+    def _catch_up_index(self) -> None:
+        # Preconditions:
+        # - At least a read lock is held.
+        pointer = self._index_file.tell()
+        try:
+            index_entries = bson.decode_file_iter(
+                self._index_file, codec_options=common.BSON_OPTIONS)
+            for entry_dict in index_entries:
+                entry = IndexEntry.from_dict(entry_dict)
+                self._index.append(entry)
+                pointer = self._index_file.tell()
+        except bson.InvalidBSON:
+            pass  # We have reached the last valid BSON document.  Bail.
+        self._index_file.seek(pointer, os.SEEK_SET)
+
+    def __del__(self) -> None:
         self.close()
 
-    def close(self):
+    def close(self) -> None:
         self._queue.put(_POISON)
-        self._writer_thread.join()
+        self._thread.join()
 
 
-def _writer_thread(
-    filename: str,
-    q: queue.Queue,
-    sample_field: str,
-    started: futures.Future,
-) -> None:
-    if not started.set_running_or_notify_cancel():
-        return
-    try:
-        file = _open_or_create(filename)
-        started.set_result(None)
-    except BaseException as e:
-        started.set_exception(e)
-        return
-    with file:
-        running = True
-        data: t.List[t.Dict[str, object]] = []
-        while running:
-            item = q.get()
-            if item is _POISON:
-                # None is the poison pill that makes us stop.
-                running = False
-            elif isinstance(item, _ReadRequest):
-                if not item.future.set_running_or_notify_cancel():
-                    continue
-                try:
-                    with _file_lock(file, fcntl.LOCK_SH):
-                        data.extend(_catch_up(file))
-                except BaseException as x:
-                    item.future.set_exception(x)
-                else:
-                    item.future.set_result(tuple(data))
-            elif isinstance(item, _WriteRequest):
-                if not item.future.set_running_or_notify_cancel():
-                    continue
-                try:
-                    with _file_lock(file, fcntl.LOCK_EX):
-                        data.extend(_catch_up(file))
-                        # Since we're at the last good point, truncate after.
-                        file.truncate(file.tell())
-                        if not data:
-                            last = None
-                        else:
-                            last = data[-1][sample_field]
-                        for entry in item.entries:
-                            entry_key = entry[sample_field]
-                            if last is None or last < entry_key:
-                                file.write(common.bson_encode(entry))
-                                data.append(entry)
-                                last = entry_key
-                        file.flush()
-                except BaseException as x:
-                    item.future.set_exception(x)
-                else:
-                    item.future.set_result(None)
-            else:
-                raise AssertionError(
-                    'Unexpected item {!r} in the queue'.format(item))
+def _index_filename(filename: str) -> str:
+    return filename + '.index.bson'
 
 
 @contextlib.contextmanager
@@ -152,24 +265,21 @@
     return os.stat(file.fileno()).st_size
 
 
-def _catch_up(file: t.BinaryIO) -> t.Iterable[t.Dict[str, object]]:
-    """Reads data and advances the file pointer to the end of the file."""
-    size = _size(file)
-    pointer = file.tell()
-    if size == pointer:
-        return ()
-    output: t.List[t.Dict[str, object]] = []
-    try:
-        items = bson.decode_file_iter(
-            file, codec_options=common.BSON_OPTIONS)
-        for item in items:
-            pointer = file.tell()
-            output.append(item)
-    except bson.InvalidBSON:
-        pass  # We have reached the last valid document.  Bail.
-    # Seek back to immediately after the end of the last valid doc.
-    file.seek(pointer, os.SEEK_SET)
-    return output
+T = t.TypeVar('T')
+
+
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class IndexEntry:
+    entry_count: int
+    entry_key: object
+    byte: int
+
+    @classmethod
+    def from_dict(cls: t.Type[T], d: t.Dict[str, t.Any]) -> T:
+        return cls(**{f.name: d[f.name] for f in attr.fields(cls)})
+
+    def to_dict(self) -> t.Dict[str, object]:
+        return attr.asdict(self, recurse=False)
 
 
 def _open_or_create(path: str) -> t.BinaryIO:
@@ -182,3 +292,8 @@
             return open(path, 'x+b')
         except FileExistsError:
             pass
+
+
+def _dup_file(file: t.BinaryIO) -> t.BinaryIO:
+    duplicate = os.dup(file.fileno())
+    return os.fdopen(duplicate, 'r+b')
diff -r c760ab7f93c2 -r 9bc3687e1e5e weather_server/logfile_test.py
--- a/weather_server/logfile_test.py	Tue May 19 10:15:29 2020 -0400
+++ b/weather_server/logfile_test.py	Tue Jul 07 19:51:30 2020 -0400
@@ -3,7 +3,6 @@
 import os.path
 import pathlib
 import tempfile
-import threading
 import unittest
 
 import bson
@@ -25,6 +24,7 @@
         super().setUp()
         self.temp_dir = tempfile.TemporaryDirectory()
         self.log_path = pathlib.Path(self.temp_dir.name) / 'test.bson'
+        self.index_path = pathlib.Path(str(self.log_path) + '.index.bson')
 
     def tearDown(self):
         self.temp_dir.cleanup()
@@ -34,7 +34,7 @@
         lg = logfile.Logger(
             str(self.log_path), sample_field='x')
         with contextlib.closing(lg) as logger:
-            self.assertEqual(logger.data, ())
+            self.assertEqual(logger.cached_data, ())
 
     def test_fails_to_open(self):
         with self.assertRaises(OSError):
@@ -64,7 +64,7 @@
             sample_field='sample_time',
         )) as logger:
             self.assertEqual(
-                logger.data,
+                logger.cached_data,
                 (
                     dict(
                         sample_time=ts(123),
@@ -84,6 +84,8 @@
         with contextlib.closing(logfile.Logger(
             str(self.log_path),
             sample_field='sample_time',
+            cached_entries=4,
+            index_gap=2,
         )) as logger:
             logger.write_rows([
                 # Ignored, since it's older than the newest entry.
@@ -101,7 +103,7 @@
                 ),
             ])
             self.assertEqual(
-                logger.data,
+                logger.cached_data,
                 (
                     dict(
                         sample_time=ts(123),
@@ -132,11 +134,15 @@
                     ingest_time=ts(200),
                 ),
             ])
+            self.assertEqual(self.read_index(), [
+                dict(entry_count=0, entry_key=ts(123), byte=0),
+            ])
 
     def test_outside_writes(self):
         with contextlib.closing(logfile.Logger(
             str(self.log_path),
             sample_field='sample_time',
+            index_gap=2,
         )) as logger:
             logger.write_rows([
                 dict(
@@ -160,7 +166,9 @@
                     ingest_time=ts(4096),
                 )))
                 outfile.flush()
-            self.assertEqual(logger.data, (
+            # Empty write to update the index.
+            logger.write_rows([])
+            self.assertEqual(logger.cached_data, (
                 dict(
                     sample_time=ts(100),
                     temp_c=999,
@@ -180,12 +188,199 @@
                     ingest_time=ts(4096),
                 ),
             ))
+            self.assertEqual(self.read_bsons(), [
+                dict(
+                    sample_time=ts(100),
+                    temp_c=999,
+                    rh_pct=666,
+                    ingest_time=ts(101),
+                ),
+                dict(
+                    sample_time=ts(125),
+                    temp_c=333,
+                    rh_pct=777,
+                    ingest_time=ts(200),
+                ),
+                dict(
+                    sample_time=ts(1024),
+                    temp_c=256,
+                    rh_pct=128,
+                    ingest_time=ts(4096),
+                ),
+            ])
+            self.assertEqual(self.read_index(), [
+                dict(entry_count=0, entry_key=ts(100), byte=0),
+                dict(entry_count=2, entry_key=ts(1024), byte=142),
+            ])
+
+    def test_lots_of_outside_writes(self):
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='key',
+            cached_entries=4,
+            index_gap=2,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with contextlib.closing(logfile.Logger(
+                str(self.log_path),
+                sample_field='key',
+                cached_entries=4,
+                index_gap=2,
+            )) as other_logger:
+                other_logger.write_rows(make_messages(10, 10))
+            logger.write_rows(make_messages(1, 20))
+            self.assertEqual(logger.cached_data, (
+                dict(key='0011', value=17),
+                dict(key='0012', value=18),
+                dict(key='0013', value=19),
+                dict(key='0014', value=20),
+            ))
+            self.assertEqual(self.read_bsons(), make_messages(21))
+            self.assertEqual(self.read_index(), [
+                dict(entry_count=0, entry_key='0000', byte=0),
+                dict(entry_count=2, entry_key='0002', byte=60),
+                dict(entry_count=4, entry_key='0004', byte=120),
+                dict(entry_count=6, entry_key='0006', byte=180),
+                dict(entry_count=8, entry_key='0008', byte=240),
+                dict(entry_count=10, entry_key='000a', byte=300),
+                dict(entry_count=12, entry_key='000c', byte=360),
+                dict(entry_count=14, entry_key='000e', byte=420),
+                dict(entry_count=16, entry_key='0010', byte=480),
+                dict(entry_count=18, entry_key='0012', byte=540),
+                dict(entry_count=20, entry_key='0014', byte=600),
+            ])
+
+    def test_outside_writes_that_dont_update_index(self):
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='key',
+            cached_entries=4,
+            index_gap=2,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with self.log_path.open('ab') as outfile:
+                for item in make_messages(16, 256):
+                    outfile.write(common.bson_encode(item))
+            self.assertEqual(logger.cached_data, (
+                dict(key='010c', value=268),
+                dict(key='010d', value=269),
+                dict(key='010e', value=270),
+                dict(key='010f', value=271),
+            ))
+            self.assertEqual(self.read_bsons(), [
+                dict(key='0000', value=0),
+                dict(key='0001', value=1),
+                dict(key='0002', value=2),
+                dict(key='0003', value=3),
+                dict(key='0004', value=4),
+                dict(key='0005', value=5),
+                dict(key='0006', value=6),
+                dict(key='0007', value=7),
+                dict(key='0008', value=8),
+                dict(key='0009', value=9),
+                dict(key='0100', value=256),
+                dict(key='0101', value=257),
+                dict(key='0102', value=258),
+                dict(key='0103', value=259),
+                dict(key='0104', value=260),
+                dict(key='0105', value=261),
+                dict(key='0106', value=262),
+                dict(key='0107', value=263),
+                dict(key='0108', value=264),
+                dict(key='0109', value=265),
+                dict(key='010a', value=266),
+                dict(key='010b', value=267),
+                dict(key='010c', value=268),
+                dict(key='010d', value=269),
+                dict(key='010e', value=270),
+                dict(key='010f', value=271),
+            ])
+            self.assertEqual(self.read_index(), [
+                dict(entry_count=0, entry_key='0000', byte=0),
+                dict(entry_count=2, entry_key='0002', byte=60),
+                dict(entry_count=4, entry_key='0004', byte=120),
+                dict(entry_count=6, entry_key='0006', byte=180),
+                dict(entry_count=8, entry_key='0008', byte=240),
+            ])
+
+    def test_outside_write_but_dont_skip(self):
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='key',
+            cached_entries=10,
+            index_gap=4,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with contextlib.closing(logfile.Logger(
+                str(self.log_path),
+                sample_field='key',
+                cached_entries=10,
+                index_gap=4,
+            )) as other_logger:
+                other_logger.write_rows(make_messages(5, 10))
+            self.assertEqual(logger.cached_data, (
+                dict(key='0005', value=5),
+                dict(key='0006', value=6),
+                dict(key='0007', value=7),
+                dict(key='0008', value=8),
+                dict(key='0009', value=9),
+                dict(key='000a', value=10),
+                dict(key='000b', value=11),
+                dict(key='000c', value=12),
+                dict(key='000d', value=13),
+                dict(key='000e', value=14),
+            ))
+            logger.write_rows(make_messages(5, 15))
+            self.assertEqual(self.read_index(), [
+                dict(entry_count=0, entry_key='0000', byte=0),
+                dict(entry_count=4, entry_key='0004', byte=120),
+                dict(entry_count=8, entry_key='0008', byte=240),
+                dict(entry_count=12, entry_key='000c', byte=360),
+                dict(entry_count=16, entry_key='0010', byte=480),
+            ])
+            logger.write_rows(make_messages(1, 20))
+            self.assertEqual(self.read_index(), [
+                dict(entry_count=0, entry_key='0000', byte=0),
+                dict(entry_count=4, entry_key='0004', byte=120),
+                dict(entry_count=8, entry_key='0008', byte=240),
+                dict(entry_count=12, entry_key='000c', byte=360),
+                dict(entry_count=16, entry_key='0010', byte=480),
+                dict(entry_count=20, entry_key='0014', byte=600),
+            ])
+
+    def test_start_and_skip(self):
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='key',
+            cached_entries=4,
+            index_gap=2,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='key',
+            cached_entries=4,
+            index_gap=2,
+        )) as other_logger:
+            self.assertEqual(
+                other_logger.cached_data, tuple(make_messages(4, 6)))
 
     def read_bsons(self):
         with self.log_path.open('rb') as infile:
             return bson.decode_all(
                 infile.read(), common.BSON_OPTIONS)
 
+    def read_index(self):
+        with self.index_path.open('rb') as infile:
+            return bson.decode_all(
+                infile.read(), common.BSON_OPTIONS)
+
+
+def make_messages(count, start=0):
+    return [
+        dict(key=format(n, '04x'), value=n)
+        for n in range(start, start + count)]
+
 
 if __name__ == '__main__':
     unittest.main()
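
Usage sketch (illustrative, not part of the changeset): a minimal example of how a caller might exercise the Logger arguments added above. The Logger constructor, write_rows, and cached_data come from the patch itself; the file path and field values are made up for the example. The index file lands next to the log file, following the filename + '.index.bson' convention from _index_filename().

import contextlib

from weather_server import logfile

with contextlib.closing(logfile.Logger(
    '/tmp/weather.bson',           # hypothetical log path; '/tmp/weather.bson.index.bson' holds the index
    sample_field='sample_time',    # strictly-increasing key; older or duplicate keys are dropped
    cached_entries=4096,           # cap on log entries held in RAM
    index_gap=1024,                # one index entry per 1024 log entries
)) as logger:
    logger.write_rows([
        dict(sample_time=1, temp_c=21.5, rh_pct=40),
        dict(sample_time=2, temp_c=21.6, rh_pct=41),
    ])
    # Returns only the most recently cached rows, not the whole file.
    print(logger.cached_data)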