Add Prometheus metrics for gw backend + API methods.

This commit is contained in:
Orne Brocaar 2022-07-05 14:00:57 +01:00
parent 9eeee3759c
commit 7baedd1bf9
10 changed files with 345 additions and 4 deletions

48
Cargo.lock generated
View File

@ -839,7 +839,9 @@ dependencies = [
"pbjson-types",
"pbkdf2",
"petgraph 0.6.0",
"pin-project",
"pq-sys",
"prometheus-client",
"prost",
"prost-types",
"r2d2",
@ -1240,6 +1242,12 @@ version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0"
[[package]]
name = "dtoa"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5caaa75cbd2b960ff1e5392d2cfb1f44717fffe12fc1f32b7b5d1267f99732a6"
[[package]]
name = "either"
version = "1.6.1"
@ -2418,6 +2426,15 @@ version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa"
[[package]]
name = "owning_ref"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ff55baddef9e4ad00f88b6c743a2a8062d4c6ade126c2a528644b8e444d52ce"
dependencies = [
"stable_deref_trait",
]
[[package]]
name = "paho-mqtt"
version = "0.9.1"
@ -2725,6 +2742,29 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "prometheus-client"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac1abe0255c04d15f571427a2d1e00099016506cf3297b53853acd2b7eb87825"
dependencies = [
"dtoa 1.0.2",
"itoa 1.0.1",
"owning_ref",
"prometheus-client-derive-text-encode",
]
[[package]]
name = "prometheus-client-derive-text-encode"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8e12d01b9d66ad9eb4529c57666b6263fc1993cb30261d83ead658fdd932652"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost"
version = "0.10.0"
@ -2870,7 +2910,7 @@ dependencies = [
"async-trait",
"combine",
"crc16",
"dtoa",
"dtoa 0.4.8",
"itoa 0.4.8",
"percent-encoding",
"r2d2",
@ -3493,6 +3533,12 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "string_cache"
version = "0.8.2"

View File

@ -103,6 +103,8 @@ base64 = "0.13"
async-recursion = "1.0"
regex = "1"
petgraph = "0.6"
prometheus-client = "0.16"
pin-project = "1.0"
# Development and testing
[dev-dependencies]

View File

@ -1,6 +1,7 @@
use std::convert::Infallible;
use std::time::Duration;
use std::time::{Duration, Instant};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
@ -8,9 +9,15 @@ use std::{
use anyhow::Result;
use futures::future::{self, Either, TryFutureExt};
use hyper::{service::make_service_fn, Server};
use pin_project::pin_project;
use prometheus_client::encoding::text::Encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::Histogram;
use rust_embed::RustEmbed;
use tokio::try_join;
use tonic::transport::Server as TonicServer;
use tonic::Code;
use tonic_reflection::server::Builder as TonicReflectionBuilder;
use tower::{Service, ServiceBuilder};
use tower_http::trace::TraceLayer;
@ -29,6 +36,7 @@ use chirpstack_api::api::user_service_server::UserServiceServer;
use super::config;
use crate::api::auth::validator;
use crate::monitoring::prometheus;
pub mod application;
pub mod auth;
@ -40,11 +48,40 @@ pub mod error;
pub mod gateway;
pub mod helpers;
pub mod internal;
pub mod monitoring;
pub mod multicast;
pub mod oidc;
pub mod tenant;
pub mod user;
lazy_static! {
    // Counter family for handled API requests, keyed by `GrpcLabels`.
    // The family is handed to the shared Prometheus registry the first time
    // this static is dereferenced.
    static ref GRPC_COUNTER: Family<GrpcLabels, Counter> = {
        let family = Family::<GrpcLabels, Counter>::default();
        let registered = Box::new(family.clone());
        prometheus::register(
            "api_requests_handled",
            "Number of API requests handled by service, method and status code",
            registered,
        );
        family
    };

    // Histogram family for API request durations (in seconds), keyed by
    // `GrpcLabels`. Every label combination gets its own histogram with the
    // bucket boundaries listed below.
    static ref GRPC_HISTOGRAM: Family<GrpcLabels, Histogram> = {
        let family = Family::<GrpcLabels, Histogram>::new_with_constructor(|| {
            let buckets = [
                0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
            ];
            Histogram::new(buckets.into_iter())
        });
        let registered = Box::new(family.clone());
        prometheus::register(
            "api_requests_handled_seconds",
            "Duration of API requests handled by service, method and status code",
            registered,
        );
        family
    };
}
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
#[derive(RustEmbed)]
@ -135,6 +172,7 @@ pub async fn setup() -> Result<()> {
.on_request(OnRequest {})
.on_response(OnResponse {}),
)
.layer(PrometheusLogger {})
.service(tonic_service);
// HTTP service
@ -181,9 +219,10 @@ pub async fn setup() -> Result<()> {
});
let backend_handle = tokio::spawn(backend::setup());
let monitoring_handle = tokio::spawn(monitoring::setup());
let api_handle = tokio::spawn(Server::bind(&addr).serve(service));
let _ = try_join!(api_handle, backend_handle)?;
let _ = try_join!(api_handle, backend_handle, monitoring_handle)?;
Ok(())
}
@ -269,3 +308,100 @@ impl<B> tower_http::trace::OnResponse<B> for OnResponse {
tracing::info!(status = resp.status().as_str(), latency = ?latency, "Finished processing request");
}
}
/// Label set used for the API request counter and duration histogram.
#[derive(Clone, Hash, PartialEq, Eq, Encode)]
struct GrpcLabels {
/// Second `/`-separated segment of the request path (index 1 after splitting).
service: String,
/// Third `/`-separated segment of the request path (index 2 after splitting).
method: String,
/// `Debug` rendering of the `tonic::Code` derived from the response.
status_code: String,
}
/// Tower layer that wraps a service in a `PrometheusLoggerService`.
struct PrometheusLogger {}

