[WIP] migrate timer_proxy (#1820)

This commit is contained in:
Cheick Keita
2022-04-22 08:43:44 -07:00
committed by GitHub
parent 350d07b7af
commit 812a6d7517
19 changed files with 787 additions and 254 deletions

View File

@ -1,201 +1,238 @@
using System.Collections.Concurrent; using System.Collections.Concurrent;
namespace Microsoft.OneFuzz.Service; namespace Microsoft.OneFuzz.Service;
public enum ErrorCode public enum ErrorCode
{ {
INVALID_REQUEST = 450, INVALID_REQUEST = 450,
INVALID_PERMISSION = 451, INVALID_PERMISSION = 451,
MISSING_EULA_AGREEMENT = 452, MISSING_EULA_AGREEMENT = 452,
INVALID_JOB = 453, INVALID_JOB = 453,
INVALID_TASK = 453, INVALID_TASK = 453,
UNABLE_TO_ADD_TASK_TO_JOB = 454, UNABLE_TO_ADD_TASK_TO_JOB = 454,
INVALID_CONTAINER = 455, INVALID_CONTAINER = 455,
UNABLE_TO_RESIZE = 456, UNABLE_TO_RESIZE = 456,
UNAUTHORIZED = 457, UNAUTHORIZED = 457,
UNABLE_TO_USE_STOPPED_JOB = 458, UNABLE_TO_USE_STOPPED_JOB = 458,
UNABLE_TO_CHANGE_JOB_DURATION = 459, UNABLE_TO_CHANGE_JOB_DURATION = 459,
UNABLE_TO_CREATE_NETWORK = 460, UNABLE_TO_CREATE_NETWORK = 460,
VM_CREATE_FAILED = 461, VM_CREATE_FAILED = 461,
MISSING_NOTIFICATION = 462, MISSING_NOTIFICATION = 462,
INVALID_IMAGE = 463, INVALID_IMAGE = 463,
UNABLE_TO_CREATE = 464, UNABLE_TO_CREATE = 464,
UNABLE_TO_PORT_FORWARD = 465, UNABLE_TO_PORT_FORWARD = 465,
UNABLE_TO_FIND = 467, UNABLE_TO_FIND = 467,
TASK_FAILED = 468, TASK_FAILED = 468,
INVALID_NODE = 469, INVALID_NODE = 469,
NOTIFICATION_FAILURE = 470, NOTIFICATION_FAILURE = 470,
UNABLE_TO_UPDATE = 471, UNABLE_TO_UPDATE = 471,
PROXY_FAILED = 472, PROXY_FAILED = 472,
INVALID_CONFIGURATION = 473, INVALID_CONFIGURATION = 473,
} }
public enum VmState public enum VmState
{ {
Init, Init,
ExtensionsLaunched, ExtensionsLaunched,
ExtensionsFailed, ExtensionsFailed,
VmAllocationFailed, VmAllocationFailed,
Running, Running,
Stopping, Stopping,
Stopped Stopped
} }
public enum WebhookMessageState public enum WebhookMessageState
{ {
Queued, Queued,
Retrying, Retrying,
Succeeded, Succeeded,
Failed Failed
} }
public enum TaskState public enum TaskState
{ {
Init, Init,
Waiting, Waiting,
Scheduled, Scheduled,
SettingUp, SettingUp,
Running, Running,
Stopping, Stopping,
Stopped, Stopped,
WaitJob WaitJob
} }
public enum TaskType public enum TaskType
{ {
Coverage, Coverage,
LibfuzzerFuzz, LibfuzzerFuzz,
LibfuzzerCoverage, LibfuzzerCoverage,
LibfuzzerCrashReport, LibfuzzerCrashReport,
LibfuzzerMerge, LibfuzzerMerge,
LibfuzzerRegression, LibfuzzerRegression,
GenericAnalysis, GenericAnalysis,
GenericSupervisor, GenericSupervisor,
GenericMerge, GenericMerge,
GenericGenerator, GenericGenerator,
GenericCrashReport, GenericCrashReport,
GenericRegression GenericRegression
} }
public enum Os public enum Os
{ {
Windows, Windows,
Linux Linux
} }
public enum ContainerType public enum ContainerType
{ {
Analysis, Analysis,
Coverage, Coverage,
Crashes, Crashes,
Inputs, Inputs,
NoRepro, NoRepro,
ReadonlyInputs, ReadonlyInputs,
Reports, Reports,
Setup, Setup,
Tools, Tools,
UniqueInputs, UniqueInputs,
UniqueReports, UniqueReports,
RegressionReports, RegressionReports,
Logs Logs
} }
public enum StatsFormat public enum StatsFormat
{ {
AFL AFL
} }
public enum TaskDebugFlag public enum TaskDebugFlag
{ {
KeepNodeOnFailure, KeepNodeOnFailure,
KeepNodeOnCompletion, KeepNodeOnCompletion,
} }
public enum ScalesetState public enum ScalesetState
{ {
Init, Init,
Setup, Setup,
Resize, Resize,
Running, Running,
Shutdown, Shutdown,
Halt, Halt,
CreationFailed CreationFailed
} }
public static class ScalesetStateHelper public static class ScalesetStateHelper
{ {
static ConcurrentDictionary<string, ScalesetState[]> _states = new ConcurrentDictionary<string, ScalesetState[]>(); static ConcurrentDictionary<string, ScalesetState[]> _states = new ConcurrentDictionary<string, ScalesetState[]>();
/// set of states that indicate the scaleset can be updated /// set of states that indicate the scaleset can be updated
public static ScalesetState[] CanUpdate() public static ScalesetState[] CanUpdate()
{ {
return return
_states.GetOrAdd("CanUpdate", k => new[]{ _states.GetOrAdd("CanUpdate", k => new[]{
ScalesetState.Running, ScalesetState.Running,
ScalesetState.Resize ScalesetState.Resize
}); });
} }
/// set of states that indicate work is needed during eventing /// set of states that indicate work is needed during eventing
public static ScalesetState[] NeedsWork() public static ScalesetState[] NeedsWork()
{ {
return return
_states.GetOrAdd("CanUpdate", k => new[]{ _states.GetOrAdd("CanUpdate", k => new[]{
ScalesetState.Init, ScalesetState.Init,
ScalesetState.Setup, ScalesetState.Setup,
ScalesetState.Resize, ScalesetState.Resize,
ScalesetState.Shutdown, ScalesetState.Shutdown,
ScalesetState.Halt, ScalesetState.Halt,
}); });
} }
/// set of states that indicate if it's available for work /// set of states that indicate if it's available for work
public static ScalesetState[] Available() public static ScalesetState[] Available()
{ {
return return
_states.GetOrAdd("CanUpdate", k => _states.GetOrAdd("CanUpdate", k =>
{ {
return return
new[]{ new[]{
ScalesetState.Resize, ScalesetState.Resize,
ScalesetState.Running, ScalesetState.Running,
}; };
}); });
} }
/// set of states that indicate scaleset is resizing /// set of states that indicate scaleset is resizing
public static ScalesetState[] Resizing() public static ScalesetState[] Resizing()
{ {
return return
_states.GetOrAdd("CanDelete", k => _states.GetOrAdd("CanDelete", k =>
{ {
return return
new[]{ new[]{
ScalesetState.Halt, ScalesetState.Halt,
ScalesetState.Init, ScalesetState.Init,
ScalesetState.Setup, ScalesetState.Setup,
}; };
}); });
} }
} }
public static class TaskStateHelper
{ public static class VmStateHelper
static ConcurrentDictionary<string, TaskState[]> _states = new ConcurrentDictionary<string, TaskState[]>(); {
public static TaskState[] Available()
{ static ConcurrentDictionary<string, VmState[]> _states = new ConcurrentDictionary<string, VmState[]>();
return public static VmState[] NeedsWork()
_states.GetOrAdd("Available", k => {
{ return
return _states.GetOrAdd(nameof(VmStateHelper.NeedsWork), k =>
new[]{ {
TaskState.Waiting, return
TaskState.Scheduled, new[]{
TaskState.SettingUp, VmState.Init,
TaskState.Running, VmState.ExtensionsLaunched,
TaskState.WaitJob VmState.Stopping
}; };
}); });
} }
}
public static VmState[] Available()
{
return
_states.GetOrAdd(nameof(VmStateHelper.Available), k =>
{
return
new[]{
VmState.Init,
VmState.ExtensionsLaunched,
VmState.ExtensionsFailed,
VmState.VmAllocationFailed,
VmState.Running,
};
});
}
}
public static class TaskStateHelper
{
static ConcurrentDictionary<string, TaskState[]> _states = new ConcurrentDictionary<string, TaskState[]>();
public static TaskState[] Available()
{
return
_states.GetOrAdd("Available", k =>
{
return
new[]{
TaskState.Waiting,
TaskState.Scheduled,
TaskState.SettingUp,
TaskState.Running,
TaskState.WaitJob
};
});
}
}

View File

@ -2,6 +2,7 @@ using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
using System.Text.Json; using System.Text.Json;
using System.Text.Json.Serialization; using System.Text.Json.Serialization;
using PoolName = System.String; using PoolName = System.String;
using Region = System.String;
namespace Microsoft.OneFuzz.Service; namespace Microsoft.OneFuzz.Service;
@ -49,6 +50,10 @@ public abstract record BaseEvent()
EventNodeHeartbeat _ => EventType.NodeHeartbeat, EventNodeHeartbeat _ => EventType.NodeHeartbeat,
EventTaskHeartbeat _ => EventType.TaskHeartbeat, EventTaskHeartbeat _ => EventType.TaskHeartbeat,
EventInstanceConfigUpdated _ => EventType.InstanceConfigUpdated, EventInstanceConfigUpdated _ => EventType.InstanceConfigUpdated,
EventProxyCreated _ => EventType.ProxyCreated,
EventProxyDeleted _ => EventType.ProxyDeleted,
EventProxyFailed _ => EventType.ProxyFailed,
EventProxyStateUpdated _ => EventType.ProxyStateUpdated,
EventCrashReported _ => EventType.CrashReported, EventCrashReported _ => EventType.CrashReported,
EventRegressionReported _ => EventType.RegressionReported, EventRegressionReported _ => EventType.RegressionReported,
EventFileAdded _ => EventType.FileAdded, EventFileAdded _ => EventType.FileAdded,
@ -64,6 +69,10 @@ public abstract record BaseEvent()
EventType.NodeHeartbeat => typeof(EventNodeHeartbeat), EventType.NodeHeartbeat => typeof(EventNodeHeartbeat),
EventType.InstanceConfigUpdated => typeof(EventInstanceConfigUpdated), EventType.InstanceConfigUpdated => typeof(EventInstanceConfigUpdated),
EventType.TaskHeartbeat => typeof(EventTaskHeartbeat), EventType.TaskHeartbeat => typeof(EventTaskHeartbeat),
EventType.ProxyCreated => typeof(EventProxyCreated),
EventType.ProxyDeleted => typeof(EventProxyDeleted),
EventType.ProxyFailed => typeof(EventProxyFailed),
EventType.ProxyStateUpdated => typeof(EventProxyStateUpdated),
EventType.CrashReported => typeof(EventCrashReported), EventType.CrashReported => typeof(EventCrashReported),
EventType.RegressionReported => typeof(EventRegressionReported), EventType.RegressionReported => typeof(EventRegressionReported),
EventType.FileAdded => typeof(EventFileAdded), EventType.FileAdded => typeof(EventFileAdded),
@ -192,31 +201,30 @@ public record EventTaskHeartbeat(
// ) : BaseEvent(); // ) : BaseEvent();
//record EventProxyCreated( public record EventProxyCreated(
// Region Region, Region Region,
// Guid? ProxyId, Guid? ProxyId
) : BaseEvent();
// ) : BaseEvent();
//record EventProxyDeleted( record EventProxyDeleted(
// Region Region, Region Region,
// Guid? ProxyId Guid? ProxyId
//) : BaseEvent(); ) : BaseEvent();
//record EventProxyFailed( record EventProxyFailed(
// Region Region, Region Region,
// Guid? ProxyId, Guid? ProxyId,
// Error Error Error Error
//) : BaseEvent(); ) : BaseEvent();
//record EventProxyStateUpdated( record EventProxyStateUpdated(
// Region Region, Region Region,
// Guid ProxyId, Guid ProxyId,
// VmState State VmState State
// ) : BaseEvent(); ) : BaseEvent();
//record EventNodeCreated( //record EventNodeCreated(

View File

@ -108,7 +108,7 @@ public partial record Node
bool ReimageRequested, bool ReimageRequested,
bool DeleteRequested, bool DeleteRequested,
bool DebugKeepNode bool DebugKeepNode
) : EntityBase(); ) : StatefulEntityBase<NodeState>(State);
public partial record ProxyForward public partial record ProxyForward
@ -141,8 +141,9 @@ public partial record Proxy
string? Ip, string? Ip,
Error? Error, Error? Error,
string Version, string Version,
ProxyHeartbeat? Heartbeat ProxyHeartbeat? Heartbeat,
) : EntityBase(); bool Outdated
) : StatefulEntityBase<VmState>(State);
public record Error(ErrorCode Code, string[]? Errors = null); public record Error(ErrorCode Code, string[]? Errors = null);
@ -244,7 +245,7 @@ public record Task(
Authentication? Auth, Authentication? Auth,
DateTimeOffset? Heartbeat, DateTimeOffset? Heartbeat,
DateTimeOffset? EndTime, DateTimeOffset? EndTime,
UserInfo? UserInfo) : EntityBase() UserInfo? UserInfo) : StatefulEntityBase<TaskState>(State)
{ {
List<TaskEventSummary> Events { get; set; } = new List<TaskEventSummary>(); List<TaskEventSummary> Events { get; set; } = new List<TaskEventSummary>();
List<NodeAssignment> Nodes { get; set; } = new List<NodeAssignment>(); List<NodeAssignment> Nodes { get; set; } = new List<NodeAssignment>();
@ -280,6 +281,9 @@ public record NetworkConfig(
string Subnet string Subnet
) )
{ {
public static NetworkConfig Default { get; } = new NetworkConfig("10.0.0.0/8", "10.0.0.0/16");
public NetworkConfig() : this("10.0.0.0/8", "10.0.0.0/16") { } public NetworkConfig() : this("10.0.0.0/8", "10.0.0.0/16") { }
} }
@ -392,7 +396,7 @@ public record Scaleset(
Guid? ClientObjectId, Guid? ClientObjectId,
Dictionary<string, string> Tags Dictionary<string, string> Tags
) : EntityBase(); ) : StatefulEntityBase<ScalesetState>(State);
[JsonConverter(typeof(ContainerConverter))] [JsonConverter(typeof(ContainerConverter))]
public record Container(string ContainerName) public record Container(string ContainerName)

