comparison weather_server/logfile.py @ 31:9bc3687e1e5e
logfile: Add an index, and don't keep everything in RAM.
- Adds index BSON file, updated upon writing.
- Limits amount of data in RAM.
- Gracefully handles writes that don't update the index.
| author | Paul Fisher <paul@pfish.zone> |
|---|---|
| date | Tue, 07 Jul 2020 19:51:30 -0400 |
| parents | 20c8ec56e447 |
| children | b77c8e7d2742 |
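For context, the scheme this changeset introduces: alongside the BSON log, a sidecar index file records one `{entry_count, entry_key, byte}` document for every `INDEX_GAP`-th log entry, so a reader can seek close to a target entry instead of scanning the whole log. Below is a minimal sketch of that lookup, assuming pymongo's `bson` module (already imported by this file); `find_seek_offset` is a hypothetical standalone helper, not part of this change:

```python
import typing as t

import bson  # pymongo's bson module: provides decode_file_iter


def find_seek_offset(index_file: t.BinaryIO, wanted_count: int) -> int:
    """Returns the byte offset of the last indexed log entry at or
    before wanted_count, or 0 if the index has no suitable entry.

    Assumes each index document has the shape written by this commit:
        {'entry_count': int, 'entry_key': key, 'byte': int}
    """
    offset = 0
    for doc in bson.decode_file_iter(index_file):
        if doc['entry_count'] > wanted_count:
            break  # This and all later entries are past the target.
        offset = doc['byte']
    return offset
```

Seeking the log file to that offset and then iterating with `bson.decode_file_iter` lands at most `INDEX_GAP` entries before the target, which is the property `_catch_up` below relies on.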
| 30:c760ab7f93c2 | 31:9bc3687e1e5e |
|---|---|
| 1 """The part which handles writing things out and reading things in from CSV. | 1 """The part which handles writing things out and reading things in from CSV. |
| 2 """ | 2 """ |
| 3 | 3 |
| 4 import attr | |
| 5 import collections | |
| 4 import concurrent.futures as futures | 6 import concurrent.futures as futures |
| 5 import contextlib | 7 import contextlib |
| 6 import fcntl | 8 import fcntl |
| 7 import os | 9 import os |
| 8 import queue | 10 import queue |
| 12 import bson | 14 import bson |
| 13 | 15 |
| 14 from . import common | 16 from . import common |
| 15 | 17 |
| 16 | 18 |
| 19 # The number of entries to keep in memory without reading from disk. | |
| 20 CACHED_ENTRIES = 16384 | |
| 21 | |
| 22 # How many entries to read before creating a new index entry. | |
| 23 INDEX_GAP = 4096 | |
| 24 | |
| 25 | |
| 17 class _WriteRequest: | 26 class _WriteRequest: |
| 18 | 27 |
| 19 def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]): | 28 def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]): |
| 20 """Creates a request to write the given data to the log.""" | 29 """Creates a request to write the given data to the log.""" |
| 21 # The data to be written. We take ownership of all the dicts! | 30 # The data to be written. We take ownership of all the dicts! |
| 27 class _ReadRequest: | 36 class _ReadRequest: |
| 28 | 37 |
| 29 def __init__(self): | 38 def __init__(self): |
| 30 # The future that will be set with the log's contents. | 39 # The future that will be set with the log's contents. |
| 31 self.future = futures.Future() | 40 self.future = futures.Future() |
| 41 # TODO(pfish): Make it possible to read from a starting point. | |
| 32 | 42 |
| 33 | 43 |
| 34 # Poison pill to tell a logger thread to stop. | 44 # Poison pill to tell a logger thread to stop. |
| 35 _POISON = object() | 45 _POISON = object() |
| 36 | 46 |
| 37 | 47 |
| 38 class Logger: | 48 class Logger: |
| 39 """Logger which handles reading/writing one temperature log file.""" | 49 """Logger which handles reading/writing one temperature log file.""" |
| 40 | 50 |
| 41 def __init__(self, filename: str, *, sample_field: str): | 51 _file: t.BinaryIO |
| 52 _index_file: t.BinaryIO | |
| 53 | |
| 54 def __init__( | |
| 55 self, | |
| 56 filename: str, | |
| 57 *, | |
| 58 sample_field: str, | |
| 59 cached_entries: int = CACHED_ENTRIES, | |
| 60 index_gap: int = INDEX_GAP): | |
| 42 """Creates a new Logger for the given file. | 61 """Creates a new Logger for the given file. |
| 43 | 62 |
| 44 Args: | 63 Args: |
| 45 filename: The filename to open, or create if not already there. | 64 filename: The filename to open, or create if not already there. |
| 46 sample_field: The field name to use as the strictly-increasing | 65 sample_field: The field name to use as the strictly-increasing |
| 47 value to ensure that no duplicate writes occur. | 66 value to ensure that no duplicate writes occur. |
| 48 """ | 67 """ |
| 68 | |
| 69 self._filename = filename | |
| 70 self._cached_entries = cached_entries | |
| 71 self._index_gap = index_gap | |
| 72 | |
| 73 self._queue = queue.SimpleQueue() | |
| 49 self._sample_field = sample_field | 74 self._sample_field = sample_field |
| 50 self._queue = queue.SimpleQueue() | 75 |
| 51 # Create a Future that will be resolved once the file is opened | 76 self._start_future = futures.Future() |
| 52 # (or fails to be opened). | 77 self._thread = threading.Thread( |
| 53 writer_started = futures.Future() | |
| 54 self._writer_thread = threading.Thread( | |
| 55 name=f'{filename!r} writer thread', | 78 name=f'{filename!r} writer thread', |
| 56 target=lambda: _writer_thread( | 79 target=self._writer_thread, |
| 57 filename, self._queue, sample_field, writer_started), | |
| 58 daemon=True) | 80 daemon=True) |
| 59 self._writer_thread.start() | 81 self._thread.start() |
| 60 writer_started.result() | 82 |
| 83 self._data = collections.deque(maxlen=self._cached_entries) | |
| 84 self._index: t.List[IndexEntry] = [] | |
| 85 self._message_count = 0 | |
| 86 self._start_future.result() | |
| 61 | 87 |
| 62 @property | 88 @property |
| 63 def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]: | 89 def cached_data(self) -> t.Tuple[t.Dict[str, t.Any], ...]: |
| 64 req = _ReadRequest() | 90 req = _ReadRequest() |
| 65 self._queue.put(req) | 91 self._queue.put(req) |
| 66 return req.future.result() | 92 return req.future.result() |
| 67 | 93 |
| 68 def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]): | 94 def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]): |
| 69 req = _WriteRequest(entries) | 95 req = _WriteRequest(entries) |
| 70 self._queue.put(req) | 96 self._queue.put(req) |
| 71 return req.future.result() | 97 return req.future.result() |
| 72 | 98 |
| 73 def __del__(self): | 99 def _writer_thread(self) -> None: |
| 100 try: | |
| 101 self._file = _open_or_create(self._filename) | |
| 102 self._index_file = _open_or_create(_index_filename(self._filename)) | |
| 103 except BaseException as e: | |
| 104 self._start_future.set_exception(e) | |
| 105 return | |
| 106 self._start_future.set_result(None) | |
| 107 | |
| 108 with self._file, self._index_file: | |
| 109 running = True | |
| 110 while running: | |
| 111 item = self._queue.get() | |
| 112 if item is _POISON: | |
| 113 # _POISON is the sentinel that makes us stop. | |
| 114 running = False | |
| 115 elif isinstance(item, _ReadRequest): | |
| 116 if not item.future.set_running_or_notify_cancel(): | |
| 117 continue | |
| 118 with _file_lock(self._file, fcntl.LOCK_SH): | |
| 119 try: | |
| 120 self._catch_up() | |
| 121 except BaseException as x: | |
| 122 item.future.set_exception(x) | |
| 123 else: | |
| 124 item.future.set_result(tuple(self._data)) | |
| 125 elif isinstance(item, _WriteRequest): | |
| 126 if not item.future.set_running_or_notify_cancel(): | |
| 127 continue | |
| 128 try: | |
| 129 with _file_lock(self._file, fcntl.LOCK_EX): | |
| 130 self._catch_up() | |
| 131 current_ptr = self._file.tell() | |
| 132 self._file.truncate(current_ptr) | |
| 133 if not self._data: | |
| 134 prev_key = None | |
| 135 else: | |
| 136 prev_key = self._data[-1][self._sample_field] | |
| 137 for entry in item.entries: | |
| 138 entry_key = entry[self._sample_field] | |
| 139 if prev_key is None or prev_key < entry_key: | |
| 140 self._data.append(entry) | |
| 141 self._file.write(common.bson_encode(entry)) | |
| 142 prev_key = entry_key | |
| 143 self._file.flush() | |
| 144 self._update_index() | |
| 145 except BaseException as x: | |
| 146 item.future.set_exception(x) | |
| 147 else: | |
| 148 item.future.set_result(None) | |
| 149 else: | |
| 150 raise AssertionError( | |
| 151 f'Unexpected item {item!r} in the queue') | |
| 152 | |
| 153 def _catch_up(self) -> None: | |
| 154 # Preconditions: | |
| 155 # - At least a read lock is held. | |
| 156 # - File pointer is at the end of the last-read entry. | |
| 157 self._catch_up_index() | |
| 158 if self._index: | |
| 159 # Since we have an index, use it to find a starting place. | |
| 160 last_idx = self._index[-1] | |
| 161 # Figure out, based on the number of entries we want to keep | |
| 162 # in memory, where we should start reading from. | |
| 163 read_start_count = max( | |
| 164 0, last_idx.entry_count - self._cached_entries) | |
| 165 if self._message_count < read_start_count: | |
| 166 # If we haven't read that far yet, use the index to skip ahead. | |
| 167 for idx_entry in self._index: | |
| 168 if read_start_count <= idx_entry.entry_count: | |
| 169 break | |
| 170 starting_point = idx_entry | |
| 171 self._data.clear() | |
| 172 self._file.seek(starting_point.byte, os.SEEK_SET) | |
| 173 self._message_count = starting_point.entry_count | |
| 174 pointer = self._file.tell() | |
| 175 try: | |
| 176 items = bson.decode_file_iter( | |
| 177 self._file, codec_options=common.BSON_OPTIONS) | |
| 178 for item in items: | |
| 179 pointer = self._file.tell() | |
| 180 self._data.append(item) | |
| 181 self._message_count += 1 | |
| 182 except bson.InvalidBSON: | |
| 183 pass # We have reached the last valid document. Bail. | |
| 184 # Seek back to immediately after the end of the last valid doc. | |
| 185 self._file.seek(pointer, os.SEEK_SET) | |
| 186 | |
| 187 def _update_index(self) -> None: | |
| 188 # Preconditions: | |
| 189 # - File pointer is at the end of the last-written message. | |
| 190 # - Index pointer is at the last-read index value. | |
| 191 # - Exclusive lock is held. | |
| 192 with _dup_file(self._file) as update_fp: | |
| 193 if self._index: | |
| 194 last_idx = self._index[-1] | |
| 195 update_fp.seek(last_idx.byte) | |
| 196 current_count = last_idx.entry_count | |
| 197 decoder = bson.decode_file_iter( | |
| 198 update_fp, codec_options=common.BSON_OPTIONS) | |
| 199 try: | |
| 200 # Skip the current index entry. | |
| 201 next(decoder) | |
| 202 current_count += 1 | |
| 203 current_byte = update_fp.tell() | |
| 204 except StopIteration: | |
| 205 # If there are no more entries, don't update the index. | |
| 206 return | |
| 207 else: | |
| 208 current_byte = 0 | |
| 209 update_fp.seek(current_byte, os.SEEK_SET) | |
| 210 current_count = 0 | |
| 211 entries = bson.decode_file_iter( | |
| 212 update_fp, codec_options=common.BSON_OPTIONS) | |
| 213 for entry in entries: | |
| 214 if current_count % self._index_gap == 0: | |
| 215 idx_entry = IndexEntry( | |
| 216 entry_count=current_count, | |
| 217 entry_key=entry[self._sample_field], | |
| 218 byte=current_byte) | |
| 219 self._index.append(idx_entry) | |
| 220 self._index_file.truncate() | |
| 221 self._index_file.write( | |
| 222 common.bson_encode(idx_entry.to_dict())) | |
| 223 current_count += 1 | |
| 224 current_byte = update_fp.tell() | |
| 225 self._index_file.flush() | |
| 226 | |
| 227 def _catch_up_index(self) -> None: | |
| 228 # Preconditions: | |
| 229 # - At least a read lock is held. | |
| 230 pointer = self._index_file.tell() | |
| 231 try: | |
| 232 index_entries = bson.decode_file_iter( | |
| 233 self._index_file, codec_options=common.BSON_OPTIONS) | |
| 234 for entry_dict in index_entries: | |
| 235 entry = IndexEntry.from_dict(entry_dict) | |
| 236 self._index.append(entry) | |
| 237 pointer = self._index_file.tell() | |
| 238 except bson.InvalidBSON: | |
| 239 pass # We have reached the last valid BSON document. Bail. | |
| 240 self._index_file.seek(pointer, os.SEEK_SET) | |
| 241 | |
| 242 def __del__(self) -> None: | |
| 74 self.close() | 243 self.close() |
| 75 | 244 |
| 76 def close(self): | 245 def close(self) -> None: |
| 77 self._queue.put(_POISON) | 246 self._queue.put(_POISON) |
| 78 self._writer_thread.join() | 247 self._thread.join() |
| 79 | 248 |
| 80 | 249 |
| 81 def _writer_thread( | 250 def _index_filename(filename: str) -> str: |
| 82 filename: str, | 251 return filename + '.index.bson' |
| 83 q: queue.Queue, | |
| 84 sample_field: str, | |
| 85 started: futures.Future, | |
| 86 ) -> None: | |
| 87 if not started.set_running_or_notify_cancel(): | |
| 88 return | |
| 89 try: | |
| 90 file = _open_or_create(filename) | |
| 91 started.set_result(None) | |
| 92 except BaseException as e: | |
| 93 started.set_exception(e) | |
| 94 return | |
| 95 with file: | |
| 96 running = True | |
| 97 data: t.List[t.Dict[str, object]] = [] | |
| 98 while running: | |
| 99 item = q.get() | |
| 100 if item is _POISON: | |
| 101 # _POISON is the sentinel that makes us stop. | |
| 102 running = False | |
| 103 elif isinstance(item, _ReadRequest): | |
| 104 if not item.future.set_running_or_notify_cancel(): | |
| 105 continue | |
| 106 try: | |
| 107 with _file_lock(file, fcntl.LOCK_SH): | |
| 108 data.extend(_catch_up(file)) | |
| 109 except BaseException as x: | |
| 110 item.future.set_exception(x) | |
| 111 else: | |
| 112 item.future.set_result(tuple(data)) | |
| 113 elif isinstance(item, _WriteRequest): | |
| 114 if not item.future.set_running_or_notify_cancel(): | |
| 115 continue | |
| 116 try: | |
| 117 with _file_lock(file, fcntl.LOCK_EX): | |
| 118 data.extend(_catch_up(file)) | |
| 119 # Since we're at the last good point, truncate after. | |
| 120 file.truncate(file.tell()) | |
| 121 if not data: | |
| 122 last = None | |
| 123 else: | |
| 124 last = data[-1][sample_field] | |
| 125 for entry in item.entries: | |
| 126 entry_key = entry[sample_field] | |
| 127 if last is None or last < entry_key: | |
| 128 file.write(common.bson_encode(entry)) | |
| 129 data.append(entry) | |
| 130 last = entry_key | |
| 131 file.flush() | |
| 132 except BaseException as x: | |
| 133 item.future.set_exception(x) | |
| 134 else: | |
| 135 item.future.set_result(None) | |
| 136 else: | |
| 137 raise AssertionError( | |
| 138 'Unexpected item {!r} in the queue'.format(item)) | |
| 139 | 252 |
| 140 | 253 |
| 141 @contextlib.contextmanager | 254 @contextlib.contextmanager |
| 142 def _file_lock(file: t.BinaryIO, operation: int) -> t.Iterator[None]: | 255 def _file_lock(file: t.BinaryIO, operation: int) -> t.Iterator[None]: |
| 143 assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.' | 256 assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.' |
| 150 | 263 |
| 151 def _size(file: t.BinaryIO) -> int: | 264 def _size(file: t.BinaryIO) -> int: |
| 152 return os.stat(file.fileno()).st_size | 265 return os.stat(file.fileno()).st_size |
| 153 | 266 |
| 154 | 267 |
| 155 def _catch_up(file: t.BinaryIO) -> t.Iterable[t.Dict[str, object]]: | 268 T = t.TypeVar('T') |
| 156 """Reads data and advances the file pointer to the end of the file.""" | 269 |
| 157 size = _size(file) | 270 |
| 158 pointer = file.tell() | 271 @attr.s(auto_attribs=True, frozen=True, slots=True) |
| 159 if size == pointer: | 272 class IndexEntry: |
| 160 return () | 273 entry_count: int |
| 161 output: t.List[t.Dict[str, object]] = [] | 274 entry_key: object |
| 162 try: | 275 byte: int |
| 163 items = bson.decode_file_iter( | 276 |
| 164 file, codec_options=common.BSON_OPTIONS) | 277 @classmethod |
| 165 for item in items: | 278 def from_dict(cls: t.Type[T], d: t.Dict[str, t.Any]) -> T: |
| 166 pointer = file.tell() | 279 return cls(**{f.name: d[f.name] for f in attr.fields(cls)}) |
| 167 output.append(item) | 280 |
| 168 except bson.InvalidBSON: | 281 def to_dict(self) -> t.Dict[str, object]: |
| 169 pass # We have reached the last valid document. Bail. | 282 return attr.asdict(self, recurse=False) |
| 170 # Seek back to immediately after the end of the last valid doc. | |
| 171 file.seek(pointer, os.SEEK_SET) | |
| 172 return output | |
| 173 | 283 |
| 174 | 284 |
| 175 def _open_or_create(path: str) -> t.BinaryIO: | 285 def _open_or_create(path: str) -> t.BinaryIO: |
| 176 while True: | 286 while True: |
| 177 try: | 287 try: |
| 180 pass | 290 pass |
| 181 try: | 291 try: |
| 182 return open(path, 'x+b') | 292 return open(path, 'x+b') |
| 183 except FileExistsError: | 293 except FileExistsError: |
| 184 pass | 294 pass |
| 295 | |
| 296 | |
| 297 def _dup_file(file: t.BinaryIO) -> t.BinaryIO: | |
| 298 duplicate = os.dup(file.fileno()) | |
| 299 return os.fdopen(duplicate, 'r+b') |
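
End to end, the `Logger` API from this revision can be exercised as below. This is a hedged sketch: the `weather_server` package import path follows this repo's layout, and the file path and `sample_time` field name are illustrative assumptions.

```python
from weather_server import logfile

# Creates the log file and its <path>.index.bson sidecar on first use.
logger = logfile.Logger('/tmp/temps.bson', sample_field='sample_time')
try:
    # Entries whose sample_field does not strictly increase are dropped.
    logger.write_rows([
        {'sample_time': 1, 'temp_c': 21.5},
        {'sample_time': 2, 'temp_c': 21.7},
        {'sample_time': 2, 'temp_c': 99.9},  # Duplicate key: ignored.
    ])
    # Only the most recent CACHED_ENTRIES entries are kept in RAM.
    print(logger.cached_data)
finally:
    logger.close()  # Sends the poison pill and joins the writer thread.
```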
