From 027853c3a88a591465964cc5972e2732d600cf2b Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 28 Jul 2016 17:51:30 +0100 Subject: [PATCH] node: Started refactoring ArtemisMessagingService into ArtemisServerService and ArtemisClientService --- .../messaging/ArtemisClientService.kt | 28 ++++++++ .../services/messaging/ArtemisMessaging.kt | 41 +++++++++++ .../messaging/ArtemisMessagingService.kt | 6 +- .../messaging/ArtemisServerService.kt | 70 +++++++++++++++++++ 4 files changed, 142 insertions(+), 3 deletions(-) create mode 100644 node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisClientService.kt create mode 100644 node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessaging.kt create mode 100644 node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisServerService.kt diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisClientService.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisClientService.kt new file mode 100644 index 0000000000..ab98461c34 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisClientService.kt @@ -0,0 +1,28 @@ +package com.r3corda.node.services.messaging + +import com.r3corda.core.messaging.* +import java.util.concurrent.Executor + +/** + * Created by exfalso on 7/28/16. + */ +class ArtemisClientService : MessagingService { + override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun removeMessageHandler(registration: MessageHandlerRegistration) { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun send(message: Message, target: MessageRecipients) { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun createMessage(topic: String, data: ByteArray): Message { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override val myAddress: SingleMessageRecipient + get() = throw UnsupportedOperationException() +} diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessaging.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessaging.kt new file mode 100644 index 0000000000..1b40e3cc37 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessaging.kt @@ -0,0 +1,41 @@ +package com.r3corda.node.services.messaging + +import com.google.common.net.HostAndPort +import com.r3corda.core.ThreadBox +import com.r3corda.core.messaging.Message +import com.r3corda.core.messaging.MessageHandlerRegistration +import com.r3corda.core.messaging.SingleMessageRecipient +import org.apache.activemq.artemis.api.core.client.ClientConsumer +import org.apache.activemq.artemis.api.core.client.ClientProducer +import org.apache.activemq.artemis.api.core.client.ClientSession +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory +import org.apache.activemq.artemis.core.postoffice.Address +import org.apache.activemq.artemis.core.server.ActiveMQServer +import java.util.* +import java.util.concurrent.Executor + + +internal data class ArtemisAddress(val hostAndPort: HostAndPort) : SingleMessageRecipient + +/** A registration to handle messages of different types */ +internal data class ArtemisHandler( + val executor: Executor?, + val topic: String, + val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration + +class ArtemisMessaging { + + private lateinit var activeMQServer: ActiveMQServer + private lateinit var clientFactory: ClientSessionFactory + private var session: ClientSession? = null + private var inboundConsumer: ClientConsumer? = null + + private class InnerState { + var running = false + val sendClients = HashMap() + } + + private val mutex = ThreadBox(InnerState()) + + +} diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt index c5da7a90ca..5bae2827eb 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingService.kt @@ -102,9 +102,9 @@ class ArtemisMessagingService(val directory: Path, private val mutex = ThreadBox(InnerState()) /** A registration to handle messages of different types */ - inner class Handler(val executor: Executor?, - val topicSession: TopicSession, - val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration + data class Handler(val executor: Executor?, + val topicSession: TopicSession, + val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration private val handlers = CopyOnWriteArrayList() diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisServerService.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisServerService.kt new file mode 100644 index 0000000000..19197fd7ab --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisServerService.kt @@ -0,0 +1,70 @@ +package com.r3corda.node.services.messaging + +import com.google.common.net.HostAndPort +import com.r3corda.core.crypto.registerWhitelistTrustManager +import com.r3corda.core.messaging.Message +import com.r3corda.core.messaging.MessageHandlerRegistration +import com.r3corda.core.messaging.MessageRecipients +import com.r3corda.core.messaging.SingleMessageRecipient +import com.r3corda.core.serialization.SingletonSerializeAsToken +import com.r3corda.core.utilities.loggerFor +import com.r3corda.node.internal.Node +import com.r3corda.node.services.api.MessagingServiceInternal +import java.util.concurrent.Executor + +/** + * Created by exfalso on 7/28/16. + */ +class ArtemisServerService : SingletonSerializeAsToken(), MessagingServiceInternal { + + companion object { + init { + // Until https://issues.apache.org/jira/browse/ARTEMIS-656 is resolved gate acceptable + // certificate hosts manually. + registerWhitelistTrustManager() + } + + + val log = loggerFor() + + // This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic". + // We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint + // that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid + // confusion. + val TOPIC_PROPERTY = "platform-topic" + + /** Temp helper until network map is established. */ + fun makeRecipient(hostAndPort: HostAndPort): SingleMessageRecipient = ArtemisAddress(hostAndPort) + fun makeRecipient(hostname: String) = makeRecipient(toHostAndPort(hostname)) + fun toHostAndPort(hostname: String) = HostAndPort.fromString(hostname).withDefaultPort(Node.DEFAULT_PORT) + } + + + override fun stop() { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun registerTrustedAddress(address: SingleMessageRecipient) { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun removeMessageHandler(registration: MessageHandlerRegistration) { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun send(message: Message, target: MessageRecipients) { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun createMessage(topic: String, data: ByteArray): Message { + throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override val myAddress: SingleMessageRecipient + get() = throw UnsupportedOperationException() + +}