retry any failed request regardless of status (#674)

This commit is contained in:
bmc-msft
2021-03-15 19:10:24 -04:00
committed by GitHub
parent a3fdc74c53
commit 09e4afcbce

View File

@ -1,108 +1,46 @@
// Copyright (c) Microsoft Corporation. // Copyright (c) Microsoft Corporation.
// Licensed under the MIT License. // Licensed under the MIT License.
use anyhow::Result; use anyhow::{format_err, Result};
use async_trait::async_trait; use async_trait::async_trait;
use backoff::{self, future::FutureOperation, ExponentialBackoff}; use backoff::{self, future::FutureOperation, ExponentialBackoff};
use onefuzz_telemetry::warn; use onefuzz_telemetry::warn;
use reqwest::{Response, StatusCode}; use reqwest::Response;
use std::{ use std::{
sync::atomic::{AtomicI32, Ordering}, sync::atomic::{AtomicUsize, Ordering},
time::Duration, time::Duration,
}; };
use std::error::Error as StdError; const DEFAULT_RETRY_PERIOD: Duration = Duration::from_secs(5);
use std::io::ErrorKind; const MAX_RETRY_ATTEMPTS: usize = 5;
const DEFAULT_RETRY_PERIOD: Duration = Duration::from_secs(2);
const MAX_ELAPSED_TIME: Duration = Duration::from_secs(30);
const MAX_RETRY_ATTEMPTS: i32 = 5;
const MAX_RETRY_ERROR_MESSAGE: &str = "Maximum number of attempts reached for this request";
fn is_transient_socket_error(error: &reqwest::Error) -> bool {
let source = error.source();
if let Some(err) = source {
if let Some(io_error) = err.downcast_ref::<std::io::Error>() {
match io_error.kind() {
ErrorKind::ConnectionAborted
| ErrorKind::ConnectionReset
| ErrorKind::ConnectionRefused
| ErrorKind::TimedOut
| ErrorKind::NotConnected => {
return true;
}
_ => (),
}
}
}
false
}
fn to_backoff_response(
result: Result<Response, reqwest::Error>,
) -> Result<Response, backoff::Error<anyhow::Error>> {
match result {
Err(error) => {
if error.is_connect() || is_transient_socket_error(&error) {
Err(backoff::Error::Transient(anyhow::Error::from(error)))
} else {
Err(backoff::Error::Permanent(anyhow::Error::from(error)))
}
}
Ok(response) => match response.status() {
status if status.is_success() => Ok(response),
StatusCode::REQUEST_TIMEOUT
| StatusCode::TOO_MANY_REQUESTS
| StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => Ok(response
.error_for_status()
.map_err(|error| backoff::Error::Transient(anyhow::Error::from(error)))?),
_ => Ok(response),
},
}
}
pub async fn send_retry_reqwest_default< pub async fn send_retry_reqwest_default<
F: Fn() -> Result<reqwest::RequestBuilder> + Send + Sync, F: Fn() -> Result<reqwest::RequestBuilder> + Send + Sync,
>( >(
build_request: F, build_request: F,
) -> Result<Response> { ) -> Result<Response> {
send_retry_reqwest( send_retry_reqwest(build_request, DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS).await
build_request,
DEFAULT_RETRY_PERIOD,
MAX_ELAPSED_TIME,
MAX_RETRY_ATTEMPTS,
to_backoff_response,
)
.await
} }
pub async fn send_retry_reqwest< pub async fn send_retry_reqwest<F: Fn() -> Result<reqwest::RequestBuilder> + Send + Sync>(
F: Fn() -> Result<reqwest::RequestBuilder> + Send + Sync,
F2: Fn(Result<Response, reqwest::Error>) -> Result<Response, backoff::Error<anyhow::Error>>
+ Send
+ Sync,
>(
build_request: F, build_request: F,
retry_period: Duration, retry_period: Duration,
max_elapsed_time: Duration, max_retry: usize,
max_retry: i32,
error_mapper: F2,
) -> Result<Response> { ) -> Result<Response> {
let counter = AtomicI32::new(0); let counter = AtomicUsize::new(0);
let op = || async { let op = || async {
if counter.fetch_add(1, Ordering::SeqCst) >= max_retry { if counter.fetch_add(1, Ordering::SeqCst) >= max_retry {
Result::<Response, backoff::Error<anyhow::Error>>::Err(backoff::Error::Permanent( Err(backoff::Error::Permanent(format_err!(
anyhow::Error::msg(MAX_RETRY_ERROR_MESSAGE), "request failed after {} attempts",
)) max_retry
)))
} else { } else {
let request = build_request().map_err(backoff::Error::Permanent)?; let request = build_request().map_err(backoff::Error::Permanent)?;
let response = request.send().await; let response = request
let mapped = error_mapper(response)?; .send()
.await
Result::<Response, backoff::Error<anyhow::Error>>::Ok(mapped) .map_err(|e| backoff::Error::Transient(anyhow::Error::from(e)))?;
Ok(response)
} }
}; };
let result = op let result = op
@ -110,10 +48,9 @@ pub async fn send_retry_reqwest<
ExponentialBackoff { ExponentialBackoff {
current_interval: retry_period, current_interval: retry_period,
initial_interval: retry_period, initial_interval: retry_period,
max_elapsed_time: Some(max_elapsed_time),
..ExponentialBackoff::default() ..ExponentialBackoff::default()
}, },
|err, _| warn!("transient http error: {}", err), |err, dur| warn!("request attempt failed after {:?}: {}", dur, err),
) )
.await?; .await?;
Ok(result) Ok(result)
@ -121,43 +58,18 @@ pub async fn send_retry_reqwest<
#[async_trait] #[async_trait]
pub trait SendRetry { pub trait SendRetry {
async fn send_retry< async fn send_retry(self, retry_period: Duration, max_retry: usize) -> Result<Response>;
F: Fn(Result<Response, reqwest::Error>) -> Result<Response, backoff::Error<anyhow::Error>>
+ Send
+ Sync,
>(
self,
retry_period: Duration,
max_elapsed_time: Duration,
max_retry: i32,
error_mapper: F,
) -> Result<Response>;
async fn send_retry_default(self) -> Result<Response>; async fn send_retry_default(self) -> Result<Response>;
} }
#[async_trait] #[async_trait]
impl SendRetry for reqwest::RequestBuilder { impl SendRetry for reqwest::RequestBuilder {
async fn send_retry_default(self) -> Result<Response> { async fn send_retry_default(self) -> Result<Response> {
self.send_retry( self.send_retry(DEFAULT_RETRY_PERIOD, MAX_RETRY_ATTEMPTS)
DEFAULT_RETRY_PERIOD, .await
MAX_ELAPSED_TIME,
MAX_RETRY_ATTEMPTS,
to_backoff_response,
)
.await
} }
async fn send_retry< async fn send_retry(self, retry_period: Duration, max_retry: usize) -> Result<Response> {
F: Fn(Result<Response, reqwest::Error>) -> Result<Response, backoff::Error<anyhow::Error>>
+ Send
+ Sync,
>(
self,
retry_period: Duration,
max_elapsed_time: Duration,
max_retry: i32,
response_mapper: F,
) -> Result<Response> {
let result = send_retry_reqwest( let result = send_retry_reqwest(
|| { || {
self.try_clone().ok_or_else(|| { self.try_clone().ok_or_else(|| {
@ -165,9 +77,7 @@ impl SendRetry for reqwest::RequestBuilder {
}) })
}, },
retry_period, retry_period,
max_elapsed_time,
max_retry, max_retry,
response_mapper,
) )
.await?; .await?;
@ -195,17 +105,12 @@ mod test {
let invalid_url = "http://127.0.0.1:81/test.txt"; let invalid_url = "http://127.0.0.1:81/test.txt";
let resp = reqwest::Client::new() let resp = reqwest::Client::new()
.get(invalid_url) .get(invalid_url)
.send_retry( .send_retry(Duration::from_millis(1), 3)
Duration::from_secs(1),
Duration::from_secs(300),
2i32,
to_backoff_response,
)
.await; .await;
if let Err(err) = &resp { if let Err(err) = &resp {
let as_text = format!("{}", err); let as_text = format!("{}", err);
assert!(as_text.contains(MAX_RETRY_ERROR_MESSAGE)); assert!(as_text.contains("request failed after"));
} else { } else {
anyhow::bail!("response to {} was expected to fail", invalid_url); anyhow::bail!("response to {} was expected to fail", invalid_url);
} }