Merge from OS to ENT including Bridge manager work

This commit is contained in:
Matthew Nesbit 2018-01-26 16:55:43 +00:00
commit 054c44d4bc
30 changed files with 392 additions and 466 deletions

View File

@ -1323,7 +1323,7 @@ public @interface net.corda.core.flows.InitiatingFlow
@org.jetbrains.annotations.NotNull public String toString() @org.jetbrains.annotations.NotNull public String toString()
## ##
@net.corda.core.serialization.CordaSerializable public static final class net.corda.core.flows.NotaryError$TimeWindowInvalid extends net.corda.core.flows.NotaryError @net.corda.core.serialization.CordaSerializable public static final class net.corda.core.flows.NotaryError$TimeWindowInvalid extends net.corda.core.flows.NotaryError
public static final net.corda.core.flows.NotaryError$TimeWindowInvalid INSTANCE @kotlin.jvm.JvmField @org.jetbrains.annotations.NotNull public static final net.corda.core.flows.NotaryError$TimeWindowInvalid INSTANCE
## ##
@net.corda.core.serialization.CordaSerializable public static final class net.corda.core.flows.NotaryError$TransactionInvalid extends net.corda.core.flows.NotaryError @net.corda.core.serialization.CordaSerializable public static final class net.corda.core.flows.NotaryError$TransactionInvalid extends net.corda.core.flows.NotaryError
public <init>(Throwable) public <init>(Throwable)
@ -1921,7 +1921,7 @@ public final class net.corda.core.node.services.TimeWindowChecker extends java.l
public <init>() public <init>()
public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party) public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party)
@org.jetbrains.annotations.NotNull protected org.slf4j.Logger getLog() @org.jetbrains.annotations.NotNull protected org.slf4j.Logger getLog()
@org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.TimeWindowChecker getTimeWindowChecker() @org.jetbrains.annotations.NotNull protected net.corda.core.node.services.TimeWindowChecker getTimeWindowChecker()
@org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.UniquenessProvider getUniquenessProvider() @org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.UniquenessProvider getUniquenessProvider()
@org.jetbrains.annotations.NotNull public final net.corda.core.crypto.TransactionSignature sign(net.corda.core.crypto.SecureHash) @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.TransactionSignature sign(net.corda.core.crypto.SecureHash)
@org.jetbrains.annotations.NotNull public final net.corda.core.crypto.DigitalSignature$WithKey sign(byte[]) @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.DigitalSignature$WithKey sign(byte[])

View File

@ -18,6 +18,7 @@ import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.unwrap import net.corda.core.utilities.unwrap
import java.security.SignatureException import java.security.SignatureException
import java.time.Instant
import java.util.function.Predicate import java.util.function.Predicate
class NotaryFlow { class NotaryFlow {
@ -167,7 +168,14 @@ sealed class NotaryError {
} }
/** Occurs when time specified in the [TimeWindow] command is outside the allowed tolerance. */ /** Occurs when time specified in the [TimeWindow] command is outside the allowed tolerance. */
object TimeWindowInvalid : NotaryError() data class TimeWindowInvalid(val currentTime: Instant, val txTimeWindow: TimeWindow) : NotaryError() {
override fun toString() = "Current time $currentTime is outside the time bounds specified by the transaction: $txTimeWindow"
companion object {
@JvmField @Deprecated("Here only for binary compatibility purposes, do not use.")
val INSTANCE = TimeWindowInvalid(Instant.EPOCH, TimeWindow.fromOnly(Instant.EPOCH))
}
}
data class TransactionInvalid(val cause: Throwable) : NotaryError() { data class TransactionInvalid(val cause: Throwable) : NotaryError() {
override fun toString() = cause.toString() override fun toString() = cause.toString()

View File

@ -12,6 +12,7 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import org.slf4j.Logger import org.slf4j.Logger
import java.security.PublicKey import java.security.PublicKey
import java.time.Clock
abstract class NotaryService : SingletonSerializeAsToken() { abstract class NotaryService : SingletonSerializeAsToken() {
companion object { companion object {
@ -27,6 +28,24 @@ abstract class NotaryService : SingletonSerializeAsToken() {
if (custom) append(".custom") if (custom) append(".custom")
}.toString() }.toString()
} }
/**
* Checks if the current instant provided by the clock falls within the specified time window.
*
* @throws NotaryException if current time is outside the specified time window. The exception contains
* the [NotaryError.TimeWindowInvalid] error.
*/
@JvmStatic
@Throws(NotaryException::class)
fun validateTimeWindow(clock: Clock, timeWindow: TimeWindow?) {
if (timeWindow == null) return
val currentTime = clock.instant()
if (currentTime !in timeWindow) {
throw NotaryException(
NotaryError.TimeWindowInvalid(currentTime, timeWindow)
)
}
}
} }
abstract val services: ServiceHub abstract val services: ServiceHub
@ -52,14 +71,9 @@ abstract class TrustedAuthorityNotaryService : NotaryService() {
} }
protected open val log: Logger get() = staticLog protected open val log: Logger get() = staticLog
// TODO: specify the valid time window in config, and convert TimeWindowChecker to a utility method
protected abstract val timeWindowChecker: TimeWindowChecker
protected abstract val uniquenessProvider: UniquenessProvider protected abstract val uniquenessProvider: UniquenessProvider
fun validateTimeWindow(t: TimeWindow?) { fun validateTimeWindow(t: TimeWindow?) = NotaryService.validateTimeWindow(services.clock, t)
if (t != null && !timeWindowChecker.isValid(t))
throw NotaryException(NotaryError.TimeWindowInvalid)
}
/** /**
* A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that * A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that
@ -98,4 +112,7 @@ abstract class TrustedAuthorityNotaryService : NotaryService() {
val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID)) val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID))
return services.keyManagementService.sign(signableData, notaryIdentityKey) return services.keyManagementService.sign(signableData, notaryIdentityKey)
} }
@Deprecated("This property is no longer used") @Suppress("DEPRECATION")
protected open val timeWindowChecker: TimeWindowChecker get() = throw UnsupportedOperationException("No default implementation, need to override")
} }

View File

@ -6,6 +6,7 @@ import java.time.Clock
/** /**
* Checks if the current instant provided by the input clock falls within the provided time-window. * Checks if the current instant provided by the input clock falls within the provided time-window.
*/ */
@Deprecated("This class is no longer used")
class TimeWindowChecker(val clock: Clock = Clock.systemUTC()) { class TimeWindowChecker(val clock: Clock = Clock.systemUTC()) {
fun isValid(timeWindow: TimeWindow): Boolean = clock.instant() in timeWindow fun isValid(timeWindow: TimeWindow): Boolean = clock.instant() in timeWindow
} }

View File

@ -1,39 +0,0 @@
package net.corda.core.node.services
import net.corda.core.contracts.TimeWindow
import net.corda.core.utilities.seconds
import org.junit.Test
import java.time.Clock
import java.time.Instant
import java.time.ZoneOffset
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class TimeWindowCheckerTests {
val clock: Clock = Clock.fixed(Instant.now(), ZoneOffset.UTC)
val timeWindowChecker = TimeWindowChecker(clock)
@Test
fun `should return true for valid time-window`() {
val now = clock.instant()
val timeWindowBetween = TimeWindow.between(now - 10.seconds, now + 10.seconds)
val timeWindowFromOnly = TimeWindow.fromOnly(now - 10.seconds)
val timeWindowUntilOnly = TimeWindow.untilOnly(now + 10.seconds)
assertTrue { timeWindowChecker.isValid(timeWindowBetween) }
assertTrue { timeWindowChecker.isValid(timeWindowFromOnly) }
assertTrue { timeWindowChecker.isValid(timeWindowUntilOnly) }
}
@Test
fun `should return false for invalid time-window`() {
val now = clock.instant()
val timeWindowBetweenPast = TimeWindow.between(now - 10.seconds, now - 2.seconds)
val timeWindowBetweenFuture = TimeWindow.between(now + 2.seconds, now + 10.seconds)
val timeWindowFromOnlyFuture = TimeWindow.fromOnly(now + 10.seconds)
val timeWindowUntilOnlyPast = TimeWindow.untilOnly(now - 10.seconds)
assertFalse { timeWindowChecker.isValid(timeWindowBetweenPast) }
assertFalse { timeWindowChecker.isValid(timeWindowBetweenFuture) }
assertFalse { timeWindowChecker.isValid(timeWindowFromOnlyFuture) }
assertFalse { timeWindowChecker.isValid(timeWindowUntilOnlyPast) }
}
}

View File

@ -191,6 +191,9 @@ R3 Corda 3.0 Developer Preview
* Enterprise Corda only: Compatibility with PostgreSQL 9.6 database. * Enterprise Corda only: Compatibility with PostgreSQL 9.6 database.
* Move to a message based control of peer to peer bridge formation to allow for future out of process bridging components.
This removes the legacy Artemis bridges completely, so the ``useAMQPBridges`` configuration property has been removed.
.. _changelog_v2: .. _changelog_v2:
Corda 2.0 Corda 2.0

View File

@ -206,11 +206,6 @@ path to the node's base directory.
:exportJMXTo: If set to ``http``, will enable JMX metrics reporting via the Jolokia HTTP/JSON agent. :exportJMXTo: If set to ``http``, will enable JMX metrics reporting via the Jolokia HTTP/JSON agent.
Default Jolokia access url is http://127.0.0.1:7005/jolokia/ Default Jolokia access url is http://127.0.0.1:7005/jolokia/
.. _config_amqp_bridge:
:useAMQPBridges: Optionally can be set to ``false`` to use Artemis CORE Bridges for peer-to-peer communications.
Otherwise, defaults to ``true`` and the AMQP 1.0 protocol will be used for message transfer between nodes.
:transactionCacheSizeMegaBytes: Optionally specify how much memory should be used for caching of ledger transactions in memory. :transactionCacheSizeMegaBytes: Optionally specify how much memory should be used for caching of ledger transactions in memory.
Otherwise defaults to 8MB plus 5% of all heap memory above 300MB. Otherwise defaults to 8MB plus 5% of all heap memory above 300MB.

View File

@ -25,6 +25,8 @@ class ArtemisMessagingComponent {
const val INTERNAL_PREFIX = "internal." const val INTERNAL_PREFIX = "internal."
const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers." //TODO Come up with better name for common peers/services queue const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers." //TODO Come up with better name for common peers/services queue
const val P2P_PREFIX = "p2p.inbound." const val P2P_PREFIX = "p2p.inbound."
const val BRIDGE_CONTROL = "${INTERNAL_PREFIX}bridge.control"
const val BRIDGE_NOTIFY = "${INTERNAL_PREFIX}bridge.notify"
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications" const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
} }

View File

@ -0,0 +1,50 @@
package net.corda.nodeapi.internal
import net.corda.core.identity.CordaX500Name
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.NetworkHostAndPort
/**
* The information required to construct a bridge to a remote peer.
* @property queueName The local source queue from which to move messages.
* @property targets The list of TCP connection targets on which the peer resides
* @property legalNames The list of acceptable [CordaX500Name] names that should be presented as subject of the validated peer TLS certificate.
*/
@CordaSerializable
data class BridgeEntry(val queueName: String, val targets: List<NetworkHostAndPort>, val legalNames: List<CordaX500Name>)
sealed class BridgeControl {
/**
* This message is sent on node start to inform any bridges of valid inbound peer-to-peer topics and pre-existing outbound queues needing bridging.
* @property nodeIdentity This is used for informational purposes to identify the originating node instance.
* @property inboxQueues The list of P2P inbox queue names/addresses, which could be used to filter inbound messages and prevent any identity spoofing.
* @property sendQueues The list [BridgeEntry] for all pre-existing local queues requiring a bridge to a remote peer.
*/
@CordaSerializable
data class NodeToBridgeSnapshot(val nodeIdentity: String, val inboxQueues: List<String>, val sendQueues: List<BridgeEntry>) : BridgeControl()
/**
* This message is sent on bridge start to re-request NodeToBridgeSnapshot information from all nodes on the broker.
* @property bridgeIdentity This is used for informational purposes to identify the originating bridge instance.
*/
@CordaSerializable
data class BridgeToNodeSnapshotRequest(val bridgeIdentity: String) : BridgeControl()
/**
* This message is sent to any active bridges to create a new bridge if one does not already exist. It may also be sent if updated
* information arrives from the network map to allow connection details of a pre-existing queue to now be resolved.
* @property nodeIdentity This is used for informational purposes to identify the originating node instance.
* @property bridgeInfo The connection details of the new bridge.
*/
@CordaSerializable
data class Create(val nodeIdentity: String, val bridgeInfo: BridgeEntry) : BridgeControl()
/**
* This message is sent to any active bridges to tear down an existing bridge. Typically this is only done when there is a change in network map details for a peer.
* The source queue is not affected by this operation and it is the responsibility of the node to ensure there are no unsent messages and to delete the durable queue.
* @property nodeIdentity This is used for informational purposes to identify the originating node instance.
* @property bridgeInfo The connection details of the bridge to be removed
*/
@CordaSerializable
data class Delete(val nodeIdentity: String, val bridgeInfo: BridgeEntry) : BridgeControl()
}

View File

