CORDA-3033: Introducing Destination interface for initiating flows with (#5242)

Party and AnonymousParty have been retrofitted to implement this interface and are currently the only supported types. A new FlowLogic.initiateFlow(Destination) method overload to easily support the addition of new destination types in future versions.
This commit is contained in:
Shams Asari 2019-06-21 16:39:56 +01:00 committed by GitHub
parent f0f05df9f4
commit 76eec9aa8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 183 additions and 122 deletions

View File

@ -2897,7 +2897,7 @@ public abstract class net.corda.core.identity.AbstractParty extends java.lang.Ob
##
@DoNotImplement
@CordaSerializable
public final class net.corda.core.identity.AnonymousParty extends net.corda.core.identity.AbstractParty
public final class net.corda.core.identity.AnonymousParty extends net.corda.core.identity.AbstractParty implements net.corda.core.flows.Destination
public <init>(java.security.PublicKey)
@Nullable
public net.corda.core.identity.CordaX500Name nameOrNull()
@ -2978,7 +2978,7 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec
##
@DoNotImplement
@CordaSerializable
public final class net.corda.core.identity.Party extends net.corda.core.identity.AbstractParty
public final class net.corda.core.identity.Party extends net.corda.core.identity.AbstractParty implements net.corda.core.flows.Destination
public <init>(java.security.cert.X509Certificate)
public <init>(net.corda.core.identity.CordaX500Name, java.security.PublicKey)
@NotNull

2
.idea/compiler.xml generated
View File

@ -254,6 +254,8 @@
<module name="test-cli_test" target="1.8" />
<module name="test-common_main" target="1.8" />
<module name="test-common_test" target="1.8" />
<module name="test-db_main" target="1.8" />
<module name="test-db_test" target="1.8" />
<module name="test-utils_integrationTest" target="1.8" />
<module name="test-utils_main" target="1.8" />
<module name="test-utils_test" target="1.8" />

View File

@ -502,4 +502,4 @@ wrapper {
buildScan {
termsOfServiceUrl = 'https://gradle.com/terms-of-service'
termsOfServiceAgree = 'yes'
}
}

View File

@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.TransactionSignature
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.identity.groupPublicKeysByWellKnownParty
import net.corda.core.internal.toMultiMap
@ -68,7 +69,11 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig
val myOptionalKeys: Iterable<PublicKey>?,
override val progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : FlowLogic<SignedTransaction>() {
@JvmOverloads
constructor(partiallySignedTx: SignedTransaction, sessionsToCollectFrom: Collection<FlowSession>, progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : this(partiallySignedTx, sessionsToCollectFrom, null, progressTracker)
constructor(
partiallySignedTx: SignedTransaction,
sessionsToCollectFrom: Collection<FlowSession>,
progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()
) : this(partiallySignedTx, sessionsToCollectFrom, null, progressTracker)
companion object {
object COLLECTING : ProgressTracker.Step("Collecting signatures from counterparties.")
@ -78,7 +83,6 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig
fun tracker() = ProgressTracker(COLLECTING, VERIFYING)
// TODO: Make the progress tracker adapt to the number of counterparties to collect from.
}
@Suspendable
@ -108,54 +112,60 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig
// If the unsigned counterparties list is empty then we don't need to collect any more signatures here.
if (unsigned.isEmpty()) return partiallySignedTx
val setOfAllSessionKeys = sessionsToCollectFrom.groupBy { it.sessionOwningKey }.map {
require(it.value.size == 1) { "There are multiple sessions initiated for party key ${it.key.toStringShort()}" }
it.key to it.value.first()
}.toMap()
val setOfAllSessionKeys: Map<PublicKey, FlowSession> = sessionsToCollectFrom
.groupBy {
val destination = it.destination
when (destination) {
is Party -> destination.owningKey
is AnonymousParty -> destination.owningKey
else -> throw IllegalArgumentException("Signatures can only be collected from Party or AnonymousParty, not $destination")
}
}
.mapValues {
require(it.value.size == 1) { "There are multiple sessions initiated for party key ${it.key.toStringShort()}" }
it.value.first()
}
val partyToKeysItSignsFor = groupPublicKeysByWellKnownParty(serviceHub, unsigned)
val keyToSigningParty = partyToKeysItSignsFor.flatMap { (wellKnown, allKeysItSignsFor) ->
allKeysItSignsFor.map { it to wellKnown }
}.toMap()
val partyToKeysItSignsFor: Map<Party, List<PublicKey>> = groupPublicKeysByWellKnownParty(serviceHub, unsigned)
val keyToSigningParty: Map<PublicKey, Party> = partyToKeysItSignsFor
.flatMap { (wellKnown, allKeysItSignsFor) -> allKeysItSignsFor.map { it to wellKnown } }
.toMap()
val unrelatedSessions = sessionsToCollectFrom.filterNot {
if (it.sessionOwningKey == it.counterparty.owningKey) {
//this session was initiated by a wellKnownParty
//the session must have a corresponding unsigned
it.counterparty in partyToKeysItSignsFor
if (it.destination is Party) {
// The session must have a corresponding unsigned.
it.destination in partyToKeysItSignsFor
} else {
//this session was not initiated by a wellKnownParty
//so must directly exist in the unsigned
unsigned.contains(it.sessionOwningKey)
// setOfAllSessionKeys has already checked for valid destination types so we can safely cast to AnonoymousParty here.
// This session was not initiated by a wellKnownParty so must directly exist in the unsigned.
(it.destination as AnonymousParty).owningKey in unsigned
}
}
val keyToSessionMap = unsigned.map {
if (it in setOfAllSessionKeys) {
// the unsigned key exists directly as a sessionKey, so use that session
it to setOfAllSessionKeys[it]!!
val keyToSessionList = unsigned.map {
val session = setOfAllSessionKeys[it]
if (session != null) {
// The unsigned key exists directly as a sessionKey, so use that session
it to session
} else {
//it might be delegated to a wellKnownParty
val wellKnownParty: Party? = keyToSigningParty[it]
if (wellKnownParty != null) {
//there is a wellKnownParty for this key, check if it has a session, and if so - use that session
val sessionForWellKnownParty = setOfAllSessionKeys[wellKnownParty.owningKey]
?: throw IllegalStateException("No session available to request signature for key: ${it.toStringShort()}")
it to sessionForWellKnownParty
} else {
throw IllegalStateException("Could not find a session or wellKnown party for key ${it.toStringShort()}")
// It might be delegated to a wellKnownParty
val wellKnownParty = checkNotNull(keyToSigningParty[it]) { "Could not find a session or wellKnown party for key ${it.toStringShort()}" }
// There is a wellKnownParty for this key, check if it has a session, and if so - use that session
val sessionForWellKnownParty = checkNotNull(setOfAllSessionKeys[wellKnownParty.owningKey]) {
"No session available to request signature for key: ${it.toStringShort()}"
}
it to sessionForWellKnownParty
}
}
//now invert the map to find the keys per session
val sessionToKeysMap = keyToSessionMap.map { it.second to it.first }.toMultiMap()
// Now invert the map to find the keys per session
val sessionToKeysMap = keyToSessionList.map { it.second to it.first }.toMultiMap()
require(unrelatedSessions.isEmpty()) {
"The Initiator of CollectSignaturesFlow must pass in exactly the sessions required to sign the transaction."
}
// Collect signatures from all counterparties and append them to the partially signed transaction.
val counterpartySignatures = sessionToKeysMap.flatMap { (session, keys)->
val counterpartySignatures = sessionToKeysMap.flatMap { (session, keys) ->
subFlow(CollectSignatureFlow(partiallySignedTx, session, keys))
}
val stx = partiallySignedTx + counterpartySignatures

View File

@ -6,7 +6,6 @@ import net.corda.core.CordaInternal
import net.corda.core.DeleteForDJVM
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.*
@ -117,17 +116,11 @@ abstract class FlowLogic<out T> {
val serviceHub: ServiceHub get() = stateMachine.serviceHub
/**
* Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note
* that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
* Creates a communication session with [destination]. Subsequently you may send/receive using this session object. How the messaging
* is routed depends on the [Destination] type, including whether this call does any initial communication.
*/
@Suspendable
fun initiateFlow(requested: AbstractParty): FlowSession {
val wellKnown = serviceHub.identityService.wellKnownPartyFromAnonymous(requested)
if (wellKnown == null) {
throw IllegalStateException("could not initiate flow with party $requested as they are not in the node identity service")
}
return stateMachine.initiateFlow(wellKnown = wellKnown, requested = requested)
}
fun initiateFlow(destination: Destination): FlowSession = stateMachine.initiateFlow(destination)
/**
* Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note

View File

@ -2,12 +2,12 @@ package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.DoNotImplement
import net.corda.core.KeepForDJVM
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.utilities.UntrustworthyData
import java.security.PublicKey
/**
*
* A [FlowSession] is a handle on a communication sequence between two paired flows, possibly running on separate nodes.
* It is used to send and receive messages between the flows as well as to query information about the counter-flow.
*
@ -45,16 +45,19 @@ import java.security.PublicKey
*/
@DoNotImplement
abstract class FlowSession {
/**
* The current [sessionOwningKey] in the context of this session. It is not guaranteed to be [counterparty.owningKey] as the session
* may have been initiated with an [net.corda.core.identity.AnonymousParty] useful for grouping sessions vs signing requests
* The [Destination] on the other side of this session. In the case of a session created by [FlowLogic.initiateFlow] this is the same
* destination as the one passed to that function.
*/
abstract val sessionOwningKey: PublicKey
abstract val destination: Destination
/**
* The [Party] on the other side of this session. In the case of a session created by [FlowLogic.initiateFlow]
* [counterparty] is the same Party as the one passed to that function.
* If the destination on the other side of this session is a [Party] then returns that, otherwise throws [IllegalStateException].
*
* Only use this method if it's known the other side is a [Party], otherwise use [destination].
*
* @throws IllegalStateException if the other side is not a [Party].
* @see destination
*/
abstract val counterparty: Party
@ -189,3 +192,14 @@ abstract class FlowSession {
@Suspendable
abstract fun send(payload: Any)
}
/**
* An abstraction for flow session destinations. A flow can send to and receive from objects which implement this interface. The specifics
* of how the messages are routed depend on the implementation.
*
* Corda currently only supports a fixed set of destination types, namely [Party] and [AnonymousParty]. New destination types will be added
* in future releases.
*/
@DoNotImplement
@KeepForDJVM
interface Destination

View File

@ -3,15 +3,22 @@ package net.corda.core.identity
import net.corda.core.KeepForDJVM
import net.corda.core.contracts.PartyAndReference
import net.corda.core.crypto.toStringShort
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowLogic
import net.corda.core.utilities.OpaqueBytes
import java.security.PublicKey
/**
* The [AnonymousParty] class contains enough information to uniquely identify a [Party] while excluding private
* information such as name. It is intended to represent a party on the distributed ledger.
*
* ### Flow sessions
*
* Anonymous parties can be used to communicate using the [FlowLogic.initiateFlow] method. Message routing is simply routing to the well-known
* [Party] the anonymous party belongs to. This mechanism assumes the party initiating the communication knows who the anonymous party is.
*/
@KeepForDJVM
class AnonymousParty(owningKey: PublicKey) : AbstractParty(owningKey) {
class AnonymousParty(owningKey: PublicKey) : Destination, AbstractParty(owningKey) {
override fun nameOrNull(): CordaX500Name? = null
override fun ref(bytes: OpaqueBytes): PartyAndReference = PartyAndReference(this, bytes)
override fun toString() = "Anonymous(${owningKey.toStringShort()})"

View File

@ -4,6 +4,8 @@ import net.corda.core.KeepForDJVM
import net.corda.core.contracts.PartyAndReference
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.Crypto
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowLogic
import net.corda.core.utilities.OpaqueBytes
import java.security.PublicKey
import java.security.cert.X509Certificate
@ -25,10 +27,15 @@ import java.security.cert.X509Certificate
*
* Note that equality is based solely on the owning key.
*
* ### Flow sessions
*
* Communication with other parties is done using the flow framework with the [FlowLogic.initiateFlow] method. Message routing is done by
* using the network map to look up the connectivity details pertaining to the [Party].
*
* @see CompositeKey
*/
@KeepForDJVM
class Party(val name: CordaX500Name, owningKey: PublicKey) : AbstractParty(owningKey) {
class Party(val name: CordaX500Name, owningKey: PublicKey) : Destination, AbstractParty(owningKey) {
constructor(certificate: X509Certificate)
: this(CordaX500Name.build(certificate.subjectX500Principal), Crypto.toSupportedPublicKey(certificate.publicKey))

View File

@ -5,11 +5,7 @@ import net.corda.core.DeleteForDJVM
import net.corda.core.DoNotImplement
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.AbstractParty
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import org.slf4j.Logger
@ -22,10 +18,7 @@ interface FlowStateMachine<FLOWRETURN> {
fun <SUSPENDRETURN : Any> suspend(ioRequest: FlowIORequest<SUSPENDRETURN>, maySkipCheckpoint: Boolean): SUSPENDRETURN
@Suspendable
fun initiateFlow(party: Party): FlowSession = initiateFlow(wellKnown = party, requested = null)
@Suspendable
fun initiateFlow(wellKnown: Party, requested: AbstractParty?): FlowSession
fun initiateFlow(destination: Destination): FlowSession
fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>)

View File

@ -7,6 +7,7 @@ import net.corda.core.crypto.toStringShort
import net.corda.core.identity.*
import net.corda.core.internal.hash
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import java.security.InvalidAlgorithmParameterException
import java.security.PublicKey
import java.security.cert.*
@ -27,7 +28,7 @@ interface IdentityService {
val caCertStore: CertStore
companion object {
val log = contextLogger()
private val log = contextLogger()
}
/**
@ -100,7 +101,7 @@ interface IdentityService {
// The original version of this would return the party as-is if it was a Party (rather than AnonymousParty),
// however that means that we don't verify that we know who owns the key. As such as now enforce turning the key
// into a party, and from there figure out the well known party.
log.debug("Attempting to find wellKnownParty for: ${party.owningKey.hash}")
log.debug { "Attempting to find wellKnownParty for: ${party.owningKey.hash}" }
val candidate = partyFromKey(party.owningKey)
// TODO: This should be done via the network map cache, which is the authoritative source of well known identities
return if (candidate != null) {

View File

@ -195,7 +195,7 @@ class CollectSignaturesFlowTests : WithContracts {
}
@InitiatingFlow
class AnonymousSessionTestFlow(val cis: List<PartyAndCertificate>) : FlowLogic<SignedTransaction>() {
class AnonymousSessionTestFlow(private val cis: List<PartyAndCertificate>) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
@ -227,14 +227,14 @@ class AnonymousSessionTestFlowResponder(private val otherSideSession: FlowSessio
override fun checkTransaction(stx: SignedTransaction) = requireThat {
}
}
val stxId = subFlow(signFlow).id
subFlow(signFlow)
}
}
@InitiatingFlow
class MixAndMatchAnonymousSessionTestFlow(val cis: List<PartyAndCertificate>,
val keysToLookUp: Set<PublicKey>,
val keysToKeepAnonymous: Set<PublicKey>) : FlowLogic<SignedTransaction>() {
class MixAndMatchAnonymousSessionTestFlow(private val cis: List<PartyAndCertificate>,
private val keysToLookUp: Set<PublicKey>,
private val keysToKeepAnonymous: Set<PublicKey>) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
@ -255,7 +255,7 @@ class MixAndMatchAnonymousSessionTestFlow(val cis: List<PartyAndCertificate>,
val resolvedParties = keysToLookUp.map { serviceHub.identityService.wellKnownPartyFromAnonymous(AnonymousParty(it))!! }.toSet()
val anonymousParties = keysToKeepAnonymous.map { AnonymousParty(it) }
val sessionsToCollectFrom = (resolvedParties + anonymousParties).map { initiateFlow(it) }
val sessionsToCollectFrom = (resolvedParties + anonymousParties).map { initiateFlow(it as Destination) }
return subFlow(CollectSignaturesFlow(signedByUsTx, sessionsToCollectFrom, myOptionalKeys = listOf(ourKey)))
}
}
@ -269,6 +269,6 @@ class MixAndMatchAnonymousSessionTestFlowResponder(private val otherSideSession:
override fun checkTransaction(stx: SignedTransaction) = requireThat {
}
}
val stxId = subFlow(signFlow).id
subFlow(signFlow)
}
}

View File

@ -9,6 +9,10 @@ release, see :doc:`app-upgrade-notes`.
Version 5.0
-----------
* Introduced a new ``Destination`` abstraction for communicating with non-Party destinations using the new ``FlowLogic.initateFlow(Destination)``
method. ``Party`` and ``AnonymousParty`` have been retrofitted to implement ``Destination``. Initiating a flow to an ``AnonymousParty``
means resolving to the well-known identity ``Party`` and then communicating with that.
* Removed ``finance-workflows`` dependency on jackson library. The functions that used jackson (e.g. ``FinanceJSONSupport``) have been moved
into IRS Demo.

View File

@ -5,6 +5,7 @@ import net.corda.confidential.SwapIdentitiesFlow
import net.corda.core.contracts.Amount
import net.corda.core.contracts.InsufficientBalanceException
import net.corda.core.flows.*
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.TransactionBuilder
@ -52,7 +53,7 @@ open class CashPaymentFlow(
progressTracker.currentStep = GENERATING_ID
val recipientSession = initiateFlow(recipient)
recipientSession.send(anonymous)
val anonymousRecipient = if (anonymous) {
val anonymousRecipient: AbstractParty = if (anonymous) {
subFlow(SwapIdentitiesFlow(recipientSession))[recipient]!!
} else {
recipient

View File

@ -154,7 +154,7 @@ object CashUtils {
// If anonymous is true, generate a new identity that change will be sent to for confidentiality purposes. This means that a
// third party with a copy of the transaction (such as the notary) cannot identify who the change was
// sent to
val changeIdentity = if (anonymous) services.keyManagementService.freshKeyAndCert(ourIdentity, revocationEnabled).party.anonymise() else ourIdentity.party
val changeIdentity: AbstractParty = if (anonymous) services.keyManagementService.freshKeyAndCert(ourIdentity, revocationEnabled).party.anonymise() else ourIdentity.party
return OnLedgerAsset.generateSpend(
tx,
payments,

View File

@ -22,8 +22,6 @@ import javax.annotation.concurrent.ThreadSafe
@ThreadSafe
class InMemoryIdentityService(identities: List<PartyAndCertificate> = emptyList(),
override val trustRoot: X509Certificate) : SingletonSerializeAsToken(), IdentityServiceInternal {
companion object {
private val log = contextLogger()
}

View File

@ -1,6 +1,7 @@
package net.corda.node.services.statemachine
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.Destination
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.FlowAsyncOperation
@ -19,10 +20,10 @@ sealed class Action {
data class TrackTransaction(val hash: SecureHash) : Action()
/**
* Send an initial session message to [party].
* Send an initial session message to [destination].
*/
data class SendInitial(
val party: Party,
val destination: Destination,
val initialise: InitialSessionMessage,
val deduplicationId: SenderDeduplicationId
) : Action()

View File

@ -168,7 +168,7 @@ class ActionExecutorImpl(
@Suspendable
private fun executeSendInitial(action: Action.SendInitial) {
flowMessaging.sendSessionMessage(action.party, action.initialise, action.deduplicationId)
flowMessaging.sendSessionMessage(action.destination, action.initialise, action.deduplicationId)
}
@Suspendable

View File

@ -1,7 +1,7 @@
package net.corda.node.services.statemachine
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.SerializedBytes
@ -70,9 +70,8 @@ sealed class Event {
*
* Initiate a flow. This causes a new session object to be created and returned to the flow. Note that no actual
* communication takes place at this time, only on the first send/receive operation on the session.
* @param party the [Party] to create a session with.
*/
data class InitiateFlow(val wellKnownParty: Party, val requestedParty: AbstractParty?) : Event()
data class InitiateFlow(val destination: Destination) : Event()
/**
* Signal the entering into a subflow.

View File

@ -3,7 +3,9 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import com.esotericsoftware.kryo.KryoException
import net.corda.core.context.InvocationOrigin
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowException
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.serialize
@ -20,11 +22,10 @@ import java.io.NotSerializableException
*/
interface FlowMessaging {
/**
* Send [message] to [party] using [deduplicationId]. Optionally [acknowledgementHandler] may be specified to
* listen on the send acknowledgement.
* Send [message] to [destination] using [deduplicationId].
*/
@Suspendable
fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: SenderDeduplicationId)
fun sendSessionMessage(destination: Destination, message: SessionMessage, deduplicationId: SenderDeduplicationId)
/**
* Start the messaging using the [onMessage] message handler.
@ -49,10 +50,20 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging {
}
@Suspendable
override fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: SenderDeduplicationId) {
log.trace { "Sending message $deduplicationId $message to party $party" }
override fun sendSessionMessage(destination: Destination, message: SessionMessage, deduplicationId: SenderDeduplicationId) {
val party = if (destination is Party) {
log.trace { "Sending message $deduplicationId $message to $destination" }
destination
} else {
// We assume that the destination type has already been checked by initiateFlow
val wellKnown = requireNotNull(serviceHub.identityService.wellKnownPartyFromAnonymous(destination as AnonymousParty)) {
"We do not know who $destination belongs to"
}
log.trace { "Sending message $deduplicationId $message to $wellKnown on behalf of $destination" }
wellKnown
}
val networkMessage = serviceHub.networkService.createMessage(sessionTopic, serializeSessionMessage(message).bytes, deduplicationId, message.additionalHeaders(party))
val partyInfo = serviceHub.networkMapCache.getPartyInfo(party) ?: throw IllegalArgumentException("Don't know about $party")
val partyInfo = requireNotNull(serviceHub.networkMapCache.getPartyInfo(party)) { "Don't know about $party" }
val address = serviceHub.networkService.getAddressOfParty(partyInfo)
val sequenceKey = when (message) {
is InitialSessionMessage -> message.initiatorSessionId

View File

@ -2,6 +2,7 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowSession
import net.corda.core.identity.Party
@ -13,34 +14,30 @@ import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.UntrustworthyData
import java.security.PublicKey
class FlowSessionImpl(
override val counterparty: Party,
override val sessionOwningKey: PublicKey,
override val destination: Destination,
val sourceSessionId: SessionId
) : FlowSession() {
override fun toString() = "FlowSessionImpl(counterparty=$counterparty, sourceSessionId=$sourceSessionId)"
override val counterparty: Party get() = checkNotNull(destination as? Party) { "$destination is not a Party" }
override fun equals(other: Any?): Boolean {
return (other as? FlowSessionImpl)?.sourceSessionId == sourceSessionId
}
override fun toString(): String = "FlowSessionImpl(destination=$destination, sourceSessionId=$sourceSessionId)"
override fun hashCode() = sourceSessionId.hashCode()
override fun equals(other: Any?): Boolean = other === this || other is FlowSessionImpl && other.sourceSessionId == sourceSessionId
private fun getFlowStateMachine(): FlowStateMachine<*> {
return Fiber.currentFiber() as FlowStateMachine<*>
}
override fun hashCode(): Int = sourceSessionId.hashCode()
private val flowStateMachine: FlowStateMachine<*> get() = Fiber.currentFiber() as FlowStateMachine<*>
@Suspendable
override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo {
val request = FlowIORequest.GetFlowInfo(NonEmptySet.of(this))
return getFlowStateMachine().suspend(request, maySkipCheckpoint)[this]!!
return flowStateMachine.suspend(request, maySkipCheckpoint).getValue(this)
}
@Suspendable
override fun getCounterpartyFlowInfo() = getCounterpartyFlowInfo(maySkipCheckpoint = false)
override fun getCounterpartyFlowInfo(): FlowInfo = getCounterpartyFlowInfo(maySkipCheckpoint = false)
@Suspendable
override fun <R : Any> sendAndReceive(
@ -53,8 +50,8 @@ class FlowSessionImpl(
sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT)),
shouldRetrySend = false
)
val responseValues: Map<FlowSession, SerializedBytes<Any>> = getFlowStateMachine().suspend(request, maySkipCheckpoint)
val responseForCurrentSession = responseValues[this]!!
val responseValues: Map<FlowSession, SerializedBytes<Any>> = flowStateMachine.suspend(request, maySkipCheckpoint)
val responseForCurrentSession = responseValues.getValue(this)
return responseForCurrentSession.checkPayloadIs(receiveType)
}
@ -66,7 +63,7 @@ class FlowSessionImpl(
override fun <R : Any> receive(receiveType: Class<R>, maySkipCheckpoint: Boolean): UntrustworthyData<R> {
enforceNotPrimitive(receiveType)
val request = FlowIORequest.Receive(NonEmptySet.of(this))
return getFlowStateMachine().suspend(request, maySkipCheckpoint)[this]!!.checkPayloadIs(receiveType)
return flowStateMachine.suspend(request, maySkipCheckpoint).getValue(this).checkPayloadIs(receiveType)
}
@Suspendable
@ -77,7 +74,7 @@ class FlowSessionImpl(
val request = FlowIORequest.Send(
sessionToMessage = mapOf(this to payload.serialize(context = SerializationDefaults.P2P_CONTEXT))
)
return getFlowStateMachine().suspend(request, maySkipCheckpoint)
return flowStateMachine.suspend(request, maySkipCheckpoint)
}
@Suspendable

View File

@ -10,7 +10,7 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.cordapp.Cordapp
import net.corda.core.flows.*
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.serialization.internal.CheckpointSerializationContext
@ -355,9 +355,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
@Suspendable
override fun initiateFlow(wellKnown: Party, requested: AbstractParty?): FlowSession {
override fun initiateFlow(destination: Destination): FlowSession {
require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" }
val resume = processEventImmediately(
Event.InitiateFlow(wellKnownParty = wellKnown, requestedParty = requested),
Event.InitiateFlow(destination),
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnExit = true
) as FlowContinuation.Resume

View File

@ -466,7 +466,7 @@ class SingleThreadedStateMachineManager(
try {
val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage)
val initiatedSessionId = SessionId.createRandom(secureRandom)
val senderSession = FlowSessionImpl(sender, sender.owningKey, initiatedSessionId)
val senderSession = FlowSessionImpl(sender, initiatedSessionId)
val flowLogic = initiatedFlowFactory.createFlow(senderSession)
val initiatedFlowInfo = when (initiatedFlowFactory) {
is InitiatedFlowFactory.Core -> FlowInfo(serviceHub.myInfo.platformVersion, "corda")

View File

@ -2,6 +2,7 @@ package net.corda.node.services.statemachine
import net.corda.core.context.InvocationContext
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
@ -98,7 +99,7 @@ sealed class SessionState {
* We haven't yet sent the initialisation message
*/
data class Uninitiated(
val party: Party,
val destination: Destination,
val initiatingSubFlow: SubFlow.Initiating,
val sourceSessionId: SessionId,
val additionalEntropy: Long

View File

@ -222,7 +222,7 @@ class StartedFlowTransition(
deduplicationSeed = sessionState.deduplicationSeed
)
val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, newSessionState)
actions.add(Action.SendInitial(sessionState.party, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)))
actions.add(Action.SendInitial(sessionState.destination, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)))
newSessions[sourceSessionId] = newSessionState
}
currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
@ -256,7 +256,7 @@ class StartedFlowTransition(
when (existingSessionState) {
is SessionState.Uninitiated -> {
val initialMessage = createInitialSessionMessage(existingSessionState.initiatingSubFlow, sourceSessionId, existingSessionState.additionalEntropy, message)
actions.add(Action.SendInitial(existingSessionState.party, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)))
actions.add(Action.SendInitial(existingSessionState.destination, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)))
newSessions[sourceSessionId] = SessionState.Initiating(
bufferedMessages = emptyList(),
rejectionError = null,

View File

@ -235,8 +235,8 @@ class TopLevelTransition(
return@builder FlowContinuation.ProcessEvents
}
val sourceSessionId = SessionId.createRandom(context.secureRandom)
val sessionImpl = FlowSessionImpl(event.wellKnownParty, event.requestedParty?.owningKey?: event.wellKnownParty.owningKey, sourceSessionId)
val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.wellKnownParty, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
val sessionImpl = FlowSessionImpl(event.destination, sourceSessionId)
val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.destination, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
actions.add(Action.AddSessionBinding(context.id, sourceSessionId))
FlowContinuation.Resume(sessionImpl)

View File

@ -36,8 +36,7 @@ import net.corda.testing.internal.LogHelper
import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer
import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin
import net.corda.testing.node.internal.*
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.assertj.core.api.Assertions.*
import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
import org.assertj.core.api.Condition
import org.junit.After
@ -463,6 +462,28 @@ class FlowFrameworkTests {
)
}
@Test
fun `initiating flow using unknown AnonymousParty`() {
val anonymousBob = bobNode.services.keyManagementService.freshKeyAndCert(bobNode.info.legalIdentitiesAndCerts.single(), false).party.anonymise()
bobNode.registerCordappFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) }
val result = aliceNode.services.startFlow(SendAndReceiveFlow(anonymousBob, "Hello")).resultFuture
mockNet.runNetwork()
assertThatIllegalArgumentException()
.isThrownBy { result.getOrThrow() }
.withMessage("We do not know who $anonymousBob belongs to")
}
@Test
fun `initiating flow using known AnonymousParty`() {
val anonymousBob = bobNode.services.keyManagementService.freshKeyAndCert(bobNode.info.legalIdentitiesAndCerts.single(), false)
aliceNode.services.identityService.verifyAndRegisterIdentity(anonymousBob)
val bobResponderFlow = bobNode.registerCordappFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) }
val result = aliceNode.services.startFlow(SendAndReceiveFlow(anonymousBob.party.anonymise(), "Hello")).resultFuture
mockNet.runNetwork()
bobResponderFlow.getOrThrow()
assertThat(result.getOrThrow()).isEqualTo("HelloHello")
}
//region Helpers
private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)
@ -762,12 +783,12 @@ internal class MyFlowException(override val message: String) : FlowException() {
internal class MyPeerFlowException(override val message: String, val peer: Party) : FlowException()
@InitiatingFlow
internal class SendAndReceiveFlow(private val otherParty: Party, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic<Any>() {
internal class SendAndReceiveFlow(private val destination: Destination, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic<Any>() {
constructor(otherPartySession: FlowSession, payload: Any) : this(otherPartySession.counterparty, payload, otherPartySession)
@Suspendable
override fun call(): Any {
return (otherPartySession ?: initiateFlow(otherParty)).sendAndReceive<Any>(payload).unwrap { it }
return (otherPartySession ?: initiateFlow(destination)).sendAndReceive<Any>(payload).unwrap { it }
}
}

View File

@ -142,8 +142,8 @@ class RetryFlowMockTest {
val alice = TestIdentity(CordaX500Name.parse("L=London,O=Alice Ltd,OU=Trade,C=GB")).party
val records = nodeA.smm.flowHospital.track().updates.toBlocking().toIterable().iterator()
val flow: FlowStateMachine<Unit> = nodeA.services.startFlow(FinalityHandler(object : FlowSession() {
override val sessionOwningKey = alice.owningKey
override val counterparty = alice
override val destination: Destination get() = alice
override val counterparty: Party get() = alice
override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo {
TODO("not implemented")