Merge pull request #5245 from corda/shams-merge-cid-878

CID-878 Flows: Create flow sessions with anonymous parties by the introduction (and the introduction of Destination abstraction)
This commit is contained in:
Shams Asari 2019-06-24 17:26:09 +01:00 committed by GitHub
commit 48b0de5e57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 349 additions and 80 deletions

View File

@ -2897,7 +2897,7 @@ public abstract class net.corda.core.identity.AbstractParty extends java.lang.Ob
##
@DoNotImplement
@CordaSerializable
public final class net.corda.core.identity.AnonymousParty extends net.corda.core.identity.AbstractParty
public final class net.corda.core.identity.AnonymousParty extends net.corda.core.identity.AbstractParty implements net.corda.core.flows.Destination
public <init>(java.security.PublicKey)
@Nullable
public net.corda.core.identity.CordaX500Name nameOrNull()
@ -2978,7 +2978,7 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec
##
@DoNotImplement
@CordaSerializable
public final class net.corda.core.identity.Party extends net.corda.core.identity.AbstractParty
public final class net.corda.core.identity.Party extends net.corda.core.identity.AbstractParty implements net.corda.core.flows.Destination
public <init>(java.security.cert.X509Certificate)
public <init>(net.corda.core.identity.CordaX500Name, java.security.PublicKey)
@NotNull

View File

