CORDA-3000: Allow AbstractParty to initiate flow (#5219)

This commit is contained in:
Stefano Franz 2019-06-18 12:13:09 +00:00 committed by Shams Asari
parent f3e06aa623
commit f9c034aa7c
16 changed files with 252 additions and 45 deletions

View File

@ -469,4 +469,4 @@ wrapper {
buildScan {
termsOfServiceUrl = 'https://gradle.com/terms-of-service'
termsOfServiceAgree = 'yes'
}
}

View File

@ -3,8 +3,10 @@ package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.TransactionSignature
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.Party
import net.corda.core.identity.groupPublicKeysByWellKnownParty
import net.corda.core.internal.toMultiMap
import net.corda.core.node.ServiceHub
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
@ -65,7 +67,8 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig
val sessionsToCollectFrom: Collection<FlowSession>,
val myOptionalKeys: Iterable<PublicKey>?,
override val progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : FlowLogic<SignedTransaction>() {
@JvmOverloads constructor(partiallySignedTx: SignedTransaction, sessionsToCollectFrom: Collection<FlowSession>, progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : this(partiallySignedTx, sessionsToCollectFrom, null, progressTracker)
@JvmOverloads
constructor(partiallySignedTx: SignedTransaction, sessionsToCollectFrom: Collection<FlowSession>, progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : this(partiallySignedTx, sessionsToCollectFrom, null, progressTracker)
companion object {
object COLLECTING : ProgressTracker.Step("Collecting signatures from counterparties.")
@ -105,14 +108,55 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig
// If the unsigned counterparties list is empty then we don't need to collect any more signatures here.
if (unsigned.isEmpty()) return partiallySignedTx
val partyToKeysMap = groupPublicKeysByWellKnownParty(serviceHub, unsigned)
// Check that we have a session for all parties. No more, no less.
require(sessionsToCollectFrom.map { it.counterparty }.toSet() == partyToKeysMap.keys) {
val setOfAllSessionKeys = sessionsToCollectFrom.groupBy { it.sessionOwningKey }.map {
require(it.value.size == 1) { "There are multiple sessions initiated for party key ${it.key.toStringShort()}" }
it.key to it.value.first()
}.toMap()
val partyToKeysItSignsFor = groupPublicKeysByWellKnownParty(serviceHub, unsigned)
val keyToSigningParty = partyToKeysItSignsFor.flatMap { (wellKnown, allKeysItSignsFor) ->
allKeysItSignsFor.map { it to wellKnown }
}.toMap()
val unrelatedSessions = sessionsToCollectFrom.filterNot {
if (it.sessionOwningKey == it.counterparty.owningKey) {
//this session was initiated by a wellKnownParty
//the session must have a corresponding unsigned
it.counterparty in partyToKeysItSignsFor
} else {
//this session was not initiated by a wellKnownParty
//so must directly exist in the unsigned
unsigned.contains(it.sessionOwningKey)
}
}
val keyToSessionMap = unsigned.map {
if (it in setOfAllSessionKeys) {
// the unsigned key exists directly as a sessionKey, so use that session
it to setOfAllSessionKeys[it]!!
} else {
//it might be delegated to a wellKnownParty
val wellKnownParty: Party? = keyToSigningParty[it]
if (wellKnownParty != null) {
//there is a wellKnownParty for this key, check if it has a session, and if so - use that session
val sessionForWellKnownParty = setOfAllSessionKeys[wellKnownParty.owningKey]
?: throw IllegalStateException("No session available to request signature for key: ${it.toStringShort()}")
it to sessionForWellKnownParty
} else {
throw IllegalStateException("Could not find a session or wellKnown party for key ${it.toStringShort()}")
}
}
}
//now invert the map to find the keys per session
val sessionToKeysMap = keyToSessionMap.map { it.second to it.first }.toMultiMap()
require(unrelatedSessions.isEmpty()) {
"The Initiator of CollectSignaturesFlow must pass in exactly the sessions required to sign the transaction."
}
// Collect signatures from all counterparties and append them to the partially signed transaction.
val counterpartySignatures = sessionsToCollectFrom.flatMap { session ->
subFlow(CollectSignatureFlow(partiallySignedTx, session, partyToKeysMap[session.counterparty]!!))
val counterpartySignatures = sessionToKeysMap.flatMap { (session, keys)->
subFlow(CollectSignatureFlow(partiallySignedTx, session, keys))
}
val stx = partiallySignedTx + counterpartySignatures
@ -196,7 +240,7 @@ class CollectSignatureFlow(val partiallySignedTx: SignedTransaction, val session
* @param otherSideSession The session which is providing you a transaction to sign.
*/
abstract class SignTransactionFlow @JvmOverloads constructor(val otherSideSession: FlowSession,
override val progressTracker: ProgressTracker = SignTransactionFlow.tracker()) : FlowLogic<SignedTransaction>() {
override val progressTracker: ProgressTracker = SignTransactionFlow.tracker()) : FlowLogic<SignedTransaction>() {
companion object {
object RECEIVING : ProgressTracker.Step("Receiving transaction proposal for signing.")
@ -247,12 +291,6 @@ abstract class SignTransactionFlow @JvmOverloads constructor(val otherSideSessio
@Suspendable
private fun checkSignatures(stx: SignedTransaction) {
// We set `ignoreUnrecognisedParties` to `true` in `groupPublicKeysByWellKnownParty`. This is because we don't
// need to recognise all keys, but just the initiator's.
val signingWellKnownIdentities = groupPublicKeysByWellKnownParty(serviceHub, stx.sigs.map(TransactionSignature::by), true)
require(otherSideSession.counterparty in signingWellKnownIdentities) {
"The Initiator of CollectSignaturesFlow must have signed the transaction. Found $signingWellKnownIdentities, expected $otherSideSession"
}
val signed = stx.sigs.map { it.by }
val allSigners = stx.tx.requiredSigningKeys
val notSigned = allSigners - signed

View File

@ -6,6 +6,7 @@ import net.corda.core.CordaInternal
import net.corda.core.DeleteForDJVM
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.*
@ -115,6 +116,19 @@ abstract class FlowLogic<out T> {
*/
val serviceHub: ServiceHub get() = stateMachine.serviceHub
/**
* Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note
* that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
*/
@Suspendable
fun initiateFlow(requested: AbstractParty): FlowSession {
val wellKnown = serviceHub.identityService.wellKnownPartyFromAnonymous(requested)
if (wellKnown == null) {
throw IllegalStateException("could not initiate flow with party $requested as they are not in the node identity service")
}
return stateMachine.initiateFlow(wellKnown = wellKnown, requested = requested)
}
/**
* Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note
* that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
@ -252,7 +266,6 @@ abstract class FlowLogic<out T> {
return sendAndReceiveWithRetry(R::class.java, payload)
}
/** Suspends until a message has been received for each session in the specified [sessions].
*
* Consider [receiveAll(receiveType: Class<R>, sessions: List<FlowSession>): List<UntrustworthyData<R>>] when the same type is expected from all sessions.

View File

@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.DoNotImplement
import net.corda.core.identity.Party
import net.corda.core.utilities.UntrustworthyData
import java.security.PublicKey
/**
*
@ -44,6 +45,13 @@ import net.corda.core.utilities.UntrustworthyData
*/
@DoNotImplement
abstract class FlowSession {
/**
* The current [sessionOwningKey] in the context of this session. It is not guaranteed to be [counterparty.owningKey] as the session
* may have been initiated with an [net.corda.core.identity.AnonymousParty] useful for grouping sessions vs signing requests
*/
abstract val sessionOwningKey: PublicKey
/**
* The [Party] on the other side of this session. In the case of a session created by [FlowLogic.initiateFlow]
* [counterparty] is the same Party as the one passed to that function.

View File

@ -9,6 +9,7 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import org.slf4j.Logger
@ -21,7 +22,10 @@ interface FlowStateMachine<FLOWRETURN> {
fun <SUSPENDRETURN : Any> suspend(ioRequest: FlowIORequest<SUSPENDRETURN>, maySkipCheckpoint: Boolean): SUSPENDRETURN
@Suspendable
fun initiateFlow(party: Party): FlowSession
fun initiateFlow(party: Party): FlowSession = initiateFlow(wellKnown = party, requested = null)
@Suspendable
fun initiateFlow(wellKnown: Party, requested: AbstractParty?): FlowSession
fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>)

View File

@ -5,6 +5,8 @@ import net.corda.core.DoNotImplement
import net.corda.core.contracts.PartyAndReference
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.*
import net.corda.core.internal.hash
import net.corda.core.utilities.contextLogger
import java.security.InvalidAlgorithmParameterException
import java.security.PublicKey
import java.security.cert.*
@ -24,6 +26,10 @@ interface IdentityService {
val trustAnchor: TrustAnchor
val caCertStore: CertStore
companion object {
val log = contextLogger()
}
/**
* Verify and then store an identity.
*
@ -94,6 +100,7 @@ interface IdentityService {
// The original version of this would return the party as-is if it was a Party (rather than AnonymousParty),
// however that means that we don't verify that we know who owns the key. As such as now enforce turning the key
// into a party, and from there figure out the well known party.
log.debug("Attempting to find wellKnownParty for: ${party.owningKey.hash}")
val candidate = partyFromKey(party.owningKey)
// TODO: This should be done via the network map cache, which is the authoritative source of well known identities
return if (candidate != null) {

View File

@ -6,13 +6,11 @@ import net.corda.core.contracts.Command
import net.corda.core.contracts.StateAndContract
import net.corda.core.contracts.requireThat
import net.corda.core.flows.mixins.WithContracts
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.excludeHostNode
import net.corda.core.identity.groupAbstractPartyByWellKnownParty
import net.corda.core.identity.*
import net.corda.core.node.services.IdentityService
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.testing.contracts.DummyContract
import net.corda.testing.core.*
import net.corda.testing.internal.matchers.flow.willReturn
@ -23,8 +21,11 @@ import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.enclosedCordapp
import org.hamcrest.CoreMatchers.`is`
import org.junit.AfterClass
import org.junit.Assert
import org.junit.Test
import java.security.PublicKey
class CollectSignaturesFlowTests : WithContracts {
companion object {
@ -55,11 +56,59 @@ class CollectSignaturesFlowTests : WithContracts {
aliceNode.verifyAndRegister(bConfidentialIdentity)
assert.that(
aliceNode.startTestFlow(alice, bConfidentialIdentity.party, charlie),
aliceNode.startTestFlow(alice, bConfidentialIdentity.party, charlie),
willReturn(requiredSignatures(3))
)
}
@Test
fun `successfully collects signatures when sessions are initiated with AnonymousParty`() {
val aConfidentialIdentity1 = aliceNode.createConfidentialIdentity(alice)
val bConfidentialIdentity1 = bobNode.createConfidentialIdentity(bob)
val bConfidentialIdentity2 = bobNode.createConfidentialIdentity(bob)
val cConfidentialIdentity1 = charlieNode.createConfidentialIdentity(charlie)
bobNode.registerInitiatedFlow(AnonymousSessionTestFlowResponder::class.java)
charlieNode.registerInitiatedFlow(AnonymousSessionTestFlowResponder::class.java)
val owners = listOf(aConfidentialIdentity1, bConfidentialIdentity1, bConfidentialIdentity2, cConfidentialIdentity1)
val future = aliceNode.startFlow(AnonymousSessionTestFlow(owners)).resultFuture
mockNet.runNetwork()
val stx = future.get()
val missingSigners = stx.getMissingSigners()
Assert.assertThat(missingSigners, `is`(emptySet()))
}
@Test
fun `successfully collects signatures when sessions are initiated with both AnonymousParty and WellKnownParty`() {
val aConfidentialIdentity1 = aliceNode.createConfidentialIdentity(alice)
val bConfidentialIdentity1 = bobNode.createConfidentialIdentity(bob)
val bConfidentialIdentity2 = bobNode.createConfidentialIdentity(bob)
val cConfidentialIdentity1 = charlieNode.createConfidentialIdentity(charlie)
val cConfidentialIdentity2 = charlieNode.createConfidentialIdentity(charlie)
bobNode.registerInitiatedFlow(MixAndMatchAnonymousSessionTestFlowResponder::class.java)
charlieNode.registerInitiatedFlow(MixAndMatchAnonymousSessionTestFlowResponder::class.java)
val owners = listOf(
aConfidentialIdentity1,
bConfidentialIdentity1,
bConfidentialIdentity2,
cConfidentialIdentity1,
cConfidentialIdentity2
)
val keysToLookup = listOf(bConfidentialIdentity1.owningKey, bConfidentialIdentity2.owningKey, cConfidentialIdentity1.owningKey)
val keysToKeepAnonymous = listOf(cConfidentialIdentity2.owningKey)
val future = aliceNode.startFlow(MixAndMatchAnonymousSessionTestFlow(owners, keysToLookup.toSet(), keysToKeepAnonymous.toSet())).resultFuture
mockNet.runNetwork()
val stx = future.get()
val missingSigners = stx.getMissingSigners()
Assert.assertThat(missingSigners, `is`(emptySet()))
}
@Test
fun `no need to collect any signatures`() {
val ptx = aliceNode.signDummyContract(alice.ref(1))
@ -97,10 +146,10 @@ class CollectSignaturesFlowTests : WithContracts {
//region Operators
private fun TestStartedNode.startTestFlow(vararg party: Party) =
startFlowAndRunNetwork(
TestFlow.Initiator(DummyContract.MultiOwnerState(
MAGIC_NUMBER,
listOf(*party)),
mockNet.defaultNotaryIdentity))
TestFlow.Initiator(DummyContract.MultiOwnerState(
MAGIC_NUMBER,
listOf(*party)),
mockNet.defaultNotaryIdentity))
//region Test Flow
// With this flow, the initiator starts the "CollectTransactionFlow". It is then the responders responsibility to
@ -144,3 +193,82 @@ class CollectSignaturesFlowTests : WithContracts {
}
//region
}
@InitiatingFlow
class AnonymousSessionTestFlow(val cis: List<PartyAndCertificate>) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
for (ci in cis) {
if (ci.name != ourIdentity.name) {
(serviceHub.identityService as IdentityServiceInternal).verifyAndRegisterIdentity(ci)
}
}
val state = DummyContract.MultiOwnerState(owners = cis.map { AnonymousParty(it.owningKey) })
val create = net.corda.testing.contracts.DummyContract.Commands.Create()
val txBuilder = TransactionBuilder(notary = serviceHub.networkMapCache.notaryIdentities.first())
.addOutputState(state)
.addCommand(create, cis.map { it.owningKey })
val ourKey = cis.single { it.name == ourIdentity.name }.owningKey
val signedByUsTx = serviceHub.signInitialTransaction(txBuilder, ourKey)
val sessionsToCollectFrom = cis.filter { it.name != ourIdentity.name }.map { initiateFlow(AnonymousParty(it.owningKey)) }
return subFlow(CollectSignaturesFlow(signedByUsTx, sessionsToCollectFrom, myOptionalKeys = listOf(ourKey)))
}
}
@InitiatedBy(AnonymousSessionTestFlow::class)
class AnonymousSessionTestFlowResponder(private val otherSideSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val signFlow = object : SignTransactionFlow(otherSideSession) {
@Suspendable
override fun checkTransaction(stx: SignedTransaction) = requireThat {
}
}
val stxId = subFlow(signFlow).id
}
}
@InitiatingFlow
class MixAndMatchAnonymousSessionTestFlow(val cis: List<PartyAndCertificate>,
val keysToLookUp: Set<PublicKey>,
val keysToKeepAnonymous: Set<PublicKey>) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
for (ci in cis) {
if (ci.name != ourIdentity.name) {
(serviceHub.identityService as IdentityServiceInternal).verifyAndRegisterIdentity(ci)
}
}
val state = DummyContract.MultiOwnerState(owners = cis.map { AnonymousParty(it.owningKey) })
val create = net.corda.testing.contracts.DummyContract.Commands.Create()
val txBuilder = TransactionBuilder(notary = serviceHub.networkMapCache.notaryIdentities.first())
.addOutputState(state)
.addCommand(create, cis.map { it.owningKey })
val ourKey = cis.single { it.name == ourIdentity.name }.owningKey
val signedByUsTx = serviceHub.signInitialTransaction(txBuilder, ourKey)
val resolvedParties = keysToLookUp.map { serviceHub.identityService.wellKnownPartyFromAnonymous(AnonymousParty(it))!! }.toSet()
val anonymousParties = keysToKeepAnonymous.map { AnonymousParty(it) }
val sessionsToCollectFrom = (resolvedParties + anonymousParties).map { initiateFlow(it) }
return subFlow(CollectSignaturesFlow(signedByUsTx, sessionsToCollectFrom, myOptionalKeys = listOf(ourKey)))
}
}
@InitiatedBy(MixAndMatchAnonymousSessionTestFlow::class)
class MixAndMatchAnonymousSessionTestFlowResponder(private val otherSideSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val signFlow = object : SignTransactionFlow(otherSideSession) {
@Suspendable
override fun checkTransaction(stx: SignedTransaction) = requireThat {
}
}
val stxId = subFlow(signFlow).id
}
}

View File

@ -1,6 +1,7 @@
package net.corda.node.services.api
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.CertRole
import net.corda.core.node.services.IdentityService
@ -8,6 +9,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.crypto.x509Certificates
import java.security.InvalidAlgorithmParameterException
import java.security.PublicKey
import java.security.cert.CertPathValidatorException
import java.security.cert.CertificateExpiredException
import java.security.cert.CertificateNotYetValidException

View File

@ -22,6 +22,8 @@ import javax.annotation.concurrent.ThreadSafe
@ThreadSafe
class InMemoryIdentityService(identities: List<PartyAndCertificate> = emptyList(),
override val trustRoot: X509Certificate) : SingletonSerializeAsToken(), IdentityServiceInternal {
companion object {
private val log = contextLogger()
}

View File

@ -9,6 +9,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.toBase58String
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
@ -170,7 +171,9 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
}
// We give the caller a copy of the data set to avoid any locking problems
override fun getAllIdentities(): Iterable<PartyAndCertificate> = database.transaction { keyToParties.allPersisted().map { it.second }.asIterable() }
override fun getAllIdentities(): Iterable<PartyAndCertificate> = database.transaction {
keyToParties.allPersisted().map { it.second }.asIterable()
}
override fun wellKnownPartyFromX500Name(name: CordaX500Name): Party? = certificateFromCordaX500Name(name)?.party

View File

@ -1,6 +1,7 @@
package net.corda.node.services.statemachine
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.SerializedBytes
@ -71,7 +72,7 @@ sealed class Event {
* communication takes place at this time, only on the first send/receive operation on the session.
* @param party the [Party] to create a session with.
*/
data class InitiateFlow(val party: Party) : Event()
data class InitiateFlow(val wellKnownParty: Party, val requestedParty: AbstractParty?) : Event()
/**
* Signal the entering into a subflow.

View File

@ -7,15 +7,17 @@ import net.corda.core.flows.FlowSession
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.checkPayloadIs
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.internal.checkPayloadIs
import java.security.PublicKey
class FlowSessionImpl(
override val counterparty: Party,
override val sessionOwningKey: PublicKey,
val sourceSessionId: SessionId
) : FlowSession() {

View File

@ -10,6 +10,7 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.cordapp.Cordapp
import net.corda.core.flows.*
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.serialization.internal.CheckpointSerializationContext
@ -230,7 +231,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
"Transaction context is missing. This might happen if a suspendable method is not annotated with @Suspendable annotation."
}
} else {
require(contextTransactionOrNull == null){"Transaction is marked as not present, but is not null"}
require(contextTransactionOrNull == null) { "Transaction is marked as not present, but is not null" }
}
}
@ -354,9 +355,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
@Suspendable
override fun initiateFlow(party: Party): FlowSession {
override fun initiateFlow(wellKnown: Party, requested: AbstractParty?): FlowSession {
val resume = processEventImmediately(
Event.InitiateFlow(party),
Event.InitiateFlow(wellKnownParty = wellKnown, requestedParty = requested),
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnExit = true
) as FlowContinuation.Resume
@ -440,7 +441,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnExit = false
)
require(continuation == FlowContinuation.ProcessEvents){"Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation "}
require(continuation == FlowContinuation.ProcessEvents) { "Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation " }
unpark(SERIALIZER_BLOCKER)
}
return uncheckedCast(processEventsUntilFlowIsResumed(

View File

@ -37,11 +37,7 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.shouldCheckCheckpoints
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor
import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker
import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
import net.corda.node.services.statemachine.interceptors.*
import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.errorAndTerminate
@ -57,12 +53,13 @@ import rx.subjects.PublishSubject
import java.lang.Integer.min
import java.security.SecureRandom
import java.util.HashSet
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.*
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.ArrayList
import kotlin.collections.HashMap
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.set
import kotlin.streams.toList
/**
@ -469,7 +466,7 @@ class SingleThreadedStateMachineManager(
try {
val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage)
val initiatedSessionId = SessionId.createRandom(secureRandom)
val senderSession = FlowSessionImpl(sender, initiatedSessionId)
val senderSession = FlowSessionImpl(sender, sender.owningKey, initiatedSessionId)
val flowLogic = initiatedFlowFactory.createFlow(senderSession)
val initiatedFlowInfo = when (initiatedFlowFactory) {
is InitiatedFlowFactory.Core -> FlowInfo(serviceHub.myInfo.platformVersion, "corda")

View File

@ -235,8 +235,8 @@ class TopLevelTransition(
return@builder FlowContinuation.ProcessEvents
}
val sourceSessionId = SessionId.createRandom(context.secureRandom)
val sessionImpl = FlowSessionImpl(event.party, sourceSessionId)
val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.party, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
val sessionImpl = FlowSessionImpl(event.wellKnownParty, event.requestedParty?.owningKey?: event.wellKnownParty.owningKey, sourceSessionId)
val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.wellKnownParty, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
actions.add(Action.AddSessionBinding(context.id, sourceSessionId))
FlowContinuation.Resume(sessionImpl)

View File

@ -142,6 +142,7 @@ class RetryFlowMockTest {
val alice = TestIdentity(CordaX500Name.parse("L=London,O=Alice Ltd,OU=Trade,C=GB")).party
val records = nodeA.smm.flowHospital.track().updates.toBlocking().toIterable().iterator()
val flow: FlowStateMachine<Unit> = nodeA.services.startFlow(FinalityHandler(object : FlowSession() {
override val sessionOwningKey = alice.owningKey
override val counterparty = alice
override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo {