view weather_server/logfile_test.py @ 31:9bc3687e1e5e

logfile: Add an index, and don't keep everything in RAM.

- Adds an index BSON file, updated upon writing.
- Limits the amount of data kept in RAM.
- Gracefully handles writes that don't update the index.
author Paul Fisher <paul@pfish.zone>
date Tue, 07 Jul 2020 19:51:30 -0400
parents 20c8ec56e447
children
line wrap: on
line source

import contextlib
import datetime
import gc
import os.path
import pathlib
import sys
import tempfile
import unittest

import bson
import pytz

from . import common
from . import logfile


def ts(n):
    """Return the UTC-aware datetime ``n`` seconds after the Unix epoch.

    ``datetime.datetime.utcfromtimestamp`` is deprecated since Python 3.12;
    passing the zone straight to ``fromtimestamp`` produces the same aware
    value (tzinfo ``pytz.UTC``) without the deprecated call.
    """
    return datetime.datetime.fromtimestamp(n, tz=pytz.UTC)


class LoggerTest(unittest.TestCase):
    """Tests for logfile.Logger: loading, appending, caching and indexing.

    Each test gets a fresh temporary directory holding the BSON log file
    (``test.bson``) and the index file the Logger keeps next to it.
    """

    # Show full diffs on failure; the expected row/index lists are long.
    maxDiff = None

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.TemporaryDirectory()
        self.log_path = pathlib.Path(self.temp_dir.name) / 'test.bson'
        # The index file name is the full log file name plus a suffix
        # (test.bson -> test.bson.index.bson).
        self.index_path = pathlib.Path(str(self.log_path) + '.index.bson')

    def tearDown(self):
        self.temp_dir.cleanup()
        super().tearDown()

    def test_empty(self):
        """A Logger over a not-yet-existing file starts with no rows."""
        lg = logfile.Logger(
            str(self.log_path), sample_field='x')
        with contextlib.closing(lg) as logger:
            self.assertEqual(logger.cached_data, ())

    def test_fails_to_open(self):
        """A log path whose parent directory is missing raises OSError."""
        with self.assertRaises(OSError):
            logfile.Logger(
                os.path.join(
                    self.temp_dir.name,
                    'nonexistent-directory',
                    'bogus-filename'),
                sample_field='unimportant')

    def test_del(self):
        """Dropping an unclosed Logger must not raise during finalization.

        An exception raised while an object is being finalized never
        propagates to the code that dropped the reference, so a bare
        ``del`` could never fail this test. CPython 3.8+ hands such
        exceptions to ``sys.unraisablehook`` instead; capture them there
        and assert that none occurred.
        """
        unraisable = []
        saved_hook = getattr(sys, 'unraisablehook', None)
        sys.unraisablehook = unraisable.append
        try:
            lg = logfile.Logger(
                str(self.log_path), sample_field='x')
            del lg
            # Finalize promptly even if lg is part of a reference cycle.
            gc.collect()
        finally:
            if saved_hook is None:
                # Pre-3.8 interpreters have no hook; remove our attribute.
                del sys.unraisablehook
            else:
                sys.unraisablehook = saved_hook
        self.assertEqual([u.exc_value for u in unraisable], [])

    def test_loading(self):
        """Existing rows are loaded; trailing non-BSON garbage is ignored."""
        with self.log_path.open('wb') as outfile:
            outfile.write(common.bson_encode(dict(
                sample_time=ts(123),
                temp_c=420,
                rh_pct=69,
                ingest_time=ts(125),
            )))
            outfile.write(b'garbage to ignore')
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='sample_time',
        )) as logger:
            self.assertEqual(
                logger.cached_data,
                (
                    dict(
                        sample_time=ts(123),
                        temp_c=420,
                        rh_pct=69,
                        ingest_time=ts(125)),
                ))

    def test_writing(self):
        """write_rows drops rows older than the newest logged row.

        Newer rows are appended to the log and the cache, and the index
        gets its first entry.
        """
        with self.log_path.open('wb') as outfile:
            outfile.write(common.bson_encode(dict(
                sample_time=ts(123),
                temp_c=420,
                rh_pct=69,
                ingest_time=ts(125),
            )))
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='sample_time',
            cached_entries=4,
            index_gap=2,
        )) as logger:
            logger.write_rows([
                # Ignored, since it's older than the newest entry.
                dict(
                    sample_time=ts(100),
                    temp_c=999,
                    rh_pct=666,
                    ingest_time=ts(101),
                ),
                dict(
                    sample_time=ts(125),
                    temp_c=333,
                    rh_pct=777,
                    ingest_time=ts(200),
                ),
            ])
            self.assertEqual(
                logger.cached_data,
                (
                    dict(
                        sample_time=ts(123),
                        temp_c=420,
                        rh_pct=69,
                        ingest_time=ts(125),
                    ),
                    dict(
                        sample_time=ts(125),
                        temp_c=333,
                        rh_pct=777,
                        ingest_time=ts(200),
                    ),
                )
            )

        self.assertEqual(self.read_bsons(), [
            dict(
                sample_time=ts(123),
                temp_c=420,
                rh_pct=69,
                ingest_time=ts(125),
            ),
            dict(
                sample_time=ts(125),
                temp_c=333,
                rh_pct=777,
                ingest_time=ts(200),
            ),
        ])
        self.assertEqual(self.read_index(), [
            dict(entry_count=0, entry_key=ts(123), byte=0),
        ])

    def test_outside_writes(self):
        """Rows appended to the log behind the Logger's back are picked up.

        A later (even empty) write_rows call adds them to the index.
        """
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='sample_time',
            index_gap=2,
        )) as logger:
            logger.write_rows([
                dict(
                    sample_time=ts(100),
                    temp_c=999,
                    rh_pct=666,
                    ingest_time=ts(101),
                ),
                dict(
                    sample_time=ts(125),
                    temp_c=333,
                    rh_pct=777,
                    ingest_time=ts(200),
                ),
            ])
            with self.log_path.open('ab') as outfile:
                outfile.write(common.bson_encode(dict(
                    sample_time=ts(1024),
                    temp_c=256,
                    rh_pct=128,
                    ingest_time=ts(4096),
                )))
                outfile.flush()
            # Empty write to update the index.
            logger.write_rows([])
            self.assertEqual(logger.cached_data, (
                dict(
                    sample_time=ts(100),
                    temp_c=999,
                    rh_pct=666,
                    ingest_time=ts(101),
                ),
                dict(
                    sample_time=ts(125),
                    temp_c=333,
                    rh_pct=777,
                    ingest_time=ts(200),
                ),
                dict(
                    sample_time=ts(1024),
                    temp_c=256,
                    rh_pct=128,
                    ingest_time=ts(4096),
                ),
            ))
        self.assertEqual(self.read_bsons(), [
            dict(
                sample_time=ts(100),
                temp_c=999,
                rh_pct=666,
                ingest_time=ts(101),
            ),
            dict(
                sample_time=ts(125),
                temp_c=333,
                rh_pct=777,
                ingest_time=ts(200),
            ),
            dict(
                sample_time=ts(1024),
                temp_c=256,
                rh_pct=128,
                ingest_time=ts(4096),
            ),
        ])
        self.assertEqual(self.read_index(), [
            dict(entry_count=0, entry_key=ts(100), byte=0),
            dict(entry_count=2, entry_key=ts(1024), byte=142),
        ])

    def test_lots_of_outside_writes(self):
        """Two Loggers appending to one file see each other's rows.

        cached_data keeps only the newest ``cached_entries`` rows, and the
        index gets an entry every ``index_gap`` rows.
        """
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='key',
            cached_entries=4,
            index_gap=2,
        )) as logger:
            logger.write_rows(make_messages(10))
            with contextlib.closing(logfile.Logger(
                str(self.log_path),
                sample_field='key',
                cached_entries=4,
                index_gap=2,
            )) as other_logger:
                other_logger.write_rows(make_messages(10, 10))
                logger.write_rows(make_messages(1, 20))
                self.assertEqual(logger.cached_data, (
                    dict(key='0011', value=17),
                    dict(key='0012', value=18),
                    dict(key='0013', value=19),
                    dict(key='0014', value=20),
                ))
                self.assertEqual(self.read_bsons(), make_messages(21))
                # Each make_messages row encodes to 30 bytes of BSON, so
                # the row at entry_count=k starts at byte 30 * k.
                self.assertEqual(self.read_index(), [
                    dict(entry_count=0, entry_key='0000', byte=0),
                    dict(entry_count=2, entry_key='0002', byte=60),
                    dict(entry_count=4, entry_key='0004', byte=120),
                    dict(entry_count=6, entry_key='0006', byte=180),
                    dict(entry_count=8, entry_key='0008', byte=240),
                    dict(entry_count=10, entry_key='000a', byte=300),
                    dict(entry_count=12, entry_key='000c', byte=360),
                    dict(entry_count=14, entry_key='000e', byte=420),
                    dict(entry_count=16, entry_key='0010', byte=480),
                    dict(entry_count=18, entry_key='0012', byte=540),
                    dict(entry_count=20, entry_key='0014', byte=600),
                ])

    def test_outside_writes_that_dont_update_index(self):
        """Raw appends that bypass every Logger still show up in the cache.

        They are not added to the index: no write_rows call follows them.
        """
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='key',
            cached_entries=4,
            index_gap=2,
        )) as logger:
            logger.write_rows(make_messages(10))
            with self.log_path.open('ab') as outfile:
                for item in make_messages(16, 256):
                    outfile.write(common.bson_encode(item))
            self.assertEqual(logger.cached_data, (
                dict(key='010c', value=268),
                dict(key='010d', value=269),
                dict(key='010e', value=270),
                dict(key='010f', value=271),
            ))
        self.assertEqual(self.read_bsons(), [
            dict(key='0000', value=0),
            dict(key='0001', value=1),
            dict(key='0002', value=2),
            dict(key='0003', value=3),
            dict(key='0004', value=4),
            dict(key='0005', value=5),
            dict(key='0006', value=6),
            dict(key='0007', value=7),
            dict(key='0008', value=8),
            dict(key='0009', value=9),
            dict(key='0100', value=256),
            dict(key='0101', value=257),
            dict(key='0102', value=258),
            dict(key='0103', value=259),
            dict(key='0104', value=260),
            dict(key='0105', value=261),
            dict(key='0106', value=262),
            dict(key='0107', value=263),
            dict(key='0108', value=264),
            dict(key='0109', value=265),
            dict(key='010a', value=266),
            dict(key='010b', value=267),
            dict(key='010c', value=268),
            dict(key='010d', value=269),
            dict(key='010e', value=270),
            dict(key='010f', value=271),
        ])
        # Only the ten rows written through the Logger are indexed.
        self.assertEqual(self.read_index(), [
            dict(entry_count=0, entry_key='0000', byte=0),
            dict(entry_count=2, entry_key='0002', byte=60),
            dict(entry_count=4, entry_key='0004', byte=120),
            dict(entry_count=6, entry_key='0006', byte=180),
            dict(entry_count=8, entry_key='0008', byte=240),
        ])

    def test_outside_write_but_dont_skip(self):
        """A few rows appended by another Logger are cached in order.

        Indexing continues at ``index_gap`` boundaries afterwards.

        The name suggests the Logger reads forward through the new rows
        rather than seeking via the index, presumably because fewer than
        ``cached_entries`` rows were added; only the visible results are
        asserted here.
        """
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='key',
            cached_entries=10,
            index_gap=4,
        )) as logger:
            logger.write_rows(make_messages(10))
            with contextlib.closing(logfile.Logger(
                str(self.log_path),
                sample_field='key',
                cached_entries=10,
                index_gap=4,
            )) as other_logger:
                other_logger.write_rows(make_messages(5, 10))
                self.assertEqual(logger.cached_data, (
                    dict(key='0005', value=5),
                    dict(key='0006', value=6),
                    dict(key='0007', value=7),
                    dict(key='0008', value=8),
                    dict(key='0009', value=9),
                    dict(key='000a', value=10),
                    dict(key='000b', value=11),
                    dict(key='000c', value=12),
                    dict(key='000d', value=13),
                    dict(key='000e', value=14),
                ))
                logger.write_rows(make_messages(5, 15))
                self.assertEqual(self.read_index(), [
                    dict(entry_count=0, entry_key='0000', byte=0),
                    dict(entry_count=4, entry_key='0004', byte=120),
                    dict(entry_count=8, entry_key='0008', byte=240),
                    dict(entry_count=12, entry_key='000c', byte=360),
                    dict(entry_count=16, entry_key='0010', byte=480),
                ])
                logger.write_rows(make_messages(1, 20))
                self.assertEqual(self.read_index(), [
                    dict(entry_count=0, entry_key='0000', byte=0),
                    dict(entry_count=4, entry_key='0004', byte=120),
                    dict(entry_count=8, entry_key='0008', byte=240),
                    dict(entry_count=12, entry_key='000c', byte=360),
                    dict(entry_count=16, entry_key='0010', byte=480),
                    dict(entry_count=20, entry_key='0014', byte=600),
                ])

    def test_start_and_skip(self):
        """A Logger opened on an existing log caches only the newest rows.

        It keeps the newest ``cached_entries`` rows. The name suggests it
        skips older rows (e.g. via the index) rather than loading them
        all; that is not asserted here.
        """
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='key',
            cached_entries=4,
            index_gap=2,
        )) as logger:
            logger.write_rows(make_messages(10))
            with contextlib.closing(logfile.Logger(
                str(self.log_path),
                sample_field='key',
                cached_entries=4,
                index_gap=2,
            )) as other_logger:
                self.assertEqual(
                    other_logger.cached_data, tuple(make_messages(4, 6)))

    def read_bsons(self):
        """Decode and return every BSON document in the log file."""
        return self._read_bson_file(self.log_path)

    def read_index(self):
        """Decode and return every BSON document in the index file."""
        return self._read_bson_file(self.index_path)

    @staticmethod
    def _read_bson_file(path):
        """Decode all BSON documents stored in ``path`` into a list."""
        return bson.decode_all(path.read_bytes(), common.BSON_OPTIONS)


def make_messages(count, start=0):
    """Build ``count`` synthetic rows with consecutive integer values.

    Row i (for values ``start`` .. ``start + count - 1``) is
    ``{'key': <value as zero-padded, at-least-4-digit lowercase hex>,
    'value': <the integer>}``. Zero padding keeps the string keys in the
    same order as the values for non-negative values below 0x10000.
    """
    messages = []
    for value in range(start, start + count):
        messages.append({'key': '%04x' % value, 'value': value})
    return messages


# Run the tests when this file is executed directly. Because of the relative
# imports at the top, it presumably has to be run as a module inside its
# package (e.g. ``python -m weather_server.logfile_test``) — confirm.
if __name__ == '__main__':
    unittest.main()