Mercurial > personal > weatherlog
view weatherlog/logger_test.py @ 16:770215590d80
logger: Actually pass the right type.
The logger was improperly giving the writer already-encoded values.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Thu, 17 Oct 2019 22:19:33 -0400 |
parents | c01f9929ae38 |
children |
line wrap: on
line source
import contextlib import datetime import fcntl import itertools import multiprocessing import pathlib import tempfile import time import unittest import bson import pytz from . import logger class FakeWriter(logger.RemoteWriter): BATCH_SIZE = 2 def __init__(self): self.writes = [] def write(self, readings): self.writes.append(tuple(readings)) class FlakyWriter(logger.RemoteWriter): is_bad = False def __init__(self): self.writer = FakeWriter() def write(self, readings): if self.is_bad: raise logger.RemoteWriteError('I am bad!') self.writer.write(readings) @property def writes(self): return self.writer.writes def ts(t): return datetime.datetime.utcfromtimestamp(t).replace(tzinfo=pytz.UTC) bs = logger.bson_encode class LoggerTest(unittest.TestCase): maxDiff = None def setUp(self): super().setUp() self.temp_dir = tempfile.TemporaryDirectory() self.temp_path = pathlib.Path(self.temp_dir.name) def tearDown(self): self.temp_dir.cleanup() super().tearDown() def test_from_nothing(self): writer = FakeWriter() with contextlib.closing( logger.BufferedLogger(self.temp_dir.name, writer) ) as bl: bl.WAIT_TIME = 0.2 bl.write({'first': 'item'}) bl.write({'entry': 2}) bl.write({'thing': 'three'}) bl.start() time.sleep(1) # Ensure that we get an entire logger cycle in. self.assertEqual( writer.writes, [ ({'first': 'item'}, {'entry': 2}), ({'thing': 'three'},), ], ) self.assertEqual(self._read_last_sent(), '59') self.assertEqual(self._read_bsons(), [ {'first': 'item'}, {'entry': 2}, {'thing': 'three'}, ]) def test_append_and_resume(self): existing_values = [ dict(sample_time=ts(10), temp_c=20, rh_pct=30), dict(sample_time=ts(60), temp_c=10, rh_pct=99), ] with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile: for value in existing_values: outfile.write(bs(value)) outfile.write(b'non-BSON garbage') with (self.temp_path / logger.OLD_LAST_TS).open('w') as outfile: outfile.write('10') writer = FakeWriter() with contextlib.closing( logger.BufferedLogger(str(self.temp_path), writer) ) as bl: bl.WAIT_TIME = 0.2 bl.start() time.sleep(0.5) bl.write({'some new': 'entry'}) time.sleep(0.5) self.assertEqual(self._read_last_sent(), '125') self.assertEqual(self._read_bsons(), [ dict(sample_time=ts(10), temp_c=20, rh_pct=30), dict(sample_time=ts(60), temp_c=10, rh_pct=99), {'some new': 'entry'}, ]) self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [ dict(sample_time=ts(60), temp_c=10, rh_pct=99), {'some new': 'entry'}, ]) def test_resume_from_byte(self): existing_values = [ {'old': 'value'}, {'unsent': 'value'}, ] with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile: for value in existing_values: outfile.write(bs(value)) outfile.write(b'non-BSON garbage') with (self.temp_path / logger.START_BYTE).open('w') as outfile: outfile.write('20') # immediately after 'old: value' writer = FakeWriter() with contextlib.closing( logger.BufferedLogger(str(self.temp_path), writer) ) as bl: bl.WAIT_TIME = 0.2 bl.start() bl.write({'some new': 'entry'}) time.sleep(0.5) self.assertEqual(self._read_last_sent(), '68') self.assertEqual(self._read_bsons(), [ {'old': 'value'}, {'unsent': 'value'}, {'some new': 'entry'}, ]) self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [ {'unsent': 'value'}, {'some new': 'entry'}, ]) def test_send_failure(self): writer = FlakyWriter() with contextlib.closing( logger.BufferedLogger(str(self.temp_path), writer) ) as bl: bl.WAIT_TIME = 0.2 bl.start() bl.write({'cool write': 'succeeds'}) time.sleep(0.5) writer.is_bad = True bl.write({'bad write': 'fails'}) time.sleep(0.5) self.assertEqual(self._read_last_sent(), '30') self.assertEqual( writer.writes, [({'cool write': 'succeeds'},)]) self.assertEqual(self._read_bsons(), [ {'cool write': 'succeeds'}, {'bad write': 'fails'}, ]) # Ensure that we resume writing again when the condition clears. writer.is_bad = False time.sleep(0.5) self.assertEqual(self._read_last_sent(), '56') self.assertEqual( writer.writes, [ ({'cool write': 'succeeds'},), ({'bad write': 'fails'},), ]) self.assertEqual(self._read_bsons(), [ {'cool write': 'succeeds'}, {'bad write': 'fails'}, ]) def test_fail_upon_lock(self): bson_file = str(self.temp_path / logger.BSON_FILENAME) out_queue = multiprocessing.Queue() in_queue = multiprocessing.Queue() # This needs to be in a separate multiprocessing.Process # since flock-based file locks are per-process, not per-thread. proc = multiprocessing.Process( target=_lock_holder, args=(bson_file, out_queue, in_queue)) proc.start() in_queue.get() # Wait for the lock to be acquired. with self.assertRaises(OSError): logger.BufferedLogger(str(self.temp_path), FakeWriter()) out_queue.put(None) # Notify that we're done. out_queue.close() proc.join() proc.close() # Test that it works after the lock is released. logger.BufferedLogger(str(self.temp_path), FakeWriter()).close() def test_upgrade_last_sent(self): for (timestamp, byte_count) in [ ('5', '0'), ('20', '52'), ('30', '78'), ]: bson_file = self.temp_path / logger.BSON_FILENAME with bson_file.open('wb') as outfile: outfile.write(logger.bson_encode(dict(sample_time=ts(10)))) outfile.write(logger.bson_encode(dict(sample_time=ts(20)))) outfile.write(logger.bson_encode(dict(sample_time=ts(30)))) outfile.write(b'some bogus data') last_sent = self.temp_path / logger.OLD_LAST_TS with last_sent.open('w') as outfile: outfile.write(timestamp) start_byte = self.temp_path / logger.START_BYTE with bson_file.open('r+b') as infile: logger._read_unsent_and_upgrade( infile, last_sent, start_byte) self.assertFalse(last_sent.exists()) with start_byte.open('r') as infile: self.assertEqual(infile.read(), byte_count) def test_upgrade_last_sent_no_last_sent(self): bson_file = self.temp_path / logger.BSON_FILENAME with bson_file.open('wb') as outfile: outfile.write(logger.bson_encode(dict(sample_time=ts(10)))) outfile.write(logger.bson_encode(dict(sample_time=ts(20)))) outfile.write(logger.bson_encode(dict(sample_time=ts(30)))) last_sent = self.temp_path / logger.OLD_LAST_TS start_byte = self.temp_path / logger.START_BYTE with start_byte.open('w') as outfile: outfile.write('untouched') with bson_file.open('r+b') as infile: logger._read_unsent_and_upgrade( infile, last_sent, start_byte) self.assertFalse(last_sent.exists()) with start_byte.open('r') as infile: self.assertEqual(infile.read(), 'untouched') def _read_last_sent(self): with (self.temp_path / logger.START_BYTE).open('r') as infile: return infile.read() def _read_bsons(self): with (self.temp_path / logger.BSON_FILENAME).open('rb') as infile: return bson.decode_all(infile.read(), logger.BSON_OPTIONS) def _lock_holder(path, in_queue, out_queue): with open(path, 'w') as infile: fcntl.flock(infile, fcntl.LOCK_SH) out_queue.put(None) # Notify that we've acquired the lock. out_queue.close() in_queue.get() # Wait for the test to complete before closing. if __name__ == '__main__': multiprocessing.set_start_method('spawn') unittest.main()