Merge remote-tracking branch 'remotes/open/master' into merges/march-19-14-41

# Conflicts:
#	client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt
#	node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt
#	node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt
#	node/src/main/java/CordaCaplet.java
#	node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
#	testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt
This commit is contained in:
sollecitom 2018-03-19 15:17:49 +00:00
commit 8a561cddbf
43 changed files with 783 additions and 342 deletions

View File

@ -658,10 +658,25 @@ public static final class net.corda.core.contracts.UniqueIdentifier$Companion ex
@org.jetbrains.annotations.NotNull public abstract List getServiceFlows() @org.jetbrains.annotations.NotNull public abstract List getServiceFlows()
@org.jetbrains.annotations.NotNull public abstract List getServices() @org.jetbrains.annotations.NotNull public abstract List getServices()
## ##
@net.corda.core.DoNotImplement public interface net.corda.core.cordapp.CordappConfig
public abstract boolean exists(String)
@org.jetbrains.annotations.NotNull public abstract Object get(String)
public abstract boolean getBoolean(String)
public abstract double getDouble(String)
public abstract float getFloat(String)
public abstract int getInt(String)
public abstract long getLong(String)
@org.jetbrains.annotations.NotNull public abstract Number getNumber(String)
@org.jetbrains.annotations.NotNull public abstract String getString(String)
##
public final class net.corda.core.cordapp.CordappConfigException extends java.lang.Exception
public <init>(String, Throwable)
##
public final class net.corda.core.cordapp.CordappContext extends java.lang.Object public final class net.corda.core.cordapp.CordappContext extends java.lang.Object
public <init>(net.corda.core.cordapp.Cordapp, net.corda.core.crypto.SecureHash, ClassLoader, net.corda.core.cordapp.CordappConfig) public <init>(net.corda.core.cordapp.Cordapp, net.corda.core.crypto.SecureHash, ClassLoader, net.corda.core.cordapp.CordappConfig)
@org.jetbrains.annotations.Nullable public final net.corda.core.crypto.SecureHash getAttachmentId() @org.jetbrains.annotations.Nullable public final net.corda.core.crypto.SecureHash getAttachmentId()
@org.jetbrains.annotations.NotNull public final ClassLoader getClassLoader() @org.jetbrains.annotations.NotNull public final ClassLoader getClassLoader()
@org.jetbrains.annotations.NotNull public final net.corda.core.cordapp.CordappConfig getConfig()
@org.jetbrains.annotations.NotNull public final net.corda.core.cordapp.Cordapp getCordapp() @org.jetbrains.annotations.NotNull public final net.corda.core.cordapp.Cordapp getCordapp()
## ##
@net.corda.core.DoNotImplement public interface net.corda.core.cordapp.CordappProvider @net.corda.core.DoNotImplement public interface net.corda.core.cordapp.CordappProvider
@ -966,6 +981,8 @@ public static final class net.corda.core.crypto.PartialMerkleTree$Companion exte
@kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public static final net.corda.core.crypto.SecureHash$SHA256 sha256Twice(byte[]) @kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public static final net.corda.core.crypto.SecureHash$SHA256 sha256Twice(byte[])
@org.jetbrains.annotations.NotNull public String toString() @org.jetbrains.annotations.NotNull public String toString()
public static final net.corda.core.crypto.SecureHash$Companion Companion public static final net.corda.core.crypto.SecureHash$Companion Companion
@org.jetbrains.annotations.NotNull public static final net.corda.core.crypto.SecureHash$SHA256 allOnesHash
@org.jetbrains.annotations.NotNull public static final net.corda.core.crypto.SecureHash$SHA256 zeroHash
## ##
public static final class net.corda.core.crypto.SecureHash$Companion extends java.lang.Object public static final class net.corda.core.crypto.SecureHash$Companion extends java.lang.Object
@org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash$SHA256 getAllOnesHash() @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash$SHA256 getAllOnesHash()
@ -1377,6 +1394,15 @@ public static final class net.corda.core.flows.NotarisationRequest$Companion ext
public int hashCode() public int hashCode()
public String toString() public String toString()
## ##
@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.NotarisationResponse extends java.lang.Object
public <init>(List)
@org.jetbrains.annotations.NotNull public final List component1()
@org.jetbrains.annotations.NotNull public final net.corda.core.flows.NotarisationResponse copy(List)
public boolean equals(Object)
@org.jetbrains.annotations.NotNull public final List getSignatures()
public int hashCode()
public String toString()
##
@net.corda.core.flows.InitiatingFlow public final class net.corda.core.flows.NotaryChangeFlow extends net.corda.core.flows.AbstractStateReplacementFlow$Instigator @net.corda.core.flows.InitiatingFlow public final class net.corda.core.flows.NotaryChangeFlow extends net.corda.core.flows.AbstractStateReplacementFlow$Instigator
public <init>(net.corda.core.contracts.StateAndRef, net.corda.core.identity.Party, net.corda.core.utilities.ProgressTracker) public <init>(net.corda.core.contracts.StateAndRef, net.corda.core.identity.Party, net.corda.core.utilities.ProgressTracker)
@org.jetbrains.annotations.NotNull protected net.corda.core.flows.AbstractStateReplacementFlow$UpgradeTx assembleTx() @org.jetbrains.annotations.NotNull protected net.corda.core.flows.AbstractStateReplacementFlow$UpgradeTx assembleTx()
@ -1384,10 +1410,12 @@ public static final class net.corda.core.flows.NotarisationRequest$Companion ext
@net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.flows.NotaryError extends java.lang.Object @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.flows.NotaryError extends java.lang.Object
## ##
@net.corda.core.serialization.CordaSerializable public static final class net.corda.core.flows.NotaryError$Conflict extends net.corda.core.flows.NotaryError @net.corda.core.serialization.CordaSerializable public static final class net.corda.core.flows.NotaryError$Conflict extends net.corda.core.flows.NotaryError
public <init>(net.corda.core.crypto.SecureHash, Map)
@org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash component1() @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash component1()
@org.jetbrains.annotations.NotNull public final Map component2() @org.jetbrains.annotations.NotNull public final Map component2()
@org.jetbrains.annotations.NotNull public final net.corda.core.flows.NotaryError$Conflict copy(net.corda.core.crypto.SecureHash, Map) @org.jetbrains.annotations.NotNull public final net.corda.core.flows.NotaryError$Conflict copy(net.corda.core.crypto.SecureHash, Map)
public boolean equals(Object) public boolean equals(Object)
@org.jetbrains.annotations.NotNull public final Map getConsumedStates()
@org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash getTxId() @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash getTxId()
public int hashCode() public int hashCode()
@org.jetbrains.annotations.NotNull public String toString() @org.jetbrains.annotations.NotNull public String toString()
@ -1440,6 +1468,7 @@ public static final class net.corda.core.flows.NotaryError$TimeWindowInvalid$Com
@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.NotaryException extends net.corda.core.flows.FlowException @net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.NotaryException extends net.corda.core.flows.FlowException
public <init>(net.corda.core.flows.NotaryError, net.corda.core.crypto.SecureHash) public <init>(net.corda.core.flows.NotaryError, net.corda.core.crypto.SecureHash)
@org.jetbrains.annotations.NotNull public final net.corda.core.flows.NotaryError getError() @org.jetbrains.annotations.NotNull public final net.corda.core.flows.NotaryError getError()
@org.jetbrains.annotations.Nullable public final net.corda.core.crypto.SecureHash getTxId()
## ##
public final class net.corda.core.flows.NotaryFlow extends java.lang.Object public final class net.corda.core.flows.NotaryFlow extends java.lang.Object
public <init>() public <init>()
@ -1471,6 +1500,10 @@ public abstract static class net.corda.core.flows.NotaryFlow$Service extends net
@org.jetbrains.annotations.NotNull public final net.corda.core.node.services.TrustedAuthorityNotaryService getService() @org.jetbrains.annotations.NotNull public final net.corda.core.node.services.TrustedAuthorityNotaryService getService()
@co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull protected abstract net.corda.core.flows.TransactionParts validateRequest(net.corda.core.flows.NotarisationPayload) @co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull protected abstract net.corda.core.flows.TransactionParts validateRequest(net.corda.core.flows.NotarisationPayload)
## ##
@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.NotaryInternalException extends net.corda.core.flows.FlowException
public <init>(net.corda.core.flows.NotaryError)
@org.jetbrains.annotations.NotNull public final net.corda.core.flows.NotaryError getError()
##
public final class net.corda.core.flows.ReceiveStateAndRefFlow extends net.corda.core.flows.FlowLogic public final class net.corda.core.flows.ReceiveStateAndRefFlow extends net.corda.core.flows.FlowLogic
public <init>(net.corda.core.flows.FlowSession) public <init>(net.corda.core.flows.FlowSession)
@co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull public List call() @co.paralleluniverse.fibers.Suspendable @org.jetbrains.annotations.NotNull public List call()
@ -1523,6 +1556,15 @@ public @interface net.corda.core.flows.StartableByRPC
## ##
public @interface net.corda.core.flows.StartableByService public @interface net.corda.core.flows.StartableByService
## ##
@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.StateConsumptionDetails extends java.lang.Object
public <init>(net.corda.core.crypto.SecureHash)
@org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash component1()
@org.jetbrains.annotations.NotNull public final net.corda.core.flows.StateConsumptionDetails copy(net.corda.core.crypto.SecureHash)
public boolean equals(Object)
@org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash getHashOfTransactionId()
public int hashCode()
public String toString()
##
@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.StateMachineRunId extends java.lang.Object @net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.StateMachineRunId extends java.lang.Object
public <init>(UUID) public <init>(UUID)
@org.jetbrains.annotations.NotNull public final UUID component1() @org.jetbrains.annotations.NotNull public final UUID component1()
@ -1669,6 +1711,7 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec
@org.jetbrains.annotations.NotNull public abstract List queryAttachments(net.corda.core.node.services.vault.AttachmentQueryCriteria, net.corda.core.node.services.vault.AttachmentSort) @org.jetbrains.annotations.NotNull public abstract List queryAttachments(net.corda.core.node.services.vault.AttachmentQueryCriteria, net.corda.core.node.services.vault.AttachmentSort)
@org.jetbrains.annotations.NotNull public abstract List registeredFlows() @org.jetbrains.annotations.NotNull public abstract List registeredFlows()
public abstract void setFlowsDrainingModeEnabled(boolean) public abstract void setFlowsDrainingModeEnabled(boolean)
public abstract void shutdown()
@net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.FlowHandle startFlowDynamic(Class, Object...) @net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.FlowHandle startFlowDynamic(Class, Object...)
@net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.FlowProgressHandle startTrackedFlowDynamic(Class, Object...) @net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.FlowProgressHandle startTrackedFlowDynamic(Class, Object...)
@net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.DataFeed stateMachineRecordedTransactionMappingFeed() @net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.messaging.DataFeed stateMachineRecordedTransactionMappingFeed()
@ -1692,6 +1735,7 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec
@org.jetbrains.annotations.Nullable public abstract net.corda.core.identity.Party wellKnownPartyFromX500Name(net.corda.core.identity.CordaX500Name) @org.jetbrains.annotations.Nullable public abstract net.corda.core.identity.Party wellKnownPartyFromX500Name(net.corda.core.identity.CordaX500Name)
## ##
public final class net.corda.core.messaging.CordaRPCOpsKt extends java.lang.Object public final class net.corda.core.messaging.CordaRPCOpsKt extends java.lang.Object
@org.jetbrains.annotations.NotNull public static final net.corda.core.messaging.DataFeed pendingFlowsCount(net.corda.core.messaging.CordaRPCOps)
## ##
@net.corda.core.serialization.CordaSerializable public final class net.corda.core.messaging.DataFeed extends java.lang.Object @net.corda.core.serialization.CordaSerializable public final class net.corda.core.messaging.DataFeed extends java.lang.Object
public <init>(Object, rx.Observable) public <init>(Object, rx.Observable)
@ -1891,6 +1935,7 @@ public @interface net.corda.core.messaging.RPCReturnsObservables
@org.jetbrains.annotations.NotNull public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey) @org.jetbrains.annotations.NotNull public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey)
@org.jetbrains.annotations.NotNull public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction) @org.jetbrains.annotations.NotNull public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction)
@org.jetbrains.annotations.NotNull public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction, java.security.PublicKey) @org.jetbrains.annotations.NotNull public abstract net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction, java.security.PublicKey)
@org.jetbrains.annotations.NotNull public abstract net.corda.core.cordapp.CordappContext getAppContext()
@org.jetbrains.annotations.NotNull public abstract java.time.Clock getClock() @org.jetbrains.annotations.NotNull public abstract java.time.Clock getClock()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.ContractUpgradeService getContractUpgradeService() @org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.ContractUpgradeService getContractUpgradeService()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.KeyManagementService getKeyManagementService() @org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.KeyManagementService getKeyManagementService()
@ -2870,6 +2915,8 @@ public interface net.corda.core.schemas.StatePersistable
public interface net.corda.core.serialization.ClassWhitelist public interface net.corda.core.serialization.ClassWhitelist
public abstract boolean hasListed(Class) public abstract boolean hasListed(Class)
## ##
public @interface net.corda.core.serialization.ConstructorForDeserialization
##
public @interface net.corda.core.serialization.CordaSerializable public @interface net.corda.core.serialization.CordaSerializable
## ##
public @interface net.corda.core.serialization.CordaSerializationTransformEnumDefault public @interface net.corda.core.serialization.CordaSerializationTransformEnumDefault
@ -2889,6 +2936,9 @@ public @interface net.corda.core.serialization.CordaSerializationTransformRename
public @interface net.corda.core.serialization.DeprecatedConstructorForDeserialization public @interface net.corda.core.serialization.DeprecatedConstructorForDeserialization
public abstract int version() public abstract int version()
## ##
@net.corda.core.DoNotImplement public interface net.corda.core.serialization.EncodingWhitelist
public abstract boolean acceptEncoding(net.corda.core.serialization.SerializationEncoding)
##
@net.corda.core.serialization.CordaSerializable public final class net.corda.core.serialization.MissingAttachmentsException extends net.corda.core.CordaException @net.corda.core.serialization.CordaSerializable public final class net.corda.core.serialization.MissingAttachmentsException extends net.corda.core.CordaException
public <init>(List) public <init>(List)
@org.jetbrains.annotations.NotNull public final List getIds() @org.jetbrains.annotations.NotNull public final List getIds()
@ -2909,6 +2959,8 @@ public final class net.corda.core.serialization.SerializationAPIKt extends java.
## ##
@net.corda.core.DoNotImplement public interface net.corda.core.serialization.SerializationContext @net.corda.core.DoNotImplement public interface net.corda.core.serialization.SerializationContext
@org.jetbrains.annotations.NotNull public abstract ClassLoader getDeserializationClassLoader() @org.jetbrains.annotations.NotNull public abstract ClassLoader getDeserializationClassLoader()
@org.jetbrains.annotations.Nullable public abstract net.corda.core.serialization.SerializationEncoding getEncoding()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.EncodingWhitelist getEncodingWhitelist()
public abstract boolean getObjectReferencesEnabled() public abstract boolean getObjectReferencesEnabled()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.utilities.ByteSequence getPreferredSerializationVersion() @org.jetbrains.annotations.NotNull public abstract net.corda.core.utilities.ByteSequence getPreferredSerializationVersion()
@org.jetbrains.annotations.NotNull public abstract Map getProperties() @org.jetbrains.annotations.NotNull public abstract Map getProperties()
@ -2916,6 +2968,7 @@ public final class net.corda.core.serialization.SerializationAPIKt extends java.
@org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.ClassWhitelist getWhitelist() @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.ClassWhitelist getWhitelist()
@org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withAttachmentsClassLoader(List) @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withAttachmentsClassLoader(List)
@org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withClassLoader(ClassLoader) @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withClassLoader(ClassLoader)
@org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withEncoding(net.corda.core.serialization.SerializationEncoding)
@org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withPreferredSerializationVersion(net.corda.core.utilities.ByteSequence) @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withPreferredSerializationVersion(net.corda.core.utilities.ByteSequence)
@org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withProperty(Object, Object) @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withProperty(Object, Object)
@org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withWhitelisted(Class) @org.jetbrains.annotations.NotNull public abstract net.corda.core.serialization.SerializationContext withWhitelisted(Class)
@ -2939,6 +2992,8 @@ public final class net.corda.core.serialization.SerializationDefaults extends ja
@org.jetbrains.annotations.NotNull public final net.corda.core.serialization.SerializationContext getSTORAGE_CONTEXT() @org.jetbrains.annotations.NotNull public final net.corda.core.serialization.SerializationContext getSTORAGE_CONTEXT()
public static final net.corda.core.serialization.SerializationDefaults INSTANCE public static final net.corda.core.serialization.SerializationDefaults INSTANCE
## ##
@net.corda.core.DoNotImplement public interface net.corda.core.serialization.SerializationEncoding
##
public abstract class net.corda.core.serialization.SerializationFactory extends java.lang.Object public abstract class net.corda.core.serialization.SerializationFactory extends java.lang.Object
public <init>() public <init>()
public final Object asCurrent(kotlin.jvm.functions.Function1) public final Object asCurrent(kotlin.jvm.functions.Function1)
@ -3018,13 +3073,20 @@ public static final class net.corda.core.serialization.SingletonSerializationTok
@org.jetbrains.annotations.NotNull public final Map component2() @org.jetbrains.annotations.NotNull public final Map component2()
@org.jetbrains.annotations.NotNull public final net.corda.core.transactions.ContractUpgradeFilteredTransaction copy(Map, Map) @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.ContractUpgradeFilteredTransaction copy(Map, Map)
public boolean equals(Object) public boolean equals(Object)
@org.jetbrains.annotations.NotNull public final Map getHiddenComponents()
@org.jetbrains.annotations.NotNull public net.corda.core.crypto.SecureHash getId() @org.jetbrains.annotations.NotNull public net.corda.core.crypto.SecureHash getId()
@org.jetbrains.annotations.NotNull public List getInputs() @org.jetbrains.annotations.NotNull public List getInputs()
@org.jetbrains.annotations.NotNull public net.corda.core.identity.Party getNotary() @org.jetbrains.annotations.NotNull public net.corda.core.identity.Party getNotary()
@org.jetbrains.annotations.NotNull public List getOutputs() @org.jetbrains.annotations.NotNull public List getOutputs()
@org.jetbrains.annotations.NotNull public final Map getVisibleComponents()
public int hashCode() public int hashCode()
public String toString() public String toString()
## ##
@net.corda.core.serialization.CordaSerializable public static final class net.corda.core.transactions.ContractUpgradeFilteredTransaction$FilteredComponent extends java.lang.Object
public <init>(net.corda.core.utilities.OpaqueBytes, net.corda.core.crypto.SecureHash)
@org.jetbrains.annotations.NotNull public final net.corda.core.utilities.OpaqueBytes getComponent()
@org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash getNonce()
##
@net.corda.core.DoNotImplement public final class net.corda.core.transactions.ContractUpgradeLedgerTransaction extends net.corda.core.transactions.FullTransaction implements net.corda.core.transactions.TransactionWithSignatures @net.corda.core.DoNotImplement public final class net.corda.core.transactions.ContractUpgradeLedgerTransaction extends net.corda.core.transactions.FullTransaction implements net.corda.core.transactions.TransactionWithSignatures
public <init>(List, net.corda.core.identity.Party, net.corda.core.contracts.Attachment, String, net.corda.core.contracts.Attachment, net.corda.core.crypto.SecureHash, net.corda.core.contracts.PrivacySalt, List, net.corda.core.node.NetworkParameters) public <init>(List, net.corda.core.identity.Party, net.corda.core.contracts.Attachment, String, net.corda.core.contracts.Attachment, net.corda.core.crypto.SecureHash, net.corda.core.contracts.PrivacySalt, List, net.corda.core.node.NetworkParameters)
public void checkSignaturesAreValid() public void checkSignaturesAreValid()
@ -3069,12 +3131,18 @@ public static final class net.corda.core.serialization.SingletonSerializationTok
@org.jetbrains.annotations.NotNull public net.corda.core.identity.Party getNotary() @org.jetbrains.annotations.NotNull public net.corda.core.identity.Party getNotary()
@org.jetbrains.annotations.NotNull public List getOutputs() @org.jetbrains.annotations.NotNull public List getOutputs()
@org.jetbrains.annotations.NotNull public final net.corda.core.contracts.PrivacySalt getPrivacySalt() @org.jetbrains.annotations.NotNull public final net.corda.core.contracts.PrivacySalt getPrivacySalt()
@org.jetbrains.annotations.NotNull public final List getSerializedComponents()
@org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash getUpgradedContractAttachmentId() @org.jetbrains.annotations.NotNull public final net.corda.core.crypto.SecureHash getUpgradedContractAttachmentId()
@org.jetbrains.annotations.NotNull public final String getUpgradedContractClassName() @org.jetbrains.annotations.NotNull public final String getUpgradedContractClassName()
public int hashCode() public int hashCode()
@org.jetbrains.annotations.NotNull public final net.corda.core.transactions.ContractUpgradeLedgerTransaction resolve(net.corda.core.node.ServicesForResolution, List) @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.ContractUpgradeLedgerTransaction resolve(net.corda.core.node.ServicesForResolution, List)
public String toString() public String toString()
## ##
public static final class net.corda.core.transactions.ContractUpgradeWireTransaction$Component extends java.lang.Enum
protected <init>(String, int)
public static net.corda.core.transactions.ContractUpgradeWireTransaction$Component valueOf(String)
public static net.corda.core.transactions.ContractUpgradeWireTransaction$Component[] values()
##
@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.transactions.CoreTransaction extends net.corda.core.transactions.BaseTransaction @net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.transactions.CoreTransaction extends net.corda.core.transactions.BaseTransaction
public <init>() public <init>()
@org.jetbrains.annotations.NotNull public abstract List getInputs() @org.jetbrains.annotations.NotNull public abstract List getInputs()
@ -3206,6 +3274,7 @@ public static final class net.corda.core.transactions.LedgerTransaction$InOutGro
## ##
@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public final class net.corda.core.transactions.NotaryChangeWireTransaction extends net.corda.core.transactions.CoreTransaction @net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public final class net.corda.core.transactions.NotaryChangeWireTransaction extends net.corda.core.transactions.CoreTransaction
public <init>(List) public <init>(List)
@kotlin.Deprecated public <init>(List, net.corda.core.identity.Party, net.corda.core.identity.Party)
@org.jetbrains.annotations.NotNull public final List component1() @org.jetbrains.annotations.NotNull public final List component1()
@org.jetbrains.annotations.NotNull public final net.corda.core.transactions.NotaryChangeWireTransaction copy(List) @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.NotaryChangeWireTransaction copy(List)
public boolean equals(Object) public boolean equals(Object)
@ -3214,11 +3283,17 @@ public static final class net.corda.core.transactions.LedgerTransaction$InOutGro
@org.jetbrains.annotations.NotNull public final net.corda.core.identity.Party getNewNotary() @org.jetbrains.annotations.NotNull public final net.corda.core.identity.Party getNewNotary()
@org.jetbrains.annotations.NotNull public net.corda.core.identity.Party getNotary() @org.jetbrains.annotations.NotNull public net.corda.core.identity.Party getNotary()
@org.jetbrains.annotations.NotNull public List getOutputs() @org.jetbrains.annotations.NotNull public List getOutputs()
@org.jetbrains.annotations.NotNull public final List getSerializedComponents()
public int hashCode() public int hashCode()
@org.jetbrains.annotations.NotNull public final net.corda.core.transactions.NotaryChangeLedgerTransaction resolve(net.corda.core.node.ServiceHub, List) @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.NotaryChangeLedgerTransaction resolve(net.corda.core.node.ServiceHub, List)
@org.jetbrains.annotations.NotNull public final net.corda.core.transactions.NotaryChangeLedgerTransaction resolve(net.corda.core.node.ServicesForResolution, List) @org.jetbrains.annotations.NotNull public final net.corda.core.transactions.NotaryChangeLedgerTransaction resolve(net.corda.core.node.ServicesForResolution, List)
public String toString() public String toString()
## ##
public static final class net.corda.core.transactions.NotaryChangeWireTransaction$Component extends java.lang.Enum
protected <init>(String, int)
public static net.corda.core.transactions.NotaryChangeWireTransaction$Component valueOf(String)
public static net.corda.core.transactions.NotaryChangeWireTransaction$Component[] values()
##
@net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public final class net.corda.core.transactions.SignedTransaction extends java.lang.Object implements net.corda.core.transactions.TransactionWithSignatures @net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public final class net.corda.core.transactions.SignedTransaction extends java.lang.Object implements net.corda.core.transactions.TransactionWithSignatures
public <init>(net.corda.core.serialization.SerializedBytes, List) public <init>(net.corda.core.serialization.SerializedBytes, List)
public <init>(net.corda.core.transactions.CoreTransaction, List) public <init>(net.corda.core.transactions.CoreTransaction, List)
@ -3362,6 +3437,7 @@ public final class net.corda.core.utilities.ByteArrays extends java.lang.Object
@net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.utilities.ByteSequence extends java.lang.Object implements java.lang.Comparable @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.utilities.ByteSequence extends java.lang.Object implements java.lang.Comparable
public int compareTo(net.corda.core.utilities.ByteSequence) public int compareTo(net.corda.core.utilities.ByteSequence)
@org.jetbrains.annotations.NotNull public final net.corda.core.utilities.ByteSequence copy() @org.jetbrains.annotations.NotNull public final net.corda.core.utilities.ByteSequence copy()
@org.jetbrains.annotations.NotNull public final byte[] copyBytes()
public boolean equals(Object) public boolean equals(Object)
@org.jetbrains.annotations.NotNull public abstract byte[] getBytes() @org.jetbrains.annotations.NotNull public abstract byte[] getBytes()
public final int getOffset() public final int getOffset()
@ -3371,9 +3447,12 @@ public final class net.corda.core.utilities.ByteArrays extends java.lang.Object
@kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public static final net.corda.core.utilities.ByteSequence of(byte[], int) @kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public static final net.corda.core.utilities.ByteSequence of(byte[], int)
@kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public static final net.corda.core.utilities.ByteSequence of(byte[], int, int) @kotlin.jvm.JvmStatic @org.jetbrains.annotations.NotNull public static final net.corda.core.utilities.ByteSequence of(byte[], int, int)
@org.jetbrains.annotations.NotNull public final java.io.ByteArrayInputStream open() @org.jetbrains.annotations.NotNull public final java.io.ByteArrayInputStream open()
@org.jetbrains.annotations.NotNull public final java.nio.ByteBuffer putTo(java.nio.ByteBuffer)
@org.jetbrains.annotations.NotNull public final java.nio.ByteBuffer slice(int, int)
@org.jetbrains.annotations.NotNull public final net.corda.core.utilities.ByteSequence subSequence(int, int) @org.jetbrains.annotations.NotNull public final net.corda.core.utilities.ByteSequence subSequence(int, int)
@org.jetbrains.annotations.NotNull public final net.corda.core.utilities.ByteSequence take(int) @org.jetbrains.annotations.NotNull public final net.corda.core.utilities.ByteSequence take(int)
@org.jetbrains.annotations.NotNull public String toString() @org.jetbrains.annotations.NotNull public String toString()
public final void writeTo(java.io.OutputStream)
public static final net.corda.core.utilities.ByteSequence$Companion Companion public static final net.corda.core.utilities.ByteSequence$Companion Companion
## ##
public static final class net.corda.core.utilities.ByteSequence$Companion extends java.lang.Object public static final class net.corda.core.utilities.ByteSequence$Companion extends java.lang.Object
@ -4151,6 +4230,7 @@ public class net.corda.testing.node.MockServices extends java.lang.Object implem
@org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey) @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey)
@org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction) @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction)
@org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction, java.security.PublicKey) @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction, java.security.PublicKey)
@org.jetbrains.annotations.NotNull public net.corda.core.cordapp.CordappContext getAppContext()
@org.jetbrains.annotations.NotNull public final net.corda.testing.services.MockAttachmentStorage getAttachments() @org.jetbrains.annotations.NotNull public final net.corda.testing.services.MockAttachmentStorage getAttachments()
@org.jetbrains.annotations.NotNull public java.time.Clock getClock() @org.jetbrains.annotations.NotNull public java.time.Clock getClock()
@org.jetbrains.annotations.NotNull public net.corda.core.node.services.ContractUpgradeService getContractUpgradeService() @org.jetbrains.annotations.NotNull public net.corda.core.node.services.ContractUpgradeService getContractUpgradeService()
@ -4209,6 +4289,7 @@ public static final class net.corda.testing.node.MockServicesKt$createMockCordaS
@org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey) @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.FilteredTransaction, java.security.PublicKey)
@org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction) @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction)
@org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction, java.security.PublicKey) @org.jetbrains.annotations.NotNull public net.corda.core.crypto.TransactionSignature createSignature(net.corda.core.transactions.SignedTransaction, java.security.PublicKey)
@org.jetbrains.annotations.NotNull public net.corda.core.cordapp.CordappContext getAppContext()
@org.jetbrains.annotations.NotNull public net.corda.core.node.services.AttachmentStorage getAttachments() @org.jetbrains.annotations.NotNull public net.corda.core.node.services.AttachmentStorage getAttachments()
@org.jetbrains.annotations.NotNull public java.time.Clock getClock() @org.jetbrains.annotations.NotNull public java.time.Clock getClock()
@org.jetbrains.annotations.NotNull public net.corda.core.node.services.ContractUpgradeService getContractUpgradeService() @org.jetbrains.annotations.NotNull public net.corda.core.node.services.ContractUpgradeService getContractUpgradeService()
@ -4313,18 +4394,22 @@ public final class net.corda.client.rpc.CordaRPCClient extends java.lang.Object
## ##
public static final class net.corda.client.rpc.CordaRPCClient$Companion extends java.lang.Object public static final class net.corda.client.rpc.CordaRPCClient$Companion extends java.lang.Object
## ##
public final class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang.Object public interface net.corda.client.rpc.CordaRPCClientConfiguration
public <init>(java.time.Duration) public abstract int getCacheConcurrencyLevel()
@org.jetbrains.annotations.NotNull public final java.time.Duration component1() @org.jetbrains.annotations.NotNull public abstract java.time.Duration getConnectionMaxRetryInterval()
@org.jetbrains.annotations.NotNull public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration) @org.jetbrains.annotations.NotNull public abstract java.time.Duration getConnectionRetryInterval()
public boolean equals(Object) public abstract double getConnectionRetryIntervalMultiplier()
@org.jetbrains.annotations.NotNull public final java.time.Duration getConnectionMaxRetryInterval() @org.jetbrains.annotations.NotNull public abstract java.time.Duration getDeduplicationCacheExpiry()
public int hashCode() public abstract int getMaxFileSize()
public String toString() public abstract int getMaxReconnectAttempts()
public abstract int getMinimumServerProtocolVersion()
public abstract int getObservationExecutorPoolSize()
@org.jetbrains.annotations.NotNull public abstract java.time.Duration getReapInterval()
public abstract boolean getTrackRpcCallSites()
public static final net.corda.client.rpc.CordaRPCClientConfiguration$Companion Companion public static final net.corda.client.rpc.CordaRPCClientConfiguration$Companion Companion
@org.jetbrains.annotations.NotNull public static final net.corda.client.rpc.CordaRPCClientConfiguration DEFAULT
## ##
public static final class net.corda.client.rpc.CordaRPCClientConfiguration$Companion extends java.lang.Object public static final class net.corda.client.rpc.CordaRPCClientConfiguration$Companion extends java.lang.Object
@org.jetbrains.annotations.NotNull public final net.corda.client.rpc.CordaRPCClientConfiguration default()
## ##
@net.corda.core.DoNotImplement public final class net.corda.client.rpc.CordaRPCConnection extends java.lang.Object implements net.corda.client.rpc.RPCConnection @net.corda.core.DoNotImplement public final class net.corda.client.rpc.CordaRPCConnection extends java.lang.Object implements net.corda.client.rpc.RPCConnection
public <init>(net.corda.client.rpc.RPCConnection) public <init>(net.corda.client.rpc.RPCConnection)

