diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index b325c0bc8f..3ecb3eda6e 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -7,18 +7,17 @@ import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.pool.KryoPool import com.google.common.net.HostAndPort import com.google.common.util.concurrent.Futures +import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.RPCClientConfiguration -import net.corda.core.ErrorOr -import net.corda.core.getOrThrow +import net.corda.core.* import net.corda.core.messaging.RPCOps -import net.corda.core.millis -import net.corda.core.random63BitValue import net.corda.node.driver.poll import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.nodeapi.RPCApi import net.corda.nodeapi.RPCKryo import net.corda.testing.* import org.apache.activemq.artemis.api.core.SimpleString +import org.junit.Assert.assertEquals import org.junit.Assert.assertTrue import org.junit.Test import rx.Observable @@ -60,7 +59,7 @@ class RPCStabilityTests { fun startAndStop() { rpcDriver { val server = startRpcServer(ops = DummyOps) - startRpcClient(server.get().hostAndPort).get() + startRpcClient(server.get().broker.hostAndPort!!).get() } } repeat(5) { @@ -84,7 +83,10 @@ class RPCStabilityTests { rpcDriver { ErrorOr.catch { startRpcClient(HostAndPort.fromString("localhost:9999")).get() } val server = startRpcServer(ops = DummyOps) - ErrorOr.catch { startRpcClient(server.get().hostAndPort, configuration = RPCClientConfiguration.default.copy(minimumServerProtocolVersion = 1)).get() } + ErrorOr.catch { startRpcClient( + server.get().broker.hostAndPort!!, + configuration = RPCClientConfiguration.default.copy(minimumServerProtocolVersion = 1) + ).get() } } } repeat(5) { @@ -99,6 +101,85 @@ class RPCStabilityTests { executor.shutdownNow() } + fun RpcBrokerHandle.getStats(): Map { + return serverControl.run { + mapOf( + "connections" to listConnectionIDs().toSet(), + "sessionCount" to listConnectionIDs().flatMap { listSessions(it).toList() }.size, + "consumerCount" to totalConsumerCount + ) + } + } + + @Test + fun `rpc server close doesnt leak broker resources`() { + rpcDriver { + fun startAndCloseServer(broker: RpcBrokerHandle) { + startRpcServerWithBrokerRunning( + configuration = RPCServerConfiguration.default.copy(consumerPoolSize = 1, producerPoolBound = 1), + ops = DummyOps, + brokerHandle = broker + ).rpcServer.close() + } + + val broker = startRpcBroker().get() + startAndCloseServer(broker) + val initial = broker.getStats() + repeat(100) { + startAndCloseServer(broker) + } + pollUntilTrue("broker resources to be released") { + initial == broker.getStats() + } + } + } + + @Test + fun `rpc client close doesnt leak broker resources`() { + rpcDriver { + val server = startRpcServer(configuration = RPCServerConfiguration.default.copy(consumerPoolSize = 1, producerPoolBound = 1), ops = DummyOps).get() + RPCClient(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password).close() + val initial = server.broker.getStats() + repeat(100) { + val connection = RPCClient(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password) + connection.close() + } + pollUntilTrue("broker resources to be released") { + initial == server.broker.getStats() + } + } + } + + @Test + fun `rpc server close is idempotent`() { + rpcDriver { + val server = startRpcServer(ops = DummyOps).get() + repeat(10) { + server.rpcServer.close() + } + } + } + + @Test + fun `rpc client close is idempotent`() { + rpcDriver { + val serverShutdown = shutdownManager.follower() + val server = startRpcServer(ops = DummyOps).get() + serverShutdown.unfollow() + // With the server up + val connection1 = RPCClient(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password) + repeat(10) { + connection1.close() + } + val connection2 = RPCClient(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password) + serverShutdown.shutdown() + // With the server down + repeat(10) { + connection2.close() + } + } + } + interface LeakObservableOps: RPCOps { fun leakObservable(): Observable } @@ -116,7 +197,7 @@ class RPCStabilityTests { } } val server = startRpcServer(ops = leakObservableOpsImpl) - val proxy = startRpcClient(server.get().hostAndPort).get() + val proxy = startRpcClient(server.get().broker.hostAndPort!!).get() // Leak many observables val N = 200 (1..N).toList().parallelStream().forEach { @@ -143,7 +224,7 @@ class RPCStabilityTests { override fun ping() = "pong" } val serverFollower = shutdownManager.follower() - val serverPort = startRpcServer(ops = ops).getOrThrow().hostAndPort + val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! serverFollower.unfollow() val clientFollower = shutdownManager.follower() val client = startRpcClient(serverPort).getOrThrow() @@ -185,7 +266,7 @@ class RPCStabilityTests { val numberOfClients = 4 val clients = Futures.allAsList((1 .. numberOfClients).map { - startRandomRpcClient(server.hostAndPort) + startRandomRpcClient(server.broker.hostAndPort!!) }).get() // Poll until all clients connect @@ -230,7 +311,7 @@ class RPCStabilityTests { // Construct an RPC session manually so that we can hang in the message handler val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}" - val session = startArtemisSession(server.hostAndPort) + val session = startArtemisSession(server.broker.hostAndPort!!) session.createTemporaryQueue(myQueue, myQueue) val consumer = session.createConsumer(myQueue, null, -1, -1, false) consumer.setMessageHandler { @@ -262,7 +343,7 @@ class RPCStabilityTests { fun RPCDriverExposedDSLInterface.pollUntilClientNumber(server: RpcServerHandle, expected: Int) { pollUntilTrue("number of RPC clients to become $expected") { - val clientAddresses = server.serverControl.addressNames.filter { it.startsWith(RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX) } + val clientAddresses = server.broker.serverControl.addressNames.filter { it.startsWith(RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX) } clientAddresses.size == expected }.get() } \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 3d5232481c..f8b6bf3548 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE import org.apache.activemq.artemis.api.core.client.ClientMessage import org.apache.activemq.artemis.api.core.client.ServerLocator +import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal import rx.Notification import rx.Observable import rx.subjects.UnicastSubject @@ -265,16 +266,12 @@ class RPCClientProxyHandler( * Closes the RPC proxy. Reaps all observables, shuts down the reaper, closes all sessions and executors. */ fun close() { - sessionAndConsumer?.consumer?.close() - sessionAndConsumer?.session?.close() sessionAndConsumer?.sessionFactory?.close() reaperScheduledFuture?.cancel(false) observableContext.observableMap.invalidateAll() reapObservables() reaperExecutor?.shutdownNow() sessionAndProducerPool.close().forEach { - it.producer.close() - it.session.close() it.sessionFactory.close() } // Note the ordering is important, we shut down the consumer *before* the observation executor, otherwise we may diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt index 6139ad79fb..20026ab7c1 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt @@ -47,8 +47,8 @@ open class AbstractRPCTest { }.get() RPCTestMode.Netty -> startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { server -> - startRpcClient(server.hostAndPort, rpcUser.username, rpcUser.password, clientConfiguration).map { - TestProxy(it, { startArtemisSession(server.hostAndPort, rpcUser.username, rpcUser.password) }) + startRpcClient(server.broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map { + TestProxy(it, { startArtemisSession(server.broker.hostAndPort!!, rpcUser.username, rpcUser.password) }) } }.get() } diff --git a/core/src/main/kotlin/net/corda/core/contracts/TransactionTypes.kt b/core/src/main/kotlin/net/corda/core/contracts/TransactionTypes.kt index bac9fbde9d..d40e3ef608 100644 --- a/core/src/main/kotlin/net/corda/core/contracts/TransactionTypes.kt +++ b/core/src/main/kotlin/net/corda/core/contracts/TransactionTypes.kt @@ -2,6 +2,7 @@ package net.corda.core.contracts import net.corda.core.identity.Party import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.DeserializeAsKotlinObjectDef import net.corda.core.transactions.LedgerTransaction import net.corda.core.transactions.TransactionBuilder import java.security.PublicKey @@ -60,7 +61,7 @@ sealed class TransactionType { abstract fun verifyTransaction(tx: LedgerTransaction) /** A general transaction type where transaction validity is determined by custom contract code */ - object General : TransactionType() { + object General : TransactionType(), DeserializeAsKotlinObjectDef { /** Just uses the default [TransactionBuilder] with no special logic */ class Builder(notary: Party?) : TransactionBuilder(General, notary) @@ -140,7 +141,7 @@ sealed class TransactionType { * A special transaction type for reassigning a notary for a state. Validation does not involve running * any contract code, it just checks that the states are unmodified apart from the notary field. */ - object NotaryChange : TransactionType() { + object NotaryChange : TransactionType(), DeserializeAsKotlinObjectDef { /** * A transaction builder that automatically sets the transaction type to [NotaryChange] * and adds the list of participants to the signers set for every input state. diff --git a/core/src/main/kotlin/net/corda/core/utilities/LazyPool.kt b/core/src/main/kotlin/net/corda/core/utilities/LazyPool.kt index 1a1abebdca..2649924aa1 100644 --- a/core/src/main/kotlin/net/corda/core/utilities/LazyPool.kt +++ b/core/src/main/kotlin/net/corda/core/utilities/LazyPool.kt @@ -59,8 +59,10 @@ class LazyPool( * the returned iterable will be inaccurate. */ fun close(): Iterable { - lifeCycle.transition(State.STARTED, State.FINISHED) - return poolQueue + lifeCycle.justTransition(State.FINISHED) + val elements = poolQueue.toList() + poolQueue.clear() + return elements } inline fun run(withInstance: (A) -> R): R { diff --git a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt index 032a979672..bdd09610bd 100644 --- a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt +++ b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt @@ -109,7 +109,7 @@ class ContractUpgradeFlowTest { rpcAddress = startRpcServer( rpcUser = user, ops = CordaRPCOpsImpl(node.services, node.smm, node.database) - ).get().hostAndPort, + ).get().broker.hostAndPort!!, username = user.username, password = user.password ).get() diff --git a/node/src/main/kotlin/net/corda/node/driver/Driver.kt b/node/src/main/kotlin/net/corda/node/driver/Driver.kt index 9e53ada244..d2a9f3f2a1 100644 --- a/node/src/main/kotlin/net/corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/net/corda/node/driver/Driver.kt @@ -456,10 +456,7 @@ class DriverDSL( override fun shutdown() { _shutdownManager?.shutdown() - _executorService?.apply { - shutdownNow() - require(awaitTermination(1, TimeUnit.SECONDS)) - } + _executorService?.shutdownNow() } private fun establishRpc(nodeAddress: HostAndPort, sslConfig: SSLConfiguration): ListenableFuture { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index 693bbdf1e8..81935366a9 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -173,15 +173,11 @@ class RPCServer( rpcExecutor?.shutdownNow() reaperExecutor?.shutdownNow() sessionAndConsumers.forEach { - it.consumer.close() - it.session.close() it.sessionFactory.close() } observableMap.invalidateAll() reapSubscriptions() sessionAndProducerPool.close().forEach { - it.producer.close() - it.session.close() it.sessionFactory.close() } lifeCycle.justTransition(State.FINISHED) @@ -257,7 +253,6 @@ class RPCServer( } private fun reapSubscriptions() { - lifeCycle.requireState(State.STARTED) observableMap.cleanUp() } diff --git a/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt b/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt index 94a98c45fb..5bc7af456e 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt @@ -9,6 +9,7 @@ import net.corda.client.mock.string import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.core.div +import net.corda.core.map import net.corda.core.messaging.RPCOps import net.corda.core.random63BitValue import net.corda.core.utilities.ProcessUtilities @@ -64,7 +65,7 @@ interface RPCDriverExposedDSLInterface : DriverDSLExposedInterface { maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE, configuration: RPCServerConfiguration = RPCServerConfiguration.default, ops : I - ): ListenableFuture + ): ListenableFuture /** * Starts an In-VM RPC client. @@ -156,6 +157,28 @@ interface RPCDriverExposedDSLInterface : DriverDSLExposedInterface { username: String = rpcTestUser.username, password: String = rpcTestUser.password ): ClientSession + + fun startRpcBroker( + serverName: String = "driver-rpc-server-${random63BitValue()}", + rpcUser: User = rpcTestUser, + maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE, + maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE, + customPort: HostAndPort? = null + ): ListenableFuture + + fun startInVmRpcBroker( + rpcUser: User = rpcTestUser, + maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE, + maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE + ): ListenableFuture + + fun startRpcServerWithBrokerRunning( + rpcUser: User = rpcTestUser, + nodeLegalName: X500Name = fakeNodeLegalName, + configuration: RPCServerConfiguration = RPCServerConfiguration.default, + ops: I, + brokerHandle: RpcBrokerHandle + ): RpcServerHandle } inline fun RPCDriverExposedDSLInterface.startInVmRpcClient( username: String = rpcTestUser.username, @@ -176,11 +199,17 @@ inline fun RPCDriverExposedDSLInterface.startRpcClient( interface RPCDriverInternalDSLInterface : DriverDSLInternalInterface, RPCDriverExposedDSLInterface -data class RpcServerHandle( - val hostAndPort: HostAndPort, +data class RpcBrokerHandle( + val hostAndPort: HostAndPort?, /** null if this is an InVM broker */ + val clientTransportConfiguration: TransportConfiguration, val serverControl: ActiveMQServerControl ) +data class RpcServerHandle( + val broker: RpcBrokerHandle, + val rpcServer: RPCServer +) + val rpcTestUser = User("user1", "test", permissions = emptySet()) val fakeNodeLegalName = X500Name("CN=not:a:valid:name") @@ -194,7 +223,7 @@ fun rpcDriver( debugPortAllocation: PortAllocation = globalDebugPortAllocation, systemProperties: Map = emptyMap(), useTestClock: Boolean = false, - networkMapStartStrategy: NetworkMapStartStrategy = FalseNetworkMap, + networkMapStartStrategy: NetworkMapStartStrategy = NetworkMapStartStrategy.Dedicated(startAutomatically = false), dsl: RPCDriverExposedDSLInterface.() -> A ) = genericDriver( driverDsl = RPCDriverDSL( @@ -293,21 +322,9 @@ data class RPCDriverDSL( maxBufferedBytesPerClient: Long, configuration: RPCServerConfiguration, ops: I - ): ListenableFuture { - return driverDSL.executorService.submit { - val artemisConfig = createInVmRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient) - val server = EmbeddedActiveMQ() - server.setConfiguration(artemisConfig) - server.setSecurityManager(SingleUserSecurityManager(rpcUser)) - server.start() - driverDSL.shutdownManager.registerShutdown { - server.activeMQServer.stop() - server.stop() - } - startRpcServerWithBrokerRunning( - rpcUser, nodeLegalName, configuration, ops, inVmClientTransportConfiguration, - server.activeMQServer.activeMQServerControl - ) + ): ListenableFuture { + return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker -> + startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker) } } @@ -344,22 +361,8 @@ data class RPCDriverDSL( customPort: HostAndPort?, ops: I ): ListenableFuture { - val hostAndPort = customPort ?: driverDSL.portAllocation.nextHostAndPort() - addressMustNotBeBound(driverDSL.executorService, hostAndPort) - return driverDSL.executorService.submit { - val artemisConfig = createRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient, driverDSL.driverDirectory / serverName, hostAndPort) - val server = ActiveMQServerImpl(artemisConfig, SingleUserSecurityManager(rpcUser)) - server.start() - driverDSL.shutdownManager.registerShutdown { - server.stop() - addressMustNotBeBound(driverDSL.executorService, hostAndPort).get() - } - val transportConfiguration = createNettyClientTransportConfiguration(hostAndPort) - startRpcServerWithBrokerRunning( - rpcUser, nodeLegalName, configuration, ops, transportConfiguration, - server.activeMQServerControl - ) - RpcServerHandle(hostAndPort, server.activeMQServerControl) + return startRpcBroker(serverName, rpcUser, maxFileSize, maxBufferedBytesPerClient, customPort).map { broker -> + startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker) } } @@ -401,16 +404,58 @@ data class RPCDriverDSL( return session } + override fun startRpcBroker( + serverName: String, + rpcUser: User, + maxFileSize: Int, + maxBufferedBytesPerClient: Long, + customPort: HostAndPort? + ): ListenableFuture { + val hostAndPort = customPort ?: driverDSL.portAllocation.nextHostAndPort() + addressMustNotBeBound(driverDSL.executorService, hostAndPort) + return driverDSL.executorService.submit { + val artemisConfig = createRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient, driverDSL.driverDirectory / serverName, hostAndPort) + val server = ActiveMQServerImpl(artemisConfig, SingleUserSecurityManager(rpcUser)) + server.start() + driverDSL.shutdownManager.registerShutdown { + server.stop() + addressMustNotBeBound(driverDSL.executorService, hostAndPort).get() + } + RpcBrokerHandle( + hostAndPort = hostAndPort, + clientTransportConfiguration = createNettyClientTransportConfiguration(hostAndPort), + serverControl = server.activeMQServerControl + ) + } + } - private fun startRpcServerWithBrokerRunning( + override fun startInVmRpcBroker(rpcUser: User, maxFileSize: Int, maxBufferedBytesPerClient: Long): ListenableFuture { + return driverDSL.executorService.submit { + val artemisConfig = createInVmRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient) + val server = EmbeddedActiveMQ() + server.setConfiguration(artemisConfig) + server.setSecurityManager(SingleUserSecurityManager(rpcUser)) + server.start() + driverDSL.shutdownManager.registerShutdown { + server.activeMQServer.stop() + server.stop() + } + RpcBrokerHandle( + hostAndPort = null, + clientTransportConfiguration = inVmClientTransportConfiguration, + serverControl = server.activeMQServer.activeMQServerControl + ) + } + } + + override fun startRpcServerWithBrokerRunning( rpcUser: User, nodeLegalName: X500Name, configuration: RPCServerConfiguration, ops: I, - transportConfiguration: TransportConfiguration, - serverControl: ActiveMQServerControl - ) { - val locator = ActiveMQClient.createServerLocatorWithoutHA(transportConfiguration).apply { + brokerHandle: RpcBrokerHandle + ): RpcServerHandle { + val locator = ActiveMQClient.createServerLocatorWithoutHA(brokerHandle.clientTransportConfiguration).apply { minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE } val userService = object : RPCUserService { @@ -430,7 +475,8 @@ data class RPCDriverDSL( rpcServer.close() locator.close() } - rpcServer.start(serverControl) + rpcServer.start(brokerHandle.serverControl) + return RpcServerHandle(brokerHandle, rpcServer) } }