# HG changeset patch
# User Paul Fisher
# Date 1571193624 14400
# Node ID c01f9929ae38a9d926daa1a77670c0de9cf418bc
# Parent  4c81182eaa6b06696f3d0351f5aff4b9848df5e6
Make logger and HTTP writer more general and resilient.

This makes the logger and HTTP writer more general by removing any
dependency upon the exact data type they are writing. They can now
handle any BSON-serializable dict, and they track what they have sent
by recording the offset of the last *byte* sent rather than the last
timestamp.
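For illustration, a minimal sketch of the new calling convention (the
URL, preamble, directory, and field names are made-up examples, not
part of this change); the logger no longer cares about the shape of
the dict, only that bson can encode it:

    # Hypothetical usage; the HTTPWriter and BufferedLogger signatures are
    # the ones in this patch, but the URL, preamble, path, and field names
    # are invented for the sketch.
    from weatherlog import http_writer, logger

    writer = http_writer.HTTPWriter(
        'https://weather.example/report', {'station': 'porch'})
    log = logger.BufferedLogger('/var/lib/weatherlog', writer)
    log.start()
    log.write({'temp_c': 21.5, 'rh_pct': 40.0})  # any BSON-serializable dict
    log.close()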
diff -r 4c81182eaa6b -r c01f9929ae38 weatherlog/daemon.py
--- a/weatherlog/daemon.py	Sun Sep 29 12:11:16 2019 -0400
+++ b/weatherlog/daemon.py	Tue Oct 15 22:40:24 2019 -0400
@@ -27,7 +27,7 @@
     start = time.time()
     try:
         while True:
-            log.write(r.read())
+            log.write(r.read().as_dict())
             cycle += 1
             target = start + interval * cycle
             now = time.time()
diff -r 4c81182eaa6b -r c01f9929ae38 weatherlog/http_writer.py
--- a/weatherlog/http_writer.py	Sun Sep 29 12:11:16 2019 -0400
+++ b/weatherlog/http_writer.py	Tue Oct 15 22:40:24 2019 -0400
@@ -2,24 +2,22 @@

 import typing as t

-import bson
 import requests

 from . import logger
-from . import types


 class HTTPWriter(logger.RemoteWriter):

-    def __init__(self, url: str, preamble: t.Dict[str, t.Any]):
+    def __init__(self, url: str, preamble: t.Dict[str, object]):
         self._url = url
         self._session = requests.Session()
         self._session.headers['User-Agent'] = 'weatherlogger/0.0.1'
-        self._preamble = bson.BSON.encode(preamble)
+        self._preamble = logger.bson_encode(preamble)

-    def write(self, readings: t.Sequence[types.Reading]) -> None:
+    def write(self, readings: t.Iterable[t.Dict[str, object]]) -> None:
         try:
-            data = b''.join(bson.BSON.encode(r.as_dict()) for r in readings)
+            data = b''.join(map(logger.bson_encode, readings))
             response = self._session.post(self._url, data=self._preamble + data)
             response.raise_for_status()
         except requests.exceptions.RequestException as rex:
diff -r 4c81182eaa6b -r c01f9929ae38 weatherlog/logger.py
--- a/weatherlog/logger.py	Sun Sep 29 12:11:16 2019 -0400
+++ b/weatherlog/logger.py	Tue Oct 15 22:40:24 2019 -0400
@@ -10,13 +10,13 @@
 import time
 import typing as t

+import attr
 import bson
 import pytz

-from . import types
-
 BSON_FILENAME = "temps.bson"
-LAST_SENT_FILENAME = "last-sent"
+OLD_LAST_TS = "last-sent"
+START_BYTE = "start-byte"


 class RemoteWriter(metaclass=abc.ABCMeta):
@@ -24,7 +24,7 @@
     BATCH_SIZE = 1000

     @abc.abstractmethod
-    def write(self, readings: t.Sequence[types.Reading]) -> None:
+    def write(self, readings: t.Iterable[t.Dict[str, object]]) -> None:
         raise NotImplementedError()


@@ -33,6 +33,15 @@
     pass


+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class ReadingPosition:
+    # The encoded reading that was written.
+    data: bytes
+
+    # The index of the byte in the file immediately after this reading.
+    end: int
+
+
 class BufferedLogger:
     """A resilient logger which logs to a local file and a RemoteWriter."""

@@ -42,10 +51,10 @@
         self._writer = writer
         self._path = pathlib.Path(directory)
         self._file = _open_exclusive(self._path / BSON_FILENAME)
-        self._last_sent_path = self._path / LAST_SENT_FILENAME
-        last_sent = _read_last_sent(self._last_sent_path)
-        unsent = _read_unsent_and_advance(
-            self._file, last_sent)
+        self._old_last_sent = self._path / OLD_LAST_TS
+        self._start_byte = self._path / START_BYTE
+        unsent = _read_unsent_and_upgrade(
+            self._file, self._old_last_sent, self._start_byte)
         self._send_queue = collections.deque(unsent)
         self._running = False
         self._remote_thread: t.Optional[threading.Thread] = None
@@ -64,13 +73,15 @@
         if self._remote_thread:
             self._remote_thread.join()

-    def write(self, reading: types.Reading):
-        self._file.write(bson_encode(reading.as_dict()))
+    def write(self, reading: t.Dict[str, object]) -> None:
+        encoded = bson_encode(reading)
+        self._file.write(encoded)
         self._file.flush()
-        self._send_queue.append(reading)
+        byte = self._file.tell()
+        self._send_queue.append(ReadingPosition(encoded, byte))

     def _send_internal(self) -> None:
-        to_send: t.List[types.Reading] = []
+        to_send: t.List[ReadingPosition] = []
         while True:
             # Wait for multiple entries to build up in the queue.
             time.sleep(self.WAIT_TIME)
@@ -88,20 +99,28 @@
                 continue
             try:
                 # Try writing out the values.
-                self._writer.write(to_send)
+                self._writer.write(e.data for e in to_send)
             except RemoteWriteError:
                 pass  # If it fails, just try again next time.
             else:
                 # If we succeeded, record our success.
                 last_sent = to_send[-1]
-                self._update_last_sent(last_sent.sample_time)
+                self._update_start_byte(last_sent.end)
                 to_send.clear()

-    def _update_last_sent(self, timestamp: datetime.datetime) -> None:
-        last_sent_name = self._path / (LAST_SENT_FILENAME + ".new")
-        with last_sent_name.open('w') as outfile:
-            outfile.write(str(timestamp.timestamp()))
-        last_sent_name.rename(self._last_sent_path)
+    def _update_start_byte(self, byte: int) -> None:
+        start_byte_name = self._path / (START_BYTE + ".new")
+        with start_byte_name.open('w') as outfile:
+            outfile.write(str(byte))
+        start_byte_name.rename(self._start_byte)
+
+
+def _atomic_write(file: pathlib.Path, contents: str) -> None:
+    """Writes a string to a file, atomically."""
+    new_name = file.with_name(file.name + '.new')
+    with new_name.open('w') as outfile:
+        outfile.write(contents)
+    new_name.rename(file)


 def _open_or_create(path: pathlib.Path) -> t.BinaryIO:
@@ -126,6 +145,63 @@
     return file


+def _read_unsent_and_upgrade(
+    infile: t.BinaryIO,
+    last_sent_file: pathlib.Path,
+    start_byte_file: pathlib.Path,
+) -> t.List[ReadingPosition]:
+    _maybe_upgrade_last_sent(infile, last_sent_file, start_byte_file)
+    start_byte = _read_start_byte(start_byte_file)
+    infile.seek(start_byte, os.SEEK_SET)
+    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
+    readings: t.List[ReadingPosition] = []
+    end_pos = infile.tell()
+    try:
+        for entry in reader:
+            data = bson_encode(entry)
+            end_pos = infile.tell()
+            readings.append(ReadingPosition(data, end_pos))
+    except bson.InvalidBSON:
+        infile.seek(end_pos, os.SEEK_SET)
+        infile.truncate(end_pos)
+    return readings
+
+
+def _read_start_byte(path: pathlib.Path) -> int:
+    try:
+        with path.open('r') as infile:
+            return int(infile.read())
+    except (OSError, ValueError):
+        return 0
+
+
+def _maybe_upgrade_last_sent(
+    infile: t.BinaryIO,
+    last_sent_file: pathlib.Path,
+    start_byte_file: pathlib.Path,
+) -> None:
+    """If there's a last-sent file, upgrades it to start-byte."""
+    last_sent = _read_last_sent(last_sent_file)
+    if not last_sent:
+        return
+    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
+    last_good = infile.tell()
+    try:
+        for entry in reader:
+            try:
+                timestamp: datetime.datetime = entry['sample_time']
+            except KeyError:
+                continue  # Invalid entry; skip it.
+            if last_sent < timestamp:
+                break
+            last_good = infile.tell()
+    except bson.InvalidBSON:
+        infile.seek(last_good, os.SEEK_SET)
+        infile.truncate(last_good)
+    _atomic_write(start_byte_file, str(last_good))
+    last_sent_file.unlink()
+
+
 def _read_last_sent(path: pathlib.Path) -> t.Optional[datetime.datetime]:
     try:
         with path.open('r') as infile:
@@ -141,28 +217,5 @@
     tz_aware=True, tzinfo=pytz.UTC)


-def bson_encode(data: t.Dict[str, t.Any]) -> bytes:
+def bson_encode(data: t.Dict[str, object]) -> bytes:
     return bson.BSON.encode(data, codec_options=BSON_OPTIONS)
-
-
-def _read_unsent_and_advance(
-    infile: t.BinaryIO, last_sent: t.Optional[datetime.datetime],
-) -> t.List[types.Reading]:
-    """Reads all the unsent Readings and advances the file pointer to the end.
-    """
-    reader = bson.decode_file_iter(infile, BSON_OPTIONS)
-    last_good = infile.tell()
-    unsent: t.List[types.Reading] = []
-    try:
-        for entry in reader:
-            last_good = infile.tell()
-            try:
-                reading = types.Reading(**entry)
-            except TypeError:
-                continue  # Invalid entry; skip it.
-            if not last_sent or last_sent < reading.sample_time:
-                unsent.append(reading)
-    except bson.InvalidBSON:
-        infile.seek(last_good, os.SEEK_SET)
-        infile.truncate(last_good)
-    return unsent
diff -r 4c81182eaa6b -r c01f9929ae38 weatherlog/logger_test.py
--- a/weatherlog/logger_test.py	Sun Sep 29 12:11:16 2019 -0400
+++ b/weatherlog/logger_test.py	Tue Oct 15 22:40:24 2019 -0400
@@ -46,6 +46,9 @@
     return datetime.datetime.utcfromtimestamp(t).replace(tzinfo=pytz.UTC)


+bs = logger.bson_encode
+
+
 class LoggerTest(unittest.TestCase):

     maxDiff = None
@@ -65,24 +68,24 @@
             logger.BufferedLogger(self.temp_dir.name, writer)
         ) as bl:
             bl.WAIT_TIME = 0.2
-            bl.write(types.Reading(ts(3), 1, 2))
-            bl.write(types.Reading(ts(6), 4, 5))
-            bl.write(types.Reading(ts(8), 10, 9))
+            bl.write({'first': 'item'})
+            bl.write({'entry': 2})
+            bl.write({'thing': 'three'})
             bl.start()
             time.sleep(1)  # Ensure that we get an entire logger cycle in.

         self.assertEqual(
             writer.writes,
             [
-                (types.Reading(ts(3), 1, 2), types.Reading(ts(6), 4, 5)),
-                (types.Reading(ts(8), 10, 9),),
+                (bs({'first': 'item'}), bs({'entry': 2})),
+                (bs({'thing': 'three'}),),
             ],
         )
-        self.assertEqual(self._read_last_sent(), '8.0')
+        self.assertEqual(self._read_last_sent(), '59')
         self.assertEqual(self._read_bsons(), [
-            dict(sample_time=ts(3), temp_c=1, rh_pct=2),
-            dict(sample_time=ts(6), temp_c=4, rh_pct=5),
-            dict(sample_time=ts(8), temp_c=10, rh_pct=9),
+            {'first': 'item'},
+            {'entry': 2},
+            {'thing': 'three'},
         ])

     def test_append_and_resume(self):
@@ -92,10 +95,10 @@
         ]
         with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile:
             for value in existing_values:
-                outfile.write(logger.bson_encode(value))
+                outfile.write(bs(value))
             outfile.write(b'non-BSON garbage')

-        with (self.temp_path / logger.LAST_SENT_FILENAME).open('w') as outfile:
+        with (self.temp_path / logger.OLD_LAST_TS).open('w') as outfile:
             outfile.write('10')

         writer = FakeWriter()
@@ -105,18 +108,50 @@
             bl.WAIT_TIME = 0.2
             bl.start()
             time.sleep(0.5)
-            bl.write(types.Reading(ts(99), temp_c=-40, rh_pct=2))
+            bl.write({'some new': 'entry'})
             time.sleep(0.5)

-        self.assertEqual(self._read_last_sent(), '99.0')
+        self.assertEqual(self._read_last_sent(), '125')
         self.assertEqual(self._read_bsons(), [
             dict(sample_time=ts(10), temp_c=20, rh_pct=30),
             dict(sample_time=ts(60), temp_c=10, rh_pct=99),
-            dict(sample_time=ts(99), temp_c=-40, rh_pct=2),
+            {'some new': 'entry'},
         ])
         self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [
-            types.Reading(ts(60), 10, 99),
-            types.Reading(ts(99), -40, 2),
+            bs(dict(sample_time=ts(60), temp_c=10, rh_pct=99)),
+            bs({'some new': 'entry'}),
+        ])
+
+    def test_resume_from_byte(self):
+        existing_values = [
+            {'old': 'value'},
+            {'unsent': 'value'},
+        ]
+        with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile:
+            for value in existing_values:
+                outfile.write(bs(value))
+            outfile.write(b'non-BSON garbage')
+        with (self.temp_path / logger.START_BYTE).open('w') as outfile:
+            outfile.write('20')  # immediately after 'old: value'
+
+        writer = FakeWriter()
+        with contextlib.closing(
+            logger.BufferedLogger(str(self.temp_path), writer)
+        ) as bl:
+            bl.WAIT_TIME = 0.2
+            bl.start()
+            bl.write({'some new': 'entry'})
+            time.sleep(0.5)
+
+        self.assertEqual(self._read_last_sent(), '68')
+        self.assertEqual(self._read_bsons(), [
+            {'old': 'value'},
+            {'unsent': 'value'},
+            {'some new': 'entry'},
+        ])
+        self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [
+            bs({'unsent': 'value'}),
+            bs({'some new': 'entry'}),
         ])

     def test_send_failure(self):
@@ -126,34 +161,34 @@
         ) as bl:
             bl.WAIT_TIME = 0.2
             bl.start()
-            bl.write(types.Reading(ts(1337), 420, 69))
+            bl.write({'cool write': 'succeeds'})
             time.sleep(0.5)

             writer.is_bad = True
-            bl.write(types.Reading(ts(31337), 666, 999))
+            bl.write({'bad write': 'fails'})
             time.sleep(0.5)

-            self.assertEqual(self._read_last_sent(), '1337.0')
+            self.assertEqual(self._read_last_sent(), '30')
             self.assertEqual(
-                writer.writes, [(types.Reading(ts(1337), 420, 69),)])
+                writer.writes, [(bs({'cool write': 'succeeds'}),)])
             self.assertEqual(self._read_bsons(), [
-                dict(sample_time=ts(1337), temp_c=420, rh_pct=69),
-                dict(sample_time=ts(31337), temp_c=666, rh_pct=999),
+                {'cool write': 'succeeds'},
+                {'bad write': 'fails'},
             ])

             # Ensure that we resume writing again when the condition clears.
             writer.is_bad = False
             time.sleep(0.5)
-            self.assertEqual(self._read_last_sent(), '31337.0')
+            self.assertEqual(self._read_last_sent(), '56')
             self.assertEqual(
                 writer.writes,
                 [
-                    (types.Reading(ts(1337), 420, 69),),
-                    (types.Reading(ts(31337), 666, 999),),
+                    (bs({'cool write': 'succeeds'}),),
+                    (bs({'bad write': 'fails'}),),
                 ])
             self.assertEqual(self._read_bsons(), [
-                dict(sample_time=ts(1337), temp_c=420, rh_pct=69),
-                dict(sample_time=ts(31337), temp_c=666, rh_pct=999),
+                {'cool write': 'succeeds'},
+                {'bad write': 'fails'},
             ])

     def test_fail_upon_lock(self):
@@ -177,8 +212,48 @@
         # Test that it works after the lock is released.
         logger.BufferedLogger(str(self.temp_path), FakeWriter()).close()

+    def test_upgrade_last_sent(self):
+        for (timestamp, byte_count) in [
+            ('5', '0'),
+            ('20', '52'),
+            ('30', '78'),
+        ]:
+            bson_file = self.temp_path / logger.BSON_FILENAME
+            with bson_file.open('wb') as outfile:
+                outfile.write(logger.bson_encode(dict(sample_time=ts(10))))
+                outfile.write(logger.bson_encode(dict(sample_time=ts(20))))
+                outfile.write(logger.bson_encode(dict(sample_time=ts(30))))
+                outfile.write(b'some bogus data')
+            last_sent = self.temp_path / logger.OLD_LAST_TS
+            with last_sent.open('w') as outfile:
+                outfile.write(timestamp)
+            start_byte = self.temp_path / logger.START_BYTE
+
+            with bson_file.open('r+b') as infile:
+                logger._read_unsent_and_upgrade(
+                    infile, last_sent, start_byte)
+            self.assertFalse(last_sent.exists())
+            with start_byte.open('r') as infile:
+                self.assertEqual(infile.read(), byte_count)
+
+    def test_upgrade_last_sent_no_last_sent(self):
+        bson_file = self.temp_path / logger.BSON_FILENAME
+        with bson_file.open('wb') as outfile:
+            outfile.write(logger.bson_encode(dict(sample_time=ts(10))))
+            outfile.write(logger.bson_encode(dict(sample_time=ts(20))))
+            outfile.write(logger.bson_encode(dict(sample_time=ts(30))))
+        last_sent = self.temp_path / logger.OLD_LAST_TS
+        start_byte = self.temp_path / logger.START_BYTE
+        with start_byte.open('w') as outfile:
+            outfile.write('untouched')
+        with bson_file.open('r+b') as infile:
+            logger._read_unsent_and_upgrade(
+                infile, last_sent, start_byte)
+        self.assertFalse(last_sent.exists())
+        with start_byte.open('r') as infile:
+            self.assertEqual(infile.read(), 'untouched')
+
     def _read_last_sent(self):
-        with (self.temp_path / logger.LAST_SENT_FILENAME).open('r') as infile:
+        with (self.temp_path / logger.START_BYTE).open('r') as infile:
             return infile.read()

     def _read_bsons(self):
diff -r 4c81182eaa6b -r c01f9929ae38 weatherlog/types.py
--- a/weatherlog/types.py	Sun Sep 29 12:11:16 2019 -0400
+++ b/weatherlog/types.py	Tue Oct 15 22:40:24 2019 -0400
@@ -9,7 +9,7 @@

 def _utc_now() -> datetime.datetime:
     """utcnow, but timezone-aware."""
-    return datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
+    return datetime.datetime.now(tz=pytz.UTC)


 @attr.s(frozen=True, slots=True)