Mercurial > personal > weatherlog
comparison weatherlog/logger.py @ 14:c01f9929ae38
Make logger and HTTP writer more general and resilient.
This makes the logger and HTTP writer more general, by removing
any dependency upon the exact data type they are writing.
They can now handle any type of BSON-serializable dict,
and they track what has been sent by recording the last *byte*,
not the last timestamp.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Tue, 15 Oct 2019 22:40:24 -0400 |
parents | 8a350ec1aa78 |
children | 770215590d80 |
comparison
equal
deleted
inserted
replaced
13:4c81182eaa6b | 14:c01f9929ae38 |
---|---|
8 import pathlib | 8 import pathlib |
9 import threading | 9 import threading |
10 import time | 10 import time |
11 import typing as t | 11 import typing as t |
12 | 12 |
13 import attr | |
13 import bson | 14 import bson |
14 import pytz | 15 import pytz |
15 | 16 |
16 from . import types | |
17 | |
18 BSON_FILENAME = "temps.bson" | 17 BSON_FILENAME = "temps.bson" |
19 LAST_SENT_FILENAME = "last-sent" | 18 OLD_LAST_TS = "last-sent" |
19 START_BYTE = "start-byte" | |
20 | 20 |
21 | 21 |
class RemoteWriter(abc.ABC):
    """Interface for sinks that accept batches of BSON-encoded readings."""

    # Upper bound on how many queued readings are sent per write() call.
    BATCH_SIZE = 1000

    @abc.abstractmethod
    def write(self, readings: t.Iterable[t.Dict[str, object]]) -> None:
        """Delivers *readings* to the remote end; raises on failure."""
        raise NotImplementedError()
29 | 29 |
30 | 30 |
class RemoteWriteError(Exception):
    """Error to be raised by RemoteWriter.write."""
34 | |
35 | |
@attr.s(auto_attribs=True, frozen=True, slots=True)
class ReadingPosition:
    """A BSON-encoded reading paired with its position in the log file."""

    # The reading's encoded bytes, exactly as written to disk.
    data: bytes

    # File offset of the byte immediately after this reading.
    end: int
34 | 43 |
35 | 44 |
36 class BufferedLogger: | 45 class BufferedLogger: |
37 """A resilient logger which logs to a local file and a RemoteWriter.""" | 46 """A resilient logger which logs to a local file and a RemoteWriter.""" |
38 | 47 |
40 | 49 |
def __init__(self, directory: str, writer: RemoteWriter):
    """Opens the log in *directory* and loads any not-yet-sent readings.

    Raises OSError (via _open_exclusive) if another logger instance
    already holds the log file.
    """
    self._writer = writer
    base = pathlib.Path(directory)
    self._path = base
    self._old_last_sent = base / OLD_LAST_TS
    self._start_byte = base / START_BYTE
    self._file = _open_exclusive(base / BSON_FILENAME)
    pending = _read_unsent_and_upgrade(
        self._file, self._old_last_sent, self._start_byte)
    self._send_queue = collections.deque(pending)
    self._running = False
    self._remote_thread: t.Optional[threading.Thread] = None
52 | 61 |
53 def start(self) -> None: | 62 def start(self) -> None: |
62 self._file.close() | 71 self._file.close() |
63 self._running = False | 72 self._running = False |
64 if self._remote_thread: | 73 if self._remote_thread: |
65 self._remote_thread.join() | 74 self._remote_thread.join() |
66 | 75 |
def write(self, reading: t.Dict[str, object]) -> None:
    """Appends *reading* to the local log and queues it for remote send."""
    payload = bson_encode(reading)
    self._file.write(payload)
    self._file.flush()
    # tell() after the flushed write gives this reading's end offset.
    self._send_queue.append(ReadingPosition(payload, self._file.tell()))
71 | 82 |
72 def _send_internal(self) -> None: | 83 def _send_internal(self) -> None: |
73 to_send: t.List[types.Reading] = [] | 84 to_send: t.List[ReadingPosition] = [] |
74 while True: | 85 while True: |
75 # Wait for multiple entries to build up in the queue. | 86 # Wait for multiple entries to build up in the queue. |
76 time.sleep(self.WAIT_TIME) | 87 time.sleep(self.WAIT_TIME) |
77 while len(to_send) < self._writer.BATCH_SIZE: | 88 while len(to_send) < self._writer.BATCH_SIZE: |
78 # Pop all the values we can off the queue. | 89 # Pop all the values we can off the queue. |
86 if not to_send: | 97 if not to_send: |
87 # If there's nothing to send, don't try to send anything. | 98 # If there's nothing to send, don't try to send anything. |
88 continue | 99 continue |
89 try: | 100 try: |
90 # Try writing out the values. | 101 # Try writing out the values. |
91 self._writer.write(to_send) | 102 self._writer.write(e.data for e in to_send) |
92 except RemoteWriteError: | 103 except RemoteWriteError: |
93 pass # If it fails, just try again next time. | 104 pass # If it fails, just try again next time. |
94 else: | 105 else: |
95 # If we succeeded, record our success. | 106 # If we succeeded, record our success. |
96 last_sent = to_send[-1] | 107 last_sent = to_send[-1] |
97 self._update_last_sent(last_sent.sample_time) | 108 self._update_start_byte(last_sent.end) |
98 to_send.clear() | 109 to_send.clear() |
99 | 110 |
def _update_start_byte(self, byte: int) -> None:
    """Durably records *byte* as the offset of the first unsent reading.

    Delegates to _atomic_write rather than duplicating its
    write-to-temp-then-rename sequence inline.
    """
    _atomic_write(self._start_byte, str(byte))
