diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 354adb1db8..a34d83e643 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -2464,7 +2464,6 @@ public interface net.corda.core.flows.IdentifiableException @Nullable public Long getErrorId() ## -@CordaSerializable public final class net.corda.core.flows.IllegalFlowLogicException extends java.lang.IllegalArgumentException public (Class, String) public (String, String) diff --git a/Jenkinsfile b/Jenkinsfile index d19be16f6e..a9d08e4d17 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -39,15 +39,6 @@ pipeline { " allParallelIntegrationTest" } } - stage('Unit Tests') { - steps { - sh "./gradlew " + - "-DbuildId=\"\${BUILD_ID}\" " + - "-Dkubenetize=true " + - "-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" + - " allParallelUnitTest" - } - } } } diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle index 87468c4f89..0e2e78274d 100644 --- a/buildSrc/build.gradle +++ b/buildSrc/build.gradle @@ -23,15 +23,7 @@ allprojects { } } -configurations { - runtime -} - dependencies { - // Add the top-level projects ONLY to the host project. - runtime project.childProjects.collect { n, p -> - project(p.path) - } compile gradleApi() compile "io.fabric8:kubernetes-client:4.4.1" compile 'org.apache.commons:commons-compress:1.19' diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 7c1d7cdc2d..ee2f5052a6 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -1,10 +1,10 @@ package net.corda.client.rpc.internal -import co.paralleluniverse.common.util.SameThreadExecutor import com.github.benmanes.caffeine.cache.Cache import com.github.benmanes.caffeine.cache.Caffeine import com.github.benmanes.caffeine.cache.RemovalCause import com.github.benmanes.caffeine.cache.RemovalListener +import com.google.common.util.concurrent.MoreExecutors import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.ThreadFactoryBuilder import net.corda.client.rpc.ConnectionFailureException @@ -132,7 +132,10 @@ class RPCClientProxyHandler( private var sendExecutor: ExecutorService? = null // A sticky pool for running Observable.onNext()s. We need the stickiness to preserve the observation ordering. - private val observationExecutorThreadFactory = ThreadFactoryBuilder().setNameFormat("rpc-client-observation-pool-%d").setDaemon(true).build() + private val observationExecutorThreadFactory = ThreadFactoryBuilder() + .setNameFormat("rpc-client-observation-pool-%d") + .setDaemon(true) + .build() private val observationExecutorPool = LazyStickyPool(rpcConfiguration.observationExecutorPoolSize) { Executors.newFixedThreadPool(1, observationExecutorThreadFactory) } @@ -156,12 +159,14 @@ class RPCClientProxyHandler( private val observablesToReap = ThreadBox(object { var observables = ArrayList() }) - private val serializationContextWithObservableContext = RpcClientObservableDeSerializer.createContext(serializationContext, observableContext) + private val serializationContextWithObservableContext = RpcClientObservableDeSerializer + .createContext(serializationContext, observableContext) private fun createRpcObservableMap(): RpcObservableMap { val onObservableRemove = RemovalListener>> { key, _, cause -> val observableId = key!! val rpcCallSite: CallSite? = callSiteMap?.remove(observableId) + if (cause == RemovalCause.COLLECTED) { log.warn(listOf( "A hot observable returned from an RPC was never subscribed to.", @@ -175,7 +180,13 @@ class RPCClientProxyHandler( } observablesToReap.locked { observables.add(observableId) } } - return cacheFactory.buildNamed(Caffeine.newBuilder().weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()), "RpcClientProxyHandler_rpcObservable") + return cacheFactory.buildNamed( + Caffeine.newBuilder() + .weakValues() + .removalListener(onObservableRemove) + .executor(MoreExecutors.directExecutor()), + "RpcClientProxyHandler_rpcObservable" + ) } private var sessionFactory: ClientSessionFactory? = null diff --git a/constants.properties b/constants.properties index 546a658e02..aff84d2c0c 100644 --- a/constants.properties +++ b/constants.properties @@ -32,3 +32,5 @@ metricsVersion=4.1.0 metricsNewRelicVersion=1.1.1 openSourceBranch=https://github.com/corda/corda/blob/master openSourceSamplesBranch=https://github.com/corda/samples/blob/master +jolokiaAgentVersion=1.6.1 + diff --git a/core-tests/src/test/kotlin/net/corda/coretests/contracts/TransactionVerificationExceptionSerialisationTests.kt b/core-tests/src/test/kotlin/net/corda/coretests/contracts/TransactionVerificationExceptionSerialisationTests.kt index 03e2f5e731..e8bf2cf807 100644 --- a/core-tests/src/test/kotlin/net/corda/coretests/contracts/TransactionVerificationExceptionSerialisationTests.kt +++ b/core-tests/src/test/kotlin/net/corda/coretests/contracts/TransactionVerificationExceptionSerialisationTests.kt @@ -1,8 +1,13 @@ package net.corda.coretests.contracts +import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint import net.corda.core.contracts.Contract +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionState import net.corda.core.contracts.TransactionVerificationException import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.generateKeyPair +import net.corda.core.identity.Party import net.corda.core.internal.createContractCreationError import net.corda.core.internal.createContractRejection import net.corda.core.transactions.LedgerTransaction @@ -13,6 +18,9 @@ import net.corda.serialization.internal.amqp.SerializationOutput import net.corda.serialization.internal.amqp.SerializerFactoryBuilder import net.corda.serialization.internal.amqp.custom.PublicKeySerializer import net.corda.serialization.internal.amqp.custom.ThrowableSerializer +import net.corda.testing.common.internal.testNetworkParameters +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME import net.corda.testing.core.DUMMY_BANK_A_NAME import net.corda.testing.core.DUMMY_NOTARY_NAME import net.corda.testing.core.TestIdentity @@ -23,7 +31,9 @@ class TransactionVerificationExceptionSerialisationTests { private fun defaultFactory() = SerializerFactoryBuilder.build( AllWhitelist, ClassLoader.getSystemClassLoader() - ).apply { register(ThrowableSerializer(this)) } + ).apply { + register(ThrowableSerializer(this)) + } private val context get() = AMQP_RPC_CLIENT_CONTEXT @@ -179,4 +189,125 @@ class TransactionVerificationExceptionSerialisationTests { assertEquals(exc.message, exc2.message) } + + @Test + fun transactionNetworkParameterOrderingExceptionTest() { + val exception = TransactionVerificationException.TransactionNetworkParameterOrderingException( + txid, + StateRef(SecureHash.zeroHash, 1), + testNetworkParameters(), + testNetworkParameters()) + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + } + + @Test + fun missingNetworkParametersExceptionTest() { + val exception = TransactionVerificationException.MissingNetworkParametersException(txid, SecureHash.zeroHash) + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + } + + @Test + fun constraintPropagationRejectionTest() { + val exception = TransactionVerificationException.ConstraintPropagationRejection(txid, "com.test.Contract", + AlwaysAcceptAttachmentConstraint, AlwaysAcceptAttachmentConstraint) + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + assertEquals("com.test.Contract", exception2.contractClass) + } + + @Test + fun transactionDuplicateEncumbranceExceptionTest() { + val exception = TransactionVerificationException.TransactionDuplicateEncumbranceException(txid, 1) + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + } + + @Test + fun transactionNonMatchingEncumbranceExceptionTest() { + val exception = TransactionVerificationException.TransactionNonMatchingEncumbranceException(txid, listOf(1, 2, 3)) + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + } + + @Test + fun transactionNotaryMismatchEncumbranceExceptionTest() { + val exception = TransactionVerificationException.TransactionNotaryMismatchEncumbranceException( + txid, 1, 2, Party(ALICE_NAME, generateKeyPair().public), Party(BOB_NAME, generateKeyPair().public)) + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + } + + @Test + fun transactionContractConflictExceptionTest() { + val exception = TransactionVerificationException.TransactionContractConflictException( + txid, TransactionState(DummyContractState(), notary = Party(BOB_NAME, generateKeyPair().public)), "aa") + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + } + + @Test + fun transactionRequiredContractUnspecifiedExceptionTest() { + val exception = TransactionVerificationException.TransactionRequiredContractUnspecifiedException( + txid, TransactionState(DummyContractState(), notary = Party(BOB_NAME, generateKeyPair().public))) + val exception2 = DeserializationInput(factory) + .deserialize( + SerializationOutput(factory) + .serialize(exception, context), + context) + + assertEquals(exception.message, exception2.message) + assertEquals(exception.cause?.message, exception2.cause?.message) + assertEquals(exception.txId, exception2.txId) + } } \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/contracts/TransactionVerificationException.kt b/core/src/main/kotlin/net/corda/core/contracts/TransactionVerificationException.kt index 84429f19bf..826a7d9eb4 100644 --- a/core/src/main/kotlin/net/corda/core/contracts/TransactionVerificationException.kt +++ b/core/src/main/kotlin/net/corda/core/contracts/TransactionVerificationException.kt @@ -70,8 +70,17 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S * @property outputConstraint The constraint of the outputs state. */ @KeepForDJVM - class ConstraintPropagationRejection(txId: SecureHash, val contractClass: String, inputConstraint: AttachmentConstraint, outputConstraint: AttachmentConstraint) - : TransactionVerificationException(txId, "Contract constraints for $contractClass are not propagated correctly. The outputConstraint: $outputConstraint is not a valid transition from the input constraint: $inputConstraint.", null) + class ConstraintPropagationRejection(txId: SecureHash, message: String) : TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, + contractClass: String, + inputConstraint: AttachmentConstraint, + outputConstraint: AttachmentConstraint) : + this(txId, "Contract constraints for $contractClass are not propagated correctly. " + + "The outputConstraint: $outputConstraint is not a valid transition from the input constraint: $inputConstraint.") + + // This is only required for backwards compatibility. In case the message format changes, update the index. + val contractClass: String = message.split(" ")[3] + } /** * The transaction attachment that contains the [contractClass] class didn't meet the constraints specified by @@ -153,19 +162,24 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S * be satisfied. */ @KeepForDJVM - class TransactionDuplicateEncumbranceException(txId: SecureHash, index: Int) - : TransactionVerificationException(txId, "The bi-directionality property of encumbered output states " + - "is not satisfied. Index $index is referenced more than once", null) + class TransactionDuplicateEncumbranceException(txId: SecureHash, message: String) + : TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, index: Int) : this(txId, "The bi-directionality property of encumbered output states " + + "is not satisfied. Index $index is referenced more than once") + } /** * An encumbered state should also be referenced as the encumbrance of another state in order to satisfy the * bi-directionality property (a full cycle should be present). */ @KeepForDJVM - class TransactionNonMatchingEncumbranceException(txId: SecureHash, nonMatching: Collection) - : TransactionVerificationException(txId, "The bi-directionality property of encumbered output states " + - "is not satisfied. Encumbered states should also be referenced as an encumbrance of another state to form " + - "a full cycle. Offending indices $nonMatching", null) + class TransactionNonMatchingEncumbranceException(txId: SecureHash, message: String) + : TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, nonMatching: Collection) : this(txId, + "The bi-directionality property of encumbered output states " + + "is not satisfied. Encumbered states should also be referenced as an encumbrance of another state to form " + + "a full cycle. Offending indices $nonMatching") + } /** * All encumbered states should be assigned to the same notary. This is due to the fact that multi-notary @@ -173,9 +187,13 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S * in the same transaction. */ @KeepForDJVM - class TransactionNotaryMismatchEncumbranceException(txId: SecureHash, encumberedIndex: Int, encumbranceIndex: Int, encumberedNotary: Party, encumbranceNotary: Party) - : TransactionVerificationException(txId, "Encumbered output states assigned to different notaries found. " + - "Output state with index $encumberedIndex is assigned to notary [$encumberedNotary], while its encumbrance with index $encumbranceIndex is assigned to notary [$encumbranceNotary]", null) + class TransactionNotaryMismatchEncumbranceException(txId: SecureHash, message: String) + : TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, encumberedIndex: Int, encumbranceIndex: Int, encumberedNotary: Party, encumbranceNotary: Party) : + this(txId, "Encumbered output states assigned to different notaries found. " + + "Output state with index $encumberedIndex is assigned to notary [$encumberedNotary], " + + "while its encumbrance with index $encumbranceIndex is assigned to notary [$encumbranceNotary]") + } /** * If a state is identified as belonging to a contract, either because the state class is defined as an inner class @@ -186,35 +204,44 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S * @param requiredContractClassName The class name of the contract to which the state belongs. */ @KeepForDJVM - class TransactionContractConflictException(txId: SecureHash, state: TransactionState, requiredContractClassName: String) - : TransactionVerificationException(txId, - """ - State of class ${state.data::class.java.typeName} belongs to contract $requiredContractClassName, but + class TransactionContractConflictException(txId: SecureHash, message: String) + : TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, state: TransactionState, requiredContractClassName: String): this(txId, + """ + State of class ${state.data ::class.java.typeName} belongs to contract $requiredContractClassName, but is bundled in TransactionState with ${state.contract}. For details see: https://docs.corda.net/api-contract-constraints.html#contract-state-agreement - """.trimIndent().replace('\n', ' '), null) + """.trimIndent().replace('\n', ' ')) + } // TODO: add reference to documentation @KeepForDJVM - class TransactionRequiredContractUnspecifiedException(txId: SecureHash, state: TransactionState) - : TransactionVerificationException(txId, - """ + class TransactionRequiredContractUnspecifiedException(txId: SecureHash, message: String) + : TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, state: TransactionState) : this(txId, + """ State of class ${state.data::class.java.typeName} does not have a specified owning contract. Add the @BelongsToContract annotation to this class to ensure that it can only be bundled in a TransactionState with the correct contract. For details see: https://docs.corda.net/api-contract-constraints.html#contract-state-agreement - """.trimIndent(), null) - + """.trimIndent()) + } /** * If the network parameters associated with an input or reference state in a transaction are more recent than the network parameters of the new transaction itself. */ @KeepForDJVM - class TransactionNetworkParameterOrderingException(txId: SecureHash, inputStateRef: StateRef, txnNetworkParameters: NetworkParameters, inputNetworkParameters: NetworkParameters) - : TransactionVerificationException(txId, "The network parameters epoch (${txnNetworkParameters.epoch}) of this transaction " + - "is older than the epoch (${inputNetworkParameters.epoch}) of input state: $inputStateRef", null) + class TransactionNetworkParameterOrderingException(txId: SecureHash, message: String) : + TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, + inputStateRef: StateRef, + txnNetworkParameters: NetworkParameters, + inputNetworkParameters: NetworkParameters) + : this(txId, "The network parameters epoch (${txnNetworkParameters.epoch}) of this transaction " + + "is older than the epoch (${inputNetworkParameters.epoch}) of input state: $inputStateRef") + } /** * Thrown when the network parameters with hash: missingNetworkParametersHash is not available at this node. Usually all the parameters @@ -224,9 +251,11 @@ abstract class TransactionVerificationException(val txId: SecureHash, message: S * @param missingNetworkParametersHash Missing hash of the network parameters associated to this transaction */ @KeepForDJVM - class MissingNetworkParametersException(txId: SecureHash, missingNetworkParametersHash: SecureHash) - : TransactionVerificationException(txId, "Couldn't find network parameters with hash: $missingNetworkParametersHash related to this transaction: $txId", null) - + class MissingNetworkParametersException(txId: SecureHash, message: String) + : TransactionVerificationException(txId, message, null) { + constructor(txId: SecureHash, missingNetworkParametersHash: SecureHash) : + this(txId, "Couldn't find network parameters with hash: $missingNetworkParametersHash related to this transaction: $txId") + } /** Whether the inputs or outputs list contains an encumbrance issue, see [TransactionMissingEncumbranceException]. */ @CordaSerializable diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt index 7cd903d712..7781c38b95 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogicRef.kt @@ -45,7 +45,6 @@ interface FlowLogicRefFactory { * * @property type the fully qualified name of the class that failed checks. */ -@CordaSerializable class IllegalFlowLogicException(val type: String, msg: String) : IllegalArgumentException("A FlowLogicRef cannot be constructed for FlowLogic of type $type: $msg") { constructor(type: Class<*>, msg: String) : this(type.name, msg) diff --git a/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt b/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt index 3b7eab103a..abcaa6b4ff 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt @@ -80,9 +80,7 @@ interface IdentityService { * @param key The owning [PublicKey] of the [Party]. * @return Returns a [Party] with a matching owningKey if known, else returns null. */ - fun partyFromKey(key: PublicKey): Party? = - @Suppress("DEPRECATION") - certificateFromKey(key)?.party + fun partyFromKey(key: PublicKey): Party? /** * Resolves a party name to the well known identity [Party] instance for this name. Where possible well known identity diff --git a/detekt-baseline.xml b/detekt-baseline.xml index 740f8e1f80..d37757a64c 100644 --- a/detekt-baseline.xml +++ b/detekt-baseline.xml @@ -708,6 +708,7 @@ LongParameterList:LedgerTransaction.kt$LedgerTransaction$(inputs: List<StateAndRef<ContractState>>, outputs: List<TransactionState<ContractState>>, commands: List<CommandWithParties<CommandData>>, attachments: List<Attachment>, id: SecureHash, notary: Party?, timeWindow: TimeWindow?, privacySalt: PrivacySalt ) LongParameterList:LedgerTransaction.kt$LedgerTransaction.Companion$( inputs: List<StateAndRef<ContractState>>, outputs: List<TransactionState<ContractState>>, commands: List<CommandWithParties<CommandData>>, attachments: List<Attachment>, id: SecureHash, notary: Party?, timeWindow: TimeWindow?, privacySalt: PrivacySalt, networkParameters: NetworkParameters, references: List<StateAndRef<ContractState>>, componentGroups: List<ComponentGroup>? = null, serializedInputs: List<SerializedStateAndRef>? = null, serializedReferences: List<SerializedStateAndRef>? = null, isAttachmentTrusted: (Attachment) -> Boolean ) LongParameterList:MockServices.kt$MockServices.Companion$( cordappLoader: CordappLoader, identityService: IdentityService, networkParameters: NetworkParameters, initialIdentity: TestIdentity, moreKeys: Set<KeyPair>, keyManagementService: KeyManagementService, schemaService: SchemaService, persistence: CordaPersistence ) + LongParameterList:MockServices.kt$MockServices.Companion$( cordappPackages: List<String>, initialIdentity: TestIdentity, networkParameters: NetworkParameters = testNetworkParameters(modifiedTime = Instant.MIN), moreKeys: Set<KeyPair>, moreIdentities: Set<PartyAndCertificate>, cacheFactory: TestingNamedCacheFactory = TestingNamedCacheFactory() ) LongParameterList:NetworkBootstrapperTest.kt$NetworkBootstrapperTest$(copyCordapps: CopyCordapps = CopyCordapps.FirstRunOnly, packageOwnership: Map<String, PublicKey>? = emptyMap(), minimumPlatformVerison: Int? = PLATFORM_VERSION, maxMessageSize: Int? = DEFAULT_MAX_MESSAGE_SIZE, maxTransactionSize: Int? = DEFAULT_MAX_TRANSACTION_SIZE, eventHorizon: Duration? = 30.days) LongParameterList:NetworkMapUpdater.kt$NetworkMapUpdater$(trustRoot: X509Certificate, currentParametersHash: SecureHash, ourNodeInfo: SignedNodeInfo, networkParameters: NetworkParameters, keyManagementService: KeyManagementService, networkParameterAcceptanceSettings: NetworkParameterAcceptanceSettings) LongParameterList:NetworkParameters.kt$NetworkParameters$(minimumPlatformVersion: Int = this.minimumPlatformVersion, notaries: List<NotaryInfo> = this.notaries, maxMessageSize: Int = this.maxMessageSize, maxTransactionSize: Int = this.maxTransactionSize, modifiedTime: Instant = this.modifiedTime, epoch: Int = this.epoch, whitelistedContractImplementations: Map<String, List<AttachmentId>> = this.whitelistedContractImplementations ) @@ -1226,6 +1227,7 @@ MagicNumber:TransactionBuilder.kt$TransactionBuilder$4 MagicNumber:TransactionDSLInterpreter.kt$TransactionDSL$30 MagicNumber:TransactionUtils.kt$4 + MagicNumber:TransactionVerificationException.kt$TransactionVerificationException.ConstraintPropagationRejection$3 MagicNumber:TransactionVerifierServiceInternal.kt$Verifier$4 MagicNumber:TransactionViewer.kt$TransactionViewer$15.0 MagicNumber:TransactionViewer.kt$TransactionViewer$20.0 @@ -2216,7 +2218,6 @@ MaxLineLength:H2SecurityTests.kt$H2SecurityTests$startNode(customOverrides = mapOf(h2AddressKey to "${InetAddress.getLocalHost().hostAddress}:${getFreePort()}")).getOrThrow() MaxLineLength:HTTPNetworkRegistrationService.kt$HTTPNetworkRegistrationService$in TRANSIENT_ERROR_STATUS_CODES -> throw ServiceUnavailableException("Could not connect with Doorman. Http response status code was ${conn.responseCode}.") MaxLineLength:HardRestartTest.kt$HardRestartTest$val rpc = tlRpc.get() ?: CordaRPCClient(a.rpcAddress).start(demoUser.username, demoUser.password).proxy.also { tlRpc.set(it) } - MaxLineLength:HibernateColumnConverterTests.kt$HibernateColumnConverterTests$// AbstractPartyToX500NameAsStringConverter could cause circular flush of Hibernate session because it is invoked during flush, and a // cache miss was doing a flush. This also checks that loading during flush does actually work. @Test fun `issue some cash on a notary that exists only in the database to check cache loading works in our identity column converters during flush of vault update`() MaxLineLength:HibernateConfiguration.kt$HibernateConfiguration$CordaMaterializedBlobType : AbstractSingleColumnStandardBasicType MaxLineLength:HibernateConfiguration.kt$HibernateConfiguration$CordaWrapperBinaryType : AbstractSingleColumnStandardBasicType MaxLineLength:HibernateConfiguration.kt$HibernateConfiguration$MapBlobToPostgresByteA : AbstractSingleColumnStandardBasicType @@ -2242,6 +2243,8 @@ MaxLineLength:HibernateConfigurationTest.kt$HibernateConfigurationTest$val joinVaultStatesToCash = criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), cashStatesSchema.get<PersistentStateRef>("stateRef")) MaxLineLength:HibernateConfigurationTest.kt$HibernateConfigurationTest$val schemaService = NodeSchemaService(extraSchemas = setOf(CashSchemaV1, SampleCashSchemaV1, SampleCashSchemaV2, SampleCashSchemaV3, DummyLinearStateSchemaV1, DummyLinearStateSchemaV2, DummyDealStateSchemaV1)) MaxLineLength:HibernateConfigurationTest.kt$HibernateConfigurationTest.<no name provided>$override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, database, schemaService, cordappClassloader).apply { start() } + MaxLineLength:HibernateInteractionTests.kt$HibernateInteractionTests$// AbstractPartyToX500NameAsStringConverter could cause circular flush of Hibernate session because it is invoked during flush, and a // cache miss was doing a flush. This also checks that loading during flush does actually work. @Test fun `issue some cash on a notary that exists only in the database to check cache loading works in our identity column converters during flush of vault update`() + MaxLineLength:HibernateInteractionTests.kt$HibernateInteractionTests$@Test fun `when a cascade is in progress (because of nested entities), the node avoids to flush & detach entities, since it's not allowed by Hibernate`() MaxLineLength:HibernateQueryCriteriaParser.kt$AbstractQueryCriteriaParser$abstract MaxLineLength:HibernateQueryCriteriaParser.kt$HibernateAttachmentQueryCriteriaParser$AbstractQueryCriteriaParser<AttachmentQueryCriteria, AttachmentsQueryCriteriaParser, AttachmentSort>(), AttachmentsQueryCriteriaParser MaxLineLength:HibernateQueryCriteriaParser.kt$HibernateAttachmentQueryCriteriaParser$private val criteriaQuery: CriteriaQuery<NodeAttachmentService.DBAttachment> @@ -2369,15 +2372,6 @@ MaxLineLength:InstallShellExtensionsParser.kt$ShellExtensionsGenerator$printWarning("Cannot install shell extension for bash major version earlier than $minSupportedBashVersion. Please upgrade your bash version. Aliases should still work.") MaxLineLength:InstallShellExtensionsParser.kt$ShellExtensionsGenerator$println("Installation complete, ${parent.alias} is available in bash, but autocompletion was not installed because of an old version of bash.") MaxLineLength:InstantSerializer.kt$InstantSerializer : Proxy - MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("attachments", "Commands to extract information about attachments stored within the node", AttachmentShellCommand::class.java) - MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("checkpoints", "Commands to extract information about checkpoints stored within the node", CheckpointShellCommand::class.java) - MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("flow", "Commands to work with flows. Flows are how you can change the ledger.", FlowShellCommand::class.java) - MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("hashLookup", "Checks if a transaction with matching Id hash exists.", HashLookupShellCommand::class.java) - MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("output-format", "Commands to inspect and update the output format.", OutputFormatCommand::class.java) - MaxLineLength:InteractiveShell.kt$InteractiveShell$ExternalResolver.INSTANCE.addCommand("run", "Runs a method from the CordaRPCOps interface on the node.", RunShellCommand::class.java) - MaxLineLength:InteractiveShell.kt$InteractiveShell$val stateObservable = runFlowFromString({ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) }, inputData, flowClazz, inputObjectMapper) - MaxLineLength:InteractiveShell.kt$InteractiveShell${ private val log = LoggerFactory.getLogger(javaClass) private lateinit var rpcOps: (username: String, password: String) -> InternalCordaRPCOps private lateinit var ops: InternalCordaRPCOps private lateinit var rpcConn: AutoCloseable private var shell: Shell? = null private var classLoader: ClassLoader? = null private lateinit var shellConfiguration: ShellConfiguration private var onExit: () -> Unit = {} @JvmStatic fun getCordappsClassloader() = classLoader enum class OutputFormat { JSON, YAML } fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) { rpcOps = { username: String, password: String -> val connection = if (standalone) { CordaRPCClient( configuration.hostAndPort, configuration.ssl, classLoader ).start(username, password, gracefulReconnect = GracefulReconnect()) } else { CordaRPCClient( hostAndPort = configuration.hostAndPort, configuration = CordaRPCClientConfiguration.DEFAULT.copy( maxReconnectAttempts = 1 ), sslConfiguration = configuration.ssl, classLoader = classLoader ).start(username, password) } rpcConn = connection connection.proxy as InternalCordaRPCOps } _startShell(configuration, classLoader) } private fun _startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) { shellConfiguration = configuration InteractiveShell.classLoader = classLoader val runSshDaemon = configuration.sshdPort != null val config = Properties() if (runSshDaemon) { // Enable SSH access. Note: these have to be strings, even though raw object assignments also work. config["crash.ssh.port"] = configuration.sshdPort?.toString() config["crash.auth"] = "corda" configuration.sshHostKeyDirectory?.apply { val sshKeysDir = configuration.sshHostKeyDirectory.createDirectories() config["crash.ssh.keypath"] = (sshKeysDir / "hostkey.pem").toString() config["crash.ssh.keygen"] = "true" } } ExternalResolver.INSTANCE.addCommand("output-format", "Commands to inspect and update the output format.", OutputFormatCommand::class.java) ExternalResolver.INSTANCE.addCommand("run", "Runs a method from the CordaRPCOps interface on the node.", RunShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("flow", "Commands to work with flows. Flows are how you can change the ledger.", FlowShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("start", "An alias for 'flow start'", StartShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("hashLookup", "Checks if a transaction with matching Id hash exists.", HashLookupShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("attachments", "Commands to extract information about attachments stored within the node", AttachmentShellCommand::class.java) ExternalResolver.INSTANCE.addCommand("checkpoints", "Commands to extract information about checkpoints stored within the node", CheckpointShellCommand::class.java) shell = ShellLifecycle(configuration.commandsDirectory).start(config, configuration.user, configuration.password) } fun runLocalShell(onExit: () -> Unit = {}) { this.onExit = onExit val terminal = TerminalFactory.create() val consoleReader = ConsoleReader("Corda", FileInputStream(FileDescriptor.`in`), System.out, terminal) val jlineProcessor = JLineProcessor(terminal.isAnsiSupported, shell, consoleReader, System.out) InterruptHandler { jlineProcessor.interrupt() }.install() thread(name = "Command line shell processor", isDaemon = true) { Emoji.renderIfSupported { try { jlineProcessor.run() } catch (e: IndexOutOfBoundsException) { log.warn("Cannot parse malformed command.") } } } thread(name = "Command line shell terminator", isDaemon = true) { // Wait for the shell to finish. jlineProcessor.closed() log.info("Command shell has exited") terminal.restore() onExit.invoke() } } class ShellLifecycle(private val shellCommands: Path) : PluginLifeCycle() { fun start(config: Properties, localUserName: String = "", localUserPassword: String = ""): Shell { val classLoader = this.javaClass.classLoader val classpathDriver = ClassPathMountFactory(classLoader) val fileDriver = FileMountFactory(Utils.getCurrentDirectory()) val extraCommandsPath = shellCommands.toAbsolutePath().createDirectories() val commandsFS = FS.Builder() .register("file", fileDriver) .mount("file:$extraCommandsPath") .register("classpath", classpathDriver) .mount("classpath:/net/corda/tools/shell/") .mount("classpath:/crash/commands/") .build() val confFS = FS.Builder() .register("classpath", classpathDriver) .mount("classpath:/crash") .build() val discovery = object : ServiceLoaderDiscovery(classLoader) { override fun getPlugins(): Iterable<CRaSHPlugin<*>> { // Don't use the Java language plugin (we may not have tools.jar available at runtime), this // will cause any commands using JIT Java compilation to be suppressed. In CRaSH upstream that // is only the 'jmx' command. return super.getPlugins().filterNot { it is JavaLanguage } + CordaAuthenticationPlugin(rpcOps) } } val attributes = emptyMap<String, Any>() val context = PluginContext(discovery, attributes, commandsFS, confFS, classLoader) context.refresh() this.config = config start(context) ops = makeRPCOps(rpcOps, localUserName, localUserPassword) return context.getPlugin(ShellFactory::class.java).create(null, CordaSSHAuthInfo(false, ops, StdoutANSIProgressRenderer)) } } fun nodeInfo() = try { ops.nodeInfo() } catch (e: UndeclaredThrowableException) { throw e.cause ?: e } @JvmStatic fun setOutputFormat(outputFormat: OutputFormat) { this.outputFormat = outputFormat } @JvmStatic fun getOutputFormat(): OutputFormat { return outputFormat } fun createYamlInputMapper(rpcOps: CordaRPCOps): ObjectMapper { // Return a standard Corda Jackson object mapper, configured to use YAML by default and with extra // serializers. return JacksonSupport.createDefaultMapper(rpcOps, YAMLFactory(), true).apply { val rpcModule = SimpleModule().apply { addDeserializer(InputStream::class.java, InputStreamDeserializer) addDeserializer(UniqueIdentifier::class.java, UniqueIdentifierDeserializer) } registerModule(rpcModule) } } private fun createOutputMapper(outputFormat: OutputFormat): ObjectMapper { val factory = when(outputFormat) { OutputFormat.JSON -> JsonFactory() OutputFormat.YAML -> YAMLFactory().disable(YAMLGenerator.Feature.WRITE_DOC_START_MARKER) } return JacksonSupport.createNonRpcMapper(factory).apply { // Register serializers for stateful objects from libraries that are special to the RPC system and don't // make sense to print out to the screen. For classes we own, annotations can be used instead. val rpcModule = SimpleModule().apply { addSerializer(Observable::class.java, ObservableSerializer) addSerializer(InputStream::class.java, InputStreamSerializer) } registerModule(rpcModule) disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) enable(SerializationFeature.INDENT_OUTPUT) } } // TODO: A default renderer could be used, instead of an object mapper. See: http://www.crashub.org/1.3/reference.html#_renderers private var outputFormat = OutputFormat.YAML @VisibleForTesting lateinit var latch: CountDownLatch private set /** * Called from the 'flow' shell command. Takes a name fragment and finds a matching flow, or prints out * the list of options if the request is ambiguous. Then parses [inputData] as constructor arguments using * the [runFlowFromString] method and starts the requested flow. Ctrl-C can be used to cancel. */ @JvmStatic fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps)) { val matches = try { rpcOps.registeredFlows().filter { nameFragment in it } } catch (e: PermissionException) { output.println(e.message ?: "Access denied", Color.red) return } if (matches.isEmpty()) { output.println("No matching flow found, run 'flow list' to see your options.", Color.red) return } else if (matches.size > 1 && matches.find { it.endsWith(nameFragment)} == null) { output.println("Ambiguous name provided, please be more specific. Your options are:") matches.forEachIndexed { i, s -> output.println("${i + 1}. $s", Color.yellow) } return } val flowName = matches.find { it.endsWith(nameFragment)} ?: matches.single() val flowClazz: Class<FlowLogic<*>> = if (classLoader != null) { uncheckedCast(Class.forName(flowName, true, classLoader)) } else { uncheckedCast(Class.forName(flowName)) } try { // Show the progress tracker on the console until the flow completes or is interrupted with a // Ctrl-C keypress. val stateObservable = runFlowFromString({ clazz, args -> rpcOps.startTrackedFlowDynamic(clazz, *args) }, inputData, flowClazz, inputObjectMapper) latch = CountDownLatch(1) ansiProgressRenderer.render(stateObservable, latch::countDown) // Wait for the flow to end and the progress tracker to notice. By the time the latch is released // the tracker is done with the screen. while (!Thread.currentThread().isInterrupted) { try { latch.await() break } catch (e: InterruptedException) { try { rpcOps.killFlow(stateObservable.id) } finally { Thread.currentThread().interrupt() break } } } output.println("Flow completed with result: ${stateObservable.returnValue.get()}") } catch (e: NoApplicableConstructor) { output.println("No matching constructor found:", Color.red) e.errors.forEach { output.println("- $it", Color.red) } } catch (e: PermissionException) { output.println(e.message ?: "Access denied", Color.red) } catch (e: ExecutionException) { // ignoring it as already logged by the progress handler subscriber } finally { InputStreamDeserializer.closeAll() } } class NoApplicableConstructor(val errors: List<String>) : CordaException(this.toString()) { override fun toString() = (listOf("No applicable constructor for flow. Problems were:") + errors).joinToString(System.lineSeparator()) } /** * Tidies up a possibly generic type name by chopping off the package names of classes in a hard-coded set of * hierarchies that are known to be widely used and recognised, and also not have (m)any ambiguous names in them. * * This is used for printing error messages when something doesn't match. */ private fun maybeAbbreviateGenericType(type: Type, extraRecognisedPackage: String): String { val packagesToAbbreviate = listOf("java.", "net.corda.core.", "kotlin.", extraRecognisedPackage) fun shouldAbbreviate(typeName: String) = packagesToAbbreviate.any { typeName.startsWith(it) } fun abbreviated(typeName: String) = if (shouldAbbreviate(typeName)) typeName.split('.').last() else typeName fun innerLoop(type: Type): String = when (type) { is ParameterizedType -> { val args: List<String> = type.actualTypeArguments.map(::innerLoop) abbreviated(type.rawType.typeName) + '<' + args.joinToString(", ") + '>' } is GenericArrayType -> { innerLoop(type.genericComponentType) + "[]" } is Class<*> -> { if (type.isArray) abbreviated(type.simpleName) else abbreviated(type.name).replace('$', '.') } else -> type.toString() } return innerLoop(type) } @JvmStatic fun killFlowById(id: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps)) { try { val runId = try { inputObjectMapper.readValue(id, StateMachineRunId::class.java) } catch (e: JsonMappingException) { output.println("Cannot parse flow ID of '$id' - expecting a UUID.", Color.red) log.error("Failed to parse flow ID", e) return } if (rpcOps.killFlow(runId)) { output.println("Killed flow $runId", Color.yellow) } else { output.println("Failed to kill flow $runId", Color.red) } } finally { output.flush() } } // TODO: This utility is generally useful and might be better moved to the node class, or an RPC, if we can commit to making it stable API. /** * Given a [FlowLogic] class and a string in one-line Yaml form, finds an applicable constructor and starts * the flow, returning the created flow logic. Useful for lightweight invocation where text is preferable * to statically typed, compiled code. * * See the [StringToMethodCallParser] class to learn more about limitations and acceptable syntax. * * @throws NoApplicableConstructor if no constructor could be found for the given set of types. */ @Throws(NoApplicableConstructor::class) fun <T> runFlowFromString(invoke: (Class<out FlowLogic<T>>, Array<out Any?>) -> FlowProgressHandle<T>, inputData: String, clazz: Class<out FlowLogic<T>>, om: ObjectMapper): FlowProgressHandle<T> { val errors = ArrayList<String>() val parser = StringToMethodCallParser(clazz, om) val nameTypeList = getMatchingConstructorParamsAndTypes(parser, inputData, clazz) try { val args = parser.parseArguments(clazz.name, nameTypeList, inputData) return invoke(clazz, args) } catch (e: StringToMethodCallParser.UnparseableCallException.ReflectionDataMissing) { val argTypes = nameTypeList.map { (_, type) -> type } errors.add("$argTypes: <constructor missing parameter reflection data>") } catch (e: StringToMethodCallParser.UnparseableCallException) { val argTypes = nameTypeList.map { (_, type) -> type } errors.add("$argTypes: ${e.message}") } throw NoApplicableConstructor(errors) } private fun <T> getMatchingConstructorParamsAndTypes(parser: StringToMethodCallParser<FlowLogic<T>>, inputData: String, clazz: Class<out FlowLogic<T>>) : List<Pair<String, Type>> { val errors = ArrayList<String>() val classPackage = clazz.packageName_ lateinit var paramNamesFromConstructor: List<String> for (ctor in clazz.constructors) { // Attempt construction with the given arguments. fun getPrototype(): List<String> { val argTypes = ctor.genericParameterTypes.map { // If the type name is in the net.corda.core or java namespaces, chop off the package name // because these hierarchies don't have (m)any ambiguous names and the extra detail is just noise. maybeAbbreviateGenericType(it, classPackage) } return paramNamesFromConstructor.zip(argTypes).map { (name, type) -> "$name: $type" } } try { paramNamesFromConstructor = parser.paramNamesFromConstructor(ctor) val nameTypeList = paramNamesFromConstructor.zip(ctor.genericParameterTypes) parser.validateIsMatchingCtor(clazz.name, nameTypeList, inputData) return nameTypeList } catch (e: StringToMethodCallParser.UnparseableCallException.MissingParameter) { errors.add("${getPrototype()}: missing parameter ${e.paramName}") } catch (e: StringToMethodCallParser.UnparseableCallException.TooManyParameters) { errors.add("${getPrototype()}: too many parameters") } catch (e: StringToMethodCallParser.UnparseableCallException.ReflectionDataMissing) { val argTypes = ctor.genericParameterTypes.map { it.typeName } errors.add("$argTypes: <constructor missing parameter reflection data>") } catch (e: StringToMethodCallParser.UnparseableCallException) { val argTypes = ctor.genericParameterTypes.map { it.typeName } errors.add("$argTypes: ${e.message}") } } throw NoApplicableConstructor(errors) } // TODO Filtering on error/success when we will have some sort of flow auditing, for now it doesn't make much sense. @JvmStatic fun runStateMachinesView(out: RenderPrintWriter, rpcOps: CordaRPCOps): Any? { val proxy = rpcOps val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed() val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) } val subscriber = FlowWatchPrintingSubscriber(out) stateMachineUpdates.startWith(currentStateMachines).subscribe(subscriber) var result: Any? = subscriber.future if (result is Future<*>) { if (!result.isDone) { out.cls() out.println("Waiting for completion or Ctrl-C ... ") out.flush() } try { result = result.get() } catch (e: InterruptedException) { subscriber.unsubscribe() Thread.currentThread().interrupt() } catch (e: ExecutionException) { throw e.rootCause } catch (e: InvocationTargetException) { throw e.rootCause } } return result } @JvmStatic fun runAttachmentTrustInfoView( out: RenderPrintWriter, rpcOps: InternalCordaRPCOps ): Any { return AttachmentTrustTable(out, rpcOps.attachmentTrustInfos) } @JvmStatic fun runDumpCheckpoints(rpcOps: InternalCordaRPCOps) { rpcOps.dumpCheckpoints() } @JvmStatic fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps, inputObjectMapper: ObjectMapper): Any? { val cmd = input.joinToString(" ").trim { it <= ' ' } if (cmd.startsWith("startflow", ignoreCase = true)) { // The flow command provides better support and startFlow requires special handling anyway due to // the generic startFlow RPC interface which offers no type information with which to parse the // string form of the command. out.println("Please use the 'flow' command to interact with flows rather than the 'run' command.", Color.yellow) return null } else if (cmd.substringAfter(" ").trim().equals("gracefulShutdown", ignoreCase = true)) { return InteractiveShell.gracefulShutdown(out, cordaRPCOps) } var result: Any? = null try { InputStreamSerializer.invokeContext = context val parser = StringToMethodCallParser(CordaRPCOps::class.java, inputObjectMapper) val call = parser.parse(cordaRPCOps, cmd) result = call.call() if (result != null && result !== kotlin.Unit && result !is Void) { result = printAndFollowRPCResponse(result, out, outputFormat) } if (result is Future<*>) { if (!result.isDone) { out.println("Waiting for completion or Ctrl-C ... ") out.flush() } try { result = result.get() } catch (e: InterruptedException) { Thread.currentThread().interrupt() } catch (e: ExecutionException) { throw e.rootCause } catch (e: InvocationTargetException) { throw e.rootCause } } } catch (e: StringToMethodCallParser.UnparseableCallException) { out.println(e.message, Color.red) if (e !is StringToMethodCallParser.UnparseableCallException.NoSuchFile) { out.println("Please try 'man run' to learn what syntax is acceptable") } } catch (e: Exception) { out.println("RPC failed: ${e.rootCause}", Color.red) } finally { InputStreamSerializer.invokeContext = null InputStreamDeserializer.closeAll() } return result } @JvmStatic fun gracefulShutdown(userSessionOut: RenderPrintWriter, cordaRPCOps: CordaRPCOps) { fun display(statements: RenderPrintWriter.() -> Unit) { statements.invoke(userSessionOut) userSessionOut.flush() } var isShuttingDown = false try { display { println("Orchestrating a clean shutdown, press CTRL+C to cancel...") } isShuttingDown = true display { println("...enabling draining mode") println("...waiting for in-flight flows to be completed") } cordaRPCOps.terminate(true) val latch = CountDownLatch(1) @Suppress("DEPRECATION") cordaRPCOps.pendingFlowsCount().updates.doOnError { error -> log.error(error.message) throw error }.doAfterTerminate(latch::countDown).subscribe( // For each update. { (first, second) -> display { println("...remaining: $first / $second") } }, // On error. { error -> if (!isShuttingDown) { display { println("RPC failed: ${error.rootCause}", Color.red) } } }, // When completed. { rpcConn.close() // This will only show up in the standalone Shell, because the embedded one is killed as part of a node's shutdown. display { println("...done, quitting the shell now.") } onExit.invoke() }) while (!Thread.currentThread().isInterrupted) { try { latch.await() break } catch (e: InterruptedException) { try { cordaRPCOps.setFlowsDrainingModeEnabled(false) display { println("...cancelled clean shutdown.") } } finally { Thread.currentThread().interrupt() break } } } } catch (e: StringToMethodCallParser.UnparseableCallException) { display { println(e.message, Color.red) println("Please try 'man run' to learn what syntax is acceptable") } } catch (e: Exception) { if (!isShuttingDown) { display { println("RPC failed: ${e.rootCause}", Color.red) } } } finally { InputStreamSerializer.invokeContext = null InputStreamDeserializer.closeAll() } } private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter, outputFormat: OutputFormat): CordaFuture<Unit> { val outputMapper = createOutputMapper(outputFormat) val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) } return maybeFollow(response, mapElement, out) } private class PrintingSubscriber(private val printerFun: (Any?) -> String, private val toStream: PrintWriter) : Subscriber<Any>() { private var count = 0 val future = openFuture<Unit>() init { // The future is public and can be completed by something else to indicate we don't wish to follow // anymore (e.g. the user pressing Ctrl-C). future.then { unsubscribe() } } @Synchronized override fun onCompleted() { toStream.println("Observable has completed") future.set(Unit) } @Synchronized override fun onNext(t: Any?) { count++ toStream.println("Observation $count: " + printerFun(t)) toStream.flush() } @Synchronized override fun onError(e: Throwable) { toStream.println("Observable completed with an error") e.printStackTrace(toStream) future.setException(e) } } private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> { // Match on a couple of common patterns for "important" observables. It's tough to do this in a generic // way because observables can be embedded anywhere in the object graph, and can emit other arbitrary // object graphs that contain yet more observables. So we just look for top level responses that follow // the standard "track" pattern, and print them until the user presses Ctrl-C if (response == null) return doneFuture(Unit) if (response is DataFeed<*, *>) { out.println("Snapshot:") out.println(printerFun(response.snapshot)) out.flush() out.println("Updates:") return printNextElements(response.updates, printerFun, out) } if (response is Observable<*>) { return printNextElements(response, printerFun, out) } out.println(printerFun(response)) return doneFuture(Unit) } private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> { val subscriber = PrintingSubscriber(printerFun, out) uncheckedCast(elements).subscribe(subscriber) return subscriber.future } } - MaxLineLength:InteractiveShell.kt$InteractiveShell.NoApplicableConstructor$override fun toString() MaxLineLength:InteractiveShellIntegrationTest.kt$InteractiveShellIntegrationTest$private MaxLineLength:InterestSwapRestAPI.kt$InterestRateSwapAPI MaxLineLength:InternalAccessTestHelpers.kt$fun <T> ifThrowsAppend(strToAppendFn: () -> String, block: () -> T): T @@ -2604,7 +2598,6 @@ MaxLineLength:MockServices.kt$MockServices$this(cordappLoaderForPackages(cordappPackages), identityService, networkParameters, initialIdentity, moreKeys, keyManagementService) MaxLineLength:MockServices.kt$MockServices$this(cordappLoaderForPackages(cordappPackages), identityService, testNetworkParameters(modifiedTime = Instant.MIN), initialIdentity, moreKeys) MaxLineLength:MockServices.kt$MockServices.Companion$ @JvmStatic @JvmOverloads fun makeTestDatabaseAndMockServices(cordappPackages: List<String>, identityService: IdentityService, initialIdentity: TestIdentity, networkParameters: NetworkParameters = testNetworkParameters(modifiedTime = Instant.MIN), vararg moreKeys: KeyPair): Pair<CordaPersistence, MockServices> - MaxLineLength:MockServices.kt$MockServices.Companion$ @JvmStatic @JvmOverloads fun makeTestDatabaseAndPersistentServices( cordappPackages: List<String>, initialIdentity: TestIdentity, networkParameters: NetworkParameters = testNetworkParameters(modifiedTime = Instant.MIN), moreKeys: Set<KeyPair>, moreIdentities: Set<PartyAndCertificate> ): Pair<CordaPersistence, MockServices> MaxLineLength:MockServices.kt$MockServices.Companion$makeMockMockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys, keyManagementService, schemaService, persistence) MaxLineLength:MockServices.kt$MockServices.Companion$makeMockMockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys.toSet(), keyManagementService, schemaService, database) MaxLineLength:MockServices.kt$MockServices.Companion$return object : MockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys.toTypedArray(), keyManagementService) { override var networkParametersService: NetworkParametersService = MockNetworkParametersStorage(networkParameters) override val vaultService: VaultService = makeVaultService(schemaService, persistence, cordappLoader) override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) { ServiceHubInternal.recordTransactions( statesToRecord, txs as? Collection ?: txs.toList(), validatedTransactions as WritableTransactionStorage, mockStateMachineRecordedTransactionMappingStorage, vaultService as VaultServiceInternal, persistence ) } override fun jdbcSession(): Connection = persistence.createSession() override fun <T : Any?> withEntityManager(block: EntityManager.() -> T): T { return block(contextTransaction.restrictedEntityManager) } override fun withEntityManager(block: Consumer<EntityManager>) { return block.accept(contextTransaction.restrictedEntityManager) } } @@ -3153,7 +3146,6 @@ MaxLineLength:RPCApi.kt$return sessionId(RPC_SESSION_ID_FIELD_NAME, RPC_SESSION_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot extract the session id from client message.") MaxLineLength:RPCClientProxyHandler.kt$RPCClientProxyHandler$private MaxLineLength:RPCClientProxyHandler.kt$RPCClientProxyHandler$private val serializationContextWithObservableContext = RpcClientObservableDeSerializer.createContext(serializationContext, observableContext) - MaxLineLength:RPCClientProxyHandler.kt$RPCClientProxyHandler$return cacheFactory.buildNamed(Caffeine.newBuilder().weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()), "RpcClientProxyHandler_rpcObservable") MaxLineLength:RPCClientProxyHandler.kt$RPCClientProxyHandler$throw UnsupportedOperationException("Method $calledMethod was added in RPC protocol version $sinceVersion but the server is running $serverProtocolVersion") MaxLineLength:RPCDriver.kt$RPCDriverDSL$val artemisConfig = createRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient, driverDSL.driverDirectory / serverName, hostAndPort) MaxLineLength:RPCDriver.kt$RPCDriverDSL.Companion$fun createRpcServerArtemisConfig(maxFileSize: Int, maxBufferedBytesPerClient: Long, baseDirectory: Path, hostAndPort: NetworkHostAndPort): Configuration @@ -3581,17 +3573,14 @@ MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$ContractCreationError : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$ContractRejection : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$InvalidAttachmentException : TransactionVerificationException - MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$MissingNetworkParametersException : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$NotaryChangeInWrongTransactionType : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$OverlappingAttachmentsException : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$PackageOwnershipException : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$SignersMissing : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$TransactionNetworkParameterOrderingException : TransactionVerificationException - MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException$TransactionNotaryMismatchEncumbranceException : TransactionVerificationException MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException.ContractCreationError$internal constructor(txId: SecureHash, contractClass: String, cause: Throwable) : this(txId, contractClass, cause, cause.message ?: "") MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException.ContractRejection$internal constructor(txId: SecureHash, contract: Contract, cause: Throwable) : this(txId, contract.javaClass.name, cause, cause.message ?: "") MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException.PackageOwnershipException$"""The attachment JAR: $attachmentHash containing the class: $invalidClassName is not signed by the owner of package $packageName specified in the network parameters. Please check the source of this attachment and if it is malicious contact your zone operator to report this incident. For details see: https://docs.corda.net/network-map.html#network-parameters""" - MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException.TransactionNotaryMismatchEncumbranceException$"Output state with index $encumberedIndex is assigned to notary [$encumberedNotary], while its encumbrance with index $encumbranceIndex is assigned to notary [$encumbranceNotary]" MaxLineLength:TransactionVerificationException.kt$TransactionVerificationException.UntrustedAttachmentsException$"Please follow the operational steps outlined in https://docs.corda.net/cordapp-build-systems.html#cordapp-contract-attachments to learn more and continue." MaxLineLength:TransactionVerificationException.kt$net.corda.core.contracts.TransactionVerificationException.kt MaxLineLength:TransactionVerificationRequest.kt$TransactionVerificationRequest$@Suppress("MemberVisibilityCanBePrivate") //TODO the use of deprecated toLedgerTransaction need to be revisited as resolveContractAttachment requires attachments of the transactions which created input states... //TODO ...to check contract version non downgrade rule, curretly dummy Attachment if not fund is used which sets contract version to '1' @CordaSerializable @@ -3909,7 +3898,6 @@ ReturnCount:FlowManager.kt$NodeFlowManager.FlowWeightComparator$override fun compare(o1: NodeFlowManager.RegisteredFlowContainer, o2: NodeFlowManager.RegisteredFlowContainer): Int ReturnCount:InteractiveShell.kt$InteractiveShell$ @JvmStatic fun runFlowByNameFragment(nameFragment: String, inputData: String, output: RenderPrintWriter, rpcOps: CordaRPCOps, ansiProgressRenderer: ANSIProgressRenderer, inputObjectMapper: ObjectMapper = createYamlInputMapper(rpcOps)) ReturnCount:InteractiveShell.kt$InteractiveShell$@JvmStatic fun runRPCFromString(input: List<String>, out: RenderPrintWriter, context: InvocationContext<out Any>, cordaRPCOps: CordaRPCOps, inputObjectMapper: ObjectMapper): Any? - ReturnCount:InteractiveShell.kt$InteractiveShell$private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture<Unit> ReturnCount:Interpolators.kt$LinearInterpolator$override fun interpolate(x: Double): Double ReturnCount:JarScanningCordappLoader.kt$JarScanningCordappLoader$private fun parseCordappInfo(manifest: Manifest?, defaultName: String): Cordapp.Info ReturnCount:LocalSerializerFactory.kt$DefaultLocalSerializerFactory$override fun get(actualClass: Class<*>, declaredType: Type): AMQPSerializer<Any> diff --git a/detekt-config.yml b/detekt-config.yml index 033fbdca4e..65bb613bd7 100644 --- a/detekt-config.yml +++ b/detekt-config.yml @@ -10,29 +10,33 @@ complexity: active: true ComplexCondition: active: true + excludes: "**/buildSrc/**" threshold: 4 ComplexMethod: active: true + excludes: "**/buildSrc/**" threshold: 10 ignoreSingleWhenExpression: true LargeClass: active: true - excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt" + excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt,**/buildSrc/**" threshold: 600 LongMethod: active: true - excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt" + excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt,**/buildSrc/**" threshold: 120 LongParameterList: active: true + excludes: "**/buildSrc/**" threshold: 6 ignoreDefaultParameters: false NestedBlockDepth: active: true + excludes: "**/buildSrc/**" threshold: 4 TooManyFunctions: active: true - excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt" + excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt,**/buildSrc/**" thresholdInFiles: 15 thresholdInClasses: 15 thresholdInInterfaces: 15 @@ -41,6 +45,7 @@ complexity: empty-blocks: active: true + excludes: "**/buildSrc/**" EmptyCatchBlock: active: true allowedExceptionNameRegex: "^(_|(ignore|expected).*)" @@ -71,6 +76,7 @@ empty-blocks: exceptions: active: true + excludes: "**/buildSrc/**" TooGenericExceptionCaught: active: true exceptionNames: @@ -92,6 +98,7 @@ exceptions: naming: active: true + excludes: "**/buildSrc/**" ClassNaming: active: true classPattern: '[A-Z$][a-zA-Z0-9$]*' @@ -127,6 +134,7 @@ naming: performance: active: true + excludes: "**/buildSrc/**" ForEachOnRange: active: true SpreadOperator: @@ -136,6 +144,7 @@ performance: potential-bugs: active: true + excludes: "**/buildSrc/**" DuplicateCaseInWhenExpression: active: true EqualsWithHashCodeExist: @@ -149,10 +158,11 @@ style: active: true ForbiddenComment: active: true + excludes: "**/buildSrc/**" values: 'TODO:,FIXME:,STOPSHIP:' MagicNumber: active: true - excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt" + excludes: "**/test/**,**/integration-test/**,**/integration-test-slow/**,**/*Test.kt,**/*Tests.kt,**/buildSrc/**" ignoreNumbers: '-1,0,1,2' ignoreHashCodeFunction: true ignorePropertyDeclaration: false @@ -163,23 +173,30 @@ style: ignoreEnums: false MaxLineLength: active: true + excludes: "**/buildSrc/**" maxLineLength: 140 excludePackageStatements: true excludeImportStatements: true ModifierOrder: active: true + excludes: "**/buildSrc/**" OptionalAbstractKeyword: active: true + excludes: "**/buildSrc/**" ReturnCount: active: true + excludes: "**/buildSrc/**" max: 2 excludedFunctions: "equals" excludeReturnFromLambda: true SafeCast: active: true + excludes: "**/buildSrc/**" ThrowsCount: active: true + excludes: "**/buildSrc/**" max: 2 WildcardImport: active: true + excludes: "**/buildSrc/**" excludeImports: 'java.util.*,kotlinx.android.synthetic.*' \ No newline at end of file diff --git a/docs/source/conf.py b/docs/source/conf.py index 1fa6e4a1e2..f34756ac7d 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -25,7 +25,8 @@ corda_substitutions = { "|quasar_version|" : constants_properties_dict["quasarVersion"], "|platform_version|" : constants_properties_dict["platformVersion"], "|os_branch|" : constants_properties_dict["openSourceBranch"], - "|os_samples_branch|" : constants_properties_dict["openSourceSamplesBranch"] + "|os_samples_branch|" : constants_properties_dict["openSourceSamplesBranch"], + "|jolokia_version|" : constants_properties_dict["jolokiaAgentVersion"] } def setup(app): diff --git a/docs/source/node-administration.rst b/docs/source/node-administration.rst index bb44e666df..379e5efb39 100644 --- a/docs/source/node-administration.rst +++ b/docs/source/node-administration.rst @@ -81,6 +81,12 @@ Note that in production, exposing the database via the node is not recommended. Monitoring your node -------------------- +This section covers monitoring performance and health of a node in Corda Enterprise with Jolokia and Graphite. General best practices for monitoring (e.g. setting up TCP checks for the ports the node communicates on, database health checks etc.) are not covered here but should be followed. + + +Monitoring via Jolokia +++++++++++++++++++++++ + Like most Java servers, the node can be configured to export various useful metrics and management operations via the industry-standard `JMX infrastructure `_. JMX is a standard API for registering so-called *MBeans* ... objects whose properties and methods are intended for server management. As Java @@ -106,8 +112,12 @@ Here are a few ways to build dashboards and extract monitoring data for a node: It can bridge any data input to any output using their plugin system, for example, Telegraf can be configured to collect data from Jolokia and write to DataDog web api. -The Node configuration parameter `jmxMonitoringHttpPort` has to be present in order to ensure a Jolokia agent is instrumented with -the JVM run-time. +In order to ensure that a Jolokia agent is instrumented with the JVM run-time, you can choose one of these options: + +* Specify the Node configuration parameter ``jmxMonitoringHttpPort`` which will attempt to load the jolokia driver from the ``drivers`` folder. + The format of the driver name needs to be ``jolokia-jvm-{VERSION}-agent.jar`` where VERSION is the version required by Corda, currently |jolokia_version|. +* Start the node with ``java -Dcapsule.jvm.args="-javaagent:drivers/jolokia-jvm-1.6.0-agent.jar=port=7777,host=localhost" -jar corda.jar``. + The following JMX statistics are exported: @@ -126,6 +136,8 @@ via a file called ``jolokia-access.xml``. Several Jolokia policy based security configuration files (``jolokia-access.xml``) are available for dev, test, and prod environments under ``/config/``. +To pass a security policy use ``java -Dcapsule.jvm.args=-javaagent:./drivers/jolokia-jvm-1.6.0-agent.jar,policyLocation=file:./config-path/jolokia-access.xml -jar corda.jar`` + Notes for development use +++++++++++++++++++++++++ diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt index 47cf9ab543..bbeff4adce 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt @@ -33,6 +33,7 @@ import java.io.File import java.net.URL import java.nio.file.FileAlreadyExistsException import java.nio.file.Path +import java.nio.file.Paths import java.nio.file.StandardCopyOption.REPLACE_EXISTING import java.security.PublicKey import java.time.Duration @@ -68,7 +69,7 @@ internal constructor(private val initSerEnv: Boolean, companion object { // TODO This will probably need to change once we start using a bundled JVM private val nodeInfoGenCmd = listOf( - "java", + Paths.get(System.getProperty("java.home"), "bin", "java").toString(), "-jar", "corda.jar", "generate-node-info" diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt index 4ccfff8ddb..95f950d13c 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt @@ -25,9 +25,6 @@ class DatabaseTransaction( ) { val id: UUID = UUID.randomUUID() - val flushing: Boolean get() = _flushingCount > 0 - private var _flushingCount = 0 - val connection: Connection by lazy(LazyThreadSafetyMode.NONE) { database.dataSource.connection.apply { autoCommit = false @@ -37,27 +34,6 @@ class DatabaseTransaction( private val sessionDelegate = lazy { val session = database.entityManagerFactory.withOptions().connection(connection).openSession() - session.addEventListeners(object : BaseSessionEventListener() { - override fun flushStart() { - _flushingCount++ - super.flushStart() - } - - override fun flushEnd(numberOfEntities: Int, numberOfCollections: Int) { - super.flushEnd(numberOfEntities, numberOfCollections) - _flushingCount-- - } - - override fun partialFlushStart() { - _flushingCount++ - super.partialFlushStart() - } - - override fun partialFlushEnd(numberOfEntities: Int, numberOfCollections: Int) { - super.partialFlushEnd(numberOfEntities, numberOfCollections) - _flushingCount-- - } - }) hibernateTransaction = session.beginTransaction() session } diff --git a/node/build.gradle b/node/build.gradle index cbcc56f69f..ce88a4bf56 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -3,7 +3,12 @@ buildscript { def properties = new Properties() file("$projectDir/src/main/resources/build.properties").withInputStream { properties.load(it) } - ext.jolokia_version = properties.getProperty('jolokiaAgentVersion') + + Properties constants = new Properties() + file("$rootDir/constants.properties").withInputStream { constants.load(it) } + + + ext.jolokia_version = constants.getProperty('jolokiaAgentVersion') dependencies { classpath group: 'com.github.docker-java', name: 'docker-java', version: '3.1.5' diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index b02d0a6c76..d1f2f7e7b2 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -150,6 +150,7 @@ internal class CordaRPCOpsImpl( override fun killFlow(id: StateMachineRunId): Boolean = if (smm.killFlow(id)) true else smm.flowHospital.dropSessionInit(id.uuid) override fun stateMachinesFeed(): DataFeed, StateMachineUpdate> { + val (allStateMachines, changes) = smm.track() return DataFeed( allStateMachines.map { stateMachineInfoFromFlowLogic(it) }, diff --git a/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt index b1cf9d3e9f..28ec775b46 100644 --- a/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt +++ b/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt @@ -6,13 +6,10 @@ import net.corda.core.identity.Party import net.corda.core.identity.PartyAndCertificate import net.corda.core.identity.x500Matches import net.corda.core.internal.CertRole -import net.corda.core.internal.hash -import net.corda.core.node.services.IdentityService import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.contextLogger import net.corda.core.utilities.trace import net.corda.node.services.api.IdentityServiceInternal -import net.corda.node.services.persistence.WritablePublicKeyToOwningIdentityCache import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.crypto.x509Certificates import java.security.InvalidAlgorithmParameterException @@ -101,6 +98,10 @@ class InMemoryIdentityService( return keyToPartyAndCerts[identityCertChain[1].publicKey] } + override fun partyFromKey(key: PublicKey): Party? { + return certificateFromKey(key)?.party ?: keyToName[key.toStringShort()]?.let { wellKnownPartyFromX500Name(it) } + } + override fun certificateFromKey(owningKey: PublicKey): PartyAndCertificate? = keyToPartyAndCerts[owningKey] // We give the caller a copy of the data set to avoid any locking problems diff --git a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt index 6dd70ea852..b86355302c 100644 --- a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt +++ b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt @@ -296,6 +296,12 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri keyToPartyAndCert[owningKey.toStringShort()] } + override fun partyFromKey(key: PublicKey): Party? { + return certificateFromKey(key)?.party ?: database.transaction { + keyToName[key.toStringShort()] + }?.let { wellKnownPartyFromX500Name(it) } + } + private fun certificateFromCordaX500Name(name: CordaX500Name): PartyAndCertificate? { return database.transaction { val partyId = nameToKey[name] diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowLogicRefFactoryImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowLogicRefFactoryImpl.kt index 2d8695de5c..dfd2b1a8db 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowLogicRefFactoryImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowLogicRefFactoryImpl.kt @@ -5,6 +5,8 @@ import net.corda.core.flows.* import net.corda.core.internal.VisibleForTesting import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.utilities.contextLogger +import org.slf4j.Logger import java.lang.reflect.ParameterizedType import java.lang.reflect.Type import java.lang.reflect.TypeVariable @@ -33,7 +35,12 @@ data class FlowLogicRefImpl internal constructor(val flowLogicClassName: String, * in response to a potential malicious use or buggy update to an app etc. */ // TODO: Replace with a per app classloader/cordapp provider/cordapp loader - this will do for now +@Suppress("ReturnCount", "TooManyFunctions") open class FlowLogicRefFactoryImpl(private val classloader: ClassLoader) : SingletonSerializeAsToken(), FlowLogicRefFactory { + companion object { + private val log: Logger = contextLogger() + } + override fun create(flowClass: Class>, vararg args: Any?): FlowLogicRef { if (!flowClass.isAnnotationPresent(SchedulableFlow::class.java)) { throw IllegalFlowLogicException(flowClass, "because it's not a schedulable flow") @@ -76,20 +83,100 @@ open class FlowLogicRefFactoryImpl(private val classloader: ClassLoader) : Singl return createKotlin(flowClass, argsMap) } - protected open fun findConstructor(flowClass: Class>, argTypes: List?>): KFunction> { + private fun matchConstructorArgs(ctorTypes: List>, optional: List, + argTypes: List?>): Pair { + // There must be at least as many constructor arguments as supplied arguments + if (argTypes.size > ctorTypes.size) { + return Pair(false, 0) + } + + // Check if all constructor arguments are assignable for all supplied arguments, then for remaining arguments in constructor + // check that they are optional. If they are it's still a match. Return if matched and the number of default args consumed. + var numDefaultsUsed = 0 + var index = 0 + for (conArg in ctorTypes) { + if (index < argTypes.size) { + val argType = argTypes[index] + if (argType != null && !conArg.isAssignableFrom(argType)) { + return Pair(false, 0) + } + } else { + if (index >= optional.size || !optional[index]) { + return Pair(false, 0) + } + numDefaultsUsed++ + } + index++ + } + + return Pair(true, numDefaultsUsed) + } + + private fun handleNoMatchingConstructor(flowClass: Class>, argTypes: List?>) { + log.error("Cannot find Constructor to match arguments: ${argTypes.joinToString()}") + log.info("Candidate constructors are:") + for (ctor in flowClass.kotlin.constructors) { + log.info("${ctor}") + } + } + + private fun findConstructorCheckDefaultParams(flowClass: Class>, argTypes: List?>): + KFunction> { + // There may be multiple matches. If there are, we will use the one with the least number of default parameter matches. + var ctorMatch: KFunction>? = null + var matchNumDefArgs = 0 + for (ctor in flowClass.kotlin.constructors) { + // Get the types of the arguments, always boxed (as that's what we get in the invocation). + val ctorTypes = ctor.javaConstructor!!.parameterTypes.map { + if (it == null) { it } else { Primitives.wrap(it) } + } + + val optional = ctor.parameters.map { it.isOptional } + val (matched, numDefaultsUsed) = matchConstructorArgs(ctorTypes, optional, argTypes) + if (matched) { + if (ctorMatch == null || numDefaultsUsed < matchNumDefArgs) { + ctorMatch = ctor + matchNumDefArgs = numDefaultsUsed + } + } + } + + if (ctorMatch == null) { + handleNoMatchingConstructor(flowClass, argTypes) + // Must do the throw here, not in handleNoMatchingConstructor(added for Detekt) else we can't return ctorMatch as non-null + throw IllegalFlowLogicException(flowClass, "No constructor found that matches arguments (${argTypes.joinToString()}), " + + "see log for more information.") + } + + log.info("Matched constructor: ${ctorMatch} (num_default_args_used=$matchNumDefArgs)") + return ctorMatch + } + + private fun findConstructorDirectMatch(flowClass: Class>, argTypes: List?>): KFunction> { return flowClass.kotlin.constructors.single { ctor -> // Get the types of the arguments, always boxed (as that's what we get in the invocation). val ctorTypes = ctor.javaConstructor!!.parameterTypes.map { Primitives.wrap(it) } if (argTypes.size != ctorTypes.size) return@single false for ((argType, ctorType) in argTypes.zip(ctorTypes)) { - if (argType == null) continue // Try and find a match based on the other arguments. + if (argType == null) continue // Try and find a match based on the other arguments. if (!ctorType.isAssignableFrom(argType)) return@single false } true } } + protected open fun findConstructor(flowClass: Class>, argTypes: List?>): KFunction> { + try { + return findConstructorDirectMatch(flowClass, argTypes) + } catch(e: java.lang.IllegalArgumentException) { + log.trace("findConstructorDirectMatch threw IllegalArgumentException (more than 1 matches).") + } catch (e: NoSuchElementException) { + log.trace("findConstructorDirectMatch threw NoSuchElementException (no matches).") + } + return findConstructorCheckDefaultParams(flowClass, argTypes) + } + /** * Create a [FlowLogicRef] by trying to find a Kotlin constructor that matches the given args. * diff --git a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt index 6ccf10b441..f45ddbb7cf 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt @@ -7,6 +7,8 @@ import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.nodeapi.internal.persistence.contextTransaction import net.corda.nodeapi.internal.persistence.currentDBSession +import org.hibernate.Session +import org.hibernate.internal.SessionImpl import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean @@ -191,14 +193,23 @@ abstract class AppendOnlyPersistentMapBase( private fun loadValue(key: K): V? { val session = currentDBSession() - val flushing = contextTransaction.flushing - if (!flushing) { + val isSafeToDetach = isSafeToFlushAndDetach(session) + if (isSafeToDetach) { // IMPORTANT: The flush is needed because detach() makes the queue of unflushed entries invalid w.r.t. Hibernate internal state if the found entity is unflushed. // We want the detach() so that we rely on our cache memory management and don't retain strong references in the Hibernate session. session.flush() } val result = session.find(persistentEntityClass, toPersistentEntityKey(key)) - return result?.apply { if (!flushing) session.detach(result) }?.let(fromPersistentEntity)?.second + return result?.apply { if (isSafeToDetach) session.detach(result) }?.let(fromPersistentEntity)?.second + } + + private fun isSafeToFlushAndDetach(session: Session): Boolean { + if (session !is SessionImpl) + return true + + val flushInProgress = session.persistenceContext.isFlushing + val cascadeInProgress = session.persistenceContext.cascadeLevel > 0 + return !flushInProgress && !cascadeInProgress } protected fun transactionalLoadValue(key: K): Transactional { diff --git a/node/src/main/resources/build.properties b/node/src/main/resources/build.properties index cbe490156a..0b42fe7292 100644 --- a/node/src/main/resources/build.properties +++ b/node/src/main/resources/build.properties @@ -1,4 +1,6 @@ # Build constants exported as resource file to make them visible in Node program # Note: sadly, due to present limitation of IntelliJ-IDEA in processing resource files, these constants cannot be # imported from top-level 'constants.properties' file -jolokiaAgentVersion=1.6.1 +#jolokiaAgentVersion=1.6.1 + + diff --git a/node/src/test/kotlin/net/corda/node/services/identity/PersistentIdentityServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/identity/PersistentIdentityServiceTests.kt index c364bf96ee..989312d392 100644 --- a/node/src/test/kotlin/net/corda/node/services/identity/PersistentIdentityServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/identity/PersistentIdentityServiceTests.kt @@ -261,6 +261,17 @@ class PersistentIdentityServiceTests { } } + @Test + fun `resolve key to party for key without certificate`() { + // Register Alice's PartyAndCert as if it was done so via the network map cache. + identityService.verifyAndRegisterIdentity(alice.identity) + // Use a key which is not tied to a cert. + val publicKey = Crypto.generateKeyPair().public + // Register the PublicKey to Alice's CordaX500Name. + identityService.registerKey(publicKey, alice.party) + assertEquals(alice.party, identityService.partyFromKey(publicKey)) + } + @Test fun `register incorrect party to public key `(){ database.transaction { identityService.verifyAndRegisterIdentity(ALICE_IDENTITY) } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/HibernateColumnConverterTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/HibernateColumnConverterTests.kt deleted file mode 100644 index 175fab84e3..0000000000 --- a/node/src/test/kotlin/net/corda/node/services/persistence/HibernateColumnConverterTests.kt +++ /dev/null @@ -1,82 +0,0 @@ -package net.corda.node.services.persistence - -import net.corda.core.identity.CordaX500Name -import net.corda.core.identity.Party -import net.corda.core.transactions.SignedTransaction -import net.corda.core.transactions.TransactionBuilder -import net.corda.core.utilities.OpaqueBytes -import net.corda.finance.DOLLARS -import net.corda.finance.`issued by` -import net.corda.finance.contracts.asset.Cash -import net.corda.finance.issuedBy -import net.corda.node.services.identity.PersistentIdentityService -import net.corda.node.services.keys.E2ETestKeyManagementService -import net.corda.nodeapi.internal.persistence.CordaPersistence -import net.corda.testing.common.internal.testNetworkParameters -import net.corda.testing.core.SerializationEnvironmentRule -import net.corda.testing.core.TestIdentity -import net.corda.testing.internal.TestingNamedCacheFactory -import net.corda.testing.node.MockServices -import org.junit.Before -import org.junit.Rule -import org.junit.Test -import kotlin.test.assertEquals - -class HibernateColumnConverterTests { - - @Rule - @JvmField - val testSerialization = SerializationEnvironmentRule() - - private val cordapps = listOf("net.corda.finance") - - private val myself = TestIdentity(CordaX500Name("Me", "London", "GB")) - private val notary = TestIdentity(CordaX500Name("NotaryService", "London", "GB"), 1337L) - - lateinit var services: MockServices - lateinit var database: CordaPersistence - - @Before - fun setUp() { - val (db, mockServices) = MockServices.makeTestDatabaseAndPersistentServices( - cordappPackages = cordapps, - initialIdentity = myself, - networkParameters = testNetworkParameters(minimumPlatformVersion = 4), - moreIdentities = setOf(notary.identity), - moreKeys = emptySet() - ) - services = mockServices - database = db - } - - // AbstractPartyToX500NameAsStringConverter could cause circular flush of Hibernate session because it is invoked during flush, and a - // cache miss was doing a flush. This also checks that loading during flush does actually work. - @Test - fun `issue some cash on a notary that exists only in the database to check cache loading works in our identity column converters during flush of vault update`() { - val expected = 500.DOLLARS - val ref = OpaqueBytes.of(0x01) - - // Create parallel set of key and identity services so that the values are not cached, forcing the node caches to do a lookup. - val cacheFactory = TestingNamedCacheFactory() - val identityService = PersistentIdentityService(cacheFactory) - val originalIdentityService: PersistentIdentityService = services.identityService as PersistentIdentityService - identityService.database = originalIdentityService.database - identityService.start(originalIdentityService.trustRoot, pkToIdCache = PublicKeyToOwningIdentityCacheImpl(database, cacheFactory)) - val keyService = E2ETestKeyManagementService(identityService) - keyService.start(setOf(myself.keyPair)) - - // New identity for a notary (doesn't matter that it's for Bank Of Corda... since not going to use it as an actual notary etc). - val newKeyAndCert = keyService.freshKeyAndCert(services.myInfo.legalIdentitiesAndCerts[0], false) - val randomNotary = Party(myself.name, newKeyAndCert.owningKey) - - val ourIdentity = services.myInfo.legalIdentities.first() - val builder = TransactionBuilder(notary.party) - val issuer = services.myInfo.legalIdentities.first().ref(ref) - val signers = Cash().generateIssue(builder, expected.issuedBy(issuer), ourIdentity, randomNotary) - val tx: SignedTransaction = services.signInitialTransaction(builder, signers) - services.recordTransactions(tx) - - val output = tx.tx.outputsOfType().single() - assertEquals(expected.`issued by`(ourIdentity.ref(ref)), output.amount) - } -} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/HibernateInteractionTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/HibernateInteractionTests.kt new file mode 100644 index 0000000000..3f321008c1 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/persistence/HibernateInteractionTests.kt @@ -0,0 +1,168 @@ +package net.corda.node.services.persistence + +import net.corda.core.contracts.BelongsToContract +import net.corda.core.contracts.Contract +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.TransactionState +import net.corda.core.contracts.TypeOnlyCommandData +import net.corda.core.identity.AbstractParty +import net.corda.core.identity.CordaX500Name +import net.corda.core.schemas.MappedSchema +import net.corda.core.schemas.PersistentState +import net.corda.core.schemas.QueryableState +import net.corda.core.serialization.CordaSerializable +import net.corda.core.transactions.LedgerTransaction +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.OpaqueBytes +import net.corda.finance.DOLLARS +import net.corda.finance.`issued by` +import net.corda.finance.contracts.asset.Cash +import net.corda.finance.issuedBy +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.testing.common.internal.testNetworkParameters +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.core.TestIdentity +import net.corda.testing.internal.TestingNamedCacheFactory +import net.corda.testing.node.MockServices +import org.assertj.core.api.Assertions.assertThat +import org.hibernate.annotations.Cascade +import org.hibernate.annotations.CascadeType +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import java.lang.IllegalArgumentException +import javax.persistence.Entity +import javax.persistence.Id +import javax.persistence.OneToMany +import javax.persistence.Table +import javax.persistence.GeneratedValue +import javax.persistence.GenerationType +import kotlin.test.assertEquals + +/** + * These tests cover the interactions between Corda and Hibernate with regards to flushing/detaching/cascading. + */ +class HibernateInteractionTests { + + @Rule + @JvmField + val testSerialization = SerializationEnvironmentRule() + + private val cordapps = listOf("net.corda.finance", "net.corda.node.services.persistence") + + private val myself = TestIdentity(CordaX500Name("Me", "London", "GB")) + private val notary = TestIdentity(CordaX500Name("NotaryService", "London", "GB"), 1337L) + + lateinit var services: MockServices + lateinit var database: CordaPersistence + + @Before + fun setUp() { + val (db, mockServices) = MockServices.makeTestDatabaseAndPersistentServices( + cordappPackages = cordapps, + initialIdentity = myself, + networkParameters = testNetworkParameters(minimumPlatformVersion = 4), + moreIdentities = setOf(notary.identity), + moreKeys = emptySet(), + // forcing a cache size of zero, so that all requests lead to a cache miss and end up hitting the database + cacheFactory = TestingNamedCacheFactory(0) + ) + services = mockServices + database = db + } + + // AbstractPartyToX500NameAsStringConverter could cause circular flush of Hibernate session because it is invoked during flush, and a + // cache miss was doing a flush. This also checks that loading during flush does actually work. + @Test + fun `issue some cash on a notary that exists only in the database to check cache loading works in our identity column converters during flush of vault update`() { + val expected = 500.DOLLARS + val ref = OpaqueBytes.of(0x01) + + val ourIdentity = services.myInfo.legalIdentities.first() + val builder = TransactionBuilder(notary.party) + val issuer = services.myInfo.legalIdentities.first().ref(ref) + val signers = Cash().generateIssue(builder, expected.issuedBy(issuer), ourIdentity, notary.party) + val tx: SignedTransaction = services.signInitialTransaction(builder, signers) + services.recordTransactions(tx) + + val output = tx.tx.outputsOfType().single() + assertEquals(expected.`issued by`(ourIdentity.ref(ref)), output.amount) + } + + @Test + fun `when a cascade is in progress (because of nested entities), the node avoids to flush & detach entities, since it's not allowed by Hibernate`() { + val ourIdentity = services.myInfo.legalIdentities.first() + + val childEntities = listOf(SimpleContract.ChildState(ourIdentity)) + val parentEntity = SimpleContract.ParentState(childEntities) + + val builder = TransactionBuilder(notary.party) + .addOutputState(TransactionState(parentEntity, SimpleContract::class.java.name, notary.party)) + .addCommand(SimpleContract.Issue(), listOf(ourIdentity.owningKey)) + val tx: SignedTransaction = services.signInitialTransaction(builder, listOf(ourIdentity.owningKey)) + services.recordTransactions(tx) + + val output = tx.tx.outputsOfType().single() + assertThat(output.children.single().member).isEqualTo(ourIdentity) + } + + object PersistenceSchema: MappedSchema(PersistenceSchema::class.java, 1, listOf(Parent::class.java, Child::class.java)) { + + @Entity(name = "parents") + @Table + class Parent: PersistentState() { + + @Cascade(CascadeType.ALL) + @OneToMany(targetEntity = Child::class) + val children: MutableCollection = mutableSetOf() + + fun addChild(child: Child) { + children.add(child) + } + } + + @Entity(name = "children") + class Child( + @Id + // Do not change this: this generation type is required in order to trigger the proper cascade ordering. + @GeneratedValue(strategy = GenerationType.IDENTITY) + val identifier: Int?, + + val member: AbstractParty? + ) { + constructor(member: AbstractParty): this(null, member) + } + + } + + class SimpleContract: Contract { + + @BelongsToContract(SimpleContract::class) + @CordaSerializable + data class ParentState(val children: List): ContractState, QueryableState { + override fun supportedSchemas(): Iterable = listOf(PersistenceSchema) + + override fun generateMappedObject(schema: MappedSchema): PersistentState { + return when(schema) { + is PersistenceSchema -> { + val parent = PersistenceSchema.Parent() + children.forEach { parent.addChild(PersistenceSchema.Child(it.member)) } + parent + } + else -> throw IllegalArgumentException("Unrecognised schema $schema") + } + } + + override val participants: List = children.map { it.member } + } + + @CordaSerializable + data class ChildState(val member: AbstractParty) + + override fun verify(tx: LedgerTransaction) {} + + class Issue: TypeOnlyCommandData() + } + +} \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index 27cb01c617..01bd936cbe 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -139,10 +139,12 @@ open class MockServices private constructor( * Makes database and persistent services appropriate for unit tests which require persistence across the vault, identity service * and key managment service. * - * @param cordappPackages A [List] of cordapp packages to scan for any cordapp code, e.g. contract verification code, flows and services. + * @param cordappPackages A [List] of cordapp packages to scan for any cordapp code, e.g. contract verification code, + * flows and services. * @param initialIdentity The first (typically sole) identity the services will represent. * @param moreKeys A list of additional [KeyPair] instances to be used by [MockServices]. * @param moreIdentities A list of additional [KeyPair] instances to be used by [MockServices]. + * @param cacheFactory A custom cache factory to be used by the created [IdentityService] * @return A pair where the first element is the instance of [CordaPersistence] and the second is [MockServices]. */ @JvmStatic @@ -152,12 +154,13 @@ open class MockServices private constructor( initialIdentity: TestIdentity, networkParameters: NetworkParameters = testNetworkParameters(modifiedTime = Instant.MIN), moreKeys: Set, - moreIdentities: Set + moreIdentities: Set, + cacheFactory: TestingNamedCacheFactory = TestingNamedCacheFactory() ): Pair { val cordappLoader = cordappLoaderForPackages(cordappPackages) val dataSourceProps = makeTestDataSourceProperties() val schemaService = NodeSchemaService(cordappLoader.cordappSchemas) - val identityService = PersistentIdentityService(TestingNamedCacheFactory()) + val identityService = PersistentIdentityService(cacheFactory) val persistence = configureDatabase( hikariProperties = dataSourceProps, databaseConfig = DatabaseConfig(), @@ -167,7 +170,7 @@ open class MockServices private constructor( internalSchemas = schemaService.internalSchemas() ) - val pkToIdCache = PublicKeyToOwningIdentityCacheImpl(persistence, TestingNamedCacheFactory()) + val pkToIdCache = PublicKeyToOwningIdentityCacheImpl(persistence, cacheFactory) // Create a persistent identity service and add all the supplied identities. identityService.apply { diff --git a/tools/network-builder/src/main/kotlin/net/corda/networkbuilder/notaries/NotaryCopier.kt b/tools/network-builder/src/main/kotlin/net/corda/networkbuilder/notaries/NotaryCopier.kt index 7e476905e0..7e4210d144 100644 --- a/tools/network-builder/src/main/kotlin/net/corda/networkbuilder/notaries/NotaryCopier.kt +++ b/tools/network-builder/src/main/kotlin/net/corda/networkbuilder/notaries/NotaryCopier.kt @@ -6,6 +6,7 @@ import net.corda.networkbuilder.nodes.FoundNode import net.corda.networkbuilder.nodes.NodeCopier import org.slf4j.LoggerFactory import java.io.File +import java.nio.file.Paths class NotaryCopier(private val cacheDir: File) : NodeCopier(cacheDir) { @@ -28,7 +29,9 @@ class NotaryCopier(private val cacheDir: File) : NodeCopier(cacheDir) { fun generateNodeInfo(dirToGenerateFrom: File): File { val nodeInfoGeneratorProcess = ProcessBuilder() - .command(listOf("java", "-jar", "corda.jar", "generate-node-info")) + .command(listOf( + Paths.get(System.getProperty("java.home"), "bin", "java").toString(), + "-jar", "corda.jar", "generate-node-info")) .directory(dirToGenerateFrom) .inheritIO() .start() diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt index ac5edb6e7e..a78edf970b 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt @@ -13,6 +13,7 @@ import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.GracefulReconnect import net.corda.client.rpc.PermissionException +import net.corda.client.rpc.notUsed import net.corda.core.CordaException import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.UniqueIdentifier @@ -70,6 +71,7 @@ import kotlin.concurrent.thread // TODO: Resurrect or reimplement the mail plugin. // TODO: Make it notice new shell commands added after the node started. +@Suppress("MaxLineLength") object InteractiveShell { private val log = LoggerFactory.getLogger(javaClass) private lateinit var rpcOps: (username: String, password: String) -> InternalCordaRPCOps @@ -521,8 +523,11 @@ object InteractiveShell { val parser = StringToMethodCallParser(CordaRPCOps::class.java, inputObjectMapper) val call = parser.parse(cordaRPCOps, cmd) result = call.call() + var subscription : Subscriber<*>? = null if (result != null && result !== kotlin.Unit && result !is Void) { - result = printAndFollowRPCResponse(result, out, outputFormat) + val (subs, future) = printAndFollowRPCResponse(result, out, outputFormat) + subscription = subs + result = future } if (result is Future<*>) { if (!result.isDone) { @@ -532,6 +537,7 @@ object InteractiveShell { try { result = result.get() } catch (e: InterruptedException) { + subscription?.unsubscribe() Thread.currentThread().interrupt() } catch (e: ExecutionException) { throw e.rootCause @@ -621,7 +627,11 @@ object InteractiveShell { } } - private fun printAndFollowRPCResponse(response: Any?, out: PrintWriter, outputFormat: OutputFormat): CordaFuture { + private fun printAndFollowRPCResponse( + response: Any?, + out: PrintWriter, + outputFormat: OutputFormat + ): Pair> { val outputMapper = createOutputMapper(outputFormat) val mapElement: (Any?) -> String = { element -> outputMapper.writerWithDefaultPrettyPrinter().writeValueAsString(element) } @@ -659,34 +669,52 @@ object InteractiveShell { } } - private fun maybeFollow(response: Any?, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture { + private fun maybeFollow( + response: Any?, + printerFun: (Any?) -> String, + out: PrintWriter + ): Pair> { // Match on a couple of common patterns for "important" observables. It's tough to do this in a generic // way because observables can be embedded anywhere in the object graph, and can emit other arbitrary // object graphs that contain yet more observables. So we just look for top level responses that follow // the standard "track" pattern, and print them until the user presses Ctrl-C - if (response == null) return doneFuture(Unit) + var result = Pair>(null, doneFuture(Unit)) - if (response is DataFeed<*, *>) { - out.println("Snapshot:") - out.println(printerFun(response.snapshot)) - out.flush() - out.println("Updates:") - return printNextElements(response.updates, printerFun, out) + + when { + response is DataFeed<*, *> -> { + out.println("Snapshot:") + out.println(printerFun(response.snapshot)) + out.flush() + out.println("Updates:") + + val unsubscribeAndPrint: (Any?) -> String = { resp -> + if (resp is StateMachineUpdate.Added) { + resp.stateMachineInfo.progressTrackerStepAndUpdates?.updates?.notUsed() + } + printerFun(resp) + } + + result = printNextElements(response.updates, unsubscribeAndPrint, out) + } + response is Observable<*> -> { + result = printNextElements(response, printerFun, out) + } + response != null -> { + out.println(printerFun(response)) + } } - if (response is Observable<*>) { - - return printNextElements(response, printerFun, out) - } - - out.println(printerFun(response)) - return doneFuture(Unit) + return result } - private fun printNextElements(elements: Observable<*>, printerFun: (Any?) -> String, out: PrintWriter): CordaFuture { - + private fun printNextElements( + elements: Observable<*>, + printerFun: (Any?) -> String, + out: PrintWriter + ): Pair> { val subscriber = PrintingSubscriber(printerFun, out) uncheckedCast(elements).subscribe(subscriber) - return subscriber.future + return Pair(subscriber, subscriber.future) } } diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/utlities/ANSIProgressRenderer.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/utlities/ANSIProgressRenderer.kt index 20037be4bc..f670d5388f 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/utlities/ANSIProgressRenderer.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/utlities/ANSIProgressRenderer.kt @@ -177,7 +177,7 @@ abstract class ANSIProgressRenderer { ansi.fgRed() ansi.a("${IntStream.range(indent, indent).mapToObj { "\t" }.toList().joinToString(separator = "") { s -> s }} $errorIcon ${error.message}") ansi.reset() - errorToPrint = error.cause + errorToPrint = errorToPrint.cause indent++ } ansi.eraseLine(Ansi.Erase.FORWARD)