add context to logging of supervisor work queue interaction (#601)

This commit is contained in:
bmc-msft
2021-02-27 20:17:04 -05:00
committed by GitHub
parent c1a2c9febb
commit 0f895d11c9
3 changed files with 56 additions and 26 deletions

View File

@ -123,7 +123,7 @@ impl Agent {
let msg = self.work_queue.poll().await?; let msg = self.work_queue.poll().await?;
let next = if let Some(msg) = msg { let next = if let Some(msg) = msg {
debug!("received work set message: {:?}", msg); info!("received work set message: {:?}", msg);
let can_schedule = self.coordinator.can_schedule(&msg.work_set).await?; let can_schedule = self.coordinator.can_schedule(&msg.work_set).await?;
@ -179,6 +179,7 @@ impl Agent {
state.into() state.into()
} }
} else { } else {
info!("no work available");
self.sleep().await; self.sleep().await;
state.into() state.into()
}; };

View File

@ -146,7 +146,10 @@ impl WorkQueue {
} }
async fn renew(&mut self) -> Result<()> { async fn renew(&mut self) -> Result<()> {
self.registration.renew().await?; self.registration
.renew()
.await
.context("unable to renew registration in workqueue")?;
let url = self.registration.dynamic_config.work_queue.clone(); let url = self.registration.dynamic_config.work_queue.clone();
self.queue = QueueClient::new(url); self.queue = QueueClient::new(url);
Ok(()) Ok(())
@ -159,25 +162,27 @@ impl WorkQueue {
// it was just due to a stale SAS URL. // it was just due to a stale SAS URL.
if let Err(err) = &msg { if let Err(err) = &msg {
if is_auth_error(err) { if is_auth_error(err) {
self.renew().await?; self.renew()
.await
.context("unable to renew registration in poll")?;
msg = self.queue.pop().await; msg = self.queue.pop().await;
} }
} }
// Now we've had a chance to ensure our SAS URL is fresh. For any other // Now we've had a chance to ensure our SAS URL is fresh. For any other
// error, including another auth error, bail. // error, including another auth error, bail.
let msg = msg?; let msg = msg.context("unable to check work queue")?;
if msg.is_none() { let result = match msg {
return Ok(None); Some(msg) => {
} let work_set =
serde_json::from_slice(msg.data()).context("unable to parse WorkSet")?;
let msg = msg.unwrap();
let work_set = serde_json::from_slice(msg.data())?;
let receipt = Receipt(msg.receipt); let receipt = Receipt(msg.receipt);
let msg = Message { receipt, work_set }; Some(Message { receipt, work_set })
}
Ok(Some(msg)) None => None,
};
Ok(result)
} }
pub async fn claim(&mut self, receipt: Receipt) -> Result<()> { pub async fn claim(&mut self, receipt: Receipt) -> Result<()> {
@ -189,8 +194,11 @@ impl WorkQueue {
// it was just due to a stale SAS URL. // it was just due to a stale SAS URL.
if let Err(err) = &result { if let Err(err) = &result {
if is_auth_error(err) { if is_auth_error(err) {
self.renew().await?; self.renew().await.context("unable to renew registration")?;
self.queue.delete(receipt).await?; self.queue
.delete(receipt)
.await
.context("unable to claim work from queue")?;
} }
} }

View File

@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. // Copyright (c) Microsoft Corporation.
// Licensed under the MIT License. // Licensed under the MIT License.
use anyhow::Result; use anyhow::{bail, Context, Result};
use reqwest::{Client, Url}; use reqwest::{Client, Url};
use reqwest_retry::SendRetry; use reqwest_retry::SendRetry;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -47,8 +47,11 @@ impl QueueClient {
.post(self.messages_url()) .post(self.messages_url())
.body(body) .body(body)
.send_retry_default() .send_retry_default()
.await?; .await
let _ = r.error_for_status()?; .context("storage queue enqueue failed")?;
let _ = r
.error_for_status()
.context("storage queue enqueue failed with error")?;
Ok(()) Ok(())
} }
@ -57,15 +60,23 @@ impl QueueClient {
.http .http
.get(self.messages_url()) .get(self.messages_url())
.send_retry_default() .send_retry_default()
.await? .await
.error_for_status()?; .context("storage queue pop failed")?
let text = response.text().await?; .error_for_status()
.context("storage queue pop failed with error")?;
let text = response
.text()
.await
.context("unable to parse response text")?;
let msg = Message::parse(&text); let msg = Message::parse(&text);
let msg = if let Some(msg) = msg { let msg = if let Some(msg) = msg {
msg msg
} else { } else {
if is_empty_message(&text) {
return Ok(None); return Ok(None);
}
bail!("unable to parse response text body: {}", text);
}; };
let msg = if msg.data.is_empty() { None } else { Some(msg) }; let msg = if msg.data.is_empty() { None } else { Some(msg) };
@ -79,8 +90,10 @@ impl QueueClient {
self.http self.http
.delete(url) .delete(url)
.send_retry_default() .send_retry_default()
.await? .await
.error_for_status()?; .context("storage queue delete failed")?
.error_for_status()
.context("storage queue delete failed")?;
Ok(()) Ok(())
} }
@ -145,11 +158,19 @@ impl Message {
} }
pub fn get<'a, T: serde::de::Deserialize<'a>>(&'a self) -> Result<T> { pub fn get<'a, T: serde::de::Deserialize<'a>>(&'a self) -> Result<T> {
let data = serde_json::from_slice(&self.data)?; let data =
serde_json::from_slice(&self.data).context("get storage queue message failed")?;
Ok(data) Ok(data)
} }
} }
fn is_empty_message(text: &str) -> bool {
regex::Regex::new(r".*<QueueMessagesList>[\s\n\r]*</QueueMessagesList>")
.unwrap()
.is_match(&text)
|| text.contains(r"<QueueMessagesList />")
}
fn parse_message_id(text: &str) -> Option<Uuid> { fn parse_message_id(text: &str) -> Option<Uuid> {
let pat = r"<MessageId>(.*)</MessageId>"; let pat = r"<MessageId>(.*)</MessageId>";
let re = regex::Regex::new(pat).unwrap(); let re = regex::Regex::new(pat).unwrap();