view weatherlog/logger_test.py @ 14:c01f9929ae38

Make logger and HTTP writer more general and resilient. This removes the logger's and HTTP writer's dependency on the exact data type they write: they can now handle any BSON-serializable dict, and they track what has been sent by recording the last *byte* sent, not the last timestamp.
author Paul Fisher <paul@pfish.zone>
date Tue, 15 Oct 2019 22:40:24 -0400
parents 8a350ec1aa78
children 770215590d80
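# The tests below exercise logger.BufferedLogger against writer test
# doubles. A minimal sketch of the writer interface they assume, inferred
# from how the doubles subclass and use logger.RemoteWriter (not a
# verbatim copy of logger.py):
#
#     class RemoteWriter:
#         BATCH_SIZE = ...  # max encoded readings passed to one write() call
#
#         def write(self, readings):
#             # Sends a batch of BSON-encoded readings; raises
#             # RemoteWriteError if the batch could not be delivered.
#             ...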

import contextlib
import datetime
import fcntl
import itertools
import multiprocessing
import pathlib
import tempfile
import time
import unittest

import bson
import pytz

from . import logger


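# Test double that records each batch passed to write() for later assertions.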
class FakeWriter(logger.RemoteWriter):

    BATCH_SIZE = 2

    def __init__(self):
        self.writes = []

    def write(self, readings):
        self.writes.append(tuple(readings))


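# Test double whose write() fails on demand, to exercise retry behavior.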
class FlakyWriter(logger.RemoteWriter):
    is_bad = False

    def __init__(self):
        self.writer = FakeWriter()

    def write(self, readings):
        if self.is_bad:
            raise logger.RemoteWriteError('I am bad!')
        self.writer.write(readings)

    @property
    def writes(self):
        return self.writer.writes


def ts(t):
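    """Returns the POSIX timestamp t as a timezone-aware UTC datetime."""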
    return datetime.datetime.fromtimestamp(t, tz=pytz.UTC)


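# Shorthand: the tests compare raw BSON bytes, so expected values are
# encoded with the same function the logger uses.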
bs = logger.bson_encode


class LoggerTest(unittest.TestCase):

    maxDiff = None

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.TemporaryDirectory()
        self.temp_path = pathlib.Path(self.temp_dir.name)

    def tearDown(self):
        self.temp_dir.cleanup()
        super().tearDown()

    def test_from_nothing(self):
        writer = FakeWriter()
        with contextlib.closing(
            logger.BufferedLogger(self.temp_dir.name, writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.write({'first': 'item'})
            bl.write({'entry': 2})
            bl.write({'thing': 'three'})
            bl.start()
            time.sleep(1)  # Ensure that we get an entire logger cycle in.

        self.assertEqual(
            writer.writes,
            [
                (bs({'first': 'item'}), bs({'entry': 2})),
                (bs({'thing': 'three'}),),
            ],
        )
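        # 59 = 21 + 16 + 22, the encoded sizes of the three documents above.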
        self.assertEqual(self._read_last_sent(), '59')
        self.assertEqual(self._read_bsons(), [
            {'first': 'item'},
            {'entry': 2},
            {'thing': 'three'},
        ])

    def test_append_and_resume(self):
        existing_values = [
            dict(sample_time=ts(10), temp_c=20, rh_pct=30),
            dict(sample_time=ts(60), temp_c=10, rh_pct=99),
        ]
        with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile:
            for value in existing_values:
                outfile.write(bs(value))
            outfile.write(b'non-BSON garbage')

        with (self.temp_path / logger.OLD_LAST_TS).open('w') as outfile:
            outfile.write('10')

        writer = FakeWriter()
        with contextlib.closing(
            logger.BufferedLogger(str(self.temp_path), writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.start()
            time.sleep(0.5)
            bl.write({'some new': 'entry'})
            time.sleep(0.5)

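        # 125 = 50 + 50 + 25: the two pre-existing documents plus the new
        # entry; the trailing garbage is dropped when the log is repaired.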
        self.assertEqual(self._read_last_sent(), '125')
        self.assertEqual(self._read_bsons(), [
            dict(sample_time=ts(10), temp_c=20, rh_pct=30),
            dict(sample_time=ts(60), temp_c=10, rh_pct=99),
            {'some new': 'entry'},
        ])
        self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [
            bs(dict(sample_time=ts(60), temp_c=10, rh_pct=99)),
            bs({'some new': 'entry'}),
        ])

    def test_resume_from_byte(self):
        existing_values = [
            {'old': 'value'},
            {'unsent': 'value'},
        ]
        with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile:
            for value in existing_values:
                outfile.write(bs(value))
            outfile.write(b'non-BSON garbage')
        with (self.temp_path / logger.START_BYTE).open('w') as outfile:
            outfile.write('20')  # Immediately after the {'old': 'value'} document.

        writer = FakeWriter()
        with contextlib.closing(
            logger.BufferedLogger(str(self.temp_path), writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.start()
            bl.write({'some new': 'entry'})
            time.sleep(0.5)

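        # 68 = 20 + 23 + 25: everything after the saved start byte (20) has
        # now been sent, including the new entry.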
        self.assertEqual(self._read_last_sent(), '68')
        self.assertEqual(self._read_bsons(), [
            {'old': 'value'},
            {'unsent': 'value'},
            {'some new': 'entry'},
        ])
        self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [
            bs({'unsent': 'value'}),
            bs({'some new': 'entry'}),
        ])

    def test_send_failure(self):
        writer = FlakyWriter()
        with contextlib.closing(
            logger.BufferedLogger(str(self.temp_path), writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.start()
            bl.write({'cool write': 'succeeds'})
            time.sleep(0.5)
            writer.is_bad = True
            bl.write({'bad write': 'fails'})
            time.sleep(0.5)

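            # Only the first document (30 encoded bytes) has been sent.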
            self.assertEqual(self._read_last_sent(), '30')
            self.assertEqual(
                writer.writes, [(bs({'cool write': 'succeeds'}),)])
            self.assertEqual(self._read_bsons(), [
                {'cool write': 'succeeds'},
                {'bad write': 'fails'},
            ])

            # Ensure that sending resumes once the failure condition clears.

            writer.is_bad = False
            time.sleep(0.5)
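        # 56 = 30 + 26: the failed document has now been sent as well.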
        self.assertEqual(self._read_last_sent(), '56')
        self.assertEqual(
            writer.writes,
            [
                (bs({'cool write': 'succeeds'}),),
                (bs({'bad write': 'fails'}),),
            ])
        self.assertEqual(self._read_bsons(), [
            {'cool write': 'succeeds'},
            {'bad write': 'fails'},
        ])

    def test_fail_upon_lock(self):
        bson_file = str(self.temp_path / logger.BSON_FILENAME)
        out_queue = multiprocessing.Queue()
        in_queue = multiprocessing.Queue()
        # This needs to be in a separate multiprocessing.Process
        # since flock-based file locks are per-process, not per-thread.
        proc = multiprocessing.Process(
            target=_lock_holder, args=(bson_file, out_queue, in_queue))
        proc.start()
        in_queue.get()  # Wait for the lock to be acquired.

        with self.assertRaises(OSError):
            logger.BufferedLogger(str(self.temp_path), FakeWriter())
        out_queue.put(None)  # Notify that we're done.
        out_queue.close()
        proc.join()
        proc.close()

        # Test that it works after the lock is released.
        logger.BufferedLogger(str(self.temp_path), FakeWriter()).close()

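    # The two tests below cover the legacy-state upgrade. As used here,
    # logger._read_unsent_and_upgrade replaces the old OLD_LAST_TS file
    # (last-sent sample_time) with a START_BYTE file holding the offset
    # just past the last sent document, and deletes the old file.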
    def test_upgrade_last_sent(self):
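        # Each document below encodes to 26 bytes, so last-sent timestamps
        # 5, 20, and 30 map to start bytes 0, 52, and 78.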
        for (timestamp, byte_count) in [
            ('5', '0'),
            ('20', '52'),
            ('30', '78'),
        ]:
            bson_file = self.temp_path / logger.BSON_FILENAME
            with bson_file.open('wb') as outfile:
                outfile.write(logger.bson_encode(dict(sample_time=ts(10))))
                outfile.write(logger.bson_encode(dict(sample_time=ts(20))))
                outfile.write(logger.bson_encode(dict(sample_time=ts(30))))
                outfile.write(b'some bogus data')
            last_sent = self.temp_path / logger.OLD_LAST_TS
            with last_sent.open('w') as outfile:
                outfile.write(timestamp)
            start_byte = self.temp_path / logger.START_BYTE
            with bson_file.open('r+b') as infile:
                logger._read_unsent_and_upgrade(
                    infile, last_sent, start_byte)
            self.assertFalse(last_sent.exists())
            with start_byte.open('r') as infile:
                self.assertEqual(infile.read(), byte_count)

    def test_upgrade_last_sent_no_last_sent(self):
        bson_file = self.temp_path / logger.BSON_FILENAME
        with bson_file.open('wb') as outfile:
            outfile.write(logger.bson_encode(dict(sample_time=ts(10))))
            outfile.write(logger.bson_encode(dict(sample_time=ts(20))))
            outfile.write(logger.bson_encode(dict(sample_time=ts(30))))
        last_sent = self.temp_path / logger.OLD_LAST_TS
        start_byte = self.temp_path / logger.START_BYTE
        with start_byte.open('w') as outfile:
            outfile.write('untouched')
        with bson_file.open('r+b') as infile:
            logger._read_unsent_and_upgrade(
                infile, last_sent, start_byte)
        self.assertFalse(last_sent.exists())
        with start_byte.open('r') as infile:
            self.assertEqual(infile.read(), 'untouched')

    def _read_last_sent(self):
        with (self.temp_path / logger.START_BYTE).open('r') as infile:
            return infile.read()

    def _read_bsons(self):
        with (self.temp_path / logger.BSON_FILENAME).open('rb') as infile:
            return bson.decode_all(infile.read(), logger.BSON_OPTIONS)


def _lock_holder(path, in_queue, out_queue):
    with open(path, 'w') as lockfile:
        fcntl.flock(lockfile, fcntl.LOCK_SH)
        out_queue.put(None)  # Notify that we've acquired the lock.
        out_queue.close()
        in_queue.get()  # Wait for the test to complete before closing.


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    unittest.main()