Mercurial > personal > weather-server
view weather_server/logfile.py @ 0:efe7a1eff167
Create initial logger for weather server.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Sat, 28 Sep 2019 23:17:21 -0400 |
parents | |
children | beb42c835c52 |
line wrap: on
line source
"""The part which handles writing things out and reading things in from CSV. """ import fcntl import os import threading import typing as t import bson from . import common from . import types class Logger: """Logger which handles reading/writing a temperature log for one process. """ instance_lock = threading.Lock() instances: t.Dict[str, 'Logger'] = {} @classmethod def create(cls, filename: str) -> 'Logger': """Creates a single shared instance of a logger for the given file.""" try: return cls.instances[filename] except KeyError: with cls.instance_lock: try: return cls.instances[filename] except KeyError: cls.instances[filename] = Logger(filename) return cls.instances[filename] def __init__(self, filename: str): """You should probably call .create() instead.""" self._file = _open_or_create(filename) self._data: t.Tuple[types.Reading] = () self._last_size = 0 self._maybe_read_data() self._lock = threading.Lock() def _maybe_read_data(self) -> None: """Reads data and advances the file pointer to the end of the file.""" # This must be called with both the file lock and _lock held. size = self._size() if size == self._last_size: return last_good = self._file.tell() data = list(self._data) try: items = bson.decode_file_iter( self._file, codec_options=common.BSON_OPTIONS) for item in items: last_good = self._file.tell() try: data.append(types.Reading(**item)) except TypeError: pass # Skip this item. except bson.InvalidBSON: pass # We have reached the last valid document. Bail. # Seek back to immediately after the end of the last valid doc. self._data = tuple(data) self._file.truncate(last_good) self._last_size = last_good self._file.seek(last_good, os.SEEK_SET) def write_rows(self, readings: t.Iterable[types.Reading]) -> None: """Write a sorted series of readings, ignoring old ones.""" with self._lock: fcntl.flock(self._file.fileno(), fcntl.LOCK_EX) try: self._maybe_read_data() self._file.truncate(self._file.tell()) data = list(self._data) if not data: last_time = None else: last_time = data[-1].sample_time for reading in readings: if not last_time or last_time < reading.sample_time: self._file.write(common.bson_encode(reading.as_dict())) data.append(reading) self._data = tuple(data) finally: self._file.flush() self._last_size = self._size() fcntl.flock(self, fcntl.LOCK_UN) def fileno(self) -> int: return self._file.fileno() def close(self): self._file.close() @property def data(self) -> t.Tuple[types.Reading, ...]: if self._size() != self._last_size: fcntl.flock(self, fcntl.LOCK_SH) try: with self._lock: self._maybe_read_data() finally: fcntl.flock(self, fcntl.LOCK_UN) return self._data def _size(self) -> int: return os.stat(self.fileno()).st_size def _open_or_create(path: str) -> t.BinaryIO: while True: try: return open(path, 'r+b') except FileNotFoundError: pass try: return open(path, 'x+b') except FileExistsError: pass