Rework events (#3069)

* Start reworking events

* Trying things out

* .

* Add versioning, create events container in deploy script

* Improve gitignore

* Making the types line up

* Lets see how it works

* .

* Add CLI for querying event

* Linting

* Update src/ApiService/ApiService/Functions/Events.cs

Co-authored-by: George Pollard <porges@porg.es>

* almost done testing

* Added test

---------

Co-authored-by: George Pollard <porges@porg.es>
This commit is contained in:
Teo Voinea
2023-04-27 16:10:39 -04:00
committed by GitHub
parent 4e307334ee
commit 333702aa1e
29 changed files with 433 additions and 104 deletions

6
.gitignore vendored
View File

@ -19,4 +19,8 @@ lcov.info
/src/ApiService/ApiService/Properties/PublishProfiles/*
/src/ApiService/ApiService/Properties/ServiceDependencies/*
.vs
.vs
**/__azurite_db_*.json
**/__blobstorage__
**/__queuestorage__

View File

@ -0,0 +1,39 @@
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
namespace Microsoft.OneFuzz.Service.Functions;
public class EventsFunction {
private readonly ILogTracer _log;
private readonly IEndpointAuthorization _auth;
private readonly IOnefuzzContext _context;
public EventsFunction(ILogTracer log, IEndpointAuthorization auth, IOnefuzzContext context) {
_auth = auth;
_context = context;
_log = log;
}
[Function("Events")]
public Async.Task<HttpResponseData> Run([HttpTrigger(AuthorizationLevel.Anonymous, "GET")] HttpRequestData req)
=> _auth.CallIfUser(req, r => r.Method switch {
"GET" => Get(r),
_ => throw new NotSupportedException(),
});
private async Async.Task<HttpResponseData> Get(HttpRequestData req) {
var request = await RequestHandling.ParseRequest<EventsGet>(req);
if (!request.IsOk) {
return await _context.RequestHandling.NotOk(req, request.ErrorV, "events get");
}
var eventsGet = request.OkV;
var requestedEvent = await _context.Events.GetDownloadableEvent(eventsGet.EventId);
if (!requestedEvent.IsOk) {
return await _context.RequestHandling.NotOk(req, requestedEvent.ErrorV, "events get");
}
return await RequestHandling.Ok(req, new EventGetResponse(requestedEvent.OkV));
}
}

View File

@ -36,6 +36,8 @@ public enum ErrorCode {
ADO_VALIDATION_INVALID_FIELDS = 479,
GITHUB_VALIDATION_INVALID_PAT = 480,
GITHUB_VALIDATION_INVALID_REPOSITORY = 481,
UNEXPECTED_DATA_SHAPE = 482,
UNABLE_TO_SEND = 483,
}
public enum VmState {

View File

@ -50,13 +50,11 @@ public enum EventType {
}
public abstract record BaseEvent() {
private static readonly IReadOnlyDictionary<Type, EventType> typeToEvent;
private static readonly IReadOnlyDictionary<EventType, Type> eventToType;
private static readonly IReadOnlyDictionary<Type, EventType> _typeToEvent;
private static readonly IReadOnlyDictionary<EventType, Type> _eventToType;
static BaseEvent() {
EventType ExtractEventType(Type type) {
static EventType ExtractEventType(Type type) {
var attr = type.GetCustomAttribute<EventTypeAttribute>();
if (attr is null) {
throw new InvalidOperationException($"Type {type} is missing {nameof(EventTypeAttribute)}");
@ -64,16 +62,16 @@ public abstract record BaseEvent() {
return attr.EventType;
}
typeToEvent =
_typeToEvent =
typeof(BaseEvent).Assembly.GetTypes()
.Where(t => t.IsSubclassOf(typeof(BaseEvent)))
.ToDictionary(x => x, ExtractEventType);
eventToType = typeToEvent.ToDictionary(x => x.Value, x => x.Key);
_eventToType = _typeToEvent.ToDictionary(x => x.Value, x => x.Key);
// check that all event types are accounted for
var allEventTypes = Enum.GetValues<EventType>();
var missingEventTypes = allEventTypes.Except(eventToType.Keys).ToList();
var missingEventTypes = allEventTypes.Except(_eventToType.Keys).ToList();
if (missingEventTypes.Any()) {
throw new InvalidOperationException($"Missing event types: {string.Join(", ", missingEventTypes)}");
}
@ -82,7 +80,7 @@ public abstract record BaseEvent() {
public EventType GetEventType() {
var type = this.GetType();
if (typeToEvent.TryGetValue(type, out var eventType)) {
if (_typeToEvent.TryGetValue(type, out var eventType)) {
return eventType;
}
@ -90,7 +88,7 @@ public abstract record BaseEvent() {
}
public static Type GetTypeInfo(EventType eventType) {
if (eventToType.TryGetValue(eventType, out var type)) {
if (_eventToType.TryGetValue(eventType, out var type)) {
return type;
}
@ -350,6 +348,14 @@ public record EventNotificationFailed(
Error? Error
) : BaseEvent();
public record DownloadableEventMessage : EventMessage {
public Uri SasUrl { get; init; }
public DownloadableEventMessage(Guid EventId, EventType EventType, BaseEvent Event, Guid InstanceId, string InstanceName, DateTime CreatedAt, Uri SasUrl)
: base(EventId, EventType, Event, InstanceId, InstanceName, CreatedAt) {
this.SasUrl = SasUrl;
}
}
public record EventMessage(
Guid EventId,
EventType EventType,
@ -357,7 +363,9 @@ public record EventMessage(
[property: JsonConverter(typeof(BaseEventConverter))]
BaseEvent Event,
Guid InstanceId,
String InstanceName
String InstanceName,
DateTime CreatedAt,
String Version = "1.0"
);
public class BaseEventConverter : JsonConverter<BaseEvent> {

View File

@ -328,3 +328,7 @@ public record TemplateValidationPost(
public record JinjaToScribanMigrationPost(
bool DryRun = false
) : BaseRequest;
public record EventsGet(
[property: Required] Guid EventId
) : BaseRequest;

View File

@ -206,6 +206,10 @@ public record JinjaToScribanMigrationDryRunResponse(
List<Guid> NotificationIdsToUpdate
) : BaseResponse();
public record EventGetResponse(
DownloadableEventMessage Event
) : BaseResponse();
public record NotificationTestResponse(
bool Success,
string? Error = null

View File

@ -19,7 +19,9 @@ public record WebhookMessage(Guid EventId,
BaseEvent Event,
Guid InstanceId,
String InstanceName,
Guid WebhookId) : EventMessage(EventId, EventType, Event, InstanceId, InstanceName);
Guid WebhookId,
DateTime CreatedAt,
Uri SasUrl) : DownloadableEventMessage(EventId, EventType, Event, InstanceId, InstanceName, CreatedAt, SasUrl);
public record WebhookMessageEventGrid(
@ -30,7 +32,9 @@ public record WebhookMessageEventGrid(
Guid Id,
[property: TypeDiscrimnatorAttribute("EventType", typeof(EventTypeProvider))]
[property: JsonConverter(typeof(BaseEventConverter))]
BaseEvent Data);
BaseEvent Data,
Uri SasUrl,
String Version = "1.0");
public record WebhookMessageLog(
[RowKey] Guid EventId,

View File

@ -25,7 +25,7 @@ namespace ApiService.TestHooks {
_log.Info($"Log event");
var s = await req.ReadAsStringAsync();
var msg = JsonSerializer.Deserialize<EventMessage>(s!, EntityConverter.GetJsonSerializerOptions());
var msg = JsonSerializer.Deserialize<DownloadableEventMessage>(s!, EntityConverter.GetJsonSerializerOptions());
_events.LogEvent(msg!.Event);
var resp = req.CreateResponse(HttpStatusCode.OK);
return resp;
@ -37,7 +37,7 @@ namespace ApiService.TestHooks {
_log.Info($"Send event");
var s = await req.ReadAsStringAsync();
var msg = JsonSerializer.Deserialize<EventMessage>(s!, EntityConverter.GetJsonSerializerOptions());
var msg = JsonSerializer.Deserialize<DownloadableEventMessage>(s!, EntityConverter.GetJsonSerializerOptions());
await _events.SendEvent(msg!.Event);
var resp = req.CreateResponse(HttpStatusCode.OK);
return resp;

View File

@ -1,5 +1,7 @@
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using Azure.Storage.Sas;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service {
@ -8,16 +10,20 @@ namespace Microsoft.OneFuzz.Service {
public record SignalREvent
(
string Target,
List<EventMessage> arguments
List<DownloadableEventMessage> arguments
);
public interface IEvents {
Async.Task SendEvent(BaseEvent anEvent);
Async.Task QueueSignalrEvent(EventMessage message);
Async.Task QueueSignalrEvent(DownloadableEventMessage message);
void LogEvent(BaseEvent anEvent);
Async.Task<OneFuzzResult<EventMessage>> GetEvent(Guid eventId);
Async.Task<OneFuzzResult<DownloadableEventMessage>> GetDownloadableEvent(Guid eventId);
Async.Task<DownloadableEventMessage> MakeDownloadable(EventMessage eventMessage);
}
public class Events : IEvents {
@ -28,20 +34,20 @@ namespace Microsoft.OneFuzz.Service {
private readonly ICreds _creds;
private readonly JsonSerializerOptions _options;
public Events(IQueue queue, IWebhookOperations webhook, ILogTracer log, IContainers containers, ICreds creds) {
_queue = queue;
_webhook = webhook;
public Events(ILogTracer log, IOnefuzzContext context) {
_queue = context.Queue;
_webhook = context.WebhookOperations;
_log = log;
_containers = containers;
_creds = creds;
_containers = context.Containers;
_creds = context.Creds;
_options = new JsonSerializerOptions(EntityConverter.GetJsonSerializerOptions()) {
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
};
_options.Converters.Add(new RemoveUserInfo());
}
public async Async.Task QueueSignalrEvent(EventMessage message) {
var ev = new SignalREvent("events", new List<EventMessage>() { message });
public virtual async Async.Task QueueSignalrEvent(DownloadableEventMessage message) {
var ev = new SignalREvent("events", new List<DownloadableEventMessage>() { message });
await _queue.SendMessage("signalr-events", JsonSerializer.Serialize(ev, _options), StorageType.Config);
}
@ -49,23 +55,80 @@ namespace Microsoft.OneFuzz.Service {
var eventType = anEvent.GetEventType();
var instanceId = await _containers.GetInstanceId();
var creationDate = DateTime.UtcNow;
var eventMessage = new EventMessage(
Guid.NewGuid(),
eventType,
anEvent,
instanceId,
_creds.GetInstanceName()
_creds.GetInstanceName(),
creationDate
);
await QueueSignalrEvent(eventMessage);
await _webhook.SendEvent(eventMessage);
var downloadableEventMessage = await MakeDownloadable(eventMessage);
await QueueSignalrEvent(downloadableEventMessage);
await _webhook.SendEvent(downloadableEventMessage);
LogEvent(anEvent);
}
public void LogEvent(BaseEvent anEvent) {
public virtual void LogEvent(BaseEvent anEvent) {
var serializedEvent = JsonSerializer.Serialize(anEvent, anEvent.GetType(), _options);
_log.Info($"sending event: {anEvent.GetEventType():Tag:EventType} - {serializedEvent}");
}
public async Async.Task<OneFuzzResult<EventMessage>> GetEvent(Guid eventId) {
var blob = await _containers.GetBlob(WellKnownContainers.Events, eventId.ToString(), StorageType.Corpus);
if (blob == null) {
return OneFuzzResult<EventMessage>.Error(ErrorCode.UNABLE_TO_FIND, $"Could not find container for event with id {eventId}");
}
var eventMessage = JsonSerializer.Deserialize<EventMessage>(blob, _options);
if (eventMessage == null) {
return OneFuzzResult<EventMessage>.Error(ErrorCode.UNEXPECTED_DATA_SHAPE, $"Could not deserialize event with id {eventId}");
}
return OneFuzzResult<EventMessage>.Ok(eventMessage);
}
public async Async.Task<OneFuzzResult<DownloadableEventMessage>> GetDownloadableEvent(Guid eventId) {
var eventMessageResult = await GetEvent(eventId);
if (!eventMessageResult.IsOk) {
return eventMessageResult.ErrorV;
}
var sasUrl = await _containers.GetFileSasUrl(WellKnownContainers.Events, eventId.ToString(), StorageType.Corpus, BlobSasPermissions.Read);
if (sasUrl == null) {
return OneFuzzResult<DownloadableEventMessage>.Error(ErrorCode.UNABLE_TO_FIND, $"Could not find container for event with id {eventId}");
}
var eventMessage = eventMessageResult.OkV!;
return OneFuzzResult<DownloadableEventMessage>.Ok(new DownloadableEventMessage(
eventMessage.EventId,
eventMessage.EventType,
eventMessage.Event,
eventMessage.InstanceId,
eventMessage.InstanceName,
eventMessage.CreatedAt,
sasUrl
));
}
public async Task<DownloadableEventMessage> MakeDownloadable(EventMessage eventMessage) {
await _containers.SaveBlob(WellKnownContainers.Events, eventMessage.EventId.ToString(), JsonSerializer.Serialize(eventMessage, _options), StorageType.Corpus);
var sasUrl = await _containers.GetFileSasUrl(WellKnownContainers.Events, eventMessage.EventId.ToString(), StorageType.Corpus, BlobSasPermissions.Read);
return new DownloadableEventMessage(
eventMessage.EventId,
eventMessage.EventType,
eventMessage.Event,
eventMessage.InstanceId,
eventMessage.InstanceName,
eventMessage.CreatedAt,
sasUrl
);
}
}

View File

@ -22,6 +22,11 @@ public class NotificationOperations : Orm<Notification>, INotificationOperations
}
public async Async.Task NewFiles(Container container, string filename, bool isLastRetryAttempt) {
// We don't want to store file added events for the events container because that causes an infinite loop
if (container == WellKnownContainers.Events) {
return;
}
var notifications = GetNotifications(container);
var hasNotifications = await notifications.AnyAsync();
var reportOrRegression = await _context.Reports.GetReportOrRegression(container, filename, expectReports: hasNotifications);

View File

@ -10,9 +10,9 @@ using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service;
public interface IWebhookOperations : IOrm<Webhook> {
Async.Task SendEvent(EventMessage eventMessage);
Async.Task SendEvent(DownloadableEventMessage eventMessage);
Async.Task<Webhook?> GetByWebhookId(Guid webhookId);
Async.Task<bool> Send(WebhookMessageLog messageLog);
Async.Task<OneFuzzResultVoid> Send(WebhookMessageLog messageLog);
Task<EventPing> Ping(Webhook webhook);
}
@ -25,7 +25,7 @@ public class WebhookOperations : Orm<Webhook>, IWebhookOperations {
_httpFactory = httpFactory;
}
async public Async.Task SendEvent(EventMessage eventMessage) {
public async Async.Task SendEvent(DownloadableEventMessage eventMessage) {
await foreach (var webhook in GetWebhooksCached()) {
if (!webhook.EventTypes.Contains(eventMessage.EventType)) {
continue;
@ -34,7 +34,7 @@ public class WebhookOperations : Orm<Webhook>, IWebhookOperations {
}
}
async private Async.Task AddEvent(Webhook webhook, EventMessage eventMessage) {
private async Async.Task AddEvent(Webhook webhook, DownloadableEventMessage eventMessage) {
(string, string)[] tags = { ("WebhookId", webhook.WebhookId.ToString()), ("EventId", eventMessage.EventId.ToString()) };
var message = new WebhookMessageLog(
@ -77,14 +77,25 @@ public class WebhookOperations : Orm<Webhook>, IWebhookOperations {
}
}
public async Async.Task<bool> Send(WebhookMessageLog messageLog) {
public async Async.Task<OneFuzzResultVoid> Send(WebhookMessageLog messageLog) {
var webhook = await GetByWebhookId(messageLog.WebhookId);
if (webhook == null || webhook.Url == null) {
throw new Exception($"Invalid Webhook. Webhook with WebhookId: {messageLog.WebhookId} Not Found");
return OneFuzzResultVoid.Error(ErrorCode.UNABLE_TO_FIND, $"Invalid Webhook. Webhook with WebhookId: {messageLog.WebhookId} Not Found");
}
var (data, digest) = await BuildMessage(webhookId: webhook.WebhookId, eventId: messageLog.EventId, eventType: messageLog.EventType, webhookEvent: messageLog.Event!, secretToken: webhook.SecretToken, messageFormat: webhook.MessageFormat);
var messageResult = await BuildMessage(webhookId: webhook.WebhookId, eventId: messageLog.EventId, eventType: messageLog.EventType, webhookEvent: messageLog.Event!, secretToken: webhook.SecretToken, messageFormat: webhook.MessageFormat);
if (!messageResult.IsOk) {
var tags = new List<(string, string)> {
("WebhookId", webhook.WebhookId.ToString()),
("EventId", messageLog.EventId.ToString()),
("EventType", messageLog.EventType.ToString())
};
_logTracer.WithTags(tags).Error($"Failed to build message for webhook.");
return OneFuzzResultVoid.Error(messageResult.ErrorV);
}
var (data, digest) = messageResult.OkV;
var headers = new Dictionary<string, string> { { "User-Agent", $"onefuzz-webhook {_context.ServiceConfiguration.OneFuzzVersion}" } };
if (digest != null) {
@ -98,7 +109,7 @@ public class WebhookOperations : Orm<Webhook>, IWebhookOperations {
using var response = await client.Post(url: webhook.Url, json: data, headers: headers);
if (response.IsSuccessStatusCode) {
_logTracer.Info($"Successfully sent webhook: {messageLog.WebhookId:Tag:WebhookId}");
return true;
return OneFuzzResultVoid.Ok;
}
_logTracer
@ -108,26 +119,38 @@ public class WebhookOperations : Orm<Webhook>, IWebhookOperations {
})
.Info($"Webhook not successful");
return false;
return OneFuzzResultVoid.Error(ErrorCode.UNABLE_TO_SEND, $"Webhook not successful. Status Code: {response.StatusCode}");
}
public async Task<EventPing> Ping(Webhook webhook) {
var ping = new EventPing(Guid.NewGuid());
var instanceId = await _context.Containers.GetInstanceId();
var instanceName = _context.Creds.GetInstanceName();
await AddEvent(webhook, new EventMessage(Guid.NewGuid(), EventType.Ping, ping, instanceId, instanceName));
var eventMessage = new EventMessage(
ping.PingId, EventType.Ping, ping, instanceId, instanceName, DateTime.UtcNow
);
var downloadableEventMessage = await _context.Events.MakeDownloadable(eventMessage);
await AddEvent(webhook, downloadableEventMessage);
return ping;
}
// Not converting to bytes, as it's not neccessary in C#. Just keeping as string.
public async Async.Task<Tuple<string, string?>> BuildMessage(Guid webhookId, Guid eventId, EventType eventType, BaseEvent webhookEvent, String? secretToken, WebhookMessageFormat? messageFormat) {
string data = "";
public async Async.Task<OneFuzzResult<Tuple<string, string?>>> BuildMessage(Guid webhookId, Guid eventId, EventType eventType, BaseEvent webhookEvent, String? secretToken, WebhookMessageFormat? messageFormat) {
var eventDataResult = await _context.Events.GetDownloadableEvent(eventId);
if (!eventDataResult.IsOk) {
return OneFuzzResult<Tuple<string, string?>>.Error(eventDataResult.ErrorV);
}
var eventData = eventDataResult.OkV;
string data;
if (messageFormat != null && messageFormat == WebhookMessageFormat.EventGrid) {
var eventGridMessage = new[] { new WebhookMessageEventGrid(Id: eventId, Data: webhookEvent, DataVersion: "1.0.0", Subject: _context.Creds.GetInstanceName(), EventType: eventType, EventTime: DateTimeOffset.UtcNow) };
var eventGridMessage = new[] { new WebhookMessageEventGrid(Id: eventId, Data: webhookEvent, DataVersion: "1.0.0", Subject: _context.Creds.GetInstanceName(), EventType: eventType, EventTime: DateTimeOffset.UtcNow, SasUrl: eventData.SasUrl) };
data = JsonSerializer.Serialize(eventGridMessage, options: EntityConverter.GetJsonSerializerOptions());
} else {
var instanceId = await _context.Containers.GetInstanceId();
var webhookMessage = new WebhookMessage(WebhookId: webhookId, EventId: eventId, EventType: eventType, Event: webhookEvent, InstanceId: instanceId, InstanceName: _context.Creds.GetInstanceName());
var webhookMessage = new WebhookMessage(WebhookId: webhookId, EventId: eventId, EventType: eventType, Event: webhookEvent, InstanceId: instanceId, InstanceName: _context.Creds.GetInstanceName(), CreatedAt: eventData.CreatedAt, SasUrl: eventData.SasUrl);
data = JsonSerializer.Serialize(webhookMessage, options: EntityConverter.GetJsonSerializerOptions());
}
@ -137,7 +160,8 @@ public class WebhookOperations : Orm<Webhook>, IWebhookOperations {
using var hmac = new HMACSHA512(Encoding.UTF8.GetBytes(secretToken));
digest = Convert.ToHexString(hmac.ComputeHash(Encoding.UTF8.GetBytes(data)));
}
return new Tuple<string, string?>(data, digest);
return OneFuzzResult<Tuple<string, string?>>.Ok(new Tuple<string, string?>(data, digest));
}
@ -166,7 +190,7 @@ public class WebhookMessageLogOperations : Orm<WebhookMessageLog>, IWebhookMessa
const int MAX_TRIES = 5;
public WebhookMessageLogOperations(IHttpClientFactory httpFactory, ILogTracer log, IOnefuzzContext context)
public WebhookMessageLogOperations(ILogTracer log, IOnefuzzContext context)
: base(log, context) {
}
@ -244,7 +268,11 @@ public class WebhookMessageLogOperations : Orm<WebhookMessageLog>, IWebhookMessa
return false;
}
try {
return await _context.WebhookOperations.Send(message);
var sendResult = await _context.WebhookOperations.Send(message);
if (!sendResult.IsOk) {
_logTracer.Error(sendResult.ErrorV);
}
return sendResult.IsOk;
} catch (Exception exc) {
log.Exception(exc);
return false;

View File

@ -9,4 +9,5 @@ public static class WellKnownContainers {
public static readonly Container ReproScripts = Container.Parse("repro-scripts");
public static readonly Container TaskConfigs = Container.Parse("task-configs");
public static readonly Container ProxyConfigs = Container.Parse("proxy-configs");
public static readonly Container Events = Container.Parse("events");
}

View File

@ -1,3 +0,0 @@
__azurite_db_*.json
__blobstorage__
__queuestorage__

View File

@ -5,6 +5,7 @@ using System.IO;
using System.Linq;
using System.Net;
using Azure.Storage.Blobs;
using FluentAssertions;
using IntegrationTests.Fakes;
using Microsoft.OneFuzz.Service;
using Microsoft.OneFuzz.Service.Functions;
@ -155,13 +156,20 @@ public abstract class ContainersTestBase : FunctionTestBase {
var list = BodyAs<ContainerInfoBase[]>(result);
// other tests can run in parallel, so filter to just our containers:
var cs = list.Where(ci => ci.Name.String.StartsWith(Context.ServiceConfiguration.OneFuzzStoragePrefix)).ToList();
Assert.Equal(2, cs.Count);
var cs = list
.Where(ci => ci.Name.String.StartsWith(Context.ServiceConfiguration.OneFuzzStoragePrefix))
.ToList();
_ = list.Should().Contain(ci => ci.Name.String.Contains("one"));
_ = list.Should().Contain(ci => ci.Name.String.Contains("two"));
var cs1 = list.Single(ci => ci.Name.String.Contains("one"));
var cs2 = list.Single(ci => ci.Name.String.Contains("two"));
// ensure correct metadata was returned.
// these will be in order as "one"<"two"
Assert.Equal(meta1, cs[0].Metadata);
Assert.Equal(meta2, cs[1].Metadata);
Assert.Equal(meta1, cs1.Metadata);
Assert.Equal(meta2, cs2.Metadata);
}
private static async Async.Task AssertCanCRUD(Uri sasUrl) {

View File

@ -0,0 +1,65 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using Azure.Storage.Blobs;
using FluentAssertions;
using IntegrationTests.Fakes;
using Microsoft.OneFuzz.Service;
using Microsoft.OneFuzz.Service.Functions;
using Xunit;
using Xunit.Abstractions;
using Async = System.Threading.Tasks;
namespace IntegrationTests;
[Trait("Category", "Live")]
public class AzureStorageEventsTest : EventsTestBase {
public AzureStorageEventsTest(ITestOutputHelper output)
: base(output, Integration.AzureStorage.FromEnvironment()) { }
}
public class AzuriteEventsTest : EventsTestBase {
public AzuriteEventsTest(ITestOutputHelper output)
: base(output, new Integration.AzuriteStorage()) { }
}
public abstract class EventsTestBase : FunctionTestBase {
public EventsTestBase(ITestOutputHelper output, IStorage storage)
: base(output, storage) { }
[Fact]
public async Async.Task BlobIsCreatedAndIsAccessible() {
var webhookId = Guid.NewGuid();
var webhookName = "test-webhook";
var insertWebhook = await Context.WebhookOperations.Insert(
new Webhook(webhookId, webhookName, null, new List<EventType> { EventType.Ping }, null, WebhookMessageFormat.Onefuzz)
);
insertWebhook.IsOk.Should().BeTrue();
var webhook = await Context.WebhookOperations.GetByWebhookId(webhookId);
webhook.Should().NotBeNull();
var ping = await Context.WebhookOperations.Ping(webhook!);
ping.Should().NotBeNull();
var msg = TestHttpRequestData.FromJson("GET", new EventsGet(ping.PingId));
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new EventsFunction(Logger, auth, Context);
var result = await func.Run(msg);
result.StatusCode.Should().Be(HttpStatusCode.OK);
var eventPayload = BodyAs<EventGetResponse>(result);
eventPayload.Event.EventType.Should().Be(EventType.Ping);
var pingEvent = (EventPing)eventPayload.Event.Event;
pingEvent.PingId.Should().Be(ping.PingId);
var containerClient = new BlobContainerClient(eventPayload.Event.SasUrl);
var stream = await containerClient.GetBlobClient(pingEvent.PingId.ToString()).OpenReadAsync();
using var sr = new StreamReader(stream);
var eventData = await sr.ReadToEndAsync(); // read to make sure the SAS URL works
eventData.Should().Contain(ping.PingId.ToString());
}
}

View File

@ -1,14 +1,7 @@
using System;
using System.Threading.Tasks;
using Microsoft.OneFuzz.Service;
using Microsoft.OneFuzz.Service;
// TestContainers class allows use of InstanceID without having to set it up in blob storage
sealed class TestContainers : Containers {
public TestContainers(ILogTracer log, IStorage storage, IServiceConfig config)
: base(log, storage, config) { }
public Guid InstanceId { get; } = Guid.NewGuid();
public override Task<Guid> GetInstanceId()
=> System.Threading.Tasks.Task.FromResult(InstanceId);
}

View File

@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Net.Http;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Configuration.AzureAppConfiguration;
using Microsoft.Extensions.Options;
@ -14,7 +15,7 @@ namespace IntegrationTests.Fakes;
// TestContext provides a minimal IOnefuzzContext implementation to allow running
// of functions as unit or integration tests.
public sealed class TestContext : IOnefuzzContext {
public TestContext(ILogTracer logTracer, IStorage storage, ICreds creds, string storagePrefix) {
public TestContext(IHttpClientFactory httpClientFactory, ILogTracer logTracer, IStorage storage, ICreds creds, string storagePrefix) {
var cache = new MemoryCache(Options.Create(new MemoryCacheOptions()));
EntityConverter = new EntityConverter();
ServiceConfiguration = new TestServiceConfiguration(storagePrefix);
@ -42,10 +43,11 @@ public sealed class TestContext : IOnefuzzContext {
NotificationOperations = new NotificationOperations(logTracer, this);
SecretsOperations = new TestSecretsOperations(Creds, ServiceConfiguration);
FeatureManagerSnapshot = new TestFeatureManagerSnapshot();
WebhookOperations = new TestWebhookOperations(httpClientFactory, logTracer, this);
Events = new TestEvents(logTracer, this);
WebhookMessageLogOperations = new TestWebhookMessageLogOperations(logTracer, this);
}
public TestEvents Events { get; set; } = new();
// convenience method for test setup
public Async.Task InsertAll(params EntityBase[] objs)
=> Async.Task.WhenAll(
@ -58,12 +60,13 @@ public sealed class TestContext : IOnefuzzContext {
NodeTasks nt => NodeTasksOperations.Insert(nt),
InstanceConfig ic => ConfigOperations.Insert(ic),
Notification n => NotificationOperations.Insert(n),
Webhook w => WebhookOperations.Insert(w),
_ => throw new NotSupportedException($"You will need to add an TestContext.InsertAll case for {x.GetType()} entities"),
}));
// Implementations:
IEvents IOnefuzzContext.Events => Events;
public IEvents Events { get; }
public IServiceConfig ServiceConfiguration { get; }
@ -95,6 +98,9 @@ public sealed class TestContext : IOnefuzzContext {
public IFeatureManagerSnapshot FeatureManagerSnapshot { get; }
public IWebhookOperations WebhookOperations { get; }
public IWebhookMessageLogOperations WebhookMessageLogOperations { get; }
// -- Remainder not implemented --
@ -121,10 +127,6 @@ public sealed class TestContext : IOnefuzzContext {
public IVmOperations VmOperations => throw new System.NotImplementedException();
public IWebhookMessageLogOperations WebhookMessageLogOperations => throw new System.NotImplementedException();
public IWebhookOperations WebhookOperations => throw new System.NotImplementedException();
public INsgOperations NsgOperations => throw new NotImplementedException();
public ISubnet Subnet => throw new NotImplementedException();

View File

@ -16,11 +16,14 @@ sealed class TestCreds : ICreds {
private readonly Guid _subscriptionId;
private readonly string _resourceGroup;
private readonly Region _region;
private readonly string _instanceName;
public TestCreds(Guid subscriptionId, string resourceGroup, Region region) {
public TestCreds(Guid subscriptionId, string resourceGroup, Region region, string instanceName) {
_subscriptionId = subscriptionId;
_resourceGroup = resourceGroup;
_region = region;
_instanceName = instanceName;
}
public ArmClient ArmClient => null!;
@ -40,7 +43,7 @@ sealed class TestCreds : ICreds {
}
public string GetInstanceName() {
throw new NotImplementedException();
return _instanceName;
}
public ResourceGroupResource GetResourceGroupResource() {

View File

@ -5,22 +5,20 @@ using Async = System.Threading.Tasks;
namespace IntegrationTests.Fakes;
public sealed class TestEvents : IEvents {
public sealed class TestEvents : Events {
public List<BaseEvent> Events { get; } = new();
public List<EventMessage> SignalREvents { get; } = new();
public List<DownloadableEventMessage> SignalREvents { get; } = new();
public void LogEvent(BaseEvent anEvent) {
public TestEvents(ILogTracer log, IOnefuzzContext context)
: base(log, context) { }
public override void LogEvent(BaseEvent anEvent) {
Events.Add(anEvent);
}
public Async.Task QueueSignalrEvent(EventMessage message) {
public override Async.Task QueueSignalrEvent(DownloadableEventMessage message) {
SignalREvents.Add(message);
return Async.Task.CompletedTask;
}
public Async.Task SendEvent(BaseEvent anEvent) {
Events.Add(anEvent);
return Async.Task.CompletedTask;
}
}

View File

@ -0,0 +1,13 @@
using System.Collections.Generic;
using Microsoft.OneFuzz.Service;
namespace IntegrationTests.Fakes;
public sealed class TestWebhookMessageLogOperations : WebhookMessageLogOperations {
public List<BaseEvent> Events { get; } = new();
public List<DownloadableEventMessage> SignalREvents { get; } = new();
public TestWebhookMessageLogOperations(ILogTracer log, IOnefuzzContext context)
: base(log, context) { }
}

View File

@ -0,0 +1,14 @@
using System.Collections.Generic;
using System.Net.Http;
using Microsoft.OneFuzz.Service;
namespace IntegrationTests.Fakes;
public sealed class TestWebhookOperations : WebhookOperations {
public List<BaseEvent> Events { get; } = new();
public List<DownloadableEventMessage> SignalREvents { get; } = new();
public TestWebhookOperations(IHttpClientFactory httpClientFactory, ILogTracer log, IOnefuzzContext context)
: base(httpClientFactory, log, context) { }
}

View File

@ -1,6 +1,6 @@

using System;
using System.Net;
using FluentAssertions;
using IntegrationTests.Fakes;
using Microsoft.OneFuzz.Service;
using Microsoft.OneFuzz.Service.Functions;
@ -45,14 +45,6 @@ public abstract class InfoTestBase : FunctionTestBase {
[Fact]
public async Async.Task TestInfo_WithUserCredentials_Succeeds() {
// store the instance ID in the expected location:
// for production this is done by the deploy script
var instanceId = Guid.NewGuid().ToString();
var baseConfigContainer = WellKnownContainers.BaseConfig;
var containerClient = GetContainerClient(baseConfigContainer);
_ = await containerClient.CreateAsync();
_ = await containerClient.GetBlobClient("instance_id").UploadAsync(new BinaryData(instanceId));
var auth = new TestEndpointAuthorization(RequestType.User, Logger, Context);
var func = new Info(auth, Context);
@ -61,6 +53,7 @@ public abstract class InfoTestBase : FunctionTestBase {
// the instance ID should be somewhere in the result,
// indicating it was read from the blob
Assert.Contains(instanceId, BodyAsString(result));
var info = BodyAs<InfoResponse>(result);
info.InstanceId.Should().NotBeEmpty();
}
}

View File

@ -1,6 +1,7 @@
using System;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using ApiService.OneFuzzLib.Orm;
using Azure.Data.Tables;
@ -46,15 +47,23 @@ public abstract class FunctionTestBase : IAsyncLifetime {
=> _blobClient.GetBlobContainerClient(_storagePrefix + container.String);
public FunctionTestBase(ITestOutputHelper output, IStorage storage) {
var instanceId = Guid.NewGuid().ToString();
Logger = new TestLogTracer(output);
_storage = storage;
var creds = new TestCreds(_subscriptionId, _resourceGroup, _region);
Context = new TestContext(Logger, _storage, creds, _storagePrefix);
var creds = new TestCreds(_subscriptionId, _resourceGroup, _region, instanceId);
Context = new TestContext(new DefaultHttpClientFactory(), Logger, _storage, creds, _storagePrefix);
// set up blob client for test purposes:
// this is always sync for test purposes
_blobClient = _storage.GetBlobServiceClientForAccount(_storage.GetPrimaryAccount(StorageType.Config)).Result;
var baseConfigContainer = WellKnownContainers.BaseConfig;
var containerClient = GetContainerClient(baseConfigContainer);
_ = containerClient.Create();
_ = containerClient.GetBlobClient("instance_id").Upload(new BinaryData(instanceId));
_ = GetContainerClient(WellKnownContainers.Events).Create();
}
public async Task InitializeAsync() {
@ -109,3 +118,15 @@ public abstract class FunctionTestBase : IAsyncLifetime {
})
.ToListAsync());
}
public sealed class DefaultHttpClientFactory : IHttpClientFactory, IDisposable {
private readonly Lazy<HttpMessageHandler> _handlerLazy = new(() => new HttpClientHandler());
public HttpClient CreateClient(string name) => new(_handlerLazy.Value, disposeHandler: false);
public void Dispose() {
if (_handlerLazy.IsValueCreated) {
_handlerLazy.Value.Dispose();
}
}
}

View File

@ -155,14 +155,15 @@ namespace Tests {
Outdated: outdated);
public static Gen<EventMessage> EventMessage() {
return Arb.Generate<Tuple<Guid, BaseEvent, Guid, string>>().Select(
return Arb.Generate<Tuple<Guid, BaseEvent, Guid, string, DateTime>>().Select(
arg =>
new EventMessage(
EventId: arg.Item1,
EventType: arg.Item2.GetEventType(),
Event: arg.Item2,
InstanceId: arg.Item3,
InstanceName: arg.Item4
InstanceName: arg.Item4,
CreatedAt: arg.Item5
)
);
}
@ -280,7 +281,7 @@ namespace Tests {
}
public static Gen<WebhookMessage> WebhookMessage() {
return Arb.Generate<Tuple<Guid, BaseEvent, Guid, string, Guid>>().Select(
return Arb.Generate<Tuple<Guid, BaseEvent, Guid, string, Guid, DateTime, Uri>>().Select(
arg =>
new WebhookMessage(
EventId: arg.Item1,
@ -288,13 +289,15 @@ namespace Tests {
Event: arg.Item2,
InstanceId: arg.Item3,
InstanceName: arg.Item4,
WebhookId: arg.Item5
WebhookId: arg.Item5,
CreatedAt: arg.Item6,
SasUrl: arg.Item7
)
);
}
public static Gen<WebhookMessageEventGrid> WebhookMessageEventGrid() {
return Arb.Generate<Tuple<string, string, BaseEvent, Guid, DateTimeOffset>>().Select(
return Arb.Generate<Tuple<string, string, BaseEvent, Guid, DateTimeOffset, Uri>>().Select(
arg =>
new WebhookMessageEventGrid(
DataVersion: arg.Item1,
@ -302,7 +305,8 @@ namespace Tests {
EventType: arg.Item3.GetEventType(),
Data: arg.Item3,
Id: arg.Item4,
EventTime: arg.Item5
EventTime: arg.Item5,
SasUrl: arg.Item6
)
);
}
@ -487,6 +491,21 @@ namespace Tests {
)
);
}
public static Gen<DownloadableEventMessage> DownloadableEventMessage() {
return Arb.Generate<Tuple<Guid, BaseEvent, Guid, string, DateTime, Uri>>().Select(
arg =>
new DownloadableEventMessage(
EventId: arg.Item1,
EventType: arg.Item2.GetEventType(),
Event: arg.Item2,
InstanceId: arg.Item3,
InstanceName: arg.Item4,
CreatedAt: arg.Item5,
SasUrl: arg.Item6
)
);
}
}
public class OrmArb {
@ -528,6 +547,10 @@ namespace Tests {
return Arb.From(OrmGenerators.EventMessage());
}
public static Arbitrary<DownloadableEventMessage> DownloadableEventMessage() {
return Arb.From(OrmGenerators.DownloadableEventMessage());
}
public static Arbitrary<NetworkConfig> NetworkConfig() {
return Arb.From(OrmGenerators.NetworkConfig());
}
@ -1084,7 +1107,7 @@ namespace Tests {
}
[Property]
public bool EventMessage(EventMessage e) {
public bool EventMessage(DownloadableEventMessage e) {
return Test(e);
}

View File

@ -236,9 +236,9 @@ namespace Tests {
[Fact]
public void TestEventSerialization() {
var expectedEvent = new EventMessage(Guid.NewGuid(), EventType.NodeHeartbeat, new EventNodeHeartbeat(Guid.NewGuid(), Guid.NewGuid(), PoolName.Parse("test-Poool")), Guid.NewGuid(), "test");
var expectedEvent = new EventMessage(Guid.NewGuid(), EventType.NodeHeartbeat, new EventNodeHeartbeat(Guid.NewGuid(), Guid.NewGuid(), PoolName.Parse("test-Poool")), Guid.NewGuid(), "test", DateTime.UtcNow);
var serialized = JsonSerializer.Serialize(expectedEvent, EntityConverter.GetJsonSerializerOptions());
var actualEvent = JsonSerializer.Deserialize<EventMessage>(serialized, EntityConverter.GetJsonSerializerOptions());
var actualEvent = JsonSerializer.Deserialize<EventMessage>((string)serialized, EntityConverter.GetJsonSerializerOptions());
Assert.Equal(expectedEvent, actualEvent);
}

View File

@ -1765,6 +1765,21 @@ class ValidateScriban(Endpoint):
return self._req_model("POST", responses.TemplateValidationResponse, data=req)
class Events(Endpoint):
"""Interact with Onefuzz events"""
endpoint = "events"
def get(self, event_id: UUID_EXPANSION) -> events.EventGetResponse:
"""Get an event's payload by id"""
self.logger.debug("get event: %s", event_id)
return self._req_model(
"GET",
events.EventGetResponse,
data=requests.EventsGet(event_id=event_id),
)
class Command:
def __init__(self, onefuzz: "Onefuzz", logger: logging.Logger):
self.onefuzz = onefuzz
@ -1856,6 +1871,7 @@ class Onefuzz:
self.tools = Tools(self)
self.instance_config = InstanceConfigCmd(self)
self.validate_scriban = ValidateScriban(self)
self.events = Events(self)
# these are externally developed cli modules
self.template = Template(self, self.logger)

View File

@ -6,7 +6,6 @@ var suffix = uniqueString(resourceGroup().id)
var storageAccountNameFuzz = 'fuzz${suffix}'
var storageAccountNameFunc = 'func${suffix}'
var storage_account_sas = {
signedExpiry: signedExpiry
signedPermission: 'rwdlacup'
@ -14,6 +13,9 @@ var storage_account_sas = {
signedServices: 'bfqt'
}
var storageAccountFuzzContainersParams = [
'events'
]
var storageAccountFuncContainersParams = [
'vm-scripts'
@ -102,6 +104,13 @@ resource storageAccountFunBlobContainers 'Microsoft.Storage/storageAccounts/blob
]
}]
resource storageAccountFuzzBlobContainers 'Microsoft.Storage/storageAccounts/blobServices/containers@2021-08-01' = [for c in storageAccountFuzzContainersParams: {
name: '${storageAccountNameFuzz}/default/${c}'
dependsOn: [
storageAccountFuzz
]
}]
output FuzzName string = storageAccountNameFuzz
output FuncName string = storageAccountNameFunc

View File

@ -8,7 +8,7 @@ from enum import Enum
from typing import Any, Dict, List, Optional, Union
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
from pydantic import AnyHttpUrl, BaseModel, Field
from ._monkeypatch import _check_hotfix
from .enums import (
@ -320,6 +320,14 @@ class EventMessage(BaseEvent):
instance_name: str
class DownloadableEventMessage(EventMessage):
sas_url: AnyHttpUrl
class EventGetResponse(BaseResponse):
event: DownloadableEventMessage
# because Pydantic does not yet have discriminated union types yet, parse events
# by hand. https://github.com/samuelcolvin/pydantic/issues/619
def parse_event_message(data: Dict[str, Any]) -> EventMessage:

View File

@ -275,6 +275,10 @@ class JinjaToScribanMigrationPost(BaseModel):
dry_run: bool = Field(default=False)
class EventsGet(BaseModel):
event_id: UUID
class NotificationTest(BaseModel):
report: Report
notification: models.Notification