Mercurial > personal > weather-server
view weather_server/logfile_test.py @ 34:8d3f32455575
Bump version.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Sat, 12 Jun 2021 20:24:30 +0000 |
parents | 9bc3687e1e5e |
children |
line wrap: on
line source
"""Unit tests for ``logfile.Logger``."""

# NOTE(review): this module was restored from a listing whose newlines and
# indentation had been flattened.  Where the listing was ambiguous, checks
# of ``logger.cached_data`` were kept inside the ``contextlib.closing``
# block and on-disk checks were placed after it -- confirm against the
# repository copy.

import contextlib
import datetime
import os.path
import pathlib
import tempfile
import unittest

import bson
import pytz

from . import common
from . import logfile


def ts(n):
    """Return the timezone-aware UTC datetime for POSIX timestamp ``n``."""
    # datetime.utcfromtimestamp() is deprecated since Python 3.12; passing
    # the zone to fromtimestamp() builds the same aware value directly.
    return datetime.datetime.fromtimestamp(n, tz=pytz.UTC)


class LoggerTest(unittest.TestCase):
    """Exercises ``logfile.Logger`` against files in a temp directory."""

    # Show complete diffs when the large expected structures don't match.
    maxDiff = None

    def setUp(self):
        """Create a scratch directory and the log/index paths used below."""
        super().setUp()
        self.temp_dir = tempfile.TemporaryDirectory()
        self.log_path = pathlib.Path(self.temp_dir.name) / 'test.bson'
        self.index_path = pathlib.Path(str(self.log_path) + '.index.bson')

    def tearDown(self):
        """Remove the scratch directory."""
        self.temp_dir.cleanup()
        super().tearDown()

    def test_empty(self):
        """A logger on a not-yet-existing file has empty cached data."""
        lg = logfile.Logger(
            str(self.log_path),
            sample_field='x')
        with contextlib.closing(lg) as logger:
            self.assertEqual(logger.cached_data, ())

    def test_fails_to_open(self):
        """Constructing a logger in a missing directory raises OSError."""
        with self.assertRaises(OSError):
            logfile.Logger(
                os.path.join(
                    self.temp_dir.name,
                    'nonexistent-directory',
                    'bogus-filename'),
                sample_field='unimportant')

    def test_del(self):
        """Dropping the only reference to an unclosed logger must not raise."""
        lg = logfile.Logger(
            str(self.log_path),
            sample_field='x')
        del lg

    def test_loading(self):
        """Existing entries are loaded; trailing garbage bytes are skipped."""
        with self.log_path.open('wb') as outfile:
            outfile.write(common.bson_encode(dict(
                sample_time=ts(123),
                temp_c=420,
                rh_pct=69,
                ingest_time=ts(125),
            )))
            outfile.write(b'garbage to ignore')
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='sample_time',
        )) as logger:
            self.assertEqual(
                logger.cached_data,
                (
                    dict(
                        sample_time=ts(123),
                        temp_c=420,
                        rh_pct=69,
                        ingest_time=ts(125)),
                ))

    def test_writing(self):
        """Only rows newer than the newest existing entry are appended."""
        with self.log_path.open('wb') as outfile:
            outfile.write(common.bson_encode(dict(
                sample_time=ts(123),
                temp_c=420,
                rh_pct=69,
                ingest_time=ts(125),
            )))
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='sample_time',
            cached_entries=4,
            index_gap=2,
        )) as logger:
            logger.write_rows([
                # Ignored, since it's older than the newest entry.
                dict(
                    sample_time=ts(100),
                    temp_c=999,
                    rh_pct=666,
                    ingest_time=ts(101),
                ),
                dict(
                    sample_time=ts(125),
                    temp_c=333,
                    rh_pct=777,
                    ingest_time=ts(200),
                ),
            ])
            self.assertEqual(
                logger.cached_data,
                (
                    dict(
                        sample_time=ts(123),
                        temp_c=420,
                        rh_pct=69,
                        ingest_time=ts(125),
                    ),
                    dict(
                        sample_time=ts(125),
                        temp_c=333,
                        rh_pct=777,
                        ingest_time=ts(200),
                    ),
                )
            )

        self.assertEqual(self.read_bsons(), [
            dict(
                sample_time=ts(123),
                temp_c=420,
                rh_pct=69,
                ingest_time=ts(125),
            ),
            dict(
                sample_time=ts(125),
                temp_c=333,
                rh_pct=777,
                ingest_time=ts(200),
            ),
        ])
        self.assertEqual(self.read_index(), [
            dict(entry_count=0, entry_key=ts(123), byte=0),
        ])

    def test_outside_writes(self):
        """An entry appended behind the logger's back is picked up."""
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='sample_time',
            index_gap=2,
        )) as logger:
            logger.write_rows([
                dict(
                    sample_time=ts(100),
                    temp_c=999,
                    rh_pct=666,
                    ingest_time=ts(101),
                ),
                dict(
                    sample_time=ts(125),
                    temp_c=333,
                    rh_pct=777,
                    ingest_time=ts(200),
                ),
            ])
            with self.log_path.open('ab') as outfile:
                outfile.write(common.bson_encode(dict(
                    sample_time=ts(1024),
                    temp_c=256,
                    rh_pct=128,
                    ingest_time=ts(4096),
                )))
                outfile.flush()
            # Empty write to update the index.
            logger.write_rows([])
            self.assertEqual(logger.cached_data, (
                dict(
                    sample_time=ts(100),
                    temp_c=999,
                    rh_pct=666,
                    ingest_time=ts(101),
                ),
                dict(
                    sample_time=ts(125),
                    temp_c=333,
                    rh_pct=777,
                    ingest_time=ts(200),
                ),
                dict(
                    sample_time=ts(1024),
                    temp_c=256,
                    rh_pct=128,
                    ingest_time=ts(4096),
                ),
            ))

        self.assertEqual(self.read_bsons(), [
            dict(
                sample_time=ts(100),
                temp_c=999,
                rh_pct=666,
                ingest_time=ts(101),
            ),
            dict(
                sample_time=ts(125),
                temp_c=333,
                rh_pct=777,
                ingest_time=ts(200),
            ),
            dict(
                sample_time=ts(1024),
                temp_c=256,
                rh_pct=128,
                ingest_time=ts(4096),
            ),
        ])
        self.assertEqual(self.read_index(), [
            dict(entry_count=0, entry_key=ts(100), byte=0),
            dict(entry_count=2, entry_key=ts(1024), byte=142),
        ])

    def test_lots_of_outside_writes(self):
        """A second logger's writes are cached and indexed by the first."""
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='key',
            cached_entries=4,
            index_gap=2,
        )) as logger:
            logger.write_rows(make_messages(10))
            with contextlib.closing(logfile.Logger(
                str(self.log_path),
                sample_field='key',
                cached_entries=4,
                index_gap=2,
            )) as other_logger:
                other_logger.write_rows(make_messages(10, 10))
            logger.write_rows(make_messages(1, 20))
            self.assertEqual(logger.cached_data, (
                dict(key='0011', value=17),
                dict(key='0012', value=18),
                dict(key='0013', value=19),
                dict(key='0014', value=20),
            ))

        self.assertEqual(self.read_bsons(), make_messages(21))
        self.assertEqual(self.read_index(), [
            dict(entry_count=0, entry_key='0000', byte=0),
            dict(entry_count=2, entry_key='0002', byte=60),
            dict(entry_count=4, entry_key='0004', byte=120),
            dict(entry_count=6, entry_key='0006', byte=180),
            dict(entry_count=8, entry_key='0008', byte=240),
            dict(entry_count=10, entry_key='000a', byte=300),
            dict(entry_count=12, entry_key='000c', byte=360),
            dict(entry_count=14, entry_key='000e', byte=420),
            dict(entry_count=16, entry_key='0010', byte=480),
            dict(entry_count=18, entry_key='0012', byte=540),
            dict(entry_count=20, entry_key='0014', byte=600),
        ])

    def test_outside_writes_that_dont_update_index(self):
        """Reading cached data sees outside appends; the index is untouched."""
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='key',
            cached_entries=4,
            index_gap=2,
        )) as logger:
            logger.write_rows(make_messages(10))
            with self.log_path.open('ab') as outfile:
                for item in make_messages(16, 256):
                    outfile.write(common.bson_encode(item))
            self.assertEqual(logger.cached_data, (
                dict(key='010c', value=268),
                dict(key='010d', value=269),
                dict(key='010e', value=270),
                dict(key='010f', value=271),
            ))

        self.assertEqual(self.read_bsons(), [
            dict(key='0000', value=0),
            dict(key='0001', value=1),
            dict(key='0002', value=2),
            dict(key='0003', value=3),
            dict(key='0004', value=4),
            dict(key='0005', value=5),
            dict(key='0006', value=6),
            dict(key='0007', value=7),
            dict(key='0008', value=8),
            dict(key='0009', value=9),
            dict(key='0100', value=256),
            dict(key='0101', value=257),
            dict(key='0102', value=258),
            dict(key='0103', value=259),
            dict(key='0104', value=260),
            dict(key='0105', value=261),
            dict(key='0106', value=262),
            dict(key='0107', value=263),
            dict(key='0108', value=264),
            dict(key='0109', value=265),
            dict(key='010a', value=266),
            dict(key='010b', value=267),
            dict(key='010c', value=268),
            dict(key='010d', value=269),
            dict(key='010e', value=270),
            dict(key='010f', value=271),
        ])
        self.assertEqual(self.read_index(), [
            dict(entry_count=0, entry_key='0000', byte=0),
            dict(entry_count=2, entry_key='0002', byte=60),
            dict(entry_count=4, entry_key='0004', byte=120),
            dict(entry_count=6, entry_key='0006', byte=180),
            dict(entry_count=8, entry_key='0008', byte=240),
        ])

    def test_outside_write_but_dont_skip(self):
        """Outside writes that fit in the cache keep the index contiguous."""
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='key',
            cached_entries=10,
            index_gap=4,
        )) as logger:
            logger.write_rows(make_messages(10))
            with contextlib.closing(logfile.Logger(
                str(self.log_path),
                sample_field='key',
                cached_entries=10,
                index_gap=4,
            )) as other_logger:
                other_logger.write_rows(make_messages(5, 10))
            self.assertEqual(logger.cached_data, (
                dict(key='0005', value=5),
                dict(key='0006', value=6),
                dict(key='0007', value=7),
                dict(key='0008', value=8),
                dict(key='0009', value=9),
                dict(key='000a', value=10),
                dict(key='000b', value=11),
                dict(key='000c', value=12),
                dict(key='000d', value=13),
                dict(key='000e', value=14),
            ))
            logger.write_rows(make_messages(5, 15))
            self.assertEqual(self.read_index(), [
                dict(entry_count=0, entry_key='0000', byte=0),
                dict(entry_count=4, entry_key='0004', byte=120),
                dict(entry_count=8, entry_key='0008', byte=240),
                dict(entry_count=12, entry_key='000c', byte=360),
                dict(entry_count=16, entry_key='0010', byte=480),
            ])
            logger.write_rows(make_messages(1, 20))
            self.assertEqual(self.read_index(), [
                dict(entry_count=0, entry_key='0000', byte=0),
                dict(entry_count=4, entry_key='0004', byte=120),
                dict(entry_count=8, entry_key='0008', byte=240),
                dict(entry_count=12, entry_key='000c', byte=360),
                dict(entry_count=16, entry_key='0010', byte=480),
                dict(entry_count=20, entry_key='0014', byte=600),
            ])

    def test_start_and_skip(self):
        """A logger opened on an existing log caches only the last entries."""
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='key',
            cached_entries=4,
            index_gap=2,
        )) as logger:
            logger.write_rows(make_messages(10))
            with contextlib.closing(logfile.Logger(
                str(self.log_path),
                sample_field='key',
                cached_entries=4,
                index_gap=2,
            )) as other_logger:
                self.assertEqual(
                    other_logger.cached_data,
                    tuple(make_messages(4, 6)))

    def read_bsons(self):
        """Decode and return every BSON document in the log file."""
        with self.log_path.open('rb') as infile:
            return bson.decode_all(
                infile.read(), common.BSON_OPTIONS)

    def read_index(self):
        """Decode and return every BSON document in the index file."""
        with self.index_path.open('rb') as infile:
            return bson.decode_all(
                infile.read(), common.BSON_OPTIONS)


def make_messages(count, start=0):
    """Return ``count`` sample rows numbered from ``start``.

    Each row is ``dict(key=<n as 4-digit lowercase hex>, value=n)``.
    """
    return [
        dict(key=format(n, '04x'), value=n)
        for n in range(start, start + count)]


if __name__ == '__main__':
    unittest.main()