From 0f895d11c938a49cbb00f795d6def8c27996b4d6 Mon Sep 17 00:00:00 2001 From: bmc-msft <41130664+bmc-msft@users.noreply.github.com> Date: Sat, 27 Feb 2021 20:17:04 -0500 Subject: [PATCH] add context to logging of supervisor work queue interaction (#601) --- src/agent/onefuzz-supervisor/src/agent.rs | 3 +- src/agent/onefuzz-supervisor/src/work.rs | 38 ++++++++++++--------- src/agent/storage-queue/src/lib.rs | 41 +++++++++++++++++------ 3 files changed, 56 insertions(+), 26 deletions(-) diff --git a/src/agent/onefuzz-supervisor/src/agent.rs b/src/agent/onefuzz-supervisor/src/agent.rs index adb5abcb2..7de40c43f 100644 --- a/src/agent/onefuzz-supervisor/src/agent.rs +++ b/src/agent/onefuzz-supervisor/src/agent.rs @@ -123,7 +123,7 @@ impl Agent { let msg = self.work_queue.poll().await?; let next = if let Some(msg) = msg { - debug!("received work set message: {:?}", msg); + info!("received work set message: {:?}", msg); let can_schedule = self.coordinator.can_schedule(&msg.work_set).await?; @@ -179,6 +179,7 @@ impl Agent { state.into() } } else { + info!("no work available"); self.sleep().await; state.into() }; diff --git a/src/agent/onefuzz-supervisor/src/work.rs b/src/agent/onefuzz-supervisor/src/work.rs index 3f03dc244..cacf3b2f0 100644 --- a/src/agent/onefuzz-supervisor/src/work.rs +++ b/src/agent/onefuzz-supervisor/src/work.rs @@ -146,7 +146,10 @@ impl WorkQueue { } async fn renew(&mut self) -> Result<()> { - self.registration.renew().await?; + self.registration + .renew() + .await + .context("unable to renew registration in workqueue")?; let url = self.registration.dynamic_config.work_queue.clone(); self.queue = QueueClient::new(url); Ok(()) @@ -159,25 +162,27 @@ impl WorkQueue { // it was just due to a stale SAS URL. if let Err(err) = &msg { if is_auth_error(err) { - self.renew().await?; + self.renew() + .await + .context("unable to renew registration in poll")?; msg = self.queue.pop().await; } } // Now we've had a chance to ensure our SAS URL is fresh. For any other // error, including another auth error, bail. - let msg = msg?; + let msg = msg.context("unable to check work queue")?; - if msg.is_none() { - return Ok(None); - } - - let msg = msg.unwrap(); - let work_set = serde_json::from_slice(msg.data())?; - let receipt = Receipt(msg.receipt); - let msg = Message { receipt, work_set }; - - Ok(Some(msg)) + let result = match msg { + Some(msg) => { + let work_set = + serde_json::from_slice(msg.data()).context("unable to parse WorkSet")?; + let receipt = Receipt(msg.receipt); + Some(Message { receipt, work_set }) + } + None => None, + }; + Ok(result) } pub async fn claim(&mut self, receipt: Receipt) -> Result<()> { @@ -189,8 +194,11 @@ impl WorkQueue { // it was just due to a stale SAS URL. if let Err(err) = &result { if is_auth_error(err) { - self.renew().await?; - self.queue.delete(receipt).await?; + self.renew().await.context("unable to renew registration")?; + self.queue + .delete(receipt) + .await + .context("unable to claim work from queue")?; } } diff --git a/src/agent/storage-queue/src/lib.rs b/src/agent/storage-queue/src/lib.rs index 4f2c40e6f..facc2b426 100644 --- a/src/agent/storage-queue/src/lib.rs +++ b/src/agent/storage-queue/src/lib.rs @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -use anyhow::Result; +use anyhow::{bail, Context, Result}; use reqwest::{Client, Url}; use reqwest_retry::SendRetry; use serde::{Deserialize, Serialize}; @@ -47,8 +47,11 @@ impl QueueClient { .post(self.messages_url()) .body(body) .send_retry_default() - .await?; - let _ = r.error_for_status()?; + .await + .context("storage queue enqueue failed")?; + let _ = r + .error_for_status() + .context("storage queue enqueue failed with error")?; Ok(()) } @@ -57,15 +60,23 @@ impl QueueClient { .http .get(self.messages_url()) .send_retry_default() - .await? - .error_for_status()?; - let text = response.text().await?; + .await + .context("storage queue pop failed")? + .error_for_status() + .context("storage queue pop failed with error")?; + let text = response + .text() + .await + .context("unable to parse response text")?; let msg = Message::parse(&text); let msg = if let Some(msg) = msg { msg } else { - return Ok(None); + if is_empty_message(&text) { + return Ok(None); + } + bail!("unable to parse response text body: {}", text); }; let msg = if msg.data.is_empty() { None } else { Some(msg) }; @@ -79,8 +90,10 @@ impl QueueClient { self.http .delete(url) .send_retry_default() - .await? - .error_for_status()?; + .await + .context("storage queue delete failed")? + .error_for_status() + .context("storage queue delete failed")?; Ok(()) } @@ -145,11 +158,19 @@ impl Message { } pub fn get<'a, T: serde::de::Deserialize<'a>>(&'a self) -> Result { - let data = serde_json::from_slice(&self.data)?; + let data = + serde_json::from_slice(&self.data).context("get storage queue message failed")?; Ok(data) } } +fn is_empty_message(text: &str) -> bool { + regex::Regex::new(r".*[\s\n\r]*") + .unwrap() + .is_match(&text) + || text.contains(r"") +} + fn parse_message_id(text: &str) -> Option { let pat = r"(.*)"; let re = regex::Regex::new(pat).unwrap();