From 0978500a9a01324feafd78abe391033268ff377f Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Tue, 21 Jan 2020 13:38:02 +0000 Subject: [PATCH] CORDA-2942: Node lifecycle events (#5846) * CORDA-2942: Port minimal set of changes to make lifecycle events work ... and make codebase compile. * CORDA-2942: Undo some changes which are not strictly speaking necessary * CORDA-2942: Make `NodeServicesContext` leaner and delete `extensions-api` module * CORDA-2942: Reduce even more number of files affected * CORDA-2942: Integration test fix * CORDA-2942: Make events `AfterStart` and `BeforeStop` generic w.r.t. `NodeServicesContext` * CORDA-2942: `NodeLifecycleObserverService` and a set of integration tests. Public API violations are expected as well as integration tests failing. * CORDA-2942: Re-work to introduce `ServiceLifecycleObserver` * CORDA-2942: Explicitly mention a type of exception that may be thrown for some events. * CORDA-2942: Register `ServiceLifecycleObserver` through `AppServiceHub` * CORDA-2942: Fix integration test + KDocs update * CORDA-2942: Detekt and `api-current` update * CORDA-2942: Improvement to `CordaServiceLifecycleFatalTests` ... or else it has side effects on other tests. * CORDA-2942: Add an integration test for new API use in Java Driver test is written in Kotlin, but services definition is written in Java. Also KDocs improvements. * CORDA-2942: Documentation and release notes update * CORDA-2942: First set of changes following review by @mnesbit * CORDA-2942: Second set of changes following review by @mnesbit * CORDA-2942: Added multi-threaded test * CORDA-2942: Fixes * CORDA-2942: Undo changes to `api-current.txt` * CORDA-2942: Bare mimimum change to `api-current.txt` for CI gate to pass. * CORDA-2942: Address review feedback from @rick-r3 * CORDA-2942: Detekt update * CORDA-2942: Delete `ServiceLifecycleObserverPriority` and replace it with `Int` after discussion with @mnesbit * CORDA-2942: Introduce more `NodeLifecycleEvent` and switch services to listen for those events * CORDA-2942: Few more changes after input from @rick-r3 * First stub on integration test Unfinished - hang on issue and pay * CORDA-2942: Switch to use out-of-process nodes for the inetgration test Currently Alice and Notary stuck waiting to hear from each other. * CORDA-2942: Extra log lines during event distribution * CORDA-2942: Asynchronously distribute lifecycle events * CORDA-2942: Await for complete P2P client start-up Next step: Add vault query to integration test * CORDA-2942: Asynchronously distribute lifecycle events Next step: Improve integration test * CORDA-2942: Fix test broken by recent changes and improve logging * CORDA-2942: Improvement of the test to be able to monitor actions performed by @CordaService in the remote process * CORDA-2942: Add node re-start step to the integration test * CORDA-2942: Remove `CORDAPP_STOPPED` event for now * CORDA-2942: s/CORDAPP_STARTED/STATE_MACHINE_STARTED/ * CORDA-2942: Inverse the meaning of `priority` as requested by @rick-r3 * CORDA-2942: Register `AppServiceHubImpl` for lifecycle events and put a warning when SMM is not ready. --- .ci/api-current.txt | 3 + .../configuration/parsing/internal/Utils.kt | 18 ++- .../net/corda/core/node/AppServiceHub.kt | 30 +++- .../node/services/ServiceLifecycleObserver.kt | 36 +++++ detekt-baseline.xml | 44 ++---- docs/source/api-service-classes.rst | 49 +++++-- docs/source/changelog.rst | 3 +- .../internal/lifecycle/NodeInitialContext.kt | 10 ++ .../NodeLifecycleEventsDistributor.kt | 128 +++++++++++++++++ .../lifecycle/NodeLifecycleObserver.kt | 56 ++++++++ .../internal/lifecycle/NodeServicesContext.kt | 17 +++ ...cycleEventsDistributorMultiThreadedTest.kt | 61 +++++++++ .../services/JavaCordaServiceLifecycle.java | 64 +++++++++ .../CordaServiceIssueOnceAtStartupTests.kt | 129 ++++++++++++++++++ .../CordaServiceLifecycleFatalTests.kt | 65 +++++++++ .../services/CordaServiceLifecycleTests.kt | 74 ++++++++++ .../JavaCordaServiceLifecycleTests.kt | 31 +++++ .../net/corda/node/internal/AbstractNode.kt | 124 +++++++---------- .../corda/node/internal/AppServiceHubImpl.kt | 124 +++++++++++++++++ .../corda/node/internal/CordaRPCOpsImpl.kt | 6 +- .../kotlin/net/corda/node/internal/Node.kt | 1 + .../node/services/config/NodeConfiguration.kt | 3 +- .../services/config/NodeConfigurationImpl.kt | 4 +- .../schema/v1/V1NodeConfigurationSpec.kt | 3 +- .../node/services/messaging/Messaging.kt | 6 + .../services/messaging/P2PMessagingClient.kt | 7 + ...pointDumper.kt => CheckpointDumperImpl.kt} | 62 +++++---- .../SingleThreadedStateMachineManager.kt | 4 +- .../statemachine/StateMachineManager.kt | 7 +- .../net/corda/node/internal/NodeTest.kt | 5 +- .../config/NodeConfigurationImplTest.kt | 4 +- ...perTest.kt => CheckpointDumperImplTest.kt} | 27 ++-- .../vault/VaultSoftLockManagerTest.kt | 3 +- .../net/corda/testing/node/MockServices.kt | 4 + .../node/internal/InternalMockNetwork.kt | 2 + .../node/internal/MockNodeMessagingService.kt | 5 + 36 files changed, 1055 insertions(+), 164 deletions(-) create mode 100644 core/src/main/kotlin/net/corda/core/node/services/ServiceLifecycleObserver.kt create mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeInitialContext.kt create mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributor.kt create mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleObserver.kt create mode 100644 node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeServicesContext.kt create mode 100644 node-api/src/test/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributorMultiThreadedTest.kt create mode 100644 node/src/integration-test/java/net/corda/node/services/JavaCordaServiceLifecycle.java create mode 100644 node/src/integration-test/kotlin/net/corda/node/services/CordaServiceIssueOnceAtStartupTests.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleFatalTests.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleTests.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/services/JavaCordaServiceLifecycleTests.kt create mode 100644 node/src/main/kotlin/net/corda/node/internal/AppServiceHubImpl.kt rename node/src/main/kotlin/net/corda/node/services/rpc/{CheckpointDumper.kt => CheckpointDumperImpl.kt} (88%) rename node/src/test/kotlin/net/corda/node/services/rpc/{CheckpointDumperTest.kt => CheckpointDumperImplTest.kt} (83%) diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 3ee04e51e0..a8e9cb94d9 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -3754,6 +3754,9 @@ public static final class net.corda.core.node.services.PartyInfo$SingleNode exte @NotNull public String toString() ## +public interface net.corda.core.node.services.ServiceLifecycleObserver + public abstract void onServiceLifecycleEvent(net.corda.core.node.services.ServiceLifecycleEvent) +## @CordaSerializable public final class net.corda.core.node.services.StatesNotAvailableException extends net.corda.core.flows.FlowException public (String, Throwable) diff --git a/common/configuration-parsing/src/main/kotlin/net/corda/common/configuration/parsing/internal/Utils.kt b/common/configuration-parsing/src/main/kotlin/net/corda/common/configuration/parsing/internal/Utils.kt index e1d83215a5..e7455a38a7 100644 --- a/common/configuration-parsing/src/main/kotlin/net/corda/common/configuration/parsing/internal/Utils.kt +++ b/common/configuration-parsing/src/main/kotlin/net/corda/common/configuration/parsing/internal/Utils.kt @@ -64,4 +64,20 @@ internal fun ConfigValue.serialize(options: ConfigRenderOptions = ConfigRenderOp internal typealias Valid = Validated -internal fun valid(target: TYPE) = Validated.valid(target) \ No newline at end of file +internal fun valid(target: TYPE) = Validated.valid(target) + +/** + * Value extracted from a configuration file is a function of the actual value specified and configuration options. + * E.g. password value may be stored in the encrypted form rather than in a clear text. + */ +data class ConfigurationWithOptions(private val config: Config, private val options: Configuration.Validation.Options) { + operator fun get(property: Configuration.Property.Definition): TYPE = property.valueIn(config) + operator fun get(property: Configuration.Value.Extractor): TYPE = property.valueIn(config) +} + +/** + * Helper interface to mark objects that will have [ConfigurationWithOptions] in them. + */ +interface ConfigurationWithOptionsContainer { + val configurationWithOptions : ConfigurationWithOptions +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt index 9cf24a81fc..e1726a28ed 100644 --- a/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt @@ -4,6 +4,8 @@ import net.corda.core.DeleteForDJVM import net.corda.core.flows.FlowLogic import net.corda.core.messaging.FlowHandle import net.corda.core.messaging.FlowProgressHandle +import net.corda.core.node.services.ServiceLifecycleEvent +import net.corda.core.node.services.ServiceLifecycleObserver import net.corda.core.node.services.vault.CordaTransactionSupport import rx.Observable @@ -17,6 +19,12 @@ import rx.Observable @DeleteForDJVM interface AppServiceHub : ServiceHub { + companion object { + const val SERVICE_PRIORITY_HIGH = 200 + const val SERVICE_PRIORITY_NORMAL = 100 + const val SERVICE_PRIORITY_LOW = 20 + } + /** * Start the given flow with the given arguments. [flow] must be annotated * with [net.corda.core.flows.StartableByService]. @@ -38,4 +46,24 @@ interface AppServiceHub : ServiceHub { * independent transaction from those used in the framework. */ val database: CordaTransactionSupport -} + + /** + * Allows to register [ServiceLifecycleObserver] such that it will start receiving [net.corda.core.node.services.ServiceLifecycleEvent]s + * + * @param priority controls to which queue [observer] will be added. Higher values correspond to higher priorities. + * + * @param observer an instance of [ServiceLifecycleObserver] to be registered. + */ + fun register(priority: Int = SERVICE_PRIORITY_NORMAL, observer: ServiceLifecycleObserver) + + /** + * Convenience method to be able to add an arbitrary function as a [register] callback. + */ + fun register(priority: Int = SERVICE_PRIORITY_NORMAL, + func: (ServiceLifecycleEvent) -> T) = register(priority, + object : ServiceLifecycleObserver { + override fun onServiceLifecycleEvent(event: ServiceLifecycleEvent) { + func(event) + } + }) +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/node/services/ServiceLifecycleObserver.kt b/core/src/main/kotlin/net/corda/core/node/services/ServiceLifecycleObserver.kt new file mode 100644 index 0000000000..9921894f80 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/node/services/ServiceLifecycleObserver.kt @@ -0,0 +1,36 @@ +package net.corda.core.node.services + +import java.lang.Exception + +/** + * Specifies that given [CordaService] is interested to know about important milestones of Corda Node lifecycle and potentially react to them. + * Subscription can be performed using [net.corda.core.node.AppServiceHub.register] method from a constructor of [CordaService]. + */ +@FunctionalInterface +interface ServiceLifecycleObserver { + /** + * A handler for [ServiceLifecycleEvent]s. + * Default implementation does nothing. + */ + @Throws(CordaServiceCriticalFailureException::class) + fun onServiceLifecycleEvent(event: ServiceLifecycleEvent) +} + +enum class ServiceLifecycleEvent { + /** + * This event is dispatched when State Machine is fully started such that [net.corda.core.node.AppServiceHub] available + * for [CordaService] to be use. + * + * If a handler for this event throws [CordaServiceCriticalFailureException] - this is the way to flag that it will not make + * sense for Corda node to continue its operation. The lifecycle events dispatcher will endeavor to terminate node's JVM as soon + * as practically possible. + */ + STATE_MACHINE_STARTED, +} + +/** + * Please see [ServiceLifecycleEvent.STATE_MACHINE_STARTED] for the purpose of this exception. + */ +class CordaServiceCriticalFailureException(message : String, cause: Throwable?) : Exception(message, cause) { + constructor(message : String) : this(message, null) +} \ No newline at end of file diff --git a/detekt-baseline.xml b/detekt-baseline.xml index 9995711eec..1c384a0d9e 100644 --- a/detekt-baseline.xml +++ b/detekt-baseline.xml @@ -117,8 +117,8 @@ ComplexMethod:CheckpointAgent.kt$CheckpointHook$private fun instrumentClass(clazz: CtClass): CtClass? ComplexMethod:CheckpointAgent.kt$CheckpointHook$private fun prettyStatsTree(indent: Int, statsInfo: StatsInfo, identityInfo: IdentityInfo, builder: StringBuilder) ComplexMethod:CheckpointAgent.kt$fun readTrees(events: List<StatsEvent>, index: Int, idMap: IdentityHashMap<Any, IdentityInfo>): Pair<Int, List<Pair<StatsInfo, IdentityInfo>>> - ComplexMethod:CheckpointDumper.kt$CheckpointDumper$fun dump() - ComplexMethod:CheckpointDumper.kt$CheckpointDumper$private fun FlowIORequest<*>.toSuspendedOn(suspendedTimestamp: Instant, now: Instant): SuspendedOn + ComplexMethod:CheckpointDumperImpl.kt$CheckpointDumperImpl$fun dumpCheckpoints() + ComplexMethod:CheckpointDumperImpl.kt$CheckpointDumperImpl$private fun FlowIORequest<*>.toSuspendedOn(suspendedTimestamp: Instant, now: Instant): SuspendedOn ComplexMethod:ClassCarpenter.kt$ClassCarpenterImpl$ private fun validateSchema(schema: Schema) ComplexMethod:CompatibleTransactionTests.kt$CompatibleTransactionTests$@Test fun `Command visibility tests`() ComplexMethod:ConfigUtilities.kt$// For Iterables figure out the type parameter and apply the same logic as above on the individual elements. private fun Iterable<*>.toConfigIterable(field: Field): Iterable<Any?> @@ -190,15 +190,11 @@ ComplexMethod:SendTransactionFlow.kt$DataVendingFlow$@Suspendable override fun call(): Void? ComplexMethod:ShellCmdLineOptions.kt$ShellCmdLineOptions$private fun toConfigFile(): Config ComplexMethod:ShellCmdLineOptions.kt$ShellConfigurationFile.ShellConfigFile$fun toShellConfiguration(): ShellConfiguration - ComplexMethod:SignedTransaction.kt$SignedTransaction$// TODO: Verify contract constraints here as well as in LedgerTransaction to ensure that anything being deserialised // from the attachment is trusted. This will require some partial serialisation work to not load the ContractState // objects from the TransactionState. @DeleteForDJVM private fun verifyRegularTransaction(services: ServiceHub, checkSufficientSignatures: Boolean) ComplexMethod:StartedFlowTransition.kt$StartedFlowTransition$override fun transition(): TransitionResult ComplexMethod:StatusTransitions.kt$StatusTransitions$ fun verify(tx: LedgerTransaction) ComplexMethod:StringToMethodCallParser.kt$StringToMethodCallParser$ @Throws(UnparseableCallException::class) fun parse(target: T?, command: String): ParsedMethodCall ComplexMethod:TlsDiffAlgorithmsTest.kt$TlsDiffAlgorithmsTest$@Test fun testClientServerTlsExchange() ComplexMethod:TlsDiffProtocolsTest.kt$TlsDiffProtocolsTest$@Test fun testClientServerTlsExchange() - ComplexMethod:TransactionBuilder.kt$TransactionBuilder$ fun withItems(vararg items: Any) - ComplexMethod:TransactionBuilder.kt$TransactionBuilder$ private fun addMissingDependency(services: ServicesForResolution, wireTx: WireTransaction): Boolean - ComplexMethod:TransactionBuilder.kt$TransactionBuilder$ private fun handleContract( contractClassName: ContractClassName, inputStates: List<TransactionState<ContractState>>?, outputStates: List<TransactionState<ContractState>>?, explicitContractAttachment: AttachmentId?, services: ServicesForResolution ): Pair<AttachmentId, List<TransactionState<ContractState>>?> ComplexMethod:TransactionUtils.kt$ fun createComponentGroups(inputs: List<StateRef>, outputs: List<TransactionState<ContractState>>, commands: List<Command<*>>, attachments: List<SecureHash>, notary: Party?, timeWindow: TimeWindow?, references: List<StateRef>, networkParametersHash: SecureHash?): List<ComponentGroup> ComplexMethod:TypeModellingFingerPrinter.kt$FingerPrintingState$// For a type we haven't seen before, determine the correct path depending on the type of type it is. private fun fingerprintNewType(type: LocalTypeInformation) ComplexMethod:UniversalContract.kt$UniversalContract$override fun verify(tx: LedgerTransaction) @@ -1308,8 +1304,7 @@ MaxLineLength:AbstractNode.kt$AbstractNode$throw IllegalStateException("CryptoService and signingCertificateStore are not aligned, the entry for key-alias: $alias is only found in $keyExistsIn") MaxLineLength:AbstractNode.kt$AbstractNode$val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(configuration.cordappDirectories), attachments).tokenize() MaxLineLength:AbstractNode.kt$AbstractNode$val servicesForResolution = ServicesForResolutionImpl(identityService, attachments, cordappProvider, networkParametersStorage, transactionStorage).also { attachments.servicesForResolution = it } - MaxLineLength:AbstractNode.kt$AbstractNode.AppServiceHubImpl$override val database: CordaTransactionSupport - MaxLineLength:AbstractNode.kt$AbstractNode.AppServiceHubImpl$require(logicType.isAnnotationPresent(StartableByService::class.java)) { "${logicType.name} was not designed for starting by a CordaService" } + MaxLineLength:AbstractNode.kt$AbstractNode.<no name provided>$// Note: tokenizableServices passed by reference meaning that any subsequent modification to the content in the `AbstractNode` will // be reflected in the context as well. However, since context only has access to immutable collection it can only read (but not modify) // the content. override val tokenizableServices: List<SerializeAsToken> = this@AbstractNode.tokenizableServices!! MaxLineLength:AbstractNode.kt$ex is HikariPool.PoolInitializationException -> throw CouldNotCreateDataSourceException("Could not connect to the database. Please check your JDBC connection URL, or the connectivity to the database.", ex) MaxLineLength:AbstractNode.kt$ex.cause is ClassNotFoundException -> throw CouldNotCreateDataSourceException("Could not find the database driver class. Please add it to the 'drivers' folder. See: https://docs.corda.net/corda-configuration-file.html") MaxLineLength:AbstractNode.kt$fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemas: Set<MappedSchema>, metricRegistry: MetricRegistry? = null, cordappLoader: CordappLoader? = null, currentDir: Path? = null, ourName: CordaX500Name) @@ -1342,6 +1337,8 @@ MaxLineLength:AnalyticsEngine.kt$OGSIMMAnalyticsEngine$val t = BimmAnalysisUtils.computeMargin(combinedRatesProvider, normalizer, calculatorTotal, it.value.currencyParameterSensitivities, it.value.multiCurrencyAmount) MaxLineLength:AnonymousParty.kt$AnonymousParty : DestinationAbstractParty MaxLineLength:AnotherDummyContract.kt$AnotherDummyContract$return TransactionBuilder(notary).withItems(StateAndContract(state, ANOTHER_DUMMY_PROGRAM_ID), Command(Commands.Create(), owner.party.owningKey)) + MaxLineLength:AppServiceHubImpl.kt$AppServiceHubImpl$require(logicType.isAnnotationPresent(StartableByService::class.java)) { "${logicType.name} was not designed for starting by a CordaService" } + MaxLineLength:AppServiceHubImpl.kt$AppServiceHubImpl.Companion.NodeLifecycleServiceObserverAdaptor$private MaxLineLength:AppendOnlyPersistentMap.kt$AppendOnlyPersistentMapBase$ operator fun set(key: K, value: V) MaxLineLength:AppendOnlyPersistentMap.kt$AppendOnlyPersistentMapBase$log.warn("Double insert in ${this.javaClass.name} for entity class $persistentEntityClass key $key, not inserting the second time") MaxLineLength:AppendOnlyPersistentMap.kt$AppendOnlyPersistentMapBase$oldValueInCache @@ -1598,9 +1595,8 @@ MaxLineLength:CheckpointAgent.kt$log.debug { "Skipping repeated StatsEvent.Enter: ${exit.value} (hashcode:${exit.value!!.hashCode()}) (count:${idMap[exit.value]?.refCount})" } MaxLineLength:CheckpointAgent.kt$log.debug { "Skipping repeated StatsEvent.Exit: ${event.value} (hashcode:${event.value!!.hashCode()}) (count:${idMap[event.value]?.refCount})" } MaxLineLength:CheckpointAgent.kt$log.debug { "Skipping repeated StatsEvent.ObjectField: ${event.value} (hashcode:${event.value.hashCode()}) (count:${idMap[event.value]?.refCount})" } - MaxLineLength:CheckpointDumper.kt$CheckpointDumper - MaxLineLength:CheckpointDumper.kt$CheckpointDumper.CheckpointDumperBeanModifier$it.type.isTypeOrSubTypeOf(ProgressTracker::class.java) || it.name == "_stateMachine" || it.name == "deprecatedPartySessionMap" - MaxLineLength:CheckpointDumperTest.kt$CheckpointDumperTest$val checkpoint = Checkpoint.create(InvocationContext.shell(), FlowStart.Explicit, logic.javaClass, frozenLogic, myself.identity.party, SubFlowVersion.CoreFlow(version), false) .getOrThrow() + MaxLineLength:CheckpointDumperImpl.kt$CheckpointDumperImpl.CheckpointDumperBeanModifier$it.type.isTypeOrSubTypeOf(ProgressTracker::class.java) || it.name == "_stateMachine" || it.name == "deprecatedPartySessionMap" + MaxLineLength:CheckpointDumperImplTest.kt$CheckpointDumperImplTest$val checkpoint = Checkpoint.create(InvocationContext.shell(), FlowStart.Explicit, logic.javaClass, frozenLogic, myself.identity.party, SubFlowVersion.CoreFlow(version), false) .getOrThrow() MaxLineLength:CheckpointSerializationScheme.kt$CheckpointSerializationContextImpl$override val encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist MaxLineLength:CheckpointVerifier.kt$CheckpointIncompatibleException$FlowVersionIncompatibleException : CheckpointIncompatibleException MaxLineLength:CheckpointVerifier.kt$CheckpointIncompatibleException$SubFlowCoreVersionIncompatibleException : CheckpointIncompatibleException @@ -2223,7 +2219,6 @@ MaxLineLength:IRSDemoDockerTest.kt$IRSDemoDockerTest.Companion$DockerComposeRule.builder() .files(DockerComposeFiles.from( System.getProperty("CORDAPP_DOCKER_COMPOSE"), System.getProperty("WEB_DOCKER_COMPOSE"))) .waitingForService("web-a", HealthChecks.toRespondOverHttp(8080, { port -> port.inFormat("http://\$HOST:\$EXTERNAL_PORT") })) MaxLineLength:IRSDemoTest.kt$IRSDemoTest$val (controllerApi, nodeAApi, nodeBApi) = listOf(controller, nodeA, nodeB).zip(listOf(controllerAddr, nodeAAddr, nodeBAddr)).map { val mapper = JacksonSupport.createDefaultMapper(it.first.rpc) registerFinanceJSONMappers(mapper) registerIRSModule(mapper) HttpApi.fromHostAndPort(it.second, "api/irs", mapper = mapper) } MaxLineLength:IRSDemoTest.kt$IRSDemoTest.InterestRateSwapStateDeserializer$InterestRateSwap.State(fixedLeg = fixedLeg, floatingLeg = floatingLeg, calculation = calculation, common = common, linearId = linearId, oracle = oracle) - MaxLineLength:IRSState.kt$IRSState$return TransactionBuilder(notary).withItems(StateAndContract(state, IRS_PROGRAM_ID), Command(OGTrade.Commands.Agree(), participants.map { it.owningKey })) MaxLineLength:IRSTests.kt$"(floatingLeg.notional.pennies * (calculation.fixingSchedule.get(context.getDate('currentDate')).rate.ratioUnit.value))" MaxLineLength:IRSTests.kt$( // TODO: this seems to fail quite dramatically //expression = "fixedLeg.notional * fixedLeg.fixedRate", // TODO: How I want it to look //expression = "( fixedLeg.notional * (fixedLeg.fixedRate)) - (floatingLeg.notional * (rateSchedule.get(context.getDate('currentDate'))))", // How it's ended up looking, which I think is now broken but it's a WIP. expression = Expression("( fixedLeg.notional.pennies * (fixedLeg.fixedRate.ratioUnit.value)) -" + "(floatingLeg.notional.pennies * (calculation.fixingSchedule.get(context.getDate('currentDate')).rate.ratioUnit.value))"), floatingLegPaymentSchedule = mutableMapOf(), fixedLegPaymentSchedule = mutableMapOf() ) MaxLineLength:IRSTests.kt$IRSTests$( "fixedLeg.notional.quantity", "fixedLeg.fixedRate.ratioUnit", "fixedLeg.fixedRate.ratioUnit.value", "floatingLeg.notional.quantity", "fixedLeg.fixedRate", "currentBusinessDate", "calculation.floatingLegPaymentSchedule.get(currentBusinessDate)", "fixedLeg.notional.token.currencyCode", "fixedLeg.notional.quantity * 10", "fixedLeg.notional.quantity * fixedLeg.fixedRate.ratioUnit.value", "(fixedLeg.notional.token.currencyCode.equals('GBP')) ? 365 : 360 ", "(fixedLeg.notional.quantity * (fixedLeg.fixedRate.ratioUnit.value))" // "calculation.floatingLegPaymentSchedule.get(context.getDate('currentDate')).rate" // "calculation.floatingLegPaymentSchedule.get(context.getDate('currentDate')).rate.ratioUnit.value", //"( fixedLeg.notional.pennies * (fixedLeg.fixedRate.ratioUnit.value)) - (floatingLeg.notional.pennies * (calculation.fixingSchedule.get(context.getDate('currentDate')).rate.ratioUnit.value))", // "( fixedLeg.notional * fixedLeg.fixedRate )" ) @@ -2269,7 +2264,6 @@ MaxLineLength:InMemoryMessagingNetwork.kt$InMemoryMessagingNetwork$peersMapping[messagingService.myAddress.name] = messagingService.myAddress MaxLineLength:InMemoryMessagingNetwork.kt$InMemoryMessagingNetwork$private val servicePeerAllocationStrategy: ServicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.Random() MaxLineLength:InMemoryMessagingNetwork.kt$InMemoryMessagingNetwork.Companion$servicePeerAllocationStrategy: ServicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.Random() - MaxLineLength:InMemoryTransactionVerifierService.kt$InMemoryTransactionVerifierService : SingletonSerializeAsTokenTransactionVerifierServiceTransactionVerifierServiceInternalAutoCloseable MaxLineLength:InfrequentlyMutatedCacheTest.kt$InfrequentlyMutatedCacheTest$@Test fun `fourth get outside first transaction from empty cache with invalidate in other thread in the middle returns result of second loader`() MaxLineLength:InitialRegistrationCli.kt$InitialRegistration : RunAfterNodeInitialisationNodeStartupLogging MaxLineLength:InitialRegistrationCli.kt$InitialRegistration$println("Node was started before with `--initial-registration`, but the registration was not completed.\nResuming registration.") @@ -2663,6 +2657,7 @@ MaxLineLength:NodeKeystoreCheckTest.kt$NodeKeystoreCheckTest$setPrivateKey(X509Utilities.CORDA_CLIENT_CA, nodeCA.keyPair.private, listOf(badNodeCACert, badRoot), signingCertStore.entryPassword) MaxLineLength:NodeKeystoreCheckTest.kt$NodeKeystoreCheckTest$val badNodeCACert = X509Utilities.createCertificate(CertificateType.NODE_CA, badRoot, badRootKeyPair, ALICE_NAME.x500Principal, nodeCA.keyPair.public) MaxLineLength:NodeKeystoreCheckTest.kt$NodeKeystoreCheckTest$val p2pSslConfig = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory, keyStorePassword = keystorePassword, trustStorePassword = keystorePassword) + MaxLineLength:NodeLifecycleEventsDistributor.kt$NodeLifecycleEventsDistributor MaxLineLength:NodeMonitorModel.kt$NodeMonitorModel${ rpc = CordaRPCClient(nodeHostAndPort).start(username, password, GracefulReconnect()) proxyObservable.value = rpc.proxy // Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates val ( statesSnapshot, vaultUpdates ) = rpc.proxy.vaultTrackBy<ContractState>( QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL), PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE) ) val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ -> statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED }.toSet() val consumedStates = statesSnapshot.states.toSet() - unconsumedStates val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates, references = emptySet()) vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject::onNext) // Transactions val (transactions, newTransactions) = @Suppress("DEPRECATION") rpc.proxy.internalVerifiedTransactionsFeed() newTransactions.startWith(transactions).subscribe(transactionsSubject::onNext) // SM -> TX mapping val (smTxMappings, futureSmTxMappings) = rpc.proxy.stateMachineRecordedTransactionMappingFeed() futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject::onNext) // Parties on network val (parties, futurePartyUpdate) = rpc.proxy.networkMapFeed() futurePartyUpdate.startWith(parties.map(MapChange::Added)).subscribe(networkMapSubject::onNext) val stateMachines = rpc.proxy.stateMachinesSnapshot() notaryIdentities = rpc.proxy.notaryIdentities() // Extract the flow tracking stream // TODO is there a nicer way of doing this? Stream of streams in general results in code like this... // TODO `progressTrackingSubject` doesn't seem to be used anymore - should it be removed? val currentProgressTrackerUpdates = stateMachines.mapNotNull { stateMachine -> ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachine) } val futureProgressTrackerUpdates = stateMachineUpdatesSubject.map { stateMachineUpdate -> if (stateMachineUpdate is StateMachineUpdate.Added) { ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo) ?: Observable.empty<ProgressTrackingEvent>() } else { Observable.empty<ProgressTrackingEvent>() } } // We need to retry, because when flow errors, we unsubscribe from progressTrackingSubject. So we end up with stream of state machine updates and no progress trackers. futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.retry().subscribe(progressTrackingSubject) } MaxLineLength:NodeNamedCache.kt$DefaultNamedCacheFactory$name.startsWith("RPCSecurityManagerShiroCache_") -> with(security?.authService?.options?.cache!!) { caffeine.maximumSize(maxEntries).expireAfterWrite(expireAfterSecs, TimeUnit.SECONDS) } MaxLineLength:NodeNamedCache.kt$DefaultNamedCacheFactory$open @@ -2934,7 +2929,6 @@ MaxLineLength:PortfolioApiUtils.kt$PortfolioApiUtils$InitialMarginView MaxLineLength:PortfolioApiUtils.kt$PortfolioApiUtils$val processedSensitivities = valuation.totalSensivities.sensitivities.map { it.marketDataName to it.parameterMetadata.map { it.label }.zip(it.sensitivity.toList()).toMap() }.toMap() MaxLineLength:PortfolioApiUtils.kt$PortfolioApiUtils$val yieldCurveCurrenciesValues = marketData.filter { !it.key.contains("/") }.map { it -> Triple(it.key.split("-")[0], it.key.split("-", limit = 2)[1], it.value) } - MaxLineLength:PortfolioState.kt$PortfolioState$return TransactionBuilder(notary).withItems(StateAndContract(copy(), PORTFOLIO_SWAP_PROGRAM_ID), Command(PortfolioSwap.Commands.Agree(), participants.map { it.owningKey })) MaxLineLength:PrintingInterceptor.kt$PrintingInterceptor$val transitionRecord = TransitionDiagnosticRecord(Instant.now(), fiber.id, previousState, nextState, event, transition, continuation) MaxLineLength:ProgressTracker.kt$ProgressTracker$log.warnOnce("Found ProgressTracker Step(s) with the same label: ${labels.groupBy { it }.filter { it.value.size > 1 }.map { it.key }}") MaxLineLength:ProgressTracker.kt$ProgressTracker.Step$private fun definitionLocation(): String @@ -3200,6 +3194,7 @@ MaxLineLength:ServiceHub.kt$ServiceHub$private MaxLineLength:ServiceHub.kt$ServiceHub$signInitialTransaction(builder, publicKey, SignatureMetadata(myInfo.platformVersion, Crypto.findSignatureScheme(publicKey).schemeNumberID)) MaxLineLength:ServiceHubInternal.kt$ServiceHubInternal.Companion$vaultService.notifyAll(statesToRecord, recordedTransactions.map { it.coreTransaction }, previouslySeenTxs.map { it.coreTransaction }) + MaxLineLength:ServiceLifecycleObserver.kt$ServiceLifecycleObserver MaxLineLength:ServicesForResolutionImpl.kt$ServicesForResolutionImpl$else -> throw UnsupportedOperationException("Attempting to resolve attachment for index ${stateRef.index} of a ${ctx.javaClass} transaction. This is not supported.") MaxLineLength:ServicesForResolutionImpl.kt$ServicesForResolutionImpl$if (attachment is ContractAttachment && (forContractClassName ?: transactionState.contract) in attachment.allContracts) { return attachment } MaxLineLength:ServicesForResolutionImpl.kt$ServicesForResolutionImpl$return attachments.openAttachment(ctx.upgradedContractAttachmentId) ?: throw AttachmentResolutionException(stateRef.txhash) @@ -3212,8 +3207,6 @@ MaxLineLength:SignatureConstraintMigrationFromWhitelistConstraintTests.kt$SignatureConstraintMigrationFromWhitelistConstraintTests$@Test fun `auto migration from WhitelistConstraint to SignatureConstraint will only transition states that do not have a constraint specified`() MaxLineLength:SignedNodeInfo.kt$NodeInfoAndSigned$constructor(nodeInfo: NodeInfo, signer: (PublicKey, SerializedBytes<NodeInfo>) -> DigitalSignature) : this(nodeInfo, nodeInfo.sign(signer)) MaxLineLength:SignedTransaction.kt$SignedTransaction : TransactionWithSignatures - MaxLineLength:SignedTransaction.kt$SignedTransaction$ |If you wish to verify this transaction, please contact the originator of the transaction and install the provided missing JAR. - MaxLineLength:SignedTransaction.kt$SignedTransaction$"""Transaction $ltx is incorrectly formed. Most likely it was created during version 3 of Corda when the verification logic was more lenient. |Attempted to find local dependency for class: $missingClass, but could not find one. |If you wish to verify this transaction, please contact the originator of the transaction and install the provided missing JAR. |You can install it using the RPC command: `uploadAttachment` without restarting the node. |""" MaxLineLength:SignedTransaction.kt$SignedTransaction$?: MaxLineLength:SignedTransaction.kt$SignedTransaction$@Throws(SignatureException::class, AttachmentResolutionException::class, TransactionResolutionException::class, TransactionVerificationException::class) MaxLineLength:SignedTransaction.kt$SignedTransaction$SignaturesMissingException : NamedByHashSignatureExceptionCordaThrowable @@ -3389,11 +3382,8 @@ MaxLineLength:TransactionBuilder.kt$TransactionBuilder$ private fun attachmentConstraintsTransition( constraints: Set<AttachmentConstraint>, attachmentToUse: ContractAttachment, services: ServicesForResolution ): AttachmentConstraint MaxLineLength:TransactionBuilder.kt$TransactionBuilder$ private fun handleContract( contractClassName: ContractClassName, inputStates: List<TransactionState<ContractState>>?, outputStates: List<TransactionState<ContractState>>?, explicitContractAttachment: AttachmentId?, services: ServicesForResolution ): Pair<AttachmentId, List<TransactionState<ContractState>>?> MaxLineLength:TransactionBuilder.kt$TransactionBuilder$ private fun selectAttachmentConstraint( contractClassName: ContractClassName, inputStates: List<TransactionState<ContractState>>?, attachmentToUse: ContractAttachment, services: ServicesForResolution): AttachmentConstraint - MaxLineLength:TransactionBuilder.kt$TransactionBuilder$ private fun selectContractAttachmentsAndOutputStateConstraints( services: ServicesForResolution, @Suppress("UNUSED_PARAMETER") serializationContext: SerializationContext? ): Pair<Collection<SecureHash>, List<TransactionState<ContractState>>> MaxLineLength:TransactionBuilder.kt$TransactionBuilder$"An attachment has been explicitly set for contract $contractClassName in the transaction builder which conflicts with the HashConstraint of a state." MaxLineLength:TransactionBuilder.kt$TransactionBuilder$"Transaction was built with $contractClassName states with multiple HashConstraints. This is illegal, because it makes it impossible to validate with a single version of the contract code." - MaxLineLength:TransactionBuilder.kt$TransactionBuilder$(allContractAttachments + attachments).toSortedSet().toList() - MaxLineLength:TransactionBuilder.kt$TransactionBuilder$@CordaInternal internal MaxLineLength:TransactionBuilder.kt$TransactionBuilder$constraints.any { it is WhitelistedByZoneAttachmentConstraint } && attachmentToUse.isSigned && services.networkParameters.minimumPlatformVersion >= 4 -> transitionToSignatureConstraint(constraints, attachmentToUse) MaxLineLength:TransactionBuilder.kt$TransactionBuilder$if ((attachment as ContractAttachment).isSigned && (explicitContractAttachment == null || explicitContractAttachment == attachment.id)) { val signatureConstraint = makeSignatureAttachmentConstraint(attachment.signerKeys) require(signatureConstraint.isSatisfiedBy(attachment)) { "Selected output constraint: $signatureConstraint not satisfying ${attachment.id}" } val resolvedOutputStates = outputStates?.map { if (it.constraint in automaticConstraints) { it.copy(constraint = signatureConstraint) } else { it } } return attachment.id to resolvedOutputStates } MaxLineLength:TransactionBuilder.kt$TransactionBuilder$internal @@ -3403,17 +3393,14 @@ MaxLineLength:TransactionBuilder.kt$TransactionBuilder$private fun useWhitelistedByZoneAttachmentConstraint(contractClassName: ContractClassName, networkParameters: NetworkParameters) MaxLineLength:TransactionBuilder.kt$TransactionBuilder$require(automaticConstraintPropagation) { "Contract $contractClassName was marked with @NoConstraintPropagation, which means the constraint of the output states has to be set explicitly." } MaxLineLength:TransactionBuilder.kt$TransactionBuilder$require(defaultOutputConstraint.isSatisfiedBy(constraintAttachment)) { "Selected output constraint: $defaultOutputConstraint not satisfying $selectedAttachmentId" } - MaxLineLength:TransactionBuilder.kt$TransactionBuilder$require(explicitAttachmentContracts.isEmpty() || explicitAttachmentContracts.groupBy { (ctr, _) -> ctr }.all { (_, groups) -> groups.size == 1 }) { "Multiple attachments set for the same contract." } MaxLineLength:TransactionBuilder.kt$TransactionBuilder$require(outputConstraint.canBeTransitionedFrom(input.constraint, attachmentToUse)) { "Output state constraint $outputConstraint cannot be transitioned from ${input.constraint}" } MaxLineLength:TransactionBuilder.kt$TransactionBuilder$require(signatureConstraint.isSatisfiedBy(attachment)) { "Selected output constraint: $signatureConstraint not satisfying ${attachment.id}" } MaxLineLength:TransactionBuilder.kt$TransactionBuilder$throw IllegalArgumentException("Attempting to create an illegal transaction. Please install the latest signed version for the $attachmentToUse Cordapp.") MaxLineLength:TransactionBuilder.kt$TransactionBuilder$throw IllegalArgumentException("Can't mix the AlwaysAcceptAttachmentConstraint with a secure constraint in the same transaction. This can be used to hide insecure transitions.") - MaxLineLength:TransactionBuilder.kt$TransactionBuilder$val (allContractAttachments: Collection<SecureHash>, resolvedOutputs: List<TransactionState<ContractState>>) = selectContractAttachmentsAndOutputStateConstraints(services, serializationContext) MaxLineLength:TransactionBuilder.kt$TransactionBuilder$val attachments: Collection<AttachmentId> = contractAttachmentsAndResolvedOutputStates.map { it.first } + refStateContractAttachments MaxLineLength:TransactionBuilder.kt$TransactionBuilder$val automaticConstraintPropagation = contractClassName.contractHasAutomaticConstraintPropagation(inputsAndOutputs.first().data::class.java.classLoader) MaxLineLength:TransactionBuilder.kt$TransactionBuilder$val constraintAttachment = AttachmentWithContext(attachmentToUse, contractClassName, services.networkParameters.whitelistedContractImplementations) MaxLineLength:TransactionBuilder.kt$TransactionBuilder$val contractAttachmentsAndResolvedOutputStates: List<Pair<AttachmentId, List<TransactionState<ContractState>>?>> = allContracts.toSet() .map { ctr -> handleContract(ctr, inputContractGroups[ctr], outputContractGroups[ctr], explicitAttachmentContractsMap[ctr], services) } - MaxLineLength:TransactionBuilder.kt$TransactionBuilder$val referenceStateGroups: Map<ContractClassName, List<TransactionState<ContractState>>> = referencesWithTransactionState.groupBy { it.contract } MaxLineLength:TransactionBuilder.kt$TransactionBuilder$val resolvedOutputStatesInTheOriginalOrder: List<TransactionState<ContractState>> = outputStates().map { os -> resolvedStates.find { rs -> rs.data == os.data && rs.encumbrance == os.encumbrance }!! } MaxLineLength:TransactionBuilder.kt$TransactionBuilder$when { // Sanity check. constraints.isEmpty() -> throw IllegalArgumentException("Cannot transition from no constraints.") // Fail when combining the insecure AlwaysAcceptAttachmentConstraint with something else. constraints.size > 1 && constraints.any { it is AlwaysAcceptAttachmentConstraint } -> throw IllegalArgumentException("Can't mix the AlwaysAcceptAttachmentConstraint with a secure constraint in the same transaction. This can be used to hide insecure transitions.") // Multiple states with Hash constraints with different hashes. This should not happen as we checked already. constraints.size > 1 && constraints.all { it is HashAttachmentConstraint } -> throw IllegalArgumentException("Cannot mix HashConstraints with different hashes in the same transaction.") // The HashAttachmentConstraint is the strongest constraint, so it wins when mixed with anything. As long as the actual constraints pass. // Migration from HashAttachmentConstraint to SignatureAttachmentConstraint is handled in [TransactionBuilder.handleContract] // If we have reached this point, then no migration is possible and the existing HashAttachmentConstraint must be used constraints.any { it is HashAttachmentConstraint } -> constraints.find { it is HashAttachmentConstraint }!! // TODO, we don't currently support mixing signature constraints with different signers. This will change once we introduce third party signers. constraints.count { it is SignatureAttachmentConstraint } > 1 -> throw IllegalArgumentException("Cannot mix SignatureAttachmentConstraints signed by different parties in the same transaction.") // This ensures a smooth migration from a Whitelist Constraint to a Signature Constraint constraints.any { it is WhitelistedByZoneAttachmentConstraint } && attachmentToUse.isSigned && services.networkParameters.minimumPlatformVersion >= 4 -> transitionToSignatureConstraint(constraints, attachmentToUse) // This condition is hit when the current node has not installed the latest signed version but has already received states that have been migrated constraints.any { it is SignatureAttachmentConstraint } && !attachmentToUse.isSigned -> throw IllegalArgumentException("Attempting to create an illegal transaction. Please install the latest signed version for the $attachmentToUse Cordapp.") // When all input states have the same constraint. constraints.size == 1 -> constraints.single() else -> throw IllegalArgumentException("Unexpected constraints $constraints.") } MaxLineLength:TransactionBuilder.kt$TransactionBuilder${ // If the constraint on the output state is already set, and is not a valid transition or can't be transitioned, then fail early. inputStates?.forEach { input -> require(outputConstraint.canBeTransitionedFrom(input.constraint, attachmentToUse)) { "Output state constraint $outputConstraint cannot be transitioned from ${input.constraint}" } } require(outputConstraint.isSatisfiedBy(constraintAttachment)) { "Output state constraint check fails. $outputConstraint" } it } @@ -3752,7 +3739,6 @@ NestedBlockDepth:StatusTransitions.kt$StatusTransitions$ fun verify(tx: LedgerTransaction) NestedBlockDepth:ThrowableSerializer.kt$ThrowableSerializer$override fun fromProxy(proxy: ThrowableProxy): Throwable NestedBlockDepth:TransactionVerifierServiceInternal.kt$Verifier$ private fun verifyConstraintsValidity(contractAttachmentsByContract: Map<ContractClassName, ContractAttachment>) - SpreadOperator:AMQPSerializationScheme.kt$AbstractAMQPSerializationScheme$(*it.whitelist.toTypedArray()) SpreadOperator:AbstractNode.kt$FlowStarterImpl$(logicType, *args) SpreadOperator:AbstractParty.kt$AbstractParty$(*bytes) SpreadOperator:AbstractRPCTest.kt$AbstractRPCTest.Companion$(*modes) @@ -3929,15 +3915,11 @@ ThrowsCount:SchemaMigration.kt$SchemaMigration$private fun doRunMigration( run: Boolean, check: Boolean, existingCheckpoints: Boolean? = null ) ThrowsCount:ServicesForResolutionImpl.kt$ServicesForResolutionImpl$// We may need to recursively chase transactions if there are notary changes. fun inner(stateRef: StateRef, forContractClassName: String?): Attachment ThrowsCount:SignedNodeInfo.kt$SignedNodeInfo$// TODO Add root cert param (or TrustAnchor) to make sure all the identities belong to the same root fun verified(): NodeInfo - ThrowsCount:SignedTransaction.kt$SignedTransaction$// TODO: Verify contract constraints here as well as in LedgerTransaction to ensure that anything being deserialised // from the attachment is trusted. This will require some partial serialisation work to not load the ContractState // objects from the TransactionState. @DeleteForDJVM private fun verifyRegularTransaction(services: ServiceHub, checkSufficientSignatures: Boolean) ThrowsCount:SignedTransaction.kt$SignedTransaction$@DeleteForDJVM private fun resolveAndCheckNetworkParameters(services: ServiceHub) ThrowsCount:SingleThreadedStateMachineManager.kt$SingleThreadedStateMachineManager$private fun getInitiatedFlowFactory(message: InitialSessionMessage): InitiatedFlowFactory<*> ThrowsCount:StringToMethodCallParser.kt$StringToMethodCallParser$ @Throws(UnparseableCallException::class) fun parse(target: T?, command: String): ParsedMethodCall ThrowsCount:StringToMethodCallParser.kt$StringToMethodCallParser$ @Throws(UnparseableCallException::class) fun parseArguments(methodNameHint: String, parameters: List<Pair<String, Type>>, args: String): Array<Any?> ThrowsCount:StructuresTests.kt$AttachmentTest$@Test fun `openAsJAR does not leak file handle if attachment has corrupted manifest`() - ThrowsCount:TransactionBuilder.kt$TransactionBuilder$ fun withItems(vararg items: Any) - ThrowsCount:TransactionBuilder.kt$TransactionBuilder$ private fun addMissingDependency(services: ServicesForResolution, wireTx: WireTransaction): Boolean - ThrowsCount:TransactionBuilder.kt$TransactionBuilder$ private fun attachmentConstraintsTransition( constraints: Set<AttachmentConstraint>, attachmentToUse: ContractAttachment, services: ServicesForResolution ): AttachmentConstraint ThrowsCount:TransactionVerifierServiceInternal.kt$Verifier$ private fun getUniqueContractAttachmentsByContract(): Map<ContractClassName, ContractAttachment> ThrowsCount:TransactionVerifierServiceInternal.kt$Verifier$// Using basic graph theory, a full cycle of encumbered (co-dependent) states should exist to achieve bi-directional // encumbrances. This property is important to ensure that no states involved in an encumbrance-relationship // can be spent on their own. Briefly, if any of the states is having more than one encumbrance references by // other states, a full cycle detection will fail. As a result, all of the encumbered states must be present // as "from" and "to" only once (or zero times if no encumbrance takes place). For instance, // a -> b // c -> b and a -> b // b -> a b -> c // do not satisfy the bi-directionality (full cycle) property. // // In the first example "b" appears twice in encumbrance ("to") list and "c" exists in the encumbered ("from") list only. // Due the above, one could consume "a" and "b" in the same transaction and then, because "b" is already consumed, "c" cannot be spent. // // Similarly, the second example does not form a full cycle because "a" and "c" exist in one of the lists only. // As a result, one can consume "b" and "c" in the same transactions, which will make "a" impossible to be spent. // // On other hand the following are valid constructions: // a -> b a -> c // b -> c and c -> b // c -> a b -> a // and form a full cycle, meaning that the bi-directionality property is satisfied. private fun checkBidirectionalOutputEncumbrances(statesAndEncumbrance: List<Pair<Int, Int>>) ThrowsCount:WireTransaction.kt$WireTransaction$private fun toLedgerTransactionInternal( resolveIdentity: (PublicKey) -> Party?, resolveAttachment: (SecureHash) -> Attachment?, resolveStateRefAsSerialized: (StateRef) -> SerializedBytes<TransactionState<ContractState>>?, resolveParameters: (SecureHash?) -> NetworkParameters?, isAttachmentTrusted: (Attachment) -> Boolean ): LedgerTransaction @@ -3963,7 +3945,7 @@ TooGenericExceptionCaught:CertRole.kt$CertRole.Companion$ex: ArrayIndexOutOfBoundsException TooGenericExceptionCaught:CheckpointAgent.kt$CheckpointAgent.Companion$e: Exception TooGenericExceptionCaught:CheckpointAgent.kt$CheckpointHook$throwable: Throwable - TooGenericExceptionCaught:CheckpointDumper.kt$CheckpointDumper$e: Exception + TooGenericExceptionCaught:CheckpointDumperImpl.kt$CheckpointDumperImpl$e: Exception TooGenericExceptionCaught:CheckpointVerifier.kt$CheckpointVerifier$e: Exception TooGenericExceptionCaught:CollectSignaturesFlow.kt$SignTransactionFlow$e: Exception TooGenericExceptionCaught:ConcurrencyUtils.kt$t: Throwable @@ -4422,9 +4404,9 @@ WildcardImport:CertificateRevocationListNodeTests.kt$import net.corda.nodeapi.internal.crypto.* WildcardImport:CertificateRevocationListNodeTests.kt$import org.bouncycastle.asn1.x509.* WildcardImport:CertificatesUtils.kt$import net.corda.nodeapi.internal.crypto.* - WildcardImport:CheckpointDumper.kt$import com.fasterxml.jackson.databind.* - WildcardImport:CheckpointDumper.kt$import net.corda.core.internal.* - WildcardImport:CheckpointDumper.kt$import net.corda.node.services.statemachine.* + WildcardImport:CheckpointDumperImpl.kt$import com.fasterxml.jackson.databind.* + WildcardImport:CheckpointDumperImpl.kt$import net.corda.core.internal.* + WildcardImport:CheckpointDumperImpl.kt$import net.corda.node.services.statemachine.* WildcardImport:CheckpointSerializationAPI.kt$import net.corda.core.serialization.* WildcardImport:ClassCarpenter.kt$import org.objectweb.asm.Opcodes.* WildcardImport:ClassCarpenterTestUtils.kt$import net.corda.serialization.internal.amqp.* diff --git a/docs/source/api-service-classes.rst b/docs/source/api-service-classes.rst index 2d87b19790..2216e85694 100644 --- a/docs/source/api-service-classes.rst +++ b/docs/source/api-service-classes.rst @@ -32,7 +32,23 @@ Below is an empty implementation of a Service class: class MyCordaService(private val serviceHub: AppServiceHub) : SingletonSerializeAsToken() { init { - // code ran at service creation / node startup + // Custom code ran at service creation + + // Optional: Express interest in receiving lifecycle events + services.register { processEvent(it) } + } + + private fun processEvent(event: ServiceLifecycleEvent) { + // Lifecycle event handling code including full use of serviceHub + when (event) { + STATE_MACHINE_STARTED -> { + services.vaultService.queryBy(...) + services.startFlow(...) + } + else -> { + // Process other types of events + } + } } // public api of service @@ -43,11 +59,26 @@ Below is an empty implementation of a Service class: @CordaService public class MyCordaService extends SingletonSerializeAsToken { - private AppServiceHub serviceHub; + private final AppServiceHub serviceHub; public MyCordaService(AppServiceHub serviceHub) { this.serviceHub = serviceHub; - // code ran at service creation / node startup + // Custom code ran at service creation + + // Optional: Express interest in receiving lifecycle events + serviceHub.register(SERVICE_PRIORITY_NORMAL, this::processEvent); + } + + private void processEvent(ServiceLifecycleEvent event) { + switch (event) { + case STATE_MACHINE_STARTED: + serviceHub.getVaultService().queryBy(...) + serviceHub.startFlow(...) + break; + default: + // Process other types of events + break; + } } // public api of service @@ -59,9 +90,9 @@ from ``AppServiceHub`` is explained further in :ref:`Starting Flows from a Servi The ``AppServiceHub`` also provides access to ``database`` which will enable the Service class to perform DB transactions from the threads managed by the Service. -Code can be run during node startup when the class is being initialised. To do so, place the code into the ``init`` block or constructor. -This is useful when a service needs to establish a connection to an external database or setup observables via ``ServiceHub.trackBy`` during -its startup. These can then persist during the service's lifetime. +Also the ``AppServiceHub`` provides ability for ``CordaService`` to subscribe for lifecycle events of the node, such that it will get notified +about node finishing initialisation and when the node is shutting down such that ``CordaService`` will be able to perform clean-up of some +critical resources. For more details please have refer to KDocs for ``ServiceLifecycleObserver``. Retrieving a Service -------------------- @@ -96,8 +127,4 @@ starting. To avoid this, the rules bellow should be followed: .. note:: It is possible to avoid deadlock without following these rules depending on the number of flows running within the node. But, if the number of flows violating these rules reaches the flow worker queue size, then the node will deadlock. It is best practice to - abide by these rules to remove this possibility. - - - - + abide by these rules to remove this possibility. \ No newline at end of file diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index cec4b78d96..8d879ea2d1 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -7,11 +7,12 @@ release, see :doc:`app-upgrade-notes`. Unreleased ---------- +* Added ability for ``CordaService`` to register for receiving node lifecycle events to perform initialisation and clean-up actions. + * Custom serializer classes implementing ``SerializationCustomSerializer`` should ideally be packaged in the same CorDapp as the contract classes. Corda 4 could therefore fail to verify transactions created with Corda 3 if their custom serializer classes had been packaged somewhere else. Add a "fallback mechanism" to Corda's transaction verification logic which will attempt to include any missing custom serializers from other CorDapps within ``AttachmentStorage``. - * ``AppServiceHub`` been extended to provide access to ``database`` which will enable the Service class to perform DB transactions from the threads managed by the custom Service. diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeInitialContext.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeInitialContext.kt new file mode 100644 index 0000000000..9296849ea8 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeInitialContext.kt @@ -0,0 +1,10 @@ +package net.corda.nodeapi.internal.lifecycle + +import net.corda.common.configuration.parsing.internal.ConfigurationWithOptionsContainer + +/** + * Bare minimum information which will be available even before node fully started-up. + */ +interface NodeInitialContext : ConfigurationWithOptionsContainer { + val platformVersion: Int +} \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributor.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributor.kt new file mode 100644 index 0000000000..ba88cf4873 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributor.kt @@ -0,0 +1,128 @@ +package net.corda.nodeapi.internal.lifecycle + +import com.google.common.util.concurrent.ThreadFactoryBuilder +import net.corda.core.concurrent.CordaFuture +import net.corda.core.internal.concurrent.map +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.node.services.CordaServiceCriticalFailureException +import net.corda.core.utilities.Try +import net.corda.core.utilities.contextLogger +import java.util.Collections.singleton +import java.util.LinkedList +import java.util.concurrent.Executors +import java.util.concurrent.locks.Lock +import java.util.concurrent.locks.ReadWriteLock +import java.util.concurrent.locks.ReentrantReadWriteLock +import kotlin.system.exitProcess + +/** + * Responsible for distributing of various `NodeLifecycleEvent` to `NodeLifecycleObserver`. + * + * This class may do it in an asynchronous fashion. Also it might listen to the feedback from observers on the notifications sent and perform + * actions depending on the observer's priority. + * + * The class is safe for concurrent use from multiple threads. + */ +class NodeLifecycleEventsDistributor { + + companion object { + private val log = contextLogger() + + private val criticalEventsClasses: Set> = setOf( + NodeLifecycleEvent.BeforeNodeStart::class.java, + NodeLifecycleEvent.AfterNodeStart::class.java, + NodeLifecycleEvent.StateMachineStarted::class.java) + private val criticalExceptionsClasses: Set> = setOf(CordaServiceCriticalFailureException::class.java) + } + + /** + * Order is maintained by priority and within equal priority by full class name. + */ + private val prioritizedObservers: MutableList = mutableListOf() + + private val readWriteLock: ReadWriteLock = ReentrantReadWriteLock() + + private val executor = Executors.newSingleThreadExecutor( + ThreadFactoryBuilder().setNameFormat("NodeLifecycleEventsDistributor-%d").build()) + + /** + * Adds observer to the distribution list. + */ + fun add(observer: T) : T { + addAll(singleton(observer)) + return observer + } + + /** + * Adds multiple observers to the distribution list. + */ + fun addAll(observers: Collection) : Collection { + + data class SortingKey(val priority: Int, val clazz: Class<*>) : Comparable { + override fun compareTo(other: SortingKey): Int { + if(priority != other.priority) { + // Reversing sorting order such that higher priorities come first + return other.priority - priority + } + // Within the same priority order alphabetically by class name to deterministic order + return clazz.name.compareTo(other.clazz.name) + } + } + + readWriteLock.writeLock().executeLocked { + prioritizedObservers.addAll(observers) + // In-place sorting + prioritizedObservers.sortBy { SortingKey(it.priority, it.javaClass) } + } + + return observers + } + + /** + * Distributes event to all the observers previously added + * + * @return [CordaFuture] to signal when distribution is finished and delivered to all the observers + */ + fun distributeEvent(event: NodeLifecycleEvent): CordaFuture { + val snapshot = readWriteLock.readLock().executeLocked { LinkedList(prioritizedObservers) } + + val result = openFuture() + + executor.execute { + val orderedSnapshot = if (event.reversedPriority) snapshot.reversed() else snapshot + orderedSnapshot.forEach { + log.debug("Distributing event $event to: $it") + val updateResult = it.update(event) + if (updateResult.isSuccess) { + log.debug("Event $event distribution outcome: $updateResult") + } else { + log.error("Failed to distribute event $event, failure outcome: $updateResult") + handlePossibleFatalTermination(event, updateResult as Try.Failure) + } + } + result.set(null) + } + return result.map { } + } + + private fun handlePossibleFatalTermination(event: NodeLifecycleEvent, updateFailed: Try.Failure) { + if (event.javaClass in criticalEventsClasses && updateFailed.exception.javaClass in criticalExceptionsClasses) { + log.error("During processing of $event critical failure been reported: $updateFailed. JVM will be terminated.") + exitProcess(1) + } else { + log.warn("During processing of $event non-critical failure been reported: $updateFailed.") + } + } + + /** + * Custom implementation vs. using [kotlin.concurrent.withLock] to allow interruption during lock acquisition. + */ + private fun Lock.executeLocked(block: () -> T) : T { + lockInterruptibly() + try { + return block() + } finally { + unlock() + } + } +} \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleObserver.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleObserver.kt new file mode 100644 index 0000000000..0232029188 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleObserver.kt @@ -0,0 +1,56 @@ +package net.corda.nodeapi.internal.lifecycle + +import net.corda.core.utilities.Try + +/** + * Interface to flag interest in the Corda Node lifecycle which involves being notified when the node is starting up or + * shutting down. + * Unlike [net.corda.core.node.services.ServiceLifecycleObserver] this is an internal interface that provides much richer + * functionality for interacting with node's internal services. + */ +interface NodeLifecycleObserver { + + companion object { + const val RPC_PRIORITY_HIGH = 1200 + const val RPC_PRIORITY_NORMAL = 1100 + const val RPC_PRIORITY_LOW = 1020 + + /** + * Helper method to create a string to flag successful processing of an event. + */ + @Suppress("unused") + inline fun T.reportSuccess(nodeLifecycleEvent: NodeLifecycleEvent) : String = + "${T::class.java} successfully processed $nodeLifecycleEvent" + } + + /** + * Used to inform `NodeLifecycleObserver` of certain `NodeLifecycleEvent`. + * + * @return If even been processed successfully and the are no error conditions `Try.Success` with brief status, otherwise `Try.Failure` + * with exception explaining what went wrong. + * It is down to subject (i.e. Node) to decide what to do in case of failure and decision may depend on the Observer's priority. + */ + fun update(nodeLifecycleEvent: NodeLifecycleEvent) : Try = Try.on { "${javaClass.simpleName} ignored $nodeLifecycleEvent" } + + /** + * It is possible to optionally override observer priority. + * + * `start` methods will be invoked in the ascending sequence priority order. For items with the same order alphabetical ordering + * of full class name will be applied. + * For `stop` methods, the order will be opposite to `start`. + */ + val priority: Int +} + +/** + * A set of events to flag the important milestones in the lifecycle of the node. + * @param reversedPriority flags whether it would make sense to notify observers in the reversed order. + */ +sealed class NodeLifecycleEvent(val reversedPriority: Boolean = false) { + class BeforeNodeStart(val nodeInitialContext: NodeInitialContext) : NodeLifecycleEvent() + class AfterNodeStart(val nodeServicesContext: T) : NodeLifecycleEvent() + class StateMachineStarted(val nodeServicesContext: T) : NodeLifecycleEvent() + class StateMachineStopped(val nodeServicesContext: T) : NodeLifecycleEvent(reversedPriority = true) + class BeforeNodeStop(val nodeServicesContext: T) : NodeLifecycleEvent(reversedPriority = true) + class AfterNodeStop(val nodeInitialContext: NodeInitialContext) : NodeLifecycleEvent(reversedPriority = true) +} \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeServicesContext.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeServicesContext.kt new file mode 100644 index 0000000000..58f6b25cb5 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeServicesContext.kt @@ -0,0 +1,17 @@ +package net.corda.nodeapi.internal.lifecycle + +import net.corda.core.CordaInternal +import net.corda.core.serialization.SerializeAsToken + +/** + * Defines a set of properties that will be available for services to perform useful activity with side effects. + */ +interface NodeServicesContext : NodeInitialContext { + + /** + * Special services which upon serialisation will be represented in the stream by a special token. On the remote side + * during deserialization token will be read and corresponding instance found and wired as necessary. + */ + @CordaInternal + val tokenizableServices: List +} \ No newline at end of file diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributorMultiThreadedTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributorMultiThreadedTest.kt new file mode 100644 index 0000000000..d143ba89a4 --- /dev/null +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributorMultiThreadedTest.kt @@ -0,0 +1,61 @@ +package net.corda.nodeapi.internal.lifecycle + +import com.nhaarman.mockito_kotlin.mock +import net.corda.core.internal.stream +import net.corda.core.utilities.Try +import net.corda.core.utilities.contextLogger +import org.junit.Test +import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver.Companion.reportSuccess +import java.util.concurrent.atomic.AtomicLong +import kotlin.test.assertTrue + +internal class NodeLifecycleEventsDistributorMultiThreadedTest { + + companion object { + private val logger = contextLogger() + } + + private val instance = NodeLifecycleEventsDistributor() + + private val addedCounter = AtomicLong() + + private val eventsDeliveredCounter = AtomicLong() + + @Test + fun addAndDistributeConcurrently() { + + val initialObserversCount = 10 + repeat(initialObserversCount) { instance.add(MyObserver(it)) } + + val operationsCount = 100_000 + val event = NodeLifecycleEvent.BeforeNodeStart(mock()) + val additionFreq = 1000 + val distributionFutures = (1..operationsCount).stream(true).mapToObj { + if(it % additionFreq == 0) { + logger.debug("Adding observer") + instance.add(MyObserver(it)) + addedCounter.incrementAndGet() + logger.info("Progress so far: $it") + } + logger.debug("Distributing event") + instance.distributeEvent(event) + } + distributionFutures.forEach { it.get() } + + with(eventsDeliveredCounter.get()) { + // Greater than original observers times events + assertTrue("$this") { this > initialObserversCount.toLong() * operationsCount } + // Less than ever added observers times events + assertTrue("$this") { this < (initialObserversCount.toLong() + addedCounter.get()) * operationsCount } + } + } + + inner class MyObserver(seqNum: Int) : NodeLifecycleObserver { + override val priority: Int = seqNum % 10 + + override fun update(nodeLifecycleEvent: NodeLifecycleEvent): Try = Try.on { + eventsDeliveredCounter.incrementAndGet() + reportSuccess(nodeLifecycleEvent) + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/java/net/corda/node/services/JavaCordaServiceLifecycle.java b/node/src/integration-test/java/net/corda/node/services/JavaCordaServiceLifecycle.java new file mode 100644 index 0000000000..09fed25bbe --- /dev/null +++ b/node/src/integration-test/java/net/corda/node/services/JavaCordaServiceLifecycle.java @@ -0,0 +1,64 @@ +package net.corda.node.services; + +import co.paralleluniverse.fibers.Suspendable; +import net.corda.core.flows.FlowLogic; +import net.corda.core.flows.StartableByRPC; +import net.corda.core.node.AppServiceHub; +import net.corda.core.node.services.CordaService; +import net.corda.core.node.services.ServiceLifecycleEvent; +import net.corda.core.serialization.SingletonSerializeAsToken; + +import java.util.ArrayList; +import java.util.List; + +import static net.corda.core.node.AppServiceHub.SERVICE_PRIORITY_NORMAL; + +public class JavaCordaServiceLifecycle { + + static final List eventsCaptured = new ArrayList<>(); + + @StartableByRPC + public static class JavaComputeTextLengthThroughCordaService extends FlowLogic { + + private final String text; + + public JavaComputeTextLengthThroughCordaService(String text) { + this.text = text; + } + + @Override + @Suspendable + public Integer call() { + JavaTextLengthComputingService service = getServiceHub().cordaService(JavaTextLengthComputingService.class); + return service.computeLength(text); + } + } + + @CordaService + public static class JavaTextLengthComputingService extends SingletonSerializeAsToken { + + private final AppServiceHub serviceHub; + + public JavaTextLengthComputingService(AppServiceHub serviceHub) { + this.serviceHub = serviceHub; + serviceHub.register(SERVICE_PRIORITY_NORMAL, this::addEvent); + } + + private void addEvent(ServiceLifecycleEvent event) { + + switch (event) { + case STATE_MACHINE_STARTED: + eventsCaptured.add(event); + break; + default: + // Process other typed of events + break; + } + } + + public int computeLength(String text) { + assert !text.isEmpty(); + return text.length(); + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceIssueOnceAtStartupTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceIssueOnceAtStartupTests.kt new file mode 100644 index 0000000000..f3ae99b4dc --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceIssueOnceAtStartupTests.kt @@ -0,0 +1,129 @@ +package net.corda.node.services + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByService +import net.corda.core.identity.Party +import net.corda.core.node.AppServiceHub +import net.corda.core.node.services.CordaService +import net.corda.core.node.services.ServiceLifecycleEvent +import net.corda.core.node.services.ServiceLifecycleObserver +import net.corda.core.node.services.Vault +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds +import net.corda.finance.DOLLARS +import net.corda.finance.contracts.asset.Cash +import net.corda.finance.flows.AbstractCashFlow +import net.corda.finance.flows.CashIssueAndPaymentFlow +import net.corda.testing.common.internal.eventually +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.node.internal.FINANCE_CORDAPPS +import net.corda.testing.node.internal.enclosedCordapp +import org.junit.After +import org.junit.Test +import java.io.File +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +/** + * The idea of this test is upon start-up of the node check if cash been already issued and if not issue under certain reference. + * If state is already present - do nothing. + */ +class CordaServiceIssueOnceAtStartupTests { + + companion object { + private val armedPropName = this::class.java.enclosingClass.name + "-armed" + private val logger = contextLogger() + private val tempFilePropertyName = this::class.java.enclosingClass.name + "-tmpFile" + private val tmpFile = createTempFile(prefix = tempFilePropertyName) + private const val vaultQueryExecutedMarker = "VaultQueryExecuted" + private const val sentFlowMarker = "SentFlow" + } + + @Test + fun test() { + driver(DriverParameters(startNodesInProcess = false, cordappsForAllNodes = FINANCE_CORDAPPS + enclosedCordapp(), inMemoryDB = false, + systemProperties = mapOf(armedPropName to "true", tempFilePropertyName to tmpFile.absolutePath))) { + var node = startNode(providedName = ALICE_NAME).getOrThrow() + var page: Vault.Page? + eventually(duration = 10.seconds) { + page = node.rpc.vaultQuery(Cash.State::class.java) + assertTrue(page!!.states.isNotEmpty()) + assertEquals(listOf(vaultQueryExecutedMarker, sentFlowMarker), tmpFile.readLines(), "First start tracker") + } + node.stop() + node = startNode(providedName = ALICE_NAME).getOrThrow() + eventually(duration = 10.seconds) { + assertEquals(listOf(vaultQueryExecutedMarker, sentFlowMarker, vaultQueryExecutedMarker), + tmpFile.readLines(), "Re-start tracker") + } + } + } + + @After + fun testDown() { + System.clearProperty(armedPropName) + tmpFile.delete() + } + + @CordaService + @Suppress("unused") + class IssueAndPayOnceService(private val services: AppServiceHub) : SingletonSerializeAsToken() { + + init { + // There are some "greedy" tests that may take on the package of this test and include it into the CorDapp they assemble + // Without the "secret" property service upon instantiation will be subscribed to lifecycle events which would be unwanted. + // Also do not do this for Notary + val myName = services.myInfo.legalIdentities.single().name + val notaryName = services.networkMapCache.notaryIdentities.single().name + if(java.lang.Boolean.getBoolean(armedPropName) && myName != notaryName) { + services.register(observer = MyServiceLifecycleObserver()) + } else { + logger.info("Skipping lifecycle events registration for $myName") + } + } + + inner class MyServiceLifecycleObserver : ServiceLifecycleObserver { + override fun onServiceLifecycleEvent(event: ServiceLifecycleEvent) { + val tmpFile = File(System.getProperty(tempFilePropertyName)) + if (event == ServiceLifecycleEvent.STATE_MACHINE_STARTED) { + val queryResult = services.vaultService.queryBy(Cash.State::class.java) + if (tmpFile.length() == 0L) { + tmpFile.appendText(vaultQueryExecutedMarker) + } else { + tmpFile.appendText("\n" + vaultQueryExecutedMarker) + } + + if(queryResult.states.isEmpty()) { + val issueAndPayResult = services.startFlow( + IssueAndPayByServiceFlow( + services.myInfo.legalIdentities.single(), services.networkMapCache.notaryIdentities.single())) + .returnValue.getOrThrow() + logger.info("Cash issued and paid: $issueAndPayResult") + tmpFile.appendText("\n" + sentFlowMarker) + } + } + } + } + } + + /** + * The only purpose to have this is to be able to have annotation: [StartableByService] + */ + @StartableByService + class IssueAndPayByServiceFlow(private val recipient: Party, private val notary: Party) : FlowLogic() { + @Suspendable + override fun call(): AbstractCashFlow.Result { + return subFlow(CashIssueAndPaymentFlow(500.DOLLARS, + OpaqueBytes.of(0x01), + recipient, + false, + notary)) + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleFatalTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleFatalTests.kt new file mode 100644 index 0000000000..a5c1aba49b --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleFatalTests.kt @@ -0,0 +1,65 @@ +package net.corda.node.services + +import net.corda.core.node.AppServiceHub +import net.corda.core.node.services.CordaService +import net.corda.core.node.services.CordaServiceCriticalFailureException +import net.corda.core.node.services.ServiceLifecycleEvent +import net.corda.core.node.services.ServiceLifecycleObserver +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.utilities.getOrThrow +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.node.internal.ListenProcessDeathException +import net.corda.testing.node.internal.enclosedCordapp +import org.junit.Test +import kotlin.test.assertFailsWith + +class CordaServiceLifecycleFatalTests { + + companion object { + + // It is important to disarm throwing of the exception as unfortunately this service may be packaged to many + // test cordaps, e.g. the one used by [net.corda.node.CordappScanningDriverTest] + // If service remains "armed" to throw exceptions this will fail node start-up sequence. + // The problem is caused by the fact that test from `net.corda.node` package also hoovers all the sub-packages. + // Since this is done as a separate process, the trigger is passed through the system property. + const val SECRET_PROPERTY_NAME = "CordaServiceLifecycleFatalTests.armed" + + @CordaService + @Suppress("unused") + class FatalService(services: AppServiceHub) : SingletonSerializeAsToken() { + + init { + services.register(observer = FailingObserver) + } + + fun computeLength(text: String): Int { + require(text.isNotEmpty()) { "Length must be at least 1." } + return text.length + } + } + + object FailingObserver : ServiceLifecycleObserver { + override fun onServiceLifecycleEvent(event: ServiceLifecycleEvent) { + if(java.lang.Boolean.getBoolean(SECRET_PROPERTY_NAME)) { + throw CordaServiceCriticalFailureException("failure") + } + } + } + } + + @Test + fun `JVM terminates on critical failure`() { + // Scenario terminates JVM - node should be running out of process + driver(DriverParameters(startNodesInProcess = false, cordappsForAllNodes = listOf(enclosedCordapp()), + notarySpecs = emptyList(), systemProperties = mapOf(Pair(SECRET_PROPERTY_NAME, "true")))) { + val nodeHandle = startNode(providedName = ALICE_NAME) + // Ensure ample time for all the stat-up lifecycle events to be processed + Thread.sleep(2000) + assertFailsWith(ListenProcessDeathException::class) { + nodeHandle.getOrThrow() + } + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleTests.kt new file mode 100644 index 0000000000..71df448458 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleTests.kt @@ -0,0 +1,74 @@ +package net.corda.node.services + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.messaging.startFlow +import net.corda.core.node.AppServiceHub +import net.corda.core.node.services.CordaService +import net.corda.core.node.services.ServiceLifecycleEvent +import net.corda.core.node.services.ServiceLifecycleEvent.STATE_MACHINE_STARTED +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.utilities.getOrThrow +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.node.internal.enclosedCordapp +import org.junit.Test +import kotlin.test.assertEquals + +class CordaServiceLifecycleTests { + + private companion object { + const val TEST_PHRASE = "testPhrase" + + private val eventsCaptured: MutableList = mutableListOf() + } + + @Test + fun `corda service receives events`() { + eventsCaptured.clear() + val result = driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), + notarySpecs = emptyList())) { + val node = startNode(providedName = ALICE_NAME).getOrThrow() + node.rpc.startFlow(::ComputeTextLengthThroughCordaService, TEST_PHRASE).returnValue.getOrThrow() + } + assertEquals(TEST_PHRASE.length, result) + assertEquals(1, eventsCaptured.size) + assertEquals(listOf(STATE_MACHINE_STARTED), eventsCaptured) + } + + @StartableByRPC + class ComputeTextLengthThroughCordaService(private val text: String) : FlowLogic() { + @Suspendable + override fun call(): Int { + val service = serviceHub.cordaService(TextLengthComputingService::class.java) + return service.computeLength(text) + } + } + + @CordaService + @Suppress("unused") + class TextLengthComputingService(services: AppServiceHub) : SingletonSerializeAsToken() { + + init { + services.register { addEvent(it) } + } + + private fun addEvent(event: ServiceLifecycleEvent) { + when (event) { + STATE_MACHINE_STARTED -> { + eventsCaptured.add(event) + } + else -> { + eventsCaptured.add(event) + } + } + } + + fun computeLength(text: String): Int { + require(text.isNotEmpty()) { "Length must be at least 1." } + return text.length + } + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/services/JavaCordaServiceLifecycleTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/JavaCordaServiceLifecycleTests.kt new file mode 100644 index 0000000000..0c01f0d058 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/JavaCordaServiceLifecycleTests.kt @@ -0,0 +1,31 @@ +package net.corda.node.services + +import net.corda.core.messaging.startFlow +import net.corda.core.node.services.ServiceLifecycleEvent +import net.corda.core.utilities.getOrThrow +import net.corda.node.services.JavaCordaServiceLifecycle.JavaComputeTextLengthThroughCordaService +import net.corda.node.services.JavaCordaServiceLifecycle.eventsCaptured +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import org.junit.Test +import kotlin.test.assertEquals + +class JavaCordaServiceLifecycleTests { + + private companion object { + const val TEST_PHRASE = "javaTestPhrase" + } + + @Test + fun `corda service receives events`() { + eventsCaptured.clear() + val result = driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList())) { + val node = startNode(providedName = ALICE_NAME).getOrThrow() + node.rpc.startFlow(::JavaComputeTextLengthThroughCordaService, TEST_PHRASE).returnValue.getOrThrow() + } + assertEquals(TEST_PHRASE.length, result) + assertEquals(1, eventsCaptured.size) + assertEquals(listOf(ServiceLifecycleEvent.STATE_MACHINE_STARTED), eventsCaptured) + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 4dca027d16..3b2530d9a0 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -19,7 +19,6 @@ import net.corda.core.flows.FlowLogicRefFactory import net.corda.core.flows.InitiatedBy import net.corda.core.flows.NotaryChangeFlow import net.corda.core.flows.NotaryFlow -import net.corda.core.flows.StartableByService import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.AbstractParty import net.corda.core.identity.CordaX500Name @@ -31,7 +30,7 @@ import net.corda.core.internal.NODE_INFO_DIRECTORY import net.corda.core.internal.NamedCacheFactory import net.corda.core.internal.NetworkParametersStorage import net.corda.core.internal.VisibleForTesting -import net.corda.core.internal.concurrent.doneFuture +import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.div @@ -41,10 +40,6 @@ import net.corda.core.internal.rootMessage import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.ClientRpcSslOptions import net.corda.core.messaging.CordaRPCOps -import net.corda.core.messaging.FlowHandle -import net.corda.core.messaging.FlowHandleImpl -import net.corda.core.messaging.FlowProgressHandle -import net.corda.core.messaging.FlowProgressHandleImpl import net.corda.core.messaging.RPCOps import net.corda.core.node.AppServiceHub import net.corda.core.node.NetworkParameters @@ -57,7 +52,6 @@ import net.corda.core.node.services.diagnostics.DiagnosticsService import net.corda.core.node.services.IdentityService import net.corda.core.node.services.KeyManagementService import net.corda.core.node.services.TransactionVerifierService -import net.corda.core.node.services.vault.CordaTransactionSupport import net.corda.core.schemas.MappedSchema import net.corda.core.serialization.SerializationWhitelist import net.corda.core.serialization.SerializeAsToken @@ -65,11 +59,12 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.LedgerTransaction import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.days -import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.minutes +import net.corda.nodeapi.internal.lifecycle.NodeServicesContext import net.corda.djvm.source.ApiSource import net.corda.djvm.source.EmptyApi import net.corda.djvm.source.UserSource +import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent import net.corda.node.CordaClock import net.corda.node.VersionInfo import net.corda.node.internal.classloading.requireAnnotation @@ -89,6 +84,7 @@ import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.MonitoringService import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.services.api.NodePropertiesStore +import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEventsDistributor import net.corda.node.services.api.SchemaService import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.VaultServiceInternal @@ -122,7 +118,7 @@ import net.corda.node.services.persistence.NodeAttachmentService import net.corda.node.services.persistence.NodePropertiesPersistentStore import net.corda.node.services.persistence.PublicKeyToOwningIdentityCacheImpl import net.corda.node.services.persistence.PublicKeyToTextConverter -import net.corda.node.services.rpc.CheckpointDumper +import net.corda.node.services.rpc.CheckpointDumperImpl import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.statemachine.Event import net.corda.node.services.statemachine.ExternalEvent @@ -170,7 +166,6 @@ import org.apache.activemq.artemis.utils.ReusableLatch import org.jolokia.jvmagent.JolokiaServer import org.jolokia.jvmagent.JolokiaServerConfig import org.slf4j.Logger -import rx.Observable import rx.Scheduler import java.io.IOException import java.lang.reflect.InvocationTargetException @@ -182,7 +177,6 @@ import java.sql.Connection import java.time.Clock import java.time.Duration import java.time.format.DateTimeParseException -import java.util.Objects import java.util.Properties import java.util.concurrent.ExecutorService import java.util.concurrent.Executors @@ -212,7 +206,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected abstract val log: Logger @Suppress("LeakingThis") - private var tokenizableServices: MutableList? = mutableListOf(platformClock, this) + private var tokenizableServices: MutableList? = mutableListOf(platformClock, this) val metricRegistry = MetricRegistry() protected val cacheFactory = cacheFactoryPrototype.bindWithConfig(configuration).bindWithMetrics(metricRegistry).tokenize() @@ -244,7 +238,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration, schemaService, configuration.dataSourceProperties, cacheFactory, - this.cordappLoader.appClassLoader) + cordappLoader.appClassLoader) + + private val transactionSupport = CordaTransactionSupportImpl(database) init { // TODO Break cyclic dependency @@ -358,8 +354,22 @@ abstract class AbstractNode(val configuration: NodeConfiguration, @Volatile private var _started: S? = null + private val checkpointDumper = CheckpointDumperImpl(checkpointStorage, database, services, services.configuration.baseDirectory) + + private val nodeServicesContext = object : NodeServicesContext { + override val platformVersion = versionInfo.platformVersion + override val configurationWithOptions = configuration.configurationWithOptions + // Note: tokenizableServices passed by reference meaning that any subsequent modification to the content in the `AbstractNode` will + // be reflected in the context as well. However, since context only has access to immutable collection it can only read (but not modify) + // the content. + override val tokenizableServices: List = this@AbstractNode.tokenizableServices!! + } + + private val nodeLifecycleEventsDistributor = NodeLifecycleEventsDistributor().apply { add(checkpointDumper) } + private fun T.tokenize(): T { - tokenizableServices?.add(this) + tokenizableServices?.add(this as? SerializeAsToken ?: + throw IllegalStateException("${this::class.java} is expected to be extending from SerializeAsToken")) ?: throw IllegalStateException("The tokenisable services list has already been finalised") return this } @@ -370,7 +380,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } /** The implementation of the [CordaRPCOps] interface used by this node. */ - open fun makeRPCOps(cordappLoader: CordappLoader, checkpointDumper: CheckpointDumper): CordaRPCOps { + open fun makeRPCOps(cordappLoader: CordappLoader, checkpointDumper: CheckpointDumperImpl): CordaRPCOps { val ops: InternalCordaRPCOps = CordaRPCOpsImpl( services, smm, @@ -429,6 +439,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, if (configuration.devMode && System.getProperty("co.paralleluniverse.fibers.verifyInstrumentation") == null) { System.setProperty("co.paralleluniverse.fibers.verifyInstrumentation", "true") } + nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.BeforeNodeStart(nodeServicesContext)) log.info("Node starting up ...") val trustRoot = initKeyStores() @@ -443,7 +454,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, installCoreFlows() registerCordappFlows() services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows } - val checkpointDumper = CheckpointDumper(checkpointStorage, database, services, services.configuration.baseDirectory) val rpcOps = makeRPCOps(cordappLoader, checkpointDumper) startShell() networkMapClient?.start(trustRoot) @@ -485,7 +495,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } // Do all of this in a database transaction so anything that might need a connection has one. - return database.transaction(recoverableFailureTolerance = 0) { + val (resultingNodeInfo, readyFuture) = database.transaction(recoverableFailureTolerance = 0) { networkParametersStorage.setCurrentParameters(signedNetParams, trustRoot) identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts) attachments.start() @@ -505,21 +515,33 @@ abstract class AbstractNode(val configuration: NodeConfiguration, tokenizableServices = null verifyCheckpointsCompatible(frozenTokenizableServices) - checkpointDumper.start(frozenTokenizableServices) - smm.start(frozenTokenizableServices) + val smmStartedFuture = smm.start(frozenTokenizableServices) // Shut down the SMM so no Fibers are scheduled. runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) } val flowMonitor = FlowMonitor( - smm, - configuration.flowMonitorPeriodMillis, - configuration.flowMonitorSuspensionLoggingThresholdMillis + smm, + configuration.flowMonitorPeriodMillis, + configuration.flowMonitorSuspensionLoggingThresholdMillis ) runOnStop += flowMonitor::stop flowMonitor.start() schedulerService.start() - createStartedNode(nodeInfo, rpcOps, notaryService).also { _started = it } + val resultingNodeInfo = createStartedNode(nodeInfo, rpcOps, notaryService).also { _started = it } + val readyFuture = smmStartedFuture.flatMap { + log.debug("SMM ready") + network.ready + } + resultingNodeInfo to readyFuture } + + readyFuture.map { + // NB: Dispatch lifecycle events outside of transaction to ensure attachments and the like persisted into the DB + log.debug("Distributing events") + nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.AfterNodeStart(nodeServicesContext)) + nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.StateMachineStarted(nodeServicesContext)) + } + return resultingNodeInfo } /** Subclasses must override this to create a "started" node of the desired type, using the provided machinery. */ @@ -747,60 +769,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } } - /** - * This customizes the ServiceHub for each CordaService that is initiating flows. - */ - // TODO Move this into its own file - private class AppServiceHubImpl(private val serviceHub: ServiceHub, private val flowStarter: FlowStarter, - override val database: CordaTransactionSupport) : AppServiceHub, ServiceHub by serviceHub { - lateinit var serviceInstance: T - override fun startTrackedFlow(flow: FlowLogic): FlowProgressHandle { - val stateMachine = startFlowChecked(flow) - return FlowProgressHandleImpl( - id = stateMachine.id, - returnValue = stateMachine.resultFuture, - progress = stateMachine.logic.track()?.updates ?: Observable.empty() - ) - } - - override fun startFlow(flow: FlowLogic): FlowHandle { - val parentFlow = FlowLogic.currentTopLevel - return if (parentFlow != null) { - val result = parentFlow.subFlow(flow) - // Accessing the flow id must happen after the flow has started. - val flowId = flow.runId - FlowHandleImpl(flowId, doneFuture(result)) - } else { - val stateMachine = startFlowChecked(flow) - FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture) - } - } - - private fun startFlowChecked(flow: FlowLogic): FlowStateMachine { - val logicType = flow.javaClass - require(logicType.isAnnotationPresent(StartableByService::class.java)) { "${logicType.name} was not designed for starting by a CordaService" } - // TODO check service permissions - // TODO switch from myInfo.legalIdentities[0].name to current node's identity as soon as available - val context = InvocationContext.service(serviceInstance.javaClass.name, myInfo.legalIdentities[0].name) - return flowStarter.startFlow(flow, context).getOrThrow() - } - - override fun equals(other: Any?): Boolean { - if (this === other) return true - if (other !is AppServiceHubImpl<*>) return false - return serviceHub == other.serviceHub - && flowStarter == other.flowStarter - && serviceInstance == other.serviceInstance - } - - override fun hashCode() = Objects.hash(serviceHub, flowStarter, serviceInstance) - } - fun installCordaService(serviceClass: Class): T { serviceClass.requireAnnotation() val service = try { - val serviceContext = AppServiceHubImpl(services, flowStarter, CordaTransactionSupportImpl(database)) + val serviceContext = AppServiceHubImpl(services, flowStarter, transactionSupport, nodeLifecycleEventsDistributor) val extendedServiceConstructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java).apply { isAccessible = true } val service = extendedServiceConstructor.newInstance(serviceContext) serviceContext.serviceInstance = service @@ -950,6 +923,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } open fun stop() { + + nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.StateMachineStopped(nodeServicesContext)) + nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.BeforeNodeStop(nodeServicesContext)) + // TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the // network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop() // to indicate "Please shut down gracefully" vs "Shut down now". @@ -963,6 +940,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, runOnStop.clear() shutdownExecutor.shutdown() _started = null + nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.AfterNodeStop(nodeServicesContext)) } protected abstract fun makeMessagingService(): MessagingService diff --git a/node/src/main/kotlin/net/corda/node/internal/AppServiceHubImpl.kt b/node/src/main/kotlin/net/corda/node/internal/AppServiceHubImpl.kt new file mode 100644 index 0000000000..9c90173cb2 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/AppServiceHubImpl.kt @@ -0,0 +1,124 @@ +package net.corda.node.internal + +import net.corda.core.context.InvocationContext +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByService +import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.concurrent.doneFuture +import net.corda.core.messaging.FlowHandle +import net.corda.core.messaging.FlowHandleImpl +import net.corda.core.messaging.FlowProgressHandle +import net.corda.core.messaging.FlowProgressHandleImpl +import net.corda.core.node.AppServiceHub +import net.corda.core.node.ServiceHub +import net.corda.core.node.services.ServiceLifecycleEvent +import net.corda.core.node.services.ServiceLifecycleObserver +import net.corda.core.node.services.vault.CordaTransactionSupport +import net.corda.core.serialization.SerializeAsToken +import net.corda.core.utilities.Try +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.getOrThrow +import net.corda.node.services.api.FlowStarter +import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent +import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEventsDistributor +import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver +import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver.Companion.reportSuccess +import rx.Observable +import java.util.* + +/** + * This customizes the ServiceHub for each [net.corda.core.node.services.CordaService] that is initiating flows. + */ +internal class AppServiceHubImpl(private val serviceHub: ServiceHub, private val flowStarter: FlowStarter, + override val database: CordaTransactionSupport, + private val nodeLifecycleEventsDistributor: NodeLifecycleEventsDistributor) + : AppServiceHub, ServiceHub by serviceHub { + + companion object { + + private val logger = contextLogger() + + private class NodeLifecycleServiceObserverAdaptor(private val observer: ServiceLifecycleObserver, override val priority: Int) : NodeLifecycleObserver { + override fun update(nodeLifecycleEvent: NodeLifecycleEvent): Try { + return when(nodeLifecycleEvent) { + is NodeLifecycleEvent.StateMachineStarted<*> -> Try.on { + observer.onServiceLifecycleEvent(ServiceLifecycleEvent.STATE_MACHINE_STARTED) + reportSuccess(nodeLifecycleEvent) + } + else -> super.update(nodeLifecycleEvent) + } + } + } + } + + lateinit var serviceInstance: T + + @Volatile + private var flowsAllowed = false + + init { + nodeLifecycleEventsDistributor.add(object : NodeLifecycleObserver { + + override val priority: Int = AppServiceHub.SERVICE_PRIORITY_HIGH + + override fun update(nodeLifecycleEvent: NodeLifecycleEvent): Try { + return when(nodeLifecycleEvent) { + is NodeLifecycleEvent.StateMachineStarted<*> -> Try.on { + flowsAllowed = true + reportSuccess(nodeLifecycleEvent) + } + else -> super.update(nodeLifecycleEvent) + } + } + }) + } + + override fun startTrackedFlow(flow: FlowLogic): FlowProgressHandle { + val stateMachine = startFlowChecked(flow) + return FlowProgressHandleImpl( + id = stateMachine.id, + returnValue = stateMachine.resultFuture, + progress = stateMachine.logic.track()?.updates ?: Observable.empty() + ) + } + + override fun startFlow(flow: FlowLogic): FlowHandle { + val parentFlow = FlowLogic.currentTopLevel + return if (parentFlow != null) { + val result = parentFlow.subFlow(flow) + // Accessing the flow id must happen after the flow has started. + val flowId = flow.runId + FlowHandleImpl(flowId, doneFuture(result)) + } else { + val stateMachine = startFlowChecked(flow) + FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture) + } + } + + private fun startFlowChecked(flow: FlowLogic): FlowStateMachine { + val logicType = flow.javaClass + require(logicType.isAnnotationPresent(StartableByService::class.java)) { "${logicType.name} was not designed for starting by a CordaService" } + // TODO check service permissions + // TODO switch from myInfo.legalIdentities[0].name to current node's identity as soon as available + if(!flowsAllowed) { + logger.warn("Flow $flow started too early in the node's lifecycle, SMM may not be ready yet. " + + "Please consider registering your service to node's lifecycle event: `STATE_MACHINE_STARTED`") + } + val context = InvocationContext.service(serviceInstance.javaClass.name, myInfo.legalIdentities[0].name) + return flowStarter.startFlow(flow, context).getOrThrow() + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is AppServiceHubImpl<*>) return false + return serviceHub == other.serviceHub + && flowStarter == other.flowStarter + && serviceInstance == other.serviceInstance + } + + override fun hashCode() = Objects.hash(serviceHub, flowStarter, serviceInstance) + + override fun register(priority: Int, observer: ServiceLifecycleObserver) { + nodeLifecycleEventsDistributor.add(NodeLifecycleServiceObserverAdaptor(observer, priority)) + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index d88d5f2dbf..5405964ee1 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -51,7 +51,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.loggerFor import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.ServiceHubInternal -import net.corda.node.services.rpc.CheckpointDumper +import net.corda.node.services.rpc.CheckpointDumperImpl import net.corda.node.services.rpc.context import net.corda.node.services.statemachine.StateMachineManager import net.corda.nodeapi.exceptions.NonRpcFlowException @@ -72,7 +72,7 @@ internal class CordaRPCOpsImpl( private val services: ServiceHubInternal, private val smm: StateMachineManager, private val flowStarter: FlowStarter, - private val checkpointDumper: CheckpointDumper, + private val checkpointDumper: CheckpointDumperImpl, private val shutdownNode: () -> Unit ) : InternalCordaRPCOps, AutoCloseable { @@ -154,7 +154,7 @@ internal class CordaRPCOpsImpl( return services.validatedTransactions.track() } - override fun dumpCheckpoints() = checkpointDumper.dump() + override fun dumpCheckpoints() = checkpointDumper.dumpCheckpoints() override val attachmentTrustInfos: List get() { diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 56fc46d83b..359d8ad5de 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -633,6 +633,7 @@ open class Node(configuration: NodeConfiguration, /** Starts a blocking event loop for message dispatch. */ fun run() { internalRpcMessagingClient?.start(rpcBroker!!.serverControl) + printBasicNodeInfo("Running P2PMessaging loop") (network as P2PMessagingClient).run() } diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 2855b2935a..0fe654d6c9 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -2,6 +2,7 @@ package net.corda.node.services.config import com.typesafe.config.Config import net.corda.common.configuration.parsing.internal.Configuration +import net.corda.common.configuration.parsing.internal.ConfigurationWithOptionsContainer import net.corda.common.validation.internal.Validated import net.corda.core.context.AuthServiceId import net.corda.core.identity.CordaX500Name @@ -25,7 +26,7 @@ import javax.security.auth.x500.X500Principal val Int.MB: Long get() = this * 1024L * 1024L -interface NodeConfiguration { +interface NodeConfiguration : ConfigurationWithOptionsContainer { val myLegalName: CordaX500Name val emailAddress: String val jmxMonitoringHttpPort: Int? diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt index 625e94e476..437fa65468 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt @@ -1,6 +1,7 @@ package net.corda.node.services.config import com.typesafe.config.ConfigException +import net.corda.common.configuration.parsing.internal.ConfigurationWithOptions import net.corda.core.identity.CordaX500Name import net.corda.core.internal.div import net.corda.core.utilities.NetworkHostAndPort @@ -79,7 +80,8 @@ data class NodeConfigurationImpl( override val cordappSignerKeyFingerprintBlacklist: List = Defaults.cordappSignerKeyFingerprintBlacklist, override val networkParameterAcceptanceSettings: NetworkParameterAcceptanceSettings? = Defaults.networkParameterAcceptanceSettings, - override val blacklistedAttachmentSigningKeys: List = Defaults.blacklistedAttachmentSigningKeys + override val blacklistedAttachmentSigningKeys: List = Defaults.blacklistedAttachmentSigningKeys, + override val configurationWithOptions: ConfigurationWithOptions ) : NodeConfiguration { internal object Defaults { val jmxMonitoringHttpPort: Int? = null diff --git a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt index d59d3c6ad9..4a99d373db 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt @@ -125,7 +125,8 @@ internal object V1NodeConfigurationSpec : Configuration.Specification } fun MessagingService.send(topicSession: String, payload: Any, to: MessageRecipients, deduplicationId: SenderDeduplicationId = SenderDeduplicationId(DeduplicationId.createRandom(newSecureRandom()), ourSenderUUID), additionalHeaders: Map = emptyMap()) = send(createMessage(topicSession, payload.serialize().bytes, deduplicationId, additionalHeaders), to) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index eec97da964..713eba071e 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -7,6 +7,8 @@ import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.CordaX500Name import net.corda.core.internal.NamedCacheFactory import net.corda.core.internal.ThreadBox +import net.corda.core.internal.concurrent.OpenFuture +import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient @@ -143,6 +145,8 @@ class P2PMessagingClient(val config: NodeConfiguration, private val deduplicator = P2PMessageDeduplicator(cacheFactory, database) internal var messagingExecutor: MessagingExecutor? = null + override val ready: OpenFuture = openFuture() + /** * @param myIdentity The primary identity of the node, which defines the messaging address for externally received messages. * It is also used to construct the myAddress field, which is ultimately advertised in the network map. @@ -351,6 +355,9 @@ class P2PMessagingClient(val config: NodeConfiguration, p2pConsumer!! } consumer.start() + log.debug("Signalling ready") + ready.set(null) + log.debug("Awaiting on latch") latch.await() } finally { shutdownLatch.countDown() diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt similarity index 88% rename from node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt rename to node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt index 870c649c77..513b7b487d 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt @@ -28,6 +28,7 @@ import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.* +import net.corda.core.node.AppServiceHub.Companion.SERVICE_PRIORITY_NORMAL import net.corda.core.node.ServiceHub import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SerializedBytes @@ -37,7 +38,11 @@ import net.corda.core.serialization.internal.CheckpointSerializationDefaults import net.corda.core.serialization.internal.checkpointDeserialize import net.corda.core.utilities.NonEmptySet import net.corda.core.utilities.ProgressTracker +import net.corda.core.utilities.Try import net.corda.core.utilities.contextLogger +import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent +import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver +import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver.Companion.reportSuccess import net.corda.node.internal.NodeStartup import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.statemachine.* @@ -56,12 +61,15 @@ import java.util.concurrent.atomic.AtomicInteger import java.util.zip.ZipEntry import java.util.zip.ZipOutputStream -class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private val database: CordaPersistence, private val serviceHub: ServiceHub, val baseDirectory: Path) { +class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, private val database: CordaPersistence, + private val serviceHub: ServiceHub, val baseDirectory: Path) : NodeLifecycleObserver { companion object { internal val TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(UTC) private val log = contextLogger() } + override val priority: Int = SERVICE_PRIORITY_NORMAL + private val lock = AtomicInteger(0) private lateinit var checkpointSerializationContext: CheckpointSerializationContext @@ -71,32 +79,38 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private checkpointAgentRunning() } - fun start(tokenizableServices: List) { - checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext( - CheckpointSerializeAsTokenContextImpl( - tokenizableServices, - CheckpointSerializationDefaults.CHECKPOINT_SERIALIZER, - CheckpointSerializationDefaults.CHECKPOINT_CONTEXT, - serviceHub + override fun update(nodeLifecycleEvent: NodeLifecycleEvent): Try { + return when(nodeLifecycleEvent) { + is NodeLifecycleEvent.AfterNodeStart<*> -> Try.on { + checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext( + CheckpointSerializeAsTokenContextImpl( + nodeLifecycleEvent.nodeServicesContext.tokenizableServices, + CheckpointSerializationDefaults.CHECKPOINT_SERIALIZER, + CheckpointSerializationDefaults.CHECKPOINT_CONTEXT, + serviceHub + ) ) - ) - val mapper = JacksonSupport.createNonRpcMapper() - mapper.registerModule(SimpleModule().apply { - setSerializerModifier(CheckpointDumperBeanModifier) - addSerializer(FlowSessionImplSerializer) - addSerializer(MapSerializer) - addSerializer(AttachmentSerializer) - setMixInAnnotation(FlowLogic::class.java, FlowLogicMixin::class.java) - setMixInAnnotation(SessionId::class.java, SessionIdMixin::class.java) - }) - val prettyPrinter = DefaultPrettyPrinter().apply { - indentArraysWith(DefaultIndenter.SYSTEM_LINEFEED_INSTANCE) + val mapper = JacksonSupport.createNonRpcMapper() + mapper.registerModule(SimpleModule().apply { + setSerializerModifier(CheckpointDumperBeanModifier) + addSerializer(FlowSessionImplSerializer) + addSerializer(MapSerializer) + addSerializer(AttachmentSerializer) + setMixInAnnotation(FlowLogic::class.java, FlowLogicMixin::class.java) + setMixInAnnotation(SessionId::class.java, SessionIdMixin::class.java) + }) + val prettyPrinter = DefaultPrettyPrinter().apply { + indentArraysWith(DefaultIndenter.SYSTEM_LINEFEED_INSTANCE) + } + writer = mapper.writer(prettyPrinter) + reportSuccess(nodeLifecycleEvent) + } + else -> super.update(nodeLifecycleEvent) } - writer = mapper.writer(prettyPrinter) } - fun dump() { + fun dumpCheckpoints() { val now = serviceHub.clock.instant() val file = baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "checkpoints_dump-${TIME_FORMATTER.format(now)}.zip" try { @@ -393,7 +407,7 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private private object MapSerializer : JsonSerializer>() { override fun serialize(map: Map, gen: JsonGenerator, serializers: SerializerProvider) { gen.writeStartArray(map.size) - map.forEach { key, value -> + map.forEach { (key, value) -> gen.jsonObject { writeObjectField("key", key) writeObjectField("value", value) @@ -404,4 +418,4 @@ class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private override fun handledType(): Class> = uncheckedCast(Map::class.java) } -} +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 6252c9dcdd..34c9b77582 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -131,7 +131,7 @@ class SingleThreadedStateMachineManager( */ override val changes: Observable = mutex.content.changesPublisher - override fun start(tokenizableServices: List) { + override fun start(tokenizableServices: List) : CordaFuture { checkQuasarJavaAgentPresence() val checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext( CheckpointSerializeAsTokenContextImpl( @@ -153,7 +153,7 @@ class SingleThreadedStateMachineManager( (fiber as FlowStateMachineImpl<*>).logger.warn("Caught exception from flow", throwable) } } - serviceHub.networkMapCache.nodeReady.then { + return serviceHub.networkMapCache.nodeReady.map { logger.info("Node ready, info: ${serviceHub.myInfo}") resumeRestoredFlows(fibers) flowMessaging.start { _, deduplicationHandler -> diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index f7c10d9ecd..18ef3ce546 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -10,6 +10,7 @@ import net.corda.core.utilities.Try import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.ReceivedMessage import rx.Observable +import java.util.concurrent.Future /** * A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachine] objects. @@ -31,8 +32,10 @@ import rx.Observable interface StateMachineManager { /** * Starts the state machine manager, loading and starting the state machines in storage. + * + * @return `Future` which completes when SMM is fully started */ - fun start(tokenizableServices: List) + fun start(tokenizableServices: List) : CordaFuture /** * Stops the state machine manager gracefully, waiting until all but [allowedUnsuspendedFiberCount] flows reach the @@ -127,7 +130,7 @@ interface ExternalEvent { val context: InvocationContext /** - * A callback for the state machine to pass back the [Future] associated with the flow start to the submitter. + * A callback for the state machine to pass back the [CordaFuture] associated with the flow start to the submitter. */ fun wireUpFuture(flowFuture: CordaFuture>) diff --git a/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt index b37533e404..1238021e9c 100644 --- a/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt +++ b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt @@ -1,6 +1,7 @@ package net.corda.node.internal import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.mock import com.nhaarman.mockito_kotlin.whenever import net.corda.core.identity.CordaX500Name import net.corda.core.internal.delete @@ -189,8 +190,8 @@ class NodeTest { rpcSettings = NodeRpcSettings(address = fakeAddress, adminAddress = null, ssl = null), messagingServerAddress = null, notary = null, - flowOverrides = FlowOverrideConfig(listOf()) - + flowOverrides = FlowOverrideConfig(listOf()), + configurationWithOptions = mock() ) } } diff --git a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt index 862c6d6aba..2604a74f9b 100644 --- a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt @@ -1,5 +1,6 @@ package net.corda.node.services.config +import com.nhaarman.mockito_kotlin.mock import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions @@ -313,7 +314,8 @@ class NodeConfigurationImplTest { rpcSettings = rpcSettings, crlCheckSoftFail = true, tlsCertCrlDistPoint = null, - flowOverrides = FlowOverrideConfig(listOf()) + flowOverrides = FlowOverrideConfig(listOf()), + configurationWithOptions = mock() ) } } diff --git a/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperTest.kt b/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt similarity index 83% rename from node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperTest.kt rename to node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt index 37fb63e857..497f2d0203 100644 --- a/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt @@ -16,9 +16,12 @@ import net.corda.core.internal.div import net.corda.core.internal.inputStream import net.corda.core.internal.readFully import net.corda.core.node.ServiceHub +import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.internal.CheckpointSerializationDefaults import net.corda.core.serialization.internal.checkpointSerialize +import net.corda.nodeapi.internal.lifecycle.NodeServicesContext +import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent import net.corda.node.internal.NodeStartup import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.statemachine.Checkpoint @@ -39,7 +42,7 @@ import java.time.Clock import java.time.Instant import java.util.zip.ZipInputStream -class CheckpointDumperTest { +class CheckpointDumperImplTest { @Rule @JvmField @@ -50,12 +53,20 @@ class CheckpointDumperTest { private val currentTimestamp = Instant.parse("2019-12-25T10:15:30.00Z") private val baseDirectory = Files.createTempDirectory("CheckpointDumperTest") private val file = baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / - "checkpoints_dump-${CheckpointDumper.TIME_FORMATTER.format(currentTimestamp)}.zip" + "checkpoints_dump-${CheckpointDumperImpl.TIME_FORMATTER.format(currentTimestamp)}.zip" private lateinit var database: CordaPersistence private lateinit var services: ServiceHub private lateinit var checkpointStorage: DBCheckpointStorage + private val mockAfterStartEvent = { + val nodeServicesContextMock = mock() + whenever(nodeServicesContextMock.tokenizableServices).doReturn(emptyList()) + val eventMock = mock>() + whenever(eventMock.nodeServicesContext).doReturn(nodeServicesContextMock) + eventMock + }() + @Before fun setUp() { val (db, mockServices) = MockServices.makeTestDatabaseAndPersistentServices( @@ -87,8 +98,8 @@ class CheckpointDumperTest { @Test fun testDumpCheckpoints() { - val dumper = CheckpointDumper(checkpointStorage, database, services, baseDirectory) - dumper.start(emptyList()) + val dumper = CheckpointDumperImpl(checkpointStorage, database, services, baseDirectory) + dumper.update(mockAfterStartEvent) // add a checkpoint val (id, checkpoint) = newCheckpoint() @@ -96,7 +107,7 @@ class CheckpointDumperTest { checkpointStorage.addCheckpoint(id, checkpoint) } - dumper.dump() + dumper.dumpCheckpoints() checkDumpFile() } @@ -113,8 +124,8 @@ class CheckpointDumperTest { // -javaagent:tools/checkpoint-agent/build/libs/checkpoint-agent.jar @Test fun testDumpCheckpointsAndAgentDiagnostics() { - val dumper = CheckpointDumper(checkpointStorage, database, services, Paths.get(".")) - dumper.start(emptyList()) + val dumper = CheckpointDumperImpl(checkpointStorage, database, services, Paths.get(".")) + dumper.update(mockAfterStartEvent) // add a checkpoint val (id, checkpoint) = newCheckpoint() @@ -122,7 +133,7 @@ class CheckpointDumperTest { checkpointStorage.addCheckpoint(id, checkpoint) } - dumper.dump() + dumper.dumpCheckpoints() // check existence of output zip file: checkpoints_dump-.zip // check existence of output agent log: checkpoints_agent-.log } diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt index c4f8a3b7ff..c9cfd400a1 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt @@ -16,6 +16,7 @@ import net.corda.core.node.services.queryBy import net.corda.core.node.services.vault.QueryCriteria.SoftLockingCondition import net.corda.core.node.services.vault.QueryCriteria.SoftLockingType.LOCKED_ONLY import net.corda.core.node.services.vault.QueryCriteria.VaultQueryCriteria +import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.LedgerTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.NonEmptySet @@ -90,7 +91,7 @@ class VaultSoftLockManagerTest { cordappLoader: CordappLoader): VaultServiceInternal { val node = this val realVault = super.makeVaultService(keyManagementService, services, database, cordappLoader) - return object : VaultServiceInternal by realVault { + return object : SingletonSerializeAsToken(), VaultServiceInternal by realVault { override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet?) { // Should be called before flow is removed assertEquals(1, node.started!!.smm.allStateMachines.size) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index 8a54e0ea58..b46bfaeaae 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -499,6 +499,10 @@ fun createMockCordaService(serviceHub: MockServices, serv override val database: CordaTransactionSupport get() = throw UnsupportedOperationException() + + override fun register(priority: Int, observer: ServiceLifecycleObserver) { + throw UnsupportedOperationException() + } } return MockAppServiceHubImpl(serviceHub, serviceConstructor).serviceInstance } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index 30d9e24902..135daafc60 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -2,6 +2,7 @@ package net.corda.testing.node.internal import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.whenever +import net.corda.common.configuration.parsing.internal.ConfigurationWithOptions import net.corda.core.DoNotImplement import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash @@ -616,6 +617,7 @@ private fun mockNodeConfiguration(certificatesDirectory: Path): NodeConfiguratio doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec doReturn(null).whenever(it).devModeOptions doReturn(NetworkParameterAcceptanceSettings()).whenever(it).networkParameterAcceptanceSettings + doReturn(rigorousMock()).whenever(it).configurationWithOptions } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt index bebe27afbd..913d26a2d6 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt @@ -5,6 +5,8 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.identity.PartyAndCertificate import net.corda.core.internal.PLATFORM_VERSION import net.corda.core.internal.ThreadBox +import net.corda.core.internal.concurrent.OpenFuture +import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.MessageRecipients import net.corda.core.node.services.PartyInfo import net.corda.core.serialization.SingletonSerializeAsToken @@ -36,6 +38,8 @@ class MockNodeMessagingService(private val configuration: NodeConfiguration, @Volatile private var running = true + override val ready: OpenFuture = openFuture() + private inner class InnerState { val handlers: MutableList = ArrayList() val pendingRedelivery = LinkedHashSet() @@ -85,6 +89,7 @@ class MockNodeMessagingService(private val configuration: NodeConfiguration, } network.addNotaryIdentity(this, notaryService) + ready.set(null) } override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients {