Initial Custom Metrics - Node & Task Heartbeat (#3082)

* Refactor logging to use  and new function queue

* Testing setup of custom metric.

* Changing host.json

* Updating log interface.

* changes.

* Fix encoding.

* Updating.

* Updating tests.

* Adding metrics to program

* Pushing latest changes.

* Update interface references.

* Removing string type.

* Add string back.

* Getting additional data for task heartbeat.

* Removing additional fields.

* Removing containers.

* Cleaning up.

* Adding feature flag.

* Adding bicep changes.

* Fixing tests.

* Fixing test metrics.

* Removing most of tests.

* Telemetry Refact Round 2.

* Updated metrics.

* Remove custom metric function.

* Syncing events.cs

* Making optional.

* Using events as metric dimensions.

* Fixing ORM tests.

* Remove metric records.

* Removing bad test.

* Remove testmetrics.'

* Adding test back.

* Improving custom dimensions serialization.

* Update src/ApiService/ApiService/onefuzzlib/Metrics.cs

Co-authored-by: Cheick Keita <kcheick@gmail.com>

* Reverting change.

---------

Co-authored-by: Cheick Keita <kcheick@gmail.com>
This commit is contained in:
Noah McGregor Harper
2023-05-16 13:17:28 -07:00
committed by GitHub
parent 64782d7e9b
commit d84b72b5fd
16 changed files with 156 additions and 18 deletions

View File

@ -4,4 +4,5 @@ public static class FeatureFlagConstants {
public const string RenderOnlyScribanTemplates = "RenderOnlyScribanTemplates";
public const string EnableNodeDecommissionStrategy = "EnableNodeDecommissionStrategy";
public const string SemanticNotificationConfigValidation = "SemanticNotificationConfigValidation";
public const string EnableCustomMetricTelemetry = "EnableCustomMetricTelemetry";
}

View File

@ -17,10 +17,12 @@ public class QueueNodeHearbeat {
[Function("QueueNodeHeartbeat")]
public async Async.Task Run([QueueTrigger("node-heartbeat", Connection = "AzureWebJobsStorage")] string msg) {
_log.Info($"heartbeat: {msg}");
var nodes = _context.NodeOperations;
var events = _context.Events;
var metrics = _context.Metrics;
_log.Info($"heartbeat: {msg}");
var hb = JsonSerializer.Deserialize<NodeHeartbeatEntry>(msg, EntityConverter.GetJsonSerializerOptions()).EnsureNotNull($"wrong data {msg}");
var node = await nodes.GetByMachineId(hb.NodeId);
@ -35,7 +37,12 @@ public class QueueNodeHearbeat {
_log.WithHttpStatus(r.ErrorV).Error($"Failed to replace heartbeat: {hb.NodeId:Tag:NodeId}");
}
var nodeHeartbeatEvent = new EventNodeHeartbeat(node.MachineId, node.ScalesetId, node.PoolName, node.State);
// TODO: do we still send event if we fail do update the table ?
await events.SendEvent(new EventNodeHeartbeat(node.MachineId, node.ScalesetId, node.PoolName));
await events.SendEvent(nodeHeartbeatEvent);
if (await _context.FeatureManagerSnapshot.IsEnabledAsync(FeatureFlagConstants.EnableCustomMetricTelemetry)) {
metrics.SendMetric(1, nodeHeartbeatEvent);
}
}
}

View File

@ -7,34 +7,45 @@ namespace Microsoft.OneFuzz.Service.Functions;
public class QueueTaskHearbeat {
private readonly ILogTracer _log;
private readonly IOnefuzzContext _context;
private readonly IEvents _events;
private readonly ITaskOperations _tasks;
public QueueTaskHearbeat(ILogTracer logTracer, ITaskOperations tasks, IEvents events) {
public QueueTaskHearbeat(ILogTracer logTracer, IOnefuzzContext context) {
_log = logTracer;
_tasks = tasks;
_events = events;
_context = context;
}
[Function("QueueTaskHeartbeat")]
public async Async.Task Run([QueueTrigger("task-heartbeat", Connection = "AzureWebJobsStorage")] string msg) {
_log.Info($"heartbeat: {msg}");
var _tasks = _context.TaskOperations;
var _jobs = _context.JobOperations;
var _events = _context.Events;
var _metrics = _context.Metrics;
_log.Info($"heartbeat: {msg}");
var hb = JsonSerializer.Deserialize<TaskHeartbeatEntry>(msg, EntityConverter.GetJsonSerializerOptions()).EnsureNotNull($"wrong data {msg}");
var task = await _tasks.GetByTaskId(hb.TaskId);
if (task == null) {
_log.Warning($"invalid {hb.TaskId:Tag:TaskId}");
return;
}
var job = await _jobs.Get(task.JobId);
if (job == null) {
_log.Warning($"invalid {task.JobId:Tag:JobId}");
return;
}
var newTask = task with { Heartbeat = DateTimeOffset.UtcNow };
var r = await _tasks.Replace(newTask);
if (!r.IsOk) {
_log.WithHttpStatus(r.ErrorV).Error($"failed to replace with new task {hb.TaskId:Tag:TaskId}");
}
await _events.SendEvent(new EventTaskHeartbeat(newTask.JobId, newTask.TaskId, newTask.Config));
var taskHeartBeatEvent = new EventTaskHeartbeat(newTask.JobId, newTask.TaskId, job.Config.Project, job.Config.Name, newTask.State, newTask.Config);
await _events.SendEvent(taskHeartBeatEvent);
if (await _context.FeatureManagerSnapshot.IsEnabledAsync(FeatureFlagConstants.EnableCustomMetricTelemetry)) {
_metrics.SendMetric(1, taskHeartBeatEvent);
}
}
}

View File

@ -57,6 +57,8 @@ public struct LogStringHandler {
public interface ILog {
void Log(Guid correlationId, LogStringHandler message, SeverityLevel level, IReadOnlyDictionary<string, string> tags, string? caller);
void LogEvent(Guid correlationId, LogStringHandler evt, IReadOnlyDictionary<string, string> tags, IReadOnlyDictionary<string, double>? metrics, string? caller);
void LogMetric(Guid correlationId, LogStringHandler metric, int value, IReadOnlyDictionary<string, string>? customDimensions, IReadOnlyDictionary<string, string> tags, string? caller);
void LogException(Guid correlationId, Exception ex, LogStringHandler message, IReadOnlyDictionary<string, string> tags, IReadOnlyDictionary<string, double>? metrics, string? caller);
void Flush();
}
@ -103,6 +105,17 @@ sealed class AppInsights : ILog {
_telemetryClient.TrackEvent(telemetry);
}
public void LogMetric(Guid correlationId, LogStringHandler metric, int value, IReadOnlyDictionary<string, string>? customDimensions, IReadOnlyDictionary<string, string> tags, string? caller) {
var telemetry = new MetricTelemetry(metric.ToString(), value, value, value, value, value);
// copy properties
Copy(telemetry.Properties, customDimensions);
telemetry.Properties["CorrelationId"] = correlationId.ToString();
if (caller is not null) telemetry.Properties["CalledBy"] = caller;
Copy(telemetry.Properties, metric.Tags);
_telemetryClient.TrackMetric(telemetry);
}
public void LogException(Guid correlationId, Exception ex, LogStringHandler message, IReadOnlyDictionary<string, string> tags, IReadOnlyDictionary<string, double>? metrics, string? caller) {
{
var telemetry = new ExceptionTelemetry(ex);
@ -160,11 +173,17 @@ sealed class Console : ILog {
}
}
public void LogMetric(Guid correlationId, LogStringHandler metric, int value, IReadOnlyDictionary<string, string>? customDimensions, IReadOnlyDictionary<string, string> tags, string? caller) {
System.Console.Out.WriteLine($"[{correlationId}][Metric] {metric}");
LogTags(correlationId, tags);
}
public void LogEvent(Guid correlationId, LogStringHandler evt, IReadOnlyDictionary<string, string> tags, IReadOnlyDictionary<string, double>? metrics, string? caller) {
System.Console.Out.WriteLine($"[{correlationId}][Event] {evt}");
LogTags(correlationId, tags);
LogMetrics(correlationId, metrics);
}
public void LogException(Guid correlationId, Exception ex, LogStringHandler message, IReadOnlyDictionary<string, string> tags, IReadOnlyDictionary<string, double>? metrics, string? caller) {
System.Console.Out.WriteLine($"[{correlationId}][Exception] {message}:{ex}");
LogTags(correlationId, tags);
@ -183,6 +202,7 @@ public interface ILogTracer {
void Error(Error error);
void Event(LogStringHandler evt, IReadOnlyDictionary<string, double>? metrics = null);
void Metric(LogStringHandler metric, int value, IReadOnlyDictionary<string, string>? customDimensions);
void Exception(Exception ex, LogStringHandler message = $"", IReadOnlyDictionary<string, double>? metrics = null);
void ForceFlush();
void Info(LogStringHandler message);
@ -327,6 +347,13 @@ public class LogTracer : ILogTracerInternal {
}
}
public void Metric(LogStringHandler metric, int value, IReadOnlyDictionary<string, string>? customDimensions) {
var caller = GetCaller();
foreach (var logger in _loggers) {
logger.LogMetric(CorrelationId, metric, value, customDimensions, Tags, caller);
}
}
public void Exception(Exception ex, LogStringHandler message, IReadOnlyDictionary<string, double>? metrics) {
var caller = GetCaller();
foreach (var logger in _loggers) {

View File

@ -167,12 +167,14 @@ public record EventTaskStateUpdated(
TaskConfig Config
) : BaseEvent();
[EventType(EventType.TaskHeartbeat)]
public record EventTaskHeartbeat(
Guid JobId,
Guid TaskId,
TaskConfig Config
string? Project,
string? Name,
TaskState? State,
TaskConfig? Config
) : BaseEvent();
[EventType(EventType.Ping)]
@ -273,7 +275,8 @@ public record EventNodeCreated(
public record EventNodeHeartbeat(
Guid MachineId,
Guid? ScalesetId,
PoolName PoolName
PoolName PoolName,
NodeState state
) : BaseEvent();

View File

@ -85,6 +85,7 @@ public class Program {
})
.AddScoped<IAutoScaleOperations, AutoScaleOperations>()
.AddScoped<INodeOperations, NodeOperations>()
.AddScoped<IMetrics, Metrics>()
.AddScoped<IEvents, Events>()
.AddScoped<IWebhookOperations, WebhookOperations>()
.AddScoped<IWebhookMessageLogOperations, WebhookMessageLogOperations>()

View File

@ -3,8 +3,8 @@
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
"isEnabled": false,
"excludedTypes": "Request;Trace;Dependency;Event;Exception"
}
}
}

View File

@ -21,7 +21,6 @@ namespace Microsoft.OneFuzz.Service {
public interface IEvents {
Async.Task SendEvent(BaseEvent anEvent);
Async.Task QueueSignalrEvent(DownloadableEventMessage message);
void LogEvent(BaseEvent anEvent);

View File

@ -0,0 +1,54 @@
using System.Text.Json;
using System.Text.Json.Serialization;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service {
public record CustomMetric(
string name,
int value,
Dictionary<string, string> customDimensions
);
public interface IMetrics {
void SendMetric(int metricValue, BaseEvent customDimensions);
void LogMetric(BaseEvent metric);
}
public class Metrics : IMetrics {
private readonly ILogTracer _log;
private readonly IOnefuzzContext _context;
private readonly JsonSerializerOptions _options;
public Metrics(ILogTracer log, IOnefuzzContext context) {
_context = context;
_log = log;
_options = new JsonSerializerOptions(EntityConverter.GetJsonSerializerOptions()) {
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
};
_options.Converters.Add(new RemoveUserInfo());
}
public void SendMetric(int metricValue, BaseEvent customDimensions) {
var metricType = customDimensions.GetEventType();
_ = _options.PropertyNamingPolicy ?? throw new ArgumentException("Serializer _options not available.");
var metricTypeSnakeCase = _options.PropertyNamingPolicy.ConvertName($"{metricType}");
var dimensionNode = JsonSerializer.SerializeToNode(customDimensions, customDimensions.GetType(), _options);
_ = dimensionNode ?? throw new JsonException("Was not able to properly serialize the custom dimensions.");
var dimensionDict = dimensionNode.AsObject().ToDictionary(kvp => kvp.Key.ToString(), kvp => kvp.Value is not null ? kvp.Value.ToString() : "");
_log.Metric($"{metricTypeSnakeCase}", metricValue, dimensionDict);
LogMetric(customDimensions);
}
public void LogMetric(BaseEvent metric) {
var serializedMetric = JsonSerializer.Serialize(metric, metric.GetType(), _options);
_log.Info($"sending metric: {metric.GetEventType():Tag:MetricType} - {serializedMetric}");
}
}
}

View File

@ -15,6 +15,7 @@ public interface IOnefuzzContext {
ICreds Creds { get; }
IDiskOperations DiskOperations { get; }
IEvents Events { get; }
IMetrics Metrics { get; }
IExtensions Extensions { get; }
IIpOperations IpOperations { get; }
IJobOperations JobOperations { get; }
@ -62,6 +63,7 @@ public class OnefuzzContext : IOnefuzzContext {
public IAutoScaleOperations AutoScaleOperations => _serviceProvider.GetRequiredService<IAutoScaleOperations>();
public INodeOperations NodeOperations => _serviceProvider.GetRequiredService<INodeOperations>();
public IEvents Events => _serviceProvider.GetRequiredService<IEvents>();
public IMetrics Metrics => _serviceProvider.GetRequiredService<IMetrics>();
public IWebhookOperations WebhookOperations => _serviceProvider.GetRequiredService<IWebhookOperations>();
public IWebhookMessageLogOperations WebhookMessageLogOperations => _serviceProvider.GetRequiredService<IWebhookMessageLogOperations>();
public ITaskOperations TaskOperations => _serviceProvider.GetRequiredService<ITaskOperations>();

View File

@ -45,6 +45,7 @@ public sealed class TestContext : IOnefuzzContext {
FeatureManagerSnapshot = new TestFeatureManagerSnapshot();
WebhookOperations = new TestWebhookOperations(httpClientFactory, logTracer, this);
Events = new TestEvents(logTracer, this);
Metrics = new TestMetrics(logTracer, this);
WebhookMessageLogOperations = new TestWebhookMessageLogOperations(logTracer, this);
}
@ -67,6 +68,7 @@ public sealed class TestContext : IOnefuzzContext {
// Implementations:
public IEvents Events { get; }
public IMetrics Metrics { get; }
public IServiceConfig ServiceConfiguration { get; }

View File

@ -0,0 +1,12 @@
using System.Collections.Generic;
using Microsoft.OneFuzz.Service;
namespace IntegrationTests.Fakes;
public sealed class TestMetrics : Metrics {
public List<BaseEvent> Metrics { get; } = new();
public List<EventMessage> CustomMetrics { get; } = new();
public TestMetrics(ILogTracer log, IOnefuzzContext context)
: base(log, context) { }
}

View File

@ -28,6 +28,11 @@ sealed class TestLogTracer : ILogTracer {
_output.WriteLine($"[Event] [{evt}]");
}
public void Metric(LogStringHandler metric, int value, IReadOnlyDictionary<string, string>? customDimensions) {
// TODO: metrics
_output.WriteLine($"[Event] [{metric}]");
}
public void Exception(Exception ex, LogStringHandler message = $"", IReadOnlyDictionary<string, double>? metrics = null) {
// TODO: metrics
_output.WriteLine($"[Error] {message} {ex}");

View File

@ -236,7 +236,7 @@ namespace Tests {
[Fact]
public void TestEventSerialization() {
var expectedEvent = new EventMessage(Guid.NewGuid(), EventType.NodeHeartbeat, new EventNodeHeartbeat(Guid.NewGuid(), Guid.NewGuid(), PoolName.Parse("test-Poool")), Guid.NewGuid(), "test", DateTime.UtcNow);
var expectedEvent = new EventMessage(Guid.NewGuid(), EventType.NodeHeartbeat, new EventNodeHeartbeat(Guid.NewGuid(), Guid.NewGuid(), PoolName.Parse("test-Poool"), NodeState.Busy), Guid.NewGuid(), "test", DateTime.UtcNow);
var serialized = JsonSerializer.Serialize(expectedEvent, EntityConverter.GetJsonSerializerOptions());
var actualEvent = JsonSerializer.Deserialize<EventMessage>((string)serialized, EntityConverter.GetJsonSerializerOptions());
Assert.Equal(expectedEvent, actualEvent);

View File

@ -37,4 +37,17 @@ resource validateNotificationConfigSemantics 'Microsoft.AppConfiguration/configu
}
}
resource enableCustomMetricFeatureFlag 'Microsoft.AppConfiguration/configurationStores/keyValues@2021-10-01-preview' = {
parent: featureFlags
name: '.appconfig.featureflag~2FEnableCustomMetricTelemetry'
properties: {
value: string({
id: 'EnableCustomMetricTelemetry'
description: 'Allow custom metrics to be sent.'
enabled: false
})
contentType: 'application/vnd.microsoft.appconfig.ff+json;charset=utf-8'
}
}
output AppConfigEndpoint string = 'https://${appConfigName}.azconfig.io'

View File

@ -33,6 +33,7 @@ var storageAccountFuncQueuesParams = [
'update-queue'
'webhooks'
'signalr-events'
'custom-metrics'
]
var fileChangesQueueIndex = 0