Added security to RPC and P2P systems.

This commit is contained in:
Shams Asari 2016-11-07 14:30:40 +00:00
parent b6e56641bd
commit 4addb91f80
29 changed files with 875 additions and 415 deletions

View File

@ -9,6 +9,7 @@ import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLI
import net.corda.node.services.messaging.CordaRPCOps
import net.corda.node.services.messaging.RPCException
import net.corda.node.services.messaging.rpcLog
import org.apache.activemq.artemis.api.core.ActiveMQException
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientSession
@ -38,11 +39,11 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur
private val state = ThreadBox(State())
/** Opens the connection to the server and registers a JVM shutdown hook to cleanly disconnect. */
@Throws(ActiveMQNotConnectedException::class)
@Throws(ActiveMQException::class)
fun start(username: String, password: String) {
state.locked {
check(!running)
checkStorePasswords() // Check the password.
checkStorePasswords()
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(ConnectionDirection.OUTBOUND, host.hostText, host.port))
serverLocator.threadPoolMaxSize = 1
// TODO: Configure session reconnection, confirmation window sizes and other Artemis features.
@ -53,7 +54,6 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur
session.start()
clientImpl = CordaRPCClientImpl(session, state.lock, username)
running = true
// We will use the ID in strings so strip the sign bit.
}
Runtime.getRuntime().addShutdownHook(Thread {

View File

@ -41,7 +41,7 @@ import kotlin.reflect.jvm.javaMethod
* # Design notes
*
* The way RPCs are handled is fairly standard except for the handling of observables. When an RPC might return
* an [rx.Observable] it is specially tagged. This causes the client to create a new transient queue for the
* an [Observable] it is specially tagged. This causes the client to create a new transient queue for the
* receiving of observables and their observations with a random ID in the name. This ID is sent to the server in
* a message header. All observations are sent via this single queue.
*
@ -141,19 +141,20 @@ class CordaRPCClientImpl(private val session: ClientSession,
*/
@ThreadSafe
private inner class RPCProxyHandler(private val timeout: Duration?) : InvocationHandler, Closeable {
private val proxyAddress = constructAddress()
private val proxyId = random63BitValue()
private val consumer: ClientConsumer
var serverProtocolVersion = 0
init {
val proxyAddress = constructAddress(proxyId)
consumer = sessionLock.withLock {
session.createTemporaryQueue(proxyAddress, proxyAddress)
session.createConsumer(proxyAddress)
}
}
private fun constructAddress() = "${ArtemisMessagingComponent.CLIENTS_PREFIX}$username.rpc.${random63BitValue()}"
private fun constructAddress(addressId: Long) = "${ArtemisMessagingComponent.CLIENTS_PREFIX}$username.rpc.$addressId"
@Synchronized
override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {
@ -230,9 +231,10 @@ class CordaRPCClientImpl(private val session: ClientSession,
private fun maybePrepareForObservables(location: Throwable, method: Method, msg: ClientMessage): Kryo {
// Create a temporary queue just for the emissions on any observables that are returned.
val observationsQueueName = constructAddress()
val observationsId = random63BitValue()
val observationsQueueName = constructAddress(observationsId)
session.createTemporaryQueue(observationsQueueName, observationsQueueName)
msg.putStringProperty(ClientRPCRequestMessage.OBSERVATIONS_TO, observationsQueueName)
msg.putLongProperty(ClientRPCRequestMessage.OBSERVATIONS_TO, observationsId)
// And make sure that we deserialise observable handles so that they're linked to the right
// queue. Also record a bit of metadata for debugging purposes.
return createRPCKryo(observableSerializer = ObservableDeserializer(observationsQueueName, method.name, location))
@ -241,7 +243,7 @@ class CordaRPCClientImpl(private val session: ClientSession,
private fun createMessage(method: Method): ClientMessage {
return session.createMessage(false).apply {
putStringProperty(ClientRPCRequestMessage.METHOD_NAME, method.name)
putStringProperty(ClientRPCRequestMessage.REPLY_TO, proxyAddress)
putLongProperty(ClientRPCRequestMessage.REPLY_TO, proxyId)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
@ -257,10 +259,10 @@ class CordaRPCClientImpl(private val session: ClientSession,
override fun close() {
consumer.close()
sessionLock.withLock { session.deleteQueue(proxyAddress) }
sessionLock.withLock { session.deleteQueue(constructAddress(proxyId)) }
}
override fun toString() = "Corda RPC Proxy listening on queue $proxyAddress"
override fun toString() = "Corda RPC Proxy listening on queue ${constructAddress(proxyId)}"
}
/**

View File

@ -1,8 +1,6 @@
package net.corda.client
import net.corda.client.impl.CordaRPCClientImpl
import net.corda.core.millis
import net.corda.core.random63BitValue
import net.corda.core.serialization.SerializedBytes
import net.corda.core.utilities.LogHelper
import net.corda.node.services.RPCUserService
@ -22,14 +20,12 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Before
import org.junit.Test
import rx.Observable
import rx.subjects.PublishSubject
import java.io.Closeable
import java.time.Duration
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
@ -37,7 +33,6 @@ import java.util.concurrent.locks.ReentrantLock
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
import kotlin.test.fail
class ClientRPCInfrastructureTests {
// TODO: Test that timeouts work
@ -47,7 +42,7 @@ class ClientRPCInfrastructureTests {
lateinit var clientSession: ClientSession
lateinit var producer: ClientProducer
lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor
var proxy: TestOps? = null
lateinit var proxy: TestOps
private val authenticatedUser = User("test", "password", permissions = setOf())
@ -94,16 +89,10 @@ class ClientRPCInfrastructureTests {
clientSession.start()
LogHelper.setLevel("+net.corda.rpc")
}
private fun createProxyUsingReplyTo(username: String, timeout: Duration? = null): TestOps {
val proxy = CordaRPCClientImpl(clientSession, ReentrantLock(), username).proxyFor(TestOps::class.java, timeout = timeout)
this.proxy = proxy
return proxy
proxy = CordaRPCClientImpl(clientSession, ReentrantLock(), authenticatedUser.username).proxyFor(TestOps::class.java)
}
private fun createProxyUsingAuthenticatedReplyTo() = createProxyUsingReplyTo(authenticatedUser.username)
@After
fun shutdown() {
(proxy as Closeable?)?.close()
@ -156,7 +145,6 @@ class ClientRPCInfrastructureTests {
@Test
fun `simple RPCs`() {
val proxy = createProxyUsingAuthenticatedReplyTo()
// Does nothing, doesn't throw.
proxy.void()
@ -169,7 +157,6 @@ class ClientRPCInfrastructureTests {
@Test
fun `simple observable`() {
val proxy = createProxyUsingAuthenticatedReplyTo()
// This tests that the observations are transmitted correctly, also completion is transmitted.
val observations = proxy.makeObservable().toBlocking().toIterable().toList()
assertEquals(listOf(1, 2, 3, 4), observations)
@ -177,7 +164,6 @@ class ClientRPCInfrastructureTests {
@Test
fun `complex observables`() {
val proxy = createProxyUsingAuthenticatedReplyTo()
// This checks that we can return an object graph with complex usage of observables, like an observable
// that emits objects that contain more observables.
val serverQuotes = PublishSubject.create<Pair<String, Observable<String>>>()
@ -224,31 +210,12 @@ class ClientRPCInfrastructureTests {
@Test
fun versioning() {
val proxy = createProxyUsingAuthenticatedReplyTo()
assertFailsWith<UnsupportedOperationException> { proxy.addedLater() }
}
@Test
fun `authenticated user is available to RPC`() {
val proxy = createProxyUsingAuthenticatedReplyTo()
assertThat(proxy.captureUser()).isEqualTo(authenticatedUser.username)
}
@Test
fun `using another username for the reply-to`() {
assertThatExceptionOfType(RPCException.DeadlineExceeded::class.java).isThrownBy {
val proxy = createProxyUsingReplyTo(random63BitValue().toString(), timeout = 300.millis)
proxy.void()
fail("RPC successfully returned using someone else's username for the reply-to")
}
}
@Test
fun `using another username for the reply-to, which contains our username as a prefix`() {
assertThatExceptionOfType(RPCException.DeadlineExceeded::class.java).isThrownBy {
val proxy = createProxyUsingReplyTo("${authenticatedUser.username}extra", timeout = 300.millis)
proxy.void()
fail("RPC successfully returned using someone else's username for the reply-to")
}
}
}

View File

@ -107,11 +107,11 @@ object X509Utilities {
}
/**
* Helper method to create Subject field contents
* Return a bogus X509 for dev purposes. Use [getX509Name] for something more real.
*/
fun getDevX509Name(domain: String): X500Name {
fun getDevX509Name(commonName: String): X500Name {
val nameBuilder = X500NameBuilder(BCStyle.INSTANCE)
nameBuilder.addRDN(BCStyle.CN, domain)
nameBuilder.addRDN(BCStyle.CN, commonName)
nameBuilder.addRDN(BCStyle.O, "R3")
nameBuilder.addRDN(BCStyle.OU, "corda")
nameBuilder.addRDN(BCStyle.L, "London")
@ -574,18 +574,21 @@ object X509Utilities {
storePassword: String,
keyPassword: String,
caKeyStore: KeyStore,
caKeyPassword: String): KeyStore {
val rootCA = loadCertificateAndKey(caKeyStore,
caKeyPassword: String,
commonName: String): KeyStore {
val rootCA = X509Utilities.loadCertificateAndKey(
caKeyStore,
caKeyPassword,
CORDA_ROOT_CA_PRIVATE_KEY)
val intermediateCA = loadCertificateAndKey(caKeyStore,
val intermediateCA = X509Utilities.loadCertificateAndKey(
caKeyStore,
caKeyPassword,
CORDA_INTERMEDIATE_CA_PRIVATE_KEY)
val serverKey = generateECDSAKeyPairForSSL()
val host = InetAddress.getLocalHost()
val subject = getDevX509Name(host.canonicalHostName)
val serverCert = createServerCert(subject,
val serverCert = createServerCert(
getDevX509Name(commonName),
serverKey.public,
intermediateCA,
if (host.canonicalHostName == host.hostName) listOf() else listOf(host.hostName),
@ -594,7 +597,8 @@ object X509Utilities {
val keyPass = keyPassword.toCharArray()
val keyStore = loadOrCreateKeyStore(keyStoreFilePath, storePassword)
keyStore.addOrReplaceKey(CORDA_CLIENT_CA_PRIVATE_KEY,
keyStore.addOrReplaceKey(
CORDA_CLIENT_CA_PRIVATE_KEY,
serverKey.private,
keyPass,
arrayOf(serverCert, intermediateCA.certificate, rootCA.certificate))

View File

@ -7,6 +7,8 @@ import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.serialization.DeserializeAsKotlinObjectDef
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.asn1.x500.style.BCStyle
import java.time.Instant
import java.util.*
import java.util.concurrent.atomic.AtomicBoolean
@ -37,7 +39,7 @@ interface MessagingService {
* @param sessionID identifier for the session the message is part of. For services listening before
* a session is established, use [DEFAULT_SESSION_ID].
*/
fun addMessageHandler(topic: String = "", sessionID: Long = DEFAULT_SESSION_ID, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
fun addMessageHandler(topic: String = "", sessionID: Long = DEFAULT_SESSION_ID, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
/**
* The provided function will be invoked for each received message whose topic and session matches. The callback
@ -49,7 +51,7 @@ interface MessagingService {
*
* @param topicSession identifier for the topic and session to listen for messages arriving on.
*/
fun addMessageHandler(topicSession: TopicSession, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
fun addMessageHandler(topicSession: TopicSession, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
/**
* Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once
@ -104,7 +106,7 @@ fun MessagingService.createMessage(topic: String, sessionID: Long = DEFAULT_SESS
* @param sessionID identifier for the session the message is part of. For services listening before
* a session is established, use [DEFAULT_SESSION_ID].
*/
fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, callback: (Message) -> Unit)
fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, callback: (ReceivedMessage) -> Unit)
= runOnNextMessage(TopicSession(topic, sessionID), callback)
/**
@ -114,7 +116,7 @@ fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, callback:
*
* @param topicSession identifier for the topic and session to listen for messages arriving on.
*/
inline fun MessagingService.runOnNextMessage(topicSession: TopicSession, crossinline callback: (Message) -> Unit) {
inline fun MessagingService.runOnNextMessage(topicSession: TopicSession, crossinline callback: (ReceivedMessage) -> Unit) {
val consumed = AtomicBoolean()
addMessageHandler(topicSession) { msg, reg ->
removeMessageHandler(reg)
@ -155,12 +157,7 @@ interface MessageHandlerRegistration
* a session is established, use [DEFAULT_SESSION_ID].
*/
data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION_ID) {
companion object {
val Blank = TopicSession("", DEFAULT_SESSION_ID)
}
fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID
override fun toString(): String = "$topic.$sessionID"
}
@ -181,6 +178,15 @@ interface Message {
val uniqueMessageId: UUID
}
// TODO Have ReceivedMessage point to the TLS certificate of the peer, and [peer] would simply be the subject DN of that.
// The certificate would need to be serialised into the message header or just its fingerprint and then download it via RPC,
// or something like that.
interface ReceivedMessage : Message {
/** The authenticated sender. */
val peer: X500Name
val peerLegalName: String get() = peer.getRDNs(BCStyle.CN).first().first.value.toString()
}
/** A singleton that's useful for validating topic strings */
object TopicStringValidator {
private val regex = "[a-zA-Z0-9.]+".toPattern()

View File

@ -49,8 +49,7 @@ object NotaryFlow {
throw NotaryException(NotaryError.SignaturesMissing(ex.missing))
}
val request = SignRequest(stx, serviceHub.myInfo.legalIdentity)
val response = sendAndReceive<Result>(notaryParty, request)
val response = sendAndReceive<Result>(notaryParty, SignRequest(stx))
return validateResponse(response)
}
@ -95,14 +94,13 @@ object NotaryFlow {
@Suspendable
override fun call() {
val (stx, reqIdentity) = receive<SignRequest>(otherSide).unwrap { it }
val stx = receive<SignRequest>(otherSide).unwrap { it.tx }
val wtx = stx.tx
val result = try {
validateTimestamp(wtx)
beforeCommit(stx, reqIdentity)
commitInputStates(wtx, reqIdentity)
beforeCommit(stx)
commitInputStates(wtx)
val sig = sign(stx.id.bytes)
Result.Success(sig)
} catch(e: NotaryException) {
@ -127,12 +125,12 @@ object NotaryFlow {
* undo the commit of the input states (the exact mechanism still needs to be worked out).
*/
@Suspendable
open fun beforeCommit(stx: SignedTransaction, reqIdentity: Party) {
open fun beforeCommit(stx: SignedTransaction) {
}
private fun commitInputStates(tx: WireTransaction, reqIdentity: Party) {
private fun commitInputStates(tx: WireTransaction) {
try {
uniquenessProvider.commit(tx.inputs, tx.id, reqIdentity)
uniquenessProvider.commit(tx.inputs, tx.id, otherSide)
} catch (e: UniquenessException) {
val conflictData = e.error.serialize()
val signedConflict = SignedData(conflictData, sign(conflictData.bytes))
@ -146,8 +144,7 @@ object NotaryFlow {
}
}
/** TODO: The caller must authenticate instead of just specifying its identity */
data class SignRequest(val tx: SignedTransaction, val callerIdentity: Party)
data class SignRequest(val tx: SignedTransaction)
sealed class Result {
class Error(val error: NotaryError) : Result()

View File

@ -21,11 +21,11 @@ class ValidatingNotaryFlow(otherSide: Party,
NotaryFlow.Service(otherSide, timestampChecker, uniquenessProvider) {
@Suspendable
override fun beforeCommit(stx: SignedTransaction, reqIdentity: Party) {
override fun beforeCommit(stx: SignedTransaction) {
try {
checkSignatures(stx)
val wtx = stx.tx
resolveTransaction(reqIdentity, wtx)
resolveTransaction(wtx)
wtx.toLedgerTransaction(serviceHub).verify()
} catch (e: Exception) {
when (e) {
@ -45,7 +45,7 @@ class ValidatingNotaryFlow(otherSide: Party,
}
@Suspendable
private fun resolveTransaction(reqIdentity: Party, wtx: WireTransaction) {
subFlow(ResolveTransactionsFlow(wtx, reqIdentity))
private fun resolveTransaction(wtx: WireTransaction) {
subFlow(ResolveTransactionsFlow(wtx, otherSide))
}
}

View File

@ -145,7 +145,7 @@ class X509UtilitiesTest {
val caCertAndKey = X509Utilities.loadCertificateAndKey(caKeyStore, "cakeypass", X509Utilities.CORDA_INTERMEDIATE_CA_PRIVATE_KEY)
// Generate server cert and private key and populate another keystore suitable for SSL
X509Utilities.createKeystoreForSSL(tmpServerKeyStore, "serverstorepass", "serverkeypass", caKeyStore, "cakeypass")
X509Utilities.createKeystoreForSSL(tmpServerKeyStore, "serverstorepass", "serverkeypass", caKeyStore, "cakeypass", "Mega Corp.")
// Load back server certificate
val serverKeyStore = X509Utilities.loadKeyStore(tmpServerKeyStore, "serverstorepass")
@ -153,9 +153,8 @@ class X509UtilitiesTest {
serverCertAndKey.certificate.checkValidity(Date())
serverCertAndKey.certificate.verify(caCertAndKey.certificate.publicKey)
val host = InetAddress.getLocalHost()
assertTrue { serverCertAndKey.certificate.subjectDN.name.contains("CN=" + host.canonicalHostName) }
assertTrue { serverCertAndKey.certificate.subjectDN.name.contains("CN=Mega Corp.") }
// Now sign something with private key and verify against certificate public key
val testData = "123456".toByteArray()
@ -183,7 +182,7 @@ class X509UtilitiesTest {
"trustpass")
// Generate server cert and private key and populate another keystore suitable for SSL
val keyStore = X509Utilities.createKeystoreForSSL(tmpServerKeyStore, "serverstorepass", "serverstorepass", caKeyStore, "cakeypass")
val keyStore = X509Utilities.createKeystoreForSSL(tmpServerKeyStore, "serverstorepass", "serverstorepass", caKeyStore, "cakeypass", "Mega Corp.")
val trustStore = X509Utilities.loadKeyStore(tmpTrustStore, "trustpass")
val context = SSLContext.getInstance("TLS")
@ -257,8 +256,7 @@ class X509UtilitiesTest {
val peerX500Principal = (peerChain[0] as X509Certificate).subjectX500Principal
val x500name = X500Name(peerX500Principal.name)
val cn = x500name.getRDNs(BCStyle.CN).first().first.value.toString()
val hostname = InetAddress.getLocalHost().canonicalHostName
assertEquals(hostname, cn)
assertEquals("Mega Corp.", cn)
val output = DataOutputStream(clientSocket.outputStream)

View File

@ -17,15 +17,6 @@ messaging subsystem directly. Instead you will build on top of the :doc:`flow fr
which adds a layer on top of raw messaging to manage multi-step flows and let you think in terms of identities
rather than specific network endpoints.
Messaging types
---------------
Every ``Message`` object has an associated *topic* and may have a *session ID*. These are wrapped in a ``TopicSession``.
An implementation of ``MessagingService`` can be used to create messages and send them. You can get access to the
messaging service via the ``ServiceHub`` object that is provided to your app. Endpoints on the network are
identified at the lowest level using ``SingleMessageRecipient`` which may be e.g. an IP address, or in future
versions perhaps a routing path through the network.
.. _network-map-service:
Network Map Service
@ -48,4 +39,72 @@ The network map currently supports:
* Looking up node for a party
* Suggesting a node providing a specific service, based on suitability for a contract and parties, for example suggesting
an appropriate interest rates oracle for a interest rate swap contract. Currently no recommendation logic is in place.
The code simply picks the first registered node that supports the required service.
The code simply picks the first registered node that supports the required service.
Message queues
--------------
The node makes use of various queues for its operation. The more important ones are described below. Others are used
for maintenance and other minor purposes.
:``p2p.inbound``
The node listens for messages sent from other peer nodes on this queue. Only clients who are authenticated to be
nodes on the same network are given permission to send. Messages which are routed internally are also sent to this
queue (e.g. two flows on the same node communicating with each other).
:``internal.peers.$identity``
These are a set of private queues only available to the node which it uses to route messages destined to other peers.
The queue name ends in the base 58 encoding of the peer's identity key. There is at most one queue per peer. The broker
creates a bridge from this queue to the peer's ``p2p.inbound`` queue, using the network map service to lookup the
peer's network address.
:``internal.networkmap``
This is another private queue just for the node which functions in a similar manner to the ``p2p.peers.*`` queues
except this is used to form a connection to the network map node. The node running the network map service is treated
differently as it provides information about the rest of the network.
:``rpc.requests``
RPC clients send their requests here, and it's only open for sending by clients authenticated as RPC users.
:``clients.$user.rpc.$random``
RPC clients are given permission to create a temporary queue incorporating their username (``$user``) and sole
permission to receive messages from it. RPC requests are required to include a random number (``$random``) from
which the node is able to construct the queue the user is listening on and send the response to that. This mechanism
prevents other users from being able listen in on the responses.
Security
--------
Clients attempting to connect to the node's broker fall in one of four groups:
# Anyone connecting with the username ``SystemUsers/Node`` is treated as the node hosting the broker, or a logical
component of the node. The TLS certificate they provide must match the one broker has for the node. If that's the case
they are given full access to all valid queues, otherwise they are rejected.
# Anyone connecting with the username ``SystemUsers/Peer`` is treated as a peer on the same Corda network as the node. Their
TLS root CA must be the same as the node's root CA - the root CA is the doorman of the network and having the same root CA
implies we've been let in by the same doorman. If they are part of the same network then they are only given permission
to send to our ``p2p.inbound`` queue, otherwise they are rejected.
# Every other username is treated as a RPC user and authenticated against the node's list of valid RPC users. If that
is successful then they are only given sufficient permission to perform RPC, otherwise they are rejected.
# Clients connecting without a username and password are rejected.
Artemis provides a feature of annotating each received message with the validated user. This allows the node's messaging
service to provide authenticated messages to the rest of the system. For the first two client types described above the
validated user is the X.500 subject DN of the client TLS certificate and we assume the common name is the legal name of
the peer. This allows the flow framework to authentically determine the ``Party`` initiating a new flow. For RPC clients
the validated user is the username itself and the RPC framework uses this to determine what permissions the user has.
.. note:: ``Party`` lookup is currently done by the legal name which isn't guaranteed to be unique. A future version will
use the full X.500 name as it can provide additional structures for uniqueness.
Messaging types
---------------
Every ``Message`` object has an associated *topic* and may have a *session ID*. These are wrapped in a ``TopicSession``.
An implementation of ``MessagingService`` can be used to create messages and send them. You can get access to the
messaging service via the ``ServiceHub`` object that is provided to your app. Endpoints on the network are
identified at the lowest level using ``SingleMessageRecipient`` which may be e.g. an IP address, or in future
versions perhaps a routing path through the network.

View File

@ -20,13 +20,8 @@ Setting up your own network
Certificates
------------
If two nodes are to communicate successfully then both need to have
each other's root certificate in their truststores. The simplest way
to achieve this is to have all nodes sign off of a single root.
Later R3 will provide this root for production use, but for testing you
can use ``certSigningRequestUtility.jar`` to generate a node
certificate with a fixed test root:
All nodes belonging to the same Corda network must have the same root CA. For testing purposes you can
use ``certSigningRequestUtility.jar`` to generate a node certificate with a fixed test root:
.. sourcecode:: bash

View File

@ -62,6 +62,7 @@ sourceSets {
dependencies {
compile project(':finance')
testCompile project(':test-utils')
testCompile project(':client')
compile "com.google.code.findbugs:jsr305:3.0.1"

View File

@ -0,0 +1,230 @@
package net.corda.services.messaging
import co.paralleluniverse.fibers.Suspendable
import com.google.common.net.HostAndPort
import net.corda.client.impl.CordaRPCClientImpl
import net.corda.core.crypto.Party
import net.corda.core.crypto.composite
import net.corda.core.crypto.generateKeyPair
import net.corda.core.flows.FlowLogic
import net.corda.core.getOrThrow
import net.corda.core.random63BitValue
import net.corda.core.seconds
import net.corda.node.internal.Node
import net.corda.node.services.User
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NETWORK_MAP_ADDRESS
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
import net.corda.node.services.messaging.CordaRPCOps
import net.corda.node.services.messaging.NodeMessagingClient.Companion.RPC_QUEUE_REMOVALS_QUEUE
import net.corda.testing.messaging.SimpleMQClient
import net.corda.testing.node.NodeBasedTest
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.apache.activemq.artemis.api.core.SimpleString
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.*
import java.util.concurrent.locks.ReentrantLock
/**
* Runs a series of MQ-related attacks against a node. Subclasses need to call [startAttacker] to connect
* the attacker to [alice].
*/
abstract class MQSecurityTest : NodeBasedTest() {
val rpcUser = User("user1", "pass", permissions = emptySet())
lateinit var alice: Node
lateinit var attacker: SimpleMQClient
private val clients = ArrayList<SimpleMQClient>()
@Before
fun start() {
alice = startNode("Alice", rpcUsers = extraRPCUsers + rpcUser)
attacker = SimpleMQClient(alice.configuration.artemisAddress)
startAttacker(attacker)
}
open val extraRPCUsers: List<User> get() = emptyList()
abstract fun startAttacker(attacker: SimpleMQClient)
@After
fun stopClients() {
clients.forEach { it.stop() }
}
@Test
fun `consume message from P2P queue`() {
assertConsumeAttackFails(P2P_QUEUE)
}
@Test
fun `consume message from peer queue`() {
val bobParty = startBobAndCommunicateWithAlice()
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}")
}
@Test
fun `send message to peer address`() {
val bobParty = startBobAndCommunicateWithAlice()
assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}")
}
@Test
fun `create queue for unknown peer`() {
val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.composite.toBase58String()}"
assertNonTempQueueCreationAttackFails(invalidPeerQueue, durable = true)
assertNonTempQueueCreationAttackFails(invalidPeerQueue, durable = false)
assertTempQueueCreationAttackFails(invalidPeerQueue)
}
@Test
fun `consume message from network map queue`() {
assertConsumeAttackFails(NETWORK_MAP_ADDRESS.toString())
}
@Test
fun `send message to network map address`() {
assertSendAttackFails(NETWORK_MAP_ADDRESS.toString())
}
@Test
fun `consume message from RPC requests queue`() {
assertConsumeAttackFails(RPC_REQUESTS_QUEUE)
}
@Test
fun `consume message from logged in user's RPC queue`() {
val user1Queue = loginToRPCAndGetClientQueue()
assertConsumeAttackFails(user1Queue)
}
@Test
fun `send message on logged in user's RPC address`() {
val user1Queue = loginToRPCAndGetClientQueue()
assertSendAttackFails(user1Queue)
}
@Test
fun `create queue for valid RPC user`() {
val user1Queue = "$CLIENTS_PREFIX${rpcUser.username}.rpc.${random63BitValue()}"
assertTempQueueCreationAttackFails(user1Queue)
}
@Test
fun `create queue for invalid RPC user`() {
val invalidRPCQueue = "$CLIENTS_PREFIX${random63BitValue()}.rpc.${random63BitValue()}"
assertTempQueueCreationAttackFails(invalidRPCQueue)
}
@Test
fun `consume message from RPC queue removals queue`() {
assertConsumeAttackFails(RPC_QUEUE_REMOVALS_QUEUE)
}
@Test
fun `send message to notifications address`() {
assertSendAttackFails(NOTIFICATIONS_ADDRESS)
}
@Test
fun `create random queue`() {
val randomQueue = random63BitValue().toString()
assertNonTempQueueCreationAttackFails(randomQueue, durable = false)
assertNonTempQueueCreationAttackFails(randomQueue, durable = true)
assertTempQueueCreationAttackFails(randomQueue)
}
fun clientTo(target: HostAndPort): SimpleMQClient {
val client = SimpleMQClient(target)
clients += client
return client
}
fun loginToRPC(target: HostAndPort, rpcUser: User): SimpleMQClient {
val client = clientTo(target)
client.loginToRPC(rpcUser)
return client
}
fun SimpleMQClient.loginToRPC(rpcUser: User): CordaRPCOps {
start(rpcUser.username, rpcUser.password)
val clientImpl = CordaRPCClientImpl(session, ReentrantLock(), rpcUser.username)
return clientImpl.proxyFor(CordaRPCOps::class.java, timeout = 1.seconds)
}
fun loginToRPCAndGetClientQueue(): String {
val rpcClient = loginToRPC(alice.configuration.artemisAddress, rpcUser)
val clientQueueQuery = SimpleString("$CLIENTS_PREFIX${rpcUser.username}.rpc.*")
return rpcClient.session.addressQuery(clientQueueQuery).queueNames.single().toString()
}
fun assertTempQueueCreationAttackFails(queue: String) {
assertAttackFails(queue, "CREATE_NON_DURABLE_QUEUE") {
attacker.session.createTemporaryQueue(queue, queue)
}
// Double-check
assertThatExceptionOfType(ActiveMQNonExistentQueueException::class.java).isThrownBy {
attacker.session.createConsumer(queue)
}
}
fun assertNonTempQueueCreationAttackFails(queue: String, durable: Boolean) {
val permission = if (durable) "CREATE_DURABLE_QUEUE" else "CREATE_NON_DURABLE_QUEUE"
assertAttackFails(queue, permission) {
attacker.session.createQueue(queue, queue, durable)
}
// Double-check
assertThatExceptionOfType(ActiveMQNonExistentQueueException::class.java).isThrownBy {
attacker.session.createConsumer(queue)
}
}
fun assertSendAttackFails(address: String) {
val message = attacker.createMessage()
assertAttackFails(address, "SEND") {
attacker.producer.send(address, message)
}
// TODO Make sure no actual message is received
}
fun assertConsumeAttackFails(queue: String) {
assertAttackFails(queue, "CONSUME") {
attacker.session.createConsumer(queue)
}
assertAttackFails(queue, "BROWSE") {
attacker.session.createConsumer(queue, true)
}
}
fun assertAttackFails(queue: String, permission: String, attack: () -> Unit) {
assertThatExceptionOfType(ActiveMQSecurityException::class.java)
.isThrownBy(attack)
.withMessageContaining(queue)
.withMessageContaining(permission)
}
private fun startBobAndCommunicateWithAlice(): Party {
val bob = startNode("Bob")
bob.services.registerFlowInitiator(SendFlow::class, ::ReceiveFlow)
val bobParty = bob.info.legalIdentity
// Perform a protocol exchange to force the peer queue to be created
alice.services.startFlow(SendFlow(bobParty, 0)).resultFuture.getOrThrow()
return bobParty
}
private class SendFlow(val otherParty: Party, val payload: Any) : FlowLogic<Unit>() {
@Suspendable
override fun call() = send(otherParty, payload)
}
private class ReceiveFlow(val otherParty: Party) : FlowLogic<Any>() {
@Suspendable
override fun call() = receive<Any>(otherParty).unwrap { it }
}
}

View File

@ -0,0 +1,53 @@
package net.corda.services.messaging
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
import net.corda.testing.messaging.SimpleMQClient
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Test
/**
* Runs the security tests with the attacker pretending to be a node on the network.
*/
class P2PSecurityTest : MQSecurityTest() {
override fun startAttacker(attacker: SimpleMQClient) {
attacker.start(PEER_USER, PEER_USER) // Login as a peer
}
@Test
fun `send message to RPC requests address`() {
assertSendAttackFails(RPC_REQUESTS_QUEUE)
}
@Test
fun `only the node running the broker can login using the special node user`() {
val attacker = SimpleMQClient(alice.configuration.artemisAddress)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.start(NODE_USER, NODE_USER)
}
attacker.stop()
}
@Test
fun `login as the default cluster user`() {
val attacker = SimpleMQClient(alice.configuration.artemisAddress)
assertThatExceptionOfType(ActiveMQClusterSecurityException::class.java).isThrownBy {
attacker.start(ActiveMQDefaultConfiguration.getDefaultClusterUser(), ActiveMQDefaultConfiguration.getDefaultClusterPassword())
}
attacker.stop()
}
@Test
fun `login without a username and password`() {
val attacker = SimpleMQClient(alice.configuration.artemisAddress)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.start()
}
attacker.stop()
}
}

View File

@ -0,0 +1,15 @@
package net.corda.services.messaging
import net.corda.node.services.User
import net.corda.testing.messaging.SimpleMQClient
/**
* Runs the security tests with the attacker being a valid RPC user of Alice.
*/
class RPCSecurityTest : MQSecurityTest() {
override val extraRPCUsers = listOf(User("evil", "pass", permissions = emptySet()))
override fun startAttacker(attacker: SimpleMQClient) {
attacker.loginToRPC(extraRPCUsers[0])
}
}

View File

@ -15,6 +15,7 @@ import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.messaging.RPCOps
@ -118,15 +119,15 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
private lateinit var userService: RPCUserService
override fun makeMessagingService(): MessagingServiceInternal {
val legalIdentity = obtainLegalIdentity()
val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null
userService = RPCUserServiceImpl(configuration)
val serverAddr = with(configuration) {
messagingServerAddress ?: {
messageBroker = ArtemisMessagingServer(this, artemisAddress, myIdentityOrNullIfNetworkMapService, services.networkMapCache, userService)
messageBroker = ArtemisMessagingServer(this, artemisAddress, services.networkMapCache, userService)
artemisAddress
}()
}
val legalIdentity = obtainLegalIdentity()
val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database, networkMapRegistrationFuture)
}
@ -135,12 +136,13 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
messageBroker?.apply {
runOnStop += Runnable { messageBroker?.stop() }
start()
bridgeToNetworkMapService(networkMapService)
if (networkMapService is NetworkMapAddress) {
bridgeToNetworkMapService(networkMapService)
}
}
// Start up the MQ client.
val net = net as NodeMessagingClient
net.configureWithDevSSLCertificate() // TODO: Client might need a separate certificate
net.start(rpcOps, userService)
}
@ -151,7 +153,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
// Export JMX monitoring statistics and data over REST/JSON.
if (configuration.exportJMXto.split(',').contains("http")) {
val classpath = System.getProperty("java.class.path").split(System.getProperty("path.separator"))
val warpath = classpath.firstOrNull() { it.contains("jolokia-agent-war-2") && it.endsWith(".war") }
val warpath = classpath.firstOrNull { it.contains("jolokia-agent-war-2") && it.endsWith(".war") }
if (warpath != null) {
handlerCollection.addHandler(WebAppContext().apply {
// Find the jolokia WAR file on the classpath.
@ -174,7 +176,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
httpsConfiguration.outputBufferSize = 32768
httpsConfiguration.addCustomizer(SecureRequestCustomizer())
val sslContextFactory = SslContextFactory()
sslContextFactory.setKeyStorePath(configuration.keyStorePath.toString())
sslContextFactory.keyStorePath = configuration.keyStorePath.toString()
sslContextFactory.setKeyStorePassword(configuration.keyStorePassword)
sslContextFactory.setKeyManagerPassword(configuration.keyStorePassword)
sslContextFactory.setTrustStorePath(configuration.trustStorePath.toString())

View File

@ -100,7 +100,9 @@ inline fun <reified T : Any> Config.getListOrElse(path: String, default: Config.
* Strictly for dev only automatically construct a server certificate/private key signed from
* the CA certs in Node resources. Then provision KeyStores into certificates folder under node path.
*/
fun NodeSSLConfiguration.configureWithDevSSLCertificate() {
fun NodeConfiguration.configureWithDevSSLCertificate() = configureDevKeyAndTrustStores(myLegalName)
private fun NodeSSLConfiguration.configureDevKeyAndTrustStores(myLegalName: String) {
certificatesPath.createDirectories()
if (!trustStorePath.exists()) {
javaClass.classLoader.getResourceAsStream("net/corda/node/internal/certificates/cordatruststore.jks").copyTo(trustStorePath)
@ -109,7 +111,7 @@ fun NodeSSLConfiguration.configureWithDevSSLCertificate() {
val caKeyStore = X509Utilities.loadKeyStore(
javaClass.classLoader.getResourceAsStream("net/corda/node/internal/certificates/cordadevcakeys.jks"),
"cordacadevpass")
X509Utilities.createKeystoreForSSL(keyStorePath, keyStorePassword, keyStorePassword, caKeyStore, "cordacadevkeypass")
X509Utilities.createKeystoreForSSL(keyStorePath, keyStorePassword, keyStorePassword, caKeyStore, "cordacadevkeypass", myLegalName)
}
}
@ -120,6 +122,6 @@ fun configureTestSSL(): NodeSSLConfiguration = object : NodeSSLConfiguration {
override val trustStorePassword: String get() = "trustpass"
init {
configureWithDevSSLCertificate()
configureDevKeyAndTrustStores("Mega Corp.")
}
}

View File

@ -8,7 +8,6 @@ import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.read
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.node.services.config.NodeSSLConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
@ -28,12 +27,20 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
System.setProperty("org.jboss.logging.provider", "slf4j")
}
const val PEERS_PREFIX = "peers."
// System users must contain an invalid RPC username character to prevent any chance of name clash which in this
// case is a forward slash
const val NODE_USER = "SystemUsers/Node"
const val PEER_USER = "SystemUsers/Peer"
const val INTERNAL_PREFIX = "internal."
const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers."
const val CLIENTS_PREFIX = "clients."
const val P2P_QUEUE = "p2p.inbound"
const val RPC_REQUESTS_QUEUE = "rpc.requests"
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
@JvmStatic
protected val NETWORK_MAP_ADDRESS = SimpleString("${PEERS_PREFIX}networkmap")
val NETWORK_MAP_ADDRESS = SimpleString("${INTERNAL_PREFIX}networkmap")
/**
* Assuming the passed in target address is actually an ArtemisAddress will extract the host and port of the node. This should
@ -46,27 +53,6 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
val addr = target as? ArtemisMessagingComponent.ArtemisAddress ?: throw IllegalArgumentException("Not an Artemis address")
return addr.hostAndPort
}
/**
* Assuming the passed in target address is actually an ArtemisAddress will extract the queue name used.
* For now the queue name is the Base58 version of the node's identity.
* This should only be used in the internals of the messaging services to keep addressing opaque for the future.
* N.B. Marked as JvmStatic to allow use in the inherited classes.
*/
@JvmStatic
protected fun toQueueName(target: MessageRecipients): SimpleString {
val addr = target as? ArtemisMessagingComponent.ArtemisAddress ?: throw IllegalArgumentException("Not an Artemis address")
return addr.queueName
}
/**
* Convert the identity, host and port of this node into the appropriate [SingleMessageRecipient].
*
* N.B. Marked as JvmStatic to allow use in the inherited classes.
*/
@JvmStatic
protected fun toMyAddress(myIdentity: CompositeKey?, myHostPort: HostAndPort): SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, myHostPort) else NetworkMapAddress(myHostPort)
}
protected interface ArtemisAddress {
@ -74,7 +60,7 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
val hostAndPort: HostAndPort
}
protected data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
override val queueName: SimpleString get() = NETWORK_MAP_ADDRESS
}
@ -84,18 +70,13 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
* For instance it may contain onion routing data.
*/
data class NodeAddress(val identity: CompositeKey, override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
override val queueName: SimpleString by lazy { SimpleString(PEERS_PREFIX + identity.toBase58String()) }
override val queueName: SimpleString = SimpleString("$PEERS_PREFIX${identity.toBase58String()}")
override fun toString(): String = "${javaClass.simpleName}(identity = $queueName, $hostAndPort)"
}
/** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */
abstract val config: NodeSSLConfiguration
protected fun parseKeyFromQueueName(name: String): CompositeKey {
require(name.startsWith(PEERS_PREFIX))
return CompositeKey.parseFromBase58(name.substring(PEERS_PREFIX.length))
}
protected enum class ConnectionDirection { INBOUND, OUTBOUND }
// Restrict enabled Cipher Suites to AES and GCM as minimum for the bulk cipher.
@ -135,7 +116,7 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
mapOf(
// Basic TCP target details
TransportConstants.HOST_PROP_NAME to host,
TransportConstants.PORT_PROP_NAME to port.toInt(),
TransportConstants.PORT_PROP_NAME to port,
// Turn on AMQP support, which needs the protocol jar on the classpath.
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop
@ -159,10 +140,6 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
)
}
fun configureWithDevSSLCertificate() {
config.configureWithDevSSLCertificate()
}
protected fun Path.expectedOnDefaultFileSystem() {
require(fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
}

View File

@ -4,14 +4,23 @@ import com.google.common.net.HostAndPort
import net.corda.core.ThreadBox
import net.corda.core.crypto.AddressFormatException
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.X509Utilities
import net.corda.core.crypto.X509Utilities.CORDA_CLIENT_CA
import net.corda.core.crypto.X509Utilities.CORDA_ROOT_CA
import net.corda.core.crypto.newSecureRandom
import net.corda.core.div
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChangeType
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.printBasicNodeInfo
import net.corda.node.services.RPCUserService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.NODE_USER
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.INBOUND
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.OUTBOUND
import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.NODE_ROLE
import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.PEER_ROLE
import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.RPC_ROLE
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.config.Configuration
@ -21,11 +30,14 @@ import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.artemis.spi.core.security.jaas.CertificateCallback
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal
import rx.Subscription
import java.io.IOException
import java.math.BigInteger
import java.security.Principal
import java.security.PublicKey
import java.util.*
import javax.annotation.concurrent.ThreadSafe
import javax.security.auth.Subject
@ -55,10 +67,8 @@ import javax.security.auth.spi.LoginModule
@ThreadSafe
class ArtemisMessagingServer(override val config: NodeConfiguration,
val myHostPort: HostAndPort,
val myIdentity: CompositeKey?,
val networkMapCache: NetworkMapCache,
val userService: RPCUserService) : ArtemisMessagingComponent() {
companion object {
private val log = loggerFor<ArtemisMessagingServer>()
}
@ -70,7 +80,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private val mutex = ThreadBox(InnerState())
private lateinit var activeMQServer: ActiveMQServer
private var networkChangeHandle: Subscription? = null
private val myQueueName = toQueueName(toMyAddress(myIdentity, myHostPort))
init {
config.basedir.expectedOnDefaultFileSystem()
@ -79,7 +88,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
fun start() = mutex.locked {
if (!running) {
configureAndStartServer()
networkChangeHandle = networkMapCache.changed.subscribe { onNetworkChange(it) }
networkChangeHandle = networkMapCache.changed.subscribe { destroyPossibleStaleBridge(it) }
running = true
}
}
@ -91,50 +100,29 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
running = false
}
fun bridgeToNetworkMapService(networkMapService: SingleMessageRecipient?) {
if ((networkMapService != null) && (networkMapService is NetworkMapAddress)) {
val query = activeMQServer.queueQuery(NETWORK_MAP_ADDRESS)
if (!query.isExists) {
activeMQServer.createQueue(NETWORK_MAP_ADDRESS, NETWORK_MAP_ADDRESS, null, true, false)
}
maybeDeployBridgeForAddress(NETWORK_MAP_ADDRESS, networkMapService)
fun bridgeToNetworkMapService(networkMapService: NetworkMapAddress) {
val query = activeMQServer.queueQuery(NETWORK_MAP_ADDRESS)
if (!query.isExists) {
activeMQServer.createQueue(NETWORK_MAP_ADDRESS, NETWORK_MAP_ADDRESS, null, true, false)
}
maybeDeployBridgeForAddress(networkMapService)
}
private fun onNetworkChange(change: NetworkMapCache.MapChange) {
val address = change.node.address
if (address is ArtemisMessagingComponent.ArtemisAddress) {
val queueName = address.queueName
when (change.type) {
NetworkMapCache.MapChangeType.Added -> {
val query = activeMQServer.queueQuery(queueName)
if (query.isExists) {
// Queue exists so now wire up bridge
maybeDeployBridgeForAddress(queueName, change.node.address)
}
}
private fun destroyPossibleStaleBridge(change: NetworkMapCache.MapChange) {
fun removePreviousBridge() {
(change.prevNodeInfo?.address as? ArtemisAddress)?.let {
maybeDestroyBridge(it.queueName)
}
}
NetworkMapCache.MapChangeType.Modified -> {
(change.prevNodeInfo?.address as? ArtemisMessagingComponent.ArtemisAddress)?.let {
// remove any previous possibly different bridge
maybeDestroyBridge(it.queueName)
}
val query = activeMQServer.queueQuery(queueName)
if (query.isExists) {
// Deploy new bridge
maybeDeployBridgeForAddress(queueName, change.node.address)
}
}
NetworkMapCache.MapChangeType.Removed -> {
(change.prevNodeInfo?.address as? ArtemisMessagingComponent.ArtemisAddress)?.let {
// Remove old bridge
maybeDestroyBridge(it.queueName)
}
// just in case of NetworkMapCache version issues
maybeDestroyBridge(queueName)
}
if (change.type == MapChangeType.Modified) {
removePreviousBridge()
} else if (change.type == MapChangeType.Removed) {
removePreviousBridge()
// TODO Fix the network map change data classes so that the remove event doesn't have two NodeInfo fields
val address = change.node.address
if (address is ArtemisAddress) {
maybeDestroyBridge(address.queueName)
}
}
}
@ -142,71 +130,75 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private fun configureAndStartServer() {
val config = createArtemisConfig()
val securityManager = createArtemisSecurityManager()
activeMQServer = ActiveMQServerImpl(config, securityManager).apply {
// Throw any exceptions which are detected during startup
registerActivationFailureListener { exception -> throw exception }
// Some types of queue might need special preparation on our side, like dialling back or preparing
// a lazily initialised subsystem.
registerPostQueueCreationCallback { queueName ->
log.debug("Queue created: $queueName")
if (queueName.startsWith(PEERS_PREFIX) && queueName != NETWORK_MAP_ADDRESS && queueName != myQueueName) {
try {
val identity = parseKeyFromQueueName(queueName.toString())
val nodeInfo = networkMapCache.getNodeByCompositeKey(identity)
if (nodeInfo != null) {
maybeDeployBridgeForAddress(queueName, nodeInfo.address)
} else {
log.error("Queue created for a peer that we don't know from the network map: $queueName")
}
} catch (e: AddressFormatException) {
log.error("Flow violation: Could not parse queue name as Base 58: $queueName")
}
}
}
registerPostQueueDeletionCallback { address, qName ->
if (qName == address)
log.debug("Queue deleted: $qName")
else
log.debug("Queue deleted: $qName for $address")
}
registerPostQueueCreationCallback { deployBridgeFromNewPeerQueue(it) }
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
}
activeMQServer.start()
printBasicNodeInfo("Node listening on address", myHostPort.toString())
}
private fun deployBridgeFromNewPeerQueue(queueName: SimpleString) {
log.debug { "Queue created: $queueName" }
if (!queueName.startsWith(PEERS_PREFIX)) return
try {
val identity = CompositeKey.parseFromBase58(queueName.substring(PEERS_PREFIX.length))
val nodeInfo = networkMapCache.getNodeByCompositeKey(identity)
if (nodeInfo != null) {
val address = nodeInfo.address
if (address is NodeAddress) {
maybeDeployBridgeForAddress(address)
} else {
log.error("Don't know how to deal with $address")
}
} else {
log.error("Queue created for a peer that we don't know from the network map: $queueName")
}
} catch (e: AddressFormatException) {
log.error("Flow violation: Could not parse queue name as Base 58: $queueName")
}
}
private fun createArtemisConfig(): Configuration = ConfigurationImpl().apply {
val artemisDir = config.basedir / "artemis"
bindingsDirectory = (artemisDir / "bindings").toString()
journalDirectory = (artemisDir / "journal").toString()
largeMessagesDirectory = (artemisDir / "largemessages").toString()
acceptorConfigurations = setOf(
tcpTransport(ConnectionDirection.INBOUND, "0.0.0.0", myHostPort.port)
)
largeMessagesDirectory = (artemisDir / "large-messages").toString()
acceptorConfigurations = setOf(tcpTransport(INBOUND, "0.0.0.0", myHostPort.port))
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
isPersistIDCache = true
isPopulateValidatedUser = true
configureQueueSecurity()
managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS)
// Artemis allows multiple servers to be grouped together into a cluster for load balancing purposes. The cluster
// user is used for connecting the nodes together. It has super-user privileges and so it's imperative that its
// password is changed from the default (as warned in the docs). Since we don't need this feature we turn it off
// by having its password be an unknown securely random 128-bit value.
clusterPassword = BigInteger(128, newSecureRandom()).toString(16)
configureAddressSecurity()
}
private fun ConfigurationImpl.configureQueueSecurity() {
val nodeRPCSendRole = restrictedRole(NODE_USER, send = true) // The node needs to be able to send responses on the client queues
/**
* Authenticated clients connecting to us fall in one of three groups:
* 1. The node hosting us and any of its logically connected components. These are given full access to all valid queues.
* 2. Peers on the same network as us. These are only given permission to send to our P2P inbound queue.
* 3. RPC users. These are only given sufficient access to perform RPC with us.
*/
private fun ConfigurationImpl.configureAddressSecurity() {
val nodeInternalRole = Role(NODE_ROLE, true, true, true, true, true, true, true, true)
securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node
securityRoles[P2P_QUEUE] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true))
securityRoles[RPC_REQUESTS_QUEUE] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true))
for ((username) in userService.users) {
// Clients need to be able to consume the responses on their queues, and they're also responsible for creating and destroying them
val clientRole = restrictedRole(username, consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true)
securityRoles["$CLIENTS_PREFIX$username.rpc.*"] = setOf(nodeRPCSendRole, clientRole)
securityRoles["$CLIENTS_PREFIX$username.rpc.*"] = setOf(
nodeInternalRole,
restrictedRole("$CLIENTS_PREFIX$username", consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true))
}
// TODO Restrict this down to just what the node needs
securityRoles["#"] = setOf(Role(NODE_USER, true, true, true, true, true, true, true, true))
securityRoles[RPC_REQUESTS_QUEUE] = setOf(
restrictedRole(NODE_USER, createNonDurableQueue = true, deleteNonDurableQueue = true),
restrictedRole(RPC_REQUESTS_QUEUE, send = true)) // Clients need to be able to send their requests
}
private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false,
@ -217,14 +209,22 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
}
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
val ourRootCAPublicKey = X509Utilities
.loadCertificateFromKeyStore(config.trustStorePath, config.trustStorePassword, CORDA_ROOT_CA)
.publicKey
val ourPublicKey = X509Utilities
.loadCertificateFromKeyStore(config.keyStorePath, config.keyStorePassword, CORDA_CLIENT_CA)
.publicKey
val securityConfig = object : SecurityConfiguration() {
// Override to make it work with our login module
override fun getAppConfigurationEntry(name: String): Array<AppConfigurationEntry> {
val options = mapOf(RPCUserService::class.java.name to userService)
val options = mapOf(
RPCUserService::class.java.name to userService,
CORDA_ROOT_CA to ourRootCAPublicKey,
CORDA_CLIENT_CA to ourPublicKey)
return arrayOf(AppConfigurationEntry(name, REQUIRED, options))
}
}
return ActiveMQJAASSecurityManager(NodeLoginModule::class.java.name, securityConfig)
}
@ -232,39 +232,39 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private fun addConnector(hostAndPort: HostAndPort) = activeMQServer.configuration.addConnectorConfiguration(
hostAndPort.toString(),
tcpTransport(
ConnectionDirection.OUTBOUND,
hostAndPort.hostText,
hostAndPort.port
)
tcpTransport(OUTBOUND, hostAndPort.hostText, hostAndPort.port)
)
private fun bridgeExists(name: SimpleString) = activeMQServer.clusterManager.bridges.containsKey(name.toString())
private fun deployBridge(hostAndPort: HostAndPort, name: String) {
activeMQServer.deployBridge(BridgeConfiguration().apply {
setName(name)
queueName = name
forwardingAddress = name
staticConnectors = listOf(hostAndPort.toString())
confirmationWindowSize = 100000 // a guess
isUseDuplicateDetection = true // Enable the bridges automatic deduplication logic
})
private fun maybeDeployBridgeForAddress(address: ArtemisAddress) {
if (!connectorExists(address.hostAndPort)) {
addConnector(address.hostAndPort)
}
if (!bridgeExists(address.queueName)) {
deployBridge(address)
}
}
/**
* For every queue created we need to have a bridge deployed in case the address of the queue
* is that of a remote party.
* All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving
* messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it,
* as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
* P2P address.
*/
private fun maybeDeployBridgeForAddress(name: SimpleString, nodeInfo: SingleMessageRecipient) {
require(name.startsWith(PEERS_PREFIX))
val hostAndPort = toHostAndPort(nodeInfo)
if (hostAndPort == myHostPort)
return
if (!connectorExists(hostAndPort))
addConnector(hostAndPort)
if (!bridgeExists(name))
deployBridge(hostAndPort, name.toString())
private fun deployBridge(address: ArtemisAddress) {
activeMQServer.deployBridge(BridgeConfiguration().apply {
name = address.queueName.toString()
queueName = address.queueName.toString()
forwardingAddress = P2P_QUEUE
staticConnectors = listOf(address.hostAndPort.toString())
confirmationWindowSize = 100000 // a guess
isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic
// As a peer of the target node we must connect to it using the peer user. Actual authentication is done using
// our TLS certificate.
user = PEER_USER
password = PEER_USER
})
}
private fun maybeDestroyBridge(name: SimpleString) {
@ -273,52 +273,89 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
}
}
// We could have used the built-in PropertiesLoginModule but that exposes a roles properties file. Roles are used
// for queue access control and our RPC users must only have access to the queues they need and this cannot be allowed
// to be modified.
/**
* Clients must connect to us with a username and password and must use TLS. If a someone connects with
* [ArtemisMessagingComponent.NODE_USER] then we confirm it's just us as the node by checking their TLS certificate
* is the same as our one in our key store. Then they're given full access to all valid queues. If they connect with
* [ArtemisMessagingComponent.PEER_USER] then we confirm they belong on our P2P network by checking their root CA is
* the same as our root CA. If that's the case the only access they're given is the ablility send to our P2P address.
* In both cases the messages these authenticated nodes send to us are tagged with their subject DN and we assume
* the CN within that is their legal name.
* Otherwise if the username is neither of the above we assume it's an RPC user and authenticate against our list of
* valid RPC users. RPC clients are given permission to perform RPC and nothing else.
*/
class NodeLoginModule : LoginModule {
companion object {
const val NODE_USER = "Node"
// Include forbidden username character to prevent name clash with any RPC usernames
const val PEER_ROLE = "SystemRoles/Peer"
const val NODE_ROLE = "SystemRoles/Node"
const val RPC_ROLE = "SystemRoles/RPC"
}
private var loginSucceeded: Boolean = false
private lateinit var subject: Subject
private lateinit var callbackHandler: CallbackHandler
private lateinit var userService: RPCUserService
private lateinit var ourRootCAPublicKey: PublicKey
private lateinit var ourPublicKey: PublicKey
private val principals = ArrayList<Principal>()
override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map<String, *>, options: Map<String, *>) {
this.subject = subject
this.callbackHandler = callbackHandler
userService = options[RPCUserService::class.java.name] as RPCUserService
ourRootCAPublicKey = options[CORDA_ROOT_CA] as PublicKey
ourPublicKey = options[CORDA_CLIENT_CA] as PublicKey
}
override fun login(): Boolean {
val nameCallback = NameCallback("Username: ")
val passwordCallback = PasswordCallback("Password: ", false)
val certificateCallback = CertificateCallback()
try {
callbackHandler.handle(arrayOf(nameCallback, passwordCallback))
callbackHandler.handle(arrayOf(nameCallback, passwordCallback, certificateCallback))
} catch (e: IOException) {
throw LoginException(e.message)
} catch (e: UnsupportedCallbackException) {
throw LoginException("${e.message} not available to obtain information from user")
}
val username = nameCallback.name ?: throw FailedLoginException("User name is null")
val receivedPassword = passwordCallback.password ?: throw FailedLoginException("Password is null")
val password = if (username == NODE_USER) "Node" else userService.getUser(username)?.password ?: throw FailedLoginException("User does not exist")
if (password != String(receivedPassword)) {
throw FailedLoginException("Password does not match")
val username = nameCallback.name ?: throw FailedLoginException("Username not provided")
val password = String(passwordCallback.password ?: throw FailedLoginException("Password not provided"))
val validatedUser = if (username == PEER_USER || username == NODE_USER) {
val certificates = certificateCallback.certificates ?: throw FailedLoginException("No TLS?")
val peerCertificate = certificates.first()
val role = if (username == NODE_USER) {
if (peerCertificate.publicKey != ourPublicKey) {
throw FailedLoginException("Only the node can login as $NODE_USER")
}
NODE_ROLE
} else {
val theirRootCAPublicKey = certificates.last().publicKey
if (theirRootCAPublicKey != ourRootCAPublicKey) {
throw FailedLoginException("Peer does not belong on our network. Their root CA: $theirRootCAPublicKey")
}
PEER_ROLE // This enables the peer to send to our P2P address
}
principals += RolePrincipal(role)
peerCertificate.subjectDN.name
} else {
// Otherwise assume they're an RPC user
val rpcUser = userService.getUser(username) ?: throw FailedLoginException("User does not exist")
if (password != rpcUser.password) {
// TODO Switch to hashed passwords
// TODO Retrieve client IP address to include in exception message
throw FailedLoginException("Password for user $username does not match")
}
principals += RolePrincipal(RPC_ROLE) // This enables the RPC client to send requests
principals += RolePrincipal("$CLIENTS_PREFIX$username") // This enables the RPC client to receive responses
username
}
principals += UserPrincipal(username)
principals += RolePrincipal(username) // The roles are configured using the usernames
if (username != NODE_USER) {
principals += RolePrincipal(RPC_REQUESTS_QUEUE) // This enables the RPC client to send requests
}
principals += UserPrincipal(validatedUser)
loginSucceeded = true
return loginSucceeded
@ -347,5 +384,4 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
loginSucceeded = false
}
}
}

View File

@ -19,6 +19,7 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
@ -56,15 +57,16 @@ class NodeMessagingClient(override val config: NodeConfiguration,
val database: Database,
val networkMapRegistrationFuture: ListenableFuture<Unit>) : ArtemisMessagingComponent(), MessagingServiceInternal {
companion object {
val log = loggerFor<NodeMessagingClient>()
private val log = loggerFor<NodeMessagingClient>()
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
// that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid
// confusion.
val TOPIC_PROPERTY = "platform-topic"
const val TOPIC_PROPERTY = "platform-topic"
const val SESSION_ID_PROPERTY = "session-id"
val SESSION_ID_PROPERTY = "session-id"
const val RPC_QUEUE_REMOVALS_QUEUE = "rpc.qremovals"
/**
* This should be the only way to generate an ArtemisAddress and that only of the remote NetworkMapService node.
@ -77,11 +79,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
private class InnerState {
var started = false
var running = false
val knownQueues = mutableSetOf<SimpleString>()
var producer: ClientProducer? = null
var p2pConsumer: ClientConsumer? = null
var session: ClientSession? = null
var clientFactory: ClientSessionFactory? = null
var rpcDispatcher: RPCDispatcher? = null
// Consumer for inbound client RPC messages.
var rpcConsumer: ClientConsumer? = null
var rpcNotificationConsumer: ClientConsumer? = null
@ -89,12 +91,12 @@ class NodeMessagingClient(override val config: NodeConfiguration,
/** A registration to handle messages of different types */
data class Handler(val topicSession: TopicSession,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
/**
* Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache.
*/
override val myAddress: SingleMessageRecipient = toMyAddress(myIdentity, serverHostPort)
override val myAddress: SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, serverHostPort) else NetworkMapAddress(serverHostPort)
private val state = ThreadBox(InnerState())
private val handlers = CopyOnWriteArrayList<Handler>()
@ -104,13 +106,12 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(
object : AbstractJDBCHashSet<UUID, Table>(Table, loadOnInit = true) {
override fun elementFromRow(row: ResultRow): UUID = row[table.uuid]
override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry
}
})
object : AbstractJDBCHashSet<UUID, Table>(Table, loadOnInit = true) {
override fun elementFromRow(row: ResultRow): UUID = row[table.uuid]
override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) {
insert[table.uuid] = entry
}
})
fun start(rpcOps: RPCOps, userService: RPCUserService) {
state.locked {
@ -118,14 +119,15 @@ class NodeMessagingClient(override val config: NodeConfiguration,
started = true
log.info("Connecting to server: $serverHostPort")
// Connect to our server. TODO: This should use the in-VM transport.
val tcpTransport = tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port)
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
clientFactory = locator.createSessionFactory()
// Create a session. Note that the acknowledgement of messages is not flushed to
// the Artermis journal until the default buffer size of 1MB is acknowledged.
val session = clientFactory!!.createSession("Node", "Node", false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
// Login using the node username. The broker will authentiate us as its node (as opposed to another peer)
// using our TLS certificate.
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
// size of 1MB is acknowledged.
val session = clientFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
this.session = session
session.start()
@ -133,22 +135,17 @@ class NodeMessagingClient(override val config: NodeConfiguration,
producer = session.createProducer()
// Create a queue, consumer and producer for handling P2P network messages.
val queueName = toQueueName(myAddress)
val query = session.queueQuery(queueName)
if (!query.isExists) {
session.createQueue(queueName, queueName, true)
}
knownQueues.add(queueName)
p2pConsumer = makeConsumer(session, queueName, true)
createQueueIfAbsent(SimpleString(P2P_QUEUE))
p2pConsumer = makeP2PConsumer(session, true)
networkMapRegistrationFuture.success {
state.locked {
log.info("Network map is complete, so removing filter from Artemis consumer.")
log.info("Network map is complete, so removing filter from P2P consumer.")
try {
p2pConsumer!!.close()
} catch(e: ActiveMQObjectClosedException) {
// Ignore it: this can happen if the server has gone away before we do.
}
p2pConsumer = makeConsumer(session, queueName, false)
p2pConsumer = makeP2PConsumer(session, false)
}
}
@ -156,10 +153,12 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// bridge) and those clients must have authenticated. We could use a single consumer for everything
// and perhaps we should, but these queues are not worth persisting.
session.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE)
session.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 1")
// The custom name for the queue is intentional - we may wish other things to subscribe to the
// NOTIFICATIONS_ADDRESS with different filters in future
session.createTemporaryQueue(NOTIFICATIONS_ADDRESS, RPC_QUEUE_REMOVALS_QUEUE, "_AMQ_NotifType = 1")
rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE)
rpcNotificationConsumer = session.createConsumer("rpc.qremovals")
dispatcher = createRPCDispatcher(rpcOps, userService)
rpcNotificationConsumer = session.createConsumer(RPC_QUEUE_REMOVALS_QUEUE)
rpcDispatcher = createRPCDispatcher(rpcOps, userService)
}
}
@ -168,13 +167,13 @@ class NodeMessagingClient(override val config: NodeConfiguration,
* the original and make another without a filter. We do this so that there is a network map in place for all other
* message handlers.
*/
private fun makeConsumer(session: ClientSession, queueName: SimpleString, networkMapOnly: Boolean): ClientConsumer {
private fun makeP2PConsumer(session: ClientSession, networkMapOnly: Boolean): ClientConsumer {
return if (networkMapOnly) {
// Filter for just the network map messages.
val messageFilter = SimpleString("hyphenated_props:$TOPIC_PROPERTY like 'platform.network_map.%'")
session.createConsumer(queueName, messageFilter)
val messageFilter = "hyphenated_props:$TOPIC_PROPERTY like 'platform.network_map.%'"
session.createConsumer(P2P_QUEUE, messageFilter)
} else
session.createConsumer(queueName)
session.createConsumer(P2P_QUEUE)
}
private var shutdownLatch = CountDownLatch(1)
@ -194,7 +193,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
null
} ?: return false
val message: Message? = artemisToCordaMessage(artemisMessage)
val message: ReceivedMessage? = artemisToCordaMessage(artemisMessage)
if (message != null)
deliver(message)
@ -217,11 +216,10 @@ class NodeMessagingClient(override val config: NodeConfiguration,
private fun runPreNetworkMap() {
val consumer = state.locked {
check(started)
check(started) { "start must be called first" }
check(!running) { "run can't be called twice" }
running = true
// Optionally, start RPC dispatch.
dispatcher?.start(rpcConsumer!!, rpcNotificationConsumer!!, executor)
rpcDispatcher!!.start(rpcConsumer!!, rpcNotificationConsumer!!, executor)
p2pConsumer!!
}
@ -254,7 +252,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
shutdownLatch.countDown()
}
private fun artemisToCordaMessage(message: ClientMessage): Message? {
private fun artemisToCordaMessage(message: ClientMessage): ReceivedMessage? {
try {
if (!message.containsProperty(TOPIC_PROPERTY)) {
log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
@ -268,17 +266,18 @@ class NodeMessagingClient(override val config: NodeConfiguration,
val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
// Use the magic deduplication property built into Artemis as our message identity too
val uuid = UUID.fromString(message.getStringProperty(HDR_DUPLICATE_DETECTION_ID))
val user = message.getStringProperty(HDR_VALIDATED_USER)
val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" }
log.info("Received message from: ${message.address} user: $user topic: $topic sessionID: $sessionID uuid: $uuid")
val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
val msg = object : Message {
val msg = object : ReceivedMessage {
override val topicSession = TopicSession(topic, sessionID)
override val data: ByteArray = body
override val peer: X500Name = X500Name(user)
override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
override val uniqueMessageId: UUID = uuid
override fun toString() = topic + "#" + data.opaque()
override fun toString() = "$topic#${data.opaque()}"
}
return msg
@ -288,7 +287,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
private fun deliver(msg: Message): Boolean {
private fun deliver(msg: ReceivedMessage): Boolean {
state.checkNotLocked()
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
// or removed whilst the filter is executing will not affect anything.
@ -324,7 +323,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
return true
}
private fun callHandlers(msg: Message, deliverTo: List<Handler>) {
private fun callHandlers(msg: ReceivedMessage, deliverTo: List<Handler>) {
for (handler in deliverTo) {
handler.callback(msg, handler)
}
@ -368,40 +367,54 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
override fun send(message: Message, target: MessageRecipients) {
val queueName = toQueueName(target)
state.locked {
val mqAddress = getMQAddress(target)
val artemisMessage = session!!.createMessage(true).apply {
val sessionID = message.topicSession.sessionID
putStringProperty(TOPIC_PROPERTY, message.topicSession.topic)
putLongProperty(SESSION_ID_PROPERTY, sessionID)
writeBodyBufferBytes(message.data)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString()))
}
if (knownQueues.add(queueName)) {
maybeCreateQueue(queueName)
}
log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}")
producer!!.send(queueName, artemisMessage)
log.info("Send to: $mqAddress topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}")
producer!!.send(mqAddress, artemisMessage)
}
}
private fun maybeCreateQueue(queueName: SimpleString) {
private fun getMQAddress(target: MessageRecipients): SimpleString {
return if (target == myAddress) {
// If we are sending to ourselves then route the message directly to our P2P queue.
SimpleString(P2P_QUEUE)
} else {
// Otherwise we send the message to an internal queue for the target residing on our broker. It's then the
// broker's job to route the message to the target's P2P queue.
val internalTargetQueue = (target as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address")
createQueueIfAbsent(internalTargetQueue)
internalTargetQueue
}
}
/** Attempts to create a durable queue on the broker which is bound to an address of the same name. */
private fun createQueueIfAbsent(queueName: SimpleString) {
state.alreadyLocked {
val queueQuery = session!!.queueQuery(queueName)
if (!queueQuery.isExists) {
log.info("Create fresh queue $queueName")
session!!.createQueue(queueName, queueName, true /* durable */)
log.info("Create fresh queue $queueName bound on same address")
session!!.createQueue(queueName, queueName, true)
}
}
}
override fun addMessageHandler(topic: String, sessionID: Long, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
= addMessageHandler(TopicSession(topic, sessionID), callback)
override fun addMessageHandler(topic: String,
sessionID: Long,
callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
return addMessageHandler(TopicSession(topic, sessionID), callback)
}
override fun addMessageHandler(topicSession: TopicSession,
callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." }
val handler = Handler(topicSession, callback)
handlers.add(handler)
@ -423,8 +436,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
var dispatcher: RPCDispatcher? = null
private fun createRPCDispatcher(ops: RPCOps, userService: RPCUserService) = object : RPCDispatcher(ops, userService) {
override fun send(data: SerializedBytes<*>, toAddress: String) {
state.locked {

View File

@ -143,8 +143,8 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService) {
/** Convert an Artemis [ClientMessage] to a MQ-neutral [ClientRPCRequestMessage]. */
private fun ClientMessage.toRPCRequestMessage(): ClientRPCRequestMessage {
val user = getUser(this)
val replyTo = getAuthenticatedAddress(user, ClientRPCRequestMessage.REPLY_TO, true)!!
val observationsTo = getAuthenticatedAddress(user, ClientRPCRequestMessage.OBSERVATIONS_TO, false)
val replyTo = getReturnAddress(user, ClientRPCRequestMessage.REPLY_TO, true)!!
val observationsTo = getReturnAddress(user, ClientRPCRequestMessage.OBSERVATIONS_TO, false)
val argBytes = ByteArray(bodySize).apply { bodyBuffer.readBytes(this) }
if (argBytes.isEmpty()) {
throw RPCException("empty serialized args")
@ -158,12 +158,11 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService) {
return userService.getUser(message.requiredString(Message.HDR_VALIDATED_USER.toString()))!!
}
private fun ClientMessage.getAuthenticatedAddress(user: User, property: String, required: Boolean): String? {
val address: String? = if (required) requiredString(property) else getStringProperty(property)
val expectedAddressPrefix = "${ArtemisMessagingComponent.CLIENTS_PREFIX}${user.username}."
if (address != null && !address.startsWith(expectedAddressPrefix)) {
throw RPCException("$property address does not match up with the user")
private fun ClientMessage.getReturnAddress(user: User, property: String, required: Boolean): String? {
return if (containsProperty(property)) {
"${ArtemisMessagingComponent.CLIENTS_PREFIX}${user.username}.rpc.${getLongProperty(property)}"
} else {
if (required) throw RPCException("missing $property property") else null
}
return address
}
}

View File

@ -196,7 +196,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val session = FlowSession(sessionFlow, nodeIdentity, random63BitValue(), null)
openSessions[Pair(sessionFlow, nodeIdentity)] = session
val counterpartyFlow = sessionFlow.getCounterpartyMarker(nodeIdentity).name
val sessionInit = SessionInit(session.ourSessionId, serviceHub.myInfo.legalIdentity, counterpartyFlow, firstPayload)
val sessionInit = SessionInit(session.ourSessionId, counterpartyFlow, firstPayload)
val sessionInitResponse = sendAndReceiveInternal<SessionInitResponse>(session, sessionInit)
if (sessionInitResponse is SessionConfirm) {
session.otherPartySessionId = sessionInitResponse.initiatedSessionId

View File

@ -211,7 +211,18 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
val sessionMessage = message.data.deserialize<SessionMessage>()
when (sessionMessage) {
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage)
is SessionInit -> onSessionInit(sessionMessage)
is SessionInit -> {
// TODO SECURITY Look up the party with the full X.500 name instead of just the legal name which
// isn't required to be unique
// TODO For now have the doorman block signups with identical names, and names with characters that
// are used in X.500 name textual serialisation
val otherParty = serviceHub.networkMapCache.getNodeByLegalName(message.peerLegalName)?.legalIdentity
if (otherParty != null) {
onSessionInit(sessionMessage, otherParty)
} else {
logger.error("Unknown peer ${message.peer} in $sessionMessage")
}
}
}
}
}
@ -254,10 +265,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun onSessionInit(sessionInit: SessionInit) {
logger.trace { "Received $sessionInit" }
//TODO Verify the other party are who they say they are from the TLS subsystem
val otherParty = sessionInit.initiatorParty
private fun onSessionInit(sessionInit: SessionInit, otherParty: Party) {
logger.trace { "Received $sessionInit $otherParty" }
val otherPartySessionId = sessionInit.initiatorSessionId
try {
val markerClass = Class.forName(sessionInit.flowName)
@ -446,10 +455,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
val recipientSessionId: Long
}
data class SessionInit(val initiatorSessionId: Long,
val initiatorParty: Party,
val flowName: String,
val firstPayload: Any?) : SessionMessage
data class SessionInit(val initiatorSessionId: Long, val flowName: String, val firstPayload: Any?) : SessionMessage
interface SessionInitResponse : ExistingSessionMessage

View File

@ -1,5 +1,3 @@
@file:Suppress("UNUSED_VARIABLE")
package net.corda.node.messaging
import net.corda.core.messaging.Message
@ -9,7 +7,6 @@ import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.node.services.ServiceInfo
import net.corda.node.services.network.NetworkMapService
import net.corda.testing.node.MockNetwork
import org.junit.Before
import org.junit.Test
import java.util.*
import kotlin.test.assertEquals
@ -17,12 +14,7 @@ import kotlin.test.assertFails
import kotlin.test.assertTrue
class InMemoryMessagingTests {
lateinit var network: MockNetwork
@Before
fun setUp() {
network = MockNetwork()
}
val network = MockNetwork()
@Test
fun topicStringValidation() {
@ -115,6 +107,5 @@ class InMemoryMessagingTests {
node2.net.send(validMessage2, node1.net.myAddress)
network.runNetwork()
assertEquals(2, received)
}
}

View File

@ -13,17 +13,19 @@ import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.utilities.LogHelper
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.messaging.RPCOps
import net.corda.node.services.network.InMemoryNetworkMapCache
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.node.utilities.configureDatabase
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.freeLocalHostAndPort
import net.corda.testing.node.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.jetbrains.exposed.sql.Database
import org.junit.After
@ -39,7 +41,6 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue
class ArtemisMessagingTests {
@Rule @JvmField val temporaryFolder = TemporaryFolder()
@ -57,7 +58,6 @@ class ArtemisMessagingTests {
var messagingClient: NodeMessagingClient? = null
var messagingServer: ArtemisMessagingServer? = null
val networkMapCache = InMemoryNetworkMapCache()
val rpcOps = object : RPCOps {
@ -196,7 +196,7 @@ class ArtemisMessagingTests {
createAndStartClientAndServer(receivedMessages)
for (iter in 1..iterations) {
val firstActual: Message = receivedMessages.take()
assertTrue(String(firstActual.data).equals("first msg $iter"))
assertThat(String(firstActual.data)).isEqualTo("first msg $iter")
}
assertNull(receivedMessages.poll(200, MILLISECONDS))
}
@ -223,16 +223,22 @@ class ArtemisMessagingTests {
private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient {
return databaseTransaction(database) {
NodeMessagingClient(config, server, identity.public.composite, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), database, networkMapRegistrationFuture).apply {
configureWithDevSSLCertificate()
NodeMessagingClient(
config,
server,
identity.public.composite,
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
database,
networkMapRegistrationFuture).apply {
config.configureWithDevSSLCertificate()
messagingClient = this
}
}
}
private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer {
return ArtemisMessagingServer(config, local, identity.public.composite, networkMapCache, userService).apply {
configureWithDevSSLCertificate()
return ArtemisMessagingServer(config, local, networkMapCache, userService).apply {
config.configureWithDevSSLCertificate()
messagingServer = this
}
}

View File

@ -197,14 +197,14 @@ class StateMachineManagerTests {
assertThat(node3Flow.receivedPayloads[0]).isEqualTo(payload)
assertSessionTransfers(node2,
node1 sent sessionInit(node1, SendFlow::class, payload) to node2,
node1 sent sessionInit(SendFlow::class, payload) to node2,
node2 sent sessionConfirm() to node1,
node1 sent sessionEnd() to node2
//There's no session end from the other flows as they're manually suspended
)
assertSessionTransfers(node3,
node1 sent sessionInit(node1, SendFlow::class, payload) to node3,
node1 sent sessionInit(SendFlow::class, payload) to node3,
node3 sent sessionConfirm() to node1,
node1 sent sessionEnd() to node3
//There's no session end from the other flows as they're manually suspended
@ -230,14 +230,14 @@ class StateMachineManagerTests {
assertThat(multiReceiveFlow.receivedPayloads[1]).isEqualTo(node3Payload)
assertSessionTransfers(node2,
node1 sent sessionInit(node1, ReceiveThenSuspendFlow::class) to node2,
node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node2,
node2 sent sessionConfirm() to node1,
node2 sent sessionData(node2Payload) to node1,
node2 sent sessionEnd() to node1
)
assertSessionTransfers(node3,
node1 sent sessionInit(node1, ReceiveThenSuspendFlow::class) to node3,
node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node3,
node3 sent sessionConfirm() to node1,
node3 sent sessionData(node3Payload) to node1,
node3 sent sessionEnd() to node1
@ -251,7 +251,7 @@ class StateMachineManagerTests {
net.runNetwork()
assertSessionTransfers(
node1 sent sessionInit(node1, PingPongFlow::class, 10L) to node2,
node1 sent sessionInit(PingPongFlow::class, 10L) to node2,
node2 sent sessionConfirm() to node1,
node2 sent sessionData(20L) to node1,
node1 sent sessionData(11L) to node2,
@ -267,7 +267,7 @@ class StateMachineManagerTests {
net.runNetwork()
assertThatThrownBy { future.getOrThrow() }.isInstanceOf(FlowSessionException::class.java)
assertSessionTransfers(
node1 sent sessionInit(node1, ReceiveThenSuspendFlow::class) to node2,
node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node2,
node2 sent sessionConfirm() to node1,
node2 sent sessionEnd() to node1
)
@ -288,9 +288,7 @@ class StateMachineManagerTests {
return smm.findStateMachines(P::class.java).single()
}
private fun sessionInit(initiatorNode: MockNode, flowMarker: KClass<*>, payload: Any? = null): SessionInit {
return SessionInit(0, initiatorNode.info.legalIdentity, flowMarker.java.name, payload)
}
private fun sessionInit(flowMarker: KClass<*>, payload: Any? = null) = SessionInit(0, flowMarker.java.name, payload)
private fun sessionConfirm() = SessionConfirm(0, 0)
@ -314,7 +312,7 @@ class StateMachineManagerTests {
private fun Observable<MessageTransfer>.toSessionTransfers(): Observable<SessionTransfer> {
return filter { it.message.topicSession == StateMachineManager.sessionTopic }.map {
val from = it.sender.myAddress.id
val from = it.sender.id
val message = it.message.data.deserialize<SessionMessage>()
val to = (it.recipients as InMemoryMessagingNetwork.Handle).id
SessionTransfer(from, sanitise(message), to)
@ -371,7 +369,6 @@ class StateMachineManagerTests {
@Suspendable
override fun call() {
receivedPayloads = otherParties.map { receive<Any>(it).unwrap { it } }
println(receivedPayloads)
Fiber.park()
}
}
@ -384,9 +381,7 @@ class StateMachineManagerTests {
@Suspendable
override fun call() {
receivedPayload = sendAndReceive<Long>(otherParty, payload).unwrap { it }
println("${fsm.id} Received $receivedPayload")
receivedPayload2 = sendAndReceive<Long>(otherParty, payload + 1).unwrap { it }
println("${fsm.id} Received $receivedPayload2")
}
}

View File

@ -109,7 +109,7 @@ class NetworkMapVisualiser : Application() {
}
// Fire the message bullets between nodes.
simulation.network.messagingNetwork.sentMessages.observeOn(uiThread).subscribe { msg: InMemoryMessagingNetwork.MessageTransfer ->
val senderNode: MockNetwork.MockNode = simulation.network.addressToNode(msg.sender.myAddress)
val senderNode: MockNetwork.MockNode = simulation.network.addressToNode(msg.sender)
val destNode: MockNetwork.MockNode = simulation.network.addressToNode(msg.recipients as SingleMessageRecipient)
if (transferIsInteresting(msg)) {
@ -345,7 +345,7 @@ class NetworkMapVisualiser : Application() {
private fun transferIsInteresting(transfer: InMemoryMessagingNetwork.MessageTransfer): Boolean {
// Loopback messages are boring.
if (transfer.sender.myAddress == transfer.recipients) return false
if (transfer.sender == transfer.recipients) return false
// Network map push acknowledgements are boring.
if (NetworkMapService.PUSH_ACK_FLOW_TOPIC in transfer.message.topicSession.topic) return false
val message = transfer.message.data.deserialize<Any>()

View File

@ -0,0 +1,36 @@
package net.corda.testing.messaging
import com.google.common.net.HostAndPort
import net.corda.node.services.config.NodeSSLConfiguration
import net.corda.node.services.config.configureTestSSL
import net.corda.node.services.messaging.ArtemisMessagingComponent
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.OUTBOUND
import org.apache.activemq.artemis.api.core.client.*
/**
* As the name suggests this is a simple client for connecting to MQ brokers.
*/
class SimpleMQClient(val target: HostAndPort) : ArtemisMessagingComponent() {
override val config: NodeSSLConfiguration = configureTestSSL()
lateinit var sessionFactory: ClientSessionFactory
lateinit var session: ClientSession
lateinit var producer: ClientProducer
fun start(username: String? = null, password: String? = null) {
val tcpTransport = tcpTransport(OUTBOUND, target.hostText, target.port)
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
isBlockOnNonDurableSend = true
threadPoolMaxSize = 1
}
sessionFactory = locator.createSessionFactory()
session = sessionFactory.createSession(username, password, false, true, true, locator.isPreAcknowledge, locator.ackBatchSize)
session.start()
producer = session.createProducer()
}
fun createMessage(): ClientMessage = session.createMessage(false)
fun stop() {
sessionFactory.close()
}
}

View File

@ -5,6 +5,7 @@ import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.ThreadBox
import net.corda.core.getOrThrow
import net.corda.core.crypto.X509Utilities
import net.corda.core.messaging.*
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.trace
@ -14,6 +15,7 @@ import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.JDBCHashSet
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging
import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.Database
import org.slf4j.LoggerFactory
import rx.Observable
@ -43,8 +45,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
private var counter = 0 // -1 means stopped.
private val handleEndpointMap = HashMap<Handle, InMemoryMessaging>()
data class MessageTransfer(val sender: InMemoryMessaging, val message: Message, val recipients: MessageRecipients) {
override fun toString() = "${message.topicSession} from '${sender.myAddress}' to '$recipients'"
data class MessageTransfer(val sender: Handle, val message: Message, val recipients: MessageRecipients) {
override fun toString() = "${message.topicSession} from '$sender' to '$recipients'"
}
// All sent messages are kept here until pumpSend is called, or manuallyPumped is set to false
@ -85,8 +87,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
@Synchronized
fun createNode(manuallyPumped: Boolean,
executor: AffinityExecutor,
database: Database)
: Pair<Handle, MessagingServiceBuilder<InMemoryMessaging>> {
database: Database): Pair<Handle, MessagingServiceBuilder<InMemoryMessaging>> {
check(counter >= 0) { "In memory network stopped: please recreate." }
val builder = createNodeWithID(manuallyPumped, counter, executor, database = database) as Builder
counter++
@ -118,8 +119,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
@Synchronized
private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) {
val transfer = MessageTransfer(from, message, recipients)
messageSendQueue.add(transfer)
messageSendQueue += MessageTransfer(from.myAddress, message, recipients)
}
@Synchronized
@ -164,15 +164,13 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
// If block is set to true this function will only return once a message has been pushed onto the recipients' queues
fun pumpSend(block: Boolean): MessageTransfer? {
val transfer = (if (block) messageSendQueue.take() else messageSendQueue.poll()) ?: return null
val recipients = transfer.recipients
val from = transfer.sender.myAddress
log.trace { transfer.toString() }
val calc = latencyCalculator
if (calc != null && recipients is SingleMessageRecipient) {
if (calc != null && transfer.recipients is SingleMessageRecipient) {
val messageSent = SettableFuture.create<Unit>()
// Inject some artificial latency.
timer.schedule(calc.between(from, recipients).toMillis()) {
timer.schedule(calc.between(transfer.sender, transfer.recipients).toMillis()) {
pumpSendInternal(transfer)
messageSent.set(Unit)
}
@ -189,7 +187,6 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
fun pumpSendInternal(transfer: MessageTransfer) {
when (transfer.recipients) {
is Handle -> getQueueForHandle(transfer.recipients).add(transfer)
is AllPossibleRecipients -> {
// This means all possible recipients _that the network knows about at the time_, not literally everyone
// who joins into the indefinite future.
@ -214,14 +211,14 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
private val executor: AffinityExecutor,
private val database: Database) : SingletonSerializeAsToken(), MessagingServiceInternal {
inner class Handler(val topicSession: TopicSession,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
@Volatile
private var running = true
private inner class InnerState {
val handlers: MutableList<Handler> = ArrayList()
val pendingRedelivery = JDBCHashSet<Message>("pending_messages",loadOnInit = true)
val pendingRedelivery = JDBCHashSet<MessageTransfer>("pending_messages", loadOnInit = true)
}
private val state = ThreadBox(InnerState())
@ -240,23 +237,22 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
}
}
override fun addMessageHandler(topic: String, sessionID: Long, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
override fun addMessageHandler(topic: String, sessionID: Long, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration
= addMessageHandler(TopicSession(topic, sessionID), callback)
override fun addMessageHandler(topicSession: TopicSession, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
override fun addMessageHandler(topicSession: TopicSession, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
check(running)
val (handler, items) = state.locked {
val (handler, transfers) = state.locked {
val handler = Handler(topicSession, callback).apply { handlers.add(this) }
val pending = ArrayList<Message>()
val pending = ArrayList<MessageTransfer>()
databaseTransaction(database) {
pending.addAll(pendingRedelivery)
pendingRedelivery.clear()
}
Pair(handler, pending)
}
for (message in items) {
send(message, handle)
}
transfers.forEach { pumpSendInternal(it) }
return handler
}
@ -323,9 +319,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
while (deliverTo == null) {
val transfer = (if (block) q.take() else q.poll()) ?: return null
deliverTo = state.locked {
val h = handlers.filter { if (it.topicSession.isBlank()) true else transfer.message.topicSession == it.topicSession }
if (h.isEmpty()) {
val matchingHandlers = handlers.filter { it.topicSession.isBlank() || transfer.message.topicSession == it.topicSession }
if (matchingHandlers.isEmpty()) {
// Got no handlers for this message yet. Keep the message around and attempt redelivery after a new
// handler has been registered. The purpose of this path is to make unit tests that have multi-threading
// reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting
@ -333,11 +328,11 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
// least sometimes.
log.warn("Message to ${transfer.message.topicSession} could not be delivered")
databaseTransaction(database) {
pendingRedelivery.add(transfer.message)
pendingRedelivery.add(transfer)
}
null
} else {
h
matchingHandlers
}
}
if (deliverTo != null) {
@ -357,7 +352,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
databaseTransaction(database) {
for (handler in deliverTo) {
try {
handler.callback(transfer.message, handler)
handler.callback(transfer.toReceivedMessage(), handler)
} catch(e: Exception) {
log.error("Caught exception in handler for $this/${handler.topicSession}", e)
}
@ -371,5 +366,13 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
}
return transfer
}
private fun MessageTransfer.toReceivedMessage() = object : ReceivedMessage {
override val topicSession: TopicSession get() = message.topicSession
override val data: ByteArray get() = message.data
override val peer: X500Name get() = X509Utilities.getDevX509Name(sender.description)
override val debugTimestamp: Instant get() = message.debugTimestamp
override val uniqueMessageId: UUID get() = message.uniqueMessageId
}
}
}

View File

@ -0,0 +1,69 @@
package net.corda.testing.node
import net.corda.core.getOrThrow
import net.corda.node.internal.Node
import net.corda.node.services.User
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration
import net.corda.testing.freeLocalHostAndPort
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.rules.TemporaryFolder
import java.util.*
import kotlin.concurrent.thread
/**
* Extend this class if you need to run nodes in a test. You could use the driver DSL but it's extremely slow for testing
* purposes.
*/
abstract class NodeBasedTest {
@Rule
@JvmField
val tempFolder = TemporaryFolder()
private val nodes = ArrayList<Node>()
lateinit var networkMapNode: Node
@Before
fun startNetworkMapNode() {
networkMapNode = startNode("Network Map", emptyMap())
}
@After
fun stopNodes() {
nodes.forEach(Node::stop)
}
fun startNode(legalName: String, rpcUsers: List<User> = emptyList()): Node {
return startNode(legalName, mapOf(
"networkMapAddress" to networkMapNode.configuration.artemisAddress.toString(),
"rpcUsers" to rpcUsers.map { mapOf(
"user" to it.username,
"password" to it.password,
"permissions" to it.permissions)
}
))
}
private fun startNode(legalName: String, config: Map<String, Any>): Node {
val config = ConfigHelper.loadConfig(
baseDirectoryPath = tempFolder.newFolder(legalName).toPath(),
allowMissingConfig = true,
configOverrides = config + mapOf(
"myLegalName" to legalName,
"artemisAddress" to freeLocalHostAndPort().toString(),
"extraAdvertisedServiceIds" to ""
)
)
val node = FullNodeConfiguration(config).createNode()
node.start()
nodes += node
thread(name = legalName) {
node.run()
}
node.networkMapRegistrationFuture.getOrThrow()
return node
}
}