[#88] Integrate System Tests with Dockerized TPM 1.2 Provisioner (#208)

* Initial system test for TPM 1.2 emulator.

* Update .travis.yml file.

* Added system test: test_20_tpm_1_2_initial_provision

* Cleaned up files.

* Correct docker location

* Re-arranged system tests.

* Execute test_12_attestation_ca_portal_online for all current collectors.

* Clean up files.

* Cleaned up files.

* Cleaned up files.

* Cleaned up files.

* Cleaned up files

* Updated system test driver.

* Set logging properties to DEBUG.

* Commented out test_13_tpm_1_2_initial_provision. Need to fix it.
This commit is contained in:
busaboy1340 2019-12-23 05:28:26 -05:00 committed by GitHub
parent 404f2ab5dd
commit a2497c064c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1378 additions and 1312 deletions

View File

@ -25,13 +25,8 @@ services:
volumes: volumes:
- ../../:/HIRS - ../../:/HIRS
entrypoint: /bin/bash -c entrypoint: /bin/bash -c
command: [yum list installed|grep paccor; command: [HIRS/.ci/setup/setup-tpmprovisioner.sh;
yum info dmidecode; HIRS/.ci/system-tests/systems-test-centos7-tpm1-2.sh]
dmidecode -u;
lshw -c disk -numeric;
lshw -c display -numeric;
lshw -c network -numeric;
HIRS/.ci/setup/setup-tpmprovisioner.sh]
devices: devices:
- "/dev/mem:/dev/mem" - "/dev/mem:/dev/mem"
cap_add: cap_add:

View File

@ -10,9 +10,9 @@ until [ "`curl --silent --connect-timeout 1 -I -k https://${HIRS_ACA_PORTAL_IP}:
done done
echo "ACA is up!" echo "ACA is up!"
# Function to install TPM2 Provisioner packages. # Function to install TPM 2.0 Provisioner packages
function InstallProvisioner { function InstallProvisioner {
echo "===========Installing TPM2 Provisioner Packages...===========" echo "===========Installing TPM 2.0 Provisioner Packages...==========="
pushd /HIRS pushd /HIRS
if [ ! -d package/rpm/RPMS ]; then if [ ! -d package/rpm/RPMS ]; then
@ -22,9 +22,9 @@ function InstallProvisioner {
popd popd
} }
# Function to initialize the TPM2 Emulator with a bad base certificate # Function to initialize the TPM 2.0 Emulator with a bad base certificate
function InitTpm2Emulator { function InitTpm2Emulator {
echo "===========Initializing TPM2 Emulator with bad base certificate...===========" echo "===========Initializing TPM 2.0 Emulator with bad base certificate...==========="
mkdir -p /var/run/dbus mkdir -p /var/run/dbus
if [ -e /var/run/dbus/pid ]; then if [ -e /var/run/dbus/pid ]; then
@ -139,7 +139,7 @@ function InitTpm2Emulator {
echo "Loading PC cert $PC_DIR/$platform_cert into NVRAM." echo "Loading PC cert $PC_DIR/$platform_cert into NVRAM."
tpm2_nvwrite -x 0x1c90000 -a 0x40000001 $PC_DIR/$platform_cert tpm2_nvwrite -x 0x1c90000 -a 0x40000001 $PC_DIR/$platform_cert
echo "===========TPM2 Emulator Initialization Complete!===========" echo "===========TPM 2.0 Emulator Initialization Complete!==========="
# Set Logging to INFO Level # Set Logging to INFO Level
sed -i "s/WARN/INFO/" /etc/hirs/TPM2_Provisioner/log4cplus_config.ini sed -i "s/WARN/INFO/" /etc/hirs/TPM2_Provisioner/log4cplus_config.ini
@ -178,15 +178,15 @@ DEFAULT_SITE_CONFIG_FILE
# Install packages # Install packages
InstallProvisioner InstallProvisioner
# Install TPM2 Emulator # Install TPM 2.0 Emulator
InitTpm2Emulator InitTpm2Emulator
# Update the hirs-site.config file # Update the hirs-site.config file
UpdateHirsSiteConfigFile UpdateHirsSiteConfigFile
echo "" echo ""
echo "TPM2 Emulator NV RAM list" echo "TPM 2.0 Emulator NV RAM list"
tpm2_nvlist tpm2_nvlist
echo "" echo ""
echo "===========HIRS ACA TPM2 Provisioner Setup Complete!===========" echo "===========HIRS ACA TPM 2.0 Provisioner Setup Complete!==========="

View File

@ -10,9 +10,9 @@ until [ "`curl --silent --connect-timeout 1 -I -k https://${HIRS_ACA_PORTAL_IP}:
done done
echo "ACA is up!" echo "ACA is up!"
# Function to install TPM2 Provisioner packages. # Function to install TPM 2.0 Provisioner packages
function InstallProvisioner { function InstallProvisioner {
echo "===========Installing TPM2 Provisioner Packages...===========" echo "===========Installing TPM 2.0 Provisioner Packages...==========="
pushd /HIRS pushd /HIRS
if [ ! -d package/rpm/RPMS ]; then if [ ! -d package/rpm/RPMS ]; then
@ -22,9 +22,9 @@ function InstallProvisioner {
popd popd
} }
# Function to initialize the TPM2 Emulator with a good base certificate # Function to initialize the TPM 2.0 Emulator with a good base certificate
function InitTpm2Emulator { function InitTpm2Emulator {
echo "===========Initializing TPM2 Emulator with good base certificate...===========" echo "===========Initializing TPM 2.0 Emulator with good base certificate...==========="
mkdir -p /var/run/dbus mkdir -p /var/run/dbus
if [ -e /var/run/dbus/pid ]; then if [ -e /var/run/dbus/pid ]; then
@ -178,7 +178,7 @@ function InitTpm2Emulator {
echo "Loading PC cert $PC_DIR/$pBase_certA into NVRAM." echo "Loading PC cert $PC_DIR/$pBase_certA into NVRAM."
tpm2_nvwrite -x 0x1c90000 -a 0x40000001 $PC_DIR/$pBase_certA tpm2_nvwrite -x 0x1c90000 -a 0x40000001 $PC_DIR/$pBase_certA
echo "===========TPM2 Emulator Initialization Complete!===========" echo "===========TPM 2.0 Emulator Initialization Complete!==========="
# Set Logging to INFO Level # Set Logging to INFO Level
sed -i "s/WARN/INFO/" /etc/hirs/TPM2_Provisioner/log4cplus_config.ini sed -i "s/WARN/INFO/" /etc/hirs/TPM2_Provisioner/log4cplus_config.ini
@ -217,15 +217,15 @@ DEFAULT_SITE_CONFIG_FILE
# Install packages # Install packages
InstallProvisioner InstallProvisioner
# Install TPM2 Emulator # Install TPM 2.0 Emulator
InitTpm2Emulator InitTpm2Emulator
# Update the hirs-site.config file # Update the hirs-site.config file
UpdateHirsSiteConfigFile UpdateHirsSiteConfigFile
echo "" echo ""
echo "TPM2 Emulator NV RAM list" echo "TPM 2.0 Emulator NV RAM list"
tpm2_nvlist tpm2_nvlist
echo "" echo ""
echo "===========HIRS ACA TPM2 Provisioner Setup Complete!===========" echo "===========HIRS ACA TPM 2.0 Provisioner Setup Complete!==========="

View File

@ -10,9 +10,9 @@ until [ "`curl --silent --connect-timeout 1 -I -k https://${HIRS_ACA_PORTAL_IP}:
done done
echo "ACA is up!" echo "ACA is up!"
# Function to install TPM2 Provisioner packages. # Function to install TPM 2.0 Provisioner packages
function InstallProvisioner { function InstallProvisioner {
echo "===========Installing TPM2 Provisioner Packages...===========" echo "===========Installing TPM 2.0 Provisioner Packages...==========="
pushd /HIRS pushd /HIRS
if [ ! -d package/rpm/RPMS ]; then if [ ! -d package/rpm/RPMS ]; then
@ -22,9 +22,9 @@ function InstallProvisioner {
popd popd
} }
# Function to initialize the TPM2 Emulator # Function to initialize the TPM 2.0 Emulator
function InitTpm2Emulator { function InitTpm2Emulator {
echo "===========Initializing TPM2 Emulator...===========" echo "===========Initializing TPM 2.0 Emulator...==========="
mkdir -p /var/run/dbus mkdir -p /var/run/dbus
if [ -e /var/run/dbus/pid ]; then if [ -e /var/run/dbus/pid ]; then
@ -106,7 +106,7 @@ function InitTpm2Emulator {
echo "Loading PC cert $PC_DIR/$platform_cert into NVRAM." echo "Loading PC cert $PC_DIR/$platform_cert into NVRAM."
tpm2_nvwrite -x 0x1c90000 -a 0x40000001 $PC_DIR/$platform_cert tpm2_nvwrite -x 0x1c90000 -a 0x40000001 $PC_DIR/$platform_cert
echo "===========TPM2 Emulator Initialization Complete!===========" echo "===========TPM 2.0 Emulator Initialization Complete!==========="
# Set Logging to INFO Level # Set Logging to INFO Level
sed -i "s/WARN/INFO/" /etc/hirs/TPM2_Provisioner/log4cplus_config.ini sed -i "s/WARN/INFO/" /etc/hirs/TPM2_Provisioner/log4cplus_config.ini
@ -145,15 +145,15 @@ DEFAULT_SITE_CONFIG_FILE
# Install packages # Install packages
InstallProvisioner InstallProvisioner
# Install TPM2 Emulator # Install TPM 2.0 Emulator
InitTpm2Emulator InitTpm2Emulator
# Update the hirs-site.config file # Update the hirs-site.config file
UpdateHirsSiteConfigFile UpdateHirsSiteConfigFile
echo "" echo ""
echo "TPM2 Emulator NV RAM list" echo "TPM 2.0 Emulator NV RAM list"
tpm2_nvlist tpm2_nvlist
echo "" echo ""
echo "===========HIRS ACA TPM2 Provisioner Setup Complete!===========" echo "===========HIRS ACA TPM 2.0 Provisioner Setup Complete!==========="

View File

@ -1,6 +1,6 @@
#!/bin/bash #!/bin/bash
# Script to setup the TPM Provisioner Docker Image for Integration Tests # Script to setup the TPM 1.2 Provisioner Docker Image for Integration Tests
set -e set -e
# Wait for ACA to boot # Wait for ACA to boot
@ -10,21 +10,24 @@ until [ "`curl --silent --connect-timeout 1 -I -k https://${HIRS_ACA_PORTAL_IP}:
done done
echo "ACA is up!" echo "ACA is up!"
# Function to install Provisioner packages. # Function to install TPM 1.2 Provisioner packages
function InstallProvisioner { function InstallProvisioner {
echo "===========Installing Provisioner Packages...===========" echo "===========Installing TPM 1.2 Provisioner Packages...==========="
pushd /HIRS pushd /HIRS
if [ ! -d package/rpm/RPMS ]; then if [ ! -d package/rpm/RPMS ]; then
./package/package.centos.sh ./package/package.centos.sh
fi fi
yum install -y package/rpm/RPMS/noarch/HIRS_Provisioner_TPM_1_2*.el7.noarch.rpm yum install -y package/rpm/RPMS/noarch/HIRS_Provisioner_TPM_1_2*.el7.noarch.rpm
popd popd
} }
# Function to initialize the TPM Emulator # Function to initialize the TPM 1.2 Emulator
function InitTpmEmulator { function InitTpmEmulator {
echo "===========Initializing TPM Emulator...===========" echo "===========Initializing TPM 1.2 Emulator...==========="
# Set variables for server # Set variables for server
export TPM_PATH=/tpm_emulator/tpm_storage export TPM_PATH=/tpm_emulator/tpm_storage
@ -41,30 +44,40 @@ function InitTpmEmulator {
pushd /tpm_emulator pushd /tpm_emulator
echo "Activate Software TPM..."
# Activate Software TPM # Activate Software TPM
./tpm/tpm_server > tpm.log 2>&1 & ./tpm/tpm_server > tpm.log 2>&1 &
./libtpm/utils/tpmbios ./libtpm/utils/tpmbios
echo "Restarting Software TPM after Activation..."
# Restart Software TPM after Activation # Restart Software TPM after Activation
pkill tpm_server pkill tpm_server
./tpm/tpm_server > tpm.log 2>&1 & ./tpm/tpm_server > tpm.log 2>&1 &
./libtpm/utils/tpmbios ./libtpm/utils/tpmbios
echo "Creating EK on Software TPM..."
# Create EK on Software TPM # Create EK on Software TPM
./libtpm/utils/createek ./libtpm/utils/createek
echo "Initializing last memory address..."
# Initialize last memory address # Initialize last memory address
./libtpm/utils/nv_definespace -in ffffffff -sz 0 ./libtpm/utils/nv_definespace -in ffffffff -sz 0
popd popd
echo "Starting TrouSerS Daemon" echo "Starting TrouSerS Daemon..."
tcsd -e tcsd -e
echo "Testing TPM Connectivity" echo "Taking TPM 1.2 Ownership..."
tpm_takeownership -y -z
echo "Testing TPM 1.2 Connectivity..."
tpm_selftest tpm_selftest
echo "===========TPM Emulator Initialization Complete!===========" echo "TPM 1.2 NV info..."
tpm_nvinfo
echo "===========TPM 1.2 Emulator Initialization Complete!==========="
} }
# Function to update the hirs-site.config file # Function to update the hirs-site.config file
@ -97,14 +110,39 @@ DEFAULT_SITE_CONFIG_FILE
cat /etc/hirs/hirs-site.config cat /etc/hirs/hirs-site.config
} }
function UpdateLoggingConfigFile {
LOGGING_CONFIG="/etc/hirs/logging.properties"
echo ""
echo "===========Updating ${LOGGING_CONFIG} file...==========="
cat /etc/hirs/logging.properties
cat <<DEFAULT_LOGGING_CONFIG_FILE > $LOGGING_CONFIG
root.level=DEBUG
hirs.level=DEBUG
org.hibernate.level=
org.springframework.level=
org.apache.activemq.level=
tpm2_provisioner.level=DEBUG
DEFAULT_LOGGING_CONFIG_FILE
echo ""
echo "===========New Logging Properties File==========="
cat /etc/hirs/logging.properties
}
# Install packages # Install packages
InstallProvisioner InstallProvisioner
# Install TPM Emulator # Install TPM 1.2 Emulator
InitTpmEmulator InitTpmEmulator
# Update the hirs-site.config file # Update the hirs-site.config file
UpdateHirsSiteConfigFile UpdateHirsSiteConfigFile
# Update the logging.properties file
UpdateLoggingConfigFile
echo "" echo ""
echo "===========HIRS ACA Provisioner Setup Complete!===========" echo "===========HIRS ACA TPM 1.2 Provisioner Setup Complete!==========="

View File

@ -14,10 +14,10 @@ cd .ci/docker
docker-compose -f docker-compose-tpm2-base-delta-bad.yml up -d docker-compose -f docker-compose-tpm2-base-delta-bad.yml up -d
tpm2_container_id="$(docker ps -aqf "name=hirs-aca-provisioner-tpm2")" tpm2_container_id="$(docker ps -aqf "name=hirs-aca-provisioner-tpm2")"
echo "TPM2 Container ID: $tpm2_container_id" echo "TPM 2.0 Container ID: $tpm2_container_id"
tpm2_container_status="$(docker inspect $tpm2_container_id --format='{{.State.Status}}')" tpm2_container_status="$(docker inspect $tpm2_container_id --format='{{.State.Status}}')"
echo "TPM2 Container Status: $tpm2_container_status" echo "TPM 2.0 Container Status: $tpm2_container_status"
while [[ $tpm2_container_status == "running" ]] while [[ $tpm2_container_status == "running" ]]
do do
@ -32,7 +32,7 @@ done
# Store container exit code # Store container exit code
tpm2_container_exit_code="$(docker inspect $tpm2_container_id --format='{{.State.ExitCode}}')" tpm2_container_exit_code="$(docker inspect $tpm2_container_id --format='{{.State.ExitCode}}')"
echo "TPM2 Container Exit Code: $tpm2_container_exit_code" echo "TPM 2.0 Container Exit Code: $tpm2_container_exit_code"
# Display container log # Display container log
echo "" echo ""

View File

@ -14,10 +14,10 @@ cd .ci/docker
docker-compose -f docker-compose-tpm2-base-delta-good.yml up -d docker-compose -f docker-compose-tpm2-base-delta-good.yml up -d
tpm2_container_id="$(docker ps -aqf "name=hirs-aca-provisioner-tpm2")" tpm2_container_id="$(docker ps -aqf "name=hirs-aca-provisioner-tpm2")"
echo "TPM2 Container ID: $tpm2_container_id" echo "TPM 2.0 Container ID: $tpm2_container_id"
tpm2_container_status="$(docker inspect $tpm2_container_id --format='{{.State.Status}}')" tpm2_container_status="$(docker inspect $tpm2_container_id --format='{{.State.Status}}')"
echo "TPM2 Container Status: $tpm2_container_status" echo "TPM 2.0 Container Status: $tpm2_container_status"
while [[ $tpm2_container_status == "running" ]] while [[ $tpm2_container_status == "running" ]]
do do
@ -32,7 +32,7 @@ done
# Store container exit code # Store container exit code
tpm2_container_exit_code="$(docker inspect $tpm2_container_id --format='{{.State.ExitCode}}')" tpm2_container_exit_code="$(docker inspect $tpm2_container_id --format='{{.State.ExitCode}}')"
echo "TPM2 Container Exit Code: $tpm2_container_exit_code" echo "TPM 2.0 Container Exit Code: $tpm2_container_exit_code"
# Display container log # Display container log
echo "" echo ""

View File

@ -14,10 +14,10 @@ cd .ci/docker
docker-compose -f docker-compose-tpm2.yml up -d docker-compose -f docker-compose-tpm2.yml up -d
tpm2_container_id="$(docker ps -aqf "name=hirs-aca-provisioner-tpm2")" tpm2_container_id="$(docker ps -aqf "name=hirs-aca-provisioner-tpm2")"
echo "TPM2 Container ID: $tpm2_container_id" echo "TPM 2.0 Container ID: $tpm2_container_id"
tpm2_container_status="$(docker inspect $tpm2_container_id --format='{{.State.Status}}')" tpm2_container_status="$(docker inspect $tpm2_container_id --format='{{.State.Status}}')"
echo "TPM2 Container Status: $tpm2_container_status" echo "TPM 2.0 Container Status: $tpm2_container_status"
while [[ $tpm2_container_status == "running" ]] while [[ $tpm2_container_status == "running" ]]
do do
@ -32,7 +32,7 @@ done
# Store container exit code # Store container exit code
tpm2_container_exit_code="$(docker inspect $tpm2_container_id --format='{{.State.ExitCode}}')" tpm2_container_exit_code="$(docker inspect $tpm2_container_id --format='{{.State.ExitCode}}')"
echo "TPM2 Container Exit Code: $tpm2_container_exit_code" echo "TPM 2.0 Container Exit Code: $tpm2_container_exit_code"
# Display container log # Display container log
echo "" echo ""

View File

@ -5,7 +5,7 @@
set -e set -e
echo "" echo ""
echo "System Tests Starting..." echo "System Tests TPM 1.2 Starting..."
echo "" echo ""
# Start System Testing Docker Environment # Start System Testing Docker Environment
@ -14,10 +14,10 @@ cd .ci/docker
docker-compose up -d docker-compose up -d
tpm_container_id="$(docker ps -aqf "name=hirs-aca-provisioner")" tpm_container_id="$(docker ps -aqf "name=hirs-aca-provisioner")"
echo "TPM Container ID: $tpm_container_id" echo "TPM 1.2 Container ID: $tpm_container_id"
tpm_container_status="$(docker inspect $tpm_container_id --format='{{.State.Status}}')" tpm_container_status="$(docker inspect $tpm_container_id --format='{{.State.Status}}')"
echo "TPM Container Status: $tpm_container_status" echo "TPM 1.2 Container Status: $tpm_container_status"
while [[ $tpm_container_status == "running" ]] while [[ $tpm_container_status == "running" ]]
do do
@ -32,7 +32,7 @@ done
# Store container exit codes # Store container exit codes
tpm_container_exit_code="$(docker inspect $tpm_container_id --format='{{.State.ExitCode}}')" tpm_container_exit_code="$(docker inspect $tpm_container_id --format='{{.State.ExitCode}}')"
echo "TPM Container Exit Code: $tpm_container_exit_code" echo "TPM 1.2 Container Exit Code: $tpm_container_exit_code"
# Display container logs # Display container logs
echo "" echo ""
@ -40,7 +40,7 @@ echo "===========hirs-aca-provisioner System Tests Log:==========="
docker logs $tpm_container_id docker logs $tpm_container_id
echo "" echo ""
echo "End of TPM 1.2 System Tests, cleaning up..." echo "End of System Tests TPM 1.2, cleaning up..."
echo "" echo ""
# Clean up services and network # Clean up services and network
docker-compose down docker-compose down
@ -48,9 +48,9 @@ docker-compose down
# Return container exit codes # Return container exit codes
if [[ $tpm_container_exit_code == 0 ]] if [[ $tpm_container_exit_code == 0 ]]
then then
echo "SUCCESS: TPM 1.2 System tests passed" echo "SUCCESS: System Tests TPM 1.2 passed"
exit 0 exit 0
fi fi
echo "ERROR: System tests failed" echo "ERROR: System Tests TPM 1.2 failed"
exit 1 exit 1

View File

@ -23,17 +23,22 @@ import sys
import argparse import argparse
from system_test_core import HIRSPortal, AttestationCAPortal, collectors, \ from system_test_core import HIRSPortal, AttestationCAPortal, collectors, \
send_command, send_command_sha1sum, run_hirs_report, \ send_command, send_command_sha1sum, run_hirs_report, run_hirs_provisioner_tpm_1_2, \
run_hirs_provisioner_tpm2, parse_xml_with_stripped_namespaces, get_current_timestamp, \ run_hirs_provisioner_tpm_2_0, parse_xml_with_stripped_namespaces, \
get_all_nodes_recursively, touch_random_file_and_remove, get_random_pcr_hex_value, \ get_all_nodes_recursively, touch_random_file_and_remove, get_random_pcr_hex_value, \
is_ubuntu_client, is_tpm2,\ get_current_timestamp, is_ubuntu_client, is_tpm_2_0, is_tpm_1_2, \
DEFAULT_IMA_POLICY, DEFAULT_TPM_POLICY DEFAULT_IMA_POLICY, DEFAULT_TPM_POLICY, \
make_simple_ima_baseline, make_baseline_from_xml, \
make_simple_ima_blacklist_baseline, \
make_simple_ima_blacklist_baseline_with_hash, \
make_simple_ima_blacklist_baseline_with_file_and_hash, \
make_simple_ima_blacklist_baseline_with_updated_file_and_hash
NUMBER_OF_PCRS = 24 NUMBER_OF_PCRS = 24
suffix = os.environ.get('RANDOM_SYS_TEST_ID') suffix = os.environ.get('RANDOM_SYS_TEST_ID')
if suffix != None: if suffix != None:
print "Configuring with suffix " + suffix print("Configuring with suffix: %s" % suffix)
suffix = "-" + suffix suffix = "-" + suffix
else: else:
suffix = "" suffix = ""
@ -70,7 +75,7 @@ FORMAT = "%(asctime)-15s %(message)s"
provisioner_out = None provisioner_out = None
logging.basicConfig(filename=TEST_LOG_FILE,level=eval(LOG_LEVEL), format=FORMAT) logging.basicConfig(filename=TEST_LOG_FILE,level=eval(LOG_LEVEL), format=FORMAT)
logging.info("*****************beginning of system_test.py*****************") logging.info("***************** Beginning of system_test.py *****************")
logging.info("The ACA Portal is: " + HIRS_ATTESTATION_CA_PORTAL_URL) logging.info("The ACA Portal is: " + HIRS_ATTESTATION_CA_PORTAL_URL)
Portal = HIRSPortal(HIRS_SERVER_URL) Portal = HIRSPortal(HIRS_SERVER_URL)
@ -91,17 +96,23 @@ class SystemTest(unittest.TestCase):
def tearDown(self): def tearDown(self):
"""Tears down the state for testing""" """Tears down the state for testing"""
def test_01_attestation_ca_portal_online(self):
"""Test that the Attestation CA Portal is online and accessible by making a GET request.
If not online, an exception will be raised since the response code is non-200"""
logging.info("***************** Beginning of attestation ca portal online test *****************")
AcaPortal.check_is_online()
@collectors(['IMA', 'TPM'], COLLECTOR_LIST) @collectors(['IMA', 'TPM'], COLLECTOR_LIST)
def test_01_empty_baselines(self): def test_02_empty_baselines(self):
"""Test that appraisal succeeds with empty IMA and TPM baselines""" """Test that appraisal succeeds with empty IMA and TPM baselines"""
logging.info("*****************beginning of empty baseline test*****************") logging.info("***************** Beginning of empty baseline test *****************")
# Portal.set_default_policies(ima_policy=DEFAULT_IMA_POLICY, tpm_policy=DEFAULT_TPM_POLICY) # Portal.set_default_policies(ima_policy=DEFAULT_IMA_POLICY, tpm_policy=DEFAULT_TPM_POLICY)
# result = run_hirs_report(CLIENT) # result = run_hirs_report(CLIENT)
# self.assertTrue(result) # self.assertTrue(result)
# self.assertEqual(0, Portal.get_alert_count_from_latest_report()) # self.assertEqual(0, Portal.get_alert_count_from_latest_report())
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
def test_02_small_ima_appraisal(self): def test_03_small_ima_appraisal(self):
"""Test that appraisal works with a small hard-coded IMA baseline """Test that appraisal works with a small hard-coded IMA baseline
steps: steps:
@ -110,7 +121,7 @@ class SystemTest(unittest.TestCase):
- set the default device group to point to that policy - set the default device group to point to that policy
- run a report from the client machine using vagrant ssh - run a report from the client machine using vagrant ssh
""" """
logging.info("*****************beginning of small IMA appraisal test*****************") logging.info("***************** Beginning of small IMA appraisal test *****************")
# baseline = make_simple_ima_baseline() # baseline = make_simple_ima_baseline()
# policy_name = Portal.add_ima_policy(required_set=baseline, policy_name_prefix='small_ima') # policy_name = Portal.add_ima_policy(required_set=baseline, policy_name_prefix='small_ima')
# Portal.set_default_policies(ima_policy=policy_name) # Portal.set_default_policies(ima_policy=policy_name)
@ -118,7 +129,7 @@ class SystemTest(unittest.TestCase):
# self.assertTrue(result) # self.assertTrue(result)
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
def test_03_large_ima_appraisal(self): def test_04_large_ima_appraisal(self):
"""Test that appraisal works with a full-size IMA baseline """Test that appraisal works with a full-size IMA baseline
steps: steps:
@ -129,7 +140,7 @@ class SystemTest(unittest.TestCase):
- set the default device group to point to that policy - set the default device group to point to that policy
- run a report from the client machine using vagrant ssh - run a report from the client machine using vagrant ssh
""" """
logging.info("*****************beginning of large IMA appraisal test*****************") logging.info("***************** Beginning of large IMA appraisal test *****************")
# empty_ima_policy = Portal.add_ima_policy(required_set=None, policy_name_prefix="empty") # empty_ima_policy = Portal.add_ima_policy(required_set=None, policy_name_prefix="empty")
# Portal.set_default_policies(ima_policy=empty_ima_policy, # Portal.set_default_policies(ima_policy=empty_ima_policy,
# tpm_policy=DEFAULT_TPM_POLICY) # tpm_policy=DEFAULT_TPM_POLICY)
@ -148,7 +159,7 @@ class SystemTest(unittest.TestCase):
# self.assertTrue(True) # self.assertTrue(True)
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
def test_04_small_ima_appraisal_required_set_missing(self): def test_05_small_ima_appraisal_required_set_missing(self):
"""Test that appraisal results in an appropriate alert generation when a required set file is missing """Test that appraisal results in an appropriate alert generation when a required set file is missing
steps: steps:
@ -159,7 +170,7 @@ class SystemTest(unittest.TestCase):
- run a report from the client machine using vagrant ssh - run a report from the client machine using vagrant ssh
- make sure it failed and that one appropriate alert was thrown - make sure it failed and that one appropriate alert was thrown
""" """
logging.info("*****************beginning of small IMA appraisal test with required set missing*****************") logging.info("***************** Beginning of small IMA appraisal test with required set missing *****************")
# baseline = make_simple_ima_baseline() # baseline = make_simple_ima_baseline()
# baseline["name"] = "ima_baseline_missing_required_record_{0}".format(get_current_timestamp()) # baseline["name"] = "ima_baseline_missing_required_record_{0}".format(get_current_timestamp())
# random_hash = str(hashlib.sha1(str(random.random())).hexdigest()) # random_hash = str(hashlib.sha1(str(random.random())).hexdigest())
@ -180,8 +191,8 @@ class SystemTest(unittest.TestCase):
# self.assertTrue(random_hash in latest_alert['expected']) # self.assertTrue(random_hash in latest_alert['expected'])
# self.assertTrue(missing_file in latest_alert['expected']) # self.assertTrue(missing_file in latest_alert['expected'])
@collectors(['TPM'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
def test_05_tpm_white_list_appraisal(self): def test_06_tpm_white_list_appraisal(self):
"""Test that appraisal works with a TPM white list baseline """Test that appraisal works with a TPM white list baseline
steps: steps:
@ -192,7 +203,7 @@ class SystemTest(unittest.TestCase):
- set the default device group to point to that policy - set the default device group to point to that policy
- run a report from the client machine - run a report from the client machine
""" """
logging.info("*****************beginning of TPM white list appraisal test*****************") logging.info("***************** Beginning of TPM white list appraisal test *****************")
# empty_ima_policy = Portal.add_ima_policy(required_set=None) # empty_ima_policy = Portal.add_ima_policy(required_set=None)
# Portal.set_default_policies(ima_policy=empty_ima_policy, # Portal.set_default_policies(ima_policy=empty_ima_policy,
# tpm_policy=DEFAULT_TPM_POLICY) # tpm_policy=DEFAULT_TPM_POLICY)
@ -238,8 +249,8 @@ class SystemTest(unittest.TestCase):
# self.assertTrue(reported_hash in pcr_alert['received']) # self.assertTrue(reported_hash in pcr_alert['received'])
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_06_ima_blacklist_appraisal(self): def test_07_ima_blacklist_appraisal(self):
"""Test that appraisal works with a small IMA blacklist baseline """Test that appraisal works with a small IMA blacklist baseline
steps: steps:
@ -249,7 +260,7 @@ class SystemTest(unittest.TestCase):
- touch a file on the client that is contained in the blacklist - touch a file on the client that is contained in the blacklist
- run another report from the client machine and ensure the appraisal fails - run another report from the client machine and ensure the appraisal fails
""" """
logging.info("*****************beginning of blacklist IMA appraisal test*****************") logging.info("***************** Beginning of blacklist IMA appraisal test *****************")
# baseline = make_simple_ima_blacklist_baseline() # baseline = make_simple_ima_blacklist_baseline()
# policy_name = Portal.add_ima_policy(blacklist=baseline, policy_name_prefix='small_ima_blacklist') # policy_name = Portal.add_ima_policy(blacklist=baseline, policy_name_prefix='small_ima_blacklist')
# Portal.set_default_policies(ima_policy=policy_name) # Portal.set_default_policies(ima_policy=policy_name)
@ -350,8 +361,8 @@ class SystemTest(unittest.TestCase):
# self.assertTrue(result) # self.assertTrue(result)
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_07_delta_reports_required_set(self): def test_08_delta_reports_required_set(self):
"""Test that appraisal works with delta reports and required sets. """Test that appraisal works with delta reports and required sets.
steps: steps:
@ -372,7 +383,7 @@ class SystemTest(unittest.TestCase):
- Check that foo-bar-file is in this report, but not foo-file - Check that foo-bar-file is in this report, but not foo-file
""" """
logging.info("*****************beginning of Delta Reports required set appraisal test*****************") logging.info("***************** Beginning of Delta Reports required set appraisal test *****************")
# unique_name = uuid.uuid4().hex # unique_name = uuid.uuid4().hex
# baseline_name = 'delta-reports-required-baseline-' + unique_name # baseline_name = 'delta-reports-required-baseline-' + unique_name
# foo_file_name = 'foo-file-' + unique_name # foo_file_name = 'foo-file-' + unique_name
@ -424,8 +435,8 @@ class SystemTest(unittest.TestCase):
# send_vagrant_command('rm {0}'.format(foo_bar_file_name), CLIENT) # send_vagrant_command('rm {0}'.format(foo_bar_file_name), CLIENT)
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_08_delta_reports_whitelist(self): def test_09_delta_reports_whitelist(self):
"""Test that appraisal works with delta reports. Each report should be """Test that appraisal works with delta reports. Each report should be
appraised individually. Checks that a failed appraisal can be followed appraised individually. Checks that a failed appraisal can be followed
by a successful appraisal if there are no errors in the second delta by a successful appraisal if there are no errors in the second delta
@ -450,7 +461,7 @@ class SystemTest(unittest.TestCase):
- Check that foo-file is not in this report - Check that foo-file is not in this report
""" """
logging.info("*****************beginning of Delta Reports whitelist appraisal test*****************") logging.info("***************** Beginning of Delta Reports whitelist appraisal test *****************")
# unique_name = uuid.uuid4().hex # unique_name = uuid.uuid4().hex
# baseline_name = 'delta-reports-whitelist-baseline-' + unique_name # baseline_name = 'delta-reports-whitelist-baseline-' + unique_name
# foo_file_name = 'foo-file-' + unique_name # foo_file_name = 'foo-file-' + unique_name
@ -490,8 +501,8 @@ class SystemTest(unittest.TestCase):
# send_vagrant_command('rm {0}'.format(foo_file_name), CLIENT) # send_vagrant_command('rm {0}'.format(foo_file_name), CLIENT)
@collectors(['IMA', 'TPM'], COLLECTOR_LIST) @collectors(['IMA', 'TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_09_on_demand(self): def test_10_on_demand(self):
"""Test that on-demand (server-initiated) appraisal works. """Test that on-demand (server-initiated) appraisal works.
steps: steps:
@ -505,7 +516,7 @@ class SystemTest(unittest.TestCase):
- check that it has the random filename and hash - check that it has the random filename and hash
- check that it contains a TPM Report - check that it contains a TPM Report
""" """
logging.info("*****************beginning of on-demand test*****************") logging.info("***************** Beginning of on-demand test *****************")
# baseline = make_simple_ima_baseline() # baseline = make_simple_ima_baseline()
# policy_name = Portal.add_ima_policy(required_set=baseline, delta_reports_enabled="false", policy_name_prefix='on_demand') # policy_name = Portal.add_ima_policy(required_set=baseline, delta_reports_enabled="false", policy_name_prefix='on_demand')
# logging.info('on demand policy name: %s', policy_name) # logging.info('on demand policy name: %s', policy_name)
@ -538,7 +549,7 @@ class SystemTest(unittest.TestCase):
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
@unittest.skip("SELinux issues are preventing repo sync from working") @unittest.skip("SELinux issues are preventing repo sync from working")
def test_10_failing_ima_appraisal_broad_repo_baseline(self): def test_11_failing_ima_appraisal_broad_repo_baseline(self):
"""Test that an appraisal not containing expected packages in a broad repo IMA baseline fails. """Test that an appraisal not containing expected packages in a broad repo IMA baseline fails.
steps: steps:
@ -548,7 +559,7 @@ class SystemTest(unittest.TestCase):
- Run a HIRS report and ensure it fails - Run a HIRS report and ensure it fails
- Ensure that at least one of the expected alerts has been generated - Ensure that at least one of the expected alerts has been generated
""" """
logging.info("*****************beginning of broad repo failing appraisal test*****************") logging.info("***************** Beginning of broad repo failing appraisal test *****************")
# repo_name = "Test Yum Repository" # repo_name = "Test Yum Repository"
# baseline_name = "Test Broad Baseline" # baseline_name = "Test Broad Baseline"
# policy_name = "Test Broad Repo IMA Policy" # policy_name = "Test Broad Repo IMA Policy"
@ -571,7 +582,7 @@ class SystemTest(unittest.TestCase):
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
@unittest.skip("SELinux issues are preventing repo sync from working") @unittest.skip("SELinux issues are preventing repo sync from working")
@unittest.skipIf(is_ubuntu_client(CLIENT_OS), "Skipping this test due to client OS " + CLIENT_OS) @unittest.skipIf(is_ubuntu_client(CLIENT_OS), "Skipping this test due to client OS " + CLIENT_OS)
def test_11_successful_ima_appraisal_broad_repo_baseline(self): def test_12_successful_ima_appraisal_broad_repo_baseline(self):
"""Test that an appraisal containing expected packages in a broad repo IMA baseline passes. """Test that an appraisal containing expected packages in a broad repo IMA baseline passes.
This test only works on CentOS 6 and 7. This test only works on CentOS 6 and 7.
@ -583,7 +594,7 @@ class SystemTest(unittest.TestCase):
- Run a HIRS report and ensure it passes - Run a HIRS report and ensure it passes
- Ensure that there are no new alerts - Ensure that there are no new alerts
""" """
logging.info("*****************beginning of broad repo successful appraisal test*****************") logging.info("***************** Beginning of broad repo successful appraisal test *****************")
# repo_name = "Test Yum Repository" # repo_name = "Test Yum Repository"
# baseline_name = "Test Broad Baseline" # baseline_name = "Test Broad Baseline"
# policy_name = "Test Broad Repo IMA Policy" # policy_name = "Test Broad Repo IMA Policy"
@ -607,128 +618,129 @@ class SystemTest(unittest.TestCase):
# self.assertTrue(run_hirs_report(CLIENT)) # self.assertTrue(run_hirs_report(CLIENT))
# self.assertEqual(Portal.get_alert_count_from_latest_report(), 0) # self.assertEqual(Portal.get_alert_count_from_latest_report(), 0)
@collectors(['TPM'], COLLECTOR_LIST) # @collectors(['TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) # @unittest.skipIf(not is_tpm_1_2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_12_attestation_ca_portal_online(self): # def test_13_tpm_1_2_initial_provision(self):
"""Test that the Attestation CA Portal is online and accessible by making a GET request. # """Test that running the TPM 1.2 hirs provisioner works"""
If not online, an exception will be raised since the response code is non-200""" # logging.info("***************** Beginning of initial TPM 1.2 provisioner run *****************")
logging.info("*****************beginning of attestation ca portal online test *****************") #
AcaPortal.check_is_online() # # Run the provisioner to ensure that it provisions successfully
# provisioner_out = run_hirs_provisioner_tpm_1_2(CLIENT)
# print("Initial TPM 1.2 provisioner run output: {0}".format(provisioner_out))
@collectors(['TPM'], COLLECTOR_LIST) @collectors(['TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_13_tpm2_initial_provision(self): def test_14_tpm_2_0_initial_provision(self):
"""Test that running the tpm2 hirs provisioner works""" """Test that running the TPM 2.0 hirs provisioner works"""
logging.info("*****************beginning of initial provisioner run *****************") logging.info("***************** Beginning of initial TPM 2.0 provisioner run *****************")
# Run the provisioner to ensure that it provisions successfully # Run the provisioner to ensure that it provisions successfully
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("Initial provisioner run output: {0}".format(provisioner_out)) print("Initial TPM 2.0 provisioner run output: {0}".format(provisioner_out))
@collectors(['TPM'], COLLECTOR_LIST) @collectors(['TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_14_device_info_report_stored_after_provisioning(self): def test_15_device_info_report_stored_after_provisioning(self):
"""Test that running the hirs provisioner results in storing a device info report for """Test that running the hirs provisioner results in storing a device info report for
the device in the DB""" the device in the DB"""
logging.info("*****************beginning of provisioner + device info report test *****************") logging.info("***************** Beginning of device info report test *****************")
logging.info("getting devices from ACA portal")
logging.info("Getting devices from ACA portal...")
aca_portal_devices = AcaPortal.get_devices() aca_portal_devices = AcaPortal.get_devices()
self.assertEqual(aca_portal_devices['recordsTotal'], 1) self.assertEqual(aca_portal_devices['recordsTotal'], 1)
@collectors(['TPM'], COLLECTOR_LIST) @collectors(['TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_15_supply_chain_validation_summary_stored_after_second_provisioning(self): def test_16_supply_chain_validation_summary_stored_after_second_provisioning(self):
"""Test that running the hirs provisioner, a second time, results in storing a supply chain validation """Test that running the hirs provisioner, a second time, results in storing a supply chain validation
record in the database""" record in the database"""
logging.info("*****************beginning of provisioner + supply chain validation summary test *****************") logging.info("***************** Beginning of supply chain validation summary test *****************")
if is_tpm2(TPM_VERSION):
logging.info("Using TPM 2.0")
logging.info("Uploading CA cert: " + CA_CERT_LOCATION) logging.info("Uploading CA cert: " + CA_CERT_LOCATION)
AcaPortal.upload_ca_cert(CA_CERT_LOCATION) AcaPortal.upload_ca_cert(CA_CERT_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT)
else:
# Supply chain validation only supported on CentOS 7
if CLIENT_OS == "centos7":
AcaPortal.upload_ca_cert(EK_CA_CERT_LOCATION)
AcaPortal.enable_ec_validation()
provisioner_out = run_hirs_provisioner(CLIENT)
provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("Second provisioner run output: {0}".format(provisioner_out)) print("Second provisioner run output: {0}".format(provisioner_out))
supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries() supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries()
# verify this is one SCVS record indicating PASS # verify this is one SCVS record indicating PASS
self.assertEqual(supply_chain_validation_summaries['recordsTotal'], 2) self.assertEqual(supply_chain_validation_summaries['recordsTotal'], 2)
self.assertEqual(supply_chain_validation_summaries['data'][0]['overallValidationResult'], "PASS") self.assertEqual(supply_chain_validation_summaries['data'][0]['overallValidationResult'], "PASS")
self.assertEqual(supply_chain_validation_summaries['data'][1]['overallValidationResult'], "PASS") self.assertEqual(supply_chain_validation_summaries['data'][1]['overallValidationResult'], "PASS")
# verify device has been updated with supply chain appraisal result # verify device has been updated with supply chain appraisal result
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@collectors(['TPM'], COLLECTOR_LIST) @collectors(['TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_16_ek_info_report(self): def test_17_ek_info_report(self):
"""Test that running the hirs provisioner results in storing EK certs info report for """Test that running the hirs provisioner results in storing EK certs info report for
the device in the DB""" the device in the DB"""
logging.info("*****************beginning of provisioner + Endorsement certs info report test *****************") logging.info("***************** Beginning of Endorsement Certs info report test *****************")
logging.info("getting ek certs from ACA portal")
logging.info("Getting EK Certs from ACA portal...")
cert_list = AcaPortal.get_ek_certs() cert_list = AcaPortal.get_ek_certs()
self.assertEqual(cert_list['recordsTotal'], 1) self.assertEqual(cert_list['recordsTotal'], 1)
self.assertEqual(cert_list['data'][0]['credentialType'], "TCPA Trusted Platform Module Endorsement") self.assertEqual(cert_list['data'][0]['credentialType'], "TCPA Trusted Platform Module Endorsement")
@collectors(['TPM'], COLLECTOR_LIST) @collectors(['TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_17_pk_info_report(self): def test_18_pk_info_report(self):
"""Test that running the hirs provisioner results in storing PK certs info report for """Test that running the hirs provisioner results in storing PK certs info report for
the device in the DB""" the device in the DB"""
logging.info("*****************beginning of provisioner + Platform certs info report test *****************") logging.info("***************** Beginning Platform Certs info report test *****************")
logging.info("getting pk certs from ACA portal")
logging.info("Getting PK Certs from ACA portal...")
cert_list = AcaPortal.get_pk_certs() cert_list = AcaPortal.get_pk_certs()
self.assertEqual(cert_list['recordsTotal'], 1) self.assertEqual(cert_list['recordsTotal'], 1)
self.assertEqual(cert_list['data'][0]['credentialType'], "TCG Trusted Platform Endorsement") self.assertEqual(cert_list['data'][0]['credentialType'], "TCG Trusted Platform Endorsement")
@collectors(['TPM'], COLLECTOR_LIST) @collectors(['TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_18_trust_chain_info_report(self): def test_19_trust_chain_info_report(self):
"""Test that running the hirs provisioner results in storing trust chains info report for """Test that running the hirs provisioner results in storing trust chains info report for
the device in the DB""" the device in the DB"""
logging.info("*****************beginning of provisioner + Trust chains info report test *****************") logging.info("***************** Beginning of Trust Chain info report test *****************")
logging.info("getting trust chains from ACA portal") logging.info("Getting Trust Chains from ACA portal...")
trust_chain_list = AcaPortal.get_trust_chains() trust_chain_list = AcaPortal.get_trust_chains()
self.assertEqual(trust_chain_list['recordsTotal'], 1) self.assertEqual(trust_chain_list['recordsTotal'], 1)
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A1_base_delta(self): def test_20_A1_base_delta(self):
"""Test Delta Certificates A1 - Provisioning with Good Base Platform Cert (via Platform Cert on TPM Emulator)""" """Test Delta Certificates A1 - Provisioning with Good Base Platform Cert (via Platform Cert on TPM Emulator)"""
logging.info("*****************test_19_A1 - beginning of delta certificate test *****************") logging.info("***************** test_20_A1 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Good Base Platform Cert (via Platform Cert on TPM Emulator)") logging.info("Provisioning with Good Base Platform Cert (via Platform Cert on TPM Emulator)")
logging.info("Check if ACA is online...") logging.info("Check if ACA is online...")
AcaPortal.check_is_online() AcaPortal.check_is_online()
logging.info("Uploading CA cert: " + CA_CERT_LOCATION) logging.info("Uploading CA Cert: " + CA_CERT_LOCATION)
AcaPortal.upload_ca_cert(CA_CERT_LOCATION) AcaPortal.upload_ca_cert(CA_CERT_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A1_base_delta run output: {0}".format(provisioner_out)) print("test_20_A1_base_delta run output: {0}".format(provisioner_out))
# Verify device supply chain appraisal result is PASS # Verify device supply chain appraisal result is PASS
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A2_base_delta(self): def test_20_A2_base_delta(self):
"""Test Delta Certificates A2 - Attempt to upload Base cert with holder already having a Base Platform Cert associated with it""" """Test Delta Certificates A2 - Attempt to upload Base cert with holder already having a Base Platform Cert associated with it"""
logging.info("*****************test_19_A2 - beginning of delta certificate test *****************") logging.info("***************** test_20_A2 - Beginning of delta certificate test *****************")
logging.info("Attempt to upload PBaseCertB, with PBaseCertA already loaded in the ACA.") logging.info("Attempt to upload PBaseCertB, with PBaseCertA already loaded in the ACA.")
print("test_19_A2_base_delta. PBaseCertA has already been loaded. Attempting to upload second Platform Cert: %s" % (PBaseCertB_LOCATION)) print("test_20_A2_base_delta. PBaseCertA has already been loaded. Attempting to upload second Platform Cert: %s" % (PBaseCertB_LOCATION))
# Confirm there is one Platform Base Cert already loaded # Confirm there is one Platform Base Cert already loaded
cert_list = AcaPortal.get_pk_certs() cert_list = AcaPortal.get_pk_certs()
self.assertEqual(cert_list['recordsTotal'], 1) self.assertEqual(cert_list['recordsTotal'], 1)
print("Number of Platform certs: %d" % (cert_list['recordsTotal'])) print("Number of Platform Certs: %d" % (cert_list['recordsTotal']))
self.assertEqual(cert_list['data'][0]['credentialType'], "TCG Trusted Platform Endorsement") self.assertEqual(cert_list['data'][0]['credentialType'], "TCG Trusted Platform Endorsement")
self.assertEqual(cert_list['data'][0]['platformType'], "Base") self.assertEqual(cert_list['data'][0]['platformType'], "Base")
@ -739,22 +751,20 @@ class SystemTest(unittest.TestCase):
# Confirm Platform Base Cert has not been loaded # Confirm Platform Base Cert has not been loaded
cert_list = AcaPortal.get_pk_certs() cert_list = AcaPortal.get_pk_certs()
self.assertEqual(cert_list['recordsTotal'], 1) self.assertEqual(cert_list['recordsTotal'], 1)
print("Number of Platform certs: %d" % (cert_list['recordsTotal'])) print("Number of Platform Certs: %d" % (cert_list['recordsTotal']))
self.assertEqual(cert_list['data'][0]['credentialType'], "TCG Trusted Platform Endorsement") self.assertEqual(cert_list['data'][0]['credentialType'], "TCG Trusted Platform Endorsement")
self.assertEqual(cert_list['data'][0]['platformType'], "Base") self.assertEqual(cert_list['data'][0]['platformType'], "Base")
if (cert_list['recordsTotal'] == 1): if (cert_list['recordsTotal'] == 1):
print ("SUCCESS.") print ("SUCCESS.\n")
print ("")
else: else:
print ("FAILED.") print ("FAILED.\n")
print ("")
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A3_base_delta(self): def test_20_A3_base_delta(self):
"""Test Delta Certificates A3 - Provisioning with Good Base Platform Cert Base and 1 Delta Cert""" """Test Delta Certificates A3 - Provisioning with Good Base Platform Cert Base and 1 Delta Cert"""
logging.info("*****************test_19_A3 - beginning of delta certificate test *****************") logging.info("***************** test_20_A3 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Good Base Platform Cert Base and 1 Delta Cert") logging.info("Provisioning with Good Base Platform Cert Base and 1 Delta Cert")
# Verify device supply chain appraisal result is PASS # Verify device supply chain appraisal result is PASS
@ -764,8 +774,8 @@ class SystemTest(unittest.TestCase):
# Upload the SIDeltaCertA1 and provision # Upload the SIDeltaCertA1 and provision
AcaPortal.upload_pk_cert(SIDeltaCertA1_LOCATION) AcaPortal.upload_pk_cert(SIDeltaCertA1_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A3_base_delta run output: {0}".format(provisioner_out)) print("test_20_A3_base_delta run output: {0}".format(provisioner_out))
supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries() supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries()
# Verify this is one SCVS record indicating PASS # Verify this is one SCVS record indicating PASS
@ -778,10 +788,10 @@ class SystemTest(unittest.TestCase):
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A4_base_delta(self): def test_20_A4_base_delta(self):
"""Test Delta Certificates A4 - Provisioning with Good Base Platform Cert Base and 2 Delta Certs""" """Test Delta Certificates A4 - Provisioning with Good Base Platform Cert Base and 2 Delta Certs"""
logging.info("*****************test_19_A4 - beginning of delta certificate test *****************") logging.info("***************** test_20_A4 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Good Base Platform Cert Base and 2 Delta Certs") logging.info("Provisioning with Good Base Platform Cert Base and 2 Delta Certs")
# Verify device supply chain appraisal result is PASS # Verify device supply chain appraisal result is PASS
@ -791,9 +801,9 @@ class SystemTest(unittest.TestCase):
# Upload the VARDeltaCertA1 and provision # Upload the VARDeltaCertA1 and provision
AcaPortal.upload_pk_cert(VARDeltaCertA1_LOCATION) AcaPortal.upload_pk_cert(VARDeltaCertA1_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A4_base_delta run output: {0}".format(provisioner_out)) print("test_20_A4_base_delta run output: {0}".format(provisioner_out))
supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries() supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries()
# Verify this is one SCVS record indicating PASS # Verify this is one SCVS record indicating PASS
@ -807,10 +817,10 @@ class SystemTest(unittest.TestCase):
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A5_base_delta(self): def test_20_A5_base_delta(self):
"""Test Delta Certificates A5 - Provisioning with Good Base Platform Cert and 1 Bad Delta Cert""" """Test Delta Certificates A5 - Provisioning with Good Base Platform Cert and 1 Bad Delta Cert"""
logging.info("*****************test_19_A5 - beginning of delta certificate test *****************") logging.info("***************** test_20_A5 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Good Base Platform Cert and 1 Bad Delta Cert") logging.info("Provisioning with Good Base Platform Cert and 1 Bad Delta Cert")
# TODO: Determine if we need this test # TODO: Determine if we need this test
@ -822,7 +832,7 @@ class SystemTest(unittest.TestCase):
# # Upload the VARDelta cert and provision # # Upload the VARDelta cert and provision
# AcaPortal.upload_pk_cert(SIDeltaCertA2_LOCATION) # AcaPortal.upload_pk_cert(SIDeltaCertA2_LOCATION)
# AcaPortal.enable_supply_chain_validations() # AcaPortal.enable_supply_chain_validations()
# provisioner_out = run_hirs_provisioner_tpm2(CLIENT) # provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
# #
# print("test_19_A4_base_delta SHOULD FAIL provisioning!!") # print("test_19_A4_base_delta SHOULD FAIL provisioning!!")
# print("test_19_A4_base_delta run output: {0}".format(provisioner_out)) # print("test_19_A4_base_delta run output: {0}".format(provisioner_out))
@ -831,10 +841,10 @@ class SystemTest(unittest.TestCase):
# self.assertIn("Provisioning failed", format(provisioner_out)) # self.assertIn("Provisioning failed", format(provisioner_out))
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A6_base_delta(self): def test_20_A6_base_delta(self):
"""Test Delta Certificates A6 - Provisioning with Good Base Platform, 2 Good Delta Certs and 1 Bad Delta Cert""" """Test Delta Certificates A6 - Provisioning with Good Base Platform, 2 Good Delta Certs and 1 Bad Delta Cert"""
logging.info("*****************test_19_A6 - beginning of delta certificate test *****************") logging.info("***************** test_20_A6 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Good Base Platform, 2 Good Delta Certs and 1 Bad Delta Cert") logging.info("Provisioning with Good Base Platform, 2 Good Delta Certs and 1 Bad Delta Cert")
# Verify device supply chain appraisal result is PASS # Verify device supply chain appraisal result is PASS
@ -844,10 +854,10 @@ class SystemTest(unittest.TestCase):
# Upload the SIDeltaCertA2 and provision # Upload the SIDeltaCertA2 and provision
AcaPortal.upload_pk_cert(SIDeltaCertA2_LOCATION) AcaPortal.upload_pk_cert(SIDeltaCertA2_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A6_base_delta SHOULD FAIL provisioning using: %s" % (SIDeltaCertA2_LOCATION)) print("test_20_A6_base_delta SHOULD FAIL provisioning using: %s" % (SIDeltaCertA2_LOCATION))
print("test_19_A6_base_delta run output: {0}".format(provisioner_out)) print("test_20_A6_base_delta run output: {0}".format(provisioner_out))
# Provisioning should fail since the Delta contains a bad component. # Provisioning should fail since the Delta contains a bad component.
self.assertIn("Provisioning failed", format(provisioner_out)) self.assertIn("Provisioning failed", format(provisioner_out))
@ -855,30 +865,30 @@ class SystemTest(unittest.TestCase):
# Upload the SIDeltaCertA2_resolved and provision # Upload the SIDeltaCertA2_resolved and provision
AcaPortal.upload_pk_cert(SIDeltaCertA2_resolved_LOCATION) AcaPortal.upload_pk_cert(SIDeltaCertA2_resolved_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A6_base_delta SHOULD PASS provisioning using: %s" % (SIDeltaCertA2_resolved_LOCATION)) print("test_20_A6_base_delta SHOULD PASS provisioning using: %s" % (SIDeltaCertA2_resolved_LOCATION))
print("test_19_A6_base_delta run output: {0}".format(provisioner_out)) print("test_20_A6_base_delta run output: {0}".format(provisioner_out))
# Verify device has been updated with supply chain appraisal result # Verify device has been updated with supply chain appraisal result
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A7_base_delta(self): def test_20_A7_base_delta(self):
"""Test Delta Certificates A7 - Provisioning with Good Base Platform, 2 Good Delta Certs and """Test Delta Certificates A7 - Provisioning with Good Base Platform, 2 Good Delta Certs and
1 Bad Delta Cert with non present component""" 1 Bad Delta Cert with non present component"""
logging.info("*****************test_19_A7 - beginning of delta certificate test *****************") logging.info("***************** test_20_A7 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Good Base Platform, 2 Good Delta Certs and 1 Bad Delta Cert with non present component") logging.info("Provisioning with Good Base Platform, 2 Good Delta Certs and 1 Bad Delta Cert with non present component")
# Upload the VARDeltaCertA2 and provision # Upload the VARDeltaCertA2 and provision
AcaPortal.upload_pk_cert(VARDeltaCertA2_LOCATION) AcaPortal.upload_pk_cert(VARDeltaCertA2_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A7_base_delta SHOULD FAIL provisioning using: %s" % (VARDeltaCertA2_LOCATION)) print("test_20_A7_base_delta SHOULD FAIL provisioning using: %s" % (VARDeltaCertA2_LOCATION))
print("test_19_A7_base_delta run output: {0}".format(provisioner_out)) print("test_20_A7_base_delta run output: {0}".format(provisioner_out))
# Provisioning should fail since the Delta contains a component thats not in the Base # Provisioning should fail since the Delta contains a component thats not in the Base
self.assertIn("Provisioning failed", format(provisioner_out)) self.assertIn("Provisioning failed", format(provisioner_out))
@ -886,40 +896,39 @@ class SystemTest(unittest.TestCase):
# Upload the VARDeltaCertA2_resolved and provision # Upload the VARDeltaCertA2_resolved and provision
AcaPortal.upload_pk_cert(VARDeltaCertA2_resolved_LOCATION) AcaPortal.upload_pk_cert(VARDeltaCertA2_resolved_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A7_base_delta SHOULD PASS provisioning using: %s" % (VARDeltaCertA2_resolved_LOCATION)) print("test_20_A7_base_delta SHOULD PASS provisioning using: %s" % (VARDeltaCertA2_resolved_LOCATION))
print("test_19_A7_base_delta run output: {0}".format(provisioner_out)) print("test_20_A7_base_delta run output: {0}".format(provisioner_out))
# Verify device has been updated with supply chain appraisal result # Verify device has been updated with supply chain appraisal result
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A8_base_delta(self): def test_20_A8_base_delta(self):
"""Test Delta Certificates A8 - Provisioning with Good Base Platform, 2 Good Delta Certs with 1 Delta cert """Test Delta Certificates A8 - Provisioning with Good Base Platform, 2 Good Delta Certs with 1 Delta cert
replacing component from previous, using the Delta as a base certificate""" replacing component from previous, using the Delta as a base certificate"""
logging.info("*****************test_19_A8 - beginning of delta certificate test *****************") logging.info("***************** test_20_A8 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Good Base Platform, 2 Good Delta Certs with 1 Delta cert replacing component from previous, using the Delta as a base certificate") logging.info("Provisioning with Good Base Platform, 2 Good Delta Certs with 1 Delta cert replacing component from previous, using the Delta as a base certificate")
# Upload the SIDeltaCertA3 and provision # Upload the SIDeltaCertA3 and provision
AcaPortal.upload_pk_cert(SIDeltaCertA3_LOCATION) AcaPortal.upload_pk_cert(SIDeltaCertA3_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A8_base_delta run output: {0}".format(provisioner_out)) print("test_20_A8_base_delta run output: {0}".format(provisioner_out))
supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries()
# Verify device has been updated with supply chain appraisal result # Verify device has been updated with supply chain appraisal result
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@collectors(['BASE_DELTA_BAD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_BAD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_B1_base_delta(self): def test_20_B1_base_delta(self):
"""Test Base/Delta Certificates B1 - Provisioning with Bad Platform Cert Base """ """Test Base/Delta Certificates B1 - Provisioning with Bad Platform Cert Base """
logging.info("*****************test_19_B1 - beginning of delta certificate test *****************") logging.info("***************** test_20_B1 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Bad Platform Cert Base") logging.info("Provisioning with Bad Platform Cert Base")
logging.info("Check if ACA is online...") logging.info("Check if ACA is online...")
@ -928,19 +937,19 @@ class SystemTest(unittest.TestCase):
logging.info("Uploading CA cert: " + CA_CERT_LOCATION) logging.info("Uploading CA cert: " + CA_CERT_LOCATION)
AcaPortal.upload_ca_cert(CA_CERT_LOCATION) AcaPortal.upload_ca_cert(CA_CERT_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_B1_base_delta SHOULD FAIL provisioning using: %s" % (PBaseCertB_LOCATION)) print("test_20_B1_base_delta SHOULD FAIL provisioning using: %s" % (PBaseCertB_LOCATION))
print("test_19_B1_base_delta run output: {0}".format(provisioner_out)) print("test_20_B1_base_delta run output: {0}".format(provisioner_out))
# Provisioning should fail since the PC contains FAULTY components. # Provisioning should fail since the PC contains FAULTY components.
self.assertIn("Provisioning failed", format(provisioner_out)) self.assertIn("Provisioning failed", format(provisioner_out))
@collectors(['BASE_DELTA_BAD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_BAD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_B2_base_delta(self): def test_20_B2_base_delta(self):
"""Test Base/Delta Certificates B2 - Provisioning with Bad Platform Cert Base and 1 Good delta with 1 bad component unresolved""" """Test Base/Delta Certificates B2 - Provisioning with Bad Platform Cert Base and 1 Good delta with 1 bad component unresolved"""
logging.info("*****************test_19_B2 - beginning of delta certificate test *****************") logging.info("***************** test_20_B2 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Bad Platform Cert Base and 1 Good delta with 1 bad component unresolved") logging.info("Provisioning with Bad Platform Cert Base and 1 Good delta with 1 bad component unresolved")
# Verify device supply chain appraisal result is FAIL # Verify device supply chain appraisal result is FAIL
@ -950,19 +959,19 @@ class SystemTest(unittest.TestCase):
# Upload the SIDeltaCertB1 and provision # Upload the SIDeltaCertB1 and provision
AcaPortal.upload_pk_cert(SIDeltaCertB1_LOCATION) AcaPortal.upload_pk_cert(SIDeltaCertB1_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_B2_base_delta SHOULD FAIL provisioning using: %s" % (SIDeltaCertB1_LOCATION)) print("test_20_B2_base_delta SHOULD FAIL provisioning using: %s" % (SIDeltaCertB1_LOCATION))
print("test_19_B2_base_delta run output: {0}".format(provisioner_out)) print("test_20_B2_base_delta run output: {0}".format(provisioner_out))
# Provisioning should fail since the delta contains FAULTY component. # Provisioning should fail since the delta contains FAULTY component.
self.assertIn("Provisioning failed", format(provisioner_out)) self.assertIn("Provisioning failed", format(provisioner_out))
@collectors(['BASE_DELTA_BAD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_BAD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_B3_base_delta(self): def test_20_B3_base_delta(self):
"""Test Base/Delta Certificates B3 - Provisioning with Bad Platform Cert Base and 2 Good delta with all component resolved""" """Test Base/Delta Certificates B3 - Provisioning with Bad Platform Cert Base and 2 Good delta with all component resolved"""
logging.info("*****************test_19_B3 - beginning of delta certificate test *****************") logging.info("***************** test_20_B3 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Bad Platform Cert Base and 2 Good delta with all component resolved") logging.info("Provisioning with Bad Platform Cert Base and 2 Good delta with all component resolved")
# Verify device supply chain appraisal result is FAIL # Verify device supply chain appraisal result is FAIL
@ -972,103 +981,14 @@ class SystemTest(unittest.TestCase):
# Upload the VARDeltaCertB1 and provision # Upload the VARDeltaCertB1 and provision
AcaPortal.upload_pk_cert(VARDeltaCertB1_LOCATION) AcaPortal.upload_pk_cert(VARDeltaCertB1_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_B3_base_delta run output: {0}".format(provisioner_out)) print("test_20_B3_base_delta run output: {0}".format(provisioner_out))
# Verify device has been updated with supply chain appraisal of PASS # Verify device has been updated with supply chain appraisal of PASS
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
def make_simple_ima_baseline():
timestamp = get_current_timestamp()
if CLIENT_OS == "centos6":
records = [{"path": "/lib/udev/console_init",
"hash": send_command_sha1sum("sha1sum /lib/udev/console_init")},
{"path": "/bin/mknod",
"hash": send_command_sha1sum("sha1sum /bin/mknod")}]
elif CLIENT_OS == "centos7":
records = [{"path": "/lib/systemd/rhel-readonly",
"hash": send_command_sha1sum("sha1sum /lib/systemd/rhel-readonly")},
{"path": "/bin/sort",
"hash": send_command_sha1sum("sha1sum /bin/sort")}]
elif CLIENT_OS == "ubuntu16":
records = [{"path": "/lib/systemd/systemd-udevd",
"hash": send_command_sha1sum("sha1sum /lib/systemd/systemd-udevd")},
{"path": "/bin/udevadm",
"hash": send_command_sha1sum("sha1sum /bin/udevadm")}]
else:
logging.error("unsupported client os type: %s", CLIENT_OS)
simple_baseline = {"name": "simple_ima_baseline_{0}".format(timestamp),
"description": "a simple hard-coded ima baseline for systems testing",
"records": records}
return simple_baseline
def make_baseline_from_xml(xml_report, appraiser_type):
"""search the xml for records and add each one to a dictionary."""
timestamp = get_current_timestamp()
baseline_name = "full_{0}_baseline_{1}".format(appraiser_type, timestamp)
baseline_description = "{0} baseline created by parsing an xml report and uploaded for systems testing".format(appraiser_type)
baseline = {"name": baseline_name, "description": baseline_description}
baseline["records"] = []
tree = parse_xml_with_stripped_namespaces(xml_report)
if appraiser_type == "TPM":
pcr_tags = get_all_nodes_recursively(tree, "PcrValue")
for pcr_tag in pcr_tags:
tpm_digest = get_all_nodes_recursively(pcr_tag, "digest")[0].text
parsed_record = {}
parsed_record["pcr"] = pcr_tag.attrib['PcrNumber']
parsed_record["hash"] = binascii.hexlify(binascii.a2b_base64(tpm_digest))
baseline["records"].append(parsed_record)
if appraiser_type == "IMA":
ima_records = get_all_nodes_recursively(tree, "imaRecords")
for ima_record in ima_records:
ima_path = get_all_nodes_recursively(ima_record, "path")[0].text
ima_digest = get_all_nodes_recursively(ima_record, "digest")[0].text
parsed_record = {}
parsed_record['path'] = ima_path
hash64 = ima_digest
parsed_record["hash"] = (
binascii.hexlify(binascii.a2b_base64(hash64)))
baseline["records"].append(parsed_record)
logging.info("created {0} baseline from xml with {1} records".format(
appraiser_type, str(len(baseline["records"]))))
return baseline
def make_simple_ima_blacklist_baseline():
return {
"name": "simple_ima_blacklist_baseline_{0}".format(get_current_timestamp()),
"description": "a simple blacklist ima baseline for systems testing",
"records": [{"path": "/boot/usb-storage-foo.ko"}]
#"records": [{"path": "usb-storage-foo.ko"}]
}
def make_simple_ima_blacklist_baseline_with_hash():
return {
"name": "simple_ima_blacklist_baseline_{0}".format(get_current_timestamp()),
"description": "a simple blacklist ima baseline for systems testing",
"records": [{"hash": USB_STORAGE_FILE_HASH}]
}
def make_simple_ima_blacklist_baseline_with_file_and_hash():
return {
"name": "simple_ima_blacklist_baseline_{0}".format(get_current_timestamp()),
"description": "a simple blacklist ima baseline for systems testing",
"records": [{"path": "usb-storage_2.ko",
"hash": USB_STORAGE_FILE_HASH}]
}
def make_simple_ima_blacklist_baseline_with_updated_file_and_hash():
return {
"name": "simple_ima_blacklist_baseline_{0}".format(get_current_timestamp()),
"description": "a simple blacklist ima baseline for systems testing",
"records": [{"path": "test-file",
"hash": USB_STORAGE_FILE_HASH_2}]
}
if __name__ == '__main__': if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromTestCase(SystemTest) suite = unittest.TestLoader().loadTestsFromTestCase(SystemTest)
ret = not unittest.TextTestRunner(verbosity=2).run(suite).wasSuccessful() ret = not unittest.TextTestRunner(verbosity=2).run(suite).wasSuccessful()

View File

@ -20,17 +20,17 @@ import sys
import argparse import argparse
from system_test_core import HIRSPortal, AttestationCAPortal, collectors, \ from system_test_core import HIRSPortal, AttestationCAPortal, collectors, \
send_command, send_command_sha1sum, run_hirs_report, \ send_command, send_command_sha1sum, run_hirs_report, run_hirs_provisioner_tpm_1_2, \
run_hirs_provisioner_tpm2, parse_xml_with_stripped_namespaces, get_current_timestamp, \ run_hirs_provisioner_tpm_2_0, parse_xml_with_stripped_namespaces, get_current_timestamp, \
get_all_nodes_recursively, touch_random_file_and_remove, get_random_pcr_hex_value, \ get_all_nodes_recursively, touch_random_file_and_remove, get_random_pcr_hex_value, \
is_ubuntu_client, is_tpm2, \ is_ubuntu_client, is_tpm_2_0, is_tpm_1_2, \
DEFAULT_IMA_POLICY, DEFAULT_TPM_POLICY DEFAULT_IMA_POLICY, DEFAULT_TPM_POLICY
NUMBER_OF_PCRS = 24 NUMBER_OF_PCRS = 24
suffix = os.environ.get('RANDOM_SYS_TEST_ID') suffix = os.environ.get('RANDOM_SYS_TEST_ID')
if suffix != None: if suffix != None:
print "Configuring with suffix " + suffix print("Configuring with suffix: %s" % suffix)
suffix = "-" + suffix suffix = "-" + suffix
else: else:
suffix = "" suffix = ""
@ -38,13 +38,14 @@ else:
# Change to point to your HIRS directory # Change to point to your HIRS directory
HOME_DIR = "/HIRS/" HOME_DIR = "/HIRS/"
HIRS_ACA_PORTAL_IP="172.17.0.2" HIRS_ACA_PORTAL_IP="172.17.0.2"
TPM_VERSION="2.0"
#TPM_VERSION="1.2"
# Change accordingly # Change accordingly
#COLLECTOR_LIST = None #COLLECTOR_LIST = None
#COLLECTOR_LIST = ["IMA"] #COLLECTOR_LIST = ["IMA"]
#COLLECTOR_LIST = ["TPM"] COLLECTOR_LIST = ["TPM"]
#COLLECTOR_LIST = ["IMA", "TPM"] #COLLECTOR_LIST = ["IMA", "TPM"]
COLLECTOR_LIST = ["BASE_DELTA_GOOD"] #COLLECTOR_LIST = ["BASE_DELTA_GOOD"]
#COLLECTOR_LIST = ["BASE_DELTA_BAD"] #COLLECTOR_LIST = ["BASE_DELTA_BAD"]
FORMAT = "%(asctime)-15s %(message)s" FORMAT = "%(asctime)-15s %(message)s"
@ -65,7 +66,7 @@ CLIENT_HOSTNAME="hirs-client-"+ CLIENT_OS + "-tpm2"
CLIENT=CLIENT_HOSTNAME CLIENT=CLIENT_HOSTNAME
SERVER_OS="$CLIENT_OS" SERVER_OS="$CLIENT_OS"
SERVER_HOSTNAME="hirs-appraiser-$SERVER_OS" SERVER_HOSTNAME="hirs-appraiser-$SERVER_OS"
TPM_VERSION="2.0"
HIRS_ATTESTATION_CA_PORTAL_URL = "https://" + \ HIRS_ATTESTATION_CA_PORTAL_URL = "https://" + \
HIRS_ACA_PORTAL_IP + ":" + \ HIRS_ACA_PORTAL_IP + ":" + \
HIRS_ACA_PORTAL_PORT + \ HIRS_ACA_PORTAL_PORT + \
@ -116,23 +117,29 @@ class SystemTest(unittest.TestCase):
def tearDown(self): def tearDown(self):
"""Tears down the state for testing""" """Tears down the state for testing"""
def test_01_attestation_ca_portal_online(self):
"""Test that the Attestation CA Portal is online and accessible by making a GET request.
If not online, an exception will be raised since the response code is non-200"""
logging.info("***************** Beginning of attestation ca portal online test *****************")
AcaPortal.check_is_online()
@collectors(['IMA', 'TPM'], COLLECTOR_LIST) @collectors(['IMA', 'TPM'], COLLECTOR_LIST)
def test_01_empty_baselines(self): def test_02_empty_baselines(self):
"""Test that appraisal succeeds with empty IMA and TPM baselines""" """Test that appraisal succeeds with empty IMA and TPM baselines"""
logging.info("*****************test_01 - beginning of empty baseline test*****************") logging.info("***************** Beginning of empty baseline test *****************")
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
def test_02_small_ima_appraisal(self): def test_03_small_ima_appraisal(self):
"""Test that appraisal works with a small hard-coded IMA baseline""" """Test that appraisal works with a small hard-coded IMA baseline"""
logging.info("*****************test_02 - beginning of small IMA appraisal test*****************") logging.info("***************** Beginning of small IMA appraisal test *****************")
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
def test_03_large_ima_appraisal(self): def test_04_large_ima_appraisal(self):
"""Test that appraisal works with a full-size IMA baseline""" """Test that appraisal works with a full-size IMA baseline"""
logging.info("*****************test_03 - beginning of large IMA appraisal test*****************") logging.info("***************** Beginning of large IMA appraisal test *****************")
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
def test_04_small_ima_appraisal_required_set_missing(self): def test_05_small_ima_appraisal_required_set_missing(self):
"""Test that appraisal results in an appropriate alert generation when a required set file is missing """Test that appraisal results in an appropriate alert generation when a required set file is missing
steps: steps:
@ -143,10 +150,10 @@ class SystemTest(unittest.TestCase):
- run a report from the client machine using vagrant ssh - run a report from the client machine using vagrant ssh
- make sure it failed and that one appropriate alert was thrown - make sure it failed and that one appropriate alert was thrown
""" """
logging.info("*****************test_04 - beginning of small IMA appraisal test with required set missing*****************") logging.info("***************** Beginning of small IMA appraisal test with required set missing *****************")
@collectors(['TPM', 'IMA'], COLLECTOR_LIST) @collectors(['TPM', 'IMA'], COLLECTOR_LIST)
def test_05_tpm_white_list_appraisal(self): def test_06_tpm_white_list_appraisal(self):
"""Test that appraisal works with a TPM white list baseline """Test that appraisal works with a TPM white list baseline
steps: steps:
@ -157,11 +164,11 @@ class SystemTest(unittest.TestCase):
- set the default device group to point to that policy - set the default device group to point to that policy
- run a report from the client machine - run a report from the client machine
""" """
logging.info("*****************test_05 - beginning of TPM white list appraisal test*****************") logging.info("***************** Beginning of TPM white list appraisal test *****************")
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_06_ima_blacklist_appraisal(self): def test_07_ima_blacklist_appraisal(self):
"""Test that appraisal works with a small IMA blacklist baseline """Test that appraisal works with a small IMA blacklist baseline
steps: steps:
@ -171,11 +178,11 @@ class SystemTest(unittest.TestCase):
- touch a file on the client that is contained in the blacklist - touch a file on the client that is contained in the blacklist
- run another report from the client machine and ensure the appraisal fails - run another report from the client machine and ensure the appraisal fails
""" """
logging.info("*****************test_06 - beginning of blacklist IMA appraisal test*****************") logging.info("***************** Beginning of blacklist IMA appraisal test *****************")
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_07_delta_reports_required_set(self): def test_08_delta_reports_required_set(self):
"""Test that appraisal works with delta reports and required sets. """Test that appraisal works with delta reports and required sets.
steps: steps:
@ -195,11 +202,11 @@ class SystemTest(unittest.TestCase):
so it won't be included in this one. so it won't be included in this one.
- Check that foo-bar-file is in this report, but not foo-file - Check that foo-bar-file is in this report, but not foo-file
""" """
logging.info("*****************test_07 - beginning of Delta Reports required set appraisal test*****************") logging.info("***************** Beginning of Delta Reports required set appraisal test *****************")
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_08_delta_reports_whitelist(self): def test_09_delta_reports_whitelist(self):
"""Test that appraisal works with delta reports. Each report should be """Test that appraisal works with delta reports. Each report should be
appraised individually. Checks that a failed appraisal can be followed appraised individually. Checks that a failed appraisal can be followed
by a successful appraisal if there are no errors in the second delta by a successful appraisal if there are no errors in the second delta
@ -223,11 +230,11 @@ class SystemTest(unittest.TestCase):
report so it won't be included in this one. report so it won't be included in this one.
- Check that foo-file is not in this report - Check that foo-file is not in this report
""" """
logging.info("*****************test_08 - beginning of Delta Reports whitelist appraisal test*****************") logging.info("***************** Beginning of Delta Reports whitelist appraisal test *****************")
@collectors(['IMA', 'TPM'], COLLECTOR_LIST) @collectors(['IMA', 'TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_09_on_demand(self): def test_10_on_demand(self):
"""Test that on-demand (server-initiated) appraisal works. """Test that on-demand (server-initiated) appraisal works.
steps: steps:
@ -241,11 +248,11 @@ class SystemTest(unittest.TestCase):
- check that it has the random filename and hash - check that it has the random filename and hash
- check that it contains a TPM Report - check that it contains a TPM Report
""" """
logging.info("*****************test_09 - beginning of on-demand test*****************") logging.info("***************** Beginning of on-demand test *****************")
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
@unittest.skip("SELinux issues are preventing repo sync from working") @unittest.skip("SELinux issues are preventing repo sync from working")
def test_10_failing_ima_appraisal_broad_repo_baseline(self): def test_11_failing_ima_appraisal_broad_repo_baseline(self):
"""Test that an appraisal not containing expected packages in a broad repo IMA baseline fails. """Test that an appraisal not containing expected packages in a broad repo IMA baseline fails.
steps: steps:
@ -255,12 +262,12 @@ class SystemTest(unittest.TestCase):
- Run a HIRS report and ensure it fails - Run a HIRS report and ensure it fails
- Ensure that at least one of the expected alerts has been generated - Ensure that at least one of the expected alerts has been generated
""" """
logging.info("*****************test_10 - beginning of broad repo failing appraisal test*****************") logging.info("***************** Beginning of broad repo failing appraisal test *****************")
@collectors(['IMA'], COLLECTOR_LIST) @collectors(['IMA'], COLLECTOR_LIST)
@unittest.skip("SELinux issues are preventing repo sync from working") @unittest.skip("SELinux issues are preventing repo sync from working")
@unittest.skipIf(is_ubuntu_client(CLIENT_OS), "Skipping this test due to client OS " + CLIENT_OS) @unittest.skipIf(is_ubuntu_client(CLIENT_OS), "Skipping this test due to client OS " + CLIENT_OS)
def test_11_successful_ima_appraisal_broad_repo_baseline(self): def test_12_successful_ima_appraisal_broad_repo_baseline(self):
"""Test that an appraisal containing expected packages in a broad repo IMA baseline passes. """Test that an appraisal containing expected packages in a broad repo IMA baseline passes.
This test only works on CentOS 6 and 7. This test only works on CentOS 6 and 7.
@ -272,213 +279,221 @@ class SystemTest(unittest.TestCase):
- Run a HIRS report and ensure it passes - Run a HIRS report and ensure it passes
- Ensure that there are no new alerts - Ensure that there are no new alerts
""" """
logging.info("*****************test_11 - beginning of broad repo successful appraisal test*****************") logging.info("***************** Beginning of broad repo successful appraisal test *****************")
@collectors(['TPM'], COLLECTOR_LIST) @collectors(['TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_1_2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_12_attestation_ca_portal_online(self): def test_13_tpm_1_2_initial_provision(self):
"""Test that the Attestation CA Portal is online and accessible by making a GET request. """Test that running the TPM 1.2 hirs provisioner works"""
If not online, an exception will be raised since the response code is non-200""" logging.info("***************** Beginning of initial TPM 1.2 provisioner run *****************")
logging.info("*****************test_12 - beginning of attestation ca portal online test *****************")
AcaPortal.check_is_online() # Run the provisioner to ensure that it provisions successfully
provisioner_out = run_hirs_provisioner_tpm_1_2(CLIENT)
print("Initial TPM 1.2 provisioner run output: {0}".format(provisioner_out))
@collectors(['TPM'], COLLECTOR_LIST) @collectors(['TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_13_tpm2_initial_provision(self): def test_14_tpm_2_0_initial_provision(self):
"""Test that running the tpm2 hirs provisioner works""" """Test that running the TPM 2.0 hirs provisioner works"""
logging.info("*****************test_13 - beginning of initial provisioner run *****************") logging.info("***************** Beginning of initial TPM 2.0 provisioner run *****************")
# Run the provisioner to ensure that it provisions successfully # Run the provisioner to ensure that it provisions successfully
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm2(CLIENT)
print("Initial provisioner run output: {0}".format(provisioner_out)) print("Initial provisioner run output: {0}".format(provisioner_out))
@collectors(['TPM'], COLLECTOR_LIST) @collectors(['TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_14_device_info_report_stored_after_provisioning(self): def test_15_device_info_report_stored_after_provisioning(self):
"""Test that running the hirs provisioner results in storing a device info report for """Test that running the hirs provisioner results in storing a device info report for
the device in the DB""" the device in the DB"""
logging.info("*****************test_14 - beginning of provisioner + device info report test *****************") logging.info("***************** Beginning of device info report test *****************")
logging.info("getting devices from ACA portal")
logging.info("Getting devices from ACA portal...")
aca_portal_devices = AcaPortal.get_devices() aca_portal_devices = AcaPortal.get_devices()
self.assertEqual(aca_portal_devices['recordsTotal'], 1) self.assertEqual(aca_portal_devices['recordsTotal'], 1)
@collectors(['TPM'], COLLECTOR_LIST) @collectors(['TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_15_supply_chain_validation_summary_stored_after_second_provisioning(self): def test_16_supply_chain_validation_summary_stored_after_second_provisioning(self):
"""Test that running the hirs provisioner, a second time, results in storing a supply chain validation """Test that running the hirs provisioner, a second time, results in storing a supply chain validation
record in the database""" record in the database"""
logging.info("*****************test_15 - beginning of provisioner + supply chain validation summary test *****************") logging.info("***************** Beginning of supply chain validation summary test *****************")
if is_tpm2(TPM_VERSION):
logging.info("Using TPM 2.0")
logging.info("Uploading CA cert: " + CA_CERT_LOCATION) logging.info("Uploading CA cert: " + CA_CERT_LOCATION)
AcaPortal.upload_ca_cert(CA_CERT_LOCATION) AcaPortal.upload_ca_cert(CA_CERT_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT)
else:
# Supply chain validation only supported on CentOS 7
if CLIENT_OS == "centos7":
AcaPortal.upload_ca_cert(EK_CA_CERT_LOCATION)
AcaPortal.enable_ec_validation()
provisioner_out = run_hirs_provisioner(CLIENT)
provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("Second provisioner run output: {0}".format(provisioner_out)) print("Second provisioner run output: {0}".format(provisioner_out))
supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries() supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries()
# verify this is one SCVS record indicating PASS # verify this is one SCVS record indicating PASS
self.assertEqual(supply_chain_validation_summaries['recordsTotal'], 2) self.assertEqual(supply_chain_validation_summaries['recordsTotal'], 2)
self.assertEqual(supply_chain_validation_summaries['data'][0]['overallValidationResult'], "PASS") self.assertEqual(supply_chain_validation_summaries['data'][0]['overallValidationResult'], "PASS")
self.assertEqual(supply_chain_validation_summaries['data'][1]['overallValidationResult'], "PASS") self.assertEqual(supply_chain_validation_summaries['data'][1]['overallValidationResult'], "PASS")
# verify device has been updated with supply chain appraisal result # verify device has been updated with supply chain appraisal result
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@collectors(['TPM'], COLLECTOR_LIST) @collectors(['TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_16_ek_info_report(self): def test_17_ek_info_report(self):
"""Test that running the hirs provisioner results in storing EK certs info report for """Test that running the hirs provisioner results in storing EK certs info report for
the device in the DB""" the device in the DB"""
logging.info("*****************test_16 - beginning of provisioner + Endorsement certs info report test *****************") logging.info("***************** Beginning of Endorsement Certs info report test *****************")
logging.info("getting ek certs from ACA portal")
logging.info("Getting EK Certs from ACA portal...")
cert_list = AcaPortal.get_ek_certs() cert_list = AcaPortal.get_ek_certs()
self.assertEqual(cert_list['recordsTotal'], 1) self.assertEqual(cert_list['recordsTotal'], 1)
self.assertEqual(cert_list['data'][0]['credentialType'], "TCPA Trusted Platform Module Endorsement") self.assertEqual(cert_list['data'][0]['credentialType'], "TCPA Trusted Platform Module Endorsement")
@collectors(['TPM'], COLLECTOR_LIST) @collectors(['TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_17_pk_info_report(self): def test_18_pk_info_report(self):
"""Test that running the hirs provisioner results in storing PK certs info report for """Test that running the hirs provisioner results in storing PK certs info report for
the device in the DB""" the device in the DB"""
logging.info("*****************test_17 - beginning of provisioner + Platform certs info report test *****************") logging.info("***************** Beginning Platform Certs info report test *****************")
logging.info("getting pk certs from ACA portal")
logging.info("Getting PK Certs from ACA portal...")
cert_list = AcaPortal.get_pk_certs() cert_list = AcaPortal.get_pk_certs()
self.assertEqual(cert_list['recordsTotal'], 1) self.assertEqual(cert_list['recordsTotal'], 1)
self.assertEqual(cert_list['data'][0]['credentialType'], "TCG Trusted Platform Endorsement") self.assertEqual(cert_list['data'][0]['credentialType'], "TCG Trusted Platform Endorsement")
@collectors(['TPM'], COLLECTOR_LIST) @collectors(['TPM'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_18_trust_chain_info_report(self): def test_19_trust_chain_info_report(self):
"""Test that running the hirs provisioner results in storing trust chains info report for """Test that running the hirs provisioner results in storing trust chains info report for
the device in the DB""" the device in the DB"""
logging.info("*****************test_18 - beginning of provisioner + Trust chains info report test *****************") logging.info("***************** Beginning of Trust Chain info report test *****************")
logging.info("getting trust chains from ACA portal")
logging.info("Getting Trust Chains from ACA portal...")
trust_chain_list = AcaPortal.get_trust_chains() trust_chain_list = AcaPortal.get_trust_chains()
self.assertEqual(trust_chain_list['recordsTotal'], 1) self.assertEqual(trust_chain_list['recordsTotal'], 1)
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A1_base_delta(self): def test_20_A1_base_delta(self):
"""Test Delta Certificates A1 - Provisioning with Good Base Platform Cert Base (via Platform Cert on TPM)""" """Test Delta Certificates A1 - Provisioning with Good Base Platform Cert (via Platform Cert on TPM Emulator)"""
logging.info("*****************test_19_A1 - beginning of delta certificate test *****************") logging.info("***************** test_20_A1 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Good Base Platform Cert (via Platform Cert on TPM Emulator)") logging.info("Provisioning with Good Base Platform Cert (via Platform Cert on TPM Emulator)")
logging.info("Check if ACA is online...") logging.info("Check if ACA is online...")
AcaPortal.check_is_online() AcaPortal.check_is_online()
logging.info("Uploading CA cert: " + CA_CERT_LOCATION) logging.info("Uploading CA Cert: " + CA_CERT_LOCATION)
AcaPortal.upload_ca_cert(CA_CERT_LOCATION) AcaPortal.upload_ca_cert(CA_CERT_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A1_base_delta run output: {0}".format(provisioner_out)) print("test_20_A1_base_delta run output: {0}".format(provisioner_out))
# Verify device supply chain appraisal result is PASS # Verify device supply chain appraisal result is PASS
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A2_base_delta(self): def test_20_A2_base_delta(self):
"""Test Delta Certificates A2 - Attempt to upload Base cert with holder already having a Base Platform Cert associated with it""" """Test Delta Certificates A2 - Attempt to upload Base cert with holder already having a Base Platform Cert associated with it"""
logging.info("*****************test_19_A8 - beginning of delta certificate test *****************") logging.info("***************** test_20_A2 - Beginning of delta certificate test *****************")
logging.info("Attempt to upload PBaseCertA, with PBaseCertA already loaded in the ACA.") logging.info("Attempt to upload PBaseCertB, with PBaseCertA already loaded in the ACA.")
print("test_19_A2_base_delta Platform Cert has already been loaded. Attempting to upload second Platform Cert: %s" % (PBaseCertA_LOCATION)) print("test_20_A2_base_delta. PBaseCertA has already been loaded. Attempting to upload second Platform Cert: %s" % (PBaseCertB_LOCATION))
# Confirm there is a Platform Cert already loaded # Confirm there is one Platform Base Cert already loaded
cert_list = AcaPortal.get_pk_certs() cert_list = AcaPortal.get_pk_certs()
self.assertEqual(cert_list['recordsTotal'], 1) self.assertEqual(cert_list['recordsTotal'], 1)
print("Number of Platform Certs: %d" % (cert_list['recordsTotal']))
self.assertEqual(cert_list['data'][0]['credentialType'], "TCG Trusted Platform Endorsement") self.assertEqual(cert_list['data'][0]['credentialType'], "TCG Trusted Platform Endorsement")
self.assertEqual(cert_list['data'][0]['platformType'], "Base") self.assertEqual(cert_list['data'][0]['platformType'], "Base")
# Try uploading a second Platform Base Cert # Try uploading a second Platform Base Cert
AcaPortal.upload_pk_cert(PBaseCertA_LOCATION) print("Attempting to upload a second Platform Base Cert...")
AcaPortal.upload_pk_cert(PBaseCertB_LOCATION)
# Confirm Platform Base Cert has not been loaded # Confirm Platform Base Cert has not been loaded
cert_list = AcaPortal.get_pk_certs() cert_list = AcaPortal.get_pk_certs()
self.assertEqual(cert_list['recordsTotal'], 1) self.assertEqual(cert_list['recordsTotal'], 1)
print("Number of Platform Certs: %d" % (cert_list['recordsTotal']))
self.assertEqual(cert_list['data'][0]['credentialType'], "TCG Trusted Platform Endorsement") self.assertEqual(cert_list['data'][0]['credentialType'], "TCG Trusted Platform Endorsement")
self.assertEqual(cert_list['data'][0]['platformType'], "Base") self.assertEqual(cert_list['data'][0]['platformType'], "Base")
if (cert_list['recordsTotal'] == 1): if (cert_list['recordsTotal'] == 1):
print ("SUCCESS.") print ("SUCCESS.\n")
else: else:
print ("FAILED.") print ("FAILED.\n")
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A3_base_delta(self): def test_20_A3_base_delta(self):
"""Test Delta Certificates A3 - Provisioning with Good Base Platform Cert Base and 1 Delta Cert""" """Test Delta Certificates A3 - Provisioning with Good Base Platform Cert Base and 1 Delta Cert"""
logging.info("*****************test_19_A3 - beginning of delta certificate test *****************") logging.info("***************** test_20_A3 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Good Base Platform Cert Base and 1 Delta Cert") logging.info("Provisioning with Good Base Platform Cert Base and 1 Delta Cert")
# Verify device supply chain appraisal result is PASS # Verify device supply chain appraisal result is PASS
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
# Upload the SIDelta cert and provision # Upload the SIDeltaCertA1 and provision
AcaPortal.upload_pk_cert(SIDeltaCertA1_LOCATION) AcaPortal.upload_pk_cert(SIDeltaCertA1_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_20_A3_base_delta run output: {0}".format(provisioner_out))
print("test_19_A3_base_delta run output: {0}".format(provisioner_out))
supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries() supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries()
# verify this is one SCVS record indicating PASS # Verify this is one SCVS record indicating PASS
self.assertEqual(supply_chain_validation_summaries['recordsTotal'], 2) self.assertEqual(supply_chain_validation_summaries['recordsTotal'], 2)
self.assertEqual(supply_chain_validation_summaries['data'][0]['overallValidationResult'], "PASS") self.assertEqual(supply_chain_validation_summaries['data'][0]['overallValidationResult'], "PASS")
self.assertEqual(supply_chain_validation_summaries['data'][1]['overallValidationResult'], "PASS") self.assertEqual(supply_chain_validation_summaries['data'][1]['overallValidationResult'], "PASS")
# verify device has been updated with supply chain appraisal result
# Verify device has been updated with supply chain appraisal result
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A4_base_delta(self): def test_20_A4_base_delta(self):
"""Test Delta Certificates A4 - Provisioning with Good Base Platform Cert Base and 2 Delta Certs""" """Test Delta Certificates A4 - Provisioning with Good Base Platform Cert Base and 2 Delta Certs"""
logging.info("*****************test_19_A4 - beginning of delta certificate test *****************") logging.info("***************** test_20_A4 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Good Base Platform Cert Base and 2 Delta Certs") logging.info("Provisioning with Good Base Platform Cert Base and 2 Delta Certs")
# Verify device supply chain appraisal result is PASS # Verify device supply chain appraisal result is PASS
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
# Upload the VARDelta cert and provision # Upload the VARDeltaCertA1 and provision
AcaPortal.upload_pk_cert(VARDeltaCertA1_LOCATION) AcaPortal.upload_pk_cert(VARDeltaCertA1_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A4_base_delta run output: {0}".format(provisioner_out)) print("test_20_A4_base_delta run output: {0}".format(provisioner_out))
supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries() supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries()
# verify this is one SCVS record indicating PASS
# Verify this is one SCVS record indicating PASS
self.assertEqual(supply_chain_validation_summaries['recordsTotal'], 3) self.assertEqual(supply_chain_validation_summaries['recordsTotal'], 3)
self.assertEqual(supply_chain_validation_summaries['data'][0]['overallValidationResult'], "PASS") self.assertEqual(supply_chain_validation_summaries['data'][0]['overallValidationResult'], "PASS")
self.assertEqual(supply_chain_validation_summaries['data'][1]['overallValidationResult'], "PASS") self.assertEqual(supply_chain_validation_summaries['data'][1]['overallValidationResult'], "PASS")
self.assertEqual(supply_chain_validation_summaries['data'][2]['overallValidationResult'], "PASS") self.assertEqual(supply_chain_validation_summaries['data'][2]['overallValidationResult'], "PASS")
# verify device has been updated with supply chain appraisal result
# Verify device has been updated with supply chain appraisal result
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A5_base_delta(self): def test_20_A5_base_delta(self):
"""Test Delta Certificates A5 - Provisioning with Good Base Platform Cert and 1 Bad Delta Cert""" """Test Delta Certificates A5 - Provisioning with Good Base Platform Cert and 1 Bad Delta Cert"""
logging.info("*****************test_19_A5 - beginning of delta certificate test *****************") logging.info("***************** test_20_A5 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Good Base Platform Cert and 1 Bad Delta Cert") logging.info("Provisioning with Good Base Platform Cert and 1 Bad Delta Cert")
# TODO: Determine if we need this test # TODO: Determine if we need this test
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A6_base_delta(self): def test_20_A6_base_delta(self):
"""Test Delta Certificates A6 - Provisioning with Good Base Platform, 2 Good Delta Certs and 1 Bad Delta Cert""" """Test Delta Certificates A6 - Provisioning with Good Base Platform, 2 Good Delta Certs and 1 Bad Delta Cert"""
logging.info("*****************test_19_A6 - beginning of delta certificate test *****************") logging.info("***************** test_20_A6 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Good Base Platform, 2 Good Delta Certs and 1 Bad Delta Cert") logging.info("Provisioning with Good Base Platform, 2 Good Delta Certs and 1 Bad Delta Cert")
# Verify device supply chain appraisal result is PASS # Verify device supply chain appraisal result is PASS
@ -488,41 +503,41 @@ class SystemTest(unittest.TestCase):
# Upload the SIDeltaCertA2 and provision # Upload the SIDeltaCertA2 and provision
AcaPortal.upload_pk_cert(SIDeltaCertA2_LOCATION) AcaPortal.upload_pk_cert(SIDeltaCertA2_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A6_base_delta SHOULD FAIL provisioning using: %s" % (SIDeltaCertA2_LOCATION)) print("test_20_A6_base_delta SHOULD FAIL provisioning using: %s" % (SIDeltaCertA2_LOCATION))
print("test_19_A6_base_delta run output: {0}".format(provisioner_out)) print("test_20_A6_base_delta run output: {0}".format(provisioner_out))
# Provisioning should fail since the Delta contains a bad component. # Provisioning should fail since the Delta contains a bad component.
self.assertIn("Provisioning failed", format(provisioner_out)) self.assertIn("Provisioning failed", format(provisioner_out))
# Upload the SIDeltaCertA2_resolved cert and provision # Upload the SIDeltaCertA2_resolved and provision
AcaPortal.upload_pk_cert(SIDeltaCertA2_resolved_LOCATION) AcaPortal.upload_pk_cert(SIDeltaCertA2_resolved_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A6_base_delta SHOULD PASS provisioning using: %s" % (SIDeltaCertA2_resolved_LOCATION)) print("test_20_A6_base_delta SHOULD PASS provisioning using: %s" % (SIDeltaCertA2_resolved_LOCATION))
print("test_19_A6_base_delta run output: {0}".format(provisioner_out)) print("test_20_A6_base_delta run output: {0}".format(provisioner_out))
# verify device has been updated with supply chain appraisal result # Verify device has been updated with supply chain appraisal result
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A7_base_delta(self): def test_20_A7_base_delta(self):
"""Test Delta Certificates A7 - Provisioning with Good Base Platform, 2 Good Delta Certs and """Test Delta Certificates A7 - Provisioning with Good Base Platform, 2 Good Delta Certs and
1 Bad Delta Cert with non present component""" 1 Bad Delta Cert with non present component"""
logging.info("*****************test_19_A7 - beginning of delta certificate test *****************") logging.info("***************** test_20_A7 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Good Base Platform, 2 Good Delta Certs and 1 Bad Delta Cert with non present component") logging.info("Provisioning with Good Base Platform, 2 Good Delta Certs and 1 Bad Delta Cert with non present component")
# Upload the VARDeltaCertA2 and provision # Upload the VARDeltaCertA2 and provision
AcaPortal.upload_pk_cert(VARDeltaCertA2_LOCATION) AcaPortal.upload_pk_cert(VARDeltaCertA2_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A7_base_delta SHOULD FAIL provisioning using: %s" % (VARDeltaCertA2_LOCATION)) print("test_20_A7_base_delta SHOULD FAIL provisioning using: %s" % (VARDeltaCertA2_LOCATION))
print("test_19_A7_base_delta run output: {0}".format(provisioner_out)) print("test_20_A7_base_delta run output: {0}".format(provisioner_out))
# Provisioning should fail since the Delta contains a component thats not in the Base # Provisioning should fail since the Delta contains a component thats not in the Base
self.assertIn("Provisioning failed", format(provisioner_out)) self.assertIn("Provisioning failed", format(provisioner_out))
@ -530,63 +545,61 @@ class SystemTest(unittest.TestCase):
# Upload the VARDeltaCertA2_resolved and provision # Upload the VARDeltaCertA2_resolved and provision
AcaPortal.upload_pk_cert(VARDeltaCertA2_resolved_LOCATION) AcaPortal.upload_pk_cert(VARDeltaCertA2_resolved_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A7_base_delta SHOULD PASS provisioning using: %s" % (VARDeltaCertA2_resolved_LOCATION)) print("test_20_A7_base_delta SHOULD PASS provisioning using: %s" % (VARDeltaCertA2_resolved_LOCATION))
print("test_19_A7_base_delta run output: {0}".format(provisioner_out)) print("test_20_A7_base_delta run output: {0}".format(provisioner_out))
# verify device has been updated with supply chain appraisal result # Verify device has been updated with supply chain appraisal result
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_GOOD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_A8_base_delta(self): def test_20_A8_base_delta(self):
"""Test Delta Certificates A8 - Provisioning with Good Base Platform, 2 Good Delta Certs with 1 Delta cert """Test Delta Certificates A8 - Provisioning with Good Base Platform, 2 Good Delta Certs with 1 Delta cert
replacing component from previous, using the Delta as a base certificate""" replacing component from previous, using the Delta as a base certificate"""
logging.info("*****************test_19_A8 - beginning of delta certificate test *****************") logging.info("***************** test_20_A8 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Good Base Platform, 2 Good Delta Certs with 1 Delta cert replacing component from previous, using the Delta as a base certificate") logging.info("Provisioning with Good Base Platform, 2 Good Delta Certs with 1 Delta cert replacing component from previous, using the Delta as a base certificate")
# Upload the SIDeltaCertA3 and provision # Upload the SIDeltaCertA3 and provision
AcaPortal.upload_pk_cert(SIDeltaCertA3_LOCATION) AcaPortal.upload_pk_cert(SIDeltaCertA3_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_A8_base_delta run output: {0}".format(provisioner_out)) print("test_20_A8_base_delta run output: {0}".format(provisioner_out))
supply_chain_validation_summaries = AcaPortal.get_supply_chain_validation_summaries()
# Verify device has been updated with supply chain appraisal result # Verify device has been updated with supply chain appraisal result
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS") self.assertEqual(devices['data'][0]['device']['supplyChainStatus'], "PASS")
@collectors(['BASE_DELTA_BAD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_BAD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_B1_base_delta(self): def test_20_B1_base_delta(self):
"""Test Delta Certificates B1 - Provisioning with Bad Platform Cert Base (ACA upload)""" """Test Base/Delta Certificates B1 - Provisioning with Bad Platform Cert Base """
logging.info("*****************test_19_B1 - beginning of delta certificate test *****************") logging.info("***************** test_20_B1 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Bad Platform Cert Base") logging.info("Provisioning with Bad Platform Cert Base")
logging.info("Check if ACA is online...") logging.info("Check if ACA is online...")
AcaPortal.check_is_online() AcaPortal.check_is_online()
if is_tpm2(TPM_VERSION):
logging.info("Using TPM 2.0")
logging.info("Uploading CA cert: " + CA_CERT_LOCATION) logging.info("Uploading CA cert: " + CA_CERT_LOCATION)
AcaPortal.upload_ca_cert(CA_CERT_LOCATION) AcaPortal.upload_ca_cert(CA_CERT_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_B1_base_delta run output: {0}".format(provisioner_out)) print("test_20_B1_base_delta SHOULD FAIL provisioning using: %s" % (PBaseCertB_LOCATION))
print("test_20_B1_base_delta run output: {0}".format(provisioner_out))
# Provisioning should fail since the PC contains FAULTY component. # Provisioning should fail since the PC contains FAULTY components.
self.assertIn("Provisioning failed", format(provisioner_out)) self.assertIn("Provisioning failed", format(provisioner_out))
@collectors(['BASE_DELTA_BAD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_BAD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_B2_base_delta(self): def test_20_B2_base_delta(self):
"""Test Delta Certificates B2 - Provisioning with Bad Platform Cert Base and 1 Good delta with 1 bad component unresolved""" """Test Base/Delta Certificates B2 - Provisioning with Bad Platform Cert Base and 1 Good delta with 1 bad component unresolved"""
logging.info("*****************test_19_B2 - beginning of delta certificate test *****************") logging.info("***************** test_20_B2 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Bad Platform Cert Base and 1 Good delta with 1 bad component unresolved") logging.info("Provisioning with Bad Platform Cert Base and 1 Good delta with 1 bad component unresolved")
logging.info("Uploading Delta Platform Cert: " + SIDeltaCertB1_LOCATION)
# Verify device supply chain appraisal result is FAIL # Verify device supply chain appraisal result is FAIL
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()
@ -595,19 +608,19 @@ class SystemTest(unittest.TestCase):
# Upload the SIDeltaCertB1 and provision # Upload the SIDeltaCertB1 and provision
AcaPortal.upload_pk_cert(SIDeltaCertB1_LOCATION) AcaPortal.upload_pk_cert(SIDeltaCertB1_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_B2_base_delta SHOULD FAIL provisioning using: %s" % (SIDeltaCertB1_LOCATION)) print("test_20_B2_base_delta SHOULD FAIL provisioning using: %s" % (SIDeltaCertB1_LOCATION))
print("test_19_B2_base_delta run output: {0}".format(provisioner_out)) print("test_20_B2_base_delta run output: {0}".format(provisioner_out))
# Provisioning should fail since the delta contains FAULTY component. # Provisioning should fail since the delta contains FAULTY component.
self.assertIn("Provisioning failed", format(provisioner_out)) self.assertIn("Provisioning failed", format(provisioner_out))
@collectors(['BASE_DELTA_BAD'], COLLECTOR_LIST) @collectors(['BASE_DELTA_BAD'], COLLECTOR_LIST)
@unittest.skipIf(not is_tpm2(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION) @unittest.skipIf(not is_tpm_2_0(TPM_VERSION), "Skipping this test due to TPM Version " + TPM_VERSION)
def test_19_B3_base_delta(self): def test_20_B3_base_delta(self):
"""Test Delta Certificates B3 - Provisioning with Bad Platform Cert Base and 2 Good delta with all component resolved""" """Test Base/Delta Certificates B3 - Provisioning with Bad Platform Cert Base and 2 Good delta with all component resolved"""
logging.info("*****************test_19_B3 - beginning of delta certificate test *****************") logging.info("***************** test_20_B3 - Beginning of delta certificate test *****************")
logging.info("Provisioning with Bad Platform Cert Base and 2 Good delta with all component resolved") logging.info("Provisioning with Bad Platform Cert Base and 2 Good delta with all component resolved")
# Verify device supply chain appraisal result is FAIL # Verify device supply chain appraisal result is FAIL
@ -617,9 +630,9 @@ class SystemTest(unittest.TestCase):
# Upload the VARDeltaCertB1 and provision # Upload the VARDeltaCertB1 and provision
AcaPortal.upload_pk_cert(VARDeltaCertB1_LOCATION) AcaPortal.upload_pk_cert(VARDeltaCertB1_LOCATION)
AcaPortal.enable_supply_chain_validations() AcaPortal.enable_supply_chain_validations()
provisioner_out = run_hirs_provisioner_tpm2(CLIENT) provisioner_out = run_hirs_provisioner_tpm_2_0(CLIENT)
print("test_19_B3_base_delta run output: {0}".format(provisioner_out)) print("test_20_B3_base_delta run output: {0}".format(provisioner_out))
# Verify device has been updated with supply chain appraisal of PASS # Verify device has been updated with supply chain appraisal of PASS
devices = AcaPortal.get_devices() devices = AcaPortal.get_devices()

View File

@ -290,8 +290,8 @@ class AttestationCAPortal:
def disable_supply_chain_validations(self): def disable_supply_chain_validations(self):
# the initial POST request goes through, but the redirect from the server is attempted which results in a 404, # The initial POST request goes through, but the redirect from the server is attempted
# or possibly a 200 on centos7, apparently. # which results in a 404, or possibly a 200 on centos7, apparently.
self.request("post", "portal/policy/update-ec-validation", self.request("post", "portal/policy/update-ec-validation",
expected_status_codes=[404, 200], params={'ecValidate': "unchecked",}) expected_status_codes=[404, 200], params={'ecValidate': "unchecked",})
self.request("post", "portal/policy/update-pc-validation", self.request("post", "portal/policy/update-pc-validation",
@ -301,8 +301,8 @@ class AttestationCAPortal:
def enable_supply_chain_validations(self): def enable_supply_chain_validations(self):
# the initial POST request goes through, but the redirect from the server is attempted which results in a 404, # The initial POST request goes through, but the redirect from the server is attempted
# or possibly a 200 on centos7, apparently. # which results in a 404, or possibly a 200 on centos7, apparently.
self.request("post", "portal/policy/update-ec-validation", self.request("post", "portal/policy/update-ec-validation",
expected_status_codes=[404, 200], params={'ecValidate': "checked",}) expected_status_codes=[404, 200], params={'ecValidate': "checked",})
self.request("post", "portal/policy/update-pc-validation", self.request("post", "portal/policy/update-pc-validation",
@ -422,10 +422,16 @@ def run_hirs_report_and_clear_cache(client_hostname):
CACHED_XML_REPORT = None CACHED_XML_REPORT = None
return client_out return client_out
def run_hirs_provisioner_tpm2(client_hostname): def run_hirs_provisioner_tpm_1_2(client_hostname):
"""Runs the hirs provisioner TPM2 """Runs the hirs provisioner TPM 1.2"""
logging.info("running hirs provisioner TPM 1.2 on {0}".format(client_hostname))
client_out = send_command("hirs-provisioner provision")
return client_out
def run_hirs_provisioner_tpm_2_0(client_hostname):
"""Runs the hirs provisioner TPM 2.0
""" """
logging.info("running hirs provisioner tpm2 on {0}".format(client_hostname)) logging.info("running hirs provisioner TPM 2.0 on {0}".format(client_hostname))
client_out = send_command("hirs-provisioner-tpm2 provision") client_out = send_command("hirs-provisioner-tpm2 provision")
return client_out return client_out
@ -448,7 +454,6 @@ def parse_xml_with_stripped_namespaces(raw_xml_string):
def get_all_nodes_recursively(tree_node, node_name): def get_all_nodes_recursively(tree_node, node_name):
return tree_node.findall('.//' + node_name) return tree_node.findall('.//' + node_name)
def touch_random_file_and_remove(client_hostname): def touch_random_file_and_remove(client_hostname):
"""Write a random string to a random filename in /tmp/, read it as root, then delete it. """Write a random string to a random filename in /tmp/, read it as root, then delete it.
""" """
@ -461,11 +466,99 @@ def touch_random_file_and_remove(client_hostname):
rm_command = "rm {}".format(filename) rm_command = "rm {}".format(filename)
combined_command = "{};{};{};{}".format(echo_command, cat_command, sha_command, rm_command) combined_command = "{};{};{};{}".format(echo_command, cat_command, sha_command, rm_command)
# command_output = send_vagrant_command(combined_command, client_hostname)
sha_hash = command_output.split()[1] sha_hash = command_output.split()[1]
return (filename, sha_hash) return (filename, sha_hash)
def make_simple_ima_baseline():
timestamp = get_current_timestamp()
if CLIENT_OS == "centos6":
records = [{"path": "/lib/udev/console_init",
"hash": send_command_sha1sum("sha1sum /lib/udev/console_init")},
{"path": "/bin/mknod",
"hash": send_command_sha1sum("sha1sum /bin/mknod")}]
elif CLIENT_OS == "centos7":
records = [{"path": "/lib/systemd/rhel-readonly",
"hash": send_command_sha1sum("sha1sum /lib/systemd/rhel-readonly")},
{"path": "/bin/sort",
"hash": send_command_sha1sum("sha1sum /bin/sort")}]
elif CLIENT_OS == "ubuntu16":
records = [{"path": "/lib/systemd/systemd-udevd",
"hash": send_command_sha1sum("sha1sum /lib/systemd/systemd-udevd")},
{"path": "/bin/udevadm",
"hash": send_command_sha1sum("sha1sum /bin/udevadm")}]
else:
logging.error("unsupported client os type: %s", CLIENT_OS)
simple_baseline = {"name": "simple_ima_baseline_{0}".format(timestamp),
"description": "a simple hard-coded ima baseline for systems testing",
"records": records}
return simple_baseline
def make_baseline_from_xml(xml_report, appraiser_type):
"""search the xml for records and add each one to a dictionary."""
timestamp = get_current_timestamp()
baseline_name = "full_{0}_baseline_{1}".format(appraiser_type, timestamp)
baseline_description = "{0} baseline created by parsing an xml report and uploaded for systems testing".format(appraiser_type)
baseline = {"name": baseline_name, "description": baseline_description}
baseline["records"] = []
tree = parse_xml_with_stripped_namespaces(xml_report)
if appraiser_type == "TPM":
pcr_tags = get_all_nodes_recursively(tree, "PcrValue")
for pcr_tag in pcr_tags:
tpm_digest = get_all_nodes_recursively(pcr_tag, "digest")[0].text
parsed_record = {}
parsed_record["pcr"] = pcr_tag.attrib['PcrNumber']
parsed_record["hash"] = binascii.hexlify(binascii.a2b_base64(tpm_digest))
baseline["records"].append(parsed_record)
if appraiser_type == "IMA":
ima_records = get_all_nodes_recursively(tree, "imaRecords")
for ima_record in ima_records:
ima_path = get_all_nodes_recursively(ima_record, "path")[0].text
ima_digest = get_all_nodes_recursively(ima_record, "digest")[0].text
parsed_record = {}
parsed_record['path'] = ima_path
hash64 = ima_digest
parsed_record["hash"] = (
binascii.hexlify(binascii.a2b_base64(hash64)))
baseline["records"].append(parsed_record)
logging.info("created {0} baseline from xml with {1} records".format(
appraiser_type, str(len(baseline["records"]))))
return baseline
def make_simple_ima_blacklist_baseline():
return {
"name": "simple_ima_blacklist_baseline_{0}".format(get_current_timestamp()),
"description": "a simple blacklist ima baseline for systems testing",
"records": [{"path": "/boot/usb-storage-foo.ko"}]
#"records": [{"path": "usb-storage-foo.ko"}]
}
def make_simple_ima_blacklist_baseline_with_hash():
return {
"name": "simple_ima_blacklist_baseline_{0}".format(get_current_timestamp()),
"description": "a simple blacklist ima baseline for systems testing",
"records": [{"hash": USB_STORAGE_FILE_HASH}]
}
def make_simple_ima_blacklist_baseline_with_file_and_hash():
return {
"name": "simple_ima_blacklist_baseline_{0}".format(get_current_timestamp()),
"description": "a simple blacklist ima baseline for systems testing",
"records": [{"path": "usb-storage_2.ko",
"hash": USB_STORAGE_FILE_HASH}]
}
def make_simple_ima_blacklist_baseline_with_updated_file_and_hash():
return {
"name": "simple_ima_blacklist_baseline_{0}".format(get_current_timestamp()),
"description": "a simple blacklist ima baseline for systems testing",
"records": [{"path": "test-file",
"hash": USB_STORAGE_FILE_HASH_2}]
}
def get_random_pcr_hex_value(): def get_random_pcr_hex_value():
""" Gets a random TPM PCR value by combining 2 UUIDs and getting a substring """ Gets a random TPM PCR value by combining 2 UUIDs and getting a substring
""" """
@ -479,5 +572,8 @@ def get_current_timestamp():
def is_ubuntu_client(client_os): def is_ubuntu_client(client_os):
return client_os in ["ubuntu14", "ubuntu16"] return client_os in ["ubuntu14", "ubuntu16"]
def is_tpm2(tpm_version): def is_tpm_1_2(tpm_version):
return tpm_version in ["1.2"]
def is_tpm_2_0(tpm_version):
return tpm_version in ["2.0", "2"] return tpm_version in ["2.0", "2"]

View File

@ -0,0 +1,14 @@
#!/bin/bash
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
export CLIENT_OS=centos7
export CLIENT_HOSTNAME=hirs-client-$CLIENT_OS-tpm1_2
export SERVER_OS=$CLIENT_OS
export SERVER_HOSTNAME=hirs-appraiser-$SERVER_OS
export ENABLED_COLLECTORS=TPM
export TPM_VERSION=1.2
$SCRIPT_DIR/systems-test.core.sh

View File

@ -20,9 +20,9 @@ SYSTEM_TEST_EXIT_CODE=$PIPESTATUS
# Check result # Check result
if [[ $SYSTEM_TEST_EXIT_CODE == 0 ]] if [[ $SYSTEM_TEST_EXIT_CODE == 0 ]]
then then
echo "SUCCESS: System tests passed" echo "SUCCESS: System tests TPM $TPM_VERSION passed"
exit 0 exit 0
fi fi
echo "ERROR: System tests failed" echo "ERROR: System tests TPM $TPM_VERSION failed"
exit 1 exit 1

View File

@ -43,16 +43,16 @@ jobs:
- stage: Packaging and System Tests - stage: Packaging and System Tests
script: .ci/system-tests/./run-system-tests.sh script: .ci/system-tests/./run-system-tests.sh
env: null env: null
name: "System Tests" name: "System Tests TPM 1.2"
- stage: Packaging and System Tests - stage: Packaging and System Tests
script: .ci/system-tests/./run-system-tests-tpm2.sh script: .ci/system-tests/./run-system-tests-tpm2.sh
env: null env: null
name: "System Tests TPM2" name: "System Tests TPM 2.0"
- stage: Packaging and System Tests - stage: Packaging and System Tests
script: .ci/system-tests/./run-system-tests-tpm2-base-delta-bad.sh script: .ci/system-tests/./run-system-tests-tpm2-base-delta-bad.sh
env: null env: null
name: "System Tests TPM2 Base/Delta Bad" name: "System Tests TPM 2.0 Base/Delta(Bad)"
- stage: Packaging and System Tests - stage: Packaging and System Tests
script: .ci/system-tests/./run-system-tests-tpm2-base-delta-good.sh script: .ci/system-tests/./run-system-tests-tpm2-base-delta-good.sh
env: null env: null
name: "System Tests TPM2 Base/Delta Good" name: "System Tests TPM 2.0 Base/Delta(Good)"

View File

@ -1,13 +1,12 @@
package hirs.utils; package hirs.utils;
import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URL;
/** /**
* Utility class to get the current version from the VERSION file. * Utility class to get the current version from the VERSION file.
@ -59,16 +58,7 @@ public final class VersionHelper {
* @throws IOException * @throws IOException
*/ */
private static String getFileContents(final String filename) throws IOException { private static String getFileContents(final String filename) throws IOException {
URL url = Resources.getResource(filename);
File versionFileLink = new File(VersionHelper.class.getClassLoader() return Resources.toString(url, Charsets.UTF_8).trim();
.getResource(filename).getFile());
String versionFilePath = versionFileLink.getCanonicalPath();
BufferedReader reader = new BufferedReader(
new InputStreamReader(
new FileInputStream(versionFilePath), "UTF-8"));
String version = reader.readLine();
reader.close();
return version;
} }
} }

View File

@ -14,8 +14,8 @@ public class VersionHelperTest {
@Test @Test
public void testGetVersionFail() { public void testGetVersionFail() {
String version = VersionHelper.getVersion("somefile"); String actual = VersionHelper.getVersion("somefile");
Assert.assertEquals(version, ""); Assert.assertTrue(actual.startsWith(""));
} }
/** /**