Upload logs of the agent (#1721)

* Setting the service side of the log management
- a log is created or reused when we create a job
- when scheduling the task we send the log location to the agent
The expected log structure looks like
{fuzzContainer}/logs/{job_id}/{task_id}/{machine_id}/1.log
This commit is contained in:
Cheick Keita
2022-03-30 15:20:42 -07:00
committed by GitHub
parent 7add51fd3a
commit a2e87c6158
7 changed files with 990 additions and 15 deletions

347
src/agent/Cargo.lock generated
View File

@ -2,6 +2,12 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "RustyXML"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5"
[[package]] [[package]]
name = "addr2line" name = "addr2line"
version = "0.17.0" version = "0.17.0"
@ -125,6 +131,84 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "azure_core"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c61455ab776eedabfc7e166dda27c6c6bc2a882c043c35817501f1bd7440158"
dependencies = [
"async-trait",
"base64",
"bytes 1.1.0",
"chrono",
"dyn-clone",
"futures",
"getrandom 0.2.3",
"http",
"log",
"oauth2",
"rand 0.8.4",
"reqwest",
"rustc_version",
"serde",
"serde_derive",
"serde_json",
"thiserror",
"url",
"uuid",
]
[[package]]
name = "azure_storage"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22c413e8459badf86c9e6e0c84f5894609663bcc8fa5eb1e49bfb985273dac58"
dependencies = [
"RustyXML",
"async-trait",
"azure_core",
"base64",
"bytes 1.1.0",
"chrono",
"futures",
"http",
"log",
"once_cell",
"ring",
"serde",
"serde-xml-rs",
"serde_derive",
"serde_json",
"thiserror",
"url",
"uuid",
]
[[package]]
name = "azure_storage_blobs"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a70ec6fab8a2cae5d774098267870c0f3fbef1cb63cac12afab38b8c17cc8d97"
dependencies = [
"RustyXML",
"azure_core",
"azure_storage",
"base64",
"bytes 1.1.0",
"chrono",
"futures",
"http",
"log",
"md5",
"serde",
"serde-xml-rs",
"serde_derive",
"serde_json",
"thiserror",
"url",
"uuid",
]
[[package]] [[package]]
name = "backoff" name = "backoff"
version = "0.4.0" version = "0.4.0"
@ -210,6 +294,15 @@ dependencies = [
"wyz", "wyz",
] ]
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
version = "0.10.1" version = "0.10.1"
@ -324,6 +417,7 @@ dependencies = [
"libc", "libc",
"num-integer", "num-integer",
"num-traits", "num-traits",
"serde",
"time", "time",
"winapi 0.3.9", "winapi 0.3.9",
] ]
@ -374,6 +468,22 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "core-foundation"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]] [[package]]
name = "coverage" name = "coverage"
version = "0.1.0" version = "0.1.0"
@ -621,13 +731,22 @@ version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e25ea47919b1560c4e3b7fe0aaab9becf5b84a10325ddf7db0f0ba5e1026499" checksum = "0e25ea47919b1560c4e3b7fe0aaab9becf5b84a10325ddf7db0f0ba5e1026499"
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.10.0" version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8549e6bfdecd113b7e221fe60b433087f6957387a20f8118ebca9b12af19143d" checksum = "8549e6bfdecd113b7e221fe60b433087f6957387a20f8118ebca9b12af19143d"
dependencies = [ dependencies = [
"block-buffer", "block-buffer 0.10.1",
"crypto-common", "crypto-common",
"generic-array", "generic-array",
] ]
@ -656,6 +775,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "453440c271cf5577fd2a40e4942540cb7d0d2f85e27c8d07dd0023c925a67541" checksum = "453440c271cf5577fd2a40e4942540cb7d0d2f85e27c8d07dd0023c925a67541"
[[package]]
name = "dyn-clone"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21e50f3adc76d6a43f5ed73b698a87d0760ca74617f60f7c3b879003536fdd28"
[[package]] [[package]]
name = "dynamic-library" name = "dynamic-library"
version = "0.1.0" version = "0.1.0"
@ -812,6 +937,15 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared 0.1.1",
]
[[package]] [[package]]
name = "foreign-types" name = "foreign-types"
version = "0.5.0" version = "0.5.0"
@ -819,7 +953,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965"
dependencies = [ dependencies = [
"foreign-types-macros", "foreign-types-macros",
"foreign-types-shared", "foreign-types-shared 0.3.0",
] ]
[[package]] [[package]]
@ -833,6 +967,12 @@ dependencies = [
"syn 1.0.76", "syn 1.0.76",
] ]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]] [[package]]
name = "foreign-types-shared" name = "foreign-types-shared"
version = "0.3.0" version = "0.3.0"
@ -1230,6 +1370,19 @@ dependencies = [
"tokio-rustls", "tokio-rustls",
] ]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes 1.1.0",
"hyper",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]] [[package]]
name = "iced-x86" name = "iced-x86"
version = "1.15.0" version = "1.15.0"
@ -1312,7 +1465,7 @@ dependencies = [
"log", "log",
"num_cpus", "num_cpus",
"rayon", "rayon",
"sha2", "sha2 0.10.1",
"win-util", "win-util",
"winapi 0.3.9", "winapi 0.3.9",
] ]
@ -1411,9 +1564,9 @@ dependencies = [
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.103" version = "0.2.121"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd8f7255a17a627354f321ef0055d63b898c6fb27eff628af4d1b66b7331edf6" checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f"
[[package]] [[package]]
name = "libclusterfuzz" name = "libclusterfuzz"
@ -1483,6 +1636,12 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "md5"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.4.1" version = "2.4.1"
@ -1622,6 +1781,24 @@ dependencies = [
"getrandom 0.2.3", "getrandom 0.2.3",
] ]
[[package]]
name = "native-tls"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09bf6f32a3afefd0b587ee42ed19acd945c6d1f3b5424040f50b2f24ab16be77"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]] [[package]]
name = "net2" name = "net2"
version = "0.2.37" version = "0.2.37"
@ -1755,6 +1932,26 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "oauth2"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80e47cfc4c0a1a519d9a025ebfbac3a2439d1b5cdf397d72dcb79b11d9920dab"
dependencies = [
"base64",
"chrono",
"getrandom 0.2.3",
"http",
"rand 0.8.4",
"reqwest",
"serde",
"serde_json",
"serde_path_to_error",
"sha2 0.9.9",
"thiserror",
"url",
]
[[package]] [[package]]
name = "object" name = "object"
version = "0.27.0" version = "0.27.0"
@ -1803,7 +2000,7 @@ dependencies = [
"serde", "serde",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
"sha2", "sha2 0.10.1",
"stacktrace-parser", "stacktrace-parser",
"storage-queue", "storage-queue",
"structopt", "structopt",
@ -1829,6 +2026,9 @@ dependencies = [
"arraydeque", "arraydeque",
"async-trait", "async-trait",
"atexit", "atexit",
"azure_core",
"azure_storage",
"azure_storage_blobs",
"backoff", "backoff",
"clap", "clap",
"coverage", "coverage",
@ -1846,6 +2046,7 @@ dependencies = [
"reqwest", "reqwest",
"reqwest-retry", "reqwest-retry",
"serde", "serde",
"serde-xml-rs",
"serde_json", "serde_json",
"stacktrace-parser", "stacktrace-parser",
"storage-queue", "storage-queue",
@ -1903,6 +2104,45 @@ dependencies = [
"z3-sys", "z3-sys",
] ]
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95"
dependencies = [
"bitflags",
"cfg-if 1.0.0",
"foreign-types 0.3.2",
"libc",
"once_cell",
"openssl-sys",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb"
dependencies = [
"autocfg",
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]] [[package]]
name = "os_pipe" name = "os_pipe"
version = "1.0.0" version = "1.0.0"
@ -2372,11 +2612,13 @@ dependencies = [
"http-body", "http-body",
"hyper", "hyper",
"hyper-rustls", "hyper-rustls",
"hyper-tls",
"ipnet", "ipnet",
"js-sys", "js-sys",
"lazy_static", "lazy_static",
"log", "log",
"mime", "mime",
"native-tls",
"percent-encoding", "percent-encoding",
"pin-project-lite", "pin-project-lite",
"rustls", "rustls",
@ -2385,6 +2627,7 @@ dependencies = [
"serde_json", "serde_json",
"serde_urlencoded", "serde_urlencoded",
"tokio", "tokio",
"tokio-native-tls",
"tokio-rustls", "tokio-rustls",
"tokio-util", "tokio-util",
"url", "url",
@ -2448,6 +2691,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver",
]
[[package]] [[package]]
name = "rustls" name = "rustls"
version = "0.20.2" version = "0.20.2"
@ -2490,6 +2742,16 @@ dependencies = [
"winapi-util", "winapi-util",
] ]
[[package]]
name = "schannel"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75"
dependencies = [
"lazy_static",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "scopeguard" name = "scopeguard"
version = "1.1.0" version = "1.1.0"
@ -2526,6 +2788,35 @@ dependencies = [
"untrusted", "untrusted",
] ]
[[package]]
name = "security-framework"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dc14f172faf8a0194a3aded622712b0de276821addc574fa54fc0a1167e10dc"
dependencies = [
"bitflags",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "semver"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d65bd28f48be7196d222d95b9243287f48d27aca604e08497513019ff0502cc4"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.130" version = "1.0.130"
@ -2570,6 +2861,15 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "serde_path_to_error"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7868ad3b8196a8a0aea99a8220b124278ee5320a55e4fde97794b6f85b1a377"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "serde_qs" name = "serde_qs"
version = "0.8.4" version = "0.8.4"
@ -2593,6 +2893,19 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "sha2"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800"
dependencies = [
"block-buffer 0.9.0",
"cfg-if 1.0.0",
"cpufeatures",
"digest 0.9.0",
"opaque-debug",
]
[[package]] [[package]]
name = "sha2" name = "sha2"
version = "0.10.1" version = "0.10.1"
@ -2601,7 +2914,7 @@ checksum = "99c3bd8169c58782adad9290a9af5939994036b76187f7b4f0e6de91dbbfc0ec"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"cpufeatures", "cpufeatures",
"digest", "digest 0.10.0",
] ]
[[package]] [[package]]
@ -2756,7 +3069,7 @@ dependencies = [
"regex", "regex",
"serde", "serde",
"serde_json", "serde_json",
"sha2", "sha2 0.10.1",
] ]
[[package]] [[package]]
@ -3067,6 +3380,16 @@ dependencies = [
"syn 1.0.76", "syn 1.0.76",
] ]
[[package]]
name = "tokio-native-tls"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b"
dependencies = [
"native-tls",
"tokio",
]
[[package]] [[package]]
name = "tokio-rustls" name = "tokio-rustls"
version = "0.23.1" version = "0.23.1"
@ -3208,7 +3531,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b737c38ba258c25916dd4002b12e631b180b6ea63528147a94d6364f68d886df" checksum = "b737c38ba258c25916dd4002b12e631b180b6ea63528147a94d6364f68d886df"
dependencies = [ dependencies = [
"foreign-types", "foreign-types 0.5.0",
"libc", "libc",
"unwind-sys", "unwind-sys",
] ]
@ -3272,6 +3595,12 @@ dependencies = [
"winapi 0.3.9", "winapi 0.3.9",
] ]
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]] [[package]]
name = "vec_map" name = "vec_map"
version = "0.8.2" version = "0.8.2"

