# HG changeset patch # User Paul Fisher # Date 1571524848 14400 # Node ID beb42c835c52f9de6a9f51ac7a425e27f0c025ea # Parent a7fe635d1c88894a153d6e88123ad0f61ad7a627 Make weather server handle arbitrary data: - Make logfile record arbitrary BSONs - Make server handlers OK with same - Make location type a normal class rather than attrs; have it handle its own logger. - Bump version number. diff -r a7fe635d1c88 -r beb42c835c52 setup.py --- a/setup.py Sun Oct 13 18:44:12 2019 -0400 +++ b/setup.py Sat Oct 19 18:40:48 2019 -0400 @@ -2,7 +2,7 @@ setuptools.setup( name='weather-server', - version='0.0.5', + version='0.0.6', packages=setuptools.find_packages(), python_requires='>=3.7', install_requires=[ diff -r a7fe635d1c88 -r beb42c835c52 weather_server/locations.py --- a/weather_server/locations.py Sun Oct 13 18:44:12 2019 -0400 +++ b/weather_server/locations.py Sat Oct 19 18:40:48 2019 -0400 @@ -1,53 +1,95 @@ """Manages the directory containing the various logs.""" import configparser +import datetime import pathlib import typing as t -import attr import pytz from . import logfile from . 
import types - CONFIG_FILE = 'config.ini' LOG = 'log.bson' -@attr.s(frozen=True, slots=True) -class LocationInfo: - name = attr.ib(type=str) - tz_name = attr.ib(type=str) - password = attr.ib(type=str) +class ConfigError(Exception): + """Raised when a location can't be loaded.""" + + +class Location: + + def __init__(self, root: pathlib.Path): + parser = configparser.ConfigParser(interpolation=None) + self.root = root + config_file = root / CONFIG_FILE + try: + with open(config_file, 'r') as infile: + parser.read_file(infile) + self.location = parser.get( + 'location', 'name', fallback='Weather station') + self.tz_name = parser.get('location', 'timezone', fallback='UTC') + self.password = parser.get('location', 'password') + self.logger = logfile.Logger.create( + str(root / LOG), sample_field='sample_time') + except (IOError, KeyError, configparser.Error): + raise ConfigError("Couldn't load location info.") - def timezone(self): + def record( + self, + entries: t.Iterable[t.Dict[str, object]], + timestamp: datetime.datetime, + ) -> None: + for e in entries: + e['ingest_time'] = timestamp + self.logger.write_rows(entries) + + @property + def entries(self) -> t.Iterable[t.Dict[str, object]]: + return self.logger.data + + def latest(self) -> t.Optional[types.Reading]: + most_recent = reversed(self.logger.data) + for entry in most_recent: + try: + return types.Reading.from_dict(entry) + except KeyError: + pass # go to the older one. 
+ return None + + def timezone(self) -> datetime.tzinfo: try: return pytz.timezone(self.tz_name) except pytz.UnknownTimeZoneError: return pytz.UTC - @classmethod - def load(cls, config_file: pathlib.Path) -> 'LocationInfo': - parser = configparser.ConfigParser(interpolation=None) - parser.read(config_file) - return LocationInfo( - name=parser.get('location', 'name', fallback='Weather station'), - tz_name=parser.get('location', 'timezone', fallback='UTC'), - password=parser.get('location', 'password')) + def __repr__(self) -> str: + return ''.format(self.root) -class Locations: - def __init__(self, base: str): - self._path = pathlib.Path(base) +class LocationFolder: - def paths(self) -> t.Tuple[str, ...]: - return tuple(sorted(f.name for f in self._path.iterdir())) + def __init__(self, root: pathlib.Path): + self.root = root + # locations, mtime + self.info: t.Tuple[t.Dict[str, Location], t.Optional[int]] = ({}, None) + self._maybe_reload() + + def get(self, name: str) -> Location: + self._maybe_reload() + locs, _ = self.info + return locs[name] - def get(self, name) -> t.Tuple[LocationInfo, logfile.Logger]: - try: - directory = self._path / name - logger = logfile.Logger.create(str(directory / LOG)) - return (LocationInfo.load(directory / CONFIG_FILE), logger) - except OSError: - raise KeyError(name) + def _maybe_reload(self) -> None: + new_mtime = self.root.stat().st_mtime_ns + _, old_mtime = self.info + if old_mtime == new_mtime: + return + locations = {} + for child in self.root.iterdir(): + try: + locations[child.name] = Location(child) + except ConfigError: + pass # It's OK. Skip this. + self.info = locations, new_mtime diff -r a7fe635d1c88 -r beb42c835c52 weather_server/logfile.py --- a/weather_server/logfile.py Sun Oct 13 18:44:12 2019 -0400 +++ b/weather_server/logfile.py Sat Oct 19 18:40:48 2019 -0400 @@ -1,16 +1,37 @@ """The part which handles writing things out and reading things in from CSV. 
""" +import concurrent.futures as futures +import contextlib import fcntl import os +import queue import threading import typing as t import bson from . import common -from . import types + + +class _WriteRequest: + def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]): + """Creates a request to write the given data to the log.""" + # The data to be written. We take ownership of all the dicts! + self.entries = entries + # Once written, a future that will resolve to None if successful. + self.future = futures.Future() + + +class _ReadRequest: + + def __init__(self): + # The future that will be set with the log's contents. + self.future = futures.Future() + + +# probably handle file-writing with a queue that reports back its progress class Logger: """Logger which handles reading/writing a temperature log for one process. @@ -20,93 +41,140 @@ instances: t.Dict[str, 'Logger'] = {} @classmethod - def create(cls, filename: str) -> 'Logger': + def create( + cls, + filename: str, + *, + sample_field: str, + ) -> 'Logger': """Creates a single shared instance of a logger for the given file.""" try: - return cls.instances[filename] + instance = cls.instances[filename] except KeyError: with cls.instance_lock: try: - return cls.instances[filename] + instance = cls.instances[filename] except KeyError: - cls.instances[filename] = Logger(filename) - return cls.instances[filename] + cls.instances[filename] = Logger( + filename, + sample_field=sample_field) + instance = cls.instances[filename] + if instance._sample_field != sample_field: + raise ValueError( + 'Existing instance has different sample field: ' + '{!r} != {!r}'.format(instance._sample_field, sample_field)) + return instance - def __init__(self, filename: str): + def __init__(self, filename: str, *, sample_field: str): """You should probably call .create() instead.""" + self._sample_field = sample_field self._file = _open_or_create(filename) - self._data: t.Tuple[types.Reading] = () + self._data: t.List[t.Dict[str, 
t.Any], ...] = [] + self._queue = queue.SimpleQueue() self._last_size = 0 - self._maybe_read_data() - self._lock = threading.Lock() + self._lock_status: t.Optional[int] = None + self._writer_thread = threading.Thread(target=self._writer) + self._writer_thread.start() + + @property + def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]: + req = _ReadRequest() + self._queue.put(req) + return req.future.result() + + def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]): + req = _WriteRequest(entries) + self._queue.put(req) + return req.future.result() + + _POISON = object() + + def close(self): + self._queue.put(self._POISON) + self._writer_thread.join() - def _maybe_read_data(self) -> None: + def _writer(self) -> None: + running = True + while running: + item = self._queue.get() + if item is self._POISON: + # _POISON is the poison pill that makes us stop. + running = False + elif isinstance(item, _ReadRequest): + if not item.future.set_running_or_notify_cancel(): + continue + try: + with self._file_lock(fcntl.LOCK_SH): + self._catch_up() + except BaseException as x: + item.future.set_exception(x) + else: + item.future.set_result(tuple(self._data)) + elif isinstance(item, _WriteRequest): + if not item.future.set_running_or_notify_cancel(): + continue + try: + with self._file_lock(fcntl.LOCK_EX): + self._catch_up() + # Since we're at the last good point, truncate after. 
+ self._file.truncate(self._file.tell()) + if not self._data: + last = None + else: + last = self._data[-1][self._sample_field] + for entry in item.entries: + entry_key = entry[self._sample_field] + if last is None or last < entry_key: + self._file.write(common.bson_encode(entry)) + self._data.append(entry) + last = entry_key + self._file.flush() + self._last_size = self._file.tell() + except BaseException as x: + item.future.set_exception(x) + else: + item.future.set_result(None) + else: + raise AssertionError( + 'Unexpected item {!r} in the queue'.format(item)) + self._file.close() + + def _catch_up(self) -> None: """Reads data and advances the file pointer to the end of the file.""" - # This must be called with both the file lock and _lock held. + assert self._lock_status is not None, 'The lock must be held.' size = self._size() if size == self._last_size: return last_good = self._file.tell() - data = list(self._data) try: items = bson.decode_file_iter( self._file, codec_options=common.BSON_OPTIONS) for item in items: last_good = self._file.tell() - try: - data.append(types.Reading(**item)) - except TypeError: - pass # Skip this item. + self._data.append(item) except bson.InvalidBSON: pass # We have reached the last valid document. Bail. # Seek back to immediately after the end of the last valid doc. 
- self._data = tuple(data) - self._file.truncate(last_good) self._last_size = last_good self._file.seek(last_good, os.SEEK_SET) - def write_rows(self, readings: t.Iterable[types.Reading]) -> None: - """Write a sorted series of readings, ignoring old ones.""" - with self._lock: - fcntl.flock(self._file.fileno(), fcntl.LOCK_EX) - try: - self._maybe_read_data() - self._file.truncate(self._file.tell()) - data = list(self._data) - if not data: - last_time = None - else: - last_time = data[-1].sample_time - for reading in readings: - if not last_time or last_time < reading.sample_time: - self._file.write(common.bson_encode(reading.as_dict())) - data.append(reading) - self._data = tuple(data) - finally: - self._file.flush() - self._last_size = self._size() - fcntl.flock(self, fcntl.LOCK_UN) - def fileno(self) -> int: return self._file.fileno() - def close(self): - self._file.close() - - @property - def data(self) -> t.Tuple[types.Reading, ...]: - if self._size() != self._last_size: - fcntl.flock(self, fcntl.LOCK_SH) - try: - with self._lock: - self._maybe_read_data() - finally: - fcntl.flock(self, fcntl.LOCK_UN) - return self._data - def _size(self) -> int: return os.stat(self.fileno()).st_size + @contextlib.contextmanager + def _file_lock(self, operation: int): + assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.' 
+ fcntl.flock(self, operation) + self._lock_status = operation + try: + yield + finally: + self._lock_status = None + fcntl.flock(self, fcntl.LOCK_UN) + def _open_or_create(path: str) -> t.BinaryIO: while True: diff -r a7fe635d1c88 -r beb42c835c52 weather_server/logfile_test.py --- a/weather_server/logfile_test.py Sun Oct 13 18:44:12 2019 -0400 +++ b/weather_server/logfile_test.py Sat Oct 19 18:40:48 2019 -0400 @@ -30,7 +30,9 @@ super().tearDown() def test_empty(self): - with contextlib.closing(logfile.Logger(str(self.log_path))) as logger: + lg = logfile.Logger( + str(self.log_path), sample_field='x') + with contextlib.closing(lg) as logger: self.assertEqual(logger.data, ()) def test_loading(self): @@ -42,10 +44,19 @@ ingest_time=ts(125), ))) outfile.write(b'garbage to ignore') - with contextlib.closing(logfile.Logger(str(self.log_path))) as logger: + with contextlib.closing(logfile.Logger( + str(self.log_path), + sample_field='sample_time', + )) as logger: self.assertEqual( logger.data, - (types.Reading(ts(123), 420, 69, ts(125)),)) + ( + dict( + sample_time=ts(123), + temp_c=420, + rh_pct=69, + ingest_time=ts(125)), + )) def test_writing(self): with self.log_path.open('wb') as outfile: @@ -55,16 +66,20 @@ rh_pct=69, ingest_time=ts(125), ))) - with contextlib.closing(logfile.Logger(str(self.log_path))) as logger: + with contextlib.closing(logfile.Logger( + str(self.log_path), + sample_field='sample_time', + )) as logger: logger.write_rows([ - types.Reading(ts(100), 999, 666, ts(101)), - types.Reading(ts(125), 333, 777, ts(200)), + # Ignored, since it's older than the newest entry. 
+ types.Reading(ts(100), 999, 666, ts(101)).as_dict(), + types.Reading(ts(125), 333, 777, ts(200)).as_dict(), ]) self.assertEqual( logger.data, ( - types.Reading(ts(123), 420, 69, ts(125)), - types.Reading(ts(125), 333, 777, ts(200)), + types.Reading(ts(123), 420, 69, ts(125)).as_dict(), + types.Reading(ts(125), 333, 777, ts(200)).as_dict(), ) ) @@ -84,10 +99,13 @@ ]) def test_outside_writes(self): - with contextlib.closing(logfile.Logger(str(self.log_path))) as logger: + with contextlib.closing(logfile.Logger( + str(self.log_path), + sample_field='sample_time', + )) as logger: logger.write_rows([ - types.Reading(ts(100), 999, 666, ts(101)), - types.Reading(ts(125), 333, 777, ts(200)), + types.Reading(ts(100), 999, 666, ts(101)).as_dict(), + types.Reading(ts(125), 333, 777, ts(200)).as_dict(), ]) with self.log_path.open('ab') as outfile: outfile.write(common.bson_encode(dict( @@ -98,9 +116,9 @@ ))) outfile.flush() self.assertEqual(logger.data, ( - types.Reading(ts(100), 999, 666, ts(101)), - types.Reading(ts(125), 333, 777, ts(200)), - types.Reading(ts(1024), 256, 128, ts(4096)), + types.Reading(ts(100), 999, 666, ts(101)).as_dict(), + types.Reading(ts(125), 333, 777, ts(200)).as_dict(), + types.Reading(ts(1024), 256, 128, ts(4096)).as_dict(), )) def read_bsons(self): diff -r a7fe635d1c88 -r beb42c835c52 weather_server/server.py --- a/weather_server/server.py Sun Oct 13 18:44:12 2019 -0400 +++ b/weather_server/server.py Sat Oct 19 18:40:48 2019 -0400 @@ -1,17 +1,18 @@ import datetime import hmac +import pathlib import sys import bson import flask +import pytz from . import common from . import locations -from . 
import types def build_app(root_directory: str) -> flask.Flask: - locs = locations.Locations(root_directory) + locs = locations.LocationFolder(pathlib.Path(root_directory)) app = flask.Flask(__name__) app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0 @@ -32,35 +33,27 @@ preamble = next(reader) loc_name = preamble['location'] password = str(preamble['password']) - loc, logger = locs.get(loc_name) + loc = locs.get(loc_name) if not hmac.compare_digest(password, loc.password): flask.abort(400) - entries = [ - types.Reading.from_now( - sample_time=item['sample_time'], - temp_c=item['temp_c'], - rh_pct=item['rh_pct'], - ) - for item in reader - ] + entries = tuple(reader) except (KeyError, bson.InvalidBSON): flask.abort(400) - logger.write_rows(entries) + now = datetime.datetime.now(tz=pytz.UTC) + loc.record(entries, now) return flask.jsonify({'status': 'OK'}) @app.route('/<location>') def show(location: str): try: - loc, logger = locs.get(location) + loc = locs.get(location) except KeyError: flask.abort(404) - data = logger.data - if data: - last_reading = data[-1] + last_reading = loc.latest() + if last_reading: tz = loc.timezone() date = tz.normalize(last_reading.sample_time.astimezone(tz)) else: - last_reading = None date = None return flask.render_template( 'location.html', @@ -71,7 +64,7 @@ @app.route('/<location>/recent') def recent(location: str): try: - loc, logger = locs.get(location) + loc = locs.get(location) except KeyError: flask.abort(404) req = flask.request @@ -83,10 +76,7 @@ start = common.utc_now() - datetime.timedelta(seconds=seconds) - readings = [ - r.as_dict() for r in logger.data - if start < r.sample_time - ] + readings = [r for r in loc.entries if start < r['sample_time']] resp = flask.Response() resp.content_type = 'application/json' resp.data = common.json_dumps({ diff -r a7fe635d1c88 -r beb42c835c52 weather_server/types.py --- a/weather_server/types.py Sun Oct 13 18:44:12 2019 -0400 +++ b/weather_server/types.py Sat Oct 19 18:40:48 2019 -0400 @@ -6,7 +6,8 @@ import 
attr -from . import common + +T = t.TypeVar('T') def c_to_f(c: float) -> float: @@ -51,12 +52,9 @@ def dew_point_f(self) -> float: return c_to_f(self.dew_point_c) - def as_dict(self) -> t.Dict[str, t.Any]: - return attr.asdict(self, recurse=False) - @classmethod - def from_now(cls, **kwargs) -> 'Reading': - return cls(ingest_time=common.utc_now(), **kwargs) + def from_dict(cls: t.Type[T], d: t.Dict[str, t.Any]) -> T: + return cls(**{f.name: d[f.name] for f in attr.fields(cls)}) @property def _gamma(self) -> float: