Introduced FlowLogic.getFlowContext which provides the flow version and app name of the other side.

This commit is contained in:
Shams Asari
2017-08-09 12:12:47 +01:00
parent f0c7d7665a
commit 008301c4e8
25 changed files with 374 additions and 279 deletions

View File

@ -7,6 +7,7 @@ import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.abbreviate import net.corda.core.internal.abbreviate
import net.corda.core.messaging.DataFeed import net.corda.core.messaging.DataFeed
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.UntrustworthyData
@ -52,10 +53,15 @@ abstract class FlowLogic<out T> {
*/ */
val serviceHub: ServiceHub get() = stateMachine.serviceHub val serviceHub: ServiceHub get() = stateMachine.serviceHub
@Deprecated("This is no longer used and will be removed in a future release. If you are using this to communicate " + /**
"with the same party but for two different message streams, then the correct way of doing that is to use sub-flows", * Returns a [FlowContext] object describing the flow [otherParty] is using. With [FlowContext.flowVersion] it
level = DeprecationLevel.ERROR) * provides the necessary information needed for the evolution of flows and enabling backwards compatibility.
open fun getCounterpartyMarker(party: Party): Class<*> = javaClass *
* This method can be called before any send or receive has been done with [otherParty]. In such a case this will force
* them to start their flow.
*/
@Suspendable
fun getFlowContext(otherParty: Party): FlowContext = stateMachine.getFlowContext(otherParty, flowUsedForSessions)
/** /**
* Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response * Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response
@ -90,11 +96,6 @@ abstract class FlowLogic<out T> {
return stateMachine.sendAndReceive(receiveType, otherParty, payload, flowUsedForSessions) return stateMachine.sendAndReceive(receiveType, otherParty, payload, flowUsedForSessions)
} }
/** @see sendAndReceiveWithRetry */
internal inline fun <reified R : Any> sendAndReceiveWithRetry(otherParty: Party, payload: Any): UntrustworthyData<R> {
return sendAndReceiveWithRetry(R::class.java, otherParty, payload)
}
/** /**
* Similar to [sendAndReceive] but also instructs the `payload` to be redelivered until the expected message is received. * Similar to [sendAndReceive] but also instructs the `payload` to be redelivered until the expected message is received.
* *
@ -104,9 +105,8 @@ abstract class FlowLogic<out T> {
* oracle services. If one or more nodes in the service cluster go down mid-session, the message will be redelivered * oracle services. If one or more nodes in the service cluster go down mid-session, the message will be redelivered
* to a different one, so there is no need to wait until the initial node comes back up to obtain a response. * to a different one, so there is no need to wait until the initial node comes back up to obtain a response.
*/ */
@Suspendable internal inline fun <reified R : Any> sendAndReceiveWithRetry(otherParty: Party, payload: Any): UntrustworthyData<R> {
internal open fun <R : Any> sendAndReceiveWithRetry(receiveType: Class<R>, otherParty: Party, payload: Any): UntrustworthyData<R> { return stateMachine.sendAndReceive(R::class.java, otherParty, payload, flowUsedForSessions, true)
return stateMachine.sendAndReceive(receiveType, otherParty, payload, flowUsedForSessions, true)
} }
/** /**
@ -181,7 +181,9 @@ abstract class FlowLogic<out T> {
* @param extraAuditData in the audit log for this permission check these extra key value pairs will be recorded. * @param extraAuditData in the audit log for this permission check these extra key value pairs will be recorded.
*/ */
@Throws(FlowException::class) @Throws(FlowException::class)
fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>) = stateMachine.checkFlowPermission(permissionName, extraAuditData) fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>) {
stateMachine.checkFlowPermission(permissionName, extraAuditData)
}
/** /**
@ -190,7 +192,9 @@ abstract class FlowLogic<out T> {
* @param comment a general human readable summary of the event. * @param comment a general human readable summary of the event.
* @param extraAuditData in the audit log for this permission check these extra key value pairs will be recorded. * @param extraAuditData in the audit log for this permission check these extra key value pairs will be recorded.
*/ */
fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>) = stateMachine.recordAuditEvent(eventType, comment, extraAuditData) fun recordAuditEvent(eventType: String, comment: String, extraAuditData: Map<String, String>) {
stateMachine.recordAuditEvent(eventType, comment, extraAuditData)
}
/** /**
* Override this to provide a [ProgressTracker]. If one is provided and stepped, the framework will do something * Override this to provide a [ProgressTracker]. If one is provided and stepped, the framework will do something
@ -262,3 +266,20 @@ abstract class FlowLogic<out T> {
} }
} }
} }
/**
* Version and name of the CorDapp hosting the other side of the flow.
*/
@CordaSerializable
data class FlowContext(
/**
* The integer flow version the other side is using.
* @see InitiatingFlow
*/
val flowVersion: Int,
/**
* Name of the CorDapp jar hosting the flow, without the .jar extension. It will include a unique identifier
* to deduplicate it from other releases of the same CorDapp, typically a version string. See the
* [CorDapp JAR format](https://docs.corda.net/cordapp-build-systems.html#cordapp-jar-format) for more details.
*/
val appName: String)

View File

