diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 354adb1db8..a34d83e643 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -2464,7 +2464,6 @@ public interface net.corda.core.flows.IdentifiableException @Nullable public Long getErrorId() ## -@CordaSerializable public final class net.corda.core.flows.IllegalFlowLogicException extends java.lang.IllegalArgumentException public (Class, String) public (String, String) diff --git a/.ci/dev/unit/Jenkinsfile b/.ci/dev/unit/Jenkinsfile index 67d5e16e2b..9bd4c4243b 100644 --- a/.ci/dev/unit/Jenkinsfile +++ b/.ci/dev/unit/Jenkinsfile @@ -29,20 +29,18 @@ pipeline { } } - stage('Corda Pull Request - Run Tests') { - stage('Unit Tests') { - steps { - sh "./gradlew " + - "-DbuildId=\"\${BUILD_ID}\" " + - "-Dkubenetize=true " + - "-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" + - " allParallelUnitTest" - if (env.CHANGE_ID) { - pullRequest.createStatus(status: 'success', - context: 'continuous-integration/jenkins/pr-merge/unitTest', - description: 'Unit Tests Passed', - targetUrl: "${env.JOB_URL}/testResults") - } + stage('Unit Tests') { + steps { + sh "./gradlew " + + "-DbuildId=\"\${BUILD_ID}\" " + + "-Dkubenetize=true " + + "-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" + + " allParallelUnitTest" + if (env.CHANGE_ID) { + pullRequest.createStatus(status: 'success', + context: 'continuous-integration/jenkins/pr-merge/unitTest', + description: 'Unit Tests Passed', + targetUrl: "${env.JOB_URL}/testResults") } } } diff --git a/Jenkinsfile b/Jenkinsfile index d19be16f6e..c43acc1037 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -1,3 +1,4 @@ +import static com.r3.build.BuildControl.killAllExistingBuildsForJob @Library('existing-build-control') import static com.r3.build.BuildControl.killAllExistingBuildsForJob @@ -39,15 +40,6 @@ pipeline { " allParallelIntegrationTest" } } - stage('Unit Tests') { - steps { - sh "./gradlew " + - "-DbuildId=\"\${BUILD_ID}\" " + - "-Dkubenetize=true " + - "-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" + - " allParallelUnitTest" - } - } } } diff --git a/build.gradle b/build.gradle index c5d1822e76..074e3e5811 100644 --- a/build.gradle +++ b/build.gradle @@ -612,6 +612,14 @@ task allParallelSlowIntegrationTest(type: ParallelTestGroup) { memoryInGbPerFork 10 distribute Distribution.CLASS } +task allParallelSmokeTest(type: ParallelTestGroup) { + testGroups "smokeTest" + numberOfShards 4 + streamOutput true + coresPerFork 6 + memoryInGbPerFork 10 + distribute Distribution.METHOD +} task allParallelUnitTest(type: ParallelTestGroup) { testGroups "test" numberOfShards 10 @@ -623,7 +631,15 @@ task allParallelUnitTest(type: ParallelTestGroup) { task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) { testGroups "test", "integrationTest" numberOfShards 15 - streamOutput true + streamOutput false + coresPerFork 6 + memoryInGbPerFork 10 + distribute Distribution.CLASS +} +task parallelRegressionTest(type: ParallelTestGroup) { + testGroups "test", "integrationTest", "slowIntegrationTest", "smokeTest" + numberOfShards 5 + streamOutput false coresPerFork 6 memoryInGbPerFork 10 distribute Distribution.CLASS diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle index 87468c4f89..0e2e78274d 100644 --- a/buildSrc/build.gradle +++ b/buildSrc/build.gradle @@ -23,15 +23,7 @@ allprojects { } } -configurations { - runtime -} - dependencies { - // Add the top-level projects ONLY to the host project. - runtime project.childProjects.collect { n, p -> - project(p.path) - } compile gradleApi() compile "io.fabric8:kubernetes-client:4.4.1" compile 'org.apache.commons:commons-compress:1.19' diff --git a/buildSrc/src/main/groovy/net/corda/testing/BucketingAllocator.java b/buildSrc/src/main/groovy/net/corda/testing/BucketingAllocator.java index 0df56553c2..d75bd4828d 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/BucketingAllocator.java +++ b/buildSrc/src/main/groovy/net/corda/testing/BucketingAllocator.java @@ -51,13 +51,12 @@ public class BucketingAllocator { System.out.println("Number of tests: " + container.testsForFork.stream().mapToInt(b -> b.foundTests.size()).sum()); System.out.println("Tests to Run: "); container.testsForFork.forEach(tb -> { - System.out.println(tb.nameWithAsterix); + System.out.println(tb.testName); tb.foundTests.forEach(ft -> System.out.println("\t" + ft.getFirst() + ", " + ft.getSecond())); }); }); } - private void allocateTestsToForks(@NotNull List matchedTests) { matchedTests.forEach(matchedTestBucket -> { TestsForForkContainer smallestContainer = Collections.min(forkContainers, Comparator.comparing(TestsForForkContainer::getCurrentDuration)); @@ -69,10 +68,9 @@ public class BucketingAllocator { return allDiscoveredTests.stream().map(tuple -> { String testName = tuple.getFirst(); Object task = tuple.getSecond(); - String noAsterixName = testName.substring(0, testName.length() - 1); //2DO [can this filtering algorithm be improved - the test names are sorted, it should be possible to do something using binary search] - List> matchingTests = allTestsFromCSV.stream().filter(testFromCSV -> testFromCSV.getFirst().startsWith(noAsterixName)).collect(Collectors.toList()); - return new TestBucket(task, testName, noAsterixName, matchingTests); + List> matchingTests = allTestsFromCSV.stream().filter(testFromCSV -> testFromCSV.getFirst().startsWith(testName)).collect(Collectors.toList()); + return new TestBucket(task, testName, matchingTests); }).sorted(Comparator.comparing(TestBucket::getDuration).reversed()).collect(Collectors.toList()); } @@ -81,22 +79,20 @@ public class BucketingAllocator { TestLister lister = source.getFirst(); Object testTask = source.getSecond(); return lister.getAllTestsDiscovered().stream().map(test -> new Tuple2<>(test, testTask)).collect(Collectors.toList()); - }).flatMap(Collection::stream).collect(Collectors.toList()); + }).flatMap(Collection::stream).sorted(Comparator.comparing(Tuple2::getFirst)).collect(Collectors.toList()); } public static class TestBucket { final Object testTask; - final String nameWithAsterix; - final String nameWithoutAsterix; + final String testName; final List> foundTests; final Double duration; - public TestBucket(Object testTask, String nameWithAsterix, String nameWithoutAsterix, List> foundTests) { + public TestBucket(Object testTask, String testName, List> foundTests) { this.testTask = testTask; - this.nameWithAsterix = nameWithAsterix; - this.nameWithoutAsterix = nameWithoutAsterix; + this.testName = testName; this.foundTests = foundTests; - duration = Math.max(foundTests.stream().mapToDouble(tp -> Math.max(tp.getSecond(), 10)).sum(), 10); + duration = Math.max(foundTests.stream().mapToDouble(tp -> Math.max(tp.getSecond(), 1)).sum(), 1); } public Double getDuration() { @@ -107,8 +103,7 @@ public class BucketingAllocator { public String toString() { return "TestBucket{" + "testTask=" + testTask + - ", nameWithAsterix='" + nameWithAsterix + '\'' + - ", nameWithoutAsterix='" + nameWithoutAsterix + '\'' + + ", nameWithAsterix='" + testName + '\'' + ", foundTests=" + foundTests + ", duration=" + duration + '}'; @@ -142,7 +137,7 @@ public class BucketingAllocator { } public List getTestsForTask(Object task) { - return frozenTests.getOrDefault(task, Collections.emptyList()).stream().map(it -> it.nameWithAsterix).collect(Collectors.toList()); + return frozenTests.getOrDefault(task, Collections.emptyList()).stream().map(it -> it.testName).collect(Collectors.toList()); } public List getBucketsForFork() { diff --git a/buildSrc/src/main/groovy/net/corda/testing/BucketingAllocatorTask.java b/buildSrc/src/main/groovy/net/corda/testing/BucketingAllocatorTask.java index 6f073a5fc2..37b9b4a0e6 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/BucketingAllocatorTask.java +++ b/buildSrc/src/main/groovy/net/corda/testing/BucketingAllocatorTask.java @@ -41,8 +41,8 @@ public class BucketingAllocatorTask extends DefaultTask { this.dependsOn(source); } - public List getTestsForForkAndTestTask(Integer fork, Test testTask) { - return allocator.getTestsForForkAndTestTask(fork, testTask); + public List getTestIncludesForForkAndTestTask(Integer fork, Test testTask) { + return allocator.getTestsForForkAndTestTask(fork, testTask).stream().map(t -> t + "*").collect(Collectors.toList()); } @TaskAction @@ -56,11 +56,11 @@ public class BucketingAllocatorTask extends DefaultTask { String duration = "Duration(ms)"; List records = CSVFormat.DEFAULT.withHeader().parse(reader).getRecords(); return records.stream().map(record -> { - try{ + try { String testName = record.get(name); String testDuration = record.get(duration); - return new Tuple2<>(testName, Math.max(Double.parseDouble(testDuration), 10)); - }catch (IllegalArgumentException | IllegalStateException e){ + return new Tuple2<>(testName, Math.max(Double.parseDouble(testDuration), 1)); + } catch (IllegalArgumentException | IllegalStateException e) { return null; } }).filter(Objects::nonNull).sorted(Comparator.comparing(Tuple2::getFirst)).collect(Collectors.toList()); diff --git a/buildSrc/src/main/groovy/net/corda/testing/DistributedTesting.groovy b/buildSrc/src/main/groovy/net/corda/testing/DistributedTesting.groovy index bd3809d200..c6dfc2d5a2 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/DistributedTesting.groovy +++ b/buildSrc/src/main/groovy/net/corda/testing/DistributedTesting.groovy @@ -131,7 +131,7 @@ class DistributedTesting implements Plugin { filter { def fork = getPropertyAsInt(subProject, "dockerFork", 0) subProject.logger.info("requesting tests to include in testing task ${task.getPath()} (idx: ${fork})") - List includes = globalAllocator.getTestsForForkAndTestTask( + List includes = globalAllocator.getTestIncludesForForkAndTestTask( fork, task) subProject.logger.info "got ${includes.size()} tests to include into testing task ${task.getPath()}" diff --git a/buildSrc/src/main/groovy/net/corda/testing/KubesTest.java b/buildSrc/src/main/groovy/net/corda/testing/KubesTest.java new file mode 100644 index 0000000000..4d50730289 --- /dev/null +++ b/buildSrc/src/main/groovy/net/corda/testing/KubesTest.java @@ -0,0 +1,339 @@ +package net.corda.testing; + +import io.fabric8.kubernetes.api.model.*; +import io.fabric8.kubernetes.client.*; +import io.fabric8.kubernetes.client.dsl.ExecListener; +import io.fabric8.kubernetes.client.dsl.ExecWatch; +import io.fabric8.kubernetes.client.utils.Serialization; +import okhttp3.Response; +import org.gradle.api.DefaultTask; +import org.gradle.api.tasks.TaskAction; +import org.jetbrains.annotations.NotNull; + +import java.io.*; +import java.math.BigInteger; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class KubesTest extends DefaultTask { + + static final ExecutorService executorService = Executors.newCachedThreadPool(); + static final ExecutorService singleThreadedExecutor = Executors.newSingleThreadExecutor(); + + String dockerTag; + String fullTaskToExecutePath; + String taskToExecuteName; + Boolean printOutput = false; + Integer numberOfCoresPerFork = 4; + Integer memoryGbPerFork = 6; + public volatile List testOutput = Collections.emptyList(); + public volatile List containerResults = Collections.emptyList(); + + String namespace = "thisisatest"; + int k8sTimeout = 50 * 1_000; + int webSocketTimeout = k8sTimeout * 6; + int numberOfPods = 20; + int timeoutInMinutesForPodToStart = 60; + + Distribution distribution = Distribution.METHOD; + + @TaskAction + public void runDistributedTests() { + String buildId = System.getProperty("buildId", "0"); + String currentUser = System.getProperty("user.name", "UNKNOWN_USER"); + + String stableRunId = new BigInteger(64, new Random(buildId.hashCode() + currentUser.hashCode() + taskToExecuteName.hashCode())).toString(36).toLowerCase(); + String suffix = new BigInteger(64, new Random()).toString(36).toLowerCase(); + + final KubernetesClient client = getKubernetesClient(); + + + try { + client.pods().inNamespace(namespace).list().getItems().forEach(podToDelete -> { + if (podToDelete.getMetadata().getName().contains(stableRunId)) { + getProject().getLogger().lifecycle("deleting: " + podToDelete.getMetadata().getName()); + client.resource(podToDelete).delete(); + } + }); + } catch (Exception ignored) { + //it's possible that a pod is being deleted by the original build, this can lead to racey conditions + } + + List> futures = IntStream.range(0, numberOfPods).mapToObj(i -> { + String potentialPodName = (taskToExecuteName + "-" + stableRunId + suffix + i).toLowerCase(); + String podName = potentialPodName.substring(0, Math.min(potentialPodName.length(), 62)); + return runBuild(client, namespace, numberOfPods, i, podName, printOutput, 3); + }).collect(Collectors.toList()); + + this.testOutput = Collections.synchronizedList(futures.stream().map(it -> { + try { + return it.get().getBinaryResults(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }).flatMap(Collection::stream).collect(Collectors.toList())); + this.containerResults = futures.stream().map(it -> { + try { + return it.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + } + + @NotNull + KubernetesClient getKubernetesClient() { + io.fabric8.kubernetes.client.Config config = new io.fabric8.kubernetes.client.ConfigBuilder() + .withConnectionTimeout(k8sTimeout) + .withRequestTimeout(k8sTimeout) + .withRollingTimeout(k8sTimeout) + .withWebsocketTimeout(webSocketTimeout) + .withWebsocketPingInterval(webSocketTimeout) + .build(); + + return new DefaultKubernetesClient(config); + } + + CompletableFuture runBuild(KubernetesClient client, + String namespace, + int numberOfPods, + int podIdx, + String podName, + boolean printOutput, + int numberOfRetries) { + + CompletableFuture toReturn = new CompletableFuture<>(); + executorService.submit(() -> { + int tryCount = 0; + Pod createdPod = null; + while (tryCount < numberOfRetries) { + try { + Pod podRequest = buildPod(podName); + getProject().getLogger().lifecycle("requesting pod: " + podName); + createdPod = client.pods().inNamespace(namespace).create(podRequest); + getProject().getLogger().lifecycle("scheduled pod: " + podName); + File outputFile = Files.createTempFile("container", ".log").toFile(); + attachStatusListenerToPod(client, namespace, podName); + schedulePodForDeleteOnShutdown(podName, client, createdPod); + waitForPodToStart(podName, client, namespace); + PipedOutputStream stdOutOs = new PipedOutputStream(); + PipedInputStream stdOutIs = new PipedInputStream(4096); + ByteArrayOutputStream errChannelStream = new ByteArrayOutputStream(); + KubePodResult result = new KubePodResult(createdPod, outputFile); + CompletableFuture waiter = new CompletableFuture<>(); + ExecListener execListener = buildExecListenerForPod(podName, errChannelStream, waiter, result); + stdOutIs.connect(stdOutOs); + String[] buildCommand = getBuildCommand(numberOfPods, podIdx); + ExecWatch execWatch = client.pods().inNamespace(namespace).withName(podName) + .writingOutput(stdOutOs) + .writingErrorChannel(errChannelStream) + .usingListener(execListener).exec(buildCommand); + + startLogPumping(outputFile, stdOutIs, podIdx, printOutput); + KubePodResult execResult = waiter.join(); + getLogger().lifecycle("build has ended on on pod " + podName + " (" + podIdx + "/" + numberOfPods + ")"); + getLogger().lifecycle("Gathering test results from " + execResult.getCreatedPod().getMetadata().getName()); + Collection binaryResults = downloadTestXmlFromPod(client, namespace, execResult.getCreatedPod()); + getLogger().lifecycle("deleting: " + execResult.getCreatedPod().getMetadata().getName()); + client.resource(execResult.getCreatedPod()).delete(); + result.setBinaryResults(binaryResults); + toReturn.complete(result); + break; + } catch (Exception e) { + getLogger().error("Encountered error during testing cycle on pod " + podName + " (" + podIdx / numberOfPods + ")", e); + try { + if (createdPod != null) { + client.pods().delete(createdPod); + while (client.pods().inNamespace(namespace).list().getItems().stream().anyMatch(p -> Objects.equals(p.getMetadata().getName(), podName))) { + getLogger().warn("pod " + podName + " has not been deleted, waiting 1s"); + Thread.sleep(1000); + } + } + } catch (Exception ignored) { + } + tryCount++; + getLogger().lifecycle("will retry ${podName} another ${numberOfRetries - tryCount} times"); + } + } + if (tryCount >= numberOfRetries) { + toReturn.completeExceptionally(new RuntimeException("Failed to build in pod ${podName} (${podIdx}/${numberOfPods}) within retry limit")); + } + }); + return toReturn; + } + + + Pod buildPod(String podName) { + return new PodBuilder().withNewMetadata().withName(podName).endMetadata() + .withNewSpec() + .addNewVolume() + .withName("gradlecache") + .withNewHostPath() + .withPath("/tmp/gradle") + .withType("DirectoryOrCreate") + .endHostPath() + .endVolume() + .addNewContainer() + .withImage(dockerTag) + .withCommand("bash") + .withArgs("-c", "sleep 3600") + .addNewEnv() + .withName("DRIVER_NODE_MEMORY") + .withValue("1024m") + .withName("DRIVER_WEB_MEMORY") + .withValue("1024m") + .endEnv() + .withName(podName) + .withNewResources() + .addToRequests("cpu", new Quantity(numberOfCoresPerFork.toString())) + .addToRequests("memory", new Quantity(memoryGbPerFork.toString() + "Gi")) + .endResources() + .addNewVolumeMount() + .withName("gradlecache") + .withMountPath("/tmp/gradle") + .endVolumeMount() + .endContainer() + .withImagePullSecrets(new LocalObjectReference("regcred")) + .withRestartPolicy("Never") + .endSpec() + .build(); + } + + void startLogPumping(File outputFile, InputStream stdOutIs, Integer podIdx, boolean printOutput) { + Thread loggingThread = new Thread(() -> { + try (BufferedWriter out = new BufferedWriter(new FileWriter(outputFile)); + BufferedReader br = new BufferedReader(new InputStreamReader(stdOutIs))) { + String line; + while ((line = br.readLine()) != null) { + String toWrite = ("Container" + podIdx + ": " + line).trim(); + if (printOutput) { + getProject().getLogger().lifecycle(toWrite); + } + out.write(line); + out.newLine(); + } + } catch (IOException ignored) { + } + }); + + loggingThread.setDaemon(true); + loggingThread.start(); + } + + Watch attachStatusListenerToPod(KubernetesClient client, String namespace, String podName) { + return client.pods().inNamespace(namespace).withName(podName).watch(new Watcher() { + @Override + public void eventReceived(Watcher.Action action, Pod resource) { + getProject().getLogger().lifecycle("[StatusChange] pod " + resource.getMetadata().getName() + " " + action.name() + " (" + resource.getStatus().getPhase() + ")"); + } + + @Override + public void onClose(KubernetesClientException cause) { + } + }); + } + + void waitForPodToStart(String podName, KubernetesClient client, String namespace) { + getProject().getLogger().lifecycle("Waiting for pod " + podName + " to start before executing build"); + try { + client.pods().inNamespace(namespace).withName(podName).waitUntilReady(timeoutInMinutesForPodToStart, TimeUnit.MINUTES); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + getProject().getLogger().lifecycle("pod " + podName + " has started, executing build"); + } + + Collection downloadTestXmlFromPod(KubernetesClient client, String namespace, Pod cp) { + String resultsInContainerPath = "/tmp/source/build/test-reports"; + String binaryResultsFile = "results.bin"; + String podName = cp.getMetadata().getName(); + Path tempDir = new File(new File(getProject().getBuildDir(), "test-results-xml"), podName).toPath(); + + if (!tempDir.toFile().exists()) { + tempDir.toFile().mkdirs(); + } + + getProject().getLogger().lifecycle("Saving " + podName + " results to: " + tempDir.toAbsolutePath().toFile().getAbsolutePath()); + client.pods() + .inNamespace(namespace) + .withName(podName) + .dir(resultsInContainerPath) + .copy(tempDir); + + return findFolderContainingBinaryResultsFile(new File(tempDir.toFile().getAbsolutePath()), binaryResultsFile); + } + + String[] getBuildCommand(int numberOfPods, int podIdx) { + String shellScript = "let x=1 ; while [ ${x} -ne 0 ] ; do echo \"Waiting for DNS\" ; curl services.gradle.org > /dev/null 2>&1 ; x=$? ; sleep 1 ; done ; " + "cd /tmp/source ; " + + "let y=1 ; while [ ${y} -ne 0 ] ; do echo \"Preparing build directory\" ; ./gradlew testClasses integrationTestClasses --parallel 2>&1 ; y=$? ; sleep 1 ; done ;" + + "./gradlew -D" + ListTests.DISTRIBUTION_PROPERTY + "=" + distribution.name() + " -Dkubenetize -PdockerFork=" + podIdx + " -PdockerForks=" + numberOfPods + " " + fullTaskToExecutePath + " --info 2>&1 ;" + + "let rs=$? ; sleep 10 ; exit ${rs}"; + return new String[]{"bash", "-c", shellScript}; + } + + List findFolderContainingBinaryResultsFile(File start, String fileNameToFind) { + Queue filesToInspect = new LinkedList<>(Collections.singletonList(start)); + List folders = new ArrayList<>(); + while (!filesToInspect.isEmpty()) { + File fileToInspect = filesToInspect.poll(); + if (fileToInspect.getAbsolutePath().endsWith(fileNameToFind)) { + folders.add(fileToInspect.getParentFile()); + } + + if (fileToInspect.isDirectory()) { + filesToInspect.addAll(Arrays.stream(fileToInspect.listFiles()).collect(Collectors.toList())); + } + } + return folders; + } + + void schedulePodForDeleteOnShutdown(String podName, KubernetesClient client, Pod createdPod) { + getProject().getLogger().info("attaching shutdown hook for pod " + podName); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("Deleting pod: " + podName); + client.pods().delete(createdPod); + })); + } + + ExecListener buildExecListenerForPod(String podName, ByteArrayOutputStream errChannelStream, CompletableFuture waitingFuture, KubePodResult result) { + + return new ExecListener() { + final Long start = System.currentTimeMillis(); + + @Override + public void onOpen(Response response) { + getProject().getLogger().lifecycle("Build started on pod " + podName); + } + + @Override + public void onFailure(Throwable t, Response response) { + getProject().getLogger().lifecycle("Received error from rom pod " + podName); + waitingFuture.completeExceptionally(t); + } + + @Override + public void onClose(int code, String reason) { + getProject().getLogger().lifecycle("Received onClose() from pod " + podName + " , build took: " + ((System.currentTimeMillis() - start) / 1000) + " seconds"); + try { + String errChannelContents = errChannelStream.toString(); + Status status = Serialization.unmarshal(errChannelContents, Status.class); + Integer resultCode = Optional.ofNullable(status).map(Status::getDetails) + .map(StatusDetails::getCauses) + .flatMap(c -> c.stream().findFirst()) + .map(StatusCause::getMessage) + .map(Integer::parseInt).orElse(0); + result.setResultCode(resultCode); + waitingFuture.complete(result); + } catch (Exception e) { + waitingFuture.completeExceptionally(e); + } + } + }; + } + + +} diff --git a/buildSrc/src/main/groovy/net/corda/testing/ListTests.groovy b/buildSrc/src/main/groovy/net/corda/testing/ListTests.groovy index 09c6290e90..b1836d1eec 100644 --- a/buildSrc/src/main/groovy/net/corda/testing/ListTests.groovy +++ b/buildSrc/src/main/groovy/net/corda/testing/ListTests.groovy @@ -79,7 +79,7 @@ class ListTests extends DefaultTask implements TestLister { .collect { c -> (c.getSubclasses() + Collections.singletonList(c)) } .flatten() .collect { ClassInfo c -> - c.getMethodInfo().filter { m -> m.hasAnnotation("org.junit.Test") }.collect { m -> c.name + "." + m.name + "*" } + c.getMethodInfo().filter { m -> m.hasAnnotation("org.junit.Test") }.collect { m -> c.name + "." + m.name } }.flatten() .toSet() @@ -97,7 +97,7 @@ class ListTests extends DefaultTask implements TestLister { .getClassesWithMethodAnnotation("org.junit.Test") .collect { c -> (c.getSubclasses() + Collections.singletonList(c)) } .flatten() - .collect { ClassInfo c -> c.name + "*" }.flatten() + .collect { ClassInfo c -> c.name }.flatten() .toSet() this.allTests = results.stream().sorted().collect(Collectors.toList()) break diff --git a/buildSrc/src/test/java/net/corda/testing/BucketingAllocatorTest.java b/buildSrc/src/test/java/net/corda/testing/BucketingAllocatorTest.java index 7a7c239489..1b13eb086b 100644 --- a/buildSrc/src/test/java/net/corda/testing/BucketingAllocatorTest.java +++ b/buildSrc/src/test/java/net/corda/testing/BucketingAllocatorTest.java @@ -21,12 +21,12 @@ public class BucketingAllocatorTest { BucketingAllocator bucketingAllocator = new BucketingAllocator(1, Collections::emptyList); Object task = new Object(); - bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass*", "AnotherTestingClass*"), task); + bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass", "AnotherTestingClass"), task); bucketingAllocator.generateTestPlan(); List testsForForkAndTestTask = bucketingAllocator.getTestsForForkAndTestTask(0, task); - Assert.assertThat(testsForForkAndTestTask, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass*", "AnotherTestingClass*")); + Assert.assertThat(testsForForkAndTestTask, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass", "AnotherTestingClass")); } @@ -37,7 +37,7 @@ public class BucketingAllocatorTest { BucketingAllocator bucketingAllocator = new BucketingAllocator(2, Collections::emptyList); Object task = new Object(); - bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass*", "AnotherTestingClass*"), task); + bucketingAllocator.addSource(() -> Arrays.asList("SomeTestingClass", "AnotherTestingClass"), task); bucketingAllocator.generateTestPlan(); List testsForForkOneAndTestTask = bucketingAllocator.getTestsForForkAndTestTask(0, task); @@ -48,7 +48,7 @@ public class BucketingAllocatorTest { List allTests = Stream.of(testsForForkOneAndTestTask, testsForForkTwoAndTestTask).flatMap(Collection::stream).collect(Collectors.toList()); - Assert.assertThat(allTests, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass*", "AnotherTestingClass*")); + Assert.assertThat(allTests, IsIterableContainingInAnyOrder.containsInAnyOrder("SomeTestingClass", "AnotherTestingClass")); } } \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 7c1d7cdc2d..ee2f5052a6 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -1,10 +1,10 @@ package net.corda.client.rpc.internal -import co.paralleluniverse.common.util.SameThreadExecutor import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine import com.github.benmanes.caffeine.cache.RemovalCause import com.github.benmanes.caffeine.cache.RemovalListener +import com.google.common.util.concurrent.MoreExecutors import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.ThreadFactoryBuilder import net.corda.client.rpc.ConnectionFailureException @@ -132,7 +132,10 @@ class RPCClientProxyHandler( private var sendExecutor: ExecutorService? = null // A sticky pool for running Observable.onNext()s. We need the stickiness to preserve the observation ordering. - private val observationExecutorThreadFactory = ThreadFactoryBuilder().setNameFormat("rpc-client-observation-pool-%d").setDaemon(true).build() + private val observationExecutorThreadFactory = ThreadFactoryBuilder() + .setNameFormat("rpc-client-observation-pool-%d") + .setDaemon(true) + .build() private val observationExecutorPool = LazyStickyPool(rpcConfiguration.observationExecutorPoolSize) { Executors.newFixedThreadPool(1, observationExecutorThreadFactory) } @@ -156,12 +159,14 @@ class RPCClientProxyHandler( private val observablesToReap = ThreadBox(object { var observables = ArrayList() }) - private val serializationContextWithObservableContext = RpcClientObservableDeSerializer.createContext(serializationContext, observableContext) + private val serializationContextWithObservableContext = RpcClientObservableDeSerializer + .createContext(serializationContext, observableContext) private fun createRpcObservableMap(): RpcObservableMap { val onObservableRemove = RemovalListener>> { key, _, cause -> val observableId = key!! val rpcCallSite: CallSite? = callSiteMap?.remove(observableId) + if (cause == RemovalCause.COLLECTED) { log.warn(listOf( "A hot observable returned from an RPC was never subscribed to.", @@ -175,7 +180,13 @@ class RPCClientProxyHandler( } observablesToReap.locked { observables.add(observableId) } } - return cacheFactory.buildNamed(Caffeine.newBuilder().weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()), "RpcClientProxyHandler_rpcObservable") + return cacheFactory.buildNamed( + Caffeine.newBuilder() + .weakValues() + .removalListener(onObservableRemove) + .executor(MoreExecutors.directExecutor()), + "RpcClientProxyHandler_rpcObservable" + ) } private var sessionFactory: ClientSessionFactory? = null diff --git a/constants.properties b/constants.properties index f08b087465..0352e9178b 100644 --- a/constants.properties +++ b/constants.properties @@ -32,3 +32,5 @@ metricsVersion=4.1.0 metricsNewRelicVersion=1.1.1 openSourceBranch=https://github.com/corda/corda/blob/master openSourceSamplesBranch=https://github.com/corda/samples/blob/master +jolokiaAgentVersion=1.6.1 + diff --git a/core-tests/src/test/kotlin/net/corda/coretests/contracts/TransactionVerificationExceptionSerialisationTests.kt b/core-tests/src/test/kotlin/net/corda/coretests/contracts/TransactionVerificationExceptionSerialisationTests.kt index 03e2f5e731..e8bf2cf807 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/contracts/TransactionVerificationExceptionSerialisationTests.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/contracts/TransactionVerificationExceptionSerialisationTests.kt @@ -1,8 +1,13 @@ package net.corda.coretests.contracts +import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint import net.corda.core.contracts.Contract +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionState import net.corda.core.contracts.TransactionVerificationException import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.generateKeyPair +import net.corda.core.identity.Party import net.corda.core.internal.createContractCreationError import net.corda.core.internal.createContractRejection import net.corda.core.transactions.LedgerTransaction @@ -13,6 +18,9 @@ import net.corda.serialization.internal.amqp.SerializationOutput import net.corda.serialization.internal.amqp.SerializerFactoryBuilder import net.corda.serialization.internal.amqp.custom.PublicKeySerializer import net.corda.serialization.internal.amqp.custom.ThrowableSerializer +import net.corda.testing.common.internal.testNetworkParameters +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME import net.corda.testing.core.DUMMY_BANK_A_NAME import net.corda.testing.core.DUMMY_NOTARY_NAME import net.corda.testing.core.TestIdentity @@ -23,7 +31,9 @@ class TransactionVerificationExceptionSerialisationTests { private fun defaultFactory() = SerializerFactoryBuilder.build( AllWhitelist, ClassLoader.getSystemClassLoader() - ).apply { register(ThrowableSerializer(this)) } + ).apply { + register(ThrowableSerializer(this)) + } private val context get() = AMQP_RPC_CLIENT_CONTEXT @@ -179,4 +189,125 @@ class TransactionVerificationExceptionSerialisationTests { assertEquals(exc.message, exc2.message) } + + @Test + fun transactionNetworkParameterOrderingExceptionTest() { + val exception = TransactionVerificationException.TransactionNetworkParameterOrderingException( + txid, + StateRef(SecureHash.zeroHash, 1), + testNetworkParameters(), + testNetworkParameters()) + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + } + + @Test + fun missingNetworkParametersExceptionTest() { + val exception = TransactionVerificationException.MissingNetworkParametersException(txid, SecureHash.zeroHash) + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + } + + @Test + fun constraintPropagationRejectionTest() { + val exception = TransactionVerificationException.ConstraintPropagationRejection(txid, "com.test.Contract", + AlwaysAcceptAttachmentConstraint, AlwaysAcceptAttachmentConstraint) + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + assertEquals("com.test.Contract", exception2.contractClass) + } + + @Test + fun transactionDuplicateEncumbranceExceptionTest() { + val exception = TransactionVerificationException.TransactionDuplicateEncumbranceException(txid, 1) + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + } + + @Test + fun transactionNonMatchingEncumbranceExceptionTest() { + val exception = TransactionVerificationException.TransactionNonMatchingEncumbranceException(txid, listOf(1, 2, 3)) + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + } + + @Test + fun transactionNotaryMismatchEncumbranceExceptionTest() { + val exception = TransactionVerificationException.TransactionNotaryMismatchEncumbranceException( + txid, 1, 2, Party(ALICE_NAME, generateKeyPair().public), Party(BOB_NAME, generateKeyPair().public)) + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + } + + @Test + fun transactionContractConflictExceptionTest() { + val exception = TransactionVerificationException.TransactionContractConflictException( + txid, TransactionState(DummyContractState(), notary = Party(BOB_NAME, generateKeyPair().public)), "aa") + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + } + + @Test + fun transactionRequiredContractUnspecifiedExceptionTest() { + val exception = TransactionVerificationException.TransactionRequiredContractUnspecifiedException( + txid, TransactionState(DummyContractState(), notary = Party(BOB_NAME, generateKeyPair().public))) + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + } } \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/contracts/TransactionVerificationException.kt b/core/src/main/kotlin/net/corda/core/contracts/TransactionVerificationException.kt index 84429f19bf..826a7d9eb4 100644 --- a/core/src/main/kotlin/net/corda/core/contracts/TransactionVerificationException.kt +++ b/core/src/main/kotlin/net/corda/core/contracts/TransactionVerificationException.kt @@ -70,8 +70,17 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S * @property outputConstraint The constraint of the outputs state. */ @KeepForDJVM - class ConstraintPropagationRejection(txId: SecureHash, val contractClass: String, inputConstraint: AttachmentConstraint, outputConstraint: AttachmentConstraint) - : TransactionVerificationException(txId, "Contract constraints for $contractClass are not propagated correctly. The outputConstraint: $outputConstraint is not a valid transition from the input constraint: $inputConstraint.", null) + class ConstraintPropagationRejection(txId: SecureHash, message: String) : TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, + contractClass: String, + inputConstraint: AttachmentConstraint, + outputConstraint: AttachmentConstraint) : + this(txId, "Contract constraints for $contractClass are not propagated correctly. " + + "The outputConstraint: $outputConstraint is not a valid transition from the input constraint: $inputConstraint.") + + // This is only required for backwards compatibility. In case the message format changes, update the index. + val contractClass: String = message.split(" ")[3] + } /** * The transaction attachment that contains the [contractClass] class didn't meet the constraints specified by @@ -153,19 +162,24 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S * be satisfied. */ @KeepForDJVM - class TransactionDuplicateEncumbranceException(txId: SecureHash, index: Int) - : TransactionVerificationException(txId, "The bi-directionality property of encumbered output states " + - "is not satisfied. Index $index is referenced more than once", null) + class TransactionDuplicateEncumbranceException(txId: SecureHash, message: String) + : TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, index: Int) : this(txId, "The bi-directionality property of encumbered output states " + + "is not satisfied. Index $index is referenced more than once") + } /** * An encumbered state should also be referenced as the encumbrance of another state in order to satisfy the * bi-directionality property (a full cycle should be present). */ @KeepForDJVM - class TransactionNonMatchingEncumbranceException(txId: SecureHash, nonMatching: Collection) - : TransactionVerificationException(txId, "The bi-directionality property of encumbered output states " + - "is not satisfied. Encumbered states should also be referenced as an encumbrance of another state to form " + - "a full cycle. Offending indices $nonMatching", null) + class TransactionNonMatchingEncumbranceException(txId: SecureHash, message: String) + : TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, nonMatching: Collection) : this(txId, + "The bi-directionality property of encumbered output states " + + "is not satisfied. Encumbered states should also be referenced as an encumbrance of another state to form " + + "a full cycle. Offending indices $nonMatching") + } /** * All encumbered states should be assigned to the same notary. This is due to the fact that multi-notary @@ -173,9 +187,13 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S * in the same transaction. */ @KeepForDJVM - class TransactionNotaryMismatchEncumbranceException(txId: SecureHash, encumberedIndex: Int, encumbranceIndex: Int, encumberedNotary: Party, encumbranceNotary: Party) - : TransactionVerificationException(txId, "Encumbered output states assigned to different notaries found. " + - "Output state with index $encumberedIndex is assigned to notary [$encumberedNotary], while its encumbrance with index $encumbranceIndex is assigned to notary [$encumbranceNotary]", null) + class TransactionNotaryMismatchEncumbranceException(txId: SecureHash, message: String) + : TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, encumberedIndex: Int, encumbranceIndex: Int, encumberedNotary: Party, encumbranceNotary: Party) : + this(txId, "Encumbered output states assigned to different notaries found. " + + "Output state with index $encumberedIndex is assigned to notary [$encumberedNotary], " + + "while its encumbrance with index $encumbranceIndex is assigned to notary [$encumbranceNotary]") + } /** * If a state is identified as belonging to a contract, either because the state class is defined as an inner class @@ -186,35 +204,44 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S * @param requiredContractClassName The class name of the contract to which the state belongs. */ @KeepForDJVM - class TransactionContractConflictException(txId: SecureHash, state: TransactionState, requiredContractClassName: String) - : TransactionVerificationException(txId, - """ - State of class ${state.data::class.java.typeName} belongs to contract $requiredContractClassName, but + class TransactionContractConflictException(txId: SecureHash, message: String) + : TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, state: TransactionState, requiredContractClassName: String): this(txId, + """ + State of class ${state.data ::class.java.typeName} belongs to contract $requiredContractClassName, but is bundled in TransactionState with ${state.contract}. For details see: https://docs.corda.net/api-contract-constraints.html#contract-state-agreement - """.trimIndent().replace('\n', ' '), null) + """.trimIndent().replace('\n', ' ')) + } // TODO: add reference to documentation @KeepForDJVM - class TransactionRequiredContractUnspecifiedException(txId: SecureHash, state: TransactionState) - : TransactionVerificationException(txId, - """ + class TransactionRequiredContractUnspecifiedException(txId: SecureHash, message: String) + : TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, state: TransactionState) : this(txId, + """ State of class ${state.data::class.java.typeName} does not have a specified owning contract. Add the @BelongsToContract annotation to this class to ensure that it can only be bundled in a TransactionState with the correct contract. For details see: https://docs.corda.net/api-contract-constraints.html#contract-state-agreement - """.trimIndent(), null) - + """.trimIndent()) + } /** * If the network parameters associated with an input or reference state in a transaction are more recent than the network parameters of the new transaction itself. */ @KeepForDJVM - class TransactionNetworkParameterOrderingException(txId: SecureHash, inputStateRef: StateRef, txnNetworkParameters: NetworkParameters, inputNetworkParameters: NetworkParameters) - : TransactionVerificationException(txId, "The network parameters epoch (${txnNetworkParameters.epoch}) of this transaction " + - "is older than the epoch (${inputNetworkParameters.epoch}) of input state: $inputStateRef", null) + class TransactionNetworkParameterOrderingException(txId: SecureHash, message: String) : + TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, + inputStateRef: StateRef, + txnNetworkParameters: NetworkParameters, + inputNetworkParameters: NetworkParameters) + : this(txId, "The network parameters epoch (${txnNetworkParameters.epoch}) of this transaction " + + "is older than the epoch (${inputNetworkParameters.epoch}) of input state: $inputStateRef") + } /** * Thrown when the network parameters with hash: missingNetworkParametersHash is not available at this node. Usually all the parameters @@ -224,9 +251,11 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S * @param missingNetworkParametersHash Missing hash of the network parameters associated to this transaction */ @KeepForDJVM - class MissingNetworkParametersException(txId: SecureHash, missingNetworkParametersHash: SecureHash) - : TransactionVerificationException(txId, "Couldn't find network parameters with hash: $missingNetworkParametersHash related to this transaction: $txId", null) - + class MissingNetworkParametersException(txId: SecureHash, message: String) + : TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, missingNetworkParametersHash: SecureHash) : + this(txId, "Couldn't find network parameters with hash: $missingNetworkParametersHash related to this transaction: $txId") + } /** Whether the inputs or outputs list contains an encumbrance issue, see [TransactionMissingEncumbranceException]. */ @CordaSerializable diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index 5333bf4f02..18e6dfa57d 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -6,6 +6,8 @@ import net.corda.core.CordaInternal import net.corda.core.DeleteForDJVM import net.corda.core.contracts.StateRef import net.corda.core.crypto.SecureHash +import net.corda.core.identity.AbstractParty +import net.corda.core.identity.AnonymousParty import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate import net.corda.core.internal.* @@ -120,14 +122,18 @@ abstract class FlowLogic { * is routed depends on the [Destination] type, including whether this call does any initial communication. */ @Suspendable - fun initiateFlow(destination: Destination): FlowSession = stateMachine.initiateFlow(destination) + fun initiateFlow(destination: Destination): FlowSession { + require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" } + return stateMachine.initiateFlow(destination, serviceHub.identityService.wellKnownPartyFromAnonymous(destination as AbstractParty) + ?: throw IllegalArgumentException("Could not resolve destination: $destination")) + } /** * Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note * that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive. */ @Suspendable - fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party) + fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party, party) /** * Specifies the identity, with certificate, to use for this flow. This will be one of the multiple identities that diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt index 7cd903d712..7781c38b95 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt @@ -45,7 +45,6 @@ interface FlowLogicRefFactory { * * @property type the fully qualified name of the class that failed checks. */ -@CordaSerializable class IllegalFlowLogicException(val type: String, msg: String) : IllegalArgumentException("A FlowLogicRef cannot be constructed for FlowLogic of type $type: $msg") { constructor(type: Class<*>, msg: String) : this(type.name, msg) diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt index 83fc40a152..d93ff07f6b 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt @@ -18,7 +18,7 @@ interface FlowStateMachine { fun suspend(ioRequest: FlowIORequest, maySkipCheckpoint: Boolean): SUSPENDRETURN @Suspendable - fun initiateFlow(destination: Destination): FlowSession + fun initiateFlow(destination: Destination, wellKnownParty: Party): FlowSession fun checkFlowPermission(permissionName: String, extraAuditData: Map) diff --git a/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt b/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt index 3b7eab103a..abcaa6b4ff 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt @@ -80,9 +80,7 @@ interface IdentityService { * @param key The owning [PublicKey] of the [Party]. * @return Returns a [Party] with a matching owningKey if known, else returns null. */ - fun partyFromKey(key: PublicKey): Party? = - @Suppress("DEPRECATION") - certificateFromKey(key)?.party + fun partyFromKey(key: PublicKey): Party? /** * Resolves a party name to the well known identity [Party] instance for this name. Where possible well known identity diff --git a/detekt-baseline.xml b/detekt-baseline.xml index 740f8e1f80..d37757a64c 100644 --- a/detekt-baseline.xml +++ b/detekt-baseline.xml @@ -708,6 +708,7 @@ LongParameterList:LedgerTransaction.kt$LedgerTransaction$(inputs: List<StateAndRef<ContractState>>, outputs: List<TransactionState<ContractState>>, commands: List<CommandWithParties<CommandData>>, attachments: List<Attachment>, id: SecureHash, notary: Party?, timeWindow: TimeWindow?, privacySalt: PrivacySalt ) LongParameterList:LedgerTransaction.kt$LedgerTransaction.Companion$( inputs: List<StateAndRef<ContractState>>, outputs: List<TransactionState<ContractState>>, commands: List<CommandWithParties<CommandData>>, attachments: List<Attachment>, id: SecureHash, notary: Party?, timeWindow: TimeWindow?, privacySalt: PrivacySalt, networkParameters: NetworkParameters, references: List<StateAndRef<ContractState>>, componentGroups: List<ComponentGroup>? = null, serializedInputs: List<SerializedStateAndRef>? = null, serializedReferences: List<SerializedStateAndRef>? = null, isAttachmentTrusted: (Attachment) -> Boolean ) LongParameterList:MockServices.kt$MockServices.Companion$( cordappLoader: CordappLoader, identityService: IdentityService, networkParameters: NetworkParameters, initialIdentity: TestIdentity, moreKeys: Set<KeyPair>, keyManagementService: KeyManagementService, schemaService: SchemaService, persistence: CordaPersistence ) + LongParameterList:MockServices.kt$MockServices.Companion$( cordappPackages: List<String>, initialIdentity: TestIdentity, networkParameters: NetworkParameters = testNetworkParameters(modifiedTime = Instant.MIN), moreKeys: Set<KeyPair>, moreIdentities: Set<PartyAndCertificate>, cacheFactory: TestingNamedCacheFactory = TestingNamedCacheFactory() ) LongParameterList:NetworkBootstrapperTest.kt$NetworkBootstrapperTest$(copyCordapps: CopyCordapps = CopyCordapps.FirstRunOnly, packageOwnership: Map<String, PublicKey>? = emptyMap(), minimumPlatformVerison: Int? = PLATFORM_VERSION, maxMessageSize: Int? = DEFAULT_MAX_MESSAGE_SIZE, maxTransactionSize: Int? = DEFAULT_MAX_TRANSACTION_SIZE, eventHorizon: Duration? = 30.days) LongParameterList:NetworkMapUpdater.kt$NetworkMapUpdater$(trustRoot: X509Certificate, currentParametersHash: SecureHash, ourNodeInfo: SignedNodeInfo, networkParameters: NetworkParameters, keyManagementService: KeyManagementService, networkParameterAcceptanceSettings: NetworkParameterAcceptanceSettings) LongParameterList:NetworkParameters.kt$NetworkParameters$(minimumPlatformVersion: Int = this.minimumPlatformVersion, notaries: List<NotaryInfo> = this.notaries, maxMessageSize: Int = this.maxMessageSize, maxTransactionSize: Int = this.maxTransactionSize, modifiedTime: Instant = this.modifiedTime, epoch: Int = this.epoch, whitelistedContractImplementations: Map<String, List<AttachmentId>> = this.whitelistedContractImplementations ) @@ -1226,6 +1227,7 @@ MagicNumber:TransactionBuilder.kt$TransactionBuilder$4 MagicNumber:TransactionDSLInterpreter.kt$TransactionDSL$30 MagicNumber:TransactionUtils.kt$4 + MagicNumber:TransactionVerificationException.kt$TransactionVerificationException.ConstraintPropagationRejection$3 MagicNumber:TransactionVerifierServiceInternal.kt$Verifier$4 MagicNumber:TransactionViewer.kt$TransactionViewer$15.0 MagicNumber:TransactionViewer.kt$TransactionViewer$20.0 @@ -2216,7 +2218,6 @@ MaxLineLength:H2SecurityTests.kt$H2SecurityTests$startNode(customOverrides = mapOf(h2AddressKey to "${InetAddress.getLocalHost().hostAddress}:${getFreePort()}")).getOrThrow() MaxLineLength:HTTPNetworkRegistrationService.kt$HTTPNetworkRegistrationService$in TRANSIENT_ERROR_STATUS_CODES -> throw ServiceUnavailableException("Could not connect with Doorman. Http response status code was ${conn.responseCode}.") MaxLineLength:HardRestartTest.kt$HardRestartTest$val rpc = tlRpc.get() ?: CordaRPCClient(a.rpcAddress).start(demoUser.username, demoUser.password).proxy.also { tlRpc.set(it) } - MaxLineLength:HibernateColumnConverterTests.kt$HibernateColumnConverterTests$// AbstractPartyToX500NameAsStringConverter could cause circular flush of Hibernate session because it is invoked during flush, and a // cache miss was doing a flush. This also checks that loading during flush does actually work. @Test fun `issue some cash on a notary that exists only in the database to check cache loading works in our identity column converters during flush of vault update`() MaxLineLength:HibernateConfiguration.kt$HibernateConfiguration$CordaMaterializedBlobType : AbstractSingleColumnStandardBasicType MaxLineLength:HibernateConfiguration.kt$HibernateConfiguration$CordaWrapperBinaryType : AbstractSingleColumnStandardBasicType MaxLineLength:HibernateConfiguration.kt$HibernateConfiguration$MapBlobToPostgresByteA : AbstractSingleColumnStandardBasicType @@ -2242,6 +2243,8 @@ MaxLineLength:HibernateConfigurationTest.kt$HibernateConfigurationTest$val joinVaultStatesToCash = criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), cashStatesSchema.get<PersistentStateRef>("stateRef")) MaxLineLength:HibernateConfigurationTest.kt$HibernateConfigurationTest$val schemaService = NodeSchemaService(extraSchemas = setOf(CashSchemaV1, SampleCashSchemaV1, SampleCashSchemaV2, SampleCashSchemaV3, DummyLinearStateSchemaV1, DummyLinearStateSchemaV2, DummyDealStateSchemaV1)) MaxLineLength:HibernateConfigurationTest.kt$HibernateConfigurationTest.<no name provided>$override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, database, schemaService, cordappClassloader).apply { start() } + MaxLineLength:HibernateInteractionTests.kt$HibernateInteractionTests$// AbstractPartyToX500NameAsStringConverter could cause circular flush of Hibernate session because it is invoked during flush, and a // cache miss was doing a flush. This also checks that loading during flush does actually work. @Test fun `issue some cash on a notary that exists only in the database to check cache loading works in our identity column converters during flush of vault update`() + MaxLineLength:HibernateInteractionTests.kt$HibernateInteractionTests$@Test fun `when a cascade is in progress (because of nested entities), the node avoids to flush & detach entities, since it's not allowed by Hibernate`() MaxLineLength:HibernateQueryCriteriaParser.kt$AbstractQueryCriteriaParser$abstract MaxLineLength:HibernateQueryCriteriaParser.kt$HibernateAttachmentQueryCriteriaParser$AbstractQueryCriteriaParser<AttachmentQueryCriteria, AttachmentsQueryCriteriaParser, AttachmentSort>(), AttachmentsQueryCriteriaParser MaxLineLength:HibernateQueryCriteriaParser.kt$HibernateAttachmentQueryCriteriaParser$private val criteriaQuery: CriteriaQuery<NodeAttachmentService.DBAttachment> @@ -2369,15 +2372,6 @@ MaxLineLength:InstallShellExtensionsParser.kt$ShellExtensionsGenerator$printWarning("Cannot install shell extension for bash major version earlier than $minSupportedBashVersion. Please upgrade your bash version. Aliases should still work.") MaxLineLength:InstallShellExtensionsParser.kt$ShellExtensionsGenerator$println("Installation complete, ${parent.alias} is available in bash, but autocompletion was not installed because of an old version of bash.") MaxLineLength:InstantSerializer.kt$InstantSerializer : Proxy - MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("attachments", "Commands to extract information about attachments stored within the node", AttachmentShellCommand::class.java) - MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("checkpoints", "Commands to extract information about checkpoints stored within the node", CheckpointShellCommand::class.java) - MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("flow", "Commands to work with flows. Flows are how you can change the ledger.", FlowShellCommand::class.java) - MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("hashLookup", "Checks if a transaction with matching Id hash exists.", HashLookupShellCommand::class.java) - MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("output-format", "Commands to inspect and update the output format.", OutputFormatCommand::class.java) - MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("run", "Runs a method from the CordaRPCOps interface on the node.", RunShellCommand::class.java) - MaxLineLength:InteractiveShell.kt$InteractiveShell$val stateObservable = runFlowFromString({ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) }, inputData, flowClazz, inputObjectMapper) - MaxLineLength:InteractiveShell.kt$InteractiveShell${ private val log = LoggerFactory.getLogger(javaClass) private lateinit var rpcOps: (username: String, password: String) -> InternalCordaRPCOps private lateinit var ops: InternalCordaRPCOps private lateinit var rpcConn: AutoCloseable private var shell: Shell? = null private var classLoader: ClassLoader? = null private lateinit var shellConfiguration: ShellConfiguration private var onExit: () -> Unit = {} @JvmStatic fun getCordappsClassloader() = classLoader enum class OutputFormat { JSON, YAML } fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) { rpcOps = { username: String, password: String -> val connection = if (standalone) { CordaRPCClient( configuration.hostAndPort, configuration.ssl, classLoader ).start(username, password, gracefulReconnect = GracefulReconnect()) } else { CordaRPCClient( hostAndPort = configuration.hostAndPort, configuration = CordaRPCClientConfiguration.DEFAULT.copy( maxReconnectAttempts = 1 ), sslConfiguration = configuration.ssl, classLoader = classLoader ).start(username, password) } rpcConn = connection connection.proxy as InternalCordaRPCOps } _startShell(configuration, classLoader) } private fun _startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) { shellConfiguration = configuration InteractiveShell.classLoader = classLoader val runSshDaemon = configuration.sshdPort != null val config = Properties() if (runSshDaemon) { // Enable SSH access. Note: these have to be strings, even though raw object assignments also work. config["crash.ssh.port"] = configuration.sshdPort?.toString() config["crash.auth"] = "corda" configuration.sshHostKeyDirectory?.apply { val sshKeysDir = configuration.sshHostKeyDirectory.createDirectories() config["crash.ssh.keypath"] = (sshKeysDir / "hostkey.pem").toString() config["crash.ssh.keygen"] = "true" } } ExternalResolver.INSTANCE.addCommand("output-format", "Commands to inspect and update the output format.", OutputFormatCommand::class.java) ExternalResolver.INSTANCE.addCommand("run", "Runs a method from the CordaRPCOps interface on the node.", RunShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("flow", "Commands to work with flows. Flows are how you can change the ledger.", FlowShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("start", "An alias for 'flow start'", StartShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("hashLookup", "Checks if a transaction with matching Id hash exists.", HashLookupShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("attachments", "Commands to extract information about attachments stored within the node", AttachmentShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("checkpoints", "Commands to extract information about checkpoints stored within the node", CheckpointShellCommand::class.java) shell = ShellLifecycle(configuration.commandsDirectory).start(config, configuration.user, configuration.password) } fun runLocalShell(onExit: () -> Unit = {}) { this.onExit = onExit val terminal = TerminalFactory.create() val consoleReader = ConsoleReader("Corda", FileInputStream(FileDescriptor.`in`), System.out, terminal) val jlineProcessor = JLineProcessor(terminal.isAnsiSupported, shell, consoleReader, System.out) InterruptHandler { jlineProcessor.interrupt() }.install() thread(name = "Command line shell processor", isDaemon = true) { Emoji.renderIfSupported { try { jlineProcessor.run() } catch (e: IndexOutOfBoundsException) { log.warn("Cannot parse malformed command.") } } } thread(name = "Command line shell terminator", isDaemon = true) { // Wait for the shell to finish. jlineProcessor.closed() log.info("Command shell has exited") terminal.restore() onExit.invoke() } } class ShellLifecycle(private val shellCommands: Path) : PluginLifeCycle() { fun start(config: Properties, localUserName: String = "", localUserPassword: String = ""): Shell { val classLoader = this.javaClass.classLoader val classpathDriver = ClassPathMountFactory(classLoader) val fileDriver = FileMountFactory(Utils.getCurrentDirectory()) val extraCommandsPath = shellCommands.toAbsolutePath().createDirectories() val commandsFS = FS.Builder() .register("file", fileDriver) .mount("file:$extraCommandsPath") .register("classpath", classpathDriver) .mount("classpath:/net/corda/tools/shell/") .mount("classpath:/crash/commands/") .build() val confFS = FS.Builder() .register("classpath", classpathDriver) .mount("classpath:/crash") .build() val discovery = object : ServiceLoaderDiscovery(classLoader) { override fun getPlugins(): Iterable<CRaSHPlugin<*>> { // Don't use the Java language plugin (we may not have tools.jar available at runtime), this // will cause any commands using JIT Java compilation to be suppressed. In CRaSH upstream that // is only the 'jmx' command. return super.getPlugins().filterNot { it is JavaLanguage } + CordaAuthenticationPlugin(rpcOps) } } val attributes = emptyMap<String, Any>() val context = PluginContext(discovery, attributes, commandsFS, confFS, classLoader) context.refresh() this.config = config start(context) ops = makeRPCOps(rpcOps, localUserName, localUserPassword) return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, ops, StdoutANSIProgressRenderer)) } } fun nodeInfo() = try { ops.nodeInfo() } catch (e: UndeclaredThrowableException) { throw e.cause ?: e } @JvmStatic fun setOutputFormat(outputFormat: OutputFormat) { this.outputFormat = outputFormat } @JvmStatic fun getOutputFormat(): OutputFormat { return outputFormat } fun createYamlInputMapper(rpcOps: CordaRPCOps): ObjectMapper { // Return a standard Corda Jackson object mapper, configured to use YAML by default and with extra // serializers. return JacksonSupport.createDefaultMapper(rpcOps, YAMLFactory(), true).apply { val rpcModule = SimpleModule().apply { addDeserializer(InputStream::class.java, InputStreamDeserializer) addDeserializer(UniqueIdentifier::class.java, UniqueIdentifierDeserializer) } registerModule(rpcModule) } } private fun createOutputMapper(outputFormat: OutputFormat): ObjectMapper { val factory = when(outputFormat) { OutputFormat.JSON -> JsonFactory() OutputFormat.YAML -> YAMLFactory().disable(YAMLGenerator.Feature.WRITE_DOC_START_MARKER) } return JacksonSupport.createNonRpcMapper(factory).apply { // Register serializers for stateful objects from libraries that are special to the RPC system and don't // make sense to print out to the screen. For classes we own, annotations can be used instead. val rpcModule = SimpleModule().apply { addSerializer(Observable::class.java, ObservableSerializer) addSerializer(InputStream::class.java, InputStreamSerializer) } registerModule(rpcModule) disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) enable(SerializationFeature.INDENT_OUTPUT) } } // TODO: A default renderer could be used, instead of an object mapper. See: http://www.crashub.org/1.3/reference.html#_renderers private var outputFormat = OutputFormat.YAML @VisibleForTesting lateinit var latch: CountDownLatch private set /** * Called from the 'flow' shell command. Takes a name fragment and finds a matching flow, or prints out * the list of options if the request is ambiguous. Then parses [inputData] as constructor arguments using * the [runFlowFromString] method and starts the requested flow. Ctrl-C can be used to cancel. */ @JvmStatic fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps)) { val matches = try { rpcOps.registeredFlows().filter { nameFragment in it } } catch (e: PermissionException) { output.println(e.message ?: "Access denied", Color.red) return } if (matches.isEmpty()) { output.println("No matching flow found, run 'flow list' to see your options.", Color.red) return } else if (matches.size > 1 && matches.find { it.endsWith(nameFragment)} == null) { output.println("Ambiguous name provided, please be more specific. Your options are:") matches.forEachIndexed { i, s -> output.println("${i + 1}. $s", Color.yellow) } return } val flowName = matches.find { it.endsWith(nameFragment)} ?: matches.single() val flowClazz: Class<FlowLogic<*>> = if (classLoader != null) { uncheckedCast(Class.forName(flowName, true, classLoader)) } else { uncheckedCast(Class.forName(flowName)) } try { // Show the progress tracker on the console until the flow completes or is interrupted with a // Ctrl-C keypress. val stateObservable = runFlowFromString({ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) }, inputData, flowClazz, inputObjectMapper) latch = CountDownLatch(1) ansiProgressRenderer.render(stateObservable, latch::countDown) // Wait for the flow to end and the progress tracker to notice. By the time the latch is released // the tracker is done with the screen. while (!Thread.currentThread().isInterrupted) { try { latch.await() break } catch (e: InterruptedException) { try { rpcOps.killFlow(stateObservable.id) } finally { Thread.currentThread().interrupt() break } } } output.println("Flow completed with result: ${stateObservable.returnValue.get()}") } catch (e: NoApplicableConstructor) { output.println("No matching constructor found:", Color.red) e.errors.forEach { output.println("- $it", Color.red) } } catch (e: PermissionException) { output.println(e.message ?: "Access denied", Color.red) } catch (e: ExecutionException) { // ignoring it as already logged by the progress handler subscriber } finally { InputStreamDeserializer.closeAll() } } class NoApplicableConstructor(val errors: List<String>) : CordaException(this.toString()) { override fun toString() = (listOf("No applicable constructor for flow. Problems were:") + errors).joinToString(System.lineSeparator()) } /** * Tidies up a possibly generic type name by chopping off the package names of classes in a hard-coded set of * hierarchies that are known to be widely used and recognised, and also not have (m)any ambiguous names in them. * * This is used for printing error messages when something doesn't match. */ private fun maybeAbbreviateGenericType(type: Type, extraRecognisedPackage: String): String { val packagesToAbbreviate = listOf("java.", "net.corda.core.", "kotlin.", extraRecognisedPackage) fun shouldAbbreviate(typeName: String) = packagesToAbbreviate.any { typeName.startsWith(it) } fun abbreviated(typeName: String) = if (shouldAbbreviate(typeName)) typeName.split('.').last() else typeName fun innerLoop(type: Type): String = when (type) { is ParameterizedType -> { val args: List<String> = type.actualTypeArguments.map(::innerLoop) abbreviated(type.rawType.typeName) + '<' + args.joinToString(", ") + '>' } is GenericArrayType -> { innerLoop(type.genericComponentType) + "[]" } is Class<*> -> { if (type.isArray) abbreviated(type.simpleName) else abbreviated(type.name).replace('$', '.') } else -> type.toString() } return innerLoop(type) } @JvmStatic fun killFlowById(id: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps)) { try { val runId = try { inputObjectMapper.readValue(id, StateMachineRunId::class.java) } catch (e: JsonMappingException) { output.println("Cannot parse flow ID of '$id' - expecting a UUID.", Color.red) log.error("Failed to parse flow ID", e) return } if (rpcOps.killFlow(runId)) { output.println("Killed flow $runId", Color.yellow) } else { output.println("Failed to kill flow $runId", Color.red) } } finally { output.flush() } } // TODO: This utility is generally useful and might be better moved to the node class, or an RPC, if we can commit to making it stable API. /** * Given a [FlowLogic] class and a string in one-line Yaml form, finds an applicable constructor and starts * the flow, returning the created flow logic. Useful for lightweight invocation where text is preferable * to statically typed, compiled code. * * See the [StringToMethodCallParser] class to learn more about limitations and acceptable syntax. * * @throws NoApplicableConstructor if no constructor could be found for the given set of types. */ @Throws(NoApplicableConstructor::class) fun <T> runFlowFromString(invoke: (Class<out FlowLogic<T>>, Array<out Any?>) -> FlowProgressHandle<T>, inputData: String, clazz: Class<out FlowLogic<T>>, om: ObjectMapper): FlowProgressHandle<T> { val errors = ArrayList<String>() val parser = StringToMethodCallParser(clazz, om) val nameTypeList = getMatchingConstructorParamsAndTypes(parser, inputData, clazz) try { val args = parser.parseArguments(clazz.name, nameTypeList, inputData) return invoke(clazz, args) } catch (e: StringToMethodCallParser.UnparseableCallException.ReflectionDataMissing) { val argTypes = nameTypeList.map { (_, type) -> type } errors.add("$argTypes: <constructor missing parameter reflection data>") } catch (e: StringToMethodCallParser.UnparseableCallException) { val argTypes = nameTypeList.map { (_, type) -> type } errors.add("$argTypes: ${e.message}") } throw NoApplicableConstructor(errors) } private fun <T> getMatchingConstructorParamsAndTypes(parser: StringToMethodCallParser<FlowLogic<T>>, inputData: String, clazz: Class<out FlowLogic<T>>) : List<Pair<String, Type>> { val errors = ArrayList<String>() val classPackage = clazz.packageName_ lateinit var paramNamesFromConstructor: List<String> for (ctor in clazz.constructors) { // Attempt construction with the given arguments. fun getPrototype(): List<String> { val argTypes = ctor.genericParameterTypes.map { // If the type name is in the net.corda.core or java namespaces, chop off the package name // because these hierarchies don't have (m)any ambiguous names and the extra detail is just noise. maybeAbbreviateGenericType(it, classPackage) } return paramNamesFromConstructor.zip(argTypes).map { (name, type) -> "$name: $type" } } try { paramNamesFromConstructor = parser.paramNamesFromConstructor(ctor) val nameTypeList = paramNamesFromConstructor.zip(ctor.genericParameterTypes) parser.validateIsMatchingCtor(clazz.name, nameTypeList, inputData) return nameTypeList } catch (e: StringToMethodCallParser.UnparseableCallException.MissingParameter) { errors.add("${getPrototype()}: missing parameter ${e.paramName}") } catch (e: StringToMethodCallParser.UnparseableCallException.TooManyParameters) { errors.add("${getPrototype()}: too many parameters") } catch (e: StringToMethodCallParser.UnparseableCallException.ReflectionDataMissing) { val argTypes = ctor.genericParameterTypes.map { it.typeName } errors.add("$argTypes: <constructor missing parameter reflection data>") } catch (e: StringToMethodCallParser.UnparseableCallException) { val argTypes = ctor.genericParameterTypes.map { it.typeName } errors.add("$argTypes: ${e.message}") } } throw NoApplicableConstructor(errors) } // TODO Filtering on error/success when we will have some sort of flow auditing, for now it doesn't make much sense. @JvmStatic fun runStateMachinesView(out: RenderPrintWriter, rpcOps: CordaRPCOps): Any? { val proxy = rpcOps val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed() val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) } val subscriber = FlowWatchPrintingSubscriber(out) stateMachineUpdates.startWith(currentStateMachines).subscribe(subscriber) var result: Any? = subscriber.future if (result is Future<*>) { if (!result.isDone) { out.cls() out.println("Waiting for completion or Ctrl-C ... ") out.flush() } try { result = result.get() } catch (e: InterruptedException) { subscriber.unsubscribe() Thread.currentThread().interrupt() } catch (e: ExecutionException) { throw e.rootCause } catch (e: InvocationTargetException) { throw e.rootCause } } return result } @JvmStatic fun runAttachmentTrustInfoView( out: RenderPrintWriter, rpcOps: InternalCordaRPCOps ): Any { return AttachmentTrustTable(out, rpcOps.attachmentTrustInfos) } @JvmStatic fun runDumpCheckpoints(rpcOps: InternalCordaRPCOps) { rpcOps.dumpCheckpoints() } @JvmStatic fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps, inputObjectMapper: ObjectMapper): Any? { val cmd = input.joinToString(" ").trim { it <= ' ' } if (cmd.startsWith("startflow", ignoreCase = true)) { // The flow command provides better support and startFlow requires special handling anyway due to // the generic startFlow RPC interface which offers no type information with which to parse the // string form of the command. out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow) return null } else if (cmd.substringAfter(" ").trim().equals("gracefulShutdown", ignoreCase = true)) { return InteractiveShell.gracefulShutdown(out, cordaRPCOps) } var result: Any? = null try { InputStreamSerializer.invokeContext = context val parser = StringToMethodCallParser(CordaRPCOps::class.java, inputObjectMapper) val call = parser.parse(cordaRPCOps, cmd) result = call.call() if (result != null && result !== kotlin.Unit && result !is Void) { result = printAndFollowRPCResponse(result, out, outputFormat) } if (result is Future<*>) { if (!result.isDone) { out.println("Waiting for completion or Ctrl-C ... ") out.flush() } try { result = result.get() } catch (e: InterruptedException) { Thread.currentThread().interrupt() } catch (e: ExecutionException) { throw e.rootCause } catch (e: InvocationTargetException) { throw e.rootCause } } } catch (e: StringToMethodCallParser.UnparseableCallException) { out.println(e.message, Color.red) if (e !is StringToMethodCallParser.UnparseableCallException.NoSuchFile) { out.println("Please try 'man run' to learn what syntax is acceptable") } } catch (e: Exception) { out.println("RPC failed: ${e.rootCause}", Color.red) } finally { InputStreamSerializer.invokeContext = null InputStreamDeserializer.closeAll() } return result } @JvmStatic fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps) { fun display(statements: RenderPrintWriter.() -> Unit) { statements.invoke(userSessionOut) userSessionOut.flush() } var isShuttingDown = false try { display { println("Orchestrating a clean shutdown, press CTRL+C to cancel...") } isShuttingDown = true display { println("...enabling draining mode") println("...waiting for in-flight flows to be completed") } cordaRPCOps.terminate(true) val latch = CountDownLatch(1) @Suppress("DEPRECATION") cordaRPCOps.pendingFlowsCount().updates.doOnError { error -> log.error(error.message) throw error }.doAfterTerminate(latch::countDown).subscribe( // For each update. { (first, second) -> display { println("...remaining: $first / $second") } }, // On error. { error -> if (!isShuttingDown) { display { println("RPC failed: ${error.rootCause}", Color.red) } } }, // When completed. { rpcConn.close() // This will only show up in the standalone Shell, because the embedded one is killed as part of a node's shutdown. display { println("...done, quitting the shell now.") } onExit.invoke() }) while (!Thread.currentThread().isInterrupted) { try { latch.await() break } catch (e: InterruptedException) { try { cordaRPCOps.setFlowsDrainingModeEnabled(false) display { println("...cancelled clean shutdown.") } } finally { Thread.currentThread().interrupt() break } } } } catch (e: StringToMethodCallParser.UnparseableCallException) { display { println(e.message, Color.red) println("Please try 'man run' to learn what syntax is acceptable") } } catch (e: Exception) { if (!isShuttingDown) { display { println("RPC failed: ${e.rootCause}", Color.red) } } } finally { InputStreamSerializer.invokeContext = null InputStreamDeserializer.closeAll() } } private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter, outputFormat: OutputFormat): CordaFuture<Unit> { val outputMapper = createOutputMapper(outputFormat) val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) } return maybeFollow(response, mapElement, out) } private class PrintingSubscriber(private val printerFun: (Any?) -> String, private val toStream: PrintWriter) : Subscriber<Any>() { private var count = 0 val future = openFuture<Unit>() init { // The future is public and can be completed by something else to indicate we don't wish to follow // anymore (e.g. the user pressing Ctrl-C). future.then { unsubscribe() } } @Synchronized override fun onCompleted() { toStream.println("Observable has completed") future.set(Unit) } @Synchronized override fun onNext(t: Any?) { count++ toStream.println("Observation $count: " + printerFun(t)) toStream.flush() } @Synchronized override fun onError(e: Throwable) { toStream.println("Observable completed with an error") e.printStackTrace(toStream) future.setException(e) } } private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> { // Match on a couple of common patterns for "important" observables. It's tough to do this in a generic // way because observables can be embedded anywhere in the object graph, and can emit other arbitrary // object graphs that contain yet more observables. So we just look for top level responses that follow // the standard "track" pattern, and print them until the user presses Ctrl-C if (response == null) return doneFuture(Unit) if (response is DataFeed<*, *>) { out.println("Snapshot:") out.println(printerFun(response.snapshot)) out.flush() out.println("Updates:") return printNextElements(response.updates, printerFun, out) } if (response is Observable<*>) { return printNextElements(response, printerFun, out) } out.println(printerFun(response)) return doneFuture(Unit) } private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> { val subscriber = PrintingSubscriber(printerFun, out) uncheckedCast(elements).subscribe(subscriber) return subscriber.future } } - MaxLineLength:InteractiveShell.kt$InteractiveShell.NoApplicableConstructor$override fun toString() MaxLineLength:InteractiveShellIntegrationTest.kt$InteractiveShellIntegrationTest$private MaxLineLength:InterestSwapRestAPI.kt$InterestRateSwapAPI MaxLineLength:InternalAccessTestHelpers.kt$fun <T> ifThrowsAppend(strToAppendFn: () -> String, block: () -> T): T @@ -2604,7 +2598,6 @@ MaxLineLength:MockServices.kt$MockServices$this(cordappLoaderForPackages(cordappPackages), identityService, networkParameters, initialIdentity, moreKeys, keyManagementService) MaxLineLength:MockServices.kt$MockServices$this(cordappLoaderForPackages(cordappPackages), identityService, testNetworkParameters(modifiedTime = Instant.MIN), initialIdentity, moreKeys) MaxLineLength:MockServices.kt$MockServices.Companion$ @JvmStatic @JvmOverloads fun makeTestDatabaseAndMockServices(cordappPackages: List<String>, identityService: IdentityService, initialIdentity: TestIdentity, networkParameters: NetworkParameters = testNetworkParameters(modifiedTime = Instant.MIN), vararg moreKeys: KeyPair): Pair<CordaPersistence, MockServices> - MaxLineLength:MockServices.kt$MockServices.Companion$ @JvmStatic @JvmOverloads fun makeTestDatabaseAndPersistentServices( cordappPackages: List<String>, initialIdentity: TestIdentity, networkParameters: NetworkParameters = testNetworkParameters(modifiedTime = Instant.MIN), moreKeys: Set<KeyPair>, moreIdentities: Set<PartyAndCertificate> ): Pair<CordaPersistence, MockServices> MaxLineLength:MockServices.kt$MockServices.Companion$makeMockMockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys, keyManagementService, schemaService, persistence) MaxLineLength:MockServices.kt$MockServices.Companion$makeMockMockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys.toSet(), keyManagementService, schemaService, database) MaxLineLength:MockServices.kt$MockServices.Companion$return object : MockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys.toTypedArray(), keyManagementService) { override var networkParametersService: NetworkParametersService = MockNetworkParametersStorage(networkParameters) override val vaultService: VaultService = makeVaultService(schemaService, persistence, cordappLoader) override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) { ServiceHubInternal.recordTransactions( statesToRecord, txs as? Collection ?: txs.toList(), validatedTransactions as WritableTransactionStorage, mockStateMachineRecordedTransactionMappingStorage, vaultService as VaultServiceInternal, persistence ) } override fun jdbcSession(): Connection = persistence.createSession() override fun <T : Any?> withEntityManager(block: EntityManager.() -> T): T { return block(contextTransaction.restrictedEntityManager) } override fun withEntityManager(block: Consumer<EntityManager>) { return block.accept(contextTransaction.restrictedEntityManager) } } @@ -3153,7 +3146,6 @@ MaxLineLength:RPCApi.kt$return sessionId(RPC_SESSION_ID_FIELD_NAME, RPC_SESSION_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot extract the session id from client message.") MaxLineLength:RPCClientProxyHandler.kt$RPCClientProxyHandler$private MaxLineLength:RPCClientProxyHandler.kt$RPCClientProxyHandler$private val serializationContextWithObservableContext = RpcClientObservableDeSerializer.createContext(serializationContext, observableContext) - MaxLineLength:RPCClientProxyHandler.kt$RPCClientProxyHandler$return cacheFactory.buildNamed(Caffeine.newBuilder().weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()), "RpcClientProxyHandler_rpcObservable") MaxLineLength:RPCClientProxyHandler.kt$RPCClientProxyHandler$throw UnsupportedOperationException("Method $calledMethod was added in RPC protocol version $sinceVersion but the server is running $serverProtocolVersion") MaxLineLength:RPCDriver.kt$RPCDriverDSL$val artemisConfig = createRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient, driverDSL.driverDirectory / serverName, hostAndPort) MaxLineLength:RPCDriver.kt$RPCDriverDSL.Companion$fun createRpcServerArtemisConfig(maxFileSize: Int, maxBufferedBytesPerClient: Long, baseDirectory: Path, hostAndPort: NetworkHostAndPort): Configuration @@ -3581,17 +3573,14 @@ MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$ContractCreationError : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$ContractRejection : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$InvalidAttachmentException : TransactionVerificationException - MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$MissingNetworkParametersException : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$NotaryChangeInWrongTransactionType : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$OverlappingAttachmentsException : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$PackageOwnershipException : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$SignersMissing : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$TransactionNetworkParameterOrderingException : TransactionVerificationException - MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$TransactionNotaryMismatchEncumbranceException : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException.ContractCreationError$internal constructor(txId: SecureHash, contractClass: String, cause: Throwable) : this(txId, contractClass, cause, cause.message ?: "") MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException.ContractRejection$internal constructor(txId: SecureHash, contract: Contract, cause: Throwable) : this(txId, contract.javaClass.name, cause, cause.message ?: "") MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException.PackageOwnershipException$"""The attachment JAR: $attachmentHash containing the class: $invalidClassName is not signed by the owner of package $packageName specified in the network parameters. Please check the source of this attachment and if it is malicious contact your zone operator to report this incident. For details see: https://docs.corda.net/network-map.html#network-parameters""" - MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException.TransactionNotaryMismatchEncumbranceException$"Output state with index $encumberedIndex is assigned to notary [$encumberedNotary], while its encumbrance with index $encumbranceIndex is assigned to notary [$encumbranceNotary]" MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException.UntrustedAttachmentsException$"Please follow the operational steps outlined in https://docs.corda.net/cordapp-build-systems.html#cordapp-contract-attachments to learn more and continue." MaxLineLength:TransactionVerificationException.kt$net.corda.core.contracts.TransactionVerificationException.kt MaxLineLength:TransactionVerificationRequest.kt$TransactionVerificationRequest$@Suppress("MemberVisibilityCanBePrivate") //TODO the use of deprecated toLedgerTransaction need to be revisited as resolveContractAttachment requires attachments of the transactions which created input states... //TODO ...to check contract version non downgrade rule, curretly dummy Attachment if not fund is used which sets contract version to '1' @CordaSerializable @@ -3909,7 +3898,6 @@ ReturnCount:FlowManager.kt$NodeFlowManager.FlowWeightComparator$override fun compare(o1: NodeFlowManager.RegisteredFlowContainer, o2: NodeFlowManager.RegisteredFlowContainer): Int ReturnCount:InteractiveShell.kt$InteractiveShell$ @JvmStatic fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps)) ReturnCount:InteractiveShell.kt$InteractiveShell$@JvmStatic fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps, inputObjectMapper: ObjectMapper): Any? - ReturnCount:InteractiveShell.kt$InteractiveShell$private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> ReturnCount:Interpolators.kt$LinearInterpolator$override fun interpolate(x: Double): Double ReturnCount:JarScanningCordappLoader.kt$JarScanningCordappLoader$private fun parseCordappInfo(manifest: Manifest?, defaultName: String): Cordapp.Info ReturnCount:LocalSerializerFactory.kt$DefaultLocalSerializerFactory$override fun get(actualClass: Class<*>, declaredType: Type): AMQPSerializer<Any> diff --git a/detekt-config.yml b/detekt-config.yml index 033fbdca4e..65bb613bd7 100644 --- a/detekt-config.yml +++ b/detekt-config.yml @@ -10,29 +10,33 @@ complexity: active: true ComplexCondition: active: true + excludes: "**/buildSrc/**" threshold: 4 ComplexMethod: active: true + excludes: "**/buildSrc/**" threshold: 10 ignoreSingleWhenExpression: true LargeClass: active: true - excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt" + excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt,**/buildSrc/**" threshold: 600 LongMethod: active: true - excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt" + excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt,**/buildSrc/**" threshold: 120 LongParameterList: active: true + excludes: "**/buildSrc/**" threshold: 6 ignoreDefaultParameters: false NestedBlockDepth: active: true + excludes: "**/buildSrc/**" threshold: 4 TooManyFunctions: active: true - excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt" + excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt,**/buildSrc/**" thresholdInFiles: 15 thresholdInClasses: 15 thresholdInInterfaces: 15 @@ -41,6 +45,7 @@ complexity: empty-blocks: active: true + excludes: "**/buildSrc/**" EmptyCatchBlock: active: true allowedExceptionNameRegex: "^(_|(ignore|expected).*)" @@ -71,6 +76,7 @@ empty-blocks: exceptions: active: true + excludes: "**/buildSrc/**" TooGenericExceptionCaught: active: true exceptionNames: @@ -92,6 +98,7 @@ exceptions: naming: active: true + excludes: "**/buildSrc/**" ClassNaming: active: true classPattern: '[A-Z$][a-zA-Z0-9$]*' @@ -127,6 +134,7 @@ naming: performance: active: true + excludes: "**/buildSrc/**" ForEachOnRange: active: true SpreadOperator: @@ -136,6 +144,7 @@ performance: potential-bugs: active: true + excludes: "**/buildSrc/**" DuplicateCaseInWhenExpression: active: true EqualsWithHashCodeExist: @@ -149,10 +158,11 @@ style: active: true ForbiddenComment: active: true + excludes: "**/buildSrc/**" values: 'TODO:,FIXME:,STOPSHIP:' MagicNumber: active: true - excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt" + excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt,**/buildSrc/**" ignoreNumbers: '-1,0,1,2' ignoreHashCodeFunction: true ignorePropertyDeclaration: false @@ -163,23 +173,30 @@ style: ignoreEnums: false MaxLineLength: active: true + excludes: "**/buildSrc/**" maxLineLength: 140 excludePackageStatements: true excludeImportStatements: true ModifierOrder: active: true + excludes: "**/buildSrc/**" OptionalAbstractKeyword: active: true + excludes: "**/buildSrc/**" ReturnCount: active: true + excludes: "**/buildSrc/**" max: 2 excludedFunctions: "equals" excludeReturnFromLambda: true SafeCast: active: true + excludes: "**/buildSrc/**" ThrowsCount: active: true + excludes: "**/buildSrc/**" max: 2 WildcardImport: active: true + excludes: "**/buildSrc/**" excludeImports: 'java.util.*,kotlinx.android.synthetic.*' \ No newline at end of file diff --git a/docs/source/conf.py b/docs/source/conf.py index 1fa6e4a1e2..f34756ac7d 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -25,7 +25,8 @@ corda_substitutions = { "|quasar_version|" : constants_properties_dict["quasarVersion"], "|platform_version|" : constants_properties_dict["platformVersion"], "|os_branch|" : constants_properties_dict["openSourceBranch"], - "|os_samples_branch|" : constants_properties_dict["openSourceSamplesBranch"] + "|os_samples_branch|" : constants_properties_dict["openSourceSamplesBranch"], + "|jolokia_version|" : constants_properties_dict["jolokiaAgentVersion"] } def setup(app): diff --git a/docs/source/node-administration.rst b/docs/source/node-administration.rst index bb44e666df..379e5efb39 100644 --- a/docs/source/node-administration.rst +++ b/docs/source/node-administration.rst @@ -81,6 +81,12 @@ Note that in production, exposing the database via the node is not recommended. Monitoring your node -------------------- +This section covers monitoring performance and health of a node in Corda Enterprise with Jolokia and Graphite. General best practices for monitoring (e.g. setting up TCP checks for the ports the node communicates on, database health checks etc.) are not covered here but should be followed. + + +Monitoring via Jolokia +++++++++++++++++++++++ + Like most Java servers, the node can be configured to export various useful metrics and management operations via the industry-standard `JMX infrastructure `_. JMX is a standard API for registering so-called *MBeans* ... objects whose properties and methods are intended for server management. As Java @@ -106,8 +112,12 @@ Here are a few ways to build dashboards and extract monitoring data for a node: It can bridge any data input to any output using their plugin system, for example, Telegraf can be configured to collect data from Jolokia and write to DataDog web api. -The Node configuration parameter `jmxMonitoringHttpPort` has to be present in order to ensure a Jolokia agent is instrumented with -the JVM run-time. +In order to ensure that a Jolokia agent is instrumented with the JVM run-time, you can choose one of these options: + +* Specify the Node configuration parameter ``jmxMonitoringHttpPort`` which will attempt to load the jolokia driver from the ``drivers`` folder. + The format of the driver name needs to be ``jolokia-jvm-{VERSION}-agent.jar`` where VERSION is the version required by Corda, currently |jolokia_version|. +* Start the node with ``java -Dcapsule.jvm.args="-javaagent:drivers/jolokia-jvm-1.6.0-agent.jar=port=7777,host=localhost" -jar corda.jar``. + The following JMX statistics are exported: @@ -126,6 +136,8 @@ via a file called ``jolokia-access.xml``. Several Jolokia policy based security configuration files (``jolokia-access.xml``) are available for dev, test, and prod environments under ``/config/``. +To pass a security policy use ``java -Dcapsule.jvm.args=-javaagent:./drivers/jolokia-jvm-1.6.0-agent.jar,policyLocation=file:./config-path/jolokia-access.xml -jar corda.jar`` + Notes for development use +++++++++++++++++++++++++ diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt index 47cf9ab543..bbeff4adce 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt @@ -33,6 +33,7 @@ import java.io.File import java.net.URL import java.nio.file.FileAlreadyExistsException import java.nio.file.Path +import java.nio.file.Paths import java.nio.file.StandardCopyOption.REPLACE_EXISTING import java.security.PublicKey import java.time.Duration @@ -68,7 +69,7 @@ internal constructor(private val initSerEnv: Boolean, companion object { // TODO This will probably need to change once we start using a bundled JVM private val nodeInfoGenCmd = listOf( - "java", + Paths.get(System.getProperty("java.home"), "bin", "java").toString(), "-jar", "corda.jar", "generate-node-info" diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt index 4ccfff8ddb..95f950d13c 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt @@ -25,9 +25,6 @@ class DatabaseTransaction( ) { val id: UUID = UUID.randomUUID() - val flushing: Boolean get() = _flushingCount > 0 - private var _flushingCount = 0 - val connection: Connection by lazy(LazyThreadSafetyMode.NONE) { database.dataSource.connection.apply { autoCommit = false @@ -37,27 +34,6 @@ class DatabaseTransaction( private val sessionDelegate = lazy { val session = database.entityManagerFactory.withOptions().connection(connection).openSession() - session.addEventListeners(object : BaseSessionEventListener() { - override fun flushStart() { - _flushingCount++ - super.flushStart() - } - - override fun flushEnd(numberOfEntities: Int, numberOfCollections: Int) { - super.flushEnd(numberOfEntities, numberOfCollections) - _flushingCount-- - } - - override fun partialFlushStart() { - _flushingCount++ - super.partialFlushStart() - } - - override fun partialFlushEnd(numberOfEntities: Int, numberOfCollections: Int) { - super.partialFlushEnd(numberOfEntities, numberOfCollections) - _flushingCount-- - } - }) hibernateTransaction = session.beginTransaction() session } diff --git a/node/build.gradle b/node/build.gradle index cbcc56f69f..ce88a4bf56 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -3,7 +3,12 @@ buildscript { def properties = new Properties() file("$projectDir/src/main/resources/build.properties").withInputStream { properties.load(it) } - ext.jolokia_version = properties.getProperty('jolokiaAgentVersion') + + Properties constants = new Properties() + file("$rootDir/constants.properties").withInputStream { constants.load(it) } + + + ext.jolokia_version = constants.getProperty('jolokiaAgentVersion') dependencies { classpath group: 'com.github.docker-java', name: 'docker-java', version: '3.1.5' diff --git a/node/src/integration-test/kotlin/net/corda/node/NodeRPCTests.kt b/node/src/integration-test/kotlin/net/corda/node/NodeRPCTests.kt index 321cc07fc0..518dad671a 100644 --- a/node/src/integration-test/kotlin/net/corda/node/NodeRPCTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/NodeRPCTests.kt @@ -10,7 +10,6 @@ import org.junit.Test import kotlin.test.assertEquals import kotlin.test.assertTrue -@Ignore class NodeRPCTests { private val CORDA_VERSION_REGEX = "\\d+(\\.\\d+)?(-\\w+)?".toRegex() private val CORDA_VENDOR = "Corda Open Source" @@ -28,7 +27,6 @@ class NodeRPCTests { driver(DriverParameters(notarySpecs = emptyList(), cordappsForAllNodes = CORDAPPS, extraCordappPackagesToScan = emptyList())) { val nodeDiagnosticInfo = startNode().get().rpc.nodeDiagnosticInfo() assertTrue(nodeDiagnosticInfo.version.matches(CORDA_VERSION_REGEX)) - assertTrue(nodeDiagnosticInfo.revision.matches(HEXADECIMAL_REGEX)) assertEquals(PLATFORM_VERSION, nodeDiagnosticInfo.platformVersion) assertEquals(CORDA_VENDOR, nodeDiagnosticInfo.vendor) nodeDiagnosticInfo.cordapps.forEach { println("${it.shortName} ${it.type}") } diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index b02d0a6c76..d1f2f7e7b2 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -150,6 +150,7 @@ internal class CordaRPCOpsImpl( override fun killFlow(id: StateMachineRunId): Boolean = if (smm.killFlow(id)) true else smm.flowHospital.dropSessionInit(id.uuid) override fun stateMachinesFeed(): DataFeed, StateMachineUpdate> { + val (allStateMachines, changes) = smm.track() return DataFeed( allStateMachines.map { stateMachineInfoFromFlowLogic(it) }, diff --git a/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt index b1cf9d3e9f..28ec775b46 100644 --- a/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt +++ b/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt @@ -6,13 +6,10 @@ import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate import net.corda.core.identity.x500Matches import net.corda.core.internal.CertRole -import net.corda.core.internal.hash -import net.corda.core.node.services.IdentityService import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.contextLogger import net.corda.core.utilities.trace import net.corda.node.services.api.IdentityServiceInternal -import net.corda.node.services.persistence.WritablePublicKeyToOwningIdentityCache import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.crypto.x509Certificates import java.security.InvalidAlgorithmParameterException @@ -101,6 +98,10 @@ class InMemoryIdentityService( return keyToPartyAndCerts[identityCertChain[1].publicKey] } + override fun partyFromKey(key: PublicKey): Party? { + return certificateFromKey(key)?.party ?: keyToName[key.toStringShort()]?.let { wellKnownPartyFromX500Name(it) } + } + override fun certificateFromKey(owningKey: PublicKey): PartyAndCertificate? = keyToPartyAndCerts[owningKey] // We give the caller a copy of the data set to avoid any locking problems diff --git a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt index 6dd70ea852..b86355302c 100644 --- a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt +++ b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt @@ -296,6 +296,12 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri keyToPartyAndCert[owningKey.toStringShort()] } + override fun partyFromKey(key: PublicKey): Party? { + return certificateFromKey(key)?.party ?: database.transaction { + keyToName[key.toStringShort()] + }?.let { wellKnownPartyFromX500Name(it) } + } + private fun certificateFromCordaX500Name(name: CordaX500Name): PartyAndCertificate? { return database.transaction { val partyId = nameToKey[name] diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt index 17fff1da60..f0551758d4 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt @@ -71,7 +71,7 @@ sealed class Event { * Initiate a flow. This causes a new session object to be created and returned to the flow. Note that no actual * communication takes place at this time, only on the first send/receive operation on the session. */ - data class InitiateFlow(val destination: Destination) : Event() + data class InitiateFlow(val destination: Destination, val wellKnownParty: Party) : Event() /** * Signal the entering into a subflow. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowLogicRefFactoryImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowLogicRefFactoryImpl.kt index 2d8695de5c..dfd2b1a8db 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowLogicRefFactoryImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowLogicRefFactoryImpl.kt @@ -5,6 +5,8 @@ import net.corda.core.flows.* import net.corda.core.internal.VisibleForTesting import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.utilities.contextLogger +import org.slf4j.Logger import java.lang.reflect.ParameterizedType import java.lang.reflect.Type import java.lang.reflect.TypeVariable @@ -33,7 +35,12 @@ data class FlowLogicRefImpl internal constructor(val flowLogicClassName: String, * in response to a potential malicious use or buggy update to an app etc. */ // TODO: Replace with a per app classloader/cordapp provider/cordapp loader - this will do for now +@Suppress("ReturnCount", "TooManyFunctions") open class FlowLogicRefFactoryImpl(private val classloader: ClassLoader) : SingletonSerializeAsToken(), FlowLogicRefFactory { + companion object { + private val log: Logger = contextLogger() + } + override fun create(flowClass: Class>, vararg args: Any?): FlowLogicRef { if (!flowClass.isAnnotationPresent(SchedulableFlow::class.java)) { throw IllegalFlowLogicException(flowClass, "because it's not a schedulable flow") @@ -76,20 +83,100 @@ open class FlowLogicRefFactoryImpl(private val classloader: ClassLoader) : Singl return createKotlin(flowClass, argsMap) } - protected open fun findConstructor(flowClass: Class>, argTypes: List?>): KFunction> { + private fun matchConstructorArgs(ctorTypes: List>, optional: List, + argTypes: List?>): Pair { + // There must be at least as many constructor arguments as supplied arguments + if (argTypes.size > ctorTypes.size) { + return Pair(false, 0) + } + + // Check if all constructor arguments are assignable for all supplied arguments, then for remaining arguments in constructor + // check that they are optional. If they are it's still a match. Return if matched and the number of default args consumed. + var numDefaultsUsed = 0 + var index = 0 + for (conArg in ctorTypes) { + if (index < argTypes.size) { + val argType = argTypes[index] + if (argType != null && !conArg.isAssignableFrom(argType)) { + return Pair(false, 0) + } + } else { + if (index >= optional.size || !optional[index]) { + return Pair(false, 0) + } + numDefaultsUsed++ + } + index++ + } + + return Pair(true, numDefaultsUsed) + } + + private fun handleNoMatchingConstructor(flowClass: Class>, argTypes: List?>) { + log.error("Cannot find Constructor to match arguments: ${argTypes.joinToString()}") + log.info("Candidate constructors are:") + for (ctor in flowClass.kotlin.constructors) { + log.info("${ctor}") + } + } + + private fun findConstructorCheckDefaultParams(flowClass: Class>, argTypes: List?>): + KFunction> { + // There may be multiple matches. If there are, we will use the one with the least number of default parameter matches. + var ctorMatch: KFunction>? = null + var matchNumDefArgs = 0 + for (ctor in flowClass.kotlin.constructors) { + // Get the types of the arguments, always boxed (as that's what we get in the invocation). + val ctorTypes = ctor.javaConstructor!!.parameterTypes.map { + if (it == null) { it } else { Primitives.wrap(it) } + } + + val optional = ctor.parameters.map { it.isOptional } + val (matched, numDefaultsUsed) = matchConstructorArgs(ctorTypes, optional, argTypes) + if (matched) { + if (ctorMatch == null || numDefaultsUsed < matchNumDefArgs) { + ctorMatch = ctor + matchNumDefArgs = numDefaultsUsed + } + } + } + + if (ctorMatch == null) { + handleNoMatchingConstructor(flowClass, argTypes) + // Must do the throw here, not in handleNoMatchingConstructor(added for Detekt) else we can't return ctorMatch as non-null + throw IllegalFlowLogicException(flowClass, "No constructor found that matches arguments (${argTypes.joinToString()}), " + + "see log for more information.") + } + + log.info("Matched constructor: ${ctorMatch} (num_default_args_used=$matchNumDefArgs)") + return ctorMatch + } + + private fun findConstructorDirectMatch(flowClass: Class>, argTypes: List?>): KFunction> { return flowClass.kotlin.constructors.single { ctor -> // Get the types of the arguments, always boxed (as that's what we get in the invocation). val ctorTypes = ctor.javaConstructor!!.parameterTypes.map { Primitives.wrap(it) } if (argTypes.size != ctorTypes.size) return@single false for ((argType, ctorType) in argTypes.zip(ctorTypes)) { - if (argType == null) continue // Try and find a match based on the other arguments. + if (argType == null) continue // Try and find a match based on the other arguments. if (!ctorType.isAssignableFrom(argType)) return@single false } true } } + protected open fun findConstructor(flowClass: Class>, argTypes: List?>): KFunction> { + try { + return findConstructorDirectMatch(flowClass, argTypes) + } catch(e: java.lang.IllegalArgumentException) { + log.trace("findConstructorDirectMatch threw IllegalArgumentException (more than 1 matches).") + } catch (e: NoSuchElementException) { + log.trace("findConstructorDirectMatch threw NoSuchElementException (no matches).") + } + return findConstructorCheckDefaultParams(flowClass, argTypes) + } + /** * Create a [FlowLogicRef] by trying to find a Kotlin constructor that matches the given args. * diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt index 3ddc6151d3..0dc2e53b23 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt @@ -17,10 +17,11 @@ import net.corda.core.utilities.UntrustworthyData class FlowSessionImpl( override val destination: Destination, + private val wellKnownParty: Party, val sourceSessionId: SessionId ) : FlowSession() { - override val counterparty: Party get() = checkNotNull(destination as? Party) { "$destination is not a Party" } + override val counterparty: Party get() = wellKnownParty override fun toString(): String = "FlowSessionImpl(destination=$destination, sourceSessionId=$sourceSessionId)" diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 68ad5d93d8..229ca494e0 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -355,10 +355,10 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } @Suspendable - override fun initiateFlow(destination: Destination): FlowSession { + override fun initiateFlow(destination: Destination, wellKnownParty: Party): FlowSession { require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" } val resume = processEventImmediately( - Event.InitiateFlow(destination), + Event.InitiateFlow(destination, wellKnownParty), isDbTransactionOpenOnEntry = true, isDbTransactionOpenOnExit = true ) as FlowContinuation.Resume diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index c3794701b5..e85a7347b3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -466,7 +466,7 @@ class SingleThreadedStateMachineManager( try { val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage) val initiatedSessionId = SessionId.createRandom(secureRandom) - val senderSession = FlowSessionImpl(sender, initiatedSessionId) + val senderSession = FlowSessionImpl(sender, sender, initiatedSessionId) val flowLogic = initiatedFlowFactory.createFlow(senderSession) val initiatedFlowInfo = when (initiatedFlowFactory) { is InitiatedFlowFactory.Core -> FlowInfo(serviceHub.myInfo.platformVersion, "corda") diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 22e08c6b41..2c1e852758 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -235,7 +235,7 @@ class TopLevelTransition( return@builder FlowContinuation.ProcessEvents } val sourceSessionId = SessionId.createRandom(context.secureRandom) - val sessionImpl = FlowSessionImpl(event.destination, sourceSessionId) + val sessionImpl = FlowSessionImpl(event.destination, event.wellKnownParty, sourceSessionId) val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.destination, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong())) currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions)) actions.add(Action.AddSessionBinding(context.id, sourceSessionId)) diff --git a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt index 6ccf10b441..f45ddbb7cf 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt @@ -7,6 +7,8 @@ import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.nodeapi.internal.persistence.contextTransaction import net.corda.nodeapi.internal.persistence.currentDBSession +import org.hibernate.Session +import org.hibernate.internal.SessionImpl import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean @@ -191,14 +193,23 @@ abstract class AppendOnlyPersistentMapBase( private fun loadValue(key: K): V? { val session = currentDBSession() - val flushing = contextTransaction.flushing - if (!flushing) { + val isSafeToDetach = isSafeToFlushAndDetach(session) + if (isSafeToDetach) { // IMPORTANT: The flush is needed because detach() makes the queue of unflushed entries invalid w.r.t. Hibernate internal state if the found entity is unflushed. // We want the detach() so that we rely on our cache memory management and don't retain strong references in the Hibernate session. session.flush() } val result = session.find(persistentEntityClass, toPersistentEntityKey(key)) - return result?.apply { if (!flushing) session.detach(result) }?.let(fromPersistentEntity)?.second + return result?.apply { if (isSafeToDetach) session.detach(result) }?.let(fromPersistentEntity)?.second + } + + private fun isSafeToFlushAndDetach(session: Session): Boolean { + if (session !is SessionImpl) + return true + + val flushInProgress = session.persistenceContext.isFlushing + val cascadeInProgress = session.persistenceContext.cascadeLevel > 0 + return !flushInProgress && !cascadeInProgress } protected fun transactionalLoadValue(key: K): Transactional { diff --git a/node/src/main/resources/build.properties b/node/src/main/resources/build.properties index cbe490156a..0b42fe7292 100644 --- a/node/src/main/resources/build.properties +++ b/node/src/main/resources/build.properties @@ -1,4 +1,6 @@ # Build constants exported as resource file to make them visible in Node program # Note: sadly, due to present limitation of IntelliJ-IDEA in processing resource files, these constants cannot be # imported from top-level 'constants.properties' file -jolokiaAgentVersion=1.6.1 +#jolokiaAgentVersion=1.6.1 + + diff --git a/node/src/test/kotlin/net/corda/node/services/identity/PersistentIdentityServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/identity/PersistentIdentityServiceTests.kt index c364bf96ee..989312d392 100644 --- a/node/src/test/kotlin/net/corda/node/services/identity/PersistentIdentityServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/identity/PersistentIdentityServiceTests.kt @@ -261,6 +261,17 @@ class PersistentIdentityServiceTests { } } + @Test + fun `resolve key to party for key without certificate`() { + // Register Alice's PartyAndCert as if it was done so via the network map cache. + identityService.verifyAndRegisterIdentity(alice.identity) + // Use a key which is not tied to a cert. + val publicKey = Crypto.generateKeyPair().public + // Register the PublicKey to Alice's CordaX500Name. + identityService.registerKey(publicKey, alice.party) + assertEquals(alice.party, identityService.partyFromKey(publicKey)) + } + @Test fun `register incorrect party to public key `(){ database.transaction { identityService.verifyAndRegisterIdentity(ALICE_IDENTITY) } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/HibernateColumnConverterTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/HibernateColumnConverterTests.kt deleted file mode 100644 index 175fab84e3..0000000000 --- a/node/src/test/kotlin/net/corda/node/services/persistence/HibernateColumnConverterTests.kt +++ /dev/null @@ -1,82 +0,0 @@ -package net.corda.node.services.persistence - -import net.corda.core.identity.CordaX500Name -import net.corda.core.identity.Party -import net.corda.core.transactions.SignedTransaction -import net.corda.core.transactions.TransactionBuilder -import net.corda.core.utilities.OpaqueBytes -import net.corda.finance.DOLLARS -import net.corda.finance.`issued by` -import net.corda.finance.contracts.asset.Cash -import net.corda.finance.issuedBy -import net.corda.node.services.identity.PersistentIdentityService -import net.corda.node.services.keys.E2ETestKeyManagementService -import net.corda.nodeapi.internal.persistence.CordaPersistence -import net.corda.testing.common.internal.testNetworkParameters -import net.corda.testing.core.SerializationEnvironmentRule -import net.corda.testing.core.TestIdentity -import net.corda.testing.internal.TestingNamedCacheFactory -import net.corda.testing.node.MockServices -import org.junit.Before -import org.junit.Rule -import org.junit.Test -import kotlin.test.assertEquals - -class HibernateColumnConverterTests { - - @Rule - @JvmField - val testSerialization = SerializationEnvironmentRule() - - private val cordapps = listOf("net.corda.finance") - - private val myself = TestIdentity(CordaX500Name("Me", "London", "GB")) - private val notary = TestIdentity(CordaX500Name("NotaryService", "London", "GB"), 1337L) - - lateinit var services: MockServices - lateinit var database: CordaPersistence - - @Before - fun setUp() { - val (db, mockServices) = MockServices.makeTestDatabaseAndPersistentServices( - cordappPackages = cordapps, - initialIdentity = myself, - networkParameters = testNetworkParameters(minimumPlatformVersion = 4), - moreIdentities = setOf(notary.identity), - moreKeys = emptySet() - ) - services = mockServices - database = db - } - - // AbstractPartyToX500NameAsStringConverter could cause circular flush of Hibernate session because it is invoked during flush, and a - // cache miss was doing a flush. This also checks that loading during flush does actually work. - @Test - fun `issue some cash on a notary that exists only in the database to check cache loading works in our identity column converters during flush of vault update`() { - val expected = 500.DOLLARS - val ref = OpaqueBytes.of(0x01) - - // Create parallel set of key and identity services so that the values are not cached, forcing the node caches to do a lookup. - val cacheFactory = TestingNamedCacheFactory() - val identityService = PersistentIdentityService(cacheFactory) - val originalIdentityService: PersistentIdentityService = services.identityService as PersistentIdentityService - identityService.database = originalIdentityService.database - identityService.start(originalIdentityService.trustRoot, pkToIdCache = PublicKeyToOwningIdentityCacheImpl(database, cacheFactory)) - val keyService = E2ETestKeyManagementService(identityService) - keyService.start(setOf(myself.keyPair)) - - // New identity for a notary (doesn't matter that it's for Bank Of Corda... since not going to use it as an actual notary etc). - val newKeyAndCert = keyService.freshKeyAndCert(services.myInfo.legalIdentitiesAndCerts[0], false) - val randomNotary = Party(myself.name, newKeyAndCert.owningKey) - - val ourIdentity = services.myInfo.legalIdentities.first() - val builder = TransactionBuilder(notary.party) - val issuer = services.myInfo.legalIdentities.first().ref(ref) - val signers = Cash().generateIssue(builder, expected.issuedBy(issuer), ourIdentity, randomNotary) - val tx: SignedTransaction = services.signInitialTransaction(builder, signers) - services.recordTransactions(tx) - - val output = tx.tx.outputsOfType().single() - assertEquals(expected.`issued by`(ourIdentity.ref(ref)), output.amount) - } -} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/HibernateInteractionTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/HibernateInteractionTests.kt new file mode 100644 index 0000000000..3f321008c1 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/persistence/HibernateInteractionTests.kt @@ -0,0 +1,168 @@ +package net.corda.node.services.persistence + +import net.corda.core.contracts.BelongsToContract +import net.corda.core.contracts.Contract +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.TransactionState +import net.corda.core.contracts.TypeOnlyCommandData +import net.corda.core.identity.AbstractParty +import net.corda.core.identity.CordaX500Name +import net.corda.core.schemas.MappedSchema +import net.corda.core.schemas.PersistentState +import net.corda.core.schemas.QueryableState +import net.corda.core.serialization.CordaSerializable +import net.corda.core.transactions.LedgerTransaction +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.OpaqueBytes +import net.corda.finance.DOLLARS +import net.corda.finance.`issued by` +import net.corda.finance.contracts.asset.Cash +import net.corda.finance.issuedBy +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.testing.common.internal.testNetworkParameters +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.core.TestIdentity +import net.corda.testing.internal.TestingNamedCacheFactory +import net.corda.testing.node.MockServices +import org.assertj.core.api.Assertions.assertThat +import org.hibernate.annotations.Cascade +import org.hibernate.annotations.CascadeType +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import java.lang.IllegalArgumentException +import javax.persistence.Entity +import javax.persistence.Id +import javax.persistence.OneToMany +import javax.persistence.Table +import javax.persistence.GeneratedValue +import javax.persistence.GenerationType +import kotlin.test.assertEquals + +/** + * These tests cover the interactions between Corda and Hibernate with regards to flushing/detaching/cascading. + */ +class HibernateInteractionTests { + + @Rule + @JvmField + val testSerialization = SerializationEnvironmentRule() + + private val cordapps = listOf("net.corda.finance", "net.corda.node.services.persistence") + + private val myself = TestIdentity(CordaX500Name("Me", "London", "GB")) + private val notary = TestIdentity(CordaX500Name("NotaryService", "London", "GB"), 1337L) + + lateinit var services: MockServices + lateinit var database: CordaPersistence + + @Before + fun setUp() { + val (db, mockServices) = MockServices.makeTestDatabaseAndPersistentServices( + cordappPackages = cordapps, + initialIdentity = myself, + networkParameters = testNetworkParameters(minimumPlatformVersion = 4), + moreIdentities = setOf(notary.identity), + moreKeys = emptySet(), + // forcing a cache size of zero, so that all requests lead to a cache miss and end up hitting the database + cacheFactory = TestingNamedCacheFactory(0) + ) + services = mockServices + database = db + } + + // AbstractPartyToX500NameAsStringConverter could cause circular flush of Hibernate session because it is invoked during flush, and a + // cache miss was doing a flush. This also checks that loading during flush does actually work. + @Test + fun `issue some cash on a notary that exists only in the database to check cache loading works in our identity column converters during flush of vault update`() { + val expected = 500.DOLLARS + val ref = OpaqueBytes.of(0x01) + + val ourIdentity = services.myInfo.legalIdentities.first() + val builder = TransactionBuilder(notary.party) + val issuer = services.myInfo.legalIdentities.first().ref(ref) + val signers = Cash().generateIssue(builder, expected.issuedBy(issuer), ourIdentity, notary.party) + val tx: SignedTransaction = services.signInitialTransaction(builder, signers) + services.recordTransactions(tx) + + val output = tx.tx.outputsOfType().single() + assertEquals(expected.`issued by`(ourIdentity.ref(ref)), output.amount) + } + + @Test + fun `when a cascade is in progress (because of nested entities), the node avoids to flush & detach entities, since it's not allowed by Hibernate`() { + val ourIdentity = services.myInfo.legalIdentities.first() + + val childEntities = listOf(SimpleContract.ChildState(ourIdentity)) + val parentEntity = SimpleContract.ParentState(childEntities) + + val builder = TransactionBuilder(notary.party) + .addOutputState(TransactionState(parentEntity, SimpleContract::class.java.name, notary.party)) + .addCommand(SimpleContract.Issue(), listOf(ourIdentity.owningKey)) + val tx: SignedTransaction = services.signInitialTransaction(builder, listOf(ourIdentity.owningKey)) + services.recordTransactions(tx) + + val output = tx.tx.outputsOfType().single() + assertThat(output.children.single().member).isEqualTo(ourIdentity) + } + + object PersistenceSchema: MappedSchema(PersistenceSchema::class.java, 1, listOf(Parent::class.java, Child::class.java)) { + + @Entity(name = "parents") + @Table + class Parent: PersistentState() { + + @Cascade(CascadeType.ALL) + @OneToMany(targetEntity = Child::class) + val children: MutableCollection = mutableSetOf() + + fun addChild(child: Child) { + children.add(child) + } + } + + @Entity(name = "children") + class Child( + @Id + // Do not change this: this generation type is required in order to trigger the proper cascade ordering. + @GeneratedValue(strategy = GenerationType.IDENTITY) + val identifier: Int?, + + val member: AbstractParty? + ) { + constructor(member: AbstractParty): this(null, member) + } + + } + + class SimpleContract: Contract { + + @BelongsToContract(SimpleContract::class) + @CordaSerializable + data class ParentState(val children: List): ContractState, QueryableState { + override fun supportedSchemas(): Iterable = listOf(PersistenceSchema) + + override fun generateMappedObject(schema: MappedSchema): PersistentState { + return when(schema) { + is PersistenceSchema -> { + val parent = PersistenceSchema.Parent() + children.forEach { parent.addChild(PersistenceSchema.Child(it.member)) } + parent + } + else -> throw IllegalArgumentException("Unrecognised schema $schema") + } + } + + override val participants: List = children.map { it.member } + } + + @CordaSerializable + data class ChildState(val member: AbstractParty) + + override fun verify(tx: LedgerTransaction) {} + + class Issue: TypeOnlyCommandData() + } + +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index 3644845e68..7a30fad6ee 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -69,8 +69,8 @@ class FlowFrameworkTests { @Before fun setUpMockNet() { mockNet = InternalMockNetwork( - cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP), - servicePeerAllocationStrategy = RoundRobin() + cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP), + servicePeerAllocationStrategy = RoundRobin() ) aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) @@ -139,13 +139,13 @@ class FlowFrameworkTests { mockNet.runNetwork() assertSessionTransfers( - aliceNode sent sessionInit(PingPongFlow::class, payload = 10L) to bobNode, - bobNode sent sessionConfirm() to aliceNode, - bobNode sent sessionData(20L) to aliceNode, - aliceNode sent sessionData(11L) to bobNode, - bobNode sent sessionData(21L) to aliceNode, - aliceNode sent normalEnd to bobNode, - bobNode sent normalEnd to aliceNode + aliceNode sent sessionInit(PingPongFlow::class, payload = 10L) to bobNode, + bobNode sent sessionConfirm() to aliceNode, + bobNode sent sessionData(20L) to aliceNode, + aliceNode sent sessionData(11L) to bobNode, + bobNode sent sessionData(21L) to aliceNode, + aliceNode sent normalEnd to bobNode, + bobNode sent normalEnd to aliceNode ) } @@ -167,7 +167,8 @@ class FlowFrameworkTests { it.message is ExistingSessionMessage && it.message.payload === EndSessionMessage }.subscribe { sessionEndReceived.release() } val resultFuture = aliceNode.services.startFlow( - WaitForOtherSideEndBeforeSendAndReceive(bob, sessionEndReceived)).resultFuture + WaitForOtherSideEndBeforeSendAndReceive(bob, sessionEndReceived) + ).resultFuture mockNet.runNetwork() assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy { resultFuture.getOrThrow() @@ -186,10 +187,10 @@ class FlowFrameworkTests { mockNet.runNetwork() assertThatExceptionOfType(MyFlowException::class.java) - .isThrownBy { receivingFiber.resultFuture.getOrThrow() } - .withMessage("Nothing useful") - .withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow - .withStackTraceContaining("Received counter-flow exception from peer") + .isThrownBy { receivingFiber.resultFuture.getOrThrow() } + .withMessage("Nothing useful") + .withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow + .withStackTraceContaining("Received counter-flow exception from peer") bobNode.database.transaction { assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty() } @@ -197,15 +198,15 @@ class FlowFrameworkTests { assertThat(receivingFiber.state).isEqualTo(Strand.State.WAITING) assertThat((erroringFlow.get().stateMachine as FlowStateMachineImpl).state).isEqualTo(Strand.State.WAITING) assertThat(erroringFlowSteps.get()).containsExactly( - Notification.createOnNext(ProgressTracker.STARTING), - Notification.createOnNext(ExceptionFlow.START_STEP), - Notification.createOnError(erroringFlow.get().exceptionThrown) + Notification.createOnNext(ProgressTracker.STARTING), + Notification.createOnNext(ExceptionFlow.START_STEP), + Notification.createOnError(erroringFlow.get().exceptionThrown) ) assertSessionTransfers( - aliceNode sent sessionInit(ReceiveFlow::class) to bobNode, - bobNode sent sessionConfirm() to aliceNode, - bobNode sent errorMessage(erroringFlow.get().exceptionThrown) to aliceNode + aliceNode sent sessionInit(ReceiveFlow::class) to bobNode, + bobNode sent sessionConfirm() to aliceNode, + bobNode sent errorMessage(erroringFlow.get().exceptionThrown) to aliceNode ) // Make sure the original stack trace isn't sent down the wire val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage @@ -296,8 +297,8 @@ class FlowFrameworkTests { @Test fun waitForLedgerCommit() { val ptx = TransactionBuilder(notary = notaryIdentity) - .addOutputState(DummyState(), DummyContract.PROGRAM_ID) - .addCommand(dummyCommand(alice.owningKey)) + .addOutputState(DummyState(), DummyContract.PROGRAM_ID) + .addCommand(dummyCommand(alice.owningKey)) val stx = aliceNode.services.signInitialTransaction(ptx) val committerStx = aliceNode.registerCordappFlowFactory(CommitterFlow::class) { @@ -313,8 +314,8 @@ class FlowFrameworkTests { @Test fun `waitForLedgerCommit throws exception if any active session ends in error`() { val ptx = TransactionBuilder(notary = notaryIdentity) - .addOutputState(DummyState(), DummyContract.PROGRAM_ID) - .addCommand(dummyCommand()) + .addOutputState(DummyState(), DummyContract.PROGRAM_ID) + .addCommand(dummyCommand()) val stx = aliceNode.services.signInitialTransaction(ptx) aliceNode.registerCordappFlowFactory(WaitForLedgerCommitFlow::class) { ExceptionFlow { throw Exception("Error") } } @@ -354,8 +355,8 @@ class FlowFrameworkTests { val result = aliceNode.services.startFlow(UpgradedFlow(bob)).resultFuture mockNet.runNetwork() assertThat(receivedSessionMessages).startsWith( - aliceNode sent sessionInit(UpgradedFlow::class, flowVersion = 2) to bobNode, - bobNode sent sessionConfirm(flowVersion = 1) to aliceNode + aliceNode sent sessionInit(UpgradedFlow::class, flowVersion = 2) to bobNode, + bobNode sent sessionConfirm(flowVersion = 1) to aliceNode ) val (receivedPayload, node2FlowVersion) = result.getOrThrow() assertThat(receivedPayload).isEqualTo("Old initiated") @@ -369,8 +370,8 @@ class FlowFrameworkTests { val flowInfo = aliceNode.services.startFlow(initiatingFlow).resultFuture mockNet.runNetwork() assertThat(receivedSessionMessages).startsWith( - aliceNode sent sessionInit(SendFlow::class, flowVersion = 1, payload = "Old initiating") to bobNode, - bobNode sent sessionConfirm(flowVersion = 2) to aliceNode + aliceNode sent sessionInit(SendFlow::class, flowVersion = 1, payload = "Old initiating") to bobNode, + bobNode sent sessionConfirm(flowVersion = 2) to aliceNode ) assertThat(flowInfo.get().flowVersion).isEqualTo(2) } @@ -380,8 +381,8 @@ class FlowFrameworkTests { val future = aliceNode.services.startFlow(NeverRegisteredFlow("Hello", bob)).resultFuture mockNet.runNetwork() assertThatExceptionOfType(UnexpectedFlowEndException::class.java) - .isThrownBy { future.getOrThrow() } - .withMessageEndingWith("${NeverRegisteredFlow::class.java.name} is not registered") + .isThrownBy { future.getOrThrow() } + .withMessageEndingWith("${NeverRegisteredFlow::class.java.name} is not registered") } @Test @@ -441,9 +442,9 @@ class FlowFrameworkTests { erroringFlowFuture.getOrThrow() val flowSteps = erroringFlowSteps.get() assertThat(flowSteps).containsExactly( - Notification.createOnNext(ProgressTracker.STARTING), - Notification.createOnNext(ExceptionFlow.START_STEP), - Notification.createOnError(erroringFlowFuture.get().exceptionThrown) + Notification.createOnNext(ProgressTracker.STARTING), + Notification.createOnNext(ExceptionFlow.START_STEP), + Notification.createOnError(erroringFlowFuture.get().exceptionThrown) ) val receiveFlowException = assertFailsWith(UnexpectedFlowEndException::class) { @@ -451,28 +452,28 @@ class FlowFrameworkTests { } assertThat(receiveFlowException.message).doesNotContain("evil bug!") assertThat(receiveFlowSteps.get()).containsExactly( - Notification.createOnNext(ProgressTracker.STARTING), - Notification.createOnNext(ReceiveFlow.START_STEP), - Notification.createOnError(receiveFlowException) + Notification.createOnNext(ProgressTracker.STARTING), + Notification.createOnNext(ReceiveFlow.START_STEP), + Notification.createOnError(receiveFlowException) ) assertSessionTransfers( - aliceNode sent sessionInit(ReceiveFlow::class) to bobNode, - bobNode sent sessionConfirm() to aliceNode, - bobNode sent errorMessage() to aliceNode + aliceNode sent sessionInit(ReceiveFlow::class) to bobNode, + bobNode sent sessionConfirm() to aliceNode, + bobNode sent errorMessage() to aliceNode ) } @Test fun `initiating flow using unknown AnonymousParty`() { val anonymousBob = bobNode.services.keyManagementService.freshKeyAndCert(bobNode.info.legalIdentitiesAndCerts.single(), false) - .party.anonymise() + .party.anonymise() bobNode.registerCordappFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) } val result = aliceNode.services.startFlow(SendAndReceiveFlow(anonymousBob, "Hello")).resultFuture mockNet.runNetwork() assertThatIllegalArgumentException() - .isThrownBy { result.getOrThrow() } - .withMessage("We do not know who $anonymousBob belongs to") + .isThrownBy { result.getOrThrow() } + .withMessage("Could not resolve destination: $anonymousBob") } @Test @@ -497,16 +498,18 @@ class FlowFrameworkTests { private val FlowLogic<*>.progressSteps: CordaFuture>> get() { return progressTracker!!.changes - .ofType(Change.Position::class.java) - .map { it.newStep } - .materialize() - .toList() - .toFuture() + .ofType(Change.Position::class.java) + .map { it.newStep } + .materialize() + .toList() + .toFuture() } @InitiatingFlow - private class WaitForOtherSideEndBeforeSendAndReceive(val otherParty: Party, - @Transient val receivedOtherFlowEnd: Semaphore) : FlowLogic() { + private class WaitForOtherSideEndBeforeSendAndReceive( + val otherParty: Party, + @Transient val receivedOtherFlowEnd: Semaphore + ) : FlowLogic() { @Suspendable override fun call() { // Kick off the flow on the other side ... @@ -626,7 +629,8 @@ class FlowFrameworkTests { //endregion Helpers } -internal fun sessionConfirm(flowVersion: Int = 1) = ExistingSessionMessage(SessionId(0), ConfirmSessionMessage(SessionId(0), FlowInfo(flowVersion, ""))) +internal fun sessionConfirm(flowVersion: Int = 1) = + ExistingSessionMessage(SessionId(0), ConfirmSessionMessage(SessionId(0), FlowInfo(flowVersion, ""))) internal inline fun > TestStartedNode.getSingleFlow(): Pair> { return smm.findStateMachines(P::class.java).single() @@ -637,17 +641,17 @@ private fun sanitise(message: SessionMessage) = when (message) { is ExistingSessionMessage -> { val payload = message.payload message.copy( - recipientSessionId = SessionId(0), - payload = when (payload) { - is ConfirmSessionMessage -> payload.copy( - initiatedSessionId = SessionId(0), - initiatedFlowInfo = payload.initiatedFlowInfo.copy(appName = "") - ) - is ErrorSessionMessage -> payload.copy( - errorId = 0 - ) - else -> payload - } + recipientSessionId = SessionId(0), + payload = when (payload) { + is ConfirmSessionMessage -> payload.copy( + initiatedSessionId = SessionId(0), + initiatedFlowInfo = payload.initiatedFlowInfo.copy(appName = "") + ) + is ErrorSessionMessage -> payload.copy( + errorId = 0 + ) + else -> payload + } ) } } @@ -667,10 +671,12 @@ internal fun TestStartedNode.sendSessionMessage(message: SessionMessage, destina } } -internal fun errorMessage(errorResponse: FlowException? = null) = ExistingSessionMessage(SessionId(0), ErrorSessionMessage(errorResponse, 0)) +internal fun errorMessage(errorResponse: FlowException? = null) = + ExistingSessionMessage(SessionId(0), ErrorSessionMessage(errorResponse, 0)) internal infix fun TestStartedNode.sent(message: SessionMessage): Pair = Pair(internals.id, message) -internal infix fun Pair.to(node: TestStartedNode): SessionTransfer = SessionTransfer(first, second, node.network.myAddress) +internal infix fun Pair.to(node: TestStartedNode): SessionTransfer = + SessionTransfer(first, second, node.network.myAddress) internal data class SessionTransfer(val from: Int, val message: SessionMessage, val to: MessageRecipients) { val isPayloadTransfer: Boolean @@ -785,7 +791,11 @@ internal class MyFlowException(override val message: String) : FlowException() { internal class MyPeerFlowException(override val message: String, val peer: Party) : FlowException() @InitiatingFlow -internal class SendAndReceiveFlow(private val destination: Destination, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic() { +internal class SendAndReceiveFlow( + private val destination: Destination, + private val payload: Any, + private val otherPartySession: FlowSession? = null +) : FlowLogic() { constructor(otherPartySession: FlowSession, payload: Any) : this(otherPartySession.counterparty, payload, otherPartySession) @Suspendable @@ -795,7 +805,8 @@ internal class SendAndReceiveFlow(private val destination: Destination, private } @InitiatingFlow -internal class PingPongFlow(private val otherParty: Party, private val payload: Long, private val otherPartySession: FlowSession? = null) : FlowLogic() { +internal class PingPongFlow(private val otherParty: Party, private val payload: Long, private val otherPartySession: FlowSession? = null) : + FlowLogic() { constructor(otherPartySession: FlowSession, payload: Long) : this(otherPartySession.counterparty, payload, otherPartySession) @Transient diff --git a/testing/DockerfileBase b/testing/DockerfileBase index 6af9e25d5e..25f77adf2d 100644 --- a/testing/DockerfileBase +++ b/testing/DockerfileBase @@ -3,8 +3,17 @@ ENV GRADLE_USER_HOME=/tmp/gradle RUN mkdir /tmp/gradle && mkdir -p /home/root/.m2/repository RUN apt-get update && apt-get install -y curl libatomic1 && \ - curl -O https://d3pxv6yz143wms.cloudfront.net/8.222.10.1/java-1.8.0-amazon-corretto-jdk_8.222.10-1_amd64.deb && \ - apt-get install -y java-common && dpkg -i java-1.8.0-amazon-corretto-jdk_8.222.10-1_amd64.deb && \ + curl -O https://cdn.azul.com/zulu/bin/zulu8.40.0.25-ca-jdk8.0.222-linux_amd64.deb && \ + apt-get install -y java-common && apt install -y ./zulu8.40.0.25-ca-jdk8.0.222-linux_amd64.deb && \ apt-get clean && \ - mkdir -p /tmp/source + rm -f zulu8.40.0.25-ca-jdk8.0.222-linux_amd64.deb && \ + curl -O https://cdn.azul.com/zulu/bin/zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz && \ + mv /zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz /usr/lib/jvm/ && \ + cd /usr/lib/jvm/ && \ + tar -zxvf zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz && \ + rm -rf zulu-8-amd64 && \ + mv zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64 zulu-8-amd64 && \ + rm -f zulu8.40.0.25-ca-fx-jdk8.0.222-linux_x64.tar.gz && \ + cd / && mkdir -p /tmp/source + diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index 27cb01c617..01bd936cbe 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -139,10 +139,12 @@ open class MockServices private constructor( * Makes database and persistent services appropriate for unit tests which require persistence across the vault, identity service * and key managment service. * - * @param cordappPackages A [List] of cordapp packages to scan for any cordapp code, e.g. contract verification code, flows and services. + * @param cordappPackages A [List] of cordapp packages to scan for any cordapp code, e.g. contract verification code, + * flows and services. * @param initialIdentity The first (typically sole) identity the services will represent. * @param moreKeys A list of additional [KeyPair] instances to be used by [MockServices]. * @param moreIdentities A list of additional [KeyPair] instances to be used by [MockServices]. + * @param cacheFactory A custom cache factory to be used by the created [IdentityService] * @return A pair where the first element is the instance of [CordaPersistence] and the second is [MockServices]. */ @JvmStatic @@ -152,12 +154,13 @@ open class MockServices private constructor( initialIdentity: TestIdentity, networkParameters: NetworkParameters = testNetworkParameters(modifiedTime = Instant.MIN), moreKeys: Set, - moreIdentities: Set + moreIdentities: Set, + cacheFactory: TestingNamedCacheFactory = TestingNamedCacheFactory() ): Pair { val cordappLoader = cordappLoaderForPackages(cordappPackages) val dataSourceProps = makeTestDataSourceProperties() val schemaService = NodeSchemaService(cordappLoader.cordappSchemas) - val identityService = PersistentIdentityService(TestingNamedCacheFactory()) + val identityService = PersistentIdentityService(cacheFactory) val persistence = configureDatabase( hikariProperties = dataSourceProps, databaseConfig = DatabaseConfig(), @@ -167,7 +170,7 @@ open class MockServices private constructor( internalSchemas = schemaService.internalSchemas() ) - val pkToIdCache = PublicKeyToOwningIdentityCacheImpl(persistence, TestingNamedCacheFactory()) + val pkToIdCache = PublicKeyToOwningIdentityCacheImpl(persistence, cacheFactory) // Create a persistent identity service and add all the supplied identities. identityService.apply { diff --git a/tools/network-builder/src/main/kotlin/net/corda/networkbuilder/notaries/NotaryCopier.kt b/tools/network-builder/src/main/kotlin/net/corda/networkbuilder/notaries/NotaryCopier.kt index 7e476905e0..7e4210d144 100644 --- a/tools/network-builder/src/main/kotlin/net/corda/networkbuilder/notaries/NotaryCopier.kt +++ b/tools/network-builder/src/main/kotlin/net/corda/networkbuilder/notaries/NotaryCopier.kt @@ -6,6 +6,7 @@ import net.corda.networkbuilder.nodes.FoundNode import net.corda.networkbuilder.nodes.NodeCopier import org.slf4j.LoggerFactory import java.io.File +import java.nio.file.Paths class NotaryCopier(private val cacheDir: File) : NodeCopier(cacheDir) { @@ -28,7 +29,9 @@ class NotaryCopier(private val cacheDir: File) : NodeCopier(cacheDir) { fun generateNodeInfo(dirToGenerateFrom: File): File { val nodeInfoGeneratorProcess = ProcessBuilder() - .command(listOf("java", "-jar", "corda.jar", "generate-node-info")) + .command(listOf( + Paths.get(System.getProperty("java.home"), "bin", "java").toString(), + "-jar", "corda.jar", "generate-node-info")) .directory(dirToGenerateFrom) .inheritIO() .start() diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt index ac5edb6e7e..a78edf970b 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt @@ -13,6 +13,7 @@ import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.GracefulReconnect import net.corda.client.rpc.PermissionException +import net.corda.client.rpc.notUsed import net.corda.core.CordaException import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.UniqueIdentifier @@ -70,6 +71,7 @@ import kotlin.concurrent.thread // TODO: Resurrect or reimplement the mail plugin. // TODO: Make it notice new shell commands added after the node started. +@Suppress("MaxLineLength") object InteractiveShell { private val log = LoggerFactory.getLogger(javaClass) private lateinit var rpcOps: (username: String, password: String) -> InternalCordaRPCOps @@ -521,8 +523,11 @@ object InteractiveShell { val parser = StringToMethodCallParser(CordaRPCOps::class.java, inputObjectMapper) val call = parser.parse(cordaRPCOps, cmd) result = call.call() + var subscription : Subscriber<*>? = null if (result != null && result !== kotlin.Unit && result !is Void) { - result = printAndFollowRPCResponse(result, out, outputFormat) + val (subs, future) = printAndFollowRPCResponse(result, out, outputFormat) + subscription = subs + result = future } if (result is Future<*>) { if (!result.isDone) { @@ -532,6 +537,7 @@ object InteractiveShell { try { result = result.get() } catch (e: InterruptedException) { + subscription?.unsubscribe() Thread.currentThread().interrupt() } catch (e: ExecutionException) { throw e.rootCause @@ -621,7 +627,11 @@ object InteractiveShell { } } - private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter, outputFormat: OutputFormat): CordaFuture { + private fun printAndFollowRPCResponse( + response: Any?, + out: PrintWriter, + outputFormat: OutputFormat + ): Pair> { val outputMapper = createOutputMapper(outputFormat) val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) } @@ -659,34 +669,52 @@ object InteractiveShell { } } - private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture { + private fun maybeFollow( + response: Any?, + printerFun: (Any?) -> String, + out: PrintWriter + ): Pair> { // Match on a couple of common patterns for "important" observables. It's tough to do this in a generic // way because observables can be embedded anywhere in the object graph, and can emit other arbitrary // object graphs that contain yet more observables. So we just look for top level responses that follow // the standard "track" pattern, and print them until the user presses Ctrl-C - if (response == null) return doneFuture(Unit) + var result = Pair>(null, doneFuture(Unit)) - if (response is DataFeed<*, *>) { - out.println("Snapshot:") - out.println(printerFun(response.snapshot)) - out.flush() - out.println("Updates:") - return printNextElements(response.updates, printerFun, out) + + when { + response is DataFeed<*, *> -> { + out.println("Snapshot:") + out.println(printerFun(response.snapshot)) + out.flush() + out.println("Updates:") + + val unsubscribeAndPrint: (Any?) -> String = { resp -> + if (resp is StateMachineUpdate.Added) { + resp.stateMachineInfo.progressTrackerStepAndUpdates?.updates?.notUsed() + } + printerFun(resp) + } + + result = printNextElements(response.updates, unsubscribeAndPrint, out) + } + response is Observable<*> -> { + result = printNextElements(response, printerFun, out) + } + response != null -> { + out.println(printerFun(response)) + } } - if (response is Observable<*>) { - - return printNextElements(response, printerFun, out) - } - - out.println(printerFun(response)) - return doneFuture(Unit) + return result } - private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture { - + private fun printNextElements( + elements: Observable<*>, + printerFun: (Any?) -> String, + out: PrintWriter + ): Pair> { val subscriber = PrintingSubscriber(printerFun, out) uncheckedCast(elements).subscribe(subscriber) - return subscriber.future + return Pair(subscriber, subscriber.future) } } diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/utlities/ANSIProgressRenderer.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/utlities/ANSIProgressRenderer.kt index 20037be4bc..f670d5388f 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/utlities/ANSIProgressRenderer.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/utlities/ANSIProgressRenderer.kt @@ -177,7 +177,7 @@ abstract class ANSIProgressRenderer { ansi.fgRed() ansi.a("${IntStream.range(indent, indent).mapToObj { "\t" }.toList().joinToString(separator = "") { s -> s }} $errorIcon ${error.message}") ansi.reset() - errorToPrint = error.cause + errorToPrint = errorToPrint.cause indent++ } ansi.eraseLine(Ansi.Erase.FORWARD)