#67 Add systems tests for HIRS Provisioner TPM 2.0 (#73)

* Added System Tests.

* Cleaned up scripts

* Cleaned up system tests.

* Cleaned up system tests.

* Cleaned up system tests.

* Updated system tests.

* Code review updates.
This commit is contained in:
busaboy1340 2019-01-15 12:46:06 -05:00 committed by GitHub
parent 05a78a3d79
commit 17d7dbd6f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1526 additions and 125 deletions

10
.ci/docker/.env Normal file
View File

@ -0,0 +1,10 @@
HIRS_ACA_PROVISIONER_IP=172.19.0.3
TPM_ENABLED=true
IMA_ENABLED=false
HIRS_ACA_PORTAL_IP=172.19.0.2
HIRS_ACA_PORTAL_PORT=8443
HIRS_BROKER_PORT=61616
HIRS_ACA_PORTAL_CONTAINER_PORT=80
HIRS_SUBNET=172.19.0.0/16

View File

@ -1,17 +1,46 @@
version: "3.1"
services:
aca:
image: hirs/hirs-ci:aca
ports:
- "8443:8443"
volumes:
- ../../:/HIRS
command: /HIRS/.ci/integration-tests/setup-aca.sh
tpm2provisioner:
image: hirs/hirs-ci:tpm2provisioner
depends_on:
- "aca"
volumes:
- ../../:/HIRS
network_mode: "host"
command: /HIRS/.ci/integration-tests/setup-tpm2provisioner.sh
aca:
image: hirs/hirs-ci:aca
container_name: hirs-aca
volumes:
- ../../:/HIRS
ports:
- "${HIRS_ACA_PORTAL_PORT}:${HIRS_ACA_PORTAL_CONTAINER_PORT}"
entrypoint: /bin/bash -c
command: [HIRS/.ci/integration-tests/setup-aca.sh]
networks:
hirs_aca_system_tests:
ipv4_address: ${HIRS_ACA_PORTAL_IP}
tpm2provisioner:
image: hirs/hirs-ci:tpm2provisioner
container_name: hirs-aca-provisioner-tpm2
depends_on:
- aca
volumes:
- ../../:/HIRS
entrypoint: /bin/bash -c
command: [HIRS/.ci/integration-tests/setup-tpm2provisioner.sh;
HIRS/.ci/system-tests/systems-test-centos7-tpm2.sh]
networks:
hirs_aca_system_tests:
ipv4_address: ${HIRS_ACA_PROVISIONER_IP}
environment:
- HIRS_ACA_PROVISIONER_IP=${HIRS_ACA_PROVISIONER_IP}
- TPM_ENABLED=${TPM_ENABLED}
- IMA_ENABLED=${IMA_ENABLED}
- HIRS_ACA_PORTAL_IP=${HIRS_ACA_PORTAL_IP}
- HIRS_ACA_PORTAL_PORT=${HIRS_ACA_PORTAL_PORT}
- HIRS_BROKER_PORT=${HIRS_BROKER_PORT}
- HIRS_ACA_PORTAL_CONTAINER_PORT=${HIRS_ACA_PORTAL_CONTAINER_PORT}
- HIRS_SUBNET=${HIRS_SUBNET}
networks:
hirs_aca_system_tests:
driver: bridge
ipam:
driver: default
config:
- subnet: ${HIRS_SUBNET}

View File

@ -1,33 +0,0 @@
#!/bin/bash
# Script to run the Integration Tests for HIRS
set -e
# Start Integration Testing Docker Environment
docker-compose -f .ci/docker/docker-compose.yml up -d
# Check to see if Environment Stand-Up is Complete
# TODO: Refine to handle multiple container IDs
container_id_regex='([a-f0-9]{12})\s+hirs\/hirs-ci:tpm2provisioner'
while : ; do
docker_containers=$(docker container ls)
if [[ $docker_containers =~ $container_id_regex ]]; then
container_id=${BASH_REMATCH[1]}
break
fi
echo "Containers not found. Waiting 5 seconds."
sleep 5
done
tpm2_provisioner_started_regex='TPM2 Provisioner Loaded!'
while : ; do
docker_logs=$(docker logs $container_id)
if [[ $docker_logs =~ $tpm2_provisioner_started_regex ]]; then
break
fi
echo "Containers not completely booted. Waiting 10 seconds."
sleep 10
done
echo "Environment Stand-Up Complete!"

View File

