diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt index 96b9a9d4c1..3bfa44fe5f 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt @@ -19,7 +19,6 @@ import net.corda.core.utilities.trace import net.corda.node.services.messaging.* import net.corda.node.utilities.AffinityExecutor import net.corda.nodeapi.internal.persistence.CordaPersistence -import net.corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging import org.apache.activemq.artemis.utils.ReusableLatch import org.slf4j.LoggerFactory import rx.Observable @@ -33,8 +32,8 @@ import kotlin.concurrent.schedule import kotlin.concurrent.thread /** - * An in-memory network allows you to manufacture [InMemoryMessaging]s for a set of participants. Each - * [InMemoryMessaging] maintains a queue of messages it has received, and a background thread that dispatches + * An in-memory network allows you to manufacture [TestMessagingService]s for a set of participants. Each + * [TestMessagingService] maintains a queue of messages it has received, and a background thread that dispatches * messages one by one to registered handlers. Alternatively, a messaging system may be manually pumped, in which * case no thread is created and a caller is expected to force delivery one at a time (this is useful for unit * testing). @@ -88,7 +87,7 @@ class InMemoryMessagingNetwork internal constructor( val receivedMessages: Observable get() = _receivedMessages - val endpoints: List @Synchronized get() = handleEndpointMap.values.toList() + val endpoints: List @Synchronized get() = handleEndpointMap.values.toList() /** * Creates a node at the given address: useful if you want to recreate a node to simulate a restart. * @@ -99,14 +98,14 @@ class InMemoryMessagingNetwork internal constructor( * @param id the numeric ID to use, e.g. set to whatever ID the node used last time. * @param description text string that identifies this node for message logging (if is enabled) or null to autogenerate. */ - fun createNodeWithID( + internal fun createNodeWithID( manuallyPumped: Boolean, id: Int, executor: AffinityExecutor, notaryService: PartyAndCertificate?, description: CordaX500Name = CordaX500Name(organisation = "In memory node $id", locality = "London", country = "UK"), database: CordaPersistence) - : InMemoryMessaging { + : TestMessagingService { val peerHandle = PeerHandle(id, description) peersMapping[peerHandle.description] = peerHandle // Assume that the same name - the same entity in MockNetwork. notaryService?.let { if (it.owningKey !is CompositeKey) peersMapping[it.name] = peerHandle } @@ -256,17 +255,20 @@ class InMemoryMessagingNetwork internal constructor( override val peer: CordaX500Name) : ReceivedMessage /** - * An [InMemoryMessaging] provides a [MessagingService] that isn't backed by any kind of network or disk storage - * system, but just uses regular queues on the heap instead. It is intended for unit testing and developer convenience - * when all entities on 'the network' are being simulated in-process. - * - * An instance can be obtained by creating a builder and then using the start method. + * A [TestMessagingService] that provides a [MessagingService] abstraction that also contains the ability to + * receive messages from the queue for testing purposes. */ + @DoNotImplement + interface TestMessagingService : MessagingService { + fun pumpReceive(block: Boolean): InMemoryMessagingNetwork.MessageTransfer? + fun stop() + } + @ThreadSafe - inner class InMemoryMessaging(private val manuallyPumped: Boolean, - private val peerHandle: PeerHandle, - private val executor: AffinityExecutor, - private val database: CordaPersistence) : SingletonSerializeAsToken(), MessagingService { + private inner class InMemoryMessaging(private val manuallyPumped: Boolean, + private val peerHandle: PeerHandle, + private val executor: AffinityExecutor, + private val database: CordaPersistence) : SingletonSerializeAsToken(), TestMessagingService { private inner class Handler(val topicSession: TopicSession, val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration @@ -341,7 +343,7 @@ class InMemoryMessagingNetwork internal constructor( acknowledgementHandler?.invoke() } - fun stop() { + override fun stop() { if (backgroundThread != null) { backgroundThread.interrupt() backgroundThread.join() @@ -364,7 +366,7 @@ class InMemoryMessagingNetwork internal constructor( * * @return the message that was processed, if any in this round. */ - fun pumpReceive(block: Boolean): MessageTransfer? { + override fun pumpReceive(block: Boolean): MessageTransfer? { check(manuallyPumped) check(running) executor.flush() diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt index 7e8aa8225f..b9901a881c 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -62,7 +62,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger fun StartedNode.pumpReceive(block: Boolean = false): InMemoryMessagingNetwork.MessageTransfer? { - return (network as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(block) + return (network as InMemoryMessagingNetwork.TestMessagingService).pumpReceive(block) } /** Helper builder for configuring a [MockNetwork] from Java. */ @@ -172,29 +172,32 @@ open class MockNetwork(private val cordappPackages: List, * Returns the single notary node on the network. Throws if there are none or more than one. * @see notaryNodes */ - val defaultNotaryNode: StartedNode get() { - return when (notaryNodes.size) { - 0 -> throw IllegalStateException("There are no notaries defined on the network") - 1 -> notaryNodes[0] - else -> throw IllegalStateException("There is more than one notary defined on the network") + val defaultNotaryNode: StartedNode + get() { + return when (notaryNodes.size) { + 0 -> throw IllegalStateException("There are no notaries defined on the network") + 1 -> notaryNodes[0] + else -> throw IllegalStateException("There is more than one notary defined on the network") + } } - } /** * Return the identity of the default notary node. * @see defaultNotaryNode */ - val defaultNotaryIdentity: Party get() { - return defaultNotaryNode.info.legalIdentities.singleOrNull() ?: throw IllegalStateException("Default notary has multiple identities") - } + val defaultNotaryIdentity: Party + get() { + return defaultNotaryNode.info.legalIdentities.singleOrNull() ?: throw IllegalStateException("Default notary has multiple identities") + } /** * Return the identity of the default notary node. * @see defaultNotaryNode */ - val defaultNotaryIdentityAndCert: PartyAndCertificate get() { - return defaultNotaryNode.info.legalIdentitiesAndCerts.singleOrNull() ?: throw IllegalStateException("Default notary has multiple identities") - } + val defaultNotaryIdentityAndCert: PartyAndCertificate + get() { + return defaultNotaryNode.info.legalIdentitiesAndCerts.singleOrNull() ?: throw IllegalStateException("Default notary has multiple identities") + } /** * Because this executor is shared, we need to be careful about nodes shutting it down.