weather-server: comparison of weather_server/logfile.py @ 24:20c8ec56e447
logfile: Pull logfile thread out of Logger.
This enables automatic garbage collection of Logger instances,
since a running thread no longer holds a reference to the Logger
itself. It moves exclusive management of logfile state into the
module-level _writer_thread function, which now opens the file and
writes to it until it is told to stop by receiving the poison pill.
| author   | Paul Fisher <paul@pfish.zone>   |
|----------|---------------------------------|
| date     | Sun, 10 Nov 2019 23:07:11 -0500 |
| parents  | beb42c835c52                    |
| children | 9bc3687e1e5e                    |
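The commit message above describes a common concurrency pattern: hand the file to a dedicated writer thread fed through a queue, stop it with a poison pill, and report the success or failure of opening the file back through a Future. Below is a minimal, self-contained sketch of that pattern; it assumes nothing from this repository, and the `Writer` class and its method names are illustrative, not the project's API:

```python
import queue
import threading
from concurrent import futures

_POISON = object()  # Sentinel object that tells the worker thread to stop.


def _worker_thread(path: str, q: queue.SimpleQueue,
                   started: futures.Future) -> None:
    """Owns the file exclusively; holds no reference to the owner object."""
    try:
        file = open(path, 'ab')
        started.set_result(None)   # Tell the constructor we are ready.
    except BaseException as e:
        started.set_exception(e)   # Propagate open() failures to the caller.
        return
    with file:
        while True:
            item = q.get()
            if item is _POISON:    # Poison pill received: exit the loop.
                return
            file.write(item)
            file.flush()


class Writer:
    """Hands all file state to a daemon thread, so instances can be GC'd."""

    def __init__(self, path: str):
        q = queue.SimpleQueue()
        self._queue = q
        started = futures.Future()
        # The target closes over the locals `path`, `q`, and `started`,
        # never over `self`, so the running thread does not keep this
        # Writer alive.
        self._thread = threading.Thread(
            target=lambda: _worker_thread(path, q, started),
            daemon=True)
        self._thread.start()
        started.result()  # Block until the file is open; re-raise any error.

    def write(self, data: bytes) -> None:
        self._queue.put(data)

    def close(self) -> None:
        self._queue.put(_POISON)
        self._thread.join()

    def __del__(self):
        self.close()
```

In this sketch the thread's target closes over plain locals rather than `self`, so the running thread never keeps its owner alive; that is what makes `__del__`-driven shutdown, and garbage collection of idle instances, possible.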
--- weather_server/logfile.py  23:88249e451566
+++ weather_server/logfile.py  24:20c8ec56e447
@@ -29,54 +29,37 @@
     def __init__(self):
         # The future that will be set with the log's contents.
         self.future = futures.Future()
 
 
-# probably handle file-writing with a queue that reports back its progress
+# Poison pill to tell a logger thread to stop.
+_POISON = object()
+
 
 class Logger:
-    """Logger which handles reading/writing a temperature log for one process.
-    """
-
-    instance_lock = threading.Lock()
-    instances: t.Dict[str, 'Logger'] = {}
-
-    @classmethod
-    def create(
-            cls,
-            filename: str,
-            *,
-            sample_field: str,
-    ) -> 'Logger':
-        """Creates a single shared instance of a logger for the given file."""
-        try:
-            instance = cls.instances[filename]
-        except KeyError:
-            with cls.instance_lock:
-                try:
-                    instance = cls.instances[filename]
-                except KeyError:
-                    cls.instances[filename] = Logger(
-                        filename,
-                        sample_field=sample_field)
-                    instance = cls.instances[filename]
-        if instance._sample_field != sample_field:
-            raise ValueError(
-                'Existing instance has different sample field: '
-                '{!r} != {!r}'.format(instance._sample_field, sample_field))
-        return instance
+    """Logger which handles reading/writing one temperature log file."""
 
     def __init__(self, filename: str, *, sample_field: str):
-        """You should probably call .create() instead."""
+        """Creates a new Logger for the given file.
+
+        Args:
+            filename: The filename to open, or create if not already there.
+            sample_field: The field name to use as the strictly-increasing
+                value to ensure that no duplicate writes occur.
+        """
         self._sample_field = sample_field
-        self._file = _open_or_create(filename)
-        self._data: t.List[t.Dict[str, t.Any], ...] = []
         self._queue = queue.SimpleQueue()
-        self._last_size = 0
-        self._lock_status: t.Optional[int] = None
-        self._writer_thread = threading.Thread(target=self._writer)
+        # Create a Future that will be resolved once the file is opened
+        # (or fails to be opened).
+        writer_started = futures.Future()
+        self._writer_thread = threading.Thread(
+            name=f'{filename!r} writer thread',
+            target=lambda: _writer_thread(
+                filename, self._queue, sample_field, writer_started),
+            daemon=True)
         self._writer_thread.start()
+        writer_started.result()
 
     @property
     def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
         req = _ReadRequest()
         self._queue.put(req)
@@ -85,97 +68,110 @@
     def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]):
         req = _WriteRequest(entries)
         self._queue.put(req)
         return req.future.result()
 
-    _POISON = object()
+    def __del__(self):
+        self.close()
 
     def close(self):
-        self._queue.put(self._POISON)
+        self._queue.put(_POISON)
         self._writer_thread.join()
 
-    def _writer(self) -> None:
+
+def _writer_thread(
+        filename: str,
+        q: queue.Queue,
+        sample_field: str,
+        started: futures.Future,
+) -> None:
+    if not started.set_running_or_notify_cancel():
+        return
+    try:
+        file = _open_or_create(filename)
+        started.set_result(None)
+    except BaseException as e:
+        started.set_exception(e)
+        return
+    with file:
         running = True
+        data: t.List[t.Dict[str, object]] = []
         while running:
-            item = self._queue.get()
-            if item is self._POISON:
+            item = q.get()
+            if item is _POISON:
                 # _POISON is the poison pill that makes us stop.
                 running = False
            elif isinstance(item, _ReadRequest):
                 if not item.future.set_running_or_notify_cancel():
                     continue
                 try:
-                    with self._file_lock(fcntl.LOCK_SH):
-                        self._catch_up()
+                    with _file_lock(file, fcntl.LOCK_SH):
+                        data.extend(_catch_up(file))
                 except BaseException as x:
                     item.future.set_exception(x)
                 else:
-                    item.future.set_result(tuple(self._data))
+                    item.future.set_result(tuple(data))
             elif isinstance(item, _WriteRequest):
                 if not item.future.set_running_or_notify_cancel():
                     continue
                 try:
-                    with self._file_lock(fcntl.LOCK_EX):
-                        self._catch_up()
+                    with _file_lock(file, fcntl.LOCK_EX):
+                        data.extend(_catch_up(file))
                         # Since we're at the last good point, truncate after.
-                        self._file.truncate(self._file.tell())
-                        if not self._data:
+                        file.truncate(file.tell())
+                        if not data:
                             last = None
                         else:
-                            last = self._data[-1][self._sample_field]
+                            last = data[-1][sample_field]
                         for entry in item.entries:
-                            entry_key = entry[self._sample_field]
+                            entry_key = entry[sample_field]
                             if last is None or last < entry_key:
-                                self._file.write(common.bson_encode(entry))
-                                self._data.append(entry)
+                                file.write(common.bson_encode(entry))
+                                data.append(entry)
                                 last = entry_key
-                        self._file.flush()
-                        self._last_size = self._file.tell()
+                        file.flush()
                 except BaseException as x:
                     item.future.set_exception(x)
                 else:
                     item.future.set_result(None)
             else:
                 raise AssertionError(
                     'Unexpected item {!r} in the queue'.format(item))
-        self._file.close()
 
-    def _catch_up(self) -> None:
-        """Reads data and advances the file pointer to the end of the file."""
-        assert self._lock_status is not None, 'The lock must be held.'
-        size = self._size()
-        if size == self._last_size:
-            return
-        last_good = self._file.tell()
-        try:
-            items = bson.decode_file_iter(
-                self._file, codec_options=common.BSON_OPTIONS)
-            for item in items:
-                last_good = self._file.tell()
-                self._data.append(item)
-        except bson.InvalidBSON:
-            pass  # We have reached the last valid document. Bail.
-        # Seek back to immediately after the end of the last valid doc.
-        self._last_size = last_good
-        self._file.seek(last_good, os.SEEK_SET)
 
-    def fileno(self) -> int:
-        return self._file.fileno()
+@contextlib.contextmanager
+def _file_lock(file: t.BinaryIO, operation: int) -> t.Iterator[None]:
+    assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.'
+    fcntl.flock(file, operation)
+    try:
+        yield
+    finally:
+        fcntl.flock(file, fcntl.LOCK_UN)
 
-    def _size(self) -> int:
-        return os.stat(self.fileno()).st_size
 
-    @contextlib.contextmanager
-    def _file_lock(self, operation: int):
-        assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.'
-        fcntl.flock(self, operation)
-        self._lock_status = operation
-        try:
-            yield
-        finally:
-            self._lock_status = None
-            fcntl.flock(self, fcntl.LOCK_UN)
+def _size(file: t.BinaryIO) -> int:
+    return os.stat(file.fileno()).st_size
+
+
+def _catch_up(file: t.BinaryIO) -> t.Iterable[t.Dict[str, object]]:
+    """Reads data and advances the file pointer to the end of the file."""
+    size = _size(file)
+    pointer = file.tell()
+    if size == pointer:
+        return ()
+    output: t.List[t.Dict[str, object]] = []
+    try:
+        items = bson.decode_file_iter(
+            file, codec_options=common.BSON_OPTIONS)
+        for item in items:
+            pointer = file.tell()
+            output.append(item)
+    except bson.InvalidBSON:
+        pass  # We have reached the last valid document. Bail.
+    # Seek back to immediately after the end of the last valid doc.
+    file.seek(pointer, os.SEEK_SET)
+    return output
 
 
 def _open_or_create(path: str) -> t.BinaryIO:
     while True:
         try: