diff weather_server/logfile_test.py @ 31:9bc3687e1e5e

logfile: Add an index, and don't keep everything in RAM.

- Adds index BSON file, updated upon writing.
- Limits amount of data in RAM.
- Gracefully handles writes that don't update index.
author Paul Fisher <paul@pfish.zone>
date Tue, 07 Jul 2020 19:51:30 -0400
parents 20c8ec56e447
children b77c8e7d2742
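
The tests below pin down the index layout: the index file sits next to the log
(see self.index_path) and holds BSON documents with entry_count (record
number), entry_key (value of the sample field), and byte (offset of that
record in the log), one entry roughly every index_gap records. A minimal
sketch of how a reader could use that layout to avoid decoding the whole log,
assuming pymongo's bson module is available; the function name
read_from_last_index_point is illustrative and not part of the module's API:

import bson


def read_from_last_index_point(log_path, index_path):
    """Decode only the records at or after the last indexed byte offset.

    Sketch based on the index entries the tests expect; the real code
    would pass the project's codec options (common.BSON_OPTIONS) to
    decode_all, omitted here to keep the example self-contained.
    """
    with open(index_path, 'rb') as infile:
        index = bson.decode_all(infile.read())
    # Each index entry records the byte offset of one log record, so
    # seeking to the last entry leaves at most ~index_gap records to decode.
    start = index[-1]['byte'] if index else 0
    with open(log_path, 'rb') as infile:
        infile.seek(start)
        return bson.decode_all(infile.read())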
--- a/weather_server/logfile_test.py	Tue May 19 10:15:29 2020 -0400
+++ b/weather_server/logfile_test.py	Tue Jul 07 19:51:30 2020 -0400
@@ -3,7 +3,6 @@
 import os.path
 import pathlib
 import tempfile
-import threading
 import unittest
 
 import bson
@@ -25,6 +24,7 @@
         super().setUp()
         self.temp_dir = tempfile.TemporaryDirectory()
         self.log_path = pathlib.Path(self.temp_dir.name) / 'test.bson'
+        self.index_path = pathlib.Path(str(self.log_path) + '.index.bson')
 
     def tearDown(self):
         self.temp_dir.cleanup()
@@ -34,7 +34,7 @@
         lg = logfile.Logger(
             str(self.log_path), sample_field='x')
         with contextlib.closing(lg) as logger:
-            self.assertEqual(logger.data, ())
+            self.assertEqual(logger.cached_data, ())
 
     def test_fails_to_open(self):
         with self.assertRaises(OSError):
@@ -64,7 +64,7 @@
             sample_field='sample_time',
         )) as logger:
             self.assertEqual(
-                logger.data,
+                logger.cached_data,
                 (
                     dict(
                         sample_time=ts(123),
@@ -84,6 +84,8 @@
         with contextlib.closing(logfile.Logger(
             str(self.log_path),
             sample_field='sample_time',
+            cached_entries=4,
+            index_gap=2,
         )) as logger:
             logger.write_rows([
                 # Ignored, since it's older than the newest entry.
@@ -101,7 +103,7 @@
                 ),
             ])
             self.assertEqual(
-                logger.data,
+                logger.cached_data,
                 (
                     dict(
                         sample_time=ts(123),
@@ -132,11 +134,15 @@
                 ingest_time=ts(200),
             ),
         ])
+        self.assertEqual(self.read_index(), [
+            dict(entry_count=0, entry_key=ts(123), byte=0),
+        ])
 
     def test_outside_writes(self):
         with contextlib.closing(logfile.Logger(
             str(self.log_path),
             sample_field='sample_time',
+            index_gap=2,
         )) as logger:
             logger.write_rows([
                 dict(
@@ -160,7 +166,9 @@
                     ingest_time=ts(4096),
                 )))
                 outfile.flush()
-            self.assertEqual(logger.data, (
+            # Empty write to update the index.
+            logger.write_rows([])
+            self.assertEqual(logger.cached_data, (
                 dict(
                     sample_time=ts(100),
                     temp_c=999,
@@ -180,12 +188,199 @@
                     ingest_time=ts(4096),
                 ),
             ))
+        self.assertEqual(self.read_bsons(), [
+            dict(
+                sample_time=ts(100),
+                temp_c=999,
+                rh_pct=666,
+                ingest_time=ts(101),
+            ),
+            dict(
+                sample_time=ts(125),
+                temp_c=333,
+                rh_pct=777,
+                ingest_time=ts(200),
+            ),
+            dict(
+                sample_time=ts(1024),
+                temp_c=256,
+                rh_pct=128,
+                ingest_time=ts(4096),
+            ),
+        ])
+        self.assertEqual(self.read_index(), [
+            dict(entry_count=0, entry_key=ts(100), byte=0),
+            dict(entry_count=2, entry_key=ts(1024), byte=142),
+        ])
+
+    def test_lots_of_outside_writes(self):
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='key',
+            cached_entries=4,
+            index_gap=2,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with contextlib.closing(logfile.Logger(
+                str(self.log_path),
+                sample_field='key',
+                cached_entries=4,
+                index_gap=2,
+            )) as other_logger:
+                other_logger.write_rows(make_messages(10, 10))
+                logger.write_rows(make_messages(1, 20))
+                self.assertEqual(logger.cached_data, (
+                    dict(key='0011', value=17),
+                    dict(key='0012', value=18),
+                    dict(key='0013', value=19),
+                    dict(key='0014', value=20),
+                ))
+                self.assertEqual(self.read_bsons(), make_messages(21))
+                self.assertEqual(self.read_index(), [
+                    dict(entry_count=0, entry_key='0000', byte=0),
+                    dict(entry_count=2, entry_key='0002', byte=60),
+                    dict(entry_count=4, entry_key='0004', byte=120),
+                    dict(entry_count=6, entry_key='0006', byte=180),
+                    dict(entry_count=8, entry_key='0008', byte=240),
+                    dict(entry_count=10, entry_key='000a', byte=300),
+                    dict(entry_count=12, entry_key='000c', byte=360),
+                    dict(entry_count=14, entry_key='000e', byte=420),
+                    dict(entry_count=16, entry_key='0010', byte=480),
+                    dict(entry_count=18, entry_key='0012', byte=540),
+                    dict(entry_count=20, entry_key='0014', byte=600),
+                ])
+
+    def test_outside_writes_that_dont_update_index(self):
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='key',
+            cached_entries=4,
+            index_gap=2,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with self.log_path.open('ab') as outfile:
+                for item in make_messages(16, 256):
+                    outfile.write(common.bson_encode(item))
+            self.assertEqual(logger.cached_data, (
+                dict(key='010c', value=268),
+                dict(key='010d', value=269),
+                dict(key='010e', value=270),
+                dict(key='010f', value=271),
+            ))
+        self.assertEqual(self.read_bsons(), [
+            dict(key='0000', value=0),
+            dict(key='0001', value=1),
+            dict(key='0002', value=2),
+            dict(key='0003', value=3),
+            dict(key='0004', value=4),
+            dict(key='0005', value=5),
+            dict(key='0006', value=6),
+            dict(key='0007', value=7),
+            dict(key='0008', value=8),
+            dict(key='0009', value=9),
+            dict(key='0100', value=256),
+            dict(key='0101', value=257),
+            dict(key='0102', value=258),
+            dict(key='0103', value=259),
+            dict(key='0104', value=260),
+            dict(key='0105', value=261),
+            dict(key='0106', value=262),
+            dict(key='0107', value=263),
+            dict(key='0108', value=264),
+            dict(key='0109', value=265),
+            dict(key='010a', value=266),
+            dict(key='010b', value=267),
+            dict(key='010c', value=268),
+            dict(key='010d', value=269),
+            dict(key='010e', value=270),
+            dict(key='010f', value=271),
+        ])
+        self.assertEqual(self.read_index(), [
+            dict(entry_count=0, entry_key='0000', byte=0),
+            dict(entry_count=2, entry_key='0002', byte=60),
+            dict(entry_count=4, entry_key='0004', byte=120),
+            dict(entry_count=6, entry_key='0006', byte=180),
+            dict(entry_count=8, entry_key='0008', byte=240),
+        ])
+
+    def test_outside_write_but_dont_skip(self):
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='key',
+            cached_entries=10,
+            index_gap=4,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with contextlib.closing(logfile.Logger(
+                str(self.log_path),
+                sample_field='key',
+                cached_entries=10,
+                index_gap=4,
+            )) as other_logger:
+                other_logger.write_rows(make_messages(5, 10))
+                self.assertEqual(logger.cached_data, (
+                    dict(key='0005', value=5),
+                    dict(key='0006', value=6),
+                    dict(key='0007', value=7),
+                    dict(key='0008', value=8),
+                    dict(key='0009', value=9),
+                    dict(key='000a', value=10),
+                    dict(key='000b', value=11),
+                    dict(key='000c', value=12),
+                    dict(key='000d', value=13),
+                    dict(key='000e', value=14),
+                ))
+                logger.write_rows(make_messages(5, 15))
+                self.assertEqual(self.read_index(), [
+                    dict(entry_count=0, entry_key='0000', byte=0),
+                    dict(entry_count=4, entry_key='0004', byte=120),
+                    dict(entry_count=8, entry_key='0008', byte=240),
+                    dict(entry_count=12, entry_key='000c', byte=360),
+                    dict(entry_count=16, entry_key='0010', byte=480),
+                ])
+                logger.write_rows(make_messages(1, 20))
+                self.assertEqual(self.read_index(), [
+                    dict(entry_count=0, entry_key='0000', byte=0),
+                    dict(entry_count=4, entry_key='0004', byte=120),
+                    dict(entry_count=8, entry_key='0008', byte=240),
+                    dict(entry_count=12, entry_key='000c', byte=360),
+                    dict(entry_count=16, entry_key='0010', byte=480),
+                    dict(entry_count=20, entry_key='0014', byte=600),
+                ])
+
+    def test_start_and_skip(self):
+        with contextlib.closing(logfile.Logger(
+            str(self.log_path),
+            sample_field='key',
+            cached_entries=4,
+            index_gap=2,
+        )) as logger:
+            logger.write_rows(make_messages(10))
+            with contextlib.closing(logfile.Logger(
+                str(self.log_path),
+                sample_field='key',
+                cached_entries=4,
+                index_gap=2,
+            )) as other_logger:
+                self.assertEqual(
+                    other_logger.cached_data, tuple(make_messages(4, 6)))
 
     def read_bsons(self):
         with self.log_path.open('rb') as infile:
             return bson.decode_all(
                 infile.read(), common.BSON_OPTIONS)
 
+    def read_index(self):
+        with self.index_path.open('rb') as infile:
+            return bson.decode_all(
+                infile.read(), common.BSON_OPTIONS)
+
+
+def make_messages(count, start=0):
+    return [
+        dict(key=format(n, '04x'), value=n)
+        for n in range(start, start + count)]
+
 
 if __name__ == '__main__':
     unittest.main()
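
The cached_entries expectations above (for instance, test_start_and_skip sees
only make_messages(4, 6) in cached_data after ten rows exist) amount to
keeping only the newest N rows in memory. A minimal sketch of that bound using
a deque with maxlen; the name BoundedCache and its interface are illustrative,
not the Logger's actual implementation:

import collections


class BoundedCache:
    """Keep only the newest rows in memory, dropping older ones."""

    def __init__(self, cached_entries):
        # maxlen makes the deque discard the oldest row on overflow.
        self._rows = collections.deque(maxlen=cached_entries)

    def extend(self, rows):
        self._rows.extend(rows)

    @property
    def cached_data(self):
        return tuple(self._rows)

With cached_entries=4, extending the cache with make_messages(10) leaves only
the rows with keys 0006 through 0009, matching the tuples the tests compare
against cached_data.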