diff --git a/detekt-baseline.xml b/detekt-baseline.xml
index 1c384a0d9e..e777ed7ac8 100644
--- a/detekt-baseline.xml
+++ b/detekt-baseline.xml
@@ -231,6 +231,7 @@
     ForEachOnRange:BFTSmartConfigTests.kt$BFTSmartConfigTests$forEach { n -> assertEquals(1, maxFaultyReplicas(n)) }
     ForEachOnRange:BFTSmartConfigTests.kt$BFTSmartConfigTests$forEach { n -> assertEquals(2, maxFaultyReplicas(n)) }
     ForEachOnRange:BFTSmartConfigTests.kt$BFTSmartConfigTests$forEach { n -> assertEquals(n, maxFaultyReplicas(n) + minCorrectReplicas(n)) }
+    ForEachOnRange:CordaServiceLifecycleFatalTests.kt$CordaServiceLifecycleFatalTests$forEach { logger.info("Rep #$it") // Scenario terminates JVM - node should be running out of process driver(DriverParameters(startNodesInProcess = false, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList(), systemProperties = mapOf(SECRET_PROPERTY_NAME to "true", tempFilePropertyName to tmpFile.absolutePath))) { val nodeHandle = startNode(providedName = ALICE_NAME).getOrThrow() val rpcInterface = nodeHandle.rpc eventually(duration = 60.seconds) { assertEquals(readyToThrowMarker, tmpFile.readLines().last()) } rpcInterface.protocolVersion tmpFile.appendText("\n" + goodToThrowMarker) // We have signalled that it is good to throw, which will eventually trigger node shutdown and stop the RPC interface working. eventually(duration = 30.seconds) { assertFailsWith(Exception::class) { try { rpcInterface.protocolVersion } catch (ex: Exception) { logger.info("Thrown as expected", ex) throw ex } } } } }
     ForEachOnRange:HibernateConfigurationTest.kt$HibernateConfigurationTest$forEach { consumeCash(it.DOLLARS) }
     ForEachOnRange:VaultQueryTests.kt$VaultQueryTestsBase$forEach { val newAllStates = vaultService.queryBy<DummyLinearContract.State>(sorting = sorting, criteria = criteria).states assertThat(newAllStates.groupBy(StateAndRef<*>::ref)).hasSameSizeAs(allStates) assertThat(newAllStates).containsExactlyElementsOf(allStates) }
     ForEachOnRange:VaultQueryTests.kt$VaultQueryTestsBase$forEach { vaultFiller.fillWithSomeTestLinearStates(1, linearNumber = it.toLong(), linearString = it.toString()) }
@@ -2657,7 +2658,7 @@
     MaxLineLength:NodeKeystoreCheckTest.kt$NodeKeystoreCheckTest$setPrivateKey(X509Utilities.CORDA_CLIENT_CA, nodeCA.keyPair.private, listOf(badNodeCACert, badRoot), signingCertStore.entryPassword)
     MaxLineLength:NodeKeystoreCheckTest.kt$NodeKeystoreCheckTest$val badNodeCACert = X509Utilities.createCertificate(CertificateType.NODE_CA, badRoot, badRootKeyPair, ALICE_NAME.x500Principal, nodeCA.keyPair.public)
     MaxLineLength:NodeKeystoreCheckTest.kt$NodeKeystoreCheckTest$val p2pSslConfig = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory, keyStorePassword = keystorePassword, trustStorePassword = keystorePassword)
-    MaxLineLength:NodeLifecycleEventsDistributor.kt$NodeLifecycleEventsDistributor
+    MaxLineLength:NodeLifecycleEventsDistributor.kt$NodeLifecycleEventsDistributor : Closeable
     MaxLineLength:NodeMonitorModel.kt$NodeMonitorModel${ rpc = CordaRPCClient(nodeHostAndPort).start(username, password, GracefulReconnect()) proxyObservable.value = rpc.proxy // Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates val ( statesSnapshot, vaultUpdates ) = rpc.proxy.vaultTrackBy<ContractState>( QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL), PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE) ) val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ -> statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED }.toSet() val consumedStates = statesSnapshot.states.toSet() - unconsumedStates val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates, references = emptySet()) vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject::onNext) // Transactions val (transactions, newTransactions) = @Suppress("DEPRECATION") rpc.proxy.internalVerifiedTransactionsFeed() newTransactions.startWith(transactions).subscribe(transactionsSubject::onNext) // SM -> TX mapping val (smTxMappings, futureSmTxMappings) = rpc.proxy.stateMachineRecordedTransactionMappingFeed() futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject::onNext) // Parties on network val (parties, futurePartyUpdate) = rpc.proxy.networkMapFeed() futurePartyUpdate.startWith(parties.map(MapChange::Added)).subscribe(networkMapSubject::onNext) val stateMachines = rpc.proxy.stateMachinesSnapshot() notaryIdentities = rpc.proxy.notaryIdentities() // Extract the flow tracking stream // TODO is there a nicer way of doing this? Stream of streams in general results in code like this... // TODO `progressTrackingSubject` doesn't seem to be used anymore - should it be removed? val currentProgressTrackerUpdates = stateMachines.mapNotNull { stateMachine -> ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachine) } val futureProgressTrackerUpdates = stateMachineUpdatesSubject.map { stateMachineUpdate -> if (stateMachineUpdate is StateMachineUpdate.Added) { ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo) ?: Observable.empty<ProgressTrackingEvent>() } else { Observable.empty<ProgressTrackingEvent>() } } // We need to retry, because when flow errors, we unsubscribe from progressTrackingSubject. So we end up with stream of state machine updates and no progress trackers. futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.retry().subscribe(progressTrackingSubject) }
     MaxLineLength:NodeNamedCache.kt$DefaultNamedCacheFactory$name.startsWith("RPCSecurityManagerShiroCache_") -> with(security?.authService?.options?.cache!!) { caffeine.maximumSize(maxEntries).expireAfterWrite(expireAfterSecs, TimeUnit.SECONDS) }
     MaxLineLength:NodeNamedCache.kt$DefaultNamedCacheFactory$open
@@ -3961,6 +3962,7 @@
     TooGenericExceptionCaught:CordaPersistence.kt$CordaPersistence$e: Exception
     TooGenericExceptionCaught:CordaRPCClientTest.kt$CordaRPCClientTest$e: Exception
     TooGenericExceptionCaught:CordaRPCOpsImpl.kt$CordaRPCOpsImpl$e: Exception
+    TooGenericExceptionCaught:CordaServiceLifecycleFatalTests.kt$CordaServiceLifecycleFatalTests$ex: Exception
     TooGenericExceptionCaught:CryptoUtilsTest.kt$CryptoUtilsTest$e: Exception
     TooGenericExceptionCaught:DBNetworkParametersStorage.kt$DBNetworkParametersStorage$e: Exception
     TooGenericExceptionCaught:DataUploadServlet.kt$DataUploadServlet$e: RuntimeException
diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributor.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributor.kt
index ba88cf4873..4672ff85ac 100644
--- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributor.kt
+++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributor.kt
@@ -7,6 +7,7 @@ import net.corda.core.internal.concurrent.openFuture
 import net.corda.core.node.services.CordaServiceCriticalFailureException
 import net.corda.core.utilities.Try
 import net.corda.core.utilities.contextLogger
+import java.io.Closeable
 import java.util.Collections.singleton
 import java.util.LinkedList
 import java.util.concurrent.Executors
@@ -23,7 +24,7 @@ import kotlin.system.exitProcess
  *
  * The class is safe for concurrent use from multiple threads.
  */
-class NodeLifecycleEventsDistributor {
+class NodeLifecycleEventsDistributor : Closeable {
 
     companion object {
         private val log = contextLogger()
@@ -43,7 +44,7 @@ class NodeLifecycleEventsDistributor {
     private val readWriteLock: ReadWriteLock = ReentrantReadWriteLock()
 
     private val executor = Executors.newSingleThreadExecutor(
-            ThreadFactoryBuilder().setNameFormat("NodeLifecycleEventsDistributor-%d").build())
+            ThreadFactoryBuilder().setNameFormat("NodeLifecycleEventsDistributor-%d").setDaemon(true).build())
 
     /**
      * Adds observer to the distribution list.
@@ -88,19 +89,24 @@
 
         val result = openFuture()
 
-        executor.execute {
-            val orderedSnapshot = if (event.reversedPriority) snapshot.reversed() else snapshot
-            orderedSnapshot.forEach {
-                log.debug("Distributing event $event to: $it")
-                val updateResult = it.update(event)
-                if (updateResult.isSuccess) {
-                    log.debug("Event $event distribution outcome: $updateResult")
-                } else {
-                    log.error("Failed to distribute event $event, failure outcome: $updateResult")
-                    handlePossibleFatalTermination(event, updateResult as Try.Failure)
-                }
-            }
+        if (executor.isShutdown || executor.isTerminated) {
+            log.warn("Not distributing $event as the executor has already been shut down. Double close() case?")
             result.set(null)
+        } else {
+            executor.execute {
+                val orderedSnapshot = if (event.reversedPriority) snapshot.reversed() else snapshot
+                orderedSnapshot.forEach {
+                    log.debug("Distributing event $event to: $it")
+                    val updateResult = it.update(event)
+                    if (updateResult.isSuccess) {
+                        log.debug("Event $event distribution outcome: $updateResult")
+                    } else {
+                        log.error("Failed to distribute event $event, failure outcome: $updateResult")
+                        handlePossibleFatalTermination(event, updateResult as Try.Failure)
+                    }
+                }
+                result.set(null)
+            }
         }
         return result.map { }
     }
@@ -114,6 +120,10 @@ class NodeLifecycleEventsDistributor {
         }
     }
 
+    override fun close() {
+        executor.shutdown()
+    }
+
     /**
      * Custom implementation vs. using [kotlin.concurrent.withLock] to allow interruption during lock acquisition.
      */
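The guard added to distributeEvent() above exists because an ExecutorService rejects new work once it has been shut down: without the isShutdown/isTerminated check, a distributeEvent() call arriving after close() would throw RejectedExecutionException instead of quietly completing. Below is a minimal self-contained sketch of the same pattern; GuardedDistributor and distribute() are illustrative names, not Corda APIs.

import java.io.Closeable
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors

class GuardedDistributor : Closeable {
    // Single worker thread, marked daemon as in the PR so a missed close() never blocks JVM exit
    private val executor = Executors.newSingleThreadExecutor { runnable ->
        Thread(runnable, "guarded-distributor").apply { isDaemon = true }
    }

    fun distribute(task: () -> Unit): CompletableFuture<Unit> {
        val result = CompletableFuture<Unit>()
        if (executor.isShutdown || executor.isTerminated) {
            // Double close() case: complete immediately rather than letting
            // execute() throw RejectedExecutionException
            result.complete(Unit)
        } else {
            executor.execute {
                task()
                result.complete(Unit)
            }
        }
        return result
    }

    override fun close() {
        executor.shutdown()
    }
}

Note that the check-then-act is not atomic, but since close() is only invoked once at shutdown the residual race window is negligible, and the daemon thread factory additionally ensures a forgotten close() cannot keep the JVM alive.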
Double close() case?") result.set(null) + } else { + executor.execute { + val orderedSnapshot = if (event.reversedPriority) snapshot.reversed() else snapshot + orderedSnapshot.forEach { + log.debug("Distributing event $event to: $it") + val updateResult = it.update(event) + if (updateResult.isSuccess) { + log.debug("Event $event distribution outcome: $updateResult") + } else { + log.error("Failed to distribute event $event, failure outcome: $updateResult") + handlePossibleFatalTermination(event, updateResult as Try.Failure) + } + } + result.set(null) + } } return result.map { } } @@ -114,6 +120,10 @@ class NodeLifecycleEventsDistributor { } } + override fun close() { + executor.shutdown() + } + /** * Custom implementation vs. using [kotlin.concurrent.withLock] to allow interruption during lock acquisition. */ diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributorMultiThreadedTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributorMultiThreadedTest.kt index d143ba89a4..a5da9f740b 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributorMultiThreadedTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleEventsDistributorMultiThreadedTest.kt @@ -15,8 +15,6 @@ internal class NodeLifecycleEventsDistributorMultiThreadedTest { private val logger = contextLogger() } - private val instance = NodeLifecycleEventsDistributor() - private val addedCounter = AtomicLong() private val eventsDeliveredCounter = AtomicLong() @@ -24,29 +22,32 @@ internal class NodeLifecycleEventsDistributorMultiThreadedTest { @Test fun addAndDistributeConcurrently() { - val initialObserversCount = 10 - repeat(initialObserversCount) { instance.add(MyObserver(it)) } + NodeLifecycleEventsDistributor().use { instance -> - val operationsCount = 100_000 - val event = NodeLifecycleEvent.BeforeNodeStart(mock()) - val additionFreq = 1000 - val distributionFutures = (1..operationsCount).stream(true).mapToObj { - if(it % additionFreq == 0) { - logger.debug("Adding observer") - instance.add(MyObserver(it)) - addedCounter.incrementAndGet() - logger.info("Progress so far: $it") + val initialObserversCount = 10 + repeat(initialObserversCount) { instance.add(MyObserver(it)) } + + val operationsCount = 100_000 + val event = NodeLifecycleEvent.BeforeNodeStart(mock()) + val additionFreq = 1000 + val distributionFutures = (1..operationsCount).stream(true).mapToObj { + if (it % additionFreq == 0) { + logger.debug("Adding observer") + instance.add(MyObserver(it)) + addedCounter.incrementAndGet() + logger.info("Progress so far: $it") + } + logger.debug("Distributing event") + instance.distributeEvent(event) } - logger.debug("Distributing event") - instance.distributeEvent(event) - } - distributionFutures.forEach { it.get() } + distributionFutures.forEach { it.get() } - with(eventsDeliveredCounter.get()) { - // Greater than original observers times events - assertTrue("$this") { this > initialObserversCount.toLong() * operationsCount } - // Less than ever added observers times events - assertTrue("$this") { this < (initialObserversCount.toLong() + addedCounter.get()) * operationsCount } + with(eventsDeliveredCounter.get()) { + // Greater than original observers times events + assertTrue("$this") { this > initialObserversCount.toLong() * operationsCount } + // Less than ever added observers times events + assertTrue("$this") { this < (initialObserversCount.toLong() + 
diff --git a/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleFatalTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleFatalTests.kt
index a5c1aba49b..1a14828bac 100644
--- a/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleFatalTests.kt
+++ b/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleFatalTests.kt
@@ -6,59 +6,93 @@
 import net.corda.core.node.services.CordaServiceCriticalFailureException
 import net.corda.core.node.services.ServiceLifecycleEvent
 import net.corda.core.node.services.ServiceLifecycleObserver
 import net.corda.core.serialization.SingletonSerializeAsToken
+import net.corda.core.utilities.contextLogger
 import net.corda.core.utilities.getOrThrow
+import net.corda.core.utilities.seconds
+import net.corda.testing.common.internal.eventually
 import net.corda.testing.core.ALICE_NAME
 import net.corda.testing.driver.DriverParameters
 import net.corda.testing.driver.driver
-import net.corda.testing.node.internal.ListenProcessDeathException
 import net.corda.testing.node.internal.enclosedCordapp
 import org.junit.Test
+import java.io.File
+import kotlin.test.assertEquals
 import kotlin.test.assertFailsWith
 
 class CordaServiceLifecycleFatalTests {
 
     companion object {
+        private val logger = contextLogger()
+
         // It is important to disarm throwing of the exception as unfortunately this service may be packaged into many
         // test CorDapps, e.g. the one used by [net.corda.node.CordappScanningDriverTest]
         // If the service remains "armed" to throw exceptions, it will fail the node start-up sequence.
        // The problem is caused by the fact that tests from the `net.corda.node` package also hoover up all the sub-packages.
        // Since this is done as a separate process, the trigger is passed through a system property.
-        const val SECRET_PROPERTY_NAME = "CordaServiceLifecycleFatalTests.armed"
+        private val SECRET_PROPERTY_NAME = this::class.java.enclosingClass.name + "-armed"
+
+        // Temporary file used as a latch between the two processes
+        private val tempFilePropertyName = this::class.java.enclosingClass.name + "-tmpFile"
+        private val tmpFile = createTempFile(prefix = tempFilePropertyName)
+        private const val readyToThrowMarker = "ReadyToThrow"
+        private const val goodToThrowMarker = "GoodToThrow"
 
         @CordaService
         @Suppress("unused")
         class FatalService(services: AppServiceHub) : SingletonSerializeAsToken() {
             init {
-                services.register(observer = FailingObserver)
-            }
-
-            fun computeLength(text: String): Int {
-                require(text.isNotEmpty()) { "Length must be at least 1." }
-                return text.length
+                if (java.lang.Boolean.getBoolean(SECRET_PROPERTY_NAME)) {
+                    services.register(observer = FailingObserver)
+                }
             }
         }
 
         object FailingObserver : ServiceLifecycleObserver {
             override fun onServiceLifecycleEvent(event: ServiceLifecycleEvent) {
-                if(java.lang.Boolean.getBoolean(SECRET_PROPERTY_NAME)) {
-                    throw CordaServiceCriticalFailureException("failure")
+                val tmpFile = File(System.getProperty(tempFilePropertyName))
+                tmpFile.appendText("\n" + readyToThrowMarker)
+                eventually(duration = 30.seconds) {
+                    assertEquals(goodToThrowMarker, tmpFile.readLines().last())
                 }
+                throw CordaServiceCriticalFailureException("controlled failure")
             }
         }
     }
 
     @Test
    fun `JVM terminates on critical failure`() {
-        // Scenario terminates JVM - node should be running out of process
-        driver(DriverParameters(startNodesInProcess = false, cordappsForAllNodes = listOf(enclosedCordapp()),
-                notarySpecs = emptyList(), systemProperties = mapOf(Pair(SECRET_PROPERTY_NAME, "true")))) {
-            val nodeHandle = startNode(providedName = ALICE_NAME)
-            // Ensure ample time for all the stat-up lifecycle events to be processed
-            Thread.sleep(2000)
-            assertFailsWith(ListenProcessDeathException::class) {
-                nodeHandle.getOrThrow()
+        (1..20).forEach {
+
+            logger.info("Rep #$it")
+
+            // Scenario terminates JVM - node should be running out of process
+            driver(DriverParameters(startNodesInProcess = false, cordappsForAllNodes = listOf(enclosedCordapp()),
+                    notarySpecs = emptyList(),
+                    systemProperties = mapOf(SECRET_PROPERTY_NAME to "true", tempFilePropertyName to tmpFile.absolutePath))) {
+                val nodeHandle = startNode(providedName = ALICE_NAME).getOrThrow()
+
+                val rpcInterface = nodeHandle.rpc
+                eventually(duration = 60.seconds) {
+                    assertEquals(readyToThrowMarker, tmpFile.readLines().last())
+                }
+
+                rpcInterface.protocolVersion
+
+                tmpFile.appendText("\n" + goodToThrowMarker)
+
+                // We have signalled that it is good to throw, which will eventually trigger node shutdown and stop the RPC interface working.
+                eventually(duration = 30.seconds) {
+                    assertFailsWith(Exception::class) {
+                        try {
+                            rpcInterface.protocolVersion
+                        } catch (ex: Exception) {
+                            logger.info("Thrown as expected", ex)
+                            throw ex
+                        }
+                    }
+                }
+            }
         }
     }
 }
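The rewritten test replaces the old fixed Thread.sleep(2000) with a handshake: the node-side observer appends a ReadyToThrow marker to a temp file shared with the driver process, waits for the driver to append GoodToThrow, and only then throws the fatal exception. A stripped-down sketch of that file-based latch follows; nodeSide and driverSide are illustrative names, not part of the test.

import java.io.File

const val READY_TO_THROW = "ReadyToThrow"
const val GOOD_TO_THROW = "GoodToThrow"

// Last appended marker, or null while the file is still empty
fun lastMarker(latch: File): String? = latch.readLines().lastOrNull()

// Node process: announce readiness, then block until the driver grants permission to fail
fun nodeSide(latch: File) {
    latch.appendText("\n" + READY_TO_THROW)
    while (lastMarker(latch) != GOOD_TO_THROW) Thread.sleep(100)
    error("controlled failure") // stands in for CordaServiceCriticalFailureException
}

// Driver process: wait until the node is ready, then grant permission
fun driverSide(latch: File) {
    while (lastMarker(latch) != READY_TO_THROW) Thread.sleep(100)
    latch.appendText("\n" + GOOD_TO_THROW)
}

Because both sides only ever append "\n" + marker and poll the last line, the file acts as an ordered, append-only signal channel between the two JVMs.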
diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
index a51e1cb340..d8f25c1340 100644
--- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
+++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
@@ -954,7 +954,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
         runOnStop.clear()
         shutdownExecutor.shutdown()
         _started = null
-        nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.AfterNodeStop(nodeServicesContext))
+        nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.AfterNodeStop(nodeServicesContext)).then {
+            nodeLifecycleEventsDistributor.close()
+        }
     }
 
     protected abstract fun makeMessagingService(): MessagingService
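Ordering matters in this last change: the distributor must not be closed until the AfterNodeStop event has been fully delivered, which is why close() is chained onto the future returned by distributeEvent() rather than called inline. A sketch of the same chaining using plain CompletableFuture, where CordaFuture.then plays the analogous role in the actual change:

import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors

fun main() {
    val executor = Executors.newSingleThreadExecutor()
    // Deliver the final event on the distributor's own thread...
    val lastEvent = CompletableFuture.runAsync({ println("AfterNodeStop delivered") }, executor)
    // ...and only shut the executor down once that delivery has completed
    lastEvent.thenRun { executor.shutdown() }
    lastEvent.join()
}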