mirror of
https://github.com/corda/corda.git
synced 2024-12-28 16:58:55 +00:00
Merge pull request #418 from corda/mnesbit-merge-20180126
Merge up from OS including bridge control protocol
This commit is contained in:
commit
8f0457866f
@ -1323,7 +1323,7 @@ public @interface net.corda.core.flows.InitiatingFlow
|
|||||||
@org.jetbrains.annotations.NotNull public String toString()
|
@org.jetbrains.annotations.NotNull public String toString()
|
||||||
##
|
##
|
||||||
@net.corda.core.serialization.CordaSerializable public static final class net.corda.core.flows.NotaryError$TimeWindowInvalid extends net.corda.core.flows.NotaryError
|
@net.corda.core.serialization.CordaSerializable public static final class net.corda.core.flows.NotaryError$TimeWindowInvalid extends net.corda.core.flows.NotaryError
|
||||||
public static final net.corda.core.flows.NotaryError$TimeWindowInvalid INSTANCE
|
@kotlin.jvm.JvmField @org.jetbrains.annotations.NotNull public static final net.corda.core.flows.NotaryError$TimeWindowInvalid INSTANCE
|
||||||
##
|
##
|
||||||
@net.corda.core.serialization.CordaSerializable public static final class net.corda.core.flows.NotaryError$TransactionInvalid extends net.corda.core.flows.NotaryError
|
@net.corda.core.serialization.CordaSerializable public static final class net.corda.core.flows.NotaryError$TransactionInvalid extends net.corda.core.flows.NotaryError
|
||||||
public <init>(Throwable)
|
public <init>(Throwable)
|
||||||
@ -1921,7 +1921,7 @@ public final class net.corda.core.node.services.TimeWindowChecker extends java.l
|
|||||||
public <init>()
|
public <init>()
|
||||||
public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party)
|
public final void commitInputStates(List, net.corda.core.crypto.SecureHash, net.corda.core.identity.Party)
|
||||||
@org.jetbrains.annotations.NotNull protected org.slf4j.Logger getLog()
|
@org.jetbrains.annotations.NotNull protected org.slf4j.Logger getLog()
|
||||||
@org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.TimeWindowChecker getTimeWindowChecker()
|
@org.jetbrains.annotations.NotNull protected net.corda.core.node.services.TimeWindowChecker getTimeWindowChecker()
|
||||||
@org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.UniquenessProvider getUniquenessProvider()
|
@org.jetbrains.annotations.NotNull protected abstract net.corda.core.node.services.UniquenessProvider getUniquenessProvider()
|
||||||
@org.jetbrains.annotations.NotNull public final net.corda.core.crypto.TransactionSignature sign(net.corda.core.crypto.SecureHash)
|
@org.jetbrains.annotations.NotNull public final net.corda.core.crypto.TransactionSignature sign(net.corda.core.crypto.SecureHash)
|
||||||
@org.jetbrains.annotations.NotNull public final net.corda.core.crypto.DigitalSignature$WithKey sign(byte[])
|
@org.jetbrains.annotations.NotNull public final net.corda.core.crypto.DigitalSignature$WithKey sign(byte[])
|
||||||
|
@ -18,6 +18,7 @@ import net.corda.core.utilities.ProgressTracker
|
|||||||
import net.corda.core.utilities.UntrustworthyData
|
import net.corda.core.utilities.UntrustworthyData
|
||||||
import net.corda.core.utilities.unwrap
|
import net.corda.core.utilities.unwrap
|
||||||
import java.security.SignatureException
|
import java.security.SignatureException
|
||||||
|
import java.time.Instant
|
||||||
import java.util.function.Predicate
|
import java.util.function.Predicate
|
||||||
|
|
||||||
class NotaryFlow {
|
class NotaryFlow {
|
||||||
@ -167,7 +168,14 @@ sealed class NotaryError {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Occurs when time specified in the [TimeWindow] command is outside the allowed tolerance. */
|
/** Occurs when time specified in the [TimeWindow] command is outside the allowed tolerance. */
|
||||||
object TimeWindowInvalid : NotaryError()
|
data class TimeWindowInvalid(val currentTime: Instant, val txTimeWindow: TimeWindow) : NotaryError() {
|
||||||
|
override fun toString() = "Current time $currentTime is outside the time bounds specified by the transaction: $txTimeWindow"
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
@JvmField @Deprecated("Here only for binary compatibility purposes, do not use.")
|
||||||
|
val INSTANCE = TimeWindowInvalid(Instant.EPOCH, TimeWindow.fromOnly(Instant.EPOCH))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
data class TransactionInvalid(val cause: Throwable) : NotaryError() {
|
data class TransactionInvalid(val cause: Throwable) : NotaryError() {
|
||||||
override fun toString() = cause.toString()
|
override fun toString() = cause.toString()
|
||||||
|
@ -12,6 +12,7 @@ import net.corda.core.serialization.serialize
|
|||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
|
import java.time.Clock
|
||||||
|
|
||||||
abstract class NotaryService : SingletonSerializeAsToken() {
|
abstract class NotaryService : SingletonSerializeAsToken() {
|
||||||
companion object {
|
companion object {
|
||||||
@ -27,6 +28,24 @@ abstract class NotaryService : SingletonSerializeAsToken() {
|
|||||||
if (custom) append(".custom")
|
if (custom) append(".custom")
|
||||||
}.toString()
|
}.toString()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if the current instant provided by the clock falls within the specified time window.
|
||||||
|
*
|
||||||
|
* @throws NotaryException if current time is outside the specified time window. The exception contains
|
||||||
|
* the [NotaryError.TimeWindowInvalid] error.
|
||||||
|
*/
|
||||||
|
@JvmStatic
|
||||||
|
@Throws(NotaryException::class)
|
||||||
|
fun validateTimeWindow(clock: Clock, timeWindow: TimeWindow?) {
|
||||||
|
if (timeWindow == null) return
|
||||||
|
val currentTime = clock.instant()
|
||||||
|
if (currentTime !in timeWindow) {
|
||||||
|
throw NotaryException(
|
||||||
|
NotaryError.TimeWindowInvalid(currentTime, timeWindow)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract val services: ServiceHub
|
abstract val services: ServiceHub
|
||||||
@ -52,14 +71,9 @@ abstract class TrustedAuthorityNotaryService : NotaryService() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
protected open val log: Logger get() = staticLog
|
protected open val log: Logger get() = staticLog
|
||||||
// TODO: specify the valid time window in config, and convert TimeWindowChecker to a utility method
|
|
||||||
protected abstract val timeWindowChecker: TimeWindowChecker
|
|
||||||
protected abstract val uniquenessProvider: UniquenessProvider
|
protected abstract val uniquenessProvider: UniquenessProvider
|
||||||
|
|
||||||
fun validateTimeWindow(t: TimeWindow?) {
|
fun validateTimeWindow(t: TimeWindow?) = NotaryService.validateTimeWindow(services.clock, t)
|
||||||
if (t != null && !timeWindowChecker.isValid(t))
|
|
||||||
throw NotaryException(NotaryError.TimeWindowInvalid)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that
|
* A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that
|
||||||
@ -98,4 +112,7 @@ abstract class TrustedAuthorityNotaryService : NotaryService() {
|
|||||||
val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID))
|
val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID))
|
||||||
return services.keyManagementService.sign(signableData, notaryIdentityKey)
|
return services.keyManagementService.sign(signableData, notaryIdentityKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated("This property is no longer used") @Suppress("DEPRECATION")
|
||||||
|
protected open val timeWindowChecker: TimeWindowChecker get() = throw UnsupportedOperationException("No default implementation, need to override")
|
||||||
}
|
}
|
@ -6,6 +6,7 @@ import java.time.Clock
|
|||||||
/**
|
/**
|
||||||
* Checks if the current instant provided by the input clock falls within the provided time-window.
|
* Checks if the current instant provided by the input clock falls within the provided time-window.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated("This class is no longer used")
|
||||||
class TimeWindowChecker(val clock: Clock = Clock.systemUTC()) {
|
class TimeWindowChecker(val clock: Clock = Clock.systemUTC()) {
|
||||||
fun isValid(timeWindow: TimeWindow): Boolean = clock.instant() in timeWindow
|
fun isValid(timeWindow: TimeWindow): Boolean = clock.instant() in timeWindow
|
||||||
}
|
}
|
||||||
|
@ -1,39 +0,0 @@
|
|||||||
package net.corda.core.node.services
|
|
||||||
|
|
||||||
import net.corda.core.contracts.TimeWindow
|
|
||||||
import net.corda.core.utilities.seconds
|
|
||||||
import org.junit.Test
|
|
||||||
import java.time.Clock
|
|
||||||
import java.time.Instant
|
|
||||||
import java.time.ZoneOffset
|
|
||||||
import kotlin.test.assertFalse
|
|
||||||
import kotlin.test.assertTrue
|
|
||||||
|
|
||||||
class TimeWindowCheckerTests {
|
|
||||||
val clock: Clock = Clock.fixed(Instant.now(), ZoneOffset.UTC)
|
|
||||||
val timeWindowChecker = TimeWindowChecker(clock)
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun `should return true for valid time-window`() {
|
|
||||||
val now = clock.instant()
|
|
||||||
val timeWindowBetween = TimeWindow.between(now - 10.seconds, now + 10.seconds)
|
|
||||||
val timeWindowFromOnly = TimeWindow.fromOnly(now - 10.seconds)
|
|
||||||
val timeWindowUntilOnly = TimeWindow.untilOnly(now + 10.seconds)
|
|
||||||
assertTrue { timeWindowChecker.isValid(timeWindowBetween) }
|
|
||||||
assertTrue { timeWindowChecker.isValid(timeWindowFromOnly) }
|
|
||||||
assertTrue { timeWindowChecker.isValid(timeWindowUntilOnly) }
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun `should return false for invalid time-window`() {
|
|
||||||
val now = clock.instant()
|
|
||||||
val timeWindowBetweenPast = TimeWindow.between(now - 10.seconds, now - 2.seconds)
|
|
||||||
val timeWindowBetweenFuture = TimeWindow.between(now + 2.seconds, now + 10.seconds)
|
|
||||||
val timeWindowFromOnlyFuture = TimeWindow.fromOnly(now + 10.seconds)
|
|
||||||
val timeWindowUntilOnlyPast = TimeWindow.untilOnly(now - 10.seconds)
|
|
||||||
assertFalse { timeWindowChecker.isValid(timeWindowBetweenPast) }
|
|
||||||
assertFalse { timeWindowChecker.isValid(timeWindowBetweenFuture) }
|
|
||||||
assertFalse { timeWindowChecker.isValid(timeWindowFromOnlyFuture) }
|
|
||||||
assertFalse { timeWindowChecker.isValid(timeWindowUntilOnlyPast) }
|
|
||||||
}
|
|
||||||
}
|
|
@ -191,6 +191,9 @@ R3 Corda 3.0 Developer Preview
|
|||||||
|
|
||||||
* Enterprise Corda only: Compatibility with PostgreSQL 9.6 database.
|
* Enterprise Corda only: Compatibility with PostgreSQL 9.6 database.
|
||||||
|
|
||||||
|
* Move to a message based control of peer to peer bridge formation to allow for future out of process bridging components.
|
||||||
|
This removes the legacy Artemis bridges completely, so the ``useAMQPBridges`` configuration property has been removed.
|
||||||
|
|
||||||
.. _changelog_v2:
|
.. _changelog_v2:
|
||||||
|
|
||||||
Corda 2.0
|
Corda 2.0
|
||||||
|
@ -206,11 +206,6 @@ path to the node's base directory.
|
|||||||
:exportJMXTo: If set to ``http``, will enable JMX metrics reporting via the Jolokia HTTP/JSON agent.
|
:exportJMXTo: If set to ``http``, will enable JMX metrics reporting via the Jolokia HTTP/JSON agent.
|
||||||
Default Jolokia access url is http://127.0.0.1:7005/jolokia/
|
Default Jolokia access url is http://127.0.0.1:7005/jolokia/
|
||||||
|
|
||||||
.. _config_amqp_bridge:
|
|
||||||
|
|
||||||
:useAMQPBridges: Optionally can be set to ``false`` to use Artemis CORE Bridges for peer-to-peer communications.
|
|
||||||
Otherwise, defaults to ``true`` and the AMQP 1.0 protocol will be used for message transfer between nodes.
|
|
||||||
|
|
||||||
:transactionCacheSizeMegaBytes: Optionally specify how much memory should be used for caching of ledger transactions in memory.
|
:transactionCacheSizeMegaBytes: Optionally specify how much memory should be used for caching of ledger transactions in memory.
|
||||||
Otherwise defaults to 8MB plus 5% of all heap memory above 300MB.
|
Otherwise defaults to 8MB plus 5% of all heap memory above 300MB.
|
||||||
|
|
||||||
|
@ -25,6 +25,8 @@ class ArtemisMessagingComponent {
|
|||||||
const val INTERNAL_PREFIX = "internal."
|
const val INTERNAL_PREFIX = "internal."
|
||||||
const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers." //TODO Come up with better name for common peers/services queue
|
const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers." //TODO Come up with better name for common peers/services queue
|
||||||
const val P2P_PREFIX = "p2p.inbound."
|
const val P2P_PREFIX = "p2p.inbound."
|
||||||
|
const val BRIDGE_CONTROL = "${INTERNAL_PREFIX}bridge.control"
|
||||||
|
const val BRIDGE_NOTIFY = "${INTERNAL_PREFIX}bridge.notify"
|
||||||
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
|
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,50 @@
|
|||||||
|
package net.corda.nodeapi.internal
|
||||||
|
|
||||||
|
import net.corda.core.identity.CordaX500Name
|
||||||
|
import net.corda.core.serialization.CordaSerializable
|
||||||
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The information required to construct a bridge to a remote peer.
|
||||||
|
* @property queueName The local source queue from which to move messages.
|
||||||
|
* @property targets The list of TCP connection targets on which the peer resides
|
||||||
|
* @property legalNames The list of acceptable [CordaX500Name] names that should be presented as subject of the validated peer TLS certificate.
|
||||||
|
*/
|
||||||
|
@CordaSerializable
|
||||||
|
data class BridgeEntry(val queueName: String, val targets: List<NetworkHostAndPort>, val legalNames: List<CordaX500Name>)
|
||||||
|
|
||||||
|
sealed class BridgeControl {
|
||||||
|
/**
|
||||||
|
* This message is sent on node start to inform any bridges of valid inbound peer-to-peer topics and pre-existing outbound queues needing bridging.
|
||||||
|
* @property nodeIdentity This is used for informational purposes to identify the originating node instance.
|
||||||
|
* @property inboxQueues The list of P2P inbox queue names/addresses, which could be used to filter inbound messages and prevent any identity spoofing.
|
||||||
|
* @property sendQueues The list [BridgeEntry] for all pre-existing local queues requiring a bridge to a remote peer.
|
||||||
|
*/
|
||||||
|
@CordaSerializable
|
||||||
|
data class NodeToBridgeSnapshot(val nodeIdentity: String, val inboxQueues: List<String>, val sendQueues: List<BridgeEntry>) : BridgeControl()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This message is sent on bridge start to re-request NodeToBridgeSnapshot information from all nodes on the broker.
|
||||||
|
* @property bridgeIdentity This is used for informational purposes to identify the originating bridge instance.
|
||||||
|
*/
|
||||||
|
@CordaSerializable
|
||||||
|
data class BridgeToNodeSnapshotRequest(val bridgeIdentity: String) : BridgeControl()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This message is sent to any active bridges to create a new bridge if one does not already exist. It may also be sent if updated
|
||||||
|
* information arrives from the network map to allow connection details of a pre-existing queue to now be resolved.
|
||||||
|
* @property nodeIdentity This is used for informational purposes to identify the originating node instance.
|
||||||
|
* @property bridgeInfo The connection details of the new bridge.
|
||||||
|
*/
|
||||||
|
@CordaSerializable
|
||||||
|
data class Create(val nodeIdentity: String, val bridgeInfo: BridgeEntry) : BridgeControl()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This message is sent to any active bridges to tear down an existing bridge. Typically this is only done when there is a change in network map details for a peer.
|
||||||
|
* The source queue is not affected by this operation and it is the responsibility of the node to ensure there are no unsent messages and to delete the durable queue.
|
||||||
|
* @property nodeIdentity This is used for informational purposes to identify the originating node instance.
|
||||||
|
* @property bridgeInfo The connection details of the bridge to be removed
|
||||||
|
*/
|
||||||
|
@CordaSerializable
|
||||||
|
data class Delete(val nodeIdentity: String, val bridgeInfo: BridgeEntry) : BridgeControl()
|
||||||
|
}
|
@ -1,18 +1,18 @@
|
|||||||
package net.corda.node.amqp
|
package net.corda.node.amqp
|
||||||
|
|
||||||
import com.nhaarman.mockito_kotlin.any
|
|
||||||
import com.nhaarman.mockito_kotlin.doReturn
|
import com.nhaarman.mockito_kotlin.doReturn
|
||||||
import com.nhaarman.mockito_kotlin.whenever
|
import com.nhaarman.mockito_kotlin.whenever
|
||||||
import net.corda.core.crypto.toStringShort
|
import net.corda.core.crypto.toStringShort
|
||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
import net.corda.core.node.NodeInfo
|
|
||||||
import net.corda.core.node.services.NetworkMapCache
|
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.internal.protonwrapper.netty.AMQPServer
|
import net.corda.node.internal.protonwrapper.netty.AMQPServer
|
||||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
import net.corda.node.services.config.CertChainPolicyConfig
|
||||||
import net.corda.node.services.config.*
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
|
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||||
|
import net.corda.node.services.messaging.AMQPBridgeManager
|
||||||
import net.corda.node.services.messaging.ArtemisMessagingClient
|
import net.corda.node.services.messaging.ArtemisMessagingClient
|
||||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||||
|
import net.corda.node.services.messaging.BridgeManager
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
import net.corda.testing.core.*
|
import net.corda.testing.core.*
|
||||||
import net.corda.testing.internal.rigorousMock
|
import net.corda.testing.internal.rigorousMock
|
||||||
@ -24,7 +24,6 @@ import org.junit.Ignore
|
|||||||
import org.junit.Rule
|
import org.junit.Rule
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import org.junit.rules.TemporaryFolder
|
import org.junit.rules.TemporaryFolder
|
||||||
import rx.Observable
|
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import kotlin.test.assertEquals
|
import kotlin.test.assertEquals
|
||||||
import kotlin.test.assertNotEquals
|
import kotlin.test.assertNotEquals
|
||||||
@ -52,7 +51,7 @@ class AMQPBridgeTest {
|
|||||||
fun `test acked and nacked messages`() {
|
fun `test acked and nacked messages`() {
|
||||||
// Create local queue
|
// Create local queue
|
||||||
val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort()
|
val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort()
|
||||||
val (artemisServer, artemisClient) = createArtemis(sourceQueueName)
|
val (artemisServer, artemisClient, bridgeManager) = createArtemis(sourceQueueName)
|
||||||
|
|
||||||
// Pre-populate local queue with 3 messages
|
// Pre-populate local queue with 3 messages
|
||||||
val artemis = artemisClient.started!!
|
val artemis = artemisClient.started!!
|
||||||
@ -117,54 +116,25 @@ class AMQPBridgeTest {
|
|||||||
}
|
}
|
||||||
artemis.producer.send(sourceQueueName, artemisMessage)
|
artemis.producer.send(sourceQueueName, artemisMessage)
|
||||||
|
|
||||||
val received5 = receive.next()
|
|
||||||
val messageID5 = received5.applicationProperties["CountProp"] as Int
|
|
||||||
assertArrayEquals("Test_end".toByteArray(), received5.payload)
|
|
||||||
assertEquals(-1, messageID5) // next message should be in order
|
|
||||||
received5.complete(true)
|
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
val received5 = receive.next()
|
||||||
|
val messageID5 = received5.applicationProperties["CountProp"] as Int
|
||||||
|
if (messageID5 != 2) { // we may get a duplicate of the interrupted message, in which case skip
|
||||||
|
assertEquals(-1, messageID5) // next message should be in order though
|
||||||
|
assertArrayEquals("Test_end".toByteArray(), received5.payload)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
received5.complete(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
bridgeManager.stop()
|
||||||
amqpServer.stop()
|
amqpServer.stop()
|
||||||
artemisClient.stop()
|
artemisClient.stop()
|
||||||
artemisServer.stop()
|
artemisServer.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
private fun createArtemis(sourceQueueName: String?): Triple<ArtemisMessagingServer, ArtemisMessagingClient, BridgeManager> {
|
||||||
fun `Test legacy bridge still works`() {
|
|
||||||
// Create local queue
|
|
||||||
val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort()
|
|
||||||
val (artemisLegacyServer, artemisLegacyClient) = createLegacyArtemis(sourceQueueName)
|
|
||||||
|
|
||||||
|
|
||||||
val (artemisServer, artemisClient) = createArtemis(null)
|
|
||||||
val inbox = ArtemisMessagingComponent.RemoteInboxAddress(BOB.party).queueName
|
|
||||||
artemisClient.started!!.session.createQueue(inbox, RoutingType.ANYCAST, inbox, true)
|
|
||||||
|
|
||||||
val artemis = artemisLegacyClient.started!!
|
|
||||||
for (i in 0 until 3) {
|
|
||||||
val artemisMessage = artemis.session.createMessage(true).apply {
|
|
||||||
putIntProperty("CountProp", i)
|
|
||||||
writeBodyBufferBytes("Test$i".toByteArray())
|
|
||||||
// Use the magic deduplication property built into Artemis as our message identity too
|
|
||||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
|
||||||
}
|
|
||||||
artemis.producer.send(sourceQueueName, artemisMessage)
|
|
||||||
}
|
|
||||||
|
|
||||||
val subs = artemisClient.started!!.session.createConsumer(inbox)
|
|
||||||
for (i in 0 until 3) {
|
|
||||||
val msg = subs.receive()
|
|
||||||
val messageBody = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) }
|
|
||||||
assertArrayEquals("Test$i".toByteArray(), messageBody)
|
|
||||||
assertEquals(i, msg.getIntProperty("CountProp"))
|
|
||||||
}
|
|
||||||
|
|
||||||
artemisClient.stop()
|
|
||||||
artemisServer.stop()
|
|
||||||
artemisLegacyClient.stop()
|
|
||||||
artemisLegacyServer.stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun createArtemis(sourceQueueName: String?): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
|
|
||||||
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||||
doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory
|
doReturn(temporaryFolder.root.toPath() / "artemis").whenever(it).baseDirectory
|
||||||
doReturn(ALICE_NAME).whenever(it).myLegalName
|
doReturn(ALICE_NAME).whenever(it).myLegalName
|
||||||
@ -173,50 +143,21 @@ class AMQPBridgeTest {
|
|||||||
doReturn(artemisAddress).whenever(it).p2pAddress
|
doReturn(artemisAddress).whenever(it).p2pAddress
|
||||||
doReturn("").whenever(it).exportJMXto
|
doReturn("").whenever(it).exportJMXto
|
||||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
||||||
doReturn(true).whenever(it).useAMQPBridges
|
|
||||||
}
|
}
|
||||||
artemisConfig.configureWithDevSSLCertificate()
|
artemisConfig.configureWithDevSSLCertificate()
|
||||||
val networkMap = rigorousMock<NetworkMapCacheInternal>().also {
|
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, MAX_MESSAGE_SIZE)
|
||||||
doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed
|
|
||||||
doReturn(listOf(NodeInfo(listOf(amqpAddress), listOf(BOB.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any())
|
|
||||||
}
|
|
||||||
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort, networkMap, MAX_MESSAGE_SIZE)
|
|
||||||
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE)
|
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE)
|
||||||
artemisServer.start()
|
artemisServer.start()
|
||||||
artemisClient.start()
|
artemisClient.start()
|
||||||
|
val bridgeManager = AMQPBridgeManager(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE)
|
||||||
|
bridgeManager.start()
|
||||||
val artemis = artemisClient.started!!
|
val artemis = artemisClient.started!!
|
||||||
if (sourceQueueName != null) {
|
if (sourceQueueName != null) {
|
||||||
// Local queue for outgoing messages
|
// Local queue for outgoing messages
|
||||||
artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true)
|
artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true)
|
||||||
|
bridgeManager.deployBridge(sourceQueueName, amqpAddress, setOf(BOB.name))
|
||||||
}
|
}
|
||||||
return Pair(artemisServer, artemisClient)
|
return Triple(artemisServer, artemisClient, bridgeManager)
|
||||||
}
|
|
||||||
|
|
||||||
private fun createLegacyArtemis(sourceQueueName: String): Pair<ArtemisMessagingServer, ArtemisMessagingClient> {
|
|
||||||
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
|
||||||
doReturn(temporaryFolder.root.toPath() / "artemis2").whenever(it).baseDirectory
|
|
||||||
doReturn(BOB_NAME).whenever(it).myLegalName
|
|
||||||
doReturn("trustpass").whenever(it).trustStorePassword
|
|
||||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
|
||||||
doReturn(artemisAddress).whenever(it).p2pAddress
|
|
||||||
doReturn("").whenever(it).exportJMXto
|
|
||||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
|
||||||
doReturn(false).whenever(it).useAMQPBridges
|
|
||||||
doReturn(ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0))).whenever(it).activeMQServer
|
|
||||||
}
|
|
||||||
artemisConfig.configureWithDevSSLCertificate()
|
|
||||||
val networkMap = rigorousMock<NetworkMapCacheInternal>().also {
|
|
||||||
doReturn(Observable.never<NetworkMapCache.MapChange>()).whenever(it).changed
|
|
||||||
doReturn(listOf(NodeInfo(listOf(artemisAddress), listOf(ALICE.identity), 1, 1L))).whenever(it).getNodesByOwningKeyIndex(any())
|
|
||||||
}
|
|
||||||
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisPort2, networkMap, MAX_MESSAGE_SIZE)
|
|
||||||
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress2, MAX_MESSAGE_SIZE)
|
|
||||||
artemisServer.start()
|
|
||||||
artemisClient.start()
|
|
||||||
val artemis = artemisClient.started!!
|
|
||||||
// Local queue for outgoing messages
|
|
||||||
artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true)
|
|
||||||
return Pair(artemisServer, artemisClient)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun createAMQPServer(): AMQPServer {
|
private fun createAMQPServer(): AMQPServer {
|
||||||
|
@ -6,13 +6,11 @@ import io.netty.channel.EventLoopGroup
|
|||||||
import io.netty.channel.nio.NioEventLoopGroup
|
import io.netty.channel.nio.NioEventLoopGroup
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
import net.corda.core.node.services.NetworkMapCache
|
|
||||||
import net.corda.core.toFuture
|
import net.corda.core.toFuture
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.internal.protonwrapper.messages.MessageStatus
|
import net.corda.node.internal.protonwrapper.messages.MessageStatus
|
||||||
import net.corda.node.internal.protonwrapper.netty.AMQPClient
|
import net.corda.node.internal.protonwrapper.netty.AMQPClient
|
||||||
import net.corda.node.internal.protonwrapper.netty.AMQPServer
|
import net.corda.node.internal.protonwrapper.netty.AMQPServer
|
||||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
|
||||||
import net.corda.node.services.config.CertChainPolicyConfig
|
import net.corda.node.services.config.CertChainPolicyConfig
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||||
@ -27,7 +25,6 @@ import org.junit.Assert.assertArrayEquals
|
|||||||
import org.junit.Rule
|
import org.junit.Rule
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import org.junit.rules.TemporaryFolder
|
import org.junit.rules.TemporaryFolder
|
||||||
import rx.Observable.never
|
|
||||||
import kotlin.test.assertEquals
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
class ProtonWrapperTests {
|
class ProtonWrapperTests {
|
||||||
@ -227,14 +224,10 @@ class ProtonWrapperTests {
|
|||||||
doReturn(NetworkHostAndPort("0.0.0.0", artemisPort)).whenever(it).p2pAddress
|
doReturn(NetworkHostAndPort("0.0.0.0", artemisPort)).whenever(it).p2pAddress
|
||||||
doReturn("").whenever(it).exportJMXto
|
doReturn("").whenever(it).exportJMXto
|
||||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
||||||
doReturn(true).whenever(it).useAMQPBridges
|
|
||||||
}
|
}
|
||||||
artemisConfig.configureWithDevSSLCertificate()
|
artemisConfig.configureWithDevSSLCertificate()
|
||||||
|
|
||||||
val networkMap = rigorousMock<NetworkMapCacheInternal>().also {
|
val server = ArtemisMessagingServer(artemisConfig, artemisPort, MAX_MESSAGE_SIZE)
|
||||||
doReturn(never<NetworkMapCache.MapChange>()).whenever(it).changed
|
|
||||||
}
|
|
||||||
val server = ArtemisMessagingServer(artemisConfig, artemisPort, networkMap, MAX_MESSAGE_SIZE)
|
|
||||||
val client = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", artemisPort), MAX_MESSAGE_SIZE)
|
val client = ArtemisMessagingClient(artemisConfig, NetworkHostAndPort("localhost", artemisPort), MAX_MESSAGE_SIZE)
|
||||||
server.start()
|
server.start()
|
||||||
client.start()
|
client.start()
|
||||||
|
@ -137,6 +137,7 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
override lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor
|
override lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor
|
||||||
|
|
||||||
private var messageBroker: ArtemisMessagingServer? = null
|
private var messageBroker: ArtemisMessagingServer? = null
|
||||||
|
private var bridgeControlListener: BridgeControlListener? = null
|
||||||
private var rpcBroker: ArtemisBroker? = null
|
private var rpcBroker: ArtemisBroker? = null
|
||||||
|
|
||||||
private var shutdownHook: ShutdownHook? = null
|
private var shutdownHook: ShutdownHook? = null
|
||||||
@ -152,6 +153,7 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
|
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
|
||||||
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress) else startLocalRpcBroker()
|
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) BrokerAddresses(configuration.rpcOptions.address!!, configuration.rpcOptions.adminAddress) else startLocalRpcBroker()
|
||||||
val advertisedAddress = info.addresses.single()
|
val advertisedAddress = info.addresses.single()
|
||||||
|
bridgeControlListener = BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize)
|
||||||
|
|
||||||
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())
|
printBasicNodeInfo("Incoming connection address", advertisedAddress.toString())
|
||||||
rpcServerAddresses?.let {
|
rpcServerAddresses?.let {
|
||||||
@ -171,6 +173,7 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
serviceIdentity,
|
serviceIdentity,
|
||||||
serverThread,
|
serverThread,
|
||||||
database,
|
database,
|
||||||
|
services.networkMapCache,
|
||||||
advertisedAddress,
|
advertisedAddress,
|
||||||
networkParameters.maxMessageSize)
|
networkParameters.maxMessageSize)
|
||||||
}
|
}
|
||||||
@ -194,7 +197,7 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
|
|
||||||
private fun makeLocalMessageBroker(): NetworkHostAndPort {
|
private fun makeLocalMessageBroker(): NetworkHostAndPort {
|
||||||
with(configuration) {
|
with(configuration) {
|
||||||
messageBroker = ArtemisMessagingServer(this, p2pAddress.port, services.networkMapCache, networkParameters.maxMessageSize)
|
messageBroker = ArtemisMessagingServer(this, p2pAddress.port, networkParameters.maxMessageSize)
|
||||||
return NetworkHostAndPort("localhost", p2pAddress.port)
|
return NetworkHostAndPort("localhost", p2pAddress.port)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -257,6 +260,11 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
runOnStop += this::close
|
runOnStop += this::close
|
||||||
start()
|
start()
|
||||||
}
|
}
|
||||||
|
// Start P2P bridge service
|
||||||
|
bridgeControlListener?.apply {
|
||||||
|
runOnStop += this::stop
|
||||||
|
start()
|
||||||
|
}
|
||||||
// Start up the MQ clients.
|
// Start up the MQ clients.
|
||||||
rpcMessagingClient?.run {
|
rpcMessagingClient?.run {
|
||||||
runOnStop += this::close
|
runOnStop += this::close
|
||||||
|
@ -402,6 +402,7 @@ internal class ConnectionStateMachine(serverMode: Boolean,
|
|||||||
private fun encodePayloadBytes(msg: SendableMessageImpl): ByteBuf {
|
private fun encodePayloadBytes(msg: SendableMessageImpl): ByteBuf {
|
||||||
val message = Proton.message() as ProtonJMessage
|
val message = Proton.message() as ProtonJMessage
|
||||||
message.body = Data(Binary(msg.payload))
|
message.body = Data(Binary(msg.payload))
|
||||||
|
message.isDurable = true
|
||||||
message.properties = Properties()
|
message.properties = Properties()
|
||||||
val appProperties = HashMap(msg.applicationProperties)
|
val appProperties = HashMap(msg.applicationProperties)
|
||||||
//TODO We shouldn't have to do this, but Artemis Server doesn't set the header on AMQP packets.
|
//TODO We shouldn't have to do this, but Artemis Server doesn't set the header on AMQP packets.
|
||||||
|
@ -48,7 +48,6 @@ interface NodeConfiguration : NodeSSLConfiguration {
|
|||||||
val sshd: SSHDConfiguration?
|
val sshd: SSHDConfiguration?
|
||||||
val database: DatabaseConfig
|
val database: DatabaseConfig
|
||||||
val relay: RelayConfiguration?
|
val relay: RelayConfiguration?
|
||||||
val useAMQPBridges: Boolean get() = true
|
|
||||||
val transactionCacheSizeBytes: Long get() = defaultTransactionCacheSize
|
val transactionCacheSizeBytes: Long get() = defaultTransactionCacheSize
|
||||||
val attachmentContentCacheSizeBytes: Long get() = defaultAttachmentContentCacheSize
|
val attachmentContentCacheSizeBytes: Long get() = defaultAttachmentContentCacheSize
|
||||||
val attachmentCacheBound: Long get() = defaultAttachmentCacheBound
|
val attachmentCacheBound: Long get() = defaultAttachmentCacheBound
|
||||||
@ -161,7 +160,6 @@ data class NodeConfigurationImpl(
|
|||||||
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(),
|
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(),
|
||||||
override val sshd: SSHDConfiguration? = null,
|
override val sshd: SSHDConfiguration? = null,
|
||||||
override val database: DatabaseConfig = DatabaseConfig(exportHibernateJMXStatistics = devMode),
|
override val database: DatabaseConfig = DatabaseConfig(exportHibernateJMXStatistics = devMode),
|
||||||
override val useAMQPBridges: Boolean = true,
|
|
||||||
private val transactionCacheSizeMegaBytes: Int? = null,
|
private val transactionCacheSizeMegaBytes: Int? = null,
|
||||||
private val attachmentContentCacheSizeMegaBytes: Int? = null,
|
private val attachmentContentCacheSizeMegaBytes: Int? = null,
|
||||||
override val attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound,
|
override val attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound,
|
||||||
|
@ -3,6 +3,7 @@ package net.corda.node.services.messaging
|
|||||||
import io.netty.channel.EventLoopGroup
|
import io.netty.channel.EventLoopGroup
|
||||||
import io.netty.channel.nio.NioEventLoopGroup
|
import io.netty.channel.nio.NioEventLoopGroup
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
|
import net.corda.core.internal.VisibleForTesting
|
||||||
import net.corda.core.node.NodeInfo
|
import net.corda.core.node.NodeInfo
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.debug
|
import net.corda.core.utilities.debug
|
||||||
@ -32,7 +33,8 @@ import kotlin.concurrent.withLock
|
|||||||
* independent Session for message consumption.
|
* independent Session for message consumption.
|
||||||
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
|
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
|
||||||
*/
|
*/
|
||||||
internal class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress: NetworkHostAndPort, val maxMessageSize: Int) : BridgeManager {
|
@VisibleForTesting
|
||||||
|
class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress: NetworkHostAndPort, val maxMessageSize: Int) : BridgeManager {
|
||||||
|
|
||||||
private val lock = ReentrantLock()
|
private val lock = ReentrantLock()
|
||||||
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
|
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
|
||||||
@ -177,6 +179,13 @@ internal class AMQPBridgeManager(val config: NodeConfiguration, val p2pAddress:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun destroyBridge(queueName: String, hostAndPort: NetworkHostAndPort) {
|
||||||
|
lock.withLock {
|
||||||
|
val bridge = bridgeNameToBridgeMap.remove(getBridgeName(queueName, hostAndPort))
|
||||||
|
bridge?.stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override fun bridgeExists(bridgeName: String): Boolean = lock.withLock { bridgeNameToBridgeMap.containsKey(bridgeName) }
|
override fun bridgeExists(bridgeName: String): Boolean = lock.withLock { bridgeNameToBridgeMap.containsKey(bridgeName) }
|
||||||
|
|
||||||
override fun start() {
|
override fun start() {
|
||||||
|
@ -1,13 +1,9 @@
|
|||||||
package net.corda.node.services.messaging
|
package net.corda.node.services.messaging
|
||||||
|
|
||||||
import net.corda.core.crypto.AddressFormatException
|
|
||||||
import net.corda.core.identity.CordaX500Name
|
|
||||||
import net.corda.core.internal.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
import net.corda.core.internal.noneOrSingle
|
import net.corda.core.internal.noneOrSingle
|
||||||
import net.corda.core.internal.uncheckedCast
|
import net.corda.core.internal.uncheckedCast
|
||||||
import net.corda.core.node.NodeInfo
|
|
||||||
import net.corda.core.node.services.NetworkMapCache.MapChange
|
|
||||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
@ -17,7 +13,6 @@ import net.corda.node.internal.artemis.ArtemisBroker
|
|||||||
import net.corda.node.internal.artemis.BrokerAddresses
|
import net.corda.node.internal.artemis.BrokerAddresses
|
||||||
import net.corda.node.internal.artemis.CertificateChainCheckPolicy
|
import net.corda.node.internal.artemis.CertificateChainCheckPolicy
|
||||||
import net.corda.node.internal.artemis.SecureArtemisConfiguration
|
import net.corda.node.internal.artemis.SecureArtemisConfiguration
|
||||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE
|
import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE
|
||||||
import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE
|
import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE
|
||||||
@ -25,12 +20,10 @@ import net.corda.node.services.messaging.NodeLoginModule.Companion.VERIFIER_ROLE
|
|||||||
import net.corda.nodeapi.ArtemisTcpTransport
|
import net.corda.nodeapi.ArtemisTcpTransport
|
||||||
import net.corda.nodeapi.ConnectionDirection
|
import net.corda.nodeapi.ConnectionDirection
|
||||||
import net.corda.nodeapi.VerifierApi
|
import net.corda.nodeapi.VerifierApi
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisPeerAddress
|
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
||||||
import net.corda.nodeapi.internal.requireOnDefaultFileSystem
|
import net.corda.nodeapi.internal.requireOnDefaultFileSystem
|
||||||
@ -47,7 +40,6 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
|
|||||||
import org.apache.activemq.artemis.spi.core.security.jaas.CertificateCallback
|
import org.apache.activemq.artemis.spi.core.security.jaas.CertificateCallback
|
||||||
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal
|
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal
|
||||||
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal
|
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal
|
||||||
import rx.Subscription
|
|
||||||
import java.io.IOException
|
import java.io.IOException
|
||||||
import java.security.KeyStoreException
|
import java.security.KeyStoreException
|
||||||
import java.security.Principal
|
import java.security.Principal
|
||||||
@ -79,7 +71,6 @@ import javax.security.auth.spi.LoginModule
|
|||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
class ArtemisMessagingServer(private val config: NodeConfiguration,
|
class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||||
private val p2pPort: Int,
|
private val p2pPort: Int,
|
||||||
val networkMapCache: NetworkMapCacheInternal,
|
|
||||||
val maxMessageSize: Int) : ArtemisBroker, SingletonSerializeAsToken() {
|
val maxMessageSize: Int) : ArtemisBroker, SingletonSerializeAsToken() {
|
||||||
companion object {
|
companion object {
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
@ -91,29 +82,19 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
|||||||
private val mutex = ThreadBox(InnerState())
|
private val mutex = ThreadBox(InnerState())
|
||||||
private lateinit var activeMQServer: ActiveMQServer
|
private lateinit var activeMQServer: ActiveMQServer
|
||||||
override val serverControl: ActiveMQServerControl get() = activeMQServer.activeMQServerControl
|
override val serverControl: ActiveMQServerControl get() = activeMQServer.activeMQServerControl
|
||||||
private var networkChangeHandle: Subscription? = null
|
|
||||||
private lateinit var bridgeManager: BridgeManager
|
|
||||||
|
|
||||||
init {
|
init {
|
||||||
config.baseDirectory.requireOnDefaultFileSystem()
|
config.baseDirectory.requireOnDefaultFileSystem()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* The server will make sure the bridge exists on network map changes, see method [updateBridgesOnNetworkChange]
|
|
||||||
* We assume network map will be updated accordingly when the client node register with the network map.
|
|
||||||
*/
|
|
||||||
override fun start() = mutex.locked {
|
override fun start() = mutex.locked {
|
||||||
if (!running) {
|
if (!running) {
|
||||||
configureAndStartServer()
|
configureAndStartServer()
|
||||||
networkChangeHandle = networkMapCache.changed.subscribe { updateBridgesOnNetworkChange(it) }
|
|
||||||
running = true
|
running = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun stop() = mutex.locked {
|
override fun stop() = mutex.locked {
|
||||||
bridgeManager.close()
|
|
||||||
networkChangeHandle?.unsubscribe()
|
|
||||||
networkChangeHandle = null
|
|
||||||
activeMQServer.stop()
|
activeMQServer.stop()
|
||||||
running = false
|
running = false
|
||||||
}
|
}
|
||||||
@ -134,17 +115,11 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
|||||||
registerActivationFailureListener { exception -> throw exception }
|
registerActivationFailureListener { exception -> throw exception }
|
||||||
// Some types of queue might need special preparation on our side, like dialling back or preparing
|
// Some types of queue might need special preparation on our side, like dialling back or preparing
|
||||||
// a lazily initialised subsystem.
|
// a lazily initialised subsystem.
|
||||||
registerPostQueueCreationCallback { deployBridgesFromNewQueue(it.toString()) }
|
registerPostQueueCreationCallback { log.debug { "Queue Created: $it" } }
|
||||||
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
|
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
|
||||||
}
|
}
|
||||||
// Config driven switch between legacy CORE bridges and the newer AMQP protocol bridges.
|
// Config driven switch between legacy CORE bridges and the newer AMQP protocol bridges.
|
||||||
bridgeManager = if (config.useAMQPBridges) {
|
|
||||||
AMQPBridgeManager(config, NetworkHostAndPort("localhost", p2pPort), maxMessageSize)
|
|
||||||
} else {
|
|
||||||
CoreBridgeManager(config, activeMQServer)
|
|
||||||
}
|
|
||||||
activeMQServer.start()
|
activeMQServer.start()
|
||||||
bridgeManager.start()
|
|
||||||
Node.printBasicNodeInfo("Listening on port", p2pPort.toString())
|
Node.printBasicNodeInfo("Listening on port", p2pPort.toString())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -223,76 +198,8 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
|||||||
return ActiveMQJAASSecurityManager(NodeLoginModule::class.java.name, securityConfig)
|
return ActiveMQJAASSecurityManager(NodeLoginModule::class.java.name, securityConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun deployBridgesFromNewQueue(queueName: String) {
|
|
||||||
log.debug { "Queue created: $queueName, deploying bridge(s)" }
|
|
||||||
fun deployBridgeToPeer(nodeInfo: NodeInfo) {
|
|
||||||
log.debug("Deploying bridge for $queueName to $nodeInfo")
|
|
||||||
val address = nodeInfo.addresses.single()
|
|
||||||
bridgeManager.deployBridge(queueName, address, nodeInfo.legalIdentitiesAndCerts.map { it.name }.toSet())
|
|
||||||
}
|
|
||||||
|
|
||||||
if (queueName.startsWith(PEERS_PREFIX)) {
|
|
||||||
try {
|
|
||||||
val nodeInfos = networkMapCache.getNodesByOwningKeyIndex(queueName.substring(PEERS_PREFIX.length))
|
|
||||||
if (nodeInfos.isNotEmpty()) {
|
|
||||||
nodeInfos.forEach { deployBridgeToPeer(it) }
|
|
||||||
} else {
|
|
||||||
log.error("Queue created for a peer that we don't know from the network map: $queueName")
|
|
||||||
}
|
|
||||||
} catch (e: AddressFormatException) {
|
|
||||||
log.error("Flow violation: Could not parse peer queue name as Base 58: $queueName")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The bridge will be created automatically when the queues are created, however, this is not the case when the network map restarted.
|
|
||||||
* The queues are restored from the journal, and because the queues are added before we register the callback handler, this method will never get called for existing queues.
|
|
||||||
* This results in message queues up and never get send out. (https://github.com/corda/corda/issues/37)
|
|
||||||
*
|
|
||||||
* We create the bridges indirectly now because the network map is not persisted and there are no ways to obtain host and port information on startup.
|
|
||||||
* TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service.
|
|
||||||
*/
|
|
||||||
private fun updateBridgesOnNetworkChange(change: MapChange) {
|
|
||||||
log.debug { "Updating bridges on network map change: ${change.node}" }
|
|
||||||
fun gatherAddresses(node: NodeInfo): Sequence<ArtemisPeerAddress> {
|
|
||||||
val address = node.addresses.single()
|
|
||||||
return node.legalIdentitiesAndCerts.map { NodeAddress(it.party.owningKey, address) }.asSequence()
|
|
||||||
}
|
|
||||||
|
|
||||||
fun deployBridges(node: NodeInfo) {
|
|
||||||
gatherAddresses(node)
|
|
||||||
.filter { queueExists(it.queueName) && !bridgeManager.bridgeExists(it.bridgeName) }
|
|
||||||
.forEach { deployBridge(it, node.legalIdentitiesAndCerts.map { it.name }.toSet()) }
|
|
||||||
}
|
|
||||||
|
|
||||||
when (change) {
|
|
||||||
is MapChange.Added -> {
|
|
||||||
deployBridges(change.node)
|
|
||||||
}
|
|
||||||
is MapChange.Removed -> {
|
|
||||||
bridgeManager.destroyBridges(change.node)
|
|
||||||
}
|
|
||||||
is MapChange.Modified -> {
|
|
||||||
// TODO Figure out what has actually changed and only destroy those bridges that need to be.
|
|
||||||
bridgeManager.destroyBridges(change.previousNode)
|
|
||||||
deployBridges(change.node)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun deployBridge(address: ArtemisPeerAddress, legalNames: Set<CordaX500Name>) {
|
|
||||||
bridgeManager.deployBridge(address.queueName, address.hostAndPort, legalNames)
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun createTcpTransport(connectionDirection: ConnectionDirection, host: String, port: Int, enableSSL: Boolean = true) =
|
private fun createTcpTransport(connectionDirection: ConnectionDirection, host: String, port: Int, enableSSL: Boolean = true) =
|
||||||
ArtemisTcpTransport.tcpTransport(connectionDirection, NetworkHostAndPort(host, port), config, enableSSL = enableSSL)
|
ArtemisTcpTransport.tcpTransport(connectionDirection, NetworkHostAndPort(host, port), config, enableSSL = enableSSL)
|
||||||
|
|
||||||
private fun queueExists(queueName: String): Boolean = activeMQServer.queueQuery(SimpleString(queueName)).isExists
|
|
||||||
|
|
||||||
private val ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort)
|
|
||||||
|
|
||||||
private fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -0,0 +1,116 @@
|
|||||||
|
package net.corda.node.services.messaging
|
||||||
|
|
||||||
|
import net.corda.core.serialization.SerializationDefaults
|
||||||
|
import net.corda.core.serialization.deserialize
|
||||||
|
import net.corda.core.serialization.serialize
|
||||||
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
|
import net.corda.core.utilities.contextLogger
|
||||||
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||||
|
import net.corda.nodeapi.internal.BridgeControl
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||||
|
import java.util.*
|
||||||
|
|
||||||
|
internal class BridgeControlListener(val config: NodeConfiguration,
|
||||||
|
val p2pAddress: NetworkHostAndPort,
|
||||||
|
val maxMessageSize: Int) : AutoCloseable {
|
||||||
|
private val bridgeId: String = UUID.randomUUID().toString()
|
||||||
|
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, p2pAddress, maxMessageSize)
|
||||||
|
private val validInboundQueues = mutableSetOf<String>()
|
||||||
|
private var artemis: ArtemisMessagingClient? = null
|
||||||
|
private var controlConsumer: ClientConsumer? = null
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
private val log = contextLogger()
|
||||||
|
}
|
||||||
|
|
||||||
|
fun start() {
|
||||||
|
stop()
|
||||||
|
bridgeManager.start()
|
||||||
|
val artemis = ArtemisMessagingClient(config, p2pAddress, maxMessageSize)
|
||||||
|
this.artemis = artemis
|
||||||
|
artemis.start()
|
||||||
|
val artemisClient = artemis.started!!
|
||||||
|
val artemisSession = artemisClient.session
|
||||||
|
val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
|
||||||
|
artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue)
|
||||||
|
val control = artemisSession.createConsumer(bridgeControlQueue)
|
||||||
|
controlConsumer = control
|
||||||
|
control.setMessageHandler { msg ->
|
||||||
|
try {
|
||||||
|
processControlMessage(msg)
|
||||||
|
} catch (ex: Exception) {
|
||||||
|
log.error("Unable to process bridge control message", ex)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
val startupMessage = BridgeControl.BridgeToNodeSnapshotRequest(bridgeId).serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
|
||||||
|
val bridgeRequest = artemisSession.createMessage(false)
|
||||||
|
bridgeRequest.writeBodyBufferBytes(startupMessage)
|
||||||
|
artemisClient.producer.send(BRIDGE_NOTIFY, bridgeRequest)
|
||||||
|
}
|
||||||
|
|
||||||
|
fun stop() {
|
||||||
|
controlConsumer?.close()
|
||||||
|
controlConsumer = null
|
||||||
|
artemis?.stop()
|
||||||
|
artemis = null
|
||||||
|
bridgeManager.stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun close() = stop()
|
||||||
|
|
||||||
|
private fun validateInboxQueueName(queueName: String): Boolean {
|
||||||
|
return queueName.startsWith(P2P_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString(queueName)).isExists
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun validateBridgingQueueName(queueName: String): Boolean {
|
||||||
|
return queueName.startsWith(PEERS_PREFIX) && artemis!!.started!!.session.queueQuery(SimpleString(queueName)).isExists
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun processControlMessage(msg: ClientMessage) {
|
||||||
|
val data: ByteArray = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) }
|
||||||
|
val controlMessage = data.deserialize<BridgeControl>(context = SerializationDefaults.P2P_CONTEXT)
|
||||||
|
log.info("Received bridge control message $controlMessage")
|
||||||
|
when (controlMessage) {
|
||||||
|
is BridgeControl.NodeToBridgeSnapshot -> {
|
||||||
|
if (!controlMessage.inboxQueues.all { validateInboxQueueName(it) }) {
|
||||||
|
log.error("Invalid queue names in control message $controlMessage")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if (!controlMessage.sendQueues.all { validateBridgingQueueName(it.queueName) }) {
|
||||||
|
log.error("Invalid queue names in control message $controlMessage")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for (outQueue in controlMessage.sendQueues) {
|
||||||
|
bridgeManager.deployBridge(outQueue.queueName, outQueue.targets.first(), outQueue.legalNames.toSet())
|
||||||
|
}
|
||||||
|
// TODO For now we just record the inboxes, but we don't use the information, but eventually out of process bridges will use this for validating inbound messages.
|
||||||
|
validInboundQueues.addAll(controlMessage.inboxQueues)
|
||||||
|
}
|
||||||
|
is BridgeControl.BridgeToNodeSnapshotRequest -> {
|
||||||
|
log.error("Message from Bridge $controlMessage detected on wrong topic!")
|
||||||
|
}
|
||||||
|
is BridgeControl.Create -> {
|
||||||
|
if (!validateBridgingQueueName((controlMessage.bridgeInfo.queueName))) {
|
||||||
|
log.error("Invalid queue names in control message $controlMessage")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
bridgeManager.deployBridge(controlMessage.bridgeInfo.queueName, controlMessage.bridgeInfo.targets.first(), controlMessage.bridgeInfo.legalNames.toSet())
|
||||||
|
}
|
||||||
|
is BridgeControl.Delete -> {
|
||||||
|
if (!controlMessage.bridgeInfo.queueName.startsWith(PEERS_PREFIX)) {
|
||||||
|
log.error("Invalid queue names in control message $controlMessage")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
bridgeManager.destroyBridge(controlMessage.bridgeInfo.queueName, controlMessage.bridgeInfo.targets.first())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -1,17 +1,21 @@
|
|||||||
package net.corda.node.services.messaging
|
package net.corda.node.services.messaging
|
||||||
|
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
|
import net.corda.core.internal.VisibleForTesting
|
||||||
import net.corda.core.node.NodeInfo
|
import net.corda.core.node.NodeInfo
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides an internal interface that the [ArtemisMessagingServer] delegates to for Bridge activities.
|
* Provides an internal interface that the [ArtemisMessagingServer] delegates to for Bridge activities.
|
||||||
*/
|
*/
|
||||||
internal interface BridgeManager : AutoCloseable {
|
@VisibleForTesting
|
||||||
|
interface BridgeManager : AutoCloseable {
|
||||||
fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>)
|
fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>)
|
||||||
|
|
||||||
fun destroyBridges(node: NodeInfo)
|
fun destroyBridges(node: NodeInfo)
|
||||||
|
|
||||||
|
fun destroyBridge(queueName: String, hostAndPort: NetworkHostAndPort)
|
||||||
|
|
||||||
fun bridgeExists(bridgeName: String): Boolean
|
fun bridgeExists(bridgeName: String): Boolean
|
||||||
|
|
||||||
fun start()
|
fun start()
|
||||||
|
@ -1,178 +0,0 @@
|
|||||||
package net.corda.node.services.messaging
|
|
||||||
|
|
||||||
import io.netty.handler.ssl.SslHandler
|
|
||||||
import net.corda.core.identity.CordaX500Name
|
|
||||||
import net.corda.core.internal.uncheckedCast
|
|
||||||
import net.corda.core.node.NodeInfo
|
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
|
||||||
import net.corda.core.utilities.contextLogger
|
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
|
||||||
import net.corda.nodeapi.ArtemisTcpTransport
|
|
||||||
import net.corda.nodeapi.ConnectionDirection
|
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
|
|
||||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
|
||||||
import net.corda.nodeapi.internal.crypto.x509
|
|
||||||
import org.apache.activemq.artemis.api.core.Message
|
|
||||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration
|
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection
|
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector
|
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
|
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
|
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer
|
|
||||||
import org.apache.activemq.artemis.core.server.cluster.Transformer
|
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.*
|
|
||||||
import org.apache.activemq.artemis.utils.ConfigurationHelper
|
|
||||||
import java.time.Duration
|
|
||||||
import java.util.concurrent.Executor
|
|
||||||
import java.util.concurrent.ScheduledExecutorService
|
|
||||||
import javax.security.auth.x500.X500Principal
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class simply moves the legacy CORE bridge code from [ArtemisMessagingServer]
|
|
||||||
* into a class implementing [BridgeManager].
|
|
||||||
* It has no lifecycle events, because the bridges are internal to the ActiveMQServer instance and thus
|
|
||||||
* stop when it is stopped.
|
|
||||||
*/
|
|
||||||
internal class CoreBridgeManager(val config: NodeConfiguration, val activeMQServer: ActiveMQServer) : BridgeManager {
|
|
||||||
companion object {
|
|
||||||
private fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
|
|
||||||
|
|
||||||
private val ArtemisMessagingComponent.ArtemisPeerAddress.bridgeName: String get() = getBridgeName(queueName, hostAndPort)
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun gatherAddresses(node: NodeInfo): Sequence<ArtemisMessagingComponent.ArtemisPeerAddress> {
|
|
||||||
val address = node.addresses.single()
|
|
||||||
return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, address) }.asSequence()
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* All nodes are expected to have a public facing address called p2p.inbound.$identity for receiving
|
|
||||||
* messages from other nodes. When we want to send a message to a node we send it to our internal address/queue for it,
|
|
||||||
* as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
|
|
||||||
* P2P address.
|
|
||||||
*/
|
|
||||||
override fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>) {
|
|
||||||
val connectionDirection = ConnectionDirection.Outbound(
|
|
||||||
connectorFactoryClassName = VerifyingNettyConnectorFactory::class.java.name,
|
|
||||||
expectedCommonNames = legalNames
|
|
||||||
)
|
|
||||||
val tcpTransport = ArtemisTcpTransport.tcpTransport(connectionDirection, target, config, enableSSL = true)
|
|
||||||
tcpTransport.params[ArtemisMessagingServer::class.java.name] = this
|
|
||||||
// We intentionally overwrite any previous connector config in case the peer legal name changed
|
|
||||||
activeMQServer.configuration.addConnectorConfiguration(target.toString(), tcpTransport)
|
|
||||||
|
|
||||||
activeMQServer.deployBridge(BridgeConfiguration().apply {
|
|
||||||
name = getBridgeName(queueName, target)
|
|
||||||
this.queueName = queueName
|
|
||||||
staticConnectors = listOf(target.toString())
|
|
||||||
confirmationWindowSize = 100000 // a guess
|
|
||||||
isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic
|
|
||||||
// We keep trying until the network map deems the node unreachable and tells us it's been removed at which
|
|
||||||
// point we destroy the bridge
|
|
||||||
retryInterval = config.activeMQServer.bridge.retryIntervalMs
|
|
||||||
retryIntervalMultiplier = config.activeMQServer.bridge.retryIntervalMultiplier
|
|
||||||
maxRetryInterval = Duration.ofMinutes(config.activeMQServer.bridge.maxRetryIntervalMin).toMillis()
|
|
||||||
// As a peer of the target node we must connect to it using the peer user. Actual authentication is done using
|
|
||||||
// our TLS certificate.
|
|
||||||
user = PEER_USER
|
|
||||||
password = PEER_USER
|
|
||||||
transformerClassName = InboxTopicTransformer::class.java.name
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun bridgeExists(bridgeName: String): Boolean = activeMQServer.clusterManager.bridges.containsKey(bridgeName)
|
|
||||||
|
|
||||||
override fun start() {
|
|
||||||
// Nothing to do
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun stop() {
|
|
||||||
// Nothing to do
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun close() = stop()
|
|
||||||
|
|
||||||
override fun destroyBridges(node: NodeInfo) {
|
|
||||||
gatherAddresses(node).forEach {
|
|
||||||
activeMQServer.destroyBridge(it.bridgeName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class InboxTopicTransformer : Transformer {
|
|
||||||
override fun transform(message: Message): Message {
|
|
||||||
message.address = translateLocalQueueToInboxAddress(message.address)
|
|
||||||
return message
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class VerifyingNettyConnectorFactory : NettyConnectorFactory() {
|
|
||||||
override fun createConnector(configuration: MutableMap<String, Any>,
|
|
||||||
handler: BufferHandler?,
|
|
||||||
listener: ClientConnectionLifeCycleListener?,
|
|
||||||
closeExecutor: Executor?,
|
|
||||||
threadPool: Executor?,
|
|
||||||
scheduledThreadPool: ScheduledExecutorService?,
|
|
||||||
protocolManager: ClientProtocolManager?): Connector {
|
|
||||||
return VerifyingNettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool,
|
|
||||||
protocolManager)
|
|
||||||
}
|
|
||||||
|
|
||||||
private class VerifyingNettyConnector(configuration: MutableMap<String, Any>,
|
|
||||||
handler: BufferHandler?,
|
|
||||||
listener: ClientConnectionLifeCycleListener?,
|
|
||||||
closeExecutor: Executor?,
|
|
||||||
threadPool: Executor?,
|
|
||||||
scheduledThreadPool: ScheduledExecutorService?,
|
|
||||||
protocolManager: ClientProtocolManager?) :
|
|
||||||
NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager) {
|
|
||||||
companion object {
|
|
||||||
private val log = contextLogger()
|
|
||||||
}
|
|
||||||
|
|
||||||
private val sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration)
|
|
||||||
|
|
||||||
override fun createConnection(): Connection? {
|
|
||||||
val connection = super.createConnection() as? NettyConnection
|
|
||||||
if (sslEnabled && connection != null) {
|
|
||||||
val expectedLegalNames: Set<CordaX500Name> = uncheckedCast(configuration[ArtemisTcpTransport.VERIFY_PEER_LEGAL_NAME] ?: emptySet<CordaX500Name>())
|
|
||||||
try {
|
|
||||||
val session = connection.channel
|
|
||||||
.pipeline()
|
|
||||||
.get(SslHandler::class.java)
|
|
||||||
.engine()
|
|
||||||
.session
|
|
||||||
// Checks the peer name is the one we are expecting.
|
|
||||||
// TODO Some problems here: after introduction of multiple legal identities on the node and removal of the main one,
|
|
||||||
// we run into the issue, who are we connecting to. There are some solutions to that: advertise `network identity`;
|
|
||||||
// have mapping port -> identity (but, design doc says about removing SingleMessageRecipient and having just NetworkHostAndPort,
|
|
||||||
// it was convenient to store that this way); SNI.
|
|
||||||
val peerLegalName = CordaX500Name.parse(session.peerPrincipal.name)
|
|
||||||
val expectedLegalName = expectedLegalNames.singleOrNull { it == peerLegalName }
|
|
||||||
require(expectedLegalName != null) {
|
|
||||||
"Peer has wrong CN - expected $expectedLegalNames but got $peerLegalName. This is either a fatal " +
|
|
||||||
"misconfiguration by the remote peer or an SSL man-in-the-middle attack!"
|
|
||||||
}
|
|
||||||
// Make sure certificate has the same name.
|
|
||||||
val peerCertificateName = CordaX500Name.build(X500Principal(session.peerCertificateChain[0].subjectDN.name))
|
|
||||||
require(peerCertificateName == expectedLegalName) {
|
|
||||||
"Peer has wrong subject name in the certificate - expected $expectedLegalNames but got $peerCertificateName. This is either a fatal " +
|
|
||||||
"misconfiguration by the remote peer or an SSL man-in-the-middle attack!"
|
|
||||||
}
|
|
||||||
X509Utilities.validateCertificateChain(
|
|
||||||
session.localCertificates.last().x509,
|
|
||||||
session.peerCertificates.x509)
|
|
||||||
} catch (e: IllegalArgumentException) {
|
|
||||||
connection.close()
|
|
||||||
log.error(e.message)
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return connection
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,10 +1,13 @@
|
|||||||
package net.corda.node.services.messaging
|
package net.corda.node.services.messaging
|
||||||
|
|
||||||
|
import net.corda.core.crypto.toStringShort
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.internal.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.messaging.CordaRPCOps
|
import net.corda.core.messaging.CordaRPCOps
|
||||||
import net.corda.core.messaging.MessageRecipients
|
import net.corda.core.messaging.MessageRecipients
|
||||||
import net.corda.core.messaging.SingleMessageRecipient
|
import net.corda.core.messaging.SingleMessageRecipient
|
||||||
|
import net.corda.core.node.NodeInfo
|
||||||
|
import net.corda.core.node.services.NetworkMapCache
|
||||||
import net.corda.core.node.services.PartyInfo
|
import net.corda.core.node.services.PartyInfo
|
||||||
import net.corda.core.serialization.SerializationDefaults
|
import net.corda.core.serialization.SerializationDefaults
|
||||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||||
@ -12,6 +15,7 @@ import net.corda.core.serialization.deserialize
|
|||||||
import net.corda.core.serialization.serialize
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.*
|
import net.corda.core.utilities.*
|
||||||
import net.corda.node.VersionInfo
|
import net.corda.node.VersionInfo
|
||||||
|
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
import net.corda.node.services.statemachine.DeduplicationId
|
import net.corda.node.services.statemachine.DeduplicationId
|
||||||
import net.corda.node.services.statemachine.FlowMessagingImpl
|
import net.corda.node.services.statemachine.FlowMessagingImpl
|
||||||
@ -19,6 +23,11 @@ import net.corda.node.utilities.AffinityExecutor
|
|||||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||||
import net.corda.node.utilities.PersistentMap
|
import net.corda.node.utilities.PersistentMap
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||||
|
import net.corda.nodeapi.internal.BridgeControl
|
||||||
|
import net.corda.nodeapi.internal.BridgeEntry
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||||
@ -27,6 +36,8 @@ import org.apache.activemq.artemis.api.core.RoutingType
|
|||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||||
|
import rx.Subscription
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
@ -73,6 +84,7 @@ class P2PMessagingClient(config: NodeConfiguration,
|
|||||||
private val serviceIdentity: PublicKey?,
|
private val serviceIdentity: PublicKey?,
|
||||||
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
|
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
|
||||||
private val database: CordaPersistence,
|
private val database: CordaPersistence,
|
||||||
|
private val networkMap: NetworkMapCacheInternal,
|
||||||
advertisedAddress: NetworkHostAndPort = serverAddress,
|
advertisedAddress: NetworkHostAndPort = serverAddress,
|
||||||
maxMessageSize: Int
|
maxMessageSize: Int
|
||||||
) : SingletonSerializeAsToken(), MessagingService {
|
) : SingletonSerializeAsToken(), MessagingService {
|
||||||
@ -133,6 +145,8 @@ class P2PMessagingClient(config: NodeConfiguration,
|
|||||||
var running = false
|
var running = false
|
||||||
var p2pConsumer: ClientConsumer? = null
|
var p2pConsumer: ClientConsumer? = null
|
||||||
var serviceConsumer: ClientConsumer? = null
|
var serviceConsumer: ClientConsumer? = null
|
||||||
|
var bridgeNotifyConsumer: ClientConsumer? = null
|
||||||
|
var networkChangeSubscription: Subscription? = null
|
||||||
}
|
}
|
||||||
|
|
||||||
private val messagesToRedeliver = database.transaction {
|
private val messagesToRedeliver = database.transaction {
|
||||||
@ -147,12 +161,13 @@ class P2PMessagingClient(config: NodeConfiguration,
|
|||||||
private val cordaVendor = SimpleString(versionInfo.vendor)
|
private val cordaVendor = SimpleString(versionInfo.vendor)
|
||||||
private val releaseVersion = SimpleString(versionInfo.releaseVersion)
|
private val releaseVersion = SimpleString(versionInfo.releaseVersion)
|
||||||
/** An executor for sending messages */
|
/** An executor for sending messages */
|
||||||
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1)
|
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging ${myIdentity.toStringShort()}", 1)
|
||||||
|
|
||||||
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
|
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
|
||||||
private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong()
|
private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong()
|
||||||
private val artemis = ArtemisMessagingClient(config, serverAddress, maxMessageSize)
|
private val artemis = ArtemisMessagingClient(config, serverAddress, maxMessageSize)
|
||||||
private val state = ThreadBox(InnerState())
|
private val state = ThreadBox(InnerState())
|
||||||
|
private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
|
||||||
|
|
||||||
private val handlers = ConcurrentHashMap<String, MessageHandler>()
|
private val handlers = ConcurrentHashMap<String, MessageHandler>()
|
||||||
|
|
||||||
@ -189,11 +204,13 @@ class P2PMessagingClient(config: NodeConfiguration,
|
|||||||
state.locked {
|
state.locked {
|
||||||
val session = artemis.start().session
|
val session = artemis.start().session
|
||||||
val inbox = RemoteInboxAddress(myIdentity).queueName
|
val inbox = RemoteInboxAddress(myIdentity).queueName
|
||||||
|
val inboxes = mutableListOf(inbox)
|
||||||
// Create a queue, consumer and producer for handling P2P network messages.
|
// Create a queue, consumer and producer for handling P2P network messages.
|
||||||
createQueueIfAbsent(inbox)
|
createQueueIfAbsent(inbox)
|
||||||
p2pConsumer = session.createConsumer(inbox)
|
p2pConsumer = session.createConsumer(inbox)
|
||||||
if (serviceIdentity != null) {
|
if (serviceIdentity != null) {
|
||||||
val serviceAddress = RemoteInboxAddress(serviceIdentity).queueName
|
val serviceAddress = RemoteInboxAddress(serviceIdentity).queueName
|
||||||
|
inboxes += serviceAddress
|
||||||
createQueueIfAbsent(serviceAddress)
|
createQueueIfAbsent(serviceAddress)
|
||||||
val serviceHandler = session.createConsumer(serviceAddress)
|
val serviceHandler = session.createConsumer(serviceAddress)
|
||||||
serviceHandler.setMessageHandler { msg ->
|
serviceHandler.setMessageHandler { msg ->
|
||||||
@ -202,11 +219,96 @@ class P2PMessagingClient(config: NodeConfiguration,
|
|||||||
deliver(msg, message)
|
deliver(msg, message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
registerBridgeControl(session, inboxes)
|
||||||
|
enumerateBridges(session, inboxes)
|
||||||
}
|
}
|
||||||
|
|
||||||
resumeMessageRedelivery()
|
resumeMessageRedelivery()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List<String>) {
|
||||||
|
val bridgeNotifyQueue = "$BRIDGE_NOTIFY.${myIdentity.toStringShort()}"
|
||||||
|
session.createTemporaryQueue(BRIDGE_NOTIFY, RoutingType.MULTICAST, bridgeNotifyQueue)
|
||||||
|
val bridgeConsumer = session.createConsumer(bridgeNotifyQueue)
|
||||||
|
bridgeNotifyConsumer = bridgeConsumer
|
||||||
|
bridgeConsumer.setMessageHandler { msg ->
|
||||||
|
val data: ByteArray = ByteArray(msg.bodySize).apply { msg.bodyBuffer.readBytes(this) }
|
||||||
|
val notifyMessage = data.deserialize<BridgeControl>(context = SerializationDefaults.P2P_CONTEXT)
|
||||||
|
log.info(notifyMessage.toString())
|
||||||
|
when (notifyMessage) {
|
||||||
|
is BridgeControl.BridgeToNodeSnapshotRequest -> enumerateBridges(session, inboxes)
|
||||||
|
else -> log.error("Unexpected Bridge Control message type on notify topc $notifyMessage")
|
||||||
|
}
|
||||||
|
msg.acknowledge()
|
||||||
|
}
|
||||||
|
networkChangeSubscription = networkMap.changed.subscribe { updateBridgesOnNetworkChange(it) }
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun sendBridgeControl(message: BridgeControl) {
|
||||||
|
val client = artemis.started!!
|
||||||
|
val controlPacket = message.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
|
||||||
|
val artemisMessage = client.session.createMessage(false)
|
||||||
|
artemisMessage.writeBodyBufferBytes(controlPacket)
|
||||||
|
client.producer.send(BRIDGE_CONTROL, artemisMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun updateBridgesOnNetworkChange(change: NetworkMapCache.MapChange) {
|
||||||
|
log.info("Updating bridges on network map change: ${change.node}")
|
||||||
|
fun gatherAddresses(node: NodeInfo): Sequence<BridgeEntry> {
|
||||||
|
return node.legalIdentitiesAndCerts.map {
|
||||||
|
val messagingAddress = NodeAddress(it.party.owningKey, node.addresses.first())
|
||||||
|
BridgeEntry(messagingAddress.queueName, node.addresses, listOf(it.party.name))
|
||||||
|
}.filter { artemis.started!!.session.queueQuery(SimpleString(it.queueName)).isExists }.asSequence()
|
||||||
|
}
|
||||||
|
|
||||||
|
fun deployBridges(node: NodeInfo) {
|
||||||
|
gatherAddresses(node)
|
||||||
|
.forEach {
|
||||||
|
sendBridgeControl(BridgeControl.Create(myIdentity.toStringShort(), it))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fun destroyBridges(node: NodeInfo) {
|
||||||
|
gatherAddresses(node)
|
||||||
|
.forEach {
|
||||||
|
sendBridgeControl(BridgeControl.Delete(myIdentity.toStringShort(), it))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
when (change) {
|
||||||
|
is NetworkMapCache.MapChange.Added -> {
|
||||||
|
deployBridges(change.node)
|
||||||
|
}
|
||||||
|
is NetworkMapCache.MapChange.Removed -> {
|
||||||
|
destroyBridges(change.node)
|
||||||
|
}
|
||||||
|
is NetworkMapCache.MapChange.Modified -> {
|
||||||
|
destroyBridges(change.previousNode)
|
||||||
|
deployBridges(change.node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun enumerateBridges(session: ClientSession, inboxes: List<String>) {
|
||||||
|
val requiredBridges = mutableListOf<BridgeEntry>()
|
||||||
|
fun createBridgeEntry(queueName: SimpleString) {
|
||||||
|
val keyHash = queueName.substring(PEERS_PREFIX.length)
|
||||||
|
val peers = networkMap.getNodesByOwningKeyIndex(keyHash)
|
||||||
|
for (node in peers) {
|
||||||
|
val bridge = BridgeEntry(queueName.toString(), node.addresses, node.legalIdentities.map { it.name })
|
||||||
|
requiredBridges += bridge
|
||||||
|
knownQueues += queueName.toString()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val queues = session.addressQuery(SimpleString("$PEERS_PREFIX#")).queueNames
|
||||||
|
for (queue in queues) {
|
||||||
|
createBridgeEntry(queue)
|
||||||
|
}
|
||||||
|
val startupMessage = BridgeControl.NodeToBridgeSnapshot(myIdentity.toStringShort(), inboxes, requiredBridges)
|
||||||
|
sendBridgeControl(startupMessage)
|
||||||
|
}
|
||||||
|
|
||||||
private fun resumeMessageRedelivery() {
|
private fun resumeMessageRedelivery() {
|
||||||
messagesToRedeliver.forEach { retryId, (message, target) ->
|
messagesToRedeliver.forEach { retryId, (message, target) ->
|
||||||
send(message, target, retryId)
|
send(message, target, retryId)
|
||||||
@ -343,12 +445,18 @@ class P2PMessagingClient(config: NodeConfiguration,
|
|||||||
check(artemis.started != null)
|
check(artemis.started != null)
|
||||||
val prevRunning = running
|
val prevRunning = running
|
||||||
running = false
|
running = false
|
||||||
|
networkChangeSubscription?.unsubscribe()
|
||||||
val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice")
|
val c = p2pConsumer ?: throw IllegalStateException("stop can't be called twice")
|
||||||
try {
|
try {
|
||||||
c.close()
|
c.close()
|
||||||
} catch (e: ActiveMQObjectClosedException) {
|
} catch (e: ActiveMQObjectClosedException) {
|
||||||
// Ignore it: this can happen if the server has gone away before we do.
|
// Ignore it: this can happen if the server has gone away before we do.
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
bridgeNotifyConsumer!!.close()
|
||||||
|
} catch (e: ActiveMQObjectClosedException) {
|
||||||
|
// Ignore it: this can happen if the server has gone away before we do.
|
||||||
|
}
|
||||||
p2pConsumer = null
|
p2pConsumer = null
|
||||||
val s = serviceConsumer
|
val s = serviceConsumer
|
||||||
try {
|
try {
|
||||||
@ -357,6 +465,7 @@ class P2PMessagingClient(config: NodeConfiguration,
|
|||||||
// Ignore it: this can happen if the server has gone away before we do.
|
// Ignore it: this can happen if the server has gone away before we do.
|
||||||
}
|
}
|
||||||
serviceConsumer = null
|
serviceConsumer = null
|
||||||
|
knownQueues.clear()
|
||||||
prevRunning
|
prevRunning
|
||||||
}
|
}
|
||||||
if (running && !nodeExecutor.isOnThread) {
|
if (running && !nodeExecutor.isOnThread) {
|
||||||
@ -468,13 +577,25 @@ class P2PMessagingClient(config: NodeConfiguration,
|
|||||||
|
|
||||||
/** Attempts to create a durable queue on the broker which is bound to an address of the same name. */
|
/** Attempts to create a durable queue on the broker which is bound to an address of the same name. */
|
||||||
private fun createQueueIfAbsent(queueName: String) {
|
private fun createQueueIfAbsent(queueName: String) {
|
||||||
state.alreadyLocked {
|
if (!knownQueues.contains(queueName)) {
|
||||||
val session = artemis.started!!.session
|
state.alreadyLocked {
|
||||||
val queueQuery = session.queueQuery(SimpleString(queueName))
|
val session = artemis.started!!.session
|
||||||
if (!queueQuery.isExists) {
|
val queueQuery = session.queueQuery(SimpleString(queueName))
|
||||||
log.info("Create fresh queue $queueName bound on same address")
|
if (!queueQuery.isExists) {
|
||||||
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
|
log.info("Create fresh queue $queueName bound on same address")
|
||||||
|
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
|
||||||
|
if (queueName.startsWith(PEERS_PREFIX)) {
|
||||||
|
val keyHash = queueName.substring(PEERS_PREFIX.length)
|
||||||
|
val peers = networkMap.getNodesByOwningKeyIndex(keyHash)
|
||||||
|
for (node in peers) {
|
||||||
|
val bridge = BridgeEntry(queueName, node.addresses, node.legalIdentities.map { it.name })
|
||||||
|
val createBridgeMessage = BridgeControl.Create(myIdentity.toStringShort(), bridge)
|
||||||
|
sendBridgeControl(createBridgeMessage)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
knownQueues += queueName
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,7 +13,6 @@ import net.corda.core.flows.NotaryException
|
|||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.node.services.NotaryService
|
import net.corda.core.node.services.NotaryService
|
||||||
import net.corda.core.node.services.TimeWindowChecker
|
|
||||||
import net.corda.core.node.services.UniquenessProvider
|
import net.corda.core.node.services.UniquenessProvider
|
||||||
import net.corda.core.schemas.PersistentStateRef
|
import net.corda.core.schemas.PersistentStateRef
|
||||||
import net.corda.core.serialization.deserialize
|
import net.corda.core.serialization.deserialize
|
||||||
@ -54,8 +53,7 @@ class BFTNonValidatingNotaryService(
|
|||||||
// Replica startup must be in parallel with other replicas, otherwise the constructor may not return:
|
// Replica startup must be in parallel with other replicas, otherwise the constructor may not return:
|
||||||
thread(name = "BFT SMaRt replica $replicaId init", isDaemon = true) {
|
thread(name = "BFT SMaRt replica $replicaId init", isDaemon = true) {
|
||||||
configHandle.use {
|
configHandle.use {
|
||||||
val timeWindowChecker = TimeWindowChecker(services.clock)
|
val replica = Replica(it, replicaId, { createMap() }, services, notaryIdentityKey)
|
||||||
val replica = Replica(it, replicaId, { createMap() }, services, notaryIdentityKey, timeWindowChecker)
|
|
||||||
replicaHolder.set(replica)
|
replicaHolder.set(replica)
|
||||||
log.info("BFT SMaRt replica $replicaId is running.")
|
log.info("BFT SMaRt replica $replicaId is running.")
|
||||||
}
|
}
|
||||||
@ -131,8 +129,7 @@ class BFTNonValidatingNotaryService(
|
|||||||
replicaId: Int,
|
replicaId: Int,
|
||||||
createMap: () -> AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistedCommittedState, PersistentStateRef>,
|
createMap: () -> AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistedCommittedState, PersistentStateRef>,
|
||||||
services: ServiceHubInternal,
|
services: ServiceHubInternal,
|
||||||
notaryIdentityKey: PublicKey,
|
notaryIdentityKey: PublicKey) : BFTSMaRt.Replica(config, replicaId, createMap, services, notaryIdentityKey) {
|
||||||
timeWindowChecker: TimeWindowChecker) : BFTSMaRt.Replica(config, replicaId, createMap, services, notaryIdentityKey, timeWindowChecker) {
|
|
||||||
|
|
||||||
override fun executeCommand(command: ByteArray): ByteArray {
|
override fun executeCommand(command: ByteArray): ByteArray {
|
||||||
val request = command.deserialize<BFTSMaRt.CommitRequest>()
|
val request = command.deserialize<BFTSMaRt.CommitRequest>()
|
||||||
@ -146,7 +143,7 @@ class BFTNonValidatingNotaryService(
|
|||||||
val id = ftx.id
|
val id = ftx.id
|
||||||
val inputs = ftx.inputs
|
val inputs = ftx.inputs
|
||||||
val notary = ftx.notary
|
val notary = ftx.notary
|
||||||
validateTimeWindow(ftx.timeWindow)
|
NotaryService.validateTimeWindow(services.clock, ftx.timeWindow)
|
||||||
if (notary !in services.myInfo.legalIdentities) throw NotaryException(NotaryError.WrongNotary)
|
if (notary !in services.myInfo.legalIdentities) throw NotaryException(NotaryError.WrongNotary)
|
||||||
commitInputStates(inputs, id, callerIdentity)
|
commitInputStates(inputs, id, callerIdentity)
|
||||||
log.debug { "Inputs committed successfully, signing $id" }
|
log.debug { "Inputs committed successfully, signing $id" }
|
||||||
|
@ -13,14 +13,12 @@ import bftsmart.tom.server.defaultservices.DefaultRecoverable
|
|||||||
import bftsmart.tom.server.defaultservices.DefaultReplier
|
import bftsmart.tom.server.defaultservices.DefaultReplier
|
||||||
import bftsmart.tom.util.Extractor
|
import bftsmart.tom.util.Extractor
|
||||||
import net.corda.core.contracts.StateRef
|
import net.corda.core.contracts.StateRef
|
||||||
import net.corda.core.contracts.TimeWindow
|
|
||||||
import net.corda.core.crypto.*
|
import net.corda.core.crypto.*
|
||||||
import net.corda.core.flows.NotaryError
|
import net.corda.core.flows.NotaryError
|
||||||
import net.corda.core.flows.NotaryException
|
import net.corda.core.flows.NotaryException
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.internal.declaredField
|
import net.corda.core.internal.declaredField
|
||||||
import net.corda.core.internal.toTypedArray
|
import net.corda.core.internal.toTypedArray
|
||||||
import net.corda.core.node.services.TimeWindowChecker
|
|
||||||
import net.corda.core.node.services.UniquenessProvider
|
import net.corda.core.node.services.UniquenessProvider
|
||||||
import net.corda.core.schemas.PersistentStateRef
|
import net.corda.core.schemas.PersistentStateRef
|
||||||
import net.corda.core.serialization.CordaSerializable
|
import net.corda.core.serialization.CordaSerializable
|
||||||
@ -178,8 +176,7 @@ object BFTSMaRt {
|
|||||||
createMap: () -> AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx,
|
createMap: () -> AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx,
|
||||||
BFTNonValidatingNotaryService.PersistedCommittedState, PersistentStateRef>,
|
BFTNonValidatingNotaryService.PersistedCommittedState, PersistentStateRef>,
|
||||||
protected val services: ServiceHubInternal,
|
protected val services: ServiceHubInternal,
|
||||||
protected val notaryIdentityKey: PublicKey,
|
protected val notaryIdentityKey: PublicKey) : DefaultRecoverable() {
|
||||||
private val timeWindowChecker: TimeWindowChecker) : DefaultRecoverable() {
|
|
||||||
companion object {
|
companion object {
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
}
|
}
|
||||||
@ -218,7 +215,7 @@ object BFTSMaRt {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Implement logic to execute the command and commit the transaction to the log.
|
* Implement logic to execute the command and commit the transaction to the log.
|
||||||
* Helper methods are provided for transaction processing: [commitInputStates], [validateTimeWindow], and [sign].
|
* Helper methods are provided for transaction processing: [commitInputStates], and [sign].
|
||||||
*/
|
*/
|
||||||
abstract fun executeCommand(command: ByteArray): ByteArray?
|
abstract fun executeCommand(command: ByteArray): ByteArray?
|
||||||
|
|
||||||
@ -245,11 +242,6 @@ object BFTSMaRt {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected fun validateTimeWindow(t: TimeWindow?) {
|
|
||||||
if (t != null && !timeWindowChecker.isValid(t))
|
|
||||||
throw NotaryException(NotaryError.TimeWindowInvalid)
|
|
||||||
}
|
|
||||||
|
|
||||||
protected fun sign(bytes: ByteArray): DigitalSignature.WithKey {
|
protected fun sign(bytes: ByteArray): DigitalSignature.WithKey {
|
||||||
return services.database.transaction { services.keyManagementService.sign(bytes, notaryIdentityKey) }
|
return services.database.transaction { services.keyManagementService.sign(bytes, notaryIdentityKey) }
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@ package net.corda.node.services.transactions
|
|||||||
import net.corda.core.flows.FlowSession
|
import net.corda.core.flows.FlowSession
|
||||||
import net.corda.core.flows.NotaryFlow
|
import net.corda.core.flows.NotaryFlow
|
||||||
import net.corda.core.node.ServiceHub
|
import net.corda.core.node.ServiceHub
|
||||||
import net.corda.core.node.services.TimeWindowChecker
|
|
||||||
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
|
|
||||||
@ -13,8 +12,6 @@ class RaftNonValidatingNotaryService(
|
|||||||
override val notaryIdentityKey: PublicKey,
|
override val notaryIdentityKey: PublicKey,
|
||||||
override val uniquenessProvider: RaftUniquenessProvider
|
override val uniquenessProvider: RaftUniquenessProvider
|
||||||
) : TrustedAuthorityNotaryService() {
|
) : TrustedAuthorityNotaryService() {
|
||||||
override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock)
|
|
||||||
|
|
||||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service {
|
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service {
|
||||||
return NonValidatingNotaryFlow(otherPartySession, this)
|
return NonValidatingNotaryFlow(otherPartySession, this)
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@ package net.corda.node.services.transactions
|
|||||||
import net.corda.core.flows.FlowSession
|
import net.corda.core.flows.FlowSession
|
||||||
import net.corda.core.flows.NotaryFlow
|
import net.corda.core.flows.NotaryFlow
|
||||||
import net.corda.core.node.ServiceHub
|
import net.corda.core.node.ServiceHub
|
||||||
import net.corda.core.node.services.TimeWindowChecker
|
|
||||||
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
|
|
||||||
@ -13,8 +12,6 @@ class RaftValidatingNotaryService(
|
|||||||
override val notaryIdentityKey: PublicKey,
|
override val notaryIdentityKey: PublicKey,
|
||||||
override val uniquenessProvider: RaftUniquenessProvider
|
override val uniquenessProvider: RaftUniquenessProvider
|
||||||
) : TrustedAuthorityNotaryService() {
|
) : TrustedAuthorityNotaryService() {
|
||||||
override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock)
|
|
||||||
|
|
||||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service {
|
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service {
|
||||||
return ValidatingNotaryFlow(otherPartySession, this)
|
return ValidatingNotaryFlow(otherPartySession, this)
|
||||||
}
|
}
|
||||||
|
@ -2,14 +2,12 @@ package net.corda.node.services.transactions
|
|||||||
|
|
||||||
import net.corda.core.flows.FlowSession
|
import net.corda.core.flows.FlowSession
|
||||||
import net.corda.core.flows.NotaryFlow
|
import net.corda.core.flows.NotaryFlow
|
||||||
import net.corda.core.node.services.TimeWindowChecker
|
|
||||||
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
||||||
import net.corda.node.services.api.ServiceHubInternal
|
import net.corda.node.services.api.ServiceHubInternal
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
|
|
||||||
/** A simple Notary service that does not perform transaction validation */
|
/** A simple Notary service that does not perform transaction validation */
|
||||||
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||||
override val timeWindowChecker = TimeWindowChecker(services.clock)
|
|
||||||
override val uniquenessProvider = PersistentUniquenessProvider()
|
override val uniquenessProvider = PersistentUniquenessProvider()
|
||||||
|
|
||||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = NonValidatingNotaryFlow(otherPartySession, this)
|
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = NonValidatingNotaryFlow(otherPartySession, this)
|
||||||
|
@ -2,15 +2,12 @@ package net.corda.node.services.transactions
|
|||||||
|
|
||||||
import net.corda.core.flows.FlowSession
|
import net.corda.core.flows.FlowSession
|
||||||
import net.corda.core.flows.NotaryFlow
|
import net.corda.core.flows.NotaryFlow
|
||||||
import net.corda.core.node.services.TimeWindowChecker
|
|
||||||
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
import net.corda.core.node.services.TrustedAuthorityNotaryService
|
||||||
import net.corda.node.services.api.ServiceHubInternal
|
import net.corda.node.services.api.ServiceHubInternal
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
|
|
||||||
/** A Notary service that validates the transaction chain of the submitted transaction before committing it */
|
/** A Notary service that validates the transaction chain of the submitted transaction before committing it */
|
||||||
class ValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
class ValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||||
override val timeWindowChecker = TimeWindowChecker(services.clock)
|
|
||||||
|
|
||||||
override val uniquenessProvider = PersistentUniquenessProvider()
|
override val uniquenessProvider = PersistentUniquenessProvider()
|
||||||
|
|
||||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = ValidatingNotaryFlow(otherPartySession, this)
|
override fun createServiceFlow(otherPartySession: FlowSession): NotaryFlow.Service = ValidatingNotaryFlow(otherPartySession, this)
|
||||||
|
@ -32,7 +32,6 @@ enterpriseConfiguration = {
|
|||||||
waitInterval = 40000
|
waitInterval = 40000
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
useAMQPBridges = true
|
|
||||||
rpcSettings = {
|
rpcSettings = {
|
||||||
useSsl = false
|
useSsl = false
|
||||||
standAloneBroker = false
|
standAloneBroker = false
|
||||||
|
@ -2,7 +2,6 @@ package net.corda.node.services.messaging
|
|||||||
|
|
||||||
import com.nhaarman.mockito_kotlin.doReturn
|
import com.nhaarman.mockito_kotlin.doReturn
|
||||||
import com.nhaarman.mockito_kotlin.whenever
|
import com.nhaarman.mockito_kotlin.whenever
|
||||||
import net.corda.core.context.AuthServiceId
|
|
||||||
import net.corda.core.crypto.generateKeyPair
|
import net.corda.core.crypto.generateKeyPair
|
||||||
import net.corda.core.concurrent.CordaFuture
|
import net.corda.core.concurrent.CordaFuture
|
||||||
import com.codahale.metrics.MetricRegistry
|
import com.codahale.metrics.MetricRegistry
|
||||||
@ -10,8 +9,6 @@ import net.corda.core.crypto.generateKeyPair
|
|||||||
import net.corda.core.internal.concurrent.openFuture
|
import net.corda.core.internal.concurrent.openFuture
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.internal.configureDatabase
|
import net.corda.node.internal.configureDatabase
|
||||||
import net.corda.node.internal.security.RPCSecurityManager
|
|
||||||
import net.corda.node.internal.security.RPCSecurityManagerImpl
|
|
||||||
import net.corda.node.services.config.CertChainPolicyConfig
|
import net.corda.node.services.config.CertChainPolicyConfig
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||||
@ -59,7 +56,6 @@ class ArtemisMessagingTest {
|
|||||||
|
|
||||||
private lateinit var config: NodeConfiguration
|
private lateinit var config: NodeConfiguration
|
||||||
private lateinit var database: CordaPersistence
|
private lateinit var database: CordaPersistence
|
||||||
private lateinit var securityManager: RPCSecurityManager
|
|
||||||
private var messagingClient: P2PMessagingClient? = null
|
private var messagingClient: P2PMessagingClient? = null
|
||||||
private var messagingServer: ArtemisMessagingServer? = null
|
private var messagingServer: ArtemisMessagingServer? = null
|
||||||
|
|
||||||
@ -67,7 +63,6 @@ class ArtemisMessagingTest {
|
|||||||
|
|
||||||
@Before
|
@Before
|
||||||
fun setUp() {
|
fun setUp() {
|
||||||
securityManager = RPCSecurityManagerImpl.fromUserList(users = emptyList(), id = AuthServiceId("TEST"))
|
|
||||||
abstract class AbstractNodeConfiguration : NodeConfiguration
|
abstract class AbstractNodeConfiguration : NodeConfiguration
|
||||||
config = rigorousMock<AbstractNodeConfiguration>().also {
|
config = rigorousMock<AbstractNodeConfiguration>().also {
|
||||||
doReturn(temporaryFolder.root.toPath()).whenever(it).baseDirectory
|
doReturn(temporaryFolder.root.toPath()).whenever(it).baseDirectory
|
||||||
@ -78,7 +73,6 @@ class ArtemisMessagingTest {
|
|||||||
doReturn("").whenever(it).exportJMXto
|
doReturn("").whenever(it).exportJMXto
|
||||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
||||||
doReturn(5).whenever(it).messageRedeliveryDelaySeconds
|
doReturn(5).whenever(it).messageRedeliveryDelaySeconds
|
||||||
doReturn(true).whenever(it).useAMQPBridges
|
|
||||||
}
|
}
|
||||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||||
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true), rigorousMock())
|
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true), rigorousMock())
|
||||||
@ -181,6 +175,7 @@ class ArtemisMessagingTest {
|
|||||||
null,
|
null,
|
||||||
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
|
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
|
||||||
database,
|
database,
|
||||||
|
networkMapCache,
|
||||||
maxMessageSize = maxMessageSize).apply {
|
maxMessageSize = maxMessageSize).apply {
|
||||||
config.configureWithDevSSLCertificate()
|
config.configureWithDevSSLCertificate()
|
||||||
messagingClient = this
|
messagingClient = this
|
||||||
@ -189,7 +184,7 @@ class ArtemisMessagingTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
|
private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
|
||||||
return ArtemisMessagingServer(config, local, networkMapCache, maxMessageSize).apply {
|
return ArtemisMessagingServer(config, local, maxMessageSize).apply {
|
||||||
config.configureWithDevSSLCertificate()
|
config.configureWithDevSSLCertificate()
|
||||||
messagingServer = this
|
messagingServer = this
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,6 @@ import java.security.SignatureException
|
|||||||
// START 1
|
// START 1
|
||||||
@CordaService
|
@CordaService
|
||||||
class MyCustomValidatingNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
class MyCustomValidatingNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||||
override val timeWindowChecker = TimeWindowChecker(services.clock)
|
|
||||||
override val uniquenessProvider = PersistentUniquenessProvider()
|
override val uniquenessProvider = PersistentUniquenessProvider()
|
||||||
|
|
||||||
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = MyValidatingNotaryFlow(otherPartySession, this)
|
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = MyValidatingNotaryFlow(otherPartySession, this)
|
||||||
|
@ -5,7 +5,6 @@ import com.google.common.jimfs.Jimfs
|
|||||||
import com.nhaarman.mockito_kotlin.doReturn
|
import com.nhaarman.mockito_kotlin.doReturn
|
||||||
import com.nhaarman.mockito_kotlin.whenever
|
import com.nhaarman.mockito_kotlin.whenever
|
||||||
import net.corda.core.DoNotImplement
|
import net.corda.core.DoNotImplement
|
||||||
import net.corda.core.crypto.entropyToKeyPair
|
|
||||||
import net.corda.core.crypto.Crypto
|
import net.corda.core.crypto.Crypto
|
||||||
import net.corda.core.crypto.random63BitValue
|
import net.corda.core.crypto.random63BitValue
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
@ -515,7 +514,6 @@ private fun mockNodeConfiguration(): NodeConfiguration {
|
|||||||
doReturn(5).whenever(it).messageRedeliveryDelaySeconds
|
doReturn(5).whenever(it).messageRedeliveryDelaySeconds
|
||||||
doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec
|
doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec
|
||||||
doReturn(null).whenever(it).devModeOptions
|
doReturn(null).whenever(it).devModeOptions
|
||||||
doReturn(true).whenever(it).useAMQPBridges
|
|
||||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user