mirror of
https://github.com/microsoft/onefuzz.git
synced 2025-06-21 21:54:26 +00:00
fix agent retry
on connection level failures (#623)
In debugging the connection retry issues, I dug into this more. Apparently, some of hyper's connection errors are not mapped to std::io::Error, rendering the existing downcast impl ineffective. As such, this PR makes the following updates: 1. Any request that fails for what `reqwest` calls a `connection` error is considered transient. 2. Updates the retry notify code to use our `warn` macro such that the events show up in application insights. 3. Updates the unit test to demonstrate that failures by trying to connect to `http://localhost:81/`, which shouldn't be listening on any system. 4. Adds a simple unit test to verify with send_retry_default, connections to https://www.microsoft.com work Fixes #263
This commit is contained in:
2
src/agent/Cargo.lock
generated
2
src/agent/Cargo.lock
generated
@ -2124,7 +2124,7 @@ dependencies = [
|
|||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"backoff",
|
"backoff",
|
||||||
"log",
|
"onefuzz-telemetry",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
@ -10,7 +10,7 @@ anyhow = "1.0"
|
|||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
reqwest = { version = "0.10", features = ["json", "stream"] }
|
reqwest = { version = "0.10", features = ["json", "stream"] }
|
||||||
backoff = { version = "0.2", features = ["async-std"] }
|
backoff = { version = "0.2", features = ["async-std"] }
|
||||||
log = "0.4"
|
onefuzz-telemetry = { path = "../onefuzz-telemetry" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "0.2" }
|
tokio = { version = "0.2" , features=["macros"] }
|
@ -4,6 +4,7 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use backoff::{self, future::FutureOperation, ExponentialBackoff};
|
use backoff::{self, future::FutureOperation, ExponentialBackoff};
|
||||||
|
use onefuzz_telemetry::warn;
|
||||||
use reqwest::{Response, StatusCode};
|
use reqwest::{Response, StatusCode};
|
||||||
use std::{
|
use std::{
|
||||||
sync::atomic::{AtomicI32, Ordering},
|
sync::atomic::{AtomicI32, Ordering},
|
||||||
@ -16,30 +17,33 @@ use std::io::ErrorKind;
|
|||||||
const DEFAULT_RETRY_PERIOD: Duration = Duration::from_secs(2);
|
const DEFAULT_RETRY_PERIOD: Duration = Duration::from_secs(2);
|
||||||
const MAX_ELAPSED_TIME: Duration = Duration::from_secs(30);
|
const MAX_ELAPSED_TIME: Duration = Duration::from_secs(30);
|
||||||
const MAX_RETRY_ATTEMPTS: i32 = 5;
|
const MAX_RETRY_ATTEMPTS: i32 = 5;
|
||||||
|
const MAX_RETRY_ERROR_MESSAGE: &str = "Maximum number of attempts reached for this request";
|
||||||
|
|
||||||
|
fn is_transient_socket_error(error: &reqwest::Error) -> bool {
|
||||||
|
let source = error.source();
|
||||||
|
if let Some(err) = source {
|
||||||
|
if let Some(io_error) = err.downcast_ref::<std::io::Error>() {
|
||||||
|
match io_error.kind() {
|
||||||
|
ErrorKind::ConnectionAborted
|
||||||
|
| ErrorKind::ConnectionReset
|
||||||
|
| ErrorKind::ConnectionRefused
|
||||||
|
| ErrorKind::TimedOut
|
||||||
|
| ErrorKind::NotConnected => {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
fn to_backoff_response(
|
fn to_backoff_response(
|
||||||
result: Result<Response, reqwest::Error>,
|
result: Result<Response, reqwest::Error>,
|
||||||
) -> Result<Response, backoff::Error<anyhow::Error>> {
|
) -> Result<Response, backoff::Error<anyhow::Error>> {
|
||||||
fn is_transient_socket_error(error: &reqwest::Error) -> bool {
|
|
||||||
let source = error.source();
|
|
||||||
if let Some(err) = source {
|
|
||||||
if let Some(io_error) = err.downcast_ref::<std::io::Error>() {
|
|
||||||
match io_error.kind() {
|
|
||||||
ErrorKind::ConnectionAborted
|
|
||||||
| ErrorKind::ConnectionReset
|
|
||||||
| ErrorKind::ConnectionRefused
|
|
||||||
| ErrorKind::TimedOut
|
|
||||||
| ErrorKind::NotConnected => return true,
|
|
||||||
_ => (),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
if is_transient_socket_error(&error) {
|
if error.is_connect() || is_transient_socket_error(&error) {
|
||||||
Err(backoff::Error::Transient(anyhow::Error::from(error)))
|
Err(backoff::Error::Transient(anyhow::Error::from(error)))
|
||||||
} else {
|
} else {
|
||||||
Err(backoff::Error::Permanent(anyhow::Error::from(error)))
|
Err(backoff::Error::Permanent(anyhow::Error::from(error)))
|
||||||
@ -91,12 +95,14 @@ pub async fn send_retry_reqwest<
|
|||||||
let op = || async {
|
let op = || async {
|
||||||
if counter.fetch_add(1, Ordering::SeqCst) >= max_retry {
|
if counter.fetch_add(1, Ordering::SeqCst) >= max_retry {
|
||||||
Result::<Response, backoff::Error<anyhow::Error>>::Err(backoff::Error::Permanent(
|
Result::<Response, backoff::Error<anyhow::Error>>::Err(backoff::Error::Permanent(
|
||||||
anyhow::Error::msg("Maximum number of attempts reached for this request"),
|
anyhow::Error::msg(MAX_RETRY_ERROR_MESSAGE),
|
||||||
))
|
))
|
||||||
} else {
|
} else {
|
||||||
let request = build_request().map_err(backoff::Error::Permanent)?;
|
let request = build_request().map_err(backoff::Error::Permanent)?;
|
||||||
let response = request.send().await;
|
let response = request.send().await;
|
||||||
Result::<Response, backoff::Error<anyhow::Error>>::Ok(error_mapper(response)?)
|
let mapped = error_mapper(response)?;
|
||||||
|
|
||||||
|
Result::<Response, backoff::Error<anyhow::Error>>::Ok(mapped)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let result = op
|
let result = op
|
||||||
@ -107,7 +113,7 @@ pub async fn send_retry_reqwest<
|
|||||||
max_elapsed_time: Some(max_elapsed_time),
|
max_elapsed_time: Some(max_elapsed_time),
|
||||||
..ExponentialBackoff::default()
|
..ExponentialBackoff::default()
|
||||||
},
|
},
|
||||||
|err, _| println!("Transient error: {}", err),
|
|err, _| warn!("transient http error: {}", err),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(result)
|
Ok(result)
|
||||||
@ -173,17 +179,37 @@ impl SendRetry for reqwest::RequestBuilder {
|
|||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
// TODO: convert to feature-gated integration test.
|
|
||||||
#[ignore]
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn empty_stack() -> Result<()> {
|
async fn retry_should_pass() -> Result<()> {
|
||||||
let resp = reqwest::Client::new()
|
reqwest::Client::new()
|
||||||
.get("http://localhost:5000/api/testGet")
|
.get("https://www.microsoft.com")
|
||||||
.send_retry_default()
|
.send_retry_default()
|
||||||
.await?;
|
.await?
|
||||||
println!("{:?}", resp);
|
.error_for_status()?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn retry_should_fail() -> Result<()> {
|
||||||
|
let invalid_url = "http://localhost:81/test.txt";
|
||||||
|
let resp = reqwest::Client::new()
|
||||||
|
.get(invalid_url)
|
||||||
|
.send_retry(
|
||||||
|
Duration::from_secs(1),
|
||||||
|
Duration::from_secs(3),
|
||||||
|
3i32,
|
||||||
|
to_backoff_response,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
if let Err(err) = &resp {
|
||||||
|
let as_text = format!("{}", err);
|
||||||
|
assert!(as_text.contains("Maximum number of attempts reached for this request"));
|
||||||
|
} else {
|
||||||
|
bail!("response to {} was expected to fail", invalid_url);
|
||||||
|
}
|
||||||
|
|
||||||
assert!(resp.error_for_status().is_err());
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user