From 7d5ee8ba08f91da3e12d70e0573c8725a24ed11d Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Thu, 28 Jul 2016 11:24:52 +0100 Subject: [PATCH] Create MessageServiceInternal interface to allow NetworkMapCache to register addresses with the network service. Activate WhitelistTrustManager for now to secure the TLS messaging. --- .../com/r3corda/core/messaging/Messaging.kt | 15 ----------- .../com/r3corda/node/internal/AbstractNode.kt | 15 ++++++----- .../kotlin/com/r3corda/node/internal/Node.kt | 3 ++- .../r3corda/node/internal/testing/MockNode.kt | 3 ++- .../node/services/api/ServiceHubInternal.kt | 25 +++++++++++++++++++ .../messaging/ArtemisMessagingService.kt | 18 ++++++++++++- .../network/InMemoryMessagingNetwork.kt | 7 +++++- .../network/InMemoryNetworkMapCache.kt | 4 ++- .../services/network/MockNetworkMapCache.kt | 2 +- .../com/r3corda/node/services/MockServices.kt | 6 ++--- .../statemachine/StateMachineManagerTests.kt | 3 ++- 11 files changed, 68 insertions(+), 33 deletions(-) diff --git a/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt b/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt index 43da9925fe..e89d7863af 100644 --- a/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt +++ b/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt @@ -53,8 +53,6 @@ interface MessagingService { */ fun send(message: Message, target: MessageRecipients) - fun stop() - /** * Returns an initialised [Message] with the current time, etc, already filled in. */ @@ -83,19 +81,6 @@ fun MessagingService.send(topic: String, payload: Any, to: MessageRecipients) { send(createMessage(topic, payload.serialize().bits), to) } -/** - * This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods - * on the messaging service interface until you have successfully started up the system. One of these objects should - * be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations - * may let you cast the returned future to an object that lets you get status info. - * - * A specific implementation of the controller class will have extra features that let you customise it before starting - * it up. - */ -interface MessagingServiceBuilder { - fun start(): ListenableFuture -} - interface MessageHandlerRegistration /** diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index e8a14d8a70..a7ffa45446 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -21,10 +21,7 @@ import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize import com.r3corda.node.api.APIServer import com.r3corda.node.services.NotaryChangeService -import com.r3corda.node.services.api.AcceptsFileUpload -import com.r3corda.node.services.api.CheckpointStorage -import com.r3corda.node.services.api.MonitoringService -import com.r3corda.node.services.api.ServiceHubInternal +import com.r3corda.node.services.api.* import com.r3corda.node.services.clientapi.NodeInterestRates import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.events.NodeSchedulerService @@ -87,8 +84,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val servicesThatAcceptUploads: List = _servicesThatAcceptUploads val services = object : ServiceHubInternal() { - override val networkService: MessagingService get() = net - override val networkMapCache: NetworkMapCache = InMemoryNetworkMapCache() + override val networkService: MessagingServiceInternal get() = net + override val networkMapCache: NetworkMapCache get() = netMapCache override val storageService: TxWritableStorageService get() = storage override val walletService: WalletService get() = wallet override val keyManagementService: KeyManagementService get() = keyManagement @@ -122,7 +119,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, var inNodeNetworkMapService: NetworkMapService? = null var inNodeNotaryService: NotaryService? = null lateinit var identity: IdentityService - lateinit var net: MessagingService + lateinit var net: MessagingServiceInternal + lateinit var netMapCache: NetworkMapCache lateinit var api: APIServer lateinit var scheduler: SchedulerService lateinit var protocolLogicFactory: ProtocolLogicRefFactory @@ -151,6 +149,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, storage = storageServices.first checkpointStorage = storageServices.second net = makeMessagingService() + netMapCache = InMemoryNetworkMapCache(net) wallet = NodeWalletService(services) makeInterestRatesOracleService() identity = makeIdentityService() @@ -312,7 +311,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, net.stop() } - protected abstract fun makeMessagingService(): MessagingService + protected abstract fun makeMessagingService(): MessagingServiceInternal protected abstract fun startMessagingService() diff --git a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt index 489d116eda..442bb6c300 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt @@ -8,6 +8,7 @@ import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.ServiceType import com.r3corda.core.utilities.loggerFor import com.r3corda.node.serialization.NodeClock +import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingService import com.r3corda.node.servlets.AttachmentDownloadServlet @@ -67,7 +68,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort, // when our process shuts down, but we try in stop() anyway just to be nice. private var nodeFileLock: FileLock? = null - override fun makeMessagingService(): MessagingService = ArtemisMessagingService(dir, p2pAddr, configuration, serverThread) + override fun makeMessagingService(): MessagingServiceInternal = ArtemisMessagingService(dir, p2pAddr, configuration, serverThread) override fun startMessagingService() { // Start up the MQ service. diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt index ce8b0eae3a..ccb7931821 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/MockNode.kt @@ -12,6 +12,7 @@ import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.testing.MockIdentityService import com.r3corda.core.utilities.loggerFor import com.r3corda.node.internal.AbstractNode +import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.network.InMemoryMessagingNetwork import com.r3corda.node.services.network.NetworkMapService @@ -77,7 +78,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, // We only need to override the messaging service here, as currently everything that hits disk does so // through the java.nio API which we are already mocking via Jimfs. - override fun makeMessagingService(): MessagingService { + override fun makeMessagingService(): MessagingServiceInternal { require(id >= 0) { "Node ID must be zero or positive, was passed: " + id } return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, configuration.myLegalName).start().get() } diff --git a/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt index e55e853235..3fe120b410 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/api/ServiceHubInternal.kt @@ -2,15 +2,40 @@ package com.r3corda.node.services.api import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.contracts.SignedTransaction +import com.r3corda.core.messaging.MessagingService +import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.TxWritableStorageService import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogicRefFactory +interface MessagingServiceInternal: MessagingService { + fun stop() + + // Allow messaging service to be signalled by the NetworkMapCache about Nodes + // Thus providing an opportunity to permission the other Node and possibly to setup a link + fun registerTrustedAddress(address: SingleMessageRecipient) +} + +/** + * This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods + * on the messaging service interface until you have successfully started up the system. One of these objects should + * be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations + * may let you cast the returned future to an object that lets you get status info. + * + * A specific implementation of the controller class will have extra features that let you customise it before starting + * it up. + */ +interface MessagingServiceBuilder { + fun start(): ListenableFuture +} + abstract class ServiceHubInternal : ServiceHub { abstract val monitoringService: MonitoringService abstract val protocolLogicRefFactory: ProtocolLogicRefFactory + abstract override val networkService: MessagingServiceInternal + /** * Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then * sends them to the wallet for further processing. This is intended for implementations to call from diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt index b38f6c0fc6..b4f6e2cb08 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt @@ -3,12 +3,15 @@ package com.r3corda.node.services.messaging import com.google.common.net.HostAndPort import com.r3corda.core.RunOnCallerThread import com.r3corda.core.ThreadBox +import com.r3corda.core.crypto.WhitelistTrustManagerProvider import com.r3corda.core.crypto.X509Utilities import com.r3corda.core.crypto.newSecureRandom +import com.r3corda.core.crypto.registerWhitelistTrustManager import com.r3corda.core.messaging.* import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.utilities.loggerFor import com.r3corda.node.internal.Node +import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.config.NodeConfiguration import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.TransportConfiguration @@ -57,12 +60,19 @@ import javax.annotation.concurrent.ThreadSafe class ArtemisMessagingService(val directory: Path, val myHostPort: HostAndPort, val config: NodeConfiguration, - val defaultExecutor: Executor = RunOnCallerThread) : SingletonSerializeAsToken(), MessagingService { + val defaultExecutor: Executor = RunOnCallerThread) : SingletonSerializeAsToken(), MessagingServiceInternal { // In future: can contain onion routing info, etc. private data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient companion object { + init { + // Until https://issues.apache.org/jira/browse/ARTEMIS-656 is resolved gate acceptable + // certificate hosts manually. + registerWhitelistTrustManager() + } + + val log = loggerFor() // This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic". @@ -240,6 +250,12 @@ class ArtemisMessagingService(val directory: Path, } } + override fun registerTrustedAddress(address: SingleMessageRecipient) { + require(address is Address) { "Address is not an Artemis Message Address" } + val hostName = (address as Address).hostAndPort.hostText + WhitelistTrustManagerProvider.addWhitelistEntry(hostName) + } + override fun send(message: Message, target: MessageRecipients) { if (target !is Address) TODO("Only simple sends to single recipients are currently implemented") diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt index 5ff369304f..5b54e81461 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryMessagingNetwork.kt @@ -10,6 +10,8 @@ import com.r3corda.core.messaging.* import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.trace +import com.r3corda.node.services.api.MessagingServiceBuilder +import com.r3corda.node.services.api.MessagingServiceInternal import org.slf4j.LoggerFactory import rx.Observable import rx.subjects.PublishSubject @@ -194,7 +196,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria * An instance can be obtained by creating a builder and then using the start method. */ @ThreadSafe - inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : SingletonSerializeAsToken(), MessagingService { + inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : SingletonSerializeAsToken(), MessagingServiceInternal { + inner class Handler(val executor: Executor?, val topic: String, val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration @@ -221,6 +224,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria } } + override fun registerTrustedAddress(address: SingleMessageRecipient) {} + override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { check(running) val (handler, items) = state.locked { diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt index fa5cbb2e85..0f0c21ee54 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/InMemoryNetworkMapCache.kt @@ -18,6 +18,7 @@ import com.r3corda.core.random63BitValue import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize +import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.api.RegulatorService import com.r3corda.node.services.clientapi.NodeInterestRates import com.r3corda.node.services.transactions.NotaryService @@ -31,7 +32,7 @@ import javax.annotation.concurrent.ThreadSafe * Extremely simple in-memory cache of the network map. */ @ThreadSafe -open class InMemoryNetworkMapCache() : SingletonSerializeAsToken(), NetworkMapCache { +open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) : SingletonSerializeAsToken(), NetworkMapCache { override val networkMapNodes: List get() = get(NetworkMapService.Type) override val regulators: List @@ -93,6 +94,7 @@ open class InMemoryNetworkMapCache() : SingletonSerializeAsToken(), NetworkMapCa override fun addNode(node: NodeInfo) { registeredNodes[node.identity] = node + netInternal?.registerTrustedAddress(node.address) } override fun removeNode(node: NodeInfo) { diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt b/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt index a59c936e5a..bf93437555 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/MockNetworkMapCache.kt @@ -9,7 +9,7 @@ import com.r3corda.core.node.NodeInfo /** * Network map cache with no backing map service. */ -class MockNetworkMapCache() : InMemoryNetworkMapCache() { +class MockNetworkMapCache() : InMemoryNetworkMapCache(null) { data class MockAddress(val id: String): SingleMessageRecipient init { diff --git a/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt b/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt index 9b6203ec49..05b6f2d5fb 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt @@ -3,13 +3,13 @@ package com.r3corda.node.services import com.codahale.metrics.MetricRegistry import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.contracts.SignedTransaction -import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.services.* import com.r3corda.core.node.services.testing.MockStorageService import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogicRefFactory import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE import com.r3corda.node.serialization.NodeClock +import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.api.MonitoringService import com.r3corda.node.services.api.ServiceHubInternal import com.r3corda.node.services.network.MockNetworkMapCache @@ -22,7 +22,7 @@ import java.time.Clock open class MockServices( customWallet: WalletService? = null, val keyManagement: KeyManagementService? = null, - val net: MessagingService? = null, + val net: MessagingServiceInternal? = null, val identity: IdentityService? = MOCK_IDENTITY_SERVICE, val storage: TxWritableStorageService? = MockStorageService(), val mapCache: NetworkMapCache? = MockNetworkMapCache(), @@ -36,7 +36,7 @@ open class MockServices( get() = keyManagement ?: throw UnsupportedOperationException() override val identityService: IdentityService get() = identity ?: throw UnsupportedOperationException() - override val networkService: MessagingService + override val networkService: MessagingServiceInternal get() = net ?: throw UnsupportedOperationException() override val networkMapCache: NetworkMapCache get() = mapCache ?: throw UnsupportedOperationException() diff --git a/node/src/test/kotlin/com/r3corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/com/r3corda/node/services/statemachine/StateMachineManagerTests.kt index 5a7dd02003..4fd25015bd 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/statemachine/StateMachineManagerTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/statemachine/StateMachineManagerTests.kt @@ -7,6 +7,7 @@ import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.node.services.MockServices import com.r3corda.node.services.api.Checkpoint import com.r3corda.node.services.api.CheckpointStorage +import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.network.InMemoryMessagingNetwork import com.r3corda.node.utilities.AffinityExecutor import org.assertj.core.api.Assertions.assertThat @@ -45,7 +46,7 @@ class StateMachineManagerTests { } private fun createManager() = StateMachineManager(object : MockServices() { - override val networkService: MessagingService get() = network + override val networkService: MessagingServiceInternal get() = network }, emptyList(), checkpointStorage, AffinityExecutor.SAME_THREAD)