mirror of
https://github.com/corda/corda.git
synced 2024-12-19 04:57:58 +00:00
rpc: Fix close/send deadlock and premature shutdown on empty observable, add test
This commit is contained in:
parent
ccd1c39492
commit
cb67fd504b
@ -1,10 +1,19 @@
|
||||
package net.corda.client
|
||||
|
||||
import net.corda.core.contracts.DOLLARS
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.random63BitValue
|
||||
import net.corda.core.serialization.OpaqueBytes
|
||||
import net.corda.node.driver.NodeInfoAndConfig
|
||||
import net.corda.node.driver.driver
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.config.configureTestSSL
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.toHostAndPort
|
||||
import net.corda.node.services.messaging.startProtocol
|
||||
import net.corda.node.services.startProtocolPermission
|
||||
import net.corda.node.services.transactions.ValidatingNotaryService
|
||||
import net.corda.protocols.CashCommand
|
||||
import net.corda.protocols.CashProtocol
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||
import org.junit.After
|
||||
@ -15,17 +24,18 @@ import kotlin.concurrent.thread
|
||||
|
||||
class CordaRPCClientTest {
|
||||
|
||||
private val rpcUser = User("user1", "test", permissions = emptySet())
|
||||
private val rpcUser = User("user1", "test", permissions = setOf(startProtocolPermission<CashProtocol>()))
|
||||
private val stopDriver = CountDownLatch(1)
|
||||
private var driverThread: Thread? = null
|
||||
private lateinit var client: CordaRPCClient
|
||||
private lateinit var driverInfo: NodeInfoAndConfig
|
||||
|
||||
@Before
|
||||
fun start() {
|
||||
val driverStarted = CountDownLatch(1)
|
||||
driverThread = thread {
|
||||
driver {
|
||||
val driverInfo = startNode(rpcUsers = listOf(rpcUser)).get()
|
||||
driver(isDebug = true) {
|
||||
driverInfo = startNode(rpcUsers = listOf(rpcUser), advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type))).get()
|
||||
client = CordaRPCClient(toHostAndPort(driverInfo.nodeInfo.address), configureTestSSL())
|
||||
driverStarted.countDown()
|
||||
stopDriver.await()
|
||||
@ -59,4 +69,18 @@ class CordaRPCClientTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `indefinite block bug`() {
|
||||
println("Starting client")
|
||||
client.start(rpcUser.username, rpcUser.password)
|
||||
println("Creating proxy")
|
||||
val proxy = client.proxy()
|
||||
println("Starting protocol")
|
||||
val protocolHandle = proxy.startProtocol(::CashProtocol, CashCommand.IssueCash(20.DOLLARS, OpaqueBytes.of(0), driverInfo.nodeInfo.legalIdentity, driverInfo.nodeInfo.legalIdentity))
|
||||
println("Started protocol, waiting on result")
|
||||
protocolHandle.progress.subscribe {
|
||||
println("PROGRESS $it")
|
||||
}
|
||||
println("Result: ${protocolHandle.returnValue.toBlocking().first()}")
|
||||
}
|
||||
}
|
||||
|
@ -88,16 +88,14 @@ class CordaRPCClientImpl(private val session: ClientSession,
|
||||
private val rpcLocation: Throwable) : Serializer<Observable<Any>>() {
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<Observable<Any>>): Observable<Any> {
|
||||
val handle = input.readInt(true)
|
||||
return sessionLock.withLock {
|
||||
var ob = addressToQueueObservables.getIfPresent(qName)
|
||||
if (ob == null) {
|
||||
ob = QueuedObservable(qName, rpcName, rpcLocation, this)
|
||||
addressToQueueObservables.put(qName, ob)
|
||||
val ob = sessionLock.withLock {
|
||||
addressToQueueObservables.getIfPresent(qName) ?: QueuedObservable(qName, rpcName, rpcLocation, this).apply {
|
||||
addressToQueueObservables.put(qName, this)
|
||||
}
|
||||
val result = ob.getForHandle(handle)
|
||||
rpcLog.trace { "Deserializing and connecting a new observable for $rpcName on $qName: $result" }
|
||||
result
|
||||
}
|
||||
val result = ob.getForHandle(handle)
|
||||
rpcLog.debug { "Deserializing and connecting a new observable for $rpcName on $qName: $result" }
|
||||
return result
|
||||
}
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, `object`: Observable<Any>) {
|
||||
@ -142,7 +140,7 @@ class CordaRPCClientImpl(private val session: ClientSession,
|
||||
|
||||
// All invoked methods on the proxy end up here.
|
||||
val location = Throwable()
|
||||
rpcLog.trace {
|
||||
rpcLog.debug {
|
||||
val argStr = args?.joinToString() ?: ""
|
||||
"-> RPC -> ${method.name}($argStr): ${method.returnType}"
|
||||
}
|
||||
@ -152,7 +150,7 @@ class CordaRPCClientImpl(private val session: ClientSession,
|
||||
// sendRequest may return a reconfigured Kryo if the method returns observables.
|
||||
val kryo: Kryo = sendRequest(args, location, method) ?: createRPCKryo()
|
||||
val next = receiveResponse(kryo, method, timeout)
|
||||
rpcLog.trace { "<- RPC <- ${method.name} = $next" }
|
||||
rpcLog.debug { "<- RPC <- ${method.name} = $next" }
|
||||
return unwrapOrThrow(next)
|
||||
}
|
||||
|
||||
@ -257,7 +255,18 @@ class CordaRPCClientImpl(private val session: ClientSession,
|
||||
@Synchronized
|
||||
fun getForHandle(handle: Int): Observable<Any> {
|
||||
return observables.getOrPut(handle) {
|
||||
rootShared.filter { it.forHandle == handle }.map { it.what }.dematerialize<Any>().bufferUntilSubscribed().share()
|
||||
/**
|
||||
* Note that the order of bufferUntilSubscribed() -> dematerialize() is very important here.
|
||||
*
|
||||
* In particular doing it the other way around may result in the following edge case:
|
||||
* The RPC returns two (or more) Observables. The first Observable unsubscribes *during serialisation*,
|
||||
* before the second one is hit, causing the [rootShared] to unsubscribe and consequently closing
|
||||
* the underlying artemis queue, even though the second Observable was not even registered.
|
||||
*
|
||||
* The buffer -> dematerialize order ensures that the Observable may not unsubscribe until the caller
|
||||
* subscribes, which must be after full deserialisation and registering of all top level Observables.
|
||||
*/
|
||||
rootShared.filter { it.forHandle == handle }.map { it.what }.bufferUntilSubscribed().dematerialize<Any>().share()
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user