From 0ff9c9e2e3bac340aa7c2275d38880c9af815be8 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Thu, 25 Jan 2018 17:53:34 +0000 Subject: [PATCH 1/2] Move to message based bridge control protocol (#2410) Tidy up Remove dead RPCSecurity logic from ArtemisMessageServer Address PR comments Address PR comments --- docs/source/changelog.rst | 3 + docs/source/corda-configuration-file.rst | 3 - .../internal/ArtemisMessagingComponent.kt | 2 + .../nodeapi/internal/BridgeControlMessages.kt | 50 +++++ .../net/corda/node/amqp/AMQPBridgeTest.kt | 107 +++-------- .../net/corda/node/amqp/ProtonWrapperTests.kt | 9 +- .../kotlin/net/corda/node/internal/Node.kt | 10 +- .../engine/ConnectionStateMachine.kt | 1 + .../node/services/config/NodeConfiguration.kt | 2 - .../services/messaging/AMQPBridgeManager.kt | 11 +- .../messaging/ArtemisMessagingServer.kt | 95 +--------- .../messaging/BridgeControlListener.kt | 116 ++++++++++++ .../node/services/messaging/BridgeManager.kt | 6 +- .../services/messaging/CoreBridgeManager.kt | 178 ------------------ .../services/messaging/P2PMessagingClient.kt | 135 ++++++++++++- node/src/main/resources/reference.conf | 1 - .../messaging/ArtemisMessagingTest.kt | 9 +- .../kotlin/net/corda/testing/node/MockNode.kt | 2 - 18 files changed, 352 insertions(+), 388 deletions(-) create mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/internal/BridgeControlMessages.kt create mode 100644 node/src/main/kotlin/net/corda/node/services/messaging/BridgeControlListener.kt delete mode 100644 node/src/main/kotlin/net/corda/node/services/messaging/CoreBridgeManager.kt diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 210e6a99d2..0878044a79 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -165,6 +165,9 @@ UNRELEASED * The ability for CordaServices to register callbacks so they can be notified of shutdown and clean up resource such as open ports. +* Move to a message based control of peer to peer bridge formation to allow for future out of process bridging components. + This removes the legacy Artemis bridges completely, so the ``useAMQPBridges`` configuration property has been removed. + .. _changelog_v1: Release 1.0 diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index a32613acc7..9dd048a719 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -188,9 +188,6 @@ path to the node's base directory. :exportJMXTo: If set to ``http``, will enable JMX metrics reporting via the Jolokia HTTP/JSON agent. Default Jolokia access url is http://127.0.0.1:7005/jolokia/ -:useAMQPBridges: Optionally can be set to ``false`` to use Artemis CORE Bridges for peer-to-peer communications. - Otherwise, defaults to ``true`` and the AMQP 1.0 protocol will be used for message transfer between nodes. - :transactionCacheSizeMegaBytes: Optionally specify how much memory should be used for caching of ledger transactions in memory. Otherwise defaults to 8MB plus 5% of all heap memory above 300MB. diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt index 906205b765..cd9ae5bc6d 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingComponent.kt @@ -25,6 +25,8 @@ class ArtemisMessagingComponent { const val INTERNAL_PREFIX = "internal." const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers." //TODO Come up with better name for common peers/services queue const val P2P_PREFIX = "p2p.inbound." + const val BRIDGE_CONTROL = "${INTERNAL_PREFIX}bridge.control" + const val BRIDGE_NOTIFY = "${INTERNAL_PREFIX}bridge.notify" const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications" } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/BridgeControlMessages.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/BridgeControlMessages.kt new file mode 100644 index 0000000000..a9ec02a109 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/BridgeControlMessages.kt @@ -0,0 +1,50 @@ +package net.corda.nodeapi.internal + +import net.corda.core.identity.CordaX500Name +import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.NetworkHostAndPort + +/** + * The information required to construct a bridge to a remote peer. + * @property queueName The local source queue from which to move messages. + * @property targets The list of TCP connection targets on which the peer resides + * @property legalNames The list of acceptable [CordaX500Name] names that should be presented as subject of the validated peer TLS certificate. + */ +@CordaSerializable +data class BridgeEntry(val queueName: String, val targets: List, val legalNames: List) + +sealed class BridgeControl { + /** + * This message is sent on node start to inform any bridges of valid inbound peer-to-peer topics and pre-existing outbound queues needing bridging. + * @property nodeIdentity This is used for informational purposes to identify the originating node instance. + * @property inboxQueues The list of P2P inbox queue names/addresses, which could be used to filter inbound messages and prevent any identity spoofing. + * @property sendQueues The list [BridgeEntry] for all pre-existing local queues requiring a bridge to a remote peer. + */ + @CordaSerializable + data class NodeToBridgeSnapshot(val nodeIdentity: String, val inboxQueues: List, val sendQueues: List) : BridgeControl() + + /** + * This message is sent on bridge start to re-request NodeToBridgeSnapshot information from all nodes on the broker. + * @property bridgeIdentity This is used for informational purposes to identify the originating bridge instance. + */ + @CordaSerializable + data class BridgeToNodeSnapshotRequest(val bridgeIdentity: String) : BridgeControl() + + /** + * This message is sent to any active bridges to create a new bridge if one does not already exist. It may also be sent if updated + * information arrives from the network map to allow connection details of a pre-existing queue to now be resolved. + * @property nodeIdentity This is used for informational purposes to identify the originating node instance. + * @property bridgeInfo The connection details of the new bridge. + */ + @CordaSerializable + data class Create(val nodeIdentity: String, val bridgeInfo: BridgeEntry) : BridgeControl() + + /** + * This message is sent to any active bridges to tear down an existing bridge. Typically this is only done when there is a change in network map details for a peer. + * The source queue is not affected by this operation and it is the responsibility of the node to ensure there are no unsent messages and to delete the durable queue. + * @property nodeIdentity This is used for informational purposes to identify the originating node instance. + * @property bridgeInfo The connection details of the bridge to be removed + */ + @CordaSerializable + data class Delete(val nodeIdentity: String, val bridgeInfo: BridgeEntry) : BridgeControl() +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index faca545bdc..38abdaa958 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -1,18 +1,18 @@ package net.corda.node.amqp -import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.whenever import net.corda.core.crypto.toStringShort import net.corda.core.internal.div -import net.corda.core.node.NodeInfo -import net.corda.core.node.services.NetworkMapCache import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.internal.protonwrapper.netty.AMQPServer -import net.corda.node.services.api.NetworkMapCacheInternal -import net.corda.node.services.config.* +import net.corda.node.services.config.CertChainPolicyConfig +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.configureWithDevSSLCertificate +import net.corda.node.services.messaging.AMQPBridgeManager import net.corda.node.services.messaging.ArtemisMessagingClient import net.corda.node.services.messaging.ArtemisMessagingServer +import net.corda.node.services.messaging.BridgeManager import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.testing.core.* import net.corda.testing.internal.rigorousMock @@ -23,7 +23,6 @@ import org.junit.Assert.assertArrayEquals import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder -import rx.Observable import java.util.* import kotlin.test.assertEquals import kotlin.test.assertNotEquals @@ -49,7 +48,7 @@ class AMQPBridgeTest { fun `test acked and nacked messages`() { // Create local queue val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort() - val (artemisServer, artemisClient) = createArtemis(sourceQueueName) + val (artemisServer, artemisClient, bridgeManager) = createArtemis(sourceQueueName) // Pre-populate local queue with 3 messages val artemis = artemisClient.started!! @@ -114,54 +113,25 @@ class AMQPBridgeTest { } artemis.producer.send(sourceQueueName, artemisMessage) - val received5 = receive.next() - val messageID5 = received5.applicationProperties["CountProp"] as Int - assertArrayEquals("Test_end".toByteArray(), received5.payload) - assertEquals(-1, messageID5) // next message should be in order - received5.complete(true) + while (true) { + val received5 = receive.next() + val messageID5 = received5.applicationProperties["CountProp"] as Int + if (messageID5 != 2) { // we may get a duplicate of the interrupted message, in which case skip + assertEquals(-1, messageID5) // next message should be in order though + assertArrayEquals("Test_end".toByteArray(), received5.payload) + break + } + received5.complete(true) + } + + bridgeManager.stop() amqpServer.stop() artemisClient.stop() artemisServer.stop() } - @Test - fun `Test legacy bridge still works`() { - // Create local queue - val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort() - val (artemisLegacyServer, artemisLegacyClient) = createLegacyArtemis(sourceQueueName) - - - val (artemisServer, artemisClient) = createArtemis(null) - val inbox = ArtemisMessagingComponent.RemoteInboxAddress(BOB.party).queueName - artemisClient.started!!.session.createQueue(inbox, RoutingType.ANYCAST, inbox, true) - - val artemis = artemisLegacyClient.started!! - for (i in 0 until 3) { - val artemisMessage = artemis.session.createMessage(true).apply { - putIntProperty("CountProp", i) - writeBodyBufferBytes("Test$i".toByteArray()) - // Use the magic deduplication property built into Artemis as our message identity too - putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) - } - artemis.producer.send(sourceQueueName, artemisMessage) - } - - val subs = artemisClient.started!!.session.createConsumer(inbox) - for (i in 0 until 3) { - val msg = subs.receive() - val messageBody = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) } - assertArrayEquals("Test$i".toByteArray(), messageBody) - assertEquals(i, msg.getIntProperty("CountProp")) - } - - artemisClient.stop() - artemisServer.stop() - artemisLegacyClient.stop() - artemisLegacyServer.stop() - } - - private fun createArtemis(sourceQueueName: String?): Pair { + private fun createArtemis(sourceQueueName: String?): Triple { val artemisConfig = rigorousMock().also { doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory doReturn(ALICE_NAME).whenever(it).myLegalName @@ -170,50 +140,21 @@ class AMQPBridgeTest { doReturn(artemisAddress).whenever(it).p2pAddress doReturn("").whenever(it).exportJMXto doReturn(emptyList()).whenever(it).certificateChainCheckPolicies - doReturn(true).whenever(it).useAMQPBridges } artemisConfig.configureWithDevSSLCertificate() - val networkMap = rigorousMock().also { - doReturn(Observable.never()).whenever(it).changed - doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any()) - } - val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, networkMap, MAX_MESSAGE_SIZE) + val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, MAX_MESSAGE_SIZE) val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE) artemisServer.start() artemisClient.start() + val bridgeManager = AMQPBridgeManager(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE) + bridgeManager.start() val artemis = artemisClient.started!! if (sourceQueueName != null) { // Local queue for outgoing messages artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true) + bridgeManager.deployBridge(sourceQueueName, amqpAddress, setOf(BOB.name)) } - return Pair(artemisServer, artemisClient) - } - - private fun createLegacyArtemis(sourceQueueName: String): Pair { - val artemisConfig = rigorousMock().also { - doReturn(temporaryFolder.root.toPath() / "artemis2").whenever(it).baseDirectory - doReturn(BOB_NAME).whenever(it).myLegalName - doReturn("trustpass").whenever(it).trustStorePassword - doReturn("cordacadevpass").whenever(it).keyStorePassword - doReturn(artemisAddress).whenever(it).p2pAddress - doReturn("").whenever(it).exportJMXto - doReturn(emptyList()).whenever(it).certificateChainCheckPolicies - doReturn(false).whenever(it).useAMQPBridges - doReturn(ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0))).whenever(it).activeMQServer - } - artemisConfig.configureWithDevSSLCertificate() - val networkMap = rigorousMock().also { - doReturn(Observable.never()).whenever(it).changed - doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any()) - } - val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort2, networkMap, MAX_MESSAGE_SIZE) - val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress2, MAX_MESSAGE_SIZE) - artemisServer.start() - artemisClient.start() - val artemis = artemisClient.started!! - // Local queue for outgoing messages - artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true) - return Pair(artemisServer, artemisClient) + return Triple(artemisServer, artemisClient, bridgeManager) } private fun createAMQPServer(): AMQPServer { diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index 002eb72725..783b8cf533 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -6,13 +6,11 @@ import io.netty.channel.EventLoopGroup import io.netty.channel.nio.NioEventLoopGroup import net.corda.core.identity.CordaX500Name import net.corda.core.internal.div -import net.corda.core.node.services.NetworkMapCache import net.corda.core.toFuture import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.internal.protonwrapper.messages.MessageStatus import net.corda.node.internal.protonwrapper.netty.AMQPClient import net.corda.node.internal.protonwrapper.netty.AMQPServer -import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate @@ -27,7 +25,6 @@ import org.junit.Assert.assertArrayEquals import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder -import rx.Observable.never import kotlin.test.assertEquals class ProtonWrapperTests { @@ -227,14 +224,10 @@ class ProtonWrapperTests { doReturn(NetworkHostAndPort("0.0.0.0", artemisPort)).whenever(it).p2pAddress doReturn("").whenever(it).exportJMXto doReturn(emptyList()).whenever(it).certificateChainCheckPolicies - doReturn(true).whenever(it).useAMQPBridges } artemisConfig.configureWithDevSSLCertificate() - val networkMap = rigorousMock().also { - doReturn(never()).whenever(it).changed - } - val server = ArtemisMessagingServer(artemisConfig, artemisPort, networkMap, MAX_MESSAGE_SIZE) + val server = ArtemisMessagingServer(artemisConfig, artemisPort, MAX_MESSAGE_SIZE) val client = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", artemisPort), MAX_MESSAGE_SIZE) server.start() client.start() diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index b2714cb4fa..c80d5cebb1 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -137,6 +137,7 @@ open class Node(configuration: NodeConfiguration, override lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor private var messageBroker: ArtemisMessagingServer? = null + private var bridgeControlListener: BridgeControlListener? = null private var rpcBroker: ArtemisBroker? = null private var shutdownHook: ShutdownHook? = null @@ -152,6 +153,7 @@ open class Node(configuration: NodeConfiguration, val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker() val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress) else startLocalRpcBroker() val advertisedAddress = info.addresses.single() + bridgeControlListener = BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize) printBasicNodeInfo("Incoming connection address", advertisedAddress.toString()) rpcServerAddresses?.let { @@ -171,6 +173,7 @@ open class Node(configuration: NodeConfiguration, serviceIdentity, serverThread, database, + services.networkMapCache, advertisedAddress, networkParameters.maxMessageSize) } @@ -194,7 +197,7 @@ open class Node(configuration: NodeConfiguration, private fun makeLocalMessageBroker(): NetworkHostAndPort { with(configuration) { - messageBroker = ArtemisMessagingServer(this, p2pAddress.port, services.networkMapCache, networkParameters.maxMessageSize) + messageBroker = ArtemisMessagingServer(this, p2pAddress.port, networkParameters.maxMessageSize) return NetworkHostAndPort("localhost", p2pAddress.port) } } @@ -253,6 +256,11 @@ open class Node(configuration: NodeConfiguration, runOnStop += this::close start() } + // Start P2P bridge service + bridgeControlListener?.apply { + runOnStop += this::stop + start() + } // Start up the MQ clients. rpcMessagingClient?.run { runOnStop += this::close diff --git a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/engine/ConnectionStateMachine.kt index 8ae8acda99..6812f3fc70 100644 --- a/node/src/main/kotlin/net/corda/node/internal/protonwrapper/engine/ConnectionStateMachine.kt +++ b/node/src/main/kotlin/net/corda/node/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -402,6 +402,7 @@ internal class ConnectionStateMachine(serverMode: Boolean, private fun encodePayloadBytes(msg: SendableMessageImpl): ByteBuf { val message = Proton.message() as ProtonJMessage message.body = Data(Binary(msg.payload)) + message.isDurable = true message.properties = Properties() val appProperties = HashMap(msg.applicationProperties) //TODO We shouldn't have to do this, but Artemis Server doesn't set the header on AMQP packets. diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index f8316b466a..4af6cb07e4 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -45,7 +45,6 @@ interface NodeConfiguration : NodeSSLConfiguration { val detectPublicIp: Boolean get() = true val sshd: SSHDConfiguration? val database: DatabaseConfig - val useAMQPBridges: Boolean get() = true val transactionCacheSizeBytes: Long get() = defaultTransactionCacheSize val attachmentContentCacheSizeBytes: Long get() = defaultAttachmentContentCacheSize val attachmentCacheBound: Long get() = defaultAttachmentCacheBound @@ -137,7 +136,6 @@ data class NodeConfigurationImpl( override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(), override val sshd: SSHDConfiguration? = null, override val database: DatabaseConfig = DatabaseConfig(initialiseSchema = devMode, exportHibernateJMXStatistics = devMode), - override val useAMQPBridges: Boolean = true, private val transactionCacheSizeMegaBytes: Int? = null, private val attachmentContentCacheSizeMegaBytes: Int? = null, override val attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/AMQPBridgeManager.kt b/node/src/main/kotlin/net/corda/node/services/messaging/AMQPBridgeManager.kt index 95289fe763..a6e2dbe7a5 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/AMQPBridgeManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/AMQPBridgeManager.kt @@ -3,6 +3,7 @@ package net.corda.node.services.messaging import io.netty.channel.EventLoopGroup import io.netty.channel.nio.NioEventLoopGroup import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.VisibleForTesting import net.corda.core.node.NodeInfo import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.debug @@ -32,7 +33,8 @@ import kotlin.concurrent.withLock * independent Session for message consumption. * The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager. */ -internal class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress: NetworkHostAndPort, val maxMessageSize: Int) : BridgeManager { +@VisibleForTesting +class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress: NetworkHostAndPort, val maxMessageSize: Int) : BridgeManager { private val lock = ReentrantLock() private val bridgeNameToBridgeMap = mutableMapOf() @@ -177,6 +179,13 @@ internal class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress: } } + override fun destroyBridge(queueName: String, hostAndPort: NetworkHostAndPort) { + lock.withLock { + val bridge = bridgeNameToBridgeMap.remove(getBridgeName(queueName, hostAndPort)) + bridge?.stop() + } + } + override fun bridgeExists(bridgeName: String): Boolean = lock.withLock { bridgeNameToBridgeMap.containsKey(bridgeName) } override fun start() { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 21b98e5538..f6837b80c1 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -1,13 +1,9 @@ package net.corda.node.services.messaging -import net.corda.core.crypto.AddressFormatException -import net.corda.core.identity.CordaX500Name import net.corda.core.internal.ThreadBox import net.corda.core.internal.div import net.corda.core.internal.noneOrSingle import net.corda.core.internal.uncheckedCast -import net.corda.core.node.NodeInfo -import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger @@ -17,7 +13,6 @@ import net.corda.node.internal.artemis.ArtemisBroker import net.corda.node.internal.artemis.BrokerAddresses import net.corda.node.internal.artemis.CertificateChainCheckPolicy import net.corda.node.internal.artemis.SecureArtemisConfiguration -import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE @@ -25,12 +20,10 @@ import net.corda.node.services.messaging.NodeLoginModule.Companion.VERIFIER_ROLE import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.VerifierApi -import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisPeerAddress import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress import net.corda.nodeapi.internal.requireOnDefaultFileSystem @@ -47,7 +40,6 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager import org.apache.activemq.artemis.spi.core.security.jaas.CertificateCallback import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal -import rx.Subscription import java.io.IOException import java.security.KeyStoreException import java.security.Principal @@ -79,7 +71,6 @@ import javax.security.auth.spi.LoginModule @ThreadSafe class ArtemisMessagingServer(private val config: NodeConfiguration, private val p2pPort: Int, - val networkMapCache: NetworkMapCacheInternal, val maxMessageSize: Int) : ArtemisBroker, SingletonSerializeAsToken() { companion object { private val log = contextLogger() @@ -91,29 +82,19 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, private val mutex = ThreadBox(InnerState()) private lateinit var activeMQServer: ActiveMQServer override val serverControl: ActiveMQServerControl get() = activeMQServer.activeMQServerControl - private var networkChangeHandle: Subscription? = null - private lateinit var bridgeManager: BridgeManager init { config.baseDirectory.requireOnDefaultFileSystem() } - /** - * The server will make sure the bridge exists on network map changes, see method [updateBridgesOnNetworkChange] - * We assume network map will be updated accordingly when the client node register with the network map. - */ override fun start() = mutex.locked { if (!running) { configureAndStartServer() - networkChangeHandle = networkMapCache.changed.subscribe { updateBridgesOnNetworkChange(it) } running = true } } override fun stop() = mutex.locked { - bridgeManager.close() - networkChangeHandle?.unsubscribe() - networkChangeHandle = null activeMQServer.stop() running = false } @@ -134,17 +115,11 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, registerActivationFailureListener { exception -> throw exception } // Some types of queue might need special preparation on our side, like dialling back or preparing // a lazily initialised subsystem. - registerPostQueueCreationCallback { deployBridgesFromNewQueue(it.toString()) } + registerPostQueueCreationCallback { log.debug { "Queue Created: $it" } } registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } } } // Config driven switch between legacy CORE bridges and the newer AMQP protocol bridges. - bridgeManager = if (config.useAMQPBridges) { - AMQPBridgeManager(config, NetworkHostAndPort("localhost", p2pPort), maxMessageSize) - } else { - CoreBridgeManager(config, activeMQServer) - } activeMQServer.start() - bridgeManager.start() Node.printBasicNodeInfo("Listening on port", p2pPort.toString()) } @@ -223,76 +198,8 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, return ActiveMQJAASSecurityManager(NodeLoginModule::class.java.name, securityConfig) } - private fun deployBridgesFromNewQueue(queueName: String) { - log.debug { "Queue created: $queueName, deploying bridge(s)" } - fun deployBridgeToPeer(nodeInfo: NodeInfo) { - log.debug("Deploying bridge for $queueName to $nodeInfo") - val address = nodeInfo.addresses.single() - bridgeManager.deployBridge(queueName, address, nodeInfo.legalIdentitiesAndCerts.map { it.name }.toSet()) - } - - if (queueName.startsWith(PEERS_PREFIX)) { - try { - val nodeInfos = networkMapCache.getNodesByOwningKeyIndex(queueName.substring(PEERS_PREFIX.length)) - if (nodeInfos.isNotEmpty()) { - nodeInfos.forEach { deployBridgeToPeer(it) } - } else { - log.error("Queue created for a peer that we don't know from the network map: $queueName") - } - } catch (e: AddressFormatException) { - log.error("Flow violation: Could not parse peer queue name as Base 58: $queueName") - } - } - } - - /** - * The bridge will be created automatically when the queues are created, however, this is not the case when the network map restarted. - * The queues are restored from the journal, and because the queues are added before we register the callback handler, this method will never get called for existing queues. - * This results in message queues up and never get send out. (https://github.com/corda/corda/issues/37) - * - * We create the bridges indirectly now because the network map is not persisted and there are no ways to obtain host and port information on startup. - * TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service. - */ - private fun updateBridgesOnNetworkChange(change: MapChange) { - log.debug { "Updating bridges on network map change: ${change.node}" } - fun gatherAddresses(node: NodeInfo): Sequence { - val address = node.addresses.single() - return node.legalIdentitiesAndCerts.map { NodeAddress(it.party.owningKey, address) }.asSequence() - } - - fun deployBridges(node: NodeInfo) { - gatherAddresses(node) - .filter { queueExists(it.queueName) && !bridgeManager.bridgeExists(it.bridgeName) } - .forEach { deployBridge(it, node.legalIdentitiesAndCerts.map { it.name }.toSet()) } - } - - when (change) { - is MapChange.Added -> { - deployBridges(change.node) - } - is MapChange.Removed -> { - bridgeManager.destroyBridges(change.node) - } - is MapChange.Modified -> { - // TODO Figure out what has actually changed and only destroy those bridges that need to be. - bridgeManager.destroyBridges(change.previousNode) - deployBridges(change.node) - } - } - } - - private fun deployBridge(address: ArtemisPeerAddress, legalNames: Set) { - bridgeManager.deployBridge(address.queueName, address.hostAndPort, legalNames) - } - private fun createTcpTransport(connectionDirection: ConnectionDirection, host: String, port: Int, enableSSL: Boolean = true) = ArtemisTcpTransport.tcpTransport(connectionDirection, NetworkHostAndPort(host, port), config, enableSSL = enableSSL) - - private fun queueExists(queueName: String): Boolean = activeMQServer.queueQuery(SimpleString(queueName)).isExists - - private val ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort) - - private fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort" } /** diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/BridgeControlListener.kt b/node/src/main/kotlin/net/corda/node/services/messaging/BridgeControlListener.kt new file mode 100644 index 0000000000..f7706b6b9d --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/messaging/BridgeControlListener.kt @@ -0,0 +1,116 @@ +package net.corda.node.services.messaging + +import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.contextLogger +import net.corda.node.services.config.NodeConfiguration +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX +import net.corda.nodeapi.internal.BridgeControl +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.api.core.client.ClientConsumer +import org.apache.activemq.artemis.api.core.client.ClientMessage +import java.util.* + +internal class BridgeControlListener(val config: NodeConfiguration, + val p2pAddress: NetworkHostAndPort, + val maxMessageSize: Int) : AutoCloseable { + private val bridgeId: String = UUID.randomUUID().toString() + private val bridgeManager: BridgeManager = AMQPBridgeManager(config, p2pAddress, maxMessageSize) + private val validInboundQueues = mutableSetOf() + private var artemis: ArtemisMessagingClient? = null + private var controlConsumer: ClientConsumer? = null + + companion object { + private val log = contextLogger() + } + + fun start() { + stop() + bridgeManager.start() + val artemis = ArtemisMessagingClient(config, p2pAddress, maxMessageSize) + this.artemis = artemis + artemis.start() + val artemisClient = artemis.started!! + val artemisSession = artemisClient.session + val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId" + artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue) + val control = artemisSession.createConsumer(bridgeControlQueue) + controlConsumer = control + control.setMessageHandler { msg -> + try { + processControlMessage(msg) + } catch (ex: Exception) { + log.error("Unable to process bridge control message", ex) + } + } + val startupMessage = BridgeControl.BridgeToNodeSnapshotRequest(bridgeId).serialize(context = SerializationDefaults.P2P_CONTEXT).bytes + val bridgeRequest = artemisSession.createMessage(false) + bridgeRequest.writeBodyBufferBytes(startupMessage) + artemisClient.producer.send(BRIDGE_NOTIFY, bridgeRequest) + } + + fun stop() { + controlConsumer?.close() + controlConsumer = null + artemis?.stop() + artemis = null + bridgeManager.stop() + } + + override fun close() = stop() + + private fun validateInboxQueueName(queueName: String): Boolean { + return queueName.startsWith(P2P_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString(queueName)).isExists + } + + private fun validateBridgingQueueName(queueName: String): Boolean { + return queueName.startsWith(PEERS_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString(queueName)).isExists + } + + private fun processControlMessage(msg: ClientMessage) { + val data: ByteArray = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) } + val controlMessage = data.deserialize(context = SerializationDefaults.P2P_CONTEXT) + log.info("Received bridge control message $controlMessage") + when (controlMessage) { + is BridgeControl.NodeToBridgeSnapshot -> { + if (!controlMessage.inboxQueues.all { validateInboxQueueName(it) }) { + log.error("Invalid queue names in control message $controlMessage") + return + } + if (!controlMessage.sendQueues.all { validateBridgingQueueName(it.queueName) }) { + log.error("Invalid queue names in control message $controlMessage") + return + } + for (outQueue in controlMessage.sendQueues) { + bridgeManager.deployBridge(outQueue.queueName, outQueue.targets.first(), outQueue.legalNames.toSet()) + } + // TODO For now we just record the inboxes, but we don't use the information, but eventually out of process bridges will use this for validating inbound messages. + validInboundQueues.addAll(controlMessage.inboxQueues) + } + is BridgeControl.BridgeToNodeSnapshotRequest -> { + log.error("Message from Bridge $controlMessage detected on wrong topic!") + } + is BridgeControl.Create -> { + if (!validateBridgingQueueName((controlMessage.bridgeInfo.queueName))) { + log.error("Invalid queue names in control message $controlMessage") + return + } + bridgeManager.deployBridge(controlMessage.bridgeInfo.queueName, controlMessage.bridgeInfo.targets.first(), controlMessage.bridgeInfo.legalNames.toSet()) + } + is BridgeControl.Delete -> { + if (!controlMessage.bridgeInfo.queueName.startsWith(PEERS_PREFIX)) { + log.error("Invalid queue names in control message $controlMessage") + return + } + bridgeManager.destroyBridge(controlMessage.bridgeInfo.queueName, controlMessage.bridgeInfo.targets.first()) + } + } + } + +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/BridgeManager.kt b/node/src/main/kotlin/net/corda/node/services/messaging/BridgeManager.kt index 5997921848..3d87e10af0 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/BridgeManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/BridgeManager.kt @@ -1,17 +1,21 @@ package net.corda.node.services.messaging import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.VisibleForTesting import net.corda.core.node.NodeInfo import net.corda.core.utilities.NetworkHostAndPort /** * Provides an internal interface that the [ArtemisMessagingServer] delegates to for Bridge activities. */ -internal interface BridgeManager : AutoCloseable { +@VisibleForTesting +interface BridgeManager : AutoCloseable { fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set) fun destroyBridges(node: NodeInfo) + fun destroyBridge(queueName: String, hostAndPort: NetworkHostAndPort) + fun bridgeExists(bridgeName: String): Boolean fun start() diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/CoreBridgeManager.kt b/node/src/main/kotlin/net/corda/node/services/messaging/CoreBridgeManager.kt deleted file mode 100644 index c3c9a1041d..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/messaging/CoreBridgeManager.kt +++ /dev/null @@ -1,178 +0,0 @@ -package net.corda.node.services.messaging - -import io.netty.handler.ssl.SslHandler -import net.corda.core.identity.CordaX500Name -import net.corda.core.internal.uncheckedCast -import net.corda.core.node.NodeInfo -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.contextLogger -import net.corda.node.services.config.NodeConfiguration -import net.corda.nodeapi.ArtemisTcpTransport -import net.corda.nodeapi.ConnectionDirection -import net.corda.nodeapi.internal.ArtemisMessagingComponent -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER -import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress -import net.corda.nodeapi.internal.crypto.X509Utilities -import net.corda.nodeapi.internal.crypto.x509 -import org.apache.activemq.artemis.api.core.Message -import org.apache.activemq.artemis.core.config.BridgeConfiguration -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory -import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants -import org.apache.activemq.artemis.core.server.ActiveMQServer -import org.apache.activemq.artemis.core.server.cluster.Transformer -import org.apache.activemq.artemis.spi.core.remoting.* -import org.apache.activemq.artemis.utils.ConfigurationHelper -import java.time.Duration -import java.util.concurrent.Executor -import java.util.concurrent.ScheduledExecutorService -import javax.security.auth.x500.X500Principal - -/** - * This class simply moves the legacy CORE bridge code from [ArtemisMessagingServer] - * into a class implementing [BridgeManager]. - * It has no lifecycle events, because the bridges are internal to the ActiveMQServer instance and thus - * stop when it is stopped. - */ -internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServer: ActiveMQServer) : BridgeManager { - companion object { - private fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort" - - private val ArtemisMessagingComponent.ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort) - } - - private fun gatherAddresses(node: NodeInfo): Sequence { - val address = node.addresses.single() - return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, address) }.asSequence() - } - - - /** - * All nodes are expected to have a public facing address called p2p.inbound.$identity for receiving - * messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it, - * as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's - * P2P address. - */ - override fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set) { - val connectionDirection = ConnectionDirection.Outbound( - connectorFactoryClassName = VerifyingNettyConnectorFactory::class.java.name, - expectedCommonNames = legalNames - ) - val tcpTransport = ArtemisTcpTransport.tcpTransport(connectionDirection, target, config, enableSSL = true) - tcpTransport.params[ArtemisMessagingServer::class.java.name] = this - // We intentionally overwrite any previous connector config in case the peer legal name changed - activeMQServer.configuration.addConnectorConfiguration(target.toString(), tcpTransport) - - activeMQServer.deployBridge(BridgeConfiguration().apply { - name = getBridgeName(queueName, target) - this.queueName = queueName - staticConnectors = listOf(target.toString()) - confirmationWindowSize = 100000 // a guess - isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic - // We keep trying until the network map deems the node unreachable and tells us it's been removed at which - // point we destroy the bridge - retryInterval = config.activeMQServer.bridge.retryIntervalMs - retryIntervalMultiplier = config.activeMQServer.bridge.retryIntervalMultiplier - maxRetryInterval = Duration.ofMinutes(config.activeMQServer.bridge.maxRetryIntervalMin).toMillis() - // As a peer of the target node we must connect to it using the peer user. Actual authentication is done using - // our TLS certificate. - user = PEER_USER - password = PEER_USER - transformerClassName = InboxTopicTransformer::class.java.name - }) - } - - override fun bridgeExists(bridgeName: String): Boolean = activeMQServer.clusterManager.bridges.containsKey(bridgeName) - - override fun start() { - // Nothing to do - } - - override fun stop() { - // Nothing to do - } - - override fun close() = stop() - - override fun destroyBridges(node: NodeInfo) { - gatherAddresses(node).forEach { - activeMQServer.destroyBridge(it.bridgeName) - } - } -} - -class InboxTopicTransformer : Transformer { - override fun transform(message: Message): Message { - message.address = translateLocalQueueToInboxAddress(message.address) - return message - } -} - -class VerifyingNettyConnectorFactory : NettyConnectorFactory() { - override fun createConnector(configuration: MutableMap, - handler: BufferHandler?, - listener: ClientConnectionLifeCycleListener?, - closeExecutor: Executor?, - threadPool: Executor?, - scheduledThreadPool: ScheduledExecutorService?, - protocolManager: ClientProtocolManager?): Connector { - return VerifyingNettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, - protocolManager) - } - - private class VerifyingNettyConnector(configuration: MutableMap, - handler: BufferHandler?, - listener: ClientConnectionLifeCycleListener?, - closeExecutor: Executor?, - threadPool: Executor?, - scheduledThreadPool: ScheduledExecutorService?, - protocolManager: ClientProtocolManager?) : - NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager) { - companion object { - private val log = contextLogger() - } - - private val sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration) - - override fun createConnection(): Connection? { - val connection = super.createConnection() as? NettyConnection - if (sslEnabled && connection != null) { - val expectedLegalNames: Set = uncheckedCast(configuration[ArtemisTcpTransport.VERIFY_PEER_LEGAL_NAME] ?: emptySet()) - try { - val session = connection.channel - .pipeline() - .get(SslHandler::class.java) - .engine() - .session - // Checks the peer name is the one we are expecting. - // TODO Some problems here: after introduction of multiple legal identities on the node and removal of the main one, - // we run into the issue, who are we connecting to. There are some solutions to that: advertise `network identity`; - // have mapping port -> identity (but, design doc says about removing SingleMessageRecipient and having just NetworkHostAndPort, - // it was convenient to store that this way); SNI. - val peerLegalName = CordaX500Name.parse(session.peerPrincipal.name) - val expectedLegalName = expectedLegalNames.singleOrNull { it == peerLegalName } - require(expectedLegalName != null) { - "Peer has wrong CN - expected $expectedLegalNames but got $peerLegalName. This is either a fatal " + - "misconfiguration by the remote peer or an SSL man-in-the-middle attack!" - } - // Make sure certificate has the same name. - val peerCertificateName = CordaX500Name.build(X500Principal(session.peerCertificateChain[0].subjectDN.name)) - require(peerCertificateName == expectedLegalName) { - "Peer has wrong subject name in the certificate - expected $expectedLegalNames but got $peerCertificateName. This is either a fatal " + - "misconfiguration by the remote peer or an SSL man-in-the-middle attack!" - } - X509Utilities.validateCertificateChain( - session.localCertificates.last().x509, - session.peerCertificates.x509) - } catch (e: IllegalArgumentException) { - connection.close() - log.error(e.message) - return null - } - } - return connection - } - } -} - diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 9deabfaccd..4e9d495b16 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -1,10 +1,13 @@ package net.corda.node.services.messaging +import net.corda.core.crypto.toStringShort import net.corda.core.identity.CordaX500Name import net.corda.core.internal.ThreadBox import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient +import net.corda.core.node.NodeInfo +import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.PartyInfo import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SingletonSerializeAsToken @@ -15,12 +18,18 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.sequence import net.corda.core.utilities.trace import net.corda.node.VersionInfo +import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.statemachine.StateMachineManagerImpl import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.node.utilities.PersistentMap import net.corda.nodeapi.internal.ArtemisMessagingComponent.* +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX +import net.corda.nodeapi.internal.BridgeControl +import net.corda.nodeapi.internal.BridgeEntry import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException @@ -29,6 +38,8 @@ import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ClientConsumer import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.apache.activemq.artemis.api.core.client.ClientSession +import rx.Subscription import java.security.PublicKey import java.time.Instant import java.util.* @@ -74,6 +85,7 @@ class P2PMessagingClient(config: NodeConfiguration, private val serviceIdentity: PublicKey?, private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor, private val database: CordaPersistence, + private val networkMap: NetworkMapCacheInternal, advertisedAddress: NetworkHostAndPort = serverAddress, maxMessageSize: Int ) : SingletonSerializeAsToken(), MessagingService { @@ -135,6 +147,8 @@ class P2PMessagingClient(config: NodeConfiguration, var running = false var p2pConsumer: ClientConsumer? = null var serviceConsumer: ClientConsumer? = null + var bridgeNotifyConsumer: ClientConsumer? = null + var networkChangeSubscription: Subscription? = null } private val messagesToRedeliver = database.transaction { @@ -150,12 +164,13 @@ class P2PMessagingClient(config: NodeConfiguration, private val cordaVendor = SimpleString(versionInfo.vendor) private val releaseVersion = SimpleString(versionInfo.releaseVersion) /** An executor for sending messages */ - private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1) + private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging ${myIdentity.toStringShort()}", 1) override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress) private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong() private val artemis = ArtemisMessagingClient(config, serverAddress, maxMessageSize) private val state = ThreadBox(InnerState()) + private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap()) private val handlers = CopyOnWriteArrayList() private val processedMessages = createProcessedMessage() @@ -191,11 +206,13 @@ class P2PMessagingClient(config: NodeConfiguration, state.locked { val session = artemis.start().session val inbox = RemoteInboxAddress(myIdentity).queueName + val inboxes = mutableListOf(inbox) // Create a queue, consumer and producer for handling P2P network messages. createQueueIfAbsent(inbox) p2pConsumer = session.createConsumer(inbox) if (serviceIdentity != null) { val serviceAddress = RemoteInboxAddress(serviceIdentity).queueName + inboxes += serviceAddress createQueueIfAbsent(serviceAddress) val serviceHandler = session.createConsumer(serviceAddress) serviceHandler.setMessageHandler { msg -> @@ -207,11 +224,96 @@ class P2PMessagingClient(config: NodeConfiguration, } } } + registerBridgeControl(session, inboxes) + enumerateBridges(session, inboxes) } resumeMessageRedelivery() } + private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List) { + val bridgeNotifyQueue = "$BRIDGE_NOTIFY.${myIdentity.toStringShort()}" + session.createTemporaryQueue(BRIDGE_NOTIFY, RoutingType.MULTICAST, bridgeNotifyQueue) + val bridgeConsumer = session.createConsumer(bridgeNotifyQueue) + bridgeNotifyConsumer = bridgeConsumer + bridgeConsumer.setMessageHandler { msg -> + val data: ByteArray = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) } + val notifyMessage = data.deserialize(context = SerializationDefaults.P2P_CONTEXT) + log.info(notifyMessage.toString()) + when (notifyMessage) { + is BridgeControl.BridgeToNodeSnapshotRequest -> enumerateBridges(session, inboxes) + else -> log.error("Unexpected Bridge Control message type on notify topc $notifyMessage") + } + msg.acknowledge() + } + networkChangeSubscription = networkMap.changed.subscribe { updateBridgesOnNetworkChange(it) } + } + + private fun sendBridgeControl(message: BridgeControl) { + val client = artemis.started!! + val controlPacket = message.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes + val artemisMessage = client.session.createMessage(false) + artemisMessage.writeBodyBufferBytes(controlPacket) + client.producer.send(BRIDGE_CONTROL, artemisMessage) + } + + private fun updateBridgesOnNetworkChange(change: NetworkMapCache.MapChange) { + log.info("Updating bridges on network map change: ${change.node}") + fun gatherAddresses(node: NodeInfo): Sequence { + return node.legalIdentitiesAndCerts.map { + val messagingAddress = NodeAddress(it.party.owningKey, node.addresses.first()) + BridgeEntry(messagingAddress.queueName, node.addresses, listOf(it.party.name)) + }.filter { artemis.started!!.session.queueQuery(SimpleString(it.queueName)).isExists }.asSequence() + } + + fun deployBridges(node: NodeInfo) { + gatherAddresses(node) + .forEach { + sendBridgeControl(BridgeControl.Create(myIdentity.toStringShort(), it)) + } + } + + fun destroyBridges(node: NodeInfo) { + gatherAddresses(node) + .forEach { + sendBridgeControl(BridgeControl.Delete(myIdentity.toStringShort(), it)) + } + } + + when (change) { + is NetworkMapCache.MapChange.Added -> { + deployBridges(change.node) + } + is NetworkMapCache.MapChange.Removed -> { + destroyBridges(change.node) + } + is NetworkMapCache.MapChange.Modified -> { + destroyBridges(change.previousNode) + deployBridges(change.node) + } + } + } + + private fun enumerateBridges(session: ClientSession, inboxes: List) { + val requiredBridges = mutableListOf() + fun createBridgeEntry(queueName: SimpleString) { + val keyHash = queueName.substring(PEERS_PREFIX.length) + val peers = networkMap.getNodesByOwningKeyIndex(keyHash) + for (node in peers) { + val bridge = BridgeEntry(queueName.toString(), node.addresses, node.legalIdentities.map { it.name }) + requiredBridges += bridge + knownQueues += queueName.toString() + } + } + + val queues = session.addressQuery(SimpleString("$PEERS_PREFIX#")).queueNames + for (queue in queues) { + createBridgeEntry(queue) + } + val startupMessage = BridgeControl.NodeToBridgeSnapshot(myIdentity.toStringShort(), inboxes, requiredBridges) + sendBridgeControl(startupMessage) + } + private fun resumeMessageRedelivery() { messagesToRedeliver.forEach { retryId, (message, target) -> send(message, target, retryId) @@ -361,12 +463,18 @@ class P2PMessagingClient(config: NodeConfiguration, check(artemis.started != null) val prevRunning = running running = false + networkChangeSubscription?.unsubscribe() val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice") try { c.close() } catch (e: ActiveMQObjectClosedException) { // Ignore it: this can happen if the server has gone away before we do. } + try { + bridgeNotifyConsumer!!.close() + } catch (e: ActiveMQObjectClosedException) { + // Ignore it: this can happen if the server has gone away before we do. + } p2pConsumer = null val s = serviceConsumer try { @@ -375,6 +483,7 @@ class P2PMessagingClient(config: NodeConfiguration, // Ignore it: this can happen if the server has gone away before we do. } serviceConsumer = null + knownQueues.clear() prevRunning } if (running && !nodeExecutor.isOnThread) { @@ -487,13 +596,25 @@ class P2PMessagingClient(config: NodeConfiguration, /** Attempts to create a durable queue on the broker which is bound to an address of the same name. */ private fun createQueueIfAbsent(queueName: String) { - state.alreadyLocked { - val session = artemis.started!!.session - val queueQuery = session.queueQuery(SimpleString(queueName)) - if (!queueQuery.isExists) { - log.info("Create fresh queue $queueName bound on same address") - session.createQueue(queueName, RoutingType.ANYCAST, queueName, true) + if (!knownQueues.contains(queueName)) { + state.alreadyLocked { + val session = artemis.started!!.session + val queueQuery = session.queueQuery(SimpleString(queueName)) + if (!queueQuery.isExists) { + log.info("Create fresh queue $queueName bound on same address") + session.createQueue(queueName, RoutingType.ANYCAST, queueName, true) + if (queueName.startsWith(PEERS_PREFIX)) { + val keyHash = queueName.substring(PEERS_PREFIX.length) + val peers = networkMap.getNodesByOwningKeyIndex(keyHash) + for (node in peers) { + val bridge = BridgeEntry(queueName, node.addresses, node.legalIdentities.map { it.name }) + val createBridgeMessage = BridgeControl.Create(myIdentity.toStringShort(), bridge) + sendBridgeControl(createBridgeMessage) + } + } + } } + knownQueues += queueName } } diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index bc2e599de7..f0e0c7deec 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -24,7 +24,6 @@ activeMQServer = { maxRetryIntervalMin = 3 } } -useAMQPBridges = true rpcSettings = { useSsl = false standAloneBroker = false diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index 7d9ee8a42d..7a464d548b 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -2,12 +2,9 @@ package net.corda.node.services.messaging import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.whenever -import net.corda.core.context.AuthServiceId import net.corda.core.crypto.generateKeyPair import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.internal.configureDatabase -import net.corda.node.internal.security.RPCSecurityManager -import net.corda.node.internal.security.RPCSecurityManagerImpl import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate @@ -55,7 +52,6 @@ class ArtemisMessagingTest { private lateinit var config: NodeConfiguration private lateinit var database: CordaPersistence - private lateinit var securityManager: RPCSecurityManager private var messagingClient: P2PMessagingClient? = null private var messagingServer: ArtemisMessagingServer? = null @@ -63,7 +59,6 @@ class ArtemisMessagingTest { @Before fun setUp() { - securityManager = RPCSecurityManagerImpl.fromUserList(users = emptyList(), id = AuthServiceId("TEST")) abstract class AbstractNodeConfiguration : NodeConfiguration config = rigorousMock().also { doReturn(temporaryFolder.root.toPath()).whenever(it).baseDirectory @@ -74,7 +69,6 @@ class ArtemisMessagingTest { doReturn("").whenever(it).exportJMXto doReturn(emptyList()).whenever(it).certificateChainCheckPolicies doReturn(5).whenever(it).messageRedeliveryDelaySeconds - doReturn(true).whenever(it).useAMQPBridges } LogHelper.setLevel(PersistentUniquenessProvider::class) database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock()) @@ -176,6 +170,7 @@ class ArtemisMessagingTest { null, ServiceAffinityExecutor("ArtemisMessagingTests", 1), database, + networkMapCache, maxMessageSize = maxMessageSize).apply { config.configureWithDevSSLCertificate() messagingClient = this @@ -184,7 +179,7 @@ class ArtemisMessagingTest { } private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer { - return ArtemisMessagingServer(config, local, networkMapCache, maxMessageSize).apply { + return ArtemisMessagingServer(config, local, maxMessageSize).apply { config.configureWithDevSSLCertificate() messagingServer = this } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt index 63f30dbd4a..1715b01662 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -5,7 +5,6 @@ import com.google.common.jimfs.Jimfs import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.whenever import net.corda.core.DoNotImplement -import net.corda.core.crypto.entropyToKeyPair import net.corda.core.crypto.Crypto import net.corda.core.crypto.random63BitValue import net.corda.core.identity.CordaX500Name @@ -513,6 +512,5 @@ private fun mockNodeConfiguration(): NodeConfiguration { doReturn(5).whenever(it).messageRedeliveryDelaySeconds doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec doReturn(null).whenever(it).devModeOptions - doReturn(true).whenever(it).useAMQPBridges } } From e357a881818aa27355a5144156baed3cd7dc7433 Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Fri, 26 Jan 2018 09:32:11 +0000 Subject: [PATCH 2/2] Deprecate TimeWindowChecker, make TimeWindowInvalid report exact current time and transaction time window (#2280) * Make notary service return the current time and the transaction time window along with the TimeWindowInvalid error. Deprecate TimeWindowChecker. Add a static method for validating transaction time window to reduce code duplication. --- .ci/api-current.txt | 4 +- .../kotlin/net/corda/core/flows/NotaryFlow.kt | 10 ++++- .../corda/core/node/services/NotaryService.kt | 29 +++++++++++--- .../core/node/services/TimeWindowChecker.kt | 1 + .../node/services/TimeWindowCheckerTests.kt | 39 ------------------- .../BFTNonValidatingNotaryService.kt | 9 ++--- .../node/services/transactions/BFTSMaRt.kt | 12 +----- .../RaftNonValidatingNotaryService.kt | 3 -- .../RaftValidatingNotaryService.kt | 3 -- .../transactions/SimpleNotaryService.kt | 2 - .../transactions/ValidatingNotaryService.kt | 3 -- .../corda/notarydemo/MyCustomNotaryService.kt | 1 - 12 files changed, 40 insertions(+), 76 deletions(-) delete mode 100644 core/src/test/kotlin/net/corda/core/node/services/TimeWindowCheckerTests.kt diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 493a31b91a..54472637a1 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -1304,7 +1304,7 @@ public @interface net.corda.core.flows.InitiatingFlow @org.jetbrains.annotations.NotNull public String toString() ## @net.corda.core.serialization.CordaSerializable public static final class net.corda.core.flows.NotaryError$TimeWindowInvalid extends net.corda.core.flows.NotaryError - public static final net.corda.core.flows.NotaryError$TimeWindowInvalid INSTANCE + @kotlin.jvm.JvmField @org.jetbrains.annotations.NotNull public static final net.corda.core.flows.NotaryError$TimeWindowInvalid INSTANCE ## @net.corda.core.serialization.CordaSerializable public static final class net.corda.core.flows.NotaryError$TransactionInvalid extends net.corda.core.flows.NotaryError public (Throwable) @@ -1885,7 +1885,7 @@ public final class net.corda.core.node.services.TimeWindowChecker extends java.l public () public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party) @org.jetbrains.annotations.NotNull protected org.slf4j.Logger getLog() - @org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.TimeWindowChecker getTimeWindowChecker() + @org.jetbrains.annotations.NotNull protected net.corda.core.node.services.TimeWindowChecker getTimeWindowChecker() @org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.UniquenessProvider getUniquenessProvider() @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.TransactionSignature sign(net.corda.core.crypto.SecureHash) @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.DigitalSignature$WithKey sign(byte[]) diff --git a/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt index 52a802ed0a..395bc6635e 100644 --- a/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/core/flows/NotaryFlow.kt @@ -18,6 +18,7 @@ import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.unwrap import java.security.SignatureException +import java.time.Instant import java.util.function.Predicate class NotaryFlow { @@ -167,7 +168,14 @@ sealed class NotaryError { } /** Occurs when time specified in the [TimeWindow] command is outside the allowed tolerance. */ - object TimeWindowInvalid : NotaryError() + data class TimeWindowInvalid(val currentTime: Instant, val txTimeWindow: TimeWindow) : NotaryError() { + override fun toString() = "Current time $currentTime is outside the time bounds specified by the transaction: $txTimeWindow" + + companion object { + @JvmField @Deprecated("Here only for binary compatibility purposes, do not use.") + val INSTANCE = TimeWindowInvalid(Instant.EPOCH, TimeWindow.fromOnly(Instant.EPOCH)) + } + } data class TransactionInvalid(val cause: Throwable) : NotaryError() { override fun toString() = cause.toString() diff --git a/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt b/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt index 5fff5d54ce..fc24174df4 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt @@ -12,6 +12,7 @@ import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger import org.slf4j.Logger import java.security.PublicKey +import java.time.Clock abstract class NotaryService : SingletonSerializeAsToken() { companion object { @@ -27,6 +28,24 @@ abstract class NotaryService : SingletonSerializeAsToken() { if (custom) append(".custom") }.toString() } + + /** + * Checks if the current instant provided by the clock falls within the specified time window. + * + * @throws NotaryException if current time is outside the specified time window. The exception contains + * the [NotaryError.TimeWindowInvalid] error. + */ + @JvmStatic + @Throws(NotaryException::class) + fun validateTimeWindow(clock: Clock, timeWindow: TimeWindow?) { + if (timeWindow == null) return + val currentTime = clock.instant() + if (currentTime !in timeWindow) { + throw NotaryException( + NotaryError.TimeWindowInvalid(currentTime, timeWindow) + ) + } + } } abstract val services: ServiceHub @@ -52,14 +71,9 @@ abstract class TrustedAuthorityNotaryService : NotaryService() { } protected open val log: Logger get() = staticLog - // TODO: specify the valid time window in config, and convert TimeWindowChecker to a utility method - protected abstract val timeWindowChecker: TimeWindowChecker protected abstract val uniquenessProvider: UniquenessProvider - fun validateTimeWindow(t: TimeWindow?) { - if (t != null && !timeWindowChecker.isValid(t)) - throw NotaryException(NotaryError.TimeWindowInvalid) - } + fun validateTimeWindow(t: TimeWindow?) = NotaryService.validateTimeWindow(services.clock, t) /** * A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that @@ -98,4 +112,7 @@ abstract class TrustedAuthorityNotaryService : NotaryService() { val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID)) return services.keyManagementService.sign(signableData, notaryIdentityKey) } + + @Deprecated("This property is no longer used") @Suppress("DEPRECATION") + protected open val timeWindowChecker: TimeWindowChecker get() = throw UnsupportedOperationException("No default implementation, need to override") } \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/node/services/TimeWindowChecker.kt b/core/src/main/kotlin/net/corda/core/node/services/TimeWindowChecker.kt index fe1e95d96b..a4eeb0b487 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/TimeWindowChecker.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/TimeWindowChecker.kt @@ -6,6 +6,7 @@ import java.time.Clock /** * Checks if the current instant provided by the input clock falls within the provided time-window. */ +@Deprecated("This class is no longer used") class TimeWindowChecker(val clock: Clock = Clock.systemUTC()) { fun isValid(timeWindow: TimeWindow): Boolean = clock.instant() in timeWindow } diff --git a/core/src/test/kotlin/net/corda/core/node/services/TimeWindowCheckerTests.kt b/core/src/test/kotlin/net/corda/core/node/services/TimeWindowCheckerTests.kt deleted file mode 100644 index 61d37b5211..0000000000 --- a/core/src/test/kotlin/net/corda/core/node/services/TimeWindowCheckerTests.kt +++ /dev/null @@ -1,39 +0,0 @@ -package net.corda.core.node.services - -import net.corda.core.contracts.TimeWindow -import net.corda.core.utilities.seconds -import org.junit.Test -import java.time.Clock -import java.time.Instant -import java.time.ZoneOffset -import kotlin.test.assertFalse -import kotlin.test.assertTrue - -class TimeWindowCheckerTests { - val clock: Clock = Clock.fixed(Instant.now(), ZoneOffset.UTC) - val timeWindowChecker = TimeWindowChecker(clock) - - @Test - fun `should return true for valid time-window`() { - val now = clock.instant() - val timeWindowBetween = TimeWindow.between(now - 10.seconds, now + 10.seconds) - val timeWindowFromOnly = TimeWindow.fromOnly(now - 10.seconds) - val timeWindowUntilOnly = TimeWindow.untilOnly(now + 10.seconds) - assertTrue { timeWindowChecker.isValid(timeWindowBetween) } - assertTrue { timeWindowChecker.isValid(timeWindowFromOnly) } - assertTrue { timeWindowChecker.isValid(timeWindowUntilOnly) } - } - - @Test - fun `should return false for invalid time-window`() { - val now = clock.instant() - val timeWindowBetweenPast = TimeWindow.between(now - 10.seconds, now - 2.seconds) - val timeWindowBetweenFuture = TimeWindow.between(now + 2.seconds, now + 10.seconds) - val timeWindowFromOnlyFuture = TimeWindow.fromOnly(now + 10.seconds) - val timeWindowUntilOnlyPast = TimeWindow.untilOnly(now - 10.seconds) - assertFalse { timeWindowChecker.isValid(timeWindowBetweenPast) } - assertFalse { timeWindowChecker.isValid(timeWindowBetweenFuture) } - assertFalse { timeWindowChecker.isValid(timeWindowFromOnlyFuture) } - assertFalse { timeWindowChecker.isValid(timeWindowUntilOnlyPast) } - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt index 9f6ffe1d08..979e2d2ef0 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt @@ -13,7 +13,6 @@ import net.corda.core.flows.NotaryException import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.node.services.NotaryService -import net.corda.core.node.services.TimeWindowChecker import net.corda.core.node.services.UniquenessProvider import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.deserialize @@ -54,8 +53,7 @@ class BFTNonValidatingNotaryService( // Replica startup must be in parallel with other replicas, otherwise the constructor may not return: thread(name = "BFT SMaRt replica $replicaId init", isDaemon = true) { configHandle.use { - val timeWindowChecker = TimeWindowChecker(services.clock) - val replica = Replica(it, replicaId, { createMap() }, services, notaryIdentityKey, timeWindowChecker) + val replica = Replica(it, replicaId, { createMap() }, services, notaryIdentityKey) replicaHolder.set(replica) log.info("BFT SMaRt replica $replicaId is running.") } @@ -131,8 +129,7 @@ class BFTNonValidatingNotaryService( replicaId: Int, createMap: () -> AppendOnlyPersistentMap, services: ServiceHubInternal, - notaryIdentityKey: PublicKey, - timeWindowChecker: TimeWindowChecker) : BFTSMaRt.Replica(config, replicaId, createMap, services, notaryIdentityKey, timeWindowChecker) { + notaryIdentityKey: PublicKey) : BFTSMaRt.Replica(config, replicaId, createMap, services, notaryIdentityKey) { override fun executeCommand(command: ByteArray): ByteArray { val request = command.deserialize() @@ -146,7 +143,7 @@ class BFTNonValidatingNotaryService( val id = ftx.id val inputs = ftx.inputs val notary = ftx.notary - validateTimeWindow(ftx.timeWindow) + NotaryService.validateTimeWindow(services.clock, ftx.timeWindow) if (notary !in services.myInfo.legalIdentities) throw NotaryException(NotaryError.WrongNotary) commitInputStates(inputs, id, callerIdentity) log.debug { "Inputs committed successfully, signing $id" } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt index 67e49887ea..84ca283b48 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt @@ -13,14 +13,12 @@ import bftsmart.tom.server.defaultservices.DefaultRecoverable import bftsmart.tom.server.defaultservices.DefaultReplier import bftsmart.tom.util.Extractor import net.corda.core.contracts.StateRef -import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.* import net.corda.core.flows.NotaryError import net.corda.core.flows.NotaryException import net.corda.core.identity.Party import net.corda.core.internal.declaredField import net.corda.core.internal.toTypedArray -import net.corda.core.node.services.TimeWindowChecker import net.corda.core.node.services.UniquenessProvider import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.CordaSerializable @@ -178,8 +176,7 @@ object BFTSMaRt { createMap: () -> AppendOnlyPersistentMap, protected val services: ServiceHubInternal, - protected val notaryIdentityKey: PublicKey, - private val timeWindowChecker: TimeWindowChecker) : DefaultRecoverable() { + protected val notaryIdentityKey: PublicKey) : DefaultRecoverable() { companion object { private val log = contextLogger() } @@ -218,7 +215,7 @@ object BFTSMaRt { /** * Implement logic to execute the command and commit the transaction to the log. - * Helper methods are provided for transaction processing: [commitInputStates], [validateTimeWindow], and [sign]. + * Helper methods are provided for transaction processing: [commitInputStates], and [sign]. */ abstract fun executeCommand(command: ByteArray): ByteArray? @@ -245,11 +242,6 @@ object BFTSMaRt { } } - protected fun validateTimeWindow(t: TimeWindow?) { - if (t != null && !timeWindowChecker.isValid(t)) - throw NotaryException(NotaryError.TimeWindowInvalid) - } - protected fun sign(bytes: ByteArray): DigitalSignature.WithKey { return services.database.transaction { services.keyManagementService.sign(bytes, notaryIdentityKey) } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt index e672380398..63fef9fdfe 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt @@ -3,7 +3,6 @@ package net.corda.node.services.transactions import net.corda.core.flows.FlowSession import net.corda.core.flows.NotaryFlow import net.corda.core.node.ServiceHub -import net.corda.core.node.services.TimeWindowChecker import net.corda.core.node.services.TrustedAuthorityNotaryService import java.security.PublicKey @@ -13,8 +12,6 @@ class RaftNonValidatingNotaryService( override val notaryIdentityKey: PublicKey, override val uniquenessProvider: RaftUniquenessProvider ) : TrustedAuthorityNotaryService() { - override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock) - override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service { return NonValidatingNotaryFlow(otherPartySession, this) } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt index 3e4899ae0a..0ddbb14ee0 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt @@ -3,7 +3,6 @@ package net.corda.node.services.transactions import net.corda.core.flows.FlowSession import net.corda.core.flows.NotaryFlow import net.corda.core.node.ServiceHub -import net.corda.core.node.services.TimeWindowChecker import net.corda.core.node.services.TrustedAuthorityNotaryService import java.security.PublicKey @@ -13,8 +12,6 @@ class RaftValidatingNotaryService( override val notaryIdentityKey: PublicKey, override val uniquenessProvider: RaftUniquenessProvider ) : TrustedAuthorityNotaryService() { - override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock) - override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service { return ValidatingNotaryFlow(otherPartySession, this) } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt index cb4401cae5..f9ace1e95e 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt @@ -2,14 +2,12 @@ package net.corda.node.services.transactions import net.corda.core.flows.FlowSession import net.corda.core.flows.NotaryFlow -import net.corda.core.node.services.TimeWindowChecker import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.node.services.api.ServiceHubInternal import java.security.PublicKey /** A simple Notary service that does not perform transaction validation */ class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { - override val timeWindowChecker = TimeWindowChecker(services.clock) override val uniquenessProvider = PersistentUniquenessProvider() override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = NonValidatingNotaryFlow(otherPartySession, this) diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt index 5e687c3b6d..01da0911dd 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt @@ -2,15 +2,12 @@ package net.corda.node.services.transactions import net.corda.core.flows.FlowSession import net.corda.core.flows.NotaryFlow -import net.corda.core.node.services.TimeWindowChecker import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.node.services.api.ServiceHubInternal import java.security.PublicKey /** A Notary service that validates the transaction chain of the submitted transaction before committing it */ class ValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { - override val timeWindowChecker = TimeWindowChecker(services.clock) - override val uniquenessProvider = PersistentUniquenessProvider() override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = ValidatingNotaryFlow(otherPartySession, this) diff --git a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt index 55adae9ee2..1dd9cda94b 100644 --- a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt +++ b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt @@ -22,7 +22,6 @@ import java.security.SignatureException // START 1 @CordaService class MyCustomValidatingNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { - override val timeWindowChecker = TimeWindowChecker(services.clock) override val uniquenessProvider = PersistentUniquenessProvider() override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = MyValidatingNotaryFlow(otherPartySession, this)