mirror of
https://github.com/corda/corda.git
synced 2025-06-15 21:58:17 +00:00
CORDA-864 Wire up max message size (#3057)
* add checks on message size * added size check in AMQP bridge * passing maxMessageSize to AMQPClient and server * added Interceptor to enforce maxMessageSize on incoming messages
This commit is contained in:
@ -43,6 +43,7 @@ class ArtemisMessagingClient(private val config: SSLConfiguration,
|
||||
clientFailureCheckPeriod = -1
|
||||
minLargeMessageSize = maxMessageSize
|
||||
isUseGlobalPools = nodeSerializationEnv != null
|
||||
addIncomingInterceptor(ArtemisMessageSizeChecksInterceptor(maxMessageSize))
|
||||
}
|
||||
val sessionFactory = locator.createSessionFactory()
|
||||
// Login using the node username. The broker will authenticate us as its node (as opposed to another peer)
|
||||
|
@ -30,7 +30,9 @@ class ArtemisMessagingComponent {
|
||||
const val BRIDGE_CONTROL = "${INTERNAL_PREFIX}bridge.control"
|
||||
const val BRIDGE_NOTIFY = "${INTERNAL_PREFIX}bridge.notify"
|
||||
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
|
||||
|
||||
// This is a rough guess on the extra space needed on top of maxMessageSize to store the journal.
|
||||
// TODO: we might want to make this value configurable.
|
||||
const val JOURNAL_HEADER_SIZE = 1024
|
||||
object P2PMessagingHeaders {
|
||||
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
|
||||
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
|
||||
|
@ -12,3 +12,7 @@ import java.nio.file.Path
|
||||
fun Path.requireOnDefaultFileSystem() {
|
||||
require(fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
|
||||
}
|
||||
|
||||
fun requireMessageSize(messageSize: Int, limit: Int) {
|
||||
require(messageSize <= limit) { "Message exceeds maxMessageSize network parameter, maxMessageSize: [$limit], message size: [$messageSize]" }
|
||||
}
|
||||
|
@ -0,0 +1,50 @@
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import org.apache.activemq.artemis.api.core.BaseInterceptor
|
||||
import org.apache.activemq.artemis.api.core.Interceptor
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePacket
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
|
||||
|
||||
class ArtemisMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor<Packet>(maxMessageSize), Interceptor {
|
||||
override fun getMessageSize(packet: Packet?): Int? {
|
||||
return when (packet) {
|
||||
// This is an estimate of how much memory a Message body takes up.
|
||||
// Note, it is only an estimate
|
||||
is MessagePacket -> (packet.message.persistentSize - packet.message.headersAndPropertiesEncodeSize - 4).toInt()
|
||||
// Skip all artemis control messages.
|
||||
else -> null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class AmqpMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor<AMQPMessage>(maxMessageSize), AmqpInterceptor {
|
||||
override fun getMessageSize(packet: AMQPMessage?): Int? = packet?.length
|
||||
}
|
||||
|
||||
/**
|
||||
* Artemis message interceptor to enforce maxMessageSize on incoming messages.
|
||||
*/
|
||||
sealed class MessageSizeChecksInterceptor<T : Any>(private val maxMessageSize: Int) : BaseInterceptor<T> {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
|
||||
override fun intercept(packet: T, connection: RemotingConnection?): Boolean {
|
||||
val messageSize = getMessageSize(packet) ?: return true
|
||||
return if (messageSize > maxMessageSize) {
|
||||
logger.warn("Message size exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [$messageSize], " +
|
||||
"dropping message, client id :${connection?.clientID}")
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
// get size of the message in byte, returns null if the message is null or size don't need to be checked.
|
||||
abstract fun getMessageSize(packet: T?): Int?
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ import kotlin.concurrent.withLock
|
||||
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
class AMQPBridgeManager(config: NodeSSLConfiguration, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
|
||||
class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
|
||||
|
||||
private val lock = ReentrantLock()
|
||||
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
|
||||
@ -48,7 +48,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val artemisMessage
|
||||
private var artemis: ArtemisSessionProvider? = null
|
||||
private val crlCheckSoftFail: Boolean = config.crlCheckSoftFail
|
||||
|
||||
constructor(config: NodeSSLConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int) : this(config, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
|
||||
constructor(config: NodeSSLConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int) : this(config, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
|
||||
|
||||
companion object {
|
||||
private const val NUM_BRIDGE_THREADS = 0 // Default sized pool
|
||||
@ -70,14 +70,15 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val artemisMessage
|
||||
trustStore: KeyStore,
|
||||
crlCheckSoftFail: Boolean,
|
||||
sharedEventGroup: EventLoopGroup,
|
||||
private val artemis: ArtemisSessionProvider) {
|
||||
private val artemis: ArtemisSessionProvider,
|
||||
private val maxMessageSize: Int) {
|
||||
companion object {
|
||||
fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
|
||||
}
|
||||
|
||||
private val log = LoggerFactory.getLogger("$bridgeName:${legalNames.first()}")
|
||||
|
||||
val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, sharedThreadPool = sharedEventGroup)
|
||||
val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, sharedThreadPool = sharedEventGroup, maxMessageSize = maxMessageSize)
|
||||
val bridgeName: String get() = getBridgeName(queueName, target)
|
||||
private val lock = ReentrantLock() // lock to serialise session level access
|
||||
private var session: ClientSession? = null
|
||||
@ -129,6 +130,13 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val artemisMessage
|
||||
}
|
||||
|
||||
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||
if (artemisMessage.bodySize > maxMessageSize) {
|
||||
log.warn("Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [${artemisMessage.bodySize}], " +
|
||||
"dropping message, uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}")
|
||||
// Ack the message to prevent same message being sent to us again.
|
||||
artemisMessage.acknowledge()
|
||||
return
|
||||
}
|
||||
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
|
||||
val properties = HashMap<String, Any?>()
|
||||
for (key in P2PMessagingHeaders.whitelistedHeaders) {
|
||||
@ -171,7 +179,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val artemisMessage
|
||||
if (bridgeExists(getBridgeName(queueName, target))) {
|
||||
return
|
||||
}
|
||||
val newBridge = AMQPBridge(queueName, target, legalNames, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, sharedEventLoopGroup!!, artemis!!)
|
||||
val newBridge = AMQPBridge(queueName, target, legalNames, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, sharedEventLoopGroup!!, artemis!!, maxMessageSize)
|
||||
lock.withLock {
|
||||
bridgeNameToBridgeMap[newBridge.bridgeName] = newBridge
|
||||
}
|
||||
|
@ -19,16 +19,17 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import java.util.*
|
||||
|
||||
class BridgeControlListener(val config: NodeSSLConfiguration,
|
||||
maxMessageSize: Int,
|
||||
val artemisMessageClientFactory: () -> ArtemisSessionProvider) : AutoCloseable {
|
||||
private val bridgeId: String = UUID.randomUUID().toString()
|
||||
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, artemisMessageClientFactory)
|
||||
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, maxMessageSize, artemisMessageClientFactory)
|
||||
private val validInboundQueues = mutableSetOf<String>()
|
||||
private var artemis: ArtemisSessionProvider? = null
|
||||
private var controlConsumer: ClientConsumer? = null
|
||||
|
||||
constructor(config: NodeSSLConfiguration,
|
||||
p2pAddress: NetworkHostAndPort,
|
||||
maxMessageSize: Int) : this(config, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
|
||||
maxMessageSize: Int) : this(config, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
|
||||
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
|
@ -15,6 +15,7 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
||||
import net.corda.nodeapi.internal.requireMessageSize
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.lang.Long.min
|
||||
@ -41,7 +42,8 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
|
||||
private val trustStore: KeyStore,
|
||||
private val crlCheckSoftFail: Boolean,
|
||||
private val trace: Boolean = false,
|
||||
private val sharedThreadPool: EventLoopGroup? = null) : AutoCloseable {
|
||||
private val sharedThreadPool: EventLoopGroup? = null,
|
||||
private val maxMessageSize: Int) : AutoCloseable {
|
||||
companion object {
|
||||
init {
|
||||
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
|
||||
@ -91,17 +93,15 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
|
||||
}
|
||||
}
|
||||
|
||||
private val closeListener = object : ChannelFutureListener {
|
||||
override fun operationComplete(future: ChannelFuture) {
|
||||
log.info("Disconnected from $currentTarget")
|
||||
future.channel()?.disconnect()
|
||||
clientChannel = null
|
||||
if (!stopping) {
|
||||
workerGroup?.schedule({
|
||||
nextTarget()
|
||||
restart()
|
||||
}, retryInterval, TimeUnit.MILLISECONDS)
|
||||
}
|
||||
private val closeListener = ChannelFutureListener { future ->
|
||||
log.info("Disconnected from $currentTarget")
|
||||
future.channel()?.disconnect()
|
||||
clientChannel = null
|
||||
if (!stopping) {
|
||||
workerGroup?.schedule({
|
||||
nextTarget()
|
||||
restart()
|
||||
}, retryInterval, TimeUnit.MILLISECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
@ -182,6 +182,7 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
|
||||
topic: String,
|
||||
destinationLegalName: String,
|
||||
properties: Map<String, Any?>): SendableMessage {
|
||||
requireMessageSize(payload.size, maxMessageSize)
|
||||
return SendableMessageImpl(payload, topic, destinationLegalName, currentTarget, properties)
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,7 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
||||
import net.corda.nodeapi.internal.requireMessageSize
|
||||
import org.apache.qpid.proton.engine.Delivery
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
@ -41,7 +42,8 @@ class AMQPServer(val hostName: String,
|
||||
private val keyStorePrivateKeyPassword: CharArray,
|
||||
private val trustStore: KeyStore,
|
||||
private val crlCheckSoftFail: Boolean,
|
||||
private val trace: Boolean = false) : AutoCloseable {
|
||||
private val trace: Boolean = false,
|
||||
private val maxMessageSize: Int) : AutoCloseable {
|
||||
|
||||
companion object {
|
||||
init {
|
||||
@ -68,7 +70,8 @@ class AMQPServer(val hostName: String,
|
||||
keyStorePrivateKeyPassword: String,
|
||||
trustStore: KeyStore,
|
||||
crlCheckSoftFail: Boolean,
|
||||
trace: Boolean = false) : this(hostName, port, userName, password, keyStore, keyStorePrivateKeyPassword.toCharArray(), trustStore, crlCheckSoftFail, trace)
|
||||
trace: Boolean = false,
|
||||
maxMessageSize: Int) : this(hostName, port, userName, password, keyStore, keyStorePrivateKeyPassword.toCharArray(), trustStore, crlCheckSoftFail, trace, maxMessageSize)
|
||||
|
||||
private class ServerChannelInitializer(val parent: AMQPServer) : ChannelInitializer<SocketChannel>() {
|
||||
private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
|
||||
@ -156,6 +159,7 @@ class AMQPServer(val hostName: String,
|
||||
destinationLegalName: String,
|
||||
destinationLink: NetworkHostAndPort,
|
||||
properties: Map<String, Any?>): SendableMessage {
|
||||
requireMessageSize(payload.size, maxMessageSize)
|
||||
val dest = InetSocketAddress(destinationLink.host, destinationLink.port)
|
||||
require(dest in clientChannels.keys) {
|
||||
"Destination not available"
|
||||
|
Reference in New Issue
Block a user