comparison weather_server/logfile.py @ 24:20c8ec56e447

logfile: Pull the logfile thread out of Logger. This enables automatic garbage collection of Logger instances, since a running thread no longer holds a reference to a Logger's self. It moves exclusive management of logfile state into the _writer_thread function, which now opens the file and writes to it until it receives the poison pill telling it to stop.
author Paul Fisher <paul@pfish.zone>
date Sun, 10 Nov 2019 23:07:11 -0500
parents beb42c835c52
children 9bc3687e1e5e
23:88249e451566 24:20c8ec56e447
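
To make the poison-pill pattern described above concrete, here is a minimal standalone sketch (hypothetical names, standard library only; not the module's actual code):

import queue
import threading

_POISON = object()  # Unique sentinel; identity comparison makes it unforgeable.

def _worker(q: queue.SimpleQueue) -> None:
    while True:
        item = q.get()
        if item is _POISON:
            return  # Told to stop; the thread function simply exits.
        print('processing', item)

q = queue.SimpleQueue()
t = threading.Thread(target=_worker, args=(q,), daemon=True)
t.start()
q.put('some work')
q.put(_POISON)  # Ask the worker to stop...
t.join()        # ...and wait until it has.

Because the thread's target is a free function that holds only the queue, nothing keeps a back-reference to a wrapping object alive, which is what allows Logger instances to be garbage-collected.
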
29 def __init__(self): 29 def __init__(self):
30 # The future that will be set with the log's contents. 30 # The future that will be set with the log's contents.
31 self.future = futures.Future() 31 self.future = futures.Future()
32 32
33 33
34 # probably handle file-writing with a queue that reports back its progress 34 # Poison pill to tell a logger thread to stop.
35 _POISON = object()
36
35 37
36 class Logger: 38 class Logger:
37 """Logger which handles reading/writing a temperature log for one process. 39 """Logger which handles reading/writing one temperature log file."""
38 """
39
40 instance_lock = threading.Lock()
41 instances: t.Dict[str, 'Logger'] = {}
42
43 @classmethod
44 def create(
45 cls,
46 filename: str,
47 *,
48 sample_field: str,
49 ) -> 'Logger':
50 """Creates a single shared instance of a logger for the given file."""
51 try:
52 instance = cls.instances[filename]
53 except KeyError:
54 with cls.instance_lock:
55 try:
56 instance = cls.instances[filename]
57 except KeyError:
58 cls.instances[filename] = Logger(
59 filename,
60 sample_field=sample_field)
61 instance = cls.instances[filename]
62 if instance._sample_field != sample_field:
63 raise ValueError(
64 'Existing instance has different sample field: '
65 '{!r} != {!r}'.format(instance._sample_field, sample_field))
66 return instance
67 40
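
The removed create() factory used the classic check, lock, re-check sequence over its shared-instance map. The same pattern in isolation (a hypothetical cache, not the module's code):

import threading

_lock = threading.Lock()
_instances: dict = {}

def get_instance(key: str) -> object:
    try:
        return _instances[key]  # Fast path: no lock when the entry exists.
    except KeyError:
        with _lock:
            try:
                return _instances[key]  # Re-check: another thread may have won the race.
            except KeyError:
                instance = object()  # Stand-in for the real constructor.
                _instances[key] = instance
                return instance
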
68 def __init__(self, filename: str, *, sample_field: str): 41 def __init__(self, filename: str, *, sample_field: str):
69 """You should probably call .create() instead.""" 42 """Creates a new Logger for the given file.
43
44 Args:
45 filename: The filename to open, or create if not already there.
46 sample_field: The name of the field whose strictly-increasing
47 values guarantee that no duplicate writes occur.
48 """
70 self._sample_field = sample_field 49 self._sample_field = sample_field
71 self._file = _open_or_create(filename)
72 self._data: t.List[t.Dict[str, t.Any], ...] = []
73 self._queue = queue.SimpleQueue() 50 self._queue = queue.SimpleQueue()
74 self._last_size = 0 51 # Create a Future that will be resolved once the file is opened
75 self._lock_status: t.Optional[int] = None 52 # (or fails to be opened).
76 self._writer_thread = threading.Thread(target=self._writer) 53 writer_started = futures.Future()
54 self._writer_thread = threading.Thread(
55 name=f'{filename!r} writer thread',
56 target=lambda: _writer_thread(
57 filename, self._queue, sample_field, writer_started),
58 daemon=True)
77 self._writer_thread.start() 59 self._writer_thread.start()
60 writer_started.result()
78 61
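
The startup handshake added above has the constructor block on a Future that the thread resolves once the file is open, so open() failures surface in the caller. A standalone sketch of the idea (hypothetical names, standard library only):

import threading
from concurrent import futures

def start_writer(path: str) -> threading.Thread:
    started: futures.Future = futures.Future()

    def run() -> None:
        if not started.set_running_or_notify_cancel():
            return
        try:
            f = open(path, 'rb')  # Stand-in for _open_or_create.
        except BaseException as e:
            started.set_exception(e)  # The constructor re-raises this.
            return
        started.set_result(None)  # Signal: the file is open for business.
        with f:
            pass  # The real thread services its queue here.

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    started.result()  # Blocks until the open succeeds or re-raises its error.
    return thread
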
79 @property 62 @property
80 def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]: 63 def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
81 req = _ReadRequest() 64 req = _ReadRequest()
82 self._queue.put(req) 65 self._queue.put(req)
85 def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]): 68 def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]):
86 req = _WriteRequest(entries) 69 req = _WriteRequest(entries)
87 self._queue.put(req) 70 self._queue.put(req)
88 return req.future.result() 71 return req.future.result()
89 72
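
Both data and write_rows follow the same cross-thread call shape: enqueue a request that carries its own Future, then block on that Future. A minimal standalone version (hypothetical request type, not the module's code):

import queue
import threading
from concurrent import futures

class _Request:
    def __init__(self, payload: object) -> None:
        self.payload = payload
        self.future: futures.Future = futures.Future()

def _serve(q: queue.SimpleQueue) -> None:
    while True:
        req = q.get()
        if not req.future.set_running_or_notify_cancel():
            continue  # Caller cancelled before we got to it.
        try:
            result = ('handled', req.payload)  # Stand-in for real work.
        except BaseException as e:
            req.future.set_exception(e)
        else:
            req.future.set_result(result)

q: queue.SimpleQueue = queue.SimpleQueue()
threading.Thread(target=_serve, args=(q,), daemon=True).start()
req = _Request('read')
q.put(req)
print(req.future.result())  # Blocks until the serving thread responds.
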
90 _POISON = object() 73 def __del__(self):
74 self.close()
91 75
92 def close(self): 76 def close(self):
93 self._queue.put(self._POISON) 77 self._queue.put(_POISON)
94 self._writer_thread.join() 78 self._writer_thread.join()
95 79
96 def _writer(self) -> None: 80
81 def _writer_thread(
82 filename: str,
83 q: queue.Queue,
84 sample_field: str,
85 started: futures.Future,
86 ) -> None:
87 if not started.set_running_or_notify_cancel():
88 return
89 try:
90 file = _open_or_create(filename)
91 started.set_result(None)
92 except BaseException as e:
93 started.set_exception(e)
94 return
95 with file:
97 running = True 96 running = True
97 data: t.List[t.Dict[str, object]] = []
98 while running: 98 while running:
99 item = self._queue.get() 99 item = q.get()
100 if item is self._POISON: 100 if item is _POISON:
101 # The poison pill tells us to stop. 101 # The poison pill tells us to stop.
102 running = False 102 running = False
103 elif isinstance(item, _ReadRequest): 103 elif isinstance(item, _ReadRequest):
104 if not item.future.set_running_or_notify_cancel(): 104 if not item.future.set_running_or_notify_cancel():
105 continue 105 continue
106 try: 106 try:
107 with self._file_lock(fcntl.LOCK_SH): 107 with _file_lock(file, fcntl.LOCK_SH):
108 self._catch_up() 108 data.extend(_catch_up(file))
109 except BaseException as x: 109 except BaseException as x:
110 item.future.set_exception(x) 110 item.future.set_exception(x)
111 else: 111 else:
112 item.future.set_result(tuple(self._data)) 112 item.future.set_result(tuple(data))
113 elif isinstance(item, _WriteRequest): 113 elif isinstance(item, _WriteRequest):
114 if not item.future.set_running_or_notify_cancel(): 114 if not item.future.set_running_or_notify_cancel():
115 continue 115 continue
116 try: 116 try:
117 with self._file_lock(fcntl.LOCK_EX): 117 with _file_lock(file, fcntl.LOCK_EX):
118 self._catch_up() 118 data.extend(_catch_up(file))
119 # Since we're at the last good point, truncate after. 119 # Since we're at the last good point, truncate after.
120 self._file.truncate(self._file.tell()) 120 file.truncate(file.tell())
121 if not self._data: 121 if not data:
122 last = None 122 last = None
123 else: 123 else:
124 last = self._data[-1][self._sample_field] 124 last = data[-1][sample_field]
125 for entry in item.entries: 125 for entry in item.entries:
126 entry_key = entry[self._sample_field] 126 entry_key = entry[sample_field]
127 if last is None or last < entry_key: 127 if last is None or last < entry_key:
128 self._file.write(common.bson_encode(entry)) 128 file.write(common.bson_encode(entry))
129 self._data.append(entry) 129 data.append(entry)
130 last = entry_key 130 last = entry_key
131 self._file.flush() 131 file.flush()
132 self._last_size = self._file.tell()
133 except BaseException as x: 132 except BaseException as x:
134 item.future.set_exception(x) 133 item.future.set_exception(x)
135 else: 134 else:
136 item.future.set_result(None) 135 item.future.set_result(None)
137 else: 136 else:
138 raise AssertionError( 137 raise AssertionError(
139 'Unexpected item {!r} in the queue'.format(item)) 138 'Unexpected item {!r} in the queue'.format(item))
140 self._file.close()
141 139
142 def _catch_up(self) -> None:
143 """Reads data and advances the file pointer to the end of the file."""
144 assert self._lock_status is not None, 'The lock must be held.'
145 size = self._size()
146 if size == self._last_size:
147 return
148 last_good = self._file.tell()
149 try:
150 items = bson.decode_file_iter(
151 self._file, codec_options=common.BSON_OPTIONS)
152 for item in items:
153 last_good = self._file.tell()
154 self._data.append(item)
155 except bson.InvalidBSON:
156 pass # We have reached the last valid document. Bail.
157 # Seek back to immediately after the end of the last valid doc.
158 self._last_size = last_good
159 self._file.seek(last_good, os.SEEK_SET)
160 140
161 def fileno(self) -> int: 141 @contextlib.contextmanager
162 return self._file.fileno() 142 def _file_lock(file: t.BinaryIO, operation: int) -> t.Iterator[None]:
143 assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.'
144 fcntl.flock(file, operation)
145 try:
146 yield
147 finally:
148 fcntl.flock(file, fcntl.LOCK_UN)
163 149
164 def _size(self) -> int:
165 return os.stat(self.fileno()).st_size
166 150
167 @contextlib.contextmanager 151 def _size(file: t.BinaryIO) -> int:
168 def _file_lock(self, operation: int): 152 return os.stat(file.fileno()).st_size
169 assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.' 153
170 fcntl.flock(self, operation) 154
171 self._lock_status = operation 155 def _catch_up(file: t.BinaryIO) -> t.Iterable[t.Dict[str, object]]:
172 try: 156 """Reads data and advances the file pointer to the end of the file."""
173 yield 157 size = _size(file)
174 finally: 158 pointer = file.tell()
175 self._lock_status = None 159 if size == pointer:
176 fcntl.flock(self, fcntl.LOCK_UN) 160 return ()
161 output: t.List[t.Dict[str, object]] = []
162 try:
163 items = bson.decode_file_iter(
164 file, codec_options=common.BSON_OPTIONS)
165 for item in items:
166 pointer = file.tell()
167 output.append(item)
168 except bson.InvalidBSON:
169 pass # We have reached the last valid document. Bail.
170 # Seek back to immediately after the end of the last valid doc.
171 file.seek(pointer, os.SEEK_SET)
172 return output
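
The recovery idea in _catch_up (decode documents until the first invalid one, then seek back to the end of the last good document) also works as a standalone helper. This sketch mirrors the diff but assumes pymongo's bson package with its default codec options rather than common.BSON_OPTIONS:

import os
import typing as t

import bson  # pymongo's bson package.

def read_valid_docs(file: t.BinaryIO) -> t.List[t.Dict[str, object]]:
    docs: t.List[t.Dict[str, object]] = []
    last_good = file.tell()
    try:
        for doc in bson.decode_file_iter(file):
            last_good = file.tell()  # End offset of each good document.
            docs.append(doc)
    except bson.InvalidBSON:
        pass  # A torn or partial write; everything before it is intact.
    file.seek(last_good, os.SEEK_SET)  # Park just past the last good doc.
    return docs

A later write can then truncate at file.tell() and append, exactly as the write branch of _writer_thread does.
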
177 173
178 174
179 def _open_or_create(path: str) -> t.BinaryIO: 175 def _open_or_create(path: str) -> t.BinaryIO:
180 while True: 176 while True:
181 try: 177 try: