CORDA-3091: Move executor thread management into CordaRPCConnection (#5491)

This commit is contained in:
Ryan Fowler 2019-09-26 17:48:07 +01:00 committed by Shams Asari
parent be6824dac4
commit 6ca4b589e2
10 changed files with 254 additions and 137 deletions

View File

@ -1,8 +1,8 @@
package net.corda.client.jfx.model
import javafx.beans.property.SimpleObjectProperty
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCConnection
import net.corda.core.contracts.ContractState
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
@ -48,7 +48,7 @@ class NodeMonitorModel : AutoCloseable {
val progressTracking: Observable<ProgressTrackingEvent> = progressTrackingSubject
val networkMap: Observable<MapChange> = networkMapSubject
private lateinit var rpc: CordaRPCOps
private lateinit var rpc: CordaRPCConnection
val proxyObservable = SimpleObjectProperty<CordaRPCOps>()
lateinit var notaryIdentities: List<Party>
@ -61,7 +61,7 @@ class NodeMonitorModel : AutoCloseable {
*/
override fun close() {
try {
(rpc as ReconnectingCordaRPCOps).close()
rpc.close()
} catch (e: Exception) {
logger.error("Error closing RPC connection to node", e)
}
@ -72,35 +72,42 @@ class NodeMonitorModel : AutoCloseable {
* TODO provide an unsubscribe mechanism
*/
fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
rpc = ReconnectingCordaRPCOps(nodeHostAndPort, username, password, CordaRPCClientConfiguration.DEFAULT)
proxyObservable.value = rpc
rpc = CordaRPCClient(nodeHostAndPort).start(username, password)
proxyObservable.value = rpc.proxy
// Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
val (statesSnapshot, vaultUpdates) = rpc.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE))
val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ ->
statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED
val (
statesSnapshot,
vaultUpdates
) = rpc.proxy.vaultTrackBy<ContractState>(
QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE)
)
val unconsumedStates =
statesSnapshot.states.filterIndexed { index, _ ->
statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED
}.toSet()
val consumedStates = statesSnapshot.states.toSet() - unconsumedStates
val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates, references = emptySet())
vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject::onNext)
// Transactions
val (transactions, newTransactions) = @Suppress("DEPRECATION") rpc.internalVerifiedTransactionsFeed()
val (transactions, newTransactions) =
@Suppress("DEPRECATION") rpc.proxy.internalVerifiedTransactionsFeed()
newTransactions.startWith(transactions).subscribe(transactionsSubject::onNext)
// SM -> TX mapping
val (smTxMappings, futureSmTxMappings) = rpc.stateMachineRecordedTransactionMappingFeed()
val (smTxMappings, futureSmTxMappings) =
rpc.proxy.stateMachineRecordedTransactionMappingFeed()
futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject::onNext)
// Parties on network
val (parties, futurePartyUpdate) = rpc.networkMapFeed()
val (parties, futurePartyUpdate) = rpc.proxy.networkMapFeed()
futurePartyUpdate.startWith(parties.map(MapChange::Added)).subscribe(networkMapSubject::onNext)
val stateMachines = rpc.stateMachinesSnapshot()
val stateMachines = rpc.proxy.stateMachinesSnapshot()
notaryIdentities = rpc.notaryIdentities()
notaryIdentities = rpc.proxy.notaryIdentities()
// Extract the flow tracking stream
// TODO is there a nicer way of doing this? Stream of streams in general results in code like this...

View File

@ -56,8 +56,8 @@ class CordaRPCClientReconnectionTest {
maxReconnectAttempts = 5
))
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).proxy as ReconnectingCordaRPCOps).use {
val rpcOps = it
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
val networkParameters = rpcOps.networkParameters
val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java)
cashStatesFeed.updates.subscribe { latch.countDown() }
@ -96,8 +96,8 @@ class CordaRPCClientReconnectionTest {
maxReconnectAttempts = 5
))
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).proxy as ReconnectingCordaRPCOps).use {
val rpcOps = it
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java)
val subscription = cashStatesFeed.updates.subscribe { latch.countDown() }
rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get()
@ -113,6 +113,7 @@ class CordaRPCClientReconnectionTest {
latch.await(4, TimeUnit.SECONDS)
}
}
}
}
@ -136,8 +137,8 @@ class CordaRPCClientReconnectionTest {
maxReconnectAttempts = 5
))
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).proxy as ReconnectingCordaRPCOps).use {
val rpcOps = it
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
val rpcOps = it.proxy as ReconnectingCordaRPCOps
val networkParameters = rpcOps.networkParameters
val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java)
cashStatesFeed.updates.subscribe { latch.countDown() }

