view weather_server/logfile_test.py @ 24:20c8ec56e447

logfile: Pull logfile thread out of Logger. This enables automatic garbage collection of Logger instances, since a running thread no longer has a reference to a Logger's self. It separates exclusive management of logfile state into the _writer_thread function, which now opens the file and writes it until it is told to stop by receiving the poison pill.
author Paul Fisher <paul@pfish.zone>
date Sun, 10 Nov 2019 23:07:11 -0500
parents beb42c835c52
children 9bc3687e1e5e
line wrap: on
line source

import contextlib
import datetime
import os.path
import pathlib
import tempfile
import threading
import unittest

import bson
import pytz

from . import common
from . import logfile


def ts(n):
    return datetime.datetime.utcfromtimestamp(n).replace(tzinfo=pytz.UTC)


class LoggerTest(unittest.TestCase):

    maxDiff = None

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.TemporaryDirectory()
        self.log_path = pathlib.Path(self.temp_dir.name) / 'test.bson'

    def tearDown(self):
        self.temp_dir.cleanup()
        super().tearDown()

    def test_empty(self):
        lg = logfile.Logger(
            str(self.log_path), sample_field='x')
        with contextlib.closing(lg) as logger:
            self.assertEqual(logger.data, ())

    def test_fails_to_open(self):
        with self.assertRaises(OSError):
            logfile.Logger(
                os.path.join(
                    self.temp_dir.name,
                    'nonexistent-directory',
                    'bogus-filename'),
                sample_field='unimportant')

    def test_del(self):
        lg = logfile.Logger(
            str(self.log_path), sample_field='x')
        del lg

    def test_loading(self):
        with self.log_path.open('wb') as outfile:
            outfile.write(common.bson_encode(dict(
                sample_time=ts(123),
                temp_c=420,
                rh_pct=69,
                ingest_time=ts(125),
            )))
            outfile.write(b'garbage to ignore')
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='sample_time',
        )) as logger:
            self.assertEqual(
                logger.data,
                (
                    dict(
                        sample_time=ts(123),
                        temp_c=420,
                        rh_pct=69,
                        ingest_time=ts(125)),
                ))

    def test_writing(self):
        with self.log_path.open('wb') as outfile:
            outfile.write(common.bson_encode(dict(
                sample_time=ts(123),
                temp_c=420,
                rh_pct=69,
                ingest_time=ts(125),
            )))
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='sample_time',
        )) as logger:
            logger.write_rows([
                # Ignored, since it's older than the newest entry.
                dict(
                    sample_time=ts(100),
                    temp_c=999,
                    rh_pct=666,
                    ingest_time=ts(101),
                ),
                dict(
                    sample_time=ts(125),
                    temp_c=333,
                    rh_pct=777,
                    ingest_time=ts(200),
                ),
            ])
            self.assertEqual(
                logger.data,
                (
                    dict(
                        sample_time=ts(123),
                        temp_c=420,
                        rh_pct=69,
                        ingest_time=ts(125),
                    ),
                    dict(
                        sample_time=ts(125),
                        temp_c=333,
                        rh_pct=777,
                        ingest_time=ts(200),
                    ),
                )
            )

        self.assertEqual(self.read_bsons(), [
            dict(
                sample_time=ts(123),
                temp_c=420,
                rh_pct=69,
                ingest_time=ts(125),
            ),
            dict(
                sample_time=ts(125),
                temp_c=333,
                rh_pct=777,
                ingest_time=ts(200),
            ),
        ])

    def test_outside_writes(self):
        with contextlib.closing(logfile.Logger(
            str(self.log_path),
            sample_field='sample_time',
        )) as logger:
            logger.write_rows([
                dict(
                    sample_time=ts(100),
                    temp_c=999,
                    rh_pct=666,
                    ingest_time=ts(101),
                ),
                dict(
                    sample_time=ts(125),
                    temp_c=333,
                    rh_pct=777,
                    ingest_time=ts(200),
                ),
            ])
            with self.log_path.open('ab') as outfile:
                outfile.write(common.bson_encode(dict(
                    sample_time=ts(1024),
                    temp_c=256,
                    rh_pct=128,
                    ingest_time=ts(4096),
                )))
                outfile.flush()
            self.assertEqual(logger.data, (
                dict(
                    sample_time=ts(100),
                    temp_c=999,
                    rh_pct=666,
                    ingest_time=ts(101),
                ),
                dict(
                    sample_time=ts(125),
                    temp_c=333,
                    rh_pct=777,
                    ingest_time=ts(200),
                ),
                dict(
                    sample_time=ts(1024),
                    temp_c=256,
                    rh_pct=128,
                    ingest_time=ts(4096),
                ),
            ))

    def read_bsons(self):
        with self.log_path.open('rb') as infile:
            return bson.decode_all(
                infile.read(), common.BSON_OPTIONS)


if __name__ == '__main__':
    unittest.main()