view weatherlog/logger_test.py @ 6:8a350ec1aa78

logger_test: Ensure it crashes when the file is locked.
author Paul Fisher <paul@pfish.zone>
date Sat, 28 Sep 2019 19:55:27 -0400
parents 885bff085edf
children c01f9929ae38
line wrap: on
line source

import contextlib
import datetime
import fcntl
import itertools
import multiprocessing
import pathlib
import tempfile
import time
import unittest

import bson
import pytz

from . import logger
from . import types


class FakeWriter(logger.RemoteWriter):
    """In-memory test double that records each batch handed to write()."""

    # Small batch size so tests can observe batching with few readings.
    BATCH_SIZE = 2

    def __init__(self):
        # Deliberately does not call super().__init__(): no remote
        # connection is wanted in tests, only the recorded batches.
        self.writes = []

    def write(self, readings):
        """Capture the batch as an immutable tuple for later assertions."""
        batch = tuple(readings)
        self.writes.append(batch)


class FlakyWriter(logger.RemoteWriter):
    """Writer that can be toggled to fail, wrapping a FakeWriter."""

    # Flip to True to make every write() raise RemoteWriteError.
    is_bad = False

    def __init__(self):
        # Deliberately does not call super().__init__(); the inner
        # FakeWriter records the batches that get through.
        self.writer = FakeWriter()

    def write(self, readings):
        """Forward to the inner writer, or fail when the flag is set."""
        if not self.is_bad:
            self.writer.write(readings)
        else:
            raise logger.RemoteWriteError('I am bad!')

    @property
    def writes(self):
        """Batches successfully recorded by the inner FakeWriter."""
        return self.writer.writes


def ts(t):
    """Return POSIX timestamp *t* as a timezone-aware UTC datetime.

    The original used datetime.utcfromtimestamp(), which is deprecated
    since Python 3.12 and returns a naive datetime that then needed a
    .replace(tzinfo=...). Building the aware datetime directly is
    equivalent: aware datetimes compare by instant, so values produced
    here still compare equal to pytz.UTC-tagged datetimes decoded from
    BSON elsewhere in these tests.
    """
    return datetime.datetime.fromtimestamp(t, tz=datetime.timezone.utc)


