When discarding invalid messages we should consume them. When starting up wait for node bridge registration before activating the float, otherwise we have a race condition.

Reduce excessive logging

Address PR comments

Address PR comments
This commit is contained in:
Matthew Nesbit 2018-04-09 12:04:29 +01:00
parent a4a6eedbf0
commit e51de2739c
8 changed files with 84 additions and 32 deletions

View File

@ -15,6 +15,8 @@ import com.nhaarman.mockito_kotlin.whenever
import net.corda.bridge.internal.BridgeInstance
import net.corda.bridge.services.api.BridgeMode
import net.corda.core.internal.div
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.EnterpriseConfiguration
@ -22,11 +24,17 @@ import net.corda.node.services.config.MutualExclusionConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.bridging.BridgeControl
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.rigorousMock
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull
import org.junit.Rule
@ -57,6 +65,7 @@ class BridgeIntegrationTest {
config.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val (artemisServer, artemisClient) = createArtemis()
try {
installBridgeControlResponder(artemisClient)
val bridge = BridgeInstance(config, BridgeVersionInfo(1, "1.1", "Dummy", "Test"))
val stateFollower = bridge.activeChange.toBlocking().iterator
assertEquals(false, stateFollower.next())
@ -75,7 +84,6 @@ class BridgeIntegrationTest {
}
}
@Test
fun `Load bridge (float inner) and float outer and stand them up`() {
val bridgeFolder = tempFolder.root.toPath()
@ -94,6 +102,7 @@ class BridgeIntegrationTest {
assertEquals(NetworkHostAndPort("0.0.0.0", 10005), floatConfig.inboundConfig!!.listeningAddress)
val (artemisServer, artemisClient) = createArtemis()
try {
installBridgeControlResponder(artemisClient)
val bridge = BridgeInstance(bridgeConfig, BridgeVersionInfo(1, "1.1", "Dummy", "Test"))
val bridgeStateFollower = bridge.activeChange.toBlocking().iterator
val float = BridgeInstance(floatConfig, BridgeVersionInfo(1, "1.1", "Dummy", "Test"))
@ -142,4 +151,20 @@ class BridgeIntegrationTest {
artemisClient.start()
return Pair(artemisServer, artemisClient)
}
private fun installBridgeControlResponder(artemisClient: ArtemisMessagingClient) {
val artemis = artemisClient.started!!
val inboxAddress = SimpleString("${P2P_PREFIX}Test")
artemis.session.createQueue(inboxAddress, RoutingType.ANYCAST, inboxAddress, true)
artemis.session.createQueue(BRIDGE_NOTIFY, RoutingType.ANYCAST, BRIDGE_NOTIFY, false)
val controlConsumer = artemis.session.createConsumer(BRIDGE_NOTIFY)
controlConsumer.setMessageHandler { msg ->
val bridgeControl = BridgeControl.NodeToBridgeSnapshot("Test", listOf(inboxAddress.toString()), emptyList())
val controlPacket = bridgeControl.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
val artemisMessage = artemis.session.createMessage(false)
artemisMessage.writeBodyBufferBytes(controlPacket)
artemis.producer.send(BRIDGE_CONTROL, artemisMessage)
msg.acknowledge()
}
}
}

View File

@ -22,17 +22,15 @@ import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.div
import net.corda.core.internal.exists
import net.corda.core.internal.readObject
import net.corda.core.node.NetworkParameters
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ShutdownHook
import net.corda.nodeapi.internal.addShutdownHook
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
import net.corda.nodeapi.internal.serialization.AMQP_P2P_CONTEXT
import net.corda.nodeapi.internal.serialization.AMQP_STORAGE_CONTEXT
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
@ -50,7 +48,7 @@ class BridgeInstance(val conf: BridgeConfiguration,
private val shutdown = AtomicBoolean(false)
private var shutdownHook: ShutdownHook? = null
private lateinit var networkParameters: NetworkParameters
private var maxMessageSize: Int = -1
private lateinit var bridgeAuditService: BridgeAuditService
private var bridgeSupervisorService: BridgeSupervisorService? = null
private var floatSupervisorService: FloatSupervisorService? = null
@ -112,17 +110,15 @@ class BridgeInstance(val conf: BridgeConfiguration,
val onExit: CordaFuture<BridgeInstance> get() = _exitFuture
private fun retrieveNetworkParameters() {
val trustRoot = conf.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA)
val networkParamsFile = conf.baseDirectory / NETWORK_PARAMS_FILE_NAME
require(networkParamsFile.exists()) { "No network-parameters file found." }
networkParameters = networkParamsFile.readObject<SignedNetworkParameters>().verifiedNetworkMapCert(trustRoot)
log.info("Loaded network parameters: $networkParameters")
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
"Node's platform version is lower than network's required minimumPlatformVersion"
}
val networkParameters = networkParamsFile.readObject<SignedNetworkParameters>().raw.deserialize()
maxMessageSize = networkParameters.maxMessageSize
log.info("Loaded maxMessageSize from network-parameters file: $maxMessageSize")
}
private fun createServices() {
require(maxMessageSize > 0) { "maxMessageSize not initialised" }
bridgeAuditService = LoggingBridgeAuditService(conf)
when (conf.bridgeMode) {
// In the SenderReceiver mode the inbound and outbound message paths are run from within a single bridge process.
@ -131,8 +127,8 @@ class BridgeInstance(val conf: BridgeConfiguration,
// The process also runs a TLS/AMQP 1.0 server socket, which is can receive connections and messages from peers,
// validate the messages and then forwards the packets to the Artemis inbox queue of the node.
BridgeMode.SenderReceiver -> {
floatSupervisorService = FloatSupervisorServiceImpl(conf, networkParameters.maxMessageSize, bridgeAuditService)
bridgeSupervisorService = BridgeSupervisorServiceImpl(conf, networkParameters.maxMessageSize, bridgeAuditService, floatSupervisorService!!.amqpListenerService)
floatSupervisorService = FloatSupervisorServiceImpl(conf, maxMessageSize, bridgeAuditService)
bridgeSupervisorService = BridgeSupervisorServiceImpl(conf, maxMessageSize, bridgeAuditService, floatSupervisorService!!.amqpListenerService)
}
// In the FloatInner mode the process runs the full outbound message path as in the SenderReceiver mode, but the inbound path is split.
// This 'Float Inner/Bridge Controller' process runs the more trusted portion of the inbound path.
@ -141,7 +137,7 @@ class BridgeInstance(val conf: BridgeConfiguration,
// node inboxes, before transferring the message to Artemis. Potentially it might carry out deeper checks of received packets.
// However, the 'Float Inner' is not directly exposed to the internet, or peers and does not host the TLS/AMQP 1.0 server socket.
BridgeMode.FloatInner -> {
bridgeSupervisorService = BridgeSupervisorServiceImpl(conf, networkParameters.maxMessageSize, bridgeAuditService, null)
bridgeSupervisorService = BridgeSupervisorServiceImpl(conf, maxMessageSize, bridgeAuditService, null)
}
// In the FloatOuter mode this process runs a minimal AMQP proxy that is designed to run in a DMZ zone.
// The process holds the minimum data necessary to act as the TLS/AMQP 1.0 receiver socket and tries
@ -156,7 +152,7 @@ class BridgeInstance(val conf: BridgeConfiguration,
// holding potentially sensitive information and are then forwarded across the control tunnel to the 'Float Inner' process for more
// complete validation checks.
BridgeMode.FloatOuter -> {
floatSupervisorService = FloatSupervisorServiceImpl(conf, networkParameters.maxMessageSize, bridgeAuditService)
floatSupervisorService = FloatSupervisorServiceImpl(conf, maxMessageSize, bridgeAuditService)
}
}
statusFollower = ServiceStateCombiner(listOf(bridgeAuditService, floatSupervisorService, bridgeSupervisorService).filterNotNull())

View File

@ -86,6 +86,12 @@ class SimpleMessageFilterService(val conf: BridgeConfiguration,
override fun sendMessageToLocalBroker(inboundMessage: ReceivedMessage) {
try {
validateMessage(inboundMessage)
} catch (ex: Exception) {
auditService.packetDropEvent(inboundMessage, "Packet Failed validation checks: " + ex.message)
inboundMessage.complete(true) // consume the bad message, so that it isn't redelivered forever.
return
}
try {
val session = inboundSession
val producer = inboundProducer
if (session == null || producer == null) {
@ -102,8 +108,8 @@ class SimpleMessageFilterService(val conf: BridgeConfiguration,
producer.send(SimpleString(inboundMessage.topic), artemisMessage, { _ -> inboundMessage.complete(true) })
auditService.packetAcceptedEvent(inboundMessage)
} catch (ex: Exception) {
auditService.packetDropEvent(inboundMessage, "Packet Failed validation checks: " + ex.message)
inboundMessage.complete(false)
log.error("Error trying to forward message", ex)
inboundMessage.complete(false) // delivery failure. NAK back to source and await re-delivery attempts
}
}
}

View File

@ -153,18 +153,18 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
private fun onControlMessage(receivedMessage: ReceivedMessage) {
if (!receivedMessage.checkTunnelControlTopic()) {
auditService.packetDropEvent(receivedMessage, "Invalid control topic packet received on topic ${receivedMessage.topic}!!")
receivedMessage.complete(false)
receivedMessage.complete(true)
return
}
val controlMessage = try {
if (CordaX500Name.parse(receivedMessage.sourceLegalName) != floatClientName) {
auditService.packetDropEvent(receivedMessage, "Invalid control source legal name!!")
receivedMessage.complete(false)
receivedMessage.complete(true)
return
}
receivedMessage.payload.deserialize<TunnelControlMessage>()
} catch (ex: Exception) {
receivedMessage.complete(false)
receivedMessage.complete(true)
return
}
lock.withLock {
@ -206,12 +206,12 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
}
}
if (amqpControl == null) {
message.complete(false)
message.complete(true) // consume message so it isn't resent forever
return
}
if (!message.topic.startsWith(P2P_PREFIX)) {
auditService.packetDropEvent(message, "Message topic is not a valid peer namespace ${message.topic}")
message.complete(false)
message.complete(true) // consume message so it isn't resent forever
return
}
val appProperties = message.applicationProperties.map { Pair(it.key!!.toString(), it.value) }.toList()

View File

@ -22,6 +22,7 @@ import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.nodeapi.internal.config.SSLConfiguration
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
@ -172,17 +173,17 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
private fun onFloatMessage(receivedMessage: ReceivedMessage) {
if (!receivedMessage.checkTunnelDataTopic()) {
auditService.packetDropEvent(receivedMessage, "Invalid float inbound topic received ${receivedMessage.topic}!!")
receivedMessage.complete(false)
receivedMessage.complete(true)
return
}
val innerMessage = try {
receivedMessage.payload.deserialize<FloatDataPacket>()
} catch (ex: Exception) {
auditService.packetDropEvent(receivedMessage, "Unable to decode Float Control message")
receivedMessage.complete(false)
receivedMessage.complete(true)
return
}
log.info("Received $innerMessage")
log.debug { "Received message from ${innerMessage.sourceLegalName}" }
val onwardMessage = object : ReceivedMessage {
override val topic: String = innerMessage.topic
override val applicationProperties: Map<Any?, Any?> = innerMessage.originalHeaders.toMap()

View File

@ -32,6 +32,7 @@ class DirectBridgeSenderService(val conf: BridgeConfiguration,
private val statusFollower: ServiceStateCombiner
private var statusSubscriber: Subscription? = null
private var connectionSubscriber: Subscription? = null
private var listenerActiveSubscriber: Subscription? = null
private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf, conf.outboundConfig!!.socksProxyConfig, { ForwardingArtemisMessageClient(artemisConnectionService) })
init {
@ -56,10 +57,15 @@ class DirectBridgeSenderService(val conf: BridgeConfiguration,
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe { ready ->
if (ready) {
listenerActiveSubscriber = bridgeControlListener.activeChange.subscribe {
stateHelper.active = it
}
bridgeControlListener.start()
stateHelper.active = true
auditService.statusChangeEvent("Waiting for activation by at least one bridge control inbox registration")
} else {
stateHelper.active = false
listenerActiveSubscriber?.unsubscribe()
listenerActiveSubscriber = null
bridgeControlListener.stop()
}
}
@ -67,6 +73,8 @@ class DirectBridgeSenderService(val conf: BridgeConfiguration,
override fun stop() {
stateHelper.active = false
listenerActiveSubscriber?.unsubscribe()
listenerActiveSubscriber = null
bridgeControlListener.stop()
connectionSubscriber?.unsubscribe()
connectionSubscriber = null

View File

@ -76,7 +76,7 @@ class FilterServiceTest {
filterService.start()
// Not ready so packet dropped
val fakeMessage = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(false) // NAK'd
doNothing().whenever(it).complete(true) // ACK was called
}
filterService.sendMessageToLocalBroker(fakeMessage)
assertEquals(TestAuditService.AuditEvent.PACKET_DROP, auditFollower.next()) // Dropped as not ready
@ -154,7 +154,7 @@ class FilterServiceTest {
// empty legal name
val badMessage1 = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(false) // NAK was called
doNothing().whenever(it).complete(true) // ACK was called
doReturn("").whenever(it).sourceLegalName
doReturn(inboxTopic).whenever(it).topic
doReturn(ByteArray(1)).whenever(it).payload
@ -165,7 +165,7 @@ class FilterServiceTest {
verify(dummyProducer, times(0)).send(ArgumentMatchers.any(), eq(dummyMessage), ArgumentMatchers.any()) // not sent
// bad legal name
val badMessage2 = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(false) // NAK was called
doNothing().whenever(it).complete(true) // ACK was called
doReturn("CN=Test").whenever(it).sourceLegalName
doReturn(inboxTopic).whenever(it).topic
doReturn(ByteArray(1)).whenever(it).payload
@ -176,7 +176,7 @@ class FilterServiceTest {
verify(dummyProducer, times(0)).send(ArgumentMatchers.any(), eq(dummyMessage), ArgumentMatchers.any()) // not sent
// empty payload
val badMessage3 = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(false) // NAK was called
doNothing().whenever(it).complete(true) // ACK was called
doReturn(DUMMY_BANK_B_NAME.toString()).whenever(it).sourceLegalName
doReturn(inboxTopic).whenever(it).topic
doReturn(ByteArray(0)).whenever(it).payload
@ -187,7 +187,7 @@ class FilterServiceTest {
verify(dummyProducer, times(0)).send(ArgumentMatchers.any(), eq(dummyMessage), ArgumentMatchers.any()) // not sent
// bad topic
val badMessage4 = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(false) // NAK was called
doNothing().whenever(it).complete(true) // ACK was called
doReturn(DUMMY_BANK_B_NAME.toString()).whenever(it).sourceLegalName
doReturn("bridge.control").whenever(it).topic
doReturn(ByteArray(1)).whenever(it).payload
@ -198,7 +198,7 @@ class FilterServiceTest {
verify(dummyProducer, times(0)).send(ArgumentMatchers.any(), eq(dummyMessage), ArgumentMatchers.any()) // not sent
// Non-whitelist header header
val badMessage5 = rigorousMock<ReceivedMessage>().also {
doNothing().whenever(it).complete(false) // NAK was called
doNothing().whenever(it).complete(true) // ACK was called
doReturn(DUMMY_BANK_B_NAME.toString()).whenever(it).sourceLegalName
doReturn(inboxTopic).whenever(it).topic
doReturn(ByteArray(1)).whenever(it).payload

View File

@ -27,6 +27,8 @@ import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import rx.Observable
import rx.subjects.PublishSubject
import java.util.*
class BridgeControlListener(val config: NodeSSLConfiguration,
@ -47,6 +49,13 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
private val log = contextLogger()
}
val active: Boolean
get() = validInboundQueues.isNotEmpty()
private val _activeChange = PublishSubject.create<Boolean>().toSerialized()
val activeChange: Observable<Boolean>
get() = _activeChange
fun start() {
stop()
bridgeManager.start()
@ -73,6 +82,9 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
}
fun stop() {
if (active) {
_activeChange.onNext(false)
}
validInboundQueues.clear()
controlConsumer?.close()
controlConsumer = null
@ -112,7 +124,11 @@ class BridgeControlListener(val config: NodeSSLConfiguration,
for (outQueue in controlMessage.sendQueues) {
bridgeManager.deployBridge(outQueue.queueName, outQueue.targets.first(), outQueue.legalNames.toSet())
}
val wasActive = active
validInboundQueues.addAll(controlMessage.inboxQueues)
if (!wasActive && active) {
_activeChange.onNext(true)
}
}
is BridgeControl.BridgeToNodeSnapshotRequest -> {
log.error("Message from Bridge $controlMessage detected on wrong topic!")