CORDA-577: FlowSession porting (#1530)

* Throw exception if a flow is initiated twice for the same Party

* Chunk of porting

* Need ReceiveTransactionFlow

(cherry picked from commit 774383e)

* Notaries compile

* TwoPartyTrade

* SimmFlow & StateRevisionFlow

(cherry picked from commit da602b1)

* TwoPArtyDealFlow regulator send

* installCoreFlow

* IRSTradeFlow
UpdateBusinessDayFlow
RatesFixFlow
NodeInterestRates

(cherry picked from commit 6c8d314)

* Added recordTransaction parameter to ReceiveTransactionFlow

* Some Tests, Flows

* Fixed typo in record tx param

* more things

* Fix CollectSignatures

* FlowFrameworkTests

(cherry picked from commit 2c50bc3)

* Fix TwoPartyTradeFlow

* CustomVaultQuery

(cherry picked from commit 48f88e8)

* FlowsInJavaTest

* WorkflowTransactionBuildTutorial

* PersistentNetworkMapCacheTest

* FlowCookBookJava

(cherry picked from commit 9b48114)

* Fix RatesFixFlow

* Fix TwoPartyDealFlow to get signature of initiating side

* Integration tests

(cherry picked from commit dbcd965)

* CordappSmokeTest

(cherry picked from commit d19cbd6)

* Inlined FinalityFlow

* Updated uses of FinalityFlow

* ContractUpgradeFlowTest passes

* CollectSignaturesFlow refactor

(cherry picked from commit 5e7b1a7)

* Check that we are not the recipient of cash

* Fix Simm demo

* WorkflowTransactionBuildTutorialTest

* Fix CashPaymentFlowTests

* ScheduledFlowTests

* FlowFrameworkTests

* Add cordappPackagesToScan Driver param

* FinalityFlowTests

* Fix LoaderTestFlow

* NodeMonitorModelTest

* BankOfCordaRPCClientTest

* rename to extraCordappPackagesToScan

* Fixed broken merge

* BankOfCordaHttpAPITest

* Fix CollectSignaturesFlow

* Fix annotation on DummyFlow to stop warning

* Fix TraderDemoTest

* Review feedback

* Doc improvements and minor changes

* Address some PR comments

* Looping regulators into the FinalityFlow broadcast rather than sending separately in TwoPartyDealFlow.

* Add Uninitiated FlowState

* Add test for double initiateFlow exception

* Some more s&r victims

* FlowSession utilities (#1562)

* Merge fix

* CollectSignatureFlow can handle several signing keys

* Actually handle several signing keys

* update kdoc

* Correct SignTransactionFlow error message

* Create deprecated flows package

* Add internal deprecated flows

* Reverted FinalityFlow to auto-broadcast all tx participants

* Move the deprecated packages into another PR
This commit is contained in:
Andras Slemmer
2017-09-21 12:12:25 +01:00
committed by josecoll
parent 78500205df
commit 33421bdd44
95 changed files with 956 additions and 1068 deletions

View File

@ -1,19 +1,16 @@
package net.corda.node
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.core.utilities.unwrap
import net.corda.node.services.FlowPermissions.Companion.startFlowPermission
import net.corda.nodeapi.User
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.testing.chooseIdentity
import net.corda.testing.driver.driver
import org.assertj.core.api.Assertions.assertThat
@ -41,15 +38,15 @@ class CordappScanningDriverTest {
@InitiatingFlow
class ReceiveFlow(val otherParty: Party) :FlowLogic<String>() {
@Suspendable
override fun call(): String = receive<String>(otherParty).unwrap { it }
override fun call(): String = initiateFlow(otherParty).receive<String>().unwrap { it }
}
@InitiatedBy(ReceiveFlow::class)
open class SendClassFlow(val otherParty: Party) : FlowLogic<Unit>() {
open class SendClassFlow(val otherPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() = send(otherParty, javaClass.name)
override fun call() = otherPartySession.send(javaClass.name)
}
@InitiatedBy(ReceiveFlow::class)
class SendSubClassFlow(otherParty: Party) : SendClassFlow(otherParty)
class SendSubClassFlow(otherPartySession: FlowSession) : SendClassFlow(otherPartySession)
}

View File

@ -2,6 +2,7 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
@ -32,17 +33,18 @@ class FlowVersioningTest : NodeBasedTest() {
@Suspendable
override fun call(): Pair<Int, Int> {
// Execute receive() outside of the Pair constructor to avoid Kotlin/Quasar instrumentation bug.
val alicePlatformVersionAccordingToBob = receive<Int>(initiatedParty).unwrap { it }
val session = initiateFlow(initiatedParty)
val alicePlatformVersionAccordingToBob = session.receive<Int>().unwrap { it }
return Pair(
alicePlatformVersionAccordingToBob,
getFlowInfo(initiatedParty).flowVersion
session.getCounterpartyFlowInfo().flowVersion
)
}
}
private class PretendInitiatedCoreFlow(val initiatingParty: Party) : FlowLogic<Unit>() {
private class PretendInitiatedCoreFlow(val otherSideSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() = send(initiatingParty, getFlowInfo(initiatingParty).flowVersion)
override fun call() = otherSideSession.send(otherSideSession.getCounterpartyFlowInfo().flowVersion)
}
}

View File

@ -3,7 +3,6 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.InputStreamAndHash
import net.corda.core.messaging.startFlow
import net.corda.core.transactions.TransactionBuilder
@ -39,18 +38,19 @@ class LargeTransactionsTest {
val stx = serviceHub.signInitialTransaction(tx, ourIdentity.owningKey)
// Send to the other side and wait for it to trigger resolution from us.
val bob = serviceHub.identityService.partyFromX500Name(BOB.name)!!
subFlow(SendTransactionFlow(bob, stx))
receive<Unit>(bob)
val bobSession = initiateFlow(bob)
subFlow(SendTransactionFlow(bobSession, stx))
bobSession.receive<Unit>()
}
}
@InitiatedBy(SendLargeTransactionFlow::class) @Suppress("UNUSED")
class ReceiveLargeTransactionFlow(private val counterParty: Party) : FlowLogic<Unit>() {
class ReceiveLargeTransactionFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(ReceiveTransactionFlow(counterParty))
subFlow(ReceiveTransactionFlow(otherSide))
// Unblock the other side by sending some dummy object (Unit is fine here as it's a singleton).
send(counterParty, Unit)
otherSide.send(Unit)
}
}

View File

@ -3,17 +3,16 @@ package net.corda.services.messaging
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.crypto.generateKeyPair
import net.corda.core.utilities.toBase58String
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.crypto.random63BitValue
import net.corda.core.utilities.getOrThrow
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.core.utilities.toBase58String
import net.corda.core.utilities.unwrap
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
@ -25,6 +24,8 @@ import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.User
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.testing.chooseIdentity
import net.corda.testing.configureTestSSL
import net.corda.testing.messaging.SimpleMQClient
@ -229,12 +230,12 @@ abstract class MQSecurityTest : NodeBasedTest() {
@InitiatingFlow
private class SendFlow(val otherParty: Party, val payload: Any) : FlowLogic<Unit>() {
@Suspendable
override fun call() = send(otherParty, payload)
override fun call() = initiateFlow(otherParty).send(payload)
}
@InitiatedBy(SendFlow::class)
private class ReceiveFlow(val otherParty: Party) : FlowLogic<Any>() {
private class ReceiveFlow(val otherPartySession: FlowSession) : FlowLogic<Any>() {
@Suspendable
override fun call() = receive<Any>(otherParty).unwrap { it }
override fun call() = otherPartySession.receive<Any>().unwrap { it }
}
}

View File

@ -18,8 +18,8 @@ import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.FlowPermissions
import net.corda.nodeapi.ServiceInfo
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.nodeapi.ServiceInfo
import net.corda.nodeapi.User
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.chooseIdentity
@ -151,6 +151,6 @@ class SendMessageFlow(private val message: Message) : FlowLogic<SignedTransactio
val signedTx = serviceHub.signInitialTransaction(txBuilder)
progressTracker.currentStep = FINALISING_TRANSACTION
return subFlow(FinalityFlow(signedTx, FINALISING_TRANSACTION.childProgressTracker())).single()
return subFlow(FinalityFlow(signedTx, FINALISING_TRANSACTION.childProgressTracker()))
}
}

