Refactor queue file changes (#1755)

* Untested refactor

* Untested refactor

* It works with some test messages

* Use custom deserializer

* Clean up warnings
This commit is contained in:
Teo Voinea
2022-04-06 16:37:52 -04:00
committed by GitHub
parent 8299d8fb57
commit c5f60dcaec
6 changed files with 223 additions and 0 deletions

View File

@ -23,6 +23,7 @@
<PackageReference Include="Azure.ResourceManager.Network" Version="1.0.0-beta.5" />
<PackageReference Include="Azure.ResourceManager.Resources" Version="1.0.0-beta.6" />
<PackageReference Include="Azure.ResourceManager.Storage" Version="1.0.0-beta.6" />
<PackageReference Include="Azure.Storage.Queues" Version="12.9.0" />
<PackageReference Include="Microsoft.Graph" Version="4.20.0" />
<PackageReference Include="Microsoft.Identity.Client" Version="4.42.0" />
<PackageReference Include="Microsoft.Identity.Web.TokenCache" Version="1.23.0" />

View File

@ -36,6 +36,8 @@ public class Program
.ConfigureServices((context, services) =>
services.AddSingleton(_ => new LogTracerFactory(GetLoggers()))
.AddSingleton<IStorageProvider>(_ => new StorageProvider(EnvironmentVariables.OneFuzz.FuncStorage ?? throw new InvalidOperationException("Missing account id") ))
.AddSingleton<ICreds>(_ => new Creds())
.AddSingleton<IStorage, Storage>()
)
.Build();

View File

@ -0,0 +1,70 @@
using System;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
using Azure.Storage.Queues.Models;
using System.Linq;
namespace Microsoft.OneFuzz.Service;
public class QueueFileChanges {
// The number of time the function will be retried if an error occurs
// https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-storage-queue-trigger?tabs=csharp#poison-messages
const int MAX_DEQUEUE_COUNT = 5;
private readonly ILogger _logger;
private readonly IStorageProvider _storageProvider;
private readonly IStorage _storage;
public QueueFileChanges(ILoggerFactory loggerFactory, IStorageProvider storageProvider, IStorage storage)
{
_logger = loggerFactory.CreateLogger<QueueFileChanges>();
_storageProvider = storageProvider;
_storage = storage;
}
[Function("QueueFileChanges")]
public Task Run(
[QueueTrigger("file-changes-refactored", Connection = "AzureWebJobsStorage")] string msg,
int dequeueCount)
{
var fileChangeEvent = JsonSerializer.Deserialize<Dictionary<string, string>>(msg, EntityConverter.GetJsonSerializerOptions());
var lastTry = dequeueCount == MAX_DEQUEUE_COUNT;
var _ = fileChangeEvent ?? throw new ArgumentException("Unable to parse queue trigger as JSON");
// check type first before calling Azure APIs
const string eventType = "eventType";
if (!fileChangeEvent.ContainsKey(eventType)
|| fileChangeEvent[eventType] != "Microsoft.Storage.BlobCreated")
{
return Task.CompletedTask;
}
const string topic = "topic";
if (!fileChangeEvent.ContainsKey(topic)
|| !_storage.CorpusAccounts().Contains(fileChangeEvent[topic]))
{
return Task.CompletedTask;
}
file_added(fileChangeEvent, lastTry);
return Task.CompletedTask;
}
private void file_added(Dictionary<string, string> fileChangeEvent, bool failTaskOnTransientError) {
var data = JsonSerializer.Deserialize<Dictionary<string, string>>(fileChangeEvent["data"])!;
var url = data["url"];
var parts = url.Split("/").Skip(3).ToList();
var container = parts[0];
var path = string.Join('/', parts.Skip(1));
_logger.LogInformation($"file added container: {container} - path: {path}");
// TODO: new_files(container, path, fail_task_on_transient_error)
}
}

View File

@ -0,0 +1,46 @@
using Azure.Identity;
using Azure.Core;
namespace Microsoft.OneFuzz.Service;
public interface ICreds {
public DefaultAzureCredential GetIdentity();
public string GetSubcription();
public string GetBaseResourceGroup();
public ResourceIdentifier GetResourceGroupResourceIdentifier();
}
public class Creds : ICreds {
// TODO: @cached
public DefaultAzureCredential GetIdentity() {
// TODO: AllowMoreWorkers
// TODO: ReduceLogging
return new DefaultAzureCredential();
}
// TODO: @cached
public string GetSubcription() {
var storageResourceId = EnvironmentVariables.OneFuzz.DataStorage
?? throw new System.Exception("Data storage env var is not present");
var storageResource = new ResourceIdentifier(storageResourceId);
return storageResource.SubscriptionId!;
}
// TODO: @cached
public string GetBaseResourceGroup() {
var storageResourceId = EnvironmentVariables.OneFuzz.DataStorage
?? throw new System.Exception("Data storage env var is not present");
var storageResource = new ResourceIdentifier(storageResourceId);
return storageResource.ResourceGroupName!;
}
public ResourceIdentifier GetResourceGroupResourceIdentifier() {
var resourceId = EnvironmentVariables.OneFuzz.ResourceGroup
?? throw new System.Exception("Resource group env var is not present");
return new ResourceIdentifier(resourceId);
}
}

View File

@ -0,0 +1,79 @@
using System.Collections.Generic;
using System;
using Azure.ResourceManager;
using Azure.ResourceManager.Storage;
using Azure.Core;
using Microsoft.Extensions.Logging;
using System.Text.Json;
namespace Microsoft.OneFuzz.Service;
public interface IStorage {
public ArmClient GetMgmtClient();
public IEnumerable<string> CorpusAccounts();
}
public class Storage : IStorage {
private ICreds _creds;
private readonly ILogger _logger;
public Storage(ILoggerFactory loggerFactory, ICreds creds) {
_creds = creds;
_logger = loggerFactory.CreateLogger<Storage>();
}
// TODO: @cached
public static string GetFuncStorage() {
return EnvironmentVariables.OneFuzz.FuncStorage
?? throw new Exception("Func storage env var is missing");
}
// TODO: @cached
public static string GetFuzzStorage() {
return EnvironmentVariables.OneFuzz.DataStorage
?? throw new Exception("Fuzz storage env var is missing");
}
// TODO: @cached
public ArmClient GetMgmtClient() {
return new ArmClient(credential: _creds.GetIdentity(), defaultSubscriptionId: _creds.GetSubcription());
}
// TODO: @cached
public IEnumerable<string> CorpusAccounts() {
var skip = GetFuncStorage();
var results = new List<string> {GetFuzzStorage()};
var client = GetMgmtClient();
var group = _creds.GetResourceGroupResourceIdentifier();
const string storageTypeTagKey = "storage_type";
var resourceGroup = client.GetResourceGroup(group);
foreach (var account in resourceGroup.GetStorageAccounts()) {
if (account.Id == skip) {
continue;
}
if (results.Contains(account.Id!)) {
continue;
}
if (string.IsNullOrEmpty(account.Data.PrimaryEndpoints.Blob)) {
continue;
}
if (!account.Data.Tags.ContainsKey(storageTypeTagKey)
|| account.Data.Tags[storageTypeTagKey] != "corpus") {
continue;
}
results.Add(account.Id!);
}
_logger.LogInformation($"corpus accounts: {JsonSerializer.Serialize(results)}");
return results;
}
}

View File

@ -107,6 +107,17 @@
"System.Text.Json": "4.7.2"
}
},
"Azure.Storage.Queues": {
"type": "Direct",
"requested": "[12.9.0, )",
"resolved": "12.9.0",
"contentHash": "jDiyHtsCUCrWNvZW7SjJnJb46UhpdgQrWCbL8aWpapDHlq9LvbvxYpfLh4dfKAz09QiTznLMIU3i+md9+7GzqQ==",
"dependencies": {
"Azure.Storage.Common": "12.10.0",
"System.Memory.Data": "1.0.2",
"System.Text.Json": "4.7.2"
}
},
"Microsoft.Azure.Functions.Worker": {
"type": "Direct",
"requested": "[1.6.0, )",
@ -223,6 +234,15 @@
"Microsoft.Bcl.AsyncInterfaces": "6.0.0"
}
},
"Azure.Storage.Common": {
"type": "Transitive",
"resolved": "12.10.0",
"contentHash": "vYkHGzUkdZTace/cDPZLG+Mh/EoPqQuGxDIBOau9D+XWoDPmuUFGk325aXplkFE4JFGpSwoytNYzk/qBCaiHqg==",
"dependencies": {
"Azure.Core": "1.22.0",
"System.IO.Hashing": "6.0.0"
}
},
"Google.Protobuf": {
"type": "Transitive",
"resolved": "3.15.8",
@ -795,6 +815,11 @@
"System.Threading.Tasks": "4.3.0"
}
},
"System.IO.Hashing": {
"type": "Transitive",
"resolved": "6.0.0",
"contentHash": "Rfm2jYCaUeGysFEZjDe7j1R4x6Z6BzumS/vUT5a1AA/AWJuGX71PoGB0RmpyX3VmrGqVnAwtfMn39OHR8Y/5+g=="
},
"System.Memory": {
"type": "Transitive",
"resolved": "4.5.4",