mirror of
https://github.com/microsoft/onefuzz.git
synced 2025-06-22 14:19:03 +00:00
Implement not implemented bits (#2341)
`ScalesetOperations` was missing: * `ReimageNodes` * `DeleteNodes`
This commit is contained in:
@ -333,7 +333,7 @@ public static class NodeStateHelper {
|
||||
|
||||
public enum NodeDisposalStrategy {
|
||||
ScaleIn,
|
||||
Decomission
|
||||
Decommission
|
||||
}
|
||||
|
||||
|
||||
|
@ -10,6 +10,7 @@ public interface INodeOperations : IStatefulOrm<Node, NodeState> {
|
||||
Task<bool> CanProcessNewWork(Node node);
|
||||
|
||||
Task<OneFuzzResultVoid> AcquireScaleInProtection(Node node);
|
||||
Task<OneFuzzResultVoid> ReleaseScaleInProtection(Node node);
|
||||
|
||||
bool IsOutdated(Node node);
|
||||
Async.Task Stop(Node node, bool done = false);
|
||||
@ -75,17 +76,12 @@ public interface INodeOperations : IStatefulOrm<Node, NodeState> {
|
||||
|
||||
public class NodeOperations : StatefulOrm<Node, NodeState, NodeOperations>, INodeOperations {
|
||||
|
||||
|
||||
public NodeOperations(
|
||||
ILogTracer log,
|
||||
IOnefuzzContext context
|
||||
)
|
||||
public NodeOperations(ILogTracer log, IOnefuzzContext context)
|
||||
: base(log, context) {
|
||||
|
||||
}
|
||||
|
||||
public async Task<OneFuzzResultVoid> AcquireScaleInProtection(Node node) {
|
||||
if (await ScalesetNodeExists(node) && node.ScalesetId is Guid scalesetId) {
|
||||
if (node.ScalesetId is Guid scalesetId && await ScalesetNodeExists(node)) {
|
||||
_logTracer.Info($"Setting scale-in protection on node {node.MachineId}");
|
||||
return await _context.VmssOperations.UpdateScaleInProtection(scalesetId, node.MachineId, protectFromScaleIn: true);
|
||||
}
|
||||
@ -93,6 +89,17 @@ public class NodeOperations : StatefulOrm<Node, NodeState, NodeOperations>, INod
|
||||
return OneFuzzResultVoid.Ok;
|
||||
}
|
||||
|
||||
public async Task<OneFuzzResultVoid> ReleaseScaleInProtection(Node node) {
|
||||
if (!node.DebugKeepNode &&
|
||||
node.ScalesetId is Guid scalesetId &&
|
||||
await ScalesetNodeExists(node)) {
|
||||
_logTracer.Info($"Removing scale-in protection on node {node.MachineId}");
|
||||
return await _context.VmssOperations.UpdateScaleInProtection(scalesetId, node.MachineId, protectFromScaleIn: false);
|
||||
}
|
||||
|
||||
return OneFuzzResultVoid.Ok;
|
||||
}
|
||||
|
||||
public async Async.Task<bool> ScalesetNodeExists(Node node) {
|
||||
if (node.ScalesetId == null) {
|
||||
return false;
|
||||
|
@ -530,12 +530,12 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
|
||||
toReimage[deadNode.MachineId] = deadNode;
|
||||
}
|
||||
|
||||
// Perform operations until they fail due to scaleset getting locked
|
||||
NodeDisposalStrategy strategy =
|
||||
(_context.ServiceConfiguration.OneFuzzNodeDisposalStrategy.ToLowerInvariant()) switch {
|
||||
"decomission" => NodeDisposalStrategy.Decomission,
|
||||
_ => NodeDisposalStrategy.ScaleIn
|
||||
};
|
||||
// Perform operations until they fail due to scaleset getting locked:
|
||||
var strategy = _context.ServiceConfiguration.OneFuzzNodeDisposalStrategy.ToLowerInvariant() switch {
|
||||
// allowing typo’d or correct name for config setting:
|
||||
"decomission" or "decommission" => NodeDisposalStrategy.Decommission,
|
||||
_ => NodeDisposalStrategy.ScaleIn,
|
||||
};
|
||||
|
||||
await ReimageNodes(scaleSet, toReimage.Values, strategy);
|
||||
await DeleteNodes(scaleSet, toDelete.Values, strategy);
|
||||
@ -544,71 +544,105 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState, ScalesetO
|
||||
}
|
||||
|
||||
|
||||
public async Async.Task ReimageNodes(Scaleset scaleSet, IEnumerable<Node> nodes, NodeDisposalStrategy disposalStrategy) {
|
||||
public async Async.Task ReimageNodes(Scaleset scaleset, IEnumerable<Node> nodes, NodeDisposalStrategy disposalStrategy) {
|
||||
|
||||
if (nodes is null || !nodes.Any()) {
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} no nodes to reimage: scaleset_id: {scaleSet.ScalesetId}");
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} no nodes to reimage: scaleset_id: {scaleset.ScalesetId}");
|
||||
return;
|
||||
}
|
||||
|
||||
if (scaleSet.State == ScalesetState.Shutdown) {
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} scaleset shutting down, deleting rather than reimaging nodes. scaleset_id: {scaleSet.ScalesetId}");
|
||||
await DeleteNodes(scaleSet, nodes, disposalStrategy);
|
||||
if (scaleset.State == ScalesetState.Shutdown) {
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} scaleset shutting down, deleting rather than reimaging nodes. scaleset_id: {scaleset.ScalesetId}");
|
||||
await DeleteNodes(scaleset, nodes, disposalStrategy);
|
||||
return;
|
||||
}
|
||||
|
||||
if (scaleSet.State == ScalesetState.Halt) {
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} scaleset halting, ignoring node reimage: scaleset_id:{scaleSet.ScalesetId}");
|
||||
if (scaleset.State == ScalesetState.Halt) {
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} scaleset halting, ignoring node reimage: scaleset_id:{scaleset.ScalesetId}");
|
||||
return;
|
||||
}
|
||||
|
||||
var machineIds = new HashSet<Guid>();
|
||||
foreach (var node in nodes) {
|
||||
if (node.State == NodeState.Done) {
|
||||
if (node.State != NodeState.Done) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (node.DebugKeepNode) {
|
||||
_log.Warning($"{SCALESET_LOG_PREFIX} not reimaging manually overriden node. scaleset_id:{scaleSet.ScalesetId} machine_id:{node.MachineId}");
|
||||
_log.Warning($"{SCALESET_LOG_PREFIX} not reimaging manually overriden node. scaleset_id:{scaleset.ScalesetId} machine_id:{node.MachineId}");
|
||||
} else {
|
||||
machineIds.Add(node.MachineId);
|
||||
}
|
||||
}
|
||||
|
||||
if (!machineIds.Any()) {
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} no nodes to reimage: {scaleSet.ScalesetId}");
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} no nodes to reimage: {scaleset.ScalesetId}");
|
||||
return;
|
||||
}
|
||||
|
||||
throw new NotImplementedException();
|
||||
switch (disposalStrategy) {
|
||||
case NodeDisposalStrategy.Decommission:
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} decommissioning nodes");
|
||||
await Async.Task.WhenAll(nodes
|
||||
.Where(node => machineIds.Contains(node.MachineId))
|
||||
.Select(node => _context.NodeOperations.ReleaseScaleInProtection(node)));
|
||||
return;
|
||||
|
||||
case NodeDisposalStrategy.ScaleIn:
|
||||
await _context.VmssOperations.ReimageNodes(scaleset.ScalesetId, machineIds);
|
||||
await Async.Task.WhenAll(nodes
|
||||
.Where(node => machineIds.Contains(node.MachineId))
|
||||
.Select(async node => {
|
||||
await _context.NodeOperations.Delete(node);
|
||||
await _context.NodeOperations.ReleaseScaleInProtection(node);
|
||||
}));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
public async Async.Task DeleteNodes(Scaleset scaleSet, IEnumerable<Node> nodes, NodeDisposalStrategy disposalStrategy) {
|
||||
|
||||
public async Async.Task DeleteNodes(Scaleset scaleset, IEnumerable<Node> nodes, NodeDisposalStrategy disposalStrategy) {
|
||||
if (nodes is null || !nodes.Any()) {
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} no nodes to delete: scaleset_id: {scaleSet.ScalesetId}");
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} no nodes to delete: scaleset_id: {scaleset.ScalesetId}");
|
||||
return;
|
||||
}
|
||||
|
||||
foreach (var node in nodes) {
|
||||
await _context.NodeOperations.SetHalt(node);
|
||||
}
|
||||
// TODO: try to do this as one atomic operation:
|
||||
await Async.Task.WhenAll(nodes.Select(node => _context.NodeOperations.SetHalt(node)));
|
||||
|
||||
if (scaleSet.State == ScalesetState.Halt) {
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} scaleset halting, ignoring deletion {scaleSet.ScalesetId}");
|
||||
if (scaleset.State == ScalesetState.Halt) {
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} scaleset halting, ignoring deletion {scaleset.ScalesetId}");
|
||||
return;
|
||||
}
|
||||
|
||||
HashSet<Guid> machineIds = new();
|
||||
|
||||
foreach (var node in nodes) {
|
||||
if (node.DebugKeepNode) {
|
||||
_log.Warning($"{SCALESET_LOG_PREFIX} not deleting manually overriden node. scaleset_id:{scaleSet.ScalesetId} machine_id:{node.MachineId}");
|
||||
_log.Warning($"{SCALESET_LOG_PREFIX} not deleting manually overriden node. scaleset_id:{scaleset.ScalesetId} machine_id:{node.MachineId}");
|
||||
} else {
|
||||
machineIds.Add(node.MachineId);
|
||||
}
|
||||
}
|
||||
|
||||
throw new NotImplementedException();
|
||||
switch (disposalStrategy) {
|
||||
case NodeDisposalStrategy.Decommission:
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} decommissioning nodes");
|
||||
await Async.Task.WhenAll(nodes
|
||||
.Where(node => machineIds.Contains(node.MachineId))
|
||||
.Select(node => _context.NodeOperations.ReleaseScaleInProtection(node)));
|
||||
return;
|
||||
|
||||
case NodeDisposalStrategy.ScaleIn:
|
||||
_log.Info($"{SCALESET_LOG_PREFIX} deleting nodes scaleset_id: {scaleset.ScalesetId} machine_id: {string.Join(", ", machineIds)}");
|
||||
await _context.VmssOperations.DeleteNodes(scaleset.ScalesetId, machineIds);
|
||||
await Async.Task.WhenAll(nodes
|
||||
.Where(node => machineIds.Contains(node.MachineId))
|
||||
.Select(async node => {
|
||||
await _context.NodeOperations.Delete(node);
|
||||
await _context.NodeOperations.ReleaseScaleInProtection(node);
|
||||
}));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<OneFuzzResult<Scaleset>> GetById(Guid scalesetId) {
|
||||
|
@ -41,6 +41,8 @@ public interface IVmssOperations {
|
||||
IDictionary<string, string> tags);
|
||||
|
||||
Async.Task<List<string>?> ListVmss(Guid name, Func<VirtualMachineScaleSetVmResource, bool>? filter);
|
||||
Async.Task ReimageNodes(Guid scalesetId, IReadOnlySet<Guid> machineIds);
|
||||
Async.Task DeleteNodes(Guid scalesetId, IReadOnlySet<Guid> machineIds);
|
||||
}
|
||||
|
||||
public class VmssOperations : IVmssOperations {
|
||||
@ -392,4 +394,86 @@ public class VmssOperations : IVmssOperations {
|
||||
|
||||
return skuNames;
|
||||
});
|
||||
|
||||
public async Async.Task ReimageNodes(Guid scalesetId, IReadOnlySet<Guid> machineIds) {
|
||||
var result = await CheckCanUpdate(scalesetId);
|
||||
if (!result.IsOk) {
|
||||
throw new Exception($"cannot reimage scaleset {scalesetId}: {result.ErrorV}");
|
||||
}
|
||||
|
||||
var instanceIds = new HashSet<string>();
|
||||
var machineToInstance = await ListInstanceIds(scalesetId);
|
||||
foreach (var machineId in machineIds) {
|
||||
if (machineToInstance.TryGetValue(machineId, out var instanceId)) {
|
||||
instanceIds.Add(instanceId);
|
||||
} else {
|
||||
_log.Info($"unable to find instance ID for {scalesetId}:{machineId}");
|
||||
}
|
||||
}
|
||||
|
||||
if (!instanceIds.Any()) {
|
||||
return;
|
||||
}
|
||||
|
||||
var subscription = _creds.GetSubscription();
|
||||
var resourceGroup = _creds.GetBaseResourceGroup();
|
||||
var vmssId = VirtualMachineScaleSetResource.CreateResourceIdentifier(
|
||||
subscription, resourceGroup, scalesetId.ToString());
|
||||
|
||||
var computeClient = _creds.ArmClient;
|
||||
var vmssResource = computeClient.GetVirtualMachineScaleSetResource(vmssId);
|
||||
|
||||
// Nodes that must be are 'upgraded' before the reimage. This call makes sure
|
||||
// the instance is up-to-date with the VMSS model.
|
||||
// The expectation is that these requests are queued and handled subsequently.
|
||||
// The VMSS Team confirmed this expectation and testing supports it, as well.
|
||||
_log.Info($"upgrading VMSS ndoes - name: {scalesetId} ids: {string.Join(", ", instanceIds)}");
|
||||
await vmssResource.UpdateInstancesAsync(
|
||||
WaitUntil.Started,
|
||||
new VirtualMachineScaleSetVmInstanceRequiredIds(instanceIds));
|
||||
|
||||
_log.Info($"reimaging VMSS nodes - name: {scalesetId} ids: {string.Join(", ", instanceIds)}");
|
||||
|
||||
// very weird API here…
|
||||
var reqInstanceIds = new VirtualMachineScaleSetVmInstanceIds();
|
||||
foreach (var instanceId in instanceIds) {
|
||||
reqInstanceIds.InstanceIds.Add(instanceId);
|
||||
}
|
||||
|
||||
await vmssResource.ReimageAllAsync(WaitUntil.Started, reqInstanceIds);
|
||||
}
|
||||
|
||||
public async Async.Task DeleteNodes(Guid scalesetId, IReadOnlySet<Guid> machineIds) {
|
||||
var result = await CheckCanUpdate(scalesetId);
|
||||
if (!result.IsOk) {
|
||||
throw new Exception($"cannot delete nodes from scaleset {scalesetId}: {result.ErrorV}");
|
||||
}
|
||||
|
||||
var instanceIds = new HashSet<string>();
|
||||
var machineToInstance = await ListInstanceIds(scalesetId);
|
||||
foreach (var machineId in machineIds) {
|
||||
if (machineToInstance.TryGetValue(machineId, out var instanceId)) {
|
||||
instanceIds.Add(instanceId);
|
||||
} else {
|
||||
_log.Info($"unable to find instance ID for {scalesetId}:{machineId}");
|
||||
}
|
||||
}
|
||||
|
||||
if (!instanceIds.Any()) {
|
||||
return;
|
||||
}
|
||||
|
||||
var subscription = _creds.GetSubscription();
|
||||
var resourceGroup = _creds.GetBaseResourceGroup();
|
||||
var vmssId = VirtualMachineScaleSetResource.CreateResourceIdentifier(
|
||||
subscription, resourceGroup, scalesetId.ToString());
|
||||
|
||||
var computeClient = _creds.ArmClient;
|
||||
var vmssResource = computeClient.GetVirtualMachineScaleSetResource(vmssId);
|
||||
|
||||
_log.Info($"deleting scaleset VMs - name: {scalesetId} ids: {instanceIds}");
|
||||
await vmssResource.DeleteInstancesAsync(
|
||||
WaitUntil.Started,
|
||||
new VirtualMachineScaleSetVmInstanceRequiredIds(instanceIds));
|
||||
}
|
||||
}
|
||||
|
@ -233,10 +233,6 @@ public class WebhookMessageLogOperations : Orm<WebhookMessageLog>, IWebhookMessa
|
||||
|
||||
}
|
||||
|
||||
private void QueueObject(string v, WebhookMessageQueueObj obj, StorageType config, int? visibility_timeout) {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public IAsyncEnumerable<WebhookMessageLog> SearchExpired() {
|
||||
var expireTime = (DateTimeOffset.UtcNow - TimeSpan.FromDays(EXPIRE_DAYS)).ToString("o");
|
||||
|
||||
|
Reference in New Issue
Block a user