Merge branch 'release/os/4.6' into rfowler-os-4.6-ent-4.6-20200922

This commit is contained in:
Ryan Fowler 2020-09-22 11:42:00 +01:00
commit ed9f6f0ce1
49 changed files with 1319 additions and 239 deletions

View File

@ -108,7 +108,9 @@ pipeline {
"-Ddocker.container.env.parameter.CORDA_ARTIFACTORY_USERNAME=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " + "-Ddocker.container.env.parameter.CORDA_ARTIFACTORY_USERNAME=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
"-Ddocker.container.env.parameter.CORDA_ARTIFACTORY_PASSWORD=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " + "-Ddocker.container.env.parameter.CORDA_ARTIFACTORY_PASSWORD=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Ddocker.dockerfile=DockerfileJDK11Azul" + "-Ddocker.dockerfile=DockerfileJDK11Azul" +
" clean pushBuildImage preAllocateForParallelRegressionTest preAllocateForAllParallelSlowIntegrationTest --stacktrace" " clean preAllocateForAllParallelUnitTest preAllocateForAllParallelIntegrationTest " +
" preAllocateForAllParallelSlowIntegrationTest preAllocateForAllParallelSmokeTest " +
" pushBuildImage --stacktrace"
} }
sh "kubectl auth can-i get pods" sh "kubectl auth can-i get pods"
} }
@ -116,7 +118,7 @@ pipeline {
stage('Testing phase') { stage('Testing phase') {
parallel { parallel {
stage('Regression Test') { stage('Unit Test') {
steps { steps {
sh "./gradlew " + sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " + "-DbuildId=\"\${BUILD_ID}\" " +
@ -126,7 +128,33 @@ pipeline {
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " + "-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " + "-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${GIT_BRANCH}\" " + "-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
" parallelRegressionTest --stacktrace" " allParallelUnitTest --stacktrace"
}
}
stage('Integration Test') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " +
"-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
" allParallelIntegrationTest --stacktrace"
}
}
stage('Smoke Test') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " +
"-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
" allParallelSmokeTest --stacktrace"
} }
} }
stage('Slow Integration Test') { stage('Slow Integration Test') {

View File

@ -340,9 +340,9 @@ allprojects {
attributes('Corda-Docs-Link': corda_docs_link) attributes('Corda-Docs-Link': corda_docs_link)
} }
} }
tasks.withType(Test).configureEach { tasks.withType(Test).configureEach {
forkEvery = 10 forkEvery = 20
ignoreFailures = project.hasProperty('tests.ignoreFailures') ? project.property('tests.ignoreFailures').toBoolean() : false ignoreFailures = project.hasProperty('tests.ignoreFailures') ? project.property('tests.ignoreFailures').toBoolean() : false
failFast = project.hasProperty('tests.failFast') ? project.property('tests.failFast').toBoolean() : false failFast = project.hasProperty('tests.failFast') ? project.property('tests.failFast').toBoolean() : false

View File

@ -65,6 +65,9 @@ processSmokeTestResources {
from(project(':finance:contracts').tasks['jar']) { from(project(':finance:contracts').tasks['jar']) {
rename '.*finance-contracts-.*', 'cordapp-finance-contracts.jar' rename '.*finance-contracts-.*', 'cordapp-finance-contracts.jar'
} }
from(project(':testing:cordapps:sleeping').tasks['jar']) {
rename 'testing-sleeping-cordapp-*', 'cordapp-sleeping.jar'
}
} }
// To find potential version conflicts, run "gradle htmlDependencyReport" and then look in // To find potential version conflicts, run "gradle htmlDependencyReport" and then look in
@ -94,6 +97,7 @@ dependencies {
smokeTestCompile project(':smoke-test-utils') smokeTestCompile project(':smoke-test-utils')
smokeTestCompile project(':finance:contracts') smokeTestCompile project(':finance:contracts')
smokeTestCompile project(':finance:workflows') smokeTestCompile project(':finance:workflows')
smokeTestCompile project(':testing:cordapps:sleeping')
smokeTestCompile "org.apache.logging.log4j:log4j-slf4j-impl:$log4j_version" smokeTestCompile "org.apache.logging.log4j:log4j-slf4j-impl:$log4j_version"
smokeTestCompile "org.apache.logging.log4j:log4j-core:$log4j_version" smokeTestCompile "org.apache.logging.log4j:log4j-core:$log4j_version"
smokeTestCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version" smokeTestCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"

View File

@ -561,6 +561,40 @@ class CordaRPCClientReconnectionTest {
} }
} }
@Test(timeout=300_000)
fun `rpc re-attaches to client id flow on node restart with flows draining mode on`() {
driver(DriverParameters(inMemoryDB = false, cordappsForAllNodes = listOf(this.enclosedCordapp()))) {
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
fun startNode(additionalCustomOverrides: Map<String, Any?> = emptyMap()): NodeHandle {
return startNode(
providedName = CHARLIE_NAME,
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
customOverrides = mapOf("rpcSettings.address" to address.toString()) + additionalCustomOverrides
).getOrThrow()
}
val node = startNode()
val client = CordaRPCClient(node.rpcAddress, config)
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
val clientId = UUID.randomUUID().toString()
val flowHandle0 = rpcOps.startFlowWithClientId(clientId, ::SimpleFlow)
node.rpc.setFlowsDrainingModeEnabled(true)
node.stop()
thread {
sleep(1000)
startNode()
}
val result0 = flowHandle0.returnValue.getOrThrow()
assertEquals(5, result0)
assertThat(rpcOps.reconnectingRPCConnection.isClosed())
}
}
}
@StartableByRPC @StartableByRPC
class SimpleFlow : FlowLogic<Int>() { class SimpleFlow : FlowLogic<Int>() {

View File

@ -319,7 +319,7 @@ class ReconnectingCordaRPCOps private constructor(
checkIfClosed() checkIfClosed()
var remainingAttempts = maxNumberOfAttempts var remainingAttempts = maxNumberOfAttempts
var lastException: Throwable? = null var lastException: Throwable? = null
while (remainingAttempts != 0 && !reconnectingRPCConnection.isClosed()) { loop@ while (remainingAttempts != 0 && !reconnectingRPCConnection.isClosed()) {
try { try {
log.debug { "Invoking RPC $method..." } log.debug { "Invoking RPC $method..." }
return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also { return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also {

View File

@ -8,34 +8,49 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.InputStreamAndHash import net.corda.core.internal.InputStreamAndHash
import net.corda.core.messaging.* import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.messaging.vaultQueryBy
import net.corda.core.messaging.vaultTrackBy
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.services.Vault import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.* import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.core.node.services.vault.SortAttribute
import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.finance.DOLLARS import net.corda.finance.DOLLARS
import net.corda.finance.POUNDS import net.corda.finance.POUNDS
import net.corda.finance.SWISS_FRANCS import net.corda.finance.SWISS_FRANCS
import net.corda.finance.USD import net.corda.finance.USD
import net.corda.finance.contracts.asset.Cash import net.corda.finance.contracts.asset.Cash
import net.corda.finance.workflows.getCashBalance
import net.corda.finance.workflows.getCashBalances
import net.corda.finance.flows.CashIssueFlow import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow import net.corda.finance.flows.CashPaymentFlow
import net.corda.finance.workflows.getCashBalance
import net.corda.finance.workflows.getCashBalances
import net.corda.java.rpc.StandaloneCordaRPCJavaClientTest import net.corda.java.rpc.StandaloneCordaRPCJavaClientTest
import net.corda.nodeapi.internal.config.User import net.corda.nodeapi.internal.config.User
import net.corda.sleeping.SleepingFlow
import net.corda.smoketesting.NodeConfig import net.corda.smoketesting.NodeConfig
import net.corda.smoketesting.NodeProcess import net.corda.smoketesting.NodeProcess
import org.apache.commons.io.output.NullOutputStream import org.apache.commons.io.output.NullOutputStream
import org.hamcrest.text.MatchesPattern import org.hamcrest.text.MatchesPattern
import org.junit.* import org.junit.After
import org.junit.Before
import org.junit.Ignore
import org.junit.Rule
import org.junit.Test
import org.junit.rules.ExpectedException import org.junit.rules.ExpectedException
import java.io.FilterInputStream import java.io.FilterInputStream
import java.io.InputStream import java.io.InputStream
import java.util.* import java.util.Currency
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern import java.util.regex.Pattern
@ -242,7 +257,7 @@ class StandaloneCordaRPClientTest {
exception.expect(PermissionException::class.java) exception.expect(PermissionException::class.java)
exception.expectMessage(MatchesPattern(Pattern.compile("User not authorized to perform RPC call .*killFlow.*"))) exception.expectMessage(MatchesPattern(Pattern.compile("User not authorized to perform RPC call .*killFlow.*")))
val flowHandle = rpcProxy.startFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), notaryNodeIdentity) val flowHandle = rpcProxy.startFlow(::SleepingFlow, 1.minutes)
notary.connect(nonUser).use { connection -> notary.connect(nonUser).use { connection ->
val rpcProxy = connection.proxy val rpcProxy = connection.proxy
rpcProxy.killFlow(flowHandle.id) rpcProxy.killFlow(flowHandle.id)
@ -251,7 +266,7 @@ class StandaloneCordaRPClientTest {
@Test(timeout=300_000) @Test(timeout=300_000)
fun `test kill flow with killFlow permission`() { fun `test kill flow with killFlow permission`() {
val flowHandle = rpcProxy.startFlow(::CashIssueFlow, 83.DOLLARS, OpaqueBytes.of(0), notaryNodeIdentity) val flowHandle = rpcProxy.startFlow(::SleepingFlow, 1.minutes)
notary.connect(rpcUser).use { connection -> notary.connect(rpcUser).use { connection ->
val rpcProxy = connection.proxy val rpcProxy = connection.proxy
assertTrue(rpcProxy.killFlow(flowHandle.id)) assertTrue(rpcProxy.killFlow(flowHandle.id))

View File

@ -57,6 +57,9 @@ class RPCPermissionsTests : AbstractRPCTest() {
assertNotAllowed { assertNotAllowed {
proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow") proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow")
} }
assertNotAllowed {
proxy.validatePermission("startFlow", "net.corda.flows.DummyFlow")
}
} }
} }
@ -67,6 +70,10 @@ class RPCPermissionsTests : AbstractRPCTest() {
val proxy = testProxyFor(adminUser) val proxy = testProxyFor(adminUser)
proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow") proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow")
proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.DummyFlow") proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.DummyFlow")
proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.DummyFlow")
proxy.validatePermission("startFlow", "net.corda.flows.DummyFlow")
proxy.validatePermission("startTrackedFlow", "net.corda.flows.DummyFlow")
proxy.validatePermission("startFlowWithClientId", "net.corda.flows.DummyFlow")
} }
} }
@ -77,6 +84,10 @@ class RPCPermissionsTests : AbstractRPCTest() {
val proxy = testProxyFor(joeUser) val proxy = testProxyFor(joeUser)
proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow") proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow")
proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.DummyFlow") proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.DummyFlow")
proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.DummyFlow")
proxy.validatePermission("startFlow", "net.corda.flows.DummyFlow")
proxy.validatePermission("startTrackedFlow", "net.corda.flows.DummyFlow")
proxy.validatePermission("startFlowWithClientId", "net.corda.flows.DummyFlow")
} }
} }
@ -91,6 +102,18 @@ class RPCPermissionsTests : AbstractRPCTest() {
assertNotAllowed { assertNotAllowed {
proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.OtherFlow") proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.OtherFlow")
} }
assertNotAllowed {
proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.OtherFlow")
}
assertNotAllowed {
proxy.validatePermission("startFlow", "net.corda.flows.OtherFlow")
}
assertNotAllowed {
proxy.validatePermission("startTrackedFlow", "net.corda.flows.OtherFlow")
}
assertNotAllowed {
proxy.validatePermission("startFlowWithClientId", "net.corda.flows.OtherFlow")
}
} }
} }
@ -121,6 +144,16 @@ class RPCPermissionsTests : AbstractRPCTest() {
proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow") proxy.validatePermission("startFlowDynamic", "net.corda.flows.DummyFlow")
proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.DummyFlow") proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.DummyFlow")
proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.OtherFlow") proxy.validatePermission("startTrackedFlowDynamic", "net.corda.flows.OtherFlow")
proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.OtherFlow")
proxy.validatePermission("startFlowDynamicWithClientId", "net.corda.flows.DummyFlow")
proxy.validatePermission("startFlow", "net.corda.flows.OtherFlow")
proxy.validatePermission("startFlow", "net.corda.flows.DummyFlow")
proxy.validatePermission("startTrackedFlow", "net.corda.flows.DummyFlow")
proxy.validatePermission("startTrackedFlow", "net.corda.flows.OtherFlow")
proxy.validatePermission("startFlowWithClientId", "net.corda.flows.OtherFlow")
proxy.validatePermission("startFlowWithClientId", "net.corda.flows.DummyFlow")
assertNotAllowed { assertNotAllowed {
proxy.validatePermission("startTrackedFlowDynamic", "net.banned.flows.OtherFlow") proxy.validatePermission("startTrackedFlowDynamic", "net.banned.flows.OtherFlow")
} }

View File

@ -20,7 +20,7 @@ quasarVersion11=0.8.1_r3
jdkClassifier11=jdk11 jdkClassifier11=jdk11
proguardVersion=6.1.1 proguardVersion=6.1.1
bouncycastleVersion=1.66 bouncycastleVersion=1.66
classgraphVersion=4.8.89 classgraphVersion=4.8.90
disruptorVersion=3.4.2 disruptorVersion=3.4.2
typesafeConfigVersion=1.3.4 typesafeConfigVersion=1.3.4
jsr305Version=3.0.2 jsr305Version=3.0.2

View File

