Merged in mnesbit-cor-261-artemis-over-ssl (pull request #247)

Create MessageServiceInternal interface to allow NetworkMapCache to register addresses with the network service.
This commit is contained in:
Matthew Nesbit 2016-07-28 13:35:44 +01:00
commit 454f555728
11 changed files with 68 additions and 33 deletions

View File

@ -53,8 +53,6 @@ interface MessagingService {
*/
fun send(message: Message, target: MessageRecipients)
fun stop()
/**
* Returns an initialised [Message] with the current time, etc, already filled in.
*/
@ -83,19 +81,6 @@ fun MessagingService.send(topic: String, payload: Any, to: MessageRecipients) {
send(createMessage(topic, payload.serialize().bits), to)
}
/**
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
* on the messaging service interface until you have successfully started up the system. One of these objects should
* be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations
* may let you cast the returned future to an object that lets you get status info.
*
* A specific implementation of the controller class will have extra features that let you customise it before starting
* it up.
*/
interface MessagingServiceBuilder<out T : MessagingService> {
fun start(): ListenableFuture<out T>
}
interface MessageHandlerRegistration
/**

View File

@ -21,10 +21,7 @@ import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.node.api.APIServer
import com.r3corda.node.services.NotaryChangeService
import com.r3corda.node.services.api.AcceptsFileUpload
import com.r3corda.node.services.api.CheckpointStorage
import com.r3corda.node.services.api.MonitoringService
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.api.*
import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.events.NodeSchedulerService
@ -87,8 +84,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
val servicesThatAcceptUploads: List<AcceptsFileUpload> = _servicesThatAcceptUploads
val services = object : ServiceHubInternal() {
override val networkService: MessagingService get() = net
override val networkMapCache: NetworkMapCache = InMemoryNetworkMapCache()
override val networkService: MessagingServiceInternal get() = net
override val networkMapCache: NetworkMapCache get() = netMapCache
override val storageService: TxWritableStorageService get() = storage
override val walletService: WalletService get() = wallet
override val keyManagementService: KeyManagementService get() = keyManagement
@ -122,7 +119,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
var inNodeNetworkMapService: NetworkMapService? = null
var inNodeNotaryService: NotaryService? = null
lateinit var identity: IdentityService
lateinit var net: MessagingService
lateinit var net: MessagingServiceInternal
lateinit var netMapCache: NetworkMapCache
lateinit var api: APIServer
lateinit var scheduler: SchedulerService
lateinit var protocolLogicFactory: ProtocolLogicRefFactory
@ -151,6 +149,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
storage = storageServices.first
checkpointStorage = storageServices.second
net = makeMessagingService()
netMapCache = InMemoryNetworkMapCache(net)
wallet = NodeWalletService(services)
makeInterestRatesOracleService()
identity = makeIdentityService()
@ -312,7 +311,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
net.stop()
}
protected abstract fun makeMessagingService(): MessagingService
protected abstract fun makeMessagingService(): MessagingServiceInternal
protected abstract fun startMessagingService()

View File

@ -8,6 +8,7 @@ import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingService
import com.r3corda.node.servlets.AttachmentDownloadServlet
@ -72,7 +73,7 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
// when our process shuts down, but we try in stop() anyway just to be nice.
private var nodeFileLock: FileLock? = null
override fun makeMessagingService(): MessagingService = ArtemisMessagingService(dir, p2pAddr, configuration, serverThread)
override fun makeMessagingService(): MessagingServiceInternal = ArtemisMessagingService(dir, p2pAddr, configuration, serverThread)
override fun startMessagingService() {
// Start up the MQ service.

View File

@ -12,6 +12,7 @@ import com.r3corda.core.node.services.ServiceType
import com.r3corda.core.node.services.testing.MockIdentityService
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.services.network.NetworkMapService
@ -77,7 +78,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
// We only need to override the messaging service here, as currently everything that hits disk does so
// through the java.nio API which we are already mocking via Jimfs.
override fun makeMessagingService(): MessagingService {
override fun makeMessagingService(): MessagingServiceInternal {
require(id >= 0) { "Node ID must be zero or positive, was passed: " + id }
return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, configuration.myLegalName).start().get()
}

View File

@ -2,15 +2,40 @@ package com.r3corda.node.services.api
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.TxWritableStorageService
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory
interface MessagingServiceInternal: MessagingService {
fun stop()
// Allow messaging service to be signalled by the NetworkMapCache about Nodes
// Thus providing an opportunity to permission the other Node and possibly to setup a link
fun registerTrustedAddress(address: SingleMessageRecipient)
}
/**
* This class lets you start up a [MessagingService]. Its purpose is to stop you from getting access to the methods
* on the messaging service interface until you have successfully started up the system. One of these objects should
* be the only way to obtain a reference to a [MessagingService]. Startup may be a slow process: some implementations
* may let you cast the returned future to an object that lets you get status info.
*
* A specific implementation of the controller class will have extra features that let you customise it before starting
* it up.
*/
interface MessagingServiceBuilder<out T : MessagingServiceInternal> {
fun start(): ListenableFuture<out T>
}
abstract class ServiceHubInternal : ServiceHub {
abstract val monitoringService: MonitoringService
abstract val protocolLogicRefFactory: ProtocolLogicRefFactory
abstract override val networkService: MessagingServiceInternal
/**
* Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then
* sends them to the wallet for further processing. This is intended for implementations to call from

View File

@ -3,12 +3,15 @@ package com.r3corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.WhitelistTrustManagerProvider
import com.r3corda.core.crypto.X509Utilities
import com.r3corda.core.crypto.newSecureRandom
import com.r3corda.core.crypto.registerWhitelistTrustManager
import com.r3corda.core.messaging.*
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.internal.Node
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.NodeConfiguration
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.TransportConfiguration
@ -57,12 +60,19 @@ import javax.annotation.concurrent.ThreadSafe
class ArtemisMessagingService(val directory: Path,
val myHostPort: HostAndPort,
val config: NodeConfiguration,
val defaultExecutor: Executor = RunOnCallerThread) : SingletonSerializeAsToken(), MessagingService {
val defaultExecutor: Executor = RunOnCallerThread) : SingletonSerializeAsToken(), MessagingServiceInternal {
// In future: can contain onion routing info, etc.
private data class Address(val hostAndPort: HostAndPort) : SingleMessageRecipient
companion object {
init {
// Until https://issues.apache.org/jira/browse/ARTEMIS-656 is resolved gate acceptable
// certificate hosts manually.
registerWhitelistTrustManager()
}
val log = loggerFor<ArtemisMessagingService>()
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
@ -240,6 +250,12 @@ class ArtemisMessagingService(val directory: Path,
}
}
override fun registerTrustedAddress(address: SingleMessageRecipient) {
require(address is Address) { "Address is not an Artemis Message Address" }
val hostName = (address as Address).hostAndPort.hostText
WhitelistTrustManagerProvider.addWhitelistEntry(hostName)
}
override fun send(message: Message, target: MessageRecipients) {
if (target !is Address)
TODO("Only simple sends to single recipients are currently implemented")

View File

@ -10,6 +10,8 @@ import com.r3corda.core.messaging.*
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.api.MessagingServiceBuilder
import com.r3corda.node.services.api.MessagingServiceInternal
import org.slf4j.LoggerFactory
import rx.Observable
import rx.subjects.PublishSubject
@ -194,7 +196,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
* An instance can be obtained by creating a builder and then using the start method.
*/
@ThreadSafe
inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : SingletonSerializeAsToken(), MessagingService {
inner class InMemoryMessaging(private val manuallyPumped: Boolean, private val handle: Handle) : SingletonSerializeAsToken(), MessagingServiceInternal {
inner class Handler(val executor: Executor?, val topic: String,
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
@ -221,6 +224,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
}
}
override fun registerTrustedAddress(address: SingleMessageRecipient) {}
override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
check(running)
val (handler, items) = state.locked {

View File

@ -18,6 +18,7 @@ import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.SingletonSerializeAsToken
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.api.RegulatorService
import com.r3corda.node.services.clientapi.NodeInterestRates
import com.r3corda.node.services.transactions.NotaryService
@ -31,7 +32,7 @@ import javax.annotation.concurrent.ThreadSafe
* Extremely simple in-memory cache of the network map.
*/
@ThreadSafe
open class InMemoryNetworkMapCache() : SingletonSerializeAsToken(), NetworkMapCache {
open class InMemoryNetworkMapCache(val netInternal: MessagingServiceInternal?) : SingletonSerializeAsToken(), NetworkMapCache {
override val networkMapNodes: List<NodeInfo>
get() = get(NetworkMapService.Type)
override val regulators: List<NodeInfo>
@ -93,6 +94,7 @@ open class InMemoryNetworkMapCache() : SingletonSerializeAsToken(), NetworkMapCa
override fun addNode(node: NodeInfo) {
registeredNodes[node.identity] = node
netInternal?.registerTrustedAddress(node.address)
}
override fun removeNode(node: NodeInfo) {

View File

@ -9,7 +9,7 @@ import com.r3corda.core.node.NodeInfo
/**
* Network map cache with no backing map service.
*/
class MockNetworkMapCache() : InMemoryNetworkMapCache() {
class MockNetworkMapCache() : InMemoryNetworkMapCache(null) {
data class MockAddress(val id: String): SingleMessageRecipient
init {

View File

@ -3,13 +3,13 @@ package com.r3corda.node.services
import com.codahale.metrics.MetricRegistry
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.node.services.*
import com.r3corda.core.node.services.testing.MockStorageService
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.ProtocolLogicRefFactory
import com.r3corda.core.testing.MOCK_IDENTITY_SERVICE
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.api.MonitoringService
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.network.MockNetworkMapCache
@ -22,7 +22,7 @@ import java.time.Clock
open class MockServices(
customWallet: WalletService? = null,
val keyManagement: KeyManagementService? = null,
val net: MessagingService? = null,
val net: MessagingServiceInternal? = null,
val identity: IdentityService? = MOCK_IDENTITY_SERVICE,
val storage: TxWritableStorageService? = MockStorageService(),
val mapCache: NetworkMapCache? = MockNetworkMapCache(),
@ -36,7 +36,7 @@ open class MockServices(
get() = keyManagement ?: throw UnsupportedOperationException()
override val identityService: IdentityService
get() = identity ?: throw UnsupportedOperationException()
override val networkService: MessagingService
override val networkService: MessagingServiceInternal
get() = net ?: throw UnsupportedOperationException()
override val networkMapCache: NetworkMapCache
get() = mapCache ?: throw UnsupportedOperationException()

View File

@ -7,6 +7,7 @@ import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.node.services.MockServices
import com.r3corda.node.services.api.Checkpoint
import com.r3corda.node.services.api.CheckpointStorage
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.network.InMemoryMessagingNetwork
import com.r3corda.node.utilities.AffinityExecutor
import org.assertj.core.api.Assertions.assertThat
@ -45,7 +46,7 @@ class StateMachineManagerTests {
}
private fun createManager() = StateMachineManager(object : MockServices() {
override val networkService: MessagingService get() = network
override val networkService: MessagingServiceInternal get() = network
}, emptyList(), checkpointStorage, AffinityExecutor.SAME_THREAD)