Mercurial > personal > weather-server
view weather_server/server.py @ 19:47987502bf4c
Add graph, make it public, and bump the version.
This checks in the tsc-compiled JS files because idklol.
author | Paul Fisher <paul@pfish.zone> |
---|---|
date | Sun, 13 Oct 2019 18:22:06 -0400 |
parents | 9a609bcf0809 |
children | beb42c835c52 |
line wrap: on
line source
"""Flask application that accepts and serves weather readings."""

import datetime
import hmac
import sys

import bson
import flask

from . import common
from . import locations
from . import types


def build_app(root_directory: str) -> flask.Flask:
    """Build the weather server's Flask application.

    Args:
        root_directory: Passed to ``locations.Locations``, which is then used
            to look up a ``(location, logger)`` pair by location name.

    Returns:
        A Flask app with the favicon, home, submit, per-location and
        recent-readings routes registered.
    """
    locs = locations.Locations(root_directory)
    app = flask.Flask(__name__)
    # A max-age of 0 makes files sent with send_file expire immediately;
    # presumably so updated static assets are picked up right away.
    app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0

    @app.route('/favicon.ico')
    def favicon():
        """Serve the site icon."""
        return flask.send_file('static/favicon.ico')

    @app.route('/')
    def home():
        """Return a plain-text placeholder for the site root."""
        return 'Weather server'

    @app.route('/_submit', methods=['POST'])
    def submit():
        """Accept a batch of readings for one location.

        The request body is a stream of BSON documents.  The first document
        is a preamble holding ``location`` and ``password``; each following
        document holds ``sample_time``, ``temp_c`` and ``rh_pct``.

        Responds with 400 when the body is empty or is not valid BSON, a
        required key is missing, the location is unknown, or the password
        does not match.  On success the readings are written to the
        location's logger and ``{"status": "OK"}`` is returned as JSON.
        """
        req = flask.request
        reader = bson.decode_file_iter(
            req.stream, codec_options=common.BSON_OPTIONS)
        try:
            # StopIteration here means the body contained no documents.
            preamble = next(reader)
            loc_name = preamble['location']
            password = str(preamble['password'])
            loc, logger = locs.get(loc_name)
            # compare_digest only accepts str arguments that are pure ASCII;
            # comparing UTF-8 bytes keeps the constant-time check while
            # making a non-ASCII password a plain mismatch, not a TypeError.
            if not hmac.compare_digest(
                    password.encode('utf-8'),
                    loc.password.encode('utf-8')):
                flask.abort(400)
            # The reader is lazy, so decoding errors in later documents
            # surface while this list is being built.
            entries = [
                types.Reading.from_now(
                    sample_time=item['sample_time'],
                    temp_c=item['temp_c'],
                    rh_pct=item['rh_pct'],
                )
                for item in reader
            ]
        except (KeyError, StopIteration, bson.InvalidBSON):
            flask.abort(400)
        logger.write_rows(entries)
        return flask.jsonify({'status': 'OK'})

    @app.route('/<location>')
    def show(location: str):
        """Render the page for one location with its latest reading, if any.

        Responds with 404 when the location is unknown.
        """
        try:
            loc, logger = locs.get(location)
        except KeyError:
            flask.abort(404)
        data = logger.data
        if data:
            last_reading = data[-1]
            tz = loc.timezone()
            # Express the sample time in the location's own time zone.
            date = tz.normalize(last_reading.sample_time.astimezone(tz))
        else:
            # No readings yet: the template receives None for both values.
            last_reading = None
            date = None
        return flask.render_template(
            'location.html',
            location=loc,
            last_reading=last_reading,
            date=date)

    @app.route('/<location>/recent')
    def recent(location: str):
        """Return readings from the last ``seconds`` seconds as JSON.

        ``seconds`` is a required integer query parameter.  Responds with
        404 when the location is unknown, and with 400 when ``seconds`` is
        missing, is not an integer, or is too large to form a valid start
        time.
        """
        try:
            loc, logger = locs.get(location)
        except KeyError:
            flask.abort(404)
        req = flask.request
        try:
            seconds = int(req.args['seconds'])
        except (KeyError, ValueError):
            flask.abort(400)
        try:
            # Both the timedelta constructor and the subtraction raise
            # OverflowError for out-of-range values.
            start = common.utc_now() - datetime.timedelta(seconds=seconds)
        except OverflowError:
            flask.abort(400)

        readings = [
            r.as_dict()
            for r in logger.data
            if start < r.sample_time
        ]
        resp = flask.Response()
        resp.content_type = 'application/json'
        resp.data = common.json_dumps({
            'timezone': loc.tz_name,
            'readings': readings,
        })
        return resp

    return app


def main(argv):
    """Main function for a simple local demo.

    Args:
        argv: Command-line arguments without the program name; ``argv[0]``
            is passed to ``build_app`` as the root directory.
    """
    app = build_app(argv[0])
    # 0.0.0.0 listens on all network interfaces, not just localhost.
    app.run(host='0.0.0.0')


if __name__ == '__main__':
    main(sys.argv[1:])