mirror of
https://github.com/corda/corda.git
synced 2025-02-25 19:11:45 +00:00
Merge remote-tracking branch 'origin/release/os/4.3' into zoltan/TM-40-k8s_workers_persistent_storage
# Conflicts: # buildSrc/src/main/groovy/net/corda/testing/KubesTest.groovy # buildSrc/src/main/java/net/corda/testing/KubePodResult.java
This commit is contained in:
commit
6f13263b3b
@ -2464,7 +2464,6 @@ public interface net.corda.core.flows.IdentifiableException
|
||||
@Nullable
|
||||
public Long getErrorId()
|
||||
##
|
||||
@CordaSerializable
|
||||
public final class net.corda.core.flows.IllegalFlowLogicException extends java.lang.IllegalArgumentException
|
||||
public <init>(Class<?>, String)
|
||||
public <init>(String, String)
|
||||
|
26
.ci/dev/unit/Jenkinsfile
vendored
26
.ci/dev/unit/Jenkinsfile
vendored
@ -29,20 +29,18 @@ pipeline {
|
||||
}
|
||||
}
|
||||
|
||||
stage('Corda Pull Request - Run Tests') {
|
||||
stage('Unit Tests') {
|
||||
steps {
|
||||
sh "./gradlew " +
|
||||
"-DbuildId=\"\${BUILD_ID}\" " +
|
||||
"-Dkubenetize=true " +
|
||||
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
|
||||
" allParallelUnitTest"
|
||||
if (env.CHANGE_ID) {
|
||||
pullRequest.createStatus(status: 'success',
|
||||
context: 'continuous-integration/jenkins/pr-merge/unitTest',
|
||||
description: 'Unit Tests Passed',
|
||||
targetUrl: "${env.JOB_URL}/testResults")
|
||||
}
|
||||
stage('Unit Tests') {
|
||||
steps {
|
||||
sh "./gradlew " +
|
||||
"-DbuildId=\"\${BUILD_ID}\" " +
|
||||
"-Dkubenetize=true " +
|
||||
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
|
||||
" allParallelUnitTest"
|
||||
if (env.CHANGE_ID) {
|
||||
pullRequest.createStatus(status: 'success',
|
||||
context: 'continuous-integration/jenkins/pr-merge/unitTest',
|
||||
description: 'Unit Tests Passed',
|
||||
targetUrl: "${env.JOB_URL}/testResults")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
10
Jenkinsfile
vendored
10
Jenkinsfile
vendored
@ -1,3 +1,4 @@
|
||||
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
|
||||
@Library('existing-build-control')
|
||||
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
|
||||
|
||||
@ -39,15 +40,6 @@ pipeline {
|
||||
" allParallelIntegrationTest"
|
||||
}
|
||||
}
|
||||
stage('Unit Tests') {
|
||||
steps {
|
||||
sh "./gradlew " +
|
||||
"-DbuildId=\"\${BUILD_ID}\" " +
|
||||
"-Dkubenetize=true " +
|
||||
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
|
||||
" allParallelUnitTest"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
18
build.gradle
18
build.gradle
@ -612,6 +612,14 @@ task allParallelSlowIntegrationTest(type: ParallelTestGroup) {
|
||||
memoryInGbPerFork 10
|
||||
distribute Distribution.CLASS
|
||||
}
|
||||
task allParallelSmokeTest(type: ParallelTestGroup) {
|
||||
testGroups "smokeTest"
|
||||
numberOfShards 4
|
||||
streamOutput true
|
||||
coresPerFork 6
|
||||
memoryInGbPerFork 10
|
||||
distribute Distribution.METHOD
|
||||
}
|
||||
task allParallelUnitTest(type: ParallelTestGroup) {
|
||||
testGroups "test"
|
||||
numberOfShards 10
|
||||
@ -623,7 +631,15 @@ task allParallelUnitTest(type: ParallelTestGroup) {
|
||||
task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
|
||||
testGroups "test", "integrationTest"
|
||||
numberOfShards 15
|
||||
streamOutput true
|
||||
streamOutput false
|
||||
coresPerFork 6
|
||||
memoryInGbPerFork 10
|
||||
distribute Distribution.CLASS
|
||||
}
|
||||
task parallelRegressionTest(type: ParallelTestGroup) {
|
||||
testGroups "test", "integrationTest", "slowIntegrationTest", "smokeTest"
|
||||
numberOfShards 5
|
||||
streamOutput false
|
||||
coresPerFork 6
|
||||
memoryInGbPerFork 10
|
||||
distribute Distribution.CLASS
|
||||
|
@ -23,15 +23,7 @@ allprojects {
|
||||
}
|
||||
}
|
||||
|
||||
configurations {
|
||||
runtime
|
||||
}
|
||||
|
||||
dependencies {
|
||||
// Add the top-level projects ONLY to the host project.
|
||||
runtime project.childProjects.collect { n, p ->
|
||||
project(p.path)
|
||||
}
|
||||
compile gradleApi()
|
||||
compile "io.fabric8:kubernetes-client:4.4.1"
|
||||
compile 'org.apache.commons:commons-compress:1.19'
|
||||
|
@ -51,13 +51,12 @@ public class BucketingAllocator {
|
||||
System.out.println("Number of tests: " + container.testsForFork.stream().mapToInt(b -> b.foundTests.size()).sum());
|
||||
System.out.println("Tests to Run: ");
|
||||
container.testsForFork.forEach(tb -> {
|
||||
System.out.println(tb.nameWithAsterix);
|
||||
System.out.println(tb.testName);
|
||||
tb.foundTests.forEach(ft -> System.out.println("\t" + ft.getFirst() + ", " + ft.getSecond()));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
private void allocateTestsToForks(@NotNull List<TestBucket> matchedTests) {
|
||||
matchedTests.forEach(matchedTestBucket -> {
|
||||
TestsForForkContainer smallestContainer = Collections.min(forkContainers, Comparator.comparing(TestsForForkContainer::getCurrentDuration));
|
||||
@ -69,10 +68,9 @@ public class BucketingAllocator {
|
||||
return allDiscoveredTests.stream().map(tuple -> {
|
||||
String testName = tuple.getFirst();
|
||||
Object task = tuple.getSecond();
|
||||
String noAsterixName = testName.substring(0, testName.length() - 1);
|
||||
//2DO [can this filtering algorithm be improved - the test names are sorted, it should be possible to do something using binary search]
|
||||
List<Tuple2<String, Double>> matchingTests = allTestsFromCSV.stream().filter(testFromCSV -> testFromCSV.getFirst().startsWith(noAsterixName)).collect(Collectors.toList());
|
||||
return new TestBucket(task, testName, noAsterixName, matchingTests);
|
||||
List<Tuple2<String, Double>> matchingTests = allTestsFromCSV.stream().filter(testFromCSV -> testFromCSV.getFirst().startsWith(testName)).collect(Collectors.toList());
|
||||
return new TestBucket(task, testName, matchingTests);
|
||||
}).sorted(Comparator.comparing(TestBucket::getDuration).reversed()).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@ -81,22 +79,20 @@ public class BucketingAllocator {
|
||||
TestLister lister = source.getFirst();
|
||||
Object testTask = source.getSecond();
|
||||
return lister.getAllTestsDiscovered().stream().map(test -> new Tuple2<>(test, testTask)).collect(Collectors.toList());
|
||||
}).flatMap(Collection::stream).collect(Collectors.toList());
|
||||
}).flatMap(Collection::stream).sorted(Comparator.comparing(Tuple2::getFirst)).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static class TestBucket {
|
||||
final Object testTask;
|
||||
final String nameWithAsterix;
|
||||
final String nameWithoutAsterix;
|
||||
final String testName;
|
||||
final List<Tuple2<String, Double>> foundTests;
|
||||
final Double duration;
|
||||
|
||||
public TestBucket(Object testTask, String nameWithAsterix, String nameWithoutAsterix, List<Tuple2<String, Double>> foundTests) {
|
||||
public TestBucket(Object testTask, String testName, List<Tuple2<String, Double>> foundTests) {
|
||||
this.testTask = testTask;
|
||||
this.nameWithAsterix = nameWithAsterix;
|
||||
this.nameWithoutAsterix = nameWithoutAsterix;
|
||||
this.testName = testName;
|
||||
this.foundTests = foundTests;
|
||||
duration = Math.max(foundTests.stream().mapToDouble(tp -> Math.max(tp.getSecond(), 10)).sum(), 10);
|
||||
duration = Math.max(foundTests.stream().mapToDouble(tp -> Math.max(tp.getSecond(), 1)).sum(), 1);
|
||||
}
|
||||
|
||||
public Double getDuration() {
|
||||
@ -107,8 +103,7 @@ public class BucketingAllocator {
|
||||
public String toString() {
|
||||
return "TestBucket{" +
|
||||
"testTask=" + testTask +
|
||||
", nameWithAsterix='" + nameWithAsterix + '\'' +
|
||||
", nameWithoutAsterix='" + nameWithoutAsterix + '\'' +
|
||||
", nameWithAsterix='" + testName + '\'' +
|
||||
", foundTests=" + foundTests +
|
||||
", duration=" + duration +
|
||||
'}';
|
||||
@ -142,7 +137,7 @@ public class BucketingAllocator {
|
||||
}
|
||||
|
||||
public List<String> getTestsForTask(Object task) {
|
||||
return frozenTests.getOrDefault(task, Collections.emptyList()).stream().map(it -> it.nameWithAsterix).collect(Collectors.toList());
|
||||
return frozenTests.getOrDefault(task, Collections.emptyList()).stream().map(it -> it.testName).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public List<TestBucket> getBucketsForFork() {
|
||||
|
@ -41,8 +41,8 @@ public class BucketingAllocatorTask extends DefaultTask {
|
||||
this.dependsOn(source);
|
||||
}
|
||||
|
||||
public List<String> getTestsForForkAndTestTask(Integer fork, Test testTask) {
|
||||
return allocator.getTestsForForkAndTestTask(fork, testTask);
|
||||
public List<String> getTestIncludesForForkAndTestTask(Integer fork, Test testTask) {
|
||||
return allocator.getTestsForForkAndTestTask(fork, testTask).stream().map(t -> t + "*").collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@TaskAction
|
||||
@ -56,11 +56,11 @@ public class BucketingAllocatorTask extends DefaultTask {
|
||||
String duration = "Duration(ms)";
|
||||
List<CSVRecord> records = CSVFormat.DEFAULT.withHeader().parse(reader).getRecords();
|
||||
return records.stream().map(record -> {
|
||||
try{
|
||||
try {
|
||||
String testName = record.get(name);
|
||||
String testDuration = record.get(duration);
|
||||
return new Tuple2<>(testName, Math.max(Double.parseDouble(testDuration), 10));
|
||||
}catch (IllegalArgumentException | IllegalStateException e){
|
||||
return new Tuple2<>(testName, Math.max(Double.parseDouble(testDuration), 1));
|
||||
} catch (IllegalArgumentException | IllegalStateException e) {
|
||||
return null;
|
||||
}
|
||||
}).filter(Objects::nonNull).sorted(Comparator.comparing(Tuple2::getFirst)).collect(Collectors.toList());
|
||||
|
@ -131,7 +131,7 @@ class DistributedTesting implements Plugin<Project> {
|
||||
filter {
|
||||
def fork = getPropertyAsInt(subProject, "dockerFork", 0)
|
||||
subProject.logger.info("requesting tests to include in testing task ${task.getPath()} (idx: ${fork})")
|
||||
List<String> includes = globalAllocator.getTestsForForkAndTestTask(
|
||||
List<String> includes = globalAllocator.getTestIncludesForForkAndTestTask(
|
||||
fork,
|
||||
task)
|
||||
subProject.logger.info "got ${includes.size()} tests to include into testing task ${task.getPath()}"
|
||||
|
339
buildSrc/src/main/groovy/net/corda/testing/KubesTest.java
Normal file
339
buildSrc/src/main/groovy/net/corda/testing/KubesTest.java
Normal file
@ -0,0 +1,339 @@
|
||||
package net.corda.testing;
|
||||
|
||||
import io.fabric8.kubernetes.api.model.*;
|
||||
import io.fabric8.kubernetes.client.*;
|
||||
import io.fabric8.kubernetes.client.dsl.ExecListener;
|
||||
import io.fabric8.kubernetes.client.dsl.ExecWatch;
|
||||
import io.fabric8.kubernetes.client.utils.Serialization;
|
||||
import okhttp3.Response;
|
||||
import org.gradle.api.DefaultTask;
|
||||
import org.gradle.api.tasks.TaskAction;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.io.*;
|
||||
import java.math.BigInteger;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
public class KubesTest extends DefaultTask {
|
||||
|
||||
static final ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
static final ExecutorService singleThreadedExecutor = Executors.newSingleThreadExecutor();
|
||||
|
||||
String dockerTag;
|
||||
String fullTaskToExecutePath;
|
||||
String taskToExecuteName;
|
||||
Boolean printOutput = false;
|
||||
Integer numberOfCoresPerFork = 4;
|
||||
Integer memoryGbPerFork = 6;
|
||||
public volatile List<File> testOutput = Collections.emptyList();
|
||||
public volatile List<KubePodResult> containerResults = Collections.emptyList();
|
||||
|
||||
String namespace = "thisisatest";
|
||||
int k8sTimeout = 50 * 1_000;
|
||||
int webSocketTimeout = k8sTimeout * 6;
|
||||
int numberOfPods = 20;
|
||||
int timeoutInMinutesForPodToStart = 60;
|
||||
|
||||
Distribution distribution = Distribution.METHOD;
|
||||
|
||||
@TaskAction
|
||||
public void runDistributedTests() {
|
||||
String buildId = System.getProperty("buildId", "0");
|
||||
String currentUser = System.getProperty("user.name", "UNKNOWN_USER");
|
||||
|
||||
String stableRunId = new BigInteger(64, new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode())).toString(36).toLowerCase();
|
||||
String suffix = new BigInteger(64, new Random()).toString(36).toLowerCase();
|
||||
|
||||
final KubernetesClient client = getKubernetesClient();
|
||||
|
||||
|
||||
try {
|
||||
client.pods().inNamespace(namespace).list().getItems().forEach(podToDelete -> {
|
||||
if (podToDelete.getMetadata().getName().contains(stableRunId)) {
|
||||
getProject().getLogger().lifecycle("deleting: " + podToDelete.getMetadata().getName());
|
||||
client.resource(podToDelete).delete();
|
||||
}
|
||||
});
|
||||
} catch (Exception ignored) {
|
||||
//it's possible that a pod is being deleted by the original build, this can lead to racey conditions
|
||||
}
|
||||
|
||||
List<CompletableFuture<KubePodResult>> futures = IntStream.range(0, numberOfPods).mapToObj(i -> {
|
||||
String potentialPodName = (taskToExecuteName + "-" + stableRunId + suffix + i).toLowerCase();
|
||||
String podName = potentialPodName.substring(0, Math.min(potentialPodName.length(), 62));
|
||||
return runBuild(client, namespace, numberOfPods, i, podName, printOutput, 3);
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
this.testOutput = Collections.synchronizedList(futures.stream().map(it -> {
|
||||
try {
|
||||
return it.get().getBinaryResults();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}).flatMap(Collection::stream).collect(Collectors.toList()));
|
||||
this.containerResults = futures.stream().map(it -> {
|
||||
try {
|
||||
return it.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@NotNull
|
||||
KubernetesClient getKubernetesClient() {
|
||||
io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder()
|
||||
.withConnectionTimeout(k8sTimeout)
|
||||
.withRequestTimeout(k8sTimeout)
|
||||
.withRollingTimeout(k8sTimeout)
|
||||
.withWebsocketTimeout(webSocketTimeout)
|
||||
.withWebsocketPingInterval(webSocketTimeout)
|
||||
.build();
|
||||
|
||||
return new DefaultKubernetesClient(config);
|
||||
}
|
||||
|
||||
CompletableFuture<KubePodResult> runBuild(KubernetesClient client,
|
||||
String namespace,
|
||||
int numberOfPods,
|
||||
int podIdx,
|
||||
String podName,
|
||||
boolean printOutput,
|
||||
int numberOfRetries) {
|
||||
|
||||
CompletableFuture<KubePodResult> toReturn = new CompletableFuture<>();
|
||||
executorService.submit(() -> {
|
||||
int tryCount = 0;
|
||||
Pod createdPod = null;
|
||||
while (tryCount < numberOfRetries) {
|
||||
try {
|
||||
Pod podRequest = buildPod(podName);
|
||||
getProject().getLogger().lifecycle("requesting pod: " + podName);
|
||||
createdPod = client.pods().inNamespace(namespace).create(podRequest);
|
||||
getProject().getLogger().lifecycle("scheduled pod: " + podName);
|
||||
File outputFile = Files.createTempFile("container", ".log").toFile();
|
||||
attachStatusListenerToPod(client, namespace, podName);
|
||||
schedulePodForDeleteOnShutdown(podName, client, createdPod);
|
||||
waitForPodToStart(podName, client, namespace);
|
||||
PipedOutputStream stdOutOs = new PipedOutputStream();
|
||||
PipedInputStream stdOutIs = new PipedInputStream(4096);
|
||||
ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream();
|
||||
KubePodResult result = new KubePodResult(createdPod, outputFile);
|
||||
CompletableFuture<KubePodResult> waiter = new CompletableFuture<>();
|
||||
ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter, result);
|
||||
stdOutIs.connect(stdOutOs);
|
||||
String[] buildCommand = getBuildCommand(numberOfPods, podIdx);
|
||||
ExecWatch execWatch = client.pods().inNamespace(namespace).withName(podName)
|
||||
.writingOutput(stdOutOs)
|
||||
.writingErrorChannel(errChannelStream)
|
||||
.usingListener(execListener).exec(buildCommand);
|
||||
|
||||
startLogPumping(outputFile, stdOutIs, podIdx, printOutput);
|
||||
KubePodResult execResult = waiter.join();
|
||||
getLogger().lifecycle("build has ended on on pod " + podName + " (" + podIdx + "/" + numberOfPods + ")");
|
||||
getLogger().lifecycle("Gathering test results from " + execResult.getCreatedPod().getMetadata().getName());
|
||||
Collection<File> binaryResults = downloadTestXmlFromPod(client, namespace, execResult.getCreatedPod());
|
||||
getLogger().lifecycle("deleting: " + execResult.getCreatedPod().getMetadata().getName());
|
||||
client.resource(execResult.getCreatedPod()).delete();
|
||||
result.setBinaryResults(binaryResults);
|
||||
toReturn.complete(result);
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
getLogger().error("Encountered error during testing cycle on pod " + podName + " (" + podIdx / numberOfPods + ")", e);
|
||||
try {
|
||||
if (createdPod != null) {
|
||||
client.pods().delete(createdPod);
|
||||
while (client.pods().inNamespace(namespace).list().getItems().stream().anyMatch(p -> Objects.equals(p.getMetadata().getName(), podName))) {
|
||||
getLogger().warn("pod " + podName + " has not been deleted, waiting 1s");
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
}
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
tryCount++;
|
||||
getLogger().lifecycle("will retry ${podName} another ${numberOfRetries - tryCount} times");
|
||||
}
|
||||
}
|
||||
if (tryCount >= numberOfRetries) {
|
||||
toReturn.completeExceptionally(new RuntimeException("Failed to build in pod ${podName} (${podIdx}/${numberOfPods}) within retry limit"));
|
||||
}
|
||||
});
|
||||
return toReturn;
|
||||
}
|
||||
|
||||
|
||||
Pod buildPod(String podName) {
|
||||
return new PodBuilder().withNewMetadata().withName(podName).endMetadata()
|
||||
.withNewSpec()
|
||||
.addNewVolume()
|
||||
.withName("gradlecache")
|
||||
.withNewHostPath()
|
||||
.withPath("/tmp/gradle")
|
||||
.withType("DirectoryOrCreate")
|
||||
.endHostPath()
|
||||
.endVolume()
|
||||
.addNewContainer()
|
||||
.withImage(dockerTag)
|
||||
.withCommand("bash")
|
||||
.withArgs("-c", "sleep 3600")
|
||||
.addNewEnv()
|
||||
.withName("DRIVER_NODE_MEMORY")
|
||||
.withValue("1024m")
|
||||
.withName("DRIVER_WEB_MEMORY")
|
||||
.withValue("1024m")
|
||||
.endEnv()
|
||||
.withName(podName)
|
||||
.withNewResources()
|
||||
.addToRequests("cpu", new Quantity(numberOfCoresPerFork.toString()))
|
||||
.addToRequests("memory", new Quantity(memoryGbPerFork.toString() + "Gi"))
|
||||
.endResources()
|
||||
.addNewVolumeMount()
|
||||
.withName("gradlecache")
|
||||
.withMountPath("/tmp/gradle")
|
||||
.endVolumeMount()
|
||||
.endContainer()
|
||||
.withImagePullSecrets(new LocalObjectReference("regcred"))
|
||||
.withRestartPolicy("Never")
|
||||
.endSpec()
|
||||
.build();
|
||||
}
|
||||
|
||||
void startLogPumping(File outputFile, InputStream stdOutIs, Integer podIdx, boolean printOutput) {
|
||||
Thread loggingThread = new Thread(() -> {
|
||||
try (BufferedWriter out = new BufferedWriter(new FileWriter(outputFile));
|
||||
BufferedReader br = new BufferedReader(new InputStreamReader(stdOutIs))) {
|
||||
String line;
|
||||
while ((line = br.readLine()) != null) {
|
||||
String toWrite = ("Container" + podIdx + ": " + line).trim();
|
||||
if (printOutput) {
|
||||
getProject().getLogger().lifecycle(toWrite);
|
||||
}
|
||||
out.write(line);
|
||||
out.newLine();
|
||||
}
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
});
|
||||
|
||||
loggingThread.setDaemon(true);
|
||||
loggingThread.start();
|
||||
}
|
||||
|
||||
Watch attachStatusListenerToPod(KubernetesClient client, String namespace, String podName) {
|
||||
return client.pods().inNamespace(namespace).withName(podName).watch(new Watcher<Pod>() {
|
||||
@Override
|
||||
public void eventReceived(Watcher.Action action, Pod resource) {
|
||||
getProject().getLogger().lifecycle("[StatusChange] pod " + resource.getMetadata().getName() + " " + action.name() + " (" + resource.getStatus().getPhase() + ")");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(KubernetesClientException cause) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void waitForPodToStart(String podName, KubernetesClient client, String namespace) {
|
||||
getProject().getLogger().lifecycle("Waiting for pod " + podName + " to start before executing build");
|
||||
try {
|
||||
client.pods().inNamespace(namespace).withName(podName).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
getProject().getLogger().lifecycle("pod " + podName + " has started, executing build");
|
||||
}
|
||||
|
||||
Collection<File> downloadTestXmlFromPod(KubernetesClient client, String namespace, Pod cp) {
|
||||
String resultsInContainerPath = "/tmp/source/build/test-reports";
|
||||
String binaryResultsFile = "results.bin";
|
||||
String podName = cp.getMetadata().getName();
|
||||
Path tempDir = new File(new File(getProject().getBuildDir(), "test-results-xml"), podName).toPath();
|
||||
|
||||
if (!tempDir.toFile().exists()) {
|
||||
tempDir.toFile().mkdirs();
|
||||
}
|
||||
|
||||
getProject().getLogger().lifecycle("Saving " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath());
|
||||
client.pods()
|
||||
.inNamespace(namespace)
|
||||
.withName(podName)
|
||||
.dir(resultsInContainerPath)
|
||||
.copy(tempDir);
|
||||
|
||||
return findFolderContainingBinaryResultsFile(new File(tempDir.toFile().getAbsolutePath()), binaryResultsFile);
|
||||
}
|
||||
|
||||
String[] getBuildCommand(int numberOfPods, int podIdx) {
|
||||
String shellScript = "let x=1 ; while [ ${x} -ne 0 ] ; do echo \"Waiting for DNS\" ; curl services.gradle.org > /dev/null 2>&1 ; x=$? ; sleep 1 ; done ; " + "cd /tmp/source ; " +
|
||||
"let y=1 ; while [ ${y} -ne 0 ] ; do echo \"Preparing build directory\" ; ./gradlew testClasses integrationTestClasses --parallel 2>&1 ; y=$? ; sleep 1 ; done ;" +
|
||||
"./gradlew -D" + ListTests.DISTRIBUTION_PROPERTY + "=" + distribution.name() + " -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " " + fullTaskToExecutePath + " --info 2>&1 ;" +
|
||||
"let rs=$? ; sleep 10 ; exit ${rs}";
|
||||
return new String[]{"bash", "-c", shellScript};
|
||||
}
|
||||
|
||||
List<File> findFolderContainingBinaryResultsFile(File start, String fileNameToFind) {
|
||||
Queue<File> filesToInspect = new LinkedList<>(Collections.singletonList(start));
|
||||
List<File> folders = new ArrayList<>();
|
||||
while (!filesToInspect.isEmpty()) {
|
||||
File fileToInspect = filesToInspect.poll();
|
||||
if (fileToInspect.getAbsolutePath().endsWith(fileNameToFind)) {
|
||||
folders.add(fileToInspect.getParentFile());
|
||||
}
|
||||
|
||||
if (fileToInspect.isDirectory()) {
|
||||
filesToInspect.addAll(Arrays.stream(fileToInspect.listFiles()).collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
return folders;
|
||||
}
|
||||
|
||||
void schedulePodForDeleteOnShutdown(String podName, KubernetesClient client, Pod createdPod) {
|
||||
getProject().getLogger().info("attaching shutdown hook for pod " + podName);
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
System.out.println("Deleting pod: " + podName);
|
||||
client.pods().delete(createdPod);
|
||||
}));
|
||||
}
|
||||
|
||||
ExecListener buildExecListenerForPod(String podName, ByteArrayOutputStream errChannelStream, CompletableFuture<KubePodResult> waitingFuture, KubePodResult result) {
|
||||
|
||||
return new ExecListener() {
|
||||
final Long start = System.currentTimeMillis();
|
||||
|
||||
@Override
|
||||
public void onOpen(Response response) {
|
||||
getProject().getLogger().lifecycle("Build started on pod " + podName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t, Response response) {
|
||||
getProject().getLogger().lifecycle("Received error from rom pod " + podName);
|
||||
waitingFuture.completeExceptionally(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(int code, String reason) {
|
||||
getProject().getLogger().lifecycle("Received onClose() from pod " + podName + " , build took: " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
|
||||
try {
|
||||
String errChannelContents = errChannelStream.toString();
|
||||
Status status = Serialization.unmarshal(errChannelContents, Status.class);
|
||||
Integer resultCode = Optional.ofNullable(status).map(Status::getDetails)
|
||||
.map(StatusDetails::getCauses)
|
||||
.flatMap(c -> c.stream().findFirst())
|
||||
.map(StatusCause::getMessage)
|
||||
.map(Integer::parseInt).orElse(0);
|
||||
result.setResultCode(resultCode);
|
||||
waitingFuture.complete(result);
|
||||
} catch (Exception e) {
|
||||
waitingFuture.completeExceptionally(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -79,7 +79,7 @@ class ListTests extends DefaultTask implements TestLister {
|
||||
.collect { c -> (c.getSubclasses() + Collections.singletonList(c)) }
|
||||
.flatten()
|
||||
.collect { ClassInfo c ->
|
||||
c.getMethodInfo().filter { m -> m.hasAnnotation("org.junit.Test") }.collect { m -> c.name + "." + m.name + "*" }
|
||||
c.getMethodInfo().filter { m -> m.hasAnnotation("org.junit.Test") }.collect { m -> c.name + "." + m.name }
|
||||
}.flatten()
|
||||
.toSet()
|
||||
|
||||
@ -97,7 +97,7 @@ class ListTests extends DefaultTask implements TestLister {
|
||||
.getClassesWithMethodAnnotation("org.junit.Test")
|
||||
.collect { c -> (c.getSubclasses() + Collections.singletonList(c)) }
|
||||
.flatten()
|
||||
.collect { ClassInfo c -> c.name + "*" }.flatten()
|
||||
.collect { ClassInfo c -> c.name }.flatten()
|
||||
.toSet()
|
||||
this.allTests = results.stream().sorted().collect(Collectors.toList())
|
||||
break
|
||||
|
@ -21,12 +21,12 @@ public class BucketingAllocatorTest {
|
||||
BucketingAllocator bucketingAllocator = new BucketingAllocator(1, Collections::emptyList);
|
||||
|
||||
Object task = new Object();
|
||||
bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass*", "AnotherTestingClass*"), task);
|
||||
bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass", "AnotherTestingClass"), task);
|
||||
|
||||
bucketingAllocator.generateTestPlan();
|
||||
List<String> testsForForkAndTestTask = bucketingAllocator.getTestsForForkAndTestTask(0, task);
|
||||
|
||||
Assert.assertThat(testsForForkAndTestTask, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass*", "AnotherTestingClass*"));
|
||||
Assert.assertThat(testsForForkAndTestTask, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass", "AnotherTestingClass"));
|
||||
|
||||
}
|
||||
|
||||
@ -37,7 +37,7 @@ public class BucketingAllocatorTest {
|
||||
BucketingAllocator bucketingAllocator = new BucketingAllocator(2, Collections::emptyList);
|
||||
|
||||
Object task = new Object();
|
||||
bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass*", "AnotherTestingClass*"), task);
|
||||
bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass", "AnotherTestingClass"), task);
|
||||
|
||||
bucketingAllocator.generateTestPlan();
|
||||
List<String> testsForForkOneAndTestTask = bucketingAllocator.getTestsForForkAndTestTask(0, task);
|
||||
@ -48,7 +48,7 @@ public class BucketingAllocatorTest {
|
||||
|
||||
List<String> allTests = Stream.of(testsForForkOneAndTestTask, testsForForkTwoAndTestTask).flatMap(Collection::stream).collect(Collectors.toList());
|
||||
|
||||
Assert.assertThat(allTests, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass*", "AnotherTestingClass*"));
|
||||
Assert.assertThat(allTests, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass", "AnotherTestingClass"));
|
||||
|
||||
}
|
||||
}
|
@ -1,10 +1,10 @@
|
||||
package net.corda.client.rpc.internal
|
||||
|
||||
import co.paralleluniverse.common.util.SameThreadExecutor
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.RemovalCause
|
||||
import com.github.benmanes.caffeine.cache.RemovalListener
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder
|
||||
import net.corda.client.rpc.ConnectionFailureException
|
||||
@ -132,7 +132,10 @@ class RPCClientProxyHandler(
|
||||
private var sendExecutor: ExecutorService? = null
|
||||
|
||||
// A sticky pool for running Observable.onNext()s. We need the stickiness to preserve the observation ordering.
|
||||
private val observationExecutorThreadFactory = ThreadFactoryBuilder().setNameFormat("rpc-client-observation-pool-%d").setDaemon(true).build()
|
||||
private val observationExecutorThreadFactory = ThreadFactoryBuilder()
|
||||
.setNameFormat("rpc-client-observation-pool-%d")
|
||||
.setDaemon(true)
|
||||
.build()
|
||||
private val observationExecutorPool = LazyStickyPool(rpcConfiguration.observationExecutorPoolSize) {
|
||||
Executors.newFixedThreadPool(1, observationExecutorThreadFactory)
|
||||
}
|
||||
@ -156,12 +159,14 @@ class RPCClientProxyHandler(
|
||||
private val observablesToReap = ThreadBox(object {
|
||||
var observables = ArrayList<InvocationId>()
|
||||
})
|
||||
private val serializationContextWithObservableContext = RpcClientObservableDeSerializer.createContext(serializationContext, observableContext)
|
||||
private val serializationContextWithObservableContext = RpcClientObservableDeSerializer
|
||||
.createContext(serializationContext, observableContext)
|
||||
|
||||
private fun createRpcObservableMap(): RpcObservableMap {
|
||||
val onObservableRemove = RemovalListener<InvocationId, UnicastSubject<Notification<*>>> { key, _, cause ->
|
||||
val observableId = key!!
|
||||
val rpcCallSite: CallSite? = callSiteMap?.remove(observableId)
|
||||
|
||||
if (cause == RemovalCause.COLLECTED) {
|
||||
log.warn(listOf(
|
||||
"A hot observable returned from an RPC was never subscribed to.",
|
||||
@ -175,7 +180,13 @@ class RPCClientProxyHandler(
|
||||
}
|
||||
observablesToReap.locked { observables.add(observableId) }
|
||||
}
|
||||
return cacheFactory.buildNamed(Caffeine.newBuilder().weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()), "RpcClientProxyHandler_rpcObservable")
|
||||
return cacheFactory.buildNamed(
|
||||
Caffeine.newBuilder()
|
||||
.weakValues()
|
||||
.removalListener(onObservableRemove)
|
||||
.executor(MoreExecutors.directExecutor()),
|
||||
"RpcClientProxyHandler_rpcObservable"
|
||||
)
|
||||
}
|
||||
|
||||
private var sessionFactory: ClientSessionFactory? = null
|
||||
|
@ -32,3 +32,5 @@ metricsVersion=4.1.0
|
||||
metricsNewRelicVersion=1.1.1
|
||||
openSourceBranch=https://github.com/corda/corda/blob/master
|
||||
openSourceSamplesBranch=https://github.com/corda/samples/blob/master
|
||||
jolokiaAgentVersion=1.6.1
|
||||
|
||||
|
@ -1,8 +1,13 @@
|
||||
package net.corda.coretests.contracts
|
||||
|
||||
import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.contracts.TransactionVerificationException
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.createContractCreationError
|
||||
import net.corda.core.internal.createContractRejection
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
@ -13,6 +18,9 @@ import net.corda.serialization.internal.amqp.SerializationOutput
|
||||
import net.corda.serialization.internal.amqp.SerializerFactoryBuilder
|
||||
import net.corda.serialization.internal.amqp.custom.PublicKeySerializer
|
||||
import net.corda.serialization.internal.amqp.custom.ThrowableSerializer
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.TestIdentity
|
||||
@ -23,7 +31,9 @@ class TransactionVerificationExceptionSerialisationTests {
|
||||
private fun defaultFactory() = SerializerFactoryBuilder.build(
|
||||
AllWhitelist,
|
||||
ClassLoader.getSystemClassLoader()
|
||||
).apply { register(ThrowableSerializer(this)) }
|
||||
).apply {
|
||||
register(ThrowableSerializer(this))
|
||||
}
|
||||
|
||||
private val context get() = AMQP_RPC_CLIENT_CONTEXT
|
||||
|
||||
@ -179,4 +189,125 @@ class TransactionVerificationExceptionSerialisationTests {
|
||||
|
||||
assertEquals(exc.message, exc2.message)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun transactionNetworkParameterOrderingExceptionTest() {
|
||||
val exception = TransactionVerificationException.TransactionNetworkParameterOrderingException(
|
||||
txid,
|
||||
StateRef(SecureHash.zeroHash, 1),
|
||||
testNetworkParameters(),
|
||||
testNetworkParameters())
|
||||
val exception2 = DeserializationInput(factory)
|
||||
.deserialize(
|
||||
SerializationOutput(factory)
|
||||
.serialize(exception, context),
|
||||
context)
|
||||
|
||||
assertEquals(exception.message, exception2.message)
|
||||
assertEquals(exception.cause?.message, exception2.cause?.message)
|
||||
assertEquals(exception.txId, exception2.txId)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun missingNetworkParametersExceptionTest() {
|
||||
val exception = TransactionVerificationException.MissingNetworkParametersException(txid, SecureHash.zeroHash)
|
||||
val exception2 = DeserializationInput(factory)
|
||||
.deserialize(
|
||||
SerializationOutput(factory)
|
||||
.serialize(exception, context),
|
||||
context)
|
||||
|
||||
assertEquals(exception.message, exception2.message)
|
||||
assertEquals(exception.cause?.message, exception2.cause?.message)
|
||||
assertEquals(exception.txId, exception2.txId)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun constraintPropagationRejectionTest() {
|
||||
val exception = TransactionVerificationException.ConstraintPropagationRejection(txid, "com.test.Contract",
|
||||
AlwaysAcceptAttachmentConstraint, AlwaysAcceptAttachmentConstraint)
|
||||
val exception2 = DeserializationInput(factory)
|
||||
.deserialize(
|
||||
SerializationOutput(factory)
|
||||
.serialize(exception, context),
|
||||
context)
|
||||
|
||||
assertEquals(exception.message, exception2.message)
|
||||
assertEquals(exception.cause?.message, exception2.cause?.message)
|
||||
assertEquals(exception.txId, exception2.txId)
|
||||
assertEquals("com.test.Contract", exception2.contractClass)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun transactionDuplicateEncumbranceExceptionTest() {
|
||||
val exception = TransactionVerificationException.TransactionDuplicateEncumbranceException(txid, 1)
|
||||
val exception2 = DeserializationInput(factory)
|
||||
.deserialize(
|
||||
SerializationOutput(factory)
|
||||
.serialize(exception, context),
|
||||
context)
|
||||
|
||||
assertEquals(exception.message, exception2.message)
|
||||
assertEquals(exception.cause?.message, exception2.cause?.message)
|
||||
assertEquals(exception.txId, exception2.txId)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun transactionNonMatchingEncumbranceExceptionTest() {
|
||||
val exception = TransactionVerificationException.TransactionNonMatchingEncumbranceException(txid, listOf(1, 2, 3))
|
||||
val exception2 = DeserializationInput(factory)
|
||||
.deserialize(
|
||||
SerializationOutput(factory)
|
||||
.serialize(exception, context),
|
||||
context)
|
||||
|
||||
assertEquals(exception.message, exception2.message)
|
||||
assertEquals(exception.cause?.message, exception2.cause?.message)
|
||||
assertEquals(exception.txId, exception2.txId)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun transactionNotaryMismatchEncumbranceExceptionTest() {
|
||||
val exception = TransactionVerificationException.TransactionNotaryMismatchEncumbranceException(
|
||||
txid, 1, 2, Party(ALICE_NAME, generateKeyPair().public), Party(BOB_NAME, generateKeyPair().public))
|
||||
val exception2 = DeserializationInput(factory)
|
||||
.deserialize(
|
||||
SerializationOutput(factory)
|
||||
.serialize(exception, context),
|
||||
context)
|
||||
|
||||
assertEquals(exception.message, exception2.message)
|
||||
assertEquals(exception.cause?.message, exception2.cause?.message)
|
||||
assertEquals(exception.txId, exception2.txId)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun transactionContractConflictExceptionTest() {
|
||||
val exception = TransactionVerificationException.TransactionContractConflictException(
|
||||
txid, TransactionState(DummyContractState(), notary = Party(BOB_NAME, generateKeyPair().public)), "aa")
|
||||
val exception2 = DeserializationInput(factory)
|
||||
.deserialize(
|
||||
SerializationOutput(factory)
|
||||
.serialize(exception, context),
|
||||
context)
|
||||
|
||||
assertEquals(exception.message, exception2.message)
|
||||
assertEquals(exception.cause?.message, exception2.cause?.message)
|
||||
assertEquals(exception.txId, exception2.txId)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun transactionRequiredContractUnspecifiedExceptionTest() {
|
||||
val exception = TransactionVerificationException.TransactionRequiredContractUnspecifiedException(
|
||||
txid, TransactionState(DummyContractState(), notary = Party(BOB_NAME, generateKeyPair().public)))
|
||||
val exception2 = DeserializationInput(factory)
|
||||
.deserialize(
|
||||
SerializationOutput(factory)
|
||||
.serialize(exception, context),
|
||||
context)
|
||||
|
||||
assertEquals(exception.message, exception2.message)
|
||||
assertEquals(exception.cause?.message, exception2.cause?.message)
|
||||
assertEquals(exception.txId, exception2.txId)
|
||||
}
|
||||
}
|
@ -70,8 +70,17 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S
|
||||
* @property outputConstraint The constraint of the outputs state.
|
||||
*/
|
||||
@KeepForDJVM
|
||||
class ConstraintPropagationRejection(txId: SecureHash, val contractClass: String, inputConstraint: AttachmentConstraint, outputConstraint: AttachmentConstraint)
|
||||
: TransactionVerificationException(txId, "Contract constraints for $contractClass are not propagated correctly. The outputConstraint: $outputConstraint is not a valid transition from the input constraint: $inputConstraint.", null)
|
||||
class ConstraintPropagationRejection(txId: SecureHash, message: String) : TransactionVerificationException(txId, message, null) {
|
||||
constructor(txId: SecureHash,
|
||||
contractClass: String,
|
||||
inputConstraint: AttachmentConstraint,
|
||||
outputConstraint: AttachmentConstraint) :
|
||||
this(txId, "Contract constraints for $contractClass are not propagated correctly. " +
|
||||
"The outputConstraint: $outputConstraint is not a valid transition from the input constraint: $inputConstraint.")
|
||||
|
||||
// This is only required for backwards compatibility. In case the message format changes, update the index.
|
||||
val contractClass: String = message.split(" ")[3]
|
||||
}
|
||||
|
||||
/**
|
||||
* The transaction attachment that contains the [contractClass] class didn't meet the constraints specified by
|
||||
@ -153,19 +162,24 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S
|
||||
* be satisfied.
|
||||
*/
|
||||
@KeepForDJVM
|
||||
class TransactionDuplicateEncumbranceException(txId: SecureHash, index: Int)
|
||||
: TransactionVerificationException(txId, "The bi-directionality property of encumbered output states " +
|
||||
"is not satisfied. Index $index is referenced more than once", null)
|
||||
class TransactionDuplicateEncumbranceException(txId: SecureHash, message: String)
|
||||
: TransactionVerificationException(txId, message, null) {
|
||||
constructor(txId: SecureHash, index: Int) : this(txId, "The bi-directionality property of encumbered output states " +
|
||||
"is not satisfied. Index $index is referenced more than once")
|
||||
}
|
||||
|
||||
/**
|
||||
* An encumbered state should also be referenced as the encumbrance of another state in order to satisfy the
|
||||
* bi-directionality property (a full cycle should be present).
|
||||
*/
|
||||
@KeepForDJVM
|
||||
class TransactionNonMatchingEncumbranceException(txId: SecureHash, nonMatching: Collection<Int>)
|
||||
: TransactionVerificationException(txId, "The bi-directionality property of encumbered output states " +
|
||||
"is not satisfied. Encumbered states should also be referenced as an encumbrance of another state to form " +
|
||||
"a full cycle. Offending indices $nonMatching", null)
|
||||
class TransactionNonMatchingEncumbranceException(txId: SecureHash, message: String)
|
||||
: TransactionVerificationException(txId, message, null) {
|
||||
constructor(txId: SecureHash, nonMatching: Collection<Int>) : this(txId,
|
||||
"The bi-directionality property of encumbered output states " +
|
||||
"is not satisfied. Encumbered states should also be referenced as an encumbrance of another state to form " +
|
||||
"a full cycle. Offending indices $nonMatching")
|
||||
}
|
||||
|
||||
/**
|
||||
* All encumbered states should be assigned to the same notary. This is due to the fact that multi-notary
|
||||
@ -173,9 +187,13 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S
|
||||
* in the same transaction.
|
||||
*/
|
||||
@KeepForDJVM
|
||||
class TransactionNotaryMismatchEncumbranceException(txId: SecureHash, encumberedIndex: Int, encumbranceIndex: Int, encumberedNotary: Party, encumbranceNotary: Party)
|
||||
: TransactionVerificationException(txId, "Encumbered output states assigned to different notaries found. " +
|
||||
"Output state with index $encumberedIndex is assigned to notary [$encumberedNotary], while its encumbrance with index $encumbranceIndex is assigned to notary [$encumbranceNotary]", null)
|
||||
class TransactionNotaryMismatchEncumbranceException(txId: SecureHash, message: String)
|
||||
: TransactionVerificationException(txId, message, null) {
|
||||
constructor(txId: SecureHash, encumberedIndex: Int, encumbranceIndex: Int, encumberedNotary: Party, encumbranceNotary: Party) :
|
||||
this(txId, "Encumbered output states assigned to different notaries found. " +
|
||||
"Output state with index $encumberedIndex is assigned to notary [$encumberedNotary], " +
|
||||
"while its encumbrance with index $encumbranceIndex is assigned to notary [$encumbranceNotary]")
|
||||
}
|
||||
|
||||
/**
|
||||
* If a state is identified as belonging to a contract, either because the state class is defined as an inner class
|
||||
@ -186,35 +204,44 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S
|
||||
* @param requiredContractClassName The class name of the contract to which the state belongs.
|
||||
*/
|
||||
@KeepForDJVM
|
||||
class TransactionContractConflictException(txId: SecureHash, state: TransactionState<ContractState>, requiredContractClassName: String)
|
||||
: TransactionVerificationException(txId,
|
||||
"""
|
||||
State of class ${state.data::class.java.typeName} belongs to contract $requiredContractClassName, but
|
||||
class TransactionContractConflictException(txId: SecureHash, message: String)
|
||||
: TransactionVerificationException(txId, message, null) {
|
||||
constructor(txId: SecureHash, state: TransactionState<ContractState>, requiredContractClassName: String): this(txId,
|
||||
"""
|
||||
State of class ${state.data ::class.java.typeName} belongs to contract $requiredContractClassName, but
|
||||
is bundled in TransactionState with ${state.contract}.
|
||||
|
||||
For details see: https://docs.corda.net/api-contract-constraints.html#contract-state-agreement
|
||||
""".trimIndent().replace('\n', ' '), null)
|
||||
""".trimIndent().replace('\n', ' '))
|
||||
}
|
||||
|
||||
// TODO: add reference to documentation
|
||||
@KeepForDJVM
|
||||
class TransactionRequiredContractUnspecifiedException(txId: SecureHash, state: TransactionState<ContractState>)
|
||||
: TransactionVerificationException(txId,
|
||||
"""
|
||||
class TransactionRequiredContractUnspecifiedException(txId: SecureHash, message: String)
|
||||
: TransactionVerificationException(txId, message, null) {
|
||||
constructor(txId: SecureHash, state: TransactionState<ContractState>) : this(txId,
|
||||
"""
|
||||
State of class ${state.data::class.java.typeName} does not have a specified owning contract.
|
||||
Add the @BelongsToContract annotation to this class to ensure that it can only be bundled in a TransactionState
|
||||
with the correct contract.
|
||||
|
||||
For details see: https://docs.corda.net/api-contract-constraints.html#contract-state-agreement
|
||||
""".trimIndent(), null)
|
||||
|
||||
""".trimIndent())
|
||||
}
|
||||
|
||||
/**
|
||||
* If the network parameters associated with an input or reference state in a transaction are more recent than the network parameters of the new transaction itself.
|
||||
*/
|
||||
@KeepForDJVM
|
||||
class TransactionNetworkParameterOrderingException(txId: SecureHash, inputStateRef: StateRef, txnNetworkParameters: NetworkParameters, inputNetworkParameters: NetworkParameters)
|
||||
: TransactionVerificationException(txId, "The network parameters epoch (${txnNetworkParameters.epoch}) of this transaction " +
|
||||
"is older than the epoch (${inputNetworkParameters.epoch}) of input state: $inputStateRef", null)
|
||||
class TransactionNetworkParameterOrderingException(txId: SecureHash, message: String) :
|
||||
TransactionVerificationException(txId, message, null) {
|
||||
constructor(txId: SecureHash,
|
||||
inputStateRef: StateRef,
|
||||
txnNetworkParameters: NetworkParameters,
|
||||
inputNetworkParameters: NetworkParameters)
|
||||
: this(txId, "The network parameters epoch (${txnNetworkParameters.epoch}) of this transaction " +
|
||||
"is older than the epoch (${inputNetworkParameters.epoch}) of input state: $inputStateRef")
|
||||
}
|
||||
|
||||
/**
|
||||
* Thrown when the network parameters with hash: missingNetworkParametersHash is not available at this node. Usually all the parameters
|
||||
@ -224,9 +251,11 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S
|
||||
* @param missingNetworkParametersHash Missing hash of the network parameters associated to this transaction
|
||||
*/
|
||||
@KeepForDJVM
|
||||
class MissingNetworkParametersException(txId: SecureHash, missingNetworkParametersHash: SecureHash)
|
||||
: TransactionVerificationException(txId, "Couldn't find network parameters with hash: $missingNetworkParametersHash related to this transaction: $txId", null)
|
||||
|
||||
class MissingNetworkParametersException(txId: SecureHash, message: String)
|
||||
: TransactionVerificationException(txId, message, null) {
|
||||
constructor(txId: SecureHash, missingNetworkParametersHash: SecureHash) :
|
||||
this(txId, "Couldn't find network parameters with hash: $missingNetworkParametersHash related to this transaction: $txId")
|
||||
}
|
||||
|
||||
/** Whether the inputs or outputs list contains an encumbrance issue, see [TransactionMissingEncumbranceException]. */
|
||||
@CordaSerializable
|
||||
|
@ -6,6 +6,8 @@ import net.corda.core.CordaInternal
|
||||
import net.corda.core.DeleteForDJVM
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.AnonymousParty
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.*
|
||||
@ -120,14 +122,18 @@ abstract class FlowLogic<out T> {
|
||||
* is routed depends on the [Destination] type, including whether this call does any initial communication.
|
||||
*/
|
||||
@Suspendable
|
||||
fun initiateFlow(destination: Destination): FlowSession = stateMachine.initiateFlow(destination)
|
||||
fun initiateFlow(destination: Destination): FlowSession {
|
||||
require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" }
|
||||
return stateMachine.initiateFlow(destination, serviceHub.identityService.wellKnownPartyFromAnonymous(destination as AbstractParty)
|
||||
?: throw IllegalArgumentException("Could not resolve destination: $destination"))
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note
|
||||
* that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
|
||||
*/
|
||||
@Suspendable
|
||||
fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party)
|
||||
fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party, party)
|
||||
|
||||
/**
|
||||
* Specifies the identity, with certificate, to use for this flow. This will be one of the multiple identities that
|
||||
|
@ -45,7 +45,6 @@ interface FlowLogicRefFactory {
|
||||
*
|
||||
* @property type the fully qualified name of the class that failed checks.
|
||||
*/
|
||||
@CordaSerializable
|
||||
class IllegalFlowLogicException(val type: String, msg: String) :
|
||||
IllegalArgumentException("A FlowLogicRef cannot be constructed for FlowLogic of type $type: $msg") {
|
||||
constructor(type: Class<*>, msg: String) : this(type.name, msg)
|
||||
|
@ -18,7 +18,7 @@ interface FlowStateMachine<FLOWRETURN> {
|
||||
fun <SUSPENDRETURN : Any> suspend(ioRequest: FlowIORequest<SUSPENDRETURN>, maySkipCheckpoint: Boolean): SUSPENDRETURN
|
||||
|
||||
@Suspendable
|
||||
fun initiateFlow(destination: Destination): FlowSession
|
||||
fun initiateFlow(destination: Destination, wellKnownParty: Party): FlowSession
|
||||
|
||||
fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>)
|
||||
|
||||
|
@ -80,9 +80,7 @@ interface IdentityService {
|
||||
* @param key The owning [PublicKey] of the [Party].
|
||||
* @return Returns a [Party] with a matching owningKey if known, else returns null.
|
||||
*/
|
||||
fun partyFromKey(key: PublicKey): Party? =
|
||||
@Suppress("DEPRECATION")
|
||||
certificateFromKey(key)?.party
|
||||
fun partyFromKey(key: PublicKey): Party?
|
||||
|
||||
/**
|
||||
* Resolves a party name to the well known identity [Party] instance for this name. Where possible well known identity
|
||||
|
File diff suppressed because one or more lines are too long
@ -10,29 +10,33 @@ complexity:
|
||||
active: true
|
||||
ComplexCondition:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
threshold: 4
|
||||
ComplexMethod:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
threshold: 10
|
||||
ignoreSingleWhenExpression: true
|
||||
LargeClass:
|
||||
active: true
|
||||
excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt"
|
||||
excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt,**/buildSrc/**"
|
||||
threshold: 600
|
||||
LongMethod:
|
||||
active: true
|
||||
excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt"
|
||||
excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt,**/buildSrc/**"
|
||||
threshold: 120
|
||||
LongParameterList:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
threshold: 6
|
||||
ignoreDefaultParameters: false
|
||||
NestedBlockDepth:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
threshold: 4
|
||||
TooManyFunctions:
|
||||
active: true
|
||||
excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt"
|
||||
excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt,**/buildSrc/**"
|
||||
thresholdInFiles: 15
|
||||
thresholdInClasses: 15
|
||||
thresholdInInterfaces: 15
|
||||
@ -41,6 +45,7 @@ complexity:
|
||||
|
||||
empty-blocks:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
EmptyCatchBlock:
|
||||
active: true
|
||||
allowedExceptionNameRegex: "^(_|(ignore|expected).*)"
|
||||
@ -71,6 +76,7 @@ empty-blocks:
|
||||
|
||||
exceptions:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
TooGenericExceptionCaught:
|
||||
active: true
|
||||
exceptionNames:
|
||||
@ -92,6 +98,7 @@ exceptions:
|
||||
|
||||
naming:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
ClassNaming:
|
||||
active: true
|
||||
classPattern: '[A-Z$][a-zA-Z0-9$]*'
|
||||
@ -127,6 +134,7 @@ naming:
|
||||
|
||||
performance:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
ForEachOnRange:
|
||||
active: true
|
||||
SpreadOperator:
|
||||
@ -136,6 +144,7 @@ performance:
|
||||
|
||||
potential-bugs:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
DuplicateCaseInWhenExpression:
|
||||
active: true
|
||||
EqualsWithHashCodeExist:
|
||||
@ -149,10 +158,11 @@ style:
|
||||
active: true
|
||||
ForbiddenComment:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
values: 'TODO:,FIXME:,STOPSHIP:'
|
||||
MagicNumber:
|
||||
active: true
|
||||
excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt"
|
||||
excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt,**/buildSrc/**"
|
||||
ignoreNumbers: '-1,0,1,2'
|
||||
ignoreHashCodeFunction: true
|
||||
ignorePropertyDeclaration: false
|
||||
@ -163,23 +173,30 @@ style:
|
||||
ignoreEnums: false
|
||||
MaxLineLength:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
maxLineLength: 140
|
||||
excludePackageStatements: true
|
||||
excludeImportStatements: true
|
||||
ModifierOrder:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
OptionalAbstractKeyword:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
ReturnCount:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
max: 2
|
||||
excludedFunctions: "equals"
|
||||
excludeReturnFromLambda: true
|
||||
SafeCast:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
ThrowsCount:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
max: 2
|
||||
WildcardImport:
|
||||
active: true
|
||||
excludes: "**/buildSrc/**"
|
||||
excludeImports: 'java.util.*,kotlinx.android.synthetic.*'
|
@ -25,7 +25,8 @@ corda_substitutions = {
|
||||
"|quasar_version|" : constants_properties_dict["quasarVersion"],
|
||||
"|platform_version|" : constants_properties_dict["platformVersion"],
|
||||
"|os_branch|" : constants_properties_dict["openSourceBranch"],
|
||||
"|os_samples_branch|" : constants_properties_dict["openSourceSamplesBranch"]
|
||||
"|os_samples_branch|" : constants_properties_dict["openSourceSamplesBranch"],
|
||||
"|jolokia_version|" : constants_properties_dict["jolokiaAgentVersion"]
|
||||
}
|
||||
|
||||
def setup(app):
|
||||
|
@ -81,6 +81,12 @@ Note that in production, exposing the database via the node is not recommended.
|
||||
Monitoring your node
|
||||
--------------------
|
||||
|
||||
This section covers monitoring performance and health of a node in Corda Enterprise with Jolokia and Graphite. General best practices for monitoring (e.g. setting up TCP checks for the ports the node communicates on, database health checks etc.) are not covered here but should be followed.
|
||||
|
||||
|
||||
Monitoring via Jolokia
|
||||
++++++++++++++++++++++
|
||||
|
||||
Like most Java servers, the node can be configured to export various useful metrics and management operations via the industry-standard
|
||||
`JMX infrastructure <https://en.wikipedia.org/wiki/Java_Management_Extensions>`_. JMX is a standard API
|
||||
for registering so-called *MBeans* ... objects whose properties and methods are intended for server management. As Java
|
||||
@ -106,8 +112,12 @@ Here are a few ways to build dashboards and extract monitoring data for a node:
|
||||
It can bridge any data input to any output using their plugin system, for example, Telegraf can
|
||||
be configured to collect data from Jolokia and write to DataDog web api.
|
||||
|
||||
The Node configuration parameter `jmxMonitoringHttpPort` has to be present in order to ensure a Jolokia agent is instrumented with
|
||||
the JVM run-time.
|
||||
In order to ensure that a Jolokia agent is instrumented with the JVM run-time, you can choose one of these options:
|
||||
|
||||
* Specify the Node configuration parameter ``jmxMonitoringHttpPort`` which will attempt to load the jolokia driver from the ``drivers`` folder.
|
||||
The format of the driver name needs to be ``jolokia-jvm-{VERSION}-agent.jar`` where VERSION is the version required by Corda, currently |jolokia_version|.
|
||||
* Start the node with ``java -Dcapsule.jvm.args="-javaagent:drivers/jolokia-jvm-1.6.0-agent.jar=port=7777,host=localhost" -jar corda.jar``.
|
||||
|
||||
|
||||
The following JMX statistics are exported:
|
||||
|
||||
@ -126,6 +136,8 @@ via a file called ``jolokia-access.xml``.
|
||||
Several Jolokia policy based security configuration files (``jolokia-access.xml``) are available for dev, test, and prod
|
||||
environments under ``/config/<env>``.
|
||||
|
||||
To pass a security policy use ``java -Dcapsule.jvm.args=-javaagent:./drivers/jolokia-jvm-1.6.0-agent.jar,policyLocation=file:./config-path/jolokia-access.xml -jar corda.jar``
|
||||
|
||||
Notes for development use
|
||||
+++++++++++++++++++++++++
|
||||
|
||||
|
@ -33,6 +33,7 @@ import java.io.File
|
||||
import java.net.URL
|
||||
import java.nio.file.FileAlreadyExistsException
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
|
||||
import java.security.PublicKey
|
||||
import java.time.Duration
|
||||
@ -68,7 +69,7 @@ internal constructor(private val initSerEnv: Boolean,
|
||||
companion object {
|
||||
// TODO This will probably need to change once we start using a bundled JVM
|
||||
private val nodeInfoGenCmd = listOf(
|
||||
"java",
|
||||
Paths.get(System.getProperty("java.home"), "bin", "java").toString(),
|
||||
"-jar",
|
||||
"corda.jar",
|
||||
"generate-node-info"
|
||||
|
@ -25,9 +25,6 @@ class DatabaseTransaction(
|
||||
) {
|
||||
val id: UUID = UUID.randomUUID()
|
||||
|
||||
val flushing: Boolean get() = _flushingCount > 0
|
||||
private var _flushingCount = 0
|
||||
|
||||
val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
|
||||
database.dataSource.connection.apply {
|
||||
autoCommit = false
|
||||
@ -37,27 +34,6 @@ class DatabaseTransaction(
|
||||
|
||||
private val sessionDelegate = lazy {
|
||||
val session = database.entityManagerFactory.withOptions().connection(connection).openSession()
|
||||
session.addEventListeners(object : BaseSessionEventListener() {
|
||||
override fun flushStart() {
|
||||
_flushingCount++
|
||||
super.flushStart()
|
||||
}
|
||||
|
||||
override fun flushEnd(numberOfEntities: Int, numberOfCollections: Int) {
|
||||
super.flushEnd(numberOfEntities, numberOfCollections)
|
||||
_flushingCount--
|
||||
}
|
||||
|
||||
override fun partialFlushStart() {
|
||||
_flushingCount++
|
||||
super.partialFlushStart()
|
||||
}
|
||||
|
||||
override fun partialFlushEnd(numberOfEntities: Int, numberOfCollections: Int) {
|
||||
super.partialFlushEnd(numberOfEntities, numberOfCollections)
|
||||
_flushingCount--
|
||||
}
|
||||
})
|
||||
hibernateTransaction = session.beginTransaction()
|
||||
session
|
||||
}
|
||||
|
@ -3,7 +3,12 @@ buildscript {
|
||||
def properties = new Properties()
|
||||
file("$projectDir/src/main/resources/build.properties").withInputStream { properties.load(it) }
|
||||
|
||||
ext.jolokia_version = properties.getProperty('jolokiaAgentVersion')
|
||||
|
||||
Properties constants = new Properties()
|
||||
file("$rootDir/constants.properties").withInputStream { constants.load(it) }
|
||||
|
||||
|
||||
ext.jolokia_version = constants.getProperty('jolokiaAgentVersion')
|
||||
|
||||
dependencies {
|
||||
classpath group: 'com.github.docker-java', name: 'docker-java', version: '3.1.5'
|
||||
|
@ -10,7 +10,6 @@ import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
@Ignore
|
||||
class NodeRPCTests {
|
||||
private val CORDA_VERSION_REGEX = "\\d+(\\.\\d+)?(-\\w+)?".toRegex()
|
||||
private val CORDA_VENDOR = "Corda Open Source"
|
||||
@ -28,7 +27,6 @@ class NodeRPCTests {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), cordappsForAllNodes = CORDAPPS, extraCordappPackagesToScan = emptyList())) {
|
||||
val nodeDiagnosticInfo = startNode().get().rpc.nodeDiagnosticInfo()
|
||||
assertTrue(nodeDiagnosticInfo.version.matches(CORDA_VERSION_REGEX))
|
||||
assertTrue(nodeDiagnosticInfo.revision.matches(HEXADECIMAL_REGEX))
|
||||
assertEquals(PLATFORM_VERSION, nodeDiagnosticInfo.platformVersion)
|
||||
assertEquals(CORDA_VENDOR, nodeDiagnosticInfo.vendor)
|
||||
nodeDiagnosticInfo.cordapps.forEach { println("${it.shortName} ${it.type}") }
|
||||
|
@ -150,6 +150,7 @@ internal class CordaRPCOpsImpl(
|
||||
override fun killFlow(id: StateMachineRunId): Boolean = if (smm.killFlow(id)) true else smm.flowHospital.dropSessionInit(id.uuid)
|
||||
|
||||
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
|
||||
|
||||
val (allStateMachines, changes) = smm.track()
|
||||
return DataFeed(
|
||||
allStateMachines.map { stateMachineInfoFromFlowLogic(it) },
|
||||
|
@ -6,13 +6,10 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.identity.x500Matches
|
||||
import net.corda.core.internal.CertRole
|
||||
import net.corda.core.internal.hash
|
||||
import net.corda.core.node.services.IdentityService
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.api.IdentityServiceInternal
|
||||
import net.corda.node.services.persistence.WritablePublicKeyToOwningIdentityCache
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.crypto.x509Certificates
|
||||
import java.security.InvalidAlgorithmParameterException
|
||||
@ -101,6 +98,10 @@ class InMemoryIdentityService(
|
||||
return keyToPartyAndCerts[identityCertChain[1].publicKey]
|
||||
}
|
||||
|
||||
override fun partyFromKey(key: PublicKey): Party? {
|
||||
return certificateFromKey(key)?.party ?: keyToName[key.toStringShort()]?.let { wellKnownPartyFromX500Name(it) }
|
||||
}
|
||||
|
||||
override fun certificateFromKey(owningKey: PublicKey): PartyAndCertificate? = keyToPartyAndCerts[owningKey]
|
||||
|
||||
// We give the caller a copy of the data set to avoid any locking problems
|
||||
|
@ -296,6 +296,12 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
|
||||
keyToPartyAndCert[owningKey.toStringShort()]
|
||||
}
|
||||
|
||||
override fun partyFromKey(key: PublicKey): Party? {
|
||||
return certificateFromKey(key)?.party ?: database.transaction {
|
||||
keyToName[key.toStringShort()]
|
||||
}?.let { wellKnownPartyFromX500Name(it) }
|
||||
}
|
||||
|
||||
private fun certificateFromCordaX500Name(name: CordaX500Name): PartyAndCertificate? {
|
||||
return database.transaction {
|
||||
val partyId = nameToKey[name]
|
||||
|
@ -71,7 +71,7 @@ sealed class Event {
|
||||
* Initiate a flow. This causes a new session object to be created and returned to the flow. Note that no actual
|
||||
* communication takes place at this time, only on the first send/receive operation on the session.
|
||||
*/
|
||||
data class InitiateFlow(val destination: Destination) : Event()
|
||||
data class InitiateFlow(val destination: Destination, val wellKnownParty: Party) : Event()
|
||||
|
||||
/**
|
||||
* Signal the entering into a subflow.
|
||||
|
@ -5,6 +5,8 @@ import net.corda.core.flows.*
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import org.slf4j.Logger
|
||||
import java.lang.reflect.ParameterizedType
|
||||
import java.lang.reflect.Type
|
||||
import java.lang.reflect.TypeVariable
|
||||
@ -33,7 +35,12 @@ data class FlowLogicRefImpl internal constructor(val flowLogicClassName: String,
|
||||
* in response to a potential malicious use or buggy update to an app etc.
|
||||
*/
|
||||
// TODO: Replace with a per app classloader/cordapp provider/cordapp loader - this will do for now
|
||||
@Suppress("ReturnCount", "TooManyFunctions")
|
||||
open class FlowLogicRefFactoryImpl(private val classloader: ClassLoader) : SingletonSerializeAsToken(), FlowLogicRefFactory {
|
||||
companion object {
|
||||
private val log: Logger = contextLogger()
|
||||
}
|
||||
|
||||
override fun create(flowClass: Class<out FlowLogic<*>>, vararg args: Any?): FlowLogicRef {
|
||||
if (!flowClass.isAnnotationPresent(SchedulableFlow::class.java)) {
|
||||
throw IllegalFlowLogicException(flowClass, "because it's not a schedulable flow")
|
||||
@ -76,20 +83,100 @@ open class FlowLogicRefFactoryImpl(private val classloader: ClassLoader) : Singl
|
||||
return createKotlin(flowClass, argsMap)
|
||||
}
|
||||
|
||||
protected open fun findConstructor(flowClass: Class<out FlowLogic<*>>, argTypes: List<Class<Any>?>): KFunction<FlowLogic<*>> {
|
||||
private fun matchConstructorArgs(ctorTypes: List<Class<out Any>>, optional: List<Boolean>,
|
||||
argTypes: List<Class<Any>?>): Pair<Boolean, Int> {
|
||||
// There must be at least as many constructor arguments as supplied arguments
|
||||
if (argTypes.size > ctorTypes.size) {
|
||||
return Pair(false, 0)
|
||||
}
|
||||
|
||||
// Check if all constructor arguments are assignable for all supplied arguments, then for remaining arguments in constructor
|
||||
// check that they are optional. If they are it's still a match. Return if matched and the number of default args consumed.
|
||||
var numDefaultsUsed = 0
|
||||
var index = 0
|
||||
for (conArg in ctorTypes) {
|
||||
if (index < argTypes.size) {
|
||||
val argType = argTypes[index]
|
||||
if (argType != null && !conArg.isAssignableFrom(argType)) {
|
||||
return Pair(false, 0)
|
||||
}
|
||||
} else {
|
||||
if (index >= optional.size || !optional[index]) {
|
||||
return Pair(false, 0)
|
||||
}
|
||||
numDefaultsUsed++
|
||||
}
|
||||
index++
|
||||
}
|
||||
|
||||
return Pair(true, numDefaultsUsed)
|
||||
}
|
||||
|
||||
private fun handleNoMatchingConstructor(flowClass: Class<out FlowLogic<*>>, argTypes: List<Class<Any>?>) {
|
||||
log.error("Cannot find Constructor to match arguments: ${argTypes.joinToString()}")
|
||||
log.info("Candidate constructors are:")
|
||||
for (ctor in flowClass.kotlin.constructors) {
|
||||
log.info("${ctor}")
|
||||
}
|
||||
}
|
||||
|
||||
private fun findConstructorCheckDefaultParams(flowClass: Class<out FlowLogic<*>>, argTypes: List<Class<Any>?>):
|
||||
KFunction<FlowLogic<*>> {
|
||||
// There may be multiple matches. If there are, we will use the one with the least number of default parameter matches.
|
||||
var ctorMatch: KFunction<FlowLogic<*>>? = null
|
||||
var matchNumDefArgs = 0
|
||||
for (ctor in flowClass.kotlin.constructors) {
|
||||
// Get the types of the arguments, always boxed (as that's what we get in the invocation).
|
||||
val ctorTypes = ctor.javaConstructor!!.parameterTypes.map {
|
||||
if (it == null) { it } else { Primitives.wrap(it) }
|
||||
}
|
||||
|
||||
val optional = ctor.parameters.map { it.isOptional }
|
||||
val (matched, numDefaultsUsed) = matchConstructorArgs(ctorTypes, optional, argTypes)
|
||||
if (matched) {
|
||||
if (ctorMatch == null || numDefaultsUsed < matchNumDefArgs) {
|
||||
ctorMatch = ctor
|
||||
matchNumDefArgs = numDefaultsUsed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (ctorMatch == null) {
|
||||
handleNoMatchingConstructor(flowClass, argTypes)
|
||||
// Must do the throw here, not in handleNoMatchingConstructor(added for Detekt) else we can't return ctorMatch as non-null
|
||||
throw IllegalFlowLogicException(flowClass, "No constructor found that matches arguments (${argTypes.joinToString()}), "
|
||||
+ "see log for more information.")
|
||||
}
|
||||
|
||||
log.info("Matched constructor: ${ctorMatch} (num_default_args_used=$matchNumDefArgs)")
|
||||
return ctorMatch
|
||||
}
|
||||
|
||||
private fun findConstructorDirectMatch(flowClass: Class<out FlowLogic<*>>, argTypes: List<Class<Any>?>): KFunction<FlowLogic<*>> {
|
||||
return flowClass.kotlin.constructors.single { ctor ->
|
||||
// Get the types of the arguments, always boxed (as that's what we get in the invocation).
|
||||
val ctorTypes = ctor.javaConstructor!!.parameterTypes.map { Primitives.wrap(it) }
|
||||
if (argTypes.size != ctorTypes.size)
|
||||
return@single false
|
||||
for ((argType, ctorType) in argTypes.zip(ctorTypes)) {
|
||||
if (argType == null) continue // Try and find a match based on the other arguments.
|
||||
if (argType == null) continue // Try and find a match based on the other arguments.
|
||||
if (!ctorType.isAssignableFrom(argType)) return@single false
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
protected open fun findConstructor(flowClass: Class<out FlowLogic<*>>, argTypes: List<Class<Any>?>): KFunction<FlowLogic<*>> {
|
||||
try {
|
||||
return findConstructorDirectMatch(flowClass, argTypes)
|
||||
} catch(e: java.lang.IllegalArgumentException) {
|
||||
log.trace("findConstructorDirectMatch threw IllegalArgumentException (more than 1 matches).")
|
||||
} catch (e: NoSuchElementException) {
|
||||
log.trace("findConstructorDirectMatch threw NoSuchElementException (no matches).")
|
||||
}
|
||||
return findConstructorCheckDefaultParams(flowClass, argTypes)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a [FlowLogicRef] by trying to find a Kotlin constructor that matches the given args.
|
||||
*
|
||||
|
@ -17,10 +17,11 @@ import net.corda.core.utilities.UntrustworthyData
|
||||
|
||||
class FlowSessionImpl(
|
||||
override val destination: Destination,
|
||||
private val wellKnownParty: Party,
|
||||
val sourceSessionId: SessionId
|
||||
) : FlowSession() {
|
||||
|
||||
override val counterparty: Party get() = checkNotNull(destination as? Party) { "$destination is not a Party" }
|
||||
override val counterparty: Party get() = wellKnownParty
|
||||
|
||||
override fun toString(): String = "FlowSessionImpl(destination=$destination, sourceSessionId=$sourceSessionId)"
|
||||
|
||||
|
@ -355,10 +355,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun initiateFlow(destination: Destination): FlowSession {
|
||||
override fun initiateFlow(destination: Destination, wellKnownParty: Party): FlowSession {
|
||||
require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" }
|
||||
val resume = processEventImmediately(
|
||||
Event.InitiateFlow(destination),
|
||||
Event.InitiateFlow(destination, wellKnownParty),
|
||||
isDbTransactionOpenOnEntry = true,
|
||||
isDbTransactionOpenOnExit = true
|
||||
) as FlowContinuation.Resume
|
||||
|
@ -466,7 +466,7 @@ class SingleThreadedStateMachineManager(
|
||||
try {
|
||||
val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage)
|
||||
val initiatedSessionId = SessionId.createRandom(secureRandom)
|
||||
val senderSession = FlowSessionImpl(sender, initiatedSessionId)
|
||||
val senderSession = FlowSessionImpl(sender, sender, initiatedSessionId)
|
||||
val flowLogic = initiatedFlowFactory.createFlow(senderSession)
|
||||
val initiatedFlowInfo = when (initiatedFlowFactory) {
|
||||
is InitiatedFlowFactory.Core -> FlowInfo(serviceHub.myInfo.platformVersion, "corda")
|
||||
|
@ -235,7 +235,7 @@ class TopLevelTransition(
|
||||
return@builder FlowContinuation.ProcessEvents
|
||||
}
|
||||
val sourceSessionId = SessionId.createRandom(context.secureRandom)
|
||||
val sessionImpl = FlowSessionImpl(event.destination, sourceSessionId)
|
||||
val sessionImpl = FlowSessionImpl(event.destination, event.wellKnownParty, sourceSessionId)
|
||||
val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.destination, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
|
||||
currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
|
||||
actions.add(Action.AddSessionBinding(context.id, sourceSessionId))
|
||||
|
@ -7,6 +7,8 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
||||
import net.corda.nodeapi.internal.persistence.contextTransaction
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import org.hibernate.Session
|
||||
import org.hibernate.internal.SessionImpl
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
@ -191,14 +193,23 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
|
||||
|
||||
private fun loadValue(key: K): V? {
|
||||
val session = currentDBSession()
|
||||
val flushing = contextTransaction.flushing
|
||||
if (!flushing) {
|
||||
val isSafeToDetach = isSafeToFlushAndDetach(session)
|
||||
if (isSafeToDetach) {
|
||||
// IMPORTANT: The flush is needed because detach() makes the queue of unflushed entries invalid w.r.t. Hibernate internal state if the found entity is unflushed.
|
||||
// We want the detach() so that we rely on our cache memory management and don't retain strong references in the Hibernate session.
|
||||
session.flush()
|
||||
}
|
||||
val result = session.find(persistentEntityClass, toPersistentEntityKey(key))
|
||||
return result?.apply { if (!flushing) session.detach(result) }?.let(fromPersistentEntity)?.second
|
||||
return result?.apply { if (isSafeToDetach) session.detach(result) }?.let(fromPersistentEntity)?.second
|
||||
}
|
||||
|
||||
private fun isSafeToFlushAndDetach(session: Session): Boolean {
|
||||
if (session !is SessionImpl)
|
||||
return true
|
||||
|
||||
val flushInProgress = session.persistenceContext.isFlushing
|
||||
val cascadeInProgress = session.persistenceContext.cascadeLevel > 0
|
||||
return !flushInProgress && !cascadeInProgress
|
||||
}
|
||||
|
||||
protected fun transactionalLoadValue(key: K): Transactional<V> {
|
||||
|
@ -1,4 +1,6 @@
|
||||
# Build constants exported as resource file to make them visible in Node program
|
||||
# Note: sadly, due to present limitation of IntelliJ-IDEA in processing resource files, these constants cannot be
|
||||
# imported from top-level 'constants.properties' file
|
||||
jolokiaAgentVersion=1.6.1
|
||||
#jolokiaAgentVersion=1.6.1
|
||||
|
||||
|
||||
|
@ -261,6 +261,17 @@ class PersistentIdentityServiceTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `resolve key to party for key without certificate`() {
|
||||
// Register Alice's PartyAndCert as if it was done so via the network map cache.
|
||||
identityService.verifyAndRegisterIdentity(alice.identity)
|
||||
// Use a key which is not tied to a cert.
|
||||
val publicKey = Crypto.generateKeyPair().public
|
||||
// Register the PublicKey to Alice's CordaX500Name.
|
||||
identityService.registerKey(publicKey, alice.party)
|
||||
assertEquals(alice.party, identityService.partyFromKey(publicKey))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `register incorrect party to public key `(){
|
||||
database.transaction { identityService.verifyAndRegisterIdentity(ALICE_IDENTITY) }
|
||||
|
@ -1,82 +0,0 @@
|
||||
package net.corda.node.services.persistence
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.`issued by`
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.finance.issuedBy
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.services.keys.E2ETestKeyManagementService
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
import net.corda.testing.node.MockServices
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class HibernateColumnConverterTests {
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
|
||||
private val cordapps = listOf("net.corda.finance")
|
||||
|
||||
private val myself = TestIdentity(CordaX500Name("Me", "London", "GB"))
|
||||
private val notary = TestIdentity(CordaX500Name("NotaryService", "London", "GB"), 1337L)
|
||||
|
||||
lateinit var services: MockServices
|
||||
lateinit var database: CordaPersistence
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
val (db, mockServices) = MockServices.makeTestDatabaseAndPersistentServices(
|
||||
cordappPackages = cordapps,
|
||||
initialIdentity = myself,
|
||||
networkParameters = testNetworkParameters(minimumPlatformVersion = 4),
|
||||
moreIdentities = setOf(notary.identity),
|
||||
moreKeys = emptySet()
|
||||
)
|
||||
services = mockServices
|
||||
database = db
|
||||
}
|
||||
|
||||
// AbstractPartyToX500NameAsStringConverter could cause circular flush of Hibernate session because it is invoked during flush, and a
|
||||
// cache miss was doing a flush. This also checks that loading during flush does actually work.
|
||||
@Test
|
||||
fun `issue some cash on a notary that exists only in the database to check cache loading works in our identity column converters during flush of vault update`() {
|
||||
val expected = 500.DOLLARS
|
||||
val ref = OpaqueBytes.of(0x01)
|
||||
|
||||
// Create parallel set of key and identity services so that the values are not cached, forcing the node caches to do a lookup.
|
||||
val cacheFactory = TestingNamedCacheFactory()
|
||||
val identityService = PersistentIdentityService(cacheFactory)
|
||||
val originalIdentityService: PersistentIdentityService = services.identityService as PersistentIdentityService
|
||||
identityService.database = originalIdentityService.database
|
||||
identityService.start(originalIdentityService.trustRoot, pkToIdCache = PublicKeyToOwningIdentityCacheImpl(database, cacheFactory))
|
||||
val keyService = E2ETestKeyManagementService(identityService)
|
||||
keyService.start(setOf(myself.keyPair))
|
||||
|
||||
// New identity for a notary (doesn't matter that it's for Bank Of Corda... since not going to use it as an actual notary etc).
|
||||
val newKeyAndCert = keyService.freshKeyAndCert(services.myInfo.legalIdentitiesAndCerts[0], false)
|
||||
val randomNotary = Party(myself.name, newKeyAndCert.owningKey)
|
||||
|
||||
val ourIdentity = services.myInfo.legalIdentities.first()
|
||||
val builder = TransactionBuilder(notary.party)
|
||||
val issuer = services.myInfo.legalIdentities.first().ref(ref)
|
||||
val signers = Cash().generateIssue(builder, expected.issuedBy(issuer), ourIdentity, randomNotary)
|
||||
val tx: SignedTransaction = services.signInitialTransaction(builder, signers)
|
||||
services.recordTransactions(tx)
|
||||
|
||||
val output = tx.tx.outputsOfType<Cash.State>().single()
|
||||
assertEquals(expected.`issued by`(ourIdentity.ref(ref)), output.amount)
|
||||
}
|
||||
}
|
@ -0,0 +1,168 @@
|
||||
package net.corda.node.services.persistence
|
||||
|
||||
import net.corda.core.contracts.BelongsToContract
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.contracts.TypeOnlyCommandData
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.schemas.QueryableState
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.`issued by`
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.finance.issuedBy
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.internal.TestingNamedCacheFactory
|
||||
import net.corda.testing.node.MockServices
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.hibernate.annotations.Cascade
|
||||
import org.hibernate.annotations.CascadeType
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import java.lang.IllegalArgumentException
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Id
|
||||
import javax.persistence.OneToMany
|
||||
import javax.persistence.Table
|
||||
import javax.persistence.GeneratedValue
|
||||
import javax.persistence.GenerationType
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
/**
|
||||
* These tests cover the interactions between Corda and Hibernate with regards to flushing/detaching/cascading.
|
||||
*/
|
||||
class HibernateInteractionTests {
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule()
|
||||
|
||||
private val cordapps = listOf("net.corda.finance", "net.corda.node.services.persistence")
|
||||
|
||||
private val myself = TestIdentity(CordaX500Name("Me", "London", "GB"))
|
||||
private val notary = TestIdentity(CordaX500Name("NotaryService", "London", "GB"), 1337L)
|
||||
|
||||
lateinit var services: MockServices
|
||||
lateinit var database: CordaPersistence
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
val (db, mockServices) = MockServices.makeTestDatabaseAndPersistentServices(
|
||||
cordappPackages = cordapps,
|
||||
initialIdentity = myself,
|
||||
networkParameters = testNetworkParameters(minimumPlatformVersion = 4),
|
||||
moreIdentities = setOf(notary.identity),
|
||||
moreKeys = emptySet(),
|
||||
// forcing a cache size of zero, so that all requests lead to a cache miss and end up hitting the database
|
||||
cacheFactory = TestingNamedCacheFactory(0)
|
||||
)
|
||||
services = mockServices
|
||||
database = db
|
||||
}
|
||||
|
||||
// AbstractPartyToX500NameAsStringConverter could cause circular flush of Hibernate session because it is invoked during flush, and a
|
||||
// cache miss was doing a flush. This also checks that loading during flush does actually work.
|
||||
@Test
|
||||
fun `issue some cash on a notary that exists only in the database to check cache loading works in our identity column converters during flush of vault update`() {
|
||||
val expected = 500.DOLLARS
|
||||
val ref = OpaqueBytes.of(0x01)
|
||||
|
||||
val ourIdentity = services.myInfo.legalIdentities.first()
|
||||
val builder = TransactionBuilder(notary.party)
|
||||
val issuer = services.myInfo.legalIdentities.first().ref(ref)
|
||||
val signers = Cash().generateIssue(builder, expected.issuedBy(issuer), ourIdentity, notary.party)
|
||||
val tx: SignedTransaction = services.signInitialTransaction(builder, signers)
|
||||
services.recordTransactions(tx)
|
||||
|
||||
val output = tx.tx.outputsOfType<Cash.State>().single()
|
||||
assertEquals(expected.`issued by`(ourIdentity.ref(ref)), output.amount)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `when a cascade is in progress (because of nested entities), the node avoids to flush & detach entities, since it's not allowed by Hibernate`() {
|
||||
val ourIdentity = services.myInfo.legalIdentities.first()
|
||||
|
||||
val childEntities = listOf(SimpleContract.ChildState(ourIdentity))
|
||||
val parentEntity = SimpleContract.ParentState(childEntities)
|
||||
|
||||
val builder = TransactionBuilder(notary.party)
|
||||
.addOutputState(TransactionState(parentEntity, SimpleContract::class.java.name, notary.party))
|
||||
.addCommand(SimpleContract.Issue(), listOf(ourIdentity.owningKey))
|
||||
val tx: SignedTransaction = services.signInitialTransaction(builder, listOf(ourIdentity.owningKey))
|
||||
services.recordTransactions(tx)
|
||||
|
||||
val output = tx.tx.outputsOfType<SimpleContract.ParentState>().single()
|
||||
assertThat(output.children.single().member).isEqualTo(ourIdentity)
|
||||
}
|
||||
|
||||
object PersistenceSchema: MappedSchema(PersistenceSchema::class.java, 1, listOf(Parent::class.java, Child::class.java)) {
|
||||
|
||||
@Entity(name = "parents")
|
||||
@Table
|
||||
class Parent: PersistentState() {
|
||||
|
||||
@Cascade(CascadeType.ALL)
|
||||
@OneToMany(targetEntity = Child::class)
|
||||
val children: MutableCollection<Child> = mutableSetOf()
|
||||
|
||||
fun addChild(child: Child) {
|
||||
children.add(child)
|
||||
}
|
||||
}
|
||||
|
||||
@Entity(name = "children")
|
||||
class Child(
|
||||
@Id
|
||||
// Do not change this: this generation type is required in order to trigger the proper cascade ordering.
|
||||
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
||||
val identifier: Int?,
|
||||
|
||||
val member: AbstractParty?
|
||||
) {
|
||||
constructor(member: AbstractParty): this(null, member)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class SimpleContract: Contract {
|
||||
|
||||
@BelongsToContract(SimpleContract::class)
|
||||
@CordaSerializable
|
||||
data class ParentState(val children: List<ChildState>): ContractState, QueryableState {
|
||||
override fun supportedSchemas(): Iterable<MappedSchema> = listOf(PersistenceSchema)
|
||||
|
||||
override fun generateMappedObject(schema: MappedSchema): PersistentState {
|
||||
return when(schema) {
|
||||
is PersistenceSchema -> {
|
||||
val parent = PersistenceSchema.Parent()
|
||||
children.forEach { parent.addChild(PersistenceSchema.Child(it.member)) }
|
||||
parent
|
||||
}
|
||||
else -> throw IllegalArgumentException("Unrecognised schema $schema")
|
||||
}
|
||||
}
|
||||
|
||||
override val participants: List<AbstractParty> = children.map { it.member }
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
data class ChildState(val member: AbstractParty)
|
||||
|
||||
override fun verify(tx: LedgerTransaction) {}
|
||||
|
||||
class Issue: TypeOnlyCommandData()
|
||||
}
|
||||
|
||||
}
|
@ -69,8 +69,8 @@ class FlowFrameworkTests {
|
||||
@Before
|
||||
fun setUpMockNet() {
|
||||
mockNet = InternalMockNetwork(
|
||||
cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP),
|
||||
servicePeerAllocationStrategy = RoundRobin()
|
||||
cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP),
|
||||
servicePeerAllocationStrategy = RoundRobin()
|
||||
)
|
||||
|
||||
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
|
||||
@ -139,13 +139,13 @@ class FlowFrameworkTests {
|
||||
mockNet.runNetwork()
|
||||
|
||||
assertSessionTransfers(
|
||||
aliceNode sent sessionInit(PingPongFlow::class, payload = 10L) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent sessionData(20L) to aliceNode,
|
||||
aliceNode sent sessionData(11L) to bobNode,
|
||||
bobNode sent sessionData(21L) to aliceNode,
|
||||
aliceNode sent normalEnd to bobNode,
|
||||
bobNode sent normalEnd to aliceNode
|
||||
aliceNode sent sessionInit(PingPongFlow::class, payload = 10L) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent sessionData(20L) to aliceNode,
|
||||
aliceNode sent sessionData(11L) to bobNode,
|
||||
bobNode sent sessionData(21L) to aliceNode,
|
||||
aliceNode sent normalEnd to bobNode,
|
||||
bobNode sent normalEnd to aliceNode
|
||||
)
|
||||
}
|
||||
|
||||
@ -167,7 +167,8 @@ class FlowFrameworkTests {
|
||||
it.message is ExistingSessionMessage && it.message.payload === EndSessionMessage
|
||||
}.subscribe { sessionEndReceived.release() }
|
||||
val resultFuture = aliceNode.services.startFlow(
|
||||
WaitForOtherSideEndBeforeSendAndReceive(bob, sessionEndReceived)).resultFuture
|
||||
WaitForOtherSideEndBeforeSendAndReceive(bob, sessionEndReceived)
|
||||
).resultFuture
|
||||
mockNet.runNetwork()
|
||||
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
|
||||
resultFuture.getOrThrow()
|
||||
@ -186,10 +187,10 @@ class FlowFrameworkTests {
|
||||
mockNet.runNetwork()
|
||||
|
||||
assertThatExceptionOfType(MyFlowException::class.java)
|
||||
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
|
||||
.withMessage("Nothing useful")
|
||||
.withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow
|
||||
.withStackTraceContaining("Received counter-flow exception from peer")
|
||||
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
|
||||
.withMessage("Nothing useful")
|
||||
.withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow
|
||||
.withStackTraceContaining("Received counter-flow exception from peer")
|
||||
bobNode.database.transaction {
|
||||
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
|
||||
}
|
||||
@ -197,15 +198,15 @@ class FlowFrameworkTests {
|
||||
assertThat(receivingFiber.state).isEqualTo(Strand.State.WAITING)
|
||||
assertThat((erroringFlow.get().stateMachine as FlowStateMachineImpl).state).isEqualTo(Strand.State.WAITING)
|
||||
assertThat(erroringFlowSteps.get()).containsExactly(
|
||||
Notification.createOnNext(ProgressTracker.STARTING),
|
||||
Notification.createOnNext(ExceptionFlow.START_STEP),
|
||||
Notification.createOnError(erroringFlow.get().exceptionThrown)
|
||||
Notification.createOnNext(ProgressTracker.STARTING),
|
||||
Notification.createOnNext(ExceptionFlow.START_STEP),
|
||||
Notification.createOnError(erroringFlow.get().exceptionThrown)
|
||||
)
|
||||
|
||||
assertSessionTransfers(
|
||||
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent errorMessage(erroringFlow.get().exceptionThrown) to aliceNode
|
||||
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent errorMessage(erroringFlow.get().exceptionThrown) to aliceNode
|
||||
)
|
||||
// Make sure the original stack trace isn't sent down the wire
|
||||
val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage
|
||||
@ -296,8 +297,8 @@ class FlowFrameworkTests {
|
||||
@Test
|
||||
fun waitForLedgerCommit() {
|
||||
val ptx = TransactionBuilder(notary = notaryIdentity)
|
||||
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand(alice.owningKey))
|
||||
val stx = aliceNode.services.signInitialTransaction(ptx)
|
||||
|
||||
val committerStx = aliceNode.registerCordappFlowFactory(CommitterFlow::class) {
|
||||
@ -313,8 +314,8 @@ class FlowFrameworkTests {
|
||||
@Test
|
||||
fun `waitForLedgerCommit throws exception if any active session ends in error`() {
|
||||
val ptx = TransactionBuilder(notary = notaryIdentity)
|
||||
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand())
|
||||
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand())
|
||||
val stx = aliceNode.services.signInitialTransaction(ptx)
|
||||
|
||||
aliceNode.registerCordappFlowFactory(WaitForLedgerCommitFlow::class) { ExceptionFlow { throw Exception("Error") } }
|
||||
@ -354,8 +355,8 @@ class FlowFrameworkTests {
|
||||
val result = aliceNode.services.startFlow(UpgradedFlow(bob)).resultFuture
|
||||
mockNet.runNetwork()
|
||||
assertThat(receivedSessionMessages).startsWith(
|
||||
aliceNode sent sessionInit(UpgradedFlow::class, flowVersion = 2) to bobNode,
|
||||
bobNode sent sessionConfirm(flowVersion = 1) to aliceNode
|
||||
aliceNode sent sessionInit(UpgradedFlow::class, flowVersion = 2) to bobNode,
|
||||
bobNode sent sessionConfirm(flowVersion = 1) to aliceNode
|
||||
)
|
||||
val (receivedPayload, node2FlowVersion) = result.getOrThrow()
|
||||
assertThat(receivedPayload).isEqualTo("Old initiated")
|
||||
@ -369,8 +370,8 @@ class FlowFrameworkTests {
|
||||
val flowInfo = aliceNode.services.startFlow(initiatingFlow).resultFuture
|
||||
mockNet.runNetwork()
|
||||
assertThat(receivedSessionMessages).startsWith(
|
||||
aliceNode sent sessionInit(SendFlow::class, flowVersion = 1, payload = "Old initiating") to bobNode,
|
||||
bobNode sent sessionConfirm(flowVersion = 2) to aliceNode
|
||||
aliceNode sent sessionInit(SendFlow::class, flowVersion = 1, payload = "Old initiating") to bobNode,
|
||||
bobNode sent sessionConfirm(flowVersion = 2) to aliceNode
|
||||
)
|
||||
assertThat(flowInfo.get().flowVersion).isEqualTo(2)
|
||||
}
|
||||
@ -380,8 +381,8 @@ class FlowFrameworkTests {
|
||||
val future = aliceNode.services.startFlow(NeverRegisteredFlow("Hello", bob)).resultFuture
|
||||
mockNet.runNetwork()
|
||||
assertThatExceptionOfType(UnexpectedFlowEndException::class.java)
|
||||
.isThrownBy { future.getOrThrow() }
|
||||
.withMessageEndingWith("${NeverRegisteredFlow::class.java.name} is not registered")
|
||||
.isThrownBy { future.getOrThrow() }
|
||||
.withMessageEndingWith("${NeverRegisteredFlow::class.java.name} is not registered")
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -441,9 +442,9 @@ class FlowFrameworkTests {
|
||||
erroringFlowFuture.getOrThrow()
|
||||
val flowSteps = erroringFlowSteps.get()
|
||||
assertThat(flowSteps).containsExactly(
|
||||
Notification.createOnNext(ProgressTracker.STARTING),
|
||||
Notification.createOnNext(ExceptionFlow.START_STEP),
|
||||
Notification.createOnError(erroringFlowFuture.get().exceptionThrown)
|
||||
Notification.createOnNext(ProgressTracker.STARTING),
|
||||
Notification.createOnNext(ExceptionFlow.START_STEP),
|
||||
Notification.createOnError(erroringFlowFuture.get().exceptionThrown)
|
||||
)
|
||||
|
||||
val receiveFlowException = assertFailsWith(UnexpectedFlowEndException::class) {
|
||||
@ -451,28 +452,28 @@ class FlowFrameworkTests {
|
||||
}
|
||||
assertThat(receiveFlowException.message).doesNotContain("evil bug!")
|
||||
assertThat(receiveFlowSteps.get()).containsExactly(
|
||||
Notification.createOnNext(ProgressTracker.STARTING),
|
||||
Notification.createOnNext(ReceiveFlow.START_STEP),
|
||||
Notification.createOnError(receiveFlowException)
|
||||
Notification.createOnNext(ProgressTracker.STARTING),
|
||||
Notification.createOnNext(ReceiveFlow.START_STEP),
|
||||
Notification.createOnError(receiveFlowException)
|
||||
)
|
||||
|
||||
assertSessionTransfers(
|
||||
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent errorMessage() to aliceNode
|
||||
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
|
||||
bobNode sent sessionConfirm() to aliceNode,
|
||||
bobNode sent errorMessage() to aliceNode
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `initiating flow using unknown AnonymousParty`() {
|
||||
val anonymousBob = bobNode.services.keyManagementService.freshKeyAndCert(bobNode.info.legalIdentitiesAndCerts.single(), false)
|
||||
.party.anonymise()
|
||||
.party.anonymise()
|
||||
bobNode.registerCordappFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) }
|
||||
val result = aliceNode.services.startFlow(SendAndReceiveFlow(anonymousBob, "Hello")).resultFuture
|
||||
mockNet.runNetwork()
|
||||
assertThatIllegalArgumentException()
|
||||
.isThrownBy { result.getOrThrow() }
|
||||
.withMessage("We do not know who $anonymousBob belongs to")
|
||||
.isThrownBy { result.getOrThrow() }
|
||||
.withMessage("Could not resolve destination: $anonymousBob")
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -497,16 +498,18 @@ class FlowFrameworkTests {
|
||||
private val FlowLogic<*>.progressSteps: CordaFuture<List<Notification<ProgressTracker.Step>>>
|
||||
get() {
|
||||
return progressTracker!!.changes
|
||||
.ofType(Change.Position::class.java)
|
||||
.map { it.newStep }
|
||||
.materialize()
|
||||
.toList()
|
||||
.toFuture()
|
||||
.ofType(Change.Position::class.java)
|
||||
.map { it.newStep }
|
||||
.materialize()
|
||||
.toList()
|
||||
.toFuture()
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
private class WaitForOtherSideEndBeforeSendAndReceive(val otherParty: Party,
|
||||
@Transient val receivedOtherFlowEnd: Semaphore) : FlowLogic<Unit>() {
|
||||
private class WaitForOtherSideEndBeforeSendAndReceive(
|
||||
val otherParty: Party,
|
||||
@Transient val receivedOtherFlowEnd: Semaphore
|
||||
) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
// Kick off the flow on the other side ...
|
||||
@ -626,7 +629,8 @@ class FlowFrameworkTests {
|
||||
//endregion Helpers
|
||||
}
|
||||
|
||||
internal fun sessionConfirm(flowVersion: Int = 1) = ExistingSessionMessage(SessionId(0), ConfirmSessionMessage(SessionId(0), FlowInfo(flowVersion, "")))
|
||||
internal fun sessionConfirm(flowVersion: Int = 1) =
|
||||
ExistingSessionMessage(SessionId(0), ConfirmSessionMessage(SessionId(0), FlowInfo(flowVersion, "")))
|
||||
|
||||
internal inline fun <reified P : FlowLogic<*>> TestStartedNode.getSingleFlow(): Pair<P, CordaFuture<*>> {
|
||||
return smm.findStateMachines(P::class.java).single()
|
||||
@ -637,17 +641,17 @@ private fun sanitise(message: SessionMessage) = when (message) {
|
||||
is ExistingSessionMessage -> {
|
||||
val payload = message.payload
|
||||
message.copy(
|
||||
recipientSessionId = SessionId(0),
|
||||
payload = when (payload) {
|
||||
is ConfirmSessionMessage -> payload.copy(
|
||||
initiatedSessionId = SessionId(0),
|
||||
initiatedFlowInfo = payload.initiatedFlowInfo.copy(appName = "")
|
||||
)
|
||||
is ErrorSessionMessage -> payload.copy(
|
||||
errorId = 0
|
||||
)
|
||||
else -> payload
|
||||
}
|
||||
recipientSessionId = SessionId(0),
|
||||
payload = when (payload) {
|
||||
is ConfirmSessionMessage -> payload.copy(
|
||||
initiatedSessionId = SessionId(0),
|
||||
initiatedFlowInfo = payload.initiatedFlowInfo.copy(appName = "")
|
||||
)
|
||||
is ErrorSessionMessage -> payload.copy(
|
||||
errorId = 0
|
||||
)
|
||||
else -> payload
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -667,10 +671,12 @@ internal fun TestStartedNode.sendSessionMessage(message: SessionMessage, destina
|
||||
}
|
||||
}
|
||||
|
||||
internal fun errorMessage(errorResponse: FlowException? = null) = ExistingSessionMessage(SessionId(0), ErrorSessionMessage(errorResponse, 0))
|
||||
internal fun errorMessage(errorResponse: FlowException? = null) =
|
||||
ExistingSessionMessage(SessionId(0), ErrorSessionMessage(errorResponse, 0))
|
||||
|
||||
internal infix fun TestStartedNode.sent(message: SessionMessage): Pair<Int, SessionMessage> = Pair(internals.id, message)
|
||||
internal infix fun Pair<Int, SessionMessage>.to(node: TestStartedNode): SessionTransfer = SessionTransfer(first, second, node.network.myAddress)
|
||||
internal infix fun Pair<Int, SessionMessage>.to(node: TestStartedNode): SessionTransfer =
|
||||
SessionTransfer(first, second, node.network.myAddress)
|
||||
|
||||
internal data class SessionTransfer(val from: Int, val message: SessionMessage, val to: MessageRecipients) {
|
||||
val isPayloadTransfer: Boolean
|
||||
@ -785,7 +791,11 @@ internal class MyFlowException(override val message: String) : FlowException() {
|
||||
internal class MyPeerFlowException(override val message: String, val peer: Party) : FlowException()
|
||||
|
||||
@InitiatingFlow
|
||||
internal class SendAndReceiveFlow(private val destination: Destination, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic<Any>() {
|
||||
internal class SendAndReceiveFlow(
|
||||
private val destination: Destination,
|
||||
private val payload: Any,
|
||||
private val otherPartySession: FlowSession? = null
|
||||
) : FlowLogic<Any>() {
|
||||
constructor(otherPartySession: FlowSession, payload: Any) : this(otherPartySession.counterparty, payload, otherPartySession)
|
||||
|
||||
@Suspendable
|
||||
@ -795,7 +805,8 @@ internal class SendAndReceiveFlow(private val destination: Destination, private
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
internal class PingPongFlow(private val otherParty: Party, private val payload: Long, private val otherPartySession: FlowSession? = null) : FlowLogic<Unit>() {
|
||||
internal class PingPongFlow(private val otherParty: Party, private val payload: Long, private val otherPartySession: FlowSession? = null) :
|
||||
FlowLogic<Unit>() {
|
||||
constructor(otherPartySession: FlowSession, payload: Long) : this(otherPartySession.counterparty, payload, otherPartySession)
|
||||
|
||||
@Transient
|
||||
|
@ -3,8 +3,17 @@ ENV GRADLE_USER_HOME=/tmp/gradle
|
||||
RUN mkdir /tmp/gradle && mkdir -p /home/root/.m2/repository
|
||||
|
||||
RUN apt-get update && apt-get install -y curl libatomic1 && \
|
||||
curl -O https://d3pxv6yz143wms.cloudfront.net/8.222.10.1/java-1.8.0-amazon-corretto-jdk_8.222.10-1_amd64.deb && \
|
||||
apt-get install -y java-common && dpkg -i java-1.8.0-amazon-corretto-jdk_8.222.10-1_amd64.deb && \
|
||||
curl -O https://cdn.azul.com/zulu/bin/zulu8.40.0.25-ca-jdk8.0.222-linux_amd64.deb && \
|
||||
apt-get install -y java-common && apt install -y ./zulu8.40.0.25-ca-jdk8.0.222-linux_amd64.deb && \
|
||||
apt-get clean && \
|
||||
mkdir -p /tmp/source
|
||||
rm -f zulu8.40.0.25-ca-jdk8.0.222-linux_amd64.deb && \
|
||||
curl -O https://cdn.azul.com/zulu/bin/zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz && \
|
||||
mv /zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz /usr/lib/jvm/ && \
|
||||
cd /usr/lib/jvm/ && \
|
||||
tar -zxvf zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz && \
|
||||
rm -rf zulu-8-amd64 && \
|
||||
mv zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64 zulu-8-amd64 && \
|
||||
rm -f zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz && \
|
||||
cd / && mkdir -p /tmp/source
|
||||
|
||||
|
||||
|
@ -139,10 +139,12 @@ open class MockServices private constructor(
|
||||
* Makes database and persistent services appropriate for unit tests which require persistence across the vault, identity service
|
||||
* and key managment service.
|
||||
*
|
||||
* @param cordappPackages A [List] of cordapp packages to scan for any cordapp code, e.g. contract verification code, flows and services.
|
||||
* @param cordappPackages A [List] of cordapp packages to scan for any cordapp code, e.g. contract verification code,
|
||||
* flows and services.
|
||||
* @param initialIdentity The first (typically sole) identity the services will represent.
|
||||
* @param moreKeys A list of additional [KeyPair] instances to be used by [MockServices].
|
||||
* @param moreIdentities A list of additional [KeyPair] instances to be used by [MockServices].
|
||||
* @param cacheFactory A custom cache factory to be used by the created [IdentityService]
|
||||
* @return A pair where the first element is the instance of [CordaPersistence] and the second is [MockServices].
|
||||
*/
|
||||
@JvmStatic
|
||||
@ -152,12 +154,13 @@ open class MockServices private constructor(
|
||||
initialIdentity: TestIdentity,
|
||||
networkParameters: NetworkParameters = testNetworkParameters(modifiedTime = Instant.MIN),
|
||||
moreKeys: Set<KeyPair>,
|
||||
moreIdentities: Set<PartyAndCertificate>
|
||||
moreIdentities: Set<PartyAndCertificate>,
|
||||
cacheFactory: TestingNamedCacheFactory = TestingNamedCacheFactory()
|
||||
): Pair<CordaPersistence, MockServices> {
|
||||
val cordappLoader = cordappLoaderForPackages(cordappPackages)
|
||||
val dataSourceProps = makeTestDataSourceProperties()
|
||||
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
|
||||
val identityService = PersistentIdentityService(TestingNamedCacheFactory())
|
||||
val identityService = PersistentIdentityService(cacheFactory)
|
||||
val persistence = configureDatabase(
|
||||
hikariProperties = dataSourceProps,
|
||||
databaseConfig = DatabaseConfig(),
|
||||
@ -167,7 +170,7 @@ open class MockServices private constructor(
|
||||
internalSchemas = schemaService.internalSchemas()
|
||||
)
|
||||
|
||||
val pkToIdCache = PublicKeyToOwningIdentityCacheImpl(persistence, TestingNamedCacheFactory())
|
||||
val pkToIdCache = PublicKeyToOwningIdentityCacheImpl(persistence, cacheFactory)
|
||||
|
||||
// Create a persistent identity service and add all the supplied identities.
|
||||
identityService.apply {
|
||||
|
@ -6,6 +6,7 @@ import net.corda.networkbuilder.nodes.FoundNode
|
||||
import net.corda.networkbuilder.nodes.NodeCopier
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.File
|
||||
import java.nio.file.Paths
|
||||
|
||||
class NotaryCopier(private val cacheDir: File) : NodeCopier(cacheDir) {
|
||||
|
||||
@ -28,7 +29,9 @@ class NotaryCopier(private val cacheDir: File) : NodeCopier(cacheDir) {
|
||||
|
||||
fun generateNodeInfo(dirToGenerateFrom: File): File {
|
||||
val nodeInfoGeneratorProcess = ProcessBuilder()
|
||||
.command(listOf("java", "-jar", "corda.jar", "generate-node-info"))
|
||||
.command(listOf(
|
||||
Paths.get(System.getProperty("java.home"), "bin", "java").toString(),
|
||||
"-jar", "corda.jar", "generate-node-info"))
|
||||
.directory(dirToGenerateFrom)
|
||||
.inheritIO()
|
||||
.start()
|
||||
|
@ -13,6 +13,7 @@ import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||
import net.corda.client.rpc.GracefulReconnect
|
||||
import net.corda.client.rpc.PermissionException
|
||||
import net.corda.client.rpc.notUsed
|
||||
import net.corda.core.CordaException
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.contracts.UniqueIdentifier
|
||||
@ -70,6 +71,7 @@ import kotlin.concurrent.thread
|
||||
// TODO: Resurrect or reimplement the mail plugin.
|
||||
// TODO: Make it notice new shell commands added after the node started.
|
||||
|
||||
@Suppress("MaxLineLength")
|
||||
object InteractiveShell {
|
||||
private val log = LoggerFactory.getLogger(javaClass)
|
||||
private lateinit var rpcOps: (username: String, password: String) -> InternalCordaRPCOps
|
||||
@ -521,8 +523,11 @@ object InteractiveShell {
|
||||
val parser = StringToMethodCallParser(CordaRPCOps::class.java, inputObjectMapper)
|
||||
val call = parser.parse(cordaRPCOps, cmd)
|
||||
result = call.call()
|
||||
var subscription : Subscriber<*>? = null
|
||||
if (result != null && result !== kotlin.Unit && result !is Void) {
|
||||
result = printAndFollowRPCResponse(result, out, outputFormat)
|
||||
val (subs, future) = printAndFollowRPCResponse(result, out, outputFormat)
|
||||
subscription = subs
|
||||
result = future
|
||||
}
|
||||
if (result is Future<*>) {
|
||||
if (!result.isDone) {
|
||||
@ -532,6 +537,7 @@ object InteractiveShell {
|
||||
try {
|
||||
result = result.get()
|
||||
} catch (e: InterruptedException) {
|
||||
subscription?.unsubscribe()
|
||||
Thread.currentThread().interrupt()
|
||||
} catch (e: ExecutionException) {
|
||||
throw e.rootCause
|
||||
@ -621,7 +627,11 @@ object InteractiveShell {
|
||||
}
|
||||
}
|
||||
|
||||
private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter, outputFormat: OutputFormat): CordaFuture<Unit> {
|
||||
private fun printAndFollowRPCResponse(
|
||||
response: Any?,
|
||||
out: PrintWriter,
|
||||
outputFormat: OutputFormat
|
||||
): Pair<PrintingSubscriber?, CordaFuture<Unit>> {
|
||||
val outputMapper = createOutputMapper(outputFormat)
|
||||
|
||||
val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) }
|
||||
@ -659,34 +669,52 @@ object InteractiveShell {
|
||||
}
|
||||
}
|
||||
|
||||
private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> {
|
||||
private fun maybeFollow(
|
||||
response: Any?,
|
||||
printerFun: (Any?) -> String,
|
||||
out: PrintWriter
|
||||
): Pair<PrintingSubscriber?, CordaFuture<Unit>> {
|
||||
// Match on a couple of common patterns for "important" observables. It's tough to do this in a generic
|
||||
// way because observables can be embedded anywhere in the object graph, and can emit other arbitrary
|
||||
// object graphs that contain yet more observables. So we just look for top level responses that follow
|
||||
// the standard "track" pattern, and print them until the user presses Ctrl-C
|
||||
if (response == null) return doneFuture(Unit)
|
||||
var result = Pair<PrintingSubscriber?, CordaFuture<Unit>>(null, doneFuture(Unit))
|
||||
|
||||
if (response is DataFeed<*, *>) {
|
||||
out.println("Snapshot:")
|
||||
out.println(printerFun(response.snapshot))
|
||||
out.flush()
|
||||
out.println("Updates:")
|
||||
return printNextElements(response.updates, printerFun, out)
|
||||
|
||||
when {
|
||||
response is DataFeed<*, *> -> {
|
||||
out.println("Snapshot:")
|
||||
out.println(printerFun(response.snapshot))
|
||||
out.flush()
|
||||
out.println("Updates:")
|
||||
|
||||
val unsubscribeAndPrint: (Any?) -> String = { resp ->
|
||||
if (resp is StateMachineUpdate.Added) {
|
||||
resp.stateMachineInfo.progressTrackerStepAndUpdates?.updates?.notUsed()
|
||||
}
|
||||
printerFun(resp)
|
||||
}
|
||||
|
||||
result = printNextElements(response.updates, unsubscribeAndPrint, out)
|
||||
}
|
||||
response is Observable<*> -> {
|
||||
result = printNextElements(response, printerFun, out)
|
||||
}
|
||||
response != null -> {
|
||||
out.println(printerFun(response))
|
||||
}
|
||||
}
|
||||
if (response is Observable<*>) {
|
||||
|
||||
return printNextElements(response, printerFun, out)
|
||||
}
|
||||
|
||||
out.println(printerFun(response))
|
||||
return doneFuture(Unit)
|
||||
return result
|
||||
}
|
||||
|
||||
private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> {
|
||||
|
||||
private fun printNextElements(
|
||||
elements: Observable<*>,
|
||||
printerFun: (Any?) -> String,
|
||||
out: PrintWriter
|
||||
): Pair<PrintingSubscriber?, CordaFuture<Unit>> {
|
||||
val subscriber = PrintingSubscriber(printerFun, out)
|
||||
uncheckedCast(elements).subscribe(subscriber)
|
||||
return subscriber.future
|
||||
return Pair(subscriber, subscriber.future)
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -177,7 +177,7 @@ abstract class ANSIProgressRenderer {
|
||||
ansi.fgRed()
|
||||
ansi.a("${IntStream.range(indent, indent).mapToObj { "\t" }.toList().joinToString(separator = "") { s -> s }} $errorIcon ${error.message}")
|
||||
ansi.reset()
|
||||
errorToPrint = error.cause
|
||||
errorToPrint = errorToPrint.cause
|
||||
indent++
|
||||
}
|
||||
ansi.eraseLine(Ansi.Erase.FORWARD)
|
||||
|
Loading…
x
Reference in New Issue
Block a user