diff weather_server/logfile.py @ 31:9bc3687e1e5e

logfile: Add an index, and don't keep everything in RAM.

- Adds index BSON file, updated upon writing.
- Limits amount of data in RAM.
- Gracefully handles writes that don't update index.
author Paul Fisher <paul@pfish.zone>
date Tue, 07 Jul 2020 19:51:30 -0400
parents 20c8ec56e447
children b77c8e7d2742
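
Roughly, the change adds a sidecar BSON index next to the log: one small record for every INDEX_GAP-th log document, noting how far into the log it sits, so readers can seek near the tail instead of decoding the whole file. Below is a minimal sketch of that record shape, assuming pymongo's bson module (the same one the code uses); build_index and sample_field are illustrative names only, and the real code maintains these records incrementally in Logger._update_index.

import bson  # pymongo's bson module, as used by the code in the diff

INDEX_GAP = 4096  # one index record per this many log documents


def build_index(log_file, sample_field):
    # Sketch only: scan a BSON log and produce records shaped like
    # IndexEntry.to_dict() in the diff below.
    index = []
    count = 0
    offset = log_file.tell()
    for doc in bson.decode_file_iter(log_file):
        if count % INDEX_GAP == 0:
            index.append({
                'entry_count': count,            # zero-based position of this document
                'entry_key': doc[sample_field],  # strictly-increasing sample value
                'byte': offset,                  # byte offset where the document starts
            })
        count += 1
        offset = log_file.tell()
    return index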
--- a/weather_server/logfile.py	Tue May 19 10:15:29 2020 -0400
+++ b/weather_server/logfile.py	Tue Jul 07 19:51:30 2020 -0400
@@ -1,6 +1,8 @@
 """The part which handles writing things out and reading things in from CSV.
 """
 
+import attr
+import collections
 import concurrent.futures as futures
 import contextlib
 import fcntl
@@ -14,6 +16,13 @@
 from . import common
 
 
+# The number of entries to keep in memory without reading from disk.
+CACHED_ENTRIES = 16384
+
+# How many log entries to scan between index entries.
+INDEX_GAP = 4096
+
+
 class _WriteRequest:
 
     def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]):
@@ -29,6 +38,7 @@
     def __init__(self):
         # The future that will be set with the log's contents.
         self.future = futures.Future()
+        # TODO(pfish): Make it possible to read from a starting point.
 
 
 # Poison pill to tell a logger thread to stop.
@@ -38,7 +48,16 @@
 class Logger:
     """Logger which handles reading/writing one temperature log file."""
 
-    def __init__(self, filename: str, *, sample_field: str):
+    _file: t.BinaryIO
+    _index_file: t.BinaryIO
+
+    def __init__(
+            self,
+            filename: str,
+            *,
+            sample_field: str,
+            cached_entries: int = CACHED_ENTRIES,
+            index_gap: int = INDEX_GAP):
         """Creates a new Logger for the given file.
 
         Args:
@@ -46,21 +65,28 @@
             sample_field: The field name to use as the strictly-increasing
                 value to ensure that no duplicate writes occur.
         """
-        self._sample_field = sample_field
+
+        self._filename = filename
+        self._cached_entries = cached_entries
+        self._index_gap = index_gap
+
         self._queue = queue.SimpleQueue()
-        # Create a Future that will be resolved once the file is opened
-        # (or fails to be opened).
-        writer_started = futures.Future()
-        self._writer_thread = threading.Thread(
+        self._sample_field = sample_field
+
+        self._start_future = futures.Future()
+        self._thread = threading.Thread(
             name=f'{filename!r} writer thread',
-            target=lambda: _writer_thread(
-                filename, self._queue, sample_field, writer_started),
+            target=self._writer_thread,
             daemon=True)
-        self._writer_thread.start()
-        writer_started.result()
+        self._thread.start()
+
+        self._data = collections.deque(maxlen=self._cached_entries)
+        self._index: t.List[IndexEntry] = []
+        self._message_count = 0
+        self._start_future.result()
 
     @property
-    def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
+    def cached_data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
         req = _ReadRequest()
         self._queue.put(req)
         return req.future.result()
@@ -70,72 +96,159 @@
         self._queue.put(req)
         return req.future.result()
 
-    def __del__(self):
+    def _writer_thread(self) -> None:
+        try:
+            self._file = _open_or_create(self._filename)
+            self._index_file = _open_or_create(_index_filename(self._filename))
+        except BaseException as e:
+            self._start_future.set_exception(e)
+            return
+        self._start_future.set_result(None)
+
+        with self._file, self._index_file:
+            running = True
+            while running:
+                item = self._queue.get()
+                if item is _POISON:
+                    # _POISON is the sentinel that tells us to stop.
+                    running = False
+                elif isinstance(item, _ReadRequest):
+                    if not item.future.set_running_or_notify_cancel():
+                        continue
+                    with _file_lock(self._file, fcntl.LOCK_SH):
+                        try:
+                            self._catch_up()
+                        except BaseException as x:
+                            item.future.set_exception(x)
+                        else:
+                            item.future.set_result(tuple(self._data))
+                elif isinstance(item, _WriteRequest):
+                    if not item.future.set_running_or_notify_cancel():
+                        continue
+                    try:
+                        with _file_lock(self._file, fcntl.LOCK_EX):
+                            self._catch_up()
+                            current_ptr = self._file.tell()
+                            self._file.truncate(current_ptr)
+                            if not self._data:
+                                prev_key = None
+                            else:
+                                prev_key = self._data[-1][self._sample_field]
+                            for entry in item.entries:
+                                entry_key = entry[self._sample_field]
+                                if prev_key is None or prev_key < entry_key:
+                                    self._data.append(entry)
+                                    self._file.write(common.bson_encode(entry))
+                                    prev_key = entry_key
+                            self._file.flush()
+                            self._update_index()
+                    except BaseException as x:
+                        item.future.set_exception(x)
+                    else:
+                        item.future.set_result(None)
+                else:
+                    raise AssertionError(
+                        f'Unexpected item {item!r} in the queue')
+
+    def _catch_up(self) -> None:
+        # Preconditions:
+        # - At least a read lock is held.
+        # - File pointer is at the end of the last-read entry.
+        self._catch_up_index()
+        if self._index:
+            # Since we have an index, use it to find a starting place.
+            last_idx = self._index[-1]
+            # Figure out, based on the number of entries we want to keep
+            # in memory, where we should start reading from.
+            read_start_count = max(
+                0, last_idx.entry_count - self._cached_entries)
+            if self._message_count < read_start_count:
+                # We haven't read that far yet; skip ahead using the index.
+                for idx_entry in self._index:
+                    if read_start_count <= idx_entry.entry_count:
+                        break
+                    starting_point = idx_entry
+                self._data.clear()
+                self._file.seek(starting_point.byte, os.SEEK_SET)
+                self._message_count = starting_point.entry_count
+        pointer = self._file.tell()
+        try:
+            items = bson.decode_file_iter(
+                self._file, codec_options=common.BSON_OPTIONS)
+            for item in items:
+                pointer = self._file.tell()
+                self._data.append(item)
+                self._message_count += 1
+        except bson.InvalidBSON:
+            pass  # We have reached the last valid document. Bail.
+        # Seek back to immediately after the end of the last valid doc.
+        self._file.seek(pointer, os.SEEK_SET)
+
+    def _update_index(self) -> None:
+        # Preconditions:
+        # - File pointer is at the end of the last-written message.
+        # - Index pointer is at the last-read index value.
+        # - Exclusive lock is held.
+        with _dup_file(self._file) as update_fp:
+            if self._index:
+                last_idx = self._index[-1]
+                update_fp.seek(last_idx.byte)
+                current_count = last_idx.entry_count
+                decoder = bson.decode_file_iter(
+                    update_fp, codec_options=common.BSON_OPTIONS)
+                try:
+                    # Skip the log document the last index record points at.
+                    next(decoder)
+                    current_count += 1
+                    current_byte = update_fp.tell()
+                except StopIteration:
+                    # If there are no more entries, don't update the index.
+                    return
+            else:
+                current_byte = 0
+                update_fp.seek(current_byte, os.SEEK_SET)
+                current_count = 0
+            entries = bson.decode_file_iter(
+                update_fp, codec_options=common.BSON_OPTIONS)
+            for entry in entries:
+                if current_count % self._index_gap == 0:
+                    idx_entry = IndexEntry(
+                        entry_count=current_count,
+                        entry_key=entry[self._sample_field],
+                        byte=current_byte)
+                    self._index.append(idx_entry)
+                    self._index_file.truncate()
+                    self._index_file.write(
+                        common.bson_encode(idx_entry.to_dict()))
+                current_count += 1
+                current_byte = update_fp.tell()
+        self._index_file.flush()
+
+    def _catch_up_index(self) -> None:
+        # Preconditions:
+        # - At least a read lock is held.
+        pointer = self._index_file.tell()
+        try:
+            index_entries = bson.decode_file_iter(
+                self._index_file, codec_options=common.BSON_OPTIONS)
+            for entry_dict in index_entries:
+                entry = IndexEntry.from_dict(entry_dict)
+                self._index.append(entry)
+                pointer = self._index_file.tell()
+        except bson.InvalidBSON:
+            pass  # We have reached the last valid BSON document. Bail.
+        self._index_file.seek(pointer, os.SEEK_SET)
+
+    def __del__(self) -> None:
         self.close()
 
-    def close(self):
+    def close(self) -> None:
         self._queue.put(_POISON)
-        self._writer_thread.join()
+        self._thread.join()
 
 
-def _writer_thread(
-    filename: str,
-    q: queue.Queue,
-    sample_field: str,
-    started: futures.Future,
-) -> None:
-    if not started.set_running_or_notify_cancel():
-        return
-    try:
-        file = _open_or_create(filename)
-        started.set_result(None)
-    except BaseException as e:
-        started.set_exception(e)
-        return
-    with file:
-        running = True
-        data: t.List[t.Dict[str, object]] = []
-        while running:
-            item = q.get()
-            if item is _POISON:
-                # None is the poison pill that makes us stop.
-                running = False
-            elif isinstance(item, _ReadRequest):
-                if not item.future.set_running_or_notify_cancel():
-                    continue
-                try:
-                    with _file_lock(file, fcntl.LOCK_SH):
-                        data.extend(_catch_up(file))
-                except BaseException as x:
-                    item.future.set_exception(x)
-                else:
-                    item.future.set_result(tuple(data))
-            elif isinstance(item, _WriteRequest):
-                if not item.future.set_running_or_notify_cancel():
-                    continue
-                try:
-                    with _file_lock(file, fcntl.LOCK_EX):
-                        data.extend(_catch_up(file))
-                        # Since we're at the last good point, truncate after.
-                        file.truncate(file.tell())
-                        if not data:
-                            last = None
-                        else:
-                            last = data[-1][sample_field]
-                        for entry in item.entries:
-                            entry_key = entry[sample_field]
-                            if last is None or last < entry_key:
-                                file.write(common.bson_encode(entry))
-                                data.append(entry)
-                                last = entry_key
-                        file.flush()
-                except BaseException as x:
-                    item.future.set_exception(x)
-                else:
-                    item.future.set_result(None)
-            else:
-                raise AssertionError(
-                    'Unexpected item {!r} in the queue'.format(item))
+def _index_filename(filename: str) -> str:
+    return filename + '.index.bson'
 
 
 @contextlib.contextmanager
@@ -152,24 +265,21 @@
     return os.stat(file.fileno()).st_size
 
 
-def _catch_up(file: t.BinaryIO) -> t.Iterable[t.Dict[str, object]]:
-    """Reads data and advances the file pointer to the end of the file."""
-    size = _size(file)
-    pointer = file.tell()
-    if size == pointer:
-        return ()
-    output: t.List[t.Dict[str, object]] = []
-    try:
-        items = bson.decode_file_iter(
-            file, codec_options=common.BSON_OPTIONS)
-        for item in items:
-            pointer = file.tell()
-            output.append(item)
-    except bson.InvalidBSON:
-        pass  # We have reached the last valid document.  Bail.
-    # Seek back to immediately after the end of the last valid doc.
-    file.seek(pointer, os.SEEK_SET)
-    return output
+T = t.TypeVar('T')
+
+
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class IndexEntry:
+    entry_count: int
+    entry_key: object
+    byte: int
+
+    @classmethod
+    def from_dict(cls: t.Type[T], d: t.Dict[str, t.Any]) -> T:
+        return cls(**{f.name: d[f.name] for f in attr.fields(cls)})
+
+    def to_dict(self) -> t.Dict[str, object]:
+        return attr.asdict(self, recurse=False)
 
 
 def _open_or_create(path: str) -> t.BinaryIO:
@@ -182,3 +292,8 @@
             return open(path, 'x+b')
         except FileExistsError:
             pass
+
+
+def _dup_file(file: t.BinaryIO) -> t.BinaryIO:
+    duplicate = os.dup(file.fileno())
+    return os.fdopen(duplicate, 'r+b')
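
As a closing orientation aid, here is a hedged sketch of how a separate reader could use the two files the Logger now maintains: load the '.index.bson' sidecar, pick the last index record that leaves at most max_entries documents to decode, and seek the main log there. tail_entries and max_entries are hypothetical names; the real Logger additionally passes common.BSON_OPTIONS to the decoder and takes fcntl locks while reading.

import bson


def tail_entries(log_path, max_entries=16384):
    # Load the sidecar index (one BSON document per IndexEntry).
    index = []
    with open(log_path + '.index.bson', 'rb') as index_file:
        try:
            for record in bson.decode_file_iter(index_file):
                index.append(record)
        except bson.InvalidBSON:
            pass  # Ignore a partially-written trailing record.

    with open(log_path, 'rb') as log_file:
        if index:
            # Start at the latest index record that still leaves at most
            # ~max_entries documents ahead of it.
            want = max(0, index[-1]['entry_count'] - max_entries)
            start_byte = 0
            for record in index:
                if want <= record['entry_count']:
                    break
                start_byte = record['byte']
            log_file.seek(start_byte)
        entries = []
        try:
            for doc in bson.decode_file_iter(log_file):
                entries.append(doc)
        except bson.InvalidBSON:
            pass  # Stop at the first incomplete document.
        return entries[-max_entries:]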