View File

@ -27,6 +27,9 @@ import net.corda.serialization.internal.amqp.SerializationFactoryCacheKey
import net.corda.serialization.internal.amqp.SerializerFactory
import java.time.Duration
import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
/**
* This class is essentially just a wrapper for an RPCConnection<CordaRPCOps> and can be treated identically.
@ -35,9 +38,10 @@ import java.util.*
*/
class CordaRPCConnection private constructor(
private val oneTimeConnection: RPCConnection<CordaRPCOps>?,
private val observersPool: ExecutorService?,
private val reconnectingCordaRPCOps: ReconnectingCordaRPCOps?
) : RPCConnection<CordaRPCOps> {
internal constructor(connection: RPCConnection<CordaRPCOps>?) : this(connection, null)
internal constructor(connection: RPCConnection<CordaRPCOps>?) : this(connection, null, null)
companion object {
@CordaInternal
@ -46,14 +50,20 @@ class CordaRPCConnection private constructor(
password: String,
addresses: List<NetworkHostAndPort>,
rpcConfiguration: CordaRPCClientConfiguration,
gracefulReconnect: GracefulReconnect
gracefulReconnect: GracefulReconnect,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null
): CordaRPCConnection {
return CordaRPCConnection(null, ReconnectingCordaRPCOps(
val observersPool: ExecutorService = Executors.newCachedThreadPool()
return CordaRPCConnection(null, observersPool, ReconnectingCordaRPCOps(
addresses,
username,
password,
rpcConfiguration,
gracefulReconnect
gracefulReconnect,
sslConfiguration,
classLoader,
observersPool
))
}
}
@ -69,7 +79,18 @@ class CordaRPCConnection private constructor(
override fun forceClose() = actualConnection.forceClose()
override fun close() = actualConnection.close()
override fun close() {
try {
actualConnection.close()
} finally {
observersPool?.apply {
shutdown()
if (!awaitTermination(@Suppress("MagicNumber")30, TimeUnit.SECONDS)) {
shutdownNow()
}
}
}
}
}
/**
@ -322,19 +343,36 @@ class CordaRPCClient private constructor(
) {
@JvmOverloads
constructor(hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT) : this(
hostAndPort = hostAndPort, haAddressPool = emptyList(), configuration = configuration
constructor(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
) : this(
hostAndPort = hostAndPort,
haAddressPool = emptyList(),
configuration = configuration
)
constructor(hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
classLoader: ClassLoader): this(hostAndPort, configuration, null, classLoader = classLoader)
constructor(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
classLoader: ClassLoader
): this(
hostAndPort,
configuration,
null,
classLoader = classLoader
)
constructor(
hostAndPort: NetworkHostAndPort,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null
) : this(hostAndPort = hostAndPort, haAddressPool = emptyList(), sslConfiguration = sslConfiguration, classLoader = classLoader)
) : this(
hostAndPort = hostAndPort,
haAddressPool = emptyList(),
sslConfiguration = sslConfiguration,
classLoader = classLoader
)
@JvmOverloads
constructor(
@ -342,7 +380,13 @@ class CordaRPCClient private constructor(
configuration: CordaRPCClientConfiguration,
sslConfiguration: ClientRpcSslOptions?,
classLoader: ClassLoader? = null
) : this(hostAndPort = hostAndPort, haAddressPool = emptyList(), configuration = configuration, sslConfiguration = sslConfiguration, classLoader = classLoader)
) : this(
hostAndPort = hostAndPort,
haAddressPool = emptyList(),
configuration = configuration,
sslConfiguration = sslConfiguration,
classLoader = classLoader
)
@JvmOverloads
constructor(
@ -350,7 +394,13 @@ class CordaRPCClient private constructor(
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null
) : this(hostAndPort = null, haAddressPool = haAddressPool, configuration = configuration, sslConfiguration = sslConfiguration, classLoader = classLoader)
) : this(
hostAndPort = null,
haAddressPool = haAddressPool,
configuration = configuration,
sslConfiguration = sslConfiguration,
classLoader = classLoader
)
// Here to keep the keep ABI compatibility happy
companion object {}
@ -362,11 +412,23 @@ class CordaRPCClient private constructor(
try {
val cache = Caffeine.newBuilder().maximumSize(128).build<SerializationFactoryCacheKey, SerializerFactory>().asMap()
// If the client has explicitly provided a classloader use this one to scan for custom serializers, otherwise use the current one.
// If the client has explicitly provided a classloader use this one to scan for custom serializers,
// otherwise use the current one.
val serializationClassLoader = this.classLoader ?: this.javaClass.classLoader
val customSerializers = createInstancesOfClassesImplementing(serializationClassLoader, SerializationCustomSerializer::class.java)
val serializationWhitelists = ServiceLoader.load(SerializationWhitelist::class.java, serializationClassLoader).toSet()
AMQPClientSerializationScheme.initialiseSerialization(serializationClassLoader, customSerializers, serializationWhitelists, cache)
val customSerializers = createInstancesOfClassesImplementing(
serializationClassLoader,
SerializationCustomSerializer::class.java
)
val serializationWhitelists = ServiceLoader.load(
SerializationWhitelist::class.java,
serializationClassLoader
).toSet()
AMQPClientSerializationScheme.initialiseSerialization(
serializationClassLoader,
customSerializers,
serializationWhitelists,
cache
)
} catch (e: IllegalStateException) {
// Race e.g. two of these constructed in parallel, ignore.
}
@ -401,7 +463,11 @@ class CordaRPCClient private constructor(
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
@JvmOverloads
fun start(username: String, password: String, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection {
fun start(
username: String,
password: String,
gracefulReconnect: GracefulReconnect? = null
): CordaRPCConnection {
return start(username, password, null, null, gracefulReconnect)
}
@ -418,7 +484,12 @@ class CordaRPCClient private constructor(
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
@JvmOverloads
fun start(username: String, password: String, targetLegalIdentity: CordaX500Name, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection {
fun start(
username: String,
password: String,
targetLegalIdentity: CordaX500Name,
gracefulReconnect: GracefulReconnect? = null
): CordaRPCConnection {
return start(username, password, null, null, targetLegalIdentity, gracefulReconnect)
}
@ -436,7 +507,13 @@ class CordaRPCClient private constructor(
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
@JvmOverloads
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection {
fun start(
username: String,
password: String,
externalTrace: Trace?,
impersonatedActor: Actor?,
gracefulReconnect: GracefulReconnect? = null
): CordaRPCConnection {
return start(username, password, externalTrace, impersonatedActor, null, gracefulReconnect)
}
@ -457,7 +534,14 @@ class CordaRPCClient private constructor(
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
@JvmOverloads
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, targetLegalIdentity: CordaX500Name?, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection {
fun start(
username: String,
password: String,
externalTrace: Trace?,
impersonatedActor: Actor?,
targetLegalIdentity: CordaX500Name?,
gracefulReconnect: GracefulReconnect? = null
): CordaRPCConnection {
val addresses = if (haAddressPool.isEmpty()) {
listOf(hostAndPort!!)
} else {
@ -465,9 +549,23 @@ class CordaRPCClient private constructor(
}
return if (gracefulReconnect != null) {
CordaRPCConnection.createWithGracefulReconnection(username, password, addresses, configuration, gracefulReconnect)
CordaRPCConnection.createWithGracefulReconnection(
username,
password,
addresses,
configuration,
gracefulReconnect,
sslConfiguration
)
} else {
CordaRPCConnection(getRpcClient().start(InternalCordaRPCOps::class.java, username, password, externalTrace, impersonatedActor, targetLegalIdentity))
CordaRPCConnection(getRpcClient().start(
InternalCordaRPCOps::class.java,
username,
password,
externalTrace,
impersonatedActor,
targetLegalIdentity
))
}
}

View File

@ -32,37 +32,24 @@ import java.util.concurrent.TimeUnit
/**
* Wrapper over [CordaRPCOps] that handles exceptions when the node or the connection to the node fail.
*
* All operations are retried on failure, except flow start operations that die before receiving a valid [FlowHandle], in which case a [CouldNotStartFlowException] is thrown.
* All operations are retried on failure, except flow start operations that die before receiving a valid [FlowHandle], in which case a
* [CouldNotStartFlowException] is thrown.
*
* When calling methods that return a [DataFeed] like [CordaRPCOps.vaultTrackBy], the returned [DataFeed.updates] object will no longer
* be a usable [rx.Observable] but an instance of [ReconnectingObservable].
* The caller has to explicitly cast to [ReconnectingObservable] and call [ReconnectingObservable.subscribe]. If used as an [rx.Observable] it will just fail.
* The caller has to explicitly cast to [ReconnectingObservable] and call [ReconnectingObservable.subscribe]. If used as an [rx.Observable]
* it will just fail.
* The returned [DataFeed.snapshot] is the snapshot as it was when the feed was first retrieved.
*
* Note: There is no guarantee that observations will not be lost.
*
* *This class is not a stable API. Any project that wants to use it, must copy and paste it.*
*/
// TODO The executor service is not needed. All we need is a single thread that deals with reconnecting and onto which ReconnectingObservables
// and other things can attach themselves as listeners for reconnect events.
// TODO The executor service is not needed. All we need is a single thread that deals with reconnecting and onto which
// ReconnectingObservables and other things can attach themselves as listeners for reconnect events.
class ReconnectingCordaRPCOps private constructor(
val reconnectingRPCConnection: ReconnectingRPCConnection,
private val observersPool: ExecutorService,
private val userPool: Boolean
) : AutoCloseable, InternalCordaRPCOps by proxy(reconnectingRPCConnection, observersPool) {
// Constructors that mirror CordaRPCClient.
constructor(
nodeHostAndPort: NetworkHostAndPort,
username: String,
password: String,
rpcConfiguration: CordaRPCClientConfiguration,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null,
observersPool: ExecutorService? = null
) : this(
ReconnectingRPCConnection(listOf(nodeHostAndPort), username, password, rpcConfiguration, sslConfiguration, classLoader),
observersPool ?: Executors.newCachedThreadPool(),
observersPool != null)
val reconnectingRPCConnection: ReconnectingRPCConnection
) : InternalCordaRPCOps by proxy(reconnectingRPCConnection) {
constructor(
nodeHostAndPorts: List<NetworkHostAndPort>,
username: String,
@ -71,18 +58,23 @@ class ReconnectingCordaRPCOps private constructor(
gracefulReconnect: GracefulReconnect? = null,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null,
observersPool: ExecutorService? = null
) : this(
ReconnectingRPCConnection(nodeHostAndPorts, username, password, rpcConfiguration, sslConfiguration, classLoader, gracefulReconnect),
observersPool ?: Executors.newCachedThreadPool(),
observersPool != null)
observersPool: ExecutorService
) : this(ReconnectingRPCConnection(
nodeHostAndPorts,
username,
password,
rpcConfiguration,
sslConfiguration,
classLoader,
gracefulReconnect,
observersPool))
private companion object {
private val log = contextLogger()
private fun proxy(reconnectingRPCConnection: ReconnectingRPCConnection, observersPool: ExecutorService): InternalCordaRPCOps {
private fun proxy(reconnectingRPCConnection: ReconnectingRPCConnection): InternalCordaRPCOps {
return Proxy.newProxyInstance(
this::class.java.classLoader,
arrayOf(InternalCordaRPCOps::class.java),
ErrorInterceptingHandler(reconnectingRPCConnection, observersPool)) as InternalCordaRPCOps
ErrorInterceptingHandler(reconnectingRPCConnection)) as InternalCordaRPCOps
}
}
private val retryFlowsPool = Executors.newScheduledThreadPool(1)
@ -125,7 +117,8 @@ class ReconnectingCordaRPCOps private constructor(
val rpcConfiguration: CordaRPCClientConfiguration,
val sslConfiguration: ClientRpcSslOptions? = null,
val classLoader: ClassLoader?,
val gracefulReconnect: GracefulReconnect? = null
val gracefulReconnect: GracefulReconnect? = null,
val observersPool: ExecutorService
) : RPCConnection<CordaRPCOps> {
private var currentRPCConnection: CordaRPCConnection? = null
enum class CurrentState {
@ -178,12 +171,18 @@ class ReconnectingCordaRPCOps private constructor(
return currentRPCConnection!!
}
private tailrec fun establishConnectionWithRetry(retryInterval: Duration = 1.seconds, roundRobinIndex: Int = 0): CordaRPCConnection {
private tailrec fun establishConnectionWithRetry(
retryInterval: Duration = 1.seconds,
roundRobinIndex: Int = 0
): CordaRPCConnection {
val attemptedAddress = nodeHostAndPorts[roundRobinIndex]
log.info("Connecting to: $attemptedAddress")
try {
return CordaRPCClient(
attemptedAddress, rpcConfiguration.copy(connectionMaxRetryInterval = retryInterval, maxReconnectAttempts = 1), sslConfiguration, classLoader
attemptedAddress,
rpcConfiguration.copy(connectionMaxRetryInterval = retryInterval, maxReconnectAttempts = 1),
sslConfiguration,
classLoader
).start(username, password).also {
// Check connection is truly operational before returning it.
require(it.proxy.nodeInfo().legalIdentitiesAndCerts.isNotEmpty()) {
@ -240,7 +239,7 @@ class ReconnectingCordaRPCOps private constructor(
currentRPCConnection?.close()
}
}
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection, val observersPool: ExecutorService) : InvocationHandler {
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler {
private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")
private fun checkIfIsStartFlow(method: Method, e: InvocationTargetException) {
@ -290,7 +289,7 @@ class ReconnectingCordaRPCOps private constructor(
DataFeed::class.java -> {
// Intercept the data feed methods and return a ReconnectingObservable instance
val initialFeed: DataFeed<Any, Any?> = uncheckedCast(doInvoke(method, args))
val observable = ReconnectingObservable(reconnectingRPCConnection, observersPool, initialFeed) {
val observable = ReconnectingObservable(reconnectingRPCConnection, initialFeed) {
// This handles reconnecting and creates new feeds.
uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args))
}
@ -302,8 +301,7 @@ class ReconnectingCordaRPCOps private constructor(
}
}
override fun close() {
if (!userPool) observersPool.shutdown()
fun close() {
retryFlowsPool.shutdown()
reconnectingRPCConnection.forceClose()
}

View File

@ -4,21 +4,18 @@ import net.corda.core.messaging.DataFeed
import rx.Observable
import rx.Subscriber
import rx.Subscription
import java.util.concurrent.ExecutorService
import java.util.concurrent.atomic.AtomicReference
class ReconnectingObservable<T> private constructor(subscriber: ReconnectingSubscriber<T>) : Observable<T>(subscriber) {
constructor(
reconnectingRPCConnection: ReconnectingCordaRPCOps.ReconnectingRPCConnection,
executor: ExecutorService,
initialDataFeed: DataFeed<*, T>,
createDataFeed: () -> DataFeed<*, T>
) : this(ReconnectingSubscriber(reconnectingRPCConnection, executor, initialDataFeed, createDataFeed))
) : this(ReconnectingSubscriber(reconnectingRPCConnection, initialDataFeed, createDataFeed))
private class ReconnectingSubscriber<T>(
private val reconnectingRPCConnection: ReconnectingCordaRPCOps.ReconnectingRPCConnection,
private val executor: ExecutorService,
private val initialDataFeed: DataFeed<*, T>,
private val createDataFeed: () -> DataFeed<*, T>
) : OnSubscribe<T>, Subscription {
@ -59,7 +56,7 @@ class ReconnectingObservable<T> private constructor(subscriber: ReconnectingSubs
private fun scheduleResubscribe(error: Throwable) {
if (unsubscribed) return
executor.execute {
reconnectingRPCConnection.observersPool.execute {
if (unsubscribed) return@execute
reconnectingRPCConnection.reconnectOnError(error)
val newDataFeed = createDataFeed()

File diff suppressed because one or more lines are too long

View File

@ -133,7 +133,8 @@ class RpcReconnectTests {
}
val reconnect = GracefulReconnect(onDisconnect = { numDisconnects++ }, onReconnect = onReconnect)
val client = CordaRPCClient(addressesForRpc)
val bankAReconnectingRpc = client.start(demoUser.username, demoUser.password, gracefulReconnect = reconnect).proxy as ReconnectingCordaRPCOps
val bankAReconnectingRPCConnection = client.start(demoUser.username, demoUser.password, gracefulReconnect = reconnect)
val bankAReconnectingRpc = bankAReconnectingRPCConnection.proxy as ReconnectingCordaRPCOps
// DOCEND rpcReconnectingRPC
// Observe the vault and collect the observations.
@ -330,7 +331,7 @@ class RpcReconnectTests {
// Stop the observers.
vaultSubscription.unsubscribe()
stateMachineSubscription.unsubscribe()
bankAReconnectingRpc.close()
bankAReconnectingRPCConnection.close()
}
proxy.close()

View File

@ -1,8 +1,8 @@
package net.corda.bank.api
import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.GracefulReconnect
import net.corda.core.messaging.startFlow
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
@ -35,7 +35,9 @@ object BankOfCordaClientApi {
*
* @return a payment transaction (following successful issuance of cash to self).
*/
fun requestRPCIssue(rpcAddress: NetworkHostAndPort, params: IssueRequestParams): SignedTransaction = requestRPCIssueHA(listOf(rpcAddress), params)
fun requestRPCIssue(rpcAddress: NetworkHostAndPort, params: IssueRequestParams): SignedTransaction {
return requestRPCIssueHA(listOf(rpcAddress), params)
}
/**
* RPC API
@ -44,20 +46,28 @@ object BankOfCordaClientApi {
*/
fun requestRPCIssueHA(availableRpcServers: List<NetworkHostAndPort>, params: IssueRequestParams): SignedTransaction {
// TODO: privileged security controls required
ReconnectingCordaRPCOps(availableRpcServers, BOC_RPC_USER, BOC_RPC_PWD, CordaRPCClientConfiguration.DEFAULT).use { rpc->
rpc.waitUntilNetworkReady().getOrThrow()
CordaRPCClient(availableRpcServers)
.start(BOC_RPC_USER, BOC_RPC_PWD, gracefulReconnect = GracefulReconnect()).use { rpc->
rpc.proxy.waitUntilNetworkReady().getOrThrow()
// Resolve parties via RPC
val issueToParty = rpc.wellKnownPartyFromX500Name(params.issueToPartyName)
val issueToParty = rpc.proxy.wellKnownPartyFromX500Name(params.issueToPartyName)
?: throw IllegalStateException("Unable to locate ${params.issueToPartyName} in Network Map Service")
val notaryLegalIdentity = rpc.notaryIdentities().firstOrNull { it.name == params.notaryName }
val notaryLegalIdentity = rpc.proxy.notaryIdentities().firstOrNull { it.name == params.notaryName }
?: throw IllegalStateException("Couldn't locate notary ${params.notaryName} in NetworkMapCache")
val anonymous = true
val issuerBankPartyRef = OpaqueBytes.of(params.issuerBankPartyRef.toByte())
logger.info("${rpc.nodeInfo()} issuing ${params.amount} to transfer to $issueToParty ...")
return rpc.startFlow(::CashIssueAndPaymentFlow, params.amount, issuerBankPartyRef, issueToParty, anonymous, notaryLegalIdentity)
logger.info("${rpc.proxy.nodeInfo()} issuing ${params.amount} to transfer to $issueToParty ...")
return rpc.proxy.startFlow(
::CashIssueAndPaymentFlow,
params.amount,
issuerBankPartyRef,
issueToParty,
anonymous,
notaryLegalIdentity
)
.returnValue.getOrThrow().stx
}
}

View File

@ -3,8 +3,9 @@ package net.corda.webserver.internal
import com.google.common.html.HtmlEscapers.htmlEscaper
import io.netty.channel.unix.Errors
import net.corda.client.jackson.JacksonSupport
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.GracefulReconnect
import net.corda.core.internal.errors.AddressBindingException
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.contextLogger
@ -46,8 +47,12 @@ class NodeWebServer(val config: WebServerConfig) {
}
fun run() {
while (server.isRunning) {
Thread.sleep(100) // TODO: Redesign
try {
while (server.isRunning) {
Thread.sleep(100) // TODO: Redesign
}
} finally {
rpc.close()
}
}
@ -75,7 +80,11 @@ class NodeWebServer(val config: WebServerConfig) {
sslContextFactory.setIncludeProtocols("TLSv1.2")
sslContextFactory.setExcludeCipherSuites(".*NULL.*", ".*RC4.*", ".*MD5.*", ".*DES.*", ".*DSS.*")
sslContextFactory.setIncludeCipherSuites(".*AES.*GCM.*")
val sslConnector = ServerConnector(server, SslConnectionFactory(sslContextFactory, "http/1.1"), HttpConnectionFactory(httpsConfiguration))
val sslConnector = ServerConnector(
server,
SslConnectionFactory(sslContextFactory, "http/1.1"),
HttpConnectionFactory(httpsConfiguration)
)
sslConnector.port = address.port
sslConnector
} else {
@ -175,9 +184,21 @@ class NodeWebServer(val config: WebServerConfig) {
}
}
private fun reconnectingCordaRPCOps() = ReconnectingCordaRPCOps(config.rpcAddress, config.runAs.username , config.runAs.password, CordaRPCClientConfiguration.DEFAULT, null, javaClass.classLoader)
private lateinit var rpc: CordaRPCConnection
private fun reconnectingCordaRPCOps(): CordaRPCOps {
rpc = CordaRPCClient(config.rpcAddress, null, javaClass.classLoader)
.start(
config.runAs.username,
config.runAs.password,
GracefulReconnect()
)
return rpc.proxy
}
/** Fetch WebServerPluginRegistry classes registered in META-INF/services/net.corda.webserver.services.WebServerPluginRegistry files that exist in the classpath */
/**
* Fetch WebServerPluginRegistry classes registered in META-INF/services/net.corda.webserver.services.WebServerPluginRegistry
* files that exist in the classpath
*/
val pluginRegistries: List<WebServerPluginRegistry> by lazy {
ServiceLoader.load(WebServerPluginRegistry::class.java).toList()
}

View File

@ -11,8 +11,8 @@ import net.corda.client.jackson.JacksonSupport
import net.corda.client.jackson.StringToMethodCallParser
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.PermissionException
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier
@ -22,7 +22,6 @@ import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.internal.packageName_
import net.corda.core.messaging.*
import net.corda.tools.shell.utlities.ANSIProgressRenderer
import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer
@ -91,21 +90,24 @@ object InteractiveShell {
fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) {
rpcOps = { username: String, password: String ->
if (standalone) {
ReconnectingCordaRPCOps(configuration.hostAndPort, username, password, CordaRPCClientConfiguration.DEFAULT, configuration.ssl, classLoader).also {
rpcConn = it
}
val connection = if (standalone) {
CordaRPCClient(
configuration.hostAndPort,
configuration.ssl,
classLoader
).start(username, password, gracefulReconnect = GracefulReconnect())
} else {
val client = CordaRPCClient(hostAndPort = configuration.hostAndPort,
CordaRPCClient(
hostAndPort = configuration.hostAndPort,
configuration = CordaRPCClientConfiguration.DEFAULT.copy(
maxReconnectAttempts = 1
),
sslConfiguration = configuration.ssl,
classLoader = classLoader)
val connection = client.start(username, password)
rpcConn = connection
connection.proxy as InternalCordaRPCOps
classLoader = classLoader
).start(username, password)
}
rpcConn = connection
connection.proxy as InternalCordaRPCOps
}
_startShell(configuration, classLoader)
}