@ -1,18 +1,18 @@
package net.corda.node.amqp package net.corda.node.amqp
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.crypto.toStringShort import net.corda.core.crypto.toStringShort
import net.corda.core.internal.div import net.corda.core.internal.div
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.protonwrapper.netty.AMQPServer import net.corda.node.internal.protonwrapper.netty.AMQPServer
import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.* import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.AMQPBridgeManager
import net.corda.node.services.messaging.ArtemisMessagingClient import net.corda.node.services.messaging.ArtemisMessagingClient
import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.BridgeManager
import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.testing.core.* import net.corda.testing.core.*
import net.corda.testing.internal.rigorousMock import net.corda.testing.internal.rigorousMock
@ -24,7 +24,6 @@ import org.junit.Ignore
import org.junit.Rule import org.junit.Rule
import org.junit.Test import org.junit.Test
import org.junit.rules.TemporaryFolder import org.junit.rules.TemporaryFolder
import rx.Observable
import java.util.* import java.util.*
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertNotEquals import kotlin.test.assertNotEquals
@ -52,7 +51,7 @@ class AMQPBridgeTest {
fun `test acked and nacked messages`() { fun `test acked and nacked messages`() {
// Create local queue // Create local queue
val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort() val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort()
val (artemisServer, artemisClient) = createArtemis(sourceQueueName) val (artemisServer, artemisClient, bridgeManager) = createArtemis(sourceQueueName)
// Pre-populate local queue with 3 messages // Pre-populate local queue with 3 messages
val artemis = artemisClient.started!! val artemis = artemisClient.started!!
@ -117,54 +116,25 @@ class AMQPBridgeTest {
} }
artemis.producer.send(sourceQueueName, artemisMessage) artemis.producer.send(sourceQueueName, artemisMessage)
val received5 = receive.next()
val messageID5 = received5.applicationProperties["CountProp"] as Int
assertArrayEquals("Test_end".toByteArray(), received5.payload)
assertEquals(-1, messageID5) // next message should be in order
received5.complete(true)
while (true) {
val received5 = receive.next()
val messageID5 = received5.applicationProperties["CountProp"] as Int
if (messageID5 != 2) { // we may get a duplicate of the interrupted message, in which case skip
assertEquals(-1, messageID5) // next message should be in order though
assertArrayEquals("Test_end".toByteArray(), received5.payload)
break
}
received5.complete(true)
}
bridgeManager.stop()
amqpServer.stop() amqpServer.stop()
artemisClient.stop() artemisClient.stop()
artemisServer.stop() artemisServer.stop()
} }
@Test private fun createArtemis(sourceQueueName: String?): Triple<ArtemisMessagingServer, ArtemisMessagingClient, BridgeManager> {
fun `Test legacy bridge still works`() {
// Create local queue
val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort()
val (artemisLegacyServer, artemisLegacyClient) = createLegacyArtemis(sourceQueueName)
val (artemisServer, artemisClient) = createArtemis(null)
val inbox = ArtemisMessagingComponent.RemoteInboxAddress(BOB.party).queueName
artemisClient.started!!.session.createQueue(inbox, RoutingType.ANYCAST, inbox, true)
val artemis = artemisLegacyClient.started!!
for (i in 0 until 3) {
val artemisMessage = artemis.session.createMessage(true).apply {
putIntProperty("CountProp", i)
writeBodyBufferBytes("Test$i".toByteArray())
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
artemis.producer.send(sourceQueueName, artemisMessage)
}
val subs = artemisClient.started!!.session.createConsumer(inbox)
for (i in 0 until 3) {
val msg = subs.receive()
val messageBody = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) }
assertArrayEquals("Test$i".toByteArray(), messageBody)
assertEquals(i, msg.getIntProperty("CountProp"))
}
artemisClient.stop()
artemisServer.stop()
artemisLegacyClient.stop()
artemisLegacyServer.stop()
}
private fun createArtemis(sourceQueueName: String?): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also { val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory
doReturn(ALICE_NAME).whenever(it).myLegalName doReturn(ALICE_NAME).whenever(it).myLegalName
@ -173,50 +143,21 @@ class AMQPBridgeTest {
doReturn(artemisAddress).whenever(it).p2pAddress doReturn(artemisAddress).whenever(it).p2pAddress
doReturn("").whenever(it).exportJMXto doReturn("").whenever(it).exportJMXto
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(true).whenever(it).useAMQPBridges
} }
artemisConfig.configureWithDevSSLCertificate() artemisConfig.configureWithDevSSLCertificate()
val networkMap = rigorousMock<NetworkMapCacheInternal>().also { val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, MAX_MESSAGE_SIZE)
doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed
doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any())
}
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, networkMap, MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE) val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE)
artemisServer.start() artemisServer.start()
artemisClient.start() artemisClient.start()
val bridgeManager = AMQPBridgeManager(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE)
bridgeManager.start()
val artemis = artemisClient.started!! val artemis = artemisClient.started!!
if (sourceQueueName != null) { if (sourceQueueName != null) {
// Local queue for outgoing messages // Local queue for outgoing messages
artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true) artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true)
bridgeManager.deployBridge(sourceQueueName, amqpAddress, setOf(BOB.name))
} }
return Pair(artemisServer, artemisClient) return Triple(artemisServer, artemisClient, bridgeManager)
}
private fun createLegacyArtemis(sourceQueueName: String): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(temporaryFolder.root.toPath() / "artemis2").whenever(it).baseDirectory
doReturn(BOB_NAME).whenever(it).myLegalName
doReturn("trustpass").whenever(it).trustStorePassword
doReturn("cordacadevpass").whenever(it).keyStorePassword
doReturn(artemisAddress).whenever(it).p2pAddress
doReturn("").whenever(it).exportJMXto
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(false).whenever(it).useAMQPBridges
doReturn(ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0))).whenever(it).activeMQServer
}
artemisConfig.configureWithDevSSLCertificate()
val networkMap = rigorousMock<NetworkMapCacheInternal>().also {
doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed
doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any())
}
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort2, networkMap, MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress2, MAX_MESSAGE_SIZE)
artemisServer.start()
artemisClient.start()
val artemis = artemisClient.started!!
// Local queue for outgoing messages
artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true)
return Pair(artemisServer, artemisClient)
} }
private fun createAMQPServer(): AMQPServer { private fun createAMQPServer(): AMQPServer {

View File

@ -6,13 +6,11 @@ import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.nio.NioEventLoopGroup
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.div import net.corda.core.internal.div
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.toFuture import net.corda.core.toFuture
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.protonwrapper.messages.MessageStatus import net.corda.node.internal.protonwrapper.messages.MessageStatus
import net.corda.node.internal.protonwrapper.netty.AMQPClient import net.corda.node.internal.protonwrapper.netty.AMQPClient
import net.corda.node.internal.protonwrapper.netty.AMQPServer import net.corda.node.internal.protonwrapper.netty.AMQPServer
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.config.configureWithDevSSLCertificate
@ -27,7 +25,6 @@ import org.junit.Assert.assertArrayEquals
import org.junit.Rule import org.junit.Rule
import org.junit.Test import org.junit.Test
import org.junit.rules.TemporaryFolder import org.junit.rules.TemporaryFolder
import rx.Observable.never
import kotlin.test.assertEquals import kotlin.test.assertEquals
class ProtonWrapperTests { class ProtonWrapperTests {
@ -227,14 +224,10 @@ class ProtonWrapperTests {
doReturn(NetworkHostAndPort("0.0.0.0", artemisPort)).whenever(it).p2pAddress doReturn(NetworkHostAndPort("0.0.0.0", artemisPort)).whenever(it).p2pAddress
doReturn("").whenever(it).exportJMXto doReturn("").whenever(it).exportJMXto
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(true).whenever(it).useAMQPBridges
} }
artemisConfig.configureWithDevSSLCertificate() artemisConfig.configureWithDevSSLCertificate()
val networkMap = rigorousMock<NetworkMapCacheInternal>().also { val server = ArtemisMessagingServer(artemisConfig, artemisPort, MAX_MESSAGE_SIZE)
doReturn(never<NetworkMapCache.MapChange>()).whenever(it).changed
}
val server = ArtemisMessagingServer(artemisConfig, artemisPort, networkMap, MAX_MESSAGE_SIZE)
val client = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", artemisPort), MAX_MESSAGE_SIZE) val client = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", artemisPort), MAX_MESSAGE_SIZE)
server.start() server.start()
client.start() client.start()

View File

@ -137,6 +137,7 @@ open class Node(configuration: NodeConfiguration,
override lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor override lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor
private var messageBroker: ArtemisMessagingServer? = null private var messageBroker: ArtemisMessagingServer? = null
private var bridgeControlListener: BridgeControlListener? = null
private var rpcBroker: ArtemisBroker? = null private var rpcBroker: ArtemisBroker? = null
private var shutdownHook: ShutdownHook? = null private var shutdownHook: ShutdownHook? = null
@ -152,6 +153,7 @@ open class Node(configuration: NodeConfiguration,
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker() val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress) else startLocalRpcBroker() val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress) else startLocalRpcBroker()
val advertisedAddress = info.addresses.single() val advertisedAddress = info.addresses.single()
bridgeControlListener = BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize)
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString()) printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())
rpcServerAddresses?.let { rpcServerAddresses?.let {
@ -171,6 +173,7 @@ open class Node(configuration: NodeConfiguration,
serviceIdentity, serviceIdentity,
serverThread, serverThread,
database, database,
services.networkMapCache,
advertisedAddress, advertisedAddress,
networkParameters.maxMessageSize) networkParameters.maxMessageSize)
} }
@ -194,7 +197,7 @@ open class Node(configuration: NodeConfiguration,
private fun makeLocalMessageBroker(): NetworkHostAndPort { private fun makeLocalMessageBroker(): NetworkHostAndPort {
with(configuration) { with(configuration) {
messageBroker = ArtemisMessagingServer(this, p2pAddress.port, services.networkMapCache, networkParameters.maxMessageSize) messageBroker = ArtemisMessagingServer(this, p2pAddress.port, networkParameters.maxMessageSize)
return NetworkHostAndPort("localhost", p2pAddress.port) return NetworkHostAndPort("localhost", p2pAddress.port)
} }
} }
@ -257,6 +260,11 @@ open class Node(configuration: NodeConfiguration,
runOnStop += this::close runOnStop += this::close
start() start()
} }
// Start P2P bridge service
bridgeControlListener?.apply {
runOnStop += this::stop
start()
}
// Start up the MQ clients. // Start up the MQ clients.
rpcMessagingClient?.run { rpcMessagingClient?.run {
runOnStop += this::close runOnStop += this::close

View File

@ -402,6 +402,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
private fun encodePayloadBytes(msg: SendableMessageImpl): ByteBuf { private fun encodePayloadBytes(msg: SendableMessageImpl): ByteBuf {
val message = Proton.message() as ProtonJMessage val message = Proton.message() as ProtonJMessage
message.body = Data(Binary(msg.payload)) message.body = Data(Binary(msg.payload))
message.isDurable = true
message.properties = Properties() message.properties = Properties()
val appProperties = HashMap(msg.applicationProperties) val appProperties = HashMap(msg.applicationProperties)
//TODO We shouldn't have to do this, but Artemis Server doesn't set the header on AMQP packets. //TODO We shouldn't have to do this, but Artemis Server doesn't set the header on AMQP packets.

View File

@ -48,7 +48,6 @@ interface NodeConfiguration : NodeSSLConfiguration {
val sshd: SSHDConfiguration? val sshd: SSHDConfiguration?
val database: DatabaseConfig val database: DatabaseConfig
val relay: RelayConfiguration? val relay: RelayConfiguration?
val useAMQPBridges: Boolean get() = true
val transactionCacheSizeBytes: Long get() = defaultTransactionCacheSize val transactionCacheSizeBytes: Long get() = defaultTransactionCacheSize
val attachmentContentCacheSizeBytes: Long get() = defaultAttachmentContentCacheSize val attachmentContentCacheSizeBytes: Long get() = defaultAttachmentContentCacheSize
val attachmentCacheBound: Long get() = defaultAttachmentCacheBound val attachmentCacheBound: Long get() = defaultAttachmentCacheBound
@ -161,7 +160,6 @@ data class NodeConfigurationImpl(
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(), override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(),
override val sshd: SSHDConfiguration? = null, override val sshd: SSHDConfiguration? = null,
override val database: DatabaseConfig = DatabaseConfig(exportHibernateJMXStatistics = devMode), override val database: DatabaseConfig = DatabaseConfig(exportHibernateJMXStatistics = devMode),
override val useAMQPBridges: Boolean = true,
private val transactionCacheSizeMegaBytes: Int? = null, private val transactionCacheSizeMegaBytes: Int? = null,
private val attachmentContentCacheSizeMegaBytes: Int? = null, private val attachmentContentCacheSizeMegaBytes: Int? = null,
override val attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound, override val attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound,

View File

@ -3,6 +3,7 @@ package net.corda.node.services.messaging
import io.netty.channel.EventLoopGroup import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.nio.NioEventLoopGroup
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.VisibleForTesting
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
@ -32,7 +33,8 @@ import kotlin.concurrent.withLock
* independent Session for message consumption. * independent Session for message consumption.
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager. * The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
*/ */
internal class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress: NetworkHostAndPort, val maxMessageSize: Int) : BridgeManager { @VisibleForTesting
class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress: NetworkHostAndPort, val maxMessageSize: Int) : BridgeManager {
private val lock = ReentrantLock() private val lock = ReentrantLock()
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>() private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
@ -177,6 +179,13 @@ internal class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress:
} }
} }
override fun destroyBridge(queueName: String, hostAndPort: NetworkHostAndPort) {
lock.withLock {
val bridge = bridgeNameToBridgeMap.remove(getBridgeName(queueName, hostAndPort))
bridge?.stop()
}
}
override fun bridgeExists(bridgeName: String): Boolean = lock.withLock { bridgeNameToBridgeMap.containsKey(bridgeName) } override fun bridgeExists(bridgeName: String): Boolean = lock.withLock { bridgeNameToBridgeMap.containsKey(bridgeName) }
override fun start() { override fun start() {

View File

@ -1,13 +1,9 @@
package net.corda.node.services.messaging package net.corda.node.services.messaging
import net.corda.core.crypto.AddressFormatException
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.ThreadBox import net.corda.core.internal.ThreadBox
import net.corda.core.internal.div import net.corda.core.internal.div
import net.corda.core.internal.noneOrSingle import net.corda.core.internal.noneOrSingle
import net.corda.core.internal.uncheckedCast import net.corda.core.internal.uncheckedCast
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
@ -17,7 +13,6 @@ import net.corda.node.internal.artemis.ArtemisBroker
import net.corda.node.internal.artemis.BrokerAddresses import net.corda.node.internal.artemis.BrokerAddresses
import net.corda.node.internal.artemis.CertificateChainCheckPolicy import net.corda.node.internal.artemis.CertificateChainCheckPolicy
import net.corda.node.internal.artemis.SecureArtemisConfiguration import net.corda.node.internal.artemis.SecureArtemisConfiguration
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE
@ -25,12 +20,10 @@ import net.corda.node.services.messaging.NodeLoginModule.Companion.VERIFIER_ROLE
import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.VerifierApi import net.corda.nodeapi.VerifierApi
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisPeerAddress
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
import net.corda.nodeapi.internal.requireOnDefaultFileSystem import net.corda.nodeapi.internal.requireOnDefaultFileSystem
@ -47,7 +40,6 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.artemis.spi.core.security.jaas.CertificateCallback import org.apache.activemq.artemis.spi.core.security.jaas.CertificateCallback
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal
import rx.Subscription
import java.io.IOException import java.io.IOException
import java.security.KeyStoreException import java.security.KeyStoreException
import java.security.Principal import java.security.Principal
@ -79,7 +71,6 @@ import javax.security.auth.spi.LoginModule
@ThreadSafe @ThreadSafe
class ArtemisMessagingServer(private val config: NodeConfiguration, class ArtemisMessagingServer(private val config: NodeConfiguration,
private val p2pPort: Int, private val p2pPort: Int,
val networkMapCache: NetworkMapCacheInternal,
val maxMessageSize: Int) : ArtemisBroker, SingletonSerializeAsToken() { val maxMessageSize: Int) : ArtemisBroker, SingletonSerializeAsToken() {
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()
@ -91,29 +82,19 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
private val mutex = ThreadBox(InnerState()) private val mutex = ThreadBox(InnerState())
private lateinit var activeMQServer: ActiveMQServer private lateinit var activeMQServer: ActiveMQServer
override val serverControl: ActiveMQServerControl get() = activeMQServer.activeMQServerControl override val serverControl: ActiveMQServerControl get() = activeMQServer.activeMQServerControl
private var networkChangeHandle: Subscription? = null
private lateinit var bridgeManager: BridgeManager
init { init {
config.baseDirectory.requireOnDefaultFileSystem() config.baseDirectory.requireOnDefaultFileSystem()
} }
/**
* The server will make sure the bridge exists on network map changes, see method [updateBridgesOnNetworkChange]
* We assume network map will be updated accordingly when the client node register with the network map.
*/
override fun start() = mutex.locked { override fun start() = mutex.locked {
if (!running) { if (!running) {
configureAndStartServer() configureAndStartServer()
networkChangeHandle = networkMapCache.changed.subscribe { updateBridgesOnNetworkChange(it) }
running = true running = true
} }
} }
override fun stop() = mutex.locked { override fun stop() = mutex.locked {
bridgeManager.close()
networkChangeHandle?.unsubscribe()
networkChangeHandle = null
activeMQServer.stop() activeMQServer.stop()
running = false running = false
} }
@ -134,17 +115,11 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
registerActivationFailureListener { exception -> throw exception } registerActivationFailureListener { exception -> throw exception }
// Some types of queue might need special preparation on our side, like dialling back or preparing // Some types of queue might need special preparation on our side, like dialling back or preparing
// a lazily initialised subsystem. // a lazily initialised subsystem.
registerPostQueueCreationCallback { deployBridgesFromNewQueue(it.toString()) } registerPostQueueCreationCallback { log.debug { "Queue Created: $it" } }
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } } registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
} }
// Config driven switch between legacy CORE bridges and the newer AMQP protocol bridges. // Config driven switch between legacy CORE bridges and the newer AMQP protocol bridges.
bridgeManager = if (config.useAMQPBridges) {
AMQPBridgeManager(config, NetworkHostAndPort("localhost", p2pPort), maxMessageSize)
} else {
CoreBridgeManager(config, activeMQServer)
}
activeMQServer.start() activeMQServer.start()
bridgeManager.start()
Node.printBasicNodeInfo("Listening on port", p2pPort.toString()) Node.printBasicNodeInfo("Listening on port", p2pPort.toString())
} }
@ -223,76 +198,8 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
return ActiveMQJAASSecurityManager(NodeLoginModule::class.java.name, securityConfig) return ActiveMQJAASSecurityManager(NodeLoginModule::class.java.name, securityConfig)
} }
private fun deployBridgesFromNewQueue(queueName: String) {
log.debug { "Queue created: $queueName, deploying bridge(s)" }
fun deployBridgeToPeer(nodeInfo: NodeInfo) {
log.debug("Deploying bridge for $queueName to $nodeInfo")
val address = nodeInfo.addresses.single()
bridgeManager.deployBridge(queueName, address, nodeInfo.legalIdentitiesAndCerts.map { it.name }.toSet())
}
if (queueName.startsWith(PEERS_PREFIX)) {
try {
val nodeInfos = networkMapCache.getNodesByOwningKeyIndex(queueName.substring(PEERS_PREFIX.length))
if (nodeInfos.isNotEmpty()) {
nodeInfos.forEach { deployBridgeToPeer(it) }
} else {
log.error("Queue created for a peer that we don't know from the network map: $queueName")
}
} catch (e: AddressFormatException) {
log.error("Flow violation: Could not parse peer queue name as Base 58: $queueName")
}
}
}
/**
* The bridge will be created automatically when the queues are created, however, this is not the case when the network map restarted.
* The queues are restored from the journal, and because the queues are added before we register the callback handler, this method will never get called for existing queues.
* This results in message queues up and never get send out. (https://github.com/corda/corda/issues/37)
*
* We create the bridges indirectly now because the network map is not persisted and there are no ways to obtain host and port information on startup.
* TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service.
*/
private fun updateBridgesOnNetworkChange(change: MapChange) {
log.debug { "Updating bridges on network map change: ${change.node}" }
fun gatherAddresses(node: NodeInfo): Sequence<ArtemisPeerAddress> {
val address = node.addresses.single()
return node.legalIdentitiesAndCerts.map { NodeAddress(it.party.owningKey, address) }.asSequence()
}
fun deployBridges(node: NodeInfo) {
gatherAddresses(node)
.filter { queueExists(it.queueName) && !bridgeManager.bridgeExists(it.bridgeName) }
.forEach { deployBridge(it, node.legalIdentitiesAndCerts.map { it.name }.toSet()) }
}
when (change) {
is MapChange.Added -> {
deployBridges(change.node)
}
is MapChange.Removed -> {
bridgeManager.destroyBridges(change.node)
}
is MapChange.Modified -> {
// TODO Figure out what has actually changed and only destroy those bridges that need to be.
bridgeManager.destroyBridges(change.previousNode)
deployBridges(change.node)
}
}
}
private fun deployBridge(address: ArtemisPeerAddress, legalNames: Set<CordaX500Name>) {
bridgeManager.deployBridge(address.queueName, address.hostAndPort, legalNames)
}
private fun createTcpTransport(connectionDirection: ConnectionDirection, host: String, port: Int, enableSSL: Boolean = true) = private fun createTcpTransport(connectionDirection: ConnectionDirection, host: String, port: Int, enableSSL: Boolean = true) =
ArtemisTcpTransport.tcpTransport(connectionDirection, NetworkHostAndPort(host, port), config, enableSSL = enableSSL) ArtemisTcpTransport.tcpTransport(connectionDirection, NetworkHostAndPort(host, port), config, enableSSL = enableSSL)
private fun queueExists(queueName: String): Boolean = activeMQServer.queueQuery(SimpleString(queueName)).isExists
private val ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort)
private fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
} }
/** /**

View File

@ -0,0 +1,116 @@
package net.corda.node.services.messaging
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.node.services.config.NodeConfiguration
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.BridgeControl
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import java.util.*
internal class BridgeControlListener(val config: NodeConfiguration,
val p2pAddress: NetworkHostAndPort,
val maxMessageSize: Int) : AutoCloseable {
private val bridgeId: String = UUID.randomUUID().toString()
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, p2pAddress, maxMessageSize)
private val validInboundQueues = mutableSetOf<String>()
private var artemis: ArtemisMessagingClient? = null
private var controlConsumer: ClientConsumer? = null
companion object {
private val log = contextLogger()
}
fun start() {
stop()
bridgeManager.start()
val artemis = ArtemisMessagingClient(config, p2pAddress, maxMessageSize)
this.artemis = artemis
artemis.start()
val artemisClient = artemis.started!!
val artemisSession = artemisClient.session
val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue)
val control = artemisSession.createConsumer(bridgeControlQueue)
controlConsumer = control
control.setMessageHandler { msg ->
try {
processControlMessage(msg)
} catch (ex: Exception) {
log.error("Unable to process bridge control message", ex)
}
}
val startupMessage = BridgeControl.BridgeToNodeSnapshotRequest(bridgeId).serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
val bridgeRequest = artemisSession.createMessage(false)
bridgeRequest.writeBodyBufferBytes(startupMessage)
artemisClient.producer.send(BRIDGE_NOTIFY, bridgeRequest)
}
fun stop() {
controlConsumer?.close()
controlConsumer = null
artemis?.stop()
artemis = null
bridgeManager.stop()
}
override fun close() = stop()
private fun validateInboxQueueName(queueName: String): Boolean {
return queueName.startsWith(P2P_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString(queueName)).isExists
}
private fun validateBridgingQueueName(queueName: String): Boolean {
return queueName.startsWith(PEERS_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString(queueName)).isExists
}
private fun processControlMessage(msg: ClientMessage) {
val data: ByteArray = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) }
val controlMessage = data.deserialize<BridgeControl>(context = SerializationDefaults.P2P_CONTEXT)
log.info("Received bridge control message $controlMessage")
when (controlMessage) {
is BridgeControl.NodeToBridgeSnapshot -> {
if (!controlMessage.inboxQueues.all { validateInboxQueueName(it) }) {
log.error("Invalid queue names in control message $controlMessage")
return
}
if (!controlMessage.sendQueues.all { validateBridgingQueueName(it.queueName) }) {
log.error("Invalid queue names in control message $controlMessage")
return
}
for (outQueue in controlMessage.sendQueues) {
bridgeManager.deployBridge(outQueue.queueName, outQueue.targets.first(), outQueue.legalNames.toSet())
}
// TODO For now we just record the inboxes, but we don't use the information, but eventually out of process bridges will use this for validating inbound messages.
validInboundQueues.addAll(controlMessage.inboxQueues)
}
is BridgeControl.BridgeToNodeSnapshotRequest -> {
log.error("Message from Bridge $controlMessage detected on wrong topic!")
}
is BridgeControl.Create -> {
if (!validateBridgingQueueName((controlMessage.bridgeInfo.queueName))) {
log.error("Invalid queue names in control message $controlMessage")
return
}
bridgeManager.deployBridge(controlMessage.bridgeInfo.queueName, controlMessage.bridgeInfo.targets.first(), controlMessage.bridgeInfo.legalNames.toSet())
}
is BridgeControl.Delete -> {
if (!controlMessage.bridgeInfo.queueName.startsWith(PEERS_PREFIX)) {
log.error("Invalid queue names in control message $controlMessage")
return
}
bridgeManager.destroyBridge(controlMessage.bridgeInfo.queueName, controlMessage.bridgeInfo.targets.first())
}
}
}
}

View File

@ -1,17 +1,21 @@
package net.corda.node.services.messaging package net.corda.node.services.messaging
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.VisibleForTesting
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
/** /**
* Provides an internal interface that the [ArtemisMessagingServer] delegates to for Bridge activities. * Provides an internal interface that the [ArtemisMessagingServer] delegates to for Bridge activities.
*/ */
internal interface BridgeManager : AutoCloseable { @VisibleForTesting
interface BridgeManager : AutoCloseable {
fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>) fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>)
fun destroyBridges(node: NodeInfo) fun destroyBridges(node: NodeInfo)
fun destroyBridge(queueName: String, hostAndPort: NetworkHostAndPort)
fun bridgeExists(bridgeName: String): Boolean fun bridgeExists(bridgeName: String): Boolean
fun start() fun start()

