Create MessageServiceInternal interface to allow NetworkMapCache to register addresses with the network service.

Activate WhitelistTrustManager for now to secure the TLS messaging.
This commit is contained in:
Matthew Nesbit 2016-07-28 11:24:52 +01:00
parent 7e6c1332e7
commit 7d5ee8ba08
11 changed files with 68 additions and 33 deletions

View File

@ -53,8 +53,6 @@ interface MessagingService {
*/ */
fun send(message: Message, target: MessageRecipients) fun send(message: Message, target: MessageRecipients)
fun stop()
/** /**
* Returns an initialised [Message] with the current time, etc, already filled in. * Returns an initialised [Message] with the current time, etc, already filled in.
*/ */
@ -83,19 +81,6 @@ fun MessagingService.send(topic: String, payload: Any, to: MessageRecipients) {
send(createMessage(topic, payload.serialize().bits), to) send(createMessage(topic, payload.serialize().bits), to)
} }
/**
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
* on the messaging service interface until you have successfully started up the system. One of these objects should
* be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations
* may let you cast the returned future to an object that lets you get status info.
*
* A specific implementation of the controller class will have extra features that let you customise it before starting
* it up.
*/
interface MessagingServiceBuilder<out T : MessagingService> {
fun start(): ListenableFuture<out T>
}
interface MessageHandlerRegistration interface MessageHandlerRegistration
/** /**

View File

@ -21,10 +21,7 @@ import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize import com.r3corda.core.serialization.serialize
import com.r3corda.node.api.APIServer import com.r3corda.node.api.APIServer
import com.r3corda.node.services.NotaryChangeService import com.r3corda.node.services.NotaryChangeService
import com.r3corda.node.services.api.AcceptsFileUpload import com.r3corda.node.services.api.*
import com.r3corda.node.services.api.CheckpointStorage
import com.r3corda.node.services.api.MonitoringService
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.clientapi.NodeInterestRates import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.events.NodeSchedulerService import com.r3corda.node.services.events.NodeSchedulerService
@ -87,8 +84,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
val servicesThatAcceptUploads: List<AcceptsFileUpload> = _servicesThatAcceptUploads val servicesThatAcceptUploads: List<AcceptsFileUpload> = _servicesThatAcceptUploads
val services = object : ServiceHubInternal() { val services = object : ServiceHubInternal() {
override val networkService: MessagingService get() = net override val networkService: MessagingServiceInternal get() = net
override val networkMapCache: NetworkMapCache = InMemoryNetworkMapCache() override val networkMapCache: NetworkMapCache get() = netMapCache
override val storageService: TxWritableStorageService get() = storage override val storageService: TxWritableStorageService get() = storage
override val walletService: WalletService get() = wallet override val walletService: WalletService get() = wallet
override val keyManagementService: KeyManagementService get() = keyManagement override val keyManagementService: KeyManagementService get() = keyManagement
@ -122,7 +119,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
var inNodeNetworkMapService: NetworkMapService? = null var inNodeNetworkMapService: NetworkMapService? = null
var inNodeNotaryService: NotaryService? = null var inNodeNotaryService: NotaryService? = null
lateinit var identity: IdentityService lateinit var identity: IdentityService
lateinit var net: MessagingService lateinit var net: MessagingServiceInternal
lateinit var netMapCache: NetworkMapCache
lateinit var api: APIServer lateinit var api: APIServer
lateinit var scheduler: SchedulerService lateinit var scheduler: SchedulerService
lateinit var protocolLogicFactory: ProtocolLogicRefFactory lateinit var protocolLogicFactory: ProtocolLogicRefFactory
@ -151,6 +149,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
storage = storageServices.first storage = storageServices.first
checkpointStorage = storageServices.second checkpointStorage = storageServices.second
net = makeMessagingService() net = makeMessagingService()
netMapCache = InMemoryNetworkMapCache(net)
wallet = NodeWalletService(services) wallet = NodeWalletService(services)
makeInterestRatesOracleService() makeInterestRatesOracleService()
identity = makeIdentityService() identity = makeIdentityService()
@ -312,7 +311,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
net.stop() net.stop()
} }
protected abstract fun makeMessagingService(): MessagingService protected abstract fun makeMessagingService(): MessagingServiceInternal
protected abstract fun startMessagingService() protected abstract fun startMessagingService()

View File

@ -8,6 +8,7 @@ import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.serialization.NodeClock import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingService import com.r3corda.node.services.messaging.ArtemisMessagingService
import com.r3corda.node.servlets.AttachmentDownloadServlet import com.r3corda.node.servlets.AttachmentDownloadServlet
@ -67,7 +68,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
// when our process shuts down, but we try in stop() anyway just to be nice. // when our process shuts down, but we try in stop() anyway just to be nice.
private var nodeFileLock: FileLock? = null private var nodeFileLock: FileLock? = null
override fun makeMessagingService(): MessagingService = ArtemisMessagingService(dir, p2pAddr, configuration, serverThread) override fun makeMessagingService(): MessagingServiceInternal = ArtemisMessagingService(dir, p2pAddr, configuration, serverThread)
override fun startMessagingService() { override fun startMessagingService() {
// Start up the MQ service. // Start up the MQ service.

View File

@ -12,6 +12,7 @@ import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.testing.MockIdentityService import com.r3corda.core.node.services.testing.MockIdentityService
import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.internal.AbstractNode import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.network.InMemoryMessagingNetwork import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService
@ -77,7 +78,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
// We only need to override the messaging service here, as currently everything that hits disk does so // We only need to override the messaging service here, as currently everything that hits disk does so
// through the java.nio API which we are already mocking via Jimfs. // through the java.nio API which we are already mocking via Jimfs.
override fun makeMessagingService(): MessagingService { override fun makeMessagingService(): MessagingServiceInternal {
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id } require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, configuration.myLegalName).start().get() return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, configuration.myLegalName).start().get()
} }

View File

@ -2,15 +2,40 @@ package com.r3corda.node.services.api
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.TxWritableStorageService import com.r3corda.core.node.services.TxWritableStorageService
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory import com.r3corda.core.protocols.ProtocolLogicRefFactory
interface MessagingServiceInternal: MessagingService {
fun stop()
// Allow messaging service to be signalled by the NetworkMapCache about Nodes
// Thus providing an opportunity to permission the other Node and possibly to setup a link
fun registerTrustedAddress(address: SingleMessageRecipient)
}
/**
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
* on the messaging service interface until you have successfully started up the system. One of these objects should
* be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations
* may let you cast the returned future to an object that lets you get status info.
*
* A specific implementation of the controller class will have extra features that let you customise it before starting
* it up.
*/
interface MessagingServiceBuilder<out T : MessagingServiceInternal> {
fun start(): ListenableFuture<out T>
}
abstract class ServiceHubInternal : ServiceHub { abstract class ServiceHubInternal : ServiceHub {
abstract val monitoringService: MonitoringService abstract val monitoringService: MonitoringService
abstract val protocolLogicRefFactory: ProtocolLogicRefFactory abstract val protocolLogicRefFactory: ProtocolLogicRefFactory
abstract override val networkService: MessagingServiceInternal
/** /**
* Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then * Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then
* sends them to the wallet for further processing. This is intended for implementations to call from * sends them to the wallet for further processing. This is intended for implementations to call from

View File

@ -3,12 +3,15 @@ package com.r3corda.node.services.messaging
import com.google.common.net.HostAndPort import com.google.common.net.HostAndPort
import com.r3corda.core.RunOnCallerThread import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.ThreadBox import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.WhitelistTrustManagerProvider
import com.r3corda.core.crypto.X509Utilities import com.r3corda.core.crypto.X509Utilities
import com.r3corda.core.crypto.newSecureRandom import com.r3corda.core.crypto.newSecureRandom
import com.r3corda.core.crypto.registerWhitelistTrustManager
import com.r3corda.core.messaging.* import com.r3corda.core.messaging.*
import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.internal.Node import com.r3corda.node.internal.Node
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.config.NodeConfiguration
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.TransportConfiguration import org.apache.activemq.artemis.api.core.TransportConfiguration
@ -57,12 +60,19 @@ import javax.annotation.concurrent.ThreadSafe
class ArtemisMessagingService(val directory: Path, class ArtemisMessagingService(val directory: Path,
val myHostPort: HostAndPort, val myHostPort: HostAndPort,
val config: NodeConfiguration, val config: NodeConfiguration,
val defaultExecutor: Executor = RunOnCallerThread) : SingletonSerializeAsToken(), MessagingService { val defaultExecutor: Executor = RunOnCallerThread) : SingletonSerializeAsToken(), MessagingServiceInternal {
// In future: can contain onion routing info, etc. // In future: can contain onion routing info, etc.
private data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient private data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient
companion object { companion object {
init {
// Until https://issues.apache.org/jira/browse/ARTEMIS-656 is resolved gate acceptable
// certificate hosts manually.
registerWhitelistTrustManager()
}
val log = loggerFor<ArtemisMessagingService>() val log = loggerFor<ArtemisMessagingService>()
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic". // This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
@ -240,6 +250,12 @@ class ArtemisMessagingService(val directory: Path,
} }
} }
override fun registerTrustedAddress(address: SingleMessageRecipient) {
require(address is Address) { "Address is not an Artemis Message Address" }
val hostName = (address as Address).hostAndPort.hostText
WhitelistTrustManagerProvider.addWhitelistEntry(hostName)
}
override fun send(message: Message, target: MessageRecipients) { override fun send(message: Message, target: MessageRecipients) {
if (target !is Address) if (target !is Address)
TODO("Only simple sends to single recipients are currently implemented") TODO("Only simple sends to single recipients are currently implemented")

View File

@ -10,6 +10,8 @@ import com.r3corda.core.messaging.*
import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.MessagingServiceBuilder
import com.r3corda.node.services.api.MessagingServiceInternal
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
@ -194,7 +196,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
* An instance can be obtained by creating a builder and then using the start method. * An instance can be obtained by creating a builder and then using the start method.
*/ */
@ThreadSafe @ThreadSafe
inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : SingletonSerializeAsToken(), MessagingService { inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : SingletonSerializeAsToken(), MessagingServiceInternal {
inner class Handler(val executor: Executor?, val topic: String, inner class Handler(val executor: Executor?, val topic: String,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
@ -221,6 +224,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
} }
} }
override fun registerTrustedAddress(address: SingleMessageRecipient) {}
override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
check(running) check(running)
val (handler, items) = state.locked { val (handler, items) = state.locked {

View File

@ -18,6 +18,7 @@ import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize import com.r3corda.core.serialization.serialize
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.api.RegulatorService import com.r3corda.node.services.api.RegulatorService
import com.r3corda.node.services.clientapi.NodeInterestRates import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.transactions.NotaryService import com.r3corda.node.services.transactions.NotaryService
@ -31,7 +32,7 @@ import javax.annotation.concurrent.ThreadSafe
* Extremely simple in-memory cache of the network map. * Extremely simple in-memory cache of the network map.
*/ */
@ThreadSafe @ThreadSafe
open class InMemoryNetworkMapCache() : SingletonSerializeAsToken(), NetworkMapCache { open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) : SingletonSerializeAsToken(), NetworkMapCache {
override val networkMapNodes: List<NodeInfo> override val networkMapNodes: List<NodeInfo>
get() = get(NetworkMapService.Type) get() = get(NetworkMapService.Type)
override val regulators: List<NodeInfo> override val regulators: List<NodeInfo>
@ -93,6 +94,7 @@ open class InMemoryNetworkMapCache() : SingletonSerializeAsToken(), NetworkMapCa
override fun addNode(node: NodeInfo) { override fun addNode(node: NodeInfo) {
registeredNodes[node.identity] = node registeredNodes[node.identity] = node
netInternal?.registerTrustedAddress(node.address)
} }
override fun removeNode(node: NodeInfo) { override fun removeNode(node: NodeInfo) {

View File

@ -9,7 +9,7 @@ import com.r3corda.core.node.NodeInfo
/** /**
* Network map cache with no backing map service. * Network map cache with no backing map service.
*/ */
class MockNetworkMapCache() : InMemoryNetworkMapCache() { class MockNetworkMapCache() : InMemoryNetworkMapCache(null) {
data class MockAddress(val id: String): SingleMessageRecipient data class MockAddress(val id: String): SingleMessageRecipient
init { init {

View File

@ -3,13 +3,13 @@ package com.r3corda.node.services
import com.codahale.metrics.MetricRegistry import com.codahale.metrics.MetricRegistry
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.* import com.r3corda.core.node.services.*
import com.r3corda.core.node.services.testing.MockStorageService import com.r3corda.core.node.services.testing.MockStorageService
import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE
import com.r3corda.node.serialization.NodeClock import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.api.MonitoringService import com.r3corda.node.services.api.MonitoringService
import com.r3corda.node.services.api.ServiceHubInternal import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.network.MockNetworkMapCache import com.r3corda.node.services.network.MockNetworkMapCache
@ -22,7 +22,7 @@ import java.time.Clock
open class MockServices( open class MockServices(
customWallet: WalletService? = null, customWallet: WalletService? = null,
val keyManagement: KeyManagementService? = null, val keyManagement: KeyManagementService? = null,
val net: MessagingService? = null, val net: MessagingServiceInternal? = null,
val identity: IdentityService? = MOCK_IDENTITY_SERVICE, val identity: IdentityService? = MOCK_IDENTITY_SERVICE,
val storage: TxWritableStorageService? = MockStorageService(), val storage: TxWritableStorageService? = MockStorageService(),
val mapCache: NetworkMapCache? = MockNetworkMapCache(), val mapCache: NetworkMapCache? = MockNetworkMapCache(),
@ -36,7 +36,7 @@ open class MockServices(
get() = keyManagement ?: throw UnsupportedOperationException() get() = keyManagement ?: throw UnsupportedOperationException()
override val identityService: IdentityService override val identityService: IdentityService
get() = identity ?: throw UnsupportedOperationException() get() = identity ?: throw UnsupportedOperationException()
override val networkService: MessagingService override val networkService: MessagingServiceInternal
get() = net ?: throw UnsupportedOperationException() get() = net ?: throw UnsupportedOperationException()
override val networkMapCache: NetworkMapCache override val networkMapCache: NetworkMapCache
get() = mapCache ?: throw UnsupportedOperationException() get() = mapCache ?: throw UnsupportedOperationException()

View File

@ -7,6 +7,7 @@ import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.node.services.MockServices import com.r3corda.node.services.MockServices
import com.r3corda.node.services.api.Checkpoint import com.r3corda.node.services.api.Checkpoint
import com.r3corda.node.services.api.CheckpointStorage import com.r3corda.node.services.api.CheckpointStorage
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.network.InMemoryMessagingNetwork import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.utilities.AffinityExecutor import com.r3corda.node.utilities.AffinityExecutor
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
@ -45,7 +46,7 @@ class StateMachineManagerTests {
} }
private fun createManager() = StateMachineManager(object : MockServices() { private fun createManager() = StateMachineManager(object : MockServices() {
override val networkService: MessagingService get() = network override val networkService: MessagingServiceInternal get() = network
}, emptyList(), checkpointStorage, AffinityExecutor.SAME_THREAD) }, emptyList(), checkpointStorage, AffinityExecutor.SAME_THREAD)