Wire up the maxMessageSize work in the bridge.

This commit is contained in:
Matthew Nesbit 2018-05-17 15:51:42 +01:00
parent a9ff1ac262
commit 2b39e63a3f
10 changed files with 62 additions and 27 deletions

View File

@ -49,11 +49,11 @@ class AMQPListenerTest {
@Test
fun `Basic AMPQListenerService lifecycle test`() {
val configResource = "/net/corda/bridge/singleprocess/bridge.conf"
createNetworkParams(tempFolder.root.toPath())
val maxMessageSize = createNetworkParams(tempFolder.root.toPath())
val bridgeConfig = createAndLoadConfigFromResource(tempFolder.root.toPath() / "listener", configResource)
bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val auditService = TestAuditService()
val amqpListenerService = BridgeAMQPListenerServiceImpl(bridgeConfig, auditService)
val amqpListenerService = BridgeAMQPListenerServiceImpl(bridgeConfig, maxMessageSize, auditService)
val stateFollower = amqpListenerService.activeChange.toBlocking().iterator
val connectionFollower = amqpListenerService.onConnection.toBlocking().iterator
val auditFollower = auditService.onAuditEvent.toBlocking().iterator
@ -78,7 +78,7 @@ class AMQPListenerTest {
// Fire lots of activity to prove we are good
assertEquals(TestAuditService.AuditEvent.STATUS_CHANGE, auditFollower.next())
assertEquals(true, amqpListenerService.active)
// Definitely a socket tehre
// Definitely a socket there
assertEquals(true, serverListening("localhost", 10005))
// But not a valid SSL link
assertEquals(false, connectionFollower.next().connected)
@ -95,7 +95,8 @@ class AMQPListenerTest {
clientKeyStore,
clientConfig.keyStorePassword,
clientTrustStore,
true)
true,
maxMessageSize = maxMessageSize)
amqpClient.start()
// Should see events to show we got a valid connection
@ -134,11 +135,11 @@ class AMQPListenerTest {
@Test
fun `Bad certificate audit check`() {
val configResource = "/net/corda/bridge/singleprocess/bridge.conf"
createNetworkParams(tempFolder.root.toPath())
val maxMessageSize = createNetworkParams(tempFolder.root.toPath())
val bridgeConfig = createAndLoadConfigFromResource(tempFolder.root.toPath() / "listener", configResource)
bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val auditService = TestAuditService()
val amqpListenerService = BridgeAMQPListenerServiceImpl(bridgeConfig, auditService)
val amqpListenerService = BridgeAMQPListenerServiceImpl(bridgeConfig, maxMessageSize, auditService)
amqpListenerService.start()
auditService.start()
val keyStoreBytes = bridgeConfig.sslKeystore.readAll()
@ -165,7 +166,8 @@ class AMQPListenerTest {
clientKeyStore.internal,
"password",
clientTrustStore.internal,
true)
true,
maxMessageSize = maxMessageSize)
amqpClient.start()
val connectionEvent = connectionFollower.next()
assertEquals(false, connectionEvent.connected)

View File

@ -71,13 +71,13 @@ class TunnelControlTest {
val bridgeConfigResource = "/net/corda/bridge/withfloat/bridge/bridge.conf"
val bridgePath = tempFolder.root.toPath() / "bridge"
bridgePath.createDirectories()
createNetworkParams(bridgePath)
val maxMessageSize = createNetworkParams(bridgePath)
val bridgeConfig = createAndLoadConfigFromResource(bridgePath, bridgeConfigResource)
bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val bridgeAuditService = TestAuditService()
val haService = SingleInstanceMasterService(bridgeConfig, bridgeAuditService)
val filterService = createPartialMock<TestIncomingMessageFilterService>()
val bridgeProxiedReceiverService = TunnelingBridgeReceiverService(bridgeConfig, bridgeAuditService, haService, filterService)
val bridgeProxiedReceiverService = TunnelingBridgeReceiverService(bridgeConfig, maxMessageSize, bridgeAuditService, haService, filterService)
val bridgeStateFollower = bridgeProxiedReceiverService.activeChange.toBlocking().iterator
bridgeProxiedReceiverService.start()
assertEquals(false, bridgeStateFollower.next())
@ -101,7 +101,7 @@ class TunnelControlTest {
doReturn(Observable.never<ConnectionChange>()).whenever(it).onConnection
doReturn(Observable.never<ReceivedMessage>()).whenever(it).onReceive
}
val floatControlListener = FloatControlListenerService(floatConfig, floatAuditService, amqpListenerService)
val floatControlListener = FloatControlListenerService(floatConfig, maxMessageSize, floatAuditService, amqpListenerService)
val floatStateFollower = floatControlListener.activeChange.toBlocking().iterator
assertEquals(false, floatStateFollower.next())
assertEquals(false, floatControlListener.active)
@ -149,7 +149,7 @@ class TunnelControlTest {
val bridgeConfigResource = "/net/corda/bridge/withfloat/bridge/bridge.conf"
val bridgePath = tempFolder.root.toPath() / "bridge"
bridgePath.createDirectories()
createNetworkParams(bridgePath)
val maxMessageSize = createNetworkParams(bridgePath)
val bridgeConfig = createAndLoadConfigFromResource(bridgePath, bridgeConfigResource)
bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME)
val bridgeAuditService = TestAuditService()
@ -162,7 +162,7 @@ class TunnelControlTest {
Unit
}.whenever(it).sendMessageToLocalBroker(any())
}
val bridgeProxiedReceiverService = TunnelingBridgeReceiverService(bridgeConfig, bridgeAuditService, haService, filterService)
val bridgeProxiedReceiverService = TunnelingBridgeReceiverService(bridgeConfig, maxMessageSize, bridgeAuditService, haService, filterService)
val bridgeStateFollower = bridgeProxiedReceiverService.activeChange.toBlocking().iterator
bridgeProxiedReceiverService.start()
bridgeAuditService.start()
@ -183,7 +183,7 @@ class TunnelControlTest {
doReturn(Observable.never<ConnectionChange>()).whenever(it).onConnection
doReturn(receiveObserver).whenever(it).onReceive
}
val floatControlListener = FloatControlListenerService(floatConfig, floatAuditService, amqpListenerService)
val floatControlListener = FloatControlListenerService(floatConfig, maxMessageSize, floatAuditService, amqpListenerService)
floatControlListener.start()
floatAuditService.start()
amqpListenerService.start()

View File

@ -80,7 +80,7 @@ class SimpleMessageFilterService(val conf: BridgeConfiguration,
require(inboundMessage.payload.size > 0) { "No valid payload" }
val validInboxTopic = bridgeSenderService.validateReceiveTopic(inboundMessage.topic, sourceLegalName)
require(validInboxTopic) { "Topic not a legitimate Inbox for a node on this Artemis Broker ${inboundMessage.topic}" }
require(inboundMessage.applicationProperties.keys.all { it!!.toString() in whiteListedAMQPHeaders }) { "Disallowed header present in ${inboundMessage.applicationProperties.keys.map { it.toString() }}" }
require(inboundMessage.applicationProperties.keys.all { it in whiteListedAMQPHeaders }) { "Disallowed header present in ${inboundMessage.applicationProperties.keys}" }
}
override fun sendMessageToLocalBroker(inboundMessage: ReceivedMessage) {

View File

@ -31,6 +31,7 @@ import java.security.KeyStore
import java.util.*
class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration,
val maxMessageSize: Int,
val auditService: BridgeAuditService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeAMQPListenerService, ServiceStateSupport by stateHelper {
companion object {
@ -60,7 +61,16 @@ class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration,
val keyStore = loadKeyStoreAndWipeKeys(keyStoreBytes, keyStorePassword)
val trustStore = loadKeyStoreAndWipeKeys(trustStoreBytes, trustStorePassword)
val bindAddress = conf.inboundConfig!!.listeningAddress
val server = AMQPServer(bindAddress.host, bindAddress.port, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, conf.crlCheckSoftFail, conf.enableAMQPPacketTrace)
val server = AMQPServer(bindAddress.host,
bindAddress.port,
PEER_USER,
PEER_USER,
keyStore,
keyStorePrivateKeyPassword,
trustStore,
conf.crlCheckSoftFail,
conf.enableAMQPPacketTrace,
maxMessageSize)
onConnectSubscription = server.onConnection.subscribe(_onConnection)
onConnectAuditSubscription = server.onConnection.subscribe {
if (it.connected) {

View File

@ -33,6 +33,7 @@ import kotlin.concurrent.withLock
class FloatControlListenerService(val conf: BridgeConfiguration,
val maxMessageSize: Int,
val auditService: BridgeAuditService,
val amqpListener: BridgeAMQPListenerService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : FloatControlService, ServiceStateSupport by stateHelper {
@ -82,7 +83,16 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
private fun startControlListener() {
lock.withLock {
val controlServer = AMQPServer(floatControlAddress.host, floatControlAddress.port, null, null, keyStore, keyStorePrivateKeyPassword, trustStore, conf.crlCheckSoftFail, conf.enableAMQPPacketTrace)
val controlServer = AMQPServer(floatControlAddress.host,
floatControlAddress.port,
null,
null,
keyStore,
keyStorePrivateKeyPassword,
trustStore,
conf.crlCheckSoftFail,
conf.enableAMQPPacketTrace,
maxMessageSize)
connectSubscriber = controlServer.onConnection.subscribe { onConnectToControl(it) }
receiveSubscriber = controlServer.onReceive.subscribe { onControlMessage(it) }
amqpControlServer = controlServer
@ -214,7 +224,7 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
message.complete(true) // consume message so it isn't resent forever
return
}
val appProperties = message.applicationProperties.map { Pair(it.key!!.toString(), it.value) }.toList()
val appProperties = message.applicationProperties.map { Pair(it.key, it.value) }.toList()
try {
val wrappedMessage = FloatDataPacket(message.topic,
appProperties,

View File

@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
val maxMessageSize: Int,
val auditService: BridgeAuditService,
haService: BridgeMasterService,
val filterService: IncomingMessageFilterService,
@ -72,7 +73,16 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
statusSubscriber = statusFollower.activeChange.subscribe {
if (it) {
val floatAddresses = conf.bridgeInnerConfig!!.floatAddresses
val controlClient = AMQPClient(floatAddresses, setOf(expectedCertificateSubject), null, null, controlLinkKeyStore, controLinkKeyStorePrivateKeyPassword, controlLinkTrustStore, conf.crlCheckSoftFail, conf.enableAMQPPacketTrace)
val controlClient = AMQPClient(floatAddresses,
setOf(expectedCertificateSubject),
null,
null,
controlLinkKeyStore,
controLinkKeyStorePrivateKeyPassword,
controlLinkTrustStore,
conf.crlCheckSoftFail,
conf.enableAMQPPacketTrace,
maxMessageSize = maxMessageSize)
connectSubscriber = controlClient.onConnection.subscribe { onConnectToControl(it) }
receiveSubscriber = controlClient.onReceive.subscribe { onFloatMessage(it) }
amqpControlClient = controlClient

View File

@ -21,8 +21,9 @@ import net.corda.nodeapi.internal.bridging.BridgeControlListener
import rx.Subscription
class DirectBridgeSenderService(val conf: BridgeConfiguration,
val maxMessageSize: Int,
val auditService: BridgeAuditService,
val haService: BridgeMasterService,
haService: BridgeMasterService,
val artemisConnectionService: BridgeArtemisConnectionService,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeSenderService, ServiceStateSupport by stateHelper {
companion object {
@ -33,7 +34,7 @@ class DirectBridgeSenderService(val conf: BridgeConfiguration,
private var statusSubscriber: Subscription? = null
private var connectionSubscriber: Subscription? = null
private var listenerActiveSubscriber: Subscription? = null
private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf, conf.outboundConfig!!.socksProxyConfig, { ForwardingArtemisMessageClient(artemisConnectionService) })
private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf, conf.outboundConfig!!.socksProxyConfig, maxMessageSize, { ForwardingArtemisMessageClient(artemisConnectionService) })
init {
statusFollower = ServiceStateCombiner(listOf(auditService, artemisConnectionService, haService))

View File

@ -49,13 +49,13 @@ class BridgeSupervisorServiceImpl(val conf: BridgeConfiguration,
haService = ExternalMasterElectionService(conf, auditService)
}
artemisService = BridgeArtemisConnectionServiceImpl(conf, maxMessageSize, auditService)
senderService = DirectBridgeSenderService(conf, auditService, haService, artemisService)
senderService = DirectBridgeSenderService(conf, maxMessageSize, auditService, haService, artemisService)
filterService = SimpleMessageFilterService(conf, auditService, artemisService, senderService)
receiverService = if (conf.bridgeMode == BridgeMode.SenderReceiver) {
InProcessBridgeReceiverService(conf, auditService, haService, inProcessAMQPListenerService!!, filterService)
} else {
require(inProcessAMQPListenerService == null) { "Should not have an in process instance of the AMQPListenerService" }
TunnelingBridgeReceiverService(conf, auditService, haService, filterService)
TunnelingBridgeReceiverService(conf, maxMessageSize, auditService, haService, filterService)
}
statusFollower = ServiceStateCombiner(listOf(haService, senderService, receiverService, filterService))
activeChange.subscribe {

View File

@ -34,10 +34,10 @@ class FloatSupervisorServiceImpl(val conf: BridgeConfiguration,
private var statusSubscriber: Subscription? = null
init {
amqpListenerService = BridgeAMQPListenerServiceImpl(conf, auditService)
amqpListenerService = BridgeAMQPListenerServiceImpl(conf, maxMessageSize, auditService)
floatControlService = if (conf.bridgeMode == BridgeMode.FloatOuter) {
require(conf.haConfig == null) { "Float process should not have HA config, that is controlled via the bridge." }
FloatControlListenerService(conf, auditService, amqpListenerService)
FloatControlListenerService(conf, maxMessageSize, auditService, amqpListenerService)
} else {
null
}

View File

@ -33,10 +33,10 @@ import java.nio.file.Path
import java.security.cert.X509Certificate
import java.time.Instant
fun createNetworkParams(baseDirectory: Path) {
fun createNetworkParams(baseDirectory: Path): Int {
val dummyNotaryParty = TestIdentity(DUMMY_NOTARY_NAME)
val notaryInfo = NotaryInfo(dummyNotaryParty.party, false)
val copier = NetworkParametersCopier(NetworkParameters(
val networkParameters = NetworkParameters(
minimumPlatformVersion = 1,
notaries = listOf(notaryInfo),
modifiedTime = Instant.now(),
@ -44,8 +44,10 @@ fun createNetworkParams(baseDirectory: Path) {
maxTransactionSize = 40000,
epoch = 1,
whitelistedContractImplementations = emptyMap<String, List<AttachmentId>>()
), overwriteFile = true)
)
val copier = NetworkParametersCopier(networkParameters, overwriteFile = true)
copier.install(baseDirectory)
return networkParameters.maxMessageSize
}