comparison weatherlog/logger.py @ 14:c01f9929ae38

Make logger and HTTP writer more general and resilient. This makes the logger and HTTP writer more general by removing any dependency upon the exact data type they are writing. They can now handle any BSON-serializable dict, and they track what they have sent by recording the last *byte* sent, not the last timestamp.
author Paul Fisher <paul@pfish.zone>
date Tue, 15 Oct 2019 22:40:24 -0400
parents 8a350ec1aa78
children 770215590d80
comparison
equal deleted inserted replaced
13:4c81182eaa6b 14:c01f9929ae38
8 import pathlib 8 import pathlib
9 import threading 9 import threading
10 import time 10 import time
11 import typing as t 11 import typing as t
12 12
13 import attr
13 import bson 14 import bson
14 import pytz 15 import pytz
15 16
16 from . import types
17
18 BSON_FILENAME = "temps.bson" 17 BSON_FILENAME = "temps.bson"
19 LAST_SENT_FILENAME = "last-sent" 18 OLD_LAST_TS = "last-sent"
19 START_BYTE = "start-byte"
20 20
21 21
class RemoteWriter(metaclass=abc.ABCMeta):
    """Abstract sink that uploads batches of BSON-encoded readings."""

    # Upper bound on how many queued entries are handed to write() at once.
    BATCH_SIZE = 1000

    @abc.abstractmethod
    def write(self, readings: t.Iterable[t.Dict[str, object]]) -> None:
        """Sends `readings` to the remote end.

        Implementations signal failure with RemoteWriteError (see below),
        which the caller treats as "retry later".
        """
        raise NotImplementedError()
29 29
30 30
class RemoteWriteError(Exception):
    """Error to be raised by RemoteWriter.write."""
34
35
@attr.s(auto_attribs=True, frozen=True, slots=True)
class ReadingPosition:
    """A BSON-encoded reading paired with its end offset in the log file."""

    # The encoded reading that was written.
    data: bytes

    # The index of the byte immediately following this one.
    end: int
34 43
35 44
36 class BufferedLogger: 45 class BufferedLogger:
37 """A resilient logger which logs to a local file and a RemoteWriter.""" 46 """A resilient logger which logs to a local file and a RemoteWriter."""
38 47
40 49
    def __init__(self, directory: str, writer: RemoteWriter):
        """Opens the local BSON log in `directory` and loads unsent entries.

        Args:
            directory: Directory holding the BSON log and its bookkeeping
                files (start-byte, plus the legacy last-sent file).
            writer: Remote sink that batches of readings are pushed to.
        """
        self._writer = writer
        self._path = pathlib.Path(directory)
        # Exclusive open: refuses to start if another logger instance
        # already holds the log file.
        self._file = _open_exclusive(self._path / BSON_FILENAME)
        self._old_last_sent = self._path / OLD_LAST_TS
        self._start_byte = self._path / START_BYTE
        # Re-queue anything a previous run never managed to send, migrating
        # the legacy timestamp-based bookkeeping file if one is present.
        unsent = _read_unsent_and_upgrade(
            self._file, self._old_last_sent, self._start_byte)
        self._send_queue = collections.deque(unsent)
        self._running = False
        self._remote_thread: t.Optional[threading.Thread] = None
