Merge remote-tracking branch 'remotes/open/master' into merges/march-13-14-55

# Conflicts:
#	build.gradle
#	docs/source/_static/versions
#	docs/source/running-a-node.rst
This commit is contained in:
sollecitom
2018-03-13 15:02:58 +00:00
9 changed files with 91 additions and 30 deletions

View File

@ -326,6 +326,46 @@ class RPCStabilityTests {
}
}
interface NoOps : RPCOps {
fun subscribe(): Observable<Nothing>
}
@Test
fun `observables error when connection breaks`() {
rpcDriver {
val ops = object : NoOps {
override val protocolVersion = 0
override fun subscribe(): Observable<Nothing> {
return PublishSubject.create<Nothing>()
}
}
val serverFollower = shutdownManager.follower()
val serverPort = startRpcServer<NoOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1)
val clientFollower = shutdownManager.follower()
val client = startRpcClient<NoOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow()
var terminateHandlerCalled = false
var errorHandlerCalled = false
val subscription = client.subscribe()
.doOnTerminate{ terminateHandlerCalled = true }
.doOnError { errorHandlerCalled = true }
.subscribe()
serverFollower.shutdown()
Thread.sleep(100)
assertTrue(terminateHandlerCalled)
assertTrue(errorHandlerCalled)
assertTrue(subscription.isUnsubscribed)
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
}
}
interface ThreadOps : RPCOps {
fun sendMessage(id: Int, msgNo: Int): String
}