Make supervisor heartbeat only fire on main loop progress (#283)

This commit is contained in:
Cheick Keita 2020-11-11 15:30:02 -08:00 committed by GitHub
parent ca209eb543
commit c56f72b37c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 45 additions and 10 deletions

View File

@ -48,9 +48,7 @@ pub async fn init_task_heartbeat(queue_url: Url, task_id: Uuid) -> Result<TaskHe
let machine_id = context.state.machine_id;
let machine_name = context.state.machine_name.clone();
let mut data =
HeartbeatClient::<TaskContext, _>::drain_current_messages(context.clone());
data.push(HeartbeatData::MachineAlive);
let data = HeartbeatClient::<TaskContext, _>::drain_current_messages(context.clone());
let _ = context
.queue_client
.enqueue(Heartbeat {
@ -69,13 +67,19 @@ pub trait HeartbeatSender {
fn send(&self, data: HeartbeatData) -> Result<()>;
fn alive(&self) {
self.send(HeartbeatData::TaskAlive).unwrap()
if let Err(error) = self.send(HeartbeatData::TaskAlive) {
error!("failed to send heartbeat: {}", error);
}
}
}
impl HeartbeatSender for TaskHeartbeatClient {
fn send(&self, data: HeartbeatData) -> Result<()> {
let mut messages_lock = self.context.pending_messages.lock().unwrap();
let mut messages_lock = self
.context
.pending_messages
.lock()
.map_err(|_| anyhow::format_err!("Unable to acquire the lock"))?;
messages_lock.insert(data);
Ok(())
}

View File

@ -6,7 +6,7 @@ use tokio::time;
use crate::coordinator::*;
use crate::done::set_done_lock;
use crate::heartbeat::AgentHeartbeatClient;
use crate::heartbeat::{AgentHeartbeatClient, HeartbeatSender};
use crate::reboot::*;
use crate::scheduler::*;
use crate::setup::*;
@ -20,7 +20,7 @@ pub struct Agent {
setup_runner: Box<dyn ISetupRunner>,
work_queue: Box<dyn IWorkQueue>,
worker_runner: Box<dyn IWorkerRunner>,
_heartbeat: Option<AgentHeartbeatClient>,
heartbeat: Option<AgentHeartbeatClient>,
previous_state: NodeState,
}
@ -44,7 +44,7 @@ impl Agent {
setup_runner,
work_queue,
worker_runner,
_heartbeat: heartbeat,
heartbeat,
previous_state,
}
}
@ -67,6 +67,7 @@ impl Agent {
}
loop {
self.heartbeat.alive();
if delay.is_elapsed() {
self.execute_pending_commands().await?;
delay = command_delay();

View File

@ -40,8 +40,7 @@ pub async fn init_agent_heartbeat(queue_url: Url) -> Result<AgentHeartbeatClient
queue_url,
None,
|context| async move {
let mut data = HeartbeatClient::drain_current_messages(context.clone());
data.push(HeartbeatData::MachineAlive);
let data = HeartbeatClient::drain_current_messages(context.clone());
let _ = context
.queue_client
.enqueue(Heartbeat {
@ -54,3 +53,34 @@ pub async fn init_agent_heartbeat(queue_url: Url) -> Result<AgentHeartbeatClient
);
Ok(hb)
}
pub trait HeartbeatSender {
fn send(&self, data: HeartbeatData) -> Result<()>;
fn alive(&self) {
if let Err(error) = self.send(HeartbeatData::MachineAlive) {
error!("failed to send heartbeat: {}", error);
}
}
}
impl HeartbeatSender for AgentHeartbeatClient {
fn send(&self, data: HeartbeatData) -> Result<()> {
let mut messages_lock = self
.context
.pending_messages
.lock()
.map_err(|_| anyhow::format_err!("Unable to acquire the lock"))?;
messages_lock.insert(data);
Ok(())
}
}
impl HeartbeatSender for Option<AgentHeartbeatClient> {
fn send(&self, data: HeartbeatData) -> Result<()> {
match self {
Some(client) => client.send(data),
None => Ok(()),
}
}
}