From 09e4afcbcef9376b02d0a87d4c5ecbc3a2dbe1ad Mon Sep 17 00:00:00 2001 From: bmc-msft <41130664+bmc-msft@users.noreply.github.com> Date: Mon, 15 Mar 2021 19:10:24 -0400 Subject: [PATCH] retry any failed request regardless of status (#674) --- src/agent/reqwest-retry/src/lib.rs | 145 +++++------------------------ 1 file changed, 25 insertions(+), 120 deletions(-) diff --git a/src/agent/reqwest-retry/src/lib.rs b/src/agent/reqwest-retry/src/lib.rs index b55a0a97a..1cf64c9ec 100644 --- a/src/agent/reqwest-retry/src/lib.rs +++ b/src/agent/reqwest-retry/src/lib.rs @@ -1,108 +1,46 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. -use anyhow::Result; +use anyhow::{format_err, Result}; use async_trait::async_trait; use backoff::{self, future::FutureOperation, ExponentialBackoff}; use onefuzz_telemetry::warn; -use reqwest::{Response, StatusCode}; +use reqwest::Response; use std::{ - sync::atomic::{AtomicI32, Ordering}, + sync::atomic::{AtomicUsize, Ordering}, time::Duration, }; -use std::error::Error as StdError; -use std::io::ErrorKind; - -const DEFAULT_RETRY_PERIOD: Duration = Duration::from_secs(2); -const MAX_ELAPSED_TIME: Duration = Duration::from_secs(30); -const MAX_RETRY_ATTEMPTS: i32 = 5; -const MAX_RETRY_ERROR_MESSAGE: &str = "Maximum number of attempts reached for this request"; - -fn is_transient_socket_error(error: &reqwest::Error) -> bool { - let source = error.source(); - if let Some(err) = source { - if let Some(io_error) = err.downcast_ref::() { - match io_error.kind() { - ErrorKind::ConnectionAborted - | ErrorKind::ConnectionReset - | ErrorKind::ConnectionRefused - | ErrorKind::TimedOut - | ErrorKind::NotConnected => { - return true; - } - _ => (), - } - } - } - false -} - -fn to_backoff_response( - result: Result, -) -> Result> { - match result { - Err(error) => { - if error.is_connect() || is_transient_socket_error(&error) { - Err(backoff::Error::Transient(anyhow::Error::from(error))) - } else { - Err(backoff::Error::Permanent(anyhow::Error::from(error))) - } - } - Ok(response) => match response.status() { - status if status.is_success() => Ok(response), - StatusCode::REQUEST_TIMEOUT - | StatusCode::TOO_MANY_REQUESTS - | StatusCode::INTERNAL_SERVER_ERROR - | StatusCode::BAD_GATEWAY - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT => Ok(response - .error_for_status() - .map_err(|error| backoff::Error::Transient(anyhow::Error::from(error)))?), - _ => Ok(response), - }, - } -} +const DEFAULT_RETRY_PERIOD: Duration = Duration::from_secs(5); +const MAX_RETRY_ATTEMPTS: usize = 5; pub async fn send_retry_reqwest_default< F: Fn() -> Result + Send + Sync, >( build_request: F, ) -> Result { - send_retry_reqwest( - build_request, - DEFAULT_RETRY_PERIOD, - MAX_ELAPSED_TIME, - MAX_RETRY_ATTEMPTS, - to_backoff_response, - ) - .await + send_retry_reqwest(build_request, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS).await } -pub async fn send_retry_reqwest< - F: Fn() -> Result + Send + Sync, - F2: Fn(Result) -> Result> - + Send - + Sync, ->( +pub async fn send_retry_reqwest Result + Send + Sync>( build_request: F, retry_period: Duration, - max_elapsed_time: Duration, - max_retry: i32, - error_mapper: F2, + max_retry: usize, ) -> Result { - let counter = AtomicI32::new(0); + let counter = AtomicUsize::new(0); let op = || async { if counter.fetch_add(1, Ordering::SeqCst) >= max_retry { - Result::>::Err(backoff::Error::Permanent( - anyhow::Error::msg(MAX_RETRY_ERROR_MESSAGE), - )) + Err(backoff::Error::Permanent(format_err!( + "request failed after {} attempts", + max_retry + ))) } else { let request = build_request().map_err(backoff::Error::Permanent)?; - let response = request.send().await; - let mapped = error_mapper(response)?; - - Result::>::Ok(mapped) + let response = request + .send() + .await + .map_err(|e| backoff::Error::Transient(anyhow::Error::from(e)))?; + Ok(response) } }; let result = op @@ -110,10 +48,9 @@ pub async fn send_retry_reqwest< ExponentialBackoff { current_interval: retry_period, initial_interval: retry_period, - max_elapsed_time: Some(max_elapsed_time), ..ExponentialBackoff::default() }, - |err, _| warn!("transient http error: {}", err), + |err, dur| warn!("request attempt failed after {:?}: {}", dur, err), ) .await?; Ok(result) @@ -121,43 +58,18 @@ pub async fn send_retry_reqwest< #[async_trait] pub trait SendRetry { - async fn send_retry< - F: Fn(Result) -> Result> - + Send - + Sync, - >( - self, - retry_period: Duration, - max_elapsed_time: Duration, - max_retry: i32, - error_mapper: F, - ) -> Result; + async fn send_retry(self, retry_period: Duration, max_retry: usize) -> Result; async fn send_retry_default(self) -> Result; } #[async_trait] impl SendRetry for reqwest::RequestBuilder { async fn send_retry_default(self) -> Result { - self.send_retry( - DEFAULT_RETRY_PERIOD, - MAX_ELAPSED_TIME, - MAX_RETRY_ATTEMPTS, - to_backoff_response, - ) - .await + self.send_retry(DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS) + .await } - async fn send_retry< - F: Fn(Result) -> Result> - + Send - + Sync, - >( - self, - retry_period: Duration, - max_elapsed_time: Duration, - max_retry: i32, - response_mapper: F, - ) -> Result { + async fn send_retry(self, retry_period: Duration, max_retry: usize) -> Result { let result = send_retry_reqwest( || { self.try_clone().ok_or_else(|| { @@ -165,9 +77,7 @@ impl SendRetry for reqwest::RequestBuilder { }) }, retry_period, - max_elapsed_time, max_retry, - response_mapper, ) .await?; @@ -195,17 +105,12 @@ mod test { let invalid_url = "http://127.0.0.1:81/test.txt"; let resp = reqwest::Client::new() .get(invalid_url) - .send_retry( - Duration::from_secs(1), - Duration::from_secs(300), - 2i32, - to_backoff_response, - ) + .send_retry(Duration::from_millis(1), 3) .await; if let Err(err) = &resp { let as_text = format!("{}", err); - assert!(as_text.contains(MAX_RETRY_ERROR_MESSAGE)); + assert!(as_text.contains("request failed after")); } else { anyhow::bail!("response to {} was expected to fail", invalid_url); }