mirror of
https://github.com/corda/corda.git
synced 2024-12-24 07:06:44 +00:00
CORDA-2981 Disable slow consumers for RPC since it doesn't work. (#5194)
* CORDA-2981 Disable slow consumers for RPC since it doesn't work.
This commit is contained in:
parent
8678bad88d
commit
0c0101948b
@ -21,6 +21,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
|
|||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.junit.After
|
import org.junit.After
|
||||||
import org.junit.Assert.*
|
import org.junit.Assert.*
|
||||||
|
import org.junit.Ignore
|
||||||
import org.junit.Rule
|
import org.junit.Rule
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
@ -498,6 +499,7 @@ class RPCStabilityTests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@Ignore // TODO: This is ignored because Artemis slow consumers are broken. I'm not deleting it in case we can get the feature fixed.
|
||||||
fun `slow consumers are kicked`() {
|
fun `slow consumers are kicked`() {
|
||||||
rpcDriver {
|
rpcDriver {
|
||||||
val server = startRpcServer(maxBufferedBytesPerClient = 10 * 1024 * 1024, ops = SlowConsumerRPCOpsImpl()).get()
|
val server = startRpcServer(maxBufferedBytesPerClient = 10 * 1024 * 1024, ops = SlowConsumerRPCOpsImpl()).get()
|
||||||
@ -529,7 +531,7 @@ class RPCStabilityTests {
|
|||||||
producer.send(message)
|
producer.send(message)
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
// We are consuming slower than the server is producing, so we should be kicked after a while
|
// We are consuming slower than the server is producing, so we should be kicked after a while if slow consumers are enabled.
|
||||||
pollUntilClientNumber(server, 0)
|
pollUntilClientNumber(server, 0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -15,7 +15,6 @@ import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
|
|||||||
import org.apache.activemq.artemis.core.security.Role
|
import org.apache.activemq.artemis.core.security.Role
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
|
||||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy
|
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
|
||||||
internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, jmxEnabled: Boolean, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort?, sslOptions: BrokerRpcSslOptions?, useSsl: Boolean, nodeConfiguration: MutualSslConfiguration, shouldStartLocalShell: Boolean) : SecureArtemisConfiguration() {
|
internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, jmxEnabled: Boolean, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort?, sslOptions: BrokerRpcSslOptions?, useSsl: Boolean, nodeConfiguration: MutualSslConfiguration, shouldStartLocalShell: Boolean) : SecureArtemisConfiguration() {
|
||||||
@ -42,9 +41,6 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
|
|||||||
maxSizeBytes = 5L * maxMessageSize
|
maxSizeBytes = 5L * maxMessageSize
|
||||||
addressFullMessagePolicy = AddressFullMessagePolicy.PAGE
|
addressFullMessagePolicy = AddressFullMessagePolicy.PAGE
|
||||||
pageSizeBytes = 1L * maxMessageSize
|
pageSizeBytes = 1L * maxMessageSize
|
||||||
slowConsumerPolicy = SlowConsumerPolicy.KILL
|
|
||||||
slowConsumerThreshold = 1
|
|
||||||
slowConsumerCheckPeriod = 30
|
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -50,7 +50,6 @@ import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
|
|||||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
|
||||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy
|
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
|
||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3
|
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3
|
||||||
import java.lang.reflect.Method
|
import java.lang.reflect.Method
|
||||||
@ -207,9 +206,6 @@ data class RPCDriverDSL(
|
|||||||
maxSizeBytes = maxBufferedBytesPerClient
|
maxSizeBytes = maxBufferedBytesPerClient
|
||||||
addressFullMessagePolicy = AddressFullMessagePolicy.PAGE
|
addressFullMessagePolicy = AddressFullMessagePolicy.PAGE
|
||||||
pageSizeBytes = maxSizeBytes / 10
|
pageSizeBytes = maxSizeBytes / 10
|
||||||
slowConsumerPolicy = SlowConsumerPolicy.KILL
|
|
||||||
slowConsumerThreshold = 1
|
|
||||||
slowConsumerCheckPeriod = 30
|
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user