Mercurial > personal > weatherlog
view weatherlog/logger_test.py @ 6:8a350ec1aa78
logger_test: Ensure it crashes when the file is locked.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Sat, 28 Sep 2019 19:55:27 -0400 |
parents | 885bff085edf |
children | c01f9929ae38 |
line wrap: on
line source
"""Tests for ``logger.BufferedLogger`` using in-memory fake remote writers."""

import contextlib
import datetime
import fcntl
import itertools
import multiprocessing
import pathlib
import tempfile
import time
import unittest

import bson
import pytz

from . import logger
from . import types


class FakeWriter(logger.RemoteWriter):
    """RemoteWriter that records each batch it is given instead of sending it."""

    # test_from_nothing expects three readings to arrive as a batch of two
    # followed by a batch of one.
    # NOTE(review): presumably read by logger.BufferedLogger to cap the
    # number of readings per write() call -- confirm against logger.py.
    BATCH_SIZE = 2

    def __init__(self):
        # One entry per write() call; each entry is a tuple of readings.
        self.writes = []

    def write(self, readings):
        """Append ``readings``, frozen as a tuple, to ``self.writes``."""
        self.writes.append(tuple(readings))


class FlakyWriter(logger.RemoteWriter):
    """RemoteWriter whose writes fail for as long as ``is_bad`` is true.

    Successful writes are delegated to an internal FakeWriter, so they can
    be inspected through the ``writes`` property.
    """

    # Tests flip this on an instance to simulate a remote outage.
    is_bad = False

    def __init__(self):
        self.writer = FakeWriter()

    def write(self, readings):
        """Record ``readings``, or raise RemoteWriteError while ``is_bad``."""
        if self.is_bad:
            raise logger.RemoteWriteError('I am bad!')
        self.writer.write(readings)

    @property
    def writes(self):
        """The batches recorded by the wrapped FakeWriter."""
        return self.writer.writes


def ts(t):
    """Return the POSIX timestamp ``t`` as a timezone-aware UTC datetime."""
    # Build the aware value directly; datetime.utcfromtimestamp() is
    # deprecated as of Python 3.12 and yields the same instant.
    return datetime.datetime.fromtimestamp(t, tz=pytz.UTC)


