Mercurial > personal > weather-server
diff weather_server/logfile.py @ 0:efe7a1eff167
Create initial logger for weather server.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Sat, 28 Sep 2019 23:17:21 -0400 |
parents | |
children | beb42c835c52 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/weather_server/logfile.py Sat Sep 28 23:17:21 2019 -0400 @@ -0,0 +1,120 @@ +"""The part which handles writing things out and reading things in from CSV. +""" + +import fcntl +import os +import threading +import typing as t + +import bson + +from . import common +from . import types + + +class Logger: + """Logger which handles reading/writing a temperature log for one process. + """ + + instance_lock = threading.Lock() + instances: t.Dict[str, 'Logger'] = {} + + @classmethod + def create(cls, filename: str) -> 'Logger': + """Creates a single shared instance of a logger for the given file.""" + try: + return cls.instances[filename] + except KeyError: + with cls.instance_lock: + try: + return cls.instances[filename] + except KeyError: + cls.instances[filename] = Logger(filename) + return cls.instances[filename] + + def __init__(self, filename: str): + """You should probably call .create() instead.""" + self._file = _open_or_create(filename) + self._data: t.Tuple[types.Reading] = () + self._last_size = 0 + self._maybe_read_data() + self._lock = threading.Lock() + + def _maybe_read_data(self) -> None: + """Reads data and advances the file pointer to the end of the file.""" + # This must be called with both the file lock and _lock held. + size = self._size() + if size == self._last_size: + return + last_good = self._file.tell() + data = list(self._data) + try: + items = bson.decode_file_iter( + self._file, codec_options=common.BSON_OPTIONS) + for item in items: + last_good = self._file.tell() + try: + data.append(types.Reading(**item)) + except TypeError: + pass # Skip this item. + except bson.InvalidBSON: + pass # We have reached the last valid document. Bail. + # Seek back to immediately after the end of the last valid doc. + self._data = tuple(data) + self._file.truncate(last_good) + self._last_size = last_good + self._file.seek(last_good, os.SEEK_SET) + + def write_rows(self, readings: t.Iterable[types.Reading]) -> None: + """Write a sorted series of readings, ignoring old ones.""" + with self._lock: + fcntl.flock(self._file.fileno(), fcntl.LOCK_EX) + try: + self._maybe_read_data() + self._file.truncate(self._file.tell()) + data = list(self._data) + if not data: + last_time = None + else: + last_time = data[-1].sample_time + for reading in readings: + if not last_time or last_time < reading.sample_time: + self._file.write(common.bson_encode(reading.as_dict())) + data.append(reading) + self._data = tuple(data) + finally: + self._file.flush() + self._last_size = self._size() + fcntl.flock(self, fcntl.LOCK_UN) + + def fileno(self) -> int: + return self._file.fileno() + + def close(self): + self._file.close() + + @property + def data(self) -> t.Tuple[types.Reading, ...]: + if self._size() != self._last_size: + fcntl.flock(self, fcntl.LOCK_SH) + try: + with self._lock: + self._maybe_read_data() + finally: + fcntl.flock(self, fcntl.LOCK_UN) + return self._data + + def _size(self) -> int: + return os.stat(self.fileno()).st_size + + +def _open_or_create(path: str) -> t.BinaryIO: + while True: + try: + return open(path, 'r+b') + except FileNotFoundError: + pass + try: + return open(path, 'x+b') + except FileExistsError: + pass