mirror of
https://github.com/microsoft/onefuzz.git
synced 2025-06-14 11:08:06 +00:00
@ -8,12 +8,14 @@
|
||||
<WarningLevel>5</WarningLevel>
|
||||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Azure.ResourceManager.Monitor" Version="1.0.0-beta.2" />
|
||||
<PackageReference Include="Faithlife.Utility" Version="0.12.2" />
|
||||
<PackageReference Include="Azure.Security.KeyVault.Secrets" Version="4.3.0" />
|
||||
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Storage" Version="5.0.0" />
|
||||
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.EventGrid" Version="2.1.0" />
|
||||
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.0.13" />
|
||||
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Timer" Version="4.1.0" />
|
||||
<PackageReference Include="Microsoft.Azure.Management.OperationalInsights" Version="0.24.0-preview" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.ApplicationInsights" Version="2.20.0" />
|
||||
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.3.0" OutputItemType="Analyzer" />
|
||||
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.6.0" />
|
||||
|
@ -295,3 +295,10 @@ public static class PoolStateHelper {
|
||||
public enum Architecture {
|
||||
x86_64
|
||||
}
|
||||
|
||||
|
||||
public enum AgentMode {
|
||||
Fuzz,
|
||||
Repro,
|
||||
Proxy
|
||||
}
|
||||
|
@ -57,6 +57,9 @@ public abstract record BaseEvent() {
|
||||
EventTaskFailed _ => EventType.TaskFailed,
|
||||
EventTaskStopped _ => EventType.TaskStopped,
|
||||
EventTaskStateUpdated _ => EventType.TaskStateUpdated,
|
||||
EventScalesetFailed _ => EventType.ScalesetFailed,
|
||||
EventScalesetResizeScheduled _ => EventType.ScalesetResizeScheduled,
|
||||
EventScalesetStateUpdated _ => EventType.ScalesetStateUpdated,
|
||||
_ => throw new NotImplementedException(),
|
||||
};
|
||||
|
||||
@ -166,11 +169,11 @@ public record EventPing(
|
||||
// int Size) : BaseEvent();
|
||||
|
||||
|
||||
//record EventScalesetFailed(
|
||||
// Guid ScalesetId,
|
||||
// PoolName: PoolName,
|
||||
// Error: Error
|
||||
//): BaseEvent();
|
||||
public record EventScalesetFailed(
|
||||
Guid ScalesetId,
|
||||
PoolName PoolName,
|
||||
Error Error
|
||||
) : BaseEvent();
|
||||
|
||||
|
||||
//record EventScalesetDeleted(
|
||||
@ -180,11 +183,11 @@ public record EventPing(
|
||||
// ) : BaseEvent();
|
||||
|
||||
|
||||
//record EventScalesetResizeScheduled(
|
||||
// Guid ScalesetId,
|
||||
// PoolName PoolName,
|
||||
// int size
|
||||
// ) : BaseEvent();
|
||||
public record EventScalesetResizeScheduled(
|
||||
Guid ScalesetId,
|
||||
PoolName PoolName,
|
||||
int size
|
||||
) : BaseEvent();
|
||||
|
||||
|
||||
//record EventPoolDeleted(
|
||||
@ -249,11 +252,11 @@ public record EventNodeHeartbeat(
|
||||
// ) : BaseEvent();
|
||||
|
||||
|
||||
// record EventScalesetStateUpdated(
|
||||
// Guid ScalesetId,
|
||||
// PoolName PoolName,
|
||||
// ScalesetState State
|
||||
// ) : BaseEvent();
|
||||
public record EventScalesetStateUpdated(
|
||||
Guid ScalesetId,
|
||||
PoolName PoolName,
|
||||
ScalesetState State
|
||||
) : BaseEvent();
|
||||
|
||||
// record EventNodeStateUpdated(
|
||||
// Guid MachineId,
|
||||
|
@ -386,6 +386,7 @@ public record Scaleset(
|
||||
bool SpotInstance,
|
||||
bool EphemeralOsDisks,
|
||||
bool NeedsConfigUpdate,
|
||||
Error? Error,
|
||||
List<ScalesetNodeState> Nodes,
|
||||
Guid? ClientId,
|
||||
Guid? ClientObjectId,
|
||||
@ -513,8 +514,26 @@ public record Pool(
|
||||
) : StatefulEntityBase<PoolState>(State);
|
||||
|
||||
|
||||
// TODO
|
||||
public record AgentConfig();
|
||||
public record ClientCredentials
|
||||
(
|
||||
Guid ClientId,
|
||||
string ClientSecret
|
||||
);
|
||||
|
||||
|
||||
public record AgentConfig(
|
||||
ClientCredentials? ClientCredentials,
|
||||
[property: JsonPropertyName("onefuzz_url")] Uri OneFuzzUrl,
|
||||
PoolName PoolName,
|
||||
Uri? HeartbeatQueue,
|
||||
string? InstanceTelemetryKey,
|
||||
string? MicrosoftTelemetryKey,
|
||||
string? MultiTenantDomain,
|
||||
Guid InstanceId
|
||||
);
|
||||
|
||||
|
||||
|
||||
public record WorkSetSummary();
|
||||
public record ScalesetSummary();
|
||||
|
||||
|
@ -1,8 +1,11 @@
|
||||
// to avoid collision with Task in model.cs
|
||||
global using System;
|
||||
global using System.Collections.Generic;
|
||||
global using System.Linq;
|
||||
global using Async = System.Threading.Tasks;
|
||||
global
|
||||
using System.Collections.Generic;
|
||||
global
|
||||
using System.Linq;
|
||||
global
|
||||
using Async = System.Threading.Tasks;
|
||||
using Microsoft.Azure.Functions.Worker;
|
||||
using Microsoft.Azure.Functions.Worker.Middleware;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
@ -81,6 +84,7 @@ public class Program {
|
||||
.AddScoped<IJobOperations, JobOperations>()
|
||||
.AddScoped<IScheduler, Scheduler>()
|
||||
.AddScoped<IConfig, Config>()
|
||||
.AddScoped<ILogAnalytics, LogAnalytics>()
|
||||
|
||||
//Move out expensive resources into separate class, and add those as Singleton
|
||||
// ArmClient, Table Client(s), Queue Client(s), HttpClient, etc.\
|
||||
|
@ -1,35 +1,50 @@
|
||||
using System.Threading.Tasks;
|
||||
using Azure;
|
||||
using Azure;
|
||||
using Azure.ResourceManager;
|
||||
using Azure.Storage;
|
||||
using Azure.Storage.Blobs;
|
||||
using Azure.Storage.Sas;
|
||||
|
||||
|
||||
namespace Microsoft.OneFuzz.Service;
|
||||
|
||||
|
||||
public interface IContainers {
|
||||
public Task<BinaryData?> GetBlob(Container container, string name, StorageType storageType);
|
||||
public Async.Task<BinaryData?> GetBlob(Container container, string name, StorageType storageType);
|
||||
|
||||
public Async.Task<BlobContainerClient?> FindContainer(Container container, StorageType storageType);
|
||||
|
||||
public Async.Task<Uri?> GetFileSasUrl(Container container, string name, StorageType storageType, BlobSasPermissions permissions, TimeSpan? duration = null);
|
||||
Async.Task saveBlob(Container container, string v1, string v2, StorageType config);
|
||||
Task<Guid> GetInstanceId();
|
||||
public Async.Task SaveBlob(Container container, string v1, string v2, StorageType config);
|
||||
public Async.Task<Guid> GetInstanceId();
|
||||
|
||||
public Async.Task<Uri?> GetFileUrl(Container container, string name, StorageType storageType);
|
||||
|
||||
public Async.Task<Uri?> GetContainerSasUrl(Container container, StorageType storageType, BlobSasPermissions permissions);
|
||||
}
|
||||
|
||||
|
||||
public class Containers : IContainers {
|
||||
private ILogTracer _log;
|
||||
private IStorage _storage;
|
||||
private ICreds _creds;
|
||||
private ArmClient _armClient;
|
||||
|
||||
public Containers(ILogTracer log, IStorage storage, ICreds creds) {
|
||||
_log = log;
|
||||
_storage = storage;
|
||||
_creds = creds;
|
||||
_armClient = creds.ArmClient;
|
||||
}
|
||||
public async Task<BinaryData?> GetBlob(Container container, string name, StorageType storageType) {
|
||||
|
||||
public async Async.Task<Uri?> GetFileUrl(Container container, string name, StorageType storageType) {
|
||||
var client = await FindContainer(container, storageType);
|
||||
if (client is null)
|
||||
return null;
|
||||
|
||||
return new Uri($"{GetUrl(client.AccountName)}{container}/{name}");
|
||||
}
|
||||
|
||||
public async Async.Task<BinaryData?> GetBlob(Container container, string name, StorageType storageType) {
|
||||
var client = await FindContainer(container, storageType);
|
||||
|
||||
if (client == null) {
|
||||
@ -52,17 +67,23 @@ public class Containers : IContainers {
|
||||
// #
|
||||
// # Secondary accounts, if they exist, are preferred for containers and have
|
||||
// # increased IOP rates, this should be a slight optimization
|
||||
return await _storage.GetAccounts(storageType)
|
||||
|
||||
var containers = _storage.GetAccounts(storageType)
|
||||
.Reverse()
|
||||
.Select(account => GetBlobService(account)?.GetBlobContainerClient(container.ContainerName))
|
||||
.ToAsyncEnumerable()
|
||||
.WhereAwait(async client => client != null && (await client.ExistsAsync()).Value)
|
||||
.FirstOrDefaultAsync();
|
||||
.Select(async account => (await GetBlobService(account))?.GetBlobContainerClient(container.ContainerName));
|
||||
|
||||
foreach (var c in containers) {
|
||||
var client = await c;
|
||||
if (client != null && (await client.ExistsAsync()).Value) {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private BlobServiceClient? GetBlobService(string accountId) {
|
||||
private async Async.Task<BlobServiceClient?> GetBlobService(string accountId) {
|
||||
_log.Info($"getting blob container (account_id: {accountId}");
|
||||
var (accountName, accountKey) = _storage.GetStorageAccountNameAndKey(accountId);
|
||||
var (accountName, accountKey) = await _storage.GetStorageAccountNameAndKey(accountId);
|
||||
if (accountName == null) {
|
||||
_log.Error("Failed to get storage account name");
|
||||
return null;
|
||||
@ -78,7 +99,7 @@ public class Containers : IContainers {
|
||||
|
||||
public async Async.Task<Uri?> GetFileSasUrl(Container container, string name, StorageType storageType, BlobSasPermissions permissions, TimeSpan? duration = null) {
|
||||
var client = await FindContainer(container, storageType) ?? throw new Exception($"unable to find container: {container.ContainerName} - {storageType}");
|
||||
var (accountName, accountKey) = _storage.GetStorageAccountNameAndKey(client.AccountName);
|
||||
var (accountName, accountKey) = await _storage.GetStorageAccountNameAndKey(client.AccountName);
|
||||
|
||||
var (startTime, endTime) = SasTimeWindow(duration ?? TimeSpan.FromDays(30));
|
||||
|
||||
@ -108,12 +129,12 @@ public class Containers : IContainers {
|
||||
return (start, expiry);
|
||||
}
|
||||
|
||||
public async System.Threading.Tasks.Task saveBlob(Container container, string name, string data, StorageType storageType) {
|
||||
public async Async.Task SaveBlob(Container container, string name, string data, StorageType storageType) {
|
||||
var client = await FindContainer(container, storageType) ?? throw new Exception($"unable to find container: {container.ContainerName} - {storageType}");
|
||||
|
||||
await client.UploadBlobAsync(name, new BinaryData(data));
|
||||
}
|
||||
|
||||
//TODO: get this ones on startup and cache (and make this method un-accessible to everyone else)
|
||||
public async Async.Task<Guid> GetInstanceId() {
|
||||
var blob = await GetBlob(new Container("base-config"), "instance_id", StorageType.Config);
|
||||
if (blob == null) {
|
||||
@ -121,4 +142,35 @@ public class Containers : IContainers {
|
||||
}
|
||||
return System.Guid.Parse(blob.ToString());
|
||||
}
|
||||
|
||||
public Uri? GetContainerSasUrlService(
|
||||
BlobContainerClient client,
|
||||
BlobSasPermissions permissions,
|
||||
bool tag = false,
|
||||
TimeSpan? timeSpan = null) {
|
||||
var (start, expiry) = SasTimeWindow(timeSpan ?? TimeSpan.FromDays(30.0));
|
||||
var sasBuilder = new BlobSasBuilder(permissions, expiry) { StartsOn = start };
|
||||
var sas = client.GenerateSasUri(sasBuilder);
|
||||
return sas;
|
||||
}
|
||||
|
||||
|
||||
//TODO: instead of returning null when container not found, convert to return to "Result" type and set appropriate error
|
||||
public async Async.Task<Uri?> GetContainerSasUrl(Container container, StorageType storageType, BlobSasPermissions permissions) {
|
||||
var client = await FindContainer(container, storageType);
|
||||
|
||||
if (client is null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
var uri = GetContainerSasUrlService(client, permissions);
|
||||
|
||||
if (uri is null) {
|
||||
//TODO: return result error
|
||||
return uri;
|
||||
} else {
|
||||
return uri;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -8,7 +8,7 @@ namespace Microsoft.OneFuzz.Service;
|
||||
public interface ICreds {
|
||||
public DefaultAzureCredential GetIdentity();
|
||||
|
||||
public string GetSubcription();
|
||||
public string GetSubscription();
|
||||
|
||||
public string GetBaseResourceGroup();
|
||||
|
||||
@ -21,6 +21,8 @@ public interface ICreds {
|
||||
public ResourceGroupResource GetResourceGroupResource();
|
||||
|
||||
public string GetBaseRegion();
|
||||
|
||||
public Uri GetInstanceUrl();
|
||||
}
|
||||
|
||||
public class Creds : ICreds {
|
||||
@ -33,14 +35,14 @@ public class Creds : ICreds {
|
||||
public Creds(IServiceConfig config) {
|
||||
_config = config;
|
||||
_azureCredential = new DefaultAzureCredential();
|
||||
_armClient = new ArmClient(this.GetIdentity(), this.GetSubcription());
|
||||
_armClient = new ArmClient(this.GetIdentity(), this.GetSubscription());
|
||||
}
|
||||
|
||||
public DefaultAzureCredential GetIdentity() {
|
||||
return _azureCredential;
|
||||
}
|
||||
|
||||
public string GetSubcription() {
|
||||
public string GetSubscription() {
|
||||
var storageResourceId = _config.OneFuzzDataStorage
|
||||
?? throw new System.Exception("Data storage env var is not present");
|
||||
var storageResource = new ResourceIdentifier(storageResourceId);
|
||||
@ -75,4 +77,8 @@ public class Creds : ICreds {
|
||||
public string GetBaseRegion() {
|
||||
return ArmClient.GetResourceGroupResource(GetResourceGroupResourceIdentifier()).Data.Location.Name;
|
||||
}
|
||||
|
||||
public Uri GetInstanceUrl() {
|
||||
return new Uri($"https://{GetInstanceName()}.azurewebsites.net");
|
||||
}
|
||||
}
|
||||
|
@ -32,11 +32,11 @@ public class IpOperations : IIpOperations {
|
||||
return await _creds.GetResourceGroupResource().GetPublicIPAddressAsync(name);
|
||||
}
|
||||
|
||||
public System.Threading.Tasks.Task DeleteNic(string resourceGroup, string name) {
|
||||
public Async.Task DeleteNic(string resourceGroup, string name) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public System.Threading.Tasks.Task DeleteIp(string resourceGroup, string name) {
|
||||
public Async.Task DeleteIp(string resourceGroup, string name) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
|
@ -3,12 +3,12 @@
|
||||
namespace Microsoft.OneFuzz.Service;
|
||||
|
||||
public interface IJobOperations : IStatefulOrm<Job, JobState> {
|
||||
System.Threading.Tasks.Task<Job?> Get(Guid jobId);
|
||||
System.Threading.Tasks.Task OnStart(Job job);
|
||||
Async.Task<Job?> Get(Guid jobId);
|
||||
Async.Task OnStart(Job job);
|
||||
IAsyncEnumerable<Job> SearchExpired();
|
||||
System.Threading.Tasks.Task Stopping(Job job, ITaskOperations taskOperations);
|
||||
Async.Task Stopping(Job job, ITaskOperations taskOperations);
|
||||
IAsyncEnumerable<Job> SearchState(IEnumerable<JobState> states);
|
||||
System.Threading.Tasks.Task StopNeverStartedJobs();
|
||||
Async.Task StopNeverStartedJobs();
|
||||
}
|
||||
|
||||
public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations {
|
||||
@ -18,11 +18,11 @@ public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations {
|
||||
_events = events;
|
||||
}
|
||||
|
||||
public async System.Threading.Tasks.Task<Job?> Get(Guid jobId) {
|
||||
public async Async.Task<Job?> Get(Guid jobId) {
|
||||
return await QueryAsync($"PartitionKey eq '{jobId}'").FirstOrDefaultAsync();
|
||||
}
|
||||
|
||||
public async System.Threading.Tasks.Task OnStart(Job job) {
|
||||
public async Async.Task OnStart(Job job) {
|
||||
if (job.EndTime == null) {
|
||||
await Replace(job with { EndTime = DateTimeOffset.UtcNow + TimeSpan.FromHours(job.Config.Duration) });
|
||||
}
|
||||
@ -40,11 +40,11 @@ public class JobOperations : StatefulOrm<Job, JobState>, IJobOperations {
|
||||
return QueryAsync(filter: query);
|
||||
}
|
||||
|
||||
public System.Threading.Tasks.Task StopNeverStartedJobs() {
|
||||
public Async.Task StopNeverStartedJobs() {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public async System.Threading.Tasks.Task Stopping(Job job, ITaskOperations taskOperations) {
|
||||
public async Async.Task Stopping(Job job, ITaskOperations taskOperations) {
|
||||
job = job with { State = JobState.Stopping };
|
||||
var tasks = await taskOperations.QueryAsync(filter: $"job_id eq '{job.JobId}'").ToListAsync();
|
||||
var taskNotStopped = tasks.ToLookup(task => task.State != TaskState.Stopped);
|
||||
|
40
src/ApiService/ApiService/onefuzzlib/LogAnalytics.cs
Normal file
40
src/ApiService/ApiService/onefuzzlib/LogAnalytics.cs
Normal file
@ -0,0 +1,40 @@
|
||||
using Azure.Core;
|
||||
using Microsoft.Azure.Management.OperationalInsights;
|
||||
|
||||
namespace Microsoft.OneFuzz.Service;
|
||||
|
||||
public record MonitorSettings(string CustomerId, string Key);
|
||||
|
||||
public interface ILogAnalytics {
|
||||
public ResourceIdentifier GetWorkspaceId();
|
||||
public Async.Task<MonitorSettings> GetMonitorSettings();
|
||||
}
|
||||
|
||||
|
||||
public class LogAnalytics : ILogAnalytics {
|
||||
|
||||
ICreds _creds;
|
||||
IServiceConfig _config;
|
||||
|
||||
public LogAnalytics(ICreds creds, IServiceConfig config) {
|
||||
_creds = creds;
|
||||
_config = config;
|
||||
}
|
||||
|
||||
public async Async.Task<MonitorSettings> GetMonitorSettings() {
|
||||
string[] scopes = { "https://management.azure.com/.default" };
|
||||
var token = _creds.GetIdentity().GetToken(new TokenRequestContext(scopes));
|
||||
var client = new OperationalInsightsManagementClient(new Rest.TokenCredentials(token.Token)) { SubscriptionId = _creds.GetSubscription() };
|
||||
var customerId = (await client.Workspaces.ListByResourceGroupAsync(_creds.GetBaseResourceGroup()))
|
||||
.Select(w => w.CustomerId)
|
||||
.First();
|
||||
var keys = await client.SharedKeys.GetSharedKeysAsync(_creds.GetBaseResourceGroup(), _config.OneFuzzMonitor);
|
||||
return new MonitorSettings(customerId, keys.PrimarySharedKey);
|
||||
}
|
||||
|
||||
|
||||
public ResourceIdentifier GetWorkspaceId() {
|
||||
return new ResourceIdentifier($"/subscriptions/{_creds.GetSubscription()}/resourceGroups/{_creds.GetBaseResourceGroup()}/providers/microsoft.operationalinsights/workspaces/{_config.OneFuzzInstanceName}");
|
||||
}
|
||||
|
||||
}
|
@ -49,7 +49,7 @@ public partial class TimerProxy {
|
||||
return _subnet.GetSubnet(_name, _name);
|
||||
}
|
||||
|
||||
internal System.Threading.Tasks.Task<VirtualNetworkResource?> GetVnet() {
|
||||
internal Async.Task<VirtualNetworkResource?> GetVnet() {
|
||||
return _subnet.GetVnet(_name);
|
||||
}
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ namespace Microsoft.OneFuzz.Service {
|
||||
return null;
|
||||
}
|
||||
|
||||
public System.Threading.Tasks.Task DissociateNic(NetworkInterfaceResource nic) {
|
||||
public Async.Task DissociateNic(NetworkInterfaceResource nic) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
|
@ -9,9 +9,9 @@ public interface IProxyOperations : IStatefulOrm<Proxy, VmState> {
|
||||
|
||||
Async.Task SetState(Proxy proxy, VmState state);
|
||||
bool IsAlive(Proxy proxy);
|
||||
System.Threading.Tasks.Task SaveProxyConfig(Proxy proxy);
|
||||
Async.Task SaveProxyConfig(Proxy proxy);
|
||||
bool IsOutdated(Proxy proxy);
|
||||
System.Threading.Tasks.Task<Proxy?> GetOrCreate(string region);
|
||||
Async.Task<Proxy?> GetOrCreate(string region);
|
||||
}
|
||||
public class ProxyOperations : StatefulOrm<Proxy, VmState>, IProxyOperations {
|
||||
|
||||
@ -39,7 +39,7 @@ public class ProxyOperations : StatefulOrm<Proxy, VmState>, IProxyOperations {
|
||||
return await data.FirstOrDefaultAsync();
|
||||
}
|
||||
|
||||
public async System.Threading.Tasks.Task<Proxy?> GetOrCreate(string region) {
|
||||
public async Async.Task<Proxy?> GetOrCreate(string region) {
|
||||
var proxyList = QueryAsync(filter: $"region eq '{region}' and outdated eq false");
|
||||
|
||||
await foreach (var proxy in proxyList) {
|
||||
@ -97,13 +97,14 @@ public class ProxyOperations : StatefulOrm<Proxy, VmState>, IProxyOperations {
|
||||
return false;
|
||||
}
|
||||
|
||||
public async System.Threading.Tasks.Task SaveProxyConfig(Proxy proxy) {
|
||||
public async Async.Task SaveProxyConfig(Proxy proxy) {
|
||||
var forwards = await GetForwards(proxy);
|
||||
var url = (await _containers.GetFileSasUrl(new Container("proxy-configs"), $"{proxy.Region}/{proxy.ProxyId}/config.json", StorageType.Config, BlobSasPermissions.Read)).EnsureNotNull("Can't generate file sas");
|
||||
var queueSas = await _queue.GetQueueSas("proxy", StorageType.Config, QueueSasPermissions.Add).EnsureNotNull("can't generate queue sas") ?? throw new Exception("Queue sas is null");
|
||||
|
||||
var proxyConfig = new ProxyConfig(
|
||||
Url: url,
|
||||
Notification: _queue.GetQueueSas("proxy", StorageType.Config, QueueSasPermissions.Add).EnsureNotNull("can't generate queue sas"),
|
||||
Notification: queueSas,
|
||||
Region: proxy.Region,
|
||||
ProxyId: proxy.ProxyId,
|
||||
Forwards: forwards,
|
||||
@ -111,8 +112,7 @@ public class ProxyOperations : StatefulOrm<Proxy, VmState>, IProxyOperations {
|
||||
MicrosoftTelemetryKey: _config.OneFuzzTelemetry.EnsureNotNull("missing Telemetry"),
|
||||
InstanceId: await _containers.GetInstanceId());
|
||||
|
||||
|
||||
await _containers.saveBlob(new Container("proxy-configs"), $"{proxy.Region}/{proxy.ProxyId}/config.json", _entityConverter.ToJsonString(proxyConfig), StorageType.Config);
|
||||
await _containers.SaveBlob(new Container("proxy-configs"), $"{proxy.Region}/{proxy.ProxyId}/config.json", _entityConverter.ToJsonString(proxyConfig), StorageType.Config);
|
||||
}
|
||||
|
||||
|
||||
|
@ -9,7 +9,7 @@ namespace Microsoft.OneFuzz.Service;
|
||||
public interface IQueue {
|
||||
Async.Task SendMessage(string name, byte[] message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null);
|
||||
Async.Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout);
|
||||
Uri? GetQueueSas(string name, StorageType storageType, QueueSasPermissions permissions, TimeSpan? duration = null);
|
||||
Async.Task<Uri?> GetQueueSas(string name, StorageType storageType, QueueSasPermissions permissions, TimeSpan? duration = null);
|
||||
}
|
||||
|
||||
|
||||
@ -26,7 +26,7 @@ public class Queue : IQueue {
|
||||
|
||||
|
||||
public async Async.Task SendMessage(string name, byte[] message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) {
|
||||
var queue = GetQueue(name, storageType);
|
||||
var queue = await GetQueue(name, storageType);
|
||||
if (queue != null) {
|
||||
try {
|
||||
await queue.SendMessageAsync(Convert.ToBase64String(message), visibilityTimeout: visibilityTimeout, timeToLive: timeToLive);
|
||||
@ -35,8 +35,8 @@ public class Queue : IQueue {
|
||||
}
|
||||
}
|
||||
|
||||
public QueueClient? GetQueue(string name, StorageType storageType) {
|
||||
var client = GetQueueClient(storageType);
|
||||
public async Task<QueueClient?> GetQueue(string name, StorageType storageType) {
|
||||
var client = await GetQueueClient(storageType);
|
||||
try {
|
||||
return client.GetQueueClient(name);
|
||||
} catch (Exception) {
|
||||
@ -45,17 +45,17 @@ public class Queue : IQueue {
|
||||
}
|
||||
|
||||
|
||||
public QueueServiceClient GetQueueClient(StorageType storageType) {
|
||||
public async Task<QueueServiceClient> GetQueueClient(StorageType storageType) {
|
||||
var accountId = _storage.GetPrimaryAccount(storageType);
|
||||
//_logger.LogDEbug("getting blob container (account_id: %s)", account_id)
|
||||
(var name, var key) = _storage.GetStorageAccountNameAndKey(accountId);
|
||||
var (name, key) = await _storage.GetStorageAccountNameAndKey(accountId);
|
||||
var accountUrl = new Uri($"https://{name}.queue.core.windows.net");
|
||||
var client = new QueueServiceClient(accountUrl, new StorageSharedKeyCredential(name, key));
|
||||
return client;
|
||||
}
|
||||
|
||||
public async Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout) {
|
||||
var queue = GetQueue(name, storageType) ?? throw new Exception($"unable to queue object, no such queue: {name}");
|
||||
var queue = await GetQueue(name, storageType) ?? throw new Exception($"unable to queue object, no such queue: {name}");
|
||||
|
||||
var serialized = JsonSerializer.Serialize(obj, EntityConverter.GetJsonSerializerOptions());
|
||||
//var encoded = Encoding.UTF8.GetBytes(serialized);
|
||||
@ -68,8 +68,8 @@ public class Queue : IQueue {
|
||||
}
|
||||
}
|
||||
|
||||
public Uri? GetQueueSas(string name, StorageType storageType, QueueSasPermissions permissions, TimeSpan? duration) {
|
||||
var queue = GetQueue(name, storageType) ?? throw new Exception($"unable to queue object, no such queue: {name}");
|
||||
public async Task<Uri?> GetQueueSas(string name, StorageType storageType, QueueSasPermissions permissions, TimeSpan? duration) {
|
||||
var queue = await GetQueue(name, storageType) ?? throw new Exception($"unable to queue object, no such queue: {name}");
|
||||
var sasaBuilder = new QueueSasBuilder(permissions, DateTimeOffset.UtcNow + (duration ?? DEFAULT_DURATION));
|
||||
var url = queue.GenerateSasUri(sasaBuilder);
|
||||
return url;
|
||||
|
@ -70,7 +70,7 @@ public class ReproOperations : StatefulOrm<Repro, VmState>, IReproOperations {
|
||||
);
|
||||
}
|
||||
|
||||
public async System.Threading.Tasks.Task Stopping(Repro repro) {
|
||||
public async Async.Task Stopping(Repro repro) {
|
||||
var config = await _configOperations.Fetch();
|
||||
var vm = await GetVm(repro, config);
|
||||
if (!await _vmOperations.IsDeleted(vm)) {
|
||||
|
@ -17,7 +17,7 @@ public class Scheduler : IScheduler {
|
||||
_config = config;
|
||||
}
|
||||
|
||||
public async System.Threading.Tasks.Task ScheduleTasks() {
|
||||
public async Async.Task ScheduleTasks() {
|
||||
var tasks = await _taskOperations.SearchStates(states: new[] { TaskState.Waiting }).ToDictionaryAsync(x => x.TaskId);
|
||||
var seen = new HashSet<Guid>();
|
||||
|
||||
|
@ -13,7 +13,10 @@ public enum StorageType {
|
||||
public interface IStorage {
|
||||
public IEnumerable<string> CorpusAccounts();
|
||||
string GetPrimaryAccount(StorageType storageType);
|
||||
public (string?, string?) GetStorageAccountNameAndKey(string accountId);
|
||||
public Async.Task<(string?, string?)> GetStorageAccountNameAndKey(string accountId);
|
||||
|
||||
public Async.Task<string?> GetStorageAccountNameAndKeyByName(string accountName);
|
||||
|
||||
public IEnumerable<string> GetAccounts(StorageType storageType);
|
||||
}
|
||||
|
||||
@ -89,14 +92,24 @@ public class Storage : IStorage {
|
||||
};
|
||||
}
|
||||
|
||||
public (string?, string?) GetStorageAccountNameAndKey(string accountId) {
|
||||
public async Async.Task<(string?, string?)> GetStorageAccountNameAndKey(string accountId) {
|
||||
var resourceId = new ResourceIdentifier(accountId);
|
||||
var armClient = GetMgmtClient();
|
||||
var storageAccount = armClient.GetStorageAccountResource(resourceId);
|
||||
var key = storageAccount.GetKeys().Value.Keys.FirstOrDefault();
|
||||
var keys = await storageAccount.GetKeysAsync();
|
||||
var key = keys.Value.Keys.FirstOrDefault();
|
||||
return (resourceId.Name, key?.Value);
|
||||
}
|
||||
|
||||
public async Async.Task<string?> GetStorageAccountNameAndKeyByName(string accountName) {
|
||||
var armClient = GetMgmtClient();
|
||||
var resourceGroup = _creds.GetResourceGroupResourceIdentifier();
|
||||
var storageAccount = await armClient.GetResourceGroupResource(resourceGroup).GetStorageAccountAsync(accountName);
|
||||
var keys = await storageAccount.Value.GetKeysAsync();
|
||||
var key = keys.Value.Keys.FirstOrDefault();
|
||||
return key?.Value;
|
||||
}
|
||||
|
||||
public string ChooseAccounts(StorageType storageType) {
|
||||
var accounts = GetAccounts(storageType);
|
||||
if (!accounts.Any()) {
|
||||
|
@ -4,9 +4,9 @@ namespace Microsoft.OneFuzz.Service;
|
||||
|
||||
|
||||
public interface ISubnet {
|
||||
System.Threading.Tasks.Task<VirtualNetworkResource?> GetVnet(string vnetName);
|
||||
Async.Task<VirtualNetworkResource?> GetVnet(string vnetName);
|
||||
|
||||
System.Threading.Tasks.Task<SubnetResource?> GetSubnet(string vnetName, string subnetName);
|
||||
Async.Task<SubnetResource?> GetSubnet(string vnetName, string subnetName);
|
||||
}
|
||||
|
||||
public partial class TimerProxy {
|
||||
@ -17,7 +17,7 @@ public partial class TimerProxy {
|
||||
_creds = creds;
|
||||
}
|
||||
|
||||
public async System.Threading.Tasks.Task<SubnetResource?> GetSubnet(string vnetName, string subnetName) {
|
||||
public async Async.Task<SubnetResource?> GetSubnet(string vnetName, string subnetName) {
|
||||
var vnet = await this.GetVnet(vnetName);
|
||||
|
||||
if (vnet != null) {
|
||||
@ -26,7 +26,7 @@ public partial class TimerProxy {
|
||||
return null;
|
||||
}
|
||||
|
||||
public async System.Threading.Tasks.Task<VirtualNetworkResource?> GetVnet(string vnetName) {
|
||||
public async Async.Task<VirtualNetworkResource?> GetVnet(string vnetName) {
|
||||
var resourceGroupId = _creds.GetResourceGroupResourceIdentifier();
|
||||
var response = await _creds.ArmClient.GetResourceGroupResource(resourceGroupId).GetVirtualNetworkAsync(vnetName);
|
||||
return response.Value;
|
||||
|
@ -71,7 +71,7 @@ public class TaskOperations : StatefulOrm<Task, TaskState>, ITaskOperations {
|
||||
return QueryAsync(filter: timeFilter);
|
||||
}
|
||||
|
||||
public async System.Threading.Tasks.Task MarkStopping(Task task) {
|
||||
public async Async.Task MarkStopping(Task task) {
|
||||
if (TaskStateHelper.ShuttingDown().Contains(task.State)) {
|
||||
_logTracer.Verbose($"ignoring post - task stop calls to stop {task.JobId}:{task.TaskId}");
|
||||
return;
|
||||
@ -105,7 +105,7 @@ public class TaskOperations : StatefulOrm<Task, TaskState>, ITaskOperations {
|
||||
await MarkDependantsFailed(task, taskInJob);
|
||||
}
|
||||
|
||||
private async System.Threading.Tasks.Task MarkDependantsFailed(Task task, List<Task>? taskInJob = null) {
|
||||
private async Async.Task MarkDependantsFailed(Task task, List<Task>? taskInJob = null) {
|
||||
taskInJob = taskInJob ?? await QueryAsync(filter: $"job_id eq ''{task.JobId}").ToListAsync();
|
||||
|
||||
foreach (var t in taskInJob) {
|
||||
|
@ -175,7 +175,7 @@ public class WebhookMessageLogOperations : Orm<WebhookMessageLog>, IWebhookMessa
|
||||
}
|
||||
}
|
||||
|
||||
private async System.Threading.Tasks.Task Process(WebhookMessageLog message) {
|
||||
private async Async.Task Process(WebhookMessageLog message) {
|
||||
|
||||
if (message.State == WebhookMessageState.Failed || message.State == WebhookMessageState.Succeeded) {
|
||||
_log.WithTags(
|
||||
|
@ -88,7 +88,7 @@ namespace ApiService.OneFuzzLib.Orm {
|
||||
|
||||
public async Task<TableClient> GetTableClient(string table, string? accountId = null) {
|
||||
var account = accountId ?? _config.OneFuzzFuncStorage ?? throw new ArgumentNullException(nameof(accountId));
|
||||
var (name, key) = _storage.GetStorageAccountNameAndKey(account);
|
||||
var (name, key) = await _storage.GetStorageAccountNameAndKey(account);
|
||||
var tableClient = new TableServiceClient(new Uri($"https://{name}.table.core.windows.net"), new TableSharedKeyCredential(name, key));
|
||||
await tableClient.CreateTableIfNotExistsAsync(table);
|
||||
return tableClient.GetTableClient(table);
|
||||
@ -108,9 +108,9 @@ namespace ApiService.OneFuzzLib.Orm {
|
||||
|
||||
|
||||
public interface IStatefulOrm<T, TState> : IOrm<T> where T : StatefulEntityBase<TState> where TState : Enum {
|
||||
System.Threading.Tasks.Task<T?> ProcessStateUpdate(T entity);
|
||||
Async.Task<T?> ProcessStateUpdate(T entity);
|
||||
|
||||
System.Threading.Tasks.Task<T?> ProcessStateUpdates(T entity, int MaxUpdates = 5);
|
||||
Async.Task<T?> ProcessStateUpdates(T entity, int MaxUpdates = 5);
|
||||
}
|
||||
|
||||
|
||||
@ -143,7 +143,7 @@ namespace ApiService.OneFuzzLib.Orm {
|
||||
/// </summary>
|
||||
/// <param name="entity"></param>
|
||||
/// <returns></returns>
|
||||
public async System.Threading.Tasks.Task<T?> ProcessStateUpdate(T entity) {
|
||||
public async Async.Task<T?> ProcessStateUpdate(T entity) {
|
||||
TState state = entity.State;
|
||||
var func = _stateFuncs.GetOrAdd(state.ToString(), (string k) =>
|
||||
typeof(T).GetMethod(k) switch {
|
||||
@ -165,7 +165,7 @@ namespace ApiService.OneFuzzLib.Orm {
|
||||
/// <param name="MaxUpdates"></param>
|
||||
/// <returns></returns>
|
||||
/// <exception cref="NotImplementedException"></exception>
|
||||
public async System.Threading.Tasks.Task<T?> ProcessStateUpdates(T entity, int MaxUpdates = 5) {
|
||||
public async Async.Task<T?> ProcessStateUpdates(T entity, int MaxUpdates = 5) {
|
||||
for (int i = 0; i < MaxUpdates; i++) {
|
||||
var state = entity.State;
|
||||
var newEntity = await ProcessStateUpdate(entity);
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -176,8 +176,8 @@ namespace Tests {
|
||||
public static Gen<Scaleset> Scaleset() {
|
||||
return Arb.Generate<Tuple<
|
||||
Tuple<string, Guid, ScalesetState, Authentication?, string, string, string>,
|
||||
Tuple<int, bool, bool, bool, List<ScalesetNodeState>, Guid?, Guid?>,
|
||||
Tuple<Dictionary<string, string>>>>().Select(
|
||||
Tuple<int, bool, bool, bool, Error?, List<ScalesetNodeState>, Guid?>,
|
||||
Tuple<Guid?, Dictionary<string, string>>>>().Select(
|
||||
arg =>
|
||||
new Scaleset(
|
||||
PoolName: arg.Item1.Item1,
|
||||
@ -192,11 +192,12 @@ namespace Tests {
|
||||
SpotInstance: arg.Item2.Item2,
|
||||
EphemeralOsDisks: arg.Item2.Item3,
|
||||
NeedsConfigUpdate: arg.Item2.Item4,
|
||||
Nodes: arg.Item2.Item5,
|
||||
ClientId: arg.Item2.Item6,
|
||||
ClientObjectId: arg.Item2.Item7,
|
||||
Error: arg.Item2.Item5,
|
||||
Nodes: arg.Item2.Item6,
|
||||
ClientId: arg.Item2.Item7,
|
||||
|
||||
Tags: arg.Item3.Item1
|
||||
ClientObjectId: arg.Item3.Item1,
|
||||
Tags: arg.Item3.Item2
|
||||
)
|
||||
);
|
||||
}
|
||||
|
Reference in New Issue
Block a user