View File

@ -0,0 +1,122 @@
using Microsoft.Azure.Functions.Worker;
namespace Microsoft.OneFuzz.Service;
public partial class TimerProxy
{
private readonly ILogTracer _logger;
private readonly IProxyOperations _proxYOperations;
private readonly IScalesetOperations _scalesetOperations;
private readonly INsg _nsg;
private readonly ICreds _creds;
private readonly IConfigOperations _configOperations;
private readonly ISubnet _subnet;
public TimerProxy(ILogTracer logTracer, IProxyOperations proxies, IScalesetOperations scalesets, INsg nsg, ICreds creds, IConfigOperations configOperations, ISubnet subnet)
{
_logger = logTracer;
_proxYOperations = proxies;
_scalesetOperations = scalesets;
_nsg = nsg;
_creds = creds;
_configOperations = configOperations;
_subnet = subnet;
}
//[Function("TimerDaily")]
public async Async.Task Run([TimerTrigger("1.00:00:00")] TimerInfo myTimer)
{
var proxies = await _proxYOperations.QueryAsync().ToListAsync();
foreach (var proxy in proxies)
{
if (VmStateHelper.Available().Contains(proxy.State))
{
// Note, outdated checked at the start, but set at the end of this loop.
// As this function is called via a timer, this works around a user
// requesting to use the proxy while this function is checking if it's
// out of date
if (proxy.Outdated)
{
await _proxYOperations.SetState(proxy, VmState.Stopping);
// If something is "wrong" with a proxy, delete & recreate it
}
else if (!_proxYOperations.IsAlive(proxy))
{
_logger.Error($"scaleset-proxy: alive check failed, stopping: {proxy.Region}");
await _proxYOperations.SetState(proxy, VmState.Stopping);
}
else
{
await _proxYOperations.SaveProxyConfig(proxy);
}
}
if (VmStateHelper.NeedsWork().Contains(proxy.State))
{
_logger.Error($"scaleset-proxy: update state. proxy:{proxy.Region} state:{proxy.State}");
await _proxYOperations.ProcessStateUpdate(proxy);
}
if (proxy.State != VmState.Stopped && _proxYOperations.IsOutdated(proxy))
{
await _proxYOperations.Replace(proxy with { Outdated = true });
}
}
// make sure there is a proxy for every currently active region
var regions = await _scalesetOperations.QueryAsync().Select(x => x.Region).ToHashSetAsync();
foreach (var region in regions)
{
var allOutdated = proxies.Where(x => x.Region == region).All(p => p.Outdated);
if (allOutdated)
{
await _proxYOperations.GetOrCreate(region);
_logger.Info($"Creating new proxy in region {region}");
}
// this is required in order to support upgrade from non-nsg to
// nsg enabled OneFuzz this will overwrite existing NSG
// assignment though. This behavior is acceptable at this point
// since we do not support bring your own NSG
if (await _nsg.GetNsg(region) != null)
{
var network = await Network.Create(region, _creds, _configOperations, _subnet);
var subnet = await network.GetSubnet();
var vnet = await network.GetVnet();
if (subnet != null && vnet != null)
{
var error = _nsg.AssociateSubnet(region, vnet, subnet);
if (error != null)
{
_logger.Error($"Failed to associate NSG and subnet due to {error} in region {region}");
}
}
}
// if there are NSGs with name same as the region that they are allocated
// and have no NIC associated with it then delete the NSG
await foreach (var nsg in _nsg.ListNsgs())
{
if (_nsg.OkToDelete(regions, nsg.Data.Location, nsg.Data.Name))
{
if (nsg.Data.NetworkInterfaces.Count == 0 && nsg.Data.Subnets.Count == 0)
{
await _nsg.StartDeleteNsg(nsg.Data.Name);
}
}
}
}
}
}

