From f9c034aa7c1e18a4c86e875cd8e79761790dc5d3 Mon Sep 17 00:00:00 2001 From: Stefano Franz Date: Tue, 18 Jun 2019 12:13:09 +0000 Subject: [PATCH] CORDA-3000: Allow AbstractParty to initiate flow (#5219) --- build.gradle | 2 +- .../corda/core/flows/CollectSignaturesFlow.kt | 64 ++++++-- .../kotlin/net/corda/core/flows/FlowLogic.kt | 15 +- .../net/corda/core/flows/FlowSession.kt | 8 + .../corda/core/internal/FlowStateMachine.kt | 6 +- .../core/node/services/IdentityService.kt | 7 + .../core/flows/CollectSignaturesFlowTests.kt | 146 ++++++++++++++++-- .../services/api/IdentityServiceInternal.kt | 2 + .../identity/InMemoryIdentityService.kt | 2 + .../identity/PersistentIdentityService.kt | 5 +- .../corda/node/services/statemachine/Event.kt | 3 +- .../services/statemachine/FlowSessionImpl.kt | 4 +- .../statemachine/FlowStateMachineImpl.kt | 9 +- .../SingleThreadedStateMachineManager.kt | 19 +-- .../transitions/TopLevelTransition.kt | 4 +- .../statemachine/RetryFlowMockTest.kt | 1 + 16 files changed, 252 insertions(+), 45 deletions(-) diff --git a/build.gradle b/build.gradle index 640f4b052e..003ec7003f 100644 --- a/build.gradle +++ b/build.gradle @@ -469,4 +469,4 @@ wrapper { buildScan { termsOfServiceUrl = 'https://gradle.com/terms-of-service' termsOfServiceAgree = 'yes' -} +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt b/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt index aab20f5e78..14351e0a7d 100644 --- a/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt @@ -3,8 +3,10 @@ package net.corda.core.flows import co.paralleluniverse.fibers.Suspendable import net.corda.core.crypto.TransactionSignature import net.corda.core.crypto.isFulfilledBy +import net.corda.core.crypto.toStringShort import net.corda.core.identity.Party import net.corda.core.identity.groupPublicKeysByWellKnownParty +import net.corda.core.internal.toMultiMap import net.corda.core.node.ServiceHub import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction @@ -65,7 +67,8 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig val sessionsToCollectFrom: Collection, val myOptionalKeys: Iterable?, override val progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : FlowLogic() { - @JvmOverloads constructor(partiallySignedTx: SignedTransaction, sessionsToCollectFrom: Collection, progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : this(partiallySignedTx, sessionsToCollectFrom, null, progressTracker) + @JvmOverloads + constructor(partiallySignedTx: SignedTransaction, sessionsToCollectFrom: Collection, progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : this(partiallySignedTx, sessionsToCollectFrom, null, progressTracker) companion object { object COLLECTING : ProgressTracker.Step("Collecting signatures from counterparties.") @@ -105,14 +108,55 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig // If the unsigned counterparties list is empty then we don't need to collect any more signatures here. if (unsigned.isEmpty()) return partiallySignedTx - val partyToKeysMap = groupPublicKeysByWellKnownParty(serviceHub, unsigned) - // Check that we have a session for all parties. No more, no less. - require(sessionsToCollectFrom.map { it.counterparty }.toSet() == partyToKeysMap.keys) { + val setOfAllSessionKeys = sessionsToCollectFrom.groupBy { it.sessionOwningKey }.map { + require(it.value.size == 1) { "There are multiple sessions initiated for party key ${it.key.toStringShort()}" } + it.key to it.value.first() + }.toMap() + + val partyToKeysItSignsFor = groupPublicKeysByWellKnownParty(serviceHub, unsigned) + val keyToSigningParty = partyToKeysItSignsFor.flatMap { (wellKnown, allKeysItSignsFor) -> + allKeysItSignsFor.map { it to wellKnown } + }.toMap() + + val unrelatedSessions = sessionsToCollectFrom.filterNot { + if (it.sessionOwningKey == it.counterparty.owningKey) { + //this session was initiated by a wellKnownParty + //the session must have a corresponding unsigned + it.counterparty in partyToKeysItSignsFor + } else { + //this session was not initiated by a wellKnownParty + //so must directly exist in the unsigned + unsigned.contains(it.sessionOwningKey) + } + } + + val keyToSessionMap = unsigned.map { + if (it in setOfAllSessionKeys) { + // the unsigned key exists directly as a sessionKey, so use that session + it to setOfAllSessionKeys[it]!! + } else { + //it might be delegated to a wellKnownParty + val wellKnownParty: Party? = keyToSigningParty[it] + if (wellKnownParty != null) { + //there is a wellKnownParty for this key, check if it has a session, and if so - use that session + val sessionForWellKnownParty = setOfAllSessionKeys[wellKnownParty.owningKey] + ?: throw IllegalStateException("No session available to request signature for key: ${it.toStringShort()}") + it to sessionForWellKnownParty + } else { + throw IllegalStateException("Could not find a session or wellKnown party for key ${it.toStringShort()}") + } + } + } + + //now invert the map to find the keys per session + val sessionToKeysMap = keyToSessionMap.map { it.second to it.first }.toMultiMap() + + require(unrelatedSessions.isEmpty()) { "The Initiator of CollectSignaturesFlow must pass in exactly the sessions required to sign the transaction." } // Collect signatures from all counterparties and append them to the partially signed transaction. - val counterpartySignatures = sessionsToCollectFrom.flatMap { session -> - subFlow(CollectSignatureFlow(partiallySignedTx, session, partyToKeysMap[session.counterparty]!!)) + val counterpartySignatures = sessionToKeysMap.flatMap { (session, keys)-> + subFlow(CollectSignatureFlow(partiallySignedTx, session, keys)) } val stx = partiallySignedTx + counterpartySignatures @@ -196,7 +240,7 @@ class CollectSignatureFlow(val partiallySignedTx: SignedTransaction, val session * @param otherSideSession The session which is providing you a transaction to sign. */ abstract class SignTransactionFlow @JvmOverloads constructor(val otherSideSession: FlowSession, - override val progressTracker: ProgressTracker = SignTransactionFlow.tracker()) : FlowLogic() { + override val progressTracker: ProgressTracker = SignTransactionFlow.tracker()) : FlowLogic() { companion object { object RECEIVING : ProgressTracker.Step("Receiving transaction proposal for signing.") @@ -247,12 +291,6 @@ abstract class SignTransactionFlow @JvmOverloads constructor(val otherSideSessio @Suspendable private fun checkSignatures(stx: SignedTransaction) { - // We set `ignoreUnrecognisedParties` to `true` in `groupPublicKeysByWellKnownParty`. This is because we don't - // need to recognise all keys, but just the initiator's. - val signingWellKnownIdentities = groupPublicKeysByWellKnownParty(serviceHub, stx.sigs.map(TransactionSignature::by), true) - require(otherSideSession.counterparty in signingWellKnownIdentities) { - "The Initiator of CollectSignaturesFlow must have signed the transaction. Found $signingWellKnownIdentities, expected $otherSideSession" - } val signed = stx.sigs.map { it.by } val allSigners = stx.tx.requiredSigningKeys val notSigned = allSigners - signed diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index bca4a37b5c..481d875384 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -6,6 +6,7 @@ import net.corda.core.CordaInternal import net.corda.core.DeleteForDJVM import net.corda.core.contracts.StateRef import net.corda.core.crypto.SecureHash +import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate import net.corda.core.internal.* @@ -115,6 +116,19 @@ abstract class FlowLogic { */ val serviceHub: ServiceHub get() = stateMachine.serviceHub + /** + * Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note + * that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive. + */ + @Suspendable + fun initiateFlow(requested: AbstractParty): FlowSession { + val wellKnown = serviceHub.identityService.wellKnownPartyFromAnonymous(requested) + if (wellKnown == null) { + throw IllegalStateException("could not initiate flow with party $requested as they are not in the node identity service") + } + return stateMachine.initiateFlow(wellKnown = wellKnown, requested = requested) + } + /** * Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note * that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive. @@ -252,7 +266,6 @@ abstract class FlowLogic { return sendAndReceiveWithRetry(R::class.java, payload) } - /** Suspends until a message has been received for each session in the specified [sessions]. * * Consider [receiveAll(receiveType: Class, sessions: List): List>] when the same type is expected from all sessions. diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt b/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt index b1782f5424..5f45e985c0 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt @@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable import net.corda.core.DoNotImplement import net.corda.core.identity.Party import net.corda.core.utilities.UntrustworthyData +import java.security.PublicKey /** * @@ -44,6 +45,13 @@ import net.corda.core.utilities.UntrustworthyData */ @DoNotImplement abstract class FlowSession { + + /** + * The current [sessionOwningKey] in the context of this session. It is not guaranteed to be [counterparty.owningKey] as the session + * may have been initiated with an [net.corda.core.identity.AnonymousParty] useful for grouping sessions vs signing requests + */ + abstract val sessionOwningKey: PublicKey + /** * The [Party] on the other side of this session. In the case of a session created by [FlowLogic.initiateFlow] * [counterparty] is the same Party as the one passed to that function. diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt index d2c8222b0d..1a7d71263d 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt @@ -9,6 +9,7 @@ import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowSession import net.corda.core.flows.FlowStackSnapshot import net.corda.core.flows.StateMachineRunId +import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party import net.corda.core.node.ServiceHub import org.slf4j.Logger @@ -21,7 +22,10 @@ interface FlowStateMachine { fun suspend(ioRequest: FlowIORequest, maySkipCheckpoint: Boolean): SUSPENDRETURN @Suspendable - fun initiateFlow(party: Party): FlowSession + fun initiateFlow(party: Party): FlowSession = initiateFlow(wellKnown = party, requested = null) + + @Suspendable + fun initiateFlow(wellKnown: Party, requested: AbstractParty?): FlowSession fun checkFlowPermission(permissionName: String, extraAuditData: Map) diff --git a/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt b/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt index b8f2bc7757..57aebf2417 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt @@ -5,6 +5,8 @@ import net.corda.core.DoNotImplement import net.corda.core.contracts.PartyAndReference import net.corda.core.crypto.toStringShort import net.corda.core.identity.* +import net.corda.core.internal.hash +import net.corda.core.utilities.contextLogger import java.security.InvalidAlgorithmParameterException import java.security.PublicKey import java.security.cert.* @@ -24,6 +26,10 @@ interface IdentityService { val trustAnchor: TrustAnchor val caCertStore: CertStore + companion object { + val log = contextLogger() + } + /** * Verify and then store an identity. * @@ -94,6 +100,7 @@ interface IdentityService { // The original version of this would return the party as-is if it was a Party (rather than AnonymousParty), // however that means that we don't verify that we know who owns the key. As such as now enforce turning the key // into a party, and from there figure out the well known party. + log.debug("Attempting to find wellKnownParty for: ${party.owningKey.hash}") val candidate = partyFromKey(party.owningKey) // TODO: This should be done via the network map cache, which is the authoritative source of well known identities return if (candidate != null) { diff --git a/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt b/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt index 5e18e0af6f..b79f9fab14 100644 --- a/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt +++ b/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt @@ -6,13 +6,11 @@ import net.corda.core.contracts.Command import net.corda.core.contracts.StateAndContract import net.corda.core.contracts.requireThat import net.corda.core.flows.mixins.WithContracts -import net.corda.core.identity.CordaX500Name -import net.corda.core.identity.Party -import net.corda.core.identity.excludeHostNode -import net.corda.core.identity.groupAbstractPartyByWellKnownParty +import net.corda.core.identity.* import net.corda.core.node.services.IdentityService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder +import net.corda.node.services.api.IdentityServiceInternal import net.corda.testing.contracts.DummyContract import net.corda.testing.core.* import net.corda.testing.internal.matchers.flow.willReturn @@ -23,8 +21,11 @@ import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.TestStartedNode import net.corda.testing.node.internal.enclosedCordapp +import org.hamcrest.CoreMatchers.`is` import org.junit.AfterClass +import org.junit.Assert import org.junit.Test +import java.security.PublicKey class CollectSignaturesFlowTests : WithContracts { companion object { @@ -55,11 +56,59 @@ class CollectSignaturesFlowTests : WithContracts { aliceNode.verifyAndRegister(bConfidentialIdentity) assert.that( - aliceNode.startTestFlow(alice, bConfidentialIdentity.party, charlie), + aliceNode.startTestFlow(alice, bConfidentialIdentity.party, charlie), willReturn(requiredSignatures(3)) ) } + @Test + fun `successfully collects signatures when sessions are initiated with AnonymousParty`() { + val aConfidentialIdentity1 = aliceNode.createConfidentialIdentity(alice) + val bConfidentialIdentity1 = bobNode.createConfidentialIdentity(bob) + val bConfidentialIdentity2 = bobNode.createConfidentialIdentity(bob) + val cConfidentialIdentity1 = charlieNode.createConfidentialIdentity(charlie) + + bobNode.registerInitiatedFlow(AnonymousSessionTestFlowResponder::class.java) + charlieNode.registerInitiatedFlow(AnonymousSessionTestFlowResponder::class.java) + + val owners = listOf(aConfidentialIdentity1, bConfidentialIdentity1, bConfidentialIdentity2, cConfidentialIdentity1) + + val future = aliceNode.startFlow(AnonymousSessionTestFlow(owners)).resultFuture + mockNet.runNetwork() + val stx = future.get() + val missingSigners = stx.getMissingSigners() + Assert.assertThat(missingSigners, `is`(emptySet())) + } + + @Test + fun `successfully collects signatures when sessions are initiated with both AnonymousParty and WellKnownParty`() { + val aConfidentialIdentity1 = aliceNode.createConfidentialIdentity(alice) + val bConfidentialIdentity1 = bobNode.createConfidentialIdentity(bob) + val bConfidentialIdentity2 = bobNode.createConfidentialIdentity(bob) + val cConfidentialIdentity1 = charlieNode.createConfidentialIdentity(charlie) + val cConfidentialIdentity2 = charlieNode.createConfidentialIdentity(charlie) + + bobNode.registerInitiatedFlow(MixAndMatchAnonymousSessionTestFlowResponder::class.java) + charlieNode.registerInitiatedFlow(MixAndMatchAnonymousSessionTestFlowResponder::class.java) + + val owners = listOf( + aConfidentialIdentity1, + bConfidentialIdentity1, + bConfidentialIdentity2, + cConfidentialIdentity1, + cConfidentialIdentity2 + ) + + val keysToLookup = listOf(bConfidentialIdentity1.owningKey, bConfidentialIdentity2.owningKey, cConfidentialIdentity1.owningKey) + val keysToKeepAnonymous = listOf(cConfidentialIdentity2.owningKey) + + val future = aliceNode.startFlow(MixAndMatchAnonymousSessionTestFlow(owners, keysToLookup.toSet(), keysToKeepAnonymous.toSet())).resultFuture + mockNet.runNetwork() + val stx = future.get() + val missingSigners = stx.getMissingSigners() + Assert.assertThat(missingSigners, `is`(emptySet())) + } + @Test fun `no need to collect any signatures`() { val ptx = aliceNode.signDummyContract(alice.ref(1)) @@ -97,10 +146,10 @@ class CollectSignaturesFlowTests : WithContracts { //region Operators private fun TestStartedNode.startTestFlow(vararg party: Party) = startFlowAndRunNetwork( - TestFlow.Initiator(DummyContract.MultiOwnerState( - MAGIC_NUMBER, - listOf(*party)), - mockNet.defaultNotaryIdentity)) + TestFlow.Initiator(DummyContract.MultiOwnerState( + MAGIC_NUMBER, + listOf(*party)), + mockNet.defaultNotaryIdentity)) //region Test Flow // With this flow, the initiator starts the "CollectTransactionFlow". It is then the responders responsibility to @@ -144,3 +193,82 @@ class CollectSignaturesFlowTests : WithContracts { } //region } + +@InitiatingFlow +class AnonymousSessionTestFlow(val cis: List) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + + for (ci in cis) { + if (ci.name != ourIdentity.name) { + (serviceHub.identityService as IdentityServiceInternal).verifyAndRegisterIdentity(ci) + } + } + val state = DummyContract.MultiOwnerState(owners = cis.map { AnonymousParty(it.owningKey) }) + val create = net.corda.testing.contracts.DummyContract.Commands.Create() + val txBuilder = TransactionBuilder(notary = serviceHub.networkMapCache.notaryIdentities.first()) + .addOutputState(state) + .addCommand(create, cis.map { it.owningKey }) + + + val ourKey = cis.single { it.name == ourIdentity.name }.owningKey + val signedByUsTx = serviceHub.signInitialTransaction(txBuilder, ourKey) + val sessionsToCollectFrom = cis.filter { it.name != ourIdentity.name }.map { initiateFlow(AnonymousParty(it.owningKey)) } + return subFlow(CollectSignaturesFlow(signedByUsTx, sessionsToCollectFrom, myOptionalKeys = listOf(ourKey))) + } +} + +@InitiatedBy(AnonymousSessionTestFlow::class) +class AnonymousSessionTestFlowResponder(private val otherSideSession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val signFlow = object : SignTransactionFlow(otherSideSession) { + @Suspendable + override fun checkTransaction(stx: SignedTransaction) = requireThat { + } + } + val stxId = subFlow(signFlow).id + } +} + +@InitiatingFlow +class MixAndMatchAnonymousSessionTestFlow(val cis: List, + val keysToLookUp: Set, + val keysToKeepAnonymous: Set) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + + for (ci in cis) { + if (ci.name != ourIdentity.name) { + (serviceHub.identityService as IdentityServiceInternal).verifyAndRegisterIdentity(ci) + } + } + val state = DummyContract.MultiOwnerState(owners = cis.map { AnonymousParty(it.owningKey) }) + val create = net.corda.testing.contracts.DummyContract.Commands.Create() + val txBuilder = TransactionBuilder(notary = serviceHub.networkMapCache.notaryIdentities.first()) + .addOutputState(state) + .addCommand(create, cis.map { it.owningKey }) + + + val ourKey = cis.single { it.name == ourIdentity.name }.owningKey + val signedByUsTx = serviceHub.signInitialTransaction(txBuilder, ourKey) + + val resolvedParties = keysToLookUp.map { serviceHub.identityService.wellKnownPartyFromAnonymous(AnonymousParty(it))!! }.toSet() + val anonymousParties = keysToKeepAnonymous.map { AnonymousParty(it) } + val sessionsToCollectFrom = (resolvedParties + anonymousParties).map { initiateFlow(it) } + return subFlow(CollectSignaturesFlow(signedByUsTx, sessionsToCollectFrom, myOptionalKeys = listOf(ourKey))) + } +} + +@InitiatedBy(MixAndMatchAnonymousSessionTestFlow::class) +class MixAndMatchAnonymousSessionTestFlowResponder(private val otherSideSession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val signFlow = object : SignTransactionFlow(otherSideSession) { + @Suspendable + override fun checkTransaction(stx: SignedTransaction) = requireThat { + } + } + val stxId = subFlow(signFlow).id + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/api/IdentityServiceInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/IdentityServiceInternal.kt index 070e5b0ed1..94a2199611 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/IdentityServiceInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/IdentityServiceInternal.kt @@ -1,6 +1,7 @@ package net.corda.node.services.api import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate import net.corda.core.internal.CertRole import net.corda.core.node.services.IdentityService @@ -8,6 +9,7 @@ import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.crypto.x509Certificates import java.security.InvalidAlgorithmParameterException +import java.security.PublicKey import java.security.cert.CertPathValidatorException import java.security.cert.CertificateExpiredException import java.security.cert.CertificateNotYetValidException diff --git a/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt index 3b46ddaca1..d57a37425d 100644 --- a/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt +++ b/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt @@ -22,6 +22,8 @@ import javax.annotation.concurrent.ThreadSafe @ThreadSafe class InMemoryIdentityService(identities: List = emptyList(), override val trustRoot: X509Certificate) : SingletonSerializeAsToken(), IdentityServiceInternal { + + companion object { private val log = contextLogger() } diff --git a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt index 2fe6d1deb8..be70ca57c7 100644 --- a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt +++ b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt @@ -9,6 +9,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.MAX_HASH_HEX_SIZE import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug +import net.corda.core.utilities.toBase58String import net.corda.node.services.api.IdentityServiceInternal import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.crypto.X509CertificateFactory @@ -170,7 +171,9 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri } // We give the caller a copy of the data set to avoid any locking problems - override fun getAllIdentities(): Iterable = database.transaction { keyToParties.allPersisted().map { it.second }.asIterable() } + override fun getAllIdentities(): Iterable = database.transaction { + keyToParties.allPersisted().map { it.second }.asIterable() + } override fun wellKnownPartyFromX500Name(name: CordaX500Name): Party? = certificateFromCordaX500Name(name)?.party diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt index e08d2d5884..25cb7b79fa 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt @@ -1,6 +1,7 @@ package net.corda.node.services.statemachine import net.corda.core.flows.FlowLogic +import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest import net.corda.core.serialization.SerializedBytes @@ -71,7 +72,7 @@ sealed class Event { * communication takes place at this time, only on the first send/receive operation on the session. * @param party the [Party] to create a session with. */ - data class InitiateFlow(val party: Party) : Event() + data class InitiateFlow(val wellKnownParty: Party, val requestedParty: AbstractParty?) : Event() /** * Signal the entering into a subflow. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt index 364546ce9d..08ff17f3ea 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt @@ -7,15 +7,17 @@ import net.corda.core.flows.FlowSession import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.checkPayloadIs import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.serialize import net.corda.core.utilities.NonEmptySet import net.corda.core.utilities.UntrustworthyData -import net.corda.core.internal.checkPayloadIs +import java.security.PublicKey class FlowSessionImpl( override val counterparty: Party, + override val sessionOwningKey: PublicKey, val sourceSessionId: SessionId ) : FlowSession() { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 36a4e1ecb0..0c7e3b56ff 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -10,6 +10,7 @@ import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext import net.corda.core.cordapp.Cordapp import net.corda.core.flows.* +import net.corda.core.identity.AbstractParty import net.corda.core.identity.Party import net.corda.core.internal.* import net.corda.core.serialization.internal.CheckpointSerializationContext @@ -230,7 +231,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, "Transaction context is missing. This might happen if a suspendable method is not annotated with @Suspendable annotation." } } else { - require(contextTransactionOrNull == null){"Transaction is marked as not present, but is not null"} + require(contextTransactionOrNull == null) { "Transaction is marked as not present, but is not null" } } } @@ -354,9 +355,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } @Suspendable - override fun initiateFlow(party: Party): FlowSession { + override fun initiateFlow(wellKnown: Party, requested: AbstractParty?): FlowSession { val resume = processEventImmediately( - Event.InitiateFlow(party), + Event.InitiateFlow(wellKnownParty = wellKnown, requestedParty = requested), isDbTransactionOpenOnEntry = true, isDbTransactionOpenOnExit = true ) as FlowContinuation.Resume @@ -440,7 +441,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, isDbTransactionOpenOnEntry = true, isDbTransactionOpenOnExit = false ) - require(continuation == FlowContinuation.ProcessEvents){"Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation "} + require(continuation == FlowContinuation.ProcessEvents) { "Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation " } unpark(SERIALIZER_BLOCKER) } return uncheckedCast(processEventsUntilFlowIsResumed( diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 76994a826e..34883e0e72 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -37,11 +37,7 @@ import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.shouldCheckCheckpoints import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion -import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor -import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker -import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor -import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor -import net.corda.node.services.statemachine.interceptors.PrintingInterceptor +import net.corda.node.services.statemachine.interceptors.* import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.errorAndTerminate @@ -57,12 +53,13 @@ import rx.subjects.PublishSubject import java.lang.Integer.min import java.security.SecureRandom import java.util.HashSet -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors -import java.util.concurrent.ScheduledFuture -import java.util.concurrent.TimeUnit +import java.util.concurrent.* import javax.annotation.concurrent.ThreadSafe +import kotlin.collections.ArrayList +import kotlin.collections.HashMap +import kotlin.collections.component1 +import kotlin.collections.component2 +import kotlin.collections.set import kotlin.streams.toList /** @@ -469,7 +466,7 @@ class SingleThreadedStateMachineManager( try { val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage) val initiatedSessionId = SessionId.createRandom(secureRandom) - val senderSession = FlowSessionImpl(sender, initiatedSessionId) + val senderSession = FlowSessionImpl(sender, sender.owningKey, initiatedSessionId) val flowLogic = initiatedFlowFactory.createFlow(senderSession) val initiatedFlowInfo = when (initiatedFlowFactory) { is InitiatedFlowFactory.Core -> FlowInfo(serviceHub.myInfo.platformVersion, "corda") diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index d9e7b7e173..0b0c1aa6c4 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -235,8 +235,8 @@ class TopLevelTransition( return@builder FlowContinuation.ProcessEvents } val sourceSessionId = SessionId.createRandom(context.secureRandom) - val sessionImpl = FlowSessionImpl(event.party, sourceSessionId) - val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.party, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong())) + val sessionImpl = FlowSessionImpl(event.wellKnownParty, event.requestedParty?.owningKey?: event.wellKnownParty.owningKey, sourceSessionId) + val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.wellKnownParty, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong())) currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions)) actions.add(Action.AddSessionBinding(context.id, sourceSessionId)) FlowContinuation.Resume(sessionImpl) diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt index 776c8eb9f6..4b8c261e9d 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt @@ -142,6 +142,7 @@ class RetryFlowMockTest { val alice = TestIdentity(CordaX500Name.parse("L=London,O=Alice Ltd,OU=Trade,C=GB")).party val records = nodeA.smm.flowHospital.track().updates.toBlocking().toIterable().iterator() val flow: FlowStateMachine = nodeA.services.startFlow(FinalityHandler(object : FlowSession() { + override val sessionOwningKey = alice.owningKey override val counterparty = alice override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo {