Mercurial > personal > weatherlog
view weatherlog/logger_test.py @ 14:c01f9929ae38
Make logger and HTTP writer more general and resilient.
This makes the logger and HTTP writer more general, by removing
any dependency upon the exact data type they are writing.
They can now handle any type of BSON-serializable dict,
and track what they have sent by keeping track of the last *byte*,
not the last timestamp.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Tue, 15 Oct 2019 22:40:24 -0400 |
parents | 8a350ec1aa78 |
children | 770215590d80 |
line wrap: on
line source
"""Tests for the buffered BSON logger and its remote-writer interface."""

import contextlib
import datetime
import fcntl
import itertools
import multiprocessing
import pathlib
import tempfile
import time
import unittest

import bson
import pytz

from . import logger
from . import types


class FakeWriter(logger.RemoteWriter):
    """RemoteWriter double that records every batch it is handed."""

    # The batched expectations in LoggerTest.test_from_nothing (two entries,
    # then one) match this value.
    BATCH_SIZE = 2

    def __init__(self):
        # One tuple per write() call, in call order.
        self.writes = []

    def write(self, readings):
        """Record `readings` as a tuple instead of sending them anywhere."""
        self.writes.append(tuple(readings))


class FlakyWriter(logger.RemoteWriter):
    """RemoteWriter double whose writes fail while `is_bad` is true."""

    # Tests flip this on an instance to simulate a remote outage.
    is_bad = False

    def __init__(self):
        self.writer = FakeWriter()

    def write(self, readings):
        """Forward `readings` to the wrapped FakeWriter, or fail.

        Raises:
            logger.RemoteWriteError: if `is_bad` is currently true.
        """
        if self.is_bad:
            raise logger.RemoteWriteError('I am bad!')
        self.writer.write(readings)

    @property
    def writes(self):
        """The batches that were written successfully."""
        return self.writer.writes


def ts(t):
    """Return the timezone-aware UTC datetime for POSIX timestamp `t`."""
    # fromtimestamp(..., tz=...) replaces the deprecated utcfromtimestamp()
    # and yields the same aware value in one step.
    return datetime.datetime.fromtimestamp(t, tz=pytz.UTC)


# Short alias; the tests encode a lot of expected values.
bs = logger.bson_encode


