Merge pull request #381 from corda/andr3ej-matthew-os-merge

Matthew OS merge
This commit is contained in:
Matthew Nesbit 2018-01-19 08:52:57 +00:00 committed by GitHub
commit 5dc15b8032
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 409 additions and 121 deletions

View File

@ -29,7 +29,7 @@ import net.corda.core.utilities.getOrThrow
import net.corda.nodeapi.ArtemisConsumer
import net.corda.nodeapi.ArtemisProducer
import net.corda.nodeapi.RPCApi
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.client.ClientMessage
@ -194,7 +194,7 @@ class RPCClientProxyHandler(
TimeUnit.MILLISECONDS
)
sessionAndProducerPool.run {
it.session.createTemporaryQueue(clientAddress, ActiveMQDefaultConfiguration.getDefaultRoutingType(), clientAddress)
it.session.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress)
}
val sessionFactory = serverLocator.createSessionFactory()
val session = sessionFactory.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)

View File

@ -158,6 +158,13 @@ R3 Corda 3.0 Developer Preview
* Peer-to-peer communications is now via AMQP 1.0 as default.
Although the legacy Artemis CORE bridging can still be used by setting the ``useAMQPBridges`` configuration property to false.
* The Artemis topics used for peer-to-peer communication have been changed to be more consistent with future cryptographic
agility and to open up the future possibility of sharing brokers between nodes. This is a breaking wire level change
as it means that nodes after this change will not be able to communicate correctly with nodes running the previous version.
Also, any pending enqueued messages in the Artemis message store will not be delivered correctly to their original target.
However, assuming a clean reset of the artemis data and that the nodes are consistent versions,
data persisted via the AMQP serializer will be forward compatible.
* Enterprise Corda only: Compatibility with SQL Server 2017 and SQL Azure databases.
* Enterprise Corda only: node configuration property ``database.schema`` and documented existing database properties.

View File

@ -46,7 +46,7 @@ Message queues
The node makes use of various queues for its operation. The more important ones are described below. Others are used
for maintenance and other minor purposes.
:``p2p.inbound``:
:``p2p.inbound.$identity``:
The node listens for messages sent from other peer nodes on this queue. Only clients who are authenticated to be
nodes on the same network are given permission to send. Messages which are routed internally are also sent to this
queue (e.g. two flows on the same node communicating with each other).
@ -54,7 +54,7 @@ for maintenance and other minor purposes.
:``internal.peers.$identity``:
These are a set of private queues only available to the node which it uses to route messages destined to other peers.
The queue name ends in the base 58 encoding of the peer's identity key. There is at most one queue per peer. The broker
creates a bridge from this queue to the peer's ``p2p.inbound`` queue, using the network map service to lookup the
creates a bridge from this queue to the peer's ``p2p.inbound.$identity`` queue, using the network map service to lookup the
peer's network address.
:``internal.services.$identity``:
@ -86,7 +86,7 @@ Clients attempting to connect to the node's broker fall in one of four groups:
#. Anyone connecting with the username ``SystemUsers/Peer`` is treated as a peer on the same Corda network as the node. Their
TLS root CA must be the same as the node's root CA - the root CA is the doorman of the network and having the same root CA
implies we've been let in by the same doorman. If they are part of the same network then they are only given permission
to send to our ``p2p.inbound`` queue, otherwise they are rejected.
to send to our ``p2p.inbound.$identity`` queue, otherwise they are rejected.
#. Every other username is treated as a RPC user and authenticated against the node's list of valid RPC users. If that
is successful then they are only given sufficient permission to perform RPC, otherwise they are rejected.

View File

