diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 1f04313ce4..9226af0e44 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -2897,7 +2897,7 @@ public abstract class net.corda.core.identity.AbstractParty extends java.lang.Ob ## @DoNotImplement @CordaSerializable -public final class net.corda.core.identity.AnonymousParty extends net.corda.core.identity.AbstractParty +public final class net.corda.core.identity.AnonymousParty extends net.corda.core.identity.AbstractParty implements net.corda.core.flows.Destination public (java.security.PublicKey) @Nullable public net.corda.core.identity.CordaX500Name nameOrNull() @@ -2978,7 +2978,7 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec ## @DoNotImplement @CordaSerializable -public final class net.corda.core.identity.Party extends net.corda.core.identity.AbstractParty +public final class net.corda.core.identity.Party extends net.corda.core.identity.AbstractParty implements net.corda.core.flows.Destination public (java.security.cert.X509Certificate) public (net.corda.core.identity.CordaX500Name, java.security.PublicKey) @NotNull diff --git a/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt b/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt index aab20f5e78..2c93dda546 100644 --- a/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt @@ -3,8 +3,11 @@ package net.corda.core.flows import co.paralleluniverse.fibers.Suspendable import net.corda.core.crypto.TransactionSignature import net.corda.core.crypto.isFulfilledBy +import net.corda.core.crypto.toStringShort +import net.corda.core.identity.AnonymousParty import net.corda.core.identity.Party import net.corda.core.identity.groupPublicKeysByWellKnownParty +import net.corda.core.internal.toMultiMap import net.corda.core.node.ServiceHub import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction @@ -65,7 +68,12 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig val sessionsToCollectFrom: Collection, val myOptionalKeys: Iterable?, override val progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : FlowLogic() { - @JvmOverloads constructor(partiallySignedTx: SignedTransaction, sessionsToCollectFrom: Collection, progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : this(partiallySignedTx, sessionsToCollectFrom, null, progressTracker) + @JvmOverloads + constructor( + partiallySignedTx: SignedTransaction, + sessionsToCollectFrom: Collection, + progressTracker: ProgressTracker = CollectSignaturesFlow.tracker() + ) : this(partiallySignedTx, sessionsToCollectFrom, null, progressTracker) companion object { object COLLECTING : ProgressTracker.Step("Collecting signatures from counterparties.") @@ -75,7 +83,6 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig fun tracker() = ProgressTracker(COLLECTING, VERIFYING) // TODO: Make the progress tracker adapt to the number of counterparties to collect from. - } @Suspendable @@ -105,14 +112,61 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig // If the unsigned counterparties list is empty then we don't need to collect any more signatures here. if (unsigned.isEmpty()) return partiallySignedTx - val partyToKeysMap = groupPublicKeysByWellKnownParty(serviceHub, unsigned) - // Check that we have a session for all parties. No more, no less. - require(sessionsToCollectFrom.map { it.counterparty }.toSet() == partyToKeysMap.keys) { + val setOfAllSessionKeys: Map = sessionsToCollectFrom + .groupBy { + val destination = it.destination + when (destination) { + is Party -> destination.owningKey + is AnonymousParty -> destination.owningKey + else -> throw IllegalArgumentException("Signatures can only be collected from Party or AnonymousParty, not $destination") + } + } + .mapValues { + require(it.value.size == 1) { "There are multiple sessions initiated for party key ${it.key.toStringShort()}" } + it.value.first() + } + + val partyToKeysItSignsFor: Map> = groupPublicKeysByWellKnownParty(serviceHub, unsigned) + val keyToSigningParty: Map = partyToKeysItSignsFor + .flatMap { (wellKnown, allKeysItSignsFor) -> allKeysItSignsFor.map { it to wellKnown } } + .toMap() + + val unrelatedSessions = sessionsToCollectFrom.filterNot { + if (it.destination is Party) { + // The session must have a corresponding unsigned. + it.destination in partyToKeysItSignsFor + } else { + // setOfAllSessionKeys has already checked for valid destination types so we can safely cast to AnonoymousParty here. + // This session was not initiated by a wellKnownParty so must directly exist in the unsigned. + (it.destination as AnonymousParty).owningKey in unsigned + } + } + + val keyToSessionList = unsigned.map { + val session = setOfAllSessionKeys[it] + if (session != null) { + // The unsigned key exists directly as a sessionKey, so use that session + it to session + } else { + // It might be delegated to a wellKnownParty + val wellKnownParty = checkNotNull(keyToSigningParty[it]) { "Could not find a session or wellKnown party for key ${it.toStringShort()}" } + // There is a wellKnownParty for this key, check if it has a session, and if so - use that session + val sessionForWellKnownParty = checkNotNull(setOfAllSessionKeys[wellKnownParty.owningKey]) { + "No session available to request signature for key: ${it.toStringShort()}" + } + it to sessionForWellKnownParty + } + } + + // Now invert the map to find the keys per session + val sessionToKeysMap = keyToSessionList.map { it.second to it.first }.toMultiMap() + + require(unrelatedSessions.isEmpty()) { "The Initiator of CollectSignaturesFlow must pass in exactly the sessions required to sign the transaction." } // Collect signatures from all counterparties and append them to the partially signed transaction. - val counterpartySignatures = sessionsToCollectFrom.flatMap { session -> - subFlow(CollectSignatureFlow(partiallySignedTx, session, partyToKeysMap[session.counterparty]!!)) + val counterpartySignatures = sessionToKeysMap.flatMap { (session, keys) -> + subFlow(CollectSignatureFlow(partiallySignedTx, session, keys)) } val stx = partiallySignedTx + counterpartySignatures @@ -196,7 +250,7 @@ class CollectSignatureFlow(val partiallySignedTx: SignedTransaction, val session * @param otherSideSession The session which is providing you a transaction to sign. */ abstract class SignTransactionFlow @JvmOverloads constructor(val otherSideSession: FlowSession, - override val progressTracker: ProgressTracker = SignTransactionFlow.tracker()) : FlowLogic() { + override val progressTracker: ProgressTracker = SignTransactionFlow.tracker()) : FlowLogic() { companion object { object RECEIVING : ProgressTracker.Step("Receiving transaction proposal for signing.") @@ -247,12 +301,6 @@ abstract class SignTransactionFlow @JvmOverloads constructor(val otherSideSessio @Suspendable private fun checkSignatures(stx: SignedTransaction) { - // We set `ignoreUnrecognisedParties` to `true` in `groupPublicKeysByWellKnownParty`. This is because we don't - // need to recognise all keys, but just the initiator's. - val signingWellKnownIdentities = groupPublicKeysByWellKnownParty(serviceHub, stx.sigs.map(TransactionSignature::by), true) - require(otherSideSession.counterparty in signingWellKnownIdentities) { - "The Initiator of CollectSignaturesFlow must have signed the transaction. Found $signingWellKnownIdentities, expected $otherSideSession" - } val signed = stx.sigs.map { it.by } val allSigners = stx.tx.requiredSigningKeys val notSigned = allSigners - signed diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index bca4a37b5c..5333bf4f02 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -115,6 +115,13 @@ abstract class FlowLogic { */ val serviceHub: ServiceHub get() = stateMachine.serviceHub + /** + * Creates a communication session with [destination]. Subsequently you may send/receive using this session object. How the messaging + * is routed depends on the [Destination] type, including whether this call does any initial communication. + */ + @Suspendable + fun initiateFlow(destination: Destination): FlowSession = stateMachine.initiateFlow(destination) + /** * Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note * that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive. @@ -252,7 +259,6 @@ abstract class FlowLogic { return sendAndReceiveWithRetry(R::class.java, payload) } - /** Suspends until a message has been received for each session in the specified [sessions]. * * Consider [receiveAll(receiveType: Class, sessions: List): List>] when the same type is expected from all sessions. diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt b/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt index b1782f5424..ac16d6897d 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt @@ -2,11 +2,12 @@ package net.corda.core.flows import co.paralleluniverse.fibers.Suspendable import net.corda.core.DoNotImplement +import net.corda.core.KeepForDJVM +import net.corda.core.identity.AnonymousParty import net.corda.core.identity.Party import net.corda.core.utilities.UntrustworthyData /** - * * A [FlowSession] is a handle on a communication sequence between two paired flows, possibly running on separate nodes. * It is used to send and receive messages between the flows as well as to query information about the counter-flow. * @@ -45,8 +46,18 @@ import net.corda.core.utilities.UntrustworthyData @DoNotImplement abstract class FlowSession { /** - * The [Party] on the other side of this session. In the case of a session created by [FlowLogic.initiateFlow] - * [counterparty] is the same Party as the one passed to that function. + * The [Destination] on the other side of this session. In the case of a session created by [FlowLogic.initiateFlow] this is the same + * destination as the one passed to that function. + */ + abstract val destination: Destination + + /** + * If the destination on the other side of this session is a [Party] then returns that, otherwise throws [IllegalStateException]. + * + * Only use this method if it's known the other side is a [Party], otherwise use [destination]. + * + * @throws IllegalStateException if the other side is not a [Party]. + * @see destination */ abstract val counterparty: Party @@ -181,3 +192,14 @@ abstract class FlowSession { @Suspendable abstract fun send(payload: Any) } + +/** + * An abstraction for flow session destinations. A flow can send to and receive from objects which implement this interface. The specifics + * of how the messages are routed depend on the implementation. + * + * Corda currently only supports a fixed set of destination types, namely [Party] and [AnonymousParty]. New destination types will be added + * in future releases. + */ +@DoNotImplement +@KeepForDJVM +interface Destination diff --git a/core/src/main/kotlin/net/corda/core/identity/AnonymousParty.kt b/core/src/main/kotlin/net/corda/core/identity/AnonymousParty.kt index 9e4ae47a0a..562067b7fd 100644 --- a/core/src/main/kotlin/net/corda/core/identity/AnonymousParty.kt +++ b/core/src/main/kotlin/net/corda/core/identity/AnonymousParty.kt @@ -3,15 +3,22 @@ package net.corda.core.identity import net.corda.core.KeepForDJVM import net.corda.core.contracts.PartyAndReference import net.corda.core.crypto.toStringShort +import net.corda.core.flows.Destination +import net.corda.core.flows.FlowLogic import net.corda.core.utilities.OpaqueBytes import java.security.PublicKey /** * The [AnonymousParty] class contains enough information to uniquely identify a [Party] while excluding private * information such as name. It is intended to represent a party on the distributed ledger. + * + * ### Flow sessions + * + * Anonymous parties can be used to communicate using the [FlowLogic.initiateFlow] method. Message routing is simply routing to the well-known + * [Party] the anonymous party belongs to. This mechanism assumes the party initiating the communication knows who the anonymous party is. */ @KeepForDJVM -class AnonymousParty(owningKey: PublicKey) : AbstractParty(owningKey) { +class AnonymousParty(owningKey: PublicKey) : Destination, AbstractParty(owningKey) { override fun nameOrNull(): CordaX500Name? = null override fun ref(bytes: OpaqueBytes): PartyAndReference = PartyAndReference(this, bytes) override fun toString() = "Anonymous(${owningKey.toStringShort()})" diff --git a/core/src/main/kotlin/net/corda/core/identity/Party.kt b/core/src/main/kotlin/net/corda/core/identity/Party.kt index 828128bf5f..151223bf43 100644 --- a/core/src/main/kotlin/net/corda/core/identity/Party.kt +++ b/core/src/main/kotlin/net/corda/core/identity/Party.kt @@ -4,6 +4,8 @@ import net.corda.core.KeepForDJVM import net.corda.core.contracts.PartyAndReference import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.Crypto +import net.corda.core.flows.Destination +import net.corda.core.flows.FlowLogic import net.corda.core.utilities.OpaqueBytes import java.security.PublicKey import java.security.cert.X509Certificate @@ -25,10 +27,15 @@ import java.security.cert.X509Certificate * * Note that equality is based solely on the owning key. * + * ### Flow sessions + * + * Communication with other parties is done using the flow framework with the [FlowLogic.initiateFlow] method. Message routing is done by + * using the network map to look up the connectivity details pertaining to the [Party]. + * * @see CompositeKey */ @KeepForDJVM -class Party(val name: CordaX500Name, owningKey: PublicKey) : AbstractParty(owningKey) { +class Party(val name: CordaX500Name, owningKey: PublicKey) : Destination, AbstractParty(owningKey) { constructor(certificate: X509Certificate) : this(CordaX500Name.build(certificate.subjectX500Principal), Crypto.toSupportedPublicKey(certificate.publicKey)) diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt index d2c8222b0d..83fc40a152 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt @@ -5,10 +5,7 @@ import net.corda.core.DeleteForDJVM import net.corda.core.DoNotImplement import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.FlowSession -import net.corda.core.flows.FlowStackSnapshot -import net.corda.core.flows.StateMachineRunId +import net.corda.core.flows.* import net.corda.core.identity.Party import net.corda.core.node.ServiceHub import org.slf4j.Logger @@ -21,7 +18,7 @@ interface FlowStateMachine { fun suspend(ioRequest: FlowIORequest, maySkipCheckpoint: Boolean): SUSPENDRETURN @Suspendable - fun initiateFlow(party: Party): FlowSession + fun initiateFlow(destination: Destination): FlowSession fun checkFlowPermission(permissionName: String, extraAuditData: Map) diff --git a/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt b/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt index b8f2bc7757..a5daf72f87 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt @@ -5,6 +5,9 @@ import net.corda.core.DoNotImplement import net.corda.core.contracts.PartyAndReference import net.corda.core.crypto.toStringShort import net.corda.core.identity.* +import net.corda.core.internal.hash +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import java.security.InvalidAlgorithmParameterException import java.security.PublicKey import java.security.cert.* @@ -24,6 +27,10 @@ interface IdentityService { val trustAnchor: TrustAnchor val caCertStore: CertStore + companion object { + private val log = contextLogger() + } + /** * Verify and then store an identity. * @@ -94,6 +101,7 @@ interface IdentityService { // The original version of this would return the party as-is if it was a Party (rather than AnonymousParty), // however that means that we don't verify that we know who owns the key. As such as now enforce turning the key // into a party, and from there figure out the well known party. + log.debug { "Attempting to find wellKnownParty for: ${party.owningKey.hash}" } val candidate = partyFromKey(party.owningKey) // TODO: This should be done via the network map cache, which is the authoritative source of well known identities return if (candidate != null) { diff --git a/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt b/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt index 8a77699eeb..376b0ced1b 100644 --- a/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt +++ b/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt @@ -6,13 +6,11 @@ import net.corda.core.contracts.Command import net.corda.core.contracts.StateAndContract import net.corda.core.contracts.requireThat import net.corda.core.flows.mixins.WithContracts -import net.corda.core.identity.CordaX500Name -import net.corda.core.identity.Party -import net.corda.core.identity.excludeHostNode -import net.corda.core.identity.groupAbstractPartyByWellKnownParty +import net.corda.core.identity.* import net.corda.core.node.services.IdentityService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder +import net.corda.node.services.api.IdentityServiceInternal import net.corda.testing.contracts.DummyContract import net.corda.testing.core.* import net.corda.testing.internal.matchers.flow.willReturn @@ -23,8 +21,11 @@ import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.TestStartedNode import net.corda.testing.node.internal.enclosedCordapp +import org.hamcrest.CoreMatchers.`is` import org.junit.AfterClass +import org.junit.Assert import org.junit.Test +import java.security.PublicKey class CollectSignaturesFlowTests : WithContracts { companion object { @@ -55,11 +56,59 @@ class CollectSignaturesFlowTests : WithContracts { aliceNode.verifyAndRegister(bConfidentialIdentity) assertThat( - aliceNode.startTestFlow(alice, bConfidentialIdentity.party, charlie), + aliceNode.startTestFlow(alice, bConfidentialIdentity.party, charlie), willReturn(requiredSignatures(3)) ) } + @Test + fun `successfully collects signatures when sessions are initiated with AnonymousParty`() { + val aConfidentialIdentity1 = aliceNode.createConfidentialIdentity(alice) + val bConfidentialIdentity1 = bobNode.createConfidentialIdentity(bob) + val bConfidentialIdentity2 = bobNode.createConfidentialIdentity(bob) + val cConfidentialIdentity1 = charlieNode.createConfidentialIdentity(charlie) + + bobNode.registerInitiatedFlow(AnonymousSessionTestFlowResponder::class.java) + charlieNode.registerInitiatedFlow(AnonymousSessionTestFlowResponder::class.java) + + val owners = listOf(aConfidentialIdentity1, bConfidentialIdentity1, bConfidentialIdentity2, cConfidentialIdentity1) + + val future = aliceNode.startFlow(AnonymousSessionTestFlow(owners)).resultFuture + mockNet.runNetwork() + val stx = future.get() + val missingSigners = stx.getMissingSigners() + Assert.assertThat(missingSigners, `is`(emptySet())) + } + + @Test + fun `successfully collects signatures when sessions are initiated with both AnonymousParty and WellKnownParty`() { + val aConfidentialIdentity1 = aliceNode.createConfidentialIdentity(alice) + val bConfidentialIdentity1 = bobNode.createConfidentialIdentity(bob) + val bConfidentialIdentity2 = bobNode.createConfidentialIdentity(bob) + val cConfidentialIdentity1 = charlieNode.createConfidentialIdentity(charlie) + val cConfidentialIdentity2 = charlieNode.createConfidentialIdentity(charlie) + + bobNode.registerInitiatedFlow(MixAndMatchAnonymousSessionTestFlowResponder::class.java) + charlieNode.registerInitiatedFlow(MixAndMatchAnonymousSessionTestFlowResponder::class.java) + + val owners = listOf( + aConfidentialIdentity1, + bConfidentialIdentity1, + bConfidentialIdentity2, + cConfidentialIdentity1, + cConfidentialIdentity2 + ) + + val keysToLookup = listOf(bConfidentialIdentity1.owningKey, bConfidentialIdentity2.owningKey, cConfidentialIdentity1.owningKey) + val keysToKeepAnonymous = listOf(cConfidentialIdentity2.owningKey) + + val future = aliceNode.startFlow(MixAndMatchAnonymousSessionTestFlow(owners, keysToLookup.toSet(), keysToKeepAnonymous.toSet())).resultFuture + mockNet.runNetwork() + val stx = future.get() + val missingSigners = stx.getMissingSigners() + Assert.assertThat(missingSigners, `is`(emptySet())) + } + @Test fun `no need to collect any signatures`() { val ptx = aliceNode.signDummyContract(alice.ref(1)) @@ -97,10 +146,10 @@ class CollectSignaturesFlowTests : WithContracts { //region Operators private fun TestStartedNode.startTestFlow(vararg party: Party) = startFlowAndRunNetwork( - TestFlow.Initiator(DummyContract.MultiOwnerState( - MAGIC_NUMBER, - listOf(*party)), - mockNet.defaultNotaryIdentity)) + TestFlow.Initiator(DummyContract.MultiOwnerState( + MAGIC_NUMBER, + listOf(*party)), + mockNet.defaultNotaryIdentity)) //region Test Flow // With this flow, the initiator starts the "CollectTransactionFlow". It is then the responders responsibility to @@ -144,3 +193,82 @@ class CollectSignaturesFlowTests : WithContracts { } //region } + +@InitiatingFlow +class AnonymousSessionTestFlow(private val cis: List) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + + for (ci in cis) { + if (ci.name != ourIdentity.name) { + (serviceHub.identityService as IdentityServiceInternal).verifyAndRegisterIdentity(ci) + } + } + val state = DummyContract.MultiOwnerState(owners = cis.map { AnonymousParty(it.owningKey) }) + val create = net.corda.testing.contracts.DummyContract.Commands.Create() + val txBuilder = TransactionBuilder(notary = serviceHub.networkMapCache.notaryIdentities.first()) + .addOutputState(state) + .addCommand(create, cis.map { it.owningKey }) + + + val ourKey = cis.single { it.name == ourIdentity.name }.owningKey + val signedByUsTx = serviceHub.signInitialTransaction(txBuilder, ourKey) + val sessionsToCollectFrom = cis.filter { it.name != ourIdentity.name }.map { initiateFlow(AnonymousParty(it.owningKey)) } + return subFlow(CollectSignaturesFlow(signedByUsTx, sessionsToCollectFrom, myOptionalKeys = listOf(ourKey))) + } +} + +@InitiatedBy(AnonymousSessionTestFlow::class) +class AnonymousSessionTestFlowResponder(private val otherSideSession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val signFlow = object : SignTransactionFlow(otherSideSession) { + @Suspendable + override fun checkTransaction(stx: SignedTransaction) = requireThat { + } + } + subFlow(signFlow) + } +} + +@InitiatingFlow +class MixAndMatchAnonymousSessionTestFlow(private val cis: List, + private val keysToLookUp: Set, + private val keysToKeepAnonymous: Set) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + + for (ci in cis) { + if (ci.name != ourIdentity.name) { + (serviceHub.identityService as IdentityServiceInternal).verifyAndRegisterIdentity(ci) + } + } + val state = DummyContract.MultiOwnerState(owners = cis.map { AnonymousParty(it.owningKey) }) + val create = net.corda.testing.contracts.DummyContract.Commands.Create() + val txBuilder = TransactionBuilder(notary = serviceHub.networkMapCache.notaryIdentities.first()) + .addOutputState(state) + .addCommand(create, cis.map { it.owningKey }) + + + val ourKey = cis.single { it.name == ourIdentity.name }.owningKey + val signedByUsTx = serviceHub.signInitialTransaction(txBuilder, ourKey) + + val resolvedParties = keysToLookUp.map { serviceHub.identityService.wellKnownPartyFromAnonymous(AnonymousParty(it))!! }.toSet() + val anonymousParties = keysToKeepAnonymous.map { AnonymousParty(it) } + val sessionsToCollectFrom = (resolvedParties + anonymousParties).map { initiateFlow(it as Destination) } + return subFlow(CollectSignaturesFlow(signedByUsTx, sessionsToCollectFrom, myOptionalKeys = listOf(ourKey))) + } +} + +@InitiatedBy(MixAndMatchAnonymousSessionTestFlow::class) +class MixAndMatchAnonymousSessionTestFlowResponder(private val otherSideSession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val signFlow = object : SignTransactionFlow(otherSideSession) { + @Suspendable + override fun checkTransaction(stx: SignedTransaction) = requireThat { + } + } + subFlow(signFlow) + } +} diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 0a66c76efe..21466043ff 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -9,6 +9,10 @@ release, see :doc:`app-upgrade-notes`. Version 5.0 ----------- +* Introduced a new ``Destination`` abstraction for communicating with non-Party destinations using the new ``FlowLogic.initateFlow(Destination)`` + method. ``Party`` and ``AnonymousParty`` have been retrofitted to implement ``Destination``. Initiating a flow to an ``AnonymousParty`` + means resolving to the well-known identity ``Party`` and then communicating with that. + * Removed ``finance-workflows`` dependency on jackson library. The functions that used jackson (e.g. ``FinanceJSONSupport``) have been moved into IRS Demo. diff --git a/finance/workflows/src/main/kotlin/net/corda/finance/flows/CashPaymentFlow.kt b/finance/workflows/src/main/kotlin/net/corda/finance/flows/CashPaymentFlow.kt index 571d427981..9d59eace0f 100644 --- a/finance/workflows/src/main/kotlin/net/corda/finance/flows/CashPaymentFlow.kt +++ b/finance/workflows/src/main/kotlin/net/corda/finance/flows/CashPaymentFlow.kt @@ -5,6 +5,7 @@ import net.corda.confidential.SwapIdentitiesFlow import net.corda.core.contracts.Amount import net.corda.core.contracts.InsufficientBalanceException import net.corda.core.flows.* +import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.TransactionBuilder @@ -52,7 +53,7 @@ open class CashPaymentFlow( progressTracker.currentStep = GENERATING_ID val recipientSession = initiateFlow(recipient) recipientSession.send(anonymous) - val anonymousRecipient = if (anonymous) { + val anonymousRecipient: AbstractParty = if (anonymous) { subFlow(SwapIdentitiesFlow(recipientSession))[recipient]!! } else { recipient diff --git a/finance/workflows/src/main/kotlin/net/corda/finance/workflows/asset/CashUtils.kt b/finance/workflows/src/main/kotlin/net/corda/finance/workflows/asset/CashUtils.kt index 2d0fe55424..592ae2f5ac 100644 --- a/finance/workflows/src/main/kotlin/net/corda/finance/workflows/asset/CashUtils.kt +++ b/finance/workflows/src/main/kotlin/net/corda/finance/workflows/asset/CashUtils.kt @@ -154,7 +154,7 @@ object CashUtils { // If anonymous is true, generate a new identity that change will be sent to for confidentiality purposes. This means that a // third party with a copy of the transaction (such as the notary) cannot identify who the change was // sent to - val changeIdentity = if (anonymous) services.keyManagementService.freshKeyAndCert(ourIdentity, revocationEnabled).party.anonymise() else ourIdentity.party + val changeIdentity: AbstractParty = if (anonymous) services.keyManagementService.freshKeyAndCert(ourIdentity, revocationEnabled).party.anonymise() else ourIdentity.party return OnLedgerAsset.generateSpend( tx, payments, diff --git a/node/src/main/kotlin/net/corda/node/services/api/IdentityServiceInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/IdentityServiceInternal.kt index 070e5b0ed1..94a2199611 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/IdentityServiceInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/IdentityServiceInternal.kt @@ -1,6 +1,7 @@ package net.corda.node.services.api import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate import net.corda.core.internal.CertRole import net.corda.core.node.services.IdentityService @@ -8,6 +9,7 @@ import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.crypto.x509Certificates import java.security.InvalidAlgorithmParameterException +import java.security.PublicKey import java.security.cert.CertPathValidatorException import java.security.cert.CertificateExpiredException import java.security.cert.CertificateNotYetValidException diff --git a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt index a8ed927ee3..56ebdf98a5 100644 --- a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt +++ b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt @@ -9,6 +9,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.MAX_HASH_HEX_SIZE import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug +import net.corda.core.utilities.toBase58String import net.corda.node.services.api.IdentityServiceInternal import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.crypto.X509CertificateFactory @@ -170,7 +171,9 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri } // We give the caller a copy of the data set to avoid any locking problems - override fun getAllIdentities(): Iterable = database.transaction { keyToParties.allPersisted().map { it.second }.asIterable() } + override fun getAllIdentities(): Iterable = database.transaction { + keyToParties.allPersisted().map { it.second }.asIterable() + } override fun wellKnownPartyFromX500Name(name: CordaX500Name): Party? = certificateFromCordaX500Name(name)?.party diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt index a785347ff3..925fdb27e1 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt @@ -1,6 +1,7 @@ package net.corda.node.services.statemachine import net.corda.core.crypto.SecureHash +import net.corda.core.flows.Destination import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party import net.corda.core.internal.FlowAsyncOperation @@ -19,10 +20,10 @@ sealed class Action { data class TrackTransaction(val hash: SecureHash) : Action() /** - * Send an initial session message to [party]. + * Send an initial session message to [destination]. */ data class SendInitial( - val party: Party, + val destination: Destination, val initialise: InitialSessionMessage, val deduplicationId: SenderDeduplicationId ) : Action() diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index bd2e5a5169..6a2faba893 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -168,7 +168,7 @@ class ActionExecutorImpl( @Suspendable private fun executeSendInitial(action: Action.SendInitial) { - flowMessaging.sendSessionMessage(action.party, action.initialise, action.deduplicationId) + flowMessaging.sendSessionMessage(action.destination, action.initialise, action.deduplicationId) } @Suspendable diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt index e08d2d5884..17fff1da60 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt @@ -1,5 +1,6 @@ package net.corda.node.services.statemachine +import net.corda.core.flows.Destination import net.corda.core.flows.FlowLogic import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest @@ -69,9 +70,8 @@ sealed class Event { * * Initiate a flow. This causes a new session object to be created and returned to the flow. Note that no actual * communication takes place at this time, only on the first send/receive operation on the session. - * @param party the [Party] to create a session with. */ - data class InitiateFlow(val party: Party) : Event() + data class InitiateFlow(val destination: Destination) : Event() /** * Signal the entering into a subflow. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt index 1ca7490e7d..8963c7616f 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt @@ -3,7 +3,9 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable import com.esotericsoftware.kryo.KryoException import net.corda.core.context.InvocationOrigin +import net.corda.core.flows.Destination import net.corda.core.flows.FlowException +import net.corda.core.identity.AnonymousParty import net.corda.core.identity.Party import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.serialize @@ -20,11 +22,10 @@ import java.io.NotSerializableException */ interface FlowMessaging { /** - * Send [message] to [party] using [deduplicationId]. Optionally [acknowledgementHandler] may be specified to - * listen on the send acknowledgement. + * Send [message] to [destination] using [deduplicationId]. */ @Suspendable - fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: SenderDeduplicationId) + fun sendSessionMessage(destination: Destination, message: SessionMessage, deduplicationId: SenderDeduplicationId) /** * Start the messaging using the [onMessage] message handler. @@ -49,10 +50,20 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging { } @Suspendable - override fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: SenderDeduplicationId) { - log.trace { "Sending message $deduplicationId $message to party $party" } + override fun sendSessionMessage(destination: Destination, message: SessionMessage, deduplicationId: SenderDeduplicationId) { + val party = if (destination is Party) { + log.trace { "Sending message $deduplicationId $message to $destination" } + destination + } else { + // We assume that the destination type has already been checked by initiateFlow + val wellKnown = requireNotNull(serviceHub.identityService.wellKnownPartyFromAnonymous(destination as AnonymousParty)) { + "We do not know who $destination belongs to" + } + log.trace { "Sending message $deduplicationId $message to $wellKnown on behalf of $destination" } + wellKnown + } val networkMessage = serviceHub.networkService.createMessage(sessionTopic, serializeSessionMessage(message).bytes, deduplicationId, message.additionalHeaders(party)) - val partyInfo = serviceHub.networkMapCache.getPartyInfo(party) ?: throw IllegalArgumentException("Don't know about $party") + val partyInfo = requireNotNull(serviceHub.networkMapCache.getPartyInfo(party)) { "Don't know about $party" } val address = serviceHub.networkService.getAddressOfParty(partyInfo) val sequenceKey = when (message) { is InitialSessionMessage -> message.initiatorSessionId diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt index 364546ce9d..3ddc6151d3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt @@ -2,43 +2,42 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Fiber import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.Destination import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowSession import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.checkPayloadIs import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.serialize import net.corda.core.utilities.NonEmptySet import net.corda.core.utilities.UntrustworthyData -import net.corda.core.internal.checkPayloadIs class FlowSessionImpl( - override val counterparty: Party, + override val destination: Destination, val sourceSessionId: SessionId ) : FlowSession() { - override fun toString() = "FlowSessionImpl(counterparty=$counterparty, sourceSessionId=$sourceSessionId)" + override val counterparty: Party get() = checkNotNull(destination as? Party) { "$destination is not a Party" } - override fun equals(other: Any?): Boolean { - return (other as? FlowSessionImpl)?.sourceSessionId == sourceSessionId - } + override fun toString(): String = "FlowSessionImpl(destination=$destination, sourceSessionId=$sourceSessionId)" - override fun hashCode() = sourceSessionId.hashCode() + override fun equals(other: Any?): Boolean = other === this || other is FlowSessionImpl && other.sourceSessionId == sourceSessionId - private fun getFlowStateMachine(): FlowStateMachine<*> { - return Fiber.currentFiber() as FlowStateMachine<*> - } + override fun hashCode(): Int = sourceSessionId.hashCode() + + private val flowStateMachine: FlowStateMachine<*> get() = Fiber.currentFiber() as FlowStateMachine<*> @Suspendable override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo { val request = FlowIORequest.GetFlowInfo(NonEmptySet.of(this)) - return getFlowStateMachine().suspend(request, maySkipCheckpoint)[this]!! + return flowStateMachine.suspend(request, maySkipCheckpoint).getValue(this) } @Suspendable - override fun getCounterpartyFlowInfo() = getCounterpartyFlowInfo(maySkipCheckpoint = false) + override fun getCounterpartyFlowInfo(): FlowInfo = getCounterpartyFlowInfo(maySkipCheckpoint = false) @Suspendable override fun sendAndReceive( @@ -51,8 +50,8 @@ class FlowSessionImpl( sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)), shouldRetrySend = false ) - val responseValues: Map> = getFlowStateMachine().suspend(request, maySkipCheckpoint) - val responseForCurrentSession = responseValues[this]!! + val responseValues: Map> = flowStateMachine.suspend(request, maySkipCheckpoint) + val responseForCurrentSession = responseValues.getValue(this) return responseForCurrentSession.checkPayloadIs(receiveType) } @@ -64,7 +63,7 @@ class FlowSessionImpl( override fun receive(receiveType: Class, maySkipCheckpoint: Boolean): UntrustworthyData { enforceNotPrimitive(receiveType) val request = FlowIORequest.Receive(NonEmptySet.of(this)) - return getFlowStateMachine().suspend(request, maySkipCheckpoint)[this]!!.checkPayloadIs(receiveType) + return flowStateMachine.suspend(request, maySkipCheckpoint).getValue(this).checkPayloadIs(receiveType) } @Suspendable @@ -75,7 +74,7 @@ class FlowSessionImpl( val request = FlowIORequest.Send( sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)) ) - return getFlowStateMachine().suspend(request, maySkipCheckpoint) + return flowStateMachine.suspend(request, maySkipCheckpoint) } @Suspendable diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 36a4e1ecb0..68ad5d93d8 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -10,6 +10,7 @@ import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext import net.corda.core.cordapp.Cordapp import net.corda.core.flows.* +import net.corda.core.identity.AnonymousParty import net.corda.core.identity.Party import net.corda.core.internal.* import net.corda.core.serialization.internal.CheckpointSerializationContext @@ -230,7 +231,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, "Transaction context is missing. This might happen if a suspendable method is not annotated with @Suspendable annotation." } } else { - require(contextTransactionOrNull == null){"Transaction is marked as not present, but is not null"} + require(contextTransactionOrNull == null) { "Transaction is marked as not present, but is not null" } } } @@ -354,9 +355,10 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } @Suspendable - override fun initiateFlow(party: Party): FlowSession { + override fun initiateFlow(destination: Destination): FlowSession { + require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" } val resume = processEventImmediately( - Event.InitiateFlow(party), + Event.InitiateFlow(destination), isDbTransactionOpenOnEntry = true, isDbTransactionOpenOnExit = true ) as FlowContinuation.Resume @@ -440,7 +442,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, isDbTransactionOpenOnEntry = true, isDbTransactionOpenOnExit = false ) - require(continuation == FlowContinuation.ProcessEvents){"Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation "} + require(continuation == FlowContinuation.ProcessEvents) { "Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation " } unpark(SERIALIZER_BLOCKER) } return uncheckedCast(processEventsUntilFlowIsResumed( diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 1e059a361e..7816cbf9d5 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -2,6 +2,7 @@ package net.corda.node.services.statemachine import net.corda.core.context.InvocationContext import net.corda.core.crypto.SecureHash +import net.corda.core.flows.Destination import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowLogic import net.corda.core.identity.Party @@ -98,7 +99,7 @@ sealed class SessionState { * We haven't yet sent the initialisation message */ data class Uninitiated( - val party: Party, + val destination: Destination, val initiatingSubFlow: SubFlow.Initiating, val sourceSessionId: SessionId, val additionalEntropy: Long diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 2d4a9b6ddc..3269a87a2f 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -222,7 +222,7 @@ class StartedFlowTransition( deduplicationSeed = sessionState.deduplicationSeed ) val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, newSessionState) - actions.add(Action.SendInitial(sessionState.party, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID))) + actions.add(Action.SendInitial(sessionState.destination, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID))) newSessions[sourceSessionId] = newSessionState } currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions)) @@ -256,7 +256,7 @@ class StartedFlowTransition( when (existingSessionState) { is SessionState.Uninitiated -> { val initialMessage = createInitialSessionMessage(existingSessionState.initiatingSubFlow, sourceSessionId, existingSessionState.additionalEntropy, message) - actions.add(Action.SendInitial(existingSessionState.party, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID))) + actions.add(Action.SendInitial(existingSessionState.destination, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID))) newSessions[sourceSessionId] = SessionState.Initiating( bufferedMessages = emptyList(), rejectionError = null, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index d9e7b7e173..22e08c6b41 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -235,8 +235,8 @@ class TopLevelTransition( return@builder FlowContinuation.ProcessEvents } val sourceSessionId = SessionId.createRandom(context.secureRandom) - val sessionImpl = FlowSessionImpl(event.party, sourceSessionId) - val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.party, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong())) + val sessionImpl = FlowSessionImpl(event.destination, sourceSessionId) + val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.destination, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong())) currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions)) actions.add(Action.AddSessionBinding(context.id, sourceSessionId)) FlowContinuation.Resume(sessionImpl) diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index ccfd98962e..25fbefcf93 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -36,8 +36,7 @@ import net.corda.testing.internal.LogHelper import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin import net.corda.testing.node.internal.* -import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.assertThatThrownBy +import org.assertj.core.api.Assertions.* import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType import org.assertj.core.api.Condition import org.junit.After @@ -463,6 +462,28 @@ class FlowFrameworkTests { ) } + @Test + fun `initiating flow using unknown AnonymousParty`() { + val anonymousBob = bobNode.services.keyManagementService.freshKeyAndCert(bobNode.info.legalIdentitiesAndCerts.single(), false).party.anonymise() + bobNode.registerCordappFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) } + val result = aliceNode.services.startFlow(SendAndReceiveFlow(anonymousBob, "Hello")).resultFuture + mockNet.runNetwork() + assertThatIllegalArgumentException() + .isThrownBy { result.getOrThrow() } + .withMessage("We do not know who $anonymousBob belongs to") + } + + @Test + fun `initiating flow using known AnonymousParty`() { + val anonymousBob = bobNode.services.keyManagementService.freshKeyAndCert(bobNode.info.legalIdentitiesAndCerts.single(), false) + aliceNode.services.identityService.verifyAndRegisterIdentity(anonymousBob) + val bobResponderFlow = bobNode.registerCordappFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) } + val result = aliceNode.services.startFlow(SendAndReceiveFlow(anonymousBob.party.anonymise(), "Hello")).resultFuture + mockNet.runNetwork() + bobResponderFlow.getOrThrow() + assertThat(result.getOrThrow()).isEqualTo("HelloHello") + } + //region Helpers private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0) @@ -762,12 +783,12 @@ internal class MyFlowException(override val message: String) : FlowException() { internal class MyPeerFlowException(override val message: String, val peer: Party) : FlowException() @InitiatingFlow -internal class SendAndReceiveFlow(private val otherParty: Party, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic() { +internal class SendAndReceiveFlow(private val destination: Destination, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic() { constructor(otherPartySession: FlowSession, payload: Any) : this(otherPartySession.counterparty, payload, otherPartySession) @Suspendable override fun call(): Any { - return (otherPartySession ?: initiateFlow(otherParty)).sendAndReceive(payload).unwrap { it } + return (otherPartySession ?: initiateFlow(destination)).sendAndReceive(payload).unwrap { it } } } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index 776c8eb9f6..b3a295dac9 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -142,7 +142,8 @@ class RetryFlowMockTest { val alice = TestIdentity(CordaX500Name.parse("L=London,O=Alice Ltd,OU=Trade,C=GB")).party val records = nodeA.smm.flowHospital.track().updates.toBlocking().toIterable().iterator() val flow: FlowStateMachine = nodeA.services.startFlow(FinalityHandler(object : FlowSession() { - override val counterparty = alice + override val destination: Destination get() = alice + override val counterparty: Party get() = alice override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo { TODO("not implemented")