@ -2,6 +2,7 @@ package net.corda.coretests.indentity
import com.google.common.jimfs.Configuration.unix import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs import com.google.common.jimfs.Jimfs
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.entropyToKeyPair import net.corda.core.crypto.entropyToKeyPair
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
@ -14,6 +15,7 @@ import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.getTestPartyAndCertificate import net.corda.testing.core.getTestPartyAndCertificate
import net.corda.coretesting.internal.DEV_ROOT_CA import net.corda.coretesting.internal.DEV_ROOT_CA
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.Before
import org.junit.Rule import org.junit.Rule
import org.junit.Test import org.junit.Test
import java.math.BigInteger import java.math.BigInteger
@ -24,6 +26,13 @@ class PartyAndCertificateTest {
@JvmField @JvmField
val testSerialization = SerializationEnvironmentRule() val testSerialization = SerializationEnvironmentRule()
@Before
fun setUp() {
// Register providers before creating Jimfs filesystem. JimFs creates an SSHD instance which
// register BouncyCastle and EdDSA provider separately, which wrecks havoc.
Crypto.registerProviders()
}
@Test(timeout=300_000) @Test(timeout=300_000)
fun `reject a path with no roles`() { fun `reject a path with no roles`() {
val path = X509Utilities.buildCertPath(DEV_ROOT_CA.certificate) val path = X509Utilities.buildCertPath(DEV_ROOT_CA.certificate)

View File

@ -314,6 +314,7 @@ interface CordaRPCOps : RPCOps {
/** /**
* Removes a flow's [clientId] to result/ exception mapping. If the mapping is of a running flow, then the mapping will not get removed. * Removes a flow's [clientId] to result/ exception mapping. If the mapping is of a running flow, then the mapping will not get removed.
* This version will only remove flow's that were started by the same user currently calling [removeClientId].
* *
* See [startFlowDynamicWithClientId] for more information. * See [startFlowDynamicWithClientId] for more information.
* *
@ -322,13 +323,32 @@ interface CordaRPCOps : RPCOps {
fun removeClientId(clientId: String): Boolean fun removeClientId(clientId: String): Boolean
/** /**
* Returns all finished flows that were started with a client id. * Removes a flow's [clientId] to result/ exception mapping. If the mapping is of a running flow, then the mapping will not get removed.
* This version can be called for all client ids, ignoring which user originally started a flow with [clientId].
* *
* @return A [Map] containing client ids for finished flows, mapped to [true] if finished successfully, * See [startFlowDynamicWithClientId] for more information.
* [false] if completed exceptionally. *
* @return whether the mapping was removed.
*/
fun removeClientIdAsAdmin(clientId: String): Boolean
/**
* Returns all finished flows that were started with a client ID for which the client ID mapping has not been removed. This version only
* returns the client ids for flows started by the same user currently calling [finishedFlowsWithClientIds].
*
* @return A [Map] containing client ids for finished flows started by the user calling [finishedFlowsWithClientIds], mapped to [true]
* if finished successfully, [false] if completed exceptionally.
*/ */
fun finishedFlowsWithClientIds(): Map<String, Boolean> fun finishedFlowsWithClientIds(): Map<String, Boolean>
/**
* Returns all finished flows that were started with a client id by all RPC users for which the client ID mapping has not been removed.
*
* @return A [Map] containing all client ids for finished flows, mapped to [true] if finished successfully,
* [false] if completed exceptionally.
*/
fun finishedFlowsWithClientIdsAsAdmin(): Map<String, Boolean>
/** Returns Node's NodeInfo, assuming this will not change while the node is running. */ /** Returns Node's NodeInfo, assuming this will not change while the node is running. */
fun nodeInfo(): NodeInfo fun nodeInfo(): NodeInfo

View File

@ -35,6 +35,12 @@ class RejectedCommandException(message: String) :
CordaRuntimeException(message), CordaRuntimeException(message),
@Suppress("DEPRECATION") net.corda.core.ClientRelevantError @Suppress("DEPRECATION") net.corda.core.ClientRelevantError
/**
* Thrown to indicate that the command was rejected by the node, typically due to a special temporary mode.
*/
class MissingAttachmentException(message: String) :
CordaRuntimeException(message)
/** /**
* Allows an implementing [Throwable] to be propagated to RPC clients. * Allows an implementing [Throwable] to be propagated to RPC clients.
*/ */

View File

@ -242,6 +242,8 @@ dependencies {
slowIntegrationTestRuntime configurations.runtime slowIntegrationTestRuntime configurations.runtime
slowIntegrationTestRuntime configurations.testRuntime slowIntegrationTestRuntime configurations.testRuntime
integrationTestCompile(project(":testing:cordapps:missingmigration"))
testCompile project(':testing:cordapps:dbfailure:dbfworkflows') testCompile project(':testing:cordapps:dbfailure:dbfworkflows')
} }

View File

@ -1,25 +1,45 @@
package net.corda.node.flows package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.PermissionException
import net.corda.core.CordaRuntimeException import net.corda.core.CordaRuntimeException
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.ResultSerializationException import net.corda.core.flows.ResultSerializationException
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.FlowHandleWithClientId
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startFlowWithClientId import net.corda.core.messaging.startFlowWithClientId
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.node.services.Permissions
import net.corda.node.services.statemachine.Checkpoint
import net.corda.nodeapi.exceptions.RejectedCommandException
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.driver import net.corda.testing.driver.driver
import net.corda.testing.node.User
import org.assertj.core.api.Assertions import org.assertj.core.api.Assertions
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
import rx.Observable import rx.Observable
import java.time.Duration
import java.time.Instant
import java.util.UUID import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeoutException
import kotlin.reflect.KClass
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFailsWith import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertNotEquals import kotlin.test.assertNotEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue import kotlin.test.assertTrue
class FlowWithClientIdTest { class FlowWithClientIdTest {
@ -29,7 +49,7 @@ class FlowWithClientIdTest {
ResultFlow.hook = null ResultFlow.hook = null
} }
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `start flow with client id`() { fun `start flow with client id`() {
val clientId = UUID.randomUUID().toString() val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
@ -41,7 +61,7 @@ class FlowWithClientIdTest {
} }
} }
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `remove client id`() { fun `remove client id`() {
val clientId = UUID.randomUUID().toString() val clientId = UUID.randomUUID().toString()
var counter = 0 var counter = 0
@ -64,7 +84,7 @@ class FlowWithClientIdTest {
} }
} }
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `on flow unserializable result a 'CordaRuntimeException' is thrown containing in its message the unserializable type`() { fun `on flow unserializable result a 'CordaRuntimeException' is thrown containing in its message the unserializable type`() {
val clientId = UUID.randomUUID().toString() val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
@ -79,7 +99,7 @@ class FlowWithClientIdTest {
} }
} }
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `If flow has an unserializable exception result then it gets converted into a 'CordaRuntimeException'`() { fun `If flow has an unserializable exception result then it gets converted into a 'CordaRuntimeException'`() {
ResultFlow.hook = { ResultFlow.hook = {
throw UnserializableException() throw UnserializableException()
@ -107,7 +127,7 @@ class FlowWithClientIdTest {
} }
} }
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `reattachFlowWithClientId can retrieve results from existing flow future`() { fun `reattachFlowWithClientId can retrieve results from existing flow future`() {
val clientId = UUID.randomUUID().toString() val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
@ -141,7 +161,7 @@ class FlowWithClientIdTest {
} }
} }
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `finishedFlowsWithClientIds returns completed flows with client ids`() { fun `finishedFlowsWithClientIds returns completed flows with client ids`() {
val clientId = UUID.randomUUID().toString() val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
@ -151,35 +171,344 @@ class FlowWithClientIdTest {
assertEquals(true, finishedFlows[clientId]) assertEquals(true, finishedFlows[clientId])
} }
} }
}
@StartableByRPC @Test(timeout=300_000)
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() { fun `a client id flow can be re-attached when flows draining mode is on`() {
companion object { val clientId = UUID.randomUUID().toString()
var hook: (() -> Unit)? = null driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
var suspendableHook: FlowLogic<Unit>? = null val nodeA = startNode().getOrThrow()
val result0 = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow(20.seconds)
assertEquals(5, result0)
nodeA.rpc.setFlowsDrainingModeEnabled(true)
val result1 = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow(20.seconds)
assertEquals(5, result1)
}
} }
@Suspendable @Test(timeout=300_000)
override fun call(): A { fun `if client id flow does not exist and flows draining mode is on, a RejectedCommandException gets thrown`() {
hook?.invoke() val clientId = UUID.randomUUID().toString()
suspendableHook?.let { subFlow(it) } driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
return result val nodeA = startNode().getOrThrow()
}
}
@StartableByRPC nodeA.rpc.setFlowsDrainingModeEnabled(true)
internal class UnserializableResultFlow: FlowLogic<OpenFuture<Observable<Unit>>>() { assertFailsWith<RejectedCommandException>("Node is draining before shutdown. Cannot start new flows through RPC.") {
companion object { nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
val UNSERIALIZABLE_OBJECT = openFuture<Observable<Unit>>().also { it.set(Observable.empty<Unit>())} }
}
} }
@Suspendable @Test(timeout = 300_000)
override fun call(): OpenFuture<Observable<Unit>> { fun `a killed flow's exception can be retrieved after restarting the node`() {
return UNSERIALIZABLE_OBJECT val clientId = UUID.randomUUID().toString()
}
}
internal class UnserializableException( driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet(), inMemoryDB = false)) {
val unserializableObject: BrokenMap<Unit, Unit> = BrokenMap() val nodeA = startNode(providedName = ALICE_NAME).getOrThrow()
): CordaRuntimeException("123") var flowHandle0: FlowHandleWithClientId<Unit>? = null
assertFailsWith<KilledFlowException> {
flowHandle0 = nodeA.rpc.startFlowWithClientId(clientId, ::HospitalizeFlow)
nodeA.waitForOvernightObservation(flowHandle0!!.id, 20.seconds)
nodeA.rpc.killFlow(flowHandle0!!.id)
flowHandle0!!.returnValue.getOrThrow(20.seconds)
}
val flowHandle1: FlowHandleWithClientId<Unit> = nodeA.rpc.startFlowWithClientId(clientId, ::HospitalizeFlow)
assertFailsWith<KilledFlowException> {
flowHandle1.returnValue.getOrThrow(20.seconds)
}
assertEquals(flowHandle0!!.id, flowHandle1.id)
assertTrue(nodeA.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED))
assertTrue(nodeA.hasException(flowHandle0!!.id, KilledFlowException::class))
nodeA.stop()
val nodeARestarted = startNode(providedName = ALICE_NAME).getOrThrow()
assertFailsWith<KilledFlowException> {
nodeARestarted.rpc.reattachFlowWithClientId<Unit>(clientId)!!.returnValue.getOrThrow(20.seconds)
}
}
}
private fun NodeHandle.hasStatus(id: StateMachineRunId, status: Checkpoint.FlowStatus): Boolean {
return rpc.startFlow(::IsFlowInStatus, id, status.ordinal).returnValue.getOrThrow(20.seconds)
}
private fun <T : Exception> NodeHandle.hasException(id: StateMachineRunId, type: KClass<T>): Boolean {
return rpc.startFlow(::GetExceptionType, id).returnValue.getOrThrow(20.seconds) == type.qualifiedName
}
private fun NodeHandle.waitForOvernightObservation(id: StateMachineRunId, timeout: Duration) {
val timeoutTime = Instant.now().plusSeconds(timeout.seconds)
var exists = false
while (Instant.now().isBefore(timeoutTime) && !exists) {
exists = rpc.startFlow(::IsFlowInStatus, id, Checkpoint.FlowStatus.HOSPITALIZED.ordinal).returnValue.getOrThrow(timeout)
Thread.sleep(1.seconds.toMillis())
}
if (!exists) {
throw TimeoutException("Flow was not kept for observation during timeout duration")
}
}
@Test(timeout = 300_000)
fun `reattaching to existing running flow using startFlowWithClientId for flow started by another user throws a permission exception`() {
val user = User("TonyStark", "I AM IRONMAN", setOf(Permissions.all()))
val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all()))
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow()
val latch = CountDownLatch(1)
ResultFlow.hook = {
latch.await()
}
val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
val reattachedByStarter = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
assertFailsWith<PermissionException> {
CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
it.proxy.startFlowWithClientId(clientId, ::ResultFlow, 5)
}
}
latch.countDown()
assertEquals(5, flowHandle.returnValue.getOrThrow(20.seconds))
assertEquals(5, reattachedByStarter.returnValue.getOrThrow(20.seconds))
}
}
@Test(timeout = 300_000)
fun `reattaching to existing completed flow using startFlowWithClientId for flow started by another user throws a permission exception`() {
val user = User("TonyStark", "I AM IRONMAN", setOf(Permissions.all()))
val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all()))
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow()
nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow(20.seconds)
assertFailsWith<PermissionException> {
CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
it.proxy.startFlowWithClientId(clientId, ::ResultFlow, 5)
}
}
}
}
@Test(timeout = 300_000)
fun `reattaching to existing completed flow using startFlowWithClientId for flow started by another user throws a permission exception (after node restart)`() {
val user = User("TonyStark", "I AM IRONMAN", setOf(Permissions.all()))
val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all()))
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet(), inMemoryDB = false)) {
var nodeA = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user, spy)).getOrThrow()
nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow(20.seconds)
nodeA.stop()
nodeA = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user, spy)).getOrThrow(20.seconds)
assertFailsWith<PermissionException> {
CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
it.proxy.startFlowWithClientId(clientId, ::ResultFlow, 5)
}
}
}
}
@Test(timeout = 300_000)
fun `reattaching to existing flow using reattachFlowWithClientId for flow started by another user returns null`() {
val user = User("dan", "this is my password", setOf(Permissions.all()))
val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all()))
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow()
val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
val reattachedByStarter = nodeA.rpc.reattachFlowWithClientId<Int>(clientId)?.returnValue?.getOrThrow(20.seconds)
val reattachedBySpy = CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
it.proxy.reattachFlowWithClientId<Int>(clientId)?.returnValue?.getOrThrow(20.seconds)
}
assertEquals(5, flowHandle.returnValue.getOrThrow(20.seconds))
assertEquals(5, reattachedByStarter)
assertNull(reattachedBySpy)
}
}
@Test(timeout = 300_000)
fun `removeClientId does not remove mapping for flows started by another user`() {
val user = User("dan", "this is my password", setOf(Permissions.all()))
val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all()))
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow()
val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
flowHandle.returnValue.getOrThrow(20.seconds)
val removedBySpy = CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
it.proxy.removeClientId(clientId)
}
val reattachedByStarter = nodeA.rpc.reattachFlowWithClientId<Int>(clientId)?.returnValue?.getOrThrow(20.seconds)
val removedByStarter = nodeA.rpc.removeClientId(clientId)
assertEquals(5, reattachedByStarter)
assertTrue(removedByStarter)
assertFalse(removedBySpy)
}
}
@Test(timeout = 300_000)
fun `removeClientIdAsAdmin does remove mapping for flows started by another user`() {
val user = User("dan", "this is my password", setOf(Permissions.all()))
val spy = User("spy", "l33t h4ck4r", setOf(Permissions.all()))
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow()
val flowHandle = nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5)
flowHandle.returnValue.getOrThrow(20.seconds)
val removedBySpy = CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
it.proxy.removeClientIdAsAdmin(clientId)
}
val reattachedByStarter = nodeA.rpc.reattachFlowWithClientId<Int>(clientId)?.returnValue?.getOrThrow(20.seconds)
val removedByStarter = nodeA.rpc.removeClientIdAsAdmin(clientId)
assertNull(reattachedByStarter)
assertFalse(removedByStarter)
assertTrue(removedBySpy)
}
}
@Test(timeout = 300_000)
fun `finishedFlowsWithClientIds does not return flows started by other users`() {
val user = User("CaptainAmerica", "That really is America's ass", setOf(Permissions.all()))
val spy = User("nsa", "EternalBlue", setOf(Permissions.all()))
val clientIdForUser = UUID.randomUUID().toString()
val clientIdForSpy = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow()
val flowHandleStartedByUser = nodeA.rpc.startFlowWithClientId(clientIdForUser, ::ResultFlow, 5)
CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
val flowHandleStartedBySpy = it.proxy.startFlowWithClientId(clientIdForSpy, ::ResultFlow, 10)
flowHandleStartedByUser.returnValue.getOrThrow(20.seconds)
flowHandleStartedBySpy.returnValue.getOrThrow(20.seconds)
val userFinishedFlows = nodeA.rpc.finishedFlowsWithClientIds()
val spyFinishedFlows = it.proxy.finishedFlowsWithClientIds()
assertEquals(1, userFinishedFlows.size)
assertEquals(clientIdForUser, userFinishedFlows.keys.single())
assertEquals(5, nodeA.rpc.reattachFlowWithClientId<Int>(userFinishedFlows.keys.single())!!.returnValue.getOrThrow())
assertEquals(1, spyFinishedFlows.size)
assertEquals(clientIdForSpy, spyFinishedFlows.keys.single())
assertEquals(10, it.proxy.reattachFlowWithClientId<Int>(spyFinishedFlows.keys.single())!!.returnValue.getOrThrow())
}
}
}
@Test(timeout = 300_000)
fun `finishedFlowsWithClientIdsAsAdmin does return flows started by other users`() {
val user = User("CaptainAmerica", "That really is America's ass", setOf(Permissions.all()))
val spy = User("nsa", "EternalBlue", setOf(Permissions.all()))
val clientIdForUser = UUID.randomUUID().toString()
val clientIdForSpy = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode(rpcUsers = listOf(user, spy)).getOrThrow()
val flowHandleStartedByUser = nodeA.rpc.startFlowWithClientId(clientIdForUser, ::ResultFlow, 5)
CordaRPCClient(nodeA.rpcAddress).start(spy.username, spy.password).use {
val flowHandleStartedBySpy = it.proxy.startFlowWithClientId(clientIdForSpy, ::ResultFlow, 10)
flowHandleStartedByUser.returnValue.getOrThrow(20.seconds)
flowHandleStartedBySpy.returnValue.getOrThrow(20.seconds)
val userFinishedFlows = nodeA.rpc.finishedFlowsWithClientIdsAsAdmin()
val spyFinishedFlows = it.proxy.finishedFlowsWithClientIdsAsAdmin()
assertEquals(2, userFinishedFlows.size)
assertEquals(2, spyFinishedFlows.size)
assertEquals(userFinishedFlows, spyFinishedFlows)
}
}
}
@StartableByRPC
internal class ResultFlow<A>(private val result: A) : FlowLogic<A>() {
companion object {
var hook: (() -> Unit)? = null
var suspendableHook: FlowLogic<Unit>? = null
}
@Suspendable
override fun call(): A {
hook?.invoke()
suspendableHook?.let { subFlow(it) }
return result
}
}
@StartableByRPC
internal class UnserializableResultFlow : FlowLogic<OpenFuture<Observable<Unit>>>() {
companion object {
val UNSERIALIZABLE_OBJECT = openFuture<Observable<Unit>>().also { it.set(Observable.empty<Unit>()) }
}
@Suspendable
override fun call(): OpenFuture<Observable<Unit>> {
return UNSERIALIZABLE_OBJECT
}
}
@StartableByRPC
internal class HospitalizeFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
throw HospitalizeFlowException("time to go to the doctors")
}
}
@StartableByRPC
internal class IsFlowInStatus(private val id: StateMachineRunId, private val ordinal: Int) : FlowLogic<Boolean>() {
@Suspendable
override fun call(): Boolean {
return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints where status = ? and flow_id = ?")
.apply {
setInt(1, ordinal)
setString(2, id.uuid.toString())
}
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}.toInt() == 1
}
}
@StartableByRPC
internal class GetExceptionType(private val id: StateMachineRunId) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
return serviceHub.jdbcSession().prepareStatement("select type from node_flow_exceptions where flow_id = ?")
.apply { setString(1, id.uuid.toString()) }
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getString(1)
}
}
}
}
internal class UnserializableException(
val unserializableObject: BrokenMap<Unit, Unit> = BrokenMap()
) : CordaRuntimeException("123")
}

