Implement scaleset function for C# (#2191)

Implements #2194
This commit is contained in:
George Pollard 2022-08-17 15:00:57 +12:00 committed by GitHub
parent a3f1d59f70
commit d1bfaefd0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 649 additions and 197 deletions

View File

@ -0,0 +1,260 @@
using System.Buffers.Binary;
using System.Diagnostics;
using System.Security.Cryptography;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
namespace Microsoft.OneFuzz.Service.Functions;
public class Scaleset {
private readonly ILogTracer _log;
private readonly IEndpointAuthorization _auth;
private readonly IOnefuzzContext _context;
public Scaleset(ILogTracer log, IEndpointAuthorization auth, IOnefuzzContext context) {
_log = log;
_auth = auth;
_context = context;
}
[Function("Scaleset")]
public Async.Task<HttpResponseData> Run([HttpTrigger(AuthorizationLevel.Anonymous, "GET", "PATCH", "POST", "DELETE")] HttpRequestData req) {
return _auth.CallIfUser(req, r => r.Method switch {
"GET" => Get(r),
"PATCH" => Patch(r),
"POST" => Post(r),
"DELETE" => Delete(r),
_ => throw new InvalidOperationException("Unsupported HTTP method"),
});
}
private async Task<HttpResponseData> Delete(HttpRequestData req) {
var request = await RequestHandling.ParseRequest<ScalesetStop>(req);
if (!request.IsOk) {
return await _context.RequestHandling.NotOk(req, request.ErrorV, "ScalesetDelete");
}
var answer = await _auth.CheckRequireAdmins(req);
if (!answer.IsOk) {
return await _context.RequestHandling.NotOk(req, answer.ErrorV, "ScalesetDelete");
}
var scalesetResult = await _context.ScalesetOperations.GetById(request.OkV.ScalesetId);
if (!scalesetResult.IsOk) {
return await _context.RequestHandling.NotOk(req, scalesetResult.ErrorV, "ScalesetDelete");
}
var scaleset = scalesetResult.OkV;
await _context.ScalesetOperations.SetShutdown(scaleset, request.OkV.Now);
return await RequestHandling.Ok(req, true);
}
private async Task<HttpResponseData> Post(HttpRequestData req) {
var request = await RequestHandling.ParseRequest<ScalesetCreate>(req);
if (!request.IsOk) {
return await _context.RequestHandling.NotOk(req, request.ErrorV, "ScalesetCreate");
}
var answer = await _auth.CheckRequireAdmins(req);
if (!answer.IsOk) {
return await _context.RequestHandling.NotOk(req, answer.ErrorV, "ScalesetCreate");
}
var create = request.OkV;
// verify the pool exists
var poolResult = await _context.PoolOperations.GetByName(create.PoolName);
if (!poolResult.IsOk) {
return await _context.RequestHandling.NotOk(req, answer.ErrorV, "ScalesetCreate");
}
var pool = poolResult.OkV;
if (!pool.Managed) {
return await _context.RequestHandling.NotOk(
req,
new Error(
Code: ErrorCode.UNABLE_TO_CREATE,
Errors: new string[] { "scalesets can only be added to managed pools " }),
context: "ScalesetCreate");
}
string region;
if (create.Region is null) {
region = await _context.Creds.GetBaseRegion();
} else {
var validRegions = await _context.Creds.GetRegions();
if (!validRegions.Contains(create.Region)) {
return await _context.RequestHandling.NotOk(
req,
new Error(
Code: ErrorCode.UNABLE_TO_CREATE,
Errors: new string[] { "invalid region" }),
context: "ScalesetCreate");
}
region = create.Region;
}
var availableSkus = await _context.VmssOperations.ListAvailableSkus(region);
if (!availableSkus.Contains(create.VmSku)) {
return await _context.RequestHandling.NotOk(
req,
new Error(
Code: ErrorCode.UNABLE_TO_CREATE,
Errors: new string[] { $"The specified VM SKU '{create.VmSku}' is not available in the location ${region}" }),
context: "ScalesetCreate");
}
var tags = create.Tags;
var configTags = (await _context.ConfigOperations.Fetch()).VmssTags;
if (configTags is not null) {
foreach (var (key, value) in configTags) {
tags[key] = value;
}
}
var scaleset = new Service.Scaleset(
ScalesetId: Guid.NewGuid(),
State: ScalesetState.Init,
NeedsConfigUpdate: false,
Auth: GenerateAuthentication(),
PoolName: create.PoolName,
VmSku: create.VmSku,
Image: create.Image,
Region: region,
Size: create.Size,
SpotInstances: create.SpotInstances,
EphemeralOsDisks: create.EphemeralOsDisks,
Tags: tags);
var inserted = await _context.ScalesetOperations.Insert(scaleset);
if (!inserted.IsOk) {
return await _context.RequestHandling.NotOk(
req,
new Error(
Code: ErrorCode.UNABLE_TO_CREATE,
new string[] { $"unable to insert scaleset: {inserted.ErrorV}" }
),
context: "ScalesetCreate");
}
if (create.AutoScale is AutoScaleOptions options) {
var autoScale = new AutoScale(
scaleset.ScalesetId,
Min: options.Min,
Max: options.Max,
Default: options.Default,
ScaleOutAmount: options.ScaleOutAmount,
ScaleOutCooldown: options.ScaleOutCooldown,
ScaleInAmount: options.ScaleInAmount,
ScaleInCooldown: options.ScaleInCooldown);
await _context.AutoScaleOperations.Insert(autoScale);
}
return await RequestHandling.Ok(req, ScalesetResponse.ForScaleset(scaleset));
}
private static Authentication GenerateAuthentication() {
using var rsa = RSA.Create(2048);
var privateKey = rsa.ExportRSAPrivateKey();
var formattedPrivateKey = $"-----BEGIN RSA PRIVATE KEY-----\n{Convert.ToBase64String(privateKey)}\n-----END RSA PRIVATE KEY-----\n";
var publicKey = BuildPublicKey(rsa);
var formattedPublicKey = $"ssh-rsa {Convert.ToBase64String(publicKey)} onefuzz-generated-key";
return new Authentication(
Password: Guid.NewGuid().ToString(),
PublicKey: formattedPublicKey,
PrivateKey: formattedPrivateKey);
}
private static ReadOnlySpan<byte> SSHRSABytes => new byte[] { (byte)'s', (byte)'s', (byte)'h', (byte)'-', (byte)'r', (byte)'s', (byte)'a' };
private static byte[] BuildPublicKey(RSA rsa) {
static Span<byte> WriteLengthPrefixedBytes(ReadOnlySpan<byte> src, Span<byte> dest) {
BinaryPrimitives.WriteInt32BigEndian(dest, src.Length);
dest = dest[sizeof(int)..];
src.CopyTo(dest);
return dest[src.Length..];
}
var parameters = rsa.ExportParameters(includePrivateParameters: false);
// public key format is "ssh-rsa", exponent, modulus, all written
// as (big-endian) length-prefixed bytes
var result = new byte[sizeof(int) + SSHRSABytes.Length + sizeof(int) + parameters.Modulus!.Length + sizeof(int) + parameters.Exponent!.Length];
var spanResult = result.AsSpan();
spanResult = WriteLengthPrefixedBytes(SSHRSABytes, spanResult);
spanResult = WriteLengthPrefixedBytes(parameters.Exponent, spanResult);
spanResult = WriteLengthPrefixedBytes(parameters.Modulus, spanResult);
Debug.Assert(spanResult.Length == 0);
return result;
}
private async Task<HttpResponseData> Patch(HttpRequestData req) {
var request = await RequestHandling.ParseRequest<ScalesetUpdate>(req);
if (!request.IsOk) {
return await _context.RequestHandling.NotOk(req, request.ErrorV, "ScalesetUpdate");
}
var answer = await _auth.CheckRequireAdmins(req);
if (!answer.IsOk) {
return await _context.RequestHandling.NotOk(req, answer.ErrorV, "ScalesetUpdate");
}
var scalesetResult = await _context.ScalesetOperations.GetById(request.OkV.ScalesetId);
if (!scalesetResult.IsOk) {
return await _context.RequestHandling.NotOk(req, scalesetResult.ErrorV, "ScalesetUpdate");
}
var scaleset = scalesetResult.OkV;
if (!scaleset.State.CanUpdate()) {
return await _context.RequestHandling.NotOk(
req,
new Error(
Code: ErrorCode.INVALID_REQUEST,
Errors: new[] { $"scaleset must be in one of the following states to update: {string.Join(", ", ScalesetStateHelper.CanUpdateStates)}" }),
"ScalesetUpdate");
}
if (request.OkV.Size is long size) {
scaleset = await _context.ScalesetOperations.SetSize(scaleset, size);
}
scaleset = scaleset with { Auth = null };
return await RequestHandling.Ok(req, ScalesetResponse.ForScaleset(scaleset));
}
private async Task<HttpResponseData> Get(HttpRequestData req) {
var request = await RequestHandling.ParseRequest<ScalesetSearch>(req);
if (!request.IsOk) {
return await _context.RequestHandling.NotOk(req, request.ErrorV, "ScalesetSearch");
}
var search = request.OkV;
if (search.ScalesetId is Guid id) {
var scalesetResult = await _context.ScalesetOperations.GetById(id);
if (!scalesetResult.IsOk) {
return await _context.RequestHandling.NotOk(req, scalesetResult.ErrorV, "ScalesetSearch");
}
var scaleset = scalesetResult.OkV;
var response = ScalesetResponse.ForScaleset(scaleset);
response = response with { Nodes = await _context.ScalesetOperations.GetNodes(scaleset) };
if (!search.IncludeAuth) {
response = response with { Auth = null };
}
return await RequestHandling.Ok(req, response);
}
var states = search.State ?? Enumerable.Empty<ScalesetState>();
var scalesets = await _context.ScalesetOperations.SearchStates(states).ToListAsync();
// don't return auths during list actions, only 'get'
var result = scalesets.Select(ss => ScalesetResponse.ForScaleset(ss with { Auth = null }));
return await RequestHandling.Ok(req, result);
}
}

