mirror of
https://github.com/corda/corda.git
synced 2024-12-28 08:48:57 +00:00
Merge pull request #428 from corda/mnesbit-merge-20180131
Merge up bridging code move from OS
This commit is contained in:
commit
2edf632f7f
@ -1,34 +1,37 @@
|
|||||||
Client RPC
|
Client RPC
|
||||||
==========
|
==========
|
||||||
|
|
||||||
There are multiple ways to interact with a node from a *client program*, but if your client is written in a JVM
|
.. contents::
|
||||||
compatible language the easiest way to do so is using the client library. The library connects to your running
|
|
||||||
node using a message queue protocol and then provides a simple RPC interface to interact with it. You make calls
|
|
||||||
on a Java object as normal, and the marshalling back and forth is handled for you.
|
|
||||||
|
|
||||||
The starting point for the client library is the `CordaRPCClient`_ class. This provides a ``start`` method that
|
Overview
|
||||||
returns a `CordaRPCConnection`_, holding an implementation of the `CordaRPCOps`_ that may be accessed with ``proxy``
|
--------
|
||||||
in Kotlin and ``getProxy()`` in Java. Observables that are returned by RPCs can be subscribed to in order to receive
|
Corda provides a client library that allows you to easily write clients in a JVM-compatible language to interact
|
||||||
an ongoing stream of updates from the node. More detail on how to use this is provided in the docs for the proxy method.
|
with a running node. The library connects to the node using a message queue protocol and then provides a simple RPC
|
||||||
|
interface to interact with the node. You make calls on a Java object as normal, and the marshalling back and forth is
|
||||||
|
handled for you.
|
||||||
|
|
||||||
|
The starting point for the client library is the `CordaRPCClient`_ class. `CordaRPCClient`_ provides a ``start`` method
|
||||||
|
that returns a `CordaRPCConnection`_. A `CordaRPCConnection`_ allows you to access an implementation of the
|
||||||
|
`CordaRPCOps`_ interface with ``proxy`` in Kotlin or ``getProxy()`` in Java. The observables that are returned by RPC
|
||||||
|
operations can be subscribed to in order to receive an ongoing stream of updates from the node. More detail on this
|
||||||
|
functionality is provided in the docs for the ``proxy`` method.
|
||||||
|
|
||||||
.. warning:: The returned `CordaRPCConnection`_ is somewhat expensive to create and consumes a small amount of
|
.. warning:: The returned `CordaRPCConnection`_ is somewhat expensive to create and consumes a small amount of
|
||||||
server side resources. When you're done with it, call ``close`` on it. Alternatively you may use the ``use``
|
server side resources. When you're done with it, call ``close`` on it. Alternatively you may use the ``use``
|
||||||
method on `CordaRPCClient`_ which cleans up automatically after the passed in lambda finishes. Don't create
|
method on `CordaRPCClient`_ which cleans up automatically after the passed in lambda finishes. Don't create
|
||||||
a new proxy for every call you make - reuse an existing one.
|
a new proxy for every call you make - reuse an existing one.
|
||||||
|
|
||||||
For a brief tutorial on how one can use the RPC API see :doc:`tutorial-clientrpc-api`.
|
For a brief tutorial on using the RPC API, see :doc:`tutorial-clientrpc-api`.
|
||||||
|
|
||||||
.. _rpc_security_mgmt_ref:
|
.. _rpc_security_mgmt_ref:
|
||||||
|
|
||||||
RPC permissions
|
RPC permissions
|
||||||
---------------
|
---------------
|
||||||
If a node's owner needs to interact with their node via RPC (e.g. to read the contents of the node's storage), they
|
For a node's owner to interact with their node via RPC, they must define one or more RPC users. Each user is
|
||||||
must define one or more RPC users. Each user is authenticated with a username and password, and is assigned a set of
|
authenticated with a username and password, and is assigned a set of permissions that control which RPC operations they
|
||||||
permissions that RPC can use for fine-grain access control.
|
can perform.
|
||||||
|
|
||||||
These users are added to the node's ``node.conf`` file.
|
RPC users are created by adding them to the ``rpcUsers`` list in the node's ``node.conf`` file:
|
||||||
|
|
||||||
The simplest way of adding an RPC user is to include it in the ``rpcUsers`` list:
|
|
||||||
|
|
||||||
.. container:: codeset
|
.. container:: codeset
|
||||||
|
|
||||||
@ -43,7 +46,12 @@ The simplest way of adding an RPC user is to include it in the ``rpcUsers`` list
|
|||||||
...
|
...
|
||||||
]
|
]
|
||||||
|
|
||||||
Users need permissions to invoke any RPC call. By default, nothing is allowed. These permissions are specified as follows:
|
By default, RPC users are not permissioned to perform any RPC operations.
|
||||||
|
|
||||||
|
Granting flow permissions
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
You provide an RPC user with the permission to start a specific flow using the syntax
|
||||||
|
``StartFlow.<fully qualified flow name>``:
|
||||||
|
|
||||||
.. container:: codeset
|
.. container:: codeset
|
||||||
|
|
||||||
@ -61,14 +69,64 @@ Users need permissions to invoke any RPC call. By default, nothing is allowed. T
|
|||||||
...
|
...
|
||||||
]
|
]
|
||||||
|
|
||||||
Permissions Syntax
|
You can also provide an RPC user with the permission to start any flow using the syntax
|
||||||
^^^^^^^^^^^^^^^^^^
|
``InvokeRpc.startFlow``:
|
||||||
|
|
||||||
Fine grained permissions allow a user to invoke a specific RPC operation, or to start a specific flow. The syntax is:
|
.. container:: codeset
|
||||||
|
|
||||||
- to start a specific flow: ``StartFlow.<fully qualified flow name>`` e.g., ``StartFlow.net.corda.flows.ExampleFlow1``.
|
.. sourcecode:: groovy
|
||||||
- to invoke a RPC operation: ``InvokeRpc.<rpc method name>`` e.g., ``InvokeRpc.nodeInfo``.
|
|
||||||
.. note:: Permission ``InvokeRpc.startFlow`` allows a user to initiate all flows.
|
rpcUsers=[
|
||||||
|
{
|
||||||
|
username=exampleUser
|
||||||
|
password=examplePass
|
||||||
|
permissions=[
|
||||||
|
"InvokeRpc.startFlow"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
...
|
||||||
|
]
|
||||||
|
|
||||||
|
Granting other RPC permissions
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
You provide an RPC user with the permission to perform a specific RPC operation using the syntax
|
||||||
|
``InvokeRpc.<rpc method name>``:
|
||||||
|
|
||||||
|
.. container:: codeset
|
||||||
|
|
||||||
|
.. sourcecode:: groovy
|
||||||
|
|
||||||
|
rpcUsers=[
|
||||||
|
{
|
||||||
|
username=exampleUser
|
||||||
|
password=examplePass
|
||||||
|
permissions=[
|
||||||
|
"InvokeRpc.nodeInfo",
|
||||||
|
"InvokeRpc.networkMapSnapshot"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
...
|
||||||
|
]
|
||||||
|
|
||||||
|
Granting all permissions
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
You can provide an RPC user with the permission to perform any RPC operation (including starting any flow) using the
|
||||||
|
``ALL`` permission:
|
||||||
|
|
||||||
|
.. container:: codeset
|
||||||
|
|
||||||
|
.. sourcecode:: groovy
|
||||||
|
|
||||||
|
rpcUsers=[
|
||||||
|
{
|
||||||
|
username=exampleUser
|
||||||
|
password=examplePass
|
||||||
|
permissions=[
|
||||||
|
"ALL"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
...
|
||||||
|
]
|
||||||
|
|
||||||
.. _authentication_ref:
|
.. _authentication_ref:
|
||||||
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.node.services.messaging
|
package net.corda.nodeapi.internal
|
||||||
|
|
||||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.node.services.messaging
|
package net.corda.nodeapi.internal.bridging
|
||||||
|
|
||||||
import io.netty.channel.EventLoopGroup
|
import io.netty.channel.EventLoopGroup
|
||||||
import io.netty.channel.nio.NioEventLoopGroup
|
import io.netty.channel.nio.NioEventLoopGroup
|
||||||
@ -7,14 +7,15 @@ import net.corda.core.internal.VisibleForTesting
|
|||||||
import net.corda.core.node.NodeInfo
|
import net.corda.core.node.NodeInfo
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
import net.corda.node.internal.protonwrapper.messages.MessageStatus
|
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||||
import net.corda.node.internal.protonwrapper.netty.AMQPClient
|
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
|
||||||
import net.corda.node.services.messaging.AMQPBridgeManager.AMQPBridge.Companion.getBridgeName
|
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
|
||||||
|
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager.AMQPBridge.Companion.getBridgeName
|
||||||
|
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||||
@ -34,7 +35,7 @@ import kotlin.concurrent.withLock
|
|||||||
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
|
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress: NetworkHostAndPort, val maxMessageSize: Int) : BridgeManager {
|
class AMQPBridgeManager(val config: NodeSSLConfiguration, val p2pAddress: NetworkHostAndPort, val maxMessageSize: Int) : BridgeManager {
|
||||||
|
|
||||||
private val lock = ReentrantLock()
|
private val lock = ReentrantLock()
|
||||||
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
|
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
|
@ -1,25 +1,25 @@
|
|||||||
package net.corda.node.services.messaging
|
package net.corda.nodeapi.internal.bridging
|
||||||
|
|
||||||
import net.corda.core.serialization.SerializationDefaults
|
import net.corda.core.serialization.SerializationDefaults
|
||||||
import net.corda.core.serialization.deserialize
|
import net.corda.core.serialization.deserialize
|
||||||
import net.corda.core.serialization.serialize
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||||
import net.corda.nodeapi.internal.BridgeControl
|
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType
|
import org.apache.activemq.artemis.api.core.RoutingType
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
internal class BridgeControlListener(val config: NodeConfiguration,
|
class BridgeControlListener(val config: NodeSSLConfiguration,
|
||||||
val p2pAddress: NetworkHostAndPort,
|
val p2pAddress: NetworkHostAndPort,
|
||||||
val maxMessageSize: Int) : AutoCloseable {
|
val maxMessageSize: Int) : AutoCloseable {
|
||||||
private val bridgeId: String = UUID.randomUUID().toString()
|
private val bridgeId: String = UUID.randomUUID().toString()
|
||||||
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, p2pAddress, maxMessageSize)
|
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, p2pAddress, maxMessageSize)
|
||||||
private val validInboundQueues = mutableSetOf<String>()
|
private val validInboundQueues = mutableSetOf<String>()
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.nodeapi.internal
|
package net.corda.nodeapi.internal.bridging
|
||||||
|
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.serialization.CordaSerializable
|
import net.corda.core.serialization.CordaSerializable
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.node.services.messaging
|
package net.corda.nodeapi.internal.bridging
|
||||||
|
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.internal.VisibleForTesting
|
import net.corda.core.internal.VisibleForTesting
|
||||||
@ -6,7 +6,7 @@ import net.corda.core.node.NodeInfo
|
|||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides an internal interface that the [ArtemisMessagingServer] delegates to for Bridge activities.
|
* Provides an internal interface that the [BridgeControlListener] delegates to for Bridge activities.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
interface BridgeManager : AutoCloseable {
|
interface BridgeManager : AutoCloseable {
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.node.internal.protonwrapper.engine
|
package net.corda.nodeapi.internal.protonwrapper.engine
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf
|
import io.netty.buffer.ByteBuf
|
||||||
import io.netty.buffer.PooledByteBufAllocator
|
import io.netty.buffer.PooledByteBufAllocator
|
||||||
@ -7,9 +7,9 @@ import io.netty.channel.Channel
|
|||||||
import io.netty.channel.ChannelHandlerContext
|
import io.netty.channel.ChannelHandlerContext
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
import net.corda.node.internal.protonwrapper.messages.MessageStatus
|
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||||
import net.corda.node.internal.protonwrapper.messages.impl.ReceivedMessageImpl
|
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
|
||||||
import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl
|
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
||||||
import org.apache.qpid.proton.Proton
|
import org.apache.qpid.proton.Proton
|
||||||
import org.apache.qpid.proton.amqp.Binary
|
import org.apache.qpid.proton.amqp.Binary
|
||||||
import org.apache.qpid.proton.amqp.Symbol
|
import org.apache.qpid.proton.amqp.Symbol
|
@ -1,12 +1,12 @@
|
|||||||
package net.corda.node.internal.protonwrapper.engine
|
package net.corda.nodeapi.internal.protonwrapper.engine
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf
|
import io.netty.buffer.ByteBuf
|
||||||
import io.netty.channel.Channel
|
import io.netty.channel.Channel
|
||||||
import io.netty.channel.ChannelHandlerContext
|
import io.netty.channel.ChannelHandlerContext
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
import net.corda.node.internal.protonwrapper.messages.MessageStatus
|
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||||
import net.corda.node.internal.protonwrapper.messages.impl.ReceivedMessageImpl
|
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
|
||||||
import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl
|
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
||||||
import org.apache.qpid.proton.Proton
|
import org.apache.qpid.proton.Proton
|
||||||
import org.apache.qpid.proton.amqp.messaging.Accepted
|
import org.apache.qpid.proton.amqp.messaging.Accepted
|
||||||
import org.apache.qpid.proton.amqp.messaging.Rejected
|
import org.apache.qpid.proton.amqp.messaging.Rejected
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.node.internal.protonwrapper.engine
|
package net.corda.nodeapi.internal.protonwrapper.engine
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf
|
import io.netty.buffer.ByteBuf
|
||||||
import org.apache.qpid.proton.codec.WritableBuffer
|
import org.apache.qpid.proton.codec.WritableBuffer
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.node.internal.protonwrapper.messages
|
package net.corda.nodeapi.internal.protonwrapper.messages
|
||||||
|
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
|
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.node.internal.protonwrapper.messages
|
package net.corda.nodeapi.internal.protonwrapper.messages
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The processing state of a message.
|
* The processing state of a message.
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.node.internal.protonwrapper.messages
|
package net.corda.nodeapi.internal.protonwrapper.messages
|
||||||
|
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
|
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.node.internal.protonwrapper.messages
|
package net.corda.nodeapi.internal.protonwrapper.messages
|
||||||
|
|
||||||
import net.corda.core.concurrent.CordaFuture
|
import net.corda.core.concurrent.CordaFuture
|
||||||
|
|
@ -1,9 +1,9 @@
|
|||||||
package net.corda.node.internal.protonwrapper.messages.impl
|
package net.corda.nodeapi.internal.protonwrapper.messages.impl
|
||||||
|
|
||||||
import io.netty.channel.Channel
|
import io.netty.channel.Channel
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.internal.protonwrapper.messages.MessageStatus
|
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||||
import net.corda.node.internal.protonwrapper.messages.ReceivedMessage
|
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||||
import org.apache.qpid.proton.engine.Delivery
|
import org.apache.qpid.proton.engine.Delivery
|
||||||
|
|
||||||
/**
|
/**
|
@ -1,11 +1,11 @@
|
|||||||
package net.corda.node.internal.protonwrapper.messages.impl
|
package net.corda.nodeapi.internal.protonwrapper.messages.impl
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf
|
import io.netty.buffer.ByteBuf
|
||||||
import net.corda.core.concurrent.CordaFuture
|
import net.corda.core.concurrent.CordaFuture
|
||||||
import net.corda.core.internal.concurrent.openFuture
|
import net.corda.core.internal.concurrent.openFuture
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.internal.protonwrapper.messages.MessageStatus
|
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||||
import net.corda.node.internal.protonwrapper.messages.SendableMessage
|
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An internal packet management class that allows handling of the encoded buffers and
|
* An internal packet management class that allows handling of the encoded buffers and
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.node.internal.protonwrapper.netty
|
package net.corda.nodeapi.internal.protonwrapper.netty
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf
|
import io.netty.buffer.ByteBuf
|
||||||
import io.netty.channel.ChannelDuplexHandler
|
import io.netty.channel.ChannelDuplexHandler
|
||||||
@ -10,11 +10,11 @@ import io.netty.handler.ssl.SslHandshakeCompletionEvent
|
|||||||
import io.netty.util.ReferenceCountUtil
|
import io.netty.util.ReferenceCountUtil
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
import net.corda.node.internal.protonwrapper.engine.EventProcessor
|
|
||||||
import net.corda.node.internal.protonwrapper.messages.ReceivedMessage
|
|
||||||
import net.corda.node.internal.protonwrapper.messages.impl.ReceivedMessageImpl
|
|
||||||
import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl
|
|
||||||
import net.corda.nodeapi.internal.crypto.x509
|
import net.corda.nodeapi.internal.crypto.x509
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.engine.EventProcessor
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
||||||
import org.apache.qpid.proton.engine.ProtonJTransport
|
import org.apache.qpid.proton.engine.ProtonJTransport
|
||||||
import org.apache.qpid.proton.engine.Transport
|
import org.apache.qpid.proton.engine.Transport
|
||||||
import org.apache.qpid.proton.engine.impl.ProtocolTracer
|
import org.apache.qpid.proton.engine.impl.ProtocolTracer
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.node.internal.protonwrapper.netty
|
package net.corda.nodeapi.internal.protonwrapper.netty
|
||||||
|
|
||||||
import io.netty.bootstrap.Bootstrap
|
import io.netty.bootstrap.Bootstrap
|
||||||
import io.netty.channel.*
|
import io.netty.channel.*
|
||||||
@ -12,9 +12,9 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory
|
|||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.node.internal.protonwrapper.messages.ReceivedMessage
|
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||||
import net.corda.node.internal.protonwrapper.messages.SendableMessage
|
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
|
||||||
import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl
|
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
import java.security.KeyStore
|
import java.security.KeyStore
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.node.internal.protonwrapper.netty
|
package net.corda.nodeapi.internal.protonwrapper.netty
|
||||||
|
|
||||||
import io.netty.bootstrap.ServerBootstrap
|
import io.netty.bootstrap.ServerBootstrap
|
||||||
import io.netty.channel.Channel
|
import io.netty.channel.Channel
|
||||||
@ -14,9 +14,9 @@ import io.netty.util.internal.logging.InternalLoggerFactory
|
|||||||
import io.netty.util.internal.logging.Slf4JLoggerFactory
|
import io.netty.util.internal.logging.Slf4JLoggerFactory
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.node.internal.protonwrapper.messages.ReceivedMessage
|
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||||
import net.corda.node.internal.protonwrapper.messages.SendableMessage
|
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
|
||||||
import net.corda.node.internal.protonwrapper.messages.impl.SendableMessageImpl
|
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
||||||
import org.apache.qpid.proton.engine.Delivery
|
import org.apache.qpid.proton.engine.Delivery
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.node.internal.protonwrapper.netty
|
package net.corda.nodeapi.internal.protonwrapper.netty
|
||||||
|
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.security.cert.X509Certificate
|
import java.security.cert.X509Certificate
|
@ -1,4 +1,4 @@
|
|||||||
package net.corda.node.internal.protonwrapper.netty
|
package net.corda.nodeapi.internal.protonwrapper.netty
|
||||||
|
|
||||||
import io.netty.handler.ssl.SslHandler
|
import io.netty.handler.ssl.SslHandler
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
@ -5,15 +5,15 @@ import com.nhaarman.mockito_kotlin.whenever
|
|||||||
import net.corda.core.crypto.toStringShort
|
import net.corda.core.crypto.toStringShort
|
||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.internal.protonwrapper.netty.AMQPServer
|
|
||||||
import net.corda.node.services.config.CertChainPolicyConfig
|
import net.corda.node.services.config.CertChainPolicyConfig
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||||
import net.corda.node.services.messaging.AMQPBridgeManager
|
|
||||||
import net.corda.node.services.messaging.ArtemisMessagingClient
|
|
||||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||||
import net.corda.node.services.messaging.BridgeManager
|
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
|
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager
|
||||||
|
import net.corda.nodeapi.internal.bridging.BridgeManager
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
||||||
import net.corda.testing.core.*
|
import net.corda.testing.core.*
|
||||||
import net.corda.testing.internal.rigorousMock
|
import net.corda.testing.internal.rigorousMock
|
||||||
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
|
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
|
||||||
|
@ -8,16 +8,16 @@ import net.corda.core.identity.CordaX500Name
|
|||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
import net.corda.core.toFuture
|
import net.corda.core.toFuture
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.internal.protonwrapper.messages.MessageStatus
|
|
||||||
import net.corda.node.internal.protonwrapper.netty.AMQPClient
|
|
||||||
import net.corda.node.internal.protonwrapper.netty.AMQPServer
|
|
||||||
import net.corda.node.services.config.CertChainPolicyConfig
|
import net.corda.node.services.config.CertChainPolicyConfig
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||||
import net.corda.node.services.messaging.ArtemisMessagingClient
|
|
||||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
||||||
import net.corda.testing.core.*
|
import net.corda.testing.core.*
|
||||||
import net.corda.testing.internal.rigorousMock
|
import net.corda.testing.internal.rigorousMock
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType
|
import org.apache.activemq.artemis.api.core.RoutingType
|
||||||
|
@ -33,6 +33,7 @@ import net.corda.node.utilities.AffinityExecutor
|
|||||||
import net.corda.node.utilities.DemoClock
|
import net.corda.node.utilities.DemoClock
|
||||||
import net.corda.nodeapi.internal.ShutdownHook
|
import net.corda.nodeapi.internal.ShutdownHook
|
||||||
import net.corda.nodeapi.internal.addShutdownHook
|
import net.corda.nodeapi.internal.addShutdownHook
|
||||||
|
import net.corda.nodeapi.internal.bridging.BridgeControlListener
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.serialization.*
|
import net.corda.nodeapi.internal.serialization.*
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
|
||||||
|
@ -22,12 +22,13 @@ import net.corda.node.services.statemachine.FlowMessagingImpl
|
|||||||
import net.corda.node.utilities.AffinityExecutor
|
import net.corda.node.utilities.AffinityExecutor
|
||||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||||
import net.corda.node.utilities.PersistentMap
|
import net.corda.node.utilities.PersistentMap
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||||
import net.corda.nodeapi.internal.BridgeControl
|
import net.corda.nodeapi.internal.bridging.BridgeControl
|
||||||
import net.corda.nodeapi.internal.BridgeEntry
|
import net.corda.nodeapi.internal.bridging.BridgeEntry
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||||
@ -232,14 +233,16 @@ class P2PMessagingClient(config: NodeConfiguration,
|
|||||||
val bridgeConsumer = session.createConsumer(bridgeNotifyQueue)
|
val bridgeConsumer = session.createConsumer(bridgeNotifyQueue)
|
||||||
bridgeNotifyConsumer = bridgeConsumer
|
bridgeNotifyConsumer = bridgeConsumer
|
||||||
bridgeConsumer.setMessageHandler { msg ->
|
bridgeConsumer.setMessageHandler { msg ->
|
||||||
val data: ByteArray = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) }
|
state.locked {
|
||||||
val notifyMessage = data.deserialize<BridgeControl>(context = SerializationDefaults.P2P_CONTEXT)
|
val data: ByteArray = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) }
|
||||||
log.info(notifyMessage.toString())
|
val notifyMessage = data.deserialize<BridgeControl>(context = SerializationDefaults.P2P_CONTEXT)
|
||||||
when (notifyMessage) {
|
log.info(notifyMessage.toString())
|
||||||
is BridgeControl.BridgeToNodeSnapshotRequest -> enumerateBridges(session, inboxes)
|
when (notifyMessage) {
|
||||||
else -> log.error("Unexpected Bridge Control message type on notify topc $notifyMessage")
|
is BridgeControl.BridgeToNodeSnapshotRequest -> enumerateBridges(session, inboxes)
|
||||||
|
else -> log.error("Unexpected Bridge Control message type on notify topc $notifyMessage")
|
||||||
|
}
|
||||||
|
msg.acknowledge()
|
||||||
}
|
}
|
||||||
msg.acknowledge()
|
|
||||||
}
|
}
|
||||||
networkChangeSubscription = networkMap.changed.subscribe { updateBridgesOnNetworkChange(it) }
|
networkChangeSubscription = networkMap.changed.subscribe { updateBridgesOnNetworkChange(it) }
|
||||||
}
|
}
|
||||||
@ -257,7 +260,7 @@ class P2PMessagingClient(config: NodeConfiguration,
|
|||||||
fun gatherAddresses(node: NodeInfo): Sequence<BridgeEntry> {
|
fun gatherAddresses(node: NodeInfo): Sequence<BridgeEntry> {
|
||||||
return node.legalIdentitiesAndCerts.map {
|
return node.legalIdentitiesAndCerts.map {
|
||||||
val messagingAddress = NodeAddress(it.party.owningKey, node.addresses.first())
|
val messagingAddress = NodeAddress(it.party.owningKey, node.addresses.first())
|
||||||
BridgeEntry(messagingAddress.queueName, node.addresses, listOf(it.party.name))
|
BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map { it.name })
|
||||||
}.filter { artemis.started!!.session.queueQuery(SimpleString(it.queueName)).isExists }.asSequence()
|
}.filter { artemis.started!!.session.queueQuery(SimpleString(it.queueName)).isExists }.asSequence()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,6 +5,7 @@ import net.corda.core.messaging.RPCOps
|
|||||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.internal.security.RPCSecurityManager
|
import net.corda.node.internal.security.RPCSecurityManager
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||||
|
@ -11,6 +11,7 @@ import net.corda.node.utilities.AffinityExecutor
|
|||||||
import net.corda.nodeapi.VerifierApi
|
import net.corda.nodeapi.VerifierApi
|
||||||
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
|
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
|
||||||
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
|
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType
|
import org.apache.activemq.artemis.api.core.RoutingType
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
|
Loading…
Reference in New Issue
Block a user