View File

@ -6,6 +6,7 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowExternalOperation import net.corda.core.flows.FlowExternalOperation
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession import net.corda.core.flows.FlowSession
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.KilledFlowException import net.corda.core.flows.KilledFlowException
@ -210,6 +211,26 @@ class KillFlowTest {
} }
} }
@Test(timeout = 300_000)
fun `killing a hospitalized flow ends the flow immediately`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.let { rpc ->
val handle = rpc.startFlow(::AFlowThatGetsMurderedWhileInTheHospital)
Thread.sleep(5000)
val time = measureTimeMillis {
rpc.killFlow(handle.id)
assertFailsWith<KilledFlowException> {
handle.returnValue.getOrThrow(1.minutes)
}
}
assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow")
assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow")
assertEquals(1, rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds))
}
}
}
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `a killed flow will propagate the killed error to counter parties if it was suspended`() { fun `a killed flow will propagate the killed error to counter parties if it was suspended`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
@ -482,6 +503,15 @@ class KillFlowTest {
} }
} }
@StartableByRPC
@InitiatingFlow
class AFlowThatGetsMurderedWhileInTheHospital : FlowLogic<Unit>() {
@Suspendable
override fun call() {
throw HospitalizeFlowException("time to go to the doctors")
}
}
@StartableByRPC @StartableByRPC
@InitiatingFlow @InitiatingFlow
class AFlowThatGetsMurderedAndSomehowKillsItsFriends(private val parties: List<Party>) : FlowLogic<Unit>() { class AFlowThatGetsMurderedAndSomehowKillsItsFriends(private val parties: List<Party>) : FlowLogic<Unit>() {

View File

@ -2,10 +2,15 @@ package net.corda.node.persistence
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.node.flows.isQuasarAgentSpecified import net.corda.node.flows.isQuasarAgentSpecified
import net.corda.node.internal.ConfigurationException import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException
import net.corda.nodeapi.internal.persistence.HibernateSchemaChangeException
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeParameters import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver import net.corda.testing.driver.driver
import net.corda.testing.node.TestCordapp
import net.corda.testing.node.internal.startNode
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Test import org.junit.Test
import kotlin.test.assertFailsWith import kotlin.test.assertFailsWith
@ -13,10 +18,33 @@ class DbSchemaInitialisationTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `database initialisation not allowed in config`() { fun `database initialisation not allowed in config`() {
driver(DriverParameters(startNodesInProcess = isQuasarAgentSpecified(), cordappsForAllNodes = emptyList())) { driver(DriverParameters(startNodesInProcess = isQuasarAgentSpecified(), cordappsForAllNodes = emptyList())) {
assertFailsWith(ConfigurationException::class) { assertFailsWith(IllegalStateException::class) {
startNode(NodeParameters(customOverrides = mapOf("database.initialiseSchema" to "false"))).getOrThrow() startNode(NodeParameters(customOverrides = mapOf("database.initialiseSchema" to "false"))).getOrThrow()
} }
} }
} }
@Test(timeout = 300_000)
fun `app migration resource is only mandatory when not in dev mode`() {
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = emptyList(),
allowHibernateToManageAppSchema = false)) {
// in dev mode, it fails because the schema of our test CorDapp is missing
assertThatExceptionOfType(HibernateSchemaChangeException::class.java)
.isThrownBy {
startNode(NodeParameters(additionalCordapps = listOf(TestCordapp.findCordapp("net.corda.failtesting.missingmigrationcordapp")))).getOrThrow()
}
.withMessage("Incompatible schema change detected. Please run schema migration scripts (node with sub-command run-migration-scripts). Reason: Schema-validation: missing table [test_table]")
// without devMode, it doesn't even get this far as it complains about the schema migration missing.
assertThatExceptionOfType(CouldNotCreateDataSourceException::class.java)
.isThrownBy {
startNode(
ALICE_NAME,
false,
NodeParameters(additionalCordapps = listOf(TestCordapp.findCordapp("net.corda.failtesting.missingmigrationcordapp")))).getOrThrow()
}
.withMessage("Could not create the DataSource: No migration defined for schema: net.corda.failtesting.missingmigrationcordapp.MissingMigrationSchema v1")
}
}
} }

View File

