Mercurial > personal > weather-server
comparison weather_server/logfile.py @ 21:beb42c835c52
Make weather server handle arbitrary data:
- Make logfile record arbitrary BSONs
- Make server handlers OK with same
- Make location type a normal class rather than attrs;
have it handle its own logger.
- Bump version number.
| author | Paul Fisher <paul@pfish.zone> |
|---|---|
| date | Sat, 19 Oct 2019 18:40:48 -0400 |
| parents | efe7a1eff167 |
| children | 20c8ec56e447 |
comparison
equal
deleted
inserted
replaced
| 20:a7fe635d1c88 | 21:beb42c835c52 |
|---|---|
| 1 """The part which handles writing things out and reading things in from CSV. | 1 """The part which handles writing things out and reading things in from CSV. |
| 2 """ | 2 """ |
| 3 | 3 |
| 4 import concurrent.futures as futures | |
| 5 import contextlib | |
| 4 import fcntl | 6 import fcntl |
| 5 import os | 7 import os |
| 8 import queue | |
| 6 import threading | 9 import threading |
| 7 import typing as t | 10 import typing as t |
| 8 | 11 |
| 9 import bson | 12 import bson |
| 10 | 13 |
| 11 from . import common | 14 from . import common |
| 12 from . import types | |
| 13 | 15 |
| 16 | |
| 17 class _WriteRequest: | |
| 18 | |
| 19 def __init__(self, entries: t.Iterable[t.Dict[str, t.Any]]): | |
| 20 """Creates a request to write the given data to the log.""" | |
| 21 # The data to be written. We take ownership of all the dicts! | |
| 22 self.entries = entries | |
| 23 # Once written, a future that will resolve to None if successful. | |
| 24 self.future = futures.Future() | |
| 25 | |
| 26 | |
| 27 class _ReadRequest: | |
| 28 | |
| 29 def __init__(self): | |
| 30 # The future that will be set with the log's contents. | |
| 31 self.future = futures.Future() | |
| 32 | |
| 33 | |
| 34 # probably handle file-writing with a queue that reports back its progress | |
| 14 | 35 |
| 15 class Logger: | 36 class Logger: |
| 16 """Logger which handles reading/writing a temperature log for one process. | 37 """Logger which handles reading/writing a temperature log for one process. |
| 17 """ | 38 """ |
| 18 | 39 |
| 19 instance_lock = threading.Lock() | 40 instance_lock = threading.Lock() |
| 20 instances: t.Dict[str, 'Logger'] = {} | 41 instances: t.Dict[str, 'Logger'] = {} |
| 21 | 42 |
| 22 @classmethod | 43 @classmethod |
| 23 def create(cls, filename: str) -> 'Logger': | 44 def create( |
| 45 cls, | |
| 46 filename: str, | |
| 47 *, | |
| 48 sample_field: str, | |
| 49 ) -> 'Logger': | |
| 24 """Creates a single shared instance of a logger for the given file.""" | 50 """Creates a single shared instance of a logger for the given file.""" |
| 25 try: | 51 try: |
| 26 return cls.instances[filename] | 52 instance = cls.instances[filename] |
| 27 except KeyError: | 53 except KeyError: |
| 28 with cls.instance_lock: | 54 with cls.instance_lock: |
| 29 try: | 55 try: |
| 30 return cls.instances[filename] | 56 instance = cls.instances[filename] |
| 31 except KeyError: | 57 except KeyError: |
| 32 cls.instances[filename] = Logger(filename) | 58 cls.instances[filename] = Logger( |
| 33 return cls.instances[filename] | 59 filename, |
| 60 sample_field=sample_field) | |
| 61 instance = cls.instances[filename] | |
| 62 if instance._sample_field != sample_field: | |
| 63 raise ValueError( | |
| 64 'Existing instance has different sample field: ' | |
| 65 '{!r} != {!r}'.format(instance._sample_field, sample_field)) | |
| 66 return instance | |
| 34 | 67 |
| 35 def __init__(self, filename: str): | 68 def __init__(self, filename: str, *, sample_field: str): |
| 36 """You should probably call .create() instead.""" | 69 """You should probably call .create() instead.""" |
| 70 self._sample_field = sample_field | |
| 37 self._file = _open_or_create(filename) | 71 self._file = _open_or_create(filename) |
| 38 self._data: t.Tuple[types.Reading] = () | 72 self._data: t.List[t.Dict[str, t.Any], ...] = [] |
| 73 self._queue = queue.SimpleQueue() | |
| 39 self._last_size = 0 | 74 self._last_size = 0 |
| 40 self._maybe_read_data() | 75 self._lock_status: t.Optional[int] = None |
| 41 self._lock = threading.Lock() | 76 self._writer_thread = threading.Thread(target=self._writer) |
| 77 self._writer_thread.start() | |
| 42 | 78 |
| 43 def _maybe_read_data(self) -> None: | 79 @property |
| 80 def data(self) -> t.Tuple[t.Dict[str, t.Any], ...]: | |
| 81 req = _ReadRequest() | |
| 82 self._queue.put(req) | |
| 83 return req.future.result() | |
| 84 | |
| 85 def write_rows(self, entries: t.Iterable[t.Dict[str, t.Any]]): | |
| 86 req = _WriteRequest(entries) | |
| 87 self._queue.put(req) | |
| 88 return req.future.result() | |
| 89 | |
| 90 _POISON = object() | |
| 91 | |
| 92 def close(self): | |
| 93 self._queue.put(self._POISON) | |
| 94 self._writer_thread.join() | |
| 95 | |
| 96 def _writer(self) -> None: | |
| 97 running = True | |
| 98 while running: | |
| 99 item = self._queue.get() | |
| 100 if item is self._POISON: | |
| 101 # _POISON is the sentinel that makes us stop. | |
| 102 running = False | |
| 103 elif isinstance(item, _ReadRequest): | |
| 104 if not item.future.set_running_or_notify_cancel(): | |
| 105 continue | |
| 106 try: | |
| 107 with self._file_lock(fcntl.LOCK_SH): | |
| 108 self._catch_up() | |
| 109 except BaseException as x: | |
| 110 item.future.set_exception(x) | |
| 111 else: | |
| 112 item.future.set_result(tuple(self._data)) | |
| 113 elif isinstance(item, _WriteRequest): | |
| 114 if not item.future.set_running_or_notify_cancel(): | |
| 115 continue | |
| 116 try: | |
| 117 with self._file_lock(fcntl.LOCK_EX): | |
| 118 self._catch_up() | |
| 119 # Since we're at the last good point, truncate after. | |
| 120 self._file.truncate(self._file.tell()) | |
| 121 if not self._data: | |
| 122 last = None | |
| 123 else: | |
| 124 last = self._data[-1][self._sample_field] | |
| 125 for entry in item.entries: | |
| 126 entry_key = entry[self._sample_field] | |
| 127 if last is None or last < entry_key: | |
| 128 self._file.write(common.bson_encode(entry)) | |
| 129 self._data.append(entry) | |
| 130 last = entry_key | |
| 131 self._file.flush() | |
| 132 self._last_size = self._file.tell() | |
| 133 except BaseException as x: | |
| 134 item.future.set_exception(x) | |
| 135 else: | |
| 136 item.future.set_result(None) | |
| 137 else: | |
| 138 raise AssertionError( | |
| 139 'Unexpected item {!r} in the queue'.format(item)) | |
| 140 self._file.close() | |
| 141 | |
| 142 def _catch_up(self) -> None: | |
| 44 """Reads data and advances the file pointer to the end of the file.""" | 143 """Reads data and advances the file pointer to the end of the file.""" |
| 45 # This must be called with both the file lock and _lock held. | 144 assert self._lock_status is not None, 'The lock must be held.' |
| 46 size = self._size() | 145 size = self._size() |
| 47 if size == self._last_size: | 146 if size == self._last_size: |
| 48 return | 147 return |
| 49 last_good = self._file.tell() | 148 last_good = self._file.tell() |
| 50 data = list(self._data) | |
| 51 try: | 149 try: |
| 52 items = bson.decode_file_iter( | 150 items = bson.decode_file_iter( |
| 53 self._file, codec_options=common.BSON_OPTIONS) | 151 self._file, codec_options=common.BSON_OPTIONS) |
| 54 for item in items: | 152 for item in items: |
| 55 last_good = self._file.tell() | 153 last_good = self._file.tell() |
| 56 try: | 154 self._data.append(item) |
| 57 data.append(types.Reading(**item)) | |
| 58 except TypeError: | |
| 59 pass # Skip this item. | |
| 60 except bson.InvalidBSON: | 155 except bson.InvalidBSON: |
| 61 pass # We have reached the last valid document. Bail. | 156 pass # We have reached the last valid document. Bail. |
| 62 # Seek back to immediately after the end of the last valid doc. | 157 # Seek back to immediately after the end of the last valid doc. |
| 63 self._data = tuple(data) | |
| 64 self._file.truncate(last_good) | |
| 65 self._last_size = last_good | 158 self._last_size = last_good |
| 66 self._file.seek(last_good, os.SEEK_SET) | 159 self._file.seek(last_good, os.SEEK_SET) |
| 67 | |
| 68 def write_rows(self, readings: t.Iterable[types.Reading]) -> None: | |
| 69 """Write a sorted series of readings, ignoring old ones.""" | |
| 70 with self._lock: | |
| 71 fcntl.flock(self._file.fileno(), fcntl.LOCK_EX) | |
| 72 try: | |
| 73 self._maybe_read_data() | |
| 74 self._file.truncate(self._file.tell()) | |
| 75 data = list(self._data) | |
| 76 if not data: | |
| 77 last_time = None | |
| 78 else: | |
| 79 last_time = data[-1].sample_time | |
| 80 for reading in readings: | |
| 81 if not last_time or last_time < reading.sample_time: | |
| 82 self._file.write(common.bson_encode(reading.as_dict())) | |
| 83 data.append(reading) | |
| 84 self._data = tuple(data) | |
| 85 finally: | |
| 86 self._file.flush() | |
| 87 self._last_size = self._size() | |
| 88 fcntl.flock(self, fcntl.LOCK_UN) | |
| 89 | 160 |
| 90 def fileno(self) -> int: | 161 def fileno(self) -> int: |
| 91 return self._file.fileno() | 162 return self._file.fileno() |
| 92 | 163 |
| 93 def close(self): | |
| 94 self._file.close() | |
| 95 | |
| 96 @property | |
| 97 def data(self) -> t.Tuple[types.Reading, ...]: | |
| 98 if self._size() != self._last_size: | |
| 99 fcntl.flock(self, fcntl.LOCK_SH) | |
| 100 try: | |
| 101 with self._lock: | |
| 102 self._maybe_read_data() | |
| 103 finally: | |
| 104 fcntl.flock(self, fcntl.LOCK_UN) | |
| 105 return self._data | |
| 106 | |
| 107 def _size(self) -> int: | 164 def _size(self) -> int: |
| 108 return os.stat(self.fileno()).st_size | 165 return os.stat(self.fileno()).st_size |
| 166 | |
| 167 @contextlib.contextmanager | |
| 168 def _file_lock(self, operation: int): | |
| 169 assert operation in (fcntl.LOCK_SH, fcntl.LOCK_EX), 'Invalid operation.' | |
| 170 fcntl.flock(self, operation) | |
| 171 self._lock_status = operation | |
| 172 try: | |
| 173 yield | |
| 174 finally: | |
| 175 self._lock_status = None | |
| 176 fcntl.flock(self, fcntl.LOCK_UN) | |
| 109 | 177 |
| 110 | 178 |
| 111 def _open_or_create(path: str) -> t.BinaryIO: | 179 def _open_or_create(path: str) -> t.BinaryIO: |
| 112 while True: | 180 while True: |
| 113 try: | 181 try: |