View File

@ -36,8 +36,8 @@ import net.corda.core.utilities.debug
import net.corda.node.internal.classloading.requireAnnotation
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.internal.cordapp.CordappProvider
import net.corda.node.services.FinalityHandler
import net.corda.node.services.NotaryChangeHandler
import net.corda.node.services.NotifyTransactionHandler
import net.corda.node.services.api.*
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
@ -360,15 +360,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
* compatibility [flowFactory] provides a second parameter which is the platform version of the initiating party.
* @suppress
*/
@Deprecated("Use installCoreFlowExpectingFlowSession() instead")
@VisibleForTesting
fun installCoreFlow(clientFlowClass: KClass<out FlowLogic<*>>, flowFactory: (Party) -> FlowLogic<*>) {
log.warn(deprecatedFlowConstructorMessage(clientFlowClass.java))
installCoreFlowExpectingFlowSession(clientFlowClass, { flowSession -> flowFactory(flowSession.counterparty) })
}
@VisibleForTesting
fun installCoreFlowExpectingFlowSession(clientFlowClass: KClass<out FlowLogic<*>>, flowFactory: (FlowSession) -> FlowLogic<*>) {
fun installCoreFlow(clientFlowClass: KClass<out FlowLogic<*>>, flowFactory: (FlowSession) -> FlowLogic<*>) {
require(clientFlowClass.java.flowVersionAndInitiatingClass.first == 1) {
"${InitiatingFlow::class.java.name}.version not applicable for core flows; their version is the node's platform version"
}
@ -378,7 +371,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
private fun installCoreFlows() {
installCoreFlow(BroadcastTransactionFlow::class, ::NotifyTransactionHandler)
installCoreFlow(FinalityFlow::class, ::FinalityHandler)
installCoreFlow(NotaryChangeFlow::class, ::NotaryChangeHandler)
installCoreFlow(ContractUpgradeFlow.Initiator::class, ::Acceptor)
installCoreFlow(SwapIdentitiesFlow::class, ::SwapIdentitiesHandler)
@ -407,10 +400,10 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
}
private fun makeCordappLoader(): CordappLoader {
val scanPackage = System.getProperty("net.corda.node.cordapp.scan.package")
return if (scanPackage != null) {
val scanPackages = System.getProperty("net.corda.node.cordapp.scan.packages")
return if (scanPackages != null) {
check(configuration.devMode) { "Package scanning can only occur in dev mode" }
CordappLoader.createDevMode(scanPackage)
CordappLoader.createDevMode(scanPackages)
} else {
CordappLoader.createDefault(configuration.baseDirectory)
}

View File

@ -57,19 +57,22 @@ class CordappLoader private constructor(private val cordappJarPaths: List<URL>)
* @param scanPackage Resolves the JARs that contain scanPackage and use them as the source for
* the classpath scanning.
*/
fun createDevMode(scanPackage: String): CordappLoader {
val resource = scanPackage.replace('.', '/')
val paths = this::class.java.classLoader.getResources(resource)
.asSequence()
.map {
val uri = if (it.protocol == "jar") {
(it.openConnection() as JarURLConnection).jarFileURL.toURI()
} else {
URI(it.toExternalForm().removeSuffix(resource))
fun createDevMode(scanPackages: String): CordappLoader {
val paths = scanPackages.split(",").flatMap { scanPackage ->
val resource = scanPackage.replace('.', '/')
this::class.java.classLoader.getResources(resource)
.asSequence()
.map {
val uri = if (it.protocol == "jar") {
(it.openConnection() as JarURLConnection).jarFileURL.toURI()
} else {
URI(it.toExternalForm().removeSuffix(resource))
}
uri.toURL()
}
uri.toURL()
}
.toList()
.toList()
}
return CordappLoader(paths)
}

View File

@ -1,10 +1,7 @@
package net.corda.node.services
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.AbstractStateReplacementFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.ReceiveTransactionFlow
import net.corda.core.flows.StateReplacementException
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.transactions.SignedTransaction
@ -12,15 +9,15 @@ import net.corda.core.transactions.SignedTransaction
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
class NotifyTransactionHandler(val otherParty: Party) : FlowLogic<Unit>() {
class FinalityHandler(private val sender: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val stx = subFlow(ReceiveTransactionFlow(otherParty))
val stx = subFlow(ReceiveTransactionFlow(sender))
serviceHub.recordTransactions(stx)
}
}
class NotaryChangeHandler(otherSide: Party) : AbstractStateReplacementFlow.Acceptor<Party>(otherSide) {
class NotaryChangeHandler(otherSideSession: FlowSession) : AbstractStateReplacementFlow.Acceptor<Party>(otherSideSession) {
/**
* Check the notary change proposal.
*

View File

@ -18,7 +18,7 @@ class FlowSessionInternal(
val ourSessionId: Long,
val initiatingParty: Party?,
var state: FlowSessionState,
val retryable: Boolean = false) {
var retryable: Boolean = false) {
val receivedMessages = ConcurrentLinkedQueue<ReceivedSessionMessage<*>>()
val fiber: FlowStateMachineImpl<*> get() = flow.stateMachine as FlowStateMachineImpl<*>
@ -30,14 +30,19 @@ class FlowSessionInternal(
/**
* [FlowSessionState] describes the session's state.
*
* [Initiating] is pre-handshake. [Initiating.otherParty] at this point holds a [Party] corresponding to either a
* specific peer or a service.
* [Uninitiated] is pre-handshake, where no communication has happened. [Initiating.otherParty] at this point holds a
* [Party] corresponding to either a specific peer or a service.
* [Initiating] is pre-handshake, where the initiating message has been sent.
* [Initiated] is post-handshake. At this point [Initiating.otherParty] will have been resolved to a specific peer
* [Initiated.peerParty], and the peer's sessionId has been initialised.
*/
sealed class FlowSessionState {
abstract val sendToParty: Party
data class Uninitiated(val otherParty: Party) : FlowSessionState() {
override val sendToParty: Party get() = otherParty
}
/** [otherParty] may be a specific peer or a service party */
data class Initiating(val otherParty: Party) : FlowSessionState() {
override val sendToParty: Party get() = otherParty

View File

@ -164,6 +164,16 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Suspendable
override fun initiateFlow(otherParty: Party, sessionFlow: FlowLogic<*>): FlowSession {
val sessionKey = Pair(sessionFlow, otherParty)
if (openSessions.containsKey(sessionKey)) {
throw IllegalStateException(
"Attempted to initiateFlow() twice in the same InitiatingFlow $sessionFlow for the same party " +
"$otherParty. This isn't supported in this version of Corda. Alternatively you may " +
"initiate a new flow by calling initiateFlow() in an " +
"@${InitiatingFlow::class.java.simpleName} sub-flow."
)
}
createNewSession(otherParty, sessionFlow)
val flowSession = FlowSessionImpl(otherParty)
flowSession.stateMachine = this
flowSession.sessionFlow = sessionFlow
@ -186,7 +196,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
logger.debug { "sendAndReceive(${receiveType.name}, $otherParty, ${payload.toString().abbreviate(300)}) ..." }
val session = getConfirmedSessionIfPresent(otherParty, sessionFlow)
val receivedSessionData: ReceivedSessionMessage<SessionData> = if (session == null) {
val newSession = startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = true, retryable = retrySend)
val newSession = initiateSession(otherParty, sessionFlow, payload, waitForConfirmation = true, retryable = retrySend)
// Only do a receive here as the session init has carried the payload
receiveInternal(newSession, receiveType)
} else {
@ -221,7 +231,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val session = getConfirmedSessionIfPresent(otherParty, sessionFlow)
if (session == null) {
// Don't send the payload again if it was already piggy-backed on a session init
startNewSession(otherParty, sessionFlow, payload, waitForConfirmation = false)
initiateSession(otherParty, sessionFlow, payload, waitForConfirmation = false)
} else {
sendInternal(session, createSessionData(session, payload))
}
@ -308,8 +318,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
private fun createSessionData(session: FlowSessionInternal, payload: Any): SessionData {
val sessionState = session.state
val peerSessionId = when (sessionState) {
is FlowSessionState.Initiating -> throw IllegalStateException("We've somehow held onto an unconfirmed session: $session")
is FlowSessionState.Initiated -> sessionState.peerSessionId
else -> throw IllegalStateException("We've somehow held onto a non-initiated session: $session")
}
return SessionData(peerSessionId, payload)
}
@ -332,37 +342,53 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Suspendable
private fun getConfirmedSessionIfPresent(otherParty: Party, sessionFlow: FlowLogic<*>): FlowSessionInternal? {
return openSessions[Pair(sessionFlow, otherParty)]?.apply {
if (state is FlowSessionState.Initiating) {
// Session still initiating, wait for the confirmation
waitForConfirmation()
val session = openSessions[Pair(sessionFlow, otherParty)] ?: return null
return when (session.state) {
is FlowSessionState.Uninitiated -> null
is FlowSessionState.Initiating -> {
session.waitForConfirmation()
session
}
is FlowSessionState.Initiated -> session
}
}
@Suspendable
private fun getConfirmedSession(otherParty: Party, sessionFlow: FlowLogic<*>): FlowSessionInternal {
return getConfirmedSessionIfPresent(otherParty, sessionFlow) ?:
startNewSession(otherParty, sessionFlow, null, waitForConfirmation = true)
initiateSession(otherParty, sessionFlow, null, waitForConfirmation = true)
}
/**
* Creates a new session. The provided [otherParty] can be an identity of any advertised service on the network,
* and might be advertised by more than one node. Therefore we first choose a single node that advertises it
* and use its *legal identity* for communication. At the moment a single node can compose its legal identity out of
* multiple public keys, but we **don't support multiple nodes advertising the same legal identity**.
*/
@Suspendable
private fun startNewSession(otherParty: Party,
sessionFlow: FlowLogic<*>,
firstPayload: Any?,
waitForConfirmation: Boolean,
retryable: Boolean = false): FlowSessionInternal {
logger.trace { "Initiating a new session with $otherParty" }
val session = FlowSessionInternal(sessionFlow, random63BitValue(), null, FlowSessionState.Initiating(otherParty), retryable)
private fun createNewSession(
otherParty: Party,
sessionFlow: FlowLogic<*>
) {
logger.trace { "Creating a new session with $otherParty" }
val session = FlowSessionInternal(sessionFlow, random63BitValue(), null, FlowSessionState.Uninitiated(otherParty))
openSessions[Pair(sessionFlow, otherParty)] = session
val (version, initiatingFlowClass) = sessionFlow.javaClass.flowVersionAndInitiatingClass
val sessionInit = SessionInit(session.ourSessionId, initiatingFlowClass.name, version, sessionFlow.javaClass.appName, firstPayload)
}
@Suspendable
private fun initiateSession(
otherParty: Party,
sessionFlow: FlowLogic<*>,
firstPayload: Any?,
waitForConfirmation: Boolean,
retryable: Boolean = false
): FlowSessionInternal {
val session = openSessions[Pair(sessionFlow, otherParty)]
if (session == null) {
throw IllegalStateException("Expected an Uninitiated session for $otherParty")
}
val state = session.state
if (state !is FlowSessionState.Uninitiated) {
throw IllegalStateException("Tried to initiate a session $session, but it's already initiating/initiated")
}
logger.trace { "Initiating a new session with ${state.otherParty}" }
session.state = FlowSessionState.Initiating(state.otherParty)
session.retryable = retryable
val (version, initiatingFlowClass) = session.flow.javaClass.flowVersionAndInitiatingClass
val sessionInit = SessionInit(session.ourSessionId, initiatingFlowClass.name, version, session.flow.javaClass.appName, firstPayload)
sendInternal(session, sessionInit)
if (waitForConfirmation) {
session.waitForConfirmation()

View File

@ -6,6 +6,7 @@ import net.corda.core.contracts.StateRef
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryException
import net.corda.core.identity.CordaX500Name
@ -71,19 +72,19 @@ class BFTNonValidatingNotaryService(override val services: ServiceHubInternal,
fun commitTransaction(tx: Any, otherSide: Party) = client.commitTransaction(tx, otherSide)
override fun createServiceFlow(otherParty: Party): FlowLogic<Void?> = ServiceFlow(otherParty, this)
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = ServiceFlow(otherPartySession, this)
private class ServiceFlow(val otherSide: Party, val service: BFTNonValidatingNotaryService) : FlowLogic<Void?>() {
private class ServiceFlow(val otherSideSession: FlowSession, val service: BFTNonValidatingNotaryService) : FlowLogic<Void?>() {
@Suspendable
override fun call(): Void? {
val stx = receive<FilteredTransaction>(otherSide).unwrap { it }
val stx = otherSideSession.receive<FilteredTransaction>().unwrap { it }
val signatures = commit(stx)
send(otherSide, signatures)
otherSideSession.send(signatures)
return null
}
private fun commit(stx: FilteredTransaction): List<DigitalSignature> {
val response = service.commitTransaction(stx, otherSide)
val response = service.commitTransaction(stx, otherSideSession.counterparty)
when (response) {
is BFTSMaRt.ClusterResponse.Error -> throw NotaryException(response.error)
is BFTSMaRt.ClusterResponse.Signatures -> {

View File

@ -1,16 +1,16 @@
package net.corda.node.services.transactions
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowSession
import net.corda.core.contracts.ComponentGroupEnum
import net.corda.core.flows.NotaryFlow
import net.corda.core.flows.TransactionParts
import net.corda.core.identity.Party
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.core.transactions.FilteredTransaction
import net.corda.core.transactions.NotaryChangeWireTransaction
import net.corda.core.utilities.unwrap
class NonValidatingNotaryFlow(otherSide: Party, service: TrustedAuthorityNotaryService) : NotaryFlow.Service(otherSide, service) {
class NonValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryFlow.Service(otherSideSession, service) {
/**
* The received transaction is not checked for contract-validity, as that would require fully
* resolving it into a [TransactionForVerification], for which the caller would have to reveal the whole transaction
@ -21,7 +21,7 @@ class NonValidatingNotaryFlow(otherSide: Party, service: TrustedAuthorityNotaryS
*/
@Suspendable
override fun receiveAndVerifyTx(): TransactionParts {
val parts = receive<Any>(otherSide).unwrap {
val parts = otherSideSession.receive<Any>().unwrap {
when (it) {
is FilteredTransaction -> {
it.verify()

View File

@ -1,7 +1,7 @@
package net.corda.node.services.transactions
import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotaryFlow
import net.corda.core.identity.Party
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.node.services.api.ServiceHubInternal
@ -16,7 +16,7 @@ class RaftNonValidatingNotaryService(override val services: ServiceHubInternal,
override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider: RaftUniquenessProvider = RaftUniquenessProvider(services)
override fun createServiceFlow(otherParty: Party): NotaryFlow.Service = NonValidatingNotaryFlow(otherParty, this)
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = NonValidatingNotaryFlow(otherPartySession, this)
override fun start() {
uniquenessProvider.start()

View File

@ -1,7 +1,7 @@
package net.corda.node.services.transactions
import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotaryFlow
import net.corda.core.identity.Party
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.node.services.api.ServiceHubInternal
@ -16,7 +16,7 @@ class RaftValidatingNotaryService(override val services: ServiceHubInternal, ove
override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider: RaftUniquenessProvider = RaftUniquenessProvider(services)
override fun createServiceFlow(otherParty: Party): NotaryFlow.Service = ValidatingNotaryFlow(otherParty, this)
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = ValidatingNotaryFlow(otherPartySession, this)
override fun start() {
uniquenessProvider.start()

View File

@ -1,7 +1,7 @@
package net.corda.node.services.transactions
import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotaryFlow
import net.corda.core.identity.Party
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.nodeapi.ServiceType
@ -17,7 +17,7 @@ class SimpleNotaryService(override val services: ServiceHubInternal, override va
override val timeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider = PersistentUniquenessProvider()
override fun createServiceFlow(otherParty: Party): NotaryFlow.Service = NonValidatingNotaryFlow(otherParty, this)
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = NonValidatingNotaryFlow(otherPartySession, this)
override fun start() {}
override fun stop() {}

View File

@ -3,7 +3,6 @@ package net.corda.node.services.transactions
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.TransactionVerificationException
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.core.transactions.SignedTransaction
import java.security.SignatureException
@ -14,7 +13,7 @@ import java.security.SignatureException
* has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was
* indeed valid.
*/
class ValidatingNotaryFlow(otherSide: Party, service: TrustedAuthorityNotaryService) : NotaryFlow.Service(otherSide, service) {
class ValidatingNotaryFlow(otherSideSession: FlowSession, service: TrustedAuthorityNotaryService) : NotaryFlow.Service(otherSideSession, service) {
/**
* The received transaction is checked for contract-validity, which requires fully resolving it into a
* [TransactionForVerification], for which the caller also has to to reveal the whole transaction
@ -23,7 +22,7 @@ class ValidatingNotaryFlow(otherSide: Party, service: TrustedAuthorityNotaryServ
@Suspendable
override fun receiveAndVerifyTx(): TransactionParts {
try {
val stx = subFlow(ReceiveTransactionFlow(otherSide, checkSufficientSignatures = false))
val stx = subFlow(ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = false))
val notary = stx.notary
checkNotary(notary)
checkSignatures(stx)

View File

@ -1,7 +1,7 @@
package net.corda.node.services.transactions
import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotaryFlow
import net.corda.core.identity.Party
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.nodeapi.ServiceType
@ -17,7 +17,7 @@ class ValidatingNotaryService(override val services: ServiceHubInternal, overrid
override val timeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider = PersistentUniquenessProvider()
override fun createServiceFlow(otherParty: Party): NotaryFlow.Service = ValidatingNotaryFlow(otherParty, this)
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = ValidatingNotaryFlow(otherPartySession, this)
override fun start() {}
override fun stop() {}

View File

@ -4,7 +4,10 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.*
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.copyToDirectory
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.list
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
@ -68,21 +71,22 @@ class CordappSmokeTest {
override fun call(): Pair<FlowInfo, FlowInfo> {
// This receive will kick off SendBackInitiatorFlowContext by sending a session-init with our app name.
// SendBackInitiatorFlowContext will send back our context using the information from this session-init
val sessionInitContext = receive<FlowInfo>(otherParty).unwrap { it }
val session = initiateFlow(otherParty)
val sessionInitContext = session.receive<FlowInfo>().unwrap { it }
// This context is taken from the session-confirm message
val sessionConfirmContext = getFlowInfo(otherParty)
val sessionConfirmContext = session.getCounterpartyFlowInfo()
return Pair(sessionInitContext, sessionConfirmContext)
}
}
@Suppress("unused")
@InitiatedBy(GatherContextsFlow::class)
class SendBackInitiatorFlowContext(private val otherParty: Party) : FlowLogic<Unit>() {
class SendBackInitiatorFlowContext(private val otherPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
// An initiated flow calling getFlowContext on its initiator will get the context from the session-init
val sessionInitContext = getFlowInfo(otherParty)
send(otherParty, sessionInitContext)
val sessionInitContext = otherPartySession.getCounterpartyFlowInfo()
otherPartySession.send(sessionInitContext)
}
}
}

View File

@ -1,20 +1,21 @@
package net.corda.node.cordapp
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.node.internal.cordapp.Cordapp
import net.corda.core.flows.InitiatingFlow
import net.corda.node.internal.cordapp.CordappLoader
import org.junit.Assert
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import java.nio.file.Paths
import org.assertj.core.api.Assertions.assertThat
@InitiatingFlow
class DummyFlow : FlowLogic<Unit>() {
override fun call() { }
}
@InitiatedBy(DummyFlow::class)
class LoaderTestFlow : FlowLogic<Unit>() {
class LoaderTestFlow(unusedSession: FlowSession) : FlowLogic<Unit>() {
override fun call() { }
}

View File

@ -4,10 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.*
import net.corda.core.crypto.*
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.*
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.CordaX500Name
@ -562,9 +559,10 @@ class TwoPartyTradeFlowTests {
} else {
ourIdentityAndCert
}
send(buyer, TestTx(notary, price, anonymous))
val buyerSession = initiateFlow(buyer)
buyerSession.send(TestTx(notary, price, anonymous))
return subFlow(Seller(
buyer,
buyerSession,
assetToSell,
price,
myPartyAndCert))
@ -572,14 +570,14 @@ class TwoPartyTradeFlowTests {
}
@InitiatedBy(SellerInitiator::class)
class BuyerAcceptor(private val seller: Party) : FlowLogic<SignedTransaction>() {
class BuyerAcceptor(private val sellerSession: FlowSession) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
val (notary, price, anonymous) = receive<TestTx>(seller).unwrap {
val (notary, price, anonymous) = sellerSession.receive<TestTx>().unwrap {
require(serviceHub.networkMapCache.isNotary(it.notaryIdentity)) { "${it.notaryIdentity} is not a notary" }
it
}
return subFlow(Buyer(seller, notary, price, CommercialPaper.State::class.java, anonymous))
return subFlow(Buyer(sellerSession, notary, price, CommercialPaper.State::class.java, anonymous))
}
}

View File

@ -4,7 +4,6 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.*
import net.corda.core.flows.*
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.node.services.VaultQueryService
import net.corda.core.node.services.queryBy
@ -16,10 +15,10 @@ import net.corda.core.node.services.vault.SortAttribute
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.StartedNode
import net.corda.nodeapi.ServiceInfo
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.nodeapi.ServiceInfo
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.chooseIdentity
import net.corda.testing.contracts.DUMMY_PROGRAM_ID
@ -50,15 +49,15 @@ class ScheduledFlowTests {
val processed: Boolean = false,
override val linearId: UniqueIdentifier = UniqueIdentifier()) : SchedulableState, LinearState {
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? {
if (!processed) {
return if (!processed) {
val logicRef = flowLogicRefFactory.create(ScheduledFlow::class.java, thisStateRef)
return ScheduledActivity(logicRef, creationTime)
ScheduledActivity(logicRef, creationTime)
} else {
return null
null
}
}
override val participants: List<AbstractParty> = listOf(source, destination)
override val participants: List<Party> get() = listOf(source, destination)
}
class InsertInitialStateFlow(private val destination: Party) : FlowLogic<Unit>() {
@ -70,7 +69,7 @@ class ScheduledFlowTests {
.addOutputState(scheduledState, DUMMY_PROGRAM_ID)
.addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
subFlow(FinalityFlow(tx, setOf(ourIdentity)))
subFlow(FinalityFlow(tx))
}
}
@ -92,7 +91,7 @@ class ScheduledFlowTests {
.addOutputState(newStateOutput, DUMMY_PROGRAM_ID)
.addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
subFlow(FinalityFlow(tx, setOf(scheduledState.source, scheduledState.destination)))
subFlow(FinalityFlow(tx, setOf(scheduledState.destination)))
}
}

View File

@ -2,6 +2,7 @@ package net.corda.node.services.network
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.CordaX500Name
@ -170,17 +171,18 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
override fun call(): String {
println("SEND FLOW to $otherParty")
println("Party key ${otherParty.owningKey.toBase58String()}")
return sendAndReceive<String>(otherParty, "Hi!").unwrap { it }
val session = initiateFlow(otherParty)
return session.sendAndReceive<String>("Hi!").unwrap { it }
}
}
@InitiatedBy(SendFlow::class)
private class SendBackFlow(val otherParty: Party) : FlowLogic<Unit>() {
private class SendBackFlow(val otherSideSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
println("SEND BACK FLOW to $otherParty")
println("Party key ${otherParty.owningKey.toBase58String()}")
send(otherParty, "Hello!")
println("SEND BACK FLOW to ${otherSideSession.counterparty}")
println("Party key ${otherSideSession.counterparty.owningKey.toBase58String()}")
otherSideSession.send("Hello!")
}
}
}

View File

@ -1,116 +0,0 @@
package net.corda.node.services.persistence
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.Amount
import net.corda.core.contracts.Issued
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.SendTransactionFlow
import net.corda.core.identity.Party
import net.corda.core.node.services.queryBy
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.finance.USD
import net.corda.finance.contracts.asset.Cash
import net.corda.node.internal.StartedNode
import net.corda.node.services.NotifyTransactionHandler
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.MEGA_CORP
import net.corda.testing.chooseIdentity
import net.corda.testing.node.MockNetwork
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Test
import kotlin.test.assertEquals
/**
* Tests for the data vending service.
*/
class DataVendingServiceTests {
lateinit var mockNet: MockNetwork
@Before
fun setup() {
mockNet = MockNetwork()
}
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun `notify of transaction`() {
val nodes = mockNet.createSomeNodes(2)
val vaultServiceNode = nodes.partyNodes[0]
val registerNode = nodes.partyNodes[1]
val beneficiary = vaultServiceNode.info.chooseIdentity()
val deposit = registerNode.info.chooseIdentity().ref(1)
mockNet.runNetwork()
// Generate an issuance transaction
val ptx = TransactionBuilder(null)
Cash().generateIssue(ptx, Amount(100, Issued(deposit, USD)), beneficiary, DUMMY_NOTARY)
// Complete the cash transaction, and then manually relay it
val tx = registerNode.services.signInitialTransaction(ptx)
vaultServiceNode.database.transaction {
assertThat(vaultServiceNode.services.vaultQueryService.queryBy<Cash.State>().states.isEmpty())
registerNode.sendNotifyTx(tx, vaultServiceNode)
// Check the transaction is in the receiving node
val actual = vaultServiceNode.services.vaultQueryService.queryBy<Cash.State>().states.singleOrNull()
val expected = tx.tx.outRef<Cash.State>(0)
assertEquals(expected, actual)
}
}
/**
* Test that invalid transactions are rejected.
*/
@Test
fun `notify failure`() {
val nodes = mockNet.createSomeNodes(2)
val vaultServiceNode = nodes.partyNodes[0]
val registerNode = nodes.partyNodes[1]
val beneficiary = vaultServiceNode.info.chooseIdentity()
val deposit = MEGA_CORP.ref(1)
mockNet.runNetwork()
// Generate an issuance transaction
val ptx = TransactionBuilder(DUMMY_NOTARY)
Cash().generateIssue(ptx, Amount(100, Issued(deposit, USD)), beneficiary, DUMMY_NOTARY)
// The transaction tries issuing MEGA_CORP cash, but we aren't the issuer, so it's invalid
val tx = registerNode.services.signInitialTransaction(ptx)
vaultServiceNode.database.transaction {
assertThat(vaultServiceNode.services.vaultQueryService.queryBy<Cash.State>().states.isEmpty())
registerNode.sendNotifyTx(tx, vaultServiceNode)
// Check the transaction is not in the receiving node
assertThat(vaultServiceNode.services.vaultQueryService.queryBy<Cash.State>().states.isEmpty())
}
}
private fun StartedNode<*>.sendNotifyTx(tx: SignedTransaction, walletServiceNode: StartedNode<*>) {
walletServiceNode.internals.registerInitiatedFlow(InitiateNotifyTxFlow::class.java)
services.startFlow(NotifyTxFlow(walletServiceNode.info.chooseIdentity(), tx))
mockNet.runNetwork()
}
@InitiatingFlow
private class NotifyTxFlow(val otherParty: Party, val stx: SignedTransaction) : FlowLogic<Void?>() {
@Suspendable
override fun call() = subFlow(SendTransactionFlow(otherParty, stx))
}
@InitiatedBy(NotifyTxFlow::class)
private class InitiateNotifyTxFlow(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() = subFlow(NotifyTransactionHandler(otherParty))
}
}

View File

@ -37,7 +37,7 @@ class NodeSchemaServiceTest {
/**
* Note: this test verifies auto-scanning to register identified [MappedSchema] schemas.
* By default, Driver uses the caller package for auto-scanning:
* System.setProperty("net.corda.node.cordapp.scan.package", callerPackage)
* System.setProperty("net.corda.node.cordapp.scan.packages", callerPackage)
*/
@Test
fun `auto scanning of custom schemas for testing with Driver`() {

View File

@ -124,7 +124,7 @@ class FlowFrameworkTests {
@Test
fun `exception while fiber suspended`() {
node2.registerFlowFactory(ReceiveFlow::class) { SendFlow("Hello", it) }
node2.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
val flow = ReceiveFlow(node2.info.chooseIdentity())
val fiber = node1.services.startFlow(flow) as FlowStateMachineImpl
// Before the flow runs change the suspend action to throw an exception
@ -143,7 +143,7 @@ class FlowFrameworkTests {
@Test
fun `flow restarted just after receiving payload`() {
node2.registerFlowFactory(SendFlow::class) { ReceiveFlow(it).nonTerminating() }
node2.registerFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() }
node1.services.startFlow(SendFlow("Hello", node2.info.chooseIdentity()))
// We push through just enough messages to get only the payload sent
@ -152,7 +152,7 @@ class FlowFrameworkTests {
node2.internals.acceptableLiveFiberCountOnStop = 1
node2.dispose()
mockNet.runNetwork()
val restoredFlow = node2.restartAndGetRestoredFlow<ReceiveFlow>(node1)
val restoredFlow = node2.restartAndGetRestoredFlow<InitiatedReceiveFlow>(node1)
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello")
}
@ -195,7 +195,7 @@ class FlowFrameworkTests {
@Test
fun `flow loaded from checkpoint will respond to messages from before start`() {
node1.registerFlowFactory(ReceiveFlow::class) { SendFlow("Hello", it) }
node1.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
node2.services.startFlow(ReceiveFlow(node1.info.chooseIdentity()).nonTerminating()) // Prepare checkpointed receive flow
// Make sure the add() has finished initial processing.
node2.smm.executor.flush()
@ -260,13 +260,13 @@ class FlowFrameworkTests {
fun `sending to multiple parties`() {
val node3 = mockNet.createNode(node1.network.myAddress)
mockNet.runNetwork()
node2.registerFlowFactory(SendFlow::class) { ReceiveFlow(it).nonTerminating() }
node3.registerFlowFactory(SendFlow::class) { ReceiveFlow(it).nonTerminating() }
node2.registerFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() }
node3.registerFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() }
val payload = "Hello World"
node1.services.startFlow(SendFlow(payload, node2.info.chooseIdentity(), node3.info.chooseIdentity()))
mockNet.runNetwork()
val node2Flow = node2.getSingleFlow<ReceiveFlow>().first
val node3Flow = node3.getSingleFlow<ReceiveFlow>().first
val node2Flow = node2.getSingleFlow<InitiatedReceiveFlow>().first
val node3Flow = node3.getSingleFlow<InitiatedReceiveFlow>().first
assertThat(node2Flow.receivedPayloads[0]).isEqualTo(payload)
assertThat(node3Flow.receivedPayloads[0]).isEqualTo(payload)
@ -294,8 +294,8 @@ class FlowFrameworkTests {
mockNet.runNetwork()
val node2Payload = "Test 1"
val node3Payload = "Test 2"
node2.registerFlowFactory(ReceiveFlow::class) { SendFlow(node2Payload, it) }
node3.registerFlowFactory(ReceiveFlow::class) { SendFlow(node3Payload, it) }
node2.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(node2Payload, it) }
node3.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(node3Payload, it) }
val multiReceiveFlow = ReceiveFlow(node2.info.chooseIdentity(), node3.info.chooseIdentity()).nonTerminating()
node1.services.startFlow(multiReceiveFlow)
node1.internals.acceptableLiveFiberCountOnStop = 1
@ -420,10 +420,11 @@ class FlowFrameworkTests {
@Suspendable
override fun call() {
// Kick off the flow on the other side ...
send(otherParty, 1)
val session = initiateFlow(otherParty)
session.send(1)
// ... then pause this one until it's received the session-end message from the other side
receivedOtherFlowEnd.acquire()
sendAndReceive<Int>(otherParty, 2)
session.sendAndReceive<Int>(2)
}
}
@ -543,14 +544,14 @@ class FlowFrameworkTests {
)
}
private class ConditionalExceptionFlow(val otherParty: Party, val sendPayload: Any) : FlowLogic<Unit>() {
private class ConditionalExceptionFlow(val otherPartySession: FlowSession, val sendPayload: Any) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val throwException = receive<Boolean>(otherParty).unwrap { it }
val throwException = otherPartySession.receive<Boolean>().unwrap { it }
if (throwException) {
throw MyFlowException("Throwing exception as requested")
}
send(otherParty, sendPayload)
otherPartySession.send(sendPayload)
}
}
@ -559,7 +560,7 @@ class FlowFrameworkTests {
@InitiatingFlow
class AskForExceptionFlow(val otherParty: Party, val throwException: Boolean) : FlowLogic<String>() {
@Suspendable
override fun call(): String = sendAndReceive<String>(otherParty, throwException).unwrap { it }
override fun call(): String = initiateFlow(otherParty).sendAndReceive<String>(throwException).unwrap { it }
}
class RetryOnExceptionFlow(val otherParty: Party) : FlowLogic<String>() {
@ -581,7 +582,7 @@ class FlowFrameworkTests {
@Test
fun `serialisation issue in counterparty`() {
node2.registerFlowFactory(ReceiveFlow::class) { SendFlow(NonSerialisableData(1), it) }
node2.registerFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(NonSerialisableData(1), it) }
val result = node1.services.startFlow(ReceiveFlow(node2.info.chooseIdentity())).resultFuture
mockNet.runNetwork()
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
@ -651,7 +652,7 @@ class FlowFrameworkTests {
@Test
fun `customised client flow`() {
val receiveFlowFuture = node2.registerFlowFactory(SendFlow::class) { ReceiveFlow(it) }
val receiveFlowFuture = node2.registerFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it) }
node1.services.startFlow(CustomSendFlow("Hello", node2.info.chooseIdentity())).resultFuture
mockNet.runNetwork()
assertThat(receiveFlowFuture.getOrThrow().receivedPayloads).containsOnly("Hello")
@ -668,7 +669,7 @@ class FlowFrameworkTests {
@Test
fun `upgraded initiating flow`() {
node2.registerFlowFactory(UpgradedFlow::class, initiatedFlowVersion = 1) { SendFlow("Old initiated", it) }
node2.registerFlowFactory(UpgradedFlow::class, initiatedFlowVersion = 1) { InitiatedSendFlow("Old initiated", it) }
val result = node1.services.startFlow(UpgradedFlow(node2.info.chooseIdentity())).resultFuture
mockNet.runNetwork()
assertThat(receivedSessionMessages).startsWith(
@ -684,13 +685,13 @@ class FlowFrameworkTests {
fun `upgraded initiated flow`() {
node2.registerFlowFactory(SendFlow::class, initiatedFlowVersion = 2) { UpgradedFlow(it) }
val initiatingFlow = SendFlow("Old initiating", node2.info.chooseIdentity())
node1.services.startFlow(initiatingFlow)
val flowInfo = node1.services.startFlow(initiatingFlow).resultFuture
mockNet.runNetwork()
assertThat(receivedSessionMessages).startsWith(
node1 sent sessionInit(SendFlow::class, flowVersion = 1, payload = "Old initiating") to node2,
node2 sent sessionConfirm(flowVersion = 2) to node1
)
assertThat(initiatingFlow.getFlowInfo(node2.info.chooseIdentity()).flowVersion).isEqualTo(2)
assertThat(flowInfo.get().flowVersion).isEqualTo(2)
}
@Test
@ -736,6 +737,23 @@ class FlowFrameworkTests {
assertThat(result.getOrThrow()).isEqualTo("HelloHello")
}
@Test
fun `double initiateFlow throws`() {
val future = node1.services.startFlow(DoubleInitiatingFlow()).resultFuture
mockNet.runNetwork()
assertThatExceptionOfType(IllegalStateException::class.java)
.isThrownBy { future.getOrThrow() }
.withMessageContaining("Attempted to initiateFlow() twice")
}
@InitiatingFlow
private class DoubleInitiatingFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() {
initiateFlow(serviceHub.myInfo.chooseIdentity())
initiateFlow(serviceHub.myInfo.chooseIdentity())
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////
//region Helpers
@ -754,16 +772,7 @@ class FlowFrameworkTests {
return smm.findStateMachines(P::class.java).single()
}
@Deprecated("Use registerFlowFactoryExpectingFlowSession() instead")
private inline fun <reified P : FlowLogic<*>> StartedNode<*>.registerFlowFactory(
initiatingFlowClass: KClass<out FlowLogic<*>>,
initiatedFlowVersion: Int = 1,
noinline flowFactory: (Party) -> P): CordaFuture<P>
{
return registerFlowFactoryExpectingFlowSession(initiatingFlowClass, initiatedFlowVersion, { flowFactory(it.counterparty) })
}
private inline fun <reified P : FlowLogic<*>> StartedNode<*>.registerFlowFactoryExpectingFlowSession(
initiatingFlowClass: KClass<out FlowLogic<*>>,
initiatedFlowVersion: Int = 1,
noinline flowFactory: (FlowSession) -> P): CordaFuture<P>
@ -858,13 +867,25 @@ class FlowFrameworkTests {
}
@InitiatingFlow
private open class SendFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic<Unit>() {
private open class SendFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic<FlowInfo>() {
init {
require(otherParties.isNotEmpty())
}
@Suspendable
override fun call() = otherParties.forEach { send(it, payload) }
override fun call(): FlowInfo {
val flowInfos = otherParties.map {
val session = initiateFlow(it)
session.send(payload)
session.getCounterpartyFlowInfo()
}.toList()
return flowInfos.first()
}
}
private open class InitiatedSendFlow(val payload: Any, val otherPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() = otherPartySession.send(payload)
}
private interface CustomInterface
@ -890,7 +911,7 @@ class FlowFrameworkTests {
@Suspendable
override fun call() {
progressTracker.currentStep = START_STEP
receivedPayloads = otherParties.map { receive<String>(it).unwrap { it } }
receivedPayloads = otherParties.map { initiateFlow(it).receive<String>().unwrap { it } }
progressTracker.currentStep = RECEIVED_STEP
if (nonTerminating) {
Fiber.park()
@ -903,26 +924,54 @@ class FlowFrameworkTests {
}
}
@InitiatingFlow
private class SendAndReceiveFlow(val otherParty: Party, val payload: Any) : FlowLogic<Any>() {
@Suspendable
override fun call(): Any = sendAndReceive<Any>(otherParty, payload).unwrap { it }
}
private class InitiatedReceiveFlow(val otherPartySession: FlowSession) : FlowLogic<Unit>() {
object START_STEP : ProgressTracker.Step("Starting")
object RECEIVED_STEP : ProgressTracker.Step("Received")
override val progressTracker: ProgressTracker = ProgressTracker(START_STEP, RECEIVED_STEP)
private var nonTerminating: Boolean = false
@Transient
var receivedPayloads: List<String> = emptyList()
private class InlinedSendFlow(val payload: String, val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() = send(otherParty, payload)
override fun call() {
progressTracker.currentStep = START_STEP
receivedPayloads = listOf(otherPartySession.receive<String>().unwrap { it })
progressTracker.currentStep = RECEIVED_STEP
if (nonTerminating) {
Fiber.park()
}
}
fun nonTerminating(): InitiatedReceiveFlow {
nonTerminating = true
return this
}
}
@InitiatingFlow
private class PingPongFlow(val otherParty: Party, val payload: Long) : FlowLogic<Unit>() {
private class SendAndReceiveFlow(val otherParty: Party, val payload: Any, val otherPartySession: FlowSession? = null) : FlowLogic<Any>() {
constructor(otherPartySession: FlowSession, payload: Any) : this(otherPartySession.counterparty, payload, otherPartySession)
@Suspendable
override fun call(): Any = (otherPartySession ?: initiateFlow(otherParty)).sendAndReceive<Any>(payload).unwrap { it }
}
private class InlinedSendFlow(val payload: String, val otherPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() = otherPartySession.send(payload)
}
@InitiatingFlow
private class PingPongFlow(val otherParty: Party, val payload: Long, val otherPartySession: FlowSession? = null) : FlowLogic<Unit>() {
constructor(otherPartySession: FlowSession, payload: Long) : this(otherPartySession.counterparty, payload, otherPartySession)
@Transient var receivedPayload: Long? = null
@Transient var receivedPayload2: Long? = null
@Suspendable
override fun call() {
receivedPayload = sendAndReceive<Long>(otherParty, payload).unwrap { it }
receivedPayload2 = sendAndReceive<Long>(otherParty, payload + 1).unwrap { it }
val session = otherPartySession ?: initiateFlow(otherParty)
receivedPayload = session.sendAndReceive<Long>(payload).unwrap { it }
receivedPayload2 = session.sendAndReceive<Long>(payload + 1).unwrap { it }
}
}
@ -950,17 +999,18 @@ class FlowFrameworkTests {
class Waiter(val stx: SignedTransaction, val otherParty: Party) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
send(otherParty, stx)
val otherPartySession = initiateFlow(otherParty)
otherPartySession.send(stx)
return waitForLedgerCommit(stx.id)
}
}
class Committer(val otherParty: Party, val throwException: (() -> Exception)? = null) : FlowLogic<SignedTransaction>() {
class Committer(val otherPartySession: FlowSession, val throwException: (() -> Exception)? = null) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
val stx = receive<SignedTransaction>(otherParty).unwrap { it }
val stx = otherPartySession.receive<SignedTransaction>().unwrap { it }
if (throwException != null) throw throwException.invoke()
return subFlow(FinalityFlow(stx, setOf(otherParty))).single()
return subFlow(FinalityFlow(stx, setOf(otherPartySession.counterparty)))
}
}
}
@ -969,7 +1019,8 @@ class FlowFrameworkTests {
private class VaultQueryFlow(val stx: SignedTransaction, val otherParty: Party) : FlowLogic<List<StateAndRef<ContractState>>>() {
@Suspendable
override fun call(): List<StateAndRef<ContractState>> {
send(otherParty, stx)
val otherPartySession = initiateFlow(otherParty)
otherPartySession.send(stx)
// hold onto reference here to force checkpoint of vaultQueryService and thus
// prove it is registered as a tokenizableService in the node
val vaultQuerySvc = serviceHub.vaultQueryService
@ -979,27 +1030,29 @@ class FlowFrameworkTests {
}
@InitiatingFlow(version = 2)
private class UpgradedFlow(val otherParty: Party) : FlowLogic<Pair<Any, Int>>() {
private class UpgradedFlow(val otherParty: Party, val otherPartySession: FlowSession? = null) : FlowLogic<Pair<Any, Int>>() {
constructor(otherPartySession: FlowSession) : this(otherPartySession.counterparty, otherPartySession)
@Suspendable
override fun call(): Pair<Any, Int> {
val received = receive<Any>(otherParty).unwrap { it }
val otherFlowVersion = getFlowInfo(otherParty).flowVersion
val otherPartySession = this.otherPartySession ?: initiateFlow(otherParty)
val received = otherPartySession.receive<Any>().unwrap { it }
val otherFlowVersion = otherPartySession.getCounterpartyFlowInfo().flowVersion
return Pair(received, otherFlowVersion)
}
}
private class SingleInlinedSubFlow(val otherParty: Party) : FlowLogic<Unit>() {
private class SingleInlinedSubFlow(val otherPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val payload = receive<String>(otherParty).unwrap { it }
subFlow(InlinedSendFlow(payload + payload, otherParty))
val payload = otherPartySession.receive<String>().unwrap { it }
subFlow(InlinedSendFlow(payload + payload, otherPartySession))
}
}
private class DoubleInlinedSubFlow(val otherParty: Party) : FlowLogic<Unit>() {
private class DoubleInlinedSubFlow(val otherPartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(SingleInlinedSubFlow(otherParty))
subFlow(SingleInlinedSubFlow(otherPartySession))
}
}