@ -3,8 +3,11 @@ package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.TransactionSignature
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.identity.groupPublicKeysByWellKnownParty
import net.corda.core.internal.toMultiMap
import net.corda.core.node.ServiceHub
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
@ -65,7 +68,12 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig
val sessionsToCollectFrom: Collection<FlowSession>,
val myOptionalKeys: Iterable<PublicKey>?,
override val progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : FlowLogic<SignedTransaction>() {
@JvmOverloads constructor(partiallySignedTx: SignedTransaction, sessionsToCollectFrom: Collection<FlowSession>, progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : this(partiallySignedTx, sessionsToCollectFrom, null, progressTracker)
@JvmOverloads
constructor(
partiallySignedTx: SignedTransaction,
sessionsToCollectFrom: Collection<FlowSession>,
progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()
) : this(partiallySignedTx, sessionsToCollectFrom, null, progressTracker)
companion object {
object COLLECTING : ProgressTracker.Step("Collecting signatures from counterparties.")
@ -75,7 +83,6 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig
fun tracker() = ProgressTracker(COLLECTING, VERIFYING)
// TODO: Make the progress tracker adapt to the number of counterparties to collect from.
}
@Suspendable
@ -105,14 +112,61 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig
// If the unsigned counterparties list is empty then we don't need to collect any more signatures here.
if (unsigned.isEmpty()) return partiallySignedTx
val partyToKeysMap = groupPublicKeysByWellKnownParty(serviceHub, unsigned)
// Check that we have a session for all parties. No more, no less.
require(sessionsToCollectFrom.map { it.counterparty }.toSet() == partyToKeysMap.keys) {
val setOfAllSessionKeys: Map<PublicKey, FlowSession> = sessionsToCollectFrom
.groupBy {
val destination = it.destination
when (destination) {
is Party -> destination.owningKey
is AnonymousParty -> destination.owningKey
else -> throw IllegalArgumentException("Signatures can only be collected from Party or AnonymousParty, not $destination")
}
}
.mapValues {
require(it.value.size == 1) { "There are multiple sessions initiated for party key ${it.key.toStringShort()}" }
it.value.first()
}
val partyToKeysItSignsFor: Map<Party, List<PublicKey>> = groupPublicKeysByWellKnownParty(serviceHub, unsigned)
val keyToSigningParty: Map<PublicKey, Party> = partyToKeysItSignsFor
.flatMap { (wellKnown, allKeysItSignsFor) -> allKeysItSignsFor.map { it to wellKnown } }
.toMap()
val unrelatedSessions = sessionsToCollectFrom.filterNot {
if (it.destination is Party) {
// The session must have a corresponding unsigned.
it.destination in partyToKeysItSignsFor
} else {
// setOfAllSessionKeys has already checked for valid destination types so we can safely cast to AnonoymousParty here.
// This session was not initiated by a wellKnownParty so must directly exist in the unsigned.
(it.destination as AnonymousParty).owningKey in unsigned
}
}
val keyToSessionList = unsigned.map {
val session = setOfAllSessionKeys[it]
if (session != null) {
// The unsigned key exists directly as a sessionKey, so use that session
it to session
} else {
// It might be delegated to a wellKnownParty
val wellKnownParty = checkNotNull(keyToSigningParty[it]) { "Could not find a session or wellKnown party for key ${it.toStringShort()}" }
// There is a wellKnownParty for this key, check if it has a session, and if so - use that session
val sessionForWellKnownParty = checkNotNull(setOfAllSessionKeys[wellKnownParty.owningKey]) {
"No session available to request signature for key: ${it.toStringShort()}"
}
it to sessionForWellKnownParty
}
}
// Now invert the map to find the keys per session
val sessionToKeysMap = keyToSessionList.map { it.second to it.first }.toMultiMap()
require(unrelatedSessions.isEmpty()) {
"The Initiator of CollectSignaturesFlow must pass in exactly the sessions required to sign the transaction."
}
// Collect signatures from all counterparties and append them to the partially signed transaction.
val counterpartySignatures = sessionsToCollectFrom.flatMap { session ->
subFlow(CollectSignatureFlow(partiallySignedTx, session, partyToKeysMap[session.counterparty]!!))
val counterpartySignatures = sessionToKeysMap.flatMap { (session, keys) ->
subFlow(CollectSignatureFlow(partiallySignedTx, session, keys))
}
val stx = partiallySignedTx + counterpartySignatures
@ -196,7 +250,7 @@ class CollectSignatureFlow(val partiallySignedTx: SignedTransaction, val session
* @param otherSideSession The session which is providing you a transaction to sign.
*/
abstract class SignTransactionFlow @JvmOverloads constructor(val otherSideSession: FlowSession,
override val progressTracker: ProgressTracker = SignTransactionFlow.tracker()) : FlowLogic<SignedTransaction>() {
override val progressTracker: ProgressTracker = SignTransactionFlow.tracker()) : FlowLogic<SignedTransaction>() {
companion object {
object RECEIVING : ProgressTracker.Step("Receiving transaction proposal for signing.")
@ -247,12 +301,6 @@ abstract class SignTransactionFlow @JvmOverloads constructor(val otherSideSessio
@Suspendable
private fun checkSignatures(stx: SignedTransaction) {
// We set `ignoreUnrecognisedParties` to `true` in `groupPublicKeysByWellKnownParty`. This is because we don't
// need to recognise all keys, but just the initiator's.
val signingWellKnownIdentities = groupPublicKeysByWellKnownParty(serviceHub, stx.sigs.map(TransactionSignature::by), true)
require(otherSideSession.counterparty in signingWellKnownIdentities) {
"The Initiator of CollectSignaturesFlow must have signed the transaction. Found $signingWellKnownIdentities, expected $otherSideSession"
}
val signed = stx.sigs.map { it.by }
val allSigners = stx.tx.requiredSigningKeys
val notSigned = allSigners - signed

View File

@ -115,6 +115,13 @@ abstract class FlowLogic<out T> {
*/
val serviceHub: ServiceHub get() = stateMachine.serviceHub
/**
* Creates a communication session with [destination]. Subsequently you may send/receive using this session object. How the messaging
* is routed depends on the [Destination] type, including whether this call does any initial communication.
*/
@Suspendable
fun initiateFlow(destination: Destination): FlowSession = stateMachine.initiateFlow(destination)
/**
* Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note
* that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
@ -252,7 +259,6 @@ abstract class FlowLogic<out T> {
return sendAndReceiveWithRetry(R::class.java, payload)
}
/** Suspends until a message has been received for each session in the specified [sessions].
*
* Consider [receiveAll(receiveType: Class<R>, sessions: List<FlowSession>): List<UntrustworthyData<R>>] when the same type is expected from all sessions.

View File

@ -2,11 +2,12 @@ package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.DoNotImplement
import net.corda.core.KeepForDJVM
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.utilities.UntrustworthyData
/**
*
* A [FlowSession] is a handle on a communication sequence between two paired flows, possibly running on separate nodes.
* It is used to send and receive messages between the flows as well as to query information about the counter-flow.
*
@ -45,8 +46,18 @@ import net.corda.core.utilities.UntrustworthyData
@DoNotImplement
abstract class FlowSession {
/**
* The [Party] on the other side of this session. In the case of a session created by [FlowLogic.initiateFlow]
* [counterparty] is the same Party as the one passed to that function.
* The [Destination] on the other side of this session. In the case of a session created by [FlowLogic.initiateFlow] this is the same
* destination as the one passed to that function.
*/
abstract val destination: Destination
/**
* If the destination on the other side of this session is a [Party] then returns that, otherwise throws [IllegalStateException].
*
* Only use this method if it's known the other side is a [Party], otherwise use [destination].
*
* @throws IllegalStateException if the other side is not a [Party].
* @see destination
*/
abstract val counterparty: Party
@ -181,3 +192,14 @@ abstract class FlowSession {
@Suspendable
abstract fun send(payload: Any)
}
/**
* An abstraction for flow session destinations. A flow can send to and receive from objects which implement this interface. The specifics
* of how the messages are routed depend on the implementation.
*
* Corda currently only supports a fixed set of destination types, namely [Party] and [AnonymousParty]. New destination types will be added
* in future releases.
*/
@DoNotImplement
@KeepForDJVM
interface Destination

View File

@ -3,15 +3,22 @@ package net.corda.core.identity
import net.corda.core.KeepForDJVM
import net.corda.core.contracts.PartyAndReference
import net.corda.core.crypto.toStringShort
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowLogic
import net.corda.core.utilities.OpaqueBytes
import java.security.PublicKey
/**
* The [AnonymousParty] class contains enough information to uniquely identify a [Party] while excluding private
* information such as name. It is intended to represent a party on the distributed ledger.
*
* ### Flow sessions
*
* Anonymous parties can be used to communicate using the [FlowLogic.initiateFlow] method. Message routing is simply routing to the well-known
* [Party] the anonymous party belongs to. This mechanism assumes the party initiating the communication knows who the anonymous party is.
*/
@KeepForDJVM
class AnonymousParty(owningKey: PublicKey) : AbstractParty(owningKey) {
class AnonymousParty(owningKey: PublicKey) : Destination, AbstractParty(owningKey) {
override fun nameOrNull(): CordaX500Name? = null
override fun ref(bytes: OpaqueBytes): PartyAndReference = PartyAndReference(this, bytes)
override fun toString() = "Anonymous(${owningKey.toStringShort()})"

View File

@ -4,6 +4,8 @@ import net.corda.core.KeepForDJVM
import net.corda.core.contracts.PartyAndReference
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.Crypto
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowLogic
import net.corda.core.utilities.OpaqueBytes
import java.security.PublicKey
import java.security.cert.X509Certificate
@ -25,10 +27,15 @@ import java.security.cert.X509Certificate
*
* Note that equality is based solely on the owning key.
*
* ### Flow sessions
*
* Communication with other parties is done using the flow framework with the [FlowLogic.initiateFlow] method. Message routing is done by
* using the network map to look up the connectivity details pertaining to the [Party].
*
* @see CompositeKey
*/
@KeepForDJVM
class Party(val name: CordaX500Name, owningKey: PublicKey) : AbstractParty(owningKey) {
class Party(val name: CordaX500Name, owningKey: PublicKey) : Destination, AbstractParty(owningKey) {
constructor(certificate: X509Certificate)
: this(CordaX500Name.build(certificate.subjectX500Principal), Crypto.toSupportedPublicKey(certificate.publicKey))

View File

@ -5,10 +5,7 @@ import net.corda.core.DeleteForDJVM
import net.corda.core.DoNotImplement
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import org.slf4j.Logger
@ -21,7 +18,7 @@ interface FlowStateMachine<FLOWRETURN> {
fun <SUSPENDRETURN : Any> suspend(ioRequest: FlowIORequest<SUSPENDRETURN>, maySkipCheckpoint: Boolean): SUSPENDRETURN
@Suspendable
fun initiateFlow(party: Party): FlowSession
fun initiateFlow(destination: Destination): FlowSession
fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>)

View File

@ -5,6 +5,9 @@ import net.corda.core.DoNotImplement
import net.corda.core.contracts.PartyAndReference
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.*
import net.corda.core.internal.hash
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import java.security.InvalidAlgorithmParameterException
import java.security.PublicKey
import java.security.cert.*
@ -24,6 +27,10 @@ interface IdentityService {
val trustAnchor: TrustAnchor
val caCertStore: CertStore
companion object {
private val log = contextLogger()
}
/**
* Verify and then store an identity.
*
@ -94,6 +101,7 @@ interface IdentityService {
// The original version of this would return the party as-is if it was a Party (rather than AnonymousParty),
// however that means that we don't verify that we know who owns the key. As such as now enforce turning the key
// into a party, and from there figure out the well known party.
log.debug { "Attempting to find wellKnownParty for: ${party.owningKey.hash}" }
val candidate = partyFromKey(party.owningKey)
// TODO: This should be done via the network map cache, which is the authoritative source of well known identities
return if (candidate != null) {

View File

@ -6,13 +6,11 @@ import net.corda.core.contracts.Command
import net.corda.core.contracts.StateAndContract
import net.corda.core.contracts.requireThat
import net.corda.core.flows.mixins.WithContracts
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.excludeHostNode
import net.corda.core.identity.groupAbstractPartyByWellKnownParty
import net.corda.core.identity.*
import net.corda.core.node.services.IdentityService
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.*
import net.corda.testing.internal.matchers.flow.willReturn
@ -23,8 +21,11 @@ import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.enclosedCordapp
import org.hamcrest.CoreMatchers.`is`
import org.junit.AfterClass
import org.junit.Assert
import org.junit.Test
import java.security.PublicKey
class CollectSignaturesFlowTests : WithContracts {
companion object {
@ -55,11 +56,59 @@ class CollectSignaturesFlowTests : WithContracts {
aliceNode.verifyAndRegister(bConfidentialIdentity)
assertThat(
aliceNode.startTestFlow(alice, bConfidentialIdentity.party, charlie),
aliceNode.startTestFlow(alice, bConfidentialIdentity.party, charlie),
willReturn(requiredSignatures(3))
)
}
@Test
fun `successfully collects signatures when sessions are initiated with AnonymousParty`() {
val aConfidentialIdentity1 = aliceNode.createConfidentialIdentity(alice)
val bConfidentialIdentity1 = bobNode.createConfidentialIdentity(bob)
val bConfidentialIdentity2 = bobNode.createConfidentialIdentity(bob)
val cConfidentialIdentity1 = charlieNode.createConfidentialIdentity(charlie)
bobNode.registerInitiatedFlow(AnonymousSessionTestFlowResponder::class.java)
charlieNode.registerInitiatedFlow(AnonymousSessionTestFlowResponder::class.java)
val owners = listOf(aConfidentialIdentity1, bConfidentialIdentity1, bConfidentialIdentity2, cConfidentialIdentity1)
val future = aliceNode.startFlow(AnonymousSessionTestFlow(owners)).resultFuture
mockNet.runNetwork()
val stx = future.get()
val missingSigners = stx.getMissingSigners()
Assert.assertThat(missingSigners, `is`(emptySet()))
}
@Test
fun `successfully collects signatures when sessions are initiated with both AnonymousParty and WellKnownParty`() {
val aConfidentialIdentity1 = aliceNode.createConfidentialIdentity(alice)
val bConfidentialIdentity1 = bobNode.createConfidentialIdentity(bob)
val bConfidentialIdentity2 = bobNode.createConfidentialIdentity(bob)
val cConfidentialIdentity1 = charlieNode.createConfidentialIdentity(charlie)
val cConfidentialIdentity2 = charlieNode.createConfidentialIdentity(charlie)
bobNode.registerInitiatedFlow(MixAndMatchAnonymousSessionTestFlowResponder::class.java)
charlieNode.registerInitiatedFlow(MixAndMatchAnonymousSessionTestFlowResponder::class.java)
val owners = listOf(
aConfidentialIdentity1,
bConfidentialIdentity1,
bConfidentialIdentity2,
cConfidentialIdentity1,
cConfidentialIdentity2
)
val keysToLookup = listOf(bConfidentialIdentity1.owningKey, bConfidentialIdentity2.owningKey, cConfidentialIdentity1.owningKey)
val keysToKeepAnonymous = listOf(cConfidentialIdentity2.owningKey)
val future = aliceNode.startFlow(MixAndMatchAnonymousSessionTestFlow(owners, keysToLookup.toSet(), keysToKeepAnonymous.toSet())).resultFuture
mockNet.runNetwork()
val stx = future.get()
val missingSigners = stx.getMissingSigners()
Assert.assertThat(missingSigners, `is`(emptySet()))
}
@Test
fun `no need to collect any signatures`() {
val ptx = aliceNode.signDummyContract(alice.ref(1))
@ -97,10 +146,10 @@ class CollectSignaturesFlowTests : WithContracts {
//region Operators
private fun TestStartedNode.startTestFlow(vararg party: Party) =
startFlowAndRunNetwork(
TestFlow.Initiator(DummyContract.MultiOwnerState(
MAGIC_NUMBER,
listOf(*party)),
mockNet.defaultNotaryIdentity))
TestFlow.Initiator(DummyContract.MultiOwnerState(
MAGIC_NUMBER,
listOf(*party)),
mockNet.defaultNotaryIdentity))
//region Test Flow
// With this flow, the initiator starts the "CollectTransactionFlow". It is then the responders responsibility to
@ -144,3 +193,82 @@ class CollectSignaturesFlowTests : WithContracts {
}
//region
}
@InitiatingFlow
class AnonymousSessionTestFlow(private val cis: List<PartyAndCertificate>) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
for (ci in cis) {
if (ci.name != ourIdentity.name) {
(serviceHub.identityService as IdentityServiceInternal).verifyAndRegisterIdentity(ci)
}
}
val state = DummyContract.MultiOwnerState(owners = cis.map { AnonymousParty(it.owningKey) })
val create = net.corda.testing.contracts.DummyContract.Commands.Create()
val txBuilder = TransactionBuilder(notary = serviceHub.networkMapCache.notaryIdentities.first())
.addOutputState(state)
.addCommand(create, cis.map { it.owningKey })
val ourKey = cis.single { it.name == ourIdentity.name }.owningKey
val signedByUsTx = serviceHub.signInitialTransaction(txBuilder, ourKey)
val sessionsToCollectFrom = cis.filter { it.name != ourIdentity.name }.map { initiateFlow(AnonymousParty(it.owningKey)) }
return subFlow(CollectSignaturesFlow(signedByUsTx, sessionsToCollectFrom, myOptionalKeys = listOf(ourKey)))
}
}
@InitiatedBy(AnonymousSessionTestFlow::class)
class AnonymousSessionTestFlowResponder(private val otherSideSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val signFlow = object : SignTransactionFlow(otherSideSession) {
@Suspendable
override fun checkTransaction(stx: SignedTransaction) = requireThat {
}
}
subFlow(signFlow)
}
}
@InitiatingFlow
class MixAndMatchAnonymousSessionTestFlow(private val cis: List<PartyAndCertificate>,
private val keysToLookUp: Set<PublicKey>,
private val keysToKeepAnonymous: Set<PublicKey>) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
for (ci in cis) {
if (ci.name != ourIdentity.name) {
(serviceHub.identityService as IdentityServiceInternal).verifyAndRegisterIdentity(ci)
}
}
val state = DummyContract.MultiOwnerState(owners = cis.map { AnonymousParty(it.owningKey) })
val create = net.corda.testing.contracts.DummyContract.Commands.Create()
val txBuilder = TransactionBuilder(notary = serviceHub.networkMapCache.notaryIdentities.first())
.addOutputState(state)
.addCommand(create, cis.map { it.owningKey })
val ourKey = cis.single { it.name == ourIdentity.name }.owningKey
val signedByUsTx = serviceHub.signInitialTransaction(txBuilder, ourKey)
val resolvedParties = keysToLookUp.map { serviceHub.identityService.wellKnownPartyFromAnonymous(AnonymousParty(it))!! }.toSet()
val anonymousParties = keysToKeepAnonymous.map { AnonymousParty(it) }
val sessionsToCollectFrom = (resolvedParties + anonymousParties).map { initiateFlow(it as Destination) }
return subFlow(CollectSignaturesFlow(signedByUsTx, sessionsToCollectFrom, myOptionalKeys = listOf(ourKey)))
}
}
@InitiatedBy(MixAndMatchAnonymousSessionTestFlow::class)
class MixAndMatchAnonymousSessionTestFlowResponder(private val otherSideSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val signFlow = object : SignTransactionFlow(otherSideSession) {
@Suspendable
override fun checkTransaction(stx: SignedTransaction) = requireThat {
}
}
subFlow(signFlow)
}
}

View File

@ -9,6 +9,10 @@ release, see :doc:`app-upgrade-notes`.
Version 5.0
-----------
* Introduced a new ``Destination`` abstraction for communicating with non-Party destinations using the new ``FlowLogic.initateFlow(Destination)``
method. ``Party`` and ``AnonymousParty`` have been retrofitted to implement ``Destination``. Initiating a flow to an ``AnonymousParty``
means resolving to the well-known identity ``Party`` and then communicating with that.
* Removed ``finance-workflows`` dependency on jackson library. The functions that used jackson (e.g. ``FinanceJSONSupport``) have been moved
into IRS Demo.

View File

@ -5,6 +5,7 @@ import net.corda.confidential.SwapIdentitiesFlow
import net.corda.core.contracts.Amount
import net.corda.core.contracts.InsufficientBalanceException
import net.corda.core.flows.*
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.TransactionBuilder
@ -52,7 +53,7 @@ open class CashPaymentFlow(
progressTracker.currentStep = GENERATING_ID
val recipientSession = initiateFlow(recipient)
recipientSession.send(anonymous)
val anonymousRecipient = if (anonymous) {
val anonymousRecipient: AbstractParty = if (anonymous) {
subFlow(SwapIdentitiesFlow(recipientSession))[recipient]!!
} else {
recipient

View File

@ -154,7 +154,7 @@ object CashUtils {
// If anonymous is true, generate a new identity that change will be sent to for confidentiality purposes. This means that a
// third party with a copy of the transaction (such as the notary) cannot identify who the change was
// sent to
val changeIdentity = if (anonymous) services.keyManagementService.freshKeyAndCert(ourIdentity, revocationEnabled).party.anonymise() else ourIdentity.party
val changeIdentity: AbstractParty = if (anonymous) services.keyManagementService.freshKeyAndCert(ourIdentity, revocationEnabled).party.anonymise() else ourIdentity.party
return OnLedgerAsset.generateSpend(
tx,
payments,

View File

@ -1,6 +1,7 @@
package net.corda.node.services.api
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.CertRole
import net.corda.core.node.services.IdentityService
@ -8,6 +9,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.crypto.x509Certificates
import java.security.InvalidAlgorithmParameterException
import java.security.PublicKey
import java.security.cert.CertPathValidatorException
import java.security.cert.CertificateExpiredException
import java.security.cert.CertificateNotYetValidException

View File

@ -9,6 +9,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.toBase58String
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
@ -170,7 +171,9 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
}
// We give the caller a copy of the data set to avoid any locking problems
override fun getAllIdentities(): Iterable<PartyAndCertificate> = database.transaction { keyToParties.allPersisted().map { it.second }.asIterable() }
override fun getAllIdentities(): Iterable<PartyAndCertificate> = database.transaction {
keyToParties.allPersisted().map { it.second }.asIterable()
}
override fun wellKnownPartyFromX500Name(name: CordaX500Name): Party? = certificateFromCordaX500Name(name)?.party

View File

@ -1,6 +1,7 @@
package net.corda.node.services.statemachine
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.Destination
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.FlowAsyncOperation
@ -19,10 +20,10 @@ sealed class Action {
data class TrackTransaction(val hash: SecureHash) : Action()
/**
* Send an initial session message to [party].
* Send an initial session message to [destination].
*/
data class SendInitial(
val party: Party,
val destination: Destination,
val initialise: InitialSessionMessage,
val deduplicationId: SenderDeduplicationId
) : Action()

View File

@ -168,7 +168,7 @@ class ActionExecutorImpl(
@Suspendable
private fun executeSendInitial(action: Action.SendInitial) {
flowMessaging.sendSessionMessage(action.party, action.initialise, action.deduplicationId)
flowMessaging.sendSessionMessage(action.destination, action.initialise, action.deduplicationId)
}
@Suspendable

View File

@ -1,5 +1,6 @@
package net.corda.node.services.statemachine
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
@ -69,9 +70,8 @@ sealed class Event {
*
* Initiate a flow. This causes a new session object to be created and returned to the flow. Note that no actual
* communication takes place at this time, only on the first send/receive operation on the session.
* @param party the [Party] to create a session with.
*/
data class InitiateFlow(val party: Party) : Event()
data class InitiateFlow(val destination: Destination) : Event()
/**
* Signal the entering into a subflow.

View File

@ -3,7 +3,9 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import com.esotericsoftware.kryo.KryoException
import net.corda.core.context.InvocationOrigin
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowException
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.serialize
@ -20,11 +22,10 @@ import java.io.NotSerializableException
*/
interface FlowMessaging {
/**
* Send [message] to [party] using [deduplicationId]. Optionally [acknowledgementHandler] may be specified to
* listen on the send acknowledgement.
* Send [message] to [destination] using [deduplicationId].
*/
@Suspendable
fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: SenderDeduplicationId)
fun sendSessionMessage(destination: Destination, message: SessionMessage, deduplicationId: SenderDeduplicationId)
/**
* Start the messaging using the [onMessage] message handler.
@ -49,10 +50,20 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging {
}
@Suspendable
override fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: SenderDeduplicationId) {
log.trace { "Sending message $deduplicationId $message to party $party" }
override fun sendSessionMessage(destination: Destination, message: SessionMessage, deduplicationId: SenderDeduplicationId) {
val party = if (destination is Party) {
log.trace { "Sending message $deduplicationId $message to $destination" }
destination
} else {
// We assume that the destination type has already been checked by initiateFlow
val wellKnown = requireNotNull(serviceHub.identityService.wellKnownPartyFromAnonymous(destination as AnonymousParty)) {
"We do not know who $destination belongs to"
}
log.trace { "Sending message $deduplicationId $message to $wellKnown on behalf of $destination" }
wellKnown
}
val networkMessage = serviceHub.networkService.createMessage(sessionTopic, serializeSessionMessage(message).bytes, deduplicationId, message.additionalHeaders(party))
val partyInfo = serviceHub.networkMapCache.getPartyInfo(party) ?: throw IllegalArgumentException("Don't know about $party")
val partyInfo = requireNotNull(serviceHub.networkMapCache.getPartyInfo(party)) { "Don't know about $party" }
val address = serviceHub.networkService.getAddressOfParty(partyInfo)
val sequenceKey = when (message) {
is InitialSessionMessage -> message.initiatorSessionId

View File

@ -2,43 +2,42 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowSession
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.checkPayloadIs
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.internal.checkPayloadIs
class FlowSessionImpl(
override val counterparty: Party,
override val destination: Destination,
val sourceSessionId: SessionId
) : FlowSession() {
override fun toString() = "FlowSessionImpl(counterparty=$counterparty, sourceSessionId=$sourceSessionId)"
override val counterparty: Party get() = checkNotNull(destination as? Party) { "$destination is not a Party" }
override fun equals(other: Any?): Boolean {
return (other as? FlowSessionImpl)?.sourceSessionId == sourceSessionId
}
override fun toString(): String = "FlowSessionImpl(destination=$destination, sourceSessionId=$sourceSessionId)"
override fun hashCode() = sourceSessionId.hashCode()
override fun equals(other: Any?): Boolean = other === this || other is FlowSessionImpl && other.sourceSessionId == sourceSessionId
private fun getFlowStateMachine(): FlowStateMachine<*> {
return Fiber.currentFiber() as FlowStateMachine<*>
}
override fun hashCode(): Int = sourceSessionId.hashCode()
private val flowStateMachine: FlowStateMachine<*> get() = Fiber.currentFiber() as FlowStateMachine<*>
@Suspendable
override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo {
val request = FlowIORequest.GetFlowInfo(NonEmptySet.of(this))
return getFlowStateMachine().suspend(request, maySkipCheckpoint)[this]!!
return flowStateMachine.suspend(request, maySkipCheckpoint).getValue(this)
}
@Suspendable
override fun getCounterpartyFlowInfo() = getCounterpartyFlowInfo(maySkipCheckpoint = false)
override fun getCounterpartyFlowInfo(): FlowInfo = getCounterpartyFlowInfo(maySkipCheckpoint = false)
@Suspendable
override fun <R : Any> sendAndReceive(
@ -51,8 +50,8 @@ class FlowSessionImpl(
sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)),
shouldRetrySend = false
)
val responseValues: Map<FlowSession, SerializedBytes<Any>> = getFlowStateMachine().suspend(request, maySkipCheckpoint)
val responseForCurrentSession = responseValues[this]!!
val responseValues: Map<FlowSession, SerializedBytes<Any>> = flowStateMachine.suspend(request, maySkipCheckpoint)
val responseForCurrentSession = responseValues.getValue(this)
return responseForCurrentSession.checkPayloadIs(receiveType)
}
@ -64,7 +63,7 @@ class FlowSessionImpl(
override fun <R : Any> receive(receiveType: Class<R>, maySkipCheckpoint: Boolean): UntrustworthyData<R> {
enforceNotPrimitive(receiveType)
val request = FlowIORequest.Receive(NonEmptySet.of(this))
return getFlowStateMachine().suspend(request, maySkipCheckpoint)[this]!!.checkPayloadIs(receiveType)
return flowStateMachine.suspend(request, maySkipCheckpoint).getValue(this).checkPayloadIs(receiveType)
}
@Suspendable
@ -75,7 +74,7 @@ class FlowSessionImpl(
val request = FlowIORequest.Send(
sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT))
)
return getFlowStateMachine().suspend(request, maySkipCheckpoint)
return flowStateMachine.suspend(request, maySkipCheckpoint)
}
@Suspendable

View File

@ -10,6 +10,7 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.cordapp.Cordapp
import net.corda.core.flows.*
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.serialization.internal.CheckpointSerializationContext
@ -230,7 +231,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
"Transaction context is missing. This might happen if a suspendable method is not annotated with @Suspendable annotation."
}
} else {
require(contextTransactionOrNull == null){"Transaction is marked as not present, but is not null"}
require(contextTransactionOrNull == null) { "Transaction is marked as not present, but is not null" }
}
}
@ -354,9 +355,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
@Suspendable
override fun initiateFlow(party: Party): FlowSession {
override fun initiateFlow(destination: Destination): FlowSession {
require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" }
val resume = processEventImmediately(
Event.InitiateFlow(party),
Event.InitiateFlow(destination),
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnExit = true
) as FlowContinuation.Resume
@ -440,7 +442,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnExit = false
)
require(continuation == FlowContinuation.ProcessEvents){"Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation "}
require(continuation == FlowContinuation.ProcessEvents) { "Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation " }
unpark(SERIALIZER_BLOCKER)
}
return uncheckedCast(processEventsUntilFlowIsResumed(

View File

@ -2,6 +2,7 @@ package net.corda.node.services.statemachine
import net.corda.core.context.InvocationContext
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
@ -98,7 +99,7 @@ sealed class SessionState {
* We haven't yet sent the initialisation message
*/
data class Uninitiated(
val party: Party,
val destination: Destination,
val initiatingSubFlow: SubFlow.Initiating,
val sourceSessionId: SessionId,
val additionalEntropy: Long

View File

@ -222,7 +222,7 @@ class StartedFlowTransition(
deduplicationSeed = sessionState.deduplicationSeed
)
val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, newSessionState)
actions.add(Action.SendInitial(sessionState.party, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)))
actions.add(Action.SendInitial(sessionState.destination, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)))
newSessions[sourceSessionId] = newSessionState
}
currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
@ -256,7 +256,7 @@ class StartedFlowTransition(
when (existingSessionState) {
is SessionState.Uninitiated -> {
val initialMessage = createInitialSessionMessage(existingSessionState.initiatingSubFlow, sourceSessionId, existingSessionState.additionalEntropy, message)
actions.add(Action.SendInitial(existingSessionState.party, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)))
actions.add(Action.SendInitial(existingSessionState.destination, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)))
newSessions[sourceSessionId] = SessionState.Initiating(
bufferedMessages = emptyList(),
rejectionError = null,

View File

@ -235,8 +235,8 @@ class TopLevelTransition(
return@builder FlowContinuation.ProcessEvents
}
val sourceSessionId = SessionId.createRandom(context.secureRandom)
val sessionImpl = FlowSessionImpl(event.party, sourceSessionId)
val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.party, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
val sessionImpl = FlowSessionImpl(event.destination, sourceSessionId)
val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.destination, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
actions.add(Action.AddSessionBinding(context.id, sourceSessionId))
FlowContinuation.Resume(sessionImpl)

View File

@ -36,8 +36,7 @@ import net.corda.testing.internal.LogHelper
import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer
import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin
import net.corda.testing.node.internal.*
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.assertj.core.api.Assertions.*
import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
import org.assertj.core.api.Condition
import org.junit.After
@ -463,6 +462,28 @@ class FlowFrameworkTests {
)
}
@Test
fun `initiating flow using unknown AnonymousParty`() {
val anonymousBob = bobNode.services.keyManagementService.freshKeyAndCert(bobNode.info.legalIdentitiesAndCerts.single(), false).party.anonymise()
bobNode.registerCordappFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) }
val result = aliceNode.services.startFlow(SendAndReceiveFlow(anonymousBob, "Hello")).resultFuture
mockNet.runNetwork()
assertThatIllegalArgumentException()
.isThrownBy { result.getOrThrow() }
.withMessage("We do not know who $anonymousBob belongs to")
}
@Test
fun `initiating flow using known AnonymousParty`() {
val anonymousBob = bobNode.services.keyManagementService.freshKeyAndCert(bobNode.info.legalIdentitiesAndCerts.single(), false)
aliceNode.services.identityService.verifyAndRegisterIdentity(anonymousBob)
val bobResponderFlow = bobNode.registerCordappFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) }
val result = aliceNode.services.startFlow(SendAndReceiveFlow(anonymousBob.party.anonymise(), "Hello")).resultFuture
mockNet.runNetwork()
bobResponderFlow.getOrThrow()
assertThat(result.getOrThrow()).isEqualTo("HelloHello")
}
//region Helpers
private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)
@ -762,12 +783,12 @@ internal class MyFlowException(override val message: String) : FlowException() {
internal class MyPeerFlowException(override val message: String, val peer: Party) : FlowException()
@InitiatingFlow
internal class SendAndReceiveFlow(private val otherParty: Party, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic<Any>() {
internal class SendAndReceiveFlow(private val destination: Destination, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic<Any>() {
constructor(otherPartySession: FlowSession, payload: Any) : this(otherPartySession.counterparty, payload, otherPartySession)
@Suspendable
override fun call(): Any {
return (otherPartySession ?: initiateFlow(otherParty)).sendAndReceive<Any>(payload).unwrap { it }
return (otherPartySession ?: initiateFlow(destination)).sendAndReceive<Any>(payload).unwrap { it }
}
}

View File

@ -142,7 +142,8 @@ class RetryFlowMockTest {
val alice = TestIdentity(CordaX500Name.parse("L=London,O=Alice Ltd,OU=Trade,C=GB")).party
val records = nodeA.smm.flowHospital.track().updates.toBlocking().toIterable().iterator()
val flow: FlowStateMachine<Unit> = nodeA.services.startFlow(FinalityHandler(object : FlowSession() {
override val counterparty = alice
override val destination: Destination get() = alice
override val counterparty: Party get() = alice
override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo {
TODO("not implemented")