view weather_server/logfile.py @ 10:6d59f038a58b

improve some styles
author Paul Fisher <paul@pfish.zone>
date Sun, 29 Sep 2019 20:42:11 -0400
parents efe7a1eff167
children beb42c835c52
line wrap: on
line source

"""The part which handles writing things out and reading things in from BSON.
"""

import fcntl
import os
import threading
import typing as t

import bson

from . import common
from . import types


class Logger:
    """Logger which handles reading/writing a temperature log for one process.

    The log is a file holding a sequence of BSON documents, each of which is
    decoded into a ``types.Reading``.  The decoded readings are cached in
    memory and refreshed whenever the size of the file on disk changes.

    Two locks are involved and are always taken in the same order:

    1. ``self._lock`` (a ``threading.Lock``) serializes threads within this
       process.
    2. ``flock`` on the log file serializes access between processes.

    The order matters.  All threads share one file descriptor, so a thread
    that called ``flock`` without holding ``self._lock`` would convert (or
    drop) the file lock currently relied upon by another thread.
    """

    # Guards insertion of new entries into ``instances``.
    instance_lock = threading.Lock()
    # Shared loggers, keyed by the filename given to create().
    instances: t.Dict[str, 'Logger'] = {}

    @classmethod
    def create(cls, filename: str) -> 'Logger':
        """Creates a single shared instance of a logger for the given file."""
        try:
            # Fast path: no locking when the logger already exists.
            return cls.instances[filename]
        except KeyError:
            with cls.instance_lock:
                # Check again; another thread may have created the logger
                # while this one was waiting for the lock.
                try:
                    return cls.instances[filename]
                except KeyError:
                    cls.instances[filename] = Logger(filename)
                    return cls.instances[filename]

    def __init__(self, filename: str):
        """You should probably call .create() instead."""
        self._file = _open_or_create(filename)
        self._data: t.Tuple[types.Reading, ...] = ()
        self._last_size = 0
        # The lock must exist before the first read so that the initial load
        # follows the same locking rules as every later refresh.
        self._lock = threading.Lock()
        self._refresh()

    def _refresh(self) -> None:
        """Reloads any new data from the file while holding both locks."""
        with self._lock:
            # A shared lock is enough to keep writers in other processes out
            # while the file is parsed.  _maybe_read_data() may truncate a
            # trailing partial document, but no writer can be in progress
            # while this lock is held.
            fcntl.flock(self._file.fileno(), fcntl.LOCK_SH)
            try:
                self._maybe_read_data()
            finally:
                fcntl.flock(self._file.fileno(), fcntl.LOCK_UN)

    def _maybe_read_data(self) -> None:
        """Reads data and advances the file pointer to the end of the file."""
        # This must be called with both the file lock and _lock held.
        size = self._size()
        if size == self._last_size:
            # Nothing has changed on disk since the last read or write.
            return
        # Offset just past the last document that decoded successfully.
        last_good = self._file.tell()
        data = list(self._data)
        try:
            items = bson.decode_file_iter(
                self._file, codec_options=common.BSON_OPTIONS)
            for item in items:
                last_good = self._file.tell()
                try:
                    data.append(types.Reading(**item))
                except TypeError:
                    pass  # Skip this item.
        except bson.InvalidBSON:
            pass  # We have reached the last valid document.  Bail.
        self._data = tuple(data)
        # Drop anything after the last valid document, then seek back to
        # immediately after it so the next read or write starts there.
        self._file.truncate(last_good)
        self._last_size = last_good
        self._file.seek(last_good, os.SEEK_SET)

    def write_rows(self, readings: t.Iterable[types.Reading]) -> None:
        """Write a sorted series of readings, ignoring old ones.

        A reading is written only if its ``sample_time`` is later than that
        of the newest reading already in the log (or if the log is empty).
        """
        with self._lock:
            fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
            try:
                # Catch up with anything other processes have appended.
                self._maybe_read_data()
                self._file.truncate(self._file.tell())
                data = list(self._data)
                last_time = data[-1].sample_time if data else None
                for reading in readings:
                    if last_time is None or last_time < reading.sample_time:
                        self._file.write(common.bson_encode(reading.as_dict()))
                        data.append(reading)
                self._data = tuple(data)
            finally:
                # Flush before unlocking so other processes see whole
                # documents, and record the new size so that our own write
                # does not trigger a re-read.
                self._file.flush()
                self._last_size = self._size()
                fcntl.flock(self._file.fileno(), fcntl.LOCK_UN)

    def fileno(self) -> int:
        """Returns the file descriptor of the underlying log file."""
        return self._file.fileno()

    def close(self):
        """Closes the underlying log file."""
        self._file.close()

    @property
    def data(self) -> t.Tuple[types.Reading, ...]:
        """The readings currently in the log, reloaded if the file changed."""
        # Unlocked size check: only take the locks when the file on disk no
        # longer matches what was last read or written.
        if self._size() != self._last_size:
            self._refresh()
        return self._data

    def _size(self) -> int:
        """Returns the current size in bytes of the log file on disk."""
        return os.stat(self.fileno()).st_size


def _open_or_create(path: str) -> t.BinaryIO:
    """Opens the file at ``path`` for binary read/write, creating it if absent.

    Opening an existing file and exclusively creating a new one are tried in
    turn until one succeeds, so a file that appears or disappears between
    the two attempts just causes another round.
    """
    # Each mode is paired with the one error that means "try the other mode".
    # Any other error propagates to the caller.
    attempts = (
        ('r+b', FileNotFoundError),
        ('x+b', FileExistsError),
    )
    while True:
        for mode, retry_on in attempts:
            try:
                return open(path, mode)
            except retry_on:
                continue