diff --git a/.ci/dev/compatibility/JenkinsfileJDK11Azul b/.ci/dev/compatibility/JenkinsfileJDK11Azul index e466dd1b96..cd1a2da800 100644 --- a/.ci/dev/compatibility/JenkinsfileJDK11Azul +++ b/.ci/dev/compatibility/JenkinsfileJDK11Azul @@ -108,7 +108,9 @@ pipeline { "-Ddocker.container.env.parameter.CORDA_ARTIFACTORY_USERNAME=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " + "-Ddocker.container.env.parameter.CORDA_ARTIFACTORY_PASSWORD=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " + "-Ddocker.dockerfile=DockerfileJDK11Azul" + - " clean pushBuildImage preAllocateForParallelRegressionTest preAllocateForAllParallelSlowIntegrationTest --stacktrace" + " clean preAllocateForAllParallelUnitTest preAllocateForAllParallelIntegrationTest " + + " preAllocateForAllParallelSlowIntegrationTest preAllocateForAllParallelSmokeTest " + + " pushBuildImage --stacktrace" } sh "kubectl auth can-i get pods" } @@ -116,7 +118,7 @@ pipeline { stage('Testing phase') { parallel { - stage('Regression Test') { + stage('Unit Test') { steps { sh "./gradlew " + "-DbuildId=\"\${BUILD_ID}\" " + @@ -126,7 +128,33 @@ pipeline { "-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " + "-Dgit.branch=\"\${GIT_BRANCH}\" " + "-Dgit.target.branch=\"\${GIT_BRANCH}\" " + - " parallelRegressionTest --stacktrace" + " allParallelUnitTest --stacktrace" + } + } + stage('Integration Test') { + steps { + sh "./gradlew " + + "-DbuildId=\"\${BUILD_ID}\" " + + "-Dkubenetize=true " + + "-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " + + "-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " + + "-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " + + "-Dgit.branch=\"\${GIT_BRANCH}\" " + + "-Dgit.target.branch=\"\${GIT_BRANCH}\" " + + " allParallelIntegrationTest --stacktrace" + } + } + stage('Smoke Test') { + steps { + sh "./gradlew " + + "-DbuildId=\"\${BUILD_ID}\" " + + "-Dkubenetize=true " + + "-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " + + "-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " + + "-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " + + "-Dgit.branch=\"\${GIT_BRANCH}\" " + + "-Dgit.target.branch=\"\${GIT_BRANCH}\" " + + " allParallelSmokeTest --stacktrace" } } stage('Slow Integration Test') { diff --git a/build.gradle b/build.gradle index fcacad11a4..a46fabf303 100644 --- a/build.gradle +++ b/build.gradle @@ -340,9 +340,9 @@ allprojects { attributes('Corda-Docs-Link': corda_docs_link) } } - + tasks.withType(Test).configureEach { - forkEvery = 10 + forkEvery = 20 ignoreFailures = project.hasProperty('tests.ignoreFailures') ? project.property('tests.ignoreFailures').toBoolean() : false failFast = project.hasProperty('tests.failFast') ? project.property('tests.failFast').toBoolean() : false diff --git a/client/rpc/build.gradle b/client/rpc/build.gradle index d27d64c7d0..38bc661404 100644 --- a/client/rpc/build.gradle +++ b/client/rpc/build.gradle @@ -65,6 +65,9 @@ processSmokeTestResources { from(project(':finance:contracts').tasks['jar']) { rename '.*finance-contracts-.*', 'cordapp-finance-contracts.jar' } + from(project(':testing:cordapps:sleeping').tasks['jar']) { + rename 'testing-sleeping-cordapp-*', 'cordapp-sleeping.jar' + } } // To find potential version conflicts, run "gradle htmlDependencyReport" and then look in @@ -94,6 +97,7 @@ dependencies { smokeTestCompile project(':smoke-test-utils') smokeTestCompile project(':finance:contracts') smokeTestCompile project(':finance:workflows') + smokeTestCompile project(':testing:cordapps:sleeping') smokeTestCompile "org.apache.logging.log4j:log4j-slf4j-impl:$log4j_version" smokeTestCompile "org.apache.logging.log4j:log4j-core:$log4j_version" smokeTestCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version" diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt index 68db0b1acf..b7bdc19f50 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt @@ -561,6 +561,40 @@ class CordaRPCClientReconnectionTest { } } + @Test(timeout=300_000) + fun `rpc re-attaches to client id flow on node restart with flows draining mode on`() { + driver(DriverParameters(inMemoryDB = false, cordappsForAllNodes = listOf(this.enclosedCordapp()))) { + val address = NetworkHostAndPort("localhost", portAllocator.nextPort()) + fun startNode(additionalCustomOverrides: Map = emptyMap()): NodeHandle { + return startNode( + providedName = CHARLIE_NAME, + rpcUsers = listOf(CordaRPCClientTest.rpcUser), + customOverrides = mapOf("rpcSettings.address" to address.toString()) + additionalCustomOverrides + ).getOrThrow() + } + + val node = startNode() + val client = CordaRPCClient(node.rpcAddress, config) + (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use { + val rpcOps = it.proxy as ReconnectingCordaRPCOps + val clientId = UUID.randomUUID().toString() + val flowHandle0 = rpcOps.startFlowWithClientId(clientId, ::SimpleFlow) + + node.rpc.setFlowsDrainingModeEnabled(true) + node.stop() + + thread { + sleep(1000) + startNode() + } + + val result0 = flowHandle0.returnValue.getOrThrow() + assertEquals(5, result0) + assertThat(rpcOps.reconnectingRPCConnection.isClosed()) + } + } + } + @StartableByRPC class SimpleFlow : FlowLogic() { diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index 6fe840171d..71964d961e 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -319,7 +319,7 @@ class ReconnectingCordaRPCOps private constructor( checkIfClosed() var remainingAttempts = maxNumberOfAttempts var lastException: Throwable? = null - while (remainingAttempts != 0 && !reconnectingRPCConnection.isClosed()) { + loop@ while (remainingAttempts != 0 && !reconnectingRPCConnection.isClosed()) { try { log.debug { "Invoking RPC $method..." } return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also { diff --git a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt index b6fd9e5544..dea9797cd2 100644 --- a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt +++ b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt @@ -8,34 +8,49 @@ import net.corda.core.crypto.SecureHash import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.InputStreamAndHash -import net.corda.core.messaging.* +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.StateMachineUpdate +import net.corda.core.messaging.startFlow +import net.corda.core.messaging.startTrackedFlow +import net.corda.core.messaging.vaultQueryBy +import net.corda.core.messaging.vaultTrackBy import net.corda.core.node.NodeInfo import net.corda.core.node.services.Vault -import net.corda.core.node.services.vault.* +import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM +import net.corda.core.node.services.vault.PageSpecification +import net.corda.core.node.services.vault.QueryCriteria +import net.corda.core.node.services.vault.Sort +import net.corda.core.node.services.vault.SortAttribute import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.contextLogger import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.minutes import net.corda.core.utilities.seconds import net.corda.finance.DOLLARS import net.corda.finance.POUNDS import net.corda.finance.SWISS_FRANCS import net.corda.finance.USD import net.corda.finance.contracts.asset.Cash -import net.corda.finance.workflows.getCashBalance -import net.corda.finance.workflows.getCashBalances import net.corda.finance.flows.CashIssueFlow import net.corda.finance.flows.CashPaymentFlow +import net.corda.finance.workflows.getCashBalance +import net.corda.finance.workflows.getCashBalances import net.corda.java.rpc.StandaloneCordaRPCJavaClientTest import net.corda.nodeapi.internal.config.User +import net.corda.sleeping.SleepingFlow import net.corda.smoketesting.NodeConfig import net.corda.smoketesting.NodeProcess import org.apache.commons.io.output.NullOutputStream import org.hamcrest.text.MatchesPattern -import org.junit.* +import org.junit.After +import org.junit.Before +import org.junit.Ignore +import org.junit.Rule +import org.junit.Test import org.junit.rules.ExpectedException import java.io.FilterInputStream import java.io.InputStream -import java.util.* +import java.util.Currency import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicInteger import java.util.regex.Pattern @@ -242,7 +257,7 @@ class StandaloneCordaRPClientTest { exception.expect(PermissionException::class.java) exception.expectMessage(MatchesPattern(Pattern.compile("User not authorized to perform RPC call .*killFlow.*"))) - val flowHandle = rpcProxy.startFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), notaryNodeIdentity) + val flowHandle = rpcProxy.startFlow(::SleepingFlow, 1.minutes) notary.connect(nonUser).use { connection -> val rpcProxy = connection.proxy rpcProxy.killFlow(flowHandle.id) @@ -251,7 +266,7 @@ class StandaloneCordaRPClientTest { @Test(timeout=300_000) fun `test kill flow with killFlow permission`() { - val flowHandle = rpcProxy.startFlow(::CashIssueFlow, 83.DOLLARS, OpaqueBytes.of(0), notaryNodeIdentity) + val flowHandle = rpcProxy.startFlow(::SleepingFlow, 1.minutes) notary.connect(rpcUser).use { connection -> val rpcProxy = connection.proxy assertTrue(rpcProxy.killFlow(flowHandle.id)) diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt index f7ee175318..c71d211759 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt @@ -57,6 +57,9 @@ class RPCPermissionsTests : AbstractRPCTest() { assertNotAllowed { proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow") } + assertNotAllowed { + proxy.validatePermission("startFlow", "net.corda.flows.DummyFlow") + } } } @@ -67,6 +70,10 @@ class RPCPermissionsTests : AbstractRPCTest() { val proxy = testProxyFor(adminUser) proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow") proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.DummyFlow") + proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.DummyFlow") + proxy.validatePermission("startFlow", "net.corda.flows.DummyFlow") + proxy.validatePermission("startTrackedFlow", "net.corda.flows.DummyFlow") + proxy.validatePermission("startFlowWithClientId", "net.corda.flows.DummyFlow") } } @@ -77,6 +84,10 @@ class RPCPermissionsTests : AbstractRPCTest() { val proxy = testProxyFor(joeUser) proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow") proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.DummyFlow") + proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.DummyFlow") + proxy.validatePermission("startFlow", "net.corda.flows.DummyFlow") + proxy.validatePermission("startTrackedFlow", "net.corda.flows.DummyFlow") + proxy.validatePermission("startFlowWithClientId", "net.corda.flows.DummyFlow") } } @@ -91,6 +102,18 @@ class RPCPermissionsTests : AbstractRPCTest() { assertNotAllowed { proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.OtherFlow") } + assertNotAllowed { + proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.OtherFlow") + } + assertNotAllowed { + proxy.validatePermission("startFlow", "net.corda.flows.OtherFlow") + } + assertNotAllowed { + proxy.validatePermission("startTrackedFlow", "net.corda.flows.OtherFlow") + } + assertNotAllowed { + proxy.validatePermission("startFlowWithClientId", "net.corda.flows.OtherFlow") + } } } @@ -121,6 +144,16 @@ class RPCPermissionsTests : AbstractRPCTest() { proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow") proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.DummyFlow") proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.OtherFlow") + proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.OtherFlow") + proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.DummyFlow") + + proxy.validatePermission("startFlow", "net.corda.flows.OtherFlow") + proxy.validatePermission("startFlow", "net.corda.flows.DummyFlow") + proxy.validatePermission("startTrackedFlow", "net.corda.flows.DummyFlow") + proxy.validatePermission("startTrackedFlow", "net.corda.flows.OtherFlow") + proxy.validatePermission("startFlowWithClientId", "net.corda.flows.OtherFlow") + proxy.validatePermission("startFlowWithClientId", "net.corda.flows.DummyFlow") + assertNotAllowed { proxy.validatePermission("startTrackedFlowDynamic", "net.banned.flows.OtherFlow") } diff --git a/constants.properties b/constants.properties index d3b5385722..b0e26108cd 100644 --- a/constants.properties +++ b/constants.properties @@ -20,7 +20,7 @@ quasarVersion11=0.8.1_r3 jdkClassifier11=jdk11 proguardVersion=6.1.1 bouncycastleVersion=1.66 -classgraphVersion=4.8.89 +classgraphVersion=4.8.90 disruptorVersion=3.4.2 typesafeConfigVersion=1.3.4 jsr305Version=3.0.2 diff --git a/core-tests/src/test/kotlin/net/corda/coretests/indentity/PartyAndCertificateTest.kt b/core-tests/src/test/kotlin/net/corda/coretests/indentity/PartyAndCertificateTest.kt index 54898eeaeb..9908e01521 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/indentity/PartyAndCertificateTest.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/indentity/PartyAndCertificateTest.kt @@ -2,6 +2,7 @@ package net.corda.coretests.indentity import com.google.common.jimfs.Configuration.unix import com.google.common.jimfs.Jimfs +import net.corda.core.crypto.Crypto import net.corda.core.crypto.entropyToKeyPair import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party @@ -14,6 +15,7 @@ import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.getTestPartyAndCertificate import net.corda.coretesting.internal.DEV_ROOT_CA import org.assertj.core.api.Assertions.assertThat +import org.junit.Before import org.junit.Rule import org.junit.Test import java.math.BigInteger @@ -24,6 +26,13 @@ class PartyAndCertificateTest { @JvmField val testSerialization = SerializationEnvironmentRule() + @Before + fun setUp() { + // Register providers before creating Jimfs filesystem. JimFs creates an SSHD instance which + // register BouncyCastle and EdDSA provider separately, which wrecks havoc. + Crypto.registerProviders() + } + @Test(timeout=300_000) fun `reject a path with no roles`() { val path = X509Utilities.buildCertPath(DEV_ROOT_CA.certificate) diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index 51e6939257..c2351baa4c 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -314,6 +314,7 @@ interface CordaRPCOps : RPCOps { /** * Removes a flow's [clientId] to result/ exception mapping. If the mapping is of a running flow, then the mapping will not get removed. + * This version will only remove flow's that were started by the same user currently calling [removeClientId]. * * See [startFlowDynamicWithClientId] for more information. * @@ -322,13 +323,32 @@ interface CordaRPCOps : RPCOps { fun removeClientId(clientId: String): Boolean /** - * Returns all finished flows that were started with a client id. + * Removes a flow's [clientId] to result/ exception mapping. If the mapping is of a running flow, then the mapping will not get removed. + * This version can be called for all client ids, ignoring which user originally started a flow with [clientId]. * - * @return A [Map] containing client ids for finished flows, mapped to [true] if finished successfully, - * [false] if completed exceptionally. + * See [startFlowDynamicWithClientId] for more information. + * + * @return whether the mapping was removed. + */ + fun removeClientIdAsAdmin(clientId: String): Boolean + + /** + * Returns all finished flows that were started with a client ID for which the client ID mapping has not been removed. This version only + * returns the client ids for flows started by the same user currently calling [finishedFlowsWithClientIds]. + * + * @return A [Map] containing client ids for finished flows started by the user calling [finishedFlowsWithClientIds], mapped to [true] + * if finished successfully, [false] if completed exceptionally. */ fun finishedFlowsWithClientIds(): Map + /** + * Returns all finished flows that were started with a client id by all RPC users for which the client ID mapping has not been removed. + * + * @return A [Map] containing all client ids for finished flows, mapped to [true] if finished successfully, + * [false] if completed exceptionally. + */ + fun finishedFlowsWithClientIdsAsAdmin(): Map + /** Returns Node's NodeInfo, assuming this will not change while the node is running. */ fun nodeInfo(): NodeInfo diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/exceptions/RpcExceptions.kt b/node-api/src/main/kotlin/net/corda/nodeapi/exceptions/RpcExceptions.kt index ce550f291d..f9613c812a 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/exceptions/RpcExceptions.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/exceptions/RpcExceptions.kt @@ -35,6 +35,12 @@ class RejectedCommandException(message: String) : CordaRuntimeException(message), @Suppress("DEPRECATION") net.corda.core.ClientRelevantError +/** + * Thrown to indicate that the command was rejected by the node, typically due to a special temporary mode. + */ +class MissingAttachmentException(message: String) : + CordaRuntimeException(message) + /** * Allows an implementing [Throwable] to be propagated to RPC clients. */ diff --git a/node/build.gradle b/node/build.gradle index 5f5eb08b4d..fe895177f7 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -242,6 +242,8 @@ dependencies { slowIntegrationTestRuntime configurations.runtime slowIntegrationTestRuntime configurations.testRuntime + integrationTestCompile(project(":testing:cordapps:missingmigration")) + testCompile project(':testing:cordapps:dbfailure:dbfworkflows') } diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt index 99f5674620..17b4f36320 100644 --- a/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt @@ -1,25 +1,45 @@ package net.corda.node.flows import co.paralleluniverse.fibers.Suspendable +import net.corda.client.rpc.CordaRPCClient +import net.corda.client.rpc.PermissionException import net.corda.core.CordaRuntimeException import net.corda.core.flows.FlowLogic +import net.corda.core.flows.HospitalizeFlowException +import net.corda.core.flows.KilledFlowException import net.corda.core.flows.ResultSerializationException import net.corda.core.flows.StartableByRPC +import net.corda.core.flows.StateMachineRunId import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.openFuture +import net.corda.core.messaging.FlowHandleWithClientId +import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlowWithClientId import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds +import net.corda.node.services.Permissions +import net.corda.node.services.statemachine.Checkpoint +import net.corda.nodeapi.exceptions.RejectedCommandException +import net.corda.testing.core.ALICE_NAME import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.driver +import net.corda.testing.node.User import org.assertj.core.api.Assertions import org.junit.Before import org.junit.Test import rx.Observable +import java.time.Duration +import java.time.Instant import java.util.UUID +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeoutException +import kotlin.reflect.KClass import kotlin.test.assertEquals import kotlin.test.assertFailsWith +import kotlin.test.assertFalse import kotlin.test.assertNotEquals +import kotlin.test.assertNull import kotlin.test.assertTrue class FlowWithClientIdTest { @@ -29,7 +49,7 @@ class FlowWithClientIdTest { ResultFlow.hook = null } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `start flow with client id`() { val clientId = UUID.randomUUID().toString() driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { @@ -41,7 +61,7 @@ class FlowWithClientIdTest { } } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `remove client id`() { val clientId = UUID.randomUUID().toString() var counter = 0 @@ -64,7 +84,7 @@ class FlowWithClientIdTest { } } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `on flow unserializable result a 'CordaRuntimeException' is thrown containing in its message the unserializable type`() { val clientId = UUID.randomUUID().toString() driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { @@ -79,7 +99,7 @@ class FlowWithClientIdTest { } } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `If flow has an unserializable exception result then it gets converted into a 'CordaRuntimeException'`() { ResultFlow.hook = { throw UnserializableException() @@ -107,7 +127,7 @@ class FlowWithClientIdTest { } } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `reattachFlowWithClientId can retrieve results from existing flow future`() { val clientId = UUID.randomUUID().toString() driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { @@ -141,7 +161,7 @@ class FlowWithClientIdTest { } } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `finishedFlowsWithClientIds returns completed flows with client ids`() { val clientId = UUID.randomUUID().toString() driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { @@ -151,35 +171,344 @@ class FlowWithClientIdTest { assertEquals(true, finishedFlows[clientId]) } } -} -@StartableByRPC -internal class ResultFlow(private val result: A): FlowLogic() { - companion object { - var hook: (() -> Unit)? = null - var suspendableHook: FlowLogic? = null + @Test(timeout=300_000) + fun `a client id flow can be re-attached when flows draining mode is on`() { + val clientId = UUID.randomUUID().toString() + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { + val nodeA = startNode().getOrThrow() + val result0 = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow(20.seconds) + assertEquals(5, result0) + + nodeA.rpc.setFlowsDrainingModeEnabled(true) + val result1 = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow(20.seconds) + assertEquals(5, result1) + } } - @Suspendable - override fun call(): A { - hook?.invoke() - suspendableHook?.let { subFlow(it) } - return result - } -} + @Test(timeout=300_000) + fun `if client id flow does not exist and flows draining mode is on, a RejectedCommandException gets thrown`() { + val clientId = UUID.randomUUID().toString() + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { + val nodeA = startNode().getOrThrow() -@StartableByRPC -internal class UnserializableResultFlow: FlowLogic>>() { - companion object { - val UNSERIALIZABLE_OBJECT = openFuture>().also { it.set(Observable.empty())} + nodeA.rpc.setFlowsDrainingModeEnabled(true) + assertFailsWith("Node is draining before shutdown. Cannot start new flows through RPC.") { + nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5) + } + } } - @Suspendable - override fun call(): OpenFuture> { - return UNSERIALIZABLE_OBJECT - } -} + @Test(timeout = 300_000) + fun `a killed flow's exception can be retrieved after restarting the node`() { + val clientId = UUID.randomUUID().toString() -internal class UnserializableException( - val unserializableObject: BrokenMap = BrokenMap() -): CordaRuntimeException("123") \ No newline at end of file + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet(), inMemoryDB = false)) { + val nodeA = startNode(providedName = ALICE_NAME).getOrThrow() + var flowHandle0: FlowHandleWithClientId? = null + assertFailsWith { + flowHandle0 = nodeA.rpc.startFlowWithClientId(clientId, ::HospitalizeFlow) + nodeA.waitForOvernightObservation(flowHandle0!!.id, 20.seconds) + nodeA.rpc.killFlow(flowHandle0!!.id) + flowHandle0!!.returnValue.getOrThrow(20.seconds) + } + + val flowHandle1: FlowHandleWithClientId = nodeA.rpc.startFlowWithClientId(clientId, ::HospitalizeFlow) + assertFailsWith { + flowHandle1.returnValue.getOrThrow(20.seconds) + } + + assertEquals(flowHandle0!!.id, flowHandle1.id) + assertTrue(nodeA.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED)) + assertTrue(nodeA.hasException(flowHandle0!!.id, KilledFlowException::class)) + + nodeA.stop() + val nodeARestarted = startNode(providedName = ALICE_NAME).getOrThrow() + + assertFailsWith { + nodeARestarted.rpc.reattachFlowWithClientId(clientId)!!.returnValue.getOrThrow(20.seconds) + } + } + } + + private fun NodeHandle.hasStatus(id: StateMachineRunId, status: Checkpoint.FlowStatus): Boolean { + return rpc.startFlow(::IsFlowInStatus, id, status.ordinal).returnValue.getOrThrow(20.seconds) + } + + private fun NodeHandle.hasException(id: StateMachineRunId, type: KClass): Boolean { + return rpc.startFlow(::GetExceptionType, id).returnValue.getOrThrow(20.seconds) == type.qualifiedName + } + + private fun NodeHandle.waitForOvernightObservation(id: StateMachineRunId, timeout: Duration) { + val timeoutTime = Instant.now().plusSeconds(timeout.seconds) + var exists = false + while (Instant.now().isBefore(timeoutTime) && !exists) { + exists = rpc.startFlow(::IsFlowInStatus, id, Checkpoint.FlowStatus.HOSPITALIZED.ordinal).returnValue.getOrThrow(timeout) + Thread.sleep(1.seconds.toMillis()) + } + if (!exists) { + throw TimeoutException("Flow was not kept for observation during timeout duration") + } + } + + @Test(timeout = 300_000) + fun `reattaching to existing running flow using startFlowWithClientId for flow started by another user throws a permission exception`() { + val user = User("TonyStark", "I AM IRONMAN", setOf(Permissions.all())) + val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all())) + val clientId = UUID.randomUUID().toString() + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { + val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow() + val latch = CountDownLatch(1) + ResultFlow.hook = { + latch.await() + } + val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5) + val reattachedByStarter = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5) + + assertFailsWith { + CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use { + it.proxy.startFlowWithClientId(clientId, ::ResultFlow, 5) + } + } + + latch.countDown() + + assertEquals(5, flowHandle.returnValue.getOrThrow(20.seconds)) + assertEquals(5, reattachedByStarter.returnValue.getOrThrow(20.seconds)) + } + } + + @Test(timeout = 300_000) + fun `reattaching to existing completed flow using startFlowWithClientId for flow started by another user throws a permission exception`() { + val user = User("TonyStark", "I AM IRONMAN", setOf(Permissions.all())) + val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all())) + val clientId = UUID.randomUUID().toString() + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { + val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow() + nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow(20.seconds) + + assertFailsWith { + CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use { + it.proxy.startFlowWithClientId(clientId, ::ResultFlow, 5) + } + } + } + } + + @Test(timeout = 300_000) + fun `reattaching to existing completed flow using startFlowWithClientId for flow started by another user throws a permission exception (after node restart)`() { + val user = User("TonyStark", "I AM IRONMAN", setOf(Permissions.all())) + val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all())) + val clientId = UUID.randomUUID().toString() + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet(), inMemoryDB = false)) { + var nodeA = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user, spy)).getOrThrow() + nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow(20.seconds) + + nodeA.stop() + nodeA = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user, spy)).getOrThrow(20.seconds) + + assertFailsWith { + CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use { + it.proxy.startFlowWithClientId(clientId, ::ResultFlow, 5) + } + } + } + } + + @Test(timeout = 300_000) + fun `reattaching to existing flow using reattachFlowWithClientId for flow started by another user returns null`() { + val user = User("dan", "this is my password", setOf(Permissions.all())) + val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all())) + val clientId = UUID.randomUUID().toString() + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { + val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow() + val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5) + val reattachedByStarter = nodeA.rpc.reattachFlowWithClientId(clientId)?.returnValue?.getOrThrow(20.seconds) + + val reattachedBySpy = CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use { + it.proxy.reattachFlowWithClientId(clientId)?.returnValue?.getOrThrow(20.seconds) + } + + assertEquals(5, flowHandle.returnValue.getOrThrow(20.seconds)) + assertEquals(5, reattachedByStarter) + assertNull(reattachedBySpy) + } + } + + @Test(timeout = 300_000) + fun `removeClientId does not remove mapping for flows started by another user`() { + val user = User("dan", "this is my password", setOf(Permissions.all())) + val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all())) + val clientId = UUID.randomUUID().toString() + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { + val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow() + val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5) + + flowHandle.returnValue.getOrThrow(20.seconds) + + val removedBySpy = CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use { + it.proxy.removeClientId(clientId) + } + + val reattachedByStarter = nodeA.rpc.reattachFlowWithClientId(clientId)?.returnValue?.getOrThrow(20.seconds) + val removedByStarter = nodeA.rpc.removeClientId(clientId) + + assertEquals(5, reattachedByStarter) + assertTrue(removedByStarter) + assertFalse(removedBySpy) + } + } + + @Test(timeout = 300_000) + fun `removeClientIdAsAdmin does remove mapping for flows started by another user`() { + val user = User("dan", "this is my password", setOf(Permissions.all())) + val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all())) + val clientId = UUID.randomUUID().toString() + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { + val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow() + val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5) + + flowHandle.returnValue.getOrThrow(20.seconds) + + val removedBySpy = CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use { + it.proxy.removeClientIdAsAdmin(clientId) + } + + val reattachedByStarter = nodeA.rpc.reattachFlowWithClientId(clientId)?.returnValue?.getOrThrow(20.seconds) + val removedByStarter = nodeA.rpc.removeClientIdAsAdmin(clientId) + + assertNull(reattachedByStarter) + assertFalse(removedByStarter) + assertTrue(removedBySpy) + } + } + + @Test(timeout = 300_000) + fun `finishedFlowsWithClientIds does not return flows started by other users`() { + val user = User("CaptainAmerica", "That really is America's ass", setOf(Permissions.all())) + val spy = User("nsa", "EternalBlue", setOf(Permissions.all())) + val clientIdForUser = UUID.randomUUID().toString() + val clientIdForSpy = UUID.randomUUID().toString() + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { + val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow() + val flowHandleStartedByUser = nodeA.rpc.startFlowWithClientId(clientIdForUser, ::ResultFlow, 5) + + CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use { + val flowHandleStartedBySpy = it.proxy.startFlowWithClientId(clientIdForSpy, ::ResultFlow, 10) + + flowHandleStartedByUser.returnValue.getOrThrow(20.seconds) + flowHandleStartedBySpy.returnValue.getOrThrow(20.seconds) + + val userFinishedFlows = nodeA.rpc.finishedFlowsWithClientIds() + val spyFinishedFlows = it.proxy.finishedFlowsWithClientIds() + + assertEquals(1, userFinishedFlows.size) + assertEquals(clientIdForUser, userFinishedFlows.keys.single()) + assertEquals(5, nodeA.rpc.reattachFlowWithClientId(userFinishedFlows.keys.single())!!.returnValue.getOrThrow()) + assertEquals(1, spyFinishedFlows.size) + assertEquals(clientIdForSpy, spyFinishedFlows.keys.single()) + assertEquals(10, it.proxy.reattachFlowWithClientId(spyFinishedFlows.keys.single())!!.returnValue.getOrThrow()) + } + } + } + + @Test(timeout = 300_000) + fun `finishedFlowsWithClientIdsAsAdmin does return flows started by other users`() { + val user = User("CaptainAmerica", "That really is America's ass", setOf(Permissions.all())) + val spy = User("nsa", "EternalBlue", setOf(Permissions.all())) + val clientIdForUser = UUID.randomUUID().toString() + val clientIdForSpy = UUID.randomUUID().toString() + driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { + val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow() + val flowHandleStartedByUser = nodeA.rpc.startFlowWithClientId(clientIdForUser, ::ResultFlow, 5) + + CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use { + val flowHandleStartedBySpy = it.proxy.startFlowWithClientId(clientIdForSpy, ::ResultFlow, 10) + + flowHandleStartedByUser.returnValue.getOrThrow(20.seconds) + flowHandleStartedBySpy.returnValue.getOrThrow(20.seconds) + + val userFinishedFlows = nodeA.rpc.finishedFlowsWithClientIdsAsAdmin() + val spyFinishedFlows = it.proxy.finishedFlowsWithClientIdsAsAdmin() + + assertEquals(2, userFinishedFlows.size) + assertEquals(2, spyFinishedFlows.size) + assertEquals(userFinishedFlows, spyFinishedFlows) + } + } + } + + @StartableByRPC + internal class ResultFlow(private val result: A) : FlowLogic() { + companion object { + var hook: (() -> Unit)? = null + var suspendableHook: FlowLogic? = null + } + + @Suspendable + override fun call(): A { + hook?.invoke() + suspendableHook?.let { subFlow(it) } + return result + } + } + + @StartableByRPC + internal class UnserializableResultFlow : FlowLogic>>() { + companion object { + val UNSERIALIZABLE_OBJECT = openFuture>().also { it.set(Observable.empty()) } + } + + @Suspendable + override fun call(): OpenFuture> { + return UNSERIALIZABLE_OBJECT + } + } + + @StartableByRPC + internal class HospitalizeFlow : FlowLogic() { + + @Suspendable + override fun call() { + throw HospitalizeFlowException("time to go to the doctors") + } + } + + @StartableByRPC + internal class IsFlowInStatus(private val id: StateMachineRunId, private val ordinal: Int) : FlowLogic() { + @Suspendable + override fun call(): Boolean { + return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints where status = ? and flow_id = ?") + .apply { + setInt(1, ordinal) + setString(2, id.uuid.toString()) + } + .use { ps -> + ps.executeQuery().use { rs -> + rs.next() + rs.getLong(1) + } + }.toInt() == 1 + } + } + + @StartableByRPC + internal class GetExceptionType(private val id: StateMachineRunId) : FlowLogic() { + @Suspendable + override fun call(): String { + return serviceHub.jdbcSession().prepareStatement("select type from node_flow_exceptions where flow_id = ?") + .apply { setString(1, id.uuid.toString()) } + .use { ps -> + ps.executeQuery().use { rs -> + rs.next() + rs.getString(1) + } + + } + } + } + + internal class UnserializableException( + val unserializableObject: BrokenMap = BrokenMap() + ) : CordaRuntimeException("123") +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/KillFlowTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/KillFlowTest.kt index 0404412fe4..21a1a8e5bc 100644 --- a/node/src/integration-test/kotlin/net/corda/node/flows/KillFlowTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/flows/KillFlowTest.kt @@ -6,6 +6,7 @@ import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowExternalOperation import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowSession +import net.corda.core.flows.HospitalizeFlowException import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.KilledFlowException @@ -210,6 +211,26 @@ class KillFlowTest { } } + @Test(timeout = 300_000) + fun `killing a hospitalized flow ends the flow immediately`() { + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { + val alice = startNode(providedName = ALICE_NAME).getOrThrow() + alice.rpc.let { rpc -> + val handle = rpc.startFlow(::AFlowThatGetsMurderedWhileInTheHospital) + Thread.sleep(5000) + val time = measureTimeMillis { + rpc.killFlow(handle.id) + assertFailsWith { + handle.returnValue.getOrThrow(1.minutes) + } + } + assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow") + assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow") + assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)) + } + } + } + @Test(timeout = 300_000) fun `a killed flow will propagate the killed error to counter parties if it was suspended`() { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { @@ -482,6 +503,15 @@ class KillFlowTest { } } + @StartableByRPC + @InitiatingFlow + class AFlowThatGetsMurderedWhileInTheHospital : FlowLogic() { + @Suspendable + override fun call() { + throw HospitalizeFlowException("time to go to the doctors") + } + } + @StartableByRPC @InitiatingFlow class AFlowThatGetsMurderedAndSomehowKillsItsFriends(private val parties: List) : FlowLogic() { diff --git a/node/src/integration-test/kotlin/net/corda/node/persistence/DbSchemaInitialisationTest.kt b/node/src/integration-test/kotlin/net/corda/node/persistence/DbSchemaInitialisationTest.kt index 8c8de1a63e..e29b1429bf 100644 --- a/node/src/integration-test/kotlin/net/corda/node/persistence/DbSchemaInitialisationTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/persistence/DbSchemaInitialisationTest.kt @@ -2,10 +2,15 @@ package net.corda.node.persistence import net.corda.core.utilities.getOrThrow import net.corda.node.flows.isQuasarAgentSpecified -import net.corda.node.internal.ConfigurationException +import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException +import net.corda.nodeapi.internal.persistence.HibernateSchemaChangeException +import net.corda.testing.core.ALICE_NAME import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.NodeParameters import net.corda.testing.driver.driver +import net.corda.testing.node.TestCordapp +import net.corda.testing.node.internal.startNode +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.Test import kotlin.test.assertFailsWith @@ -13,10 +18,33 @@ class DbSchemaInitialisationTest { @Test(timeout = 300_000) fun `database initialisation not allowed in config`() { driver(DriverParameters(startNodesInProcess = isQuasarAgentSpecified(), cordappsForAllNodes = emptyList())) { - assertFailsWith(ConfigurationException::class) { + assertFailsWith(IllegalStateException::class) { startNode(NodeParameters(customOverrides = mapOf("database.initialiseSchema" to "false"))).getOrThrow() } } } + @Test(timeout = 300_000) + fun `app migration resource is only mandatory when not in dev mode`() { + driver(DriverParameters(startNodesInProcess = true, + cordappsForAllNodes = emptyList(), + allowHibernateToManageAppSchema = false)) { + // in dev mode, it fails because the schema of our test CorDapp is missing + assertThatExceptionOfType(HibernateSchemaChangeException::class.java) + .isThrownBy { + startNode(NodeParameters(additionalCordapps = listOf(TestCordapp.findCordapp("net.corda.failtesting.missingmigrationcordapp")))).getOrThrow() + } + .withMessage("Incompatible schema change detected. Please run schema migration scripts (node with sub-command run-migration-scripts). Reason: Schema-validation: missing table [test_table]") + + // without devMode, it doesn't even get this far as it complains about the schema migration missing. + assertThatExceptionOfType(CouldNotCreateDataSourceException::class.java) + .isThrownBy { + startNode( + ALICE_NAME, + false, + NodeParameters(additionalCordapps = listOf(TestCordapp.findCordapp("net.corda.failtesting.missingmigrationcordapp")))).getOrThrow() + } + .withMessage("Could not create the DataSource: No migration defined for schema: net.corda.failtesting.missingmigrationcordapp.MissingMigrationSchema v1") + } + } } \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt index 9c701ccd3a..bf8d2910b6 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt @@ -5,9 +5,15 @@ import net.corda.core.node.NodeInfo import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.internal.schemas.NodeInfoSchemaV1 import net.corda.node.services.identity.InMemoryIdentityService +import net.corda.node.utilities.createKeyPairAndSelfSignedTLSCertificate import net.corda.nodeapi.internal.DEV_ROOT_CA import net.corda.nodeapi.internal.persistence.DatabaseConfig -import net.corda.testing.core.* +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.CHARLIE_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.core.TestIdentity import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.configureDatabase import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties @@ -159,6 +165,14 @@ class PersistentNetworkMapCacheTest { assertThat(charlieNetMapCache.getNodeByLegalName(BOB_NAME)).isNotNull } + @Test(timeout=300_000) + fun `negative test - invalid trust root leads to no node added`() { + val (_, badCert) = createKeyPairAndSelfSignedTLSCertificate(DEV_ROOT_CA.certificate.issuerX500Principal) + val netMapCache = PersistentNetworkMapCache(TestingNamedCacheFactory(), database, InMemoryIdentityService(trustRoot = badCert)) + netMapCache.addOrUpdateNode(createNodeInfo(listOf(ALICE))) + assertThat(netMapCache.allNodes).hasSize(0) + } + private fun createNodeInfo(identities: List, address: NetworkHostAndPort = NetworkHostAndPort("localhost", portCounter++)): NodeInfo { return NodeInfo( diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index f106874b91..fe29ff8138 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -162,6 +162,8 @@ import net.corda.nodeapi.internal.persistence.OutstandingDatabaseChangesExceptio import net.corda.nodeapi.internal.persistence.RestrictedConnection import net.corda.nodeapi.internal.persistence.RestrictedEntityManager import net.corda.nodeapi.internal.persistence.SchemaMigration +import net.corda.nodeapi.internal.persistence.contextDatabase +import net.corda.nodeapi.internal.persistence.withoutDatabaseAccess import net.corda.tools.shell.InteractiveShell import org.apache.activemq.artemis.utils.ReusableLatch import org.jolokia.jvmagent.JolokiaServer @@ -245,7 +247,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, private val notaryLoader = configuration.notary?.let { NotaryLoader(it, versionInfo) } - val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo).closeOnStop() + val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo).closeOnStop(false) val schemaService = NodeSchemaService(cordappLoader.cordappSchemas).tokenize() val identityService = PersistentIdentityService(cacheFactory).tokenize() val database: CordaPersistence = createCordaPersistence( @@ -388,8 +390,13 @@ abstract class AbstractNode(val configuration: NodeConfiguration, return this } - protected fun T.closeOnStop(): T { - runOnStop += this::close + protected fun T.closeOnStop(usesDatabase: Boolean = true): T { + if (usesDatabase) { + contextDatabase // Will throw if no database is available, since this would run after closing the database, yet claims it needs it. + runOnStop += this::close + } else { + runOnStop += { withoutDatabaseAccess { this.close() } } + } return this } @@ -470,9 +477,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration, pendingCoreChanges = schemaMigration.getPendingChangesCount(schemaService.internalSchemas, true) } if(updateAppSchemas) { - schemaMigration.runMigration(!updateAppSchemasWithCheckpoints && haveCheckpoints, schemaService.appSchemas, false) + schemaMigration.runMigration(!updateAppSchemasWithCheckpoints && haveCheckpoints, schemaService.appSchemas, !configuration.devMode) } else { - pendingAppChanges = schemaMigration.getPendingChangesCount(schemaService.appSchemas, false) + pendingAppChanges = schemaMigration.getPendingChangesCount(schemaService.appSchemas, !configuration.devMode) } } // Now log the vendor string as this will also cause a connection to be tested eagerly. @@ -541,7 +548,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, installCoreFlows() registerCordappFlows() services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows } - val rpcOps = makeRPCOps(cordappLoader) startShell() networkMapClient?.start(trustRoot) @@ -554,6 +560,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration, networkMapCache.start(netParams.notaries) startDatabase() + // The following services need to be closed before the database, so need to be registered after it is started. + networkMapUpdater.closeOnStop() + schedulerService.closeOnStop() + val rpcOps = makeRPCOps(cordappLoader) identityService.start(trustRoot, keyStoreHandler.nodeIdentity, netParams.notaries.map { it.identity }, pkToIdCache) @@ -794,7 +804,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, configuration.baseDirectory, configuration.extraNetworkMapKeys, networkParametersStorage - ).closeOnStop() + ) protected open fun makeNodeSchedulerService() = NodeSchedulerService( platformClock, @@ -805,7 +815,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, nodeProperties, configuration.drainingModePollPeriod, unfinishedSchedules = busyNodeLatch - ).tokenize().closeOnStop() + ).tokenize() private fun makeCordappLoader(configuration: NodeConfiguration, versionInfo: VersionInfo): CordappLoader { val generatedCordapps = mutableListOf(VirtualCordapp.generateCore(versionInfo)) @@ -991,7 +1001,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, database.startHikariPool(configuration.dataSourceProperties, metricRegistry) { dataSource, haveCheckpoints -> SchemaMigration(dataSource, cordappLoader, configuration.networkParametersPath, configuration.myLegalName) .checkOrUpdate(schemaService.internalSchemas, runMigrationScripts, haveCheckpoints, true) - .checkOrUpdate(schemaService.appSchemas, runMigrationScripts, haveCheckpoints && !allowAppSchemaUpgradeWithCheckpoints, false) + .checkOrUpdate(schemaService.appSchemas, runMigrationScripts, haveCheckpoints && !allowAppSchemaUpgradeWithCheckpoints, !configuration.devMode) } /** Loads and starts a notary service if it is configured. */ diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index b9beecb325..3612abbf2d 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -1,6 +1,5 @@ package net.corda.node.internal -import net.corda.client.rpc.RPCException import net.corda.client.rpc.notUsed import net.corda.common.logging.CordaVersion import net.corda.core.CordaRuntimeException @@ -55,6 +54,7 @@ import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.rpc.context import net.corda.node.services.statemachine.StateMachineManager +import net.corda.nodeapi.exceptions.MissingAttachmentException import net.corda.nodeapi.exceptions.NonRpcFlowException import net.corda.nodeapi.exceptions.RejectedCommandException import rx.Observable @@ -163,14 +163,18 @@ internal class CordaRPCOpsImpl( override fun killFlow(id: StateMachineRunId): Boolean = smm.killFlow(id) override fun reattachFlowWithClientId(clientId: String): FlowHandleWithClientId? { - return smm.reattachFlowWithClientId(clientId)?.run { + return smm.reattachFlowWithClientId(clientId, context().principal())?.run { FlowHandleWithClientIdImpl(id = id, returnValue = resultFuture, clientId = clientId) } } - override fun removeClientId(clientId: String): Boolean = smm.removeClientId(clientId) + override fun removeClientId(clientId: String): Boolean = smm.removeClientId(clientId, context().principal(), false) - override fun finishedFlowsWithClientIds(): Map = smm.finishedFlowsWithClientIds() + override fun removeClientIdAsAdmin(clientId: String): Boolean = smm.removeClientId(clientId, context().principal(), true) + + override fun finishedFlowsWithClientIds(): Map = smm.finishedFlowsWithClientIds(context().principal(), false) + + override fun finishedFlowsWithClientIdsAsAdmin(): Map = smm.finishedFlowsWithClientIds(context().principal(), true) override fun stateMachinesFeed(): DataFeed, StateMachineUpdate> { @@ -267,7 +271,8 @@ internal class CordaRPCOpsImpl( private fun startFlow(logicType: Class>, context: InvocationContext, args: Array): FlowStateMachineHandle { if (!logicType.isAnnotationPresent(StartableByRPC::class.java)) throw NonRpcFlowException(logicType) if (isFlowsDrainingModeEnabled()) { - throw RejectedCommandException("Node is draining before shutdown. Cannot start new flows through RPC.") + return context.clientId?.let { smm.reattachFlowWithClientId(it, context.principal()) } + ?: throw RejectedCommandException("Node is draining before shutdown. Cannot start new flows through RPC.") } return flowStarter.invokeFlowAsync(logicType, context, *args).getOrThrow() } @@ -278,7 +283,7 @@ internal class CordaRPCOpsImpl( override fun openAttachment(id: SecureHash): InputStream { return services.attachments.openAttachment(id)?.open() ?: - throw RPCException("Unable to open attachment with id: $id") + throw MissingAttachmentException("Unable to open attachment with id: $id") } override fun uploadAttachment(jar: InputStream): SecureHash { diff --git a/node/src/main/kotlin/net/corda/node/internal/security/RPCPermissionResolver.kt b/node/src/main/kotlin/net/corda/node/internal/security/RPCPermissionResolver.kt index 6d42383d73..e166fca2af 100644 --- a/node/src/main/kotlin/net/corda/node/internal/security/RPCPermissionResolver.kt +++ b/node/src/main/kotlin/net/corda/node/internal/security/RPCPermissionResolver.kt @@ -41,30 +41,34 @@ internal object RPCPermissionResolver : PermissionResolver { private const val ACTION_INVOKE_RPC = "invokerpc" private const val ACTION_ALL = "all" private val FLOW_RPC_CALLS = setOf( - "startFlowDynamic", - "startTrackedFlowDynamic", - "startFlow", - "startTrackedFlow") + "startFlowDynamic", + "startTrackedFlowDynamic", + "startFlowDynamicWithClientId", + "startFlow", + "startTrackedFlow", + "startFlowWithClientId" + ) + + private val FLOW_RPC_PERMITTED_START_FLOW_CALLS = setOf("startFlow", "startFlowDynamic") + private val FLOW_RPC_PERMITTED_TRACKED_START_FLOW_CALLS = setOf("startTrackedFlow", "startTrackedFlowDynamic") + private val FLOW_RPC_PERMITTED_START_FLOW_WITH_CLIENT_ID_CALLS = setOf("startFlowWithClientId", "startFlowDynamicWithClientId") override fun resolvePermission(representation: String): Permission { when (representation.substringBefore(SEPARATOR).toLowerCase()) { ACTION_INVOKE_RPC -> { val rpcCall = representation.substringAfter(SEPARATOR, "") - require(representation.count { it == SEPARATOR } == 1 && rpcCall.isNotEmpty()) { - "Malformed permission string" - } + require(representation.count { it == SEPARATOR } == 1 && rpcCall.isNotEmpty()) { "Malformed permission string" } val legacyPermitted = when (rpcCall) { - "startFlow" -> setOf("startFlowDynamic", rpcCall) - "startTrackedFlow" -> setOf("startTrackedFlowDynamic", rpcCall) + "startFlow" -> FLOW_RPC_PERMITTED_START_FLOW_CALLS + "startTrackedFlow" -> FLOW_RPC_PERMITTED_TRACKED_START_FLOW_CALLS + "startFlowWithClientId" -> FLOW_RPC_PERMITTED_START_FLOW_WITH_CLIENT_ID_CALLS else -> setOf(rpcCall) } return RPCPermission(legacyPermitted.toFullyQualified()) } ACTION_START_FLOW -> { val targetFlow = representation.substringAfter(SEPARATOR, "") - require(targetFlow.isNotEmpty()) { - "Missing target flow after StartFlow" - } + require(targetFlow.isNotEmpty()) { "Missing target flow after StartFlow" } return RPCPermission(FLOW_RPC_CALLS.toFullyQualified(), targetFlow) } ACTION_ALL -> { diff --git a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt index 6e750619f4..96eef77dfd 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt @@ -11,6 +11,7 @@ import java.util.stream.Stream /** * Thread-safe storage of fiber checkpoints. */ +@Suppress("TooManyFunctions") interface CheckpointStorage { /** * Add a checkpoint for a new id to the store. Will throw if there is already a checkpoint for this id @@ -100,5 +101,7 @@ interface CheckpointStorage { */ fun getFlowException(id: StateMachineRunId, throwIfMissing: Boolean = false): Any? + fun addFlowException(id: StateMachineRunId, exception: Throwable) + fun removeFlowException(id: StateMachineRunId): Boolean } diff --git a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt index 19b289df98..6b33dcf882 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/ConfigSections.kt @@ -14,7 +14,6 @@ import net.corda.common.validation.internal.Validated.Companion.invalid import net.corda.common.validation.internal.Validated.Companion.valid import net.corda.core.context.AuthServiceId import net.corda.core.internal.notary.NotaryServiceFlow -import net.corda.node.internal.ConfigurationException import net.corda.node.services.config.AuthDataSourceType import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.CertChainPolicyType @@ -281,17 +280,26 @@ internal object DatabaseConfigSpec : Configuration.Specification private val mappedSchemaCacheSize by long().optional().withDefaultValue(DatabaseConfig.Defaults.mappedSchemaCacheSize) override fun parseValid(configuration: Config, options: Configuration.Options): Valid { - if (initialiseSchema.isSpecifiedBy(configuration)){ - throw ConfigurationException("Unsupported configuration database/initialiseSchema - this option has been removed, please use the run-migration-scripts sub-command or the database management tool to modify schemas") + if (initialiseSchema.isSpecifiedBy(configuration)) { + return invalid(Configuration.Validation.Error.BadPath.of( + "Unsupported configuration database/initialiseSchema - this option has been removed, please use the run-migration-scripts sub-command or the database management tool to modify schemas", + "initialiseSchema", + "Boolean")) } - if (initialiseAppSchema.isSpecifiedBy(configuration)){ - throw ConfigurationException("Unsupported configuration database/initialiseAppSchema - this option has been removed, please use the run-migration-scripts sub-command or the database management tool to modify schemas") + if (initialiseAppSchema.isSpecifiedBy(configuration)) { + return invalid(Configuration.Validation.Error.BadPath.of( + "Unsupported configuration database/initialiseAppSchema - this option has been removed, please use the run-migration-scripts sub-command or the database management tool to modify schemas", + "initialiseAppSchema", + SchemaInitializationType::class.qualifiedName!!)) } - if (transactionIsolationLevel.isSpecifiedBy(configuration)){ - throw ConfigurationException("Unsupported configuration database/transactionIsolationLevel - this option has been removed and cannot be changed") + if (transactionIsolationLevel.isSpecifiedBy(configuration)) { + return invalid(Configuration.Validation.Error.BadPath.of( + "Unsupported configuration database/transactionIsolationLevel - this option has been removed and cannot be changed", + "transactionIsolationLevel", + TransactionIsolationLevel::class.qualifiedName!!)) } - val config = configuration.withOptions(options) + val config = configuration.withOptions(options) return valid(DatabaseConfig(config[exportHibernateJMXStatistics], config[mappedSchemaCacheSize])) } } diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt index 504c3df727..0b2b006583 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt @@ -188,6 +188,11 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, val nextScheduleDelay = try { updateNetworkMapCache() } catch (e: Exception) { + // Check to see if networkmap was reachable before and cached information exists + if (networkMapCache.allNodeHashes.size > 1) { + logger.debug("Networkmap Service unreachable but more than one nodeInfo entries found in the cache. Allowing node start-up to proceed.") + networkMapCache.nodeReady.set(null) + } logger.warn("Error encountered while updating network map, will retry in $defaultWatchHttpNetworkMapRetryInterval", e) defaultWatchHttpNetworkMapRetryInterval } diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt index d97025f4d1..5ada3d8533 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt @@ -19,7 +19,6 @@ import net.corda.core.node.services.PartyInfo import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.Try import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug import net.corda.node.internal.schemas.NodeInfoSchemaV1 @@ -32,6 +31,7 @@ import org.hibernate.Session import rx.Observable import rx.subjects.PublishSubject import java.security.PublicKey +import java.security.cert.CertPathValidatorException import java.util.* import javax.annotation.concurrent.ThreadSafe import javax.persistence.PersistenceException @@ -253,12 +253,15 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory, } private fun verifyIdentities(node: NodeInfo): Boolean { - val failures = node.legalIdentitiesAndCerts.mapNotNull { Try.on { it.verify(identityService.trustAnchor) } as? Try.Failure } - if (failures.isNotEmpty()) { - logger.warn("$node has ${failures.size} invalid identities:") - failures.forEach { logger.warn("", it) } + for (identity in node.legalIdentitiesAndCerts) { + try { + identity.verify(identityService.trustAnchor) + } catch (e: CertPathValidatorException) { + logger.warn("$node has invalid identity:\nError:$e\nIdentity:${identity.certPath}") + return false + } } - return failures.isEmpty() + return true } private fun verifyAndRegisterIdentities(node: NodeInfo): Boolean { diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index a088e3bf6e..a3ccbf018c 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -26,6 +26,7 @@ import net.corda.nodeapi.internal.persistence.currentDBSession import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY import org.apache.commons.lang3.exception.ExceptionUtils import org.hibernate.annotations.Type +import java.security.Principal import java.sql.Connection import java.sql.SQLException import java.time.Clock @@ -392,7 +393,7 @@ class DBCheckpointStorage( val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) { val errored = checkpoint.errorState as? ErrorState.Errored - errored?.let { createDBFlowException(flowId, it, now) } + errored?.run { createDBFlowException(flowId, errors.last().exception, now) } ?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}") } else { null @@ -460,7 +461,7 @@ class DBCheckpointStorage( val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) { val errored = checkpoint.errorState as? ErrorState.Errored - errored?.let { createDBFlowException(flowId, it, now) } + errored?.run { createDBFlowException(flowId, errors.last().exception, now) } ?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}") } else { null @@ -572,13 +573,21 @@ class DBCheckpointStorage( override fun getFinishedFlowsResultsMetadata(): Stream> { val session = currentDBSession() val jpqlQuery = - """select new ${DBFlowResultMetadataFields::class.java.name}(checkpoint.id, checkpoint.status, metadata.userSuppliedIdentifier) + """select new ${DBFlowResultMetadataFields::class.java.name}( + checkpoint.id, + checkpoint.status, + metadata.userSuppliedIdentifier, + metadata.startedBy + ) from ${DBFlowCheckpoint::class.java.name} checkpoint join ${DBFlowMetadata::class.java.name} metadata on metadata.id = checkpoint.flowMetadata - where checkpoint.status = ${FlowStatus.COMPLETED.ordinal} or checkpoint.status = ${FlowStatus.FAILED.ordinal}""".trimIndent() + where checkpoint.status = ${FlowStatus.COMPLETED.ordinal} + or checkpoint.status = ${FlowStatus.FAILED.ordinal} + or checkpoint.status = ${FlowStatus.KILLED.ordinal}""".trimIndent() val query = session.createQuery(jpqlQuery, DBFlowResultMetadataFields::class.java) return query.resultList.stream().map { - StateMachineRunId(UUID.fromString(it.id)) to FlowResultMetadata(it.status, it.clientId) + val startedBy = it.startedBy + StateMachineRunId(UUID.fromString(it.id)) to FlowResultMetadata(it.status, it.clientId, Principal { startedBy }) } } @@ -600,14 +609,21 @@ class DBCheckpointStorage( return serializedFlowException?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT) } + override fun addFlowException(id: StateMachineRunId, exception: Throwable) { + currentDBSession().save(createDBFlowException(id.uuid.toString(), exception, clock.instant())) + } + override fun removeFlowException(id: StateMachineRunId): Boolean { - val flowId = id.uuid.toString() - return deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, flowId) == 1 + return deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, id.uuid.toString()) == 1 } override fun updateStatus(runId: StateMachineRunId, flowStatus: FlowStatus) { - val update = "Update ${NODE_DATABASE_PREFIX}checkpoints set status = ${flowStatus.ordinal} where flow_id = '${runId.uuid}'" - currentDBSession().createNativeQuery(update).executeUpdate() + currentDBSession() + .createNativeQuery("Update ${NODE_DATABASE_PREFIX}checkpoints set status = :status, timestamp = :timestamp where flow_id = :id") + .setParameter("status", flowStatus.ordinal) + .setParameter("timestamp", clock.instant()) + .setParameter("id", runId.uuid.toString()) + .executeUpdate() } override fun updateCompatible(runId: StateMachineRunId, compatible: Boolean) { @@ -659,17 +675,15 @@ class DBCheckpointStorage( ) } - private fun createDBFlowException(flowId: String, errorState: ErrorState.Errored, now: Instant): DBFlowException { - return errorState.errors.last().exception.let { - DBFlowException( - flow_id = flowId, - type = it::class.java.name.truncate(MAX_EXC_TYPE_LENGTH, true), - message = it.message?.truncate(MAX_EXC_MSG_LENGTH, false), - stackTrace = it.stackTraceToString(), - value = it.storageSerialize().bytes, - persistedInstant = now - ) - } + private fun createDBFlowException(flowId: String, exception: Throwable, now: Instant): DBFlowException { + return DBFlowException( + flow_id = flowId, + type = exception::class.java.name.truncate(MAX_EXC_TYPE_LENGTH, true), + message = exception.message?.truncate(MAX_EXC_MSG_LENGTH, false), + stackTrace = exception.stackTraceToString(), + value = exception.storageSerialize().bytes, + persistedInstant = now + ) } private fun setDBFlowMetadataFinishTime(flowId: String, now: Instant) { @@ -746,7 +760,8 @@ class DBCheckpointStorage( private class DBFlowResultMetadataFields( val id: String, val status: FlowStatus, - val clientId: String? + val clientId: String?, + val startedBy: String ) private fun T.storageSerialize(): SerializedBytes { diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index 165e2c1f75..7bafadc44e 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -247,7 +247,6 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: txStorage.locked { val existingTransaction = getTransaction(id) if (existingTransaction == null) { - updates.filter { it.id == id }.toFuture() updateFuture } else { updateFuture.cancel(false) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt index 615afb8df5..67dfb0916e 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt @@ -69,6 +69,21 @@ sealed class Action { */ data class RemoveCheckpoint(val id: StateMachineRunId, val mayHavePersistentResults: Boolean = false) : Action() + /** + * Remove a flow's exception from the database. + * + * @param id The id of the flow + */ + data class RemoveFlowException(val id: StateMachineRunId) : Action() + + /** + * Persist an exception to the database for the related flow. + * + * @param id The id of the flow + * @param exception The exception to persist + */ + data class AddFlowException(val id: StateMachineRunId, val exception: Throwable) : Action() + /** * Persist the deduplication facts of [deduplicationHandlers]. */ diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index 929d3d594a..87b4a508c5 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -69,6 +69,8 @@ internal class ActionExecutorImpl( is Action.CancelFlowTimeout -> cancelFlowTimeout(action) is Action.MoveFlowToPaused -> executeMoveFlowToPaused(action) is Action.UpdateFlowStatus -> executeUpdateFlowStatus(action) + is Action.RemoveFlowException -> executeRemoveFlowException(action) + is Action.AddFlowException -> executeAddFlowException(action) } } private fun executeReleaseSoftLocks(action: Action.ReleaseSoftLocks) { @@ -252,4 +254,12 @@ internal class ActionExecutorImpl( private fun scheduleFlowTimeout(action: Action.ScheduleFlowTimeout) { stateMachineManager.scheduleFlowTimeout(action.flowId) } + + private fun executeRemoveFlowException(action: Action.RemoveFlowException) { + checkpointStorage.removeFlowException(action.id) + } + + private fun executeAddFlowException(action: Action.AddFlowException) { + checkpointStorage.addFlowException(action.id, action.exception) + } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 1802f6c89c..4633636c9e 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -6,11 +6,13 @@ import co.paralleluniverse.fibers.instrument.JavaAgent import co.paralleluniverse.strands.channels.Channel import com.codahale.metrics.Gauge import com.google.common.util.concurrent.ThreadFactoryBuilder +import net.corda.client.rpc.PermissionException import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext import net.corda.core.flows.FlowException import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowLogic +import net.corda.core.flows.KilledFlowException import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party import net.corda.core.internal.FlowStateMachine @@ -47,6 +49,7 @@ import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl import net.corda.serialization.internal.withTokenContext import org.apache.activemq.artemis.utils.ReusableLatch import rx.Observable +import java.security.Principal import java.security.SecureRandom import java.util.ArrayList import java.util.HashSet @@ -78,8 +81,6 @@ internal class SingleThreadedStateMachineManager( private val VALID_KILL_FLOW_STATUSES = setOf( Checkpoint.FlowStatus.RUNNABLE, - Checkpoint.FlowStatus.FAILED, - Checkpoint.FlowStatus.COMPLETED, Checkpoint.FlowStatus.HOSPITALIZED, Checkpoint.FlowStatus.PAUSED ) @@ -180,7 +181,7 @@ internal class SingleThreadedStateMachineManager( flowTimeoutScheduler::resetCustomTimeout ) - val (fibers, pausedFlows) = restoreFlowsFromCheckpoints() + val (flows, pausedFlows) = restoreFlowsFromCheckpoints() metrics.register("Flows.InFlight", Gauge { innerState.flows.size }) setFlowDefaultUncaughtExceptionHandler() @@ -196,35 +197,40 @@ internal class SingleThreadedStateMachineManager( } // - Incompatible checkpoints need to be handled upon implementing CORDA-3897 - for (flow in fibers.values) { + for ((id, flow) in flows) { flow.fiber.clientId?.let { - innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(flow.fiber.id, doneFuture(flow.fiber)) - } - } - - for (pausedFlow in pausedFlows) { - pausedFlow.value.checkpoint.checkpointState.invocationContext.clientId?.let { innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active( - pausedFlow.key, - doneClientIdFuture(pausedFlow.key, pausedFlow.value.resultFuture, it) + flowId = id, + user = flow.fiber.transientState.checkpoint.checkpointState.invocationContext.principal(), + flowStateMachineFuture = doneFuture(flow.fiber) ) } } - val finishedFlowsResults = checkpointStorage.getFinishedFlowsResultsMetadata().toList() - for ((id, finishedFlowResult) in finishedFlowsResults) { - finishedFlowResult.clientId?.let { - if (finishedFlowResult.status == Checkpoint.FlowStatus.COMPLETED) { - innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Removed(id, true) - } else { - innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Removed(id, false) - } + for ((id, pausedFlow) in pausedFlows) { + pausedFlow.checkpoint.checkpointState.invocationContext.clientId?.let { clientId -> + innerState.clientIdsToFlowIds[clientId] = FlowWithClientIdStatus.Active( + flowId = id, + user = pausedFlow.checkpoint.checkpointState.invocationContext.principal(), + flowStateMachineFuture = doneClientIdFuture(id, pausedFlow.resultFuture, clientId) + ) + } + } + + val finishedFlows = checkpointStorage.getFinishedFlowsResultsMetadata().toList() + for ((id, finishedFlow) in finishedFlows) { + finishedFlow.clientId?.let { + innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Removed( + flowId = id, + user = finishedFlow.user, + succeeded = finishedFlow.status == Checkpoint.FlowStatus.COMPLETED + ) } ?: logger.error("Found finished flow $id without a client id. Something is very wrong and this flow will be ignored.") } return { logger.info("Node ready, info: ${serviceHub.myInfo}") - resumeRestoredFlows(fibers) + resumeRestoredFlows(flows) flowMessaging.start { _, deduplicationHandler -> executor.execute { deliverExternalEvent(deduplicationHandler.externalCause) @@ -289,7 +295,7 @@ internal class SingleThreadedStateMachineManager( } } - @Suppress("ComplexMethod") + @Suppress("ComplexMethod", "NestedBlockDepth") private fun startFlow( flowId: StateMachineRunId, flowLogic: FlowLogic, @@ -311,7 +317,7 @@ internal class SingleThreadedStateMachineManager( status } else { newFuture = openFuture() - FlowWithClientIdStatus.Active(flowId, newFuture!!) + FlowWithClientIdStatus.Active(flowId, context.principal(), newFuture!!) } } } @@ -321,6 +327,13 @@ internal class SingleThreadedStateMachineManager( // If the flow ID is the same as the one recorded in the client ID map, // then this start flow event has been retried, and we should not de-duplicate. if (flowId != it.flowId) { + // If the user that started the original flow is not the same as the user making the current request, + // return an exception as they are not permitted to see the result of the flow + if (!it.isPermitted(context.principal())) { + return@startFlow openFuture>().apply { + setException(PermissionException("A flow using this client id [$clientId] has already been started by another user")) + } + } val existingFuture = activeOrRemovedClientIdFuture(it, clientId) return@startFlow uncheckedCast(existingFuture) } @@ -352,28 +365,62 @@ internal class SingleThreadedStateMachineManager( override fun killFlow(id: StateMachineRunId): Boolean { val flow = innerState.withLock { flows[id] } - val killFlowResult = if (flow != null) { - flow.withFlowLock(VALID_KILL_FLOW_STATUSES) { + val killFlowResult = flow?.let { killInMemoryFlow(it) } ?: killOutOfMemoryFlow(id) + return killFlowResult || flowHospital.dropSessionInit(id) + } + + private fun killInMemoryFlow(flow: Flow<*>): Boolean { + val id = flow.fiber.id + return flow.withFlowLock(VALID_KILL_FLOW_STATUSES) { + if (!flow.fiber.transientState.isKilled) { + flow.fiber.transientState = flow.fiber.transientState.copy(isKilled = true) logger.info("Killing flow $id known to this node.") - // The checkpoint and soft locks are removed here instead of relying on the processing of the next event after setting - // the killed flag. This is to ensure a flow can be removed from the database, even if it is stuck in a infinite loop. - database.transaction { - checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true) - serviceHub.vaultService.softLockRelease(id.uuid) + // The checkpoint and soft locks are handled here as well as in a flow's transition. This means that we do not need to rely + // on the processing of the next event after setting the killed flag. This is to ensure a flow can be updated/removed from + // the database, even if it is stuck in a infinite loop. + if (flow.fiber.transientState.isAnyCheckpointPersisted) { + database.transaction { + if (flow.fiber.clientId != null) { + checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.KILLED) + checkpointStorage.removeFlowException(id) + checkpointStorage.addFlowException(id, KilledFlowException(id)) + } else { + checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true) + } + serviceHub.vaultService.softLockRelease(id.uuid) + } } unfinishedFibers.countDown() - - flow.fiber.transientState = flow.fiber.transientState.copy(isKilled = true) scheduleEvent(Event.DoRemainingWork) true + } else { + logger.info("A repeated request to kill flow $id has been made, ignoring...") + false } - } else { - // It may be that the id refers to a checkpoint that couldn't be deserialised into a flow, so we delete it if it exists. - database.transaction { checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true) } } + } - return killFlowResult || flowHospital.dropSessionInit(id) + private fun killOutOfMemoryFlow(id: StateMachineRunId): Boolean { + return database.transaction { + val checkpoint = checkpointStorage.getCheckpoint(id) + when { + checkpoint != null && checkpoint.status == Checkpoint.FlowStatus.COMPLETED -> { + logger.info("Attempt to kill flow $id which has already completed, ignoring...") + false + } + checkpoint != null && checkpoint.status == Checkpoint.FlowStatus.FAILED -> { + logger.info("Attempt to kill flow $id which has already failed, ignoring...") + false + } + checkpoint != null && checkpoint.status == Checkpoint.FlowStatus.KILLED -> { + logger.info("Attempt to kill flow $id which has already been killed, ignoring...") + false + } + // It may be that the id refers to a checkpoint that couldn't be deserialised into a flow, so we delete it if it exists. + else -> checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true) + } + } } private fun markAllFlowsAsPaused() { @@ -415,10 +462,10 @@ internal class SingleThreadedStateMachineManager( if (flow != null) { decrementLiveFibers() totalFinishedFlows.inc() - return when (removalReason) { + when (removalReason) { is FlowRemovalReason.OrderlyFinish -> removeFlowOrderly(flow, removalReason, lastState) is FlowRemovalReason.ErrorFinish -> removeFlowError(flow, removalReason, lastState) - FlowRemovalReason.SoftShutdown -> flow.fiber.scheduleEvent(Event.SoftShutdown) + FlowRemovalReason.SoftShutdown -> { /* No further tidy up is required */ } } } else { logger.warn("Flow $flowId re-finished") @@ -601,7 +648,9 @@ internal class SingleThreadedStateMachineManager( val events = mutableListOf() do { val event = oldEventQueue.tryReceive() - if (event is Event.Pause || event is Event.GeneratedByExternalEvent) events.add(event) + if (event is Event.Pause || event is Event.SoftShutdown || event is Event.GeneratedByExternalEvent) { + events.add(event) + } } while (event != null) // Only redeliver events if they were not persisted to the database @@ -967,14 +1016,16 @@ internal class SingleThreadedStateMachineManager( lastState: StateMachineState ) { drainFlowEventQueue(flow) - // Complete the started future, needed when the flow fails during flow init (before completing an [UnstartedFlowTransition]) - startedFutures.remove(flow.fiber.id)?.set(Unit) flow.fiber.clientId?.let { - if (flow.fiber.isKilled) { + // If the flow was killed before fully initialising and persisting its initial checkpoint, + // then remove it from the client id map (removing the final proof of its existence from the node) + if (flow.fiber.isKilled && !flow.fiber.transientState.isAnyCheckpointPersisted) { clientIdsToFlowIds.remove(it) } else { setClientIdAsFailed(it, flow.fiber.id) } - } + } + // Complete the started future, needed when the flow fails during flow init (before completing an [UnstartedFlowTransition]) + startedFutures.remove(flow.fiber.id)?.set(Unit) val flowError = removalReason.flowErrors[0] // TODO what to do with several? val exception = flowError.exception (exception as? FlowException)?.originalErrorId = flowError.errorId @@ -1030,8 +1081,9 @@ internal class SingleThreadedStateMachineManager( succeeded: Boolean ) { clientIdsToFlowIds.compute(clientId) { _, existingStatus -> - require(existingStatus != null && existingStatus is FlowWithClientIdStatus.Active) - FlowWithClientIdStatus.Removed(id, succeeded) + val status = requireNotNull(existingStatus) + require(existingStatus is FlowWithClientIdStatus.Active) + FlowWithClientIdStatus.Removed(flowId = id, user = status.user, succeeded = succeeded) } } @@ -1069,11 +1121,15 @@ internal class SingleThreadedStateMachineManager( } ) - override fun reattachFlowWithClientId(clientId: String): FlowStateMachineHandle? { + override fun reattachFlowWithClientId(clientId: String, user: Principal): FlowStateMachineHandle? { return innerState.withLock { clientIdsToFlowIds[clientId]?.let { - val existingFuture = activeOrRemovedClientIdFutureForReattach(it, clientId) - existingFuture?.let { uncheckedCast(existingFuture.get()) } + if (!it.isPermitted(user)) { + null + } else { + val existingFuture = activeOrRemovedClientIdFutureForReattach(it, clientId) + uncheckedCast(existingFuture?.let {existingFuture.get() }) + } } } } @@ -1110,11 +1166,11 @@ internal class SingleThreadedStateMachineManager( } } - override fun removeClientId(clientId: String): Boolean { + override fun removeClientId(clientId: String, user: Principal, isAdmin: Boolean): Boolean { var removedFlowId: StateMachineRunId? = null innerState.withLock { clientIdsToFlowIds.computeIfPresent(clientId) { _, existingStatus -> - if (existingStatus is FlowWithClientIdStatus.Removed) { + if (existingStatus is FlowWithClientIdStatus.Removed && (existingStatus.isPermitted(user) || isAdmin)) { removedFlowId = existingStatus.flowId null } else { @@ -1129,9 +1185,10 @@ internal class SingleThreadedStateMachineManager( return false } - override fun finishedFlowsWithClientIds(): Map { + override fun finishedFlowsWithClientIds(user: Principal, isAdmin: Boolean): Map { return innerState.withLock { clientIdsToFlowIds.asSequence() + .filter { (_, status) -> status.isPermitted(user) || isAdmin } .filter { (_, status) -> status is FlowWithClientIdStatus.Removed } .map { (clientId, status) -> clientId to (status as FlowWithClientIdStatus.Removed).succeeded } .toMap() diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 05887df101..f4de47ced0 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -12,6 +12,7 @@ import net.corda.core.utilities.Try import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.ReceivedMessage import rx.Observable +import java.security.Principal /** * A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachine] objects. @@ -112,14 +113,14 @@ interface StateMachineManager { * * @param clientId The client id relating to an existing flow */ - fun reattachFlowWithClientId(clientId: String): FlowStateMachineHandle? + fun reattachFlowWithClientId(clientId: String, user: Principal): FlowStateMachineHandle? /** * Removes a flow's [clientId] to result/ exception mapping. * * @return whether the mapping was removed. */ - fun removeClientId(clientId: String): Boolean + fun removeClientId(clientId: String, user: Principal, isAdmin: Boolean): Boolean /** * Returns all finished flows that were started with a client id. @@ -127,7 +128,7 @@ interface StateMachineManager { * @return A [Map] containing client ids for finished flows, mapped to [true] if finished successfully, * [false] if completed exceptionally. */ - fun finishedFlowsWithClientIds(): Map + fun finishedFlowsWithClientIds(user: Principal, isAdmin: Boolean): Map } // These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 5293616cef..df17288549 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -23,6 +23,7 @@ import net.corda.core.serialization.internal.checkpointDeserialize import net.corda.core.utilities.Try import net.corda.node.services.messaging.DeduplicationHandler import java.lang.IllegalStateException +import java.security.Principal import java.time.Instant import java.util.concurrent.Future import java.util.concurrent.Semaphore @@ -424,16 +425,21 @@ sealed class SubFlowVersion { data class CorDappFlow(override val platformVersion: Int, val corDappName: String, val corDappHash: SecureHash) : SubFlowVersion() } -sealed class FlowWithClientIdStatus(val flowId: StateMachineRunId) { +sealed class FlowWithClientIdStatus(val flowId: StateMachineRunId, val user: Principal) { + + fun isPermitted(user: Principal): Boolean = user.name == this.user.name + class Active( flowId: StateMachineRunId, + user: Principal, val flowStateMachineFuture: CordaFuture> - ) : FlowWithClientIdStatus(flowId) + ) : FlowWithClientIdStatus(flowId, user) - class Removed(flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus(flowId) + class Removed(flowId: StateMachineRunId, user: Principal, val succeeded: Boolean) : FlowWithClientIdStatus(flowId, user) } data class FlowResultMetadata( val status: Checkpoint.FlowStatus, - val clientId: String? + val clientId: String?, + val user: Principal ) \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt index f80733f9c5..41e6c2f0a6 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt @@ -3,11 +3,13 @@ package net.corda.node.services.statemachine.transitions import net.corda.core.flows.FlowException import net.corda.core.flows.KilledFlowException import net.corda.node.services.statemachine.Action +import net.corda.node.services.statemachine.Checkpoint import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.services.statemachine.ErrorSessionMessage import net.corda.node.services.statemachine.Event import net.corda.node.services.statemachine.FlowError import net.corda.node.services.statemachine.FlowRemovalReason +import net.corda.node.services.statemachine.FlowState import net.corda.node.services.statemachine.SessionId import net.corda.node.services.statemachine.SessionState import net.corda.node.services.statemachine.StateMachineState @@ -29,24 +31,34 @@ class KilledFlowTransition( startingState.checkpoint.checkpointState.sessions, errorMessages ) + + val newCheckpoint = startingState.checkpoint.copy( + status = Checkpoint.FlowStatus.KILLED, + flowState = FlowState.Finished, + checkpointState = startingState.checkpoint.checkpointState.copy(sessions = newSessions) + ) + currentState = currentState.copy( - checkpoint = startingState.checkpoint.setSessions(sessions = newSessions), + checkpoint = newCheckpoint, pendingDeduplicationHandlers = emptyList(), isRemoved = true ) - actions += Action.PropagateErrors( - errorMessages, - initiatedSessions, - startingState.senderUUID - ) + + actions += Action.PropagateErrors(errorMessages, initiatedSessions, startingState.senderUUID) if (!startingState.isFlowResumed) { actions += Action.CreateTransaction } - // The checkpoint and soft locks are also removed directly in [StateMachineManager.killFlow] - if (startingState.isAnyCheckpointPersisted) { + + // The checkpoint is updated/removed and soft locks are removed directly in [StateMachineManager.killFlow] as well + if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) { actions += Action.RemoveCheckpoint(context.id, mayHavePersistentResults = true) + } else if (startingState.isAnyCheckpointPersisted) { + actions += Action.UpdateFlowStatus(context.id, Checkpoint.FlowStatus.KILLED) + actions += Action.RemoveFlowException(context.id) + actions += Action.AddFlowException(context.id, killedFlowError.exception) } + actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers) actions += Action.ReleaseSoftLocks(context.id.uuid) actions += Action.CommitTransaction(currentState) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 85af30f898..a6a3495466 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -52,7 +52,7 @@ class StartedFlowTransition( is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition() is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest) FlowIORequest.ForceCheckpoint -> executeForceCheckpoint() - }.let { scheduleTerminateSessionsIfRequired(it) } + }.let { terminateSessionsIfRequired(it) } } private fun waitForSessionConfirmationsTransition(): TransitionResult { @@ -426,7 +426,7 @@ class StartedFlowTransition( private fun collectEndedSessionErrors(sessionIds: Collection, checkpoint: Checkpoint): List { return sessionIds.filter { sessionId -> - !checkpoint.checkpointState.sessions.containsKey(sessionId) + sessionId !in checkpoint.checkpointState.sessions }.map {sessionId -> UnexpectedFlowEndException( "Tried to access ended session $sessionId", @@ -525,7 +525,7 @@ class StartedFlowTransition( return builder { resumeFlowLogic(Unit) } } - private fun scheduleTerminateSessionsIfRequired(transition: TransitionResult): TransitionResult { + private fun terminateSessionsIfRequired(transition: TransitionResult): TransitionResult { // If there are sessions to be closed, close them on the currently executing transition val sessionsToBeTerminated = findSessionsToBeTerminated(transition.newState) return if (sessionsToBeTerminated.isNotEmpty()) { diff --git a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt index 029f4c5810..68c2df4307 100644 --- a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt @@ -2,7 +2,6 @@ package net.corda.node import co.paralleluniverse.fibers.Suspendable import net.corda.client.rpc.PermissionException -import net.corda.client.rpc.RPCException import net.corda.core.context.AuthServiceId import net.corda.core.context.InvocationContext import net.corda.core.contracts.Amount @@ -41,6 +40,7 @@ import net.corda.node.services.Permissions.Companion.invokeRpc import net.corda.node.services.Permissions.Companion.startFlow import net.corda.node.services.rpc.CURRENT_RPC_CONTEXT import net.corda.node.services.rpc.RpcAuthContext +import net.corda.nodeapi.exceptions.MissingAttachmentException import net.corda.nodeapi.exceptions.NonRpcFlowException import net.corda.nodeapi.internal.config.User import net.corda.testing.core.ALICE_NAME @@ -361,7 +361,7 @@ class CordaRPCOpsImplTest { withPermissions(invokeRpc(CordaRPCOps::openAttachment)) { assertThatThrownBy { rpc.openAttachment(SecureHash.zeroHash) - }.isInstanceOf(RPCException::class.java) + }.isInstanceOf(MissingAttachmentException::class.java) .withFailMessage("Unable to open attachment with id: ${SecureHash.zeroHash}") } } diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt index 8ef62bbcd9..d6b1f30bb0 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt @@ -65,6 +65,8 @@ import org.junit.Test import rx.schedulers.TestScheduler import java.io.IOException import java.net.URL +import java.nio.file.FileSystem +import java.nio.file.Path import java.security.KeyPair import java.time.Instant import java.time.temporal.ChronoUnit @@ -81,11 +83,12 @@ class NetworkMapUpdaterTest { val testSerialization = SerializationEnvironmentRule(true) private val cacheExpiryMs = 1000 private val privateNetUUID = UUID.randomUUID() - private val fs = Jimfs.newFileSystem(unix()) - private val baseDir = fs.getPath("/node") - private val nodeInfoDir = baseDir / NODE_INFO_DIRECTORY + private lateinit var fs: FileSystem + private lateinit var baseDir: Path + private val nodeInfoDir + get() = baseDir / NODE_INFO_DIRECTORY private val scheduler = TestScheduler() - private val fileWatcher = NodeInfoWatcher(baseDir, scheduler) + private lateinit var fileWatcher: NodeInfoWatcher private val nodeReadyFuture = openFuture() private val networkMapCache = createMockNetworkMapCache() private lateinit var ourKeyPair: KeyPair @@ -97,6 +100,14 @@ class NetworkMapUpdaterTest { @Before fun setUp() { + // Register providers before creating Jimfs filesystem. JimFs creates an SSHD instance which + // register BouncyCastle and EdDSA provider separately, which wrecks havoc. + Crypto.registerProviders() + + fs = Jimfs.newFileSystem(unix()) + baseDir = fs.getPath("/node") + fileWatcher = NodeInfoWatcher(baseDir, scheduler) + ourKeyPair = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME) ourNodeInfo = createNodeInfoAndSigned("Our info", ourKeyPair).signed server = NetworkMapServer(cacheExpiryMs.millis) diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkParametersReaderTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkParametersReaderTest.kt index 70e4b0e55a..db735b7ec4 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkParametersReaderTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkParametersReaderTest.kt @@ -3,6 +3,7 @@ package net.corda.node.services.network import com.google.common.jimfs.Configuration import com.google.common.jimfs.Jimfs import net.corda.core.identity.CordaX500Name +import net.corda.core.crypto.Crypto import net.corda.core.internal.* import net.corda.core.serialization.deserialize import net.corda.core.utilities.days @@ -37,7 +38,7 @@ class NetworkParametersReaderTest { @JvmField val testSerialization = SerializationEnvironmentRule(true) - private val fs: FileSystem = Jimfs.newFileSystem(Configuration.unix()) + private lateinit var fs: FileSystem private val cacheTimeout = 100000.seconds private lateinit var server: NetworkMapServer @@ -45,6 +46,11 @@ class NetworkParametersReaderTest { @Before fun setUp() { + // Register providers before creating Jimfs filesystem. JimFs creates an SSHD instance which + // register BouncyCastle and EdDSA provider separately, which wrecks havoc. + Crypto.registerProviders() + + fs = Jimfs.newFileSystem(Configuration.unix()) server = NetworkMapServer(cacheTimeout) val address = server.start() networkMapClient = NetworkMapClient(URL("http://$address"), VersionInfo(1, "TEST", "TEST", "TEST")) @@ -127,4 +133,5 @@ class NetworkParametersReaderTest { netParamsForNode.verifiedNetworkParametersCert(DEV_ROOT_CA.certificate) } } -} \ No newline at end of file +} + diff --git a/node/src/test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt index 2a9f450cb0..0668bfb431 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt @@ -2,6 +2,7 @@ package net.corda.node.services.network import com.google.common.jimfs.Configuration import com.google.common.jimfs.Jimfs +import net.corda.core.crypto.Crypto import net.corda.core.internal.NODE_INFO_DIRECTORY import net.corda.core.internal.createDirectories import net.corda.core.internal.div @@ -49,6 +50,10 @@ class NodeInfoWatcherTest { @Before fun start() { + // Register providers before creating Jimfs filesystem. JimFs creates an SSHD instance which + // register BouncyCastle and EdDSA provider separately, which wrecks havoc. + Crypto.registerProviders() + nodeInfoAndSigned = createNodeInfoAndSigned(ALICE_NAME) val identityService = makeTestIdentityService() keyManagementService = MockKeyManagementService(identityService) diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/NodeAttachmentServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/persistence/NodeAttachmentServiceTest.kt index 801c917c35..4c1e8d2bac 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/NodeAttachmentServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/NodeAttachmentServiceTest.kt @@ -7,6 +7,7 @@ import com.google.common.jimfs.Jimfs import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.whenever import net.corda.core.contracts.ContractAttachment +import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash import net.corda.core.crypto.sha256 import net.corda.core.flows.FlowLogic @@ -67,6 +68,10 @@ class NodeAttachmentServiceTest { @Before fun setUp() { + // Register providers before creating Jimfs filesystem. JimFs creates an SSHD instance which + // register BouncyCastle and EdDSA provider separately, which wrecks havoc. + Crypto.registerProviders() + LogHelper.setLevel(PersistentUniquenessProvider::class) val dataSourceProperties = makeTestDataSourceProperties() @@ -90,6 +95,7 @@ class NodeAttachmentServiceTest { @After fun tearDown() { database.close() + fs.close() } @Test(timeout=300_000) diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt index 0665fea236..3ff6916e10 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt @@ -4,11 +4,14 @@ import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.concurrent.Semaphore import net.corda.core.CordaRuntimeException import net.corda.core.flows.FlowLogic +import net.corda.core.flows.HospitalizeFlowException import net.corda.core.flows.KilledFlowException +import net.corda.core.flows.StateMachineRunId import net.corda.core.internal.FlowIORequest import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.internal.concurrent.transpose import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.minutes import net.corda.core.utilities.seconds import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.testing.core.ALICE_NAME @@ -18,6 +21,7 @@ import net.corda.testing.node.internal.FINANCE_CONTRACTS_CORDAPP import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.InternalMockNodeParameters import net.corda.testing.node.internal.TestStartedNode +import net.corda.testing.node.internal.newContext import net.corda.testing.node.internal.startFlow import net.corda.testing.node.internal.startFlowWithClientId import org.assertj.core.api.Assertions.assertThatExceptionOfType @@ -27,14 +31,17 @@ import org.junit.Before import org.junit.Test import rx.Observable import java.sql.SQLTransientConnectionException +import java.time.Duration +import java.time.Instant import java.util.* import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicInteger import kotlin.concurrent.thread import kotlin.test.assertEquals import kotlin.test.assertFailsWith -import kotlin.test.assertNotEquals +import kotlin.test.assertFalse import kotlin.test.assertNull import kotlin.test.assertTrue @@ -242,9 +249,8 @@ class FlowClientIdTests { } @Test(timeout = 300_000) - fun `killing a flow, removes the flow from the client id mapping`() { + fun `killing a flow, sets the flow status to killed and adds an exception to the database`() { var counter = 0 - val flowIsRunning = Semaphore(0) val waitUntilFlowIsRunning = Semaphore(0) ResultFlow.suspendableHook = object : FlowLogic() { var firstRun = true @@ -255,7 +261,7 @@ class FlowClientIdTests { if (firstRun) { firstRun = false waitUntilFlowIsRunning.release() - flowIsRunning.acquire() + sleep(1.minutes) } } } @@ -266,16 +272,66 @@ class FlowClientIdTests { flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) waitUntilFlowIsRunning.acquire() aliceNode.internals.smm.killFlow(flowHandle0!!.id) - flowIsRunning.release() flowHandle0!!.resultFuture.getOrThrow() } - // a new flow will start since the client id mapping was removed when flow got killed val flowHandle1: FlowStateMachineHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) - flowHandle1.resultFuture.getOrThrow() + assertFailsWith { + flowHandle1.resultFuture.getOrThrow() + } - assertNotEquals(flowHandle0!!.id, flowHandle1.id) - assertEquals(2, counter) + assertEquals(flowHandle0!!.id, flowHandle1.id) + assertEquals(1, counter) + assertTrue(aliceNode.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED)) + assertTrue(aliceNode.hasException(flowHandle0!!.id)) + } + + @Test(timeout = 300_000) + fun `killing a hospitalized flow, sets the flow status to killed and adds an exception to the database`() { + val clientId = UUID.randomUUID().toString() + + var flowHandle0: FlowStateMachineHandle? = null + assertFailsWith { + flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow()) + aliceNode.waitForOvernightObservation(flowHandle0!!.id, 20.seconds) + aliceNode.internals.smm.killFlow(flowHandle0!!.id) + flowHandle0!!.resultFuture.getOrThrow() + } + + val flowHandle1: FlowStateMachineHandle = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow()) + assertFailsWith { + flowHandle1.resultFuture.getOrThrow() + } + + assertEquals(flowHandle0!!.id, flowHandle1.id) + assertTrue(aliceNode.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED)) + assertTrue(aliceNode.hasException(flowHandle0!!.id)) + } + + @Test(timeout = 300_000) + fun `killing a flow twice does nothing`() { + val clientId = UUID.randomUUID().toString() + + var flowHandle0: FlowStateMachineHandle? = null + assertFailsWith { + flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow()) + aliceNode.waitForOvernightObservation(flowHandle0!!.id, 20.seconds) + aliceNode.internals.smm.killFlow(flowHandle0!!.id) + flowHandle0!!.resultFuture.getOrThrow() + } + + val flowHandle1: FlowStateMachineHandle = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow()) + assertFailsWith { + flowHandle1.resultFuture.getOrThrow() + } + + assertEquals(flowHandle0!!.id, flowHandle1.id) + assertTrue(aliceNode.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED)) + assertTrue(aliceNode.hasException(flowHandle0!!.id)) + + assertFalse(aliceNode.internals.smm.killFlow(flowHandle0!!.id)) + assertTrue(aliceNode.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED)) + assertTrue(aliceNode.hasException(flowHandle0!!.id)) } @Test(timeout = 300_000) @@ -285,7 +341,7 @@ class FlowClientIdTests { ResultFlow.hook = { counter++ } val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) flowHandle0.resultFuture.getOrThrow(20.seconds) - val removed = aliceNode.smm.removeClientId(clientId) + val removed = aliceNode.smm.removeClientId(clientId, aliceNode.user, false) // On new request with clientId, after the same clientId was removed, a brand new flow will start with that clientId val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) flowHandle1.resultFuture.getOrThrow(20.seconds) @@ -308,7 +364,7 @@ class FlowClientIdTests { assertEquals(1, findRecordsFromDatabase().size) } - aliceNode.smm.removeClientId(clientId) + aliceNode.smm.removeClientId(clientId, aliceNode.user, false) // assert database status after remove aliceNode.services.database.transaction { @@ -319,7 +375,7 @@ class FlowClientIdTests { } } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `removing a client id exception clears resources properly`() { val clientId = UUID.randomUUID().toString() ResultFlow.hook = { throw IllegalStateException() } @@ -334,7 +390,7 @@ class FlowClientIdTests { assertEquals(1, findRecordsFromDatabase().size) } - aliceNode.smm.removeClientId(clientId) + aliceNode.smm.removeClientId(clientId, aliceNode.user, false) // assert database status after remove aliceNode.services.database.transaction { @@ -345,7 +401,7 @@ class FlowClientIdTests { } } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `flow's client id mapping can only get removed once the flow gets removed`() { val clientId = UUID.randomUUID().toString() var tries = 0 @@ -362,7 +418,7 @@ class FlowClientIdTests { var removed = false while (!removed) { - removed = aliceNode.smm.removeClientId(clientId) + removed = aliceNode.smm.removeClientId(clientId, aliceNode.user, false) if (!removed) ++failedRemovals ++tries if (tries >= maxTries) { @@ -581,7 +637,7 @@ class FlowClientIdTests { assertEquals("Flow's ${flowHandle0!!.id} exception was not found in the database. Something is very wrong.", e.message) } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `completed flow started with a client id nulls its flow state in database after its lifetime`() { val clientId = UUID.randomUUID().toString() val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) @@ -593,7 +649,7 @@ class FlowClientIdTests { } } - @Test(timeout=300_000) + @Test(timeout = 300_000) fun `failed flow started with a client id nulls its flow state in database after its lifetime`() { val clientId = UUID.randomUUID().toString() ResultFlow.hook = { throw IllegalStateException() } @@ -609,11 +665,12 @@ class FlowClientIdTests { assertNull(dbFlowCheckpoint!!.blob!!.flowStack) } } + @Test(timeout = 300_000) fun `reattachFlowWithClientId can retrieve existing flow future`() { val clientId = UUID.randomUUID().toString() val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10)) - val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId) + val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId, aliceNode.user) assertEquals(10, flowHandle.resultFuture.getOrThrow(20.seconds)) assertEquals(clientId, flowHandle.clientId) @@ -625,7 +682,7 @@ class FlowClientIdTests { fun `reattachFlowWithClientId can retrieve a null result from a flow future`() { val clientId = UUID.randomUUID().toString() val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(null)) - val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId) + val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId, aliceNode.user) assertEquals(null, flowHandle.resultFuture.getOrThrow(20.seconds)) assertEquals(clientId, flowHandle.clientId) @@ -641,7 +698,7 @@ class FlowClientIdTests { assertEquals(10, flowHandle.resultFuture.getOrThrow(20.seconds)) assertEquals(clientId, flowHandle.clientId) - val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId) + val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId, aliceNode.user) assertEquals(flowHandle.id, reattachedFlowHandle?.id) assertEquals(flowHandle.resultFuture.get(), reattachedFlowHandle?.resultFuture?.get()) @@ -649,7 +706,7 @@ class FlowClientIdTests { @Test(timeout = 300_000) fun `reattachFlowWithClientId returns null if no flow matches the client id`() { - assertEquals(null, aliceNode.smm.reattachFlowWithClientId(UUID.randomUUID().toString())) + assertEquals(null, aliceNode.smm.reattachFlowWithClientId(UUID.randomUUID().toString(), aliceNode.user)) } @Test(timeout = 300_000) @@ -657,7 +714,7 @@ class FlowClientIdTests { ResultFlow.hook = { throw IllegalStateException("Bla bla bla") } val clientId = UUID.randomUUID().toString() val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10)) - val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId) + val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId, aliceNode.user) assertThatExceptionOfType(IllegalStateException::class.java).isThrownBy { flowHandle.resultFuture.getOrThrow(20.seconds) @@ -678,7 +735,7 @@ class FlowClientIdTests { flowHandle.resultFuture.getOrThrow(20.seconds) }.withMessage("Bla bla bla") - val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId) + val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId(clientId, aliceNode.user) // [CordaRunTimeException] returned because [IllegalStateException] is not serializable assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy { @@ -686,6 +743,22 @@ class FlowClientIdTests { }.withMessage("java.lang.IllegalStateException: Bla bla bla") } + @Test(timeout = 300_000) + fun `reattachFlowWithClientId can retrieve exception from killed flow`() { + val clientId = UUID.randomUUID().toString() + var flowHandle0: FlowStateMachineHandle + assertFailsWith { + flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow()) + aliceNode.waitForOvernightObservation(flowHandle0.id, 20.seconds) + aliceNode.internals.smm.killFlow(flowHandle0.id) + flowHandle0.resultFuture.getOrThrow() + } + + assertFailsWith { + aliceNode.smm.reattachFlowWithClientId(clientId, aliceNode.user)?.resultFuture?.getOrThrow() + } + } + @Test(timeout = 300_000) fun `finishedFlowsWithClientIds returns completed flows with client ids`() { val clientIds = listOf("a", "b", "c", "d", "e") @@ -708,7 +781,7 @@ class FlowClientIdTests { flows.map { it.resultFuture }.transpose().getOrThrow(30.seconds) assertFailsWith { failedFlow.resultFuture.getOrThrow(20.seconds) } - val finishedFlows = aliceNode.smm.finishedFlowsWithClientIds() + val finishedFlows = aliceNode.smm.finishedFlowsWithClientIds(aliceNode.user, false) lock.countDown() @@ -720,42 +793,126 @@ class FlowClientIdTests { assertEquals( listOf(10, 10, 10), - finishedFlows.filterValues { it }.map { aliceNode.smm.reattachFlowWithClientId(it.key)?.resultFuture?.get() } + finishedFlows.filterValues { it } + .map { aliceNode.smm.reattachFlowWithClientId(it.key, aliceNode.user)?.resultFuture?.get() } ) // [CordaRunTimeException] returned because [IllegalStateException] is not serializable assertFailsWith { - finishedFlows.filterValues { !it }.map { aliceNode.smm.reattachFlowWithClientId(it.key)?.resultFuture?.getOrThrow() } + finishedFlows.filterValues { !it } + .map { aliceNode.smm.reattachFlowWithClientId(it.key, aliceNode.user)?.resultFuture?.getOrThrow() } } } -} -internal class ResultFlow(private val result: A): FlowLogic() { - companion object { - var hook: ((String?) -> Unit)? = null - var suspendableHook: FlowLogic? = null + @Test(timeout = 300_000) + fun `finishedFlowsWithClientIds returns exception for killed flows`() { + val clientId = UUID.randomUUID().toString() + var flowHandle0: FlowStateMachineHandle + assertFailsWith { + flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow()) + aliceNode.waitForOvernightObservation(flowHandle0.id, 20.seconds) + aliceNode.internals.smm.killFlow(flowHandle0.id) + flowHandle0.resultFuture.getOrThrow() + } + + val finishedFlows = aliceNode.smm.finishedFlowsWithClientIds(aliceNode.user, false) + + assertFailsWith { + finishedFlows.keys.single() + .let { aliceNode.smm.reattachFlowWithClientId(it, aliceNode.user)?.resultFuture?.getOrThrow() } + } } - @Suspendable - override fun call(): A { - hook?.invoke(stateMachine.clientId) - suspendableHook?.let { subFlow(it) } - return result - } -} + private val TestStartedNode.user get() = services.newContext().principal() -internal class UnSerializableResultFlow: FlowLogic() { - companion object { - var firstRun = true + private fun TestStartedNode.hasStatus(id: StateMachineRunId, status: Checkpoint.FlowStatus): Boolean { + return services.database.transaction { + services.jdbcSession().prepareStatement("select count(*) from node_checkpoints where status = ? and flow_id = ?") + .apply { + setInt(1, status.ordinal) + setString(2, id.uuid.toString()) + } + .use { ps -> + ps.executeQuery().use { rs -> + rs.next() + rs.getLong(1) + } + }.toInt() == 1 + } } - @Suspendable - override fun call(): Any { - stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) - return if (firstRun) { - firstRun = false - Observable.empty() - } else { - 5 // serializable result + private fun TestStartedNode.hasException(id: StateMachineRunId): Boolean { + return services.database.transaction { + services.jdbcSession().prepareStatement("select count(*) from node_flow_exceptions where flow_id = ?") + .apply { setString(1, id.uuid.toString()) } + .use { ps -> + ps.executeQuery().use { rs -> + rs.next() + rs.getLong(1) + } + }.toInt() == 1 + } + } + + private fun TestStartedNode.waitForOvernightObservation(id: StateMachineRunId, timeout: Duration) { + val timeoutTime = Instant.now().plusSeconds(timeout.seconds) + var exists = false + while (Instant.now().isBefore(timeoutTime) && !exists) { + services.database.transaction { + exists = services.jdbcSession().prepareStatement("select count(*) from node_checkpoints where status = ? and flow_id = ?") + .apply { + setInt(1, Checkpoint.FlowStatus.HOSPITALIZED.ordinal) + setString(2, id.uuid.toString()) + } + .use { ps -> + ps.executeQuery().use { rs -> + rs.next() + rs.getLong(1) + } + }.toInt() == 1 + Thread.sleep(1.seconds.toMillis()) + } + } + if (!exists) { + throw TimeoutException("Flow was not kept for observation during timeout duration") + } + } + + internal class ResultFlow(private val result: A) : FlowLogic() { + companion object { + var hook: ((String?) -> Unit)? = null + var suspendableHook: FlowLogic? = null + } + + @Suspendable + override fun call(): A { + hook?.invoke(stateMachine.clientId) + suspendableHook?.let { subFlow(it) } + return result + } + } + + internal class UnSerializableResultFlow : FlowLogic() { + companion object { + var firstRun = true + } + + @Suspendable + override fun call(): Any { + stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) + return if (firstRun) { + firstRun = false + Observable.empty() + } else { + 5 // serializable result + } + } + } + + internal class HospitalizeFlow : FlowLogic() { + + @Suspendable + override fun call() { + throw HospitalizeFlowException("time to go to the doctors") } } } \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index 3d4ddfab46..e30462d2bf 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -134,8 +134,8 @@ class RetryFlowMockTest { Assume.assumeTrue(!IS_OPENJ9) val partyB = nodeB.info.legalIdentities.first() assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy { - nodeA.startFlow(UnbalancedSendAndReceiveFlow(partyB)).getOrThrow(20.seconds) - }.withMessage("Received session end message instead of a data session message. Mismatched send and receive?") + nodeA.startFlow(UnbalancedSendAndReceiveFlow(partyB)).getOrThrow(60.seconds) + } } @Test(timeout=300_000) diff --git a/node/src/test/kotlin/net/corda/node/utilities/registration/NetworkRegistrationHelperTest.kt b/node/src/test/kotlin/net/corda/node/utilities/registration/NetworkRegistrationHelperTest.kt index b17b437fad..7990cf7502 100644 --- a/node/src/test/kotlin/net/corda/node/utilities/registration/NetworkRegistrationHelperTest.kt +++ b/node/src/test/kotlin/net/corda/node/utilities/registration/NetworkRegistrationHelperTest.kt @@ -41,6 +41,7 @@ import org.junit.Before import org.junit.Test import java.lang.IllegalStateException import java.nio.file.Files +import java.nio.file.FileSystem import java.security.PublicKey import java.security.cert.CertPathValidatorException import java.security.cert.X509Certificate @@ -50,7 +51,7 @@ import kotlin.test.assertFalse import kotlin.test.assertTrue class NetworkRegistrationHelperTest { - private val fs = Jimfs.newFileSystem(unix()) + private lateinit var fs: FileSystem private val nodeLegalName = ALICE_NAME private lateinit var config: NodeConfiguration @@ -59,6 +60,11 @@ class NetworkRegistrationHelperTest { @Before fun init() { + // Register providers before creating Jimfs filesystem. JimFs creates an SSHD instance which + // register BouncyCastle and EdDSA provider separately, which wrecks havoc. + Crypto.registerProviders() + + fs = Jimfs.newFileSystem(unix()) val baseDirectory = fs.getPath("/baseDir").createDirectories() abstract class AbstractNodeConfiguration : NodeConfiguration diff --git a/settings.gradle b/settings.gradle index abcdd7c676..858ffcb7fe 100644 --- a/settings.gradle +++ b/settings.gradle @@ -101,6 +101,8 @@ include 'serialization-djvm:deserializers' include 'serialization-tests' include 'testing:cordapps:dbfailure:dbfcontracts' include 'testing:cordapps:dbfailure:dbfworkflows' +include 'testing:cordapps:missingmigration' +include 'testing:cordapps:sleeping' // Common libraries - start include 'common-validation' diff --git a/testing/cordapps/missingmigration/build.gradle b/testing/cordapps/missingmigration/build.gradle new file mode 100644 index 0000000000..e004a34255 --- /dev/null +++ b/testing/cordapps/missingmigration/build.gradle @@ -0,0 +1,16 @@ +apply plugin: 'kotlin' +//apply plugin: 'net.corda.plugins.cordapp' +//apply plugin: 'net.corda.plugins.quasar-utils' + +dependencies { + compile project(":core") +} + +jar { + baseName "testing-missingmigration-cordapp" + manifest { + // This JAR is part of Corda's testing framework. + // Driver will not include it as part of an out-of-process node. + attributes('Corda-Testing': true) + } +} \ No newline at end of file diff --git a/testing/cordapps/missingmigration/src/main/kotlin/net/corda/failtesting/missingmigrationcordapp/MissingMigrationSchema.kt b/testing/cordapps/missingmigration/src/main/kotlin/net/corda/failtesting/missingmigrationcordapp/MissingMigrationSchema.kt new file mode 100644 index 0000000000..d5ea338980 --- /dev/null +++ b/testing/cordapps/missingmigration/src/main/kotlin/net/corda/failtesting/missingmigrationcordapp/MissingMigrationSchema.kt @@ -0,0 +1,24 @@ +package net.corda.failtesting.missingmigrationcordapp + +import net.corda.core.schemas.MappedSchema +import net.corda.core.schemas.PersistentState +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.Table + +object MissingMigrationSchema + +object MissingMigrationSchemaV1 : MappedSchema( + schemaFamily = MissingMigrationSchema.javaClass, + version = 1, + mappedTypes = listOf(MissingMigrationSchemaV1.TestEntity::class.java)) { + + @Entity + @Table(name = "test_table") + class TestEntity( + @Column(name = "random_value") + var randomValue: String + ) : PersistentState() { + constructor() : this("") + } +} \ No newline at end of file diff --git a/testing/cordapps/missingmigration/src/main/kotlin/net/corda/failtesting/missingmigrationcordapp/SimpleFlow.kt b/testing/cordapps/missingmigration/src/main/kotlin/net/corda/failtesting/missingmigrationcordapp/SimpleFlow.kt new file mode 100644 index 0000000000..efcf4b308a --- /dev/null +++ b/testing/cordapps/missingmigration/src/main/kotlin/net/corda/failtesting/missingmigrationcordapp/SimpleFlow.kt @@ -0,0 +1,13 @@ +package net.corda.failtesting.missingmigrationcordapp + +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.StartableByRPC + +@StartableByRPC +@InitiatingFlow +class SimpleFlow : FlowLogic() { + override fun call() { + logger.info("Running simple flow doing nothing") + } +} \ No newline at end of file diff --git a/testing/cordapps/missingmigration/src/main/kotlin/net/corda/failtesting/missingmigrationcordapp/TestEntity.kt b/testing/cordapps/missingmigration/src/main/kotlin/net/corda/failtesting/missingmigrationcordapp/TestEntity.kt new file mode 100644 index 0000000000..1f8a6ffc8b --- /dev/null +++ b/testing/cordapps/missingmigration/src/main/kotlin/net/corda/failtesting/missingmigrationcordapp/TestEntity.kt @@ -0,0 +1,16 @@ +package net.corda.failtesting.missingmigrationcordapp + +import net.corda.core.identity.AbstractParty +import net.corda.core.schemas.MappedSchema +import net.corda.core.schemas.PersistentState +import net.corda.core.schemas.QueryableState + +class TestEntity(val randomValue: String, override val participants: List) : QueryableState { + override fun supportedSchemas(): Iterable { + return listOf(MissingMigrationSchemaV1) + } + + override fun generateMappedObject(schema: MappedSchema): PersistentState { + return MissingMigrationSchemaV1.TestEntity(randomValue) + } +} \ No newline at end of file diff --git a/testing/cordapps/sleeping/build.gradle b/testing/cordapps/sleeping/build.gradle new file mode 100644 index 0000000000..04ee3472a8 --- /dev/null +++ b/testing/cordapps/sleeping/build.gradle @@ -0,0 +1,14 @@ +apply plugin: 'kotlin' + +dependencies { + compile project(":core") +} + +jar { + baseName "testing-sleeping-cordapp" + manifest { + // This JAR is part of Corda's testing framework. + // Driver will not include it as part of an out-of-process node. + attributes('Corda-Testing': true) + } +} \ No newline at end of file diff --git a/testing/cordapps/sleeping/src/main/kotlin/net/corda/sleeping/SleepingFlow.kt b/testing/cordapps/sleeping/src/main/kotlin/net/corda/sleeping/SleepingFlow.kt new file mode 100644 index 0000000000..d848b85313 --- /dev/null +++ b/testing/cordapps/sleeping/src/main/kotlin/net/corda/sleeping/SleepingFlow.kt @@ -0,0 +1,15 @@ +package net.corda.sleeping + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import java.time.Duration + +@StartableByRPC +class SleepingFlow(private val duration: Duration) : FlowLogic() { + + @Suspendable + override fun call() { + sleep(duration) + } +} \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index 874dddf8e0..64fc1b4950 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -11,8 +11,14 @@ import net.corda.core.flows.InitiatedBy import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate -import net.corda.core.internal.* +import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.NetworkParametersStorage +import net.corda.core.internal.PLATFORM_VERSION +import net.corda.core.internal.VisibleForTesting +import net.corda.core.internal.createDirectories +import net.corda.core.internal.div import net.corda.core.internal.notary.NotaryService +import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.RPCOps @@ -25,6 +31,9 @@ import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger import net.corda.core.utilities.hours import net.corda.core.utilities.seconds +import net.corda.coretesting.internal.rigorousMock +import net.corda.coretesting.internal.stubs.CertificateStoreStubs +import net.corda.coretesting.internal.testThreadFactory import net.corda.node.VersionInfo import net.corda.node.internal.AbstractNode import net.corda.node.internal.InitiatedFlowFactory @@ -32,7 +41,11 @@ import net.corda.node.internal.NodeFlowManager import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.StartedNodeServices -import net.corda.node.services.config.* +import net.corda.node.services.config.FlowTimeoutConfiguration +import net.corda.node.services.config.NetworkParameterAcceptanceSettings +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.NotaryConfig +import net.corda.node.services.config.VerifierType import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.BasicHSMKeyManagementService import net.corda.node.services.keys.KeyManagementServiceInternal @@ -49,11 +62,12 @@ import net.corda.nodeapi.internal.network.NetworkParametersCopier import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.testing.common.internal.testNetworkParameters -import net.corda.coretesting.internal.rigorousMock -import net.corda.coretesting.internal.stubs.CertificateStoreStubs -import net.corda.coretesting.internal.testThreadFactory -import net.corda.testing.node.* +import net.corda.testing.node.InMemoryMessagingNetwork +import net.corda.testing.node.MockNetworkNotarySpec +import net.corda.testing.node.MockNetworkParameters +import net.corda.testing.node.MockNodeParameters import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import net.corda.testing.node.TestClock import org.apache.activemq.artemis.utils.ReusableLatch import org.apache.sshd.common.util.security.SecurityUtils import rx.Observable @@ -377,7 +391,7 @@ open class InternalMockNetwork(cordappPackages: List = emptyList(), } override fun makeMessagingService(): MockNodeMessagingService { - return MockNodeMessagingService(configuration, serverThread).closeOnStop() + return MockNodeMessagingService(configuration, serverThread).closeOnStop(usesDatabase = false) } override fun startMessagingService(rpcOps: List, diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt index 23508eafdd..ab8abadc26 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt @@ -631,7 +631,9 @@ object InteractiveShell { } @JvmStatic - fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps) { + fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps): Int { + + var result = 0 // assume it all went well fun display(statements: RenderPrintWriter.() -> Unit) { statements.invoke(userSessionOut) @@ -676,13 +678,16 @@ object InteractiveShell { // Cancelled whilst draining flows. So let's carry on from here cordaRPCOps.setFlowsDrainingModeEnabled(false) display { println("...cancelled clean shutdown.") } + result = 1 } } catch (e: Exception) { display { println("RPC failed: ${e.rootCause}", Decoration.bold, Color.red) } + result = 1 } finally { InputStreamSerializer.invokeContext = null InputStreamDeserializer.closeAll() } + return result; } private fun printAndFollowRPCResponse(