diff --git a/.ci/dev/gkeStorageClass.yml b/.ci/dev/gkeStorageClass.yml new file mode 100644 index 0000000000..f1a09eadb2 --- /dev/null +++ b/.ci/dev/gkeStorageClass.yml @@ -0,0 +1,8 @@ +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: testing-storage +provisioner: kubernetes.io/gce-pd +parameters: + type: pd-standard + replication-type: none \ No newline at end of file diff --git a/.ci/dev/regression/Jenkinsfile b/.ci/dev/regression/Jenkinsfile index a880c0611f..fbc61e78ac 100644 --- a/.ci/dev/regression/Jenkinsfile +++ b/.ci/dev/regression/Jenkinsfile @@ -4,7 +4,7 @@ import static com.r3.build.BuildControl.killAllExistingBuildsForJob killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger()) pipeline { - agent { label 'k8s' } + agent { label 'gke' } options { timestamps() buildDiscarder(logRotator(daysToKeepStr: '7', artifactDaysToKeepStr: '7')) @@ -26,7 +26,7 @@ pipeline { "-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " + "-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " + "-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" + - " clean pushBuildImage preAllocateForParallelRegressionTest --stacktrace" + " clean pushBuildImage --stacktrace" } sh "kubectl auth can-i get pods" } @@ -42,7 +42,7 @@ pipeline { "-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " + "-Dgit.branch=\"\${GIT_BRANCH}\" " + "-Dgit.target.branch=\"\${GIT_BRANCH}\" " + - " deAllocateForParallelRegressionTest parallelRegressionTest --stacktrace" + " parallelRegressionTest --stacktrace" } } } diff --git a/Jenkinsfile b/Jenkinsfile index fa9e23f174..cdfb41663a 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -23,7 +23,7 @@ pipeline { "-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " + "-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " + "-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" + - " clean pushBuildImage preAllocateForAllParallelIntegrationTest --stacktrace" + " clean pushBuildImage preAllocateForAllParallelUnitAndIntegrationTest --stacktrace" } sh "kubectl auth can-i get pods" } @@ -31,7 +31,7 @@ pipeline { stage('Corda Pull Request - Run Tests') { parallel { - stage('Integration Tests') { + stage('Integration and Unit Tests') { steps { sh "./gradlew " + "-DbuildId=\"\${BUILD_ID}\" " + @@ -41,21 +41,10 @@ pipeline { "-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " + "-Dgit.branch=\"\${GIT_BRANCH}\" " + "-Dgit.target.branch=\"\${CHANGE_TARGET}\" " + - " deAllocateForAllParallelIntegrationTest allParallelIntegrationTest --stacktrace" + " deAllocateForAllParallelUnitAndIntegrationTest allParallelUnitAndIntegrationTest --stacktrace" } } -// stage('Unit Tests') { -// steps { -// sh "./gradlew " + -// "-DbuildId=\"\${BUILD_ID}\" " + -// "-Dkubenetize=true " + -// "-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\"" + -// " deAllocateForAllParallelUnitTest allParallelUnitTest --stacktrace" -// } -// } - } - } } diff --git a/build.gradle b/build.gradle index f57ef9270f..ff88d585dd 100644 --- a/build.gradle +++ b/build.gradle @@ -627,15 +627,15 @@ task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) { streamOutput false coresPerFork 6 memoryInGbPerFork 10 - distribute DistributeTestsBy.CLASS + distribute DistributeTestsBy.METHOD } task parallelRegressionTest(type: ParallelTestGroup) { testGroups "test", "integrationTest", "slowIntegrationTest", "smokeTest" - numberOfShards 5 + numberOfShards 6 streamOutput false coresPerFork 6 memoryInGbPerFork 10 - distribute DistributeTestsBy.CLASS + distribute DistributeTestsBy.METHOD } task allParallelSmokeTest(type: ParallelTestGroup) { testGroups "slowIntegrationTest", "smokeTest" diff --git a/buildSrc/src/main/groovy/net/corda/testing/BucketingAllocator.java b/buildSrc/src/main/groovy/net/corda/testing/BucketingAllocator.java index 95fc3f0da3..14c2ea1c07 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/BucketingAllocator.java +++ b/buildSrc/src/main/groovy/net/corda/testing/BucketingAllocator.java @@ -20,12 +20,17 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static net.corda.testing.ListTests.DISTRIBUTION_PROPERTY; + public class BucketingAllocator { private static final Logger LOG = LoggerFactory.getLogger(BucketingAllocator.class); private final List forkContainers; private final Supplier timedTestsProvider; private List> sources = new ArrayList<>(); + private DistributeTestsBy distribution = System.getProperty(DISTRIBUTION_PROPERTY) != null && !System.getProperty(DISTRIBUTION_PROPERTY).isEmpty() ? + DistributeTestsBy.valueOf(System.getProperty(DISTRIBUTION_PROPERTY)) : DistributeTestsBy.METHOD; + public BucketingAllocator(Integer forkCount, Supplier timedTestsProvider) { this.forkContainers = IntStream.range(0, forkCount).mapToObj(TestsForForkContainer::new).collect(Collectors.toList()); @@ -104,7 +109,17 @@ public class BucketingAllocator { // If the gradle task is distributing by class rather than method, then 'testName' will be the className // and not className.testName // No matter which it is, we return the mean test duration as the duration value if not found. - final List> matchingTests = tests.startsWith(testName); + final List> matchingTests; + switch (distribution) { + case METHOD: + matchingTests = tests.equals(testName); + break; + case CLASS: + matchingTests = tests.startsWith(testName); + break; + default: + throw new IllegalArgumentException("Unknown distribution type: " + distribution); + } return new TestBucket(task, testName, matchingTests); }).sorted(Comparator.comparing(TestBucket::getDuration).reversed()).collect(Collectors.toList()); diff --git a/buildSrc/src/main/groovy/net/corda/testing/KubesTest.java b/buildSrc/src/main/groovy/net/corda/testing/KubesTest.java index d47979c0c5..984d16f175 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/KubesTest.java +++ b/buildSrc/src/main/groovy/net/corda/testing/KubesTest.java @@ -32,7 +32,10 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.PipedInputStream; import java.io.PipedOutputStream; +import java.io.RandomAccessFile; import java.math.BigInteger; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; @@ -139,7 +142,26 @@ public class KubesTest extends DefaultTask { } @NotNull - private KubernetesClient getKubernetesClient() { + private synchronized KubernetesClient getKubernetesClient() { + + try (RandomAccessFile file = new RandomAccessFile("/tmp/refresh.lock", "rw"); + FileChannel c = file.getChannel(); + FileLock lock = c.lock()) { + + getProject().getLogger().quiet("Invoking kubectl to attempt to refresh token"); + ProcessBuilder tokenRefreshCommand = new ProcessBuilder().command("kubectl", "auth", "can-i", "get", "pods"); + Process refreshProcess = tokenRefreshCommand.start(); + int resultCodeOfRefresh = refreshProcess.waitFor(); + getProject().getLogger().quiet("Completed Token refresh"); + + if (resultCodeOfRefresh != 0) { + throw new RuntimeException("Failed to invoke kubectl to refresh tokens"); + } + + } catch (InterruptedException | IOException e) { + throw new RuntimeException(e); + } + io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder() .withConnectionTimeout(DEFAULT_K8S_TIMEOUT_VALUE_MILLIES) .withRequestTimeout(DEFAULT_K8S_TIMEOUT_VALUE_MILLIES) diff --git a/buildSrc/src/main/groovy/net/corda/testing/ListTests.java b/buildSrc/src/main/groovy/net/corda/testing/ListTests.java index a2ee34eda0..7c7e2804c4 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/ListTests.java +++ b/buildSrc/src/main/groovy/net/corda/testing/ListTests.java @@ -6,6 +6,7 @@ import io.github.classgraph.ClassInfoList; import org.gradle.api.DefaultTask; import org.gradle.api.file.FileCollection; import org.gradle.api.tasks.TaskAction; +import org.jetbrains.annotations.NotNull; import java.math.BigInteger; import java.util.ArrayList; @@ -13,6 +14,7 @@ import java.util.Collection; import java.util.List; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; interface TestLister { List getAllTestsDiscovered(); @@ -47,23 +49,7 @@ public class ListTests extends DefaultTask implements TestLister { Collection results; switch (distribution) { case METHOD: - results = new ClassGraph() - .enableClassInfo() - .enableMethodInfo() - .ignoreClassVisibility() - .ignoreMethodVisibility() - .enableAnnotationInfo() - .overrideClasspath(scanClassPath) - .scan() - .getClassesWithMethodAnnotation("org.junit.Test") - .stream() - .map(classInfo -> { - ClassInfoList returnList = new ClassInfoList(); - returnList.add(classInfo); - returnList.addAll(classInfo.getSubclasses()); - return returnList; - }) - .flatMap(ClassInfoList::stream) + results = getClassGraphStreamOfTestClasses() .map(classInfo -> classInfo.getMethodInfo().filter(methodInfo -> methodInfo.hasAnnotation("org.junit.Test")) .stream().map(methodInfo -> classInfo.getName() + "." + methodInfo.getName())) .flatMap(Function.identity()) @@ -72,28 +58,32 @@ public class ListTests extends DefaultTask implements TestLister { this.allTests = results.stream().sorted().collect(Collectors.toList()); break; case CLASS: - results = new ClassGraph() - .enableClassInfo() - .enableMethodInfo() - .ignoreClassVisibility() - .ignoreMethodVisibility() - .enableAnnotationInfo() - .overrideClasspath(scanClassPath) - .scan() - .getClassesWithMethodAnnotation("org.junit.Test") - .stream() - .map(classInfo -> { - ClassInfoList returnList = new ClassInfoList(); - returnList.add(classInfo); - returnList.addAll(classInfo.getSubclasses()); - return returnList; - }) - .flatMap(ClassInfoList::stream) + results = getClassGraphStreamOfTestClasses() .map(ClassInfo::getName) .collect(Collectors.toSet()); this.allTests = results.stream().sorted().collect(Collectors.toList()); break; } - getProject().getLogger().lifecycle("THESE ARE ALL THE TESTSSS!!!!!!!!: " + allTests.toString()); + } + + @NotNull + private Stream getClassGraphStreamOfTestClasses() { + return new ClassGraph() + .enableClassInfo() + .enableMethodInfo() + .ignoreClassVisibility() + .ignoreMethodVisibility() + .enableAnnotationInfo() + .overrideClasspath(scanClassPath) + .scan() + .getClassesWithMethodAnnotation("org.junit.Test") + .stream() + .map(classInfo -> { + ClassInfoList returnList = new ClassInfoList(); + returnList.add(classInfo); + returnList.addAll(classInfo.getSubclasses()); + return returnList; + }) + .flatMap(ClassInfoList::stream); } } \ No newline at end of file diff --git a/buildSrc/src/main/groovy/net/corda/testing/Tests.java b/buildSrc/src/main/groovy/net/corda/testing/Tests.java index 5c11a7cc22..26e01e1db0 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/Tests.java +++ b/buildSrc/src/main/groovy/net/corda/testing/Tests.java @@ -170,6 +170,20 @@ public class Tests { return results; } + @NotNull + List> equals(@NotNull final String testPrefix) { + List> results = this.tests.keySet().stream() + .filter(t -> t.equals(testPrefix)) + .map(t -> new Tuple2<>(t, getDuration(t))) + .collect(Collectors.toList()); + // We don't know if the testPrefix is a classname or classname.methodname (exact match). + if (results == null || results.isEmpty()) { + LOG.warn("In {} previously executed tests, could not find any starting with {}", tests.size(), testPrefix); + results = Arrays.asList(new Tuple2<>(testPrefix, getMeanDurationForTests())); + } + return results; + } + /** * How many times has this function been run? Every call to addDuration increments the current value. * diff --git a/docs/source/api-flows.rst b/docs/source/api-flows.rst index 3732ee8cc1..2438352ca6 100644 --- a/docs/source/api-flows.rst +++ b/docs/source/api-flows.rst @@ -611,9 +611,17 @@ flow to receive the transaction: :dedent: 12 ``idOfTxWeSigned`` is an optional parameter used to confirm that we got the right transaction. It comes from using ``SignTransactionFlow`` -which is described below. +which is described in the error handling behaviour section. -**Error handling behaviour** +Finalizing transactions with only one participant +................................................. + +In some cases, transactions will only have one participant, the initiator. In these instances, there are no other +parties to send the transactions to during ``FinalityFlow``. In these cases the ``counterpartySession`` list must exist, +but be empty. + +Error handling behaviour +........................ Once a transaction has been notarised and its input states consumed by the flow initiator (eg. sender), should the participant(s) receiving the transaction fail to verify it, or the receiving flow (the finality handler) fails due to some other error, we then have a scenario where not diff --git a/docs/source/flow-state-machines.rst b/docs/source/flow-state-machines.rst index bd2fd638d7..46fbf1eb28 100644 --- a/docs/source/flow-state-machines.rst +++ b/docs/source/flow-state-machines.rst @@ -337,6 +337,13 @@ transaction that uses them. This flow returns a list of ``LedgerTransaction`` ob we don't download a transaction from the peer, they know we must have already seen it before. Fixing this privacy leak will come later. +Finalizing transactions with only one participant +................................................. + +In some cases, transactions will only have one participant, the initiator. In these instances, there are no other +parties to send the transactions to during ``FinalityFlow``. In these cases the ``counterpartySession`` list must exist, +but be empty. + CollectSignaturesFlow/SignTransactionFlow ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ We also invoke two other subflows: diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt index 7ce883c37b..b1f22d42e2 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt @@ -1061,7 +1061,7 @@ class StatemachineErrorHandlingTest { * that 3 retries are attempted before recovering. */ @Test - fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully - responding flow`() { + fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() { startDriver { val charlie = createBytemanNode(CHARLIE_NAME) val alice = createNode(ALICE_NAME) @@ -1156,7 +1156,7 @@ class StatemachineErrorHandlingTest { * the responding flow to recover and finish its flow. */ @Test - fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists - responding flow`() { + fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists`() { startDriver { val charlie = createBytemanNode(CHARLIE_NAME) val alice = createNode(ALICE_NAME) @@ -1244,7 +1244,7 @@ class StatemachineErrorHandlingTest { * succeeds and the flow finishes. */ @Test - fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully - responding flow`() { + fun `responding flow - error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() { startDriver { val charlie = createBytemanNode(CHARLIE_NAME) val alice = createNode(ALICE_NAME) @@ -1340,7 +1340,7 @@ class StatemachineErrorHandlingTest { * send to the responding node and the responding node successfully received it. */ @Test - fun `error recording a transaction inside of ReceiveFinalityFlow will keep the flow in for observation`() { + fun `error recording a transaction inside of ReceiveFinalityFlow will keep the flow in for observation` () { startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) { val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS) val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)