mirror of
https://github.com/corda/corda.git
synced 2025-01-19 03:06:36 +00:00
node: Started refactoring ArtemisMessagingService into ArtemisServerService and ArtemisClientService
This commit is contained in:
parent
845f2bdd64
commit
027853c3a8
@ -0,0 +1,28 @@
|
||||
package com.r3corda.node.services.messaging
|
||||
|
||||
import com.r3corda.core.messaging.*
|
||||
import java.util.concurrent.Executor
|
||||
|
||||
/**
|
||||
* Created by exfalso on 7/28/16.
|
||||
*/
|
||||
class ArtemisClientService : MessagingService {
|
||||
override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
|
||||
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override fun removeMessageHandler(registration: MessageHandlerRegistration) {
|
||||
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override fun send(message: Message, target: MessageRecipients) {
|
||||
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override fun createMessage(topic: String, data: ByteArray): Message {
|
||||
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override val myAddress: SingleMessageRecipient
|
||||
get() = throw UnsupportedOperationException()
|
||||
}
|
@ -0,0 +1,41 @@
|
||||
package com.r3corda.node.services.messaging
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.messaging.Message
|
||||
import com.r3corda.core.messaging.MessageHandlerRegistration
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
|
||||
import org.apache.activemq.artemis.core.postoffice.Address
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executor
|
||||
|
||||
|
||||
internal data class ArtemisAddress(val hostAndPort: HostAndPort) : SingleMessageRecipient
|
||||
|
||||
/** A registration to handle messages of different types */
|
||||
internal data class ArtemisHandler(
|
||||
val executor: Executor?,
|
||||
val topic: String,
|
||||
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||
|
||||
class ArtemisMessaging {
|
||||
|
||||
private lateinit var activeMQServer: ActiveMQServer
|
||||
private lateinit var clientFactory: ClientSessionFactory
|
||||
private var session: ClientSession? = null
|
||||
private var inboundConsumer: ClientConsumer? = null
|
||||
|
||||
private class InnerState {
|
||||
var running = false
|
||||
val sendClients = HashMap<Address, ClientProducer>()
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
|
||||
|
||||
}
|
@ -102,9 +102,9 @@ class ArtemisMessagingService(val directory: Path,
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
|
||||
/** A registration to handle messages of different types */
|
||||
inner class Handler(val executor: Executor?,
|
||||
val topicSession: TopicSession,
|
||||
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||
data class Handler(val executor: Executor?,
|
||||
val topicSession: TopicSession,
|
||||
val callback: (Message, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||
|
||||
private val handlers = CopyOnWriteArrayList<Handler>()
|
||||
|
||||
|
@ -0,0 +1,70 @@
|
||||
package com.r3corda.node.services.messaging
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.r3corda.core.crypto.registerWhitelistTrustManager
|
||||
import com.r3corda.core.messaging.Message
|
||||
import com.r3corda.core.messaging.MessageHandlerRegistration
|
||||
import com.r3corda.core.messaging.MessageRecipients
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.internal.Node
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
import java.util.concurrent.Executor
|
||||
|
||||
/**
|
||||
* Created by exfalso on 7/28/16.
|
||||
*/
|
||||
class ArtemisServerService : SingletonSerializeAsToken(), MessagingServiceInternal {
|
||||
|
||||
companion object {
|
||||
init {
|
||||
// Until https://issues.apache.org/jira/browse/ARTEMIS-656 is resolved gate acceptable
|
||||
// certificate hosts manually.
|
||||
registerWhitelistTrustManager()
|
||||
}
|
||||
|
||||
|
||||
val log = loggerFor<ArtemisMessagingService>()
|
||||
|
||||
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
|
||||
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
|
||||
// that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid
|
||||
// confusion.
|
||||
val TOPIC_PROPERTY = "platform-topic"
|
||||
|
||||
/** Temp helper until network map is established. */
|
||||
fun makeRecipient(hostAndPort: HostAndPort): SingleMessageRecipient = ArtemisAddress(hostAndPort)
|
||||
fun makeRecipient(hostname: String) = makeRecipient(toHostAndPort(hostname))
|
||||
fun toHostAndPort(hostname: String) = HostAndPort.fromString(hostname).withDefaultPort(Node.DEFAULT_PORT)
|
||||
}
|
||||
|
||||
|
||||
override fun stop() {
|
||||
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override fun registerTrustedAddress(address: SingleMessageRecipient) {
|
||||
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override fun addMessageHandler(topic: String, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration {
|
||||
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override fun removeMessageHandler(registration: MessageHandlerRegistration) {
|
||||
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override fun send(message: Message, target: MessageRecipients) {
|
||||
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override fun createMessage(topic: String, data: ByteArray): Message {
|
||||
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override val myAddress: SingleMessageRecipient
|
||||
get() = throw UnsupportedOperationException()
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user