Merge branch 'release/os/4.3' into CORDA-3304-rpc-max-retries
commit 42e364386d
.ci/dev/unit/Jenkinsfile (vendored): 26 lines changed
@@ -29,20 +29,18 @@ pipeline {
}
}

stage('Corda Pull Request - Run Tests') {
stage('Unit Tests') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelUnitTest"
if (env.CHANGE_ID) {
pullRequest.createStatus(status: 'success',
context: 'continuous-integration/jenkins/pr-merge/unitTest',
description: 'Unit Tests Passed',
targetUrl: "${env.JOB_URL}/testResults")
}
stage('Unit Tests') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelUnitTest"
if (env.CHANGE_ID) {
pullRequest.createStatus(status: 'success',
context: 'continuous-integration/jenkins/pr-merge/unitTest',
description: 'Unit Tests Passed',
targetUrl: "${env.JOB_URL}/testResults")
}
}
}
.idea/codeStyles/Project.xml (generated): 4 lines changed
@@ -7,6 +7,10 @@
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
</JavaCodeStyleSettings>
<GroovyCodeStyleSettings>
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="100" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="100" />
</GroovyCodeStyleSettings>
<JetCodeStyleSettings>
<option name="PACKAGES_TO_USE_STAR_IMPORTS">
<value>
Jenkinsfile (vendored): 3 lines changed
@@ -1,3 +1,4 @@
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
@Library('existing-build-control')
import static com.r3.build.BuildControl.killAllExistingBuildsForJob

@@ -8,7 +9,7 @@ pipeline {
options { timestamps() }

environment {
DOCKER_TAG_TO_USE = "${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}"
DOCKER_TAG_TO_USE = "${env.GIT_COMMIT.subSequence(0, 8)}"
EXECUTOR_NUMBER = "${env.EXECUTOR_NUMBER}"
BUILD_ID = "${env.BUILD_ID}-${env.JOB_NAME}"
}
build.gradle: 18 lines changed
@@ -612,6 +612,14 @@ task allParallelSlowIntegrationTest(type: ParallelTestGroup) {
memoryInGbPerFork 10
distribute Distribution.CLASS
}
task allParallelSmokeTest(type: ParallelTestGroup) {
testGroups "smokeTest"
numberOfShards 4
streamOutput true
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.METHOD
}
task allParallelUnitTest(type: ParallelTestGroup) {
testGroups "test"
numberOfShards 10
@@ -623,7 +631,15 @@ task allParallelUnitTest(type: ParallelTestGroup) {
task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest"
numberOfShards 15
streamOutput true
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.CLASS
}
task parallelRegressionTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest", "slowIntegrationTest", "smokeTest"
numberOfShards 5
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute Distribution.CLASS
@@ -51,13 +51,12 @@ public class BucketingAllocator {
System.out.println("Number of tests: " + container.testsForFork.stream().mapToInt(b -> b.foundTests.size()).sum());
System.out.println("Tests to Run: ");
container.testsForFork.forEach(tb -> {
System.out.println(tb.nameWithAsterix);
System.out.println(tb.testName);
tb.foundTests.forEach(ft -> System.out.println("\t" + ft.getFirst() + ", " + ft.getSecond()));
});
});
}


private void allocateTestsToForks(@NotNull List<TestBucket> matchedTests) {
matchedTests.forEach(matchedTestBucket -> {
TestsForForkContainer smallestContainer = Collections.min(forkContainers, Comparator.comparing(TestsForForkContainer::getCurrentDuration));
@@ -69,10 +68,9 @@ public class BucketingAllocator {
return allDiscoveredTests.stream().map(tuple -> {
String testName = tuple.getFirst();
Object task = tuple.getSecond();
String noAsterixName = testName.substring(0, testName.length() - 1);
//2DO [can this filtering algorithm be improved - the test names are sorted, it should be possible to do something using binary search]
List<Tuple2<String, Double>> matchingTests = allTestsFromCSV.stream().filter(testFromCSV -> testFromCSV.getFirst().startsWith(noAsterixName)).collect(Collectors.toList());
return new TestBucket(task, testName, noAsterixName, matchingTests);
List<Tuple2<String, Double>> matchingTests = allTestsFromCSV.stream().filter(testFromCSV -> testFromCSV.getFirst().startsWith(testName)).collect(Collectors.toList());
return new TestBucket(task, testName, matchingTests);
}).sorted(Comparator.comparing(TestBucket::getDuration).reversed()).collect(Collectors.toList());
}

@@ -81,22 +79,20 @@ public class BucketingAllocator {
TestLister lister = source.getFirst();
Object testTask = source.getSecond();
return lister.getAllTestsDiscovered().stream().map(test -> new Tuple2<>(test, testTask)).collect(Collectors.toList());
}).flatMap(Collection::stream).collect(Collectors.toList());
}).flatMap(Collection::stream).sorted(Comparator.comparing(Tuple2::getFirst)).collect(Collectors.toList());
}

public static class TestBucket {
final Object testTask;
final String nameWithAsterix;
final String nameWithoutAsterix;
final String testName;
final List<Tuple2<String, Double>> foundTests;
final Double duration;

public TestBucket(Object testTask, String nameWithAsterix, String nameWithoutAsterix, List<Tuple2<String, Double>> foundTests) {
public TestBucket(Object testTask, String testName, List<Tuple2<String, Double>> foundTests) {
this.testTask = testTask;
this.nameWithAsterix = nameWithAsterix;
this.nameWithoutAsterix = nameWithoutAsterix;
this.testName = testName;
this.foundTests = foundTests;
duration = Math.max(foundTests.stream().mapToDouble(tp -> Math.max(tp.getSecond(), 10)).sum(), 10);
duration = Math.max(foundTests.stream().mapToDouble(tp -> Math.max(tp.getSecond(), 1)).sum(), 1);
}

public Double getDuration() {
@@ -107,8 +103,7 @@ public class BucketingAllocator {
public String toString() {
return "TestBucket{" +
"testTask=" + testTask +
", nameWithAsterix='" + nameWithAsterix + '\'' +
", nameWithoutAsterix='" + nameWithoutAsterix + '\'' +
", nameWithAsterix='" + testName + '\'' +
", foundTests=" + foundTests +
", duration=" + duration +
'}';
@@ -142,7 +137,7 @@ public class BucketingAllocator {
}

public List<String> getTestsForTask(Object task) {
return frozenTests.getOrDefault(task, Collections.emptyList()).stream().map(it -> it.nameWithAsterix).collect(Collectors.toList());
return frozenTests.getOrDefault(task, Collections.emptyList()).stream().map(it -> it.testName).collect(Collectors.toList());
}

public List<TestBucket> getBucketsForFork() {
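Note: the allocation step in this hunk is a greedy bin-packing pass. The buckets arrive pre-sorted by descending duration (see the reversed comparator above) and each one is placed on the fork with the smallest accumulated duration so far. A self-contained sketch of that idea, with made-up durations and a hypothetical three-fork setup:

    import java.util.Arrays;

    // Illustrative only: longest bucket first, always onto the least-loaded fork.
    class GreedyAllocationSketch {
        public static void main(String[] args) {
            double[] bucketDurations = {120, 90, 60, 45, 30, 5}; // already sorted descending
            double[] forkLoad = new double[3];                   // three empty forks
            for (double duration : bucketDurations) {
                int smallest = 0;
                for (int i = 1; i < forkLoad.length; i++) {
                    if (forkLoad[i] < forkLoad[smallest]) smallest = i;
                }
                forkLoad[smallest] += duration;                  // place bucket on least-loaded fork
            }
            System.out.println(Arrays.toString(forkLoad));       // roughly balanced: [120.0, 120.0, 110.0]
        }
    }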
@@ -41,8 +41,8 @@ public class BucketingAllocatorTask extends DefaultTask {
this.dependsOn(source);
}

public List<String> getTestsForForkAndTestTask(Integer fork, Test testTask) {
return allocator.getTestsForForkAndTestTask(fork, testTask);
public List<String> getTestIncludesForForkAndTestTask(Integer fork, Test testTask) {
return allocator.getTestsForForkAndTestTask(fork, testTask).stream().map(t -> t + "*").collect(Collectors.toList());
}

@TaskAction
@@ -56,11 +56,11 @@ public class BucketingAllocatorTask extends DefaultTask {
String duration = "Duration(ms)";
List<CSVRecord> records = CSVFormat.DEFAULT.withHeader().parse(reader).getRecords();
return records.stream().map(record -> {
try{
try {
String testName = record.get(name);
String testDuration = record.get(duration);
return new Tuple2<>(testName, Math.max(Double.parseDouble(testDuration), 10));
}catch (IllegalArgumentException | IllegalStateException e){
return new Tuple2<>(testName, Math.max(Double.parseDouble(testDuration), 1));
} catch (IllegalArgumentException | IllegalStateException e) {
return null;
}
}).filter(Objects::nonNull).sorted(Comparator.comparing(Tuple2::getFirst)).collect(Collectors.toList());
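For context, the per-test durations CSV is read with Apache Commons CSV exactly as in the hunk above. A minimal standalone sketch of that parsing step; the file name and the test-name column header ("Class Name") are assumptions, since only the "Duration(ms)" header is visible in this hunk:

    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVRecord;

    import java.io.FileReader;
    import java.io.IOException;
    import java.io.Reader;

    class DurationCsvSketch {
        public static void main(String[] args) throws IOException {
            try (Reader reader = new FileReader("test-durations.csv")) {           // assumed file name
                for (CSVRecord record : CSVFormat.DEFAULT.withHeader().parse(reader)) {
                    String testName = record.get("Class Name");                    // assumed header name
                    double millis = Math.max(Double.parseDouble(record.get("Duration(ms)")), 1);
                    System.out.println(testName + " -> " + millis + " ms");        // floor of 1 ms, as in the new code
                }
            }
        }
    }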
@@ -131,7 +131,7 @@ class DistributedTesting implements Plugin<Project> {
filter {
def fork = getPropertyAsInt(subProject, "dockerFork", 0)
subProject.logger.info("requesting tests to include in testing task ${task.getPath()} (idx: ${fork})")
List<String> includes = globalAllocator.getTestsForForkAndTestTask(
List<String> includes = globalAllocator.getTestIncludesForForkAndTestTask(
fork,
task)
subProject.logger.info "got ${includes.size()} tests to include into testing task ${task.getPath()}"
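The renamed getTestIncludesForForkAndTestTask now returns names with a trailing "*", so the plugin can hand them straight to Gradle's test filter as patterns. A rough Java-syntax sketch of that wiring (the plugin itself does the equivalent inside the Groovy filter block above):

    import org.gradle.api.tasks.testing.Test;
    import java.util.List;

    class TestFilterSketch {
        // Mirrors what the Groovy filter block ends up doing with the wildcarded names.
        static void applyIncludes(Test task, List<String> includes) {
            for (String pattern : includes) {                   // e.g. "net.corda.SomeTest*"
                task.getFilter().includeTestsMatching(pattern); // pattern match, not an exact test id
            }
        }
    }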
@@ -1,325 +0,0 @@
|
||||
package net.corda.testing
|
||||
|
||||
import io.fabric8.kubernetes.api.model.*
|
||||
import io.fabric8.kubernetes.client.*
|
||||
import io.fabric8.kubernetes.client.dsl.ExecListener
|
||||
import io.fabric8.kubernetes.client.dsl.ExecWatch
|
||||
import io.fabric8.kubernetes.client.utils.Serialization
|
||||
import okhttp3.Response
|
||||
import org.gradle.api.DefaultTask
|
||||
import org.gradle.api.GradleException
|
||||
import org.gradle.api.tasks.TaskAction
|
||||
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.ExecutorService
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.stream.Collectors
|
||||
import java.util.stream.IntStream
|
||||
|
||||
class KubesTest extends DefaultTask {
|
||||
|
||||
static final ExecutorService executorService = Executors.newCachedThreadPool()
|
||||
static final ExecutorService singleThreadedExecutor = Executors.newSingleThreadExecutor()
|
||||
|
||||
String dockerTag
|
||||
String fullTaskToExecutePath
|
||||
String taskToExecuteName
|
||||
Boolean printOutput = false
|
||||
Integer numberOfCoresPerFork = 4
|
||||
Integer memoryGbPerFork = 6
|
||||
public volatile List<File> testOutput = Collections.emptyList()
|
||||
public volatile List<KubePodResult> containerResults = Collections.emptyList()
|
||||
|
||||
String namespace = "thisisatest"
|
||||
int k8sTimeout = 50 * 1_000
|
||||
int webSocketTimeout = k8sTimeout * 6
|
||||
int numberOfPods = 20
|
||||
int timeoutInMinutesForPodToStart = 60
|
||||
|
||||
Distribution distribution = Distribution.METHOD
|
||||
|
||||
@TaskAction
|
||||
void runTestsOnKubes() {
|
||||
|
||||
try {
|
||||
Class.forName("org.apache.commons.compress.archivers.tar.TarArchiveInputStream")
|
||||
} catch (ClassNotFoundException ignored) {
|
||||
throw new GradleException("Apache Commons compress has not be loaded, this can happen if running from within intellj - please select \"delegate to gradle\" for build and test actions")
|
||||
}
|
||||
|
||||
def buildId = System.getProperty("buildId") ? System.getProperty("buildId") :
|
||||
(project.hasProperty("corda_revision") ? project.property("corda_revision").toString() : "0")
|
||||
|
||||
def currentUser = System.getProperty("user.name") ? System.getProperty("user.name") : "UNKNOWN_USER"
|
||||
|
||||
String stableRunId = new BigInteger(64, new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode())).toString(36).toLowerCase()
|
||||
String suffix = new BigInteger(64, new Random()).toString(36).toLowerCase()
|
||||
|
||||
io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder()
|
||||
.withConnectionTimeout(k8sTimeout)
|
||||
.withRequestTimeout(k8sTimeout)
|
||||
.withRollingTimeout(k8sTimeout)
|
||||
.withWebsocketTimeout(webSocketTimeout)
|
||||
.withWebsocketPingInterval(webSocketTimeout)
|
||||
.build()
|
||||
|
||||
final KubernetesClient client = new DefaultKubernetesClient(config)
|
||||
|
||||
try {
|
||||
client.pods().inNamespace(namespace).list().getItems().forEach({ podToDelete ->
|
||||
if (podToDelete.getMetadata().name.contains(stableRunId)) {
|
||||
project.logger.lifecycle("deleting: " + podToDelete.getMetadata().getName())
|
||||
client.resource(podToDelete).delete()
|
||||
}
|
||||
})
|
||||
} catch (Exception ignored) {
|
||||
//it's possible that a pod is being deleted by the original build, this can lead to racey conditions
|
||||
}
|
||||
|
||||
List<CompletableFuture<KubePodResult>> futures = IntStream.range(0, numberOfPods).mapToObj({ i ->
|
||||
String potentialPodName = (taskToExecuteName + "-" + stableRunId + suffix + i).toLowerCase()
|
||||
String podName = potentialPodName.substring(0, Math.min(potentialPodName.size(), 62))
|
||||
runBuild(client, namespace, numberOfPods, i, podName, printOutput, 3)
|
||||
}).collect(Collectors.toList())
|
||||
this.testOutput = Collections.synchronizedList(futures.collect { it -> it.get().binaryResults }.flatten())
|
||||
this.containerResults = futures.collect { it -> it.get() }
|
||||
}
|
||||
|
||||
CompletableFuture<KubePodResult> runBuild(KubernetesClient client,
|
||||
String namespace,
|
||||
int numberOfPods,
|
||||
int podIdx,
|
||||
String podName,
|
||||
boolean printOutput,
|
||||
int numberOfRetries) {
|
||||
|
||||
CompletableFuture<KubePodResult> toReturn = new CompletableFuture<KubePodResult>()
|
||||
|
||||
executorService.submit({
|
||||
int tryCount = 0
|
||||
Pod createdPod = null
|
||||
while (tryCount < numberOfRetries) {
|
||||
try {
|
||||
Pod podRequest = buildPod(podName)
|
||||
project.logger.lifecycle("requesting pod: " + podName)
|
||||
createdPod = client.pods().inNamespace(namespace).create(podRequest)
|
||||
project.logger.lifecycle("scheduled pod: " + podName)
|
||||
File outputFile = Files.createTempFile("container", ".log").toFile()
|
||||
attachStatusListenerToPod(client, namespace, podName)
|
||||
schedulePodForDeleteOnShutdown(podName, client, createdPod)
|
||||
waitForPodToStart(podName, client, namespace)
|
||||
def stdOutOs = new PipedOutputStream()
|
||||
def stdOutIs = new PipedInputStream(4096)
|
||||
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
|
||||
KubePodResult result = new KubePodResult(createdPod, null, outputFile)
|
||||
CompletableFuture<KubePodResult> waiter = new CompletableFuture<>()
|
||||
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter, result)
|
||||
stdOutIs.connect(stdOutOs)
|
||||
String[] buildCommand = getBuildCommand(numberOfPods, podIdx)
|
||||
ExecWatch execWatch = client.pods().inNamespace(namespace).withName(podName)
|
||||
.writingOutput(stdOutOs)
|
||||
.writingErrorChannel(errChannelStream)
|
||||
.usingListener(execListener).exec(buildCommand)
|
||||
|
||||
startLogPumping(outputFile, stdOutIs, podIdx, printOutput)
|
||||
KubePodResult execResult = waiter.join()
|
||||
project.logger.lifecycle("build has ended on on pod ${podName} (${podIdx}/${numberOfPods})")
|
||||
project.logger.lifecycle "Gathering test results from ${execResult.createdPod.metadata.name}"
|
||||
def binaryResults = downloadTestXmlFromPod(client, namespace, execResult.createdPod)
|
||||
project.logger.lifecycle("deleting: " + execResult.createdPod.getMetadata().getName())
|
||||
client.resource(execResult.createdPod).delete()
|
||||
result.binaryResults = binaryResults
|
||||
toReturn.complete(result)
|
||||
break
|
||||
} catch (Exception e) {
|
||||
logger.error("Encountered error during testing cycle on pod ${podName} (${podIdx}/${numberOfPods})", e)
|
||||
try {
|
||||
if (createdPod) {
|
||||
client.pods().delete(createdPod)
|
||||
while (client.pods().inNamespace(namespace).list().getItems().find { p -> p.metadata.name == podName }) {
|
||||
logger.warn("pod ${podName} has not been deleted, waiting 1s")
|
||||
Thread.sleep(1000)
|
||||
}
|
||||
}
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
tryCount++
|
||||
logger.lifecycle("will retry ${podName} another ${numberOfRetries - tryCount} times")
|
||||
}
|
||||
}
|
||||
if (tryCount >= numberOfRetries) {
|
||||
toReturn.completeExceptionally(new RuntimeException("Failed to build in pod ${podName} (${podIdx}/${numberOfPods}) within retry limit"))
|
||||
}
|
||||
})
|
||||
return toReturn
|
||||
}
|
||||
|
||||
void startLogPumping(File outputFile, stdOutIs, podIdx, boolean printOutput) {
|
||||
Thread loggingThread = new Thread({ ->
|
||||
BufferedWriter out = null
|
||||
BufferedReader br = null
|
||||
try {
|
||||
out = new BufferedWriter(new FileWriter(outputFile))
|
||||
br = new BufferedReader(new InputStreamReader(stdOutIs))
|
||||
String line
|
||||
while ((line = br.readLine()) != null) {
|
||||
def toWrite = ("${taskToExecuteName}/Container" + podIdx + ": " + line).trim()
|
||||
if (printOutput) {
|
||||
project.logger.lifecycle(toWrite)
|
||||
}
|
||||
out.println(toWrite)
|
||||
}
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
finally {
|
||||
out?.close()
|
||||
br?.close()
|
||||
}
|
||||
})
|
||||
|
||||
loggingThread.setDaemon(true)
|
||||
loggingThread.start()
|
||||
}
|
||||
|
||||
ExecListener buildExecListenerForPod(podName, errChannelStream, CompletableFuture<KubePodResult> waitingFuture, KubePodResult result) {
|
||||
|
||||
new ExecListener() {
|
||||
final Long start = System.currentTimeMillis()
|
||||
@Override
|
||||
void onOpen(Response response) {
|
||||
project.logger.lifecycle("Build started on pod $podName")
|
||||
}
|
||||
|
||||
@Override
|
||||
void onFailure(Throwable t, Response response) {
|
||||
project.logger.lifecycle("Received error from rom pod $podName")
|
||||
waitingFuture.completeExceptionally(t)
|
||||
}
|
||||
|
||||
@Override
|
||||
void onClose(int code, String reason) {
|
||||
project.logger.lifecycle("Received onClose() from pod ${podName}, build took: ${(System.currentTimeMillis() - start) / 1000} seconds")
|
||||
try {
|
||||
def errChannelContents = errChannelStream.toString()
|
||||
Status status = Serialization.unmarshal(errChannelContents, Status.class);
|
||||
result.resultCode = status.details?.causes?.first()?.message?.toInteger() ? status.details?.causes?.first()?.message?.toInteger() : 0
|
||||
waitingFuture.complete(result)
|
||||
} catch (Exception e) {
|
||||
waitingFuture.completeExceptionally(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void schedulePodForDeleteOnShutdown(String podName, client, Pod createdPod) {
|
||||
project.logger.info("attaching shutdown hook for pod ${podName}")
|
||||
Runtime.getRuntime().addShutdownHook({
|
||||
println "Deleting pod: " + podName
|
||||
client.pods().delete(createdPod)
|
||||
})
|
||||
}
|
||||
|
||||
Watch attachStatusListenerToPod(KubernetesClient client, String namespace, String podName) {
|
||||
client.pods().inNamespace(namespace).withName(podName).watch(new Watcher<Pod>() {
|
||||
@Override
|
||||
void eventReceived(Watcher.Action action, Pod resource) {
|
||||
project.logger.lifecycle("[StatusChange] pod ${resource.getMetadata().getName()} ${action.name()} (${resource.status.phase})")
|
||||
}
|
||||
|
||||
@Override
|
||||
void onClose(KubernetesClientException cause) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
void waitForPodToStart(String podName, KubernetesClient client, String namespace) {
|
||||
project.logger.lifecycle("Waiting for pod " + podName + " to start before executing build")
|
||||
client.pods().inNamespace(namespace).withName(podName).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES)
|
||||
project.logger.lifecycle("pod " + podName + " has started, executing build")
|
||||
}
|
||||
|
||||
Pod buildPod(String podName) {
|
||||
return new PodBuilder().withNewMetadata().withName(podName).endMetadata()
|
||||
.withNewSpec()
|
||||
.addNewVolume()
|
||||
.withName("gradlecache")
|
||||
.withNewHostPath()
|
||||
.withPath("/tmp/gradle")
|
||||
.withType("DirectoryOrCreate")
|
||||
.endHostPath()
|
||||
.endVolume()
|
||||
.addNewContainer()
|
||||
.withImage(dockerTag)
|
||||
.withCommand("bash")
|
||||
.withArgs("-c", "sleep 3600")
|
||||
.addNewEnv()
|
||||
.withName("DRIVER_NODE_MEMORY")
|
||||
.withValue("1024m")
|
||||
.withName("DRIVER_WEB_MEMORY")
|
||||
.withValue("1024m")
|
||||
.endEnv()
|
||||
.withName(podName)
|
||||
.withNewResources()
|
||||
.addToRequests("cpu", new Quantity("${numberOfCoresPerFork}"))
|
||||
.addToRequests("memory", new Quantity("${memoryGbPerFork}Gi"))
|
||||
.endResources()
|
||||
.addNewVolumeMount()
|
||||
.withName("gradlecache")
|
||||
.withMountPath("/tmp/gradle")
|
||||
.endVolumeMount()
|
||||
.endContainer()
|
||||
.withImagePullSecrets(new LocalObjectReference("regcred"))
|
||||
.withRestartPolicy("Never")
|
||||
.endSpec()
|
||||
.build()
|
||||
}
|
||||
|
||||
String[] getBuildCommand(int numberOfPods, int podIdx) {
|
||||
return ["bash", "-c",
|
||||
"let x=1 ; while [ \${x} -ne 0 ] ; do echo \"Waiting for DNS\" ; curl services.gradle.org > /dev/null 2>&1 ; x=\$? ; sleep 1 ; done ; " +
|
||||
"cd /tmp/source ; " +
|
||||
"let y=1 ; while [ \${y} -ne 0 ] ; do echo \"Preparing build directory\" ; ./gradlew testClasses integrationTestClasses --parallel 2>&1 ; y=\$? ; sleep 1 ; done ;" +
|
||||
"./gradlew -D${ListTests.DISTRIBUTION_PROPERTY}=${distribution.name()} -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " $fullTaskToExecutePath --info 2>&1 ;" +
|
||||
"let rs=\$? ; sleep 10 ; exit \${rs}"]
|
||||
}
|
||||
|
||||
Collection<File> downloadTestXmlFromPod(KubernetesClient client, String namespace, Pod cp) {
|
||||
String resultsInContainerPath = "/tmp/source/build/test-reports"
|
||||
String binaryResultsFile = "results.bin"
|
||||
String podName = cp.getMetadata().getName()
|
||||
Path tempDir = new File(new File(project.getBuildDir(), "test-results-xml"), podName).toPath()
|
||||
|
||||
if (!tempDir.toFile().exists()) {
|
||||
tempDir.toFile().mkdirs()
|
||||
}
|
||||
|
||||
project.logger.lifecycle("Saving " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath())
|
||||
client.pods()
|
||||
.inNamespace(namespace)
|
||||
.withName(podName)
|
||||
.dir(resultsInContainerPath)
|
||||
.copy(tempDir)
|
||||
|
||||
return findFolderContainingBinaryResultsFile(new File(tempDir.toFile().getAbsolutePath()), binaryResultsFile)
|
||||
}
|
||||
|
||||
List<File> findFolderContainingBinaryResultsFile(File start, String fileNameToFind) {
|
||||
Queue<File> filesToInspect = new LinkedList<>(Collections.singletonList(start))
|
||||
List<File> folders = new ArrayList<>()
|
||||
while (!filesToInspect.isEmpty()) {
|
||||
File fileToInspect = filesToInspect.poll()
|
||||
if (fileToInspect.getAbsolutePath().endsWith(fileNameToFind)) {
|
||||
folders.add(fileToInspect.parentFile)
|
||||
}
|
||||
|
||||
if (fileToInspect.isDirectory()) {
|
||||
filesToInspect.addAll(Arrays.stream(fileToInspect.listFiles()).collect(Collectors.toList()))
|
||||
}
|
||||
}
|
||||
return folders
|
||||
}
|
||||
|
||||
}
|
buildSrc/src/main/groovy/net/corda/testing/KubesTest.java (new file): 385 lines
@@ -0,0 +1,385 @@
|
||||
package net.corda.testing;
|
||||
|
||||
import io.fabric8.kubernetes.api.model.*;
|
||||
import io.fabric8.kubernetes.client.*;
|
||||
import io.fabric8.kubernetes.client.dsl.ExecListener;
|
||||
import io.fabric8.kubernetes.client.dsl.PodResource;
|
||||
import io.fabric8.kubernetes.client.utils.Serialization;
|
||||
import net.corda.testing.retry.Retry;
|
||||
import okhttp3.Response;
|
||||
import org.gradle.api.DefaultTask;
|
||||
import org.gradle.api.tasks.TaskAction;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.io.*;
|
||||
import java.math.BigInteger;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
public class KubesTest extends DefaultTask {
|
||||
|
||||
private static final ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
/**
|
||||
* Name of the k8s Secret object that holds the credentials to access the docker image registry
|
||||
*/
|
||||
private static final String REGISTRY_CREDENTIALS_SECRET_NAME = "regcred";
|
||||
|
||||
String dockerTag;
|
||||
String fullTaskToExecutePath;
|
||||
String taskToExecuteName;
|
||||
Boolean printOutput = false;
|
||||
Integer numberOfCoresPerFork = 4;
|
||||
Integer memoryGbPerFork = 6;
|
||||
public volatile List<File> testOutput = Collections.emptyList();
|
||||
public volatile List<KubePodResult> containerResults = Collections.emptyList();
|
||||
|
||||
String namespace = "thisisatest";
|
||||
int k8sTimeout = 50 * 1_000;
|
||||
int webSocketTimeout = k8sTimeout * 6;
|
||||
int numberOfPods = 20;
|
||||
int timeoutInMinutesForPodToStart = 60;
|
||||
|
||||
Distribution distribution = Distribution.METHOD;
|
||||
|
||||
@TaskAction
|
||||
public void runDistributedTests() {
|
||||
String buildId = System.getProperty("buildId", "0");
|
||||
String currentUser = System.getProperty("user.name", "UNKNOWN_USER");
|
||||
|
||||
String stableRunId = rnd64Base36(new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode()));
|
||||
String random = rnd64Base36(new Random());
|
||||
|
||||
final KubernetesClient client = getKubernetesClient();
|
||||
|
||||
try {
|
||||
client.pods().inNamespace(namespace).list().getItems().forEach(podToDelete -> {
|
||||
if (podToDelete.getMetadata().getName().contains(stableRunId)) {
|
||||
getProject().getLogger().lifecycle("deleting: " + podToDelete.getMetadata().getName());
|
||||
client.resource(podToDelete).delete();
|
||||
}
|
||||
});
|
||||
} catch (Exception ignored) {
|
||||
//it's possible that a pod is being deleted by the original build, this can lead to racey conditions
|
||||
}
|
||||
|
||||
List<Future<KubePodResult>> futures = IntStream.range(0, numberOfPods).mapToObj(i -> {
|
||||
String podName = taskToExecuteName.toLowerCase() + "-" + stableRunId + "-" + random + "-" + i;
|
||||
return submitBuild(client, namespace, numberOfPods, i, podName, printOutput, 3);
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
this.testOutput = Collections.synchronizedList(futures.stream().map(it -> {
|
||||
try {
|
||||
return it.get().getBinaryResults();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}).flatMap(Collection::stream).collect(Collectors.toList()));
|
||||
this.containerResults = futures.stream().map(it -> {
|
||||
try {
|
||||
return it.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private KubernetesClient getKubernetesClient() {
|
||||
io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder()
|
||||
.withConnectionTimeout(k8sTimeout)
|
||||
.withRequestTimeout(k8sTimeout)
|
||||
.withRollingTimeout(k8sTimeout)
|
||||
.withWebsocketTimeout(webSocketTimeout)
|
||||
.withWebsocketPingInterval(webSocketTimeout)
|
||||
.build();
|
||||
|
||||
return new DefaultKubernetesClient(config);
|
||||
}
|
||||
|
||||
private static String rnd64Base36(Random rnd) {
|
||||
return new BigInteger(64, rnd)
|
||||
.toString(36)
|
||||
.toLowerCase();
|
||||
}
|
||||
|
||||
private Future<KubePodResult> submitBuild(
|
||||
KubernetesClient client,
|
||||
String namespace,
|
||||
int numberOfPods,
|
||||
int podIdx,
|
||||
String podName,
|
||||
boolean printOutput,
|
||||
int numberOfRetries
|
||||
) {
|
||||
return executorService.submit(new Callable<KubePodResult>() {
|
||||
@Override
|
||||
public KubePodResult call() throws Exception {
|
||||
PersistentVolumeClaim pvc = createPvc(client, podName);
|
||||
return buildRunPodWithRetriesOrThrow(client, namespace, pvc, numberOfPods, podIdx, podName, printOutput, numberOfRetries);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static void addShutdownHook(Runnable hook) {
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(hook));
|
||||
}
|
||||
|
||||
private PersistentVolumeClaim createPvc(KubernetesClient client, String name) {
|
||||
PersistentVolumeClaim pvc = client.persistentVolumeClaims()
|
||||
.inNamespace(namespace)
|
||||
.createNew()
|
||||
|
||||
.editOrNewMetadata().withName(name).endMetadata()
|
||||
|
||||
.editOrNewSpec()
|
||||
.withAccessModes("ReadWriteOnce")
|
||||
.editOrNewResources().addToRequests("storage", new Quantity("100Mi")).endResources()
|
||||
.endSpec()
|
||||
|
||||
.done();
|
||||
|
||||
addShutdownHook(() -> {
|
||||
System.out.println("Deleing PVC: " + pvc.getMetadata().getName());
|
||||
client.persistentVolumeClaims().delete(pvc);
|
||||
});
|
||||
return pvc;
|
||||
}
|
||||
|
||||
private KubePodResult buildRunPodWithRetriesOrThrow(
|
||||
KubernetesClient client,
|
||||
String namespace,
|
||||
PersistentVolumeClaim pvc,
|
||||
int numberOfPods,
|
||||
int podIdx,
|
||||
String podName,
|
||||
boolean printOutput,
|
||||
int numberOfRetries
|
||||
) {
|
||||
addShutdownHook(() -> {
|
||||
System.out.println("deleting pod: " + podName);
|
||||
client.pods().inNamespace(namespace).withName(podName).delete();
|
||||
});
|
||||
|
||||
try {
|
||||
// pods might die, so we retry
|
||||
return Retry.fixed(numberOfRetries).run(() -> {
|
||||
// remove pod if exists
|
||||
PodResource<Pod, DoneablePod> oldPod = client.pods().inNamespace(namespace).withName(podName);
|
||||
if (oldPod.get() != null) {
|
||||
getLogger().lifecycle("deleting pod: {}", podName);
|
||||
oldPod.delete();
|
||||
while (oldPod.get() != null) {
|
||||
getLogger().info("waiting for pod {} to be removed", podName);
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
|
||||
// recreate and run
|
||||
getProject().getLogger().lifecycle("creating pod: " + podName);
|
||||
Pod createdPod = client.pods().inNamespace(namespace).create(buildPodRequest(podName, pvc));
|
||||
getProject().getLogger().lifecycle("scheduled pod: " + podName);
|
||||
|
||||
attachStatusListenerToPod(client, createdPod);
|
||||
waitForPodToStart(client, createdPod);
|
||||
|
||||
PipedOutputStream stdOutOs = new PipedOutputStream();
|
||||
PipedInputStream stdOutIs = new PipedInputStream(4096);
|
||||
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
|
||||
|
||||
CompletableFuture<Integer> waiter = new CompletableFuture<>();
|
||||
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter);
|
||||
stdOutIs.connect(stdOutOs);
|
||||
client.pods().inNamespace(namespace).withName(podName)
|
||||
.writingOutput(stdOutOs)
|
||||
.writingErrorChannel(errChannelStream)
|
||||
.usingListener(execListener)
|
||||
.exec(getBuildCommand(numberOfPods, podIdx));
|
||||
|
||||
File podOutput = startLogPumping(stdOutIs, podIdx, printOutput);
|
||||
int resCode = waiter.join();
|
||||
getProject().getLogger().lifecycle("build has ended on on pod " + podName + " (" + podIdx + "/" + numberOfPods + "), gathering results");
|
||||
Collection<File> binaryResults = downloadTestXmlFromPod(client, namespace, createdPod);
|
||||
return new KubePodResult(resCode, podOutput, binaryResults);
|
||||
});
|
||||
} catch (Retry.RetryException e) {
|
||||
throw new RuntimeException("Failed to build in pod " + podName + " (" + podIdx + "/" + numberOfPods + ") in " + numberOfRetries + " attempts", e);
|
||||
}
|
||||
}
|
||||
|
||||
private Pod buildPodRequest(String podName, PersistentVolumeClaim pvc) {
|
||||
return new PodBuilder()
|
||||
.withNewMetadata().withName(podName).endMetadata()
|
||||
|
||||
.withNewSpec()
|
||||
|
||||
.addNewVolume()
|
||||
.withName("gradlecache")
|
||||
.withNewHostPath()
|
||||
.withType("DirectoryOrCreate")
|
||||
.withPath("/tmp/gradle")
|
||||
.endHostPath()
|
||||
.endVolume()
|
||||
|
||||
.addNewVolume()
|
||||
.withName("testruns")
|
||||
.withNewPersistentVolumeClaim()
|
||||
.withClaimName(pvc.getMetadata().getName())
|
||||
.endPersistentVolumeClaim()
|
||||
.endVolume()
|
||||
|
||||
.addNewContainer()
|
||||
.withImage(dockerTag)
|
||||
.withCommand("bash")
|
||||
.withArgs("-c", "sleep 3600")
|
||||
.addNewEnv()
|
||||
.withName("DRIVER_NODE_MEMORY")
|
||||
.withValue("1024m")
|
||||
.withName("DRIVER_WEB_MEMORY")
|
||||
.withValue("1024m")
|
||||
.endEnv()
|
||||
.withName(podName)
|
||||
.withNewResources()
|
||||
.addToRequests("cpu", new Quantity(numberOfCoresPerFork.toString()))
|
||||
.addToRequests("memory", new Quantity(memoryGbPerFork.toString() + "Gi"))
|
||||
.endResources()
|
||||
.addNewVolumeMount().withName("gradlecache").withMountPath("/tmp/gradle").endVolumeMount()
|
||||
.addNewVolumeMount().withName("testruns").withMountPath("/test-runs").endVolumeMount()
|
||||
.endContainer()
|
||||
|
||||
.addNewImagePullSecret(REGISTRY_CREDENTIALS_SECRET_NAME)
|
||||
.withRestartPolicy("Never")
|
||||
|
||||
.endSpec()
|
||||
.build();
|
||||
}
|
||||
|
||||
private File startLogPumping(InputStream stdOutIs, int podIdx, boolean printOutput) throws IOException {
|
||||
File outputFile = Files.createTempFile("container", ".log").toFile();
|
||||
Thread loggingThread = new Thread(() -> {
|
||||
try (BufferedWriter out = new BufferedWriter(new FileWriter(outputFile));
|
||||
BufferedReader br = new BufferedReader(new InputStreamReader(stdOutIs))) {
|
||||
String line;
|
||||
while ((line = br.readLine()) != null) {
|
||||
String toWrite = ("Container" + podIdx + ": " + line).trim();
|
||||
if (printOutput) {
|
||||
getProject().getLogger().lifecycle(toWrite);
|
||||
}
|
||||
out.write(line);
|
||||
out.newLine();
|
||||
}
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
});
|
||||
|
||||
loggingThread.setDaemon(true);
|
||||
loggingThread.start();
|
||||
return outputFile;
|
||||
}
|
||||
|
||||
private Watch attachStatusListenerToPod(KubernetesClient client, Pod pod) {
|
||||
return client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).watch(new Watcher<Pod>() {
|
||||
@Override
|
||||
public void eventReceived(Watcher.Action action, Pod resource) {
|
||||
getProject().getLogger().lifecycle("[StatusChange] pod " + resource.getMetadata().getName() + " " + action.name() + " (" + resource.getStatus().getPhase() + ")");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(KubernetesClientException cause) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void waitForPodToStart(KubernetesClient client, Pod pod) {
|
||||
getProject().getLogger().lifecycle("Waiting for pod " + pod.getMetadata().getName() + " to start before executing build");
|
||||
try {
|
||||
client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
getProject().getLogger().lifecycle("pod " + pod.getMetadata().getName() + " has started, executing build");
|
||||
}
|
||||
|
||||
private Collection<File> downloadTestXmlFromPod(KubernetesClient client, String namespace, Pod cp) {
|
||||
String resultsInContainerPath = "/tmp/source/build/test-reports";
|
||||
String binaryResultsFile = "results.bin";
|
||||
String podName = cp.getMetadata().getName();
|
||||
Path tempDir = new File(new File(getProject().getBuildDir(), "test-results-xml"), podName).toPath();
|
||||
|
||||
if (!tempDir.toFile().exists()) {
|
||||
tempDir.toFile().mkdirs();
|
||||
}
|
||||
|
||||
getProject().getLogger().lifecycle("Saving " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath());
|
||||
client.pods()
|
||||
.inNamespace(namespace)
|
||||
.withName(podName)
|
||||
.dir(resultsInContainerPath)
|
||||
.copy(tempDir);
|
||||
|
||||
return findFolderContainingBinaryResultsFile(new File(tempDir.toFile().getAbsolutePath()), binaryResultsFile);
|
||||
}
|
||||
|
||||
private String[] getBuildCommand(int numberOfPods, int podIdx) {
|
||||
String shellScript = "let x=1 ; while [ ${x} -ne 0 ] ; do echo \"Waiting for DNS\" ; curl services.gradle.org > /dev/null 2>&1 ; x=$? ; sleep 1 ; done ; " + "cd /tmp/source ; " +
|
||||
"let y=1 ; while [ ${y} -ne 0 ] ; do echo \"Preparing build directory\" ; ./gradlew testClasses integrationTestClasses --parallel 2>&1 ; y=$? ; sleep 1 ; done ;" +
|
||||
"./gradlew -D" + ListTests.DISTRIBUTION_PROPERTY + "=" + distribution.name() + " -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " " + fullTaskToExecutePath + " --info 2>&1 ;" +
|
||||
"let rs=$? ; sleep 10 ; exit ${rs}";
|
||||
return new String[]{"bash", "-c", shellScript};
|
||||
}
|
||||
|
||||
private List<File> findFolderContainingBinaryResultsFile(File start, String fileNameToFind) {
|
||||
Queue<File> filesToInspect = new LinkedList<>(Collections.singletonList(start));
|
||||
List<File> folders = new ArrayList<>();
|
||||
while (!filesToInspect.isEmpty()) {
|
||||
File fileToInspect = filesToInspect.poll();
|
||||
if (fileToInspect.getAbsolutePath().endsWith(fileNameToFind)) {
|
||||
folders.add(fileToInspect.getParentFile());
|
||||
}
|
||||
|
||||
if (fileToInspect.isDirectory()) {
|
||||
filesToInspect.addAll(Arrays.stream(fileToInspect.listFiles()).collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
return folders;
|
||||
}
|
||||
|
||||
private ExecListener buildExecListenerForPod(String podName, ByteArrayOutputStream errChannelStream, CompletableFuture<Integer> waitingFuture) {
|
||||
|
||||
return new ExecListener() {
|
||||
final Long start = System.currentTimeMillis();
|
||||
|
||||
@Override
|
||||
public void onOpen(Response response) {
|
||||
getProject().getLogger().lifecycle("Build started on pod " + podName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t, Response response) {
|
||||
getProject().getLogger().lifecycle("Received error from rom pod " + podName);
|
||||
waitingFuture.completeExceptionally(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(int code, String reason) {
|
||||
getProject().getLogger().lifecycle("Received onClose() from pod " + podName + " , build took: " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
|
||||
try {
|
||||
String errChannelContents = errChannelStream.toString();
|
||||
Status status = Serialization.unmarshal(errChannelContents, Status.class);
|
||||
Integer resultCode = Optional.ofNullable(status).map(Status::getDetails)
|
||||
.map(StatusDetails::getCauses)
|
||||
.flatMap(c -> c.stream().findFirst())
|
||||
.map(StatusCause::getMessage)
|
||||
.map(Integer::parseInt).orElse(0);
|
||||
waitingFuture.complete(resultCode);
|
||||
} catch (Exception e) {
|
||||
waitingFuture.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
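The new KubesTest.java above drives each test fork as a Kubernetes pod through the fabric8 client. A minimal sketch of the core client calls it relies on (build a pod request, watch status changes, wait for readiness), assuming a reachable cluster, a default kubeconfig and an existing "thisisatest" namespace; the pod name and image here are placeholders:

    import io.fabric8.kubernetes.api.model.Pod;
    import io.fabric8.kubernetes.api.model.PodBuilder;
    import io.fabric8.kubernetes.client.DefaultKubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClientException;
    import io.fabric8.kubernetes.client.Watcher;

    import java.util.concurrent.TimeUnit;

    class PodLifecycleSketch {
        public static void main(String[] args) throws InterruptedException {
            try (KubernetesClient client = new DefaultKubernetesClient()) {          // default kubeconfig
                Pod request = new PodBuilder()
                        .withNewMetadata().withName("example-pod").endMetadata()     // placeholder name
                        .withNewSpec()
                        .addNewContainer().withName("main").withImage("ubuntu:18.04")
                        .withCommand("bash").withArgs("-c", "sleep 3600").endContainer()
                        .withRestartPolicy("Never")
                        .endSpec().build();
                Pod created = client.pods().inNamespace("thisisatest").create(request);
                System.out.println("created " + created.getMetadata().getName());
                client.pods().inNamespace("thisisatest").withName("example-pod").watch(new Watcher<Pod>() {
                    @Override
                    public void eventReceived(Action action, Pod resource) {
                        System.out.println(action.name() + " " + resource.getStatus().getPhase());
                    }
                    @Override
                    public void onClose(KubernetesClientException cause) {
                    }
                });
                client.pods().inNamespace("thisisatest").withName("example-pod")
                        .waitUntilReady(5, TimeUnit.MINUTES);                        // same call the task uses
            }
        }
    }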
@@ -79,7 +79,7 @@ class ListTests extends DefaultTask implements TestLister {
.collect { c -> (c.getSubclasses() + Collections.singletonList(c)) }
.flatten()
.collect { ClassInfo c ->
c.getMethodInfo().filter { m -> m.hasAnnotation("org.junit.Test") }.collect { m -> c.name + "." + m.name + "*" }
c.getMethodInfo().filter { m -> m.hasAnnotation("org.junit.Test") }.collect { m -> c.name + "." + m.name }
}.flatten()
.toSet()

@@ -97,7 +97,7 @@ class ListTests extends DefaultTask implements TestLister {
.getClassesWithMethodAnnotation("org.junit.Test")
.collect { c -> (c.getSubclasses() + Collections.singletonList(c)) }
.flatten()
.collect { ClassInfo c -> c.name + "*" }.flatten()
.collect { ClassInfo c -> c.name }.flatten()
.toSet()
this.allTests = results.stream().sorted().collect(Collectors.toList())
break
@@ -1,41 +1,29 @@
package net.corda.testing;

import io.fabric8.kubernetes.api.model.Pod;

import java.io.File;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class KubePodResult {

private final Pod createdPod;
private final CompletableFuture<Void> waiter;
private volatile Integer resultCode = 255;
private final int resultCode;
private final File output;
private volatile Collection<File> binaryResults = Collections.emptyList();
private final Collection<File> binaryResults;

KubePodResult(Pod createdPod, CompletableFuture<Void> waiter, File output) {
this.createdPod = createdPod;
this.waiter = waiter;
public KubePodResult(int resultCode, File output, Collection<File> binaryResults) {
this.resultCode = resultCode;
this.output = output;
this.binaryResults = binaryResults;
}

public void setResultCode(Integer code) {
synchronized (createdPod) {
this.resultCode = code;
}
}

public Integer getResultCode() {
synchronized (createdPod) {
return this.resultCode;
}
public int getResultCode() {
return resultCode;
}

public File getOutput() {
synchronized (createdPod) {
return output;
}
return output;
}
};

public Collection<File> getBinaryResults() {
return binaryResults;
}
}
@@ -152,10 +152,10 @@ public class KubesReporting extends DefaultTask {
if (!containersWithNonZeroReturnCodes.isEmpty()) {
String reportUrl = new ConsoleRenderer().asClickableFileUrl(new File(destinationDir, "index.html"));
if (shouldPrintOutput){
containersWithNonZeroReturnCodes.forEach(container -> {
containersWithNonZeroReturnCodes.forEach(podResult -> {
try {
System.out.println("\n##### CONTAINER OUTPUT START #####");
IOUtils.copy(new FileInputStream(container.getOutput()), System.out);
IOUtils.copy(new FileInputStream(podResult.getOutput()), System.out);
System.out.println("##### CONTAINER OUTPUT END #####\n");
} catch (IOException ignored) {
}
buildSrc/src/main/java/net/corda/testing/retry/Retry.java (new file): 48 lines
@@ -0,0 +1,48 @@
package net.corda.testing.retry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;

public final class Retry {
private static final Logger log = LoggerFactory.getLogger(Retry.class);

public interface RetryStrategy {
<T> T run(Callable<T> op) throws RetryException;
}

public static final class RetryException extends RuntimeException {
public RetryException(String message) {
super(message);
}

public RetryException(String message, Throwable cause) {
super(message, cause);
}
}

public static RetryStrategy fixed(int times) {
if (times < 1) throw new IllegalArgumentException();
return new RetryStrategy() {
@Override
public <T> T run(Callable<T> op) {
int run = 0;
Exception last = null;
while (run < times) {
try {
return op.call();
} catch (Exception e) {
last = e;
log.info("Exception caught: " + e.getMessage());
}
run++;
}
throw new RetryException("Operation failed " + run + " times", last);
}
};
}
}
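This Retry helper is what the new KubesTest.java uses for its per-pod retry loop (Retry.fixed(numberOfRetries).run(...) earlier in this diff). A small usage sketch; the flaky operation is a stand-in:

    import net.corda.testing.retry.Retry;

    class RetrySketch {
        public static void main(String[] args) {
            String value = Retry.fixed(3).run(() -> {
                // Any Callable<T> goes here; each failure is logged and retried,
                // and the last exception is chained into Retry.RetryException.
                if (Math.random() < 0.5) throw new IllegalStateException("flaky");  // stand-in operation
                return "ok";
            });
            System.out.println(value);
        }
    }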
@@ -21,12 +21,12 @@ public class BucketingAllocatorTest {
BucketingAllocator bucketingAllocator = new BucketingAllocator(1, Collections::emptyList);

Object task = new Object();
bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass*", "AnotherTestingClass*"), task);
bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass", "AnotherTestingClass"), task);

bucketingAllocator.generateTestPlan();
List<String> testsForForkAndTestTask = bucketingAllocator.getTestsForForkAndTestTask(0, task);

Assert.assertThat(testsForForkAndTestTask, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass*", "AnotherTestingClass*"));
Assert.assertThat(testsForForkAndTestTask, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass", "AnotherTestingClass"));

}

@@ -37,7 +37,7 @@ public class BucketingAllocatorTest {
BucketingAllocator bucketingAllocator = new BucketingAllocator(2, Collections::emptyList);

Object task = new Object();
bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass*", "AnotherTestingClass*"), task);
bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass", "AnotherTestingClass"), task);

bucketingAllocator.generateTestPlan();
List<String> testsForForkOneAndTestTask = bucketingAllocator.getTestsForForkAndTestTask(0, task);
@@ -48,7 +48,7 @@ public class BucketingAllocatorTest {

List<String> allTests = Stream.of(testsForForkOneAndTestTask, testsForForkTwoAndTestTask).flatMap(Collection::stream).collect(Collectors.toList());

Assert.assertThat(allTests, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass*", "AnotherTestingClass*"));
Assert.assertThat(allTests, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass", "AnotherTestingClass"));

}
}
@@ -32,3 +32,5 @@ metricsVersion=4.1.0
metricsNewRelicVersion=1.1.1
openSourceBranch=https://github.com/corda/corda/blob/master
openSourceSamplesBranch=https://github.com/corda/samples/blob/master
jolokiaAgentVersion=1.6.1
@@ -6,6 +6,8 @@ import net.corda.core.CordaInternal
import net.corda.core.DeleteForDJVM
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.*
@@ -120,14 +122,18 @@ abstract class FlowLogic<out T> {
* is routed depends on the [Destination] type, including whether this call does any initial communication.
*/
@Suspendable
fun initiateFlow(destination: Destination): FlowSession = stateMachine.initiateFlow(destination)
fun initiateFlow(destination: Destination): FlowSession {
require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" }
return stateMachine.initiateFlow(destination, serviceHub.identityService.wellKnownPartyFromAnonymous(destination as AbstractParty)
?: throw IllegalArgumentException("Could not resolve destination: $destination"))
}

/**
* Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note
* that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
*/
@Suspendable
fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party)
fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party, party)

/**
* Specifies the identity, with certificate, to use for this flow. This will be one of the multiple identities that
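The behavioural change here is that initiateFlow(destination) now resolves the destination to a well-known Party up front (rejecting anything that is not a Party or AnonymousParty), instead of deferring that work to the state machine. From a CorDapp's point of view the call site is unchanged; a minimal Java-side sketch of a hypothetical initiating flow (a matching responder flow is assumed):

    import co.paralleluniverse.fibers.Suspendable;
    import net.corda.core.flows.FlowException;
    import net.corda.core.flows.FlowLogic;
    import net.corda.core.flows.FlowSession;
    import net.corda.core.flows.InitiatingFlow;
    import net.corda.core.identity.Party;

    @InitiatingFlow
    public class PingFlow extends FlowLogic<String> {
        private final Party counterparty;

        public PingFlow(Party counterparty) {
            this.counterparty = counterparty;
        }

        @Suspendable
        @Override
        public String call() throws FlowException {
            // An unresolvable AnonymousParty would now fail here, at initiateFlow, rather than later.
            FlowSession session = initiateFlow(counterparty);
            return session.sendAndReceive(String.class, "ping").unwrap(data -> data);
        }
    }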
@@ -18,7 +18,7 @@ interface FlowStateMachine<FLOWRETURN> {
fun <SUSPENDRETURN : Any> suspend(ioRequest: FlowIORequest<SUSPENDRETURN>, maySkipCheckpoint: Boolean): SUSPENDRETURN

@Suspendable
fun initiateFlow(destination: Destination): FlowSession
fun initiateFlow(destination: Destination, wellKnownParty: Party): FlowSession

fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>)
@@ -80,9 +80,7 @@ interface IdentityService {
* @param key The owning [PublicKey] of the [Party].
* @return Returns a [Party] with a matching owningKey if known, else returns null.
*/
fun partyFromKey(key: PublicKey): Party? =
@Suppress("DEPRECATION")
certificateFromKey(key)?.party
fun partyFromKey(key: PublicKey): Party?

/**
* Resolves a party name to the well known identity [Party] instance for this name. Where possible well known identity
@@ -1395,7 +1395,6 @@
|
||||
<ID>MaxLineLength:AddressBindingFailureTests.kt$AddressBindingFailureTests$@Test fun `rpc admin address`()</ID>
|
||||
<ID>MaxLineLength:AddressBindingFailureTests.kt$AddressBindingFailureTests$assertThat(exception.addresses).contains(address).withFailMessage("Expected addresses to contain $address but was ${exception.addresses}.")</ID>
|
||||
<ID>MaxLineLength:AddressBindingFailureTests.kt$AddressBindingFailureTests$assertThatThrownBy { startNode(customOverrides = overrides(address)).getOrThrow() }</ID>
|
||||
<ID>MaxLineLength:AddressBindingFailureTests.kt$AddressBindingFailureTests$driver</ID>
|
||||
<ID>MaxLineLength:AliasPrivateKeyTest.kt$AliasPrivateKeyTest$signingCertStore.query { setPrivateKey(alias, aliasPrivateKey, listOf(NOT_YET_REGISTERED_MARKER_KEYS_AND_CERTS.ECDSAR1_CERT), "entrypassword") }</ID>
|
||||
<ID>MaxLineLength:AliasPrivateKeyTest.kt$AliasPrivateKeyTest$val signingCertStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory, "keystorepass").get(createNew = true)</ID>
|
||||
<ID>MaxLineLength:AliasPrivateKeyTest.kt$AliasPrivateKeyTest${ val alias = "01234567890" val aliasPrivateKey = AliasPrivateKey(alias) val certificatesDirectory = tempFolder.root.toPath() val signingCertStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory, "keystorepass").get(createNew = true) signingCertStore.query { setPrivateKey(alias, aliasPrivateKey, listOf(NOT_YET_REGISTERED_MARKER_KEYS_AND_CERTS.ECDSAR1_CERT), "entrypassword") } // We can retrieve the certificate. assertTrue { signingCertStore.contains(alias) } // We can retrieve the certificate. assertEquals(NOT_YET_REGISTERED_MARKER_KEYS_AND_CERTS.ECDSAR1_CERT, signingCertStore[alias]) // Although we can store an AliasPrivateKey, we cannot retrieve it. But, it's fine as we use certStore for storing/handling certs only. assertThatIllegalArgumentException().isThrownBy { signingCertStore.query { getPrivateKey(alias, "entrypassword") } }.withMessage("Unrecognised algorithm: 1.3.6.1.4.1.50530.1.2") }</ID>
|
||||
@@ -2010,7 +2009,6 @@
|
||||
<ID>MaxLineLength:CordappContext.kt$CordappContext.EmptyCordappConfig$override fun getLong(path: String)</ID>
|
||||
<ID>MaxLineLength:CordappContext.kt$CordappContext.EmptyCordappConfig$override fun getNumber(path: String)</ID>
|
||||
<ID>MaxLineLength:CordappContext.kt$CordappContext.EmptyCordappConfig$override fun getString(path: String)</ID>
|
||||
<ID>MaxLineLength:CordappResolver.kt$CordappResolver$logger.error("ATTENTION: More than one CorDapp installed on the node for contract $className. Please remove the previous version when upgrading to a new version.")</ID>
|
||||
<ID>MaxLineLength:CordappResolverTest.kt$CordappResolverTest$@Test fun `when the same cordapp is registered for the same class multiple times, the resolver deduplicates and returns it as the current one`()</ID>
|
||||
<ID>MaxLineLength:CordappSmokeTest.kt$CordappSmokeTest$(additionalNodeInfoDir / "nodeInfo-41408E093F95EAD51F6892C34DEB65AE1A3569A4B0E5744769A1B485AF8E04B5").write(signedNodeInfo.serialize().bytes)</ID>
|
||||
<ID>MaxLineLength:CordappSmokeTest.kt$CordappSmokeTest$val nodeInfo = createNodeInfoWithSingleIdentity(CordaX500Name(organisation = "Bob Corp", locality = "Madrid", country = "ES"), dummyKeyPair, dummyKeyPair.public)</ID>
|
||||
@@ -3144,8 +3142,6 @@
|
||||
<ID>MaxLineLength:RPCApi.kt$private fun Trace.mapToExternal(message: ClientMessage)</ID>
|
||||
<ID>MaxLineLength:RPCApi.kt$return invocationId(RPC_ID_FIELD_NAME, RPC_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot extract reply id from client message.")</ID>
|
||||
<ID>MaxLineLength:RPCApi.kt$return sessionId(RPC_SESSION_ID_FIELD_NAME, RPC_SESSION_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot extract the session id from client message.")</ID>
|
||||
<ID>MaxLineLength:RPCClientProxyHandler.kt$RPCClientProxyHandler$private</ID>
|
||||
<ID>MaxLineLength:RPCClientProxyHandler.kt$RPCClientProxyHandler$private val serializationContextWithObservableContext = RpcClientObservableDeSerializer.createContext(serializationContext, observableContext)</ID>
|
||||
<ID>MaxLineLength:RPCClientProxyHandler.kt$RPCClientProxyHandler$throw UnsupportedOperationException("Method $calledMethod was added in RPC protocol version $sinceVersion but the server is running $serverProtocolVersion")</ID>
|
||||
<ID>MaxLineLength:RPCDriver.kt$RPCDriverDSL$val artemisConfig = createRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient, driverDSL.driverDirectory / serverName, hostAndPort)</ID>
|
||||
<ID>MaxLineLength:RPCDriver.kt$RPCDriverDSL.Companion$fun createRpcServerArtemisConfig(maxFileSize: Int, maxBufferedBytesPerClient: Long, baseDirectory: Path, hostAndPort: NetworkHostAndPort): Configuration</ID>
|
||||
@@ -3875,64 +3871,6 @@
|
||||
<ID>NestedBlockDepth:StatusTransitions.kt$StatusTransitions$ fun verify(tx: LedgerTransaction)</ID>
|
||||
<ID>NestedBlockDepth:ThrowableSerializer.kt$ThrowableSerializer$override fun fromProxy(proxy: ThrowableProxy): Throwable</ID>
|
||||
<ID>NestedBlockDepth:TransactionVerifierServiceInternal.kt$Verifier$ private fun verifyConstraintsValidity(contractAttachmentsByContract: Map<ContractClassName, ContractAttachment>)</ID>
|
||||
<ID>ReturnCount:AbstractPartyDescriptor.kt$AbstractPartyDescriptor$override fun <X : Any> unwrap(value: AbstractParty?, type: Class<X>, options: WrapperOptions): X?</ID>
|
||||
<ID>ReturnCount:AbstractPartyDescriptor.kt$AbstractPartyDescriptor$override fun <X : Any> wrap(value: X?, options: WrapperOptions): AbstractParty?</ID>
|
||||
<ID>ReturnCount:Address.kt$Address.Companion$fun <ERROR> validFromRawValue(rawValue: String, mapError: (String) -> ERROR): Validated<Address, ERROR></ID>
|
||||
<ID>ReturnCount:Amount.kt$Amount.Companion$ @JvmStatic fun getDisplayTokenSize(token: Any): BigDecimal</ID>
|
||||
<ID>ReturnCount:AttachmentVersionNumberMigration.kt$AttachmentVersionNumberMigration$override fun execute(database: Database?)</ID>
|
||||
<ID>ReturnCount:AttachmentsClassLoader.kt$AttachmentsClassLoader$private fun containsClasses(attachment: Attachment): Boolean</ID>
|
||||
<ID>ReturnCount:BankOfCordaWebApi.kt$BankOfCordaWebApi$ @POST @Path("issue-asset-request") @Consumes(MediaType.APPLICATION_JSON) fun issueAssetRequest(params: IssueRequestParams): Response</ID>
|
||||
<ID>ReturnCount:BridgeControlListener.kt$BridgeControlListener$private fun processControlMessage(msg: ClientMessage)</ID>
|
||||
<ID>ReturnCount:ByteArrays.kt$fun hexToBin(ch: Char): Int</ID>
|
||||
<ID>ReturnCount:ByteBufferStreams.kt$ByteBufferInputStream$@Throws(IOException::class) override fun read(b: ByteArray, offset: Int, length: Int): Int</ID>
|
||||
<ID>ReturnCount:CheckpointAgent.kt$CheckpointHook$private fun <T> getArrayValue(clazz: Class<T>, value: Any?): String?</ID>
|
||||
<ID>ReturnCount:CheckpointAgent.kt$CheckpointHook$private fun instrumentClass(clazz: CtClass): CtClass?</ID>
|
||||
<ID>ReturnCount:CloseableTab.kt$CloseableTab$fun requestClose()</ID>
|
||||
<ID>ReturnCount:CordaAuthenticationPlugin.kt$CordaAuthenticationPlugin$override fun authenticate(username: String?, credential: String?): AuthInfo</ID>
|
||||
<ID>ReturnCount:CordaClassResolver.kt$CordaClassResolver$private fun checkClass(type: Class<*>): Registration?</ID>
|
||||
<ID>ReturnCount:Crypto.kt$Crypto$// Custom key pair generator from an entropy required for various tests. It is similar to deriveKeyPairECDSA, // but the accepted range of the input entropy is more relaxed: // 2 <= entropy < N, where N is the order of base-point G. private fun deriveECDSAKeyPairFromEntropy(signatureScheme: SignatureScheme, entropy: BigInteger): KeyPair</ID>
|
||||
<ID>ReturnCount:Crypto.kt$Crypto$// Given the domain parameters, this routine deterministically generates an ECDSA key pair // in accordance with X9.62 section 5.2.1 pages 26, 27. private fun deriveKeyPairECDSA(parameterSpec: ECParameterSpec, privateKey: PrivateKey, seed: ByteArray): KeyPair</ID>
|
||||
<ID>ReturnCount:DBRunnerExtension.kt$DBRunnerExtension$private fun getDatabaseContext(context: ExtensionContext?): TestDatabaseContext?</ID>
|
||||
<ID>ReturnCount:Emoji.kt$Emoji$fun renderIfSupported(obj: Any): String</ID>
|
||||
<ID>ReturnCount:FlowLogicRefFactoryImpl.kt$FlowLogicRefFactoryImpl$private fun buildParams(constructor: KFunction<FlowLogic<*>>, args: Map<String, Any?>): HashMap<KParameter, Any?>?</ID>
|
||||
<ID>ReturnCount:FlowManager.kt$NodeFlowManager.FlowWeightComparator$override fun compare(o1: NodeFlowManager.RegisteredFlowContainer, o2: NodeFlowManager.RegisteredFlowContainer): Int</ID>
|
||||
<ID>ReturnCount:InteractiveShell.kt$InteractiveShell$ @JvmStatic fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps))</ID>
|
||||
<ID>ReturnCount:InteractiveShell.kt$InteractiveShell$@JvmStatic fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps, inputObjectMapper: ObjectMapper): Any?</ID>
|
||||
<ID>ReturnCount:Interpolators.kt$LinearInterpolator$override fun interpolate(x: Double): Double</ID>
|
||||
<ID>ReturnCount:JarScanningCordappLoader.kt$JarScanningCordappLoader$private fun parseCordappInfo(manifest: Manifest?, defaultName: String): Cordapp.Info</ID>
|
||||
<ID>ReturnCount:LocalSerializerFactory.kt$DefaultLocalSerializerFactory$override fun get(actualClass: Class<*>, declaredType: Type): AMQPSerializer<Any></ID>
|
||||
<ID>ReturnCount:LocalTypeInformationBuilder.kt$ private fun constructorForDeserialization(type: Type): KFunction<Any>?</ID>
|
||||
<ID>ReturnCount:LocalTypeInformationBuilder.kt$LocalTypeInformationBuilder$ private fun buildNonAtomic(rawType: Class<*>, type: Type, typeIdentifier: TypeIdentifier, typeParameterInformation: List<LocalTypeInformation>): LocalTypeInformation</ID>
|
||||
<ID>ReturnCount:LocalTypeInformationBuilder.kt$LocalTypeInformationBuilder$private fun makeConstructorPairedProperty(constructorIndex: Int, descriptor: PropertyDescriptor, constructorInformation: LocalConstructorInformation): LocalPropertyInformation?</ID>
|
||||
<ID>ReturnCount:Main.kt$Node$fun isAccepted(tx: Transaction): Boolean</ID>
|
||||
<ID>ReturnCount:MockNodeMessagingService.kt$MockNodeMessagingService$ private fun getNextQueue(q: LinkedBlockingQueue<InMemoryMessagingNetwork.MessageTransfer>, block: Boolean): Pair<InMemoryMessagingNetwork.MessageTransfer, List<Handler>>?</ID>
|
||||
<ID>ReturnCount:NetworkRegistrationHelper.kt$NodeRegistrationHelper$override fun validateAndGetTlsCrlIssuerCert(): X509Certificate?</ID>
|
||||
<ID>ReturnCount:NodeAttachmentTrustCalculator.kt$NodeAttachmentTrustCalculator$override fun calculate(attachment: Attachment): Boolean</ID>
|
||||
<ID>ReturnCount:NodeConfigurationImpl.kt$NodeConfigurationImpl$private fun validateDevModeOptions(): List<String></ID>
|
||||
<ID>ReturnCount:NodeSchemaService.kt$NodeSchemaService$// Because schema is always one supported by the state, just delegate. override fun generateMappedObject(state: ContractState, schema: MappedSchema): PersistentState</ID>
|
||||
<ID>ReturnCount:NodeStartup.kt$NodeStartup$fun initialiseAndRun(cmdLineOptions: SharedNodeCmdLineOptions, afterNodeInitialisation: RunAfterNodeInitialisation, requireCertificates: Boolean = false): Int</ID>
|
||||
<ID>ReturnCount:NodeStartup.kt$NodeStartup$fun isNodeRunningAt(baseDirectory: Path): Boolean</ID>
|
||||
<ID>ReturnCount:NodeStartup.kt$NodeStartup$private fun canReadCertificatesDirectory(certDirectory: Path, devMode: Boolean): Boolean</ID>
|
||||
<ID>ReturnCount:NodeStartup.kt$fun CliWrapperBase.initLogging(baseDirectory: Path): Boolean</ID>
|
||||
<ID>ReturnCount:NodeVaultService.kt$NodeVaultService$@Suspendable @Throws(StatesNotAvailableException::class) override fun <T : FungibleState<*>> tryLockFungibleStatesForSpending( lockId: UUID, eligibleStatesQuery: QueryCriteria, amount: Amount<*>, contractStateType: Class<out T> ): List<StateAndRef<T>></ID>
|
||||
<ID>ReturnCount:ObjectDiffer.kt$ObjectDiffer$fun diff(a: Any?, b: Any?): DiffTree?</ID>
|
||||
<ID>ReturnCount:PartialMerkleTree.kt$PartialMerkleTree$// Helper function to compute the path. False means go to the left and True to the right. // Because the path is updated recursively, the path is returned in reverse order. private fun leafIndexHelper(leaf: SecureHash, node: PartialTree, path: MutableList<Boolean>): Boolean</ID>
|
||||
<ID>ReturnCount:PersistentNetworkMapCache.kt$PersistentNetworkMapCache$override fun getPartyInfo(party: Party): PartyInfo?</ID>
|
||||
<ID>ReturnCount:RPCClientProxyHandler.kt$RPCClientProxyHandler$// This is the general function that transforms a client side RPC to internal Artemis messages. override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?): Any?</ID>
|
||||
<ID>ReturnCount:RPCSecurityManagerImpl.kt$RPCPermissionResolver$override fun resolvePermission(representation: String): Permission</ID>
|
||||
<ID>ReturnCount:RigorousMock.kt$SpectatorDefaultAnswer$override fun answerImpl(invocation: InvocationOnMock): Any?</ID>
|
||||
<ID>ReturnCount:SerialFilter.kt$SerialFilter$internal fun applyPredicate(acceptClass: (Class<*>) -> Boolean, serialClass: Class<*>?): Boolean</ID>
|
||||
<ID>ReturnCount:ServicesForResolutionImpl.kt$ServicesForResolutionImpl$// We may need to recursively chase transactions if there are notary changes. fun inner(stateRef: StateRef, forContractClassName: String?): Attachment</ID>
|
||||
<ID>ReturnCount:SingleThreadedStateMachineManager.kt$SingleThreadedStateMachineManager$override fun retryFlowFromSafePoint(currentState: StateMachineState)</ID>
|
||||
<ID>ReturnCount:SingleThreadedStateMachineManager.kt$SingleThreadedStateMachineManager$private fun createFlowFromCheckpoint( id: StateMachineRunId, serializedCheckpoint: SerializedBytes<Checkpoint>, isAnyCheckpointPersisted: Boolean, isStartIdempotent: Boolean, initialDeduplicationHandler: DeduplicationHandler? ): Flow?</ID>
|
||||
<ID>ReturnCount:SpecificationTest.kt$SpecificationTest$fun parseMax(elements: List<Long>): Valid<Long></ID>
|
||||
<ID>ReturnCount:StandaloneShell.kt$StandaloneShell$override fun runProgram(): Int</ID>
|
||||
<ID>ReturnCount:TransactionBuilder.kt$TransactionBuilder$ private fun handleContract( contractClassName: ContractClassName, inputStates: List<TransactionState<ContractState>>?, outputStates: List<TransactionState<ContractState>>?, explicitContractAttachment: AttachmentId?, services: ServicesForResolution ): Pair<AttachmentId, List<TransactionState<ContractState>>?></ID>
|
||||
<ID>ReturnCount:TransactionUtils.kt$ fun <T : Any> deserialiseComponentGroup(componentGroups: List<ComponentGroup>, clazz: KClass<T>, groupEnum: ComponentGroupEnum, forceDeserialize: Boolean = false, factory: SerializationFactory = SerializationFactory.defaultFactory, context: SerializationContext = factory.defaultContext): List<T></ID>
|
||||
<ID>ReturnCount:TransitionExecutorImpl.kt$TransitionExecutorImpl$@Suspendable override fun executeTransition( fiber: FlowFiber, previousState: StateMachineState, event: Event, transition: TransitionResult, actionExecutor: ActionExecutor ): Pair<FlowContinuation, StateMachineState></ID>
|
||||
<ID>ReturnCount:TypeParameterUtils.kt$ private fun inferTypeVariables(actualClass: Class<*>, declaredClass: Class<*>, declaredType: ParameterizedType): Type?</ID>
|
||||
<ID>ReturnCount:Util.kt$fun <T> debugCompare(perLeft: Perceivable<T>, perRight: Perceivable<T>)</ID>
|
||||
<ID>ReturnCount:Util.kt$fun debugCompare(arrLeft: Arrangement, arrRight: Arrangement)</ID>
|
||||
<ID>SpreadOperator:AMQPSerializationScheme.kt$AbstractAMQPSerializationScheme$(*it.whitelist.toTypedArray())</ID>
|
||||
<ID>SpreadOperator:AbstractNode.kt$FlowStarterImpl$(logicType, *args)</ID>
|
||||
<ID>SpreadOperator:AbstractParty.kt$AbstractParty$(*bytes)</ID>
|
||||
|
@ -183,12 +183,6 @@ style:
|
||||
OptionalAbstractKeyword:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
ReturnCount:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
max: 2
|
||||
excludedFunctions: "equals"
|
||||
excludeReturnFromLambda: true
|
||||
SafeCast:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
|
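For reference, the ``ReturnCount`` rule configured above flags any function body with more than two ``return`` statements (``max: 2``), ignoring ``equals`` and returns inside lambdas. A minimal illustrative sketch, using hypothetical function names not taken from the codebase:

// Hypothetical example: three explicit returns, so detekt's ReturnCount (max: 2) reports it.
fun classifyVerbose(value: Int): String {
    if (value < 0) return "negative"
    if (value == 0) return "zero"
    return "positive"
}

// Equivalent single-expression form that stays within the limit.
fun classify(value: Int): String = when {
    value < 0 -> "negative"
    value == 0 -> "zero"
    else -> "positive"
}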
@ -25,7 +25,8 @@ corda_substitutions = {
|
||||
"|quasar_version|" : constants_properties_dict["quasarVersion"],
|
||||
"|platform_version|" : constants_properties_dict["platformVersion"],
|
||||
"|os_branch|" : constants_properties_dict["openSourceBranch"],
|
||||
"|os_samples_branch|" : constants_properties_dict["openSourceSamplesBranch"]
|
||||
"|os_samples_branch|" : constants_properties_dict["openSourceSamplesBranch"],
|
||||
"|jolokia_version|" : constants_properties_dict["jolokiaAgentVersion"]
|
||||
}
|
||||
|
||||
def setup(app):
|
||||
|
@ -195,20 +195,20 @@ parameters will be accepted without user input. The following parameters with th
|
||||
:start-after: DOCSTART 1
|
||||
:end-before: DOCEND 1
|
||||
|
||||
This behaviour can be turned off by setting the optional node configuration property ``NetworkParameterAcceptanceSettings.autoAcceptEnabled``
|
||||
This behaviour can be turned off by setting the optional node configuration property ``networkParameterAcceptanceSettings.autoAcceptEnabled``
|
||||
to ``false``. For example:
|
||||
|
||||
.. sourcecode:: guess
|
||||
|
||||
...
|
||||
NetworkParameterAcceptanceSettings {
|
||||
networkParameterAcceptanceSettings {
|
||||
autoAcceptEnabled = false
|
||||
}
|
||||
...
|
||||
|
||||
It is also possible to switch off this behaviour at a more granular parameter level. This can be achieved by specifying the set of
|
||||
``@AutoAcceptable`` parameters that should not be auto-acceptable in the optional
|
||||
``NetworkParameterAcceptanceSettings.excludedAutoAcceptableParameters`` node configuration property.
|
||||
``networkParameterAcceptanceSettings.excludedAutoAcceptableParameters`` node configuration property.
|
||||
|
||||
For example, auto-acceptance can be switched off for any updates that change the ``packageOwnership`` map by adding the following to the
|
||||
node configuration:
|
||||
@ -216,7 +216,7 @@ node configuration:
|
||||
.. sourcecode:: guess
|
||||
|
||||
...
|
||||
NetworkParameterAcceptanceSettings {
|
||||
networkParameterAcceptanceSettings {
|
||||
excludedAutoAcceptableParameters: ["packageOwnership"]
|
||||
}
|
||||
...
|
||||
|
@ -81,6 +81,12 @@ Note that in production, exposing the database via the node is not recommended.
|
||||
Monitoring your node
|
||||
--------------------
|
||||
|
||||
This section covers monitoring performance and health of a node in Corda Enterprise with Jolokia and Graphite. General best practices for monitoring (e.g. setting up TCP checks for the ports the node communicates on, database health checks etc.) are not covered here but should be followed.
|
||||
|
||||
|
||||
Monitoring via Jolokia
|
||||
++++++++++++++++++++++
|
||||
|
||||
Like most Java servers, the node can be configured to export various useful metrics and management operations via the industry-standard
|
||||
`JMX infrastructure <https://en.wikipedia.org/wiki/Java_Management_Extensions>`_. JMX is a standard API
|
||||
for registering so-called *MBeans*: objects whose properties and methods are intended for server management. As Java
|
||||
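As a concrete illustration of what an MBean is, the short Kotlin sketch below (not part of the Corda codebase; purely an assumption-free use of the standard JDK API) reads the standard ``java.lang:type=Memory`` MBean from the local JVM's platform MBean server. Jolokia exposes these same attributes over HTTP:

.. sourcecode:: kotlin

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName
    import javax.management.openmbean.CompositeData

    fun main() {
        // Every JVM registers standard MBeans such as java.lang:type=Memory with the platform MBean server.
        val server = ManagementFactory.getPlatformMBeanServer()
        val memory = ObjectName("java.lang:type=Memory")
        // HeapMemoryUsage is a CompositeData attribute holding init/used/committed/max values.
        val heap = server.getAttribute(memory, "HeapMemoryUsage") as CompositeData
        println("Heap used: ${heap.get("used")} bytes")
    }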
@ -106,8 +112,12 @@ Here are a few ways to build dashboards and extract monitoring data for a node:
|
||||
It can bridge any data input to any output using their plugin system, for example, Telegraf can
|
||||
be configured to collect data from Jolokia and write to the DataDog web API.
|
||||
|
||||
The Node configuration parameter `jmxMonitoringHttpPort` has to be present in order to ensure a Jolokia agent is instrumented with
|
||||
the JVM run-time.
|
||||
In order to ensure that a Jolokia agent is instrumented with the JVM run-time, you can choose one of these options:
|
||||
|
||||
* Specify the Node configuration parameter ``jmxMonitoringHttpPort`` which will attempt to load the jolokia driver from the ``drivers`` folder.
|
||||
The format of the driver name needs to be ``jolokia-jvm-{VERSION}-agent.jar`` where VERSION is the version required by Corda, currently |jolokia_version|.
|
||||
* Start the node with ``java -Dcapsule.jvm.args="-javaagent:drivers/jolokia-jvm-1.6.0-agent.jar=port=7777,host=localhost" -jar corda.jar``.
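For the first option, a minimal node configuration fragment might look like the following (the port value is purely illustrative):

.. sourcecode:: guess

    ...
    jmxMonitoringHttpPort = 7777
    ...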
|
||||
|
||||
|
||||
The following JMX statistics are exported:
|
||||
|
||||
@ -126,6 +136,8 @@ via a file called ``jolokia-access.xml``.
|
||||
Several Jolokia policy-based security configuration files (``jolokia-access.xml``) are available for dev, test, and prod
|
||||
environments under ``/config/<env>``.
|
||||
|
||||
To pass a security policy use ``java -Dcapsule.jvm.args=-javaagent:./drivers/jolokia-jvm-1.6.0-agent.jar,policyLocation=file:./config-path/jolokia-access.xml -jar corda.jar``
|
||||
|
||||
Notes for development use
|
||||
+++++++++++++++++++++++++
|
||||
|
||||
|
@ -3,7 +3,12 @@ buildscript {
|
||||
def properties = new Properties()
|
||||
file("$projectDir/src/main/resources/build.properties").withInputStream { properties.load(it) }
|
||||
|
||||
ext.jolokia_version = properties.getProperty('jolokiaAgentVersion')
|
||||
|
||||
Properties constants = new Properties()
|
||||
file("$rootDir/constants.properties").withInputStream { constants.load(it) }
|
||||
|
||||
|
||||
ext.jolokia_version = constants.getProperty('jolokiaAgentVersion')
|
||||
|
||||
dependencies {
|
||||
classpath group: 'com.github.docker-java', name: 'docker-java', version: '3.1.5'
|
||||
|
@ -10,7 +10,6 @@ import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
@Ignore
|
||||
class NodeRPCTests {
|
||||
private val CORDA_VERSION_REGEX = "\\d+(\\.\\d+)?(-\\w+)?".toRegex()
|
||||
private val CORDA_VENDOR = "Corda Open Source"
|
||||
@ -28,7 +27,6 @@ class NodeRPCTests {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), cordappsForAllNodes = CORDAPPS, extraCordappPackagesToScan = emptyList())) {
|
||||
val nodeDiagnosticInfo = startNode().get().rpc.nodeDiagnosticInfo()
|
||||
assertTrue(nodeDiagnosticInfo.version.matches(CORDA_VERSION_REGEX))
|
||||
assertTrue(nodeDiagnosticInfo.revision.matches(HEXADECIMAL_REGEX))
|
||||
assertEquals(PLATFORM_VERSION, nodeDiagnosticInfo.platformVersion)
|
||||
assertEquals(CORDA_VENDOR, nodeDiagnosticInfo.vendor)
|
||||
nodeDiagnosticInfo.cordapps.forEach { println("${it.shortName} ${it.type}") }
|
||||
|
@ -6,13 +6,10 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.identity.x500Matches
|
||||
import net.corda.core.internal.CertRole
|
||||
import net.corda.core.internal.hash
|
||||
import net.corda.core.node.services.IdentityService
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.api.IdentityServiceInternal
|
||||
import net.corda.node.services.persistence.WritablePublicKeyToOwningIdentityCache
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.crypto.x509Certificates
|
||||
import java.security.InvalidAlgorithmParameterException
|
||||
@ -101,6 +98,10 @@ class InMemoryIdentityService(
|
||||
return keyToPartyAndCerts[identityCertChain[1].publicKey]
|
||||
}
|
||||
|
||||
override fun partyFromKey(key: PublicKey): Party? {
|
||||
return certificateFromKey(key)?.party ?: keyToName[key.toStringShort()]?.let { wellKnownPartyFromX500Name(it) }
|
||||
}
|
||||
|
||||
override fun certificateFromKey(owningKey: PublicKey): PartyAndCertificate? = keyToPartyAndCerts[owningKey]
|
||||
|
||||
// We give the caller a copy of the data set to avoid any locking problems
|
||||
|
@ -296,6 +296,12 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
|
||||
keyToPartyAndCert[owningKey.toStringShort()]
|
||||
}
|
||||
|
||||
override fun partyFromKey(key: PublicKey): Party? {
|
||||
return certificateFromKey(key)?.party ?: database.transaction {
|
||||
keyToName[key.toStringShort()]
|
||||
}?.let { wellKnownPartyFromX500Name(it) }
|
||||
}
|
||||
|
||||
private fun certificateFromCordaX500Name(name: CordaX500Name): PartyAndCertificate? {
|
||||
return database.transaction {
|
||||
val partyId = nameToKey[name]
|
||||
|
@ -71,7 +71,7 @@ sealed class Event {
|
||||
* Initiate a flow. This causes a new session object to be created and returned to the flow. Note that no actual
|
||||
* communication takes place at this time, only on the first send/receive operation on the session.
|
||||
*/
|
||||
data class InitiateFlow(val destination: Destination) : Event()
|
||||
data class InitiateFlow(val destination: Destination, val wellKnownParty: Party) : Event()
|
||||
|
||||
/**
|
||||
* Signal the entering into a subflow.
|
||||
|
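The doc comment on ``InitiateFlow`` above notes that the session object is created immediately but that no communication happens until the first send/receive on it. A minimal, hypothetical flow sketch (not part of this change, using only the public ``FlowLogic`` API) illustrating that behaviour:

import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party
import net.corda.core.utilities.unwrap

@InitiatingFlow
class ExamplePingFlow(private val counterparty: Party) : FlowLogic<String>() {
    @Suspendable
    override fun call(): String {
        // Only creates the local session object; no message is sent to the counterparty yet.
        val session = initiateFlow(counterparty)
        // The first send/receive on the session is what actually establishes communication.
        return session.sendAndReceive<String>("ping").unwrap { it }
    }
}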
@ -17,10 +17,11 @@ import net.corda.core.utilities.UntrustworthyData
|
||||
|
||||
class FlowSessionImpl(
|
||||
override val destination: Destination,
|
||||
private val wellKnownParty: Party,
|
||||
val sourceSessionId: SessionId
|
||||
) : FlowSession() {
|
||||
|
||||
override val counterparty: Party get() = checkNotNull(destination as? Party) { "$destination is not a Party" }
|
||||
override val counterparty: Party get() = wellKnownParty
|
||||
|
||||
override fun toString(): String = "FlowSessionImpl(destination=$destination, sourceSessionId=$sourceSessionId)"
|
||||
|
||||
|
@ -355,10 +355,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun initiateFlow(destination: Destination): FlowSession {
|
||||
override fun initiateFlow(destination: Destination, wellKnownParty: Party): FlowSession {
|
||||
require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" }
|
||||
val resume = processEventImmediately(
|
||||
Event.InitiateFlow(destination),
|
||||
Event.InitiateFlow(destination, wellKnownParty),
|
||||
isDbTransactionOpenOnEntry = true,
|
||||
isDbTransactionOpenOnExit = true
|
||||
) as FlowContinuation.Resume
|
||||
|
@ -466,7 +466,7 @@ class SingleThreadedStateMachineManager(
|
||||
try {
|
||||
val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage)
|
||||
val initiatedSessionId = SessionId.createRandom(secureRandom)
|
||||
val senderSession = FlowSessionImpl(sender, initiatedSessionId)
|
||||
val senderSession = FlowSessionImpl(sender, sender, initiatedSessionId)
|
||||
val flowLogic = initiatedFlowFactory.createFlow(senderSession)
|
||||
val initiatedFlowInfo = when (initiatedFlowFactory) {
|
||||
is InitiatedFlowFactory.Core -> FlowInfo(serviceHub.myInfo.platformVersion, "corda")
|
||||
|
@ -235,7 +235,7 @@ class TopLevelTransition(
|
||||
return@builder FlowContinuation.ProcessEvents
|
||||
}
|
||||
val sourceSessionId = SessionId.createRandom(context.secureRandom)
|
||||
val sessionImpl = FlowSessionImpl(event.destination, sourceSessionId)
|
||||
val sessionImpl = FlowSessionImpl(event.destination, event.wellKnownParty, sourceSessionId)
|
||||
val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.destination, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
|
||||
currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
|
||||
actions.add(Action.AddSessionBinding(context.id, sourceSessionId))
|
||||
|
@ -1,4 +1,6 @@
|
||||
# Build constants exported as resource file to make them visible in Node program
|
||||
# Note: sadly, due to present limitation of IntelliJ-IDEA in processing resource files, these constants cannot be
|
||||
# imported from top-level 'constants.properties' file
|
||||
jolokiaAgentVersion=1.6.1
|
||||
#jolokiaAgentVersion=1.6.1
|
||||
|
||||
|
||||
|
@ -261,6 +261,17 @@ class PersistentIdentityServiceTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `resolve key to party for key without certificate`() {
|
||||
// Register Alice's PartyAndCert as if it was done so via the network map cache.
|
||||
identityService.verifyAndRegisterIdentity(alice.identity)
|
||||
// Use a key which is not tied to a cert.
|
||||
val publicKey = Crypto.generateKeyPair().public
|
||||
// Register the PublicKey to Alice's CordaX500Name.
|
||||
identityService.registerKey(publicKey, alice.party)
|
||||
assertEquals(alice.party, identityService.partyFromKey(publicKey))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `register incorrect party to public key `(){
|
||||
database.transaction { identityService.verifyAndRegisterIdentity(ALICE_IDENTITY) }
|
||||
|
@ -69,8 +69,8 @@ class FlowFrameworkTests {
|
||||
@Before
|
||||
fun setUpMockNet() {
|
||||
mockNet = InternalMockNetwork(
|
||||
cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP),
|
||||
servicePeerAllocationStrategy = RoundRobin()
|
||||
cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP),
|
||||
servicePeerAllocationStrategy = RoundRobin()
|
||||
)
|
||||
|
||||
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
|
||||
@ -139,13 +139,13 @@ class FlowFrameworkTests {
|
||||
mockNet.runNetwork()
|
||||
|
||||
assertSessionTransfers(
|
||||
aliceNode sent sessionInit(PingPongFlow::class, payload = 10L) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent sessionData(20L) to aliceNode,
|
||||
aliceNode sent sessionData(11L) to bobNode,
|
||||
bobNode sent sessionData(21L) to aliceNode,
|
||||
aliceNode sent normalEnd to bobNode,
|
||||
bobNode sent normalEnd to aliceNode
|
||||
aliceNode sent sessionInit(PingPongFlow::class, payload = 10L) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent sessionData(20L) to aliceNode,
|
||||
aliceNode sent sessionData(11L) to bobNode,
|
||||
bobNode sent sessionData(21L) to aliceNode,
|
||||
aliceNode sent normalEnd to bobNode,
|
||||
bobNode sent normalEnd to aliceNode
|
||||
)
|
||||
}
|
||||
|
||||
@ -167,7 +167,8 @@ class FlowFrameworkTests {
|
||||
it.message is ExistingSessionMessage && it.message.payload === EndSessionMessage
|
||||
}.subscribe { sessionEndReceived.release() }
|
||||
val resultFuture = aliceNode.services.startFlow(
|
||||
WaitForOtherSideEndBeforeSendAndReceive(bob, sessionEndReceived)).resultFuture
|
||||
WaitForOtherSideEndBeforeSendAndReceive(bob, sessionEndReceived)
|
||||
).resultFuture
|
||||
mockNet.runNetwork()
|
||||
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
|
||||
resultFuture.getOrThrow()
|
||||
@ -186,10 +187,10 @@ class FlowFrameworkTests {
|
||||
mockNet.runNetwork()
|
||||
|
||||
assertThatExceptionOfType(MyFlowException::class.java)
|
||||
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
|
||||
.withMessage("Nothing useful")
|
||||
.withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow
|
||||
.withStackTraceContaining("Received counter-flow exception from peer")
|
||||
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
|
||||
.withMessage("Nothing useful")
|
||||
.withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow
|
||||
.withStackTraceContaining("Received counter-flow exception from peer")
|
||||
bobNode.database.transaction {
|
||||
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
|
||||
}
|
||||
@ -197,15 +198,15 @@ class FlowFrameworkTests {
|
||||
assertThat(receivingFiber.state).isEqualTo(Strand.State.WAITING)
|
||||
assertThat((erroringFlow.get().stateMachine as FlowStateMachineImpl).state).isEqualTo(Strand.State.WAITING)
|
||||
assertThat(erroringFlowSteps.get()).containsExactly(
|
||||
Notification.createOnNext(ProgressTracker.STARTING),
|
||||
Notification.createOnNext(ExceptionFlow.START_STEP),
|
||||
Notification.createOnError(erroringFlow.get().exceptionThrown)
|
||||
Notification.createOnNext(ProgressTracker.STARTING),
|
||||
Notification.createOnNext(ExceptionFlow.START_STEP),
|
||||
Notification.createOnError(erroringFlow.get().exceptionThrown)
|
||||
)
|
||||
|
||||
assertSessionTransfers(
|
||||
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent errorMessage(erroringFlow.get().exceptionThrown) to aliceNode
|
||||
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent errorMessage(erroringFlow.get().exceptionThrown) to aliceNode
|
||||
)
|
||||
// Make sure the original stack trace isn't sent down the wire
|
||||
val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage
|
||||
@ -296,8 +297,8 @@ class FlowFrameworkTests {
|
||||
@Test
|
||||
fun waitForLedgerCommit() {
|
||||
val ptx = TransactionBuilder(notary = notaryIdentity)
|
||||
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
val stx = aliceNode.services.signInitialTransaction(ptx)
|
||||
|
||||
val committerStx = aliceNode.registerCordappFlowFactory(CommitterFlow::class) {
|
||||
@ -313,8 +314,8 @@ class FlowFrameworkTests {
|
||||
@Test
|
||||
fun `waitForLedgerCommit throws exception if any active session ends in error`() {
|
||||
val ptx = TransactionBuilder(notary = notaryIdentity)
|
||||
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand())
|
||||
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand())
|
||||
val stx = aliceNode.services.signInitialTransaction(ptx)
|
||||
|
||||
aliceNode.registerCordappFlowFactory(WaitForLedgerCommitFlow::class) { ExceptionFlow { throw Exception("Error") } }
|
||||
@ -354,8 +355,8 @@ class FlowFrameworkTests {
|
||||
val result = aliceNode.services.startFlow(UpgradedFlow(bob)).resultFuture
|
||||
mockNet.runNetwork()
|
||||
assertThat(receivedSessionMessages).startsWith(
|
||||
aliceNode sent sessionInit(UpgradedFlow::class, flowVersion = 2) to bobNode,
|
||||
bobNode sent sessionConfirm(flowVersion = 1) to aliceNode
|
||||
aliceNode sent sessionInit(UpgradedFlow::class, flowVersion = 2) to bobNode,
|
||||
bobNode sent sessionConfirm(flowVersion = 1) to aliceNode
|
||||
)
|
||||
val (receivedPayload, node2FlowVersion) = result.getOrThrow()
|
||||
assertThat(receivedPayload).isEqualTo("Old initiated")
|
||||
@ -369,8 +370,8 @@ class FlowFrameworkTests {
|
||||
val flowInfo = aliceNode.services.startFlow(initiatingFlow).resultFuture
|
||||
mockNet.runNetwork()
|
||||
assertThat(receivedSessionMessages).startsWith(
|
||||
aliceNode sent sessionInit(SendFlow::class, flowVersion = 1, payload = "Old initiating") to bobNode,
|
||||
bobNode sent sessionConfirm(flowVersion = 2) to aliceNode
|
||||
aliceNode sent sessionInit(SendFlow::class, flowVersion = 1, payload = "Old initiating") to bobNode,
|
||||
bobNode sent sessionConfirm(flowVersion = 2) to aliceNode
|
||||
)
|
||||
assertThat(flowInfo.get().flowVersion).isEqualTo(2)
|
||||
}
|
||||
@ -380,8 +381,8 @@ class FlowFrameworkTests {
|
||||
val future = aliceNode.services.startFlow(NeverRegisteredFlow("Hello", bob)).resultFuture
|
||||
mockNet.runNetwork()
|
||||
assertThatExceptionOfType(UnexpectedFlowEndException::class.java)
|
||||
.isThrownBy { future.getOrThrow() }
|
||||
.withMessageEndingWith("${NeverRegisteredFlow::class.java.name} is not registered")
|
||||
.isThrownBy { future.getOrThrow() }
|
||||
.withMessageEndingWith("${NeverRegisteredFlow::class.java.name} is not registered")
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -441,9 +442,9 @@ class FlowFrameworkTests {
|
||||
erroringFlowFuture.getOrThrow()
|
||||
val flowSteps = erroringFlowSteps.get()
|
||||
assertThat(flowSteps).containsExactly(
|
||||
Notification.createOnNext(ProgressTracker.STARTING),
|
||||
Notification.createOnNext(ExceptionFlow.START_STEP),
|
||||
Notification.createOnError(erroringFlowFuture.get().exceptionThrown)
|
||||
Notification.createOnNext(ProgressTracker.STARTING),
|
||||
Notification.createOnNext(ExceptionFlow.START_STEP),
|
||||
Notification.createOnError(erroringFlowFuture.get().exceptionThrown)
|
||||
)
|
||||
|
||||
val receiveFlowException = assertFailsWith(UnexpectedFlowEndException::class) {
|
||||
@ -451,28 +452,28 @@ class FlowFrameworkTests {
|
||||
}
|
||||
assertThat(receiveFlowException.message).doesNotContain("evil bug!")
|
||||
assertThat(receiveFlowSteps.get()).containsExactly(
|
||||
Notification.createOnNext(ProgressTracker.STARTING),
|
||||
Notification.createOnNext(ReceiveFlow.START_STEP),
|
||||
Notification.createOnError(receiveFlowException)
|
||||
Notification.createOnNext(ProgressTracker.STARTING),
|
||||
Notification.createOnNext(ReceiveFlow.START_STEP),
|
||||
Notification.createOnError(receiveFlowException)
|
||||
)
|
||||
|
||||
assertSessionTransfers(
|
||||
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent errorMessage() to aliceNode
|
||||
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent errorMessage() to aliceNode
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `initiating flow using unknown AnonymousParty`() {
|
||||
val anonymousBob = bobNode.services.keyManagementService.freshKeyAndCert(bobNode.info.legalIdentitiesAndCerts.single(), false)
|
||||
.party.anonymise()
|
||||
.party.anonymise()
|
||||
bobNode.registerCordappFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) }
|
||||
val result = aliceNode.services.startFlow(SendAndReceiveFlow(anonymousBob, "Hello")).resultFuture
|
||||
mockNet.runNetwork()
|
||||
assertThatIllegalArgumentException()
|
||||
.isThrownBy { result.getOrThrow() }
|
||||
.withMessage("We do not know who $anonymousBob belongs to")
|
||||
.isThrownBy { result.getOrThrow() }
|
||||
.withMessage("Could not resolve destination: $anonymousBob")
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -497,16 +498,18 @@ class FlowFrameworkTests {
|
||||
private val FlowLogic<*>.progressSteps: CordaFuture<List<Notification<ProgressTracker.Step>>>
|
||||
get() {
|
||||
return progressTracker!!.changes
|
||||
.ofType(Change.Position::class.java)
|
||||
.map { it.newStep }
|
||||
.materialize()
|
||||
.toList()
|
||||
.toFuture()
|
||||
.ofType(Change.Position::class.java)
|
||||
.map { it.newStep }
|
||||
.materialize()
|
||||
.toList()
|
||||
.toFuture()
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
private class WaitForOtherSideEndBeforeSendAndReceive(val otherParty: Party,
|
||||
@Transient val receivedOtherFlowEnd: Semaphore) : FlowLogic<Unit>() {
|
||||
private class WaitForOtherSideEndBeforeSendAndReceive(
|
||||
val otherParty: Party,
|
||||
@Transient val receivedOtherFlowEnd: Semaphore
|
||||
) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
// Kick off the flow on the other side ...
|
||||
@ -626,7 +629,8 @@ class FlowFrameworkTests {
|
||||
//endregion Helpers
|
||||
}
|
||||
|
||||
internal fun sessionConfirm(flowVersion: Int = 1) = ExistingSessionMessage(SessionId(0), ConfirmSessionMessage(SessionId(0), FlowInfo(flowVersion, "")))
|
||||
internal fun sessionConfirm(flowVersion: Int = 1) =
|
||||
ExistingSessionMessage(SessionId(0), ConfirmSessionMessage(SessionId(0), FlowInfo(flowVersion, "")))
|
||||
|
||||
internal inline fun <reified P : FlowLogic<*>> TestStartedNode.getSingleFlow(): Pair<P, CordaFuture<*>> {
|
||||
return smm.findStateMachines(P::class.java).single()
|
||||
@ -637,17 +641,17 @@ private fun sanitise(message: SessionMessage) = when (message) {
|
||||
is ExistingSessionMessage -> {
|
||||
val payload = message.payload
|
||||
message.copy(
|
||||
recipientSessionId = SessionId(0),
|
||||
payload = when (payload) {
|
||||
is ConfirmSessionMessage -> payload.copy(
|
||||
initiatedSessionId = SessionId(0),
|
||||
initiatedFlowInfo = payload.initiatedFlowInfo.copy(appName = "")
|
||||
)
|
||||
is ErrorSessionMessage -> payload.copy(
|
||||
errorId = 0
|
||||
)
|
||||
else -> payload
|
||||
}
|
||||
recipientSessionId = SessionId(0),
|
||||
payload = when (payload) {
|
||||
is ConfirmSessionMessage -> payload.copy(
|
||||
initiatedSessionId = SessionId(0),
|
||||
initiatedFlowInfo = payload.initiatedFlowInfo.copy(appName = "")
|
||||
)
|
||||
is ErrorSessionMessage -> payload.copy(
|
||||
errorId = 0
|
||||
)
|
||||
else -> payload
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -667,10 +671,12 @@ internal fun TestStartedNode.sendSessionMessage(message: SessionMessage, destina
|
||||
}
|
||||
}
|
||||
|
||||
internal fun errorMessage(errorResponse: FlowException? = null) = ExistingSessionMessage(SessionId(0), ErrorSessionMessage(errorResponse, 0))
|
||||
internal fun errorMessage(errorResponse: FlowException? = null) =
|
||||
ExistingSessionMessage(SessionId(0), ErrorSessionMessage(errorResponse, 0))
|
||||
|
||||
internal infix fun TestStartedNode.sent(message: SessionMessage): Pair<Int, SessionMessage> = Pair(internals.id, message)
|
||||
internal infix fun Pair<Int, SessionMessage>.to(node: TestStartedNode): SessionTransfer = SessionTransfer(first, second, node.network.myAddress)
|
||||
internal infix fun Pair<Int, SessionMessage>.to(node: TestStartedNode): SessionTransfer =
|
||||
SessionTransfer(first, second, node.network.myAddress)
|
||||
|
||||
internal data class SessionTransfer(val from: Int, val message: SessionMessage, val to: MessageRecipients) {
|
||||
val isPayloadTransfer: Boolean
|
||||
@ -785,7 +791,11 @@ internal class MyFlowException(override val message: String) : FlowException() {
|
||||
internal class MyPeerFlowException(override val message: String, val peer: Party) : FlowException()
|
||||
|
||||
@InitiatingFlow
|
||||
internal class SendAndReceiveFlow(private val destination: Destination, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic<Any>() {
|
||||
internal class SendAndReceiveFlow(
|
||||
private val destination: Destination,
|
||||
private val payload: Any,
|
||||
private val otherPartySession: FlowSession? = null
|
||||
) : FlowLogic<Any>() {
|
||||
constructor(otherPartySession: FlowSession, payload: Any) : this(otherPartySession.counterparty, payload, otherPartySession)
|
||||
|
||||
@Suspendable
|
||||
@ -795,7 +805,8 @@ internal class SendAndReceiveFlow(private val destination: Destination, private
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
internal class PingPongFlow(private val otherParty: Party, private val payload: Long, private val otherPartySession: FlowSession? = null) : FlowLogic<Unit>() {
|
||||
internal class PingPongFlow(private val otherParty: Party, private val payload: Long, private val otherPartySession: FlowSession? = null) :
|
||||
FlowLogic<Unit>() {
|
||||
constructor(otherPartySession: FlowSession, payload: Long) : this(otherPartySession.counterparty, payload, otherPartySession)
|
||||
|
||||
@Transient
|
||||
|
@ -3,8 +3,17 @@ ENV GRADLE_USER_HOME=/tmp/gradle
|
||||
RUN mkdir /tmp/gradle && mkdir -p /home/root/.m2/repository
|
||||
|
||||
RUN apt-get update && apt-get install -y curl libatomic1 && \
|
||||
curl -O https://d3pxv6yz143wms.cloudfront.net/8.222.10.1/java-1.8.0-amazon-corretto-jdk_8.222.10-1_amd64.deb && \
|
||||
apt-get install -y java-common && dpkg -i java-1.8.0-amazon-corretto-jdk_8.222.10-1_amd64.deb && \
|
||||
curl -O https://cdn.azul.com/zulu/bin/zulu8.40.0.25-ca-jdk8.0.222-linux_amd64.deb && \
|
||||
apt-get install -y java-common && apt install -y ./zulu8.40.0.25-ca-jdk8.0.222-linux_amd64.deb && \
|
||||
apt-get clean && \
|
||||
mkdir -p /tmp/source
|
||||
rm -f zulu8.40.0.25-ca-jdk8.0.222-linux_amd64.deb && \
|
||||
curl -O https://cdn.azul.com/zulu/bin/zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz && \
|
||||
mv /zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz /usr/lib/jvm/ && \
|
||||
cd /usr/lib/jvm/ && \
|
||||
tar -zxvf zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz && \
|
||||
rm -rf zulu-8-amd64 && \
|
||||
mv zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64 zulu-8-amd64 && \
|
||||
rm -f zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz && \
|
||||
cd / && mkdir -p /tmp/source
|
||||
|
||||
|
||||
|