Mercurial > personal > weather-server
comparison weather_server/logfile.py @ 0:efe7a1eff167
Create initial logger for weather server.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Sat, 28 Sep 2019 23:17:21 -0400 |
parents | |
children | beb42c835c52 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:efe7a1eff167 |
---|---|
1 """The part which handles writing things out and reading things in from BSON. | |
2 """ | |
3 | |
4 import fcntl | |
5 import os | |
6 import threading | |
7 import typing as t | |
8 | |
9 import bson | |
10 | |
11 from . import common | |
12 from . import types | |
13 | |
14 | |
class Logger:
    """Logger which handles reading/writing a temperature log for one process.

    The log is a file of concatenated BSON documents, one per reading.
    Parsed readings are cached in memory as an immutable tuple and the file
    is only re-read when its size differs from the size last seen.

    Two locks are used, always acquired in the same order:

    1. ``self._lock`` (a ``threading.Lock``) serialises threads of this
       process, which all share one file descriptor.
    2. ``fcntl.flock`` on that descriptor serialises against other processes.

    The order matters: ``flock`` locks belong to the open file description,
    so a second thread calling ``flock`` on the shared descriptor converts
    (rather than waits for) a lock already held through it.  Taking
    ``self._lock`` first keeps a reader from downgrading a writer's lock.
    """

    # Guards creation and removal of entries in ``instances``.
    instance_lock = threading.Lock()
    # Shared loggers, keyed by the filename passed to create().
    instances: t.Dict[str, 'Logger'] = {}

    @classmethod
    def create(cls, filename: str) -> 'Logger':
        """Creates a single shared instance of a logger for the given file.

        Args:
            filename: Path of the log file; also the key of the shared
                instance.

        Returns:
            The logger registered for ``filename``, creating it on first use.
        """
        # Fast path: no lock needed once the instance has been registered.
        try:
            return cls.instances[filename]
        except KeyError:
            pass
        with cls.instance_lock:
            # Look again: another thread may have registered the instance
            # while this one was waiting for the lock.
            try:
                return cls.instances[filename]
            except KeyError:
                logger = cls.instances[filename] = cls(filename)
                return logger

    def __init__(self, filename: str):
        """You should probably call .create() instead.

        Opens (creating if needed) ``filename`` and loads the readings it
        already contains.
        """
        self._filename = filename
        # Created before the first read so that _maybe_read_data() is always
        # called with both locks held, as it requires.
        self._lock = threading.Lock()
        self._file = _open_or_create(filename)
        self._data: t.Tuple[types.Reading, ...] = ()
        self._last_size = 0
        with self._lock:
            # The initial read may truncate trailing garbage, so it must not
            # run while another process is in the middle of a write.
            fcntl.flock(self.fileno(), fcntl.LOCK_SH)
            try:
                self._maybe_read_data()
            finally:
                fcntl.flock(self.fileno(), fcntl.LOCK_UN)

    def _maybe_read_data(self) -> None:
        """Reads data and advances the file pointer to the end of the file.

        Does nothing if the file size equals the size last seen.  Otherwise
        decodes documents from the current file position, appends every one
        that can be turned into a ``types.Reading`` to the cached data, and
        truncates the file just after the last document that decoded
        cleanly.
        """
        # This must be called with both the file lock and _lock held.
        size = self._size()
        if size == self._last_size:
            return
        last_good = self._file.tell()
        data = list(self._data)
        try:
            items = bson.decode_file_iter(
                self._file, codec_options=common.BSON_OPTIONS)
            for item in items:
                # The document decoded, so the file is valid up to here even
                # if the document turns out not to be a usable Reading.
                last_good = self._file.tell()
                try:
                    data.append(types.Reading(**item))
                except TypeError:
                    pass  # Skip this item.
        except bson.InvalidBSON:
            pass  # We have reached the last valid document.  Bail.
        self._data = tuple(data)
        # Drop anything after the last valid document and seek back to
        # immediately after it, ready for the next read or append.
        self._file.truncate(last_good)
        self._last_size = last_good
        self._file.seek(last_good, os.SEEK_SET)

    def write_rows(self, readings: t.Iterable[types.Reading]) -> None:
        """Write a sorted series of readings, ignoring old ones.

        A reading is appended only if its ``sample_time`` is strictly later
        than that of the newest reading already logged, including readings
        accepted earlier in the same call.

        Args:
            readings: Readings to append, in ascending ``sample_time`` order.
        """
        with self._lock:
            fcntl.flock(self.fileno(), fcntl.LOCK_EX)
            try:
                self._append_locked(readings)
            finally:
                # Always release the file lock, even if writing or flushing
                # failed.
                fcntl.flock(self.fileno(), fcntl.LOCK_UN)

    def _append_locked(self, readings: t.Iterable[types.Reading]) -> None:
        """Appends new readings; both locks must already be held."""
        self._maybe_read_data()
        self._file.truncate(self._file.tell())
        last_time = self._data[-1].sample_time if self._data else None
        accepted = []
        encoded = []
        # Encode everything before touching the file, so that an error
        # raised by the iterable or the encoder leaves the file and the
        # cached data unchanged and in agreement.
        for reading in readings:
            if last_time is None or last_time < reading.sample_time:
                encoded.append(common.bson_encode(reading.as_dict()))
                accepted.append(reading)
                # Track the newest accepted time so duplicates and
                # out-of-order entries within this batch are ignored too.
                last_time = reading.sample_time
        try:
            self._file.write(b''.join(encoded))
            self._data += tuple(accepted)
        finally:
            self._file.flush()
            self._last_size = self._size()

    def fileno(self) -> int:
        """Returns the file descriptor of the underlying log file."""
        return self._file.fileno()

    def close(self) -> None:
        """Closes the log file and unregisters this logger if it is shared."""
        with self.instance_lock:
            # Only evict the entry if it is this very object, so a logger
            # built directly cannot unregister the shared instance.
            if self.instances.get(self._filename) is self:
                del self.instances[self._filename]
        self._file.close()

    @property
    def data(self) -> t.Tuple[types.Reading, ...]:
        """All readings in the log, re-reading the file if its size changed."""
        if self._size() != self._last_size:
            # Same lock order as write_rows(): thread lock, then file lock.
            with self._lock:
                fcntl.flock(self.fileno(), fcntl.LOCK_SH)
                try:
                    self._maybe_read_data()
                finally:
                    fcntl.flock(self.fileno(), fcntl.LOCK_UN)
        return self._data

    def _size(self) -> int:
        """Returns the current size in bytes of the log file."""
        return os.stat(self.fileno()).st_size

110 | |
def _open_or_create(path: str) -> t.BinaryIO:
    """Opens ``path`` for binary reading and writing, creating it if absent.

    Alternates between opening the existing file and exclusively creating
    a new one until one of the two succeeds.  Retrying covers the file
    appearing or disappearing between the two attempts.

    Args:
        path: Path of the file to open.

    Returns:
        The open file object, in mode ``r+b`` or ``x+b``.
    """
    # Each mode is paired with the one error that means "try the other
    # mode"; any other exception propagates to the caller.
    attempts = (
        ('r+b', FileNotFoundError),
        ('x+b', FileExistsError),
    )
    while True:
        for mode, tolerated in attempts:
            try:
                return open(path, mode)
            except tolerated:
                continue