Set autocrlf by default, update affected files (#2261)

This commit is contained in:
George Pollard
2022-08-17 13:07:20 +12:00
committed by GitHub
parent 2382ce5ca5
commit a3f1d59f70
20 changed files with 1545 additions and 1543 deletions

View File

@ -1,40 +1,40 @@
using System.Text.Json;
using Microsoft.Azure.Functions.Worker;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service.Functions;
public class QueueProxyHearbeat {
private readonly ILogTracer _log;
private readonly IProxyOperations _proxy;
public QueueProxyHearbeat(ILogTracer log, IProxyOperations proxy) {
_log = log;
_proxy = proxy;
}
//[Function("QueueProxyHearbeat")]
public async Async.Task Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string msg) {
_log.Info($"heartbeat: {msg}");
var hb = JsonSerializer.Deserialize<ProxyHeartbeat>(msg, EntityConverter.GetJsonSerializerOptions()).EnsureNotNull($"wrong data {msg}"); ;
var newHb = hb with { TimeStamp = DateTimeOffset.UtcNow };
var proxy = await _proxy.GetByProxyId(newHb.ProxyId);
var log = _log.WithTag("ProxyId", newHb.ProxyId.ToString());
if (proxy == null) {
log.Warning($"invalid proxy id: {newHb.ProxyId}");
return;
}
var newProxy = proxy with { Heartbeat = newHb };
var r = await _proxy.Replace(newProxy);
if (!r.IsOk) {
var (status, reason) = r.ErrorV;
log.Error($"Failed to replace proxy heartbeat record due to [{status}] {reason}");
}
}
}
using System.Text.Json;
using Microsoft.Azure.Functions.Worker;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service.Functions;
public class QueueProxyHearbeat {
private readonly ILogTracer _log;
private readonly IProxyOperations _proxy;
public QueueProxyHearbeat(ILogTracer log, IProxyOperations proxy) {
_log = log;
_proxy = proxy;
}
//[Function("QueueProxyHearbeat")]
public async Async.Task Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string msg) {
_log.Info($"heartbeat: {msg}");
var hb = JsonSerializer.Deserialize<ProxyHeartbeat>(msg, EntityConverter.GetJsonSerializerOptions()).EnsureNotNull($"wrong data {msg}"); ;
var newHb = hb with { TimeStamp = DateTimeOffset.UtcNow };
var proxy = await _proxy.GetByProxyId(newHb.ProxyId);
var log = _log.WithTag("ProxyId", newHb.ProxyId.ToString());
if (proxy == null) {
log.Warning($"invalid proxy id: {newHb.ProxyId}");
return;
}
var newProxy = proxy with { Heartbeat = newHb };
var r = await _proxy.Replace(newProxy);
if (!r.IsOk) {
var (status, reason) = r.ErrorV;
log.Error($"Failed to replace proxy heartbeat record due to [{status}] {reason}");
}
}
}

View File

@ -1,24 +1,24 @@
using System.Text.Json;
using Microsoft.Azure.Functions.Worker;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service.Functions;
public class QueueWebhooks {
private readonly ILogTracer _log;
private readonly IWebhookMessageLogOperations _webhookMessageLog;
public QueueWebhooks(ILogTracer log, IWebhookMessageLogOperations webhookMessageLog) {
_log = log;
_webhookMessageLog = webhookMessageLog;
}
//[Function("QueueWebhooks")]
public async Async.Task Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string msg) {
_log.Info($"Webhook Message Queued: {msg}");
var obj = JsonSerializer.Deserialize<WebhookMessageQueueObj>(msg, EntityConverter.GetJsonSerializerOptions()).EnsureNotNull($"wrong data {msg}");
await _webhookMessageLog.ProcessFromQueue(obj);
}
}
using System.Text.Json;
using Microsoft.Azure.Functions.Worker;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service.Functions;
public class QueueWebhooks {
private readonly ILogTracer _log;
private readonly IWebhookMessageLogOperations _webhookMessageLog;
public QueueWebhooks(ILogTracer log, IWebhookMessageLogOperations webhookMessageLog) {
_log = log;
_webhookMessageLog = webhookMessageLog;
}
//[Function("QueueWebhooks")]
public async Async.Task Run([QueueTrigger("myqueue-items", Connection = "AzureWebJobsStorage")] string msg) {
_log.Info($"Webhook Message Queued: {msg}");
var obj = JsonSerializer.Deserialize<WebhookMessageQueueObj>(msg, EntityConverter.GetJsonSerializerOptions()).EnsureNotNull($"wrong data {msg}");
await _webhookMessageLog.ProcessFromQueue(obj);
}
}

View File