class LoggerTest(unittest.TestCase):
    """Exercises logger.BufferedLogger against a temporary directory."""

    maxDiff = None

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.TemporaryDirectory()
        self.temp_path = pathlib.Path(self.temp_dir.name)

    def tearDown(self):
        self.temp_dir.cleanup()
        super().tearDown()

    def test_from_nothing(self):
        """Entries written to a fresh directory are logged and sent."""
        writer = FakeWriter()
        with contextlib.closing(
            logger.BufferedLogger(self.temp_dir.name, writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.write({'first': 'item'})
            bl.write({'entry': 2})
            bl.write({'thing': 'three'})
            bl.start()
            time.sleep(1)  # Ensure that we get an entire logger cycle in.

        self.assertEqual(
            writer.writes,
            [
                (bs({'first': 'item'}), bs({'entry': 2})),
                (bs({'thing': 'three'}),),
            ],
        )
        # 59 is the combined encoded size of the three documents.
        self.assertEqual(self._read_last_sent(), '59')
        self.assertEqual(self._read_bsons(), [
            {'first': 'item'},
            {'entry': 2},
            {'thing': 'three'},
        ])

    def test_append_and_resume(self):
        """An old-style last-timestamp file decides where sending resumes."""
        existing_values = [
            dict(sample_time=ts(10), temp_c=20, rh_pct=30),
            dict(sample_time=ts(60), temp_c=10, rh_pct=99),
        ]
        with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile:
            for value in existing_values:
                outfile.write(bs(value))
            outfile.write(b'non-BSON garbage')

        with (self.temp_path / logger.OLD_LAST_TS).open('w') as outfile:
            outfile.write('10')

        writer = FakeWriter()
        with contextlib.closing(
            logger.BufferedLogger(str(self.temp_path), writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.start()
            time.sleep(0.5)
            bl.write({'some new': 'entry'})
            time.sleep(0.5)

        # The trailing garbage must be gone: the offset and the decoded file
        # both account only for the three valid documents.
        self.assertEqual(self._read_last_sent(), '125')
        self.assertEqual(self._read_bsons(), [
            dict(sample_time=ts(10), temp_c=20, rh_pct=30),
            dict(sample_time=ts(60), temp_c=10, rh_pct=99),
            {'some new': 'entry'},
        ])
        # Only the reading newer than timestamp 10 and the new entry are sent.
        self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [
            bs(dict(sample_time=ts(60), temp_c=10, rh_pct=99)),
            bs({'some new': 'entry'}),
        ])

    def test_resume_from_byte(self):
        """A start-byte file decides where sending resumes."""
        existing_values = [
            {'old': 'value'},
            {'unsent': 'value'},
        ]
        with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile:
            for value in existing_values:
                outfile.write(bs(value))
            outfile.write(b'non-BSON garbage')
        with (self.temp_path / logger.START_BYTE).open('w') as outfile:
            outfile.write('20')  # immediately after 'old: value'

        writer = FakeWriter()
        with contextlib.closing(
            logger.BufferedLogger(str(self.temp_path), writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.start()
            bl.write({'some new': 'entry'})
            time.sleep(0.5)

        self.assertEqual(self._read_last_sent(), '68')
        self.assertEqual(self._read_bsons(), [
            {'old': 'value'},
            {'unsent': 'value'},
            {'some new': 'entry'},
        ])
        self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [
            bs({'unsent': 'value'}),
            bs({'some new': 'entry'}),
        ])

    def test_send_failure(self):
        """A failed remote write is retried once the writer recovers."""
        writer = FlakyWriter()
        with contextlib.closing(
            logger.BufferedLogger(str(self.temp_path), writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.start()
            bl.write({'cool write': 'succeeds'})
            time.sleep(0.5)
            writer.is_bad = True
            bl.write({'bad write': 'fails'})
            time.sleep(0.5)

            # The failed entry is on disk, but the sent offset has not moved
            # past the first document.
            self.assertEqual(self._read_last_sent(), '30')
            self.assertEqual(
                writer.writes, [(bs({'cool write': 'succeeds'}),)])
            self.assertEqual(self._read_bsons(), [
                {'cool write': 'succeeds'},
                {'bad write': 'fails'},
            ])

            # Ensure that we resume writing again when the condition clears.
            writer.is_bad = False
            time.sleep(0.5)

            self.assertEqual(self._read_last_sent(), '56')
            self.assertEqual(
                writer.writes,
                [
                    (bs({'cool write': 'succeeds'}),),
                    (bs({'bad write': 'fails'}),),
                ])
            self.assertEqual(self._read_bsons(), [
                {'cool write': 'succeeds'},
                {'bad write': 'fails'},
            ])

    def test_fail_upon_lock(self):
        """Constructing a logger fails while another process holds the lock."""
        bson_file = str(self.temp_path / logger.BSON_FILENAME)
        out_queue = multiprocessing.Queue()
        in_queue = multiprocessing.Queue()
        # This needs to be in a separate multiprocessing.Process
        # since flock-based file locks are per-process, not per-thread.
        proc = multiprocessing.Process(
            target=_lock_holder, args=(bson_file, out_queue, in_queue))
        proc.start()
        try:
            in_queue.get()  # Wait for the lock to be acquired.
            with self.assertRaises(OSError):
                logger.BufferedLogger(str(self.temp_path), FakeWriter())
        finally:
            # Always release and reap the child, even if the assertion above
            # fails; otherwise it stays blocked on its queue forever.
            out_queue.put(None)  # Notify that we're done.
            out_queue.close()
            proc.join()
            proc.close()

        # Test that it works after the lock is released.
        logger.BufferedLogger(str(self.temp_path), FakeWriter()).close()

    def test_upgrade_last_sent(self):
        """The old last-timestamp file is converted into a byte offset."""
        cases = [
            ('5', '0'),
            ('20', '52'),
            ('30', '78'),
        ]
        bson_file = self.temp_path / logger.BSON_FILENAME
        last_sent = self.temp_path / logger.OLD_LAST_TS
        start_byte = self.temp_path / logger.START_BYTE
        for timestamp, byte_count in cases:
            # subTest reports each case separately and keeps going after a
            # failure; every iteration rewrites all of its input files.
            with self.subTest(timestamp=timestamp, byte_count=byte_count):
                with bson_file.open('wb') as outfile:
                    for sample in (10, 20, 30):
                        outfile.write(bs(dict(sample_time=ts(sample))))
                    outfile.write(b'some bogus data')
                with last_sent.open('w') as outfile:
                    outfile.write(timestamp)
                with bson_file.open('r+b') as infile:
                    logger._read_unsent_and_upgrade(
                        infile, last_sent, start_byte)
                self.assertFalse(last_sent.exists())
                with start_byte.open('r') as infile:
                    self.assertEqual(infile.read(), byte_count)

    def test_upgrade_last_sent_no_last_sent(self):
        """Without an old last-timestamp file, the start byte is untouched."""
        bson_file = self.temp_path / logger.BSON_FILENAME
        with bson_file.open('wb') as outfile:
            for sample in (10, 20, 30):
                outfile.write(bs(dict(sample_time=ts(sample))))
        last_sent = self.temp_path / logger.OLD_LAST_TS
        start_byte = self.temp_path / logger.START_BYTE
        with start_byte.open('w') as outfile:
            outfile.write('untouched')
        with bson_file.open('r+b') as infile:
            logger._read_unsent_and_upgrade(
                infile, last_sent, start_byte)
        self.assertFalse(last_sent.exists())
        with start_byte.open('r') as infile:
            self.assertEqual(infile.read(), 'untouched')

    def _read_last_sent(self):
        """Return the raw text of the start-byte file in the temp directory."""
        with (self.temp_path / logger.START_BYTE).open('r') as infile:
            return infile.read()

    def _read_bsons(self):
        """Return every document decoded from the BSON log file."""
        with (self.temp_path / logger.BSON_FILENAME).open('rb') as infile:
            return bson.decode_all(infile.read(), logger.BSON_OPTIONS)


def _lock_holder(path, in_queue, out_queue):
    """Hold a shared flock on `path` until told to stop.

    Runs in a child process. Puts None on `out_queue` once the lock is held,
    then blocks on `in_queue` until the parent sends anything.
    """
    # Mode 'w' creates the file if it does not exist yet (and truncates it).
    with open(path, 'w') as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_SH)
        out_queue.put(None)  # Notify that we've acquired the lock.
        out_queue.close()
        in_queue.get()  # Wait for the test to complete before closing.


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    unittest.main()