diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt index 65e026bd2f..0e6ab2a048 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt @@ -1,8 +1,8 @@ package net.corda.client.jfx.model import javafx.beans.property.SimpleObjectProperty -import net.corda.client.rpc.CordaRPCClientConfiguration -import net.corda.client.rpc.internal.ReconnectingCordaRPCOps +import net.corda.client.rpc.CordaRPCClient +import net.corda.client.rpc.CordaRPCConnection import net.corda.core.contracts.ContractState import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party @@ -48,7 +48,7 @@ class NodeMonitorModel : AutoCloseable { val progressTracking: Observable = progressTrackingSubject val networkMap: Observable = networkMapSubject - private lateinit var rpc: CordaRPCOps + private lateinit var rpc: CordaRPCConnection val proxyObservable = SimpleObjectProperty() lateinit var notaryIdentities: List @@ -61,7 +61,7 @@ class NodeMonitorModel : AutoCloseable { */ override fun close() { try { - (rpc as ReconnectingCordaRPCOps).close() + rpc.close() } catch (e: Exception) { logger.error("Error closing RPC connection to node", e) } @@ -72,35 +72,42 @@ class NodeMonitorModel : AutoCloseable { * TODO provide an unsubscribe mechanism */ fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) { - rpc = ReconnectingCordaRPCOps(nodeHostAndPort, username, password, CordaRPCClientConfiguration.DEFAULT) - - proxyObservable.value = rpc + rpc = CordaRPCClient(nodeHostAndPort).start(username, password) + proxyObservable.value = rpc.proxy // Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates - val (statesSnapshot, vaultUpdates) = rpc.vaultTrackBy(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL), - PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE)) - val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ -> - statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED + val ( + statesSnapshot, + vaultUpdates + ) = rpc.proxy.vaultTrackBy( + QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL), + PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE) + ) + val unconsumedStates = + statesSnapshot.states.filterIndexed { index, _ -> + statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED }.toSet() val consumedStates = statesSnapshot.states.toSet() - unconsumedStates val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates, references = emptySet()) vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject::onNext) // Transactions - val (transactions, newTransactions) = @Suppress("DEPRECATION") rpc.internalVerifiedTransactionsFeed() + val (transactions, newTransactions) = + @Suppress("DEPRECATION") rpc.proxy.internalVerifiedTransactionsFeed() newTransactions.startWith(transactions).subscribe(transactionsSubject::onNext) // SM -> TX mapping - val (smTxMappings, futureSmTxMappings) = rpc.stateMachineRecordedTransactionMappingFeed() + val (smTxMappings, futureSmTxMappings) = + rpc.proxy.stateMachineRecordedTransactionMappingFeed() futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject::onNext) // Parties on network - val (parties, futurePartyUpdate) = rpc.networkMapFeed() + val (parties, futurePartyUpdate) = rpc.proxy.networkMapFeed() futurePartyUpdate.startWith(parties.map(MapChange::Added)).subscribe(networkMapSubject::onNext) - val stateMachines = rpc.stateMachinesSnapshot() + val stateMachines = rpc.proxy.stateMachinesSnapshot() - notaryIdentities = rpc.notaryIdentities() + notaryIdentities = rpc.proxy.notaryIdentities() // Extract the flow tracking stream // TODO is there a nicer way of doing this? Stream of streams in general results in code like this... diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt index 67277b05a0..cee01f9004 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpcreconnect/CordaRPCClientReconnectionTest.kt @@ -56,8 +56,8 @@ class CordaRPCClientReconnectionTest { maxReconnectAttempts = 5 )) - (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).proxy as ReconnectingCordaRPCOps).use { - val rpcOps = it + (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use { + val rpcOps = it.proxy as ReconnectingCordaRPCOps val networkParameters = rpcOps.networkParameters val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) cashStatesFeed.updates.subscribe { latch.countDown() } @@ -96,8 +96,8 @@ class CordaRPCClientReconnectionTest { maxReconnectAttempts = 5 )) - (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).proxy as ReconnectingCordaRPCOps).use { - val rpcOps = it + (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use { + val rpcOps = it.proxy as ReconnectingCordaRPCOps val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) val subscription = cashStatesFeed.updates.subscribe { latch.countDown() } rpcOps.startTrackedFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0), defaultNotaryIdentity).returnValue.get() @@ -113,6 +113,7 @@ class CordaRPCClientReconnectionTest { latch.await(4, TimeUnit.SECONDS) } } + } } @@ -136,8 +137,8 @@ class CordaRPCClientReconnectionTest { maxReconnectAttempts = 5 )) - (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).proxy as ReconnectingCordaRPCOps).use { - val rpcOps = it + (client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use { + val rpcOps = it.proxy as ReconnectingCordaRPCOps val networkParameters = rpcOps.networkParameters val cashStatesFeed = rpcOps.vaultTrack(Cash.State::class.java) cashStatesFeed.updates.subscribe { latch.countDown() } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index dace40f2fa..5ae85a3ca9 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -27,6 +27,9 @@ import net.corda.serialization.internal.amqp.SerializationFactoryCacheKey import net.corda.serialization.internal.amqp.SerializerFactory import java.time.Duration import java.util.* +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit /** * This class is essentially just a wrapper for an RPCConnection and can be treated identically. @@ -35,9 +38,10 @@ import java.util.* */ class CordaRPCConnection private constructor( private val oneTimeConnection: RPCConnection?, + private val observersPool: ExecutorService?, private val reconnectingCordaRPCOps: ReconnectingCordaRPCOps? ) : RPCConnection { - internal constructor(connection: RPCConnection?) : this(connection, null) + internal constructor(connection: RPCConnection?) : this(connection, null, null) companion object { @CordaInternal @@ -46,14 +50,20 @@ class CordaRPCConnection private constructor( password: String, addresses: List, rpcConfiguration: CordaRPCClientConfiguration, - gracefulReconnect: GracefulReconnect + gracefulReconnect: GracefulReconnect, + sslConfiguration: ClientRpcSslOptions? = null, + classLoader: ClassLoader? = null ): CordaRPCConnection { - return CordaRPCConnection(null, ReconnectingCordaRPCOps( + val observersPool: ExecutorService = Executors.newCachedThreadPool() + return CordaRPCConnection(null, observersPool, ReconnectingCordaRPCOps( addresses, username, password, rpcConfiguration, - gracefulReconnect + gracefulReconnect, + sslConfiguration, + classLoader, + observersPool )) } } @@ -69,7 +79,18 @@ class CordaRPCConnection private constructor( override fun forceClose() = actualConnection.forceClose() - override fun close() = actualConnection.close() + override fun close() { + try { + actualConnection.close() + } finally { + observersPool?.apply { + shutdown() + if (!awaitTermination(@Suppress("MagicNumber")30, TimeUnit.SECONDS)) { + shutdownNow() + } + } + } + } } /** @@ -322,19 +343,36 @@ class CordaRPCClient private constructor( ) { @JvmOverloads - constructor(hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT) : this( - hostAndPort = hostAndPort, haAddressPool = emptyList(), configuration = configuration + constructor( + hostAndPort: NetworkHostAndPort, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT + ) : this( + hostAndPort = hostAndPort, + haAddressPool = emptyList(), + configuration = configuration ) - constructor(hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, - classLoader: ClassLoader): this(hostAndPort, configuration, null, classLoader = classLoader) + constructor( + hostAndPort: NetworkHostAndPort, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, + classLoader: ClassLoader + ): this( + hostAndPort, + configuration, + null, + classLoader = classLoader + ) constructor( hostAndPort: NetworkHostAndPort, sslConfiguration: ClientRpcSslOptions? = null, classLoader: ClassLoader? = null - ) : this(hostAndPort = hostAndPort, haAddressPool = emptyList(), sslConfiguration = sslConfiguration, classLoader = classLoader) + ) : this( + hostAndPort = hostAndPort, + haAddressPool = emptyList(), + sslConfiguration = sslConfiguration, + classLoader = classLoader + ) @JvmOverloads constructor( @@ -342,7 +380,13 @@ class CordaRPCClient private constructor( configuration: CordaRPCClientConfiguration, sslConfiguration: ClientRpcSslOptions?, classLoader: ClassLoader? = null - ) : this(hostAndPort = hostAndPort, haAddressPool = emptyList(), configuration = configuration, sslConfiguration = sslConfiguration, classLoader = classLoader) + ) : this( + hostAndPort = hostAndPort, + haAddressPool = emptyList(), + configuration = configuration, + sslConfiguration = sslConfiguration, + classLoader = classLoader + ) @JvmOverloads constructor( @@ -350,7 +394,13 @@ class CordaRPCClient private constructor( configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, sslConfiguration: ClientRpcSslOptions? = null, classLoader: ClassLoader? = null - ) : this(hostAndPort = null, haAddressPool = haAddressPool, configuration = configuration, sslConfiguration = sslConfiguration, classLoader = classLoader) + ) : this( + hostAndPort = null, + haAddressPool = haAddressPool, + configuration = configuration, + sslConfiguration = sslConfiguration, + classLoader = classLoader + ) // Here to keep the keep ABI compatibility happy companion object {} @@ -362,11 +412,23 @@ class CordaRPCClient private constructor( try { val cache = Caffeine.newBuilder().maximumSize(128).build().asMap() - // If the client has explicitly provided a classloader use this one to scan for custom serializers, otherwise use the current one. + // If the client has explicitly provided a classloader use this one to scan for custom serializers, + // otherwise use the current one. val serializationClassLoader = this.classLoader ?: this.javaClass.classLoader - val customSerializers = createInstancesOfClassesImplementing(serializationClassLoader, SerializationCustomSerializer::class.java) - val serializationWhitelists = ServiceLoader.load(SerializationWhitelist::class.java, serializationClassLoader).toSet() - AMQPClientSerializationScheme.initialiseSerialization(serializationClassLoader, customSerializers, serializationWhitelists, cache) + val customSerializers = createInstancesOfClassesImplementing( + serializationClassLoader, + SerializationCustomSerializer::class.java + ) + val serializationWhitelists = ServiceLoader.load( + SerializationWhitelist::class.java, + serializationClassLoader + ).toSet() + AMQPClientSerializationScheme.initialiseSerialization( + serializationClassLoader, + customSerializers, + serializationWhitelists, + cache + ) } catch (e: IllegalStateException) { // Race e.g. two of these constructed in parallel, ignore. } @@ -401,7 +463,11 @@ class CordaRPCClient private constructor( * @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout. */ @JvmOverloads - fun start(username: String, password: String, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection { + fun start( + username: String, + password: String, + gracefulReconnect: GracefulReconnect? = null + ): CordaRPCConnection { return start(username, password, null, null, gracefulReconnect) } @@ -418,7 +484,12 @@ class CordaRPCClient private constructor( * @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout. */ @JvmOverloads - fun start(username: String, password: String, targetLegalIdentity: CordaX500Name, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection { + fun start( + username: String, + password: String, + targetLegalIdentity: CordaX500Name, + gracefulReconnect: GracefulReconnect? = null + ): CordaRPCConnection { return start(username, password, null, null, targetLegalIdentity, gracefulReconnect) } @@ -436,7 +507,13 @@ class CordaRPCClient private constructor( * @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout. */ @JvmOverloads - fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection { + fun start( + username: String, + password: String, + externalTrace: Trace?, + impersonatedActor: Actor?, + gracefulReconnect: GracefulReconnect? = null + ): CordaRPCConnection { return start(username, password, externalTrace, impersonatedActor, null, gracefulReconnect) } @@ -457,7 +534,14 @@ class CordaRPCClient private constructor( * @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout. */ @JvmOverloads - fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, targetLegalIdentity: CordaX500Name?, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection { + fun start( + username: String, + password: String, + externalTrace: Trace?, + impersonatedActor: Actor?, + targetLegalIdentity: CordaX500Name?, + gracefulReconnect: GracefulReconnect? = null + ): CordaRPCConnection { val addresses = if (haAddressPool.isEmpty()) { listOf(hostAndPort!!) } else { @@ -465,9 +549,23 @@ class CordaRPCClient private constructor( } return if (gracefulReconnect != null) { - CordaRPCConnection.createWithGracefulReconnection(username, password, addresses, configuration, gracefulReconnect) + CordaRPCConnection.createWithGracefulReconnection( + username, + password, + addresses, + configuration, + gracefulReconnect, + sslConfiguration + ) } else { - CordaRPCConnection(getRpcClient().start(InternalCordaRPCOps::class.java, username, password, externalTrace, impersonatedActor, targetLegalIdentity)) + CordaRPCConnection(getRpcClient().start( + InternalCordaRPCOps::class.java, + username, + password, + externalTrace, + impersonatedActor, + targetLegalIdentity + )) } } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index c460f39926..a6542d1451 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -32,37 +32,24 @@ import java.util.concurrent.TimeUnit /** * Wrapper over [CordaRPCOps] that handles exceptions when the node or the connection to the node fail. * - * All operations are retried on failure, except flow start operations that die before receiving a valid [FlowHandle], in which case a [CouldNotStartFlowException] is thrown. + * All operations are retried on failure, except flow start operations that die before receiving a valid [FlowHandle], in which case a + * [CouldNotStartFlowException] is thrown. * * When calling methods that return a [DataFeed] like [CordaRPCOps.vaultTrackBy], the returned [DataFeed.updates] object will no longer * be a usable [rx.Observable] but an instance of [ReconnectingObservable]. - * The caller has to explicitly cast to [ReconnectingObservable] and call [ReconnectingObservable.subscribe]. If used as an [rx.Observable] it will just fail. + * The caller has to explicitly cast to [ReconnectingObservable] and call [ReconnectingObservable.subscribe]. If used as an [rx.Observable] + * it will just fail. * The returned [DataFeed.snapshot] is the snapshot as it was when the feed was first retrieved. * * Note: There is no guarantee that observations will not be lost. * * *This class is not a stable API. Any project that wants to use it, must copy and paste it.* */ -// TODO The executor service is not needed. All we need is a single thread that deals with reconnecting and onto which ReconnectingObservables -// and other things can attach themselves as listeners for reconnect events. +// TODO The executor service is not needed. All we need is a single thread that deals with reconnecting and onto which +// ReconnectingObservables and other things can attach themselves as listeners for reconnect events. class ReconnectingCordaRPCOps private constructor( - val reconnectingRPCConnection: ReconnectingRPCConnection, - private val observersPool: ExecutorService, - private val userPool: Boolean -) : AutoCloseable, InternalCordaRPCOps by proxy(reconnectingRPCConnection, observersPool) { - // Constructors that mirror CordaRPCClient. - constructor( - nodeHostAndPort: NetworkHostAndPort, - username: String, - password: String, - rpcConfiguration: CordaRPCClientConfiguration, - sslConfiguration: ClientRpcSslOptions? = null, - classLoader: ClassLoader? = null, - observersPool: ExecutorService? = null - ) : this( - ReconnectingRPCConnection(listOf(nodeHostAndPort), username, password, rpcConfiguration, sslConfiguration, classLoader), - observersPool ?: Executors.newCachedThreadPool(), - observersPool != null) + val reconnectingRPCConnection: ReconnectingRPCConnection +) : InternalCordaRPCOps by proxy(reconnectingRPCConnection) { constructor( nodeHostAndPorts: List, username: String, @@ -71,18 +58,23 @@ class ReconnectingCordaRPCOps private constructor( gracefulReconnect: GracefulReconnect? = null, sslConfiguration: ClientRpcSslOptions? = null, classLoader: ClassLoader? = null, - observersPool: ExecutorService? = null - ) : this( - ReconnectingRPCConnection(nodeHostAndPorts, username, password, rpcConfiguration, sslConfiguration, classLoader, gracefulReconnect), - observersPool ?: Executors.newCachedThreadPool(), - observersPool != null) + observersPool: ExecutorService + ) : this(ReconnectingRPCConnection( + nodeHostAndPorts, + username, + password, + rpcConfiguration, + sslConfiguration, + classLoader, + gracefulReconnect, + observersPool)) private companion object { private val log = contextLogger() - private fun proxy(reconnectingRPCConnection: ReconnectingRPCConnection, observersPool: ExecutorService): InternalCordaRPCOps { + private fun proxy(reconnectingRPCConnection: ReconnectingRPCConnection): InternalCordaRPCOps { return Proxy.newProxyInstance( this::class.java.classLoader, arrayOf(InternalCordaRPCOps::class.java), - ErrorInterceptingHandler(reconnectingRPCConnection, observersPool)) as InternalCordaRPCOps + ErrorInterceptingHandler(reconnectingRPCConnection)) as InternalCordaRPCOps } } private val retryFlowsPool = Executors.newScheduledThreadPool(1) @@ -125,7 +117,8 @@ class ReconnectingCordaRPCOps private constructor( val rpcConfiguration: CordaRPCClientConfiguration, val sslConfiguration: ClientRpcSslOptions? = null, val classLoader: ClassLoader?, - val gracefulReconnect: GracefulReconnect? = null + val gracefulReconnect: GracefulReconnect? = null, + val observersPool: ExecutorService ) : RPCConnection { private var currentRPCConnection: CordaRPCConnection? = null enum class CurrentState { @@ -178,12 +171,18 @@ class ReconnectingCordaRPCOps private constructor( return currentRPCConnection!! } - private tailrec fun establishConnectionWithRetry(retryInterval: Duration = 1.seconds, roundRobinIndex: Int = 0): CordaRPCConnection { + private tailrec fun establishConnectionWithRetry( + retryInterval: Duration = 1.seconds, + roundRobinIndex: Int = 0 + ): CordaRPCConnection { val attemptedAddress = nodeHostAndPorts[roundRobinIndex] log.info("Connecting to: $attemptedAddress") try { return CordaRPCClient( - attemptedAddress, rpcConfiguration.copy(connectionMaxRetryInterval = retryInterval, maxReconnectAttempts = 1), sslConfiguration, classLoader + attemptedAddress, + rpcConfiguration.copy(connectionMaxRetryInterval = retryInterval, maxReconnectAttempts = 1), + sslConfiguration, + classLoader ).start(username, password).also { // Check connection is truly operational before returning it. require(it.proxy.nodeInfo().legalIdentitiesAndCerts.isNotEmpty()) { @@ -240,7 +239,7 @@ class ReconnectingCordaRPCOps private constructor( currentRPCConnection?.close() } } - private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection, val observersPool: ExecutorService) : InvocationHandler { + private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler { private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow") private fun checkIfIsStartFlow(method: Method, e: InvocationTargetException) { @@ -290,7 +289,7 @@ class ReconnectingCordaRPCOps private constructor( DataFeed::class.java -> { // Intercept the data feed methods and return a ReconnectingObservable instance val initialFeed: DataFeed = uncheckedCast(doInvoke(method, args)) - val observable = ReconnectingObservable(reconnectingRPCConnection, observersPool, initialFeed) { + val observable = ReconnectingObservable(reconnectingRPCConnection, initialFeed) { // This handles reconnecting and creates new feeds. uncheckedCast(this.invoke(reconnectingRPCConnection.proxy, method, args)) } @@ -302,8 +301,7 @@ class ReconnectingCordaRPCOps private constructor( } } - override fun close() { - if (!userPool) observersPool.shutdown() + fun close() { retryFlowsPool.shutdown() reconnectingRPCConnection.forceClose() } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt index 545b40531e..102b0a953c 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingObservable.kt @@ -4,21 +4,18 @@ import net.corda.core.messaging.DataFeed import rx.Observable import rx.Subscriber import rx.Subscription -import java.util.concurrent.ExecutorService import java.util.concurrent.atomic.AtomicReference class ReconnectingObservable private constructor(subscriber: ReconnectingSubscriber) : Observable(subscriber) { constructor( reconnectingRPCConnection: ReconnectingCordaRPCOps.ReconnectingRPCConnection, - executor: ExecutorService, initialDataFeed: DataFeed<*, T>, createDataFeed: () -> DataFeed<*, T> - ) : this(ReconnectingSubscriber(reconnectingRPCConnection, executor, initialDataFeed, createDataFeed)) + ) : this(ReconnectingSubscriber(reconnectingRPCConnection, initialDataFeed, createDataFeed)) private class ReconnectingSubscriber( private val reconnectingRPCConnection: ReconnectingCordaRPCOps.ReconnectingRPCConnection, - private val executor: ExecutorService, private val initialDataFeed: DataFeed<*, T>, private val createDataFeed: () -> DataFeed<*, T> ) : OnSubscribe, Subscription { @@ -59,7 +56,7 @@ class ReconnectingObservable private constructor(subscriber: ReconnectingSubs private fun scheduleResubscribe(error: Throwable) { if (unsubscribed) return - executor.execute { + reconnectingRPCConnection.observersPool.execute { if (unsubscribed) return@execute reconnectingRPCConnection.reconnectOnError(error) val newDataFeed = createDataFeed() diff --git a/detekt-baseline.xml b/detekt-baseline.xml index 081bee46e1..e5b90f5181 100644 --- a/detekt-baseline.xml +++ b/detekt-baseline.xml @@ -676,6 +676,7 @@ LargeClass:ConnectionStateMachine.kt$ConnectionStateMachine : BaseHandler LargeClass:ConstraintsPropagationTests.kt$ConstraintsPropagationTests LargeClass:CordaClassResolverTests.kt$CordaClassResolverTests + LargeClass:CordaRPCClient.kt$CordaRPCClient LargeClass:CordaRPCClientTest.kt$CordaRPCClientTest : NodeBasedTest LargeClass:CordaRPCOpsImpl.kt$CordaRPCOpsImpl : InternalCordaRPCOpsAutoCloseable LargeClass:CordaRPCOpsImplTest.kt$CordaRPCOpsImplTest @@ -818,8 +819,9 @@ LongParameterList:ContractJarTestUtils.kt$ContractJarTestUtils$(workingDir: Path, contractNames: List<String>, signed: Boolean = false, version: Int = 1, generateManifest: Boolean = true, jarFileName : String? = null) LongParameterList:ContractUpgradeTransactions.kt$ContractUpgradeLedgerTransaction$( inputs: List<StateAndRef<ContractState>> = this.inputs, notary: Party = this.notary, legacyContractAttachment: Attachment = this.legacyContractAttachment, upgradedContractClassName: ContractClassName = this.upgradedContract::class.java.name, upgradedContractAttachment: Attachment = this.upgradedContractAttachment, id: SecureHash = this.id, privacySalt: PrivacySalt = this.privacySalt, sigs: List<TransactionSignature> = this.sigs, networkParameters: NetworkParameters = this.networkParameters ) LongParameterList:ContractUpgradeTransactions.kt$ContractUpgradeLedgerTransaction.Companion$( inputs: List<StateAndRef<ContractState>>, notary: Party, legacyContractAttachment: Attachment, upgradedContractAttachment: Attachment, id: SecureHash, privacySalt: PrivacySalt, sigs: List<TransactionSignature>, networkParameters: NetworkParameters, upgradedContract: UpgradedContract<ContractState, *> ) - LongParameterList:CordaRPCClient.kt$CordaRPCClient$(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, targetLegalIdentity: CordaX500Name?, gracefulReconnect: GracefulReconnect? = null) + LongParameterList:CordaRPCClient.kt$CordaRPCClient$( username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, targetLegalIdentity: CordaX500Name?, gracefulReconnect: GracefulReconnect? = null ) LongParameterList:CordaRPCClient.kt$CordaRPCClientConfiguration$( connectionMaxRetryInterval: Duration = this.connectionMaxRetryInterval, minimumServerProtocolVersion: Int = this.minimumServerProtocolVersion, trackRpcCallSites: Boolean = this.trackRpcCallSites, reapInterval: Duration = this.reapInterval, observationExecutorPoolSize: Int = this.observationExecutorPoolSize, cacheConcurrencyLevel: Int = this.cacheConcurrencyLevel, connectionRetryInterval: Duration = this.connectionRetryInterval, connectionRetryIntervalMultiplier: Double = this.connectionRetryIntervalMultiplier, maxReconnectAttempts: Int = this.maxReconnectAttempts, maxFileSize: Int = this.maxFileSize, deduplicationCacheExpiry: Duration = this.deduplicationCacheExpiry ) + LongParameterList:CordaRPCClient.kt$CordaRPCConnection.Companion$( username: String, password: String, addresses: List<NetworkHostAndPort>, rpcConfiguration: CordaRPCClientConfiguration, gracefulReconnect: GracefulReconnect, sslConfiguration: ClientRpcSslOptions? = null, classLoader: ClassLoader? = null ) LongParameterList:CordaRPCOps.kt$( @Suppress("UNUSED_PARAMETER") flowConstructor: (A, B, C, D, E) -> R, arg0: A, arg1: B, arg2: C, arg3: D, arg4: E ) LongParameterList:CordaRPCOps.kt$( @Suppress("UNUSED_PARAMETER") flowConstructor: (A, B, C, D, E, F) -> R, arg0: A, arg1: B, arg2: C, arg3: D, arg4: E, arg5: F ) LongParameterList:CordaRPCOps.kt$( @Suppress("unused_parameter") flowConstructor: (A, B, C, D, E) -> R, arg0: A, arg1: B, arg2: C, arg3: D, arg4: E ) @@ -2870,7 +2872,6 @@ MaxLineLength:BFTSmartNotaryService.kt$BFTSmartNotaryService.Replica$val response = verifyAndCommitTx(commitRequest.payload.coreTransaction, commitRequest.callerIdentity, commitRequest.payload.requestSignature) MaxLineLength:BackpressureAwareTimedFlow.kt$BackpressureAwareTimedFlow$else -> throw throw IllegalArgumentException("We were expecting a ${ReceiveType::class.java.name} or WaitTimeUpdate but we instead got a ${unwrapped.javaClass.name} ($unwrapped)") MaxLineLength:BackpressureAwareTimedFlow.kt$BackpressureAwareTimedFlow$logger.info("Counterparty [${session.counterparty}] is busy - TimedFlow $runId has been asked to wait for an additional ${unwrapped.waitTime} seconds for completion.") - MaxLineLength:BankOfCordaClientApi.kt$BankOfCordaClientApi$ fun requestRPCIssue(rpcAddress: NetworkHostAndPort, params: IssueRequestParams): SignedTransaction MaxLineLength:BankOfCordaWebApi.kt$BankOfCordaWebApi$?: MaxLineLength:BankOfCordaWebApi.kt$BankOfCordaWebApi$rpc.startFlow(::CashIssueAndPaymentFlow, params.amount, issuerBankPartyRef, issueToParty, anonymous, notaryParty).returnValue.getOrThrow() MaxLineLength:BankOfCordaWebApi.kt$BankOfCordaWebApi$rpc.wellKnownPartyFromX500Name(params.issuerBankName) ?: return Response.status(Response.Status.FORBIDDEN).entity("Unable to locate ${params.issuerBankName} in identity service").build() @@ -3298,15 +3299,6 @@ MaxLineLength:CordaPersistence.kt$CordaPersistence$is SchemaManagementException -> throw HibernateSchemaChangeException("Incompatible schema change detected. Please run the node with database.initialiseSchema=true. Reason: ${e.message}", e) MaxLineLength:CordaPersistence.kt$CordaPersistence$val transaction = contextDatabase.currentOrNew(isolationLevel) // XXX: Does this code really support statement changing the contextDatabase? MaxLineLength:CordaRPCClient.kt$CordaRPCClient - MaxLineLength:CordaRPCClient.kt$CordaRPCClient$ @JvmOverloads fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection - MaxLineLength:CordaRPCClient.kt$CordaRPCClient$ @JvmOverloads fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, targetLegalIdentity: CordaX500Name?, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection - MaxLineLength:CordaRPCClient.kt$CordaRPCClient$ @JvmOverloads fun start(username: String, password: String, targetLegalIdentity: CordaX500Name, gracefulReconnect: GracefulReconnect? = null): CordaRPCConnection - MaxLineLength:CordaRPCClient.kt$CordaRPCClient$( haAddressPool: List<NetworkHostAndPort>, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, sslConfiguration: ClientRpcSslOptions? = null, classLoader: ClassLoader? = null ) - MaxLineLength:CordaRPCClient.kt$CordaRPCClient$( hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration, sslConfiguration: ClientRpcSslOptions?, classLoader: ClassLoader? = null ) - MaxLineLength:CordaRPCClient.kt$CordaRPCClient$AMQPClientSerializationScheme.initialiseSerialization(serializationClassLoader, customSerializers, serializationWhitelists, cache) - MaxLineLength:CordaRPCClient.kt$CordaRPCClient$CordaRPCConnection(getRpcClient().start(InternalCordaRPCOps::class.java, username, password, externalTrace, impersonatedActor, targetLegalIdentity)) - MaxLineLength:CordaRPCClient.kt$CordaRPCClient$val customSerializers = createInstancesOfClassesImplementing(serializationClassLoader, SerializationCustomSerializer::class.java) - MaxLineLength:CordaRPCClient.kt$CordaRPCClient${ val cache = Caffeine.newBuilder().maximumSize(128).build<SerializationFactoryCacheKey, SerializerFactory>().asMap() // If the client has explicitly provided a classloader use this one to scan for custom serializers, otherwise use the current one. val serializationClassLoader = this.classLoader ?: this.javaClass.classLoader val customSerializers = createInstancesOfClassesImplementing(serializationClassLoader, SerializationCustomSerializer::class.java) val serializationWhitelists = ServiceLoader.load(SerializationWhitelist::class.java, serializationClassLoader).toSet() AMQPClientSerializationScheme.initialiseSerialization(serializationClassLoader, customSerializers, serializationWhitelists, cache) } MaxLineLength:CordaRPCClientReconnectionTest.kt$CordaRPCClientReconnectionTest$val addresses = listOf(NetworkHostAndPort("localhost", portAllocator.nextPort()), NetworkHostAndPort("localhost", portAllocator.nextPort())) MaxLineLength:CordaRPCClientTest.kt$CordaRPCClientTest$node.services.startFlow(CashIssueFlow(100.POUNDS, OpaqueBytes.of(1), identity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow() MaxLineLength:CordaRPCClientTest.kt$CordaRPCClientTest$node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0), identity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow() @@ -3721,9 +3713,8 @@ MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("hashLookup", "Checks if a transaction with matching Id hash exists.", HashLookupShellCommand::class.java) MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("output-format", "Commands to inspect and update the output format.", OutputFormatCommand::class.java) MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("run", "Runs a method from the CordaRPCOps interface on the node.", RunShellCommand::class.java) - MaxLineLength:InteractiveShell.kt$InteractiveShell$ReconnectingCordaRPCOps(configuration.hostAndPort, username, password, CordaRPCClientConfiguration.DEFAULT, configuration.ssl, classLoader) MaxLineLength:InteractiveShell.kt$InteractiveShell$val stateObservable = runFlowFromString({ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) }, inputData, flowClazz, inputObjectMapper) - MaxLineLength:InteractiveShell.kt$InteractiveShell${ private val log = LoggerFactory.getLogger(javaClass) private lateinit var rpcOps: (username: String, password: String) -> InternalCordaRPCOps private lateinit var ops: InternalCordaRPCOps private lateinit var rpcConn: AutoCloseable private var shell: Shell? = null private var classLoader: ClassLoader? = null private lateinit var shellConfiguration: ShellConfiguration private var onExit: () -> Unit = {} @JvmStatic fun getCordappsClassloader() = classLoader enum class OutputFormat { JSON, YAML } fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) { rpcOps = { username: String, password: String -> if (standalone) { ReconnectingCordaRPCOps(configuration.hostAndPort, username, password, CordaRPCClientConfiguration.DEFAULT, configuration.ssl, classLoader).also { rpcConn = it } } else { val client = CordaRPCClient(hostAndPort = configuration.hostAndPort, configuration = CordaRPCClientConfiguration.DEFAULT.copy( maxReconnectAttempts = 1 ), sslConfiguration = configuration.ssl, classLoader = classLoader) val connection = client.start(username, password) rpcConn = connection connection.proxy as InternalCordaRPCOps } } _startShell(configuration, classLoader) } private fun _startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) { shellConfiguration = configuration InteractiveShell.classLoader = classLoader val runSshDaemon = configuration.sshdPort != null val config = Properties() if (runSshDaemon) { // Enable SSH access. Note: these have to be strings, even though raw object assignments also work. config["crash.ssh.port"] = configuration.sshdPort?.toString() config["crash.auth"] = "corda" configuration.sshHostKeyDirectory?.apply { val sshKeysDir = configuration.sshHostKeyDirectory.createDirectories() config["crash.ssh.keypath"] = (sshKeysDir / "hostkey.pem").toString() config["crash.ssh.keygen"] = "true" } } ExternalResolver.INSTANCE.addCommand("output-format", "Commands to inspect and update the output format.", OutputFormatCommand::class.java) ExternalResolver.INSTANCE.addCommand("run", "Runs a method from the CordaRPCOps interface on the node.", RunShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("flow", "Commands to work with flows. Flows are how you can change the ledger.", FlowShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("start", "An alias for 'flow start'", StartShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("hashLookup", "Checks if a transaction with matching Id hash exists.", HashLookupShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("attachments", "Commands to extract information about attachments stored within the node", AttachmentShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("checkpoints", "Commands to extract information about checkpoints stored within the node", CheckpointShellCommand::class.java) shell = ShellLifecycle(configuration.commandsDirectory).start(config, configuration.user, configuration.password) } fun runLocalShell(onExit: () -> Unit = {}) { this.onExit = onExit val terminal = TerminalFactory.create() val consoleReader = ConsoleReader("Corda", FileInputStream(FileDescriptor.`in`), System.out, terminal) val jlineProcessor = JLineProcessor(terminal.isAnsiSupported, shell, consoleReader, System.out) InterruptHandler { jlineProcessor.interrupt() }.install() thread(name = "Command line shell processor", isDaemon = true) { Emoji.renderIfSupported { try { jlineProcessor.run() } catch (e: IndexOutOfBoundsException) { log.warn("Cannot parse malformed command.") } } } thread(name = "Command line shell terminator", isDaemon = true) { // Wait for the shell to finish. jlineProcessor.closed() log.info("Command shell has exited") terminal.restore() onExit.invoke() } } class ShellLifecycle(private val shellCommands: Path) : PluginLifeCycle() { fun start(config: Properties, localUserName: String = "", localUserPassword: String = ""): Shell { val classLoader = this.javaClass.classLoader val classpathDriver = ClassPathMountFactory(classLoader) val fileDriver = FileMountFactory(Utils.getCurrentDirectory()) val extraCommandsPath = shellCommands.toAbsolutePath().createDirectories() val commandsFS = FS.Builder() .register("file", fileDriver) .mount("file:$extraCommandsPath") .register("classpath", classpathDriver) .mount("classpath:/net/corda/tools/shell/") .mount("classpath:/crash/commands/") .build() val confFS = FS.Builder() .register("classpath", classpathDriver) .mount("classpath:/crash") .build() val discovery = object : ServiceLoaderDiscovery(classLoader) { override fun getPlugins(): Iterable<CRaSHPlugin<*>> { // Don't use the Java language plugin (we may not have tools.jar available at runtime), this // will cause any commands using JIT Java compilation to be suppressed. In CRaSH upstream that // is only the 'jmx' command. return super.getPlugins().filterNot { it is JavaLanguage } + CordaAuthenticationPlugin(rpcOps) } } val attributes = emptyMap<String, Any>() val context = PluginContext(discovery, attributes, commandsFS, confFS, classLoader) context.refresh() this.config = config start(context) ops = makeRPCOps(rpcOps, localUserName, localUserPassword) return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, ops, StdoutANSIProgressRenderer)) } } fun nodeInfo() = try { ops.nodeInfo() } catch (e: UndeclaredThrowableException) { throw e.cause ?: e } @JvmStatic fun setOutputFormat(outputFormat: OutputFormat) { this.outputFormat = outputFormat } @JvmStatic fun getOutputFormat(): OutputFormat { return outputFormat } fun createYamlInputMapper(rpcOps: CordaRPCOps): ObjectMapper { // Return a standard Corda Jackson object mapper, configured to use YAML by default and with extra // serializers. return JacksonSupport.createDefaultMapper(rpcOps, YAMLFactory(), true).apply { val rpcModule = SimpleModule().apply { addDeserializer(InputStream::class.java, InputStreamDeserializer) addDeserializer(UniqueIdentifier::class.java, UniqueIdentifierDeserializer) } registerModule(rpcModule) } } private fun createOutputMapper(outputFormat: OutputFormat): ObjectMapper { val factory = when(outputFormat) { OutputFormat.JSON -> JsonFactory() OutputFormat.YAML -> YAMLFactory().disable(YAMLGenerator.Feature.WRITE_DOC_START_MARKER) } return JacksonSupport.createNonRpcMapper(factory).apply { // Register serializers for stateful objects from libraries that are special to the RPC system and don't // make sense to print out to the screen. For classes we own, annotations can be used instead. val rpcModule = SimpleModule().apply { addSerializer(Observable::class.java, ObservableSerializer) addSerializer(InputStream::class.java, InputStreamSerializer) } registerModule(rpcModule) disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) enable(SerializationFeature.INDENT_OUTPUT) } } // TODO: A default renderer could be used, instead of an object mapper. See: http://www.crashub.org/1.3/reference.html#_renderers private var outputFormat = OutputFormat.YAML @VisibleForTesting lateinit var latch: CountDownLatch private set /** * Called from the 'flow' shell command. Takes a name fragment and finds a matching flow, or prints out * the list of options if the request is ambiguous. Then parses [inputData] as constructor arguments using * the [runFlowFromString] method and starts the requested flow. Ctrl-C can be used to cancel. */ @JvmStatic fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps)) { val matches = try { rpcOps.registeredFlows().filter { nameFragment in it } } catch (e: PermissionException) { output.println(e.message ?: "Access denied", Color.red) return } if (matches.isEmpty()) { output.println("No matching flow found, run 'flow list' to see your options.", Color.red) return } else if (matches.size > 1 && matches.find { it.endsWith(nameFragment)} == null) { output.println("Ambiguous name provided, please be more specific. Your options are:") matches.forEachIndexed { i, s -> output.println("${i + 1}. $s", Color.yellow) } return } val flowName = matches.find { it.endsWith(nameFragment)} ?: matches.single() val flowClazz: Class<FlowLogic<*>> = if (classLoader != null) { uncheckedCast(Class.forName(flowName, true, classLoader)) } else { uncheckedCast(Class.forName(flowName)) } try { // Show the progress tracker on the console until the flow completes or is interrupted with a // Ctrl-C keypress. val stateObservable = runFlowFromString({ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) }, inputData, flowClazz, inputObjectMapper) latch = CountDownLatch(1) ansiProgressRenderer.render(stateObservable, latch::countDown) // Wait for the flow to end and the progress tracker to notice. By the time the latch is released // the tracker is done with the screen. while (!Thread.currentThread().isInterrupted) { try { latch.await() break } catch (e: InterruptedException) { try { rpcOps.killFlow(stateObservable.id) } finally { Thread.currentThread().interrupt() break } } } output.println("Flow completed with result: ${stateObservable.returnValue.get()}") } catch (e: NoApplicableConstructor) { output.println("No matching constructor found:", Color.red) e.errors.forEach { output.println("- $it", Color.red) } } catch (e: PermissionException) { output.println(e.message ?: "Access denied", Color.red) } catch (e: ExecutionException) { // ignoring it as already logged by the progress handler subscriber } finally { InputStreamDeserializer.closeAll() } } class NoApplicableConstructor(val errors: List<String>) : CordaException(this.toString()) { override fun toString() = (listOf("No applicable constructor for flow. Problems were:") + errors).joinToString(System.lineSeparator()) } /** * Tidies up a possibly generic type name by chopping off the package names of classes in a hard-coded set of * hierarchies that are known to be widely used and recognised, and also not have (m)any ambiguous names in them. * * This is used for printing error messages when something doesn't match. */ private fun maybeAbbreviateGenericType(type: Type, extraRecognisedPackage: String): String { val packagesToAbbreviate = listOf("java.", "net.corda.core.", "kotlin.", extraRecognisedPackage) fun shouldAbbreviate(typeName: String) = packagesToAbbreviate.any { typeName.startsWith(it) } fun abbreviated(typeName: String) = if (shouldAbbreviate(typeName)) typeName.split('.').last() else typeName fun innerLoop(type: Type): String = when (type) { is ParameterizedType -> { val args: List<String> = type.actualTypeArguments.map(::innerLoop) abbreviated(type.rawType.typeName) + '<' + args.joinToString(", ") + '>' } is GenericArrayType -> { innerLoop(type.genericComponentType) + "[]" } is Class<*> -> { if (type.isArray) abbreviated(type.simpleName) else abbreviated(type.name).replace('$', '.') } else -> type.toString() } return innerLoop(type) } @JvmStatic fun killFlowById(id: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps)) { try { val runId = try { inputObjectMapper.readValue(id, StateMachineRunId::class.java) } catch (e: JsonMappingException) { output.println("Cannot parse flow ID of '$id' - expecting a UUID.", Color.red) log.error("Failed to parse flow ID", e) return } if (rpcOps.killFlow(runId)) { output.println("Killed flow $runId", Color.yellow) } else { output.println("Failed to kill flow $runId", Color.red) } } finally { output.flush() } } // TODO: This utility is generally useful and might be better moved to the node class, or an RPC, if we can commit to making it stable API. /** * Given a [FlowLogic] class and a string in one-line Yaml form, finds an applicable constructor and starts * the flow, returning the created flow logic. Useful for lightweight invocation where text is preferable * to statically typed, compiled code. * * See the [StringToMethodCallParser] class to learn more about limitations and acceptable syntax. * * @throws NoApplicableConstructor if no constructor could be found for the given set of types. */ @Throws(NoApplicableConstructor::class) fun <T> runFlowFromString(invoke: (Class<out FlowLogic<T>>, Array<out Any?>) -> FlowProgressHandle<T>, inputData: String, clazz: Class<out FlowLogic<T>>, om: ObjectMapper): FlowProgressHandle<T> { val errors = ArrayList<String>() val parser = StringToMethodCallParser(clazz, om) val nameTypeList = getMatchingConstructorParamsAndTypes(parser, inputData, clazz) try { val args = parser.parseArguments(clazz.name, nameTypeList, inputData) return invoke(clazz, args) } catch (e: StringToMethodCallParser.UnparseableCallException.ReflectionDataMissing) { val argTypes = nameTypeList.map { (_, type) -> type } errors.add("$argTypes: <constructor missing parameter reflection data>") } catch (e: StringToMethodCallParser.UnparseableCallException) { val argTypes = nameTypeList.map { (_, type) -> type } errors.add("$argTypes: ${e.message}") } throw NoApplicableConstructor(errors) } private fun <T> getMatchingConstructorParamsAndTypes(parser: StringToMethodCallParser<FlowLogic<T>>, inputData: String, clazz: Class<out FlowLogic<T>>) : List<Pair<String, Type>> { val errors = ArrayList<String>() val classPackage = clazz.packageName_ lateinit var paramNamesFromConstructor: List<String> for (ctor in clazz.constructors) { // Attempt construction with the given arguments. fun getPrototype(): List<String> { val argTypes = ctor.genericParameterTypes.map { // If the type name is in the net.corda.core or java namespaces, chop off the package name // because these hierarchies don't have (m)any ambiguous names and the extra detail is just noise. maybeAbbreviateGenericType(it, classPackage) } return paramNamesFromConstructor.zip(argTypes).map { (name, type) -> "$name: $type" } } try { paramNamesFromConstructor = parser.paramNamesFromConstructor(ctor) val nameTypeList = paramNamesFromConstructor.zip(ctor.genericParameterTypes) parser.validateIsMatchingCtor(clazz.name, nameTypeList, inputData) return nameTypeList } catch (e: StringToMethodCallParser.UnparseableCallException.MissingParameter) { errors.add("${getPrototype()}: missing parameter ${e.paramName}") } catch (e: StringToMethodCallParser.UnparseableCallException.TooManyParameters) { errors.add("${getPrototype()}: too many parameters") } catch (e: StringToMethodCallParser.UnparseableCallException.ReflectionDataMissing) { val argTypes = ctor.genericParameterTypes.map { it.typeName } errors.add("$argTypes: <constructor missing parameter reflection data>") } catch (e: StringToMethodCallParser.UnparseableCallException) { val argTypes = ctor.genericParameterTypes.map { it.typeName } errors.add("$argTypes: ${e.message}") } } throw NoApplicableConstructor(errors) } // TODO Filtering on error/success when we will have some sort of flow auditing, for now it doesn't make much sense. @JvmStatic fun runStateMachinesView(out: RenderPrintWriter, rpcOps: CordaRPCOps): Any? { val proxy = rpcOps val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed() val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) } val subscriber = FlowWatchPrintingSubscriber(out) stateMachineUpdates.startWith(currentStateMachines).subscribe(subscriber) var result: Any? = subscriber.future if (result is Future<*>) { if (!result.isDone) { out.cls() out.println("Waiting for completion or Ctrl-C ... ") out.flush() } try { result = result.get() } catch (e: InterruptedException) { subscriber.unsubscribe() Thread.currentThread().interrupt() } catch (e: ExecutionException) { throw e.rootCause } catch (e: InvocationTargetException) { throw e.rootCause } } return result } @JvmStatic fun runAttachmentTrustInfoView( out: RenderPrintWriter, rpcOps: InternalCordaRPCOps ): Any { return AttachmentTrustTable(out, rpcOps.attachmentTrustInfos) } @JvmStatic fun runDumpCheckpoints(rpcOps: InternalCordaRPCOps) { rpcOps.dumpCheckpoints() } @JvmStatic fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps, inputObjectMapper: ObjectMapper): Any? { val cmd = input.joinToString(" ").trim { it <= ' ' } if (cmd.startsWith("startflow", ignoreCase = true)) { // The flow command provides better support and startFlow requires special handling anyway due to // the generic startFlow RPC interface which offers no type information with which to parse the // string form of the command. out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow) return null } else if (cmd.substringAfter(" ").trim().equals("gracefulShutdown", ignoreCase = true)) { return InteractiveShell.gracefulShutdown(out, cordaRPCOps) } var result: Any? = null try { InputStreamSerializer.invokeContext = context val parser = StringToMethodCallParser(CordaRPCOps::class.java, inputObjectMapper) val call = parser.parse(cordaRPCOps, cmd) result = call.call() if (result != null && result !== kotlin.Unit && result !is Void) { result = printAndFollowRPCResponse(result, out, outputFormat) } if (result is Future<*>) { if (!result.isDone) { out.println("Waiting for completion or Ctrl-C ... ") out.flush() } try { result = result.get() } catch (e: InterruptedException) { Thread.currentThread().interrupt() } catch (e: ExecutionException) { throw e.rootCause } catch (e: InvocationTargetException) { throw e.rootCause } } } catch (e: StringToMethodCallParser.UnparseableCallException) { out.println(e.message, Color.red) if (e !is StringToMethodCallParser.UnparseableCallException.NoSuchFile) { out.println("Please try 'man run' to learn what syntax is acceptable") } } catch (e: Exception) { out.println("RPC failed: ${e.rootCause}", Color.red) } finally { InputStreamSerializer.invokeContext = null InputStreamDeserializer.closeAll() } return result } @JvmStatic fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps) { fun display(statements: RenderPrintWriter.() -> Unit) { statements.invoke(userSessionOut) userSessionOut.flush() } var isShuttingDown = false try { display { println("Orchestrating a clean shutdown, press CTRL+C to cancel...") } isShuttingDown = true display { println("...enabling draining mode") println("...waiting for in-flight flows to be completed") } cordaRPCOps.terminate(true) val latch = CountDownLatch(1) @Suppress("DEPRECATION") cordaRPCOps.pendingFlowsCount().updates.doOnError { error -> log.error(error.message) throw error }.doAfterTerminate(latch::countDown).subscribe( // For each update. { (first, second) -> display { println("...remaining: $first / $second") } }, // On error. { error -> if (!isShuttingDown) { display { println("RPC failed: ${error.rootCause}", Color.red) } } }, // When completed. { rpcConn.close() // This will only show up in the standalone Shell, because the embedded one is killed as part of a node's shutdown. display { println("...done, quitting the shell now.") } onExit.invoke() }) while (!Thread.currentThread().isInterrupted) { try { latch.await() break } catch (e: InterruptedException) { try { cordaRPCOps.setFlowsDrainingModeEnabled(false) display { println("...cancelled clean shutdown.") } } finally { Thread.currentThread().interrupt() break } } } } catch (e: StringToMethodCallParser.UnparseableCallException) { display { println(e.message, Color.red) println("Please try 'man run' to learn what syntax is acceptable") } } catch (e: Exception) { if (!isShuttingDown) { display { println("RPC failed: ${e.rootCause}", Color.red) } } } finally { InputStreamSerializer.invokeContext = null InputStreamDeserializer.closeAll() } } private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter, outputFormat: OutputFormat): CordaFuture<Unit> { val outputMapper = createOutputMapper(outputFormat) val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) } return maybeFollow(response, mapElement, out) } private class PrintingSubscriber(private val printerFun: (Any?) -> String, private val toStream: PrintWriter) : Subscriber<Any>() { private var count = 0 val future = openFuture<Unit>() init { // The future is public and can be completed by something else to indicate we don't wish to follow // anymore (e.g. the user pressing Ctrl-C). future.then { unsubscribe() } } @Synchronized override fun onCompleted() { toStream.println("Observable has completed") future.set(Unit) } @Synchronized override fun onNext(t: Any?) { count++ toStream.println("Observation $count: " + printerFun(t)) toStream.flush() } @Synchronized override fun onError(e: Throwable) { toStream.println("Observable completed with an error") e.printStackTrace(toStream) future.setException(e) } } private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> { // Match on a couple of common patterns for "important" observables. It's tough to do this in a generic // way because observables can be embedded anywhere in the object graph, and can emit other arbitrary // object graphs that contain yet more observables. So we just look for top level responses that follow // the standard "track" pattern, and print them until the user presses Ctrl-C if (response == null) return doneFuture(Unit) if (response is DataFeed<*, *>) { out.println("Snapshot:") out.println(printerFun(response.snapshot)) out.flush() out.println("Updates:") return printNextElements(response.updates, printerFun, out) } if (response is Observable<*>) { return printNextElements(response, printerFun, out) } out.println(printerFun(response)) return doneFuture(Unit) } private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> { val subscriber = PrintingSubscriber(printerFun, out) uncheckedCast(elements).subscribe(subscriber) return subscriber.future } } + MaxLineLength:InteractiveShell.kt$InteractiveShell${ private val log = LoggerFactory.getLogger(javaClass) private lateinit var rpcOps: (username: String, password: String) -> InternalCordaRPCOps private lateinit var ops: InternalCordaRPCOps private lateinit var rpcConn: AutoCloseable private var shell: Shell? = null private var classLoader: ClassLoader? = null private lateinit var shellConfiguration: ShellConfiguration private var onExit: () -> Unit = {} @JvmStatic fun getCordappsClassloader() = classLoader enum class OutputFormat { JSON, YAML } fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) { rpcOps = { username: String, password: String -> val connection = if (standalone) { CordaRPCClient( configuration.hostAndPort, configuration.ssl, classLoader ).start(username, password, gracefulReconnect = GracefulReconnect()) } else { CordaRPCClient( hostAndPort = configuration.hostAndPort, configuration = CordaRPCClientConfiguration.DEFAULT.copy( maxReconnectAttempts = 1 ), sslConfiguration = configuration.ssl, classLoader = classLoader ).start(username, password) } rpcConn = connection connection.proxy as InternalCordaRPCOps } _startShell(configuration, classLoader) } private fun _startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) { shellConfiguration = configuration InteractiveShell.classLoader = classLoader val runSshDaemon = configuration.sshdPort != null val config = Properties() if (runSshDaemon) { // Enable SSH access. Note: these have to be strings, even though raw object assignments also work. config["crash.ssh.port"] = configuration.sshdPort?.toString() config["crash.auth"] = "corda" configuration.sshHostKeyDirectory?.apply { val sshKeysDir = configuration.sshHostKeyDirectory.createDirectories() config["crash.ssh.keypath"] = (sshKeysDir / "hostkey.pem").toString() config["crash.ssh.keygen"] = "true" } } ExternalResolver.INSTANCE.addCommand("output-format", "Commands to inspect and update the output format.", OutputFormatCommand::class.java) ExternalResolver.INSTANCE.addCommand("run", "Runs a method from the CordaRPCOps interface on the node.", RunShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("flow", "Commands to work with flows. Flows are how you can change the ledger.", FlowShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("start", "An alias for 'flow start'", StartShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("hashLookup", "Checks if a transaction with matching Id hash exists.", HashLookupShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("attachments", "Commands to extract information about attachments stored within the node", AttachmentShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("checkpoints", "Commands to extract information about checkpoints stored within the node", CheckpointShellCommand::class.java) shell = ShellLifecycle(configuration.commandsDirectory).start(config, configuration.user, configuration.password) } fun runLocalShell(onExit: () -> Unit = {}) { this.onExit = onExit val terminal = TerminalFactory.create() val consoleReader = ConsoleReader("Corda", FileInputStream(FileDescriptor.`in`), System.out, terminal) val jlineProcessor = JLineProcessor(terminal.isAnsiSupported, shell, consoleReader, System.out) InterruptHandler { jlineProcessor.interrupt() }.install() thread(name = "Command line shell processor", isDaemon = true) { Emoji.renderIfSupported { try { jlineProcessor.run() } catch (e: IndexOutOfBoundsException) { log.warn("Cannot parse malformed command.") } } } thread(name = "Command line shell terminator", isDaemon = true) { // Wait for the shell to finish. jlineProcessor.closed() log.info("Command shell has exited") terminal.restore() onExit.invoke() } } class ShellLifecycle(private val shellCommands: Path) : PluginLifeCycle() { fun start(config: Properties, localUserName: String = "", localUserPassword: String = ""): Shell { val classLoader = this.javaClass.classLoader val classpathDriver = ClassPathMountFactory(classLoader) val fileDriver = FileMountFactory(Utils.getCurrentDirectory()) val extraCommandsPath = shellCommands.toAbsolutePath().createDirectories() val commandsFS = FS.Builder() .register("file", fileDriver) .mount("file:$extraCommandsPath") .register("classpath", classpathDriver) .mount("classpath:/net/corda/tools/shell/") .mount("classpath:/crash/commands/") .build() val confFS = FS.Builder() .register("classpath", classpathDriver) .mount("classpath:/crash") .build() val discovery = object : ServiceLoaderDiscovery(classLoader) { override fun getPlugins(): Iterable<CRaSHPlugin<*>> { // Don't use the Java language plugin (we may not have tools.jar available at runtime), this // will cause any commands using JIT Java compilation to be suppressed. In CRaSH upstream that // is only the 'jmx' command. return super.getPlugins().filterNot { it is JavaLanguage } + CordaAuthenticationPlugin(rpcOps) } } val attributes = emptyMap<String, Any>() val context = PluginContext(discovery, attributes, commandsFS, confFS, classLoader) context.refresh() this.config = config start(context) ops = makeRPCOps(rpcOps, localUserName, localUserPassword) return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, ops, StdoutANSIProgressRenderer)) } } fun nodeInfo() = try { ops.nodeInfo() } catch (e: UndeclaredThrowableException) { throw e.cause ?: e } @JvmStatic fun setOutputFormat(outputFormat: OutputFormat) { this.outputFormat = outputFormat } @JvmStatic fun getOutputFormat(): OutputFormat { return outputFormat } fun createYamlInputMapper(rpcOps: CordaRPCOps): ObjectMapper { // Return a standard Corda Jackson object mapper, configured to use YAML by default and with extra // serializers. return JacksonSupport.createDefaultMapper(rpcOps, YAMLFactory(), true).apply { val rpcModule = SimpleModule().apply { addDeserializer(InputStream::class.java, InputStreamDeserializer) addDeserializer(UniqueIdentifier::class.java, UniqueIdentifierDeserializer) } registerModule(rpcModule) } } private fun createOutputMapper(outputFormat: OutputFormat): ObjectMapper { val factory = when(outputFormat) { OutputFormat.JSON -> JsonFactory() OutputFormat.YAML -> YAMLFactory().disable(YAMLGenerator.Feature.WRITE_DOC_START_MARKER) } return JacksonSupport.createNonRpcMapper(factory).apply { // Register serializers for stateful objects from libraries that are special to the RPC system and don't // make sense to print out to the screen. For classes we own, annotations can be used instead. val rpcModule = SimpleModule().apply { addSerializer(Observable::class.java, ObservableSerializer) addSerializer(InputStream::class.java, InputStreamSerializer) } registerModule(rpcModule) disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) enable(SerializationFeature.INDENT_OUTPUT) } } // TODO: A default renderer could be used, instead of an object mapper. See: http://www.crashub.org/1.3/reference.html#_renderers private var outputFormat = OutputFormat.YAML @VisibleForTesting lateinit var latch: CountDownLatch private set /** * Called from the 'flow' shell command. Takes a name fragment and finds a matching flow, or prints out * the list of options if the request is ambiguous. Then parses [inputData] as constructor arguments using * the [runFlowFromString] method and starts the requested flow. Ctrl-C can be used to cancel. */ @JvmStatic fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps)) { val matches = try { rpcOps.registeredFlows().filter { nameFragment in it } } catch (e: PermissionException) { output.println(e.message ?: "Access denied", Color.red) return } if (matches.isEmpty()) { output.println("No matching flow found, run 'flow list' to see your options.", Color.red) return } else if (matches.size > 1 && matches.find { it.endsWith(nameFragment)} == null) { output.println("Ambiguous name provided, please be more specific. Your options are:") matches.forEachIndexed { i, s -> output.println("${i + 1}. $s", Color.yellow) } return } val flowName = matches.find { it.endsWith(nameFragment)} ?: matches.single() val flowClazz: Class<FlowLogic<*>> = if (classLoader != null) { uncheckedCast(Class.forName(flowName, true, classLoader)) } else { uncheckedCast(Class.forName(flowName)) } try { // Show the progress tracker on the console until the flow completes or is interrupted with a // Ctrl-C keypress. val stateObservable = runFlowFromString({ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) }, inputData, flowClazz, inputObjectMapper) latch = CountDownLatch(1) ansiProgressRenderer.render(stateObservable, latch::countDown) // Wait for the flow to end and the progress tracker to notice. By the time the latch is released // the tracker is done with the screen. while (!Thread.currentThread().isInterrupted) { try { latch.await() break } catch (e: InterruptedException) { try { rpcOps.killFlow(stateObservable.id) } finally { Thread.currentThread().interrupt() break } } } output.println("Flow completed with result: ${stateObservable.returnValue.get()}") } catch (e: NoApplicableConstructor) { output.println("No matching constructor found:", Color.red) e.errors.forEach { output.println("- $it", Color.red) } } catch (e: PermissionException) { output.println(e.message ?: "Access denied", Color.red) } catch (e: ExecutionException) { // ignoring it as already logged by the progress handler subscriber } finally { InputStreamDeserializer.closeAll() } } class NoApplicableConstructor(val errors: List<String>) : CordaException(this.toString()) { override fun toString() = (listOf("No applicable constructor for flow. Problems were:") + errors).joinToString(System.lineSeparator()) } /** * Tidies up a possibly generic type name by chopping off the package names of classes in a hard-coded set of * hierarchies that are known to be widely used and recognised, and also not have (m)any ambiguous names in them. * * This is used for printing error messages when something doesn't match. */ private fun maybeAbbreviateGenericType(type: Type, extraRecognisedPackage: String): String { val packagesToAbbreviate = listOf("java.", "net.corda.core.", "kotlin.", extraRecognisedPackage) fun shouldAbbreviate(typeName: String) = packagesToAbbreviate.any { typeName.startsWith(it) } fun abbreviated(typeName: String) = if (shouldAbbreviate(typeName)) typeName.split('.').last() else typeName fun innerLoop(type: Type): String = when (type) { is ParameterizedType -> { val args: List<String> = type.actualTypeArguments.map(::innerLoop) abbreviated(type.rawType.typeName) + '<' + args.joinToString(", ") + '>' } is GenericArrayType -> { innerLoop(type.genericComponentType) + "[]" } is Class<*> -> { if (type.isArray) abbreviated(type.simpleName) else abbreviated(type.name).replace('$', '.') } else -> type.toString() } return innerLoop(type) } @JvmStatic fun killFlowById(id: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps)) { try { val runId = try { inputObjectMapper.readValue(id, StateMachineRunId::class.java) } catch (e: JsonMappingException) { output.println("Cannot parse flow ID of '$id' - expecting a UUID.", Color.red) log.error("Failed to parse flow ID", e) return } if (rpcOps.killFlow(runId)) { output.println("Killed flow $runId", Color.yellow) } else { output.println("Failed to kill flow $runId", Color.red) } } finally { output.flush() } } // TODO: This utility is generally useful and might be better moved to the node class, or an RPC, if we can commit to making it stable API. /** * Given a [FlowLogic] class and a string in one-line Yaml form, finds an applicable constructor and starts * the flow, returning the created flow logic. Useful for lightweight invocation where text is preferable * to statically typed, compiled code. * * See the [StringToMethodCallParser] class to learn more about limitations and acceptable syntax. * * @throws NoApplicableConstructor if no constructor could be found for the given set of types. */ @Throws(NoApplicableConstructor::class) fun <T> runFlowFromString(invoke: (Class<out FlowLogic<T>>, Array<out Any?>) -> FlowProgressHandle<T>, inputData: String, clazz: Class<out FlowLogic<T>>, om: ObjectMapper): FlowProgressHandle<T> { val errors = ArrayList<String>() val parser = StringToMethodCallParser(clazz, om) val nameTypeList = getMatchingConstructorParamsAndTypes(parser, inputData, clazz) try { val args = parser.parseArguments(clazz.name, nameTypeList, inputData) return invoke(clazz, args) } catch (e: StringToMethodCallParser.UnparseableCallException.ReflectionDataMissing) { val argTypes = nameTypeList.map { (_, type) -> type } errors.add("$argTypes: <constructor missing parameter reflection data>") } catch (e: StringToMethodCallParser.UnparseableCallException) { val argTypes = nameTypeList.map { (_, type) -> type } errors.add("$argTypes: ${e.message}") } throw NoApplicableConstructor(errors) } private fun <T> getMatchingConstructorParamsAndTypes(parser: StringToMethodCallParser<FlowLogic<T>>, inputData: String, clazz: Class<out FlowLogic<T>>) : List<Pair<String, Type>> { val errors = ArrayList<String>() val classPackage = clazz.packageName_ lateinit var paramNamesFromConstructor: List<String> for (ctor in clazz.constructors) { // Attempt construction with the given arguments. fun getPrototype(): List<String> { val argTypes = ctor.genericParameterTypes.map { // If the type name is in the net.corda.core or java namespaces, chop off the package name // because these hierarchies don't have (m)any ambiguous names and the extra detail is just noise. maybeAbbreviateGenericType(it, classPackage) } return paramNamesFromConstructor.zip(argTypes).map { (name, type) -> "$name: $type" } } try { paramNamesFromConstructor = parser.paramNamesFromConstructor(ctor) val nameTypeList = paramNamesFromConstructor.zip(ctor.genericParameterTypes) parser.validateIsMatchingCtor(clazz.name, nameTypeList, inputData) return nameTypeList } catch (e: StringToMethodCallParser.UnparseableCallException.MissingParameter) { errors.add("${getPrototype()}: missing parameter ${e.paramName}") } catch (e: StringToMethodCallParser.UnparseableCallException.TooManyParameters) { errors.add("${getPrototype()}: too many parameters") } catch (e: StringToMethodCallParser.UnparseableCallException.ReflectionDataMissing) { val argTypes = ctor.genericParameterTypes.map { it.typeName } errors.add("$argTypes: <constructor missing parameter reflection data>") } catch (e: StringToMethodCallParser.UnparseableCallException) { val argTypes = ctor.genericParameterTypes.map { it.typeName } errors.add("$argTypes: ${e.message}") } } throw NoApplicableConstructor(errors) } // TODO Filtering on error/success when we will have some sort of flow auditing, for now it doesn't make much sense. @JvmStatic fun runStateMachinesView(out: RenderPrintWriter, rpcOps: CordaRPCOps): Any? { val proxy = rpcOps val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed() val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) } val subscriber = FlowWatchPrintingSubscriber(out) stateMachineUpdates.startWith(currentStateMachines).subscribe(subscriber) var result: Any? = subscriber.future if (result is Future<*>) { if (!result.isDone) { out.cls() out.println("Waiting for completion or Ctrl-C ... ") out.flush() } try { result = result.get() } catch (e: InterruptedException) { subscriber.unsubscribe() Thread.currentThread().interrupt() } catch (e: ExecutionException) { throw e.rootCause } catch (e: InvocationTargetException) { throw e.rootCause } } return result } @JvmStatic fun runAttachmentTrustInfoView( out: RenderPrintWriter, rpcOps: InternalCordaRPCOps ): Any { return AttachmentTrustTable(out, rpcOps.attachmentTrustInfos) } @JvmStatic fun runDumpCheckpoints(rpcOps: InternalCordaRPCOps) { rpcOps.dumpCheckpoints() } @JvmStatic fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps, inputObjectMapper: ObjectMapper): Any? { val cmd = input.joinToString(" ").trim { it <= ' ' } if (cmd.startsWith("startflow", ignoreCase = true)) { // The flow command provides better support and startFlow requires special handling anyway due to // the generic startFlow RPC interface which offers no type information with which to parse the // string form of the command. out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow) return null } else if (cmd.substringAfter(" ").trim().equals("gracefulShutdown", ignoreCase = true)) { return InteractiveShell.gracefulShutdown(out, cordaRPCOps) } var result: Any? = null try { InputStreamSerializer.invokeContext = context val parser = StringToMethodCallParser(CordaRPCOps::class.java, inputObjectMapper) val call = parser.parse(cordaRPCOps, cmd) result = call.call() if (result != null && result !== kotlin.Unit && result !is Void) { result = printAndFollowRPCResponse(result, out, outputFormat) } if (result is Future<*>) { if (!result.isDone) { out.println("Waiting for completion or Ctrl-C ... ") out.flush() } try { result = result.get() } catch (e: InterruptedException) { Thread.currentThread().interrupt() } catch (e: ExecutionException) { throw e.rootCause } catch (e: InvocationTargetException) { throw e.rootCause } } } catch (e: StringToMethodCallParser.UnparseableCallException) { out.println(e.message, Color.red) if (e !is StringToMethodCallParser.UnparseableCallException.NoSuchFile) { out.println("Please try 'man run' to learn what syntax is acceptable") } } catch (e: Exception) { out.println("RPC failed: ${e.rootCause}", Color.red) } finally { InputStreamSerializer.invokeContext = null InputStreamDeserializer.closeAll() } return result } @JvmStatic fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps) { fun display(statements: RenderPrintWriter.() -> Unit) { statements.invoke(userSessionOut) userSessionOut.flush() } var isShuttingDown = false try { display { println("Orchestrating a clean shutdown, press CTRL+C to cancel...") } isShuttingDown = true display { println("...enabling draining mode") println("...waiting for in-flight flows to be completed") } cordaRPCOps.terminate(true) val latch = CountDownLatch(1) @Suppress("DEPRECATION") cordaRPCOps.pendingFlowsCount().updates.doOnError { error -> log.error(error.message) throw error }.doAfterTerminate(latch::countDown).subscribe( // For each update. { (first, second) -> display { println("...remaining: $first / $second") } }, // On error. { error -> if (!isShuttingDown) { display { println("RPC failed: ${error.rootCause}", Color.red) } } }, // When completed. { rpcConn.close() // This will only show up in the standalone Shell, because the embedded one is killed as part of a node's shutdown. display { println("...done, quitting the shell now.") } onExit.invoke() }) while (!Thread.currentThread().isInterrupted) { try { latch.await() break } catch (e: InterruptedException) { try { cordaRPCOps.setFlowsDrainingModeEnabled(false) display { println("...cancelled clean shutdown.") } } finally { Thread.currentThread().interrupt() break } } } } catch (e: StringToMethodCallParser.UnparseableCallException) { display { println(e.message, Color.red) println("Please try 'man run' to learn what syntax is acceptable") } } catch (e: Exception) { if (!isShuttingDown) { display { println("RPC failed: ${e.rootCause}", Color.red) } } } finally { InputStreamSerializer.invokeContext = null InputStreamDeserializer.closeAll() } } private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter, outputFormat: OutputFormat): CordaFuture<Unit> { val outputMapper = createOutputMapper(outputFormat) val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) } return maybeFollow(response, mapElement, out) } private class PrintingSubscriber(private val printerFun: (Any?) -> String, private val toStream: PrintWriter) : Subscriber<Any>() { private var count = 0 val future = openFuture<Unit>() init { // The future is public and can be completed by something else to indicate we don't wish to follow // anymore (e.g. the user pressing Ctrl-C). future.then { unsubscribe() } } @Synchronized override fun onCompleted() { toStream.println("Observable has completed") future.set(Unit) } @Synchronized override fun onNext(t: Any?) { count++ toStream.println("Observation $count: " + printerFun(t)) toStream.flush() } @Synchronized override fun onError(e: Throwable) { toStream.println("Observable completed with an error") e.printStackTrace(toStream) future.setException(e) } } private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> { // Match on a couple of common patterns for "important" observables. It's tough to do this in a generic // way because observables can be embedded anywhere in the object graph, and can emit other arbitrary // object graphs that contain yet more observables. So we just look for top level responses that follow // the standard "track" pattern, and print them until the user presses Ctrl-C if (response == null) return doneFuture(Unit) if (response is DataFeed<*, *>) { out.println("Snapshot:") out.println(printerFun(response.snapshot)) out.flush() out.println("Updates:") return printNextElements(response.updates, printerFun, out) } if (response is Observable<*>) { return printNextElements(response, printerFun, out) } out.println(printerFun(response)) return doneFuture(Unit) } private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> { val subscriber = PrintingSubscriber(printerFun, out) uncheckedCast(elements).subscribe(subscriber) return subscriber.future } } MaxLineLength:InteractiveShell.kt$InteractiveShell.NoApplicableConstructor$override fun toString() MaxLineLength:InteractiveShellIntegrationTest.kt$InteractiveShellIntegrationTest$private MaxLineLength:InterestSwapRestAPI.kt$InterestRateSwapAPI @@ -4117,7 +4108,7 @@ MaxLineLength:NodeKeystoreCheckTest.kt$NodeKeystoreCheckTest$setPrivateKey(X509Utilities.CORDA_CLIENT_CA, nodeCA.keyPair.private, listOf(badNodeCACert, badRoot), signingCertStore.entryPassword) MaxLineLength:NodeKeystoreCheckTest.kt$NodeKeystoreCheckTest$val badNodeCACert = X509Utilities.createCertificate(CertificateType.NODE_CA, badRoot, badRootKeyPair, ALICE_NAME.x500Principal, nodeCA.keyPair.public) MaxLineLength:NodeKeystoreCheckTest.kt$NodeKeystoreCheckTest$val p2pSslConfig = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory, keyStorePassword = keystorePassword, trustStorePassword = keystorePassword) - MaxLineLength:NodeMonitorModel.kt$NodeMonitorModel${ rpc = ReconnectingCordaRPCOps(nodeHostAndPort, username, password, CordaRPCClientConfiguration.DEFAULT) proxyObservable.value = rpc // Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates val (statesSnapshot, vaultUpdates) = rpc.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL), PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE)) val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ -> statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED }.toSet() val consumedStates = statesSnapshot.states.toSet() - unconsumedStates val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates, references = emptySet()) vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject::onNext) // Transactions val (transactions, newTransactions) = @Suppress("DEPRECATION") rpc.internalVerifiedTransactionsFeed() newTransactions.startWith(transactions).subscribe(transactionsSubject::onNext) // SM -> TX mapping val (smTxMappings, futureSmTxMappings) = rpc.stateMachineRecordedTransactionMappingFeed() futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject::onNext) // Parties on network val (parties, futurePartyUpdate) = rpc.networkMapFeed() futurePartyUpdate.startWith(parties.map(MapChange::Added)).subscribe(networkMapSubject::onNext) val stateMachines = rpc.stateMachinesSnapshot() notaryIdentities = rpc.notaryIdentities() // Extract the flow tracking stream // TODO is there a nicer way of doing this? Stream of streams in general results in code like this... // TODO `progressTrackingSubject` doesn't seem to be used anymore - should it be removed? val currentProgressTrackerUpdates = stateMachines.mapNotNull { stateMachine -> ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachine) } val futureProgressTrackerUpdates = stateMachineUpdatesSubject.map { stateMachineUpdate -> if (stateMachineUpdate is StateMachineUpdate.Added) { ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo) ?: Observable.empty<ProgressTrackingEvent>() } else { Observable.empty<ProgressTrackingEvent>() } } // We need to retry, because when flow errors, we unsubscribe from progressTrackingSubject. So we end up with stream of state machine updates and no progress trackers. futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.retry().subscribe(progressTrackingSubject) } + MaxLineLength:NodeMonitorModel.kt$NodeMonitorModel${ rpc = CordaRPCClient(nodeHostAndPort).start(username, password) proxyObservable.value = rpc.proxy // Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates val ( statesSnapshot, vaultUpdates ) = rpc.proxy.vaultTrackBy<ContractState>( QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL), PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE) ) val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ -> statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED }.toSet() val consumedStates = statesSnapshot.states.toSet() - unconsumedStates val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates, references = emptySet()) vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject::onNext) // Transactions val (transactions, newTransactions) = @Suppress("DEPRECATION") rpc.proxy.internalVerifiedTransactionsFeed() newTransactions.startWith(transactions).subscribe(transactionsSubject::onNext) // SM -> TX mapping val (smTxMappings, futureSmTxMappings) = rpc.proxy.stateMachineRecordedTransactionMappingFeed() futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject::onNext) // Parties on network val (parties, futurePartyUpdate) = rpc.proxy.networkMapFeed() futurePartyUpdate.startWith(parties.map(MapChange::Added)).subscribe(networkMapSubject::onNext) val stateMachines = rpc.proxy.stateMachinesSnapshot() notaryIdentities = rpc.proxy.notaryIdentities() // Extract the flow tracking stream // TODO is there a nicer way of doing this? Stream of streams in general results in code like this... // TODO `progressTrackingSubject` doesn't seem to be used anymore - should it be removed? val currentProgressTrackerUpdates = stateMachines.mapNotNull { stateMachine -> ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachine) } val futureProgressTrackerUpdates = stateMachineUpdatesSubject.map { stateMachineUpdate -> if (stateMachineUpdate is StateMachineUpdate.Added) { ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo) ?: Observable.empty<ProgressTrackingEvent>() } else { Observable.empty<ProgressTrackingEvent>() } } // We need to retry, because when flow errors, we unsubscribe from progressTrackingSubject. So we end up with stream of state machine updates and no progress trackers. futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.retry().subscribe(progressTrackingSubject) } MaxLineLength:NodeNamedCache.kt$DefaultNamedCacheFactory$name.startsWith("RPCSecurityManagerShiroCache_") -> with(security?.authService?.options?.cache!!) { caffeine.maximumSize(maxEntries).expireAfterWrite(expireAfterSecs, TimeUnit.SECONDS) } MaxLineLength:NodeNamedCache.kt$DefaultNamedCacheFactory$open MaxLineLength:NodeNamedCache.kt$DefaultNamedCacheFactory$override fun bindWithConfig(nodeConfiguration: NodeConfiguration): BindableNamedCacheFactory @@ -4208,9 +4199,6 @@ MaxLineLength:NodeVaultServiceTest.kt$NodeVaultServiceTest$val unlockedStates1 = vaultService.queryBy<Cash.State>(VaultQueryCriteria(softLockingCondition = SoftLockingCondition(SoftLockingType.UNLOCKED_ONLY))).states MaxLineLength:NodeVaultServiceTest.kt$NodeVaultServiceTest$val unlockedStates2 = vaultService.queryBy<Cash.State>(VaultQueryCriteria(softLockingCondition = SoftLockingCondition(SoftLockingType.UNLOCKED_ONLY))).states MaxLineLength:NodeVaultServiceTest.kt$NodeVaultServiceTest$vaultService.queryBy(Cash.State::class.java, QueryCriteria.VaultQueryCriteria(relevancyStatus = Vault.RelevancyStatus.ALL), PageSpecification(1)).totalStatesAvailable - MaxLineLength:NodeWebServer.kt$NodeWebServer$/** Fetch WebServerPluginRegistry classes registered in META-INF/services/net.corda.webserver.services.WebServerPluginRegistry files that exist in the classpath */ val pluginRegistries: List<WebServerPluginRegistry> by lazy { ServiceLoader.load(WebServerPluginRegistry::class.java).toList() } - MaxLineLength:NodeWebServer.kt$NodeWebServer$private fun reconnectingCordaRPCOps() - MaxLineLength:NodeWebServer.kt$NodeWebServer$val sslConnector = ServerConnector(server, SslConnectionFactory(sslContextFactory, "http/1.1"), HttpConnectionFactory(httpsConfiguration)) MaxLineLength:NonInvalidatingCache.kt$NonInvalidatingWeightBasedCache.Companion$private MaxLineLength:NonInvalidatingUnboundCache.kt$NonInvalidatingUnboundCache$constructor(name: String, cacheFactory: NamedCacheFactory, loadFunction: (K) -> V, removalListener: RemovalListener<K, V> = RemovalListener { _, _, _ -> }, keysToPreload: () -> Iterable<K> = { emptyList() }) : this(buildCache(name, cacheFactory, loadFunction, removalListener, keysToPreload)) MaxLineLength:NonInvalidatingUnboundCache.kt$NonInvalidatingUnboundCache.Companion$private @@ -4558,12 +4546,7 @@ MaxLineLength:ReceiveTransactionFlow.kt$ReceiveStateAndRefFlow<out T : ContractState> : FlowLogic MaxLineLength:ReceiveTransactionFlow.kt$ReceiveTransactionFlow : FlowLogic MaxLineLength:ReceiveTransactionFlow.kt$ReceiveTransactionFlow$private val statesToRecord: StatesToRecord = StatesToRecord.NONE - MaxLineLength:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps : AutoCloseableInternalCordaRPCOps MaxLineLength:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps$ fun runFlowWithLogicalRetry(runFlow: (CordaRPCOps) -> StateMachineRunId, hasFlowStarted: (CordaRPCOps) -> Boolean, onFlowConfirmed: () -> Unit = {}, timeout: Duration = 4.seconds) - MaxLineLength:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps$ReconnectingRPCConnection(nodeHostAndPorts, username, password, rpcConfiguration, sslConfiguration, classLoader, gracefulReconnect) - MaxLineLength:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ErrorInterceptingHandler$private - MaxLineLength:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ReconnectingRPCConnection$attemptedAddress - MaxLineLength:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ReconnectingRPCConnection$private tailrec MaxLineLength:ReferenceInputStateTests.kt$ReferenceStateTests$output(ExampleContract::class.java.typeName, "UPDATED REF DATA", "REF DATA".output<ExampleState>().copy(data = "NEW STUFF!")) MaxLineLength:ReferenceInputStateTests.kt$ReferenceStateTests$val stateAndRef = StateAndRef(TransactionState(state, CONTRACT_ID, DUMMY_NOTARY, constraint = AlwaysAcceptAttachmentConstraint), StateRef(SecureHash.zeroHash, 0)) MaxLineLength:ReferencedStatesFlowTests.kt$ReferencedStatesFlowTests$assertEquals(2, nodes[2].services.vaultService.queryBy<LinearState>(QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)).states.size) @@ -4604,7 +4587,6 @@ MaxLineLength:RpcReconnectTests.kt$RpcReconnectTests$fun startBankA(address: NetworkHostAndPort) MaxLineLength:RpcReconnectTests.kt$RpcReconnectTests$fun startProxy(addressPair: AddressPair) MaxLineLength:RpcReconnectTests.kt$RpcReconnectTests$numReconnects++ // We only expect to see a single reconnectOnError in the stack trace. Otherwise we're in danger of stack overflow recursion maxStackOccurrences.set(max(maxStackOccurrences.get(), currentStackTrace().count { it.methodName == "reconnectOnError" })) Unit - MaxLineLength:RpcReconnectTests.kt$RpcReconnectTests$val bankAReconnectingRpc = client.start(demoUser.username, demoUser.password, gracefulReconnect = reconnect).proxy as ReconnectingCordaRPCOps MaxLineLength:RpcReconnectTests.kt$RpcReconnectTests$val criteria = QueryCriteria.VaultCustomQueryCriteria(builder { CashSchemaV1.PersistentCashState::pennies.equal(amount.toLong() * 100) }, status = Vault.StateStatus.ALL) MaxLineLength:SSLHelper.kt$val trustManagers = trustManagerFactory.trustManagers.filterIsInstance(X509ExtendedTrustManager::class.java).map { LoggingTrustManagerWrapper(it) }.toTypedArray() MaxLineLength:SSLHelperTest.kt$SSLHelperTest$trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(CertificateStore.fromFile(trustStore.path, trustStore.storePassword, trustStore.entryPassword, false), false)) diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt index 071eb3822d..0984caaa88 100644 --- a/node/src/integration-test-slow/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt +++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt @@ -133,7 +133,8 @@ class RpcReconnectTests { } val reconnect = GracefulReconnect(onDisconnect = { numDisconnects++ }, onReconnect = onReconnect) val client = CordaRPCClient(addressesForRpc) - val bankAReconnectingRpc = client.start(demoUser.username, demoUser.password, gracefulReconnect = reconnect).proxy as ReconnectingCordaRPCOps + val bankAReconnectingRPCConnection = client.start(demoUser.username, demoUser.password, gracefulReconnect = reconnect) + val bankAReconnectingRpc = bankAReconnectingRPCConnection.proxy as ReconnectingCordaRPCOps // DOCEND rpcReconnectingRPC // Observe the vault and collect the observations. @@ -330,7 +331,7 @@ class RpcReconnectTests { // Stop the observers. vaultSubscription.unsubscribe() stateMachineSubscription.unsubscribe() - bankAReconnectingRpc.close() + bankAReconnectingRPCConnection.close() } proxy.close() diff --git a/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt b/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt index 6a96f92b56..0dfb0bf20d 100644 --- a/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt +++ b/samples/bank-of-corda-demo/src/main/kotlin/net/corda/bank/api/BankOfCordaClientApi.kt @@ -1,8 +1,8 @@ package net.corda.bank.api import net.corda.bank.api.BankOfCordaWebApi.IssueRequestParams -import net.corda.client.rpc.CordaRPCClientConfiguration -import net.corda.client.rpc.internal.ReconnectingCordaRPCOps +import net.corda.client.rpc.CordaRPCClient +import net.corda.client.rpc.GracefulReconnect import net.corda.core.messaging.startFlow import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.NetworkHostAndPort @@ -35,7 +35,9 @@ object BankOfCordaClientApi { * * @return a payment transaction (following successful issuance of cash to self). */ - fun requestRPCIssue(rpcAddress: NetworkHostAndPort, params: IssueRequestParams): SignedTransaction = requestRPCIssueHA(listOf(rpcAddress), params) + fun requestRPCIssue(rpcAddress: NetworkHostAndPort, params: IssueRequestParams): SignedTransaction { + return requestRPCIssueHA(listOf(rpcAddress), params) + } /** * RPC API @@ -44,20 +46,28 @@ object BankOfCordaClientApi { */ fun requestRPCIssueHA(availableRpcServers: List, params: IssueRequestParams): SignedTransaction { // TODO: privileged security controls required - ReconnectingCordaRPCOps(availableRpcServers, BOC_RPC_USER, BOC_RPC_PWD, CordaRPCClientConfiguration.DEFAULT).use { rpc-> - rpc.waitUntilNetworkReady().getOrThrow() + CordaRPCClient(availableRpcServers) + .start(BOC_RPC_USER, BOC_RPC_PWD, gracefulReconnect = GracefulReconnect()).use { rpc-> + rpc.proxy.waitUntilNetworkReady().getOrThrow() // Resolve parties via RPC - val issueToParty = rpc.wellKnownPartyFromX500Name(params.issueToPartyName) + val issueToParty = rpc.proxy.wellKnownPartyFromX500Name(params.issueToPartyName) ?: throw IllegalStateException("Unable to locate ${params.issueToPartyName} in Network Map Service") - val notaryLegalIdentity = rpc.notaryIdentities().firstOrNull { it.name == params.notaryName } + val notaryLegalIdentity = rpc.proxy.notaryIdentities().firstOrNull { it.name == params.notaryName } ?: throw IllegalStateException("Couldn't locate notary ${params.notaryName} in NetworkMapCache") val anonymous = true val issuerBankPartyRef = OpaqueBytes.of(params.issuerBankPartyRef.toByte()) - logger.info("${rpc.nodeInfo()} issuing ${params.amount} to transfer to $issueToParty ...") - return rpc.startFlow(::CashIssueAndPaymentFlow, params.amount, issuerBankPartyRef, issueToParty, anonymous, notaryLegalIdentity) + logger.info("${rpc.proxy.nodeInfo()} issuing ${params.amount} to transfer to $issueToParty ...") + return rpc.proxy.startFlow( + ::CashIssueAndPaymentFlow, + params.amount, + issuerBankPartyRef, + issueToParty, + anonymous, + notaryLegalIdentity + ) .returnValue.getOrThrow().stx } } diff --git a/testing/testserver/src/main/kotlin/net/corda/webserver/internal/NodeWebServer.kt b/testing/testserver/src/main/kotlin/net/corda/webserver/internal/NodeWebServer.kt index 256b61ce19..477c03e4b2 100644 --- a/testing/testserver/src/main/kotlin/net/corda/webserver/internal/NodeWebServer.kt +++ b/testing/testserver/src/main/kotlin/net/corda/webserver/internal/NodeWebServer.kt @@ -3,8 +3,9 @@ package net.corda.webserver.internal import com.google.common.html.HtmlEscapers.htmlEscaper import io.netty.channel.unix.Errors import net.corda.client.jackson.JacksonSupport -import net.corda.client.rpc.CordaRPCClientConfiguration -import net.corda.client.rpc.internal.ReconnectingCordaRPCOps +import net.corda.client.rpc.CordaRPCClient +import net.corda.client.rpc.CordaRPCConnection +import net.corda.client.rpc.GracefulReconnect import net.corda.core.internal.errors.AddressBindingException import net.corda.core.messaging.CordaRPCOps import net.corda.core.utilities.contextLogger @@ -46,8 +47,12 @@ class NodeWebServer(val config: WebServerConfig) { } fun run() { - while (server.isRunning) { - Thread.sleep(100) // TODO: Redesign + try { + while (server.isRunning) { + Thread.sleep(100) // TODO: Redesign + } + } finally { + rpc.close() } } @@ -75,7 +80,11 @@ class NodeWebServer(val config: WebServerConfig) { sslContextFactory.setIncludeProtocols("TLSv1.2") sslContextFactory.setExcludeCipherSuites(".*NULL.*", ".*RC4.*", ".*MD5.*", ".*DES.*", ".*DSS.*") sslContextFactory.setIncludeCipherSuites(".*AES.*GCM.*") - val sslConnector = ServerConnector(server, SslConnectionFactory(sslContextFactory, "http/1.1"), HttpConnectionFactory(httpsConfiguration)) + val sslConnector = ServerConnector( + server, + SslConnectionFactory(sslContextFactory, "http/1.1"), + HttpConnectionFactory(httpsConfiguration) + ) sslConnector.port = address.port sslConnector } else { @@ -175,9 +184,21 @@ class NodeWebServer(val config: WebServerConfig) { } } - private fun reconnectingCordaRPCOps() = ReconnectingCordaRPCOps(config.rpcAddress, config.runAs.username , config.runAs.password, CordaRPCClientConfiguration.DEFAULT, null, javaClass.classLoader) + private lateinit var rpc: CordaRPCConnection + private fun reconnectingCordaRPCOps(): CordaRPCOps { + rpc = CordaRPCClient(config.rpcAddress, null, javaClass.classLoader) + .start( + config.runAs.username, + config.runAs.password, + GracefulReconnect() + ) + return rpc.proxy + } - /** Fetch WebServerPluginRegistry classes registered in META-INF/services/net.corda.webserver.services.WebServerPluginRegistry files that exist in the classpath */ + /** + * Fetch WebServerPluginRegistry classes registered in META-INF/services/net.corda.webserver.services.WebServerPluginRegistry + * files that exist in the classpath + */ val pluginRegistries: List by lazy { ServiceLoader.load(WebServerPluginRegistry::class.java).toList() } diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt index 158d6f7bff..ac5edb6e7e 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt @@ -11,8 +11,8 @@ import net.corda.client.jackson.JacksonSupport import net.corda.client.jackson.StringToMethodCallParser import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClientConfiguration +import net.corda.client.rpc.GracefulReconnect import net.corda.client.rpc.PermissionException -import net.corda.client.rpc.internal.ReconnectingCordaRPCOps import net.corda.core.CordaException import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.UniqueIdentifier @@ -22,7 +22,6 @@ import net.corda.core.internal.* import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.messaging.InternalCordaRPCOps -import net.corda.core.internal.packageName_ import net.corda.core.messaging.* import net.corda.tools.shell.utlities.ANSIProgressRenderer import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer @@ -91,21 +90,24 @@ object InteractiveShell { fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) { rpcOps = { username: String, password: String -> - if (standalone) { - ReconnectingCordaRPCOps(configuration.hostAndPort, username, password, CordaRPCClientConfiguration.DEFAULT, configuration.ssl, classLoader).also { - rpcConn = it - } + val connection = if (standalone) { + CordaRPCClient( + configuration.hostAndPort, + configuration.ssl, + classLoader + ).start(username, password, gracefulReconnect = GracefulReconnect()) } else { - val client = CordaRPCClient(hostAndPort = configuration.hostAndPort, + CordaRPCClient( + hostAndPort = configuration.hostAndPort, configuration = CordaRPCClientConfiguration.DEFAULT.copy( maxReconnectAttempts = 1 ), sslConfiguration = configuration.ssl, - classLoader = classLoader) - val connection = client.start(username, password) - rpcConn = connection - connection.proxy as InternalCordaRPCOps + classLoader = classLoader + ).start(username, password) } + rpcConn = connection + connection.proxy as InternalCordaRPCOps } _startShell(configuration, classLoader) }