@ -1,89 +1,143 @@
#!/bin/bash
# Script to setup the TPM2 Provisioner Docker Image for Integration Tests
set -e
# Wait for ACA to boot
until [ "`curl --silent --connect-timeout 1 -I -k https://localhost:8443/HIRS_AttestationCAPortal | grep '302 Found'`" != "" ]; do
:
echo "Waiting for ACA to spin up..."
until [ "`curl --silent --connect-timeout 1 -I -k https://${HIRS_ACA_PORTAL_IP}:${HIRS_ACA_PORTAL_PORT}/HIRS_AttestationCAPortal | grep '302 Found'`" != "" ]; do
:
done
echo "ACA is up!"
pushd /HIRS
if [ ! -d package/rpm/RPMS ]; then
./package/package.centos.sh
fi
yum install -y package/rpm/RPMS/x86_64/HIRS_Provisioner_TPM_2_0*.el7.x86_64.rpm
popd
# Function to install provisioner packages.
function InstallProvisioner {
echo "===========Installing Provisioner Packages...==========="
pushd /HIRS
if [ ! -d package/rpm/RPMS ]; then
./package/package.centos.sh
fi
yum install -y package/rpm/RPMS/x86_64/HIRS_Provisioner_TPM_2_0*.el7.x86_64.rpm
popd
}
mkdir -p /var/run/dbus
if [ -e /var/run/dbus/pid ]; then
rm /var/run/dbus/pid
fi
# Function to initialize the TPM2 Emulator
function InitTpmEmulator {
echo "===========Initializing TPM2 Emulator...==========="
mkdir -p /var/run/dbus
if [ -e /var/run/dbus/pid ]; then
rm /var/run/dbus/pid
fi
if [ -e /var/run/dbus/system_bus_socket ]; then
rm /var/run/dbus/system_bus_socket
fi
# Start the DBus
dbus-daemon --fork --system
echo "DBus started"
# Give DBus time to start up
sleep 5
/ibmtpm/src/./tpm_server &
echo "TPM Emulator started"
tpm2-abrmd -t socket &
echo "TPM2-Abrmd started"
# Give ABRMD time to start and register on the DBus
sleep 5
# EK and PC Certificate
ek_cert_der="/HIRS/.ci/integration-tests/certs/ek_cert.der"
platform_cert="platformAttributeCertificate.pem"
echo "Creating Platform Cert for Container."
PC_DIR=/var/hirs/pc_generation
mkdir -p $PC_DIR
/opt/paccor/scripts/allcomponents.sh > $PC_DIR/componentsFile
/opt/paccor/scripts/referenceoptions.sh > $PC_DIR/optionsFile
/opt/paccor/scripts/otherextensions.sh > $PC_DIR/extensionsFile
/opt/paccor/bin/observer -c $PC_DIR/componentsFile -p $PC_DIR/optionsFile -e $ek_cert_der -f $PC_DIR/observerFile
/opt/paccor/bin/signer -o $PC_DIR/observerFile -x $PC_DIR/extensionsFile -b 20180101 -a 20280101 -N $RANDOM -k /HIRS/.ci/integration-tests/certs/ca.key -P /HIRS/.ci/integration-tests/certs/ca.crt --pem -f $PC_DIR/$platform_cert
if tpm2_nvlist | grep -q 0x1c00002; then
echo "Released NVRAM for EK."
tpm2_nvrelease -x 0x1c00002 -a 0x40000001
fi
# Define nvram space to enable loading of EK cert (-x NV Index, -a handle to
# authorize [0x40000001 = ownerAuth handle], -s size [defaults to 2048], -t
# specifies attribute value in publicInfo struct
# [0x2000A = ownerread|ownerwrite|policywrite])
size=$(cat $ek_cert_der | wc -c)
echo "Define NVRAM location for EK cert of size $size."
tpm2_nvdefine -x 0x1c00002 -a 0x40000001 -t 0x2000A -s $size
# Load key into TPM nvram
echo "Loading EK cert into NVRAM."
tpm2_nvwrite -x 0x1c00002 -a 0x40000001 $ek_cert_der
if tpm2_nvlist | grep -q 0x1c90000; then
echo "Released NVRAM for PC."
tpm2_nvrelease -x 0x1c90000 -a 0x40000001
fi
# Store the platform certificate in the TPM's NVRAM
size=$(cat $PC_DIR/$platform_cert | wc -c)
echo "Define NVRAM location for PC cert of size $size."
tpm2_nvdefine -x 0x1c90000 -a 0x40000001 -t 0x2000A -s $size
echo "Loading PC cert into NVRAM."
tpm2_nvwrite -x 0x1c90000 -a 0x40000001 $PC_DIR/$platform_cert
echo "===========TPM2 Emulator Initialization Complete!==========="
# Set Logging to INFO Level
sed -i "s/WARN/INFO/" /etc/hirs/TPM2_Provisioner/log4cplus_config.ini
}
if [ -e /var/run/dbus/system_bus_socket ]; then
rm /var/run/dbus/system_bus_socket
fi
# Function to update the hirs-site.config file
function UpdateHirsSiteConfigFile {
HIRS_SITE_CONFIG="/etc/hirs/hirs-site.config"
echo ""
echo "===========Updating ${HIRS_SITE_CONFIG}, using values from /HIRS/.ci/docker/.env file...==========="
cat /HIRS/.ci/docker/.env
cat <<DEFAULT_SITE_CONFIG_FILE > $HIRS_SITE_CONFIG
#*******************************************
#* HIRS site configuration properties file
#*******************************************
CLIENT_HOSTNAME=${HIRS_ACA_PROVISIONER_IP}
TPM_ENABLED=${TPM_ENABLED}
IMA_ENABLED=${IMA_ENABLED}
# Site-specific configuration
ATTESTATION_CA_FQDN=${HIRS_ACA_PORTAL_IP}
ATTESTATION_CA_PORT=${HIRS_ACA_PORTAL_PORT}
BROKER_FQDN=${HIRS_ACA_PORTAL_IP}
BROKER_PORT=${HIRS_BROKER_PORT}
PORTAL_FQDN=${HIRS_ACA_PORTAL_IP}
PORTAL_PORT=${HIRS_ACA_PORTAL_PORT}
DEFAULT_SITE_CONFIG_FILE
# Start the DBus
dbus-daemon --fork --system
echo "DBus started"
echo "===========New HIRS Config File==========="
cat /etc/hirs/hirs-site.config
}
# Give DBus time to start up
sleep 5
# Install packages
InstallProvisioner
/ibmtpm/src/./tpm_server &
echo "TPM Emulator started"
# Install TPM Emulator
InitTpmEmulator
tpm2-abrmd -t socket &
echo "TPM2-Abrmd started"
# Update the hir-site.config file
UpdateHirsSiteConfigFile
# Give ABRMD time to start and register on the DBus
sleep 5
# EK and PC Certificate
ek_cert_der="/HIRS/.ci/integration-tests/certs/ek_cert.der"
platform_cert="platformAttributeCertificate.pem"
echo "Creating Platform Cert for Container"
PC_DIR=/var/hirs/pc_generation
mkdir -p $PC_DIR
/opt/paccor/scripts/allcomponents.sh > $PC_DIR/componentsFile
/opt/paccor/scripts/referenceoptions.sh > $PC_DIR/optionsFile
/opt/paccor/scripts/otherextensions.sh > $PC_DIR/extensionsFile
/opt/paccor/bin/observer -c $PC_DIR/componentsFile -p $PC_DIR/optionsFile -e $ek_cert_der -f $PC_DIR/observerFile
/opt/paccor/bin/signer -o $PC_DIR/observerFile -x $PC_DIR/extensionsFile -b 20180101 -a 20280101 -N $RANDOM -k /HIRS/.ci/integration-tests/certs/ca.key -P /HIRS/.ci/integration-tests/certs/ca.crt --pem -f $PC_DIR/$platform_cert
# Release EK Cert if one exists
if tpm2_nvlist | grep -q 0x1c00002; then
tpm2_nvrelease -x 0x1c00002 -a 0x40000001
fi
# Define nvram space to enable loading of EK cert (-x NV Index, -a handle to
# authorize [0x40000001 = ownerAuth handle], -s size [defaults to 2048], -t
# specifies attribute value in publicInfo struct
# [0x2000A = ownerread|ownerwrite|policywrite])
size=$(cat $ek_cert_der | wc -c)
echo "Define nvram location for ek cert of size $size"
tpm2_nvdefine -x 0x1c00002 -a 0x40000001 -t 0x2000A -s $size
# Load EK Cert into TPM nvram
echo "Load ek cert into nvram"
tpm2_nvwrite -x 0x1c00002 -a 0x40000001 $ek_cert_der
# Release Platform Cert if one exists
if tpm2_nvlist | grep -q 0x1c90000; then
tpm2_nvrelease -x 0x1c90000 -a 0x40000001
fi
# Store the platform certificate in the TPM's NVRAM
echo "Load platform cert into nvram"
tpm2_nvdefine -x 0x1c90000 -a 0x40000001 -t 0x2000A -s $(cat $PC_DIR/$platform_cert | wc -c)
tpm2_nvwrite -x 0x1c90000 -a 0x40000001 $PC_DIR/$platform_cert
# Set Logging to INFO Level
sed -i "s/WARN/INFO/" /etc/hirs/TPM2_Provisioner/log4cplus_config.ini
echo "TPM2 Provisioner Loaded!"
tail -f /dev/null
echo ""
echo "===========HIRS ACA Provisioner Setup Complete!==========="

View File

@ -0,0 +1,41 @@
#!/bin/bash
# Script to run the System Tests for HIRS
set -e
echo ""
echo "System Tests Starting..."
echo ""
# Start System Testing Docker Environment
cd .ci/docker
docker-compose up -d
tpm2_container_id="$(docker ps -aqf "name=hirs-aca-provisioner-tpm2")"
echo "TPM2 Container ID: $tpm2_container_id"
tpm2_container_status="$(docker inspect $tpm2_container_id --format='{{.State.Status}}')"
echo "TPM2 Container Status: $tpm2_container_status"
while [ $tpm2_container_status == "running" ]
do
sleep 10
# Add status message, so Travis will not time out.
# It may timeout if it has'nt received output for more than 10 minutes.
echo "Still running tests, please wait..."
tpm2_container_status="$(docker inspect $tpm2_container_id --format='{{.State.Status}}')"
done
echo ""
echo "===========hirs-aca-provisioner-tpm2 System Tests Log:==========="
docker logs $tpm2_container_id
echo ""
echo "End of System Tests, cleaning up..."
echo ""
docker-compose down

View File