@ -5,9 +5,15 @@ import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.schemas.NodeInfoSchemaV1 import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.utilities.createKeyPairAndSelfSignedTLSCertificate
import net.corda.nodeapi.internal.DEV_ROOT_CA import net.corda.nodeapi.internal.DEV_ROOT_CA
import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.* import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
@ -159,6 +165,14 @@ class PersistentNetworkMapCacheTest {
assertThat(charlieNetMapCache.getNodeByLegalName(BOB_NAME)).isNotNull assertThat(charlieNetMapCache.getNodeByLegalName(BOB_NAME)).isNotNull
} }
@Test(timeout=300_000)
fun `negative test - invalid trust root leads to no node added`() {
val (_, badCert) = createKeyPairAndSelfSignedTLSCertificate(DEV_ROOT_CA.certificate.issuerX500Principal)
val netMapCache = PersistentNetworkMapCache(TestingNamedCacheFactory(), database, InMemoryIdentityService(trustRoot = badCert))
netMapCache.addOrUpdateNode(createNodeInfo(listOf(ALICE)))
assertThat(netMapCache.allNodes).hasSize(0)
}
private fun createNodeInfo(identities: List<TestIdentity>, private fun createNodeInfo(identities: List<TestIdentity>,
address: NetworkHostAndPort = NetworkHostAndPort("localhost", portCounter++)): NodeInfo { address: NetworkHostAndPort = NetworkHostAndPort("localhost", portCounter++)): NodeInfo {
return NodeInfo( return NodeInfo(

View File

@ -147,6 +147,7 @@ import net.corda.node.utilities.NotaryLoader
import net.corda.nodeapi.internal.NodeInfoAndSigned import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.cordapp.CordappLoader import net.corda.nodeapi.internal.cordapp.CordappLoader
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.cryptoservice.CryptoService import net.corda.nodeapi.internal.cryptoservice.CryptoService
import net.corda.nodeapi.internal.cryptoservice.bouncycastle.BCCryptoService import net.corda.nodeapi.internal.cryptoservice.bouncycastle.BCCryptoService
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent
@ -162,6 +163,8 @@ import net.corda.nodeapi.internal.persistence.OutstandingDatabaseChangesExceptio
import net.corda.nodeapi.internal.persistence.RestrictedConnection import net.corda.nodeapi.internal.persistence.RestrictedConnection
import net.corda.nodeapi.internal.persistence.RestrictedEntityManager import net.corda.nodeapi.internal.persistence.RestrictedEntityManager
import net.corda.nodeapi.internal.persistence.SchemaMigration import net.corda.nodeapi.internal.persistence.SchemaMigration
import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.withoutDatabaseAccess
import net.corda.tools.shell.InteractiveShell import net.corda.tools.shell.InteractiveShell
import org.apache.activemq.artemis.utils.ReusableLatch import org.apache.activemq.artemis.utils.ReusableLatch
import org.jolokia.jvmagent.JolokiaServer import org.jolokia.jvmagent.JolokiaServer
@ -245,7 +248,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
private val notaryLoader = configuration.notary?.let { private val notaryLoader = configuration.notary?.let {
NotaryLoader(it, versionInfo) NotaryLoader(it, versionInfo)
} }
val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo).closeOnStop() val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo).closeOnStop(false)
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas).tokenize() val schemaService = NodeSchemaService(cordappLoader.cordappSchemas).tokenize()
val identityService = PersistentIdentityService(cacheFactory).tokenize() val identityService = PersistentIdentityService(cacheFactory).tokenize()
val database: CordaPersistence = createCordaPersistence( val database: CordaPersistence = createCordaPersistence(
@ -388,8 +391,13 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
return this return this
} }
protected fun <T : AutoCloseable> T.closeOnStop(): T { protected fun <T : AutoCloseable> T.closeOnStop(usesDatabase: Boolean = true): T {
runOnStop += this::close if (usesDatabase) {
contextDatabase // Will throw if no database is available, since this would run after closing the database, yet claims it needs it.
runOnStop += this::close
} else {
runOnStop += { withoutDatabaseAccess { this.close() } }
}
return this return this
} }
@ -470,9 +478,9 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
pendingCoreChanges = schemaMigration.getPendingChangesCount(schemaService.internalSchemas, true) pendingCoreChanges = schemaMigration.getPendingChangesCount(schemaService.internalSchemas, true)
} }
if(updateAppSchemas) { if(updateAppSchemas) {
schemaMigration.runMigration(!updateAppSchemasWithCheckpoints && haveCheckpoints, schemaService.appSchemas, false) schemaMigration.runMigration(!updateAppSchemasWithCheckpoints && haveCheckpoints, schemaService.appSchemas, !configuration.devMode)
} else { } else {
pendingAppChanges = schemaMigration.getPendingChangesCount(schemaService.appSchemas, false) pendingAppChanges = schemaMigration.getPendingChangesCount(schemaService.appSchemas, !configuration.devMode)
} }
} }
// Now log the vendor string as this will also cause a connection to be tested eagerly. // Now log the vendor string as this will also cause a connection to be tested eagerly.
@ -554,6 +562,13 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
networkMapCache.start(netParams.notaries) networkMapCache.start(netParams.notaries)
startDatabase() startDatabase()
// The following services need to be closed before the database, so need to be registered after it is started.
networkMapUpdater.closeOnStop()
schedulerService.closeOnStop()
val rpcOps = makeRPCOps(cordappLoader, checkpointDumper)
val (identity, identityKeyPair) = obtainIdentity()
X509Utilities.validateCertPath(trustRoot, identity.certPath)
identityService.start(trustRoot, keyStoreHandler.nodeIdentity, netParams.notaries.map { it.identity }, pkToIdCache) identityService.start(trustRoot, keyStoreHandler.nodeIdentity, netParams.notaries.map { it.identity }, pkToIdCache)
@ -794,7 +809,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
configuration.baseDirectory, configuration.baseDirectory,
configuration.extraNetworkMapKeys, configuration.extraNetworkMapKeys,
networkParametersStorage networkParametersStorage
).closeOnStop() )
protected open fun makeNodeSchedulerService() = NodeSchedulerService( protected open fun makeNodeSchedulerService() = NodeSchedulerService(
platformClock, platformClock,
@ -805,7 +820,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
nodeProperties, nodeProperties,
configuration.drainingModePollPeriod, configuration.drainingModePollPeriod,
unfinishedSchedules = busyNodeLatch unfinishedSchedules = busyNodeLatch
).tokenize().closeOnStop() ).tokenize()
private fun makeCordappLoader(configuration: NodeConfiguration, versionInfo: VersionInfo): CordappLoader { private fun makeCordappLoader(configuration: NodeConfiguration, versionInfo: VersionInfo): CordappLoader {
val generatedCordapps = mutableListOf(VirtualCordapp.generateCore(versionInfo)) val generatedCordapps = mutableListOf(VirtualCordapp.generateCore(versionInfo))
@ -991,7 +1006,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
database.startHikariPool(configuration.dataSourceProperties, metricRegistry) { dataSource, haveCheckpoints -> database.startHikariPool(configuration.dataSourceProperties, metricRegistry) { dataSource, haveCheckpoints ->
SchemaMigration(dataSource, cordappLoader, configuration.networkParametersPath, configuration.myLegalName) SchemaMigration(dataSource, cordappLoader, configuration.networkParametersPath, configuration.myLegalName)
.checkOrUpdate(schemaService.internalSchemas, runMigrationScripts, haveCheckpoints, true) .checkOrUpdate(schemaService.internalSchemas, runMigrationScripts, haveCheckpoints, true)
.checkOrUpdate(schemaService.appSchemas, runMigrationScripts, haveCheckpoints && !allowAppSchemaUpgradeWithCheckpoints, false) .checkOrUpdate(schemaService.appSchemas, runMigrationScripts, haveCheckpoints && !allowAppSchemaUpgradeWithCheckpoints, !configuration.devMode)
} }
/** Loads and starts a notary service if it is configured. */ /** Loads and starts a notary service if it is configured. */

View File

@ -1,6 +1,5 @@
package net.corda.node.internal package net.corda.node.internal
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.notUsed import net.corda.client.rpc.notUsed
import net.corda.common.logging.CordaVersion import net.corda.common.logging.CordaVersion
import net.corda.core.CordaRuntimeException import net.corda.core.CordaRuntimeException
@ -55,6 +54,7 @@ import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.rpc.context import net.corda.node.services.rpc.context
import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.statemachine.StateMachineManager
import net.corda.nodeapi.exceptions.MissingAttachmentException
import net.corda.nodeapi.exceptions.NonRpcFlowException import net.corda.nodeapi.exceptions.NonRpcFlowException
import net.corda.nodeapi.exceptions.RejectedCommandException import net.corda.nodeapi.exceptions.RejectedCommandException
import rx.Observable import rx.Observable
@ -163,14 +163,18 @@ internal class CordaRPCOpsImpl(
override fun killFlow(id: StateMachineRunId): Boolean = smm.killFlow(id) override fun killFlow(id: StateMachineRunId): Boolean = smm.killFlow(id)
override fun <T> reattachFlowWithClientId(clientId: String): FlowHandleWithClientId<T>? { override fun <T> reattachFlowWithClientId(clientId: String): FlowHandleWithClientId<T>? {
return smm.reattachFlowWithClientId<T>(clientId)?.run { return smm.reattachFlowWithClientId<T>(clientId, context().principal())?.run {
FlowHandleWithClientIdImpl(id = id, returnValue = resultFuture, clientId = clientId) FlowHandleWithClientIdImpl(id = id, returnValue = resultFuture, clientId = clientId)
} }
} }
override fun removeClientId(clientId: String): Boolean = smm.removeClientId(clientId) override fun removeClientId(clientId: String): Boolean = smm.removeClientId(clientId, context().principal(), false)
override fun finishedFlowsWithClientIds(): Map<String, Boolean> = smm.finishedFlowsWithClientIds() override fun removeClientIdAsAdmin(clientId: String): Boolean = smm.removeClientId(clientId, context().principal(), true)
override fun finishedFlowsWithClientIds(): Map<String, Boolean> = smm.finishedFlowsWithClientIds(context().principal(), false)
override fun finishedFlowsWithClientIdsAsAdmin(): Map<String, Boolean> = smm.finishedFlowsWithClientIds(context().principal(), true)
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> { override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
@ -267,7 +271,8 @@ internal class CordaRPCOpsImpl(
private fun <T> startFlow(logicType: Class<out FlowLogic<T>>, context: InvocationContext, args: Array<out Any?>): FlowStateMachineHandle<T> { private fun <T> startFlow(logicType: Class<out FlowLogic<T>>, context: InvocationContext, args: Array<out Any?>): FlowStateMachineHandle<T> {
if (!logicType.isAnnotationPresent(StartableByRPC::class.java)) throw NonRpcFlowException(logicType) if (!logicType.isAnnotationPresent(StartableByRPC::class.java)) throw NonRpcFlowException(logicType)
if (isFlowsDrainingModeEnabled()) { if (isFlowsDrainingModeEnabled()) {
throw RejectedCommandException("Node is draining before shutdown. Cannot start new flows through RPC.") return context.clientId?.let { smm.reattachFlowWithClientId<T>(it, context.principal()) }
?: throw RejectedCommandException("Node is draining before shutdown. Cannot start new flows through RPC.")
} }
return flowStarter.invokeFlowAsync(logicType, context, *args).getOrThrow() return flowStarter.invokeFlowAsync(logicType, context, *args).getOrThrow()
} }
@ -278,7 +283,7 @@ internal class CordaRPCOpsImpl(
override fun openAttachment(id: SecureHash): InputStream { override fun openAttachment(id: SecureHash): InputStream {
return services.attachments.openAttachment(id)?.open() ?: return services.attachments.openAttachment(id)?.open() ?:
throw RPCException("Unable to open attachment with id: $id") throw MissingAttachmentException("Unable to open attachment with id: $id")
} }
override fun uploadAttachment(jar: InputStream): SecureHash { override fun uploadAttachment(jar: InputStream): SecureHash {

View File

@ -16,8 +16,10 @@ import org.apache.shiro.authc.*
import org.apache.shiro.authc.credential.PasswordMatcher import org.apache.shiro.authc.credential.PasswordMatcher
import org.apache.shiro.authc.credential.SimpleCredentialsMatcher import org.apache.shiro.authc.credential.SimpleCredentialsMatcher
import org.apache.shiro.authz.AuthorizationInfo import org.apache.shiro.authz.AuthorizationInfo
import org.apache.shiro.authz.Permission
import org.apache.shiro.authz.SimpleAuthorizationInfo import org.apache.shiro.authz.SimpleAuthorizationInfo
import org.apache.shiro.authz.permission.DomainPermission import org.apache.shiro.authz.permission.DomainPermission
import org.apache.shiro.authz.permission.PermissionResolver
import org.apache.shiro.cache.CacheManager import org.apache.shiro.cache.CacheManager
import org.apache.shiro.mgt.DefaultSecurityManager import org.apache.shiro.mgt.DefaultSecurityManager
import org.apache.shiro.realm.AuthorizingRealm import org.apache.shiro.realm.AuthorizingRealm
@ -121,6 +123,67 @@ internal class RPCPermission : DomainPermission {
constructor() : super() constructor() : super()
} }
/*
* A [org.apache.shiro.authz.permission.PermissionResolver] implementation for RPC permissions.
* Provides a method to construct an [RPCPermission] instance from its string representation
* in the form used by a Node admin.
*
* Currently valid permission strings have the forms:
*
* - `ALL`: allowing all type of RPC calls
*
* - `InvokeRpc.$RPCMethodName`: allowing to call a given RPC method without restrictions on its arguments.
*
* - `StartFlow.$FlowClassName`: allowing to call a `startFlow*` RPC method targeting a Flow instance
* of a given class
*/
private object RPCPermissionResolver : PermissionResolver {
private const val SEPARATOR = '.'
private const val ACTION_START_FLOW = "startflow"
private const val ACTION_INVOKE_RPC = "invokerpc"
private const val ACTION_ALL = "all"
private val FLOW_RPC_CALLS = setOf(
"startFlowDynamic",
"startTrackedFlowDynamic",
"startFlowDynamicWithClientId",
"startFlow",
"startTrackedFlow",
"startFlowWithClientId"
)
private val FLOW_RPC_PERMITTED_START_FLOW_CALLS = setOf("startFlow", "startFlowDynamic")
private val FLOW_RPC_PERMITTED_TRACKED_START_FLOW_CALLS = setOf("startTrackedFlow", "startTrackedFlowDynamic")
private val FLOW_RPC_PERMITTED_START_FLOW_WITH_CLIENT_ID_CALLS = setOf("startFlowWithClientId", "startFlowDynamicWithClientId")
override fun resolvePermission(representation: String): Permission {
val action = representation.substringBefore(SEPARATOR).toLowerCase()
when (action) {
ACTION_INVOKE_RPC -> {
val rpcCall = representation.substringAfter(SEPARATOR, "")
require(representation.count { it == SEPARATOR } == 1 && rpcCall.isNotEmpty()) { "Malformed permission string" }
val permitted = when (rpcCall) {
"startFlow" -> FLOW_RPC_PERMITTED_START_FLOW_CALLS
"startTrackedFlow" -> FLOW_RPC_PERMITTED_TRACKED_START_FLOW_CALLS
"startFlowWithClientId" -> FLOW_RPC_PERMITTED_START_FLOW_WITH_CLIENT_ID_CALLS
else -> setOf(rpcCall)
}
return RPCPermission(permitted)
}
ACTION_START_FLOW -> {
val targetFlow = representation.substringAfter(SEPARATOR, "")
require(targetFlow.isNotEmpty()) { "Missing target flow after StartFlow" }
return RPCPermission(FLOW_RPC_CALLS, targetFlow)
}
ACTION_ALL -> {
// Leaving empty set of targets and actions to match everything
return RPCPermission()
}
else -> throw IllegalArgumentException("Unknown permission action specifier: $action")
}
}
}
class ShiroAuthorizingSubject( class ShiroAuthorizingSubject(
private val subjectId: PrincipalCollection, private val subjectId: PrincipalCollection,
private val manager: DefaultSecurityManager) : AuthorizingSubject { private val manager: DefaultSecurityManager) : AuthorizingSubject {

View File

@ -11,6 +11,7 @@ import java.util.stream.Stream
/** /**
* Thread-safe storage of fiber checkpoints. * Thread-safe storage of fiber checkpoints.
*/ */
@Suppress("TooManyFunctions")
interface CheckpointStorage { interface CheckpointStorage {
/** /**
* Add a checkpoint for a new id to the store. Will throw if there is already a checkpoint for this id * Add a checkpoint for a new id to the store. Will throw if there is already a checkpoint for this id
@ -100,5 +101,7 @@ interface CheckpointStorage {
*/ */
fun getFlowException(id: StateMachineRunId, throwIfMissing: Boolean = false): Any? fun getFlowException(id: StateMachineRunId, throwIfMissing: Boolean = false): Any?
fun addFlowException(id: StateMachineRunId, exception: Throwable)
fun removeFlowException(id: StateMachineRunId): Boolean fun removeFlowException(id: StateMachineRunId): Boolean
} }

View File

@ -14,7 +14,6 @@ import net.corda.common.validation.internal.Validated.Companion.invalid
import net.corda.common.validation.internal.Validated.Companion.valid import net.corda.common.validation.internal.Validated.Companion.valid
import net.corda.core.context.AuthServiceId import net.corda.core.context.AuthServiceId
import net.corda.core.internal.notary.NotaryServiceFlow import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.node.internal.ConfigurationException
import net.corda.node.services.config.AuthDataSourceType import net.corda.node.services.config.AuthDataSourceType
import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.CertChainPolicyType import net.corda.node.services.config.CertChainPolicyType
@ -281,17 +280,26 @@ internal object DatabaseConfigSpec : Configuration.Specification<DatabaseConfig>
private val mappedSchemaCacheSize by long().optional().withDefaultValue(DatabaseConfig.Defaults.mappedSchemaCacheSize) private val mappedSchemaCacheSize by long().optional().withDefaultValue(DatabaseConfig.Defaults.mappedSchemaCacheSize)
override fun parseValid(configuration: Config, options: Configuration.Options): Valid<DatabaseConfig> { override fun parseValid(configuration: Config, options: Configuration.Options): Valid<DatabaseConfig> {
if (initialiseSchema.isSpecifiedBy(configuration)){ if (initialiseSchema.isSpecifiedBy(configuration)) {
throw ConfigurationException("Unsupported configuration database/initialiseSchema - this option has been removed, please use the run-migration-scripts sub-command or the database management tool to modify schemas") return invalid(Configuration.Validation.Error.BadPath.of(
"Unsupported configuration database/initialiseSchema - this option has been removed, please use the run-migration-scripts sub-command or the database management tool to modify schemas",
"initialiseSchema",
"Boolean"))
} }
if (initialiseAppSchema.isSpecifiedBy(configuration)){ if (initialiseAppSchema.isSpecifiedBy(configuration)) {
throw ConfigurationException("Unsupported configuration database/initialiseAppSchema - this option has been removed, please use the run-migration-scripts sub-command or the database management tool to modify schemas") return invalid(Configuration.Validation.Error.BadPath.of(
"Unsupported configuration database/initialiseAppSchema - this option has been removed, please use the run-migration-scripts sub-command or the database management tool to modify schemas",
"initialiseAppSchema",
SchemaInitializationType::class.qualifiedName!!))
} }
if (transactionIsolationLevel.isSpecifiedBy(configuration)){ if (transactionIsolationLevel.isSpecifiedBy(configuration)) {
throw ConfigurationException("Unsupported configuration database/transactionIsolationLevel - this option has been removed and cannot be changed") return invalid(Configuration.Validation.Error.BadPath.of(
"Unsupported configuration database/transactionIsolationLevel - this option has been removed and cannot be changed",
"transactionIsolationLevel",
TransactionIsolationLevel::class.qualifiedName!!))
} }
val config = configuration.withOptions(options)
val config = configuration.withOptions(options)
return valid(DatabaseConfig(config[exportHibernateJMXStatistics], config[mappedSchemaCacheSize])) return valid(DatabaseConfig(config[exportHibernateJMXStatistics], config[mappedSchemaCacheSize]))
} }
} }

View File

@ -188,6 +188,11 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
val nextScheduleDelay = try { val nextScheduleDelay = try {
updateNetworkMapCache() updateNetworkMapCache()
} catch (e: Exception) { } catch (e: Exception) {
// Check to see if networkmap was reachable before and cached information exists
if (networkMapCache.allNodeHashes.size > 1) {
logger.debug("Networkmap Service unreachable but more than one nodeInfo entries found in the cache. Allowing node start-up to proceed.")
networkMapCache.nodeReady.set(null)
}
logger.warn("Error encountered while updating network map, will retry in $defaultWatchHttpNetworkMapRetryInterval", e) logger.warn("Error encountered while updating network map, will retry in $defaultWatchHttpNetworkMapRetryInterval", e)
defaultWatchHttpNetworkMapRetryInterval defaultWatchHttpNetworkMapRetryInterval
} }

View File

@ -19,7 +19,6 @@ import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import net.corda.node.internal.schemas.NodeInfoSchemaV1 import net.corda.node.internal.schemas.NodeInfoSchemaV1
@ -32,6 +31,7 @@ import org.hibernate.Session
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.security.PublicKey import java.security.PublicKey
import java.security.cert.CertPathValidatorException
import java.util.* import java.util.*
import javax.annotation.concurrent.ThreadSafe import javax.annotation.concurrent.ThreadSafe
import javax.persistence.PersistenceException import javax.persistence.PersistenceException
@ -253,12 +253,15 @@ open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
} }
private fun verifyIdentities(node: NodeInfo): Boolean { private fun verifyIdentities(node: NodeInfo): Boolean {
val failures = node.legalIdentitiesAndCerts.mapNotNull { Try.on { it.verify(identityService.trustAnchor) } as? Try.Failure } for (identity in node.legalIdentitiesAndCerts) {
if (failures.isNotEmpty()) { try {
logger.warn("$node has ${failures.size} invalid identities:") identity.verify(identityService.trustAnchor)
failures.forEach { logger.warn("", it) } } catch (e: CertPathValidatorException) {
logger.warn("$node has invalid identity:\nError:$e\nIdentity:${identity.certPath}")
return false
}
} }
return failures.isEmpty() return true
} }
private fun verifyAndRegisterIdentities(node: NodeInfo): Boolean { private fun verifyAndRegisterIdentities(node: NodeInfo): Boolean {

View File

@ -26,6 +26,7 @@ import net.corda.nodeapi.internal.persistence.currentDBSession
import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY
import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.commons.lang3.exception.ExceptionUtils
import org.hibernate.annotations.Type import org.hibernate.annotations.Type
import java.security.Principal
import java.sql.Connection import java.sql.Connection
import java.sql.SQLException import java.sql.SQLException
import java.time.Clock import java.time.Clock
@ -392,7 +393,7 @@ class DBCheckpointStorage(
val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) { val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) {
val errored = checkpoint.errorState as? ErrorState.Errored val errored = checkpoint.errorState as? ErrorState.Errored
errored?.let { createDBFlowException(flowId, it, now) } errored?.run { createDBFlowException(flowId, errors.last().exception, now) }
?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}") ?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}")
} else { } else {
null null
@ -460,7 +461,7 @@ class DBCheckpointStorage(
val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) { val dbFlowException = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) {
val errored = checkpoint.errorState as? ErrorState.Errored val errored = checkpoint.errorState as? ErrorState.Errored
errored?.let { createDBFlowException(flowId, it, now) } errored?.run { createDBFlowException(flowId, errors.last().exception, now) }
?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}") ?: throw IllegalStateException("Found '${checkpoint.status}' checkpoint whose error state is not ${ErrorState.Errored::class.java.simpleName}")
} else { } else {
null null
@ -572,13 +573,21 @@ class DBCheckpointStorage(
override fun getFinishedFlowsResultsMetadata(): Stream<Pair<StateMachineRunId, FlowResultMetadata>> { override fun getFinishedFlowsResultsMetadata(): Stream<Pair<StateMachineRunId, FlowResultMetadata>> {
val session = currentDBSession() val session = currentDBSession()
val jpqlQuery = val jpqlQuery =
"""select new ${DBFlowResultMetadataFields::class.java.name}(checkpoint.id, checkpoint.status, metadata.userSuppliedIdentifier) """select new ${DBFlowResultMetadataFields::class.java.name}(
checkpoint.id,
checkpoint.status,
metadata.userSuppliedIdentifier,
metadata.startedBy
)
from ${DBFlowCheckpoint::class.java.name} checkpoint from ${DBFlowCheckpoint::class.java.name} checkpoint
join ${DBFlowMetadata::class.java.name} metadata on metadata.id = checkpoint.flowMetadata join ${DBFlowMetadata::class.java.name} metadata on metadata.id = checkpoint.flowMetadata
where checkpoint.status = ${FlowStatus.COMPLETED.ordinal} or checkpoint.status = ${FlowStatus.FAILED.ordinal}""".trimIndent() where checkpoint.status = ${FlowStatus.COMPLETED.ordinal}
or checkpoint.status = ${FlowStatus.FAILED.ordinal}
or checkpoint.status = ${FlowStatus.KILLED.ordinal}""".trimIndent()
val query = session.createQuery(jpqlQuery, DBFlowResultMetadataFields::class.java) val query = session.createQuery(jpqlQuery, DBFlowResultMetadataFields::class.java)
return query.resultList.stream().map { return query.resultList.stream().map {
StateMachineRunId(UUID.fromString(it.id)) to FlowResultMetadata(it.status, it.clientId) val startedBy = it.startedBy
StateMachineRunId(UUID.fromString(it.id)) to FlowResultMetadata(it.status, it.clientId, Principal { startedBy })
} }
} }
@ -600,14 +609,21 @@ class DBCheckpointStorage(
return serializedFlowException?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT) return serializedFlowException?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
} }
override fun addFlowException(id: StateMachineRunId, exception: Throwable) {
currentDBSession().save(createDBFlowException(id.uuid.toString(), exception, clock.instant()))
}
override fun removeFlowException(id: StateMachineRunId): Boolean { override fun removeFlowException(id: StateMachineRunId): Boolean {
val flowId = id.uuid.toString() return deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, id.uuid.toString()) == 1
return deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, flowId) == 1
} }
override fun updateStatus(runId: StateMachineRunId, flowStatus: FlowStatus) { override fun updateStatus(runId: StateMachineRunId, flowStatus: FlowStatus) {
val update = "Update ${NODE_DATABASE_PREFIX}checkpoints set status = ${flowStatus.ordinal} where flow_id = '${runId.uuid}'" currentDBSession()
currentDBSession().createNativeQuery(update).executeUpdate() .createNativeQuery("Update ${NODE_DATABASE_PREFIX}checkpoints set status = :status, timestamp = :timestamp where flow_id = :id")
.setParameter("status", flowStatus.ordinal)
.setParameter("timestamp", clock.instant())
.setParameter("id", runId.uuid.toString())
.executeUpdate()
} }
override fun updateCompatible(runId: StateMachineRunId, compatible: Boolean) { override fun updateCompatible(runId: StateMachineRunId, compatible: Boolean) {
@ -659,17 +675,15 @@ class DBCheckpointStorage(
) )
} }
private fun createDBFlowException(flowId: String, errorState: ErrorState.Errored, now: Instant): DBFlowException { private fun createDBFlowException(flowId: String, exception: Throwable, now: Instant): DBFlowException {
return errorState.errors.last().exception.let { return DBFlowException(
DBFlowException( flow_id = flowId,
flow_id = flowId, type = exception::class.java.name.truncate(MAX_EXC_TYPE_LENGTH, true),
type = it::class.java.name.truncate(MAX_EXC_TYPE_LENGTH, true), message = exception.message?.truncate(MAX_EXC_MSG_LENGTH, false),
message = it.message?.truncate(MAX_EXC_MSG_LENGTH, false), stackTrace = exception.stackTraceToString(),
stackTrace = it.stackTraceToString(), value = exception.storageSerialize().bytes,
value = it.storageSerialize().bytes, persistedInstant = now
persistedInstant = now )
)
}
} }
private fun setDBFlowMetadataFinishTime(flowId: String, now: Instant) { private fun setDBFlowMetadataFinishTime(flowId: String, now: Instant) {
@ -746,7 +760,8 @@ class DBCheckpointStorage(
private class DBFlowResultMetadataFields( private class DBFlowResultMetadataFields(
val id: String, val id: String,
val status: FlowStatus, val status: FlowStatus,
val clientId: String? val clientId: String?,
val startedBy: String
) )
private fun <T : Any> T.storageSerialize(): SerializedBytes<T> { private fun <T : Any> T.storageSerialize(): SerializedBytes<T> {

View File

@ -247,7 +247,6 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
txStorage.locked { txStorage.locked {
val existingTransaction = getTransaction(id) val existingTransaction = getTransaction(id)
if (existingTransaction == null) { if (existingTransaction == null) {
updates.filter { it.id == id }.toFuture()
updateFuture updateFuture
} else { } else {
updateFuture.cancel(false) updateFuture.cancel(false)

View File

@ -69,6 +69,21 @@ sealed class Action {
*/ */
data class RemoveCheckpoint(val id: StateMachineRunId, val mayHavePersistentResults: Boolean = false) : Action() data class RemoveCheckpoint(val id: StateMachineRunId, val mayHavePersistentResults: Boolean = false) : Action()
/**
* Remove a flow's exception from the database.
*
* @param id The id of the flow
*/
data class RemoveFlowException(val id: StateMachineRunId) : Action()
/**
* Persist an exception to the database for the related flow.
*
* @param id The id of the flow
* @param exception The exception to persist
*/
data class AddFlowException(val id: StateMachineRunId, val exception: Throwable) : Action()
/** /**
* Persist the deduplication facts of [deduplicationHandlers]. * Persist the deduplication facts of [deduplicationHandlers].
*/ */

View File

@ -69,6 +69,8 @@ internal class ActionExecutorImpl(
is Action.CancelFlowTimeout -> cancelFlowTimeout(action) is Action.CancelFlowTimeout -> cancelFlowTimeout(action)
is Action.MoveFlowToPaused -> executeMoveFlowToPaused(action) is Action.MoveFlowToPaused -> executeMoveFlowToPaused(action)
is Action.UpdateFlowStatus -> executeUpdateFlowStatus(action) is Action.UpdateFlowStatus -> executeUpdateFlowStatus(action)
is Action.RemoveFlowException -> executeRemoveFlowException(action)
is Action.AddFlowException -> executeAddFlowException(action)
} }
} }
private fun executeReleaseSoftLocks(action: Action.ReleaseSoftLocks) { private fun executeReleaseSoftLocks(action: Action.ReleaseSoftLocks) {
@ -252,4 +254,12 @@ internal class ActionExecutorImpl(
private fun scheduleFlowTimeout(action: Action.ScheduleFlowTimeout) { private fun scheduleFlowTimeout(action: Action.ScheduleFlowTimeout) {
stateMachineManager.scheduleFlowTimeout(action.flowId) stateMachineManager.scheduleFlowTimeout(action.flowId)
} }
private fun executeRemoveFlowException(action: Action.RemoveFlowException) {
checkpointStorage.removeFlowException(action.id)
}
private fun executeAddFlowException(action: Action.AddFlowException) {
checkpointStorage.addFlowException(action.id, action.exception)
}
} }

View File

@ -6,11 +6,13 @@ import co.paralleluniverse.fibers.instrument.JavaAgent
import co.paralleluniverse.strands.channels.Channel import co.paralleluniverse.strands.channels.Channel
import com.codahale.metrics.Gauge import com.codahale.metrics.Gauge
import com.google.common.util.concurrent.ThreadFactoryBuilder import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.client.rpc.PermissionException
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowException import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.FlowStateMachine
@ -47,6 +49,7 @@ import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
import net.corda.serialization.internal.withTokenContext import net.corda.serialization.internal.withTokenContext
import org.apache.activemq.artemis.utils.ReusableLatch import org.apache.activemq.artemis.utils.ReusableLatch
import rx.Observable import rx.Observable
import java.security.Principal
import java.security.SecureRandom import java.security.SecureRandom
import java.util.ArrayList import java.util.ArrayList
import java.util.HashSet import java.util.HashSet
@ -78,8 +81,6 @@ internal class SingleThreadedStateMachineManager(
private val VALID_KILL_FLOW_STATUSES = setOf( private val VALID_KILL_FLOW_STATUSES = setOf(
Checkpoint.FlowStatus.RUNNABLE, Checkpoint.FlowStatus.RUNNABLE,
Checkpoint.FlowStatus.FAILED,
Checkpoint.FlowStatus.COMPLETED,
Checkpoint.FlowStatus.HOSPITALIZED, Checkpoint.FlowStatus.HOSPITALIZED,
Checkpoint.FlowStatus.PAUSED Checkpoint.FlowStatus.PAUSED
) )
@ -180,7 +181,7 @@ internal class SingleThreadedStateMachineManager(
flowTimeoutScheduler::resetCustomTimeout flowTimeoutScheduler::resetCustomTimeout
) )
val (fibers, pausedFlows) = restoreFlowsFromCheckpoints() val (flows, pausedFlows) = restoreFlowsFromCheckpoints()
metrics.register("Flows.InFlight", Gauge<Int> { innerState.flows.size }) metrics.register("Flows.InFlight", Gauge<Int> { innerState.flows.size })
setFlowDefaultUncaughtExceptionHandler() setFlowDefaultUncaughtExceptionHandler()
@ -196,35 +197,40 @@ internal class SingleThreadedStateMachineManager(
} }
// - Incompatible checkpoints need to be handled upon implementing CORDA-3897 // - Incompatible checkpoints need to be handled upon implementing CORDA-3897
for (flow in fibers.values) { for ((id, flow) in flows) {
flow.fiber.clientId?.let { flow.fiber.clientId?.let {
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(flow.fiber.id, doneFuture(flow.fiber))
}
}
for (pausedFlow in pausedFlows) {
pausedFlow.value.checkpoint.checkpointState.invocationContext.clientId?.let {
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active( innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(
pausedFlow.key, flowId = id,
doneClientIdFuture(pausedFlow.key, pausedFlow.value.resultFuture, it) user = flow.fiber.transientState.checkpoint.checkpointState.invocationContext.principal(),
flowStateMachineFuture = doneFuture(flow.fiber)
) )
} }
} }
val finishedFlowsResults = checkpointStorage.getFinishedFlowsResultsMetadata().toList() for ((id, pausedFlow) in pausedFlows) {
for ((id, finishedFlowResult) in finishedFlowsResults) { pausedFlow.checkpoint.checkpointState.invocationContext.clientId?.let { clientId ->
finishedFlowResult.clientId?.let { innerState.clientIdsToFlowIds[clientId] = FlowWithClientIdStatus.Active(
if (finishedFlowResult.status == Checkpoint.FlowStatus.COMPLETED) { flowId = id,
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Removed(id, true) user = pausedFlow.checkpoint.checkpointState.invocationContext.principal(),
} else { flowStateMachineFuture = doneClientIdFuture(id, pausedFlow.resultFuture, clientId)
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Removed(id, false) )
} }
}
val finishedFlows = checkpointStorage.getFinishedFlowsResultsMetadata().toList()
for ((id, finishedFlow) in finishedFlows) {
finishedFlow.clientId?.let {
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Removed(
flowId = id,
user = finishedFlow.user,
succeeded = finishedFlow.status == Checkpoint.FlowStatus.COMPLETED
)
} ?: logger.error("Found finished flow $id without a client id. Something is very wrong and this flow will be ignored.") } ?: logger.error("Found finished flow $id without a client id. Something is very wrong and this flow will be ignored.")
} }
return { return {
logger.info("Node ready, info: ${serviceHub.myInfo}") logger.info("Node ready, info: ${serviceHub.myInfo}")
resumeRestoredFlows(fibers) resumeRestoredFlows(flows)
flowMessaging.start { _, deduplicationHandler -> flowMessaging.start { _, deduplicationHandler ->
executor.execute { executor.execute {
deliverExternalEvent(deduplicationHandler.externalCause) deliverExternalEvent(deduplicationHandler.externalCause)
@ -289,7 +295,7 @@ internal class SingleThreadedStateMachineManager(
} }
} }
@Suppress("ComplexMethod") @Suppress("ComplexMethod", "NestedBlockDepth")
private fun <A> startFlow( private fun <A> startFlow(
flowId: StateMachineRunId, flowId: StateMachineRunId,
flowLogic: FlowLogic<A>, flowLogic: FlowLogic<A>,
@ -311,7 +317,7 @@ internal class SingleThreadedStateMachineManager(
status status
} else { } else {
newFuture = openFuture() newFuture = openFuture()
FlowWithClientIdStatus.Active(flowId, newFuture!!) FlowWithClientIdStatus.Active(flowId, context.principal(), newFuture!!)
} }
} }
} }
@ -321,6 +327,13 @@ internal class SingleThreadedStateMachineManager(
// If the flow ID is the same as the one recorded in the client ID map, // If the flow ID is the same as the one recorded in the client ID map,
// then this start flow event has been retried, and we should not de-duplicate. // then this start flow event has been retried, and we should not de-duplicate.
if (flowId != it.flowId) { if (flowId != it.flowId) {
// If the user that started the original flow is not the same as the user making the current request,
// return an exception as they are not permitted to see the result of the flow
if (!it.isPermitted(context.principal())) {
return@startFlow openFuture<FlowStateMachineHandle<A>>().apply {
setException(PermissionException("A flow using this client id [$clientId] has already been started by another user"))
}
}
val existingFuture = activeOrRemovedClientIdFuture(it, clientId) val existingFuture = activeOrRemovedClientIdFuture(it, clientId)
return@startFlow uncheckedCast(existingFuture) return@startFlow uncheckedCast(existingFuture)
} }
@ -352,28 +365,62 @@ internal class SingleThreadedStateMachineManager(
override fun killFlow(id: StateMachineRunId): Boolean { override fun killFlow(id: StateMachineRunId): Boolean {
val flow = innerState.withLock { flows[id] } val flow = innerState.withLock { flows[id] }
val killFlowResult = if (flow != null) { val killFlowResult = flow?.let { killInMemoryFlow(it) } ?: killOutOfMemoryFlow(id)
flow.withFlowLock(VALID_KILL_FLOW_STATUSES) { return killFlowResult || flowHospital.dropSessionInit(id)
}
private fun killInMemoryFlow(flow: Flow<*>): Boolean {
val id = flow.fiber.id
return flow.withFlowLock(VALID_KILL_FLOW_STATUSES) {
if (!flow.fiber.transientState.isKilled) {
flow.fiber.transientState = flow.fiber.transientState.copy(isKilled = true)
logger.info("Killing flow $id known to this node.") logger.info("Killing flow $id known to this node.")
// The checkpoint and soft locks are removed here instead of relying on the processing of the next event after setting // The checkpoint and soft locks are handled here as well as in a flow's transition. This means that we do not need to rely
// the killed flag. This is to ensure a flow can be removed from the database, even if it is stuck in a infinite loop. // on the processing of the next event after setting the killed flag. This is to ensure a flow can be updated/removed from
database.transaction { // the database, even if it is stuck in a infinite loop.
checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true) if (flow.fiber.transientState.isAnyCheckpointPersisted) {
serviceHub.vaultService.softLockRelease(id.uuid) database.transaction {
if (flow.fiber.clientId != null) {
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.KILLED)
checkpointStorage.removeFlowException(id)
checkpointStorage.addFlowException(id, KilledFlowException(id))
} else {
checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true)
}
serviceHub.vaultService.softLockRelease(id.uuid)
}
} }
unfinishedFibers.countDown() unfinishedFibers.countDown()
flow.fiber.transientState = flow.fiber.transientState.copy(isKilled = true)
scheduleEvent(Event.DoRemainingWork) scheduleEvent(Event.DoRemainingWork)
true true
} else {
logger.info("A repeated request to kill flow $id has been made, ignoring...")
false
} }
} else {
// It may be that the id refers to a checkpoint that couldn't be deserialised into a flow, so we delete it if it exists.
database.transaction { checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true) }
} }
}
return killFlowResult || flowHospital.dropSessionInit(id) private fun killOutOfMemoryFlow(id: StateMachineRunId): Boolean {
return database.transaction {
val checkpoint = checkpointStorage.getCheckpoint(id)
when {
checkpoint != null && checkpoint.status == Checkpoint.FlowStatus.COMPLETED -> {
logger.info("Attempt to kill flow $id which has already completed, ignoring...")
false
}
checkpoint != null && checkpoint.status == Checkpoint.FlowStatus.FAILED -> {
logger.info("Attempt to kill flow $id which has already failed, ignoring...")
false
}
checkpoint != null && checkpoint.status == Checkpoint.FlowStatus.KILLED -> {
logger.info("Attempt to kill flow $id which has already been killed, ignoring...")
false
}
// It may be that the id refers to a checkpoint that couldn't be deserialised into a flow, so we delete it if it exists.
else -> checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true)
}
}
} }
private fun markAllFlowsAsPaused() { private fun markAllFlowsAsPaused() {
@ -415,10 +462,10 @@ internal class SingleThreadedStateMachineManager(
if (flow != null) { if (flow != null) {
decrementLiveFibers() decrementLiveFibers()
totalFinishedFlows.inc() totalFinishedFlows.inc()
return when (removalReason) { when (removalReason) {
is FlowRemovalReason.OrderlyFinish -> removeFlowOrderly(flow, removalReason, lastState) is FlowRemovalReason.OrderlyFinish -> removeFlowOrderly(flow, removalReason, lastState)
is FlowRemovalReason.ErrorFinish -> removeFlowError(flow, removalReason, lastState) is FlowRemovalReason.ErrorFinish -> removeFlowError(flow, removalReason, lastState)
FlowRemovalReason.SoftShutdown -> flow.fiber.scheduleEvent(Event.SoftShutdown) FlowRemovalReason.SoftShutdown -> { /* No further tidy up is required */ }
} }
} else { } else {
logger.warn("Flow $flowId re-finished") logger.warn("Flow $flowId re-finished")
@ -601,7 +648,9 @@ internal class SingleThreadedStateMachineManager(
val events = mutableListOf<Event>() val events = mutableListOf<Event>()
do { do {
val event = oldEventQueue.tryReceive() val event = oldEventQueue.tryReceive()
if (event is Event.Pause || event is Event.GeneratedByExternalEvent) events.add(event) if (event is Event.Pause || event is Event.SoftShutdown || event is Event.GeneratedByExternalEvent) {
events.add(event)
}
} while (event != null) } while (event != null)
// Only redeliver events if they were not persisted to the database // Only redeliver events if they were not persisted to the database
@ -967,14 +1016,16 @@ internal class SingleThreadedStateMachineManager(
lastState: StateMachineState lastState: StateMachineState
) { ) {
drainFlowEventQueue(flow) drainFlowEventQueue(flow)
// Complete the started future, needed when the flow fails during flow init (before completing an [UnstartedFlowTransition])
startedFutures.remove(flow.fiber.id)?.set(Unit)
flow.fiber.clientId?.let { flow.fiber.clientId?.let {
if (flow.fiber.isKilled) { // If the flow was killed before fully initialising and persisting its initial checkpoint,
// then remove it from the client id map (removing the final proof of its existence from the node)
if (flow.fiber.isKilled && !flow.fiber.transientState.isAnyCheckpointPersisted) {
clientIdsToFlowIds.remove(it) clientIdsToFlowIds.remove(it)
} else { } else {
setClientIdAsFailed(it, flow.fiber.id) } setClientIdAsFailed(it, flow.fiber.id) }
} }
// Complete the started future, needed when the flow fails during flow init (before completing an [UnstartedFlowTransition])
startedFutures.remove(flow.fiber.id)?.set(Unit)
val flowError = removalReason.flowErrors[0] // TODO what to do with several? val flowError = removalReason.flowErrors[0] // TODO what to do with several?
val exception = flowError.exception val exception = flowError.exception
(exception as? FlowException)?.originalErrorId = flowError.errorId (exception as? FlowException)?.originalErrorId = flowError.errorId
@ -1030,8 +1081,9 @@ internal class SingleThreadedStateMachineManager(
succeeded: Boolean succeeded: Boolean
) { ) {
clientIdsToFlowIds.compute(clientId) { _, existingStatus -> clientIdsToFlowIds.compute(clientId) { _, existingStatus ->
require(existingStatus != null && existingStatus is FlowWithClientIdStatus.Active) val status = requireNotNull(existingStatus)
FlowWithClientIdStatus.Removed(id, succeeded) require(existingStatus is FlowWithClientIdStatus.Active)
FlowWithClientIdStatus.Removed(flowId = id, user = status.user, succeeded = succeeded)
} }
} }
@ -1069,11 +1121,15 @@ internal class SingleThreadedStateMachineManager(
} }
) )
override fun <T> reattachFlowWithClientId(clientId: String): FlowStateMachineHandle<T>? { override fun <T> reattachFlowWithClientId(clientId: String, user: Principal): FlowStateMachineHandle<T>? {
return innerState.withLock { return innerState.withLock {
clientIdsToFlowIds[clientId]?.let { clientIdsToFlowIds[clientId]?.let {
val existingFuture = activeOrRemovedClientIdFutureForReattach(it, clientId) if (!it.isPermitted(user)) {
existingFuture?.let { uncheckedCast(existingFuture.get()) } null
} else {
val existingFuture = activeOrRemovedClientIdFutureForReattach(it, clientId)
uncheckedCast(existingFuture?.let {existingFuture.get() })
}
} }
} }
} }
@ -1110,11 +1166,11 @@ internal class SingleThreadedStateMachineManager(
} }
} }
override fun removeClientId(clientId: String): Boolean { override fun removeClientId(clientId: String, user: Principal, isAdmin: Boolean): Boolean {
var removedFlowId: StateMachineRunId? = null var removedFlowId: StateMachineRunId? = null
innerState.withLock { innerState.withLock {
clientIdsToFlowIds.computeIfPresent(clientId) { _, existingStatus -> clientIdsToFlowIds.computeIfPresent(clientId) { _, existingStatus ->
if (existingStatus is FlowWithClientIdStatus.Removed) { if (existingStatus is FlowWithClientIdStatus.Removed && (existingStatus.isPermitted(user) || isAdmin)) {
removedFlowId = existingStatus.flowId removedFlowId = existingStatus.flowId
null null
} else { } else {
@ -1129,9 +1185,10 @@ internal class SingleThreadedStateMachineManager(
return false return false
} }
override fun finishedFlowsWithClientIds(): Map<String, Boolean> { override fun finishedFlowsWithClientIds(user: Principal, isAdmin: Boolean): Map<String, Boolean> {
return innerState.withLock { return innerState.withLock {
clientIdsToFlowIds.asSequence() clientIdsToFlowIds.asSequence()
.filter { (_, status) -> status.isPermitted(user) || isAdmin }
.filter { (_, status) -> status is FlowWithClientIdStatus.Removed } .filter { (_, status) -> status is FlowWithClientIdStatus.Removed }
.map { (clientId, status) -> clientId to (status as FlowWithClientIdStatus.Removed).succeeded } .map { (clientId, status) -> clientId to (status as FlowWithClientIdStatus.Removed).succeeded }
.toMap() .toMap()

View File

@ -12,6 +12,7 @@ import net.corda.core.utilities.Try
import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.ReceivedMessage import net.corda.node.services.messaging.ReceivedMessage
import rx.Observable import rx.Observable
import java.security.Principal
/** /**
* A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachine] objects. * A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachine] objects.
@ -112,14 +113,14 @@ interface StateMachineManager {
* *
* @param clientId The client id relating to an existing flow * @param clientId The client id relating to an existing flow
*/ */
fun <T> reattachFlowWithClientId(clientId: String): FlowStateMachineHandle<T>? fun <T> reattachFlowWithClientId(clientId: String, user: Principal): FlowStateMachineHandle<T>?
/** /**
* Removes a flow's [clientId] to result/ exception mapping. * Removes a flow's [clientId] to result/ exception mapping.
* *
* @return whether the mapping was removed. * @return whether the mapping was removed.
*/ */
fun removeClientId(clientId: String): Boolean fun removeClientId(clientId: String, user: Principal, isAdmin: Boolean): Boolean
/** /**
* Returns all finished flows that were started with a client id. * Returns all finished flows that were started with a client id.
@ -127,7 +128,7 @@ interface StateMachineManager {
* @return A [Map] containing client ids for finished flows, mapped to [true] if finished successfully, * @return A [Map] containing client ids for finished flows, mapped to [true] if finished successfully,
* [false] if completed exceptionally. * [false] if completed exceptionally.
*/ */
fun finishedFlowsWithClientIds(): Map<String, Boolean> fun finishedFlowsWithClientIds(user: Principal, isAdmin: Boolean): Map<String, Boolean>
} }
// These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call // These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call

View File

@ -23,6 +23,7 @@ import net.corda.core.serialization.internal.checkpointDeserialize
import net.corda.core.utilities.Try import net.corda.core.utilities.Try
import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.DeduplicationHandler
import java.lang.IllegalStateException import java.lang.IllegalStateException
import java.security.Principal
import java.time.Instant import java.time.Instant
import java.util.concurrent.Future import java.util.concurrent.Future
import java.util.concurrent.Semaphore import java.util.concurrent.Semaphore
@ -424,16 +425,21 @@ sealed class SubFlowVersion {
data class CorDappFlow(override val platformVersion: Int, val corDappName: String, val corDappHash: SecureHash) : SubFlowVersion() data class CorDappFlow(override val platformVersion: Int, val corDappName: String, val corDappHash: SecureHash) : SubFlowVersion()
} }
sealed class FlowWithClientIdStatus(val flowId: StateMachineRunId) { sealed class FlowWithClientIdStatus(val flowId: StateMachineRunId, val user: Principal) {
fun isPermitted(user: Principal): Boolean = user.name == this.user.name
class Active( class Active(
flowId: StateMachineRunId, flowId: StateMachineRunId,
user: Principal,
val flowStateMachineFuture: CordaFuture<out FlowStateMachineHandle<out Any?>> val flowStateMachineFuture: CordaFuture<out FlowStateMachineHandle<out Any?>>
) : FlowWithClientIdStatus(flowId) ) : FlowWithClientIdStatus(flowId, user)
class Removed(flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus(flowId) class Removed(flowId: StateMachineRunId, user: Principal, val succeeded: Boolean) : FlowWithClientIdStatus(flowId, user)
} }
data class FlowResultMetadata( data class FlowResultMetadata(
val status: Checkpoint.FlowStatus, val status: Checkpoint.FlowStatus,
val clientId: String? val clientId: String?,
val user: Principal
) )

View File

@ -3,11 +3,13 @@ package net.corda.node.services.statemachine.transitions
import net.corda.core.flows.FlowException import net.corda.core.flows.FlowException
import net.corda.core.flows.KilledFlowException import net.corda.core.flows.KilledFlowException
import net.corda.node.services.statemachine.Action import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.ErrorSessionMessage import net.corda.node.services.statemachine.ErrorSessionMessage
import net.corda.node.services.statemachine.Event import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowError import net.corda.node.services.statemachine.FlowError
import net.corda.node.services.statemachine.FlowRemovalReason import net.corda.node.services.statemachine.FlowRemovalReason
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.SessionId import net.corda.node.services.statemachine.SessionId
import net.corda.node.services.statemachine.SessionState import net.corda.node.services.statemachine.SessionState
import net.corda.node.services.statemachine.StateMachineState import net.corda.node.services.statemachine.StateMachineState
@ -29,24 +31,34 @@ class KilledFlowTransition(
startingState.checkpoint.checkpointState.sessions, startingState.checkpoint.checkpointState.sessions,
errorMessages errorMessages
) )
val newCheckpoint = startingState.checkpoint.copy(
status = Checkpoint.FlowStatus.KILLED,
flowState = FlowState.Finished,
checkpointState = startingState.checkpoint.checkpointState.copy(sessions = newSessions)
)
currentState = currentState.copy( currentState = currentState.copy(
checkpoint = startingState.checkpoint.setSessions(sessions = newSessions), checkpoint = newCheckpoint,
pendingDeduplicationHandlers = emptyList(), pendingDeduplicationHandlers = emptyList(),
isRemoved = true isRemoved = true
) )
actions += Action.PropagateErrors(
errorMessages, actions += Action.PropagateErrors(errorMessages, initiatedSessions, startingState.senderUUID)
initiatedSessions,
startingState.senderUUID
)
if (!startingState.isFlowResumed) { if (!startingState.isFlowResumed) {
actions += Action.CreateTransaction actions += Action.CreateTransaction
} }
// The checkpoint and soft locks are also removed directly in [StateMachineManager.killFlow]
if (startingState.isAnyCheckpointPersisted) { // The checkpoint is updated/removed and soft locks are removed directly in [StateMachineManager.killFlow] as well
if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) {
actions += Action.RemoveCheckpoint(context.id, mayHavePersistentResults = true) actions += Action.RemoveCheckpoint(context.id, mayHavePersistentResults = true)
} else if (startingState.isAnyCheckpointPersisted) {
actions += Action.UpdateFlowStatus(context.id, Checkpoint.FlowStatus.KILLED)
actions += Action.RemoveFlowException(context.id)
actions += Action.AddFlowException(context.id, killedFlowError.exception)
} }
actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers) actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
actions += Action.ReleaseSoftLocks(context.id.uuid) actions += Action.ReleaseSoftLocks(context.id.uuid)
actions += Action.CommitTransaction(currentState) actions += Action.CommitTransaction(currentState)

View File

@ -52,7 +52,7 @@ class StartedFlowTransition(
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition() is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest) is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
FlowIORequest.ForceCheckpoint -> executeForceCheckpoint() FlowIORequest.ForceCheckpoint -> executeForceCheckpoint()
}.let { scheduleTerminateSessionsIfRequired(it) } }.let { terminateSessionsIfRequired(it) }
} }
private fun waitForSessionConfirmationsTransition(): TransitionResult { private fun waitForSessionConfirmationsTransition(): TransitionResult {
@ -426,7 +426,7 @@ class StartedFlowTransition(
private fun collectEndedSessionErrors(sessionIds: Collection<SessionId>, checkpoint: Checkpoint): List<Throwable> { private fun collectEndedSessionErrors(sessionIds: Collection<SessionId>, checkpoint: Checkpoint): List<Throwable> {
return sessionIds.filter { sessionId -> return sessionIds.filter { sessionId ->
!checkpoint.checkpointState.sessions.containsKey(sessionId) sessionId !in checkpoint.checkpointState.sessions
}.map {sessionId -> }.map {sessionId ->
UnexpectedFlowEndException( UnexpectedFlowEndException(
"Tried to access ended session $sessionId", "Tried to access ended session $sessionId",
@ -525,7 +525,7 @@ class StartedFlowTransition(
return builder { resumeFlowLogic(Unit) } return builder { resumeFlowLogic(Unit) }
} }
private fun scheduleTerminateSessionsIfRequired(transition: TransitionResult): TransitionResult { private fun terminateSessionsIfRequired(transition: TransitionResult): TransitionResult {
// If there are sessions to be closed, close them on the currently executing transition // If there are sessions to be closed, close them on the currently executing transition
val sessionsToBeTerminated = findSessionsToBeTerminated(transition.newState) val sessionsToBeTerminated = findSessionsToBeTerminated(transition.newState)
return if (sessionsToBeTerminated.isNotEmpty()) { return if (sessionsToBeTerminated.isNotEmpty()) {

View File

@ -2,7 +2,6 @@ package net.corda.node
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.PermissionException import net.corda.client.rpc.PermissionException
import net.corda.client.rpc.RPCException
import net.corda.core.context.AuthServiceId import net.corda.core.context.AuthServiceId
import net.corda.core.context.InvocationContext import net.corda.core.context.InvocationContext
import net.corda.core.contracts.Amount import net.corda.core.contracts.Amount
@ -41,6 +40,7 @@ import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.Permissions.Companion.startFlow import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.node.services.rpc.CURRENT_RPC_CONTEXT import net.corda.node.services.rpc.CURRENT_RPC_CONTEXT
import net.corda.node.services.rpc.RpcAuthContext import net.corda.node.services.rpc.RpcAuthContext
import net.corda.nodeapi.exceptions.MissingAttachmentException
import net.corda.nodeapi.exceptions.NonRpcFlowException import net.corda.nodeapi.exceptions.NonRpcFlowException
import net.corda.nodeapi.internal.config.User import net.corda.nodeapi.internal.config.User
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
@ -361,7 +361,7 @@ class CordaRPCOpsImplTest {
withPermissions(invokeRpc(CordaRPCOps::openAttachment)) { withPermissions(invokeRpc(CordaRPCOps::openAttachment)) {
assertThatThrownBy { assertThatThrownBy {
rpc.openAttachment(SecureHash.zeroHash) rpc.openAttachment(SecureHash.zeroHash)
}.isInstanceOf(RPCException::class.java) }.isInstanceOf(MissingAttachmentException::class.java)
.withFailMessage("Unable to open attachment with id: ${SecureHash.zeroHash}") .withFailMessage("Unable to open attachment with id: ${SecureHash.zeroHash}")
} }
} }

View File

@ -65,6 +65,8 @@ import org.junit.Test
import rx.schedulers.TestScheduler import rx.schedulers.TestScheduler
import java.io.IOException import java.io.IOException
import java.net.URL import java.net.URL
import java.nio.file.FileSystem
import java.nio.file.Path
import java.security.KeyPair import java.security.KeyPair
import java.time.Instant import java.time.Instant
import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit
@ -81,11 +83,12 @@ class NetworkMapUpdaterTest {
val testSerialization = SerializationEnvironmentRule(true) val testSerialization = SerializationEnvironmentRule(true)
private val cacheExpiryMs = 1000 private val cacheExpiryMs = 1000
private val privateNetUUID = UUID.randomUUID() private val privateNetUUID = UUID.randomUUID()
private val fs = Jimfs.newFileSystem(unix()) private lateinit var fs: FileSystem
private val baseDir = fs.getPath("/node") private lateinit var baseDir: Path
private val nodeInfoDir = baseDir / NODE_INFO_DIRECTORY private val nodeInfoDir
get() = baseDir / NODE_INFO_DIRECTORY
private val scheduler = TestScheduler() private val scheduler = TestScheduler()
private val fileWatcher = NodeInfoWatcher(baseDir, scheduler) private lateinit var fileWatcher: NodeInfoWatcher
private val nodeReadyFuture = openFuture<Void?>() private val nodeReadyFuture = openFuture<Void?>()
private val networkMapCache = createMockNetworkMapCache() private val networkMapCache = createMockNetworkMapCache()
private lateinit var ourKeyPair: KeyPair private lateinit var ourKeyPair: KeyPair
@ -97,6 +100,14 @@ class NetworkMapUpdaterTest {
@Before @Before
fun setUp() { fun setUp() {
// Register providers before creating Jimfs filesystem. JimFs creates an SSHD instance which
// register BouncyCastle and EdDSA provider separately, which wrecks havoc.
Crypto.registerProviders()
fs = Jimfs.newFileSystem(unix())
baseDir = fs.getPath("/node")
fileWatcher = NodeInfoWatcher(baseDir, scheduler)
ourKeyPair = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME) ourKeyPair = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
ourNodeInfo = createNodeInfoAndSigned("Our info", ourKeyPair).signed ourNodeInfo = createNodeInfoAndSigned("Our info", ourKeyPair).signed
server = NetworkMapServer(cacheExpiryMs.millis) server = NetworkMapServer(cacheExpiryMs.millis)

View File

@ -3,6 +3,7 @@ package net.corda.node.services.network
import com.google.common.jimfs.Configuration import com.google.common.jimfs.Configuration
import com.google.common.jimfs.Jimfs import com.google.common.jimfs.Jimfs
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.crypto.Crypto
import net.corda.core.internal.* import net.corda.core.internal.*
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
import net.corda.core.utilities.days import net.corda.core.utilities.days
@ -37,7 +38,7 @@ class NetworkParametersReaderTest {
@JvmField @JvmField
val testSerialization = SerializationEnvironmentRule(true) val testSerialization = SerializationEnvironmentRule(true)
private val fs: FileSystem = Jimfs.newFileSystem(Configuration.unix()) private lateinit var fs: FileSystem
private val cacheTimeout = 100000.seconds private val cacheTimeout = 100000.seconds
private lateinit var server: NetworkMapServer private lateinit var server: NetworkMapServer
@ -45,6 +46,11 @@ class NetworkParametersReaderTest {
@Before @Before
fun setUp() { fun setUp() {
// Register providers before creating Jimfs filesystem. JimFs creates an SSHD instance which
// register BouncyCastle and EdDSA provider separately, which wrecks havoc.
Crypto.registerProviders()
fs = Jimfs.newFileSystem(Configuration.unix())
server = NetworkMapServer(cacheTimeout) server = NetworkMapServer(cacheTimeout)
val address = server.start() val address = server.start()
networkMapClient = NetworkMapClient(URL("http://$address"), VersionInfo(1, "TEST", "TEST", "TEST")) networkMapClient = NetworkMapClient(URL("http://$address"), VersionInfo(1, "TEST", "TEST", "TEST"))
@ -127,4 +133,5 @@ class NetworkParametersReaderTest {
netParamsForNode.verifiedNetworkParametersCert(DEV_ROOT_CA.certificate) netParamsForNode.verifiedNetworkParametersCert(DEV_ROOT_CA.certificate)
} }
} }
} }

View File

@ -2,6 +2,7 @@ package net.corda.node.services.network
import com.google.common.jimfs.Configuration import com.google.common.jimfs.Configuration
import com.google.common.jimfs.Jimfs import com.google.common.jimfs.Jimfs
import net.corda.core.crypto.Crypto
import net.corda.core.internal.NODE_INFO_DIRECTORY import net.corda.core.internal.NODE_INFO_DIRECTORY
import net.corda.core.internal.createDirectories import net.corda.core.internal.createDirectories
import net.corda.core.internal.div import net.corda.core.internal.div
@ -49,6 +50,10 @@ class NodeInfoWatcherTest {
@Before @Before
fun start() { fun start() {
// Register providers before creating Jimfs filesystem. JimFs creates an SSHD instance which
// register BouncyCastle and EdDSA provider separately, which wrecks havoc.
Crypto.registerProviders()
nodeInfoAndSigned = createNodeInfoAndSigned(ALICE_NAME) nodeInfoAndSigned = createNodeInfoAndSigned(ALICE_NAME)
val identityService = makeTestIdentityService() val identityService = makeTestIdentityService()
keyManagementService = MockKeyManagementService(identityService) keyManagementService = MockKeyManagementService(identityService)

View File

@ -7,6 +7,7 @@ import com.google.common.jimfs.Jimfs
import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.contracts.ContractAttachment import net.corda.core.contracts.ContractAttachment
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sha256 import net.corda.core.crypto.sha256
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
@ -67,6 +68,10 @@ class NodeAttachmentServiceTest {
@Before @Before
fun setUp() { fun setUp() {
// Register providers before creating Jimfs filesystem. JimFs creates an SSHD instance which
// register BouncyCastle and EdDSA provider separately, which wrecks havoc.
Crypto.registerProviders()
LogHelper.setLevel(PersistentUniquenessProvider::class) LogHelper.setLevel(PersistentUniquenessProvider::class)
val dataSourceProperties = makeTestDataSourceProperties() val dataSourceProperties = makeTestDataSourceProperties()
@ -90,6 +95,7 @@ class NodeAttachmentServiceTest {
@After @After
fun tearDown() { fun tearDown() {
database.close() database.close()
fs.close()
} }
@Test(timeout=300_000) @Test(timeout=300_000)

View File

@ -4,11 +4,14 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.concurrent.Semaphore import co.paralleluniverse.strands.concurrent.Semaphore
import net.corda.core.CordaRuntimeException import net.corda.core.CordaRuntimeException
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.KilledFlowException import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowIORequest import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachineHandle import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.internal.concurrent.transpose import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
@ -18,6 +21,7 @@ import net.corda.testing.node.internal.FINANCE_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.newContext
import net.corda.testing.node.internal.startFlow import net.corda.testing.node.internal.startFlow
import net.corda.testing.node.internal.startFlowWithClientId import net.corda.testing.node.internal.startFlowWithClientId
import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.assertj.core.api.Assertions.assertThatExceptionOfType
@ -27,14 +31,17 @@ import org.junit.Before
import org.junit.Test import org.junit.Test
import rx.Observable import rx.Observable
import java.sql.SQLTransientConnectionException import java.sql.SQLTransientConnectionException
import java.time.Duration
import java.time.Instant
import java.util.* import java.util.*
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread import kotlin.concurrent.thread
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFailsWith import kotlin.test.assertFailsWith
import kotlin.test.assertNotEquals import kotlin.test.assertFalse
import kotlin.test.assertNull import kotlin.test.assertNull
import kotlin.test.assertTrue import kotlin.test.assertTrue
@ -242,9 +249,8 @@ class FlowClientIdTests {
} }
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `killing a flow, removes the flow from the client id mapping`() { fun `killing a flow, sets the flow status to killed and adds an exception to the database`() {
var counter = 0 var counter = 0
val flowIsRunning = Semaphore(0)
val waitUntilFlowIsRunning = Semaphore(0) val waitUntilFlowIsRunning = Semaphore(0)
ResultFlow.suspendableHook = object : FlowLogic<Unit>() { ResultFlow.suspendableHook = object : FlowLogic<Unit>() {
var firstRun = true var firstRun = true
@ -255,7 +261,7 @@ class FlowClientIdTests {
if (firstRun) { if (firstRun) {
firstRun = false firstRun = false
waitUntilFlowIsRunning.release() waitUntilFlowIsRunning.release()
flowIsRunning.acquire() sleep(1.minutes)
} }
} }
} }
@ -266,16 +272,66 @@ class FlowClientIdTests {
flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
waitUntilFlowIsRunning.acquire() waitUntilFlowIsRunning.acquire()
aliceNode.internals.smm.killFlow(flowHandle0!!.id) aliceNode.internals.smm.killFlow(flowHandle0!!.id)
flowIsRunning.release()
flowHandle0!!.resultFuture.getOrThrow() flowHandle0!!.resultFuture.getOrThrow()
} }
// a new flow will start since the client id mapping was removed when flow got killed
val flowHandle1: FlowStateMachineHandle<Int> = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) val flowHandle1: FlowStateMachineHandle<Int> = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowHandle1.resultFuture.getOrThrow() assertFailsWith<KilledFlowException> {
flowHandle1.resultFuture.getOrThrow()
}
assertNotEquals(flowHandle0!!.id, flowHandle1.id) assertEquals(flowHandle0!!.id, flowHandle1.id)
assertEquals(2, counter) assertEquals(1, counter)
assertTrue(aliceNode.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED))
assertTrue(aliceNode.hasException(flowHandle0!!.id))
}
@Test(timeout = 300_000)
fun `killing a hospitalized flow, sets the flow status to killed and adds an exception to the database`() {
val clientId = UUID.randomUUID().toString()
var flowHandle0: FlowStateMachineHandle<Unit>? = null
assertFailsWith<KilledFlowException> {
flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow())
aliceNode.waitForOvernightObservation(flowHandle0!!.id, 20.seconds)
aliceNode.internals.smm.killFlow(flowHandle0!!.id)
flowHandle0!!.resultFuture.getOrThrow()
}
val flowHandle1: FlowStateMachineHandle<Unit> = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow())
assertFailsWith<KilledFlowException> {
flowHandle1.resultFuture.getOrThrow()
}
assertEquals(flowHandle0!!.id, flowHandle1.id)
assertTrue(aliceNode.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED))
assertTrue(aliceNode.hasException(flowHandle0!!.id))
}
@Test(timeout = 300_000)
fun `killing a flow twice does nothing`() {
val clientId = UUID.randomUUID().toString()
var flowHandle0: FlowStateMachineHandle<Unit>? = null
assertFailsWith<KilledFlowException> {
flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow())
aliceNode.waitForOvernightObservation(flowHandle0!!.id, 20.seconds)
aliceNode.internals.smm.killFlow(flowHandle0!!.id)
flowHandle0!!.resultFuture.getOrThrow()
}
val flowHandle1: FlowStateMachineHandle<Unit> = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow())
assertFailsWith<KilledFlowException> {
flowHandle1.resultFuture.getOrThrow()
}
assertEquals(flowHandle0!!.id, flowHandle1.id)
assertTrue(aliceNode.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED))
assertTrue(aliceNode.hasException(flowHandle0!!.id))
assertFalse(aliceNode.internals.smm.killFlow(flowHandle0!!.id))
assertTrue(aliceNode.hasStatus(flowHandle0!!.id, Checkpoint.FlowStatus.KILLED))
assertTrue(aliceNode.hasException(flowHandle0!!.id))
} }
@Test(timeout = 300_000) @Test(timeout = 300_000)
@ -285,7 +341,7 @@ class FlowClientIdTests {
ResultFlow.hook = { counter++ } ResultFlow.hook = { counter++ }
val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowHandle0.resultFuture.getOrThrow(20.seconds) flowHandle0.resultFuture.getOrThrow(20.seconds)
val removed = aliceNode.smm.removeClientId(clientId) val removed = aliceNode.smm.removeClientId(clientId, aliceNode.user, false)
// On new request with clientId, after the same clientId was removed, a brand new flow will start with that clientId // On new request with clientId, after the same clientId was removed, a brand new flow will start with that clientId
val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
flowHandle1.resultFuture.getOrThrow(20.seconds) flowHandle1.resultFuture.getOrThrow(20.seconds)
@ -308,7 +364,7 @@ class FlowClientIdTests {
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size) assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
} }
aliceNode.smm.removeClientId(clientId) aliceNode.smm.removeClientId(clientId, aliceNode.user, false)
// assert database status after remove // assert database status after remove
aliceNode.services.database.transaction { aliceNode.services.database.transaction {
@ -319,7 +375,7 @@ class FlowClientIdTests {
} }
} }
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `removing a client id exception clears resources properly`() { fun `removing a client id exception clears resources properly`() {
val clientId = UUID.randomUUID().toString() val clientId = UUID.randomUUID().toString()
ResultFlow.hook = { throw IllegalStateException() } ResultFlow.hook = { throw IllegalStateException() }
@ -334,7 +390,7 @@ class FlowClientIdTests {
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size) assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
} }
aliceNode.smm.removeClientId(clientId) aliceNode.smm.removeClientId(clientId, aliceNode.user, false)
// assert database status after remove // assert database status after remove
aliceNode.services.database.transaction { aliceNode.services.database.transaction {
@ -345,7 +401,7 @@ class FlowClientIdTests {
} }
} }
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `flow's client id mapping can only get removed once the flow gets removed`() { fun `flow's client id mapping can only get removed once the flow gets removed`() {
val clientId = UUID.randomUUID().toString() val clientId = UUID.randomUUID().toString()
var tries = 0 var tries = 0
@ -362,7 +418,7 @@ class FlowClientIdTests {
var removed = false var removed = false
while (!removed) { while (!removed) {
removed = aliceNode.smm.removeClientId(clientId) removed = aliceNode.smm.removeClientId(clientId, aliceNode.user, false)
if (!removed) ++failedRemovals if (!removed) ++failedRemovals
++tries ++tries
if (tries >= maxTries) { if (tries >= maxTries) {
@ -581,7 +637,7 @@ class FlowClientIdTests {
assertEquals("Flow's ${flowHandle0!!.id} exception was not found in the database. Something is very wrong.", e.message) assertEquals("Flow's ${flowHandle0!!.id} exception was not found in the database. Something is very wrong.", e.message)
} }
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `completed flow started with a client id nulls its flow state in database after its lifetime`() { fun `completed flow started with a client id nulls its flow state in database after its lifetime`() {
val clientId = UUID.randomUUID().toString() val clientId = UUID.randomUUID().toString()
val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5)) val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
@ -593,7 +649,7 @@ class FlowClientIdTests {
} }
} }
@Test(timeout=300_000) @Test(timeout = 300_000)
fun `failed flow started with a client id nulls its flow state in database after its lifetime`() { fun `failed flow started with a client id nulls its flow state in database after its lifetime`() {
val clientId = UUID.randomUUID().toString() val clientId = UUID.randomUUID().toString()
ResultFlow.hook = { throw IllegalStateException() } ResultFlow.hook = { throw IllegalStateException() }
@ -609,11 +665,12 @@ class FlowClientIdTests {
assertNull(dbFlowCheckpoint!!.blob!!.flowStack) assertNull(dbFlowCheckpoint!!.blob!!.flowStack)
} }
} }
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `reattachFlowWithClientId can retrieve existing flow future`() { fun `reattachFlowWithClientId can retrieve existing flow future`() {
val clientId = UUID.randomUUID().toString() val clientId = UUID.randomUUID().toString()
val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10)) val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10))
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId) val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId, aliceNode.user)
assertEquals(10, flowHandle.resultFuture.getOrThrow(20.seconds)) assertEquals(10, flowHandle.resultFuture.getOrThrow(20.seconds))
assertEquals(clientId, flowHandle.clientId) assertEquals(clientId, flowHandle.clientId)
@ -625,7 +682,7 @@ class FlowClientIdTests {
fun `reattachFlowWithClientId can retrieve a null result from a flow future`() { fun `reattachFlowWithClientId can retrieve a null result from a flow future`() {
val clientId = UUID.randomUUID().toString() val clientId = UUID.randomUUID().toString()
val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(null)) val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(null))
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId) val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId, aliceNode.user)
assertEquals(null, flowHandle.resultFuture.getOrThrow(20.seconds)) assertEquals(null, flowHandle.resultFuture.getOrThrow(20.seconds))
assertEquals(clientId, flowHandle.clientId) assertEquals(clientId, flowHandle.clientId)
@ -641,7 +698,7 @@ class FlowClientIdTests {
assertEquals(10, flowHandle.resultFuture.getOrThrow(20.seconds)) assertEquals(10, flowHandle.resultFuture.getOrThrow(20.seconds))
assertEquals(clientId, flowHandle.clientId) assertEquals(clientId, flowHandle.clientId)
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId) val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId, aliceNode.user)
assertEquals(flowHandle.id, reattachedFlowHandle?.id) assertEquals(flowHandle.id, reattachedFlowHandle?.id)
assertEquals(flowHandle.resultFuture.get(), reattachedFlowHandle?.resultFuture?.get()) assertEquals(flowHandle.resultFuture.get(), reattachedFlowHandle?.resultFuture?.get())
@ -649,7 +706,7 @@ class FlowClientIdTests {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `reattachFlowWithClientId returns null if no flow matches the client id`() { fun `reattachFlowWithClientId returns null if no flow matches the client id`() {
assertEquals(null, aliceNode.smm.reattachFlowWithClientId<Int>(UUID.randomUUID().toString())) assertEquals(null, aliceNode.smm.reattachFlowWithClientId<Int>(UUID.randomUUID().toString(), aliceNode.user))
} }
@Test(timeout = 300_000) @Test(timeout = 300_000)
@ -657,7 +714,7 @@ class FlowClientIdTests {
ResultFlow.hook = { throw IllegalStateException("Bla bla bla") } ResultFlow.hook = { throw IllegalStateException("Bla bla bla") }
val clientId = UUID.randomUUID().toString() val clientId = UUID.randomUUID().toString()
val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10)) val flowHandle = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10))
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId) val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId, aliceNode.user)
assertThatExceptionOfType(IllegalStateException::class.java).isThrownBy { assertThatExceptionOfType(IllegalStateException::class.java).isThrownBy {
flowHandle.resultFuture.getOrThrow(20.seconds) flowHandle.resultFuture.getOrThrow(20.seconds)
@ -678,7 +735,7 @@ class FlowClientIdTests {
flowHandle.resultFuture.getOrThrow(20.seconds) flowHandle.resultFuture.getOrThrow(20.seconds)
}.withMessage("Bla bla bla") }.withMessage("Bla bla bla")
val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId) val reattachedFlowHandle = aliceNode.smm.reattachFlowWithClientId<Int>(clientId, aliceNode.user)
// [CordaRunTimeException] returned because [IllegalStateException] is not serializable // [CordaRunTimeException] returned because [IllegalStateException] is not serializable
assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy { assertThatExceptionOfType(CordaRuntimeException::class.java).isThrownBy {
@ -686,6 +743,22 @@ class FlowClientIdTests {
}.withMessage("java.lang.IllegalStateException: Bla bla bla") }.withMessage("java.lang.IllegalStateException: Bla bla bla")
} }
@Test(timeout = 300_000)
fun `reattachFlowWithClientId can retrieve exception from killed flow`() {
val clientId = UUID.randomUUID().toString()
var flowHandle0: FlowStateMachineHandle<Unit>
assertFailsWith<KilledFlowException> {
flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow())
aliceNode.waitForOvernightObservation(flowHandle0.id, 20.seconds)
aliceNode.internals.smm.killFlow(flowHandle0.id)
flowHandle0.resultFuture.getOrThrow()
}
assertFailsWith<KilledFlowException> {
aliceNode.smm.reattachFlowWithClientId<Int>(clientId, aliceNode.user)?.resultFuture?.getOrThrow()
}
}
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `finishedFlowsWithClientIds returns completed flows with client ids`() { fun `finishedFlowsWithClientIds returns completed flows with client ids`() {
val clientIds = listOf("a", "b", "c", "d", "e") val clientIds = listOf("a", "b", "c", "d", "e")
@ -708,7 +781,7 @@ class FlowClientIdTests {
flows.map { it.resultFuture }.transpose().getOrThrow(30.seconds) flows.map { it.resultFuture }.transpose().getOrThrow(30.seconds)
assertFailsWith<java.lang.IllegalStateException> { failedFlow.resultFuture.getOrThrow(20.seconds) } assertFailsWith<java.lang.IllegalStateException> { failedFlow.resultFuture.getOrThrow(20.seconds) }
val finishedFlows = aliceNode.smm.finishedFlowsWithClientIds() val finishedFlows = aliceNode.smm.finishedFlowsWithClientIds(aliceNode.user, false)
lock.countDown() lock.countDown()
@ -720,42 +793,126 @@ class FlowClientIdTests {
assertEquals( assertEquals(
listOf(10, 10, 10), listOf(10, 10, 10),
finishedFlows.filterValues { it }.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key)?.resultFuture?.get() } finishedFlows.filterValues { it }
.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key, aliceNode.user)?.resultFuture?.get() }
) )
// [CordaRunTimeException] returned because [IllegalStateException] is not serializable // [CordaRunTimeException] returned because [IllegalStateException] is not serializable
assertFailsWith<CordaRuntimeException> { assertFailsWith<CordaRuntimeException> {
finishedFlows.filterValues { !it }.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key)?.resultFuture?.getOrThrow() } finishedFlows.filterValues { !it }
.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key, aliceNode.user)?.resultFuture?.getOrThrow() }
} }
} }
}
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() { @Test(timeout = 300_000)
companion object { fun `finishedFlowsWithClientIds returns exception for killed flows`() {
var hook: ((String?) -> Unit)? = null val clientId = UUID.randomUUID().toString()
var suspendableHook: FlowLogic<Unit>? = null var flowHandle0: FlowStateMachineHandle<Unit>
assertFailsWith<KilledFlowException> {
flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, HospitalizeFlow())
aliceNode.waitForOvernightObservation(flowHandle0.id, 20.seconds)
aliceNode.internals.smm.killFlow(flowHandle0.id)
flowHandle0.resultFuture.getOrThrow()
}
val finishedFlows = aliceNode.smm.finishedFlowsWithClientIds(aliceNode.user, false)
assertFailsWith<KilledFlowException> {
finishedFlows.keys.single()
.let { aliceNode.smm.reattachFlowWithClientId<Int>(it, aliceNode.user)?.resultFuture?.getOrThrow() }
}
} }
@Suspendable private val TestStartedNode.user get() = services.newContext().principal()
override fun call(): A {
hook?.invoke(stateMachine.clientId)
suspendableHook?.let { subFlow(it) }
return result
}
}
internal class UnSerializableResultFlow: FlowLogic<Any>() { private fun TestStartedNode.hasStatus(id: StateMachineRunId, status: Checkpoint.FlowStatus): Boolean {
companion object { return services.database.transaction {
var firstRun = true services.jdbcSession().prepareStatement("select count(*) from node_checkpoints where status = ? and flow_id = ?")
.apply {
setInt(1, status.ordinal)
setString(2, id.uuid.toString())
}
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}.toInt() == 1
}
} }
@Suspendable private fun TestStartedNode.hasException(id: StateMachineRunId): Boolean {
override fun call(): Any { return services.database.transaction {
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false) services.jdbcSession().prepareStatement("select count(*) from node_flow_exceptions where flow_id = ?")
return if (firstRun) { .apply { setString(1, id.uuid.toString()) }
firstRun = false .use { ps ->
Observable.empty<Any>() ps.executeQuery().use { rs ->
} else { rs.next()
5 // serializable result rs.getLong(1)
}
}.toInt() == 1
}
}
private fun TestStartedNode.waitForOvernightObservation(id: StateMachineRunId, timeout: Duration) {
val timeoutTime = Instant.now().plusSeconds(timeout.seconds)
var exists = false
while (Instant.now().isBefore(timeoutTime) && !exists) {
services.database.transaction {
exists = services.jdbcSession().prepareStatement("select count(*) from node_checkpoints where status = ? and flow_id = ?")
.apply {
setInt(1, Checkpoint.FlowStatus.HOSPITALIZED.ordinal)
setString(2, id.uuid.toString())
}
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}.toInt() == 1
Thread.sleep(1.seconds.toMillis())
}
}
if (!exists) {
throw TimeoutException("Flow was not kept for observation during timeout duration")
}
}
internal class ResultFlow<A>(private val result: A) : FlowLogic<A>() {
companion object {
var hook: ((String?) -> Unit)? = null
var suspendableHook: FlowLogic<Unit>? = null
}
@Suspendable
override fun call(): A {
hook?.invoke(stateMachine.clientId)
suspendableHook?.let { subFlow(it) }
return result
}
}
internal class UnSerializableResultFlow : FlowLogic<Any>() {
companion object {
var firstRun = true
}
@Suspendable
override fun call(): Any {
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
return if (firstRun) {
firstRun = false
Observable.empty<Any>()
} else {
5 // serializable result
}
}
}
internal class HospitalizeFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
throw HospitalizeFlowException("time to go to the doctors")
} }
} }
} }

