Migrate timer_task part 2 (#1855)

This commit is contained in:
Cheick Keita
2022-04-28 12:45:00 -07:00
committed by GitHub
parent 4aa8058b8b
commit fed6069f12
10 changed files with 1408 additions and 222 deletions

View File

@ -296,7 +296,59 @@ public enum Architecture {
x86_64
}
public enum TaskFeature {
InputQueueFromContainer,
SupervisorExe,
SupervisorEnv,
SupervisorOptions,
SupervisorInputMarker,
StatsFile,
StatsFormat,
TargetExe,
TargetExeOptional,
TargetEnv,
TargetOptions,
AnalyzerExe,
AnalyzerEnv,
AnalyzerOptions,
RenameOutput,
TargetOptionsMerge,
TargetWorkers,
GeneratorExe,
GeneratorEnv,
GeneratorOptions,
WaitForFiles,
TargetTimeout,
CheckAsanLog,
CheckDebugger,
CheckRetryCount,
EnsembleSyncDelay,
PreserveExistingOutputs,
CheckFuzzerHelp,
ExpectCrashOnFailure,
ReportList,
MinimizedStackDepth,
CoverageFilter,
TargetMustUseInput
}
[Flags]
public enum ContainerPermission {
Read = 1 << 0,
Write = 1 << 1,
List = 1 << 2,
Delete = 1 << 3,
}
public enum Compare {
Equal,
AtLeast,
AtMost
}
public enum AgentMode {
Fuzz,
Repro,

View File

@ -596,3 +596,108 @@ public record Job(
public List<JobTaskInfo>? TaskInfo { get; set; }
public UserInfo? UserInfo { get; set; }
}
public record WorkUnit(
Guid JobId,
Guid TaskId,
TaskType TaskType,
TaskUnitConfig Config
);
public record VmDefinition(
Compare Compare,
int Value
);
public record TaskDefinition(
TaskFeature[] Features,
VmDefinition Vm,
ContainerDefinition[] Containers,
ContainerType? MonitorQueue = null
);
public record WorkSet(
bool Reboot,
Uri SetupUrl,
bool Script,
List<WorkUnit> WorkUnits
);
public record ContainerDefinition(
ContainerType Type,
Compare Compare,
int Value,
ContainerPermission Permissions);
// TODO: service shouldn't pass SyncedDir, but just the url and let the agent
// come up with paths
public record SyncedDir(string Path, Uri url);
public interface IContainerDef { }
public record SingleContainer(SyncedDir SyncedDir) : IContainerDef;
public record MultipleContainer(List<SyncedDir> SyncedDirs) : IContainerDef;
public record TaskUnitConfig(
Guid InstanceId,
Guid JobId,
Guid TaskId,
Uri logs,
TaskType TaskType,
string? InstanceTelemetryKey,
string? MicrosoftTelemetryKey,
Uri HeartbeatQueue
) {
public Uri? inputQueue { get; set; }
public String? SupervisorExe { get; set; }
public Dictionary<string, string>? SupervisorEnv { get; set; }
public List<string>? SupervisorOptions { get; set; }
public string? SupervisorInputMarker { get; set; }
public string? TargetExe { get; set; }
public Dictionary<string, string>? TargetEnv { get; set; }
public List<string>? TargetOptions { get; set; }
public int? TargetTimeout { get; set; }
public bool? TargetOptionsMerge { get; set; }
public int? TargetWorkers { get; set; }
public bool? CheckAsanLog { get; set; }
public bool? CheckDebugger { get; set; }
public int? CheckRetryCount { get; set; }
public bool? CheckFuzzerHelp { get; set; }
public bool? ExpectCrashOnFailure { get; set; }
public bool? RenameOutput { get; set; }
public string? GeneratorExe { get; set; }
public Dictionary<string, string>? GeneratorEnv { get; set; }
public List<string>? GeneratorOptions { get; set; }
public ContainerType? WaitForFiles { get; set; }
public string? AnalyzerExe { get; set; }
public Dictionary<string, string>? AnalyzerEnv { get; set; }
public List<string>? AnalyzerOptions { get; set; }
public string? StatsFile { get; set; }
public StatsFormat? StatsFormat { get; set; }
public int? EnsembleSyncDelay { get; set; }
public List<string>? ReportList { get; set; }
public int? MinimizedStackDepth { get; set; }
public string? CoverageFilter { get; set; }
// from here forwards are Container definitions. These need to be inline
// with TaskDefinitions and ContainerTypes
public IContainerDef? Analysis { get; set; }
public IContainerDef? Coverage { get; set; }
public IContainerDef? Crashes { get; set; }
public IContainerDef? Inputs { get; set; }
public IContainerDef? NoRepro { get; set; }
public IContainerDef? ReadonlyInputs { get; set; }
public IContainerDef? Reports { get; set; }
public IContainerDef? Tools { get; set; }
public IContainerDef? UniqueInputs { get; set; }
public IContainerDef? UniqueReports { get; set; }
public IContainerDef? RegressionReport { get; set; }
}

View File

@ -0,0 +1,264 @@
using Azure.Storage.Sas;
namespace Microsoft.OneFuzz.Service;
public interface IConfig {
string GetSetupContainer(TaskConfig config);
Async.Task<TaskUnitConfig> BuildTaskConfig(Job job, Task task);
}
public class Config : IConfig {
private readonly IContainers _containers;
private readonly IServiceConfig _serviceConfig;
private readonly IQueue _queue;
public Config(IContainers containers, IServiceConfig serviceConfig, IQueue queue) {
_containers = containers;
_serviceConfig = serviceConfig;
_queue = queue;
}
private BlobContainerSasPermissions ConvertPermissions(ContainerPermission permission) {
BlobContainerSasPermissions blobPermissions = 0;
if (permission.HasFlag(ContainerPermission.Read)) {
blobPermissions |= BlobContainerSasPermissions.Read;
}
if (permission.HasFlag(ContainerPermission.Write)) {
blobPermissions |= BlobContainerSasPermissions.Write;
}
if (permission.HasFlag(ContainerPermission.Delete)) {
blobPermissions |= BlobContainerSasPermissions.Delete;
}
if (permission.HasFlag(ContainerPermission.List)) {
blobPermissions |= BlobContainerSasPermissions.List;
}
return blobPermissions;
}
public async Async.Task<TaskUnitConfig> BuildTaskConfig(Job job, Task task) {
if (!Defs.TASK_DEFINITIONS.ContainsKey(task.Config.Task.Type)) {
throw new Exception($"unsupported task type: {task.Config.Task.Type}");
}
if (job.Config.Logs == null) {
throw new Exception($"Missing log container: job_id {job.JobId}, task_id {task.TaskId}");
}
var definition = Defs.TASK_DEFINITIONS[task.Config.Task.Type];
var config = new TaskUnitConfig(
InstanceId: await _containers.GetInstanceId(),
JobId: job.JobId,
TaskId: task.TaskId,
logs: await _containers.AddContainerSasUrl(new Uri(job.Config.Logs)),
TaskType: task.Config.Task.Type,
InstanceTelemetryKey: _serviceConfig.ApplicationInsightsInstrumentationKey,
MicrosoftTelemetryKey: _serviceConfig.OneFuzzTelemetry,
HeartbeatQueue: await _queue.GetQueueSas("task-heartbeat", StorageType.Config, QueueSasPermissions.Add) ?? throw new Exception("unable to get heartbeat queue sas")
);
if (definition.MonitorQueue != null) {
config.inputQueue = await _queue.GetQueueSas(task.TaskId.ToString(), StorageType.Config, QueueSasPermissions.Add | QueueSasPermissions.Read | QueueSasPermissions.Update | QueueSasPermissions.Process);
}
var containersByType = definition.Containers.Where(c => c.Type != ContainerType.Setup && task.Config.Containers != null)
.ToAsyncEnumerable()
.SelectAwait(async countainerDef => {
var containers = await
task.Config.Containers!
.Where(c => c.Type == countainerDef.Type).Select(container => (countainerDef, container))
.Where(x => x.container != null)
.ToAsyncEnumerable()
.SelectAwait(async (x, i) =>
new SyncedDir(
string.Join("_", "task", x.Item1.Type.ToString().ToLower(), i),
await _containers.GetContainerSasUrl(x.Item2.Name, StorageType.Corpus, ConvertPermissions(x.Item1.Permissions)))
).ToListAsync();
return (countainerDef, containers);
}
);
await foreach (var data in containersByType) {
IContainerDef def = data.countainerDef switch {
ContainerDefinition { Compare: Compare.Equal, Value: 1 } or
ContainerDefinition { Compare: Compare.AtMost, Value: 1 } => new SingleContainer(data.containers[0]),
_ => new MultipleContainer(data.containers)
};
switch (data.countainerDef.Type) {
case ContainerType.Analysis:
config.Analysis = def;
break;
case ContainerType.Coverage:
config.Coverage = def;
break;
case ContainerType.Crashes:
config.Crashes = def;
break;
case ContainerType.Inputs:
config.Inputs = def;
break;
case ContainerType.NoRepro:
config.NoRepro = def;
break;
case ContainerType.ReadonlyInputs:
config.ReadonlyInputs = def;
break;
case ContainerType.Reports:
config.Reports = def;
break;
case ContainerType.Tools:
config.Tools = def;
break;
case ContainerType.UniqueInputs:
config.UniqueInputs = def;
break;
case ContainerType.UniqueReports:
config.UniqueReports = def;
break;
}
}
if (definition.Features.Contains(TaskFeature.SupervisorExe)) {
config.SupervisorExe = task.Config.Task.SupervisorExe;
}
if (definition.Features.Contains(TaskFeature.SupervisorEnv)) {
config.SupervisorEnv = task.Config.Task.SupervisorEnv ?? new Dictionary<string, string>();
}
if (definition.Features.Contains(TaskFeature.SupervisorOptions)) {
config.SupervisorOptions = task.Config.Task.SupervisorOptions ?? new List<string>();
}
if (definition.Features.Contains(TaskFeature.SupervisorInputMarker)) {
config.SupervisorInputMarker = task.Config.Task.SupervisorInputMarker;
}
if (definition.Features.Contains(TaskFeature.TargetExe)) {
config.TargetExe = $"setup/{task.Config.Task.TargetExe}";
}
if (definition.Features.Contains(TaskFeature.TargetExeOptional) && config.TargetExe != null) {
config.TargetExe = $"setup/{task.Config.Task.TargetExe}";
}
if (definition.Features.Contains(TaskFeature.TargetEnv)) {
config.TargetEnv = task.Config.Task.TargetEnv ?? new Dictionary<string, string>();
}
if (definition.Features.Contains(TaskFeature.TargetOptions)) {
config.TargetOptions = task.Config.Task.TargetOptions ?? new List<string>();
}
if (definition.Features.Contains(TaskFeature.TargetOptionsMerge)) {
config.TargetOptionsMerge = task.Config.Task.TargetOptionsMerge ?? false;
}
if (definition.Features.Contains(TaskFeature.TargetWorkers)) {
config.TargetWorkers = task.Config.Task.TargetWorkers;
}
if (definition.Features.Contains(TaskFeature.RenameOutput)) {
config.RenameOutput = task.Config.Task.RenameOutput;
}
if (definition.Features.Contains(TaskFeature.GeneratorExe)) {
config.GeneratorExe = task.Config.Task.GeneratorExe;
}
if (definition.Features.Contains(TaskFeature.GeneratorOptions)) {
config.GeneratorOptions = task.Config.Task.GeneratorOptions ?? new List<string>();
}
if (definition.Features.Contains(TaskFeature.WaitForFiles) && task.Config.Task.WaitForFiles != null) {
config.WaitForFiles = task.Config.Task.WaitForFiles;
}
if (definition.Features.Contains(TaskFeature.AnalyzerExe)) {
config.AnalyzerExe = task.Config.Task.AnalyzerExe;
}
if (definition.Features.Contains(TaskFeature.AnalyzerOptions)) {
config.AnalyzerOptions = task.Config.Task.AnalyzerOptions ?? new List<string>();
}
if (definition.Features.Contains(TaskFeature.AnalyzerEnv)) {
config.AnalyzerEnv = task.Config.Task.AnalyzerEnv ?? new Dictionary<string, string>();
}
if (definition.Features.Contains(TaskFeature.StatsFile)) {
config.StatsFile = task.Config.Task.StatsFile;
config.StatsFormat = task.Config.Task.StatsFormat;
}
if (definition.Features.Contains(TaskFeature.TargetTimeout)) {
config.TargetTimeout = task.Config.Task.TargetTimeout;
}
if (definition.Features.Contains(TaskFeature.CheckAsanLog)) {
config.CheckAsanLog = task.Config.Task.CheckAsanLog;
}
if (definition.Features.Contains(TaskFeature.CheckDebugger)) {
config.CheckDebugger = task.Config.Task.CheckDebugger;
}
if (definition.Features.Contains(TaskFeature.CheckRetryCount)) {
config.CheckRetryCount = task.Config.Task.CheckRetryCount ?? 0;
}
if (definition.Features.Contains(TaskFeature.EnsembleSyncDelay)) {
config.EnsembleSyncDelay = task.Config.Task.EnsembleSyncDelay;
}
if (definition.Features.Contains(TaskFeature.CheckFuzzerHelp)) {
config.CheckFuzzerHelp = task.Config.Task.CheckFuzzerHelp ?? true;
}
if (definition.Features.Contains(TaskFeature.ReportList)) {
config.ReportList = task.Config.Task.ReportList;
}
if (definition.Features.Contains(TaskFeature.MinimizedStackDepth)) {
config.MinimizedStackDepth = task.Config.Task.MinimizedStackDepth;
}
if (definition.Features.Contains(TaskFeature.ExpectCrashOnFailure)) {
config.ExpectCrashOnFailure = task.Config.Task.ExpectCrashOnFailure ?? true;
}
if (definition.Features.Contains(TaskFeature.CoverageFilter)) {
var coverageFilter = task.Config.Task.CoverageFilter;
if (coverageFilter != null) {
config.CoverageFilter = $"setup/{coverageFilter}";
}
}
return config;
}
public string GetSetupContainer(TaskConfig config) {
foreach (var container in config.Containers ?? throw new Exception("Missing containers")) {
if (container.Type == ContainerType.Setup) {
return container.Name.ContainerName;
}
}
throw new Exception($"task missing setup container: task_type = {config.Task.Type}");
}
}

View File

@ -1,176 +1,197 @@
using Azure;
using Azure.ResourceManager;
using Azure.Storage;
using Azure.Storage.Blobs;
using Azure.Storage.Sas;
namespace Microsoft.OneFuzz.Service;
public interface IContainers {
public Async.Task<BinaryData?> GetBlob(Container container, string name, StorageType storageType);
public Async.Task<BlobContainerClient?> FindContainer(Container container, StorageType storageType);
public Async.Task<Uri?> GetFileSasUrl(Container container, string name, StorageType storageType, BlobSasPermissions permissions, TimeSpan? duration = null);
public Async.Task SaveBlob(Container container, string v1, string v2, StorageType config);
public Async.Task<Guid> GetInstanceId();
public Async.Task<Uri?> GetFileUrl(Container container, string name, StorageType storageType);
public Async.Task<Uri?> GetContainerSasUrl(Container container, StorageType storageType, BlobSasPermissions permissions);
}
public class Containers : IContainers {
private ILogTracer _log;
private IStorage _storage;
private ICreds _creds;
private ArmClient _armClient;
public Containers(ILogTracer log, IStorage storage, ICreds creds) {
_log = log;
_storage = storage;
_creds = creds;
_armClient = creds.ArmClient;
}
public async Async.Task<Uri?> GetFileUrl(Container container, string name, StorageType storageType) {
var client = await FindContainer(container, storageType);
if (client is null)
return null;
return new Uri($"{GetUrl(client.AccountName)}{container}/{name}");
}
public async Async.Task<BinaryData?> GetBlob(Container container, string name, StorageType storageType) {
var client = await FindContainer(container, storageType);
if (client == null) {
return null;
}
try {
return (await client.GetBlobClient(name).DownloadContentAsync())
.Value.Content;
} catch (RequestFailedException) {
return null;
}
}
public async Async.Task<BlobContainerClient?> FindContainer(Container container, StorageType storageType) {
// # check secondary accounts first by searching in reverse.
// #
// # By implementation, the primary account is specified first, followed by
// # any secondary accounts.
// #
// # Secondary accounts, if they exist, are preferred for containers and have
// # increased IOP rates, this should be a slight optimization
var containers = _storage.GetAccounts(storageType)
.Reverse()
.Select(async account => (await GetBlobService(account))?.GetBlobContainerClient(container.ContainerName));
foreach (var c in containers) {
var client = await c;
if (client != null && (await client.ExistsAsync()).Value) {
return client;
}
}
return null;
}
private async Async.Task<BlobServiceClient?> GetBlobService(string accountId) {
_log.Info($"getting blob container (account_id: {accountId}");
var (accountName, accountKey) = await _storage.GetStorageAccountNameAndKey(accountId);
if (accountName == null) {
_log.Error("Failed to get storage account name");
return null;
}
var storageKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
var accountUrl = GetUrl(accountName);
return new BlobServiceClient(accountUrl, storageKeyCredential);
}
private static Uri GetUrl(string accountName) {
return new Uri($"https://{accountName}.blob.core.windows.net/");
}
public async Async.Task<Uri?> GetFileSasUrl(Container container, string name, StorageType storageType, BlobSasPermissions permissions, TimeSpan? duration = null) {
var client = await FindContainer(container, storageType) ?? throw new Exception($"unable to find container: {container.ContainerName} - {storageType}");
var (accountName, accountKey) = await _storage.GetStorageAccountNameAndKey(client.AccountName);
var (startTime, endTime) = SasTimeWindow(duration ?? TimeSpan.FromDays(30));
var sasBuilder = new BlobSasBuilder(permissions, endTime) {
StartsOn = startTime,
BlobContainerName = container.ContainerName,
BlobName = name
};
var sasUrl = client.GetBlobClient(name).GenerateSasUri(sasBuilder);
return sasUrl;
}
public (DateTimeOffset, DateTimeOffset) SasTimeWindow(TimeSpan timeSpan) {
// SAS URLs are valid 6 hours earlier, primarily to work around dev
// workstations having out-of-sync time. Additionally, SAS URLs are stopped
// 15 minutes later than requested based on "Be careful with SAS start time"
// guidance.
// Ref: https://docs.microsoft.com/en-us/azure/storage/common/storage-sas-overview
var SAS_START_TIME_DELTA = TimeSpan.FromHours(6);
var SAS_END_TIME_DELTA = TimeSpan.FromMinutes(6);
var now = DateTimeOffset.UtcNow;
var start = now - SAS_START_TIME_DELTA;
var expiry = now + timeSpan + SAS_END_TIME_DELTA;
return (start, expiry);
}
public async Async.Task SaveBlob(Container container, string name, string data, StorageType storageType) {
var client = await FindContainer(container, storageType) ?? throw new Exception($"unable to find container: {container.ContainerName} - {storageType}");
await client.UploadBlobAsync(name, new BinaryData(data));
}
//TODO: get this ones on startup and cache (and make this method un-accessible to everyone else)
public async Async.Task<Guid> GetInstanceId() {
var blob = await GetBlob(new Container("base-config"), "instance_id", StorageType.Config);
if (blob == null) {
throw new System.Exception("Blob Not Found");
}
return System.Guid.Parse(blob.ToString());
}
public Uri? GetContainerSasUrlService(
BlobContainerClient client,
BlobSasPermissions permissions,
bool tag = false,
TimeSpan? timeSpan = null) {
var (start, expiry) = SasTimeWindow(timeSpan ?? TimeSpan.FromDays(30.0));
var sasBuilder = new BlobSasBuilder(permissions, expiry) { StartsOn = start };
var sas = client.GenerateSasUri(sasBuilder);
return sas;
}
//TODO: instead of returning null when container not found, convert to return to "Result" type and set appropriate error
public async Async.Task<Uri?> GetContainerSasUrl(Container container, StorageType storageType, BlobSasPermissions permissions) {
var client = await FindContainer(container, storageType);
if (client is null) {
return null;
}
var uri = GetContainerSasUrlService(client, permissions);
if (uri is null) {
//TODO: return result error
return uri;
} else {
return uri;
}
}
}
using Azure;
using Azure.ResourceManager;
using Azure.Storage;
using Azure.Storage.Blobs;
using Azure.Storage.Sas;
namespace Microsoft.OneFuzz.Service;
public interface IContainers {
public Async.Task<BinaryData?> GetBlob(Container container, string name, StorageType storageType);
public Async.Task<BlobContainerClient?> FindContainer(Container container, StorageType storageType);
public Async.Task<Uri?> GetFileSasUrl(Container container, string name, StorageType storageType, BlobSasPermissions permissions, TimeSpan? duration = null);
public Async.Task SaveBlob(Container container, string v1, string v2, StorageType config);
public Async.Task<Guid> GetInstanceId();
public Async.Task<Uri?> GetFileUrl(Container container, string name, StorageType storageType);
public Async.Task<Uri> GetContainerSasUrl(Container container, StorageType storageType, BlobContainerSasPermissions permissions, TimeSpan? duration = null);
public Async.Task<bool> BlobExists(Container container, string name, StorageType storageType);
public Async.Task<Uri> AddContainerSasUrl(Uri uri);
}
public class Containers : IContainers {
private ILogTracer _log;
private IStorage _storage;
private ICreds _creds;
private ArmClient _armClient;
public Containers(ILogTracer log, IStorage storage, ICreds creds) {
_log = log;
_storage = storage;
_creds = creds;
_armClient = creds.ArmClient;
}
public async Async.Task<Uri?> GetFileUrl(Container container, string name, StorageType storageType) {
var client = await FindContainer(container, storageType);
if (client is null)
return null;
return new Uri($"{GetUrl(client.AccountName)}{container}/{name}");
}
public async Async.Task<BinaryData?> GetBlob(Container container, string name, StorageType storageType) {
var client = await FindContainer(container, storageType);
if (client == null) {
return null;
}
try {
return (await client.GetBlobClient(name).DownloadContentAsync())
.Value.Content;
} catch (RequestFailedException) {
return null;
}
}
public async Async.Task<BlobContainerClient?> FindContainer(Container container, StorageType storageType) {
// # check secondary accounts first by searching in reverse.
// #
// # By implementation, the primary account is specified first, followed by
// # any secondary accounts.
// #
// # Secondary accounts, if they exist, are preferred for containers and have
// # increased IOP rates, this should be a slight optimization
var containers = _storage.GetAccounts(storageType)
.Reverse()
.Select(async account => (await GetBlobService(account))?.GetBlobContainerClient(container.ContainerName));
foreach (var c in containers) {
var client = await c;
if (client != null && (await client.ExistsAsync()).Value) {
return client;
}
}
return null;
}
private async Async.Task<BlobServiceClient?> GetBlobService(string accountId) {
_log.Info($"getting blob container (account_id: {accountId}");
var (accountName, accountKey) = await _storage.GetStorageAccountNameAndKey(accountId);
if (accountName == null) {
_log.Error("Failed to get storage account name");
return null;
}
var storageKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
var accountUrl = GetUrl(accountName);
return new BlobServiceClient(accountUrl, storageKeyCredential);
}
private static Uri GetUrl(string accountName) {
return new Uri($"https://{accountName}.blob.core.windows.net/");
}
public async Async.Task<Uri?> GetFileSasUrl(Container container, string name, StorageType storageType, BlobSasPermissions permissions, TimeSpan? duration = null) {
var client = await FindContainer(container, storageType) ?? throw new Exception($"unable to find container: {container.ContainerName} - {storageType}");
var (startTime, endTime) = SasTimeWindow(duration ?? TimeSpan.FromDays(30));
var sasBuilder = new BlobSasBuilder(permissions, endTime) {
StartsOn = startTime,
BlobContainerName = container.ContainerName,
BlobName = name
};
var sasUrl = client.GetBlobClient(name).GenerateSasUri(sasBuilder);
return sasUrl;
}
public (DateTimeOffset, DateTimeOffset) SasTimeWindow(TimeSpan timeSpan) {
// SAS URLs are valid 6 hours earlier, primarily to work around dev
// workstations having out-of-sync time. Additionally, SAS URLs are stopped
// 15 minutes later than requested based on "Be careful with SAS start time"
// guidance.
// Ref: https://docs.microsoft.com/en-us/azure/storage/common/storage-sas-overview
var SAS_START_TIME_DELTA = TimeSpan.FromHours(6);
var SAS_END_TIME_DELTA = TimeSpan.FromMinutes(6);
var now = DateTimeOffset.UtcNow;
var start = now - SAS_START_TIME_DELTA;
var expiry = now + timeSpan + SAS_END_TIME_DELTA;
return (start, expiry);
}
public async Async.Task SaveBlob(Container container, string name, string data, StorageType storageType) {
var client = await FindContainer(container, storageType) ?? throw new Exception($"unable to find container: {container.ContainerName} - {storageType}");
await client.UploadBlobAsync(name, new BinaryData(data));
}
//TODO: get this ones on startup and cache (and make this method un-accessible to everyone else)
public async Async.Task<Guid> GetInstanceId() {
var blob = await GetBlob(new Container("base-config"), "instance_id", StorageType.Config);
if (blob == null) {
throw new System.Exception("Blob Not Found");
}
return System.Guid.Parse(blob.ToString());
}
public Uri? GetContainerSasUrlService(
BlobContainerClient client,
BlobSasPermissions permissions,
bool tag = false,
TimeSpan? timeSpan = null) {
var (start, expiry) = SasTimeWindow(timeSpan ?? TimeSpan.FromDays(30.0));
var sasBuilder = new BlobSasBuilder(permissions, expiry) { StartsOn = start };
var sas = client.GenerateSasUri(sasBuilder);
return sas;
}
public async Async.Task<Uri> AddContainerSasUrl(Uri uri) {
if (uri.Query.Contains("sig")) {
return uri;
}
var accountName = uri.Host.Split('.')[0];
var (_, accountKey) = await _storage.GetStorageAccountNameAndKey(accountName);
var sasBuilder = new BlobSasBuilder(
BlobContainerSasPermissions.Read | BlobContainerSasPermissions.Write | BlobContainerSasPermissions.Delete | BlobContainerSasPermissions.List,
DateTimeOffset.UtcNow + TimeSpan.FromHours(1));
var sas = sasBuilder.ToSasQueryParameters(new StorageSharedKeyCredential(accountName, accountKey)).ToString();
return new UriBuilder(uri) {
Query = sas
}.Uri;
}
public async Async.Task<Uri> GetContainerSasUrl(Container container, StorageType storageType, BlobContainerSasPermissions permissions, TimeSpan? duration = null) {
var client = await FindContainer(container, storageType) ?? throw new Exception($"unable to find container: {container.ContainerName} - {storageType}");
var (accountName, accountKey) = await _storage.GetStorageAccountNameAndKey(client.AccountName);
var (startTime, endTime) = SasTimeWindow(duration ?? TimeSpan.FromDays(30));
var sasBuilder = new BlobSasBuilder(permissions, endTime) {
StartsOn = startTime,
BlobContainerName = container.ContainerName,
};
var sasUrl = client.GenerateSasUri(sasBuilder);
return sasUrl;
}
public async Async.Task<bool> BlobExists(Container container, string name, StorageType storageType) {
var client = await FindContainer(container, storageType) ?? throw new Exception($"unable to find container: {container.ContainerName} - {storageType}");
return await client.GetBlobClient(name).ExistsAsync();
}
}

View File

@ -0,0 +1,559 @@
namespace Microsoft.OneFuzz.Service;
public static class Defs {
public static Dictionary<TaskType, TaskDefinition> TASK_DEFINITIONS = new Dictionary<TaskType, TaskDefinition>() {
{ TaskType.Coverage ,
new TaskDefinition(
Features: new[] {
TaskFeature.TargetExe,
TaskFeature.TargetEnv,
TaskFeature.TargetOptions,
TaskFeature.TargetTimeout,
TaskFeature.CoverageFilter,
TaskFeature.TargetMustUseInput,
},
Vm: new VmDefinition(Compare: Compare.Equal, Value:1),
Containers: new [] {
new ContainerDefinition(
Type:ContainerType.Setup,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.ReadonlyInputs,
Compare: Compare.AtLeast,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Coverage,
Compare: Compare.Equal,
Value:1,
Permissions:
ContainerPermission.List |
ContainerPermission.Read |
ContainerPermission.Write
)},
MonitorQueue: ContainerType.ReadonlyInputs)
},
{ TaskType.GenericAnalysis ,
new TaskDefinition(
Features: new[] {
TaskFeature.TargetExe,
TaskFeature.TargetOptions,
TaskFeature.AnalyzerExe,
TaskFeature.AnalyzerEnv,
TaskFeature.AnalyzerOptions,
},
Vm: new VmDefinition(Compare: Compare.AtLeast, Value:1),
Containers: new [] {
new ContainerDefinition(
Type:ContainerType.Setup,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Analysis,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Write | ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Crashes,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Tools,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
},
MonitorQueue: ContainerType.Crashes)
},
{
TaskType.LibfuzzerFuzz , new TaskDefinition(
Features: new[] {
TaskFeature.TargetExe,
TaskFeature.TargetEnv,
TaskFeature.TargetOptions,
TaskFeature.TargetWorkers,
TaskFeature.EnsembleSyncDelay,
TaskFeature.CheckFuzzerHelp,
TaskFeature.ExpectCrashOnFailure,
},
Vm: new VmDefinition(Compare: Compare.AtLeast, Value: 1),
Containers: new[] {
new ContainerDefinition(
Type:ContainerType.Setup,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Crashes,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Write
),
new ContainerDefinition(
Type: ContainerType.Inputs,
Compare: Compare.Equal,
Value: 1,
Permissions: ContainerPermission.Write | ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type: ContainerType.ReadonlyInputs,
Compare: Compare.AtLeast,
Value: 0,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
}
)},
{
TaskType.LibfuzzerCrashReport , new TaskDefinition(
Features: new[] {
TaskFeature.TargetExe,
TaskFeature.TargetEnv,
TaskFeature.TargetOptions,
TaskFeature.TargetTimeout,
TaskFeature.CheckRetryCount,
TaskFeature.CheckFuzzerHelp,
TaskFeature.MinimizedStackDepth,
},
Vm: new VmDefinition(Compare: Compare.AtLeast, Value: 1),
Containers: new[] {
new ContainerDefinition(
Type:ContainerType.Setup,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Crashes,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Reports,
Compare: Compare.AtMost,
Value:1,
Permissions: ContainerPermission.Write
),
new ContainerDefinition(
Type: ContainerType.UniqueReports,
Compare: Compare.AtMost,
Value: 1,
Permissions: ContainerPermission.Write
),
new ContainerDefinition(
Type: ContainerType.NoRepro,
Compare: Compare.AtMost,
Value: 1,
Permissions: ContainerPermission.Write
),
},
MonitorQueue: ContainerType.Crashes
)
},
{
TaskType.LibfuzzerCoverage , new TaskDefinition(
Features: new[] {
TaskFeature.TargetExe,
TaskFeature.TargetEnv,
TaskFeature.TargetOptions,
TaskFeature.CheckFuzzerHelp,
},
Vm: new VmDefinition(Compare: Compare.Equal, Value: 1),
Containers: new[] {
new ContainerDefinition(
Type:ContainerType.Setup,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.ReadonlyInputs,
Compare: Compare.AtLeast,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Coverage,
Compare: Compare.Equal,
Value:1,
Permissions:
ContainerPermission.List |
ContainerPermission.Read |
ContainerPermission.Write
)},
MonitorQueue: ContainerType.ReadonlyInputs
)},
{
TaskType.LibfuzzerMerge , new TaskDefinition(
Features: new[] {
TaskFeature.TargetExe,
TaskFeature.TargetEnv,
TaskFeature.TargetOptions,
TaskFeature.CheckFuzzerHelp,
},
Vm: new VmDefinition(Compare: Compare.Equal, Value: 1),
Containers: new[] {
new ContainerDefinition(
Type:ContainerType.Setup,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.UniqueReports,
Compare: Compare.Equal,
Value:1,
Permissions:
ContainerPermission.List |
ContainerPermission.Read |
ContainerPermission.Write
),
new ContainerDefinition(
Type: ContainerType.Inputs,
Compare: Compare.AtLeast,
Value: 0,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
}
)
},
{
TaskType.GenericSupervisor , new TaskDefinition(
Features: new[] {
TaskFeature.TargetExe,
TaskFeature.TargetOptions,
TaskFeature.SupervisorExe,
TaskFeature.SupervisorEnv,
TaskFeature.SupervisorOptions,
TaskFeature.SupervisorInputMarker,
TaskFeature.WaitForFiles,
TaskFeature.StatsFile,
TaskFeature.EnsembleSyncDelay,
},
Vm: new VmDefinition(Compare: Compare.AtLeast, Value: 1),
Containers: new[] {
new ContainerDefinition(
Type:ContainerType.Setup,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Tools,
Compare: Compare.AtMost,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Crashes,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Write
),
new ContainerDefinition(
Type: ContainerType.Inputs,
Compare: Compare.Equal,
Value: 1,
Permissions: ContainerPermission.Write | ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type: ContainerType.UniqueReports,
Compare: Compare.AtMost,
Value: 1,
Permissions: ContainerPermission.Write | ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type: ContainerType.Reports,
Compare: Compare.AtMost,
Value: 1,
Permissions: ContainerPermission.Write | ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type: ContainerType.NoRepro,
Compare: Compare.AtMost,
Value: 1,
Permissions: ContainerPermission.Write | ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type: ContainerType.Coverage,
Compare: Compare.AtMost,
Value: 1,
Permissions: ContainerPermission.Write | ContainerPermission.Read | ContainerPermission.List
),
}
)
},
{
TaskType.GenericMerge , new TaskDefinition(
Features: new[] {
TaskFeature.TargetExe,
TaskFeature.TargetOptions,
TaskFeature.SupervisorExe,
TaskFeature.SupervisorEnv,
TaskFeature.SupervisorOptions,
TaskFeature.SupervisorInputMarker,
TaskFeature.StatsFile,
TaskFeature.PreserveExistingOutputs,
},
Vm: new VmDefinition(Compare: Compare.AtLeast, Value: 1),
Containers: new[] {
new ContainerDefinition(
Type:ContainerType.Setup,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Tools,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.ReadonlyInputs,
Compare: Compare.AtLeast,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Inputs,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Write| ContainerPermission.List
),
}
)
},
{
TaskType.GenericGenerator , new TaskDefinition(
Features: new[] {
TaskFeature.GeneratorExe,
TaskFeature.GeneratorEnv,
TaskFeature.GeneratorOptions,
TaskFeature.TargetExe,
TaskFeature.TargetEnv,
TaskFeature.TargetOptions,
TaskFeature.RenameOutput,
TaskFeature.TargetTimeout,
TaskFeature.CheckAsanLog,
TaskFeature.CheckDebugger,
TaskFeature.CheckRetryCount,
TaskFeature.EnsembleSyncDelay,
TaskFeature.TargetMustUseInput,
},
Vm: new VmDefinition(Compare: Compare.AtLeast, Value: 1),
Containers: new[] {
new ContainerDefinition(
Type:ContainerType.Setup,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Tools,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Crashes,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Write
),
new ContainerDefinition(
Type:ContainerType.ReadonlyInputs,
Compare: Compare.AtLeast,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
}
)
},
{
TaskType.GenericCrashReport , new TaskDefinition(
Features: new[] {
TaskFeature.TargetExe,
TaskFeature.TargetEnv,
TaskFeature.TargetOptions,
TaskFeature.TargetTimeout,
TaskFeature.CheckAsanLog,
TaskFeature.CheckDebugger,
TaskFeature.CheckRetryCount,
TaskFeature.MinimizedStackDepth,
},
Vm: new VmDefinition(Compare: Compare.AtLeast, Value: 1),
Containers: new[] {
new ContainerDefinition(
Type:ContainerType.Setup,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Crashes,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Reports,
Compare: Compare.AtMost,
Value:1,
Permissions: ContainerPermission.Write
),
new ContainerDefinition(
Type:ContainerType.UniqueReports,
Compare: Compare.AtMost,
Value:1,
Permissions: ContainerPermission.Write
),
new ContainerDefinition(
Type:ContainerType.NoRepro,
Compare: Compare.AtMost,
Value:1,
Permissions: ContainerPermission.Write
),
}
)
},
{
TaskType.GenericRegression , new TaskDefinition(
Features: new[] {
TaskFeature.TargetExe,
TaskFeature.TargetEnv,
TaskFeature.TargetOptions,
TaskFeature.TargetTimeout,
TaskFeature.CheckAsanLog,
TaskFeature.CheckDebugger,
TaskFeature.CheckRetryCount,
TaskFeature.ReportList,
TaskFeature.MinimizedStackDepth,
},
Vm: new VmDefinition(Compare: Compare.AtLeast, Value: 1),
Containers: new[] {
new ContainerDefinition(
Type:ContainerType.Setup,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.RegressionReports,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Write | ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Crashes,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Reports,
Compare: Compare.AtMost,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.UniqueReports,
Compare: Compare.AtMost,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.NoRepro,
Compare: Compare.AtMost,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.ReadonlyInputs,
Compare: Compare.AtMost,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
})
},
{
TaskType.LibfuzzerRegression , new TaskDefinition(
Features: new[] {
TaskFeature.TargetExe,
TaskFeature.TargetEnv,
TaskFeature.TargetOptions,
TaskFeature.TargetTimeout,
TaskFeature.CheckFuzzerHelp,
TaskFeature.CheckRetryCount,
TaskFeature.ReportList,
TaskFeature.MinimizedStackDepth,
},
Vm: new VmDefinition(Compare: Compare.AtLeast, Value: 1),
Containers: new[] {
new ContainerDefinition(
Type:ContainerType.Setup,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.RegressionReports,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Write | ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Crashes,
Compare: Compare.Equal,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.UniqueReports,
Compare: Compare.AtMost,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.Reports,
Compare: Compare.AtMost,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.NoRepro,
Compare: Compare.AtMost,
Value:1,
Permissions: ContainerPermission.Read | ContainerPermission.List
),
new ContainerDefinition(
Type:ContainerType.ReadonlyInputs,
Compare: Compare.AtMost,
Value:1,
Permissions:
ContainerPermission.Read | ContainerPermission.List
),
})
} };
}

View File

@ -239,8 +239,8 @@ public class Extensions : IExtensions {
}
public async Async.Task UpdateManagedScripts() {
var instanceSpecificSetupSas = _containers.GetContainerSasUrl(new Container("instance-specific-setup"), StorageType.Config, BlobSasPermissions.List | BlobSasPermissions.Read);
var toolsSas = _containers.GetContainerSasUrl(new Container("tools"), StorageType.Config, BlobSasPermissions.List | BlobSasPermissions.Read);
var instanceSpecificSetupSas = _containers.GetContainerSasUrl(new Container("instance-specific-setup"), StorageType.Config, BlobContainerSasPermissions.List | BlobContainerSasPermissions.Read);
var toolsSas = _containers.GetContainerSasUrl(new Container("tools"), StorageType.Config, BlobContainerSasPermissions.List | BlobContainerSasPermissions.Read);
string[] commands = {
$"azcopy sync '{instanceSpecificSetupSas}' instance-specific-setup",

View File

@ -1,18 +1,21 @@
using ApiService.OneFuzzLib.Orm;
using System.Threading.Tasks;
using ApiService.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service;
public interface IPoolOperations {
public Async.Task<Result<Pool, Error>> GetByName(string poolName);
Task<bool> ScheduleWorkset(Pool pool, WorkSet workSet);
}
public class PoolOperations : StatefulOrm<Pool, PoolState>, IPoolOperations {
private IConfigOperations _configOperations;
private readonly IQueue _queue;
public PoolOperations(IStorage storage, ILogTracer log, IServiceConfig config, IConfigOperations configOperations)
public PoolOperations(IStorage storage, ILogTracer log, IServiceConfig config, IConfigOperations configOperations, IQueue queue)
: base(storage, log, config) {
_configOperations = configOperations;
_queue = queue;
}
public async Async.Task<Result<Pool, Error>> GetByName(string poolName) {
@ -28,4 +31,16 @@ public class PoolOperations : StatefulOrm<Pool, PoolState>, IPoolOperations {
return new Result<Pool, Error>(await pools.SingleAsync());
}
public async Task<bool> ScheduleWorkset(Pool pool, WorkSet workSet) {
if (pool.State == PoolState.Shutdown || pool.State == PoolState.Halt) {
return false;
}
return await _queue.QueueObject(GetPoolQueue(pool), workSet, StorageType.Corpus);
}
private string GetPoolQueue(Pool pool) {
return $"pool-{pool.PoolId.ToString("N")}";
}
}

View File

@ -8,7 +8,7 @@ using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service;
public interface IQueue {
Async.Task SendMessage(string name, byte[] message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null);
Async.Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout);
Async.Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null);
Async.Task<Uri?> GetQueueSas(string name, StorageType storageType, QueueSasPermissions permissions, TimeSpan? duration = null);
}
@ -28,20 +28,13 @@ public class Queue : IQueue {
public async Async.Task SendMessage(string name, byte[] message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) {
var queue = await GetQueue(name, storageType);
if (queue != null) {
try {
await queue.SendMessageAsync(Convert.ToBase64String(message), visibilityTimeout: visibilityTimeout, timeToLive: timeToLive);
} catch (Exception) {
}
await queue.SendMessageAsync(Convert.ToBase64String(message), visibilityTimeout: visibilityTimeout, timeToLive: timeToLive);
}
}
public async Task<QueueClient?> GetQueue(string name, StorageType storageType) {
var client = await GetQueueClient(storageType);
try {
return client.GetQueueClient(name);
} catch (Exception) {
return null;
}
return client.GetQueueClient(name);
}
@ -59,13 +52,8 @@ public class Queue : IQueue {
var serialized = JsonSerializer.Serialize(obj, EntityConverter.GetJsonSerializerOptions());
//var encoded = Encoding.UTF8.GetBytes(serialized);
try {
await queue.SendMessageAsync(serialized, visibilityTimeout: visibilityTimeout);
return true;
} catch (Exception) {
return false;
}
var response = await queue.SendMessageAsync(serialized, visibilityTimeout: visibilityTimeout);
return !response.GetRawResponse().IsError;
}
public async Task<Uri?> GetQueueSas(string name, StorageType storageType, QueueSasPermissions permissions, TimeSpan? duration) {

View File

@ -1,4 +1,6 @@
namespace Microsoft.OneFuzz.Service;
using Azure.Storage.Sas;
namespace Microsoft.OneFuzz.Service;
public interface IScheduler {
@ -8,13 +10,21 @@ public interface IScheduler {
public class Scheduler : IScheduler {
private readonly ITaskOperations _taskOperations;
private readonly IConfig _config;
private readonly IPoolOperations _poolOperations;
private readonly ILogTracer _logTracer;
private readonly IJobOperations _jobOperations;
private readonly IContainers _containers;
// TODO: eventually, this should be tied to the pool.
const int MAX_TASKS_PER_SET = 10;
public Scheduler(ITaskOperations taskOperations, IConfig config) {
public Scheduler(ITaskOperations taskOperations, IConfig config, IPoolOperations poolOperations, ILogTracer logTracer, IJobOperations jobOperations, IContainers containers) {
_taskOperations = taskOperations;
_config = config;
_poolOperations = poolOperations;
_logTracer = logTracer;
_jobOperations = jobOperations;
_containers = containers;
}
public async Async.Task ScheduleTasks() {
@ -25,15 +35,151 @@ public class Scheduler : IScheduler {
foreach (var bucketedTasks in buckets) {
foreach (var chunks in bucketedTasks.Chunk(MAX_TASKS_PER_SET)) {
var result = BuildWorkSet(chunks);
var result = await BuildWorkSet(chunks);
if (result == null) {
continue;
}
var (bucketConfig, workSet) = result.Value;
if (await ScheduleWorkset(workSet, bucketConfig.pool, bucketConfig.count)) {
foreach (var workUnit in workSet.WorkUnits) {
var task1 = tasks[workUnit.TaskId];
Task task = await _taskOperations.SetState(task1, TaskState.Scheduled);
seen.Add(task.TaskId);
}
}
}
}
throw new NotImplementedException();
var notReadyCount = tasks.Count - seen.Count;
if (notReadyCount > 0) {
_logTracer.Info($"tasks not ready {notReadyCount}");
}
}
private object BuildWorkSet(Task[] chunks) {
throw new NotImplementedException();
private async System.Threading.Tasks.Task<bool> ScheduleWorkset(WorkSet workSet, Pool pool, int count) {
if (!PoolStateHelper.Available().Contains(pool.State)) {
_logTracer.Info($"pool not available for work: {pool.Name} state: {pool.State}");
return false;
}
for (var i = 0; i < count; i++) {
if (!await _poolOperations.ScheduleWorkset(pool, workSet)) {
_logTracer.Error($"unable to schedule workset. pool:{pool.Name} workset: {workSet}");
return false;
}
}
return true;
}
private async Async.Task<(BucketConfig, WorkSet)?> BuildWorkSet(Task[] tasks) {
var taskIds = tasks.Select(x => x.TaskId).ToHashSet();
var work_units = new List<WorkUnit>();
BucketConfig? bucketConfig = null;
foreach (var task in tasks) {
if ((task.Config.PrereqTasks?.Count ?? 0) > 0) {
// if all of the prereqs are in this bucket, they will be
// scheduled together
if (!taskIds.IsSupersetOf(task.Config.PrereqTasks!)) {
if (!(await _taskOperations.CheckPrereqTasks(task))) {
continue;
}
}
}
var result = await BuildWorkunit(task);
if (result == null) {
continue;
}
if (bucketConfig == null) {
bucketConfig = result.Value.Item1;
} else if (bucketConfig != result.Value.Item1) {
throw new Exception($"bucket configs differ: {bucketConfig} VS {result.Value.Item1}");
}
work_units.Add(result.Value.Item2);
}
if (bucketConfig != null) {
var setupUrl = await _containers.GetContainerSasUrl(bucketConfig.setupContainer, StorageType.Corpus, BlobContainerSasPermissions.Read | BlobContainerSasPermissions.List) ?? throw new Exception("container not found");
var workSet = new WorkSet(
Reboot: bucketConfig.reboot,
Script: bucketConfig.setupScript != null,
SetupUrl: setupUrl,
WorkUnits: work_units
);
return (bucketConfig, workSet);
}
return null;
}
record BucketConfig(int count, bool reboot, Container setupContainer, string? setupScript, Pool pool);
private async System.Threading.Tasks.Task<(BucketConfig, WorkUnit)?> BuildWorkunit(Task task) {
Pool? pool = await _taskOperations.GetPool(task);
if (pool == null) {
_logTracer.Info($"unable to find pool for task: {task.TaskId}");
return null;
}
_logTracer.Info($"scheduling task: {task.TaskId}");
var job = await _jobOperations.Get(task.JobId);
if (job == null) {
throw new Exception($"invalid job_id {task.JobId} for task {task.TaskId}");
}
var taskConfig = await _config.BuildTaskConfig(job, task);
var setupContainer = task.Config.Containers?.FirstOrDefault(c => c.Type == ContainerType.Setup) ?? throw new Exception($"task missing setup container: task_type = {task.Config.Task.Type}");
var setupPs1Exist = _containers.BlobExists(setupContainer.Name, "setup.ps1", StorageType.Corpus);
var setupShExist = _containers.BlobExists(setupContainer.Name, "setup.sh", StorageType.Corpus);
string? setupScript = null;
if (task.Os == Os.Windows && await setupPs1Exist) {
setupScript = "setup.ps1";
}
if (task.Os == Os.Linux && await setupShExist) {
setupScript = "setup.sh";
}
var reboot = false;
var count = 1;
if (task.Config.Pool != null) {
count = task.Config.Pool.Count;
reboot = task.Config.Task.RebootAfterSetup ?? false;
} else if (task.Config.Vm != null) {
count = task.Config.Vm.Count;
reboot = (task.Config.Vm.RebootAfterSetup ?? false) || (task.Config.Task.RebootAfterSetup ?? false);
} else {
throw new Exception();
}
var workUnit = new WorkUnit(
JobId: taskConfig.JobId,
TaskId: taskConfig.TaskId,
TaskType: taskConfig.TaskType,
// todo: make sure that we exclude nulls when serializing
// config = task_config.json(exclude_none = True, exclude_unset = True),
Config: taskConfig);
var bucketConfig = new BucketConfig(
count,
reboot,
setupContainer.Name,
setupScript,
pool);
return (bucketConfig, workUnit);
}
record struct BucketId(Os os, Guid jobId, (string, string)? vm, string? pool, string setupContainer, bool? reboot, Guid? unique);
@ -75,19 +221,3 @@ public class Scheduler : IScheduler {
}
public interface IConfig {
string GetSetupContainer(TaskConfig config);
}
public class Config : IConfig {
public string GetSetupContainer(TaskConfig config) {
foreach (var container in config.Containers ?? throw new Exception("Missing containers")) {
if (container.Type == ContainerType.Setup) {
return container.Name.ContainerName;
}
}
throw new Exception($"task missing setup container: task_type = {config.Task.Type}");
}
}

View File

@ -15,7 +15,9 @@ public interface ITaskOperations : IStatefulOrm<Task, TaskState> {
IAsyncEnumerable<Task> SearchExpired();
Async.Task MarkStopping(Task task);
Async.Task<TaskVm?> GetReproVmConfig(Task task);
Async.Task<bool> CheckPrereqTasks(Task task);
System.Threading.Tasks.Task<Pool?> GetPool(Task task);
System.Threading.Tasks.Task<Task> SetState(Task task, TaskState state);
}
public class TaskOperations : StatefulOrm<Task, TaskState>, ITaskOperations {
@ -117,7 +119,7 @@ public class TaskOperations : StatefulOrm<Task, TaskState>, ITaskOperations {
}
}
private async Async.Task<Task> SetState(Task task, TaskState state) {
public async Async.Task<Task> SetState(Task task, TaskState state) {
if (task.State == state) {
return task;
}
@ -199,5 +201,55 @@ public class TaskOperations : StatefulOrm<Task, TaskState>, ITaskOperations {
return new TaskVm(scaleset.Region, scaleset.VmSku, scaleset.Image, null);
}
public async Async.Task<bool> CheckPrereqTasks(Task task) {
if (task.Config.PrereqTasks != null) {
foreach (var taskId in task.Config.PrereqTasks) {
var t = await GetByTaskId(taskId);
// if a prereq task fails, then mark this task as failed
if (t == null) {
await MarkFailed(task, new Error(ErrorCode.INVALID_REQUEST, Errors: new[] { "unable to find task" }));
return false;
}
if (!TaskStateHelper.HasStarted().Contains(t.State)) {
return false;
}
}
}
return true;
}
public async System.Threading.Tasks.Task<Pool?> GetPool(Task task) {
if (task.Config.Pool != null) {
var pool = await _poolOperations.GetByName(task.Config.Pool.PoolName);
if (!pool.IsOk) {
_logTracer.Info(
$"unable to schedule task to pool: {task.TaskId} - {pool.ErrorV}"
);
return null;
}
return pool.OkV;
} else if (task.Config.Vm != null) {
var scalesets = _scalesetOperations.Search().Where(s => s.VmSku == task.Config.Vm.Sku && s.Image == task.Config.Vm.Image);
await foreach (var scaleset in scalesets) {
if (task.Config.Pool == null) {
continue;
}
var pool = await _poolOperations.GetByName(task.Config.Pool.PoolName);
if (!pool.IsOk) {
_logTracer.Info(
$"unable to schedule task to pool: {task.TaskId} - {pool.ErrorV}"
);
return null;
}
return pool.OkV;
}
}
_logTracer.Warning($"unable to find a scaleset that matches the task prereqs: {task.TaskId}");
return null;
}
}