From 8c23abbd7af53409b21cd977aa9acde08b92bd1f Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Tue, 2 Oct 2018 14:13:08 +0100 Subject: [PATCH] ENT-2500: Corda Firewall should log some packet statistics (#1426) * ENT-2500: Refactoring to allow FirewallAuditService have inbound as well as outbound logging. Change some visibility modifiers to `private`. Use in-place initialisation where possible. * ENT-2500: Record accepted package coming into Float * ENT-2500: Introduce optional BridgeAuditService in `nodeApi` module * ENT-2500: Switch FirewallAuditService to use `ApplicationMessage` and bind outgoing message stats. * ENT-2500: Introduce scheduled executor and audit service configuration. * ENT-2500: Stats formatting. * ENT-2500: Stats formatting unit test. * ENT-2500: Minor changes to LoggingFirewallAuditService and its unit test. * ENT-2500: Additional configuration parameter documentation update. * ENT-2500: Supply optional parameter. * ENT-2500: Address PR comments. * ENT-2500: Make API more consistent by using `RoutingDirection`, re-jig `State` data structure, improve unit test. * ENT-2500: Add breakdown by endpoint address. * ENT-2500: Compilation fix after rebase in `master`. * ENT-2500: Making `AuditServiceConfiguration` not optional and supplying default settings. Also few minor changes. --- .../services/api/FirewallAuditService.kt | 18 +- .../services/api/FirewallConfiguration.kt | 5 + .../audit/LoggingFirewallAuditService.kt | 168 +++++++++++++++++- .../config/FirewallConfigurationImpl.kt | 5 +- .../filter/SimpleMessageFilterService.kt | 18 +- .../receiver/BridgeAMQPListenerServiceImpl.kt | 13 +- .../receiver/FloatControlListenerService.kt | 21 +-- .../TunnelingBridgeReceiverService.kt | 24 +-- .../sender/DirectBridgeSenderService.kt | 48 ++++- .../src/main/resources/firewalldefault.conf | 5 +- .../kotlin/net/corda/bridge/ConfigTest.kt | 13 ++ .../corda/bridge/services/TestAuditService.kt | 11 +- .../audit/LoggingFirewallAuditServiceTest.kt | 157 ++++++++++++++++ .../withaudit/badconfig/badInterval.conf | 18 ++ .../net/corda/bridge/withaudit/firewall.conf | 18 ++ docs/source/firewall-configuration-file.rst | 4 + .../corda/rpcWorker/RpcFlowWorkerDriver.kt | 4 +- 17 files changed, 474 insertions(+), 76 deletions(-) create mode 100644 bridge/src/test/kotlin/net/corda/bridge/services/audit/LoggingFirewallAuditServiceTest.kt create mode 100644 bridge/src/test/resources/net/corda/bridge/withaudit/badconfig/badInterval.conf create mode 100644 bridge/src/test/resources/net/corda/bridge/withaudit/firewall.conf diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/api/FirewallAuditService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/api/FirewallAuditService.kt index 9a281df585..47a2532293 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/api/FirewallAuditService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/api/FirewallAuditService.kt @@ -1,6 +1,6 @@ package net.corda.bridge.services.api -import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage +import net.corda.nodeapi.internal.protonwrapper.messages.ApplicationMessage import java.net.InetSocketAddress /** @@ -9,9 +9,17 @@ import java.net.InetSocketAddress * security data to an enterprise service. */ interface FirewallAuditService : ServiceLifecycleSupport { - fun successfulConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress, certificateSubject: String, msg: String) - fun failedConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress?, certificateSubject: String?, msg: String) - fun packetDropEvent(packet: ReceivedMessage?, msg: String) - fun packetAcceptedEvent(packet: ReceivedMessage) + fun successfulConnectionEvent(address: InetSocketAddress, certificateSubject: String, msg: String, direction: RoutingDirection) + fun failedConnectionEvent(address: InetSocketAddress, certificateSubject: String?, msg: String, direction: RoutingDirection) + fun packetDropEvent(packet: ApplicationMessage?, msg: String, direction: RoutingDirection) + fun packetAcceptedEvent(packet: ApplicationMessage, direction: RoutingDirection) fun statusChangeEvent(msg: String) +} + +/** + * Specifies direction of message flow with regard to Corda Node connected to Firewall. + */ +enum class RoutingDirection { + INBOUND, + OUTGOING } \ No newline at end of file diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/api/FirewallConfiguration.kt b/bridge/src/main/kotlin/net/corda/bridge/services/api/FirewallConfiguration.kt index 720f93362e..23ffe84f43 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/api/FirewallConfiguration.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/api/FirewallConfiguration.kt @@ -85,6 +85,10 @@ interface FloatOuterConfiguration { val customSSLConfiguration: BridgeSSLConfiguration? } +interface AuditServiceConfiguration { + val loggingIntervalSec: Long +} + interface FirewallConfiguration { val baseDirectory: Path val firewallMode: FirewallMode @@ -112,4 +116,5 @@ interface FirewallConfiguration { val whitelistedHeaders: List val crlCheckSoftFail: Boolean val p2pSslOptions: MutualSslConfiguration + val auditServiceConfiguration: AuditServiceConfiguration } \ No newline at end of file diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/audit/LoggingFirewallAuditService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/audit/LoggingFirewallAuditService.kt index 9791a7a261..c0ddb9d432 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/audit/LoggingFirewallAuditService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/audit/LoggingFirewallAuditService.kt @@ -2,44 +2,198 @@ package net.corda.bridge.services.audit import net.corda.bridge.services.api.FirewallAuditService import net.corda.bridge.services.api.FirewallConfiguration +import net.corda.bridge.services.api.RoutingDirection import net.corda.bridge.services.api.ServiceStateSupport import net.corda.bridge.services.util.ServiceStateHelper import net.corda.core.utilities.contextLogger import net.corda.core.utilities.trace +import net.corda.nodeapi.internal.protonwrapper.messages.ApplicationMessage import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage +import java.lang.management.ManagementFactory import java.net.InetSocketAddress +import java.text.NumberFormat +import java.time.Duration +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentMap +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.locks.ReentrantLock class LoggingFirewallAuditService(val conf: FirewallConfiguration, private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : FirewallAuditService, ServiceStateSupport by stateHelper { companion object { - val log = contextLogger() + private val log = contextLogger() + + private fun Map.sumValues(): Long { + return this.values.map { it.get() }.sum() + } + + private fun Map.prettyPrint(tabsCount: Int, nf: NumberFormat): String { + val leftPad = "\t".repeat(tabsCount) + return entries.joinToString(separator = "\n") { entry -> leftPad + entry.key.toString() + " -> " + nf.format(entry.value.get())} + } } + private val loggingIntervalSec = conf.auditServiceConfiguration.loggingIntervalSec + + private val timedExecutor = Executors.newScheduledThreadPool(1) + + private val executionLock = ReentrantLock() + + private data class DirectionalStats(val successfulConnectionCount : ConcurrentMap = ConcurrentHashMap(), + val failedConnectionCount : ConcurrentMap = ConcurrentHashMap(), + val accepted : ConcurrentMap> = ConcurrentHashMap>(), + val droppedPacketsCount : ConcurrentMap = ConcurrentHashMap()) + + private data class State(val directionalStatsMap: EnumMap = forEveryRoutingDirection() + ) { + companion object { + private fun forEveryRoutingDirection() : EnumMap { + val map = RoutingDirection.values().map { it to DirectionalStats() }.toMap() + return EnumMap(map) + } + } + } + + private val stateRef = AtomicReference(State()) + override fun start() { stateHelper.active = true + timedExecutor.scheduleAtFixedRate(::logStatsAndReset, loggingIntervalSec, loggingIntervalSec, TimeUnit.SECONDS) } override fun stop() { stateHelper.active = false + timedExecutor.shutdown() } - override fun successfulConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress, certificateSubject: String, msg: String) { + override fun successfulConnectionEvent(address: InetSocketAddress, certificateSubject: String, msg: String, direction: RoutingDirection) { log.info(msg) + withDirectionalStatsOf(direction) { + successfulConnectionCount.getOrPut(address, ::AtomicLong).incrementAndGet() + } } - override fun failedConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress?, certificateSubject: String?, msg: String) { + override fun failedConnectionEvent(address: InetSocketAddress, certificateSubject: String?, msg: String, direction: RoutingDirection) { log.warn(msg) + withDirectionalStatsOf(direction) { + failedConnectionCount.getOrPut(address, ::AtomicLong).incrementAndGet() + } } - override fun packetDropEvent(packet: ReceivedMessage?, msg: String) { - log.info(msg) + private fun withDirectionalStatsOf(direction: RoutingDirection, block: DirectionalStats.() -> Unit) { + val directionalStats = stateRef.get().directionalStatsMap[direction] + if(directionalStats == null) { + log.warn("Unknown direction: $direction") + } else { + directionalStats.block() + } } - override fun packetAcceptedEvent(packet: ReceivedMessage) { - log.trace { "Packet received from ${packet.sourceLegalName} uuid: ${packet.applicationProperties["_AMQ_DUPL_ID"]}" } + private fun ApplicationMessage?.address(direction: RoutingDirection) : String { + val unknownAddress = "Unknown address" + return when(direction) { + RoutingDirection.OUTGOING -> this?.destinationLegalName ?: unknownAddress + RoutingDirection.INBOUND -> if (this is ReceivedMessage) { + this.sourceLegalName + } else { + unknownAddress + } + } + } + + override fun packetDropEvent(packet: ApplicationMessage?, msg: String, direction: RoutingDirection) { + log.info("$direction : $msg") + + withDirectionalStatsOf(direction) { + droppedPacketsCount.getOrPut(packet.address(direction), ::AtomicLong).incrementAndGet() + } + } + + override fun packetAcceptedEvent(packet: ApplicationMessage, direction: RoutingDirection) { + + val address = packet.address(direction) + log.trace { "$direction: Address: $address, uuid: ${packet.applicationProperties["_AMQ_DUPL_ID"]}" } + + withDirectionalStatsOf(direction) { + val pair = accepted.getOrPut(address) { Pair(AtomicLong(), AtomicLong())} + pair.first.incrementAndGet() + pair.second.addAndGet(packet.payload.size.toLong()) + } } override fun statusChangeEvent(msg: String) { log.info(msg) } + + private fun logStatsAndReset() { + if(executionLock.tryLock()) { + try { + val statsStr = prepareStatsAndReset() + log.info(statsStr) + } catch (ex: Exception) { + // This is running by the scheduled execution service and must not fail + log.error("Unexpected exception when logging stats", ex) + } finally { + executionLock.unlock() + } + } else { + log.warn("Skipping stats logging as it is already running in a different thread.") + } + } + + internal fun prepareStatsAndReset() : String { + + fun Long.toMB(): Long = this / (1024 * 1024) + + val operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean() + val loadAverage = operatingSystemMXBean.systemLoadAverage + val loadAverageStr = if(loadAverage < 0) "N/A" else "${loadAverage * 100}%" + + val runtime = Runtime.getRuntime() + val freeMemory = runtime.freeMemory().toMB() + val totalMemory = runtime.totalMemory().toMB() + val maxMemory = runtime.maxMemory().toMB() + + val nf = NumberFormat.getNumberInstance() + + val durationStr = "During last ${Duration.ofSeconds(loggingIntervalSec)} stats were as follows:" + + val runtimeStr = "Load average: $loadAverageStr\n" + + "Memory:\n\tFree: ${nf.format(freeMemory)} MB" + + "\n\tTotal: ${nf.format(totalMemory)} MB" + + "\n\tMax: ${nf.format(maxMemory)} MB" + + val state = stateRef.getAndSet(State()) + + val dirStatsIn = state.directionalStatsMap[RoutingDirection.INBOUND]!! + val dirStatsOut = state.directionalStatsMap[RoutingDirection.OUTGOING]!! + + val inAcceptedPackets = dirStatsIn.accepted.mapValues { it.value.first } + val outAcceptedPackets = dirStatsOut.accepted.mapValues { it.value.first } + + val trafficTotalsStr = "Traffic totals:\n" + + "\tSuccessful connection count: ${nf.format(dirStatsIn.successfulConnectionCount.sumValues())}(inbound), ${nf.format(dirStatsOut.successfulConnectionCount.sumValues())}(outgoing)\n" + + "\tFailed connection count: ${nf.format(dirStatsIn.failedConnectionCount.sumValues())}(inbound), ${nf.format(dirStatsOut.failedConnectionCount.sumValues())}(outgoing)\n" + + "\tPackets accepted count: ${nf.format(inAcceptedPackets.sumValues())}(inbound), " + + "${nf.format(outAcceptedPackets.sumValues())}(outgoing)\n" + + "\tBytes transmitted: ${nf.format(dirStatsIn.accepted.mapValues { it.value.second }.sumValues())}(inbound), " + + "${nf.format(dirStatsOut.accepted.mapValues { it.value.second }.sumValues())}(outgoing)\n" + + "\tPackets dropped count: ${nf.format(dirStatsIn.droppedPacketsCount.sumValues())}(inbound), ${nf.format(dirStatsOut.droppedPacketsCount.sumValues())}(outgoing)" + + val breakDownTrafficStr = "Traffic breakdown:\n" + + "\tSuccessful connections in:\n${dirStatsIn.successfulConnectionCount.prettyPrint(2, nf)}\n" + + "\tSuccessful connections out:\n${dirStatsOut.successfulConnectionCount.prettyPrint(2, nf)}\n" + + "\tFailed connections in:\n${dirStatsIn.failedConnectionCount.prettyPrint(2, nf)}\n" + + "\tFailed connections out:\n${dirStatsOut.failedConnectionCount.prettyPrint(2, nf)}\n" + + "\tAccepted packets in:\n${inAcceptedPackets.prettyPrint(2, nf)}\n" + + "\tAccepted packets out:\n${outAcceptedPackets.prettyPrint(2, nf)}\n" + + "\tDropped packets in:\n${dirStatsIn.droppedPacketsCount.prettyPrint(2, nf)}\n" + + "\tDropped packets out:\n${dirStatsOut.droppedPacketsCount.prettyPrint(2, nf)}" + + return durationStr + "\n" + runtimeStr + "\n" + trafficTotalsStr + "\n" + breakDownTrafficStr + } } \ No newline at end of file diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/config/FirewallConfigurationImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/config/FirewallConfigurationImpl.kt index 804624f8f5..fcd248ab31 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/config/FirewallConfigurationImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/config/FirewallConfigurationImpl.kt @@ -45,6 +45,8 @@ data class FloatOuterConfigurationImpl(override val floatAddress: NetworkHostAnd data class BridgeHAConfigImpl(override val haConnectionString: String, override val haPriority: Int = 10, override val haTopic: String = "/bridge/ha") : BridgeHAConfig +data class AuditServiceConfigurationImpl(override val loggingIntervalSec: Long) : AuditServiceConfiguration + data class FirewallConfigurationImpl( override val baseDirectory: Path, private val certificatesDirectory: Path = baseDirectory / "certificates", @@ -65,7 +67,8 @@ data class FirewallConfigurationImpl( override val artemisReconnectionIntervalMax: Int = 60000, override val politeShutdownPeriod: Int = 1000, override val p2pConfirmationWindowSize: Int = 1048576, - override val whitelistedHeaders: List = ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.toList() + override val whitelistedHeaders: List = ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.toList(), + override val auditServiceConfiguration: AuditServiceConfigurationImpl ) : FirewallConfiguration { init { if (firewallMode == FirewallMode.SenderReceiver) { diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/filter/SimpleMessageFilterService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/filter/SimpleMessageFilterService.kt index 5990eeb66c..1ec90490af 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/filter/SimpleMessageFilterService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/filter/SimpleMessageFilterService.kt @@ -16,23 +16,19 @@ import rx.Subscription class SimpleMessageFilterService(val conf: FirewallConfiguration, val auditService: FirewallAuditService, - val artemisConnectionService: BridgeArtemisConnectionService, - val bridgeSenderService: BridgeSenderService, + private val artemisConnectionService: BridgeArtemisConnectionService, + private val bridgeSenderService: BridgeSenderService, private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : IncomingMessageFilterService, ServiceStateSupport by stateHelper { companion object { val log = contextLogger() } - private val statusFollower: ServiceStateCombiner + private val statusFollower = ServiceStateCombiner(listOf(auditService, artemisConnectionService, bridgeSenderService)) private var statusSubscriber: Subscription? = null private val whiteListedAMQPHeaders: Set = conf.whitelistedHeaders.toSet() private var inboundSession: ClientSession? = null private var inboundProducer: ClientProducer? = null - init { - statusFollower = ServiceStateCombiner(listOf(auditService, artemisConnectionService, bridgeSenderService)) - } - override fun start() { statusSubscriber = statusFollower.activeChange.subscribe({ if (it) { @@ -67,7 +63,7 @@ class SimpleMessageFilterService(val conf: FirewallConfiguration, } catch (ex: IllegalArgumentException) { throw SecurityException("Invalid Legal Name ${inboundMessage.sourceLegalName}") } - require(inboundMessage.payload.size > 0) { "No valid payload" } + require(inboundMessage.payload.isNotEmpty()) { "No valid payload" } val validInboxTopic = bridgeSenderService.validateReceiveTopic(inboundMessage.topic, sourceLegalName) require(validInboxTopic) { "Topic not a legitimate Inbox for a node on this Artemis Broker ${inboundMessage.topic}" } require(inboundMessage.applicationProperties.keys.all { it in whiteListedAMQPHeaders }) { "Disallowed header present in ${inboundMessage.applicationProperties.keys}" } @@ -77,7 +73,7 @@ class SimpleMessageFilterService(val conf: FirewallConfiguration, try { validateMessage(inboundMessage) } catch (ex: Exception) { - auditService.packetDropEvent(inboundMessage, "Packet Failed validation checks: " + ex.message) + auditService.packetDropEvent(inboundMessage, "Packet Failed validation checks: " + ex.message, RoutingDirection.INBOUND) inboundMessage.complete(true) // consume the bad message, so that it isn't redelivered forever. return } @@ -95,8 +91,8 @@ class SimpleMessageFilterService(val conf: FirewallConfiguration, } artemisMessage.putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, SimpleString(inboundMessage.sourceLegalName)) artemisMessage.writeBodyBufferBytes(inboundMessage.payload) - producer.send(SimpleString(inboundMessage.topic), artemisMessage, { _ -> inboundMessage.complete(true) }) - auditService.packetAcceptedEvent(inboundMessage) + producer.send(SimpleString(inboundMessage.topic), artemisMessage) { _ -> inboundMessage.complete(true) } + auditService.packetAcceptedEvent(inboundMessage, RoutingDirection.INBOUND) } catch (ex: Exception) { log.error("Error trying to forward message", ex) inboundMessage.complete(false) // delivery failure. NAK back to source and await re-delivery attempts diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt index 6b14b98cc3..eb83e6431e 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt @@ -1,9 +1,6 @@ package net.corda.bridge.services.receiver -import net.corda.bridge.services.api.BridgeAMQPListenerService -import net.corda.bridge.services.api.FirewallAuditService -import net.corda.bridge.services.api.FirewallConfiguration -import net.corda.bridge.services.api.ServiceStateSupport +import net.corda.bridge.services.api.* import net.corda.bridge.services.util.ServiceStateCombiner import net.corda.bridge.services.util.ServiceStateHelper import net.corda.core.utilities.contextLogger @@ -67,11 +64,11 @@ class BridgeAMQPListenerServiceImpl(val conf: FirewallConfiguration, onConnectSubscription = server.onConnection.subscribe(_onConnection) onConnectAuditSubscription = server.onConnection.subscribe({ if (it.connected) { - auditService.successfulConnectionEvent(true, it.remoteAddress, it.remoteCert?.subjectDN?.name - ?: "", "Successful AMQP inbound connection") + auditService.successfulConnectionEvent(it.remoteAddress, it.remoteCert?.subjectDN?.name + ?: "", "Successful AMQP inbound connection", RoutingDirection.INBOUND) } else { - auditService.failedConnectionEvent(true, it.remoteAddress, it.remoteCert?.subjectDN?.name - ?: "", "Failed AMQP inbound connection") + auditService.failedConnectionEvent(it.remoteAddress, it.remoteCert?.subjectDN?.name + ?: "", "Failed AMQP inbound connection", RoutingDirection.INBOUND) } }, { log.error("Connection event error", it) }) onReceiveSubscription = server.onReceive.subscribe(_onReceive) diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt index 6871ca6806..e9ec0a8ee1 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt @@ -25,32 +25,26 @@ import kotlin.concurrent.withLock class FloatControlListenerService(val conf: FirewallConfiguration, val maximumMessageSize: Int, val auditService: FirewallAuditService, - val amqpListener: BridgeAMQPListenerService, + private val amqpListener: BridgeAMQPListenerService, private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : FloatControlService, ServiceStateSupport by stateHelper { companion object { - val log = contextLogger() + private val log = contextLogger() } private val lock = ReentrantLock() - private val statusFollower: ServiceStateCombiner + private val statusFollower = ServiceStateCombiner(listOf(auditService, amqpListener)) private var statusSubscriber: Subscription? = null private var incomingMessageSubscriber: Subscription? = null private var connectSubscriber: Subscription? = null private var receiveSubscriber: Subscription? = null private var amqpControlServer: AMQPServer? = null - private val sslConfiguration: MutualSslConfiguration + private val sslConfiguration: MutualSslConfiguration = conf.floatOuterConfig?.customSSLConfiguration ?: conf.p2pSslOptions private val floatControlAddress = conf.floatOuterConfig!!.floatAddress private val floatClientName = conf.floatOuterConfig!!.expectedCertificateSubject private var activeConnectionInfo: ConnectionChange? = null private var forwardAddress: NetworkHostAndPort? = null private var forwardLegalName: String? = null - init { - statusFollower = ServiceStateCombiner(listOf(auditService, amqpListener)) - sslConfiguration = conf.floatOuterConfig?.customSSLConfiguration ?: conf.p2pSslOptions - } - - override fun start() { statusSubscriber = statusFollower.activeChange.subscribe({ if (it) { @@ -150,13 +144,13 @@ class FloatControlListenerService(val conf: FirewallConfiguration, private fun onControlMessage(receivedMessage: ReceivedMessage) { if (!receivedMessage.checkTunnelControlTopic()) { - auditService.packetDropEvent(receivedMessage, "Invalid control topic packet received on topic ${receivedMessage.topic}!!") + auditService.packetDropEvent(receivedMessage, "Invalid control topic packet received on topic ${receivedMessage.topic}!!", RoutingDirection.INBOUND) receivedMessage.complete(true) return } val controlMessage = try { if (CordaX500Name.parse(receivedMessage.sourceLegalName) != floatClientName) { - auditService.packetDropEvent(receivedMessage, "Invalid control source legal name!!") + auditService.packetDropEvent(receivedMessage, "Invalid control source legal name!!", RoutingDirection.INBOUND) receivedMessage.complete(true) return } @@ -208,7 +202,7 @@ class FloatControlListenerService(val conf: FirewallConfiguration, return } if (!message.topic.startsWith(P2P_PREFIX)) { - auditService.packetDropEvent(message, "Message topic is not a valid peer namespace ${message.topic}") + auditService.packetDropEvent(message, "Message topic is not a valid peer namespace ${message.topic}", RoutingDirection.INBOUND) message.complete(true) // consume message so it isn't resent forever return } @@ -228,6 +222,7 @@ class FloatControlListenerService(val conf: FirewallConfiguration, emptyMap()) amqpForwardMessage.onComplete.then { message.complete(it.get() == MessageStatus.Acknowledged) } amqpControl.write(amqpForwardMessage) + auditService.packetAcceptedEvent(message, RoutingDirection.INBOUND) } catch (ex: Exception) { log.error("Failed to forward message", ex) message.complete(false) diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt index 887946d7a7..e70c5e7374 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt @@ -29,30 +29,22 @@ class TunnelingBridgeReceiverService(val conf: FirewallConfiguration, val maximumMessageSize: Int, val auditService: FirewallAuditService, haService: BridgeMasterService, - val filterService: IncomingMessageFilterService, + private val filterService: IncomingMessageFilterService, private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeReceiverService, ServiceStateSupport by stateHelper { companion object { - val log = contextLogger() + private val log = contextLogger() } - private val statusFollower: ServiceStateCombiner + private val statusFollower = ServiceStateCombiner(listOf(auditService, haService, filterService)) private var statusSubscriber: Subscription? = null private var connectSubscriber: Subscription? = null private var receiveSubscriber: Subscription? = null private var amqpControlClient: AMQPClient? = null - private val controlLinkSSLConfiguration: MutualSslConfiguration - private val floatListenerSSLConfiguration: MutualSslConfiguration - private val expectedCertificateSubject: CordaX500Name + private val controlLinkSSLConfiguration: MutualSslConfiguration = conf.bridgeInnerConfig?.customSSLConfiguration ?: conf.p2pSslOptions + private val floatListenerSSLConfiguration: MutualSslConfiguration = conf.bridgeInnerConfig?.customFloatOuterSSLConfiguration ?: conf.p2pSslOptions + private val expectedCertificateSubject: CordaX500Name = conf.bridgeInnerConfig!!.expectedCertificateSubject private val secureRandom: SecureRandom = newSecureRandom() - init { - statusFollower = ServiceStateCombiner(listOf(auditService, haService, filterService)) - controlLinkSSLConfiguration = conf.bridgeInnerConfig?.customSSLConfiguration ?: conf.p2pSslOptions - floatListenerSSLConfiguration = conf.bridgeInnerConfig?.customFloatOuterSSLConfiguration ?: conf.p2pSslOptions - expectedCertificateSubject = conf.bridgeInnerConfig!!.expectedCertificateSubject - } - - override fun start() { statusSubscriber = statusFollower.activeChange.subscribe({ if (it) { @@ -171,14 +163,14 @@ class TunnelingBridgeReceiverService(val conf: FirewallConfiguration, private fun onFloatMessage(receivedMessage: ReceivedMessage) { if (!receivedMessage.checkTunnelDataTopic()) { - auditService.packetDropEvent(receivedMessage, "Invalid float inbound topic received ${receivedMessage.topic}!!") + auditService.packetDropEvent(receivedMessage, "Invalid float inbound topic received ${receivedMessage.topic}!!", RoutingDirection.INBOUND) receivedMessage.complete(true) return } val innerMessage = try { receivedMessage.payload.deserialize() } catch (ex: Exception) { - auditService.packetDropEvent(receivedMessage, "Unable to decode Float Control message") + auditService.packetDropEvent(receivedMessage, "Unable to decode Float Control message", RoutingDirection.INBOUND) receivedMessage.complete(true) return } diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt index e27f788a0c..1826d5870a 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt @@ -4,30 +4,32 @@ import net.corda.bridge.services.api.* import net.corda.bridge.services.util.ServiceStateCombiner import net.corda.bridge.services.util.ServiceStateHelper import net.corda.core.identity.CordaX500Name +import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisSessionProvider import net.corda.nodeapi.internal.bridging.BridgeControlListener +import net.corda.nodeapi.internal.bridging.BridgeMetricsService +import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage +import org.apache.activemq.artemis.api.core.client.ClientMessage import rx.Subscription +import java.net.InetSocketAddress class DirectBridgeSenderService(val conf: FirewallConfiguration, val maxMessageSize: Int, val auditService: FirewallAuditService, haService: BridgeMasterService, - val artemisConnectionService: BridgeArtemisConnectionService, + private val artemisConnectionService: BridgeArtemisConnectionService, private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeSenderService, ServiceStateSupport by stateHelper { companion object { - val log = contextLogger() + private val log = contextLogger() } - private val statusFollower: ServiceStateCombiner + private val statusFollower: ServiceStateCombiner = ServiceStateCombiner(listOf(auditService, artemisConnectionService, haService)) private var statusSubscriber: Subscription? = null private var listenerActiveSubscriber: Subscription? = null - private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf.p2pSslOptions, conf.outboundConfig!!.socksProxyConfig, maxMessageSize, { ForwardingArtemisMessageClient(artemisConnectionService) }) - - init { - statusFollower = ServiceStateCombiner(listOf(auditService, artemisConnectionService, haService)) - } + private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf.p2pSslOptions, conf.outboundConfig!!.socksProxyConfig, maxMessageSize, { ForwardingArtemisMessageClient(artemisConnectionService) }, + BridgeAuditServiceAdaptor(auditService)) private class ForwardingArtemisMessageClient(val artemisConnectionService: BridgeArtemisConnectionService) : ArtemisSessionProvider { override fun start(): ArtemisMessagingClient.Started { @@ -44,6 +46,36 @@ class DirectBridgeSenderService(val conf: FirewallConfiguration, } + private class BridgeAuditServiceAdaptor(private val auditService: FirewallAuditService) : BridgeMetricsService { + override fun bridgeCreated(targets: List, legalNames: Set) { + // No corresponding method on FirewallAuditService yet + } + + override fun bridgeConnected(targets: List, legalNames: Set) { + val firstHostPort = targets.first() + auditService.successfulConnectionEvent(InetSocketAddress(firstHostPort.host, firstHostPort.port), + legalNames.first().toString(), "BridgeConnected", RoutingDirection.OUTGOING) + } + + override fun bridgeDisconnected(targets: List, legalNames: Set) { + // No corresponding method on FirewallAuditService yet + } + + override fun bridgeDestroyed(targets: List, legalNames: Set) { + // No corresponding method on FirewallAuditService yet + } + + override fun packetDropEvent(artemisMessage: ClientMessage, msg: String) { + // Too much of a hassle to translate `ClientMessage` into `ApplicationMessage?`, especially given that receiving side is likely + // to be doing counting only. + auditService.packetDropEvent(null, msg, RoutingDirection.OUTGOING) + } + + override fun packetAcceptedEvent(sendableMessage: SendableMessage) { + auditService.packetAcceptedEvent(sendableMessage, RoutingDirection.OUTGOING) + } + } + override fun start() { statusSubscriber = statusFollower.activeChange.subscribe({ ready -> if (ready) { diff --git a/bridge/src/main/resources/firewalldefault.conf b/bridge/src/main/resources/firewalldefault.conf index 7d853829e8..d3ead7bccb 100644 --- a/bridge/src/main/resources/firewalldefault.conf +++ b/bridge/src/main/resources/firewalldefault.conf @@ -5,4 +5,7 @@ artemisReconnectionIntervalMin = 5000 artemisReconnectionIntervalMax = 60000 politeShutdownPeriod = 1000 p2pConfirmationWindowSize = 1048576 -crlCheckSoftFail = true \ No newline at end of file +crlCheckSoftFail = true +auditServiceConfiguration : { + loggingIntervalSec = 60 +} \ No newline at end of file diff --git a/bridge/src/test/kotlin/net/corda/bridge/ConfigTest.kt b/bridge/src/test/kotlin/net/corda/bridge/ConfigTest.kt index 2bdada596c..f6a4b8d889 100644 --- a/bridge/src/test/kotlin/net/corda/bridge/ConfigTest.kt +++ b/bridge/src/test/kotlin/net/corda/bridge/ConfigTest.kt @@ -1,5 +1,6 @@ package net.corda.bridge +import com.typesafe.config.ConfigException import net.corda.bridge.services.api.FirewallMode import net.corda.core.identity.CordaX500Name import net.corda.core.internal.div @@ -119,5 +120,17 @@ class ConfigTest { createAndLoadConfigFromResource(tempFolder.root.toPath() / "bad", badConfigResource) } + assertEquals(60, config.auditServiceConfiguration.loggingIntervalSec) + } + + @Test + fun `Load audit service config`() { + val configResource = "/net/corda/bridge/withaudit/firewall.conf" + val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource) + assertEquals(34, config.auditServiceConfiguration.loggingIntervalSec) + + assertFailsWith { + createAndLoadConfigFromResource(tempFolder.root.toPath() / "err1", "/net/corda/bridge/withaudit/badconfig/badInterval.conf") + } } } \ No newline at end of file diff --git a/bridge/src/test/kotlin/net/corda/bridge/services/TestAuditService.kt b/bridge/src/test/kotlin/net/corda/bridge/services/TestAuditService.kt index 8a99c897ab..ddada659bb 100644 --- a/bridge/src/test/kotlin/net/corda/bridge/services/TestAuditService.kt +++ b/bridge/src/test/kotlin/net/corda/bridge/services/TestAuditService.kt @@ -1,7 +1,8 @@ package net.corda.bridge.services import net.corda.bridge.services.api.FirewallAuditService -import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage +import net.corda.bridge.services.api.RoutingDirection +import net.corda.nodeapi.internal.protonwrapper.messages.ApplicationMessage import rx.Observable import rx.subjects.PublishSubject import java.net.InetSocketAddress @@ -22,22 +23,22 @@ class TestAuditService() : FirewallAuditService, TestServiceBase() { val onAuditEvent: Observable get() = _onAuditEvent - override fun successfulConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress, certificateSubject: String, msg: String) { + override fun successfulConnectionEvent(address: InetSocketAddress, certificateSubject: String, msg: String, direction: RoutingDirection) { ++eventCount _onAuditEvent.onNext(AuditEvent.SUCCESSFUL_CONNECTION) } - override fun failedConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress?, certificateSubject: String?, msg: String) { + override fun failedConnectionEvent(address: InetSocketAddress, certificateSubject: String?, msg: String, direction: RoutingDirection) { ++eventCount _onAuditEvent.onNext(AuditEvent.FAILED_CONNECTION) } - override fun packetDropEvent(packet: ReceivedMessage?, msg: String) { + override fun packetDropEvent(packet: ApplicationMessage?, msg: String, direction: RoutingDirection) { ++eventCount _onAuditEvent.onNext(AuditEvent.PACKET_DROP) } - override fun packetAcceptedEvent(packet: ReceivedMessage) { + override fun packetAcceptedEvent(packet: ApplicationMessage, direction: RoutingDirection) { ++eventCount _onAuditEvent.onNext(AuditEvent.PACKET_ACCEPT) } diff --git a/bridge/src/test/kotlin/net/corda/bridge/services/audit/LoggingFirewallAuditServiceTest.kt b/bridge/src/test/kotlin/net/corda/bridge/services/audit/LoggingFirewallAuditServiceTest.kt new file mode 100644 index 0000000000..6c87d1db9d --- /dev/null +++ b/bridge/src/test/kotlin/net/corda/bridge/services/audit/LoggingFirewallAuditServiceTest.kt @@ -0,0 +1,157 @@ +package net.corda.bridge.services.audit + +import com.natpryce.hamkrest.assertion.assertThat +import com.natpryce.hamkrest.containsSubstring +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.whenever +import net.corda.bridge.services.api.AuditServiceConfiguration +import net.corda.bridge.services.api.FirewallConfiguration +import net.corda.bridge.services.api.RoutingDirection +import net.corda.nodeapi.internal.protonwrapper.messages.ApplicationMessage +import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage +import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage +import org.junit.After +import org.junit.Test +import java.net.InetSocketAddress + +class LoggingFirewallAuditServiceTest { + + private val instance : LoggingFirewallAuditService + + init { + val auditServiceConfiguration = mock() + val firewallConfiguration = mock() + whenever(firewallConfiguration.auditServiceConfiguration).then { auditServiceConfiguration } + whenever(auditServiceConfiguration.loggingIntervalSec).then { 50L } + instance = LoggingFirewallAuditService(firewallConfiguration) + } + + @After + fun tearDown() { + instance.stop() + } + + @Test + fun testStatsOutput() { + + fun Int.toDirection() : RoutingDirection { + return if (this % 3 == 0) { + RoutingDirection.INBOUND + } else { + RoutingDirection.OUTGOING + } + } + + val byteArray = ByteArray(20) + + fun Int.createMessage(direction : RoutingDirection) : ApplicationMessage { + val partyName = "Party" + ((this % 4) + 1) + return when(direction) { + RoutingDirection.INBOUND -> { + val msg = mock() + whenever(msg.payload).then { byteArray } + whenever(msg.sourceLegalName).then { partyName } + msg + } + RoutingDirection.OUTGOING -> { + val msg = mock() + whenever(msg.payload).then { byteArray } + whenever(msg.destinationLegalName).then { partyName } + msg + } + } + } + + fun Int.toAddress(): InetSocketAddress { + val hostname = "Server" + ((this % 5) + 1) + return InetSocketAddress(hostname, 10001) + } + + val failedConnCount = 7 + (1..failedConnCount).forEach { instance.failedConnectionEvent(it.toAddress(), null, "test", it.toDirection()) } + val succConnCount = 40 + (1..succConnCount).forEach { instance.successfulConnectionEvent(it.toAddress(), "test2", "test", it.toDirection()) } + + val packetDropCount = 20 + // Multi-threaded operation to make sure the state data structure is sound in that regard + (1..packetDropCount).toList().parallelStream().forEach { val direction = it.toDirection() + instance.packetDropEvent(it.createMessage(direction), "test3", direction) } + + val packetAcceptedCount = 9000 + // Multi-threaded operation to make sure the state data structure is sound in that regard + (1..packetAcceptedCount).toList().parallelStream().forEach { + val direction = it.toDirection() + instance.packetAcceptedEvent(it.createMessage(direction), direction) + } + + val statsStr = instance.prepareStatsAndReset() + assertThat(statsStr, containsSubstring("Successful connection count: 13(inbound), 27(outgoing)")) + assertThat(statsStr, containsSubstring("Failed connection count: 2(inbound), 5(outgoing)")) + assertThat(statsStr, containsSubstring("Packets accepted count: 3,000(inbound), 6,000(outgoing)")) + assertThat(statsStr, containsSubstring("Bytes transmitted: 60,000(inbound), 120,000(outgoing)")) + assertThat(statsStr, containsSubstring("Packets dropped count: 6(inbound), 14(outgoing)")) + assertThat(statsStr, containsSubstring("Failed connections out:")) + assertThat(statsStr, containsSubstring("Server5:10001 -> 1")) + + // Ensure reset stats + val statsStr2 = instance.prepareStatsAndReset() + assertThat(statsStr2, containsSubstring("Successful connection count: 0")) + assertThat(statsStr2, containsSubstring("Packets dropped count: 0(inbound), 0(outgoing)")) + } +} + +/* +During last PT1M stats were as follows: +Load average: N/A +Memory: + Free: 230 MB + Total: 702 MB + Max: 7,243 MB +Traffic totals: + Successful connection count: 13(inbound), 27(outgoing) + Failed connection count: 2(inbound), 5(outgoing) + Packets accepted count: 3,000(inbound), 6,000(outgoing) + Bytes transmitted: 60,000(inbound), 120,000(outgoing) + Packets dropped count: 6(inbound), 14(outgoing) +Traffic breakdown: + Successful connections in: + Server5:10001 -> 3 + Server4:10001 -> 3 + Server3:10001 -> 2 + Server2:10001 -> 3 + Server1:10001 -> 2 + Successful connections out: + Server5:10001 -> 5 + Server4:10001 -> 5 + Server3:10001 -> 6 + Server2:10001 -> 5 + Server1:10001 -> 6 + Failed connections in: + Server4:10001 -> 1 + Server2:10001 -> 1 + Failed connections out: + Server5:10001 -> 1 + Server3:10001 -> 2 + Server2:10001 -> 1 + Server1:10001 -> 1 + Accepted packets in: + Party1 -> 750 + Party2 -> 750 + Party3 -> 750 + Party4 -> 750 + Accepted packets out: + Party1 -> 1,500 + Party2 -> 1,500 + Party3 -> 1,500 + Party4 -> 1,500 + Dropped packets in: + Party1 -> 1 + Party2 -> 1 + Party3 -> 2 + Party4 -> 2 + Dropped packets out: + Party1 -> 4 + Party2 -> 4 + Party3 -> 3 + Party4 -> 3 + */ \ No newline at end of file diff --git a/bridge/src/test/resources/net/corda/bridge/withaudit/badconfig/badInterval.conf b/bridge/src/test/resources/net/corda/bridge/withaudit/badconfig/badInterval.conf new file mode 100644 index 0000000000..becf64becb --- /dev/null +++ b/bridge/src/test/resources/net/corda/bridge/withaudit/badconfig/badInterval.conf @@ -0,0 +1,18 @@ +firewallMode = SenderReceiver +outboundConfig : { + artemisBrokerAddress = "localhost:11005" + socksProxyConfig : { + version = SOCKS5 + proxyAddress = "localhost:12345" + userName = "proxyUser" + password = "pwd" + } +} +inboundConfig : { + listeningAddress = "0.0.0.0:10005" +} +networkParametersPath = network-parameters + +auditServiceConfiguration : { + loggingIntervalSec = gibberish +} \ No newline at end of file diff --git a/bridge/src/test/resources/net/corda/bridge/withaudit/firewall.conf b/bridge/src/test/resources/net/corda/bridge/withaudit/firewall.conf new file mode 100644 index 0000000000..5c325f4a0e --- /dev/null +++ b/bridge/src/test/resources/net/corda/bridge/withaudit/firewall.conf @@ -0,0 +1,18 @@ +firewallMode = SenderReceiver +outboundConfig : { + artemisBrokerAddress = "localhost:11005" + socksProxyConfig : { + version = SOCKS5 + proxyAddress = "localhost:12345" + userName = "proxyUser" + password = "pwd" + } +} +inboundConfig : { + listeningAddress = "0.0.0.0:10005" +} +networkParametersPath = network-parameters + +auditServiceConfiguration : { + loggingIntervalSec = 34 +} \ No newline at end of file diff --git a/docs/source/firewall-configuration-file.rst b/docs/source/firewall-configuration-file.rst index 787f801871..dfa2a8ddf6 100644 --- a/docs/source/firewall-configuration-file.rst +++ b/docs/source/firewall-configuration-file.rst @@ -201,6 +201,10 @@ absolute path to the firewall's base directory. sets of ``bridges`` (e.g. in test environments). The default value is ``bridge/ha`` and would not normally need to be changed if the cluster is not shared. +:auditServiceConfiguration: Both ``FloatOuter`` and ``BridgeInner`` components have an audit service which is currently outputting into the process log some traffic statistics. + + :loggingIntervalSec: This is an integer value which controls how frequently, in seconds, statistics will be written into the logs. + :artemisReconnectionIntervalMin: If connection to the local Artemis server fails the initial reconnection attempt will be after [artemisReconnectionIntervalMin] ms. The default interval is 5000 ms. Subsequent retries will take be exponentially backed off until they reach [artemisReconnectionIntervalMax] ms. diff --git a/experimental/rpc-worker/src/integration-test/kotlin/net/corda/rpcWorker/RpcFlowWorkerDriver.kt b/experimental/rpc-worker/src/integration-test/kotlin/net/corda/rpcWorker/RpcFlowWorkerDriver.kt index 8f01954c8d..f5ddfbfaf3 100644 --- a/experimental/rpc-worker/src/integration-test/kotlin/net/corda/rpcWorker/RpcFlowWorkerDriver.kt +++ b/experimental/rpc-worker/src/integration-test/kotlin/net/corda/rpcWorker/RpcFlowWorkerDriver.kt @@ -4,6 +4,7 @@ import net.corda.bridge.FirewallVersionInfo import net.corda.bridge.internal.FirewallInstance import net.corda.bridge.services.api.FirewallConfiguration import net.corda.bridge.services.api.FirewallMode +import net.corda.bridge.services.config.AuditServiceConfigurationImpl import net.corda.bridge.services.config.BridgeInboundConfigurationImpl import net.corda.bridge.services.config.BridgeOutboundConfigurationImpl import net.corda.bridge.services.config.FirewallConfigurationImpl @@ -201,7 +202,8 @@ data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : Intern val bridgeConfig = FirewallConfigurationImpl(baseDirectory = baseDirectory, crlCheckSoftFail = true, bridgeInnerConfig = null, keyStorePassword = "pass", trustStorePassword = "pass", firewallMode = FirewallMode.SenderReceiver, networkParametersPath = baseDirectory, outboundConfig = BridgeOutboundConfigurationImpl(nodeConfig.messagingServerAddress!!, listOf(), null, null), - inboundConfig = BridgeInboundConfigurationImpl(bridgeListeningAddress, null), enableAMQPPacketTrace = false, floatOuterConfig = null, haConfig = null) + inboundConfig = BridgeInboundConfigurationImpl(bridgeListeningAddress, null), enableAMQPPacketTrace = false, floatOuterConfig = null, haConfig = null, + auditServiceConfiguration = AuditServiceConfigurationImpl(120)) baseDirectory.createDirectories() // Write config (for reference)