Mercurial > personal > weather-server
comparison weather_server/logfile.py @ 21:beb42c835c52
Make weather server handle arbitrary data:
- Make logfile record arbitrary BSONs
- Make server handlers OK with same
- Make location type a normal class rather than attrs;
have it handle its own logger.
- Bump version number.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Sat, 19 Oct 2019 18:40:48 -0400 |
parents | efe7a1eff167 |
children | 20c8ec56e447 |
comparison
equal
deleted
inserted
replaced
20:a7fe635d1c88 | 21:beb42c835c52 |
---|---|
1 """The part which handles writing things out and reading things in from CSV. | 1 """The part which handles writing things out and reading things in from CSV. |
2 """ | 2 """ |
3 | 3 |
4 import concurrent.futures as futures | |
5 import contextlib | |
4 import fcntl | 6 import fcntl |
5 import os | 7 import os |
8 import queue | |
6 import threading | 9 import threading |
7 import typing as t | 10 import typing as t |
8 | 11 |
9 import bson | 12 import bson |
10 | 13 |
11 from . import common | 14 from . import common |
12 from . import types | |
13 | 15 |
16 | |
17 class _WriteRequest: | |
18 | |
19 def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]): | |
20 """Creates a request to write the given data to the log.""" | |
21 # The data to be written. We take ownership of all the dicts! | |
22 self.entries = entries | |
23 # Once written, a future that will resolve to None if successful. | |
24 self.future = futures.Future() | |
25 | |
26 | |
27 class _ReadRequest: | |
28 | |
29 def __init__(self): | |
30 # The future that will be set with the log's contnets. | |
31 self.future = futures.Future() | |
32 | |
33 | |
34 # NOTE: file-writing is handled with a queue (serviced by Logger._writer) that reports progress back via futures. | 
14 | 35 |
class Logger:
    """Logger which handles reading/writing a temperature log for one process.

    All file access is funneled through a single writer thread: the public
    `data` property and `write_rows` method enqueue a request and block on a
    Future until the writer thread has serviced it.  Cross-process safety is
    provided by flock()-ing the log file around every read and write.
    """

    # Guards lazy creation of the per-filename singletons in `instances`.
    instance_lock = threading.Lock()
    instances: t.Dict[str, 'Logger'] = {}

    @classmethod
    def create(
            cls,
            filename: str,
            *,
            sample_field: str,
    ) -> 'Logger':
        """Creates a single shared instance of a logger for the given file.

        Args:
            filename: Path to the BSON log file.
            sample_field: Name of the field used to order entries; writes
                whose field is not strictly newer than the last logged entry
                are skipped.

        Raises:
            ValueError: If a logger for `filename` already exists with a
                different `sample_field`.
        """
        try:
            instance = cls.instances[filename]
        except KeyError:
            # Double-checked locking: take the lock only on a cache miss,
            # then re-check before constructing.
            with cls.instance_lock:
                try:
                    instance = cls.instances[filename]
                except KeyError:
                    cls.instances[filename] = Logger(
                        filename,
                        sample_field=sample_field)
                    instance = cls.instances[filename]
        if instance._sample_field != sample_field:
            raise ValueError(
                'Existing instance has different sample field: '
                '{!r} != {!r}'.format(instance._sample_field, sample_field))
        return instance

    def __init__(self, filename: str, *, sample_field: str):
        """You should probably call .create() instead."""
        self._sample_field = sample_field
        self._file = _open_or_create(filename)
        # In-memory mirror of every valid document in the file.
        # Fixed annotation: t.List takes exactly one type parameter; the
        # previous `t.List[t.Dict[str, t.Any], ...]` raised TypeError.
        self._data: t.List[t.Dict[str, t.Any]] = []
        self._queue: queue.SimpleQueue = queue.SimpleQueue()
        # File size at the end of the last fully-parsed document.
        self._last_size = 0
        # The flock() operation currently held (LOCK_SH/LOCK_EX), or None.
        self._lock_status: t.Optional[int] = None
        self._writer_thread = threading.Thread(target=self._writer)
        self._writer_thread.start()

    @property
    def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]:
        """A snapshot of every entry currently in the log.

        Blocks until the writer thread has caught up with the file.
        """
        req = _ReadRequest()
        self._queue.put(req)
        return req.future.result()

    def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]) -> None:
        """Writes a sorted series of entries, skipping out-of-order ones.

        Blocks until the writer thread has flushed the entries to disk;
        re-raises any exception the writer thread hit.
        """
        req = _WriteRequest(entries)
        self._queue.put(req)
        return req.future.result()

    # Sentinel enqueued by close() to make the writer thread exit.
    _POISON = object()

    def close(self) -> None:
        """Shuts down the writer thread; the thread closes the file."""
        self._queue.put(self._POISON)
        self._writer_thread.join()

    def _writer(self) -> None:
        """Writer-thread main loop: services queued read/write requests."""
        running = True
        while running:
            item = self._queue.get()
            if item is self._POISON:
                # _POISON is the pill that makes us stop.
                running = False
            elif isinstance(item, _ReadRequest):
                if not item.future.set_running_or_notify_cancel():
                    continue
                try:
                    with self._file_lock(fcntl.LOCK_SH):
                        self._catch_up()
                except BaseException as x:
                    item.future.set_exception(x)
                else:
                    item.future.set_result(tuple(self._data))
            elif isinstance(item, _WriteRequest):
                if not item.future.set_running_or_notify_cancel():
                    continue
                try:
                    with self._file_lock(fcntl.LOCK_EX):
                        self._catch_up()
                        # Since we're at the last good point, truncate after.
                        self._file.truncate(self._file.tell())
                        if not self._data:
                            last = None
                        else:
                            last = self._data[-1][self._sample_field]
                        for entry in item.entries:
                            entry_key = entry[self._sample_field]
                            # Only strictly-newer entries are persisted.
                            if last is None or last < entry_key:
                                self._file.write(common.bson_encode(entry))
                                self._data.append(entry)
                                last = entry_key
                        self._file.flush()
                        self._last_size = self._file.tell()
                except BaseException as x:
                    item.future.set_exception(x)
                else:
                    item.future.set_result(None)
            else:
                raise AssertionError(
                    'Unexpected item {!r} in the queue'.format(item))
        self._file.close()

    def _catch_up(self) -> None:
        """Reads data and advances the file pointer to the end of the file."""
        assert self._lock_status is not None, 'The lock must be held.'
        size = self._size()
        if size == self._last_size:
            return
        last_good = self._file.tell()
        try:
            items = bson.decode_file_iter(
                self._file, codec_options=common.BSON_OPTIONS)
            for item in items:
                last_good = self._file.tell()
                self._data.append(item)
        except bson.InvalidBSON:
            pass  # We have reached the last valid document.  Bail.
        # Seek back to immediately after the end of the last valid doc.
        self._last_size = last_good
        self._file.seek(last_good, os.SEEK_SET)

    def fileno(self) -> int:
        """File descriptor of the log file (also lets flock() accept self)."""
        return self._file.fileno()

    def _size(self) -> int:
        """Current on-disk size of the log file, in bytes."""
        return os.stat(self.fileno()).st_size

    @contextlib.contextmanager
    def _file_lock(self, operation: int):
        """Holds an flock() on the log file for the duration of the block.

        Args:
            operation: fcntl.LOCK_SH for reads or fcntl.LOCK_EX for writes.
        """
        assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.'
        # flock accepts any object with a fileno() method, i.e. self.
        fcntl.flock(self, operation)
        self._lock_status = operation
        try:
            yield
        finally:
            self._lock_status = None
            fcntl.flock(self, fcntl.LOCK_UN)
109 | 177 |
110 | 178 |
111 def _open_or_create(path: str) -> t.BinaryIO: | 179 def _open_or_create(path: str) -> t.BinaryIO: |
112 while True: | 180 while True: |
113 try: | 181 try: |