Fixed some problem I discovered while working on another story (#3901)

* - Fixed some problems with error handling for Observables.
- Eliminated incorrect double `stop()` call for RpcBroker.
- Added `Schedulers.shutdown()` call in `stop()` implementation for Node and Driver, to avoid stuck processes when observable pipelines go wrong.

* Fixed a missing import.

* Removed `Schedulers.shutdown()` for now.

* Fixed an issue with `pendingFlowsCount()` function.
This commit is contained in:
Michele Sollecito 2018-09-06 13:11:41 +01:00 committed by GitHub
parent ca9649ec0f
commit 584387d5ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 6 additions and 9 deletions

View File

@ -432,7 +432,7 @@ fun CordaRPCOps.pendingFlowsCount(): DataFeed<Int, Pair<Int, Int>> {
}
}
}.subscribe()
if (completedFlowsCount == 0) {
if (pendingFlowsCount == 0) {
updates.onCompleted()
}
return DataFeed(pendingFlowsCount, updates)

View File

@ -272,7 +272,6 @@ open class Node(configuration: NodeConfiguration,
ArtemisRpcBroker.withoutSsl(configuration.p2pSslOptions, this.address, adminAddress, securityManager, MAX_RPC_MESSAGE_SIZE, jmxMonitoringHttpPort != null, rpcBrokerDirectory, shouldStartLocalShell())
}
}
rpcBroker!!.closeOnStop()
rpcBroker!!.addresses
}
}

View File

@ -310,12 +310,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
return
}
eventsSubscription = p2pConsumer!!.messages
.doOnError { error -> throw error }
.doOnNext { message -> deliver(message) }
// this `run()` method is semantically meant to block until the message consumption runs, hence the latch here
.doOnCompleted(latch::countDown)
.doOnError { error -> throw error }
.subscribe()
.subscribe({ message -> deliver(message) }, { error -> throw error })
p2pConsumer!!
}
consumer.start()

View File

@ -878,7 +878,7 @@ private class NetworkVisibilityController {
val (snapshot, updates) = rpc.networkMapFeed()
visibleNodeCount = snapshot.size
checkIfAllVisible()
subscription = updates.subscribe {
subscription = updates.subscribe({
when (it) {
is NetworkMapCache.MapChange.Added -> {
visibleNodeCount++
@ -892,7 +892,9 @@ private class NetworkVisibilityController {
// Nothing to do here but better being exhaustive.
}
}
}
}, { _ ->
// Nothing to do on errors here.
})
return future
}
@ -963,7 +965,6 @@ fun <DI : DriverDSL, D : InternalDriverDSL, A> genericDriver(
DriverDSLImpl.log.error("Driver shutting down because of exception", exception)
throw exception
} finally {
driverDsl.shutdown()
shutdownHook.cancel()
serializationEnv.unset()
}