From 55e4688cc5598b5dfd2501787203fa117d8d4b15 Mon Sep 17 00:00:00 2001 From: Andrzej Cichocki Date: Mon, 20 Nov 2017 10:33:13 +0000 Subject: [PATCH] CORDA-787 Split NodeMessagingClient into 3 (#2063) --- .../kotlin/net/corda/node/internal/Node.kt | 40 +++-- .../messaging/ArtemisMessagingClient.kt | 58 +++++++ ...ssagingClient.kt => P2PMessagingClient.kt} | 149 +++--------------- .../services/messaging/RPCMessagingClient.kt | 29 ++++ .../messaging/VerifierMessagingClient.kt | 73 +++++++++ .../node/services/schema/NodeSchemaService.kt | 6 +- .../OutOfProcessTransactionVerifierService.kt | 7 +- .../messaging/ArtemisMessagingTests.kt | 23 +-- 8 files changed, 227 insertions(+), 158 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingClient.kt rename node/src/main/kotlin/net/corda/node/services/messaging/{NodeMessagingClient.kt => P2PMessagingClient.kt} (76%) create mode 100644 node/src/main/kotlin/net/corda/node/services/messaging/RPCMessagingClient.kt create mode 100644 node/src/main/kotlin/net/corda/node/services/messaging/VerifierMessagingClient.kt diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 534a63c344..cd0b5bb712 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -8,6 +8,7 @@ import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.RPCOps import net.corda.core.node.NodeInfo import net.corda.core.node.ServiceHub +import net.corda.core.node.services.TransactionVerifierService import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.loggerFor import net.corda.core.serialization.internal.SerializationEnvironmentImpl @@ -19,9 +20,9 @@ import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserServiceImpl import net.corda.node.services.api.SchemaService import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.messaging.ArtemisMessagingServer -import net.corda.node.services.messaging.MessagingService -import net.corda.node.services.messaging.NodeMessagingClient +import net.corda.node.services.config.VerifierType +import net.corda.node.services.messaging.* +import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.node.utilities.AddressUtils import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.CordaPersistence @@ -80,7 +81,10 @@ open class Node(configuration: NodeConfiguration, } override val log: Logger get() = logger - override fun makeTransactionVerifierService() = (network as NodeMessagingClient).verifierService + override fun makeTransactionVerifierService(): TransactionVerifierService = when (configuration.verifierType) { + VerifierType.OutOfProcess -> verifierMessagingClient!!.verifierService + VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4) + } private val sameVmNodeNumber = sameVmNodeCounter.incrementAndGet() // Under normal (non-test execution) it will always be "1" @@ -135,15 +139,18 @@ open class Node(configuration: NodeConfiguration, val advertisedAddress = info.addresses.single() printBasicNodeInfo("Incoming connection address", advertisedAddress.toString()) - - return NodeMessagingClient( + rpcMessagingClient = RPCMessagingClient(configuration, serverAddress) + verifierMessagingClient = when (configuration.verifierType) { + VerifierType.OutOfProcess -> VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics) + VerifierType.InMemory -> null + } + return P2PMessagingClient( configuration, versionInfo, serverAddress, info.legalIdentities[0].owningKey, serverThread, database, - services.monitoringService.metrics, advertisedAddress) } @@ -198,9 +205,16 @@ open class Node(configuration: NodeConfiguration, runOnStop += this::stop start() } - - // Start up the MQ client. - (network as NodeMessagingClient).start(rpcOps, userService) + // Start up the MQ clients. + rpcMessagingClient.run { + start(rpcOps, userService) + runOnStop += this::stop + } + verifierMessagingClient?.run { + start() + runOnStop += this::stop + } + (network as P2PMessagingClient).start() } /** @@ -290,9 +304,13 @@ open class Node(configuration: NodeConfiguration, checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader)) } + private lateinit var rpcMessagingClient: RPCMessagingClient + private var verifierMessagingClient: VerifierMessagingClient? = null /** Starts a blocking event loop for message dispatch. */ fun run() { - (network as NodeMessagingClient).run(messageBroker!!.serverControl) + rpcMessagingClient.start2(messageBroker!!.serverControl) + verifierMessagingClient?.start2() + (network as P2PMessagingClient).run() } private var shutdown = false diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingClient.kt new file mode 100644 index 0000000000..f377087b90 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingClient.kt @@ -0,0 +1,58 @@ +package net.corda.node.services.messaging + +import net.corda.core.serialization.internal.nodeSerializationEnv +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.loggerFor +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.nodeapi.ArtemisTcpTransport +import net.corda.nodeapi.ConnectionDirection +import net.corda.nodeapi.config.SSLConfiguration +import org.apache.activemq.artemis.api.core.client.* +import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE + +class ArtemisMessagingClient(private val config: SSLConfiguration, private val serverAddress: NetworkHostAndPort) { + companion object { + private val log = loggerFor() + } + + class Started(val sessionFactory: ClientSessionFactory, val session: ClientSession, val producer: ClientProducer) + + var started: Started? = null + private set + + fun start(): Started = synchronized(this) { + check(started == null) { "start can't be called twice" } + log.info("Connecting to message broker: $serverAddress") + // TODO Add broker CN to config for host verification in case the embedded broker isn't used + val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config) + val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { + // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this + // would be the default and the two lines below can be deleted. + connectionTTL = -1 + clientFailureCheckPeriod = -1 + minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE + isUseGlobalPools = nodeSerializationEnv != null + } + val sessionFactory = locator.createSessionFactory() + // Login using the node username. The broker will authentiate us as its node (as opposed to another peer) + // using our TLS certificate. + // Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer + // size of 1MB is acknowledged. + val session = sessionFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE) + session.start() + // Create a general purpose producer. + val producer = session.createProducer() + return Started(sessionFactory, session, producer).also { started = it } + } + + fun stop() = synchronized(this) { + started!!.run { + producer.close() + // Ensure any trailing messages are committed to the journal + session.commit() + // Closing the factory closes all the sessions it produced as well. + sessionFactory.close() + } + started = null + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt similarity index 76% rename from node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt rename to node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 0720bd443a..2b21ae766a 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -1,50 +1,32 @@ package net.corda.node.services.messaging -import com.codahale.metrics.MetricRegistry -import net.corda.core.crypto.random63BitValue import net.corda.core.identity.CordaX500Name import net.corda.core.internal.ThreadBox import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.MessageRecipients -import net.corda.core.messaging.RPCOps import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.services.PartyInfo -import net.corda.core.node.services.TransactionVerifierService import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize -import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.serialization.serialize -import net.corda.core.transactions.LedgerTransaction import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.loggerFor import net.corda.core.utilities.sequence import net.corda.core.utilities.trace import net.corda.node.VersionInfo -import net.corda.node.services.RPCUserService import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.config.VerifierType import net.corda.node.services.statemachine.StateMachineManagerImpl -import net.corda.node.services.transactions.InMemoryTransactionVerifierService -import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService import net.corda.node.utilities.* -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress -import net.corda.nodeapi.ArtemisTcpTransport -import net.corda.nodeapi.ConnectionDirection -import net.corda.nodeapi.VerifierApi -import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME -import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.Message.* import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.* -import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE -import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl import java.security.PublicKey import java.time.Instant import java.util.* @@ -77,18 +59,16 @@ import javax.persistence.Lob * If not provided, will default to [serverAddress]. */ @ThreadSafe -class NodeMessagingClient(private val config: NodeConfiguration, - private val versionInfo: VersionInfo, - private val serverAddress: NetworkHostAndPort, - private val myIdentity: PublicKey, - private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor, - private val database: CordaPersistence, - private val metrics: MetricRegistry, - advertisedAddress: NetworkHostAndPort = serverAddress +class P2PMessagingClient(config: NodeConfiguration, + private val versionInfo: VersionInfo, + serverAddress: NetworkHostAndPort, + private val myIdentity: PublicKey, + private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor, + private val database: CordaPersistence, + advertisedAddress: NetworkHostAndPort = serverAddress ) : SingletonSerializeAsToken(), MessagingService { companion object { - private val log = loggerFor() - + private val log = loggerFor() // This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic". // We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint // that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid @@ -99,8 +79,6 @@ class NodeMessagingClient(private val config: NodeConfiguration, private val releaseVersionProperty = SimpleString("release-version") private val platformVersionProperty = SimpleString("platform-version") private val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt() - private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}" - private val messageMaxRetryCount: Int = 3 fun createProcessedMessage(): AppendOnlyPersistentMap { @@ -144,15 +122,8 @@ class NodeMessagingClient(private val config: NodeConfiguration, } private class InnerState { - var started = false var running = false - var producer: ClientProducer? = null var p2pConsumer: ClientConsumer? = null - var session: ClientSession? = null - var sessionFactory: ClientSessionFactory? = null - var rpcServer: RPCServer? = null - // Consumer for inbound client RPC messages. - var verificationResponseConsumer: ClientConsumer? = null } private val messagesToRedeliver = database.transaction { @@ -161,11 +132,6 @@ class NodeMessagingClient(private val config: NodeConfiguration, private val scheduledMessageRedeliveries = ConcurrentHashMap>() - val verifierService = when (config.verifierType) { - VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4) - VerifierType.OutOfProcess -> createOutOfProcessVerifierService() - } - /** A registration to handle messages of different types */ data class Handler(val topicSession: TopicSession, val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration @@ -176,7 +142,8 @@ class NodeMessagingClient(private val config: NodeConfiguration, private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1) override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress) - + private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong() + private val artemis = ArtemisMessagingClient(config, serverAddress) private val state = ThreadBox(InnerState()) private val handlers = CopyOnWriteArrayList() @@ -209,54 +176,11 @@ class NodeMessagingClient(private val config: NodeConfiguration, var recipients: ByteArray = ByteArray(0) ) - fun start(rpcOps: RPCOps, userService: RPCUserService) { + fun start() { state.locked { - check(!started) { "start can't be called twice" } - started = true - - log.info("Connecting to message broker: $serverAddress") - // TODO Add broker CN to config for host verification in case the embedded broker isn't used - val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config) - val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { - // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this - // would be the default and the two lines below can be deleted. - connectionTTL = -1 - clientFailureCheckPeriod = -1 - minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE - isUseGlobalPools = nodeSerializationEnv != null - } - sessionFactory = locator.createSessionFactory() - - // Login using the node username. The broker will authentiate us as its node (as opposed to another peer) - // using our TLS certificate. - // Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer - // size of 1MB is acknowledged. - val session = sessionFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE) - this.session = session - session.start() - - // Create a general purpose producer. - val producer = session.createProducer() - this.producer = producer - + val session = artemis.start().session // Create a queue, consumer and producer for handling P2P network messages. p2pConsumer = session.createConsumer(P2P_QUEUE) - - val myCert = loadKeyStore(config.sslKeystore, config.keyStorePassword).getX509Certificate(X509Utilities.CORDA_CLIENT_TLS) - rpcServer = RPCServer(rpcOps, NODE_USER, NODE_USER, locator, userService, CordaX500Name.build(myCert.subjectX500Principal)) - - fun checkVerifierCount() { - if (session.queueQuery(SimpleString(VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount == 0) { - log.warn("No connected verifier listening on $VERIFICATION_REQUESTS_QUEUE_NAME!") - } - } - - if (config.verifierType == VerifierType.OutOfProcess) { - createQueueIfAbsent(VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME) - createQueueIfAbsent(verifierResponseAddress) - verificationResponseConsumer = session.createConsumer(verifierResponseAddress) - messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS) - } } resumeMessageRedelivery() @@ -309,14 +233,12 @@ class NodeMessagingClient(private val config: NodeConfiguration, /** * Starts the p2p event loop: this method only returns once [stop] has been called. */ - fun run(serverControl: ActiveMQServerControl) { + fun run() { try { val consumer = state.locked { - check(started) { "start must be called first" } + check(artemis.started != null) { "start must be called first" } check(!running) { "run can't be called twice" } running = true - rpcServer!!.start(serverControl) - (verifierService as? OutOfProcessTransactionVerifierService)?.start(verificationResponseConsumer!!) // If it's null, it means we already called stop, so return immediately. p2pConsumer ?: return } @@ -404,7 +326,7 @@ class NodeMessagingClient(private val config: NodeConfiguration, override fun stop() { val running = state.locked { // We allow stop() to be called without a run() in between, but it must have at least been started. - check(started) + check(artemis.started != null) val prevRunning = running running = false val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice") @@ -423,13 +345,7 @@ class NodeMessagingClient(private val config: NodeConfiguration, // Only first caller to gets running true to protect against double stop, which seems to happen in some integration tests. if (running) { state.locked { - producer?.close() - producer = null - // Ensure any trailing messages are committed to the journal - session!!.commit() - // Closing the factory closes all the sessions it produced as well. - sessionFactory!!.close() - sessionFactory = null + artemis.stop() } } } @@ -440,7 +356,8 @@ class NodeMessagingClient(private val config: NodeConfiguration, messagingExecutor.fetchFrom { state.locked { val mqAddress = getMQAddress(target) - val artemisMessage = session!!.createMessage(true).apply { + val artemis = artemis.started!! + val artemisMessage = artemis.session.createMessage(true).apply { putStringProperty(cordaVendorProperty, cordaVendor) putStringProperty(releaseVersionProperty, releaseVersion) putIntProperty(platformVersionProperty, versionInfo.platformVersion) @@ -459,15 +376,14 @@ class NodeMessagingClient(private val config: NodeConfiguration, "Send to: $mqAddress topic: ${message.topicSession.topic} " + "sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}" } - producer!!.send(mqAddress, artemisMessage) - + artemis.producer.send(mqAddress, artemisMessage) retryId?.let { database.transaction { messagesToRedeliver.computeIfAbsent(it, { Pair(message, target) }) } scheduledMessageRedeliveries[it] = messagingExecutor.schedule({ sendWithRetry(0, mqAddress, artemisMessage, it) - }, config.messageRedeliveryDelaySeconds.toLong(), TimeUnit.SECONDS) + }, messageRedeliveryDelaySeconds, TimeUnit.SECONDS) } } @@ -498,12 +414,12 @@ class NodeMessagingClient(private val config: NodeConfiguration, state.locked { log.trace { "Retry #$retryCount sending message $message to $address for $retryId" } - producer!!.send(address, message) + artemis.started!!.producer.send(address, message) } scheduledMessageRedeliveries[retryId] = messagingExecutor.schedule({ sendWithRetry(retryCount + 1, address, message, retryId) - }, config.messageRedeliveryDelaySeconds.toLong(), TimeUnit.SECONDS) + }, messageRedeliveryDelaySeconds, TimeUnit.SECONDS) } override fun cancelRedelivery(retryId: Long) { @@ -533,10 +449,11 @@ class NodeMessagingClient(private val config: NodeConfiguration, /** Attempts to create a durable queue on the broker which is bound to an address of the same name. */ private fun createQueueIfAbsent(queueName: String) { state.alreadyLocked { - val queueQuery = session!!.queueQuery(SimpleString(queueName)) + val session = artemis.started!!.session + val queueQuery = session.queueQuery(SimpleString(queueName)) if (!queueQuery.isExists) { log.info("Create fresh queue $queueName bound on same address") - session!!.createQueue(queueName, RoutingType.MULTICAST, queueName, true) + session.createQueue(queueName, RoutingType.MULTICAST, queueName, true) } } } @@ -564,22 +481,6 @@ class NodeMessagingClient(private val config: NodeConfiguration, return NodeClientMessage(topicSession, data, uuid) } - private fun createOutOfProcessVerifierService(): TransactionVerifierService { - return object : OutOfProcessTransactionVerifierService(metrics) { - override fun sendRequest(nonce: Long, transaction: LedgerTransaction) { - messagingExecutor.fetchFrom { - state.locked { - val message = session!!.createMessage(false) - val request = VerifierApi.VerificationRequest(nonce, transaction, SimpleString(verifierResponseAddress)) - request.writeToClientMessage(message) - producer!!.send(VERIFICATION_REQUESTS_QUEUE_NAME, message) - } - } - } - - } - } - // TODO Rethink PartyInfo idea and merging PeerAddress/ServiceAddress (the only difference is that Service address doesn't hold host and port) override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients { return when (partyInfo) { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCMessagingClient.kt new file mode 100644 index 0000000000..4aeb8339fa --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCMessagingClient.kt @@ -0,0 +1,29 @@ +package net.corda.node.services.messaging + +import net.corda.core.identity.CordaX500Name +import net.corda.core.messaging.RPCOps +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.node.services.RPCUserService +import net.corda.node.utilities.* +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.nodeapi.config.SSLConfiguration +import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl + +class RPCMessagingClient(private val config: SSLConfiguration, serverAddress: NetworkHostAndPort) : SingletonSerializeAsToken() { + private val artemis = ArtemisMessagingClient(config, serverAddress) + private var rpcServer: RPCServer? = null + fun start(rpcOps: RPCOps, userService: RPCUserService) = synchronized(this) { + val locator = artemis.start().sessionFactory.serverLocator + val myCert = loadKeyStore(config.sslKeystore, config.keyStorePassword).getX509Certificate(X509Utilities.CORDA_CLIENT_TLS) + rpcServer = RPCServer(rpcOps, NODE_USER, NODE_USER, locator, userService, CordaX500Name.build(myCert.subjectX500Principal)) + } + + fun start2(serverControl: ActiveMQServerControl) = synchronized(this) { + rpcServer!!.start(serverControl) + } + + fun stop() = synchronized(this) { + artemis.stop() + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/VerifierMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/VerifierMessagingClient.kt new file mode 100644 index 0000000000..d916e63cc5 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/messaging/VerifierMessagingClient.kt @@ -0,0 +1,73 @@ +package net.corda.node.services.messaging + +import com.codahale.metrics.MetricRegistry +import net.corda.core.crypto.random63BitValue +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.transactions.LedgerTransaction +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.loggerFor +import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService +import net.corda.node.utilities.* +import net.corda.nodeapi.VerifierApi +import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME +import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX +import net.corda.nodeapi.config.SSLConfiguration +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.api.core.client.* +import java.util.concurrent.* + +class VerifierMessagingClient(config: SSLConfiguration, serverAddress: NetworkHostAndPort, metrics: MetricRegistry) : SingletonSerializeAsToken() { + companion object { + private val log = loggerFor() + private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}" + } + + private val artemis = ArtemisMessagingClient(config, serverAddress) + /** An executor for sending messages */ + private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1) + private var verificationResponseConsumer: ClientConsumer? = null + fun start(): Unit = synchronized(this) { + val session = artemis.start().session + fun checkVerifierCount() { + if (session.queueQuery(SimpleString(VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount == 0) { + log.warn("No connected verifier listening on $VERIFICATION_REQUESTS_QUEUE_NAME!") + } + } + + // Attempts to create a durable queue on the broker which is bound to an address of the same name. + fun createQueueIfAbsent(queueName: String) { + val queueQuery = session.queueQuery(SimpleString(queueName)) + if (!queueQuery.isExists) { + log.info("Create fresh queue $queueName bound on same address") + session.createQueue(queueName, RoutingType.MULTICAST, queueName, true) + } + } + createQueueIfAbsent(VERIFICATION_REQUESTS_QUEUE_NAME) + createQueueIfAbsent(verifierResponseAddress) + verificationResponseConsumer = session.createConsumer(verifierResponseAddress) + messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS) + } + + fun start2() = synchronized(this) { + verifierService.start(verificationResponseConsumer!!) + } + + fun stop() = synchronized(this) { + artemis.stop() + } + + internal val verifierService = OutOfProcessTransactionVerifierService(metrics) { nonce, transaction -> + messagingExecutor.fetchFrom { + sendRequest(nonce, transaction) + } + } + + private fun sendRequest(nonce: Long, transaction: LedgerTransaction) = synchronized(this) { + val started = artemis.started!! + val message = started.session.createMessage(false) + val request = VerifierApi.VerificationRequest(nonce, transaction, SimpleString(verifierResponseAddress)) + request.writeToClientMessage(message) + started.producer.send(VERIFICATION_REQUESTS_QUEUE_NAME, message) + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index 00cb1b5f79..99282681b8 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -14,7 +14,7 @@ import net.corda.node.services.api.SchemaService import net.corda.node.services.events.NodeSchedulerService import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.PersistentKeyManagementService -import net.corda.node.services.messaging.NodeMessagingClient +import net.corda.node.services.messaging.P2PMessagingClient import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.DBTransactionMappingStorage import net.corda.node.services.persistence.DBTransactionStorage @@ -47,8 +47,8 @@ class NodeSchemaService(cordappLoader: CordappLoader?) : SchemaService, Singleto PersistentUniquenessProvider.PersistentNotaryCommit::class.java, NodeSchedulerService.PersistentScheduledState::class.java, NodeAttachmentService.DBAttachment::class.java, - NodeMessagingClient.ProcessedMessage::class.java, - NodeMessagingClient.RetryMessage::class.java, + P2PMessagingClient.ProcessedMessage::class.java, + P2PMessagingClient.RetryMessage::class.java, NodeAttachmentService.DBAttachment::class.java, RaftUniquenessProvider.RaftState::class.java, BFTNonValidatingNotaryService.PersistedCommittedState::class.java, diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt index 4819672f9f..3fefbf9822 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt @@ -16,8 +16,9 @@ import net.corda.nodeapi.VerifierApi import org.apache.activemq.artemis.api.core.client.ClientConsumer import java.util.concurrent.ConcurrentHashMap -abstract class OutOfProcessTransactionVerifierService( - private val metrics: MetricRegistry +class OutOfProcessTransactionVerifierService( + private val metrics: MetricRegistry, + private val sendRequest: (Long, LedgerTransaction) -> Unit ) : SingletonSerializeAsToken(), TransactionVerifierService { companion object { val log = loggerFor() @@ -60,8 +61,6 @@ abstract class OutOfProcessTransactionVerifierService( } } - abstract fun sendRequest(nonce: Long, transaction: LedgerTransaction) - override fun verify(transaction: LedgerTransaction): CordaFuture<*> { log.info("Verifying ${transaction.id}") val future = openFuture() diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index ebc9e59b74..225ea6952f 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -1,11 +1,9 @@ package net.corda.node.services.messaging -import com.codahale.metrics.MetricRegistry import net.corda.core.concurrent.CordaFuture import net.corda.core.crypto.generateKeyPair import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.openFuture -import net.corda.core.messaging.RPCOps import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserServiceImpl @@ -58,16 +56,10 @@ class ArtemisMessagingTests { private lateinit var database: CordaPersistence private lateinit var userService: RPCUserService private lateinit var networkMapRegistrationFuture: CordaFuture - - private var messagingClient: NodeMessagingClient? = null + private var messagingClient: P2PMessagingClient? = null private var messagingServer: ArtemisMessagingServer? = null private lateinit var networkMapCache: NetworkMapCacheImpl - - private val rpcOps = object : RPCOps { - override val protocolVersion: Int get() = throw UnsupportedOperationException() - } - @Before fun setUp() { val baseDirectory = temporaryFolder.root.toPath() @@ -186,10 +178,10 @@ class ArtemisMessagingTests { } private fun startNodeMessagingClient() { - messagingClient!!.start(rpcOps, userService) + messagingClient!!.start() } - private fun createAndStartClientAndServer(receivedMessages: LinkedBlockingQueue): NodeMessagingClient { + private fun createAndStartClientAndServer(receivedMessages: LinkedBlockingQueue): P2PMessagingClient { createMessagingServer().start() val messagingClient = createMessagingClient() @@ -198,20 +190,19 @@ class ArtemisMessagingTests { receivedMessages.add(message) } // Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered. - thread { messagingClient.run(messagingServer!!.serverControl) } + thread { messagingClient.run() } return messagingClient } - private fun createMessagingClient(server: NetworkHostAndPort = NetworkHostAndPort("localhost", serverPort)): NodeMessagingClient { + private fun createMessagingClient(server: NetworkHostAndPort = NetworkHostAndPort("localhost", serverPort)): P2PMessagingClient { return database.transaction { - NodeMessagingClient( + P2PMessagingClient( config, MOCK_VERSION_INFO, server, identity.public, ServiceAffinityExecutor("ArtemisMessagingTests", 1), - database, - MetricRegistry()).apply { + database).apply { config.configureWithDevSSLCertificate() messagingClient = this }