add log checking to refactored integration check (#700)

In practice, Application Insights can take up to 3 minutes before data sent to it becomes available via KQL.

This PR logs start and stop markers so that the integration tests only search the logs emitted during the test run. This reduces complexity when using the integration tests during development.

Note: this migrates the new functionality from #356 into the latest integration test tools.
Author: bmc-msft
Date: 2021-04-02 17:49:19 -04:00
Committed by: GitHub
Parent: 9c1540aca8
Commit: ca12904684

7 changed files with 147 additions and 24 deletions
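For context, the sketch below shows one way to check for an injected marker directly, using the Application Insights REST query API rather than the onefuzz client's debug.logs.keyword wrapper that the integration tests use. APP_ID and API_KEY are hypothetical placeholders for an Application Insights application id and API key; the KQL only assumes the marker format introduced in this PR.

# A minimal sketch, not part of this PR: poll Application Insights for the
# stop marker via its REST query API. APP_ID and API_KEY are hypothetical
# placeholders.
import requests

APP_ID = "00000000-0000-0000-0000-000000000000"  # hypothetical application id
API_KEY = "..."  # hypothetical API key

# KQL: find the most recent trace containing a stop marker within the last
# hour, mirroring the timespan="PT1H" search used by check_log_end_marker.
QUERY = """
traces
| where message contains "integration-test-injection-error-stop-"
| order by timestamp desc
| take 1
"""

resp = requests.post(
    f"https://api.applicationinsights.io/v1/apps/{APP_ID}/query",
    headers={"x-api-key": API_KEY},
    json={"query": QUERY, "timespan": "PT1H"},
)
resp.raise_for_status()
rows = resp.json()["tables"][0]["rows"]
print("marker flushed" if rows else "still waiting; flushing can take ~3 minutes")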

View File

@@ -214,9 +214,6 @@ def on_worker_event_done(machine_id: UUID, event: WorkerDoneEvent) -> Result[Non
             node.debug_keep_node = True
             node.save()
     else:
-        logging.error(
-            "task failed. %s:%s status:%s", task.job_id, task.task_id, event.exit_status
-        )
         task.mark_failed(
             Error(
                 code=ErrorCode.TASK_FAILED,

View File

@@ -136,7 +136,7 @@ def get_extension(vm_name: str, extension_name: str) -> Optional[Any]:
             resource_group, vm_name, extension_name
         )
     except (ResourceNotFoundError, CloudError) as err:
-        logging.error("extension does not exist %s", err)
+        logging.info("extension does not exist %s", err)
         return None

View File

@@ -108,9 +108,12 @@ def get_queue_tasks() -> Sequence[Tuple[Task, Sequence[str]]]:
 def new_files(container: Container, filename: str) -> None:
-    report = get_report_or_regression(container, filename)
     notifications = get_notifications(container)
+    report = get_report_or_regression(
+        container, filename, expect_reports=bool(notifications)
+    )
     if notifications:
         done = []
         for notification in notifications:

View File

@@ -17,12 +17,15 @@ from .azure.storage import StorageType
 def parse_report_or_regression(
-    content: Union[str, bytes], file_path: Optional[str] = None
+    content: Union[str, bytes],
+    file_path: Optional[str] = None,
+    expect_reports: bool = False,
 ) -> Optional[Union[Report, RegressionReport]]:
     if isinstance(content, bytes):
         try:
             content = content.decode()
         except UnicodeDecodeError as err:
-            logging.error(
-                f"unable to parse report ({file_path}): "
-                f"unicode decode of report failed - {err}"
+            if expect_reports:
+                logging.error(
+                    f"unable to parse report ({file_path}): "
+                    f"unicode decode of report failed - {err}"
@@ -32,6 +35,7 @@ def parse_report_or_regression(
     try:
         data = json.loads(content)
     except json.decoder.JSONDecodeError as err:
-        logging.error(
-            f"unable to parse report ({file_path}): json decoding failed - {err}"
-        )
+        if expect_reports:
+            logging.error(
+                f"unable to parse report ({file_path}): json decoding failed - {err}"
+            )
@@ -46,6 +50,7 @@ def parse_report_or_regression(
     try:
         return Report.parse_obj(data)
     except ValidationError as err:
-        logging.error(
-            f"unable to parse report ({file_path}) as a report or regression. "
-            f"regression error: {regression_err} report error: {err}"
+        if expect_reports:
+            logging.error(
+                f"unable to parse report ({file_path}) as a report or regression. "
+                f"regression error: {regression_err} report error: {err}"
@@ -56,19 +61,23 @@ def parse_report_or_regression(
 # cache the last 1000 reports
 @cached(max_size=1000)
 def get_report_or_regression(
-    container: Container, filename: str
+    container: Container, filename: str, *, expect_reports: bool = False
 ) -> Optional[Union[Report, RegressionReport]]:
     file_path = "/".join([container, filename])
     if not filename.endswith(".json"):
-        logging.error("get_report invalid extension: %s", file_path)
+        if expect_reports:
+            logging.error("get_report invalid extension: %s", file_path)
         return None
     blob = get_blob(container, filename, StorageType.corpus)
     if blob is None:
-        logging.error("get_report invalid blob: %s", file_path)
+        if expect_reports:
+            logging.error("get_report invalid blob: %s", file_path)
         return None
-    return parse_report_or_regression(blob, file_path=file_path)
+    return parse_report_or_regression(
+        blob, file_path=file_path, expect_reports=expect_reports
+    )


 def get_report(container: Container, filename: str) -> Optional[Report]:
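One subtlety in the change above: get_report_or_regression is memoized, so the new keyword-only flag becomes part of the cache key, and a lookup with expect_reports=True is cached separately from one without it. A minimal sketch of that behavior, using functools.lru_cache as a stand-in for the project's @cached decorator (an assumption; the real decorator may key calls differently):

# Sketch: keyword arguments participate in the memoization key, so calls
# differing only in expect_reports do not share a cache entry.
from functools import lru_cache


@lru_cache(maxsize=1000)
def lookup(container: str, filename: str, *, expect_reports: bool = False) -> None:
    print(f"cache miss: {container}/{filename} expect_reports={expect_reports}")


lookup("my-container", "crash.json")                       # miss
lookup("my-container", "crash.json")                       # hit, no output
lookup("my-container", "crash.json", expect_reports=True)  # miss: separate entry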

View File

@@ -204,6 +204,8 @@ class Task(BASE_TASK, ORMMixin):
             )
             return

+        logging.error("task failed %s:%s - %s", self.job_id, self.task_id, error)
+
         self.error = error
         self.set_state(TaskState.stopping)

View File

@@ -24,10 +24,14 @@ class TestReportParse(unittest.TestCase):
         self.assertIsInstance(report, Report)

         with self.assertLogs(level="ERROR"):
-            self.assertIsNone(parse_report_or_regression('"invalid"'))
+            self.assertIsNone(
+                parse_report_or_regression('"invalid"', expect_reports=True)
+            )

         with self.assertLogs(level="WARNING") as logs:
-            self.assertIsNone(parse_report_or_regression(json.dumps(invalid)))
+            self.assertIsNone(
+                parse_report_or_regression(json.dumps(invalid), expect_reports=True)
+            )
         self.assertTrue(any(["unable to parse report" in x for x in logs.output]))

View File

@@ -17,6 +17,7 @@
 # checks on each of the created items for the stage. This batch processing
 # allows testing multiple components concurrently.

+import datetime
 import logging
 import os
 import re
@@ -26,6 +27,7 @@ from shutil import which
 from typing import Dict, List, Optional, Set, Tuple
 from uuid import UUID, uuid4

+import requests
 from onefuzz.api import Command, Onefuzz
 from onefuzz.backend import ContainerWrapper, wait
 from onefuzz.cli import execute_api
@@ -207,6 +209,8 @@ class TestOnefuzz:
         self.pools: Dict[OS, Pool] = {}
         self.test_id = test_id
         self.project = f"test-{self.test_id}"
+        self.start_log_marker = f"integration-test-injection-error-start-{self.test_id}"
+        self.stop_log_marker = f"integration-test-injection-error-stop-{self.test_id}"

     def setup(
         self,
@@ -215,6 +219,7 @@
         pool_size: int,
         os_list: List[OS],
     ) -> None:
+        self.inject_log(self.start_log_marker)
         for entry in os_list:
             name = PoolName(f"testpool-{entry.name}-{self.test_id}")
             self.logger.info("creating pool: %s:%s", entry.name, name)
@@ -686,6 +691,101 @@ class TestOnefuzz:
         if errors:
             raise Exception("cleanup failed")

+    def inject_log(self, message: str) -> None:
+        # This is an *extremely* minimal implementation of the Application
+        # Insights REST API, as discussed here:
+        #
+        # https://apmtips.com/posts/2017-10-27-send-metric-to-application-insights/
+        key = self.of.info.get().insights_instrumentation_key
+        assert key is not None, "instrumentation key required for integration testing"
+
+        data = {
+            "data": {
+                "baseData": {
+                    "message": message,
+                    "severityLevel": "Information",
+                    "ver": 2,
+                },
+                "baseType": "MessageData",
+            },
+            "iKey": key,
+            "name": "Microsoft.ApplicationInsights.Message",
+            "time": datetime.datetime.now(datetime.timezone.utc)
+            .astimezone()
+            .isoformat(),
+        }
+
+        requests.post(
+            "https://dc.services.visualstudio.com/v2/track", json=data
+        ).raise_for_status()
+
+    def check_log_end_marker(
+        self,
+    ) -> Tuple[bool, str, bool]:
+        logs = self.of.debug.logs.keyword(
+            self.stop_log_marker, limit=1, timespan="PT1H"
+        )
+        return (
+            len(logs) > 0,
+            "waiting for application insight logs to flush",
+            True,
+        )
+
+    def check_logs_for_errors(self) -> None:
+        # only check for errors that exist between the start and stop markers.
+        # also, only check the most recent 100,000 errors within the last 3
+        # hours. The records are scanned through in reverse chronological
+        # order.
+
+        self.inject_log(self.stop_log_marker)
+        wait(self.check_log_end_marker, frequency=5.0)
+        self.logger.info("application insights log flushed")
+
+        logs = self.of.debug.logs.keyword("error", limit=100000, timespan="PT3H")
+
+        seen_errors = False
+        seen_stop = False
+        for entry in logs:
+            message = entry.get("message", "")
+            if not seen_stop:
+                if self.stop_log_marker in message:
+                    seen_stop = True
+                continue
+
+            if self.start_log_marker in message:
+                break
+
+            # ignore logging.info coming from Azure Functions
+            if entry.get("customDimensions", {}).get("LogLevel") == "Information":
+                continue
+
+            # ignore warnings coming from the rust code, only be concerned
+            # about errors
+            if (
+                entry.get("severityLevel") == 2
+                and entry.get("sdkVersion") == "rust:0.1.5"
+            ):
+                continue
+
+            # ignore resource not found warnings from azure-functions layer,
+            # which relate to azure-retry issues
+            if (
+                message.startswith("Client-Request-ID=")
+                and "ResourceNotFound" in message
+                and entry.get("sdkVersion", "").startswith("azurefunctions")
+            ):
+                continue
+
+            if message is None:
+                self.logger.error("error log: %s", entry)
+            else:
+                self.logger.error("error log: %s", message)
+            seen_errors = True
+
+        if seen_errors:
+            raise Exception("logs included errors")
+

 class Run(Command):
     def check_jobs(
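The check_log_end_marker predicate above returns the (done, status message, ok) tuple that the wait helper imported from onefuzz.backend polls on. A minimal stand-in consistent with how the test invokes it, assuming only what the call sites show (the real helper may add timeouts and richer output):

# Hypothetical stand-in for onefuzz.backend.wait, matching the call
# wait(self.check_log_end_marker, frequency=5.0) in the diff above.
import time
from typing import Callable, Tuple


def wait_sketch(
    func: Callable[[], Tuple[bool, str, bool]], frequency: float = 1.0
) -> None:
    # Poll the predicate every `frequency` seconds, echoing its status
    # message, until it reports completion.
    while True:
        done, message, _ok = func()
        if done:
            return
        print(message)
        time.sleep(frequency)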
@@ -739,6 +839,11 @@ class Run(Command):
         tester = TestOnefuzz(self.onefuzz, self.logger, test_id=test_id)
         tester.cleanup()

+    def check_logs(self, test_id: UUID, *, endpoint: Optional[str]) -> None:
+        self.onefuzz.__setup__(endpoint=endpoint)
+        tester = TestOnefuzz(self.onefuzz, self.logger, test_id=test_id)
+        tester.check_logs_for_errors()
+
     def test(
         self,
         samples: Directory,
@@ -774,6 +879,9 @@ class Run(Command):
                 self.logger.warning("not testing crash repro")
             else:
                 self.check_repros(test_id, endpoint=endpoint)
+
+            self.check_logs(test_id, endpoint=endpoint)
+
         except Exception as e:
             self.logger.error("testing failed: %s", repr(e))
             error = e