From dc595792e78e41b3a971e83f73bcc8409b53fa4d Mon Sep 17 00:00:00 2001 From: bpaunescu Date: Fri, 16 Mar 2018 17:14:25 +0000 Subject: [PATCH 1/5] github issue #2630: missing rpc info at node startup (#2839) --- node/src/main/kotlin/net/corda/node/internal/Node.kt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 8fb3e583d5..4879d5868b 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -173,6 +173,8 @@ open class Node(configuration: NodeConfiguration, printBasicNodeInfo("Advertised P2P messaging addresses", info.addresses.joinToString()) rpcServerAddresses?.let { rpcMessagingClient = RPCMessagingClient(configuration.rpcOptions.sslConfig, it.admin, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE) + printBasicNodeInfo("RPC connection address", it.primary.toString()) + printBasicNodeInfo("RPC admin connection address", it.admin.toString()) } verifierMessagingClient = when (configuration.verifierType) { VerifierType.OutOfProcess -> throw IllegalArgumentException("OutOfProcess verifier not supported") //VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE) From 327d7d8acfb89fc0d6cc716f018d8d49dba8635f Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Mon, 19 Mar 2018 12:47:23 +0000 Subject: [PATCH 2/5] Added sign helper method to CertificateAndKeyPair for producing SignedDataWithCert objects (#2841) --- .../net/corda/core/internal/InternalUtils.kt | 12 +++++++++--- .../corda/nodeapi/internal/crypto/X509Utilities.kt | 9 ++++----- .../corda/nodeapi/internal/network/NetworkMap.kt | 10 ++++++++++ .../internal/network/NetworkParametersCopier.kt | 11 +++++------ .../node/services/network/NetworkMapUpdaterTest.kt | 4 ++-- .../node/internal/network/NetworkMapServer.kt | 14 +++++++------- 6 files changed, 37 insertions(+), 23 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt index 73e6d7819d..97ef208514 100644 --- a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt @@ -378,10 +378,16 @@ val CordaX500Name.x500Name: X500Name val CordaX500Name.Companion.unspecifiedCountry get() = "ZZ" -fun T.signWithCert(privateKey: PrivateKey, certificate: X509Certificate): SignedDataWithCert { +inline fun T.signWithCert(signer: (SerializedBytes) -> DigitalSignatureWithCert): SignedDataWithCert { val serialised = serialize() - val signature = Crypto.doSign(privateKey, serialised.bytes) - return SignedDataWithCert(serialised, DigitalSignatureWithCert(certificate, signature)) + return SignedDataWithCert(serialised, signer(serialised)) +} + +fun T.signWithCert(privateKey: PrivateKey, certificate: X509Certificate): SignedDataWithCert { + return signWithCert { + val signature = Crypto.doSign(privateKey, it.bytes) + DigitalSignatureWithCert(certificate, signature) + } } inline fun SerializedBytes.sign(signer: (SerializedBytes) -> DigitalSignature.WithKey): SignedData { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto/X509Utilities.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto/X509Utilities.kt index b23a8fc861..4d93816756 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto/X509Utilities.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/crypto/X509Utilities.kt @@ -4,10 +4,7 @@ import net.corda.core.CordaOID import net.corda.core.crypto.Crypto import net.corda.core.crypto.SignatureScheme import net.corda.core.crypto.random63BitValue -import net.corda.core.internal.CertRole -import net.corda.core.internal.reader -import net.corda.core.internal.uncheckedCast -import net.corda.core.internal.writer +import net.corda.core.internal.* import net.corda.core.utilities.days import net.corda.core.utilities.millis import org.bouncycastle.asn1.* @@ -415,4 +412,6 @@ enum class CertificateType(val keyUsage: KeyUsage, vararg val purposes: KeyPurpo ) } -data class CertificateAndKeyPair(val certificate: X509Certificate, val keyPair: KeyPair) +data class CertificateAndKeyPair(val certificate: X509Certificate, val keyPair: KeyPair) { + fun sign(obj: T): SignedDataWithCert = obj.signWithCert(keyPair.private, certificate) +} diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkMap.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkMap.kt index 2e652d9279..0433aed3f2 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkMap.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkMap.kt @@ -2,10 +2,13 @@ package net.corda.nodeapi.internal.network import net.corda.core.crypto.SecureHash import net.corda.core.internal.CertRole +import net.corda.core.internal.DigitalSignatureWithCert import net.corda.core.internal.SignedDataWithCert +import net.corda.core.internal.signWithCert import net.corda.core.node.NetworkParameters import net.corda.core.node.NodeInfo import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.SerializedBytes import net.corda.nodeapi.internal.crypto.X509Utilities import java.security.cert.X509Certificate import java.time.Instant @@ -53,3 +56,10 @@ fun SignedDataWithCert.verifiedNetworkMapCert(rootCert: X509Certifi X509Utilities.validateCertificateChain(rootCert, sig.by, rootCert) return verified() } + +class NetworkMapAndSigned private constructor(val networkMap: NetworkMap, val signed: SignedNetworkMap) { + constructor(networkMap: NetworkMap, signer: (SerializedBytes) -> DigitalSignatureWithCert) : this(networkMap, networkMap.signWithCert(signer)) + constructor(signed: SignedNetworkMap) : this(signed.verified(), signed) + operator fun component1(): NetworkMap = networkMap + operator fun component2(): SignedNetworkMap = signed +} diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkParametersCopier.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkParametersCopier.kt index 18376251a7..df2e325605 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkParametersCopier.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkParametersCopier.kt @@ -1,6 +1,8 @@ package net.corda.nodeapi.internal.network -import net.corda.core.internal.* +import net.corda.core.internal.VisibleForTesting +import net.corda.core.internal.copyTo +import net.corda.core.internal.div import net.corda.core.node.NetworkParameters import net.corda.core.serialization.serialize import net.corda.nodeapi.internal.createDevNetworkMapCa @@ -11,16 +13,13 @@ import java.nio.file.StandardCopyOption class NetworkParametersCopier( networkParameters: NetworkParameters, - networkMapCa: CertificateAndKeyPair = createDevNetworkMapCa(), + signingCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa(), overwriteFile: Boolean = false, @VisibleForTesting val update: Boolean = false ) { private val copyOptions = if (overwriteFile) arrayOf(StandardCopyOption.REPLACE_EXISTING) else emptyArray() - private val serialisedSignedNetParams = networkParameters.signWithCert( - networkMapCa.keyPair.private, - networkMapCa.certificate - ).serialize() + private val serialisedSignedNetParams = signingCertAndKeyPair.sign(networkParameters).serialize() fun install(nodeDir: Path) { val fileName = if (update) NETWORK_PARAMS_UPDATE_FILE_NAME else NETWORK_PARAMS_FILE_NAME diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt index 2e77979c24..2f6535a74f 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt @@ -49,7 +49,7 @@ class NetworkMapUpdaterTest { private val networkMapCache = createMockNetworkMapCache() private val nodeInfoMap = ConcurrentHashMap() private val networkParamsMap = HashMap() - private val networkMapCa: CertificateAndKeyPair = createDevNetworkMapCa() + private val networkMapCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa() private val cacheExpiryMs = 100 private val networkMapClient = createMockNetworkMapClient() private val scheduler = TestScheduler() @@ -254,7 +254,7 @@ class NetworkMapUpdaterTest { } on { getNetworkParameters(any()) }.then { val paramsHash: SecureHash = uncheckedCast(it.arguments[0]) - networkParamsMap[paramsHash]?.signWithCert(networkMapCa.keyPair.private, networkMapCa.certificate) + networkParamsMap[paramsHash]?.let { networkMapCertAndKeyPair.sign(it) } } on { ackNetworkParametersUpdate(any()) }.then { Unit diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt index 4d0e952ce2..763cce2bf3 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt @@ -35,7 +35,7 @@ import javax.ws.rs.core.Response.status class NetworkMapServer(private val cacheTimeout: Duration, hostAndPort: NetworkHostAndPort, - private val networkMapCa: CertificateAndKeyPair = createDevNetworkMapCa(), + private val networkMapCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa(), private val myHostNameValue: String = "test.host.name", vararg additionalServices: Any) : Closeable { companion object { @@ -108,9 +108,7 @@ class NetworkMapServer(private val cacheTimeout: Duration, inner class InMemoryNetworkMapService { private val nodeInfoMap = mutableMapOf() val latestAcceptedParametersMap = mutableMapOf() - private val signedNetParams by lazy { - networkParameters.signWithCert(networkMapCa.keyPair.private, networkMapCa.certificate) - } + private val signedNetParams by lazy { networkMapCertAndKeyPair.sign(networkParameters) } @POST @Path("publish") @@ -143,7 +141,7 @@ class NetworkMapServer(private val cacheTimeout: Duration, @Produces(MediaType.APPLICATION_OCTET_STREAM) fun getNetworkMap(): Response { val networkMap = NetworkMap(nodeInfoMap.keys.toList(), signedNetParams.raw.hash, parametersUpdate) - val signedNetworkMap = networkMap.signWithCert(networkMapCa.keyPair.private, networkMapCa.certificate) + val signedNetworkMap = networkMapCertAndKeyPair.sign(networkMap) return Response.ok(signedNetworkMap.serialize().bytes).header("Cache-Control", "max-age=${cacheTimeout.seconds}").build() } @@ -172,8 +170,10 @@ class NetworkMapServer(private val cacheTimeout: Duration, val requestedParameters = if (requestedHash == signedNetParams.raw.hash) { signedNetParams } else if (requestedHash == nextNetworkParameters?.serialize()?.hash) { - nextNetworkParameters?.signWithCert(networkMapCa.keyPair.private, networkMapCa.certificate) - } else null + nextNetworkParameters?.let { networkMapCertAndKeyPair.sign(it) } + } else { + null + } requireNotNull(requestedParameters) return Response.ok(requestedParameters!!.serialize().bytes).build() } From c964e50696a9bfe4ea2c229d1d04f7e5d38c99f3 Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Mon, 19 Mar 2018 13:19:40 +0000 Subject: [PATCH 3/5] Trying to improve stability of `RPCStabilityTests.client reconnects to rebooted server` (#2842) Prior to this change it was intermittently failing with: ``` net.corda.client.rpc.RPCException: RPC server is not available. at net.corda.client.rpc.internal.RPCClientProxyHandler.invoke(RPCClientProxyHandler.kt:222) at com.sun.proxy.$Proxy79.ping(Unknown Source) at net.corda.client.rpc.RPCStabilityTests$client reconnects to rebooted server$1$pingFuture$1.invoke(RPCStabilityTests.kt:251) at net.corda.client.rpc.RPCStabilityTests$client reconnects to rebooted server$1$pingFuture$1.invoke(RPCStabilityTests.kt:36) at net.corda.core.internal.concurrent.ValueOrException$DefaultImpls.capture(CordaFutureImpl.kt:107) at net.corda.core.internal.concurrent.OpenFuture$DefaultImpls.capture(CordaFutureImpl.kt:65535) at net.corda.core.internal.concurrent.CordaFutureImpl.capture(CordaFutureImpl.kt:119) at net.corda.core.internal.concurrent.CordaFutureImplKt$fork$$inlined$also$lambda$1.run(CordaFutureImpl.kt:22) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) ``` --- .../net/corda/client/rpc/internal/RPCClientProxyHandler.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 6964fe493d..56e472d107 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -173,6 +173,7 @@ class RPCClientProxyHandler( private val deduplicationSequenceNumber = AtomicLong(0) private val lock = ReentrantReadWriteLock() + @Volatile private var sendingEnabled = true /** From 7a077e76f0cdf457cd40033fb0da594805a5951e Mon Sep 17 00:00:00 2001 From: Michele Sollecito Date: Mon, 19 Mar 2018 14:20:10 +0000 Subject: [PATCH 4/5] CORDA-1099: Orchestrated clean shutdown from Shell (#2831) --- .ci/api-current.txt | 103 ++++++- .../client/jfx/model/NodeMonitorModel.kt | 6 +- .../corda/client/rpc/CordaRPCClientTest.kt | 77 +++++- .../net/corda/client/rpc/RPCStabilityTests.kt | 10 +- .../net/corda/client/rpc/CordaRPCClient.kt | 63 +++-- .../rpc/internal/CordaRPCClientUtils.kt | 19 +- .../corda/client/rpc/internal/RPCClient.kt | 51 ++-- .../rpc/internal/RPCClientProxyHandler.kt | 7 +- .../net/corda/client/rpc/AbstractRPCTest.kt | 4 +- .../corda/client/rpc/RPCConcurrencyTests.kt | 4 +- .../corda/client/rpc/RPCPerformanceTests.kt | 13 +- .../net/corda/core/messaging/CordaRPCOps.kt | 39 +++ .../core/flows/ContractUpgradeFlowTest.kt | 2 +- docs/source/changelog.rst | 2 + docs/source/shell.rst | 9 + .../main/kotlin/net/corda/behave/node/Node.kt | 6 +- .../draining/P2PFlowsDrainingModeTest.kt | 61 +++-- .../net/corda/node/internal/AbstractNode.kt | 7 +- .../corda/node/internal/CordaRPCOpsImpl.kt | 7 +- .../node/internal/RpcAuthorisationProxy.kt | 2 + .../corda/node/internal/SecureCordaRPCOps.kt | 3 +- .../net/corda/node/CordaRPCOpsImplTest.kt | 2 +- .../node/internal/InternalTestUtils.kt | 6 +- .../corda/testing/node/internal/RPCDriver.kt | 10 +- .../corda/tools/shell/RunShellCommand.java | 47 +++- .../tools/shell/CordaAuthenticationPlugin.kt | 2 +- .../net/corda/tools/shell/CordaSSHAuthInfo.kt | 2 +- .../net/corda/tools/shell/InteractiveShell.kt | 253 +++++++----------- .../tools/shell/InteractiveShellCommand.kt | 1 + .../corda/tools/shell/RPCOpsWithContext.kt | 3 +- .../corda/tools/shell/SSHDConfiguration.kt | 27 ++ .../corda/tools/shell/SerializationSupport.kt | 103 +++++++ .../corda/tools/shell/ShellConfiguration.kt | 28 ++ .../net/corda/tools/shell/StandaloneShell.kt | 1 + .../tools/shell/CustomTypeJsonParsingTests.kt | 4 +- 35 files changed, 680 insertions(+), 304 deletions(-) create mode 100644 tools/shell/src/main/kotlin/net/corda/tools/shell/SSHDConfiguration.kt create mode 100644 tools/shell/src/main/kotlin/net/corda/tools/shell/SerializationSupport.kt create mode 100644 tools/shell/src/main/kotlin/net/corda/tools/shell/ShellConfiguration.kt diff --git a/.ci/api-current.txt b/.ci/api-current.txt index bf524bd54a..9b13ec80c8 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -658,10 +658,25 @@ public static final class net.corda.core.contracts.UniqueIdentifier$Companion ex @org.jetbrains.annotations.NotNull public abstract List getServiceFlows() @org.jetbrains.annotations.NotNull public abstract List getServices() ## +@net.corda.core.DoNotImplement public interface net.corda.core.cordapp.CordappConfig + public abstract boolean exists(String) + @org.jetbrains.annotations.NotNull public abstract Object get(String) + public abstract boolean getBoolean(String) + public abstract double getDouble(String) + public abstract float getFloat(String) + public abstract int getInt(String) + public abstract long getLong(String) + @org.jetbrains.annotations.NotNull public abstract Number getNumber(String) + @org.jetbrains.annotations.NotNull public abstract String getString(String) +## +public final class net.corda.core.cordapp.CordappConfigException extends java.lang.Exception + public (String, Throwable) +## public final class net.corda.core.cordapp.CordappContext extends java.lang.Object public (net.corda.core.cordapp.Cordapp, net.corda.core.crypto.SecureHash, ClassLoader, net.corda.core.cordapp.CordappConfig) @org.jetbrains.annotations.Nullable public final net.corda.core.crypto.SecureHash getAttachmentId() @org.jetbrains.annotations.NotNull public final ClassLoader getClassLoader() + @org.jetbrains.annotations.NotNull public final net.corda.core.cordapp.CordappConfig getConfig() @org.jetbrains.annotations.NotNull public final net.corda.core.cordapp.Cordapp getCordapp() ## @net.corda.core.DoNotImplement public interface net.corda.core.cordapp.CordappProvider @@ -963,6 +978,8 @@ public static final class net.corda.core.crypto.PartialMerkleTree$Companion exte @kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public static final net.corda.core.crypto.SecureHash$SHA256 sha256Twice(byte[]) @org.jetbrains.annotations.NotNull public String toString() public static final net.corda.core.crypto.SecureHash$Companion Companion + @org.jetbrains.annotations.NotNull public static final net.corda.core.crypto.SecureHash$SHA256 allOnesHash + @org.jetbrains.annotations.NotNull public static final net.corda.core.crypto.SecureHash$SHA256 zeroHash ## public static final class net.corda.core.crypto.SecureHash$Companion extends java.lang.Object @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash$SHA256 getAllOnesHash() @@ -1368,6 +1385,15 @@ public static final class net.corda.core.flows.NotarisationRequest$Companion ext public int hashCode() public String toString() ## +@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.NotarisationResponse extends java.lang.Object + public (List) + @org.jetbrains.annotations.NotNull public final List component1() + @org.jetbrains.annotations.NotNull public final net.corda.core.flows.NotarisationResponse copy(List) + public boolean equals(Object) + @org.jetbrains.annotations.NotNull public final List getSignatures() + public int hashCode() + public String toString() +## @net.corda.core.flows.InitiatingFlow public final class net.corda.core.flows.NotaryChangeFlow extends net.corda.core.flows.AbstractStateReplacementFlow$Instigator public (net.corda.core.contracts.StateAndRef, net.corda.core.identity.Party, net.corda.core.utilities.ProgressTracker) @org.jetbrains.annotations.NotNull protected net.corda.core.flows.AbstractStateReplacementFlow$UpgradeTx assembleTx() @@ -1375,10 +1401,12 @@ public static final class net.corda.core.flows.NotarisationRequest$Companion ext @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.flows.NotaryError extends java.lang.Object ## @net.corda.core.serialization.CordaSerializable public static final class net.corda.core.flows.NotaryError$Conflict extends net.corda.core.flows.NotaryError + public (net.corda.core.crypto.SecureHash, Map) @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash component1() @org.jetbrains.annotations.NotNull public final Map component2() @org.jetbrains.annotations.NotNull public final net.corda.core.flows.NotaryError$Conflict copy(net.corda.core.crypto.SecureHash, Map) public boolean equals(Object) + @org.jetbrains.annotations.NotNull public final Map getConsumedStates() @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash getTxId() public int hashCode() @org.jetbrains.annotations.NotNull public String toString() @@ -1431,6 +1459,7 @@ public static final class net.corda.core.flows.NotaryError$TimeWindowInvalid$Com @net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.NotaryException extends net.corda.core.flows.FlowException public (net.corda.core.flows.NotaryError, net.corda.core.crypto.SecureHash) @org.jetbrains.annotations.NotNull public final net.corda.core.flows.NotaryError getError() + @org.jetbrains.annotations.Nullable public final net.corda.core.crypto.SecureHash getTxId() ## public final class net.corda.core.flows.NotaryFlow extends java.lang.Object public () @@ -1462,6 +1491,10 @@ public abstract static class net.corda.core.flows.NotaryFlow$Service extends net @org.jetbrains.annotations.NotNull public final net.corda.core.node.services.TrustedAuthorityNotaryService getService() @co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull protected abstract net.corda.core.flows.TransactionParts validateRequest(net.corda.core.flows.NotarisationPayload) ## +@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.NotaryInternalException extends net.corda.core.flows.FlowException + public (net.corda.core.flows.NotaryError) + @org.jetbrains.annotations.NotNull public final net.corda.core.flows.NotaryError getError() +## public final class net.corda.core.flows.ReceiveStateAndRefFlow extends net.corda.core.flows.FlowLogic public (net.corda.core.flows.FlowSession) @co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull public List call() @@ -1514,6 +1547,15 @@ public @interface net.corda.core.flows.StartableByRPC ## public @interface net.corda.core.flows.StartableByService ## +@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.StateConsumptionDetails extends java.lang.Object + public (net.corda.core.crypto.SecureHash) + @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash component1() + @org.jetbrains.annotations.NotNull public final net.corda.core.flows.StateConsumptionDetails copy(net.corda.core.crypto.SecureHash) + public boolean equals(Object) + @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash getHashOfTransactionId() + public int hashCode() + public String toString() +## @net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.StateMachineRunId extends java.lang.Object public (UUID) @org.jetbrains.annotations.NotNull public final UUID component1() @@ -1658,6 +1700,7 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec @org.jetbrains.annotations.NotNull public abstract List queryAttachments(net.corda.core.node.services.vault.AttachmentQueryCriteria, net.corda.core.node.services.vault.AttachmentSort) @org.jetbrains.annotations.NotNull public abstract List registeredFlows() public abstract void setFlowsDrainingModeEnabled(boolean) + public abstract void shutdown() @net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.FlowHandle startFlowDynamic(Class, Object...) @net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.FlowProgressHandle startTrackedFlowDynamic(Class, Object...) @net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.DataFeed stateMachineRecordedTransactionMappingFeed() @@ -1681,6 +1724,7 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec @org.jetbrains.annotations.Nullable public abstract net.corda.core.identity.Party wellKnownPartyFromX500Name(net.corda.core.identity.CordaX500Name) ## public final class net.corda.core.messaging.CordaRPCOpsKt extends java.lang.Object + @org.jetbrains.annotations.NotNull public static final net.corda.core.messaging.DataFeed pendingFlowsCount(net.corda.core.messaging.CordaRPCOps) ## @net.corda.core.serialization.CordaSerializable public final class net.corda.core.messaging.DataFeed extends java.lang.Object public (Object, rx.Observable) @@ -1880,6 +1924,7 @@ public @interface net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey) @org.jetbrains.annotations.NotNull public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction) @org.jetbrains.annotations.NotNull public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction, java.security.PublicKey) + @org.jetbrains.annotations.NotNull public abstract net.corda.core.cordapp.CordappContext getAppContext() @org.jetbrains.annotations.NotNull public abstract java.time.Clock getClock() @org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.ContractUpgradeService getContractUpgradeService() @org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.KeyManagementService getKeyManagementService() @@ -2856,6 +2901,8 @@ public interface net.corda.core.schemas.StatePersistable public interface net.corda.core.serialization.ClassWhitelist public abstract boolean hasListed(Class) ## +public @interface net.corda.core.serialization.ConstructorForDeserialization +## public @interface net.corda.core.serialization.CordaSerializable ## public @interface net.corda.core.serialization.CordaSerializationTransformEnumDefault @@ -2875,6 +2922,9 @@ public @interface net.corda.core.serialization.CordaSerializationTransformRename public @interface net.corda.core.serialization.DeprecatedConstructorForDeserialization public abstract int version() ## +@net.corda.core.DoNotImplement public interface net.corda.core.serialization.EncodingWhitelist + public abstract boolean acceptEncoding(net.corda.core.serialization.SerializationEncoding) +## @net.corda.core.serialization.CordaSerializable public final class net.corda.core.serialization.MissingAttachmentsException extends net.corda.core.CordaException public (List) @org.jetbrains.annotations.NotNull public final List getIds() @@ -2895,6 +2945,8 @@ public final class net.corda.core.serialization.SerializationAPIKt extends java. ## @net.corda.core.DoNotImplement public interface net.corda.core.serialization.SerializationContext @org.jetbrains.annotations.NotNull public abstract ClassLoader getDeserializationClassLoader() + @org.jetbrains.annotations.Nullable public abstract net.corda.core.serialization.SerializationEncoding getEncoding() + @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.EncodingWhitelist getEncodingWhitelist() public abstract boolean getObjectReferencesEnabled() @org.jetbrains.annotations.NotNull public abstract net.corda.core.utilities.ByteSequence getPreferredSerializationVersion() @org.jetbrains.annotations.NotNull public abstract Map getProperties() @@ -2902,6 +2954,7 @@ public final class net.corda.core.serialization.SerializationAPIKt extends java. @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.ClassWhitelist getWhitelist() @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withAttachmentsClassLoader(List) @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withClassLoader(ClassLoader) + @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withEncoding(net.corda.core.serialization.SerializationEncoding) @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withPreferredSerializationVersion(net.corda.core.utilities.ByteSequence) @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withProperty(Object, Object) @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withWhitelisted(Class) @@ -2925,6 +2978,8 @@ public final class net.corda.core.serialization.SerializationDefaults extends ja @org.jetbrains.annotations.NotNull public final net.corda.core.serialization.SerializationContext getSTORAGE_CONTEXT() public static final net.corda.core.serialization.SerializationDefaults INSTANCE ## +@net.corda.core.DoNotImplement public interface net.corda.core.serialization.SerializationEncoding +## public abstract class net.corda.core.serialization.SerializationFactory extends java.lang.Object public () public final Object asCurrent(kotlin.jvm.functions.Function1) @@ -3004,13 +3059,20 @@ public static final class net.corda.core.serialization.SingletonSerializationTok @org.jetbrains.annotations.NotNull public final Map component2() @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.ContractUpgradeFilteredTransaction copy(Map, Map) public boolean equals(Object) + @org.jetbrains.annotations.NotNull public final Map getHiddenComponents() @org.jetbrains.annotations.NotNull public net.corda.core.crypto.SecureHash getId() @org.jetbrains.annotations.NotNull public List getInputs() @org.jetbrains.annotations.NotNull public net.corda.core.identity.Party getNotary() @org.jetbrains.annotations.NotNull public List getOutputs() + @org.jetbrains.annotations.NotNull public final Map getVisibleComponents() public int hashCode() public String toString() ## +@net.corda.core.serialization.CordaSerializable public static final class net.corda.core.transactions.ContractUpgradeFilteredTransaction$FilteredComponent extends java.lang.Object + public (net.corda.core.utilities.OpaqueBytes, net.corda.core.crypto.SecureHash) + @org.jetbrains.annotations.NotNull public final net.corda.core.utilities.OpaqueBytes getComponent() + @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash getNonce() +## @net.corda.core.DoNotImplement public final class net.corda.core.transactions.ContractUpgradeLedgerTransaction extends net.corda.core.transactions.FullTransaction implements net.corda.core.transactions.TransactionWithSignatures public (List, net.corda.core.identity.Party, net.corda.core.contracts.Attachment, String, net.corda.core.contracts.Attachment, net.corda.core.crypto.SecureHash, net.corda.core.contracts.PrivacySalt, List, net.corda.core.node.NetworkParameters) public void checkSignaturesAreValid() @@ -3055,12 +3117,18 @@ public static final class net.corda.core.serialization.SingletonSerializationTok @org.jetbrains.annotations.NotNull public net.corda.core.identity.Party getNotary() @org.jetbrains.annotations.NotNull public List getOutputs() @org.jetbrains.annotations.NotNull public final net.corda.core.contracts.PrivacySalt getPrivacySalt() + @org.jetbrains.annotations.NotNull public final List getSerializedComponents() @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash getUpgradedContractAttachmentId() @org.jetbrains.annotations.NotNull public final String getUpgradedContractClassName() public int hashCode() @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.ContractUpgradeLedgerTransaction resolve(net.corda.core.node.ServicesForResolution, List) public String toString() ## +public static final class net.corda.core.transactions.ContractUpgradeWireTransaction$Component extends java.lang.Enum + protected (String, int) + public static net.corda.core.transactions.ContractUpgradeWireTransaction$Component valueOf(String) + public static net.corda.core.transactions.ContractUpgradeWireTransaction$Component[] values() +## @net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.transactions.CoreTransaction extends net.corda.core.transactions.BaseTransaction public () @org.jetbrains.annotations.NotNull public abstract List getInputs() @@ -3192,6 +3260,7 @@ public static final class net.corda.core.transactions.LedgerTransaction$InOutGro ## @net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public final class net.corda.core.transactions.NotaryChangeWireTransaction extends net.corda.core.transactions.CoreTransaction public (List) + @kotlin.Deprecated public (List, net.corda.core.identity.Party, net.corda.core.identity.Party) @org.jetbrains.annotations.NotNull public final List component1() @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.NotaryChangeWireTransaction copy(List) public boolean equals(Object) @@ -3200,11 +3269,17 @@ public static final class net.corda.core.transactions.LedgerTransaction$InOutGro @org.jetbrains.annotations.NotNull public final net.corda.core.identity.Party getNewNotary() @org.jetbrains.annotations.NotNull public net.corda.core.identity.Party getNotary() @org.jetbrains.annotations.NotNull public List getOutputs() + @org.jetbrains.annotations.NotNull public final List getSerializedComponents() public int hashCode() @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.NotaryChangeLedgerTransaction resolve(net.corda.core.node.ServiceHub, List) @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.NotaryChangeLedgerTransaction resolve(net.corda.core.node.ServicesForResolution, List) public String toString() ## +public static final class net.corda.core.transactions.NotaryChangeWireTransaction$Component extends java.lang.Enum + protected (String, int) + public static net.corda.core.transactions.NotaryChangeWireTransaction$Component valueOf(String) + public static net.corda.core.transactions.NotaryChangeWireTransaction$Component[] values() +## @net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public final class net.corda.core.transactions.SignedTransaction extends java.lang.Object implements net.corda.core.transactions.TransactionWithSignatures public (net.corda.core.serialization.SerializedBytes, List) public (net.corda.core.transactions.CoreTransaction, List) @@ -3348,6 +3423,7 @@ public final class net.corda.core.utilities.ByteArrays extends java.lang.Object @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.utilities.ByteSequence extends java.lang.Object implements java.lang.Comparable public int compareTo(net.corda.core.utilities.ByteSequence) @org.jetbrains.annotations.NotNull public final net.corda.core.utilities.ByteSequence copy() + @org.jetbrains.annotations.NotNull public final byte[] copyBytes() public boolean equals(Object) @org.jetbrains.annotations.NotNull public abstract byte[] getBytes() public final int getOffset() @@ -3357,9 +3433,12 @@ public final class net.corda.core.utilities.ByteArrays extends java.lang.Object @kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public static final net.corda.core.utilities.ByteSequence of(byte[], int) @kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public static final net.corda.core.utilities.ByteSequence of(byte[], int, int) @org.jetbrains.annotations.NotNull public final java.io.ByteArrayInputStream open() + @org.jetbrains.annotations.NotNull public final java.nio.ByteBuffer putTo(java.nio.ByteBuffer) + @org.jetbrains.annotations.NotNull public final java.nio.ByteBuffer slice(int, int) @org.jetbrains.annotations.NotNull public final net.corda.core.utilities.ByteSequence subSequence(int, int) @org.jetbrains.annotations.NotNull public final net.corda.core.utilities.ByteSequence take(int) @org.jetbrains.annotations.NotNull public String toString() + public final void writeTo(java.io.OutputStream) public static final net.corda.core.utilities.ByteSequence$Companion Companion ## public static final class net.corda.core.utilities.ByteSequence$Companion extends java.lang.Object @@ -4126,6 +4205,7 @@ public class net.corda.testing.node.MockServices extends java.lang.Object implem @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey) @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction) @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction, java.security.PublicKey) + @org.jetbrains.annotations.NotNull public net.corda.core.cordapp.CordappContext getAppContext() @org.jetbrains.annotations.NotNull public final net.corda.testing.services.MockAttachmentStorage getAttachments() @org.jetbrains.annotations.NotNull public java.time.Clock getClock() @org.jetbrains.annotations.NotNull public net.corda.core.node.services.ContractUpgradeService getContractUpgradeService() @@ -4180,6 +4260,7 @@ public static final class net.corda.testing.node.MockServicesKt$createMockCordaS @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey) @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction) @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction, java.security.PublicKey) + @org.jetbrains.annotations.NotNull public net.corda.core.cordapp.CordappContext getAppContext() @org.jetbrains.annotations.NotNull public net.corda.core.node.services.AttachmentStorage getAttachments() @org.jetbrains.annotations.NotNull public java.time.Clock getClock() @org.jetbrains.annotations.NotNull public net.corda.core.node.services.ContractUpgradeService getContractUpgradeService() @@ -4284,18 +4365,22 @@ public final class net.corda.client.rpc.CordaRPCClient extends java.lang.Object ## public static final class net.corda.client.rpc.CordaRPCClient$Companion extends java.lang.Object ## -public final class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang.Object - public (java.time.Duration) - @org.jetbrains.annotations.NotNull public final java.time.Duration component1() - @org.jetbrains.annotations.NotNull public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration) - public boolean equals(Object) - @org.jetbrains.annotations.NotNull public final java.time.Duration getConnectionMaxRetryInterval() - public int hashCode() - public String toString() +public interface net.corda.client.rpc.CordaRPCClientConfiguration + public abstract int getCacheConcurrencyLevel() + @org.jetbrains.annotations.NotNull public abstract java.time.Duration getConnectionMaxRetryInterval() + @org.jetbrains.annotations.NotNull public abstract java.time.Duration getConnectionRetryInterval() + public abstract double getConnectionRetryIntervalMultiplier() + @org.jetbrains.annotations.NotNull public abstract java.time.Duration getDeduplicationCacheExpiry() + public abstract int getMaxFileSize() + public abstract int getMaxReconnectAttempts() + public abstract int getMinimumServerProtocolVersion() + public abstract int getObservationExecutorPoolSize() + @org.jetbrains.annotations.NotNull public abstract java.time.Duration getReapInterval() + public abstract boolean getTrackRpcCallSites() public static final net.corda.client.rpc.CordaRPCClientConfiguration$Companion Companion - @org.jetbrains.annotations.NotNull public static final net.corda.client.rpc.CordaRPCClientConfiguration DEFAULT ## public static final class net.corda.client.rpc.CordaRPCClientConfiguration$Companion extends java.lang.Object + @org.jetbrains.annotations.NotNull public final net.corda.client.rpc.CordaRPCClientConfiguration default() ## @net.corda.core.DoNotImplement public final class net.corda.client.rpc.CordaRPCConnection extends java.lang.Object implements net.corda.client.rpc.RPCConnection public (net.corda.client.rpc.RPCConnection) diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt index ad2900fd53..2fdcd18550 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt @@ -58,9 +58,9 @@ class NodeMonitorModel { fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) { val client = CordaRPCClient( nodeHostAndPort, - CordaRPCClientConfiguration.DEFAULT.copy( - connectionMaxRetryInterval = 10.seconds - ) + object : CordaRPCClientConfiguration { + override val connectionMaxRetryInterval = 10.seconds + } ) val connection = client.start(username, password) val proxy = connection.proxy diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt index b95480b95b..2eef0c704f 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt @@ -18,28 +18,28 @@ import net.corda.finance.flows.CashPaymentFlow import net.corda.finance.schemas.CashSchemaV1 import net.corda.node.internal.Node import net.corda.node.internal.StartedNode -import net.corda.node.services.Permissions.Companion.invokeRpc -import net.corda.node.services.Permissions.Companion.startFlow +import net.corda.node.services.Permissions.Companion.all import net.corda.testing.core.* import net.corda.testing.node.User import net.corda.testing.node.internal.NodeBasedTest +import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.After import org.junit.Before import org.junit.Test +import rx.subjects.PublishSubject +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit import kotlin.test.assertEquals import kotlin.test.assertFalse import kotlin.test.assertTrue class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", CashSchemaV1::class.packageName)) { - private val rpcUser = User("user1", "test", permissions = setOf( - startFlow(), - startFlow(), - invokeRpc("vaultQueryBy"), - invokeRpc(CordaRPCOps::stateMachinesFeed), - invokeRpc("vaultQueryByCriteria")) + private val rpcUser = User("user1", "test", permissions = setOf(all()) ) private lateinit var node: StartedNode private lateinit var identity: Party @@ -53,7 +53,9 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C @Before fun setUp() { node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) - client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!) + client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, object : CordaRPCClientConfiguration { + override val maxReconnectAttempts = 5 + }) identity = node.info.identityFromX500Name(ALICE_NAME) } @@ -81,6 +83,61 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C } } + @Test + fun `shutdown command stops the node`() { + + val nodeIsShut: PublishSubject = PublishSubject.create() + val latch = CountDownLatch(1) + var successful = false + val maxCount = 20 + var count = 0 + CloseableExecutor(Executors.newSingleThreadScheduledExecutor()).use { scheduler -> + + val task = scheduler.scheduleAtFixedRate({ + try { + println("Checking whether node is still running...") + client.start(rpcUser.username, rpcUser.password).use { + println("... node is still running.") + if (count == maxCount) { + nodeIsShut.onError(AssertionError("Node does not get shutdown by RPC")) + } + count++ + } + } catch (e: ActiveMQNotConnectedException) { + println("... node is not running.") + nodeIsShut.onCompleted() + } catch (e: ActiveMQSecurityException) { + // nothing here - this happens if trying to connect before the node is started + } catch (e: Throwable) { + nodeIsShut.onError(e) + } + }, 1, 1, TimeUnit.SECONDS) + + nodeIsShut.doOnError { error -> + error.printStackTrace() + successful = false + task.cancel(true) + latch.countDown() + }.doOnCompleted { + successful = (node.internals.started == null) + task.cancel(true) + latch.countDown() + }.subscribe() + + client.start(rpcUser.username, rpcUser.password).use { rpc -> rpc.proxy.shutdown() } + + latch.await() + assertThat(successful).isTrue() + } + } + + private class CloseableExecutor(private val delegate: ScheduledExecutorService) : AutoCloseable, ScheduledExecutorService by delegate { + + override fun close() { + delegate.shutdown() + } + } + @Test fun `close-send deadlock and premature shutdown on empty observable`() { println("Starting client") @@ -141,7 +198,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C val updates = proxy.stateMachinesFeed().updates - node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0),identity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow() + node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0), identity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow() proxy.startFlow(::CashIssueFlow, 123.DOLLARS, OpaqueBytes.of(0), identity).returnValue.getOrThrow() proxy.startFlowDynamic(CashIssueFlow::class.java, 1000.DOLLARS, OpaqueBytes.of(0), identity).returnValue.getOrThrow() diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 6597df2915..8dcb6e3858 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -1,7 +1,7 @@ package net.corda.client.rpc import net.corda.client.rpc.internal.RPCClient -import net.corda.client.rpc.internal.RPCClientConfiguration +import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.context.Trace import net.corda.core.crypto.random63BitValue import net.corda.core.internal.concurrent.fork @@ -105,7 +105,7 @@ class RPCStabilityTests { Try.on { startRpcClient( server.get().broker.hostAndPort!!, - configuration = RPCClientConfiguration.default.copy(minimumServerProtocolVersion = 1) + configuration = CordaRPCClientConfigurationImpl.default.copy(minimumServerProtocolVersion = 1) ).get() } } @@ -240,7 +240,7 @@ class RPCStabilityTests { val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! serverFollower.unfollow() // Set retry interval to 1s to reduce test duration - val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 1.seconds) + val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds) val clientFollower = shutdownManager.follower() val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() clientFollower.unfollow() @@ -266,7 +266,7 @@ class RPCStabilityTests { val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! serverFollower.unfollow() // Set retry interval to 1s to reduce test duration - val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5) + val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5) val clientFollower = shutdownManager.follower() val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() clientFollower.unfollow() @@ -298,7 +298,7 @@ class RPCStabilityTests { val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! serverFollower.unfollow() - val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1) + val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1) val clientFollower = shutdownManager.follower() val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() clientFollower.unfollow() diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index 7368f5fd88..4e4c6c99cc 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -2,7 +2,7 @@ package net.corda.client.rpc import net.corda.client.rpc.internal.KryoClientSerializationScheme import net.corda.client.rpc.internal.RPCClient -import net.corda.client.rpc.internal.RPCClientConfiguration +import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.context.Actor import net.corda.core.context.Trace import net.corda.core.messaging.CordaRPCOps @@ -23,23 +23,46 @@ class CordaRPCConnection internal constructor(connection: RPCConnection( tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration), - configuration.toRpcClientConfiguration(), + configuration, if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT ) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt index 7526921453..ab74cb9a4a 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt @@ -2,19 +2,32 @@ package net.corda.client.rpc.internal import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClientConfiguration +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.pendingFlowsCount import net.corda.core.utilities.NetworkHostAndPort import net.corda.nodeapi.internal.config.SSLConfiguration +import rx.Observable /** Utility which exposes the internal Corda RPC constructor to other internal Corda components */ fun createCordaRPCClientWithSsl( hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), sslConfiguration: SSLConfiguration? = null ) = CordaRPCClient.createWithSsl(hostAndPort, configuration, sslConfiguration) fun createCordaRPCClientWithSslAndClassLoader( hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), sslConfiguration: SSLConfiguration? = null, classLoader: ClassLoader? = null -) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader) \ No newline at end of file +) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader) + +fun CordaRPCOps.drainAndShutdown(): Observable { + + setFlowsDrainingModeEnabled(true) + return pendingFlowsCount().updates + .doOnError { error -> + throw error + } + .doOnCompleted { shutdown() }.map { } +} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt index 26d027e287..e8f33d284f 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt @@ -1,6 +1,5 @@ package net.corda.client.rpc.internal -import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.RPCConnection import net.corda.client.rpc.RPCException @@ -27,40 +26,22 @@ import java.time.Duration /** * This configuration may be used to tweak the internals of the RPC client. */ -data class RPCClientConfiguration( - /** The minimum protocol version required from the server */ - val minimumServerProtocolVersion: Int, - /** - * If set to true the client will track RPC call sites. If an error occurs subsequently during the RPC or in a - * returned Observable stream the stack trace of the originating RPC will be shown as well. Note that - * constructing call stacks is a moderately expensive operation. - */ - val trackRpcCallSites: Boolean, - /** - * The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references - * and are cleaned up in batches in this interval. If set too large it will waste server side resources for this - * duration. If set too low it wastes client side cycles. - */ - val reapInterval: Duration, - /** The number of threads to use for observations (for executing [Observable.onNext]) */ - val observationExecutorPoolSize: Int, - /** The retry interval of artemis connections in milliseconds */ - val connectionRetryInterval: Duration, - /** The retry interval multiplier for exponential backoff */ - val connectionRetryIntervalMultiplier: Double, - /** Maximum retry interval */ - val connectionMaxRetryInterval: Duration, - /** Maximum reconnect attempts on failover */ - val maxReconnectAttempts: Int, - /** Maximum file size */ - val maxFileSize: Int, - /** The cache expiry of a deduplication watermark per client. */ - val deduplicationCacheExpiry: Duration -) { +data class CordaRPCClientConfigurationImpl( + override val minimumServerProtocolVersion: Int, + override val trackRpcCallSites: Boolean, + override val reapInterval: Duration, + override val observationExecutorPoolSize: Int, + override val connectionRetryInterval: Duration, + override val connectionRetryIntervalMultiplier: Double, + override val connectionMaxRetryInterval: Duration, + override val maxReconnectAttempts: Int, + override val maxFileSize: Int, + override val deduplicationCacheExpiry: Duration +) : CordaRPCClientConfiguration { companion object { - val unlimitedReconnectAttempts = -1 + private const val unlimitedReconnectAttempts = -1 @JvmStatic - val default = RPCClientConfiguration( + val default = CordaRPCClientConfigurationImpl( minimumServerProtocolVersion = 0, trackRpcCallSites = false, reapInterval = 1.seconds, @@ -78,13 +59,13 @@ data class RPCClientConfiguration( class RPCClient( val transport: TransportConfiguration, - val rpcConfiguration: RPCClientConfiguration = RPCClientConfiguration.default, + val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT ) { constructor( hostAndPort: NetworkHostAndPort, sslConfiguration: SSLConfiguration? = null, - configuration: RPCClientConfiguration = RPCClientConfiguration.default, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT ) : this(tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfiguration), configuration, serializationContext) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 56e472d107..aa93a48c3c 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -11,6 +11,7 @@ import com.github.benmanes.caffeine.cache.RemovalCause import com.github.benmanes.caffeine.cache.RemovalListener import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.ThreadFactoryBuilder +import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.RPCException import net.corda.client.rpc.RPCSinceVersion import net.corda.core.context.Actor @@ -70,7 +71,7 @@ import kotlin.reflect.jvm.javaMethod * The cleanup happens in batches using a dedicated reaper, scheduled on [reaperExecutor]. */ class RPCClientProxyHandler( - private val rpcConfiguration: RPCClientConfiguration, + private val rpcConfiguration: CordaRPCClientConfiguration, private val rpcUsername: String, private val rpcPassword: String, private val serverLocator: ServerLocator, @@ -414,8 +415,8 @@ class RPCClientProxyHandler( sendingEnabled = false } - log.warn("RPC server unavailable.") - log.warn("Terminating observables and in flight RPCs.") + log.warn("RPC server unavailable. RPC calls are being buffered.") + log.warn("Terminating observables.") val m = observableContext.observableMap.asMap() m.keys.forEach { k -> observationExecutorPool.run(k) { diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt index 9f7ddefa0e..aa542ec49f 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt @@ -1,6 +1,6 @@ package net.corda.client.rpc -import net.corda.client.rpc.internal.RPCClientConfiguration +import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.map import net.corda.core.messaging.RPCOps @@ -44,7 +44,7 @@ open class AbstractRPCTest { inline fun RPCDriverDSL.testProxy( ops: I, rpcUser: User = rpcTestUser, - clientConfiguration: RPCClientConfiguration = RPCClientConfiguration.default, + clientConfiguration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default, serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.default ): TestProxy { return when (mode) { diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt index fe405b37af..66dd7ae816 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt @@ -1,6 +1,6 @@ package net.corda.client.rpc -import net.corda.client.rpc.internal.RPCClientConfiguration +import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.crypto.random63BitValue import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.transpose @@ -90,7 +90,7 @@ class RPCConcurrencyTests : AbstractRPCTest() { private fun RPCDriverDSL.testProxy(): TestProxy { return testProxy( TestOpsImpl(pool), - clientConfiguration = RPCClientConfiguration.default.copy( + clientConfiguration = CordaRPCClientConfigurationImpl.default.copy( reapInterval = 100.millis ), serverConfiguration = RPCServerConfiguration.default.copy( diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt index 0211d0b3ec..7175ea987f 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt @@ -1,9 +1,8 @@ package net.corda.client.rpc import com.google.common.base.Stopwatch -import net.corda.client.rpc.internal.RPCClientConfiguration +import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.messaging.RPCOps -import net.corda.core.utilities.days import net.corda.core.utilities.minutes import net.corda.core.utilities.seconds import net.corda.node.services.messaging.RPCServerConfiguration @@ -43,7 +42,7 @@ class RPCPerformanceTests : AbstractRPCTest() { } private fun RPCDriverDSL.testProxy( - clientConfiguration: RPCClientConfiguration, + clientConfiguration: CordaRPCClientConfigurationImpl, serverConfiguration: RPCServerConfiguration ): TestProxy { return testProxy( @@ -56,7 +55,7 @@ class RPCPerformanceTests : AbstractRPCTest() { private fun warmup() { rpcDriver { val proxy = testProxy( - RPCClientConfiguration.default, + CordaRPCClientConfigurationImpl.default, RPCServerConfiguration.default ) val executor = Executors.newFixedThreadPool(4) @@ -86,7 +85,7 @@ class RPCPerformanceTests : AbstractRPCTest() { measure(inputOutputSizes, (1..5)) { inputOutputSize, _ -> rpcDriver { val proxy = testProxy( - RPCClientConfiguration.default.copy( + CordaRPCClientConfigurationImpl.default.copy( observationExecutorPoolSize = 2 ), RPCServerConfiguration.default.copy( @@ -125,7 +124,7 @@ class RPCPerformanceTests : AbstractRPCTest() { rpcDriver { val metricRegistry = startReporter(shutdownManager) val proxy = testProxy( - RPCClientConfiguration.default.copy( + CordaRPCClientConfigurationImpl.default.copy( reapInterval = 1.seconds ), RPCServerConfiguration.default.copy( @@ -157,7 +156,7 @@ class RPCPerformanceTests : AbstractRPCTest() { // TODO this hangs with more parallelism rpcDriver { val proxy = testProxy( - RPCClientConfiguration.default, + CordaRPCClientConfigurationImpl.default, RPCServerConfiguration.default ) val numberOfMessages = 1000 diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index 6ad0b6ee89..51c287af17 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -21,6 +21,7 @@ import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.Try import rx.Observable +import rx.subjects.PublishSubject import java.io.IOException import java.io.InputStream import java.security.PublicKey @@ -365,6 +366,44 @@ interface CordaRPCOps : RPCOps { * @see setFlowsDrainingModeEnabled */ fun isFlowsDrainingModeEnabled(): Boolean + + /** + * Shuts the node down. Returns immediately. + * This does not wait for flows to be completed. + */ + fun shutdown() +} + +/** + * Returns a [DataFeed] that keeps track on the count of pending flows. + */ +fun CordaRPCOps.pendingFlowsCount(): DataFeed> { + + val stateMachineState = stateMachinesFeed() + var pendingFlowsCount = stateMachineState.snapshot.size + var completedFlowsCount = 0 + val updates = PublishSubject.create>() + stateMachineState + .updates + .doOnNext { update -> + when (update) { + is StateMachineUpdate.Added -> { + pendingFlowsCount++ + updates.onNext(completedFlowsCount to pendingFlowsCount) + } + is StateMachineUpdate.Removed -> { + completedFlowsCount++ + updates.onNext(completedFlowsCount to pendingFlowsCount) + if (completedFlowsCount == pendingFlowsCount) { + updates.onCompleted() + } + } + } + }.subscribe() + if (completedFlowsCount == 0) { + updates.onCompleted() + } + return DataFeed(pendingFlowsCount, updates) } inline fun CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(), diff --git a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt index 69288a9a8e..4a6f76821b 100644 --- a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt +++ b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt @@ -116,7 +116,7 @@ class ContractUpgradeFlowTest { return startRpcClient( rpcAddress = startRpcServer( rpcUser = user, - ops = SecureCordaRPCOps(node.services, node.smm, node.database, node.services) + ops = SecureCordaRPCOps(node.services, node.smm, node.database, node.services, { }) ).get().broker.hostAndPort!!, username = user.username, password = user.password diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index a7a24e834e..167df1cddc 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -7,6 +7,8 @@ Unreleased Here are brief summaries of what's changed between each snapshot release. This includes guidance on how to upgrade code from the previous milestone release. +* Node can be shut down abruptly by ``shutdown`` function in `CordaRPCOps` or gracefully (draining flows first) through ``gracefulShutdown`` command from shell. + * Parsing of ``NodeConfiguration`` will now fail if unknown configuration keys are found. * The web server now has its own ``web-server.conf`` file, separate from ``node.conf``. diff --git a/docs/source/shell.rst b/docs/source/shell.rst index d560467f7e..a76dc56846 100644 --- a/docs/source/shell.rst +++ b/docs/source/shell.rst @@ -18,6 +18,7 @@ the `CRaSH`_ shell and supports many of the same features. These features includ * Issuing SQL queries to the underlying database * Viewing JMX metrics and monitoring exports * UNIX style pipes for both text and objects, an ``egrep`` command and a command for working with columnular data +* Shutting the node down. Permissions ----------- @@ -197,6 +198,14 @@ Some RPCs return a stream of events that will be shown on screen until you press You can find a list of the available RPC methods `here `_. +Shutting down the node +********************** + +You can shut the node down via shell: + +* ``gracefulShutdown`` will put node into draining mode, and shut down when there are no flows running +* ``shutdown`` will shut the node down immediately + Flow commands ************* diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt b/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt index ed6b9f097a..7f4b89862b 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt @@ -156,9 +156,9 @@ class Node( val user = config.users.first() val address = config.nodeInterface val targetHost = NetworkHostAndPort(address.host, address.rpcPort) - val config = CordaRPCClientConfiguration( - connectionMaxRetryInterval = 10.seconds - ) + val config = object : CordaRPCClientConfiguration { + override val connectionMaxRetryInterval = 10.seconds + } log.info("Establishing RPC connection to ${targetHost.host} on port ${targetHost.port} ...") CordaRPCClient(targetHost, config).use(user.username, user.password) { log.info("RPC connection to ${targetHost.host}:${targetHost.port} established") diff --git a/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt b/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt index 645f32f081..3acc1a6435 100644 --- a/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt @@ -1,6 +1,7 @@ package net.corda.node.modes.draining import co.paralleluniverse.fibers.Suspendable +import net.corda.client.rpc.internal.drainAndShutdown import net.corda.core.flows.* import net.corda.core.identity.Party import net.corda.core.internal.concurrent.map @@ -13,11 +14,13 @@ import net.corda.testing.core.singleIdentity import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.PortAllocation import net.corda.testing.driver.driver +import net.corda.testing.internal.chooseIdentity import net.corda.testing.node.User import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat import org.junit.After import org.junit.Before import org.junit.Test +import java.util.concurrent.CountDownLatch import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit @@ -49,6 +52,7 @@ class P2PFlowsDrainingModeTest { fun `flows draining mode suspends consumption of initial session messages`() { driver(DriverParameters(isDebug = true, startNodesInProcess = false, portAllocation = portAllocation)) { + val initiatedNode = startNode().getOrThrow() val initiating = startNode(rpcUsers = users).getOrThrow().rpc val counterParty = initiatedNode.nodeInfo.singleIdentity() @@ -76,27 +80,54 @@ class P2PFlowsDrainingModeTest { } } - @StartableByRPC - @InitiatingFlow - class InitiateSessionFlow(private val counterParty: Party) : FlowLogic() { + @Test + fun `clean shutdown by draining`() { - @Suspendable - override fun call(): String { + driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation)) { - val session = initiateFlow(counterParty) - session.send("Hi there") - return session.receive().unwrap { it } + val nodeA = startNode(rpcUsers = users).getOrThrow() + val nodeB = startNode(rpcUsers = users).getOrThrow() + var successful = false + val latch = CountDownLatch(1) + nodeB.rpc.setFlowsDrainingModeEnabled(true) + IntRange(1, 10).forEach { nodeA.rpc.startFlow(::InitiateSessionFlow, nodeB.nodeInfo.chooseIdentity()) } + + nodeA.rpc.drainAndShutdown() + .doOnError { error -> + error.printStackTrace() + successful = false + } + .doOnCompleted { successful = true } + .doAfterTerminate { latch.countDown() } + .subscribe() + nodeB.rpc.setFlowsDrainingModeEnabled(false) + latch.await() + + assertThat(successful).isTrue() } } +} - @InitiatedBy(InitiateSessionFlow::class) - class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic() { +@StartableByRPC +@InitiatingFlow +class InitiateSessionFlow(private val counterParty: Party) : FlowLogic() { - @Suspendable - override fun call() { + @Suspendable + override fun call(): String { - val message = initiatingSession.receive().unwrap { it } - initiatingSession.send("$message answer") - } + val session = initiateFlow(counterParty) + session.send("Hi there") + return session.receive().unwrap { it } + } +} + +@InitiatedBy(InitiateSessionFlow::class) +class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic() { + + @Suspendable + override fun call() { + + val message = initiatingSession.receive().unwrap { it } + initiatingSession.send("$message answer") } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index be172b5300..6f8572fd1a 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -86,6 +86,7 @@ import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit.SECONDS import kotlin.collections.set import kotlin.reflect.KClass @@ -145,6 +146,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected lateinit var networkMapUpdater: NetworkMapUpdater lateinit var securityManager: RPCSecurityManager + private val shutdownExecutor = Executors.newSingleThreadExecutor() + /** Completes once the node has successfully registered with the network map service * or has loaded network map data from local database */ val nodeReadyFuture: CordaFuture get() = _nodeReadyFuture @@ -159,7 +162,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, /** The implementation of the [CordaRPCOps] interface used by this node. */ open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence, smm: StateMachineManager): CordaRPCOps { - return SecureCordaRPCOps(services, smm, database, flowStarter) + + return SecureCordaRPCOps(services, smm, database, flowStarter, { shutdownExecutor.submit { stop() } }) } private fun initCertificate() { @@ -712,6 +716,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, toRun() } runOnStop.clear() + shutdownExecutor.shutdown() _started = null } diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index b6cd736f10..44214ba206 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -45,7 +45,8 @@ internal class CordaRPCOpsImpl( private val services: ServiceHubInternal, private val smm: StateMachineManager, private val database: CordaPersistence, - private val flowStarter: FlowStarter + private val flowStarter: FlowStarter, + private val shutdownNode: () -> Unit ) : CordaRPCOps { override fun networkMapSnapshot(): List { val (snapshot, updates) = networkMapFeed() @@ -298,6 +299,10 @@ internal class CordaRPCOpsImpl( return services.nodeProperties.flowsDrainingMode.isEnabled() } + override fun shutdown() { + shutdownNode.invoke() + } + private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo { return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context) } diff --git a/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt b/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt index c17917c24e..ece6c31561 100644 --- a/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt +++ b/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt @@ -167,6 +167,8 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val override fun isFlowsDrainingModeEnabled(): Boolean = guard("isFlowsDrainingModeEnabled", implementation::isFlowsDrainingModeEnabled) + override fun shutdown() = guard("shutdown", implementation::shutdown) + // TODO change to KFunction reference after Kotlin fixes https://youtrack.jetbrains.com/issue/KT-12140 private inline fun guard(methodName: String, action: () -> RESULT) = guard(methodName, emptyList(), action) diff --git a/node/src/main/kotlin/net/corda/node/internal/SecureCordaRPCOps.kt b/node/src/main/kotlin/net/corda/node/internal/SecureCordaRPCOps.kt index 0a7d9e9ce1..eebf8db477 100644 --- a/node/src/main/kotlin/net/corda/node/internal/SecureCordaRPCOps.kt +++ b/node/src/main/kotlin/net/corda/node/internal/SecureCordaRPCOps.kt @@ -14,7 +14,8 @@ class SecureCordaRPCOps(services: ServiceHubInternal, smm: StateMachineManager, database: CordaPersistence, flowStarter: FlowStarter, - val unsafe: CordaRPCOps = CordaRPCOpsImpl(services, smm, database, flowStarter)) : CordaRPCOps by RpcAuthorisationProxy(unsafe, ::rpcContext) { + shutdownNode: () -> Unit, + val unsafe: CordaRPCOps = CordaRPCOpsImpl(services, smm, database, flowStarter, shutdownNode)) : CordaRPCOps by RpcAuthorisationProxy(unsafe, ::rpcContext) { /** * Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed diff --git a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt index 99bfc70399..ef83ab852c 100644 --- a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt @@ -82,7 +82,7 @@ class CordaRPCOpsImplTest { fun setup() { mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.finance.contracts.asset")) aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) - rpc = SecureCordaRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database, aliceNode.services) + rpc = SecureCordaRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database, aliceNode.services, { }) CURRENT_RPC_CONTEXT.set(RpcAuthContext(InvocationContext.rpc(testActor()), buildSubject("TEST_USER", emptySet()))) mockNet.runNetwork() diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt index 3d30e72712..2d07005d83 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt @@ -1,5 +1,7 @@ package net.corda.testing.node.internal +import net.corda.client.rpc.CordaRPCClient +import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.core.CordaException import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext @@ -14,8 +16,10 @@ import net.corda.core.utilities.seconds import net.corda.node.services.api.StartedNodeServices import net.corda.node.services.messaging.Message import net.corda.node.services.messaging.MessagingService +import net.corda.testing.driver.NodeHandle import net.corda.testing.internal.chooseIdentity import net.corda.testing.node.InMemoryMessagingNetwork +import net.corda.testing.node.User import net.corda.testing.node.testContext import org.slf4j.LoggerFactory import java.net.Socket @@ -113,4 +117,4 @@ internal interface InternalMockMessagingService : MessagingService { fun stop() } - +fun CordaRPCClient.start(user: User) = start(user.username, user.password) \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index 040f42057d..042a38ce67 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -3,7 +3,7 @@ package net.corda.testing.node.internal import net.corda.client.mock.Generator import net.corda.client.rpc.internal.KryoClientSerializationScheme import net.corda.client.rpc.internal.RPCClient -import net.corda.client.rpc.internal.RPCClientConfiguration +import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.concurrent.CordaFuture import net.corda.core.context.AuthServiceId import net.corda.core.context.Trace @@ -59,7 +59,7 @@ import net.corda.nodeapi.internal.config.User as InternalUser inline fun RPCDriverDSL.startInVmRpcClient( username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: RPCClientConfiguration = RPCClientConfiguration.default + configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default ) = startInVmRpcClient(I::class.java, username, password, configuration) inline fun RPCDriverDSL.startRandomRpcClient( @@ -72,7 +72,7 @@ inline fun RPCDriverDSL.startRpcClient( rpcAddress: NetworkHostAndPort, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: RPCClientConfiguration = RPCClientConfiguration.default + configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default ) = startRpcClient(I::class.java, rpcAddress, username, password, configuration) data class RpcBrokerHandle( @@ -253,7 +253,7 @@ data class RPCDriverDSL( rpcOpsClass: Class, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: RPCClientConfiguration = RPCClientConfiguration.default + configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default ): CordaFuture { return driverDSL.executorService.fork { val client = RPCClient(inVmClientTransportConfiguration, configuration) @@ -324,7 +324,7 @@ data class RPCDriverDSL( rpcAddress: NetworkHostAndPort, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: RPCClientConfiguration = RPCClientConfiguration.default + configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default ): CordaFuture { return driverDSL.executorService.fork { val client = RPCClient(ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), rpcAddress, null), configuration) diff --git a/tools/shell/src/main/java/net/corda/tools/shell/RunShellCommand.java b/tools/shell/src/main/java/net/corda/tools/shell/RunShellCommand.java index f6c0623ad1..9b98d176af 100644 --- a/tools/shell/src/main/java/net/corda/tools/shell/RunShellCommand.java +++ b/tools/shell/src/main/java/net/corda/tools/shell/RunShellCommand.java @@ -1,12 +1,20 @@ package net.corda.tools.shell; -import net.corda.core.messaging.*; -import net.corda.client.jackson.*; -import org.crsh.cli.*; -import org.crsh.command.*; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import net.corda.client.jackson.StringToMethodCallParser; +import net.corda.core.messaging.CordaRPCOps; +import org.crsh.cli.Argument; +import org.crsh.cli.Command; +import org.crsh.cli.Man; +import org.crsh.cli.Usage; +import org.crsh.command.InvocationContext; +import org.jetbrains.annotations.NotNull; import java.util.*; +import static java.util.Comparator.comparing; + // Note that this class cannot be converted to Kotlin because CRaSH does not understand InvocationContext> which // is the closest you can get in Kotlin to raw types. @@ -29,8 +37,7 @@ public class RunShellCommand extends InteractiveShellCommand { emitHelp(context, parser); return null; } - - return InteractiveShell.runRPCFromString(command, out, context, ops(), objectMapper()); + return InteractiveShell.runRPCFromString(command, out, context, ops(), objectMapper(), isSsh()); } private void emitHelp(InvocationContext context, StringToMethodCallParser parser) { @@ -38,21 +45,37 @@ public class RunShellCommand extends InteractiveShellCommand { // Each element we emit is a map of column -> content. Set> entries = parser.getAvailableCommands().entrySet(); ArrayList> entryList = new ArrayList<>(entries); - entryList.sort(Comparator.comparing(Map.Entry::getKey)); + entryList.sort(comparing(Map.Entry::getKey)); for (Map.Entry entry : entryList) { // Skip these entries as they aren't really interesting for the user. if (entry.getKey().equals("startFlowDynamic")) continue; if (entry.getKey().equals("getProtocolVersion")) continue; - // Use a LinkedHashMap to ensure that the Command column comes first. - Map m = new LinkedHashMap<>(); - m.put("Command", entry.getKey()); - m.put("Parameter types", entry.getValue()); try { - context.provide(m); + context.provide(commandAndDesc(entry.getKey(), entry.getValue())); } catch (Exception e) { throw new RuntimeException(e); } } + + Lists.newArrayList( + commandAndDesc("shutdown", "Shuts node down (immediately)"), + commandAndDesc("gracefulShutdown", "Shuts node down gracefully, waiting for all flows to complete first.") + ).forEach(stringStringMap -> { + try { + context.provide(stringStringMap); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @NotNull + private Map commandAndDesc(String command, String description) { + // Use a LinkedHashMap to ensure that the Command column comes first. + Map abruptShutdown = Maps.newLinkedHashMap(); + abruptShutdown.put("Command", command); + abruptShutdown.put("Parameter types", description); + return abruptShutdown; } } diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaAuthenticationPlugin.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaAuthenticationPlugin.kt index c495292d0f..4ee877b3af 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaAuthenticationPlugin.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaAuthenticationPlugin.kt @@ -24,7 +24,7 @@ class CordaAuthenticationPlugin(private val rpcOps: (username: String, credentia } try { val ops = rpcOps(username, credential) - return CordaSSHAuthInfo(true, ops) + return CordaSSHAuthInfo(true, ops, isSsh = true) } catch (e: ActiveMQSecurityException) { logger.warn(e.message) } catch (e: Exception) { diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaSSHAuthInfo.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaSSHAuthInfo.kt index c8202bf03d..738b98623c 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaSSHAuthInfo.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaSSHAuthInfo.kt @@ -6,7 +6,7 @@ import net.corda.tools.shell.InteractiveShell.createYamlInputMapper import net.corda.tools.shell.utlities.ANSIProgressRenderer import org.crsh.auth.AuthInfo -class CordaSSHAuthInfo(val successful: Boolean, val rpcOps: CordaRPCOps, val ansiProgressRenderer: ANSIProgressRenderer? = null) : AuthInfo { +class CordaSSHAuthInfo(val successful: Boolean, val rpcOps: CordaRPCOps, val ansiProgressRenderer: ANSIProgressRenderer? = null, val isSsh: Boolean = false) : AuthInfo { override fun isSuccessful(): Boolean = successful val yamlInputMapper: ObjectMapper by lazy { diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt index 88620c32c6..646bc3929b 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt @@ -1,13 +1,16 @@ package net.corda.tools.shell import com.fasterxml.jackson.core.JsonGenerator -import com.fasterxml.jackson.core.JsonParser -import com.fasterxml.jackson.databind.* +import com.fasterxml.jackson.databind.JsonSerializer +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.SerializationFeature +import com.fasterxml.jackson.databind.SerializerProvider import com.fasterxml.jackson.databind.module.SimpleModule import com.fasterxml.jackson.dataformat.yaml.YAMLFactory -import com.google.common.io.Closeables import net.corda.client.jackson.JacksonSupport import net.corda.client.jackson.StringToMethodCallParser +import net.corda.client.rpc.CordaRPCClientConfiguration +import net.corda.client.rpc.CordaRPCConnection import net.corda.client.rpc.PermissionException import net.corda.client.rpc.internal.createCordaRPCClientWithSslAndClassLoader import net.corda.core.CordaException @@ -18,13 +21,9 @@ import net.corda.core.identity.Party import net.corda.core.internal.* import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.openFuture -import net.corda.core.messaging.CordaRPCOps -import net.corda.core.messaging.DataFeed -import net.corda.core.messaging.FlowProgressHandle -import net.corda.core.messaging.StateMachineUpdate +import net.corda.core.messaging.* import net.corda.core.node.NodeInfo import net.corda.core.utilities.NetworkHostAndPort -import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.tools.shell.utlities.ANSIProgressRenderer import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer import org.crsh.command.InvocationContext @@ -50,12 +49,13 @@ import org.json.JSONObject import org.slf4j.LoggerFactory import rx.Observable import rx.Subscriber -import java.io.* +import java.io.FileDescriptor +import java.io.FileInputStream +import java.io.InputStream +import java.io.PrintWriter import java.lang.reflect.InvocationTargetException import java.lang.reflect.UndeclaredThrowableException -import java.nio.file.Files import java.nio.file.Path -import java.nio.file.Paths import java.util.* import java.util.concurrent.CountDownLatch import java.util.concurrent.ExecutionException @@ -73,69 +73,30 @@ import kotlin.concurrent.thread // TODO: Resurrect or reimplement the mail plugin. // TODO: Make it notice new shell commands added after the node started. -data class SSHDConfiguration(val port: Int) { - companion object { - internal const val INVALID_PORT_FORMAT = "Invalid port: %s" - private const val MISSING_PORT_FORMAT = "Missing port: %s" - - /** - * Parses a string of the form port into a [SSHDConfiguration]. - * @throws IllegalArgumentException if the port is missing or the string is garbage. - */ - @JvmStatic - fun parse(str: String): SSHDConfiguration { - require(!str.isNullOrBlank()) { SSHDConfiguration.MISSING_PORT_FORMAT.format(str) } - val port = try { - str.toInt() - } catch (ex: NumberFormatException) { - throw IllegalArgumentException("Port syntax is invalid, expected port") - } - return SSHDConfiguration(port) - } - } - - init { - require(port in (0..0xffff)) { INVALID_PORT_FORMAT.format(port) } - } -} - -data class ShellSslOptions(override val sslKeystore: Path, override val keyStorePassword: String, override val trustStoreFile:Path, override val trustStorePassword: String) : SSLConfiguration { - override val certificatesDirectory: Path get() = Paths.get("") -} - -data class ShellConfiguration( - val commandsDirectory: Path, - val cordappsDirectory: Path? = null, - var user: String = "", - var password: String = "", - val hostAndPort: NetworkHostAndPort, - val ssl: ShellSslOptions? = null, - val sshdPort: Int? = null, - val sshHostKeyDirectory: Path? = null, - val noLocalShell: Boolean = false) { - companion object { - const val SSH_PORT = 2222 - const val COMMANDS_DIR = "shell-commands" - const val CORDAPPS_DIR = "cordapps" - const val SSHD_HOSTKEY_DIR = "ssh" - } -} - object InteractiveShell { private val log = LoggerFactory.getLogger(javaClass) private lateinit var rpcOps: (username: String, credentials: String) -> CordaRPCOps - private lateinit var connection: CordaRPCOps + private lateinit var ops: CordaRPCOps + private lateinit var connection: CordaRPCConnection private var shell: Shell? = null private var classLoader: ClassLoader? = null + private lateinit var shellConfiguration: ShellConfiguration + private var onExit: () -> Unit = {} /** * Starts an interactive shell connected to the local terminal. This shell gives administrator access to the node * internals. */ fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) { + shellConfiguration = configuration rpcOps = { username: String, credentials: String -> val client = createCordaRPCClientWithSslAndClassLoader(hostAndPort = configuration.hostAndPort, - sslConfiguration = configuration.ssl, classLoader = classLoader) - client.start(username, credentials).proxy + configuration = object : CordaRPCClientConfiguration { + override val maxReconnectAttempts = 1 + }, + sslConfiguration = configuration.ssl, + classLoader = classLoader) + this.connection = client.start(username, credentials) + connection.proxy } InteractiveShell.classLoader = classLoader val runSshDaemon = configuration.sshdPort != null @@ -160,6 +121,7 @@ object InteractiveShell { } fun runLocalShell(onExit: () -> Unit = {}) { + this.onExit = onExit val terminal = TerminalFactory.create() val consoleReader = ConsoleReader("Corda", FileInputStream(FileDescriptor.`in`), System.out, terminal) val jlineProcessor = JLineProcessor(terminal.isAnsiSupported, shell, consoleReader, System.out) @@ -205,18 +167,18 @@ object InteractiveShell { return super.getPlugins().filterNot { it is JavaLanguage } + CordaAuthenticationPlugin(rpcOps) } } - val attributes = emptyMap() + val attributes = emptyMap() val context = PluginContext(discovery, attributes, commandsFS, confFS, classLoader) context.refresh() this.config = config start(context) - connection = makeRPCOps(rpcOps, localUserName, localUserPassword) - return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, connection, StdoutANSIProgressRenderer)) + ops = makeRPCOps(rpcOps, localUserName, localUserPassword) + return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, ops, StdoutANSIProgressRenderer)) } } fun nodeInfo() = try { - connection.nodeInfo() + ops.nodeInfo() } catch (e: UndeclaredThrowableException) { throw e.cause ?: e } @@ -411,14 +373,16 @@ object InteractiveShell { } @JvmStatic - fun runRPCFromString(input: List, out: RenderPrintWriter, context: InvocationContext, cordaRPCOps: CordaRPCOps, om: ObjectMapper): Any? { + fun runRPCFromString(input: List, out: RenderPrintWriter, context: InvocationContext, cordaRPCOps: CordaRPCOps, om: ObjectMapper, isSsh: Boolean = false): Any? { val cmd = input.joinToString(" ").trim { it <= ' ' } - if (cmd.toLowerCase().startsWith("startflow")) { + if (cmd.startsWith("startflow", ignoreCase = true)) { // The flow command provides better support and startFlow requires special handling anyway due to // the generic startFlow RPC interface which offers no type information with which to parse the // string form of the command. out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow) return null + } else if (cmd.substringAfter(" ").trim().equals("gracefulShutdown", ignoreCase = true)) { + return InteractiveShell.gracefulShutdown(out, cordaRPCOps, isSsh) } var result: Any? = null @@ -457,6 +421,68 @@ object InteractiveShell { return result } + + @JvmStatic + fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps, isSsh: Boolean = false) { + + fun display(statements: RenderPrintWriter.() -> Unit) { + statements.invoke(userSessionOut) + userSessionOut.flush() + } + + var isShuttingDown = false + try { + display { + println("Orchestrating a clean shutdown...") + println("...enabling draining mode") + } + cordaRPCOps.setFlowsDrainingModeEnabled(true) + display { + println("...waiting for in-flight flows to be completed") + } + cordaRPCOps.pendingFlowsCount().updates + .doOnError { error -> + log.error(error.message) + throw error + } + .doOnNext { remaining -> + display { + println("...remaining: ${remaining.first}/${remaining.second}") + } + } + .doOnCompleted { + if (isSsh) { + // print in the original Shell process + System.out.println("Shutting down the node via remote SSH session (it may take a while)") + } + display { + println("Shutting down the node (it may take a while)") + } + cordaRPCOps.shutdown() + isShuttingDown = true + connection.forceClose() + display { + println("...done, quitting standalone shell now.") + } + onExit.invoke() + }.toBlocking().single() + } catch (e: StringToMethodCallParser.UnparseableCallException) { + display { + println(e.message, Color.red) + println("Please try 'man run' to learn what syntax is acceptable") + } + } catch (e: Exception) { + if (!isShuttingDown) { + display { + println("RPC failed: ${e.rootCause}", Color.red) + } + } + } finally { + InputStreamSerializer.invokeContext = null + InputStreamDeserializer.closeAll() + } + } + private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter): CordaFuture { val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) } @@ -518,6 +544,7 @@ object InteractiveShell { return printNextElements(response.updates, printerFun, out) } if (response is Observable<*>) { + return printNextElements(response, printerFun, out) } @@ -532,94 +559,4 @@ object InteractiveShell { return subscriber.future } - //region Extra serializers - // - // These serializers are used to enable the user to specify objects that aren't natural data containers in the shell, - // and for the shell to print things out that otherwise wouldn't be usefully printable. - - private object ObservableSerializer : JsonSerializer>() { - override fun serialize(value: Observable<*>, gen: JsonGenerator, serializers: SerializerProvider) { - gen.writeString("(observable)") - } - } - - // A file name is deserialized to an InputStream if found. - object InputStreamDeserializer : JsonDeserializer() { - // Keep track of them so we can close them later. - private val streams = Collections.synchronizedSet(HashSet()) - - override fun deserialize(p: JsonParser, ctxt: DeserializationContext): InputStream { - val stream = object : BufferedInputStream(Files.newInputStream(Paths.get(p.text))) { - override fun close() { - super.close() - streams.remove(this) - } - } - streams += stream - return stream - } - - fun closeAll() { - // Clone the set with toList() here so each closed stream can be removed from the set inside close(). - streams.toList().forEach { Closeables.closeQuietly(it) } - } - } - - // An InputStream found in a response triggers a request to the user to provide somewhere to save it. - private object InputStreamSerializer : JsonSerializer() { - var invokeContext: InvocationContext<*>? = null - - override fun serialize(value: InputStream, gen: JsonGenerator, serializers: SerializerProvider) { - try { - val toPath = invokeContext!!.readLine("Path to save stream to (enter to ignore): ", true) - if (toPath == null || toPath.isBlank()) { - gen.writeString("") - } else { - val path = Paths.get(toPath) - value.copyTo(path) - gen.writeString("") - } - } finally { - try { - value.close() - } catch (e: IOException) { - // Ignore. - } - } - } - } - - /** - * String value deserialized to [UniqueIdentifier]. - * Any string value used as [UniqueIdentifier.externalId]. - * If string contains underscore(i.e. externalId_uuid) then split with it. - * Index 0 as [UniqueIdentifier.externalId] - * Index 1 as [UniqueIdentifier.id] - * */ - object UniqueIdentifierDeserializer : JsonDeserializer() { - override fun deserialize(p: JsonParser, ctxt: DeserializationContext): UniqueIdentifier { - //Check if externalId and UUID may be separated by underscore. - if (p.text.contains("_")) { - val ids = p.text.split("_") - //Create UUID object from string. - val uuid: UUID = UUID.fromString(ids[1]) - //Create UniqueIdentifier object using externalId and UUID. - return UniqueIdentifier(ids[0], uuid) - } - //Any other string used as externalId. - return UniqueIdentifier.fromString(p.text) - } - } - - /** - * String value deserialized to [UUID]. - * */ - object UUIDDeserializer : JsonDeserializer() { - override fun deserialize(p: JsonParser, ctxt: DeserializationContext): UUID { - //Create UUID object from string. - return UUID.fromString(p.text) - } - } - - //endregion } diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShellCommand.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShellCommand.kt index 5538838c27..6253be2172 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShellCommand.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShellCommand.kt @@ -10,4 +10,5 @@ open class InteractiveShellCommand : BaseCommand() { fun ops() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).rpcOps fun ansiProgressRenderer() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).ansiProgressRenderer fun objectMapper() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).yamlInputMapper + fun isSsh() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).isSsh } diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/RPCOpsWithContext.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/RPCOpsWithContext.kt index bca4ad47c5..c4153b16f1 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/RPCOpsWithContext.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/RPCOpsWithContext.kt @@ -16,6 +16,5 @@ fun makeRPCOps(getCordaRPCOps: (username: String, credential: String) -> CordaRP // Unpack exception. throw e.targetException } - } - ) as CordaRPCOps + }) as CordaRPCOps } diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/SSHDConfiguration.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/SSHDConfiguration.kt new file mode 100644 index 0000000000..90e08363fa --- /dev/null +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/SSHDConfiguration.kt @@ -0,0 +1,27 @@ +package net.corda.tools.shell + +data class SSHDConfiguration(val port: Int) { + companion object { + internal const val INVALID_PORT_FORMAT = "Invalid port: %s" + private const val MISSING_PORT_FORMAT = "Missing port: %s" + + /** + * Parses a string of the form port into a [SSHDConfiguration]. + * @throws IllegalArgumentException if the port is missing or the string is garbage. + */ + @JvmStatic + fun parse(str: String): SSHDConfiguration { + require(!str.isNullOrBlank()) { SSHDConfiguration.MISSING_PORT_FORMAT.format(str) } + val port = try { + str.toInt() + } catch (ex: NumberFormatException) { + throw IllegalArgumentException("Port syntax is invalid, expected port") + } + return SSHDConfiguration(port) + } + } + + init { + require(port in (0..0xffff)) { INVALID_PORT_FORMAT.format(port) } + } +} \ No newline at end of file diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/SerializationSupport.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/SerializationSupport.kt new file mode 100644 index 0000000000..90a1a149e3 --- /dev/null +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/SerializationSupport.kt @@ -0,0 +1,103 @@ +package net.corda.tools.shell + +import com.fasterxml.jackson.core.JsonGenerator +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.databind.DeserializationContext +import com.fasterxml.jackson.databind.JsonDeserializer +import com.fasterxml.jackson.databind.JsonSerializer +import com.fasterxml.jackson.databind.SerializerProvider +import com.google.common.io.Closeables +import net.corda.core.contracts.UniqueIdentifier +import net.corda.core.internal.copyTo +import org.crsh.command.InvocationContext +import rx.Observable +import java.io.BufferedInputStream +import java.io.InputStream +import java.nio.file.Files +import java.nio.file.Paths +import java.util.* + +//region Extra serializers +// +// These serializers are used to enable the user to specify objects that aren't natural data containers in the shell, +// and for the shell to print things out that otherwise wouldn't be usefully printable. + +object ObservableSerializer : JsonSerializer>() { + override fun serialize(value: Observable<*>, gen: JsonGenerator, serializers: SerializerProvider) { + gen.writeString("(observable)") + } +} + +/** + * String value deserialized to [UniqueIdentifier]. + * Any string value used as [UniqueIdentifier.externalId]. + * If string contains underscore(i.e. externalId_uuid) then split with it. + * Index 0 as [UniqueIdentifier.externalId] + * Index 1 as [UniqueIdentifier.id] + * */ +object UniqueIdentifierDeserializer : JsonDeserializer() { + override fun deserialize(p: JsonParser, ctxt: DeserializationContext): UniqueIdentifier { + //Check if externalId and UUID may be separated by underscore. + if (p.text.contains("_")) { + val ids = p.text.split("_") + //Create UUID object from string. + val uuid: UUID = UUID.fromString(ids[1]) + //Create UniqueIdentifier object using externalId and UUID. + return UniqueIdentifier(ids[0], uuid) + } + //Any other string used as externalId. + return UniqueIdentifier.fromString(p.text) + } +} + +/** + * String value deserialized to [UUID]. + * */ +object UUIDDeserializer : JsonDeserializer() { + override fun deserialize(p: JsonParser, ctxt: DeserializationContext): UUID { + //Create UUID object from string. + return UUID.fromString(p.text) + } +} + +// An InputStream found in a response triggers a request to the user to provide somewhere to save it. +object InputStreamSerializer : JsonSerializer() { + var invokeContext: InvocationContext<*>? = null + + override fun serialize(value: InputStream, gen: JsonGenerator, serializers: SerializerProvider) { + + value.use { + val toPath = invokeContext!!.readLine("Path to save stream to (enter to ignore): ", true) + if (toPath == null || toPath.isBlank()) { + gen.writeString("") + } else { + val path = Paths.get(toPath) + it.copyTo(path) + gen.writeString("") + } + } + } +} + +// A file name is deserialized to an InputStream if found. +object InputStreamDeserializer : JsonDeserializer() { + // Keep track of them so we can close them later. + private val streams = Collections.synchronizedSet(HashSet()) + + override fun deserialize(p: JsonParser, ctxt: DeserializationContext): InputStream { + val stream = object : BufferedInputStream(Files.newInputStream(Paths.get(p.text))) { + override fun close() { + super.close() + streams.remove(this) + } + } + streams += stream + return stream + } + + fun closeAll() { + // Clone the set with toList() here so each closed stream can be removed from the set inside close(). + streams.toList().forEach { Closeables.closeQuietly(it) } + } +} +//endregion \ No newline at end of file diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/ShellConfiguration.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/ShellConfiguration.kt new file mode 100644 index 0000000000..2714679720 --- /dev/null +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/ShellConfiguration.kt @@ -0,0 +1,28 @@ +package net.corda.tools.shell + +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.nodeapi.internal.config.SSLConfiguration +import java.nio.file.Path +import java.nio.file.Paths + +data class ShellConfiguration( + val commandsDirectory: Path, + val cordappsDirectory: Path? = null, + var user: String = "", + var password: String = "", + val hostAndPort: NetworkHostAndPort, + val ssl: ShellSslOptions? = null, + val sshdPort: Int? = null, + val sshHostKeyDirectory: Path? = null, + val noLocalShell: Boolean = false) { + companion object { + const val SSH_PORT = 2222 + const val COMMANDS_DIR = "shell-commands" + const val CORDAPPS_DIR = "cordapps" + const val SSHD_HOSTKEY_DIR = "ssh" + } +} + +data class ShellSslOptions(override val sslKeystore: Path, override val keyStorePassword: String, override val trustStoreFile:Path, override val trustStorePassword: String) : SSLConfiguration { + override val certificatesDirectory: Path get() = Paths.get("") +} \ No newline at end of file diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/StandaloneShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/StandaloneShell.kt index 16613e27a1..349bbefc0e 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/StandaloneShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/StandaloneShell.kt @@ -105,6 +105,7 @@ class StandaloneShell(private val configuration: ShellConfiguration) { configuration.sshdPort?.apply{ println("SSH server listening on port $this.") } exit.await() + // because we can't clean certain Crash Shell threads that block on read() exitProcess(0) } } diff --git a/tools/shell/src/test/kotlin/net/corda/tools/shell/CustomTypeJsonParsingTests.kt b/tools/shell/src/test/kotlin/net/corda/tools/shell/CustomTypeJsonParsingTests.kt index 7ab1718558..e82f51ac24 100644 --- a/tools/shell/src/test/kotlin/net/corda/tools/shell/CustomTypeJsonParsingTests.kt +++ b/tools/shell/src/test/kotlin/net/corda/tools/shell/CustomTypeJsonParsingTests.kt @@ -27,8 +27,8 @@ class CustomTypeJsonParsingTests { fun setup() { objectMapper = ObjectMapper() val simpleModule = SimpleModule() - simpleModule.addDeserializer(UniqueIdentifier::class.java, InteractiveShell.UniqueIdentifierDeserializer) - simpleModule.addDeserializer(UUID::class.java, InteractiveShell.UUIDDeserializer) + simpleModule.addDeserializer(UniqueIdentifier::class.java, UniqueIdentifierDeserializer) + simpleModule.addDeserializer(UUID::class.java, UUIDDeserializer) objectMapper.registerModule(simpleModule) } From 8591ae2dc9131e590ced855dacfd8ef8c199a73d Mon Sep 17 00:00:00 2001 From: igor nitto Date: Mon, 19 Mar 2018 14:34:02 +0000 Subject: [PATCH 5/5] CordApps back in node classpath (#2843) [CORDA-1232] --- .../corda/nodeapi/internal/persistence/CordaPersistence.kt | 5 ++--- node/src/main/java/CordaCaplet.java | 1 + .../main/kotlin/net/corda/node/internal/AbstractNode.kt | 7 +++---- node/src/main/kotlin/net/corda/node/internal/Node.kt | 2 -- .../src/main/kotlin/net/corda/testing/node/MockServices.kt | 2 +- 5 files changed, 7 insertions(+), 10 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt index 6c16d28785..4884c21832 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt @@ -52,8 +52,7 @@ class CordaPersistence( val dataSource: DataSource, databaseConfig: DatabaseConfig, schemas: Set, - attributeConverters: Collection> = emptySet(), - val cordappClassLoader: ClassLoader? = null + attributeConverters: Collection> = emptySet() ) : Closeable { companion object { private val log = contextLogger() @@ -62,7 +61,7 @@ class CordaPersistence( private val defaultIsolationLevel = databaseConfig.transactionIsolationLevel val hibernateConfig: HibernateConfiguration by lazy { transaction { - HibernateConfiguration(schemas, databaseConfig, attributeConverters, cordappClassLoader) + HibernateConfiguration(schemas, databaseConfig, attributeConverters) } } val entityManagerFactory get() = hibernateConfig.sessionFactoryForRegisteredSchemas diff --git a/node/src/main/java/CordaCaplet.java b/node/src/main/java/CordaCaplet.java index c127eefb8c..c9201a3621 100644 --- a/node/src/main/java/CordaCaplet.java +++ b/node/src/main/java/CordaCaplet.java @@ -82,6 +82,7 @@ public class CordaCaplet extends Capsule { (new File(baseDir, "cordapps")).mkdir(); // Add additional directories of JARs to the classpath (at the end). e.g. for JDBC drivers + augmentClasspath((List) cp, new File(baseDir, "cordapps")); try { List jarDirs = nodeConfig.getStringList("jarDirs"); log(LOG_VERBOSE, "Configured JAR directories = " + jarDirs); diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 6f8572fd1a..ef1bbceeb0 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -643,7 +643,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected open fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService): CordaPersistence { val props = configuration.dataSourceProperties if (props.isEmpty()) throw DatabaseConfigurationException("There must be a database configured.") - val database = configureDatabase(props, configuration.database, identityService, schemaService, cordappLoader.appClassLoader) + val database = configureDatabase(props, configuration.database, identityService, schemaService) // Now log the vendor string as this will also cause a connection to be tested eagerly. logVendorString(database, log) runOnStop += database::close @@ -879,8 +879,7 @@ internal class NetworkMapCacheEmptyException : Exception() fun configureDatabase(hikariProperties: Properties, databaseConfig: DatabaseConfig, identityService: IdentityService, - schemaService: SchemaService = NodeSchemaService(), - cordappClassLoader: ClassLoader? = null): CordaPersistence { + schemaService: SchemaService = NodeSchemaService()): CordaPersistence { // Register the AbstractPartyDescriptor so Hibernate doesn't warn when encountering AbstractParty. Unfortunately // Hibernate warns about not being able to find a descriptor if we don't provide one, but won't use it by default // so we end up providing both descriptor and converter. We should re-examine this in later versions to see if @@ -888,5 +887,5 @@ fun configureDatabase(hikariProperties: Properties, JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(identityService)) val dataSource = DataSourceFactory.createDataSource(hikariProperties) val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(identityService)) - return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, attributeConverters, cordappClassLoader) + return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, attributeConverters) } diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 4879d5868b..8fb3e583d5 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -173,8 +173,6 @@ open class Node(configuration: NodeConfiguration, printBasicNodeInfo("Advertised P2P messaging addresses", info.addresses.joinToString()) rpcServerAddresses?.let { rpcMessagingClient = RPCMessagingClient(configuration.rpcOptions.sslConfig, it.admin, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE) - printBasicNodeInfo("RPC connection address", it.primary.toString()) - printBasicNodeInfo("RPC admin connection address", it.admin.toString()) } verifierMessagingClient = when (configuration.verifierType) { VerifierType.OutOfProcess -> throw IllegalArgumentException("OutOfProcess verifier not supported") //VerifierMessagingClient(configuration, serverAddress, services.monitoringService.metrics, /*networkParameters.maxMessageSize*/MAX_FILE_SIZE) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index 54d5fe3362..7a2e3cba34 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -102,7 +102,7 @@ open class MockServices private constructor( val cordappLoader = CordappLoader.createWithTestPackages(cordappPackages) val dataSourceProps = makeTestDataSourceProperties() val schemaService = NodeSchemaService(cordappLoader.cordappSchemas) - val database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService, schemaService, cordappLoader.appClassLoader) + val database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService, schemaService) val mockService = database.transaction { object : MockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys) { override val vaultService: VaultService = makeVaultService(database.hibernateConfig, schemaService)