52 61
53 def start(self) -> None: 62 def start(self) -> None:
62 self._file.close() 71 self._file.close()
63 self._running = False 72 self._running = False
64 if self._remote_thread: 73 if self._remote_thread:
65 self._remote_thread.join() 74 self._remote_thread.join()
66 75
67 def write(self, reading: types.Reading): 76 def write(self, reading: t.Dict[str, object]) -> None:
68 self._file.write(bson_encode(reading.as_dict())) 77 encoded = bson_encode(reading)
78 self._file.write(encoded)
69 self._file.flush() 79 self._file.flush()
70 self._send_queue.append(reading) 80 byte = self._file.tell()
81 self._send_queue.append(ReadingPosition(encoded, byte))
71 82
72 def _send_internal(self) -> None: 83 def _send_internal(self) -> None:
73 to_send: t.List[types.Reading] = [] 84 to_send: t.List[ReadingPosition] = []
74 while True: 85 while True:
75 # Wait for multiple entries to build up in the queue. 86 # Wait for multiple entries to build up in the queue.
76 time.sleep(self.WAIT_TIME) 87 time.sleep(self.WAIT_TIME)
77 while len(to_send) < self._writer.BATCH_SIZE: 88 while len(to_send) < self._writer.BATCH_SIZE:
78 # Pop all the values we can off the queue. 89 # Pop all the values we can off the queue.
86 if not to_send: 97 if not to_send:
87 # If there's nothing to send, don't try to send anything. 98 # If there's nothing to send, don't try to send anything.
88 continue 99 continue
89 try: 100 try:
90 # Try writing out the values. 101 # Try writing out the values.
91 self._writer.write(to_send) 102 self._writer.write(e.data for e in to_send)
92 except RemoteWriteError: 103 except RemoteWriteError:
93 pass # If it fails, just try again next time. 104 pass # If it fails, just try again next time.
94 else: 105 else:
95 # If we succeeded, record our success. 106 # If we succeeded, record our success.
96 last_sent = to_send[-1] 107 last_sent = to_send[-1]
97 self._update_last_sent(last_sent.sample_time) 108 self._update_start_byte(last_sent.end)
98 to_send.clear() 109 to_send.clear()
99 110
100 def _update_last_sent(self, timestamp: datetime.datetime) -> None: 111 def _update_start_byte(self, byte: int) -> None:
101 last_sent_name = self._path / (LAST_SENT_FILENAME + ".new") 112 start_byte_name = self._path / (START_BYTE + ".new")
102 with last_sent_name.open('w') as outfile: 113 with start_byte_name.open('w') as outfile:
103 outfile.write(str(timestamp.timestamp())) 114 outfile.write(str(byte))
104 last_sent_name.rename(self._last_sent_path) 115 start_byte_name.rename(self._start_byte)
116
117
118 def _atomic_write(file: pathlib.Path, contents: str) -> None:
119 """Writes a string to a file, atomically."""
120 new_name = file.with_name(file.name + '.new')
121 with new_name.open('w') as outfile:
122 outfile.write(contents)
123 new_name.rename(file)
105 124
106 125
107 def _open_or_create(path: pathlib.Path) -> t.BinaryIO: 126 def _open_or_create(path: pathlib.Path) -> t.BinaryIO:
108 while True: 127 while True:
109 try: 128 try:
124 file.close() 143 file.close()
125 raise OSError('Another copy of the logger is running.') from ex 144 raise OSError('Another copy of the logger is running.') from ex
126 return file 145 return file
127 146
128 147
def _read_unsent_and_upgrade(
    infile: t.BinaryIO,
    last_sent_file: pathlib.Path,
    start_byte_file: pathlib.Path,
) -> t.List[ReadingPosition]:
    """Reads every not-yet-sent entry, leaving `infile` positioned at EOF.

    First migrates any legacy timestamp-based bookkeeping to the byte-offset
    scheme (see _maybe_upgrade_last_sent), then decodes from the persisted
    start byte onward. A corrupt tail (e.g. a torn write from a crash) is
    truncated off the file.
    """
    _maybe_upgrade_last_sent(infile, last_sent_file, start_byte_file)
    start_byte = _read_start_byte(start_byte_file)
    infile.seek(start_byte, os.SEEK_SET)
    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
    readings: t.List[ReadingPosition] = []
    # Offset just past the last cleanly-decoded record; updated as we go so
    # a decode failure can truncate back to here.
    end_pos = infile.tell()
    try:
        for entry in reader:
            # Re-encode the decoded entry so the queued bytes are exactly
            # what will be resent.
            data = bson_encode(entry)
            end_pos = infile.tell()
            readings.append(ReadingPosition(data, end_pos))
    except bson.InvalidBSON:
        # Partial/corrupt record at the tail: rewind and chop it off.
        infile.seek(end_pos, os.SEEK_SET)
        infile.truncate(end_pos)
    return readings
168
169
170 def _read_start_byte(path: pathlib.Path) -> int:
171 try:
172 with path.open('r') as infile:
173 return int(infile.read())
174 except (OSError, ValueError):
175 return 0
176
177
def _maybe_upgrade_last_sent(
    infile: t.BinaryIO,
    last_sent_file: pathlib.Path,
    start_byte_file: pathlib.Path,
) -> None:
    """If there's a last-sent file, upgrades it to start-byte.

    Scans records from the file's current position, advancing a byte
    watermark past every record whose `sample_time` is at or before the
    legacy last-sent timestamp; the watermark becomes the new start byte
    and the legacy file is removed.
    """
    last_sent = _read_last_sent(last_sent_file)
    if not last_sent:
        # No legacy bookkeeping (or it was unreadable): nothing to migrate.
        return
    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
    # Offset just past the newest record known to have been sent.
    last_good = infile.tell()
    try:
        for entry in reader:
            try:
                timestamp: datetime.datetime = entry['sample_time']
            except KeyError:
                continue  # Invalid entry; skip it.
            if last_sent < timestamp:
                # First record newer than the legacy watermark: everything
                # from here on is unsent.
                break
            last_good = infile.tell()
    except bson.InvalidBSON:
        # Corrupt tail: truncate so later reads start from clean data.
        infile.seek(last_good, os.SEEK_SET)
        infile.truncate(last_good)
    _atomic_write(start_byte_file, str(last_good))
    last_sent_file.unlink()
203
204
129 def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]: 205 def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]:
130 try: 206 try:
131 with path.open('r') as infile: 207 with path.open('r') as infile:
132 unix_ts = float(infile.read()) 208 unix_ts = float(infile.read())
133 except (OSError, ValueError): 209 except (OSError, ValueError):
139 215
# Decode BSON timestamps as timezone-aware datetimes pinned to UTC.
BSON_OPTIONS = bson.DEFAULT_CODEC_OPTIONS.with_options(
    tz_aware=True, tzinfo=pytz.UTC)
142 218
143 219
def bson_encode(data: t.Dict[str, object]) -> bytes:
    """Serializes `data` to BSON bytes using this module's codec options."""
    return bson.BSON.encode(data, codec_options=BSON_OPTIONS)
146
147
def _read_unsent_and_advance(
    infile: t.BinaryIO, last_sent: t.Optional[datetime.datetime],
) -> t.List[types.Reading]:
    """Reads all the unsent Readings and advances the file pointer to the end.

    A Reading is "unsent" when `last_sent` is None or its sample_time is
    newer than `last_sent`. A corrupt tail (partial record) is truncated
    off the file.
    """
    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
    # Offset just past the last record that decoded cleanly.
    last_good = infile.tell()
    unsent: t.List[types.Reading] = []
    try:
        for entry in reader:
            last_good = infile.tell()
            try:
                reading = types.Reading(**entry)
            except TypeError:
                continue  # Invalid entry; skip it.
            if not last_sent or last_sent < reading.sample_time:
                unsent.append(reading)
    except bson.InvalidBSON:
        # Torn write at the tail: rewind and truncate to the last good record.
        infile.seek(last_good, os.SEEK_SET)
        infile.truncate(last_good)
    return unsent