CORDA-3281: Drop some errors to warnings and clean up logic around (#5605)

shell "gracefulShutdown" command.
This commit is contained in:
Ryan Fowler 2019-10-22 11:02:04 +01:00 committed by bpaunescu
parent 61cdfa5b26
commit 5da114caa3
9 changed files with 214 additions and 124 deletions

View File

@ -3,14 +3,14 @@
<option name="LINE_SEPARATOR" value="&#10;" />
<option name="RIGHT_MARGIN" value="140" />
<option name="SOFT_MARGINS" value="140" />
<JavaCodeStyleSettings>
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
</JavaCodeStyleSettings>
<GroovyCodeStyleSettings>
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="100" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="100" />
</GroovyCodeStyleSettings>
<JavaCodeStyleSettings>
<option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
<option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="999" />
</JavaCodeStyleSettings>
<JetCodeStyleSettings>
<option name="PACKAGES_TO_USE_STAR_IMPORTS">
<value>

View File

@ -1,5 +1,6 @@
<component name="ProjectCodeStyleConfiguration">
<state>
<option name="USE_PER_PROJECT_SETTINGS" value="true" />
<option name="PREFERRED_PROJECT_CODE_STYLE" value="Default" />
</state>
</component>

View File

@ -75,13 +75,13 @@ class CordaRPCConnection private constructor(
override val serverProtocolVersion: Int get() = actualConnection.serverProtocolVersion
override fun notifyServerAndClose() = actualConnection.notifyServerAndClose()
override fun notifyServerAndClose() = doCloseLogic { actualConnection.notifyServerAndClose() }
override fun forceClose() = actualConnection.forceClose()
override fun forceClose() = doCloseLogic { actualConnection.forceClose() }
override fun close() {
private inline fun doCloseLogic(close: () -> Unit) {
try {
actualConnection.close()
close.invoke()
} finally {
observersPool?.apply {
shutdown()
@ -286,6 +286,7 @@ open class CordaRPCClientConfiguration @JvmOverloads constructor(
* The default value is 5.
*/
class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -> Unit = {}, val maxAttempts: Int = 5) {
@Suppress("unused") // constructor for java
@JvmOverloads
constructor(onDisconnect: Runnable, onReconnect: Runnable, maxAttempts: Int = 5) :
this(onDisconnect = { onDisconnect.run() }, onReconnect = { onReconnect.run() }, maxAttempts = maxAttempts)
@ -321,11 +322,15 @@ class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -
*
* If you want to enable a more graceful form of reconnection, you can make use of the gracefulReconnect argument of the [start] method.
* If this is set to true, then:
* - The client will automatically reconnect, when the connection is broken regardless of whether you provided a single or multiple addresses.
* - Simple RPC calls that return data (e.g. [CordaRPCOps.networkParameters]) will **block** and return after the connection has been re-established and the node is up.
* - RPC calls that return [rx.Observable]s (e.g. [CordaRPCOps.vaultTrack]) will automatically reconnect and keep sending events for the subscribed [rx.Observable]s.
* - The client will automatically reconnect, when the connection is broken regardless of whether you provided a single or
* multiple addresses.
* - Simple RPC calls that return data (e.g. [CordaRPCOps.networkParameters]) will **block** and return after the connection has been
* re-established and the node is up.
* - RPC calls that return [rx.Observable]s (e.g. [CordaRPCOps.vaultTrack]) will automatically reconnect and keep sending events for
* the subscribed [rx.Observable]s.
* Note: In this approach, some events might be lost during a re-connection and not sent in the subscribed [rx.Observable]s.
* - RPC calls that invoke flows (e.g. [CordaRPCOps.startFlowDynamic]) will fail during a disconnection throwing a [CouldNotStartFlowException].
* - RPC calls that invoke flows (e.g. [CordaRPCOps.startFlowDynamic]) will fail during a disconnection throwing
* a [CouldNotStartFlowException].
*
* @param hostAndPort The network address to connect to.
* @param configuration An optional configuration used to tweak client behaviour.
@ -333,7 +338,8 @@ class GracefulReconnect(val onDisconnect: () -> Unit = {}, val onReconnect: () -
* @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode.
* The client will attempt to connect to a live server by trying each address in the list. If the servers are not in
* HA mode, the client will round-robin from the beginning of the list and try all servers.
* @param classLoader a classloader, which will be used (if provided) to discover available [SerializationCustomSerializer]s and [SerializationWhitelist]s
* @param classLoader a classloader, which will be used (if provided) to discover available [SerializationCustomSerializer]s
* and [SerializationWhitelist]s
* If the created RPC client is intended to use types with custom serializers / whitelists,
* a classloader will need to be provided that contains the associated CorDapp jars.
*/

View File

@ -25,6 +25,9 @@ interface RPCConnection<out I : RPCOps> : Closeable {
* Closes this client gracefully by sending a notification to the server, so it can immediately clean up resources.
* If the server is not available this method may block for a short period until it's clear the server is not
* coming back.
*
* Note: this will also be the implementation of [close] so won't be needed when using [use] or `try-with-resources`
* blocks.
*/
fun notifyServerAndClose()
@ -37,4 +40,6 @@ interface RPCConnection<out I : RPCOps> : Closeable {
* block waiting for it to come back, which typically happens in integration tests and demos rather than production.
*/
fun forceClose()
override fun close() = notifyServerAndClose()
}

View File

@ -120,10 +120,6 @@ class RPCClient<I : RPCOps>(
override fun forceClose() {
close(false)
}
override fun close() {
close(true)
}
}
} catch (exception: Throwable) {
proxyHandler.notifyServerAndClose()

View File

@ -1,7 +1,18 @@
package net.corda.client.rpc.internal
import net.corda.client.rpc.*
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.*
import net.corda.client.rpc.ConnectionFailureException
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.MaxRpcRetryException
import net.corda.client.rpc.RPCConnection
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CLOSED
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CONNECTED
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CONNECTING
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.DIED
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.UNCONNECTED
import net.corda.client.rpc.reconnect.CouldNotStartFlowException
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.div
@ -89,7 +100,12 @@ class ReconnectingCordaRPCOps private constructor(
*
* Note that this method does not guarantee 100% that the flow will not be started twice.
*/
fun runFlowWithLogicalRetry(runFlow: (CordaRPCOps) -> StateMachineRunId, hasFlowStarted: (CordaRPCOps) -> Boolean, onFlowConfirmed: () -> Unit = {}, timeout: Duration = 4.seconds) {
fun runFlowWithLogicalRetry(
runFlow: (CordaRPCOps) -> StateMachineRunId,
hasFlowStarted: (CordaRPCOps) -> Boolean,
onFlowConfirmed: () -> Unit = {},
timeout: Duration = 4.seconds
) {
try {
runFlow(this)
onFlowConfirmed()
@ -149,7 +165,7 @@ class ReconnectingCordaRPCOps private constructor(
currentState = DIED
gracefulReconnect.onDisconnect.invoke()
//TODO - handle error cases
log.error("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
log.warn("Reconnecting to ${this.nodeHostAndPorts} due to error: ${e.message}")
log.debug("", e)
connect()
previousConnection?.forceClose()
@ -233,11 +249,6 @@ class ReconnectingCordaRPCOps private constructor(
currentState = CLOSED
currentRPCConnection?.forceClose()
}
@Synchronized
override fun close() {
currentState = CLOSED
currentRPCConnection?.close()
}
}
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler {
private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")
@ -267,22 +278,22 @@ class ReconnectingCordaRPCOps private constructor(
} catch (e: InvocationTargetException) {
when (e.targetException) {
is RejectedCommandException -> {
log.error("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e)
log.warn("Node is being shutdown. Operation ${method.name} rejected. Retrying when node is up...", e)
reconnectingRPCConnection.reconnectOnError(e)
}
is ConnectionFailureException -> {
log.error("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e)
log.warn("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e)
reconnectingRPCConnection.reconnectOnError(e)
checkIfIsStartFlow(method, e)
}
is RPCException -> {
log.error("Failed to perform operation ${method.name}. RPCException. Retrying....", e)
log.warn("Failed to perform operation ${method.name}. RPCException. Retrying....", e)
reconnectingRPCConnection.reconnectOnError(e)
Thread.sleep(1000) // TODO - explain why this sleep is necessary
checkIfIsStartFlow(method, e)
}
else -> {
log.error("Failed to perform operation ${method.name}. Unknown error. Retrying....", e)
log.warn("Failed to perform operation ${method.name}. Unknown error. Retrying....", e)
reconnectingRPCConnection.reconnectOnError(e)
checkIfIsStartFlow(method, e)
}

View File

@ -161,7 +161,6 @@
<ID>ComplexMethod:IRS.kt$InterestRateSwap.FloatingLeg$override fun equals(other: Any?): Boolean</ID>
<ID>ComplexMethod:InteractiveShell.kt$InteractiveShell$ @JvmStatic fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps))</ID>
<ID>ComplexMethod:InteractiveShell.kt$InteractiveShell$ private fun maybeAbbreviateGenericType(type: Type, extraRecognisedPackage: String): String</ID>
<ID>ComplexMethod:InteractiveShell.kt$InteractiveShell$@JvmStatic fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps)</ID>
<ID>ComplexMethod:InteractiveShell.kt$InteractiveShell$@JvmStatic fun runRPCFromString(input: List&lt;String&gt;, out: RenderPrintWriter, context: InvocationContext&lt;out Any&gt;, cordaRPCOps: CordaRPCOps, inputObjectMapper: ObjectMapper): Any?</ID>
<ID>ComplexMethod:Kryo.kt$ImmutableClassSerializer$override fun read(kryo: Kryo, input: Input, type: Class&lt;T&gt;): T</ID>
<ID>ComplexMethod:Kryo.kt$ImmutableClassSerializer$override fun write(kryo: Kryo, output: Output, obj: T)</ID>
@ -392,7 +391,6 @@
<ID>ForbiddenComment:InteractiveShell.kt$// TODO: Resurrect or reimplement the mail plugin.</ID>
<ID>ForbiddenComment:InteractiveShell.kt$// TODO: Review or fix the JVM commands which have bitrotted and some are useless.</ID>
<ID>ForbiddenComment:InteractiveShell.kt$InteractiveShell$// TODO: A default renderer could be used, instead of an object mapper. See: http://www.crashub.org/1.3/reference.html#_renderers</ID>
<ID>ForbiddenComment:InteractiveShell.kt$InteractiveShell$// TODO: This utility is generally useful and might be better moved to the node class, or an RPC, if we can commit to making it stable API.</ID>
<ID>ForbiddenComment:InternalUtils.kt$// TODO: Add inline back when a new Kotlin version is released and check if the java.lang.VerifyError</ID>
<ID>ForbiddenComment:InternalUtils.kt$// TODO: Currently the certificate revocation status is not handled here. Nowhere in the code the second parameter is used. Consider adding the support in the future.</ID>
<ID>ForbiddenComment:IrsDemoClientApi.kt$IRSDemoClientApi$// TODO: Add uploading of files to the HTTP API</ID>
@ -1949,7 +1947,6 @@
<ID>MaxLineLength:CordaPersistence.kt$CordaPersistence$error("Was not expecting to find existing database transaction on current strand when setting database: ${Strand.currentStrand()}, $it")</ID>
<ID>MaxLineLength:CordaPersistence.kt$CordaPersistence$is SchemaManagementException -&gt; throw HibernateSchemaChangeException("Incompatible schema change detected. Please run the node with database.initialiseSchema=true. Reason: ${e.message}", e)</ID>
<ID>MaxLineLength:CordaPersistence.kt$CordaPersistence$val transaction = contextDatabase.currentOrNew(isolationLevel) // XXX: Does this code really support statement changing the contextDatabase?</ID>
<ID>MaxLineLength:CordaRPCClient.kt$CordaRPCClient</ID>
<ID>MaxLineLength:CordaRPCClientReconnectionTest.kt$CordaRPCClientReconnectionTest$val addresses = listOf(NetworkHostAndPort("localhost", portAllocator.nextPort()), NetworkHostAndPort("localhost", portAllocator.nextPort()))</ID>
<ID>MaxLineLength:CordaRPCClientTest.kt$CordaRPCClientTest$node.services.startFlow(CashIssueFlow(100.POUNDS, OpaqueBytes.of(1), identity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow()</ID>
<ID>MaxLineLength:CordaRPCClientTest.kt$CordaRPCClientTest$node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0), identity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow()</ID>
@ -1965,11 +1962,6 @@
<ID>MaxLineLength:CordaRPCOps.kt$CordaRPCOps$fun &lt;T : ContractState&gt; vaultTrackByWithSorting(contractStateType: Class&lt;out T&gt;, criteria: QueryCriteria, sorting: Sort): DataFeed&lt;Vault.Page&lt;T&gt;, Vault.Update&lt;T&gt;&gt;</ID>
<ID>MaxLineLength:CordaRPCOps.kt$StateMachineInfo$return copy(id = id, flowLogicClassName = flowLogicClassName, initiator = initiator, progressTrackerStepAndUpdates = progressTrackerStepAndUpdates, invocationContext = invocationContext)</ID>
<ID>MaxLineLength:CordaRPCOps.kt$sorting: Sort = Sort(emptySet())</ID>
<ID>MaxLineLength:CordaRPCOpsImpl.kt$CordaRPCOpsImpl$is ConnectException -&gt; throw CordaRuntimeException("There is connection problem to network map. The possible causes are incorrect configuration or network map service being down")</ID>
<ID>MaxLineLength:CordaRPCOpsImpl.kt$CordaRPCOpsImpl$override</ID>
<ID>MaxLineLength:CordaRPCOpsImpl.kt$CordaRPCOpsImpl$override fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction?</ID>
<ID>MaxLineLength:CordaRPCOpsImpl.kt$CordaRPCOpsImpl$return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)</ID>
<ID>MaxLineLength:CordaRPCOpsImpl.kt$CordaRPCOpsImpl${ error -&gt; logger.error("Error while waiting for pending flows to drain in preparation for shutdown. Cause was: ${error.message}", error) }</ID>
<ID>MaxLineLength:CordaRPCOpsImplTest.kt$CordaRPCOpsImplTest$assertThatCode { rpc.startFlow(::SoftLock, cash.ref, Duration.ofSeconds(1)).returnValue.getOrThrow() }.doesNotThrowAnyException()</ID>
<ID>MaxLineLength:CordaRPCOpsImplTest.kt$CordaRPCOpsImplTest$val cash = rpc.startFlow(::CashIssueFlow, 10.DOLLARS, issuerRef, notary).returnValue.getOrThrow().stx.tx.outRefsOfType&lt;Cash.State&gt;().single()</ID>
<ID>MaxLineLength:CordaSSHAuthInfo.kt$CordaSSHAuthInfo : AuthInfo</ID>
@ -3151,7 +3143,6 @@
<ID>MaxLineLength:ReceiveTransactionFlow.kt$ReceiveStateAndRefFlow&lt;out T : ContractState&gt; : FlowLogic</ID>
<ID>MaxLineLength:ReceiveTransactionFlow.kt$ReceiveTransactionFlow : FlowLogic</ID>
<ID>MaxLineLength:ReceiveTransactionFlow.kt$ReceiveTransactionFlow$private val statesToRecord: StatesToRecord = StatesToRecord.NONE</ID>
<ID>MaxLineLength:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps$ fun runFlowWithLogicalRetry(runFlow: (CordaRPCOps) -&gt; StateMachineRunId, hasFlowStarted: (CordaRPCOps) -&gt; Boolean, onFlowConfirmed: () -&gt; Unit = {}, timeout: Duration = 4.seconds)</ID>
<ID>MaxLineLength:ReferenceInputStateTests.kt$ReferenceStateTests$output(ExampleContract::class.java.typeName, "UPDATED REF DATA", "REF DATA".output&lt;ExampleState&gt;().copy(data = "NEW STUFF!"))</ID>
<ID>MaxLineLength:ReferenceInputStateTests.kt$ReferenceStateTests$val stateAndRef = StateAndRef(TransactionState(state, CONTRACT_ID, DUMMY_NOTARY, constraint = AlwaysAcceptAttachmentConstraint), StateRef(SecureHash.zeroHash, 0))</ID>
<ID>MaxLineLength:ReferencedStatesFlowTests.kt$ReferencedStatesFlowTests$assertEquals(2, nodes[2].services.vaultService.queryBy&lt;LinearState&gt;(QueryCriteria.VaultQueryCriteria(status = Vault.StateStatus.ALL)).states.size)</ID>
@ -3803,7 +3794,6 @@
<ID>NestedBlockDepth:FetchDataFlow.kt$FetchAttachmentsFlow$override fun maybeWriteToDisk(downloaded: List&lt;Attachment&gt;)</ID>
<ID>NestedBlockDepth:HibernateQueryCriteriaParser.kt$HibernateQueryCriteriaParser$override fun parseCriteria(criteria: CommonQueryCriteria): Collection&lt;Predicate&gt;</ID>
<ID>NestedBlockDepth:InteractiveShell.kt$InteractiveShell$ @JvmStatic fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps))</ID>
<ID>NestedBlockDepth:InteractiveShell.kt$InteractiveShell$@JvmStatic fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps)</ID>
<ID>NestedBlockDepth:InternalUtils.kt$ inline fun &lt;T&gt; Iterable&lt;T&gt;.noneOrSingle(predicate: (T) -&gt; Boolean): T?</ID>
<ID>NestedBlockDepth:JarSignatureTestUtils.kt$JarSignatureTestUtils$fun Path.addManifest(fileName: String, vararg entries: Pair&lt;Attributes.Name, String&gt;)</ID>
<ID>NestedBlockDepth:Main.kt$Node$fun avalancheLoop()</ID>
@ -4575,9 +4565,6 @@
<ID>WildcardImport:CordaRPCClientTest.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:CordaRPCClientTest.kt$import net.corda.testing.core.*</ID>
<ID>WildcardImport:CordaRPCOps.kt$import net.corda.core.node.services.vault.*</ID>
<ID>WildcardImport:CordaRPCOpsImpl.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:CordaRPCOpsImpl.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:CordaRPCOpsImpl.kt$import net.corda.core.node.services.vault.*</ID>
<ID>WildcardImport:CordaRPCOpsImplTest.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:CordaRPCOpsImplTest.kt$import org.assertj.core.api.Assertions.*</ID>
<ID>WildcardImport:CordaServiceTest.kt$import kotlin.test.*</ID>
@ -4723,9 +4710,6 @@
<ID>WildcardImport:InputStreamSerializer.kt$import net.corda.serialization.internal.amqp.*</ID>
<ID>WildcardImport:InstallFactory.kt$import tornadofx.*</ID>
<ID>WildcardImport:InstallShellExtensionsParser.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:InteractiveShell.kt$import java.lang.reflect.*</ID>
<ID>WildcardImport:InteractiveShell.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:InteractiveShell.kt$import net.corda.core.messaging.*</ID>
<ID>WildcardImport:InteractiveShellIntegrationTest.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:InteractiveShellIntegrationTest.kt$import net.corda.core.flows.*</ID>
<ID>WildcardImport:InterestRatesSwapDemoAPI.kt$import org.springframework.web.bind.annotation.*</ID>
@ -4977,8 +4961,6 @@
<ID>WildcardImport:ReceiveFinalityFlowTest.kt$import net.corda.node.services.statemachine.StaffedFlowHospital.*</ID>
<ID>WildcardImport:ReceiveFinalityFlowTest.kt$import net.corda.testing.node.internal.*</ID>
<ID>WildcardImport:ReceiveTransactionFlow.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:ReconnectingCordaRPCOps.kt$import net.corda.client.rpc.*</ID>
<ID>WildcardImport:ReconnectingCordaRPCOps.kt$import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.*</ID>
<ID>WildcardImport:ReferenceInputStateTests.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:ReferencedStatesFlowTests.kt$import net.corda.core.contracts.*</ID>
<ID>WildcardImport:ReferencedStatesFlowTests.kt$import net.corda.core.flows.*</ID>

View File

@ -17,16 +17,34 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.AttachmentTrustInfo
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.RPC_UPLOADER
import net.corda.core.internal.STRUCTURAL_STEP_PREFIX
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.messaging.*
import net.corda.core.internal.sign
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowHandleImpl
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.FlowProgressHandleImpl
import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.messaging.RPCReturnsObservables
import net.corda.core.messaging.StateMachineInfo
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.pendingFlowsCount
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeDiagnosticInfo
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.*
import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
@ -120,14 +138,16 @@ internal class CordaRPCOpsImpl(
return services.vaultService._trackBy(criteria, paging, sorting, contractStateType)
}
@Suppress("OverridingDeprecatedMember")
@Suppress("OverridingDeprecatedMember", "DEPRECATION")
override fun internalVerifiedTransactionsSnapshot(): List<SignedTransaction> {
val (snapshot, updates) = @Suppress("DEPRECATION") internalVerifiedTransactionsFeed()
val (snapshot, updates) = internalVerifiedTransactionsFeed()
updates.notUsed()
return snapshot
}
override fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction? = services.validatedTransactions.getTransaction(txnId)
@Suppress("OverridingDeprecatedMember")
override fun internalFindVerifiedTransaction(txnId: SecureHash): SignedTransaction? =
services.validatedTransactions.getTransaction(txnId)
@Suppress("OverridingDeprecatedMember")
override fun internalVerifiedTransactionsFeed(): DataFeed<List<SignedTransaction>, SignedTransaction> {
@ -164,7 +184,8 @@ internal class CordaRPCOpsImpl(
return snapshot
}
override fun stateMachineRecordedTransactionMappingFeed(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
override fun stateMachineRecordedTransactionMappingFeed():
DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
return services.stateMachineRecordedTransactionMapping.track()
}
@ -292,7 +313,8 @@ internal class CordaRPCOpsImpl(
services.networkMapUpdater.updateNetworkMapCache()
} catch (e: Exception) {
when (e) {
is ConnectException -> throw CordaRuntimeException("There is connection problem to network map. The possible causes are incorrect configuration or network map service being down")
is ConnectException -> throw CordaRuntimeException("There is connection problem to network map. The possible causes " +
"are incorrect configuration or network map service being down")
else -> throw e
}
}
@ -302,15 +324,26 @@ internal class CordaRPCOpsImpl(
return vaultQueryBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultQueryByCriteria(criteria: QueryCriteria, contractStateType: Class<out T>): Vault.Page<T> {
override fun <T : ContractState> vaultQueryByCriteria(
criteria: QueryCriteria,
contractStateType: Class<out T>
): Vault.Page<T> {
return vaultQueryBy(criteria, PageSpecification(), Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultQueryByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): Vault.Page<T> {
override fun <T : ContractState> vaultQueryByWithPagingSpec(
contractStateType: Class<out T>,
criteria: QueryCriteria,
paging: PageSpecification
): Vault.Page<T> {
return vaultQueryBy(criteria, paging, Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultQueryByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): Vault.Page<T> {
override fun <T : ContractState> vaultQueryByWithSorting(
contractStateType: Class<out T>,
criteria: QueryCriteria,
sorting: Sort
): Vault.Page<T> {
return vaultQueryBy(criteria, PageSpecification(), sorting, contractStateType)
}
@ -318,15 +351,26 @@ internal class CordaRPCOpsImpl(
return vaultTrackBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultTrackByCriteria(contractStateType: Class<out T>, criteria: QueryCriteria): DataFeed<Vault.Page<T>, Vault.Update<T>> {
override fun <T : ContractState> vaultTrackByCriteria(
contractStateType: Class<out T>,
criteria: QueryCriteria
): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return vaultTrackBy(criteria, PageSpecification(), Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultTrackByWithPagingSpec(contractStateType: Class<out T>, criteria: QueryCriteria, paging: PageSpecification): DataFeed<Vault.Page<T>, Vault.Update<T>> {
override fun <T : ContractState> vaultTrackByWithPagingSpec(
contractStateType: Class<out T>,
criteria: QueryCriteria,
paging: PageSpecification
): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return vaultTrackBy(criteria, paging, Sort(emptySet()), contractStateType)
}
override fun <T : ContractState> vaultTrackByWithSorting(contractStateType: Class<out T>, criteria: QueryCriteria, sorting: Sort): DataFeed<Vault.Page<T>, Vault.Update<T>> {
override fun <T : ContractState> vaultTrackByWithSorting(
contractStateType: Class<out T>,
criteria: QueryCriteria,
sorting: Sort
): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return vaultTrackBy(criteria, PageSpecification(), sorting, contractStateType)
}
@ -349,7 +393,10 @@ internal class CordaRPCOpsImpl(
.doOnCompleted(shutdownNode::invoke)
.subscribe(
{ }, // Nothing to do on each update here, only completion matters.
{ error -> logger.error("Error while waiting for pending flows to drain in preparation for shutdown. Cause was: ${error.message}", error) }
{ error ->
logger.error("Error while waiting for pending flows to drain in preparation for shutdown. " +
"Cause was: ${error.message}", error)
}
)
drainingShutdownHook.set(subscription)
} else {
@ -375,7 +422,13 @@ internal class CordaRPCOpsImpl(
}
private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {
return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)
return StateMachineInfo(
flowLogic.runId,
flowLogic.javaClass.name,
flowLogic.stateMachine.context.toFlowInitiator(),
flowLogic.track(),
flowLogic.stateMachine.context
)
}
private fun stateMachineUpdateFromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate {

View File

@ -11,6 +11,7 @@ import net.corda.client.jackson.JacksonSupport
import net.corda.client.jackson.StringToMethodCallParser
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.PermissionException
import net.corda.client.rpc.notUsed
@ -19,11 +20,21 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.*
import net.corda.core.internal.Emoji
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.messaging.*
import net.corda.core.internal.packageName_
import net.corda.core.internal.rootCause
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.pendingFlowsCount
import net.corda.tools.shell.utlities.ANSIProgressRenderer
import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer
import org.crsh.command.InvocationContext
@ -52,12 +63,17 @@ import java.io.FileDescriptor
import java.io.FileInputStream
import java.io.InputStream
import java.io.PrintWriter
import java.lang.reflect.*
import java.lang.reflect.GenericArrayType
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.lang.reflect.UndeclaredThrowableException
import java.nio.file.Path
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import kotlin.collections.ArrayList
import kotlin.concurrent.thread
// TODO: Add command history.
@ -76,7 +92,7 @@ object InteractiveShell {
private val log = LoggerFactory.getLogger(javaClass)
private lateinit var rpcOps: (username: String, password: String) -> InternalCordaRPCOps
private lateinit var ops: InternalCordaRPCOps
private lateinit var rpcConn: AutoCloseable
private lateinit var rpcConn: CordaRPCConnection
private var shell: Shell? = null
private var classLoader: ClassLoader? = null
private lateinit var shellConfiguration: ShellConfiguration
@ -131,13 +147,41 @@ object InteractiveShell {
}
}
ExternalResolver.INSTANCE.addCommand("output-format", "Commands to inspect and update the output format.", OutputFormatCommand::class.java)
ExternalResolver.INSTANCE.addCommand("run", "Runs a method from the CordaRPCOps interface on the node.", RunShellCommand::class.java)
ExternalResolver.INSTANCE.addCommand("flow", "Commands to work with flows. Flows are how you can change the ledger.", FlowShellCommand::class.java)
ExternalResolver.INSTANCE.addCommand("start", "An alias for 'flow start'", StartShellCommand::class.java)
ExternalResolver.INSTANCE.addCommand("hashLookup", "Checks if a transaction with matching Id hash exists.", HashLookupShellCommand::class.java)
ExternalResolver.INSTANCE.addCommand("attachments", "Commands to extract information about attachments stored within the node", AttachmentShellCommand::class.java)
ExternalResolver.INSTANCE.addCommand("checkpoints", "Commands to extract information about checkpoints stored within the node", CheckpointShellCommand::class.java)
ExternalResolver.INSTANCE.addCommand(
"output-format",
"Commands to inspect and update the output format.",
OutputFormatCommand::class.java
)
ExternalResolver.INSTANCE.addCommand(
"run",
"Runs a method from the CordaRPCOps interface on the node.",
RunShellCommand::class.java
)
ExternalResolver.INSTANCE.addCommand(
"flow",
"Commands to work with flows. Flows are how you can change the ledger.",
FlowShellCommand::class.java
)
ExternalResolver.INSTANCE.addCommand(
"start",
"An alias for 'flow start'",
StartShellCommand::class.java
)
ExternalResolver.INSTANCE.addCommand(
"hashLookup",
"Checks if a transaction with matching Id hash exists.",
HashLookupShellCommand::class.java
)
ExternalResolver.INSTANCE.addCommand(
"attachments",
"Commands to extract information about attachments stored within the node",
AttachmentShellCommand::class.java
)
ExternalResolver.INSTANCE.addCommand(
"checkpoints",
"Commands to extract information about checkpoints stored within the node",
CheckpointShellCommand::class.java
)
shell = ShellLifecycle(configuration.commandsDirectory).start(config, configuration.user, configuration.password, runSshDaemon)
}
@ -294,7 +338,12 @@ object InteractiveShell {
try {
// Show the progress tracker on the console until the flow completes or is interrupted with a
// Ctrl-C keypress.
val stateObservable = runFlowFromString({ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) }, inputData, flowClazz, inputObjectMapper)
val stateObservable = runFlowFromString(
{ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) },
inputData,
flowClazz,
inputObjectMapper
)
latch = CountDownLatch(1)
ansiProgressRenderer.render(stateObservable, latch::countDown)
@ -327,7 +376,8 @@ object InteractiveShell {
}
class NoApplicableConstructor(val errors: List<String>) : CordaException(this.toString()) {
override fun toString() = (listOf("No applicable constructor for flow. Problems were:") + errors).joinToString(System.lineSeparator())
override fun toString() =
(listOf("No applicable constructor for flow. Problems were:") + errors).joinToString(System.lineSeparator())
}
/**
@ -386,7 +436,6 @@ object InteractiveShell {
}
}
// TODO: This utility is generally useful and might be better moved to the node class, or an RPC, if we can commit to making it stable API.
/**
* Given a [FlowLogic] class and a string in one-line Yaml form, finds an applicable constructor and starts
* the flow, returning the created flow logic. Useful for lightweight invocation where text is preferable
@ -515,7 +564,7 @@ object InteractiveShell {
out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow)
return null
} else if (cmd.substringAfter(" ").trim().equals("gracefulShutdown", ignoreCase = true)) {
return InteractiveShell.gracefulShutdown(out, cordaRPCOps)
return gracefulShutdown(out, cordaRPCOps)
}
var result: Any? = null
@ -525,7 +574,7 @@ object InteractiveShell {
val call = parser.parse(cordaRPCOps, cmd)
result = call.call()
var subscription : Subscriber<*>? = null
if (result != null && result !== kotlin.Unit && result !is Void) {
if (result != null && result !== Unit && result !is Void) {
val (subs, future) = printAndFollowRPCResponse(result, out, outputFormat)
subscription = subs
result = future
@ -568,60 +617,47 @@ object InteractiveShell {
userSessionOut.flush()
}
var isShuttingDown = false
try {
display { println("Orchestrating a clean shutdown, press CTRL+C to cancel...") }
isShuttingDown = true
display {
println("Orchestrating a clean shutdown, press CTRL+C to cancel...")
println("...enabling draining mode")
println("...waiting for in-flight flows to be completed")
}
cordaRPCOps.terminate(true)
val latch = CountDownLatch(1)
@Suppress("DEPRECATION")
cordaRPCOps.pendingFlowsCount().updates.doOnError { error ->
log.error(error.message)
throw error
}.doAfterTerminate(latch::countDown).subscribe(
// For each update.
{ (first, second) -> display { println("...remaining: $first / $second") } },
// On error.
{ error ->
if (!isShuttingDown) {
display { println("RPC failed: ${error.rootCause}", Color.red) }
}
},
// When completed.
{
rpcConn.close()
// This will only show up in the standalone Shell, because the embedded one is killed as part of a node's shutdown.
display { println("...done, quitting the shell now.") }
onExit.invoke()
})
while (!Thread.currentThread().isInterrupted) {
try {
latch.await()
break
} catch (e: InterruptedException) {
try {
cordaRPCOps.setFlowsDrainingModeEnabled(false)
display { println("...cancelled clean shutdown.") }
} finally {
Thread.currentThread().interrupt()
break
}
}
}
} catch (e: StringToMethodCallParser.UnparseableCallException) {
display {
println(e.message, Color.red)
println("Please try 'man run' to learn what syntax is acceptable")
val subscription = cordaRPCOps.pendingFlowsCount().updates
.doAfterTerminate(latch::countDown)
.subscribe(
// For each update.
{ (completed, total) -> display { println("...remaining: $completed / $total") } },
// On error.
{
log.error(it.message)
throw it
},
// When completed.
{
// This will only show up in the standalone Shell, because the embedded one
// is killed as part of a node's shutdown.
display { println("...done, quitting the shell now.") }
}
)
cordaRPCOps.terminate(true)
try {
latch.await()
// Unsubscribe or we hold up the shutdown
subscription.unsubscribe()
rpcConn.forceClose()
onExit.invoke()
} catch (e: InterruptedException) {
// Cancelled whilst draining flows. So let's carry on from here
cordaRPCOps.setFlowsDrainingModeEnabled(false)
display { println("...cancelled clean shutdown.") }
}
} catch (e: Exception) {
if (!isShuttingDown) {
display { println("RPC failed: ${e.rootCause}", Color.red) }
}
display { println("RPC failed: ${e.rootCause}", Color.red) }
} finally {
InputStreamSerializer.invokeContext = null
InputStreamDeserializer.closeAll()