From 975b81c2c4bc221cf40b668ab81ae2ef9fc317fd Mon Sep 17 00:00:00 2001 From: Michele Sollecito Date: Fri, 23 Mar 2018 11:45:19 +0000 Subject: [PATCH] [CORDA-1243]: Avoid drain-related deadlocks between 2 nodes. (#2866) --- .../net/corda/core/flows/FlowTestsUtils.kt | 2 +- .../kotlin/net/corda/MessageState.kt | 70 +++++++++++ .../kotlin/net/corda/RpcInfo.kt | 7 ++ .../{test => }/node/NodeStartAndStopTest.kt | 2 +- .../node/NodeStatePersistenceTests.kt | 84 ++----------- .../FlowsDrainingModeContentionTest.kt | 115 ++++++++++++++++++ .../statemachine/StateMachineManagerImpl.kt | 10 +- 7 files changed, 215 insertions(+), 75 deletions(-) create mode 100644 node/src/integration-test/kotlin/net/corda/MessageState.kt create mode 100644 node/src/integration-test/kotlin/net/corda/RpcInfo.kt rename node/src/integration-test/kotlin/net/corda/{test => }/node/NodeStartAndStopTest.kt (94%) rename node/src/integration-test/kotlin/net/corda/{test => }/node/NodeStatePersistenceTests.kt (71%) create mode 100644 node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt diff --git a/core/src/test/kotlin/net/corda/core/flows/FlowTestsUtils.kt b/core/src/test/kotlin/net/corda/core/flows/FlowTestsUtils.kt index 1c5934aadb..aa95898169 100644 --- a/core/src/test/kotlin/net/corda/core/flows/FlowTestsUtils.kt +++ b/core/src/test/kotlin/net/corda/core/flows/FlowTestsUtils.kt @@ -38,7 +38,7 @@ class NoAnswer(private val closure: () -> Unit = {}) : FlowLogic() { /** * Allows to register a flow of type [R] against an initiating flow of type [I]. */ -inline fun , reified R : FlowLogic<*>> StartedNode.registerInitiatedFlow(initiatingFlowType: KClass, crossinline construct: (session: FlowSession) -> R) { +inline fun , reified R : FlowLogic<*>> StartedNode<*>.registerInitiatedFlow(initiatingFlowType: KClass, crossinline construct: (session: FlowSession) -> R) { internalRegisterFlowFactory(initiatingFlowType.java, InitiatedFlowFactory.Core { session -> construct(session) }, R::class.javaObjectType, true) } diff --git a/node/src/integration-test/kotlin/net/corda/MessageState.kt b/node/src/integration-test/kotlin/net/corda/MessageState.kt new file mode 100644 index 0000000000..fcbe58417c --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/MessageState.kt @@ -0,0 +1,70 @@ +package net.corda + +import net.corda.core.contracts.* +import net.corda.core.identity.AbstractParty +import net.corda.core.identity.Party +import net.corda.core.schemas.MappedSchema +import net.corda.core.schemas.PersistentState +import net.corda.core.schemas.QueryableState +import net.corda.core.serialization.CordaSerializable +import net.corda.core.transactions.LedgerTransaction +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.Table + +@CordaSerializable +data class Message(val value: String) + +data class MessageState(val message: Message, val by: Party, override val linearId: UniqueIdentifier = UniqueIdentifier()) : LinearState, QueryableState { + override val participants: List = listOf(by) + + override fun generateMappedObject(schema: MappedSchema): PersistentState { + return when (schema) { + is MessageSchemaV1 -> MessageSchemaV1.PersistentMessage( + by = by.name.toString(), + value = message.value + ) + else -> throw IllegalArgumentException("Unrecognised schema $schema") + } + } + + override fun supportedSchemas(): Iterable = listOf(MessageSchemaV1) +} + +object MessageSchema +object MessageSchemaV1 : MappedSchema( + schemaFamily = MessageSchema.javaClass, + version = 1, + mappedTypes = listOf(PersistentMessage::class.java)) { + + @Entity + @Table(name = "messages") + class PersistentMessage( + @Column(name = "by") + var by: String, + + @Column(name = "value") + var value: String + ) : PersistentState() +} + +const val MESSAGE_CONTRACT_PROGRAM_ID = "net.corda.MessageContract" + +open class MessageContract : Contract { + override fun verify(tx: LedgerTransaction) { + val command = tx.commands.requireSingleCommand() + requireThat { + // Generic constraints around the IOU transaction. + "No inputs should be consumed when sending a message." using (tx.inputs.isEmpty()) + "Only one output state should be created." using (tx.outputs.size == 1) + val out = tx.outputsOfType().single() + "Message sender must sign." using (command.signers.containsAll(out.participants.map { it.owningKey })) + + "Message value must not be empty." using (out.message.value.isNotBlank()) + } + } + + interface Commands : CommandData { + class Send : Commands + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/RpcInfo.kt b/node/src/integration-test/kotlin/net/corda/RpcInfo.kt new file mode 100644 index 0000000000..c826abb1c4 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/RpcInfo.kt @@ -0,0 +1,7 @@ +package net.corda + +import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.NetworkHostAndPort + +@CordaSerializable +data class RpcInfo(val address: NetworkHostAndPort, val username: String, val password: String) \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/test/node/NodeStartAndStopTest.kt b/node/src/integration-test/kotlin/net/corda/node/NodeStartAndStopTest.kt similarity index 94% rename from node/src/integration-test/kotlin/net/corda/test/node/NodeStartAndStopTest.kt rename to node/src/integration-test/kotlin/net/corda/node/NodeStartAndStopTest.kt index 310d653926..39b9c7598b 100644 --- a/node/src/integration-test/kotlin/net/corda/test/node/NodeStartAndStopTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/NodeStartAndStopTest.kt @@ -1,4 +1,4 @@ -package net.corda.test.node +package net.corda.node import net.corda.core.utilities.getOrThrow import net.corda.testing.core.ALICE_NAME diff --git a/node/src/integration-test/kotlin/net/corda/test/node/NodeStatePersistenceTests.kt b/node/src/integration-test/kotlin/net/corda/node/NodeStatePersistenceTests.kt similarity index 71% rename from node/src/integration-test/kotlin/net/corda/test/node/NodeStatePersistenceTests.kt rename to node/src/integration-test/kotlin/net/corda/node/NodeStatePersistenceTests.kt index 18d0b2748d..e0b0599ba6 100644 --- a/node/src/integration-test/kotlin/net/corda/test/node/NodeStatePersistenceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/NodeStatePersistenceTests.kt @@ -1,37 +1,34 @@ -package net.corda.test.node +package net.corda.node import co.paralleluniverse.fibers.Suspendable +import net.corda.MESSAGE_CONTRACT_PROGRAM_ID +import net.corda.Message +import net.corda.MessageContract +import net.corda.MessageState import net.corda.client.rpc.CordaRPCClient -import net.corda.core.contracts.* +import net.corda.core.contracts.Command +import net.corda.core.contracts.StateAndContract +import net.corda.core.contracts.StateAndRef import net.corda.core.flows.FinalityFlow import net.corda.core.flows.FlowLogic import net.corda.core.flows.StartableByRPC -import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party +import net.corda.core.internal.packageName import net.corda.core.messaging.startFlow -import net.corda.core.schemas.MappedSchema -import net.corda.core.schemas.PersistentState -import net.corda.core.schemas.QueryableState -import net.corda.core.serialization.CordaSerializable -import net.corda.core.transactions.LedgerTransaction import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.getOrThrow import net.corda.node.services.Permissions.Companion.invokeRpc import net.corda.node.services.Permissions.Companion.startFlow -import net.corda.testing.node.User import net.corda.testing.core.singleIdentity import net.corda.testing.driver.DriverParameters -import net.corda.testing.driver.PortAllocation import net.corda.testing.driver.driver import net.corda.testing.driver.internal.RandomFree +import net.corda.testing.node.User import org.junit.Assume.assumeFalse import org.junit.Test import java.lang.management.ManagementFactory -import javax.persistence.Column -import javax.persistence.Entity -import javax.persistence.Table import kotlin.test.assertEquals import kotlin.test.assertNotNull @@ -44,7 +41,7 @@ class NodeStatePersistenceTests { val user = User("mark", "dadada", setOf(startFlow(), invokeRpc("vaultQuery"))) val message = Message("Hello world!") - val stateAndRef: StateAndRef? = driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified(), portAllocation = RandomFree)) { + val stateAndRef: StateAndRef? = driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified(), portAllocation = RandomFree, extraCordappPackagesToScan = listOf(MessageState::class.packageName))) { val nodeName = { val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow() val nodeName = nodeHandle.nodeInfo.singleIdentity().name @@ -78,7 +75,7 @@ class NodeStatePersistenceTests { val user = User("mark", "dadada", setOf(startFlow(), invokeRpc("vaultQuery"))) val message = Message("Hello world!") - val stateAndRef: StateAndRef? = driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified(), portAllocation = RandomFree)) { + val stateAndRef: StateAndRef? = driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified(), portAllocation = RandomFree, extraCordappPackagesToScan = listOf(MessageState::class.packageName))) { val nodeName = { val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow() val nodeName = nodeHandle.nodeInfo.singleIdentity().name @@ -110,63 +107,6 @@ fun isQuasarAgentSpecified(): Boolean { return jvmArgs.any { it.startsWith("-javaagent:") && it.endsWith("quasar.jar") } } -@CordaSerializable -data class Message(val value: String) - -data class MessageState(val message: Message, val by: Party, override val linearId: UniqueIdentifier = UniqueIdentifier()) : LinearState, QueryableState { - override val participants: List = listOf(by) - - override fun generateMappedObject(schema: MappedSchema): PersistentState { - return when (schema) { - is MessageSchemaV1 -> MessageSchemaV1.PersistentMessage( - by = by.name.toString(), - value = message.value - ) - else -> throw IllegalArgumentException("Unrecognised schema $schema") - } - } - - override fun supportedSchemas(): Iterable = listOf(MessageSchemaV1) -} - -object MessageSchema -object MessageSchemaV1 : MappedSchema( - schemaFamily = MessageSchema.javaClass, - version = 1, - mappedTypes = listOf(PersistentMessage::class.java)) { - - @Entity - @Table(name = "messages") - class PersistentMessage( - @Column(name = "by") - var by: String, - - @Column(name = "value") - var value: String - ) : PersistentState() -} - -const val MESSAGE_CONTRACT_PROGRAM_ID = "net.corda.test.node.MessageContract" - -open class MessageContract : Contract { - override fun verify(tx: LedgerTransaction) { - val command = tx.commands.requireSingleCommand() - requireThat { - // Generic constraints around the IOU transaction. - "No inputs should be consumed when sending a message." using (tx.inputs.isEmpty()) - "Only one output state should be created." using (tx.outputs.size == 1) - val out = tx.outputsOfType().single() - "Message sender must sign." using (command.signers.containsAll(out.participants.map { it.owningKey })) - - "Message value must not be empty." using (out.message.value.isNotBlank()) - } - } - - interface Commands : CommandData { - class Send : Commands - } -} - @StartableByRPC class SendMessageFlow(private val message: Message, private val notary: Party) : FlowLogic() { companion object { diff --git a/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt b/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt new file mode 100644 index 0000000000..e0cb4460b1 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt @@ -0,0 +1,115 @@ +package net.corda.node.modes.draining + +import co.paralleluniverse.fibers.Suspendable +import net.corda.MESSAGE_CONTRACT_PROGRAM_ID +import net.corda.Message +import net.corda.MessageContract +import net.corda.MessageState +import net.corda.core.contracts.Command +import net.corda.core.contracts.StateAndContract +import net.corda.core.flows.* +import net.corda.core.identity.Party +import net.corda.core.internal.packageName +import net.corda.core.messaging.startFlow +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.unwrap +import net.corda.RpcInfo +import net.corda.client.rpc.CordaRPCClient +import net.corda.node.services.Permissions.Companion.all +import net.corda.testing.core.singleIdentity +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.PortAllocation +import net.corda.testing.driver.driver +import net.corda.testing.node.User +import org.assertj.core.api.Assertions.assertThat +import org.junit.After +import org.junit.Before +import org.junit.Test +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService + +class FlowsDrainingModeContentionTest { + + private val portAllocation = PortAllocation.Incremental(10000) + private val user = User("mark", "dadada", setOf(all())) + private val users = listOf(user) + + private var executor: ScheduledExecutorService? = null + + @Before + fun setup() { + executor = Executors.newSingleThreadScheduledExecutor() + } + + @After + fun cleanUp() { + executor!!.shutdown() + } + + @Test + fun `draining mode does not deadlock with acks between 2 nodes`() { + + val message = "Ground control to Major Tom" + + driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation, extraCordappPackagesToScan = listOf(MessageState::class.packageName))) { + + val nodeA = startNode(rpcUsers = users).getOrThrow() + val nodeB = startNode(rpcUsers = users).getOrThrow() + defaultNotaryNode.getOrThrow() + + val nodeARpcInfo = RpcInfo(nodeA.rpcAddress, user.username, user.password) + val flow = nodeA.rpc.startFlow(::ProposeTransactionAndWaitForCommit, message, nodeARpcInfo, nodeB.nodeInfo.singleIdentity(), defaultNotaryIdentity) + val committedTx = flow.returnValue.getOrThrow() + + committedTx.inputs + committedTx.tx.outputs + assertThat(committedTx.tx.outputsOfType().single().message.value).isEqualTo(message) + } + } +} + +@StartableByRPC +@InitiatingFlow +class ProposeTransactionAndWaitForCommit(private val data: String, private val myRpcInfo: RpcInfo, private val counterParty: Party, private val notary: Party) : FlowLogic() { + + @Suspendable + override fun call(): SignedTransaction { + + val session = initiateFlow(counterParty) + val messageState = MessageState(message = Message(data), by = ourIdentity) + val command = Command(MessageContract.Commands.Send(), messageState.participants.map { it.owningKey }) + val transaction = TransactionBuilder(notary) + transaction.withItems(StateAndContract(messageState, MESSAGE_CONTRACT_PROGRAM_ID), command) + val signedTx = serviceHub.signInitialTransaction(transaction) + + subFlow(SendTransactionFlow(session, signedTx)) + session.send(myRpcInfo) + + return waitForLedgerCommit(signedTx.id) + } +} + +@InitiatedBy(ProposeTransactionAndWaitForCommit::class) +class SignTransactionTriggerDrainingModeAndFinality(private val session: FlowSession) : FlowLogic() { + + @Suspendable + override fun call() { + + val tx = subFlow(ReceiveTransactionFlow(session)) + val signedTx = serviceHub.addSignature(tx) + val initiatingRpcInfo = session.receive().unwrap { it } + + triggerDrainingModeForInitiatingNode(initiatingRpcInfo) + + subFlow(FinalityFlow(signedTx, setOf(session.counterparty))) + } + + private fun triggerDrainingModeForInitiatingNode(initiatingRpcInfo: RpcInfo) { + + CordaRPCClient(initiatingRpcInfo.address).start(initiatingRpcInfo.username, initiatingRpcInfo.password).use { + it.proxy.setFlowsDrainingModeEnabled(true) + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt index a4fcc7a440..e848df74ae 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManagerImpl.kt @@ -12,6 +12,7 @@ import com.google.common.util.concurrent.MoreExecutors import net.corda.core.CordaException import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext +import net.corda.core.context.InvocationOrigin import net.corda.core.crypto.SecureHash import net.corda.core.crypto.newSecureRandom import net.corda.core.flows.FlowException @@ -640,10 +641,17 @@ class StateMachineManagerImpl( } } + // This prevents a "deadlock" in case an initiated flow tries to start a session against a draining node that is also the initiator. + // It does not help in case more than 2 nodes are involved in a circle, so the kill switch via RPC should be used in that case. + val additionalHeaders = if (mightDeadlockDrainingSender(fiber, party)) emptyMap() else message.additionalHeaders() serviceHub.networkService.apply { - send(createMessage(sessionTopic, serialized.bytes), address, retryId = retryId, additionalHeaders = message.additionalHeaders()) + send(createMessage(sessionTopic, serialized.bytes), address, retryId = retryId, additionalHeaders = additionalHeaders) } } + + private fun mightDeadlockDrainingSender(fiber: FlowStateMachineImpl<*>?, target: Party): Boolean { + return fiber?.context?.origin.let { it is InvocationOrigin.Peer && it.party == target.name } + } } private fun SessionMessage.additionalHeaders(): Map {