diff --git a/src/agent/onefuzz-agent/src/tasks/heartbeat.rs b/src/agent/onefuzz-agent/src/tasks/heartbeat.rs index 72ae8a4fb..10ac41f1c 100644 --- a/src/agent/onefuzz-agent/src/tasks/heartbeat.rs +++ b/src/agent/onefuzz-agent/src/tasks/heartbeat.rs @@ -48,9 +48,7 @@ pub async fn init_task_heartbeat(queue_url: Url, task_id: Uuid) -> Result::drain_current_messages(context.clone()); - data.push(HeartbeatData::MachineAlive); + let data = HeartbeatClient::::drain_current_messages(context.clone()); let _ = context .queue_client .enqueue(Heartbeat { @@ -69,13 +67,19 @@ pub trait HeartbeatSender { fn send(&self, data: HeartbeatData) -> Result<()>; fn alive(&self) { - self.send(HeartbeatData::TaskAlive).unwrap() + if let Err(error) = self.send(HeartbeatData::TaskAlive) { + error!("failed to send heartbeat: {}", error); + } } } impl HeartbeatSender for TaskHeartbeatClient { fn send(&self, data: HeartbeatData) -> Result<()> { - let mut messages_lock = self.context.pending_messages.lock().unwrap(); + let mut messages_lock = self + .context + .pending_messages + .lock() + .map_err(|_| anyhow::format_err!("Unable to acquire the lock"))?; messages_lock.insert(data); Ok(()) } diff --git a/src/agent/onefuzz-supervisor/src/agent.rs b/src/agent/onefuzz-supervisor/src/agent.rs index e0d4422ad..5be1a021f 100644 --- a/src/agent/onefuzz-supervisor/src/agent.rs +++ b/src/agent/onefuzz-supervisor/src/agent.rs @@ -6,7 +6,7 @@ use tokio::time; use crate::coordinator::*; use crate::done::set_done_lock; -use crate::heartbeat::AgentHeartbeatClient; +use crate::heartbeat::{AgentHeartbeatClient, HeartbeatSender}; use crate::reboot::*; use crate::scheduler::*; use crate::setup::*; @@ -20,7 +20,7 @@ pub struct Agent { setup_runner: Box, work_queue: Box, worker_runner: Box, - _heartbeat: Option, + heartbeat: Option, previous_state: NodeState, } @@ -44,7 +44,7 @@ impl Agent { setup_runner, work_queue, worker_runner, - _heartbeat: heartbeat, + heartbeat, previous_state, } } @@ -67,6 +67,7 @@ impl Agent { } loop { + self.heartbeat.alive(); if delay.is_elapsed() { self.execute_pending_commands().await?; delay = command_delay(); diff --git a/src/agent/onefuzz-supervisor/src/heartbeat.rs b/src/agent/onefuzz-supervisor/src/heartbeat.rs index e0eefcb3f..6724c3ceb 100644 --- a/src/agent/onefuzz-supervisor/src/heartbeat.rs +++ b/src/agent/onefuzz-supervisor/src/heartbeat.rs @@ -40,8 +40,7 @@ pub async fn init_agent_heartbeat(queue_url: Url) -> Result Result Result<()>; + + fn alive(&self) { + if let Err(error) = self.send(HeartbeatData::MachineAlive) { + error!("failed to send heartbeat: {}", error); + } + } +} + +impl HeartbeatSender for AgentHeartbeatClient { + fn send(&self, data: HeartbeatData) -> Result<()> { + let mut messages_lock = self + .context + .pending_messages + .lock() + .map_err(|_| anyhow::format_err!("Unable to acquire the lock"))?; + messages_lock.insert(data); + Ok(()) + } +} + +impl HeartbeatSender for Option { + fn send(&self, data: HeartbeatData) -> Result<()> { + match self { + Some(client) => client.send(data), + None => Ok(()), + } + } +}