View File

@ -1,178 +0,0 @@
package net.corda.node.services.messaging
import io.netty.handler.ssl.SslHandler
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.node.services.config.NodeConfiguration
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.crypto.x509
import org.apache.activemq.artemis.api.core.Message
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.cluster.Transformer
import org.apache.activemq.artemis.spi.core.remoting.*
import org.apache.activemq.artemis.utils.ConfigurationHelper
import java.time.Duration
import java.util.concurrent.Executor
import java.util.concurrent.ScheduledExecutorService
import javax.security.auth.x500.X500Principal
/**
* This class simply moves the legacy CORE bridge code from [ArtemisMessagingServer]
* into a class implementing [BridgeManager].
* It has no lifecycle events, because the bridges are internal to the ActiveMQServer instance and thus
* stop when it is stopped.
*/
internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServer: ActiveMQServer) : BridgeManager {
companion object {
private fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
private val ArtemisMessagingComponent.ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort)
}
private fun gatherAddresses(node: NodeInfo): Sequence<ArtemisMessagingComponent.ArtemisPeerAddress> {
val address = node.addresses.single()
return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, address) }.asSequence()
}
/**
* All nodes are expected to have a public facing address called p2p.inbound.$identity for receiving
* messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it,
* as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
* P2P address.
*/
override fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>) {
val connectionDirection = ConnectionDirection.Outbound(
connectorFactoryClassName = VerifyingNettyConnectorFactory::class.java.name,
expectedCommonNames = legalNames
)
val tcpTransport = ArtemisTcpTransport.tcpTransport(connectionDirection, target, config, enableSSL = true)
tcpTransport.params[ArtemisMessagingServer::class.java.name] = this
// We intentionally overwrite any previous connector config in case the peer legal name changed
activeMQServer.configuration.addConnectorConfiguration(target.toString(), tcpTransport)
activeMQServer.deployBridge(BridgeConfiguration().apply {
name = getBridgeName(queueName, target)
this.queueName = queueName
staticConnectors = listOf(target.toString())
confirmationWindowSize = 100000 // a guess
isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic
// We keep trying until the network map deems the node unreachable and tells us it's been removed at which
// point we destroy the bridge
retryInterval = config.activeMQServer.bridge.retryIntervalMs
retryIntervalMultiplier = config.activeMQServer.bridge.retryIntervalMultiplier
maxRetryInterval = Duration.ofMinutes(config.activeMQServer.bridge.maxRetryIntervalMin).toMillis()
// As a peer of the target node we must connect to it using the peer user. Actual authentication is done using
// our TLS certificate.
user = PEER_USER
password = PEER_USER
transformerClassName = InboxTopicTransformer::class.java.name
})
}
override fun bridgeExists(bridgeName: String): Boolean = activeMQServer.clusterManager.bridges.containsKey(bridgeName)
override fun start() {
// Nothing to do
}
override fun stop() {
// Nothing to do
}
override fun close() = stop()
override fun destroyBridges(node: NodeInfo) {
gatherAddresses(node).forEach {
activeMQServer.destroyBridge(it.bridgeName)
}
}
}
class InboxTopicTransformer : Transformer {
override fun transform(message: Message): Message {
message.address = translateLocalQueueToInboxAddress(message.address)
return message
}
}
class VerifyingNettyConnectorFactory : NettyConnectorFactory() {
override fun createConnector(configuration: MutableMap<String, Any>,
handler: BufferHandler?,
listener: ClientConnectionLifeCycleListener?,
closeExecutor: Executor?,
threadPool: Executor?,
scheduledThreadPool: ScheduledExecutorService?,
protocolManager: ClientProtocolManager?): Connector {
return VerifyingNettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool,
protocolManager)
}
private class VerifyingNettyConnector(configuration: MutableMap<String, Any>,
handler: BufferHandler?,
listener: ClientConnectionLifeCycleListener?,
closeExecutor: Executor?,
threadPool: Executor?,
scheduledThreadPool: ScheduledExecutorService?,
protocolManager: ClientProtocolManager?) :
NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager) {
companion object {
private val log = contextLogger()
}
private val sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration)
override fun createConnection(): Connection? {
val connection = super.createConnection() as? NettyConnection
if (sslEnabled && connection != null) {
val expectedLegalNames: Set<CordaX500Name> = uncheckedCast(configuration[ArtemisTcpTransport.VERIFY_PEER_LEGAL_NAME] ?: emptySet<CordaX500Name>())
try {
val session = connection.channel
.pipeline()
.get(SslHandler::class.java)
.engine()
.session
// Checks the peer name is the one we are expecting.
// TODO Some problems here: after introduction of multiple legal identities on the node and removal of the main one,
// we run into the issue, who are we connecting to. There are some solutions to that: advertise `network identity`;
// have mapping port -> identity (but, design doc says about removing SingleMessageRecipient and having just NetworkHostAndPort,
// it was convenient to store that this way); SNI.
val peerLegalName = CordaX500Name.parse(session.peerPrincipal.name)
val expectedLegalName = expectedLegalNames.singleOrNull { it == peerLegalName }
require(expectedLegalName != null) {
"Peer has wrong CN - expected $expectedLegalNames but got $peerLegalName. This is either a fatal " +
"misconfiguration by the remote peer or an SSL man-in-the-middle attack!"
}
// Make sure certificate has the same name.
val peerCertificateName = CordaX500Name.build(X500Principal(session.peerCertificateChain[0].subjectDN.name))
require(peerCertificateName == expectedLegalName) {
"Peer has wrong subject name in the certificate - expected $expectedLegalNames but got $peerCertificateName. This is either a fatal " +
"misconfiguration by the remote peer or an SSL man-in-the-middle attack!"
}
X509Utilities.validateCertificateChain(
session.localCertificates.last().x509,
session.peerCertificates.x509)
} catch (e: IllegalArgumentException) {
connection.close()
log.error(e.message)
return null
}
}
return connection
}
}
}

