weather_server/logfile.py @ changeset 0:efe7a1eff167

Create initial logger for weather server.

author    Paul Fisher <paul@pfish.zone>
date      Sat, 28 Sep 2019 23:17:21 -0400
parents   (none)
children  beb42c835c52
1 """The part which handles writing things out and reading things in from CSV.
2 """
3
4 import fcntl
5 import os
6 import threading
7 import typing as t
8
9 import bson
10
11 from . import common
12 from . import types
13

class Logger:
    """Logger which handles reading/writing a temperature log for one process.
    """

    instance_lock = threading.Lock()
    instances: t.Dict[str, 'Logger'] = {}

    @classmethod
    def create(cls, filename: str) -> 'Logger':
        """Creates a single shared instance of a logger for the given file."""
        try:
            return cls.instances[filename]
        except KeyError:
            with cls.instance_lock:
                try:
                    return cls.instances[filename]
                except KeyError:
                    cls.instances[filename] = Logger(filename)
                    return cls.instances[filename]

    def __init__(self, filename: str):
        """You should probably call .create() instead."""
        self._file = _open_or_create(filename)
        self._data: t.Tuple[types.Reading, ...] = ()
        self._last_size = 0
        self._lock = threading.Lock()
        self._maybe_read_data()

    def _maybe_read_data(self) -> None:
        """Reads data and advances the file pointer to the end of the file."""
        # This must be called with both the file lock and _lock held.
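        # Strategy: remember the offset just past the last document that
        # decoded cleanly, so a partial or corrupt trailing write can be
        # truncated away before the next append.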
        size = self._size()
        if size == self._last_size:
            return
        last_good = self._file.tell()
        data = list(self._data)
        try:
            items = bson.decode_file_iter(
                self._file, codec_options=common.BSON_OPTIONS)
            for item in items:
                last_good = self._file.tell()
                try:
                    data.append(types.Reading(**item))
                except TypeError:
                    pass  # Skip this item.
        except bson.InvalidBSON:
            pass  # We have reached the last valid document. Bail.
        # Seek back to immediately after the end of the last valid doc.
        self._data = tuple(data)
        self._file.truncate(last_good)
        self._last_size = last_good
        self._file.seek(last_good, os.SEEK_SET)

    def write_rows(self, readings: t.Iterable[types.Reading]) -> None:
        """Write a sorted series of readings, ignoring old ones."""
        with self._lock:
            fcntl.flock(self._file.fileno(), fcntl.LOCK_EX)
            try:
                self._maybe_read_data()
                self._file.truncate(self._file.tell())
                data = list(self._data)
                if not data:
                    last_time = None
                else:
                    last_time = data[-1].sample_time
                for reading in readings:
                    if not last_time or last_time < reading.sample_time:
                        self._file.write(common.bson_encode(reading.as_dict()))
                        data.append(reading)
                self._data = tuple(data)
            finally:
                self._file.flush()
                self._last_size = self._size()
                fcntl.flock(self, fcntl.LOCK_UN)

    def fileno(self) -> int:
        return self._file.fileno()

    def close(self):
        self._file.close()

    @property
    def data(self) -> t.Tuple[types.Reading, ...]:
        if self._size() != self._last_size:
            fcntl.flock(self, fcntl.LOCK_SH)
            try:
                with self._lock:
                    self._maybe_read_data()
            finally:
                fcntl.flock(self, fcntl.LOCK_UN)
        return self._data

    def _size(self) -> int:
        return os.stat(self.fileno()).st_size


def _open_or_create(path: str) -> t.BinaryIO:
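    """Opens `path` for read/write in binary mode, creating it if necessary.

    Retries in a loop because another process may create the file between the
    failed 'r+b' open and the 'x+b' attempt, or remove it between the failed
    'x+b' open and the next 'r+b' attempt.
    """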
    while True:
        try:
            return open(path, 'r+b')
        except FileNotFoundError:
            pass
        try:
            return open(path, 'x+b')
        except FileExistsError:
            pass
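
For reference, a minimal standalone sketch of the append/read pattern this module relies on: an exclusive flock while appending BSON documents, a shared flock while reading them back, and tolerance for a torn trailing write. It assumes only the standard fcntl module and pymongo's bson package; the path and document fields below are made up for illustration and are not part of this changeset.

    import fcntl

    import bson

    LOG_PATH = '/tmp/readings.bson'  # Hypothetical path, for illustration only.


    def append_documents(path, docs):
        """Appends BSON documents while holding an exclusive lock on the file."""
        with open(path, 'ab') as f:
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)
            try:
                for doc in docs:
                    f.write(bson.encode(doc))
            finally:
                f.flush()
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)


    def read_documents(path):
        """Reads back every complete document, ignoring a torn trailing write."""
        docs = []
        with open(path, 'rb') as f:
            fcntl.flock(f.fileno(), fcntl.LOCK_SH)
            try:
                for doc in bson.decode_file_iter(f):
                    docs.append(doc)
            except bson.InvalidBSON:
                pass  # Same recovery idea as Logger._maybe_read_data.
            finally:
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)
        return docs


    if __name__ == '__main__':
        append_documents(LOG_PATH, [{'temp_c': 21.5}, {'temp_c': 21.7}])
        print(read_documents(LOG_PATH))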