mirror of
synced 2025-03-21 11:35:57 +00:00
Merge branch 'release/os/4.3' of https://github.com/corda/corda into EdP/CORDA-3446-4.4
This commit is contained in:
Normal file
Normal file
@ -0,0 +1,8 @@
apiVersion: storage.k8s.io/v1
kind: StorageClass
name: testing-storage
provisioner: kubernetes.io/gce-pd
type: pd-standard
replication-type: none
@ -4,7 +4,7 @@ import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
pipeline {
agent { label 'k8s' }
agent { label 'gke' }
options {
buildDiscarder(logRotator(daysToKeepStr: '7', artifactDaysToKeepStr: '7'))
@ -26,7 +26,7 @@ pipeline {
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " +
"-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage preAllocateForParallelRegressionTest --stacktrace"
" clean pushBuildImage --stacktrace"
sh "kubectl auth can-i get pods"
@ -42,7 +42,7 @@ pipeline {
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
" deAllocateForParallelRegressionTest parallelRegressionTest --stacktrace"
" parallelRegressionTest --stacktrace"
@ -23,7 +23,7 @@ pipeline {
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " +
"-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage preAllocateForAllParallelIntegrationTest --stacktrace"
" clean pushBuildImage preAllocateForAllParallelUnitAndIntegrationTest --stacktrace"
sh "kubectl auth can-i get pods"
@ -31,7 +31,7 @@ pipeline {
stage('Corda Pull Request - Run Tests') {
parallel {
stage('Integration Tests') {
stage('Integration and Unit Tests') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
@ -41,21 +41,10 @@ pipeline {
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${CHANGE_TARGET}\" " +
" deAllocateForAllParallelIntegrationTest allParallelIntegrationTest --stacktrace"
" deAllocateForAllParallelUnitAndIntegrationTest allParallelUnitAndIntegrationTest --stacktrace"
// stage('Unit Tests') {
// steps {
// sh "./gradlew " +
// "-DbuildId=\"\${BUILD_ID}\" " +
// "-Dkubenetize=true " +
// "-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\"" +
// " deAllocateForAllParallelUnitTest allParallelUnitTest --stacktrace"
// }
// }
@ -627,15 +627,15 @@ task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute DistributeTestsBy.CLASS
distribute DistributeTestsBy.METHOD
task parallelRegressionTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest", "slowIntegrationTest", "smokeTest"
numberOfShards 5
numberOfShards 6
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute DistributeTestsBy.CLASS
distribute DistributeTestsBy.METHOD
task allParallelSmokeTest(type: ParallelTestGroup) {
testGroups "slowIntegrationTest", "smokeTest"
@ -20,12 +20,17 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static net.corda.testing.ListTests.DISTRIBUTION_PROPERTY;
public class BucketingAllocator {
private static final Logger LOG = LoggerFactory.getLogger(BucketingAllocator.class);
private final List<TestsForForkContainer> forkContainers;
private final Supplier<Tests> timedTestsProvider;
private List<Tuple2<TestLister, Object>> sources = new ArrayList<>();
private DistributeTestsBy distribution = System.getProperty(DISTRIBUTION_PROPERTY) != null && !System.getProperty(DISTRIBUTION_PROPERTY).isEmpty() ?
DistributeTestsBy.valueOf(System.getProperty(DISTRIBUTION_PROPERTY)) : DistributeTestsBy.METHOD;
public BucketingAllocator(Integer forkCount, Supplier<Tests> timedTestsProvider) {
this.forkContainers = IntStream.range(0, forkCount).mapToObj(TestsForForkContainer::new).collect(Collectors.toList());
@ -104,7 +109,17 @@ public class BucketingAllocator {
// If the gradle task is distributing by class rather than method, then 'testName' will be the className
// and not className.testName
// No matter which it is, we return the mean test duration as the duration value if not found.
final List<Tuple2<String, Long>> matchingTests = tests.startsWith(testName);
final List<Tuple2<String, Long>> matchingTests;
switch (distribution) {
case METHOD:
matchingTests = tests.equals(testName);
case CLASS:
matchingTests = tests.startsWith(testName);
throw new IllegalArgumentException("Unknown distribution type: " + distribution);
return new TestBucket(task, testName, matchingTests);
@ -32,7 +32,10 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.RandomAccessFile;
import java.math.BigInteger;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
@ -139,7 +142,26 @@ public class KubesTest extends DefaultTask {
private KubernetesClient getKubernetesClient() {
private synchronized KubernetesClient getKubernetesClient() {
try (RandomAccessFile file = new RandomAccessFile("/tmp/refresh.lock", "rw");
FileChannel c = file.getChannel();
FileLock lock = c.lock()) {
getProject().getLogger().quiet("Invoking kubectl to attempt to refresh token");
ProcessBuilder tokenRefreshCommand = new ProcessBuilder().command("kubectl", "auth", "can-i", "get", "pods");
Process refreshProcess = tokenRefreshCommand.start();
int resultCodeOfRefresh = refreshProcess.waitFor();
getProject().getLogger().quiet("Completed Token refresh");
if (resultCodeOfRefresh != 0) {
throw new RuntimeException("Failed to invoke kubectl to refresh tokens");
} catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder()
@ -6,6 +6,7 @@ import io.github.classgraph.ClassInfoList;
import org.gradle.api.DefaultTask;
import org.gradle.api.file.FileCollection;
import org.gradle.api.tasks.TaskAction;
import org.jetbrains.annotations.NotNull;
import java.math.BigInteger;
import java.util.ArrayList;
@ -13,6 +14,7 @@ import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
interface TestLister {
List<String> getAllTestsDiscovered();
@ -47,23 +49,7 @@ public class ListTests extends DefaultTask implements TestLister {
Collection<String> results;
switch (distribution) {
case METHOD:
results = new ClassGraph()
.map(classInfo -> {
ClassInfoList returnList = new ClassInfoList();
return returnList;
results = getClassGraphStreamOfTestClasses()
.map(classInfo -> classInfo.getMethodInfo().filter(methodInfo -> methodInfo.hasAnnotation("org.junit.Test"))
.stream().map(methodInfo -> classInfo.getName() + "." + methodInfo.getName()))
@ -72,28 +58,32 @@ public class ListTests extends DefaultTask implements TestLister {
this.allTests = results.stream().sorted().collect(Collectors.toList());
case CLASS:
results = new ClassGraph()
.map(classInfo -> {
ClassInfoList returnList = new ClassInfoList();
return returnList;
results = getClassGraphStreamOfTestClasses()
this.allTests = results.stream().sorted().collect(Collectors.toList());
getProject().getLogger().lifecycle("THESE ARE ALL THE TESTSSS!!!!!!!!: " + allTests.toString());
private Stream<ClassInfo> getClassGraphStreamOfTestClasses() {
return new ClassGraph()
.map(classInfo -> {
ClassInfoList returnList = new ClassInfoList();
return returnList;
@ -170,6 +170,20 @@ public class Tests {
return results;
List<Tuple2<String, Long>> equals(@NotNull final String testPrefix) {
List<Tuple2<String, Long>> results = this.tests.keySet().stream()
.filter(t -> t.equals(testPrefix))
.map(t -> new Tuple2<>(t, getDuration(t)))
// We don't know if the testPrefix is a classname or classname.methodname (exact match).
if (results == null || results.isEmpty()) {
LOG.warn("In {} previously executed tests, could not find any starting with {}", tests.size(), testPrefix);
results = Arrays.asList(new Tuple2<>(testPrefix, getMeanDurationForTests()));
return results;
* How many times has this function been run? Every call to addDuration increments the current value.
@ -611,9 +611,17 @@ flow to receive the transaction:
:dedent: 12
``idOfTxWeSigned`` is an optional parameter used to confirm that we got the right transaction. It comes from using ``SignTransactionFlow``
which is described below.
which is described in the error handling behaviour section.
**Error handling behaviour**
Finalizing transactions with only one participant
In some cases, transactions will only have one participant, the initiator. In these instances, there are no other
parties to send the transactions to during ``FinalityFlow``. In these cases the ``counterpartySession`` list must exist,
but be empty.
Error handling behaviour
Once a transaction has been notarised and its input states consumed by the flow initiator (eg. sender), should the participant(s) receiving the
transaction fail to verify it, or the receiving flow (the finality handler) fails due to some other error, we then have a scenario where not
@ -337,6 +337,13 @@ transaction that uses them. This flow returns a list of ``LedgerTransaction`` ob
we don't download a transaction from the peer, they know we must have already seen it before. Fixing this privacy
leak will come later.
Finalizing transactions with only one participant
In some cases, transactions will only have one participant, the initiator. In these instances, there are no other
parties to send the transactions to during ``FinalityFlow``. In these cases the ``counterpartySession`` list must exist,
but be empty.
We also invoke two other subflows:
@ -1061,7 +1061,7 @@ class StatemachineErrorHandlingTest {
* that 3 retries are attempted before recovering.
fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully - responding flow`() {
fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and complete successfully`() {
startDriver {
val charlie = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
@ -1156,7 +1156,7 @@ class StatemachineErrorHandlingTest {
* the responding flow to recover and finish its flow.
fun `error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists - responding flow`() {
fun `responding flow - error during transition with CommitTransaction action that occurs during the beginning of execution will retry and be kept for observation if error persists`() {
startDriver {
val charlie = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
@ -1244,7 +1244,7 @@ class StatemachineErrorHandlingTest {
* succeeds and the flow finishes.
fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully - responding flow`() {
fun `responding flow - error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() {
startDriver {
val charlie = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
@ -1340,7 +1340,7 @@ class StatemachineErrorHandlingTest {
* send to the responding node and the responding node successfully received it.
fun `error recording a transaction inside of ReceiveFinalityFlow will keep the flow in for observation`() {
fun `error recording a transaction inside of ReceiveFinalityFlow will keep the flow in for observation` () {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
Reference in New Issue
Block a user