Abstract InMemoryMessaging behind an interface so as not expose net.corda.nodeapi.internal.persistence.CordaPersistence in public API (#2390)

This commit is contained in:
Anthony Keenan 2018-01-19 10:57:08 +00:00 committed by GitHub
parent de4c062529
commit 35f89e03ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 30 deletions

View File

@ -19,7 +19,6 @@ import net.corda.core.utilities.trace
import net.corda.node.services.messaging.*
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging
import org.apache.activemq.artemis.utils.ReusableLatch
import org.slf4j.LoggerFactory
import rx.Observable
@ -33,8 +32,8 @@ import kotlin.concurrent.schedule
import kotlin.concurrent.thread
/**
* An in-memory network allows you to manufacture [InMemoryMessaging]s for a set of participants. Each
* [InMemoryMessaging] maintains a queue of messages it has received, and a background thread that dispatches
* An in-memory network allows you to manufacture [TestMessagingService]s for a set of participants. Each
* [TestMessagingService] maintains a queue of messages it has received, and a background thread that dispatches
* messages one by one to registered handlers. Alternatively, a messaging system may be manually pumped, in which
* case no thread is created and a caller is expected to force delivery one at a time (this is useful for unit
* testing).
@ -88,7 +87,7 @@ class InMemoryMessagingNetwork internal constructor(
val receivedMessages: Observable<MessageTransfer>
get() = _receivedMessages
val endpoints: List<InMemoryMessaging> @Synchronized get() = handleEndpointMap.values.toList()
val endpoints: List<TestMessagingService> @Synchronized get() = handleEndpointMap.values.toList()
/**
* Creates a node at the given address: useful if you want to recreate a node to simulate a restart.
*
@ -99,14 +98,14 @@ class InMemoryMessagingNetwork internal constructor(
* @param id the numeric ID to use, e.g. set to whatever ID the node used last time.
* @param description text string that identifies this node for message logging (if is enabled) or null to autogenerate.
*/
fun createNodeWithID(
internal fun createNodeWithID(
manuallyPumped: Boolean,
id: Int,
executor: AffinityExecutor,
notaryService: PartyAndCertificate?,
description: CordaX500Name = CordaX500Name(organisation = "In memory node $id", locality = "London", country = "UK"),
database: CordaPersistence)
: InMemoryMessaging {
: TestMessagingService {
val peerHandle = PeerHandle(id, description)
peersMapping[peerHandle.description] = peerHandle // Assume that the same name - the same entity in MockNetwork.
notaryService?.let { if (it.owningKey !is CompositeKey) peersMapping[it.name] = peerHandle }
@ -256,17 +255,20 @@ class InMemoryMessagingNetwork internal constructor(
override val peer: CordaX500Name) : ReceivedMessage
/**
* An [InMemoryMessaging] provides a [MessagingService] that isn't backed by any kind of network or disk storage
* system, but just uses regular queues on the heap instead. It is intended for unit testing and developer convenience
* when all entities on 'the network' are being simulated in-process.
*
* An instance can be obtained by creating a builder and then using the start method.
* A [TestMessagingService] that provides a [MessagingService] abstraction that also contains the ability to
* receive messages from the queue for testing purposes.
*/
@DoNotImplement
interface TestMessagingService : MessagingService {
fun pumpReceive(block: Boolean): InMemoryMessagingNetwork.MessageTransfer?
fun stop()
}
@ThreadSafe
inner class InMemoryMessaging(private val manuallyPumped: Boolean,
private val peerHandle: PeerHandle,
private val executor: AffinityExecutor,
private val database: CordaPersistence) : SingletonSerializeAsToken(), MessagingService {
private inner class InMemoryMessaging(private val manuallyPumped: Boolean,
private val peerHandle: PeerHandle,
private val executor: AffinityExecutor,
private val database: CordaPersistence) : SingletonSerializeAsToken(), TestMessagingService {
private inner class Handler(val topicSession: TopicSession,
val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
@ -341,7 +343,7 @@ class InMemoryMessagingNetwork internal constructor(
acknowledgementHandler?.invoke()
}
fun stop() {
override fun stop() {
if (backgroundThread != null) {
backgroundThread.interrupt()
backgroundThread.join()
@ -364,7 +366,7 @@ class InMemoryMessagingNetwork internal constructor(
*
* @return the message that was processed, if any in this round.
*/
fun pumpReceive(block: Boolean): MessageTransfer? {
override fun pumpReceive(block: Boolean): MessageTransfer? {
check(manuallyPumped)
check(running)
executor.flush()

View File

@ -62,7 +62,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
fun StartedNode<MockNetwork.MockNode>.pumpReceive(block: Boolean = false): InMemoryMessagingNetwork.MessageTransfer? {
return (network as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(block)
return (network as InMemoryMessagingNetwork.TestMessagingService).pumpReceive(block)
}
/** Helper builder for configuring a [MockNetwork] from Java. */
@ -172,29 +172,32 @@ open class MockNetwork(private val cordappPackages: List<String>,
* Returns the single notary node on the network. Throws if there are none or more than one.
* @see notaryNodes
*/
val defaultNotaryNode: StartedNode<MockNode> get() {
return when (notaryNodes.size) {
0 -> throw IllegalStateException("There are no notaries defined on the network")
1 -> notaryNodes[0]
else -> throw IllegalStateException("There is more than one notary defined on the network")
val defaultNotaryNode: StartedNode<MockNode>
get() {
return when (notaryNodes.size) {
0 -> throw IllegalStateException("There are no notaries defined on the network")
1 -> notaryNodes[0]
else -> throw IllegalStateException("There is more than one notary defined on the network")
}
}
}
/**
* Return the identity of the default notary node.
* @see defaultNotaryNode
*/
val defaultNotaryIdentity: Party get() {
return defaultNotaryNode.info.legalIdentities.singleOrNull() ?: throw IllegalStateException("Default notary has multiple identities")
}
val defaultNotaryIdentity: Party
get() {
return defaultNotaryNode.info.legalIdentities.singleOrNull() ?: throw IllegalStateException("Default notary has multiple identities")
}
/**
* Return the identity of the default notary node.
* @see defaultNotaryNode
*/
val defaultNotaryIdentityAndCert: PartyAndCertificate get() {
return defaultNotaryNode.info.legalIdentitiesAndCerts.singleOrNull() ?: throw IllegalStateException("Default notary has multiple identities")
}
val defaultNotaryIdentityAndCert: PartyAndCertificate
get() {
return defaultNotaryNode.info.legalIdentitiesAndCerts.singleOrNull() ?: throw IllegalStateException("Default notary has multiple identities")
}
/**
* Because this executor is shared, we need to be careful about nodes shutting it down.