View File

@ -1,5 +1,7 @@
using Azure.Identity; using Azure.Identity;
using Azure.Core; using Azure.Core;
using Azure.ResourceManager;
using Azure.ResourceManager.Resources;
namespace Microsoft.OneFuzz.Service; namespace Microsoft.OneFuzz.Service;
@ -12,10 +14,23 @@ public interface ICreds
public string GetBaseResourceGroup(); public string GetBaseResourceGroup();
public ResourceIdentifier GetResourceGroupResourceIdentifier(); public ResourceIdentifier GetResourceGroupResourceIdentifier();
public ArmClient ArmClient { get; }
public ResourceGroupResource GetResourceGroupResource();
} }
public class Creds : ICreds public class Creds : ICreds
{ {
private readonly Lazy<ArmClient> _armClient;
public ArmClient ArmClient => _armClient.Value;
public Creds()
{
_armClient = new Lazy<ArmClient>(() => new ArmClient(this.GetIdentity(), this.GetSubcription()), true);
}
// TODO: @cached // TODO: @cached
public DefaultAzureCredential GetIdentity() public DefaultAzureCredential GetIdentity()
@ -47,4 +62,10 @@ public class Creds : ICreds
?? throw new System.Exception("Resource group env var is not present"); ?? throw new System.Exception("Resource group env var is not present");
return new ResourceIdentifier(resourceId); return new ResourceIdentifier(resourceId);
} }
public ResourceGroupResource GetResourceGroupResource()
{
var resourceId = GetResourceGroupResourceIdentifier();
return ArmClient.GetResourceGroupResource(resourceId);
}
} }