class LoggerTest(unittest.TestCase):
    """Exercises BufferedLogger against a fresh temporary directory per test."""

    # Show complete diffs when an assertEqual on the long lists below fails.
    maxDiff = None

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.TemporaryDirectory()
        self.temp_path = pathlib.Path(self.temp_dir.name)

    def tearDown(self):
        self.temp_dir.cleanup()
        super().tearDown()

    def test_from_nothing(self):
        # Starting with an empty directory, everything written locally
        # should be sent remotely in batches and recorded on disk.
        writer = FakeWriter()
        with contextlib.closing(
            logger.BufferedLogger(self.temp_dir.name, writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.write(types.Reading(ts(3), 1, 2))
            bl.write(types.Reading(ts(6), 4, 5))
            bl.write(types.Reading(ts(8), 10, 9))
            bl.start()
            time.sleep(1)  # Ensure that we get an entire logger cycle in.

        self.assertEqual(
            writer.writes,
            [
                (types.Reading(ts(3), 1, 2), types.Reading(ts(6), 4, 5)),
                (types.Reading(ts(8), 10, 9),),
            ],
        )
        self.assertEqual(self._read_last_sent(), '8.0')
        self.assertEqual(self._read_bsons(), [
            dict(sample_time=ts(3), temp_c=1, rh_pct=2),
            dict(sample_time=ts(6), temp_c=4, rh_pct=5),
            dict(sample_time=ts(8), temp_c=10, rh_pct=9),
        ])

    def test_append_and_resume(self):
        # Pre-populate the directory with two stored readings, trailing
        # garbage, and a last-sent marker of 10. Only the reading after the
        # marker plus the newly written one are expected to be sent, and
        # the garbage is expected to be gone from the BSON file.
        existing_values = [
            dict(sample_time=ts(10), temp_c=20, rh_pct=30),
            dict(sample_time=ts(60), temp_c=10, rh_pct=99),
        ]
        with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile:
            for value in existing_values:
                outfile.write(logger.bson_encode(value))
            outfile.write(b'non-BSON garbage')

        with (self.temp_path / logger.LAST_SENT_FILENAME).open('w') as outfile:
            outfile.write('10')

        writer = FakeWriter()
        with contextlib.closing(
            logger.BufferedLogger(str(self.temp_path), writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.start()
            time.sleep(0.5)
            bl.write(types.Reading(ts(99), temp_c=-40, rh_pct=2))
            time.sleep(0.5)

        self.assertEqual(self._read_last_sent(), '99.0')
        self.assertEqual(self._read_bsons(), [
            dict(sample_time=ts(10), temp_c=20, rh_pct=30),
            dict(sample_time=ts(60), temp_c=10, rh_pct=99),
            dict(sample_time=ts(99), temp_c=-40, rh_pct=2),
        ])
        # Batching of the sent readings is not pinned here, only their order.
        self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [
            types.Reading(ts(60), 10, 99),
            types.Reading(ts(99), -40, 2),
        ])

    def test_send_failure(self):
        # While the remote writer fails, readings must still be stored
        # locally and the last-sent marker must not advance; once the
        # writer recovers, the backlog must be sent.
        writer = FlakyWriter()
        with contextlib.closing(
            logger.BufferedLogger(str(self.temp_path), writer)
        ) as bl:
            bl.WAIT_TIME = 0.2
            bl.start()
            bl.write(types.Reading(ts(1337), 420, 69))
            time.sleep(0.5)
            writer.is_bad = True
            bl.write(types.Reading(ts(31337), 666, 999))
            time.sleep(0.5)

            self.assertEqual(self._read_last_sent(), '1337.0')
            self.assertEqual(
                writer.writes, [(types.Reading(ts(1337), 420, 69),)])
            self.assertEqual(self._read_bsons(), [
                dict(sample_time=ts(1337), temp_c=420, rh_pct=69),
                dict(sample_time=ts(31337), temp_c=666, rh_pct=999),
            ])

            # Ensure that we resume writing again when the
            # condition clears.
            writer.is_bad = False
            time.sleep(0.5)

            self.assertEqual(self._read_last_sent(), '31337.0')
            self.assertEqual(
                writer.writes,
                [
                    (types.Reading(ts(1337), 420, 69),),
                    (types.Reading(ts(31337), 666, 999),),
                ])
            self.assertEqual(self._read_bsons(), [
                dict(sample_time=ts(1337), temp_c=420, rh_pct=69),
                dict(sample_time=ts(31337), temp_c=666, rh_pct=999),
            ])

    def test_fail_upon_lock(self):
        # Constructing a BufferedLogger must raise OSError while another
        # process holds a lock on the BSON file, and succeed afterwards.
        bson_file = str(self.temp_path / logger.BSON_FILENAME)
        # Named from this test's point of view: out_queue carries the
        # "done" signal to the child, in_queue carries its "locked" signal.
        out_queue = multiprocessing.Queue()
        in_queue = multiprocessing.Queue()
        # This needs to be in a separate multiprocessing.Process
        # since flock-based file locks are per-process, not per-thread.
        proc = multiprocessing.Process(
            target=_lock_holder,
            args=(bson_file, out_queue, in_queue))
        proc.start()
        try:
            in_queue.get()  # Wait for the lock to be acquired.

            with self.assertRaises(OSError):
                logger.BufferedLogger(str(self.temp_path), FakeWriter())
        finally:
            # Release and reap the child even when the assertion fails;
            # otherwise it stays blocked on its queue and the test run
            # hangs when multiprocessing joins it at interpreter exit.
            out_queue.put(None)  # Notify that we're done.
            out_queue.close()
            proc.join()
            proc.close()

        # Test that it works after the lock is released.
        logger.BufferedLogger(str(self.temp_path), FakeWriter()).close()

    def _read_last_sent(self):
        """Return the full text of the last-sent marker file."""
        with (self.temp_path / logger.LAST_SENT_FILENAME).open('r') as infile:
            return infile.read()

    def _read_bsons(self):
        """Return every document in the BSON file, decoded into a list."""
        with (self.temp_path / logger.BSON_FILENAME).open('rb') as infile:
            return bson.decode_all(infile.read(), logger.BSON_OPTIONS)


def _lock_holder(path, in_queue, out_queue):
    """Hold a shared flock on ``path`` until told to let go.

    Runs as the target of a child process in test_fail_upon_lock.

    Args:
        path: File to open for writing and lock.
        in_queue: Queue this function blocks on; receiving any item means
            the lock may be released.
        out_queue: Queue on which ``None`` is put once the lock is held.
    """
    with open(path, 'w') as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_SH)
        out_queue.put(None)  # Notify that we've acquired the lock.
        out_queue.close()
        in_queue.get()  # Wait for the test to complete before closing.


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    unittest.main()