Mercurial > personal > weather-server
comparison weather_server/logfile.py @ 24:20c8ec56e447
logfile: Pull logfile thread out of Logger.
This enables automatic garbage collection of Logger instances,
since a running thread no longer has a reference to a Logger's self.
It separates exclusive management of logfile state into the
_writer_thread function, which now opens the file and writes it until
it is told to stop by receiving the poison pill.
| author | Paul Fisher <paul@pfish.zone> |
|---|---|
| date | Sun, 10 Nov 2019 23:07:11 -0500 |
| parents | beb42c835c52 |
| children | 9bc3687e1e5e |
comparison
equal
deleted
inserted
replaced
| 23:88249e451566 | 24:20c8ec56e447 |
|---|---|
| 29 def __init__(self): | 29 def __init__(self): |
| 30 # The future that will be set with the log's contents. | 30 # The future that will be set with the log's contents. |
| 31 self.future = futures.Future() | 31 self.future = futures.Future() |
| 32 | 32 |
| 33 | 33 |
| 34 # probably handle file-writing with a queue that reports back its progress | 34 # Poison pill to tell a logger thread to stop. |
| 35 _POISON = object() | |
| 36 | |
| 35 | 37 |
| 36 class Logger: | 38 class Logger: |
| 37 """Logger which handles reading/writing a temperature log for one process. | 39 """Logger which handles reading/writing one temperature log file.""" |
| 38 """ | |
| 39 | |
| 40 instance_lock = threading.Lock() | |
| 41 instances: t.Dict[str, 'Logger'] = {} | |
| 42 | |
| 43 @classmethod | |
| 44 def create( | |
| 45 cls, | |
| 46 filename: str, | |
| 47 *, | |
| 48 sample_field: str, | |
| 49 ) -> 'Logger': | |
| 50 """Creates a single shared instance of a logger for the given file.""" | |
| 51 try: | |
| 52 instance = cls.instances[filename] | |
| 53 except KeyError: | |
| 54 with cls.instance_lock: | |
| 55 try: | |
| 56 instance = cls.instances[filename] | |
| 57 except KeyError: | |
| 58 cls.instances[filename] = Logger( | |
| 59 filename, | |
| 60 sample_field=sample_field) | |
| 61 instance = cls.instances[filename] | |
| 62 if instance._sample_field != sample_field: | |
| 63 raise ValueError( | |
| 64 'Existing instance has different sample field: ' | |
| 65 '{!r} != {!r}'.format(instance._sample_field, sample_field)) | |
| 66 return instance | |
| 67 | 40 |
| 68 def __init__(self, filename: str, *, sample_field: str): | 41 def __init__(self, filename: str, *, sample_field: str): |
| 69 """You should probably call .create() instead.""" | 42 """Creates a new Logger for the given file. |
| 43 | |
| 44 Args: | |
| 45 filename: The filename to open, or create if not already there. | |
| 46 sample_field: The field name to use as the strictly-increasing | |
| 47 value to ensure that no duplicate writes occur. | |
| 48 """ | |
| 70 self._sample_field = sample_field | 49 self._sample_field = sample_field |
| 71 self._file = _open_or_create(filename) | |
| 72 self._data: t.List[t.Dict[str, t.Any], ...] = [] | |
| 73 self._queue = queue.SimpleQueue() | 50 self._queue = queue.SimpleQueue() |
| 74 self._last_size = 0 | 51 # Create a Future that will be resolved once the file is opened |
| 75 self._lock_status: t.Optional[int] = None | 52 # (or fails to be opened). |
| 76 self._writer_thread = threading.Thread(target=self._writer) | 53 writer_started = futures.Future() |
| 54 self._writer_thread = threading.Thread( | |
| 55 name=f'{filename!r} writer thread', | |
| 56 target=lambda: _writer_thread( | |
| 57 filename, self._queue, sample_field, writer_started), | |
| 58 daemon=True) | |
| 77 self._writer_thread.start() | 59 self._writer_thread.start() |
| 60 writer_started.result() | |
| 78 | 61 |
| 79 @property | 62 @property |
| 80 def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]: | 63 def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]: |
| 81 req = _ReadRequest() | 64 req = _ReadRequest() |
| 82 self._queue.put(req) | 65 self._queue.put(req) |
| 85 def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]): | 68 def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]): |
| 86 req = _WriteRequest(entries) | 69 req = _WriteRequest(entries) |
| 87 self._queue.put(req) | 70 self._queue.put(req) |
| 88 return req.future.result() | 71 return req.future.result() |
| 89 | 72 |
| 90 _POISON = object() | 73 def __del__(self): |
| 74 self.close() | |
| 91 | 75 |
| 92 def close(self): | 76 def close(self): |
| 93 self._queue.put(self._POISON) | 77 self._queue.put(_POISON) |
| 94 self._writer_thread.join() | 78 self._writer_thread.join() |
| 95 | 79 |
| 96 def _writer(self) -> None: | 80 |
| 81 def _writer_thread( | |
| 82 filename: str, | |
| 83 q: queue.Queue, | |
| 84 sample_field: str, | |
| 85 started: futures.Future, | |
| 86 ) -> None: | |
| 87 if not started.set_running_or_notify_cancel(): | |
| 88 return | |
| 89 try: | |
| 90 file = _open_or_create(filename) | |
| 91 started.set_result(None) | |
| 92 except BaseException as e: | |
| 93 started.set_exception(e) | |
| 94 return | |
| 95 with file: | |
| 97 running = True | 96 running = True |
| 97 data: t.List[t.Dict[str, object]] = [] | |
| 98 while running: | 98 while running: |
| 99 item = self._queue.get() | 99 item = q.get() |
| 100 if item is self._POISON: | 100 if item is _POISON: |
| 101 # The poison pill tells us to stop. | 101 # The poison pill tells us to stop. |
| 102 running = False | 102 running = False |
| 103 elif isinstance(item, _ReadRequest): | 103 elif isinstance(item, _ReadRequest): |
| 104 if not item.future.set_running_or_notify_cancel(): | 104 if not item.future.set_running_or_notify_cancel(): |
| 105 continue | 105 continue |
| 106 try: | 106 try: |
| 107 with self._file_lock(fcntl.LOCK_SH): | 107 with _file_lock(file, fcntl.LOCK_SH): |
| 108 self._catch_up() | 108 data.extend(_catch_up(file)) |
| 109 except BaseException as x: | 109 except BaseException as x: |
| 110 item.future.set_exception(x) | 110 item.future.set_exception(x) |
| 111 else: | 111 else: |
| 112 item.future.set_result(tuple(self._data)) | 112 item.future.set_result(tuple(data)) |
| 113 elif isinstance(item, _WriteRequest): | 113 elif isinstance(item, _WriteRequest): |
| 114 if not item.future.set_running_or_notify_cancel(): | 114 if not item.future.set_running_or_notify_cancel(): |
| 115 continue | 115 continue |
| 116 try: | 116 try: |
| 117 with self._file_lock(fcntl.LOCK_EX): | 117 with _file_lock(file, fcntl.LOCK_EX): |
| 118 self._catch_up() | 118 data.extend(_catch_up(file)) |
| 119 # Since we're at the last good point, truncate after. | 119 # Since we're at the last good point, truncate after. |
| 120 self._file.truncate(self._file.tell()) | 120 file.truncate(file.tell()) |
| 121 if not self._data: | 121 if not data: |
| 122 last = None | 122 last = None |
| 123 else: | 123 else: |
| 124 last = self._data[-1][self._sample_field] | 124 last = data[-1][sample_field] |
| 125 for entry in item.entries: | 125 for entry in item.entries: |
| 126 entry_key = entry[self._sample_field] | 126 entry_key = entry[sample_field] |
| 127 if last is None or last < entry_key: | 127 if last is None or last < entry_key: |
| 128 self._file.write(common.bson_encode(entry)) | 128 file.write(common.bson_encode(entry)) |
| 129 self._data.append(entry) | 129 data.append(entry) |
| 130 last = entry_key | 130 last = entry_key |
| 131 self._file.flush() | 131 file.flush() |
| 132 self._last_size = self._file.tell() | |
| 133 except BaseException as x: | 132 except BaseException as x: |
| 134 item.future.set_exception(x) | 133 item.future.set_exception(x) |
| 135 else: | 134 else: |
| 136 item.future.set_result(None) | 135 item.future.set_result(None) |
| 137 else: | 136 else: |
| 138 raise AssertionError( | 137 raise AssertionError( |
| 139 'Unexpected item {!r} in the queue'.format(item)) | 138 'Unexpected item {!r} in the queue'.format(item)) |
| 140 self._file.close() | |
| 141 | 139 |
| 142 def _catch_up(self) -> None: | |
| 143 """Reads data and advances the file pointer to the end of the file.""" | |
| 144 assert self._lock_status is not None, 'The lock must be held.' | |
| 145 size = self._size() | |
| 146 if size == self._last_size: | |
| 147 return | |
| 148 last_good = self._file.tell() | |
| 149 try: | |
| 150 items = bson.decode_file_iter( | |
| 151 self._file, codec_options=common.BSON_OPTIONS) | |
| 152 for item in items: | |
| 153 last_good = self._file.tell() | |
| 154 self._data.append(item) | |
| 155 except bson.InvalidBSON: | |
| 156 pass # We have reached the last valid document. Bail. | |
| 157 # Seek back to immediately after the end of the last valid doc. | |
| 158 self._last_size = last_good | |
| 159 self._file.seek(last_good, os.SEEK_SET) | |
| 160 | 140 |
| 161 def fileno(self) -> int: | 141 @contextlib.contextmanager |
| 162 return self._file.fileno() | 142 def _file_lock(file: t.BinaryIO, operation: int) -> t.Iterator[None]: |
| 143 assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.' | |
| 144 fcntl.flock(file, operation) | |
| 145 try: | |
| 146 yield | |
| 147 finally: | |
| 148 fcntl.flock(file, fcntl.LOCK_UN) | |
| 163 | 149 |
| 164 def _size(self) -> int: | |
| 165 return os.stat(self.fileno()).st_size | |
| 166 | 150 |
| 167 @contextlib.contextmanager | 151 def _size(file: t.BinaryIO) -> int: |
| 168 def _file_lock(self, operation: int): | 152 return os.stat(file.fileno()).st_size |
| 169 assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.' | 153 |
| 170 fcntl.flock(self, operation) | 154 |
| 171 self._lock_status = operation | 155 def _catch_up(file: t.BinaryIO) -> t.Iterable[t.Dict[str, object]]: |
| 172 try: | 156 """Reads data and advances the file pointer to the end of the file.""" |
| 173 yield | 157 size = _size(file) |
| 174 finally: | 158 pointer = file.tell() |
| 175 self._lock_status = None | 159 if size == pointer: |
| 176 fcntl.flock(self, fcntl.LOCK_UN) | 160 return () |
| 161 output: t.List[t.Dict[str, object]] = [] | |
| 162 try: | |
| 163 items = bson.decode_file_iter( | |
| 164 file, codec_options=common.BSON_OPTIONS) | |
| 165 for item in items: | |
| 166 pointer = file.tell() | |
| 167 output.append(item) | |
| 168 except bson.InvalidBSON: | |
| 169 pass # We have reached the last valid document. Bail. | |
| 170 # Seek back to immediately after the end of the last valid doc. | |
| 171 file.seek(pointer, os.SEEK_SET) | |
| 172 return output | |
| 177 | 173 |
| 178 | 174 |
| 179 def _open_or_create(path: str) -> t.BinaryIO: | 175 def _open_or_create(path: str) -> t.BinaryIO: |
| 180 while True: | 176 while True: |
| 181 try: | 177 try: |