@ -1,3 +1,9 @@
.. highlight:: kotlin
.. raw:: html
<script type="text/javascript" src="_static/jquery.js"></script>
<script type="text/javascript" src="_static/codesets.js"></script>
Upgrading a CorDapp (outside of platform version upgrades)
==========================================================
@ -127,17 +133,39 @@ The ``InitiatingFlow`` version number is included in the flow session handshake
the flow running on the other side. In particular, it has a ``flowVersion`` property which can be used to
programmatically evolve flows across versions. For example:
.. sourcecode:: kotlin
.. container:: codeset
@Suspendable
override fun call() {
val otherFlowVersion = otherSession.getCounterpartyFlowInfo().flowVersion
val receivedString = if (otherFlowVersion == 1) {
receive<Int>(otherParty).unwrap { it.toString() }
} else {
receive<String>(otherParty).unwrap { it }
.. sourcecode:: kotlin
@Suspendable
override fun call() {
val otherFlowVersion = otherSession.getCounterpartyFlowInfo().flowVersion
val receivedString = if (otherFlowVersion == 1) {
otherSession.receive<Int>().unwrap { it.toString() }
} else {
otherSession.receive<String>().unwrap { it }
}
}
.. sourcecode:: java
@Suspendable
@Override public Void call() throws FlowException {
int otherFlowVersion = otherSession.getCounterpartyFlowInfo().getFlowVersion();
String receivedString;
if (otherFlowVersion == 1) {
receivedString = otherSession.receive(Integer.class).unwrap(integer -> {
return integer.toString();
});
} else {
receivedString = otherSession.receive(String.class).unwrap(string -> {
return string;
});
}
return null;
}
}
This code shows a flow that in its first version expected to receive an Int, but in subsequent versions was modified to
expect a String. This flow is still able to communicate with parties that are running the older CorDapp containing
@ -147,30 +175,73 @@ How do I deal with interface changes to inlined subflows?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Here is an example of an in-lined subflow:
.. sourcecode:: kotlin
.. container:: codeset
@StartableByRPC
@InitiatingFlow
class FlowA(val recipient: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(FlowB(recipient))
.. sourcecode:: kotlin
@StartableByRPC
@InitiatingFlow
class FlowA(val recipient: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(FlowB(recipient))
}
}
}
@InitiatedBy(FlowA::class)
class FlowC(val otherSession: FlowSession) : FlowLogic() {
// Omitted.
}
// Note: No annotations. This is used as an inlined subflow.
class FlowB(val recipient: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val message = "I'm an inlined subflow, so I inherit the @InitiatingFlow's session ID and type."
initiateFlow(recipient).send(message)
@InitiatedBy(FlowA::class)
class FlowC(val otherSession: FlowSession) : FlowLogic() {
// Omitted.
}
// Note: No annotations. This is used as an inlined subflow.
class FlowB(val recipient: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val message = "I'm an inlined subflow, so I inherit the @InitiatingFlow's session ID and type."
initiateFlow(recipient).send(message)
}
}
.. sourcecode:: java
@StartableByRPC
@InitiatingFlow
class FlowA extends FlowLogic<Void> {
private final Party recipient;
public FlowA(Party recipient) {
this.recipient = recipient;
}
@Suspendable
@Override public Void call() throws FlowException {
subFlow(new FlowB(recipient));
return null;
}
}
@InitiatedBy(FlowA.class)
class FlowC extends FlowLogic<Void> {
// Omitted.
}
// Note: No annotations. This is used as an inlined subflow.
class FlowB extends FlowLogic<Void> {
private final Party recipient;
public FlowB(Party recipient) {
this.recipient = recipient;
}
@Suspendable
@Override public Void call() {
String message = "I'm an inlined subflow, so I inherit the @InitiatingFlow's session ID and type.";
initiateFlow(recipient).send(message);
return null;
}
}
}
Inlined subflows are treated as being the flow that invoked them when initiating a new flow session with a counterparty.
Suppose flow ``A`` calls inlined subflow B, which, in turn, initiates a session with a counterparty. The ``FlowLogic``
@ -361,34 +432,138 @@ for details.
For backwards compatible changes such as adding columns, the procedure for upgrading a state schema is to extend the
existing object relational mapper. For example, we can update:
.. sourcecode:: kotlin
.. container:: codeset
object ObligationSchemaV1 : MappedSchema(Obligation::class.java, 1, listOf(ObligationEntity::class.java)) {
@Entity @Table(name = "obligations")
class ObligationEntity(obligation: Obligation) : PersistentState() {
@Column var currency: String = obligation.amount.token.toString()
@Column var amount: Long = obligation.amount.quantity
@Column @Lob var lender: ByteArray = obligation.lender.owningKey.encoded
@Column @Lob var borrower: ByteArray = obligation.borrower.owningKey.encoded
@Column var linear_id: String = obligation.linearId.id.toString()
.. sourcecode:: kotlin
object ObligationSchemaV1 : MappedSchema(Obligation::class.java, 1, listOf(ObligationEntity::class.java)) {
@Entity @Table(name = "obligations")
class ObligationEntity(obligation: Obligation) : PersistentState() {
@Column var currency: String = obligation.amount.token.toString()
@Column var amount: Long = obligation.amount.quantity
@Column @Lob var lender: ByteArray = obligation.lender.owningKey.encoded
@Column @Lob var borrower: ByteArray = obligation.borrower.owningKey.encoded
@Column var linear_id: String = obligation.linearId.id.toString()
}
}
.. sourcecode:: java
public class ObligationSchemaV1 extends MappedSchema {
public IOUSchemaV1() {
super(Obligation.class, 1, ImmutableList.of(ObligationEntity.class));
}
@Entity
@Table(name = "obligations")
public static class ObligationEntity extends PersistentState {
@Column(name = "currency") private final String currency;
@Column(name = "amount") private final Long amount;
@Column(name = "lender") @Lob private final Byte[] lender;
@Column(name = "borrower") @Lob private final Byte[] borrower;
@Column(name = "linear_id") private final UUID linearId;
public ObligationEntity(String currency, Long amount, Byte[] lender, Byte[] borrower, UUID linearId) {
this.currency = currency;
this.amount = amount;
this.lender = lender;
this.borrower = borrower;
this.linearId = linearId;
}
public String getCurrency() {
return currency;
}
public Long getAmount() {
return amount;
}
public ByteArray getLender() {
return lender;
}
public ByteArray getBorrower() {
return borrower;
}
public UUID getId() {
return linearId;
}
}
}
}
To:
.. sourcecode:: kotlin
.. container:: codeset
object ObligationSchemaV1 : MappedSchema(Obligation::class.java, 1, listOf(ObligationEntity::class.java)) {
@Entity @Table(name = "obligations")
class ObligationEntity(obligation: Obligation) : PersistentState() {
@Column var currency: String = obligation.amount.token.toString()
@Column var amount: Long = obligation.amount.quantity
@Column @Lob var lender: ByteArray = obligation.lender.owningKey.encoded
@Column @Lob var borrower: ByteArray = obligation.borrower.owningKey.encoded
@Column var linear_id: String = obligation.linearId.id.toString()
@Column var defaulted: Bool = obligation.amount.inDefault // NEW COLUNM!
.. sourcecode:: kotlin
object ObligationSchemaV1 : MappedSchema(Obligation::class.java, 1, listOf(ObligationEntity::class.java)) {
@Entity @Table(name = "obligations")
class ObligationEntity(obligation: Obligation) : PersistentState() {
@Column var currency: String = obligation.amount.token.toString()
@Column var amount: Long = obligation.amount.quantity
@Column @Lob var lender: ByteArray = obligation.lender.owningKey.encoded
@Column @Lob var borrower: ByteArray = obligation.borrower.owningKey.encoded
@Column var linear_id: String = obligation.linearId.id.toString()
@Column var defaulted: Bool = obligation.amount.inDefault // NEW COLUMN!
}
}
.. sourcecode:: java
public class ObligationSchemaV1 extends MappedSchema {
public IOUSchemaV1() {
super(Obligation.class, 1, ImmutableList.of(ObligationEntity.class));
}
@Entity
@Table(name = "obligations")
public static class ObligationEntity extends PersistentState {
@Column(name = "currency") private final String currency;
@Column(name = "amount") private final Long amount;
@Column(name = "lender") @Lob private final Byte[] lender;
@Column(name = "borrower") @Lob private final Byte[] borrower;
@Column(name = "linear_id") private final UUID linearId;
@Column(name = "defaulted") private final Boolean defaulted; // NEW COLUMN!
public ObligationEntity(String currency, Long amount, Byte[] lender, Byte[] borrower, UUID linearId, Boolean defaulted) {
this.currency = currency;
this.amount = amount;
this.lender = lender;
this.borrower = borrower;
this.linearId = linearId;
this.defaulted = defaulted;
}
public String getCurrency() {
return currency;
}
public Long getAmount() {
return amount;
}
public ByteArray getLender() {
return lender;
}
public ByteArray getBorrower() {
return borrower;
}
public UUID getId() {
return linearId;
}
public Boolean isDefaulted() {
return defaulted;
}
}
}
}
Thus adding a new column with a default value.
@ -397,21 +572,43 @@ used, as changes to the state are required. To make a backwards-incompatible cha
because a property was removed from a state object), the procedure is to define another object relational mapper, then
add it to the ``supportedSchemas`` property of your ``QueryableState``, like so:
.. sourcecode:: kotlin
.. container:: codeset
override fun supportedSchemas(): Iterable<MappedSchema> = listOf(ExampleSchemaV1, ExampleSchemaV2)
.. sourcecode:: kotlin
override fun supportedSchemas(): Iterable<MappedSchema> = listOf(ExampleSchemaV1, ExampleSchemaV2)
.. sourcecode:: java
@Override public Iterable<MappedSchema> supportedSchemas() {
return ImmutableList.of(new ExampleSchemaV1(), new ExampleSchemaV2());
}
Then, in ``generateMappedObject``, add support for the new schema:
.. sourcecode:: kotlin
.. container:: codeset
override fun generateMappedObject(schema: MappedSchema): PersistentState {
return when (schema) {
is DummyLinearStateSchemaV1 -> // Omitted.
is DummyLinearStateSchemaV2 -> // Omitted.
else -> throw IllegalArgumentException("Unrecognised schema $schema")
.. sourcecode:: kotlin
override fun generateMappedObject(schema: MappedSchema): PersistentState {
return when (schema) {
is DummyLinearStateSchemaV1 -> // Omitted.
is DummyLinearStateSchemaV2 -> // Omitted.
else -> throw IllegalArgumentException("Unrecognised schema $schema")
}
}
.. sourcecode:: java
@Override public PersistentState generateMappedObject(MappedSchema schema) {
if (schema instanceof DummyLinearStateSchemaV1) {
// Omitted.
} else if (schema instanceof DummyLinearStateSchemaV2) {
// Omitted.
} else {
throw new IllegalArgumentException("Unrecognised schema $schema");
}
}
}
With this approach, whenever the state object is stored in the vault, a representation of it will be stored in two
separate database tables where possible - one for each supported schema.

View File

@ -1,11 +1,12 @@
package net.corda.nodeapi.internal
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.Party
import net.corda.core.messaging.MessageRecipientGroup
import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.toBase58String
import java.security.PublicKey
/**
@ -23,7 +24,7 @@ class ArtemisMessagingComponent {
const val PEER_USER = "SystemUsers/Peer"
const val INTERNAL_PREFIX = "internal."
const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers." //TODO Come up with better name for common peers/services queue
const val P2P_QUEUE = "p2p.inbound"
const val P2P_PREFIX = "p2p.inbound."
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
}
@ -49,7 +50,7 @@ class ArtemisMessagingComponent {
@CordaSerializable
data class NodeAddress(override val queueName: String, override val hostAndPort: NetworkHostAndPort) : ArtemisPeerAddress {
constructor(peerIdentity: PublicKey, hostAndPort: NetworkHostAndPort) :
this("$PEERS_PREFIX${peerIdentity.toBase58String()}", hostAndPort)
this("$PEERS_PREFIX${peerIdentity.toStringShort()}", hostAndPort)
}
/**
@ -62,6 +63,30 @@ class ArtemisMessagingComponent {
* @param identity The service identity's owning key.
*/
data class ServiceAddress(val identity: PublicKey) : ArtemisAddress, MessageRecipientGroup {
override val queueName: String = "$PEERS_PREFIX${identity.toBase58String()}"
override val queueName: String = "$PEERS_PREFIX${identity.toStringShort()}"
}
/**
* [RemoteInboxAddress] implements [SingleMessageRecipient]. It represents the non-local address of a remote inbox.
* @param identity The Node public identity
*/
data class RemoteInboxAddress(val identity: PublicKey) : ArtemisAddress, SingleMessageRecipient {
constructor(party: Party) : this(party.owningKey)
companion object {
/**
* When transferring a message from the local holding queue to the remote inbox queue
* this method provides a simple translation of the address string.
* The topics are distinct so that proper segregation of internal
* and external access permissions can be made.
*/
fun translateLocalQueueToInboxAddress(address: String): String {
require(address.startsWith(PEERS_PREFIX)) { "Failed to map address: $address to a remote topic as it is not in the $PEERS_PREFIX namespace" }
return P2P_PREFIX + address.substring(PEERS_PREFIX.length)
}
}
override val queueName: String = "$P2P_PREFIX${identity.toStringShort()}"
}
}

View File

@ -3,18 +3,18 @@ package net.corda.node.amqp
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.crypto.toStringShort
import net.corda.core.internal.div
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.toBase58String
import net.corda.node.internal.protonwrapper.netty.AMQPServer
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.*
import net.corda.node.services.messaging.ArtemisMessagingClient
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.crypto.loadKeyStore
import net.corda.testing.*
import net.corda.testing.internal.rigorousMock
@ -53,7 +53,7 @@ class AMQPBridgeTest {
@Test
fun `test acked and nacked messages`() {
// Create local queue
val sourceQueueName = "internal.peers." + BOB.publicKey.toBase58String()
val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort()
val (artemisServer, artemisClient) = createArtemis(sourceQueueName)
// Pre-populate local queue with 3 messages
@ -133,11 +133,13 @@ class AMQPBridgeTest {
@Test
fun `Test legacy bridge still works`() {
// Create local queue
val sourceQueueName = "internal.peers." + ALICE.publicKey.toBase58String()
val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort()
val (artemisLegacyServer, artemisLegacyClient) = createLegacyArtemis(sourceQueueName)
val (artemisServer, artemisClient) = createArtemis(null)
val inbox = ArtemisMessagingComponent.RemoteInboxAddress(BOB.party).queueName
artemisClient.started!!.session.createQueue(inbox, RoutingType.ANYCAST, inbox, true)
val artemis = artemisLegacyClient.started!!
for (i in 0 until 3) {
@ -150,8 +152,7 @@ class AMQPBridgeTest {
artemis.producer.send(sourceQueueName, artemisMessage)
}
val subs = artemisClient.started!!.session.createConsumer(P2P_QUEUE)
val subs = artemisClient.started!!.session.createConsumer(inbox)
for (i in 0 until 3) {
val msg = subs.receive()
val messageBody = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) }
@ -177,9 +178,9 @@ class AMQPBridgeTest {
doReturn(true).whenever(it).useAMQPBridges
}
artemisConfig.configureWithDevSSLCertificate()
val networkMap = rigorousMock<NetworkMapCache>().also {
val networkMap = rigorousMock<NetworkMapCacheInternal>().also {
doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed
doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByLegalIdentityKey(any())
doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any())
}
val userService = rigorousMock<RPCSecurityManager>()
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, null, networkMap, userService, MAX_MESSAGE_SIZE)
@ -189,7 +190,7 @@ class AMQPBridgeTest {
val artemis = artemisClient.started!!
if (sourceQueueName != null) {
// Local queue for outgoing messages
artemis.session.createQueue(sourceQueueName, RoutingType.MULTICAST, sourceQueueName, true)
artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true)
}
return Pair(artemisServer, artemisClient)
}
@ -206,9 +207,9 @@ class AMQPBridgeTest {
doReturn(ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0))).whenever(it).activeMQServer
}
artemisConfig.configureWithDevSSLCertificate()
val networkMap = rigorousMock<NetworkMapCache>().also {
val networkMap = rigorousMock<NetworkMapCacheInternal>().also {
doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed
doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByLegalIdentityKey(any())
doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any())
}
val userService = rigorousMock<RPCSecurityManager>()
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort2, null, networkMap, userService, MAX_MESSAGE_SIZE)
@ -217,7 +218,7 @@ class AMQPBridgeTest {
artemisClient.start()
val artemis = artemisClient.started!!
// Local queue for outgoing messages
artemis.session.createQueue(sourceQueueName, RoutingType.MULTICAST, sourceQueueName, true)
artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true)
return Pair(artemisServer, artemisClient)
}

