diff weather_server/logfile.py @ 21:beb42c835c52
Make weather server handle arbitrary data:
- Make logfile record arbitrary BSONs
- Make server handlers OK with same
- Make location type a normal class rather than attrs; have it handle its own logger.
- Bump version number.
author    Paul Fisher <paul@pfish.zone>
date      Sat, 19 Oct 2019 18:40:48 -0400
parents   efe7a1eff167
children  20c8ec56e447
--- a/weather_server/logfile.py    Sun Oct 13 18:44:12 2019 -0400
+++ b/weather_server/logfile.py    Sat Oct 19 18:40:48 2019 -0400
@@ -1,16 +1,37 @@
 """The part which handles writing things out and reading things in from CSV.
 """
 
+import concurrent.futures as futures
+import contextlib
 import fcntl
 import os
+import queue
 import threading
 import typing as t
 
 import bson
 
 from . import common
-from . import types
+
+
+class _WriteRequest:
+    def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]):
+        """Creates a request to write the given data to the log."""
+        # The data to be written.  We take ownership of all the dicts!
+        self.entries = entries
+        # Once written, a future that will resolve to None if successful.
+        self.future = futures.Future()
+
+
+class _ReadRequest:
+
+    def __init__(self):
+        # The future that will be set with the log's contents.
+        self.future = futures.Future()
 
 
+# probably handle file-writing with a queue that reports back its progress
 class Logger:
     """Logger which handles reading/writing a temperature log for one process.
@@ -20,93 +41,140 @@
 
     instances: t.Dict[str, 'Logger'] = {}
 
     @classmethod
-    def create(cls, filename: str) -> 'Logger':
+    def create(
+            cls,
+            filename: str,
+            *,
+            sample_field: str,
+    ) -> 'Logger':
         """Creates a single shared instance of a logger for the given file."""
         try:
-            return cls.instances[filename]
+            instance = cls.instances[filename]
         except KeyError:
             with cls.instance_lock:
                 try:
-                    return cls.instances[filename]
+                    instance = cls.instances[filename]
                 except KeyError:
-                    cls.instances[filename] = Logger(filename)
-            return cls.instances[filename]
+                    cls.instances[filename] = Logger(
+                        filename,
+                        sample_field=sample_field)
+                    instance = cls.instances[filename]
+        if instance._sample_field != sample_field:
+            raise ValueError(
+                'Existing instance has different sample field: '
+                '{!r} != {!r}'.format(instance._sample_field, sample_field))
+        return instance
 
-    def __init__(self, filename: str):
+    def __init__(self, filename: str, *, sample_field: str):
         """You should probably call .create() instead."""
+        self._sample_field = sample_field
         self._file = _open_or_create(filename)
-        self._data: t.Tuple[types.Reading] = ()
+        self._data: t.List[t.Dict[str, t.Any]] = []
+        self._queue = queue.SimpleQueue()
         self._last_size = 0
-        self._maybe_read_data()
-        self._lock = threading.Lock()
+        self._lock_status: t.Optional[int] = None
+        self._writer_thread = threading.Thread(target=self._writer)
+        self._writer_thread.start()
+
+    @property
+    def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
+        req = _ReadRequest()
+        self._queue.put(req)
+        return req.future.result()
+
+    def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]):
+        req = _WriteRequest(entries)
+        self._queue.put(req)
+        return req.future.result()
+
+    _POISON = object()
+
+    def close(self):
+        self._queue.put(self._POISON)
+        self._writer_thread.join()
 
-    def _maybe_read_data(self) -> None:
+    def _writer(self) -> None:
+        running = True
+        while running:
+            item = self._queue.get()
+            if item is self._POISON:
+                # _POISON is the poison pill that makes us stop.
+                running = False
+            elif isinstance(item, _ReadRequest):
+                if not item.future.set_running_or_notify_cancel():
+                    continue
+                try:
+                    with self._file_lock(fcntl.LOCK_SH):
+                        self._catch_up()
+                except BaseException as x:
+                    item.future.set_exception(x)
+                else:
+                    item.future.set_result(tuple(self._data))
+            elif isinstance(item, _WriteRequest):
+                if not item.future.set_running_or_notify_cancel():
+                    continue
+                try:
+                    with self._file_lock(fcntl.LOCK_EX):
+                        self._catch_up()
+                        # Since we're at the last good point, truncate after.
+                        self._file.truncate(self._file.tell())
+                        if not self._data:
+                            last = None
+                        else:
+                            last = self._data[-1][self._sample_field]
+                        for entry in item.entries:
+                            entry_key = entry[self._sample_field]
+                            if last is None or last < entry_key:
+                                self._file.write(common.bson_encode(entry))
+                                self._data.append(entry)
+                                last = entry_key
+                        self._file.flush()
+                        self._last_size = self._file.tell()
+                except BaseException as x:
+                    item.future.set_exception(x)
+                else:
+                    item.future.set_result(None)
+            else:
+                raise AssertionError(
+                    'Unexpected item {!r} in the queue'.format(item))
+        self._file.close()
+
+    def _catch_up(self) -> None:
         """Reads data and advances the file pointer to the end of the file."""
-        # This must be called with both the file lock and _lock held.
+        assert self._lock_status is not None, 'The lock must be held.'
        size = self._size()
         if size == self._last_size:
             return
         last_good = self._file.tell()
-        data = list(self._data)
         try:
             items = bson.decode_file_iter(
                 self._file, codec_options=common.BSON_OPTIONS)
             for item in items:
                 last_good = self._file.tell()
-                try:
-                    data.append(types.Reading(**item))
-                except TypeError:
-                    pass  # Skip this item.
+                self._data.append(item)
         except bson.InvalidBSON:
             pass  # We have reached the last valid document.  Bail.
         # Seek back to immediately after the end of the last valid doc.
-        self._data = tuple(data)
-        self._file.truncate(last_good)
         self._last_size = last_good
         self._file.seek(last_good, os.SEEK_SET)
 
-    def write_rows(self, readings: t.Iterable[types.Reading]) -> None:
-        """Write a sorted series of readings, ignoring old ones."""
-        with self._lock:
-            fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
-            try:
-                self._maybe_read_data()
-                self._file.truncate(self._file.tell())
-                data = list(self._data)
-                if not data:
-                    last_time = None
-                else:
-                    last_time = data[-1].sample_time
-                for reading in readings:
-                    if not last_time or last_time < reading.sample_time:
-                        self._file.write(common.bson_encode(reading.as_dict()))
-                        data.append(reading)
-                self._data = tuple(data)
-            finally:
-                self._file.flush()
-                self._last_size = self._size()
-                fcntl.flock(self, fcntl.LOCK_UN)
-
     def fileno(self) -> int:
         return self._file.fileno()
 
-    def close(self):
-        self._file.close()
-
-    @property
-    def data(self) -> t.Tuple[types.Reading, ...]:
-        if self._size() != self._last_size:
-            fcntl.flock(self, fcntl.LOCK_SH)
-            try:
-                with self._lock:
-                    self._maybe_read_data()
-            finally:
-                fcntl.flock(self, fcntl.LOCK_UN)
-        return self._data
-
     def _size(self) -> int:
         return os.stat(self.fileno()).st_size
 
+    @contextlib.contextmanager
+    def _file_lock(self, operation: int):
+        assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.'
+        fcntl.flock(self, operation)
+        self._lock_status = operation
+        try:
+            yield
+        finally:
+            self._lock_status = None
+            fcntl.flock(self, fcntl.LOCK_UN)
+
 
 def _open_or_create(path: str) -> t.BinaryIO:
     while True:
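
After this change, callers hand the Logger plain dicts keyed by an arbitrary sample_field instead of types.Reading objects. A minimal sketch of how the reworked API might be driven; the filename, field name, and sample values below are hypothetical, not part of this changeset:

import datetime

from weather_server import logfile

# 'sample_time' stands in for whatever field the caller orders entries by.
logger = logfile.Logger.create('weather.bson', sample_field='sample_time')
logger.write_rows([
    {'sample_time': datetime.datetime(2019, 10, 19, 18, 40), 'temp_c': 21.5},
    {'sample_time': datetime.datetime(2019, 10, 19, 18, 41), 'temp_c': 21.7},
])
print(logger.data)  # A tuple of dicts, oldest entry first.
logger.close()      # Stops the writer thread and closes the file.

Entries whose sample_field value is not strictly greater than the last logged one are silently skipped, matching the old "ignore old readings" behavior.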
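The structural change here is that all reads and writes now funnel through one writer thread consuming a queue of request objects that carry futures, and close() shuts it down with a sentinel (the poison pill). A self-contained sketch of that pattern under the same standard-library assumptions (queue.SimpleQueue, concurrent.futures.Future), independent of the weather-server code:

import concurrent.futures as futures
import queue
import threading

_POISON = object()  # Sentinel: SimpleQueue cannot be closed, so we enqueue this.

class SerialWorker:
    def __init__(self):
        self._queue = queue.SimpleQueue()
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def submit(self, item):
        # Pair every request with a future so callers can block on completion
        # and see exceptions raised in the worker thread.
        fut = futures.Future()
        self._queue.put((item, fut))
        return fut.result()

    def close(self):
        self._queue.put(_POISON)
        self._thread.join()

    def _run(self):
        while True:
            got = self._queue.get()
            if got is _POISON:
                return
            item, fut = got
            if not fut.set_running_or_notify_cancel():
                continue  # The caller cancelled this request; skip it.
            try:
                fut.set_result(self._process(item))
            except BaseException as x:
                fut.set_exception(x)

    def _process(self, item):
        return item  # Stand-in for the real serialized work.

Serializing all file access through one thread is what lets the commit drop the old per-method threading.Lock: only the writer thread ever touches the file or self._data.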
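The new _catch_up() leans on bson.decode_file_iter raising InvalidBSON when it hits a partially written trailing document, which is what makes the append-only log safe to re-read after another process has written to it. A sketch of that recovery idiom, assuming pymongo's bson package as the diff does:

import bson

def read_complete_documents(fp):
    """Reads every complete BSON doc, leaving fp just past the last good one."""
    docs = []
    last_good = fp.tell()
    try:
        for doc in bson.decode_file_iter(fp):
            docs.append(doc)
            last_good = fp.tell()
    except bson.InvalidBSON:
        pass  # Truncated trailing document; everything before it is valid.
    fp.seek(last_good)
    return docs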