View File

@ -15,7 +15,7 @@ public class TimerWorkers {
_nodeOps = context.NodeOperations;
}
public async Async.Task ProcessScalesets(Scaleset scaleset) {
public async Async.Task ProcessScalesets(Service.Scaleset scaleset) {
_log.Verbose($"checking scaleset for updates: {scaleset.ScalesetId}");
await _scaleSetOps.UpdateConfigs(scaleset);

View File

@ -138,29 +138,49 @@ public static class JobStateHelper {
public static class ScalesetStateHelper {
private static readonly IReadOnlySet<ScalesetState> _canUpdate = new HashSet<ScalesetState> { ScalesetState.Init, ScalesetState.Resize };
private static readonly IReadOnlySet<ScalesetState> _needsWork =
new HashSet<ScalesetState>{
private static readonly HashSet<ScalesetState> _canUpdate =
new() {
ScalesetState.Init,
ScalesetState.Resize,
};
private static readonly HashSet<ScalesetState> _needsWork =
new() {
ScalesetState.Init,
ScalesetState.Setup,
ScalesetState.Resize,
ScalesetState.Shutdown,
ScalesetState.Halt
ScalesetState.Halt,
};
private static readonly HashSet<ScalesetState> _available =
new() {
ScalesetState.Resize,
ScalesetState.Running,
};
private static readonly HashSet<ScalesetState> _resizing =
new() {
ScalesetState.Halt,
ScalesetState.Init,
ScalesetState.Setup,
};
private static readonly IReadOnlySet<ScalesetState> _available = new HashSet<ScalesetState> { ScalesetState.Resize, ScalesetState.Running };
private static readonly IReadOnlySet<ScalesetState> _resizing = new HashSet<ScalesetState> { ScalesetState.Halt, ScalesetState.Init, ScalesetState.Setup };
/// set of states that indicate the scaleset can be updated
public static IReadOnlySet<ScalesetState> CanUpdate => _canUpdate;
public static bool CanUpdate(this ScalesetState state) => _canUpdate.Contains(state);
public static IReadOnlySet<ScalesetState> CanUpdateStates => _canUpdate;
/// set of states that indicate work is needed during eventing
public static IReadOnlySet<ScalesetState> NeedsWork => _needsWork;
public static bool NeedsWork(this ScalesetState state) => _needsWork.Contains(state);
public static IReadOnlySet<ScalesetState> NeedsWorkStates => _needsWork;
/// set of states that indicate if it's available for work
public static IReadOnlySet<ScalesetState> Available => _available;
public static bool IsAvailable(this ScalesetState state) => _available.Contains(state);
public static IReadOnlySet<ScalesetState> AvailableStates => _available;
/// set of states that indicate scaleset is resizing
public static IReadOnlySet<ScalesetState> Resizing => _resizing;
public static bool IsResizing(this ScalesetState state) => _resizing.Contains(state);
public static IReadOnlySet<ScalesetState> ResizingStates => _resizing;
}

View File

@ -378,26 +378,21 @@ public record AutoScale(
long Max,
long Default,
long ScaleOutAmount,
long ScaleOutCoolDown,
long ScaleOutCooldown,
long ScaleInAmount,
long ScaleInCoolDown
) : EntityBase();
long ScaleInCooldown
) : EntityBase;
public record ScalesetNodeState(
Guid MachineId,
string InstanceId,
NodeState? State
);
public record Scaleset(
[PartitionKey] PoolName PoolName,
[RowKey] Guid ScalesetId,
ScalesetState State,
Authentication? Auth,
string VmSku,
string Image,
Region Region,
@ -405,11 +400,12 @@ public record Scaleset(
bool? SpotInstances,
bool EphemeralOsDisks,
bool NeedsConfigUpdate,
Error? Error,
List<ScalesetNodeState>? Nodes,
Guid? ClientId,
Guid? ClientObjectId,
Dictionary<string, string> Tags
Dictionary<string, string> Tags,
Authentication? Auth = null,
Error? Error = null,
Guid? ClientId = null,
Guid? ClientObjectId = null
// 'Nodes' removed when porting from Python: only used in search response
) : StatefulEntityBase<ScalesetState>(State);
[JsonConverter(typeof(ContainerConverter))]

View File

@ -1,4 +1,5 @@
using System.Text.Json.Serialization;
using System.ComponentModel.DataAnnotations;
using System.Text.Json.Serialization;
namespace Microsoft.OneFuzz.Service;
@ -164,6 +165,46 @@ public record ProxyReset(
string Region
);
public record ScalesetCreate(
PoolName PoolName,
string VmSku,
string Image,
string? Region,
[property: Range(1, long.MaxValue)]
long Size,
bool SpotInstances,
Dictionary<string, string> Tags,
bool EphemeralOsDisks = false,
AutoScaleOptions? AutoScale = null
);
public record AutoScaleOptions(
[property: Range(0, long.MaxValue)] long Min,
[property: Range(1, long.MaxValue)] long Max,
[property: Range(0, long.MaxValue)] long Default,
[property: Range(1, long.MaxValue)] long ScaleOutAmount,
[property: Range(1, long.MaxValue)] long ScaleOutCooldown,
[property: Range(1, long.MaxValue)] long ScaleInAmount,
[property: Range(1, long.MaxValue)] long ScaleInCooldown
);
public record ScalesetSearch(
Guid? ScalesetId = null,
List<ScalesetState>? State = null,
bool IncludeAuth = false
);
public record ScalesetStop(
Guid ScalesetId,
bool Now
);
public record ScalesetUpdate(
Guid ScalesetId,
[property: Range(1, long.MaxValue)]
long? Size
);
public record TaskGet(Guid TaskId);
public record TaskSearch(

View File

@ -105,6 +105,44 @@ public record PoolGetResult(
List<ScalesetSummary>? ScalesetSummary
) : BaseResponse();
public record ScalesetResponse(
PoolName PoolName,
Guid ScalesetId,
ScalesetState State,
Authentication? Auth,
string VmSku,
string Image,
string Region,
long Size,
bool? SpotInstances,
bool EmphemeralOsDisks,
bool NeedsConfigUpdate,
Error? Error,
Guid? ClientId,
Guid? ClientObjectId,
Dictionary<string, string> Tags,
List<ScalesetNodeState>? Nodes
) : BaseResponse() {
public static ScalesetResponse ForScaleset(Scaleset s)
=> new(
PoolName: s.PoolName,
ScalesetId: s.ScalesetId,
State: s.State,
Auth: s.Auth,
VmSku: s.VmSku,
Image: s.Image,
Region: s.Region,
Size: s.Size,
SpotInstances: s.SpotInstances,
EmphemeralOsDisks: s.EphemeralOsDisks,
NeedsConfigUpdate: s.NeedsConfigUpdate,
Error: s.Error,
ClientId: s.ClientId,
ClientObjectId: s.ClientObjectId,
Tags: s.Tags,
Nodes: null);
}
public class BaseResponseConverter : JsonConverter<BaseResponse> {
public override BaseResponse? Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) {
return null;
@ -130,4 +168,3 @@ public record ProxyInfo(
public record ProxyList(
List<ProxyInfo> Proxies
);

View File

@ -100,6 +100,7 @@ public class Program {
.AddScoped<IEndpointAuthorization, EndpointAuthorization>()
.AddScoped<INodeMessageOperations, NodeMessageOperations>()
.AddScoped<ISubnet, Subnet>()
.AddScoped<IAutoScaleOperations, AutoScaleOperations>()
.AddSingleton<ICreds, Creds>()
.AddSingleton<IServiceConfig, ServiceConfiguration>()

View File

@ -4,18 +4,10 @@ using Microsoft.Azure.Management.Monitor;
namespace Microsoft.OneFuzz.Service;
public interface IAutoScaleOperations {
Async.Task<AutoScale> Create(
Guid scalesetId,
long minAmount,
long maxAmount,
long defaultAmount,
long scaleOutAmount,
long scaleOutCooldown,
long scaleInAmount,
long scaleInCooldown);
Async.Task<AutoScale?> GetSettingsForScaleset(Guid scalesetId);
public Async.Task<ResultVoid<(int, string)>> Insert(AutoScale autoScale);
public Async.Task<AutoScale> GetSettingsForScaleset(Guid scalesetId);
Azure.Management.Monitor.Models.AutoscaleProfile CreateAutoScaleProfile(
string queueUri,
@ -27,7 +19,8 @@ public interface IAutoScaleOperations {
long scaleInAmount,
double scaleInCooldownMinutes);
Azure.Management.Monitor.Models.AutoscaleProfile DeafaultAutoScaleProfile(string queueUri, long scaleSetSize);
Azure.Management.Monitor.Models.AutoscaleProfile DefaultAutoScaleProfile(string queueUri, long scaleSetSize);
Async.Task<OneFuzzResultVoid> AddAutoScaleToVmss(Guid vmss, Azure.Management.Monitor.Models.AutoscaleProfile autoScaleProfile);
}
@ -35,43 +28,10 @@ public interface IAutoScaleOperations {
public class AutoScaleOperations : Orm<AutoScale>, IAutoScaleOperations {
public AutoScaleOperations(ILogTracer log, IOnefuzzContext context)
: base(log, context) {
}
public async Async.Task<AutoScale> Create(
Guid scalesetId,
long minAmount,
long maxAmount,
long defaultAmount,
long scaleOutAmount,
long scaleOutCooldown,
long scaleInAmount,
long scaleInCooldown) {
var entry = new AutoScale(
scalesetId,
Min: minAmount,
Max: maxAmount,
Default: defaultAmount,
ScaleOutAmount: scaleOutAmount,
ScaleOutCoolDown: scaleOutCooldown,
ScaleInAmount: scaleInAmount,
ScaleInCoolDown: scaleInCooldown
);
var r = await Insert(entry);
if (!r.IsOk) {
_logTracer.Error($"Failed to save auto-scale record for scaleset ID: {scalesetId}, minAmount: {minAmount}, maxAmount: {maxAmount}, defaultAmount: {defaultAmount}, scaleOutAmount: {scaleOutAmount}, scaleOutCooldown: {scaleOutCooldown}, scaleInAmount: {scaleInAmount}, scaleInCooldown: {scaleInCooldown}");
}
return entry;
}
public async Async.Task<AutoScale?> GetSettingsForScaleset(Guid scalesetId) {
var autoscale = await GetEntityAsync(scalesetId.ToString(), scalesetId.ToString());
return autoscale;
}
: base(log, context) { }
public Async.Task<AutoScale> GetSettingsForScaleset(Guid scalesetId)
=> GetEntityAsync(scalesetId.ToString(), scalesetId.ToString());
public async Async.Task<OneFuzzResultVoid> AddAutoScaleToVmss(Guid vmss, Azure.Management.Monitor.Models.AutoscaleProfile autoScaleProfile) {
_logTracer.Info($"Checking scaleset {vmss} for existing auto scale resource");
@ -136,14 +96,14 @@ public class AutoScaleOperations : Orm<AutoScale>, IAutoScaleOperations {
//TODO: Do this using bicep template
public Azure.Management.Monitor.Models.AutoscaleProfile CreateAutoScaleProfile(
string queueUri,
long minAmount,
long maxAmount,
long defaultAmount,
long scaleOutAmount,
double scaleOutCooldownMinutes,
long scaleInAmount,
double scaleInCooldownMinutes) {
string queueUri,
long minAmount,
long maxAmount,
long defaultAmount,
long scaleOutAmount,
double scaleOutCooldownMinutes,
long scaleInAmount,
double scaleInCooldownMinutes) {
var rules = new[] {
//Scale out
@ -200,7 +160,7 @@ public class AutoScaleOperations : Orm<AutoScale>, IAutoScaleOperations {
}
public Azure.Management.Monitor.Models.AutoscaleProfile DeafaultAutoScaleProfile(string queueUri, long scaleSetSize) {
public Azure.Management.Monitor.Models.AutoscaleProfile DefaultAutoScaleProfile(string queueUri, long scaleSetSize) {
return CreateAutoScaleProfile(queueUri, 1L, scaleSetSize, scaleSetSize, 1, 10.0, 1, 5.0);
}

View File

@ -24,6 +24,8 @@ public interface ICreds {
public ResourceGroupResource GetResourceGroupResource();
public SubscriptionResource GetSubscriptionResource();
public Async.Task<string> GetBaseRegion();
public Uri GetInstanceUrl();
@ -91,6 +93,11 @@ public sealed class Creds : ICreds, IDisposable {
return ArmClient.GetResourceGroupResource(resourceId);
}
public SubscriptionResource GetSubscriptionResource() {
var id = SubscriptionResource.CreateResourceIdentifier(GetSubscription());
return ArmClient.GetSubscriptionResource(id);
}
public Async.Task<string> GetBaseRegion() {
return _cache.GetOrCreateAsync(nameof(GetBaseRegion), async _ => {
var rg = await ArmClient.GetResourceGroupResource(GetResourceGroupResourceIdentifier()).GetAsync();
@ -188,6 +195,7 @@ public sealed class Creds : ICreds, IDisposable {
.Select(x => x.Name)
.ToListAsync();
});
}

View File

@ -141,25 +141,25 @@ public class NodeOperations : StatefulOrm<Node, NodeState, NodeOperations>, INod
if (node.ScalesetId != null) {
var scalesetResult = await _context.ScalesetOperations.GetById(node.ScalesetId.Value);
if (!scalesetResult.IsOk || scalesetResult.OkV == null) {
if (!scalesetResult.IsOk) {
_logTracer.Info($"can_process_new_work invalid scaleset. scaleset_id:{node.ScalesetId} machine_id:{node.MachineId}");
return false;
}
var scaleset = scalesetResult.OkV!;
if (!ScalesetStateHelper.Available.Contains(scaleset.State)) {
var scaleset = scalesetResult.OkV;
if (!scaleset.State.IsAvailable()) {
_logTracer.Info($"can_process_new_work scaleset not available for work. scaleset_id:{node.ScalesetId} machine_id:{node.MachineId}");
return false;
}
}
var poolResult = await _context.PoolOperations.GetByName(node.PoolName);
if (!poolResult.IsOk || poolResult.OkV == null) {
if (!poolResult.IsOk) {
_logTracer.Info($"can_schedule - invalid pool. pool_name:{node.PoolName} machine_id:{node.MachineId}");
return false;
}
var pool = poolResult.OkV!;
var pool = poolResult.OkV;
if (!PoolStateHelper.Available.Contains(pool.State)) {
_logTracer.Info($"can_schedule - pool is not available for work. pool_name:{node.PoolName} machine_id:{node.MachineId}");
return false;

View File

@ -3,7 +3,7 @@
using Microsoft.Extensions.DependencyInjection;
public interface IOnefuzzContext {
IAutoScaleOperations AutoScaleOperations { get; }
IConfig Config { get; }
IConfigOperations ConfigOperations { get; }
IContainers Containers { get; }
@ -40,8 +40,6 @@ public interface IOnefuzzContext {
INsgOperations NsgOperations { get; }
ISubnet Subnet { get; }
IImageOperations ImageOperations { get; }
IAutoScaleOperations AutoScaleOperations { get; }
}
public class OnefuzzContext : IOnefuzzContext {

View File

@ -1,4 +1,5 @@
using System.Net;
using System.ComponentModel.DataAnnotations;
using System.Net;
using System.Text.Json;
using System.Text.Json.Nodes;
using Faithlife.Utility;
@ -35,7 +36,15 @@ public class RequestHandling : IRequestHandling {
try {
var t = await req.ReadFromJsonAsync<T>();
if (t != null) {
return OneFuzzResult<T>.Ok(t);
var validationContext = new ValidationContext(t);
var validationResults = new List<ValidationResult>();
if (Validator.TryValidateObject(t, validationContext, validationResults, true)) {
return OneFuzzResult.Ok(t);
} else {
return new Error(
Code: ErrorCode.INVALID_REQUEST,
Errors: validationResults.Select(vr => vr.ToString()).ToArray());
}
}
} catch (Exception e) {
exception = e;

View File

@ -1,6 +1,7 @@
using System.Threading.Tasks;
using ApiService.OneFuzzLib.Orm;
using Azure.ResourceManager.Compute;
using Microsoft.Azure.Management.Monitor.Models;
namespace Microsoft.OneFuzz.Service;
@ -19,15 +20,18 @@ public interface IScalesetOperations : IStatefulOrm<Scaleset, ScalesetState> {
Async.Task SetSize(Scaleset scaleset, int size);
Async.Task SyncScalesetSize(Scaleset scaleset);
Async.Task<Scaleset> SetShutdown(Scaleset scaleset, bool now);
Async.Task<Scaleset> SetState(Scaleset scaleset, ScalesetState state);
public Async.Task<List<ScalesetNodeState>> GetNodes(Scaleset scaleset);
IAsyncEnumerable<Scaleset> SearchStates(IEnumerable<ScalesetState> states);
Async.Task<Scaleset> SetShutdown(Scaleset scaleset, bool now);
Async.Task<Scaleset> SetSize(Scaleset scaleset, long size);
}
public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetOperations>, IScalesetOperations {
const string SCALESET_LOG_PREFIX = "scalesets: ";
ILogTracer _log;
private readonly ILogTracer _log;
public ScalesetOperations(ILogTracer log, IOnefuzzContext context)
: base(log, context) {
@ -105,33 +109,26 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
} else {
await ResizeShrink(scaleset, vmssSize - scaleset.Size);
}
}
static int ScalesetMaxSize(string image) {
// https://docs.microsoft.com/en-us/azure/virtual-machine-scale-sets/
// virtual-machine-scale-sets-placement-groups#checklist-for-using-large-scale-sets
if (image.StartsWith('/'))
return 600;
else
return 1000;
}
static int MaxSize(Scaleset scaleset) {
return ScalesetMaxSize(scaleset.Image);
}
public async Async.Task<Scaleset> SetState(Scaleset scaleset, ScalesetState state) {
if (scaleset.State == state)
if (scaleset.State == state) {
return scaleset;
}
if (scaleset.State == ScalesetState.Halt)
if (scaleset.State == ScalesetState.Halt) {
// terminal state, unable to change
// TODO: should this throw an exception instead?
return scaleset;
}
var updatedScaleSet = scaleset with { State = state };
var r = await Update(updatedScaleSet);
if (!r.IsOk) {
_log.Error($"Failed to update scaleset {scaleset.ScalesetId} when updating state from {scaleset.State} to {state}");
var msg = "Failed to update scaleset {scaleSet.ScalesetId} when updating state from {scaleSet.State} to {state}";
_log.Error(msg);
// TODO: this should really return OneFuzzResult but then that propagates up the call stack
throw new Exception(msg);
}
if (state == ScalesetState.Resize) {
@ -144,14 +141,17 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
);
}
return updatedScaleSet;
return scaleset;
}
async Async.Task<Scaleset> SetFailed(Scaleset scaleset, Error error) {
if (scaleset.Error is not null)
if (scaleset.Error is not null) {
// already has an error, don't overwrite it
return scaleset;
}
var updatedScaleset = await SetState(scaleset with { Error = error }, ScalesetState.CreationFailed);
await _context.Events.SendEvent(new EventScalesetFailed(scaleset.ScalesetId, scaleset.PoolName, error));
return updatedScaleset;
}
@ -174,9 +174,9 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
var pool = await _context.PoolOperations.GetByName(scaleSet.PoolName);
if (!pool.IsOk || pool.OkV is null) {
if (!pool.IsOk) {
_log.Error($"{SCALESET_LOG_PREFIX} unable to find pool during config update. pool:{scaleSet.PoolName}, scaleset_id:{scaleSet.ScalesetId}");
await SetFailed(scaleSet, pool.ErrorV!);
await SetFailed(scaleSet, pool.ErrorV);
return;
}
@ -189,13 +189,8 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
}
}
public async Async.Task<Scaleset> SetShutdown(Scaleset scaleset, bool now) {
if (now) {
return await SetState(scaleset, ScalesetState.Halt);
} else {
return await SetState(scaleset, ScalesetState.Shutdown);
}
}
public Async.Task<Scaleset> SetShutdown(Scaleset scaleset, bool now)
=> SetState(scaleset, now ? ScalesetState.Halt : ScalesetState.Shutdown);
public async Async.Task<Scaleset> Setup(Scaleset scaleset) {
//# TODO: How do we pass in SSH configs for Windows? Previously
@ -210,11 +205,13 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
if (!result.IsOk) {
return await SetFailed(scaleset, result.ErrorV);
}
//TODO : why are we saving scaleset here ?
var r = await Update(scaleset);
if (!r.IsOk) {
_logTracer.Error($"Failed to save scaleset {scaleset.ScalesetId} due to {r.ErrorV}");
}
return scaleset;
}
@ -285,6 +282,7 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
if (!rr.IsOk) {
_logTracer.Error($"Failed to save scale data for scale set: {scaleset.ScalesetId}");
}
return scaleset;
}
@ -308,7 +306,6 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
}
}
async Async.Task<OneFuzzResultVoid> TryEnableAutoScaling(Scaleset scaleset) {
_logTracer.Info($"Trying to add auto scaling for scaleset {scaleset.ScalesetId}");
@ -331,30 +328,30 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
var autoScaleConfig = await _context.AutoScaleOperations.GetSettingsForScaleset(scaleset.ScalesetId);
Azure.Management.Monitor.Models.AutoscaleProfile autoScaleProfile;
if (poolQueueUri is null) {
var failedToFindQueueUri = OneFuzzResultVoid.Error(ErrorCode.UNABLE_TO_FIND, $"Failed to get pool queue uri for scaleset {scaleset.ScalesetId}");
_logTracer.Error(failedToFindQueueUri.ErrorV.ToString());
return failedToFindQueueUri;
} else {
if (autoScaleConfig is null) {
autoScaleProfile = _context.AutoScaleOperations.DeafaultAutoScaleProfile(poolQueueUri!, capacity.Value);
} else {
_logTracer.Info("Using existing auto scale settings from database");
autoScaleProfile = _context.AutoScaleOperations.CreateAutoScaleProfile(
poolQueueUri!,
autoScaleConfig.Min,
autoScaleConfig.Max,
autoScaleConfig.Default,
autoScaleConfig.ScaleOutAmount,
autoScaleConfig.ScaleOutCoolDown,
autoScaleConfig.ScaleInAmount,
autoScaleConfig.ScaleInCoolDown
);
}
}
AutoscaleProfile autoScaleProfile;
if (autoScaleConfig is null) {
autoScaleProfile = _context.AutoScaleOperations.DefaultAutoScaleProfile(poolQueueUri!, capacity.Value);
} else {
_logTracer.Info("Using existing auto scale settings from database");
autoScaleProfile = _context.AutoScaleOperations.CreateAutoScaleProfile(
poolQueueUri!,
autoScaleConfig.Min,
autoScaleConfig.Max,
autoScaleConfig.Default,
autoScaleConfig.ScaleOutAmount,
autoScaleConfig.ScaleOutCooldown,
autoScaleConfig.ScaleInAmount,
autoScaleConfig.ScaleInCooldown
);
}
_logTracer.Info($"Added auto scale resource to scaleset: {scaleset.ScalesetId}");
return await _context.AutoScaleOperations.AddAutoScaleToVmss(scaleset.ScalesetId, autoScaleProfile);
}
@ -391,6 +388,7 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
} else {
return await SetState(scaleset, ScalesetState.Setup);
}
return scaleset;
}
@ -667,4 +665,48 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
}
}
}
public async Task<List<ScalesetNodeState>> GetNodes(Scaleset scaleset) {
// Be in at-least 'setup' before checking for the list of VMs
if (scaleset.State == ScalesetState.Init) {
return new List<ScalesetNodeState>();
}
var (nodes, azureNodes) = await (
_context.NodeOperations.SearchStates(scaleset.ScalesetId).ToListAsync().AsTask(),
_context.VmssOperations.ListInstanceIds(scaleset.ScalesetId));
var result = new List<ScalesetNodeState>();
foreach (var (machineId, instanceId) in azureNodes) {
var node = nodes.FirstOrDefault(n => n.MachineId == machineId);
result.Add(new ScalesetNodeState(
MachineId: machineId,
InstanceId: instanceId,
node?.State));
}
return result;
}
public IAsyncEnumerable<Scaleset> SearchStates(IEnumerable<ScalesetState> states)
=> QueryAsync(Query.EqualAnyEnum("state", states));
public Async.Task<Scaleset> SetSize(Scaleset scaleset, long size) {
var permittedSize = Math.Min(size, MaxSize(scaleset));
if (permittedSize == scaleset.Size) {
return Async.Task.FromResult(scaleset); // nothing to do
}
scaleset = scaleset with { Size = permittedSize };
return SetState(scaleset, ScalesetState.Resize);
}
private static long MaxSize(Scaleset scaleset) {
// https://docs.microsoft.com/en-us/azure/virtual-machine-scale-sets/virtual-machine-scale-sets-placement-groups#checklist-for-using-large-scale-sets
if (scaleset.Image.StartsWith("/", StringComparison.Ordinal)) {
return 600;
} else {
return 1000;
}
}
}

View File

@ -1,8 +1,11 @@
using Azure;
using System.Net;
using Azure;
using Azure.Core;
using Azure.Data.Tables;
using Azure.ResourceManager.Compute;
using Azure.ResourceManager.Compute.Models;
using Azure.ResourceManager.Models;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Rest.Azure;
namespace Microsoft.OneFuzz.Service;
@ -13,6 +16,8 @@ public interface IVmssOperations {
Async.Task<OneFuzzResultVoid> UpdateExtensions(Guid name, IList<VirtualMachineScaleSetExtensionData> extensions);
Async.Task<VirtualMachineScaleSetData?> GetVmss(Guid name);
Async.Task<IReadOnlyList<string>> ListAvailableSkus(string region);
Async.Task<bool> DeleteVmss(Guid name, bool? forceDeletion = null);
Async.Task<IDictionary<Guid, string>> ListInstanceIds(Guid name);
@ -37,17 +42,18 @@ public interface IVmssOperations {
}
public class VmssOperations : IVmssOperations {
private readonly ILogTracer _log;
private readonly ICreds _creds;
private readonly IImageOperations _imageOps;
private readonly IServiceConfig _serviceConfig;
private readonly IMemoryCache _cache;
readonly ILogTracer _log;
readonly ICreds _creds;
readonly IImageOperations _imageOps;
readonly IServiceConfig _serviceConfig;
public VmssOperations(ILogTracer log, IOnefuzzContext context) {
public VmssOperations(ILogTracer log, IOnefuzzContext context, IMemoryCache cache) {
_log = log;
_creds = context.Creds;
_imageOps = context.ImageOperations;
_serviceConfig = context.ServiceConfiguration;
_cache = cache;
}
public async Async.Task<bool> DeleteVmss(Guid name, bool? forceDeletion = null) {
@ -135,36 +141,14 @@ public class VmssOperations : IVmssOperations {
public async Async.Task<IDictionary<Guid, string>> ListInstanceIds(Guid name) {
_log.Verbose($"get instance IDs for scaleset {name}");
var results = new Dictionary<Guid, string>();
VirtualMachineScaleSetResource res;
try {
var r = await GetVmssResource(name).GetAsync();
res = r.Value;
} catch (Exception ex) when (ex is RequestFailedException) {
_log.Verbose($"vm does not exist {name}");
return results;
return await GetVmssResource(name)
.GetVirtualMachineScaleSetVms()
.ToDictionaryAsync(vm => Guid.Parse(vm.Data.VmId), vm => vm.Data.InstanceId);
} catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.NotFound || ex.ErrorCode == "NotFound") {
_log.Exception(ex, $"scaleset does not exist: {name}");
return new Dictionary<Guid, string>();
}
if (res is null) {
_log.Verbose($"vm does not exist {name}");
return results;
} else {
try {
await foreach (var instance in res!.GetVirtualMachineScaleSetVms().AsAsyncEnumerable()) {
if (instance is not null) {
Guid key;
if (Guid.TryParse(instance.Data.VmId, out key)) {
results[key] = instance.Data.InstanceId;
} else {
_log.Error($"failed to convert vmId {instance.Data.VmId} to Guid");
}
}
}
} catch (Exception ex) when (ex is RequestFailedException || ex is CloudException) {
_log.Exception(ex, $"vm does not exist {name}");
}
}
return results;
}
public async Async.Task<OneFuzzResult<VirtualMachineScaleSetVmResource>> GetInstanceVm(Guid name, Guid vmId) {
@ -223,18 +207,18 @@ public class VmssOperations : IVmssOperations {
}
public async Async.Task<OneFuzzResultVoid> CreateVmss(
string location,
Guid name,
string vmSku,
long vmCount,
string image,
string networkId,
bool? spotInstance,
bool ephemeralOsDisks,
IList<VirtualMachineScaleSetExtensionData>? extensions,
string password,
string sshPublicKey,
IDictionary<string, string> tags) {
string location,
Guid name,
string vmSku,
long vmCount,
string image,
string networkId,
bool? spotInstance,
bool ephemeralOsDisks,
IList<VirtualMachineScaleSetExtensionData>? extensions,
string password,
string sshPublicKey,
IDictionary<string, string> tags) {
var vmss = await GetVmss(name);
if (vmss is not null) {
return OneFuzzResultVoid.Ok;
@ -340,4 +324,32 @@ public class VmssOperations : IVmssOperations {
return OneFuzzResultVoid.Error(ErrorCode.VM_CREATE_FAILED, new[] { ex.Message });
}
}
public Async.Task<IReadOnlyList<string>> ListAvailableSkus(string region)
=> _cache.GetOrCreateAsync<IReadOnlyList<string>>($"compute-skus-{region}", async entry => {
entry.SetAbsoluteExpiration(TimeSpan.FromMinutes(10));
var sub = _creds.GetSubscriptionResource();
var skus = sub.GetResourceSkusAsync(filter: TableClient.CreateQueryFilter($"location eq '{region}'"));
var skuNames = new List<string>();
await foreach (var sku in skus) {
var available = true;
if (sku.Restrictions is not null) {
foreach (var restriction in sku.Restrictions) {
if (restriction.RestrictionsType == ResourceSkuRestrictionsType.Location &&
restriction.Values.Contains(region, StringComparer.OrdinalIgnoreCase)) {
available = false;
break;
}
}
}
if (available) {
skuNames.Add(sku.Name);
}
}
return skuNames;
});
}

View File

@ -76,6 +76,7 @@ public sealed class TestContext : IOnefuzzContext {
public IConfig Config => throw new System.NotImplementedException();
public IAutoScaleOperations AutoScaleOperations => throw new NotImplementedException();
public IDiskOperations DiskOperations => throw new System.NotImplementedException();
@ -112,6 +113,4 @@ public sealed class TestContext : IOnefuzzContext {
public ISubnet Subnet => throw new NotImplementedException();
public IImageOperations ImageOperations => throw new NotImplementedException();
public IAutoScaleOperations AutoScaleOperations => throw new NotImplementedException();
}

View File

@ -47,6 +47,10 @@ class TestCreds : ICreds {
throw new NotImplementedException();
}
public SubscriptionResource GetSubscriptionResource() {
throw new NotImplementedException();
}
public ResourceIdentifier GetResourceGroupResourceIdentifier() {
throw new NotImplementedException();
}

View File

@ -1,6 +1,4 @@

using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;

View File

@ -0,0 +1,68 @@
using System;
using System.Linq;
using System.Net;
using IntegrationTests.Fakes;
using Microsoft.OneFuzz.Service;
using Xunit;
using Xunit.Abstractions;
using Async = System.Threading.Tasks;
using ScalesetFunction = Microsoft.OneFuzz.Service.Functions.Scaleset;
namespace IntegrationTests.Functions;
[Trait("Category", "Live")]
public class AzureStorageScalesetTest : ScalesetTestBase {
public AzureStorageScalesetTest(ITestOutputHelper output)
: base(output, Integration.AzureStorage.FromEnvironment()) { }
}
public class AzuriteScalesetTest : ScalesetTestBase {
public AzuriteScalesetTest(ITestOutputHelper output)
: base(output, new Integration.AzuriteStorage()) { }
}
public abstract class ScalesetTestBase : FunctionTestBase {
public ScalesetTestBase(ITestOutputHelper output, IStorage storage)
: base(output, storage) { }
[Theory]
[InlineData("POST", RequestType.Agent)]
[InlineData("POST", RequestType.NoAuthorization)]
[InlineData("PATCH", RequestType.Agent)]
[InlineData("PATCH", RequestType.NoAuthorization)]
[InlineData("GET", RequestType.Agent)]
[InlineData("GET", RequestType.NoAuthorization)]
[InlineData("DELETE", RequestType.Agent)]
[InlineData("DELETE", RequestType.NoAuthorization)]
public async Async.Task UserAuthorization_IsRequired(string method, RequestType authType) {
var auth = new TestEndpointAuthorization(authType, Logger, Context);
var func = new ScalesetFunction(Logger, auth, Context);
var result = await func.Run(TestHttpRequestData.Empty(method));
Assert.Equal(HttpStatusCode.Unauthorized, result.StatusCode);
}
[Fact]
public async Async.Task Search_SpecificScaleset_ReturnsErrorIfNoneFound() {
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var req = new ScalesetSearch(ScalesetId: Guid.NewGuid());
var func = new ScalesetFunction(Logger, auth, Context);
var result = await func.Run(TestHttpRequestData.FromJson("GET", req));
Assert.Equal(HttpStatusCode.BadRequest, result.StatusCode);
var err = BodyAs<Error>(result);
Assert.Equal("unable to find scaleset", err.Errors?.Single());
}
[Fact]
public async Async.Task Search_AllScalesets_ReturnsEmptyIfNoneFound() {
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var req = new ScalesetSearch();
var func = new ScalesetFunction(Logger, auth, Context);
var result = await func.Run(TestHttpRequestData.FromJson("GET", req));
Assert.Equal(HttpStatusCode.OK, result.StatusCode);
Assert.Equal("[]", BodyAsString(result));
}
}

View File

@ -220,7 +220,7 @@ namespace Tests {
public static Gen<Scaleset> Scaleset { get; }
= from arg in Arb.Generate<Tuple<
Tuple<Guid, ScalesetState, Authentication?, string, string, string>,
Tuple<int, bool, bool, bool, Error?, List<ScalesetNodeState>, Guid?>,
Tuple<int, bool, bool, bool, Error?, Guid?>,
Tuple<Guid?, Dictionary<string, string>>>>()
from poolName in PoolNameGen
select new Scaleset(
@ -237,8 +237,7 @@ namespace Tests {
EphemeralOsDisks: arg.Item2.Item3,
NeedsConfigUpdate: arg.Item2.Item4,
Error: arg.Item2.Item5,
Nodes: arg.Item2.Item6,
ClientId: arg.Item2.Item7,
ClientId: arg.Item2.Item6,
ClientObjectId: arg.Item3.Item1,
Tags: arg.Item3.Item2);