"""Apple Health data collector.

A small Flask service that accepts health-export JSON on ``POST /collect``
and stores the contained metrics and workout GPS routes in InfluxDB.
"""

import json
import logging
import os
import sys

from flask import Flask, jsonify, request
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# Upper bound on the number of points sent to InfluxDB in one write call.
DATAPOINTS_CHUNK = 80000

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# InfluxDB connection settings, overridable through the environment.
INFLUXDB_URL = os.environ.get("INFLUXDB_URL", "http://influxdb:8086")
INFLUXDB_TOKEN = os.environ.get("INFLUXDB_TOKEN", "")
INFLUXDB_ORG = os.environ.get("INFLUXDB_ORG", "tsysdemo")
INFLUXDB_BUCKET = os.environ.get("INFLUXDB_BUCKET", "demo_metrics")

# Lazily created module-level singletons (see get_client / get_write_api).
_client = None
_write_api = None


def get_client():
    """Return the shared ``InfluxDBClient``, creating it on first use."""
    global _client
    if _client is None:
        _client = InfluxDBClient(
            url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG
        )
    return _client


def get_write_api():
    """Return the shared synchronous write API, creating it on first use."""
    global _write_api
    if _write_api is None:
        _write_api = get_client().write_api(write_options=SYNCHRONOUS)
    return _write_api


@app.route("/health", methods=["GET"])
def health():
    """Report whether InfluxDB answers a ping.

    Always responds with HTTP 200; the ``status`` key in the JSON body is
    ``"healthy"`` or ``"degraded"``.
    """
    try:
        client = get_client()
        ping = client.ping()
        if ping:
            return jsonify({"status": "healthy"}), 200
        return jsonify({"status": "degraded", "influxdb": "not reachable"}), 200
    except Exception as exc:
        return jsonify({"status": "degraded", "error": str(exc)}), 200


@app.route("/", methods=["GET"])
def index():
    """Describe the service, its endpoints and the configured InfluxDB target."""
    return jsonify(
        {
            "service": "apple-health-collector",
            "endpoints": {
                "health": "GET /health",
                "collect": "POST /collect (JSON body)",
            },
            "influxdb": {
                "url": INFLUXDB_URL,
                "org": INFLUXDB_ORG,
                "bucket": INFLUXDB_BUCKET,
            },
        }
    )


def _metric_records(metrics):
    """Yield one InfluxDB record dict per usable datapoint in *metrics*.

    Datapoints without a ``date`` or without any numeric value are skipped.
    Numeric values become float fields; every other value becomes a string tag.
    """
    for metric in metrics:
        measurement = metric.get("name", "unknown")
        for datapoint in metric.get("data", []):
            timestamp = datapoint.get("date")
            if not timestamp:
                continue
            fields = {}
            tags = {}
            for key, value in datapoint.items():
                if key == "date":
                    continue
                # NOTE: bool is a subclass of int, so True/False land here
                # and are stored as 1.0/0.0 fields.
                if isinstance(value, (int, float)):
                    fields[key] = float(value)
                else:
                    tags[key] = str(value)
            if not fields:
                continue
            yield {
                "measurement": measurement,
                "tags": tags,
                "fields": fields,
                "time": timestamp,
            }


def _workout_route_records(workouts):
    """Yield one ``workout_route`` record dict per timestamped GPS point.

    Points without a ``timestamp`` are skipped. The input key ``lon`` is
    stored under the field name ``lng``.
    """
    for workout in workouts:
        workout_name = workout.get("name", "unknown")
        workout_start = workout.get("start", "")
        workout_end = workout.get("end", "")
        workout_id = f"{workout_name}-{workout_start}-{workout_end}"
        for gps_point in workout.get("route", []):
            ts = gps_point.get("timestamp")
            if not ts:
                continue
            yield {
                "measurement": "workout_route",
                "tags": {
                    "workout_id": workout_id,
                    "workout_name": workout_name,
                },
                "fields": {
                    "lat": float(gps_point.get("lat", 0)),
                    "lng": float(gps_point.get("lon", 0)),
                },
                "time": ts,
            }


def _write_in_chunks(records):
    """Write *records* to InfluxDB in batches and return how many were written.

    Each write call carries at most ``DATAPOINTS_CHUNK`` records, so a large
    upload costs a handful of requests instead of one request per point.
    The write API is only obtained when there is something to write.
    """
    written = 0
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) >= DATAPOINTS_CHUNK:
            get_write_api().write(
                bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=batch
            )
            written += len(batch)
            batch = []
    if batch:
        get_write_api().write(
            bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=batch
        )
        written += len(batch)
    return written


@app.route("/collect", methods=["POST"])
def collect():
    """Ingest a health-export JSON payload and write its points to InfluxDB.

    Reads ``data.metrics`` and ``data.workouts`` from the request body.

    Responses:
        200 -- ``{"status": "success", "points_written": <int>}``
        400 -- empty body, malformed JSON, or a payload whose top level or
               ``data`` entry is not a JSON object
        500 -- any error raised while building or writing the points
    """
    logger.info("Health data collection request received")

    if not request.data:
        return jsonify({"error": "No data provided"}), 400

    try:
        healthkit_data = json.loads(request.data)
    except (json.JSONDecodeError, ValueError) as exc:
        logger.error("Invalid JSON: %s", exc)
        return jsonify({"error": "Invalid JSON", "detail": str(exc)}), 400

    # Well-formed JSON of the wrong shape is a client error, not a server
    # failure: reject it up front instead of letting .get() raise later.
    payload = (
        healthkit_data.get("data", {})
        if isinstance(healthkit_data, dict)
        else None
    )
    if not isinstance(payload, dict):
        detail = "Expected a JSON object with an object under 'data'"
        logger.error("Invalid payload: %s", detail)
        return jsonify({"error": "Invalid payload", "detail": detail}), 400

    try:
        # `or []` treats an explicit null the same as a missing key.
        metrics = payload.get("metrics") or []
        workouts = payload.get("workouts") or []

        # Metrics are written before workout routes are processed, so a bad
        # GPS point cannot prevent the metrics from being stored.
        points_written = _write_in_chunks(_metric_records(metrics))
        points_written += _write_in_chunks(_workout_route_records(workouts))

        logger.info("Wrote %d data points", points_written)
        return jsonify({"status": "success", "points_written": points_written}), 200
    except Exception as exc:
        logger.exception("Error processing health data")
        return jsonify({"error": "Processing failed", "detail": str(exc)}), 500


if __name__ == "__main__":
    logger.info("Apple Health data collector starting")
    logger.info("InfluxDB: %s", INFLUXDB_URL)
    logger.info("Bucket: %s", INFLUXDB_BUCKET)
    app.run(host="0.0.0.0", port=5353)