Resilient connection (#153)

This commit is contained in:
Cheick Keita
2020-10-28 07:51:18 -07:00
committed by GitHub
parent 1d2fb99dd4
commit db8534109f
19 changed files with 532 additions and 38 deletions

267
src/agent/Cargo.lock generated
View File

@ -64,6 +64,108 @@ version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034"
[[package]]
name = "async-channel"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-executor"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d373d78ded7d0b3fa8039375718cde0aace493f2e34fb60f51cbf567562ca801"
dependencies = [
"async-task",
"concurrent-queue",
"fastrand",
"futures-lite",
"once_cell",
"vec-arena",
]
[[package]]
name = "async-global-executor"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fefeb39da249f4c33af940b779a56723ce45809ef5c54dad84bb538d4ffb6d9e"
dependencies = [
"async-executor",
"async-io",
"futures-lite",
"num_cpus",
"once_cell",
]
[[package]]
name = "async-io"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38628c78a34f111c5a6b98fc87dfc056cd1590b61afe748b145be4623c56d194"
dependencies = [
"cfg-if 0.1.10",
"concurrent-queue",
"fastrand",
"futures-lite",
"libc",
"log",
"once_cell",
"parking",
"polling",
"socket2",
"vec-arena",
"waker-fn",
"wepoll-sys-stjepang",
"winapi 0.3.9",
]
[[package]]
name = "async-mutex"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e"
dependencies = [
"event-listener",
]
[[package]]
name = "async-std"
version = "1.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9fa76751505e8df1c7a77762f60486f60c71bbd9b8557f4da6ad47d083732ed"
dependencies = [
"async-global-executor",
"async-io",
"async-mutex",
"blocking",
"crossbeam-utils",
"futures-channel",
"futures-core",
"futures-io",
"futures-lite",
"gloo-timers",
"kv-log-macro",
"log",
"memchr",
"num_cpus",
"once_cell",
"pin-project-lite",
"pin-utils",
"slab",
"wasm-bindgen-futures",
]
[[package]]
name = "async-task"
version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ab27c1aa62945039e44edaeee1dc23c74cc0c303dd5fe0fb462a184f1c3a518"
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.41" version = "0.1.41"
@ -84,6 +186,12 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "atomic-waker"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a"
[[package]] [[package]]
name = "atty" name = "atty"
version = "0.2.14" version = "0.2.14"
@ -101,6 +209,19 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "backoff"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "721c249ab59cbc483ad4294c9ee2671835c1e43e9ffc277e6b4ecfef733cfdc5"
dependencies = [
"async-std",
"futures-core",
"instant",
"pin-project",
"rand",
]
[[package]] [[package]]
name = "backtrace" name = "backtrace"
version = "0.3.53" version = "0.3.53"
@ -146,6 +267,20 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "blocking"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9"
dependencies = [
"async-channel",
"async-task",
"atomic-waker",
"fastrand",
"futures-lite",
"once_cell",
]
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.4.0" version = "3.4.0"
@ -164,6 +299,12 @@ version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "cache-padded"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba"
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.61" version = "1.0.61"
@ -210,6 +351,15 @@ dependencies = [
"vec_map", "vec_map",
] ]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
dependencies = [
"cache-padded",
]
[[package]] [[package]]
name = "core-foundation" name = "core-foundation"
version = "0.7.0" version = "0.7.0"
@ -403,6 +553,12 @@ dependencies = [
"winapi 0.2.8", "winapi 0.2.8",
] ]
[[package]]
name = "event-listener"
version = "2.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
[[package]] [[package]]
name = "failure" name = "failure"
version = "0.1.8" version = "0.1.8"
@ -431,6 +587,15 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fastrand"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca5faf057445ce5c9d4329e382b2ce7ca38550ef3b73a5348362d5f24e0c7fe3"
dependencies = [
"instant",
]
[[package]] [[package]]
name = "filetime" name = "filetime"
version = "0.2.12" version = "0.2.12"
@ -580,6 +745,21 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fc94b64bb39543b4e432f1790b6bf18e3ee3b74653c5449f63310e9a74b123c" checksum = "5fc94b64bb39543b4e432f1790b6bf18e3ee3b74653c5449f63310e9a74b123c"
[[package]]
name = "futures-lite"
version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "381a7ad57b1bad34693f63f6f377e1abded7a9c85c9d3eb6771e11c60aaadab9"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.6" version = "0.3.6"
@ -660,6 +840,19 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]]
name = "gloo-timers"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47204a46aaff920a1ea58b11d03dec6f704287d27561724a4631e450654a891f"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]] [[package]]
name = "goblin" name = "goblin"
version = "0.2.3" version = "0.2.3"
@ -880,6 +1073,15 @@ dependencies = [
"winapi 0.3.9", "winapi 0.3.9",
] ]
[[package]]
name = "instant"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63312a18f7ea8760cdd0a7c5aac1a619752a246b833545e3e36d1f81f7cd9e66"
dependencies = [
"cfg-if 0.1.10",
]
[[package]] [[package]]
name = "iovec" name = "iovec"
version = "0.1.4" version = "0.1.4"
@ -920,6 +1122,15 @@ dependencies = [
"winapi-build", "winapi-build",
] ]
[[package]]
name = "kv-log-macro"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
dependencies = [
"log",
]
[[package]] [[package]]
name = "lazy_static" name = "lazy_static"
version = "1.4.0" version = "1.4.0"
@ -1257,6 +1468,7 @@ dependencies = [
"rand", "rand",
"regex", "regex",
"reqwest", "reqwest",
"reqwest-retry",
"ring", "ring",
"rstack", "rstack",
"serde", "serde",
@ -1292,6 +1504,7 @@ dependencies = [
"num_cpus", "num_cpus",
"onefuzz", "onefuzz",
"reqwest", "reqwest",
"reqwest-retry",
"serde", "serde",
"serde_json", "serde_json",
"storage-queue", "storage-queue",
@ -1316,6 +1529,7 @@ dependencies = [
"log", "log",
"onefuzz", "onefuzz",
"reqwest", "reqwest",
"reqwest-retry",
"serde", "serde",
"serde_json", "serde_json",
"storage-queue", "storage-queue",
@ -1374,6 +1588,12 @@ dependencies = [
"winapi 0.3.9", "winapi 0.3.9",
] ]
[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]] [[package]]
name = "paste" name = "paste"
version = "0.1.18" version = "0.1.18"
@ -1465,6 +1685,19 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6"
[[package]]
name = "polling"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a5388c2f19adcd6c462b8ca7db12b2c67e4c11c3d084361d1f5203d4389d990"
dependencies = [
"cfg-if 0.1.10",
"libc",
"log",
"wepoll-sys-stjepang",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.9" version = "0.2.9"
@ -1708,6 +1941,17 @@ dependencies = [
"winreg", "winreg",
] ]
[[package]]
name = "reqwest-retry"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"backoff",
"log",
"reqwest",
]
[[package]] [[package]]
name = "ring" name = "ring"
version = "0.16.15" version = "0.16.15"
@ -1972,9 +2216,11 @@ name = "storage-queue"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"base64", "base64",
"regex", "regex",
"reqwest", "reqwest",
"reqwest-retry",
"serde", "serde",
"serde-xml-rs", "serde-xml-rs",
"serde_derive", "serde_derive",
@ -2369,6 +2615,12 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c" checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c"
[[package]]
name = "vec-arena"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eafc1b9b2dfc6f5529177b62cf806484db55b32dc7c9658a118e11bbeb33061d"
[[package]] [[package]]
name = "vec_map" name = "vec_map"
version = "0.8.2" version = "0.8.2"
@ -2387,6 +2639,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]] [[package]]
name = "walkdir" name = "walkdir"
version = "2.3.1" version = "2.3.1"
@ -2498,6 +2756,15 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "wepoll-sys-stjepang"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fdfbb03f290ca0b27922e8d48a0997b4ceea12df33269b9f75e713311eb178d"
dependencies = [
"cc",
]
[[package]] [[package]]
name = "win-util" name = "win-util"
version = "0.1.0" version = "0.1.0"

