Update Rust azure_* crates (#2266)

This commit is contained in:
George Pollard 2022-08-23 10:45:47 +12:00 committed by GitHub
parent f92d207dd4
commit cddaed1bf1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 157 additions and 153 deletions

180
src/agent/Cargo.lock generated
View File

@ -62,7 +62,7 @@ dependencies = [
"serde",
"serde_json",
"sm",
"uuid",
"uuid 0.8.2",
]
[[package]]
@ -133,80 +133,76 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "azure_core"
version = "0.1.1"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c61455ab776eedabfc7e166dda27c6c6bc2a882c043c35817501f1bd7440158"
checksum = "e6e66a6d993197d1b575cffd08bf725e04bc1414de6586baeb3537925e49b4ff"
dependencies = [
"async-trait",
"base64",
"bytes",
"chrono",
"dyn-clone",
"futures",
"getrandom 0.2.3",
"http",
"http-types",
"log",
"oauth2",
"paste",
"pin-project",
"rand 0.8.4",
"reqwest",
"rustc_version",
"serde",
"serde_derive",
"serde_json",
"thiserror",
"time 0.3.13",
"url",
"uuid",
"uuid 1.1.2",
]
[[package]]
name = "azure_storage"
version = "0.1.0"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22c413e8459badf86c9e6e0c84f5894609663bcc8fa5eb1e49bfb985273dac58"
checksum = "55aa63fb0426c76b9cc909234e94e69764f5e777a62b7ed6411f3cc04aa94a16"
dependencies = [
"RustyXML",
"async-trait",
"azure_core",
"base64",
"bytes",
"chrono",
"futures",
"http",
"hmac",
"log",
"once_cell",
"ring",
"serde",
"serde-xml-rs",
"serde_derive",
"serde_json",
"thiserror",
"sha2",
"time 0.3.13",
"url",
"uuid",
"uuid 1.1.2",
]
[[package]]
name = "azure_storage_blobs"
version = "0.1.0"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a70ec6fab8a2cae5d774098267870c0f3fbef1cb63cac12afab38b8c17cc8d97"
checksum = "33f953cedd7c240347ed0f6ed8a0bd7472b32579c38341ab9e117eaa6173e9ba"
dependencies = [
"RustyXML",
"azure_core",
"azure_storage",
"base64",
"bytes",
"chrono",
"futures",
"http",
"log",
"md5",
"serde",
"serde-xml-rs",
"serde_derive",
"serde_json",
"thiserror",
"time 0.3.13",
"url",
"uuid",
"uuid 1.1.2",
]
[[package]]
@ -306,15 +302,6 @@ dependencies = [
"wyz 0.5.0",
]
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array",
]
[[package]]
name = "block-buffer"
version = "0.10.1"
@ -402,8 +389,7 @@ dependencies = [
"libc",
"num-integer",
"num-traits",
"serde",
"time",
"time 0.1.43",
"winapi",
]
@ -517,7 +503,7 @@ dependencies = [
"serde_json",
"structopt",
"symbolic",
"uuid",
"uuid 0.8.2",
"win-util",
"winapi",
"xml-rs",
@ -711,7 +697,7 @@ version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f91cf5a8c2f2097e2a32627123508635d47ce10563d999ec1a95addf08b502ba"
dependencies = [
"uuid",
"uuid 0.8.2",
]
[[package]]
@ -731,23 +717,15 @@ version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e25ea47919b1560c4e3b7fe0aaab9becf5b84a10325ddf7db0f0ba5e1026499"
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array",
]
[[package]]
name = "digest"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506"
dependencies = [
"block-buffer 0.10.1",
"block-buffer",
"crypto-common",
"subtle",
]
[[package]]
@ -1226,6 +1204,15 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "hmac"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
"digest",
]
[[package]]
name = "hostname"
version = "0.3.1"
@ -1411,7 +1398,7 @@ dependencies = [
"log",
"num_cpus",
"rayon",
"sha2 0.10.2",
"sha2",
"win-util",
"winapi",
]
@ -1808,23 +1795,12 @@ dependencies = [
]
[[package]]
name = "oauth2"
version = "4.1.0"
name = "num_threads"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80e47cfc4c0a1a519d9a025ebfbac3a2439d1b5cdf397d72dcb79b11d9920dab"
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
dependencies = [
"base64",
"chrono",
"getrandom 0.2.3",
"http",
"rand 0.8.4",
"reqwest",
"serde",
"serde_json",
"serde_path_to_error",
"sha2 0.9.9",
"thiserror",
"url",
"libc",
]
[[package]]
@ -1876,7 +1852,7 @@ dependencies = [
"serde",
"serde_derive",
"serde_json",
"sha2 0.10.2",
"sha2",
"stacktrace-parser",
"storage-queue",
"structopt",
@ -1889,7 +1865,7 @@ dependencies = [
"url",
"url-escape",
"urlparse",
"uuid",
"uuid 0.8.2",
"winapi",
"winreg 0.10.1",
]
@ -1917,7 +1893,7 @@ dependencies = [
"tokio",
"url",
"users",
"uuid",
"uuid 0.8.2",
"winapi",
]
@ -1964,7 +1940,7 @@ dependencies = [
"tokio-util 0.7.2",
"tui",
"url",
"uuid",
"uuid 0.8.2",
]
[[package]]
@ -1979,16 +1955,10 @@ dependencies = [
"log",
"serde",
"tokio",
"uuid",
"uuid 0.8.2",
"z3-sys",
]
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "os_pipe"
version = "1.0.0"
@ -2100,7 +2070,7 @@ checksum = "13f4d162ecaaa1467de5afbe62d597757b674b51da8bb4e587430c5fdb2af7aa"
dependencies = [
"fallible-iterator",
"scroll 0.10.2",
"uuid",
"uuid 0.8.2",
]
[[package]]
@ -2701,15 +2671,6 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_path_to_error"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7868ad3b8196a8a0aea99a8220b124278ee5320a55e4fde97794b6f85b1a377"
dependencies = [
"serde",
]
[[package]]
name = "serde_qs"
version = "0.8.4"
@ -2733,19 +2694,6 @@ dependencies = [
"serde",
]
[[package]]
name = "sha2"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800"
dependencies = [
"block-buffer 0.9.0",
"cfg-if 1.0.0",
"cpufeatures",
"digest 0.9.0",
"opaque-debug",
]
[[package]]
name = "sha2"
version = "0.10.2"
@ -2754,7 +2702,7 @@ checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676"
dependencies = [
"cfg-if 1.0.0",
"cpufeatures",
"digest 0.10.3",
"digest",
]
[[package]]
@ -2910,7 +2858,7 @@ dependencies = [
"regex",
"serde",
"serde_json",
"sha2 0.10.2",
"sha2",
]
[[package]]
@ -2940,7 +2888,7 @@ dependencies = [
"serde_derive",
"serde_json",
"tokio",
"uuid",
"uuid 0.8.2",
]
[[package]]
@ -3012,6 +2960,12 @@ dependencies = [
"syn 1.0.95",
]
[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "symbolic"
version = "8.8.0"
@ -3033,7 +2987,7 @@ dependencies = [
"debugid",
"memmap2",
"stable_deref_trait",
"uuid",
"uuid 0.8.2",
]
[[package]]
@ -3189,6 +3143,25 @@ dependencies = [
"winapi",
]
[[package]]
name = "time"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db76ff9fa4b1458b3c7f077f3ff9887394058460d21e634355b273aaf11eea45"
dependencies = [
"itoa 1.0.1",
"libc",
"num_threads",
"serde",
"time-macros",
]
[[package]]
name = "time-macros"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792"
[[package]]
name = "tinyvec"
version = "1.4.0"
@ -3466,6 +3439,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "uuid"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f"
dependencies = [
"getrandom 0.2.3",
]
[[package]]
name = "vec_map"
version = "0.8.2"

View File

@ -48,6 +48,6 @@ url = { version = "2.2", features = ["serde"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
chrono = "0.4"
azure_core = { version = "0.1.1", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage = { version = "0.1.0", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage_blobs = { version = "0.1.0", default-features = false, features = ["enable_reqwest_rustls"] }
azure_core = { version = "0.4", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage = { version = "0.5", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage_blobs = { version = "0.5", default-features = false, features = ["enable_reqwest_rustls"] }

View File

@ -3,12 +3,15 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use azure_core::HttpError;
use azure_storage::core::prelude::*;
use azure_core::error::HttpError;
use azure_core::StatusCode;
use azure_storage::prelude::StorageClient;
use azure_storage_blobs::prelude::*;
use azure_storage_blobs::{container::operations::ListBlobsResponse, prelude::AsContainerClient};
use futures::TryStreamExt;
use onefuzz_telemetry::{LogTrace, LoggingEvent};
use reqwest::{StatusCode, Url};
use std::{path::PathBuf, sync::Arc, time::Duration};
use reqwest::Url;
use std::{path::PathBuf, time::Duration};
use uuid::Uuid;
use tokio::sync::broadcast::{error::TryRecvError, Receiver};
@ -45,7 +48,7 @@ trait LogWriter<T>: Send + Sync {
/// Writes logs on azure blobs
pub struct BlobLogWriter {
container_client: Arc<ContainerClient>,
container_client: ContainerClient,
task_id: Uuid,
machine_id: Uuid,
blob_id: usize,
@ -65,17 +68,17 @@ impl BlobLogWriter {
) -> Result<Self> {
let container_client = TaskLogger::create_container_client(&log_container)?;
let prefix = format!("{}/{}", task_id, machine_id);
let blob_list = container_client
let blob_list: Vec<ListBlobsResponse> = container_client
.list_blobs()
.prefix(prefix.as_str())
.execute()
.await
.map_err(|e| anyhow!(e.to_string()))?;
let mut blob_ids = blob_list
.blobs
.blobs
.iter()
.filter_map(|b| {
.prefix(prefix.clone())
.into_stream()
.try_collect()
.await?;
let mut blob_ids: Vec<usize> = blob_list
.into_iter()
.flat_map(|lbr: ListBlobsResponse| lbr.blobs.blobs)
.filter_map(|b: Blob| {
b.name
.strip_prefix(&prefix)
.map(PathBuf::from)
@ -90,16 +93,17 @@ impl BlobLogWriter {
.and_then(|f| f.parse::<usize>().ok())
})
})
.collect::<Vec<_>>();
.collect();
blob_ids.sort_unstable();
let blob_id = match blob_ids.into_iter().last() {
Some(id) => id,
None => {
let blob_client = container_client.as_blob_client(format!("{}/1.log", prefix));
let blob_client = container_client.blob_client(format!("{}/1.log", prefix));
blob_client
.put_append_blob()
.execute()
.into_future()
.await
.map_err(|e| anyhow!(e.to_string()))?;
1
@ -120,7 +124,7 @@ impl BlobLogWriter {
impl LogWriter<BlobLogWriter> for BlobLogWriter {
async fn write_logs(&self, logs: &[LoggingEvent]) -> Result<WriteLogResponse> {
let blob_name = self.get_blob_name();
let blob_client = self.container_client.as_blob_client(blob_name);
let blob_client = self.container_client.blob_client(blob_name);
let data_stream = logs
.iter()
.flat_map(|log_event| match log_event {
@ -150,25 +154,24 @@ impl LogWriter<BlobLogWriter> for BlobLogWriter {
let result = blob_client
.append_block(data_stream)
.condition_max_size(self.max_log_size)
.execute()
.into_future()
.await;
match result {
Ok(_r) => Ok(WriteLogResponse::Success),
Err(e) => match e.downcast_ref::<HttpError>() {
Some(HttpError::StatusCode { status: s, body: b }) => {
if s == &StatusCode::PRECONDITION_FAILED
&& b.contains("MaxBlobSizeConditionNotMet")
Some(herr) => match herr.status() {
StatusCode::PreconditionFailed
if herr.error_code() == Some("MaxBlobSizeConditionNotMet") =>
{
Ok(WriteLogResponse::MaxSizeReached)
} else if s == &StatusCode::CONFLICT && b.contains("BlockCountExceedsLimit") {
Ok(WriteLogResponse::MaxSizeReached)
} else if s == &StatusCode::PAYLOAD_TOO_LARGE {
Ok(WriteLogResponse::MessageTooLarge)
} else {
Err(anyhow!(e.to_string()))
}
}
StatusCode::Conflict if herr.error_code() == Some("BlockCountExceedsLimit") => {
Ok(WriteLogResponse::MaxSizeReached)
}
StatusCode::PayloadTooLarge => Ok(WriteLogResponse::MessageTooLarge),
_ => Err(anyhow!(e.to_string())),
},
_ => Err(anyhow!(e.to_string())),
},
}
@ -184,10 +187,11 @@ impl LogWriter<BlobLogWriter> for BlobLogWriter {
let blob_client = self
.container_client
.as_blob_client(new_writer.get_blob_name());
.blob_client(new_writer.get_blob_name());
blob_client
.put_append_blob()
.execute()
.into_future()
.await
.map_err(|e| anyhow!(e.to_string()))?;
@ -239,7 +243,7 @@ impl TaskLogger {
}
}
fn create_container_client(log_container: &Url) -> Result<Arc<ContainerClient>> {
fn create_container_client(log_container: &Url) -> Result<ContainerClient> {
let account = log_container
.domain()
.and_then(|d| d.split('.').next())
@ -254,10 +258,8 @@ impl TaskLogger {
.query()
.ok_or(anyhow!("Invalid log container"))?;
let http_client = azure_core::new_http_client();
let storage_account_client =
StorageAccountClient::new_sas_token(http_client, account, sas_token)?;
Ok(storage_account_client.as_container_client(container))
let client = StorageClient::new_sas_token(account, sas_token)?;
Ok(client.container_client(container))
}
async fn event_loop<T: Send + Sized>(self, context: LoopContext<T>) -> Result<LoopContext<T>> {
@ -448,7 +450,10 @@ impl SpawnedLogger {
#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::RwLock};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use super::*;
use onefuzz_telemetry::LogTrace;
@ -469,17 +474,29 @@ mod tests {
let log_container = Url::parse(&url)?;
let client = TaskLogger::create_container_client(&log_container)?;
let response = client
let responses: Vec<_> = client
.list_blobs()
.prefix("job1/tak1/1")
.execute()
.into_stream()
.try_collect()
.await
.map_err(|e| anyhow!(e.to_string()))?;
println!("blob prefix {:?}", response.blobs.blob_prefix);
for blob in response.blobs.blobs {
println!("{}", blob.name);
println!(
"blob prefix {:?}",
responses
.first()
.expect("expected some blobs")
.blobs
.blob_prefix
);
for response in responses {
for blob in response.blobs.blobs {
println!("{}", blob.name);
}
}
Ok(())
}
@ -661,17 +678,20 @@ mod tests {
let container_client = blob_writer.container_client.clone();
let blobs = container_client
let pages: Vec<_> = container_client
.list_blobs()
.prefix(blob_prefix.clone())
.execute()
.into_stream()
.try_collect()
.await
.map_err(|e| anyhow!(e.to_string()))?;
let blobs: Vec<_> = pages.into_iter().flat_map(|p| p.blobs.blobs).collect();
// test initial blob creation
assert_eq!(blobs.blobs.blobs.len(), 1, "expected exactly one blob");
assert_eq!(blobs.len(), 1, "expected exactly one blob");
assert_eq!(
blobs.blobs.blobs[0].name,
blobs[0].name,
format!("{}/1.log", &blob_prefix),
"Wrong file name"
);
@ -704,15 +724,17 @@ mod tests {
// testing the creation of new blob when we call get_next_writer()
let _blob_writer = blob_writer.get_next_writer().await?;
let blobs = container_client
let pages: Vec<_> = container_client
.list_blobs()
.prefix(blob_prefix.clone())
.execute()
.into_stream()
.try_collect()
.await
.map_err(|e| anyhow!(e.to_string()))?;
assert_eq!(blobs.blobs.blobs.len(), 2, "expected exactly 2 blob");
let blobs = blobs.blobs.blobs;
let blobs: Vec<_> = pages.into_iter().flat_map(|p| p.blobs.blobs).collect();
assert_eq!(blobs.len(), 2, "expected exactly 2 blob");
assert!(
blobs