timer workers (part 2) (#1876)

* timer workers (part 2)

* addressing pr comment

Co-authored-by: stas <statis@microsoft.com>
This commit is contained in:
Stas
2022-05-02 17:32:34 -07:00
committed by GitHub
parent 3541b9a7ac
commit 5ccd519065
19 changed files with 565 additions and 75 deletions

View File

@ -114,12 +114,13 @@ public interface ILogTracer {
void Verbose(string message);
ILogTracer WithTag(string k, string v);
ILogTracer WithTags((string, string)[]? tags);
ILogTracer WithTags(IEnumerable<(string, string)>? tags);
ILogTracer WithHttpStatus((int, string) status);
}
internal interface ILogTracerInternal : ILogTracer {
void ReplaceCorrelationId(Guid newCorrelationId);
void AddTags((string, string)[] tags);
void AddTags(IEnumerable<(string, string)> tags);
}
@ -130,14 +131,14 @@ public class LogTracer : ILogTracerInternal {
}
private Guid _correlationId;
private List<ILog> _loggers;
private IEnumerable<ILog> _loggers;
private Dictionary<string, string> _tags;
private SeverityLevel _logSeverityLevel;
public Guid CorrelationId => _correlationId;
public IReadOnlyDictionary<string, string> Tags => _tags;
private static List<KeyValuePair<string, string>> ConvertTags((string, string)[]? tags) {
private static IEnumerable<KeyValuePair<string, string>> ConvertTags(IEnumerable<(string, string)>? tags) {
List<KeyValuePair<string, string>> converted = new List<KeyValuePair<string, string>>();
if (tags is null) {
return converted;
@ -149,11 +150,11 @@ public class LogTracer : ILogTracerInternal {
}
}
public LogTracer(Guid correlationId, (string, string)[]? tags, List<ILog> loggers, SeverityLevel logSeverityLevel) :
public LogTracer(Guid correlationId, IEnumerable<(string, string)>? tags, List<ILog> loggers, SeverityLevel logSeverityLevel) :
this(correlationId, new Dictionary<string, string>(ConvertTags(tags)), loggers, logSeverityLevel) { }
public LogTracer(Guid correlationId, IReadOnlyDictionary<string, string> tags, List<ILog> loggers, SeverityLevel logSeverityLevel) {
public LogTracer(Guid correlationId, IReadOnlyDictionary<string, string> tags, IEnumerable<ILog> loggers, SeverityLevel logSeverityLevel) {
_correlationId = correlationId;
_tags = new(tags);
_loggers = loggers;
@ -166,7 +167,7 @@ public class LogTracer : ILogTracerInternal {
}
//single threaded only
public void AddTags((string, string)[] tags) {
public void AddTags(IEnumerable<(string, string)> tags) {
if (tags is not null) {
foreach (var (k, v) in tags) {
_tags[k] = v;
@ -178,7 +179,12 @@ public class LogTracer : ILogTracerInternal {
return WithTags(new[] { (k, v) });
}
public ILogTracer WithTags((string, string)[]? tags) {
public ILogTracer WithHttpStatus((int, string) status) {
(string, string)[] tags = { ("StatusCode", status.Item1.ToString()), ("ReasonPhrase", status.Item2) };
return WithTags(tags);
}
public ILogTracer WithTags(IEnumerable<(string, string)>? tags) {
var newTags = new Dictionary<string, string>(Tags);
if (tags is not null) {
foreach (var (k, v) in tags) {
@ -255,7 +261,7 @@ public class LogTracer : ILogTracerInternal {
}
public interface ILogTracerFactory {
LogTracer CreateLogTracer(Guid correlationId, (string, string)[]? tags = null, SeverityLevel severityLevel = SeverityLevel.Verbose);
LogTracer CreateLogTracer(Guid correlationId, IEnumerable<(string, string)>? tags = null, SeverityLevel severityLevel = SeverityLevel.Verbose);
}
public class LogTracerFactory : ILogTracerFactory {
@ -265,7 +271,7 @@ public class LogTracerFactory : ILogTracerFactory {
_loggers = loggers;
}
public LogTracer CreateLogTracer(Guid correlationId, (string, string)[]? tags = null, SeverityLevel severityLevel = SeverityLevel.Verbose) {
public LogTracer CreateLogTracer(Guid correlationId, IEnumerable<(string, string)>? tags = null, SeverityLevel severityLevel = SeverityLevel.Verbose) {
return new(correlationId, tags, _loggers, severityLevel);
}

View File

@ -384,3 +384,7 @@ public static class NodeStateHelper {
}
public enum NodeDisposalStrategy {
ScaleIn,
Decomission
}

View File

@ -61,6 +61,7 @@ public abstract record BaseEvent() {
EventScalesetResizeScheduled _ => EventType.ScalesetResizeScheduled,
EventScalesetStateUpdated _ => EventType.ScalesetStateUpdated,
EventNodeDeleted _ => EventType.NodeDeleted,
EventNodeCreated _ => EventType.NodeCreated,
_ => throw new NotImplementedException(),
};
@ -86,6 +87,7 @@ public abstract record BaseEvent() {
EventType.ScalesetResizeScheduled => typeof(EventScalesetResizeScheduled),
EventType.ScalesetStateUpdated => typeof(EventScalesetStateUpdated),
EventType.NodeDeleted => typeof(EventNodeDeleted),
EventType.NodeCreated => typeof(EventNodeCreated),
_ => throw new ArgumentException($"invalid input {eventType}"),
};
@ -234,11 +236,11 @@ public record EventProxyStateUpdated(
) : BaseEvent();
//record EventNodeCreated(
// Guid MachineId,
// Guid? ScalesetId,
// PoolName PoolName
// ) : BaseEvent();
public record EventNodeCreated(
Guid MachineId,
Guid? ScalesetId,
PoolName PoolName
) : BaseEvent();

View File

@ -53,10 +53,10 @@ public record NodeCommandAddSshKey(string PublicKey);
public record NodeCommand
(
StopNodeCommand? Stop,
StopTaskNodeCommand? StopTask,
NodeCommandAddSshKey? AddSshKey,
NodeCommandStopIfFree? StopIfFree
StopNodeCommand? Stop = default,
StopTaskNodeCommand? StopTask = default,
NodeCommandAddSshKey? AddSshKey = default,
NodeCommandStopIfFree? StopIfFree = default
);
public enum NodeTaskState {
@ -83,17 +83,20 @@ public record ProxyHeartbeat
public record Node
(
DateTimeOffset? InitializedAt,
[PartitionKey] PoolName PoolName,
Guid? PoolId,
[RowKey] Guid MachineId,
NodeState State,
Guid? ScalesetId,
DateTimeOffset Heartbeat,
Guid? PoolId,
string Version,
bool ReimageRequested,
bool DeleteRequested,
bool DebugKeepNode
DateTimeOffset? Heartbeat = null,
DateTimeOffset? InitializedAt = null,
NodeState State = NodeState.Init,
List<NodeTasks>? Tasks = null,
List<NodeCommand>? Messages = null,
Guid? ScalesetId = null,
bool ReimageRequested = false,
bool DeleteRequested = false,
bool DebugKeepNode = false
) : StatefulEntityBase<NodeState>(State);

View File

@ -32,6 +32,8 @@ public interface IServiceConfig {
public string? OneFuzzMonitor { get; }
public string? OneFuzzOwner { get; }
public string OneFuzzNodeDisposalStrategy { get; }
public string? OneFuzzResourceGroup { get; }
public string? OneFuzzTelemetry { get; }
@ -78,4 +80,6 @@ public class ServiceConfiguration : IServiceConfig {
public string? OneFuzzResourceGroup { get => Environment.GetEnvironmentVariable("ONEFUZZ_RESOURCE_GROUP"); }
public string? OneFuzzTelemetry { get => Environment.GetEnvironmentVariable("ONEFUZZ_TELEMETRY"); }
public string OneFuzzVersion { get => Environment.GetEnvironmentVariable("ONEFUZZ_VERSION") ?? "0.0.0"; }
public string OneFuzzNodeDisposalStrategy { get => Environment.GetEnvironmentVariable("ONEFUZZ_NODE_DISPOSAL_STRATEGY") ?? "scale_in"; }
}

View File

@ -60,7 +60,7 @@ public class TimerRetention {
_log.Info($"deleting expired notification: {notification.NotificationId}");
var r = await _notificaitonOps.Delete(notification);
if (!r.IsOk) {
_log.Error($"failed to delete notification with id {notification.NotificationId} due to [{r.ErrorV.Item1}] {r.ErrorV.Item2}");
_log.WithHttpStatus(r.ErrorV).Error($"failed to delete notification with id {notification.NotificationId}");
}
}
}
@ -72,7 +72,7 @@ public class TimerRetention {
var updatedJob = job with { UserInfo = userInfo };
var r = await _jobOps.Replace(updatedJob);
if (!r.IsOk) {
_log.Error($"Failed to save job {updatedJob.JobId} due to [{r.ErrorV.Item1}] {r.ErrorV.Item2}");
_log.WithHttpStatus(r.ErrorV).Error($"Failed to save job {updatedJob.JobId}");
}
}
}
@ -84,7 +84,7 @@ public class TimerRetention {
var updatedTask = task with { UserInfo = userInfo };
var r = await _taskOps.Replace(updatedTask);
if (!r.IsOk) {
_log.Error($"Failed to save task {updatedTask.TaskId} due to [{r.ErrorV.Item1}] {r.ErrorV.Item2}");
_log.WithHttpStatus(r.ErrorV).Error($"Failed to save task {updatedTask.TaskId}");
}
}
}
@ -96,7 +96,7 @@ public class TimerRetention {
var updatedRepro = repro with { UserInfo = userInfo };
var r = await _reproOps.Replace(updatedRepro);
if (!r.IsOk) {
_log.Error($"Failed to save repro {updatedRepro.VmId} due to [{r.ErrorV.Item1}] {r.ErrorV.Item2}");
_log.WithHttpStatus(r.ErrorV).Error($"Failed to save repro {updatedRepro.VmId}");
}
}
}

View File

@ -14,7 +14,7 @@ public class TimerWorkers {
_scaleSetOps.UpdateConfigs(scaleset);
//if (_scaleSetOps.Cleanup)
}

View File

@ -30,20 +30,17 @@ public class ConfigOperations : Orm<InstanceConfig>, IConfigOperations {
if (isNew) {
r = await Insert(config);
if (!r.IsOk) {
var (status, reason) = r.ErrorV;
_log.Error($"Failed to save new instance config record with result [{status}] {reason}");
_log.WithHttpStatus(r.ErrorV).Error($"Failed to save new instance config record");
}
} else if (requireEtag && config.ETag.HasValue) {
r = await Update(config);
if (!r.IsOk) {
var (status, reason) = r.ErrorV;
_log.Error($"Failed to update instance config record with result: [{status}] {reason}");
_log.WithHttpStatus(r.ErrorV).Error($"Failed to update instance config record");
}
} else {
r = await Replace(config);
if (!r.IsOk) {
var (status, reason) = r.ErrorV;
_log.Error($"Failed to replace instance config record with result [{status}] {reason}");
_log.WithHttpStatus(r.ErrorV).Error($"Failed to replace instance config record");
}
}

View File

@ -1,5 +1,4 @@
using System.Text.Json;
using System.Threading.Tasks;
using System.Threading.Tasks;
using ApiService.OneFuzzLib.Orm;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
@ -16,14 +15,43 @@ public interface INodeOperations : IStatefulOrm<Node, NodeState> {
int? numResults = default);
new Async.Task Delete(Node node);
Async.Task ReimageLongLivedNodes(Guid scaleSetId);
Async.Task<Node> Create(
Guid poolId,
string poolName,
Guid machineId,
Guid? scaleSetId,
string version,
bool isNew = false);
Async.Task SetHalt(Node node);
IAsyncEnumerable<Node> GetDeadNodes(Guid scaleSetId, TimeSpan expirationPeriod);
Async.Task MarkTasksStoppedEarly(Node node, Error? error = null);
Async.Task ToReimage(Node node, bool done = false);
static TimeSpan NODE_EXPIRATION_TIME = TimeSpan.FromHours(1.0);
static TimeSpan NODE_REIMAGE_TIME = TimeSpan.FromDays(6.0);
}
/// Future work:
///
/// Enabling autoscaling for the scalesets based on the pool work queues.
/// https://docs.microsoft.com/en-us/azure/azure-monitor/platform/autoscale-common-metrics#commonly-used-storage-metrics
public class NodeOperations : StatefulOrm<Node, NodeState>, INodeOperations {
private readonly INodeTasksOperations _nodeTasksOps;
private readonly ITaskOperations _taskOps;
private readonly INodeMessageOperations _nodeMessageOps;
private readonly IEvents _events;
private readonly ILogTracer _log;
private readonly ICreds _creds;
public NodeOperations(
IStorage storage,
@ -32,7 +60,8 @@ public class NodeOperations : StatefulOrm<Node, NodeState>, INodeOperations {
ITaskOperations taskOps,
INodeTasksOperations nodeTasksOps,
INodeMessageOperations nodeMessageOps,
IEvents events
IEvents events,
ICreds creds
)
: base(storage, log, config) {
@ -40,9 +69,123 @@ public class NodeOperations : StatefulOrm<Node, NodeState>, INodeOperations {
_nodeTasksOps = nodeTasksOps;
_nodeMessageOps = nodeMessageOps;
_events = events;
_log = log;
_creds = creds;
}
public async Task<Node?> GetByMachineId(Guid machineId) {
/// Mark any excessively long lived node to be re-imaged.
/// This helps keep nodes on scalesets that use `latest` OS image SKUs
/// reasonably up-to-date with OS patches without disrupting running
/// fuzzing tasks with patch reboot cycles.
public async Async.Task ReimageLongLivedNodes(Guid scaleSetId) {
var timeFilter = $"not (initialized_at ge datetime'{(DateTimeOffset.UtcNow - INodeOperations.NODE_REIMAGE_TIME).ToString("o")}')";
await foreach (var node in QueryAsync($"(scaleset_id eq {scaleSetId}) and {timeFilter}")) {
if (node.DebugKeepNode) {
_log.Info($"removing debug_keep_node for expired node. scaleset_id:{node.ScalesetId} machine_id:{node.MachineId}");
}
await ToReimage(node with { DebugKeepNode = false });
}
}
public async Async.Task ToReimage(Node node, bool done = false) {
var nodeState = node.State;
if (done) {
if (!NodeStateHelper.ReadyForReset.Contains(node.State)) {
nodeState = NodeState.Done;
}
}
var reimageRequested = node.ReimageRequested;
if (!node.ReimageRequested && !node.DeleteRequested) {
_log.Info($"setting reimage_requested: {node.MachineId}");
reimageRequested = true;
}
var updatedNode = node with { State = nodeState, ReimageRequested = reimageRequested };
//if we're going to reimage, make sure the node doesn't pick up new work too.
await SendStopIfFree(updatedNode);
var r = await Replace(updatedNode);
if (!r.IsOk) {
_log.WithHttpStatus(r.ErrorV).Error("Failed to save Node record");
}
}
public IAsyncEnumerable<Node> GetDeadNodes(Guid scaleSetId, TimeSpan expirationPeriod) {
var minDate = DateTimeOffset.UtcNow - expirationPeriod;
var filter = $"heartbeat lt datetime'{minDate.ToString("o")}' or Timestamp lt datetime'{minDate.ToString("o")}'";
return QueryAsync(Query.And(filter, $"scaleset_id eq ${scaleSetId}"));
}
public async Async.Task<Node> Create(
Guid poolId,
string poolName,
Guid machineId,
Guid? scaleSetId,
string version,
bool isNew = false) {
var node = new Node(poolName, machineId, poolId, version, ScalesetId: scaleSetId);
ResultVoid<(int, string)> r;
if (isNew) {
r = await Replace(node);
} else {
r = await Update(node);
}
if (!r.IsOk) {
_log.WithHttpStatus(r.ErrorV).Error($"failed to save NodeRecord, isNew: {isNew}");
} else {
await _events.SendEvent(
new EventNodeCreated(
node.MachineId,
node.ScalesetId,
node.PoolName
)
);
}
return node;
}
public async Async.Task Stop(Node node, bool done = false) {
await ToReimage(node, done);
await SendMessage(node, new NodeCommand(Stop: new StopNodeCommand()));
}
/// <summary>
/// Tell node to stop everything
/// </summary>
/// <param name="node"></param>
/// <returns></returns>
public async Async.Task SetHalt(Node node) {
_log.Info($"setting halt: {node.MachineId}");
var updatedNode = node with { DeleteRequested = true };
await Stop(updatedNode, true);
await SendStopIfFree(updatedNode);
}
public async Async.Task SendStopIfFree(Node node) {
var ver = new Version(_config.OneFuzzVersion.Split('-')[0]);
if (ver >= Version.Parse("2.16.1")) {
await SendMessage(node, new NodeCommand(StopIfFree: new NodeCommandStopIfFree()));
}
}
public async Async.Task SendMessage(Node node, NodeCommand message) {
var r = await _nodeMessageOps.Replace(new NodeMessage(node.MachineId, message));
if (!r.IsOk) {
_log.WithHttpStatus(r.ErrorV).Error($"failed to replace NodeMessge record for machine_id: {node.MachineId}");
}
}
public async Async.Task<Node?> GetByMachineId(Guid machineId) {
var data = QueryAsync(filter: $"RowKey eq '{machineId}'");
return await data.FirstOrDefaultAsync();
@ -68,8 +211,7 @@ public class NodeOperations : StatefulOrm<Node, NodeState>, INodeOperations {
}
if (states is not null) {
IEnumerable<string> convertedStates = states.Select(x => JsonSerializer.Serialize(x, EntityConverter.GetJsonSerializerOptions()).Trim('"'));
var q = Query.EqualAny("state", convertedStates);
var q = Query.EqualAnyEnum("state", states);
queryParts.Add($"({q})");
}
@ -123,6 +265,7 @@ public class NodeOperations : StatefulOrm<Node, NodeState>, INodeOperations {
await _events.SendEvent(new EventNodeDeleted(node.MachineId, node.ScalesetId, node.PoolName));
}
}
@ -145,7 +288,6 @@ public class NodeTasksOperations : StatefulOrm<NodeTasks, NodeTaskState>, INodeT
//TODO: suggest by Cheick: this can probably be optimize by query all NodesTasks then query the all machine in single request
public async IAsyncEnumerable<Node> GetNodesByTaskId(Guid taskId, INodeOperations nodeOps) {
List<Node> results = new();
await foreach (var entry in QueryAsync($"task_id eq '{taskId}'")) {
var node = await nodeOps.GetByMachineId(entry.MachineId);
if (node is not null) {
@ -177,7 +319,7 @@ public class NodeTasksOperations : StatefulOrm<NodeTasks, NodeTaskState>, INodeT
await foreach (var entry in GetByMachineId(machineId)) {
var res = await Delete(entry);
if (!res.IsOk) {
_log.Error($"failed to delete node task entry for machine_id: {entry.MachineId} due to [{res.ErrorV.Item1}] {res.ErrorV.Item2}");
_log.WithHttpStatus(res.ErrorV).Error($"failed to delete node task entry for machine_id: {entry.MachineId}");
}
}
}
@ -189,7 +331,9 @@ public record NodeMessage(
[PartitionKey] Guid MachineId,
[RowKey] string MessageId,
NodeCommand Message
) : EntityBase;
) : EntityBase {
public NodeMessage(Guid machineId, NodeCommand message) : this(machineId, NewSortedKey, message) { }
};
public interface INodeMessageOperations : IOrm<NodeMessage> {
IAsyncEnumerable<NodeMessage> GetMessage(Guid machineId);
@ -214,7 +358,7 @@ public class NodeMessageOperations : Orm<NodeMessage>, INodeMessageOperations {
await foreach (var message in GetMessage(machineId)) {
var r = await Delete(message);
if (!r.IsOk) {
_log.Error($"failed to delete message for node {machineId} due to [{r.ErrorV.Item1}] {r.ErrorV.Item2}");
_log.WithHttpStatus(r.ErrorV).Error($"failed to delete message for node {machineId}");
}
}
}

View File

@ -55,7 +55,7 @@ public class ProxyOperations : StatefulOrm<Proxy, VmState>, IProxyOperations {
}
_logTracer.Info($"creating proxy: region:{region}");
var newProxy = new Proxy(region, Guid.NewGuid(), DateTimeOffset.UtcNow, VmState.Init, Auth.BuildAuth(), null, null, _config.OneFuzzVersion, null, false);
var newProxy = new Proxy(region, Guid.NewGuid(), DateTimeOffset.UtcNow, VmState.Init, Auth.BuildAuth(), null, null, _config.OneFuzzVersion.ToString(), null, false);
await Replace(newProxy);
await _events.SendEvent(new EventProxyCreated(region, newProxy.ProxyId));

View File

@ -12,6 +12,7 @@ public interface IScalesetOperations : IOrm<Scaleset> {
public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState>, IScalesetOperations {
const string SCALESET_LOG_PREFIX = "scalesets: ";
ILogTracer _log;
IPoolOperations _poolOps;
IEvents _events;
@ -19,8 +20,21 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState>, IScalese
IVmssOperations _vmssOps;
IQueue _queue;
INodeOperations _nodeOps;
IServiceConfig _serviceConfig;
ICreds _creds;
public ScalesetOperations(IStorage storage, ILogTracer log, IServiceConfig config, IPoolOperations poolOps, IEvents events, IExtensions extensions, IVmssOperations vmssOps, IQueue queue, INodeOperations nodeOps)
public ScalesetOperations(
IStorage storage,
ILogTracer log,
IServiceConfig config,
IPoolOperations poolOps,
IEvents events,
IExtensions extensions,
IVmssOperations vmssOps,
IQueue queue,
INodeOperations nodeOps,
ICreds creds
)
: base(storage, log, config) {
_log = log;
_poolOps = poolOps;
@ -29,6 +43,8 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState>, IScalese
_vmssOps = vmssOps;
_queue = queue;
_nodeOps = nodeOps;
_serviceConfig = config;
_creds = creds;
}
public IAsyncEnumerable<Scaleset> Search() {
@ -105,19 +121,28 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState>, IScalese
}
}
public async Async.Task Halt(Scaleset scaleset) {
var shrinkQueue = new ShrinkQueue(scaleset.ScalesetId, _queue, _log);
await shrinkQueue.Delete();
await foreach (var node in _nodeOps.SearchStates(scaleSetId: scaleset.ScalesetId)) {
_log.Info($"{SCALESET_LOG_PREFIX} deleting node scaleset_id {scaleset.ScalesetId} machine_id {node.MachineId}");
await _nodeOps.Delete(node);
}
//_nodeOps.
_log.Info($"{SCALESET_LOG_PREFIX} scaleset delete starting: scaleset_id:{scaleset.ScalesetId}");
if (await _vmssOps.DeleteVmss(scaleset.ScalesetId)) {
_log.Info($"{SCALESET_LOG_PREFIX}scaleset deleted: scaleset_id {scaleset.ScalesetId}");
var r = await Delete(scaleset);
if (!r.IsOk) {
_log.WithHttpStatus(r.ErrorV).Error($"Failed to delete scaleset record {scaleset.ScalesetId}");
}
} else {
var r = await Replace(scaleset);
if (!r.IsOk) {
_log.WithHttpStatus(r.ErrorV).Error($"Failed to save scaleset record {scaleset.ScalesetId}");
}
}
}
@ -131,14 +156,178 @@ public class ScalesetOperations : StatefulOrm<Scaleset, ScalesetState>, IScalese
if (scaleSet.State == ScalesetState.Halt) {
_log.Info($"{SCALESET_LOG_PREFIX} halting scaleset scaleset_id {scaleSet.ScalesetId}");
await Halt(scaleSet);
return true;
}
var pool = await _poolOps.GetByName(scaleSet.PoolName);
if (!pool.IsOk) {
_log.Error($"unable to find pool during cleanup {scaleSet.ScalesetId} - {scaleSet.PoolName}");
await SetFailed(scaleSet, pool.ErrorV!);
return true;
}
await _nodeOps.ReimageLongLivedNodes(scaleSet.ScalesetId);
//ground truth of existing nodes
var azureNodes = await _vmssOps.ListInstanceIds(scaleSet.ScalesetId);
var nodes = _nodeOps.SearchStates(scaleSetId: scaleSet.ScalesetId);
//# Nodes do not exists in scalesets but in table due to unknown failure
await foreach (var node in nodes) {
if (!azureNodes.ContainsKey(node.MachineId)) {
_log.Info($"{SCALESET_LOG_PREFIX} no longer in scaleset. scaleset_id:{scaleSet.ScalesetId} machine_id:{node.MachineId}");
await _nodeOps.Delete(node);
}
}
//# Scalesets can have nodes that never check in (such as broken OS setup
//# scripts).
//
//# This will add nodes that Azure knows about but have not checked in
//# such that the `dead node` detection will eventually reimage the node.
//
//# NOTE: If node setup takes longer than NODE_EXPIRATION_TIME (1 hour),
//# this will cause the nodes to continuously get reimaged.
var nodeMachineIds = await nodes.Select(x => x.MachineId).ToHashSetAsync();
foreach (var azureNode in azureNodes) {
var machineId = azureNode.Key;
if (nodeMachineIds.Contains(machineId)) {
continue;
}
_log.Info($"{SCALESET_LOG_PREFIX} adding missing azure node. scaleset_id:{scaleSet.ScalesetId} machine_id:{machineId}");
//# Note, using `new=True` makes it such that if a node already has
//# checked in, this won't overwrite it.
//Python code does use created node
//pool.IsOk was handled above, OkV must be not null at this point
var _ = await _nodeOps.Create(pool.OkV!.PoolId, scaleSet.PoolName, machineId, scaleSet.ScalesetId, _config.OneFuzzVersion, true);
}
var existingNodes =
from x in nodes
where azureNodes.ContainsKey(x.MachineId)
select x;
var nodesToReset =
from x in existingNodes
where NodeStateHelper.ReadyForReset.Contains(x.State)
select x;
Dictionary<Guid, Node> toDelete = new();
Dictionary<Guid, Node> toReimage = new();
await foreach (var node in nodesToReset) {
if (node.DeleteRequested) {
toDelete[node.MachineId] = node;
} else {
if (await new ShrinkQueue(scaleSet.ScalesetId, _queue, _log).ShouldShrink()) {
await _nodeOps.SetHalt(node);
toDelete[node.MachineId] = node;
} else if (await new ShrinkQueue(pool.OkV!.PoolId, _queue, _log).ShouldShrink()) {
await _nodeOps.SetHalt(node);
toDelete[node.MachineId] = node;
} else {
toReimage[node.MachineId] = node;
}
}
}
var deadNodes = _nodeOps.GetDeadNodes(scaleSet.ScalesetId, INodeOperations.NODE_EXPIRATION_TIME);
await foreach (var deadNode in deadNodes) {
string errorMessage;
if (deadNode.Heartbeat is not null) {
errorMessage = "node reimaged due to expired hearbeat";
} else {
errorMessage = "node reimaged due to never receiving heartbeat";
}
var error = new Error(ErrorCode.TASK_FAILED, new[] { $"{errorMessage} scaleset_id {deadNode.ScalesetId} last heartbeat:{deadNode.Heartbeat}" });
await _nodeOps.MarkTasksStoppedEarly(deadNode, error);
await _nodeOps.ToReimage(deadNode, true);
toReimage[deadNode.MachineId] = deadNode;
}
// Perform operations until they fail due to scaleset getting locked
NodeDisposalStrategy strategy =
(_serviceConfig.OneFuzzNodeDisposalStrategy.ToLowerInvariant()) switch {
"decomission" => NodeDisposalStrategy.Decomission,
_ => NodeDisposalStrategy.ScaleIn
};
throw new NotImplementedException();
}
public async Async.Task ReimageNodes(Scaleset scaleSet, IEnumerable<Node> nodes, NodeDisposalStrategy disposalStrategy) {
if (nodes is null || !nodes.Any()) {
_log.Info($"{SCALESET_LOG_PREFIX} no nodes to reimage: scaleset_id: {scaleSet.ScalesetId}");
return;
}
if (scaleSet.State == ScalesetState.Shutdown) {
_log.Info($"{SCALESET_LOG_PREFIX} scaleset shutting down, deleting rather than reimaging nodes. scaleset_id: {scaleSet.ScalesetId}");
await DeleteNodes(scaleSet, nodes, disposalStrategy);
return;
}
if (scaleSet.State == ScalesetState.Halt) {
_log.Info($"{SCALESET_LOG_PREFIX} scaleset halting, ignoring node reimage: scaleset_id:{scaleSet.ScalesetId}");
return;
}
var machineIds = new HashSet<Guid>();
foreach (var node in nodes) {
if (node.State == NodeState.Done) {
continue;
}
if (node.DebugKeepNode) {
_log.Warning($"{SCALESET_LOG_PREFIX} not reimaging manually overriden node. scaleset_id:{scaleSet.ScalesetId} machine_id:{node.MachineId}");
} else {
machineIds.Add(node.MachineId);
}
}
if (!machineIds.Any()) {
_log.Info($"{SCALESET_LOG_PREFIX} no nodes to reimage: {scaleSet.ScalesetId}");
return;
}
throw new NotImplementedException();
}
public async Async.Task DeleteNodes(Scaleset scaleSet, IEnumerable<Node> nodes, NodeDisposalStrategy disposalStrategy) {
if (nodes is null || !nodes.Any()) {
_log.Info($"{SCALESET_LOG_PREFIX} no nodes to delete: scaleset_id: {scaleSet.ScalesetId}");
return;
}
foreach (var node in nodes) {
await _nodeOps.SetHalt(node);
}
if (scaleSet.State == ScalesetState.Halt) {
_log.Info($"{SCALESET_LOG_PREFIX} scaleset halting, ignoring deletion {scaleSet.ScalesetId}");
return;
}
HashSet<Guid> machineIds = new();
foreach (var node in nodes) {
if (node.DebugKeepNode) {
_log.Warning($"{SCALESET_LOG_PREFIX} not deleting manually overriden node. scaleset_id:{scaleSet.ScalesetId} machine_id:{node.MachineId}");
} else {
machineIds.Add(node.MachineId);
}
}
throw new NotImplementedException();
}
}

View File

@ -57,7 +57,7 @@ public class Scheduler : IScheduler {
}
}
private async System.Threading.Tasks.Task<bool> ScheduleWorkset(WorkSet workSet, Pool pool, int count) {
private async Async.Task<bool> ScheduleWorkset(WorkSet workSet, Pool pool, int count) {
if (!PoolStateHelper.Available().Contains(pool.State)) {
_logTracer.Info($"pool not available for work: {pool.Name} state: {pool.State}");
return false;
@ -120,7 +120,7 @@ public class Scheduler : IScheduler {
record BucketConfig(int count, bool reboot, Container setupContainer, string? setupScript, Pool pool);
private async System.Threading.Tasks.Task<(BucketConfig, WorkUnit)?> BuildWorkunit(Task task) {
private async Async.Task<(BucketConfig, WorkUnit)?> BuildWorkunit(Task task) {
Pool? pool = await _taskOperations.GetPool(task);
if (pool == null) {
_logTracer.Info($"unable to find pool for task: {task.TaskId}");

View File

@ -51,8 +51,10 @@ public class ShrinkQueue {
i++;
}
}
}
public async Async.Task<bool> ShouldShrink() {
return await _queueOps.RemoveFirstMessage(QueueName, StorageType.Config);
}
}

View File

@ -18,8 +18,8 @@ public interface ITaskOperations : IStatefulOrm<Task, TaskState> {
Async.Task<TaskVm?> GetReproVmConfig(Task task);
Async.Task<bool> CheckPrereqTasks(Task task);
System.Threading.Tasks.Task<Pool?> GetPool(Task task);
System.Threading.Tasks.Task<Task> SetState(Task task, TaskState state);
Async.Task<Pool?> GetPool(Task task);
Async.Task<Task> SetState(Task task, TaskState state);
}
public class TaskOperations : StatefulOrm<Task, TaskState>, ITaskOperations {
@ -222,7 +222,7 @@ public class TaskOperations : StatefulOrm<Task, TaskState>, ITaskOperations {
return true;
}
public async System.Threading.Tasks.Task<Pool?> GetPool(Task task) {
public async Async.Task<Pool?> GetPool(Task task) {
if (task.Config.Pool != null) {
var pool = await _poolOperations.GetByName(task.Config.Pool.PoolName);
if (!pool.IsOk) {

View File

@ -1,12 +1,18 @@
using Azure;
using Azure.ResourceManager.Compute;
using Azure.ResourceManager.Compute.Models;
using Microsoft.Rest.Azure;
namespace Microsoft.OneFuzz.Service;
public interface IVmssOperations {
public Async.Task<OneFuzzResultVoid> UpdateExtensions(Guid name, IList<VirtualMachineScaleSetExtensionData> extensions);
Async.Task<OneFuzzResultVoid> UpdateExtensions(Guid name, IList<VirtualMachineScaleSetExtensionData> extensions);
Async.Task<VirtualMachineScaleSetData> GetVmss(Guid name);
Async.Task<bool> DeleteVmss(Guid name, bool? forceDeletion = null);
Async.Task<IDictionary<Guid, string>> ListInstanceIds(Guid name);
}
public class VmssOperations : IVmssOperations {
@ -19,6 +25,19 @@ public class VmssOperations : IVmssOperations {
_creds = creds;
}
public async Async.Task<bool> DeleteVmss(Guid name, bool? forceDeletion = null) {
var r = GetVmssResource(name);
var result = await r.DeleteAsync(WaitUntil.Started, forceDeletion: forceDeletion);
var raw = result.GetRawResponse();
if (raw.IsError) {
_log.WithHttpStatus((raw.Status, raw.ReasonPhrase)).Error($"Failed to delete vmss: {name}");
return false;
} else {
return true;
}
}
private VirtualMachineScaleSetResource GetVmssResource(Guid name) {
var resourceGroup = _creds.GetBaseResourceGroup();
var id = VirtualMachineScaleSetResource.CreateResourceIdentifier(_creds.GetSubscription(), resourceGroup, name.ToString());
@ -63,4 +82,91 @@ public class VmssOperations : IVmssOperations {
return OneFuzzResultVoid.Error(canUpdate.ErrorV);
}
}
public async Async.Task<IDictionary<Guid, string>> ListInstanceIds(Guid name) {
_log.Verbose($"get instance IDs for scaleset {name}");
var results = new Dictionary<Guid, string>();
var res = GetVmssResource(name);
if (res is null) {
_log.Verbose($"vm does not exist {name}");
return results;
} else {
try {
await foreach (var instance in res.GetVirtualMachineScaleSetVms().AsAsyncEnumerable()) {
if (instance is not null) {
Guid key;
if (Guid.TryParse(instance.Data.VmId, out key)) {
results[key] = instance.Data.InstanceId;
} else {
_log.Error($"failed to convert vmId {instance.Data.VmId} to Guid");
}
}
}
} catch (CloudException ex) {
_log.Exception(ex, $"vm does not exist {name}");
}
}
return results;
}
public async Async.Task<OneFuzzResult<VirtualMachineScaleSetVmResource>> GetInstanceVm(Guid name, Guid vmId) {
_log.Info($"get instance ID for scaleset node: {name}:{vmId}");
var scaleSet = GetVmssResource(name);
try {
await foreach (var vm in scaleSet.GetVirtualMachineScaleSetVms().AsAsyncEnumerable()) {
var response = await vm.GetAsync();
if (!response.Value.HasData) {
return OneFuzzResult<VirtualMachineScaleSetVmResource>.Error(ErrorCode.UNABLE_TO_FIND, $"failed to get vm data");
}
if (response.Value.Data.VmId == vmId.ToString()) {
return OneFuzzResult<VirtualMachineScaleSetVmResource>.Ok(response);
}
}
} catch (CloudException ex) {
_log.Exception(ex, $"unable to find vm instance: {name}:{vmId}");
return OneFuzzResult<VirtualMachineScaleSetVmResource>.Error(ErrorCode.UNABLE_TO_FIND, $"unable to find vm instance: {name}:{vmId}");
}
return OneFuzzResult<VirtualMachineScaleSetVmResource>.Error(ErrorCode.UNABLE_TO_FIND, $"unable to find scaleset machine: {name}:{vmId}");
}
public async Async.Task<OneFuzzResult<string>> GetInstanceId(Guid name, Guid vmId) {
var vm = await GetInstanceVm(name, vmId);
if (vm.IsOk) {
return OneFuzzResult<string>.Ok(vm.OkV!.Data.InstanceId);
} else {
return OneFuzzResult<string>.Error(vm.ErrorV);
}
}
public async Async.Task<OneFuzzResultVoid> UpdateScaleInProtection(Guid name, Guid vmId, bool protectFromScaleIn) {
var res = await GetInstanceVm(name, vmId);
if (!res.IsOk) {
return OneFuzzResultVoid.Error(res.ErrorV);
} else {
VirtualMachineScaleSetVmProtectionPolicy newProtectionPolicy;
var instanceVm = res.OkV!;
if (instanceVm.Data.ProtectionPolicy is not null) {
newProtectionPolicy = instanceVm.Data.ProtectionPolicy;
newProtectionPolicy.ProtectFromScaleIn = protectFromScaleIn;
} else {
newProtectionPolicy = new VirtualMachineScaleSetVmProtectionPolicy() { ProtectFromScaleIn = protectFromScaleIn };
}
instanceVm.Data.ProtectionPolicy = newProtectionPolicy;
var scaleSet = GetVmssResource(name);
VirtualMachineScaleSetVmInstanceRequiredIds ids = new VirtualMachineScaleSetVmInstanceRequiredIds(new[] { instanceVm.Data.InstanceId });
var updateRes = await scaleSet.UpdateInstancesAsync(WaitUntil.Started, ids);
//TODO: finish this after UpdateInstance method is fixed
//https://github.com/Azure/azure-sdk-for-net/issues/28491
throw new NotImplementedException("Update instance does not work as expected. See https://github.com/Azure/azure-sdk-for-net/issues/28491");
}
}
}

View File

@ -11,6 +11,10 @@ namespace Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
public abstract record EntityBase {
[JsonIgnore] public ETag? ETag { get; set; }
public DateTimeOffset? TimeStamp { get; set; }
// https://docs.microsoft.com/en-us/rest/api/storageservices/designing-a-scalable-partitioning-strategy-for-azure-table-storage#yyy
// Produce "good-quality-table-key" based on a DateTimeOffset timestamp
public static string NewSortedKey => $"{DateTimeOffset.MaxValue.Ticks - DateTimeOffset.UtcNow.Ticks}";
}
public abstract record StatefulEntityBase<T>([property: JsonIgnore] T State) : EntityBase() where T : Enum;

View File

@ -1,17 +1,34 @@
namespace ApiService.OneFuzzLib.Orm {
using System.Text.Json;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace ApiService.OneFuzzLib.Orm {
public static class Query {
public static string Or(IEnumerable<string> queries) {
return string.Join(" or ", queries.Select(x => $"({x})"));
}
public static string Or(string q1, string q2) {
return Or(new[] { q1, q2 });
}
public static string And(IEnumerable<string> queries) {
return string.Join(" and ", queries.Select(x => $"({x})"));
}
public static string And(string q1, string q2) {
return And(new[] { q1, q2 });
}
public static string EqualAny(string property, IEnumerable<string> values) {
return Or(values.Select(x => $"{property} eq '{x}'"));
}
public static string EqualAnyEnum<T>(string property, IEnumerable<T> enums) where T : Enum {
IEnumerable<string> convertedEnums = enums.Select(x => JsonSerializer.Serialize(x, EntityConverter.GetJsonSerializerOptions()).Trim('"'));
return Query.EqualAny(property, convertedEnums);
}
}
}

View File

@ -31,6 +31,7 @@ namespace Tests {
Arb.Generate<EventScalesetResizeScheduled>().Select(e => e as BaseEvent),
Arb.Generate<EventScalesetStateUpdated>().Select(e => e as BaseEvent),
Arb.Generate<EventNodeDeleted>().Select(e => e as BaseEvent),
Arb.Generate<EventNodeCreated>().Select(e => e as BaseEvent),
});
}
@ -40,6 +41,14 @@ namespace Tests {
);
}
public static Gen<Version> Version() {
//OneFuzz version uses 3 number version
return Arb.Generate<Tuple<UInt16, UInt16, UInt16>>().Select(
arg =>
new Version(arg.Item1, arg.Item2, arg.Item3)
);
}
public static Gen<WebhookMessageLog> WebhookMessageLog() {
return Arb.Generate<Tuple<Tuple<Guid, BaseEvent, Guid, string, Guid>, Tuple<WebhookMessageState, int>>>().Select(
arg => new WebhookMessageLog(
@ -321,6 +330,11 @@ namespace Tests {
}
public class OrmArb {
public static Arbitrary<Version> Vresion() {
return Arb.From(OrmGenerators.Version());
}
public static Arbitrary<Uri> Uri() {
return Arb.From(OrmGenerators.Uri());
}
@ -805,6 +819,3 @@ namespace Tests {
}
}

View File

@ -9,24 +9,25 @@ namespace Tests {
[Fact]
public void NodeOperationsSearchStatesQuery() {
var ver = "1.2.3";
var query1 = NodeOperations.SearchStatesQuery("1.2.3");
var query1 = NodeOperations.SearchStatesQuery(ver);
Assert.Equal("(not (version eq '1.2.3'))", query1);
var query2 = NodeOperations.SearchStatesQuery("1.2.3", poolId: Guid.Parse("3b0426d3-9bde-4ae8-89ac-4edf0d3b3618"));
var query2 = NodeOperations.SearchStatesQuery(ver, poolId: Guid.Parse("3b0426d3-9bde-4ae8-89ac-4edf0d3b3618"));
Assert.Equal("((pool_id eq '3b0426d3-9bde-4ae8-89ac-4edf0d3b3618')) and (not (version eq '1.2.3'))", query2);
var query3 = NodeOperations.SearchStatesQuery("1.2.3", scaleSetId: Guid.Parse("4c96dd6b-9bdb-4758-9720-1010c244fa4b"));
var query3 = NodeOperations.SearchStatesQuery(ver, scaleSetId: Guid.Parse("4c96dd6b-9bdb-4758-9720-1010c244fa4b"));
Assert.Equal("((scaleset_id eq '4c96dd6b-9bdb-4758-9720-1010c244fa4b')) and (not (version eq '1.2.3'))", query3);
var query4 = NodeOperations.SearchStatesQuery("1.2.3", states: new[] { NodeState.Free, NodeState.Done, NodeState.Ready });
var query4 = NodeOperations.SearchStatesQuery(ver, states: new[] { NodeState.Free, NodeState.Done, NodeState.Ready });
Assert.Equal("(((state eq 'free') or (state eq 'done') or (state eq 'ready'))) and (not (version eq '1.2.3'))", query4);
var query5 = NodeOperations.SearchStatesQuery("1.2.3", excludeUpdateScheduled: true);
var query5 = NodeOperations.SearchStatesQuery(ver, excludeUpdateScheduled: true);
Assert.Equal("(reimage_requested eq false) and (delete_requested eq false) and (not (version eq '1.2.3'))", query5);
var query7 = NodeOperations.SearchStatesQuery(
"1.2.3",
ver,
poolId: Guid.Parse("3b0426d3-9bde-4ae8-89ac-4edf0d3b3618"),
scaleSetId: Guid.Parse("4c96dd6b-9bdb-4758-9720-1010c244fa4b"),
states: new[] { NodeState.Free, NodeState.Done, NodeState.Ready },