impl<S> tower::Layer<S> for PrometheusLogger {
    type Service = PrometheusLoggerService<S>;

    /// Wraps `inner` so that its responses are recorded in the API metrics.
    fn layer(&self, inner: S) -> Self::Service {
        PrometheusLoggerService { inner }
    }
}
/// Service wrapper created by `PrometheusLogger::layer`.
#[derive(Debug, Clone)]
struct PrometheusLoggerService<S> {
/// The wrapped service that actually handles the request.
inner: S,
}
impl<S, ReqBody, ResBody> Service<http::Request<ReqBody>> for PrometheusLoggerService<S>
where
    S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
    ReqBody: http_body::Body,
    ResBody: http_body::Body,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = PrometheusLoggerResponseFuture<S::Future>;

    /// Readiness is delegated to the wrapped service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Forwards the request to the wrapped service and returns a future that
    /// records the metrics once the response is available.
    ///
    /// The `service` and `method` labels are the path segments at index 1 and
    /// 2 after splitting the request path on `/`; a missing segment becomes an
    /// empty string.
    fn call(&mut self, request: http::Request<ReqBody>) -> Self::Future {
        // Extract the labels first: the request is moved into the inner call.
        let (service, method) = {
            // Index 0 is the text before the leading '/', so it is skipped.
            let mut segments = request.uri().path().split('/').skip(1);
            let service = segments.next().unwrap_or_default().to_string();
            let method = segments.next().unwrap_or_default().to_string();
            (service, method)
        };

        let response_future = self.inner.call(request);
        let start = Instant::now();

        PrometheusLoggerResponseFuture {
            response_future,
            start,
            service,
            method,
        }
    }
}
/// Future returned by `PrometheusLoggerService::call`.
///
/// Carries the wrapped service's future together with the data needed to
/// record the request metrics once that future resolves.
#[pin_project]
struct PrometheusLoggerResponseFuture<F> {
/// Future returned by the wrapped service's `call`.
#[pin]
response_future: F,
/// Instant captured right after the wrapped service's `call` returned.
start: Instant,
/// Value for the `service` label.
service: String,
/// Value for the `method` label.
method: String,
}
impl<F, ResBody, Error> Future for PrometheusLoggerResponseFuture<F>
where
    F: Future<Output = Result<http::Response<ResBody>, Error>>,
    ResBody: http_body::Body,
{
    type Output = Result<http::Response<ResBody>, Error>;

    /// Polls the wrapped future and, once it resolves to `Ok(response)`,
    /// increments the request counter and observes the elapsed time in the
    /// duration histogram. The result of the wrapped future is passed through
    /// unchanged; `Err` results are not recorded.
    ///
    /// The status label is derived from the `grpc-status` response header:
    /// * header absent: `0` (`Ok`),
    /// * header present and a valid integer: that code,
    /// * header present but not valid text or not an integer: `2` (`Unknown`).
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Numeric gRPC status codes used as fall-backs below.
        const GRPC_STATUS_OK: i32 = 0;
        const GRPC_STATUS_UNKNOWN: i32 = 2;

        let this = self.project();
        let result = match this.response_future.poll(cx) {
            Poll::Ready(result) => result,
            Poll::Pending => return Poll::Pending,
        };

        if let Ok(response) = &result {
            // NOTE(review): only the response headers are inspected here; a
            // status delivered in trailers is presumably not visible at this
            // point and is therefore counted as `Ok` -- confirm.
            let status_code = match response.headers().get("grpc-status") {
                None => GRPC_STATUS_OK,
                // A malformed header must not be reported as a success, so
                // both failure modes (non-text and non-numeric) map to Unknown.
                Some(v) => v
                    .to_str()
                    .ok()
                    .and_then(|s| s.parse::<i32>().ok())
                    .unwrap_or(GRPC_STATUS_UNKNOWN),
            };
            let status_code = Code::from_i32(status_code);

            let labels = GrpcLabels {
                service: this.service.clone(),
                method: this.method.clone(),
                status_code: format!("{:?}", status_code),
            };

            GRPC_COUNTER.get_or_create(&labels).inc();
            GRPC_HISTOGRAM
                .get_or_create(&labels)
                .observe(this.start.elapsed().as_secs_f64());
        }

        Poll::Ready(result)
    }
}