View File

@ -1,10 +1,13 @@
package net.corda.node.services.messaging package net.corda.node.services.messaging
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.ThreadBox import net.corda.core.internal.ThreadBox
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.PartyInfo import net.corda.core.node.services.PartyInfo
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
@ -12,6 +15,7 @@ import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.* import net.corda.core.utilities.*
import net.corda.node.VersionInfo import net.corda.node.VersionInfo
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.FlowMessagingImpl import net.corda.node.services.statemachine.FlowMessagingImpl
@ -19,6 +23,11 @@ import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.PersistentMap import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.ArtemisMessagingComponent.* import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.BridgeControl
import net.corda.nodeapi.internal.BridgeEntry
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
@ -27,6 +36,8 @@ import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientConsumer import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientSession
import rx.Subscription
import java.security.PublicKey import java.security.PublicKey
import java.time.Instant import java.time.Instant
import java.util.* import java.util.*
@ -73,6 +84,7 @@ class P2PMessagingClient(config: NodeConfiguration,
private val serviceIdentity: PublicKey?, private val serviceIdentity: PublicKey?,
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor, private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
private val database: CordaPersistence, private val database: CordaPersistence,
private val networkMap: NetworkMapCacheInternal,
advertisedAddress: NetworkHostAndPort = serverAddress, advertisedAddress: NetworkHostAndPort = serverAddress,
maxMessageSize: Int maxMessageSize: Int
) : SingletonSerializeAsToken(), MessagingService { ) : SingletonSerializeAsToken(), MessagingService {
@ -133,6 +145,8 @@ class P2PMessagingClient(config: NodeConfiguration,
var running = false var running = false
var p2pConsumer: ClientConsumer? = null var p2pConsumer: ClientConsumer? = null
var serviceConsumer: ClientConsumer? = null var serviceConsumer: ClientConsumer? = null
var bridgeNotifyConsumer: ClientConsumer? = null
var networkChangeSubscription: Subscription? = null
} }
private val messagesToRedeliver = database.transaction { private val messagesToRedeliver = database.transaction {
@ -147,12 +161,13 @@ class P2PMessagingClient(config: NodeConfiguration,
private val cordaVendor = SimpleString(versionInfo.vendor) private val cordaVendor = SimpleString(versionInfo.vendor)
private val releaseVersion = SimpleString(versionInfo.releaseVersion) private val releaseVersion = SimpleString(versionInfo.releaseVersion)
/** An executor for sending messages */ /** An executor for sending messages */
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1) private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging ${myIdentity.toStringShort()}", 1)
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress) override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong() private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong()
private val artemis = ArtemisMessagingClient(config, serverAddress, maxMessageSize) private val artemis = ArtemisMessagingClient(config, serverAddress, maxMessageSize)
private val state = ThreadBox(InnerState()) private val state = ThreadBox(InnerState())
private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
private val handlers = ConcurrentHashMap<String, MessageHandler>() private val handlers = ConcurrentHashMap<String, MessageHandler>()
@ -189,11 +204,13 @@ class P2PMessagingClient(config: NodeConfiguration,
state.locked { state.locked {
val session = artemis.start().session val session = artemis.start().session
val inbox = RemoteInboxAddress(myIdentity).queueName val inbox = RemoteInboxAddress(myIdentity).queueName
val inboxes = mutableListOf(inbox)
// Create a queue, consumer and producer for handling P2P network messages. // Create a queue, consumer and producer for handling P2P network messages.
createQueueIfAbsent(inbox) createQueueIfAbsent(inbox)
p2pConsumer = session.createConsumer(inbox) p2pConsumer = session.createConsumer(inbox)
if (serviceIdentity != null) { if (serviceIdentity != null) {
val serviceAddress = RemoteInboxAddress(serviceIdentity).queueName val serviceAddress = RemoteInboxAddress(serviceIdentity).queueName
inboxes += serviceAddress
createQueueIfAbsent(serviceAddress) createQueueIfAbsent(serviceAddress)
val serviceHandler = session.createConsumer(serviceAddress) val serviceHandler = session.createConsumer(serviceAddress)
serviceHandler.setMessageHandler { msg -> serviceHandler.setMessageHandler { msg ->
@ -202,11 +219,96 @@ class P2PMessagingClient(config: NodeConfiguration,
deliver(msg, message) deliver(msg, message)
} }
} }
registerBridgeControl(session, inboxes)
enumerateBridges(session, inboxes)
} }
resumeMessageRedelivery() resumeMessageRedelivery()
} }
private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List<String>) {
val bridgeNotifyQueue = "$BRIDGE_NOTIFY.${myIdentity.toStringShort()}"
session.createTemporaryQueue(BRIDGE_NOTIFY, RoutingType.MULTICAST, bridgeNotifyQueue)
val bridgeConsumer = session.createConsumer(bridgeNotifyQueue)
bridgeNotifyConsumer = bridgeConsumer
bridgeConsumer.setMessageHandler { msg ->
val data: ByteArray = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) }
val notifyMessage = data.deserialize<BridgeControl>(context = SerializationDefaults.P2P_CONTEXT)
log.info(notifyMessage.toString())
when (notifyMessage) {
is BridgeControl.BridgeToNodeSnapshotRequest -> enumerateBridges(session, inboxes)
else -> log.error("Unexpected Bridge Control message type on notify topc $notifyMessage")
}
msg.acknowledge()
}
networkChangeSubscription = networkMap.changed.subscribe { updateBridgesOnNetworkChange(it) }
}
private fun sendBridgeControl(message: BridgeControl) {
val client = artemis.started!!
val controlPacket = message.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
val artemisMessage = client.session.createMessage(false)
artemisMessage.writeBodyBufferBytes(controlPacket)
client.producer.send(BRIDGE_CONTROL, artemisMessage)
}
private fun updateBridgesOnNetworkChange(change: NetworkMapCache.MapChange) {
log.info("Updating bridges on network map change: ${change.node}")
fun gatherAddresses(node: NodeInfo): Sequence<BridgeEntry> {
return node.legalIdentitiesAndCerts.map {
val messagingAddress = NodeAddress(it.party.owningKey, node.addresses.first())
BridgeEntry(messagingAddress.queueName, node.addresses, listOf(it.party.name))
}.filter { artemis.started!!.session.queueQuery(SimpleString(it.queueName)).isExists }.asSequence()
}
fun deployBridges(node: NodeInfo) {
gatherAddresses(node)
.forEach {
sendBridgeControl(BridgeControl.Create(myIdentity.toStringShort(), it))
}
}
fun destroyBridges(node: NodeInfo) {
gatherAddresses(node)
.forEach {
sendBridgeControl(BridgeControl.Delete(myIdentity.toStringShort(), it))
}
}
when (change) {
is NetworkMapCache.MapChange.Added -> {
deployBridges(change.node)
}
is NetworkMapCache.MapChange.Removed -> {
destroyBridges(change.node)
}
is NetworkMapCache.MapChange.Modified -> {
destroyBridges(change.previousNode)
deployBridges(change.node)
}
}
}
private fun enumerateBridges(session: ClientSession, inboxes: List<String>) {
val requiredBridges = mutableListOf<BridgeEntry>()
fun createBridgeEntry(queueName: SimpleString) {
val keyHash = queueName.substring(PEERS_PREFIX.length)
val peers = networkMap.getNodesByOwningKeyIndex(keyHash)
for (node in peers) {
val bridge = BridgeEntry(queueName.toString(), node.addresses, node.legalIdentities.map { it.name })
requiredBridges += bridge
knownQueues += queueName.toString()
}
}
val queues = session.addressQuery(SimpleString("$PEERS_PREFIX#")).queueNames
for (queue in queues) {
createBridgeEntry(queue)
}
val startupMessage = BridgeControl.NodeToBridgeSnapshot(myIdentity.toStringShort(), inboxes, requiredBridges)
sendBridgeControl(startupMessage)
}
private fun resumeMessageRedelivery() { private fun resumeMessageRedelivery() {
messagesToRedeliver.forEach { retryId, (message, target) -> messagesToRedeliver.forEach { retryId, (message, target) ->
send(message, target, retryId) send(message, target, retryId)
@ -343,12 +445,18 @@ class P2PMessagingClient(config: NodeConfiguration,
check(artemis.started != null) check(artemis.started != null)
val prevRunning = running val prevRunning = running
running = false running = false
networkChangeSubscription?.unsubscribe()
val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice") val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice")
try { try {
c.close() c.close()
} catch (e: ActiveMQObjectClosedException) { } catch (e: ActiveMQObjectClosedException) {
// Ignore it: this can happen if the server has gone away before we do. // Ignore it: this can happen if the server has gone away before we do.
} }
try {
bridgeNotifyConsumer!!.close()
} catch (e: ActiveMQObjectClosedException) {
// Ignore it: this can happen if the server has gone away before we do.
}
p2pConsumer = null p2pConsumer = null
val s = serviceConsumer val s = serviceConsumer
try { try {
@ -357,6 +465,7 @@ class P2PMessagingClient(config: NodeConfiguration,
// Ignore it: this can happen if the server has gone away before we do. // Ignore it: this can happen if the server has gone away before we do.
} }
serviceConsumer = null serviceConsumer = null
knownQueues.clear()
prevRunning prevRunning
} }
if (running && !nodeExecutor.isOnThread) { if (running && !nodeExecutor.isOnThread) {
@ -468,13 +577,25 @@ class P2PMessagingClient(config: NodeConfiguration,
/** Attempts to create a durable queue on the broker which is bound to an address of the same name. */ /** Attempts to create a durable queue on the broker which is bound to an address of the same name. */
private fun createQueueIfAbsent(queueName: String) { private fun createQueueIfAbsent(queueName: String) {
state.alreadyLocked { if (!knownQueues.contains(queueName)) {
val session = artemis.started!!.session state.alreadyLocked {
val queueQuery = session.queueQuery(SimpleString(queueName)) val session = artemis.started!!.session
if (!queueQuery.isExists) { val queueQuery = session.queueQuery(SimpleString(queueName))
log.info("Create fresh queue $queueName bound on same address") if (!queueQuery.isExists) {
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true) log.info("Create fresh queue $queueName bound on same address")
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
if (queueName.startsWith(PEERS_PREFIX)) {
val keyHash = queueName.substring(PEERS_PREFIX.length)
val peers = networkMap.getNodesByOwningKeyIndex(keyHash)
for (node in peers) {
val bridge = BridgeEntry(queueName, node.addresses, node.legalIdentities.map { it.name })
val createBridgeMessage = BridgeControl.Create(myIdentity.toStringShort(), bridge)
sendBridgeControl(createBridgeMessage)
}
}
}
} }
knownQueues += queueName
} }
} }

View File

@ -13,7 +13,6 @@ import net.corda.core.flows.NotaryException
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.node.services.NotaryService import net.corda.core.node.services.NotaryService
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.UniquenessProvider import net.corda.core.node.services.UniquenessProvider
import net.corda.core.schemas.PersistentStateRef import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.deserialize import net.corda.core.serialization.deserialize
@ -54,8 +53,7 @@ class BFTNonValidatingNotaryService(
// Replica startup must be in parallel with other replicas, otherwise the constructor may not return: // Replica startup must be in parallel with other replicas, otherwise the constructor may not return:
thread(name = "BFT SMaRt replica $replicaId init", isDaemon = true) { thread(name = "BFT SMaRt replica $replicaId init", isDaemon = true) {
configHandle.use { configHandle.use {
val timeWindowChecker = TimeWindowChecker(services.clock) val replica = Replica(it, replicaId, { createMap() }, services, notaryIdentityKey)
val replica = Replica(it, replicaId, { createMap() }, services, notaryIdentityKey, timeWindowChecker)
replicaHolder.set(replica) replicaHolder.set(replica)
log.info("BFT SMaRt replica $replicaId is running.") log.info("BFT SMaRt replica $replicaId is running.")
} }
@ -131,8 +129,7 @@ class BFTNonValidatingNotaryService(
replicaId: Int, replicaId: Int,
createMap: () -> AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistedCommittedState, PersistentStateRef>, createMap: () -> AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistedCommittedState, PersistentStateRef>,
services: ServiceHubInternal, services: ServiceHubInternal,
notaryIdentityKey: PublicKey, notaryIdentityKey: PublicKey) : BFTSMaRt.Replica(config, replicaId, createMap, services, notaryIdentityKey) {
timeWindowChecker: TimeWindowChecker) : BFTSMaRt.Replica(config, replicaId, createMap, services, notaryIdentityKey, timeWindowChecker) {
override fun executeCommand(command: ByteArray): ByteArray { override fun executeCommand(command: ByteArray): ByteArray {
val request = command.deserialize<BFTSMaRt.CommitRequest>() val request = command.deserialize<BFTSMaRt.CommitRequest>()
@ -146,7 +143,7 @@ class BFTNonValidatingNotaryService(
val id = ftx.id val id = ftx.id
val inputs = ftx.inputs val inputs = ftx.inputs
val notary = ftx.notary val notary = ftx.notary
validateTimeWindow(ftx.timeWindow) NotaryService.validateTimeWindow(services.clock, ftx.timeWindow)
if (notary !in services.myInfo.legalIdentities) throw NotaryException(NotaryError.WrongNotary) if (notary !in services.myInfo.legalIdentities) throw NotaryException(NotaryError.WrongNotary)
commitInputStates(inputs, id, callerIdentity) commitInputStates(inputs, id, callerIdentity)
log.debug { "Inputs committed successfully, signing $id" } log.debug { "Inputs committed successfully, signing $id" }

View File

@ -13,14 +13,12 @@ import bftsmart.tom.server.defaultservices.DefaultRecoverable
import bftsmart.tom.server.defaultservices.DefaultReplier import bftsmart.tom.server.defaultservices.DefaultReplier
import bftsmart.tom.util.Extractor import bftsmart.tom.util.Extractor
import net.corda.core.contracts.StateRef import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.* import net.corda.core.crypto.*
import net.corda.core.flows.NotaryError import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryException import net.corda.core.flows.NotaryException
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.declaredField import net.corda.core.internal.declaredField
import net.corda.core.internal.toTypedArray import net.corda.core.internal.toTypedArray
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.UniquenessProvider import net.corda.core.node.services.UniquenessProvider
import net.corda.core.schemas.PersistentStateRef import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
@ -178,8 +176,7 @@ object BFTSMaRt {
createMap: () -> AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, createMap: () -> AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx,
BFTNonValidatingNotaryService.PersistedCommittedState, PersistentStateRef>, BFTNonValidatingNotaryService.PersistedCommittedState, PersistentStateRef>,
protected val services: ServiceHubInternal, protected val services: ServiceHubInternal,
protected val notaryIdentityKey: PublicKey, protected val notaryIdentityKey: PublicKey) : DefaultRecoverable() {
private val timeWindowChecker: TimeWindowChecker) : DefaultRecoverable() {
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()
} }
@ -218,7 +215,7 @@ object BFTSMaRt {
/** /**
* Implement logic to execute the command and commit the transaction to the log. * Implement logic to execute the command and commit the transaction to the log.
* Helper methods are provided for transaction processing: [commitInputStates], [validateTimeWindow], and [sign]. * Helper methods are provided for transaction processing: [commitInputStates], and [sign].
*/ */
abstract fun executeCommand(command: ByteArray): ByteArray? abstract fun executeCommand(command: ByteArray): ByteArray?
@ -245,11 +242,6 @@ object BFTSMaRt {
} }
} }
protected fun validateTimeWindow(t: TimeWindow?) {
if (t != null && !timeWindowChecker.isValid(t))
throw NotaryException(NotaryError.TimeWindowInvalid)
}
protected fun sign(bytes: ByteArray): DigitalSignature.WithKey { protected fun sign(bytes: ByteArray): DigitalSignature.WithKey {
return services.database.transaction { services.keyManagementService.sign(bytes, notaryIdentityKey) } return services.database.transaction { services.keyManagementService.sign(bytes, notaryIdentityKey) }
} }

View File

@ -3,7 +3,6 @@ package net.corda.node.services.transactions
import net.corda.core.flows.FlowSession import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotaryFlow import net.corda.core.flows.NotaryFlow
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.node.services.TrustedAuthorityNotaryService
import java.security.PublicKey import java.security.PublicKey
@ -13,8 +12,6 @@ class RaftNonValidatingNotaryService(
override val notaryIdentityKey: PublicKey, override val notaryIdentityKey: PublicKey,
override val uniquenessProvider: RaftUniquenessProvider override val uniquenessProvider: RaftUniquenessProvider
) : TrustedAuthorityNotaryService() { ) : TrustedAuthorityNotaryService() {
override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock)
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service { override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service {
return NonValidatingNotaryFlow(otherPartySession, this) return NonValidatingNotaryFlow(otherPartySession, this)
} }

View File

@ -3,7 +3,6 @@ package net.corda.node.services.transactions
import net.corda.core.flows.FlowSession import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotaryFlow import net.corda.core.flows.NotaryFlow
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.node.services.TrustedAuthorityNotaryService
import java.security.PublicKey import java.security.PublicKey
@ -13,8 +12,6 @@ class RaftValidatingNotaryService(
override val notaryIdentityKey: PublicKey, override val notaryIdentityKey: PublicKey,
override val uniquenessProvider: RaftUniquenessProvider override val uniquenessProvider: RaftUniquenessProvider
) : TrustedAuthorityNotaryService() { ) : TrustedAuthorityNotaryService() {
override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock)
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service { override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service {
return ValidatingNotaryFlow(otherPartySession, this) return ValidatingNotaryFlow(otherPartySession, this)
} }

View File

@ -2,14 +2,12 @@ package net.corda.node.services.transactions
import net.corda.core.flows.FlowSession import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotaryFlow import net.corda.core.flows.NotaryFlow
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import java.security.PublicKey import java.security.PublicKey
/** A simple Notary service that does not perform transaction validation */ /** A simple Notary service that does not perform transaction validation */
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
override val timeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider = PersistentUniquenessProvider() override val uniquenessProvider = PersistentUniquenessProvider()
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = NonValidatingNotaryFlow(otherPartySession, this) override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = NonValidatingNotaryFlow(otherPartySession, this)

View File

@ -2,15 +2,12 @@ package net.corda.node.services.transactions
import net.corda.core.flows.FlowSession import net.corda.core.flows.FlowSession
import net.corda.core.flows.NotaryFlow import net.corda.core.flows.NotaryFlow
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import java.security.PublicKey import java.security.PublicKey
/** A Notary service that validates the transaction chain of the submitted transaction before committing it */ /** A Notary service that validates the transaction chain of the submitted transaction before committing it */
class ValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { class ValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
override val timeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider = PersistentUniquenessProvider() override val uniquenessProvider = PersistentUniquenessProvider()
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = ValidatingNotaryFlow(otherPartySession, this) override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = ValidatingNotaryFlow(otherPartySession, this)

View File

@ -32,7 +32,6 @@ enterpriseConfiguration = {
waitInterval = 40000 waitInterval = 40000
} }
} }
useAMQPBridges = true
rpcSettings = { rpcSettings = {
useSsl = false useSsl = false
standAloneBroker = false standAloneBroker = false

View File

@ -2,7 +2,6 @@ package net.corda.node.services.messaging
import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.context.AuthServiceId
import net.corda.core.crypto.generateKeyPair import net.corda.core.crypto.generateKeyPair
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import com.codahale.metrics.MetricRegistry import com.codahale.metrics.MetricRegistry
@ -10,8 +9,6 @@ import net.corda.core.crypto.generateKeyPair
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.configureDatabase import net.corda.node.internal.configureDatabase
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.config.configureWithDevSSLCertificate
@ -59,7 +56,6 @@ class ArtemisMessagingTest {
private lateinit var config: NodeConfiguration private lateinit var config: NodeConfiguration
private lateinit var database: CordaPersistence private lateinit var database: CordaPersistence
private lateinit var securityManager: RPCSecurityManager
private var messagingClient: P2PMessagingClient? = null private var messagingClient: P2PMessagingClient? = null
private var messagingServer: ArtemisMessagingServer? = null private var messagingServer: ArtemisMessagingServer? = null
@ -67,7 +63,6 @@ class ArtemisMessagingTest {
@Before @Before
fun setUp() { fun setUp() {
securityManager = RPCSecurityManagerImpl.fromUserList(users = emptyList(), id = AuthServiceId("TEST"))
abstract class AbstractNodeConfiguration : NodeConfiguration abstract class AbstractNodeConfiguration : NodeConfiguration
config = rigorousMock<AbstractNodeConfiguration>().also { config = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(temporaryFolder.root.toPath()).whenever(it).baseDirectory doReturn(temporaryFolder.root.toPath()).whenever(it).baseDirectory
@ -78,7 +73,6 @@ class ArtemisMessagingTest {
doReturn("").whenever(it).exportJMXto doReturn("").whenever(it).exportJMXto
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
doReturn(5).whenever(it).messageRedeliveryDelaySeconds doReturn(5).whenever(it).messageRedeliveryDelaySeconds
doReturn(true).whenever(it).useAMQPBridges
} }
LogHelper.setLevel(PersistentUniquenessProvider::class) LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true), rigorousMock()) database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true), rigorousMock())
@ -181,6 +175,7 @@ class ArtemisMessagingTest {
null, null,
ServiceAffinityExecutor("ArtemisMessagingTests", 1), ServiceAffinityExecutor("ArtemisMessagingTests", 1),
database, database,
networkMapCache,
maxMessageSize = maxMessageSize).apply { maxMessageSize = maxMessageSize).apply {
config.configureWithDevSSLCertificate() config.configureWithDevSSLCertificate()
messagingClient = this messagingClient = this
@ -189,7 +184,7 @@ class ArtemisMessagingTest {
} }
private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer { private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
return ArtemisMessagingServer(config, local, networkMapCache, maxMessageSize).apply { return ArtemisMessagingServer(config, local, maxMessageSize).apply {
config.configureWithDevSSLCertificate() config.configureWithDevSSLCertificate()
messagingServer = this messagingServer = this
} }

View File

@ -22,7 +22,6 @@ import java.security.SignatureException
// START 1 // START 1
@CordaService @CordaService
class MyCustomValidatingNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { class MyCustomValidatingNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
override val timeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider = PersistentUniquenessProvider() override val uniquenessProvider = PersistentUniquenessProvider()
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = MyValidatingNotaryFlow(otherPartySession, this) override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = MyValidatingNotaryFlow(otherPartySession, this)

View File

@ -5,7 +5,6 @@ import com.google.common.jimfs.Jimfs
import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.DoNotImplement import net.corda.core.DoNotImplement
import net.corda.core.crypto.entropyToKeyPair
import net.corda.core.crypto.Crypto import net.corda.core.crypto.Crypto
import net.corda.core.crypto.random63BitValue import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
@ -515,7 +514,6 @@ private fun mockNodeConfiguration(): NodeConfiguration {
doReturn(5).whenever(it).messageRedeliveryDelaySeconds doReturn(5).whenever(it).messageRedeliveryDelaySeconds
doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec
doReturn(null).whenever(it).devModeOptions doReturn(null).whenever(it).devModeOptions
doReturn(true).whenever(it).useAMQPBridges
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
} }
} }