mirror of
https://github.com/corda/corda.git
synced 2024-12-28 00:38:55 +00:00
Merge commit '6edf95506bb2bd96ebffade9b9dd82d468cd4252' into enterprise
# Conflicts: # docs/source/changelog.rst # node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt
This commit is contained in:
commit
f3c5ae2ff6
@ -29,7 +29,7 @@ import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.nodeapi.ArtemisConsumer
|
||||
import net.corda.nodeapi.ArtemisProducer
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
@ -194,7 +194,7 @@ class RPCClientProxyHandler(
|
||||
TimeUnit.MILLISECONDS
|
||||
)
|
||||
sessionAndProducerPool.run {
|
||||
it.session.createTemporaryQueue(clientAddress, ActiveMQDefaultConfiguration.getDefaultRoutingType(), clientAddress)
|
||||
it.session.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress)
|
||||
}
|
||||
val sessionFactory = serverLocator.createSessionFactory()
|
||||
val session = sessionFactory.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
|
@ -158,6 +158,13 @@ R3 Corda 3.0 Developer Preview
|
||||
* Peer-to-peer communications is now via AMQP 1.0 as default.
|
||||
Although the legacy Artemis CORE bridging can still be used by setting the ``useAMQPBridges`` configuration property to false.
|
||||
|
||||
* The Artemis topics used for peer-to-peer communication have been changed to be more consistent with future cryptographic
|
||||
agility and to open up the future possibility of sharing brokers between nodes. This is a breaking wire level change
|
||||
as it means that nodes after this change will not be able to communicate correctly with nodes running the previous version.
|
||||
Also, any pending enqueued messages in the Artemis message store will not be delivered correctly to their original target.
|
||||
However, assuming a clean reset of the artemis data and that the nodes are consistent versions,
|
||||
data persisted via the AMQP serializer will be forward compatible.
|
||||
|
||||
* Enterprise Corda only: Compatibility with SQL Server 2017 and SQL Azure databases.
|
||||
|
||||
* Enterprise Corda only: node configuration property ``database.schema`` and documented existing database properties.
|
||||
|
@ -46,7 +46,7 @@ Message queues
|
||||
The node makes use of various queues for its operation. The more important ones are described below. Others are used
|
||||
for maintenance and other minor purposes.
|
||||
|
||||
:``p2p.inbound``:
|
||||
:``p2p.inbound.$identity``:
|
||||
The node listens for messages sent from other peer nodes on this queue. Only clients who are authenticated to be
|
||||
nodes on the same network are given permission to send. Messages which are routed internally are also sent to this
|
||||
queue (e.g. two flows on the same node communicating with each other).
|
||||
@ -54,7 +54,7 @@ for maintenance and other minor purposes.
|
||||
:``internal.peers.$identity``:
|
||||
These are a set of private queues only available to the node which it uses to route messages destined to other peers.
|
||||
The queue name ends in the base 58 encoding of the peer's identity key. There is at most one queue per peer. The broker
|
||||
creates a bridge from this queue to the peer's ``p2p.inbound`` queue, using the network map service to lookup the
|
||||
creates a bridge from this queue to the peer's ``p2p.inbound.$identity`` queue, using the network map service to lookup the
|
||||
peer's network address.
|
||||
|
||||
:``internal.services.$identity``:
|
||||
@ -86,7 +86,7 @@ Clients attempting to connect to the node's broker fall in one of four groups:
|
||||
#. Anyone connecting with the username ``SystemUsers/Peer`` is treated as a peer on the same Corda network as the node. Their
|
||||
TLS root CA must be the same as the node's root CA - the root CA is the doorman of the network and having the same root CA
|
||||
implies we've been let in by the same doorman. If they are part of the same network then they are only given permission
|
||||
to send to our ``p2p.inbound`` queue, otherwise they are rejected.
|
||||
to send to our ``p2p.inbound.$identity`` queue, otherwise they are rejected.
|
||||
|
||||
#. Every other username is treated as a RPC user and authenticated against the node's list of valid RPC users. If that
|
||||
is successful then they are only given sufficient permission to perform RPC, otherwise they are rejected.
|
||||
|
@ -1,3 +1,9 @@
|
||||
.. highlight:: kotlin
|
||||
.. raw:: html
|
||||
|
||||
<script type="text/javascript" src="_static/jquery.js"></script>
|
||||
<script type="text/javascript" src="_static/codesets.js"></script>
|
||||
|
||||
Upgrading a CorDapp (outside of platform version upgrades)
|
||||
==========================================================
|
||||
|
||||
@ -127,17 +133,39 @@ The ``InitiatingFlow`` version number is included in the flow session handshake
|
||||
the flow running on the other side. In particular, it has a ``flowVersion`` property which can be used to
|
||||
programmatically evolve flows across versions. For example:
|
||||
|
||||
.. sourcecode:: kotlin
|
||||
.. container:: codeset
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val otherFlowVersion = otherSession.getCounterpartyFlowInfo().flowVersion
|
||||
val receivedString = if (otherFlowVersion == 1) {
|
||||
receive<Int>(otherParty).unwrap { it.toString() }
|
||||
} else {
|
||||
receive<String>(otherParty).unwrap { it }
|
||||
.. sourcecode:: kotlin
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val otherFlowVersion = otherSession.getCounterpartyFlowInfo().flowVersion
|
||||
val receivedString = if (otherFlowVersion == 1) {
|
||||
otherSession.receive<Int>().unwrap { it.toString() }
|
||||
} else {
|
||||
otherSession.receive<String>().unwrap { it }
|
||||
}
|
||||
}
|
||||
|
||||
.. sourcecode:: java
|
||||
|
||||
@Suspendable
|
||||
@Override public Void call() throws FlowException {
|
||||
int otherFlowVersion = otherSession.getCounterpartyFlowInfo().getFlowVersion();
|
||||
String receivedString;
|
||||
|
||||
if (otherFlowVersion == 1) {
|
||||
receivedString = otherSession.receive(Integer.class).unwrap(integer -> {
|
||||
return integer.toString();
|
||||
});
|
||||
} else {
|
||||
receivedString = otherSession.receive(String.class).unwrap(string -> {
|
||||
return string;
|
||||
});
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
This code shows a flow that in its first version expected to receive an Int, but in subsequent versions was modified to
|
||||
expect a String. This flow is still able to communicate with parties that are running the older CorDapp containing
|
||||
@ -147,30 +175,73 @@ How do I deal with interface changes to inlined subflows?
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
Here is an example of an in-lined subflow:
|
||||
|
||||
.. sourcecode:: kotlin
|
||||
.. container:: codeset
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class FlowA(val recipient: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
subFlow(FlowB(recipient))
|
||||
.. sourcecode:: kotlin
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class FlowA(val recipient: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
subFlow(FlowB(recipient))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(FlowA::class)
|
||||
class FlowC(val otherSession: FlowSession) : FlowLogic() {
|
||||
// Omitted.
|
||||
}
|
||||
|
||||
// Note: No annotations. This is used as an inlined subflow.
|
||||
class FlowB(val recipient: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val message = "I'm an inlined subflow, so I inherit the @InitiatingFlow's session ID and type."
|
||||
initiateFlow(recipient).send(message)
|
||||
@InitiatedBy(FlowA::class)
|
||||
class FlowC(val otherSession: FlowSession) : FlowLogic() {
|
||||
// Omitted.
|
||||
}
|
||||
|
||||
// Note: No annotations. This is used as an inlined subflow.
|
||||
class FlowB(val recipient: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val message = "I'm an inlined subflow, so I inherit the @InitiatingFlow's session ID and type."
|
||||
initiateFlow(recipient).send(message)
|
||||
}
|
||||
}
|
||||
|
||||
.. sourcecode:: java
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class FlowA extends FlowLogic<Void> {
|
||||
private final Party recipient;
|
||||
|
||||
public FlowA(Party recipient) {
|
||||
this.recipient = recipient;
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
@Override public Void call() throws FlowException {
|
||||
subFlow(new FlowB(recipient));
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(FlowA.class)
|
||||
class FlowC extends FlowLogic<Void> {
|
||||
// Omitted.
|
||||
}
|
||||
|
||||
// Note: No annotations. This is used as an inlined subflow.
|
||||
class FlowB extends FlowLogic<Void> {
|
||||
private final Party recipient;
|
||||
|
||||
public FlowB(Party recipient) {
|
||||
this.recipient = recipient;
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
@Override public Void call() {
|
||||
String message = "I'm an inlined subflow, so I inherit the @InitiatingFlow's session ID and type.";
|
||||
initiateFlow(recipient).send(message);
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Inlined subflows are treated as being the flow that invoked them when initiating a new flow session with a counterparty.
|
||||
Suppose flow ``A`` calls inlined subflow B, which, in turn, initiates a session with a counterparty. The ``FlowLogic``
|
||||
@ -361,34 +432,138 @@ for details.
|
||||
For backwards compatible changes such as adding columns, the procedure for upgrading a state schema is to extend the
|
||||
existing object relational mapper. For example, we can update:
|
||||
|
||||
.. sourcecode:: kotlin
|
||||
.. container:: codeset
|
||||
|
||||
object ObligationSchemaV1 : MappedSchema(Obligation::class.java, 1, listOf(ObligationEntity::class.java)) {
|
||||
@Entity @Table(name = "obligations")
|
||||
class ObligationEntity(obligation: Obligation) : PersistentState() {
|
||||
@Column var currency: String = obligation.amount.token.toString()
|
||||
@Column var amount: Long = obligation.amount.quantity
|
||||
@Column @Lob var lender: ByteArray = obligation.lender.owningKey.encoded
|
||||
@Column @Lob var borrower: ByteArray = obligation.borrower.owningKey.encoded
|
||||
@Column var linear_id: String = obligation.linearId.id.toString()
|
||||
.. sourcecode:: kotlin
|
||||
|
||||
object ObligationSchemaV1 : MappedSchema(Obligation::class.java, 1, listOf(ObligationEntity::class.java)) {
|
||||
@Entity @Table(name = "obligations")
|
||||
class ObligationEntity(obligation: Obligation) : PersistentState() {
|
||||
@Column var currency: String = obligation.amount.token.toString()
|
||||
@Column var amount: Long = obligation.amount.quantity
|
||||
@Column @Lob var lender: ByteArray = obligation.lender.owningKey.encoded
|
||||
@Column @Lob var borrower: ByteArray = obligation.borrower.owningKey.encoded
|
||||
@Column var linear_id: String = obligation.linearId.id.toString()
|
||||
}
|
||||
}
|
||||
|
||||
.. sourcecode:: java
|
||||
|
||||
public class ObligationSchemaV1 extends MappedSchema {
|
||||
public IOUSchemaV1() {
|
||||
super(Obligation.class, 1, ImmutableList.of(ObligationEntity.class));
|
||||
}
|
||||
|
||||
@Entity
|
||||
@Table(name = "obligations")
|
||||
public static class ObligationEntity extends PersistentState {
|
||||
@Column(name = "currency") private final String currency;
|
||||
@Column(name = "amount") private final Long amount;
|
||||
@Column(name = "lender") @Lob private final Byte[] lender;
|
||||
@Column(name = "borrower") @Lob private final Byte[] borrower;
|
||||
@Column(name = "linear_id") private final UUID linearId;
|
||||
|
||||
|
||||
public ObligationEntity(String currency, Long amount, Byte[] lender, Byte[] borrower, UUID linearId) {
|
||||
this.currency = currency;
|
||||
this.amount = amount;
|
||||
this.lender = lender;
|
||||
this.borrower = borrower;
|
||||
this.linearId = linearId;
|
||||
}
|
||||
|
||||
public String getCurrency() {
|
||||
return currency;
|
||||
}
|
||||
|
||||
public Long getAmount() {
|
||||
return amount;
|
||||
}
|
||||
|
||||
public ByteArray getLender() {
|
||||
return lender;
|
||||
}
|
||||
|
||||
public ByteArray getBorrower() {
|
||||
return borrower;
|
||||
}
|
||||
|
||||
public UUID getId() {
|
||||
return linearId;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
To:
|
||||
|
||||
.. sourcecode:: kotlin
|
||||
.. container:: codeset
|
||||
|
||||
object ObligationSchemaV1 : MappedSchema(Obligation::class.java, 1, listOf(ObligationEntity::class.java)) {
|
||||
@Entity @Table(name = "obligations")
|
||||
class ObligationEntity(obligation: Obligation) : PersistentState() {
|
||||
@Column var currency: String = obligation.amount.token.toString()
|
||||
@Column var amount: Long = obligation.amount.quantity
|
||||
@Column @Lob var lender: ByteArray = obligation.lender.owningKey.encoded
|
||||
@Column @Lob var borrower: ByteArray = obligation.borrower.owningKey.encoded
|
||||
@Column var linear_id: String = obligation.linearId.id.toString()
|
||||
@Column var defaulted: Bool = obligation.amount.inDefault // NEW COLUNM!
|
||||
.. sourcecode:: kotlin
|
||||
|
||||
object ObligationSchemaV1 : MappedSchema(Obligation::class.java, 1, listOf(ObligationEntity::class.java)) {
|
||||
@Entity @Table(name = "obligations")
|
||||
class ObligationEntity(obligation: Obligation) : PersistentState() {
|
||||
@Column var currency: String = obligation.amount.token.toString()
|
||||
@Column var amount: Long = obligation.amount.quantity
|
||||
@Column @Lob var lender: ByteArray = obligation.lender.owningKey.encoded
|
||||
@Column @Lob var borrower: ByteArray = obligation.borrower.owningKey.encoded
|
||||
@Column var linear_id: String = obligation.linearId.id.toString()
|
||||
@Column var defaulted: Bool = obligation.amount.inDefault // NEW COLUMN!
|
||||
}
|
||||
}
|
||||
|
||||
.. sourcecode:: java
|
||||
|
||||
public class ObligationSchemaV1 extends MappedSchema {
|
||||
public IOUSchemaV1() {
|
||||
super(Obligation.class, 1, ImmutableList.of(ObligationEntity.class));
|
||||
}
|
||||
|
||||
@Entity
|
||||
@Table(name = "obligations")
|
||||
public static class ObligationEntity extends PersistentState {
|
||||
@Column(name = "currency") private final String currency;
|
||||
@Column(name = "amount") private final Long amount;
|
||||
@Column(name = "lender") @Lob private final Byte[] lender;
|
||||
@Column(name = "borrower") @Lob private final Byte[] borrower;
|
||||
@Column(name = "linear_id") private final UUID linearId;
|
||||
@Column(name = "defaulted") private final Boolean defaulted; // NEW COLUMN!
|
||||
|
||||
|
||||
public ObligationEntity(String currency, Long amount, Byte[] lender, Byte[] borrower, UUID linearId, Boolean defaulted) {
|
||||
this.currency = currency;
|
||||
this.amount = amount;
|
||||
this.lender = lender;
|
||||
this.borrower = borrower;
|
||||
this.linearId = linearId;
|
||||
this.defaulted = defaulted;
|
||||
}
|
||||
|
||||
public String getCurrency() {
|
||||
return currency;
|
||||
}
|
||||
|
||||
public Long getAmount() {
|
||||
return amount;
|
||||
}
|
||||
|
||||
public ByteArray getLender() {
|
||||
return lender;
|
||||
}
|
||||
|
||||
public ByteArray getBorrower() {
|
||||
return borrower;
|
||||
}
|
||||
|
||||
public UUID getId() {
|
||||
return linearId;
|
||||
}
|
||||
|
||||
public Boolean isDefaulted() {
|
||||
return defaulted;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Thus adding a new column with a default value.
|
||||
|
||||
@ -397,21 +572,43 @@ used, as changes to the state are required. To make a backwards-incompatible cha
|
||||
because a property was removed from a state object), the procedure is to define another object relational mapper, then
|
||||
add it to the ``supportedSchemas`` property of your ``QueryableState``, like so:
|
||||
|
||||
.. sourcecode:: kotlin
|
||||
.. container:: codeset
|
||||
|
||||
override fun supportedSchemas(): Iterable<MappedSchema> = listOf(ExampleSchemaV1, ExampleSchemaV2)
|
||||
.. sourcecode:: kotlin
|
||||
|
||||
override fun supportedSchemas(): Iterable<MappedSchema> = listOf(ExampleSchemaV1, ExampleSchemaV2)
|
||||
|
||||
.. sourcecode:: java
|
||||
|
||||
@Override public Iterable<MappedSchema> supportedSchemas() {
|
||||
return ImmutableList.of(new ExampleSchemaV1(), new ExampleSchemaV2());
|
||||
}
|
||||
|
||||
Then, in ``generateMappedObject``, add support for the new schema:
|
||||
|
||||
.. sourcecode:: kotlin
|
||||
.. container:: codeset
|
||||
|
||||
override fun generateMappedObject(schema: MappedSchema): PersistentState {
|
||||
return when (schema) {
|
||||
is DummyLinearStateSchemaV1 -> // Omitted.
|
||||
is DummyLinearStateSchemaV2 -> // Omitted.
|
||||
else -> throw IllegalArgumentException("Unrecognised schema $schema")
|
||||
.. sourcecode:: kotlin
|
||||
|
||||
override fun generateMappedObject(schema: MappedSchema): PersistentState {
|
||||
return when (schema) {
|
||||
is DummyLinearStateSchemaV1 -> // Omitted.
|
||||
is DummyLinearStateSchemaV2 -> // Omitted.
|
||||
else -> throw IllegalArgumentException("Unrecognised schema $schema")
|
||||
}
|
||||
}
|
||||
|
||||
.. sourcecode:: java
|
||||
|
||||
@Override public PersistentState generateMappedObject(MappedSchema schema) {
|
||||
if (schema instanceof DummyLinearStateSchemaV1) {
|
||||
// Omitted.
|
||||
} else if (schema instanceof DummyLinearStateSchemaV2) {
|
||||
// Omitted.
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unrecognised schema $schema");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
With this approach, whenever the state object is stored in the vault, a representation of it will be stored in two
|
||||
separate database tables where possible - one for each supported schema.
|
@ -1,11 +1,12 @@
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.messaging.MessageRecipientGroup
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.toBase58String
|
||||
import java.security.PublicKey
|
||||
|
||||
/**
|
||||
@ -23,7 +24,7 @@ class ArtemisMessagingComponent {
|
||||
const val PEER_USER = "SystemUsers/Peer"
|
||||
const val INTERNAL_PREFIX = "internal."
|
||||
const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers." //TODO Come up with better name for common peers/services queue
|
||||
const val P2P_QUEUE = "p2p.inbound"
|
||||
const val P2P_PREFIX = "p2p.inbound."
|
||||
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
|
||||
}
|
||||
|
||||
@ -49,7 +50,7 @@ class ArtemisMessagingComponent {
|
||||
@CordaSerializable
|
||||
data class NodeAddress(override val queueName: String, override val hostAndPort: NetworkHostAndPort) : ArtemisPeerAddress {
|
||||
constructor(peerIdentity: PublicKey, hostAndPort: NetworkHostAndPort) :
|
||||
this("$PEERS_PREFIX${peerIdentity.toBase58String()}", hostAndPort)
|
||||
this("$PEERS_PREFIX${peerIdentity.toStringShort()}", hostAndPort)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -62,6 +63,30 @@ class ArtemisMessagingComponent {
|
||||
* @param identity The service identity's owning key.
|
||||
*/
|
||||
data class ServiceAddress(val identity: PublicKey) : ArtemisAddress, MessageRecipientGroup {
|
||||
override val queueName: String = "$PEERS_PREFIX${identity.toBase58String()}"
|
||||
override val queueName: String = "$PEERS_PREFIX${identity.toStringShort()}"
|
||||
}
|
||||
|
||||
/**
|
||||
* [RemoteInboxAddress] implements [SingleMessageRecipient]. It represents the non-local address of a remote inbox.
|
||||
* @param identity The Node public identity
|
||||
*/
|
||||
data class RemoteInboxAddress(val identity: PublicKey) : ArtemisAddress, SingleMessageRecipient {
|
||||
constructor(party: Party) : this(party.owningKey)
|
||||
|
||||
companion object {
|
||||
/**
|
||||
* When transferring a message from the local holding queue to the remote inbox queue
|
||||
* this method provides a simple translation of the address string.
|
||||
* The topics are distinct so that proper segregation of internal
|
||||
* and external access permissions can be made.
|
||||
*/
|
||||
fun translateLocalQueueToInboxAddress(address: String): String {
|
||||
require(address.startsWith(PEERS_PREFIX)) { "Failed to map address: $address to a remote topic as it is not in the $PEERS_PREFIX namespace" }
|
||||
return P2P_PREFIX + address.substring(PEERS_PREFIX.length)
|
||||
}
|
||||
}
|
||||
|
||||
override val queueName: String = "$P2P_PREFIX${identity.toStringShort()}"
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -3,18 +3,18 @@ package net.corda.node.amqp
|
||||
import com.nhaarman.mockito_kotlin.any
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.toBase58String
|
||||
import net.corda.node.internal.protonwrapper.netty.AMQPServer
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.config.*
|
||||
import net.corda.node.services.messaging.ArtemisMessagingClient
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
|
||||
import net.corda.nodeapi.internal.crypto.loadKeyStore
|
||||
import net.corda.testing.*
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
@ -53,7 +53,7 @@ class AMQPBridgeTest {
|
||||
@Test
|
||||
fun `test acked and nacked messages`() {
|
||||
// Create local queue
|
||||
val sourceQueueName = "internal.peers." + BOB.publicKey.toBase58String()
|
||||
val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort()
|
||||
val (artemisServer, artemisClient) = createArtemis(sourceQueueName)
|
||||
|
||||
// Pre-populate local queue with 3 messages
|
||||
@ -133,11 +133,13 @@ class AMQPBridgeTest {
|
||||
@Test
|
||||
fun `Test legacy bridge still works`() {
|
||||
// Create local queue
|
||||
val sourceQueueName = "internal.peers." + ALICE.publicKey.toBase58String()
|
||||
val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort()
|
||||
val (artemisLegacyServer, artemisLegacyClient) = createLegacyArtemis(sourceQueueName)
|
||||
|
||||
|
||||
val (artemisServer, artemisClient) = createArtemis(null)
|
||||
val inbox = ArtemisMessagingComponent.RemoteInboxAddress(BOB.party).queueName
|
||||
artemisClient.started!!.session.createQueue(inbox, RoutingType.ANYCAST, inbox, true)
|
||||
|
||||
val artemis = artemisLegacyClient.started!!
|
||||
for (i in 0 until 3) {
|
||||
@ -150,8 +152,7 @@ class AMQPBridgeTest {
|
||||
artemis.producer.send(sourceQueueName, artemisMessage)
|
||||
}
|
||||
|
||||
|
||||
val subs = artemisClient.started!!.session.createConsumer(P2P_QUEUE)
|
||||
val subs = artemisClient.started!!.session.createConsumer(inbox)
|
||||
for (i in 0 until 3) {
|
||||
val msg = subs.receive()
|
||||
val messageBody = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) }
|
||||
@ -177,9 +178,9 @@ class AMQPBridgeTest {
|
||||
doReturn(true).whenever(it).useAMQPBridges
|
||||
}
|
||||
artemisConfig.configureWithDevSSLCertificate()
|
||||
val networkMap = rigorousMock<NetworkMapCache>().also {
|
||||
val networkMap = rigorousMock<NetworkMapCacheInternal>().also {
|
||||
doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed
|
||||
doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByLegalIdentityKey(any())
|
||||
doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any())
|
||||
}
|
||||
val userService = rigorousMock<RPCSecurityManager>()
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, null, networkMap, userService, MAX_MESSAGE_SIZE)
|
||||
@ -189,7 +190,7 @@ class AMQPBridgeTest {
|
||||
val artemis = artemisClient.started!!
|
||||
if (sourceQueueName != null) {
|
||||
// Local queue for outgoing messages
|
||||
artemis.session.createQueue(sourceQueueName, RoutingType.MULTICAST, sourceQueueName, true)
|
||||
artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true)
|
||||
}
|
||||
return Pair(artemisServer, artemisClient)
|
||||
}
|
||||
@ -206,9 +207,9 @@ class AMQPBridgeTest {
|
||||
doReturn(ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0))).whenever(it).activeMQServer
|
||||
}
|
||||
artemisConfig.configureWithDevSSLCertificate()
|
||||
val networkMap = rigorousMock<NetworkMapCache>().also {
|
||||
val networkMap = rigorousMock<NetworkMapCacheInternal>().also {
|
||||
doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed
|
||||
doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByLegalIdentityKey(any())
|
||||
doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any())
|
||||
}
|
||||
val userService = rigorousMock<RPCSecurityManager>()
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort2, null, networkMap, userService, MAX_MESSAGE_SIZE)
|
||||
@ -217,7 +218,7 @@ class AMQPBridgeTest {
|
||||
artemisClient.start()
|
||||
val artemis = artemisClient.started!!
|
||||
// Local queue for outgoing messages
|
||||
artemis.session.createQueue(sourceQueueName, RoutingType.MULTICAST, sourceQueueName, true)
|
||||
artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true)
|
||||
return Pair(artemisServer, artemisClient)
|
||||
}
|
||||
|
||||
|
@ -13,11 +13,13 @@ import net.corda.node.internal.protonwrapper.messages.MessageStatus
|
||||
import net.corda.node.internal.protonwrapper.netty.AMQPClient
|
||||
import net.corda.node.internal.protonwrapper.netty.AMQPServer
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.config.CertChainPolicyConfig
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.messaging.ArtemisMessagingClient
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.crypto.loadKeyStore
|
||||
import net.corda.testing.*
|
||||
@ -48,7 +50,7 @@ class ProtonWrapperTests {
|
||||
amqpServer.start()
|
||||
val receiveSubs = amqpServer.onReceive.subscribe {
|
||||
assertEquals(BOB_NAME.toString(), it.sourceLegalName)
|
||||
assertEquals("p2p.inbound", it.topic)
|
||||
assertEquals(P2P_PREFIX + "Test", it.topic)
|
||||
assertEquals("Test", String(it.payload))
|
||||
it.complete(true)
|
||||
}
|
||||
@ -64,7 +66,7 @@ class ProtonWrapperTests {
|
||||
assertEquals(true, clientConnect.connected)
|
||||
assertEquals(ALICE_NAME, CordaX500Name.build(clientConnect.remoteCert!!.subjectX500Principal))
|
||||
val msg = amqpClient.createMessage("Test".toByteArray(),
|
||||
"p2p.inbound",
|
||||
P2P_PREFIX + "Test",
|
||||
ALICE_NAME.toString(),
|
||||
emptyMap())
|
||||
amqpClient.write(msg)
|
||||
@ -151,8 +153,8 @@ class ProtonWrapperTests {
|
||||
assertEquals(true, clientConnected.get().connected)
|
||||
assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal))
|
||||
val artemis = artemisClient.started!!
|
||||
val sendAddress = "p2p.inbound"
|
||||
artemis.session.createQueue(sendAddress, RoutingType.MULTICAST, "queue", true)
|
||||
val sendAddress = P2P_PREFIX + "Test"
|
||||
artemis.session.createQueue(sendAddress, RoutingType.ANYCAST, "queue", true)
|
||||
val consumer = artemis.session.createConsumer("queue")
|
||||
val testData = "Test".toByteArray()
|
||||
val testProperty = mutableMapOf<Any?, Any?>()
|
||||
@ -230,7 +232,7 @@ class ProtonWrapperTests {
|
||||
}
|
||||
artemisConfig.configureWithDevSSLCertificate()
|
||||
|
||||
val networkMap = rigorousMock<NetworkMapCache>().also {
|
||||
val networkMap = rigorousMock<NetworkMapCacheInternal>().also {
|
||||
doReturn(never<NetworkMapCache.MapChange>()).whenever(it).changed
|
||||
}
|
||||
val userService = rigorousMock<RPCSecurityManager>()
|
||||
|
@ -5,6 +5,7 @@ import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.CordaRPCConnection
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
@ -14,14 +15,13 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.toBase58String
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.testing.ALICE_NAME
|
||||
@ -79,30 +79,30 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
|
||||
@Test
|
||||
fun `consume message from P2P queue`() {
|
||||
assertConsumeAttackFails(P2P_QUEUE)
|
||||
assertConsumeAttackFails("$P2P_PREFIX${alice.info.chooseIdentity().owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `consume message from peer queue`() {
|
||||
val bobParty = startBobAndCommunicateWithAlice()
|
||||
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}")
|
||||
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `send message to address of peer which has been communicated with`() {
|
||||
val bobParty = startBobAndCommunicateWithAlice()
|
||||
assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}")
|
||||
assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `create queue for peer which has not been communicated with`() {
|
||||
val bob = startNode(BOB_NAME)
|
||||
assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.chooseIdentity().owningKey.toBase58String()}")
|
||||
assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.chooseIdentity().owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `create queue for unknown peer`() {
|
||||
val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.toBase58String()}"
|
||||
val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.toStringShort()}"
|
||||
assertAllQueueCreationAttacksFail(invalidPeerQueue)
|
||||
}
|
||||
|
||||
|
@ -40,6 +40,7 @@ import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Scheduler
|
||||
import rx.schedulers.Schedulers
|
||||
import java.security.PublicKey
|
||||
import java.time.Clock
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.management.ObjectName
|
||||
@ -166,11 +167,14 @@ open class Node(configuration: NodeConfiguration,
|
||||
VerifierType.OutOfProcess -> VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, networkParameters.maxMessageSize)
|
||||
VerifierType.InMemory -> null
|
||||
}
|
||||
require(info.legalIdentities.size in 1..2) { "Currently nodes must have a primary address and optionally one serviced address" }
|
||||
val serviceIdentity: PublicKey? = if (info.legalIdentities.size == 1) null else info.legalIdentities[1].owningKey
|
||||
return P2PMessagingClient(
|
||||
configuration,
|
||||
versionInfo,
|
||||
serverAddress,
|
||||
info.legalIdentities[0].owningKey,
|
||||
serviceIdentity,
|
||||
serverThread,
|
||||
database,
|
||||
advertisedAddress,
|
||||
|
@ -29,6 +29,10 @@ interface NetworkMapCacheBaseInternal : NetworkMapCacheBase {
|
||||
|
||||
fun getNodeByHash(nodeHash: SecureHash): NodeInfo?
|
||||
|
||||
/** Find nodes from the [PublicKey] toShortString representation.
|
||||
* This is used for Artemis bridge lookup process. */
|
||||
fun getNodesByOwningKeyIndex(identityKeyIndex: String): List<NodeInfo>
|
||||
|
||||
/** Adds a node to the local cache (generally only used for adding ourselves). */
|
||||
fun addNode(node: NodeInfo)
|
||||
|
||||
|
@ -12,8 +12,8 @@ import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.AMQPBridgeManager.AMQPBridge.Companion.getBridgeName
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
|
||||
import net.corda.nodeapi.internal.crypto.loadKeyStore
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
@ -132,7 +132,8 @@ internal class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress:
|
||||
properties[key.toString()] = value
|
||||
}
|
||||
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
|
||||
val sendableMessage = amqpClient.createMessage(data, P2P_QUEUE,
|
||||
val peerInbox = translateLocalQueueToInboxAddress(queueName)
|
||||
val sendableMessage = amqpClient.createMessage(data, peerInbox,
|
||||
legalNames.first().toString(),
|
||||
properties)
|
||||
sendableMessage.onComplete.then {
|
||||
|
@ -8,16 +8,15 @@ import net.corda.core.internal.div
|
||||
import net.corda.core.internal.noneOrSingle
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.node.services.NetworkMapCache.MapChange
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.parsePublicKeyBase58
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.internal.security.Password
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE
|
||||
import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE
|
||||
@ -31,7 +30,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisPeerAddress
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
||||
@ -94,7 +93,7 @@ import javax.security.cert.CertificateException
|
||||
class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
private val p2pPort: Int,
|
||||
val rpcPort: Int?,
|
||||
val networkMapCache: NetworkMapCache,
|
||||
val networkMapCache: NetworkMapCacheInternal,
|
||||
val securityManager: RPCSecurityManager,
|
||||
val maxMessageSize: Int) : SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
@ -191,7 +190,6 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
// by having its password be an unknown securely random 128-bit value.
|
||||
clusterPassword = BigInteger(128, newSecureRandom()).toString(16)
|
||||
queueConfigurations = listOf(
|
||||
queueConfig(P2P_QUEUE, durable = true),
|
||||
// Create an RPC queue: this will service locally connected clients only (not via a bridge) and those
|
||||
// clients must have authenticated. We could use a single consumer for everything and perhaps we should,
|
||||
// but these queues are not worth persisting.
|
||||
@ -243,7 +241,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
private fun ConfigurationImpl.configureAddressSecurity(): Pair<Configuration, LoginListener> {
|
||||
val nodeInternalRole = Role(NODE_ROLE, true, true, true, true, true, true, true, true)
|
||||
securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node
|
||||
securityRoles[P2P_QUEUE] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true))
|
||||
securityRoles["$P2P_PREFIX#"] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true))
|
||||
securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true))
|
||||
// Each RPC user must have its own role and its own queue. This prevents users accessing each other's queues
|
||||
// and stealing RPC responses.
|
||||
@ -309,8 +307,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
|
||||
if (queueName.startsWith(PEERS_PREFIX)) {
|
||||
try {
|
||||
val identity = parsePublicKeyBase58(queueName.substring(PEERS_PREFIX.length))
|
||||
val nodeInfos = networkMapCache.getNodesByLegalIdentityKey(identity)
|
||||
val nodeInfos = networkMapCache.getNodesByOwningKeyIndex(queueName.substring(PEERS_PREFIX.length))
|
||||
if (nodeInfos.isNotEmpty()) {
|
||||
nodeInfos.forEach { deployBridgeToPeer(it) }
|
||||
} else {
|
||||
|
@ -10,15 +10,17 @@ import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import org.apache.activemq.artemis.api.core.Message
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer
|
||||
import org.apache.activemq.artemis.spi.core.remoting.*
|
||||
import org.apache.activemq.artemis.utils.ConfigurationHelper
|
||||
import java.time.Duration
|
||||
@ -46,7 +48,7 @@ internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServ
|
||||
|
||||
|
||||
/**
|
||||
* All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving
|
||||
* All nodes are expected to have a public facing address called p2p.inbound.$identity for receiving
|
||||
* messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it,
|
||||
* as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
|
||||
* P2P address.
|
||||
@ -64,7 +66,6 @@ internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServ
|
||||
activeMQServer.deployBridge(BridgeConfiguration().apply {
|
||||
name = getBridgeName(queueName, target)
|
||||
this.queueName = queueName
|
||||
forwardingAddress = P2P_QUEUE
|
||||
staticConnectors = listOf(target.toString())
|
||||
confirmationWindowSize = 100000 // a guess
|
||||
isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic
|
||||
@ -77,6 +78,7 @@ internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServ
|
||||
// our TLS certificate.
|
||||
user = PEER_USER
|
||||
password = PEER_USER
|
||||
transformerClassName = InboxTopicTransformer::class.java.name
|
||||
})
|
||||
}
|
||||
|
||||
@ -99,6 +101,13 @@ internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServ
|
||||
}
|
||||
}
|
||||
|
||||
class InboxTopicTransformer : Transformer {
|
||||
override fun transform(message: Message): Message {
|
||||
message.address = translateLocalQueueToInboxAddress(message.address)
|
||||
return message
|
||||
}
|
||||
}
|
||||
|
||||
class VerifyingNettyConnectorFactory : NettyConnectorFactory() {
|
||||
override fun createConnector(configuration: MutableMap<String, Any>,
|
||||
handler: BufferHandler?,
|
||||
|
@ -19,7 +19,6 @@ import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.node.utilities.PersistentMap
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
@ -54,21 +53,28 @@ import javax.persistence.Lob
|
||||
* invoke methods on the provided implementation. There is more documentation on this in the docsite and the
|
||||
* CordaRPCClient class.
|
||||
*
|
||||
* @param serverAddress The address of the broker instance to connect to (might be running in the same process).
|
||||
* @param myIdentity The public key to be used as the ArtemisMQ address and queue name for the node.
|
||||
* @param nodeExecutor An executor to run received message tasks upon.
|
||||
* @param advertisedAddress The node address for inbound connections, advertised to the network map service and peers.
|
||||
* If not provided, will default to [serverAddress].
|
||||
* @param config The configuration of the node, which is used for controlling the message redelivery options.
|
||||
* @param versionInfo All messages from the node carry the version info and received messages are checked against this for compatibility.
|
||||
* @param serverAddress The host and port of the Artemis broker.
|
||||
* @param myIdentity The primary identity of the node, which defines the messaging address for externally received messages.
|
||||
* It is also used to construct the myAddress field, which is ultimately advertised in the network map.
|
||||
* @param serviceIdentity An optional second identity if the node is also part of a group address, for example a notary.
|
||||
* @param nodeExecutor The received messages are marshalled onto the server executor to prevent Netty buffers leaking during fiber suspends.
|
||||
* @param database The nodes database, which is used to deduplicate messages.
|
||||
* @param advertisedAddress The externally advertised version of the Artemis broker address used to construct myAddress and included
|
||||
* in the network map data.
|
||||
* @param maxMessageSize A bound applied to the message size.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class P2PMessagingClient(config: NodeConfiguration,
|
||||
private val versionInfo: VersionInfo,
|
||||
serverAddress: NetworkHostAndPort,
|
||||
private val myIdentity: PublicKey,
|
||||
private val serviceIdentity: PublicKey?,
|
||||
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
|
||||
private val database: CordaPersistence,
|
||||
advertisedAddress: NetworkHostAndPort = serverAddress,
|
||||
private val maxMessageSize: Int
|
||||
maxMessageSize: Int
|
||||
) : SingletonSerializeAsToken(), MessagingService {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
@ -126,6 +132,7 @@ class P2PMessagingClient(config: NodeConfiguration,
|
||||
private class InnerState {
|
||||
var running = false
|
||||
var p2pConsumer: ClientConsumer? = null
|
||||
var serviceConsumer: ClientConsumer? = null
|
||||
}
|
||||
|
||||
private val messagesToRedeliver = database.transaction {
|
||||
@ -181,8 +188,20 @@ class P2PMessagingClient(config: NodeConfiguration,
|
||||
fun start() {
|
||||
state.locked {
|
||||
val session = artemis.start().session
|
||||
val inbox = RemoteInboxAddress(myIdentity).queueName
|
||||
// Create a queue, consumer and producer for handling P2P network messages.
|
||||
p2pConsumer = session.createConsumer(P2P_QUEUE)
|
||||
createQueueIfAbsent(inbox)
|
||||
p2pConsumer = session.createConsumer(inbox)
|
||||
if (serviceIdentity != null) {
|
||||
val serviceAddress = RemoteInboxAddress(serviceIdentity).queueName
|
||||
createQueueIfAbsent(serviceAddress)
|
||||
val serviceHandler = session.createConsumer(serviceAddress)
|
||||
serviceHandler.setMessageHandler { msg ->
|
||||
val message: ReceivedMessage? = artemisToCordaMessage(msg)
|
||||
if (message != null)
|
||||
deliver(msg, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
resumeMessageRedelivery()
|
||||
@ -331,6 +350,13 @@ class P2PMessagingClient(config: NodeConfiguration,
|
||||
// Ignore it: this can happen if the server has gone away before we do.
|
||||
}
|
||||
p2pConsumer = null
|
||||
val s = serviceConsumer
|
||||
try {
|
||||
s?.close()
|
||||
} catch (e: ActiveMQObjectClosedException) {
|
||||
// Ignore it: this can happen if the server has gone away before we do.
|
||||
}
|
||||
serviceConsumer = null
|
||||
prevRunning
|
||||
}
|
||||
if (running && !nodeExecutor.isOnThread) {
|
||||
@ -430,7 +456,7 @@ class P2PMessagingClient(config: NodeConfiguration,
|
||||
private fun getMQAddress(target: MessageRecipients): String {
|
||||
return if (target == myAddress) {
|
||||
// If we are sending to ourselves then route the message directly to our P2P queue.
|
||||
P2P_QUEUE
|
||||
RemoteInboxAddress(myIdentity).queueName
|
||||
} else {
|
||||
// Otherwise we send the message to an internal queue for the target residing on our broker. It's then the
|
||||
// broker's job to route the message to the target's P2P queue.
|
||||
@ -447,7 +473,7 @@ class P2PMessagingClient(config: NodeConfiguration,
|
||||
val queueQuery = session.queueQuery(SimpleString(queueName))
|
||||
if (!queueQuery.isExists) {
|
||||
log.info("Create fresh queue $queueName bound on same address")
|
||||
session.createQueue(queueName, RoutingType.MULTICAST, queueName, true)
|
||||
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7,15 +7,15 @@ import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService
|
||||
import net.corda.node.utilities.*
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.nodeapi.VerifierApi
|
||||
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
|
||||
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import java.util.concurrent.*
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class VerifierMessagingClient(config: SSLConfiguration, serverAddress: NetworkHostAndPort, metrics: MetricRegistry, private val maxMessageSize: Int) : SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
@ -40,7 +40,7 @@ class VerifierMessagingClient(config: SSLConfiguration, serverAddress: NetworkHo
|
||||
val queueQuery = session.queueQuery(SimpleString(queueName))
|
||||
if (!queueQuery.isExists) {
|
||||
log.info("Create fresh queue $queueName bound on same address")
|
||||
session.createQueue(queueName, RoutingType.MULTICAST, queueName, true)
|
||||
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
|
||||
}
|
||||
}
|
||||
createQueueIfAbsent(VERIFICATION_REQUESTS_QUEUE_NAME)
|
||||
|
@ -160,6 +160,12 @@ open class PersistentNetworkMapCache(
|
||||
|
||||
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024, 8, { key -> database.transaction { queryByIdentityKey(session, key) } })
|
||||
|
||||
override fun getNodesByOwningKeyIndex(identityKeyIndex: String): List<NodeInfo> {
|
||||
return database.transaction {
|
||||
queryByIdentityKeyIndex(session, identityKeyIndex)
|
||||
}
|
||||
}
|
||||
|
||||
override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = database.transaction { queryByAddress(session, address) }
|
||||
|
||||
override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = identityByLegalNameCache.get(name).orElse(null)
|
||||
@ -245,15 +251,23 @@ open class PersistentNetworkMapCache(
|
||||
}
|
||||
|
||||
private fun findByIdentityKey(session: Session, identityKey: PublicKey): List<NodeInfoSchemaV1.PersistentNodeInfo> {
|
||||
return findByIdentityKeyIndex(session, identityKey.toStringShort())
|
||||
}
|
||||
|
||||
private fun findByIdentityKeyIndex(session: Session, identityKeyIndex: String): List<NodeInfoSchemaV1.PersistentNodeInfo> {
|
||||
val query = session.createQuery(
|
||||
"SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.owningKeyHash = :owningKeyHash",
|
||||
NodeInfoSchemaV1.PersistentNodeInfo::class.java)
|
||||
query.setParameter("owningKeyHash", identityKey.toStringShort())
|
||||
query.setParameter("owningKeyHash", identityKeyIndex)
|
||||
return query.resultList
|
||||
}
|
||||
|
||||
private fun queryByIdentityKey(session: Session, identityKey: PublicKey): List<NodeInfo> {
|
||||
val result = findByIdentityKey(session, identityKey)
|
||||
return queryByIdentityKeyIndex(session, identityKey.toStringShort())
|
||||
}
|
||||
|
||||
private fun queryByIdentityKeyIndex(session: Session, identityKeyIndex: String): List<NodeInfo> {
|
||||
val result = findByIdentityKeyIndex(session, identityKeyIndex)
|
||||
return result.map { it.toNodeInfo() }
|
||||
}
|
||||
|
||||
|
@ -178,6 +178,7 @@ class ArtemisMessagingTests {
|
||||
MOCK_VERSION_INFO.copy(platformVersion = platformVersion),
|
||||
server,
|
||||
identity.public,
|
||||
null,
|
||||
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
|
||||
database,
|
||||
maxMessageSize = maxMessageSize).apply {
|
||||
|
Loading…
Reference in New Issue
Block a user