mirror of
https://github.com/corda/corda.git
synced 2025-06-17 06:38:21 +00:00
Merge pull request #476 from corda/mnesbit-external-bridge
ENT-1549: Initial Creation of Bridge/Float Process
This commit is contained in:
@ -59,8 +59,6 @@ class AMQPBridgeTest {
|
||||
|
||||
private abstract class AbstractNodeConfiguration : NodeConfiguration
|
||||
|
||||
// TODO: revisit upon Matthew Nesbitt return
|
||||
@Ignore()
|
||||
@Test
|
||||
fun `test acked and nacked messages`() {
|
||||
// Create local queue
|
||||
|
@ -178,9 +178,11 @@ open class Node(configuration: NodeConfiguration,
|
||||
} else {
|
||||
startLocalRpcBroker(networkParameters)
|
||||
}
|
||||
val advertisedAddress = info.addresses[0]
|
||||
bridgeControlListener = BridgeControlListener(configuration, serverAddress, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE)
|
||||
|
||||
val advertisedAddress = info.addresses.single()
|
||||
val externalBridge = configuration.enterpriseConfiguration.externalBridge
|
||||
if (externalBridge == null || !externalBridge) {
|
||||
bridgeControlListener = BridgeControlListener(configuration, serverAddress, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE)
|
||||
}
|
||||
printBasicNodeInfo("Advertised P2P messaging addresses", info.addresses.joinToString())
|
||||
|
||||
val rpcServerConfiguration = RPCServerConfiguration.default.copy(
|
||||
@ -205,6 +207,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
database,
|
||||
services.networkMapCache,
|
||||
services.monitoringService.metrics,
|
||||
info.legalIdentities[0].name.toString(),
|
||||
advertisedAddress,
|
||||
/*networkParameters.maxMessageSize*/MAX_FILE_SIZE,
|
||||
nodeProperties.flowsDrainingMode::isEnabled,
|
||||
|
@ -16,8 +16,8 @@ import net.corda.node.services.statemachine.transitions.StateMachineConfiguratio
|
||||
data class EnterpriseConfiguration(
|
||||
val mutualExclusionConfiguration: MutualExclusionConfiguration,
|
||||
val useMultiThreadedSMM: Boolean = true,
|
||||
val tuning: PerformanceTuning = PerformanceTuning.default
|
||||
)
|
||||
val tuning: PerformanceTuning = PerformanceTuning.default,
|
||||
val externalBridge: Boolean? = null)
|
||||
|
||||
data class MutualExclusionConfiguration(val on: Boolean = false, val machineName: String, val updateInterval: Long, val waitInterval: Long)
|
||||
|
||||
|
@ -48,7 +48,6 @@ interface NodeConfiguration : NodeSSLConfiguration {
|
||||
val verifierType: VerifierType
|
||||
val messageRedeliveryDelaySeconds: Int
|
||||
val notary: NotaryConfig?
|
||||
val activeMQServer: ActiveMqServerConfiguration
|
||||
val additionalNodeInfoPollingFrequencyMsec: Long
|
||||
val p2pAddress: NetworkHostAndPort
|
||||
val rpcOptions: NodeRpcOptions
|
||||
@ -139,12 +138,6 @@ data class BFTSMaRtConfiguration(
|
||||
}
|
||||
}
|
||||
|
||||
data class BridgeConfiguration(val retryIntervalMs: Long,
|
||||
val maxRetryIntervalMin: Long,
|
||||
val retryIntervalMultiplier: Double)
|
||||
|
||||
data class ActiveMqServerConfiguration(val bridge: BridgeConfiguration)
|
||||
|
||||
fun Config.parseAsNodeConfiguration(): NodeConfiguration = parseAs<NodeConfigurationImpl>()
|
||||
|
||||
data class NodeConfigurationImpl(
|
||||
@ -178,7 +171,6 @@ data class NodeConfigurationImpl(
|
||||
override val devModeOptions: DevModeOptions? = null,
|
||||
override val useTestClock: Boolean = false,
|
||||
override val detectPublicIp: Boolean = true,
|
||||
override val activeMQServer: ActiveMqServerConfiguration,
|
||||
// TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration
|
||||
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(),
|
||||
override val sshd: SSHDConfiguration? = null,
|
||||
|
@ -35,7 +35,6 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
|
||||
import net.corda.nodeapi.internal.requireOnDefaultFileSystem
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
|
@ -49,7 +49,8 @@ class MessagingExecutor(
|
||||
val resolver: AddressToArtemisQueueResolver,
|
||||
metricRegistry: MetricRegistry,
|
||||
val ourSenderUUID: String,
|
||||
queueBound: Int
|
||||
queueBound: Int,
|
||||
val myLegalName: String
|
||||
) {
|
||||
private sealed class Job {
|
||||
data class Acknowledge(val message: ClientMessage) : Job()
|
||||
@ -164,6 +165,7 @@ class MessagingExecutor(
|
||||
putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion)
|
||||
putIntProperty(P2PMessagingHeaders.platformVersionProperty, versionInfo.platformVersion)
|
||||
putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString(message.topic))
|
||||
putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, SimpleString(myLegalName))
|
||||
sendMessageSizeMetric.update(message.data.bytes.size)
|
||||
writeBodyBufferBytes(message.data.bytes)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
|
@ -106,6 +106,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
private val database: CordaPersistence,
|
||||
private val networkMap: NetworkMapCacheInternal,
|
||||
private val metricRegistry: MetricRegistry,
|
||||
val legalName: String,
|
||||
advertisedAddress: NetworkHostAndPort = serverAddress,
|
||||
private val maxMessageSize: Int,
|
||||
private val isDrainingModeOn: () -> Boolean,
|
||||
@ -170,6 +171,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
private val messageRedeliveryDelaySeconds = config.messageRedeliveryDelaySeconds.toLong()
|
||||
private val state = ThreadBox(InnerState())
|
||||
private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
|
||||
private val externalBridge: Boolean = config.enterpriseConfiguration.externalBridge ?: false
|
||||
|
||||
private val handlers = ConcurrentHashMap<String, MessageHandler>()
|
||||
|
||||
@ -238,7 +240,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
this@P2PMessagingClient,
|
||||
metricRegistry,
|
||||
queueBound = config.enterpriseConfiguration.tuning.maximumMessagingBatchSize,
|
||||
ourSenderUUID = deduplicator.ourSenderUUID
|
||||
ourSenderUUID = deduplicator.ourSenderUUID,
|
||||
myLegalName = legalName
|
||||
)
|
||||
this@P2PMessagingClient.messagingExecutor = messagingExecutor
|
||||
messagingExecutor.start()
|
||||
@ -384,7 +387,12 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
private fun artemisToCordaMessage(message: ClientMessage): ReceivedMessage? {
|
||||
try {
|
||||
val topic = message.required(P2PMessagingHeaders.topicProperty) { getStringProperty(it) }
|
||||
val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" }
|
||||
val user = requireNotNull(if (externalBridge) {
|
||||
message.getStringProperty(P2PMessagingHeaders.bridgedCertificateSubject) ?: message.getStringProperty(HDR_VALIDATED_USER)
|
||||
} else {
|
||||
message.getStringProperty(HDR_VALIDATED_USER)
|
||||
}) { "Message is not authenticated" }
|
||||
|
||||
val platformVersion = message.required(P2PMessagingHeaders.platformVersionProperty) { getIntProperty(it) }
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
val uniqueMessageId = message.required(HDR_DUPLICATE_DETECTION_ID) { DeduplicationId(message.getStringProperty(it)) }
|
||||
|
@ -16,13 +16,6 @@ devMode = true
|
||||
h2port = 0
|
||||
useTestClock = false
|
||||
verifierType = InMemory
|
||||
activeMQServer = {
|
||||
bridge = {
|
||||
retryIntervalMs = 5000
|
||||
retryIntervalMultiplier = 1.5
|
||||
maxRetryIntervalMin = 3
|
||||
}
|
||||
}
|
||||
enterpriseConfiguration = {
|
||||
mutualExclusionConfiguration = {
|
||||
on = false
|
||||
|
@ -118,7 +118,6 @@ class NodeConfigurationImplTest {
|
||||
certificateChainCheckPolicies = emptyList(),
|
||||
devMode = true,
|
||||
noLocalShell = false,
|
||||
activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)),
|
||||
rpcSettings = rpcSettings,
|
||||
relay = null,
|
||||
enterpriseConfiguration = EnterpriseConfiguration((MutualExclusionConfiguration(false, "", 20000, 40000)))
|
||||
|
@ -216,10 +216,10 @@ class ArtemisMessagingTest {
|
||||
// Now change the receiver
|
||||
try {
|
||||
val messagingClient2 = createMessagingClient()
|
||||
messagingClient2.addMessageHandler(TOPIC) { message, _, handle ->
|
||||
messagingClient2.addMessageHandler(TOPIC) { msg, _, handle ->
|
||||
database.transaction { handle.persistDeduplicationId() }
|
||||
handle.acknowledge() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
|
||||
receivedMessages.add(message)
|
||||
receivedMessages.add(msg)
|
||||
}
|
||||
startNodeMessagingClient()
|
||||
|
||||
@ -251,10 +251,10 @@ class ArtemisMessagingTest {
|
||||
fakeMsg2!!.putStringProperty(HDR_VALIDATED_USER, SimpleString("O=Bank A, L=New York, C=US"))
|
||||
|
||||
val messagingClient3 = createMessagingClient()
|
||||
messagingClient3.addMessageHandler(TOPIC) { message, _, handle ->
|
||||
messagingClient3.addMessageHandler(TOPIC) { msg, _, handle ->
|
||||
database.transaction { handle.persistDeduplicationId() }
|
||||
handle.acknowledge() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
|
||||
receivedMessages.add(message)
|
||||
receivedMessages.add(msg)
|
||||
}
|
||||
startNodeMessagingClient()
|
||||
|
||||
@ -305,6 +305,7 @@ class ArtemisMessagingTest {
|
||||
database,
|
||||
networkMapCache,
|
||||
MetricRegistry(),
|
||||
ALICE_NAME.toString(),
|
||||
maxMessageSize = maxMessageSize,
|
||||
isDrainingModeOn = { false },
|
||||
drainingModeWasChangedEvents = PublishSubject.create()).apply {
|
||||
|
Reference in New Issue
Block a user