@ -1,328 +1,328 @@
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service;
[SerializeValue]
public enum ErrorCode {
INVALID_REQUEST = 450,
INVALID_PERMISSION = 451,
MISSING_EULA_AGREEMENT = 452,
INVALID_JOB = 453,
INVALID_TASK = INVALID_JOB,
UNABLE_TO_ADD_TASK_TO_JOB = 454,
INVALID_CONTAINER = 455,
UNABLE_TO_RESIZE = 456,
UNAUTHORIZED = 457,
UNABLE_TO_USE_STOPPED_JOB = 458,
UNABLE_TO_CHANGE_JOB_DURATION = 459,
UNABLE_TO_CREATE_NETWORK = 460,
VM_CREATE_FAILED = 461,
MISSING_NOTIFICATION = 462,
INVALID_IMAGE = 463,
UNABLE_TO_CREATE = 464,
UNABLE_TO_PORT_FORWARD = 465,
UNABLE_TO_FIND = 467,
TASK_FAILED = 468,
INVALID_NODE = 469,
NOTIFICATION_FAILURE = 470,
UNABLE_TO_UPDATE = 471,
PROXY_FAILED = 472,
INVALID_CONFIGURATION = 473,
UNABLE_TO_CREATE_CONTAINER = 474,
}
public enum VmState {
Init,
ExtensionsLaunch,
ExtensionsFailed,
VmAllocationFailed,
Running,
Stopping,
Stopped
}
public enum WebhookMessageState {
Queued,
Retrying,
Succeeded,
Failed
}
public enum TaskState {
Init,
Waiting,
Scheduled,
SettingUp,
Running,
Stopping,
Stopped,
WaitJob
}
public enum TaskType {
Coverage,
LibfuzzerFuzz,
LibfuzzerCoverage,
LibfuzzerCrashReport,
LibfuzzerMerge,
LibfuzzerRegression,
GenericAnalysis,
GenericSupervisor,
GenericMerge,
GenericGenerator,
GenericCrashReport,
GenericRegression,
DotnetCoverage,
}
public enum Os {
Windows,
Linux
}
public enum ContainerType {
Analysis,
Coverage,
Crashes,
Inputs,
NoRepro,
ReadonlyInputs,
Reports,
Setup,
Tools,
UniqueInputs,
UniqueReports,
RegressionReports,
Logs
}
[SkipRename]
public enum StatsFormat {
AFL
}
public enum TaskDebugFlag {
KeepNodeOnFailure,
KeepNodeOnCompletion,
}
public enum ScalesetState {
Init,
Setup,
Resize,
Running,
Shutdown,
Halt,
CreationFailed
}
public enum JobState {
Init,
Enabled,
Stopping,
Stopped
}
public static class JobStateHelper {
private static readonly IReadOnlySet<JobState> _shuttingDown = new HashSet<JobState>(new[] { JobState.Stopping, JobState.Stopped });
private static readonly IReadOnlySet<JobState> _avaiable = new HashSet<JobState>(new[] { JobState.Init, JobState.Enabled });
private static readonly IReadOnlySet<JobState> _needsWork = new HashSet<JobState>(new[] { JobState.Init, JobState.Stopping });
public static IReadOnlySet<JobState> Available => _avaiable;
public static IReadOnlySet<JobState> NeedsWork => _needsWork;
public static IReadOnlySet<JobState> ShuttingDown => _shuttingDown;
}
public static class ScalesetStateHelper {
private static readonly IReadOnlySet<ScalesetState> _canUpdate = new HashSet<ScalesetState> { ScalesetState.Init, ScalesetState.Resize };
private static readonly IReadOnlySet<ScalesetState> _needsWork =
new HashSet<ScalesetState>{
ScalesetState.Init,
ScalesetState.Setup,
ScalesetState.Resize,
ScalesetState.Shutdown,
ScalesetState.Halt
};
private static readonly IReadOnlySet<ScalesetState> _available = new HashSet<ScalesetState> { ScalesetState.Resize, ScalesetState.Running };
private static readonly IReadOnlySet<ScalesetState> _resizing = new HashSet<ScalesetState> { ScalesetState.Halt, ScalesetState.Init, ScalesetState.Setup };
/// set of states that indicate the scaleset can be updated
public static IReadOnlySet<ScalesetState> CanUpdate => _canUpdate;
/// set of states that indicate work is needed during eventing
public static IReadOnlySet<ScalesetState> NeedsWork => _needsWork;
/// set of states that indicate if it's available for work
public static IReadOnlySet<ScalesetState> Available => _available;
/// set of states that indicate scaleset is resizing
public static IReadOnlySet<ScalesetState> Resizing => _resizing;
}
public static class VmStateHelper {
private static readonly IReadOnlySet<VmState> _needsWork = new HashSet<VmState> { VmState.Init, VmState.ExtensionsLaunch, VmState.Stopping };
private static readonly IReadOnlySet<VmState> _available = new HashSet<VmState> { VmState.Init, VmState.ExtensionsLaunch, VmState.ExtensionsFailed, VmState.VmAllocationFailed, VmState.Running, };
public static IReadOnlySet<VmState> NeedsWork => _needsWork;
public static IReadOnlySet<VmState> Available => _available;
}
public static class TaskStateHelper {
public static readonly IReadOnlySet<TaskState> AvailableStates =
new HashSet<TaskState> { TaskState.Waiting, TaskState.Scheduled, TaskState.SettingUp, TaskState.Running, TaskState.WaitJob };
public static readonly IReadOnlySet<TaskState> NeedsWorkStates =
new HashSet<TaskState> { TaskState.Init, TaskState.Stopping };
public static readonly IReadOnlySet<TaskState> ShuttingDownStates =
new HashSet<TaskState> { TaskState.Stopping, TaskState.Stopped };
public static readonly IReadOnlySet<TaskState> HasStartedStates =
new HashSet<TaskState> { TaskState.Running, TaskState.Stopping, TaskState.Stopped };
public static bool Available(this TaskState state) => AvailableStates.Contains(state);
public static bool NeedsWork(this TaskState state) => NeedsWorkStates.Contains(state);
public static bool ShuttingDown(this TaskState state) => ShuttingDownStates.Contains(state);
public static bool HasStarted(this TaskState state) => HasStartedStates.Contains(state);
}
public enum PoolState {
Init,
Running,
Shutdown,
Halt
}
public static class PoolStateHelper {
private static readonly IReadOnlySet<PoolState> _needsWork = new HashSet<PoolState> { PoolState.Init, PoolState.Shutdown, PoolState.Halt };
private static readonly IReadOnlySet<PoolState> _available = new HashSet<PoolState> { PoolState.Running };
public static IReadOnlySet<PoolState> NeedsWork => _needsWork;
public static IReadOnlySet<PoolState> Available => _available;
}
[SkipRename]
public enum Architecture {
x86_64
}
public enum TaskFeature {
InputQueueFromContainer,
SupervisorExe,
SupervisorEnv,
SupervisorOptions,
SupervisorInputMarker,
StatsFile,
StatsFormat,
TargetExe,
TargetExeOptional,
TargetEnv,
TargetOptions,
AnalyzerExe,
AnalyzerEnv,
AnalyzerOptions,
RenameOutput,
TargetOptionsMerge,
TargetWorkers,
GeneratorExe,
GeneratorEnv,
GeneratorOptions,
WaitForFiles,
TargetTimeout,
CheckAsanLog,
CheckDebugger,
CheckRetryCount,
EnsembleSyncDelay,
PreserveExistingOutputs,
CheckFuzzerHelp,
ExpectCrashOnFailure,
ReportList,
MinimizedStackDepth,
CoverageFilter,
TargetMustUseInput
}
[Flags]
public enum ContainerPermission {
Read = 1 << 0,
Write = 1 << 1,
List = 1 << 2,
Delete = 1 << 3,
}
public enum Compare {
Equal,
AtLeast,
AtMost
}
public enum AgentMode {
Fuzz,
Repro,
Proxy
}
public enum NodeState {
Init,
Free,
SettingUp,
Rebooting,
Ready,
Busy,
Done,
Shutdown,
Halt,
}
public static class NodeStateHelper {
private static readonly IReadOnlySet<NodeState> _needsWork =
new HashSet<NodeState>(new[] { NodeState.Done, NodeState.Shutdown, NodeState.Halt });
private static readonly IReadOnlySet<NodeState> _readyForReset
= new HashSet<NodeState>(new[] { NodeState.Done, NodeState.Shutdown, NodeState.Halt });
private static readonly IReadOnlySet<NodeState> _canProcessNewWork =
new HashSet<NodeState>(new[] { NodeState.Free });
private static readonly IReadOnlySet<NodeState> _busy =
new HashSet<NodeState>(new[] { NodeState.Busy });
public static IReadOnlySet<NodeState> BusyStates => _busy;
public static IReadOnlySet<NodeState> NeedsWorkStates => _needsWork;
public static bool NeedsWork(this NodeState state) => _needsWork.Contains(state);
///If Node is in one of these states, ignore updates from the agent.
public static bool ReadyForReset(this NodeState state) => _readyForReset.Contains(state);
public static bool CanProcessNewWork(this NodeState state) => _canProcessNewWork.Contains(state);
}
public enum NodeDisposalStrategy {
ScaleIn,
Decomission
}
public enum GithubIssueState {
Open,
Closed
}
public enum GithubIssueSearchMatch {
Title,
Body
}
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service;
[SerializeValue]
public enum ErrorCode {
INVALID_REQUEST = 450,
INVALID_PERMISSION = 451,
MISSING_EULA_AGREEMENT = 452,
INVALID_JOB = 453,
INVALID_TASK = INVALID_JOB,
UNABLE_TO_ADD_TASK_TO_JOB = 454,
INVALID_CONTAINER = 455,
UNABLE_TO_RESIZE = 456,
UNAUTHORIZED = 457,
UNABLE_TO_USE_STOPPED_JOB = 458,
UNABLE_TO_CHANGE_JOB_DURATION = 459,
UNABLE_TO_CREATE_NETWORK = 460,
VM_CREATE_FAILED = 461,
MISSING_NOTIFICATION = 462,
INVALID_IMAGE = 463,
UNABLE_TO_CREATE = 464,
UNABLE_TO_PORT_FORWARD = 465,
UNABLE_TO_FIND = 467,
TASK_FAILED = 468,
INVALID_NODE = 469,
NOTIFICATION_FAILURE = 470,
UNABLE_TO_UPDATE = 471,
PROXY_FAILED = 472,
INVALID_CONFIGURATION = 473,
UNABLE_TO_CREATE_CONTAINER = 474,
}
public enum VmState {
Init,
ExtensionsLaunch,
ExtensionsFailed,
VmAllocationFailed,
Running,
Stopping,
Stopped
}
public enum WebhookMessageState {
Queued,
Retrying,
Succeeded,
Failed
}
public enum TaskState {
Init,
Waiting,
Scheduled,
SettingUp,
Running,
Stopping,
Stopped,
WaitJob
}
public enum TaskType {
Coverage,
LibfuzzerFuzz,
LibfuzzerCoverage,
LibfuzzerCrashReport,
LibfuzzerMerge,
LibfuzzerRegression,
GenericAnalysis,
GenericSupervisor,
GenericMerge,
GenericGenerator,
GenericCrashReport,
GenericRegression,
DotnetCoverage,
}
public enum Os {
Windows,
Linux
}
public enum ContainerType {
Analysis,
Coverage,
Crashes,
Inputs,
NoRepro,
ReadonlyInputs,
Reports,
Setup,
Tools,
UniqueInputs,
UniqueReports,
RegressionReports,
Logs
}
[SkipRename]
public enum StatsFormat {
AFL
}
public enum TaskDebugFlag {
KeepNodeOnFailure,
KeepNodeOnCompletion,
}
public enum ScalesetState {
Init,
Setup,
Resize,
Running,
Shutdown,
Halt,
CreationFailed
}
public enum JobState {
Init,
Enabled,
Stopping,
Stopped
}
public static class JobStateHelper {
private static readonly IReadOnlySet<JobState> _shuttingDown = new HashSet<JobState>(new[] { JobState.Stopping, JobState.Stopped });
private static readonly IReadOnlySet<JobState> _avaiable = new HashSet<JobState>(new[] { JobState.Init, JobState.Enabled });
private static readonly IReadOnlySet<JobState> _needsWork = new HashSet<JobState>(new[] { JobState.Init, JobState.Stopping });
public static IReadOnlySet<JobState> Available => _avaiable;
public static IReadOnlySet<JobState> NeedsWork => _needsWork;
public static IReadOnlySet<JobState> ShuttingDown => _shuttingDown;
}
public static class ScalesetStateHelper {
private static readonly IReadOnlySet<ScalesetState> _canUpdate = new HashSet<ScalesetState> { ScalesetState.Init, ScalesetState.Resize };
private static readonly IReadOnlySet<ScalesetState> _needsWork =
new HashSet<ScalesetState>{
ScalesetState.Init,
ScalesetState.Setup,
ScalesetState.Resize,
ScalesetState.Shutdown,
ScalesetState.Halt
};
private static readonly IReadOnlySet<ScalesetState> _available = new HashSet<ScalesetState> { ScalesetState.Resize, ScalesetState.Running };
private static readonly IReadOnlySet<ScalesetState> _resizing = new HashSet<ScalesetState> { ScalesetState.Halt, ScalesetState.Init, ScalesetState.Setup };
/// set of states that indicate the scaleset can be updated
public static IReadOnlySet<ScalesetState> CanUpdate => _canUpdate;
/// set of states that indicate work is needed during eventing
public static IReadOnlySet<ScalesetState> NeedsWork => _needsWork;
/// set of states that indicate if it's available for work
public static IReadOnlySet<ScalesetState> Available => _available;
/// set of states that indicate scaleset is resizing
public static IReadOnlySet<ScalesetState> Resizing => _resizing;
}
public static class VmStateHelper {
private static readonly IReadOnlySet<VmState> _needsWork = new HashSet<VmState> { VmState.Init, VmState.ExtensionsLaunch, VmState.Stopping };
private static readonly IReadOnlySet<VmState> _available = new HashSet<VmState> { VmState.Init, VmState.ExtensionsLaunch, VmState.ExtensionsFailed, VmState.VmAllocationFailed, VmState.Running, };
public static IReadOnlySet<VmState> NeedsWork => _needsWork;
public static IReadOnlySet<VmState> Available => _available;
}
public static class TaskStateHelper {
public static readonly IReadOnlySet<TaskState> AvailableStates =
new HashSet<TaskState> { TaskState.Waiting, TaskState.Scheduled, TaskState.SettingUp, TaskState.Running, TaskState.WaitJob };
public static readonly IReadOnlySet<TaskState> NeedsWorkStates =
new HashSet<TaskState> { TaskState.Init, TaskState.Stopping };
public static readonly IReadOnlySet<TaskState> ShuttingDownStates =
new HashSet<TaskState> { TaskState.Stopping, TaskState.Stopped };
public static readonly IReadOnlySet<TaskState> HasStartedStates =
new HashSet<TaskState> { TaskState.Running, TaskState.Stopping, TaskState.Stopped };
public static bool Available(this TaskState state) => AvailableStates.Contains(state);
public static bool NeedsWork(this TaskState state) => NeedsWorkStates.Contains(state);
public static bool ShuttingDown(this TaskState state) => ShuttingDownStates.Contains(state);
public static bool HasStarted(this TaskState state) => HasStartedStates.Contains(state);
}
public enum PoolState {
Init,
Running,
Shutdown,
Halt
}
public static class PoolStateHelper {
private static readonly IReadOnlySet<PoolState> _needsWork = new HashSet<PoolState> { PoolState.Init, PoolState.Shutdown, PoolState.Halt };
private static readonly IReadOnlySet<PoolState> _available = new HashSet<PoolState> { PoolState.Running };
public static IReadOnlySet<PoolState> NeedsWork => _needsWork;
public static IReadOnlySet<PoolState> Available => _available;
}
[SkipRename]
public enum Architecture {
x86_64
}
public enum TaskFeature {
InputQueueFromContainer,
SupervisorExe,
SupervisorEnv,
SupervisorOptions,
SupervisorInputMarker,
StatsFile,
StatsFormat,
TargetExe,
TargetExeOptional,
TargetEnv,
TargetOptions,
AnalyzerExe,
AnalyzerEnv,
AnalyzerOptions,
RenameOutput,
TargetOptionsMerge,
TargetWorkers,
GeneratorExe,
GeneratorEnv,
GeneratorOptions,
WaitForFiles,
TargetTimeout,
CheckAsanLog,
CheckDebugger,
CheckRetryCount,
EnsembleSyncDelay,
PreserveExistingOutputs,
CheckFuzzerHelp,
ExpectCrashOnFailure,
ReportList,
MinimizedStackDepth,
CoverageFilter,
TargetMustUseInput
}
[Flags]
public enum ContainerPermission {
Read = 1 << 0,
Write = 1 << 1,
List = 1 << 2,
Delete = 1 << 3,
}
public enum Compare {
Equal,
AtLeast,
AtMost
}
public enum AgentMode {
Fuzz,
Repro,
Proxy
}
public enum NodeState {
Init,
Free,
SettingUp,
Rebooting,
Ready,
Busy,
Done,
Shutdown,
Halt,
}
public static class NodeStateHelper {
private static readonly IReadOnlySet<NodeState> _needsWork =
new HashSet<NodeState>(new[] { NodeState.Done, NodeState.Shutdown, NodeState.Halt });
private static readonly IReadOnlySet<NodeState> _readyForReset
= new HashSet<NodeState>(new[] { NodeState.Done, NodeState.Shutdown, NodeState.Halt });
private static readonly IReadOnlySet<NodeState> _canProcessNewWork =
new HashSet<NodeState>(new[] { NodeState.Free });
private static readonly IReadOnlySet<NodeState> _busy =
new HashSet<NodeState>(new[] { NodeState.Busy });
public static IReadOnlySet<NodeState> BusyStates => _busy;
public static IReadOnlySet<NodeState> NeedsWorkStates => _needsWork;
public static bool NeedsWork(this NodeState state) => _needsWork.Contains(state);
///If Node is in one of these states, ignore updates from the agent.
public static bool ReadyForReset(this NodeState state) => _readyForReset.Contains(state);
public static bool CanProcessNewWork(this NodeState state) => _canProcessNewWork.Contains(state);
}
public enum NodeDisposalStrategy {
ScaleIn,
Decomission
}
public enum GithubIssueState {
Open,
Closed
}
public enum GithubIssueSearchMatch {
Title,
Body
}

