From cb67fd504b72a9e47aca70ab05d5f4eccac52b41 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 22 Nov 2016 15:25:44 +0000 Subject: [PATCH] rpc: Fix close/send deadlock and premature shutdown on empty observable, add test --- .../net/corda/client/CordaRPCClientTest.kt | 30 ++++++++++++++++-- .../corda/client/impl/CordaRPCClientImpl.kt | 31 ++++++++++++------- 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt b/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt index 4794be099e..0e8bf05453 100644 --- a/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt +++ b/client/src/integration-test/kotlin/net/corda/client/CordaRPCClientTest.kt @@ -1,10 +1,19 @@ package net.corda.client +import net.corda.core.contracts.DOLLARS +import net.corda.core.node.services.ServiceInfo import net.corda.core.random63BitValue +import net.corda.core.serialization.OpaqueBytes +import net.corda.node.driver.NodeInfoAndConfig import net.corda.node.driver.driver import net.corda.node.services.User import net.corda.node.services.config.configureTestSSL import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.toHostAndPort +import net.corda.node.services.messaging.startProtocol +import net.corda.node.services.startProtocolPermission +import net.corda.node.services.transactions.ValidatingNotaryService +import net.corda.protocols.CashCommand +import net.corda.protocols.CashProtocol import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.After @@ -15,17 +24,18 @@ import kotlin.concurrent.thread class CordaRPCClientTest { - private val rpcUser = User("user1", "test", permissions = emptySet()) + private val rpcUser = User("user1", "test", permissions = setOf(startProtocolPermission())) private val stopDriver = CountDownLatch(1) private var driverThread: Thread? = null private lateinit var client: CordaRPCClient + private lateinit var driverInfo: NodeInfoAndConfig @Before fun start() { val driverStarted = CountDownLatch(1) driverThread = thread { - driver { - val driverInfo = startNode(rpcUsers = listOf(rpcUser)).get() + driver(isDebug = true) { + driverInfo = startNode(rpcUsers = listOf(rpcUser), advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type))).get() client = CordaRPCClient(toHostAndPort(driverInfo.nodeInfo.address), configureTestSSL()) driverStarted.countDown() stopDriver.await() @@ -59,4 +69,18 @@ class CordaRPCClientTest { } } + @Test + fun `indefinite block bug`() { + println("Starting client") + client.start(rpcUser.username, rpcUser.password) + println("Creating proxy") + val proxy = client.proxy() + println("Starting protocol") + val protocolHandle = proxy.startProtocol(::CashProtocol, CashCommand.IssueCash(20.DOLLARS, OpaqueBytes.of(0), driverInfo.nodeInfo.legalIdentity, driverInfo.nodeInfo.legalIdentity)) + println("Started protocol, waiting on result") + protocolHandle.progress.subscribe { + println("PROGRESS $it") + } + println("Result: ${protocolHandle.returnValue.toBlocking().first()}") + } } diff --git a/client/src/main/kotlin/net/corda/client/impl/CordaRPCClientImpl.kt b/client/src/main/kotlin/net/corda/client/impl/CordaRPCClientImpl.kt index 396e5d293b..4d9c917b7c 100644 --- a/client/src/main/kotlin/net/corda/client/impl/CordaRPCClientImpl.kt +++ b/client/src/main/kotlin/net/corda/client/impl/CordaRPCClientImpl.kt @@ -88,16 +88,14 @@ class CordaRPCClientImpl(private val session: ClientSession, private val rpcLocation: Throwable) : Serializer>() { override fun read(kryo: Kryo, input: Input, type: Class>): Observable { val handle = input.readInt(true) - return sessionLock.withLock { - var ob = addressToQueueObservables.getIfPresent(qName) - if (ob == null) { - ob = QueuedObservable(qName, rpcName, rpcLocation, this) - addressToQueueObservables.put(qName, ob) + val ob = sessionLock.withLock { + addressToQueueObservables.getIfPresent(qName) ?: QueuedObservable(qName, rpcName, rpcLocation, this).apply { + addressToQueueObservables.put(qName, this) } - val result = ob.getForHandle(handle) - rpcLog.trace { "Deserializing and connecting a new observable for $rpcName on $qName: $result" } - result } + val result = ob.getForHandle(handle) + rpcLog.debug { "Deserializing and connecting a new observable for $rpcName on $qName: $result" } + return result } override fun write(kryo: Kryo, output: Output, `object`: Observable) { @@ -142,7 +140,7 @@ class CordaRPCClientImpl(private val session: ClientSession, // All invoked methods on the proxy end up here. val location = Throwable() - rpcLog.trace { + rpcLog.debug { val argStr = args?.joinToString() ?: "" "-> RPC -> ${method.name}($argStr): ${method.returnType}" } @@ -152,7 +150,7 @@ class CordaRPCClientImpl(private val session: ClientSession, // sendRequest may return a reconfigured Kryo if the method returns observables. val kryo: Kryo = sendRequest(args, location, method) ?: createRPCKryo() val next = receiveResponse(kryo, method, timeout) - rpcLog.trace { "<- RPC <- ${method.name} = $next" } + rpcLog.debug { "<- RPC <- ${method.name} = $next" } return unwrapOrThrow(next) } @@ -257,7 +255,18 @@ class CordaRPCClientImpl(private val session: ClientSession, @Synchronized fun getForHandle(handle: Int): Observable { return observables.getOrPut(handle) { - rootShared.filter { it.forHandle == handle }.map { it.what }.dematerialize().bufferUntilSubscribed().share() + /** + * Note that the order of bufferUntilSubscribed() -> dematerialize() is very important here. + * + * In particular doing it the other way around may result in the following edge case: + * The RPC returns two (or more) Observables. The first Observable unsubscribes *during serialisation*, + * before the second one is hit, causing the [rootShared] to unsubscribe and consequently closing + * the underlying artemis queue, even though the second Observable was not even registered. + * + * The buffer -> dematerialize order ensures that the Observable may not unsubscribe until the caller + * subscribes, which must be after full deserialisation and registering of all top level Observables. + */ + rootShared.filter { it.forHandle == handle }.map { it.what }.bufferUntilSubscribed().dematerialize().share() } }