mirror of
https://github.com/microsoft/onefuzz.git
synced 2025-06-15 03:18:07 +00:00
standardize on unix file endings (#516)
The majority of our source files have `\n` line endings. This updates the few files that use `\r\n` to use `\n`.
This commit is contained in:
@ -1,11 +1,11 @@
|
||||
# What is this for?
|
||||
|
||||
This section of code contains scripts which help to deploy latest releases of OneFuzz at demand. It uses Azure DevOps Build Pipeline.
|
||||
|
||||
The script [deploy-onefuzz.yml](deploy-onefuzz.yml) can be used saved in Azure DevOps Build Pipeline or can be stored in the repository and can be pointed to it.
|
||||
|
||||
It also contain supporting `python` scripts which helps to fetch latest version and artifacts from OneFuzz GitHub repository.
|
||||
|
||||
# How to use it?
|
||||
|
||||
# What is this for?
|
||||
|
||||
This section of code contains scripts which help to deploy latest releases of OneFuzz at demand. It uses Azure DevOps Build Pipeline.
|
||||
|
||||
The script [deploy-onefuzz.yml](deploy-onefuzz.yml) can be used saved in Azure DevOps Build Pipeline or can be stored in the repository and can be pointed to it.
|
||||
|
||||
It also contain supporting `python` scripts which helps to fetch latest version and artifacts from OneFuzz GitHub repository.
|
||||
|
||||
# How to use it?
|
||||
|
||||
This script is intended only for deploying newer updates. There are certain set of pipeline variables needs to be set as mentioned in [deploy-onefuzz.yml](deploy-onefuzz.yml) for authentication purposes to the OneFuzz instance.
|
@ -1,21 +1,21 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import azure.functions as func
|
||||
|
||||
# This endpoint handles the signalr negotation
|
||||
# As we do not differentiate from clients at this time, we pass the Functions runtime
|
||||
# provided connection straight to the client
|
||||
#
|
||||
# For more info:
|
||||
# https://docs.microsoft.com/en-us/azure/azure-signalr/signalr-concept-internals
|
||||
|
||||
|
||||
def main(req: func.HttpRequest, connectionInfoJson: str) -> func.HttpResponse:
|
||||
return func.HttpResponse(
|
||||
connectionInfoJson,
|
||||
status_code=200,
|
||||
headers={"Content-type": "application/json"},
|
||||
)
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import azure.functions as func
|
||||
|
||||
# This endpoint handles the signalr negotation
|
||||
# As we do not differentiate from clients at this time, we pass the Functions runtime
|
||||
# provided connection straight to the client
|
||||
#
|
||||
# For more info:
|
||||
# https://docs.microsoft.com/en-us/azure/azure-signalr/signalr-concept-internals
|
||||
|
||||
|
||||
def main(req: func.HttpRequest, connectionInfoJson: str) -> func.HttpResponse:
|
||||
return func.HttpResponse(
|
||||
connectionInfoJson,
|
||||
status_code=200,
|
||||
headers={"Content-type": "application/json"},
|
||||
)
|
||||
|
@ -1,4 +1,4 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
@ -1,65 +1,65 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from uuid import UUID
|
||||
|
||||
from onefuzztypes.enums import OS, Architecture, ContainerType, TaskType
|
||||
from onefuzztypes.models import TaskConfig, TaskContainers, TaskDetails, TaskPool
|
||||
from onefuzztypes.primitives import Container, PoolName
|
||||
|
||||
from __app__.onefuzzlib.autoscale import autoscale_pool, get_vm_count
|
||||
from __app__.onefuzzlib.tasks.main import Task
|
||||
from __app__.onefuzzlib.workers.pools import Pool
|
||||
|
||||
|
||||
class TestAutoscale(unittest.TestCase):
|
||||
@patch("__app__.onefuzzlib.tasks.main.Task.get_tasks_by_pool_name")
|
||||
def test_autoscale_pool(self, mock_get_tasks_by_pool_name: MagicMock) -> None:
|
||||
pool = Pool(
|
||||
name=PoolName("test-pool"),
|
||||
pool_id=UUID("6b049d51-23e9-4f5c-a5af-ff1f73d0d9e9"),
|
||||
os=OS.linux,
|
||||
managed=False,
|
||||
arch=Architecture.x86_64,
|
||||
)
|
||||
autoscale_pool(pool=pool)
|
||||
mock_get_tasks_by_pool_name.assert_not_called()
|
||||
|
||||
@patch("__app__.onefuzzlib.tasks.main.Task.get_pool")
|
||||
def test_get_vm_count(self, mock_get_pool: MagicMock) -> None:
|
||||
self.assertEqual(get_vm_count([]), 0)
|
||||
|
||||
task_config = TaskConfig(
|
||||
job_id=UUID("6b049d51-23e9-4f5c-a5af-ff1f73d0d9e9"),
|
||||
containers=[
|
||||
TaskContainers(
|
||||
type=ContainerType.inputs, name=Container("test-container")
|
||||
)
|
||||
],
|
||||
tags={},
|
||||
task=TaskDetails(
|
||||
type=TaskType.libfuzzer_fuzz,
|
||||
duration=12,
|
||||
target_exe="fuzz.exe",
|
||||
target_env={},
|
||||
target_options=[],
|
||||
),
|
||||
pool=TaskPool(count=2, pool_name=PoolName("test-pool")),
|
||||
)
|
||||
task = Task(
|
||||
job_id=UUID("6b049d51-23e9-4f5c-a5af-ff1f73d0d9e9"),
|
||||
os=OS.linux,
|
||||
config=task_config,
|
||||
)
|
||||
mock_get_pool.return_value = Pool(
|
||||
name=PoolName("test-pool"),
|
||||
pool_id=UUID("6b049d51-23e9-4f5c-a5af-ff1f73d0d9e9"),
|
||||
os=OS.linux,
|
||||
managed=False,
|
||||
arch=Architecture.x86_64,
|
||||
)
|
||||
self.assertEqual(get_vm_count([task]), 2)
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from uuid import UUID
|
||||
|
||||
from onefuzztypes.enums import OS, Architecture, ContainerType, TaskType
|
||||
from onefuzztypes.models import TaskConfig, TaskContainers, TaskDetails, TaskPool
|
||||
from onefuzztypes.primitives import Container, PoolName
|
||||
|
||||
from __app__.onefuzzlib.autoscale import autoscale_pool, get_vm_count
|
||||
from __app__.onefuzzlib.tasks.main import Task
|
||||
from __app__.onefuzzlib.workers.pools import Pool
|
||||
|
||||
|
||||
class TestAutoscale(unittest.TestCase):
|
||||
@patch("__app__.onefuzzlib.tasks.main.Task.get_tasks_by_pool_name")
|
||||
def test_autoscale_pool(self, mock_get_tasks_by_pool_name: MagicMock) -> None:
|
||||
pool = Pool(
|
||||
name=PoolName("test-pool"),
|
||||
pool_id=UUID("6b049d51-23e9-4f5c-a5af-ff1f73d0d9e9"),
|
||||
os=OS.linux,
|
||||
managed=False,
|
||||
arch=Architecture.x86_64,
|
||||
)
|
||||
autoscale_pool(pool=pool)
|
||||
mock_get_tasks_by_pool_name.assert_not_called()
|
||||
|
||||
@patch("__app__.onefuzzlib.tasks.main.Task.get_pool")
|
||||
def test_get_vm_count(self, mock_get_pool: MagicMock) -> None:
|
||||
self.assertEqual(get_vm_count([]), 0)
|
||||
|
||||
task_config = TaskConfig(
|
||||
job_id=UUID("6b049d51-23e9-4f5c-a5af-ff1f73d0d9e9"),
|
||||
containers=[
|
||||
TaskContainers(
|
||||
type=ContainerType.inputs, name=Container("test-container")
|
||||
)
|
||||
],
|
||||
tags={},
|
||||
task=TaskDetails(
|
||||
type=TaskType.libfuzzer_fuzz,
|
||||
duration=12,
|
||||
target_exe="fuzz.exe",
|
||||
target_env={},
|
||||
target_options=[],
|
||||
),
|
||||
pool=TaskPool(count=2, pool_name=PoolName("test-pool")),
|
||||
)
|
||||
task = Task(
|
||||
job_id=UUID("6b049d51-23e9-4f5c-a5af-ff1f73d0d9e9"),
|
||||
os=OS.linux,
|
||||
config=task_config,
|
||||
)
|
||||
mock_get_pool.return_value = Pool(
|
||||
name=PoolName("test-pool"),
|
||||
pool_id=UUID("6b049d51-23e9-4f5c-a5af-ff1f73d0d9e9"),
|
||||
os=OS.linux,
|
||||
managed=False,
|
||||
arch=Architecture.x86_64,
|
||||
)
|
||||
self.assertEqual(get_vm_count([task]), 2)
|
||||
|
@ -1,147 +1,147 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import unittest
|
||||
from typing import Dict, Generator, List, TypeVar
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
from onefuzztypes.enums import OS, ContainerType, TaskType
|
||||
from onefuzztypes.models import TaskConfig, TaskContainers, TaskDetails, TaskPool
|
||||
from onefuzztypes.primitives import Container, PoolName
|
||||
|
||||
from __app__.onefuzzlib.tasks.main import Task
|
||||
from __app__.onefuzzlib.tasks.scheduler import bucket_tasks
|
||||
|
||||
A = TypeVar("A")
|
||||
|
||||
|
||||
def chunks(items: List[A], size: int) -> Generator[List[A], None, None]:
|
||||
return (items[x : x + size] for x in range(0, len(items), size))
|
||||
|
||||
|
||||
class TestTaskBuckets(unittest.TestCase):
|
||||
def build_tasks(self, size: int) -> List[Task]:
|
||||
tasks = []
|
||||
for _ in range(size):
|
||||
task = Task(
|
||||
job_id=UUID(int=0),
|
||||
config=TaskConfig(
|
||||
job_id=UUID(int=0),
|
||||
task=TaskDetails(
|
||||
type=TaskType.libfuzzer_fuzz,
|
||||
duration=1,
|
||||
target_exe="fuzz.exe",
|
||||
target_env={},
|
||||
target_options=[],
|
||||
),
|
||||
pool=TaskPool(pool_name=PoolName("pool"), count=1),
|
||||
containers=[
|
||||
TaskContainers(
|
||||
type=ContainerType.setup, name=Container("setup")
|
||||
)
|
||||
],
|
||||
tags={},
|
||||
colocate=True,
|
||||
),
|
||||
os=OS.linux,
|
||||
)
|
||||
tasks.append(task)
|
||||
return tasks
|
||||
|
||||
def test_all_colocate(self) -> None:
|
||||
# all tasks should land in one bucket
|
||||
tasks = self.build_tasks(10)
|
||||
for task in tasks:
|
||||
task.config.colocate = True
|
||||
|
||||
buckets = bucket_tasks(tasks)
|
||||
|
||||
for bucket in buckets.values():
|
||||
self.assertEqual(len(bucket), 10)
|
||||
|
||||
self.check_buckets(buckets, tasks, bucket_count=1)
|
||||
|
||||
def test_partial_colocate(self) -> None:
|
||||
# 2 tasks should land on their own, the rest should be colocated into a
|
||||
# single bucket.
|
||||
|
||||
tasks = self.build_tasks(10)
|
||||
|
||||
# a the task came before colocation was defined
|
||||
tasks[0].config.colocate = None
|
||||
|
||||
# a the task shouldn't be colocated
|
||||
tasks[1].config.colocate = False
|
||||
|
||||
buckets = bucket_tasks(tasks)
|
||||
|
||||
lengths = []
|
||||
for bucket in buckets.values():
|
||||
lengths.append(len(bucket))
|
||||
self.assertEqual([1, 1, 8], sorted(lengths))
|
||||
self.check_buckets(buckets, tasks, bucket_count=3)
|
||||
|
||||
def test_all_unique_job(self) -> None:
|
||||
# everything has a unique job_id
|
||||
tasks = self.build_tasks(10)
|
||||
for task in tasks:
|
||||
job_id = uuid4()
|
||||
task.job_id = job_id
|
||||
task.config.job_id = job_id
|
||||
|
||||
buckets = bucket_tasks(tasks)
|
||||
|
||||
for bucket in buckets.values():
|
||||
self.assertEqual(len(bucket), 1)
|
||||
|
||||
self.check_buckets(buckets, tasks, bucket_count=10)
|
||||
|
||||
def test_multiple_job_buckets(self) -> None:
|
||||
# at most 3 tasks per bucket, by job_id
|
||||
tasks = self.build_tasks(10)
|
||||
for task_chunks in chunks(tasks, 3):
|
||||
job_id = uuid4()
|
||||
for task in task_chunks:
|
||||
task.job_id = job_id
|
||||
task.config.job_id = job_id
|
||||
|
||||
buckets = bucket_tasks(tasks)
|
||||
|
||||
for bucket in buckets.values():
|
||||
self.assertLessEqual(len(bucket), 3)
|
||||
|
||||
self.check_buckets(buckets, tasks, bucket_count=4)
|
||||
|
||||
def test_many_buckets(self) -> None:
|
||||
tasks = self.build_tasks(100)
|
||||
job_id = UUID(int=1)
|
||||
for i, task in enumerate(tasks):
|
||||
if i % 2 == 0:
|
||||
task.job_id = job_id
|
||||
task.config.job_id = job_id
|
||||
|
||||
if i % 3 == 0:
|
||||
task.os = OS.windows
|
||||
|
||||
if i % 4 == 0:
|
||||
task.config.containers[0].name = Container("setup2")
|
||||
|
||||
if i % 5 == 0:
|
||||
if task.config.pool:
|
||||
task.config.pool.pool_name = PoolName("alternate-pool")
|
||||
|
||||
buckets = bucket_tasks(tasks)
|
||||
self.check_buckets(buckets, tasks, bucket_count=12)
|
||||
|
||||
def check_buckets(self, buckets: Dict, tasks: List, *, bucket_count: int) -> None:
|
||||
self.assertEqual(len(buckets), bucket_count, "bucket count")
|
||||
|
||||
for task in tasks:
|
||||
seen = False
|
||||
for bucket in buckets.values():
|
||||
if task in bucket:
|
||||
self.assertEqual(seen, False, "task seen in multiple buckets")
|
||||
seen = True
|
||||
self.assertEqual(seen, True, "task not seein in any buckets")
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import unittest
|
||||
from typing import Dict, Generator, List, TypeVar
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
from onefuzztypes.enums import OS, ContainerType, TaskType
|
||||
from onefuzztypes.models import TaskConfig, TaskContainers, TaskDetails, TaskPool
|
||||
from onefuzztypes.primitives import Container, PoolName
|
||||
|
||||
from __app__.onefuzzlib.tasks.main import Task
|
||||
from __app__.onefuzzlib.tasks.scheduler import bucket_tasks
|
||||
|
||||
A = TypeVar("A")
|
||||
|
||||
|
||||
def chunks(items: List[A], size: int) -> Generator[List[A], None, None]:
|
||||
return (items[x : x + size] for x in range(0, len(items), size))
|
||||
|
||||
|
||||
class TestTaskBuckets(unittest.TestCase):
|
||||
def build_tasks(self, size: int) -> List[Task]:
|
||||
tasks = []
|
||||
for _ in range(size):
|
||||
task = Task(
|
||||
job_id=UUID(int=0),
|
||||
config=TaskConfig(
|
||||
job_id=UUID(int=0),
|
||||
task=TaskDetails(
|
||||
type=TaskType.libfuzzer_fuzz,
|
||||
duration=1,
|
||||
target_exe="fuzz.exe",
|
||||
target_env={},
|
||||
target_options=[],
|
||||
),
|
||||
pool=TaskPool(pool_name=PoolName("pool"), count=1),
|
||||
containers=[
|
||||
TaskContainers(
|
||||
type=ContainerType.setup, name=Container("setup")
|
||||
)
|
||||
],
|
||||
tags={},
|
||||
colocate=True,
|
||||
),
|
||||
os=OS.linux,
|
||||
)
|
||||
tasks.append(task)
|
||||
return tasks
|
||||
|
||||
def test_all_colocate(self) -> None:
|
||||
# all tasks should land in one bucket
|
||||
tasks = self.build_tasks(10)
|
||||
for task in tasks:
|
||||
task.config.colocate = True
|
||||
|
||||
buckets = bucket_tasks(tasks)
|
||||
|
||||
for bucket in buckets.values():
|
||||
self.assertEqual(len(bucket), 10)
|
||||
|
||||
self.check_buckets(buckets, tasks, bucket_count=1)
|
||||
|
||||
def test_partial_colocate(self) -> None:
|
||||
# 2 tasks should land on their own, the rest should be colocated into a
|
||||
# single bucket.
|
||||
|
||||
tasks = self.build_tasks(10)
|
||||
|
||||
# a the task came before colocation was defined
|
||||
tasks[0].config.colocate = None
|
||||
|
||||
# a the task shouldn't be colocated
|
||||
tasks[1].config.colocate = False
|
||||
|
||||
buckets = bucket_tasks(tasks)
|
||||
|
||||
lengths = []
|
||||
for bucket in buckets.values():
|
||||
lengths.append(len(bucket))
|
||||
self.assertEqual([1, 1, 8], sorted(lengths))
|
||||
self.check_buckets(buckets, tasks, bucket_count=3)
|
||||
|
||||
def test_all_unique_job(self) -> None:
|
||||
# everything has a unique job_id
|
||||
tasks = self.build_tasks(10)
|
||||
for task in tasks:
|
||||
job_id = uuid4()
|
||||
task.job_id = job_id
|
||||
task.config.job_id = job_id
|
||||
|
||||
buckets = bucket_tasks(tasks)
|
||||
|
||||
for bucket in buckets.values():
|
||||
self.assertEqual(len(bucket), 1)
|
||||
|
||||
self.check_buckets(buckets, tasks, bucket_count=10)
|
||||
|
||||
def test_multiple_job_buckets(self) -> None:
|
||||
# at most 3 tasks per bucket, by job_id
|
||||
tasks = self.build_tasks(10)
|
||||
for task_chunks in chunks(tasks, 3):
|
||||
job_id = uuid4()
|
||||
for task in task_chunks:
|
||||
task.job_id = job_id
|
||||
task.config.job_id = job_id
|
||||
|
||||
buckets = bucket_tasks(tasks)
|
||||
|
||||
for bucket in buckets.values():
|
||||
self.assertLessEqual(len(bucket), 3)
|
||||
|
||||
self.check_buckets(buckets, tasks, bucket_count=4)
|
||||
|
||||
def test_many_buckets(self) -> None:
|
||||
tasks = self.build_tasks(100)
|
||||
job_id = UUID(int=1)
|
||||
for i, task in enumerate(tasks):
|
||||
if i % 2 == 0:
|
||||
task.job_id = job_id
|
||||
task.config.job_id = job_id
|
||||
|
||||
if i % 3 == 0:
|
||||
task.os = OS.windows
|
||||
|
||||
if i % 4 == 0:
|
||||
task.config.containers[0].name = Container("setup2")
|
||||
|
||||
if i % 5 == 0:
|
||||
if task.config.pool:
|
||||
task.config.pool.pool_name = PoolName("alternate-pool")
|
||||
|
||||
buckets = bucket_tasks(tasks)
|
||||
self.check_buckets(buckets, tasks, bucket_count=12)
|
||||
|
||||
def check_buckets(self, buckets: Dict, tasks: List, *, bucket_count: int) -> None:
|
||||
self.assertEqual(len(buckets), bucket_count, "bucket count")
|
||||
|
||||
for task in tasks:
|
||||
seen = False
|
||||
for bucket in buckets.values():
|
||||
if task in bucket:
|
||||
self.assertEqual(seen, False, "task seen in multiple buckets")
|
||||
seen = True
|
||||
self.assertEqual(seen, True, "task not seein in any buckets")
|
||||
|
@ -1,51 +1,51 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import unittest
|
||||
from uuid import UUID
|
||||
|
||||
from onefuzztypes.events import EventPing, EventType
|
||||
|
||||
from __app__.onefuzzlib.webhooks import build_message
|
||||
|
||||
|
||||
class TestWebhookHmac(unittest.TestCase):
|
||||
def test_webhook_hmac(self) -> None:
|
||||
webhook_id = UUID(int=0)
|
||||
event_id = UUID(int=1)
|
||||
event_type = EventType.ping
|
||||
event = EventPing(ping_id=UUID(int=2))
|
||||
|
||||
data, digest = build_message(
|
||||
webhook_id=webhook_id, event_id=event_id, event_type=event_type, event=event
|
||||
)
|
||||
|
||||
expected = (
|
||||
b"{"
|
||||
b'"event": {"ping_id": "00000000-0000-0000-0000-000000000002"}, '
|
||||
b'"event_id": "00000000-0000-0000-0000-000000000001", '
|
||||
b'"event_type": "ping", '
|
||||
b'"webhook_id": "00000000-0000-0000-0000-000000000000"'
|
||||
b"}"
|
||||
)
|
||||
|
||||
expected_digest = (
|
||||
"3502f83237ce006b7f6cfa40b89c0295009e3ccb0a1e62ce1d689700c2c6e698"
|
||||
"61c0de81e011495c2ca89fbf99485b841cee257bcfba326a3edc66f39dc1feec"
|
||||
)
|
||||
|
||||
print(repr(expected))
|
||||
self.assertEqual(data, expected)
|
||||
self.assertEqual(digest, None)
|
||||
|
||||
data, digest = build_message(
|
||||
webhook_id=webhook_id,
|
||||
event_id=event_id,
|
||||
event_type=event_type,
|
||||
event=event,
|
||||
secret_token="hello there",
|
||||
)
|
||||
self.assertEqual(data, expected)
|
||||
self.assertEqual(digest, expected_digest)
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import unittest
|
||||
from uuid import UUID
|
||||
|
||||
from onefuzztypes.events import EventPing, EventType
|
||||
|
||||
from __app__.onefuzzlib.webhooks import build_message
|
||||
|
||||
|
||||
class TestWebhookHmac(unittest.TestCase):
|
||||
def test_webhook_hmac(self) -> None:
|
||||
webhook_id = UUID(int=0)
|
||||
event_id = UUID(int=1)
|
||||
event_type = EventType.ping
|
||||
event = EventPing(ping_id=UUID(int=2))
|
||||
|
||||
data, digest = build_message(
|
||||
webhook_id=webhook_id, event_id=event_id, event_type=event_type, event=event
|
||||
)
|
||||
|
||||
expected = (
|
||||
b"{"
|
||||
b'"event": {"ping_id": "00000000-0000-0000-0000-000000000002"}, '
|
||||
b'"event_id": "00000000-0000-0000-0000-000000000001", '
|
||||
b'"event_type": "ping", '
|
||||
b'"webhook_id": "00000000-0000-0000-0000-000000000000"'
|
||||
b"}"
|
||||
)
|
||||
|
||||
expected_digest = (
|
||||
"3502f83237ce006b7f6cfa40b89c0295009e3ccb0a1e62ce1d689700c2c6e698"
|
||||
"61c0de81e011495c2ca89fbf99485b841cee257bcfba326a3edc66f39dc1feec"
|
||||
)
|
||||
|
||||
print(repr(expected))
|
||||
self.assertEqual(data, expected)
|
||||
self.assertEqual(digest, None)
|
||||
|
||||
data, digest = build_message(
|
||||
webhook_id=webhook_id,
|
||||
event_id=event_id,
|
||||
event_type=event_type,
|
||||
event=event,
|
||||
secret_token="hello there",
|
||||
)
|
||||
self.assertEqual(data, expected)
|
||||
self.assertEqual(digest, expected_digest)
|
||||
|
@ -1,20 +1,20 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
import azure.functions as func
|
||||
|
||||
from onefuzz.api import Onefuzz
|
||||
|
||||
|
||||
def main(req: func.HttpRequest) -> func.HttpResponse:
|
||||
logging.info("Python HTTP trigger function processed a request.")
|
||||
|
||||
o = Onefuzz()
|
||||
o.config(
|
||||
endpoint=os.environ.get("ONEFUZZ_ENDPOINT"),
|
||||
authority=os.environ.get("ONEFUZZ_AUTHORITY"),
|
||||
client_id=os.environ.get("ONEFUZZ_CLIENT_ID"),
|
||||
client_secret=os.environ.get("ONEFUZZ_CLIENT_SECRET"),
|
||||
)
|
||||
info = o.info.get()
|
||||
return func.HttpResponse(info.json())
|
||||
import logging
|
||||
import os
|
||||
|
||||
import azure.functions as func
|
||||
|
||||
from onefuzz.api import Onefuzz
|
||||
|
||||
|
||||
def main(req: func.HttpRequest) -> func.HttpResponse:
|
||||
logging.info("Python HTTP trigger function processed a request.")
|
||||
|
||||
o = Onefuzz()
|
||||
o.config(
|
||||
endpoint=os.environ.get("ONEFUZZ_ENDPOINT"),
|
||||
authority=os.environ.get("ONEFUZZ_AUTHORITY"),
|
||||
client_id=os.environ.get("ONEFUZZ_CLIENT_ID"),
|
||||
client_secret=os.environ.get("ONEFUZZ_CLIENT_SECRET"),
|
||||
)
|
||||
info = o.info.get()
|
||||
return func.HttpResponse(info.json())
|
||||
|
Reference in New Issue
Block a user