comparison weather_server/logfile.py @ 21:beb42c835c52

Make weather server handle arbitrary data: - Make logfile record arbitrary BSONs - Make server handlers OK with same - Make location type a normal class rather than attrs; have it handle its own logger. - Bump version number.
author Paul Fisher <paul@pfish.zone>
date Sat, 19 Oct 2019 18:40:48 -0400
parents efe7a1eff167
children 20c8ec56e447
comparison
equal deleted inserted replaced
20:a7fe635d1c88 21:beb42c835c52
1 """The part which handles writing things out and reading things in from CSV. 1 """The part which handles writing things out and reading things in from CSV.
2 """ 2 """
3 3
4 import concurrent.futures as futures
5 import contextlib
4 import fcntl 6 import fcntl
5 import os 7 import os
8 import queue
6 import threading 9 import threading
7 import typing as t 10 import typing as t
8 11
9 import bson 12 import bson
10 13
11 from . import common 14 from . import common
12 from . import types
13 15
16
17 class _WriteRequest:
18
19 def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]):
20 """Creates a request to write the given data to the log."""
21 # The data to be written. We take ownership of all the dicts!
22 self.entries = entries
23 # Once written, a future that will resolve to None if successful.
24 self.future = futures.Future()
25
26
class _ReadRequest:
    """A queued request for the log's contents, resolved via a future."""

    def __init__(self):
        # The future that will be set with the log's contents
        # (a tuple of entry dicts) once the writer thread services us.
        self.future = futures.Future()