116 | |
117 | |
118 def _atomic_write(file: pathlib.Path, contents: str) -> None: | |
119 """Writes a string to a file, atomically.""" | |
120 new_name = file.with_name(file.name + '.new') | |
121 with new_name.open('w') as outfile: | |
122 outfile.write(contents) | |
123 new_name.rename(file) | |
105 | 124 |
106 | 125 |
107 def _open_or_create(path: pathlib.Path) -> t.BinaryIO: | 126 def _open_or_create(path: pathlib.Path) -> t.BinaryIO: |
108 while True: | 127 while True: |
109 try: | 128 try: |
124 file.close() | 143 file.close() |
125 raise OSError('Another copy of the logger is running.') from ex | 144 raise OSError('Another copy of the logger is running.') from ex |
126 return file | 145 return file |
127 | 146 |
128 | 147 |
def _read_unsent_and_upgrade(
    infile: t.BinaryIO,
    last_sent_file: pathlib.Path,
    start_byte_file: pathlib.Path,
) -> t.List[ReadingPosition]:
    """Loads every not-yet-sent reading and leaves *infile* at its end.

    First migrates any legacy last-sent-timestamp file to the start-byte
    scheme, then decodes all documents from the recorded start byte
    onward.  A trailing partial/corrupt BSON document is truncated away
    so later appends produce a clean file.
    """
    _maybe_upgrade_last_sent(infile, last_sent_file, start_byte_file)
    infile.seek(_read_start_byte(start_byte_file), os.SEEK_SET)
    decoder = bson.decode_file_iter(infile, BSON_OPTIONS)
    pending: t.List[ReadingPosition] = []
    good_end = infile.tell()
    try:
        for doc in decoder:
            encoded = bson_encode(doc)
            good_end = infile.tell()
            pending.append(ReadingPosition(encoded, good_end))
    except bson.InvalidBSON:
        # Discard the unreadable tail of the file.
        infile.seek(good_end, os.SEEK_SET)
        infile.truncate(good_end)
    return pending
168 | |
169 | |
170 def _read_start_byte(path: pathlib.Path) -> int: | |
171 try: | |
172 with path.open('r') as infile: | |
173 return int(infile.read()) | |
174 except (OSError, ValueError): | |
175 return 0 | |
176 | |
177 | |
def _maybe_upgrade_last_sent(
    infile: t.BinaryIO,
    last_sent_file: pathlib.Path,
    start_byte_file: pathlib.Path,
) -> None:
    """If there's a last-sent file, upgrades it to start-byte."""
    last_sent = _read_last_sent(last_sent_file)
    if not last_sent:
        return
    decoder = bson.decode_file_iter(infile, BSON_OPTIONS)
    # Offset just past the last document known to have been sent.
    sent_end = infile.tell()
    try:
        for doc in decoder:
            try:
                # NOTE(review): assumes 'sample_time' decodes to a
                # tz-aware datetime (BSON_OPTIONS is tz-aware) — a
                # non-datetime value would make the < comparison raise.
                stamp: datetime.datetime = doc['sample_time']
            except KeyError:
                continue  # Invalid entry; skip it.
            if last_sent < stamp:
                break
            sent_end = infile.tell()
    except bson.InvalidBSON:
        infile.seek(sent_end, os.SEEK_SET)
        infile.truncate(sent_end)
    _atomic_write(start_byte_file, str(sent_end))
    last_sent_file.unlink()
203 | |
204 | |
129 def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]: | 205 def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]: |
130 try: | 206 try: |
131 with path.open('r') as infile: | 207 with path.open('r') as infile: |
132 unix_ts = float(infile.read()) | 208 unix_ts = float(infile.read()) |
133 except (OSError, ValueError): | 209 except (OSError, ValueError): |
139 | 215 |
140 BSON_OPTIONS = bson.DEFAULT_CODEC_OPTIONS.with_options( | 216 BSON_OPTIONS = bson.DEFAULT_CODEC_OPTIONS.with_options( |
141 tz_aware=True, tzinfo=pytz.UTC) | 217 tz_aware=True, tzinfo=pytz.UTC) |
142 | 218 |
143 | 219 |
def bson_encode(data: t.Dict[str, object]) -> bytes:
    """Serializes *data* to BSON bytes using the module's codec options."""
    return bson.BSON.encode(data, codec_options=BSON_OPTIONS)
146 | |
147 | |
def _read_unsent_and_advance(
    infile: t.BinaryIO, last_sent: t.Optional[datetime.datetime],
) -> t.List[types.Reading]:
    """Reads all the unsent Readings and advances the file pointer to the end.
    """
    unsent: t.List[types.Reading] = []
    good_end = infile.tell()
    try:
        for fields in bson.decode_file_iter(infile, BSON_OPTIONS):
            good_end = infile.tell()
            try:
                reading = types.Reading(**fields)
            except TypeError:
                continue  # Invalid entry; skip it.
            if not last_sent or last_sent < reading.sample_time:
                unsent.append(reading)
    except bson.InvalidBSON:
        # Truncate away the unreadable tail of the file.
        infile.seek(good_end, os.SEEK_SET)
        infile.truncate(good_end)
    return unsent