ENT-2500: Corda Firewall should log some packet statistics (#1426)

* ENT-2500: Refactoring to allow FirewallAuditService have inbound as well as outbound logging.

Change some visibility modifiers to `private`. Use in-place initialisation where possible.

* ENT-2500: Record accepted package coming into Float

* ENT-2500: Introduce optional BridgeAuditService in `nodeApi` module

* ENT-2500: Switch FirewallAuditService to use `ApplicationMessage` and bind outgoing message stats.

* ENT-2500: Introduce scheduled executor and audit service configuration.

* ENT-2500: Stats formatting.

* ENT-2500: Stats formatting unit test.

* ENT-2500: Minor changes to LoggingFirewallAuditService and its unit test.

* ENT-2500: Additional configuration parameter documentation update.

* ENT-2500: Supply optional parameter.

* ENT-2500: Address PR comments.

* ENT-2500: Make API more consistent by using `RoutingDirection`, re-jig `State` data structure, improve unit test.

* ENT-2500: Add breakdown by endpoint address.

* ENT-2500: Compilation fix after rebase in `master`.

* ENT-2500: Making `AuditServiceConfiguration` not optional and supplying default settings.
Also few minor changes.
This commit is contained in:
Viktor Kolomeyko 2018-10-02 14:13:08 +01:00 committed by GitHub
parent ce9538f917
commit 8c23abbd7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 474 additions and 76 deletions

View File

@ -1,6 +1,6 @@
package net.corda.bridge.services.api
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import net.corda.nodeapi.internal.protonwrapper.messages.ApplicationMessage
import java.net.InetSocketAddress
/**
@ -9,9 +9,17 @@ import java.net.InetSocketAddress
* security data to an enterprise service.
*/
interface FirewallAuditService : ServiceLifecycleSupport {
fun successfulConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress, certificateSubject: String, msg: String)
fun failedConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress?, certificateSubject: String?, msg: String)
fun packetDropEvent(packet: ReceivedMessage?, msg: String)
fun packetAcceptedEvent(packet: ReceivedMessage)
fun successfulConnectionEvent(address: InetSocketAddress, certificateSubject: String, msg: String, direction: RoutingDirection)
fun failedConnectionEvent(address: InetSocketAddress, certificateSubject: String?, msg: String, direction: RoutingDirection)
fun packetDropEvent(packet: ApplicationMessage?, msg: String, direction: RoutingDirection)
fun packetAcceptedEvent(packet: ApplicationMessage, direction: RoutingDirection)
fun statusChangeEvent(msg: String)
}
/**
* Specifies direction of message flow with regard to Corda Node connected to Firewall.
*/
enum class RoutingDirection {
INBOUND,
OUTGOING
}

View File

