CORDA-1942: StartedMockNode.registerResponderFlow simplified (#4483)

The ResponderFlowFactory parameter is not used and so removed. Also, instead of returning a Future it returns an Observable of responder flows, to support multiple invocations. And finally renamed to registerInitiatedFlow to stick with the existing naming strategy.
This commit is contained in:
Shams Asari 2019-01-02 12:49:29 +00:00 committed by GitHub
parent 9a484998bb
commit c205bd2a21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 39 additions and 89 deletions

View File

@ -6505,10 +6505,6 @@ public final class net.corda.testing.node.NotarySpec extends java.lang.Object
public int hashCode()
public String toString()
##
public interface net.corda.testing.node.ResponderFlowFactory
@NotNull
public abstract F invoke(net.corda.core.flows.FlowSession)
##
public final class net.corda.testing.node.StartedMockNode extends java.lang.Object
@NotNull
public final java.util.List<kotlin.Pair<F, net.corda.core.concurrent.CordaFuture<?>>> findStateMachines(Class<F>)

View File

@ -145,7 +145,8 @@ Version 4.0
* Removed experimental feature ``CordformDefinition``
* Added ``registerResponderFlow`` method to ``StartedMockNode``, to support isolated testing of responder flow behaviour.
* Added new overload of ``StartedMockNode.registerInitiatedFlow`` which allows registering custom initiating-responder flow pairs, which
can be useful for testing error cases.
* "app", "rpc", "p2p" and "unknown" are no longer allowed as uploader values when importing attachments. These are used
internally in security sensitive code.

View File

@ -4,16 +4,14 @@ import com.google.common.jimfs.Jimfs
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.toFuture
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.config.NodeConfiguration
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.DUMMY_NOTARY_NAME
@ -24,7 +22,6 @@ import net.corda.testing.node.internal.newContext
import rx.Observable
import java.math.BigInteger
import java.nio.file.Path
import java.util.concurrent.Future
/**
* Immutable builder for configuring a [StartedMockNode] or an [UnstartedMockNode] via [MockNetwork.createNode] and
@ -175,13 +172,31 @@ class StartedMockNode private constructor(private val node: TestStartedNode) {
/**
* Starts an already constructed flow. Note that you must be on the server thread to call this method.
* @param context indicates who started the flow, see: [InvocationContext].
*/
fun <T> startFlow(logic: FlowLogic<T>): CordaFuture<T> = node.services.startFlow(logic, node.services.newContext()).getOrThrow().resultFuture
/** Register a flow that is initiated by another flow .**/
/**
* Manually register an initiating-responder flow pair based on the [FlowLogic] annotations.
*
* @param initiatedFlowClass [FlowLogic] class which is annotated with [InitiatedBy].
* @return An [Observable] which emits responder flows each time one is executed.
*/
fun <F : FlowLogic<*>> registerInitiatedFlow(initiatedFlowClass: Class<F>): Observable<F> = node.registerInitiatedFlow(initiatedFlowClass)
/**
* Register a *custom* relationship between initiating and receiving flow on a node-by-node basis. This is used when
* we want to manually specify that a particular initiating flow class will have a particular responder.
*
* Note that this change affects _only_ the node on which this method is called, and not the entire network.
*
* @param initiatingFlowClass The [FlowLogic]-inheriting class to register a new responder for.
* @param initiatedFlowClass The class of the responder flow.
* @return An [Observable] which emits responder flows each time one is executed.
*/
fun <F : FlowLogic<*>> registerInitiatedFlow(initiatingFlowClass: Class<out FlowLogic<*>>, initiatedFlowClass: Class<F>): Observable<F> {
return node.registerInitiatedFlow(initiatingFlowClass, initiatedFlowClass)
}
/** Stop the node. **/
fun stop() = node.internals.stop()
@ -209,65 +224,8 @@ class StartedMockNode private constructor(private val node: TestStartedNode) {
statement()
}
}
/**
* Register an [InitiatedFlowFactory], to control relationship between initiating and receiving flow classes
* explicitly on a node-by-node basis. This is used when we want to manually specify that a particular initiating
* flow class will have a particular responder.
*
* An [ResponderFlowFactory] is responsible for converting a [FlowSession] into the [FlowLogic] that will respond
* to the initiated flow. The registry records one responder type, and hence one factory, for each initiator flow
* type. If a factory is already registered for the type, it is overwritten in the registry when a new factory is
* registered.
*
* Note that this change affects _only_ the node on which this method is called, and not the entire network.
*
* @property initiatingFlowClass The [FlowLogic]-inheriting class to register a new responder for.
* @property flowFactory The flow factory that will create the responding flow.
* @property responderFlowClass The class of the responder flow.
* @return A [CordaFuture] that will complete the first time the responding flow is created.
*/
fun <F : FlowLogic<*>> registerResponderFlow(initiatingFlowClass: Class<out FlowLogic<*>>,
flowFactory: ResponderFlowFactory<F>,
responderFlowClass: Class<F>): CordaFuture<F> =
node.registerInitiatedFlow(initiatingFlowClass, responderFlowClass).toFuture()
}
/**
* Responsible for converting a [FlowSession] into the [FlowLogic] that will respond to an initiated flow.
*
* @param F The [FlowLogic]-inherited type of the responder class this factory creates.
*/
@FunctionalInterface
interface ResponderFlowFactory<F : FlowLogic<*>> {
/**
* Given the provided [FlowSession], create a responder [FlowLogic] of the desired type.
*
* @param flowSession The [FlowSession] to use to create the responder flow object.
* @return The constructed responder flow object.
*/
fun invoke(flowSession: FlowSession): F
}
/**
* Kotlin-only utility function using a reified type parameter and a lambda parameter to simplify the
* [InitiatedFlowFactory.registerFlowFactory] function.
*
* @param F The [FlowLogic]-inherited type of the responder to register.
* @property initiatingFlowClass The [FlowLogic]-inheriting class to register a new responder for.
* @property flowFactory A lambda converting a [FlowSession] into an instance of the responder class [F].
* @return A [CordaFuture] that will complete the first time the responding flow is created.
*/
inline fun <reified F : FlowLogic<*>> StartedMockNode.registerResponderFlow(
initiatingFlowClass: Class<out FlowLogic<*>>,
noinline flowFactory: (FlowSession) -> F): Future<F> = registerResponderFlow(
initiatingFlowClass,
object : ResponderFlowFactory<F> {
override fun invoke(flowSession: FlowSession) = flowFactory(flowSession)
},
F::class.java)
/**
* A mock node brings up a suite of in-memory services in a fast manner suitable for unit testing.
* Components that do IO are either swapped out for mocks, or pointed to a [Jimfs] in memory filesystem or an in

View File

@ -1,6 +1,7 @@
package net.corda.testing.node;
import co.paralleluniverse.fibers.Suspendable;
import net.corda.core.Utils;
import net.corda.core.concurrent.CordaFuture;
import net.corda.core.flows.*;
import net.corda.core.identity.Party;
@ -39,10 +40,10 @@ public class TestResponseFlowInIsolationInJava {
@Test
public void test() throws Exception {
// This method returns the Responder flow object used by node B.
Future<Responder> initiatedResponderFlowFuture = b.registerResponderFlow(
Future<Responder> initiatedResponderFlowFuture = Utils.toFuture(b.registerInitiatedFlow(
// We tell node B to respond to BadInitiator with Responder.
// We want to observe the Responder flow object to check for errors.
BadInitiator.class, Responder::new, Responder.class);
BadInitiator.class, Responder.class));
// We run the BadInitiator flow on node A.
BadInitiator flow = new BadInitiator(b.getInfo().getLegalIdentities().get(0));

View File

@ -3,16 +3,15 @@ package net.corda.testing.node.internal
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.toFuture
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.testing.internal.chooseIdentity
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetworkParameters
import net.corda.testing.node.registerResponderFlow
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Test
import java.util.concurrent.ExecutionException
import kotlin.test.assertFailsWith
/**
* Test based on the example given as an answer to this SO question:
@ -32,7 +31,7 @@ class TestResponseFlowInIsolation {
// This is the real implementation of Initiator.
@InitiatingFlow
open class Initiator(val counterparty: Party) : FlowLogic<Unit>() {
open class Initiator(private val counterparty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(counterparty)
@ -42,7 +41,7 @@ class TestResponseFlowInIsolation {
// This is the response flow that we want to isolate for testing.
@InitiatedBy(Initiator::class)
class Responder(val counterpartySession: FlowSession) : FlowLogic<Unit>() {
class Responder(private val counterpartySession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val string = counterpartySession.receive<String>().unwrap { contents -> contents }
@ -54,7 +53,7 @@ class TestResponseFlowInIsolation {
// This is a fake implementation of Initiator to check how Responder responds to non-golden-path scenarios.
@InitiatingFlow
class BadInitiator(val counterparty: Party): FlowLogic<Unit>() {
class BadInitiator(private val counterparty: Party): FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(counterparty)
@ -63,12 +62,10 @@ class TestResponseFlowInIsolation {
}
@Test
fun `test`() {
fun test() {
// This method returns the Responder flow object used by node B.
// We tell node B to respond to BadInitiator with Responder.
val initiatedResponderFlowFuture = b.registerResponderFlow(
initiatingFlowClass = BadInitiator::class.java,
flowFactory = ::Responder)
val initiatedResponderFlowFuture = b.registerInitiatedFlow(BadInitiator::class.java, Responder::class.java).toFuture()
// We run the BadInitiator flow on node A.
val flow = BadInitiator(b.info.chooseIdentity())
@ -77,14 +74,11 @@ class TestResponseFlowInIsolation {
future.get()
// We check that the invocation of the Responder flow object has caused an ExecutionException.
val initiatedResponderFlow = initiatedResponderFlowFuture.get()
val initiatedResponderFlow = initiatedResponderFlowFuture.getOrThrow()
val initiatedResponderFlowResultFuture = initiatedResponderFlow.stateMachine.resultFuture
val exceptionFromFlow = assertFailsWith<ExecutionException> {
initiatedResponderFlowResultFuture.get()
}.cause
assertThat(exceptionFromFlow)
.isInstanceOf(FlowException::class.java)
.hasMessage("String did not contain the expected message.")
assertThatExceptionOfType(FlowException::class.java)
.isThrownBy { initiatedResponderFlowResultFuture.getOrThrow() }
.withMessage("String did not contain the expected message.")
}
}
}