Update Task Heartbeat to include Job_id (#594)

This commit is contained in:
nharper285 2021-02-26 10:36:10 -08:00 committed by GitHub
parent 6a049db3a3
commit 06f45f338c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 10 additions and 3 deletions

View File

@ -47,7 +47,7 @@ impl CommonConfig {
pub async fn init_heartbeat(&self) -> Result<Option<TaskHeartbeatClient>> {
match &self.heartbeat_queue {
Some(url) => {
let hb = init_task_heartbeat(url.clone(), self.task_id).await?;
let hb = init_task_heartbeat(url.clone(), self.task_id, self.job_id).await?;
Ok(Some(hb))
}
None => Ok(None),

View File

@ -26,18 +26,24 @@ struct Heartbeat {
#[derive(Clone)]
pub struct TaskContext {
task_id: Uuid,
job_id: Uuid,
machine_id: Uuid,
machine_name: String,
}
pub type TaskHeartbeatClient = HeartbeatClient<TaskContext, HeartbeatData>;
pub async fn init_task_heartbeat(queue_url: Url, task_id: Uuid) -> Result<TaskHeartbeatClient> {
pub async fn init_task_heartbeat(
queue_url: Url,
task_id: Uuid,
job_id: Uuid,
) -> Result<TaskHeartbeatClient> {
let machine_id = get_machine_id().await?;
let machine_name = get_machine_name().await?;
let hb = HeartbeatClient::init_heartbeat(
TaskContext {
task_id,
job_id,
machine_id,
machine_name,
},

View File

@ -30,7 +30,7 @@ def main(msg: func.QueueMessage, dashboard: func.Out[str]) -> None:
task.heartbeat = datetime.utcnow()
task.save()
except ValidationError:
logging.error("invalid task heartbat: %s", raw)
logging.error("invalid task heartbeat: %s", raw)
events = get_events()
if events:

View File

@ -516,6 +516,7 @@ class Job(BaseModel):
class TaskHeartbeatEntry(BaseModel):
task_id: UUID
job_id: Optional[UUID]
machine_id: UUID
data: List[Dict[str, HeartbeatType]]