Mercurial > personal > weather-server
view weather_server/logfile.py @ 21:beb42c835c52
Make weather server handle arbitrary data:
- Make logfile record arbitrary BSONs
- Make server handlers OK with same
- Make location type a normal class rather than attrs;
have it handle its own logger.
- Bump version number.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Sat, 19 Oct 2019 18:40:48 -0400 |
parents | efe7a1eff167 |
children | 20c8ec56e447 |
line wrap: on
line source
"""The part which handles writing things out and reading things in from CSV. """ import concurrent.futures as futures import contextlib import fcntl import os import queue import threading import typing as t import bson from . import common class _WriteRequest: def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]): """Creates a request to write the given data to the log.""" # The data to be written. We take ownership of all the dicts! self.entries = entries # Once written, a future that will resolve to None if successful. self.future = futures.Future() class _ReadRequest: def __init__(self): # The future that will be set with the log's contnets. self.future = futures.Future() # probably handle file-writing with a queue that reports back its progress class Logger: """Logger which handles reading/writing a temperature log for one process. """ instance_lock = threading.Lock() instances: t.Dict[str, 'Logger'] = {} @classmethod def create( cls, filename: str, *, sample_field: str, ) -> 'Logger': """Creates a single shared instance of a logger for the given file.""" try: instance = cls.instances[filename] except KeyError: with cls.instance_lock: try: instance = cls.instances[filename] except KeyError: cls.instances[filename] = Logger( filename, sample_field=sample_field) instance = cls.instances[filename] if instance._sample_field != sample_field: raise ValueError( 'Existing instance has different sample field: ' '{!r} != {!r}'.format(instance._sample_field, sample_field)) return instance def __init__(self, filename: str, *, sample_field: str): """You should probably call .create() instead.""" self._sample_field = sample_field self._file = _open_or_create(filename) self._data: t.List[t.Dict[str, t.Any], ...] = [] self._queue = queue.SimpleQueue() self._last_size = 0 self._lock_status: t.Optional[int] = None self._writer_thread = threading.Thread(target=self._writer) self._writer_thread.start() @property def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]: req = _ReadRequest() self._queue.put(req) return req.future.result() def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]): req = _WriteRequest(entries) self._queue.put(req) return req.future.result() _POISON = object() def close(self): self._queue.put(self._POISON) self._writer_thread.join() def _writer(self) -> None: running = True while running: item = self._queue.get() if item is self._POISON: # None is the poison pill that makes us stop. running = False elif isinstance(item, _ReadRequest): if not item.future.set_running_or_notify_cancel(): continue try: with self._file_lock(fcntl.LOCK_SH): self._catch_up() except BaseException as x: item.future.set_exception(x) else: item.future.set_result(tuple(self._data)) elif isinstance(item, _WriteRequest): if not item.future.set_running_or_notify_cancel(): continue try: with self._file_lock(fcntl.LOCK_EX): self._catch_up() # Since we're at the last good point, truncate after. self._file.truncate(self._file.tell()) if not self._data: last = None else: last = self._data[-1][self._sample_field] for entry in item.entries: entry_key = entry[self._sample_field] if last is None or last < entry_key: self._file.write(common.bson_encode(entry)) self._data.append(entry) last = entry_key self._file.flush() self._last_size = self._file.tell() except BaseException as x: item.future.set_exception(x) else: item.future.set_result(None) else: raise AssertionError( 'Unexpected item {!r} in the queue'.format(item)) self._file.close() def _catch_up(self) -> None: """Reads data and advances the file pointer to the end of the file.""" assert self._lock_status is not None, 'The lock must be held.' size = self._size() if size == self._last_size: return last_good = self._file.tell() try: items = bson.decode_file_iter( self._file, codec_options=common.BSON_OPTIONS) for item in items: last_good = self._file.tell() self._data.append(item) except bson.InvalidBSON: pass # We have reached the last valid document. Bail. # Seek back to immediately after the end of the last valid doc. self._last_size = last_good self._file.seek(last_good, os.SEEK_SET) def fileno(self) -> int: return self._file.fileno() def _size(self) -> int: return os.stat(self.fileno()).st_size @contextlib.contextmanager def _file_lock(self, operation: int): assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.' fcntl.flock(self, operation) self._lock_status = operation try: yield finally: self._lock_status = None fcntl.flock(self, fcntl.LOCK_UN) def _open_or_create(path: str) -> t.BinaryIO: while True: try: return open(path, 'r+b') except FileNotFoundError: pass try: return open(path, 'x+b') except FileExistsError: pass