View File

@ -1,135 +1,135 @@
using System.Threading.Tasks;
using ApiService.OneFuzzLib.Orm;
using Azure.Storage.Sas;
namespace Microsoft.OneFuzz.Service;
public interface IProxyOperations : IStatefulOrm<Proxy, VmState> {
Task<Proxy?> GetByProxyId(Guid proxyId);
Async.Task SetState(Proxy proxy, VmState state);
bool IsAlive(Proxy proxy);
Async.Task SaveProxyConfig(Proxy proxy);
bool IsOutdated(Proxy proxy);
Async.Task<Proxy?> GetOrCreate(string region);
}
public class ProxyOperations : StatefulOrm<Proxy, VmState, ProxyOperations>, IProxyOperations {
static TimeSpan PROXY_LIFESPAN = TimeSpan.FromDays(7);
public ProxyOperations(ILogTracer log, IOnefuzzContext context)
: base(log.WithTag("Component", "scaleset-proxy"), context) {
}
public async Task<Proxy?> GetByProxyId(Guid proxyId) {
var data = QueryAsync(filter: $"RowKey eq '{proxyId}'");
return await data.FirstOrDefaultAsync();
}
public async Async.Task<Proxy?> GetOrCreate(string region) {
var proxyList = QueryAsync(filter: $"region eq '{region}' and outdated eq false");
await foreach (var proxy in proxyList) {
if (IsOutdated(proxy)) {
await Replace(proxy with { Outdated = true });
continue;
}
if (!VmStateHelper.Available.Contains(proxy.State)) {
continue;
}
return proxy;
}
_logTracer.Info($"creating proxy: region:{region}");
var newProxy = new Proxy(region, Guid.NewGuid(), DateTimeOffset.UtcNow, VmState.Init, Auth.BuildAuth(), null, null, _context.ServiceConfiguration.OneFuzzVersion, null, false);
await Replace(newProxy);
await _context.Events.SendEvent(new EventProxyCreated(region, newProxy.ProxyId));
return newProxy;
}
public bool IsAlive(Proxy proxy) {
var tenMinutesAgo = DateTimeOffset.UtcNow - TimeSpan.FromMinutes(10);
if (proxy.Heartbeat != null && proxy.Heartbeat.TimeStamp < tenMinutesAgo) {
_logTracer.Info($"last heartbeat is more than an 10 minutes old: {proxy.Region} - last heartbeat:{proxy.Heartbeat} compared_to:{tenMinutesAgo}");
return false;
}
if (proxy.Heartbeat != null && proxy.TimeStamp != null && proxy.TimeStamp < tenMinutesAgo) {
_logTracer.Error($"no heartbeat in the last 10 minutes: {proxy.Region} timestamp: {proxy.TimeStamp} compared_to:{tenMinutesAgo}");
return false;
}
return true;
}
public bool IsOutdated(Proxy proxy) {
if (!VmStateHelper.Available.Contains(proxy.State)) {
return false;
}
if (proxy.Version != _context.ServiceConfiguration.OneFuzzVersion) {
_logTracer.Info($"mismatch version: proxy:{proxy.Version} service:{_context.ServiceConfiguration.OneFuzzVersion} state:{proxy.State}");
return true;
}
if (proxy.CreatedTimestamp != null) {
if (proxy.CreatedTimestamp < (DateTimeOffset.UtcNow - PROXY_LIFESPAN)) {
_logTracer.Info($"proxy older than 7 days:proxy-created:{proxy.CreatedTimestamp} state:{proxy.State}");
return true;
}
}
return false;
}
public async Async.Task SaveProxyConfig(Proxy proxy) {
var forwards = await GetForwards(proxy);
var url = (await _context.Containers.GetFileSasUrl(new Container("proxy-configs"), $"{proxy.Region}/{proxy.ProxyId}/config.json", StorageType.Config, BlobSasPermissions.Read)).EnsureNotNull("Can't generate file sas");
var queueSas = await _context.Queue.GetQueueSas("proxy", StorageType.Config, QueueSasPermissions.Add).EnsureNotNull("can't generate queue sas") ?? throw new Exception("Queue sas is null");
var proxyConfig = new ProxyConfig(
Url: url,
Notification: queueSas,
Region: proxy.Region,
ProxyId: proxy.ProxyId,
Forwards: forwards,
InstanceTelemetryKey: _context.ServiceConfiguration.ApplicationInsightsInstrumentationKey.EnsureNotNull("missing InstrumentationKey"),
MicrosoftTelemetryKey: _context.ServiceConfiguration.OneFuzzTelemetry.EnsureNotNull("missing Telemetry"),
InstanceId: await _context.Containers.GetInstanceId());
await _context.Containers.SaveBlob(new Container("proxy-configs"), $"{proxy.Region}/{proxy.ProxyId}/config.json", _entityConverter.ToJsonString(proxyConfig), StorageType.Config);
}
public async Async.Task SetState(Proxy proxy, VmState state) {
if (proxy.State == state) {
return;
}
await Replace(proxy with { State = state });
await _context.Events.SendEvent(new EventProxyStateUpdated(proxy.Region, proxy.ProxyId, proxy.State));
}
public async Async.Task<List<Forward>> GetForwards(Proxy proxy) {
var forwards = new List<Forward>();
await foreach (var entry in _context.ProxyForwardOperations.SearchForward(region: proxy.Region, proxyId: proxy.ProxyId)) {
if (entry.EndTime < DateTimeOffset.UtcNow) {
await _context.ProxyForwardOperations.Delete(entry);
} else {
forwards.Add(new Forward(entry.Port, entry.DstPort, entry.DstIp));
}
}
return forwards;
}
}
using System.Threading.Tasks;
using ApiService.OneFuzzLib.Orm;
using Azure.Storage.Sas;
namespace Microsoft.OneFuzz.Service;
public interface IProxyOperations : IStatefulOrm<Proxy, VmState> {
Task<Proxy?> GetByProxyId(Guid proxyId);
Async.Task SetState(Proxy proxy, VmState state);
bool IsAlive(Proxy proxy);
Async.Task SaveProxyConfig(Proxy proxy);
bool IsOutdated(Proxy proxy);
Async.Task<Proxy?> GetOrCreate(string region);
}
public class ProxyOperations : StatefulOrm<Proxy, VmState, ProxyOperations>, IProxyOperations {
static TimeSpan PROXY_LIFESPAN = TimeSpan.FromDays(7);
public ProxyOperations(ILogTracer log, IOnefuzzContext context)
: base(log.WithTag("Component", "scaleset-proxy"), context) {
}
public async Task<Proxy?> GetByProxyId(Guid proxyId) {
var data = QueryAsync(filter: $"RowKey eq '{proxyId}'");
return await data.FirstOrDefaultAsync();
}
public async Async.Task<Proxy?> GetOrCreate(string region) {
var proxyList = QueryAsync(filter: $"region eq '{region}' and outdated eq false");
await foreach (var proxy in proxyList) {
if (IsOutdated(proxy)) {
await Replace(proxy with { Outdated = true });
continue;
}
if (!VmStateHelper.Available.Contains(proxy.State)) {
continue;
}
return proxy;
}
_logTracer.Info($"creating proxy: region:{region}");
var newProxy = new Proxy(region, Guid.NewGuid(), DateTimeOffset.UtcNow, VmState.Init, Auth.BuildAuth(), null, null, _context.ServiceConfiguration.OneFuzzVersion, null, false);
await Replace(newProxy);
await _context.Events.SendEvent(new EventProxyCreated(region, newProxy.ProxyId));
return newProxy;
}
public bool IsAlive(Proxy proxy) {
var tenMinutesAgo = DateTimeOffset.UtcNow - TimeSpan.FromMinutes(10);
if (proxy.Heartbeat != null && proxy.Heartbeat.TimeStamp < tenMinutesAgo) {
_logTracer.Info($"last heartbeat is more than an 10 minutes old: {proxy.Region} - last heartbeat:{proxy.Heartbeat} compared_to:{tenMinutesAgo}");
return false;
}
if (proxy.Heartbeat != null && proxy.TimeStamp != null && proxy.TimeStamp < tenMinutesAgo) {
_logTracer.Error($"no heartbeat in the last 10 minutes: {proxy.Region} timestamp: {proxy.TimeStamp} compared_to:{tenMinutesAgo}");
return false;
}
return true;
}
public bool IsOutdated(Proxy proxy) {
if (!VmStateHelper.Available.Contains(proxy.State)) {
return false;
}
if (proxy.Version != _context.ServiceConfiguration.OneFuzzVersion) {
_logTracer.Info($"mismatch version: proxy:{proxy.Version} service:{_context.ServiceConfiguration.OneFuzzVersion} state:{proxy.State}");
return true;
}
if (proxy.CreatedTimestamp != null) {
if (proxy.CreatedTimestamp < (DateTimeOffset.UtcNow - PROXY_LIFESPAN)) {
_logTracer.Info($"proxy older than 7 days:proxy-created:{proxy.CreatedTimestamp} state:{proxy.State}");
return true;
}
}
return false;
}
public async Async.Task SaveProxyConfig(Proxy proxy) {
var forwards = await GetForwards(proxy);
var url = (await _context.Containers.GetFileSasUrl(new Container("proxy-configs"), $"{proxy.Region}/{proxy.ProxyId}/config.json", StorageType.Config, BlobSasPermissions.Read)).EnsureNotNull("Can't generate file sas");
var queueSas = await _context.Queue.GetQueueSas("proxy", StorageType.Config, QueueSasPermissions.Add).EnsureNotNull("can't generate queue sas") ?? throw new Exception("Queue sas is null");
var proxyConfig = new ProxyConfig(
Url: url,
Notification: queueSas,
Region: proxy.Region,
ProxyId: proxy.ProxyId,
Forwards: forwards,
InstanceTelemetryKey: _context.ServiceConfiguration.ApplicationInsightsInstrumentationKey.EnsureNotNull("missing InstrumentationKey"),
MicrosoftTelemetryKey: _context.ServiceConfiguration.OneFuzzTelemetry.EnsureNotNull("missing Telemetry"),
InstanceId: await _context.Containers.GetInstanceId());
await _context.Containers.SaveBlob(new Container("proxy-configs"), $"{proxy.Region}/{proxy.ProxyId}/config.json", _entityConverter.ToJsonString(proxyConfig), StorageType.Config);
}
public async Async.Task SetState(Proxy proxy, VmState state) {
if (proxy.State == state) {
return;
}
await Replace(proxy with { State = state });
await _context.Events.SendEvent(new EventProxyStateUpdated(proxy.Region, proxy.ProxyId, proxy.State));
}
public async Async.Task<List<Forward>> GetForwards(Proxy proxy) {
var forwards = new List<Forward>();
await foreach (var entry in _context.ProxyForwardOperations.SearchForward(region: proxy.Region, proxyId: proxy.ProxyId)) {
if (entry.EndTime < DateTimeOffset.UtcNow) {
await _context.ProxyForwardOperations.Delete(entry);
} else {
forwards.Add(new Forward(entry.Port, entry.DstPort, entry.DstIp));
}
}
return forwards;
}
}