View File

@ -13,11 +13,13 @@ import net.corda.node.internal.protonwrapper.messages.MessageStatus
import net.corda.node.internal.protonwrapper.netty.AMQPClient
import net.corda.node.internal.protonwrapper.netty.AMQPServer
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.ArtemisMessagingClient
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.crypto.loadKeyStore
import net.corda.testing.*
@ -48,7 +50,7 @@ class ProtonWrapperTests {
amqpServer.start()
val receiveSubs = amqpServer.onReceive.subscribe {
assertEquals(BOB_NAME.toString(), it.sourceLegalName)
assertEquals("p2p.inbound", it.topic)
assertEquals(P2P_PREFIX + "Test", it.topic)
assertEquals("Test", String(it.payload))
it.complete(true)
}
@ -64,7 +66,7 @@ class ProtonWrapperTests {
assertEquals(true, clientConnect.connected)
assertEquals(ALICE_NAME, CordaX500Name.build(clientConnect.remoteCert!!.subjectX500Principal))
val msg = amqpClient.createMessage("Test".toByteArray(),
"p2p.inbound",
P2P_PREFIX + "Test",
ALICE_NAME.toString(),
emptyMap())
amqpClient.write(msg)
@ -151,8 +153,8 @@ class ProtonWrapperTests {
assertEquals(true, clientConnected.get().connected)
assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal))
val artemis = artemisClient.started!!
val sendAddress = "p2p.inbound"
artemis.session.createQueue(sendAddress, RoutingType.MULTICAST, "queue", true)
val sendAddress = P2P_PREFIX + "Test"
artemis.session.createQueue(sendAddress, RoutingType.ANYCAST, "queue", true)
val consumer = artemis.session.createConsumer("queue")
val testData = "Test".toByteArray()
val testProperty = mutableMapOf<Any?, Any?>()
@ -230,7 +232,7 @@ class ProtonWrapperTests {
}
artemisConfig.configureWithDevSSLCertificate()
val networkMap = rigorousMock<NetworkMapCache>().also {
val networkMap = rigorousMock<NetworkMapCacheInternal>().also {
doReturn(never<NetworkMapCache.MapChange>()).whenever(it).changed
}
val userService = rigorousMock<RPCSecurityManager>()

View File

@ -5,6 +5,7 @@ import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCConnection
import net.corda.core.crypto.generateKeyPair
import net.corda.core.crypto.random63BitValue
import net.corda.core.crypto.toStringShort
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
@ -14,14 +15,13 @@ import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.toBase58String
import net.corda.core.utilities.unwrap
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.config.SSLConfiguration
import net.corda.testing.ALICE_NAME
@ -79,30 +79,30 @@ abstract class MQSecurityTest : NodeBasedTest() {
@Test
fun `consume message from P2P queue`() {
assertConsumeAttackFails(P2P_QUEUE)
assertConsumeAttackFails("$P2P_PREFIX${alice.info.chooseIdentity().owningKey.toStringShort()}")
}
@Test
fun `consume message from peer queue`() {
val bobParty = startBobAndCommunicateWithAlice()
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}")
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
}
@Test
fun `send message to address of peer which has been communicated with`() {
val bobParty = startBobAndCommunicateWithAlice()
assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}")
assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
}
@Test
fun `create queue for peer which has not been communicated with`() {
val bob = startNode(BOB_NAME)
assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.chooseIdentity().owningKey.toBase58String()}")
assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.chooseIdentity().owningKey.toStringShort()}")
}
@Test
fun `create queue for unknown peer`() {
val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.toBase58String()}"
val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.toStringShort()}"
assertAllQueueCreationAttacksFail(invalidPeerQueue)
}

