add 'onefuzz debug log tail' (#401)

Adds `onefuzz debug log tail <keyword>`, which enables performing the same component in `onefuzz debug log keyword <keyword>` in a loop.  

Optimizations:
* only returns the N records at a time (default 1000)
* each query only returns records that occur after the latest record received.
* If no results are returned, waits 10s before retrying
* Increases the wait time by 1.5x until the wait time is larger than 60s

Using `--filter` provides the ability to filter each record that comes back via jmespath.

Example uses:

Monitor any log messages (which ignores metrics) for a given job_id GUID
```
onefuzz debug logs tail bf4efdfd-685c-444a-81c5-d911477433ae --filter message
```

Log the job_id and task_id for each new unique report:
```
onefuzz debug logs tail new_unique_report --filter '[customDimensions.job_id, customDimensions.task_id]'
```

Log the job_id and task_id for each new unique report only for the specific job_id:
```
onefuzz debug logs tail "new_unique_report d5bcd4d2-4dab-49d5-a215-66db94fb0309" --filter '[customDimensions.job_id, customDimensions.task_id]'
```
This commit is contained in:
bmc-msft
2021-01-04 16:08:27 -05:00
committed by GitHub
parent 29c7cfbd5d
commit f8f7e28aa2

View File

@ -9,10 +9,12 @@ import os
import shutil import shutil
import subprocess # nosec import subprocess # nosec
import tempfile import tempfile
from typing import Any, Dict, List, Optional, Tuple import time
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse from urllib.parse import urlparse
from uuid import UUID from uuid import UUID
import jmespath
from azure.applicationinsights import ApplicationInsightsDataClient from azure.applicationinsights import ApplicationInsightsDataClient
from azure.applicationinsights.models import QueryBody from azure.applicationinsights.models import QueryBody
from azure.common.client_factory import get_azure_cli_credentials from azure.common.client_factory import get_azure_cli_credentials
@ -20,7 +22,7 @@ from onefuzztypes.enums import ContainerType, TaskType
from onefuzztypes.models import BlobRef, NodeAssignment, Report, Task from onefuzztypes.models import BlobRef, NodeAssignment, Report, Task
from onefuzztypes.primitives import Directory from onefuzztypes.primitives import Directory
from onefuzz.api import UUID_EXPANSION, Command from onefuzz.api import UUID_EXPANSION, Command, Onefuzz
from .backend import wait from .backend import wait
from .rdp import rdp_connect from .rdp import rdp_connect
@ -29,6 +31,8 @@ from .ssh import ssh_connect
EMPTY_SHA256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" EMPTY_SHA256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
ZERO_SHA256 = "0" * len(EMPTY_SHA256) ZERO_SHA256 = "0" * len(EMPTY_SHA256)
DAY_TIMESPAN = "PT24H" DAY_TIMESPAN = "PT24H"
HOUR_TIMESPAN = "PT1H"
DEFAULT_TAIL_DELAY = 10.0
class DebugRepro(Command): class DebugRepro(Command):
@ -360,7 +364,13 @@ class DebugJob(Command):
class DebugLog(Command): class DebugLog(Command):
def _convert(self, raw_data: Any) -> Dict[str, List[Dict[str, Any]]]: def __init__(self, onefuzz: "Onefuzz", logger: logging.Logger):
self.onefuzz = onefuzz
self.logger = logger
self._client: Optional[ApplicationInsightsDataClient] = None
self._app_id: Optional[str] = None
def _convert(self, raw_data: Any) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
results = {} results = {}
for table in raw_data.tables: for table in raw_data.tables:
result = [] result = []
@ -376,10 +386,18 @@ class DebugLog(Command):
) )
result.append(converted) result.append(converted)
results[table.name] = result results[table.name] = result
if list(results.keys()) == ["PrimaryResult"]:
return results["PrimaryResult"]
return results return results
def query( def query(
self, log_query: str, *, timespan: str = DAY_TIMESPAN, raw: bool = False self,
log_query: str,
*,
timespan: Optional[str] = DAY_TIMESPAN,
raw: bool = False,
) -> Any: ) -> Any:
""" """
Perform an Application Insights query Perform an Application Insights query
@ -391,17 +409,20 @@ class DebugLog(Command):
:param str timespan: ISO 8601 duration format :param str timespan: ISO 8601 duration format
:param bool raw: Do not simplify the data result :param bool raw: Do not simplify the data result
""" """
if self._app_id is None:
self._app_id = self.onefuzz.info.get().insights_appid
if self._app_id is None:
raise Exception("instance does not have an insights_appid")
if self._client is None:
creds, _ = get_azure_cli_credentials( creds, _ = get_azure_cli_credentials(
resource="https://api.applicationinsights.io" resource="https://api.applicationinsights.io"
) )
client = ApplicationInsightsDataClient(creds) self._client = ApplicationInsightsDataClient(creds)
app_id = self.onefuzz.info.get().insights_appid
if app_id is None:
raise Exception("instance does not have an insights_appid")
self.logger.debug("query: %s", log_query) self.logger.debug("query: %s", log_query)
raw_data = client.query.execute( raw_data = self._client.query.execute(
app_id, body=QueryBody(query=log_query, timespan=timespan) self._app_id, body=QueryBody(query=log_query, timespan=timespan)
) )
if "error" in raw_data.additional_properties: if "error" in raw_data.additional_properties:
raise Exception( raise Exception(
@ -412,16 +433,32 @@ class DebugLog(Command):
return self._convert(raw_data) return self._convert(raw_data)
def _query_parts( def _query_parts(
self, parts: List[str], timespan: str, *, raw: bool = False self, parts: List[str], *, timespan: Optional[str] = None, raw: bool = False
) -> Any: ) -> Any:
log_query = " | ".join(parts) log_query = " | ".join(parts)
return self.query(log_query, timespan=timespan, raw=raw) return self.query(log_query, timespan=timespan, raw=raw)
def _build_keyword_query(
self, value: str, limit: Optional[int] = None, desc: bool = True
) -> List[str]:
# See https://docs.microsoft.com/en-us/azure/data-explorer/kql-quick-reference
components = ["union isfuzzy=true exceptions, traces, customEvents"]
value = value.strip()
keywords = ['* has "%s"' % (x.replace('"', '\\"')) for x in value.split(" ")]
if keywords:
components.append("where " + " and ".join(keywords))
order = "desc" if desc else "asc"
if limit:
components.append(f"take {limit}")
components.append(f"order by timestamp {order}")
return components
def keyword( def keyword(
self, self,
value: str, value: str,
*, *,
timespan: str = DAY_TIMESPAN, timespan: Optional[str] = DAY_TIMESPAN,
limit: Optional[int] = None, limit: Optional[int] = None,
raw: bool = False, raw: bool = False,
) -> Any: ) -> Any:
@ -434,22 +471,58 @@ class DebugLog(Command):
:param bool raw: Do not simplify the data result :param bool raw: Do not simplify the data result
""" """
# See https://docs.microsoft.com/en-us/azure/data-explorer/kql-quick-reference components = self._build_keyword_query(value, limit=limit)
components = ["union isfuzzy=true exceptions, traces, customEvents"]
value = value.strip()
keywords = ['* has "%s"' % (x.replace('"', '\\"')) for x in value.split(" ")]
if keywords:
components.append("where " + " and ".join(keywords))
components.append("order by timestamp desc")
if limit is not None:
components.append(f"take {limit}")
return self._query_parts(components, timespan=timespan, raw=raw) return self._query_parts(components, timespan=timespan, raw=raw)
def tail(
self,
value: str,
*,
limit: int = 1000,
indent: Optional[int] = None,
filter: Optional[str] = "[message, name, customDimensions]",
timespan: Optional[str] = HOUR_TIMESPAN,
) -> None:
"""
Perform an Application Insights keyword query akin to "Transaction Search"
:param str value: Keyword to query Application Insights
:param str indent: Specify indent for JSON printing
:param str limit: Limit the number of records to return in each query
:param str filter: JMESPath filter for streaming results
"""
expression = None
if filter:
expression = jmespath.compile(filter)
base_query = self._build_keyword_query(value, limit=limit, desc=False)
last_seen: Optional[str] = None
wait = DEFAULT_TAIL_DELAY
while True:
query = base_query.copy()
if last_seen is not None:
query.append(f'where timestamp > datetime("{last_seen}")')
results = self._query_parts(query, timespan=timespan)
if results:
last_seen = results[-1]["timestamp"]
for entry in results:
if expression is not None:
entry = expression.search(entry)
if entry:
print(json.dumps(entry, indent=indent, sort_keys=True))
wait = DEFAULT_TAIL_DELAY
else:
self.onefuzz.logger.debug("waiting %f seconds", wait)
time.sleep(wait)
if wait < 60:
wait *= 1.5
def _query_libfuzzer_coverage( def _query_libfuzzer_coverage(
self, query: str, timespan: str, limit: Optional[int] = None self, query: str, timespan: str, limit: Optional[int] = None
) -> Any: ) -> Any:
@ -471,10 +544,7 @@ class DebugLog(Command):
if limit: if limit:
query_parts.append(f"take {limit}") query_parts.append(f"take {limit}")
results = self.onefuzz.debug.logs._query_parts(query_parts, timespan=timespan) return self.onefuzz.debug.logs._query_parts(query_parts, timespan=timespan)
if "PrimaryResult" in results:
return results["PrimaryResult"]
return results
def _query_libfuzzer_execs_sec( def _query_libfuzzer_execs_sec(
self, self,
@ -500,10 +570,7 @@ class DebugLog(Command):
if limit: if limit:
query_parts.append(f"take {limit}") query_parts.append(f"take {limit}")
results = self.onefuzz.debug.logs._query_parts(query_parts, timespan=timespan) return self.onefuzz.debug.logs._query_parts(query_parts, timespan=timespan)
if "PrimaryResult" in results:
return results["PrimaryResult"]
return results
class DebugNotification(Command): class DebugNotification(Command):