View File

@ -68,9 +68,9 @@ class NodeMonitorModel {
fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) { fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
val client = CordaRPCClient( val client = CordaRPCClient(
nodeHostAndPort, nodeHostAndPort,
CordaRPCClientConfiguration.DEFAULT.copy( object : CordaRPCClientConfiguration {
connectionMaxRetryInterval = 10.seconds override val connectionMaxRetryInterval = 10.seconds
) }
) )
val connection = client.start(username, password) val connection = client.start(username, password)
val proxy = connection.proxy val proxy = connection.proxy

View File

@ -28,13 +28,13 @@ import net.corda.finance.flows.CashPaymentFlow
import net.corda.finance.schemas.CashSchemaV1 import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.internal.Node import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode import net.corda.node.internal.StartedNode
import net.corda.node.services.Permissions.Companion.invokeRpc import net.corda.node.services.Permissions.Companion.all
import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.testing.core.* import net.corda.testing.core.*
import net.corda.testing.node.User import net.corda.testing.node.User
import net.corda.testing.internal.IntegrationTestSchemas import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.internal.toDatabaseSchemaName import net.corda.testing.internal.toDatabaseSchemaName
import net.corda.testing.node.internal.NodeBasedTest import net.corda.testing.node.internal.NodeBasedTest
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.assertj.core.api.Assertions.assertThatExceptionOfType
@ -42,17 +42,17 @@ import org.junit.After
import org.junit.Before import org.junit.Before
import org.junit.ClassRule import org.junit.ClassRule
import org.junit.Test import org.junit.Test
import rx.subjects.PublishSubject
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFalse import kotlin.test.assertFalse
import kotlin.test.assertTrue import kotlin.test.assertTrue
class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", CashSchemaV1::class.packageName)) { class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", CashSchemaV1::class.packageName)) {
private val rpcUser = User("user1", "test", permissions = setOf( private val rpcUser = User("user1", "test", permissions = setOf(all())
startFlow<CashIssueFlow>(),
startFlow<CashPaymentFlow>(),
invokeRpc("vaultQueryBy"),
invokeRpc(CordaRPCOps::stateMachinesFeed),
invokeRpc("vaultQueryByCriteria"))
) )
private lateinit var node: StartedNode<Node> private lateinit var node: StartedNode<Node>
private lateinit var identity: Party private lateinit var identity: Party
@ -72,7 +72,9 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
override fun setUp() { override fun setUp() {
super.setUp() super.setUp()
node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!) client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, object : CordaRPCClientConfiguration {
override val maxReconnectAttempts = 5
})
identity = node.info.identityFromX500Name(ALICE_NAME) identity = node.info.identityFromX500Name(ALICE_NAME)
} }
@ -100,6 +102,61 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
} }
} }
@Test
fun `shutdown command stops the node`() {
val nodeIsShut: PublishSubject<Unit> = PublishSubject.create()
val latch = CountDownLatch(1)
var successful = false
val maxCount = 20
var count = 0
CloseableExecutor(Executors.newSingleThreadScheduledExecutor()).use { scheduler ->
val task = scheduler.scheduleAtFixedRate({
try {
println("Checking whether node is still running...")
client.start(rpcUser.username, rpcUser.password).use {
println("... node is still running.")
if (count == maxCount) {
nodeIsShut.onError(AssertionError("Node does not get shutdown by RPC"))
}
count++
}
} catch (e: ActiveMQNotConnectedException) {
println("... node is not running.")
nodeIsShut.onCompleted()
} catch (e: ActiveMQSecurityException) {
// nothing here - this happens if trying to connect before the node is started
} catch (e: Throwable) {
nodeIsShut.onError(e)
}
}, 1, 1, TimeUnit.SECONDS)
nodeIsShut.doOnError { error ->
error.printStackTrace()
successful = false
task.cancel(true)
latch.countDown()
}.doOnCompleted {
successful = (node.internals.started == null)
task.cancel(true)
latch.countDown()
}.subscribe()
client.start(rpcUser.username, rpcUser.password).use { rpc -> rpc.proxy.shutdown() }
latch.await()
assertThat(successful).isTrue()
}
}
private class CloseableExecutor(private val delegate: ScheduledExecutorService) : AutoCloseable, ScheduledExecutorService by delegate {
override fun close() {
delegate.shutdown()
}
}
@Test @Test
fun `close-send deadlock and premature shutdown on empty observable`() { fun `close-send deadlock and premature shutdown on empty observable`() {
println("Starting client") println("Starting client")
@ -160,7 +217,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
val updates = proxy.stateMachinesFeed().updates val updates = proxy.stateMachinesFeed().updates
node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0),identity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow() node.services.startFlow(CashIssueFlow(2000.DOLLARS, OpaqueBytes.of(0), identity), InvocationContext.shell()).flatMap { it.resultFuture }.getOrThrow()
proxy.startFlow(::CashIssueFlow, 123.DOLLARS, OpaqueBytes.of(0), identity).returnValue.getOrThrow() proxy.startFlow(::CashIssueFlow, 123.DOLLARS, OpaqueBytes.of(0), identity).returnValue.getOrThrow()
proxy.startFlowDynamic(CashIssueFlow::class.java, 1000.DOLLARS, OpaqueBytes.of(0), identity).returnValue.getOrThrow() proxy.startFlowDynamic(CashIssueFlow::class.java, 1000.DOLLARS, OpaqueBytes.of(0), identity).returnValue.getOrThrow()

View File

@ -11,7 +11,7 @@
package net.corda.client.rpc package net.corda.client.rpc
import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.core.context.Trace import net.corda.core.context.Trace
import net.corda.core.crypto.random63BitValue import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.fork
@ -115,7 +115,7 @@ class RPCStabilityTests {
Try.on { Try.on {
startRpcClient<RPCOps>( startRpcClient<RPCOps>(
server.get().broker.hostAndPort!!, server.get().broker.hostAndPort!!,
configuration = RPCClientConfiguration.default.copy(minimumServerProtocolVersion = 1) configuration = CordaRPCClientConfigurationImpl.default.copy(minimumServerProtocolVersion = 1)
).get() ).get()
} }
} }
@ -250,7 +250,7 @@ class RPCStabilityTests {
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!! val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow() serverFollower.unfollow()
// Set retry interval to 1s to reduce test duration // Set retry interval to 1s to reduce test duration
val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 1.seconds) val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds)
val clientFollower = shutdownManager.follower() val clientFollower = shutdownManager.follower()
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow() val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow() clientFollower.unfollow()
@ -276,7 +276,7 @@ class RPCStabilityTests {
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!! val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow() serverFollower.unfollow()
// Set retry interval to 1s to reduce test duration // Set retry interval to 1s to reduce test duration
val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5) val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5)
val clientFollower = shutdownManager.follower() val clientFollower = shutdownManager.follower()
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow() val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow() clientFollower.unfollow()
@ -308,7 +308,7 @@ class RPCStabilityTests {
val serverPort = startRpcServer<NoOps>(ops = ops).getOrThrow().broker.hostAndPort!! val serverPort = startRpcServer<NoOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow() serverFollower.unfollow()
val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1) val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1)
val clientFollower = shutdownManager.follower() val clientFollower = shutdownManager.follower()
val client = startRpcClient<NoOps>(serverPort, configuration = clientConfiguration).getOrThrow() val client = startRpcClient<NoOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow() clientFollower.unfollow()

