comparison weather_server/logfile.py @ 31:9bc3687e1e5e

logfile: Add an index, and don't keep everything in RAM.

- Adds index BSON file, updated upon writing.
- Limits amount of data in RAM.
- Gracefully handles writes that don't update index.
author Paul Fisher <paul@pfish.zone>
date Tue, 07 Jul 2020 19:51:30 -0400
parents 20c8ec56e447
children b77c8e7d2742
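For context on the new scheme: the index file holds one small BSON document for every INDEX_GAP entries in the data log, recording how many entries precede it (entry_count), the sample key at that point (entry_key), and the byte offset of that entry in the data file (byte). A reader can then seek near the tail of the log instead of replaying it from the start. A minimal sketch of decoding such an index, assuming pymongo's bson package; read_index is a hypothetical standalone helper, not part of this change, and it omits the codec_options=common.BSON_OPTIONS the module passes:

    import bson

    def read_index(index_path: str) -> list:
        """Decodes every complete BSON document in an index file."""
        entries = []
        with open(index_path, 'rb') as fp:
            try:
                for doc in bson.decode_file_iter(fp):
                    entries.append(doc)
            except bson.InvalidBSON:
                pass  # Partial trailing write; stop, as _catch_up_index does.
        return entries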
30:c760ab7f93c2 31:9bc3687e1e5e
1 """The part which handles writing things out and reading things in from CSV. 1 """The part which handles writing things out and reading things in from CSV.
2 """ 2 """
3 3
4 import attr
5 import collections
4 import concurrent.futures as futures 6 import concurrent.futures as futures
5 import contextlib 7 import contextlib
6 import fcntl 8 import fcntl
7 import os 9 import os
8 import queue 10 import queue
12 import bson 14 import bson
13 15
14 from . import common 16 from . import common
15 17
16 18
19 # The number of entries to keep in memory without reading from disk.
20 CACHED_ENTRIES = 16384
21
22 # How many entries to read before creating a new index entry.
23 INDEX_GAP = 4096
24
25
17 class _WriteRequest: 26 class _WriteRequest:
18 27
19 def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]): 28 def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]):
20 """Creates a request to write the given data to the log.""" 29 """Creates a request to write the given data to the log."""
21 # The data to be written. We take ownership of all the dicts! 30 # The data to be written. We take ownership of all the dicts!
27 class _ReadRequest: 36 class _ReadRequest:
28 37
29 def __init__(self): 38 def __init__(self):
30 # The future that will be set with the log's contents. 39 # The future that will be set with the log's contents.
31 self.future = futures.Future() 40 self.future = futures.Future()
41 # TODO(pfish): Make it possible to read from a starting point.
32 42
33 43
34 # Poison pill to tell a logger thread to stop. 44 # Poison pill to tell a logger thread to stop.
35 _POISON = object() 45 _POISON = object()
36 46
37 47
38 class Logger: 48 class Logger:
39 """Logger which handles reading/writing one temperature log file.""" 49 """Logger which handles reading/writing one temperature log file."""
40 50
41 def __init__(self, filename: str, *, sample_field: str): 51 _file: t.BinaryIO
52 _index_file: t.BinaryIO
53
54 def __init__(
55 self,
56 filename: str,
57 *,
58 sample_field: str,
59 cached_entries: int = CACHED_ENTRIES,
60 index_gap: int = INDEX_GAP):
42 """Creates a new Logger for the given file. 61 """Creates a new Logger for the given file.
43 62
44 Args: 63 Args:
45 filename: The filename to open, or create if not already there. 64 filename: The filename to open, or create if not already there.
46 sample_field: The field name to use as the strictly-increasing 65 sample_field: The field name to use as the strictly-increasing
47 value to ensure that no duplicate writes occur. 66 value to ensure that no duplicate writes occur.
48 """ 67 """
68
69 self._filename = filename
70 self._cached_entries = cached_entries
71 self._index_gap = index_gap
72
73 self._queue = queue.SimpleQueue()
49 self._sample_field = sample_field 74 self._sample_field = sample_field
50 self._queue = queue.SimpleQueue() 75
51 # Create a Future that will be resolved once the file is opened 76 self._start_future = futures.Future()
52 # (or fails to be opened). 77 self._thread = threading.Thread(
53 writer_started = futures.Future()
54 self._writer_thread = threading.Thread(
55 name=f'{filename!r} writer thread', 78 name=f'{filename!r} writer thread',
56 target=lambda: _writer_thread( 79 target=self._writer_thread,
57 filename, self._queue, sample_field, writer_started),
58 daemon=True) 80 daemon=True)
59 self._writer_thread.start() 81 self._thread.start()
60 writer_started.result() 82
83 self._data = collections.deque(maxlen=self._cached_entries)
84 self._index: t.List[IndexEntry] = []
85 self._message_count = 0
86 self._start_future.result()
61 87
62 @property 88 @property
63 def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]: 89 def cached_data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
64 req = _ReadRequest() 90 req = _ReadRequest()
65 self._queue.put(req) 91 self._queue.put(req)
66 return req.future.result() 92 return req.future.result()
67 93
68 def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]): 94 def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]):
69 req = _WriteRequest(entries) 95 req = _WriteRequest(entries)
70 self._queue.put(req) 96 self._queue.put(req)
71 return req.future.result() 97 return req.future.result()
72 98
73 def __del__(self): 99 def _writer_thread(self) -> None:
100 try:
101 self._file = _open_or_create(self._filename)
102 self._index_file = _open_or_create(_index_filename(self._filename))
103 except BaseException as e:
104 self._start_future.set_exception(e)
105 return
106 self._start_future.set_result(None)
107
108 with self._file, self._index_file:
109 running = True
110 while running:
111 item = self._queue.get()
112 if item is _POISON:
113 # _POISON is the poison pill that makes us stop.
114 running = False
115 elif isinstance(item, _ReadRequest):
116 if not item.future.set_running_or_notify_cancel():
117 continue
118 with _file_lock(self._file, fcntl.LOCK_SH):
119 try:
120 self._catch_up()
121 except BaseException as x:
122 item.future.set_exception(x)
123 else:
124 item.future.set_result(tuple(self._data))
125 elif isinstance(item, _WriteRequest):
126 if not item.future.set_running_or_notify_cancel():
127 continue
128 try:
129 with _file_lock(self._file, fcntl.LOCK_EX):
130 self._catch_up()
131 current_ptr = self._file.tell()
132 self._file.truncate(current_ptr)
133 if not self._data:
134 prev_key = None
135 else:
136 prev_key = self._data[-1][self._sample_field]
137 for entry in item.entries:
138 entry_key = entry[self._sample_field]
139 if prev_key is None or prev_key < entry_key:
140 self._data.append(entry)
141 self._file.write(common.bson_encode(entry))
142 prev_key = entry_key
143 self._file.flush()
144 self._update_index()
145 except BaseException as x:
146 item.future.set_exception(x)
147 else:
148 item.future.set_result(None)
149 else:
150 raise AssertionError(
151 f'Unexpected item {item!r} in the queue')
152
153 def _catch_up(self) -> None:
154 # Preconditions:
155 # - At least a read lock is held.
156 # - File pointer is at the end of the last-read entry.
157 self._catch_up_index()
158 if self._index:
159 # Since we have an index, use it to find a starting place.
160 last_idx = self._index[-1]
161 # Figure out, based on the number of entries we want to keep
162 # in memory, where we should start reading from.
163 read_start_count = max(
164 0, last_idx.entry_count - self._cached_entries)
165 if self._message_count < read_start_count:
166 # We haven't read up to that point yet; skip ahead via the index.
167 for idx_entry in self._index:
168 if read_start_count <= idx_entry.entry_count:
169 break
170 starting_point = idx_entry
171 self._data.clear()
172 self._file.seek(starting_point.byte, os.SEEK_SET)
173 self._message_count = starting_point.entry_count
174 pointer = self._file.tell()
175 try:
176 items = bson.decode_file_iter(
177 self._file, codec_options=common.BSON_OPTIONS)
178 for item in items:
179 pointer = self._file.tell()
180 self._data.append(item)
181 self._message_count += 1
182 except bson.InvalidBSON:
183 pass # We have reached the last valid document. Bail.
184 # Seek back to immediately after the end of the last valid doc.
185 self._file.seek(pointer, os.SEEK_SET)
186
187 def _update_index(self) -> None:
188 # Preconditions:
189 # - File pointer is at the end of the last-written message.
190 # - Index pointer is at the last-read index value.
191 # - Exclusive lock is held.
192 with _dup_file(self._file) as update_fp:
193 if self._index:
194 last_idx = self._index[-1]
195 update_fp.seek(last_idx.byte)
196 current_count = last_idx.entry_count
197 decoder = bson.decode_file_iter(
198 update_fp, codec_options=common.BSON_OPTIONS)
199 try:
200 # Skip the entry the last index entry points at.
201 next(decoder)
202 current_count += 1
203 current_byte = update_fp.tell()
204 except StopIteration:
205 # If there are no more entries, don't update the index.
206 return
207 else:
208 current_byte = 0
209 update_fp.seek(current_byte, os.SEEK_SET)
210 current_count = 0
211 entries = bson.decode_file_iter(
212 update_fp, codec_options=common.BSON_OPTIONS)
213 for entry in entries:
214 if current_count % self._index_gap == 0:
215 idx_entry = IndexEntry(
216 entry_count=current_count,
217 entry_key=entry[self._sample_field],
218 byte=current_byte)
219 self._index.append(idx_entry)
220 self._index_file.truncate()
221 self._index_file.write(
222 common.bson_encode(idx_entry.to_dict()))
223 current_count += 1
224 current_byte = update_fp.tell()
225 self._index_file.flush()
226
227 def _catch_up_index(self) -> None:
228 # Preconditions:
229 # - At least a read lock is held.
230 pointer = self._index_file.tell()
231 try:
232 index_entries = bson.decode_file_iter(
233 self._index_file, codec_options=common.BSON_OPTIONS)
234 for entry_dict in index_entries:
235 entry = IndexEntry.from_dict(entry_dict)
236 self._index.append(entry)
237 pointer = self._index_file.tell()
238 except bson.InvalidBSON:
239 pass # We have reached the last valid BSON document. Bail.
240 self._index_file.seek(pointer, os.SEEK_SET)
241
242 def __del__(self) -> None:
74 self.close() 243 self.close()
75 244
76 def close(self): 245 def close(self) -> None:
77 self._queue.put(_POISON) 246 self._queue.put(_POISON)
78 self._writer_thread.join() 247 self._thread.join()
79 248
80 249
81 def _writer_thread( 250 def _index_filename(filename: str) -> str:
82 filename: str, 251 return filename + '.index.bson'
83 q: queue.Queue,
84 sample_field: str,
85 started: futures.Future,
86 ) -> None:
87 if not started.set_running_or_notify_cancel():
88 return
89 try:
90 file = _open_or_create(filename)
91 started.set_result(None)
92 except BaseException as e:
93 started.set_exception(e)
94 return
95 with file:
96 running = True
97 data: t.List[t.Dict[str, object]] = []
98 while running:
99 item = q.get()
100 if item is _POISON:
101 # _POISON is the poison pill that makes us stop.
102 running = False
103 elif isinstance(item, _ReadRequest):
104 if not item.future.set_running_or_notify_cancel():
105 continue
106 try:
107 with _file_lock(file, fcntl.LOCK_SH):
108 data.extend(_catch_up(file))
109 except BaseException as x:
110 item.future.set_exception(x)
111 else:
112 item.future.set_result(tuple(data))
113 elif isinstance(item, _WriteRequest):
114 if not item.future.set_running_or_notify_cancel():
115 continue
116 try:
117 with _file_lock(file, fcntl.LOCK_EX):
118 data.extend(_catch_up(file))
119 # Since we're at the last good point, truncate after.
120 file.truncate(file.tell())
121 if not data:
122 last = None
123 else:
124 last = data[-1][sample_field]
125 for entry in item.entries:
126 entry_key = entry[sample_field]
127 if last is None or last < entry_key:
128 file.write(common.bson_encode(entry))
129 data.append(entry)
130 last = entry_key
131 file.flush()
132 except BaseException as x:
133 item.future.set_exception(x)
134 else:
135 item.future.set_result(None)
136 else:
137 raise AssertionError(
138 'Unexpected item {!r} in the queue'.format(item))
139 252
140 253
141 @contextlib.contextmanager 254 @contextlib.contextmanager
142 def _file_lock(file: t.BinaryIO, operation: int) -> t.Iterator[None]: 255 def _file_lock(file: t.BinaryIO, operation: int) -> t.Iterator[None]:
143 assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.' 256 assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.'
150 263
151 def _size(file: t.BinaryIO) -> int: 264 def _size(file: t.BinaryIO) -> int:
152 return os.stat(file.fileno()).st_size 265 return os.stat(file.fileno()).st_size
153 266
154 267
155 def _catch_up(file: t.BinaryIO) -> t.Iterable[t.Dict[str, object]]: 268 T = t.TypeVar('T')
156 """Reads data and advances the file pointer to the end of the file.""" 269
157 size = _size(file) 270
158 pointer = file.tell() 271 @attr.s(auto_attribs=True, frozen=True, slots=True)
159 if size == pointer: 272 class IndexEntry:
160 return () 273 entry_count: int
161 output: t.List[t.Dict[str, object]] = [] 274 entry_key: object
162 try: 275 byte: int
163 items = bson.decode_file_iter( 276
164 file, codec_options=common.BSON_OPTIONS) 277 @classmethod
165 for item in items: 278 def from_dict(cls: t.Type[T], d: t.Dict[str, t.Any]) -> T:
166 pointer = file.tell() 279 return cls(**{f.name: d[f.name] for f in attr.fields(cls)})
167 output.append(item) 280
168 except bson.InvalidBSON: 281 def to_dict(self) -> t.Dict[str, object]:
169 pass # We have reached the last valid document. Bail. 282 return attr.asdict(self, recurse=False)
170 # Seek back to immediately after the end of the last valid doc.
171 file.seek(pointer, os.SEEK_SET)
172 return output
173 283
174 284
175 def _open_or_create(path: str) -> t.BinaryIO: 285 def _open_or_create(path: str) -> t.BinaryIO:
176 while True: 286 while True:
177 try: 287 try:
180 pass 290 pass
181 try: 291 try:
182 return open(path, 'x+b') 292 return open(path, 'x+b')
183 except FileExistsError: 293 except FileExistsError:
184 pass 294 pass
295
296
297 def _dup_file(file: t.BinaryIO) -> t.BinaryIO:
298 duplicate = os.dup(file.fileno())
299 return os.fdopen(duplicate, 'r+b')
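Usage is unchanged apart from the data property becoming cached_data; a brief sketch of the public surface, with illustrative file and field names (sample_time and temp_c are assumptions, not from this change):

    log = Logger('/var/lib/weather/log.bson', sample_field='sample_time')
    try:
        # Entries whose sample_time is at or below the last written key
        # are silently dropped, so replayed writes are safe.
        log.write_rows([
            {'sample_time': 1594165890, 'temp_c': 21.5},
            {'sample_time': 1594165950, 'temp_c': 21.7},
        ])
        recent = log.cached_data  # At most CACHED_ENTRIES newest entries.
    finally:
        log.close()  # Enqueues _POISON and joins the writer thread.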