comparison weatherlog/logger_test.py @ 14:c01f9929ae38

Make logger and HTTP writer more general and resilient.

This makes the logger and HTTP writer more general by removing any dependency on the exact data type they are writing. They can now handle any BSON-serializable dict, and they track their progress by the last *byte* sent rather than the last timestamp.
author Paul Fisher <paul@pfish.zone>
date Tue, 15 Oct 2019 22:40:24 -0400
parents 8a350ec1aa78
children 770215590d80
13:4c81182eaa6b 14:c01f9929ae38
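
The change described above is easiest to see in use: the logger now accepts plain dicts and resumes from a stored byte offset rather than a timestamp. A minimal sketch of that resume-from-byte idea; the helper function and its arguments are illustrative and not the module's own code — only the byte-offset idea and bson.decode_all come from the tests below:

    import bson  # PyMongo's bson package, as used by the tests

    def unsent_documents(bson_path, start_byte_path):
        # Read how many bytes have already been sent, then decode everything after.
        with open(start_byte_path) as f:
            offset = int(f.read() or '0')
        with open(bson_path, 'rb') as f:
            f.seek(offset)
            return bson.decode_all(f.read())  # the module passes its own BSON_OPTIONS here

The real module also tolerates trailing non-BSON garbage and upgrades the old timestamp file (OLD_LAST_TS) to the new byte-offset file (START_BYTE); both behaviours are exercised by the tests below.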
44 44
45 def ts(t): 45 def ts(t):
46 return datetime.datetime.utcfromtimestamp(t).replace(tzinfo=pytz.UTC) 46 return datetime.datetime.utcfromtimestamp(t).replace(tzinfo=pytz.UTC)
47 47
48 48
49 bs = logger.bson_encode
50
51
49 class LoggerTest(unittest.TestCase): 52 class LoggerTest(unittest.TestCase):
50 53
51 maxDiff = None 54 maxDiff = None
52 55
53 def setUp(self): 56 def setUp(self):
63 writer = FakeWriter() 66 writer = FakeWriter()
64 with contextlib.closing( 67 with contextlib.closing(
65 logger.BufferedLogger(self.temp_dir.name, writer) 68 logger.BufferedLogger(self.temp_dir.name, writer)
66 ) as bl: 69 ) as bl:
67 bl.WAIT_TIME = 0.2 70 bl.WAIT_TIME = 0.2
68 bl.write(types.Reading(ts(3), 1, 2)) 71 bl.write({'first': 'item'})
69 bl.write(types.Reading(ts(6), 4, 5)) 72 bl.write({'entry': 2})
70 bl.write(types.Reading(ts(8), 10, 9)) 73 bl.write({'thing': 'three'})
71 bl.start() 74 bl.start()
72 time.sleep(1) # Ensure that we get an entire logger cycle in. 75 time.sleep(1) # Ensure that we get an entire logger cycle in.
73 76
74 self.assertEqual( 77 self.assertEqual(
75 writer.writes, 78 writer.writes,
76 [ 79 [
77 (types.Reading(ts(3), 1, 2), types.Reading(ts(6), 4, 5)), 80 (bs({'first': 'item'}), bs({'entry': 2})),
78 (types.Reading(ts(8), 10, 9),), 81 (bs({'thing': 'three'}),),
79 ], 82 ],
80 ) 83 )
81 self.assertEqual(self._read_last_sent(), '8.0') 84 self.assertEqual(self._read_last_sent(), '59')
82 self.assertEqual(self._read_bsons(), [ 85 self.assertEqual(self._read_bsons(), [
83 dict(sample_time=ts(3), temp_c=1, rh_pct=2), 86 {'first': 'item'},
84 dict(sample_time=ts(6), temp_c=4, rh_pct=5), 87 {'entry': 2},
85 dict(sample_time=ts(8), temp_c=10, rh_pct=9), 88 {'thing': 'three'},
86 ]) 89 ])
87 90
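
The expected '59' above is just the total size of the three BSON documents written. A worked check, assuming PyMongo's standard sizing (4-byte length prefix and trailing NUL per document; each element is a type byte, a NUL-terminated key, then a length-prefixed NUL-terminated string or a 4-byte int32):

    first = 4 + (1 + len(b'first\x00') + 4 + len(b'item\x00'))  + 1  # 21 bytes
    entry = 4 + (1 + len(b'entry\x00') + 4)                     + 1  # 16 bytes (int32 value)
    thing = 4 + (1 + len(b'thing\x00') + 4 + len(b'three\x00')) + 1  # 22 bytes
    assert first + entry + thing == 59  # the byte offset _read_last_sent() reports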
88 def test_append_and_resume(self): 91 def test_append_and_resume(self):
89 existing_values = [ 92 existing_values = [
90 dict(sample_time=ts(10), temp_c=20, rh_pct=30), 93 dict(sample_time=ts(10), temp_c=20, rh_pct=30),
91 dict(sample_time=ts(60), temp_c=10, rh_pct=99), 94 dict(sample_time=ts(60), temp_c=10, rh_pct=99),
92 ] 95 ]
93 with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile: 96 with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile:
94 for value in existing_values: 97 for value in existing_values:
95 outfile.write(logger.bson_encode(value)) 98 outfile.write(bs(value))
96 outfile.write(b'non-BSON garbage') 99 outfile.write(b'non-BSON garbage')
97 100
98 with (self.temp_path / logger.LAST_SENT_FILENAME).open('w') as outfile: 101 with (self.temp_path / logger.OLD_LAST_TS).open('w') as outfile:
99 outfile.write('10') 102 outfile.write('10')
100 103
101 writer = FakeWriter() 104 writer = FakeWriter()
102 with contextlib.closing( 105 with contextlib.closing(
103 logger.BufferedLogger(str(self.temp_path), writer) 106 logger.BufferedLogger(str(self.temp_path), writer)
104 ) as bl: 107 ) as bl:
105 bl.WAIT_TIME = 0.2 108 bl.WAIT_TIME = 0.2
106 bl.start() 109 bl.start()
107 time.sleep(0.5) 110 time.sleep(0.5)
108 bl.write(types.Reading(ts(99), temp_c=-40, rh_pct=2)) 111 bl.write({'some new': 'entry'})
109 time.sleep(0.5) 112 time.sleep(0.5)
110 113
111 self.assertEqual(self._read_last_sent(), '99.0') 114 self.assertEqual(self._read_last_sent(), '125')
112 self.assertEqual(self._read_bsons(), [ 115 self.assertEqual(self._read_bsons(), [
113 dict(sample_time=ts(10), temp_c=20, rh_pct=30), 116 dict(sample_time=ts(10), temp_c=20, rh_pct=30),
114 dict(sample_time=ts(60), temp_c=10, rh_pct=99), 117 dict(sample_time=ts(60), temp_c=10, rh_pct=99),
115 dict(sample_time=ts(99), temp_c=-40, rh_pct=2), 118 {'some new': 'entry'},
116 ]) 119 ])
117 self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [ 120 self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [
118 types.Reading(ts(60), 10, 99), 121 bs(dict(sample_time=ts(60), temp_c=10, rh_pct=99)),
119 types.Reading(ts(99), -40, 2), 122 bs({'some new': 'entry'}),
123 ])
124
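
Likewise for the '125' above: each existing reading encodes to 50 bytes, the trailing garbage is discarded, and the new dict adds 25 bytes. A sketch of the arithmetic, under the same sizing assumptions as before (8-byte UTC datetime, int32 integers):

    reading = (4                                    # document length prefix
               + (1 + len(b'sample_time\x00') + 8)  # UTC datetime element
               + (1 + len(b'temp_c\x00') + 4)       # int32 element
               + (1 + len(b'rh_pct\x00') + 4)       # int32 element
               + 1)                                 # document terminator -> 50 bytes
    new_entry = 4 + (1 + len(b'some new\x00') + 4 + len(b'entry\x00')) + 1  # 25 bytes
    assert 2 * reading + new_entry == 125

The old last-sent timestamp of '10' upgrades to a byte offset of 50 (one reading), which is why only the ts(60) reading and the new entry reach the writer.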
125 def test_resume_from_byte(self):
126 existing_values = [
127 {'old': 'value'},
128 {'unsent': 'value'},
129 ]
130 with (self.temp_path / logger.BSON_FILENAME).open('wb') as outfile:
131 for value in existing_values:
132 outfile.write(bs(value))
133 outfile.write(b'non-BSON garbage')
134 with (self.temp_path / logger.START_BYTE).open('w') as outfile:
135 outfile.write('20') # immediately after 'old: value'
136
137 writer = FakeWriter()
138 with contextlib.closing(
139 logger.BufferedLogger(str(self.temp_path), writer)
140 ) as bl:
141 bl.WAIT_TIME = 0.2
142 bl.start()
143 bl.write({'some new': 'entry'})
144 time.sleep(0.5)
145
146 self.assertEqual(self._read_last_sent(), '68')
147 self.assertEqual(self._read_bsons(), [
148 {'old': 'value'},
149 {'unsent': 'value'},
150 {'some new': 'entry'},
151 ])
152 self.assertEqual(list(itertools.chain.from_iterable(writer.writes)), [
153 bs({'unsent': 'value'}),
154 bs({'some new': 'entry'}),
120 ]) 155 ])
121 156
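
The START_BYTE value of '20' and the final '68' above fall out of the same document sizes; a quick check, same assumptions as before:

    old_doc = 4 + (1 + len(b'old\x00')      + 4 + len(b'value\x00')) + 1  # 20 bytes -> start byte
    unsent  = 4 + (1 + len(b'unsent\x00')   + 4 + len(b'value\x00')) + 1  # 23 bytes
    new_doc = 4 + (1 + len(b'some new\x00') + 4 + len(b'entry\x00')) + 1  # 25 bytes
    assert old_doc == 20 and old_doc + unsent + new_doc == 68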
122 def test_send_failure(self): 157 def test_send_failure(self):
123 writer = FlakyWriter() 158 writer = FlakyWriter()
124 with contextlib.closing( 159 with contextlib.closing(
125 logger.BufferedLogger(str(self.temp_path), writer) 160 logger.BufferedLogger(str(self.temp_path), writer)
126 ) as bl: 161 ) as bl:
127 bl.WAIT_TIME = 0.2 162 bl.WAIT_TIME = 0.2
128 bl.start() 163 bl.start()
129 bl.write(types.Reading(ts(1337), 420, 69)) 164 bl.write({'cool write': 'succeeds'})
130 time.sleep(0.5) 165 time.sleep(0.5)
131 writer.is_bad = True 166 writer.is_bad = True
132 bl.write(types.Reading(ts(31337), 666, 999)) 167 bl.write({'bad write': 'fails'})
133 time.sleep(0.5) 168 time.sleep(0.5)
134 169
135 self.assertEqual(self._read_last_sent(), '1337.0') 170 self.assertEqual(self._read_last_sent(), '30')
136 self.assertEqual( 171 self.assertEqual(
137 writer.writes, [(types.Reading(ts(1337), 420, 69),)]) 172 writer.writes, [(bs({'cool write': 'succeeds'}),)])
138 self.assertEqual(self._read_bsons(), [ 173 self.assertEqual(self._read_bsons(), [
139 dict(sample_time=ts(1337), temp_c=420, rh_pct=69), 174 {'cool write': 'succeeds'},
140 dict(sample_time=ts(31337), temp_c=666, rh_pct=999), 175 {'bad write': 'fails'},
141 ]) 176 ])
142 177
143 # Ensure that we resume writing again when the condition clears. 178 # Ensure that we resume writing again when the condition clears.
144 179
145 writer.is_bad = False 180 writer.is_bad = False
146 time.sleep(0.5) 181 time.sleep(0.5)
147 self.assertEqual(self._read_last_sent(), '31337.0') 182 self.assertEqual(self._read_last_sent(), '56')
148 self.assertEqual( 183 self.assertEqual(
149 writer.writes, 184 writer.writes,
150 [ 185 [
151 (types.Reading(ts(1337), 420, 69),), 186 (bs({'cool write': 'succeeds'}),),
152 (types.Reading(ts(31337), 666, 999),), 187 (bs({'bad write': 'fails'}),),
153 ]) 188 ])
154 self.assertEqual(self._read_bsons(), [ 189 self.assertEqual(self._read_bsons(), [
155 dict(sample_time=ts(1337), temp_c=420, rh_pct=69), 190 {'cool write': 'succeeds'},
156 dict(sample_time=ts(31337), temp_c=666, rh_pct=999), 191 {'bad write': 'fails'},
157 ]) 192 ])
158 193
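
The same arithmetic gives the send-failure offsets ('30' after the first successful batch, '56' once the failed write is retried):

    cool = 4 + (1 + len(b'cool write\x00') + 4 + len(b'succeeds\x00')) + 1  # 30 bytes
    bad  = 4 + (1 + len(b'bad write\x00')  + 4 + len(b'fails\x00'))    + 1  # 26 bytes
    assert cool == 30 and cool + bad == 56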
159 def test_fail_upon_lock(self): 194 def test_fail_upon_lock(self):
160 bson_file = str(self.temp_path / logger.BSON_FILENAME) 195 bson_file = str(self.temp_path / logger.BSON_FILENAME)
161 out_queue = multiprocessing.Queue() 196 out_queue = multiprocessing.Queue()
175 proc.close() 210 proc.close()
176 211
177 # Test that it works after the lock is released. 212 # Test that it works after the lock is released.
178 logger.BufferedLogger(str(self.temp_path), FakeWriter()).close() 213 logger.BufferedLogger(str(self.temp_path), FakeWriter()).close()
179 214
215 def test_upgrade_last_sent(self):
216 for (timestamp, byte_count) in [
217 ('5', '0'),
218 ('20', '52'),
219 ('30', '78'),
220 ]:
221 bson_file = self.temp_path / logger.BSON_FILENAME
222 with bson_file.open('wb') as outfile:
223 outfile.write(logger.bson_encode(dict(sample_time=ts(10))))
224 outfile.write(logger.bson_encode(dict(sample_time=ts(20))))
225 outfile.write(logger.bson_encode(dict(sample_time=ts(30))))
226 outfile.write(b'some bogus data')
227 last_sent = self.temp_path / logger.OLD_LAST_TS
228 with last_sent.open('w') as outfile:
229 outfile.write(timestamp)
230 start_byte = self.temp_path / logger.START_BYTE
231 with bson_file.open('r+b') as infile:
232 logger._read_unsent_and_upgrade(
233 infile, last_sent, start_byte)
234 self.assertFalse(last_sent.exists())
235 with start_byte.open('r') as infile:
236 self.assertEqual(infile.read(), byte_count)
237
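
The expected byte counts in the upgrade loop above come from three identical 26-byte documents, each holding only an 8-byte sample_time: a timestamp of '5' precedes every record, '20' covers the first two, and '30' covers all three. A quick check under the same sizing assumptions:

    ts_only = 4 + (1 + len(b'sample_time\x00') + 8) + 1  # 26 bytes per record
    assert [0 * ts_only, 2 * ts_only, 3 * ts_only] == [0, 52, 78]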
238 def test_upgrade_last_sent_no_last_sent(self):
239 bson_file = self.temp_path / logger.BSON_FILENAME
240 with bson_file.open('wb') as outfile:
241 outfile.write(logger.bson_encode(dict(sample_time=ts(10))))
242 outfile.write(logger.bson_encode(dict(sample_time=ts(20))))
243 outfile.write(logger.bson_encode(dict(sample_time=ts(30))))
244 last_sent = self.temp_path / logger.OLD_LAST_TS
245 start_byte = self.temp_path / logger.START_BYTE
246 with start_byte.open('w') as outfile:
247 outfile.write('untouched')
248 with bson_file.open('r+b') as infile:
249 logger._read_unsent_and_upgrade(
250 infile, last_sent, start_byte)
251 self.assertFalse(last_sent.exists())
252 with start_byte.open('r') as infile:
253 self.assertEqual(infile.read(), 'untouched')
254
180 def _read_last_sent(self): 255 def _read_last_sent(self):
181 with (self.temp_path / logger.LAST_SENT_FILENAME).open('r') as infile: 256 with (self.temp_path / logger.START_BYTE).open('r') as infile:
182 return infile.read() 257 return infile.read()
183 258
184 def _read_bsons(self): 259 def _read_bsons(self):
185 with (self.temp_path / logger.BSON_FILENAME).open('rb') as infile: 260 with (self.temp_path / logger.BSON_FILENAME).open('rb') as infile:
186 return bson.decode_all(infile.read(), logger.BSON_OPTIONS) 261 return bson.decode_all(infile.read(), logger.BSON_OPTIONS)