View File

@ -12,7 +12,7 @@ package net.corda.client.rpc
import net.corda.client.rpc.internal.KryoClientSerializationScheme import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.core.context.Actor import net.corda.core.context.Actor
import net.corda.core.context.Trace import net.corda.core.context.Trace
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
@ -33,23 +33,46 @@ class CordaRPCConnection internal constructor(connection: RPCConnection<CordaRPC
/** /**
* Can be used to configure the RPC client connection. * Can be used to configure the RPC client connection.
*
* @property connectionMaxRetryInterval How much time to wait between connection retries if the server goes down. This
* time will be reached via exponential backoff.
*/ */
data class CordaRPCClientConfiguration(val connectionMaxRetryInterval: Duration) { interface CordaRPCClientConfiguration {
internal fun toRpcClientConfiguration(): RPCClientConfiguration {
return RPCClientConfiguration.default.copy( /** The minimum protocol version required from the server */
connectionMaxRetryInterval = connectionMaxRetryInterval val minimumServerProtocolVersion: Int get() = default().minimumServerProtocolVersion
) /**
} * If set to true the client will track RPC call sites. If an error occurs subsequently during the RPC or in a
* returned Observable stream the stack trace of the originating RPC will be shown as well. Note that
* constructing call stacks is a moderately expensive operation.
*/
val trackRpcCallSites: Boolean get() = default().trackRpcCallSites
/**
* The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references
* and are cleaned up in batches in this interval. If set too large it will waste server side resources for this
* duration. If set too low it wastes client side cycles.
*/
val reapInterval: Duration get() = default().reapInterval
/** The number of threads to use for observations (for executing [Observable.onNext]) */
val observationExecutorPoolSize: Int get() = default().observationExecutorPoolSize
/**
* Determines the concurrency level of the Observable Cache. This is exposed because it implicitly determines
* the limit on the number of leaked observables reaped because of garbage collection per reaping.
* See the implementation of [com.google.common.cache.LocalCache] for details.
*/
val cacheConcurrencyLevel: Int get() = default().cacheConcurrencyLevel
/** The retry interval of artemis connections in milliseconds */
val connectionRetryInterval: Duration get() = default().connectionRetryInterval
/** The retry interval multiplier for exponential backoff */
val connectionRetryIntervalMultiplier: Double get() = default().connectionRetryIntervalMultiplier
/** Maximum retry interval */
val connectionMaxRetryInterval: Duration get() = default().connectionMaxRetryInterval
/** Maximum reconnect attempts on failover */
val maxReconnectAttempts: Int get() = default().maxReconnectAttempts
/** Maximum file size */
val maxFileSize: Int get() = default().maxFileSize
/** The cache expiry of a deduplication watermark per client. */
val deduplicationCacheExpiry: Duration get() = default().deduplicationCacheExpiry
companion object { companion object {
/** fun default(): CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default
* Returns the default configuration we recommend you use.
*/
@JvmField
val DEFAULT = CordaRPCClientConfiguration(connectionMaxRetryInterval = RPCClientConfiguration.default.connectionMaxRetryInterval)
} }
} }
@ -82,17 +105,17 @@ data class CordaRPCClientConfiguration(val connectionMaxRetryInterval: Duration)
*/ */
class CordaRPCClient private constructor( class CordaRPCClient private constructor(
hostAndPort: NetworkHostAndPort, hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null, sslConfiguration: SSLConfiguration? = null,
classLoader: ClassLoader? = null classLoader: ClassLoader? = null
) { ) {
@JvmOverloads @JvmOverloads
constructor(hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT) : this(hostAndPort, configuration, null) constructor(hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(hostAndPort, configuration, null)
companion object { companion object {
internal fun createWithSsl( internal fun createWithSsl(
hostAndPort: NetworkHostAndPort, hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null sslConfiguration: SSLConfiguration? = null
): CordaRPCClient { ): CordaRPCClient {
return CordaRPCClient(hostAndPort, configuration, sslConfiguration) return CordaRPCClient(hostAndPort, configuration, sslConfiguration)
@ -100,7 +123,7 @@ class CordaRPCClient private constructor(
internal fun createWithSslAndClassLoader( internal fun createWithSslAndClassLoader(
hostAndPort: NetworkHostAndPort, hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null, sslConfiguration: SSLConfiguration? = null,
classLoader: ClassLoader? = null classLoader: ClassLoader? = null
): CordaRPCClient { ): CordaRPCClient {
@ -122,7 +145,7 @@ class CordaRPCClient private constructor(
private val rpcClient = RPCClient<CordaRPCOps>( private val rpcClient = RPCClient<CordaRPCOps>(
tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration), tcpTransport(ConnectionDirection.Outbound(), hostAndPort, config = sslConfiguration),
configuration.toRpcClientConfiguration(), configuration,
if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT if (classLoader != null) KRYO_RPC_CLIENT_CONTEXT.withClassLoader(classLoader) else KRYO_RPC_CLIENT_CONTEXT
) )

View File

@ -12,19 +12,32 @@ package net.corda.client.rpc.internal
import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.pendingFlowsCount
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.nodeapi.internal.config.SSLConfiguration
import rx.Observable
/** Utility which exposes the internal Corda RPC constructor to other internal Corda components */ /** Utility which exposes the internal Corda RPC constructor to other internal Corda components */
fun createCordaRPCClientWithSsl( fun createCordaRPCClientWithSsl(
hostAndPort: NetworkHostAndPort, hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null sslConfiguration: SSLConfiguration? = null
) = CordaRPCClient.createWithSsl(hostAndPort, configuration, sslConfiguration) ) = CordaRPCClient.createWithSsl(hostAndPort, configuration, sslConfiguration)
fun createCordaRPCClientWithSslAndClassLoader( fun createCordaRPCClientWithSslAndClassLoader(
hostAndPort: NetworkHostAndPort, hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
sslConfiguration: SSLConfiguration? = null, sslConfiguration: SSLConfiguration? = null,
classLoader: ClassLoader? = null classLoader: ClassLoader? = null
) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader) ) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)
fun CordaRPCOps.drainAndShutdown(): Observable<Unit> {
setFlowsDrainingModeEnabled(true)
return pendingFlowsCount().updates
.doOnError { error ->
throw error
}
.doOnCompleted { shutdown() }.map { }
}

View File

@ -10,7 +10,6 @@
package net.corda.client.rpc.internal package net.corda.client.rpc.internal
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.RPCConnection import net.corda.client.rpc.RPCConnection
import net.corda.client.rpc.RPCException import net.corda.client.rpc.RPCException
@ -37,40 +36,22 @@ import java.time.Duration
/** /**
* This configuration may be used to tweak the internals of the RPC client. * This configuration may be used to tweak the internals of the RPC client.
*/ */
data class RPCClientConfiguration( data class CordaRPCClientConfigurationImpl(
/** The minimum protocol version required from the server */ override val minimumServerProtocolVersion: Int,
val minimumServerProtocolVersion: Int, override val trackRpcCallSites: Boolean,
/** override val reapInterval: Duration,
* If set to true the client will track RPC call sites. If an error occurs subsequently during the RPC or in a override val observationExecutorPoolSize: Int,
* returned Observable stream the stack trace of the originating RPC will be shown as well. Note that override val connectionRetryInterval: Duration,
* constructing call stacks is a moderately expensive operation. override val connectionRetryIntervalMultiplier: Double,
*/ override val connectionMaxRetryInterval: Duration,
val trackRpcCallSites: Boolean, override val maxReconnectAttempts: Int,
/** override val maxFileSize: Int,
* The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references override val deduplicationCacheExpiry: Duration
* and are cleaned up in batches in this interval. If set too large it will waste server side resources for this ) : CordaRPCClientConfiguration {
* duration. If set too low it wastes client side cycles.
*/
val reapInterval: Duration,
/** The number of threads to use for observations (for executing [Observable.onNext]) */
val observationExecutorPoolSize: Int,
/** The retry interval of artemis connections in milliseconds */
val connectionRetryInterval: Duration,
/** The retry interval multiplier for exponential backoff */
val connectionRetryIntervalMultiplier: Double,
/** Maximum retry interval */
val connectionMaxRetryInterval: Duration,
/** Maximum reconnect attempts on failover */
val maxReconnectAttempts: Int,
/** Maximum file size */
val maxFileSize: Int,
/** The cache expiry of a deduplication watermark per client. */
val deduplicationCacheExpiry: Duration
) {
companion object { companion object {
val unlimitedReconnectAttempts = -1 private const val unlimitedReconnectAttempts = -1
@JvmStatic @JvmStatic
val default = RPCClientConfiguration( val default = CordaRPCClientConfigurationImpl(
minimumServerProtocolVersion = 0, minimumServerProtocolVersion = 0,
trackRpcCallSites = false, trackRpcCallSites = false,
reapInterval = 1.seconds, reapInterval = 1.seconds,
@ -88,13 +69,13 @@ data class RPCClientConfiguration(
class RPCClient<I : RPCOps>( class RPCClient<I : RPCOps>(
val transport: TransportConfiguration, val transport: TransportConfiguration,
val rpcConfiguration: RPCClientConfiguration = RPCClientConfiguration.default, val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
) { ) {
constructor( constructor(
hostAndPort: NetworkHostAndPort, hostAndPort: NetworkHostAndPort,
sslConfiguration: SSLConfiguration? = null, sslConfiguration: SSLConfiguration? = null,
configuration: RPCClientConfiguration = RPCClientConfiguration.default, configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
) : this(tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfiguration), configuration, serializationContext) ) : this(tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfiguration), configuration, serializationContext)

View File

@ -21,6 +21,7 @@ import com.github.benmanes.caffeine.cache.RemovalCause
import com.github.benmanes.caffeine.cache.RemovalListener import com.github.benmanes.caffeine.cache.RemovalListener
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.ThreadFactoryBuilder import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.RPCException import net.corda.client.rpc.RPCException
import net.corda.client.rpc.RPCSinceVersion import net.corda.client.rpc.RPCSinceVersion
import net.corda.core.context.Actor import net.corda.core.context.Actor
@ -80,7 +81,7 @@ import kotlin.reflect.jvm.javaMethod
* The cleanup happens in batches using a dedicated reaper, scheduled on [reaperExecutor]. * The cleanup happens in batches using a dedicated reaper, scheduled on [reaperExecutor].
*/ */
class RPCClientProxyHandler( class RPCClientProxyHandler(
private val rpcConfiguration: RPCClientConfiguration, private val rpcConfiguration: CordaRPCClientConfiguration,
private val rpcUsername: String, private val rpcUsername: String,
private val rpcPassword: String, private val rpcPassword: String,
private val serverLocator: ServerLocator, private val serverLocator: ServerLocator,
@ -183,6 +184,7 @@ class RPCClientProxyHandler(
private val deduplicationSequenceNumber = AtomicLong(0) private val deduplicationSequenceNumber = AtomicLong(0)
private val lock = ReentrantReadWriteLock() private val lock = ReentrantReadWriteLock()
@Volatile
private var sendingEnabled = true private var sendingEnabled = true
/** /**
@ -423,8 +425,8 @@ class RPCClientProxyHandler(
sendingEnabled = false sendingEnabled = false
} }
log.warn("RPC server unavailable.") log.warn("RPC server unavailable. RPC calls are being buffered.")
log.warn("Terminating observables and in flight RPCs.") log.warn("Terminating observables.")
val m = observableContext.observableMap.asMap() val m = observableContext.observableMap.asMap()
m.keys.forEach { k -> m.keys.forEach { k ->
observationExecutorPool.run(k) { observationExecutorPool.run(k) {

View File

@ -10,7 +10,7 @@
package net.corda.client.rpc package net.corda.client.rpc
import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
@ -54,7 +54,7 @@ open class AbstractRPCTest {
inline fun <reified I : RPCOps> RPCDriverDSL.testProxy( inline fun <reified I : RPCOps> RPCDriverDSL.testProxy(
ops: I, ops: I,
rpcUser: User = rpcTestUser, rpcUser: User = rpcTestUser,
clientConfiguration: RPCClientConfiguration = RPCClientConfiguration.default, clientConfiguration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default,
serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.default serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.default
): TestProxy<I> { ): TestProxy<I> {
return when (mode) { return when (mode) {

View File

@ -10,7 +10,7 @@
package net.corda.client.rpc package net.corda.client.rpc
import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.core.crypto.random63BitValue import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose import net.corda.core.internal.concurrent.transpose
@ -100,7 +100,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
private fun RPCDriverDSL.testProxy(): TestProxy<TestOps> { private fun RPCDriverDSL.testProxy(): TestProxy<TestOps> {
return testProxy<TestOps>( return testProxy<TestOps>(
TestOpsImpl(pool), TestOpsImpl(pool),
clientConfiguration = RPCClientConfiguration.default.copy( clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(
reapInterval = 100.millis reapInterval = 100.millis
), ),
serverConfiguration = RPCServerConfiguration.default.copy( serverConfiguration = RPCServerConfiguration.default.copy(

View File

@ -11,10 +11,9 @@
package net.corda.client.rpc package net.corda.client.rpc
import com.google.common.base.Stopwatch import com.google.common.base.Stopwatch
import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.days
import net.corda.core.utilities.minutes import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.node.services.messaging.RPCServerConfiguration
@ -54,7 +53,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
} }
private fun RPCDriverDSL.testProxy( private fun RPCDriverDSL.testProxy(
clientConfiguration: RPCClientConfiguration, clientConfiguration: CordaRPCClientConfigurationImpl,
serverConfiguration: RPCServerConfiguration serverConfiguration: RPCServerConfiguration
): TestProxy<TestOps> { ): TestProxy<TestOps> {
return testProxy<TestOps>( return testProxy<TestOps>(
@ -67,7 +66,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
private fun warmup() { private fun warmup() {
rpcDriver { rpcDriver {
val proxy = testProxy( val proxy = testProxy(
RPCClientConfiguration.default, CordaRPCClientConfigurationImpl.default,
RPCServerConfiguration.default RPCServerConfiguration.default
) )
val executor = Executors.newFixedThreadPool(4) val executor = Executors.newFixedThreadPool(4)
@ -97,7 +96,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
measure(inputOutputSizes, (1..5)) { inputOutputSize, _ -> measure(inputOutputSizes, (1..5)) { inputOutputSize, _ ->
rpcDriver { rpcDriver {
val proxy = testProxy( val proxy = testProxy(
RPCClientConfiguration.default.copy( CordaRPCClientConfigurationImpl.default.copy(
observationExecutorPoolSize = 2 observationExecutorPoolSize = 2
), ),
RPCServerConfiguration.default.copy( RPCServerConfiguration.default.copy(
@ -136,7 +135,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
rpcDriver { rpcDriver {
val metricRegistry = startReporter(shutdownManager) val metricRegistry = startReporter(shutdownManager)
val proxy = testProxy( val proxy = testProxy(
RPCClientConfiguration.default.copy( CordaRPCClientConfigurationImpl.default.copy(
reapInterval = 1.seconds reapInterval = 1.seconds
), ),
RPCServerConfiguration.default.copy( RPCServerConfiguration.default.copy(
@ -169,7 +168,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
// TODO this hangs with more parallelism // TODO this hangs with more parallelism
rpcDriver { rpcDriver {
val proxy = testProxy( val proxy = testProxy(
RPCClientConfiguration.default, CordaRPCClientConfigurationImpl.default,
RPCServerConfiguration.default RPCServerConfiguration.default
) )
val numberOfMessages = 1000 val numberOfMessages = 1000

View File

@ -31,6 +31,7 @@ import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.Try import net.corda.core.utilities.Try
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject
import java.io.IOException import java.io.IOException
import java.io.InputStream import java.io.InputStream
import java.security.PublicKey import java.security.PublicKey
@ -382,6 +383,44 @@ interface CordaRPCOps : RPCOps {
* @see setFlowsDrainingModeEnabled * @see setFlowsDrainingModeEnabled
*/ */
fun isFlowsDrainingModeEnabled(): Boolean fun isFlowsDrainingModeEnabled(): Boolean
/**
* Shuts the node down. Returns immediately.
* This does not wait for flows to be completed.
*/
fun shutdown()
}
/**
* Returns a [DataFeed] that keeps track on the count of pending flows.
*/
fun CordaRPCOps.pendingFlowsCount(): DataFeed<Int, Pair<Int, Int>> {
val stateMachineState = stateMachinesFeed()
var pendingFlowsCount = stateMachineState.snapshot.size
var completedFlowsCount = 0
val updates = PublishSubject.create<Pair<Int, Int>>()
stateMachineState
.updates
.doOnNext { update ->
when (update) {
is StateMachineUpdate.Added -> {
pendingFlowsCount++
updates.onNext(completedFlowsCount to pendingFlowsCount)
}
is StateMachineUpdate.Removed -> {
completedFlowsCount++
updates.onNext(completedFlowsCount to pendingFlowsCount)
if (completedFlowsCount == pendingFlowsCount) {
updates.onCompleted()
}
}
}
}.subscribe()
if (completedFlowsCount == 0) {
updates.onCompleted()
}
return DataFeed(pendingFlowsCount, updates)
} }
inline fun <reified T : ContractState> CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(), inline fun <reified T : ContractState> CordaRPCOps.vaultQueryBy(criteria: QueryCriteria = QueryCriteria.VaultQueryCriteria(),

View File

@ -126,7 +126,7 @@ class ContractUpgradeFlowTest {
return startRpcClient<CordaRPCOps>( return startRpcClient<CordaRPCOps>(
rpcAddress = startRpcServer( rpcAddress = startRpcServer(
rpcUser = user, rpcUser = user,
ops = SecureCordaRPCOps(node.services, node.smm, node.database, node.services) ops = SecureCordaRPCOps(node.services, node.smm, node.database, node.services, { })
).get().broker.hostAndPort!!, ).get().broker.hostAndPort!!,
username = user.username, username = user.username,
password = user.password password = user.password

View File

@ -7,6 +7,8 @@ from previous releases. Please refer to :doc:`upgrade-notes` for detailed instru
UNRELEASED UNRELEASED
---------- ----------
* Node can be shut down abruptly by ``shutdown`` function in `CordaRPCOps` or gracefully (draining flows first) through ``gracefulShutdown`` command from shell.
* Parsing of ``NodeConfiguration`` will now fail if unknown configuration keys are found. * Parsing of ``NodeConfiguration`` will now fail if unknown configuration keys are found.
* The web server now has its own ``web-server.conf`` file, separate from ``node.conf``. * The web server now has its own ``web-server.conf`` file, separate from ``node.conf``.

View File

@ -18,6 +18,7 @@ the `CRaSH`_ shell and supports many of the same features. These features includ
* Issuing SQL queries to the underlying database * Issuing SQL queries to the underlying database
* Viewing JMX metrics and monitoring exports * Viewing JMX metrics and monitoring exports
* UNIX style pipes for both text and objects, an ``egrep`` command and a command for working with columnular data * UNIX style pipes for both text and objects, an ``egrep`` command and a command for working with columnular data
* Shutting the node down.
Permissions Permissions
----------- -----------
@ -199,6 +200,14 @@ Some RPCs return a stream of events that will be shown on screen until you press
You can find a list of the available RPC methods You can find a list of the available RPC methods
`here <https://docs.corda.net/api/kotlin/corda/net.corda.core.messaging/-corda-r-p-c-ops/index.html>`_. `here <https://docs.corda.net/api/kotlin/corda/net.corda.core.messaging/-corda-r-p-c-ops/index.html>`_.
Shutting down the node
**********************
You can shut the node down via shell:
* ``gracefulShutdown`` will put node into draining mode, and shut down when there are no flows running
* ``shutdown`` will shut the node down immediately
Flow commands Flow commands
************* *************

View File

@ -166,9 +166,9 @@ class Node(
val user = config.users.first() val user = config.users.first()
val address = config.nodeInterface val address = config.nodeInterface
val targetHost = NetworkHostAndPort(address.host, address.rpcPort) val targetHost = NetworkHostAndPort(address.host, address.rpcPort)
val config = CordaRPCClientConfiguration( val config = object : CordaRPCClientConfiguration {
connectionMaxRetryInterval = 10.seconds override val connectionMaxRetryInterval = 10.seconds
) }
log.info("Establishing RPC connection to ${targetHost.host} on port ${targetHost.port} ...") log.info("Establishing RPC connection to ${targetHost.host} on port ${targetHost.port} ...")
CordaRPCClient(targetHost, config).use(user.username, user.password) { CordaRPCClient(targetHost, config).use(user.username, user.password) {
log.info("RPC connection to ${targetHost.host}:${targetHost.port} established") log.info("RPC connection to ${targetHost.host}:${targetHost.port} established")

View File

@ -96,7 +96,7 @@ class PersistentCertificateRevocationRequestStorageTest : TestBase() {
} }
@Test @Test
fun `Certificate revocation request is not persisted if a valid certificate cannot be found`() { fun `revocation request fails if a valid certificate cannot be found`() {
// given // given
// then // then

View File

@ -55,7 +55,7 @@ class PersistentNetworkMapStorageTest : TestBase() {
} }
@Test @Test
fun `saveNetworkParameters and then saveNewActiveNetworkMap creates the active network map`() { fun `create active network map`() {
// given // given
// Create node info. // Create node info.
val (signedNodeInfo) = createValidSignedNodeInfo("Test", requestStorage) val (signedNodeInfo) = createValidSignedNodeInfo("Test", requestStorage)

View File

@ -10,7 +10,9 @@
package net.corda.nodeapi.internal.network package net.corda.nodeapi.internal.network
import net.corda.core.internal.* import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.copyTo
import net.corda.core.internal.div
import net.corda.core.node.NetworkParameters import net.corda.core.node.NetworkParameters
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.nodeapi.internal.createDevNetworkMapCa import net.corda.nodeapi.internal.createDevNetworkMapCa
@ -21,16 +23,13 @@ import java.nio.file.StandardCopyOption
class NetworkParametersCopier( class NetworkParametersCopier(
networkParameters: NetworkParameters, networkParameters: NetworkParameters,
networkMapCa: CertificateAndKeyPair = createDevNetworkMapCa(), signingCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa(),
overwriteFile: Boolean = false, overwriteFile: Boolean = false,
@VisibleForTesting @VisibleForTesting
val update: Boolean = false val update: Boolean = false
) { ) {
private val copyOptions = if (overwriteFile) arrayOf(StandardCopyOption.REPLACE_EXISTING) else emptyArray() private val copyOptions = if (overwriteFile) arrayOf(StandardCopyOption.REPLACE_EXISTING) else emptyArray()
private val serialisedSignedNetParams = networkParameters.signWithCert( private val serialisedSignedNetParams = signingCertAndKeyPair.sign(networkParameters).serialize()
networkMapCa.keyPair.private,
networkMapCa.certificate
).serialize()
fun install(nodeDir: Path) { fun install(nodeDir: Path) {
val fileName = if (update) NETWORK_PARAMS_UPDATE_FILE_NAME else NETWORK_PARAMS_FILE_NAME val fileName = if (update) NETWORK_PARAMS_UPDATE_FILE_NAME else NETWORK_PARAMS_FILE_NAME

View File

@ -64,8 +64,7 @@ class CordaPersistence(
databaseConfig: DatabaseConfig, databaseConfig: DatabaseConfig,
schemas: Set<MappedSchema>, schemas: Set<MappedSchema>,
val jdbcUrl: String, val jdbcUrl: String,
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet(), attributeConverters: Collection<AttributeConverter<*, *>> = emptySet()
val cordappClassLoader: ClassLoader? = null
) : Closeable { ) : Closeable {
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()
@ -75,7 +74,7 @@ class CordaPersistence(
val hibernateConfig: HibernateConfiguration by lazy { val hibernateConfig: HibernateConfiguration by lazy {
transaction { transaction {
HibernateConfiguration(schemas, databaseConfig, attributeConverters, jdbcUrl, cordappClassLoader) HibernateConfiguration(schemas, databaseConfig, attributeConverters, jdbcUrl)
} }
} }
val entityManagerFactory get() = hibernateConfig.sessionFactoryForRegisteredSchemas val entityManagerFactory get() = hibernateConfig.sessionFactoryForRegisteredSchemas

View File

@ -16,6 +16,7 @@ import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
import net.corda.client.rpc.internal.drainAndShutdown
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
@ -30,9 +31,14 @@ import net.corda.testing.driver.driver
import net.corda.testing.internal.IntegrationTest import net.corda.testing.internal.IntegrationTest
import net.corda.testing.internal.IntegrationTestSchemas import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.internal.toDatabaseSchemaName import net.corda.testing.internal.toDatabaseSchemaName
import net.corda.testing.internal.chooseIdentity
import net.corda.testing.node.User import net.corda.testing.node.User
import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
import org.junit.* import org.junit.*
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -95,27 +101,54 @@ class P2PFlowsDrainingModeTest : IntegrationTest() {
} }
} }
@StartableByRPC @Test
@InitiatingFlow fun `clean shutdown by draining`() {
class InitiateSessionFlow(private val counterParty: Party) : FlowLogic<String>() {
@Suspendable driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = portAllocation)) {
override fun call(): String {
val session = initiateFlow(counterParty) val nodeA = startNode(rpcUsers = users).getOrThrow()
session.send("Hi there") val nodeB = startNode(rpcUsers = users).getOrThrow()
return session.receive<String>().unwrap { it } var successful = false
val latch = CountDownLatch(1)
nodeB.rpc.setFlowsDrainingModeEnabled(true)
IntRange(1, 10).forEach { nodeA.rpc.startFlow(::InitiateSessionFlow, nodeB.nodeInfo.chooseIdentity()) }
nodeA.rpc.drainAndShutdown()
.doOnError { error ->
error.printStackTrace()
successful = false
}
.doOnCompleted { successful = true }
.doAfterTerminate { latch.countDown() }
.subscribe()
nodeB.rpc.setFlowsDrainingModeEnabled(false)
latch.await()
assertThat(successful).isTrue()
} }
} }
}
@InitiatedBy(InitiateSessionFlow::class) @StartableByRPC
class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic<Unit>() { @InitiatingFlow
class InitiateSessionFlow(private val counterParty: Party) : FlowLogic<String>() {
@Suspendable @Suspendable
override fun call() { override fun call(): String {
val message = initiatingSession.receive<String>().unwrap { it } val session = initiateFlow(counterParty)
initiatingSession.send("$message answer") session.send("Hi there")
} return session.receive<String>().unwrap { it }
}
}
@InitiatedBy(InitiateSessionFlow::class)
class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val message = initiatingSession.receive<String>().unwrap { it }
initiatingSession.send("$message answer")
} }
} }

View File

@ -93,6 +93,7 @@ public class CordaCaplet extends Capsule {
(new File(baseDir, "cordapps")).mkdir(); (new File(baseDir, "cordapps")).mkdir();
// Add additional directories of JARs to the classpath (at the end). e.g. for JDBC drivers // Add additional directories of JARs to the classpath (at the end). e.g. for JDBC drivers
augmentClasspath((List<Path>) cp, new File(baseDir, "drivers")); augmentClasspath((List<Path>) cp, new File(baseDir, "drivers"));
augmentClasspath((List<Path>) cp, new File(baseDir, "cordapps"));
try { try {
List<String> jarDirs = nodeConfig.getStringList("jarDirs"); List<String> jarDirs = nodeConfig.getStringList("jarDirs");
log(LOG_VERBOSE, "Configured JAR directories = " + jarDirs); log(LOG_VERBOSE, "Configured JAR directories = " + jarDirs);

View File

@ -19,7 +19,16 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext import net.corda.core.context.InvocationContext
import net.corda.core.crypto.newSecureRandom import net.corda.core.crypto.newSecureRandom
import net.corda.core.crypto.sign import net.corda.core.crypto.sign
import net.corda.core.flows.* import net.corda.core.flows.ContractUpgradeFlow
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.NotaryChangeFlow
import net.corda.core.flows.NotaryFlow
import net.corda.core.flows.StartableByService
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate import net.corda.core.identity.PartyAndCertificate
@ -28,9 +37,24 @@ import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.uncheckedCast import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.* import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.* import net.corda.core.messaging.FlowHandle
import net.corda.core.node.services.* import net.corda.core.messaging.FlowHandleImpl
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.FlowProgressHandleImpl
import net.corda.core.messaging.RPCOps
import net.corda.core.node.AppServiceHub
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.ServicesForResolution
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.node.services.CordaService
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.KeyManagementService
import net.corda.core.node.services.NotaryService
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.serialization.SerializationWhitelist import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
@ -50,21 +74,61 @@ import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.ContractUpgradeHandler import net.corda.node.services.ContractUpgradeHandler
import net.corda.node.services.FinalityHandler import net.corda.node.services.FinalityHandler
import net.corda.node.services.NotaryChangeHandler import net.corda.node.services.NotaryChangeHandler
import net.corda.node.services.api.* import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.config.* import net.corda.node.services.api.DummyAuditService
import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.IdentityServiceInternal
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.SchedulerService
import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.api.VaultServiceInternal
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.services.config.BFTSMaRtConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.NotaryConfig
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.config.shell.toShellConfig import net.corda.node.services.config.shell.toShellConfig
import net.corda.node.services.config.shouldInitCrashShell
import net.corda.node.services.events.NodeSchedulerService import net.corda.node.services.events.NodeSchedulerService
import net.corda.node.services.events.ScheduledActivityObserver import net.corda.node.services.events.ScheduledActivityObserver
import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.* import net.corda.node.services.network.NetworkMapCacheImpl
import net.corda.node.services.persistence.* import net.corda.node.services.network.NetworkMapClient
import net.corda.node.services.network.NetworkMapUpdater
import net.corda.node.services.network.NodeInfoWatcher
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.persistence.AbstractPartyDescriptor
import net.corda.node.services.persistence.AbstractPartyToX500NameAsStringConverter
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.persistence.NodePropertiesPersistentStore
import net.corda.node.services.persistence.RunOnceService
import net.corda.node.services.schema.HibernateObserver import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.* import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.transactions.* import net.corda.node.services.statemachine.SingleThreadedStateMachineManager
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.statemachine.appName
import net.corda.node.services.statemachine.flowVersionAndInitiatingClass
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
import net.corda.node.services.transactions.BFTSMaRt
import net.corda.node.services.transactions.MySQLNonValidatingNotaryService
import net.corda.node.services.transactions.MySQLValidatingNotaryService
import net.corda.node.services.transactions.RaftNonValidatingNotaryService
import net.corda.node.services.transactions.RaftUniquenessProvider
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSoftLockManager import net.corda.node.services.vault.VaultSoftLockManager
@ -74,10 +138,11 @@ import net.corda.node.utilities.NodeBuildProperties
import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.NodeInfoAndSigned import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.persistence.*
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.HibernateConfiguration import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.nodeapi.internal.persistence.SchemaMigration
import net.corda.nodeapi.internal.persistence.isH2Database
import net.corda.nodeapi.internal.storeLegalIdentity import net.corda.nodeapi.internal.storeLegalIdentity
import net.corda.tools.shell.InteractiveShell import net.corda.tools.shell.InteractiveShell
import org.apache.activemq.artemis.utils.ReusableLatch import org.apache.activemq.artemis.utils.ReusableLatch
@ -99,6 +164,7 @@ import java.time.Clock
import java.time.Duration import java.time.Duration
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import kotlin.collections.set import kotlin.collections.set
import kotlin.reflect.KClass import kotlin.reflect.KClass
import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
@ -159,6 +225,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected lateinit var networkMapUpdater: NetworkMapUpdater protected lateinit var networkMapUpdater: NetworkMapUpdater
lateinit var securityManager: RPCSecurityManager lateinit var securityManager: RPCSecurityManager
private val shutdownExecutor = Executors.newSingleThreadExecutor()
/** Completes once the node has successfully registered with the network map service /** Completes once the node has successfully registered with the network map service
* or has loaded network map data from local database */ * or has loaded network map data from local database */
val nodeReadyFuture: CordaFuture<Unit> get() = _nodeReadyFuture val nodeReadyFuture: CordaFuture<Unit> get() = _nodeReadyFuture
@ -173,7 +241,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
/** The implementation of the [CordaRPCOps] interface used by this node. */ /** The implementation of the [CordaRPCOps] interface used by this node. */
open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence, smm: StateMachineManager): CordaRPCOps { open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence, smm: StateMachineManager): CordaRPCOps {
return SecureCordaRPCOps(services, smm, database, flowStarter)
return SecureCordaRPCOps(services, smm, database, flowStarter, { shutdownExecutor.submit { stop() } })
} }
private fun initCertificate() { private fun initCertificate() {
@ -657,7 +726,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val props = configuration.dataSourceProperties val props = configuration.dataSourceProperties
if (props.isEmpty()) throw DatabaseConfigurationException("There must be a database configured.") if (props.isEmpty()) throw DatabaseConfigurationException("There must be a database configured.")
val database = configureDatabase(props, configuration.database, identityService, schemaService, cordappLoader.appClassLoader) val database = configureDatabase(props, configuration.database, identityService, schemaService)
// Now log the vendor string as this will also cause a connection to be tested eagerly. // Now log the vendor string as this will also cause a connection to be tested eagerly.
logVendorString(database, log) logVendorString(database, log)
runOnStop += database::close runOnStop += database::close
@ -735,6 +804,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
toRun() toRun()
} }
runOnStop.clear() runOnStop.clear()
shutdownExecutor.shutdown()
_started = null _started = null
} }
@ -897,8 +967,7 @@ internal class NetworkMapCacheEmptyException : Exception()
fun configureDatabase(hikariProperties: Properties, fun configureDatabase(hikariProperties: Properties,
databaseConfig: DatabaseConfig, databaseConfig: DatabaseConfig,
identityService: IdentityService, identityService: IdentityService,
schemaService: SchemaService = NodeSchemaService(), schemaService: SchemaService = NodeSchemaService()): CordaPersistence {
cordappClassLoader: ClassLoader? = null): CordaPersistence {
// Register the AbstractPartyDescriptor so Hibernate doesn't warn when encountering AbstractParty. Unfortunately // Register the AbstractPartyDescriptor so Hibernate doesn't warn when encountering AbstractParty. Unfortunately
// Hibernate warns about not being able to find a descriptor if we don't provide one, but won't use it by default // Hibernate warns about not being able to find a descriptor if we don't provide one, but won't use it by default
// so we end up providing both descriptor and converter. We should re-examine this in later versions to see if // so we end up providing both descriptor and converter. We should re-examine this in later versions to see if
@ -911,7 +980,6 @@ fun configureDatabase(hikariProperties: Properties,
schemaService.schemaOptions.keys, schemaService.schemaOptions.keys,
dataSource, dataSource,
!isH2Database(jdbcUrl), !isH2Database(jdbcUrl),
databaseConfig, databaseConfig).nodeStartup()
cordappClassLoader ?: Thread.currentThread().contextClassLoader).nodeStartup() return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, attributeConverters)
return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, attributeConverters, cordappClassLoader)
} }

View File

@ -56,7 +56,8 @@ internal class CordaRPCOpsImpl(
private val services: ServiceHubInternal, private val services: ServiceHubInternal,
private val smm: StateMachineManager, private val smm: StateMachineManager,
private val database: CordaPersistence, private val database: CordaPersistence,
private val flowStarter: FlowStarter private val flowStarter: FlowStarter,
private val shutdownNode: () -> Unit
) : CordaRPCOps { ) : CordaRPCOps {
override fun networkMapSnapshot(): List<NodeInfo> { override fun networkMapSnapshot(): List<NodeInfo> {
val (snapshot, updates) = networkMapFeed() val (snapshot, updates) = networkMapFeed()
@ -311,6 +312,10 @@ internal class CordaRPCOpsImpl(
return services.nodeProperties.flowsDrainingMode.isEnabled() return services.nodeProperties.flowsDrainingMode.isEnabled()
} }
override fun shutdown() {
shutdownNode.invoke()
}
private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo { private fun stateMachineInfoFromFlowLogic(flowLogic: FlowLogic<*>): StateMachineInfo {
return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context) return StateMachineInfo(flowLogic.runId, flowLogic.javaClass.name, flowLogic.stateMachine.context.toFlowInitiator(), flowLogic.track(), flowLogic.stateMachine.context)
} }

View File

@ -182,6 +182,8 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val
override fun isFlowsDrainingModeEnabled(): Boolean = guard("isFlowsDrainingModeEnabled", implementation::isFlowsDrainingModeEnabled) override fun isFlowsDrainingModeEnabled(): Boolean = guard("isFlowsDrainingModeEnabled", implementation::isFlowsDrainingModeEnabled)
override fun shutdown() = guard("shutdown", implementation::shutdown)
// TODO change to KFunction reference after Kotlin fixes https://youtrack.jetbrains.com/issue/KT-12140 // TODO change to KFunction reference after Kotlin fixes https://youtrack.jetbrains.com/issue/KT-12140
private inline fun <RESULT> guard(methodName: String, action: () -> RESULT) = guard(methodName, emptyList(), action) private inline fun <RESULT> guard(methodName: String, action: () -> RESULT) = guard(methodName, emptyList(), action)

View File

@ -24,7 +24,8 @@ class SecureCordaRPCOps(services: ServiceHubInternal,
smm: StateMachineManager, smm: StateMachineManager,
database: CordaPersistence, database: CordaPersistence,
flowStarter: FlowStarter, flowStarter: FlowStarter,
val unsafe: CordaRPCOps = CordaRPCOpsImpl(services, smm, database, flowStarter)) : CordaRPCOps by RpcAuthorisationProxy(unsafe, ::rpcContext) { shutdownNode: () -> Unit,
val unsafe: CordaRPCOps = CordaRPCOpsImpl(services, smm, database, flowStarter, shutdownNode)) : CordaRPCOps by RpcAuthorisationProxy(unsafe, ::rpcContext) {
/** /**
* Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed * Returns the RPC protocol version, which is the same the node's Platform Version. Exists since version 1 so guaranteed

View File

@ -95,7 +95,7 @@ class CordaRPCOpsImplTest {
fun setup() { fun setup() {
mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.finance.contracts.asset", "net.corda.finance.schemas")) mockNet = InternalMockNetwork(cordappPackages = listOf("net.corda.finance.contracts.asset", "net.corda.finance.schemas"))
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
rpc = SecureCordaRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database, aliceNode.services) rpc = SecureCordaRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database, aliceNode.services, { })
CURRENT_RPC_CONTEXT.set(RpcAuthContext(InvocationContext.rpc(testActor()), buildSubject("TEST_USER", emptySet()))) CURRENT_RPC_CONTEXT.set(RpcAuthContext(InvocationContext.rpc(testActor()), buildSubject("TEST_USER", emptySet())))
mockNet.runNetwork() mockNet.runNetwork()

View File

@ -59,7 +59,7 @@ class NetworkMapUpdaterTest {
private val networkMapCache = createMockNetworkMapCache() private val networkMapCache = createMockNetworkMapCache()
private val nodeInfoMap = ConcurrentHashMap<SecureHash, SignedNodeInfo>() private val nodeInfoMap = ConcurrentHashMap<SecureHash, SignedNodeInfo>()
private val networkParamsMap = HashMap<SecureHash, NetworkParameters>() private val networkParamsMap = HashMap<SecureHash, NetworkParameters>()
private val networkMapCa: CertificateAndKeyPair = createDevNetworkMapCa() private val networkMapCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa()
private val cacheExpiryMs = 100 private val cacheExpiryMs = 100
private val networkMapClient = createMockNetworkMapClient() private val networkMapClient = createMockNetworkMapClient()
private val scheduler = TestScheduler() private val scheduler = TestScheduler()
@ -264,7 +264,7 @@ class NetworkMapUpdaterTest {
} }
on { getNetworkParameters(any()) }.then { on { getNetworkParameters(any()) }.then {
val paramsHash: SecureHash = uncheckedCast(it.arguments[0]) val paramsHash: SecureHash = uncheckedCast(it.arguments[0])
networkParamsMap[paramsHash]?.signWithCert(networkMapCa.keyPair.private, networkMapCa.certificate) networkParamsMap[paramsHash]?.let { networkMapCertAndKeyPair.sign(it) }
} }
on { ackNetworkParametersUpdate(any()) }.then { on { ackNetworkParametersUpdate(any()) }.then {
Unit Unit

View File

@ -142,7 +142,7 @@ open class MockServices private constructor(
val cordappLoader = CordappLoader.createWithTestPackages(cordappPackages) val cordappLoader = CordappLoader.createWithTestPackages(cordappPackages)
val dataSourceProps = makeTestDataSourceProperties(initialIdentity.name.organisation) val dataSourceProps = makeTestDataSourceProperties(initialIdentity.name.organisation)
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas) val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
val database = configureDatabase(dataSourceProps, makeTestDatabaseProperties(initialIdentity.name.organisation), identityService, schemaService, cordappLoader.appClassLoader) val database = configureDatabase(dataSourceProps, makeTestDatabaseProperties(initialIdentity.name.organisation), identityService, schemaService)
val mockService = database.transaction { val mockService = database.transaction {
object : MockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys) { object : MockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys) {
override val vaultService: VaultService = makeVaultService(database.hibernateConfig, schemaService) override val vaultService: VaultService = makeVaultService(database.hibernateConfig, schemaService)

View File

@ -10,6 +10,8 @@
package net.corda.testing.node.internal package net.corda.testing.node.internal
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.core.CordaException import net.corda.core.CordaException
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext import net.corda.core.context.InvocationContext
@ -24,8 +26,10 @@ import net.corda.core.utilities.seconds
import net.corda.node.services.api.StartedNodeServices import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.messaging.Message import net.corda.node.services.messaging.Message
import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.MessagingService
import net.corda.testing.driver.NodeHandle
import net.corda.testing.internal.chooseIdentity import net.corda.testing.internal.chooseIdentity
import net.corda.testing.node.InMemoryMessagingNetwork import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.User
import net.corda.testing.node.testContext import net.corda.testing.node.testContext
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.net.Socket import java.net.Socket
@ -123,4 +127,4 @@ internal interface InternalMockMessagingService : MessagingService {
fun stop() fun stop()
} }
fun CordaRPCClient.start(user: User) = start(user.username, user.password)

View File

@ -13,7 +13,7 @@ package net.corda.testing.node.internal
import net.corda.client.mock.Generator import net.corda.client.mock.Generator
import net.corda.client.rpc.internal.KryoClientSerializationScheme import net.corda.client.rpc.internal.KryoClientSerializationScheme
import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.core.concurrent.CordaFuture import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.AuthServiceId import net.corda.core.context.AuthServiceId
import net.corda.core.context.Trace import net.corda.core.context.Trace
@ -69,7 +69,7 @@ import net.corda.nodeapi.internal.config.User as InternalUser
inline fun <reified I : RPCOps> RPCDriverDSL.startInVmRpcClient( inline fun <reified I : RPCOps> RPCDriverDSL.startInVmRpcClient(
username: String = rpcTestUser.username, username: String = rpcTestUser.username,
password: String = rpcTestUser.password, password: String = rpcTestUser.password,
configuration: RPCClientConfiguration = RPCClientConfiguration.default configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
) = startInVmRpcClient(I::class.java, username, password, configuration) ) = startInVmRpcClient(I::class.java, username, password, configuration)
inline fun <reified I : RPCOps> RPCDriverDSL.startRandomRpcClient( inline fun <reified I : RPCOps> RPCDriverDSL.startRandomRpcClient(
@ -82,7 +82,7 @@ inline fun <reified I : RPCOps> RPCDriverDSL.startRpcClient(
rpcAddress: NetworkHostAndPort, rpcAddress: NetworkHostAndPort,
username: String = rpcTestUser.username, username: String = rpcTestUser.username,
password: String = rpcTestUser.password, password: String = rpcTestUser.password,
configuration: RPCClientConfiguration = RPCClientConfiguration.default configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
) = startRpcClient(I::class.java, rpcAddress, username, password, configuration) ) = startRpcClient(I::class.java, rpcAddress, username, password, configuration)
data class RpcBrokerHandle( data class RpcBrokerHandle(
@ -263,7 +263,7 @@ data class RPCDriverDSL(
rpcOpsClass: Class<I>, rpcOpsClass: Class<I>,
username: String = rpcTestUser.username, username: String = rpcTestUser.username,
password: String = rpcTestUser.password, password: String = rpcTestUser.password,
configuration: RPCClientConfiguration = RPCClientConfiguration.default configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
): CordaFuture<I> { ): CordaFuture<I> {
return driverDSL.executorService.fork { return driverDSL.executorService.fork {
val client = RPCClient<I>(inVmClientTransportConfiguration, configuration) val client = RPCClient<I>(inVmClientTransportConfiguration, configuration)
@ -334,7 +334,7 @@ data class RPCDriverDSL(
rpcAddress: NetworkHostAndPort, rpcAddress: NetworkHostAndPort,
username: String = rpcTestUser.username, username: String = rpcTestUser.username,
password: String = rpcTestUser.password, password: String = rpcTestUser.password,
configuration: RPCClientConfiguration = RPCClientConfiguration.default configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
): CordaFuture<I> { ): CordaFuture<I> {
return driverDSL.executorService.fork { return driverDSL.executorService.fork {
val client = RPCClient<I>(ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), rpcAddress, null), configuration) val client = RPCClient<I>(ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), rpcAddress, null), configuration)

View File

@ -45,7 +45,7 @@ import javax.ws.rs.core.Response.status
class NetworkMapServer(private val cacheTimeout: Duration, class NetworkMapServer(private val cacheTimeout: Duration,
hostAndPort: NetworkHostAndPort, hostAndPort: NetworkHostAndPort,
private val networkMapCa: CertificateAndKeyPair = createDevNetworkMapCa(), private val networkMapCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa(),
private val myHostNameValue: String = "test.host.name", private val myHostNameValue: String = "test.host.name",
vararg additionalServices: Any) : Closeable { vararg additionalServices: Any) : Closeable {
companion object { companion object {
@ -118,9 +118,7 @@ class NetworkMapServer(private val cacheTimeout: Duration,
inner class InMemoryNetworkMapService { inner class InMemoryNetworkMapService {
private val nodeInfoMap = mutableMapOf<SecureHash, SignedNodeInfo>() private val nodeInfoMap = mutableMapOf<SecureHash, SignedNodeInfo>()
val latestAcceptedParametersMap = mutableMapOf<PublicKey, SecureHash>() val latestAcceptedParametersMap = mutableMapOf<PublicKey, SecureHash>()
private val signedNetParams by lazy { private val signedNetParams by lazy { networkMapCertAndKeyPair.sign(networkParameters) }
networkParameters.signWithCert(networkMapCa.keyPair.private, networkMapCa.certificate)
}
@POST @POST
@Path("publish") @Path("publish")
@ -153,7 +151,7 @@ class NetworkMapServer(private val cacheTimeout: Duration,
@Produces(MediaType.APPLICATION_OCTET_STREAM) @Produces(MediaType.APPLICATION_OCTET_STREAM)
fun getNetworkMap(): Response { fun getNetworkMap(): Response {
val networkMap = NetworkMap(nodeInfoMap.keys.toList(), signedNetParams.raw.hash, parametersUpdate) val networkMap = NetworkMap(nodeInfoMap.keys.toList(), signedNetParams.raw.hash, parametersUpdate)
val signedNetworkMap = networkMap.signWithCert(networkMapCa.keyPair.private, networkMapCa.certificate) val signedNetworkMap = networkMapCertAndKeyPair.sign(networkMap)
return Response.ok(signedNetworkMap.serialize().bytes).header("Cache-Control", "max-age=${cacheTimeout.seconds}").build() return Response.ok(signedNetworkMap.serialize().bytes).header("Cache-Control", "max-age=${cacheTimeout.seconds}").build()
} }
@ -182,8 +180,10 @@ class NetworkMapServer(private val cacheTimeout: Duration,
val requestedParameters = if (requestedHash == signedNetParams.raw.hash) { val requestedParameters = if (requestedHash == signedNetParams.raw.hash) {
signedNetParams signedNetParams
} else if (requestedHash == nextNetworkParameters?.serialize()?.hash) { } else if (requestedHash == nextNetworkParameters?.serialize()?.hash) {
nextNetworkParameters?.signWithCert(networkMapCa.keyPair.private, networkMapCa.certificate) nextNetworkParameters?.let { networkMapCertAndKeyPair.sign(it) }
} else null } else {
null
}
requireNotNull(requestedParameters) requireNotNull(requestedParameters)
return Response.ok(requestedParameters!!.serialize().bytes).build() return Response.ok(requestedParameters!!.serialize().bytes).build()
} }

View File

@ -10,13 +10,21 @@
package net.corda.tools.shell; package net.corda.tools.shell;
import net.corda.core.messaging.*; import com.google.common.collect.Lists;
import net.corda.client.jackson.*; import com.google.common.collect.Maps;
import org.crsh.cli.*; import net.corda.client.jackson.StringToMethodCallParser;
import org.crsh.command.*; import net.corda.core.messaging.CordaRPCOps;
import org.crsh.cli.Argument;
import org.crsh.cli.Command;
import org.crsh.cli.Man;
import org.crsh.cli.Usage;
import org.crsh.command.InvocationContext;
import org.jetbrains.annotations.NotNull;
import java.util.*; import java.util.*;
import static java.util.Comparator.comparing;
// Note that this class cannot be converted to Kotlin because CRaSH does not understand InvocationContext<Map<?, ?>> which // Note that this class cannot be converted to Kotlin because CRaSH does not understand InvocationContext<Map<?, ?>> which
// is the closest you can get in Kotlin to raw types. // is the closest you can get in Kotlin to raw types.
@ -39,8 +47,7 @@ public class RunShellCommand extends InteractiveShellCommand {
emitHelp(context, parser); emitHelp(context, parser);
return null; return null;
} }
return InteractiveShell.runRPCFromString(command, out, context, ops(), objectMapper(), isSsh());
return InteractiveShell.runRPCFromString(command, out, context, ops(), objectMapper());
} }
private void emitHelp(InvocationContext<Map> context, StringToMethodCallParser<CordaRPCOps> parser) { private void emitHelp(InvocationContext<Map> context, StringToMethodCallParser<CordaRPCOps> parser) {
@ -48,21 +55,37 @@ public class RunShellCommand extends InteractiveShellCommand {
// Each element we emit is a map of column -> content. // Each element we emit is a map of column -> content.
Set<Map.Entry<String, String>> entries = parser.getAvailableCommands().entrySet(); Set<Map.Entry<String, String>> entries = parser.getAvailableCommands().entrySet();
ArrayList<Map.Entry<String, String>> entryList = new ArrayList<>(entries); ArrayList<Map.Entry<String, String>> entryList = new ArrayList<>(entries);
entryList.sort(Comparator.comparing(Map.Entry::getKey)); entryList.sort(comparing(Map.Entry::getKey));
for (Map.Entry<String, String> entry : entryList) { for (Map.Entry<String, String> entry : entryList) {
// Skip these entries as they aren't really interesting for the user. // Skip these entries as they aren't really interesting for the user.
if (entry.getKey().equals("startFlowDynamic")) continue; if (entry.getKey().equals("startFlowDynamic")) continue;
if (entry.getKey().equals("getProtocolVersion")) continue; if (entry.getKey().equals("getProtocolVersion")) continue;
// Use a LinkedHashMap to ensure that the Command column comes first.
Map<String, String> m = new LinkedHashMap<>();
m.put("Command", entry.getKey());
m.put("Parameter types", entry.getValue());
try { try {
context.provide(m); context.provide(commandAndDesc(entry.getKey(), entry.getValue()));
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
Lists.newArrayList(
commandAndDesc("shutdown", "Shuts node down (immediately)"),
commandAndDesc("gracefulShutdown", "Shuts node down gracefully, waiting for all flows to complete first.")
).forEach(stringStringMap -> {
try {
context.provide(stringStringMap);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@NotNull
private Map<String, String> commandAndDesc(String command, String description) {
// Use a LinkedHashMap to ensure that the Command column comes first.
Map<String, String> abruptShutdown = Maps.newLinkedHashMap();
abruptShutdown.put("Command", command);
abruptShutdown.put("Parameter types", description);
return abruptShutdown;
} }
} }

View File

@ -34,7 +34,7 @@ class CordaAuthenticationPlugin(private val rpcOps: (username: String, credentia
} }
try { try {
val ops = rpcOps(username, credential) val ops = rpcOps(username, credential)
return CordaSSHAuthInfo(true, ops) return CordaSSHAuthInfo(true, ops, isSsh = true)
} catch (e: ActiveMQSecurityException) { } catch (e: ActiveMQSecurityException) {
logger.warn(e.message) logger.warn(e.message)
} catch (e: Exception) { } catch (e: Exception) {

View File

@ -16,7 +16,7 @@ import net.corda.tools.shell.InteractiveShell.createYamlInputMapper
import net.corda.tools.shell.utlities.ANSIProgressRenderer import net.corda.tools.shell.utlities.ANSIProgressRenderer
import org.crsh.auth.AuthInfo import org.crsh.auth.AuthInfo
class CordaSSHAuthInfo(val successful: Boolean, val rpcOps: CordaRPCOps, val ansiProgressRenderer: ANSIProgressRenderer? = null) : AuthInfo { class CordaSSHAuthInfo(val successful: Boolean, val rpcOps: CordaRPCOps, val ansiProgressRenderer: ANSIProgressRenderer? = null, val isSsh: Boolean = false) : AuthInfo {
override fun isSuccessful(): Boolean = successful override fun isSuccessful(): Boolean = successful
val yamlInputMapper: ObjectMapper by lazy { val yamlInputMapper: ObjectMapper by lazy {

View File

@ -11,13 +11,16 @@
package net.corda.tools.shell package net.corda.tools.shell
import com.fasterxml.jackson.core.JsonGenerator import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParser import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.* import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.module.SimpleModule import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.google.common.io.Closeables
import net.corda.client.jackson.JacksonSupport import net.corda.client.jackson.JacksonSupport
import net.corda.client.jackson.StringToMethodCallParser import net.corda.client.jackson.StringToMethodCallParser
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.PermissionException import net.corda.client.rpc.PermissionException
import net.corda.client.rpc.internal.createCordaRPCClientWithSslAndClassLoader import net.corda.client.rpc.internal.createCordaRPCClientWithSslAndClassLoader
import net.corda.core.CordaException import net.corda.core.CordaException
@ -28,13 +31,9 @@ import net.corda.core.identity.Party
import net.corda.core.internal.* import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.*
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.internal.config.SSLConfiguration
import net.corda.tools.shell.utlities.ANSIProgressRenderer import net.corda.tools.shell.utlities.ANSIProgressRenderer
import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer import net.corda.tools.shell.utlities.StdoutANSIProgressRenderer
import org.crsh.command.InvocationContext import org.crsh.command.InvocationContext
@ -60,12 +59,13 @@ import org.json.JSONObject
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import rx.Observable import rx.Observable
import rx.Subscriber import rx.Subscriber
import java.io.* import java.io.FileDescriptor
import java.io.FileInputStream
import java.io.InputStream
import java.io.PrintWriter
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.lang.reflect.UndeclaredThrowableException import java.lang.reflect.UndeclaredThrowableException
import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.Paths
import java.util.* import java.util.*
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
@ -83,69 +83,30 @@ import kotlin.concurrent.thread
// TODO: Resurrect or reimplement the mail plugin. // TODO: Resurrect or reimplement the mail plugin.
// TODO: Make it notice new shell commands added after the node started. // TODO: Make it notice new shell commands added after the node started.
data class SSHDConfiguration(val port: Int) {
companion object {
internal const val INVALID_PORT_FORMAT = "Invalid port: %s"
private const val MISSING_PORT_FORMAT = "Missing port: %s"
/**
* Parses a string of the form port into a [SSHDConfiguration].
* @throws IllegalArgumentException if the port is missing or the string is garbage.
*/
@JvmStatic
fun parse(str: String): SSHDConfiguration {
require(!str.isNullOrBlank()) { SSHDConfiguration.MISSING_PORT_FORMAT.format(str) }
val port = try {
str.toInt()
} catch (ex: NumberFormatException) {
throw IllegalArgumentException("Port syntax is invalid, expected port")
}
return SSHDConfiguration(port)
}
}
init {
require(port in (0..0xffff)) { INVALID_PORT_FORMAT.format(port) }
}
}
data class ShellSslOptions(override val sslKeystore: Path, override val keyStorePassword: String, override val trustStoreFile:Path, override val trustStorePassword: String) : SSLConfiguration {
override val certificatesDirectory: Path get() = Paths.get("")
}
data class ShellConfiguration(
val commandsDirectory: Path,
val cordappsDirectory: Path? = null,
var user: String = "",
var password: String = "",
val hostAndPort: NetworkHostAndPort,
val ssl: ShellSslOptions? = null,
val sshdPort: Int? = null,
val sshHostKeyDirectory: Path? = null,
val noLocalShell: Boolean = false) {
companion object {
const val SSH_PORT = 2222
const val COMMANDS_DIR = "shell-commands"
const val CORDAPPS_DIR = "cordapps"
const val SSHD_HOSTKEY_DIR = "ssh"
}
}
object InteractiveShell { object InteractiveShell {
private val log = LoggerFactory.getLogger(javaClass) private val log = LoggerFactory.getLogger(javaClass)
private lateinit var rpcOps: (username: String, credentials: String) -> CordaRPCOps private lateinit var rpcOps: (username: String, credentials: String) -> CordaRPCOps
private lateinit var connection: CordaRPCOps private lateinit var ops: CordaRPCOps
private lateinit var connection: CordaRPCConnection
private var shell: Shell? = null private var shell: Shell? = null
private var classLoader: ClassLoader? = null private var classLoader: ClassLoader? = null
private lateinit var shellConfiguration: ShellConfiguration
private var onExit: () -> Unit = {}
/** /**
* Starts an interactive shell connected to the local terminal. This shell gives administrator access to the node * Starts an interactive shell connected to the local terminal. This shell gives administrator access to the node
* internals. * internals.
*/ */
fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) { fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) {
shellConfiguration = configuration
rpcOps = { username: String, credentials: String -> rpcOps = { username: String, credentials: String ->
val client = createCordaRPCClientWithSslAndClassLoader(hostAndPort = configuration.hostAndPort, val client = createCordaRPCClientWithSslAndClassLoader(hostAndPort = configuration.hostAndPort,
sslConfiguration = configuration.ssl, classLoader = classLoader) configuration = object : CordaRPCClientConfiguration {
client.start(username, credentials).proxy override val maxReconnectAttempts = 1
},
sslConfiguration = configuration.ssl,
classLoader = classLoader)
this.connection = client.start(username, credentials)
connection.proxy
} }
InteractiveShell.classLoader = classLoader InteractiveShell.classLoader = classLoader
val runSshDaemon = configuration.sshdPort != null val runSshDaemon = configuration.sshdPort != null
@ -170,6 +131,7 @@ object InteractiveShell {
} }
fun runLocalShell(onExit: () -> Unit = {}) { fun runLocalShell(onExit: () -> Unit = {}) {
this.onExit = onExit
val terminal = TerminalFactory.create() val terminal = TerminalFactory.create()
val consoleReader = ConsoleReader("Corda", FileInputStream(FileDescriptor.`in`), System.out, terminal) val consoleReader = ConsoleReader("Corda", FileInputStream(FileDescriptor.`in`), System.out, terminal)
val jlineProcessor = JLineProcessor(terminal.isAnsiSupported, shell, consoleReader, System.out) val jlineProcessor = JLineProcessor(terminal.isAnsiSupported, shell, consoleReader, System.out)
@ -215,18 +177,18 @@ object InteractiveShell {
return super.getPlugins().filterNot { it is JavaLanguage } + CordaAuthenticationPlugin(rpcOps) return super.getPlugins().filterNot { it is JavaLanguage } + CordaAuthenticationPlugin(rpcOps)
} }
} }
val attributes = emptyMap<String,Any>() val attributes = emptyMap<String, Any>()
val context = PluginContext(discovery, attributes, commandsFS, confFS, classLoader) val context = PluginContext(discovery, attributes, commandsFS, confFS, classLoader)
context.refresh() context.refresh()
this.config = config this.config = config
start(context) start(context)
connection = makeRPCOps(rpcOps, localUserName, localUserPassword) ops = makeRPCOps(rpcOps, localUserName, localUserPassword)
return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, connection, StdoutANSIProgressRenderer)) return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, ops, StdoutANSIProgressRenderer))
} }
} }
fun nodeInfo() = try { fun nodeInfo() = try {
connection.nodeInfo() ops.nodeInfo()
} catch (e: UndeclaredThrowableException) { } catch (e: UndeclaredThrowableException) {
throw e.cause ?: e throw e.cause ?: e
} }
@ -421,14 +383,16 @@ object InteractiveShell {
} }
@JvmStatic @JvmStatic
fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps, om: ObjectMapper): Any? { fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps, om: ObjectMapper, isSsh: Boolean = false): Any? {
val cmd = input.joinToString(" ").trim { it <= ' ' } val cmd = input.joinToString(" ").trim { it <= ' ' }
if (cmd.toLowerCase().startsWith("startflow")) { if (cmd.startsWith("startflow", ignoreCase = true)) {
// The flow command provides better support and startFlow requires special handling anyway due to // The flow command provides better support and startFlow requires special handling anyway due to
// the generic startFlow RPC interface which offers no type information with which to parse the // the generic startFlow RPC interface which offers no type information with which to parse the
// string form of the command. // string form of the command.
out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow) out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow)
return null return null
} else if (cmd.substringAfter(" ").trim().equals("gracefulShutdown", ignoreCase = true)) {
return InteractiveShell.gracefulShutdown(out, cordaRPCOps, isSsh)
} }
var result: Any? = null var result: Any? = null
@ -467,6 +431,68 @@ object InteractiveShell {
return result return result
} }
@JvmStatic
fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps, isSsh: Boolean = false) {
fun display(statements: RenderPrintWriter.() -> Unit) {
statements.invoke(userSessionOut)
userSessionOut.flush()
}
var isShuttingDown = false
try {
display {
println("Orchestrating a clean shutdown...")
println("...enabling draining mode")
}
cordaRPCOps.setFlowsDrainingModeEnabled(true)
display {
println("...waiting for in-flight flows to be completed")
}
cordaRPCOps.pendingFlowsCount().updates
.doOnError { error ->
log.error(error.message)
throw error
}
.doOnNext { remaining ->
display {
println("...remaining: ${remaining.first}/${remaining.second}")
}
}
.doOnCompleted {
if (isSsh) {
// print in the original Shell process
System.out.println("Shutting down the node via remote SSH session (it may take a while)")
}
display {
println("Shutting down the node (it may take a while)")
}
cordaRPCOps.shutdown()
isShuttingDown = true
connection.forceClose()
display {
println("...done, quitting standalone shell now.")
}
onExit.invoke()
}.toBlocking().single()
} catch (e: StringToMethodCallParser.UnparseableCallException) {
display {
println(e.message, Color.red)
println("Please try 'man run' to learn what syntax is acceptable")
}
} catch (e: Exception) {
if (!isShuttingDown) {
display {
println("RPC failed: ${e.rootCause}", Color.red)
}
}
} finally {
InputStreamSerializer.invokeContext = null
InputStreamDeserializer.closeAll()
}
}
private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter): CordaFuture<Unit> { private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter): CordaFuture<Unit> {
val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) } val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) }
@ -528,6 +554,7 @@ object InteractiveShell {
return printNextElements(response.updates, printerFun, out) return printNextElements(response.updates, printerFun, out)
} }
if (response is Observable<*>) { if (response is Observable<*>) {
return printNextElements(response, printerFun, out) return printNextElements(response, printerFun, out)
} }
@ -542,94 +569,4 @@ object InteractiveShell {
return subscriber.future return subscriber.future
} }
//region Extra serializers
//
// These serializers are used to enable the user to specify objects that aren't natural data containers in the shell,
// and for the shell to print things out that otherwise wouldn't be usefully printable.
private object ObservableSerializer : JsonSerializer<Observable<*>>() {
override fun serialize(value: Observable<*>, gen: JsonGenerator, serializers: SerializerProvider) {
gen.writeString("(observable)")
}
}
// A file name is deserialized to an InputStream if found.
object InputStreamDeserializer : JsonDeserializer<InputStream>() {
// Keep track of them so we can close them later.
private val streams = Collections.synchronizedSet(HashSet<InputStream>())
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): InputStream {
val stream = object : BufferedInputStream(Files.newInputStream(Paths.get(p.text))) {
override fun close() {
super.close()
streams.remove(this)
}
}
streams += stream
return stream
}
fun closeAll() {
// Clone the set with toList() here so each closed stream can be removed from the set inside close().
streams.toList().forEach { Closeables.closeQuietly(it) }
}
}
// An InputStream found in a response triggers a request to the user to provide somewhere to save it.
private object InputStreamSerializer : JsonSerializer<InputStream>() {
var invokeContext: InvocationContext<*>? = null
override fun serialize(value: InputStream, gen: JsonGenerator, serializers: SerializerProvider) {
try {
val toPath = invokeContext!!.readLine("Path to save stream to (enter to ignore): ", true)
if (toPath == null || toPath.isBlank()) {
gen.writeString("<not saved>")
} else {
val path = Paths.get(toPath)
value.copyTo(path)
gen.writeString("<saved to: ${path.toAbsolutePath()}>")
}
} finally {
try {
value.close()
} catch (e: IOException) {
// Ignore.
}
}
}
}
/**
* String value deserialized to [UniqueIdentifier].
* Any string value used as [UniqueIdentifier.externalId].
* If string contains underscore(i.e. externalId_uuid) then split with it.
* Index 0 as [UniqueIdentifier.externalId]
* Index 1 as [UniqueIdentifier.id]
* */
object UniqueIdentifierDeserializer : JsonDeserializer<UniqueIdentifier>() {
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): UniqueIdentifier {
//Check if externalId and UUID may be separated by underscore.
if (p.text.contains("_")) {
val ids = p.text.split("_")
//Create UUID object from string.
val uuid: UUID = UUID.fromString(ids[1])
//Create UniqueIdentifier object using externalId and UUID.
return UniqueIdentifier(ids[0], uuid)
}
//Any other string used as externalId.
return UniqueIdentifier.fromString(p.text)
}
}
/**
* String value deserialized to [UUID].
* */
object UUIDDeserializer : JsonDeserializer<UUID>() {
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): UUID {
//Create UUID object from string.
return UUID.fromString(p.text)
}
}
//endregion
} }