@ -85,6 +85,10 @@ interface FloatOuterConfiguration {
val customSSLConfiguration: BridgeSSLConfiguration?
}
interface AuditServiceConfiguration {
val loggingIntervalSec: Long
}
interface FirewallConfiguration {
val baseDirectory: Path
val firewallMode: FirewallMode
@ -112,4 +116,5 @@ interface FirewallConfiguration {
val whitelistedHeaders: List<String>
val crlCheckSoftFail: Boolean
val p2pSslOptions: MutualSslConfiguration
val auditServiceConfiguration: AuditServiceConfiguration
}

View File

@ -2,44 +2,198 @@ package net.corda.bridge.services.audit
import net.corda.bridge.services.api.FirewallAuditService
import net.corda.bridge.services.api.FirewallConfiguration
import net.corda.bridge.services.api.RoutingDirection
import net.corda.bridge.services.api.ServiceStateSupport
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.trace
import net.corda.nodeapi.internal.protonwrapper.messages.ApplicationMessage
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import java.lang.management.ManagementFactory
import java.net.InetSocketAddress
import java.text.NumberFormat
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock
class LoggingFirewallAuditService(val conf: FirewallConfiguration,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : FirewallAuditService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
private val log = contextLogger()
private fun <K> Map<K, AtomicLong>.sumValues(): Long {
return this.values.map { it.get() }.sum()
}
private fun <K> Map<K, AtomicLong>.prettyPrint(tabsCount: Int, nf: NumberFormat): String {
val leftPad = "\t".repeat(tabsCount)
return entries.joinToString(separator = "\n") { entry -> leftPad + entry.key.toString() + " -> " + nf.format(entry.value.get())}
}
}
private val loggingIntervalSec = conf.auditServiceConfiguration.loggingIntervalSec
private val timedExecutor = Executors.newScheduledThreadPool(1)
private val executionLock = ReentrantLock()
private data class DirectionalStats(val successfulConnectionCount : ConcurrentMap<InetSocketAddress, AtomicLong> = ConcurrentHashMap<InetSocketAddress, AtomicLong>(),
val failedConnectionCount : ConcurrentMap<InetSocketAddress, AtomicLong> = ConcurrentHashMap<InetSocketAddress, AtomicLong>(),
val accepted : ConcurrentMap<String, Pair<AtomicLong, AtomicLong>> = ConcurrentHashMap<String, Pair<AtomicLong, AtomicLong>>(),
val droppedPacketsCount : ConcurrentMap<String, AtomicLong> = ConcurrentHashMap<String, AtomicLong>())
private data class State(val directionalStatsMap: EnumMap<RoutingDirection, DirectionalStats> = forEveryRoutingDirection()
) {
companion object {
private fun forEveryRoutingDirection() : EnumMap<RoutingDirection, DirectionalStats> {
val map = RoutingDirection.values().map { it to DirectionalStats() }.toMap()
return EnumMap(map)
}
}
}
private val stateRef = AtomicReference(State())
override fun start() {
stateHelper.active = true
timedExecutor.scheduleAtFixedRate(::logStatsAndReset, loggingIntervalSec, loggingIntervalSec, TimeUnit.SECONDS)
}
override fun stop() {
stateHelper.active = false
timedExecutor.shutdown()
}
override fun successfulConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress, certificateSubject: String, msg: String) {
override fun successfulConnectionEvent(address: InetSocketAddress, certificateSubject: String, msg: String, direction: RoutingDirection) {
log.info(msg)
withDirectionalStatsOf(direction) {
successfulConnectionCount.getOrPut(address, ::AtomicLong).incrementAndGet()
}
}
override fun failedConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress?, certificateSubject: String?, msg: String) {
override fun failedConnectionEvent(address: InetSocketAddress, certificateSubject: String?, msg: String, direction: RoutingDirection) {
log.warn(msg)
withDirectionalStatsOf(direction) {
failedConnectionCount.getOrPut(address, ::AtomicLong).incrementAndGet()
}
}
override fun packetDropEvent(packet: ReceivedMessage?, msg: String) {
log.info(msg)
private fun withDirectionalStatsOf(direction: RoutingDirection, block: DirectionalStats.() -> Unit) {
val directionalStats = stateRef.get().directionalStatsMap[direction]
if(directionalStats == null) {
log.warn("Unknown direction: $direction")
} else {
directionalStats.block()
}
}
override fun packetAcceptedEvent(packet: ReceivedMessage) {
log.trace { "Packet received from ${packet.sourceLegalName} uuid: ${packet.applicationProperties["_AMQ_DUPL_ID"]}" }
private fun ApplicationMessage?.address(direction: RoutingDirection) : String {
val unknownAddress = "Unknown address"
return when(direction) {
RoutingDirection.OUTGOING -> this?.destinationLegalName ?: unknownAddress
RoutingDirection.INBOUND -> if (this is ReceivedMessage) {
this.sourceLegalName
} else {
unknownAddress
}
}
}
override fun packetDropEvent(packet: ApplicationMessage?, msg: String, direction: RoutingDirection) {
log.info("$direction : $msg")
withDirectionalStatsOf(direction) {
droppedPacketsCount.getOrPut(packet.address(direction), ::AtomicLong).incrementAndGet()
}
}
override fun packetAcceptedEvent(packet: ApplicationMessage, direction: RoutingDirection) {
val address = packet.address(direction)
log.trace { "$direction: Address: $address, uuid: ${packet.applicationProperties["_AMQ_DUPL_ID"]}" }
withDirectionalStatsOf(direction) {
val pair = accepted.getOrPut(address) { Pair(AtomicLong(), AtomicLong())}
pair.first.incrementAndGet()
pair.second.addAndGet(packet.payload.size.toLong())
}
}
override fun statusChangeEvent(msg: String) {
log.info(msg)
}
private fun logStatsAndReset() {
if(executionLock.tryLock()) {
try {
val statsStr = prepareStatsAndReset()
log.info(statsStr)
} catch (ex: Exception) {
// This is running by the scheduled execution service and must not fail
log.error("Unexpected exception when logging stats", ex)
} finally {
executionLock.unlock()
}
} else {
log.warn("Skipping stats logging as it is already running in a different thread.")
}
}
internal fun prepareStatsAndReset() : String {
fun Long.toMB(): Long = this / (1024 * 1024)
val operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean()
val loadAverage = operatingSystemMXBean.systemLoadAverage
val loadAverageStr = if(loadAverage < 0) "N/A" else "${loadAverage * 100}%"
val runtime = Runtime.getRuntime()
val freeMemory = runtime.freeMemory().toMB()
val totalMemory = runtime.totalMemory().toMB()
val maxMemory = runtime.maxMemory().toMB()
val nf = NumberFormat.getNumberInstance()
val durationStr = "During last ${Duration.ofSeconds(loggingIntervalSec)} stats were as follows:"
val runtimeStr = "Load average: $loadAverageStr\n" +
"Memory:\n\tFree: ${nf.format(freeMemory)} MB" +
"\n\tTotal: ${nf.format(totalMemory)} MB" +
"\n\tMax: ${nf.format(maxMemory)} MB"
val state = stateRef.getAndSet(State())
val dirStatsIn = state.directionalStatsMap[RoutingDirection.INBOUND]!!
val dirStatsOut = state.directionalStatsMap[RoutingDirection.OUTGOING]!!
val inAcceptedPackets = dirStatsIn.accepted.mapValues { it.value.first }
val outAcceptedPackets = dirStatsOut.accepted.mapValues { it.value.first }
val trafficTotalsStr = "Traffic totals:\n" +
"\tSuccessful connection count: ${nf.format(dirStatsIn.successfulConnectionCount.sumValues())}(inbound), ${nf.format(dirStatsOut.successfulConnectionCount.sumValues())}(outgoing)\n" +
"\tFailed connection count: ${nf.format(dirStatsIn.failedConnectionCount.sumValues())}(inbound), ${nf.format(dirStatsOut.failedConnectionCount.sumValues())}(outgoing)\n" +
"\tPackets accepted count: ${nf.format(inAcceptedPackets.sumValues())}(inbound), " +
"${nf.format(outAcceptedPackets.sumValues())}(outgoing)\n" +
"\tBytes transmitted: ${nf.format(dirStatsIn.accepted.mapValues { it.value.second }.sumValues())}(inbound), " +
"${nf.format(dirStatsOut.accepted.mapValues { it.value.second }.sumValues())}(outgoing)\n" +
"\tPackets dropped count: ${nf.format(dirStatsIn.droppedPacketsCount.sumValues())}(inbound), ${nf.format(dirStatsOut.droppedPacketsCount.sumValues())}(outgoing)"
val breakDownTrafficStr = "Traffic breakdown:\n" +
"\tSuccessful connections in:\n${dirStatsIn.successfulConnectionCount.prettyPrint(2, nf)}\n" +
"\tSuccessful connections out:\n${dirStatsOut.successfulConnectionCount.prettyPrint(2, nf)}\n" +
"\tFailed connections in:\n${dirStatsIn.failedConnectionCount.prettyPrint(2, nf)}\n" +
"\tFailed connections out:\n${dirStatsOut.failedConnectionCount.prettyPrint(2, nf)}\n" +
"\tAccepted packets in:\n${inAcceptedPackets.prettyPrint(2, nf)}\n" +
"\tAccepted packets out:\n${outAcceptedPackets.prettyPrint(2, nf)}\n" +
"\tDropped packets in:\n${dirStatsIn.droppedPacketsCount.prettyPrint(2, nf)}\n" +
"\tDropped packets out:\n${dirStatsOut.droppedPacketsCount.prettyPrint(2, nf)}"
return durationStr + "\n" + runtimeStr + "\n" + trafficTotalsStr + "\n" + breakDownTrafficStr
}
}

View File

@ -45,6 +45,8 @@ data class FloatOuterConfigurationImpl(override val floatAddress: NetworkHostAnd
data class BridgeHAConfigImpl(override val haConnectionString: String, override val haPriority: Int = 10, override val haTopic: String = "/bridge/ha") : BridgeHAConfig
data class AuditServiceConfigurationImpl(override val loggingIntervalSec: Long) : AuditServiceConfiguration
data class FirewallConfigurationImpl(
override val baseDirectory: Path,
private val certificatesDirectory: Path = baseDirectory / "certificates",
@ -65,7 +67,8 @@ data class FirewallConfigurationImpl(
override val artemisReconnectionIntervalMax: Int = 60000,
override val politeShutdownPeriod: Int = 1000,
override val p2pConfirmationWindowSize: Int = 1048576,
override val whitelistedHeaders: List<String> = ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.toList()
override val whitelistedHeaders: List<String> = ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.toList(),
override val auditServiceConfiguration: AuditServiceConfigurationImpl
) : FirewallConfiguration {
init {
if (firewallMode == FirewallMode.SenderReceiver) {

View File

@ -16,23 +16,19 @@ import rx.Subscription
class SimpleMessageFilterService(val conf: FirewallConfiguration,
val auditService: FirewallAuditService,
val artemisConnectionService: BridgeArtemisConnectionService,
val bridgeSenderService: BridgeSenderService,
private val artemisConnectionService: BridgeArtemisConnectionService,
private val bridgeSenderService: BridgeSenderService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : IncomingMessageFilterService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
}
private val statusFollower: ServiceStateCombiner
private val statusFollower = ServiceStateCombiner(listOf(auditService, artemisConnectionService, bridgeSenderService))
private var statusSubscriber: Subscription? = null
private val whiteListedAMQPHeaders: Set<String> = conf.whitelistedHeaders.toSet()
private var inboundSession: ClientSession? = null
private var inboundProducer: ClientProducer? = null
init {
statusFollower = ServiceStateCombiner(listOf(auditService, artemisConnectionService, bridgeSenderService))
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe({
if (it) {
@ -67,7 +63,7 @@ class SimpleMessageFilterService(val conf: FirewallConfiguration,
} catch (ex: IllegalArgumentException) {
throw SecurityException("Invalid Legal Name ${inboundMessage.sourceLegalName}")
}
require(inboundMessage.payload.size > 0) { "No valid payload" }
require(inboundMessage.payload.isNotEmpty()) { "No valid payload" }
val validInboxTopic = bridgeSenderService.validateReceiveTopic(inboundMessage.topic, sourceLegalName)
require(validInboxTopic) { "Topic not a legitimate Inbox for a node on this Artemis Broker ${inboundMessage.topic}" }
require(inboundMessage.applicationProperties.keys.all { it in whiteListedAMQPHeaders }) { "Disallowed header present in ${inboundMessage.applicationProperties.keys}" }
@ -77,7 +73,7 @@ class SimpleMessageFilterService(val conf: FirewallConfiguration,
try {
validateMessage(inboundMessage)
} catch (ex: Exception) {
auditService.packetDropEvent(inboundMessage, "Packet Failed validation checks: " + ex.message)
auditService.packetDropEvent(inboundMessage, "Packet Failed validation checks: " + ex.message, RoutingDirection.INBOUND)
inboundMessage.complete(true) // consume the bad message, so that it isn't redelivered forever.
return
}
@ -95,8 +91,8 @@ class SimpleMessageFilterService(val conf: FirewallConfiguration,
}
artemisMessage.putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, SimpleString(inboundMessage.sourceLegalName))
artemisMessage.writeBodyBufferBytes(inboundMessage.payload)
producer.send(SimpleString(inboundMessage.topic), artemisMessage, { _ -> inboundMessage.complete(true) })
auditService.packetAcceptedEvent(inboundMessage)
producer.send(SimpleString(inboundMessage.topic), artemisMessage) { _ -> inboundMessage.complete(true) }
auditService.packetAcceptedEvent(inboundMessage, RoutingDirection.INBOUND)
} catch (ex: Exception) {
log.error("Error trying to forward message", ex)
inboundMessage.complete(false) // delivery failure. NAK back to source and await re-delivery attempts

View File

@ -1,9 +1,6 @@
package net.corda.bridge.services.receiver
import net.corda.bridge.services.api.BridgeAMQPListenerService
import net.corda.bridge.services.api.FirewallAuditService
import net.corda.bridge.services.api.FirewallConfiguration
import net.corda.bridge.services.api.ServiceStateSupport
import net.corda.bridge.services.api.*
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.utilities.contextLogger
@ -67,11 +64,11 @@ class BridgeAMQPListenerServiceImpl(val conf: FirewallConfiguration,
onConnectSubscription = server.onConnection.subscribe(_onConnection)
onConnectAuditSubscription = server.onConnection.subscribe({
if (it.connected) {
auditService.successfulConnectionEvent(true, it.remoteAddress, it.remoteCert?.subjectDN?.name
?: "", "Successful AMQP inbound connection")
auditService.successfulConnectionEvent(it.remoteAddress, it.remoteCert?.subjectDN?.name
?: "", "Successful AMQP inbound connection", RoutingDirection.INBOUND)
} else {
auditService.failedConnectionEvent(true, it.remoteAddress, it.remoteCert?.subjectDN?.name
?: "", "Failed AMQP inbound connection")
auditService.failedConnectionEvent(it.remoteAddress, it.remoteCert?.subjectDN?.name
?: "", "Failed AMQP inbound connection", RoutingDirection.INBOUND)
}
}, { log.error("Connection event error", it) })
onReceiveSubscription = server.onReceive.subscribe(_onReceive)

View File

@ -25,32 +25,26 @@ import kotlin.concurrent.withLock
class FloatControlListenerService(val conf: FirewallConfiguration,
val maximumMessageSize: Int,
val auditService: FirewallAuditService,
val amqpListener: BridgeAMQPListenerService,
private val amqpListener: BridgeAMQPListenerService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : FloatControlService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
private val log = contextLogger()
}
private val lock = ReentrantLock()
private val statusFollower: ServiceStateCombiner
private val statusFollower = ServiceStateCombiner(listOf(auditService, amqpListener))
private var statusSubscriber: Subscription? = null
private var incomingMessageSubscriber: Subscription? = null
private var connectSubscriber: Subscription? = null
private var receiveSubscriber: Subscription? = null
private var amqpControlServer: AMQPServer? = null
private val sslConfiguration: MutualSslConfiguration
private val sslConfiguration: MutualSslConfiguration = conf.floatOuterConfig?.customSSLConfiguration ?: conf.p2pSslOptions
private val floatControlAddress = conf.floatOuterConfig!!.floatAddress
private val floatClientName = conf.floatOuterConfig!!.expectedCertificateSubject
private var activeConnectionInfo: ConnectionChange? = null
private var forwardAddress: NetworkHostAndPort? = null
private var forwardLegalName: String? = null
init {
statusFollower = ServiceStateCombiner(listOf(auditService, amqpListener))
sslConfiguration = conf.floatOuterConfig?.customSSLConfiguration ?: conf.p2pSslOptions
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe({
if (it) {
@ -150,13 +144,13 @@ class FloatControlListenerService(val conf: FirewallConfiguration,
private fun onControlMessage(receivedMessage: ReceivedMessage) {
if (!receivedMessage.checkTunnelControlTopic()) {
auditService.packetDropEvent(receivedMessage, "Invalid control topic packet received on topic ${receivedMessage.topic}!!")
auditService.packetDropEvent(receivedMessage, "Invalid control topic packet received on topic ${receivedMessage.topic}!!", RoutingDirection.INBOUND)
receivedMessage.complete(true)
return
}
val controlMessage = try {
if (CordaX500Name.parse(receivedMessage.sourceLegalName) != floatClientName) {
auditService.packetDropEvent(receivedMessage, "Invalid control source legal name!!")
auditService.packetDropEvent(receivedMessage, "Invalid control source legal name!!", RoutingDirection.INBOUND)
receivedMessage.complete(true)
return
}
@ -208,7 +202,7 @@ class FloatControlListenerService(val conf: FirewallConfiguration,
return
}
if (!message.topic.startsWith(P2P_PREFIX)) {
auditService.packetDropEvent(message, "Message topic is not a valid peer namespace ${message.topic}")
auditService.packetDropEvent(message, "Message topic is not a valid peer namespace ${message.topic}", RoutingDirection.INBOUND)
message.complete(true) // consume message so it isn't resent forever
return
}
@ -228,6 +222,7 @@ class FloatControlListenerService(val conf: FirewallConfiguration,
emptyMap())
amqpForwardMessage.onComplete.then { message.complete(it.get() == MessageStatus.Acknowledged) }
amqpControl.write(amqpForwardMessage)
auditService.packetAcceptedEvent(message, RoutingDirection.INBOUND)
} catch (ex: Exception) {
log.error("Failed to forward message", ex)
message.complete(false)

View File

@ -29,30 +29,22 @@ class TunnelingBridgeReceiverService(val conf: FirewallConfiguration,
val maximumMessageSize: Int,
val auditService: FirewallAuditService,
haService: BridgeMasterService,
val filterService: IncomingMessageFilterService,
private val filterService: IncomingMessageFilterService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeReceiverService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
private val log = contextLogger()
}
private val statusFollower: ServiceStateCombiner
private val statusFollower = ServiceStateCombiner(listOf(auditService, haService, filterService))
private var statusSubscriber: Subscription? = null
private var connectSubscriber: Subscription? = null
private var receiveSubscriber: Subscription? = null
private var amqpControlClient: AMQPClient? = null
private val controlLinkSSLConfiguration: MutualSslConfiguration
private val floatListenerSSLConfiguration: MutualSslConfiguration
private val expectedCertificateSubject: CordaX500Name
private val controlLinkSSLConfiguration: MutualSslConfiguration = conf.bridgeInnerConfig?.customSSLConfiguration ?: conf.p2pSslOptions
private val floatListenerSSLConfiguration: MutualSslConfiguration = conf.bridgeInnerConfig?.customFloatOuterSSLConfiguration ?: conf.p2pSslOptions
private val expectedCertificateSubject: CordaX500Name = conf.bridgeInnerConfig!!.expectedCertificateSubject
private val secureRandom: SecureRandom = newSecureRandom()
init {
statusFollower = ServiceStateCombiner(listOf(auditService, haService, filterService))
controlLinkSSLConfiguration = conf.bridgeInnerConfig?.customSSLConfiguration ?: conf.p2pSslOptions
floatListenerSSLConfiguration = conf.bridgeInnerConfig?.customFloatOuterSSLConfiguration ?: conf.p2pSslOptions
expectedCertificateSubject = conf.bridgeInnerConfig!!.expectedCertificateSubject
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe({
if (it) {
@ -171,14 +163,14 @@ class TunnelingBridgeReceiverService(val conf: FirewallConfiguration,
private fun onFloatMessage(receivedMessage: ReceivedMessage) {
if (!receivedMessage.checkTunnelDataTopic()) {
auditService.packetDropEvent(receivedMessage, "Invalid float inbound topic received ${receivedMessage.topic}!!")
auditService.packetDropEvent(receivedMessage, "Invalid float inbound topic received ${receivedMessage.topic}!!", RoutingDirection.INBOUND)
receivedMessage.complete(true)
return
}
val innerMessage = try {
receivedMessage.payload.deserialize<FloatDataPacket>()
} catch (ex: Exception) {
auditService.packetDropEvent(receivedMessage, "Unable to decode Float Control message")
auditService.packetDropEvent(receivedMessage, "Unable to decode Float Control message", RoutingDirection.INBOUND)
receivedMessage.complete(true)
return
}

View File

@ -4,30 +4,32 @@ import net.corda.bridge.services.api.*
import net.corda.bridge.services.util.ServiceStateCombiner
import net.corda.bridge.services.util.ServiceStateHelper
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.bridging.BridgeControlListener
import net.corda.nodeapi.internal.bridging.BridgeMetricsService
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
import org.apache.activemq.artemis.api.core.client.ClientMessage
import rx.Subscription
import java.net.InetSocketAddress
class DirectBridgeSenderService(val conf: FirewallConfiguration,
val maxMessageSize: Int,
val auditService: FirewallAuditService,
haService: BridgeMasterService,
val artemisConnectionService: BridgeArtemisConnectionService,
private val artemisConnectionService: BridgeArtemisConnectionService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeSenderService, ServiceStateSupport by stateHelper {
companion object {
val log = contextLogger()
private val log = contextLogger()
}
private val statusFollower: ServiceStateCombiner
private val statusFollower: ServiceStateCombiner = ServiceStateCombiner(listOf(auditService, artemisConnectionService, haService))
private var statusSubscriber: Subscription? = null
private var listenerActiveSubscriber: Subscription? = null
private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf.p2pSslOptions, conf.outboundConfig!!.socksProxyConfig, maxMessageSize, { ForwardingArtemisMessageClient(artemisConnectionService) })
init {
statusFollower = ServiceStateCombiner(listOf(auditService, artemisConnectionService, haService))
}
private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf.p2pSslOptions, conf.outboundConfig!!.socksProxyConfig, maxMessageSize, { ForwardingArtemisMessageClient(artemisConnectionService) },
BridgeAuditServiceAdaptor(auditService))
private class ForwardingArtemisMessageClient(val artemisConnectionService: BridgeArtemisConnectionService) : ArtemisSessionProvider {
override fun start(): ArtemisMessagingClient.Started {
@ -44,6 +46,36 @@ class DirectBridgeSenderService(val conf: FirewallConfiguration,
}
private class BridgeAuditServiceAdaptor(private val auditService: FirewallAuditService) : BridgeMetricsService {
override fun bridgeCreated(targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
// No corresponding method on FirewallAuditService yet
}
override fun bridgeConnected(targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
val firstHostPort = targets.first()
auditService.successfulConnectionEvent(InetSocketAddress(firstHostPort.host, firstHostPort.port),
legalNames.first().toString(), "BridgeConnected", RoutingDirection.OUTGOING)
}
override fun bridgeDisconnected(targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
// No corresponding method on FirewallAuditService yet
}
override fun bridgeDestroyed(targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
// No corresponding method on FirewallAuditService yet
}
override fun packetDropEvent(artemisMessage: ClientMessage, msg: String) {
// Too much of a hassle to translate `ClientMessage` into `ApplicationMessage?`, especially given that receiving side is likely
// to be doing counting only.
auditService.packetDropEvent(null, msg, RoutingDirection.OUTGOING)
}
override fun packetAcceptedEvent(sendableMessage: SendableMessage) {
auditService.packetAcceptedEvent(sendableMessage, RoutingDirection.OUTGOING)
}
}
override fun start() {
statusSubscriber = statusFollower.activeChange.subscribe({ ready ->
if (ready) {

View File

@ -5,4 +5,7 @@ artemisReconnectionIntervalMin = 5000
artemisReconnectionIntervalMax = 60000
politeShutdownPeriod = 1000
p2pConfirmationWindowSize = 1048576
crlCheckSoftFail = true
crlCheckSoftFail = true
auditServiceConfiguration : {
loggingIntervalSec = 60
}

View File

@ -1,5 +1,6 @@
package net.corda.bridge
import com.typesafe.config.ConfigException
import net.corda.bridge.services.api.FirewallMode
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.div
@ -119,5 +120,17 @@ class ConfigTest {
createAndLoadConfigFromResource(tempFolder.root.toPath() / "bad", badConfigResource)
}
assertEquals(60, config.auditServiceConfiguration.loggingIntervalSec)
}
@Test
fun `Load audit service config`() {
val configResource = "/net/corda/bridge/withaudit/firewall.conf"
val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource)
assertEquals(34, config.auditServiceConfiguration.loggingIntervalSec)
assertFailsWith<ConfigException.WrongType> {
createAndLoadConfigFromResource(tempFolder.root.toPath() / "err1", "/net/corda/bridge/withaudit/badconfig/badInterval.conf")
}
}
}

View File

@ -1,7 +1,8 @@
package net.corda.bridge.services
import net.corda.bridge.services.api.FirewallAuditService
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import net.corda.bridge.services.api.RoutingDirection
import net.corda.nodeapi.internal.protonwrapper.messages.ApplicationMessage
import rx.Observable
import rx.subjects.PublishSubject
import java.net.InetSocketAddress
@ -22,22 +23,22 @@ class TestAuditService() : FirewallAuditService, TestServiceBase() {
val onAuditEvent: Observable<AuditEvent>
get() = _onAuditEvent
override fun successfulConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress, certificateSubject: String, msg: String) {
override fun successfulConnectionEvent(address: InetSocketAddress, certificateSubject: String, msg: String, direction: RoutingDirection) {
++eventCount
_onAuditEvent.onNext(AuditEvent.SUCCESSFUL_CONNECTION)
}
override fun failedConnectionEvent(inbound: Boolean, sourceIP: InetSocketAddress?, certificateSubject: String?, msg: String) {
override fun failedConnectionEvent(address: InetSocketAddress, certificateSubject: String?, msg: String, direction: RoutingDirection) {
++eventCount
_onAuditEvent.onNext(AuditEvent.FAILED_CONNECTION)
}
override fun packetDropEvent(packet: ReceivedMessage?, msg: String) {
override fun packetDropEvent(packet: ApplicationMessage?, msg: String, direction: RoutingDirection) {
++eventCount
_onAuditEvent.onNext(AuditEvent.PACKET_DROP)
}
override fun packetAcceptedEvent(packet: ReceivedMessage) {
override fun packetAcceptedEvent(packet: ApplicationMessage, direction: RoutingDirection) {
++eventCount
_onAuditEvent.onNext(AuditEvent.PACKET_ACCEPT)
}

View File

@ -0,0 +1,157 @@
package net.corda.bridge.services.audit
import com.natpryce.hamkrest.assertion.assertThat
import com.natpryce.hamkrest.containsSubstring
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.whenever
import net.corda.bridge.services.api.AuditServiceConfiguration
import net.corda.bridge.services.api.FirewallConfiguration
import net.corda.bridge.services.api.RoutingDirection
import net.corda.nodeapi.internal.protonwrapper.messages.ApplicationMessage
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
import org.junit.After
import org.junit.Test
import java.net.InetSocketAddress
class LoggingFirewallAuditServiceTest {
private val instance : LoggingFirewallAuditService
init {
val auditServiceConfiguration = mock<AuditServiceConfiguration>()
val firewallConfiguration = mock<FirewallConfiguration>()
whenever(firewallConfiguration.auditServiceConfiguration).then { auditServiceConfiguration }
whenever(auditServiceConfiguration.loggingIntervalSec).then { 50L }
instance = LoggingFirewallAuditService(firewallConfiguration)
}
@After
fun tearDown() {
instance.stop()
}
@Test
fun testStatsOutput() {
fun Int.toDirection() : RoutingDirection {
return if (this % 3 == 0) {
RoutingDirection.INBOUND
} else {
RoutingDirection.OUTGOING
}
}
val byteArray = ByteArray(20)
fun Int.createMessage(direction : RoutingDirection) : ApplicationMessage {
val partyName = "Party" + ((this % 4) + 1)
return when(direction) {
RoutingDirection.INBOUND -> {
val msg = mock<ReceivedMessage>()
whenever(msg.payload).then { byteArray }
whenever(msg.sourceLegalName).then { partyName }
msg
}
RoutingDirection.OUTGOING -> {
val msg = mock<SendableMessage>()
whenever(msg.payload).then { byteArray }
whenever(msg.destinationLegalName).then { partyName }
msg
}
}
}
fun Int.toAddress(): InetSocketAddress {
val hostname = "Server" + ((this % 5) + 1)
return InetSocketAddress(hostname, 10001)
}
val failedConnCount = 7
(1..failedConnCount).forEach { instance.failedConnectionEvent(it.toAddress(), null, "test", it.toDirection()) }
val succConnCount = 40
(1..succConnCount).forEach { instance.successfulConnectionEvent(it.toAddress(), "test2", "test", it.toDirection()) }
val packetDropCount = 20
// Multi-threaded operation to make sure the state data structure is sound in that regard
(1..packetDropCount).toList().parallelStream().forEach { val direction = it.toDirection()
instance.packetDropEvent(it.createMessage(direction), "test3", direction) }
val packetAcceptedCount = 9000
// Multi-threaded operation to make sure the state data structure is sound in that regard
(1..packetAcceptedCount).toList().parallelStream().forEach {
val direction = it.toDirection()
instance.packetAcceptedEvent(it.createMessage(direction), direction)
}
val statsStr = instance.prepareStatsAndReset()
assertThat(statsStr, containsSubstring("Successful connection count: 13(inbound), 27(outgoing)"))
assertThat(statsStr, containsSubstring("Failed connection count: 2(inbound), 5(outgoing)"))
assertThat(statsStr, containsSubstring("Packets accepted count: 3,000(inbound), 6,000(outgoing)"))
assertThat(statsStr, containsSubstring("Bytes transmitted: 60,000(inbound), 120,000(outgoing)"))
assertThat(statsStr, containsSubstring("Packets dropped count: 6(inbound), 14(outgoing)"))
assertThat(statsStr, containsSubstring("Failed connections out:"))
assertThat(statsStr, containsSubstring("Server5:10001 -> 1"))
// Ensure reset stats
val statsStr2 = instance.prepareStatsAndReset()
assertThat(statsStr2, containsSubstring("Successful connection count: 0"))
assertThat(statsStr2, containsSubstring("Packets dropped count: 0(inbound), 0(outgoing)"))
}
}
/*
During last PT1M stats were as follows:
Load average: N/A
Memory:
Free: 230 MB
Total: 702 MB
Max: 7,243 MB
Traffic totals:
Successful connection count: 13(inbound), 27(outgoing)
Failed connection count: 2(inbound), 5(outgoing)
Packets accepted count: 3,000(inbound), 6,000(outgoing)
Bytes transmitted: 60,000(inbound), 120,000(outgoing)
Packets dropped count: 6(inbound), 14(outgoing)
Traffic breakdown:
Successful connections in:
Server5:10001 -> 3
Server4:10001 -> 3
Server3:10001 -> 2
Server2:10001 -> 3
Server1:10001 -> 2
Successful connections out:
Server5:10001 -> 5
Server4:10001 -> 5
Server3:10001 -> 6
Server2:10001 -> 5
Server1:10001 -> 6
Failed connections in:
Server4:10001 -> 1
Server2:10001 -> 1
Failed connections out:
Server5:10001 -> 1
Server3:10001 -> 2
Server2:10001 -> 1
Server1:10001 -> 1
Accepted packets in:
Party1 -> 750
Party2 -> 750
Party3 -> 750
Party4 -> 750
Accepted packets out:
Party1 -> 1,500
Party2 -> 1,500
Party3 -> 1,500
Party4 -> 1,500
Dropped packets in:
Party1 -> 1
Party2 -> 1
Party3 -> 2
Party4 -> 2
Dropped packets out:
Party1 -> 4
Party2 -> 4
Party3 -> 3
Party4 -> 3
*/

View File

@ -0,0 +1,18 @@
firewallMode = SenderReceiver
outboundConfig : {
artemisBrokerAddress = "localhost:11005"
socksProxyConfig : {
version = SOCKS5
proxyAddress = "localhost:12345"
userName = "proxyUser"
password = "pwd"
}
}
inboundConfig : {
listeningAddress = "0.0.0.0:10005"
}
networkParametersPath = network-parameters
auditServiceConfiguration : {
loggingIntervalSec = gibberish
}

View File

@ -0,0 +1,18 @@
firewallMode = SenderReceiver
outboundConfig : {
artemisBrokerAddress = "localhost:11005"
socksProxyConfig : {
version = SOCKS5
proxyAddress = "localhost:12345"
userName = "proxyUser"
password = "pwd"
}
}
inboundConfig : {
listeningAddress = "0.0.0.0:10005"
}
networkParametersPath = network-parameters
auditServiceConfiguration : {
loggingIntervalSec = 34
}

View File

@ -201,6 +201,10 @@ absolute path to the firewall's base directory.
sets of ``bridges`` (e.g. in test environments).
The default value is ``bridge/ha`` and would not normally need to be changed if the cluster is not shared.
:auditServiceConfiguration: Both ``FloatOuter`` and ``BridgeInner`` components have an audit service which is currently outputting into the process log some traffic statistics.
:loggingIntervalSec: This is an integer value which controls how frequently, in seconds, statistics will be written into the logs.
:artemisReconnectionIntervalMin: If connection to the local Artemis server fails the initial reconnection attempt will be
after [artemisReconnectionIntervalMin] ms. The default interval is 5000 ms.
Subsequent retries will take be exponentially backed off until they reach [artemisReconnectionIntervalMax] ms.

View File

@ -4,6 +4,7 @@ import net.corda.bridge.FirewallVersionInfo
import net.corda.bridge.internal.FirewallInstance
import net.corda.bridge.services.api.FirewallConfiguration
import net.corda.bridge.services.api.FirewallMode
import net.corda.bridge.services.config.AuditServiceConfigurationImpl
import net.corda.bridge.services.config.BridgeInboundConfigurationImpl
import net.corda.bridge.services.config.BridgeOutboundConfigurationImpl
import net.corda.bridge.services.config.FirewallConfigurationImpl
@ -201,7 +202,8 @@ data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : Intern
val bridgeConfig = FirewallConfigurationImpl(baseDirectory = baseDirectory, crlCheckSoftFail = true,
bridgeInnerConfig = null, keyStorePassword = "pass", trustStorePassword = "pass", firewallMode = FirewallMode.SenderReceiver,
networkParametersPath = baseDirectory, outboundConfig = BridgeOutboundConfigurationImpl(nodeConfig.messagingServerAddress!!, listOf(), null, null),
inboundConfig = BridgeInboundConfigurationImpl(bridgeListeningAddress, null), enableAMQPPacketTrace = false, floatOuterConfig = null, haConfig = null)
inboundConfig = BridgeInboundConfigurationImpl(bridgeListeningAddress, null), enableAMQPPacketTrace = false, floatOuterConfig = null, haConfig = null,
auditServiceConfiguration = AuditServiceConfigurationImpl(120))
baseDirectory.createDirectories()
// Write config (for reference)