View File

@ -15,7 +15,7 @@ public class ConfigOperations : Orm<InstanceConfig>, IConfigOperations
{ {
private readonly IEvents _events; private readonly IEvents _events;
private readonly ILogTracer _log; private readonly ILogTracer _log;
public ConfigOperations(IStorage storage, IEvents events, ILogTracer log) : base(storage) public ConfigOperations(IStorage storage, IEvents events, ILogTracer log) : base(storage, log)
{ {
_events = events; _events = events;
_log = log; _log = log;

View File

@ -0,0 +1,74 @@
using Azure.ResourceManager.Network;
namespace Microsoft.OneFuzz.Service;
public partial class TimerProxy
{
public class Network
{
private readonly string _name;
private readonly string _group;
private readonly string _region;
private readonly NetworkConfig _networkConfig;
private readonly ISubnet _subnet;
// This was generated randomly and should be preserved moving forwards
static Guid NETWORK_GUID_NAMESPACE = Guid.Parse("372977ad-b533-416a-b1b4-f770898e0b11");
public Network(string region, string group, string name, NetworkConfig networkConfig, ISubnet subnet)
{
_networkConfig = networkConfig;
_region = region;
_group = group;
_name = name;
_subnet = subnet;
}
private static Guid GenerateGuidv5(Guid nameSpace, string name)
{
throw new NotImplementedException();
}
public static async Async.Task<Network> Create(string region, ICreds creds, IConfigOperations configOperations, ISubnet subnet)
{
var group = creds.GetBaseResourceGroup();
var instanceConfig = await configOperations.Fetch();
var networkConfig = instanceConfig.NetworkConfig;
// Network names will be calculated from the address_space/subnet
// *except* if they are the original values. This allows backwards
// compatibility to existing configs if you don't change the network
// configs.
string name;
if (networkConfig.AddressSpace == NetworkConfig.Default.AddressSpace && networkConfig.Subnet == NetworkConfig.Default.Subnet)
{
name = region;
}
else
{
var networkId = GenerateGuidv5(NETWORK_GUID_NAMESPACE, string.Join("|", networkConfig.AddressSpace, networkConfig.Subnet));
name = $"{region}-{networkId}";
}
return new Network(region, group, name, networkConfig, subnet);
}
public Async.Task<SubnetResource?> GetSubnet()
{
return _subnet.GetSubnet(_name, _name);
}
internal System.Threading.Tasks.Task<VirtualNetworkResource?> GetVnet()
{
return _subnet.GetVnet(_name);
}
}
}

View File

@ -3,16 +3,16 @@ using System.Threading.Tasks;
namespace Microsoft.OneFuzz.Service; namespace Microsoft.OneFuzz.Service;
public interface INodeOperations : IOrm<Node> public interface INodeOperations : IStatefulOrm<Node, NodeState>
{ {
Task<Node?> GetByMachineId(Guid machineId); Task<Node?> GetByMachineId(Guid machineId);
} }
public class NodeOperations : Orm<Node>, INodeOperations public class NodeOperations : StatefulOrm<Node, NodeState>, INodeOperations
{ {
public NodeOperations(IStorage storage) public NodeOperations(IStorage storage, ILogTracer log)
: base(storage) : base(storage, log)
{ {
} }

View File

@ -21,7 +21,7 @@ public class NotificationOperations : Orm<Notification>, INotificationOperations
private IEvents _events; private IEvents _events;
public NotificationOperations(ILogTracer log, IStorage storage, IReports reports, ITaskOperations taskOperations, IContainers containers, IQueue queue, IEvents events) public NotificationOperations(ILogTracer log, IStorage storage, IReports reports, ITaskOperations taskOperations, IContainers containers, IQueue queue, IEvents events)
: base(storage) : base(storage, log)
{ {
_log = log; _log = log;
_reports = reports; _reports = reports;

View File

@ -0,0 +1,85 @@
using Azure;
using Azure.ResourceManager.Network;
namespace Microsoft.OneFuzz.Service
{
public interface INsg
{
Async.Task<NetworkSecurityGroupResource?> GetNsg(string name);
public Async.Task<Error?> AssociateSubnet(string name, VirtualNetworkResource vnet, SubnetResource subnet);
IAsyncEnumerable<NetworkSecurityGroupResource> ListNsgs();
bool OkToDelete(HashSet<string> active_regions, string nsg_region, string nsg_name);
Async.Task<bool> StartDeleteNsg(string name);
}
public class Nsg : INsg
{
private readonly ICreds _creds;
private readonly ILogTracer _logTracer;
public Nsg(ICreds creds, ILogTracer logTracer)
{
_creds = creds;
_logTracer = logTracer;
}
public async Async.Task<Error?> AssociateSubnet(string name, VirtualNetworkResource vnet, SubnetResource subnet)
{
var nsg = await GetNsg(name);
if (nsg == null)
{
return new Error(ErrorCode.UNABLE_TO_FIND, new[] { $"cannot associate subnet. nsg {name} not found" });
}
if (nsg.Data.Location != vnet.Data.Location)
{
return new Error(ErrorCode.UNABLE_TO_UPDATE, new[] { $"subnet and nsg have to be in the same region. nsg {nsg.Data.Name} {nsg.Data.Location}, subnet: {subnet.Data.Name} {subnet.Data}" });
}
if (subnet.Data.NetworkSecurityGroup != null && subnet.Data.NetworkSecurityGroup.Id == nsg.Id)
{
_logTracer.Info($"Subnet {subnet.Data.Name} and NSG {name} already associated, not updating");
return null;
}
subnet.Data.NetworkSecurityGroup = nsg.Data;
var result = await vnet.GetSubnets().CreateOrUpdateAsync(WaitUntil.Started, subnet.Data.Name, subnet.Data);
return null;
}
public async Async.Task<NetworkSecurityGroupResource?> GetNsg(string name)
{
var response = await _creds.GetResourceGroupResource().GetNetworkSecurityGroupAsync(name);
if (response == null)
{
//_logTracer.Debug($"nsg %s does not exist: {name}");
}
return response?.Value;
}
public IAsyncEnumerable<NetworkSecurityGroupResource> ListNsgs()
{
return _creds.GetResourceGroupResource().GetNetworkSecurityGroups().GetAllAsync();
}
public bool OkToDelete(HashSet<string> active_regions, string nsg_region, string nsg_name)
{
return !active_regions.Contains(nsg_region) && nsg_region == nsg_name;
}
// Returns True if deletion completed (thus resource not found) or successfully started.
// Returns False if failed to start deletion.
public async Async.Task<bool> StartDeleteNsg(string name)
{
_logTracer.Info($"deleting nsg: {name}");
var nsg = await _creds.GetResourceGroupResource().GetNetworkSecurityGroupAsync(name);
await nsg.Value.DeleteAsync(WaitUntil.Completed);
return true;
}
}
}

View File

@ -1,20 +1,29 @@
using ApiService.OneFuzzLib.Orm; using ApiService.OneFuzzLib.Orm;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Microsoft.OneFuzz.Service; namespace Microsoft.OneFuzz.Service;
public interface IProxyOperations : IOrm<Proxy> public interface IProxyOperations : IStatefulOrm<Proxy, VmState>
{ {
Task<Proxy?> GetByProxyId(Guid proxyId); Task<Proxy?> GetByProxyId(Guid proxyId);
Async.Task SetState(Proxy proxy, VmState state);
bool IsAlive(Proxy proxy);
System.Threading.Tasks.Task SaveProxyConfig(Proxy proxy);
bool IsOutdated(Proxy proxy);
System.Threading.Tasks.Task GetOrCreate(string region);
} }
public class ProxyOperations : Orm<Proxy>, IProxyOperations public class ProxyOperations : StatefulOrm<Proxy, VmState>, IProxyOperations
{ {
private readonly ILogTracer _log; private readonly ILogTracer _log;
public ProxyOperations(ILogTracer log, IStorage storage) private readonly IEvents _events;
: base(storage)
public ProxyOperations(ILogTracer log, IStorage storage, IEvents events)
: base(storage, log)
{ {
_log = log; _log = log;
_events = events;
} }
public async Task<Proxy?> GetByProxyId(Guid proxyId) public async Task<Proxy?> GetByProxyId(Guid proxyId)
@ -24,4 +33,36 @@ public class ProxyOperations : Orm<Proxy>, IProxyOperations
return await data.FirstOrDefaultAsync(); return await data.FirstOrDefaultAsync();
} }
public System.Threading.Tasks.Task GetOrCreate(string region)
{
throw new NotImplementedException();
}
public bool IsAlive(Proxy proxy)
{
throw new NotImplementedException();
}
public bool IsOutdated(Proxy proxy)
{
throw new NotImplementedException();
}
public System.Threading.Tasks.Task SaveProxyConfig(Proxy proxy)
{
throw new NotImplementedException();
}
public async Async.Task SetState(Proxy proxy, VmState state)
{
if (proxy.State == state)
{
return;
}
await Replace(proxy with { State = state });
await _events.SendEvent(new EventProxyStateUpdated(proxy.Region, proxy.ProxyId, proxy.State));
}
} }

View File

@ -7,11 +7,11 @@ public interface IScalesetOperations : IOrm<Scaleset>
IAsyncEnumerable<Scaleset> Search(); IAsyncEnumerable<Scaleset> Search();
} }
public class ScalesetOperations : Orm<Scaleset>, IScalesetOperations public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState>, IScalesetOperations
{ {
public ScalesetOperations(IStorage storage) public ScalesetOperations(IStorage storage, ILogTracer log)
: base(storage) : base(storage, log)
{ {
} }

View File

@ -13,8 +13,6 @@ public enum StorageType
public interface IStorage public interface IStorage
{ {
public ArmClient GetMgmtClient();
public IEnumerable<string> CorpusAccounts(); public IEnumerable<string> CorpusAccounts();
string GetPrimaryAccount(StorageType storageType); string GetPrimaryAccount(StorageType storageType);
public (string?, string?) GetStorageAccountNameAndKey(string accountId); public (string?, string?) GetStorageAccountNameAndKey(string accountId);

View File

@ -0,0 +1,44 @@
using Azure.Core;
using Azure.ResourceManager.Network;
namespace Microsoft.OneFuzz.Service;
public interface ISubnet
{
System.Threading.Tasks.Task<VirtualNetworkResource?> GetVnet(string vnetName);
System.Threading.Tasks.Task<SubnetResource?> GetSubnet(string vnetName, string subnetName);
}
public partial class TimerProxy
{
public class Subnet : ISubnet
{
private readonly ICreds _creds;
public Subnet(ICreds creds)
{
_creds = creds;
}
public async System.Threading.Tasks.Task<SubnetResource?> GetSubnet(string vnetName, string subnetName)
{
var vnet = await this.GetVnet(vnetName);
if (vnet != null)
{
return await vnet.GetSubnetAsync(subnetName);
}
return null;
}
public async System.Threading.Tasks.Task<VirtualNetworkResource?> GetVnet(string vnetName)
{
var resourceGroupId = new ResourceIdentifier(EnvironmentVariables.OneFuzz.ResourceGroup ?? throw new Exception("Missing resource group"));
var response = await _creds.ArmClient.GetResourceGroupResource(resourceGroupId).GetVirtualNetworkAsync(vnetName);
return response.Value;
}
}
}

View File

@ -2,7 +2,7 @@
namespace Microsoft.OneFuzz.Service; namespace Microsoft.OneFuzz.Service;
public interface ITaskOperations : IOrm<Task> public interface ITaskOperations : IStatefulOrm<Task, TaskState>
{ {
Async.Task<Task?> GetByTaskId(Guid taskId); Async.Task<Task?> GetByTaskId(Guid taskId);
@ -15,11 +15,11 @@ public interface ITaskOperations : IOrm<Task>
} }
public class TaskOperations : Orm<Task>, ITaskOperations public class TaskOperations : StatefulOrm<Task, TaskState>, ITaskOperations
{ {
public TaskOperations(IStorage storage) public TaskOperations(IStorage storage, ILogTracer log)
: base(storage) : base(storage, log)
{ {
} }

View File

@ -20,7 +20,7 @@ public class WebhookMessageLogOperations : Orm<WebhookMessageLog>, IWebhookMessa
private readonly IQueue _queue; private readonly IQueue _queue;
private readonly ILogTracer _log; private readonly ILogTracer _log;
public WebhookMessageLogOperations(IStorage storage, IQueue queue, ILogTracer log) : base(storage) public WebhookMessageLogOperations(IStorage storage, IQueue queue, ILogTracer log) : base(storage, log)
{ {
_queue = queue; _queue = queue;
_log = log; _log = log;
@ -78,7 +78,7 @@ public class WebhookOperations : Orm<Webhook>, IWebhookOperations
private readonly IWebhookMessageLogOperations _webhookMessageLogOperations; private readonly IWebhookMessageLogOperations _webhookMessageLogOperations;
private readonly ILogTracer _log; private readonly ILogTracer _log;
public WebhookOperations(IStorage storage, IWebhookMessageLogOperations webhookMessageLogOperations, ILogTracer log) public WebhookOperations(IStorage storage, IWebhookMessageLogOperations webhookMessageLogOperations, ILogTracer log)
: base(storage) : base(storage, log)
{ {
_webhookMessageLogOperations = webhookMessageLogOperations; _webhookMessageLogOperations = webhookMessageLogOperations;
_log = log; _log = log;

View File

@ -12,11 +12,10 @@ public abstract record EntityBase
{ {
[JsonIgnore] public ETag? ETag { get; set; } [JsonIgnore] public ETag? ETag { get; set; }
public DateTimeOffset? TimeStamp { get; set; } public DateTimeOffset? TimeStamp { get; set; }
//public ApiService.OneFuzzLib.Orm.IOrm<EntityBase>? Orm { get; set; }
} }
public abstract record StatefulEntityBase<T>([property: JsonIgnore] T state) : EntityBase() where T : Enum;
/// Indicates that the enum cases should no be renamed /// Indicates that the enum cases should no be renamed
[AttributeUsage(AttributeTargets.Enum)] [AttributeUsage(AttributeTargets.Enum)]
public class SkipRename : Attribute { } public class SkipRename : Attribute { }

View File

@ -1,30 +1,40 @@
using Azure.Data.Tables; using Azure.Data.Tables;
using Microsoft.OneFuzz.Service; using Microsoft.OneFuzz.Service;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm; using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
using System.Reflection;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Collections.Concurrent;
namespace ApiService.OneFuzzLib.Orm namespace ApiService.OneFuzzLib.Orm
{ {
public interface IOrm<T> where T : EntityBase public interface IOrm<T> where T : EntityBase
{ {
Task<TableClient> GetTableClient(string table, string? accountId = null); Task<TableClient> GetTableClient(string table, string? accountId = null);
IAsyncEnumerable<T> QueryAsync(string filter); IAsyncEnumerable<T> QueryAsync(string? filter = null);
Task<ResultOk<(int, string)>> Replace(T entity); Task<ResultOk<(int, string)>> Replace(T entity);
Task<T> GetEntityAsync(string partitionKey, string rowKey); Task<T> GetEntityAsync(string partitionKey, string rowKey);
Task<ResultOk<(int, string)>> Insert(T entity); Task<ResultOk<(int, string)>> Insert(T entity);
Task<ResultOk<(int, string)>> Delete(T entity); Task<ResultOk<(int, string)>> Delete(T entity);
} }
public class Orm<T> : IOrm<T> where T : EntityBase public class Orm<T> : IOrm<T> where T : EntityBase
{ {
IStorage _storage; IStorage _storage;
EntityConverter _entityConverter; EntityConverter _entityConverter;
protected ILogTracer _logTracer;
public Orm(IStorage storage)
public Orm(IStorage storage, ILogTracer logTracer)
{ {
_storage = storage; _storage = storage;
_entityConverter = new EntityConverter(); _entityConverter = new EntityConverter();
_logTracer = logTracer;
} }
public async IAsyncEnumerable<T> QueryAsync(string? filter = null) public async IAsyncEnumerable<T> QueryAsync(string? filter = null)
@ -122,4 +132,93 @@ namespace ApiService.OneFuzzLib.Orm
} }
} }
} }
public interface IStatefulOrm<T, TState> : IOrm<T> where T : StatefulEntityBase<TState> where TState : Enum
{
System.Threading.Tasks.Task<T?> ProcessStateUpdate(T entity);
System.Threading.Tasks.Task<T?> ProcessStateUpdates(T entity, int MaxUpdates = 5);
}
public class StatefulOrm<T, TState> : Orm<T>, IStatefulOrm<T, TState> where T : StatefulEntityBase<TState> where TState : Enum
{
static Lazy<Func<object>>? _partitionKeyGetter;
static Lazy<Func<object>>? _rowKeyGetter;
static ConcurrentDictionary<string, Func<T, Async.Task<T>>?> _stateFuncs = new ConcurrentDictionary<string, Func<T, Async.Task<T>>?>();
static StatefulOrm()
{
_partitionKeyGetter =
typeof(T).GetProperties().FirstOrDefault(p => p.GetCustomAttributes(true).OfType<PartitionKeyAttribute>().Any())?.GetMethod switch
{
null => null,
MethodInfo info => new Lazy<Func<object>>(() => (Func<object>)Delegate.CreateDelegate(typeof(Func<object>), info), true)
};
_rowKeyGetter =
typeof(T).GetProperties().FirstOrDefault(p => p.GetCustomAttributes(true).OfType<RowKeyAttribute>().Any())?.GetMethod switch
{
null => null,
MethodInfo info => new Lazy<Func<object>>(() => (Func<object>)Delegate.CreateDelegate(typeof(Func<object>), info), true)
};
}
public StatefulOrm(IStorage storage, ILogTracer logTracer) : base(storage, logTracer)
{
}
/// <summary>
/// process a single state update, if the obj
/// implements a function for that state
/// </summary>
/// <param name="entity"></param>
/// <returns></returns>
public async System.Threading.Tasks.Task<T?> ProcessStateUpdate(T entity)
{
TState state = entity.state;
var func = _stateFuncs.GetOrAdd(state.ToString(), (string k) =>
typeof(T).GetMethod(k) switch
{
null => null,
MethodInfo info => (Func<T, Async.Task<T>>)Delegate.CreateDelegate(typeof(Func<T, Async.Task<T>>), info)
});
if (func != null)
{
_logTracer.Info($"processing state update: {typeof(T)} - PartitionKey {_partitionKeyGetter?.Value() } {_rowKeyGetter?.Value() } - %s");
return await func(entity);
}
return null;
}
/// <summary>
/// process through the state machine for an object
/// </summary>
/// <param name="entity"></param>
/// <param name="MaxUpdates"></param>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public async System.Threading.Tasks.Task<T?> ProcessStateUpdates(T entity, int MaxUpdates = 5)
{
for (int i = 0; i < MaxUpdates; i++)
{
var state = entity.state;
var newEntity = await ProcessStateUpdate(entity);
if (newEntity == null)
return null;
if (newEntity.state.Equals(state))
{
return newEntity;
}
}
return null;
}
}
} }

View File

@ -89,7 +89,7 @@ namespace Tests
public static Gen<Proxy> Proxy() public static Gen<Proxy> Proxy()
{ {
return Arb.Generate<Tuple<Tuple<string, Guid, DateTimeOffset?, VmState, Authentication, string?, Error?>, Tuple<string, ProxyHeartbeat?>>>().Select( return Arb.Generate<Tuple<Tuple<string, Guid, DateTimeOffset?, VmState, Authentication, string?, Error?>, Tuple<string, ProxyHeartbeat?, bool>>>().Select(
arg => arg =>
new Proxy( new Proxy(
Region: arg.Item1.Item1, Region: arg.Item1.Item1,
@ -100,7 +100,8 @@ namespace Tests
Ip: arg.Item1.Item6, Ip: arg.Item1.Item6,
Error: arg.Item1.Item7, Error: arg.Item1.Item7,
Version: arg.Item2.Item1, Version: arg.Item2.Item1,
Heartbeat: arg.Item2.Item2 Heartbeat: arg.Item2.Item2,
Outdated: arg.Item2.Item3
) )
); );
} }