View File

@ -0,0 +1,67 @@
use std::convert::Infallible;
use std::net::SocketAddr;
use anyhow::{Context, Result};
use diesel::RunQueryDsl;
use tokio::task;
use tracing::info;
use warp::{http::Response, http::StatusCode, Filter};
use crate::config;
use crate::monitoring::prometheus;
use crate::storage::{get_db_conn, get_redis_conn};
/// Starts the monitoring HTTP endpoint.
///
/// Returns immediately when `monitoring.bind` is empty. Otherwise serves
/// `GET /metrics` (Prometheus metrics) and `GET /health` (health check) on the
/// configured address until the server stops.
///
/// # Panics
///
/// Panics when `monitoring.bind` is not a valid socket address. The panic
/// message names the setting and the offending value.
pub async fn setup() {
    let conf = config::get();
    if conf.monitoring.bind.is_empty() {
        return;
    }

    // The bind address is operator-provided configuration: keep failing fast
    // on a bad value, but say which setting is wrong instead of a bare unwrap.
    let addr: SocketAddr = conf.monitoring.bind.parse().unwrap_or_else(|e| {
        panic!(
            "Invalid monitoring.bind address '{}': {}",
            conf.monitoring.bind, e
        )
    });
    info!(bind = %conf.monitoring.bind, "Setting up monitoring endpoint");

    let prom_endpoint = warp::get()
        .and(warp::path!("metrics"))
        .and_then(prometheus_handler);

    let health_endpoint = warp::get()
        .and(warp::path!("health"))
        .and_then(health_handler);

    let routes = prom_endpoint.or(health_endpoint);

    warp::serve(routes).run(addr).await;
}
async fn prometheus_handler() -> Result<impl warp::Reply, Infallible> {
let body = prometheus::encode_to_string().unwrap_or_default();
Ok(Response::builder().body(body))
}
/// Handles `GET /health`.
///
/// Replies `200 OK` with body `OK` when the health check passes, and
/// `503 Service Unavailable` with the error text as body when it fails.
async fn health_handler() -> Result<impl warp::Reply, Infallible> {
    let (body, status) = match _health_handler().await {
        Ok(()) => ("OK".to_string(), StatusCode::OK),
        Err(e) => (e.to_string(), StatusCode::SERVICE_UNAVAILABLE),
    };

    Ok(warp::reply::with_status(body, status))
}
/// Checks the Redis and PostgreSQL connections on a blocking task.
///
/// Redis is checked first, then a `select 1` query is executed against
/// PostgreSQL. The first failure is returned as the error.
async fn _health_handler() -> Result<()> {
    let probe = || -> Result<()> {
        let mut redis_conn = get_redis_conn()?;
        if !redis_conn.check_connection() {
            return Err(anyhow!("Redis connection error"));
        }

        let pg_conn = get_db_conn()?;
        diesel::sql_query("select 1")
            .execute(&pg_conn)
            .context("PostgreSQL connection error")?;

        Ok(())
    };

    // The outer `?` surfaces a failed join; the inner result is returned as-is.
    task::spawn_blocking(probe).await?
}