View File

@ -134,8 +134,8 @@ class RetryFlowMockTest {
Assume.assumeTrue(!IS_OPENJ9) Assume.assumeTrue(!IS_OPENJ9)
val partyB = nodeB.info.legalIdentities.first() val partyB = nodeB.info.legalIdentities.first()
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy { assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
nodeA.startFlow(UnbalancedSendAndReceiveFlow(partyB)).getOrThrow(20.seconds) nodeA.startFlow(UnbalancedSendAndReceiveFlow(partyB)).getOrThrow(60.seconds)
}.withMessage("Received session end message instead of a data session message. Mismatched send and receive?") }
} }
@Test(timeout=300_000) @Test(timeout=300_000)

View File

@ -41,6 +41,7 @@ import org.junit.Before
import org.junit.Test import org.junit.Test
import java.lang.IllegalStateException import java.lang.IllegalStateException
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.FileSystem
import java.security.PublicKey import java.security.PublicKey
import java.security.cert.CertPathValidatorException import java.security.cert.CertPathValidatorException
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
@ -50,7 +51,7 @@ import kotlin.test.assertFalse
import kotlin.test.assertTrue import kotlin.test.assertTrue
class NetworkRegistrationHelperTest { class NetworkRegistrationHelperTest {
private val fs = Jimfs.newFileSystem(unix()) private lateinit var fs: FileSystem
private val nodeLegalName = ALICE_NAME private val nodeLegalName = ALICE_NAME
private lateinit var config: NodeConfiguration private lateinit var config: NodeConfiguration
@ -59,6 +60,11 @@ class NetworkRegistrationHelperTest {
@Before @Before
fun init() { fun init() {
// Register providers before creating Jimfs filesystem. JimFs creates an SSHD instance which
// register BouncyCastle and EdDSA provider separately, which wrecks havoc.
Crypto.registerProviders()
fs = Jimfs.newFileSystem(unix())
val baseDirectory = fs.getPath("/baseDir").createDirectories() val baseDirectory = fs.getPath("/baseDir").createDirectories()
abstract class AbstractNodeConfiguration : NodeConfiguration abstract class AbstractNodeConfiguration : NodeConfiguration

View File

@ -101,6 +101,8 @@ include 'serialization-djvm:deserializers'
include 'serialization-tests' include 'serialization-tests'
include 'testing:cordapps:dbfailure:dbfcontracts' include 'testing:cordapps:dbfailure:dbfcontracts'
include 'testing:cordapps:dbfailure:dbfworkflows' include 'testing:cordapps:dbfailure:dbfworkflows'
include 'testing:cordapps:missingmigration'
include 'testing:cordapps:sleeping'
// Common libraries - start // Common libraries - start
include 'common-validation' include 'common-validation'

View File

@ -0,0 +1,16 @@
apply plugin: 'kotlin'
//apply plugin: 'net.corda.plugins.cordapp'
//apply plugin: 'net.corda.plugins.quasar-utils'
dependencies {
compile project(":core")
}
jar {
baseName "testing-missingmigration-cordapp"
manifest {
// This JAR is part of Corda's testing framework.
// Driver will not include it as part of an out-of-process node.
attributes('Corda-Testing': true)
}
}

View File

@ -0,0 +1,24 @@
package net.corda.failtesting.missingmigrationcordapp
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Table
object MissingMigrationSchema
object MissingMigrationSchemaV1 : MappedSchema(
schemaFamily = MissingMigrationSchema.javaClass,
version = 1,
mappedTypes = listOf(MissingMigrationSchemaV1.TestEntity::class.java)) {
@Entity
@Table(name = "test_table")
class TestEntity(
@Column(name = "random_value")
var randomValue: String
) : PersistentState() {
constructor() : this("")
}
}

View File

@ -0,0 +1,13 @@
package net.corda.failtesting.missingmigrationcordapp
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
@StartableByRPC
@InitiatingFlow
class SimpleFlow : FlowLogic<Unit>() {
override fun call() {
logger.info("Running simple flow doing nothing")
}
}

View File

@ -0,0 +1,16 @@
package net.corda.failtesting.missingmigrationcordapp
import net.corda.core.identity.AbstractParty
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
class TestEntity(val randomValue: String, override val participants: List<AbstractParty>) : QueryableState {
override fun supportedSchemas(): Iterable<MappedSchema> {
return listOf(MissingMigrationSchemaV1)
}
override fun generateMappedObject(schema: MappedSchema): PersistentState {
return MissingMigrationSchemaV1.TestEntity(randomValue)
}
}

View File

@ -0,0 +1,14 @@
apply plugin: 'kotlin'
dependencies {
compile project(":core")
}
jar {
baseName "testing-sleeping-cordapp"
manifest {
// This JAR is part of Corda's testing framework.
// Driver will not include it as part of an out-of-process node.
attributes('Corda-Testing': true)
}
}

View File

@ -0,0 +1,15 @@
package net.corda.sleeping
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import java.time.Duration
@StartableByRPC
class SleepingFlow(private val duration: Duration) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
sleep(duration)
}
}

View File

@ -11,8 +11,14 @@ import net.corda.core.flows.InitiatedBy
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.* import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.NetworkParametersStorage
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.notary.NotaryService import net.corda.core.internal.notary.NotaryService
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
@ -25,6 +31,9 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.hours import net.corda.core.utilities.hours
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.coretesting.internal.rigorousMock
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
import net.corda.coretesting.internal.testThreadFactory
import net.corda.node.VersionInfo import net.corda.node.VersionInfo
import net.corda.node.internal.AbstractNode import net.corda.node.internal.AbstractNode
import net.corda.node.internal.InitiatedFlowFactory import net.corda.node.internal.InitiatedFlowFactory
@ -32,7 +41,11 @@ import net.corda.node.internal.NodeFlowManager
import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.api.StartedNodeServices import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.config.* import net.corda.node.services.config.FlowTimeoutConfiguration
import net.corda.node.services.config.NetworkParameterAcceptanceSettings
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.config.VerifierType
import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.BasicHSMKeyManagementService import net.corda.node.services.keys.BasicHSMKeyManagementService
import net.corda.node.services.keys.KeyManagementServiceInternal import net.corda.node.services.keys.KeyManagementServiceInternal
@ -49,11 +62,12 @@ import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.common.internal.testNetworkParameters
import net.corda.coretesting.internal.rigorousMock import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.coretesting.internal.stubs.CertificateStoreStubs import net.corda.testing.node.MockNetworkNotarySpec
import net.corda.coretesting.internal.testThreadFactory import net.corda.testing.node.MockNetworkParameters
import net.corda.testing.node.* import net.corda.testing.node.MockNodeParameters
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.TestClock
import org.apache.activemq.artemis.utils.ReusableLatch import org.apache.activemq.artemis.utils.ReusableLatch
import org.apache.sshd.common.util.security.SecurityUtils import org.apache.sshd.common.util.security.SecurityUtils
import rx.Observable import rx.Observable
@ -377,7 +391,7 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
} }
override fun makeMessagingService(): MockNodeMessagingService { override fun makeMessagingService(): MockNodeMessagingService {
return MockNodeMessagingService(configuration, serverThread).closeOnStop() return MockNodeMessagingService(configuration, serverThread).closeOnStop(usesDatabase = false)
} }
override fun startMessagingService(rpcOps: List<RPCOps>, override fun startMessagingService(rpcOps: List<RPCOps>,

View File

@ -631,7 +631,9 @@ object InteractiveShell {
} }
@JvmStatic @JvmStatic
fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps) { fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps): Int {
var result = 0 // assume it all went well
fun display(statements: RenderPrintWriter.() -> Unit) { fun display(statements: RenderPrintWriter.() -> Unit) {
statements.invoke(userSessionOut) statements.invoke(userSessionOut)
@ -676,13 +678,16 @@ object InteractiveShell {
// Cancelled whilst draining flows. So let's carry on from here // Cancelled whilst draining flows. So let's carry on from here
cordaRPCOps.setFlowsDrainingModeEnabled(false) cordaRPCOps.setFlowsDrainingModeEnabled(false)
display { println("...cancelled clean shutdown.") } display { println("...cancelled clean shutdown.") }
result = 1
} }
} catch (e: Exception) { } catch (e: Exception) {
display { println("RPC failed: ${e.rootCause}", Decoration.bold, Color.red) } display { println("RPC failed: ${e.rootCause}", Decoration.bold, Color.red) }
result = 1
} finally { } finally {
InputStreamSerializer.invokeContext = null InputStreamSerializer.invokeContext = null
InputStreamDeserializer.closeAll() InputStreamDeserializer.closeAll()
} }
return result;
} }
private fun printAndFollowRPCResponse( private fun printAndFollowRPCResponse(