# HG changeset patch # User Paul Fisher # Date 1569714927 14400 # Node ID 8a350ec1aa78e45d26a74c9dc41c5194ba8318da # Parent 885bff085edf18040fff9cfd030b7267cf8a2fed logger_test: Ensure it crashes when the file is locked. diff -r 885bff085edf -r 8a350ec1aa78 weatherlog/logger.py --- a/weatherlog/logger.py Sat Sep 28 19:28:22 2019 -0400 +++ b/weatherlog/logger.py Sat Sep 28 19:55:27 2019 -0400 @@ -121,6 +121,7 @@ try: fcntl.flock(file, fcntl.LOCK_EX | fcntl.LOCK_NB) except BlockingIOError as ex: + file.close() raise OSError('Another copy of the logger is running.') from ex return file diff -r 885bff085edf -r 8a350ec1aa78 weatherlog/logger_test.py --- a/weatherlog/logger_test.py Sat Sep 28 19:28:22 2019 -0400 +++ b/weatherlog/logger_test.py Sat Sep 28 19:55:27 2019 -0400 @@ -1,6 +1,8 @@ import contextlib import datetime +import fcntl import itertools +import multiprocessing import pathlib import tempfile import time @@ -154,6 +156,27 @@ dict(sample_time=ts(31337), temp_c=666, rh_pct=999), ]) + def test_fail_upon_lock(self): + bson_file = str(self.temp_path / logger.BSON_FILENAME) + out_queue = multiprocessing.Queue() + in_queue = multiprocessing.Queue() + # This needs to be in a separate multiprocessing.Process + # since flock-based file locks are per-process, not per-thread. + proc = multiprocessing.Process( + target=_lock_holder, args=(bson_file, out_queue, in_queue)) + proc.start() + in_queue.get() # Wait for the lock to be acquired. + + with self.assertRaises(OSError): + logger.BufferedLogger(str(self.temp_path), FakeWriter()) + out_queue.put(None) # Notify that we're done. + out_queue.close() + proc.join() + proc.close() + + # Test that it works after the lock is released. + logger.BufferedLogger(str(self.temp_path), FakeWriter()).close() + def _read_last_sent(self): with (self.temp_path / logger.LAST_SENT_FILENAME).open('r') as infile: return infile.read() @@ -163,5 +186,14 @@ return bson.decode_all(infile.read(), logger.BSON_OPTIONS) +def _lock_holder(path, in_queue, out_queue): + with open(path, 'w') as infile: + fcntl.flock(infile, fcntl.LOCK_SH) + out_queue.put(None) # Notify that we've acquired the lock. + out_queue.close() + in_queue.get() # Wait for the test to complete before closing. + + if __name__ == '__main__': + multiprocessing.set_start_method('spawn') unittest.main()