View File

@ -20,4 +20,5 @@ open class InteractiveShellCommand : BaseCommand() {
fun ops() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).rpcOps fun ops() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).rpcOps
fun ansiProgressRenderer() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).ansiProgressRenderer fun ansiProgressRenderer() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).ansiProgressRenderer
fun objectMapper() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).yamlInputMapper fun objectMapper() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).yamlInputMapper
fun isSsh() = ((context.session as CRaSHSession).authInfo as CordaSSHAuthInfo).isSsh
} }

View File

@ -26,6 +26,5 @@ fun makeRPCOps(getCordaRPCOps: (username: String, credential: String) -> CordaRP
// Unpack exception. // Unpack exception.
throw e.targetException throw e.targetException
} }
} }) as CordaRPCOps
) as CordaRPCOps
} }

View File

@ -0,0 +1,27 @@
package net.corda.tools.shell
data class SSHDConfiguration(val port: Int) {
companion object {
internal const val INVALID_PORT_FORMAT = "Invalid port: %s"
private const val MISSING_PORT_FORMAT = "Missing port: %s"
/**
* Parses a string of the form port into a [SSHDConfiguration].
* @throws IllegalArgumentException if the port is missing or the string is garbage.
*/
@JvmStatic
fun parse(str: String): SSHDConfiguration {
require(!str.isNullOrBlank()) { SSHDConfiguration.MISSING_PORT_FORMAT.format(str) }
val port = try {
str.toInt()
} catch (ex: NumberFormatException) {
throw IllegalArgumentException("Port syntax is invalid, expected port")
}
return SSHDConfiguration(port)
}
}
init {
require(port in (0..0xffff)) { INVALID_PORT_FORMAT.format(port) }
}
}

