Mercurial > personal > weather-server
view weather_server/locations.py @ 28:f817fa785c93
Make graph only consider visible points when setting range.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Sun, 19 Jan 2020 17:05:11 -0500 |
parents | 7def5611895b |
children | 9bc3687e1e5e |
line wrap: on
line source
"""Manages the directory containing the various logs."""

import configparser
import datetime
import pathlib
import typing as t

import pytz

from . import logfile
from . import types


CONFIG_FILE = 'config.ini'
LOG = 'log.bson'


class ConfigError(Exception):
    """Raised when a location can't be loaded."""


class Location:
    """A single location directory: its ``config.ini`` plus its log file.

    Attributes set by the constructor:
        root: Directory this location lives in.
        key: Identifier passed in by the caller.
        name: ``[location] name`` from the config
            (default ``'Weather station'``).
        tz_name: ``[location] timezone`` from the config (default ``'UTC'``).
        password: ``[location] password`` from the config (required).
        logger: ``logfile.Logger`` opened on ``root / LOG``.
    """

    def __init__(self, root: pathlib.Path, key: str):
        """Load the configuration found in ``root`` and open its log.

        Args:
            root: Directory expected to contain ``CONFIG_FILE``.
            key: Identifier to store on the instance as ``self.key``.

        Raises:
            ConfigError: If the config file can't be read or parsed, the
                required ``password`` option is missing, or opening the
                logger raises ``IOError``/``KeyError``.
        """
        # Interpolation is disabled so '%' in values (e.g. in a password)
        # is taken literally.
        parser = configparser.ConfigParser(interpolation=None)
        self.root = root
        self.key = key
        config_file = root / CONFIG_FILE
        try:
            with open(config_file, 'r') as infile:
                parser.read_file(infile)
            self.name = parser.get(
                'location', 'name', fallback='Weather station')
            self.tz_name = parser.get('location', 'timezone', fallback='UTC')
            # No fallback: a missing password is a configuration error.
            self.password = parser.get('location', 'password')
            self.logger = logfile.Logger(
                str(root / LOG), sample_field='sample_time')
        except (IOError, KeyError, configparser.Error) as err:
            # Chain the cause so the underlying problem stays visible.
            raise ConfigError("Couldn't load location info.") from err

    def record(
        self,
        entries: t.Iterable[t.Dict[str, object]],
        timestamp: datetime.datetime,
    ) -> None:
        """Stamp each entry with ``timestamp`` and write them to the log.

        Each dict in ``entries`` is modified in place: its ``'ingest_time'``
        key is set to ``timestamp``.

        Args:
            entries: Rows to record.  Any iterable is accepted, including
                one-shot iterators such as generators.
            timestamp: Value stored under ``'ingest_time'`` in every row.
        """
        # Materialize once: the rows are walked here and again by the logger,
        # which would see nothing if `entries` were a one-shot iterator.
        rows = list(entries)
        for row in rows:
            row['ingest_time'] = timestamp
        self.logger.write_rows(rows)

    @property
    def entries(self) -> t.Iterable[t.Dict[str, object]]:
        """The logger's ``data`` attribute, returned as-is."""
        return self.logger.data

    def latest(self) -> t.Optional[types.Reading]:
        """Return the most recent entry that converts to a ``Reading``.

        Walks the logger's data in reverse and returns the first entry for
        which ``types.Reading.from_dict`` does not raise ``KeyError``.

        Returns:
            The converted reading, or ``None`` if no entry converts.
        """
        most_recent = reversed(self.logger.data)
        for entry in most_recent:
            try:
                return types.Reading.from_dict(entry)
            except KeyError:
                pass  # go to the older one.
        return None

    def timezone(self) -> datetime.tzinfo:
        """Return the pytz timezone named by ``tz_name``, or UTC if unknown."""
        try:
            return pytz.timezone(self.tz_name)
        except pytz.UnknownTimeZoneError:
            return pytz.UTC

    def __repr__(self) -> str:
        # The original mixed a printf-style '%r' with str.format(), which
        # left the placeholder unfilled; interpolate the path for real.
        return f'<Location in {self.root!r}>'


class LocationFolder:
    """A directory whose children are ``Location`` directories.

    The set of locations is cached together with the root directory's
    modification time and rebuilt when that time changes.
    """

    def __init__(self, root: pathlib.Path):
        """Remember ``root`` and perform the initial scan.

        Args:
            root: Directory whose children are candidate locations.
        """
        self.root = root
        # locations, mtime
        self.info: t.Tuple[t.Dict[str, Location], t.Optional[int]] = (
            {}, None)
        self._maybe_reload()

    def get(self, name: str) -> Location:
        """Return the location stored under ``name``, reloading if needed.

        Raises:
            KeyError: If no location named ``name`` was loaded.
        """
        self._maybe_reload()
        locs, _ = self.info
        return locs[name]

    def locations(self) -> t.Dict[str, Location]:
        """Return the currently cached mapping of name to ``Location``.

        NOTE(review): unlike ``get``, this does not call ``_maybe_reload``
        first, so it may return a stale mapping -- confirm this is intended.
        """
        return self.info[0]

    def _maybe_reload(self) -> None:
        """Rescan ``root`` if its modification time differs from the cache.

        Every child of ``root`` is tried as a ``Location``; children that
        raise ``ConfigError`` are skipped.
        """
        new_mtime = self.root.stat().st_mtime_ns
        _, old_mtime = self.info
        if old_mtime == new_mtime:
            return
        locations: t.Dict[str, Location] = {}
        for child in self.root.iterdir():
            try:
                locations[child.name] = Location(child, child.name)
            except ConfigError:
                pass  # It's OK. Skip this.
        self.info = locations, new_mtime