class LoggerTest(unittest.TestCase):
    """End-to-end tests of logger.BufferedLogger: buffering readings to a
    local BSON file, batching them out to a RemoteWriter on a background
    cycle, and tracking progress in a last-sent marker file.

    NOTE(review): these tests rely on real sleeps to let the logger's
    background cycle run; they are timing-sensitive by design.
    """

    # Show full diffs on failure; the assertions compare nested structures.
    maxDiff = None

    def setUp(self):
        super().setUp()
        # Fresh directory per test for the BSON buffer file and the
        # last-sent marker file.
        self.temp_dir = tempfile.TemporaryDirectory()
        self.temp_path = pathlib.Path(self.temp_dir.name)

    def tearDown(self):
        self.temp_dir.cleanup()
        super().tearDown()

    def test_from_nothing(self):
        """Readings buffered before start() are flushed once started.

        FakeWriter.BATCH_SIZE is 2, so three readings should arrive at
        the writer as one batch of two followed by one batch of one.
        """
        writer = FakeWriter()
        with contextlib.closing(
            logger.BufferedLogger(self.temp_dir.name, writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.write(types.Reading(ts(3), 1, 2))
            bl.write(types.Reading(ts(6), 4, 5))
            bl.write(types.Reading(ts(8), 10, 9))
            bl.start()
            time.sleep(1)  # Ensure that we get an entire logger cycle in.

        self.assertEqual(
            writer.writes,
            [
                (types.Reading(ts(3), 1, 2), types.Reading(ts(6), 4, 5)),
                (types.Reading(ts(8), 10, 9),),
            ],
        )
        # The marker records the timestamp of the last reading sent.
        self.assertEqual(self._read_last_sent(), '8.0')
        # All readings remain in the local BSON buffer, sent or not.
        self.assertEqual(self._read_bsons(), [
            dict(sample_time=ts(3), temp_c=1, rh_pct=2),
            dict(sample_time=ts(6), temp_c=4, rh_pct=5),
            dict(sample_time=ts(8), temp_c=10, rh_pct=9),
        ])

    def test_append_and_resume(self):
        """A logger pointed at existing files resumes from the marker.

        Pre-seeds a BSON file (including trailing garbage, which should
        be ignored) and a last-sent marker of 10, then checks that only
        readings after the marker are uploaded and that new readings are
        appended cleanly after the garbage is discarded.
        """
        existing_values = [
            dict(sample_time=ts(10), temp_c=20, rh_pct=30),
            dict(sample_time=ts(60), temp_c=10, rh_pct=99),
        ]
        with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile:
            for value in existing_values:
                outfile.write(logger.bson_encode(value))
            # Simulates a partial/corrupt trailing write from a crash.
            outfile.write(b'non-BSON garbage')

        with (self.temp_path / logger.LAST_SENT_FILENAME).open('w') as outfile:
            outfile.write('10')

        writer = FakeWriter()
        with contextlib.closing(
            logger.BufferedLogger(str(self.temp_path), writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.start()
            time.sleep(0.5)
            bl.write(types.Reading(ts(99), temp_c=-40, rh_pct=2))
            time.sleep(0.5)

        self.assertEqual(self._read_last_sent(), '99.0')
        # The garbage tail was dropped; the new reading follows the
        # two pre-existing ones.
        self.assertEqual(self._read_bsons(), [
            dict(sample_time=ts(10), temp_c=20, rh_pct=30),
            dict(sample_time=ts(60), temp_c=10, rh_pct=99),
            dict(sample_time=ts(99), temp_c=-40, rh_pct=2),
        ])
        # Only readings newer than the marker (10) were uploaded.
        self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [
            types.Reading(ts(60), 10, 99),
            types.Reading(ts(99), -40, 2),
        ])

    def test_send_failure(self):
        """Readings written during a remote outage are retried later.

        While the FlakyWriter is failing, the last-sent marker must not
        advance past the last successful upload, but readings still land
        in the local BSON buffer; once the writer recovers, the backlog
        is sent.
        """
        writer = FlakyWriter()
        with contextlib.closing(
            logger.BufferedLogger(str(self.temp_path), writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.start()
            bl.write(types.Reading(ts(1337), 420, 69))
            time.sleep(0.5)
            writer.is_bad = True
            bl.write(types.Reading(ts(31337), 666, 999))
            time.sleep(0.5)

            # Marker stays at the last successful send...
            self.assertEqual(self._read_last_sent(), '1337.0')
            self.assertEqual(
                writer.writes, [(types.Reading(ts(1337), 420, 69),)])
            # ...but both readings are safely buffered locally.
            self.assertEqual(self._read_bsons(), [
                dict(sample_time=ts(1337), temp_c=420, rh_pct=69),
                dict(sample_time=ts(31337), temp_c=666, rh_pct=999),
            ])

            # Ensure that we resume writing again when the condition clears.

            writer.is_bad = False
            time.sleep(0.5)
        self.assertEqual(self._read_last_sent(), '31337.0')
        self.assertEqual(
            writer.writes,
            [
                (types.Reading(ts(1337), 420, 69),),
                (types.Reading(ts(31337), 666, 999),),
            ])
        self.assertEqual(self._read_bsons(), [
            dict(sample_time=ts(1337), temp_c=420, rh_pct=69),
            dict(sample_time=ts(31337), temp_c=666, rh_pct=999),
        ])

    def test_fail_upon_lock(self):
        """Constructing a BufferedLogger fails while the file is locked."""
        bson_file = str(self.temp_path / logger.BSON_FILENAME)
        out_queue = multiprocessing.Queue()
        in_queue = multiprocessing.Queue()
        # This needs to be in a separate multiprocessing.Process
        # since flock-based file locks are per-process, not per-thread.
        proc = multiprocessing.Process(
            target=_lock_holder, args=(bson_file, out_queue, in_queue))
        proc.start()
        in_queue.get()  # Wait for the lock to be acquired.

        with self.assertRaises(OSError):
            logger.BufferedLogger(str(self.temp_path), FakeWriter())
        out_queue.put(None)  # Notify that we're done.
        out_queue.close()
        proc.join()
        proc.close()

        # Test that it works after the lock is released.
        logger.BufferedLogger(str(self.temp_path), FakeWriter()).close()

    def _read_last_sent(self):
        # Returns the raw contents of the last-sent marker file.
        with (self.temp_path / logger.LAST_SENT_FILENAME).open('r') as infile:
            return infile.read()

    def _read_bsons(self):
        # Decodes every document in the local BSON buffer file.
        with (self.temp_path / logger.BSON_FILENAME).open('rb') as infile:
            return bson.decode_all(infile.read(), logger.BSON_OPTIONS)


def _lock_holder(path, in_queue, out_queue):
    with open(path, 'w') as infile:
        fcntl.flock(infile, fcntl.LOCK_SH)
        out_queue.put(None)  # Notify that we've acquired the lock.
        out_queue.close()
        in_queue.get()  # Wait for the test to complete before closing.


if __name__ == '__main__':
    # Use the spawn start method so the _lock_holder child starts a fresh
    # interpreter rather than forking this process's state.
    multiprocessing.set_start_method('spawn')
    unittest.main()