NOTICK - Add tests for handling of user errors in reconnecting observables (#5932)

* Add tests for handling of user errors in reconnecting observables

* detekt
This commit is contained in:
Dimos Raptis 2020-02-14 09:58:32 +00:00 committed by GitHub
parent da7a5cce4a
commit 70e8e69a87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 7 deletions

View File

@ -26,6 +26,7 @@ import net.corda.testing.node.internal.rpcDriver
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import java.lang.RuntimeException
import java.lang.Thread.sleep
import java.time.Duration
import java.util.concurrent.CountDownLatch
@ -163,6 +164,47 @@ class CordaRPCClientReconnectionTest {
}
}
@Test(timeout=300_000)
fun `when user code throws an error on a reconnecting observable, then onError is invoked and observable is unsubscribed successfully`() {
driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS)) {
val normalLatch = CountDownLatch(1)
val errorLatch = CountDownLatch(1)
var observedEvents = 0
fun startNode(address: NetworkHostAndPort): NodeHandle {
return startNode(
providedName = CHARLIE_NAME,
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
customOverrides = mapOf("rpcSettings.address" to address.toString())
).getOrThrow()
}
val addresses = listOf(NetworkHostAndPort("localhost", portAllocator.nextPort()), NetworkHostAndPort("localhost", portAllocator.nextPort()))
startNode(addresses[0])
val client = CordaRPCClient(addresses)
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java)
val subscription = cashStatesFeed.updates.subscribe ({
normalLatch.countDown()
observedEvents++
throw RuntimeException()
}, {
errorLatch.countDown()
})
rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get()
rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get()
assertTrue { normalLatch.await(2, TimeUnit.SECONDS) }
assertTrue { errorLatch.await(2, TimeUnit.SECONDS) }
assertThat(subscription.isUnsubscribed).isTrue()
assertThat(observedEvents).isEqualTo(1)
}
}
}
@Test(timeout=300_000)
fun `an RPC call fails, when the maximum number of attempts is exceeded`() {
driver(DriverParameters(cordappsForAllNodes = emptyList())) {

View File

@ -1,5 +1,6 @@
package net.corda.client.rpc.internal
import net.corda.client.rpc.ConnectionFailureException
import net.corda.core.messaging.DataFeed
import rx.Observable
import rx.Subscriber
@ -54,16 +55,30 @@ class ReconnectingObservable<T> private constructor(subscriber: ReconnectingSubs
}
}
/**
* Depending on the type of error, the reaction is different:
* - If the error is coming from a connection disruption, we establish a new connection and re-wire the observable
* without letting the client notice at all.
* - In any other case, we let the error propagate to the client's observable. Both of the observables
* (this one and the client's one) will be automatically unsubscribed, since that's the semantics of onError.
*/
private fun scheduleResubscribe(error: Throwable) {
if (unsubscribed) return
reconnectingRPCConnection.observersPool.execute {
if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute
reconnectingRPCConnection.reconnectOnError(error)
// It can take a while to reconnect so we might find that we've shutdown in in the meantime
if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute
val newDataFeed = createDataFeed()
subscribeImmediately(newDataFeed)
if (error is ConnectionFailureException) {
reconnectingRPCConnection.observersPool.execute {
if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute
reconnectingRPCConnection.reconnectOnError(error)
// It can take a while to reconnect so we might find that we've shutdown in in the meantime
if (unsubscribed || reconnectingRPCConnection.isClosed()) return@execute
val newDataFeed = createDataFeed()
subscribeImmediately(newDataFeed)
}
} else {
val subscriber = checkNotNull(this.subscriber.get())
subscriber.onError(error)
}
}
}
}

View File

@ -1656,6 +1656,7 @@
<ID>TooGenericExceptionThrown:ClassLoadingUtilsTest.kt$ClassLoadingUtilsTest$throw RuntimeException()</ID>
<ID>TooGenericExceptionThrown:CommandParsers.kt$AzureParser.RegionConverter$throw Error("Unknown azure region: $value")</ID>
<ID>TooGenericExceptionThrown:ContractHierarchyTest.kt$ContractHierarchyTest.IndirectContractParent$throw RuntimeException("Boom!")</ID>
<ID>TooGenericExceptionThrown:CordaRPCClientReconnectionTest.kt$CordaRPCClientReconnectionTest$throw RuntimeException()</ID>
<ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated exit of ${request.amount} from $issuer, however there is no cash to exit!" )</ID>
<ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from $issuer, " + "however they only have $issuerQuantity!" )</ID>
<ID>TooGenericExceptionThrown:CrossCashTest.kt$throw Exception( "Generated payment of ${request.amount} from ${node.mainIdentity}, " + "however there is no cash from $issuer!" )</ID>