comparison weather_server/logfile_test.py @ 31:9bc3687e1e5e

logfile: Add an index, and don't keep everything in RAM. - Adds index BSON file, updated upon writing. - Limits amount of data in RAM. - Gracefully handles writes that don't update index.
author Paul Fisher <paul@pfish.zone>
date Tue, 07 Jul 2020 19:51:30 -0400
parents 20c8ec56e447
children b77c8e7d2742
comparison
equal deleted inserted replaced
30:c760ab7f93c2 31:9bc3687e1e5e
1 import contextlib 1 import contextlib
2 import datetime 2 import datetime
3 import os.path 3 import os.path
4 import pathlib 4 import pathlib
5 import tempfile 5 import tempfile
6 import threading
7 import unittest 6 import unittest
8 7
9 import bson 8 import bson
10 import pytz 9 import pytz
11 10
23 22
24 def setUp(self): 23 def setUp(self):
25 super().setUp() 24 super().setUp()
26 self.temp_dir = tempfile.TemporaryDirectory() 25 self.temp_dir = tempfile.TemporaryDirectory()
27 self.log_path = pathlib.Path(self.temp_dir.name) / 'test.bson' 26 self.log_path = pathlib.Path(self.temp_dir.name) / 'test.bson'
27 self.index_path = pathlib.Path(str(self.log_path) + '.index.bson')
28 28
29 def tearDown(self): 29 def tearDown(self):
30 self.temp_dir.cleanup() 30 self.temp_dir.cleanup()
31 super().tearDown() 31 super().tearDown()
32 32
33 def test_empty(self): 33 def test_empty(self):
34 lg = logfile.Logger( 34 lg = logfile.Logger(
35 str(self.log_path), sample_field='x') 35 str(self.log_path), sample_field='x')
36 with contextlib.closing(lg) as logger: 36 with contextlib.closing(lg) as logger:
37 self.assertEqual(logger.data, ()) 37 self.assertEqual(logger.cached_data, ())
38 38
39 def test_fails_to_open(self): 39 def test_fails_to_open(self):
40 with self.assertRaises(OSError): 40 with self.assertRaises(OSError):
41 logfile.Logger( 41 logfile.Logger(
42 os.path.join( 42 os.path.join(
62 with contextlib.closing(logfile.Logger( 62 with contextlib.closing(logfile.Logger(
63 str(self.log_path), 63 str(self.log_path),
64 sample_field='sample_time', 64 sample_field='sample_time',
65 )) as logger: 65 )) as logger:
66 self.assertEqual( 66 self.assertEqual(
67 logger.data, 67 logger.cached_data,
68 ( 68 (
69 dict( 69 dict(
70 sample_time=ts(123), 70 sample_time=ts(123),
71 temp_c=420, 71 temp_c=420,
72 rh_pct=69, 72 rh_pct=69,
82 ingest_time=ts(125), 82 ingest_time=ts(125),
83 ))) 83 )))
84 with contextlib.closing(logfile.Logger( 84 with contextlib.closing(logfile.Logger(
85 str(self.log_path), 85 str(self.log_path),
86 sample_field='sample_time', 86 sample_field='sample_time',
87 cached_entries=4,
88 index_gap=2,
87 )) as logger: 89 )) as logger:
88 logger.write_rows([ 90 logger.write_rows([
89 # Ignored, since it's older than the newest entry. 91 # Ignored, since it's older than the newest entry.
90 dict( 92 dict(
91 sample_time=ts(100), 93 sample_time=ts(100),
99 rh_pct=777, 101 rh_pct=777,
100 ingest_time=ts(200), 102 ingest_time=ts(200),
101 ), 103 ),
102 ]) 104 ])
103 self.assertEqual( 105 self.assertEqual(
104 logger.data, 106 logger.cached_data,
105 ( 107 (
106 dict( 108 dict(
107 sample_time=ts(123), 109 sample_time=ts(123),
108 temp_c=420, 110 temp_c=420,
109 rh_pct=69, 111 rh_pct=69,
130 temp_c=333, 132 temp_c=333,
131 rh_pct=777, 133 rh_pct=777,
132 ingest_time=ts(200), 134 ingest_time=ts(200),
133 ), 135 ),
134 ]) 136 ])
137 self.assertEqual(self.read_index(), [
138 dict(entry_count=0, entry_key=ts(123), byte=0),
139 ])
135 140
136 def test_outside_writes(self): 141 def test_outside_writes(self):
137 with contextlib.closing(logfile.Logger( 142 with contextlib.closing(logfile.Logger(
138 str(self.log_path), 143 str(self.log_path),
139 sample_field='sample_time', 144 sample_field='sample_time',
145 index_gap=2,
140 )) as logger: 146 )) as logger:
141 logger.write_rows([ 147 logger.write_rows([
142 dict( 148 dict(
143 sample_time=ts(100), 149 sample_time=ts(100),
144 temp_c=999, 150 temp_c=999,
158 temp_c=256, 164 temp_c=256,
159 rh_pct=128, 165 rh_pct=128,
160 ingest_time=ts(4096), 166 ingest_time=ts(4096),
161 ))) 167 )))
162 outfile.flush() 168 outfile.flush()
163 self.assertEqual(logger.data, ( 169 # Empty write to update the index.
170 logger.write_rows([])
171 self.assertEqual(logger.cached_data, (
164 dict( 172 dict(
165 sample_time=ts(100), 173 sample_time=ts(100),
166 temp_c=999, 174 temp_c=999,
167 rh_pct=666, 175 rh_pct=666,
168 ingest_time=ts(101), 176 ingest_time=ts(101),
178 temp_c=256, 186 temp_c=256,
179 rh_pct=128, 187 rh_pct=128,
180 ingest_time=ts(4096), 188 ingest_time=ts(4096),
181 ), 189 ),
182 )) 190 ))
191 self.assertEqual(self.read_bsons(), [
192 dict(
193 sample_time=ts(100),
194 temp_c=999,
195 rh_pct=666,
196 ingest_time=ts(101),
197 ),
198 dict(
199 sample_time=ts(125),
200 temp_c=333,
201 rh_pct=777,
202 ingest_time=ts(200),
203 ),
204 dict(
205 sample_time=ts(1024),
206 temp_c=256,
207 rh_pct=128,
208 ingest_time=ts(4096),
209 ),
210 ])
211 self.assertEqual(self.read_index(), [
212 dict(entry_count=0, entry_key=ts(100), byte=0),
213 dict(entry_count=2, entry_key=ts(1024), byte=142),
214 ])
215
216 def test_lots_of_outside_writes(self):
217 with contextlib.closing(logfile.Logger(
218 str(self.log_path),
219 sample_field='key',
220 cached_entries=4,
221 index_gap=2,
222 )) as logger:
223 logger.write_rows(make_messages(10))
224 with contextlib.closing(logfile.Logger(
225 str(self.log_path),
226 sample_field='key',
227 cached_entries=4,
228 index_gap=2,
229 )) as other_logger:
230 other_logger.write_rows(make_messages(10, 10))
231 logger.write_rows(make_messages(1, 20))
232 self.assertEqual(logger.cached_data, (
233 dict(key='0011', value=17),
234 dict(key='0012', value=18),
235 dict(key='0013', value=19),
236 dict(key='0014', value=20),
237 ))
238 self.assertEqual(self.read_bsons(), make_messages(21))
239 self.assertEqual(self.read_index(), [
240 dict(entry_count=0, entry_key='0000', byte=0),
241 dict(entry_count=2, entry_key='0002', byte=60),
242 dict(entry_count=4, entry_key='0004', byte=120),
243 dict(entry_count=6, entry_key='0006', byte=180),
244 dict(entry_count=8, entry_key='0008', byte=240),
245 dict(entry_count=10, entry_key='000a', byte=300),
246 dict(entry_count=12, entry_key='000c', byte=360),
247 dict(entry_count=14, entry_key='000e', byte=420),
248 dict(entry_count=16, entry_key='0010', byte=480),
249 dict(entry_count=18, entry_key='0012', byte=540),
250 dict(entry_count=20, entry_key='0014', byte=600),
251 ])
252
253 def test_outside_writes_that_dont_update_index(self):
254 with contextlib.closing(logfile.Logger(
255 str(self.log_path),
256 sample_field='key',
257 cached_entries=4,
258 index_gap=2,
259 )) as logger:
260 logger.write_rows(make_messages(10))
261 with self.log_path.open('ab') as outfile:
262 for item in make_messages(16, 256):
263 outfile.write(common.bson_encode(item))
264 self.assertEqual(logger.cached_data, (
265 dict(key='010c', value=268),
266 dict(key='010d', value=269),
267 dict(key='010e', value=270),
268 dict(key='010f', value=271),
269 ))
270 self.assertEqual(self.read_bsons(), [
271 dict(key='0000', value=0),
272 dict(key='0001', value=1),
273 dict(key='0002', value=2),
274 dict(key='0003', value=3),
275 dict(key='0004', value=4),
276 dict(key='0005', value=5),
277 dict(key='0006', value=6),
278 dict(key='0007', value=7),
279 dict(key='0008', value=8),
280 dict(key='0009', value=9),
281 dict(key='0100', value=256),
282 dict(key='0101', value=257),
283 dict(key='0102', value=258),
284 dict(key='0103', value=259),
285 dict(key='0104', value=260),
286 dict(key='0105', value=261),
287 dict(key='0106', value=262),
288 dict(key='0107', value=263),
289 dict(key='0108', value=264),
290 dict(key='0109', value=265),
291 dict(key='010a', value=266),
292 dict(key='010b', value=267),
293 dict(key='010c', value=268),
294 dict(key='010d', value=269),
295 dict(key='010e', value=270),
296 dict(key='010f', value=271),
297 ])
298 self.assertEqual(self.read_index(), [
299 dict(entry_count=0, entry_key='0000', byte=0),
300 dict(entry_count=2, entry_key='0002', byte=60),
301 dict(entry_count=4, entry_key='0004', byte=120),
302 dict(entry_count=6, entry_key='0006', byte=180),
303 dict(entry_count=8, entry_key='0008', byte=240),
304 ])
305
306 def test_outside_write_but_dont_skip(self):
307 with contextlib.closing(logfile.Logger(
308 str(self.log_path),
309 sample_field='key',
310 cached_entries=10,
311 index_gap=4,
312 )) as logger:
313 logger.write_rows(make_messages(10))
314 with contextlib.closing(logfile.Logger(
315 str(self.log_path),
316 sample_field='key',
317 cached_entries=10,
318 index_gap=4,
319 )) as other_logger:
320 other_logger.write_rows(make_messages(5, 10))
321 self.assertEqual(logger.cached_data, (
322 dict(key='0005', value=5),
323 dict(key='0006', value=6),
324 dict(key='0007', value=7),
325 dict(key='0008', value=8),
326 dict(key='0009', value=9),
327 dict(key='000a', value=10),
328 dict(key='000b', value=11),
329 dict(key='000c', value=12),
330 dict(key='000d', value=13),
331 dict(key='000e', value=14),
332 ))
333 logger.write_rows(make_messages(5, 15))
334 self.assertEqual(self.read_index(), [
335 dict(entry_count=0, entry_key='0000', byte=0),
336 dict(entry_count=4, entry_key='0004', byte=120),
337 dict(entry_count=8, entry_key='0008', byte=240),
338 dict(entry_count=12, entry_key='000c', byte=360),
339 dict(entry_count=16, entry_key='0010', byte=480),
340 ])
341 logger.write_rows(make_messages(1, 20))
342 self.assertEqual(self.read_index(), [
343 dict(entry_count=0, entry_key='0000', byte=0),
344 dict(entry_count=4, entry_key='0004', byte=120),
345 dict(entry_count=8, entry_key='0008', byte=240),
346 dict(entry_count=12, entry_key='000c', byte=360),
347 dict(entry_count=16, entry_key='0010', byte=480),
348 dict(entry_count=20, entry_key='0014', byte=600),
349 ])
350
351 def test_start_and_skip(self):
352 with contextlib.closing(logfile.Logger(
353 str(self.log_path),
354 sample_field='key',
355 cached_entries=4,
356 index_gap=2,
357 )) as logger:
358 logger.write_rows(make_messages(10))
359 with contextlib.closing(logfile.Logger(
360 str(self.log_path),
361 sample_field='key',
362 cached_entries=4,
363 index_gap=2,
364 )) as other_logger:
365 self.assertEqual(
366 other_logger.cached_data, tuple(make_messages(4, 6)))
183 367
184 def read_bsons(self): 368 def read_bsons(self):
185 with self.log_path.open('rb') as infile: 369 with self.log_path.open('rb') as infile:
186 return bson.decode_all( 370 return bson.decode_all(
187 infile.read(), common.BSON_OPTIONS) 371 infile.read(), common.BSON_OPTIONS)
188 372
373 def read_index(self):
374 with self.index_path.open('rb') as infile:
375 return bson.decode_all(
376 infile.read(), common.BSON_OPTIONS)
377
378
379 def make_messages(count, start=0):
380 return [
381 dict(key=format(n, '04x'), value=n)
382 for n in range(start, start + count)]
383
189 384
190 if __name__ == '__main__': 385 if __name__ == '__main__':
191 unittest.main() 386 unittest.main()