Mercurial > personal > weather-server
view weather_server/logfile.py @ 30:c760ab7f93c2
bump version number
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Tue, 19 May 2020 10:15:29 -0400 |
parents | 20c8ec56e447 |
children | 9bc3687e1e5e |
line wrap: on
line source
"""The part which handles writing things out and reading things in from BSON.

A log file is a sequence of concatenated BSON documents.  Every access to a
file opened by a ``Logger`` is funneled through that Logger's single writer
thread and is performed while holding an ``fcntl.flock`` lock on the file.
"""

import concurrent.futures as futures
import contextlib
import fcntl
import os
import queue
import threading
import typing as t

import bson

from . import common


class _WriteRequest:
    """A request for the writer thread to append entries to the log."""

    def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]):
        """Creates a request to write the given data to the log."""
        # The data to be written.  We take ownership of all the dicts!
        self.entries = entries
        # Once written, a future that will resolve to None if successful.
        self.future = futures.Future()


class _ReadRequest:
    """A request for the writer thread to report the log's contents."""

    def __init__(self):
        # The future that will be set with the log's contents.
        self.future = futures.Future()


# Poison pill to tell a logger thread to stop.
_POISON = object()


class Logger:
    """Logger which handles reading/writing one temperature log file."""

    # Class-level default so close() is safe on a half-constructed instance.
    _closed = False

    def __init__(self, filename: str, *, sample_field: str):
        """Creates a new Logger for the given file.

        Blocks until the writer thread has opened (or failed to open) the
        file; an open failure is re-raised here.

        Args:
            filename: The filename to open, or create if not already there.
            sample_field: The field name to use as the strictly-increasing
                value to ensure that no duplicate writes occur.
        """
        self._sample_field = sample_field
        self._queue = queue.SimpleQueue()
        # Create a Future that will be resolved once the file is opened
        # (or fails to be opened).
        writer_started = futures.Future()
        # The target must not close over `self`: a running thread keeps its
        # target alive, which would keep this Logger alive and prevent
        # __del__ from ever shutting the thread down.
        self._writer_thread = threading.Thread(
            name=f'{filename!r} writer thread',
            target=_writer_thread,
            args=(filename, self._queue, sample_field, writer_started),
            daemon=True)
        self._writer_thread.start()
        writer_started.result()

    @property
    def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
        """All entries currently in the log, oldest first.

        Blocks until the writer thread has read any new documents from the
        file.

        Raises:
            ValueError: If the Logger has been closed.
        """
        self._check_open()
        req = _ReadRequest()
        self._queue.put(req)
        return req.future.result()

    def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]):
        """Appends entries to the log, blocking until they are written.

        An entry is only written if its `sample_field` value is greater than
        that of the last entry already in the log; other entries are skipped.
        Any exception hit by the writer thread is re-raised here.

        Raises:
            ValueError: If the Logger has been closed.
        """
        self._check_open()
        req = _WriteRequest(entries)
        self._queue.put(req)
        return req.future.result()

    def __del__(self):
        self.close()

    def close(self):
        """Stops the writer thread and waits for it to exit.  Idempotent."""
        if self._closed:
            return
        self._closed = True
        self._queue.put(_POISON)
        self._writer_thread.join()

    def _check_open(self) -> None:
        """Raises ValueError if the writer thread has been shut down.

        Without this check a request made after close() would sit in the
        queue forever and the caller would block on its future.
        """
        if self._closed:
            raise ValueError('Logger is closed.')


def _writer_thread(
        filename: str,
        q: 'queue.SimpleQueue',
        sample_field: str,
        started: futures.Future,
) -> None:
    """Body of a Logger's writer thread.

    Opens the file, reports the outcome through `started`, then serves
    requests from `q` until the poison pill arrives.

    Args:
        filename: The log file to open or create.
        q: Queue of _ReadRequest / _WriteRequest objects, ended by _POISON.
        sample_field: Key whose value must strictly increase between entries.
        started: Resolved with None once the file is open, or with the
            exception raised while opening it.
    """
    if not started.set_running_or_notify_cancel():
        return
    try:
        file = _open_or_create(filename)
    except BaseException as e:
        started.set_exception(e)
        return
    started.set_result(None)

    with file:
        # In-memory copy of every valid document read from or written to
        # the file so far.
        data: t.List[t.Dict[str, object]] = []
        while True:
            item = q.get()
            if item is _POISON:
                # _POISON is the poison pill that makes us stop.
                break
            if isinstance(item, _ReadRequest):
                if not item.future.set_running_or_notify_cancel():
                    continue
                try:
                    with _file_lock(file, fcntl.LOCK_SH):
                        data.extend(_catch_up(file))
                except BaseException as x:
                    item.future.set_exception(x)
                else:
                    item.future.set_result(tuple(data))
            elif isinstance(item, _WriteRequest):
                if not item.future.set_running_or_notify_cancel():
                    continue
                try:
                    with _file_lock(file, fcntl.LOCK_EX):
                        data.extend(_catch_up(file))
                        # Since we're at the last good point, truncate after.
                        file.truncate(file.tell())
                        last = data[-1][sample_field] if data else None
                        try:
                            for entry in item.entries:
                                entry_key = entry[sample_field]
                                if last is None or last < entry_key:
                                    file.write(common.bson_encode(entry))
                                    data.append(entry)
                                    last = entry_key
                        finally:
                            # Flush even if a later entry failed, so that
                            # everything already appended to `data` reaches
                            # the file while we still hold the exclusive
                            # lock.
                            file.flush()
                except BaseException as x:
                    item.future.set_exception(x)
                else:
                    item.future.set_result(None)
            else:
                raise AssertionError(
                    'Unexpected item {!r} in the queue'.format(item))


@contextlib.contextmanager
def _file_lock(file: t.BinaryIO, operation: int) -> t.Iterator[None]:
    """Holds an flock of the given kind on `file` for the `with` body.

    Args:
        file: The open file to lock.
        operation: Either fcntl.LOCK_SH or fcntl.LOCK_EX.
    """
    assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.'
    fcntl.flock(file, operation)
    try:
        yield
    finally:
        fcntl.flock(file, fcntl.LOCK_UN)


def _size(file: t.BinaryIO) -> int:
    """Returns the file's current size on disk, in bytes."""
    return os.stat(file.fileno()).st_size


def _catch_up(file: t.BinaryIO) -> t.Iterable[t.Dict[str, object]]:
    """Reads data and advances the file pointer to the end of the file.

    Decodes BSON documents from the current position onward.  On return the
    file pointer sits immediately after the last document that decoded
    successfully; any trailing invalid bytes are left unread.

    Returns:
        The newly read documents, in file order (empty if there are none).
    """
    size = _size(file)
    pointer = file.tell()
    if size == pointer:
        return ()
    output: t.List[t.Dict[str, object]] = []
    try:
        items = bson.decode_file_iter(
            file, codec_options=common.BSON_OPTIONS)
        for item in items:
            # Remember where the most recent good document ended.
            pointer = file.tell()
            output.append(item)
    except bson.InvalidBSON:
        pass  # We have reached the last valid document.  Bail.
    # Seek back to immediately after the end of the last valid doc.
    file.seek(pointer, os.SEEK_SET)
    return output


def _open_or_create(path: str) -> t.BinaryIO:
    """Opens `path` for binary read/write, creating it if it doesn't exist.

    Retries in a loop because the file may be created or removed between
    the attempt to open it and the attempt to exclusively create it.
    """
    while True:
        try:
            return open(path, 'r+b')
        except FileNotFoundError:
            pass
        try:
            return open(path, 'x+b')
        except FileExistsError:
            pass