mirror of
https://github.com/corda/corda.git
synced 2025-02-21 09:51:57 +00:00
CORDA-787 Remove unused class (#2049)
* and refactor a superclass that isn't really one * move it to internal * misc refactorings
This commit is contained in:
parent
953a4a3790
commit
039cacae76
@ -1,19 +1,17 @@
|
||||
package net.corda.nodeapi
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import net.corda.core.messaging.MessageRecipientGroup
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.toBase58String
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import java.security.PublicKey
|
||||
|
||||
/**
|
||||
* The base class for Artemis services that defines shared data structures and SSL transport configuration.
|
||||
*/
|
||||
abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
|
||||
class ArtemisMessagingComponent {
|
||||
companion object {
|
||||
init {
|
||||
System.setProperty("org.jboss.logging.provider", "slf4j")
|
||||
@ -66,7 +64,4 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
|
||||
data class ServiceAddress(val identity: PublicKey) : ArtemisAddress, MessageRecipientGroup {
|
||||
override val queueName: String = "$PEERS_PREFIX${identity.toBase58String()}"
|
||||
}
|
||||
|
||||
/** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */
|
||||
abstract val config: SSLConfiguration?
|
||||
}
|
@ -3,8 +3,8 @@ package net.corda.services.messaging
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.internal.*
|
||||
import net.corda.node.utilities.*
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import net.corda.testing.MEGA_CORP
|
||||
|
@ -17,10 +17,10 @@ import net.corda.core.utilities.toBase58String
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.P2P_QUEUE
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
|
@ -142,7 +142,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
info.legalIdentities[0].owningKey,
|
||||
serverThread,
|
||||
database,
|
||||
services.monitoringService,
|
||||
services.monitoringService.metrics,
|
||||
advertisedAddress)
|
||||
}
|
||||
|
||||
|
@ -1,56 +0,0 @@
|
||||
package net.corda.node.services.api
|
||||
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.node.services.messaging.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* Abstract superclass for services that a node can host, which provides helper functions.
|
||||
*/
|
||||
@ThreadSafe
|
||||
abstract class AbstractNodeService(val network: MessagingService) : SingletonSerializeAsToken() {
|
||||
/**
|
||||
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
|
||||
* common boilerplate code. Exceptions are caught and passed to the provided consumer. If you just want a simple
|
||||
* acknowledgement response with no content, use [net.corda.core.messaging.Ack].
|
||||
*
|
||||
* @param topic the topic, without the default session ID postfix (".0).
|
||||
* @param handler a function to handle the deserialised request and return an optional response (if return type not Unit)
|
||||
* @param exceptionConsumer a function to which any thrown exception is passed.
|
||||
*/
|
||||
protected inline fun <reified Q : ServiceRequestMessage, reified R : Any>
|
||||
addMessageHandler(topic: String,
|
||||
crossinline handler: (Q) -> R,
|
||||
crossinline exceptionConsumer: (Message, Exception) -> Unit): MessageHandlerRegistration {
|
||||
return network.addMessageHandler(topic, MessagingService.DEFAULT_SESSION_ID) { message, _ ->
|
||||
try {
|
||||
val request = message.data.deserialize<Q>()
|
||||
val response = handler(request)
|
||||
// If the return type R is Unit, then do not send a response
|
||||
if (response.javaClass != Unit.javaClass) {
|
||||
val msg = network.createMessage(topic, request.sessionID, response.serialize().bytes)
|
||||
network.send(msg, request.replyTo)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
exceptionConsumer(message, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
|
||||
* common boilerplate code. Exceptions are propagated to the messaging layer. If you just want a simple
|
||||
* acknowledgement response with no content, use [net.corda.core.messaging.Ack].
|
||||
*
|
||||
* @param topic the topic, without the default session ID postfix (".0).
|
||||
* @param handler a function to handle the deserialised request and return an optional response (if return type not Unit).
|
||||
*/
|
||||
protected inline fun <reified Q : ServiceRequestMessage, reified R : Any>
|
||||
addMessageHandler(topic: String,
|
||||
crossinline handler: (Q) -> R): MessageHandlerRegistration {
|
||||
return addMessageHandler(topic, handler, { _: Message, exception: Exception -> throw exception })
|
||||
}
|
||||
|
||||
}
|
@ -11,6 +11,7 @@ import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.node.services.NetworkMapCache.MapChange
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.loggerFor
|
||||
@ -27,8 +28,14 @@ import net.corda.node.utilities.X509Utilities.CORDA_CLIENT_TLS
|
||||
import net.corda.node.utilities.X509Utilities.CORDA_ROOT_CA
|
||||
import net.corda.node.utilities.loadKeyStore
|
||||
import net.corda.nodeapi.*
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisPeerAddress
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration
|
||||
@ -86,19 +93,16 @@ import javax.security.cert.CertificateException
|
||||
* a fully connected network, trusted network or on localhost.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class ArtemisMessagingServer(override val config: NodeConfiguration,
|
||||
val p2pPort: Int,
|
||||
class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
private val p2pPort: Int,
|
||||
val rpcPort: Int?,
|
||||
val networkMapCache: NetworkMapCache,
|
||||
val userService: RPCUserService) : ArtemisMessagingComponent() {
|
||||
val userService: RPCUserService) : SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = loggerFor<ArtemisMessagingServer>()
|
||||
/** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */
|
||||
@JvmStatic
|
||||
val MAX_FILE_SIZE = 10485760
|
||||
|
||||
val ipDetectRequestProperty = "ip-request-id"
|
||||
val ipDetectResponseProperty = "ip-address"
|
||||
}
|
||||
|
||||
private class InnerState {
|
||||
|
@ -205,19 +205,6 @@ fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: Mess
|
||||
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID(), retryId: Long? = null)
|
||||
= send(createMessage(topicSession, payload.serialize().bytes, uuid), to, retryId)
|
||||
|
||||
/**
|
||||
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
|
||||
* on the messaging service interface until you have successfully started up the system. One of these objects should
|
||||
* be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations
|
||||
* may let you cast the returned future to an object that lets you get status info.
|
||||
*
|
||||
* A specific implementation of the controller class will have extra features that let you customise it before starting
|
||||
* it up.
|
||||
*/
|
||||
interface MessagingServiceBuilder<out T : MessagingService> {
|
||||
fun start(): ListenableFuture<out T>
|
||||
}
|
||||
|
||||
interface MessageHandlerRegistration
|
||||
|
||||
/**
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.ThreadBox
|
||||
@ -10,6 +11,7 @@ import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
import net.corda.core.node.services.TransactionVerifierService
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.serialization.serialize
|
||||
@ -20,14 +22,17 @@ import net.corda.core.utilities.sequence
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.api.MonitoringService
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.VerifierType
|
||||
import net.corda.node.services.statemachine.StateMachineManagerImpl
|
||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||
import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService
|
||||
import net.corda.node.utilities.*
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.VerifierApi
|
||||
@ -72,15 +77,15 @@ import javax.persistence.Lob
|
||||
* If not provided, will default to [serverAddress].
|
||||
*/
|
||||
@ThreadSafe
|
||||
class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
class NodeMessagingClient(private val config: NodeConfiguration,
|
||||
private val versionInfo: VersionInfo,
|
||||
private val serverAddress: NetworkHostAndPort,
|
||||
private val myIdentity: PublicKey,
|
||||
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
|
||||
private val database: CordaPersistence,
|
||||
private val monitoringService: MonitoringService,
|
||||
private val metrics: MetricRegistry,
|
||||
advertisedAddress: NetworkHostAndPort = serverAddress
|
||||
) : ArtemisMessagingComponent(), MessagingService {
|
||||
) : SingletonSerializeAsToken(), MessagingService {
|
||||
companion object {
|
||||
private val log = loggerFor<NodeMessagingClient>()
|
||||
|
||||
@ -560,7 +565,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
}
|
||||
|
||||
private fun createOutOfProcessVerifierService(): TransactionVerifierService {
|
||||
return object : OutOfProcessTransactionVerifierService(monitoringService) {
|
||||
return object : OutOfProcessTransactionVerifierService(metrics) {
|
||||
override fun sendRequest(nonce: Long, transaction: LedgerTransaction) {
|
||||
messagingExecutor.fetchFrom {
|
||||
state.locked {
|
||||
|
@ -31,7 +31,7 @@ import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.logging.pushToLoggingContext
|
||||
import net.corda.nodeapi.*
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import org.apache.activemq.artemis.api.core.Message
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
|
@ -3,7 +3,7 @@ package net.corda.node.services.messaging
|
||||
import net.corda.client.rpc.PermissionException
|
||||
import net.corda.core.context.InvocationContext
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
|
||||
data class RpcAuthContext(val invocation: InvocationContext, val grantedPermissions: RpcPermissions) {
|
||||
|
||||
|
@ -1,6 +1,5 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import com.google.common.util.concurrent.MoreExecutors
|
||||
import net.corda.core.internal.concurrent.fork
|
||||
import net.corda.core.node.services.TransactionVerifierService
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
@ -8,7 +7,6 @@ import net.corda.core.transactions.LedgerTransaction
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
class InMemoryTransactionVerifierService(numberOfWorkers: Int) : SingletonSerializeAsToken(), TransactionVerifierService {
|
||||
private val workerPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfWorkers))
|
||||
|
||||
private val workerPool = Executors.newFixedThreadPool(numberOfWorkers)
|
||||
override fun verify(transaction: LedgerTransaction) = workerPool.fork(transaction::verify)
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.node.services.transactions
|
||||
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.codahale.metrics.Timer
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.crypto.SecureHash
|
||||
@ -11,13 +12,12 @@ import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.api.MonitoringService
|
||||
import net.corda.nodeapi.VerifierApi
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
abstract class OutOfProcessTransactionVerifierService(
|
||||
val monitoringService: MonitoringService
|
||||
private val metrics: MetricRegistry
|
||||
) : SingletonSerializeAsToken(), TransactionVerifierService {
|
||||
companion object {
|
||||
val log = loggerFor<OutOfProcessTransactionVerifierService>()
|
||||
@ -34,16 +34,16 @@ abstract class OutOfProcessTransactionVerifierService(
|
||||
// Metrics
|
||||
private fun metric(name: String) = "OutOfProcessTransactionVerifierService.$name"
|
||||
|
||||
private val durationTimer = monitoringService.metrics.timer(metric("Verification.Duration"))
|
||||
private val successMeter = monitoringService.metrics.meter(metric("Verification.Success"))
|
||||
private val failureMeter = monitoringService.metrics.meter(metric("Verification.Failure"))
|
||||
private val durationTimer = metrics.timer(metric("Verification.Duration"))
|
||||
private val successMeter = metrics.meter(metric("Verification.Success"))
|
||||
private val failureMeter = metrics.meter(metric("Verification.Failure"))
|
||||
|
||||
class VerificationResultForUnknownTransaction(nonce: Long) :
|
||||
Exception("Verification result arrived for unknown transaction nonce $nonce")
|
||||
|
||||
fun start(responseConsumer: ClientConsumer) {
|
||||
log.info("Starting out of process verification service")
|
||||
monitoringService.metrics.register(metric("VerificationsInFlight"), Gauge { verificationHandles.size })
|
||||
metrics.register(metric("VerificationsInFlight"), Gauge { verificationHandles.size })
|
||||
responseConsumer.setMessageHandler { message ->
|
||||
val response = VerifierApi.VerificationResponse.fromClientMessage(message)
|
||||
val handle = verificationHandles.remove(response.verificationId) ?:
|
||||
|
@ -9,7 +9,6 @@ import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.RPCUserServiceImpl
|
||||
import net.corda.node.services.api.MonitoringService
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.network.NetworkMapCacheImpl
|
||||
@ -30,7 +29,6 @@ import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import org.mockito.Mockito.mock
|
||||
import java.net.ServerSocket
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
@ -213,7 +211,7 @@ class ArtemisMessagingTests {
|
||||
identity.public,
|
||||
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
|
||||
database,
|
||||
MonitoringService(MetricRegistry())).apply {
|
||||
MetricRegistry()).apply {
|
||||
config.configureWithDevSSLCertificate()
|
||||
messagingClient = this
|
||||
}
|
||||
|
@ -1,8 +1,6 @@
|
||||
package net.corda.testing.node
|
||||
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.crypto.CompositeKey
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.ThreadBox
|
||||
@ -12,6 +10,8 @@ import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.internal.concurrent.doneFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
@ -33,6 +33,19 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
import kotlin.concurrent.schedule
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
/**
|
||||
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
|
||||
* on the messaging service interface until you have successfully started up the system. One of these objects should
|
||||
* be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations
|
||||
* may let you cast the returned future to an object that lets you get status info.
|
||||
*
|
||||
* A specific implementation of the controller class will have extra features that let you customise it before starting
|
||||
* it up.
|
||||
*/
|
||||
interface MessagingServiceBuilder<out T : MessagingService> {
|
||||
fun start(): CordaFuture<out T>
|
||||
}
|
||||
|
||||
/**
|
||||
* An in-memory network allows you to manufacture [InMemoryMessaging]s for a set of participants. Each
|
||||
* [InMemoryMessaging] maintains a queue of messages it has received, and a background thread that dispatches
|
||||
@ -190,7 +203,7 @@ class InMemoryMessagingNetwork(
|
||||
val serviceHandles: List<ServiceHandle>,
|
||||
val executor: AffinityExecutor,
|
||||
val database: CordaPersistence) : MessagingServiceBuilder<InMemoryMessaging> {
|
||||
override fun start(): ListenableFuture<InMemoryMessaging> {
|
||||
override fun start(): CordaFuture<InMemoryMessaging> {
|
||||
synchronized(this@InMemoryMessagingNetwork) {
|
||||
val node = InMemoryMessaging(manuallyPumped, id, executor, database)
|
||||
handleEndpointMap[id] = node
|
||||
@ -198,7 +211,7 @@ class InMemoryMessagingNetwork(
|
||||
serviceToPeersMapping.getOrPut(it) { LinkedHashSet<PeerHandle>() }.add(id)
|
||||
Unit
|
||||
}
|
||||
return Futures.immediateFuture(node)
|
||||
return doneFuture(node)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -244,7 +257,7 @@ class InMemoryMessagingNetwork(
|
||||
log.trace { transfer.toString() }
|
||||
val calc = latencyCalculator
|
||||
if (calc != null && transfer.recipients is SingleMessageRecipient) {
|
||||
val messageSent = SettableFuture.create<Unit>()
|
||||
val messageSent = openFuture<Unit>()
|
||||
// Inject some artificial latency.
|
||||
timer.schedule(calc.between(transfer.sender, transfer.recipients).toMillis()) {
|
||||
pumpSendInternal(transfer)
|
||||
|
@ -3,7 +3,6 @@ package net.corda.testing.messaging
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
@ -14,7 +13,7 @@ import org.apache.activemq.artemis.api.core.client.*
|
||||
* As the name suggests this is a simple client for connecting to MQ brokers.
|
||||
*/
|
||||
class SimpleMQClient(val target: NetworkHostAndPort,
|
||||
override val config: SSLConfiguration? = configureTestSSL(DEFAULT_MQ_LEGAL_NAME)) : ArtemisMessagingComponent() {
|
||||
private val config: SSLConfiguration? = configureTestSSL(DEFAULT_MQ_LEGAL_NAME)) {
|
||||
companion object {
|
||||
val DEFAULT_MQ_LEGAL_NAME = CordaX500Name(organisation = "SimpleMQClient", locality = "London", country = "GB")
|
||||
}
|
||||
|
@ -16,7 +16,7 @@ import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.config.configureDevKeyAndTrustStores
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.VerifierApi
|
||||
|
Loading…
x
Reference in New Issue
Block a user