- Replace MinIO + Chrome with SeaweedFS (S3) + bucket init container
- Update Reactive Resume to v5 config (S3_* env vars, APP_URL, AUTH_SECRET)
- Fix Kiwix: smaller ZIM download, graceful fallback on failure, start_period
- Fix Apple Health: use InfluxDB ping() instead of deprecated ready()
- Remove stale RESUME_CHROME_TOKEN and RESUME_REFRESH_TOKEN_SECRET
- Add .yamllint config to relax line-length for compose template
- Update validate-all.sh to use local yamllint config and new image refs
- Update unit tests for createbucket service (replaces chrome)

💘 Generated with Crush

Assisted-by: GLM-5.1 via Crush <crush@charm.land>
165 lines
5.0 KiB
Python
165 lines
5.0 KiB
Python
import json
|
|
import os
|
|
import sys
|
|
import logging
|
|
from flask import Flask, request, jsonify
|
|
from influxdb_client import InfluxDBClient
|
|
from influxdb_client.client.write_api import SYNCHRONOUS
|
|
|
|
# Number of data points per chunk; presumably a batch size for InfluxDB writes.
DATAPOINTS_CHUNK = 80_000

# Send timestamped INFO-and-above log records to stdout.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# InfluxDB connection settings, read from the environment with fallbacks.
INFLUXDB_URL = os.getenv("INFLUXDB_URL", "http://influxdb:8086")
INFLUXDB_TOKEN = os.getenv("INFLUXDB_TOKEN", "")
INFLUXDB_ORG = os.getenv("INFLUXDB_ORG", "tsysdemo")
INFLUXDB_BUCKET = os.getenv("INFLUXDB_BUCKET", "demo_metrics")

# Lazily created singletons; populated on first use by get_client() and
# get_write_api() respectively.
_client = None
_write_api = None
|
|
|
|
|
|
def get_client():
    """Return the shared InfluxDB client, constructing it on first call.

    The client is built from ``INFLUXDB_URL``, ``INFLUXDB_TOKEN`` and
    ``INFLUXDB_ORG`` and cached in the module-level ``_client`` so later
    calls reuse the same instance.
    """
    global _client
    if _client is not None:
        return _client

    _client = InfluxDBClient(
        url=INFLUXDB_URL,
        token=INFLUXDB_TOKEN,
        org=INFLUXDB_ORG,
    )
    return _client
|
|
|
|
|
|
def get_write_api():
    """Return the shared synchronous write API, creating it on first call.

    The write API is obtained from ``get_client()`` with
    ``write_options=SYNCHRONOUS`` and cached in the module-level
    ``_write_api``.
    """
    global _write_api
    if _write_api is not None:
        return _write_api

    client = get_client()
    _write_api = client.write_api(write_options=SYNCHRONOUS)
    return _write_api
|
|
|
|
|
|
@app.route("/health", methods=["GET"])
def health():
    """Report service health based on an InfluxDB ping.

    Always answers HTTP 200. The JSON body is ``{"status": "healthy"}`` when
    the ping succeeds; otherwise the status is ``"degraded"`` together with
    either an ``"influxdb": "not reachable"`` marker (falsy ping) or the
    text of the exception that was raised.
    """
    try:
        if get_client().ping():
            body = {"status": "healthy"}
        else:
            body = {"status": "degraded", "influxdb": "not reachable"}
    except Exception as exc:
        # Any failure while creating or pinging the client is reported as a
        # degraded state instead of an error response.
        body = {"status": "degraded", "error": str(exc)}
    return jsonify(body), 200
|
|
|
|
|
|
@app.route("/", methods=["GET"])
def index():
    """Describe the service: its name, endpoints and InfluxDB target."""
    endpoints = {
        "health": "GET /health",
        "collect": "POST /collect (JSON body)",
    }
    # The token is deliberately not part of this summary; only the URL, org
    # and bucket are exposed.
    influxdb = {
        "url": INFLUXDB_URL,
        "org": INFLUXDB_ORG,
        "bucket": INFLUXDB_BUCKET,
    }
    description = {
        "service": "apple-health-collector",
        "endpoints": endpoints,
        "influxdb": influxdb,
    }
    return jsonify(description)