View File

@ -6,6 +6,7 @@ members = [
"onefuzz", "onefuzz",
"onefuzz-agent", "onefuzz-agent",
"onefuzz-supervisor", "onefuzz-supervisor",
"reqwest-retry",
"storage-queue", "storage-queue",
"win-util", "win-util",
] ]

View File

@ -30,6 +30,7 @@ url = { version = "2.1", features = ["serde"] }
uuid = { version = "0.8", features = ["serde", "v4"] } uuid = { version = "0.8", features = ["serde", "v4"] }
onefuzz = { path = "../onefuzz" } onefuzz = { path = "../onefuzz" }
storage-queue = { path = "../storage-queue" } storage-queue = { path = "../storage-queue" }
reqwest-retry = { path = "../reqwest-retry" }
[dev-dependencies] [dev-dependencies]
tempfile = "3.1" tempfile = "3.1"

View File

@ -8,6 +8,7 @@ use onefuzz::{
syncdir::SyncedDir, syncdir::SyncedDir,
}; };
use reqwest::Url; use reqwest::Url;
use reqwest_retry::SendRetry;
use serde::Deserialize; use serde::Deserialize;
use std::{ use std::{
collections::HashMap, collections::HashMap,
@ -117,7 +118,7 @@ async fn try_delete_blob(input_url: Url) -> Result<()> {
let http_client = reqwest::Client::new(); let http_client = reqwest::Client::new();
match http_client match http_client
.delete(input_url) .delete(input_url)
.send() .send_retry_default()
.await? .await?
.error_for_status_with_body() .error_for_status_with_body()
.await .await

View File

@ -10,6 +10,7 @@ use onefuzz::{
syncdir::SyncedDir, syncdir::SyncedDir,
}; };
use reqwest::Url; use reqwest::Url;
use reqwest_retry::SendRetry;
use serde::Deserialize; use serde::Deserialize;
use std::{ use std::{
collections::HashMap, collections::HashMap,
@ -116,7 +117,7 @@ async fn try_delete_blob(input_url: Url) -> Result<()> {
let http_client = reqwest::Client::new(); let http_client = reqwest::Client::new();
match http_client match http_client
.delete(input_url) .delete(input_url)
.send() .send_retry_default()
.await? .await?
.error_for_status_with_body() .error_for_status_with_body()
.await .await

View File

@ -8,7 +8,9 @@ use onefuzz::{
syncdir::SyncedDir, syncdir::SyncedDir,
telemetry::Event::{new_report, new_unable_to_reproduce, new_unique_report}, telemetry::Event::{new_report, new_unable_to_reproduce, new_unique_report},
}; };
use reqwest::StatusCode; use reqwest::StatusCode;
use reqwest_retry::SendRetry;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::PathBuf; use std::path::PathBuf;
use uuid::Uuid; use uuid::Uuid;
@ -65,7 +67,7 @@ async fn upload_deduped(report: &CrashReport, container: &BlobContainerUrl) -> R
.json(report) .json(report)
// Conditional PUT, only if-not-exists. // Conditional PUT, only if-not-exists.
.header("If-None-Match", "*") .header("If-None-Match", "*")
.send() .send_retry_default()
.await?; .await?;
if result.status() != StatusCode::NOT_MODIFIED { if result.status() != StatusCode::NOT_MODIFIED {
event!(new_unique_report;); event!(new_unique_report;);
@ -77,7 +79,7 @@ async fn upload_report(report: &CrashReport, container: &BlobContainerUrl) -> Re
event!(new_report;); event!(new_report;);
let blob = BlobClient::new(); let blob = BlobClient::new();
let url = container.blob(report.blob_name()).url(); let url = container.blob(report.blob_name()).url();
blob.put(url).json(report).send().await?; blob.put(url).json(report).send_retry_default().await?;
Ok(()) Ok(())
} }
@ -85,7 +87,7 @@ async fn upload_no_repro(report: &NoCrash, container: &BlobContainerUrl) -> Resu
event!(new_unable_to_reproduce;); event!(new_unable_to_reproduce;);
let blob = BlobClient::new(); let blob = BlobClient::new();
let url = container.blob(report.blob_name()).url(); let url = container.blob(report.blob_name()).url();
blob.put(url).json(report).send().await?; blob.put(url).json(report).send_retry_default().await?;
Ok(()) Ok(())
} }

View File

@ -24,3 +24,4 @@ tokio = { version = "0.2.13", features = ["full"] }
url = { version = "2.1.1", features = ["serde"] } url = { version = "2.1.1", features = ["serde"] }
uuid = { version = "0.8.1", features = ["serde", "v4"] } uuid = { version = "0.8.1", features = ["serde", "v4"] }
clap = "2.33" clap = "2.33"
reqwest-retry = { path = "../reqwest-retry" }

View File

@ -5,6 +5,7 @@ use std::fmt;
use anyhow::Result; use anyhow::Result;
use onefuzz::http::ResponseExt; use onefuzz::http::ResponseExt;
use reqwest_retry::SendRetry;
use url::Url; use url::Url;
use uuid::Uuid; use uuid::Uuid;
@ -112,7 +113,7 @@ impl ClientCredentials {
.post(url) .post(url)
.header("Content-Length", "0") .header("Content-Length", "0")
.form(&self.form_data()) .form(&self.form_data())
.send() .send_retry_default()
.await? .await?
.error_for_status_with_body() .error_for_status_with_body()
.await?; .await?;
@ -183,7 +184,7 @@ impl ManagedIdentityCredentials {
let response = reqwest::Client::new() let response = reqwest::Client::new()
.get(self.url()) .get(self.url())
.header("Metadata", "true") .header("Metadata", "true")
.send() .send_retry_default()
.await? .await?
.error_for_status_with_body() .error_for_status_with_body()
.await?; .await?;

View File

@ -4,6 +4,7 @@
use anyhow::Result; use anyhow::Result;
use onefuzz::{http::ResponseExt, jitter::delay_with_jitter}; use onefuzz::{http::ResponseExt, jitter::delay_with_jitter};
use reqwest::StatusCode; use reqwest::StatusCode;
use reqwest_retry::SendRetry;
use std::{ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
time::{Duration, Instant}, time::{Duration, Instant},
@ -159,7 +160,7 @@ impl Registration {
.header("Content-Length", "0") .header("Content-Length", "0")
.bearer_auth(token.secret().expose_ref()) .bearer_auth(token.secret().expose_ref())
.body("") .body("")
.send() .send_retry_default()
.await? .await?
.error_for_status(); .error_for_status();
@ -219,7 +220,7 @@ impl Registration {
let response = reqwest::Client::new() let response = reqwest::Client::new()
.get(url) .get(url)
.bearer_auth(token.secret().expose_ref()) .bearer_auth(token.secret().expose_ref())
.send() .send_retry_default()
.await? .await?
.error_for_status_with_body() .error_for_status_with_body()
.await?; .await?;

View File

@ -37,6 +37,7 @@ strum = "0.19"
strum_macros = "0.19" strum_macros = "0.19"
tempfile = "3.1" tempfile = "3.1"
process_control = "2.0" process_control = "2.0"
reqwest-retry = { path = "../reqwest-retry"}
[target.'cfg(target_os = "windows")'.dependencies] [target.'cfg(target_os = "windows")'.dependencies]
winreg = "0.7" winreg = "0.7"

View File

@ -6,6 +6,7 @@ use std::path::{Path, PathBuf};
use anyhow::Result; use anyhow::Result;
use futures::stream::TryStreamExt; use futures::stream::TryStreamExt;
use reqwest::{Body, RequestBuilder, Response, Url}; use reqwest::{Body, RequestBuilder, Response, Url};
use reqwest_retry::SendRetry;
use serde::Serialize; use serde::Serialize;
use tokio::{fs, io}; use tokio::{fs, io};
use tokio_util::codec; use tokio_util::codec;
@ -31,7 +32,12 @@ impl BlobClient {
pub async fn get(&self, url: &Url) -> Result<Response> { pub async fn get(&self, url: &Url) -> Result<Response> {
let url = url.clone(); let url = url.clone();
let r = self.client.get(url).send().await?.error_for_status()?; let r = self
.client
.get(url)
.send_retry_default()
.await?
.error_for_status()?;
Ok(r) Ok(r)
} }
@ -62,7 +68,7 @@ impl BlobClient {
.put(url) .put(url)
.header("x-ms-blob-type", "BlockBlob") .header("x-ms-blob-type", "BlockBlob")
.body(data) .body(data)
.send() .send_retry_default()
.await?; .await?;
Ok(r) Ok(r)
@ -77,7 +83,7 @@ impl BlobClient {
.put(url) .put(url)
.header("x-ms-blob-type", "BlockBlob") .header("x-ms-blob-type", "BlockBlob")
.json(&item) .json(&item)
.send() .send_retry_default()
.await?; .await?;
Ok(r) Ok(r)
@ -103,7 +109,7 @@ impl BlobClient {
.header("Content-Length", &content_length) .header("Content-Length", &content_length)
.header("x-ms-blob-type", "BlockBlob") .header("x-ms-blob-type", "BlockBlob")
.body(body) .body(body)
.send() .send_retry_default()
.await?; .await?;
Ok(resp) Ok(resp)

View File

@ -3,6 +3,7 @@
use crate::fs::{onefuzz_etc, write_file}; use crate::fs::{onefuzz_etc, write_file};
use anyhow::Result; use anyhow::Result;
use reqwest_retry::SendRetry;
use std::time::Duration; use std::time::Duration;
use tokio::fs; use tokio::fs;
use uuid::Uuid; use uuid::Uuid;
@ -27,7 +28,7 @@ pub async fn get_ims_id() -> Result<Uuid> {
.get(IMS_ID_URL) .get(IMS_ID_URL)
.timeout(Duration::from_millis(500)) .timeout(Duration::from_millis(500))
.header("Metadata", "true") .header("Metadata", "true")
.send() .send_retry_default()
.await?; .await?;
let body = resp.text().await?; let body = resp.text().await?;
write_file(path, &body).await?; write_file(path, &body).await?;
@ -48,7 +49,7 @@ pub async fn get_machine_name() -> Result<String> {
.get(VM_NAME_URL) .get(VM_NAME_URL)
.timeout(Duration::from_millis(500)) .timeout(Duration::from_millis(500))
.header("Metadata", "true") .header("Metadata", "true")
.send() .send_retry_default()
.await?; .await?;
let body = resp.text().await?; let body = resp.text().await?;
write_file(path, &body).await?; write_file(path, &body).await?;
@ -68,7 +69,7 @@ pub async fn get_scaleset_name() -> Result<String> {
.get(VM_SCALESET_NAME) .get(VM_SCALESET_NAME)
.timeout(Duration::from_millis(500)) .timeout(Duration::from_millis(500))
.header("Metadata", "true") .header("Metadata", "true")
.send() .send_retry_default()
.await?; .await?;
let body = resp.text().await?; let body = resp.text().await?;
write_file(path, &body).await?; write_file(path, &body).await?;

View File

@ -6,6 +6,7 @@ use std::path::Path;
use anyhow::Result; use anyhow::Result;
use futures::stream::TryStreamExt; use futures::stream::TryStreamExt;
use reqwest as r; use reqwest as r;
use reqwest_retry::{send_retry_reqwest_default, SendRetry};
use serde::Serialize; use serde::Serialize;
use tokio::{fs, io}; use tokio::{fs, io};
use tokio_util::codec; use tokio_util::codec;
@ -31,13 +32,6 @@ impl BlobUploader {
let metadata = fs::metadata(file_path).await?; let metadata = fs::metadata(file_path).await?;
let file_len = metadata.len(); let file_len = metadata.len();
let file = fs::File::open(file_path).await?;
let reader = io::BufReader::new(file);
let codec = codec::BytesCodec::new();
let file_stream = codec::FramedRead::new(reader, codec).map_ok(bytes::BytesMut::freeze);
let body = r::Body::wrap_stream(file_stream);
let url = { let url = {
let url_path = self.url.path(); let url_path = self.url.path();
let blob_path = format!("{}/{}", url_path, file_name); let blob_path = format!("{}/{}", url_path, file_name);
@ -47,21 +41,29 @@ impl BlobUploader {
}; };
// Check if the file already exists before uploading // Check if the file already exists before uploading
let head = self.client.head(url.clone()).send().await?; let head = self.client.head(url.clone()).send_retry_default().await?;
if head.status() == reqwest::StatusCode::OK { if head.status() == reqwest::StatusCode::OK {
return Ok(head); return Ok(head);
} }
let content_length = format!("{}", file_len); let content_length = format!("{}", file_len);
let resp = self let resp = send_retry_reqwest_default(|| {
.client let file = fs::File::from_std(std::fs::File::open(file_path)?);
.put(url) let reader = io::BufReader::new(file);
.header("Content-Length", &content_length) let codec = codec::BytesCodec::new();
.header("x-ms-blob-type", "BlockBlob") let file_stream = codec::FramedRead::new(reader, codec).map_ok(bytes::BytesMut::freeze);
.body(body)
.send() let request_builder = self
.await?; .client
.put(url.clone())
.header("Content-Length", &content_length)
.header("x-ms-blob-type", "BlockBlob")
.body(r::Body::wrap_stream(file_stream));
Ok(request_builder)
})
.await?;
Ok(resp) Ok(resp)
} }
@ -84,7 +86,7 @@ impl BlobUploader {
.put(url) .put(url)
.header("x-ms-blob-type", "BlockBlob") .header("x-ms-blob-type", "BlockBlob")
.json(&data) .json(&data)
.send() .send_retry_default()
.await?; .await?;
Ok(resp) Ok(resp)

View File

@ -0,0 +1,13 @@
[package]
name = "reqwest-retry"
version = "0.1.0"
authors = ["fuzzing@microsoft.com"]
edition = "2018"
license = "MIT"
[dependencies]
anyhow = "1.0"
async-trait = "0.1.36"
reqwest = { version = "0.10", features = ["json", "stream"] }
backoff = { version = "0.2.1", features = ["async-std"] }
log = "0.4"

View File

@ -0,0 +1,186 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
use anyhow::Result;
use async_trait::async_trait;
use backoff::{self, future::FutureOperation, ExponentialBackoff};
use reqwest::{Response, StatusCode};
use std::{
sync::atomic::{AtomicI32, Ordering},
time::Duration,
};
use std::error::Error as StdError;
use std::io::ErrorKind;
const DEFAULT_RETRY_PERIOD: Duration = Duration::from_secs(2);
const MAX_ELAPSED_TIME: Duration = Duration::from_secs(30);
const MAX_RETRY_ATTEMPTS: i32 = 5;
fn to_backoff_response(
result: Result<Response, reqwest::Error>,
) -> Result<Response, backoff::Error<anyhow::Error>> {
fn is_transient_socket_error(error: &reqwest::Error) -> bool {
let source = error.source();
while let Some(err) = source {
if let Some(io_error) = err.downcast_ref::<std::io::Error>() {
match io_error.kind() {
ErrorKind::ConnectionAborted
| ErrorKind::ConnectionReset
| ErrorKind::TimedOut
| ErrorKind::NotConnected => return true,
_ => (),
}
}
}
false
}
match result {
Err(error) => {
if is_transient_socket_error(&error) {
Err(backoff::Error::Transient(anyhow::Error::from(error)))
} else {
Err(backoff::Error::Permanent(anyhow::Error::from(error)))
}
}
Ok(response) => match response.status() {
status if status.is_success() => Ok(response),
StatusCode::REQUEST_TIMEOUT
| StatusCode::TOO_MANY_REQUESTS
| StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => Ok(response
.error_for_status()
.map_err(|error| backoff::Error::Transient(anyhow::Error::from(error)))?),
_ => Ok(response),
},
}
}
pub async fn send_retry_reqwest_default<
F: Fn() -> Result<reqwest::RequestBuilder> + Send + Sync,
>(
build_request: F,
) -> Result<Response> {
send_retry_reqwest(
build_request,
DEFAULT_RETRY_PERIOD,
MAX_ELAPSED_TIME,
MAX_RETRY_ATTEMPTS,
to_backoff_response,
)
.await
}
pub async fn send_retry_reqwest<
F: Fn() -> Result<reqwest::RequestBuilder> + Send + Sync,
F2: Fn(Result<Response, reqwest::Error>) -> Result<Response, backoff::Error<anyhow::Error>>
+ Send
+ Sync,
>(
build_request: F,
retry_period: Duration,
max_elapsed_time: Duration,
max_retry: i32,
error_mapper: F2,
) -> Result<Response> {
let counter = AtomicI32::new(0);
let op = || async {
if counter.fetch_add(1, Ordering::SeqCst) >= max_retry {
Result::<Response, backoff::Error<anyhow::Error>>::Err(backoff::Error::Permanent(
anyhow::Error::msg("Maximum number of attempts reached for this request"),
))
} else {
let request = build_request().map_err(backoff::Error::Permanent)?;
let response = request.send().await;
Result::<Response, backoff::Error<anyhow::Error>>::Ok(error_mapper(response)?)
}
};
let result = op
.retry_notify(
ExponentialBackoff {
current_interval: retry_period,
initial_interval: retry_period,
max_elapsed_time: Some(max_elapsed_time),
..ExponentialBackoff::default()
},
|err, _| println!("Transient error: {}", err),
)
.await?;
Ok(result)
}
#[async_trait]
pub trait SendRetry {
async fn send_retry<
F: Fn(Result<Response, reqwest::Error>) -> Result<Response, backoff::Error<anyhow::Error>>
+ Send
+ Sync,
>(
self,
retry_period: Duration,
max_elapsed_time: Duration,
max_retry: i32,
error_mapper: F,
) -> Result<Response>;
async fn send_retry_default(self) -> Result<Response>;
}
#[async_trait]
impl SendRetry for reqwest::RequestBuilder {
async fn send_retry_default(self) -> Result<Response> {
self.send_retry(
DEFAULT_RETRY_PERIOD,
MAX_ELAPSED_TIME,
MAX_RETRY_ATTEMPTS,
to_backoff_response,
)
.await
}
async fn send_retry<
F: Fn(Result<Response, reqwest::Error>) -> Result<Response, backoff::Error<anyhow::Error>>
+ Send
+ Sync,
>(
self,
retry_period: Duration,
max_elapsed_time: Duration,
max_retry: i32,
response_mapper: F,
) -> Result<Response> {
let result = send_retry_reqwest(
|| {
self.try_clone().ok_or_else(|| {
anyhow::Error::msg("This request cannot be retried because it cannot be cloned")
})
},
retry_period,
max_elapsed_time,
max_retry,
response_mapper,
)
.await?;
Ok(result)
}
}
#[cfg(test)]
mod test {
use super::*;
#[tokio::test]
async fn empty_stack() -> Result<()> {
let resp = reqwest::Client::new()
.get("http://localhost:5000/api/testGet")
.send_retry_default()
.await?;
println!("{:?}", resp);
assert!(resp.error_for_status().is_err());
Ok(())
}
}

View File

@ -6,12 +6,14 @@ edition = "2018"
license = "MIT" license = "MIT"
[dependencies] [dependencies]
anyhow = "1.0"
async-trait = "0.1.36"
reqwest = { version = "0.10", features = ["json", "stream"] } reqwest = { version = "0.10", features = ["json", "stream"] }
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde-xml-rs = "0.4" serde-xml-rs = "0.4"
serde_derive = "1.0" serde_derive = "1.0"
anyhow = "1.0"
uuid = { version = "0.8", features = ["serde", "v4"] } uuid = { version = "0.8", features = ["serde", "v4"] }
regex = "1.3" regex = "1.3"
base64 = "0.12" base64 = "0.12"
reqwest-retry = { path = "../reqwest-retry" }

View File

@ -3,6 +3,7 @@
use anyhow::Result; use anyhow::Result;
use reqwest::{Client, Url}; use reqwest::{Client, Url};
use reqwest_retry::SendRetry;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::time::Duration; use std::time::Duration;
use uuid::Uuid; use uuid::Uuid;
@ -45,7 +46,7 @@ impl QueueClient {
.http .http
.post(self.messages_url()) .post(self.messages_url())
.body(body) .body(body)
.send() .send_retry_default()
.await?; .await?;
let _ = r.error_for_status()?; let _ = r.error_for_status()?;
Ok(()) Ok(())
@ -55,7 +56,7 @@ impl QueueClient {
let response = self let response = self
.http .http
.get(self.messages_url()) .get(self.messages_url())
.send() .send_retry_default()
.await? .await?
.error_for_status()?; .error_for_status()?;
let text = response.text().await?; let text = response.text().await?;
@ -75,7 +76,11 @@ impl QueueClient {
pub async fn delete(&mut self, receipt: impl Into<Receipt>) -> Result<()> { pub async fn delete(&mut self, receipt: impl Into<Receipt>) -> Result<()> {
let receipt = receipt.into(); let receipt = receipt.into();
let url = self.delete_url(receipt); let url = self.delete_url(receipt);
self.http.delete(url).send().await?.error_for_status()?; self.http
.delete(url)
.send_retry_default()
.await?
.error_for_status()?;
Ok(()) Ok(())
} }

View File

@ -19,3 +19,4 @@ storage-queue = { path = "../agent/storage-queue" }
thiserror = "1.0" thiserror = "1.0"
tokio = { version = "0.2", features = ["macros", "rt-threaded", "fs", "process"] } tokio = { version = "0.2", features = ["macros", "rt-threaded", "fs", "process"] }
url = { version = "2.1", features = ["serde"] } url = { version = "2.1", features = ["serde"] }
reqwest-retry = { path = "../agent/reqwest-retry"}

View File

@ -3,6 +3,7 @@
use crate::proxy; use crate::proxy;
use anyhow::Result; use anyhow::Result;
use reqwest_retry::SendRetry;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{fs::File, io::BufReader, path::PathBuf}; use std::{fs::File, io::BufReader, path::PathBuf};
use storage_queue::QueueClient; use storage_queue::QueueClient;
@ -85,7 +86,7 @@ impl Config {
if let Some(etag) = &self.etag { if let Some(etag) = &self.etag {
request = request.header(reqwest::header::IF_NONE_MATCH, etag); request = request.header(reqwest::header::IF_NONE_MATCH, etag);
} }
let response = request.send().await?; let response = request.send_retry_default().await?;
let status = response.status(); let status = response.status();
if status == reqwest::StatusCode::NOT_MODIFIED { if status == reqwest::StatusCode::NOT_MODIFIED {