Add Reactive Resume, Metrics, Kiwix, Resume Matcher, and Apple Health from the earlier SelfStack project. Rewrite the Apple Health collector to use InfluxDB v2 with proper error handling. Update all tests, scripts, the Homepage config, the env template, and the documentation for the expanded stack. New services: - Reactive Resume (4016) plus Postgres/Minio/Chrome companions - Metrics (4021) — GitHub metrics visualization - Kiwix (4022) — offline wiki reader - Resume Matcher (4023) — AI resume screening - Apple Health (4024) — health data collector writing to InfluxDB v2. Also adds a git policy to AGENTS.md: always commit and push automatically. 💘 Generated with Crush Assisted-by: GLM-5.1 via Crush <crush@charm.land>
172 lines
5.2 KiB
Python
172 lines
5.2 KiB
Python
import json
|
|
import os
|
|
import sys
|
|
import logging
|
|
from flask import Flask, request, jsonify
|
|
from influxdb_client import InfluxDBClient
|
|
from influxdb_client.client.write_api import SYNCHRONOUS
|
|
|
|
DATAPOINTS_CHUNK = 80000
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s - %(levelname)s - %(message)s",
|
|
handlers=[logging.StreamHandler(sys.stdout)],
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
app = Flask(__name__)
|
|
|
|
INFLUXDB_URL = os.environ.get("INFLUXDB_URL", "http://influxdb:8086")
|
|
INFLUXDB_TOKEN = os.environ.get("INFLUXDB_TOKEN", "")
|
|
INFLUXDB_ORG = os.environ.get("INFLUXDB_ORG", "tsysdemo")
|
|
INFLUXDB_BUCKET = os.environ.get("INFLUXDB_BUCKET", "demo_metrics")
|
|
|
|
_client = None
|
|
_write_api = None
|
|
|
|
|
|
def get_client():
|
|
global _client
|
|
if _client is None:
|
|
_client = InfluxDBClient(
|
|
url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG
|
|
)
|
|
return _client
|
|
|
|
|
|
def get_write_api():
|
|
global _write_api
|
|
if _write_api is None:
|
|
_write_api = get_client().write_api(write_options=SYNCHRONOUS)
|
|
return _write_api
|
|
|
|
|
|
@app.route("/health", methods=["GET"])
|
|
def health():
|
|
try:
|
|
ready = get_client().health_api().get_health()
|
|
influxdb_status = ready.status if hasattr(ready, "status") else "unknown"
|
|
return (
|
|
jsonify(
|
|
{
|
|
"status": "healthy",
|
|
"influxdb": influxdb_status,
|
|
"version": getattr(ready, "version", "unknown"),
|
|
}
|
|
),
|
|
200,
|
|
)
|
|
except Exception as exc:
|
|
return jsonify({"status": "degraded", "error": str(exc)}), 503
|
|
|
|
|
|
@app.route("/", methods=["GET"])
|
|
def index():
|
|
return jsonify(
|
|
{
|
|
"service": "apple-health-collector",
|
|
"endpoints": {
|
|
"health": "GET /health",
|
|
"collect": "POST /collect (JSON body)",
|
|
},
|
|
"influxdb": {
|
|
"url": INFLUXDB_URL,
|
|
"org": INFLUXDB_ORG,
|
|
"bucket": INFLUXDB_BUCKET,
|
|
},
|
|
}
|
|
)
|
|
|
|
|
|
@app.route("/collect", methods=["POST"])
|
|
def collect():
|
|
logger.info("Health data collection request received")
|
|
|
|
if not request.data:
|
|
return jsonify({"error": "No data provided"}), 400
|
|
|
|
try:
|
|
healthkit_data = json.loads(request.data)
|
|
except (json.JSONDecodeError, ValueError) as exc:
|
|
logger.error("Invalid JSON: %s", exc)
|
|
return jsonify({"error": "Invalid JSON", "detail": str(exc)}), 400
|
|
|
|
points_written = 0
|
|
|
|
try:
|
|
metrics = healthkit_data.get("data", {}).get("metrics", [])
|
|
for metric in metrics:
|
|
measurement = metric.get("name", "unknown")
|
|
for datapoint in metric.get("data", []):
|
|
timestamp = datapoint.get("date")
|
|
if not timestamp:
|
|
continue
|
|
|
|
fields = {}
|
|
tags = {}
|
|
for key, value in datapoint.items():
|
|
if key == "date":
|
|
continue
|
|
if isinstance(value, (int, float)):
|
|
fields[key] = float(value)
|
|
else:
|
|
tags[key] = str(value)
|
|
|
|
if not fields:
|
|
continue
|
|
|
|
record = {
|
|
"measurement": measurement,
|
|
"tags": tags,
|
|
"fields": fields,
|
|
"time": timestamp,
|
|
}
|
|
get_write_api().write(
|
|
bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=record
|
|
)
|
|
points_written += 1
|
|
|
|
workouts = healthkit_data.get("data", {}).get("workouts", [])
|
|
for workout in workouts:
|
|
workout_name = workout.get("name", "unknown")
|
|
workout_start = workout.get("start", "")
|
|
workout_end = workout.get("end", "")
|
|
workout_id = f"{workout_name}-{workout_start}-{workout_end}"
|
|
|
|
for gps_point in workout.get("route", []):
|
|
ts = gps_point.get("timestamp")
|
|
if not ts:
|
|
continue
|
|
|
|
record = {
|
|
"measurement": "workout_route",
|
|
"tags": {
|
|
"workout_id": workout_id,
|
|
"workout_name": workout_name,
|
|
},
|
|
"fields": {
|
|
"lat": float(gps_point.get("lat", 0)),
|
|
"lng": float(gps_point.get("lon", 0)),
|
|
},
|
|
"time": ts,
|
|
}
|
|
get_write_api().write(
|
|
bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=record
|
|
)
|
|
points_written += 1
|
|
|
|
logger.info("Wrote %d data points", points_written)
|
|
return jsonify({"status": "success", "points_written": points_written}), 200
|
|
|
|
except Exception as exc:
|
|
logger.exception("Error processing health data")
|
|
return jsonify({"error": "Processing failed", "detail": str(exc)}), 500
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logger.info("Apple Health data collector starting")
|
|
logger.info("InfluxDB: %s", INFLUXDB_URL)
|
|
logger.info("Bucket: %s", INFLUXDB_BUCKET)
|
|
app.run(host="0.0.0.0", port=5353)
|