# HG changeset patch
# User Paul Fisher
# Date 1569727041 14400
# Node ID efe7a1eff16786bdbff3cfb87db27fc471e8a6f6
Create initial logger for weather server.

diff -r 000000000000 -r efe7a1eff167 weather_server/__init__.py
diff -r 000000000000 -r efe7a1eff167 weather_server/common.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/weather_server/common.py Sat Sep 28 23:17:21 2019 -0400
@@ -0,0 +1,11 @@
+import typing as t
+
+import bson
+import pytz
+
+BSON_OPTIONS = bson.DEFAULT_CODEC_OPTIONS.with_options(
+    tz_aware=True, tzinfo=pytz.UTC)
+
+
+def bson_encode(data: t.Dict[str, t.Any]) -> bytes:
+    return bson.BSON.encode(data, codec_options=BSON_OPTIONS)
diff -r 000000000000 -r efe7a1eff167 weather_server/logfile.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/weather_server/logfile.py Sat Sep 28 23:17:21 2019 -0400
@@ -0,0 +1,120 @@
+"""The part which handles writing things out and reading things in from BSON.
+"""
+
+import fcntl
+import os
+import threading
+import typing as t
+
+import bson
+
+from . import common
+from . import types
+
+
+class Logger:
+    """Logger which handles reading/writing a temperature log for one process.
+    """
+
+    instance_lock = threading.Lock()
+    instances: t.Dict[str, 'Logger'] = {}
+
+    @classmethod
+    def create(cls, filename: str) -> 'Logger':
+        """Creates a single shared instance of a logger for the given file."""
+        try:
+            return cls.instances[filename]
+        except KeyError:
+            with cls.instance_lock:
+                try:
+                    return cls.instances[filename]
+                except KeyError:
+                    cls.instances[filename] = Logger(filename)
+                    return cls.instances[filename]
+
+    def __init__(self, filename: str):
+        """You should probably call .create() instead."""
+        self._file = _open_or_create(filename)
+        self._data: t.Tuple[types.Reading, ...] = ()
+        self._last_size = 0
+        self._maybe_read_data()
+        self._lock = threading.Lock()
+
+    def _maybe_read_data(self) -> None:
+        """Reads data and advances the file pointer to the end of the file."""
+        # This must be called with both the file lock and _lock held.
+        size = self._size()
+        if size == self._last_size:
+            return
+        last_good = self._file.tell()
+        data = list(self._data)
+        try:
+            items = bson.decode_file_iter(
+                self._file, codec_options=common.BSON_OPTIONS)
+            for item in items:
+                last_good = self._file.tell()
+                try:
+                    data.append(types.Reading(**item))
+                except TypeError:
+                    pass  # Skip this item.
+        except bson.InvalidBSON:
+            pass  # We have reached the last valid document.  Bail.
+        # Seek back to immediately after the end of the last valid doc.
+        self._data = tuple(data)
+        self._file.truncate(last_good)
+        self._last_size = last_good
+        self._file.seek(last_good, os.SEEK_SET)
+
+    def write_rows(self, readings: t.Iterable[types.Reading]) -> None:
+        """Write a sorted series of readings, ignoring old ones."""
+        with self._lock:
+            fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
+            try:
+                self._maybe_read_data()
+                self._file.truncate(self._file.tell())
+                data = list(self._data)
+                if not data:
+                    last_time = None
+                else:
+                    last_time = data[-1].sample_time
+                for reading in readings:
+                    if not last_time or last_time < reading.sample_time:
+                        self._file.write(common.bson_encode(reading.as_dict()))
+                        data.append(reading)
+                self._data = tuple(data)
+            finally:
+                self._file.flush()
+                self._last_size = self._size()
+                fcntl.flock(self, fcntl.LOCK_UN)
+
+    def fileno(self) -> int:
+        return self._file.fileno()
+
+    def close(self):
+        self._file.close()
+
+    @property
+    def data(self) -> t.Tuple[types.Reading, ...]:
+        if self._size() != self._last_size:
+            fcntl.flock(self, fcntl.LOCK_SH)
+            try:
+                with self._lock:
+                    self._maybe_read_data()
+            finally:
+                fcntl.flock(self, fcntl.LOCK_UN)
+        return self._data
+
+    def _size(self) -> int:
+        return os.stat(self.fileno()).st_size
+
+
+def _open_or_create(path: str) -> t.BinaryIO:
+    while True:
+        try:
+            return open(path, 'r+b')
+        except FileNotFoundError:
+            pass
+        try:
+            return open(path, 'x+b')
+        except FileExistsError:
+            pass
diff -r 000000000000 -r efe7a1eff167 weather_server/logfile_test.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/weather_server/logfile_test.py Sat Sep 28 23:17:21 2019 -0400
@@ -0,0 +1,113 @@
+import contextlib
+import datetime
+import pathlib
+import tempfile
+import unittest
+
+import bson
+import pytz
+
+from . import common
+from . import logfile
+from . import types
+
+
+def ts(n):
+    return datetime.datetime.utcfromtimestamp(n).replace(tzinfo=pytz.UTC)
+
+
+class LoggerTest(unittest.TestCase):
+
+    maxDiff = None
+
+    def setUp(self):
+        super().setUp()
+        self.temp_dir = tempfile.TemporaryDirectory()
+        self.log_path = pathlib.Path(self.temp_dir.name) / 'test.bson'
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+        super().tearDown()
+
+    def test_empty(self):
+        with contextlib.closing(logfile.Logger(str(self.log_path))) as logger:
+            self.assertEqual(logger.data, ())
+
+    def test_loading(self):
+        with self.log_path.open('wb') as outfile:
+            outfile.write(common.bson_encode(dict(
+                sample_time=ts(123),
+                temp_c=420,
+                rh_pct=69,
+                ingest_time=ts(125),
+            )))
+            outfile.write(b'garbage to ignore')
+        with contextlib.closing(logfile.Logger(str(self.log_path))) as logger:
+            self.assertEqual(
+                logger.data,
+                (types.Reading(ts(123), 420, 69, ts(125)),))
+
+    def test_writing(self):
+        with self.log_path.open('wb') as outfile:
+            outfile.write(common.bson_encode(dict(
+                sample_time=ts(123),
+                temp_c=420,
+                rh_pct=69,
+                ingest_time=ts(125),
+            )))
+        with contextlib.closing(logfile.Logger(str(self.log_path))) as logger:
+            logger.write_rows([
+                types.Reading(ts(100), 999, 666, ts(101)),
+                types.Reading(ts(125), 333, 777, ts(200)),
+            ])
+            self.assertEqual(
+                logger.data,
+                (
+                    types.Reading(ts(123), 420, 69, ts(125)),
+                    types.Reading(ts(125), 333, 777, ts(200)),
+                )
+            )
+
+        self.assertEqual(self.read_bsons(), [
+            dict(
+                sample_time=ts(123),
+                temp_c=420,
+                rh_pct=69,
+                ingest_time=ts(125),
+            ),
+            dict(
+                sample_time=ts(125),
+                temp_c=333,
+                rh_pct=777,
+                ingest_time=ts(200),
+            ),
+        ])
+
+    def test_outside_writes(self):
+        with contextlib.closing(logfile.Logger(str(self.log_path))) as logger:
+            logger.write_rows([
+                types.Reading(ts(100), 999, 666, ts(101)),
+                types.Reading(ts(125), 333, 777, ts(200)),
+            ])
+            with self.log_path.open('ab') as outfile:
+                outfile.write(common.bson_encode(dict(
+                    sample_time=ts(1024),
+                    temp_c=256,
+                    rh_pct=128,
+                    ingest_time=ts(4096),
+                )))
+                outfile.flush()
+            self.assertEqual(logger.data, (
+                types.Reading(ts(100), 999, 666, ts(101)),
+                types.Reading(ts(125), 333, 777, ts(200)),
+                types.Reading(ts(1024), 256, 128, ts(4096)),
+            ))
+
+    def read_bsons(self):
+        with self.log_path.open('rb') as infile:
+            return bson.decode_all(
+                infile.read(), common.BSON_OPTIONS)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff -r 000000000000 -r efe7a1eff167 weather_server/types.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/weather_server/types.py Sat Sep 28 23:17:21 2019 -0400
@@ -0,0 +1,68 @@
+"""Basic data types for the weather server."""
+
+import datetime
+import math
+import typing as t
+
+import attr
+import pytz
+
+
+def c_to_f(c: float) -> float:
+    return c * 9 / 5 + 32
+
+
+# Values from Sontag1990 via Wikipedia:
+# https://en.wikipedia.org/wiki/Dew_point
+_MAGNUS_B = 17.62
+_MAGNUS_C = 243.12
+
+
+@attr.s(frozen=True, slots=True)
+class Reading:
+    """A single reading from a weather thingy.
+
+    Field order is important, as readings are constructed positionally.
+    """
+
+    # The time at which the reading was taken.
+    sample_time = attr.ib(type=datetime.datetime)
+
+    # The temperature, in degrees Celsius.
+    temp_c = attr.ib(type=float)
+
+    # The relative humidity, in percent.
+    rh_pct = attr.ib(type=float)
+
+    # The time at which the reading was received by the server.
+    ingest_time = attr.ib(type=datetime.datetime)
+
+    @property
+    def temp_f(self) -> float:
+        return c_to_f(self.temp_c)
+
+    @property
+    def dew_point_c(self) -> float:
+        gamma = self._gamma
+        return _MAGNUS_C * gamma / (_MAGNUS_B - gamma)
+
+    @property
+    def dew_point_f(self) -> float:
+        return c_to_f(self.dew_point_c)
+
+    def as_dict(self) -> t.Dict[str, t.Any]:
+        return attr.asdict(self, recurse=False)
+
+    @classmethod
+    def from_now(cls, **kwargs) -> 'Reading':
+        return cls(ingest_time=_utc_now(), **kwargs)
+
+    @property
+    def _gamma(self) -> float:
+        return (
+            math.log(self.rh_pct / 100)
+            + _MAGNUS_B * self.temp_c / (_MAGNUS_C + self.temp_c))
+
+
+def _utc_now():
+    return datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
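
A minimal usage sketch, not part of the changeset above: assuming the package is importable as weather_server and that the chosen log path is writable (the path and the sample values below are made up), the logger introduced by this patch could be driven roughly like this:

    import datetime

    import pytz

    from weather_server import logfile, types

    # Hypothetical log location; any writable path works.
    logger = logfile.Logger.create('/tmp/weather.bson')

    # from_now() fills in ingest_time with the current UTC time.
    reading = types.Reading.from_now(
        sample_time=datetime.datetime.now(pytz.UTC),
        temp_c=21.5,
        rh_pct=48.0,
    )
    logger.write_rows([reading])

    # .data re-reads the log file if its size has changed since the last read.
    for r in logger.data:
        print(r.sample_time, r.temp_c, r.rh_pct, round(r.dew_point_c, 1))

    logger.close()

Logger.create hands back one shared instance per path within a process, and write_rows takes an exclusive flock on the log file, so several processes can append to the same BSON log; readings whose sample_time is not newer than the last logged reading are silently dropped.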