CORDA-654: Move from chooseIdentity() to singleIdentity() (#1819)

Move from `chooseIdentity()` to `singleIdentity()` where tests use a single identity, or to fetching by name where multiple identities are present, so we stop using the first identity as special.
This commit is contained in:
Ross Nicoll 2017-10-16 15:51:26 +01:00 committed by GitHub
parent d8b81f755a
commit 63b7eb3f70
11 changed files with 329 additions and 301 deletions

View File

@ -40,12 +40,12 @@ class IdentitySyncFlowTests {
@Test
fun `sync confidential identities`() {
// Set up values we'll need
mockNet.createNotaryNode()
val aliceNode = mockNet.createPartyNode(ALICE.name)
val bobNode = mockNet.createPartyNode(BOB.name)
val alice: Party = aliceNode.services.myInfo.chooseIdentity()
val bob: Party = bobNode.services.myInfo.chooseIdentity()
val notary = aliceNode.services.getDefaultNotary()
val notaryNode = mockNet.createNotaryNode()
val aliceNode = mockNet.createPartyNode(ALICE_NAME)
val bobNode = mockNet.createPartyNode(BOB_NAME)
val alice: Party = aliceNode.info.singleIdentity()
val bob: Party = bobNode.info.singleIdentity()
val notary = notaryNode.services.getDefaultNotary()
bobNode.internals.registerInitiatedFlow(Receive::class.java)
// Alice issues then pays some cash to a new confidential identity that Bob doesn't know about
@ -71,12 +71,12 @@ class IdentitySyncFlowTests {
fun `don't offer other's identities confidential identities`() {
// Set up values we'll need
val notaryNode = mockNet.createNotaryNode()
val aliceNode = mockNet.createPartyNode(ALICE.name)
val bobNode = mockNet.createPartyNode(BOB.name)
val charlieNode = mockNet.createPartyNode(CHARLIE.name)
val alice: Party = aliceNode.services.myInfo.chooseIdentity()
val bob: Party = bobNode.services.myInfo.chooseIdentity()
val charlie: Party = charlieNode.services.myInfo.chooseIdentity()
val aliceNode = mockNet.createPartyNode(ALICE_NAME)
val bobNode = mockNet.createPartyNode(BOB_NAME)
val charlieNode = mockNet.createPartyNode(CHARLIE_NAME)
val alice: Party = aliceNode.info.singleIdentity()
val bob: Party = bobNode.info.singleIdentity()
val charlie: Party = charlieNode.info.singleIdentity()
val notary = notaryNode.services.getDefaultNotary()
bobNode.internals.registerInitiatedFlow(Receive::class.java)

View File

@ -19,8 +19,8 @@ class SwapIdentitiesFlowTests {
val notaryNode = mockNet.createNotaryNode()
val aliceNode = mockNet.createPartyNode(ALICE.name)
val bobNode = mockNet.createPartyNode(BOB.name)
val alice = aliceNode.services.myInfo.identityFromX500Name(ALICE_NAME)
val bob = bobNode.services.myInfo.identityFromX500Name(BOB_NAME)
val alice = aliceNode.info.singleIdentity()
val bob = bobNode.services.myInfo.singleIdentity()
// Run the flows
val requesterFlow = aliceNode.services.startFlow(SwapIdentitiesFlow(bob))
@ -59,7 +59,7 @@ class SwapIdentitiesFlowTests {
val notaryNode = mockNet.createNotaryNode(DUMMY_NOTARY.name)
val aliceNode = mockNet.createPartyNode(ALICE.name)
val bobNode = mockNet.createPartyNode(BOB.name)
val bob: Party = bobNode.services.myInfo.chooseIdentity()
val bob: Party = bobNode.services.myInfo.singleIdentity()
val notBob = notaryNode.database.transaction {
notaryNode.services.keyManagementService.freshKeyAndCert(notaryNode.services.myInfo.chooseIdentityAndCert(), false)
}
@ -84,7 +84,7 @@ class SwapIdentitiesFlowTests {
val notaryNode = mockNet.createNotaryNode(DUMMY_NOTARY.name)
val aliceNode = mockNet.createPartyNode(ALICE.name)
val bobNode = mockNet.createPartyNode(BOB.name)
val bob: Party = bobNode.services.myInfo.chooseIdentity()
val bob: Party = bobNode.services.myInfo.singleIdentity()
// Check that the wrong signature is rejected
notaryNode.database.transaction {
notaryNode.services.keyManagementService.freshKeyAndCert(notaryNode.services.myInfo.chooseIdentityAndCert(), false)

View File

@ -17,6 +17,7 @@ import net.corda.testing.ALICE
import net.corda.testing.ALICE_NAME
import net.corda.testing.BOB
import net.corda.testing.node.MockNetwork
import net.corda.testing.singleIdentity
import org.junit.After
import org.junit.Before
import org.junit.Test
@ -60,7 +61,7 @@ class AttachmentTests {
// Ensure that registration was successful before progressing any further
mockNet.runNetwork()
aliceNode.internals.ensureRegistered()
val alice = aliceNode.services.myInfo.identityFromX500Name(ALICE_NAME)
val alice = aliceNode.info.singleIdentity()
aliceNode.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
bobNode.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
@ -105,7 +106,7 @@ class AttachmentTests {
// Get node one to fetch a non-existent attachment.
val hash = SecureHash.randomSHA256()
mockNet.runNetwork()
val alice = aliceNode.services.myInfo.identityFromX500Name(ALICE_NAME)
val alice = aliceNode.info.singleIdentity()
val bobFlow = bobNode.startAttachmentFlow(setOf(hash), alice)
mockNet.runNetwork()
val e = assertFailsWith<FetchDataFlow.HashNotFound> { bobFlow.resultFuture.getOrThrow() }

View File

@ -45,10 +45,10 @@ class CollectSignaturesFlowTests {
charlieNode = mockNet.createPartyNode(CHARLIE.name)
mockNet.runNetwork()
aliceNode.internals.ensureRegistered()
alice = aliceNode.services.myInfo.identityFromX500Name(ALICE_NAME)
bob = bobNode.services.myInfo.identityFromX500Name(BOB_NAME)
charlie = charlieNode.services.myInfo.identityFromX500Name(CHARLIE_NAME)
notary = notaryNode.services.networkMapCache.getNotary(DUMMY_NOTARY_SERVICE_NAME)!!
alice = aliceNode.info.singleIdentity()
bob = bobNode.info.singleIdentity()
charlie = charlieNode.info.singleIdentity()
notary = notaryNode.services.getDefaultNotary()
}
@After

View File

@ -33,9 +33,9 @@ class FinalityFlowTests {
aliceNode.internals.ensureRegistered()
aliceServices = aliceNode.services
bobServices = bobNode.services
alice = aliceNode.services.myInfo.identityFromX500Name(ALICE_NAME)
bob = bobNode.services.myInfo.identityFromX500Name(BOB_NAME)
notary = notaryNode.services.networkMapCache.getNotary(DUMMY_NOTARY_SERVICE_NAME)!!
alice = aliceNode.info.singleIdentity()
bob = bobNode.info.singleIdentity()
notary = notaryNode.services.getDefaultNotary()
}
@After

View File

@ -3,13 +3,12 @@ package net.corda.node.services.vault
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.node.services.Vault
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.OpaqueBytes
import org.hibernate.annotations.Generated
import org.hibernate.annotations.GenerationTime
import java.io.Serializable
import java.time.Instant
import java.util.*
@ -32,7 +31,7 @@ object VaultSchemaV1 : MappedSchema(schemaFamily = VaultSchema.javaClass, versio
class VaultStates(
/** refers to the X500Name of the notary a state is attached to */
@Column(name = "notary_name")
var notary: AbstractParty,
var notary: Party,
/** references a concrete ContractState that is [QueryableState] and has a [MappedSchema] */
@Column(name = "contract_state_class_name")

View File

@ -31,7 +31,6 @@ import net.corda.finance.`issued by`
import net.corda.finance.contracts.CommercialPaper
import net.corda.finance.contracts.asset.CASH
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.contracts.asset.ownedBy
import net.corda.finance.flows.TwoPartyTradeFlow.Buyer
import net.corda.finance.flows.TwoPartyTradeFlow.Seller
import net.corda.node.internal.StartedNode
@ -103,12 +102,14 @@ class TwoPartyTradeFlowTests(val anonymous: Boolean) {
mockNet = MockNetwork(false, true, cordappPackages = cordappPackages)
ledger(MockServices(cordappPackages), initialiseSerialization = false) {
val notaryNode = mockNet.createNotaryNode()
val aliceNode = mockNet.createPartyNode(ALICE.name)
val bobNode = mockNet.createPartyNode(BOB.name)
val bankNode = mockNet.createPartyNode(BOC.name)
val aliceNode = mockNet.createPartyNode(ALICE_NAME)
val bobNode = mockNet.createPartyNode(BOB_NAME)
val bankNode = mockNet.createPartyNode(BOC_NAME)
val alice = aliceNode.info.singleIdentity()
val bank = bankNode.info.singleIdentity()
val notary = notaryNode.services.getDefaultNotary()
val cashIssuer = bankNode.info.chooseIdentity().ref(1)
val cpIssuer = bankNode.info.chooseIdentity().ref(1, 2, 3)
val cashIssuer = bank.ref(1)
val cpIssuer = bank.ref(1, 2, 3)
aliceNode.internals.disableDBCloseOnStop()
bobNode.internals.disableDBCloseOnStop()
@ -119,8 +120,8 @@ class TwoPartyTradeFlowTests(val anonymous: Boolean) {
}
val alicesFakePaper = aliceNode.database.transaction {
fillUpForSeller(false, cpIssuer, aliceNode.info.chooseIdentity(),
1200.DOLLARS `issued by` bankNode.info.chooseIdentity().ref(0), null, notary).second
fillUpForSeller(false, cpIssuer, alice,
1200.DOLLARS `issued by` bank.ref(0), null, notary).second
}
insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, bankNode)
@ -151,10 +152,12 @@ class TwoPartyTradeFlowTests(val anonymous: Boolean) {
mockNet = MockNetwork(false, true, cordappPackages = cordappPackages)
ledger(MockServices(cordappPackages), initialiseSerialization = false) {
val notaryNode = mockNet.createNotaryNode()
val aliceNode = mockNet.createPartyNode(ALICE.name)
val bobNode = mockNet.createPartyNode(BOB.name)
val bankNode = mockNet.createPartyNode(BOC.name)
val issuer = bankNode.info.chooseIdentity().ref(1)
val aliceNode = mockNet.createPartyNode(ALICE_NAME)
val bobNode = mockNet.createPartyNode(BOB_NAME)
val bankNode = mockNet.createPartyNode(BOC_NAME)
val alice = aliceNode.info.singleIdentity()
val bank = bankNode.info.singleIdentity()
val issuer = bank.ref(1)
val notary = aliceNode.services.getDefaultNotary()
aliceNode.internals.disableDBCloseOnStop()
@ -166,8 +169,8 @@ class TwoPartyTradeFlowTests(val anonymous: Boolean) {
}
val alicesFakePaper = aliceNode.database.transaction {
fillUpForSeller(false, issuer, aliceNode.info.chooseIdentity(),
1200.DOLLARS `issued by` bankNode.info.chooseIdentity().ref(0), null, notary).second
fillUpForSeller(false, issuer, alice,
1200.DOLLARS `issued by` bank.ref(0), null, notary).second
}
insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, bankNode)
@ -205,31 +208,27 @@ class TwoPartyTradeFlowTests(val anonymous: Boolean) {
mockNet = MockNetwork(false, cordappPackages = cordappPackages)
ledger(MockServices(cordappPackages), initialiseSerialization = false) {
val notaryNode = mockNet.createNotaryNode()
val aliceNode = mockNet.createPartyNode(ALICE.name)
var bobNode = mockNet.createPartyNode(BOB.name)
val bankNode = mockNet.createPartyNode(BOC.name)
val issuer = bankNode.info.chooseIdentity().ref(1, 2, 3)
aliceNode.database.transaction {
aliceNode.services.identityService.verifyAndRegisterIdentity(bobNode.info.chooseIdentityAndCert())
}
bobNode.database.transaction {
bobNode.services.identityService.verifyAndRegisterIdentity(aliceNode.info.chooseIdentityAndCert())
}
val aliceNode = mockNet.createPartyNode(ALICE_NAME)
var bobNode = mockNet.createPartyNode(BOB_NAME)
val bankNode = mockNet.createPartyNode(BOC_NAME)
aliceNode.internals.disableDBCloseOnStop()
bobNode.internals.disableDBCloseOnStop()
val bobAddr = bobNode.network.myAddress as InMemoryMessagingNetwork.PeerHandle
mockNet.runNetwork() // Clear network map registration messages
val notary = aliceNode.services.getDefaultNotary()
val notary = notaryNode.services.getDefaultNotary()
val alice = aliceNode.info.singleIdentity()
val bank = bankNode.info.singleIdentity()
val issuer = bank.ref(1, 2, 3)
bobNode.database.transaction {
bobNode.services.fillWithSomeTestCash(2000.DOLLARS, bankNode.services, outputNotary = notary,
issuedBy = issuer)
}
val alicesFakePaper = aliceNode.database.transaction {
fillUpForSeller(false, issuer, aliceNode.info.chooseIdentity(),
1200.DOLLARS `issued by` bankNode.info.chooseIdentity().ref(0), null, notary).second
fillUpForSeller(false, issuer, alice,
1200.DOLLARS `issued by` bank.ref(0), null, notary).second
}
insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, bankNode)
val aliceFuture = runBuyerAndSeller(notary, aliceNode, bobNode, "alice's paper".outputStateAndRef()).sellerResult
@ -274,7 +273,7 @@ class TwoPartyTradeFlowTests(val anonymous: Boolean) {
id: Int, notaryIdentity: Pair<ServiceInfo, KeyPair>?, entropyRoot: BigInteger): MockNetwork.MockNode {
return MockNetwork.MockNode(config, network, networkMapAddr, bobAddr.id, notaryIdentity, entropyRoot)
}
}, BOB.name)
}, BOB_NAME)
// Find the future representing the result of this state machine again.
val bobFuture = bobNode.smm.findStateMachines(BuyerAcceptor::class.java).single().second
@ -329,13 +328,16 @@ class TwoPartyTradeFlowTests(val anonymous: Boolean) {
fun `check dependencies of sale asset are resolved`() {
mockNet = MockNetwork(false, cordappPackages = cordappPackages)
val notaryNode = mockNet.createNotaryNode()
val aliceNode = makeNodeWithTracking(ALICE.name)
val bobNode = makeNodeWithTracking(BOB.name)
val bankNode = makeNodeWithTracking(BOC.name)
val issuer = bankNode.info.chooseIdentity().ref(1, 2, 3)
val aliceNode = makeNodeWithTracking(ALICE_NAME)
val bobNode = makeNodeWithTracking(BOB_NAME)
val bankNode = makeNodeWithTracking(BOC_NAME)
mockNet.runNetwork()
notaryNode.internals.ensureRegistered()
val notary = aliceNode.services.getDefaultNotary()
val alice = aliceNode.info.singleIdentity()
val bob = bobNode.info.singleIdentity()
val bank = bankNode.info.singleIdentity()
val issuer = bank.ref(1, 2, 3)
ledger(aliceNode.services, initialiseSerialization = false) {
@ -351,12 +353,12 @@ class TwoPartyTradeFlowTests(val anonymous: Boolean) {
}
val bobsFakeCash = bobNode.database.transaction {
fillUpForBuyer(false, issuer, AnonymousParty(bobNode.info.chooseIdentity().owningKey), notary)
fillUpForBuyer(false, issuer, AnonymousParty(bob.owningKey), notary)
}.second
val bobsSignedTxns = insertFakeTransactions(bobsFakeCash, bobNode, notaryNode, bankNode)
val alicesFakePaper = aliceNode.database.transaction {
fillUpForSeller(false, issuer, aliceNode.info.chooseIdentity(),
1200.DOLLARS `issued by` bankNode.info.chooseIdentity().ref(0), attachmentID, notary).second
fillUpForSeller(false, issuer, alice,
1200.DOLLARS `issued by` bank.ref(0), attachmentID, notary).second
}
val alicesSignedTxns = insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, bankNode)
@ -433,14 +435,16 @@ class TwoPartyTradeFlowTests(val anonymous: Boolean) {
fun `track works`() {
mockNet = MockNetwork(false, cordappPackages = cordappPackages)
val notaryNode = mockNet.createNotaryNode()
val aliceNode = makeNodeWithTracking(ALICE.name)
val bobNode = makeNodeWithTracking(BOB.name)
val bankNode = makeNodeWithTracking(BOC.name)
val issuer = bankNode.info.chooseIdentity().ref(1, 2, 3)
val aliceNode = makeNodeWithTracking(ALICE_NAME)
val bobNode = makeNodeWithTracking(BOB_NAME)
val bankNode = makeNodeWithTracking(BOC_NAME)
mockNet.runNetwork()
notaryNode.internals.ensureRegistered()
val notary = aliceNode.services.getDefaultNotary()
val alice: Party = aliceNode.info.singleIdentity()
val bank: Party = bankNode.info.singleIdentity()
val issuer = bank.ref(1, 2, 3)
ledger(aliceNode.services, initialiseSerialization = false) {
// Insert a prospectus type attachment into the commercial paper transaction.
@ -461,8 +465,8 @@ class TwoPartyTradeFlowTests(val anonymous: Boolean) {
insertFakeTransactions(bobsFakeCash, bobNode, notaryNode, bankNode)
val alicesFakePaper = aliceNode.database.transaction {
fillUpForSeller(false, issuer, aliceNode.info.chooseIdentity(),
1200.DOLLARS `issued by` bankNode.info.chooseIdentity().ref(0), attachmentID, notary).second
fillUpForSeller(false, issuer, alice,
1200.DOLLARS `issued by` bank.ref(0), attachmentID, notary).second
}
insertFakeTransactions(alicesFakePaper, aliceNode, notaryNode, bankNode)
@ -587,22 +591,23 @@ class TwoPartyTradeFlowTests(val anonymous: Boolean) {
expectedMessageSubstring: String
) {
val notaryNode = mockNet.createNotaryNode()
val aliceNode = mockNet.createPartyNode(ALICE.name)
val bobNode = mockNet.createPartyNode(BOB.name)
val bankNode = mockNet.createPartyNode(BOC.name)
val issuer = bankNode.info.chooseIdentity().ref(1, 2, 3)
val aliceNode = mockNet.createPartyNode(ALICE_NAME)
val bobNode = mockNet.createPartyNode(BOB_NAME)
val bankNode = mockNet.createPartyNode(BOC_NAME)
mockNet.runNetwork()
notaryNode.internals.ensureRegistered()
val notary = aliceNode.services.getDefaultNotary()
val alice = aliceNode.info.singleIdentity()
val bob = bobNode.info.singleIdentity()
val bank = bankNode.info.singleIdentity()
val issuer = bank.ref(1, 2, 3)
val bobsBadCash = bobNode.database.transaction {
fillUpForBuyer(bobError, issuer, bobNode.info.chooseIdentity(),
notary).second
fillUpForBuyer(bobError, issuer, bob, notary).second
}
val alicesFakePaper = aliceNode.database.transaction {
fillUpForSeller(aliceError, issuer, aliceNode.info.chooseIdentity(),
1200.DOLLARS `issued by` issuer, null, notary).second
fillUpForSeller(aliceError, issuer, alice,1200.DOLLARS `issued by` issuer, null, notary).second
}
insertFakeTransactions(bobsBadCash, bobNode, notaryNode, bankNode)

View File

@ -58,28 +58,33 @@ class FlowFrameworkTests {
private lateinit var mockNet: MockNetwork
private val receivedSessionMessages = ArrayList<SessionTransfer>()
private lateinit var node1: StartedNode<MockNode>
private lateinit var node2: StartedNode<MockNode>
private lateinit var notary: StartedNode<MockNode>
private lateinit var aliceNode: StartedNode<MockNode>
private lateinit var bobNode: StartedNode<MockNode>
private lateinit var notaryIdentity: Party
private lateinit var alice: Party
private lateinit var bob: Party
@Before
fun start() {
mockNet = MockNetwork(servicePeerAllocationStrategy = RoundRobin(), cordappPackages = listOf("net.corda.finance.contracts", "net.corda.testing.contracts"))
node1 = mockNet.createNode()
node2 = mockNet.createNode()
aliceNode = mockNet.createNode(legalName = ALICE_NAME)
bobNode = mockNet.createNode(legalName = BOB_NAME)
mockNet.runNetwork()
node1.internals.ensureRegistered()
aliceNode.internals.ensureRegistered()
// We intentionally create our own notary and ignore the one provided by the network
// Note that these notaries don't operate correctly as they don't share their state. They are only used for testing
// service addressing.
notary = mockNet.createNotaryNode()
val notary = mockNet.createNotaryNode()
receivedSessionMessagesObservable().forEach { receivedSessionMessages += it }
mockNet.runNetwork()
notaryIdentity = notary.services.myInfo.legalIdentities[1]
// Extract identities
alice = aliceNode.info.singleIdentity()
bob = bobNode.info.singleIdentity()
notaryIdentity = notary.services.getDefaultNotary()
}
@After
@ -90,24 +95,24 @@ class FlowFrameworkTests {
@Test
fun `newly added flow is preserved on restart`() {
node1.services.startFlow(NoOpFlow(nonTerminating = true))
node1.internals.acceptableLiveFiberCountOnStop = 1
val restoredFlow = node1.restartAndGetRestoredFlow<NoOpFlow>()
aliceNode.services.startFlow(NoOpFlow(nonTerminating = true))
aliceNode.internals.acceptableLiveFiberCountOnStop = 1
val restoredFlow = aliceNode.restartAndGetRestoredFlow<NoOpFlow>()
assertThat(restoredFlow.flowStarted).isTrue()
}
@Test
fun `flow can lazily use the serviceHub in its constructor`() {
val flow = LazyServiceHubAccessFlow()
node1.services.startFlow(flow)
aliceNode.services.startFlow(flow)
assertThat(flow.lazyTime).isNotNull()
}
@Test
fun `exception while fiber suspended`() {
node2.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
val flow = ReceiveFlow(node2.info.chooseIdentity())
val fiber = node1.services.startFlow(flow) as FlowStateMachineImpl
bobNode.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
val flow = ReceiveFlow(bob)
val fiber = aliceNode.services.startFlow(flow) as FlowStateMachineImpl
// Before the flow runs change the suspend action to throw an exception
val exceptionDuringSuspend = Exception("Thrown during suspend")
fiber.actionOnSuspend = {
@ -117,31 +122,31 @@ class FlowFrameworkTests {
assertThatThrownBy {
fiber.resultFuture.getOrThrow()
}.isSameAs(exceptionDuringSuspend)
assertThat(node1.smm.allStateMachines).isEmpty()
assertThat(aliceNode.smm.allStateMachines).isEmpty()
// Make sure the fiber does actually terminate
assertThat(fiber.isTerminated).isTrue()
}
@Test
fun `flow restarted just after receiving payload`() {
node2.registerFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() }
node1.services.startFlow(SendFlow("Hello", node2.info.chooseIdentity()))
bobNode.registerFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() }
aliceNode.services.startFlow(SendFlow("Hello", bob))
// We push through just enough messages to get only the payload sent
node2.pumpReceive()
node2.internals.disableDBCloseOnStop()
node2.internals.acceptableLiveFiberCountOnStop = 1
node2.dispose()
bobNode.pumpReceive()
bobNode.internals.disableDBCloseOnStop()
bobNode.internals.acceptableLiveFiberCountOnStop = 1
bobNode.dispose()
mockNet.runNetwork()
val restoredFlow = node2.restartAndGetRestoredFlow<InitiatedReceiveFlow>()
val restoredFlow = bobNode.restartAndGetRestoredFlow<InitiatedReceiveFlow>()
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello")
}
@Test
fun `flow added before network map does run after init`() {
val node3 = mockNet.createNode() //create vanilla node
val charlieNode = mockNet.createNode() //create vanilla node
val flow = NoOpFlow()
node3.services.startFlow(flow)
charlieNode.services.startFlow(flow)
assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet
mockNet.runNetwork() // Allow network map messages to flow
assertEquals(true, flow.flowStarted) // Now we should have run the flow
@ -149,40 +154,40 @@ class FlowFrameworkTests {
@Test
fun `flow added before network map will be init checkpointed`() {
var node3 = mockNet.createNode() //create vanilla node
var charlieNode = mockNet.createNode() //create vanilla node
val flow = NoOpFlow()
node3.services.startFlow(flow)
charlieNode.services.startFlow(flow)
assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet
node3.internals.disableDBCloseOnStop()
node3.services.networkMapCache.clearNetworkMapCache() // zap persisted NetworkMapCache to force use of network.
node3.dispose()
charlieNode.internals.disableDBCloseOnStop()
charlieNode.services.networkMapCache.clearNetworkMapCache() // zap persisted NetworkMapCache to force use of network.
charlieNode.dispose()
node3 = mockNet.createNode(node3.internals.id)
val restoredFlow = node3.getSingleFlow<NoOpFlow>().first
charlieNode = mockNet.createNode(charlieNode.internals.id)
val restoredFlow = charlieNode.getSingleFlow<NoOpFlow>().first
assertEquals(false, restoredFlow.flowStarted) // Not started yet as no network activity has been allowed yet
mockNet.runNetwork() // Allow network map messages to flow
node3.smm.executor.flush()
charlieNode.smm.executor.flush()
assertEquals(true, restoredFlow.flowStarted) // Now we should have run the flow and hopefully cleared the init checkpoint
node3.internals.disableDBCloseOnStop()
node3.services.networkMapCache.clearNetworkMapCache() // zap persisted NetworkMapCache to force use of network.
node3.dispose()
charlieNode.internals.disableDBCloseOnStop()
charlieNode.services.networkMapCache.clearNetworkMapCache() // zap persisted NetworkMapCache to force use of network.
charlieNode.dispose()
// Now it is completed the flow should leave no Checkpoint.
node3 = mockNet.createNode(node3.internals.id)
charlieNode = mockNet.createNode(charlieNode.internals.id)
mockNet.runNetwork() // Allow network map messages to flow
node3.smm.executor.flush()
assertTrue(node3.smm.findStateMachines(NoOpFlow::class.java).isEmpty())
charlieNode.smm.executor.flush()
assertTrue(charlieNode.smm.findStateMachines(NoOpFlow::class.java).isEmpty())
}
@Test
fun `flow loaded from checkpoint will respond to messages from before start`() {
node1.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
node2.services.startFlow(ReceiveFlow(node1.info.chooseIdentity()).nonTerminating()) // Prepare checkpointed receive flow
aliceNode.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
bobNode.services.startFlow(ReceiveFlow(alice).nonTerminating()) // Prepare checkpointed receive flow
// Make sure the add() has finished initial processing.
node2.smm.executor.flush()
node2.internals.disableDBCloseOnStop()
node2.dispose() // kill receiver
val restoredFlow = node2.restartAndGetRestoredFlow<ReceiveFlow>()
bobNode.smm.executor.flush()
bobNode.internals.disableDBCloseOnStop()
bobNode.dispose() // kill receiver
val restoredFlow = bobNode.restartAndGetRestoredFlow<ReceiveFlow>()
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello")
}
@ -194,26 +199,27 @@ class FlowFrameworkTests {
var sentCount = 0
mockNet.messagingNetwork.sentMessages.toSessionTransfers().filter { it.isPayloadTransfer }.forEach { sentCount++ }
val node3 = mockNet.createNode()
val secondFlow = node3.registerFlowFactory(PingPongFlow::class) { PingPongFlow(it, payload2) }
val charlieNode = mockNet.createNode(legalName = CHARLIE_NAME)
val secondFlow = charlieNode.registerFlowFactory(PingPongFlow::class) { PingPongFlow(it, payload2) }
mockNet.runNetwork()
val charlie = charlieNode.info.singleIdentity()
// Kick off first send and receive
node2.services.startFlow(PingPongFlow(node3.info.chooseIdentity(), payload))
node2.database.transaction {
assertEquals(1, node2.checkpointStorage.checkpoints().size)
bobNode.services.startFlow(PingPongFlow(charlie, payload))
bobNode.database.transaction {
assertEquals(1, bobNode.checkpointStorage.checkpoints().size)
}
// Make sure the add() has finished initial processing.
node2.smm.executor.flush()
node2.internals.disableDBCloseOnStop()
bobNode.smm.executor.flush()
bobNode.internals.disableDBCloseOnStop()
// Restart node and thus reload the checkpoint and resend the message with same UUID
node2.dispose()
node2.database.transaction {
assertEquals(1, node2.checkpointStorage.checkpoints().size) // confirm checkpoint
node2.services.networkMapCache.clearNetworkMapCache()
bobNode.dispose()
bobNode.database.transaction {
assertEquals(1, bobNode.checkpointStorage.checkpoints().size) // confirm checkpoint
bobNode.services.networkMapCache.clearNetworkMapCache()
}
val node2b = mockNet.createNode(node2.internals.id)
node2.internals.manuallyCloseDB()
val node2b = mockNet.createNode(bobNode.internals.id)
bobNode.internals.manuallyCloseDB()
val (firstAgain, fut1) = node2b.getSingleFlow<PingPongFlow>()
// Run the network which will also fire up the second flow. First message should get deduped. So message data stays in sync.
mockNet.runNetwork()
@ -228,8 +234,8 @@ class FlowFrameworkTests {
node2b.database.transaction {
assertEquals(0, node2b.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended")
}
node3.database.transaction {
assertEquals(0, node3.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended")
charlieNode.database.transaction {
assertEquals(0, charlieNode.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended")
}
assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3")
assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3")
@ -239,87 +245,89 @@ class FlowFrameworkTests {
@Test
fun `sending to multiple parties`() {
val node3 = mockNet.createNode()
val charlieNode = mockNet.createNode(legalName = CHARLIE_NAME)
mockNet.runNetwork()
node2.registerFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() }
node3.registerFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() }
val charlie = charlieNode.info.singleIdentity()
bobNode.registerFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() }
charlieNode.registerFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() }
val payload = "Hello World"
node1.services.startFlow(SendFlow(payload, node2.info.chooseIdentity(), node3.info.chooseIdentity()))
aliceNode.services.startFlow(SendFlow(payload, bob, charlie))
mockNet.runNetwork()
val node2Flow = node2.getSingleFlow<InitiatedReceiveFlow>().first
val node3Flow = node3.getSingleFlow<InitiatedReceiveFlow>().first
assertThat(node2Flow.receivedPayloads[0]).isEqualTo(payload)
assertThat(node3Flow.receivedPayloads[0]).isEqualTo(payload)
val bobFlow = bobNode.getSingleFlow<InitiatedReceiveFlow>().first
val charlieFlow = charlieNode.getSingleFlow<InitiatedReceiveFlow>().first
assertThat(bobFlow.receivedPayloads[0]).isEqualTo(payload)
assertThat(charlieFlow.receivedPayloads[0]).isEqualTo(payload)
assertSessionTransfers(node2,
node1 sent sessionInit(SendFlow::class, payload = payload) to node2,
node2 sent sessionConfirm() to node1,
node1 sent normalEnd to node2
assertSessionTransfers(bobNode,
aliceNode sent sessionInit(SendFlow::class, payload = payload) to bobNode,
bobNode sent sessionConfirm() to aliceNode,
aliceNode sent normalEnd to bobNode
//There's no session end from the other flows as they're manually suspended
)
assertSessionTransfers(node3,
node1 sent sessionInit(SendFlow::class, payload = payload) to node3,
node3 sent sessionConfirm() to node1,
node1 sent normalEnd to node3
assertSessionTransfers(charlieNode,
aliceNode sent sessionInit(SendFlow::class, payload = payload) to charlieNode,
charlieNode sent sessionConfirm() to aliceNode,
aliceNode sent normalEnd to charlieNode
//There's no session end from the other flows as they're manually suspended
)
node2.internals.acceptableLiveFiberCountOnStop = 1
node3.internals.acceptableLiveFiberCountOnStop = 1
bobNode.internals.acceptableLiveFiberCountOnStop = 1
charlieNode.internals.acceptableLiveFiberCountOnStop = 1
}
@Test
fun `receiving from multiple parties`() {
val node3 = mockNet.createNode()
val charlieNode = mockNet.createNode(legalName = CHARLIE_NAME)
mockNet.runNetwork()
val node2Payload = "Test 1"
val node3Payload = "Test 2"
node2.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(node2Payload, it) }
node3.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(node3Payload, it) }
val multiReceiveFlow = ReceiveFlow(node2.info.chooseIdentity(), node3.info.chooseIdentity()).nonTerminating()
node1.services.startFlow(multiReceiveFlow)
node1.internals.acceptableLiveFiberCountOnStop = 1
val charlie = charlieNode.info.singleIdentity()
val bobPayload = "Test 1"
val charliePayload = "Test 2"
bobNode.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(bobPayload, it) }
charlieNode.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(charliePayload, it) }
val multiReceiveFlow = ReceiveFlow(bob, charlie).nonTerminating()
aliceNode.services.startFlow(multiReceiveFlow)
aliceNode.internals.acceptableLiveFiberCountOnStop = 1
mockNet.runNetwork()
assertThat(multiReceiveFlow.receivedPayloads[0]).isEqualTo(node2Payload)
assertThat(multiReceiveFlow.receivedPayloads[1]).isEqualTo(node3Payload)
assertThat(multiReceiveFlow.receivedPayloads[0]).isEqualTo(bobPayload)
assertThat(multiReceiveFlow.receivedPayloads[1]).isEqualTo(charliePayload)
assertSessionTransfers(node2,
node1 sent sessionInit(ReceiveFlow::class) to node2,
node2 sent sessionConfirm() to node1,
node2 sent sessionData(node2Payload) to node1,
node2 sent normalEnd to node1
assertSessionTransfers(bobNode,
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
bobNode sent sessionConfirm() to aliceNode,
bobNode sent sessionData(bobPayload) to aliceNode,
bobNode sent normalEnd to aliceNode
)
assertSessionTransfers(node3,
node1 sent sessionInit(ReceiveFlow::class) to node3,
node3 sent sessionConfirm() to node1,
node3 sent sessionData(node3Payload) to node1,
node3 sent normalEnd to node1
assertSessionTransfers(charlieNode,
aliceNode sent sessionInit(ReceiveFlow::class) to charlieNode,
charlieNode sent sessionConfirm() to aliceNode,
charlieNode sent sessionData(charliePayload) to aliceNode,
charlieNode sent normalEnd to aliceNode
)
}
@Test
fun `both sides do a send as their first IO request`() {
node2.registerFlowFactory(PingPongFlow::class) { PingPongFlow(it, 20L) }
node1.services.startFlow(PingPongFlow(node2.info.chooseIdentity(), 10L))
bobNode.registerFlowFactory(PingPongFlow::class) { PingPongFlow(it, 20L) }
aliceNode.services.startFlow(PingPongFlow(bob, 10L))
mockNet.runNetwork()
assertSessionTransfers(
node1 sent sessionInit(PingPongFlow::class, payload = 10L) to node2,
node2 sent sessionConfirm() to node1,
node2 sent sessionData(20L) to node1,
node1 sent sessionData(11L) to node2,
node2 sent sessionData(21L) to node1,
node1 sent normalEnd to node2,
node2 sent normalEnd to node1
aliceNode sent sessionInit(PingPongFlow::class, payload = 10L) to bobNode,
bobNode sent sessionConfirm() to aliceNode,
bobNode sent sessionData(20L) to aliceNode,
aliceNode sent sessionData(11L) to bobNode,
bobNode sent sessionData(21L) to aliceNode,
aliceNode sent normalEnd to bobNode,
bobNode sent normalEnd to aliceNode
)
}
@Test
fun `other side ends before doing expected send`() {
node2.registerFlowFactory(ReceiveFlow::class) { NoOpFlow() }
val resultFuture = node1.services.startFlow(ReceiveFlow(node2.info.chooseIdentity())).resultFuture
bobNode.registerFlowFactory(ReceiveFlow::class) { NoOpFlow() }
val resultFuture = aliceNode.services.startFlow(ReceiveFlow(bob)).resultFuture
mockNet.runNetwork()
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
resultFuture.getOrThrow()
@ -328,11 +336,11 @@ class FlowFrameworkTests {
@Test
fun `receiving unexpected session end before entering sendAndReceive`() {
node2.registerFlowFactory(WaitForOtherSideEndBeforeSendAndReceive::class) { NoOpFlow() }
bobNode.registerFlowFactory(WaitForOtherSideEndBeforeSendAndReceive::class) { NoOpFlow() }
val sessionEndReceived = Semaphore(0)
receivedSessionMessagesObservable().filter { it.message is SessionEnd }.subscribe { sessionEndReceived.release() }
val resultFuture = node1.services.startFlow(
WaitForOtherSideEndBeforeSendAndReceive(node2.info.chooseIdentity(), sessionEndReceived)).resultFuture
val resultFuture = aliceNode.services.startFlow(
WaitForOtherSideEndBeforeSendAndReceive(bob, sessionEndReceived)).resultFuture
mockNet.runNetwork()
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
resultFuture.getOrThrow()
@ -355,14 +363,14 @@ class FlowFrameworkTests {
@Test
fun `non-FlowException thrown on other side`() {
val erroringFlowFuture = node2.registerFlowFactory(ReceiveFlow::class) {
val erroringFlowFuture = bobNode.registerFlowFactory(ReceiveFlow::class) {
ExceptionFlow { Exception("evil bug!") }
}
val erroringFlowSteps = erroringFlowFuture.flatMap { it.progressSteps }
val receiveFlow = ReceiveFlow(node2.info.chooseIdentity())
val receiveFlow = ReceiveFlow(bob)
val receiveFlowSteps = receiveFlow.progressSteps
val receiveFlowResult = node1.services.startFlow(receiveFlow).resultFuture
val receiveFlowResult = aliceNode.services.startFlow(receiveFlow).resultFuture
mockNet.runNetwork()
@ -381,20 +389,20 @@ class FlowFrameworkTests {
)
assertSessionTransfers(
node1 sent sessionInit(ReceiveFlow::class) to node2,
node2 sent sessionConfirm() to node1,
node2 sent erroredEnd() to node1
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
bobNode sent sessionConfirm() to aliceNode,
bobNode sent erroredEnd() to aliceNode
)
}
@Test
fun `FlowException thrown on other side`() {
val erroringFlow = node2.registerFlowFactory(ReceiveFlow::class) {
val erroringFlow = bobNode.registerFlowFactory(ReceiveFlow::class) {
ExceptionFlow { MyFlowException("Nothing useful") }
}
val erroringFlowSteps = erroringFlow.flatMap { it.progressSteps }
val receivingFiber = node1.services.startFlow(ReceiveFlow(node2.info.chooseIdentity())) as FlowStateMachineImpl
val receivingFiber = aliceNode.services.startFlow(ReceiveFlow(bob)) as FlowStateMachineImpl
mockNet.runNetwork()
@ -402,8 +410,8 @@ class FlowFrameworkTests {
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
.withMessage("Nothing useful")
.withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow
node2.database.transaction {
assertThat(node2.checkpointStorage.checkpoints()).isEmpty()
bobNode.database.transaction {
assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty()
}
assertThat(receivingFiber.isTerminated).isTrue()
@ -414,9 +422,9 @@ class FlowFrameworkTests {
)
assertSessionTransfers(
node1 sent sessionInit(ReceiveFlow::class) to node2,
node2 sent sessionConfirm() to node1,
node2 sent erroredEnd(erroringFlow.get().exceptionThrown) to node1
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
bobNode sent sessionConfirm() to aliceNode,
bobNode sent erroredEnd(erroringFlow.get().exceptionThrown) to aliceNode
)
// Make sure the original stack trace isn't sent down the wire
assertThat((receivedSessionMessages.last().message as ErrorSessionEnd).errorResponse!!.stackTrace).isEmpty()
@ -424,12 +432,13 @@ class FlowFrameworkTests {
@Test
fun `FlowException propagated in invocation chain`() {
val node3 = mockNet.createNode()
val charlieNode = mockNet.createNode(legalName = CHARLIE_NAME)
mockNet.runNetwork()
val charlie = charlieNode.info.singleIdentity()
node3.registerFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Chain") } }
node2.registerFlowFactory(ReceiveFlow::class) { ReceiveFlow(node3.info.chooseIdentity()) }
val receivingFiber = node1.services.startFlow(ReceiveFlow(node2.info.chooseIdentity()))
charlieNode.registerFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Chain") } }
bobNode.registerFlowFactory(ReceiveFlow::class) { ReceiveFlow(charlie) }
val receivingFiber = aliceNode.services.startFlow(ReceiveFlow(bob))
mockNet.runNetwork()
assertThatExceptionOfType(MyFlowException::class.java)
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
@ -438,34 +447,35 @@ class FlowFrameworkTests {
@Test
fun `FlowException thrown and there is a 3rd unrelated party flow`() {
val node3 = mockNet.createNode()
val charlieNode = mockNet.createNode(legalName = CHARLIE_NAME)
mockNet.runNetwork()
val charlie = charlieNode.info.singleIdentity()
// Node 2 will send its payload and then block waiting for the receive from node 1. Meanwhile node 1 will move
// onto node 3 which will throw the exception
val node2Fiber = node2
// Bob will send its payload and then block waiting for the receive from Alice. Meanwhile Alice will move
// onto Charlie which will throw the exception
val node2Fiber = bobNode
.registerFlowFactory(ReceiveFlow::class) { SendAndReceiveFlow(it, "Hello") }
.map { it.stateMachine }
node3.registerFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Nothing useful") } }
charlieNode.registerFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Nothing useful") } }
val node1Fiber = node1.services.startFlow(ReceiveFlow(node2.info.chooseIdentity(), node3.info.chooseIdentity())) as FlowStateMachineImpl
val aliceFiber = aliceNode.services.startFlow(ReceiveFlow(bob, charlie)) as FlowStateMachineImpl
mockNet.runNetwork()
// Node 1 will terminate with the error it received from node 3 but it won't propagate that to node 2 (as it's
// Alice will terminate with the error it received from Charlie but it won't propagate that to Bob (as it's
// not relevant to it) but it will end its session with it
assertThatExceptionOfType(MyFlowException::class.java).isThrownBy {
node1Fiber.resultFuture.getOrThrow()
aliceFiber.resultFuture.getOrThrow()
}
val node2ResultFuture = node2Fiber.getOrThrow().resultFuture
val bobResultFuture = node2Fiber.getOrThrow().resultFuture
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
node2ResultFuture.getOrThrow()
bobResultFuture.getOrThrow()
}
assertSessionTransfers(node2,
node1 sent sessionInit(ReceiveFlow::class) to node2,
node2 sent sessionConfirm() to node1,
node2 sent sessionData("Hello") to node1,
node1 sent erroredEnd() to node2
assertSessionTransfers(bobNode,
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
bobNode sent sessionConfirm() to aliceNode,
bobNode sent sessionData("Hello") to aliceNode,
aliceNode sent erroredEnd() to bobNode
)
}
@ -499,16 +509,16 @@ class FlowFrameworkTests {
}
}
node2.registerFlowFactory(AskForExceptionFlow::class) { ConditionalExceptionFlow(it, "Hello") }
val resultFuture = node1.services.startFlow(RetryOnExceptionFlow(node2.info.chooseIdentity())).resultFuture
bobNode.registerFlowFactory(AskForExceptionFlow::class) { ConditionalExceptionFlow(it, "Hello") }
val resultFuture = aliceNode.services.startFlow(RetryOnExceptionFlow(bob)).resultFuture
mockNet.runNetwork()
assertThat(resultFuture.getOrThrow()).isEqualTo("Hello")
}
@Test
fun `serialisation issue in counterparty`() {
node2.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(NonSerialisableData(1), it) }
val result = node1.services.startFlow(ReceiveFlow(node2.info.chooseIdentity())).resultFuture
bobNode.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(NonSerialisableData(1), it) }
val result = aliceNode.services.startFlow(ReceiveFlow(bob)).resultFuture
mockNet.runNetwork()
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
result.getOrThrow()
@ -517,10 +527,10 @@ class FlowFrameworkTests {
@Test
fun `FlowException has non-serialisable object`() {
node2.registerFlowFactory(ReceiveFlow::class) {
bobNode.registerFlowFactory(ReceiveFlow::class) {
ExceptionFlow { NonSerialisableFlowException(NonSerialisableData(1)) }
}
val result = node1.services.startFlow(ReceiveFlow(node2.info.chooseIdentity())).resultFuture
val result = aliceNode.services.startFlow(ReceiveFlow(bob)).resultFuture
mockNet.runNetwork()
assertThatExceptionOfType(FlowException::class.java).isThrownBy {
result.getOrThrow()
@ -531,13 +541,13 @@ class FlowFrameworkTests {
fun `wait for transaction`() {
val ptx = TransactionBuilder(notary = notaryIdentity)
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
.addCommand(dummyCommand(node1.info.chooseIdentity().owningKey))
val stx = node1.services.signInitialTransaction(ptx)
.addCommand(dummyCommand(alice.owningKey))
val stx = aliceNode.services.signInitialTransaction(ptx)
val committerFiber = node1.registerFlowFactory(WaitingFlows.Waiter::class) {
val committerFiber = aliceNode.registerFlowFactory(WaitingFlows.Waiter::class) {
WaitingFlows.Committer(it)
}.map { it.stateMachine }
val waiterStx = node2.services.startFlow(WaitingFlows.Waiter(stx, node1.info.chooseIdentity())).resultFuture
val waiterStx = bobNode.services.startFlow(WaitingFlows.Waiter(stx, alice)).resultFuture
mockNet.runNetwork()
assertThat(waiterStx.getOrThrow()).isEqualTo(committerFiber.getOrThrow().resultFuture.getOrThrow())
}
@ -547,12 +557,12 @@ class FlowFrameworkTests {
val ptx = TransactionBuilder(notary = notaryIdentity)
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
.addCommand(dummyCommand())
val stx = node1.services.signInitialTransaction(ptx)
val stx = aliceNode.services.signInitialTransaction(ptx)
node1.registerFlowFactory(WaitingFlows.Waiter::class) {
aliceNode.registerFlowFactory(WaitingFlows.Waiter::class) {
WaitingFlows.Committer(it) { throw Exception("Error") }
}
val waiter = node2.services.startFlow(WaitingFlows.Waiter(stx, node1.info.chooseIdentity())).resultFuture
val waiter = bobNode.services.startFlow(WaitingFlows.Waiter(stx, alice)).resultFuture
mockNet.runNetwork()
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
waiter.getOrThrow()
@ -563,13 +573,13 @@ class FlowFrameworkTests {
fun `verify vault query service is tokenizable by force checkpointing within a flow`() {
val ptx = TransactionBuilder(notary = notaryIdentity)
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
.addCommand(dummyCommand(node1.info.chooseIdentity().owningKey))
val stx = node1.services.signInitialTransaction(ptx)
.addCommand(dummyCommand(alice.owningKey))
val stx = aliceNode.services.signInitialTransaction(ptx)
node1.registerFlowFactory(VaultQueryFlow::class) {
aliceNode.registerFlowFactory(VaultQueryFlow::class) {
WaitingFlows.Committer(it)
}
val result = node2.services.startFlow(VaultQueryFlow(stx, node1.info.chooseIdentity())).resultFuture
val result = bobNode.services.startFlow(VaultQueryFlow(stx, alice)).resultFuture
mockNet.runNetwork()
assertThat(result.getOrThrow()).isEmpty()
@ -577,15 +587,15 @@ class FlowFrameworkTests {
@Test
fun `customised client flow`() {
val receiveFlowFuture = node2.registerFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it) }
node1.services.startFlow(CustomSendFlow("Hello", node2.info.chooseIdentity())).resultFuture
val receiveFlowFuture = bobNode.registerFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it) }
aliceNode.services.startFlow(CustomSendFlow("Hello", bob)).resultFuture
mockNet.runNetwork()
assertThat(receiveFlowFuture.getOrThrow().receivedPayloads).containsOnly("Hello")
}
@Test
fun `customised client flow which has annotated @InitiatingFlow again`() {
val result = node1.services.startFlow(IncorrectCustomSendFlow("Hello", node2.info.chooseIdentity())).resultFuture
val result = aliceNode.services.startFlow(IncorrectCustomSendFlow("Hello", bob)).resultFuture
mockNet.runNetwork()
assertThatExceptionOfType(IllegalArgumentException::class.java).isThrownBy {
result.getOrThrow()
@ -594,12 +604,12 @@ class FlowFrameworkTests {
@Test
fun `upgraded initiating flow`() {
node2.registerFlowFactory(UpgradedFlow::class, initiatedFlowVersion = 1) { InitiatedSendFlow("Old initiated", it) }
val result = node1.services.startFlow(UpgradedFlow(node2.info.chooseIdentity())).resultFuture
bobNode.registerFlowFactory(UpgradedFlow::class, initiatedFlowVersion = 1) { InitiatedSendFlow("Old initiated", it) }
val result = aliceNode.services.startFlow(UpgradedFlow(bob)).resultFuture
mockNet.runNetwork()
assertThat(receivedSessionMessages).startsWith(
node1 sent sessionInit(UpgradedFlow::class, flowVersion = 2) to node2,
node2 sent sessionConfirm(flowVersion = 1) to node1
aliceNode sent sessionInit(UpgradedFlow::class, flowVersion = 2) to bobNode,
bobNode sent sessionConfirm(flowVersion = 1) to aliceNode
)
val (receivedPayload, node2FlowVersion) = result.getOrThrow()
assertThat(receivedPayload).isEqualTo("Old initiated")
@ -608,20 +618,20 @@ class FlowFrameworkTests {
@Test
fun `upgraded initiated flow`() {
node2.registerFlowFactory(SendFlow::class, initiatedFlowVersion = 2) { UpgradedFlow(it) }
val initiatingFlow = SendFlow("Old initiating", node2.info.chooseIdentity())
val flowInfo = node1.services.startFlow(initiatingFlow).resultFuture
bobNode.registerFlowFactory(SendFlow::class, initiatedFlowVersion = 2) { UpgradedFlow(it) }
val initiatingFlow = SendFlow("Old initiating", bob)
val flowInfo = aliceNode.services.startFlow(initiatingFlow).resultFuture
mockNet.runNetwork()
assertThat(receivedSessionMessages).startsWith(
node1 sent sessionInit(SendFlow::class, flowVersion = 1, payload = "Old initiating") to node2,
node2 sent sessionConfirm(flowVersion = 2) to node1
aliceNode sent sessionInit(SendFlow::class, flowVersion = 1, payload = "Old initiating") to bobNode,
bobNode sent sessionConfirm(flowVersion = 2) to aliceNode
)
assertThat(flowInfo.get().flowVersion).isEqualTo(2)
}
@Test
fun `unregistered flow`() {
val future = node1.services.startFlow(SendFlow("Hello", node2.info.chooseIdentity())).resultFuture
val future = aliceNode.services.startFlow(SendFlow("Hello", bob)).resultFuture
mockNet.runNetwork()
assertThatExceptionOfType(UnexpectedFlowEndException::class.java)
.isThrownBy { future.getOrThrow() }
@ -630,7 +640,7 @@ class FlowFrameworkTests {
@Test
fun `unknown class in session init`() {
node1.sendSessionMessage(SessionInit(random63BitValue(), "not.a.real.Class", 1, "version", null), node2)
aliceNode.sendSessionMessage(SessionInit(random63BitValue(), "not.a.real.Class", 1, "version", null), bob)
mockNet.runNetwork()
assertThat(receivedSessionMessages).hasSize(2) // Only the session-init and session-reject are expected
val reject = receivedSessionMessages.last().message as SessionReject
@ -639,7 +649,7 @@ class FlowFrameworkTests {
@Test
fun `non-flow class in session init`() {
node1.sendSessionMessage(SessionInit(random63BitValue(), String::class.java.name, 1, "version", null), node2)
aliceNode.sendSessionMessage(SessionInit(random63BitValue(), String::class.java.name, 1, "version", null), bob)
mockNet.runNetwork()
assertThat(receivedSessionMessages).hasSize(2) // Only the session-init and session-reject are expected
val reject = receivedSessionMessages.last().message as SessionReject
@ -648,23 +658,23 @@ class FlowFrameworkTests {
@Test
fun `single inlined sub-flow`() {
node2.registerFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) }
val result = node1.services.startFlow(SendAndReceiveFlow(node2.info.chooseIdentity(), "Hello")).resultFuture
bobNode.registerFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) }
val result = aliceNode.services.startFlow(SendAndReceiveFlow(bob, "Hello")).resultFuture
mockNet.runNetwork()
assertThat(result.getOrThrow()).isEqualTo("HelloHello")
}
@Test
fun `double inlined sub-flow`() {
node2.registerFlowFactory(SendAndReceiveFlow::class) { DoubleInlinedSubFlow(it) }
val result = node1.services.startFlow(SendAndReceiveFlow(node2.info.chooseIdentity(), "Hello")).resultFuture
bobNode.registerFlowFactory(SendAndReceiveFlow::class) { DoubleInlinedSubFlow(it) }
val result = aliceNode.services.startFlow(SendAndReceiveFlow(bob, "Hello")).resultFuture
mockNet.runNetwork()
assertThat(result.getOrThrow()).isEqualTo("HelloHello")
}
@Test
fun `double initiateFlow throws`() {
val future = node1.services.startFlow(DoubleInitiatingFlow()).resultFuture
val future = aliceNode.services.startFlow(DoubleInitiatingFlow()).resultFuture
mockNet.runNetwork()
assertThatExceptionOfType(IllegalStateException::class.java)
.isThrownBy { future.getOrThrow() }
@ -675,8 +685,8 @@ class FlowFrameworkTests {
private class DoubleInitiatingFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
initiateFlow(serviceHub.myInfo.chooseIdentity())
initiateFlow(serviceHub.myInfo.chooseIdentity())
initiateFlow(ourIdentity)
initiateFlow(ourIdentity)
}
}
@ -718,9 +728,9 @@ class FlowFrameworkTests {
private val normalEnd = NormalSessionEnd(0)
private fun erroredEnd(errorResponse: FlowException? = null) = ErrorSessionEnd(0, errorResponse)
private fun StartedNode<*>.sendSessionMessage(message: SessionMessage, destination: StartedNode<*>) {
private fun StartedNode<*>.sendSessionMessage(message: SessionMessage, destination: Party) {
services.networkService.apply {
val address = getAddressOfParty(PartyInfo.SingleNode(destination.info.chooseIdentity(), emptyList()))
val address = getAddressOfParty(PartyInfo.SingleNode(destination, emptyList()))
send(createMessage(StateMachineManager.sessionTopic, message.serialize().bytes), address)
}
}

View File

@ -8,11 +8,12 @@ import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryException
import net.corda.core.flows.NotaryFlow
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.internal.StartedNode
import net.corda.node.services.api.ServiceHubInternal
import net.corda.testing.*
import net.corda.testing.contracts.DummyContract
import net.corda.testing.node.MockNetwork
@ -27,18 +28,21 @@ import kotlin.test.assertFailsWith
class NotaryServiceTests {
lateinit var mockNet: MockNetwork
lateinit var notaryNode: StartedNode<MockNetwork.MockNode>
lateinit var clientNode: StartedNode<MockNetwork.MockNode>
lateinit var notaryServices: ServiceHubInternal
lateinit var aliceServices: ServiceHubInternal
lateinit var notary: Party
lateinit var alice: Party
@Before
fun setup() {
mockNet = MockNetwork(cordappPackages = listOf("net.corda.testing.contracts"))
notaryNode = mockNet.createNotaryNode(legalName = DUMMY_NOTARY.name, validating = false)
clientNode = mockNet.createNode()
val notaryNode = mockNet.createNotaryNode(legalName = DUMMY_NOTARY.name, validating = false)
aliceServices = mockNet.createNode(legalName = ALICE_NAME).services
mockNet.runNetwork() // Clear network map registration messages
notaryNode.internals.ensureRegistered()
notary = clientNode.services.getDefaultNotary()
notaryServices = notaryNode.services
notary = notaryServices.getDefaultNotary()
alice = aliceServices.myInfo.singleIdentity()
}
@After
@ -49,12 +53,12 @@ class NotaryServiceTests {
@Test
fun `should sign a unique transaction with a valid time-window`() {
val stx = run {
val inputState = issueState(clientNode)
val inputState = issueState(aliceServices, alice)
val tx = TransactionBuilder(notary)
.addInputState(inputState)
.addCommand(dummyCommand(clientNode.info.chooseIdentity().owningKey))
.addCommand(dummyCommand(alice.owningKey))
.setTimeWindow(Instant.now(), 30.seconds)
clientNode.services.signInitialTransaction(tx)
aliceServices.signInitialTransaction(tx)
}
val future = runNotaryClient(stx)
@ -65,11 +69,11 @@ class NotaryServiceTests {
@Test
fun `should sign a unique transaction without a time-window`() {
val stx = run {
val inputState = issueState(clientNode)
val inputState = issueState(aliceServices, alice)
val tx = TransactionBuilder(notary)
.addInputState(inputState)
.addCommand(dummyCommand(clientNode.info.chooseIdentity().owningKey))
clientNode.services.signInitialTransaction(tx)
.addCommand(dummyCommand(alice.owningKey))
aliceServices.signInitialTransaction(tx)
}
val future = runNotaryClient(stx)
@ -80,12 +84,12 @@ class NotaryServiceTests {
@Test
fun `should report error for transaction with an invalid time-window`() {
val stx = run {
val inputState = issueState(clientNode)
val inputState = issueState(aliceServices, alice)
val tx = TransactionBuilder(notary)
.addInputState(inputState)
.addCommand(dummyCommand(clientNode.info.chooseIdentity().owningKey))
.addCommand(dummyCommand(alice.owningKey))
.setTimeWindow(Instant.now().plusSeconds(3600), 30.seconds)
clientNode.services.signInitialTransaction(tx)
aliceServices.signInitialTransaction(tx)
}
val future = runNotaryClient(stx)
@ -97,17 +101,17 @@ class NotaryServiceTests {
@Test
fun `should sign identical transaction multiple times (signing is idempotent)`() {
val stx = run {
val inputState = issueState(clientNode)
val inputState = issueState(aliceServices, alice)
val tx = TransactionBuilder(notary)
.addInputState(inputState)
.addCommand(dummyCommand(clientNode.info.chooseIdentity().owningKey))
clientNode.services.signInitialTransaction(tx)
.addCommand(dummyCommand(alice.owningKey))
aliceServices.signInitialTransaction(tx)
}
val firstAttempt = NotaryFlow.Client(stx)
val secondAttempt = NotaryFlow.Client(stx)
val f1 = clientNode.services.startFlow(firstAttempt)
val f2 = clientNode.services.startFlow(secondAttempt)
val f1 = aliceServices.startFlow(firstAttempt)
val f2 = aliceServices.startFlow(secondAttempt)
mockNet.runNetwork()
@ -116,25 +120,25 @@ class NotaryServiceTests {
@Test
fun `should report conflict when inputs are reused across transactions`() {
val inputState = issueState(clientNode)
val inputState = issueState(aliceServices, alice)
val stx = run {
val tx = TransactionBuilder(notary)
.addInputState(inputState)
.addCommand(dummyCommand(clientNode.info.chooseIdentity().owningKey))
clientNode.services.signInitialTransaction(tx)
.addCommand(dummyCommand(alice.owningKey))
aliceServices.signInitialTransaction(tx)
}
val stx2 = run {
val tx = TransactionBuilder(notary)
.addInputState(inputState)
.addInputState(issueState(clientNode))
.addCommand(dummyCommand(clientNode.info.chooseIdentity().owningKey))
clientNode.services.signInitialTransaction(tx)
.addInputState(issueState(aliceServices, alice))
.addCommand(dummyCommand(alice.owningKey))
aliceServices.signInitialTransaction(tx)
}
val firstSpend = NotaryFlow.Client(stx)
val secondSpend = NotaryFlow.Client(stx2) // Double spend the inputState in a second transaction.
clientNode.services.startFlow(firstSpend)
val future = clientNode.services.startFlow(secondSpend)
aliceServices.startFlow(firstSpend)
val future = aliceServices.startFlow(secondSpend)
mockNet.runNetwork()
@ -146,16 +150,16 @@ class NotaryServiceTests {
private fun runNotaryClient(stx: SignedTransaction): CordaFuture<List<TransactionSignature>> {
val flow = NotaryFlow.Client(stx)
val future = clientNode.services.startFlow(flow).resultFuture
val future = aliceServices.startFlow(flow).resultFuture
mockNet.runNetwork()
return future
}
fun issueState(node: StartedNode<*>): StateAndRef<*> {
val tx = DummyContract.generateInitial(Random().nextInt(), notary, node.info.chooseIdentity().ref(0))
val signedByNode = node.services.signInitialTransaction(tx)
val stx = notaryNode.services.addSignature(signedByNode, notary.owningKey)
node.services.recordTransactions(stx)
fun issueState(services: ServiceHub, identity: Party): StateAndRef<*> {
val tx = DummyContract.generateInitial(Random().nextInt(), notary, identity.ref(0))
val signedByNode = services.signInitialTransaction(tx)
val stx = notaryServices.addSignature(signedByNode, notary.owningKey)
services.recordTransactions(stx)
return StateAndRef(tx.outputStates().first(), StateRef(stx.id, 0))
}
}

View File

@ -42,8 +42,8 @@ class ValidatingNotaryServiceTests {
notaryNode.internals.ensureRegistered()
notaryServices = notaryNode.services
aliceServices = aliceNode.services
notary = notaryServices.networkMapCache.getNotary(DUMMY_NOTARY_SERVICE_NAME)!!
alice = aliceServices.myInfo.identityFromX500Name(ALICE_NAME)
notary = notaryServices.getDefaultNotary()
alice = aliceNode.info.singleIdentity()
}
@After

View File

@ -69,11 +69,11 @@ val MEGA_CORP: Party get() = MEGA_CORP_IDENTITY.party
val MINI_CORP_IDENTITY: PartyAndCertificate get() = getTestPartyAndCertificate(CordaX500Name(organisation = "MiniCorp", locality = "London", country = "GB"), MINI_CORP_PUBKEY)
val MINI_CORP: Party get() = MINI_CORP_IDENTITY.party
val BOC_NAME: CordaX500Name = CordaX500Name(organisation = "BankOfCorda", locality = "London", country = "GB")
val BOC_KEY: KeyPair by lazy { generateKeyPair() }
val BOC_PUBKEY: PublicKey get() = BOC_KEY.public
val BOC_IDENTITY: PartyAndCertificate get() = getTestPartyAndCertificate(CordaX500Name(organisation = "BankOfCorda", locality = "London", country = "GB"), BOC_PUBKEY)
val BOC_IDENTITY: PartyAndCertificate get() = getTestPartyAndCertificate(BOC_NAME, BOC_PUBKEY)
val BOC: Party get() = BOC_IDENTITY.party
val BOC_PARTY_REF = BOC.ref(OpaqueBytes.of(1)).reference
val BIG_CORP_KEY: KeyPair by lazy { generateKeyPair() }
val BIG_CORP_PUBKEY: PublicKey get() = BIG_CORP_KEY.public
@ -166,5 +166,14 @@ inline fun <reified T : Any> T.amqpSpecific(reason: String, function: () -> Unit
fun NodeInfo.chooseIdentityAndCert(): PartyAndCertificate = legalIdentitiesAndCerts.first()
fun NodeInfo.chooseIdentity(): Party = chooseIdentityAndCert().party
/**
* Extract a single identity from the node info. Throws an error if the node has multiple identities.
*/
fun NodeInfo.singleIdentityAndCert(): PartyAndCertificate = legalIdentitiesAndCerts.single()
/**
* Extract a single identity from the node info. Throws an error if the node has multiple identities.
*/
fun NodeInfo.singleIdentity(): Party = singleIdentityAndCert().party
/** Returns the identity of the first notary found on the network */
fun ServiceHub.getDefaultNotary(): Party = networkMapCache.notaryIdentities.first()