View File

@ -28,6 +28,7 @@ num_cpus = "1.13"
reqwest = { version = "0.11", features = ["json", "stream", "rustls-tls"], default-features=false } reqwest = { version = "0.11", features = ["json", "stream", "rustls-tls"], default-features=false }
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde-xml-rs = "0.5.1"
onefuzz = { path = "../onefuzz" } onefuzz = { path = "../onefuzz" }
onefuzz-telemetry = { path = "../onefuzz-telemetry" } onefuzz-telemetry = { path = "../onefuzz-telemetry" }
path-absolutize = "3.0" path-absolutize = "3.0"
@ -44,3 +45,7 @@ tokio-stream = "0.1"
tui = { version = "0.16", default-features = false, features = ['crossterm'] } tui = { version = "0.16", default-features = false, features = ['crossterm'] }
url = { version = "2.2", features = ["serde"] } url = { version = "2.2", features = ["serde"] }
uuid = { version = "0.8", features = ["serde", "v4"] } uuid = { version = "0.8", features = ["serde", "v4"] }
azure_core = "0.1.1"
azure_storage = "0.1.0"
azure_storage_blobs = "0.1.0"

View File

@ -11,7 +11,7 @@ use crossterm::{
use futures::{StreamExt, TryStreamExt}; use futures::{StreamExt, TryStreamExt};
use log::Level; use log::Level;
use onefuzz::utils::try_wait_all_join_handles; use onefuzz::utils::try_wait_all_join_handles;
use onefuzz_telemetry::{self, EventData}; use onefuzz_telemetry::{self, EventData, LogEvent};
use std::{ use std::{
collections::HashMap, collections::HashMap,
io::{self, Stdout}, io::{self, Stdout},
@ -233,13 +233,14 @@ impl TerminalUi {
while cancellation_rx.try_recv() == Err(broadcast::error::TryRecvError::Empty) { while cancellation_rx.try_recv() == Err(broadcast::error::TryRecvError::Empty) {
match rx.try_recv() { match rx.try_recv() {
Ok((_event, data)) => { Ok(LogEvent::Event((_event, data))) => {
let data = data let data = data
.into_iter() .into_iter()
.filter(Self::filter_event) .filter(Self::filter_event)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let _ = ui_event_tx.send(TerminalEvent::Telemetry(data)); let _ = ui_event_tx.send(TerminalEvent::Telemetry(data));
} }
Ok(_) => continue,
Err(TryRecvError::Empty) => sleep(EVENT_POLLING_PERIOD).await, Err(TryRecvError::Empty) => sleep(EVENT_POLLING_PERIOD).await,
Err(TryRecvError::Lagged(_)) => continue, Err(TryRecvError::Lagged(_)) => continue,
Err(TryRecvError::Closed) => break, Err(TryRecvError::Closed) => break,

View File

@ -7,7 +7,7 @@ use crate::tasks::coverage;
use crate::tasks::{ use crate::tasks::{
analysis, fuzz, analysis, fuzz,
heartbeat::{init_task_heartbeat, TaskHeartbeatClient}, heartbeat::{init_task_heartbeat, TaskHeartbeatClient},
merge, regression, report, merge, regression, report, task_logger,
}; };
use anyhow::Result; use anyhow::Result;
use onefuzz::machine_id::{get_machine_id, get_scaleset_name}; use onefuzz::machine_id::{get_machine_id, get_scaleset_name};
@ -46,6 +46,8 @@ pub struct CommonConfig {
pub microsoft_telemetry_key: Option<MicrosoftTelemetryKey>, pub microsoft_telemetry_key: Option<MicrosoftTelemetryKey>,
pub logs: Option<Url>,
#[serde(default)] #[serde(default)]
pub setup_dir: PathBuf, pub setup_dir: PathBuf,
@ -204,6 +206,7 @@ impl Config {
telemetry::set_property(EventData::Version(env!("ONEFUZZ_VERSION").to_string())); telemetry::set_property(EventData::Version(env!("ONEFUZZ_VERSION").to_string()));
telemetry::set_property(EventData::InstanceId(self.common().instance_id)); telemetry::set_property(EventData::InstanceId(self.common().instance_id));
telemetry::set_property(EventData::Role(Role::Agent)); telemetry::set_property(EventData::Role(Role::Agent));
let scaleset = get_scaleset_name().await?; let scaleset = get_scaleset_name().await?;
if let Some(scaleset_name) = &scaleset { if let Some(scaleset_name) = &scaleset {
telemetry::set_property(EventData::ScalesetId(scaleset_name.to_string())); telemetry::set_property(EventData::ScalesetId(scaleset_name.to_string()));
@ -212,6 +215,20 @@ impl Config {
info!("agent ready, dispatching task"); info!("agent ready, dispatching task");
self.report_event(); self.report_event();
let common = self.common().clone();
if let Some(logs) = common.logs.clone() {
let rx = onefuzz_telemetry::subscribe_to_events();
let _logging = tokio::spawn(async move {
let logger = task_logger::TaskLogger::new(
common.job_id,
common.task_id,
get_machine_id().await?,
);
logger.start(rx, logs).await
});
}
match self { match self {
#[cfg(any(target_os = "linux", target_os = "windows"))] #[cfg(any(target_os = "linux", target_os = "windows"))]
Config::Coverage(config) => coverage::generic::CoverageTask::new(config).run().await, Config::Coverage(config) => coverage::generic::CoverageTask::new(config).run().await,

View File

@ -12,4 +12,5 @@ pub mod merge;
pub mod regression; pub mod regression;
pub mod report; pub mod report;
pub mod stats; pub mod stats;
pub mod task_logger;
pub mod utils; pub mod utils;

View File

@ -0,0 +1,604 @@
#![allow(clippy::if_same_then_else)]
#![allow(dead_code)]
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use azure_core::HttpError;
use azure_storage::core::prelude::*;
use azure_storage_blobs::prelude::*;
use futures::{StreamExt, TryStreamExt};
use onefuzz_telemetry::LogEvent;
use reqwest::{StatusCode, Url};
use std::{path::PathBuf, sync::Arc, time::Duration};
use uuid::Uuid;
use tokio::sync::broadcast::Receiver;
const LOGS_BUFFER_SIZE: usize = 100;
const MAX_LOG_SIZE: u64 = 100000000; // 100 MB
const DEFAULT_LOGGING_INTERVAL: Duration = Duration::from_secs(60);
const DEFAULT_POLLING_INTERVAL: Duration = Duration::from_secs(5);
/// Error payload returned by failed storage requests; presumably deserialized
/// from the XML `<Error>` element in the response body — TODO confirm against
/// the Azure storage error format (currently unused, see `#![allow(dead_code)]`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
#[serde(rename = "Error")]
struct RequestError {
    // Storage error code, e.g. "MaxBlobSizeConditionNotMet".
    code: String,
    // Human-readable description of the failure.
    message: String,
}
/// Outcome of a single `LogWriter::write_logs` attempt.
#[derive(PartialEq, Debug)]
enum WriteLogResponse {
    /// The batch was written in full.
    Success,
    /// The message needs to be split into multiple parts.
    MessageTooLarge,
    /// The log file is full; a new file is needed before retrying.
    MaxSizeReached,
}
/// Abstracts the operations needed to write logs to some destination.
#[async_trait]
trait LogWriter<T>: Send + Sync {
    /// Appends `logs` to the current log file; the caller reacts to the
    /// returned `WriteLogResponse` (split the batch, rotate the file, ...).
    async fn write_logs(&self, logs: &[LogEvent]) -> Result<WriteLogResponse>;
    /// Creates a new log file and returns the `LogWriter` associated with it.
    async fn get_next_writer(&self) -> Result<Box<dyn LogWriter<T>>>;
}
/// Writes logs to Azure append blobs named
/// `{task_id}/{machine_id}/{blob_id}.log` inside a container.
pub struct BlobLogWriter {
    container_client: Arc<ContainerClient>,
    task_id: Uuid,
    machine_id: Uuid,
    // Increasing file index; bumped by `get_next_writer` when a blob fills up.
    blob_id: usize,
    // Maximum blob size in bytes, enforced via the append-block max-size condition.
    max_log_size: u64,
}
impl BlobLogWriter {
    /// Blob path of the current log file: `{task_id}/{machine_id}/{blob_id}.log`.
    fn get_blob_name(&self) -> String {
        format!("{}/{}/{}.log", self.task_id, self.machine_id, self.blob_id)
    }

    /// Creates a writer over the `{task_id}/{machine_id}/` prefix of `log_container`.
    ///
    /// If numbered `.log` blobs already exist under the prefix, the writer resumes
    /// appending to the one with the highest id; otherwise it creates `1.log`.
    pub async fn create(
        task_id: Uuid,
        machine_id: Uuid,
        log_container: Url,
        max_log_size: u64,
    ) -> Result<Self> {
        let container_client = TaskLogger::create_container_client(&log_container)?;
        let prefix = format!("{}/{}", task_id, machine_id);
        // NOTE(review): only a single listing response is inspected; assumes the
        // blob count stays under one page -- TODO confirm continuation handling.
        let blob_list = container_client
            .list_blobs()
            .prefix(prefix.as_str())
            .execute()
            .await
            .map_err(|e| anyhow!(e.to_string()))?;

        // Collect the numeric ids of existing `{prefix}/<id>.log` blobs.
        // The leading '/' left over by `strip_prefix(&prefix)` must be removed
        // before parsing: previously `"/1".parse::<usize>()` always failed, so
        // existing log files were never detected or resumed.
        let mut blob_ids = blob_list
            .blobs
            .blobs
            .iter()
            .filter_map(|b| {
                b.name
                    .strip_prefix(&prefix)
                    .and_then(|rest| rest.strip_prefix('/'))
                    .map(PathBuf::from)
                    .filter(|file_name| {
                        file_name.extension().and_then(|f| f.to_str()) == Some("log")
                    })
                    .and_then(|file_name| {
                        file_name
                            .file_stem()
                            .and_then(|stem| stem.to_str())
                            .and_then(|stem| stem.parse::<usize>().ok())
                    })
            })
            .collect::<Vec<_>>();
        blob_ids.sort_unstable();

        let blob_id = match blob_ids.into_iter().last() {
            Some(id) => id,
            None => {
                // No existing log blob: start a fresh append blob at id 1.
                let blob_client = container_client.as_blob_client(format!("{}/1.log", prefix));
                blob_client
                    .put_append_blob()
                    .execute()
                    .await
                    .map_err(|e| anyhow!(e.to_string()))?;
                1
            }
        };

        Ok(Self {
            container_client,
            task_id,
            machine_id,
            blob_id,
            max_log_size,
        })
    }
}
#[async_trait]
impl LogWriter<BlobLogWriter> for BlobLogWriter {
    /// Renders the events as text lines and appends them to the current blob.
    ///
    /// Maps storage failures onto `WriteLogResponse`: a failed max-size
    /// condition or exhausted block count means the file is full
    /// (`MaxSizeReached`); an oversized payload means the batch must be split
    /// (`MessageTooLarge`). Any other error is propagated.
    async fn write_logs(&self, logs: &[LogEvent]) -> Result<WriteLogResponse> {
        let blob_name = self.get_blob_name();
        // (A leftover `print!("{}", blob_name)` debug statement used to dump the
        // blob name to stdout on every flush; removed.)
        let blob_client = self.container_client.as_blob_client(blob_name);

        // One line per event: "<name>: <payload>\n".
        let data_stream = logs
            .iter()
            .flat_map(|log_event| match log_event {
                LogEvent::Event((ev, data)) => format!(
                    "{}: {}\n",
                    ev.as_str(),
                    data.iter()
                        .map(|p| p.as_values())
                        .map(|(name, val)| format!("{} {}", name, val))
                        .collect::<Vec<_>>()
                        .join(", ")
                )
                .into_bytes(),
                LogEvent::Trace((level, msg)) => {
                    format!("{}: {}\n", level.as_str(), msg).into_bytes()
                }
            })
            .collect::<Vec<_>>();

        let result = blob_client
            .append_block(data_stream)
            .condition_max_size(self.max_log_size)
            .execute()
            .await;

        match result {
            Ok(_r) => Ok(WriteLogResponse::Success),
            Err(e) => match e.downcast_ref::<HttpError>() {
                Some(HttpError::StatusCode { status: s, body: b }) => {
                    if s == &StatusCode::PRECONDITION_FAILED
                        && b.contains("MaxBlobSizeConditionNotMet")
                    {
                        // The max-size append condition rejected the write.
                        Ok(WriteLogResponse::MaxSizeReached)
                    } else if s == &StatusCode::CONFLICT && b.contains("BlockCountExceedsLimit") {
                        // Append blobs allow a bounded number of blocks.
                        Ok(WriteLogResponse::MaxSizeReached)
                    } else if s == &StatusCode::PAYLOAD_TOO_LARGE {
                        Ok(WriteLogResponse::MessageTooLarge)
                    } else {
                        Err(anyhow!(e.to_string()))
                    }
                }
                _ => Err(anyhow!(e.to_string())),
            },
        }
    }

    /// Creates the next append blob (`{blob_id + 1}.log`) and returns a writer for it.
    async fn get_next_writer(&self) -> Result<Box<dyn LogWriter<BlobLogWriter>>> {
        let new_writer = Self {
            blob_id: self.blob_id + 1,
            container_client: self.container_client.clone(),
            task_id: self.task_id,
            machine_id: self.machine_id,
            max_log_size: self.max_log_size,
        };
        let blob_client = self
            .container_client
            .as_blob_client(new_writer.get_blob_name());
        blob_client
            .put_append_blob()
            .execute()
            .await
            .map_err(|e| anyhow!(e.to_string()))?;
        Ok(Box::new(new_writer))
    }
}
/// Streams `LogEvent`s from a broadcast channel into a `LogWriter`
/// (blob storage in production, see `start`).
#[derive(Debug, Clone)]
pub struct TaskLogger {
    job_id: Uuid,
    task_id: Uuid,
    machine_id: Uuid,
    // Max time to accumulate events before forcing a flush.
    logging_interval: Duration,
    // Max number of events buffered before a flush.
    log_buffer_size: usize,
    // Sleep between receive attempts when the channel is empty.
    polling_interval: Duration,
}
/// States of the logging event loop (see `TaskLogger::event_loop`).
enum LoopState {
    /// Accumulate incoming events into the buffer.
    Receive,
    /// Rotate to a new log file, then resume sending `pending_logs[start..start + count]`.
    InitLog { start: usize, count: usize },
    /// Write `pending_logs[start..start + count]` with the current writer.
    Send { start: usize, count: usize },
}
/// State threaded through each `try_fold` iteration of the event loop.
struct LoopContext<T: Sized> {
    pub log_writer: Box<dyn LogWriter<T>>,
    // Events received but not yet fully written out.
    pub pending_logs: Vec<LogEvent>,
    pub state: LoopState,
    pub event: Receiver<LogEvent>,
}
impl TaskLogger {
pub fn new(job_id: Uuid, task_id: Uuid, machine_id: Uuid) -> Self {
Self {
job_id,
task_id,
machine_id,
logging_interval: DEFAULT_LOGGING_INTERVAL,
log_buffer_size: LOGS_BUFFER_SIZE,
polling_interval: DEFAULT_POLLING_INTERVAL,
}
}
fn create_container_client(log_container: &Url) -> Result<Arc<ContainerClient>> {
let account = log_container
.domain()
.and_then(|d| d.split('.').next())
.ok_or(anyhow!("Invalid log container"))?
.to_owned();
let container = log_container
.path_segments()
.and_then(|mut ps| ps.next())
.ok_or(anyhow!("Invalid log container"))?
.to_owned();
let sas_token = log_container
.query()
.ok_or(anyhow!("Invalid log container"))?;
let http_client = azure_core::new_http_client();
let storage_account_client =
StorageAccountClient::new_sas_token(http_client, account, sas_token)?;
Ok(storage_account_client.as_container_client(container))
}
async fn event_loop<T: Send + Sized>(
self,
log_writer: Box<dyn LogWriter<T>>,
event: Receiver<LogEvent>,
) -> Result<()> {
let initial_state = LoopContext {
log_writer,
pending_logs: vec![],
state: LoopState::Receive,
event,
};
let _loop_result = futures::stream::repeat(0)
.map(Ok)
.try_fold(initial_state, |context, _| async {
match context.state {
LoopState::Send { start, count } => {
match context
.log_writer
.write_logs(&context.pending_logs[start..start + count])
.await?
{
WriteLogResponse::Success => {
if start + count >= context.pending_logs.len() {
Result::<_, anyhow::Error>::Ok(LoopContext {
pending_logs: vec![],
state: LoopState::Receive,
..context
})
} else {
let new_start = start + 1;
let new_count = context.pending_logs.len() - new_start;
Result::<_, anyhow::Error>::Ok(LoopContext {
state: LoopState::Send {
start: new_start,
count: new_count,
},
..context
})
}
}
WriteLogResponse::MaxSizeReached => {
Result::<_, anyhow::Error>::Ok(LoopContext {
state: LoopState::InitLog { start, count },
..context
})
}
WriteLogResponse::MessageTooLarge => {
// split the logs here
Result::<_, anyhow::Error>::Ok(LoopContext {
state: LoopState::Send {
start,
count: count / 2,
},
..context
})
}
}
}
LoopState::InitLog { start, count } => {
let new_writer = context.log_writer.get_next_writer().await?;
Result::<_, anyhow::Error>::Ok(LoopContext {
log_writer: new_writer,
state: LoopState::Send { start, count },
..context
})
}
LoopState::Receive => {
let mut event = context.event;
let mut data = Vec::with_capacity(self.log_buffer_size);
let now = tokio::time::Instant::now();
loop {
if data.len() >= self.log_buffer_size {
break;
}
if tokio::time::Instant::now() - now > self.logging_interval {
break;
}
if let Ok(v) = event.try_recv() {
data.push(v);
} else {
tokio::time::sleep(self.polling_interval).await;
}
}
if !data.is_empty() {
Result::<_, anyhow::Error>::Ok(LoopContext {
state: LoopState::Send {
start: 0,
count: data.len(),
},
pending_logs: data,
event,
..context
})
} else {
Result::<_, anyhow::Error>::Ok(LoopContext { event, ..context })
}
}
}
})
.await;
Ok(())
}
pub async fn start(&self, event: Receiver<LogEvent>, log_container: Url) -> Result<()> {
let blob_writer =
BlobLogWriter::create(self.task_id, self.machine_id, log_container, MAX_LOG_SIZE)
.await?;
self._start(event, Box::new(blob_writer)).await
}
async fn _start<T: 'static + Send>(
&self,
event: Receiver<LogEvent>,
log_writer: Box<dyn LogWriter<T>>,
) -> Result<()> {
self.clone().event_loop(log_writer, event).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::RwLock};
use super::*;
use reqwest::Url;
#[tokio::test]
#[ignore]
async fn test_get_blob() -> Result<()> {
let url = std::env::var("test_blob_logger_container")?;
let log_container = Url::parse(&url)?;
let client = TaskLogger::create_container_client(&log_container)?;
let response = client
.list_blobs()
.prefix(format!("job1/tak1/1"))
.execute()
.await
.map_err(|e| anyhow!(e.to_string()))?;
println!("blob prefix {:?}", response.blobs.blob_prefix);
for blob in response.blobs.blobs {
println!("{}", blob.name);
}
Ok(())
}
#[tokio::test]
#[ignore]
async fn test_write_log() -> Result<()> {
let url = std::env::var("test_blob_logger_container")?;
let log_container = Url::parse(&url)?;
let blob_logger = TaskLogger::new(Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
let (tx, rx) = tokio::sync::broadcast::channel(16);
tx.send(LogEvent::Trace((log::Level::Info, "test".into())))?;
blob_logger.start(rx, log_container).await?;
Ok(())
}
pub struct TestLogWriter {
events: Arc<RwLock<HashMap<usize, Vec<LogEvent>>>>,
id: usize,
max_size: usize,
}
#[async_trait]
impl LogWriter<TestLogWriter> for TestLogWriter {
async fn write_logs(&self, logs: &[LogEvent]) -> Result<WriteLogResponse> {
let mut events = self.events.write().unwrap();
let entry = &mut *events.entry(self.id).or_insert(Vec::new());
if entry.len() >= self.max_size {
Ok(WriteLogResponse::MaxSizeReached)
} else if logs.len() > 1 {
Ok(WriteLogResponse::MessageTooLarge)
} else {
for v in logs {
entry.push(v.clone());
}
Ok(WriteLogResponse::Success)
}
}
async fn get_next_writer(&self) -> Result<Box<dyn LogWriter<TestLogWriter>>> {
Ok(Box::new(Self {
events: self.events.clone(),
id: self.id + 1,
..*self
}))
}
}
/// With a writer capacity of one event, each of the five queued messages
/// should land in its own group (the logger rolls to a new writer per event).
#[tokio::test]
async fn test_task_logger_normal_messages() -> Result<()> {
    let events = Arc::new(RwLock::new(HashMap::new()));
    let log_writer = Box::new(TestLogWriter {
        id: 0,
        events: events.clone(),
        max_size: 1,
    });
    let blob_logger = TaskLogger {
        job_id: Uuid::new_v4(),
        task_id: Uuid::new_v4(),
        machine_id: Uuid::new_v4(),
        logging_interval: Duration::from_secs(1),
        log_buffer_size: 1,
        polling_interval: Duration::from_secs(1),
    };
    let (tx, rx) = tokio::sync::broadcast::channel(16);
    // Queue test1..test5 before the logger starts draining the channel.
    for i in 1..=5 {
        tx.send(LogEvent::Trace((log::Level::Info, format!("test{}", i))))?;
    }
    // The logger loops forever; bound the test with a timeout and ignore
    // the (expected) timeout error.
    let _res =
        tokio::time::timeout(Duration::from_secs(5), blob_logger._start(rx, log_writer)).await;
    let recorded = events.read().unwrap();
    for (writer_id, group) in recorded.iter() {
        println!("{}", writer_id);
        for event in group {
            println!(" {:?}", event);
        }
    }
    assert_eq!(recorded.keys().len(), 5, "expected 5 groups of messages");
    Ok(())
}
/// With a writer capacity of two events and a buffer size of two, the five
/// queued messages should be batched into three groups (2 + 2 + 1).
#[tokio::test]
async fn test_task_logger_big_messages() -> Result<()> {
    let events = Arc::new(RwLock::new(HashMap::new()));
    let log_writer = Box::new(TestLogWriter {
        id: 0,
        events: events.clone(),
        max_size: 2,
    });
    let blob_logger = TaskLogger {
        job_id: Uuid::new_v4(),
        task_id: Uuid::new_v4(),
        machine_id: Uuid::new_v4(),
        logging_interval: Duration::from_secs(3),
        log_buffer_size: 2,
        polling_interval: Duration::from_secs(1),
    };
    let (tx, rx) = tokio::sync::broadcast::channel(16);
    // Queue test1..test5 before the logger starts draining the channel.
    for i in 1..=5 {
        tx.send(LogEvent::Trace((log::Level::Info, format!("test{}", i))))?;
    }
    // The logger loops forever; bound the test with a timeout and ignore
    // the (expected) timeout error.
    let _res =
        tokio::time::timeout(Duration::from_secs(15), blob_logger._start(rx, log_writer)).await;
    let recorded = events.read().unwrap();
    for (writer_id, group) in recorded.iter() {
        println!("{}", writer_id);
        for event in group {
            println!(" {:?}", event);
        }
    }
    assert_eq!(recorded.keys().len(), 3, "expected 3 groups of messages");
    Ok(())
}
/// Integration test (ignored by default): exercises `BlobLogWriter` against a
/// real storage container named by the `test_blob_logger_container`
/// environment variable. Verifies (1) the initial blob is created on
/// `create`, (2) writes succeed until the size cap is hit, and (3)
/// `get_next_writer` rolls over to a second blob.
#[tokio::test]
#[ignore]
async fn test_blob_writer_create() -> Result<()> {
    let url = std::env::var("test_blob_logger_container")?;
    // Cap of 15 — presumably a byte limit small enough that a single event
    // exceeds it (the second write below returns MaxSizeReached); confirm
    // the unit against BlobLogWriter::create.
    let blob_writer =
        BlobLogWriter::create(Uuid::new_v4(), Uuid::new_v4(), Url::parse(&url)?, 15).await?;
    // Blobs for this writer are listed under "{task_id}/{machine_id}".
    let blob_prefix = format!("{}/{}", blob_writer.task_id, blob_writer.machine_id);
    print!("blob prefix {}", &blob_prefix);
    let container_client = blob_writer.container_client.clone();
    let blobs = container_client
        .list_blobs()
        .prefix(blob_prefix.clone())
        .execute()
        .await
        // The Azure SDK error type isn't anyhow-compatible; stringify it.
        .map_err(|e| anyhow!(e.to_string()))?;
    // test initial blob creation
    assert_eq!(blobs.blobs.blobs.len(), 1, "expected exactly one blob");
    assert_eq!(
        blobs.blobs.blobs[0].name,
        format!("{}/1.log", &blob_prefix),
        "Wrong file name"
    );
    println!("logging test event");
    let result = blob_writer
        .write_logs(&[LogEvent::Trace((log::Level::Info, "test".into()))])
        .await
        .map_err(|e| anyhow!(e.to_string()))?;
    assert_eq!(result, WriteLogResponse::Success, "expected success");
    // testing that we return MaxSizeReached when the size is exceeded
    let result = blob_writer
        .write_logs(&[LogEvent::Trace((log::Level::Info, "test".into()))])
        .await
        .map_err(|e| anyhow!(e.to_string()))?;
    assert_eq!(
        result,
        WriteLogResponse::MaxSizeReached,
        "expected MaxSizeReached"
    );
    // testing the creation of new blob when we call get_next_writer()
    let _blob_writer = blob_writer.get_next_writer().await?;
    // Re-list the prefix: rollover should have created a second blob.
    let blobs = container_client
        .list_blobs()
        .prefix(blob_prefix.clone())
        .execute()
        .await
        .map_err(|e| anyhow!(e.to_string()))?;
    assert_eq!(blobs.blobs.blobs.len(), 2, "expected exactly 2 blob");
    let blob_names = blobs
        .blobs
        .blobs
        .iter()
        .map(|b| b.name.clone())
        .collect::<Vec<_>>();
    // Listing order isn't guaranteed, so check membership rather than index.
    assert!(
        blob_names.contains(&format!("{}/2.log", &blob_prefix)),
        "expected 2.log"
    );
    Ok(())
}
}

View File

@ -333,6 +333,12 @@ impl EventData {
} }
} }
#[derive(Clone, Debug)]
pub enum LogEvent {
Trace((log::Level, String)),
Event((Event, Vec<EventData>)),
}
mod global { mod global {
use std::sync::{ use std::sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
@ -355,7 +361,7 @@ mod global {
}; };
lazy_static! { lazy_static! {
pub static ref EVENT_SOURCE: Sender<(Event, Vec<EventData>)> = { pub static ref EVENT_SOURCE: Sender<LogEvent> = {
let (telemetry_event_source, _) = broadcast::channel::<_>(100); let (telemetry_event_source, _) = broadcast::channel::<_>(100);
telemetry_event_source telemetry_event_source
}; };
@ -532,10 +538,21 @@ fn try_broadcast_event(event: &Event, properties: &[EventData]) -> bool {
// we ignore any send error here because they indicate that // we ignore any send error here because they indicate that
// there are no receivers on the other end // there are no receivers on the other end
let (event, properties) = (event.clone(), properties.to_vec()); let (event, properties) = (event.clone(), properties.to_vec());
global::EVENT_SOURCE.send((event, properties)).is_ok() global::EVENT_SOURCE
.send(LogEvent::Event((event, properties)))
.is_ok()
} }
pub fn subscribe_to_events() -> Receiver<(Event, Vec<EventData>)> { pub fn try_broadcast_trace(msg: String, level: log::Level) -> bool {
// we ignore any send error here because they indicate that
// there are no receivers on the other end
global::EVENT_SOURCE
.send(LogEvent::Trace((level, msg)))
.is_ok()
}
pub fn subscribe_to_events() -> Receiver<LogEvent> {
global::EVENT_SOURCE.subscribe() global::EVENT_SOURCE.subscribe()
} }
@ -616,6 +633,7 @@ macro_rules! log {
if log_level <= log::max_level() { if log_level <= log::max_level() {
let msg = format!("{}", format_args!($($arg)+)); let msg = format!("{}", format_args!($($arg)+));
log::log!(log_level, "{}", msg); log::log!(log_level, "{}", msg);
onefuzz_telemetry::try_broadcast_trace(msg.to_string(), log_level);
onefuzz_telemetry::log_message($level, msg.to_string()); onefuzz_telemetry::log_message($level, msg.to_string());
} }
}}; }};