mirror of
https://github.com/corda/corda.git
synced 2025-02-20 09:26:41 +00:00
CORDA-787 Split NodeMessagingClient into 3 (#2063)
This commit is contained in:
parent
332915f08b
commit
55e4688cc5
@ -8,6 +8,7 @@ import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.services.TransactionVerifierService
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
|
||||
@ -19,9 +20,9 @@ import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.RPCUserServiceImpl
|
||||
import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.config.VerifierType
|
||||
import net.corda.node.services.messaging.*
|
||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
import net.corda.node.utilities.AddressUtils
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.CordaPersistence
|
||||
@ -80,7 +81,10 @@ open class Node(configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
override val log: Logger get() = logger
|
||||
override fun makeTransactionVerifierService() = (network as NodeMessagingClient).verifierService
|
||||
override fun makeTransactionVerifierService(): TransactionVerifierService = when (configuration.verifierType) {
|
||||
VerifierType.OutOfProcess -> verifierMessagingClient!!.verifierService
|
||||
VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4)
|
||||
}
|
||||
|
||||
private val sameVmNodeNumber = sameVmNodeCounter.incrementAndGet() // Under normal (non-test execution) it will always be "1"
|
||||
|
||||
@ -135,15 +139,18 @@ open class Node(configuration: NodeConfiguration,
|
||||
val advertisedAddress = info.addresses.single()
|
||||
|
||||
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())
|
||||
|
||||
return NodeMessagingClient(
|
||||
rpcMessagingClient = RPCMessagingClient(configuration, serverAddress)
|
||||
verifierMessagingClient = when (configuration.verifierType) {
|
||||
VerifierType.OutOfProcess -> VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics)
|
||||
VerifierType.InMemory -> null
|
||||
}
|
||||
return P2PMessagingClient(
|
||||
configuration,
|
||||
versionInfo,
|
||||
serverAddress,
|
||||
info.legalIdentities[0].owningKey,
|
||||
serverThread,
|
||||
database,
|
||||
services.monitoringService.metrics,
|
||||
advertisedAddress)
|
||||
}
|
||||
|
||||
@ -198,9 +205,16 @@ open class Node(configuration: NodeConfiguration,
|
||||
runOnStop += this::stop
|
||||
start()
|
||||
}
|
||||
|
||||
// Start up the MQ client.
|
||||
(network as NodeMessagingClient).start(rpcOps, userService)
|
||||
// Start up the MQ clients.
|
||||
rpcMessagingClient.run {
|
||||
start(rpcOps, userService)
|
||||
runOnStop += this::stop
|
||||
}
|
||||
verifierMessagingClient?.run {
|
||||
start()
|
||||
runOnStop += this::stop
|
||||
}
|
||||
(network as P2PMessagingClient).start()
|
||||
}
|
||||
|
||||
/**
|
||||
@ -290,9 +304,13 @@ open class Node(configuration: NodeConfiguration,
|
||||
checkpointContext = KRYO_CHECKPOINT_CONTEXT.withClassLoader(classloader))
|
||||
}
|
||||
|
||||
private lateinit var rpcMessagingClient: RPCMessagingClient
|
||||
private var verifierMessagingClient: VerifierMessagingClient? = null
|
||||
/** Starts a blocking event loop for message dispatch. */
|
||||
fun run() {
|
||||
(network as NodeMessagingClient).run(messageBroker!!.serverControl)
|
||||
rpcMessagingClient.start2(messageBroker!!.serverControl)
|
||||
verifierMessagingClient?.start2()
|
||||
(network as P2PMessagingClient).run()
|
||||
}
|
||||
|
||||
private var shutdown = false
|
||||
|
@ -0,0 +1,58 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
|
||||
class ArtemisMessagingClient(private val config: SSLConfiguration, private val serverAddress: NetworkHostAndPort) {
|
||||
companion object {
|
||||
private val log = loggerFor<ArtemisMessagingClient>()
|
||||
}
|
||||
|
||||
class Started(val sessionFactory: ClientSessionFactory, val session: ClientSession, val producer: ClientProducer)
|
||||
|
||||
var started: Started? = null
|
||||
private set
|
||||
|
||||
fun start(): Started = synchronized(this) {
|
||||
check(started == null) { "start can't be called twice" }
|
||||
log.info("Connecting to message broker: $serverAddress")
|
||||
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
|
||||
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config)
|
||||
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
||||
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
||||
// would be the default and the two lines below can be deleted.
|
||||
connectionTTL = -1
|
||||
clientFailureCheckPeriod = -1
|
||||
minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE
|
||||
isUseGlobalPools = nodeSerializationEnv != null
|
||||
}
|
||||
val sessionFactory = locator.createSessionFactory()
|
||||
// Login using the node username. The broker will authentiate us as its node (as opposed to another peer)
|
||||
// using our TLS certificate.
|
||||
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
|
||||
// size of 1MB is acknowledged.
|
||||
val session = sessionFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE)
|
||||
session.start()
|
||||
// Create a general purpose producer.
|
||||
val producer = session.createProducer()
|
||||
return Started(sessionFactory, session, producer).also { started = it }
|
||||
}
|
||||
|
||||
fun stop() = synchronized(this) {
|
||||
started!!.run {
|
||||
producer.close()
|
||||
// Ensure any trailing messages are committed to the journal
|
||||
session.commit()
|
||||
// Closing the factory closes all the sessions it produced as well.
|
||||
sessionFactory.close()
|
||||
}
|
||||
started = null
|
||||
}
|
||||
}
|
@ -1,50 +1,32 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
import net.corda.core.node.services.TransactionVerifierService
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.sequence
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.VerifierType
|
||||
import net.corda.node.services.statemachine.StateMachineManagerImpl
|
||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService
|
||||
import net.corda.node.utilities.*
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.VerifierApi
|
||||
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
|
||||
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
import org.apache.activemq.artemis.api.core.Message.*
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
@ -77,18 +59,16 @@ import javax.persistence.Lob
|
||||
* If not provided, will default to [serverAddress].
|
||||
*/
|
||||
@ThreadSafe
|
||||
class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
private val versionInfo: VersionInfo,
|
||||
private val serverAddress: NetworkHostAndPort,
|
||||
private val myIdentity: PublicKey,
|
||||
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
|
||||
private val database: CordaPersistence,
|
||||
private val metrics: MetricRegistry,
|
||||
advertisedAddress: NetworkHostAndPort = serverAddress
|
||||
class P2PMessagingClient(config: NodeConfiguration,
|
||||
private val versionInfo: VersionInfo,
|
||||
serverAddress: NetworkHostAndPort,
|
||||
private val myIdentity: PublicKey,
|
||||
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
|
||||
private val database: CordaPersistence,
|
||||
advertisedAddress: NetworkHostAndPort = serverAddress
|
||||
) : SingletonSerializeAsToken(), MessagingService {
|
||||
companion object {
|
||||
private val log = loggerFor<NodeMessagingClient>()
|
||||
|
||||
private val log = loggerFor<P2PMessagingClient>()
|
||||
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
|
||||
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
|
||||
// that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid
|
||||
@ -99,8 +79,6 @@ class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
private val releaseVersionProperty = SimpleString("release-version")
|
||||
private val platformVersionProperty = SimpleString("platform-version")
|
||||
private val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt()
|
||||
private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}"
|
||||
|
||||
private val messageMaxRetryCount: Int = 3
|
||||
|
||||
fun createProcessedMessage(): AppendOnlyPersistentMap<UUID, Instant, ProcessedMessage, String> {
|
||||
@ -144,15 +122,8 @@ class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
}
|
||||
|
||||
private class InnerState {
|
||||
var started = false
|
||||
var running = false
|
||||
var producer: ClientProducer? = null
|
||||
var p2pConsumer: ClientConsumer? = null
|
||||
var session: ClientSession? = null
|
||||
var sessionFactory: ClientSessionFactory? = null
|
||||
var rpcServer: RPCServer? = null
|
||||
// Consumer for inbound client RPC messages.
|
||||
var verificationResponseConsumer: ClientConsumer? = null
|
||||
}
|
||||
|
||||
private val messagesToRedeliver = database.transaction {
|
||||
@ -161,11 +132,6 @@ class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
|
||||
private val scheduledMessageRedeliveries = ConcurrentHashMap<Long, ScheduledFuture<*>>()
|
||||
|
||||
val verifierService = when (config.verifierType) {
|
||||
VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4)
|
||||
VerifierType.OutOfProcess -> createOutOfProcessVerifierService()
|
||||
}
|
||||
|
||||
/** A registration to handle messages of different types */
|
||||
data class Handler(val topicSession: TopicSession,
|
||||
val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||
@ -176,7 +142,8 @@ class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1)
|
||||
|
||||
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
|
||||
|
||||
private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong()
|
||||
private val artemis = ArtemisMessagingClient(config, serverAddress)
|
||||
private val state = ThreadBox(InnerState())
|
||||
private val handlers = CopyOnWriteArrayList<Handler>()
|
||||
|
||||
@ -209,54 +176,11 @@ class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
var recipients: ByteArray = ByteArray(0)
|
||||
)
|
||||
|
||||
fun start(rpcOps: RPCOps, userService: RPCUserService) {
|
||||
fun start() {
|
||||
state.locked {
|
||||
check(!started) { "start can't be called twice" }
|
||||
started = true
|
||||
|
||||
log.info("Connecting to message broker: $serverAddress")
|
||||
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
|
||||
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config)
|
||||
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
||||
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
||||
// would be the default and the two lines below can be deleted.
|
||||
connectionTTL = -1
|
||||
clientFailureCheckPeriod = -1
|
||||
minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE
|
||||
isUseGlobalPools = nodeSerializationEnv != null
|
||||
}
|
||||
sessionFactory = locator.createSessionFactory()
|
||||
|
||||
// Login using the node username. The broker will authentiate us as its node (as opposed to another peer)
|
||||
// using our TLS certificate.
|
||||
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
|
||||
// size of 1MB is acknowledged.
|
||||
val session = sessionFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE)
|
||||
this.session = session
|
||||
session.start()
|
||||
|
||||
// Create a general purpose producer.
|
||||
val producer = session.createProducer()
|
||||
this.producer = producer
|
||||
|
||||
val session = artemis.start().session
|
||||
// Create a queue, consumer and producer for handling P2P network messages.
|
||||
p2pConsumer = session.createConsumer(P2P_QUEUE)
|
||||
|
||||
val myCert = loadKeyStore(config.sslKeystore, config.keyStorePassword).getX509Certificate(X509Utilities.CORDA_CLIENT_TLS)
|
||||
rpcServer = RPCServer(rpcOps, NODE_USER, NODE_USER, locator, userService, CordaX500Name.build(myCert.subjectX500Principal))
|
||||
|
||||
fun checkVerifierCount() {
|
||||
if (session.queueQuery(SimpleString(VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount == 0) {
|
||||
log.warn("No connected verifier listening on $VERIFICATION_REQUESTS_QUEUE_NAME!")
|
||||
}
|
||||
}
|
||||
|
||||
if (config.verifierType == VerifierType.OutOfProcess) {
|
||||
createQueueIfAbsent(VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME)
|
||||
createQueueIfAbsent(verifierResponseAddress)
|
||||
verificationResponseConsumer = session.createConsumer(verifierResponseAddress)
|
||||
messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
resumeMessageRedelivery()
|
||||
@ -309,14 +233,12 @@ class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
/**
|
||||
* Starts the p2p event loop: this method only returns once [stop] has been called.
|
||||
*/
|
||||
fun run(serverControl: ActiveMQServerControl) {
|
||||
fun run() {
|
||||
try {
|
||||
val consumer = state.locked {
|
||||
check(started) { "start must be called first" }
|
||||
check(artemis.started != null) { "start must be called first" }
|
||||
check(!running) { "run can't be called twice" }
|
||||
running = true
|
||||
rpcServer!!.start(serverControl)
|
||||
(verifierService as? OutOfProcessTransactionVerifierService)?.start(verificationResponseConsumer!!)
|
||||
// If it's null, it means we already called stop, so return immediately.
|
||||
p2pConsumer ?: return
|
||||
}
|
||||
@ -404,7 +326,7 @@ class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
override fun stop() {
|
||||
val running = state.locked {
|
||||
// We allow stop() to be called without a run() in between, but it must have at least been started.
|
||||
check(started)
|
||||
check(artemis.started != null)
|
||||
val prevRunning = running
|
||||
running = false
|
||||
val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice")
|
||||
@ -423,13 +345,7 @@ class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
// Only first caller to gets running true to protect against double stop, which seems to happen in some integration tests.
|
||||
if (running) {
|
||||
state.locked {
|
||||
producer?.close()
|
||||
producer = null
|
||||
// Ensure any trailing messages are committed to the journal
|
||||
session!!.commit()
|
||||
// Closing the factory closes all the sessions it produced as well.
|
||||
sessionFactory!!.close()
|
||||
sessionFactory = null
|
||||
artemis.stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -440,7 +356,8 @@ class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
messagingExecutor.fetchFrom {
|
||||
state.locked {
|
||||
val mqAddress = getMQAddress(target)
|
||||
val artemisMessage = session!!.createMessage(true).apply {
|
||||
val artemis = artemis.started!!
|
||||
val artemisMessage = artemis.session.createMessage(true).apply {
|
||||
putStringProperty(cordaVendorProperty, cordaVendor)
|
||||
putStringProperty(releaseVersionProperty, releaseVersion)
|
||||
putIntProperty(platformVersionProperty, versionInfo.platformVersion)
|
||||
@ -459,15 +376,14 @@ class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
"Send to: $mqAddress topic: ${message.topicSession.topic} " +
|
||||
"sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}"
|
||||
}
|
||||
producer!!.send(mqAddress, artemisMessage)
|
||||
|
||||
artemis.producer.send(mqAddress, artemisMessage)
|
||||
retryId?.let {
|
||||
database.transaction {
|
||||
messagesToRedeliver.computeIfAbsent(it, { Pair(message, target) })
|
||||
}
|
||||
scheduledMessageRedeliveries[it] = messagingExecutor.schedule({
|
||||
sendWithRetry(0, mqAddress, artemisMessage, it)
|
||||
}, config.messageRedeliveryDelaySeconds.toLong(), TimeUnit.SECONDS)
|
||||
}, messageRedeliveryDelaySeconds, TimeUnit.SECONDS)
|
||||
|
||||
}
|
||||
}
|
||||
@ -498,12 +414,12 @@ class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
|
||||
state.locked {
|
||||
log.trace { "Retry #$retryCount sending message $message to $address for $retryId" }
|
||||
producer!!.send(address, message)
|
||||
artemis.started!!.producer.send(address, message)
|
||||
}
|
||||
|
||||
scheduledMessageRedeliveries[retryId] = messagingExecutor.schedule({
|
||||
sendWithRetry(retryCount + 1, address, message, retryId)
|
||||
}, config.messageRedeliveryDelaySeconds.toLong(), TimeUnit.SECONDS)
|
||||
}, messageRedeliveryDelaySeconds, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
override fun cancelRedelivery(retryId: Long) {
|
||||
@ -533,10 +449,11 @@ class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
/** Attempts to create a durable queue on the broker which is bound to an address of the same name. */
|
||||
private fun createQueueIfAbsent(queueName: String) {
|
||||
state.alreadyLocked {
|
||||
val queueQuery = session!!.queueQuery(SimpleString(queueName))
|
||||
val session = artemis.started!!.session
|
||||
val queueQuery = session.queueQuery(SimpleString(queueName))
|
||||
if (!queueQuery.isExists) {
|
||||
log.info("Create fresh queue $queueName bound on same address")
|
||||
session!!.createQueue(queueName, RoutingType.MULTICAST, queueName, true)
|
||||
session.createQueue(queueName, RoutingType.MULTICAST, queueName, true)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -564,22 +481,6 @@ class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
return NodeClientMessage(topicSession, data, uuid)
|
||||
}
|
||||
|
||||
private fun createOutOfProcessVerifierService(): TransactionVerifierService {
|
||||
return object : OutOfProcessTransactionVerifierService(metrics) {
|
||||
override fun sendRequest(nonce: Long, transaction: LedgerTransaction) {
|
||||
messagingExecutor.fetchFrom {
|
||||
state.locked {
|
||||
val message = session!!.createMessage(false)
|
||||
val request = VerifierApi.VerificationRequest(nonce, transaction, SimpleString(verifierResponseAddress))
|
||||
request.writeToClientMessage(message)
|
||||
producer!!.send(VERIFICATION_REQUESTS_QUEUE_NAME, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// TODO Rethink PartyInfo idea and merging PeerAddress/ServiceAddress (the only difference is that Service address doesn't hold host and port)
|
||||
override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {
|
||||
return when (partyInfo) {
|
@ -0,0 +1,29 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.utilities.*
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
|
||||
class RPCMessagingClient(private val config: SSLConfiguration, serverAddress: NetworkHostAndPort) : SingletonSerializeAsToken() {
|
||||
private val artemis = ArtemisMessagingClient(config, serverAddress)
|
||||
private var rpcServer: RPCServer? = null
|
||||
fun start(rpcOps: RPCOps, userService: RPCUserService) = synchronized(this) {
|
||||
val locator = artemis.start().sessionFactory.serverLocator
|
||||
val myCert = loadKeyStore(config.sslKeystore, config.keyStorePassword).getX509Certificate(X509Utilities.CORDA_CLIENT_TLS)
|
||||
rpcServer = RPCServer(rpcOps, NODE_USER, NODE_USER, locator, userService, CordaX500Name.build(myCert.subjectX500Principal))
|
||||
}
|
||||
|
||||
fun start2(serverControl: ActiveMQServerControl) = synchronized(this) {
|
||||
rpcServer!!.start(serverControl)
|
||||
}
|
||||
|
||||
fun stop() = synchronized(this) {
|
||||
artemis.stop()
|
||||
}
|
||||
}
|
@ -0,0 +1,73 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService
|
||||
import net.corda.node.utilities.*
|
||||
import net.corda.nodeapi.VerifierApi
|
||||
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
|
||||
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import java.util.concurrent.*
|
||||
|
||||
class VerifierMessagingClient(config: SSLConfiguration, serverAddress: NetworkHostAndPort, metrics: MetricRegistry) : SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = loggerFor<VerifierMessagingClient>()
|
||||
private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}"
|
||||
}
|
||||
|
||||
private val artemis = ArtemisMessagingClient(config, serverAddress)
|
||||
/** An executor for sending messages */
|
||||
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1)
|
||||
private var verificationResponseConsumer: ClientConsumer? = null
|
||||
fun start(): Unit = synchronized(this) {
|
||||
val session = artemis.start().session
|
||||
fun checkVerifierCount() {
|
||||
if (session.queueQuery(SimpleString(VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount == 0) {
|
||||
log.warn("No connected verifier listening on $VERIFICATION_REQUESTS_QUEUE_NAME!")
|
||||
}
|
||||
}
|
||||
|
||||
// Attempts to create a durable queue on the broker which is bound to an address of the same name.
|
||||
fun createQueueIfAbsent(queueName: String) {
|
||||
val queueQuery = session.queueQuery(SimpleString(queueName))
|
||||
if (!queueQuery.isExists) {
|
||||
log.info("Create fresh queue $queueName bound on same address")
|
||||
session.createQueue(queueName, RoutingType.MULTICAST, queueName, true)
|
||||
}
|
||||
}
|
||||
createQueueIfAbsent(VERIFICATION_REQUESTS_QUEUE_NAME)
|
||||
createQueueIfAbsent(verifierResponseAddress)
|
||||
verificationResponseConsumer = session.createConsumer(verifierResponseAddress)
|
||||
messagingExecutor.scheduleAtFixedRate(::checkVerifierCount, 0, 10, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
fun start2() = synchronized(this) {
|
||||
verifierService.start(verificationResponseConsumer!!)
|
||||
}
|
||||
|
||||
fun stop() = synchronized(this) {
|
||||
artemis.stop()
|
||||
}
|
||||
|
||||
internal val verifierService = OutOfProcessTransactionVerifierService(metrics) { nonce, transaction ->
|
||||
messagingExecutor.fetchFrom {
|
||||
sendRequest(nonce, transaction)
|
||||
}
|
||||
}
|
||||
|
||||
private fun sendRequest(nonce: Long, transaction: LedgerTransaction) = synchronized(this) {
|
||||
val started = artemis.started!!
|
||||
val message = started.session.createMessage(false)
|
||||
val request = VerifierApi.VerificationRequest(nonce, transaction, SimpleString(verifierResponseAddress))
|
||||
request.writeToClientMessage(message)
|
||||
started.producer.send(VERIFICATION_REQUESTS_QUEUE_NAME, message)
|
||||
}
|
||||
}
|
@ -14,7 +14,7 @@ import net.corda.node.services.api.SchemaService
|
||||
import net.corda.node.services.events.NodeSchedulerService
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.services.keys.PersistentKeyManagementService
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.messaging.P2PMessagingClient
|
||||
import net.corda.node.services.persistence.DBCheckpointStorage
|
||||
import net.corda.node.services.persistence.DBTransactionMappingStorage
|
||||
import net.corda.node.services.persistence.DBTransactionStorage
|
||||
@ -47,8 +47,8 @@ class NodeSchemaService(cordappLoader: CordappLoader?) : SchemaService, Singleto
|
||||
PersistentUniquenessProvider.PersistentNotaryCommit::class.java,
|
||||
NodeSchedulerService.PersistentScheduledState::class.java,
|
||||
NodeAttachmentService.DBAttachment::class.java,
|
||||
NodeMessagingClient.ProcessedMessage::class.java,
|
||||
NodeMessagingClient.RetryMessage::class.java,
|
||||
P2PMessagingClient.ProcessedMessage::class.java,
|
||||
P2PMessagingClient.RetryMessage::class.java,
|
||||
NodeAttachmentService.DBAttachment::class.java,
|
||||
RaftUniquenessProvider.RaftState::class.java,
|
||||
BFTNonValidatingNotaryService.PersistedCommittedState::class.java,
|
||||
|
@ -16,8 +16,9 @@ import net.corda.nodeapi.VerifierApi
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
abstract class OutOfProcessTransactionVerifierService(
|
||||
private val metrics: MetricRegistry
|
||||
class OutOfProcessTransactionVerifierService(
|
||||
private val metrics: MetricRegistry,
|
||||
private val sendRequest: (Long, LedgerTransaction) -> Unit
|
||||
) : SingletonSerializeAsToken(), TransactionVerifierService {
|
||||
companion object {
|
||||
val log = loggerFor<OutOfProcessTransactionVerifierService>()
|
||||
@ -60,8 +61,6 @@ abstract class OutOfProcessTransactionVerifierService(
|
||||
}
|
||||
}
|
||||
|
||||
abstract fun sendRequest(nonce: Long, transaction: LedgerTransaction)
|
||||
|
||||
override fun verify(transaction: LedgerTransaction): CordaFuture<*> {
|
||||
log.info("Verifying ${transaction.id}")
|
||||
val future = openFuture<Unit>()
|
||||
|
@ -1,11 +1,9 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.internal.concurrent.doneFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.RPCUserServiceImpl
|
||||
@ -58,16 +56,10 @@ class ArtemisMessagingTests {
|
||||
private lateinit var database: CordaPersistence
|
||||
private lateinit var userService: RPCUserService
|
||||
private lateinit var networkMapRegistrationFuture: CordaFuture<Unit>
|
||||
|
||||
private var messagingClient: NodeMessagingClient? = null
|
||||
private var messagingClient: P2PMessagingClient? = null
|
||||
private var messagingServer: ArtemisMessagingServer? = null
|
||||
|
||||
private lateinit var networkMapCache: NetworkMapCacheImpl
|
||||
|
||||
private val rpcOps = object : RPCOps {
|
||||
override val protocolVersion: Int get() = throw UnsupportedOperationException()
|
||||
}
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
val baseDirectory = temporaryFolder.root.toPath()
|
||||
@ -186,10 +178,10 @@ class ArtemisMessagingTests {
|
||||
}
|
||||
|
||||
private fun startNodeMessagingClient() {
|
||||
messagingClient!!.start(rpcOps, userService)
|
||||
messagingClient!!.start()
|
||||
}
|
||||
|
||||
private fun createAndStartClientAndServer(receivedMessages: LinkedBlockingQueue<Message>): NodeMessagingClient {
|
||||
private fun createAndStartClientAndServer(receivedMessages: LinkedBlockingQueue<Message>): P2PMessagingClient {
|
||||
createMessagingServer().start()
|
||||
|
||||
val messagingClient = createMessagingClient()
|
||||
@ -198,20 +190,19 @@ class ArtemisMessagingTests {
|
||||
receivedMessages.add(message)
|
||||
}
|
||||
// Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.
|
||||
thread { messagingClient.run(messagingServer!!.serverControl) }
|
||||
thread { messagingClient.run() }
|
||||
return messagingClient
|
||||
}
|
||||
|
||||
private fun createMessagingClient(server: NetworkHostAndPort = NetworkHostAndPort("localhost", serverPort)): NodeMessagingClient {
|
||||
private fun createMessagingClient(server: NetworkHostAndPort = NetworkHostAndPort("localhost", serverPort)): P2PMessagingClient {
|
||||
return database.transaction {
|
||||
NodeMessagingClient(
|
||||
P2PMessagingClient(
|
||||
config,
|
||||
MOCK_VERSION_INFO,
|
||||
server,
|
||||
identity.public,
|
||||
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
|
||||
database,
|
||||
MetricRegistry()).apply {
|
||||
database).apply {
|
||||
config.configureWithDevSSLCertificate()
|
||||
messagingClient = this
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user