diff weather_server/logfile_test.py @ 31:9bc3687e1e5e
logfile: Add an index, and don't keep everything in RAM.
- Adds index BSON file, updated upon writing.
- Limits amount of data in RAM.
- Gracefully handles writes that don't update index.
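The expectations in the updated tests pin down the shape of that index: a sidecar `.index.bson` file next to the log, containing one small BSON document for every `index_gap` log entries, each recording an `entry_count` (how many entries precede it), an `entry_key` (the sample key at that position), and a `byte` offset into the log file. A minimal sketch of how a reader could use such records to skip ahead instead of scanning the whole log; the field names come from the test expectations below, but the helper itself is illustrative, not the actual `Logger` code:

```python
# Illustrative only: the record fields (entry_count, entry_key, byte) match
# the test expectations in this diff; this seek helper is an assumption about
# how the index is meant to be used, not code from weather_server.

def last_offset_at_or_before(index_records, wanted_count):
    """Return the byte offset of the newest indexed entry whose
    entry_count is <= wanted_count, so reading can start there
    instead of at byte 0."""
    offset = 0
    for record in index_records:
        if record['entry_count'] > wanted_count:
            break
        offset = record['byte']
    return offset


# With index records every 2 entries (index_gap=2):
index = [
    dict(entry_count=0, entry_key='0000', byte=0),
    dict(entry_count=2, entry_key='0002', byte=60),
    dict(entry_count=4, entry_key='0004', byte=120),
]
# To start reading at entry 3, seek to the record for entry 2 (byte 60).
assert last_offset_at_or_before(index, 3) == 60
```

Skipping like this is presumably what lets the logger keep only the newest `cached_entries` rows in memory rather than the whole file.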
| field | value |
|---|---|
| author | Paul Fisher <paul@pfish.zone> |
| date | Tue, 07 Jul 2020 19:51:30 -0400 |
| parents | 20c8ec56e447 |
| children | b77c8e7d2742 |
```diff
--- a/weather_server/logfile_test.py	Tue May 19 10:15:29 2020 -0400
+++ b/weather_server/logfile_test.py	Tue Jul 07 19:51:30 2020 -0400
@@ -3,7 +3,6 @@
 import os.path
 import pathlib
 import tempfile
-import threading
 import unittest
 
 import bson
@@ -25,6 +24,7 @@
         super().setUp()
         self.temp_dir = tempfile.TemporaryDirectory()
         self.log_path = pathlib.Path(self.temp_dir.name) / 'test.bson'
+        self.index_path = pathlib.Path(str(self.log_path) + '.index.bson')
 
     def tearDown(self):
         self.temp_dir.cleanup()
@@ -34,7 +34,7 @@
         lg = logfile.Logger(
             str(self.log_path), sample_field='x')
         with contextlib.closing(lg) as logger:
-            self.assertEqual(logger.data, ())
+            self.assertEqual(logger.cached_data, ())
 
     def test_fails_to_open(self):
         with self.assertRaises(OSError):
@@ -64,7 +64,7 @@
                 sample_field='sample_time',
         )) as logger:
             self.assertEqual(
-                logger.data,
+                logger.cached_data,
                 (
                     dict(
                         sample_time=ts(123),
@@ -84,6 +84,8 @@
         with contextlib.closing(logfile.Logger(
                 str(self.log_path),
                 sample_field='sample_time',
+                cached_entries=4,
+                index_gap=2,
         )) as logger:
             logger.write_rows([
                 # Ignored, since it's older than the newest entry.
@@ -101,7 +103,7 @@
                 ),
             ])
             self.assertEqual(
-                logger.data,
+                logger.cached_data,
                 (
                     dict(
                         sample_time=ts(123),
@@ -132,11 +134,15 @@
                     ingest_time=ts(200),
                 ),
             ])
+            self.assertEqual(self.read_index(), [
+                dict(entry_count=0, entry_key=ts(123), byte=0),
+            ])
 
     def test_outside_writes(self):
         with contextlib.closing(logfile.Logger(
                 str(self.log_path),
                 sample_field='sample_time',
+                index_gap=2,
         )) as logger:
             logger.write_rows([
                 dict(
@@ -160,7 +166,9 @@
                     ingest_time=ts(4096),
                 )))
                 outfile.flush()
-            self.assertEqual(logger.data, (
+            # Empty write to update the index.
+            logger.write_rows([])
+            self.assertEqual(logger.cached_data, (
                 dict(
                     sample_time=ts(100),
                     temp_c=999,
@@ -180,12 +188,199 @@
                     ingest_time=ts(4096),
                 ),
             ))
+            self.assertEqual(self.read_bsons(), [
+                dict(
+                    sample_time=ts(100),
+                    temp_c=999,
+                    rh_pct=666,
+                    ingest_time=ts(101),
+                ),
+                dict(
+                    sample_time=ts(125),
+                    temp_c=333,
+                    rh_pct=777,
+                    ingest_time=ts(200),
+                ),
+                dict(
+                    sample_time=ts(1024),
+                    temp_c=256,
+                    rh_pct=128,
+                    ingest_time=ts(4096),
+                ),
+            ])
+            self.assertEqual(self.read_index(), [
+                dict(entry_count=0, entry_key=ts(100), byte=0),
+                dict(entry_count=2, entry_key=ts(1024), byte=142),
+            ])
+
+    def test_lots_of_outside_writes(self):
+        with contextlib.closing(logfile.Logger(
+                str(self.log_path),
+                sample_field='key',
+                cached_entries=4,
+                index_gap=2,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with contextlib.closing(logfile.Logger(
+                    str(self.log_path),
+                    sample_field='key',
+                    cached_entries=4,
+                    index_gap=2,
+            )) as other_logger:
+                other_logger.write_rows(make_messages(10, 10))
+            logger.write_rows(make_messages(1, 20))
+            self.assertEqual(logger.cached_data, (
+                dict(key='0011', value=17),
+                dict(key='0012', value=18),
+                dict(key='0013', value=19),
+                dict(key='0014', value=20),
+            ))
+            self.assertEqual(self.read_bsons(), make_messages(21))
+            self.assertEqual(self.read_index(), [
+                dict(entry_count=0, entry_key='0000', byte=0),
+                dict(entry_count=2, entry_key='0002', byte=60),
+                dict(entry_count=4, entry_key='0004', byte=120),
+                dict(entry_count=6, entry_key='0006', byte=180),
+                dict(entry_count=8, entry_key='0008', byte=240),
+                dict(entry_count=10, entry_key='000a', byte=300),
+                dict(entry_count=12, entry_key='000c', byte=360),
+                dict(entry_count=14, entry_key='000e', byte=420),
+                dict(entry_count=16, entry_key='0010', byte=480),
+                dict(entry_count=18, entry_key='0012', byte=540),
+                dict(entry_count=20, entry_key='0014', byte=600),
+            ])
+
+    def test_outside_writes_that_dont_update_index(self):
+        with contextlib.closing(logfile.Logger(
+                str(self.log_path),
+                sample_field='key',
+                cached_entries=4,
+                index_gap=2,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with self.log_path.open('ab') as outfile:
+                for item in make_messages(16, 256):
+                    outfile.write(common.bson_encode(item))
+            self.assertEqual(logger.cached_data, (
+                dict(key='010c', value=268),
+                dict(key='010d', value=269),
+                dict(key='010e', value=270),
+                dict(key='010f', value=271),
+            ))
+            self.assertEqual(self.read_bsons(), [
+                dict(key='0000', value=0),
+                dict(key='0001', value=1),
+                dict(key='0002', value=2),
+                dict(key='0003', value=3),
+                dict(key='0004', value=4),
+                dict(key='0005', value=5),
+                dict(key='0006', value=6),
+                dict(key='0007', value=7),
+                dict(key='0008', value=8),
+                dict(key='0009', value=9),
+                dict(key='0100', value=256),
+                dict(key='0101', value=257),
+                dict(key='0102', value=258),
+                dict(key='0103', value=259),
+                dict(key='0104', value=260),
+                dict(key='0105', value=261),
+                dict(key='0106', value=262),
+                dict(key='0107', value=263),
+                dict(key='0108', value=264),
+                dict(key='0109', value=265),
+                dict(key='010a', value=266),
+                dict(key='010b', value=267),
+                dict(key='010c', value=268),
+                dict(key='010d', value=269),
+                dict(key='010e', value=270),
+                dict(key='010f', value=271),
+            ])
+            self.assertEqual(self.read_index(), [
+                dict(entry_count=0, entry_key='0000', byte=0),
+                dict(entry_count=2, entry_key='0002', byte=60),
+                dict(entry_count=4, entry_key='0004', byte=120),
+                dict(entry_count=6, entry_key='0006', byte=180),
+                dict(entry_count=8, entry_key='0008', byte=240),
+            ])
+
+    def test_outside_write_but_dont_skip(self):
+        with contextlib.closing(logfile.Logger(
+                str(self.log_path),
+                sample_field='key',
+                cached_entries=10,
+                index_gap=4,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with contextlib.closing(logfile.Logger(
+                    str(self.log_path),
+                    sample_field='key',
+                    cached_entries=10,
+                    index_gap=4,
+            )) as other_logger:
+                other_logger.write_rows(make_messages(5, 10))
+            self.assertEqual(logger.cached_data, (
+                dict(key='0005', value=5),
+                dict(key='0006', value=6),
+                dict(key='0007', value=7),
+                dict(key='0008', value=8),
+                dict(key='0009', value=9),
+                dict(key='000a', value=10),
+                dict(key='000b', value=11),
+                dict(key='000c', value=12),
+                dict(key='000d', value=13),
+                dict(key='000e', value=14),
+            ))
+            logger.write_rows(make_messages(5, 15))
+            self.assertEqual(self.read_index(), [
+                dict(entry_count=0, entry_key='0000', byte=0),
+                dict(entry_count=4, entry_key='0004', byte=120),
+                dict(entry_count=8, entry_key='0008', byte=240),
+                dict(entry_count=12, entry_key='000c', byte=360),
+                dict(entry_count=16, entry_key='0010', byte=480),
+            ])
+            logger.write_rows(make_messages(1, 20))
+            self.assertEqual(self.read_index(), [
+                dict(entry_count=0, entry_key='0000', byte=0),
+                dict(entry_count=4, entry_key='0004', byte=120),
+                dict(entry_count=8, entry_key='0008', byte=240),
+                dict(entry_count=12, entry_key='000c', byte=360),
+                dict(entry_count=16, entry_key='0010', byte=480),
+                dict(entry_count=20, entry_key='0014', byte=600),
+            ])
+
+    def test_start_and_skip(self):
+        with contextlib.closing(logfile.Logger(
+                str(self.log_path),
+                sample_field='key',
+                cached_entries=4,
+                index_gap=2,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with contextlib.closing(logfile.Logger(
+                    str(self.log_path),
+                    sample_field='key',
+                    cached_entries=4,
+                    index_gap=2,
+            )) as other_logger:
+                self.assertEqual(
+                    other_logger.cached_data, tuple(make_messages(4, 6)))
 
     def read_bsons(self):
         with self.log_path.open('rb') as infile:
             return bson.decode_all(
                 infile.read(), common.BSON_OPTIONS)
 
+    def read_index(self):
+        with self.index_path.open('rb') as infile:
+            return bson.decode_all(
+                infile.read(), common.BSON_OPTIONS)
+
+
+def make_messages(count, start=0):
+    return [
+        dict(key=format(n, '04x'), value=n)
+        for n in range(start, start + count)]
+
 
 if __name__ == '__main__':
     unittest.main()
```
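One detail worth calling out in the expected index records: BSON documents are length-prefixed, so the `byte` offsets follow directly from the encoded size of each message. Every `make_messages` document encodes to exactly 30 bytes, which is why `entry_count=2` lands at byte 60 and `entry_count=20` at byte 600 (and why the four-field timestamped records in `test_outside_writes`, at 71 bytes each, put `entry_count=2` at byte 142). A standalone sanity check, assuming pymongo's `bson` package as the tests use:

```python
# Standalone check of the byte offsets asserted in the index tests.
# Assumes pymongo's bson package; make_messages mirrors the test helper.
import bson


def make_messages(count, start=0):
    return [
        dict(key=format(n, '04x'), value=n)
        for n in range(start, start + count)]


# Each message is a fixed-size BSON document: 30 bytes.
sizes = [len(bson.encode(m)) for m in make_messages(21)]
assert sizes == [30] * 21

# So the entry with entry_count=n begins at byte 30 * n, matching
# byte=60 for entry_count=2, byte=600 for entry_count=20, and so on.
assert 30 * 2 == 60 and 30 * 20 == 600
```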
