diff --git a/.ci/api-current.txt b/.ci/api-current.txt index fae0f1dfcf..752c2ce5a3 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -658,10 +658,25 @@ public static final class net.corda.core.contracts.UniqueIdentifier$Companion ex @org.jetbrains.annotations.NotNull public abstract List getServiceFlows() @org.jetbrains.annotations.NotNull public abstract List getServices() ## +@net.corda.core.DoNotImplement public interface net.corda.core.cordapp.CordappConfig + public abstract boolean exists(String) + @org.jetbrains.annotations.NotNull public abstract Object get(String) + public abstract boolean getBoolean(String) + public abstract double getDouble(String) + public abstract float getFloat(String) + public abstract int getInt(String) + public abstract long getLong(String) + @org.jetbrains.annotations.NotNull public abstract Number getNumber(String) + @org.jetbrains.annotations.NotNull public abstract String getString(String) +## +public final class net.corda.core.cordapp.CordappConfigException extends java.lang.Exception + public (String, Throwable) +## public final class net.corda.core.cordapp.CordappContext extends java.lang.Object public (net.corda.core.cordapp.Cordapp, net.corda.core.crypto.SecureHash, ClassLoader, net.corda.core.cordapp.CordappConfig) @org.jetbrains.annotations.Nullable public final net.corda.core.crypto.SecureHash getAttachmentId() @org.jetbrains.annotations.NotNull public final ClassLoader getClassLoader() + @org.jetbrains.annotations.NotNull public final net.corda.core.cordapp.CordappConfig getConfig() @org.jetbrains.annotations.NotNull public final net.corda.core.cordapp.Cordapp getCordapp() ## @net.corda.core.DoNotImplement public interface net.corda.core.cordapp.CordappProvider @@ -966,6 +981,8 @@ public static final class net.corda.core.crypto.PartialMerkleTree$Companion exte @kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public static final net.corda.core.crypto.SecureHash$SHA256 sha256Twice(byte[]) @org.jetbrains.annotations.NotNull public String toString() public static final net.corda.core.crypto.SecureHash$Companion Companion + @org.jetbrains.annotations.NotNull public static final net.corda.core.crypto.SecureHash$SHA256 allOnesHash + @org.jetbrains.annotations.NotNull public static final net.corda.core.crypto.SecureHash$SHA256 zeroHash ## public static final class net.corda.core.crypto.SecureHash$Companion extends java.lang.Object @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash$SHA256 getAllOnesHash() @@ -1377,6 +1394,15 @@ public static final class net.corda.core.flows.NotarisationRequest$Companion ext public int hashCode() public String toString() ## +@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.NotarisationResponse extends java.lang.Object + public (List) + @org.jetbrains.annotations.NotNull public final List component1() + @org.jetbrains.annotations.NotNull public final net.corda.core.flows.NotarisationResponse copy(List) + public boolean equals(Object) + @org.jetbrains.annotations.NotNull public final List getSignatures() + public int hashCode() + public String toString() +## @net.corda.core.flows.InitiatingFlow public final class net.corda.core.flows.NotaryChangeFlow extends net.corda.core.flows.AbstractStateReplacementFlow$Instigator public (net.corda.core.contracts.StateAndRef, net.corda.core.identity.Party, net.corda.core.utilities.ProgressTracker) @org.jetbrains.annotations.NotNull protected net.corda.core.flows.AbstractStateReplacementFlow$UpgradeTx assembleTx() @@ -1384,10 +1410,12 @@ public static final class net.corda.core.flows.NotarisationRequest$Companion ext @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.flows.NotaryError extends java.lang.Object ## @net.corda.core.serialization.CordaSerializable public static final class net.corda.core.flows.NotaryError$Conflict extends net.corda.core.flows.NotaryError + public (net.corda.core.crypto.SecureHash, Map) @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash component1() @org.jetbrains.annotations.NotNull public final Map component2() @org.jetbrains.annotations.NotNull public final net.corda.core.flows.NotaryError$Conflict copy(net.corda.core.crypto.SecureHash, Map) public boolean equals(Object) + @org.jetbrains.annotations.NotNull public final Map getConsumedStates() @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash getTxId() public int hashCode() @org.jetbrains.annotations.NotNull public String toString() @@ -1440,6 +1468,7 @@ public static final class net.corda.core.flows.NotaryError$TimeWindowInvalid$Com @net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.NotaryException extends net.corda.core.flows.FlowException public (net.corda.core.flows.NotaryError, net.corda.core.crypto.SecureHash) @org.jetbrains.annotations.NotNull public final net.corda.core.flows.NotaryError getError() + @org.jetbrains.annotations.Nullable public final net.corda.core.crypto.SecureHash getTxId() ## public final class net.corda.core.flows.NotaryFlow extends java.lang.Object public () @@ -1471,6 +1500,10 @@ public abstract static class net.corda.core.flows.NotaryFlow$Service extends net @org.jetbrains.annotations.NotNull public final net.corda.core.node.services.TrustedAuthorityNotaryService getService() @co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull protected abstract net.corda.core.flows.TransactionParts validateRequest(net.corda.core.flows.NotarisationPayload) ## +@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.NotaryInternalException extends net.corda.core.flows.FlowException + public (net.corda.core.flows.NotaryError) + @org.jetbrains.annotations.NotNull public final net.corda.core.flows.NotaryError getError() +## public final class net.corda.core.flows.ReceiveStateAndRefFlow extends net.corda.core.flows.FlowLogic public (net.corda.core.flows.FlowSession) @co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull public List call() @@ -1523,6 +1556,15 @@ public @interface net.corda.core.flows.StartableByRPC ## public @interface net.corda.core.flows.StartableByService ## +@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.StateConsumptionDetails extends java.lang.Object + public (net.corda.core.crypto.SecureHash) + @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash component1() + @org.jetbrains.annotations.NotNull public final net.corda.core.flows.StateConsumptionDetails copy(net.corda.core.crypto.SecureHash) + public boolean equals(Object) + @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash getHashOfTransactionId() + public int hashCode() + public String toString() +## @net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.StateMachineRunId extends java.lang.Object public (UUID) @org.jetbrains.annotations.NotNull public final UUID component1() @@ -1669,6 +1711,7 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec @org.jetbrains.annotations.NotNull public abstract List queryAttachments(net.corda.core.node.services.vault.AttachmentQueryCriteria, net.corda.core.node.services.vault.AttachmentSort) @org.jetbrains.annotations.NotNull public abstract List registeredFlows() public abstract void setFlowsDrainingModeEnabled(boolean) + public abstract void shutdown() @net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.FlowHandle startFlowDynamic(Class, Object...) @net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.FlowProgressHandle startTrackedFlowDynamic(Class, Object...) @net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.DataFeed stateMachineRecordedTransactionMappingFeed() @@ -1692,6 +1735,7 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec @org.jetbrains.annotations.Nullable public abstract net.corda.core.identity.Party wellKnownPartyFromX500Name(net.corda.core.identity.CordaX500Name) ## public final class net.corda.core.messaging.CordaRPCOpsKt extends java.lang.Object + @org.jetbrains.annotations.NotNull public static final net.corda.core.messaging.DataFeed pendingFlowsCount(net.corda.core.messaging.CordaRPCOps) ## @net.corda.core.serialization.CordaSerializable public final class net.corda.core.messaging.DataFeed extends java.lang.Object public (Object, rx.Observable) @@ -1891,6 +1935,7 @@ public @interface net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey) @org.jetbrains.annotations.NotNull public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction) @org.jetbrains.annotations.NotNull public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction, java.security.PublicKey) + @org.jetbrains.annotations.NotNull public abstract net.corda.core.cordapp.CordappContext getAppContext() @org.jetbrains.annotations.NotNull public abstract java.time.Clock getClock() @org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.ContractUpgradeService getContractUpgradeService() @org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.KeyManagementService getKeyManagementService() @@ -2870,6 +2915,8 @@ public interface net.corda.core.schemas.StatePersistable public interface net.corda.core.serialization.ClassWhitelist public abstract boolean hasListed(Class) ## +public @interface net.corda.core.serialization.ConstructorForDeserialization +## public @interface net.corda.core.serialization.CordaSerializable ## public @interface net.corda.core.serialization.CordaSerializationTransformEnumDefault @@ -2889,6 +2936,9 @@ public @interface net.corda.core.serialization.CordaSerializationTransformRename public @interface net.corda.core.serialization.DeprecatedConstructorForDeserialization public abstract int version() ## +@net.corda.core.DoNotImplement public interface net.corda.core.serialization.EncodingWhitelist + public abstract boolean acceptEncoding(net.corda.core.serialization.SerializationEncoding) +## @net.corda.core.serialization.CordaSerializable public final class net.corda.core.serialization.MissingAttachmentsException extends net.corda.core.CordaException public (List) @org.jetbrains.annotations.NotNull public final List getIds() @@ -2909,6 +2959,8 @@ public final class net.corda.core.serialization.SerializationAPIKt extends java. ## @net.corda.core.DoNotImplement public interface net.corda.core.serialization.SerializationContext @org.jetbrains.annotations.NotNull public abstract ClassLoader getDeserializationClassLoader() + @org.jetbrains.annotations.Nullable public abstract net.corda.core.serialization.SerializationEncoding getEncoding() + @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.EncodingWhitelist getEncodingWhitelist() public abstract boolean getObjectReferencesEnabled() @org.jetbrains.annotations.NotNull public abstract net.corda.core.utilities.ByteSequence getPreferredSerializationVersion() @org.jetbrains.annotations.NotNull public abstract Map getProperties() @@ -2916,6 +2968,7 @@ public final class net.corda.core.serialization.SerializationAPIKt extends java. @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.ClassWhitelist getWhitelist() @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withAttachmentsClassLoader(List) @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withClassLoader(ClassLoader) + @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withEncoding(net.corda.core.serialization.SerializationEncoding) @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withPreferredSerializationVersion(net.corda.core.utilities.ByteSequence) @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withProperty(Object, Object) @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withWhitelisted(Class) @@ -2939,6 +2992,8 @@ public final class net.corda.core.serialization.SerializationDefaults extends ja @org.jetbrains.annotations.NotNull public final net.corda.core.serialization.SerializationContext getSTORAGE_CONTEXT() public static final net.corda.core.serialization.SerializationDefaults INSTANCE ## +@net.corda.core.DoNotImplement public interface net.corda.core.serialization.SerializationEncoding +## public abstract class net.corda.core.serialization.SerializationFactory extends java.lang.Object public () public final Object asCurrent(kotlin.jvm.functions.Function1) @@ -3018,13 +3073,20 @@ public static final class net.corda.core.serialization.SingletonSerializationTok @org.jetbrains.annotations.NotNull public final Map component2() @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.ContractUpgradeFilteredTransaction copy(Map, Map) public boolean equals(Object) + @org.jetbrains.annotations.NotNull public final Map getHiddenComponents() @org.jetbrains.annotations.NotNull public net.corda.core.crypto.SecureHash getId() @org.jetbrains.annotations.NotNull public List getInputs() @org.jetbrains.annotations.NotNull public net.corda.core.identity.Party getNotary() @org.jetbrains.annotations.NotNull public List getOutputs() + @org.jetbrains.annotations.NotNull public final Map getVisibleComponents() public int hashCode() public String toString() ## +@net.corda.core.serialization.CordaSerializable public static final class net.corda.core.transactions.ContractUpgradeFilteredTransaction$FilteredComponent extends java.lang.Object + public (net.corda.core.utilities.OpaqueBytes, net.corda.core.crypto.SecureHash) + @org.jetbrains.annotations.NotNull public final net.corda.core.utilities.OpaqueBytes getComponent() + @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash getNonce() +## @net.corda.core.DoNotImplement public final class net.corda.core.transactions.ContractUpgradeLedgerTransaction extends net.corda.core.transactions.FullTransaction implements net.corda.core.transactions.TransactionWithSignatures public (List, net.corda.core.identity.Party, net.corda.core.contracts.Attachment, String, net.corda.core.contracts.Attachment, net.corda.core.crypto.SecureHash, net.corda.core.contracts.PrivacySalt, List, net.corda.core.node.NetworkParameters) public void checkSignaturesAreValid() @@ -3069,12 +3131,18 @@ public static final class net.corda.core.serialization.SingletonSerializationTok @org.jetbrains.annotations.NotNull public net.corda.core.identity.Party getNotary() @org.jetbrains.annotations.NotNull public List getOutputs() @org.jetbrains.annotations.NotNull public final net.corda.core.contracts.PrivacySalt getPrivacySalt() + @org.jetbrains.annotations.NotNull public final List getSerializedComponents() @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash getUpgradedContractAttachmentId() @org.jetbrains.annotations.NotNull public final String getUpgradedContractClassName() public int hashCode() @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.ContractUpgradeLedgerTransaction resolve(net.corda.core.node.ServicesForResolution, List) public String toString() ## +public static final class net.corda.core.transactions.ContractUpgradeWireTransaction$Component extends java.lang.Enum + protected (String, int) + public static net.corda.core.transactions.ContractUpgradeWireTransaction$Component valueOf(String) + public static net.corda.core.transactions.ContractUpgradeWireTransaction$Component[] values() +## @net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.transactions.CoreTransaction extends net.corda.core.transactions.BaseTransaction public () @org.jetbrains.annotations.NotNull public abstract List getInputs() @@ -3206,6 +3274,7 @@ public static final class net.corda.core.transactions.LedgerTransaction$InOutGro ## @net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public final class net.corda.core.transactions.NotaryChangeWireTransaction extends net.corda.core.transactions.CoreTransaction public (List) + @kotlin.Deprecated public (List, net.corda.core.identity.Party, net.corda.core.identity.Party) @org.jetbrains.annotations.NotNull public final List component1() @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.NotaryChangeWireTransaction copy(List) public boolean equals(Object) @@ -3214,11 +3283,17 @@ public static final class net.corda.core.transactions.LedgerTransaction$InOutGro @org.jetbrains.annotations.NotNull public final net.corda.core.identity.Party getNewNotary() @org.jetbrains.annotations.NotNull public net.corda.core.identity.Party getNotary() @org.jetbrains.annotations.NotNull public List getOutputs() + @org.jetbrains.annotations.NotNull public final List getSerializedComponents() public int hashCode() @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.NotaryChangeLedgerTransaction resolve(net.corda.core.node.ServiceHub, List) @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.NotaryChangeLedgerTransaction resolve(net.corda.core.node.ServicesForResolution, List) public String toString() ## +public static final class net.corda.core.transactions.NotaryChangeWireTransaction$Component extends java.lang.Enum + protected (String, int) + public static net.corda.core.transactions.NotaryChangeWireTransaction$Component valueOf(String) + public static net.corda.core.transactions.NotaryChangeWireTransaction$Component[] values() +## @net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public final class net.corda.core.transactions.SignedTransaction extends java.lang.Object implements net.corda.core.transactions.TransactionWithSignatures public (net.corda.core.serialization.SerializedBytes, List) public (net.corda.core.transactions.CoreTransaction, List) @@ -3362,6 +3437,7 @@ public final class net.corda.core.utilities.ByteArrays extends java.lang.Object @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.utilities.ByteSequence extends java.lang.Object implements java.lang.Comparable public int compareTo(net.corda.core.utilities.ByteSequence) @org.jetbrains.annotations.NotNull public final net.corda.core.utilities.ByteSequence copy() + @org.jetbrains.annotations.NotNull public final byte[] copyBytes() public boolean equals(Object) @org.jetbrains.annotations.NotNull public abstract byte[] getBytes() public final int getOffset() @@ -3371,9 +3447,12 @@ public final class net.corda.core.utilities.ByteArrays extends java.lang.Object @kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public static final net.corda.core.utilities.ByteSequence of(byte[], int) @kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public static final net.corda.core.utilities.ByteSequence of(byte[], int, int) @org.jetbrains.annotations.NotNull public final java.io.ByteArrayInputStream open() + @org.jetbrains.annotations.NotNull public final java.nio.ByteBuffer putTo(java.nio.ByteBuffer) + @org.jetbrains.annotations.NotNull public final java.nio.ByteBuffer slice(int, int) @org.jetbrains.annotations.NotNull public final net.corda.core.utilities.ByteSequence subSequence(int, int) @org.jetbrains.annotations.NotNull public final net.corda.core.utilities.ByteSequence take(int) @org.jetbrains.annotations.NotNull public String toString() + public final void writeTo(java.io.OutputStream) public static final net.corda.core.utilities.ByteSequence$Companion Companion ## public static final class net.corda.core.utilities.ByteSequence$Companion extends java.lang.Object @@ -4151,6 +4230,7 @@ public class net.corda.testing.node.MockServices extends java.lang.Object implem @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey) @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction) @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction, java.security.PublicKey) + @org.jetbrains.annotations.NotNull public net.corda.core.cordapp.CordappContext getAppContext() @org.jetbrains.annotations.NotNull public final net.corda.testing.services.MockAttachmentStorage getAttachments() @org.jetbrains.annotations.NotNull public java.time.Clock getClock() @org.jetbrains.annotations.NotNull public net.corda.core.node.services.ContractUpgradeService getContractUpgradeService() @@ -4209,6 +4289,7 @@ public static final class net.corda.testing.node.MockServicesKt$createMockCordaS @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey) @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction) @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction, java.security.PublicKey) + @org.jetbrains.annotations.NotNull public net.corda.core.cordapp.CordappContext getAppContext() @org.jetbrains.annotations.NotNull public net.corda.core.node.services.AttachmentStorage getAttachments() @org.jetbrains.annotations.NotNull public java.time.Clock getClock() @org.jetbrains.annotations.NotNull public net.corda.core.node.services.ContractUpgradeService getContractUpgradeService() @@ -4313,18 +4394,22 @@ public final class net.corda.client.rpc.CordaRPCClient extends java.lang.Object ## public static final class net.corda.client.rpc.CordaRPCClient$Companion extends java.lang.Object ## -public final class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang.Object - public (java.time.Duration) - @org.jetbrains.annotations.NotNull public final java.time.Duration component1() - @org.jetbrains.annotations.NotNull public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration) - public boolean equals(Object) - @org.jetbrains.annotations.NotNull public final java.time.Duration getConnectionMaxRetryInterval() - public int hashCode() - public String toString() +public interface net.corda.client.rpc.CordaRPCClientConfiguration + public abstract int getCacheConcurrencyLevel() + @org.jetbrains.annotations.NotNull public abstract java.time.Duration getConnectionMaxRetryInterval() + @org.jetbrains.annotations.NotNull public abstract java.time.Duration getConnectionRetryInterval() + public abstract double getConnectionRetryIntervalMultiplier() + @org.jetbrains.annotations.NotNull public abstract java.time.Duration getDeduplicationCacheExpiry() + public abstract int getMaxFileSize() + public abstract int getMaxReconnectAttempts() + public abstract int getMinimumServerProtocolVersion() + public abstract int getObservationExecutorPoolSize() + @org.jetbrains.annotations.NotNull public abstract java.time.Duration getReapInterval() + public abstract boolean getTrackRpcCallSites() public static final net.corda.client.rpc.CordaRPCClientConfiguration$Companion Companion - @org.jetbrains.annotations.NotNull public static final net.corda.client.rpc.CordaRPCClientConfiguration DEFAULT ## public static final class net.corda.client.rpc.CordaRPCClientConfiguration$Companion extends java.lang.Object + @org.jetbrains.annotations.NotNull public final net.corda.client.rpc.CordaRPCClientConfiguration default() ## @net.corda.core.DoNotImplement public final class net.corda.client.rpc.CordaRPCConnection extends java.lang.Object implements net.corda.client.rpc.RPCConnection public (net.corda.client.rpc.RPCConnection) diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt index a0957211d6..abb98d90af 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt @@ -68,9 +68,9 @@ class NodeMonitorModel { fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) { val client = CordaRPCClient( nodeHostAndPort, - CordaRPCClientConfiguration.DEFAULT.copy( - connectionMaxRetryInterval = 10.seconds - ) + object : CordaRPCClientConfiguration { + override val connectionMaxRetryInterval = 10.seconds + } ) val connection = client.start(username, password) val proxy = connection.proxy diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt index eec3f54f53..5ee6d56059 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt @@ -28,13 +28,13 @@ import net.corda.finance.flows.CashPaymentFlow import net.corda.finance.schemas.CashSchemaV1 import net.corda.node.internal.Node import net.corda.node.internal.StartedNode -import net.corda.node.services.Permissions.Companion.invokeRpc -import net.corda.node.services.Permissions.Companion.startFlow +import net.corda.node.services.Permissions.Companion.all import net.corda.testing.core.* import net.corda.testing.node.User import net.corda.testing.internal.IntegrationTestSchemas import net.corda.testing.internal.toDatabaseSchemaName import net.corda.testing.node.internal.NodeBasedTest +import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatExceptionOfType @@ -42,17 +42,17 @@ import org.junit.After import org.junit.Before import org.junit.ClassRule import org.junit.Test +import rx.subjects.PublishSubject +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit import kotlin.test.assertEquals import kotlin.test.assertFalse import kotlin.test.assertTrue class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", CashSchemaV1::class.packageName)) { - private val rpcUser = User("user1", "test", permissions = setOf( - startFlow(), - startFlow(), - invokeRpc("vaultQueryBy"), - invokeRpc(CordaRPCOps::stateMachinesFeed), - invokeRpc("vaultQueryByCriteria")) + private val rpcUser = User("user1", "test", permissions = setOf(all()) ) private lateinit var node: StartedNode private lateinit var identity: Party @@ -72,7 +72,9 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C override fun setUp() { super.setUp() node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) - client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!) + client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, object : CordaRPCClientConfiguration { + override val maxReconnectAttempts = 5 + }) identity = node.info.identityFromX500Name(ALICE_NAME) } @@ -100,6 +102,61 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C } } + @Test + fun `shutdown command stops the node`() { + + val nodeIsShut: PublishSubject = PublishSubject.create() + val latch = CountDownLatch(1) + var successful = false + val maxCount = 20 + var count = 0 + CloseableExecutor(Executors.newSingleThreadScheduledExecutor()).use { scheduler -> + + val task = scheduler.scheduleAtFixedRate({ + try { + println("Checking whether node is still running...") + client.start(rpcUser.username, rpcUser.password).use { + println("... node is still running.") + if (count == maxCount) { + nodeIsShut.onError(AssertionError("Node does not get shutdown by RPC")) + } + count++ + } + } catch (e: ActiveMQNotConnectedException) { + println("... node is not running.") + nodeIsShut.onCompleted() + } catch (e: ActiveMQSecurityException) { + // nothing here - this happens if trying to connect before the node is started + } catch (e: Throwable) { + nodeIsShut.onError(e) + } + }, 1, 1, TimeUnit.SECONDS) + + nodeIsShut.doOnError { error -> + error.printStackTrace() + successful = false + task.cancel(true) + latch.countDown() + }.doOnCompleted { + successful = (node.internals.started == null) + task.cancel(true) + latch.countDown() + }.subscribe() + + client.start(rpcUser.username, rpcUser.password).use { rpc -> rpc.proxy.shutdown() } + + latch.await() + assertThat(successful).isTrue() + } + } + + private class CloseableExecutor(private val delegate: ScheduledExecutorService) : AutoCloseable, ScheduledExecutorService by delegate { + + override fun close() { + delegate.shutdown() + } + } + @Test fun `close-send deadlock and premature shutdown on empty observable`() { println("Starting client") @@ -160,7 +217,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C val updates = proxy.stateMachinesFeed().updates - node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0),identity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow() + node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0), identity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow() proxy.startFlow(::CashIssueFlow, 123.DOLLARS, OpaqueBytes.of(0), identity).returnValue.getOrThrow() proxy.startFlowDynamic(CashIssueFlow::class.java, 1000.DOLLARS, OpaqueBytes.of(0), identity).returnValue.getOrThrow() diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index c428d90860..97dd383557 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -11,7 +11,7 @@ package net.corda.client.rpc import net.corda.client.rpc.internal.RPCClient -import net.corda.client.rpc.internal.RPCClientConfiguration +import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.context.Trace import net.corda.core.crypto.random63BitValue import net.corda.core.internal.concurrent.fork @@ -115,7 +115,7 @@ class RPCStabilityTests { Try.on { startRpcClient( server.get().broker.hostAndPort!!, - configuration = RPCClientConfiguration.default.copy(minimumServerProtocolVersion = 1) + configuration = CordaRPCClientConfigurationImpl.default.copy(minimumServerProtocolVersion = 1) ).get() } } @@ -250,7 +250,7 @@ class RPCStabilityTests { val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! serverFollower.unfollow() // Set retry interval to 1s to reduce test duration - val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 1.seconds) + val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds) val clientFollower = shutdownManager.follower() val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() clientFollower.unfollow() @@ -276,7 +276,7 @@ class RPCStabilityTests { val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! serverFollower.unfollow() // Set retry interval to 1s to reduce test duration - val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5) + val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5) val clientFollower = shutdownManager.follower() val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() clientFollower.unfollow() @@ -308,7 +308,7 @@ class RPCStabilityTests { val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! serverFollower.unfollow() - val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1) + val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1) val clientFollower = shutdownManager.follower() val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() clientFollower.unfollow() diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index 0d0dcbe427..4bedf058ce 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -12,7 +12,7 @@ package net.corda.client.rpc import net.corda.client.rpc.internal.KryoClientSerializationScheme import net.corda.client.rpc.internal.RPCClient -import net.corda.client.rpc.internal.RPCClientConfiguration +import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.context.Actor import net.corda.core.context.Trace import net.corda.core.messaging.CordaRPCOps @@ -33,23 +33,46 @@ class CordaRPCConnection internal constructor(connection: RPCConnection( tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration), - configuration.toRpcClientConfiguration(), + configuration, if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT ) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt index 45a8f31463..dc45c1b729 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt @@ -12,19 +12,32 @@ package net.corda.client.rpc.internal import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClientConfiguration +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.pendingFlowsCount import net.corda.core.utilities.NetworkHostAndPort import net.corda.nodeapi.internal.config.SSLConfiguration +import rx.Observable /** Utility which exposes the internal Corda RPC constructor to other internal Corda components */ fun createCordaRPCClientWithSsl( hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), sslConfiguration: SSLConfiguration? = null ) = CordaRPCClient.createWithSsl(hostAndPort, configuration, sslConfiguration) fun createCordaRPCClientWithSslAndClassLoader( hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), sslConfiguration: SSLConfiguration? = null, classLoader: ClassLoader? = null -) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader) \ No newline at end of file +) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader) + +fun CordaRPCOps.drainAndShutdown(): Observable { + + setFlowsDrainingModeEnabled(true) + return pendingFlowsCount().updates + .doOnError { error -> + throw error + } + .doOnCompleted { shutdown() }.map { } +} \ No newline at end of file diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt index a085144c80..2b2e1539f6 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt @@ -10,7 +10,6 @@ package net.corda.client.rpc.internal -import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.RPCConnection import net.corda.client.rpc.RPCException @@ -37,40 +36,22 @@ import java.time.Duration /** * This configuration may be used to tweak the internals of the RPC client. */ -data class RPCClientConfiguration( - /** The minimum protocol version required from the server */ - val minimumServerProtocolVersion: Int, - /** - * If set to true the client will track RPC call sites. If an error occurs subsequently during the RPC or in a - * returned Observable stream the stack trace of the originating RPC will be shown as well. Note that - * constructing call stacks is a moderately expensive operation. - */ - val trackRpcCallSites: Boolean, - /** - * The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references - * and are cleaned up in batches in this interval. If set too large it will waste server side resources for this - * duration. If set too low it wastes client side cycles. - */ - val reapInterval: Duration, - /** The number of threads to use for observations (for executing [Observable.onNext]) */ - val observationExecutorPoolSize: Int, - /** The retry interval of artemis connections in milliseconds */ - val connectionRetryInterval: Duration, - /** The retry interval multiplier for exponential backoff */ - val connectionRetryIntervalMultiplier: Double, - /** Maximum retry interval */ - val connectionMaxRetryInterval: Duration, - /** Maximum reconnect attempts on failover */ - val maxReconnectAttempts: Int, - /** Maximum file size */ - val maxFileSize: Int, - /** The cache expiry of a deduplication watermark per client. */ - val deduplicationCacheExpiry: Duration -) { +data class CordaRPCClientConfigurationImpl( + override val minimumServerProtocolVersion: Int, + override val trackRpcCallSites: Boolean, + override val reapInterval: Duration, + override val observationExecutorPoolSize: Int, + override val connectionRetryInterval: Duration, + override val connectionRetryIntervalMultiplier: Double, + override val connectionMaxRetryInterval: Duration, + override val maxReconnectAttempts: Int, + override val maxFileSize: Int, + override val deduplicationCacheExpiry: Duration +) : CordaRPCClientConfiguration { companion object { - val unlimitedReconnectAttempts = -1 + private const val unlimitedReconnectAttempts = -1 @JvmStatic - val default = RPCClientConfiguration( + val default = CordaRPCClientConfigurationImpl( minimumServerProtocolVersion = 0, trackRpcCallSites = false, reapInterval = 1.seconds, @@ -88,13 +69,13 @@ data class RPCClientConfiguration( class RPCClient( val transport: TransportConfiguration, - val rpcConfiguration: RPCClientConfiguration = RPCClientConfiguration.default, + val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT ) { constructor( hostAndPort: NetworkHostAndPort, sslConfiguration: SSLConfiguration? = null, - configuration: RPCClientConfiguration = RPCClientConfiguration.default, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT ) : this(tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfiguration), configuration, serializationContext) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index a08a94162f..f97287ae07 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -21,6 +21,7 @@ import com.github.benmanes.caffeine.cache.RemovalCause import com.github.benmanes.caffeine.cache.RemovalListener import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.ThreadFactoryBuilder +import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.RPCException import net.corda.client.rpc.RPCSinceVersion import net.corda.core.context.Actor @@ -80,7 +81,7 @@ import kotlin.reflect.jvm.javaMethod * The cleanup happens in batches using a dedicated reaper, scheduled on [reaperExecutor]. */ class RPCClientProxyHandler( - private val rpcConfiguration: RPCClientConfiguration, + private val rpcConfiguration: CordaRPCClientConfiguration, private val rpcUsername: String, private val rpcPassword: String, private val serverLocator: ServerLocator, @@ -183,6 +184,7 @@ class RPCClientProxyHandler( private val deduplicationSequenceNumber = AtomicLong(0) private val lock = ReentrantReadWriteLock() + @Volatile private var sendingEnabled = true /** @@ -423,8 +425,8 @@ class RPCClientProxyHandler( sendingEnabled = false } - log.warn("RPC server unavailable.") - log.warn("Terminating observables and in flight RPCs.") + log.warn("RPC server unavailable. RPC calls are being buffered.") + log.warn("Terminating observables.") val m = observableContext.observableMap.asMap() m.keys.forEach { k -> observationExecutorPool.run(k) { diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt index 463a6790ed..d4c4025e72 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt @@ -10,7 +10,7 @@ package net.corda.client.rpc -import net.corda.client.rpc.internal.RPCClientConfiguration +import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.map import net.corda.core.messaging.RPCOps @@ -54,7 +54,7 @@ open class AbstractRPCTest { inline fun RPCDriverDSL.testProxy( ops: I, rpcUser: User = rpcTestUser, - clientConfiguration: RPCClientConfiguration = RPCClientConfiguration.default, + clientConfiguration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default, serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.default ): TestProxy { return when (mode) { diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt index eb5247b055..2245f5bea4 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt @@ -10,7 +10,7 @@ package net.corda.client.rpc -import net.corda.client.rpc.internal.RPCClientConfiguration +import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.crypto.random63BitValue import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.transpose @@ -100,7 +100,7 @@ class RPCConcurrencyTests : AbstractRPCTest() { private fun RPCDriverDSL.testProxy(): TestProxy { return testProxy( TestOpsImpl(pool), - clientConfiguration = RPCClientConfiguration.default.copy( + clientConfiguration = CordaRPCClientConfigurationImpl.default.copy( reapInterval = 100.millis ), serverConfiguration = RPCServerConfiguration.default.copy( diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt index 3748b6940c..0ceb8740e5 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt @@ -11,10 +11,9 @@ package net.corda.client.rpc import com.google.common.base.Stopwatch -import net.corda.client.rpc.internal.RPCClientConfiguration +import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.internal.concurrent.doneFuture import net.corda.core.messaging.RPCOps -import net.corda.core.utilities.days import net.corda.core.utilities.minutes import net.corda.core.utilities.seconds import net.corda.node.services.messaging.RPCServerConfiguration @@ -54,7 +53,7 @@ class RPCPerformanceTests : AbstractRPCTest() { } private fun RPCDriverDSL.testProxy( - clientConfiguration: RPCClientConfiguration, + clientConfiguration: CordaRPCClientConfigurationImpl, serverConfiguration: RPCServerConfiguration ): TestProxy { return testProxy( @@ -67,7 +66,7 @@ class RPCPerformanceTests : AbstractRPCTest() { private fun warmup() { rpcDriver { val proxy = testProxy( - RPCClientConfiguration.default, + CordaRPCClientConfigurationImpl.default, RPCServerConfiguration.default ) val executor = Executors.newFixedThreadPool(4) @@ -97,7 +96,7 @@ class RPCPerformanceTests : AbstractRPCTest() { measure(inputOutputSizes, (1..5)) { inputOutputSize, _ -> rpcDriver { val proxy = testProxy( - RPCClientConfiguration.default.copy( + CordaRPCClientConfigurationImpl.default.copy( observationExecutorPoolSize = 2 ), RPCServerConfiguration.default.copy( @@ -136,7 +135,7 @@ class RPCPerformanceTests : AbstractRPCTest() { rpcDriver { val metricRegistry = startReporter(shutdownManager) val proxy = testProxy( - RPCClientConfiguration.default.copy( + CordaRPCClientConfigurationImpl.default.copy( reapInterval = 1.seconds ), RPCServerConfiguration.default.copy( @@ -169,7 +168,7 @@ class RPCPerformanceTests : AbstractRPCTest() { // TODO this hangs with more parallelism rpcDriver { val proxy = testProxy( - RPCClientConfiguration.default, + CordaRPCClientConfigurationImpl.default, RPCServerConfiguration.default ) val numberOfMessages = 1000 diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index f84f9aad7f..dcceb9eb9a 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -31,6 +31,7 @@ import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.Try import rx.Observable +import rx.subjects.PublishSubject import java.io.IOException import java.io.InputStream import java.security.PublicKey @@ -382,6 +383,44 @@ interface CordaRPCOps : RPCOps { * @see setFlowsDrainingModeEnabled */ fun isFlowsDrainingModeEnabled(): Boolean + + /** + * Shuts the node down. Returns immediately. + * This does not wait for flows to be completed. + */ + fun shutdown() +} + +/** + * Returns a [DataFeed] that keeps track on the count of pending flows. + */ +fun CordaRPCOps.pendingFlowsCount(): DataFeed> { + + val stateMachineState = stateMachinesFeed() + var pendingFlowsCount = stateMachineState.snapshot.size + var completedFlowsCount = 0 + val updates = PublishSubject.create>() + stateMachineState + .updates + .doOnNext { update -> + when (update) { + is StateMachineUpdate.Added -> { + pendingFlowsCount++ + updates.onNext(completedFlowsCount to pendingFlowsCount) + } + is StateMachineUpdate.Removed -> { + completedFlowsCount++ + updates.onNext(completedFlowsCount to pendingFlowsCount) + if (completedFlowsCount == pendingFlowsCount) { + updates.onCompleted() + } + } + } + }.subscribe() + if (completedFlowsCount == 0) { + updates.onCompleted() + } + return DataFeed(pendingFlowsCount, updates) } inline fun CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(), diff --git a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt index cc9087a14e..789876f4ef 100644 --- a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt +++ b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt @@ -126,7 +126,7 @@ class ContractUpgradeFlowTest { return startRpcClient( rpcAddress = startRpcServer( rpcUser = user, - ops = SecureCordaRPCOps(node.services, node.smm, node.database, node.services) + ops = SecureCordaRPCOps(node.services, node.smm, node.database, node.services, { }) ).get().broker.hostAndPort!!, username = user.username, password = user.password diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 68f35530a3..17bdbd2844 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -7,6 +7,8 @@ from previous releases. Please refer to :doc:`upgrade-notes` for detailed instru UNRELEASED ---------- +* Node can be shut down abruptly by ``shutdown`` function in `CordaRPCOps` or gracefully (draining flows first) through ``gracefulShutdown`` command from shell. + * Parsing of ``NodeConfiguration`` will now fail if unknown configuration keys are found. * The web server now has its own ``web-server.conf`` file, separate from ``node.conf``. diff --git a/docs/source/shell.rst b/docs/source/shell.rst index b89fb28bf9..e6214e4b60 100644 --- a/docs/source/shell.rst +++ b/docs/source/shell.rst @@ -18,6 +18,7 @@ the `CRaSH`_ shell and supports many of the same features. These features includ * Issuing SQL queries to the underlying database * Viewing JMX metrics and monitoring exports * UNIX style pipes for both text and objects, an ``egrep`` command and a command for working with columnular data +* Shutting the node down. Permissions ----------- @@ -199,6 +200,14 @@ Some RPCs return a stream of events that will be shown on screen until you press You can find a list of the available RPC methods `here `_. +Shutting down the node +********************** + +You can shut the node down via shell: + +* ``gracefulShutdown`` will put node into draining mode, and shut down when there are no flows running +* ``shutdown`` will shut the node down immediately + Flow commands ************* diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt b/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt index c938d18914..535abbb772 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt @@ -166,9 +166,9 @@ class Node( val user = config.users.first() val address = config.nodeInterface val targetHost = NetworkHostAndPort(address.host, address.rpcPort) - val config = CordaRPCClientConfiguration( - connectionMaxRetryInterval = 10.seconds - ) + val config = object : CordaRPCClientConfiguration { + override val connectionMaxRetryInterval = 10.seconds + } log.info("Establishing RPC connection to ${targetHost.host} on port ${targetHost.port} ...") CordaRPCClient(targetHost, config).use(user.username, user.password) { log.info("RPC connection to ${targetHost.host}:${targetHost.port} established") diff --git a/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentCertificateRevocationRequestStorageTest.kt b/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentCertificateRevocationRequestStorageTest.kt index ca4328dc56..bdeb2d604f 100644 --- a/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentCertificateRevocationRequestStorageTest.kt +++ b/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentCertificateRevocationRequestStorageTest.kt @@ -96,7 +96,7 @@ class PersistentCertificateRevocationRequestStorageTest : TestBase() { } @Test - fun `Certificate revocation request is not persisted if a valid certificate cannot be found`() { + fun `revocation request fails if a valid certificate cannot be found`() { // given // then diff --git a/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNetworkMapStorageTest.kt b/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNetworkMapStorageTest.kt index f2b6ec3ba3..1070c24db5 100644 --- a/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNetworkMapStorageTest.kt +++ b/network-management/src/test/kotlin/com/r3/corda/networkmanage/common/persistence/PersistentNetworkMapStorageTest.kt @@ -55,7 +55,7 @@ class PersistentNetworkMapStorageTest : TestBase() { } @Test - fun `saveNetworkParameters and then saveNewActiveNetworkMap creates the active network map`() { + fun `create active network map`() { // given // Create node info. val (signedNodeInfo) = createValidSignedNodeInfo("Test", requestStorage) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkParametersCopier.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkParametersCopier.kt index fbbba008c1..f9eb5cc5bf 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkParametersCopier.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkParametersCopier.kt @@ -10,7 +10,9 @@ package net.corda.nodeapi.internal.network -import net.corda.core.internal.* +import net.corda.core.internal.VisibleForTesting +import net.corda.core.internal.copyTo +import net.corda.core.internal.div import net.corda.core.node.NetworkParameters import net.corda.core.serialization.serialize import net.corda.nodeapi.internal.createDevNetworkMapCa @@ -21,16 +23,13 @@ import java.nio.file.StandardCopyOption class NetworkParametersCopier( networkParameters: NetworkParameters, - networkMapCa: CertificateAndKeyPair = createDevNetworkMapCa(), + signingCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa(), overwriteFile: Boolean = false, @VisibleForTesting val update: Boolean = false ) { private val copyOptions = if (overwriteFile) arrayOf(StandardCopyOption.REPLACE_EXISTING) else emptyArray() - private val serialisedSignedNetParams = networkParameters.signWithCert( - networkMapCa.keyPair.private, - networkMapCa.certificate - ).serialize() + private val serialisedSignedNetParams = signingCertAndKeyPair.sign(networkParameters).serialize() fun install(nodeDir: Path) { val fileName = if (update) NETWORK_PARAMS_UPDATE_FILE_NAME else NETWORK_PARAMS_FILE_NAME diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt index 72def0bee9..d084ba6182 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt @@ -64,8 +64,7 @@ class CordaPersistence( databaseConfig: DatabaseConfig, schemas: Set, val jdbcUrl: String, - attributeConverters: Collection> = emptySet(), - val cordappClassLoader: ClassLoader? = null + attributeConverters: Collection> = emptySet() ) : Closeable { companion object { private val log = contextLogger() @@ -75,7 +74,7 @@ class CordaPersistence( val hibernateConfig: HibernateConfiguration by lazy { transaction { - HibernateConfiguration(schemas, databaseConfig, attributeConverters, jdbcUrl, cordappClassLoader) + HibernateConfiguration(schemas, databaseConfig, attributeConverters, jdbcUrl) } } val entityManagerFactory get() = hibernateConfig.sessionFactoryForRegisteredSchemas diff --git a/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt b/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt index 069905e3d0..add347191c 100644 --- a/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt @@ -16,6 +16,7 @@ import net.corda.core.flows.FlowSession import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.StartableByRPC +import net.corda.client.rpc.internal.drainAndShutdown import net.corda.core.identity.Party import net.corda.core.internal.concurrent.map import net.corda.core.messaging.startFlow @@ -30,9 +31,14 @@ import net.corda.testing.driver.driver import net.corda.testing.internal.IntegrationTest import net.corda.testing.internal.IntegrationTestSchemas import net.corda.testing.internal.toDatabaseSchemaName +import net.corda.testing.internal.chooseIdentity import net.corda.testing.node.User import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat import org.junit.* +import org.junit.After +import org.junit.Before +import org.junit.Test +import java.util.concurrent.CountDownLatch import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit @@ -95,27 +101,54 @@ class P2PFlowsDrainingModeTest : IntegrationTest() { } } - @StartableByRPC - @InitiatingFlow - class InitiateSessionFlow(private val counterParty: Party) : FlowLogic() { + @Test + fun `clean shutdown by draining`() { - @Suspendable - override fun call(): String { + driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation)) { - val session = initiateFlow(counterParty) - session.send("Hi there") - return session.receive().unwrap { it } + val nodeA = startNode(rpcUsers = users).getOrThrow() + val nodeB = startNode(rpcUsers = users).getOrThrow() + var successful = false + val latch = CountDownLatch(1) + nodeB.rpc.setFlowsDrainingModeEnabled(true) + IntRange(1, 10).forEach { nodeA.rpc.startFlow(::InitiateSessionFlow, nodeB.nodeInfo.chooseIdentity()) } + + nodeA.rpc.drainAndShutdown() + .doOnError { error -> + error.printStackTrace() + successful = false + } + .doOnCompleted { successful = true } + .doAfterTerminate { latch.countDown() } + .subscribe() + nodeB.rpc.setFlowsDrainingModeEnabled(false) + latch.await() + + assertThat(successful).isTrue() } } +} - @InitiatedBy(InitiateSessionFlow::class) - class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic() { +@StartableByRPC +@InitiatingFlow +class InitiateSessionFlow(private val counterParty: Party) : FlowLogic() { - @Suspendable - override fun call() { + @Suspendable + override fun call(): String { - val message = initiatingSession.receive().unwrap { it } - initiatingSession.send("$message answer") - } + val session = initiateFlow(counterParty) + session.send("Hi there") + return session.receive().unwrap { it } + } +} + +@InitiatedBy(InitiateSessionFlow::class) +class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic() { + + @Suspendable + override fun call() { + + val message = initiatingSession.receive().unwrap { it } + initiatingSession.send("$message answer") } } \ No newline at end of file diff --git a/node/src/main/java/CordaCaplet.java b/node/src/main/java/CordaCaplet.java index 86dffc2f6a..69deb89f1b 100644 --- a/node/src/main/java/CordaCaplet.java +++ b/node/src/main/java/CordaCaplet.java @@ -93,6 +93,7 @@ public class CordaCaplet extends Capsule { (new File(baseDir, "cordapps")).mkdir(); // Add additional directories of JARs to the classpath (at the end). e.g. for JDBC drivers augmentClasspath((List) cp, new File(baseDir, "drivers")); + augmentClasspath((List) cp, new File(baseDir, "cordapps")); try { List jarDirs = nodeConfig.getStringList("jarDirs"); log(LOG_VERBOSE, "Configured JAR directories = " + jarDirs); diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 7a2415841a..084b037097 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -19,7 +19,16 @@ import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext import net.corda.core.crypto.newSecureRandom import net.corda.core.crypto.sign -import net.corda.core.flows.* +import net.corda.core.flows.ContractUpgradeFlow +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowLogicRefFactory +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.NotaryChangeFlow +import net.corda.core.flows.NotaryFlow +import net.corda.core.flows.StartableByService import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate @@ -28,9 +37,24 @@ import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.uncheckedCast -import net.corda.core.messaging.* -import net.corda.core.node.* -import net.corda.core.node.services.* +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.FlowHandle +import net.corda.core.messaging.FlowHandleImpl +import net.corda.core.messaging.FlowProgressHandle +import net.corda.core.messaging.FlowProgressHandleImpl +import net.corda.core.messaging.RPCOps +import net.corda.core.node.AppServiceHub +import net.corda.core.node.NetworkParameters +import net.corda.core.node.NodeInfo +import net.corda.core.node.ServiceHub +import net.corda.core.node.ServicesForResolution +import net.corda.core.node.StatesToRecord +import net.corda.core.node.services.AttachmentStorage +import net.corda.core.node.services.CordaService +import net.corda.core.node.services.IdentityService +import net.corda.core.node.services.KeyManagementService +import net.corda.core.node.services.NotaryService +import net.corda.core.node.services.TransactionVerifierService import net.corda.core.serialization.SerializationWhitelist import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken @@ -50,21 +74,61 @@ import net.corda.node.internal.security.RPCSecurityManager import net.corda.node.services.ContractUpgradeHandler import net.corda.node.services.FinalityHandler import net.corda.node.services.NotaryChangeHandler -import net.corda.node.services.api.* -import net.corda.node.services.config.* +import net.corda.node.services.api.CheckpointStorage +import net.corda.node.services.api.DummyAuditService +import net.corda.node.services.api.FlowStarter +import net.corda.node.services.api.IdentityServiceInternal +import net.corda.node.services.api.MonitoringService +import net.corda.node.services.api.NetworkMapCacheBaseInternal +import net.corda.node.services.api.NetworkMapCacheInternal +import net.corda.node.services.api.NodePropertiesStore +import net.corda.node.services.api.SchedulerService +import net.corda.node.services.api.SchemaService +import net.corda.node.services.api.ServiceHubInternal +import net.corda.node.services.api.StartedNodeServices +import net.corda.node.services.api.VaultServiceInternal +import net.corda.node.services.api.WritableTransactionStorage +import net.corda.node.services.config.BFTSMaRtConfiguration +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.NotaryConfig +import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.config.shell.toShellConfig +import net.corda.node.services.config.shouldInitCrashShell import net.corda.node.services.events.NodeSchedulerService import net.corda.node.services.events.ScheduledActivityObserver import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.MessagingService -import net.corda.node.services.network.* -import net.corda.node.services.persistence.* +import net.corda.node.services.network.NetworkMapCacheImpl +import net.corda.node.services.network.NetworkMapClient +import net.corda.node.services.network.NetworkMapUpdater +import net.corda.node.services.network.NodeInfoWatcher +import net.corda.node.services.network.PersistentNetworkMapCache +import net.corda.node.services.persistence.AbstractPartyDescriptor +import net.corda.node.services.persistence.AbstractPartyToX500NameAsStringConverter +import net.corda.node.services.persistence.DBCheckpointStorage +import net.corda.node.services.persistence.DBTransactionMappingStorage +import net.corda.node.services.persistence.DBTransactionStorage +import net.corda.node.services.persistence.NodeAttachmentService +import net.corda.node.services.persistence.NodePropertiesPersistentStore +import net.corda.node.services.persistence.RunOnceService import net.corda.node.services.schema.HibernateObserver import net.corda.node.services.schema.NodeSchemaService -import net.corda.node.services.statemachine.* -import net.corda.node.services.transactions.* +import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl +import net.corda.node.services.statemachine.SingleThreadedStateMachineManager +import net.corda.node.services.statemachine.StateMachineManager +import net.corda.node.services.statemachine.appName +import net.corda.node.services.statemachine.flowVersionAndInitiatingClass +import net.corda.node.services.transactions.BFTNonValidatingNotaryService +import net.corda.node.services.transactions.BFTSMaRt +import net.corda.node.services.transactions.MySQLNonValidatingNotaryService +import net.corda.node.services.transactions.MySQLValidatingNotaryService +import net.corda.node.services.transactions.RaftNonValidatingNotaryService +import net.corda.node.services.transactions.RaftUniquenessProvider +import net.corda.node.services.transactions.RaftValidatingNotaryService +import net.corda.node.services.transactions.SimpleNotaryService +import net.corda.node.services.transactions.ValidatingNotaryService import net.corda.node.services.upgrade.ContractUpgradeServiceImpl import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.VaultSoftLockManager @@ -74,10 +138,11 @@ import net.corda.node.utilities.NodeBuildProperties import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.NodeInfoAndSigned import net.corda.nodeapi.internal.crypto.X509Utilities -import net.corda.nodeapi.internal.persistence.* import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.HibernateConfiguration +import net.corda.nodeapi.internal.persistence.SchemaMigration +import net.corda.nodeapi.internal.persistence.isH2Database import net.corda.nodeapi.internal.storeLegalIdentity import net.corda.tools.shell.InteractiveShell import org.apache.activemq.artemis.utils.ReusableLatch @@ -99,6 +164,7 @@ import java.time.Clock import java.time.Duration import java.util.* import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executors import kotlin.collections.set import kotlin.reflect.KClass import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair @@ -159,6 +225,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected lateinit var networkMapUpdater: NetworkMapUpdater lateinit var securityManager: RPCSecurityManager + private val shutdownExecutor = Executors.newSingleThreadExecutor() + /** Completes once the node has successfully registered with the network map service * or has loaded network map data from local database */ val nodeReadyFuture: CordaFuture get() = _nodeReadyFuture @@ -173,7 +241,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, /** The implementation of the [CordaRPCOps] interface used by this node. */ open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence, smm: StateMachineManager): CordaRPCOps { - return SecureCordaRPCOps(services, smm, database, flowStarter) + + return SecureCordaRPCOps(services, smm, database, flowStarter, { shutdownExecutor.submit { stop() } }) } private fun initCertificate() { @@ -657,7 +726,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val props = configuration.dataSourceProperties if (props.isEmpty()) throw DatabaseConfigurationException("There must be a database configured.") - val database = configureDatabase(props, configuration.database, identityService, schemaService, cordappLoader.appClassLoader) + val database = configureDatabase(props, configuration.database, identityService, schemaService) // Now log the vendor string as this will also cause a connection to be tested eagerly. logVendorString(database, log) runOnStop += database::close @@ -735,6 +804,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, toRun() } runOnStop.clear() + shutdownExecutor.shutdown() _started = null } @@ -897,8 +967,7 @@ internal class NetworkMapCacheEmptyException : Exception() fun configureDatabase(hikariProperties: Properties, databaseConfig: DatabaseConfig, identityService: IdentityService, - schemaService: SchemaService = NodeSchemaService(), - cordappClassLoader: ClassLoader? = null): CordaPersistence { + schemaService: SchemaService = NodeSchemaService()): CordaPersistence { // Register the AbstractPartyDescriptor so Hibernate doesn't warn when encountering AbstractParty. Unfortunately // Hibernate warns about not being able to find a descriptor if we don't provide one, but won't use it by default // so we end up providing both descriptor and converter. We should re-examine this in later versions to see if @@ -911,7 +980,6 @@ fun configureDatabase(hikariProperties: Properties, schemaService.schemaOptions.keys, dataSource, !isH2Database(jdbcUrl), - databaseConfig, - cordappClassLoader ?: Thread.currentThread().contextClassLoader).nodeStartup() - return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, attributeConverters, cordappClassLoader) + databaseConfig).nodeStartup() + return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, attributeConverters) } diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 2af0c74284..45f31c1b64 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -56,7 +56,8 @@ internal class CordaRPCOpsImpl( private val services: ServiceHubInternal, private val smm: StateMachineManager, private val database: CordaPersistence, - private val flowStarter: FlowStarter + private val flowStarter: FlowStarter, + private val shutdownNode: () -> Unit ) : CordaRPCOps { override fun networkMapSnapshot(): List { val (snapshot, updates) = networkMapFeed() @@ -311,6 +312,10 @@ internal class CordaRPCOpsImpl( return services.nodeProperties.flowsDrainingMode.isEnabled() } + override fun shutdown() { + shutdownNode.invoke() + } + private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo { return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context) } diff --git a/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt b/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt index 7f28b7c617..9108c13a52 100644 --- a/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt +++ b/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt @@ -182,6 +182,8 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val override fun isFlowsDrainingModeEnabled(): Boolean = guard("isFlowsDrainingModeEnabled", implementation::isFlowsDrainingModeEnabled) + override fun shutdown() = guard("shutdown", implementation::shutdown) + // TODO change to KFunction reference after Kotlin fixes https://youtrack.jetbrains.com/issue/KT-12140 private inline fun guard(methodName: String, action: () -> RESULT) = guard(methodName, emptyList(), action) diff --git a/node/src/main/kotlin/net/corda/node/internal/SecureCordaRPCOps.kt b/node/src/main/kotlin/net/corda/node/internal/SecureCordaRPCOps.kt index 57baa5f428..71fc96c729 100644 --- a/node/src/main/kotlin/net/corda/node/internal/SecureCordaRPCOps.kt +++ b/node/src/main/kotlin/net/corda/node/internal/SecureCordaRPCOps.kt @@ -24,7 +24,8 @@ class SecureCordaRPCOps(services: ServiceHubInternal, smm: StateMachineManager, database: CordaPersistence, flowStarter: FlowStarter, - val unsafe: CordaRPCOps = CordaRPCOpsImpl(services, smm, database, flowStarter)) : CordaRPCOps by RpcAuthorisationProxy(unsafe, ::rpcContext) { + shutdownNode: () -> Unit, + val unsafe: CordaRPCOps = CordaRPCOpsImpl(services, smm, database, flowStarter, shutdownNode)) : CordaRPCOps by RpcAuthorisationProxy(unsafe, ::rpcContext) { /** * Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed diff --git a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt index 8908b28acf..54de3b66ef 100644 --- a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt @@ -95,7 +95,7 @@ class CordaRPCOpsImplTest { fun setup() { mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.finance.contracts.asset", "net.corda.finance.schemas")) aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) - rpc = SecureCordaRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database, aliceNode.services) + rpc = SecureCordaRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database, aliceNode.services, { }) CURRENT_RPC_CONTEXT.set(RpcAuthContext(InvocationContext.rpc(testActor()), buildSubject("TEST_USER", emptySet()))) mockNet.runNetwork() diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt index 89dc105c68..1f8c3ba988 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt @@ -59,7 +59,7 @@ class NetworkMapUpdaterTest { private val networkMapCache = createMockNetworkMapCache() private val nodeInfoMap = ConcurrentHashMap() private val networkParamsMap = HashMap() - private val networkMapCa: CertificateAndKeyPair = createDevNetworkMapCa() + private val networkMapCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa() private val cacheExpiryMs = 100 private val networkMapClient = createMockNetworkMapClient() private val scheduler = TestScheduler() @@ -264,7 +264,7 @@ class NetworkMapUpdaterTest { } on { getNetworkParameters(any()) }.then { val paramsHash: SecureHash = uncheckedCast(it.arguments[0]) - networkParamsMap[paramsHash]?.signWithCert(networkMapCa.keyPair.private, networkMapCa.certificate) + networkParamsMap[paramsHash]?.let { networkMapCertAndKeyPair.sign(it) } } on { ackNetworkParametersUpdate(any()) }.then { Unit diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index 143faae953..db43d527df 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -142,7 +142,7 @@ open class MockServices private constructor( val cordappLoader = CordappLoader.createWithTestPackages(cordappPackages) val dataSourceProps = makeTestDataSourceProperties(initialIdentity.name.organisation) val schemaService = NodeSchemaService(cordappLoader.cordappSchemas) - val database = configureDatabase(dataSourceProps, makeTestDatabaseProperties(initialIdentity.name.organisation), identityService, schemaService, cordappLoader.appClassLoader) + val database = configureDatabase(dataSourceProps, makeTestDatabaseProperties(initialIdentity.name.organisation), identityService, schemaService) val mockService = database.transaction { object : MockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys) { override val vaultService: VaultService = makeVaultService(database.hibernateConfig, schemaService) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt index 1df72cd250..5055744679 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt @@ -10,6 +10,8 @@ package net.corda.testing.node.internal +import net.corda.client.rpc.CordaRPCClient +import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.core.CordaException import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext @@ -24,8 +26,10 @@ import net.corda.core.utilities.seconds import net.corda.node.services.api.StartedNodeServices import net.corda.node.services.messaging.Message import net.corda.node.services.messaging.MessagingService +import net.corda.testing.driver.NodeHandle import net.corda.testing.internal.chooseIdentity import net.corda.testing.node.InMemoryMessagingNetwork +import net.corda.testing.node.User import net.corda.testing.node.testContext import org.slf4j.LoggerFactory import java.net.Socket @@ -123,4 +127,4 @@ internal interface InternalMockMessagingService : MessagingService { fun stop() } - +fun CordaRPCClient.start(user: User) = start(user.username, user.password) \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index 402474b829..d3bf3affe4 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -13,7 +13,7 @@ package net.corda.testing.node.internal import net.corda.client.mock.Generator import net.corda.client.rpc.internal.KryoClientSerializationScheme import net.corda.client.rpc.internal.RPCClient -import net.corda.client.rpc.internal.RPCClientConfiguration +import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.concurrent.CordaFuture import net.corda.core.context.AuthServiceId import net.corda.core.context.Trace @@ -69,7 +69,7 @@ import net.corda.nodeapi.internal.config.User as InternalUser inline fun RPCDriverDSL.startInVmRpcClient( username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: RPCClientConfiguration = RPCClientConfiguration.default + configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default ) = startInVmRpcClient(I::class.java, username, password, configuration) inline fun RPCDriverDSL.startRandomRpcClient( @@ -82,7 +82,7 @@ inline fun RPCDriverDSL.startRpcClient( rpcAddress: NetworkHostAndPort, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: RPCClientConfiguration = RPCClientConfiguration.default + configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default ) = startRpcClient(I::class.java, rpcAddress, username, password, configuration) data class RpcBrokerHandle( @@ -263,7 +263,7 @@ data class RPCDriverDSL( rpcOpsClass: Class, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: RPCClientConfiguration = RPCClientConfiguration.default + configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default ): CordaFuture { return driverDSL.executorService.fork { val client = RPCClient(inVmClientTransportConfiguration, configuration) @@ -334,7 +334,7 @@ data class RPCDriverDSL( rpcAddress: NetworkHostAndPort, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: RPCClientConfiguration = RPCClientConfiguration.default + configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default ): CordaFuture { return driverDSL.executorService.fork { val client = RPCClient(ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), rpcAddress, null), configuration) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt index 5c1f518689..bf9a206605 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt @@ -45,7 +45,7 @@ import javax.ws.rs.core.Response.status class NetworkMapServer(private val cacheTimeout: Duration, hostAndPort: NetworkHostAndPort, - private val networkMapCa: CertificateAndKeyPair = createDevNetworkMapCa(), + private val networkMapCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa(), private val myHostNameValue: String = "test.host.name", vararg additionalServices: Any) : Closeable { companion object { @@ -118,9 +118,7 @@ class NetworkMapServer(private val cacheTimeout: Duration, inner class InMemoryNetworkMapService { private val nodeInfoMap = mutableMapOf() val latestAcceptedParametersMap = mutableMapOf() - private val signedNetParams by lazy { - networkParameters.signWithCert(networkMapCa.keyPair.private, networkMapCa.certificate) - } + private val signedNetParams by lazy { networkMapCertAndKeyPair.sign(networkParameters) } @POST @Path("publish") @@ -153,7 +151,7 @@ class NetworkMapServer(private val cacheTimeout: Duration, @Produces(MediaType.APPLICATION_OCTET_STREAM) fun getNetworkMap(): Response { val networkMap = NetworkMap(nodeInfoMap.keys.toList(), signedNetParams.raw.hash, parametersUpdate) - val signedNetworkMap = networkMap.signWithCert(networkMapCa.keyPair.private, networkMapCa.certificate) + val signedNetworkMap = networkMapCertAndKeyPair.sign(networkMap) return Response.ok(signedNetworkMap.serialize().bytes).header("Cache-Control", "max-age=${cacheTimeout.seconds}").build() } @@ -182,8 +180,10 @@ class NetworkMapServer(private val cacheTimeout: Duration, val requestedParameters = if (requestedHash == signedNetParams.raw.hash) { signedNetParams } else if (requestedHash == nextNetworkParameters?.serialize()?.hash) { - nextNetworkParameters?.signWithCert(networkMapCa.keyPair.private, networkMapCa.certificate) - } else null + nextNetworkParameters?.let { networkMapCertAndKeyPair.sign(it) } + } else { + null + } requireNotNull(requestedParameters) return Response.ok(requestedParameters!!.serialize().bytes).build() } diff --git a/tools/shell/src/main/java/net/corda/tools/shell/RunShellCommand.java b/tools/shell/src/main/java/net/corda/tools/shell/RunShellCommand.java index 4934a9a4dd..d967a3bf1e 100644 --- a/tools/shell/src/main/java/net/corda/tools/shell/RunShellCommand.java +++ b/tools/shell/src/main/java/net/corda/tools/shell/RunShellCommand.java @@ -10,13 +10,21 @@ package net.corda.tools.shell; -import net.corda.core.messaging.*; -import net.corda.client.jackson.*; -import org.crsh.cli.*; -import org.crsh.command.*; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import net.corda.client.jackson.StringToMethodCallParser; +import net.corda.core.messaging.CordaRPCOps; +import org.crsh.cli.Argument; +import org.crsh.cli.Command; +import org.crsh.cli.Man; +import org.crsh.cli.Usage; +import org.crsh.command.InvocationContext; +import org.jetbrains.annotations.NotNull; import java.util.*; +import static java.util.Comparator.comparing; + // Note that this class cannot be converted to Kotlin because CRaSH does not understand InvocationContext> which // is the closest you can get in Kotlin to raw types. @@ -39,8 +47,7 @@ public class RunShellCommand extends InteractiveShellCommand { emitHelp(context, parser); return null; } - - return InteractiveShell.runRPCFromString(command, out, context, ops(), objectMapper()); + return InteractiveShell.runRPCFromString(command, out, context, ops(), objectMapper(), isSsh()); } private void emitHelp(InvocationContext context, StringToMethodCallParser parser) { @@ -48,21 +55,37 @@ public class RunShellCommand extends InteractiveShellCommand { // Each element we emit is a map of column -> content. Set> entries = parser.getAvailableCommands().entrySet(); ArrayList> entryList = new ArrayList<>(entries); - entryList.sort(Comparator.comparing(Map.Entry::getKey)); + entryList.sort(comparing(Map.Entry::getKey)); for (Map.Entry entry : entryList) { // Skip these entries as they aren't really interesting for the user. if (entry.getKey().equals("startFlowDynamic")) continue; if (entry.getKey().equals("getProtocolVersion")) continue; - // Use a LinkedHashMap to ensure that the Command column comes first. - Map m = new LinkedHashMap<>(); - m.put("Command", entry.getKey()); - m.put("Parameter types", entry.getValue()); try { - context.provide(m); + context.provide(commandAndDesc(entry.getKey(), entry.getValue())); } catch (Exception e) { throw new RuntimeException(e); } } + + Lists.newArrayList( + commandAndDesc("shutdown", "Shuts node down (immediately)"), + commandAndDesc("gracefulShutdown", "Shuts node down gracefully, waiting for all flows to complete first.") + ).forEach(stringStringMap -> { + try { + context.provide(stringStringMap); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @NotNull + private Map commandAndDesc(String command, String description) { + // Use a LinkedHashMap to ensure that the Command column comes first. + Map abruptShutdown = Maps.newLinkedHashMap(); + abruptShutdown.put("Command", command); + abruptShutdown.put("Parameter types", description); + return abruptShutdown; } } diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaAuthenticationPlugin.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaAuthenticationPlugin.kt index 77a282ff1c..b8acfb5ba8 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaAuthenticationPlugin.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaAuthenticationPlugin.kt @@ -34,7 +34,7 @@ class CordaAuthenticationPlugin(private val rpcOps: (username: String, credentia } try { val ops = rpcOps(username, credential) - return CordaSSHAuthInfo(true, ops) + return CordaSSHAuthInfo(true, ops, isSsh = true) } catch (e: ActiveMQSecurityException) { logger.warn(e.message) } catch (e: Exception) { diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaSSHAuthInfo.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaSSHAuthInfo.kt index 5de255c96a..298d18b974 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaSSHAuthInfo.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/CordaSSHAuthInfo.kt @@ -16,7 +16,7 @@ import net.corda.tools.shell.InteractiveShell.createYamlInputMapper import net.corda.tools.shell.utlities.ANSIProgressRenderer import org.crsh.auth.AuthInfo -class CordaSSHAuthInfo(val successful: Boolean, val rpcOps: CordaRPCOps, val ansiProgressRenderer: ANSIProgressRenderer? = null) : AuthInfo { +class CordaSSHAuthInfo(val successful: Boolean, val rpcOps: CordaRPCOps, val ansiProgressRenderer: ANSIProgressRenderer? = null, val isSsh: Boolean = false) : AuthInfo { override fun isSuccessful(): Boolean = successful val yamlInputMapper: ObjectMapper by lazy { diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt index fd1ad5e47d..102bfcee36 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt @@ -11,13 +11,16 @@ package net.corda.tools.shell import com.fasterxml.jackson.core.JsonGenerator -import com.fasterxml.jackson.core.JsonParser -import com.fasterxml.jackson.databind.* +import com.fasterxml.jackson.databind.JsonSerializer +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.SerializationFeature +import com.fasterxml.jackson.databind.SerializerProvider import com.fasterxml.jackson.databind.module.SimpleModule import com.fasterxml.jackson.dataformat.yaml.YAMLFactory -import com.google.common.io.Closeables import net.corda.client.jackson.JacksonSupport import net.corda.client.jackson.StringToMethodCallParser +import net.corda.client.rpc.CordaRPCClientConfiguration +import net.corda.client.rpc.CordaRPCConnection import net.corda.client.rpc.PermissionException import net.corda.client.rpc.internal.createCordaRPCClientWithSslAndClassLoader import net.corda.core.CordaException @@ -28,13 +31,9 @@ import net.corda.core.identity.Party import net.corda.core.internal.* import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.openFuture -import net.corda.core.messaging.CordaRPCOps -import net.corda.core.messaging.DataFeed -import net.corda.core.messaging.FlowProgressHandle -import net.corda.core.messaging.StateMachineUpdate +import net.corda.core.messaging.* import net.corda.core.node.NodeInfo import net.corda.core.utilities.NetworkHostAndPort -import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.tools.shell.utlities.ANSIProgressRenderer import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer import org.crsh.command.InvocationContext @@ -60,12 +59,13 @@ import org.json.JSONObject import org.slf4j.LoggerFactory import rx.Observable import rx.Subscriber -import java.io.* +import java.io.FileDescriptor +import java.io.FileInputStream +import java.io.InputStream +import java.io.PrintWriter import java.lang.reflect.InvocationTargetException import java.lang.reflect.UndeclaredThrowableException -import java.nio.file.Files import java.nio.file.Path -import java.nio.file.Paths import java.util.* import java.util.concurrent.CountDownLatch import java.util.concurrent.ExecutionException @@ -83,69 +83,30 @@ import kotlin.concurrent.thread // TODO: Resurrect or reimplement the mail plugin. // TODO: Make it notice new shell commands added after the node started. -data class SSHDConfiguration(val port: Int) { - companion object { - internal const val INVALID_PORT_FORMAT = "Invalid port: %s" - private const val MISSING_PORT_FORMAT = "Missing port: %s" - - /** - * Parses a string of the form port into a [SSHDConfiguration]. - * @throws IllegalArgumentException if the port is missing or the string is garbage. - */ - @JvmStatic - fun parse(str: String): SSHDConfiguration { - require(!str.isNullOrBlank()) { SSHDConfiguration.MISSING_PORT_FORMAT.format(str) } - val port = try { - str.toInt() - } catch (ex: NumberFormatException) { - throw IllegalArgumentException("Port syntax is invalid, expected port") - } - return SSHDConfiguration(port) - } - } - - init { - require(port in (0..0xffff)) { INVALID_PORT_FORMAT.format(port) } - } -} - -data class ShellSslOptions(override val sslKeystore: Path, override val keyStorePassword: String, override val trustStoreFile:Path, override val trustStorePassword: String) : SSLConfiguration { - override val certificatesDirectory: Path get() = Paths.get("") -} - -data class ShellConfiguration( - val commandsDirectory: Path, - val cordappsDirectory: Path? = null, - var user: String = "", - var password: String = "", - val hostAndPort: NetworkHostAndPort, - val ssl: ShellSslOptions? = null, - val sshdPort: Int? = null, - val sshHostKeyDirectory: Path? = null, - val noLocalShell: Boolean = false) { - companion object { - const val SSH_PORT = 2222 - const val COMMANDS_DIR = "shell-commands" - const val CORDAPPS_DIR = "cordapps" - const val SSHD_HOSTKEY_DIR = "ssh" - } -} - object InteractiveShell { private val log = LoggerFactory.getLogger(javaClass) private lateinit var rpcOps: (username: String, credentials: String) -> CordaRPCOps - private lateinit var connection: CordaRPCOps + private lateinit var ops: CordaRPCOps + private lateinit var connection: CordaRPCConnection private var shell: Shell? = null private var classLoader: ClassLoader? = null + private lateinit var shellConfiguration: ShellConfiguration + private var onExit: () -> Unit = {} /** * Starts an interactive shell connected to the local terminal. This shell gives administrator access to the node * internals. */ fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) { + shellConfiguration = configuration rpcOps = { username: String, credentials: String -> val client = createCordaRPCClientWithSslAndClassLoader(hostAndPort = configuration.hostAndPort, - sslConfiguration = configuration.ssl, classLoader = classLoader) - client.start(username, credentials).proxy + configuration = object : CordaRPCClientConfiguration { + override val maxReconnectAttempts = 1 + }, + sslConfiguration = configuration.ssl, + classLoader = classLoader) + this.connection = client.start(username, credentials) + connection.proxy } InteractiveShell.classLoader = classLoader val runSshDaemon = configuration.sshdPort != null @@ -170,6 +131,7 @@ object InteractiveShell { } fun runLocalShell(onExit: () -> Unit = {}) { + this.onExit = onExit val terminal = TerminalFactory.create() val consoleReader = ConsoleReader("Corda", FileInputStream(FileDescriptor.`in`), System.out, terminal) val jlineProcessor = JLineProcessor(terminal.isAnsiSupported, shell, consoleReader, System.out) @@ -215,18 +177,18 @@ object InteractiveShell { return super.getPlugins().filterNot { it is JavaLanguage } + CordaAuthenticationPlugin(rpcOps) } } - val attributes = emptyMap() + val attributes = emptyMap() val context = PluginContext(discovery, attributes, commandsFS, confFS, classLoader) context.refresh() this.config = config start(context) - connection = makeRPCOps(rpcOps, localUserName, localUserPassword) - return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, connection, StdoutANSIProgressRenderer)) + ops = makeRPCOps(rpcOps, localUserName, localUserPassword) + return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, ops, StdoutANSIProgressRenderer)) } } fun nodeInfo() = try { - connection.nodeInfo() + ops.nodeInfo() } catch (e: UndeclaredThrowableException) { throw e.cause ?: e } @@ -421,14 +383,16 @@ object InteractiveShell { } @JvmStatic - fun runRPCFromString(input: List, out: RenderPrintWriter, context: InvocationContext, cordaRPCOps: CordaRPCOps, om: ObjectMapper): Any? { + fun runRPCFromString(input: List, out: RenderPrintWriter, context: InvocationContext, cordaRPCOps: CordaRPCOps, om: ObjectMapper, isSsh: Boolean = false): Any? { val cmd = input.joinToString(" ").trim { it <= ' ' } - if (cmd.toLowerCase().startsWith("startflow")) { + if (cmd.startsWith("startflow", ignoreCase = true)) { // The flow command provides better support and startFlow requires special handling anyway due to // the generic startFlow RPC interface which offers no type information with which to parse the // string form of the command. out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow) return null + } else if (cmd.substringAfter(" ").trim().equals("gracefulShutdown", ignoreCase = true)) { + return InteractiveShell.gracefulShutdown(out, cordaRPCOps, isSsh) } var result: Any? = null @@ -467,6 +431,68 @@ object InteractiveShell { return result } + + @JvmStatic + fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps, isSsh: Boolean = false) { + + fun display(statements: RenderPrintWriter.() -> Unit) { + statements.invoke(userSessionOut) + userSessionOut.flush() + } + + var isShuttingDown = false + try { + display { + println("Orchestrating a clean shutdown...") + println("...enabling draining mode") + } + cordaRPCOps.setFlowsDrainingModeEnabled(true) + display { + println("...waiting for in-flight flows to be completed") + } + cordaRPCOps.pendingFlowsCount().updates + .doOnError { error -> + log.error(error.message) + throw error + } + .doOnNext { remaining -> + display { + println("...remaining: ${remaining.first}/${remaining.second}") + } + } + .doOnCompleted { + if (isSsh) { + // print in the original Shell process + System.out.println("Shutting down the node via remote SSH session (it may take a while)") + } + display { + println("Shutting down the node (it may take a while)") + } + cordaRPCOps.shutdown() + isShuttingDown = true + connection.forceClose() + display { + println("...done, quitting standalone shell now.") + } + onExit.invoke() + }.toBlocking().single() + } catch (e: StringToMethodCallParser.UnparseableCallException) { + display { + println(e.message, Color.red) + println("Please try 'man run' to learn what syntax is acceptable") + } + } catch (e: Exception) { + if (!isShuttingDown) { + display { + println("RPC failed: ${e.rootCause}", Color.red) + } + } + } finally { + InputStreamSerializer.invokeContext = null + InputStreamDeserializer.closeAll() + } + } + private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter): CordaFuture { val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) } @@ -528,6 +554,7 @@ object InteractiveShell { return printNextElements(response.updates, printerFun, out) } if (response is Observable<*>) { + return printNextElements(response, printerFun, out) } @@ -542,94 +569,4 @@ object InteractiveShell { return subscriber.future } - //region Extra serializers - // - // These serializers are used to enable the user to specify objects that aren't natural data containers in the shell, - // and for the shell to print things out that otherwise wouldn't be usefully printable. - - private object ObservableSerializer : JsonSerializer>() { - override fun serialize(value: Observable<*>, gen: JsonGenerator, serializers: SerializerProvider) { - gen.writeString("(observable)") - } - } - - // A file name is deserialized to an InputStream if found. - object InputStreamDeserializer : JsonDeserializer() { - // Keep track of them so we can close them later. - private val streams = Collections.synchronizedSet(HashSet()) - - override fun deserialize(p: JsonParser, ctxt: DeserializationContext): InputStream { - val stream = object : BufferedInputStream(Files.newInputStream(Paths.get(p.text))) { - override fun close() { - super.close() - streams.remove(this) - } - } - streams += stream - return stream - } - - fun closeAll() { - // Clone the set with toList() here so each closed stream can be removed from the set inside close(). - streams.toList().forEach { Closeables.closeQuietly(it) } - } - } - - // An InputStream found in a response triggers a request to the user to provide somewhere to save it. - private object InputStreamSerializer : JsonSerializer() { - var invokeContext: InvocationContext<*>? = null - - override fun serialize(value: InputStream, gen: JsonGenerator, serializers: SerializerProvider) { - try { - val toPath = invokeContext!!.readLine("Path to save stream to (enter to ignore): ", true) - if (toPath == null || toPath.isBlank()) { - gen.writeString("") - } else { - val path = Paths.get(toPath) - value.copyTo(path) - gen.writeString("") - } - } finally { - try { - value.close() - } catch (e: IOException) { - // Ignore. - } - } - } - } - - /** - * String value deserialized to [UniqueIdentifier]. - * Any string value used as [UniqueIdentifier.externalId]. - * If string contains underscore(i.e. externalId_uuid) then split with it. - * Index 0 as [UniqueIdentifier.externalId] - * Index 1 as [UniqueIdentifier.id] - * */ - object UniqueIdentifierDeserializer : JsonDeserializer() { - override fun deserialize(p: JsonParser, ctxt: DeserializationContext): UniqueIdentifier { - //Check if externalId and UUID may be separated by underscore. - if (p.text.contains("_")) { - val ids = p.text.split("_") - //Create UUID object from string. - val uuid: UUID = UUID.fromString(ids[1]) - //Create UniqueIdentifier object using externalId and UUID. - return UniqueIdentifier(ids[0], uuid) - } - //Any other string used as externalId. - return UniqueIdentifier.fromString(p.text) - } - } - - /** - * String value deserialized to [UUID]. - * */ - object UUIDDeserializer : JsonDeserializer() { - override fun deserialize(p: JsonParser, ctxt: DeserializationContext): UUID { - //Create UUID object from string. - return UUID.fromString(p.text) - } - } - - //endregion } diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShellCommand.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShellCommand.kt index 77cd727c48..a3b9c20999 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShellCommand.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShellCommand.kt @@ -20,4 +20,5 @@ open class InteractiveShellCommand : BaseCommand() { fun ops() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).rpcOps fun ansiProgressRenderer() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).ansiProgressRenderer fun objectMapper() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).yamlInputMapper + fun isSsh() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).isSsh } diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/RPCOpsWithContext.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/RPCOpsWithContext.kt index 5f847b37d9..a3f3b593ac 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/RPCOpsWithContext.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/RPCOpsWithContext.kt @@ -26,6 +26,5 @@ fun makeRPCOps(getCordaRPCOps: (username: String, credential: String) -> CordaRP // Unpack exception. throw e.targetException } - } - ) as CordaRPCOps + }) as CordaRPCOps } diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/SSHDConfiguration.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/SSHDConfiguration.kt new file mode 100644 index 0000000000..90e08363fa --- /dev/null +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/SSHDConfiguration.kt @@ -0,0 +1,27 @@ +package net.corda.tools.shell + +data class SSHDConfiguration(val port: Int) { + companion object { + internal const val INVALID_PORT_FORMAT = "Invalid port: %s" + private const val MISSING_PORT_FORMAT = "Missing port: %s" + + /** + * Parses a string of the form port into a [SSHDConfiguration]. + * @throws IllegalArgumentException if the port is missing or the string is garbage. + */ + @JvmStatic + fun parse(str: String): SSHDConfiguration { + require(!str.isNullOrBlank()) { SSHDConfiguration.MISSING_PORT_FORMAT.format(str) } + val port = try { + str.toInt() + } catch (ex: NumberFormatException) { + throw IllegalArgumentException("Port syntax is invalid, expected port") + } + return SSHDConfiguration(port) + } + } + + init { + require(port in (0..0xffff)) { INVALID_PORT_FORMAT.format(port) } + } +} \ No newline at end of file diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/SerializationSupport.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/SerializationSupport.kt new file mode 100644 index 0000000000..90a1a149e3 --- /dev/null +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/SerializationSupport.kt @@ -0,0 +1,103 @@ +package net.corda.tools.shell + +import com.fasterxml.jackson.core.JsonGenerator +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.databind.DeserializationContext +import com.fasterxml.jackson.databind.JsonDeserializer +import com.fasterxml.jackson.databind.JsonSerializer +import com.fasterxml.jackson.databind.SerializerProvider +import com.google.common.io.Closeables +import net.corda.core.contracts.UniqueIdentifier +import net.corda.core.internal.copyTo +import org.crsh.command.InvocationContext +import rx.Observable +import java.io.BufferedInputStream +import java.io.InputStream +import java.nio.file.Files +import java.nio.file.Paths +import java.util.* + +//region Extra serializers +// +// These serializers are used to enable the user to specify objects that aren't natural data containers in the shell, +// and for the shell to print things out that otherwise wouldn't be usefully printable. + +object ObservableSerializer : JsonSerializer>() { + override fun serialize(value: Observable<*>, gen: JsonGenerator, serializers: SerializerProvider) { + gen.writeString("(observable)") + } +} + +/** + * String value deserialized to [UniqueIdentifier]. + * Any string value used as [UniqueIdentifier.externalId]. + * If string contains underscore(i.e. externalId_uuid) then split with it. + * Index 0 as [UniqueIdentifier.externalId] + * Index 1 as [UniqueIdentifier.id] + * */ +object UniqueIdentifierDeserializer : JsonDeserializer() { + override fun deserialize(p: JsonParser, ctxt: DeserializationContext): UniqueIdentifier { + //Check if externalId and UUID may be separated by underscore. + if (p.text.contains("_")) { + val ids = p.text.split("_") + //Create UUID object from string. + val uuid: UUID = UUID.fromString(ids[1]) + //Create UniqueIdentifier object using externalId and UUID. + return UniqueIdentifier(ids[0], uuid) + } + //Any other string used as externalId. + return UniqueIdentifier.fromString(p.text) + } +} + +/** + * String value deserialized to [UUID]. + * */ +object UUIDDeserializer : JsonDeserializer() { + override fun deserialize(p: JsonParser, ctxt: DeserializationContext): UUID { + //Create UUID object from string. + return UUID.fromString(p.text) + } +} + +// An InputStream found in a response triggers a request to the user to provide somewhere to save it. +object InputStreamSerializer : JsonSerializer() { + var invokeContext: InvocationContext<*>? = null + + override fun serialize(value: InputStream, gen: JsonGenerator, serializers: SerializerProvider) { + + value.use { + val toPath = invokeContext!!.readLine("Path to save stream to (enter to ignore): ", true) + if (toPath == null || toPath.isBlank()) { + gen.writeString("") + } else { + val path = Paths.get(toPath) + it.copyTo(path) + gen.writeString("") + } + } + } +} + +// A file name is deserialized to an InputStream if found. +object InputStreamDeserializer : JsonDeserializer() { + // Keep track of them so we can close them later. + private val streams = Collections.synchronizedSet(HashSet()) + + override fun deserialize(p: JsonParser, ctxt: DeserializationContext): InputStream { + val stream = object : BufferedInputStream(Files.newInputStream(Paths.get(p.text))) { + override fun close() { + super.close() + streams.remove(this) + } + } + streams += stream + return stream + } + + fun closeAll() { + // Clone the set with toList() here so each closed stream can be removed from the set inside close(). + streams.toList().forEach { Closeables.closeQuietly(it) } + } +} +//endregion \ No newline at end of file diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/ShellConfiguration.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/ShellConfiguration.kt new file mode 100644 index 0000000000..2714679720 --- /dev/null +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/ShellConfiguration.kt @@ -0,0 +1,28 @@ +package net.corda.tools.shell + +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.nodeapi.internal.config.SSLConfiguration +import java.nio.file.Path +import java.nio.file.Paths + +data class ShellConfiguration( + val commandsDirectory: Path, + val cordappsDirectory: Path? = null, + var user: String = "", + var password: String = "", + val hostAndPort: NetworkHostAndPort, + val ssl: ShellSslOptions? = null, + val sshdPort: Int? = null, + val sshHostKeyDirectory: Path? = null, + val noLocalShell: Boolean = false) { + companion object { + const val SSH_PORT = 2222 + const val COMMANDS_DIR = "shell-commands" + const val CORDAPPS_DIR = "cordapps" + const val SSHD_HOSTKEY_DIR = "ssh" + } +} + +data class ShellSslOptions(override val sslKeystore: Path, override val keyStorePassword: String, override val trustStoreFile:Path, override val trustStorePassword: String) : SSLConfiguration { + override val certificatesDirectory: Path get() = Paths.get("") +} \ No newline at end of file diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/StandaloneShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/StandaloneShell.kt index a6c51c8047..bf9bd2563d 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/StandaloneShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/StandaloneShell.kt @@ -115,6 +115,7 @@ class StandaloneShell(private val configuration: ShellConfiguration) { configuration.sshdPort?.apply{ println("SSH server listening on port $this.") } exit.await() + // because we can't clean certain Crash Shell threads that block on read() exitProcess(0) } } diff --git a/tools/shell/src/test/kotlin/net/corda/tools/shell/CustomTypeJsonParsingTests.kt b/tools/shell/src/test/kotlin/net/corda/tools/shell/CustomTypeJsonParsingTests.kt index 819e1913bb..d4d97d76db 100644 --- a/tools/shell/src/test/kotlin/net/corda/tools/shell/CustomTypeJsonParsingTests.kt +++ b/tools/shell/src/test/kotlin/net/corda/tools/shell/CustomTypeJsonParsingTests.kt @@ -37,8 +37,8 @@ class CustomTypeJsonParsingTests { fun setup() { objectMapper = ObjectMapper() val simpleModule = SimpleModule() - simpleModule.addDeserializer(UniqueIdentifier::class.java, InteractiveShell.UniqueIdentifierDeserializer) - simpleModule.addDeserializer(UUID::class.java, InteractiveShell.UUIDDeserializer) + simpleModule.addDeserializer(UniqueIdentifier::class.java, UniqueIdentifierDeserializer) + simpleModule.addDeserializer(UUID::class.java, UUIDDeserializer) objectMapper.registerModule(simpleModule) }