View File

@ -203,6 +203,7 @@ impl Default for Scheduler {
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct Monitoring {
pub bind: String,
pub meta_log_max_history: usize,
pub gateway_frame_log_max_history: usize,
pub device_frame_log_max_history: usize,
@ -221,6 +222,7 @@ pub struct Monitoring {
impl Default for Monitoring {
fn default() -> Self {
Monitoring {
bind: "".to_string(),
meta_log_max_history: 10,
gateway_frame_log_max_history: 10,
device_frame_log_max_history: 10,

View File

@ -5,17 +5,52 @@ use async_trait::async_trait;
use futures::stream::StreamExt;
use handlebars::Handlebars;
use paho_mqtt as mqtt;
use prometheus_client::encoding::text::Encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prost::Message;
use serde::Serialize;
use tracing::{error, info, trace};
use crate::config::GatewayBackendMqtt;
use crate::monitoring::prometheus;
use crate::{downlink, uplink};
use lrwn::region::CommonName;
use lrwn::EUI64;
use super::GatewayBackend;
/// Label set for the counter of events received through the MQTT gateway backend.
#[derive(Clone, Hash, PartialEq, Eq, Encode)]
struct EventLabels {
/// Event name; set from the matched topic suffix (e.g. "up", "stats", "ack").
event: String,
}
/// Label set for the counter of commands sent through the MQTT gateway backend.
#[derive(Clone, Hash, PartialEq, Eq, Encode)]
struct CommandLabels {
/// Command name (e.g. "down", "config").
command: String,
}
lazy_static! {
    // Counter family for received gateway events, keyed by `EventLabels`.
    // Registered with the shared Prometheus registry on first use.
    static ref EVENT_COUNTER: Family<EventLabels, Counter> = {
        let family = Family::<EventLabels, Counter>::default();
        let registered = Box::new(family.clone());
        prometheus::register(
            "gateway_backend_mqtt_events",
            "Number of events received",
            registered,
        );
        family
    };

    // Counter family for sent gateway commands, keyed by `CommandLabels`.
    // Registered with the shared Prometheus registry on first use.
    static ref COMMAND_COUNTER: Family<CommandLabels, Counter> = {
        let family = Family::<CommandLabels, Counter>::default();
        let registered = Box::new(family.clone());
        prometheus::register(
            "gateway_backend_mqtt_commands",
            "Number of commands sent",
            registered,
        );
        family
    };
}
struct MqttContext {
region_name: String,
region_common_name: CommonName,
@ -152,6 +187,11 @@ impl<'a> MqttBackend<'a> {
#[async_trait]
impl GatewayBackend for MqttBackend<'_> {
async fn send_downlink(&self, df: &chirpstack_api::gw::DownlinkFrame) -> Result<()> {
COMMAND_COUNTER
.get_or_create(&CommandLabels {
command: "down".to_string(),
})
.inc();
let topic = self.get_command_topic(&df.gateway_id, "down")?;
let mut df = df.clone();
df.v4_migrate();
@ -169,6 +209,11 @@ impl GatewayBackend for MqttBackend<'_> {
&self,
gw_conf: &chirpstack_api::gw::GatewayConfiguration,
) -> Result<()> {
COMMAND_COUNTER
.get_or_create(&CommandLabels {
command: "config".to_string(),
})
.inc();
let gateway_id = EUI64::from_slice(&gw_conf.gateway_id)?;
let topic = self.get_command_topic(&gateway_id.to_string(), "config")?;
let b = gw_conf.encode_to_vec();
@ -196,6 +241,11 @@ async fn message_callback(region_name: &str, region_common_name: CommonName, msg
let err = || -> Result<()> {
if topic.ends_with("/up") {
EVENT_COUNTER
.get_or_create(&EventLabels {
event: "up".to_string(),
})
.inc();
let mut event = chirpstack_api::gw::UplinkFrame::decode(&mut Cursor::new(b))?;
event.v4_migrate();
@ -206,6 +256,11 @@ async fn message_callback(region_name: &str, region_common_name: CommonName, msg
tokio::spawn(uplink::deduplicate_uplink(event));
} else if topic.ends_with("/stats") {
EVENT_COUNTER
.get_or_create(&EventLabels {
event: "stats".to_string(),
})
.inc();
let mut event = chirpstack_api::gw::GatewayStats::decode(&mut Cursor::new(b))?;
event
.meta_data
@ -216,6 +271,11 @@ async fn message_callback(region_name: &str, region_common_name: CommonName, msg
);
tokio::spawn(uplink::stats::Stats::handle(event));
} else if topic.ends_with("/ack") {
EVENT_COUNTER
.get_or_create(&EventLabels {
event: "ack".to_string(),
})
.inc();
let mut event = chirpstack_api::gw::DownlinkTxAck::decode(&mut Cursor::new(b))?;
event.v4_migrate();
tokio::spawn(downlink::tx_ack::TxAck::handle(event));

View File

@ -209,7 +209,7 @@ impl Integration {
info!("Applying schema migrations");
embedded_migrations::run(&db_conn).context("Run migrations error")?;
Ok(Integration { pg_pool: pg_pool })
Ok(Integration { pg_pool })
}
}

View File

@ -34,6 +34,7 @@ mod gpstime;
mod integration;
mod maccommand;
mod metalog;
mod monitoring;
mod region;
mod sensitivity;
mod storage;

View File

@ -0,0 +1 @@
pub mod prometheus;

View File

@ -0,0 +1,26 @@
use std::sync::RwLock;
use anyhow::Result;
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
lazy_static! {
    // Process-wide metrics registry; written by `register`, read by
    // `encode_to_string`.
    static ref REGISTRY: RwLock<Registry> = RwLock::new(Registry::default());
}
/// Encodes all registered metrics in the text exposition format.
///
/// # Errors
///
/// Returns an error when encoding fails or when the encoded bytes are not
/// valid UTF-8.
///
/// # Panics
///
/// Panics when the registry lock is poisoned.
pub fn encode_to_string() -> Result<String> {
    let registry = REGISTRY.read().unwrap();

    let mut encoded: Vec<u8> = Vec::new();
    encode(&mut encoded, &registry)?;

    let text = String::from_utf8(encoded)?;
    Ok(text)
}
/// Adds `metric` to the shared registry under the given `name` and `help` text.
///
/// # Panics
///
/// Panics when the registry lock is poisoned.
pub fn register(
    name: &str,
    help: &str,
    metric: Box<dyn prometheus_client::encoding::text::SendSyncEncodeMetric>,
) {
    REGISTRY.write().unwrap().register(name, help, metric);
}