Mercurial > personal > weather-server
comparison weather_server/logfile_test.py @ 31:9bc3687e1e5e
logfile: Add an index, and don't keep everything in RAM.
- Adds index BSON file, updated upon writing.
- Limits amount of data in RAM.
- Gracefully handles writes that don't update the index.
| author | Paul Fisher <paul@pfish.zone> |
|---|---|
| date | Tue, 07 Jul 2020 19:51:30 -0400 |
| parents | 20c8ec56e447 |
| children | b77c8e7d2742 |
comparison
equal
deleted
inserted
replaced
| 30:c760ab7f93c2 | 31:9bc3687e1e5e |
|---|---|
| 1 import contextlib | 1 import contextlib |
| 2 import datetime | 2 import datetime |
| 3 import os.path | 3 import os.path |
| 4 import pathlib | 4 import pathlib |
| 5 import tempfile | 5 import tempfile |
| 6 import threading | |
| 7 import unittest | 6 import unittest |
| 8 | 7 |
| 9 import bson | 8 import bson |
| 10 import pytz | 9 import pytz |
| 11 | 10 |
| 23 | 22 |
| 24 def setUp(self): | 23 def setUp(self): |
| 25 super().setUp() | 24 super().setUp() |
| 26 self.temp_dir = tempfile.TemporaryDirectory() | 25 self.temp_dir = tempfile.TemporaryDirectory() |
| 27 self.log_path = pathlib.Path(self.temp_dir.name) / 'test.bson' | 26 self.log_path = pathlib.Path(self.temp_dir.name) / 'test.bson' |
| 27 self.index_path = pathlib.Path(str(self.log_path) + '.index.bson') | |
| 28 | 28 |
| 29 def tearDown(self): | 29 def tearDown(self): |
| 30 self.temp_dir.cleanup() | 30 self.temp_dir.cleanup() |
| 31 super().tearDown() | 31 super().tearDown() |
| 32 | 32 |
| 33 def test_empty(self): | 33 def test_empty(self): |
| 34 lg = logfile.Logger( | 34 lg = logfile.Logger( |
| 35 str(self.log_path), sample_field='x') | 35 str(self.log_path), sample_field='x') |
| 36 with contextlib.closing(lg) as logger: | 36 with contextlib.closing(lg) as logger: |
| 37 self.assertEqual(logger.data, ()) | 37 self.assertEqual(logger.cached_data, ()) |
| 38 | 38 |
| 39 def test_fails_to_open(self): | 39 def test_fails_to_open(self): |
| 40 with self.assertRaises(OSError): | 40 with self.assertRaises(OSError): |
| 41 logfile.Logger( | 41 logfile.Logger( |
| 42 os.path.join( | 42 os.path.join( |
| 62 with contextlib.closing(logfile.Logger( | 62 with contextlib.closing(logfile.Logger( |
| 63 str(self.log_path), | 63 str(self.log_path), |
| 64 sample_field='sample_time', | 64 sample_field='sample_time', |
| 65 )) as logger: | 65 )) as logger: |
| 66 self.assertEqual( | 66 self.assertEqual( |
| 67 logger.data, | 67 logger.cached_data, |
| 68 ( | 68 ( |
| 69 dict( | 69 dict( |
| 70 sample_time=ts(123), | 70 sample_time=ts(123), |
| 71 temp_c=420, | 71 temp_c=420, |
| 72 rh_pct=69, | 72 rh_pct=69, |
| 82 ingest_time=ts(125), | 82 ingest_time=ts(125), |
| 83 ))) | 83 ))) |
| 84 with contextlib.closing(logfile.Logger( | 84 with contextlib.closing(logfile.Logger( |
| 85 str(self.log_path), | 85 str(self.log_path), |
| 86 sample_field='sample_time', | 86 sample_field='sample_time', |
| 87 cached_entries=4, | |
| 88 index_gap=2, | |
| 87 )) as logger: | 89 )) as logger: |
| 88 logger.write_rows([ | 90 logger.write_rows([ |
| 89 # Ignored, since it's older than the newest entry. | 91 # Ignored, since it's older than the newest entry. |
| 90 dict( | 92 dict( |
| 91 sample_time=ts(100), | 93 sample_time=ts(100), |
| 99 rh_pct=777, | 101 rh_pct=777, |
| 100 ingest_time=ts(200), | 102 ingest_time=ts(200), |
| 101 ), | 103 ), |
| 102 ]) | 104 ]) |
| 103 self.assertEqual( | 105 self.assertEqual( |
| 104 logger.data, | 106 logger.cached_data, |
| 105 ( | 107 ( |
| 106 dict( | 108 dict( |
| 107 sample_time=ts(123), | 109 sample_time=ts(123), |
| 108 temp_c=420, | 110 temp_c=420, |
| 109 rh_pct=69, | 111 rh_pct=69, |
| 130 temp_c=333, | 132 temp_c=333, |
| 131 rh_pct=777, | 133 rh_pct=777, |
| 132 ingest_time=ts(200), | 134 ingest_time=ts(200), |
| 133 ), | 135 ), |
| 134 ]) | 136 ]) |
| 137 self.assertEqual(self.read_index(), [ | |
| 138 dict(entry_count=0, entry_key=ts(123), byte=0), | |
| 139 ]) | |
| 135 | 140 |
| 136 def test_outside_writes(self): | 141 def test_outside_writes(self): |
| 137 with contextlib.closing(logfile.Logger( | 142 with contextlib.closing(logfile.Logger( |
| 138 str(self.log_path), | 143 str(self.log_path), |
| 139 sample_field='sample_time', | 144 sample_field='sample_time', |
| 145 index_gap=2, | |
| 140 )) as logger: | 146 )) as logger: |
| 141 logger.write_rows([ | 147 logger.write_rows([ |
| 142 dict( | 148 dict( |
| 143 sample_time=ts(100), | 149 sample_time=ts(100), |
| 144 temp_c=999, | 150 temp_c=999, |
| 158 temp_c=256, | 164 temp_c=256, |
| 159 rh_pct=128, | 165 rh_pct=128, |
| 160 ingest_time=ts(4096), | 166 ingest_time=ts(4096), |
| 161 ))) | 167 ))) |
| 162 outfile.flush() | 168 outfile.flush() |
| 163 self.assertEqual(logger.data, ( | 169 # Empty write to update the index. |
| 170 logger.write_rows([]) | |
| 171 self.assertEqual(logger.cached_data, ( | |
| 164 dict( | 172 dict( |
| 165 sample_time=ts(100), | 173 sample_time=ts(100), |
| 166 temp_c=999, | 174 temp_c=999, |
| 167 rh_pct=666, | 175 rh_pct=666, |
| 168 ingest_time=ts(101), | 176 ingest_time=ts(101), |
| 178 temp_c=256, | 186 temp_c=256, |
| 179 rh_pct=128, | 187 rh_pct=128, |
| 180 ingest_time=ts(4096), | 188 ingest_time=ts(4096), |
| 181 ), | 189 ), |
| 182 )) | 190 )) |
| 191 self.assertEqual(self.read_bsons(), [ | |
| 192 dict( | |
| 193 sample_time=ts(100), | |
| 194 temp_c=999, | |
| 195 rh_pct=666, | |
| 196 ingest_time=ts(101), | |
| 197 ), | |
| 198 dict( | |
| 199 sample_time=ts(125), | |
| 200 temp_c=333, | |
| 201 rh_pct=777, | |
| 202 ingest_time=ts(200), | |
| 203 ), | |
| 204 dict( | |
| 205 sample_time=ts(1024), | |
| 206 temp_c=256, | |
| 207 rh_pct=128, | |
| 208 ingest_time=ts(4096), | |
| 209 ), | |
| 210 ]) | |
| 211 self.assertEqual(self.read_index(), [ | |
| 212 dict(entry_count=0, entry_key=ts(100), byte=0), | |
| 213 dict(entry_count=2, entry_key=ts(1024), byte=142), | |
| 214 ]) | |
| 215 | |
| 216 def test_lots_of_outside_writes(self): | |
| 217 with contextlib.closing(logfile.Logger( | |
| 218 str(self.log_path), | |
| 219 sample_field='key', | |
| 220 cached_entries=4, | |
| 221 index_gap=2, | |
| 222 )) as logger: | |
| 223 logger.write_rows(make_messages(10)) | |
| 224 with contextlib.closing(logfile.Logger( | |
| 225 str(self.log_path), | |
| 226 sample_field='key', | |
| 227 cached_entries=4, | |
| 228 index_gap=2, | |
| 229 )) as other_logger: | |
| 230 other_logger.write_rows(make_messages(10, 10)) | |
| 231 logger.write_rows(make_messages(1, 20)) | |
| 232 self.assertEqual(logger.cached_data, ( | |
| 233 dict(key='0011', value=17), | |
| 234 dict(key='0012', value=18), | |
| 235 dict(key='0013', value=19), | |
| 236 dict(key='0014', value=20), | |
| 237 )) | |
| 238 self.assertEqual(self.read_bsons(), make_messages(21)) | |
| 239 self.assertEqual(self.read_index(), [ | |
| 240 dict(entry_count=0, entry_key='0000', byte=0), | |
| 241 dict(entry_count=2, entry_key='0002', byte=60), | |
| 242 dict(entry_count=4, entry_key='0004', byte=120), | |
| 243 dict(entry_count=6, entry_key='0006', byte=180), | |
| 244 dict(entry_count=8, entry_key='0008', byte=240), | |
| 245 dict(entry_count=10, entry_key='000a', byte=300), | |
| 246 dict(entry_count=12, entry_key='000c', byte=360), | |
| 247 dict(entry_count=14, entry_key='000e', byte=420), | |
| 248 dict(entry_count=16, entry_key='0010', byte=480), | |
| 249 dict(entry_count=18, entry_key='0012', byte=540), | |
| 250 dict(entry_count=20, entry_key='0014', byte=600), | |
| 251 ]) | |
| 252 | |
| 253 def test_outside_writes_that_dont_update_index(self): | |
| 254 with contextlib.closing(logfile.Logger( | |
| 255 str(self.log_path), | |
| 256 sample_field='key', | |
| 257 cached_entries=4, | |
| 258 index_gap=2, | |
| 259 )) as logger: | |
| 260 logger.write_rows(make_messages(10)) | |
| 261 with self.log_path.open('ab') as outfile: | |
| 262 for item in make_messages(16, 256): | |
| 263 outfile.write(common.bson_encode(item)) | |
| 264 self.assertEqual(logger.cached_data, ( | |
| 265 dict(key='010c', value=268), | |
| 266 dict(key='010d', value=269), | |
| 267 dict(key='010e', value=270), | |
| 268 dict(key='010f', value=271), | |
| 269 )) | |
| 270 self.assertEqual(self.read_bsons(), [ | |
| 271 dict(key='0000', value=0), | |
| 272 dict(key='0001', value=1), | |
| 273 dict(key='0002', value=2), | |
| 274 dict(key='0003', value=3), | |
| 275 dict(key='0004', value=4), | |
| 276 dict(key='0005', value=5), | |
| 277 dict(key='0006', value=6), | |
| 278 dict(key='0007', value=7), | |
| 279 dict(key='0008', value=8), | |
| 280 dict(key='0009', value=9), | |
| 281 dict(key='0100', value=256), | |
| 282 dict(key='0101', value=257), | |
| 283 dict(key='0102', value=258), | |
| 284 dict(key='0103', value=259), | |
| 285 dict(key='0104', value=260), | |
| 286 dict(key='0105', value=261), | |
| 287 dict(key='0106', value=262), | |
| 288 dict(key='0107', value=263), | |
| 289 dict(key='0108', value=264), | |
| 290 dict(key='0109', value=265), | |
| 291 dict(key='010a', value=266), | |
| 292 dict(key='010b', value=267), | |
| 293 dict(key='010c', value=268), | |
| 294 dict(key='010d', value=269), | |
| 295 dict(key='010e', value=270), | |
| 296 dict(key='010f', value=271), | |
| 297 ]) | |
| 298 self.assertEqual(self.read_index(), [ | |
| 299 dict(entry_count=0, entry_key='0000', byte=0), | |
| 300 dict(entry_count=2, entry_key='0002', byte=60), | |
| 301 dict(entry_count=4, entry_key='0004', byte=120), | |
| 302 dict(entry_count=6, entry_key='0006', byte=180), | |
| 303 dict(entry_count=8, entry_key='0008', byte=240), | |
| 304 ]) | |
| 305 | |
| 306 def test_outside_write_but_dont_skip(self): | |
| 307 with contextlib.closing(logfile.Logger( | |
| 308 str(self.log_path), | |
| 309 sample_field='key', | |
| 310 cached_entries=10, | |
| 311 index_gap=4, | |
| 312 )) as logger: | |
| 313 logger.write_rows(make_messages(10)) | |
| 314 with contextlib.closing(logfile.Logger( | |
| 315 str(self.log_path), | |
| 316 sample_field='key', | |
| 317 cached_entries=10, | |
| 318 index_gap=4, | |
| 319 )) as other_logger: | |
| 320 other_logger.write_rows(make_messages(5, 10)) | |
| 321 self.assertEqual(logger.cached_data, ( | |
| 322 dict(key='0005', value=5), | |
| 323 dict(key='0006', value=6), | |
| 324 dict(key='0007', value=7), | |
| 325 dict(key='0008', value=8), | |
| 326 dict(key='0009', value=9), | |
| 327 dict(key='000a', value=10), | |
| 328 dict(key='000b', value=11), | |
| 329 dict(key='000c', value=12), | |
| 330 dict(key='000d', value=13), | |
| 331 dict(key='000e', value=14), | |
| 332 )) | |
| 333 logger.write_rows(make_messages(5, 15)) | |
| 334 self.assertEqual(self.read_index(), [ | |
| 335 dict(entry_count=0, entry_key='0000', byte=0), | |
| 336 dict(entry_count=4, entry_key='0004', byte=120), | |
| 337 dict(entry_count=8, entry_key='0008', byte=240), | |
| 338 dict(entry_count=12, entry_key='000c', byte=360), | |
| 339 dict(entry_count=16, entry_key='0010', byte=480), | |
| 340 ]) | |
| 341 logger.write_rows(make_messages(1, 20)) | |
| 342 self.assertEqual(self.read_index(), [ | |
| 343 dict(entry_count=0, entry_key='0000', byte=0), | |
| 344 dict(entry_count=4, entry_key='0004', byte=120), | |
| 345 dict(entry_count=8, entry_key='0008', byte=240), | |
| 346 dict(entry_count=12, entry_key='000c', byte=360), | |
| 347 dict(entry_count=16, entry_key='0010', byte=480), | |
| 348 dict(entry_count=20, entry_key='0014', byte=600), | |
| 349 ]) | |
| 350 | |
| 351 def test_start_and_skip(self): | |
| 352 with contextlib.closing(logfile.Logger( | |
| 353 str(self.log_path), | |
| 354 sample_field='key', | |
| 355 cached_entries=4, | |
| 356 index_gap=2, | |
| 357 )) as logger: | |
| 358 logger.write_rows(make_messages(10)) | |
| 359 with contextlib.closing(logfile.Logger( | |
| 360 str(self.log_path), | |
| 361 sample_field='key', | |
| 362 cached_entries=4, | |
| 363 index_gap=2, | |
| 364 )) as other_logger: | |
| 365 self.assertEqual( | |
| 366 other_logger.cached_data, tuple(make_messages(4, 6))) | |
| 183 | 367 |
| 184 def read_bsons(self): | 368 def read_bsons(self): |
| 185 with self.log_path.open('rb') as infile: | 369 with self.log_path.open('rb') as infile: |
| 186 return bson.decode_all( | 370 return bson.decode_all( |
| 187 infile.read(), common.BSON_OPTIONS) | 371 infile.read(), common.BSON_OPTIONS) |
| 188 | 372 |
| 373 def read_index(self): | |
| 374 with self.index_path.open('rb') as infile: | |
| 375 return bson.decode_all( | |
| 376 infile.read(), common.BSON_OPTIONS) | |
| 377 | |
| 378 | |
| 379 def make_messages(count, start=0): | |
| 380 return [ | |
| 381 dict(key=format(n, '04x'), value=n) | |
| 382 for n in range(start, start + count)] | |
| 383 | |
| 189 | 384 |
| 190 if __name__ == '__main__': | 385 if __name__ == '__main__': |
| 191 unittest.main() | 386 unittest.main() |
