From b97093735a999775db6fdfcf17ac79ff43f54ad6 Mon Sep 17 00:00:00 2001 From: bmc-msft <41130664+bmc-msft@users.noreply.github.com> Date: Tue, 2 Mar 2021 14:02:10 -0500 Subject: [PATCH] fix agent `retry` on connection level failures (#623) In debugging the connection retry issues, I dug into this more. Apparently, some of hyper's connection errors are not mapped to std::io::Error, rendering the existing downcast impl ineffective. As such, this PR makes the following updates: 1. Any request that fails for what `reqwest` calls a `connection` error is considered transient. 2. Updates the retry notify code to use our `warn` macro such that the events show up in application insights. 3. Updates the unit test to demonstrate that failures by trying to connect to `http://localhost:81/`, which shouldn't be listening on any system. 4. Adds a simple unit test to verify with send_retry_default, connections to https://www.microsoft.com work Fixes #263 --- src/agent/Cargo.lock | 2 +- src/agent/reqwest-retry/Cargo.toml | 4 +- src/agent/reqwest-retry/src/lib.rs | 84 +++++++++++++++++++----------- 3 files changed, 58 insertions(+), 32 deletions(-) diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index e88d09a83..e417d2aca 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -2124,7 +2124,7 @@ dependencies = [ "anyhow", "async-trait", "backoff", - "log", + "onefuzz-telemetry", "reqwest", "tokio", ] diff --git a/src/agent/reqwest-retry/Cargo.toml b/src/agent/reqwest-retry/Cargo.toml index 53287f0d8..1f90de7d7 100644 --- a/src/agent/reqwest-retry/Cargo.toml +++ b/src/agent/reqwest-retry/Cargo.toml @@ -10,7 +10,7 @@ anyhow = "1.0" async-trait = "0.1" reqwest = { version = "0.10", features = ["json", "stream"] } backoff = { version = "0.2", features = ["async-std"] } -log = "0.4" +onefuzz-telemetry = { path = "../onefuzz-telemetry" } [dev-dependencies] -tokio = { version = "0.2" } \ No newline at end of file +tokio = { version = "0.2" , features=["macros"] } \ No newline at end of file diff --git a/src/agent/reqwest-retry/src/lib.rs b/src/agent/reqwest-retry/src/lib.rs index acf1556b0..019f32d75 100644 --- a/src/agent/reqwest-retry/src/lib.rs +++ b/src/agent/reqwest-retry/src/lib.rs @@ -4,6 +4,7 @@ use anyhow::Result; use async_trait::async_trait; use backoff::{self, future::FutureOperation, ExponentialBackoff}; +use onefuzz_telemetry::warn; use reqwest::{Response, StatusCode}; use std::{ sync::atomic::{AtomicI32, Ordering}, @@ -16,30 +17,33 @@ use std::io::ErrorKind; const DEFAULT_RETRY_PERIOD: Duration = Duration::from_secs(2); const MAX_ELAPSED_TIME: Duration = Duration::from_secs(30); const MAX_RETRY_ATTEMPTS: i32 = 5; +const MAX_RETRY_ERROR_MESSAGE: &str = "Maximum number of attempts reached for this request"; + +fn is_transient_socket_error(error: &reqwest::Error) -> bool { + let source = error.source(); + if let Some(err) = source { + if let Some(io_error) = err.downcast_ref::() { + match io_error.kind() { + ErrorKind::ConnectionAborted + | ErrorKind::ConnectionReset + | ErrorKind::ConnectionRefused + | ErrorKind::TimedOut + | ErrorKind::NotConnected => { + return true; + } + _ => (), + } + } + } + false +} fn to_backoff_response( result: Result, ) -> Result> { - fn is_transient_socket_error(error: &reqwest::Error) -> bool { - let source = error.source(); - if let Some(err) = source { - if let Some(io_error) = err.downcast_ref::() { - match io_error.kind() { - ErrorKind::ConnectionAborted - | ErrorKind::ConnectionReset - | ErrorKind::ConnectionRefused - | ErrorKind::TimedOut - | ErrorKind::NotConnected => return true, - _ => (), - } - } - } - false - } - match result { Err(error) => { - if is_transient_socket_error(&error) { + if error.is_connect() || is_transient_socket_error(&error) { Err(backoff::Error::Transient(anyhow::Error::from(error))) } else { Err(backoff::Error::Permanent(anyhow::Error::from(error))) @@ -91,12 +95,14 @@ pub async fn send_retry_reqwest< let op = || async { if counter.fetch_add(1, Ordering::SeqCst) >= max_retry { Result::>::Err(backoff::Error::Permanent( - anyhow::Error::msg("Maximum number of attempts reached for this request"), + anyhow::Error::msg(MAX_RETRY_ERROR_MESSAGE), )) } else { let request = build_request().map_err(backoff::Error::Permanent)?; let response = request.send().await; - Result::>::Ok(error_mapper(response)?) + let mapped = error_mapper(response)?; + + Result::>::Ok(mapped) } }; let result = op @@ -107,7 +113,7 @@ pub async fn send_retry_reqwest< max_elapsed_time: Some(max_elapsed_time), ..ExponentialBackoff::default() }, - |err, _| println!("Transient error: {}", err), + |err, _| warn!("transient http error: {}", err), ) .await?; Ok(result) @@ -173,17 +179,37 @@ impl SendRetry for reqwest::RequestBuilder { mod test { use super::*; - // TODO: convert to feature-gated integration test. - #[ignore] #[tokio::test] - async fn empty_stack() -> Result<()> { - let resp = reqwest::Client::new() - .get("http://localhost:5000/api/testGet") + async fn retry_should_pass() -> Result<()> { + reqwest::Client::new() + .get("https://www.microsoft.com") .send_retry_default() - .await?; - println!("{:?}", resp); + .await? + .error_for_status()?; + + Ok(()) + } + + #[tokio::test] + async fn retry_should_fail() -> Result<()> { + let invalid_url = "http://localhost:81/test.txt"; + let resp = reqwest::Client::new() + .get(invalid_url) + .send_retry( + Duration::from_secs(1), + Duration::from_secs(3), + 3i32, + to_backoff_response, + ) + .await; + + if let Err(err) = &resp { + let as_text = format!("{}", err); + assert!(as_text.contains("Maximum number of attempts reached for this request")); + } else { + bail!("response to {} was expected to fail", invalid_url); + } - assert!(resp.error_for_status().is_err()); Ok(()) } }