comparison weather_server/logfile.py @ 31:9bc3687e1e5e
logfile: Add an index, and don't keep everything in RAM.
- Adds an index BSON file, updated upon writing (see the sketch below).
- Limits the amount of data kept in RAM.
- Gracefully handles writes that don't update the index.
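
To make the new on-disk format concrete, here is a minimal sketch of one index record, using the `IndexEntry` field names from the diff below. The values, and the direct use of PyMongo's `bson.encode`, are illustrative assumptions; the code itself goes through `common.bson_encode`.

```python
# Sketch of a single index record as appended to <logfile>.index.bson.
# Field names match IndexEntry in the diff; the values are invented.
import bson  # PyMongo's bson package

index_record = {
    'entry_count': 4096,        # how many log entries precede this one
    'entry_key': 1594165890.0,  # that entry's sample_field value
    'byte': 524288,             # that entry's byte offset in the log file
}

# Index records are standalone BSON documents written back to back, so a
# reader can replay them with bson.decode_file_iter and then seek the log
# file straight to the newest entries it wants to cache.
encoded = bson.encode(index_record)
```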
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Tue, 07 Jul 2020 19:51:30 -0400 |
parents | 20c8ec56e447 |
children | b77c8e7d2742 |
30:c760ab7f93c2 | 31:9bc3687e1e5e |
---|---|
1 """The part which handles writing things out and reading things in from CSV. | 1 """The part which handles writing things out and reading things in from CSV. |
2 """ | 2 """ |
3 | 3 |
4 import attr | |
5 import collections | |
4 import concurrent.futures as futures | 6 import concurrent.futures as futures |
5 import contextlib | 7 import contextlib |
6 import fcntl | 8 import fcntl |
7 import os | 9 import os |
8 import queue | 10 import queue |
12 import bson | 14 import bson |
13 | 15 |
14 from . import common | 16 from . import common |
15 | 17 |
16 | 18 |
19 # The number of entries to keep in memory without reading from disk. | |
20 CACHED_ENTRIES = 16384 | |
21 | |
22 # How many entries to read before creating a new index entry. | |
23 INDEX_GAP = 4096 | |
24 | |
25 | |
17 class _WriteRequest: | 26 class _WriteRequest: |
18 | 27 |
19 def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]): | 28 def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]): |
20 """Creates a request to write the given data to the log.""" | 29 """Creates a request to write the given data to the log.""" |
21 # The data to be written. We take ownership of all the dicts! | 30 # The data to be written. We take ownership of all the dicts! |
27 class _ReadRequest: | 36 class _ReadRequest: |
28 | 37 |
29 def __init__(self): | 38 def __init__(self): |
30 # The future that will be set with the log's contents. | 39 # The future that will be set with the log's contents. |
31 self.future = futures.Future() | 40 self.future = futures.Future() |
41 # TODO(pfish): Make it possible to read from a starting point. | |
32 | 42 |
33 | 43 |
34 # Poison pill to tell a logger thread to stop. | 44 # Poison pill to tell a logger thread to stop. |
35 _POISON = object() | 45 _POISON = object() |
36 | 46 |
37 | 47 |
38 class Logger: | 48 class Logger: |
39 """Logger which handles reading/writing one temperature log file.""" | 49 """Logger which handles reading/writing one temperature log file.""" |
40 | 50 |
41 def __init__(self, filename: str, *, sample_field: str): | 51 _file: t.BinaryIO |
52 _index_file: t.BinaryIO | |
53 | |
54 def __init__( | |
55 self, | |
56 filename: str, | |
57 *, | |
58 sample_field: str, | |
59 cached_entries: int = CACHED_ENTRIES, | |
60 index_gap: int = INDEX_GAP): | |
42 """Creates a new Logger for the given file. | 61 """Creates a new Logger for the given file. |
43 | 62 |
44 Args: | 63 Args: |
45 filename: The filename to open, or create if not already there. | 64 filename: The filename to open, or create if not already there. |
46 sample_field: The field name to use as the strictly-increasing | 65 sample_field: The field name to use as the strictly-increasing |
47 value to ensure that no duplicate writes occur. | 66 value to ensure that no duplicate writes occur. |
48 """ | 67 """ |
68 | |
69 self._filename = filename | |
70 self._cached_entries = cached_entries | |
71 self._index_gap = index_gap | |
72 | |
73 self._queue = queue.SimpleQueue() | |
49 self._sample_field = sample_field | 74 self._sample_field = sample_field |
50 self._queue = queue.SimpleQueue() | 75 |
51 # Create a Future that will be resolved once the file is opened | 76 self._start_future = futures.Future() |
52 # (or fails to be opened). | 77 self._thread = threading.Thread( |
53 writer_started = futures.Future() | |
54 self._writer_thread = threading.Thread( | |
55 name=f'{filename!r} writer thread', | 78 name=f'{filename!r} writer thread', |
56 target=lambda: _writer_thread( | 79 target=self._writer_thread, |
57 filename, self._queue, sample_field, writer_started), | |
58 daemon=True) | 80 daemon=True) |
59 self._writer_thread.start() | 81 self._thread.start() |
60 writer_started.result() | 82 |
83 self._data = collections.deque(maxlen=self._cached_entries) | |
84 self._index: t.List[IndexEntry] = [] | |
85 self._message_count = 0 | |
86 self._start_future.result() | |
61 | 87 |
62 @property | 88 @property |
63 def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]: | 89 def cached_data(self) -> t.Tuple[t.Dict[str, t.Any], ...]: |
64 req = _ReadRequest() | 90 req = _ReadRequest() |
65 self._queue.put(req) | 91 self._queue.put(req) |
66 return req.future.result() | 92 return req.future.result() |
67 | 93 |
68 def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]): | 94 def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]): |
69 req = _WriteRequest(entries) | 95 req = _WriteRequest(entries) |
70 self._queue.put(req) | 96 self._queue.put(req) |
71 return req.future.result() | 97 return req.future.result() |
72 | 98 |
73 def __del__(self): | 99 def _writer_thread(self) -> None: |
100 try: | |
101 self._file = _open_or_create(self._filename) | |
102 self._index_file = _open_or_create(_index_filename(self._filename)) | |
103 except BaseException as e: | |
104 self._start_future.set_exception(e) | |
105 return | |
106 self._start_future.set_result(None) | |
107 | |
108 with self._file, self._index_file: | |
109 running = True | |
110 while running: | |
111 item = self._queue.get() | |
112 if item is _POISON: | |
113 # _POISON is the sentinel that makes us stop. | |
114 running = False | |
115 elif isinstance(item, _ReadRequest): | |
116 if not item.future.set_running_or_notify_cancel(): | |
117 continue | |
118 with _file_lock(self._file, fcntl.LOCK_SH): | |
119 try: | |
120 self._catch_up() | |
121 except BaseException as x: | |
122 item.future.set_exception(x) | |
123 else: | |
124 item.future.set_result(tuple(self._data)) | |
125 elif isinstance(item, _WriteRequest): | |
126 if not item.future.set_running_or_notify_cancel(): | |
127 continue | |
128 try: | |
129 with _file_lock(self._file, fcntl.LOCK_EX): | |
130 self._catch_up() | |
131 current_ptr = self._file.tell() | |
132 self._file.truncate(current_ptr) | |
133 if not self._data: | |
134 prev_key = None | |
135 else: | |
136 prev_key = self._data[-1][self._sample_field] | |
137 for entry in item.entries: | |
138 entry_key = entry[self._sample_field] | |
139 if prev_key is None or prev_key < entry_key: | |
140 self._data.append(entry) | |
141 self._file.write(common.bson_encode(entry)) | |
142 prev_key = entry_key | |
143 self._file.flush() | |
144 self._update_index() | |
145 except BaseException as x: | |
146 item.future.set_exception(x) | |
147 else: | |
148 item.future.set_result(None) | |
149 else: | |
150 raise AssertionError( | |
151 f'Unexpected item {item!r} in the queue') | |
152 | |
153 def _catch_up(self) -> None: | |
154 # Preconditions: | |
155 # - At least a read lock is held. | |
156 # - File pointer is at the end of the last-read entry. | |
157 self._catch_up_index() | |
158 if self._index: | |
159 # Since we have an index, use it to find a starting place. | |
160 last_idx = self._index[-1] | |
161 # Figure out, based on the number of entries we want to keep | |
162 # in memory, where we should start reading from. | |
163 read_start_count = max( | |
164 0, last_idx.entry_count - self._cached_entries) | |
165 if self._message_count < read_start_count: | |
166 # We haven't read that far yet; jump ahead using the index. | |
167 for idx_entry in self._index: | |
168 if read_start_count <= idx_entry.entry_count: | |
169 break | |
170 starting_point = idx_entry | |
171 self._data.clear() | |
172 self._file.seek(starting_point.byte, os.SEEK_SET) | |
173 self._message_count = starting_point.entry_count | |
174 pointer = self._file.tell() | |
175 try: | |
176 items = bson.decode_file_iter( | |
177 self._file, codec_options=common.BSON_OPTIONS) | |
178 for item in items: | |
179 pointer = self._file.tell() | |
180 self._data.append(item) | |
181 self._message_count += 1 | |
182 except bson.InvalidBSON: | |
183 pass # We have reached the last valid document. Bail. | |
184 # Seek back to immediately after the end of the last valid doc. | |
185 self._file.seek(pointer, os.SEEK_SET) | |
186 | |
187 def _update_index(self) -> None: | |
188 # Preconditions: | |
189 # - File pointer is at the end of the last-written message. | |
190 # - Index pointer is at the last-read index value. | |
191 # - Exclusive lock is held. | |
192 with _dup_file(self._file) as update_fp: | |
193 if self._index: | |
194 last_idx = self._index[-1] | |
195 update_fp.seek(last_idx.byte) | |
196 current_count = last_idx.entry_count | |
197 decoder = bson.decode_file_iter( | |
198 update_fp, codec_options=common.BSON_OPTIONS) | |
199 try: | |
200 # Skip the current index entry. | |
201 next(decoder) | |
202 current_count += 1 | |
203 current_byte = update_fp.tell() | |
204 except StopIteration: | |
205 # If there are no more entries, don't update the index. | |
206 return | |
207 else: | |
208 current_byte = 0 | |
209 update_fp.seek(current_byte, os.SEEK_SET) | |
210 current_count = 0 | |
211 entries = bson.decode_file_iter( | |
212 update_fp, codec_options=common.BSON_OPTIONS) | |
213 for entry in entries: | |
214 if current_count % self._index_gap == 0: | |
215 idx_entry = IndexEntry( | |
216 entry_count=current_count, | |
217 entry_key=entry[self._sample_field], | |
218 byte=current_byte) | |
219 self._index.append(idx_entry) | |
220 self._index_file.truncate() | |
221 self._index_file.write( | |
222 common.bson_encode(idx_entry.to_dict())) | |
223 current_count += 1 | |
224 current_byte = update_fp.tell() | |
225 self._index_file.flush() | |
226 | |
227 def _catch_up_index(self) -> None: | |
228 # Preconditions: | |
229 # - At least a read lock is held. | |
230 pointer = self._index_file.tell() | |
231 try: | |
232 index_entries = bson.decode_file_iter( | |
233 self._index_file, codec_options=common.BSON_OPTIONS) | |
234 for entry_dict in index_entries: | |
235 entry = IndexEntry.from_dict(entry_dict) | |
236 self._index.append(entry) | |
237 pointer = self._index_file.tell() | |
238 except bson.InvalidBSON: | |
239 pass # We have reached the last valid BSON document. Bail. | |
240 self._index_file.seek(pointer, os.SEEK_SET) | |
241 | |
242 def __del__(self) -> None: | |
74 self.close() | 243 self.close() |
75 | 244 |
76 def close(self): | 245 def close(self) -> None: |
77 self._queue.put(_POISON) | 246 self._queue.put(_POISON) |
78 self._writer_thread.join() | 247 self._thread.join() |
79 | 248 |
80 | 249 |
81 def _writer_thread( | 250 def _index_filename(filename: str) -> str: |
82 filename: str, | 251 return filename + '.index.bson' |
83 q: queue.Queue, | |
84 sample_field: str, | |
85 started: futures.Future, | |
86 ) -> None: | |
87 if not started.set_running_or_notify_cancel(): | |
88 return | |
89 try: | |
90 file = _open_or_create(filename) | |
91 started.set_result(None) | |
92 except BaseException as e: | |
93 started.set_exception(e) | |
94 return | |
95 with file: | |
96 running = True | |
97 data: t.List[t.Dict[str, object]] = [] | |
98 while running: | |
99 item = q.get() | |
100 if item is _POISON: | |
101 # _POISON is the sentinel that makes us stop. | |
102 running = False | |
103 elif isinstance(item, _ReadRequest): | |
104 if not item.future.set_running_or_notify_cancel(): | |
105 continue | |
106 try: | |
107 with _file_lock(file, fcntl.LOCK_SH): | |
108 data.extend(_catch_up(file)) | |
109 except BaseException as x: | |
110 item.future.set_exception(x) | |
111 else: | |
112 item.future.set_result(tuple(data)) | |
113 elif isinstance(item, _WriteRequest): | |
114 if not item.future.set_running_or_notify_cancel(): | |
115 continue | |
116 try: | |
117 with _file_lock(file, fcntl.LOCK_EX): | |
118 data.extend(_catch_up(file)) | |
119 # Since we're at the last good point, truncate after. | |
120 file.truncate(file.tell()) | |
121 if not data: | |
122 last = None | |
123 else: | |
124 last = data[-1][sample_field] | |
125 for entry in item.entries: | |
126 entry_key = entry[sample_field] | |
127 if last is None or last < entry_key: | |
128 file.write(common.bson_encode(entry)) | |
129 data.append(entry) | |
130 last = entry_key | |
131 file.flush() | |
132 except BaseException as x: | |
133 item.future.set_exception(x) | |
134 else: | |
135 item.future.set_result(None) | |
136 else: | |
137 raise AssertionError( | |
138 'Unexpected item {!r} in the queue'.format(item)) | |
139 | 252 |
140 | 253 |
141 @contextlib.contextmanager | 254 @contextlib.contextmanager |
142 def _file_lock(file: t.BinaryIO, operation: int) -> t.Iterator[None]: | 255 def _file_lock(file: t.BinaryIO, operation: int) -> t.Iterator[None]: |
143 assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.' | 256 assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.' |
150 | 263 |
151 def _size(file: t.BinaryIO) -> int: | 264 def _size(file: t.BinaryIO) -> int: |
152 return os.stat(file.fileno()).st_size | 265 return os.stat(file.fileno()).st_size |
153 | 266 |
154 | 267 |
155 def _catch_up(file: t.BinaryIO) -> t.Iterable[t.Dict[str, object]]: | 268 T = t.TypeVar('T') |
156 """Reads data and advances the file pointer to the end of the file.""" | 269 |
157 size = _size(file) | 270 |
158 pointer = file.tell() | 271 @attr.s(auto_attribs=True, frozen=True, slots=True) |
159 if size == pointer: | 272 class IndexEntry: |
160 return () | 273 entry_count: int |
161 output: t.List[t.Dict[str, object]] = [] | 274 entry_key: object |
162 try: | 275 byte: int |
163 items = bson.decode_file_iter( | 276 |
164 file, codec_options=common.BSON_OPTIONS) | 277 @classmethod |
165 for item in items: | 278 def from_dict(cls: t.Type[T], d: t.Dict[str, t.Any]) -> T: |
166 pointer = file.tell() | 279 return cls(**{f.name: d[f.name] for f in attr.fields(cls)}) |
167 output.append(item) | 280 |
168 except bson.InvalidBSON: | 281 def to_dict(self) -> t.Dict[str, object]: |
169 pass # We have reached the last valid document. Bail. | 282 return attr.asdict(self, recurse=False) |
170 # Seek back to immediately after the end of the last valid doc. | |
171 file.seek(pointer, os.SEEK_SET) | |
172 return output | |
173 | 283 |
174 | 284 |
175 def _open_or_create(path: str) -> t.BinaryIO: | 285 def _open_or_create(path: str) -> t.BinaryIO: |
176 while True: | 286 while True: |
177 try: | 287 try: |
180 pass | 290 pass |
181 try: | 291 try: |
182 return open(path, 'x+b') | 292 return open(path, 'x+b') |
183 except FileExistsError: | 293 except FileExistsError: |
184 pass | 294 pass |
295 | |
296 | |
297 def _dup_file(file: t.BinaryIO) -> t.BinaryIO: | |
298 duplicate = os.dup(file.fileno()) | |
299 return os.fdopen(duplicate, 'r+b') |
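
For orientation, here is a usage sketch of the resulting `Logger` API. The import path follows the repository layout, but the log filename and the `sample_time` field name are assumptions for illustration.

```python
# Illustrative usage of Logger as defined above; the filename and the
# 'sample_time' field name are assumed, not taken from the diff.
from weather_server.logfile import Logger

logger = Logger(
    '/tmp/temps.bson',
    sample_field='sample_time',
    cached_entries=1024,  # keep at most 1024 entries in RAM
    index_gap=256)        # add an index record every 256 entries

# Entries whose sample_time is not strictly greater than the last logged
# value are skipped, so duplicate submissions are harmless.
logger.write_rows([
    {'sample_time': 1594165890.0, 'temp_c': 21.5},
    {'sample_time': 1594165950.0, 'temp_c': 21.6},
])

print(logger.cached_data[-1])  # newest entry still held in memory
logger.close()                 # stops the writer thread
```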