add security auditing of python code using Bandit during CICD (#491)

This commit is contained in:
bmc-msft
2021-02-01 16:51:03 -05:00
committed by GitHub
parent 0f70ffa3e2
commit 5e2e9448df
12 changed files with 69 additions and 24 deletions

View File

@ -103,6 +103,7 @@ jobs:
cd src/cli
pip install -r requirements-lint.txt
flake8 .
bandit -r ./onefuzz/
black onefuzz examples tests --check
isort --profile black ./onefuzz ./examples/ ./tests/ --check
pytest -v tests
@ -208,6 +209,7 @@ jobs:
pip install -r requirements-dev.txt
pytest
flake8 .
bandit -r ./__app__/
black ./__app__/ ./tests --check
isort --profile black ./__app__/ ./tests --check
mypy __app__ ./tests

View File

@ -86,7 +86,9 @@ def choose_account(storage_type: StorageType) -> str:
# Use a random secondary storage account if any are available. This
# reduces IOP contention for the Storage Queues, which are only available
# on primary accounts
return random.choice(accounts[1:])
#
# security note: this is not used as a security feature
return random.choice(accounts[1:]) # nosec
@cached

View File

@ -4,3 +4,4 @@ mypy
isort
vulture
black
bandit

View File

@ -14,6 +14,7 @@ python setup.py sdist bdist_wheel
pip install -r requirements-lint.txt
black ./onefuzztypes --check
flake8 ./onefuzztypes
bandit -r ./onefuzztypes
isort --profile black ./onefuzztypes --check
mypy ./onefuzztypes --ignore-missing-imports
pytest -v tests

View File

@ -64,9 +64,13 @@ def is_uuid(value: str) -> bool:
A = TypeVar("A", bound=BaseModel)
def wsl_path(path: str) -> str:
def _wsl_path(path: str) -> str:
if which("wslpath"):
return subprocess.check_output(["wslpath", "-w", path]).decode().strip()
# security note: path should always be a temporary path constructed by
# this library
return (
subprocess.check_output(["wslpath", "-w", path]).decode().strip() # nosec
)
return path
@ -530,7 +534,9 @@ class Repro(Endpoint):
dbg += ["--batch"]
try:
return subprocess.run(
# security note: dbg is built from content coming from
# the server, which is trusted in this context.
return subprocess.run( # nosec
dbg, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
).stdout.decode(errors="ignore")
except subprocess.CalledProcessError as err:
@ -539,7 +545,9 @@ class Repro(Endpoint):
)
raise err
else:
subprocess.call(dbg)
# security note: dbg is built from content coming from the
# server, which is trusted in this context.
subprocess.call(dbg) # nosec
return None
def _dbg_windows(
@ -561,11 +569,13 @@ class Repro(Endpoint):
if debug_command:
dbg_script = [debug_command, "qq"]
with temp_file("db.script", "\r\n".join(dbg_script)) as dbg_script_path:
dbg += ["-cf", wsl_path(dbg_script_path)]
dbg += ["-cf", _wsl_path(dbg_script_path)]
logging.debug("launching: %s", dbg)
try:
return subprocess.run(
# security note: dbg is built from content coming from the server,
# which is trusted in this context.
return subprocess.run( # nosec
dbg, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
).stdout.decode(errors="ignore")
except subprocess.CalledProcessError as err:
@ -575,7 +585,9 @@ class Repro(Endpoint):
raise err
else:
logging.debug("launching: %s", dbg)
subprocess.call(dbg)
# security note: dbg is built from content coming from the
# server, which is trusted in this context.
subprocess.call(dbg) # nosec
return None

View File

@ -360,7 +360,12 @@ class DebugJob(Command):
if not os.path.exists(outdir):
os.makedirs(outdir)
self.logger.info("downloading: %s", name)
subprocess.check_output([azcopy, "sync", to_download[name], outdir])
# security note: the src for azcopy comes from the server which is
# trusted in this context, while the destination is provided by the
# user
subprocess.check_output( # nosec
[azcopy, "sync", to_download[name], outdir]
)
class DebugLog(Command):

View File

@ -68,9 +68,12 @@ class TemplateSubmitHandler(Endpoint):
if container_type == container.type:
seen = True
if not seen:
assert isinstance(request.user_fields["project"], str)
assert isinstance(request.user_fields["name"], str)
assert isinstance(request.user_fields["build"], str)
if not isinstance(request.user_fields["project"], str):
raise TypeError
if not isinstance(request.user_fields["name"], str):
raise TypeError
if not isinstance(request.user_fields["build"], str):
raise TypeError
container_name = _build_container_name(
self.onefuzz,
container_type,

View File

@ -25,7 +25,9 @@ def rdp_connect(ip: str, password: str, *, port: int) -> Generator:
)
encrypted = (
subprocess.check_output(
# security note: script includes content coming from the server,
# which is trusted in this context.
subprocess.check_output( # nosec
["powershell.exe", "-ExecutionPolicy", "Unrestricted", script]
)
.decode()
@ -46,5 +48,7 @@ def rdp_connect(ip: str, password: str, *, port: int) -> Generator:
os.chdir(tmpdir)
cmd = ["mstsc.exe", FILENAME]
logging.info("launching rdp: %s", " ".join(cmd))
yield subprocess.call(cmd)
# security note: mstsc.exe is called using a config with data coming
# from the server, which is trusted in this context.
yield subprocess.call(cmd) # nosec
os.chdir(previous)

View File

@ -33,7 +33,7 @@ def get_local_tmp() -> Optional[str]:
@contextmanager
def temp_file(
filename: str, content: str, *, permissions: Optional[str] = None
filename: str, content: str, *, set_owner_only: bool = False
) -> Generator:
with tempfile.TemporaryDirectory(dir=get_local_tmp()) as tmpdir:
full_path = os.path.join(tmpdir, filename)
@ -42,8 +42,10 @@ def temp_file(
with open(full_path, "w") as handle:
handle.write(content)
if permissions is not None and platform.system() != "Windows":
subprocess.check_call(["chmod", permissions, full_path])
if set_owner_only and platform.system() != "Windows":
# security note: full_path is created via callers using known static
# filenames within the newly created temporary file name
subprocess.check_call(["chmod", "600", full_path]) # nosec
yield full_path
@ -59,7 +61,7 @@ def build_ssh_command(
port: Optional[int] = None,
command: Optional[str] = None,
) -> Generator:
with temp_file("id_rsa", private_key, permissions="600") as ssh_key:
with temp_file("id_rsa", private_key, set_owner_only=True) as ssh_key:
cmd = [
"ssh",
"onefuzz@%s" % ip,
@ -102,10 +104,14 @@ def ssh_connect(
logging.info("launching ssh: %s", " ".join(cmd))
if call:
yield subprocess.call(cmd)
# security note: command includes user provided arguments
# intentionally
yield subprocess.call(cmd) # nosec
return
with subprocess.Popen(
# security note: command includes user provided arguments
# intentionally
with subprocess.Popen( # nosec
cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, bufsize=0
) as ssh:
yield ssh

View File

@ -80,7 +80,9 @@ class OssFuzz(Command):
dst_sas,
]
self.logger.info("copying base setup")
subprocess.check_output(cmd)
# security note: the source and destination container sas URLS are
# considerd trusted from the service
subprocess.check_output(cmd) # nosec
def _copy_exe(self, src_sas: str, dst_sas: str, target_exe: File) -> None:
files: List[File] = [target_exe]
@ -100,7 +102,9 @@ class OssFuzz(Command):
dst_url,
]
self.logger.info("uploading %s", path)
subprocess.check_output(cmd)
# security note: the source and destination container sas URLS
# are considerd trusted from the service
subprocess.check_output(cmd) # nosec
def libfuzzer(
self,
@ -141,7 +145,10 @@ class OssFuzz(Command):
container_sas[name] = sas_url
self.logger.info("uploading build artifacts")
subprocess.check_output(
# security note: the container sas is considered trusted from the
# service
subprocess.check_output( # nosec
[
"azcopy",
"sync",
@ -151,7 +158,7 @@ class OssFuzz(Command):
"*fuzzer_seed_corpus.zip",
]
)
subprocess.check_output(
subprocess.check_output( # nosec
[
"azcopy",
"sync",

View File

@ -4,3 +4,4 @@ pytest
isort
vulture
black
bandit

View File

@ -5,3 +5,4 @@ isort
pydantic
vulture
black
bandit