View File

@ -0,0 +1,103 @@
package net.corda.tools.shell
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.JsonDeserializer
import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.SerializerProvider
import com.google.common.io.Closeables
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.internal.copyTo
import org.crsh.command.InvocationContext
import rx.Observable
import java.io.BufferedInputStream
import java.io.InputStream
import java.nio.file.Files
import java.nio.file.Paths
import java.util.*
//region Extra serializers
//
// These serializers are used to enable the user to specify objects that aren't natural data containers in the shell,
// and for the shell to print things out that otherwise wouldn't be usefully printable.
object ObservableSerializer : JsonSerializer<Observable<*>>() {
override fun serialize(value: Observable<*>, gen: JsonGenerator, serializers: SerializerProvider) {
gen.writeString("(observable)")
}
}
/**
* String value deserialized to [UniqueIdentifier].
* Any string value used as [UniqueIdentifier.externalId].
* If string contains underscore(i.e. externalId_uuid) then split with it.
* Index 0 as [UniqueIdentifier.externalId]
* Index 1 as [UniqueIdentifier.id]
* */
object UniqueIdentifierDeserializer : JsonDeserializer<UniqueIdentifier>() {
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): UniqueIdentifier {
//Check if externalId and UUID may be separated by underscore.
if (p.text.contains("_")) {
val ids = p.text.split("_")
//Create UUID object from string.
val uuid: UUID = UUID.fromString(ids[1])
//Create UniqueIdentifier object using externalId and UUID.
return UniqueIdentifier(ids[0], uuid)
}
//Any other string used as externalId.
return UniqueIdentifier.fromString(p.text)
}
}
/**
* String value deserialized to [UUID].
* */
object UUIDDeserializer : JsonDeserializer<UUID>() {
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): UUID {
//Create UUID object from string.
return UUID.fromString(p.text)
}
}
// An InputStream found in a response triggers a request to the user to provide somewhere to save it.
object InputStreamSerializer : JsonSerializer<InputStream>() {
var invokeContext: InvocationContext<*>? = null
override fun serialize(value: InputStream, gen: JsonGenerator, serializers: SerializerProvider) {
value.use {
val toPath = invokeContext!!.readLine("Path to save stream to (enter to ignore): ", true)
if (toPath == null || toPath.isBlank()) {
gen.writeString("<not saved>")
} else {
val path = Paths.get(toPath)
it.copyTo(path)
gen.writeString("<saved to: ${path.toAbsolutePath()}>")
}
}
}
}
// A file name is deserialized to an InputStream if found.
object InputStreamDeserializer : JsonDeserializer<InputStream>() {
// Keep track of them so we can close them later.
private val streams = Collections.synchronizedSet(HashSet<InputStream>())
override fun deserialize(p: JsonParser, ctxt: DeserializationContext): InputStream {
val stream = object : BufferedInputStream(Files.newInputStream(Paths.get(p.text))) {
override fun close() {
super.close()
streams.remove(this)
}
}
streams += stream
return stream
}
fun closeAll() {
// Clone the set with toList() here so each closed stream can be removed from the set inside close().
streams.toList().forEach { Closeables.closeQuietly(it) }
}
}
//endregion

