From 039cacae76011a4afeda134a73cb5a532ec5c0fb Mon Sep 17 00:00:00 2001 From: Andrzej Cichocki Date: Fri, 17 Nov 2017 12:24:55 +0000 Subject: [PATCH] CORDA-787 Remove unused class (#2049) * and refactor a superclass that isn't really one * move it to internal * misc refactorings --- .../ArtemisMessagingComponent.kt | 9 +-- .../messaging/MQSecurityAsNodeTest.kt | 4 +- .../services/messaging/MQSecurityTest.kt | 8 +-- .../kotlin/net/corda/node/internal/Node.kt | 2 +- .../node/services/api/AbstractNodeService.kt | 56 ------------------- .../messaging/ArtemisMessagingServer.kt | 20 ++++--- .../node/services/messaging/Messaging.kt | 13 ----- .../services/messaging/NodeMessagingClient.kt | 17 ++++-- .../node/services/messaging/RPCServer.kt | 2 +- .../node/services/messaging/RpcAuthContext.kt | 2 +- .../InMemoryTransactionVerifierService.kt | 4 +- .../OutOfProcessTransactionVerifierService.kt | 12 ++-- .../messaging/ArtemisMessagingTests.kt | 4 +- .../testing/node/InMemoryMessagingNetwork.kt | 25 +++++++-- .../corda/testing/messaging/SimpleMQClient.kt | 3 +- .../net/corda/verifier/VerifierDriver.kt | 2 +- 16 files changed, 63 insertions(+), 120 deletions(-) rename node-api/src/main/kotlin/net/corda/nodeapi/{ => internal}/ArtemisMessagingComponent.kt (89%) delete mode 100644 node/src/main/kotlin/net/corda/node/services/api/AbstractNodeService.kt diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisMessagingComponent.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt similarity index 89% rename from node-api/src/main/kotlin/net/corda/nodeapi/ArtemisMessagingComponent.kt rename to node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt index 897ea6e713..7c11cf4bb7 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/ArtemisMessagingComponent.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt @@ -1,19 +1,17 @@ -package net.corda.nodeapi +package net.corda.nodeapi.internal import net.corda.core.messaging.MessageRecipientGroup import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.serialization.CordaSerializable -import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.toBase58String -import net.corda.nodeapi.config.SSLConfiguration import java.security.PublicKey /** * The base class for Artemis services that defines shared data structures and SSL transport configuration. */ -abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() { +class ArtemisMessagingComponent { companion object { init { System.setProperty("org.jboss.logging.provider", "slf4j") @@ -66,7 +64,4 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() { data class ServiceAddress(val identity: PublicKey) : ArtemisAddress, MessageRecipientGroup { override val queueName: String = "$PEERS_PREFIX${identity.toBase58String()}" } - - /** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */ - abstract val config: SSLConfiguration? } diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt index 3f27c4e2b2..2d7b09cf1d 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt @@ -3,8 +3,8 @@ package net.corda.services.messaging import net.corda.core.crypto.Crypto import net.corda.core.internal.* import net.corda.node.utilities.* -import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER -import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.RPCApi import net.corda.nodeapi.config.SSLConfiguration import net.corda.testing.MEGA_CORP diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt index 15ba68fb24..1ddeb4497f 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt @@ -17,10 +17,10 @@ import net.corda.core.utilities.toBase58String import net.corda.core.utilities.unwrap import net.corda.node.internal.Node import net.corda.node.internal.StartedNode -import net.corda.nodeapi.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX -import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS -import net.corda.nodeapi.ArtemisMessagingComponent.Companion.P2P_QUEUE -import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEERS_PREFIX +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.nodeapi.RPCApi import net.corda.nodeapi.User import net.corda.nodeapi.config.SSLConfiguration diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index b631a4a806..d6850bd695 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -142,7 +142,7 @@ open class Node(configuration: NodeConfiguration, info.legalIdentities[0].owningKey, serverThread, database, - services.monitoringService, + services.monitoringService.metrics, advertisedAddress) } diff --git a/node/src/main/kotlin/net/corda/node/services/api/AbstractNodeService.kt b/node/src/main/kotlin/net/corda/node/services/api/AbstractNodeService.kt deleted file mode 100644 index 42b12629c4..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/api/AbstractNodeService.kt +++ /dev/null @@ -1,56 +0,0 @@ -package net.corda.node.services.api - -import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.serialization.deserialize -import net.corda.core.serialization.serialize -import net.corda.node.services.messaging.* -import javax.annotation.concurrent.ThreadSafe - -/** - * Abstract superclass for services that a node can host, which provides helper functions. - */ -@ThreadSafe -abstract class AbstractNodeService(val network: MessagingService) : SingletonSerializeAsToken() { - /** - * Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of - * common boilerplate code. Exceptions are caught and passed to the provided consumer. If you just want a simple - * acknowledgement response with no content, use [net.corda.core.messaging.Ack]. - * - * @param topic the topic, without the default session ID postfix (".0). - * @param handler a function to handle the deserialised request and return an optional response (if return type not Unit) - * @param exceptionConsumer a function to which any thrown exception is passed. - */ - protected inline fun - addMessageHandler(topic: String, - crossinline handler: (Q) -> R, - crossinline exceptionConsumer: (Message, Exception) -> Unit): MessageHandlerRegistration { - return network.addMessageHandler(topic, MessagingService.DEFAULT_SESSION_ID) { message, _ -> - try { - val request = message.data.deserialize() - val response = handler(request) - // If the return type R is Unit, then do not send a response - if (response.javaClass != Unit.javaClass) { - val msg = network.createMessage(topic, request.sessionID, response.serialize().bytes) - network.send(msg, request.replyTo) - } - } catch (e: Exception) { - exceptionConsumer(message, e) - } - } - } - - /** - * Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of - * common boilerplate code. Exceptions are propagated to the messaging layer. If you just want a simple - * acknowledgement response with no content, use [net.corda.core.messaging.Ack]. - * - * @param topic the topic, without the default session ID postfix (".0). - * @param handler a function to handle the deserialised request and return an optional response (if return type not Unit). - */ - protected inline fun - addMessageHandler(topic: String, - crossinline handler: (Q) -> R): MessageHandlerRegistration { - return addMessageHandler(topic, handler, { _: Message, exception: Exception -> throw exception }) - } - -} diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 19ddd89a42..8166d75080 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -11,6 +11,7 @@ import net.corda.core.internal.uncheckedCast import net.corda.core.node.NodeInfo import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache.MapChange +import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor @@ -27,8 +28,14 @@ import net.corda.node.utilities.X509Utilities.CORDA_CLIENT_TLS import net.corda.node.utilities.X509Utilities.CORDA_ROOT_CA import net.corda.node.utilities.loadKeyStore import net.corda.nodeapi.* -import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER -import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER +import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisPeerAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl import org.apache.activemq.artemis.core.config.BridgeConfiguration @@ -86,19 +93,16 @@ import javax.security.cert.CertificateException * a fully connected network, trusted network or on localhost. */ @ThreadSafe -class ArtemisMessagingServer(override val config: NodeConfiguration, - val p2pPort: Int, +class ArtemisMessagingServer(private val config: NodeConfiguration, + private val p2pPort: Int, val rpcPort: Int?, val networkMapCache: NetworkMapCache, - val userService: RPCUserService) : ArtemisMessagingComponent() { + val userService: RPCUserService) : SingletonSerializeAsToken() { companion object { private val log = loggerFor() /** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */ @JvmStatic val MAX_FILE_SIZE = 10485760 - - val ipDetectRequestProperty = "ip-request-id" - val ipDetectResponseProperty = "ip-address" } private class InnerState { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt index 1697b49047..2241e3096f 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt @@ -205,19 +205,6 @@ fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: Mess fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID(), retryId: Long? = null) = send(createMessage(topicSession, payload.serialize().bytes, uuid), to, retryId) -/** - * This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods - * on the messaging service interface until you have successfully started up the system. One of these objects should - * be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations - * may let you cast the returned future to an object that lets you get status info. - * - * A specific implementation of the controller class will have extra features that let you customise it before starting - * it up. - */ -interface MessagingServiceBuilder { - fun start(): ListenableFuture -} - interface MessageHandlerRegistration /** diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index 4ae2639b38..0720bd443a 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -1,5 +1,6 @@ package net.corda.node.services.messaging +import com.codahale.metrics.MetricRegistry import net.corda.core.crypto.random63BitValue import net.corda.core.identity.CordaX500Name import net.corda.core.internal.ThreadBox @@ -10,6 +11,7 @@ import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.services.PartyInfo import net.corda.core.node.services.TransactionVerifierService import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.serialization.serialize @@ -20,14 +22,17 @@ import net.corda.core.utilities.sequence import net.corda.core.utilities.trace import net.corda.node.VersionInfo import net.corda.node.services.RPCUserService -import net.corda.node.services.api.MonitoringService import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.VerifierType import net.corda.node.services.statemachine.StateMachineManagerImpl import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService import net.corda.node.utilities.* -import net.corda.nodeapi.ArtemisMessagingComponent +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE +import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.VerifierApi @@ -72,15 +77,15 @@ import javax.persistence.Lob * If not provided, will default to [serverAddress]. */ @ThreadSafe -class NodeMessagingClient(override val config: NodeConfiguration, +class NodeMessagingClient(private val config: NodeConfiguration, private val versionInfo: VersionInfo, private val serverAddress: NetworkHostAndPort, private val myIdentity: PublicKey, private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor, private val database: CordaPersistence, - private val monitoringService: MonitoringService, + private val metrics: MetricRegistry, advertisedAddress: NetworkHostAndPort = serverAddress -) : ArtemisMessagingComponent(), MessagingService { +) : SingletonSerializeAsToken(), MessagingService { companion object { private val log = loggerFor() @@ -560,7 +565,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, } private fun createOutOfProcessVerifierService(): TransactionVerifierService { - return object : OutOfProcessTransactionVerifierService(monitoringService) { + return object : OutOfProcessTransactionVerifierService(metrics) { override fun sendRequest(nonce: Long, transaction: LedgerTransaction) { messagingExecutor.fetchFrom { state.locked { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index 30c8d4adb1..9de779a9a9 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -31,7 +31,7 @@ import net.corda.core.utilities.seconds import net.corda.node.services.RPCUserService import net.corda.node.services.logging.pushToLoggingContext import net.corda.nodeapi.* -import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER import org.apache.activemq.artemis.api.core.Message import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RpcAuthContext.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RpcAuthContext.kt index 7631c797c6..8daa20f2a3 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RpcAuthContext.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RpcAuthContext.kt @@ -3,7 +3,7 @@ package net.corda.node.services.messaging import net.corda.client.rpc.PermissionException import net.corda.core.context.InvocationContext import net.corda.node.services.Permissions -import net.corda.nodeapi.ArtemisMessagingComponent +import net.corda.nodeapi.internal.ArtemisMessagingComponent data class RpcAuthContext(val invocation: InvocationContext, val grantedPermissions: RpcPermissions) { diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/InMemoryTransactionVerifierService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/InMemoryTransactionVerifierService.kt index 5cc77a3bec..62cef75b56 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/InMemoryTransactionVerifierService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/InMemoryTransactionVerifierService.kt @@ -1,6 +1,5 @@ package net.corda.node.services.transactions -import com.google.common.util.concurrent.MoreExecutors import net.corda.core.internal.concurrent.fork import net.corda.core.node.services.TransactionVerifierService import net.corda.core.serialization.SingletonSerializeAsToken @@ -8,7 +7,6 @@ import net.corda.core.transactions.LedgerTransaction import java.util.concurrent.Executors class InMemoryTransactionVerifierService(numberOfWorkers: Int) : SingletonSerializeAsToken(), TransactionVerifierService { - private val workerPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfWorkers)) - + private val workerPool = Executors.newFixedThreadPool(numberOfWorkers) override fun verify(transaction: LedgerTransaction) = workerPool.fork(transaction::verify) } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt index 1a6507796c..4819672f9f 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt @@ -1,6 +1,7 @@ package net.corda.node.services.transactions import com.codahale.metrics.Gauge +import com.codahale.metrics.MetricRegistry import com.codahale.metrics.Timer import net.corda.core.concurrent.CordaFuture import net.corda.core.crypto.SecureHash @@ -11,13 +12,12 @@ import net.corda.core.internal.concurrent.openFuture import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.LedgerTransaction import net.corda.core.utilities.loggerFor -import net.corda.node.services.api.MonitoringService import net.corda.nodeapi.VerifierApi import org.apache.activemq.artemis.api.core.client.ClientConsumer import java.util.concurrent.ConcurrentHashMap abstract class OutOfProcessTransactionVerifierService( - val monitoringService: MonitoringService + private val metrics: MetricRegistry ) : SingletonSerializeAsToken(), TransactionVerifierService { companion object { val log = loggerFor() @@ -34,16 +34,16 @@ abstract class OutOfProcessTransactionVerifierService( // Metrics private fun metric(name: String) = "OutOfProcessTransactionVerifierService.$name" - private val durationTimer = monitoringService.metrics.timer(metric("Verification.Duration")) - private val successMeter = monitoringService.metrics.meter(metric("Verification.Success")) - private val failureMeter = monitoringService.metrics.meter(metric("Verification.Failure")) + private val durationTimer = metrics.timer(metric("Verification.Duration")) + private val successMeter = metrics.meter(metric("Verification.Success")) + private val failureMeter = metrics.meter(metric("Verification.Failure")) class VerificationResultForUnknownTransaction(nonce: Long) : Exception("Verification result arrived for unknown transaction nonce $nonce") fun start(responseConsumer: ClientConsumer) { log.info("Starting out of process verification service") - monitoringService.metrics.register(metric("VerificationsInFlight"), Gauge { verificationHandles.size }) + metrics.register(metric("VerificationsInFlight"), Gauge { verificationHandles.size }) responseConsumer.setMessageHandler { message -> val response = VerifierApi.VerificationResponse.fromClientMessage(message) val handle = verificationHandles.remove(response.verificationId) ?: diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index 1398a3666a..ebc9e59b74 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -9,7 +9,6 @@ import net.corda.core.messaging.RPCOps import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserServiceImpl -import net.corda.node.services.api.MonitoringService import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.network.NetworkMapCacheImpl @@ -30,7 +29,6 @@ import org.junit.Before import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder -import org.mockito.Mockito.mock import java.net.ServerSocket import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit.MILLISECONDS @@ -213,7 +211,7 @@ class ArtemisMessagingTests { identity.public, ServiceAffinityExecutor("ArtemisMessagingTests", 1), database, - MonitoringService(MetricRegistry())).apply { + MetricRegistry()).apply { config.configureWithDevSSLCertificate() messagingClient = this } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt index 406e72dda0..e1195084a5 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt @@ -1,8 +1,6 @@ package net.corda.testing.node -import com.google.common.util.concurrent.Futures -import com.google.common.util.concurrent.ListenableFuture -import com.google.common.util.concurrent.SettableFuture +import net.corda.core.concurrent.CordaFuture import net.corda.core.crypto.CompositeKey import net.corda.core.identity.CordaX500Name import net.corda.core.internal.ThreadBox @@ -12,6 +10,8 @@ import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate +import net.corda.core.internal.concurrent.doneFuture +import net.corda.core.internal.concurrent.openFuture import net.corda.core.node.services.PartyInfo import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken @@ -33,6 +33,19 @@ import javax.annotation.concurrent.ThreadSafe import kotlin.concurrent.schedule import kotlin.concurrent.thread +/** + * This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods + * on the messaging service interface until you have successfully started up the system. One of these objects should + * be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations + * may let you cast the returned future to an object that lets you get status info. + * + * A specific implementation of the controller class will have extra features that let you customise it before starting + * it up. + */ +interface MessagingServiceBuilder { + fun start(): CordaFuture +} + /** * An in-memory network allows you to manufacture [InMemoryMessaging]s for a set of participants. Each * [InMemoryMessaging] maintains a queue of messages it has received, and a background thread that dispatches @@ -190,7 +203,7 @@ class InMemoryMessagingNetwork( val serviceHandles: List, val executor: AffinityExecutor, val database: CordaPersistence) : MessagingServiceBuilder { - override fun start(): ListenableFuture { + override fun start(): CordaFuture { synchronized(this@InMemoryMessagingNetwork) { val node = InMemoryMessaging(manuallyPumped, id, executor, database) handleEndpointMap[id] = node @@ -198,7 +211,7 @@ class InMemoryMessagingNetwork( serviceToPeersMapping.getOrPut(it) { LinkedHashSet() }.add(id) Unit } - return Futures.immediateFuture(node) + return doneFuture(node) } } } @@ -244,7 +257,7 @@ class InMemoryMessagingNetwork( log.trace { transfer.toString() } val calc = latencyCalculator if (calc != null && transfer.recipients is SingleMessageRecipient) { - val messageSent = SettableFuture.create() + val messageSent = openFuture() // Inject some artificial latency. timer.schedule(calc.between(transfer.sender, transfer.recipients).toMillis()) { pumpSendInternal(transfer) diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt index 6109baffc8..705f150f89 100644 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt +++ b/testing/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt @@ -3,7 +3,6 @@ package net.corda.testing.messaging import net.corda.core.identity.CordaX500Name import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.utilities.NetworkHostAndPort -import net.corda.nodeapi.ArtemisMessagingComponent import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.config.SSLConfiguration @@ -14,7 +13,7 @@ import org.apache.activemq.artemis.api.core.client.* * As the name suggests this is a simple client for connecting to MQ brokers. */ class SimpleMQClient(val target: NetworkHostAndPort, - override val config: SSLConfiguration? = configureTestSSL(DEFAULT_MQ_LEGAL_NAME)) : ArtemisMessagingComponent() { + private val config: SSLConfiguration? = configureTestSSL(DEFAULT_MQ_LEGAL_NAME)) { companion object { val DEFAULT_MQ_LEGAL_NAME = CordaX500Name(organisation = "SimpleMQClient", locality = "London", country = "GB") } diff --git a/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt b/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt index ecacae7cdb..1a53162b8b 100644 --- a/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt +++ b/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt @@ -16,7 +16,7 @@ import net.corda.core.transactions.LedgerTransaction import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.loggerFor import net.corda.node.services.config.configureDevKeyAndTrustStores -import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.VerifierApi