@ -3,6 +3,7 @@ package net.corda.core.internal
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowContext
import net.corda.core.flows.FlowInitiator import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
@ -14,6 +15,9 @@ import org.slf4j.Logger
/** This is an internal interface that is implemented by code in the node module. You should look at [FlowLogic]. */ /** This is an internal interface that is implemented by code in the node module. You should look at [FlowLogic]. */
interface FlowStateMachine<R> { interface FlowStateMachine<R> {
@Suspendable
fun getFlowContext(otherParty: Party, sessionFlow: FlowLogic<*>): FlowContext
@Suspendable @Suspendable
fun <T : Any> sendAndReceive(receiveType: Class<T>, fun <T : Any> sendAndReceive(receiveType: Class<T>,
otherParty: Party, otherParty: Party,

View File

@ -6,6 +6,7 @@ import net.corda.core.crypto.*
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.NotaryError import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryException import net.corda.core.flows.NotaryException
import net.corda.core.flows.NotaryFlow
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
@ -21,11 +22,9 @@ abstract class NotaryService : SingletonSerializeAsToken() {
/** /**
* Produces a notary service flow which has the corresponding sends and receives as [NotaryFlow.Client]. * Produces a notary service flow which has the corresponding sends and receives as [NotaryFlow.Client].
* The first parameter is the client [Party] making the request and the second is the platform version * @param otherParty client [Party] making the request
* of the client's node. Use this version parameter to provide backwards compatibility if the notary flow protocol
* changes.
*/ */
abstract fun createServiceFlow(otherParty: Party, platformVersion: Int): FlowLogic<Void?> abstract fun createServiceFlow(otherParty: Party): FlowLogic<Void?>
} }
/** /**

View File

@ -5,21 +5,20 @@ import net.corda.core.contracts.Attachment
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.TestDataVendingFlow
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.FetchAttachmentsFlow import net.corda.core.internal.FetchAttachmentsFlow
import net.corda.core.internal.FetchDataFlow import net.corda.core.internal.FetchDataFlow
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.unwrap
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.internal.InitiatedFlowFactory import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.persistence.NodeAttachmentService import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.persistence.schemas.requery.AttachmentEntity import net.corda.node.services.persistence.schemas.requery.AttachmentEntity
import net.corda.node.services.statemachine.SessionInit
import net.corda.core.flows.TestDataVendingFlow
import net.corda.testing.node.MockNetwork import net.corda.testing.node.MockNetwork
import org.junit.After import org.junit.After
import org.junit.Before import org.junit.Before
@ -146,11 +145,11 @@ class AttachmentSerializationTest {
} }
private fun launchFlow(clientLogic: ClientLogic, rounds: Int, sendData: Boolean = false) { private fun launchFlow(clientLogic: ClientLogic, rounds: Int, sendData: Boolean = false) {
server.internalRegisterFlowFactory(ClientLogic::class.java, object : InitiatedFlowFactory<ServerLogic> { server.internalRegisterFlowFactory(
override fun createFlow(platformVersion: Int, otherParty: Party, sessionInit: SessionInit): ServerLogic { ClientLogic::class.java,
return ServerLogic(otherParty, sendData) InitiatedFlowFactory.Core { ServerLogic(it, sendData) },
} ServerLogic::class.java,
}, ServerLogic::class.java, track = false) track = false)
client.services.startFlow(clientLogic) client.services.startFlow(clientLogic)
mockNet.runNetwork(rounds) mockNet.runNetwork(rounds)
} }
@ -158,7 +157,9 @@ class AttachmentSerializationTest {
private fun rebootClientAndGetAttachmentContent(checkAttachmentsOnLoad: Boolean = true): String { private fun rebootClientAndGetAttachmentContent(checkAttachmentsOnLoad: Boolean = true): String {
client.stop() client.stop()
client = mockNet.createNode(server.network.myAddress, client.id, object : MockNetwork.Factory<MockNetwork.MockNode> { client = mockNet.createNode(server.network.myAddress, client.id, object : MockNetwork.Factory<MockNetwork.MockNode> {
override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?, advertisedServices: Set<ServiceInfo>, id: Int, overrideServices: Map<ServiceInfo, KeyPair>?, entropyRoot: BigInteger): MockNetwork.MockNode { override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?,
advertisedServices: Set<ServiceInfo>, id: Int, overrideServices: Map<ServiceInfo, KeyPair>?,
entropyRoot: BigInteger): MockNetwork.MockNode {
return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) { return object : MockNetwork.MockNode(config, network, networkMapAddr, advertisedServices, id, overrideServices, entropyRoot) {
override fun startMessagingService(rpcOps: RPCOps) { override fun startMessagingService(rpcOps: RPCOps) {
attachments.checkAttachmentsOnLoad = checkAttachmentsOnLoad attachments.checkAttachmentsOnLoad = checkAttachmentsOnLoad

View File

@ -1,4 +1,4 @@
Cordapp Build Systems CorDapp Build Systems
===================== =====================
Cordapps run on the Corda platform and integrate with it and each other. To learn more about the basics of a Cordapp Cordapps run on the Corda platform and integrate with it and each other. To learn more about the basics of a Cordapp
@ -6,7 +6,7 @@ please read :doc:`cordapp-overview`. To learn about writing a Cordapp as a devel
This article will specifically deal with how to build cordapps, specifically with Gradle. This article will specifically deal with how to build cordapps, specifically with Gradle.
Cordapp JAR Format CorDapp JAR format
------------------ ------------------
The first step to integrating a Cordapp with Corda is to ensure it is in the correct format. The correct format of a JAR The first step to integrating a Cordapp with Corda is to ensure it is in the correct format. The correct format of a JAR
@ -22,6 +22,10 @@ other two JARs.
The ``jar`` task included by default in the cordapp templates will automatically build your JAR in this format as long The ``jar`` task included by default in the cordapp templates will automatically build your JAR in this format as long
as your dependencies are correctly set. as your dependencies are correctly set.
The filename of the jar must include some sort of unique identifier to deduplicate it from other releases of the same
CorDapp. This is typically done by appending the version string. It should not change once the jar has been deployed on
a node. If it is then make sure no one is checking ``FlowContext.appName`` (see :doc:`versioning`).
Building against Corda Building against Corda
---------------------- ----------------------
@ -57,7 +61,7 @@ versions can be found here: https://bintray.com/r3/corda/cordformation.
In certain cases, you may also wish to build against the unstable Master branch. See :doc:`building-against-master`. In certain cases, you may also wish to build against the unstable Master branch. See :doc:`building-against-master`.
Building against Cordapps Building against CorDapps
------------------------- -------------------------
To build against a Cordapp you must add it as a ``cordapp`` dependency to your ``build.gradle``. To build against a Cordapp you must add it as a ``cordapp`` dependency to your ``build.gradle``.

View File

@ -8,10 +8,7 @@ import net.corda.core.node.PluginServiceHub
import net.corda.core.node.services.CordaService import net.corda.core.node.services.CordaService
import net.corda.core.node.services.TimeWindowChecker import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.unwrap
import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.transactions.ValidatingNotaryService import net.corda.node.services.transactions.ValidatingNotaryService
import java.security.SignatureException import java.security.SignatureException
@ -26,9 +23,7 @@ class MyCustomValidatingNotaryService(override val services: PluginServiceHub) :
override val timeWindowChecker = TimeWindowChecker(services.clock) override val timeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider = PersistentUniquenessProvider() override val uniquenessProvider = PersistentUniquenessProvider()
override fun createServiceFlow(otherParty: Party, platformVersion: Int): FlowLogic<Void?> { override fun createServiceFlow(otherParty: Party): FlowLogic<Void?> = MyValidatingNotaryFlow(otherParty, this)
return MyValidatingNotaryFlow(otherParty, this)
}
override fun start() {} override fun start() {}
override fun stop() {} override fun stop() {}
@ -38,9 +33,8 @@ class MyCustomValidatingNotaryService(override val services: PluginServiceHub) :
// START 2 // START 2
class MyValidatingNotaryFlow(otherSide: Party, service: MyCustomValidatingNotaryService) : NotaryFlow.Service(otherSide, service) { class MyValidatingNotaryFlow(otherSide: Party, service: MyCustomValidatingNotaryService) : NotaryFlow.Service(otherSide, service) {
/** /**
* The received transaction is checked for contract-validity, which requires fully resolving it into a * The received transaction is checked for contract-validity, for which the caller also has to to reveal the whole
* [TransactionForVerification], for which the caller also has to to reveal the whole transaction * transaction dependency chain.
* dependency chain.
*/ */
@Suspendable @Suspendable
override fun receiveAndVerifyTx(): TransactionParts { override fun receiveAndVerifyTx(): TransactionParts {
@ -58,10 +52,6 @@ class MyValidatingNotaryFlow(otherSide: Party, service: MyCustomValidatingNotary
} }
} }
fun processTransaction(stx: SignedTransaction) {
// Add custom transaction processing logic here
}
private fun checkSignatures(stx: SignedTransaction) { private fun checkSignatures(stx: SignedTransaction) {
try { try {
stx.verifySignaturesExcept(serviceHub.myInfo.notaryIdentity.owningKey) stx.verifySignaturesExcept(serviceHub.myInfo.notaryIdentity.owningKey)

View File

@ -31,14 +31,42 @@ for the network.
Flow versioning Flow versioning
--------------- ---------------
A platform which can be extended with CorDapps also requires the ability to version these apps as they evolve from In addition to the evolution of the platform, flows that run on top of the platform can also evolve. It may be that the
release to release. This allows users of these apps, whether they're other nodes or RPC users, to select which version flow protocol between an initiating flow and it's intiated flow changes from one CorDapp release to the next in such as
they wish to use and enables nodes to control which app versions they support. Flows have their own version numbers, way to be backwards incompatible with existing flows. For example, if a sequence of sends and receives needs to change
independent of other versioning, for example of the platform. In particular it is the initiating flow that can be versioned or if the semantics of a particular receive changes.
using the ``version`` property of the ``InitiatingFlow`` annotation. This assigns an integer version number, similar in
concept to the platform version, which is used in the session handshake process when a flow communicates with another party The ``InitiatingFlow`` annotation (see :doc:`flow-state-machine` for more information on the flow annotations) has a ``version``
for the first time. The other party will only accept the session request if it, firstly, has that flow loaded, and secondly, property, which if not specified defaults to 1. This flow version is included in the flow session handshake and exposed
for the same version (see also :doc:`flow-state-machine`). to both parties in the communication via ``FlowLogic.getFlowContext``. This takes in a ``Party`` and will return a
``FlowContext`` object which describes the flow running on the other side. In particular it has the ``flowVersion`` property
which can be used to programmatically evolve flows across versions.
.. container:: codeset
.. sourcecode:: kotlin
@Suspendable
override fun call() {
val flowVersionOfOtherParty = getFlowContext(otherParty).flowVersion
val receivedString = if (flowVersionOfOtherParty == 1) {
receive<Int>(otherParty).unwrap { it.toString() }
} else {
receive<String>(otherParty).unwrap { it }
}
}
The above shows an example evolution of a flow which in the first version was expecting to receive an Int, but then
in subsequent versions was relaxed to receive a String. This flow is still able to communicate with parties which are
running the older flow (or rather older CorDapps containing the older flow).
.. warning:: It's important that ``InitiatingFlow.version`` be incremented each time the flow protocol changes in an
incompatible way.
``FlowContext`` also has ``appName`` which is the name of the CorDapp hosting the flow. This can be used to determine
implementation details of the CorDapp. See :doc:`cordapp-build-systems` for more information on the CorDapp filename.
.. note:: Currently changing any of the properties of a ``CordaSerializable`` type is also backwards incompatible and
requires incrementing of ``InitiatingFlow.version``. This will be relaxed somewhat once the AMQP wire serialisation
format is implemented as it will automatically handle a lot of the data type migration cases.
.. note:: Currently we don't support multiple versions of the same flow loaded in the same node. This will be possible
once we start loading CorDapps in separate class loaders.

View File

@ -71,7 +71,7 @@ class Cordformation implements Plugin<Project> {
def filteredDeps = directDeps.findAll { excludes.collect { exclude -> (exclude.group == it.group) && (exclude.name == it.name) }.findAll { it }.isEmpty() } def filteredDeps = directDeps.findAll { excludes.collect { exclude -> (exclude.group == it.group) && (exclude.name == it.name) }.findAll { it }.isEmpty() }
filteredDeps.each { filteredDeps.each {
// net.corda may be a core dependency which shouldn't be included in this cordapp so give a warning // net.corda may be a core dependency which shouldn't be included in this cordapp so give a warning
if(it.group.contains('net.corda')) { if(it.group.contains('net.corda.')) {
logger.warn("You appear to have included a Corda platform component ($it) using a 'compile' or 'runtime' dependency." + logger.warn("You appear to have included a Corda platform component ($it) using a 'compile' or 'runtime' dependency." +
"This can cause node stability problems. Please use 'corda' instead." + "This can cause node stability problems. Please use 'corda' instead." +
"See http://docs.corda.net/cordapp-build-systems.html") "See http://docs.corda.net/cordapp-build-systems.html")

View File

@ -73,11 +73,7 @@ processResources {
} }
processSmokeTestResources { processSmokeTestResources {
// Build one of the demos so that we can test CorDapp scanning in CordappScanningTest. It doesn't matter which demo // Bring in the fully built corda.jar for use by NodeFactory in the smoke tests
// we use, just make sure the test is updated accordingly.
from(project(':samples:trader-demo').tasks.jar) {
rename 'trader-demo-(.*)', 'trader-demo.jar'
}
from(project(':node:capsule').tasks.buildCordaJAR) { from(project(':node:capsule').tasks.buildCordaJAR) {
rename 'corda-(.*)', 'corda.jar' rename 'corda-(.*)', 'corda.jar'
} }
@ -213,7 +209,13 @@ task integrationTest(type: Test) {
classpath = sourceSets.integrationTest.runtimeClasspath classpath = sourceSets.integrationTest.runtimeClasspath
} }
task smokeTestJar(type: Jar) {
baseName = project.name + '-smoke-test'
from sourceSets.smokeTest.output
}
task smokeTest(type: Test) { task smokeTest(type: Test) {
dependsOn smokeTestJar
testClassesDir = sourceSets.smokeTest.output.classesDir testClassesDir = sourceSets.smokeTest.output.classesDir
classpath = sourceSets.smokeTest.runtimeClasspath classpath = sourceSets.smokeTest.runtimeClasspath
} }

View File

@ -6,35 +6,40 @@ import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.testing.ALICE import net.corda.testing.ALICE
import net.corda.testing.BOB import net.corda.testing.BOB
import net.corda.core.utilities.unwrap
import net.corda.testing.node.NodeBasedTest import net.corda.testing.node.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.Test import org.junit.Test
class FlowVersioningTest : NodeBasedTest() { class FlowVersioningTest : NodeBasedTest() {
@Test @Test
fun `core flows receive platform version of initiator`() { fun `getFlowContext returns the platform version for core flows`() {
val (alice, bob) = listOf( val (alice, bob) = listOf(
startNode(ALICE.name, platformVersion = 2), startNode(ALICE.name, platformVersion = 2),
startNode(BOB.name, platformVersion = 3)).transpose().getOrThrow() startNode(BOB.name, platformVersion = 3)).transpose().getOrThrow()
bob.installCoreFlow(ClientFlow::class, ::SendBackPlatformVersionFlow) bob.installCoreFlow(PretendInitiatingCoreFlow::class, ::PretendInitiatedCoreFlow)
val resultFuture = alice.services.startFlow(ClientFlow(bob.info.legalIdentity)).resultFuture val (alicePlatformVersionAccordingToBob, bobPlatformVersionAccordingToAlice) = alice.services.startFlow(
assertThat(resultFuture.getOrThrow()).isEqualTo(2) PretendInitiatingCoreFlow(bob.info.legalIdentity)).resultFuture.getOrThrow()
assertThat(alicePlatformVersionAccordingToBob).isEqualTo(2)
assertThat(bobPlatformVersionAccordingToAlice).isEqualTo(3)
} }
@InitiatingFlow @InitiatingFlow
private class ClientFlow(val otherParty: Party) : FlowLogic<Any>() { private class PretendInitiatingCoreFlow(val initiatedParty: Party) : FlowLogic<Pair<Int, Int>>() {
@Suspendable @Suspendable
override fun call(): Any { override fun call(): Pair<Int, Int> {
return sendAndReceive<Any>(otherParty, "This is ignored. We only send to kick off the flow on the other side").unwrap { it } return Pair(
receive<Int>(initiatedParty).unwrap { it },
getFlowContext(initiatedParty).flowVersion
)
} }
} }
private class SendBackPlatformVersionFlow(val otherParty: Party, val otherPartysPlatformVersion: Int) : FlowLogic<Unit>() { private class PretendInitiatedCoreFlow(val initiatingParty: Party) : FlowLogic<Unit>() {
@Suspendable @Suspendable
override fun call() = send(otherParty, otherPartysPlatformVersion) override fun call() = send(initiatingParty, getFlowContext(initiatingParty).flowVersion)
} }
} }

View File

@ -30,7 +30,10 @@ import net.corda.flows.CashExitFlow
import net.corda.flows.CashIssueFlow import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow import net.corda.flows.CashPaymentFlow
import net.corda.flows.IssuerFlow import net.corda.flows.IssuerFlow
import net.corda.node.services.* import net.corda.node.services.ContractUpgradeHandler
import net.corda.node.services.NotaryChangeHandler
import net.corda.node.services.NotifyTransactionHandler
import net.corda.node.services.TransactionKeyHandler
import net.corda.node.services.api.* import net.corda.node.services.api.*
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.config.configureWithDevSSLCertificate
@ -62,8 +65,6 @@ import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSoftLockManager import net.corda.node.services.vault.VaultSoftLockManager
import net.corda.node.utilities.* import net.corda.node.utilities.*
import net.corda.node.utilities.AddOrRemove.ADD import net.corda.node.utilities.AddOrRemove.ADD
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.configureDatabase
import org.apache.activemq.artemis.utils.ReusableLatch import org.apache.activemq.artemis.utils.ReusableLatch
import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.X500Name
import org.slf4j.Logger import org.slf4j.Logger
@ -282,7 +283,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
private fun handleCustomNotaryService(service: NotaryService) { private fun handleCustomNotaryService(service: NotaryService) {
runOnStop += service::stop runOnStop += service::stop
service.start() service.start()
installCoreFlow(NotaryFlow.Client::class, { party: Party, version: Int -> service.createServiceFlow(party, version) }) installCoreFlow(NotaryFlow.Client::class, service::createServiceFlow)
} }
private inline fun <reified A : Annotation> Class<*>.requireAnnotation(): A { private inline fun <reified A : Annotation> Class<*>.requireAnnotation(): A {
@ -344,9 +345,15 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
val initiatingFlow = initiatedFlow.requireAnnotation<InitiatedBy>().value.java val initiatingFlow = initiatedFlow.requireAnnotation<InitiatedBy>().value.java
val (version, classWithAnnotation) = initiatingFlow.flowVersionAndInitiatingClass val (version, classWithAnnotation) = initiatingFlow.flowVersionAndInitiatingClass
require(classWithAnnotation == initiatingFlow) { require(classWithAnnotation == initiatingFlow) {
"${InitiatingFlow::class.java.name} must be annotated on ${initiatingFlow.name} and not on a super-type" "${InitiatedBy::class.java.name} must point to ${classWithAnnotation.name} and not ${initiatingFlow.name}"
} }
val flowFactory = InitiatedFlowFactory.CorDapp(version, { ctor.newInstance(it) }) val jarFile = Paths.get(initiatedFlow.protectionDomain.codeSource.location.toURI())
val appName = if (jarFile.isRegularFile() && jarFile.toString().endsWith(".jar")) {
jarFile.fileName.toString().removeSuffix(".jar")
} else {
"<unknown>"
}
val flowFactory = InitiatedFlowFactory.CorDapp(version, appName, { ctor.newInstance(it) })
val observable = internalRegisterFlowFactory(initiatingFlow, flowFactory, initiatedFlow, track) val observable = internalRegisterFlowFactory(initiatingFlow, flowFactory, initiatedFlow, track)
log.info("Registered ${initiatingFlow.name} to initiate ${initiatedFlow.name} (version $version)") log.info("Registered ${initiatingFlow.name} to initiate ${initiatedFlow.name} (version $version)")
return observable return observable
@ -390,7 +397,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
* @suppress * @suppress
*/ */
@VisibleForTesting @VisibleForTesting
fun installCoreFlow(clientFlowClass: KClass<out FlowLogic<*>>, flowFactory: (Party, Int) -> FlowLogic<*>) { fun installCoreFlow(clientFlowClass: KClass<out FlowLogic<*>>, flowFactory: (Party) -> FlowLogic<*>) {
require(clientFlowClass.java.flowVersionAndInitiatingClass.first == 1) { require(clientFlowClass.java.flowVersionAndInitiatingClass.first == 1) {
"${InitiatingFlow::class.java.name}.version not applicable for core flows; their version is the node's platform version" "${InitiatingFlow::class.java.name}.version not applicable for core flows; their version is the node's platform version"
} }
@ -399,10 +406,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
} }
private fun installCoreFlows() { private fun installCoreFlows() {
installCoreFlow(BroadcastTransactionFlow::class) { otherParty, _ -> NotifyTransactionHandler(otherParty) } installCoreFlow(BroadcastTransactionFlow::class, ::NotifyTransactionHandler)
installCoreFlow(NotaryChangeFlow::class) { otherParty, _ -> NotaryChangeHandler(otherParty) } installCoreFlow(NotaryChangeFlow::class, ::NotaryChangeHandler)
installCoreFlow(ContractUpgradeFlow::class) { otherParty, _ -> ContractUpgradeHandler(otherParty) } installCoreFlow(ContractUpgradeFlow::class, ::ContractUpgradeHandler)
installCoreFlow(TransactionKeyFlow::class) { otherParty, _ -> TransactionKeyHandler(otherParty) } installCoreFlow(TransactionKeyFlow::class, ::TransactionKeyHandler)
} }
/** /**
@ -567,7 +574,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
runOnStop += this::stop runOnStop += this::stop
start() start()
} }
installCoreFlow(NotaryFlow.Client::class, { party: Party, version: Int -> service.createServiceFlow(party, version) }) installCoreFlow(NotaryFlow.Client::class, service::createServiceFlow)
} else { } else {
log.info("Notary type ${notaryServiceType.id} does not match any built-in notary types. " + log.info("Notary type ${notaryServiceType.id} does not match any built-in notary types. " +
"It is expected to be loaded via a CorDapp") "It is expected to be loaded via a CorDapp")

View File

@ -2,26 +2,14 @@ package net.corda.node.internal
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.node.services.statemachine.SessionInit
import net.corda.node.services.statemachine.SessionRejectException
interface InitiatedFlowFactory<out F : FlowLogic<*>> { sealed class InitiatedFlowFactory<out F : FlowLogic<*>> {
fun createFlow(platformVersion: Int, otherParty: Party, sessionInit: SessionInit): F protected abstract val factory: (Party) -> F
fun createFlow(otherParty: Party): F = factory(otherParty)
data class Core<out F : FlowLogic<*>>(val factory: (Party, Int) -> F) : InitiatedFlowFactory<F> { data class Core<out F : FlowLogic<*>>(override val factory: (Party) -> F) : InitiatedFlowFactory<F>()
override fun createFlow(platformVersion: Int, otherParty: Party, sessionInit: SessionInit): F { data class CorDapp<out F : FlowLogic<*>>(val flowVersion: Int,
return factory(otherParty, platformVersion) val appName: String,
} override val factory: (Party) -> F) : InitiatedFlowFactory<F>()
}
data class CorDapp<out F : FlowLogic<*>>(val version: Int, val factory: (Party) -> F) : InitiatedFlowFactory<F> {
override fun createFlow(platformVersion: Int, otherParty: Party, sessionInit: SessionInit): F {
// TODO Add support for multiple versions of the same flow when CorDapps are loaded in separate class loaders
if (sessionInit.flowVerison == version) return factory(otherParty)
throw SessionRejectException(
"Version not supported",
"Version mismatch - ${sessionInit.initiatingFlowClass} is only registered for version $version")
}
}
} }

View File

@ -1,7 +1,8 @@
package net.corda.node.services.statemachine package net.corda.node.services.statemachine
import net.corda.core.identity.Party import net.corda.core.flows.FlowContext
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.node.services.statemachine.FlowSessionState.Initiated import net.corda.node.services.statemachine.FlowSessionState.Initiated
import net.corda.node.services.statemachine.FlowSessionState.Initiating import net.corda.node.services.statemachine.FlowSessionState.Initiating
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
@ -41,7 +42,7 @@ sealed class FlowSessionState {
override val sendToParty: Party get() = otherParty override val sendToParty: Party get() = otherParty
} }
data class Initiated(val peerParty: Party, val peerSessionId: Long) : FlowSessionState() { data class Initiated(val peerParty: Party, val peerSessionId: Long, val context: FlowContext) : FlowSessionState() {
override val sendToParty: Party get() = peerParty override val sendToParty: Party get() = peerParty
} }
} }

View File

@ -10,9 +10,9 @@ import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.* import net.corda.core.flows.*
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.abbreviate
import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.abbreviate
import net.corda.core.internal.staticField import net.corda.core.internal.staticField
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.* import net.corda.core.utilities.*
@ -25,7 +25,6 @@ import net.corda.node.utilities.DatabaseTransaction
import net.corda.node.utilities.DatabaseTransactionManager import net.corda.node.utilities.DatabaseTransactionManager
import org.slf4j.Logger import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.sql.Connection
import java.sql.SQLException import java.sql.SQLException
import java.util.* import java.util.*
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -154,6 +153,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
} }
} }
@Suspendable
override fun getFlowContext(otherParty: Party, sessionFlow: FlowLogic<*>): FlowContext {
val state = getConfirmedSession(otherParty, sessionFlow).state as FlowSessionState.Initiated
return state.context
}
@Suspendable @Suspendable
override fun <T : Any> sendAndReceive(receiveType: Class<T>, override fun <T : Any> sendAndReceive(receiveType: Class<T>,
otherParty: Party, otherParty: Party,
@ -161,7 +166,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
sessionFlow: FlowLogic<*>, sessionFlow: FlowLogic<*>,
retrySend: Boolean): UntrustworthyData<T> { retrySend: Boolean): UntrustworthyData<T> {
logger.debug { "sendAndReceive(${receiveType.name}, $otherParty, ${payload.toString().abbreviate(300)}) ..." } logger.debug { "sendAndReceive(${receiveType.name}, $otherParty, ${payload.toString().abbreviate(300)}) ..." }
val session = getConfirmedSession(otherParty, sessionFlow) val session = getConfirmedSessionIfPresent(otherParty, sessionFlow)
val sessionData = if (session == null) { val sessionData = if (session == null) {
val newSession = startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = true, retryable = retrySend) val newSession = startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = true, retryable = retrySend)
// Only do a receive here as the session init has carried the payload // Only do a receive here as the session init has carried the payload
@ -179,8 +184,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
otherParty: Party, otherParty: Party,
sessionFlow: FlowLogic<*>): UntrustworthyData<T> { sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
logger.debug { "receive(${receiveType.name}, $otherParty) ..." } logger.debug { "receive(${receiveType.name}, $otherParty) ..." }
val session = getConfirmedSession(otherParty, sessionFlow) ?: val session = getConfirmedSession(otherParty, sessionFlow)
startNewSession(otherParty, sessionFlow, null, waitForConfirmation = true)
val sessionData = receiveInternal<SessionData>(session, receiveType) val sessionData = receiveInternal<SessionData>(session, receiveType)
logger.debug { "Received ${sessionData.message.payload.toString().abbreviate(300)}" } logger.debug { "Received ${sessionData.message.payload.toString().abbreviate(300)}" }
return sessionData.checkPayloadIs(receiveType) return sessionData.checkPayloadIs(receiveType)
@ -189,7 +193,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Suspendable @Suspendable
override fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>) { override fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>) {
logger.debug { "send($otherParty, ${payload.toString().abbreviate(300)})" } logger.debug { "send($otherParty, ${payload.toString().abbreviate(300)})" }
val session = getConfirmedSession(otherParty, sessionFlow) val session = getConfirmedSessionIfPresent(otherParty, sessionFlow)
if (session == null) { if (session == null) {
// Don't send the payload again if it was already piggy-backed on a session init // Don't send the payload again if it was already piggy-backed on a session init
startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = false) startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = false)
@ -257,7 +261,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
private fun FlowSession.waitForConfirmation() { private fun FlowSession.waitForConfirmation() {
val (peerParty, sessionInitResponse) = receiveInternal<SessionInitResponse>(this, null) val (peerParty, sessionInitResponse) = receiveInternal<SessionInitResponse>(this, null)
if (sessionInitResponse is SessionConfirm) { if (sessionInitResponse is SessionConfirm) {
state = FlowSessionState.Initiated(peerParty, sessionInitResponse.initiatedSessionId) state = FlowSessionState.Initiated(
peerParty,
sessionInitResponse.initiatedSessionId,
FlowContext(sessionInitResponse.flowVersion, sessionInitResponse.appName))
} else { } else {
sessionInitResponse as SessionReject sessionInitResponse as SessionReject
throw UnexpectedFlowEndException("Party ${state.sendToParty} rejected session request: ${sessionInitResponse.errorMessage}") throw UnexpectedFlowEndException("Party ${state.sendToParty} rejected session request: ${sessionInitResponse.errorMessage}")
@ -274,9 +281,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
} }
@Suspendable @Suspendable
private fun sendInternal(session: FlowSession, message: SessionMessage) { private fun sendInternal(session: FlowSession, message: SessionMessage) = suspend(SendOnly(session, message))
suspend(SendOnly(session, message))
}
private inline fun <reified M : ExistingSessionMessage> receiveInternal( private inline fun <reified M : ExistingSessionMessage> receiveInternal(
session: FlowSession, session: FlowSession,
@ -292,15 +297,21 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
} }
@Suspendable @Suspendable
private fun getConfirmedSession(otherParty: Party, sessionFlow: FlowLogic<*>): FlowSession? { private fun getConfirmedSessionIfPresent(otherParty: Party, sessionFlow: FlowLogic<*>): FlowSession? {
return openSessions[Pair(sessionFlow, otherParty)]?.apply { return openSessions[Pair(sessionFlow, otherParty)]?.apply {
if (state is FlowSessionState.Initiating) { if (state is FlowSessionState.Initiating) {
// Session still initiating, try to retrieve the init response. // Session still initiating, wait for the confirmation
waitForConfirmation() waitForConfirmation()
} }
} }
} }
@Suspendable
private fun getConfirmedSession(otherParty: Party, sessionFlow: FlowLogic<*>): FlowSession {
return getConfirmedSessionIfPresent(otherParty, sessionFlow) ?:
startNewSession(otherParty, sessionFlow, null, waitForConfirmation = true)
}
/** /**
* Creates a new session. The provided [otherParty] can be an identity of any advertised service on the network, * Creates a new session. The provided [otherParty] can be an identity of any advertised service on the network,
* and might be advertised by more than one node. Therefore we first choose a single node that advertises it * and might be advertised by more than one node. Therefore we first choose a single node that advertises it
@ -317,7 +328,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val session = FlowSession(sessionFlow, random63BitValue(), null, FlowSessionState.Initiating(otherParty), retryable) val session = FlowSession(sessionFlow, random63BitValue(), null, FlowSessionState.Initiating(otherParty), retryable)
openSessions[Pair(sessionFlow, otherParty)] = session openSessions[Pair(sessionFlow, otherParty)] = session
val (version, initiatingFlowClass) = sessionFlow.javaClass.flowVersionAndInitiatingClass val (version, initiatingFlowClass) = sessionFlow.javaClass.flowVersionAndInitiatingClass
val sessionInit = SessionInit(session.ourSessionId, initiatingFlowClass.name, version, firstPayload) val sessionInit = SessionInit(session.ourSessionId, initiatingFlowClass.name, version, "not defined", firstPayload)
sendInternal(session, sessionInit) sendInternal(session, sessionInit)
if (waitForConfirmation) { if (waitForConfirmation) {
session.waitForConfirmation() session.waitForConfirmation()
@ -456,6 +467,7 @@ val Class<out FlowLogic<*>>.flowVersionAndInitiatingClass: Pair<Int, Class<out F
} }
current = current.superclass current = current.superclass
?: return found ?: return found
?: throw IllegalArgumentException("$name, as a flow that initiates other flows, must be annotated with ${InitiatingFlow::class.java.name}. See https://docs.corda.net/api-flows.html#flowlogic-annotations.") ?: throw IllegalArgumentException("$name, as a flow that initiates other flows, must be annotated with " +
"${InitiatingFlow::class.java.name}. See https://docs.corda.net/api-flows.html#flowlogic-annotations.")
} }
} }

View File

@ -7,36 +7,37 @@ import net.corda.core.internal.castIfPossible
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.UntrustworthyData
/**
* These internal messages define the flow session protocol.
*/
@CordaSerializable @CordaSerializable
interface SessionMessage interface SessionMessage
data class SessionInit(val initiatorSessionId: Long,
val initiatingFlowClass: String,
val flowVerison: Int,
val firstPayload: Any?) : SessionMessage
interface ExistingSessionMessage : SessionMessage { interface ExistingSessionMessage : SessionMessage {
val recipientSessionId: Long val recipientSessionId: Long
} }
data class SessionData(override val recipientSessionId: Long, val payload: Any) : ExistingSessionMessage {
override fun toString(): String = "${javaClass.simpleName}(recipientSessionId=$recipientSessionId, payload=$payload)"
}
interface SessionInitResponse : ExistingSessionMessage { interface SessionInitResponse : ExistingSessionMessage {
val initiatorSessionId: Long val initiatorSessionId: Long
override val recipientSessionId: Long get() = initiatorSessionId override val recipientSessionId: Long get() = initiatorSessionId
} }
data class SessionConfirm(override val initiatorSessionId: Long, val initiatedSessionId: Long) : SessionInitResponse interface SessionEnd : ExistingSessionMessage
data class SessionInit(val initiatorSessionId: Long,
val initiatingFlowClass: String,
val flowVersion: Int,
val appName: String,
val firstPayload: Any?) : SessionMessage
data class SessionConfirm(override val initiatorSessionId: Long,
val initiatedSessionId: Long,
val flowVersion: Int,
val appName: String) : SessionInitResponse
data class SessionReject(override val initiatorSessionId: Long, val errorMessage: String) : SessionInitResponse data class SessionReject(override val initiatorSessionId: Long, val errorMessage: String) : SessionInitResponse
interface SessionEnd : ExistingSessionMessage data class SessionData(override val recipientSessionId: Long, val payload: Any) : ExistingSessionMessage
data class NormalSessionEnd(override val recipientSessionId: Long) : SessionEnd data class NormalSessionEnd(override val recipientSessionId: Long) : SessionEnd
data class ErrorSessionEnd(override val recipientSessionId: Long, val errorResponse: FlowException?) : SessionEnd data class ErrorSessionEnd(override val recipientSessionId: Long, val errorResponse: FlowException?) : SessionEnd
data class ReceivedSessionMessage<out M : ExistingSessionMessage>(val sender: Party, val message: M) data class ReceivedSessionMessage<out M : ExistingSessionMessage>(val sender: Party, val message: M)

View File

@ -10,10 +10,7 @@ import com.google.common.util.concurrent.MoreExecutors
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.random63BitValue import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.FlowException import net.corda.core.flows.*
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.ThreadBox import net.corda.core.internal.ThreadBox
import net.corda.core.internal.bufferUntilSubscribed import net.corda.core.internal.bufferUntilSubscribed
@ -28,6 +25,7 @@ import net.corda.core.utilities.Try
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace import net.corda.core.utilities.trace
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.api.Checkpoint import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
@ -205,7 +203,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
* @param allowedUnsuspendedFiberCount Optional parameter is used in some tests. * @param allowedUnsuspendedFiberCount Optional parameter is used in some tests.
*/ */
fun stop(allowedUnsuspendedFiberCount: Int = 0) { fun stop(allowedUnsuspendedFiberCount: Int = 0) {
check(allowedUnsuspendedFiberCount >= 0) require(allowedUnsuspendedFiberCount >= 0)
mutex.locked { mutex.locked {
if (stopping) throw IllegalStateException("Already stopping!") if (stopping) throw IllegalStateException("Already stopping!")
stopping = true stopping = true
@ -340,23 +338,30 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun onSessionInit(sessionInit: SessionInit, receivedMessage: ReceivedMessage, sender: Party) { private fun onSessionInit(sessionInit: SessionInit, receivedMessage: ReceivedMessage, sender: Party) {
logger.trace { "Received $sessionInit from $sender" } logger.trace { "Received $sessionInit from $sender" }
val otherPartySessionId = sessionInit.initiatorSessionId val senderSessionId = sessionInit.initiatorSessionId
fun sendSessionReject(message: String) = sendSessionMessage(sender, SessionReject(otherPartySessionId, message)) fun sendSessionReject(message: String) = sendSessionMessage(sender, SessionReject(senderSessionId, message))
val session = try { val (session, initiatedFlowFactory) = try {
val initiatedFlowFactory = serviceHub.getFlowFactory(sessionInit.loadInitiatingFlowClass()) val initiatedFlowFactory = getInitiatedFlowFactory(sessionInit)
?: throw SessionRejectException("${sessionInit.initiatingFlowClass} is not registered") val flow = initiatedFlowFactory.createFlow(sender)
val flow = initiatedFlowFactory.createFlow(receivedMessage.platformVersion, sender, sessionInit) val senderFlowVersion = when (initiatedFlowFactory) {
val fiber = createFiber(flow, FlowInitiator.Peer(sender)) is InitiatedFlowFactory.Core -> receivedMessage.platformVersion // The flow version for the core flows is the platform version
val session = FlowSession(flow, random63BitValue(), sender, FlowSessionState.Initiated(sender, otherPartySessionId)) is InitiatedFlowFactory.CorDapp -> sessionInit.flowVersion
}
val session = FlowSession(
flow,
random63BitValue(),
sender,
FlowSessionState.Initiated(sender, senderSessionId, FlowContext(senderFlowVersion, sessionInit.appName)))
if (sessionInit.firstPayload != null) { if (sessionInit.firstPayload != null) {
session.receivedMessages += ReceivedSessionMessage(sender, SessionData(session.ourSessionId, sessionInit.firstPayload)) session.receivedMessages += ReceivedSessionMessage(sender, SessionData(session.ourSessionId, sessionInit.firstPayload))
} }
openSessions[session.ourSessionId] = session openSessions[session.ourSessionId] = session
val fiber = createFiber(flow, FlowInitiator.Peer(sender))
fiber.openSessions[Pair(flow, sender)] = session fiber.openSessions[Pair(flow, sender)] = session
updateCheckpoint(fiber) updateCheckpoint(fiber)
session session to initiatedFlowFactory
} catch (e: SessionRejectException) { } catch (e: SessionRejectException) {
logger.warn("${e.logMessage}: $sessionInit") logger.warn("${e.logMessage}: $sessionInit")
sendSessionReject(e.rejectMessage) sendSessionReject(e.rejectMessage)
@ -367,20 +372,28 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
return return
} }
sendSessionMessage(sender, SessionConfirm(otherPartySessionId, session.ourSessionId), session.fiber) val (ourFlowVersion, appName) = when (initiatedFlowFactory) {
// The flow version for the core flows is the platform version
is InitiatedFlowFactory.Core -> serviceHub.myInfo.platformVersion to "corda"
is InitiatedFlowFactory.CorDapp -> initiatedFlowFactory.flowVersion to initiatedFlowFactory.appName
}
sendSessionMessage(sender, SessionConfirm(senderSessionId, session.ourSessionId, ourFlowVersion, appName), session.fiber)
session.fiber.logger.debug { "Initiated by $sender using ${sessionInit.initiatingFlowClass}" } session.fiber.logger.debug { "Initiated by $sender using ${sessionInit.initiatingFlowClass}" }
session.fiber.logger.trace { "Initiated from $sessionInit on $session" } session.fiber.logger.trace { "Initiated from $sessionInit on $session" }
resumeFiber(session.fiber) resumeFiber(session.fiber)
} }
private fun SessionInit.loadInitiatingFlowClass(): Class<out FlowLogic<*>> { private fun getInitiatedFlowFactory(sessionInit: SessionInit): InitiatedFlowFactory<*> {
return try { val initiatingFlowClass = try {
Class.forName(initiatingFlowClass).asSubclass(FlowLogic::class.java) Class.forName(sessionInit.initiatingFlowClass).asSubclass(FlowLogic::class.java)
} catch (e: ClassNotFoundException) { } catch (e: ClassNotFoundException) {
throw SessionRejectException("Don't know $initiatingFlowClass") throw SessionRejectException("Don't know ${sessionInit.initiatingFlowClass}")
} catch (e: ClassCastException) { } catch (e: ClassCastException) {
throw SessionRejectException("$initiatingFlowClass is not a flow") throw SessionRejectException("${sessionInit.initiatingFlowClass} is not a flow")
} }
return serviceHub.getFlowFactory(initiatingFlowClass) ?:
throw SessionRejectException("$initiatingFlowClass is not registered")
} }
private fun serializeFiber(fiber: FlowStateMachineImpl<*>): SerializedBytes<FlowStateMachineImpl<*>> { private fun serializeFiber(fiber: FlowStateMachineImpl<*>): SerializedBytes<FlowStateMachineImpl<*>> {
@ -389,7 +402,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun deserializeFiber(checkpoint: Checkpoint, logger: Logger): FlowStateMachineImpl<*>? { private fun deserializeFiber(checkpoint: Checkpoint, logger: Logger): FlowStateMachineImpl<*>? {
return try { return try {
checkpoint.serializedFiber.deserialize(context = CHECKPOINT_CONTEXT.withTokenContext(serializationContext)).apply { fromCheckpoint = true } checkpoint.serializedFiber.deserialize(context = CHECKPOINT_CONTEXT.withTokenContext(serializationContext)).apply {
fromCheckpoint = true
}
} catch (t: Throwable) { } catch (t: Throwable) {
logger.error("Encountered unrestorable checkpoint!", t) logger.error("Encountered unrestorable checkpoint!", t)
null null

View File

@ -65,9 +65,7 @@ class BFTNonValidatingNotaryService(override val services: ServiceHubInternal, c
fun commitTransaction(tx: Any, otherSide: Party) = client.commitTransaction(tx, otherSide) fun commitTransaction(tx: Any, otherSide: Party) = client.commitTransaction(tx, otherSide)
override fun createServiceFlow(otherParty: Party, platformVersion: Int): FlowLogic<Void?> { override fun createServiceFlow(otherParty: Party): FlowLogic<Void?> = ServiceFlow(otherParty, this)
return ServiceFlow(otherParty, this)
}
private class ServiceFlow(val otherSide: Party, val service: BFTNonValidatingNotaryService) : FlowLogic<Void?>() { private class ServiceFlow(val otherSide: Party, val service: BFTNonValidatingNotaryService) : FlowLogic<Void?>() {
@Suspendable @Suspendable

View File

@ -15,9 +15,7 @@ class RaftNonValidatingNotaryService(override val services: ServiceHubInternal)
override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock) override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider: RaftUniquenessProvider = RaftUniquenessProvider(services) override val uniquenessProvider: RaftUniquenessProvider = RaftUniquenessProvider(services)
override fun createServiceFlow(otherParty: Party, platformVersion: Int): NotaryFlow.Service { override fun createServiceFlow(otherParty: Party): NotaryFlow.Service = NonValidatingNotaryFlow(otherParty, this)
return NonValidatingNotaryFlow(otherParty, this)
}
override fun start() { override fun start() {
uniquenessProvider.start() uniquenessProvider.start()

View File

@ -15,9 +15,7 @@ class RaftValidatingNotaryService(override val services: ServiceHubInternal) : T
override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock) override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider: RaftUniquenessProvider = RaftUniquenessProvider(services) override val uniquenessProvider: RaftUniquenessProvider = RaftUniquenessProvider(services)
override fun createServiceFlow(otherParty: Party, platformVersion: Int): NotaryFlow.Service { override fun createServiceFlow(otherParty: Party): NotaryFlow.Service = ValidatingNotaryFlow(otherParty, this)
return ValidatingNotaryFlow(otherParty, this)
}
override fun start() { override fun start() {
uniquenessProvider.start() uniquenessProvider.start()

View File

@ -16,9 +16,7 @@ class SimpleNotaryService(override val services: ServiceHubInternal) : TrustedAu
override val timeWindowChecker = TimeWindowChecker(services.clock) override val timeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider = PersistentUniquenessProvider() override val uniquenessProvider = PersistentUniquenessProvider()
override fun createServiceFlow(otherParty: Party, platformVersion: Int): NotaryFlow.Service { override fun createServiceFlow(otherParty: Party): NotaryFlow.Service = NonValidatingNotaryFlow(otherParty, this)
return NonValidatingNotaryFlow(otherParty, this)
}
override fun start() {} override fun start() {}
override fun stop() {} override fun stop() {}

View File

@ -16,9 +16,7 @@ class ValidatingNotaryService(override val services: ServiceHubInternal) : Trust
override val timeWindowChecker = TimeWindowChecker(services.clock) override val timeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider = PersistentUniquenessProvider() override val uniquenessProvider = PersistentUniquenessProvider()
override fun createServiceFlow(otherParty: Party, platformVersion: Int): NotaryFlow.Service { override fun createServiceFlow(otherParty: Party): NotaryFlow.Service = ValidatingNotaryFlow(otherParty, this)
return ValidatingNotaryFlow(otherParty, this)
}
override fun start() {} override fun start() {}
override fun stop() {} override fun stop() {}

View File

@ -1,52 +0,0 @@
package net.corda.node
import net.corda.core.internal.copyToDirectory
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.nodeapi.User
import net.corda.smoketesting.NodeConfig
import net.corda.smoketesting.NodeProcess
import org.assertj.core.api.Assertions.assertThat
import org.bouncycastle.asn1.x500.X500Name
import org.junit.Test
import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicInteger
class CordappScanningNodeProcessTest {
private companion object {
val user = User("user1", "test", permissions = setOf("ALL"))
val port = AtomicInteger(15100)
}
private val factory = NodeProcess.Factory()
private val aliceConfig = NodeConfig(
legalName = X500Name("CN=Alice Corp,O=Alice Corp,L=Madrid,C=ES"),
p2pPort = port.andIncrement,
rpcPort = port.andIncrement,
webPort = port.andIncrement,
extraServices = emptyList(),
users = listOf(user)
)
@Test
fun `CorDapp jar in plugins directory is scanned`() {
// If the CorDapp jar does't exist then run the smokeTestClasses gradle task
val cordappJar = Paths.get(javaClass.getResource("/trader-demo.jar").toURI())
val pluginsDir = (factory.baseDirectory(aliceConfig) / "plugins").createDirectories()
cordappJar.copyToDirectory(pluginsDir)
factory.create(aliceConfig).use {
it.connect().use {
// If the CorDapp wasn't scanned then SellerFlow won't have been picked up as an RPC flow
assertThat(it.proxy.registeredFlows()).contains("net.corda.traderdemo.flow.SellerFlow")
}
}
}
@Test
fun `empty plugins directory`() {
(factory.baseDirectory(aliceConfig) / "plugins").createDirectories()
factory.create(aliceConfig).close()
}
}

View File

@ -0,0 +1,76 @@
package net.corda.node
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.copyToDirectory
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.list
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.nodeapi.User
import net.corda.smoketesting.NodeConfig
import net.corda.smoketesting.NodeProcess
import org.assertj.core.api.Assertions.assertThat
import org.bouncycastle.asn1.x500.X500Name
import org.junit.Test
import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicInteger
import kotlin.streams.toList
class CordappSmokeTest {
private companion object {
val user = User("user1", "test", permissions = setOf("ALL"))
val port = AtomicInteger(15100)
}
private val factory = NodeProcess.Factory()
private val aliceConfig = NodeConfig(
legalName = X500Name("CN=Alice Corp,O=Alice Corp,L=Madrid,C=ES"),
p2pPort = port.andIncrement,
rpcPort = port.andIncrement,
webPort = port.andIncrement,
extraServices = emptyList(),
users = listOf(user)
)
@Test
fun `FlowContent appName returns the filename of the CorDapp jar`() {
val pluginsDir = (factory.baseDirectory(aliceConfig) / "plugins").createDirectories()
// Find the jar file for the smoke tests of this module
val selfCorDapp = Paths.get("build", "libs").list {
it.filter { "-smoke-test" in it.toString() }.toList().single()
}
selfCorDapp.copyToDirectory(pluginsDir)
factory.create(aliceConfig).use { alice ->
alice.connect().use { connectionToAlice ->
val aliceIdentity = connectionToAlice.proxy.nodeIdentity().legalIdentity
val future = connectionToAlice.proxy.startFlow(::DummyInitiatingFlow, aliceIdentity).returnValue
assertThat(future.getOrThrow().appName).isEqualTo(selfCorDapp.fileName.toString().removeSuffix(".jar"))
}
}
}
@Test
fun `empty plugins directory`() {
(factory.baseDirectory(aliceConfig) / "plugins").createDirectories()
factory.create(aliceConfig).close()
}
@InitiatingFlow
@StartableByRPC
class DummyInitiatingFlow(val otherParty: Party) : FlowLogic<FlowContext>() {
@Suspendable
override fun call() = getFlowContext(otherParty)
}
@Suppress("unused")
@InitiatedBy(DummyInitiatingFlow::class)
class DummyInitiatedFlow(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() = Unit
}
}

View File

@ -4,18 +4,19 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.Amount import net.corda.core.contracts.Amount
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowContext
import net.corda.core.flows.FlowInitiator import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.internal.FlowStateMachine
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.testing.DUMMY_CA
import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.UntrustworthyData
import net.corda.jackson.JacksonSupport import net.corda.jackson.JacksonSupport
import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.shell.InteractiveShell import net.corda.node.shell.InteractiveShell
import net.corda.testing.DUMMY_CA
import net.corda.testing.MEGA_CORP import net.corda.testing.MEGA_CORP
import net.corda.testing.MEGA_CORP_IDENTITY import net.corda.testing.MEGA_CORP_IDENTITY
import org.junit.Test import org.junit.Test
@ -70,32 +71,26 @@ class InteractiveShellTest {
fun party() = check("party: \"${MEGA_CORP.name}\"", MEGA_CORP.name.toString()) fun party() = check("party: \"${MEGA_CORP.name}\"", MEGA_CORP.name.toString())
class DummyFSM(val logic: FlowA) : FlowStateMachine<Any?> { class DummyFSM(val logic: FlowA) : FlowStateMachine<Any?> {
override fun getFlowContext(otherParty: Party, sessionFlow: FlowLogic<*>): FlowContext {
throw UnsupportedOperationException("not implemented")
}
override fun <T : Any> sendAndReceive(receiveType: Class<T>, otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>, retrySend: Boolean): UntrustworthyData<T> { override fun <T : Any> sendAndReceive(receiveType: Class<T>, otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>, retrySend: Boolean): UntrustworthyData<T> {
throw UnsupportedOperationException("not implemented") throw UnsupportedOperationException("not implemented")
} }
override fun <T : Any> receive(receiveType: Class<T>, otherParty: Party, sessionFlow: FlowLogic<*>): UntrustworthyData<T> { override fun <T : Any> receive(receiveType: Class<T>, otherParty: Party, sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
throw UnsupportedOperationException("not implemented") throw UnsupportedOperationException("not implemented")
} }
override fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>) { override fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>) {
throw UnsupportedOperationException("not implemented") throw UnsupportedOperationException("not implemented")
} }
override fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction { override fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction {
throw UnsupportedOperationException("not implemented") throw UnsupportedOperationException("not implemented")
} }
override val serviceHub: ServiceHub get() = throw UnsupportedOperationException()
override val serviceHub: ServiceHub override val logger: Logger get() = throw UnsupportedOperationException()
get() = throw UnsupportedOperationException() override val id: StateMachineRunId get() = throw UnsupportedOperationException()
override val logger: Logger override val resultFuture: CordaFuture<Any?> get() = throw UnsupportedOperationException()
get() = throw UnsupportedOperationException() override val flowInitiator: FlowInitiator get() = throw UnsupportedOperationException()
override val id: StateMachineRunId
get() = throw UnsupportedOperationException()
override val resultFuture: CordaFuture<Any?>
get() = throw UnsupportedOperationException()
override val flowInitiator: FlowInitiator
get() = throw UnsupportedOperationException()
override fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>) { override fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>) {
// Do nothing // Do nothing

View File

@ -260,15 +260,15 @@ class FlowFrameworkTests {
assertThat(node3Flow.receivedPayloads[0]).isEqualTo(payload) assertThat(node3Flow.receivedPayloads[0]).isEqualTo(payload)
assertSessionTransfers(node2, assertSessionTransfers(node2,
node1 sent sessionInit(SendFlow::class, 1, payload) to node2, node1 sent sessionInit(SendFlow::class, payload = payload) to node2,
node2 sent sessionConfirm to node1, node2 sent sessionConfirm() to node1,
node1 sent normalEnd to node2 node1 sent normalEnd to node2
//There's no session end from the other flows as they're manually suspended //There's no session end from the other flows as they're manually suspended
) )
assertSessionTransfers(node3, assertSessionTransfers(node3,
node1 sent sessionInit(SendFlow::class, 1, payload) to node3, node1 sent sessionInit(SendFlow::class, payload = payload) to node3,
node3 sent sessionConfirm to node1, node3 sent sessionConfirm() to node1,
node1 sent normalEnd to node3 node1 sent normalEnd to node3
//There's no session end from the other flows as they're manually suspended //There's no session end from the other flows as they're manually suspended
) )
@ -294,14 +294,14 @@ class FlowFrameworkTests {
assertSessionTransfers(node2, assertSessionTransfers(node2,
node1 sent sessionInit(ReceiveFlow::class) to node2, node1 sent sessionInit(ReceiveFlow::class) to node2,
node2 sent sessionConfirm to node1, node2 sent sessionConfirm() to node1,
node2 sent sessionData(node2Payload) to node1, node2 sent sessionData(node2Payload) to node1,
node2 sent normalEnd to node1 node2 sent normalEnd to node1
) )
assertSessionTransfers(node3, assertSessionTransfers(node3,
node1 sent sessionInit(ReceiveFlow::class) to node3, node1 sent sessionInit(ReceiveFlow::class) to node3,
node3 sent sessionConfirm to node1, node3 sent sessionConfirm() to node1,
node3 sent sessionData(node3Payload) to node1, node3 sent sessionData(node3Payload) to node1,
node3 sent normalEnd to node1 node3 sent normalEnd to node1
) )
@ -314,8 +314,8 @@ class FlowFrameworkTests {
mockNet.runNetwork() mockNet.runNetwork()
assertSessionTransfers( assertSessionTransfers(
node1 sent sessionInit(PingPongFlow::class, 1, 10L) to node2, node1 sent sessionInit(PingPongFlow::class, payload = 10L) to node2,
node2 sent sessionConfirm to node1, node2 sent sessionConfirm() to node1,
node2 sent sessionData(20L) to node1, node2 sent sessionData(20L) to node1,
node1 sent sessionData(11L) to node2, node1 sent sessionData(11L) to node2,
node2 sent sessionData(21L) to node1, node2 sent sessionData(21L) to node1,
@ -419,7 +419,7 @@ class FlowFrameworkTests {
assertSessionTransfers( assertSessionTransfers(
node1 sent sessionInit(ReceiveFlow::class) to node2, node1 sent sessionInit(ReceiveFlow::class) to node2,
node2 sent sessionConfirm to node1, node2 sent sessionConfirm() to node1,
node2 sent erroredEnd() to node1 node2 sent erroredEnd() to node1
) )
} }
@ -452,7 +452,7 @@ class FlowFrameworkTests {
assertSessionTransfers( assertSessionTransfers(
node1 sent sessionInit(ReceiveFlow::class) to node2, node1 sent sessionInit(ReceiveFlow::class) to node2,
node2 sent sessionConfirm to node1, node2 sent sessionConfirm() to node1,
node2 sent erroredEnd(erroringFlow.get().exceptionThrown) to node1 node2 sent erroredEnd(erroringFlow.get().exceptionThrown) to node1
) )
// Make sure the original stack trace isn't sent down the wire // Make sure the original stack trace isn't sent down the wire
@ -500,7 +500,7 @@ class FlowFrameworkTests {
assertSessionTransfers(node2, assertSessionTransfers(node2,
node1 sent sessionInit(ReceiveFlow::class) to node2, node1 sent sessionInit(ReceiveFlow::class) to node2,
node2 sent sessionConfirm to node1, node2 sent sessionConfirm() to node1,
node2 sent sessionData("Hello") to node1, node2 sent sessionData("Hello") to node1,
node1 sent erroredEnd() to node2 node1 sent erroredEnd() to node2
) )
@ -627,26 +627,30 @@ class FlowFrameworkTests {
} }
@Test @Test
fun `upgraded flow`() { fun `upgraded initiating flow`() {
node1.services.startFlow(UpgradedFlow(node2.info.legalIdentity)) node2.registerFlowFactory(UpgradedFlow::class, initiatedFlowVersion = 1) { SendFlow("Old initiated", it) }
val result = node1.services.startFlow(UpgradedFlow(node2.info.legalIdentity)).resultFuture
mockNet.runNetwork() mockNet.runNetwork()
assertThat(sessionTransfers).startsWith( assertThat(sessionTransfers).startsWith(
node1 sent sessionInit(UpgradedFlow::class, 2) to node2 node1 sent sessionInit(UpgradedFlow::class, flowVersion = 2) to node2,
node2 sent sessionConfirm(flowVersion = 1) to node1
) )
val (receivedPayload, node2FlowVersion) = result.getOrThrow()
assertThat(receivedPayload).isEqualTo("Old initiated")
assertThat(node2FlowVersion).isEqualTo(1)
} }
@Test @Test
fun `unsupported new flow version`() { fun `upgraded initiated flow`() {
node2.internalRegisterFlowFactory( node2.registerFlowFactory(SendFlow::class, initiatedFlowVersion = 2) { UpgradedFlow(it) }
UpgradedFlow::class.java, val initiatingFlow = SendFlow("Old initiating", node2.info.legalIdentity)
InitiatedFlowFactory.CorDapp(version = 1, factory = ::DoubleInlinedSubFlow), node1.services.startFlow(initiatingFlow)
DoubleInlinedSubFlow::class.java,
track = false)
val result = node1.services.startFlow(UpgradedFlow(node2.info.legalIdentity)).resultFuture
mockNet.runNetwork() mockNet.runNetwork()
assertThatExceptionOfType(UnexpectedFlowEndException::class.java) assertThat(sessionTransfers).startsWith(
.isThrownBy { result.getOrThrow() } node1 sent sessionInit(SendFlow::class, flowVersion = 1, payload = "Old initiating") to node2,
.withMessageContaining("Version") node2 sent sessionConfirm(flowVersion = 2) to node1
)
assertThat(initiatingFlow.getFlowContext(node2.info.legalIdentity).flowVersion).isEqualTo(2)
} }
@Test @Test
@ -660,7 +664,7 @@ class FlowFrameworkTests {
@Test @Test
fun `unknown class in session init`() { fun `unknown class in session init`() {
node1.sendSessionMessage(SessionInit(random63BitValue(), "not.a.real.Class", 1, null), node2) node1.sendSessionMessage(SessionInit(random63BitValue(), "not.a.real.Class", 1, "version", null), node2)
mockNet.runNetwork() mockNet.runNetwork()
assertThat(sessionTransfers).hasSize(2) // Only the session-init and session-reject are expected assertThat(sessionTransfers).hasSize(2) // Only the session-init and session-reject are expected
val reject = sessionTransfers.last().message as SessionReject val reject = sessionTransfers.last().message as SessionReject
@ -669,7 +673,7 @@ class FlowFrameworkTests {
@Test @Test
fun `non-flow class in session init`() { fun `non-flow class in session init`() {
node1.sendSessionMessage(SessionInit(random63BitValue(), String::class.java.name, 1, null), node2) node1.sendSessionMessage(SessionInit(random63BitValue(), String::class.java.name, 1, "version", null), node2)
mockNet.runNetwork() mockNet.runNetwork()
assertThat(sessionTransfers).hasSize(2) // Only the session-init and session-reject are expected assertThat(sessionTransfers).hasSize(2) // Only the session-init and session-reject are expected
val reject = sessionTransfers.last().message as SessionReject val reject = sessionTransfers.last().message as SessionReject
@ -678,7 +682,7 @@ class FlowFrameworkTests {
@Test @Test
fun `single inlined sub-flow`() { fun `single inlined sub-flow`() {
node2.registerFlowFactory(SendAndReceiveFlow::class, ::SingleInlinedSubFlow) node2.registerFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) }
val result = node1.services.startFlow(SendAndReceiveFlow(node2.info.legalIdentity, "Hello")).resultFuture val result = node1.services.startFlow(SendAndReceiveFlow(node2.info.legalIdentity, "Hello")).resultFuture
mockNet.runNetwork() mockNet.runNetwork()
assertThat(result.getOrThrow()).isEqualTo("HelloHello") assertThat(result.getOrThrow()).isEqualTo("HelloHello")
@ -686,7 +690,7 @@ class FlowFrameworkTests {
@Test @Test
fun `double inlined sub-flow`() { fun `double inlined sub-flow`() {
node2.registerFlowFactory(SendAndReceiveFlow::class, ::DoubleInlinedSubFlow) node2.registerFlowFactory(SendAndReceiveFlow::class) { DoubleInlinedSubFlow(it) }
val result = node1.services.startFlow(SendAndReceiveFlow(node2.info.legalIdentity, "Hello")).resultFuture val result = node1.services.startFlow(SendAndReceiveFlow(node2.info.legalIdentity, "Hello")).resultFuture
mockNet.runNetwork() mockNet.runNetwork()
assertThat(result.getOrThrow()).isEqualTo("HelloHello") assertThat(result.getOrThrow()).isEqualTo("HelloHello")
@ -712,20 +716,21 @@ class FlowFrameworkTests {
private inline fun <reified P : FlowLogic<*>> MockNode.registerFlowFactory( private inline fun <reified P : FlowLogic<*>> MockNode.registerFlowFactory(
initiatingFlowClass: KClass<out FlowLogic<*>>, initiatingFlowClass: KClass<out FlowLogic<*>>,
initiatedFlowVersion: Int = 1,
noinline flowFactory: (Party) -> P): CordaFuture<P> noinline flowFactory: (Party) -> P): CordaFuture<P>
{ {
val observable = internalRegisterFlowFactory(initiatingFlowClass.java, object : InitiatedFlowFactory<P> { val observable = internalRegisterFlowFactory(
override fun createFlow(platformVersion: Int, otherParty: Party, sessionInit: SessionInit): P { initiatingFlowClass.java,
return flowFactory(otherParty) InitiatedFlowFactory.CorDapp(initiatedFlowVersion, "", flowFactory),
} P::class.java,
}, P::class.java, track = true) track = true)
return observable.toFuture() return observable.toFuture()
} }
private fun sessionInit(clientFlowClass: KClass<out FlowLogic<*>>, flowVersion: Int = 1, payload: Any? = null): SessionInit { private fun sessionInit(clientFlowClass: KClass<out FlowLogic<*>>, flowVersion: Int = 1, payload: Any? = null): SessionInit {
return SessionInit(0, clientFlowClass.java.name, flowVersion, payload) return SessionInit(0, clientFlowClass.java.name, flowVersion, "", payload)
} }
private val sessionConfirm = SessionConfirm(0, 0) private fun sessionConfirm(flowVersion: Int = 1) = SessionConfirm(0, 0, flowVersion, "")
private fun sessionData(payload: Any) = SessionData(0, payload) private fun sessionData(payload: Any) = SessionData(0, payload)
private val normalEnd = NormalSessionEnd(0) private val normalEnd = NormalSessionEnd(0)
private fun erroredEnd(errorResponse: FlowException? = null) = ErrorSessionEnd(0, errorResponse) private fun erroredEnd(errorResponse: FlowException? = null) = ErrorSessionEnd(0, errorResponse)
@ -762,8 +767,8 @@ class FlowFrameworkTests {
private fun sanitise(message: SessionMessage) = when (message) { private fun sanitise(message: SessionMessage) = when (message) {
is SessionData -> message.copy(recipientSessionId = 0) is SessionData -> message.copy(recipientSessionId = 0)
is SessionInit -> message.copy(initiatorSessionId = 0) is SessionInit -> message.copy(initiatorSessionId = 0, appName = "")
is SessionConfirm -> message.copy(initiatorSessionId = 0, initiatedSessionId = 0) is SessionConfirm -> message.copy(initiatorSessionId = 0, initiatedSessionId = 0, appName = "")
is NormalSessionEnd -> message.copy(recipientSessionId = 0) is NormalSessionEnd -> message.copy(recipientSessionId = 0)
is ErrorSessionEnd -> message.copy(recipientSessionId = 0) is ErrorSessionEnd -> message.copy(recipientSessionId = 0)
else -> message else -> message
@ -799,7 +804,6 @@ class FlowFrameworkTests {
} }
} }
@InitiatingFlow @InitiatingFlow
private open class SendFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic<Unit>() { private open class SendFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic<Unit>() {
init { init {
@ -921,9 +925,13 @@ class FlowFrameworkTests {
} }
@InitiatingFlow(version = 2) @InitiatingFlow(version = 2)
private class UpgradedFlow(val otherParty: Party) : FlowLogic<Any>() { private class UpgradedFlow(val otherParty: Party) : FlowLogic<Pair<Any, Int>>() {
@Suspendable @Suspendable
override fun call(): Any = receive<Any>(otherParty).unwrap { it } override fun call(): Pair<Any, Int> {
val received = receive<Any>(otherParty).unwrap { it }
val otherFlowVersion = getFlowContext(otherParty).flowVersion
return Pair(received, otherFlowVersion)
}
} }
private class SingleInlinedSubFlow(val otherParty: Party) : FlowLogic<Unit>() { private class SingleInlinedSubFlow(val otherParty: Party) : FlowLogic<Unit>() {