32
33
# File writing is handled by a queue serviced by a dedicated writer thread,
# which reports each request's outcome back through its future.
14 35
class Logger:
    """Logger which handles reading/writing a temperature log for one process.

    All file access is funneled through a single writer thread so concurrent
    callers never interleave reads and writes; cross-process safety comes from
    flock()-based advisory locking on the log file.
    """

    # Guards creation of new shared instances in create().
    instance_lock = threading.Lock()
    # One shared Logger per filename, keyed by the filename given to create().
    instances: t.Dict[str, 'Logger'] = {}

    @classmethod
    def create(
            cls,
            filename: str,
            *,
            sample_field: str,
    ) -> 'Logger':
        """Creates a single shared instance of a logger for the given file.

        Args:
            filename: The log file to open (created if absent).
            sample_field: The key in each entry dict whose value orders
                entries; only entries strictly newer than the current last
                entry get written.

        Raises:
            ValueError: If a logger for this file already exists but was
                created with a different sample_field.
        """
        # Double-checked locking: the unlocked lookup is the fast path; the
        # locked re-check keeps two threads from both building an instance.
        try:
            instance = cls.instances[filename]
        except KeyError:
            with cls.instance_lock:
                try:
                    instance = cls.instances[filename]
                except KeyError:
                    cls.instances[filename] = Logger(
                        filename,
                        sample_field=sample_field)
                    instance = cls.instances[filename]
        if instance._sample_field != sample_field:
            raise ValueError(
                'Existing instance has different sample field: '
                '{!r} != {!r}'.format(instance._sample_field, sample_field))
        return instance

    def __init__(self, filename: str, *, sample_field: str):
        """You should probably call .create() instead."""
        self._sample_field = sample_field
        self._file = _open_or_create(filename)
        # In-memory cache of every entry parsed from (or written to) the file.
        # Fix: t.List takes exactly one parameter; the previous annotation
        # t.List[t.Dict[str, t.Any], ...] raised TypeError at runtime because
        # annotations on attribute targets are evaluated.
        self._data: t.List[t.Dict[str, t.Any]] = []
        # Work queue of _ReadRequest / _WriteRequest / _POISON items.
        self._queue = queue.SimpleQueue()
        # File size at the end of the last fully-parsed document.
        self._last_size = 0
        # The flock() operation currently held, or None when unlocked.
        self._lock_status: t.Optional[int] = None
        self._writer_thread = threading.Thread(target=self._writer)
        self._writer_thread.start()

    @property
    def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
        """All entries currently in the log, oldest first."""
        req = _ReadRequest()
        self._queue.put(req)
        return req.future.result()

    def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]) -> None:
        """Appends entries to the log, skipping any not newer than the last.

        Blocks until the write completes; raises whatever the writer thread
        hit while writing.
        """
        req = _WriteRequest(entries)
        self._queue.put(req)
        return req.future.result()

    # Sentinel placed on the queue to tell the writer thread to exit.
    _POISON = object()

    def close(self):
        """Stops the writer thread (which closes the file) and waits for it."""
        self._queue.put(self._POISON)
        self._writer_thread.join()

    def _writer(self) -> None:
        """Writer-thread main loop: services queued requests until poisoned."""
        running = True
        try:
            while running:
                item = self._queue.get()
                if item is self._POISON:
                    # _POISON is the poison pill that makes us stop.
                    running = False
                elif isinstance(item, _ReadRequest):
                    if not item.future.set_running_or_notify_cancel():
                        continue  # The request was cancelled; skip it.
                    try:
                        with self._file_lock(fcntl.LOCK_SH):
                            self._catch_up()
                    except BaseException as x:
                        item.future.set_exception(x)
                    else:
                        item.future.set_result(tuple(self._data))
                elif isinstance(item, _WriteRequest):
                    if not item.future.set_running_or_notify_cancel():
                        continue  # The request was cancelled; skip it.
                    try:
                        with self._file_lock(fcntl.LOCK_EX):
                            self._catch_up()
                            # We're at the last good point; drop anything after.
                            self._file.truncate(self._file.tell())
                            if not self._data:
                                last = None
                            else:
                                last = self._data[-1][self._sample_field]
                            for entry in item.entries:
                                entry_key = entry[self._sample_field]
                                # Only write entries newer than the newest.
                                if last is None or last < entry_key:
                                    self._file.write(common.bson_encode(entry))
                                    self._data.append(entry)
                                    last = entry_key
                            self._file.flush()
                            self._last_size = self._file.tell()
                    except BaseException as x:
                        item.future.set_exception(x)
                    else:
                        item.future.set_result(None)
                else:
                    raise AssertionError(
                        'Unexpected item {!r} in the queue'.format(item))
        finally:
            # Close even if we die on an unexpected queue item, so the
            # descriptor is never leaked.
            self._file.close()

    def _catch_up(self) -> None:
        """Reads data and advances the file pointer to the end of the file."""
        assert self._lock_status is not None, 'The lock must be held.'
        size = self._size()
        if size == self._last_size:
            # Nothing new was appended by another process; skip the parse.
            return
        last_good = self._file.tell()
        try:
            items = bson.decode_file_iter(
                self._file, codec_options=common.BSON_OPTIONS)
            for item in items:
                last_good = self._file.tell()
                self._data.append(item)
        except bson.InvalidBSON:
            pass  # We have reached the last valid document.  Bail.
        # Seek back to immediately after the end of the last valid doc.
        self._last_size = last_good
        self._file.seek(last_good, os.SEEK_SET)

    def fileno(self) -> int:
        """Returns the log file's descriptor (lets flock() take self)."""
        return self._file.fileno()

    def _size(self) -> int:
        """Returns the log file's current size in bytes."""
        return os.stat(self.fileno()).st_size

    @contextlib.contextmanager
    def _file_lock(self, operation: int):
        """Holds an advisory flock; operation is LOCK_SH or LOCK_EX."""
        assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.'
        fcntl.flock(self, operation)  # flock() accepts anything with fileno().
        self._lock_status = operation
        try:
            yield
        finally:
            self._lock_status = None
            fcntl.flock(self, fcntl.LOCK_UN)
109 177
110 178
111 def _open_or_create(path: str) -> t.BinaryIO: 179 def _open_or_create(path: str) -> t.BinaryIO:
112 while True: 180 while True:
113 try: 181 try: