changeset 31:9bc3687e1e5e

logfile: Add an index, and don't keep everything in RAM.

- Adds index BSON file, updated upon writing.
- Limits amount of data in RAM.
- Gracefully handles writes that don't update the index.
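For orientation, the index introduced here is a separate BSON file stored alongside the log (the log's filename plus '.index.bson'), holding one {entry_count, entry_key, byte} document for every INDEX_GAP log entries. A minimal sketch of how a reader could use it to find a byte offset near the tail of the log without scanning the whole file, mirroring the lookup in _catch_up below (tail_offset is a hypothetical helper; only the bson package from pymongo is assumed):

    import bson

    def tail_offset(log_path: str, keep: int) -> int:
        # Find a byte offset into the log that is at most `keep` indexed
        # entries before the end, using the side-by-side index file.
        index = []
        with open(log_path + '.index.bson', 'rb') as fp:
            try:
                for doc in bson.decode_file_iter(fp):
                    index.append(doc)
            except bson.InvalidBSON:
                pass  # Ignore a partially-written trailing index entry.
        if not index:
            return 0  # No index yet; read the log from the beginning.
        target = max(0, index[-1]['entry_count'] - keep)
        offset = 0
        for entry in index:
            if target <= entry['entry_count']:
                break
            offset = entry['byte']
        return offset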
author Paul Fisher <paul@pfish.zone>
date Tue, 07 Jul 2020 19:51:30 -0400
parents c760ab7f93c2
children 2faf3499a226
files setup.py weather_server/locations.py weather_server/logfile.py weather_server/logfile_test.py
diffstat 4 files changed, 408 insertions(+), 98 deletions(-)
line diff
--- a/setup.py	Tue May 19 10:15:29 2020 -0400
+++ b/setup.py	Tue Jul 07 19:51:30 2020 -0400
@@ -2,7 +2,7 @@
 
 setuptools.setup(
     name='weather-server',
-    version='0.1.2',
+    version='0.1.3',
     packages=setuptools.find_packages(),
     python_requires='>=3.7',
     install_requires=[
--- a/weather_server/locations.py	Tue May 19 10:15:29 2020 -0400
+++ b/weather_server/locations.py	Tue Jul 07 19:51:30 2020 -0400
@@ -48,10 +48,10 @@
 
     @property
     def entries(self) -> t.Iterable[t.Dict[str, object]]:
-        return self.logger.data
+        return self.logger.cached_data
 
     def latest(self) -> t.Optional[types.Reading]:
-        most_recent = reversed(self.logger.data)
+        most_recent = reversed(self.logger.cached_data)
         for entry in most_recent:
             try:
                 return types.Reading.from_dict(entry)
--- a/weather_server/logfile.py	Tue May 19 10:15:29 2020 -0400
+++ b/weather_server/logfile.py	Tue Jul 07 19:51:30 2020 -0400
@@ -1,6 +1,8 @@
 """The part which handles writing things out and reading things in from CSV.
 """
 
+import attr
+import collections
 import concurrent.futures as futures
 import contextlib
 import fcntl
@@ -14,6 +16,13 @@
 from . import common
 
 
+# The number of entries to keep in memory without reading from disk.
+CACHED_ENTRIES = 16384
+
+# How many log entries to write between index entries.
+INDEX_GAP = 4096
+
+
 class _WriteRequest:
 
     def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]):
@@ -29,6 +38,7 @@
     def __init__(self):
         # The future that will be set with the log's contents.
         self.future = futures.Future()
+        # TODO(pfish): Make it possible to read from a starting point.
 
 
 # Poison pill to tell a logger thread to stop.
@@ -38,7 +48,16 @@
 class Logger:
     """Logger which handles reading/writing one temperature log file."""
 
-    def __init__(self, filename: str, *, sample_field: str):
+    _file: t.BinaryIO
+    _index_file: t.BinaryIO
+
+    def __init__(
+            self,
+            filename: str,
+            *,
+            sample_field: str,
+            cached_entries: int = CACHED_ENTRIES,
+            index_gap: int = INDEX_GAP):
         """Creates a new Logger for the given file.
 
         Args:
@@ -46,21 +65,28 @@
             sample_field: The field name to use as the strictly-increasing
                 value to ensure that no duplicate writes occur.
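+            cached_entries: The number of entries to keep in memory
+                without reading from disk.
+            index_gap: The number of log entries between index entries.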
         """
-        self._sample_field = sample_field
+
+        self._filename = filename
+        self._cached_entries = cached_entries
+        self._index_gap = index_gap
+
         self._queue = queue.SimpleQueue()
-        # Create a Future that will be resolved once the file is opened
-        # (or fails to be opened).
-        writer_started = futures.Future()
-        self._writer_thread = threading.Thread(
+        self._sample_field = sample_field
+
+        self._start_future = futures.Future()
+        self._thread = threading.Thread(
             name=f'{filename!r} writer thread',
-            target=lambda: _writer_thread(
-                filename, self._queue, sample_field, writer_started),
+            target=self._writer_thread,
             daemon=True)
-        self._writer_thread.start()
-        writer_started.result()
+        self._thread.start()
+
+        self._data = collections.deque(maxlen=self._cached_entries)
+        self._index: t.List[IndexEntry] = []
+        self._message_count = 0
+        self._start_future.result()
 
     @property
-    def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
+    def cached_data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
         req = _ReadRequest()
         self._queue.put(req)
         return req.future.result()
@@ -70,72 +96,159 @@
         self._queue.put(req)
         return req.future.result()
 
-    def __del__(self):
+    def _writer_thread(self) -> None:
+        try:
+            self._file = _open_or_create(self._filename)
+            self._index_file = _open_or_create(_index_filename(self._filename))
+        except BaseException as e:
+            self._start_future.set_exception(e)
+            return
+        self._start_future.set_result(None)
+
+        with self._file, self._index_file:
+            running = True
+            while running:
+                item = self._queue.get()
+                if item is _POISON:
+                    # _POISON is the sentinel that tells us to stop.
+                    running = False
+                elif isinstance(item, _ReadRequest):
+                    if not item.future.set_running_or_notify_cancel():
+                        continue
+                    with _file_lock(self._file, fcntl.LOCK_SH):
+                        try:
+                            self._catch_up()
+                        except BaseException as x:
+                            item.future.set_exception(x)
+                        else:
+                            item.future.set_result(tuple(self._data))
+                elif isinstance(item, _WriteRequest):
+                    if not item.future.set_running_or_notify_cancel():
+                        continue
+                    try:
+                        with _file_lock(self._file, fcntl.LOCK_EX):
+                            self._catch_up()
+                            current_ptr = self._file.tell()
+                            self._file.truncate(current_ptr)
+                            if not self._data:
+                                prev_key = None
+                            else:
+                                prev_key = self._data[-1][self._sample_field]
+                            for entry in item.entries:
+                                entry_key = entry[self._sample_field]
+                                if prev_key is None or prev_key < entry_key:
+                                    self._data.append(entry)
+                                    self._file.write(common.bson_encode(entry))
+                                    prev_key = entry_key
+                            self._file.flush()
+                            self._update_index()
+                    except BaseException as x:
+                        item.future.set_exception(x)
+                    else:
+                        item.future.set_result(None)
+                else:
+                    raise AssertionError(
+                        f'Unexpected item {item!r} in the queue')
+
+    def _catch_up(self) -> None:
+        # Preconditions:
+        # - At least a read lock is held.
+        # - File pointer is at the end of the last-read entry.
+        self._catch_up_index()
+        if self._index:
+            # Since we have an index, use it to find a starting place.
+            last_idx = self._index[-1]
+            # Figure out, based on the number of entries we want to keep
+            # in memory, where we should start reading from.
+            read_start_count = max(
+                0, last_idx.entry_count - self._cached_entries)
+            if self._message_count < read_start_count:
+                # We haven't caught up to that starting point yet, so find
+                # the last index entry before it and jump there.
+                for idx_entry in self._index:
+                    if read_start_count <= idx_entry.entry_count:
+                        break
+                    starting_point = idx_entry
+                self._data.clear()
+                self._file.seek(starting_point.byte, os.SEEK_SET)
+                self._message_count = starting_point.entry_count
+        pointer = self._file.tell()
+        try:
+            items = bson.decode_file_iter(
+                self._file, codec_options=common.BSON_OPTIONS)
+            for item in items:
+                pointer = self._file.tell()
+                self._data.append(item)
+                self._message_count += 1
+        except bson.InvalidBSON:
+            pass  # We have reached the last valid document. Bail.
+        # Seek back to immediately after the end of the last valid doc.
+        self._file.seek(pointer, os.SEEK_SET)
+
+    def _update_index(self) -> None:
+        # Preconditions:
+        # - File pointer is at the end of the last-written message.
+        # - Index pointer is at the last-read index value.
+        # - Exclusive lock is held.
+        with _dup_file(self._file) as update_fp:
+            if self._index:
+                last_idx = self._index[-1]
+                update_fp.seek(last_idx.byte)
+                current_count = last_idx.entry_count
+                decoder = bson.decode_file_iter(
+                    update_fp, codec_options=common.BSON_OPTIONS)
+                try:
+                    # Skip the entry the last index entry points at,
+                    # since it is already indexed.
+                    next(decoder)
+                    current_count += 1
+                    current_byte = update_fp.tell()
+                except StopIteration:
+                    # If there are no more entries, don't update the index.
+                    return
+            else:
+                current_byte = 0
+                update_fp.seek(current_byte, os.SEEK_SET)
+                current_count = 0
+            entries = bson.decode_file_iter(
+                update_fp, codec_options=common.BSON_OPTIONS)
+            for entry in entries:
+                if current_count % self._index_gap == 0:
+                    idx_entry = IndexEntry(
+                        entry_count=current_count,
+                        entry_key=entry[self._sample_field],
+                        byte=current_byte)
+                    self._index.append(idx_entry)
+                    self._index_file.truncate()
+                    self._index_file.write(
+                        common.bson_encode(idx_entry.to_dict()))
+                current_count += 1
+                current_byte = update_fp.tell()
+        self._index_file.flush()
+
+    def _catch_up_index(self) -> None:
+        # Preconditions:
+        # - At least a read lock is held.
+        pointer = self._index_file.tell()
+        try:
+            index_entries = bson.decode_file_iter(
+                self._index_file, codec_options=common.BSON_OPTIONS)
+            for entry_dict in index_entries:
+                entry = IndexEntry.from_dict(entry_dict)
+                self._index.append(entry)
+                pointer = self._index_file.tell()
+        except bson.InvalidBSON:
+            pass  # We have reached the last valid BSON document. Bail.
+        self._index_file.seek(pointer, os.SEEK_SET)
+
+    def __del__(self) -> None:
         self.close()
 
-    def close(self):
+    def close(self) -> None:
         self._queue.put(_POISON)
-        self._writer_thread.join()
+        self._thread.join()
 
 
-def _writer_thread(
-    filename: str,
-    q: queue.Queue,
-    sample_field: str,
-    started: futures.Future,
-) -> None:
-    if not started.set_running_or_notify_cancel():
-        return
-    try:
-        file = _open_or_create(filename)
-        started.set_result(None)
-    except BaseException as e:
-        started.set_exception(e)
-        return
-    with file:
-        running = True
-        data: t.List[t.Dict[str, object]] = []
-        while running:
-            item = q.get()
-            if item is _POISON:
-                # None is the poison pill that makes us stop.
-                running = False
-            elif isinstance(item, _ReadRequest):
-                if not item.future.set_running_or_notify_cancel():
-                    continue
-                try:
-                    with _file_lock(file, fcntl.LOCK_SH):
-                        data.extend(_catch_up(file))
-                except BaseException as x:
-                    item.future.set_exception(x)
-                else:
-                    item.future.set_result(tuple(data))
-            elif isinstance(item, _WriteRequest):
-                if not item.future.set_running_or_notify_cancel():
-                    continue
-                try:
-                    with _file_lock(file, fcntl.LOCK_EX):
-                        data.extend(_catch_up(file))
-                        # Since we're at the last good point, truncate after.
-                        file.truncate(file.tell())
-                        if not data:
-                            last = None
-                        else:
-                            last = data[-1][sample_field]
-                        for entry in item.entries:
-                            entry_key = entry[sample_field]
-                            if last is None or last < entry_key:
-                                file.write(common.bson_encode(entry))
-                                data.append(entry)
-                                last = entry_key
-                        file.flush()
-                except BaseException as x:
-                    item.future.set_exception(x)
-                else:
-                    item.future.set_result(None)
-            else:
-                raise AssertionError(
-                    'Unexpected item {!r} in the queue'.format(item))
+def _index_filename(filename: str) -> str:
+    return filename + '.index.bson'
 
 
 @contextlib.contextmanager
@@ -152,24 +265,21 @@
     return os.stat(file.fileno()).st_size
 
 
-def _catch_up(file: t.BinaryIO) -> t.Iterable[t.Dict[str, object]]:
-    """Reads data and advances the file pointer to the end of the file."""
-    size = _size(file)
-    pointer = file.tell()
-    if size == pointer:
-        return ()
-    output: t.List[t.Dict[str, object]] = []
-    try:
-        items = bson.decode_file_iter(
-            file, codec_options=common.BSON_OPTIONS)
-        for item in items:
-            pointer = file.tell()
-            output.append(item)
-    except bson.InvalidBSON:
-        pass  # We have reached the last valid document.  Bail.
-    # Seek back to immediately after the end of the last valid doc.
-    file.seek(pointer, os.SEEK_SET)
-    return output
+T = t.TypeVar('T')
+
+
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class IndexEntry:
+    entry_count: int
+    entry_key: object
+    byte: int
+
+    @classmethod
+    def from_dict(cls: t.Type[T], d: t.Dict[str, t.Any]) -> T:
+        return cls(**{f.name: d[f.name] for f in attr.fields(cls)})
+
+    def to_dict(self) -> t.Dict[str, object]:
+        return attr.asdict(self, recurse=False)
 
 
 def _open_or_create(path: str) -> t.BinaryIO:
@@ -182,3 +292,8 @@
             return open(path, 'x+b')
         except FileExistsError:
             pass
+
+
+def _dup_file(file: t.BinaryIO) -> t.BinaryIO:
+    duplicate = os.dup(file.fileno())
+    return os.fdopen(duplicate, 'r+b')
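
For context, a minimal usage sketch of the updated Logger constructor and its new tuning knobs (the file path and the cached_entries/index_gap values below are illustrative, not defaults; write_rows, cached_data, and close are as defined above):

    import contextlib
    import datetime

    from weather_server import logfile

    with contextlib.closing(logfile.Logger(
            '/tmp/weather-test.bson',      # illustrative log path
            sample_field='sample_time',
            cached_entries=1024,           # keep at most 1024 entries in RAM
            index_gap=256)) as logger:     # one index entry per 256 entries
        logger.write_rows([
            dict(sample_time=datetime.datetime.now(datetime.timezone.utc),
                 temp_c=21.5, rh_pct=40.0),
        ])
        print(logger.cached_data[-1])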
--- a/weather_server/logfile_test.py	Tue May 19 10:15:29 2020 -0400
+++ b/weather_server/logfile_test.py	Tue Jul 07 19:51:30 2020 -0400
@@ -3,7 +3,6 @@
 import os.path
 import pathlib
 import tempfile
-import threading
 import unittest
 
 import bson
@@ -25,6 +24,7 @@
         super().setUp()
         self.temp_dir = tempfile.TemporaryDirectory()
         self.log_path = pathlib.Path(self.temp_dir.name) / 'test.bson'
+        self.index_path = pathlib.Path(str(self.log_path) + '.index.bson')
 
     def tearDown(self):
         self.temp_dir.cleanup()
@@ -34,7 +34,7 @@
         lg = logfile.Logger(
             str(self.log_path), sample_field='x')
         with contextlib.closing(lg) as logger:
-            self.assertEqual(logger.data, ())
+            self.assertEqual(logger.cached_data, ())
 
     def test_fails_to_open(self):
         with self.assertRaises(OSError):
@@ -64,7 +64,7 @@
             sample_field='sample_time',
         )) as logger:
             self.assertEqual(
-                logger.data,
+                logger.cached_data,
                 (
                     dict(
                         sample_time=ts(123),
@@ -84,6 +84,8 @@
         with contextlib.closing(logfile.Logger(
             str(self.log_path),
             sample_field='sample_time',
+            cached_entries=4,
+            index_gap=2,
         )) as logger:
             logger.write_rows([
                 # Ignored, since it's older than the newest entry.
@@ -101,7 +103,7 @@
                 ),
             ])
             self.assertEqual(
-                logger.data,
+                logger.cached_data,
                 (
                     dict(
                         sample_time=ts(123),
@@ -132,11 +134,15 @@
                 ingest_time=ts(200),
             ),
         ])
+        self.assertEqual(self.read_index(), [
+            dict(entry_count=0, entry_key=ts(123), byte=0),
+        ])
 
     def test_outside_writes(self):
         with contextlib.closing(logfile.Logger(
             str(self.log_path),
             sample_field='sample_time',
+            index_gap=2,
         )) as logger:
             logger.write_rows([
                 dict(
@@ -160,7 +166,9 @@
                     ingest_time=ts(4096),
                 )))
                 outfile.flush()
-            self.assertEqual(logger.data, (
+            # Empty write to update the index.
+            logger.write_rows([])
+            self.assertEqual(logger.cached_data, (
                 dict(
                     sample_time=ts(100),
                     temp_c=999,
@@ -180,12 +188,199 @@
                     ingest_time=ts(4096),
                 ),
             ))
+        self.assertEqual(self.read_bsons(), [
+            dict(
+                sample_time=ts(100),
+                temp_c=999,
+                rh_pct=666,
+                ingest_time=ts(101),
+            ),
+            dict(
+                sample_time=ts(125),
+                temp_c=333,
+                rh_pct=777,
+                ingest_time=ts(200),
+            ),
+            dict(
+                sample_time=ts(1024),
+                temp_c=256,
+                rh_pct=128,
+                ingest_time=ts(4096),
+            ),
+        ])
+        self.assertEqual(self.read_index(), [
+            dict(entry_count=0, entry_key=ts(100), byte=0),
+            dict(entry_count=2, entry_key=ts(1024), byte=142),
+        ])
+
+    def test_lots_of_outside_writes(self):
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='key',
+            cached_entries=4,
+            index_gap=2,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with contextlib.closing(logfile.Logger(
+                str(self.log_path),
+                sample_field='key',
+                cached_entries=4,
+                index_gap=2,
+            )) as other_logger:
+                other_logger.write_rows(make_messages(10, 10))
+                logger.write_rows(make_messages(1, 20))
+                self.assertEqual(logger.cached_data, (
+                    dict(key='0011', value=17),
+                    dict(key='0012', value=18),
+                    dict(key='0013', value=19),
+                    dict(key='0014', value=20),
+                ))
+                self.assertEqual(self.read_bsons(), make_messages(21))
+                self.assertEqual(self.read_index(), [
+                    dict(entry_count=0, entry_key='0000', byte=0),
+                    dict(entry_count=2, entry_key='0002', byte=60),
+                    dict(entry_count=4, entry_key='0004', byte=120),
+                    dict(entry_count=6, entry_key='0006', byte=180),
+                    dict(entry_count=8, entry_key='0008', byte=240),
+                    dict(entry_count=10, entry_key='000a', byte=300),
+                    dict(entry_count=12, entry_key='000c', byte=360),
+                    dict(entry_count=14, entry_key='000e', byte=420),
+                    dict(entry_count=16, entry_key='0010', byte=480),
+                    dict(entry_count=18, entry_key='0012', byte=540),
+                    dict(entry_count=20, entry_key='0014', byte=600),
+                ])
+
+    def test_outside_writes_that_dont_update_index(self):
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='key',
+            cached_entries=4,
+            index_gap=2,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with self.log_path.open('ab') as outfile:
+                for item in make_messages(16, 256):
+                    outfile.write(common.bson_encode(item))
+            self.assertEqual(logger.cached_data, (
+                dict(key='010c', value=268),
+                dict(key='010d', value=269),
+                dict(key='010e', value=270),
+                dict(key='010f', value=271),
+            ))
+        self.assertEqual(self.read_bsons(), [
+            dict(key='0000', value=0),
+            dict(key='0001', value=1),
+            dict(key='0002', value=2),
+            dict(key='0003', value=3),
+            dict(key='0004', value=4),
+            dict(key='0005', value=5),
+            dict(key='0006', value=6),
+            dict(key='0007', value=7),
+            dict(key='0008', value=8),
+            dict(key='0009', value=9),
+            dict(key='0100', value=256),
+            dict(key='0101', value=257),
+            dict(key='0102', value=258),
+            dict(key='0103', value=259),
+            dict(key='0104', value=260),
+            dict(key='0105', value=261),
+            dict(key='0106', value=262),
+            dict(key='0107', value=263),
+            dict(key='0108', value=264),
+            dict(key='0109', value=265),
+            dict(key='010a', value=266),
+            dict(key='010b', value=267),
+            dict(key='010c', value=268),
+            dict(key='010d', value=269),
+            dict(key='010e', value=270),
+            dict(key='010f', value=271),
+        ])
+        self.assertEqual(self.read_index(), [
+            dict(entry_count=0, entry_key='0000', byte=0),
+            dict(entry_count=2, entry_key='0002', byte=60),
+            dict(entry_count=4, entry_key='0004', byte=120),
+            dict(entry_count=6, entry_key='0006', byte=180),
+            dict(entry_count=8, entry_key='0008', byte=240),
+        ])
+
+    def test_outside_write_but_dont_skip(self):
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='key',
+            cached_entries=10,
+            index_gap=4,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with contextlib.closing(logfile.Logger(
+                str(self.log_path),
+                sample_field='key',
+                cached_entries=10,
+                index_gap=4,
+            )) as other_logger:
+                other_logger.write_rows(make_messages(5, 10))
+                self.assertEqual(logger.cached_data, (
+                    dict(key='0005', value=5),
+                    dict(key='0006', value=6),
+                    dict(key='0007', value=7),
+                    dict(key='0008', value=8),
+                    dict(key='0009', value=9),
+                    dict(key='000a', value=10),
+                    dict(key='000b', value=11),
+                    dict(key='000c', value=12),
+                    dict(key='000d', value=13),
+                    dict(key='000e', value=14),
+                ))
+                logger.write_rows(make_messages(5, 15))
+                self.assertEqual(self.read_index(), [
+                    dict(entry_count=0, entry_key='0000', byte=0),
+                    dict(entry_count=4, entry_key='0004', byte=120),
+                    dict(entry_count=8, entry_key='0008', byte=240),
+                    dict(entry_count=12, entry_key='000c', byte=360),
+                    dict(entry_count=16, entry_key='0010', byte=480),
+                ])
+                logger.write_rows(make_messages(1, 20))
+                self.assertEqual(self.read_index(), [
+                    dict(entry_count=0, entry_key='0000', byte=0),
+                    dict(entry_count=4, entry_key='0004', byte=120),
+                    dict(entry_count=8, entry_key='0008', byte=240),
+                    dict(entry_count=12, entry_key='000c', byte=360),
+                    dict(entry_count=16, entry_key='0010', byte=480),
+                    dict(entry_count=20, entry_key='0014', byte=600),
+                ])
+
+    def test_start_and_skip(self):
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='key',
+            cached_entries=4,
+            index_gap=2,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with contextlib.closing(logfile.Logger(
+                str(self.log_path),
+                sample_field='key',
+                cached_entries=4,
+                index_gap=2,
+            )) as other_logger:
+                self.assertEqual(
+                    other_logger.cached_data, tuple(make_messages(4, 6)))
 
     def read_bsons(self):
         with self.log_path.open('rb') as infile:
             return bson.decode_all(
                 infile.read(), common.BSON_OPTIONS)
 
+    def read_index(self):
+        with self.index_path.open('rb') as infile:
+            return bson.decode_all(
+                infile.read(), common.BSON_OPTIONS)
+
+
+def make_messages(count, start=0):
+    return [
+        dict(key=format(n, '04x'), value=n)
+        for n in range(start, start + count)]
+
 
 if __name__ == '__main__':
     unittest.main()