Mercurial > personal > weatherlog
comparison weatherlog/logger_test.py @ 14:c01f9929ae38
Make logger and HTTP writer more general and resilient.
This makes the logger and HTTP writer more general, by removing
any dependency upon the exact data type they are writing.
They can now handle any BSON-serializable dict,
and they track what has been sent by recording the position of the last *byte* sent,
not the last timestamp.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Tue, 15 Oct 2019 22:40:24 -0400 |
parents | 8a350ec1aa78 |
children | 770215590d80 |
comparison
equal
deleted
inserted
replaced
13:4c81182eaa6b | 14:c01f9929ae38 |
---|---|
44 | 44 |
45 def ts(t): | 45 def ts(t): |
46 return datetime.datetime.utcfromtimestamp(t).replace(tzinfo=pytz.UTC) | 46 return datetime.datetime.utcfromtimestamp(t).replace(tzinfo=pytz.UTC) |
47 | 47 |
48 | 48 |
49 bs = logger.bson_encode | |
50 | |
51 | |
49 class LoggerTest(unittest.TestCase): | 52 class LoggerTest(unittest.TestCase): |
50 | 53 |
51 maxDiff = None | 54 maxDiff = None |
52 | 55 |
53 def setUp(self): | 56 def setUp(self): |
63 writer = FakeWriter() | 66 writer = FakeWriter() |
64 with contextlib.closing( | 67 with contextlib.closing( |
65 logger.BufferedLogger(self.temp_dir.name, writer) | 68 logger.BufferedLogger(self.temp_dir.name, writer) |
66 ) as bl: | 69 ) as bl: |
67 bl.WAIT_TIME = 0.2 | 70 bl.WAIT_TIME = 0.2 |
68 bl.write(types.Reading(ts(3), 1, 2)) | 71 bl.write({'first': 'item'}) |
69 bl.write(types.Reading(ts(6), 4, 5)) | 72 bl.write({'entry': 2}) |
70 bl.write(types.Reading(ts(8), 10, 9)) | 73 bl.write({'thing': 'three'}) |
71 bl.start() | 74 bl.start() |
72 time.sleep(1) # Ensure that we get an entire logger cycle in. | 75 time.sleep(1) # Ensure that we get an entire logger cycle in. |
73 | 76 |
74 self.assertEqual( | 77 self.assertEqual( |
75 writer.writes, | 78 writer.writes, |
76 [ | 79 [ |
77 (types.Reading(ts(3), 1, 2), types.Reading(ts(6), 4, 5)), | 80 (bs({'first': 'item'}), bs({'entry': 2})), |
78 (types.Reading(ts(8), 10, 9),), | 81 (bs({'thing': 'three'}),), |
79 ], | 82 ], |
80 ) | 83 ) |
81 self.assertEqual(self._read_last_sent(), '8.0') | 84 self.assertEqual(self._read_last_sent(), '59') |
82 self.assertEqual(self._read_bsons(), [ | 85 self.assertEqual(self._read_bsons(), [ |
83 dict(sample_time=ts(3), temp_c=1, rh_pct=2), | 86 {'first': 'item'}, |
84 dict(sample_time=ts(6), temp_c=4, rh_pct=5), | 87 {'entry': 2}, |
85 dict(sample_time=ts(8), temp_c=10, rh_pct=9), | 88 {'thing': 'three'}, |
86 ]) | 89 ]) |
87 | 90 |
88 def test_append_and_resume(self): | 91 def test_append_and_resume(self): |
89 existing_values = [ | 92 existing_values = [ |
90 dict(sample_time=ts(10), temp_c=20, rh_pct=30), | 93 dict(sample_time=ts(10), temp_c=20, rh_pct=30), |
91 dict(sample_time=ts(60), temp_c=10, rh_pct=99), | 94 dict(sample_time=ts(60), temp_c=10, rh_pct=99), |
92 ] | 95 ] |
93 with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile: | 96 with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile: |
94 for value in existing_values: | 97 for value in existing_values: |
95 outfile.write(logger.bson_encode(value)) | 98 outfile.write(bs(value)) |
96 outfile.write(b'non-BSON garbage') | 99 outfile.write(b'non-BSON garbage') |
97 | 100 |
98 with (self.temp_path / logger.LAST_SENT_FILENAME).open('w') as outfile: | 101 with (self.temp_path / logger.OLD_LAST_TS).open('w') as outfile: |
99 outfile.write('10') | 102 outfile.write('10') |
100 | 103 |
101 writer = FakeWriter() | 104 writer = FakeWriter() |
102 with contextlib.closing( | 105 with contextlib.closing( |
103 logger.BufferedLogger(str(self.temp_path), writer) | 106 logger.BufferedLogger(str(self.temp_path), writer) |
104 ) as bl: | 107 ) as bl: |
105 bl.WAIT_TIME = 0.2 | 108 bl.WAIT_TIME = 0.2 |
106 bl.start() | 109 bl.start() |
107 time.sleep(0.5) | 110 time.sleep(0.5) |
108 bl.write(types.Reading(ts(99), temp_c=-40, rh_pct=2)) | 111 bl.write({'some new': 'entry'}) |
109 time.sleep(0.5) | 112 time.sleep(0.5) |
110 | 113 |
111 self.assertEqual(self._read_last_sent(), '99.0') | 114 self.assertEqual(self._read_last_sent(), '125') |
112 self.assertEqual(self._read_bsons(), [ | 115 self.assertEqual(self._read_bsons(), [ |
113 dict(sample_time=ts(10), temp_c=20, rh_pct=30), | 116 dict(sample_time=ts(10), temp_c=20, rh_pct=30), |
114 dict(sample_time=ts(60), temp_c=10, rh_pct=99), | 117 dict(sample_time=ts(60), temp_c=10, rh_pct=99), |
115 dict(sample_time=ts(99), temp_c=-40, rh_pct=2), | 118 {'some new': 'entry'}, |
116 ]) | 119 ]) |
117 self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [ | 120 self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [ |
118 types.Reading(ts(60), 10, 99), | 121 bs(dict(sample_time=ts(60), temp_c=10, rh_pct=99)), |
119 types.Reading(ts(99), -40, 2), | 122 bs({'some new': 'entry'}), |
123 ]) | |
124 | |
125 def test_resume_from_byte(self): | |
126 existing_values = [ | |
127 {'old': 'value'}, | |
128 {'unsent': 'value'}, | |
129 ] | |
130 with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile: | |
131 for value in existing_values: | |
132 outfile.write(bs(value)) | |
133 outfile.write(b'non-BSON garbage') | |
134 with (self.temp_path / logger.START_BYTE).open('w') as outfile: | |
135 outfile.write('20') # immediately after 'old: value' | |
136 | |
137 writer = FakeWriter() | |
138 with contextlib.closing( | |
139 logger.BufferedLogger(str(self.temp_path), writer) | |
140 ) as bl: | |
141 bl.WAIT_TIME = 0.2 | |
142 bl.start() | |
143 bl.write({'some new': 'entry'}) | |
144 time.sleep(0.5) | |
145 | |
146 self.assertEqual(self._read_last_sent(), '68') | |
147 self.assertEqual(self._read_bsons(), [ | |
148 {'old': 'value'}, | |
149 {'unsent': 'value'}, | |
150 {'some new': 'entry'}, | |
151 ]) | |
152 self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [ | |
153 bs({'unsent': 'value'}), | |
154 bs({'some new': 'entry'}), | |
120 ]) | 155 ]) |
121 | 156 |
122 def test_send_failure(self): | 157 def test_send_failure(self): |
123 writer = FlakyWriter() | 158 writer = FlakyWriter() |
124 with contextlib.closing( | 159 with contextlib.closing( |
125 logger.BufferedLogger(str(self.temp_path), writer) | 160 logger.BufferedLogger(str(self.temp_path), writer) |
126 ) as bl: | 161 ) as bl: |
127 bl.WAIT_TIME = 0.2 | 162 bl.WAIT_TIME = 0.2 |
128 bl.start() | 163 bl.start() |
129 bl.write(types.Reading(ts(1337), 420, 69)) | 164 bl.write({'cool write': 'succeeds'}) |
130 time.sleep(0.5) | 165 time.sleep(0.5) |
131 writer.is_bad = True | 166 writer.is_bad = True |
132 bl.write(types.Reading(ts(31337), 666, 999)) | 167 bl.write({'bad write': 'fails'}) |
133 time.sleep(0.5) | 168 time.sleep(0.5) |
134 | 169 |
135 self.assertEqual(self._read_last_sent(), '1337.0') | 170 self.assertEqual(self._read_last_sent(), '30') |
136 self.assertEqual( | 171 self.assertEqual( |
137 writer.writes, [(types.Reading(ts(1337), 420, 69),)]) | 172 writer.writes, [(bs({'cool write': 'succeeds'}),)]) |
138 self.assertEqual(self._read_bsons(), [ | 173 self.assertEqual(self._read_bsons(), [ |
139 dict(sample_time=ts(1337), temp_c=420, rh_pct=69), | 174 {'cool write': 'succeeds'}, |
140 dict(sample_time=ts(31337), temp_c=666, rh_pct=999), | 175 {'bad write': 'fails'}, |
141 ]) | 176 ]) |
142 | 177 |
143 # Ensure that we resume writing again when the condition clears. | 178 # Ensure that we resume writing again when the condition clears. |
144 | 179 |
145 writer.is_bad = False | 180 writer.is_bad = False |
146 time.sleep(0.5) | 181 time.sleep(0.5) |
147 self.assertEqual(self._read_last_sent(), '31337.0') | 182 self.assertEqual(self._read_last_sent(), '56') |
148 self.assertEqual( | 183 self.assertEqual( |
149 writer.writes, | 184 writer.writes, |
150 [ | 185 [ |
151 (types.Reading(ts(1337), 420, 69),), | 186 (bs({'cool write': 'succeeds'}),), |
152 (types.Reading(ts(31337), 666, 999),), | 187 (bs({'bad write': 'fails'}),), |
153 ]) | 188 ]) |
154 self.assertEqual(self._read_bsons(), [ | 189 self.assertEqual(self._read_bsons(), [ |
155 dict(sample_time=ts(1337), temp_c=420, rh_pct=69), | 190 {'cool write': 'succeeds'}, |
156 dict(sample_time=ts(31337), temp_c=666, rh_pct=999), | 191 {'bad write': 'fails'}, |
157 ]) | 192 ]) |
158 | 193 |
159 def test_fail_upon_lock(self): | 194 def test_fail_upon_lock(self): |
160 bson_file = str(self.temp_path / logger.BSON_FILENAME) | 195 bson_file = str(self.temp_path / logger.BSON_FILENAME) |
161 out_queue = multiprocessing.Queue() | 196 out_queue = multiprocessing.Queue() |
175 proc.close() | 210 proc.close() |
176 | 211 |
177 # Test that it works after the lock is released. | 212 # Test that it works after the lock is released. |
178 logger.BufferedLogger(str(self.temp_path), FakeWriter()).close() | 213 logger.BufferedLogger(str(self.temp_path), FakeWriter()).close() |
179 | 214 |
215 def test_upgrade_last_sent(self): | |
216 for (timestamp, byte_count) in [ | |
217 ('5', '0'), | |
218 ('20', '52'), | |
219 ('30', '78'), | |
220 ]: | |
221 bson_file = self.temp_path / logger.BSON_FILENAME | |
222 with bson_file.open('wb') as outfile: | |
223 outfile.write(logger.bson_encode(dict(sample_time=ts(10)))) | |
224 outfile.write(logger.bson_encode(dict(sample_time=ts(20)))) | |
225 outfile.write(logger.bson_encode(dict(sample_time=ts(30)))) | |
226 outfile.write(b'some bogus data') | |
227 last_sent = self.temp_path / logger.OLD_LAST_TS | |
228 with last_sent.open('w') as outfile: | |
229 outfile.write(timestamp) | |
230 start_byte = self.temp_path / logger.START_BYTE | |
231 with bson_file.open('r+b') as infile: | |
232 logger._read_unsent_and_upgrade( | |
233 infile, last_sent, start_byte) | |
234 self.assertFalse(last_sent.exists()) | |
235 with start_byte.open('r') as infile: | |
236 self.assertEqual(infile.read(), byte_count) | |
237 | |
238 def test_upgrade_last_sent_no_last_sent(self): | |
239 bson_file = self.temp_path / logger.BSON_FILENAME | |
240 with bson_file.open('wb') as outfile: | |
241 outfile.write(logger.bson_encode(dict(sample_time=ts(10)))) | |
242 outfile.write(logger.bson_encode(dict(sample_time=ts(20)))) | |
243 outfile.write(logger.bson_encode(dict(sample_time=ts(30)))) | |
244 last_sent = self.temp_path / logger.OLD_LAST_TS | |
245 start_byte = self.temp_path / logger.START_BYTE | |
246 with start_byte.open('w') as outfile: | |
247 outfile.write('untouched') | |
248 with bson_file.open('r+b') as infile: | |
249 logger._read_unsent_and_upgrade( | |
250 infile, last_sent, start_byte) | |
251 self.assertFalse(last_sent.exists()) | |
252 with start_byte.open('r') as infile: | |
253 self.assertEqual(infile.read(), 'untouched') | |
254 | |
180 def _read_last_sent(self): | 255 def _read_last_sent(self): |
181 with (self.temp_path / logger.LAST_SENT_FILENAME).open('r') as infile: | 256 with (self.temp_path / logger.START_BYTE).open('r') as infile: |
182 return infile.read() | 257 return infile.read() |
183 | 258 |
184 def _read_bsons(self): | 259 def _read_bsons(self): |
185 with (self.temp_path / logger.BSON_FILENAME).open('rb') as infile: | 260 with (self.temp_path / logger.BSON_FILENAME).open('rb') as infile: |
186 return bson.decode_all(infile.read(), logger.BSON_OPTIONS) | 261 return bson.decode_all(infile.read(), logger.BSON_OPTIONS) |