Address #665, add more leak tests

This commit is contained in:
Andras Slemmer 2017-05-15 13:18:28 +01:00
parent b192a86a30
commit f445590cff
9 changed files with 191 additions and 72 deletions

View File

@ -7,18 +7,17 @@ import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.pool.KryoPool
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.ErrorOr
import net.corda.core.getOrThrow
import net.corda.core.*
import net.corda.core.messaging.RPCOps
import net.corda.core.millis
import net.corda.core.random63BitValue
import net.corda.node.driver.poll
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.RPCKryo
import net.corda.testing.*
import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test
import rx.Observable
@ -60,7 +59,7 @@ class RPCStabilityTests {
fun startAndStop() {
rpcDriver {
val server = startRpcServer<RPCOps>(ops = DummyOps)
startRpcClient<RPCOps>(server.get().hostAndPort).get()
startRpcClient<RPCOps>(server.get().broker.hostAndPort!!).get()
}
}
repeat(5) {
@ -84,7 +83,10 @@ class RPCStabilityTests {
rpcDriver {
ErrorOr.catch { startRpcClient<RPCOps>(HostAndPort.fromString("localhost:9999")).get() }
val server = startRpcServer<RPCOps>(ops = DummyOps)
ErrorOr.catch { startRpcClient<RPCOps>(server.get().hostAndPort, configuration = RPCClientConfiguration.default.copy(minimumServerProtocolVersion = 1)).get() }
ErrorOr.catch { startRpcClient<RPCOps>(
server.get().broker.hostAndPort!!,
configuration = RPCClientConfiguration.default.copy(minimumServerProtocolVersion = 1)
).get() }
}
}
repeat(5) {
@ -99,6 +101,85 @@ class RPCStabilityTests {
executor.shutdownNow()
}
fun RpcBrokerHandle.getStats(): Map<String, Any> {
return serverControl.run {
mapOf(
"connections" to listConnectionIDs().toSet(),
"sessionCount" to listConnectionIDs().flatMap { listSessions(it).toList() }.size,
"consumerCount" to totalConsumerCount
)
}
}
@Test
fun `rpc server close doesnt leak broker resources`() {
rpcDriver {
fun startAndCloseServer(broker: RpcBrokerHandle) {
startRpcServerWithBrokerRunning(
configuration = RPCServerConfiguration.default.copy(consumerPoolSize = 1, producerPoolBound = 1),
ops = DummyOps,
brokerHandle = broker
).rpcServer.close()
}
val broker = startRpcBroker().get()
startAndCloseServer(broker)
val initial = broker.getStats()
repeat(100) {
startAndCloseServer(broker)
}
pollUntilTrue("broker resources to be released") {
initial == broker.getStats()
}
}
}
@Test
fun `rpc client close doesnt leak broker resources`() {
rpcDriver {
val server = startRpcServer(configuration = RPCServerConfiguration.default.copy(consumerPoolSize = 1, producerPoolBound = 1), ops = DummyOps).get()
RPCClient<RPCOps>(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password).close()
val initial = server.broker.getStats()
repeat(100) {
val connection = RPCClient<RPCOps>(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password)
connection.close()
}
pollUntilTrue("broker resources to be released") {
initial == server.broker.getStats()
}
}
}
@Test
fun `rpc server close is idempotent`() {
rpcDriver {
val server = startRpcServer(ops = DummyOps).get()
repeat(10) {
server.rpcServer.close()
}
}
}
@Test
fun `rpc client close is idempotent`() {
rpcDriver {
val serverShutdown = shutdownManager.follower()
val server = startRpcServer(ops = DummyOps).get()
serverShutdown.unfollow()
// With the server up
val connection1 = RPCClient<RPCOps>(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password)
repeat(10) {
connection1.close()
}
val connection2 = RPCClient<RPCOps>(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password)
serverShutdown.shutdown()
// With the server down
repeat(10) {
connection2.close()
}
}
}
interface LeakObservableOps: RPCOps {
fun leakObservable(): Observable<Nothing>
}
@ -116,7 +197,7 @@ class RPCStabilityTests {
}
}
val server = startRpcServer<LeakObservableOps>(ops = leakObservableOpsImpl)
val proxy = startRpcClient<LeakObservableOps>(server.get().hostAndPort).get()
val proxy = startRpcClient<LeakObservableOps>(server.get().broker.hostAndPort!!).get()
// Leak many observables
val N = 200
(1..N).toList().parallelStream().forEach {
@ -143,7 +224,7 @@ class RPCStabilityTests {
override fun ping() = "pong"
}
val serverFollower = shutdownManager.follower()
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().hostAndPort
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
val clientFollower = shutdownManager.follower()
val client = startRpcClient<ReconnectOps>(serverPort).getOrThrow()
@ -185,7 +266,7 @@ class RPCStabilityTests {
val numberOfClients = 4
val clients = Futures.allAsList((1 .. numberOfClients).map {
startRandomRpcClient<TrackSubscriberOps>(server.hostAndPort)
startRandomRpcClient<TrackSubscriberOps>(server.broker.hostAndPort!!)
}).get()
// Poll until all clients connect
@ -230,7 +311,7 @@ class RPCStabilityTests {
// Construct an RPC session manually so that we can hang in the message handler
val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}"
val session = startArtemisSession(server.hostAndPort)
val session = startArtemisSession(server.broker.hostAndPort!!)
session.createTemporaryQueue(myQueue, myQueue)
val consumer = session.createConsumer(myQueue, null, -1, -1, false)
consumer.setMessageHandler {
@ -262,7 +343,7 @@ class RPCStabilityTests {
fun RPCDriverExposedDSLInterface.pollUntilClientNumber(server: RpcServerHandle, expected: Int) {
pollUntilTrue("number of RPC clients to become $expected") {
val clientAddresses = server.serverControl.addressNames.filter { it.startsWith(RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX) }
val clientAddresses = server.broker.serverControl.addressNames.filter { it.startsWith(RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX) }
clientAddresses.size == expected
}.get()
}

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ServerLocator
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal
import rx.Notification
import rx.Observable
import rx.subjects.UnicastSubject
@ -265,16 +266,12 @@ class RPCClientProxyHandler(
* Closes the RPC proxy. Reaps all observables, shuts down the reaper, closes all sessions and executors.
*/
fun close() {
sessionAndConsumer?.consumer?.close()
sessionAndConsumer?.session?.close()
sessionAndConsumer?.sessionFactory?.close()
reaperScheduledFuture?.cancel(false)
observableContext.observableMap.invalidateAll()
reapObservables()
reaperExecutor?.shutdownNow()
sessionAndProducerPool.close().forEach {
it.producer.close()
it.session.close()
it.sessionFactory.close()
}
// Note the ordering is important, we shut down the consumer *before* the observation executor, otherwise we may

View File

@ -47,8 +47,8 @@ open class AbstractRPCTest {
}.get()
RPCTestMode.Netty ->
startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { server ->
startRpcClient<I>(server.hostAndPort, rpcUser.username, rpcUser.password, clientConfiguration).map {
TestProxy(it, { startArtemisSession(server.hostAndPort, rpcUser.username, rpcUser.password) })
startRpcClient<I>(server.broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map {
TestProxy(it, { startArtemisSession(server.broker.hostAndPort!!, rpcUser.username, rpcUser.password) })
}
}.get()
}

View File

@ -2,6 +2,7 @@ package net.corda.core.contracts
import net.corda.core.identity.Party
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.DeserializeAsKotlinObjectDef
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.TransactionBuilder
import java.security.PublicKey
@ -60,7 +61,7 @@ sealed class TransactionType {
abstract fun verifyTransaction(tx: LedgerTransaction)
/** A general transaction type where transaction validity is determined by custom contract code */
object General : TransactionType() {
object General : TransactionType(), DeserializeAsKotlinObjectDef {
/** Just uses the default [TransactionBuilder] with no special logic */
class Builder(notary: Party?) : TransactionBuilder(General, notary)
@ -140,7 +141,7 @@ sealed class TransactionType {
* A special transaction type for reassigning a notary for a state. Validation does not involve running
* any contract code, it just checks that the states are unmodified apart from the notary field.
*/
object NotaryChange : TransactionType() {
object NotaryChange : TransactionType(), DeserializeAsKotlinObjectDef {
/**
* A transaction builder that automatically sets the transaction type to [NotaryChange]
* and adds the list of participants to the signers set for every input state.

View File

@ -59,8 +59,10 @@ class LazyPool<A>(
* the returned iterable will be inaccurate.
*/
fun close(): Iterable<A> {
lifeCycle.transition(State.STARTED, State.FINISHED)
return poolQueue
lifeCycle.justTransition(State.FINISHED)
val elements = poolQueue.toList()
poolQueue.clear()
return elements
}
inline fun <R> run(withInstance: (A) -> R): R {

View File

@ -109,7 +109,7 @@ class ContractUpgradeFlowTest {
rpcAddress = startRpcServer(
rpcUser = user,
ops = CordaRPCOpsImpl(node.services, node.smm, node.database)
).get().hostAndPort,
).get().broker.hostAndPort!!,
username = user.username,
password = user.password
).get()

View File

@ -456,10 +456,7 @@ class DriverDSL(
override fun shutdown() {
_shutdownManager?.shutdown()
_executorService?.apply {
shutdownNow()
require(awaitTermination(1, TimeUnit.SECONDS))
}
_executorService?.shutdownNow()
}
private fun establishRpc(nodeAddress: HostAndPort, sslConfig: SSLConfiguration): ListenableFuture<CordaRPCOps> {

View File

@ -173,15 +173,11 @@ class RPCServer(
rpcExecutor?.shutdownNow()
reaperExecutor?.shutdownNow()
sessionAndConsumers.forEach {
it.consumer.close()
it.session.close()
it.sessionFactory.close()
}
observableMap.invalidateAll()
reapSubscriptions()
sessionAndProducerPool.close().forEach {
it.producer.close()
it.session.close()
it.sessionFactory.close()
}
lifeCycle.justTransition(State.FINISHED)
@ -257,7 +253,6 @@ class RPCServer(
}
private fun reapSubscriptions() {
lifeCycle.requireState(State.STARTED)
observableMap.cleanUp()
}

View File

@ -9,6 +9,7 @@ import net.corda.client.mock.string
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.div
import net.corda.core.map
import net.corda.core.messaging.RPCOps
import net.corda.core.random63BitValue
import net.corda.core.utilities.ProcessUtilities
@ -64,7 +65,7 @@ interface RPCDriverExposedDSLInterface : DriverDSLExposedInterface {
maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE,
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
ops : I
): ListenableFuture<Unit>
): ListenableFuture<RpcServerHandle>
/**
* Starts an In-VM RPC client.
@ -156,6 +157,28 @@ interface RPCDriverExposedDSLInterface : DriverDSLExposedInterface {
username: String = rpcTestUser.username,
password: String = rpcTestUser.password
): ClientSession
fun startRpcBroker(
serverName: String = "driver-rpc-server-${random63BitValue()}",
rpcUser: User = rpcTestUser,
maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE,
maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE,
customPort: HostAndPort? = null
): ListenableFuture<RpcBrokerHandle>
fun startInVmRpcBroker(
rpcUser: User = rpcTestUser,
maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE,
maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE
): ListenableFuture<RpcBrokerHandle>
fun <I : RPCOps> startRpcServerWithBrokerRunning(
rpcUser: User = rpcTestUser,
nodeLegalName: X500Name = fakeNodeLegalName,
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
ops: I,
brokerHandle: RpcBrokerHandle
): RpcServerHandle
}
inline fun <reified I : RPCOps> RPCDriverExposedDSLInterface.startInVmRpcClient(
username: String = rpcTestUser.username,
@ -176,11 +199,17 @@ inline fun <reified I : RPCOps> RPCDriverExposedDSLInterface.startRpcClient(
interface RPCDriverInternalDSLInterface : DriverDSLInternalInterface, RPCDriverExposedDSLInterface
data class RpcServerHandle(
val hostAndPort: HostAndPort,
data class RpcBrokerHandle(
val hostAndPort: HostAndPort?, /** null if this is an InVM broker */
val clientTransportConfiguration: TransportConfiguration,
val serverControl: ActiveMQServerControl
)
data class RpcServerHandle(
val broker: RpcBrokerHandle,
val rpcServer: RPCServer
)
val rpcTestUser = User("user1", "test", permissions = emptySet())
val fakeNodeLegalName = X500Name("CN=not:a:valid:name")
@ -194,7 +223,7 @@ fun <A> rpcDriver(
debugPortAllocation: PortAllocation = globalDebugPortAllocation,
systemProperties: Map<String, String> = emptyMap(),
useTestClock: Boolean = false,
networkMapStartStrategy: NetworkMapStartStrategy = FalseNetworkMap,
networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = false),
dsl: RPCDriverExposedDSLInterface.() -> A
) = genericDriver(
driverDsl = RPCDriverDSL(
@ -293,21 +322,9 @@ data class RPCDriverDSL(
maxBufferedBytesPerClient: Long,
configuration: RPCServerConfiguration,
ops: I
): ListenableFuture<Unit> {
return driverDSL.executorService.submit<Unit> {
val artemisConfig = createInVmRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient)
val server = EmbeddedActiveMQ()
server.setConfiguration(artemisConfig)
server.setSecurityManager(SingleUserSecurityManager(rpcUser))
server.start()
driverDSL.shutdownManager.registerShutdown {
server.activeMQServer.stop()
server.stop()
}
startRpcServerWithBrokerRunning(
rpcUser, nodeLegalName, configuration, ops, inVmClientTransportConfiguration,
server.activeMQServer.activeMQServerControl
)
): ListenableFuture<RpcServerHandle> {
return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker ->
startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker)
}
}
@ -344,22 +361,8 @@ data class RPCDriverDSL(
customPort: HostAndPort?,
ops: I
): ListenableFuture<RpcServerHandle> {
val hostAndPort = customPort ?: driverDSL.portAllocation.nextHostAndPort()
addressMustNotBeBound(driverDSL.executorService, hostAndPort)
return driverDSL.executorService.submit<RpcServerHandle> {
val artemisConfig = createRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient, driverDSL.driverDirectory / serverName, hostAndPort)
val server = ActiveMQServerImpl(artemisConfig, SingleUserSecurityManager(rpcUser))
server.start()
driverDSL.shutdownManager.registerShutdown {
server.stop()
addressMustNotBeBound(driverDSL.executorService, hostAndPort).get()
}
val transportConfiguration = createNettyClientTransportConfiguration(hostAndPort)
startRpcServerWithBrokerRunning(
rpcUser, nodeLegalName, configuration, ops, transportConfiguration,
server.activeMQServerControl
)
RpcServerHandle(hostAndPort, server.activeMQServerControl)
return startRpcBroker(serverName, rpcUser, maxFileSize, maxBufferedBytesPerClient, customPort).map { broker ->
startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker)
}
}
@ -401,16 +404,58 @@ data class RPCDriverDSL(
return session
}
override fun startRpcBroker(
serverName: String,
rpcUser: User,
maxFileSize: Int,
maxBufferedBytesPerClient: Long,
customPort: HostAndPort?
): ListenableFuture<RpcBrokerHandle> {
val hostAndPort = customPort ?: driverDSL.portAllocation.nextHostAndPort()
addressMustNotBeBound(driverDSL.executorService, hostAndPort)
return driverDSL.executorService.submit<RpcBrokerHandle> {
val artemisConfig = createRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient, driverDSL.driverDirectory / serverName, hostAndPort)
val server = ActiveMQServerImpl(artemisConfig, SingleUserSecurityManager(rpcUser))
server.start()
driverDSL.shutdownManager.registerShutdown {
server.stop()
addressMustNotBeBound(driverDSL.executorService, hostAndPort).get()
}
RpcBrokerHandle(
hostAndPort = hostAndPort,
clientTransportConfiguration = createNettyClientTransportConfiguration(hostAndPort),
serverControl = server.activeMQServerControl
)
}
}
private fun <I : RPCOps> startRpcServerWithBrokerRunning(
override fun startInVmRpcBroker(rpcUser: User, maxFileSize: Int, maxBufferedBytesPerClient: Long): ListenableFuture<RpcBrokerHandle> {
return driverDSL.executorService.submit<RpcBrokerHandle> {
val artemisConfig = createInVmRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient)
val server = EmbeddedActiveMQ()
server.setConfiguration(artemisConfig)
server.setSecurityManager(SingleUserSecurityManager(rpcUser))
server.start()
driverDSL.shutdownManager.registerShutdown {
server.activeMQServer.stop()
server.stop()
}
RpcBrokerHandle(
hostAndPort = null,
clientTransportConfiguration = inVmClientTransportConfiguration,
serverControl = server.activeMQServer.activeMQServerControl
)
}
}
override fun <I : RPCOps> startRpcServerWithBrokerRunning(
rpcUser: User,
nodeLegalName: X500Name,
configuration: RPCServerConfiguration,
ops: I,
transportConfiguration: TransportConfiguration,
serverControl: ActiveMQServerControl
) {
val locator = ActiveMQClient.createServerLocatorWithoutHA(transportConfiguration).apply {
brokerHandle: RpcBrokerHandle
): RpcServerHandle {
val locator = ActiveMQClient.createServerLocatorWithoutHA(brokerHandle.clientTransportConfiguration).apply {
minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE
}
val userService = object : RPCUserService {
@ -430,7 +475,8 @@ data class RPCDriverDSL(
rpcServer.close()
locator.close()
}
rpcServer.start(serverControl)
rpcServer.start(brokerHandle.serverControl)
return RpcServerHandle(brokerHandle, rpcServer)
}
}