|
|
|
|
|
|
def _metric_records(metrics):
    """Convert the payload's ``metrics`` list into InfluxDB point dicts.

    Each metric's ``name`` becomes the measurement (``"unknown"`` when
    absent). For every datapoint, numeric values become float fields and all
    other values become string tags. Datapoints without a ``date`` or without
    any numeric field are skipped.
    """
    records = []
    for metric in metrics:
        measurement = metric.get("name", "unknown")
        for datapoint in metric.get("data") or []:
            timestamp = datapoint.get("date")
            if not timestamp:
                continue

            fields = {}
            tags = {}
            for key, value in datapoint.items():
                if key == "date":
                    continue
                # bool is a subclass of int in Python, so JSON true/false
                # also land here and are stored as 1.0 / 0.0.
                if isinstance(value, (int, float)):
                    fields[key] = float(value)
                else:
                    tags[key] = str(value)

            if not fields:
                continue

            records.append(
                {
                    "measurement": measurement,
                    "tags": tags,
                    "fields": fields,
                    "time": timestamp,
                }
            )
    return records


def _workout_route_records(workouts):
    """Convert the GPS routes of the payload's ``workouts`` into point dicts.

    Every route point with a ``timestamp`` yields one ``workout_route``
    record tagged with the workout name and an id built from the workout's
    name, start and end. Missing ``lat``/``lon`` values default to 0.
    """
    records = []
    for workout in workouts:
        workout_name = workout.get("name", "unknown")
        workout_start = workout.get("start", "")
        workout_end = workout.get("end", "")
        workout_id = f"{workout_name}-{workout_start}-{workout_end}"

        for gps_point in workout.get("route") or []:
            ts = gps_point.get("timestamp")
            if not ts:
                continue

            records.append(
                {
                    "measurement": "workout_route",
                    "tags": {
                        "workout_id": workout_id,
                        "workout_name": workout_name,
                    },
                    "fields": {
                        # The payload key is "lon"; the stored field is "lng".
                        "lat": float(gps_point.get("lat", 0)),
                        "lng": float(gps_point.get("lon", 0)),
                    },
                    "time": ts,
                }
            )
    return records


def _write_in_chunks(records):
    """Write ``records`` to InfluxDB in batches of ``DATAPOINTS_CHUNK``."""
    if not records:
        # Nothing to send; do not create the client/write API needlessly.
        return

    write_api = get_write_api()
    for start in range(0, len(records), DATAPOINTS_CHUNK):
        write_api.write(
            bucket=INFLUXDB_BUCKET,
            org=INFLUXDB_ORG,
            record=records[start:start + DATAPOINTS_CHUNK],
        )


@app.route("/collect", methods=["POST"])
def collect():
    """Ingest a health export posted as JSON and store it in InfluxDB.

    The body is expected to be a JSON object whose ``data`` member holds
    optional ``metrics`` and ``workouts`` lists. All records are built first
    and then written in batches, so a malformed entry aborts the request
    before anything has been sent to InfluxDB.

    Returns:
        * 200 ``{"status": "success", "points_written": N}`` on success.
        * 400 when the body is empty, is not valid JSON, or is not a JSON
          object.
        * 500 ``{"error": "Processing failed", ...}`` when building or
          writing the records raises.
    """
    logger.info("Health data collection request received")

    if not request.data:
        return jsonify({"error": "No data provided"}), 400

    try:
        healthkit_data = json.loads(request.data)
    except (json.JSONDecodeError, ValueError) as exc:
        logger.error("Invalid JSON: %s", exc)
        return jsonify({"error": "Invalid JSON", "detail": str(exc)}), 400

    # Valid JSON that is not an object (e.g. a list or a bare number) is a
    # client error, not a server failure.
    if not isinstance(healthkit_data, dict):
        detail = "expected a JSON object"
        logger.error("Invalid JSON: %s", detail)
        return jsonify({"error": "Invalid JSON", "detail": detail}), 400

    try:
        # "or" fallbacks tolerate members that are present but null.
        payload = healthkit_data.get("data") or {}
        records = _metric_records(payload.get("metrics") or [])
        records.extend(_workout_route_records(payload.get("workouts") or []))

        _write_in_chunks(records)
        points_written = len(records)

        logger.info("Wrote %d data points", points_written)
        return jsonify({"status": "success", "points_written": points_written}), 200

    except Exception as exc:
        logger.exception("Error processing health data")
        return jsonify({"error": "Processing failed", "detail": str(exc)}), 500
|
|
|
|
|
|
if __name__ == "__main__":
    # Log the startup banner and the InfluxDB target before serving.
    logger.info("Apple Health data collector starting")
    for template, setting in (
        ("InfluxDB: %s", INFLUXDB_URL),
        ("Bucket: %s", INFLUXDB_BUCKET),
    ):
        logger.info(template, setting)

    # Listen on every interface, port 5353.
    app.run(host="0.0.0.0", port=5353)
|