View File

@ -40,6 +40,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import rx.Scheduler
import rx.schedulers.Schedulers
import java.security.PublicKey
import java.time.Clock
import java.util.concurrent.atomic.AtomicInteger
import javax.management.ObjectName
@ -166,11 +167,14 @@ open class Node(configuration: NodeConfiguration,
VerifierType.OutOfProcess -> VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, networkParameters.maxMessageSize)
VerifierType.InMemory -> null
}
require(info.legalIdentities.size in 1..2) { "Currently nodes must have a primary address and optionally one serviced address" }
val serviceIdentity: PublicKey? = if (info.legalIdentities.size == 1) null else info.legalIdentities[1].owningKey
return P2PMessagingClient(
configuration,
versionInfo,
serverAddress,
info.legalIdentities[0].owningKey,
serviceIdentity,
serverThread,
database,
advertisedAddress,

View File

@ -29,6 +29,10 @@ interface NetworkMapCacheBaseInternal : NetworkMapCacheBase {
fun getNodeByHash(nodeHash: SecureHash): NodeInfo?
/** Find nodes from the [PublicKey] toShortString representation.
* This is used for Artemis bridge lookup process. */
fun getNodesByOwningKeyIndex(identityKeyIndex: String): List<NodeInfo>
/** Adds a node to the local cache (generally only used for adding ourselves). */
fun addNode(node: NodeInfo)

View File

@ -12,8 +12,8 @@ import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.AMQPBridgeManager.AMQPBridge.Companion.getBridgeName
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
import net.corda.nodeapi.internal.crypto.loadKeyStore
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
@ -132,7 +132,8 @@ internal class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress:
properties[key.toString()] = value
}
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
val sendableMessage = amqpClient.createMessage(data, P2P_QUEUE,
val peerInbox = translateLocalQueueToInboxAddress(queueName)
val sendableMessage = amqpClient.createMessage(data, peerInbox,
legalNames.first().toString(),
properties)
sendableMessage.onComplete.then {

View File

@ -8,16 +8,15 @@ import net.corda.core.internal.div
import net.corda.core.internal.noneOrSingle
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.parsePublicKeyBase58
import net.corda.node.internal.Node
import net.corda.node.internal.security.Password
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE
@ -31,7 +30,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisPeerAddress
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
@ -94,7 +93,7 @@ import javax.security.cert.CertificateException
class ArtemisMessagingServer(private val config: NodeConfiguration,
private val p2pPort: Int,
val rpcPort: Int?,
val networkMapCache: NetworkMapCache,
val networkMapCache: NetworkMapCacheInternal,
val securityManager: RPCSecurityManager,
val maxMessageSize: Int) : SingletonSerializeAsToken() {
companion object {
@ -191,7 +190,6 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
// by having its password be an unknown securely random 128-bit value.
clusterPassword = BigInteger(128, newSecureRandom()).toString(16)
queueConfigurations = listOf(
queueConfig(P2P_QUEUE, durable = true),
// Create an RPC queue: this will service locally connected clients only (not via a bridge) and those
// clients must have authenticated. We could use a single consumer for everything and perhaps we should,
// but these queues are not worth persisting.
@ -243,7 +241,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
private fun ConfigurationImpl.configureAddressSecurity(): Pair<Configuration, LoginListener> {
val nodeInternalRole = Role(NODE_ROLE, true, true, true, true, true, true, true, true)
securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node
securityRoles[P2P_QUEUE] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true))
securityRoles["$P2P_PREFIX#"] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true))
securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true))
// Each RPC user must have its own role and its own queue. This prevents users accessing each other's queues
// and stealing RPC responses.
@ -309,8 +307,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
if (queueName.startsWith(PEERS_PREFIX)) {
try {
val identity = parsePublicKeyBase58(queueName.substring(PEERS_PREFIX.length))
val nodeInfos = networkMapCache.getNodesByLegalIdentityKey(identity)
val nodeInfos = networkMapCache.getNodesByOwningKeyIndex(queueName.substring(PEERS_PREFIX.length))
if (nodeInfos.isNotEmpty()) {
nodeInfos.forEach { deployBridgeToPeer(it) }
} else {

View File

@ -10,15 +10,17 @@ import net.corda.node.services.config.NodeConfiguration
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
import net.corda.nodeapi.internal.crypto.X509Utilities
import org.apache.activemq.artemis.api.core.Message
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.cluster.Transformer
import org.apache.activemq.artemis.spi.core.remoting.*
import org.apache.activemq.artemis.utils.ConfigurationHelper
import java.time.Duration
@ -46,7 +48,7 @@ internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServ
/**
* All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving
* All nodes are expected to have a public facing address called p2p.inbound.$identity for receiving
* messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it,
* as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
* P2P address.
@ -64,7 +66,6 @@ internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServ
activeMQServer.deployBridge(BridgeConfiguration().apply {
name = getBridgeName(queueName, target)
this.queueName = queueName
forwardingAddress = P2P_QUEUE
staticConnectors = listOf(target.toString())
confirmationWindowSize = 100000 // a guess
isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic
@ -77,6 +78,7 @@ internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServ
// our TLS certificate.
user = PEER_USER
password = PEER_USER
transformerClassName = InboxTopicTransformer::class.java.name
})
}
@ -99,6 +101,13 @@ internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServ
}
}
class InboxTopicTransformer : Transformer {
override fun transform(message: Message): Message {
message.address = translateLocalQueueToInboxAddress(message.address)
return message
}
}
class VerifyingNettyConnectorFactory : NettyConnectorFactory() {
override fun createConnector(configuration: MutableMap<String, Any>,
handler: BufferHandler?,

View File

@ -19,7 +19,6 @@ import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
@ -54,21 +53,28 @@ import javax.persistence.Lob
* invoke methods on the provided implementation. There is more documentation on this in the docsite and the
* CordaRPCClient class.
*
* @param serverAddress The address of the broker instance to connect to (might be running in the same process).
* @param myIdentity The public key to be used as the ArtemisMQ address and queue name for the node.
* @param nodeExecutor An executor to run received message tasks upon.
* @param advertisedAddress The node address for inbound connections, advertised to the network map service and peers.
* If not provided, will default to [serverAddress].
* @param config The configuration of the node, which is used for controlling the message redelivery options.
* @param versionInfo All messages from the node carry the version info and received messages are checked against this for compatibility.
* @param serverAddress The host and port of the Artemis broker.
* @param myIdentity The primary identity of the node, which defines the messaging address for externally received messages.
* It is also used to construct the myAddress field, which is ultimately advertised in the network map.
* @param serviceIdentity An optional second identity if the node is also part of a group address, for example a notary.
* @param nodeExecutor The received messages are marshalled onto the server executor to prevent Netty buffers leaking during fiber suspends.
* @param database The nodes database, which is used to deduplicate messages.
* @param advertisedAddress The externally advertised version of the Artemis broker address used to construct myAddress and included
* in the network map data.
* @param maxMessageSize A bound applied to the message size.
*/
@ThreadSafe
class P2PMessagingClient(config: NodeConfiguration,
private val versionInfo: VersionInfo,
serverAddress: NetworkHostAndPort,
private val myIdentity: PublicKey,
private val serviceIdentity: PublicKey?,
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
private val database: CordaPersistence,
advertisedAddress: NetworkHostAndPort = serverAddress,
private val maxMessageSize: Int
maxMessageSize: Int
) : SingletonSerializeAsToken(), MessagingService {
companion object {
private val log = contextLogger()
@ -126,6 +132,7 @@ class P2PMessagingClient(config: NodeConfiguration,
private class InnerState {
var running = false
var p2pConsumer: ClientConsumer? = null
var serviceConsumer: ClientConsumer? = null
}
private val messagesToRedeliver = database.transaction {
@ -181,8 +188,20 @@ class P2PMessagingClient(config: NodeConfiguration,
fun start() {
state.locked {
val session = artemis.start().session
val inbox = RemoteInboxAddress(myIdentity).queueName
// Create a queue, consumer and producer for handling P2P network messages.
p2pConsumer = session.createConsumer(P2P_QUEUE)
createQueueIfAbsent(inbox)
p2pConsumer = session.createConsumer(inbox)
if (serviceIdentity != null) {
val serviceAddress = RemoteInboxAddress(serviceIdentity).queueName
createQueueIfAbsent(serviceAddress)
val serviceHandler = session.createConsumer(serviceAddress)
serviceHandler.setMessageHandler { msg ->
val message: ReceivedMessage? = artemisToCordaMessage(msg)
if (message != null)
deliver(msg, message)
}
}
}
resumeMessageRedelivery()
@ -331,6 +350,13 @@ class P2PMessagingClient(config: NodeConfiguration,
// Ignore it: this can happen if the server has gone away before we do.
}
p2pConsumer = null
val s = serviceConsumer
try {
s?.close()
} catch (e: ActiveMQObjectClosedException) {
// Ignore it: this can happen if the server has gone away before we do.
}
serviceConsumer = null
prevRunning
}
if (running && !nodeExecutor.isOnThread) {
@ -430,7 +456,7 @@ class P2PMessagingClient(config: NodeConfiguration,
private fun getMQAddress(target: MessageRecipients): String {
return if (target == myAddress) {
// If we are sending to ourselves then route the message directly to our P2P queue.
P2P_QUEUE
RemoteInboxAddress(myIdentity).queueName
} else {
// Otherwise we send the message to an internal queue for the target residing on our broker. It's then the
// broker's job to route the message to the target's P2P queue.
@ -447,7 +473,7 @@ class P2PMessagingClient(config: NodeConfiguration,
val queueQuery = session.queueQuery(SimpleString(queueName))
if (!queueQuery.isExists) {
log.info("Create fresh queue $queueName bound on same address")
session.createQueue(queueName, RoutingType.MULTICAST, queueName, true)
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
}
}
}

View File

@ -7,15 +7,15 @@ import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService
import net.corda.node.utilities.*
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.VerifierApi
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
import net.corda.nodeapi.internal.config.SSLConfiguration
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import java.util.concurrent.*
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import java.util.concurrent.TimeUnit
class VerifierMessagingClient(config: SSLConfiguration, serverAddress: NetworkHostAndPort, metrics: MetricRegistry, private val maxMessageSize: Int) : SingletonSerializeAsToken() {
companion object {
@ -40,7 +40,7 @@ class VerifierMessagingClient(config: SSLConfiguration, serverAddress: NetworkHo
val queueQuery = session.queueQuery(SimpleString(queueName))
if (!queueQuery.isExists) {
log.info("Create fresh queue $queueName bound on same address")
session.createQueue(queueName, RoutingType.MULTICAST, queueName, true)
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
}
}
createQueueIfAbsent(VERIFICATION_REQUESTS_QUEUE_NAME)

View File

@ -160,6 +160,12 @@ open class PersistentNetworkMapCache(
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024, 8, { key -> database.transaction { queryByIdentityKey(session, key) } })
override fun getNodesByOwningKeyIndex(identityKeyIndex: String): List<NodeInfo> {
return database.transaction {
queryByIdentityKeyIndex(session, identityKeyIndex)
}
}
override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = database.transaction { queryByAddress(session, address) }
override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = identityByLegalNameCache.get(name).orElse(null)
@ -245,15 +251,23 @@ open class PersistentNetworkMapCache(
}
private fun findByIdentityKey(session: Session, identityKey: PublicKey): List<NodeInfoSchemaV1.PersistentNodeInfo> {
return findByIdentityKeyIndex(session, identityKey.toStringShort())
}
private fun findByIdentityKeyIndex(session: Session, identityKeyIndex: String): List<NodeInfoSchemaV1.PersistentNodeInfo> {
val query = session.createQuery(
"SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.owningKeyHash = :owningKeyHash",
NodeInfoSchemaV1.PersistentNodeInfo::class.java)
query.setParameter("owningKeyHash", identityKey.toStringShort())
query.setParameter("owningKeyHash", identityKeyIndex)
return query.resultList
}
private fun queryByIdentityKey(session: Session, identityKey: PublicKey): List<NodeInfo> {
val result = findByIdentityKey(session, identityKey)
return queryByIdentityKeyIndex(session, identityKey.toStringShort())
}
private fun queryByIdentityKeyIndex(session: Session, identityKeyIndex: String): List<NodeInfo> {
val result = findByIdentityKeyIndex(session, identityKeyIndex)
return result.map { it.toNodeInfo() }
}

View File

@ -178,6 +178,7 @@ class ArtemisMessagingTests {
MOCK_VERSION_INFO.copy(platformVersion = platformVersion),
server,
identity.public,
null,
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
database,
maxMessageSize = maxMessageSize).apply {