View File

@ -1,253 +1,253 @@
using System.Net;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text.Json;
using System.Threading.Tasks;
using ApiService.OneFuzzLib.Orm;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service;
public interface IWebhookOperations : IOrm<Webhook> {
Async.Task SendEvent(EventMessage eventMessage);
Async.Task<Webhook?> GetByWebhookId(Guid webhookId);
Async.Task<bool> Send(WebhookMessageLog messageLog);
Task<EventPing> Ping(Webhook webhook);
}
public class WebhookOperations : Orm<Webhook>, IWebhookOperations {
private readonly IHttpClientFactory _httpFactory;
public WebhookOperations(IHttpClientFactory httpFactory, ILogTracer log, IOnefuzzContext context)
: base(log, context) {
_httpFactory = httpFactory;
}
async public Async.Task SendEvent(EventMessage eventMessage) {
await foreach (var webhook in GetWebhooksCached()) {
if (!webhook.EventTypes.Contains(eventMessage.EventType)) {
continue;
}
await AddEvent(webhook, eventMessage);
}
}
async private Async.Task AddEvent(Webhook webhook, EventMessage eventMessage) {
var message = new WebhookMessageLog(
EventId: eventMessage.EventId,
EventType: eventMessage.EventType,
Event: eventMessage.Event,
InstanceId: eventMessage.InstanceId,
InstanceName: eventMessage.InstanceName,
WebhookId: webhook.WebhookId,
TryCount: 0
);
var r = await _context.WebhookMessageLogOperations.Replace(message);
if (!r.IsOk) {
var (status, reason) = r.ErrorV;
_logTracer.Error($"Failed to replace webhook message log due to [{status}] {reason}");
}
await _context.WebhookMessageLogOperations.QueueWebhook(message);
}
public async Async.Task<bool> Send(WebhookMessageLog messageLog) {
var webhook = await GetByWebhookId(messageLog.WebhookId);
if (webhook == null || webhook.Url == null) {
throw new Exception($"Invalid Webhook. Webhook with WebhookId: {messageLog.WebhookId} Not Found");
}
var (data, digest) = await BuildMessage(webhookId: webhook.WebhookId, eventId: messageLog.EventId, eventType: messageLog.EventType, webhookEvent: messageLog.Event, secretToken: webhook.SecretToken, messageFormat: webhook.MessageFormat);
var headers = new Dictionary<string, string> { { "User-Agent", $"onefuzz-webhook {_context.ServiceConfiguration.OneFuzzVersion}" } };
if (digest != null) {
headers["X-Onefuzz-Digest"] = digest;
}
var client = new Request(_httpFactory.CreateClient());
_logTracer.Info(data);
var response = client.Post(url: webhook.Url, json: data, headers: headers);
var result = response.Result;
if (result.StatusCode == HttpStatusCode.Accepted) {
return true;
}
return false;
}
public async Task<EventPing> Ping(Webhook webhook) {
var ping = new EventPing(Guid.NewGuid());
var instanceId = await _context.Containers.GetInstanceId();
var instanceName = _context.Creds.GetInstanceName();
await AddEvent(webhook, new EventMessage(Guid.NewGuid(), EventType.Ping, ping, instanceId, instanceName));
return ping;
}
// Not converting to bytes, as it's not neccessary in C#. Just keeping as string.
public async Async.Task<Tuple<string, string?>> BuildMessage(Guid webhookId, Guid eventId, EventType eventType, BaseEvent webhookEvent, String? secretToken, WebhookMessageFormat? messageFormat) {
var entityConverter = new EntityConverter();
string data = "";
if (messageFormat != null && messageFormat == WebhookMessageFormat.EventGrid) {
var eventGridMessage = new[] { new WebhookMessageEventGrid(Id: eventId, Data: webhookEvent, DataVersion: "1.0.0", Subject: _context.Creds.GetInstanceName(), EventType: eventType, EventTime: DateTimeOffset.UtcNow) };
data = JsonSerializer.Serialize(eventGridMessage, options: EntityConverter.GetJsonSerializerOptions());
} else {
var instanceId = await _context.Containers.GetInstanceId();
var webhookMessage = new WebhookMessage(WebhookId: webhookId, EventId: eventId, EventType: eventType, Event: webhookEvent, InstanceId: instanceId, InstanceName: _context.Creds.GetInstanceName());
data = JsonSerializer.Serialize(webhookMessage, options: EntityConverter.GetJsonSerializerOptions());
}
string? digest = null;
var hmac = HMAC.Create("HMACSHA512");
if (secretToken != null && hmac != null) {
hmac.Key = System.Text.Encoding.UTF8.GetBytes(secretToken);
digest = Convert.ToHexString(hmac.ComputeHash(System.Text.Encoding.UTF8.GetBytes(data)));
}
return new Tuple<string, string?>(data, digest);
}
public async Async.Task<Webhook?> GetByWebhookId(Guid webhookId) {
var data = QueryAsync(filter: $"PartitionKey eq '{webhookId}'");
return await data.FirstOrDefaultAsync();
}
//todo: caching
public IAsyncEnumerable<Webhook> GetWebhooksCached() {
return QueryAsync();
}
}
public interface IWebhookMessageLogOperations : IOrm<WebhookMessageLog> {
IAsyncEnumerable<WebhookMessageLog> SearchExpired();
public Async.Task ProcessFromQueue(WebhookMessageQueueObj obj);
public Async.Task QueueWebhook(WebhookMessageLog webhookLog);
}
public class WebhookMessageLogOperations : Orm<WebhookMessageLog>, IWebhookMessageLogOperations {
const int EXPIRE_DAYS = 7;
const int MAX_TRIES = 5;
public WebhookMessageLogOperations(IHttpClientFactory httpFactory, ILogTracer log, IOnefuzzContext context)
: base(log, context) {
}
public async Async.Task QueueWebhook(WebhookMessageLog webhookLog) {
var obj = new WebhookMessageQueueObj(webhookLog.WebhookId, webhookLog.EventId);
TimeSpan? visibilityTimeout = webhookLog.State switch {
WebhookMessageState.Queued => TimeSpan.Zero,
WebhookMessageState.Retrying => TimeSpan.FromSeconds(30),
_ => null
};
if (visibilityTimeout == null) {
_logTracer.WithTags(
new[] {
("WebhookId", webhookLog.WebhookId.ToString()),
("EventId", webhookLog.EventId.ToString()) }
).
Error($"invalid WebhookMessage queue state, not queuing. {webhookLog.WebhookId}:{webhookLog.EventId} - {webhookLog.State}");
} else {
await _context.Queue.QueueObject("webhooks", obj, StorageType.Config, visibilityTimeout: visibilityTimeout);
}
}
public async Async.Task ProcessFromQueue(WebhookMessageQueueObj obj) {
var message = await GetWebhookMessageById(obj.WebhookId, obj.EventId);
if (message == null) {
_logTracer.WithTags(
new[] {
("WebhookId", obj.WebhookId.ToString()),
("EventId", obj.EventId.ToString()) }
).
Error($"webhook message log not found for webhookId: {obj.WebhookId} and eventId: {obj.EventId}");
} else {
await Process(message);
}
}
private async Async.Task Process(WebhookMessageLog message) {
if (message.State == WebhookMessageState.Failed || message.State == WebhookMessageState.Succeeded) {
_logTracer.WithTags(
new[] {
("WebhookId", message.WebhookId.ToString()),
("EventId", message.EventId.ToString()) }
).
Error($"webhook message already handled. {message.WebhookId}:{message.EventId}");
return;
}
var newMessage = message with { TryCount = message.TryCount + 1 };
_logTracer.Info($"sending webhook: {message.WebhookId}:{message.EventId}");
var success = await Send(newMessage);
if (success) {
newMessage = newMessage with { State = WebhookMessageState.Succeeded };
await Replace(newMessage);
_logTracer.Info($"sent webhook event {newMessage.WebhookId}:{newMessage.EventId}");
} else if (newMessage.TryCount < MAX_TRIES) {
newMessage = newMessage with { State = WebhookMessageState.Retrying };
await Replace(newMessage);
await QueueWebhook(newMessage);
_logTracer.Warning($"sending webhook event failed, re-queued {newMessage.WebhookId}:{newMessage.EventId}");
} else {
newMessage = newMessage with { State = WebhookMessageState.Failed };
await Replace(newMessage);
_logTracer.Info($"sending webhook: {newMessage.WebhookId} event: {newMessage.EventId} failed {newMessage.TryCount} times.");
}
}
private async Async.Task<bool> Send(WebhookMessageLog message) {
var webhook = await _context.WebhookOperations.GetByWebhookId(message.WebhookId);
if (webhook == null) {
_logTracer.WithTags(
new[] {
("WebhookId", message.WebhookId.ToString()),
}
).
Error($"webhook not found for webhookId: {message.WebhookId}");
return false;
}
try {
return await _context.WebhookOperations.Send(message);
} catch (Exception exc) {
_logTracer.WithTags(
new[] {
("WebhookId", message.WebhookId.ToString())
}
).
Exception(exc);
return false;
}
}
private void QueueObject(string v, WebhookMessageQueueObj obj, StorageType config, int? visibility_timeout) {
throw new NotImplementedException();
}
public IAsyncEnumerable<WebhookMessageLog> SearchExpired() {
var expireTime = (DateTimeOffset.UtcNow - TimeSpan.FromDays(EXPIRE_DAYS)).ToString("o");
var timeFilter = $"Timestamp lt datetime'{expireTime}'";
return QueryAsync(filter: timeFilter);
}
public async Async.Task<WebhookMessageLog?> GetWebhookMessageById(Guid webhookId, Guid eventId) {
var data = QueryAsync(filter: $"PartitionKey eq '{webhookId}' and RowKey eq '{eventId}'");
return await data.FirstOrDefaultAsync();
}
}
using System.Net;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text.Json;
using System.Threading.Tasks;
using ApiService.OneFuzzLib.Orm;
using Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
namespace Microsoft.OneFuzz.Service;
public interface IWebhookOperations : IOrm<Webhook> {
Async.Task SendEvent(EventMessage eventMessage);
Async.Task<Webhook?> GetByWebhookId(Guid webhookId);
Async.Task<bool> Send(WebhookMessageLog messageLog);
Task<EventPing> Ping(Webhook webhook);
}
public class WebhookOperations : Orm<Webhook>, IWebhookOperations {
private readonly IHttpClientFactory _httpFactory;
public WebhookOperations(IHttpClientFactory httpFactory, ILogTracer log, IOnefuzzContext context)
: base(log, context) {
_httpFactory = httpFactory;
}
async public Async.Task SendEvent(EventMessage eventMessage) {
await foreach (var webhook in GetWebhooksCached()) {
if (!webhook.EventTypes.Contains(eventMessage.EventType)) {
continue;
}
await AddEvent(webhook, eventMessage);
}
}
async private Async.Task AddEvent(Webhook webhook, EventMessage eventMessage) {
var message = new WebhookMessageLog(
EventId: eventMessage.EventId,
EventType: eventMessage.EventType,
Event: eventMessage.Event,
InstanceId: eventMessage.InstanceId,
InstanceName: eventMessage.InstanceName,
WebhookId: webhook.WebhookId,
TryCount: 0
);
var r = await _context.WebhookMessageLogOperations.Replace(message);
if (!r.IsOk) {
var (status, reason) = r.ErrorV;
_logTracer.Error($"Failed to replace webhook message log due to [{status}] {reason}");
}
await _context.WebhookMessageLogOperations.QueueWebhook(message);
}
public async Async.Task<bool> Send(WebhookMessageLog messageLog) {
var webhook = await GetByWebhookId(messageLog.WebhookId);
if (webhook == null || webhook.Url == null) {
throw new Exception($"Invalid Webhook. Webhook with WebhookId: {messageLog.WebhookId} Not Found");
}
var (data, digest) = await BuildMessage(webhookId: webhook.WebhookId, eventId: messageLog.EventId, eventType: messageLog.EventType, webhookEvent: messageLog.Event, secretToken: webhook.SecretToken, messageFormat: webhook.MessageFormat);
var headers = new Dictionary<string, string> { { "User-Agent", $"onefuzz-webhook {_context.ServiceConfiguration.OneFuzzVersion}" } };
if (digest != null) {
headers["X-Onefuzz-Digest"] = digest;
}
var client = new Request(_httpFactory.CreateClient());
_logTracer.Info(data);
var response = client.Post(url: webhook.Url, json: data, headers: headers);
var result = response.Result;
if (result.StatusCode == HttpStatusCode.Accepted) {
return true;
}
return false;
}
public async Task<EventPing> Ping(Webhook webhook) {
var ping = new EventPing(Guid.NewGuid());
var instanceId = await _context.Containers.GetInstanceId();
var instanceName = _context.Creds.GetInstanceName();
await AddEvent(webhook, new EventMessage(Guid.NewGuid(), EventType.Ping, ping, instanceId, instanceName));
return ping;
}
// Not converting to bytes, as it's not neccessary in C#. Just keeping as string.
public async Async.Task<Tuple<string, string?>> BuildMessage(Guid webhookId, Guid eventId, EventType eventType, BaseEvent webhookEvent, String? secretToken, WebhookMessageFormat? messageFormat) {
var entityConverter = new EntityConverter();
string data = "";
if (messageFormat != null && messageFormat == WebhookMessageFormat.EventGrid) {
var eventGridMessage = new[] { new WebhookMessageEventGrid(Id: eventId, Data: webhookEvent, DataVersion: "1.0.0", Subject: _context.Creds.GetInstanceName(), EventType: eventType, EventTime: DateTimeOffset.UtcNow) };
data = JsonSerializer.Serialize(eventGridMessage, options: EntityConverter.GetJsonSerializerOptions());
} else {
var instanceId = await _context.Containers.GetInstanceId();
var webhookMessage = new WebhookMessage(WebhookId: webhookId, EventId: eventId, EventType: eventType, Event: webhookEvent, InstanceId: instanceId, InstanceName: _context.Creds.GetInstanceName());
data = JsonSerializer.Serialize(webhookMessage, options: EntityConverter.GetJsonSerializerOptions());
}
string? digest = null;
var hmac = HMAC.Create("HMACSHA512");
if (secretToken != null && hmac != null) {
hmac.Key = System.Text.Encoding.UTF8.GetBytes(secretToken);
digest = Convert.ToHexString(hmac.ComputeHash(System.Text.Encoding.UTF8.GetBytes(data)));
}
return new Tuple<string, string?>(data, digest);
}
public async Async.Task<Webhook?> GetByWebhookId(Guid webhookId) {
var data = QueryAsync(filter: $"PartitionKey eq '{webhookId}'");
return await data.FirstOrDefaultAsync();
}
//todo: caching
public IAsyncEnumerable<Webhook> GetWebhooksCached() {
return QueryAsync();
}
}
public interface IWebhookMessageLogOperations : IOrm<WebhookMessageLog> {
IAsyncEnumerable<WebhookMessageLog> SearchExpired();
public Async.Task ProcessFromQueue(WebhookMessageQueueObj obj);
public Async.Task QueueWebhook(WebhookMessageLog webhookLog);
}
public class WebhookMessageLogOperations : Orm<WebhookMessageLog>, IWebhookMessageLogOperations {
const int EXPIRE_DAYS = 7;
const int MAX_TRIES = 5;
public WebhookMessageLogOperations(IHttpClientFactory httpFactory, ILogTracer log, IOnefuzzContext context)
: base(log, context) {
}
public async Async.Task QueueWebhook(WebhookMessageLog webhookLog) {
var obj = new WebhookMessageQueueObj(webhookLog.WebhookId, webhookLog.EventId);
TimeSpan? visibilityTimeout = webhookLog.State switch {
WebhookMessageState.Queued => TimeSpan.Zero,
WebhookMessageState.Retrying => TimeSpan.FromSeconds(30),
_ => null
};
if (visibilityTimeout == null) {
_logTracer.WithTags(
new[] {
("WebhookId", webhookLog.WebhookId.ToString()),
("EventId", webhookLog.EventId.ToString()) }
).
Error($"invalid WebhookMessage queue state, not queuing. {webhookLog.WebhookId}:{webhookLog.EventId} - {webhookLog.State}");
} else {
await _context.Queue.QueueObject("webhooks", obj, StorageType.Config, visibilityTimeout: visibilityTimeout);
}
}
public async Async.Task ProcessFromQueue(WebhookMessageQueueObj obj) {
var message = await GetWebhookMessageById(obj.WebhookId, obj.EventId);
if (message == null) {
_logTracer.WithTags(
new[] {
("WebhookId", obj.WebhookId.ToString()),
("EventId", obj.EventId.ToString()) }
).
Error($"webhook message log not found for webhookId: {obj.WebhookId} and eventId: {obj.EventId}");
} else {
await Process(message);
}
}
private async Async.Task Process(WebhookMessageLog message) {
if (message.State == WebhookMessageState.Failed || message.State == WebhookMessageState.Succeeded) {
_logTracer.WithTags(
new[] {
("WebhookId", message.WebhookId.ToString()),
("EventId", message.EventId.ToString()) }
).
Error($"webhook message already handled. {message.WebhookId}:{message.EventId}");
return;
}
var newMessage = message with { TryCount = message.TryCount + 1 };
_logTracer.Info($"sending webhook: {message.WebhookId}:{message.EventId}");
var success = await Send(newMessage);
if (success) {
newMessage = newMessage with { State = WebhookMessageState.Succeeded };
await Replace(newMessage);
_logTracer.Info($"sent webhook event {newMessage.WebhookId}:{newMessage.EventId}");
} else if (newMessage.TryCount < MAX_TRIES) {
newMessage = newMessage with { State = WebhookMessageState.Retrying };
await Replace(newMessage);
await QueueWebhook(newMessage);
_logTracer.Warning($"sending webhook event failed, re-queued {newMessage.WebhookId}:{newMessage.EventId}");
} else {
newMessage = newMessage with { State = WebhookMessageState.Failed };
await Replace(newMessage);
_logTracer.Info($"sending webhook: {newMessage.WebhookId} event: {newMessage.EventId} failed {newMessage.TryCount} times.");
}
}
private async Async.Task<bool> Send(WebhookMessageLog message) {
var webhook = await _context.WebhookOperations.GetByWebhookId(message.WebhookId);
if (webhook == null) {
_logTracer.WithTags(
new[] {
("WebhookId", message.WebhookId.ToString()),
}
).
Error($"webhook not found for webhookId: {message.WebhookId}");
return false;
}
try {
return await _context.WebhookOperations.Send(message);
} catch (Exception exc) {
_logTracer.WithTags(
new[] {
("WebhookId", message.WebhookId.ToString())
}
).
Exception(exc);
return false;
}
}
private void QueueObject(string v, WebhookMessageQueueObj obj, StorageType config, int? visibility_timeout) {
throw new NotImplementedException();
}
public IAsyncEnumerable<WebhookMessageLog> SearchExpired() {
var expireTime = (DateTimeOffset.UtcNow - TimeSpan.FromDays(EXPIRE_DAYS)).ToString("o");
var timeFilter = $"Timestamp lt datetime'{expireTime}'";
return QueryAsync(filter: timeFilter);
}
public async Async.Task<WebhookMessageLog?> GetWebhookMessageById(Guid webhookId, Guid eventId) {
var data = QueryAsync(filter: $"PartitionKey eq '{webhookId}' and RowKey eq '{eventId}'");
return await data.FirstOrDefaultAsync();
}
}