@ -0,0 +1,769 @@
# system_test.py - implements a group of tests that run appraisals on a client and server
# TODO: test_01-test_11 will need to be implemented when the additional HIRS
# projects are imported to the new GitHub repo. The test code is commented out for now.
import binascii
from ConfigParser import SafeConfigParser
import datetime
import json
import os
import shlex
import subprocess
import unittest
import re
import requests
import logging
import pprint
import hashlib
import random
import uuid
import time
import sys
import argparse
from system_test_core import HIRSPortal, AttestationCAPortal, collectors, \
send_command, send_command_sha1sum, run_hirs_report, \
run_hirs_provisioner_tpm2, parse_xml_with_stripped_namespaces, get_current_timestamp, \
get_all_nodes_recursively, touch_random_file_and_remove, get_random_pcr_hex_value, \
is_ubuntu_client, is_tpm2,\
DEFAULT_IMA_POLICY, DEFAULT_TPM_POLICY
NUMBER_OF_PCRS = 24
suffix = os.environ.get('RANDOM_SYS_TEST_ID')
if suffix != None:
print "Configuring with suffix " + suffix
suffix = "-" + suffix
else:
suffix = ""
COLLECTOR_LIST = os.environ.get('ENABLED_COLLECTORS').split(',')
CLIENT = os.environ.get('CLIENT_HOSTNAME')
CLIENT_OS = os.environ.get('CLIENT_OS')
TPM_VERSION = os.environ.get('TPM_VERSION')
HIRS_SERVER_URL = "https://TBD/HIRS_Portal/"
HIRS_ATTESTATION_CA_PORTAL_URL = "https://" + \
os.environ.get('HIRS_ACA_PORTAL_IP') +":" + \
os.environ.get('HIRS_ACA_PORTAL_PORT') + \
"/HIRS_AttestationCAPortal/"
TEST_LOG_FILE = os.environ.get('TEST_LOG')
LOG_LEVEL = os.environ.get('LOG_LEVEL')
CA_CERT_LOCATION = "/HIRS/.ci/integration-tests/certs/ca.crt"
EK_CA_CERT_LOCATION = "/HIRS/.ci/integration-tests/certs/ek_cert.der"
USB_STORAGE_FILE_HASH = "e164c378ceb45a62642730be5eb3169a6bfc2d6d"
USB_STORAGE_FILE_HASH_2 = "e164c378ceb45a62642730be5eb3169a6bfc1234"
FORMAT = "%(asctime)-15s %(message)s"
provisioner_out = None
logging.basicConfig(filename=TEST_LOG_FILE,level=eval(LOG_LEVEL), format=FORMAT)
logging.info("*****************beginning of system_test.py*****************")
logging.info("The ACA Portal is: " + HIRS_ATTESTATION_CA_PORTAL_URL)
Portal = HIRSPortal(HIRS_SERVER_URL)
AcaPortal = AttestationCAPortal(HIRS_ATTESTATION_CA_PORTAL_URL)
requests.packages.urllib3.disable_warnings()
class SystemTest(unittest.TestCase):
@classmethod
def setUpClass(self):
"""Set the class up"""
def setUp(self):
"""Set the systems tests state up for testing"""
# Portal.set_group_appraisal_wait_setting()
AcaPortal.disable_supply_chain_validations()
def tearDown(self):
"""Tears down the state for testing"""
@collectors(['IMA', 'TPM'], COLLECTOR_LIST)
def test_01_empty_baselines(self):
"""Test that appraisal succeeds with empty IMA and TPM baselines"""
logging.info("*****************beginning of empty baseline test*****************")
# Portal.set_default_policies(ima_policy=DEFAULT_IMA_POLICY, tpm_policy=DEFAULT_TPM_POLICY)
# result = run_hirs_report(CLIENT)
# self.assertTrue(result)
# self.assertEqual(0, Portal.get_alert_count_from_latest_report())
@collectors(['IMA'], COLLECTOR_LIST)
def test_02_small_ima_appraisal(self):
"""Test that appraisal works with a small hard-coded IMA baseline
steps:
- upload a small hard-coded required set (two records)
- make a policy that points to that baseline as its required set
- set the default device group to point to that policy
- run a report from the client machine using vagrant ssh
"""
logging.info("*****************beginning of small IMA appraisal test*****************")
# baseline = make_simple_ima_baseline()
# policy_name = Portal.add_ima_policy(required_set=baseline, policy_name_prefix='small_ima')
# Portal.set_default_policies(ima_policy=policy_name)
# result = run_hirs_report(CLIENT)
# self.assertTrue(result)
@collectors(['IMA'], COLLECTOR_LIST)
def test_03_large_ima_appraisal(self):
"""Test that appraisal works with a full-size IMA baseline
steps:
- generate an XML report or use a cached one
- convert the IMA part of the report into a csv baseline
- upload the csv file as an IMA baseline
- make a policy that points to that baseline as its required set
- set the default device group to point to that policy
- run a report from the client machine using vagrant ssh
"""
logging.info("*****************beginning of large IMA appraisal test*****************")
# empty_ima_policy = Portal.add_ima_policy(required_set=None, policy_name_prefix="empty")
# Portal.set_default_policies(ima_policy=empty_ima_policy,
# tpm_policy=DEFAULT_TPM_POLICY)
# run_hirs_report(CLIENT)
# xml_report = Portal.get_latest_report()
# baseline = make_baseline_from_xml(xml_report, "IMA")
# policy_name = Portal.add_ima_policy(required_set=baseline, unknown_fail="true", policy_name_prefix="large_ima")
# Portal.set_default_policies(ima_policy=policy_name)
# result = run_hirs_report(CLIENT)
# after_alerts = Portal.get_alerts_from_latest_report()
# new_alert_count = after_alerts['recordsTotal']
# logging.info("{0} new alerts generated by latest report".format(new_alert_count))
# if new_alert_count > 0:
# logging.warning("new alert count: " + str(new_alert_count))
# #logging.debug("new alerts:\n{0}".format(pprint.pformat(after_alerts['data'][0:new_alert_count])))
# self.assertTrue(True)
@collectors(['IMA'], COLLECTOR_LIST)
def test_04_small_ima_appraisal_required_set_missing(self):
"""Test that appraisal results in an appropriate alert generation when a required set file is missing
steps:
- upload a small hard-coded required set (two records)
- add a fictitious file to the baseline
- make a policy that points to that baseline as its required set
- set the default device group to point to that policy
- run a report from the client machine using vagrant ssh
- make sure it failed and that one appropriate alert was thrown
"""
logging.info("*****************beginning of small IMA appraisal test with required set missing*****************")
# baseline = make_simple_ima_baseline()
# baseline["name"] = "ima_baseline_missing_required_record_{0}".format(get_current_timestamp())
# random_hash = str(hashlib.sha1(str(random.random())).hexdigest())
# missing_file = "/required_directory/required_file"
# baseline["records"].append({"path": missing_file, "hash": random_hash})
# policy_name = Portal.add_ima_policy(required_set=baseline, policy_name_prefix="small_ima_req")
# Portal.set_default_policies(ima_policy=policy_name)
#
# result = run_hirs_report(CLIENT)
# self.assertFalse(result)
# after_alerts = Portal.get_alerts_from_latest_report()
# new_alert_count = after_alerts['recordsTotal']
# self.assertEqual(new_alert_count, 1)
#
# # find the alert with the most recent createTime
# latest_alert = max(after_alerts['data'], key=lambda alert: alert['createTime'])
# self.assertTrue("MISSING_RECORD" in latest_alert['type'])
# self.assertTrue(random_hash in latest_alert['expected'])
# self.assertTrue(missing_file in latest_alert['expected'])
@collectors(['TPM'], COLLECTOR_LIST)
def test_05_tpm_white_list_appraisal(self):
"""Test that appraisal works with a TPM white list baseline
steps:
- run hirs report to generate an XML report for baseline creation
- download the latest report in XML format
- convert the TPM part of the report into a json baseline
- make a policy that points to that json TPM white list baseline
- set the default device group to point to that policy
- run a report from the client machine
"""
logging.info("*****************beginning of TPM white list appraisal test*****************")
# empty_ima_policy = Portal.add_ima_policy(required_set=None)
# Portal.set_default_policies(ima_policy=empty_ima_policy,
# tpm_policy=DEFAULT_TPM_POLICY)
# result = run_hirs_report(CLIENT)
# self.assertTrue(result)
# xml_report = Portal.get_latest_report()
# baseline = make_baseline_from_xml(xml_report, "TPM")
# policy_name = Portal.add_tpm_wl_policy(baseline, policy_name_prefix="good")
# Portal.set_default_policies(tpm_policy=policy_name)
# result = run_hirs_report(CLIENT)
# self.assertTrue(result)
# self.assertEqual(0, Portal.get_alert_count_from_latest_report())
#
# # create a new baseline with random PCR values
# baseline_bad_tpm_pcr = make_baseline_from_xml(xml_report, "TPM")
# for pcr_index in range(0, NUMBER_OF_PCRS):
# baseline_bad_tpm_pcr["records"][pcr_index]["hash"] = get_random_pcr_hex_value()
#
# policy_name = Portal.add_tpm_wl_policy(baseline_bad_tpm_pcr, policy_name_prefix='bad_vals')
# Portal.set_default_policies(tpm_policy=policy_name)
# result = run_hirs_report(CLIENT)
# self.assertFalse(result)
# self.assertEqual(NUMBER_OF_PCRS, Portal.get_alert_count_from_latest_report())
#
# after_alerts = Portal.get_alerts()
#
# # for the set of new alerts, verify the alert fields for each PCR value
# # the order of the alerts it not necessarily PCR 0, 1, 2... , so we must index
# # in to the hash table correctly
# for alert_index in range(0, NUMBER_OF_PCRS):
# pcr_alert = after_alerts["data"][alert_index]
# alert_details = pcr_alert["details"]
# pcr_int = int(re.findall(r'\d+', alert_details)[0])
#
# logging.info("Checking TPM alert for PCR %s", pcr_int)
#
# self.assertTrue("WHITE_LIST_PCR_MISMATCH" in pcr_alert['type'])
# self.assertTrue("TPM_APPRAISER" in pcr_alert['source'])
# baseline_hash = baseline_bad_tpm_pcr["records"][pcr_int]["hash"]
# reported_hash = baseline["records"][pcr_int]["hash"]
#
# self.assertTrue(baseline_hash in pcr_alert['expected'])
# self.assertTrue(reported_hash in pcr_alert['received'])
@collectors(['IMA'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_06_ima_blacklist_appraisal(self):
"""Test that appraisal works with a small IMA blacklist baseline
steps:
- upload a policy with a small hard-coded blacklist baseline
- set the default device group to point to that policy
- run a report from the client machine and ensure the appraisal passes
- touch a file on the client that is contained in the blacklist
- run another report from the client machine and ensure the appraisal fails
"""
logging.info("*****************beginning of blacklist IMA appraisal test*****************")
# baseline = make_simple_ima_blacklist_baseline()
# policy_name = Portal.add_ima_policy(blacklist=baseline, policy_name_prefix='small_ima_blacklist')
# Portal.set_default_policies(ima_policy=policy_name)
#
# result = run_hirs_report(CLIENT)
# self.assertTrue(result)
#
# send_command('touch /boot/usb-storage-foo.ko')
# #send_command('sudo cat /tmp/usb-storage-foo.ko')
# result = run_hirs_report(CLIENT)
# self.assertFalse(result)
#
# after_alerts = Portal.get_alerts_from_latest_report()
# new_alert_count = after_alerts['recordsTotal']
# self.assertEqual(new_alert_count, 1)
#
# # find the alert with the most recent createTime
# latest_alert = after_alerts['data'][0]
# self.assertTrue("IMA_BLACKLIST_PATH_MATCH" in latest_alert['type'])
# self.assertTrue("usb-storage-foo.ko" in latest_alert['expected'])
#
# #
# # create ima blacklist baseline that contains a hash and generate alert upon detection
# #
#
# # create file and add content to file
# send_command('touch /tmp/usb-storage_2.ko')
# send_command('echo blacklist >> /tmp/usb-storage_2.ko')
# policy_name = Portal.add_ima_policy(blacklist=None,
# policy_name_prefix='empty')
# Portal.set_default_policies(ima_policy=policy_name)
#
# # send report to verify successful appraisal
# result = run_hirs_report(CLIENT)
# self.assertTrue(result)
#
# # create blacklist baseline with hash and update policy
# baseline = make_simple_ima_blacklist_baseline_with_hash();
# policy_name = Portal.add_ima_policy(blacklist=baseline,
# policy_name_prefix='small_ima_blacklist_with_hash')
# Portal.set_default_policies(ima_policy=policy_name)
#
# # trigger measurement of file and run hirs report
# send_command('sudo cat /tmp/usb-storage_2.ko')
# result = run_hirs_report(CLIENT)
# self.assertFalse(result)
#
# after_alerts = Portal.get_alerts_from_latest_report()
# new_alert_count = after_alerts['recordsTotal']
# self.assertEqual(new_alert_count, 1)
#
# # find the alert with the most recent createTime
# latest_alert = after_alerts['data'][0]
# self.assertTrue("IMA_BLACKLIST_HASH_MATCH" in latest_alert['type'])
# self.assertTrue(USB_STORAGE_FILE_HASH in latest_alert['expected'])
#
# #
# # create ima blacklist baseline that contains a file and hash and generate alert upon detection
# #
# policy_name = Portal.add_ima_policy(blacklist=None,
# policy_name_prefix='empty')
# Portal.set_default_policies(ima_policy=policy_name)
#
# # send report to verify successful appraisal
# result = run_hirs_report(CLIENT)
# self.assertTrue(result)
#
# # create blacklist baseline with file and hash and update policy
# baseline = make_simple_ima_blacklist_baseline_with_file_and_hash();
# policy_name = Portal.add_ima_policy(blacklist=baseline,
# policy_name_prefix='small_ima_blacklist_with_file_and_hash')
# Portal.set_default_policies(ima_policy=policy_name)
#
# result = run_hirs_report(CLIENT)
# self.assertFalse(result)
#
# after_alerts = Portal.get_alerts_from_latest_report()
# new_alert_count = after_alerts['recordsTotal']
# self.assertEqual(new_alert_count, 1)
#
# # find the alert with the most recent createTime
# latest_alert = after_alerts['data'][0]
# self.assertTrue("IMA_BLACKLIST_PATH_AND_HASH_MATCH" in latest_alert['type'])
# self.assertTrue("usb-storage_2.ko" in latest_alert['expected'])
# self.assertTrue(USB_STORAGE_FILE_HASH in latest_alert['expected'])
#
# #
# # change ima blacklist baseline file and hash and verify alert is not generated
# #
#
# # create blacklist baseline with file and hash and update policy
# baseline = make_simple_ima_blacklist_baseline_with_updated_file_and_hash();
# policy_name = Portal.add_ima_policy(blacklist=baseline,
# policy_name_prefix='small_ima_blacklist_with_updated_file_and_hash')
# Portal.set_default_policies(ima_policy=policy_name)
#
# result = run_hirs_report(CLIENT)
# self.assertTrue(result)
@collectors(['IMA'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_07_delta_reports_required_set(self):
"""Test that appraisal works with delta reports and required sets.
steps:
- Run hirs report with an empty required set and delta reports
enabled
- Check first report for success and to make sure the test files
are not there
- Add the two test files (foo-file and foo-bar-file) to the required
set with a hashes that indicates the files are empty
- create foo-file and read it as root so it is measured by IMA
- Run second hirs report
- Check for failed appraisal (foo-bar-file hasn't been created yet)
- Check that the report includes foo-file, but not foo-bar-file
- Create foo-bar-file and read it as root
- Run third hirs report
- Check for failed appraisal (foo-file was in the previous report,
so it won't be included in this one.
- Check that foo-bar-file is in this report, but not foo-file
"""
logging.info("*****************beginning of Delta Reports required set appraisal test*****************")
# unique_name = uuid.uuid4().hex
# baseline_name = 'delta-reports-required-baseline-' + unique_name
# foo_file_name = 'foo-file-' + unique_name
# foo_bar_file_name = 'foo-bar-file-' + unique_name
# test_hash = 'a94a8fe5ccb19ba61c4c0873d391e987982fbbd3'
#
# baseline = {"name": baseline_name,
# "description": "a simple hard-coded ima baseline "
# "for delta reports systems testing",
# "records": []}
#
# ima_policy = Portal.add_ima_policy(required_set=baseline, delta_reports_enabled="true", policy_name_prefix="delta_with_required_set")
# Portal.set_default_policies(ima_policy=ima_policy)
# run_hirs_report(CLIENT)
# report = Portal.get_latest_report()
# found_foo_file = foo_file_name in report
# found_foo_bar_file = foo_bar_file_name in report
# self.assertFalse(found_foo_file)
# self.assertFalse(found_foo_bar_file)
#
# Portal.add_to_ima_baseline(baseline_name, foo_file_name, test_hash)
# Portal.add_to_ima_baseline(baseline_name, foo_bar_file_name, test_hash)
#
# #create foo_file_name. Don't create foo_bar_file_name yet.
# #send_vagrant_command('echo {0} > {1}'.format("test", foo_file_name), CLIENT)
# #send_vagrant_command('sudo cat {0}'.format(foo_file_name), CLIENT)
# send_command('echo {0} > {1}'.format("test", foo_file_name))
# send_command('sudo cat {0}'.format(foo_file_name))
#
# result = run_hirs_report(CLIENT)
# self.assertFalse(result, msg="report should fail - " + foo_bar_file_name + " not present")
# report = Portal.get_latest_report()
# found_foo_file = foo_file_name in report
# found_foo_bar_file = foo_bar_file_name in report
# self.assertTrue(found_foo_file)
# self.assertFalse(found_foo_bar_file)
#
# send_vagrant_command('echo {0} > {1}'.format("test", foo_bar_file_name), CLIENT)
# send_vagrant_command('sudo cat {0}'.format(foo_bar_file_name), CLIENT)
# result = run_hirs_report(CLIENT)
# self.assertFalse(result, msg="delta reporting should fail becuase foo_file was in an earlier report")
# report = Portal.get_latest_report()
# found_foo_file = foo_file_name in report
# found_foo_bar_file = foo_bar_file_name in report
# self.assertFalse(found_foo_file)
# self.assertTrue(found_foo_bar_file)
#
# send_vagrant_command('rm {0}'.format(foo_file_name), CLIENT)
# send_vagrant_command('rm {0}'.format(foo_bar_file_name), CLIENT)
@collectors(['IMA'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_08_delta_reports_whitelist(self):
"""Test that appraisal works with delta reports. Each report should be
appraised individually. Checks that a failed appraisal can be followed
by a successful appraisal if there are no errors in the second delta
report.
steps:
- Run hirs report with an empty required set and delta reports
enabled
- Check first report for success and to make sure the test files
are not there
- Add a test file (foo-file) to the whitelist with a hash that
indicates the file is empty
- Create foo-file with contents and read it as root so it is
measured by IMA
- Run second hirs report
- Check for failed appraisal (foo-file should be a whitelist
mismatch because the file isn't empty)
- Check that the report includes foo-file
- Run third hirs report
- Check for successful appraisal (the mismatch was in the previous
report so it won't be included in this one.
- Check that foo-file is not in this report
"""
logging.info("*****************beginning of Delta Reports whitelist appraisal test*****************")
# unique_name = uuid.uuid4().hex
# baseline_name = 'delta-reports-whitelist-baseline-' + unique_name
# foo_file_name = 'foo-file-' + unique_name
# foo_bar_file_name = 'foo-bar-file-' + unique_name
# test_hash = 'a94a8fe5ccb19ba61c4c0873d391e987982fbbd3'
#
# baseline = {"name": baseline_name,
# "description": "a simple hard-coded ima baseline "
# "for delta reports systems testing",
# "records": []}
#
# ima_policy = Portal.add_ima_policy(whitelist=baseline, delta_reports_enabled="true", policy_name_prefix="delta_with_whitelist")
# Portal.set_default_policies(ima_policy=ima_policy)
# run_hirs_report(CLIENT)
# report = Portal.get_latest_report()
# found_foo_file = foo_file_name in report
# self.assertFalse(found_foo_file)
#
# Portal.add_to_ima_baseline(baseline_name, foo_file_name, test_hash)
#
# #create foo_file_name. Don't create foo_bar_file_name yet.
# send_vagrant_command('echo \'foo-file\' > {0}'.format(foo_file_name), CLIENT)
# send_vagrant_command('sudo cat {0}'.format(foo_file_name), CLIENT)
#
# result = run_hirs_report(CLIENT)
# self.assertFalse(result, msg="report should fail - whitelist mismatch for " + foo_bar_file_name)
# report = Portal.get_latest_report()
# found_foo_file = foo_file_name in report
# self.assertTrue(found_foo_file)
#
# result = run_hirs_report(CLIENT)
# self.assertTrue(result, msg="delta reporting should pass because the mismatched record should be found in a previous report")
# report = Portal.get_latest_report()
# found_foo_file = foo_file_name in report
# self.assertFalse(found_foo_file)
#
# send_vagrant_command('rm {0}'.format(foo_file_name), CLIENT)
@collectors(['IMA', 'TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_09_on_demand(self):
"""Test that on-demand (server-initiated) appraisal works.
steps:
- push a simple ima baseline
- set the policy
- touch a random file, take the hash, then remove it
- kick off an on-demand report on the server for the default device group
- sleep to let the appraisal finish
- pull the generated report
- check that it passed appraisal
- check that it has the random filename and hash
- check that it contains a TPM Report
"""
logging.info("*****************beginning of on-demand test*****************")
# baseline = make_simple_ima_baseline()
# policy_name = Portal.add_ima_policy(required_set=baseline, delta_reports_enabled="false", policy_name_prefix='on_demand')
# logging.info('on demand policy name: %s', policy_name)
# Portal.set_default_policies(ima_policy=policy_name, tpm_policy=DEFAULT_TPM_POLICY)
# first_report_summary = Portal.get_latest_report_summary()
#
# (filename, sha_hash) = touch_random_file_and_remove(CLIENT)
# partial_filename = filename.split('/')[-1]
# logging.info("touched file {} with hash {}".format(filename, sha_hash))
# Portal.start_on_demand()
# logging.info("started on-demand appraisal")
#
# latest_report_summary = None
#
# attempts = 0
# while latest_report_summary == None or latest_report_summary['report']['id'] == first_report_summary['report']['id']:
# attempts += 1
# time.sleep(20)
# latest_report_summary = Portal.get_latest_report_summary()
# if attempts == 6:
# self.fail("No new report summary was found after 120 seconds; failing.")
#
# self.assertEqual(latest_report_summary["hirsAppraisalResult"]["appraisalStatus"], 'PASS')
#
# self.assertTrue(Portal.report_contains_ima_record(
# partial_filename, sha_hash, latest_report_summary['report']['id']))
# sub_reports = latest_report_summary['report']['reports']
# self.assertTrue(any(sr for sr in sub_reports if 'TPMReport' in sr['reportType']),
# "report summary should contain a TPMReport as a sub-report")
@collectors(['IMA'], COLLECTOR_LIST)
@unittest.skip("SELinux issues are preventing repo sync from working")
def test_10_failing_ima_appraisal_broad_repo_baseline(self):
"""Test that an appraisal not containing expected packages in a broad repo IMA baseline fails.
steps:
- Create a Yum repository with a local file URL and sync it
- Create a broad baseline using the Yum repository
- Add the baseline to the required set for the default IMA policy
- Run a HIRS report and ensure it fails
- Ensure that at least one of the expected alerts has been generated
"""
logging.info("*****************beginning of broad repo failing appraisal test*****************")
# repo_name = "Test Yum Repository"
# baseline_name = "Test Broad Baseline"
# policy_name = "Test Broad Repo IMA Policy"
# repo_url = 'file:///flamethrower/Systems_Tests/resources/repositories/small_yum_repo'
#
# Portal.configure_yum_repository(repo_name, repo_url)
# Portal.create_broad_ima_baseline(baseline_name, repo_name)
# Portal.create_policy(policy_name, "IMA")
# Portal.add_baseline_to_required_sets(policy_name, baseline_name)
# Portal.set_tpm_ima_policy(ima_policy=policy_name, tpm_policy=DEFAULT_TPM_POLICY)
#
# self.assertFalse(run_hirs_report(CLIENT))
# alerts = Portal.get_alerts_from_latest_report()
# self.assertTrue(alerts_contain(alerts['data'], {
# 'source': 'IMA_APPRAISER',
# 'type': 'MISSING_RECORD',
# 'expected': '(/usr/lib64/glusterfs/3.7.6/xlator/features/quota.so, SHA-1 - 0xc9b5e8df6b50f2f58ea55fd41a962393d9eeec94)',
# }))
@collectors(['IMA'], COLLECTOR_LIST)
@unittest.skip("SELinux issues are preventing repo sync from working")
@unittest.skipIf(is_ubuntu_client(CLIENT_OS), "Skipping this test due to client OS " + CLIENT_OS)
def test_11_successful_ima_appraisal_broad_repo_baseline(self):
"""Test that an appraisal containing expected packages in a broad repo IMA baseline passes.
This test only works on CentOS 6 and 7.
steps:
- Create a Yum repository with a local file URL and sync it
- Create a broad baseline using the Yum repository
- Add the baseline to the required set for the default IMA policy
- Install RPMs in repository to client machine and read them with root to ensure their placement in the IMA log
- Run a HIRS report and ensure it passes
- Ensure that there are no new alerts
"""
logging.info("*****************beginning of broad repo successful appraisal test*****************")
# repo_name = "Test Yum Repository"
# baseline_name = "Test Broad Baseline"
# policy_name = "Test Broad Repo IMA Policy"
# repo_url = 'file:///flamethrower/Systems_Tests/resources/repositories/two_package_yum_repo'
#
# Portal.configure_yum_repository(repo_name, repo_url)
# Portal.create_broad_ima_baseline(baseline_name, repo_name)
# Portal.create_policy(policy_name, "IMA")
# Portal.add_baseline_to_required_sets(policy_name, baseline_name)
# Portal.set_partial_paths_for_ima_policy(policy_name, True)
# Portal.set_tpm_ima_policy(ima_policy=policy_name, tpm_policy=DEFAULT_TPM_POLICY)
#
# if CLIENT_OS in ["centos6", "centos7"]:
# send_vagrant_command("sudo rpm -i --force /flamethrower/Systems_Tests/resources/repositories/two_package_yum_repo/SimpleTest1-1-1.noarch.rpm", CLIENT)
# send_vagrant_command("sudo rpm -i --force /flamethrower/Systems_Tests/resources/repositories/two_package_yum_repo/SimpleTest2-1-1.noarch.rpm", CLIENT)
# else:
# logging.error("unsupported client os: %s", CLIENT_OS)
#
# send_vagrant_command("sudo find /opt/simpletest -type f -exec head {} \;", CLIENT)
#
# self.assertTrue(run_hirs_report(CLIENT))
# self.assertEqual(Portal.get_alert_count_from_latest_report(), 0)
def test_12_attestation_ca_portal_online(self):
"""Test that the Attestation CA Portal is online and accessible by making a GET request.
If not online, an exception will be raised since the response code is non-200"""
logging.info("*****************beginning of attestation ca portal online test *****************")
AcaPortal.check_is_online()
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_13_tpm2_initial_provision(self):
"""Test that running the tpm2 hirs provisioner works"""
logging.info("*****************beginning of initial provisioner run *****************")
# Run the provisioner to ensure that it provisions successfully
provisioner_out = run_hirs_provisioner_tpm2(CLIENT)
print("Initial provisioner run output: {0}".format(provisioner_out))
def test_14_device_info_report_stored_after_provisioning(self):
"""Test that running the hirs provisioner results in storing a device info report for
the device in the DB"""
logging.info("*****************beginning of provisioner + device info report test *****************")
logging.info("getting devices from ACA portal")
aca_portal_devices = AcaPortal.get_devices()
self.assertEqual(aca_portal_devices['recordsTotal'], 1)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_15_supply_chain_validation_summary_stored_after_second_provisioning(self):
"""Test that running the hirs provisioner, a second time, results in storing a supply chain validation
record in the database"""
logging.info("*****************beginning of provisioner + supply chain validation summary test *****************")
if is_tpm2(TPM_VERSION):
logging.info("Using TPM 2.0")
logging.info("Uploading CA cert: " + CA_CERT_LOCATION)
AcaPortal.upload_ca_cert(CA_CERT_LOCATION)
AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT)
else:
# Supply chain validation only supported on CentOS 7
if CLIENT_OS == "centos7":
AcaPortal.upload_ca_cert(EK_CA_CERT_LOCATION)
AcaPortal.enable_ec_validation()
provisioner_out = run_hirs_provisioner(CLIENT)
print("Second provisioner run output: {0}".format(provisioner_out))
supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries()
# verify this is one SCVS record indicating PASS
self.assertEqual(supply_chain_validation_summaries['recordsTotal'], 2)
self.assertEqual(supply_chain_validation_summaries['data'][0]['overallValidationResult'], "PASS")
self.assertEqual(supply_chain_validation_summaries['data'][1]['overallValidationResult'], "PASS")
# verify device has been updated with supply chain appraisal result
devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_16_ek_info_report(self):
"""Test that running the hirs provisioner results in storing EK certs info report for
the device in the DB"""
logging.info("*****************beginning of provisioner + Endorsement certs info report test *****************")
logging.info("getting ek certs from ACA portal")
cert_list = AcaPortal.get_ek_certs()
self.assertEqual(cert_list['recordsTotal'], 1)
self.assertEqual(cert_list['data'][0]['credentialType'], "TCPA Trusted Platform Module Endorsement")
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_17_pk_info_report(self):
"""Test that running the hirs provisioner results in storing PK certs info report for
the device in the DB"""
logging.info("*****************beginning of provisioner + Platform certs info report test *****************")
logging.info("getting pk certs from ACA portal")
cert_list = AcaPortal.get_pk_certs()
self.assertEqual(cert_list['recordsTotal'], 1)
self.assertEqual(cert_list['data'][0]['credentialType'], "TCG Trusted Platform Endorsement")
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_18_trust_chain_info_report(self):
"""Test that running the hirs provisioner results in storing trust chains info report for
the device in the DB"""
logging.info("*****************beginning of provisioner + Trust chains info report test *****************")
logging.info("getting trust chains from ACA portal")
trust_chain_list = AcaPortal.get_trust_chains()
self.assertEqual(trust_chain_list['recordsTotal'], 1)
def make_simple_ima_baseline():
timestamp = get_current_timestamp()
if CLIENT_OS == "centos6":
records = [{"path": "/lib/udev/console_init",
"hash": send_command_sha1sum("sha1sum /lib/udev/console_init")},
{"path": "/bin/mknod",
"hash": send_command_sha1sum("sha1sum /bin/mknod")}]
elif CLIENT_OS == "centos7":
records = [{"path": "/lib/systemd/rhel-readonly",
"hash": send_command_sha1sum("sha1sum /lib/systemd/rhel-readonly")},
{"path": "/bin/sort",
"hash": send_command_sha1sum("sha1sum /bin/sort")}]
elif CLIENT_OS == "ubuntu16":
records = [{"path": "/lib/systemd/systemd-udevd",
"hash": send_command_sha1sum("sha1sum /lib/systemd/systemd-udevd")},
{"path": "/bin/udevadm",
"hash": send_command_sha1sum("sha1sum /bin/udevadm")}]
else:
logging.error("unsupported client os type: %s", CLIENT_OS)
simple_baseline = {"name": "simple_ima_baseline_{0}".format(timestamp),
"description": "a simple hard-coded ima baseline for systems testing",
"records": records}
return simple_baseline
def make_baseline_from_xml(xml_report, appraiser_type):
"""search the xml for records and add each one to a dictionary."""
timestamp = get_current_timestamp()
baseline_name = "full_{0}_baseline_{1}".format(appraiser_type, timestamp)
baseline_description = "{0} baseline created by parsing an xml report and uploaded for systems testing".format(appraiser_type)
baseline = {"name": baseline_name, "description": baseline_description}
baseline["records"] = []
tree = parse_xml_with_stripped_namespaces(xml_report)
if appraiser_type == "TPM":
pcr_tags = get_all_nodes_recursively(tree, "PcrValue")
for pcr_tag in pcr_tags:
tpm_digest = get_all_nodes_recursively(pcr_tag, "digest")[0].text
parsed_record = {}
parsed_record["pcr"] = pcr_tag.attrib['PcrNumber']
parsed_record["hash"] = binascii.hexlify(binascii.a2b_base64(tpm_digest))
baseline["records"].append(parsed_record)
if appraiser_type == "IMA":
ima_records = get_all_nodes_recursively(tree, "imaRecords")
for ima_record in ima_records:
ima_path = get_all_nodes_recursively(ima_record, "path")[0].text
ima_digest = get_all_nodes_recursively(ima_record, "digest")[0].text
parsed_record = {}
parsed_record['path'] = ima_path
hash64 = ima_digest
parsed_record["hash"] = (
binascii.hexlify(binascii.a2b_base64(hash64)))
baseline["records"].append(parsed_record)
logging.info("created {0} baseline from xml with {1} records".format(
appraiser_type, str(len(baseline["records"]))))
return baseline
def make_simple_ima_blacklist_baseline():
return {
"name": "simple_ima_blacklist_baseline_{0}".format(get_current_timestamp()),
"description": "a simple blacklist ima baseline for systems testing",
"records": [{"path": "/boot/usb-storage-foo.ko"}]
#"records": [{"path": "usb-storage-foo.ko"}]
}
def make_simple_ima_blacklist_baseline_with_hash():
return {
"name": "simple_ima_blacklist_baseline_{0}".format(get_current_timestamp()),
"description": "a simple blacklist ima baseline for systems testing",
"records": [{"hash": USB_STORAGE_FILE_HASH}]
}
def make_simple_ima_blacklist_baseline_with_file_and_hash():
return {
"name": "simple_ima_blacklist_baseline_{0}".format(get_current_timestamp()),
"description": "a simple blacklist ima baseline for systems testing",
"records": [{"path": "usb-storage_2.ko",
"hash": USB_STORAGE_FILE_HASH}]
}
def make_simple_ima_blacklist_baseline_with_updated_file_and_hash():
return {
"name": "simple_ima_blacklist_baseline_{0}".format(get_current_timestamp()),
"description": "a simple blacklist ima baseline for systems testing",
"records": [{"path": "test-file",
"hash": USB_STORAGE_FILE_HASH_2}]
}
if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromTestCase(SystemTest)
ret = not unittest.TextTestRunner(verbosity=2).run(suite).wasSuccessful()
sys.exit(ret)

View File

@ -0,0 +1,479 @@
# Defines core methods shared amongst system test scripts
import sets
import unittest
import shlex
import subprocess
import os
import binascii
import requests
import logging
import random
import time
import datetime
import json
import pprint
import xml.etree.ElementTree as ET
from StringIO import StringIO
DEFAULT_GROUP_NAME = "Default Group"
DEFAULT_TPM_POLICY = "Test TPM Policy"
DEFAULT_IMA_POLICY = "Test IMA Policy"
CACHED_XML_REPORT = None
APPRAISAL_SUCCESS_MESSAGE = "Appraisal passed"
class HIRSPortal:
def __init__(self, hirs_server_url):
self.server_url = hirs_server_url
def request(self, method, path, params={}, data={}, files={}, expected_status_codes=[200], operation=None, verify=False):
return web_request(self.server_url, method, path, params, data, files, expected_status_codes, operation, verify)
def set_default_policies(self, tpm_policy="No Policy",
ima_policy="No Policy"):
"""set the given policies to be the policies for the default group."""
payload = {"description": "default group modified for systems tests",
"name": DEFAULT_GROUP_NAME}
# TODO this will report failure if the group already exists. Not sure how to avoid this
request_result = self.request("post", "portal/group/create", data=payload)
self.set_tpm_ima_policy(DEFAULT_GROUP_NAME, tpm_policy, ima_policy)
def set_tpm_ima_policy(self, group_name=DEFAULT_GROUP_NAME, tpm_policy=None, ima_policy=None):
"""set the TPM and IMA policy for the group"""
payload = {"name": group_name,
"ima": ima_policy,
"tpm": tpm_policy,
"optionRadio" : "existingImaPolicy",
"policyName" : ""}
self.request("post", "portal/group/update/policies", data=payload)
payload = {"name": group_name,
"ima": ima_policy,
"tpm": tpm_policy,
"optionRadio" : "existingTpmPolicy",
"policyName" : ""}
self.request("post", "portal/group/update/policies", data=payload)
def set_group_appraisal_wait_setting(self, group_name=DEFAULT_GROUP_NAME,
is_client_waiting='checked'):
"""set the specified group's client wait for appraisal setting to the specified value."""
self.request("post", "portal/group/editWaitForAppraisalCompletion", data={"groupName": group_name, "enabled" : is_client_waiting})
def get_latest_report(self):
"""Retrieves the latest report that was created for the given client.
The retrieved report is cached. Calling run_hirs_report will clear the
latest report from the cache.
"""
global CACHED_XML_REPORT
if CACHED_XML_REPORT:
logging.info("found cached XML report")
return CACHED_XML_REPORT
logging.info("cached XML report not found, retrieving latest report from"
"the server")
latest_report_id = self.get_latest_report_summary()['report']['id']
logging.info("requesting raw report")
request_result = self.request("get", "portal/report/xml/raw?uuid=" + latest_report_id, operation="get latest report")
CACHED_XML_REPORT = request_result.text
return CACHED_XML_REPORT
def get_alert_count_from_latest_report(self):
""" Retrieves the alert count from the latest report. """
return self.get_alerts_from_latest_report()['recordsTotal']
def get_alerts_from_latest_report(self):
""" Retrieves the alert list from the latest report. """
latest_report_id = self.get_latest_report_summary()['report']['id']
return self.request("get", "portal/alerts/list?report=" + latest_report_id).json()
def start_on_demand(self, group_name="Default%20Group"):
self.request("get", "portal/on-demand/group/" + group_name)
def get_latest_report_summary(self):
"""Pull the latest report summary from the Portal."""
all_reports = self.request("get", "portal/report/list").json()['data']
if len(all_reports) == 0:
return None
return max(all_reports, key=lambda report: report['timestamp'])
def get_devices(self):
"""Get devices Portal."""
return self.request("get", "portal/devices/list").json()
def report_contains_ima_record(self, filename, sha_hash, report_id):
"""Check whether the report with the given id contains the given filename
and hash.
"""
logging.info("checking if report with ID {} contains file {} with hash {}".format(
report_id, filename, sha_hash))
ima_records = self.request("get", "portal/report/list/imaRecords", params={'scope': 'REPORT', 'id': report_id}).json()['data']
def record_matcher(record):
# check for IMA records with this hash, and if the filename is in the record's path
# (works for full or partial path)
return (record['hash']['digestString'] == sha_hash) and (filename in record['path'])
matching_records = filter(record_matcher, ima_records)
return len(matching_records) > 0
def upload_payload(self, payload):
json_path = "tmp.json"
json_file = open(json_path, 'w')
json_file.write(json.dumps(payload))
json_file.close()
post_file = {'file': open(json_path, 'rb')}
logging.debug("uploading policy:\n{0}".format(pprint.pformat(payload)))
response = self.request("post", "portal/policies/import", files=post_file, operation="upload policy")
post_file['file'].close()
os.remove(json_path)
return payload["name"]
def add_ima_policy(self, required_set=None, whitelist=None, blacklist=None, ignore=None, unknown_fail="false", delta_reports_enabled="false", policy_name_prefix=""):
timestamp = get_current_timestamp()
policy_name = "{0}_IMA_systems_test_policy_{1}".format(policy_name_prefix, timestamp)
policy_description = "IMA policy for systems testing"
payload = {"name": policy_name,
"description": policy_description,
"type": "IMA"}
required_payload, whitelist_payload, ignore_payload, blacklist_payload = [], [], [], []
if required_set is not None:
required_payload.append(required_set)
if whitelist is not None:
whitelist_payload.append(whitelist)
if blacklist is not None:
blacklist_payload.append(blacklist)
if ignore is not None:
ignore_payload.append(ignore)
ima_payload = {
"deltaReportEnable": delta_reports_enabled,
"failOnUnknowns": unknown_fail,
"validatePcr": "false",
"checkSubsequentBaselines": "true",
"partialPathEnable": "true",
"required": required_payload,
"whitelist": whitelist_payload,
"blacklist": blacklist_payload,
"ignoreSet": ignore_payload
}
payload.update(ima_payload)
return self.upload_payload(payload)
def add_tpm_wl_policy(self, baseline, policy_name_prefix=""):
timestamp = get_current_timestamp()
policy_name = "{0}_TPM_systems_test_wl_policy_{1}".format(policy_name_prefix, timestamp)
policy_description = "TPM white list policy for systems testing"
payload = {"name": policy_name,
"description": policy_description,
"type": "TPM"}
tpm_payload = {"appraiserPcrMask": 0xffffff,
"reportPcrMask": 0xffffff,
"appraiseFullReport": "true",
"validateSignature": "true",
"white-list-baselines": [baseline]}
payload.update(tpm_payload)
return self.upload_payload(payload)
def add_tpm_bl_policy(self, baseline, policy_name_prefix=""):
timestamp = get_current_timestamp()
policy_name = "{0}_TPM_systems_test_bl_policy_{1}".format(policy_name_prefix, timestamp)
policy_description = "TPM black list policy for systems testing"
payload = {"name": policy_name,
"description": policy_description,
"type": "TPM"}
tpm_payload = {"appraiserPcrMask": 0xffffff,
"reportPcrMask": 0xffffff,
"appraiseFullReport": "true",
"validateSignature": "true",
"black-list-baselines": [baseline]}
payload.update(tpm_payload)
return self.upload_payload(payload)
def add_to_ima_baseline(self, baseline_name, file_path, file_hash):
self.request("post", "portal/baselines/record/ima/add", data={'name': baseline_name, 'path': file_path, 'hash': file_hash}, operation="add to IMA baseline")
def upload_csv_baseline(self, baseline_path, appraiser_type):
post_file = {'file': open(baseline_path, 'rb')}
current_time = datetime.datetime.now()
baseline_name = baseline_path.split('.')[0] + '_' + str(current_time.hour) + '-' + str(current_time.minute) + '-' + str(current_time.second)
self.request("post", "uploadImaCsv", data={'baselineName': baseline_name, 'optionsRadios': appraiser_type}, files=post_file, operation="upload baseline")
if request_result != 200:
logging.error("upload baseline return code: {0}, response text:\n"
"{1}".format(request_result.status_code, request_result.text))
post_file['file'].close()
subprocess.call("rm " + baseline_path, shell=True)
return baseline_name
"""Creates a Yum repository, configures it with a URL, triggers an update, and waits for the update to complete via Portal endpoints."""
def configure_yum_repository(self, baseline_name, base_url):
self.request("post", "portal/repository/create", params={'name':baseline_name,'type':'Yum'}, operation="create Yum repository")
self.request("post", "portal/repository/update/url", params={'name':baseline_name,'baseUrl':base_url}, operation="set URL of Yum repository")
self.request("post", "portal/repository/job/trigger", params={'name':baseline_name}, operation="update Yum repository")
# 4. wait for update to finish
update_complete = False
max_wait_time_seconds = 240
sleep_time_seconds = 5
counter = 1
while not update_complete:
time.sleep(sleep_time_seconds)
if counter * sleep_time_seconds >= max_wait_time_seconds:
msg = "Timeout waiting for repository update: {0} seconds".format(max_wait_time_seconds)
logging.error(msg)
raise RuntimeError(msg)
counter += 1
request_result = self.request("get", "portal/repository/job/check", params={'name':baseline_name}, operation="check status of repo update job")
update_complete = not json.loads(request_result.text)['jobCurrentlyRunning']
"""Creates a BroadRepoImaBaseline repository, configures it with a repository, and updates the baseline from the repository's contents via Portal endpoints."""
def create_broad_ima_baseline(self, baseline_name, repository_name):
self.request("post", "portal/baselines/create", params={'name':baseline_name,'type':'broad'}, operation="create broad baseline")
self.request("post", "portal/baselines/update/repositories", params={'name':baseline_name,'repositories':[repository_name]}, operation="add repository to broad baseline")
self.request("post", "portal/baselines/triggerupdate", params={'name':baseline_name}, operation="update broad repository from its repository")
"""Creates a new Policy with the given type and name via Portal endpoints."""
def create_policy(self, name, policy_type):
self.request("post", "portal/policies/create", params={'name':name,'type':policy_type}, operation="create new policy")
"""Enables or disables partial path checking for an IMA policy."""
def set_partial_paths_for_ima_policy(self, policy_name, enabled):
checked = 'unchecked'
if enabled:
checked = 'checked'
self.request("post", "portal/policies/update", params={'name':policy_name,'partial':checked}, operation="update policy's partial path setting")
"""Enables or disables kernel detection for a TPM policy."""
def set_kernel_setting(self, policy_name, kernel_detect_enabled, kernel_alert_enabled, kernel_alert_severity="UNSPECIFIED"):
kernel_detect_checked = 'false'
if kernel_detect_enabled:
kernel_detect_checked = 'true'
kernel_alert_checked = 'false'
if kernel_alert_enabled:
kernel_alert_checked = 'true'
self.request("post", "portal/policies/update/editKernelDetectSettings", params={'name':policy_name,'kernelDetectToggle':kernel_detect_checked,'kernelAlertToggle':kernel_alert_checked,'kernelAlertSeverity':kernel_alert_severity}, operation="update policy's kernel detection setting")
"""Creates a new Policy with the given type and name via Portal endpoints."""
def add_baseline_to_required_sets(self, policy_name, baseline_name):
self.request("post", "portal/policies/update", params={'name':policy_name,'required':[baseline_name]}, operation="add baseline to required sets")
def get_alerts(self):
return self.request("get", "portal/alerts/list").json()
class AttestationCAPortal:
def __init__(self, hirs_server_url):
self.server_url = hirs_server_url
def request(self, method, path, params={}, data={}, files={}, expected_status_codes=[200], operation=None, verify=False):
return web_request(self.server_url, method, path, params, data, files, expected_status_codes, operation, verify)
def check_is_online(self):
return self.request("get", "portal/certificate-request/platform-credentials/list").json()
def get_supply_chain_validation_summaries(self):
return self.request("get", "portal/validation-reports/list").json()
def disable_supply_chain_validations(self):
# the initial POST request goes through, but the redirect from the server is attempted which results in a 404,
# or possibly a 200 on centos7, apparently.
self.request("post", "portal/policy/update-ec-validation",
expected_status_codes=[404, 200], params={'ecValidate': "unchecked",})
self.request("post", "portal/policy/update-pc-validation",
expected_status_codes=[404, 200], params={'pcValidate': 'unchecked'})
self.request("post", "portal/policy/update-pc-attribute-validation",
expected_status_codes=[404, 200], params={'pcAttributeValidate': 'unchecked'})
def enable_supply_chain_validations(self):
# the initial POST request goes through, but the redirect from the server is attempted which results in a 404,
# or possibly a 200 on centos7, apparently.
self.request("post", "portal/policy/update-ec-validation",
expected_status_codes=[404, 200], params={'ecValidate': "checked",})
self.request("post", "portal/policy/update-pc-validation",
expected_status_codes=[404, 200], params={'pcValidate': 'checked'})
#self.request("post", "portal/policy/update-pc-attribute-validation",
# expected_status_codes=[404, 200], params={'pcAttributeValidate': 'checked'})
def enable_ec_validation(self):
self.request("post", "portal/policy/update-ec-validation",
expected_status_codes=[404, 200], params={'ecValidate': "checked",})
def get_devices(self):
"""Get devices from ACA portal."""
return self.request("get", "portal/devices/list").json()
def get_ek_certs(self):
"""Get EK certs from ACA portal."""
return self.request("get", "portal/certificate-request/endorsement-key-credentials/list").json()
def get_pk_certs(self):
"""Get PK certs from ACA portal."""
return self.request("get", "portal/certificate-request/platform-credentials/list").json()
def get_trust_chains(self):
"""Get trust chains from ACA portal."""
return self.request("get", "portal/certificate-request/trust-chain/list").json()
def upload_ca_cert(self, ca_cert_file):
file = {'file': open(ca_cert_file, 'rb')}
self.request("post", "portal/certificate-request/trust-chain/upload", files=file, operation="upload CA cert")
def web_request(server_url, method, path, params={}, data={}, files={}, expected_status_codes=[200], operation=None, verify=False):
url = server_url + path
if method not in ['get', 'post']:
raise ValueError("Method " + method + " not valid.")
request_response = getattr(requests, method)(url, params=params, data=data, files=files, verify=verify)
request_msg = method + " " + url
if operation == None:
operation = request_msg
else:
operation += " (" + request_msg + ")"
check_request_response(expected_status_codes, request_response, operation)
return request_response
"""Checks a requests response to see if its status code matches the expected status code. If it does, this method returns True. If it does not, this
method will log the error and return False."""
def check_request_response(expected_status_codes, request_result, operation):
if not request_result.status_code in expected_status_codes:
message = "Unable to " + operation + ": {0}, response text:\n{1}".format(request_result.status_code, request_result.text)
logging.error(message)
raise RuntimeError(message)
def collectors(collectors, collector_list):
enabled_collectors = sets.Set(collector_list)
tested_collectors = sets.Set(collectors)
if tested_collectors.issubset(enabled_collectors):
return lambda func: func
return unittest.skip("{0} collector isn't enabled".format(tested_collectors.difference(enabled_collectors)))
def send_command(full_command, accept_nonzero_status=False):
parsed_command = shlex.split(full_command)
p = subprocess.Popen(parsed_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
client_out, client_err = p.communicate()
if p.returncode != 0 and not accept_nonzero_status:
logging.error("Command: " + full_command + " exited with return code " + str(p.returncode))
logging.error(str(client_out))
logging.error(str(client_err))
raise RuntimeError("Command exited with a nonzero status, out:\n" + str(client_out) + "\nerr:\n" + str(client_err))
return client_out
def send_command_sha1sum(full_command, accept_nonzero_status=False):
sha1sum_command = shlex.split(full_command)
head_command = ['head', '-c40']
p1 = subprocess.Popen(sha1sum_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p2 = subprocess.Popen(head_command, stdin=p1.stdout,stdout=subprocess.PIPE)
p1.stdout.close()
client_out, client_err = p2.communicate()
if p2.returncode != 0 and not accept_nonzero_status:
logging.error("Command: " + full_command + " exited with return code " + str(p2.returncode))
logging.error(str(client_out))
logging.error(str(client_err))
raise RuntimeError("Command exited with a nonzero status, out:\n" + str(client_out) + "\nerr:\n" + str(client_err))
return client_out
def run_hirs_report(client_hostname):
"""Runs a hirs report for the specified client host name.
The cached xml report is cleared.
Returns true if the client output indicated appraisal success. false otherwise
"""
client_out = run_hirs_report_and_clear_cache_V2(client_hostname)
if APPRAISAL_SUCCESS_MESSAGE in client_out:
logging.info("Report appraisal passed")
return True
else:
logging.info("Report appraisal unsuccessful: " + client_out)
return False
def run_hirs_report_and_clear_cache(client_hostname):
"""Runs a hirs report for the specified client host name.
The cached xml report is cleared.
Returns the client output text from running the command.
"""
logging.info("running hirs report over ssh on {0}".format(client_hostname))
client_out = send_command("sudo hirs report",accept_nonzero_status=True)
global CACHED_XML_REPORT
if CACHED_XML_REPORT:
logging.info("clearing cached XML report")
CACHED_XML_REPORT = None
return client_out
def run_hirs_provisioner_tpm2(client_hostname):
"""Runs the hirs provisioner TPM2
"""
logging.info("running hirs provisioner tpm2 on {0}".format(client_hostname))
client_out = send_command("hirs-provisioner-tpm2 provision")
return client_out
def parse_xml_with_stripped_namespaces(raw_xml_string):
"""Parses the raw XML text in to an XML node element.
Strips namespaces which conflict with recusive tree search.
"""
it = ET.iterparse(StringIO(raw_xml_string))
for _, el in it:
if '}' in el.tag:
el.tag = el.tag.split('}', 1)[1] # strip all namespaces
for at in el.attrib.keys(): # strip namespaces of attributes too
if '}' in at:
newat = at.split('}', 1)[1]
el.attrib[newat] = el.attrib[at]
del el.attrib[at]
return it.root
def get_all_nodes_recursively(tree_node, node_name):
return tree_node.findall('.//' + node_name)
def touch_random_file_and_remove(client_hostname):
"""Write a random string to a random filename in /tmp/, read it as root, then delete it.
"""
random_number = str(int(random.random() * 100000))
filename = "/tmp/on_demand_test_file{}.txt".format(random_number)
echo_command = "echo {} > {}".format(random_number, filename)
cat_command = "sudo cat {}".format(filename)
sha_command = "sha1sum {}".format(filename)
rm_command = "rm {}".format(filename)
combined_command = "{};{};{};{}".format(echo_command, cat_command, sha_command, rm_command)
# command_output = send_vagrant_command(combined_command, client_hostname)
sha_hash = command_output.split()[1]
return (filename, sha_hash)
def get_random_pcr_hex_value():
""" Gets a random TPM PCR value by combining 2 UUIDs and getting a substring
"""
# get 40 hex chars
return str(binascii.b2a_hex(os.urandom(20)))
def get_current_timestamp():
current_time = datetime.datetime.now()
return current_time.strftime('%H-%M-%S')
def is_ubuntu_client(client_os):
return client_os in ["ubuntu14", "ubuntu16"]
def is_tpm2(tpm_version):
return tpm_version in ["2.0", "2"]

View File

@ -0,0 +1,14 @@
#!/bin/bash
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
export CLIENT_OS=centos7
export CLIENT_HOSTNAME=hirs-client-$CLIENT_OS-tpm2
export SERVER_OS=$CLIENT_OS
export SERVER_HOSTNAME=hirs-appraiser-$SERVER_OS
export ENABLED_COLLECTORS=
export TPM_VERSION=2.0
$SCRIPT_DIR/systems-test.core.sh

View File

@ -0,0 +1,28 @@
#!/bin/bash
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
TEST_LOG=$SCRIPT_DIR/test_logs/system_test_$CLIENT_OS.log
LOG_LEVEL=logging.INFO
export CLIENT_HOSTNAME CLIENT_OS TPM_VERSION ENABLED_COLLECTORS TEST_LOG LOG_LEVEL
# Prepare log file directory
rm -rf $SCRIPT_DIR/test_logs
mkdir -p $SCRIPT_DIR/test_logs
# Run system tests
echo "===========Running systems tests on ${SERVER_HOSTNAME} and ${CLIENT_HOSTNAME}==========="
TEST_OUTPUT=$SCRIPT_DIR/test_logs/test_output$$.txt
python $SCRIPT_DIR/system_test.py 2>&1 | tee $TEST_OUTPUT
SYSTEM_TEST_EXIT_CODE=$PIPESTATUS
# Check result
if [[ $SYSTEM_TEST_EXIT_CODE == 0 ]]
then
echo "SUCCESS: System tests passed"
exit 0
fi
echo "ERROR: System tests failed"
exit 1

7
.gitignore vendored
View File

@ -1,5 +1,12 @@
# compiled python for systems tests
*.pyc
*.pydevproject
# vi swap files
*.swp
# system test logs
.ci/system-tests/test_logs
# NetBeans specific #
private/

View File

@ -1,3 +1,6 @@
# NOTE: if you are editing this, try using the yamllint tool to check your work.
# yamllint disable rule:line-length
---
os:
- linux
@ -40,7 +43,7 @@ jobs:
- script: docker run --rm -v $(pwd):/HIRS hirs/hirs-ci:ubuntu18 /bin/bash -c "cd /HIRS; ./package/package.ubuntu.sh"
env: null
name: "Package Ubuntu"
- stage: integration-tests
script: .ci/integration-tests/./run-integration-tests.sh
- stage: system-tests
script: .ci/system-tests/./run-system-tests.sh
env: null
name: "Integration Tests"
name: "System Tests"