view weatherlog/logger_test.py @ 16:770215590d80

logger: Actually pass the right type. The logger was improperly giving the writer already-encoded values.
author Paul Fisher <paul@pfish.zone>
date Thu, 17 Oct 2019 22:19:33 -0400
parents c01f9929ae38
children
line wrap: on
line source

import contextlib
import datetime
import fcntl
import itertools
import multiprocessing
import pathlib
import tempfile
import time
import unittest

import bson
import pytz

from . import logger


class FakeWriter(logger.RemoteWriter):

    BATCH_SIZE = 2

    def __init__(self):
        self.writes = []

    def write(self, readings):
        self.writes.append(tuple(readings))


class FlakyWriter(logger.RemoteWriter):
    is_bad = False

    def __init__(self):
        self.writer = FakeWriter()

    def write(self, readings):
        if self.is_bad:
            raise logger.RemoteWriteError('I am bad!')
        self.writer.write(readings)

    @property
    def writes(self):
        return self.writer.writes


def ts(t):
    return datetime.datetime.utcfromtimestamp(t).replace(tzinfo=pytz.UTC)


bs = logger.bson_encode


class LoggerTest(unittest.TestCase):

    maxDiff = None

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.TemporaryDirectory()
        self.temp_path = pathlib.Path(self.temp_dir.name)

    def tearDown(self):
        self.temp_dir.cleanup()
        super().tearDown()

    def test_from_nothing(self):
        writer = FakeWriter()
        with contextlib.closing(
            logger.BufferedLogger(self.temp_dir.name, writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.write({'first': 'item'})
            bl.write({'entry': 2})
            bl.write({'thing': 'three'})
            bl.start()
            time.sleep(1)  # Ensure that we get an entire logger cycle in.

        self.assertEqual(
            writer.writes,
            [
                ({'first': 'item'}, {'entry': 2}),
                ({'thing': 'three'},),
            ],
        )
        self.assertEqual(self._read_last_sent(), '59')
        self.assertEqual(self._read_bsons(), [
            {'first': 'item'},
            {'entry': 2},
            {'thing': 'three'},
        ])

    def test_append_and_resume(self):
        existing_values = [
            dict(sample_time=ts(10), temp_c=20, rh_pct=30),
            dict(sample_time=ts(60), temp_c=10, rh_pct=99),
        ]
        with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile:
            for value in existing_values:
                outfile.write(bs(value))
            outfile.write(b'non-BSON garbage')

        with (self.temp_path / logger.OLD_LAST_TS).open('w') as outfile:
            outfile.write('10')

        writer = FakeWriter()
        with contextlib.closing(
            logger.BufferedLogger(str(self.temp_path), writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.start()
            time.sleep(0.5)
            bl.write({'some new': 'entry'})
            time.sleep(0.5)

        self.assertEqual(self._read_last_sent(), '125')
        self.assertEqual(self._read_bsons(), [
            dict(sample_time=ts(10), temp_c=20, rh_pct=30),
            dict(sample_time=ts(60), temp_c=10, rh_pct=99),
            {'some new': 'entry'},
        ])
        self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [
            dict(sample_time=ts(60), temp_c=10, rh_pct=99),
            {'some new': 'entry'},
        ])

    def test_resume_from_byte(self):
        existing_values = [
            {'old': 'value'},
            {'unsent': 'value'},
        ]
        with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile:
            for value in existing_values:
                outfile.write(bs(value))
            outfile.write(b'non-BSON garbage')
        with (self.temp_path / logger.START_BYTE).open('w') as outfile:
            outfile.write('20')  # immediately after 'old: value'

        writer = FakeWriter()
        with contextlib.closing(
            logger.BufferedLogger(str(self.temp_path), writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.start()
            bl.write({'some new': 'entry'})
            time.sleep(0.5)

        self.assertEqual(self._read_last_sent(), '68')
        self.assertEqual(self._read_bsons(), [
            {'old': 'value'},
            {'unsent': 'value'},
            {'some new': 'entry'},
        ])
        self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [
            {'unsent': 'value'},
            {'some new': 'entry'},
        ])

    def test_send_failure(self):
        writer = FlakyWriter()
        with contextlib.closing(
            logger.BufferedLogger(str(self.temp_path), writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.start()
            bl.write({'cool write': 'succeeds'})
            time.sleep(0.5)
            writer.is_bad = True
            bl.write({'bad write': 'fails'})
            time.sleep(0.5)

            self.assertEqual(self._read_last_sent(), '30')
            self.assertEqual(
                writer.writes, [({'cool write': 'succeeds'},)])
            self.assertEqual(self._read_bsons(), [
                {'cool write': 'succeeds'},
                {'bad write': 'fails'},
            ])

            # Ensure that we resume writing again when the condition clears.

            writer.is_bad = False
            time.sleep(0.5)
        self.assertEqual(self._read_last_sent(), '56')
        self.assertEqual(
            writer.writes,
            [
                ({'cool write': 'succeeds'},),
                ({'bad write': 'fails'},),
            ])
        self.assertEqual(self._read_bsons(), [
            {'cool write': 'succeeds'},
            {'bad write': 'fails'},
        ])

    def test_fail_upon_lock(self):
        bson_file = str(self.temp_path / logger.BSON_FILENAME)
        out_queue = multiprocessing.Queue()
        in_queue = multiprocessing.Queue()
        # This needs to be in a separate multiprocessing.Process
        # since flock-based file locks are per-process, not per-thread.
        proc = multiprocessing.Process(
            target=_lock_holder, args=(bson_file, out_queue, in_queue))
        proc.start()
        in_queue.get()  # Wait for the lock to be acquired.

        with self.assertRaises(OSError):
            logger.BufferedLogger(str(self.temp_path), FakeWriter())
        out_queue.put(None)  # Notify that we're done.
        out_queue.close()
        proc.join()
        proc.close()

        # Test that it works after the lock is released.
        logger.BufferedLogger(str(self.temp_path), FakeWriter()).close()

    def test_upgrade_last_sent(self):
        for (timestamp, byte_count) in [
            ('5', '0'),
            ('20', '52'),
            ('30', '78'),
        ]:
            bson_file = self.temp_path / logger.BSON_FILENAME
            with bson_file.open('wb') as outfile:
                outfile.write(logger.bson_encode(dict(sample_time=ts(10))))
                outfile.write(logger.bson_encode(dict(sample_time=ts(20))))
                outfile.write(logger.bson_encode(dict(sample_time=ts(30))))
                outfile.write(b'some bogus data')
            last_sent = self.temp_path / logger.OLD_LAST_TS
            with last_sent.open('w') as outfile:
                outfile.write(timestamp)
            start_byte = self.temp_path / logger.START_BYTE
            with bson_file.open('r+b') as infile:
                logger._read_unsent_and_upgrade(
                    infile, last_sent, start_byte)
            self.assertFalse(last_sent.exists())
            with start_byte.open('r') as infile:
                self.assertEqual(infile.read(), byte_count)

    def test_upgrade_last_sent_no_last_sent(self):
        bson_file = self.temp_path / logger.BSON_FILENAME
        with bson_file.open('wb') as outfile:
            outfile.write(logger.bson_encode(dict(sample_time=ts(10))))
            outfile.write(logger.bson_encode(dict(sample_time=ts(20))))
            outfile.write(logger.bson_encode(dict(sample_time=ts(30))))
        last_sent = self.temp_path / logger.OLD_LAST_TS
        start_byte = self.temp_path / logger.START_BYTE
        with start_byte.open('w') as outfile:
            outfile.write('untouched')
        with bson_file.open('r+b') as infile:
            logger._read_unsent_and_upgrade(
                infile, last_sent, start_byte)
        self.assertFalse(last_sent.exists())
        with start_byte.open('r') as infile:
            self.assertEqual(infile.read(), 'untouched')

    def _read_last_sent(self):
        with (self.temp_path / logger.START_BYTE).open('r') as infile:
            return infile.read()

    def _read_bsons(self):
        with (self.temp_path / logger.BSON_FILENAME).open('rb') as infile:
            return bson.decode_all(infile.read(), logger.BSON_OPTIONS)


def _lock_holder(path, in_queue, out_queue):
    with open(path, 'w') as infile:
        fcntl.flock(infile, fcntl.LOCK_SH)
        out_queue.put(None)  # Notify that we've acquired the lock.
        out_queue.close()
        in_queue.get()  # Wait for the test to complete before closing.


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    unittest.main()