View File

@ -1,325 +1,325 @@
using System.Collections.Concurrent;
using System.Linq.Expressions;
using System.Reflection;
using System.Text.Json;
using System.Text.Json.Serialization;
using Azure;
using Azure.Data.Tables;
namespace Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
public abstract record EntityBase {
[JsonIgnore] public ETag? ETag { get; set; }
public DateTimeOffset? TimeStamp { get; set; }
// https://docs.microsoft.com/en-us/rest/api/storageservices/designing-a-scalable-partitioning-strategy-for-azure-table-storage#yyy
// Produce "good-quality-table-key" based on a DateTimeOffset timestamp
public static string NewSortedKey => $"{DateTimeOffset.MaxValue.Ticks - DateTimeOffset.UtcNow.Ticks}";
}
public abstract record StatefulEntityBase<T>([property: JsonIgnore] T BaseState) : EntityBase() where T : Enum;
/// How the value is populated
public enum InitMethod {
//T() will be used
DefaultConstructor,
}
[AttributeUsage(AttributeTargets.Parameter)]
public class DefaultValueAttribute : Attribute {
public InitMethod InitMethod { get; }
public DefaultValueAttribute(InitMethod initMethod) {
InitMethod = initMethod;
}
}
/// Indicates that the enum cases should no be renamed
[AttributeUsage(AttributeTargets.Enum)]
public class SerializeValueAttribute : Attribute { }
/// Indicates that the enum cases should no be renamed
[AttributeUsage(AttributeTargets.Enum)]
public class SkipRenameAttribute : Attribute { }
[AttributeUsage(AttributeTargets.Parameter)]
public class RowKeyAttribute : Attribute { }
[AttributeUsage(AttributeTargets.Parameter)]
public class PartitionKeyAttribute : Attribute { }
[AttributeUsage(AttributeTargets.Property)]
public class TypeDiscrimnatorAttribute : Attribute {
public string FieldName { get; }
// the type of a function that takes the value of fieldName as an input and return the type
public Type ConverterType { get; }
public TypeDiscrimnatorAttribute(string fieldName, Type converterType) {
if (!converterType.IsAssignableTo(typeof(ITypeProvider))) {
throw new ArgumentException($"the provided type needs to implement ITypeProvider");
}
FieldName = fieldName;
ConverterType = converterType;
}
}
public interface ITypeProvider {
Type GetTypeInfo(object input);
}
public enum EntityPropertyKind {
PartitionKey,
RowKey,
Column
}
public record EntityProperty(
string name,
string columnName,
Type type,
EntityPropertyKind kind,
(TypeDiscrimnatorAttribute, ITypeProvider)? discriminator,
DefaultValueAttribute? defaultValue,
ParameterInfo parameterInfo
);
public record EntityInfo(Type type, ILookup<string, EntityProperty> properties, Func<object?[], object> constructor);
class OnefuzzNamingPolicy : JsonNamingPolicy {
public override string ConvertName(string name) {
return CaseConverter.PascalToSnake(name);
}
}
public class EntityConverter {
private readonly JsonSerializerOptions _options;
private readonly ConcurrentDictionary<Type, EntityInfo> _cache;
public EntityConverter() {
_options = GetJsonSerializerOptions();
_cache = new ConcurrentDictionary<Type, EntityInfo>();
}
public static JsonSerializerOptions GetJsonSerializerOptions() {
var options = new JsonSerializerOptions() {
PropertyNamingPolicy = new OnefuzzNamingPolicy(),
};
options.Converters.Add(new CustomEnumConverterFactory());
options.Converters.Add(new PolymorphicConverterFactory());
return options;
}
internal static Func<object?[], object> BuildConstructerFrom(ConstructorInfo constructorInfo) {
var constructorParameters = Expression.Parameter(typeof(object?[]));
var parameterExpressions =
constructorInfo.GetParameters().Select((parameterInfo, i) => {
var ithIndex = Expression.Constant(i);
var ithParameter = Expression.ArrayIndex(constructorParameters, ithIndex);
var unboxedIthParameter = Expression.Convert(ithParameter, parameterInfo.ParameterType);
return unboxedIthParameter;
}).ToArray();
NewExpression constructorCall = Expression.New(constructorInfo, parameterExpressions);
Func<object?[], object> ctor = Expression.Lambda<Func<object?[], object>>(constructorCall, constructorParameters).Compile();
return ctor;
}
private static IEnumerable<EntityProperty> GetEntityProperties<T>(ParameterInfo parameterInfo) {
var name = parameterInfo.Name.EnsureNotNull($"Invalid paramter {parameterInfo}");
var parameterType = parameterInfo.ParameterType.EnsureNotNull($"Invalid paramter {parameterInfo}");
var isRowkey = parameterInfo.GetCustomAttribute(typeof(RowKeyAttribute)) != null;
var isPartitionkey = parameterInfo.GetCustomAttribute(typeof(PartitionKeyAttribute)) != null;
var discriminatorAttribute = typeof(T).GetProperty(name)?.GetCustomAttribute<TypeDiscrimnatorAttribute>();
var defaultValueAttribute = parameterInfo.GetCustomAttribute<DefaultValueAttribute>();
(TypeDiscrimnatorAttribute, ITypeProvider)? discriminator = null;
if (discriminatorAttribute != null) {
var t = (ITypeProvider)(Activator.CreateInstance(discriminatorAttribute.ConverterType) ?? throw new Exception("unable to retrive the type provider"));
discriminator = (discriminatorAttribute, t);
}
if (isPartitionkey) {
yield return new EntityProperty(name, "PartitionKey", parameterType, EntityPropertyKind.PartitionKey, discriminator, defaultValueAttribute, parameterInfo);
}
if (isRowkey) {
yield return new EntityProperty(name, "RowKey", parameterType, EntityPropertyKind.RowKey, discriminator, defaultValueAttribute, parameterInfo);
}
if (!isPartitionkey && !isRowkey) {
var columnName = typeof(T).GetProperty(name)?.GetCustomAttribute<JsonPropertyNameAttribute>()?.Name ?? CaseConverter.PascalToSnake(name);
yield return new EntityProperty(name, columnName, parameterType, EntityPropertyKind.Column, discriminator, defaultValueAttribute, parameterInfo);
}
}
private EntityInfo GetEntityInfo<T>() {
return _cache.GetOrAdd(typeof(T), type => {
var constructor = type.GetConstructors()[0];
var parameterInfos = constructor.GetParameters();
var parameters =
parameterInfos.SelectMany(GetEntityProperties<T>).ToArray();
return new EntityInfo(typeof(T), parameters.ToLookup(x => x.name), BuildConstructerFrom(constructor));
});
}
public string ToJsonString<T>(T typedEntity) => JsonSerializer.Serialize(typedEntity, _options);
public T? FromJsonString<T>(string value) => JsonSerializer.Deserialize<T>(value, _options);
public TableEntity ToTableEntity<T>(T typedEntity) where T : EntityBase {
if (typedEntity == null) {
throw new ArgumentNullException(nameof(typedEntity));
}
var type = typeof(T);
var entityInfo = GetEntityInfo<T>();
Dictionary<string, object?> columnValues = entityInfo.properties.SelectMany(x => x).Select(prop => {
var value = entityInfo.type.GetProperty(prop.name)?.GetValue(typedEntity);
if (value == null) {
return (prop.columnName, value: (object?)null);
}
if (prop.kind == EntityPropertyKind.PartitionKey || prop.kind == EntityPropertyKind.RowKey) {
return (prop.columnName, value?.ToString());
}
if (prop.type == typeof(Guid) || prop.type == typeof(Guid?) || prop.type == typeof(Uri)) {
return (prop.columnName, value?.ToString());
}
if (prop.type == typeof(bool)
|| prop.type == typeof(bool?)
|| prop.type == typeof(string)
|| prop.type == typeof(DateTime)
|| prop.type == typeof(DateTime?)
|| prop.type == typeof(DateTimeOffset)
|| prop.type == typeof(DateTimeOffset?)
|| prop.type == typeof(int)
|| prop.type == typeof(int?)
|| prop.type == typeof(long)
|| prop.type == typeof(long?)
|| prop.type == typeof(double)
|| prop.type == typeof(double?)
) {
return (prop.columnName, value);
}
var serialized = JsonSerializer.Serialize(value, _options);
return (prop.columnName, serialized.Trim('"'));
}).ToDictionary(x => x.columnName, x => x.value);
var tableEntity = new TableEntity(columnValues);
if (typedEntity.ETag.HasValue) {
tableEntity.ETag = typedEntity.ETag.Value;
}
return tableEntity;
}
private object? GetFieldValue(EntityInfo info, string name, TableEntity entity) {
var ef = info.properties[name].First();
if (ef.kind == EntityPropertyKind.PartitionKey || ef.kind == EntityPropertyKind.RowKey) {
if (ef.type == typeof(string))
return entity.GetString(ef.kind.ToString());
else if (ef.type == typeof(Guid))
return Guid.Parse(entity.GetString(ef.kind.ToString()));
else if (ef.type == typeof(int))
return int.Parse(entity.GetString(ef.kind.ToString()));
else if (ef.type == typeof(long))
return long.Parse(entity.GetString(ef.kind.ToString()));
else if (ef.type.IsClass)
return ef.type.GetConstructor(new[] { typeof(string) })!.Invoke(new[] { entity.GetString(ef.kind.ToString()) });
else {
throw new Exception($"invalid partition or row key type of {info.type} property {name}: {ef.type}");
}
}
var fieldName = ef.columnName;
var obj = entity[fieldName];
if (obj == null) {
if (ef.parameterInfo.HasDefaultValue) {
return ef.parameterInfo.DefaultValue;
}
return ef.defaultValue switch {
DefaultValueAttribute { InitMethod: InitMethod.DefaultConstructor } => Activator.CreateInstance(ef.type),
_ => null,
};
}
try {
if (ef.type == typeof(string)) {
return entity.GetString(fieldName);
} else if (ef.type == typeof(bool) || ef.type == typeof(bool?)) {
return entity.GetBoolean(fieldName);
} else if (ef.type == typeof(DateTimeOffset) || ef.type == typeof(DateTimeOffset?)) {
return entity.GetDateTimeOffset(fieldName);
} else if (ef.type == typeof(DateTime) || ef.type == typeof(DateTime?)) {
return entity.GetDateTime(fieldName);
} else if (ef.type == typeof(double) || ef.type == typeof(double?)) {
return entity.GetDouble(fieldName);
} else if (ef.type == typeof(Guid) || ef.type == typeof(Guid?)) {
return (object?)Guid.Parse(entity.GetString(fieldName));
} else if (ef.type == typeof(int) || ef.type == typeof(short) || ef.type == typeof(int?) || ef.type == typeof(short?)) {
return entity.GetInt32(fieldName);
} else if (ef.type == typeof(long) || ef.type == typeof(long?)) {
return entity.GetInt64(fieldName);
} else {
var outputType = ef.type;
if (ef.discriminator != null) {
var (attr, typeProvider) = ef.discriminator.Value;
var v = GetFieldValue(info, attr.FieldName, entity) ?? throw new Exception($"No value for {attr.FieldName}");
outputType = typeProvider.GetTypeInfo(v);
}
var objType = obj.GetType();
if (objType == typeof(string)) {
var value = entity.GetString(fieldName);
if (value.StartsWith('[') || value.StartsWith('{') || value == "null") {
return JsonSerializer.Deserialize(value, outputType, options: _options);
} else {
return JsonSerializer.Deserialize($"\"{value}\"", outputType, options: _options);
}
} else {
var value = entity.GetString(fieldName);
return JsonSerializer.Deserialize(value, outputType, options: _options);
}
}
} catch (Exception ex) {
throw new InvalidOperationException($"Unable to get value for property '{name}' (entity field '{fieldName}')", ex);
}
}
public T ToRecord<T>(TableEntity entity) where T : EntityBase {
var entityInfo = GetEntityInfo<T>();
object?[] parameters;
try {
parameters = entityInfo.properties.Select(grouping => GetFieldValue(entityInfo, grouping.Key, entity)).ToArray();
} catch (Exception ex) {
throw new InvalidOperationException($"Unable to extract properties from TableEntity for {typeof(T)}", ex);
}
try {
var entityRecord = (T)entityInfo.constructor.Invoke(parameters);
if (entity.ETag != default) {
entityRecord.ETag = entity.ETag;
}
entityRecord.TimeStamp = entity.Timestamp;
return entityRecord;
} catch (Exception ex) {
var stringParam = string.Join(", ", parameters);
throw new InvalidOperationException($"Could not initialize object of type {typeof(T)} with the following parameters: {stringParam} constructor {entityInfo.constructor}", ex);
}
}
}
using System.Collections.Concurrent;
using System.Linq.Expressions;
using System.Reflection;
using System.Text.Json;
using System.Text.Json.Serialization;
using Azure;
using Azure.Data.Tables;
namespace Microsoft.OneFuzz.Service.OneFuzzLib.Orm;
public abstract record EntityBase {
[JsonIgnore] public ETag? ETag { get; set; }
public DateTimeOffset? TimeStamp { get; set; }
// https://docs.microsoft.com/en-us/rest/api/storageservices/designing-a-scalable-partitioning-strategy-for-azure-table-storage#yyy
// Produce "good-quality-table-key" based on a DateTimeOffset timestamp
public static string NewSortedKey => $"{DateTimeOffset.MaxValue.Ticks - DateTimeOffset.UtcNow.Ticks}";
}
public abstract record StatefulEntityBase<T>([property: JsonIgnore] T BaseState) : EntityBase() where T : Enum;
/// How the value is populated
public enum InitMethod {
//T() will be used
DefaultConstructor,
}
[AttributeUsage(AttributeTargets.Parameter)]
public class DefaultValueAttribute : Attribute {
public InitMethod InitMethod { get; }
public DefaultValueAttribute(InitMethod initMethod) {
InitMethod = initMethod;
}
}
/// Indicates that the enum cases should no be renamed
[AttributeUsage(AttributeTargets.Enum)]
public class SerializeValueAttribute : Attribute { }
/// Indicates that the enum cases should no be renamed
[AttributeUsage(AttributeTargets.Enum)]
public class SkipRenameAttribute : Attribute { }
[AttributeUsage(AttributeTargets.Parameter)]
public class RowKeyAttribute : Attribute { }
[AttributeUsage(AttributeTargets.Parameter)]
public class PartitionKeyAttribute : Attribute { }
[AttributeUsage(AttributeTargets.Property)]
public class TypeDiscrimnatorAttribute : Attribute {
public string FieldName { get; }
// the type of a function that takes the value of fieldName as an input and return the type
public Type ConverterType { get; }
public TypeDiscrimnatorAttribute(string fieldName, Type converterType) {
if (!converterType.IsAssignableTo(typeof(ITypeProvider))) {
throw new ArgumentException($"the provided type needs to implement ITypeProvider");
}
FieldName = fieldName;
ConverterType = converterType;
}
}
public interface ITypeProvider {
Type GetTypeInfo(object input);
}
public enum EntityPropertyKind {
PartitionKey,
RowKey,
Column
}
public record EntityProperty(
string name,
string columnName,
Type type,
EntityPropertyKind kind,
(TypeDiscrimnatorAttribute, ITypeProvider)? discriminator,
DefaultValueAttribute? defaultValue,
ParameterInfo parameterInfo
);
public record EntityInfo(Type type, ILookup<string, EntityProperty> properties, Func<object?[], object> constructor);
class OnefuzzNamingPolicy : JsonNamingPolicy {
public override string ConvertName(string name) {
return CaseConverter.PascalToSnake(name);
}
}
public class EntityConverter {
private readonly JsonSerializerOptions _options;
private readonly ConcurrentDictionary<Type, EntityInfo> _cache;
public EntityConverter() {
_options = GetJsonSerializerOptions();
_cache = new ConcurrentDictionary<Type, EntityInfo>();
}
public static JsonSerializerOptions GetJsonSerializerOptions() {
var options = new JsonSerializerOptions() {
PropertyNamingPolicy = new OnefuzzNamingPolicy(),
};
options.Converters.Add(new CustomEnumConverterFactory());
options.Converters.Add(new PolymorphicConverterFactory());
return options;
}
internal static Func<object?[], object> BuildConstructerFrom(ConstructorInfo constructorInfo) {
var constructorParameters = Expression.Parameter(typeof(object?[]));
var parameterExpressions =
constructorInfo.GetParameters().Select((parameterInfo, i) => {
var ithIndex = Expression.Constant(i);
var ithParameter = Expression.ArrayIndex(constructorParameters, ithIndex);
var unboxedIthParameter = Expression.Convert(ithParameter, parameterInfo.ParameterType);
return unboxedIthParameter;
}).ToArray();
NewExpression constructorCall = Expression.New(constructorInfo, parameterExpressions);
Func<object?[], object> ctor = Expression.Lambda<Func<object?[], object>>(constructorCall, constructorParameters).Compile();
return ctor;
}
private static IEnumerable<EntityProperty> GetEntityProperties<T>(ParameterInfo parameterInfo) {
var name = parameterInfo.Name.EnsureNotNull($"Invalid paramter {parameterInfo}");
var parameterType = parameterInfo.ParameterType.EnsureNotNull($"Invalid paramter {parameterInfo}");
var isRowkey = parameterInfo.GetCustomAttribute(typeof(RowKeyAttribute)) != null;
var isPartitionkey = parameterInfo.GetCustomAttribute(typeof(PartitionKeyAttribute)) != null;
var discriminatorAttribute = typeof(T).GetProperty(name)?.GetCustomAttribute<TypeDiscrimnatorAttribute>();
var defaultValueAttribute = parameterInfo.GetCustomAttribute<DefaultValueAttribute>();
(TypeDiscrimnatorAttribute, ITypeProvider)? discriminator = null;
if (discriminatorAttribute != null) {
var t = (ITypeProvider)(Activator.CreateInstance(discriminatorAttribute.ConverterType) ?? throw new Exception("unable to retrive the type provider"));
discriminator = (discriminatorAttribute, t);
}
if (isPartitionkey) {
yield return new EntityProperty(name, "PartitionKey", parameterType, EntityPropertyKind.PartitionKey, discriminator, defaultValueAttribute, parameterInfo);
}
if (isRowkey) {
yield return new EntityProperty(name, "RowKey", parameterType, EntityPropertyKind.RowKey, discriminator, defaultValueAttribute, parameterInfo);
}
if (!isPartitionkey && !isRowkey) {
var columnName = typeof(T).GetProperty(name)?.GetCustomAttribute<JsonPropertyNameAttribute>()?.Name ?? CaseConverter.PascalToSnake(name);
yield return new EntityProperty(name, columnName, parameterType, EntityPropertyKind.Column, discriminator, defaultValueAttribute, parameterInfo);
}
}
private EntityInfo GetEntityInfo<T>() {
return _cache.GetOrAdd(typeof(T), type => {
var constructor = type.GetConstructors()[0];
var parameterInfos = constructor.GetParameters();
var parameters =
parameterInfos.SelectMany(GetEntityProperties<T>).ToArray();
return new EntityInfo(typeof(T), parameters.ToLookup(x => x.name), BuildConstructerFrom(constructor));
});
}
public string ToJsonString<T>(T typedEntity) => JsonSerializer.Serialize(typedEntity, _options);
public T? FromJsonString<T>(string value) => JsonSerializer.Deserialize<T>(value, _options);
public TableEntity ToTableEntity<T>(T typedEntity) where T : EntityBase {
if (typedEntity == null) {
throw new ArgumentNullException(nameof(typedEntity));
}
var type = typeof(T);
var entityInfo = GetEntityInfo<T>();
Dictionary<string, object?> columnValues = entityInfo.properties.SelectMany(x => x).Select(prop => {
var value = entityInfo.type.GetProperty(prop.name)?.GetValue(typedEntity);
if (value == null) {
return (prop.columnName, value: (object?)null);
}
if (prop.kind == EntityPropertyKind.PartitionKey || prop.kind == EntityPropertyKind.RowKey) {
return (prop.columnName, value?.ToString());
}
if (prop.type == typeof(Guid) || prop.type == typeof(Guid?) || prop.type == typeof(Uri)) {
return (prop.columnName, value?.ToString());
}
if (prop.type == typeof(bool)
|| prop.type == typeof(bool?)
|| prop.type == typeof(string)
|| prop.type == typeof(DateTime)
|| prop.type == typeof(DateTime?)
|| prop.type == typeof(DateTimeOffset)
|| prop.type == typeof(DateTimeOffset?)
|| prop.type == typeof(int)
|| prop.type == typeof(int?)
|| prop.type == typeof(long)
|| prop.type == typeof(long?)
|| prop.type == typeof(double)
|| prop.type == typeof(double?)
) {
return (prop.columnName, value);
}
var serialized = JsonSerializer.Serialize(value, _options);
return (prop.columnName, serialized.Trim('"'));
}).ToDictionary(x => x.columnName, x => x.value);
var tableEntity = new TableEntity(columnValues);
if (typedEntity.ETag.HasValue) {
tableEntity.ETag = typedEntity.ETag.Value;
}
return tableEntity;
}
private object? GetFieldValue(EntityInfo info, string name, TableEntity entity) {
var ef = info.properties[name].First();
if (ef.kind == EntityPropertyKind.PartitionKey || ef.kind == EntityPropertyKind.RowKey) {
if (ef.type == typeof(string))
return entity.GetString(ef.kind.ToString());
else if (ef.type == typeof(Guid))
return Guid.Parse(entity.GetString(ef.kind.ToString()));
else if (ef.type == typeof(int))
return int.Parse(entity.GetString(ef.kind.ToString()));
else if (ef.type == typeof(long))
return long.Parse(entity.GetString(ef.kind.ToString()));
else if (ef.type.IsClass)
return ef.type.GetConstructor(new[] { typeof(string) })!.Invoke(new[] { entity.GetString(ef.kind.ToString()) });
else {
throw new Exception($"invalid partition or row key type of {info.type} property {name}: {ef.type}");
}
}
var fieldName = ef.columnName;
var obj = entity[fieldName];
if (obj == null) {
if (ef.parameterInfo.HasDefaultValue) {
return ef.parameterInfo.DefaultValue;
}
return ef.defaultValue switch {
DefaultValueAttribute { InitMethod: InitMethod.DefaultConstructor } => Activator.CreateInstance(ef.type),
_ => null,
};
}
try {
if (ef.type == typeof(string)) {
return entity.GetString(fieldName);
} else if (ef.type == typeof(bool) || ef.type == typeof(bool?)) {
return entity.GetBoolean(fieldName);
} else if (ef.type == typeof(DateTimeOffset) || ef.type == typeof(DateTimeOffset?)) {
return entity.GetDateTimeOffset(fieldName);
} else if (ef.type == typeof(DateTime) || ef.type == typeof(DateTime?)) {
return entity.GetDateTime(fieldName);
} else if (ef.type == typeof(double) || ef.type == typeof(double?)) {
return entity.GetDouble(fieldName);
} else if (ef.type == typeof(Guid) || ef.type == typeof(Guid?)) {
return (object?)Guid.Parse(entity.GetString(fieldName));
} else if (ef.type == typeof(int) || ef.type == typeof(short) || ef.type == typeof(int?) || ef.type == typeof(short?)) {
return entity.GetInt32(fieldName);
} else if (ef.type == typeof(long) || ef.type == typeof(long?)) {
return entity.GetInt64(fieldName);
} else {
var outputType = ef.type;
if (ef.discriminator != null) {
var (attr, typeProvider) = ef.discriminator.Value;
var v = GetFieldValue(info, attr.FieldName, entity) ?? throw new Exception($"No value for {attr.FieldName}");
outputType = typeProvider.GetTypeInfo(v);
}
var objType = obj.GetType();
if (objType == typeof(string)) {
var value = entity.GetString(fieldName);
if (value.StartsWith('[') || value.StartsWith('{') || value == "null") {
return JsonSerializer.Deserialize(value, outputType, options: _options);
} else {
return JsonSerializer.Deserialize($"\"{value}\"", outputType, options: _options);
}
} else {
var value = entity.GetString(fieldName);
return JsonSerializer.Deserialize(value, outputType, options: _options);
}
}
} catch (Exception ex) {
throw new InvalidOperationException($"Unable to get value for property '{name}' (entity field '{fieldName}')", ex);
}
}
public T ToRecord<T>(TableEntity entity) where T : EntityBase {
var entityInfo = GetEntityInfo<T>();
object?[] parameters;
try {
parameters = entityInfo.properties.Select(grouping => GetFieldValue(entityInfo, grouping.Key, entity)).ToArray();
} catch (Exception ex) {
throw new InvalidOperationException($"Unable to extract properties from TableEntity for {typeof(T)}", ex);
}
try {
var entityRecord = (T)entityInfo.constructor.Invoke(parameters);
if (entity.ETag != default) {
entityRecord.ETag = entity.ETag;
}
entityRecord.TimeStamp = entity.Timestamp;
return entityRecord;
} catch (Exception ex) {
var stringParam = string.Join(", ", parameters);
throw new InvalidOperationException($"Could not initialize object of type {typeof(T)} with the following parameters: {stringParam} constructor {entityInfo.constructor}", ex);
}
}
}