Mercurial > personal > weatherlog
changeset 6:8a350ec1aa78
logger_test: Ensure it crashes when the file is locked.
| author | Paul Fisher <paul@pfish.zone> | 
|---|---|
| date | Sat, 28 Sep 2019 19:55:27 -0400 | 
| parents | 885bff085edf | 
| children | 357079c3c150 | 
| files | weatherlog/logger.py weatherlog/logger_test.py | 
| diffstat | 2 files changed, 33 insertions(+), 0 deletions(-) [+] | 
line wrap: on
 line diff
--- a/weatherlog/logger.py Sat Sep 28 19:28:22 2019 -0400 +++ b/weatherlog/logger.py Sat Sep 28 19:55:27 2019 -0400 @@ -121,6 +121,7 @@ try: fcntl.flock(file, fcntl.LOCK_EX | fcntl.LOCK_NB) except BlockingIOError as ex: + file.close() raise OSError('Another copy of the logger is running.') from ex return file
--- a/weatherlog/logger_test.py Sat Sep 28 19:28:22 2019 -0400 +++ b/weatherlog/logger_test.py Sat Sep 28 19:55:27 2019 -0400 @@ -1,6 +1,8 @@ import contextlib import datetime +import fcntl import itertools +import multiprocessing import pathlib import tempfile import time @@ -154,6 +156,27 @@ dict(sample_time=ts(31337), temp_c=666, rh_pct=999), ]) + def test_fail_upon_lock(self): + bson_file = str(self.temp_path / logger.BSON_FILENAME) + out_queue = multiprocessing.Queue() + in_queue = multiprocessing.Queue() + # This needs to be in a separate multiprocessing.Process + # since flock-based file locks are per-process, not per-thread. + proc = multiprocessing.Process( + target=_lock_holder, args=(bson_file, out_queue, in_queue)) + proc.start() + in_queue.get() # Wait for the lock to be acquired. + + with self.assertRaises(OSError): + logger.BufferedLogger(str(self.temp_path), FakeWriter()) + out_queue.put(None) # Notify that we're done. + out_queue.close() + proc.join() + proc.close() + + # Test that it works after the lock is released. + logger.BufferedLogger(str(self.temp_path), FakeWriter()).close() + def _read_last_sent(self): with (self.temp_path / logger.LAST_SENT_FILENAME).open('r') as infile: return infile.read() @@ -163,5 +186,14 @@ return bson.decode_all(infile.read(), logger.BSON_OPTIONS) +def _lock_holder(path, in_queue, out_queue): + with open(path, 'w') as infile: + fcntl.flock(infile, fcntl.LOCK_SH) + out_queue.put(None) # Notify that we've acquired the lock. + out_queue.close() + in_queue.get() # Wait for the test to complete before closing. + + if __name__ == '__main__': + multiprocessing.set_start_method('spawn') unittest.main()
