In AbstractNode check if an initiating flow already has a flow registered to it and throw exception if one has been (#3713)

* In AbstractNode check if an initiating flow already has a flow registered to it and throw exception if one has been

In AbstractNode.internalRegisterFlowFactory check if a flow factory already exists for a initiating flow
If a factory does exist, throw an exception
A few tests needed to be fixed due to this change

* Reorder test setup to prevent flows from being registered multiple times

* Use check instead of if and throws statement in AbstractNode when checking for initiating flow having multiple flows mapped to it

Use check instead of if throws statement
Change names of methods in tests
Improve FlowRegistrationTest to better check if flow has been registered properly

* tidy up FlowFrameworkTests and FlowRegistrationTest
This commit is contained in:
Dan Newton 2018-08-01 07:58:00 +01:00 committed by Shams Asari
parent 8b501b1b80
commit 0762a61aca
6 changed files with 120 additions and 61 deletions

View File

@ -42,7 +42,6 @@ class CustomVaultQueryTest {
mockNet = MockNetwork(threadPerNode = true, cordappPackages = listOf("net.corda.finance", "net.corda.docs", "com.template"))
nodeA = mockNet.createPartyNode()
nodeB = mockNet.createPartyNode()
nodeA.registerInitiatedFlow(TopupIssuerFlow.TopupIssuer::class.java)
notary = mockNet.defaultNotaryIdentity
}

View File

@ -36,7 +36,6 @@ class WorkflowTransactionBuildTutorialTest {
mockNet = MockNetwork(threadPerNode = true, cordappPackages = listOf("net.corda.docs"))
aliceNode = mockNet.createPartyNode(ALICE_NAME)
bobNode = mockNet.createPartyNode(BOB_NAME)
aliceNode.registerInitiatedFlow(RecordCompletionFlow::class.java)
alice = aliceNode.services.myInfo.identityFromX500Name(ALICE_NAME)
bob = bobNode.services.myInfo.identityFromX500Name(BOB_NAME)
}

View File

@ -188,7 +188,6 @@ abstract class MQSecurityTest : NodeBasedTest() {
protected fun startBobAndCommunicateWithAlice(): Party {
val bob = startNode(BOB_NAME)
bob.registerInitiatedFlow(ReceiveFlow::class.java)
val bobParty = bob.info.singleIdentity()
// Perform a protocol exchange to force the peer queue to be created
alice.services.startFlow(SendFlow(bobParty, 0)).resultFuture.getOrThrow()

View File

@ -659,6 +659,9 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
} else {
Observable.empty()
}
check(initiatingFlowClass !in flowFactories.keys) {
"$initiatingFlowClass is attempting to register multiple initiated flows"
}
flowFactories[initiatingFlowClass] = flowFactory
return observable
}

View File

@ -0,0 +1,72 @@
package net.corda.node.internal
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.utilities.unwrap
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNodeParameters
import net.corda.testing.node.StartedMockNode
import org.assertj.core.api.Assertions.assertThatIllegalStateException
import org.junit.After
import org.junit.Before
import org.junit.Test
import kotlin.test.assertNotNull
class FlowRegistrationTest {
lateinit var mockNetwork: MockNetwork
lateinit var initiator: StartedMockNode
lateinit var responder: StartedMockNode
@Before
fun setup() {
// no cordapps scanned so it can be tested in isolation
mockNetwork = MockNetwork(emptyList())
initiator = mockNetwork.createNode(MockNodeParameters(legalName = CordaX500Name("initiator", "Reading", "GB")))
responder = mockNetwork.createNode(MockNodeParameters(legalName = CordaX500Name("responder", "Reading", "GB")))
mockNetwork.runNetwork()
}
@After
fun tearDown() {
mockNetwork.stopNodes()
}
@Test
fun `startup fails when two flows initiated by the same flow are registered`() {
// register the same flow twice to invoke the error without causing errors in other tests
responder.registerInitiatedFlow(Responder::class.java)
assertThatIllegalStateException().isThrownBy { responder.registerInitiatedFlow(Responder::class.java) }
}
@Test
fun `a single initiated flow can be registered without error`() {
responder.registerInitiatedFlow(Responder::class.java)
val result = initiator.startFlow(Initiator(responder.info.singleIdentity()))
mockNetwork.runNetwork()
assertNotNull(result.get())
}
}
@InitiatingFlow
class Initiator(val party: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
return initiateFlow(party).sendAndReceive<String>("Hello there").unwrap { it }
}
}
@InitiatedBy(Initiator::class)
private class Responder(val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>().unwrap { it }
session.send("What's up")
}
}

View File

@ -53,6 +53,7 @@ class FlowFrameworkTests {
init {
LogHelper.setLevel("+net.corda.flow")
}
}
private lateinit var mockNet: InternalMockNetwork
private lateinit var aliceNode: TestStartedNode
@ -62,9 +63,8 @@ class FlowFrameworkTests {
private lateinit var notaryIdentity: Party
private val receivedSessionMessages = ArrayList<SessionTransfer>()
@BeforeClass
@JvmStatic
fun beforeClass() {
@Before
fun setUpMockNet() {
mockNet = InternalMockNetwork(
cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"),
servicePeerAllocationStrategy = RoundRobin()
@ -85,15 +85,9 @@ class FlowFrameworkTests {
return mockNet.messagingNetwork.receivedMessages.toSessionTransfers()
}
@AfterClass @JvmStatic
fun afterClass() {
mockNet.stopNodes()
}
}
@After
fun cleanUp() {
mockNet.stopNodes()
receivedSessionMessages.clear()
}
@ -474,10 +468,10 @@ class FlowFrameworkTripartyTests {
private lateinit var charlie: Party
private lateinit var notaryIdentity: Party
private val receivedSessionMessages = ArrayList<SessionTransfer>()
}
@BeforeClass
@JvmStatic
fun beforeClass() {
@Before
fun setUpGlobalMockNet() {
mockNet = InternalMockNetwork(
cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"),
servicePeerAllocationStrategy = RoundRobin()
@ -497,23 +491,16 @@ class FlowFrameworkTripartyTests {
receivedSessionMessagesObservable().forEach { receivedSessionMessages += it }
}
@AfterClass @JvmStatic
fun afterClass() {
@After
fun cleanUp() {
mockNet.stopNodes()
receivedSessionMessages.clear()
}
private fun receivedSessionMessagesObservable(): Observable<SessionTransfer> {
return mockNet.messagingNetwork.receivedMessages.toSessionTransfers()
}
}
@After
fun cleanUp() {
receivedSessionMessages.clear()
}
@Test
fun `sending to multiple parties`() {
bobNode.registerFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() }