View File

@ -0,0 +1,28 @@
package net.corda.tools.shell
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.internal.config.SSLConfiguration
import java.nio.file.Path
import java.nio.file.Paths
data class ShellConfiguration(
val commandsDirectory: Path,
val cordappsDirectory: Path? = null,
var user: String = "",
var password: String = "",
val hostAndPort: NetworkHostAndPort,
val ssl: ShellSslOptions? = null,
val sshdPort: Int? = null,
val sshHostKeyDirectory: Path? = null,
val noLocalShell: Boolean = false) {
companion object {
const val SSH_PORT = 2222
const val COMMANDS_DIR = "shell-commands"
const val CORDAPPS_DIR = "cordapps"
const val SSHD_HOSTKEY_DIR = "ssh"
}
}
data class ShellSslOptions(override val sslKeystore: Path, override val keyStorePassword: String, override val trustStoreFile:Path, override val trustStorePassword: String) : SSLConfiguration {
override val certificatesDirectory: Path get() = Paths.get("")
}

View File

@ -115,6 +115,7 @@ class StandaloneShell(private val configuration: ShellConfiguration) {
configuration.sshdPort?.apply{ println("SSH server listening on port $this.") } configuration.sshdPort?.apply{ println("SSH server listening on port $this.") }
exit.await() exit.await()
// because we can't clean certain Crash Shell threads that block on read()
exitProcess(0) exitProcess(0)
} }
} }

View File

@ -37,8 +37,8 @@ class CustomTypeJsonParsingTests {
fun setup() { fun setup() {
objectMapper = ObjectMapper() objectMapper = ObjectMapper()
val simpleModule = SimpleModule() val simpleModule = SimpleModule()
simpleModule.addDeserializer(UniqueIdentifier::class.java, InteractiveShell.UniqueIdentifierDeserializer) simpleModule.addDeserializer(UniqueIdentifier::class.java, UniqueIdentifierDeserializer)
simpleModule.addDeserializer(UUID::class.java, InteractiveShell.UUIDDeserializer) simpleModule.addDeserializer(UUID::class.java, UUIDDeserializer)
objectMapper.registerModule(simpleModule) objectMapper.registerModule(simpleModule)
} }