diff --git a/client/src/main/kotlin/net/corda/client/CordaRPCClient.kt b/client/src/main/kotlin/net/corda/client/CordaRPCClient.kt index e47dce195d..6f28a20ffc 100644 --- a/client/src/main/kotlin/net/corda/client/CordaRPCClient.kt +++ b/client/src/main/kotlin/net/corda/client/CordaRPCClient.kt @@ -9,6 +9,7 @@ import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLI import net.corda.node.services.messaging.CordaRPCOps import net.corda.node.services.messaging.RPCException import net.corda.node.services.messaging.rpcLog +import org.apache.activemq.artemis.api.core.ActiveMQException import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.ClientSession @@ -38,11 +39,11 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur private val state = ThreadBox(State()) /** Opens the connection to the server and registers a JVM shutdown hook to cleanly disconnect. */ - @Throws(ActiveMQNotConnectedException::class) + @Throws(ActiveMQException::class) fun start(username: String, password: String) { state.locked { check(!running) - checkStorePasswords() // Check the password. + checkStorePasswords() val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(ConnectionDirection.OUTBOUND, host.hostText, host.port)) serverLocator.threadPoolMaxSize = 1 // TODO: Configure session reconnection, confirmation window sizes and other Artemis features. @@ -53,7 +54,6 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur session.start() clientImpl = CordaRPCClientImpl(session, state.lock, username) running = true - // We will use the ID in strings so strip the sign bit. } Runtime.getRuntime().addShutdownHook(Thread { diff --git a/client/src/main/kotlin/net/corda/client/impl/CordaRPCClientImpl.kt b/client/src/main/kotlin/net/corda/client/impl/CordaRPCClientImpl.kt index 67c8a07d38..3487e281b2 100644 --- a/client/src/main/kotlin/net/corda/client/impl/CordaRPCClientImpl.kt +++ b/client/src/main/kotlin/net/corda/client/impl/CordaRPCClientImpl.kt @@ -41,7 +41,7 @@ import kotlin.reflect.jvm.javaMethod * # Design notes * * The way RPCs are handled is fairly standard except for the handling of observables. When an RPC might return - * an [rx.Observable] it is specially tagged. This causes the client to create a new transient queue for the + * an [Observable] it is specially tagged. This causes the client to create a new transient queue for the * receiving of observables and their observations with a random ID in the name. This ID is sent to the server in * a message header. All observations are sent via this single queue. * @@ -141,19 +141,20 @@ class CordaRPCClientImpl(private val session: ClientSession, */ @ThreadSafe private inner class RPCProxyHandler(private val timeout: Duration?) : InvocationHandler, Closeable { - private val proxyAddress = constructAddress() + private val proxyId = random63BitValue() private val consumer: ClientConsumer var serverProtocolVersion = 0 init { + val proxyAddress = constructAddress(proxyId) consumer = sessionLock.withLock { session.createTemporaryQueue(proxyAddress, proxyAddress) session.createConsumer(proxyAddress) } } - private fun constructAddress() = "${ArtemisMessagingComponent.CLIENTS_PREFIX}$username.rpc.${random63BitValue()}" + private fun constructAddress(addressId: Long) = "${ArtemisMessagingComponent.CLIENTS_PREFIX}$username.rpc.$addressId" @Synchronized override fun invoke(proxy: Any, method: Method, args: Array?): Any? { @@ -230,9 +231,10 @@ class CordaRPCClientImpl(private val session: ClientSession, private fun maybePrepareForObservables(location: Throwable, method: Method, msg: ClientMessage): Kryo { // Create a temporary queue just for the emissions on any observables that are returned. - val observationsQueueName = constructAddress() + val observationsId = random63BitValue() + val observationsQueueName = constructAddress(observationsId) session.createTemporaryQueue(observationsQueueName, observationsQueueName) - msg.putStringProperty(ClientRPCRequestMessage.OBSERVATIONS_TO, observationsQueueName) + msg.putLongProperty(ClientRPCRequestMessage.OBSERVATIONS_TO, observationsId) // And make sure that we deserialise observable handles so that they're linked to the right // queue. Also record a bit of metadata for debugging purposes. return createRPCKryo(observableSerializer = ObservableDeserializer(observationsQueueName, method.name, location)) @@ -241,7 +243,7 @@ class CordaRPCClientImpl(private val session: ClientSession, private fun createMessage(method: Method): ClientMessage { return session.createMessage(false).apply { putStringProperty(ClientRPCRequestMessage.METHOD_NAME, method.name) - putStringProperty(ClientRPCRequestMessage.REPLY_TO, proxyAddress) + putLongProperty(ClientRPCRequestMessage.REPLY_TO, proxyId) // Use the magic deduplication property built into Artemis as our message identity too putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) } @@ -257,10 +259,10 @@ class CordaRPCClientImpl(private val session: ClientSession, override fun close() { consumer.close() - sessionLock.withLock { session.deleteQueue(proxyAddress) } + sessionLock.withLock { session.deleteQueue(constructAddress(proxyId)) } } - override fun toString() = "Corda RPC Proxy listening on queue $proxyAddress" + override fun toString() = "Corda RPC Proxy listening on queue ${constructAddress(proxyId)}" } /** diff --git a/client/src/test/kotlin/net/corda/client/ClientRPCInfrastructureTests.kt b/client/src/test/kotlin/net/corda/client/ClientRPCInfrastructureTests.kt index f9d4ef14cd..83a6b6321d 100644 --- a/client/src/test/kotlin/net/corda/client/ClientRPCInfrastructureTests.kt +++ b/client/src/test/kotlin/net/corda/client/ClientRPCInfrastructureTests.kt @@ -1,8 +1,6 @@ package net.corda.client import net.corda.client.impl.CordaRPCClientImpl -import net.corda.core.millis -import net.corda.core.random63BitValue import net.corda.core.serialization.SerializedBytes import net.corda.core.utilities.LogHelper import net.corda.node.services.RPCUserService @@ -22,14 +20,12 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.After import org.junit.Before import org.junit.Test import rx.Observable import rx.subjects.PublishSubject import java.io.Closeable -import java.time.Duration import java.util.* import java.util.concurrent.CountDownLatch import java.util.concurrent.LinkedBlockingQueue @@ -37,7 +33,6 @@ import java.util.concurrent.locks.ReentrantLock import kotlin.test.assertEquals import kotlin.test.assertFailsWith import kotlin.test.assertTrue -import kotlin.test.fail class ClientRPCInfrastructureTests { // TODO: Test that timeouts work @@ -47,7 +42,7 @@ class ClientRPCInfrastructureTests { lateinit var clientSession: ClientSession lateinit var producer: ClientProducer lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor - var proxy: TestOps? = null + lateinit var proxy: TestOps private val authenticatedUser = User("test", "password", permissions = setOf()) @@ -94,16 +89,10 @@ class ClientRPCInfrastructureTests { clientSession.start() LogHelper.setLevel("+net.corda.rpc") - } - private fun createProxyUsingReplyTo(username: String, timeout: Duration? = null): TestOps { - val proxy = CordaRPCClientImpl(clientSession, ReentrantLock(), username).proxyFor(TestOps::class.java, timeout = timeout) - this.proxy = proxy - return proxy + proxy = CordaRPCClientImpl(clientSession, ReentrantLock(), authenticatedUser.username).proxyFor(TestOps::class.java) } - private fun createProxyUsingAuthenticatedReplyTo() = createProxyUsingReplyTo(authenticatedUser.username) - @After fun shutdown() { (proxy as Closeable?)?.close() @@ -156,7 +145,6 @@ class ClientRPCInfrastructureTests { @Test fun `simple RPCs`() { - val proxy = createProxyUsingAuthenticatedReplyTo() // Does nothing, doesn't throw. proxy.void() @@ -169,7 +157,6 @@ class ClientRPCInfrastructureTests { @Test fun `simple observable`() { - val proxy = createProxyUsingAuthenticatedReplyTo() // This tests that the observations are transmitted correctly, also completion is transmitted. val observations = proxy.makeObservable().toBlocking().toIterable().toList() assertEquals(listOf(1, 2, 3, 4), observations) @@ -177,7 +164,6 @@ class ClientRPCInfrastructureTests { @Test fun `complex observables`() { - val proxy = createProxyUsingAuthenticatedReplyTo() // This checks that we can return an object graph with complex usage of observables, like an observable // that emits objects that contain more observables. val serverQuotes = PublishSubject.create>>() @@ -224,31 +210,12 @@ class ClientRPCInfrastructureTests { @Test fun versioning() { - val proxy = createProxyUsingAuthenticatedReplyTo() assertFailsWith { proxy.addedLater() } } @Test fun `authenticated user is available to RPC`() { - val proxy = createProxyUsingAuthenticatedReplyTo() assertThat(proxy.captureUser()).isEqualTo(authenticatedUser.username) } - @Test - fun `using another username for the reply-to`() { - assertThatExceptionOfType(RPCException.DeadlineExceeded::class.java).isThrownBy { - val proxy = createProxyUsingReplyTo(random63BitValue().toString(), timeout = 300.millis) - proxy.void() - fail("RPC successfully returned using someone else's username for the reply-to") - } - } - - @Test - fun `using another username for the reply-to, which contains our username as a prefix`() { - assertThatExceptionOfType(RPCException.DeadlineExceeded::class.java).isThrownBy { - val proxy = createProxyUsingReplyTo("${authenticatedUser.username}extra", timeout = 300.millis) - proxy.void() - fail("RPC successfully returned using someone else's username for the reply-to") - } - } } diff --git a/core/src/main/kotlin/net/corda/core/crypto/X509Utilities.kt b/core/src/main/kotlin/net/corda/core/crypto/X509Utilities.kt index 09c4d107c5..e42bfa089e 100644 --- a/core/src/main/kotlin/net/corda/core/crypto/X509Utilities.kt +++ b/core/src/main/kotlin/net/corda/core/crypto/X509Utilities.kt @@ -107,11 +107,11 @@ object X509Utilities { } /** - * Helper method to create Subject field contents + * Return a bogus X509 for dev purposes. Use [getX509Name] for something more real. */ - fun getDevX509Name(domain: String): X500Name { + fun getDevX509Name(commonName: String): X500Name { val nameBuilder = X500NameBuilder(BCStyle.INSTANCE) - nameBuilder.addRDN(BCStyle.CN, domain) + nameBuilder.addRDN(BCStyle.CN, commonName) nameBuilder.addRDN(BCStyle.O, "R3") nameBuilder.addRDN(BCStyle.OU, "corda") nameBuilder.addRDN(BCStyle.L, "London") @@ -574,18 +574,21 @@ object X509Utilities { storePassword: String, keyPassword: String, caKeyStore: KeyStore, - caKeyPassword: String): KeyStore { - val rootCA = loadCertificateAndKey(caKeyStore, + caKeyPassword: String, + commonName: String): KeyStore { + val rootCA = X509Utilities.loadCertificateAndKey( + caKeyStore, caKeyPassword, CORDA_ROOT_CA_PRIVATE_KEY) - val intermediateCA = loadCertificateAndKey(caKeyStore, + val intermediateCA = X509Utilities.loadCertificateAndKey( + caKeyStore, caKeyPassword, CORDA_INTERMEDIATE_CA_PRIVATE_KEY) val serverKey = generateECDSAKeyPairForSSL() val host = InetAddress.getLocalHost() - val subject = getDevX509Name(host.canonicalHostName) - val serverCert = createServerCert(subject, + val serverCert = createServerCert( + getDevX509Name(commonName), serverKey.public, intermediateCA, if (host.canonicalHostName == host.hostName) listOf() else listOf(host.hostName), @@ -594,7 +597,8 @@ object X509Utilities { val keyPass = keyPassword.toCharArray() val keyStore = loadOrCreateKeyStore(keyStoreFilePath, storePassword) - keyStore.addOrReplaceKey(CORDA_CLIENT_CA_PRIVATE_KEY, + keyStore.addOrReplaceKey( + CORDA_CLIENT_CA_PRIVATE_KEY, serverKey.private, keyPass, arrayOf(serverCert, intermediateCA.certificate, rootCA.certificate)) diff --git a/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt b/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt index 7f7a01a710..4d2d628976 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt @@ -7,6 +7,8 @@ import net.corda.core.node.services.DEFAULT_SESSION_ID import net.corda.core.serialization.DeserializeAsKotlinObjectDef import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize +import org.bouncycastle.asn1.x500.X500Name +import org.bouncycastle.asn1.x500.style.BCStyle import java.time.Instant import java.util.* import java.util.concurrent.atomic.AtomicBoolean @@ -37,7 +39,7 @@ interface MessagingService { * @param sessionID identifier for the session the message is part of. For services listening before * a session is established, use [DEFAULT_SESSION_ID]. */ - fun addMessageHandler(topic: String = "", sessionID: Long = DEFAULT_SESSION_ID, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration + fun addMessageHandler(topic: String = "", sessionID: Long = DEFAULT_SESSION_ID, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration /** * The provided function will be invoked for each received message whose topic and session matches. The callback @@ -49,7 +51,7 @@ interface MessagingService { * * @param topicSession identifier for the topic and session to listen for messages arriving on. */ - fun addMessageHandler(topicSession: TopicSession, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration + fun addMessageHandler(topicSession: TopicSession, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration /** * Removes a handler given the object returned from [addMessageHandler]. The callback will no longer be invoked once @@ -104,7 +106,7 @@ fun MessagingService.createMessage(topic: String, sessionID: Long = DEFAULT_SESS * @param sessionID identifier for the session the message is part of. For services listening before * a session is established, use [DEFAULT_SESSION_ID]. */ -fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, callback: (Message) -> Unit) +fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, callback: (ReceivedMessage) -> Unit) = runOnNextMessage(TopicSession(topic, sessionID), callback) /** @@ -114,7 +116,7 @@ fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, callback: * * @param topicSession identifier for the topic and session to listen for messages arriving on. */ -inline fun MessagingService.runOnNextMessage(topicSession: TopicSession, crossinline callback: (Message) -> Unit) { +inline fun MessagingService.runOnNextMessage(topicSession: TopicSession, crossinline callback: (ReceivedMessage) -> Unit) { val consumed = AtomicBoolean() addMessageHandler(topicSession) { msg, reg -> removeMessageHandler(reg) @@ -155,12 +157,7 @@ interface MessageHandlerRegistration * a session is established, use [DEFAULT_SESSION_ID]. */ data class TopicSession(val topic: String, val sessionID: Long = DEFAULT_SESSION_ID) { - companion object { - val Blank = TopicSession("", DEFAULT_SESSION_ID) - } - fun isBlank() = topic.isBlank() && sessionID == DEFAULT_SESSION_ID - override fun toString(): String = "$topic.$sessionID" } @@ -181,6 +178,15 @@ interface Message { val uniqueMessageId: UUID } +// TODO Have ReceivedMessage point to the TLS certificate of the peer, and [peer] would simply be the subject DN of that. +// The certificate would need to be serialised into the message header or just its fingerprint and then download it via RPC, +// or something like that. +interface ReceivedMessage : Message { + /** The authenticated sender. */ + val peer: X500Name + val peerLegalName: String get() = peer.getRDNs(BCStyle.CN).first().first.value.toString() +} + /** A singleton that's useful for validating topic strings */ object TopicStringValidator { private val regex = "[a-zA-Z0-9.]+".toPattern() diff --git a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt index 81315a7164..2d1586b1ae 100644 --- a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt @@ -49,8 +49,7 @@ object NotaryFlow { throw NotaryException(NotaryError.SignaturesMissing(ex.missing)) } - val request = SignRequest(stx, serviceHub.myInfo.legalIdentity) - val response = sendAndReceive(notaryParty, request) + val response = sendAndReceive(notaryParty, SignRequest(stx)) return validateResponse(response) } @@ -95,14 +94,13 @@ object NotaryFlow { @Suspendable override fun call() { - val (stx, reqIdentity) = receive(otherSide).unwrap { it } + val stx = receive(otherSide).unwrap { it.tx } val wtx = stx.tx val result = try { validateTimestamp(wtx) - beforeCommit(stx, reqIdentity) - commitInputStates(wtx, reqIdentity) - + beforeCommit(stx) + commitInputStates(wtx) val sig = sign(stx.id.bytes) Result.Success(sig) } catch(e: NotaryException) { @@ -127,12 +125,12 @@ object NotaryFlow { * undo the commit of the input states (the exact mechanism still needs to be worked out). */ @Suspendable - open fun beforeCommit(stx: SignedTransaction, reqIdentity: Party) { + open fun beforeCommit(stx: SignedTransaction) { } - private fun commitInputStates(tx: WireTransaction, reqIdentity: Party) { + private fun commitInputStates(tx: WireTransaction) { try { - uniquenessProvider.commit(tx.inputs, tx.id, reqIdentity) + uniquenessProvider.commit(tx.inputs, tx.id, otherSide) } catch (e: UniquenessException) { val conflictData = e.error.serialize() val signedConflict = SignedData(conflictData, sign(conflictData.bytes)) @@ -146,8 +144,7 @@ object NotaryFlow { } } - /** TODO: The caller must authenticate instead of just specifying its identity */ - data class SignRequest(val tx: SignedTransaction, val callerIdentity: Party) + data class SignRequest(val tx: SignedTransaction) sealed class Result { class Error(val error: NotaryError) : Result() diff --git a/core/src/main/kotlin/net/corda/flows/ValidatingNotaryFlow.kt b/core/src/main/kotlin/net/corda/flows/ValidatingNotaryFlow.kt index 5a10bfd575..4db4f65075 100644 --- a/core/src/main/kotlin/net/corda/flows/ValidatingNotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/ValidatingNotaryFlow.kt @@ -21,11 +21,11 @@ class ValidatingNotaryFlow(otherSide: Party, NotaryFlow.Service(otherSide, timestampChecker, uniquenessProvider) { @Suspendable - override fun beforeCommit(stx: SignedTransaction, reqIdentity: Party) { + override fun beforeCommit(stx: SignedTransaction) { try { checkSignatures(stx) val wtx = stx.tx - resolveTransaction(reqIdentity, wtx) + resolveTransaction(wtx) wtx.toLedgerTransaction(serviceHub).verify() } catch (e: Exception) { when (e) { @@ -45,7 +45,7 @@ class ValidatingNotaryFlow(otherSide: Party, } @Suspendable - private fun resolveTransaction(reqIdentity: Party, wtx: WireTransaction) { - subFlow(ResolveTransactionsFlow(wtx, reqIdentity)) + private fun resolveTransaction(wtx: WireTransaction) { + subFlow(ResolveTransactionsFlow(wtx, otherSide)) } } diff --git a/core/src/test/kotlin/net/corda/core/crypto/X509UtilitiesTest.kt b/core/src/test/kotlin/net/corda/core/crypto/X509UtilitiesTest.kt index b91043446d..fbbd39fbd4 100644 --- a/core/src/test/kotlin/net/corda/core/crypto/X509UtilitiesTest.kt +++ b/core/src/test/kotlin/net/corda/core/crypto/X509UtilitiesTest.kt @@ -145,7 +145,7 @@ class X509UtilitiesTest { val caCertAndKey = X509Utilities.loadCertificateAndKey(caKeyStore, "cakeypass", X509Utilities.CORDA_INTERMEDIATE_CA_PRIVATE_KEY) // Generate server cert and private key and populate another keystore suitable for SSL - X509Utilities.createKeystoreForSSL(tmpServerKeyStore, "serverstorepass", "serverkeypass", caKeyStore, "cakeypass") + X509Utilities.createKeystoreForSSL(tmpServerKeyStore, "serverstorepass", "serverkeypass", caKeyStore, "cakeypass", "Mega Corp.") // Load back server certificate val serverKeyStore = X509Utilities.loadKeyStore(tmpServerKeyStore, "serverstorepass") @@ -153,9 +153,8 @@ class X509UtilitiesTest { serverCertAndKey.certificate.checkValidity(Date()) serverCertAndKey.certificate.verify(caCertAndKey.certificate.publicKey) - val host = InetAddress.getLocalHost() - assertTrue { serverCertAndKey.certificate.subjectDN.name.contains("CN=" + host.canonicalHostName) } + assertTrue { serverCertAndKey.certificate.subjectDN.name.contains("CN=Mega Corp.") } // Now sign something with private key and verify against certificate public key val testData = "123456".toByteArray() @@ -183,7 +182,7 @@ class X509UtilitiesTest { "trustpass") // Generate server cert and private key and populate another keystore suitable for SSL - val keyStore = X509Utilities.createKeystoreForSSL(tmpServerKeyStore, "serverstorepass", "serverstorepass", caKeyStore, "cakeypass") + val keyStore = X509Utilities.createKeystoreForSSL(tmpServerKeyStore, "serverstorepass", "serverstorepass", caKeyStore, "cakeypass", "Mega Corp.") val trustStore = X509Utilities.loadKeyStore(tmpTrustStore, "trustpass") val context = SSLContext.getInstance("TLS") @@ -257,8 +256,7 @@ class X509UtilitiesTest { val peerX500Principal = (peerChain[0] as X509Certificate).subjectX500Principal val x500name = X500Name(peerX500Principal.name) val cn = x500name.getRDNs(BCStyle.CN).first().first.value.toString() - val hostname = InetAddress.getLocalHost().canonicalHostName - assertEquals(hostname, cn) + assertEquals("Mega Corp.", cn) val output = DataOutputStream(clientSocket.outputStream) diff --git a/docs/source/messaging.rst b/docs/source/messaging.rst index 91d7856d8b..10f45c3d19 100644 --- a/docs/source/messaging.rst +++ b/docs/source/messaging.rst @@ -17,15 +17,6 @@ messaging subsystem directly. Instead you will build on top of the :doc:`flow fr which adds a layer on top of raw messaging to manage multi-step flows and let you think in terms of identities rather than specific network endpoints. -Messaging types ---------------- - -Every ``Message`` object has an associated *topic* and may have a *session ID*. These are wrapped in a ``TopicSession``. -An implementation of ``MessagingService`` can be used to create messages and send them. You can get access to the -messaging service via the ``ServiceHub`` object that is provided to your app. Endpoints on the network are -identified at the lowest level using ``SingleMessageRecipient`` which may be e.g. an IP address, or in future -versions perhaps a routing path through the network. - .. _network-map-service: Network Map Service @@ -48,4 +39,72 @@ The network map currently supports: * Looking up node for a party * Suggesting a node providing a specific service, based on suitability for a contract and parties, for example suggesting an appropriate interest rates oracle for a interest rate swap contract. Currently no recommendation logic is in place. - The code simply picks the first registered node that supports the required service. \ No newline at end of file + The code simply picks the first registered node that supports the required service. + +Message queues +-------------- + +The node makes use of various queues for its operation. The more important ones are described below. Others are used +for maintenance and other minor purposes. + +:``p2p.inbound`` + The node listens for messages sent from other peer nodes on this queue. Only clients who are authenticated to be + nodes on the same network are given permission to send. Messages which are routed internally are also sent to this + queue (e.g. two flows on the same node communicating with each other). + +:``internal.peers.$identity`` + These are a set of private queues only available to the node which it uses to route messages destined to other peers. + The queue name ends in the base 58 encoding of the peer's identity key. There is at most one queue per peer. The broker + creates a bridge from this queue to the peer's ``p2p.inbound`` queue, using the network map service to lookup the + peer's network address. + +:``internal.networkmap`` + This is another private queue just for the node which functions in a similar manner to the ``p2p.peers.*`` queues + except this is used to form a connection to the network map node. The node running the network map service is treated + differently as it provides information about the rest of the network. + +:``rpc.requests`` + RPC clients send their requests here, and it's only open for sending by clients authenticated as RPC users. + +:``clients.$user.rpc.$random`` + RPC clients are given permission to create a temporary queue incorporating their username (``$user``) and sole + permission to receive messages from it. RPC requests are required to include a random number (``$random``) from + which the node is able to construct the queue the user is listening on and send the response to that. This mechanism + prevents other users from being able listen in on the responses. + +Security +-------- + +Clients attempting to connect to the node's broker fall in one of four groups: + +# Anyone connecting with the username ``SystemUsers/Node`` is treated as the node hosting the broker, or a logical +component of the node. The TLS certificate they provide must match the one broker has for the node. If that's the case +they are given full access to all valid queues, otherwise they are rejected. + +# Anyone connecting with the username ``SystemUsers/Peer`` is treated as a peer on the same Corda network as the node. Their +TLS root CA must be the same as the node's root CA - the root CA is the doorman of the network and having the same root CA +implies we've been let in by the same doorman. If they are part of the same network then they are only given permission +to send to our ``p2p.inbound`` queue, otherwise they are rejected. + +# Every other username is treated as a RPC user and authenticated against the node's list of valid RPC users. If that +is successful then they are only given sufficient permission to perform RPC, otherwise they are rejected. + +# Clients connecting without a username and password are rejected. + +Artemis provides a feature of annotating each received message with the validated user. This allows the node's messaging +service to provide authenticated messages to the rest of the system. For the first two client types described above the +validated user is the X.500 subject DN of the client TLS certificate and we assume the common name is the legal name of +the peer. This allows the flow framework to authentically determine the ``Party`` initiating a new flow. For RPC clients +the validated user is the username itself and the RPC framework uses this to determine what permissions the user has. + +.. note:: ``Party`` lookup is currently done by the legal name which isn't guaranteed to be unique. A future version will +use the full X.500 name as it can provide additional structures for uniqueness. + +Messaging types +--------------- + +Every ``Message`` object has an associated *topic* and may have a *session ID*. These are wrapped in a ``TopicSession``. +An implementation of ``MessagingService`` can be used to create messages and send them. You can get access to the +messaging service via the ``ServiceHub`` object that is provided to your app. Endpoints on the network are +identified at the lowest level using ``SingleMessageRecipient`` which may be e.g. an IP address, or in future +versions perhaps a routing path through the network. \ No newline at end of file diff --git a/docs/source/setting-up-a-corda-network.rst b/docs/source/setting-up-a-corda-network.rst index 6e5df38a77..e6b3ae3180 100644 --- a/docs/source/setting-up-a-corda-network.rst +++ b/docs/source/setting-up-a-corda-network.rst @@ -20,13 +20,8 @@ Setting up your own network Certificates ------------ -If two nodes are to communicate successfully then both need to have -each other's root certificate in their truststores. The simplest way -to achieve this is to have all nodes sign off of a single root. - -Later R3 will provide this root for production use, but for testing you -can use ``certSigningRequestUtility.jar`` to generate a node -certificate with a fixed test root: +All nodes belonging to the same Corda network must have the same root CA. For testing purposes you can +use ``certSigningRequestUtility.jar`` to generate a node certificate with a fixed test root: .. sourcecode:: bash diff --git a/node/build.gradle b/node/build.gradle index c187d3c33c..ed2cd6e5e6 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -62,6 +62,7 @@ sourceSets { dependencies { compile project(':finance') testCompile project(':test-utils') + testCompile project(':client') compile "com.google.code.findbugs:jsr305:3.0.1" diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt new file mode 100644 index 0000000000..571b937a62 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt @@ -0,0 +1,230 @@ +package net.corda.services.messaging + +import co.paralleluniverse.fibers.Suspendable +import com.google.common.net.HostAndPort +import net.corda.client.impl.CordaRPCClientImpl +import net.corda.core.crypto.Party +import net.corda.core.crypto.composite +import net.corda.core.crypto.generateKeyPair +import net.corda.core.flows.FlowLogic +import net.corda.core.getOrThrow +import net.corda.core.random63BitValue +import net.corda.core.seconds +import net.corda.node.internal.Node +import net.corda.node.services.User +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NETWORK_MAP_ADDRESS +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.P2P_QUEUE +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEERS_PREFIX +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE +import net.corda.node.services.messaging.CordaRPCOps +import net.corda.node.services.messaging.NodeMessagingClient.Companion.RPC_QUEUE_REMOVALS_QUEUE +import net.corda.testing.messaging.SimpleMQClient +import net.corda.testing.node.NodeBasedTest +import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException +import org.apache.activemq.artemis.api.core.ActiveMQSecurityException +import org.apache.activemq.artemis.api.core.SimpleString +import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.junit.After +import org.junit.Before +import org.junit.Test +import java.util.* +import java.util.concurrent.locks.ReentrantLock + +/** + * Runs a series of MQ-related attacks against a node. Subclasses need to call [startAttacker] to connect + * the attacker to [alice]. + */ +abstract class MQSecurityTest : NodeBasedTest() { + val rpcUser = User("user1", "pass", permissions = emptySet()) + lateinit var alice: Node + lateinit var attacker: SimpleMQClient + private val clients = ArrayList() + + @Before + fun start() { + alice = startNode("Alice", rpcUsers = extraRPCUsers + rpcUser) + attacker = SimpleMQClient(alice.configuration.artemisAddress) + startAttacker(attacker) + } + + open val extraRPCUsers: List get() = emptyList() + + abstract fun startAttacker(attacker: SimpleMQClient) + + @After + fun stopClients() { + clients.forEach { it.stop() } + } + + @Test + fun `consume message from P2P queue`() { + assertConsumeAttackFails(P2P_QUEUE) + } + + @Test + fun `consume message from peer queue`() { + val bobParty = startBobAndCommunicateWithAlice() + assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}") + } + + @Test + fun `send message to peer address`() { + val bobParty = startBobAndCommunicateWithAlice() + assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}") + } + + @Test + fun `create queue for unknown peer`() { + val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.composite.toBase58String()}" + assertNonTempQueueCreationAttackFails(invalidPeerQueue, durable = true) + assertNonTempQueueCreationAttackFails(invalidPeerQueue, durable = false) + assertTempQueueCreationAttackFails(invalidPeerQueue) + } + + @Test + fun `consume message from network map queue`() { + assertConsumeAttackFails(NETWORK_MAP_ADDRESS.toString()) + } + + @Test + fun `send message to network map address`() { + assertSendAttackFails(NETWORK_MAP_ADDRESS.toString()) + } + + @Test + fun `consume message from RPC requests queue`() { + assertConsumeAttackFails(RPC_REQUESTS_QUEUE) + } + + @Test + fun `consume message from logged in user's RPC queue`() { + val user1Queue = loginToRPCAndGetClientQueue() + assertConsumeAttackFails(user1Queue) + } + + @Test + fun `send message on logged in user's RPC address`() { + val user1Queue = loginToRPCAndGetClientQueue() + assertSendAttackFails(user1Queue) + } + + @Test + fun `create queue for valid RPC user`() { + val user1Queue = "$CLIENTS_PREFIX${rpcUser.username}.rpc.${random63BitValue()}" + assertTempQueueCreationAttackFails(user1Queue) + } + + @Test + fun `create queue for invalid RPC user`() { + val invalidRPCQueue = "$CLIENTS_PREFIX${random63BitValue()}.rpc.${random63BitValue()}" + assertTempQueueCreationAttackFails(invalidRPCQueue) + } + + @Test + fun `consume message from RPC queue removals queue`() { + assertConsumeAttackFails(RPC_QUEUE_REMOVALS_QUEUE) + } + + @Test + fun `send message to notifications address`() { + assertSendAttackFails(NOTIFICATIONS_ADDRESS) + } + + @Test + fun `create random queue`() { + val randomQueue = random63BitValue().toString() + assertNonTempQueueCreationAttackFails(randomQueue, durable = false) + assertNonTempQueueCreationAttackFails(randomQueue, durable = true) + assertTempQueueCreationAttackFails(randomQueue) + } + + fun clientTo(target: HostAndPort): SimpleMQClient { + val client = SimpleMQClient(target) + clients += client + return client + } + + fun loginToRPC(target: HostAndPort, rpcUser: User): SimpleMQClient { + val client = clientTo(target) + client.loginToRPC(rpcUser) + return client + } + + fun SimpleMQClient.loginToRPC(rpcUser: User): CordaRPCOps { + start(rpcUser.username, rpcUser.password) + val clientImpl = CordaRPCClientImpl(session, ReentrantLock(), rpcUser.username) + return clientImpl.proxyFor(CordaRPCOps::class.java, timeout = 1.seconds) + } + + fun loginToRPCAndGetClientQueue(): String { + val rpcClient = loginToRPC(alice.configuration.artemisAddress, rpcUser) + val clientQueueQuery = SimpleString("$CLIENTS_PREFIX${rpcUser.username}.rpc.*") + return rpcClient.session.addressQuery(clientQueueQuery).queueNames.single().toString() + } + + fun assertTempQueueCreationAttackFails(queue: String) { + assertAttackFails(queue, "CREATE_NON_DURABLE_QUEUE") { + attacker.session.createTemporaryQueue(queue, queue) + } + // Double-check + assertThatExceptionOfType(ActiveMQNonExistentQueueException::class.java).isThrownBy { + attacker.session.createConsumer(queue) + } + } + + fun assertNonTempQueueCreationAttackFails(queue: String, durable: Boolean) { + val permission = if (durable) "CREATE_DURABLE_QUEUE" else "CREATE_NON_DURABLE_QUEUE" + assertAttackFails(queue, permission) { + attacker.session.createQueue(queue, queue, durable) + } + // Double-check + assertThatExceptionOfType(ActiveMQNonExistentQueueException::class.java).isThrownBy { + attacker.session.createConsumer(queue) + } + } + + fun assertSendAttackFails(address: String) { + val message = attacker.createMessage() + assertAttackFails(address, "SEND") { + attacker.producer.send(address, message) + } + // TODO Make sure no actual message is received + } + + fun assertConsumeAttackFails(queue: String) { + assertAttackFails(queue, "CONSUME") { + attacker.session.createConsumer(queue) + } + assertAttackFails(queue, "BROWSE") { + attacker.session.createConsumer(queue, true) + } + } + + fun assertAttackFails(queue: String, permission: String, attack: () -> Unit) { + assertThatExceptionOfType(ActiveMQSecurityException::class.java) + .isThrownBy(attack) + .withMessageContaining(queue) + .withMessageContaining(permission) + } + + private fun startBobAndCommunicateWithAlice(): Party { + val bob = startNode("Bob") + bob.services.registerFlowInitiator(SendFlow::class, ::ReceiveFlow) + val bobParty = bob.info.legalIdentity + // Perform a protocol exchange to force the peer queue to be created + alice.services.startFlow(SendFlow(bobParty, 0)).resultFuture.getOrThrow() + return bobParty + } + + private class SendFlow(val otherParty: Party, val payload: Any) : FlowLogic() { + @Suspendable + override fun call() = send(otherParty, payload) + } + + private class ReceiveFlow(val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call() = receive(otherParty).unwrap { it } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt new file mode 100644 index 0000000000..2e10d079dd --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt @@ -0,0 +1,53 @@ +package net.corda.services.messaging + +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEER_USER +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE +import net.corda.testing.messaging.SimpleMQClient +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration +import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException +import org.apache.activemq.artemis.api.core.ActiveMQSecurityException +import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.junit.Test + +/** + * Runs the security tests with the attacker pretending to be a node on the network. + */ +class P2PSecurityTest : MQSecurityTest() { + + override fun startAttacker(attacker: SimpleMQClient) { + attacker.start(PEER_USER, PEER_USER) // Login as a peer + } + + @Test + fun `send message to RPC requests address`() { + assertSendAttackFails(RPC_REQUESTS_QUEUE) + } + + @Test + fun `only the node running the broker can login using the special node user`() { + val attacker = SimpleMQClient(alice.configuration.artemisAddress) + assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { + attacker.start(NODE_USER, NODE_USER) + } + attacker.stop() + } + + @Test + fun `login as the default cluster user`() { + val attacker = SimpleMQClient(alice.configuration.artemisAddress) + assertThatExceptionOfType(ActiveMQClusterSecurityException::class.java).isThrownBy { + attacker.start(ActiveMQDefaultConfiguration.getDefaultClusterUser(), ActiveMQDefaultConfiguration.getDefaultClusterPassword()) + } + attacker.stop() + } + + @Test + fun `login without a username and password`() { + val attacker = SimpleMQClient(alice.configuration.artemisAddress) + assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { + attacker.start() + } + attacker.stop() + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/RPCSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/RPCSecurityTest.kt new file mode 100644 index 0000000000..0e21dceccc --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/RPCSecurityTest.kt @@ -0,0 +1,15 @@ +package net.corda.services.messaging + +import net.corda.node.services.User +import net.corda.testing.messaging.SimpleMQClient + +/** + * Runs the security tests with the attacker being a valid RPC user of Alice. + */ +class RPCSecurityTest : MQSecurityTest() { + override val extraRPCUsers = listOf(User("evil", "pass", permissions = emptySet())) + + override fun startAttacker(attacker: SimpleMQClient) { + attacker.loginToRPC(extraRPCUsers[0]) + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 949948fd1e..9adc82ff3a 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -15,6 +15,7 @@ import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserServiceImpl import net.corda.node.services.api.MessagingServiceInternal import net.corda.node.services.config.FullNodeConfiguration +import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.NodeMessagingClient import net.corda.node.services.messaging.RPCOps @@ -118,15 +119,15 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: private lateinit var userService: RPCUserService override fun makeMessagingService(): MessagingServiceInternal { - val legalIdentity = obtainLegalIdentity() - val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null userService = RPCUserServiceImpl(configuration) val serverAddr = with(configuration) { messagingServerAddress ?: { - messageBroker = ArtemisMessagingServer(this, artemisAddress, myIdentityOrNullIfNetworkMapService, services.networkMapCache, userService) + messageBroker = ArtemisMessagingServer(this, artemisAddress, services.networkMapCache, userService) artemisAddress }() } + val legalIdentity = obtainLegalIdentity() + val myIdentityOrNullIfNetworkMapService = if (networkMapService != null) legalIdentity.owningKey else null return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database, networkMapRegistrationFuture) } @@ -135,12 +136,13 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: messageBroker?.apply { runOnStop += Runnable { messageBroker?.stop() } start() - bridgeToNetworkMapService(networkMapService) + if (networkMapService is NetworkMapAddress) { + bridgeToNetworkMapService(networkMapService) + } } // Start up the MQ client. val net = net as NodeMessagingClient - net.configureWithDevSSLCertificate() // TODO: Client might need a separate certificate net.start(rpcOps, userService) } @@ -151,7 +153,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: // Export JMX monitoring statistics and data over REST/JSON. if (configuration.exportJMXto.split(',').contains("http")) { val classpath = System.getProperty("java.class.path").split(System.getProperty("path.separator")) - val warpath = classpath.firstOrNull() { it.contains("jolokia-agent-war-2") && it.endsWith(".war") } + val warpath = classpath.firstOrNull { it.contains("jolokia-agent-war-2") && it.endsWith(".war") } if (warpath != null) { handlerCollection.addHandler(WebAppContext().apply { // Find the jolokia WAR file on the classpath. @@ -174,7 +176,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: httpsConfiguration.outputBufferSize = 32768 httpsConfiguration.addCustomizer(SecureRequestCustomizer()) val sslContextFactory = SslContextFactory() - sslContextFactory.setKeyStorePath(configuration.keyStorePath.toString()) + sslContextFactory.keyStorePath = configuration.keyStorePath.toString() sslContextFactory.setKeyStorePassword(configuration.keyStorePassword) sslContextFactory.setKeyManagerPassword(configuration.keyStorePassword) sslContextFactory.setTrustStorePath(configuration.trustStorePath.toString()) diff --git a/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt b/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt index fa399976f2..147713f4ef 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/ConfigUtilities.kt @@ -100,7 +100,9 @@ inline fun Config.getListOrElse(path: String, default: Config. * Strictly for dev only automatically construct a server certificate/private key signed from * the CA certs in Node resources. Then provision KeyStores into certificates folder under node path. */ -fun NodeSSLConfiguration.configureWithDevSSLCertificate() { +fun NodeConfiguration.configureWithDevSSLCertificate() = configureDevKeyAndTrustStores(myLegalName) + +private fun NodeSSLConfiguration.configureDevKeyAndTrustStores(myLegalName: String) { certificatesPath.createDirectories() if (!trustStorePath.exists()) { javaClass.classLoader.getResourceAsStream("net/corda/node/internal/certificates/cordatruststore.jks").copyTo(trustStorePath) @@ -109,7 +111,7 @@ fun NodeSSLConfiguration.configureWithDevSSLCertificate() { val caKeyStore = X509Utilities.loadKeyStore( javaClass.classLoader.getResourceAsStream("net/corda/node/internal/certificates/cordadevcakeys.jks"), "cordacadevpass") - X509Utilities.createKeystoreForSSL(keyStorePath, keyStorePassword, keyStorePassword, caKeyStore, "cordacadevkeypass") + X509Utilities.createKeystoreForSSL(keyStorePath, keyStorePassword, keyStorePassword, caKeyStore, "cordacadevkeypass", myLegalName) } } @@ -120,6 +122,6 @@ fun configureTestSSL(): NodeSSLConfiguration = object : NodeSSLConfiguration { override val trustStorePassword: String get() = "trustpass" init { - configureWithDevSSLCertificate() + configureDevKeyAndTrustStores("Mega Corp.") } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt index c45b5c98c6..004e7db67b 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt @@ -8,7 +8,6 @@ import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.read import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.node.services.config.NodeSSLConfiguration -import net.corda.node.services.config.configureWithDevSSLCertificate import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.TransportConfiguration import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory @@ -28,12 +27,20 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { System.setProperty("org.jboss.logging.provider", "slf4j") } - const val PEERS_PREFIX = "peers." + // System users must contain an invalid RPC username character to prevent any chance of name clash which in this + // case is a forward slash + const val NODE_USER = "SystemUsers/Node" + const val PEER_USER = "SystemUsers/Peer" + + const val INTERNAL_PREFIX = "internal." + const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers." const val CLIENTS_PREFIX = "clients." + const val P2P_QUEUE = "p2p.inbound" const val RPC_REQUESTS_QUEUE = "rpc.requests" + const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications" @JvmStatic - protected val NETWORK_MAP_ADDRESS = SimpleString("${PEERS_PREFIX}networkmap") + val NETWORK_MAP_ADDRESS = SimpleString("${INTERNAL_PREFIX}networkmap") /** * Assuming the passed in target address is actually an ArtemisAddress will extract the host and port of the node. This should @@ -46,27 +53,6 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { val addr = target as? ArtemisMessagingComponent.ArtemisAddress ?: throw IllegalArgumentException("Not an Artemis address") return addr.hostAndPort } - - /** - * Assuming the passed in target address is actually an ArtemisAddress will extract the queue name used. - * For now the queue name is the Base58 version of the node's identity. - * This should only be used in the internals of the messaging services to keep addressing opaque for the future. - * N.B. Marked as JvmStatic to allow use in the inherited classes. - */ - @JvmStatic - protected fun toQueueName(target: MessageRecipients): SimpleString { - val addr = target as? ArtemisMessagingComponent.ArtemisAddress ?: throw IllegalArgumentException("Not an Artemis address") - return addr.queueName - - } - - /** - * Convert the identity, host and port of this node into the appropriate [SingleMessageRecipient]. - * - * N.B. Marked as JvmStatic to allow use in the inherited classes. - */ - @JvmStatic - protected fun toMyAddress(myIdentity: CompositeKey?, myHostPort: HostAndPort): SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, myHostPort) else NetworkMapAddress(myHostPort) } protected interface ArtemisAddress { @@ -74,7 +60,7 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { val hostAndPort: HostAndPort } - protected data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress { + data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress { override val queueName: SimpleString get() = NETWORK_MAP_ADDRESS } @@ -84,18 +70,13 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { * For instance it may contain onion routing data. */ data class NodeAddress(val identity: CompositeKey, override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress { - override val queueName: SimpleString by lazy { SimpleString(PEERS_PREFIX + identity.toBase58String()) } + override val queueName: SimpleString = SimpleString("$PEERS_PREFIX${identity.toBase58String()}") override fun toString(): String = "${javaClass.simpleName}(identity = $queueName, $hostAndPort)" } /** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */ abstract val config: NodeSSLConfiguration - protected fun parseKeyFromQueueName(name: String): CompositeKey { - require(name.startsWith(PEERS_PREFIX)) - return CompositeKey.parseFromBase58(name.substring(PEERS_PREFIX.length)) - } - protected enum class ConnectionDirection { INBOUND, OUTBOUND } // Restrict enabled Cipher Suites to AES and GCM as minimum for the bulk cipher. @@ -135,7 +116,7 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { mapOf( // Basic TCP target details TransportConstants.HOST_PROP_NAME to host, - TransportConstants.PORT_PROP_NAME to port.toInt(), + TransportConstants.PORT_PROP_NAME to port, // Turn on AMQP support, which needs the protocol jar on the classpath. // Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop @@ -159,10 +140,6 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { ) } - fun configureWithDevSSLCertificate() { - config.configureWithDevSSLCertificate() - } - protected fun Path.expectedOnDefaultFileSystem() { require(fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index be11255ca1..f438d7476f 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -4,14 +4,23 @@ import com.google.common.net.HostAndPort import net.corda.core.ThreadBox import net.corda.core.crypto.AddressFormatException import net.corda.core.crypto.CompositeKey +import net.corda.core.crypto.X509Utilities +import net.corda.core.crypto.X509Utilities.CORDA_CLIENT_CA +import net.corda.core.crypto.X509Utilities.CORDA_ROOT_CA +import net.corda.core.crypto.newSecureRandom import net.corda.core.div -import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.services.NetworkMapCache +import net.corda.core.node.services.NetworkMapCache.MapChangeType +import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor import net.corda.node.printBasicNodeInfo import net.corda.node.services.RPCUserService import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.NODE_USER +import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.INBOUND +import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.OUTBOUND +import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.NODE_ROLE +import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.PEER_ROLE +import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.RPC_ROLE import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.core.config.BridgeConfiguration import org.apache.activemq.artemis.core.config.Configuration @@ -21,11 +30,14 @@ import org.apache.activemq.artemis.core.security.Role import org.apache.activemq.artemis.core.server.ActiveMQServer import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager +import org.apache.activemq.artemis.spi.core.security.jaas.CertificateCallback import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal import rx.Subscription import java.io.IOException +import java.math.BigInteger import java.security.Principal +import java.security.PublicKey import java.util.* import javax.annotation.concurrent.ThreadSafe import javax.security.auth.Subject @@ -55,10 +67,8 @@ import javax.security.auth.spi.LoginModule @ThreadSafe class ArtemisMessagingServer(override val config: NodeConfiguration, val myHostPort: HostAndPort, - val myIdentity: CompositeKey?, val networkMapCache: NetworkMapCache, val userService: RPCUserService) : ArtemisMessagingComponent() { - companion object { private val log = loggerFor() } @@ -70,7 +80,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, private val mutex = ThreadBox(InnerState()) private lateinit var activeMQServer: ActiveMQServer private var networkChangeHandle: Subscription? = null - private val myQueueName = toQueueName(toMyAddress(myIdentity, myHostPort)) init { config.basedir.expectedOnDefaultFileSystem() @@ -79,7 +88,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, fun start() = mutex.locked { if (!running) { configureAndStartServer() - networkChangeHandle = networkMapCache.changed.subscribe { onNetworkChange(it) } + networkChangeHandle = networkMapCache.changed.subscribe { destroyPossibleStaleBridge(it) } running = true } } @@ -91,50 +100,29 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, running = false } - fun bridgeToNetworkMapService(networkMapService: SingleMessageRecipient?) { - if ((networkMapService != null) && (networkMapService is NetworkMapAddress)) { - val query = activeMQServer.queueQuery(NETWORK_MAP_ADDRESS) - if (!query.isExists) { - activeMQServer.createQueue(NETWORK_MAP_ADDRESS, NETWORK_MAP_ADDRESS, null, true, false) - } - - maybeDeployBridgeForAddress(NETWORK_MAP_ADDRESS, networkMapService) + fun bridgeToNetworkMapService(networkMapService: NetworkMapAddress) { + val query = activeMQServer.queueQuery(NETWORK_MAP_ADDRESS) + if (!query.isExists) { + activeMQServer.createQueue(NETWORK_MAP_ADDRESS, NETWORK_MAP_ADDRESS, null, true, false) } + maybeDeployBridgeForAddress(networkMapService) } - private fun onNetworkChange(change: NetworkMapCache.MapChange) { - val address = change.node.address - if (address is ArtemisMessagingComponent.ArtemisAddress) { - val queueName = address.queueName - when (change.type) { - NetworkMapCache.MapChangeType.Added -> { - val query = activeMQServer.queueQuery(queueName) - if (query.isExists) { - // Queue exists so now wire up bridge - maybeDeployBridgeForAddress(queueName, change.node.address) - } - } + private fun destroyPossibleStaleBridge(change: NetworkMapCache.MapChange) { + fun removePreviousBridge() { + (change.prevNodeInfo?.address as? ArtemisAddress)?.let { + maybeDestroyBridge(it.queueName) + } + } - NetworkMapCache.MapChangeType.Modified -> { - (change.prevNodeInfo?.address as? ArtemisMessagingComponent.ArtemisAddress)?.let { - // remove any previous possibly different bridge - maybeDestroyBridge(it.queueName) - } - val query = activeMQServer.queueQuery(queueName) - if (query.isExists) { - // Deploy new bridge - maybeDeployBridgeForAddress(queueName, change.node.address) - } - } - - NetworkMapCache.MapChangeType.Removed -> { - (change.prevNodeInfo?.address as? ArtemisMessagingComponent.ArtemisAddress)?.let { - // Remove old bridge - maybeDestroyBridge(it.queueName) - } - // just in case of NetworkMapCache version issues - maybeDestroyBridge(queueName) - } + if (change.type == MapChangeType.Modified) { + removePreviousBridge() + } else if (change.type == MapChangeType.Removed) { + removePreviousBridge() + // TODO Fix the network map change data classes so that the remove event doesn't have two NodeInfo fields + val address = change.node.address + if (address is ArtemisAddress) { + maybeDestroyBridge(address.queueName) } } } @@ -142,71 +130,75 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, private fun configureAndStartServer() { val config = createArtemisConfig() val securityManager = createArtemisSecurityManager() - activeMQServer = ActiveMQServerImpl(config, securityManager).apply { // Throw any exceptions which are detected during startup registerActivationFailureListener { exception -> throw exception } - // Some types of queue might need special preparation on our side, like dialling back or preparing // a lazily initialised subsystem. - registerPostQueueCreationCallback { queueName -> - log.debug("Queue created: $queueName") - if (queueName.startsWith(PEERS_PREFIX) && queueName != NETWORK_MAP_ADDRESS && queueName != myQueueName) { - try { - val identity = parseKeyFromQueueName(queueName.toString()) - val nodeInfo = networkMapCache.getNodeByCompositeKey(identity) - if (nodeInfo != null) { - maybeDeployBridgeForAddress(queueName, nodeInfo.address) - } else { - log.error("Queue created for a peer that we don't know from the network map: $queueName") - } - } catch (e: AddressFormatException) { - log.error("Flow violation: Could not parse queue name as Base 58: $queueName") - } - } - } - - registerPostQueueDeletionCallback { address, qName -> - if (qName == address) - log.debug("Queue deleted: $qName") - else - log.debug("Queue deleted: $qName for $address") - } + registerPostQueueCreationCallback { deployBridgeFromNewPeerQueue(it) } + registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } } } activeMQServer.start() printBasicNodeInfo("Node listening on address", myHostPort.toString()) } + private fun deployBridgeFromNewPeerQueue(queueName: SimpleString) { + log.debug { "Queue created: $queueName" } + if (!queueName.startsWith(PEERS_PREFIX)) return + try { + val identity = CompositeKey.parseFromBase58(queueName.substring(PEERS_PREFIX.length)) + val nodeInfo = networkMapCache.getNodeByCompositeKey(identity) + if (nodeInfo != null) { + val address = nodeInfo.address + if (address is NodeAddress) { + maybeDeployBridgeForAddress(address) + } else { + log.error("Don't know how to deal with $address") + } + } else { + log.error("Queue created for a peer that we don't know from the network map: $queueName") + } + } catch (e: AddressFormatException) { + log.error("Flow violation: Could not parse queue name as Base 58: $queueName") + } + } + private fun createArtemisConfig(): Configuration = ConfigurationImpl().apply { val artemisDir = config.basedir / "artemis" bindingsDirectory = (artemisDir / "bindings").toString() journalDirectory = (artemisDir / "journal").toString() - largeMessagesDirectory = (artemisDir / "largemessages").toString() - acceptorConfigurations = setOf( - tcpTransport(ConnectionDirection.INBOUND, "0.0.0.0", myHostPort.port) - ) + largeMessagesDirectory = (artemisDir / "large-messages").toString() + acceptorConfigurations = setOf(tcpTransport(INBOUND, "0.0.0.0", myHostPort.port)) // Enable built in message deduplication. Note we still have to do our own as the delayed commits // and our own definition of commit mean that the built in deduplication cannot remove all duplicates. idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess isPersistIDCache = true isPopulateValidatedUser = true - configureQueueSecurity() + managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS) + // Artemis allows multiple servers to be grouped together into a cluster for load balancing purposes. The cluster + // user is used for connecting the nodes together. It has super-user privileges and so it's imperative that its + // password is changed from the default (as warned in the docs). Since we don't need this feature we turn it off + // by having its password be an unknown securely random 128-bit value. + clusterPassword = BigInteger(128, newSecureRandom()).toString(16) + configureAddressSecurity() } - private fun ConfigurationImpl.configureQueueSecurity() { - val nodeRPCSendRole = restrictedRole(NODE_USER, send = true) // The node needs to be able to send responses on the client queues - + /** + * Authenticated clients connecting to us fall in one of three groups: + * 1. The node hosting us and any of its logically connected components. These are given full access to all valid queues. + * 2. Peers on the same network as us. These are only given permission to send to our P2P inbound queue. + * 3. RPC users. These are only given sufficient access to perform RPC with us. + */ + private fun ConfigurationImpl.configureAddressSecurity() { + val nodeInternalRole = Role(NODE_ROLE, true, true, true, true, true, true, true, true) + securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node + securityRoles[P2P_QUEUE] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true)) + securityRoles[RPC_REQUESTS_QUEUE] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true)) for ((username) in userService.users) { - // Clients need to be able to consume the responses on their queues, and they're also responsible for creating and destroying them - val clientRole = restrictedRole(username, consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true) - securityRoles["$CLIENTS_PREFIX$username.rpc.*"] = setOf(nodeRPCSendRole, clientRole) + securityRoles["$CLIENTS_PREFIX$username.rpc.*"] = setOf( + nodeInternalRole, + restrictedRole("$CLIENTS_PREFIX$username", consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true)) } - - // TODO Restrict this down to just what the node needs - securityRoles["#"] = setOf(Role(NODE_USER, true, true, true, true, true, true, true, true)) - securityRoles[RPC_REQUESTS_QUEUE] = setOf( - restrictedRole(NODE_USER, createNonDurableQueue = true, deleteNonDurableQueue = true), - restrictedRole(RPC_REQUESTS_QUEUE, send = true)) // Clients need to be able to send their requests } private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false, @@ -217,14 +209,22 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, } private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager { + val ourRootCAPublicKey = X509Utilities + .loadCertificateFromKeyStore(config.trustStorePath, config.trustStorePassword, CORDA_ROOT_CA) + .publicKey + val ourPublicKey = X509Utilities + .loadCertificateFromKeyStore(config.keyStorePath, config.keyStorePassword, CORDA_CLIENT_CA) + .publicKey val securityConfig = object : SecurityConfiguration() { // Override to make it work with our login module override fun getAppConfigurationEntry(name: String): Array { - val options = mapOf(RPCUserService::class.java.name to userService) + val options = mapOf( + RPCUserService::class.java.name to userService, + CORDA_ROOT_CA to ourRootCAPublicKey, + CORDA_CLIENT_CA to ourPublicKey) return arrayOf(AppConfigurationEntry(name, REQUIRED, options)) } } - return ActiveMQJAASSecurityManager(NodeLoginModule::class.java.name, securityConfig) } @@ -232,39 +232,39 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, private fun addConnector(hostAndPort: HostAndPort) = activeMQServer.configuration.addConnectorConfiguration( hostAndPort.toString(), - tcpTransport( - ConnectionDirection.OUTBOUND, - hostAndPort.hostText, - hostAndPort.port - ) + tcpTransport(OUTBOUND, hostAndPort.hostText, hostAndPort.port) ) private fun bridgeExists(name: SimpleString) = activeMQServer.clusterManager.bridges.containsKey(name.toString()) - private fun deployBridge(hostAndPort: HostAndPort, name: String) { - activeMQServer.deployBridge(BridgeConfiguration().apply { - setName(name) - queueName = name - forwardingAddress = name - staticConnectors = listOf(hostAndPort.toString()) - confirmationWindowSize = 100000 // a guess - isUseDuplicateDetection = true // Enable the bridges automatic deduplication logic - }) + private fun maybeDeployBridgeForAddress(address: ArtemisAddress) { + if (!connectorExists(address.hostAndPort)) { + addConnector(address.hostAndPort) + } + if (!bridgeExists(address.queueName)) { + deployBridge(address) + } } /** - * For every queue created we need to have a bridge deployed in case the address of the queue - * is that of a remote party. + * All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving + * messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it, + * as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's + * P2P address. */ - private fun maybeDeployBridgeForAddress(name: SimpleString, nodeInfo: SingleMessageRecipient) { - require(name.startsWith(PEERS_PREFIX)) - val hostAndPort = toHostAndPort(nodeInfo) - if (hostAndPort == myHostPort) - return - if (!connectorExists(hostAndPort)) - addConnector(hostAndPort) - if (!bridgeExists(name)) - deployBridge(hostAndPort, name.toString()) + private fun deployBridge(address: ArtemisAddress) { + activeMQServer.deployBridge(BridgeConfiguration().apply { + name = address.queueName.toString() + queueName = address.queueName.toString() + forwardingAddress = P2P_QUEUE + staticConnectors = listOf(address.hostAndPort.toString()) + confirmationWindowSize = 100000 // a guess + isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic + // As a peer of the target node we must connect to it using the peer user. Actual authentication is done using + // our TLS certificate. + user = PEER_USER + password = PEER_USER + }) } private fun maybeDestroyBridge(name: SimpleString) { @@ -273,52 +273,89 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, } } - - // We could have used the built-in PropertiesLoginModule but that exposes a roles properties file. Roles are used - // for queue access control and our RPC users must only have access to the queues they need and this cannot be allowed - // to be modified. + /** + * Clients must connect to us with a username and password and must use TLS. If a someone connects with + * [ArtemisMessagingComponent.NODE_USER] then we confirm it's just us as the node by checking their TLS certificate + * is the same as our one in our key store. Then they're given full access to all valid queues. If they connect with + * [ArtemisMessagingComponent.PEER_USER] then we confirm they belong on our P2P network by checking their root CA is + * the same as our root CA. If that's the case the only access they're given is the ablility send to our P2P address. + * In both cases the messages these authenticated nodes send to us are tagged with their subject DN and we assume + * the CN within that is their legal name. + * Otherwise if the username is neither of the above we assume it's an RPC user and authenticate against our list of + * valid RPC users. RPC clients are given permission to perform RPC and nothing else. + */ class NodeLoginModule : LoginModule { companion object { - const val NODE_USER = "Node" + // Include forbidden username character to prevent name clash with any RPC usernames + const val PEER_ROLE = "SystemRoles/Peer" + const val NODE_ROLE = "SystemRoles/Node" + const val RPC_ROLE = "SystemRoles/RPC" } private var loginSucceeded: Boolean = false private lateinit var subject: Subject private lateinit var callbackHandler: CallbackHandler private lateinit var userService: RPCUserService + private lateinit var ourRootCAPublicKey: PublicKey + private lateinit var ourPublicKey: PublicKey private val principals = ArrayList() override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map, options: Map) { this.subject = subject this.callbackHandler = callbackHandler userService = options[RPCUserService::class.java.name] as RPCUserService + ourRootCAPublicKey = options[CORDA_ROOT_CA] as PublicKey + ourPublicKey = options[CORDA_CLIENT_CA] as PublicKey } override fun login(): Boolean { val nameCallback = NameCallback("Username: ") val passwordCallback = PasswordCallback("Password: ", false) + val certificateCallback = CertificateCallback() try { - callbackHandler.handle(arrayOf(nameCallback, passwordCallback)) + callbackHandler.handle(arrayOf(nameCallback, passwordCallback, certificateCallback)) } catch (e: IOException) { throw LoginException(e.message) } catch (e: UnsupportedCallbackException) { throw LoginException("${e.message} not available to obtain information from user") } - val username = nameCallback.name ?: throw FailedLoginException("User name is null") - val receivedPassword = passwordCallback.password ?: throw FailedLoginException("Password is null") - val password = if (username == NODE_USER) "Node" else userService.getUser(username)?.password ?: throw FailedLoginException("User does not exist") - if (password != String(receivedPassword)) { - throw FailedLoginException("Password does not match") + val username = nameCallback.name ?: throw FailedLoginException("Username not provided") + val password = String(passwordCallback.password ?: throw FailedLoginException("Password not provided")) + + val validatedUser = if (username == PEER_USER || username == NODE_USER) { + val certificates = certificateCallback.certificates ?: throw FailedLoginException("No TLS?") + val peerCertificate = certificates.first() + val role = if (username == NODE_USER) { + if (peerCertificate.publicKey != ourPublicKey) { + throw FailedLoginException("Only the node can login as $NODE_USER") + } + NODE_ROLE + } else { + val theirRootCAPublicKey = certificates.last().publicKey + if (theirRootCAPublicKey != ourRootCAPublicKey) { + throw FailedLoginException("Peer does not belong on our network. Their root CA: $theirRootCAPublicKey") + } + PEER_ROLE // This enables the peer to send to our P2P address + } + principals += RolePrincipal(role) + peerCertificate.subjectDN.name + } else { + // Otherwise assume they're an RPC user + val rpcUser = userService.getUser(username) ?: throw FailedLoginException("User does not exist") + if (password != rpcUser.password) { + // TODO Switch to hashed passwords + // TODO Retrieve client IP address to include in exception message + throw FailedLoginException("Password for user $username does not match") + } + principals += RolePrincipal(RPC_ROLE) // This enables the RPC client to send requests + principals += RolePrincipal("$CLIENTS_PREFIX$username") // This enables the RPC client to receive responses + username } - principals += UserPrincipal(username) - principals += RolePrincipal(username) // The roles are configured using the usernames - if (username != NODE_USER) { - principals += RolePrincipal(RPC_REQUESTS_QUEUE) // This enables the RPC client to send requests - } + principals += UserPrincipal(validatedUser) loginSucceeded = true return loginSucceeded @@ -347,5 +384,4 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, loginSucceeded = false } } - } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index dea10db946..7143569d7c 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -19,6 +19,7 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.* +import org.bouncycastle.asn1.x500.X500Name import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.statements.InsertStatement @@ -56,15 +57,16 @@ class NodeMessagingClient(override val config: NodeConfiguration, val database: Database, val networkMapRegistrationFuture: ListenableFuture) : ArtemisMessagingComponent(), MessagingServiceInternal { companion object { - val log = loggerFor() + private val log = loggerFor() // This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic". // We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint // that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid // confusion. - val TOPIC_PROPERTY = "platform-topic" + const val TOPIC_PROPERTY = "platform-topic" + const val SESSION_ID_PROPERTY = "session-id" - val SESSION_ID_PROPERTY = "session-id" + const val RPC_QUEUE_REMOVALS_QUEUE = "rpc.qremovals" /** * This should be the only way to generate an ArtemisAddress and that only of the remote NetworkMapService node. @@ -77,11 +79,11 @@ class NodeMessagingClient(override val config: NodeConfiguration, private class InnerState { var started = false var running = false - val knownQueues = mutableSetOf() var producer: ClientProducer? = null var p2pConsumer: ClientConsumer? = null var session: ClientSession? = null var clientFactory: ClientSessionFactory? = null + var rpcDispatcher: RPCDispatcher? = null // Consumer for inbound client RPC messages. var rpcConsumer: ClientConsumer? = null var rpcNotificationConsumer: ClientConsumer? = null @@ -89,12 +91,12 @@ class NodeMessagingClient(override val config: NodeConfiguration, /** A registration to handle messages of different types */ data class Handler(val topicSession: TopicSession, - val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration + val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration /** * Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache. */ - override val myAddress: SingleMessageRecipient = toMyAddress(myIdentity, serverHostPort) + override val myAddress: SingleMessageRecipient = if (myIdentity != null) NodeAddress(myIdentity, serverHostPort) else NetworkMapAddress(serverHostPort) private val state = ThreadBox(InnerState()) private val handlers = CopyOnWriteArrayList() @@ -104,13 +106,12 @@ class NodeMessagingClient(override val config: NodeConfiguration, } private val processedMessages: MutableSet = Collections.synchronizedSet( - object : AbstractJDBCHashSet(Table, loadOnInit = true) { - override fun elementFromRow(row: ResultRow): UUID = row[table.uuid] - - override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) { - insert[table.uuid] = entry - } - }) + object : AbstractJDBCHashSet(Table, loadOnInit = true) { + override fun elementFromRow(row: ResultRow): UUID = row[table.uuid] + override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) { + insert[table.uuid] = entry + } + }) fun start(rpcOps: RPCOps, userService: RPCUserService) { state.locked { @@ -118,14 +119,15 @@ class NodeMessagingClient(override val config: NodeConfiguration, started = true log.info("Connecting to server: $serverHostPort") - // Connect to our server. TODO: This should use the in-VM transport. val tcpTransport = tcpTransport(ConnectionDirection.OUTBOUND, serverHostPort.hostText, serverHostPort.port) val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport) clientFactory = locator.createSessionFactory() - // Create a session. Note that the acknowledgement of messages is not flushed to - // the Artermis journal until the default buffer size of 1MB is acknowledged. - val session = clientFactory!!.createSession("Node", "Node", false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) + // Login using the node username. The broker will authentiate us as its node (as opposed to another peer) + // using our TLS certificate. + // Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer + // size of 1MB is acknowledged. + val session = clientFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) this.session = session session.start() @@ -133,22 +135,17 @@ class NodeMessagingClient(override val config: NodeConfiguration, producer = session.createProducer() // Create a queue, consumer and producer for handling P2P network messages. - val queueName = toQueueName(myAddress) - val query = session.queueQuery(queueName) - if (!query.isExists) { - session.createQueue(queueName, queueName, true) - } - knownQueues.add(queueName) - p2pConsumer = makeConsumer(session, queueName, true) + createQueueIfAbsent(SimpleString(P2P_QUEUE)) + p2pConsumer = makeP2PConsumer(session, true) networkMapRegistrationFuture.success { state.locked { - log.info("Network map is complete, so removing filter from Artemis consumer.") + log.info("Network map is complete, so removing filter from P2P consumer.") try { p2pConsumer!!.close() } catch(e: ActiveMQObjectClosedException) { // Ignore it: this can happen if the server has gone away before we do. } - p2pConsumer = makeConsumer(session, queueName, false) + p2pConsumer = makeP2PConsumer(session, false) } } @@ -156,10 +153,12 @@ class NodeMessagingClient(override val config: NodeConfiguration, // bridge) and those clients must have authenticated. We could use a single consumer for everything // and perhaps we should, but these queues are not worth persisting. session.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE) - session.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 1") + // The custom name for the queue is intentional - we may wish other things to subscribe to the + // NOTIFICATIONS_ADDRESS with different filters in future + session.createTemporaryQueue(NOTIFICATIONS_ADDRESS, RPC_QUEUE_REMOVALS_QUEUE, "_AMQ_NotifType = 1") rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE) - rpcNotificationConsumer = session.createConsumer("rpc.qremovals") - dispatcher = createRPCDispatcher(rpcOps, userService) + rpcNotificationConsumer = session.createConsumer(RPC_QUEUE_REMOVALS_QUEUE) + rpcDispatcher = createRPCDispatcher(rpcOps, userService) } } @@ -168,13 +167,13 @@ class NodeMessagingClient(override val config: NodeConfiguration, * the original and make another without a filter. We do this so that there is a network map in place for all other * message handlers. */ - private fun makeConsumer(session: ClientSession, queueName: SimpleString, networkMapOnly: Boolean): ClientConsumer { + private fun makeP2PConsumer(session: ClientSession, networkMapOnly: Boolean): ClientConsumer { return if (networkMapOnly) { // Filter for just the network map messages. - val messageFilter = SimpleString("hyphenated_props:$TOPIC_PROPERTY like 'platform.network_map.%'") - session.createConsumer(queueName, messageFilter) + val messageFilter = "hyphenated_props:$TOPIC_PROPERTY like 'platform.network_map.%'" + session.createConsumer(P2P_QUEUE, messageFilter) } else - session.createConsumer(queueName) + session.createConsumer(P2P_QUEUE) } private var shutdownLatch = CountDownLatch(1) @@ -194,7 +193,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, null } ?: return false - val message: Message? = artemisToCordaMessage(artemisMessage) + val message: ReceivedMessage? = artemisToCordaMessage(artemisMessage) if (message != null) deliver(message) @@ -217,11 +216,10 @@ class NodeMessagingClient(override val config: NodeConfiguration, private fun runPreNetworkMap() { val consumer = state.locked { - check(started) + check(started) { "start must be called first" } check(!running) { "run can't be called twice" } running = true - // Optionally, start RPC dispatch. - dispatcher?.start(rpcConsumer!!, rpcNotificationConsumer!!, executor) + rpcDispatcher!!.start(rpcConsumer!!, rpcNotificationConsumer!!, executor) p2pConsumer!! } @@ -254,7 +252,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, shutdownLatch.countDown() } - private fun artemisToCordaMessage(message: ClientMessage): Message? { + private fun artemisToCordaMessage(message: ClientMessage): ReceivedMessage? { try { if (!message.containsProperty(TOPIC_PROPERTY)) { log.warn("Received message without a $TOPIC_PROPERTY property, ignoring") @@ -268,17 +266,18 @@ class NodeMessagingClient(override val config: NodeConfiguration, val sessionID = message.getLongProperty(SESSION_ID_PROPERTY) // Use the magic deduplication property built into Artemis as our message identity too val uuid = UUID.fromString(message.getStringProperty(HDR_DUPLICATE_DETECTION_ID)) - val user = message.getStringProperty(HDR_VALIDATED_USER) + val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" } log.info("Received message from: ${message.address} user: $user topic: $topic sessionID: $sessionID uuid: $uuid") val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } - val msg = object : Message { + val msg = object : ReceivedMessage { override val topicSession = TopicSession(topic, sessionID) override val data: ByteArray = body + override val peer: X500Name = X500Name(user) override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp) override val uniqueMessageId: UUID = uuid - override fun toString() = topic + "#" + data.opaque() + override fun toString() = "$topic#${data.opaque()}" } return msg @@ -288,7 +287,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, } } - private fun deliver(msg: Message): Boolean { + private fun deliver(msg: ReceivedMessage): Boolean { state.checkNotLocked() // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added // or removed whilst the filter is executing will not affect anything. @@ -324,7 +323,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, return true } - private fun callHandlers(msg: Message, deliverTo: List) { + private fun callHandlers(msg: ReceivedMessage, deliverTo: List) { for (handler in deliverTo) { handler.callback(msg, handler) } @@ -368,40 +367,54 @@ class NodeMessagingClient(override val config: NodeConfiguration, } override fun send(message: Message, target: MessageRecipients) { - val queueName = toQueueName(target) state.locked { + val mqAddress = getMQAddress(target) val artemisMessage = session!!.createMessage(true).apply { val sessionID = message.topicSession.sessionID putStringProperty(TOPIC_PROPERTY, message.topicSession.topic) putLongProperty(SESSION_ID_PROPERTY, sessionID) writeBodyBufferBytes(message.data) // Use the magic deduplication property built into Artemis as our message identity too - putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString())) } - if (knownQueues.add(queueName)) { - maybeCreateQueue(queueName) - } - log.info("send to: $queueName topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}") - producer!!.send(queueName, artemisMessage) + log.info("Send to: $mqAddress topic: ${message.topicSession.topic} sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}") + producer!!.send(mqAddress, artemisMessage) } } - private fun maybeCreateQueue(queueName: SimpleString) { + private fun getMQAddress(target: MessageRecipients): SimpleString { + return if (target == myAddress) { + // If we are sending to ourselves then route the message directly to our P2P queue. + SimpleString(P2P_QUEUE) + } else { + // Otherwise we send the message to an internal queue for the target residing on our broker. It's then the + // broker's job to route the message to the target's P2P queue. + val internalTargetQueue = (target as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address") + createQueueIfAbsent(internalTargetQueue) + internalTargetQueue + } + } + + /** Attempts to create a durable queue on the broker which is bound to an address of the same name. */ + private fun createQueueIfAbsent(queueName: SimpleString) { state.alreadyLocked { val queueQuery = session!!.queueQuery(queueName) if (!queueQuery.isExists) { - log.info("Create fresh queue $queueName") - session!!.createQueue(queueName, queueName, true /* durable */) + log.info("Create fresh queue $queueName bound on same address") + session!!.createQueue(queueName, queueName, true) } } } - override fun addMessageHandler(topic: String, sessionID: Long, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration - = addMessageHandler(TopicSession(topic, sessionID), callback) + override fun addMessageHandler(topic: String, + sessionID: Long, + callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { + return addMessageHandler(TopicSession(topic, sessionID), callback) + } override fun addMessageHandler(topicSession: TopicSession, - callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { + callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { require(!topicSession.isBlank()) { "Topic must not be blank, as the empty topic is a special case." } val handler = Handler(topicSession, callback) handlers.add(handler) @@ -423,8 +436,6 @@ class NodeMessagingClient(override val config: NodeConfiguration, } } - var dispatcher: RPCDispatcher? = null - private fun createRPCDispatcher(ops: RPCOps, userService: RPCUserService) = object : RPCDispatcher(ops, userService) { override fun send(data: SerializedBytes<*>, toAddress: String) { state.locked { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt index bb30852022..dbbd4a9b01 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt @@ -143,8 +143,8 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService) { /** Convert an Artemis [ClientMessage] to a MQ-neutral [ClientRPCRequestMessage]. */ private fun ClientMessage.toRPCRequestMessage(): ClientRPCRequestMessage { val user = getUser(this) - val replyTo = getAuthenticatedAddress(user, ClientRPCRequestMessage.REPLY_TO, true)!! - val observationsTo = getAuthenticatedAddress(user, ClientRPCRequestMessage.OBSERVATIONS_TO, false) + val replyTo = getReturnAddress(user, ClientRPCRequestMessage.REPLY_TO, true)!! + val observationsTo = getReturnAddress(user, ClientRPCRequestMessage.OBSERVATIONS_TO, false) val argBytes = ByteArray(bodySize).apply { bodyBuffer.readBytes(this) } if (argBytes.isEmpty()) { throw RPCException("empty serialized args") @@ -158,12 +158,11 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService) { return userService.getUser(message.requiredString(Message.HDR_VALIDATED_USER.toString()))!! } - private fun ClientMessage.getAuthenticatedAddress(user: User, property: String, required: Boolean): String? { - val address: String? = if (required) requiredString(property) else getStringProperty(property) - val expectedAddressPrefix = "${ArtemisMessagingComponent.CLIENTS_PREFIX}${user.username}." - if (address != null && !address.startsWith(expectedAddressPrefix)) { - throw RPCException("$property address does not match up with the user") + private fun ClientMessage.getReturnAddress(user: User, property: String, required: Boolean): String? { + return if (containsProperty(property)) { + "${ArtemisMessagingComponent.CLIENTS_PREFIX}${user.username}.rpc.${getLongProperty(property)}" + } else { + if (required) throw RPCException("missing $property property") else null } - return address } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 82c575efea..c929cb6319 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -196,7 +196,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, val session = FlowSession(sessionFlow, nodeIdentity, random63BitValue(), null) openSessions[Pair(sessionFlow, nodeIdentity)] = session val counterpartyFlow = sessionFlow.getCounterpartyMarker(nodeIdentity).name - val sessionInit = SessionInit(session.ourSessionId, serviceHub.myInfo.legalIdentity, counterpartyFlow, firstPayload) + val sessionInit = SessionInit(session.ourSessionId, counterpartyFlow, firstPayload) val sessionInitResponse = sendAndReceiveInternal(session, sessionInit) if (sessionInitResponse is SessionConfirm) { session.otherPartySessionId = sessionInitResponse.initiatedSessionId diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 90c2d060ea..79811febc3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -211,7 +211,18 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val sessionMessage = message.data.deserialize() when (sessionMessage) { is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage) - is SessionInit -> onSessionInit(sessionMessage) + is SessionInit -> { + // TODO SECURITY Look up the party with the full X.500 name instead of just the legal name which + // isn't required to be unique + // TODO For now have the doorman block signups with identical names, and names with characters that + // are used in X.500 name textual serialisation + val otherParty = serviceHub.networkMapCache.getNodeByLegalName(message.peerLegalName)?.legalIdentity + if (otherParty != null) { + onSessionInit(sessionMessage, otherParty) + } else { + logger.error("Unknown peer ${message.peer} in $sessionMessage") + } + } } } } @@ -254,10 +265,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } } - private fun onSessionInit(sessionInit: SessionInit) { - logger.trace { "Received $sessionInit" } - //TODO Verify the other party are who they say they are from the TLS subsystem - val otherParty = sessionInit.initiatorParty + private fun onSessionInit(sessionInit: SessionInit, otherParty: Party) { + logger.trace { "Received $sessionInit $otherParty" } val otherPartySessionId = sessionInit.initiatorSessionId try { val markerClass = Class.forName(sessionInit.flowName) @@ -446,10 +455,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val recipientSessionId: Long } - data class SessionInit(val initiatorSessionId: Long, - val initiatorParty: Party, - val flowName: String, - val firstPayload: Any?) : SessionMessage + data class SessionInit(val initiatorSessionId: Long, val flowName: String, val firstPayload: Any?) : SessionMessage interface SessionInitResponse : ExistingSessionMessage diff --git a/node/src/test/kotlin/net/corda/node/messaging/InMemoryMessagingTests.kt b/node/src/test/kotlin/net/corda/node/messaging/InMemoryMessagingTests.kt index 3a47a787af..a5ee0e1445 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/InMemoryMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/InMemoryMessagingTests.kt @@ -1,5 +1,3 @@ -@file:Suppress("UNUSED_VARIABLE") - package net.corda.node.messaging import net.corda.core.messaging.Message @@ -9,7 +7,6 @@ import net.corda.core.node.services.DEFAULT_SESSION_ID import net.corda.core.node.services.ServiceInfo import net.corda.node.services.network.NetworkMapService import net.corda.testing.node.MockNetwork -import org.junit.Before import org.junit.Test import java.util.* import kotlin.test.assertEquals @@ -17,12 +14,7 @@ import kotlin.test.assertFails import kotlin.test.assertTrue class InMemoryMessagingTests { - lateinit var network: MockNetwork - - @Before - fun setUp() { - network = MockNetwork() - } + val network = MockNetwork() @Test fun topicStringValidation() { @@ -115,6 +107,5 @@ class InMemoryMessagingTests { node2.net.send(validMessage2, node1.net.myAddress) network.runNetwork() assertEquals(2, received) - } } diff --git a/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt index c93126dde3..04a22a9d0a 100644 --- a/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/ArtemisMessagingTests.kt @@ -13,17 +13,19 @@ import net.corda.core.node.services.DEFAULT_SESSION_ID import net.corda.core.utilities.LogHelper import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.NodeMessagingClient import net.corda.node.services.messaging.RPCOps import net.corda.node.services.network.InMemoryNetworkMapCache import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.PersistentUniquenessProvider -import net.corda.node.utilities.AffinityExecutor +import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.databaseTransaction import net.corda.testing.freeLocalHostAndPort import net.corda.testing.node.makeTestDataSourceProperties +import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.jetbrains.exposed.sql.Database import org.junit.After @@ -39,7 +41,6 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import kotlin.concurrent.thread import kotlin.test.assertEquals import kotlin.test.assertNull -import kotlin.test.assertTrue class ArtemisMessagingTests { @Rule @JvmField val temporaryFolder = TemporaryFolder() @@ -57,7 +58,6 @@ class ArtemisMessagingTests { var messagingClient: NodeMessagingClient? = null var messagingServer: ArtemisMessagingServer? = null - val networkMapCache = InMemoryNetworkMapCache() val rpcOps = object : RPCOps { @@ -196,7 +196,7 @@ class ArtemisMessagingTests { createAndStartClientAndServer(receivedMessages) for (iter in 1..iterations) { val firstActual: Message = receivedMessages.take() - assertTrue(String(firstActual.data).equals("first msg $iter")) + assertThat(String(firstActual.data)).isEqualTo("first msg $iter") } assertNull(receivedMessages.poll(200, MILLISECONDS)) } @@ -223,16 +223,22 @@ class ArtemisMessagingTests { private fun createMessagingClient(server: HostAndPort = hostAndPort): NodeMessagingClient { return databaseTransaction(database) { - NodeMessagingClient(config, server, identity.public.composite, AffinityExecutor.ServiceAffinityExecutor("ArtemisMessagingTests", 1), database, networkMapRegistrationFuture).apply { - configureWithDevSSLCertificate() + NodeMessagingClient( + config, + server, + identity.public.composite, + ServiceAffinityExecutor("ArtemisMessagingTests", 1), + database, + networkMapRegistrationFuture).apply { + config.configureWithDevSSLCertificate() messagingClient = this } } } private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer { - return ArtemisMessagingServer(config, local, identity.public.composite, networkMapCache, userService).apply { - configureWithDevSSLCertificate() + return ArtemisMessagingServer(config, local, networkMapCache, userService).apply { + config.configureWithDevSSLCertificate() messagingServer = this } } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt index c54f08597f..3bfbcfb76f 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt @@ -197,14 +197,14 @@ class StateMachineManagerTests { assertThat(node3Flow.receivedPayloads[0]).isEqualTo(payload) assertSessionTransfers(node2, - node1 sent sessionInit(node1, SendFlow::class, payload) to node2, + node1 sent sessionInit(SendFlow::class, payload) to node2, node2 sent sessionConfirm() to node1, node1 sent sessionEnd() to node2 //There's no session end from the other flows as they're manually suspended ) assertSessionTransfers(node3, - node1 sent sessionInit(node1, SendFlow::class, payload) to node3, + node1 sent sessionInit(SendFlow::class, payload) to node3, node3 sent sessionConfirm() to node1, node1 sent sessionEnd() to node3 //There's no session end from the other flows as they're manually suspended @@ -230,14 +230,14 @@ class StateMachineManagerTests { assertThat(multiReceiveFlow.receivedPayloads[1]).isEqualTo(node3Payload) assertSessionTransfers(node2, - node1 sent sessionInit(node1, ReceiveThenSuspendFlow::class) to node2, + node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node2, node2 sent sessionConfirm() to node1, node2 sent sessionData(node2Payload) to node1, node2 sent sessionEnd() to node1 ) assertSessionTransfers(node3, - node1 sent sessionInit(node1, ReceiveThenSuspendFlow::class) to node3, + node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node3, node3 sent sessionConfirm() to node1, node3 sent sessionData(node3Payload) to node1, node3 sent sessionEnd() to node1 @@ -251,7 +251,7 @@ class StateMachineManagerTests { net.runNetwork() assertSessionTransfers( - node1 sent sessionInit(node1, PingPongFlow::class, 10L) to node2, + node1 sent sessionInit(PingPongFlow::class, 10L) to node2, node2 sent sessionConfirm() to node1, node2 sent sessionData(20L) to node1, node1 sent sessionData(11L) to node2, @@ -267,7 +267,7 @@ class StateMachineManagerTests { net.runNetwork() assertThatThrownBy { future.getOrThrow() }.isInstanceOf(FlowSessionException::class.java) assertSessionTransfers( - node1 sent sessionInit(node1, ReceiveThenSuspendFlow::class) to node2, + node1 sent sessionInit(ReceiveThenSuspendFlow::class) to node2, node2 sent sessionConfirm() to node1, node2 sent sessionEnd() to node1 ) @@ -288,9 +288,7 @@ class StateMachineManagerTests { return smm.findStateMachines(P::class.java).single() } - private fun sessionInit(initiatorNode: MockNode, flowMarker: KClass<*>, payload: Any? = null): SessionInit { - return SessionInit(0, initiatorNode.info.legalIdentity, flowMarker.java.name, payload) - } + private fun sessionInit(flowMarker: KClass<*>, payload: Any? = null) = SessionInit(0, flowMarker.java.name, payload) private fun sessionConfirm() = SessionConfirm(0, 0) @@ -314,7 +312,7 @@ class StateMachineManagerTests { private fun Observable.toSessionTransfers(): Observable { return filter { it.message.topicSession == StateMachineManager.sessionTopic }.map { - val from = it.sender.myAddress.id + val from = it.sender.id val message = it.message.data.deserialize() val to = (it.recipients as InMemoryMessagingNetwork.Handle).id SessionTransfer(from, sanitise(message), to) @@ -371,7 +369,6 @@ class StateMachineManagerTests { @Suspendable override fun call() { receivedPayloads = otherParties.map { receive(it).unwrap { it } } - println(receivedPayloads) Fiber.park() } } @@ -384,9 +381,7 @@ class StateMachineManagerTests { @Suspendable override fun call() { receivedPayload = sendAndReceive(otherParty, payload).unwrap { it } - println("${fsm.id} Received $receivedPayload") receivedPayload2 = sendAndReceive(otherParty, payload + 1).unwrap { it } - println("${fsm.id} Received $receivedPayload2") } } diff --git a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/NetworkMapVisualiser.kt b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/NetworkMapVisualiser.kt index 0d301f7b2e..eeef8c7313 100644 --- a/samples/network-visualiser/src/main/kotlin/net/corda/netmap/NetworkMapVisualiser.kt +++ b/samples/network-visualiser/src/main/kotlin/net/corda/netmap/NetworkMapVisualiser.kt @@ -109,7 +109,7 @@ class NetworkMapVisualiser : Application() { } // Fire the message bullets between nodes. simulation.network.messagingNetwork.sentMessages.observeOn(uiThread).subscribe { msg: InMemoryMessagingNetwork.MessageTransfer -> - val senderNode: MockNetwork.MockNode = simulation.network.addressToNode(msg.sender.myAddress) + val senderNode: MockNetwork.MockNode = simulation.network.addressToNode(msg.sender) val destNode: MockNetwork.MockNode = simulation.network.addressToNode(msg.recipients as SingleMessageRecipient) if (transferIsInteresting(msg)) { @@ -345,7 +345,7 @@ class NetworkMapVisualiser : Application() { private fun transferIsInteresting(transfer: InMemoryMessagingNetwork.MessageTransfer): Boolean { // Loopback messages are boring. - if (transfer.sender.myAddress == transfer.recipients) return false + if (transfer.sender == transfer.recipients) return false // Network map push acknowledgements are boring. if (NetworkMapService.PUSH_ACK_FLOW_TOPIC in transfer.message.topicSession.topic) return false val message = transfer.message.data.deserialize() diff --git a/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt b/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt new file mode 100644 index 0000000000..d679beb890 --- /dev/null +++ b/test-utils/src/main/kotlin/net/corda/testing/messaging/SimpleMQClient.kt @@ -0,0 +1,36 @@ +package net.corda.testing.messaging + +import com.google.common.net.HostAndPort +import net.corda.node.services.config.NodeSSLConfiguration +import net.corda.node.services.config.configureTestSSL +import net.corda.node.services.messaging.ArtemisMessagingComponent +import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.OUTBOUND +import org.apache.activemq.artemis.api.core.client.* + +/** + * As the name suggests this is a simple client for connecting to MQ brokers. + */ +class SimpleMQClient(val target: HostAndPort) : ArtemisMessagingComponent() { + override val config: NodeSSLConfiguration = configureTestSSL() + lateinit var sessionFactory: ClientSessionFactory + lateinit var session: ClientSession + lateinit var producer: ClientProducer + + fun start(username: String? = null, password: String? = null) { + val tcpTransport = tcpTransport(OUTBOUND, target.hostText, target.port) + val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { + isBlockOnNonDurableSend = true + threadPoolMaxSize = 1 + } + sessionFactory = locator.createSessionFactory() + session = sessionFactory.createSession(username, password, false, true, true, locator.isPreAcknowledge, locator.ackBatchSize) + session.start() + producer = session.createProducer() + } + + fun createMessage(): ClientMessage = session.createMessage(false) + + fun stop() { + sessionFactory.close() + } +} \ No newline at end of file diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt index 50a2650aab..62cc909c8e 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt @@ -5,6 +5,7 @@ import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.SettableFuture import net.corda.core.ThreadBox import net.corda.core.getOrThrow +import net.corda.core.crypto.X509Utilities import net.corda.core.messaging.* import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.trace @@ -14,6 +15,7 @@ import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.JDBCHashSet import net.corda.node.utilities.databaseTransaction import net.corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging +import org.bouncycastle.asn1.x500.X500Name import org.jetbrains.exposed.sql.Database import org.slf4j.LoggerFactory import rx.Observable @@ -43,8 +45,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria private var counter = 0 // -1 means stopped. private val handleEndpointMap = HashMap() - data class MessageTransfer(val sender: InMemoryMessaging, val message: Message, val recipients: MessageRecipients) { - override fun toString() = "${message.topicSession} from '${sender.myAddress}' to '$recipients'" + data class MessageTransfer(val sender: Handle, val message: Message, val recipients: MessageRecipients) { + override fun toString() = "${message.topicSession} from '$sender' to '$recipients'" } // All sent messages are kept here until pumpSend is called, or manuallyPumped is set to false @@ -85,8 +87,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria @Synchronized fun createNode(manuallyPumped: Boolean, executor: AffinityExecutor, - database: Database) - : Pair> { + database: Database): Pair> { check(counter >= 0) { "In memory network stopped: please recreate." } val builder = createNodeWithID(manuallyPumped, counter, executor, database = database) as Builder counter++ @@ -118,8 +119,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria @Synchronized private fun msgSend(from: InMemoryMessaging, message: Message, recipients: MessageRecipients) { - val transfer = MessageTransfer(from, message, recipients) - messageSendQueue.add(transfer) + messageSendQueue += MessageTransfer(from.myAddress, message, recipients) } @Synchronized @@ -164,15 +164,13 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria // If block is set to true this function will only return once a message has been pushed onto the recipients' queues fun pumpSend(block: Boolean): MessageTransfer? { val transfer = (if (block) messageSendQueue.take() else messageSendQueue.poll()) ?: return null - val recipients = transfer.recipients - val from = transfer.sender.myAddress log.trace { transfer.toString() } val calc = latencyCalculator - if (calc != null && recipients is SingleMessageRecipient) { + if (calc != null && transfer.recipients is SingleMessageRecipient) { val messageSent = SettableFuture.create() // Inject some artificial latency. - timer.schedule(calc.between(from, recipients).toMillis()) { + timer.schedule(calc.between(transfer.sender, transfer.recipients).toMillis()) { pumpSendInternal(transfer) messageSent.set(Unit) } @@ -189,7 +187,6 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria fun pumpSendInternal(transfer: MessageTransfer) { when (transfer.recipients) { is Handle -> getQueueForHandle(transfer.recipients).add(transfer) - is AllPossibleRecipients -> { // This means all possible recipients _that the network knows about at the time_, not literally everyone // who joins into the indefinite future. @@ -214,14 +211,14 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria private val executor: AffinityExecutor, private val database: Database) : SingletonSerializeAsToken(), MessagingServiceInternal { inner class Handler(val topicSession: TopicSession, - val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration + val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration @Volatile private var running = true private inner class InnerState { val handlers: MutableList = ArrayList() - val pendingRedelivery = JDBCHashSet("pending_messages",loadOnInit = true) + val pendingRedelivery = JDBCHashSet("pending_messages", loadOnInit = true) } private val state = ThreadBox(InnerState()) @@ -240,23 +237,22 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria } } - override fun addMessageHandler(topic: String, sessionID: Long, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration + override fun addMessageHandler(topic: String, sessionID: Long, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration = addMessageHandler(TopicSession(topic, sessionID), callback) - override fun addMessageHandler(topicSession: TopicSession, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { + override fun addMessageHandler(topicSession: TopicSession, callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { check(running) - val (handler, items) = state.locked { + val (handler, transfers) = state.locked { val handler = Handler(topicSession, callback).apply { handlers.add(this) } - val pending = ArrayList() + val pending = ArrayList() databaseTransaction(database) { pending.addAll(pendingRedelivery) pendingRedelivery.clear() } Pair(handler, pending) } - for (message in items) { - send(message, handle) - } + + transfers.forEach { pumpSendInternal(it) } return handler } @@ -323,9 +319,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria while (deliverTo == null) { val transfer = (if (block) q.take() else q.poll()) ?: return null deliverTo = state.locked { - val h = handlers.filter { if (it.topicSession.isBlank()) true else transfer.message.topicSession == it.topicSession } - - if (h.isEmpty()) { + val matchingHandlers = handlers.filter { it.topicSession.isBlank() || transfer.message.topicSession == it.topicSession } + if (matchingHandlers.isEmpty()) { // Got no handlers for this message yet. Keep the message around and attempt redelivery after a new // handler has been registered. The purpose of this path is to make unit tests that have multi-threading // reliable, as a sender may attempt to send a message to a receiver that hasn't finished setting @@ -333,11 +328,11 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria // least sometimes. log.warn("Message to ${transfer.message.topicSession} could not be delivered") databaseTransaction(database) { - pendingRedelivery.add(transfer.message) + pendingRedelivery.add(transfer) } null } else { - h + matchingHandlers } } if (deliverTo != null) { @@ -357,7 +352,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria databaseTransaction(database) { for (handler in deliverTo) { try { - handler.callback(transfer.message, handler) + handler.callback(transfer.toReceivedMessage(), handler) } catch(e: Exception) { log.error("Caught exception in handler for $this/${handler.topicSession}", e) } @@ -371,5 +366,13 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria } return transfer } + + private fun MessageTransfer.toReceivedMessage() = object : ReceivedMessage { + override val topicSession: TopicSession get() = message.topicSession + override val data: ByteArray get() = message.data + override val peer: X500Name get() = X509Utilities.getDevX509Name(sender.description) + override val debugTimestamp: Instant get() = message.debugTimestamp + override val uniqueMessageId: UUID get() = message.uniqueMessageId + } } } diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt new file mode 100644 index 0000000000..381c5edeae --- /dev/null +++ b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt @@ -0,0 +1,69 @@ +package net.corda.testing.node + +import net.corda.core.getOrThrow +import net.corda.node.internal.Node +import net.corda.node.services.User +import net.corda.node.services.config.ConfigHelper +import net.corda.node.services.config.FullNodeConfiguration +import net.corda.testing.freeLocalHostAndPort +import org.junit.After +import org.junit.Before +import org.junit.Rule +import org.junit.rules.TemporaryFolder +import java.util.* +import kotlin.concurrent.thread + +/** + * Extend this class if you need to run nodes in a test. You could use the driver DSL but it's extremely slow for testing + * purposes. + */ +abstract class NodeBasedTest { + @Rule + @JvmField + val tempFolder = TemporaryFolder() + + private val nodes = ArrayList() + lateinit var networkMapNode: Node + + @Before + fun startNetworkMapNode() { + networkMapNode = startNode("Network Map", emptyMap()) + } + + @After + fun stopNodes() { + nodes.forEach(Node::stop) + } + + fun startNode(legalName: String, rpcUsers: List = emptyList()): Node { + return startNode(legalName, mapOf( + "networkMapAddress" to networkMapNode.configuration.artemisAddress.toString(), + "rpcUsers" to rpcUsers.map { mapOf( + "user" to it.username, + "password" to it.password, + "permissions" to it.permissions) + } + )) + } + + private fun startNode(legalName: String, config: Map): Node { + val config = ConfigHelper.loadConfig( + baseDirectoryPath = tempFolder.newFolder(legalName).toPath(), + allowMissingConfig = true, + configOverrides = config + mapOf( + "myLegalName" to legalName, + "artemisAddress" to freeLocalHostAndPort().toString(), + "extraAdvertisedServiceIds" to "" + ) + ) + + val node = FullNodeConfiguration(config).createNode() + node.start() + nodes += node + thread(name = legalName) { + node.run() + } + node.networkMapRegistrationFuture.getOrThrow() + return node + } +} \ No newline at end of file