diff --git a/bridge/src/integration-test/kotlin/net/corda/bridge/services/AMQPListenerTest.kt b/bridge/src/integration-test/kotlin/net/corda/bridge/services/AMQPListenerTest.kt index 46b0197eec..a682be54d8 100644 --- a/bridge/src/integration-test/kotlin/net/corda/bridge/services/AMQPListenerTest.kt +++ b/bridge/src/integration-test/kotlin/net/corda/bridge/services/AMQPListenerTest.kt @@ -49,11 +49,11 @@ class AMQPListenerTest { @Test fun `Basic AMPQListenerService lifecycle test`() { val configResource = "/net/corda/bridge/singleprocess/bridge.conf" - createNetworkParams(tempFolder.root.toPath()) + val maxMessageSize = createNetworkParams(tempFolder.root.toPath()) val bridgeConfig = createAndLoadConfigFromResource(tempFolder.root.toPath() / "listener", configResource) bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME) val auditService = TestAuditService() - val amqpListenerService = BridgeAMQPListenerServiceImpl(bridgeConfig, auditService) + val amqpListenerService = BridgeAMQPListenerServiceImpl(bridgeConfig, maxMessageSize, auditService) val stateFollower = amqpListenerService.activeChange.toBlocking().iterator val connectionFollower = amqpListenerService.onConnection.toBlocking().iterator val auditFollower = auditService.onAuditEvent.toBlocking().iterator @@ -78,7 +78,7 @@ class AMQPListenerTest { // Fire lots of activity to prove we are good assertEquals(TestAuditService.AuditEvent.STATUS_CHANGE, auditFollower.next()) assertEquals(true, amqpListenerService.active) - // Definitely a socket tehre + // Definitely a socket there assertEquals(true, serverListening("localhost", 10005)) // But not a valid SSL link assertEquals(false, connectionFollower.next().connected) @@ -95,7 +95,8 @@ class AMQPListenerTest { clientKeyStore, clientConfig.keyStorePassword, clientTrustStore, - true) + true, + maxMessageSize = maxMessageSize) amqpClient.start() // Should see events to show we got a valid connection @@ -134,11 +135,11 @@ class AMQPListenerTest { @Test fun `Bad certificate audit check`() { val configResource = "/net/corda/bridge/singleprocess/bridge.conf" - createNetworkParams(tempFolder.root.toPath()) + val maxMessageSize = createNetworkParams(tempFolder.root.toPath()) val bridgeConfig = createAndLoadConfigFromResource(tempFolder.root.toPath() / "listener", configResource) bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME) val auditService = TestAuditService() - val amqpListenerService = BridgeAMQPListenerServiceImpl(bridgeConfig, auditService) + val amqpListenerService = BridgeAMQPListenerServiceImpl(bridgeConfig, maxMessageSize, auditService) amqpListenerService.start() auditService.start() val keyStoreBytes = bridgeConfig.sslKeystore.readAll() @@ -165,7 +166,8 @@ class AMQPListenerTest { clientKeyStore.internal, "password", clientTrustStore.internal, - true) + true, + maxMessageSize = maxMessageSize) amqpClient.start() val connectionEvent = connectionFollower.next() assertEquals(false, connectionEvent.connected) diff --git a/bridge/src/integration-test/kotlin/net/corda/bridge/services/TunnelControlTest.kt b/bridge/src/integration-test/kotlin/net/corda/bridge/services/TunnelControlTest.kt index 3a10554463..60e497adab 100644 --- a/bridge/src/integration-test/kotlin/net/corda/bridge/services/TunnelControlTest.kt +++ b/bridge/src/integration-test/kotlin/net/corda/bridge/services/TunnelControlTest.kt @@ -71,13 +71,13 @@ class TunnelControlTest { val bridgeConfigResource = "/net/corda/bridge/withfloat/bridge/bridge.conf" val bridgePath = tempFolder.root.toPath() / "bridge" bridgePath.createDirectories() - createNetworkParams(bridgePath) + val maxMessageSize = createNetworkParams(bridgePath) val bridgeConfig = createAndLoadConfigFromResource(bridgePath, bridgeConfigResource) bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME) val bridgeAuditService = TestAuditService() val haService = SingleInstanceMasterService(bridgeConfig, bridgeAuditService) val filterService = createPartialMock() - val bridgeProxiedReceiverService = TunnelingBridgeReceiverService(bridgeConfig, bridgeAuditService, haService, filterService) + val bridgeProxiedReceiverService = TunnelingBridgeReceiverService(bridgeConfig, maxMessageSize, bridgeAuditService, haService, filterService) val bridgeStateFollower = bridgeProxiedReceiverService.activeChange.toBlocking().iterator bridgeProxiedReceiverService.start() assertEquals(false, bridgeStateFollower.next()) @@ -101,7 +101,7 @@ class TunnelControlTest { doReturn(Observable.never()).whenever(it).onConnection doReturn(Observable.never()).whenever(it).onReceive } - val floatControlListener = FloatControlListenerService(floatConfig, floatAuditService, amqpListenerService) + val floatControlListener = FloatControlListenerService(floatConfig, maxMessageSize, floatAuditService, amqpListenerService) val floatStateFollower = floatControlListener.activeChange.toBlocking().iterator assertEquals(false, floatStateFollower.next()) assertEquals(false, floatControlListener.active) @@ -149,7 +149,7 @@ class TunnelControlTest { val bridgeConfigResource = "/net/corda/bridge/withfloat/bridge/bridge.conf" val bridgePath = tempFolder.root.toPath() / "bridge" bridgePath.createDirectories() - createNetworkParams(bridgePath) + val maxMessageSize = createNetworkParams(bridgePath) val bridgeConfig = createAndLoadConfigFromResource(bridgePath, bridgeConfigResource) bridgeConfig.createBridgeKeyStores(DUMMY_BANK_A_NAME) val bridgeAuditService = TestAuditService() @@ -162,7 +162,7 @@ class TunnelControlTest { Unit }.whenever(it).sendMessageToLocalBroker(any()) } - val bridgeProxiedReceiverService = TunnelingBridgeReceiverService(bridgeConfig, bridgeAuditService, haService, filterService) + val bridgeProxiedReceiverService = TunnelingBridgeReceiverService(bridgeConfig, maxMessageSize, bridgeAuditService, haService, filterService) val bridgeStateFollower = bridgeProxiedReceiverService.activeChange.toBlocking().iterator bridgeProxiedReceiverService.start() bridgeAuditService.start() @@ -183,7 +183,7 @@ class TunnelControlTest { doReturn(Observable.never()).whenever(it).onConnection doReturn(receiveObserver).whenever(it).onReceive } - val floatControlListener = FloatControlListenerService(floatConfig, floatAuditService, amqpListenerService) + val floatControlListener = FloatControlListenerService(floatConfig, maxMessageSize, floatAuditService, amqpListenerService) floatControlListener.start() floatAuditService.start() amqpListenerService.start() diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/filter/SimpleMessageFilterService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/filter/SimpleMessageFilterService.kt index 4a8a08cf85..7edf17e8b3 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/filter/SimpleMessageFilterService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/filter/SimpleMessageFilterService.kt @@ -80,7 +80,7 @@ class SimpleMessageFilterService(val conf: BridgeConfiguration, require(inboundMessage.payload.size > 0) { "No valid payload" } val validInboxTopic = bridgeSenderService.validateReceiveTopic(inboundMessage.topic, sourceLegalName) require(validInboxTopic) { "Topic not a legitimate Inbox for a node on this Artemis Broker ${inboundMessage.topic}" } - require(inboundMessage.applicationProperties.keys.all { it!!.toString() in whiteListedAMQPHeaders }) { "Disallowed header present in ${inboundMessage.applicationProperties.keys.map { it.toString() }}" } + require(inboundMessage.applicationProperties.keys.all { it in whiteListedAMQPHeaders }) { "Disallowed header present in ${inboundMessage.applicationProperties.keys}" } } override fun sendMessageToLocalBroker(inboundMessage: ReceivedMessage) { diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt index cb45877cf9..376a506459 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt @@ -31,6 +31,7 @@ import java.security.KeyStore import java.util.* class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration, + val maxMessageSize: Int, val auditService: BridgeAuditService, private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeAMQPListenerService, ServiceStateSupport by stateHelper { companion object { @@ -60,7 +61,16 @@ class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration, val keyStore = loadKeyStoreAndWipeKeys(keyStoreBytes, keyStorePassword) val trustStore = loadKeyStoreAndWipeKeys(trustStoreBytes, trustStorePassword) val bindAddress = conf.inboundConfig!!.listeningAddress - val server = AMQPServer(bindAddress.host, bindAddress.port, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, conf.crlCheckSoftFail, conf.enableAMQPPacketTrace) + val server = AMQPServer(bindAddress.host, + bindAddress.port, + PEER_USER, + PEER_USER, + keyStore, + keyStorePrivateKeyPassword, + trustStore, + conf.crlCheckSoftFail, + conf.enableAMQPPacketTrace, + maxMessageSize) onConnectSubscription = server.onConnection.subscribe(_onConnection) onConnectAuditSubscription = server.onConnection.subscribe { if (it.connected) { diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt index 31cc594fed..d8f5851474 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt @@ -33,6 +33,7 @@ import kotlin.concurrent.withLock class FloatControlListenerService(val conf: BridgeConfiguration, + val maxMessageSize: Int, val auditService: BridgeAuditService, val amqpListener: BridgeAMQPListenerService, private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : FloatControlService, ServiceStateSupport by stateHelper { @@ -82,7 +83,16 @@ class FloatControlListenerService(val conf: BridgeConfiguration, private fun startControlListener() { lock.withLock { - val controlServer = AMQPServer(floatControlAddress.host, floatControlAddress.port, null, null, keyStore, keyStorePrivateKeyPassword, trustStore, conf.crlCheckSoftFail, conf.enableAMQPPacketTrace) + val controlServer = AMQPServer(floatControlAddress.host, + floatControlAddress.port, + null, + null, + keyStore, + keyStorePrivateKeyPassword, + trustStore, + conf.crlCheckSoftFail, + conf.enableAMQPPacketTrace, + maxMessageSize) connectSubscriber = controlServer.onConnection.subscribe { onConnectToControl(it) } receiveSubscriber = controlServer.onReceive.subscribe { onControlMessage(it) } amqpControlServer = controlServer @@ -214,7 +224,7 @@ class FloatControlListenerService(val conf: BridgeConfiguration, message.complete(true) // consume message so it isn't resent forever return } - val appProperties = message.applicationProperties.map { Pair(it.key!!.toString(), it.value) }.toList() + val appProperties = message.applicationProperties.map { Pair(it.key, it.value) }.toList() try { val wrappedMessage = FloatDataPacket(message.topic, appProperties, diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt index 8a53c67223..b68eb59baa 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException class TunnelingBridgeReceiverService(val conf: BridgeConfiguration, + val maxMessageSize: Int, val auditService: BridgeAuditService, haService: BridgeMasterService, val filterService: IncomingMessageFilterService, @@ -72,7 +73,16 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration, statusSubscriber = statusFollower.activeChange.subscribe { if (it) { val floatAddresses = conf.bridgeInnerConfig!!.floatAddresses - val controlClient = AMQPClient(floatAddresses, setOf(expectedCertificateSubject), null, null, controlLinkKeyStore, controLinkKeyStorePrivateKeyPassword, controlLinkTrustStore, conf.crlCheckSoftFail, conf.enableAMQPPacketTrace) + val controlClient = AMQPClient(floatAddresses, + setOf(expectedCertificateSubject), + null, + null, + controlLinkKeyStore, + controLinkKeyStorePrivateKeyPassword, + controlLinkTrustStore, + conf.crlCheckSoftFail, + conf.enableAMQPPacketTrace, + maxMessageSize = maxMessageSize) connectSubscriber = controlClient.onConnection.subscribe { onConnectToControl(it) } receiveSubscriber = controlClient.onReceive.subscribe { onFloatMessage(it) } amqpControlClient = controlClient diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt index f0b500290f..f247e2a788 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt @@ -21,8 +21,9 @@ import net.corda.nodeapi.internal.bridging.BridgeControlListener import rx.Subscription class DirectBridgeSenderService(val conf: BridgeConfiguration, + val maxMessageSize: Int, val auditService: BridgeAuditService, - val haService: BridgeMasterService, + haService: BridgeMasterService, val artemisConnectionService: BridgeArtemisConnectionService, private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeSenderService, ServiceStateSupport by stateHelper { companion object { @@ -33,7 +34,7 @@ class DirectBridgeSenderService(val conf: BridgeConfiguration, private var statusSubscriber: Subscription? = null private var connectionSubscriber: Subscription? = null private var listenerActiveSubscriber: Subscription? = null - private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf, conf.outboundConfig!!.socksProxyConfig, { ForwardingArtemisMessageClient(artemisConnectionService) }) + private var bridgeControlListener: BridgeControlListener = BridgeControlListener(conf, conf.outboundConfig!!.socksProxyConfig, maxMessageSize, { ForwardingArtemisMessageClient(artemisConnectionService) }) init { statusFollower = ServiceStateCombiner(listOf(auditService, artemisConnectionService, haService)) diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/BridgeSupervisorServiceImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/BridgeSupervisorServiceImpl.kt index 5d1820dbac..5d7edd2394 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/BridgeSupervisorServiceImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/BridgeSupervisorServiceImpl.kt @@ -49,13 +49,13 @@ class BridgeSupervisorServiceImpl(val conf: BridgeConfiguration, haService = ExternalMasterElectionService(conf, auditService) } artemisService = BridgeArtemisConnectionServiceImpl(conf, maxMessageSize, auditService) - senderService = DirectBridgeSenderService(conf, auditService, haService, artemisService) + senderService = DirectBridgeSenderService(conf, maxMessageSize, auditService, haService, artemisService) filterService = SimpleMessageFilterService(conf, auditService, artemisService, senderService) receiverService = if (conf.bridgeMode == BridgeMode.SenderReceiver) { InProcessBridgeReceiverService(conf, auditService, haService, inProcessAMQPListenerService!!, filterService) } else { require(inProcessAMQPListenerService == null) { "Should not have an in process instance of the AMQPListenerService" } - TunnelingBridgeReceiverService(conf, auditService, haService, filterService) + TunnelingBridgeReceiverService(conf, maxMessageSize, auditService, haService, filterService) } statusFollower = ServiceStateCombiner(listOf(haService, senderService, receiverService, filterService)) activeChange.subscribe { diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/FloatSupervisorServiceImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/FloatSupervisorServiceImpl.kt index 2a7e9b309b..6300e0aa91 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/FloatSupervisorServiceImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/supervisors/FloatSupervisorServiceImpl.kt @@ -34,10 +34,10 @@ class FloatSupervisorServiceImpl(val conf: BridgeConfiguration, private var statusSubscriber: Subscription? = null init { - amqpListenerService = BridgeAMQPListenerServiceImpl(conf, auditService) + amqpListenerService = BridgeAMQPListenerServiceImpl(conf, maxMessageSize, auditService) floatControlService = if (conf.bridgeMode == BridgeMode.FloatOuter) { require(conf.haConfig == null) { "Float process should not have HA config, that is controlled via the bridge." } - FloatControlListenerService(conf, auditService, amqpListenerService) + FloatControlListenerService(conf, maxMessageSize, auditService, amqpListenerService) } else { null } diff --git a/bridge/src/test/kotlin/net/corda/bridge/BridgeTestHelper.kt b/bridge/src/test/kotlin/net/corda/bridge/BridgeTestHelper.kt index 2dd0359c03..d3c691b361 100644 --- a/bridge/src/test/kotlin/net/corda/bridge/BridgeTestHelper.kt +++ b/bridge/src/test/kotlin/net/corda/bridge/BridgeTestHelper.kt @@ -33,10 +33,10 @@ import java.nio.file.Path import java.security.cert.X509Certificate import java.time.Instant -fun createNetworkParams(baseDirectory: Path) { +fun createNetworkParams(baseDirectory: Path): Int { val dummyNotaryParty = TestIdentity(DUMMY_NOTARY_NAME) val notaryInfo = NotaryInfo(dummyNotaryParty.party, false) - val copier = NetworkParametersCopier(NetworkParameters( + val networkParameters = NetworkParameters( minimumPlatformVersion = 1, notaries = listOf(notaryInfo), modifiedTime = Instant.now(), @@ -44,8 +44,10 @@ fun createNetworkParams(baseDirectory: Path) { maxTransactionSize = 40000, epoch = 1, whitelistedContractImplementations = emptyMap>() - ), overwriteFile = true) + ) + val copier = NetworkParametersCopier(networkParameters, overwriteFile = true) copier.install(baseDirectory) + return networkParameters.maxMessageSize }