diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 9de4fd6afb..813569dc89 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -29,7 +29,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.nodeapi.ArtemisConsumer import net.corda.nodeapi.ArtemisProducer import net.corda.nodeapi.RPCApi -import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration +import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE import org.apache.activemq.artemis.api.core.client.ClientMessage @@ -194,7 +194,7 @@ class RPCClientProxyHandler( TimeUnit.MILLISECONDS ) sessionAndProducerPool.run { - it.session.createTemporaryQueue(clientAddress, ActiveMQDefaultConfiguration.getDefaultRoutingType(), clientAddress) + it.session.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress) } val sessionFactory = serverLocator.createSessionFactory() val session = sessionFactory.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 08d5020fe7..29970a8b30 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -158,6 +158,13 @@ R3 Corda 3.0 Developer Preview * Peer-to-peer communications is now via AMQP 1.0 as default. Although the legacy Artemis CORE bridging can still be used by setting the ``useAMQPBridges`` configuration property to false. +* The Artemis topics used for peer-to-peer communication have been changed to be more consistent with future cryptographic + agility and to open up the future possibility of sharing brokers between nodes. This is a breaking wire level change + as it means that nodes after this change will not be able to communicate correctly with nodes running the previous version. + Also, any pending enqueued messages in the Artemis message store will not be delivered correctly to their original target. + However, assuming a clean reset of the artemis data and that the nodes are consistent versions, + data persisted via the AMQP serializer will be forward compatible. + * Enterprise Corda only: Compatibility with SQL Server 2017 and SQL Azure databases. * Enterprise Corda only: node configuration property ``database.schema`` and documented existing database properties. diff --git a/docs/source/messaging.rst b/docs/source/messaging.rst index 55f48418d3..6dd5b33cc2 100644 --- a/docs/source/messaging.rst +++ b/docs/source/messaging.rst @@ -46,7 +46,7 @@ Message queues The node makes use of various queues for its operation. The more important ones are described below. Others are used for maintenance and other minor purposes. -:``p2p.inbound``: +:``p2p.inbound.$identity``: The node listens for messages sent from other peer nodes on this queue. Only clients who are authenticated to be nodes on the same network are given permission to send. Messages which are routed internally are also sent to this queue (e.g. two flows on the same node communicating with each other). @@ -54,7 +54,7 @@ for maintenance and other minor purposes. :``internal.peers.$identity``: These are a set of private queues only available to the node which it uses to route messages destined to other peers. The queue name ends in the base 58 encoding of the peer's identity key. There is at most one queue per peer. The broker - creates a bridge from this queue to the peer's ``p2p.inbound`` queue, using the network map service to lookup the + creates a bridge from this queue to the peer's ``p2p.inbound.$identity`` queue, using the network map service to lookup the peer's network address. :``internal.services.$identity``: @@ -86,7 +86,7 @@ Clients attempting to connect to the node's broker fall in one of four groups: #. Anyone connecting with the username ``SystemUsers/Peer`` is treated as a peer on the same Corda network as the node. Their TLS root CA must be the same as the node's root CA - the root CA is the doorman of the network and having the same root CA implies we've been let in by the same doorman. If they are part of the same network then they are only given permission - to send to our ``p2p.inbound`` queue, otherwise they are rejected. + to send to our ``p2p.inbound.$identity`` queue, otherwise they are rejected. #. Every other username is treated as a RPC user and authenticated against the node's list of valid RPC users. If that is successful then they are only given sufficient permission to perform RPC, otherwise they are rejected. diff --git a/docs/source/upgrading-cordapps.rst b/docs/source/upgrading-cordapps.rst index f7f8a6b038..893884bb78 100644 --- a/docs/source/upgrading-cordapps.rst +++ b/docs/source/upgrading-cordapps.rst @@ -1,3 +1,9 @@ +.. highlight:: kotlin +.. raw:: html + + <script type="text/javascript" src="_static/jquery.js"></script> + <script type="text/javascript" src="_static/codesets.js"></script> + Upgrading a CorDapp (outside of platform version upgrades) ========================================================== @@ -127,17 +133,39 @@ The ``InitiatingFlow`` version number is included in the flow session handshake the flow running on the other side. In particular, it has a ``flowVersion`` property which can be used to programmatically evolve flows across versions. For example: -.. sourcecode:: kotlin +.. container:: codeset - @Suspendable - override fun call() { - val otherFlowVersion = otherSession.getCounterpartyFlowInfo().flowVersion - val receivedString = if (otherFlowVersion == 1) { - receive<Int>(otherParty).unwrap { it.toString() } - } else { - receive<String>(otherParty).unwrap { it } + .. sourcecode:: kotlin + + @Suspendable + override fun call() { + val otherFlowVersion = otherSession.getCounterpartyFlowInfo().flowVersion + val receivedString = if (otherFlowVersion == 1) { + otherSession.receive<Int>().unwrap { it.toString() } + } else { + otherSession.receive<String>().unwrap { it } + } + } + + .. sourcecode:: java + + @Suspendable + @Override public Void call() throws FlowException { + int otherFlowVersion = otherSession.getCounterpartyFlowInfo().getFlowVersion(); + String receivedString; + + if (otherFlowVersion == 1) { + receivedString = otherSession.receive(Integer.class).unwrap(integer -> { + return integer.toString(); + }); + } else { + receivedString = otherSession.receive(String.class).unwrap(string -> { + return string; + }); + } + + return null; } - } This code shows a flow that in its first version expected to receive an Int, but in subsequent versions was modified to expect a String. This flow is still able to communicate with parties that are running the older CorDapp containing @@ -147,30 +175,73 @@ How do I deal with interface changes to inlined subflows? ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Here is an example of an in-lined subflow: -.. sourcecode:: kotlin +.. container:: codeset - @StartableByRPC - @InitiatingFlow - class FlowA(val recipient: Party) : FlowLogic<Unit>() { - @Suspendable - override fun call() { - subFlow(FlowB(recipient)) + .. sourcecode:: kotlin + + @StartableByRPC + @InitiatingFlow + class FlowA(val recipient: Party) : FlowLogic<Unit>() { + @Suspendable + override fun call() { + subFlow(FlowB(recipient)) + } } - } - @InitiatedBy(FlowA::class) - class FlowC(val otherSession: FlowSession) : FlowLogic() { - // Omitted. - } - - // Note: No annotations. This is used as an inlined subflow. - class FlowB(val recipient: Party) : FlowLogic<Unit>() { - @Suspendable - override fun call() { - val message = "I'm an inlined subflow, so I inherit the @InitiatingFlow's session ID and type." - initiateFlow(recipient).send(message) + @InitiatedBy(FlowA::class) + class FlowC(val otherSession: FlowSession) : FlowLogic() { + // Omitted. + } + + // Note: No annotations. This is used as an inlined subflow. + class FlowB(val recipient: Party) : FlowLogic<Unit>() { + @Suspendable + override fun call() { + val message = "I'm an inlined subflow, so I inherit the @InitiatingFlow's session ID and type." + initiateFlow(recipient).send(message) + } + } + + .. sourcecode:: java + + @StartableByRPC + @InitiatingFlow + class FlowA extends FlowLogic<Void> { + private final Party recipient; + + public FlowA(Party recipient) { + this.recipient = recipient; + } + + @Suspendable + @Override public Void call() throws FlowException { + subFlow(new FlowB(recipient)); + + return null; + } + } + + @InitiatedBy(FlowA.class) + class FlowC extends FlowLogic<Void> { + // Omitted. + } + + // Note: No annotations. This is used as an inlined subflow. + class FlowB extends FlowLogic<Void> { + private final Party recipient; + + public FlowB(Party recipient) { + this.recipient = recipient; + } + + @Suspendable + @Override public Void call() { + String message = "I'm an inlined subflow, so I inherit the @InitiatingFlow's session ID and type."; + initiateFlow(recipient).send(message); + + return null; + } } - } Inlined subflows are treated as being the flow that invoked them when initiating a new flow session with a counterparty. Suppose flow ``A`` calls inlined subflow B, which, in turn, initiates a session with a counterparty. The ``FlowLogic`` @@ -361,34 +432,138 @@ for details. For backwards compatible changes such as adding columns, the procedure for upgrading a state schema is to extend the existing object relational mapper. For example, we can update: -.. sourcecode:: kotlin +.. container:: codeset - object ObligationSchemaV1 : MappedSchema(Obligation::class.java, 1, listOf(ObligationEntity::class.java)) { - @Entity @Table(name = "obligations") - class ObligationEntity(obligation: Obligation) : PersistentState() { - @Column var currency: String = obligation.amount.token.toString() - @Column var amount: Long = obligation.amount.quantity - @Column @Lob var lender: ByteArray = obligation.lender.owningKey.encoded - @Column @Lob var borrower: ByteArray = obligation.borrower.owningKey.encoded - @Column var linear_id: String = obligation.linearId.id.toString() + .. sourcecode:: kotlin + + object ObligationSchemaV1 : MappedSchema(Obligation::class.java, 1, listOf(ObligationEntity::class.java)) { + @Entity @Table(name = "obligations") + class ObligationEntity(obligation: Obligation) : PersistentState() { + @Column var currency: String = obligation.amount.token.toString() + @Column var amount: Long = obligation.amount.quantity + @Column @Lob var lender: ByteArray = obligation.lender.owningKey.encoded + @Column @Lob var borrower: ByteArray = obligation.borrower.owningKey.encoded + @Column var linear_id: String = obligation.linearId.id.toString() + } + } + + .. sourcecode:: java + + public class ObligationSchemaV1 extends MappedSchema { + public IOUSchemaV1() { + super(Obligation.class, 1, ImmutableList.of(ObligationEntity.class)); + } + + @Entity + @Table(name = "obligations") + public static class ObligationEntity extends PersistentState { + @Column(name = "currency") private final String currency; + @Column(name = "amount") private final Long amount; + @Column(name = "lender") @Lob private final Byte[] lender; + @Column(name = "borrower") @Lob private final Byte[] borrower; + @Column(name = "linear_id") private final UUID linearId; + + + public ObligationEntity(String currency, Long amount, Byte[] lender, Byte[] borrower, UUID linearId) { + this.currency = currency; + this.amount = amount; + this.lender = lender; + this.borrower = borrower; + this.linearId = linearId; + } + + public String getCurrency() { + return currency; + } + + public Long getAmount() { + return amount; + } + + public ByteArray getLender() { + return lender; + } + + public ByteArray getBorrower() { + return borrower; + } + + public UUID getId() { + return linearId; + } + } } - } To: -.. sourcecode:: kotlin +.. container:: codeset - object ObligationSchemaV1 : MappedSchema(Obligation::class.java, 1, listOf(ObligationEntity::class.java)) { - @Entity @Table(name = "obligations") - class ObligationEntity(obligation: Obligation) : PersistentState() { - @Column var currency: String = obligation.amount.token.toString() - @Column var amount: Long = obligation.amount.quantity - @Column @Lob var lender: ByteArray = obligation.lender.owningKey.encoded - @Column @Lob var borrower: ByteArray = obligation.borrower.owningKey.encoded - @Column var linear_id: String = obligation.linearId.id.toString() - @Column var defaulted: Bool = obligation.amount.inDefault // NEW COLUNM! + .. sourcecode:: kotlin + + object ObligationSchemaV1 : MappedSchema(Obligation::class.java, 1, listOf(ObligationEntity::class.java)) { + @Entity @Table(name = "obligations") + class ObligationEntity(obligation: Obligation) : PersistentState() { + @Column var currency: String = obligation.amount.token.toString() + @Column var amount: Long = obligation.amount.quantity + @Column @Lob var lender: ByteArray = obligation.lender.owningKey.encoded + @Column @Lob var borrower: ByteArray = obligation.borrower.owningKey.encoded + @Column var linear_id: String = obligation.linearId.id.toString() + @Column var defaulted: Bool = obligation.amount.inDefault // NEW COLUMN! + } + } + + .. sourcecode:: java + + public class ObligationSchemaV1 extends MappedSchema { + public IOUSchemaV1() { + super(Obligation.class, 1, ImmutableList.of(ObligationEntity.class)); + } + + @Entity + @Table(name = "obligations") + public static class ObligationEntity extends PersistentState { + @Column(name = "currency") private final String currency; + @Column(name = "amount") private final Long amount; + @Column(name = "lender") @Lob private final Byte[] lender; + @Column(name = "borrower") @Lob private final Byte[] borrower; + @Column(name = "linear_id") private final UUID linearId; + @Column(name = "defaulted") private final Boolean defaulted; // NEW COLUMN! + + + public ObligationEntity(String currency, Long amount, Byte[] lender, Byte[] borrower, UUID linearId, Boolean defaulted) { + this.currency = currency; + this.amount = amount; + this.lender = lender; + this.borrower = borrower; + this.linearId = linearId; + this.defaulted = defaulted; + } + + public String getCurrency() { + return currency; + } + + public Long getAmount() { + return amount; + } + + public ByteArray getLender() { + return lender; + } + + public ByteArray getBorrower() { + return borrower; + } + + public UUID getId() { + return linearId; + } + + public Boolean isDefaulted() { + return defaulted; + } + } } - } Thus adding a new column with a default value. @@ -397,21 +572,43 @@ used, as changes to the state are required. To make a backwards-incompatible cha because a property was removed from a state object), the procedure is to define another object relational mapper, then add it to the ``supportedSchemas`` property of your ``QueryableState``, like so: -.. sourcecode:: kotlin +.. container:: codeset - override fun supportedSchemas(): Iterable<MappedSchema> = listOf(ExampleSchemaV1, ExampleSchemaV2) + .. sourcecode:: kotlin + + override fun supportedSchemas(): Iterable<MappedSchema> = listOf(ExampleSchemaV1, ExampleSchemaV2) + + .. sourcecode:: java + + @Override public Iterable<MappedSchema> supportedSchemas() { + return ImmutableList.of(new ExampleSchemaV1(), new ExampleSchemaV2()); + } Then, in ``generateMappedObject``, add support for the new schema: -.. sourcecode:: kotlin +.. container:: codeset - override fun generateMappedObject(schema: MappedSchema): PersistentState { - return when (schema) { - is DummyLinearStateSchemaV1 -> // Omitted. - is DummyLinearStateSchemaV2 -> // Omitted. - else -> throw IllegalArgumentException("Unrecognised schema $schema") + .. sourcecode:: kotlin + + override fun generateMappedObject(schema: MappedSchema): PersistentState { + return when (schema) { + is DummyLinearStateSchemaV1 -> // Omitted. + is DummyLinearStateSchemaV2 -> // Omitted. + else -> throw IllegalArgumentException("Unrecognised schema $schema") + } + } + + .. sourcecode:: java + + @Override public PersistentState generateMappedObject(MappedSchema schema) { + if (schema instanceof DummyLinearStateSchemaV1) { + // Omitted. + } else if (schema instanceof DummyLinearStateSchemaV2) { + // Omitted. + } else { + throw new IllegalArgumentException("Unrecognised schema $schema"); + } } - } With this approach, whenever the state object is stored in the vault, a representation of it will be stored in two separate database tables where possible - one for each supported schema. \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt index 7c11cf4bb7..906205b765 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt @@ -1,11 +1,12 @@ package net.corda.nodeapi.internal +import net.corda.core.crypto.toStringShort +import net.corda.core.identity.Party import net.corda.core.messaging.MessageRecipientGroup import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.serialization.CordaSerializable import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.toBase58String import java.security.PublicKey /** @@ -23,7 +24,7 @@ class ArtemisMessagingComponent { const val PEER_USER = "SystemUsers/Peer" const val INTERNAL_PREFIX = "internal." const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers." //TODO Come up with better name for common peers/services queue - const val P2P_QUEUE = "p2p.inbound" + const val P2P_PREFIX = "p2p.inbound." const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications" } @@ -49,7 +50,7 @@ class ArtemisMessagingComponent { @CordaSerializable data class NodeAddress(override val queueName: String, override val hostAndPort: NetworkHostAndPort) : ArtemisPeerAddress { constructor(peerIdentity: PublicKey, hostAndPort: NetworkHostAndPort) : - this("$PEERS_PREFIX${peerIdentity.toBase58String()}", hostAndPort) + this("$PEERS_PREFIX${peerIdentity.toStringShort()}", hostAndPort) } /** @@ -62,6 +63,30 @@ class ArtemisMessagingComponent { * @param identity The service identity's owning key. */ data class ServiceAddress(val identity: PublicKey) : ArtemisAddress, MessageRecipientGroup { - override val queueName: String = "$PEERS_PREFIX${identity.toBase58String()}" + override val queueName: String = "$PEERS_PREFIX${identity.toStringShort()}" } + + /** + * [RemoteInboxAddress] implements [SingleMessageRecipient]. It represents the non-local address of a remote inbox. + * @param identity The Node public identity + */ + data class RemoteInboxAddress(val identity: PublicKey) : ArtemisAddress, SingleMessageRecipient { + constructor(party: Party) : this(party.owningKey) + + companion object { + /** + * When transferring a message from the local holding queue to the remote inbox queue + * this method provides a simple translation of the address string. + * The topics are distinct so that proper segregation of internal + * and external access permissions can be made. + */ + fun translateLocalQueueToInboxAddress(address: String): String { + require(address.startsWith(PEERS_PREFIX)) { "Failed to map address: $address to a remote topic as it is not in the $PEERS_PREFIX namespace" } + return P2P_PREFIX + address.substring(PEERS_PREFIX.length) + } + } + + override val queueName: String = "$P2P_PREFIX${identity.toStringShort()}" + } + } diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 32d43cb5e3..e04b6c75c3 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -3,18 +3,18 @@ package net.corda.node.amqp import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.crypto.toStringShort import net.corda.core.internal.div import net.corda.core.node.NodeInfo import net.corda.core.node.services.NetworkMapCache import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.toBase58String import net.corda.node.internal.protonwrapper.netty.AMQPServer import net.corda.node.internal.security.RPCSecurityManager +import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.config.* import net.corda.node.services.messaging.ArtemisMessagingClient import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.nodeapi.internal.ArtemisMessagingComponent -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE import net.corda.nodeapi.internal.crypto.loadKeyStore import net.corda.testing.* import net.corda.testing.internal.rigorousMock @@ -53,7 +53,7 @@ class AMQPBridgeTest { @Test fun `test acked and nacked messages`() { // Create local queue - val sourceQueueName = "internal.peers." + BOB.publicKey.toBase58String() + val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort() val (artemisServer, artemisClient) = createArtemis(sourceQueueName) // Pre-populate local queue with 3 messages @@ -133,11 +133,13 @@ class AMQPBridgeTest { @Test fun `Test legacy bridge still works`() { // Create local queue - val sourceQueueName = "internal.peers." + ALICE.publicKey.toBase58String() + val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort() val (artemisLegacyServer, artemisLegacyClient) = createLegacyArtemis(sourceQueueName) val (artemisServer, artemisClient) = createArtemis(null) + val inbox = ArtemisMessagingComponent.RemoteInboxAddress(BOB.party).queueName + artemisClient.started!!.session.createQueue(inbox, RoutingType.ANYCAST, inbox, true) val artemis = artemisLegacyClient.started!! for (i in 0 until 3) { @@ -150,8 +152,7 @@ class AMQPBridgeTest { artemis.producer.send(sourceQueueName, artemisMessage) } - - val subs = artemisClient.started!!.session.createConsumer(P2P_QUEUE) + val subs = artemisClient.started!!.session.createConsumer(inbox) for (i in 0 until 3) { val msg = subs.receive() val messageBody = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) } @@ -177,9 +178,9 @@ class AMQPBridgeTest { doReturn(true).whenever(it).useAMQPBridges } artemisConfig.configureWithDevSSLCertificate() - val networkMap = rigorousMock<NetworkMapCache>().also { + val networkMap = rigorousMock<NetworkMapCacheInternal>().also { doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed - doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByLegalIdentityKey(any()) + doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any()) } val userService = rigorousMock<RPCSecurityManager>() val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, null, networkMap, userService, MAX_MESSAGE_SIZE) @@ -189,7 +190,7 @@ class AMQPBridgeTest { val artemis = artemisClient.started!! if (sourceQueueName != null) { // Local queue for outgoing messages - artemis.session.createQueue(sourceQueueName, RoutingType.MULTICAST, sourceQueueName, true) + artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true) } return Pair(artemisServer, artemisClient) } @@ -206,9 +207,9 @@ class AMQPBridgeTest { doReturn(ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0))).whenever(it).activeMQServer } artemisConfig.configureWithDevSSLCertificate() - val networkMap = rigorousMock<NetworkMapCache>().also { + val networkMap = rigorousMock<NetworkMapCacheInternal>().also { doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed - doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByLegalIdentityKey(any()) + doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any()) } val userService = rigorousMock<RPCSecurityManager>() val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort2, null, networkMap, userService, MAX_MESSAGE_SIZE) @@ -217,7 +218,7 @@ class AMQPBridgeTest { artemisClient.start() val artemis = artemisClient.started!! // Local queue for outgoing messages - artemis.session.createQueue(sourceQueueName, RoutingType.MULTICAST, sourceQueueName, true) + artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true) return Pair(artemisServer, artemisClient) } diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index 88c299dd41..6c06fa607c 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -13,11 +13,13 @@ import net.corda.node.internal.protonwrapper.messages.MessageStatus import net.corda.node.internal.protonwrapper.netty.AMQPClient import net.corda.node.internal.protonwrapper.netty.AMQPServer import net.corda.node.internal.security.RPCSecurityManager +import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.messaging.ArtemisMessagingClient import net.corda.node.services.messaging.ArtemisMessagingServer +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.internal.crypto.loadKeyStore import net.corda.testing.* @@ -48,7 +50,7 @@ class ProtonWrapperTests { amqpServer.start() val receiveSubs = amqpServer.onReceive.subscribe { assertEquals(BOB_NAME.toString(), it.sourceLegalName) - assertEquals("p2p.inbound", it.topic) + assertEquals(P2P_PREFIX + "Test", it.topic) assertEquals("Test", String(it.payload)) it.complete(true) } @@ -64,7 +66,7 @@ class ProtonWrapperTests { assertEquals(true, clientConnect.connected) assertEquals(ALICE_NAME, CordaX500Name.build(clientConnect.remoteCert!!.subjectX500Principal)) val msg = amqpClient.createMessage("Test".toByteArray(), - "p2p.inbound", + P2P_PREFIX + "Test", ALICE_NAME.toString(), emptyMap()) amqpClient.write(msg) @@ -151,8 +153,8 @@ class ProtonWrapperTests { assertEquals(true, clientConnected.get().connected) assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal)) val artemis = artemisClient.started!! - val sendAddress = "p2p.inbound" - artemis.session.createQueue(sendAddress, RoutingType.MULTICAST, "queue", true) + val sendAddress = P2P_PREFIX + "Test" + artemis.session.createQueue(sendAddress, RoutingType.ANYCAST, "queue", true) val consumer = artemis.session.createConsumer("queue") val testData = "Test".toByteArray() val testProperty = mutableMapOf<Any?, Any?>() @@ -230,7 +232,7 @@ class ProtonWrapperTests { } artemisConfig.configureWithDevSSLCertificate() - val networkMap = rigorousMock<NetworkMapCache>().also { + val networkMap = rigorousMock<NetworkMapCacheInternal>().also { doReturn(never<NetworkMapCache.MapChange>()).whenever(it).changed } val userService = rigorousMock<RPCSecurityManager>() diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt index d1a69c9be5..f723202758 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt @@ -5,6 +5,7 @@ import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCConnection import net.corda.core.crypto.generateKeyPair import net.corda.core.crypto.random63BitValue +import net.corda.core.crypto.toStringShort import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowSession import net.corda.core.flows.InitiatedBy @@ -14,14 +15,13 @@ import net.corda.core.identity.Party import net.corda.core.messaging.CordaRPCOps import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.getOrThrow -import net.corda.core.utilities.toBase58String import net.corda.core.utilities.unwrap import net.corda.node.internal.Node import net.corda.node.internal.StartedNode import net.corda.nodeapi.RPCApi import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.testing.ALICE_NAME @@ -79,30 +79,30 @@ abstract class MQSecurityTest : NodeBasedTest() { @Test fun `consume message from P2P queue`() { - assertConsumeAttackFails(P2P_QUEUE) + assertConsumeAttackFails("$P2P_PREFIX${alice.info.chooseIdentity().owningKey.toStringShort()}") } @Test fun `consume message from peer queue`() { val bobParty = startBobAndCommunicateWithAlice() - assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}") + assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}") } @Test fun `send message to address of peer which has been communicated with`() { val bobParty = startBobAndCommunicateWithAlice() - assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}") + assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}") } @Test fun `create queue for peer which has not been communicated with`() { val bob = startNode(BOB_NAME) - assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.chooseIdentity().owningKey.toBase58String()}") + assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.chooseIdentity().owningKey.toStringShort()}") } @Test fun `create queue for unknown peer`() { - val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.toBase58String()}" + val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.toStringShort()}" assertAllQueueCreationAttacksFail(invalidPeerQueue) } diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index f96ce6e97f..195f819e21 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -40,6 +40,7 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import rx.Scheduler import rx.schedulers.Schedulers +import java.security.PublicKey import java.time.Clock import java.util.concurrent.atomic.AtomicInteger import javax.management.ObjectName @@ -166,11 +167,14 @@ open class Node(configuration: NodeConfiguration, VerifierType.OutOfProcess -> VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, networkParameters.maxMessageSize) VerifierType.InMemory -> null } + require(info.legalIdentities.size in 1..2) { "Currently nodes must have a primary address and optionally one serviced address" } + val serviceIdentity: PublicKey? = if (info.legalIdentities.size == 1) null else info.legalIdentities[1].owningKey return P2PMessagingClient( configuration, versionInfo, serverAddress, info.legalIdentities[0].owningKey, + serviceIdentity, serverThread, database, advertisedAddress, diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 7a4f2f669d..e0094054a1 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -29,6 +29,10 @@ interface NetworkMapCacheBaseInternal : NetworkMapCacheBase { fun getNodeByHash(nodeHash: SecureHash): NodeInfo? + /** Find nodes from the [PublicKey] toShortString representation. + * This is used for Artemis bridge lookup process. */ + fun getNodesByOwningKeyIndex(identityKeyIndex: String): List<NodeInfo> + /** Adds a node to the local cache (generally only used for adding ourselves). */ fun addNode(node: NodeInfo) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/AMQPBridgeManager.kt b/node/src/main/kotlin/net/corda/node/services/messaging/AMQPBridgeManager.kt index 8b285bf999..93b1137a00 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/AMQPBridgeManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/AMQPBridgeManager.kt @@ -12,8 +12,8 @@ import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.messaging.AMQPBridgeManager.AMQPBridge.Companion.getBridgeName import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER +import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress import net.corda.nodeapi.internal.crypto.loadKeyStore import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE @@ -132,7 +132,8 @@ internal class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress: properties[key.toString()] = value } log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" } - val sendableMessage = amqpClient.createMessage(data, P2P_QUEUE, + val peerInbox = translateLocalQueueToInboxAddress(queueName) + val sendableMessage = amqpClient.createMessage(data, peerInbox, legalNames.first().toString(), properties) sendableMessage.onComplete.then { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 285d4ce94c..7eacea9520 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -8,16 +8,15 @@ import net.corda.core.internal.div import net.corda.core.internal.noneOrSingle import net.corda.core.internal.uncheckedCast import net.corda.core.node.NodeInfo -import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug -import net.corda.core.utilities.parsePublicKeyBase58 import net.corda.node.internal.Node import net.corda.node.internal.security.Password import net.corda.node.internal.security.RPCSecurityManager +import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE @@ -31,7 +30,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisPeerAddress import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress @@ -94,7 +93,7 @@ import javax.security.cert.CertificateException class ArtemisMessagingServer(private val config: NodeConfiguration, private val p2pPort: Int, val rpcPort: Int?, - val networkMapCache: NetworkMapCache, + val networkMapCache: NetworkMapCacheInternal, val securityManager: RPCSecurityManager, val maxMessageSize: Int) : SingletonSerializeAsToken() { companion object { @@ -191,7 +190,6 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, // by having its password be an unknown securely random 128-bit value. clusterPassword = BigInteger(128, newSecureRandom()).toString(16) queueConfigurations = listOf( - queueConfig(P2P_QUEUE, durable = true), // Create an RPC queue: this will service locally connected clients only (not via a bridge) and those // clients must have authenticated. We could use a single consumer for everything and perhaps we should, // but these queues are not worth persisting. @@ -243,7 +241,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, private fun ConfigurationImpl.configureAddressSecurity(): Pair<Configuration, LoginListener> { val nodeInternalRole = Role(NODE_ROLE, true, true, true, true, true, true, true, true) securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node - securityRoles[P2P_QUEUE] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true)) + securityRoles["$P2P_PREFIX#"] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true)) securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true)) // Each RPC user must have its own role and its own queue. This prevents users accessing each other's queues // and stealing RPC responses. @@ -309,8 +307,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, if (queueName.startsWith(PEERS_PREFIX)) { try { - val identity = parsePublicKeyBase58(queueName.substring(PEERS_PREFIX.length)) - val nodeInfos = networkMapCache.getNodesByLegalIdentityKey(identity) + val nodeInfos = networkMapCache.getNodesByOwningKeyIndex(queueName.substring(PEERS_PREFIX.length)) if (nodeInfos.isNotEmpty()) { nodeInfos.forEach { deployBridgeToPeer(it) } } else { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/CoreBridgeManager.kt b/node/src/main/kotlin/net/corda/node/services/messaging/CoreBridgeManager.kt index 4161d89836..b821d13cff 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/CoreBridgeManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/CoreBridgeManager.kt @@ -10,15 +10,17 @@ import net.corda.node.services.config.NodeConfiguration import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.internal.ArtemisMessagingComponent -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER +import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress import net.corda.nodeapi.internal.crypto.X509Utilities +import org.apache.activemq.artemis.api.core.Message import org.apache.activemq.artemis.core.config.BridgeConfiguration import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants import org.apache.activemq.artemis.core.server.ActiveMQServer +import org.apache.activemq.artemis.core.server.cluster.Transformer import org.apache.activemq.artemis.spi.core.remoting.* import org.apache.activemq.artemis.utils.ConfigurationHelper import java.time.Duration @@ -46,7 +48,7 @@ internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServ /** - * All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving + * All nodes are expected to have a public facing address called p2p.inbound.$identity for receiving * messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it, * as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's * P2P address. @@ -64,7 +66,6 @@ internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServ activeMQServer.deployBridge(BridgeConfiguration().apply { name = getBridgeName(queueName, target) this.queueName = queueName - forwardingAddress = P2P_QUEUE staticConnectors = listOf(target.toString()) confirmationWindowSize = 100000 // a guess isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic @@ -77,6 +78,7 @@ internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServ // our TLS certificate. user = PEER_USER password = PEER_USER + transformerClassName = InboxTopicTransformer::class.java.name }) } @@ -99,6 +101,13 @@ internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServ } } +class InboxTopicTransformer : Transformer { + override fun transform(message: Message): Message { + message.address = translateLocalQueueToInboxAddress(message.address) + return message + } +} + class VerifyingNettyConnectorFactory : NettyConnectorFactory() { override fun createConnector(configuration: MutableMap<String, Any>, handler: BufferHandler?, diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 0e305c6f4c..b103254608 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -19,7 +19,6 @@ import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.node.utilities.PersistentMap import net.corda.nodeapi.internal.ArtemisMessagingComponent.* -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException @@ -54,21 +53,28 @@ import javax.persistence.Lob * invoke methods on the provided implementation. There is more documentation on this in the docsite and the * CordaRPCClient class. * - * @param serverAddress The address of the broker instance to connect to (might be running in the same process). - * @param myIdentity The public key to be used as the ArtemisMQ address and queue name for the node. - * @param nodeExecutor An executor to run received message tasks upon. - * @param advertisedAddress The node address for inbound connections, advertised to the network map service and peers. - * If not provided, will default to [serverAddress]. + * @param config The configuration of the node, which is used for controlling the message redelivery options. + * @param versionInfo All messages from the node carry the version info and received messages are checked against this for compatibility. + * @param serverAddress The host and port of the Artemis broker. + * @param myIdentity The primary identity of the node, which defines the messaging address for externally received messages. + * It is also used to construct the myAddress field, which is ultimately advertised in the network map. + * @param serviceIdentity An optional second identity if the node is also part of a group address, for example a notary. + * @param nodeExecutor The received messages are marshalled onto the server executor to prevent Netty buffers leaking during fiber suspends. + * @param database The nodes database, which is used to deduplicate messages. + * @param advertisedAddress The externally advertised version of the Artemis broker address used to construct myAddress and included + * in the network map data. + * @param maxMessageSize A bound applied to the message size. */ @ThreadSafe class P2PMessagingClient(config: NodeConfiguration, private val versionInfo: VersionInfo, serverAddress: NetworkHostAndPort, private val myIdentity: PublicKey, + private val serviceIdentity: PublicKey?, private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor, private val database: CordaPersistence, advertisedAddress: NetworkHostAndPort = serverAddress, - private val maxMessageSize: Int + maxMessageSize: Int ) : SingletonSerializeAsToken(), MessagingService { companion object { private val log = contextLogger() @@ -126,6 +132,7 @@ class P2PMessagingClient(config: NodeConfiguration, private class InnerState { var running = false var p2pConsumer: ClientConsumer? = null + var serviceConsumer: ClientConsumer? = null } private val messagesToRedeliver = database.transaction { @@ -181,8 +188,20 @@ class P2PMessagingClient(config: NodeConfiguration, fun start() { state.locked { val session = artemis.start().session + val inbox = RemoteInboxAddress(myIdentity).queueName // Create a queue, consumer and producer for handling P2P network messages. - p2pConsumer = session.createConsumer(P2P_QUEUE) + createQueueIfAbsent(inbox) + p2pConsumer = session.createConsumer(inbox) + if (serviceIdentity != null) { + val serviceAddress = RemoteInboxAddress(serviceIdentity).queueName + createQueueIfAbsent(serviceAddress) + val serviceHandler = session.createConsumer(serviceAddress) + serviceHandler.setMessageHandler { msg -> + val message: ReceivedMessage? = artemisToCordaMessage(msg) + if (message != null) + deliver(msg, message) + } + } } resumeMessageRedelivery() @@ -331,6 +350,13 @@ class P2PMessagingClient(config: NodeConfiguration, // Ignore it: this can happen if the server has gone away before we do. } p2pConsumer = null + val s = serviceConsumer + try { + s?.close() + } catch (e: ActiveMQObjectClosedException) { + // Ignore it: this can happen if the server has gone away before we do. + } + serviceConsumer = null prevRunning } if (running && !nodeExecutor.isOnThread) { @@ -430,7 +456,7 @@ class P2PMessagingClient(config: NodeConfiguration, private fun getMQAddress(target: MessageRecipients): String { return if (target == myAddress) { // If we are sending to ourselves then route the message directly to our P2P queue. - P2P_QUEUE + RemoteInboxAddress(myIdentity).queueName } else { // Otherwise we send the message to an internal queue for the target residing on our broker. It's then the // broker's job to route the message to the target's P2P queue. @@ -447,7 +473,7 @@ class P2PMessagingClient(config: NodeConfiguration, val queueQuery = session.queueQuery(SimpleString(queueName)) if (!queueQuery.isExists) { log.info("Create fresh queue $queueName bound on same address") - session.createQueue(queueName, RoutingType.MULTICAST, queueName, true) + session.createQueue(queueName, RoutingType.ANYCAST, queueName, true) } } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/VerifierMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/VerifierMessagingClient.kt index 9a8c0cae66..9a8c82c4eb 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/VerifierMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/VerifierMessagingClient.kt @@ -7,15 +7,15 @@ import net.corda.core.transactions.LedgerTransaction import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.loggerFor import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService -import net.corda.node.utilities.* +import net.corda.node.utilities.AffinityExecutor import net.corda.nodeapi.VerifierApi import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX import net.corda.nodeapi.internal.config.SSLConfiguration import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.api.core.client.* -import java.util.concurrent.* +import org.apache.activemq.artemis.api.core.client.ClientConsumer +import java.util.concurrent.TimeUnit class VerifierMessagingClient(config: SSLConfiguration, serverAddress: NetworkHostAndPort, metrics: MetricRegistry, private val maxMessageSize: Int) : SingletonSerializeAsToken() { companion object { @@ -40,7 +40,7 @@ class VerifierMessagingClient(config: SSLConfiguration, serverAddress: NetworkHo val queueQuery = session.queueQuery(SimpleString(queueName)) if (!queueQuery.isExists) { log.info("Create fresh queue $queueName bound on same address") - session.createQueue(queueName, RoutingType.MULTICAST, queueName, true) + session.createQueue(queueName, RoutingType.ANYCAST, queueName, true) } } createQueueIfAbsent(VERIFICATION_REQUESTS_QUEUE_NAME) diff --git a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt index b086e1e4c6..0e6d2c2993 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt @@ -160,6 +160,12 @@ open class PersistentNetworkMapCache( private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024, 8, { key -> database.transaction { queryByIdentityKey(session, key) } }) + override fun getNodesByOwningKeyIndex(identityKeyIndex: String): List<NodeInfo> { + return database.transaction { + queryByIdentityKeyIndex(session, identityKeyIndex) + } + } + override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = database.transaction { queryByAddress(session, address) } override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = identityByLegalNameCache.get(name).orElse(null) @@ -245,15 +251,23 @@ open class PersistentNetworkMapCache( } private fun findByIdentityKey(session: Session, identityKey: PublicKey): List<NodeInfoSchemaV1.PersistentNodeInfo> { + return findByIdentityKeyIndex(session, identityKey.toStringShort()) + } + + private fun findByIdentityKeyIndex(session: Session, identityKeyIndex: String): List<NodeInfoSchemaV1.PersistentNodeInfo> { val query = session.createQuery( "SELECT n FROM ${NodeInfoSchemaV1.PersistentNodeInfo::class.java.name} n JOIN n.legalIdentitiesAndCerts l WHERE l.owningKeyHash = :owningKeyHash", NodeInfoSchemaV1.PersistentNodeInfo::class.java) - query.setParameter("owningKeyHash", identityKey.toStringShort()) + query.setParameter("owningKeyHash", identityKeyIndex) return query.resultList } private fun queryByIdentityKey(session: Session, identityKey: PublicKey): List<NodeInfo> { - val result = findByIdentityKey(session, identityKey) + return queryByIdentityKeyIndex(session, identityKey.toStringShort()) + } + + private fun queryByIdentityKeyIndex(session: Session, identityKeyIndex: String): List<NodeInfo> { + val result = findByIdentityKeyIndex(session, identityKeyIndex) return result.map { it.toNodeInfo() } } diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index 1fb6840b71..255e47bcbc 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -178,6 +178,7 @@ class ArtemisMessagingTests { MOCK_VERSION_INFO.copy(platformVersion = platformVersion), server, identity.public, + null, ServiceAffinityExecutor("ArtemisMessagingTests", 1), database, maxMessageSize = maxMessageSize).apply {