class NodeMessagingClient : ArtemisMessagingComponent, MessagingServiceInternal
This class implements the MessagingService API using Apache Artemis, the successor to their ActiveMQ product. Artemis is a message queue broker and here we run a client connecting to the specified broker instance ArtemisMessagingServer. Its primarily concerned with peer-to-peer messaging.
Message handlers are run on the provided AffinityExecutor synchronously, that is, the Artemis callback threads are blocked until the handler is scheduled and completed. This allows backpressure to propagate from the given executor through into Artemis and from there, back through to senders.
An implementation of CordaRPCOps can be provided. If given, clients using the CordaMQClient RPC library can invoke methods on the provided implementation. There is more documentation on this in the docsite and the CordaRPCClient class.
serverHostPort
- The address of the broker instance to connect to (might be running in the same process)myIdentity
- Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate
that this is a NetworkMapService node which will be bound globally to the name "networkmap"executor
- An executor to run received message tasks upon.persistentInbox
- If true the inbox will be created persistent if not already created.
If false the inbox queue will be transient, which is appropriate for UI clients for example.Handler |
data class Handler : MessageHandlerRegistration A registration to handle messages of different types |
<init> |
NodeMessagingClient(config: NodeConfiguration, serverHostPort: <ERROR CLASS>, myIdentity: PublicKey?, executor: AffinityExecutor, persistentInbox: Boolean = true, rpcOps: CordaRPCOps? = null) This class implements the MessagingService API using Apache Artemis, the successor to their ActiveMQ product. Artemis is a message queue broker and here we run a client connecting to the specified broker instance ArtemisMessagingServer. Its primarily concerned with peer-to-peer messaging. |
executor |
val executor: AffinityExecutor |
myAddress |
val myAddress: SingleMessageRecipient Apart from the NetworkMapService this is the only other address accessible to the node outside of lookups against the NetworkMapCache. |
myIdentity |
val myIdentity: PublicKey? |
persistentInbox |
val persistentInbox: Boolean |
serverHostPort |
val serverHostPort: <ERROR CLASS> |
config |
val config: NodeSSLConfiguration |
addMessageHandler |
fun addMessageHandler(topic: String, sessionID: Long, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration The provided function will be invoked for each received message whose topic matches the given string, on the given executor. fun addMessageHandler(topicSession: TopicSession, executor: Executor?, callback: (Message, MessageHandlerRegistration) -> Unit): MessageHandlerRegistration The provided function will be invoked for each received message whose topic and session matches, on the given executor. |
createMessage |
fun createMessage(topicSession: TopicSession, data: ByteArray): Message fun createMessage(topic: String, sessionID: Long, data: ByteArray): Message Returns an initialised Message with the current time, etc, already filled in. |
removeMessageHandler |
fun removeMessageHandler(registration: MessageHandlerRegistration): Unit Removes a handler given the object returned from addMessageHandler. The callback will no longer be invoked once this method has returned, although executions that are currently in flight will not be interrupted. |
run |
fun run(): Unit Starts the p2p event loop: this method only returns once stop has been called. |
send |
fun send(message: Message, target: MessageRecipients): Unit Sends a message to the given receiver. The details of how receivers are identified is up to the messaging implementation: the type system provides an opaque high level view, with more fine grained control being available via type casting. Once this function returns the message is queued for delivery but not necessarily delivered: if the recipients are offline then the message could be queued hours or days later. |
start |
fun start(): Unit |
stop |
fun stop(): Unit Initiates shutdown: if called from a thread that isnt controlled by the executor passed to the constructor then this will block until all in-flight messages have finished being handled and acknowledged. If called from a thread thats a part of the AffinityExecutor given to the constructor, it returns immediately and shutdown is asynchronous. |
checkStorePasswords |
fun checkStorePasswords(): Unit Returns nothing if the keystore was opened OK or throws if not. Useful to check the password, as unfortunately Artemis tends to bury the exception when the password is wrong. |
configureWithDevSSLCertificate |
fun configureWithDevSSLCertificate(): Unit Strictly for dev only automatically construct a server certificate/private key signed from the CA certs in Node resources. Then provision KeyStores into certificates folder under node path. |
parseKeyFromQueueName |
fun parseKeyFromQueueName(name: String): PublicKey |
tcpTransport |
fun tcpTransport(direction: ConnectionDirection, host: String, port: Int): <ERROR CLASS> |
SESSION_ID_PROPERTY |
val SESSION_ID_PROPERTY: String |
TOPIC_PROPERTY |
val TOPIC_PROPERTY: String |
log |
val log: <ERROR CLASS> |
makeNetworkMapAddress |
fun makeNetworkMapAddress(hostAndPort: <ERROR CLASS>): SingleMessageRecipient This should be the only way to generate an ArtemisAddress and that only of the remote NetworkMapService node. All other addresses come from the NetworkMapCache, or myAddress below. The node will populate with their own identity based address when they register with the NetworkMapService. |
runOnNextMessage |
fun MessagingService.runOnNextMessage(topic: String, sessionID: Long, executor: Executor? = null, callback: (Message) -> Unit): Unit Registers a handler for the given topic and session ID that runs the given callback with the message and then removes itself. This is useful for one-shot handlers that arent supposed to stick around permanently. Note that this callback doesnt take the registration object, unlike the callback to MessagingService.addMessageHandler, as the handler is automatically deregistered before the callback runs. fun MessagingService.runOnNextMessage(topicSession: TopicSession, executor: Executor? = null, callback: (Message) -> Unit): Unit Registers a handler for the given topic and session that runs the given callback with the message and then removes itself. This is useful for one-shot handlers that arent supposed to stick around permanently. Note that this callback doesnt take the registration object, unlike the callback to MessagingService.addMessageHandler. |
send |
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients): Unit fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients): Unit |