add the ability to execute a debug script (#39)

This commit is contained in:
bmc-msft
2020-09-29 09:56:53 -04:00
committed by GitHub
parent 1864e8b6e4
commit bc9d80e34b

View File

@ -7,9 +7,9 @@ import json
import logging
import os
import re
import subprocess # nosec
import uuid
from shutil import which
from subprocess import call # nosec
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar
from uuid import UUID
@ -48,6 +48,12 @@ def is_uuid(value: str) -> bool:
A = TypeVar("A", bound=BaseModel)
def wsl_path(path: str) -> str:
if which("wslpath"):
return subprocess.check_output(["wslpath", "-w", path]).decode().strip()
return path
class Endpoint:
endpoint: str
@ -299,8 +305,94 @@ class Repro(Endpoint):
self.logger.debug("listing repro vms")
return self._req_model_list("GET", models.Repro, json_data={})
def connect(self, vm_id: UUID_EXPANSION, delete_after_use: bool = False) -> None:
def _dbg_linux(
self, repro: models.Repro, debug_command: Optional[str]
) -> Optional[str]:
""" Launch gdb with GDB script that includes 'target remote | ssh ...' """
if (
repro.auth is None
or repro.ip is None
or repro.state != enums.VmState.running
):
raise Exception("vm setup failed: %s" % repro.state)
with build_ssh_command(
repro.ip, repro.auth.private_key, command="-T"
) as ssh_cmd:
gdb_script = [
"target remote | %s sudo /onefuzz/bin/repro-stdout.sh"
% " ".join(ssh_cmd)
]
if debug_command:
gdb_script += [debug_command, "quit"]
with temp_file("gdb.script", "\n".join(gdb_script)) as gdb_script_path:
dbg = ["gdb", "--silent", "--command", gdb_script_path]
if debug_command:
dbg += ["--batch"]
try:
return subprocess.run(
dbg, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
).stdout.decode(errors="ignore")
except subprocess.CalledProcessError as err:
self.logger.error(
"debug failed: %s", err.output.decode(errors="ignore")
)
raise err
else:
subprocess.call(dbg)
return None
def _dbg_windows(
self, repro: models.Repro, debug_command: Optional[str]
) -> Optional[str]:
""" Setup an SSH tunnel, then connect via CDB over SSH tunnel """
if (
repro.auth is None
or repro.ip is None
or repro.state != enums.VmState.running
):
raise Exception("vm setup failed: %s" % repro.state)
bind_all = which("wslpath") is not None and repro.os == enums.OS.windows
proxy = "*:" + REPRO_SSH_FORWARD if bind_all else REPRO_SSH_FORWARD
with ssh_connect(repro.ip, repro.auth.private_key, proxy=proxy):
dbg = ["cdb.exe", "-remote", "tcp:port=1337,server=localhost"]
if debug_command:
dbg_script = [debug_command, "qq"]
with temp_file("db.script", "\r\n".join(dbg_script)) as dbg_script_path:
dbg += ["-cf", wsl_path(dbg_script_path)]
logging.debug("launching: %s", dbg)
try:
return subprocess.run(
dbg, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
).stdout.decode(errors="ignore")
except subprocess.CalledProcessError as err:
self.logger.error(
"debug failed: %s", err.output.decode(errors="ignore")
)
raise err
else:
logging.debug("launching: %s", dbg)
subprocess.call(dbg)
return None
def connect(
self,
vm_id: UUID_EXPANSION,
delete_after_use: bool = False,
debug_command: Optional[str] = None,
) -> Optional[str]:
""" Connect to an existing Reproduction VM """
self.logger.info("connecting to reproduction VM: %s", vm_id)
if which("ssh") is None:
@ -323,9 +415,6 @@ class Repro(Endpoint):
if which("gdb") is None:
raise Exception("unable to find gdb")
wslpath = which("wslpath")
bind_all = wslpath is not None and repro.os == enums.OS.windows
def func() -> Tuple[bool, str, models.Repro]:
repro = self.get(vm_id)
state = repro.state
@ -338,41 +427,35 @@ class Repro(Endpoint):
)
repro = wait(func)
if (
repro.auth is None
or repro.ip is None
or repro.state != enums.VmState.running
):
raise Exception("vm setup failed: %s" % repro.state)
proxy = "*:" + REPRO_SSH_FORWARD if bind_all else REPRO_SSH_FORWARD
result: Optional[str] = None
if repro.os == enums.OS.windows:
with ssh_connect(repro.ip, repro.auth.private_key, proxy=proxy):
call(["cdb.exe", "-remote", "tcp:port=1337,server=localhost"])
result = self._dbg_windows(repro, debug_command)
elif repro.os == enums.OS.linux:
result = self._dbg_linux(repro, debug_command)
else:
with build_ssh_command(
repro.ip, repro.auth.private_key, command="-T"
) as ssh_cmd:
cmd = " ".join(ssh_cmd)
command = "target remote | %s sudo /onefuzz/bin/repro-stdout.sh" % cmd
with temp_file("gdb.script", command) as gdb_script:
dbg = ["gdb", "--silent", "--command", gdb_script]
call(dbg)
raise NotImplementedError
if delete_after_use:
self.logger.debug("deleting vm %s", repro.vm_id)
self.delete(repro.vm_id)
return result
def create_and_connect(
self,
container: str,
path: str,
duration: int = 24,
delete_after_use: bool = False,
) -> None:
debug_command: Optional[str] = None,
) -> Optional[str]:
""" Create and connect to a Reproduction VM """
repro = self.create(container, path, duration=duration)
return self.connect(repro.vm_id, delete_after_use=delete_after_use)
return self.connect(
repro.vm_id, delete_after_use=delete_after_use, debug_command=debug_command
)
class Notifications(Endpoint):
@ -893,7 +976,7 @@ class Scaleset(Endpoint):
pool_name: str,
size: int,
*,
image: Optional[str],
image: Optional[str] = None,
vm_sku: Optional[str] = "Standard_D2s_v3",
region: Optional[str] = None,
spot_instances: bool = False,