mirror of
https://github.com/microsoft/onefuzz.git
synced 2025-06-11 09:41:37 +00:00
Check if state transitions exist at test time (#2241)
Don't ignore missing state transitions in `StatefulOrm` static constructor test, instead error out. `Scaleset` was missing the `Resize` transition.
This commit is contained in:
parent
84b2cc9992
commit
b28008b519
@ -1,4 +1,5 @@
|
||||
using ApiService.OneFuzzLib.Orm;
|
||||
using System.Threading.Tasks;
|
||||
using ApiService.OneFuzzLib.Orm;
|
||||
|
||||
namespace Microsoft.OneFuzz.Service;
|
||||
|
||||
@ -6,15 +7,19 @@ public interface IJobOperations : IStatefulOrm<Job, JobState> {
|
||||
Async.Task<Job?> Get(Guid jobId);
|
||||
Async.Task OnStart(Job job);
|
||||
IAsyncEnumerable<Job> SearchExpired();
|
||||
Async.Task<Job> Stopping(Job job);
|
||||
Async.Task<Job> Init(Job job);
|
||||
IAsyncEnumerable<Job> SearchState(IEnumerable<JobState> states);
|
||||
Async.Task StopNeverStartedJobs();
|
||||
Async.Task StopIfAllDone(Job job);
|
||||
|
||||
// state transitions
|
||||
Async.Task<Job> Init(Job job);
|
||||
Async.Task<Job> Enabled(Job job);
|
||||
Async.Task<Job> Stopping(Job job);
|
||||
Async.Task<Job> Stopped(Job job);
|
||||
}
|
||||
|
||||
public class JobOperations : StatefulOrm<Job, JobState, JobOperations>, IJobOperations {
|
||||
private static TimeSpan JOB_NEVER_STARTED_DURATION = TimeSpan.FromDays(30);
|
||||
private static readonly TimeSpan JOB_NEVER_STARTED_DURATION = TimeSpan.FromDays(30);
|
||||
|
||||
public JobOperations(ILogTracer logTracer, IOnefuzzContext context) : base(logTracer, context) {
|
||||
}
|
||||
@ -111,4 +116,14 @@ public class JobOperations : StatefulOrm<Job, JobState, JobOperations>, IJobOper
|
||||
throw new Exception($"Failed to save job {job.JobId} : {result.ErrorV}");
|
||||
}
|
||||
}
|
||||
|
||||
public Task<Job> Enabled(Job job) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(job);
|
||||
}
|
||||
|
||||
public Task<Job> Stopped(Job job) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(job);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,46 @@
|
||||
using ApiService.OneFuzzLib.Orm;
|
||||
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
|
||||
|
||||
namespace Microsoft.OneFuzz.Service;
|
||||
|
||||
//# this isn't anticipated to be needed by the client, hence it not
|
||||
//# being in onefuzztypes
|
||||
public record NodeMessage(
|
||||
[PartitionKey] Guid MachineId,
|
||||
[RowKey] string MessageId,
|
||||
NodeCommand Message
|
||||
) : EntityBase {
|
||||
public NodeMessage(Guid machineId, NodeCommand message) : this(machineId, NewSortedKey, message) { }
|
||||
};
|
||||
|
||||
public interface INodeMessageOperations : IOrm<NodeMessage> {
|
||||
IAsyncEnumerable<NodeMessage> GetMessage(Guid machineId);
|
||||
Async.Task ClearMessages(Guid machineId);
|
||||
|
||||
Async.Task SendMessage(Guid machineId, NodeCommand message, string? messageId = null);
|
||||
}
|
||||
|
||||
public class NodeMessageOperations : Orm<NodeMessage>, INodeMessageOperations {
|
||||
|
||||
public NodeMessageOperations(ILogTracer log, IOnefuzzContext context)
|
||||
: base(log, context) { }
|
||||
|
||||
public IAsyncEnumerable<NodeMessage> GetMessage(Guid machineId)
|
||||
=> QueryAsync(Query.PartitionKey(machineId.ToString()));
|
||||
|
||||
public async Async.Task ClearMessages(Guid machineId) {
|
||||
_logTracer.Info($"clearing messages for node {machineId}");
|
||||
|
||||
await foreach (var message in GetMessage(machineId)) {
|
||||
var r = await Delete(message);
|
||||
if (!r.IsOk) {
|
||||
_logTracer.WithHttpStatus(r.ErrorV).Error($"failed to delete message for node {machineId}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async Async.Task SendMessage(Guid machineId, NodeCommand message, string? messageId = null) {
|
||||
messageId = messageId ?? DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString();
|
||||
await Insert(new NodeMessage(machineId, messageId, message));
|
||||
}
|
||||
}
|
@ -1,7 +1,6 @@
|
||||
using System.Threading.Tasks;
|
||||
using ApiService.OneFuzzLib.Orm;
|
||||
using Azure.Data.Tables;
|
||||
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
|
||||
|
||||
namespace Microsoft.OneFuzz.Service;
|
||||
|
||||
@ -55,6 +54,17 @@ public interface INodeOperations : IStatefulOrm<Node, NodeState> {
|
||||
IAsyncEnumerable<Node> SearchByPoolName(PoolName poolName);
|
||||
|
||||
Async.Task SetShutdown(Node node);
|
||||
|
||||
// state transitions:
|
||||
Async.Task<Node> Init(Node node);
|
||||
Async.Task<Node> Free(Node node);
|
||||
Async.Task<Node> SettingUp(Node node);
|
||||
Async.Task<Node> Rebooting(Node node);
|
||||
Async.Task<Node> Ready(Node node);
|
||||
Async.Task<Node> Busy(Node node);
|
||||
Async.Task<Node> Done(Node node);
|
||||
Async.Task<Node> Shutdown(Node node);
|
||||
Async.Task<Node> Halt(Node node);
|
||||
}
|
||||
|
||||
|
||||
@ -572,108 +582,49 @@ public class NodeOperations : StatefulOrm<Node, NodeState, NodeOperations>, INod
|
||||
await Stop(node, done: done);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public interface INodeTasksOperations : IStatefulOrm<NodeTasks, NodeTaskState> {
|
||||
IAsyncEnumerable<Node> GetNodesByTaskId(Guid taskId);
|
||||
IAsyncEnumerable<NodeAssignment> GetNodeAssignments(Guid taskId);
|
||||
IAsyncEnumerable<NodeTasks> GetByMachineId(Guid machineId);
|
||||
IAsyncEnumerable<NodeTasks> GetByTaskId(Guid taskId);
|
||||
Async.Task ClearByMachineId(Guid machineId);
|
||||
}
|
||||
|
||||
public class NodeTasksOperations : StatefulOrm<NodeTasks, NodeTaskState, NodeTasksOperations>, INodeTasksOperations {
|
||||
|
||||
ILogTracer _log;
|
||||
|
||||
public NodeTasksOperations(ILogTracer log, IOnefuzzContext context)
|
||||
: base(log, context) {
|
||||
_log = log;
|
||||
public Task<Node> Init(Node node) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(node);
|
||||
}
|
||||
|
||||
//TODO: suggest by Cheick: this can probably be optimize by query all NodesTasks then query the all machine in single request
|
||||
|
||||
public async IAsyncEnumerable<Node> GetNodesByTaskId(Guid taskId) {
|
||||
await foreach (var entry in QueryAsync(Query.RowKey(taskId.ToString()))) {
|
||||
var node = await _context.NodeOperations.GetByMachineId(entry.MachineId);
|
||||
if (node is not null) {
|
||||
yield return node;
|
||||
}
|
||||
}
|
||||
public Task<Node> Free(Node node) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(node);
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<NodeAssignment> GetNodeAssignments(Guid taskId) {
|
||||
|
||||
await foreach (var entry in QueryAsync(Query.RowKey(taskId.ToString()))) {
|
||||
var node = await _context.NodeOperations.GetByMachineId(entry.MachineId);
|
||||
if (node is not null) {
|
||||
var nodeAssignment = new NodeAssignment(node.MachineId, node.ScalesetId, entry.State);
|
||||
yield return nodeAssignment;
|
||||
}
|
||||
}
|
||||
public Task<Node> SettingUp(Node node) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(node);
|
||||
}
|
||||
|
||||
public IAsyncEnumerable<NodeTasks> GetByMachineId(Guid machineId) {
|
||||
return QueryAsync(Query.PartitionKey(machineId.ToString()));
|
||||
public Task<Node> Rebooting(Node node) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(node);
|
||||
}
|
||||
|
||||
public IAsyncEnumerable<NodeTasks> GetByTaskId(Guid taskId) {
|
||||
return QueryAsync(Query.RowKey(taskId.ToString()));
|
||||
public Task<Node> Ready(Node node) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(node);
|
||||
}
|
||||
|
||||
public async Async.Task ClearByMachineId(Guid machineId) {
|
||||
_logTracer.Info($"clearing tasks for node {machineId}");
|
||||
await foreach (var entry in GetByMachineId(machineId)) {
|
||||
var res = await Delete(entry);
|
||||
if (!res.IsOk) {
|
||||
_logTracer.WithHttpStatus(res.ErrorV).Error($"failed to delete node task entry for machine_id: {entry.MachineId}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//# this isn't anticipated to be needed by the client, hence it not
|
||||
//# being in onefuzztypes
|
||||
public record NodeMessage(
|
||||
[PartitionKey] Guid MachineId,
|
||||
[RowKey] string MessageId,
|
||||
NodeCommand Message
|
||||
) : EntityBase {
|
||||
public NodeMessage(Guid machineId, NodeCommand message) : this(machineId, NewSortedKey, message) { }
|
||||
};
|
||||
|
||||
public interface INodeMessageOperations : IOrm<NodeMessage> {
|
||||
IAsyncEnumerable<NodeMessage> GetMessage(Guid machineId);
|
||||
Async.Task ClearMessages(Guid machineId);
|
||||
|
||||
Async.Task SendMessage(Guid machineId, NodeCommand message, string? messageId = null);
|
||||
}
|
||||
|
||||
|
||||
public class NodeMessageOperations : Orm<NodeMessage>, INodeMessageOperations {
|
||||
|
||||
private readonly ILogTracer _log;
|
||||
public NodeMessageOperations(ILogTracer log, IOnefuzzContext context) : base(log, context) {
|
||||
_log = log;
|
||||
}
|
||||
|
||||
public IAsyncEnumerable<NodeMessage> GetMessage(Guid machineId)
|
||||
=> QueryAsync(Query.PartitionKey(machineId.ToString()));
|
||||
|
||||
public async Async.Task ClearMessages(Guid machineId) {
|
||||
_logTracer.Info($"clearing messages for node {machineId}");
|
||||
|
||||
await foreach (var message in GetMessage(machineId)) {
|
||||
var r = await Delete(message);
|
||||
if (!r.IsOk) {
|
||||
_logTracer.WithHttpStatus(r.ErrorV).Error($"failed to delete message for node {machineId}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async Async.Task SendMessage(Guid machineId, NodeCommand message, string? messageId = null) {
|
||||
messageId = messageId ?? DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString();
|
||||
await Insert(new NodeMessage(machineId, messageId, message));
|
||||
public Task<Node> Busy(Node node) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(node);
|
||||
}
|
||||
|
||||
public Task<Node> Done(Node node) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(node);
|
||||
}
|
||||
|
||||
public Task<Node> Shutdown(Node node) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(node);
|
||||
}
|
||||
|
||||
public Task<Node> Halt(Node node) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(node);
|
||||
}
|
||||
}
|
||||
|
79
src/ApiService/ApiService/onefuzzlib/NodeTasksOperations.cs
Normal file
79
src/ApiService/ApiService/onefuzzlib/NodeTasksOperations.cs
Normal file
@ -0,0 +1,79 @@
|
||||
using System.Threading.Tasks;
|
||||
using ApiService.OneFuzzLib.Orm;
|
||||
|
||||
namespace Microsoft.OneFuzz.Service;
|
||||
|
||||
public interface INodeTasksOperations : IStatefulOrm<NodeTasks, NodeTaskState> {
|
||||
IAsyncEnumerable<Node> GetNodesByTaskId(Guid taskId);
|
||||
IAsyncEnumerable<NodeAssignment> GetNodeAssignments(Guid taskId);
|
||||
IAsyncEnumerable<NodeTasks> GetByMachineId(Guid machineId);
|
||||
IAsyncEnumerable<NodeTasks> GetByTaskId(Guid taskId);
|
||||
Async.Task ClearByMachineId(Guid machineId);
|
||||
|
||||
// state transitions:
|
||||
Async.Task<NodeTasks> Init(NodeTasks nodeTasks);
|
||||
Async.Task<NodeTasks> SettingUp(NodeTasks nodeTasks);
|
||||
Async.Task<NodeTasks> Running(NodeTasks nodeTasks);
|
||||
}
|
||||
|
||||
public class NodeTasksOperations : StatefulOrm<NodeTasks, NodeTaskState, NodeTasksOperations>, INodeTasksOperations {
|
||||
|
||||
public NodeTasksOperations(ILogTracer log, IOnefuzzContext context)
|
||||
: base(log, context) {
|
||||
}
|
||||
|
||||
//TODO: suggest by Cheick: this can probably be optimize by query all NodesTasks then query the all machine in single request
|
||||
|
||||
public async IAsyncEnumerable<Node> GetNodesByTaskId(Guid taskId) {
|
||||
await foreach (var entry in QueryAsync(Query.RowKey(taskId.ToString()))) {
|
||||
var node = await _context.NodeOperations.GetByMachineId(entry.MachineId);
|
||||
if (node is not null) {
|
||||
yield return node;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async IAsyncEnumerable<NodeAssignment> GetNodeAssignments(Guid taskId) {
|
||||
|
||||
await foreach (var entry in QueryAsync(Query.RowKey(taskId.ToString()))) {
|
||||
var node = await _context.NodeOperations.GetByMachineId(entry.MachineId);
|
||||
if (node is not null) {
|
||||
var nodeAssignment = new NodeAssignment(node.MachineId, node.ScalesetId, entry.State);
|
||||
yield return nodeAssignment;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public IAsyncEnumerable<NodeTasks> GetByMachineId(Guid machineId) {
|
||||
return QueryAsync(Query.PartitionKey(machineId.ToString()));
|
||||
}
|
||||
|
||||
public IAsyncEnumerable<NodeTasks> GetByTaskId(Guid taskId) {
|
||||
return QueryAsync(Query.RowKey(taskId.ToString()));
|
||||
}
|
||||
|
||||
public async Async.Task ClearByMachineId(Guid machineId) {
|
||||
_logTracer.Info($"clearing tasks for node {machineId}");
|
||||
await foreach (var entry in GetByMachineId(machineId)) {
|
||||
var res = await Delete(entry);
|
||||
if (!res.IsOk) {
|
||||
_logTracer.WithHttpStatus(res.ErrorV).Error($"failed to delete node task entry for machine_id: {entry.MachineId}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Task<NodeTasks> Init(NodeTasks nodeTasks) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(nodeTasks);
|
||||
}
|
||||
|
||||
public Task<NodeTasks> SettingUp(NodeTasks nodeTasks) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(nodeTasks);
|
||||
}
|
||||
|
||||
public Task<NodeTasks> Running(NodeTasks nodeTasks) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(nodeTasks);
|
||||
}
|
||||
}
|
@ -14,10 +14,14 @@ public interface IPoolOperations : IStatefulOrm<Pool, PoolState> {
|
||||
IAsyncEnumerable<Pool> SearchStates(IEnumerable<PoolState> states);
|
||||
Async.Task<Pool> SetShutdown(Pool pool, bool Now);
|
||||
|
||||
Async.Task<Pool> Init(Pool pool);
|
||||
|
||||
Async.Task<Pool> Create(PoolName name, Os os, Architecture architecture, bool managed, Guid? clientId = null);
|
||||
new Async.Task Delete(Pool pool);
|
||||
|
||||
// state transitions:
|
||||
Async.Task<Pool> Init(Pool pool);
|
||||
Async.Task<Pool> Running(Pool pool);
|
||||
Async.Task<Pool> Shutdown(Pool pool);
|
||||
Async.Task<Pool> Halt(Pool pool);
|
||||
}
|
||||
|
||||
public class PoolOperations : StatefulOrm<Pool, PoolState, PoolOperations>, IPoolOperations {
|
||||
@ -226,4 +230,9 @@ public class PoolOperations : StatefulOrm<Pool, PoolState, PoolOperations>, IPoo
|
||||
|
||||
return pool;
|
||||
}
|
||||
|
||||
public Task<Pool> Running(Pool pool) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(pool);
|
||||
}
|
||||
}
|
||||
|
@ -15,8 +15,16 @@ public interface IProxyOperations : IStatefulOrm<Proxy, VmState> {
|
||||
Async.Task SaveProxyConfig(Proxy proxy);
|
||||
bool IsOutdated(Proxy proxy);
|
||||
Async.Task<Proxy?> GetOrCreate(string region);
|
||||
|
||||
Task<bool> IsUsed(Proxy proxy);
|
||||
|
||||
// state transitions:
|
||||
Async.Task<Proxy> Init(Proxy proxy);
|
||||
Async.Task<Proxy> ExtensionsLaunch(Proxy proxy);
|
||||
Async.Task<Proxy> ExtensionsFailed(Proxy proxy);
|
||||
Async.Task<Proxy> VmAllocationFailed(Proxy proxy);
|
||||
Async.Task<Proxy> Running(Proxy proxy);
|
||||
Async.Task<Proxy> Stopping(Proxy proxy);
|
||||
Async.Task<Proxy> Stopped(Proxy proxy);
|
||||
}
|
||||
public class ProxyOperations : StatefulOrm<Proxy, VmState, ProxyOperations>, IProxyOperations {
|
||||
|
||||
@ -285,11 +293,26 @@ public class ProxyOperations : StatefulOrm<Proxy, VmState, ProxyOperations>, IPr
|
||||
return await Stopped(proxy);
|
||||
}
|
||||
|
||||
private async Task<Proxy> Stopped(Proxy proxy) {
|
||||
public async Task<Proxy> Stopped(Proxy proxy) {
|
||||
var stoppedVm = await SetState(proxy, VmState.Stopped);
|
||||
_logTracer.Info($"removing proxy: {stoppedVm.Region}");
|
||||
await _context.Events.SendEvent(new EventProxyDeleted(stoppedVm.Region, stoppedVm.ProxyId));
|
||||
await Delete(stoppedVm);
|
||||
return stoppedVm;
|
||||
}
|
||||
|
||||
public Task<Proxy> ExtensionsFailed(Proxy proxy) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(proxy);
|
||||
}
|
||||
|
||||
public Task<Proxy> VmAllocationFailed(Proxy proxy) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(proxy);
|
||||
}
|
||||
|
||||
public Task<Proxy> Running(Proxy proxy) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(proxy);
|
||||
}
|
||||
}
|
||||
|
@ -8,16 +8,8 @@ namespace Microsoft.OneFuzz.Service;
|
||||
public interface IReproOperations : IStatefulOrm<Repro, VmState> {
|
||||
public IAsyncEnumerable<Repro> SearchExpired();
|
||||
|
||||
public Async.Task<Repro> Stopping(Repro repro);
|
||||
|
||||
public IAsyncEnumerable<Repro> SearchStates(IEnumerable<VmState>? states);
|
||||
|
||||
|
||||
public Async.Task<Repro> Init(Repro repro);
|
||||
public Async.Task<Repro> ExtensionsLaunch(Repro repro);
|
||||
|
||||
public Async.Task<Repro> Stopped(Repro repro);
|
||||
|
||||
public Async.Task<Repro> SetFailed(Repro repro, VirtualMachineData vmData);
|
||||
|
||||
public Async.Task<Repro> SetError(Repro repro, Error result);
|
||||
@ -26,6 +18,15 @@ public interface IReproOperations : IStatefulOrm<Repro, VmState> {
|
||||
|
||||
public Async.Task<Container?> GetSetupContainer(Repro repro);
|
||||
Task<OneFuzzResult<Repro>> Create(ReproConfig config, UserInfo userInfo);
|
||||
|
||||
// state transitions:
|
||||
Task<Repro> Init(Repro repro);
|
||||
Task<Repro> ExtensionsLaunch(Repro repro);
|
||||
Task<Repro> ExtensionsFailed(Repro repro);
|
||||
Task<Repro> VmAllocationFailed(Repro repro);
|
||||
Task<Repro> Running(Repro repro);
|
||||
Task<Repro> Stopping(Repro repro);
|
||||
Task<Repro> Stopped(Repro repro);
|
||||
}
|
||||
|
||||
public class ReproOperations : StatefulOrm<Repro, VmState, ReproOperations>, IReproOperations {
|
||||
@ -316,4 +317,19 @@ public class ReproOperations : StatefulOrm<Repro, VmState, ReproOperations>, IRe
|
||||
return OneFuzzResult<Repro>.Error(ErrorCode.UNABLE_TO_FIND, "unable to find report");
|
||||
}
|
||||
}
|
||||
|
||||
public Task<Repro> ExtensionsFailed(Repro repro) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(repro);
|
||||
}
|
||||
|
||||
public Task<Repro> VmAllocationFailed(Repro repro) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(repro);
|
||||
}
|
||||
|
||||
public Task<Repro> Running(Repro repro) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(repro);
|
||||
}
|
||||
}
|
||||
|
@ -18,8 +18,6 @@ public interface IScalesetOperations : IStatefulOrm<Scaleset, ScalesetState> {
|
||||
|
||||
Async.Task<bool> CleanupNodes(Scaleset scaleSet);
|
||||
|
||||
Async.Task SetSize(Scaleset scaleset, int size);
|
||||
|
||||
Async.Task SyncScalesetSize(Scaleset scaleset);
|
||||
|
||||
Async.Task<Scaleset> SetState(Scaleset scaleset, ScalesetState state);
|
||||
@ -27,6 +25,15 @@ public interface IScalesetOperations : IStatefulOrm<Scaleset, ScalesetState> {
|
||||
IAsyncEnumerable<Scaleset> SearchStates(IEnumerable<ScalesetState> states);
|
||||
Async.Task<Scaleset> SetShutdown(Scaleset scaleset, bool now);
|
||||
Async.Task<Scaleset> SetSize(Scaleset scaleset, long size);
|
||||
|
||||
// state transitions:
|
||||
Async.Task<Scaleset> Init(Scaleset scaleset);
|
||||
Async.Task<Scaleset> Setup(Scaleset scaleset);
|
||||
Async.Task<Scaleset> Resize(Scaleset scaleset);
|
||||
Async.Task<Scaleset> Running(Scaleset scaleset);
|
||||
Async.Task<Scaleset> Shutdown(Scaleset scaleset);
|
||||
Async.Task<Scaleset> Halt(Scaleset scaleset);
|
||||
Async.Task<Scaleset> CreationFailed(Scaleset scaleset);
|
||||
}
|
||||
|
||||
public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetOperations>, IScalesetOperations {
|
||||
@ -77,14 +84,13 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
|
||||
}
|
||||
}
|
||||
|
||||
public async Async.Task SetSize(Scaleset scaleset, int size) {
|
||||
// # no longer needing to resize
|
||||
if (scaleset is null)
|
||||
return;
|
||||
if (scaleset.State != ScalesetState.Resize)
|
||||
return;
|
||||
public async Async.Task<Scaleset> Resize(Scaleset scaleset) {
|
||||
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} scaleset resize: scaleset_id:{scaleset.ScalesetId} size:{size}");
|
||||
if (scaleset.State != ScalesetState.Resize) {
|
||||
return scaleset;
|
||||
}
|
||||
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} scaleset resize: scaleset_id:{scaleset.ScalesetId} size:{scaleset.Size}");
|
||||
|
||||
var shrinkQueue = new ShrinkQueue(scaleset.ScalesetId, _context.Queue, _log);
|
||||
// # reset the node delete queue
|
||||
@ -101,14 +107,13 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
|
||||
//#if the scaleset is missing, this is an indication the scaleset
|
||||
//# was manually deleted, rather than having OneFuzz delete it. As
|
||||
//# such, we should go thruogh the process of deleting it.
|
||||
await SetShutdown(scaleset, now: true);
|
||||
return;
|
||||
return await SetShutdown(scaleset, now: true);
|
||||
} else if (scaleset.Size == vmssSize) {
|
||||
await ResizeEqual(scaleset);
|
||||
return await ResizeEqual(scaleset);
|
||||
} else if (scaleset.Size > vmssSize) {
|
||||
await ResizeGrow(scaleset);
|
||||
return await ResizeGrow(scaleset);
|
||||
} else {
|
||||
await ResizeShrink(scaleset, vmssSize - scaleset.Size);
|
||||
return await ResizeShrink(scaleset, vmssSize - scaleset.Size);
|
||||
}
|
||||
}
|
||||
|
||||
@ -631,32 +636,33 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
|
||||
}
|
||||
|
||||
|
||||
private async Async.Task ResizeEqual(Scaleset scaleset) {
|
||||
private async Async.Task<Scaleset> ResizeEqual(Scaleset scaleset) {
|
||||
//# NOTE: this is the only place we reset to the 'running' state.
|
||||
//# This ensures that our idea of scaleset size agrees with Azure
|
||||
|
||||
var nodeCount = await _context.NodeOperations.SearchStates(scalesetId: scaleset.ScalesetId).CountAsync();
|
||||
if (nodeCount == scaleset.Size) {
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} resize finished: {scaleset.ScalesetId}");
|
||||
await SetState(scaleset, ScalesetState.Running);
|
||||
return await SetState(scaleset, ScalesetState.Running);
|
||||
} else {
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} resize finished, waiting for nodes to check in. scaleset_id: {scaleset.ScalesetId} ({nodeCount} of {scaleset.Size} checked in)");
|
||||
return scaleset;
|
||||
}
|
||||
}
|
||||
|
||||
private async Async.Task ResizeGrow(Scaleset scaleset) {
|
||||
|
||||
private async Async.Task<Scaleset> ResizeGrow(Scaleset scaleset) {
|
||||
var resizeResult = await _context.VmssOperations.ResizeVmss(scaleset.ScalesetId, scaleset.Size);
|
||||
if (resizeResult.IsOk == false) {
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} scaleset is mid-operation already scaleset_id: {scaleset.ScalesetId} message: {resizeResult.ErrorV}");
|
||||
}
|
||||
return scaleset;
|
||||
}
|
||||
|
||||
private async Async.Task ResizeShrink(Scaleset scaleset, long? toRemove) {
|
||||
private async Async.Task<Scaleset> ResizeShrink(Scaleset scaleset, long? toRemove) {
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} shrinking scaleset. scaleset_id: {scaleset.ScalesetId} to remove {toRemove}");
|
||||
|
||||
if (!toRemove.HasValue) {
|
||||
return;
|
||||
return scaleset;
|
||||
} else {
|
||||
var queue = new ShrinkQueue(scaleset.ScalesetId, _context.Queue, _log);
|
||||
await queue.SetSize(toRemove.Value);
|
||||
@ -664,6 +670,7 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
|
||||
await foreach (var node in nodes) {
|
||||
await _context.NodeOperations.SendStopIfFree(node);
|
||||
}
|
||||
return scaleset;
|
||||
}
|
||||
}
|
||||
|
||||
@ -785,4 +792,14 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
|
||||
return 1000;
|
||||
}
|
||||
}
|
||||
|
||||
public Task<Scaleset> Running(Scaleset scaleset) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(scaleset);
|
||||
}
|
||||
|
||||
public Task<Scaleset> CreationFailed(Scaleset scaleset) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(scaleset);
|
||||
}
|
||||
}
|
||||
|
@ -26,6 +26,16 @@ public interface ITaskOperations : IStatefulOrm<Task, TaskState> {
|
||||
Async.Task<Pool?> GetPool(Task task);
|
||||
Async.Task<Task> SetState(Task task, TaskState state);
|
||||
Async.Task<OneFuzzResult<Task>> Create(TaskConfig config, Guid jobId, UserInfo userInfo);
|
||||
|
||||
// state transitions:
|
||||
Async.Task<Task> Init(Task task);
|
||||
Async.Task<Task> Waiting(Task task);
|
||||
Async.Task<Task> Scheduled(Task task);
|
||||
Async.Task<Task> SettingUp(Task task);
|
||||
Async.Task<Task> Running(Task task);
|
||||
Async.Task<Task> Stopping(Task task);
|
||||
Async.Task<Task> Stopped(Task task);
|
||||
Async.Task<Task> WaitJob(Task task);
|
||||
}
|
||||
|
||||
public class TaskOperations : StatefulOrm<Task, TaskState, TaskOperations>, ITaskOperations {
|
||||
@ -305,8 +315,8 @@ public class TaskOperations : StatefulOrm<Task, TaskState, TaskOperations>, ITas
|
||||
return task;
|
||||
}
|
||||
|
||||
private async Async.Task<Task> Stopped(Task inputTask) {
|
||||
var task = await SetState(inputTask, TaskState.Stopped);
|
||||
public async Async.Task<Task> Stopped(Task task) {
|
||||
task = await SetState(task, TaskState.Stopped);
|
||||
await _context.Queue.DeleteQueue($"{task.TaskId}", StorageType.Corpus);
|
||||
|
||||
// # TODO: we need to 'unschedule' this task from the existing pools
|
||||
@ -317,4 +327,29 @@ public class TaskOperations : StatefulOrm<Task, TaskState, TaskOperations>, ITas
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
public Task<Task> Waiting(Task task) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(task);
|
||||
}
|
||||
|
||||
public Task<Task> Scheduled(Task task) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(task);
|
||||
}
|
||||
|
||||
public Task<Task> SettingUp(Task task) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(task);
|
||||
}
|
||||
|
||||
public Task<Task> Running(Task task) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(task);
|
||||
}
|
||||
|
||||
public Task<Task> WaitJob(Task task) {
|
||||
// nothing to do
|
||||
return Async.Task.FromResult(task);
|
||||
}
|
||||
}
|
||||
|
@ -160,9 +160,11 @@ namespace ApiService.OneFuzzLib.Orm {
|
||||
var delegateType = typeof(StateTransition);
|
||||
MethodInfo delegateSignature = delegateType.GetMethod("Invoke")!;
|
||||
|
||||
var missing = new List<string>();
|
||||
foreach (var state in states) {
|
||||
var methodInfo = thisType?.GetMethod(state.ToString());
|
||||
var methodInfo = thisType.GetMethod(state.ToString());
|
||||
if (methodInfo == null) {
|
||||
missing.Add(state);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -176,9 +178,12 @@ namespace ApiService.OneFuzzLib.Orm {
|
||||
continue;
|
||||
}
|
||||
|
||||
throw new Exception($"State transition method '{state}' in '{thisType?.Name}' does not have the correct signature. Expected '{delegateSignature}' actual '{methodInfo}' ");
|
||||
throw new InvalidOperationException($"State transition method '{state}' in '{thisType.Name}' does not have the correct signature. Expected '{delegateSignature}' actual '{methodInfo}' ");
|
||||
};
|
||||
|
||||
if (missing.Any()) {
|
||||
throw new InvalidOperationException($"State transitions are missing for '{thisType.Name}': {string.Join(", ", missing)}");
|
||||
}
|
||||
|
||||
_partitionKeyGetter =
|
||||
typeof(T).GetProperties().FirstOrDefault(p => p.GetCustomAttributes(true).OfType<PartitionKeyAttribute>().Any())?.GetMethod switch {
|
||||
|
@ -1,9 +1,4 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Xunit;
|
||||
using Xunit;
|
||||
|
||||
namespace FunctionalTests {
|
||||
public class Helpers {
|
||||
|
Loading…
x
Reference in New Issue
Block a user