Merge pull request #1950 from corda/christians_checkpoint_checker_thread

checkpoint checker thread
This commit is contained in:
Christian Sailer 2017-10-27 16:08:10 +01:00 committed by GitHub
commit 8359fe9514
6 changed files with 126 additions and 7 deletions

View File

@ -102,7 +102,7 @@ class NodePerformanceTests {
@Test @Test
fun `self pay rate`() { fun `self pay rate`() {
driver(startNodesInProcess = true) { driver(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance")) {
val a = startNotaryNode( val a = startNotaryNode(
DUMMY_NOTARY.name, DUMMY_NOTARY.name,
rpcUsers = listOf(User("A", "A", setOf(startFlowPermission<CashIssueFlow>(), startFlowPermission<CashPaymentFlow>()))) rpcUsers = listOf(User("A", "A", setOf(startFlowPermission<CashIssueFlow>(), startFlowPermission<CashPaymentFlow>())))

View File

@ -13,6 +13,8 @@ import java.net.URL
import java.nio.file.Path import java.nio.file.Path
import java.util.* import java.util.*
data class DevModeOptions(val disableCheckpointChecker: Boolean?)
interface NodeConfiguration : NodeSSLConfiguration { interface NodeConfiguration : NodeSSLConfiguration {
// myLegalName should be only used in the initial network registration, we should use the name from the certificate instead of this. // myLegalName should be only used in the initial network registration, we should use the name from the certificate instead of this.
// TODO: Remove this so we don't accidentally use this identity in the code? // TODO: Remove this so we don't accidentally use this identity in the code?
@ -30,6 +32,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
val database: Properties? val database: Properties?
val rpcUsers: List<User> val rpcUsers: List<User>
val devMode: Boolean val devMode: Boolean
val devModeOptions: DevModeOptions?
val certificateSigningService: URL val certificateSigningService: URL
val certificateChainCheckPolicies: List<CertChainPolicyConfig> val certificateChainCheckPolicies: List<CertChainPolicyConfig>
val verifierType: VerifierType val verifierType: VerifierType
@ -108,6 +111,7 @@ data class NodeConfigurationImpl(
override val notary: NotaryConfig?, override val notary: NotaryConfig?,
override val certificateChainCheckPolicies: List<CertChainPolicyConfig>, override val certificateChainCheckPolicies: List<CertChainPolicyConfig>,
override val devMode: Boolean = false, override val devMode: Boolean = false,
override val devModeOptions: DevModeOptions? = null,
override val useTestClock: Boolean = false, override val useTestClock: Boolean = false,
override val detectPublicIp: Boolean = true, override val detectPublicIp: Boolean = true,
override val activeMQServer: ActiveMqServerConfiguration, override val activeMQServer: ActiveMqServerConfiguration,
@ -118,6 +122,7 @@ data class NodeConfigurationImpl(
init { init {
// This is a sanity feature do not remove. // This is a sanity feature do not remove.
require(!useTestClock || devMode) { "Cannot use test clock outside of dev mode" } require(!useTestClock || devMode) { "Cannot use test clock outside of dev mode" }
require(devModeOptions == null || devMode) { "Cannot use devModeOptions outside of dev mode" }
require(myLegalName.commonName == null) { "Common name must be null: $myLegalName" } require(myLegalName.commonName == null) { "Common name must be null: $myLegalName" }
require(minimumPlatformVersion >= 1) { "minimumPlatformVersion cannot be less than 1" } require(minimumPlatformVersion >= 1) { "minimumPlatformVersion cannot be less than 1" }
} }

View File

@ -31,12 +31,10 @@ import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.api.Checkpoint import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.ReceivedMessage import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.messaging.TopicSession import net.corda.node.services.messaging.TopicSession
import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.*
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.wrapWithDatabaseTransaction
import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl
import net.corda.nodeapi.internal.serialization.withTokenContext import net.corda.nodeapi.internal.serialization.withTokenContext
import org.apache.activemq.artemis.utils.ReusableLatch import org.apache.activemq.artemis.utils.ReusableLatch
@ -46,7 +44,6 @@ import rx.subjects.PublishSubject
import java.io.NotSerializableException import java.io.NotSerializableException
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.TimeUnit.SECONDS
import javax.annotation.concurrent.ThreadSafe import javax.annotation.concurrent.ThreadSafe
@ -92,7 +89,9 @@ class StateMachineManagerImpl(
private val scheduler = FiberScheduler() private val scheduler = FiberScheduler()
private val mutex = ThreadBox(InnerState()) private val mutex = ThreadBox(InnerState())
// This thread (only enabled in dev mode) deserialises checkpoints in the background to shake out bugs in checkpoint restore. // This thread (only enabled in dev mode) deserialises checkpoints in the background to shake out bugs in checkpoint restore.
private val checkpointCheckerThread = if (serviceHub.configuration.devMode) Executors.newSingleThreadExecutor() else null private val checkpointCheckerThread = if (serviceHub.configuration.devMode
&& serviceHub.configuration.devModeOptions?.disableCheckpointChecker != true)
newNamedSingleThreadExecutor("CheckpointChecker") else null
@Volatile private var unrestorableCheckpoints = false @Volatile private var unrestorableCheckpoints = false

View File

@ -0,0 +1,29 @@
package net.corda.node.utilities
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger
/**
* Utility class that allows to give threads arbitrary name prefixes when they are created
* via an executor. It will use an underlying thread factory to create the actual thread
* and then override the thread name with the prefix and an ever increasing number
*/
class NamedThreadFactory(private val name:String, private val underlyingFactory: ThreadFactory) : ThreadFactory{
val threadNumber = AtomicInteger(1)
override fun newThread(runnable: Runnable?): Thread {
val thread = underlyingFactory.newThread(runnable)
thread.name = name + "-" + threadNumber.getAndIncrement()
return thread
}
}
/**
* Create a single thread executor with a NamedThreadFactory based on the default thread factory
* defined in java.util.concurrent.Executors
*/
fun newNamedSingleThreadExecutor(name: String): ExecutorService {
return Executors.newSingleThreadExecutor(NamedThreadFactory(name, Executors.defaultThreadFactory()))
}

View File

@ -0,0 +1,85 @@
package net.corda.node.services.config
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.nodeapi.User
import net.corda.testing.ALICE
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseProperties
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import java.net.URL
import java.nio.file.Paths
import java.util.*
import kotlin.test.assertFalse
class NodeConfigurationImplTest {
@Test
fun `Can't have dev mode options if not in dev mode`() {
val testConfiguration = NodeConfigurationImpl(
baseDirectory = Paths.get("."),
myLegalName = ALICE.name,
networkMapService = null,
emailAddress = "",
keyStorePassword = "cordacadevpass",
trustStorePassword = "trustpass",
dataSourceProperties = makeTestDataSourceProperties(ALICE.name.organisation),
database = makeTestDatabaseProperties(),
certificateSigningService = URL("http://localhost"),
rpcUsers = emptyList(),
verifierType = VerifierType.InMemory,
useHTTPS = false,
p2pAddress = NetworkHostAndPort("localhost", 0),
rpcAddress = NetworkHostAndPort("localhost", 1),
messagingServerAddress = null,
notary = null,
certificateChainCheckPolicies = emptyList(),
devMode = true,
activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)),
additionalNodeInfoPollingFrequencyMsec = 5.seconds.toMillis())
fun configDebugOptions(devMode: Boolean, debugOptions: DevModeOptions?) {
testConfiguration.copy(devMode = devMode, devModeOptions = debugOptions)
}
val debugOptions = DevModeOptions(null)
configDebugOptions(true, debugOptions)
configDebugOptions(true,null)
assertThatThrownBy{configDebugOptions(false, debugOptions)}.hasMessageMatching( "Cannot use devModeOptions outside of dev mode" )
configDebugOptions(false,null)
}
@Test
fun `check devModeOptions flag helper`()
{
val testConfiguration = NodeConfigurationImpl(
baseDirectory = Paths.get("."),
myLegalName = ALICE.name,
networkMapService = null,
emailAddress = "",
keyStorePassword = "cordacadevpass",
trustStorePassword = "trustpass",
dataSourceProperties = makeTestDataSourceProperties(ALICE.name.organisation),
database = makeTestDatabaseProperties(),
certificateSigningService = URL("http://localhost"),
rpcUsers = emptyList(),
verifierType = VerifierType.InMemory,
useHTTPS = false,
p2pAddress = NetworkHostAndPort("localhost", 0),
rpcAddress = NetworkHostAndPort("localhost", 1),
messagingServerAddress = null,
notary = null,
certificateChainCheckPolicies = emptyList(),
devMode = true,
activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)),
additionalNodeInfoPollingFrequencyMsec = 5.seconds.toMillis())
fun configDebugOptions(devMode: Boolean, devModeOptions: DevModeOptions?) : NodeConfiguration {
return testConfiguration.copy(devMode = devMode, devModeOptions = devModeOptions)
}
assertFalse { configDebugOptions(true,null).devModeOptions?.disableCheckpointChecker == true}
assertFalse { configDebugOptions(true,DevModeOptions(null)).devModeOptions?.disableCheckpointChecker == true}
assertFalse { configDebugOptions(true,DevModeOptions(false)).devModeOptions?.disableCheckpointChecker == true}
assert ( configDebugOptions(true,DevModeOptions(true)).devModeOptions?.disableCheckpointChecker == true)
}
}

View File

@ -78,6 +78,7 @@ fun testNodeConfiguration(
doReturn(5).whenever(it).messageRedeliveryDelaySeconds doReturn(5).whenever(it).messageRedeliveryDelaySeconds
doReturn(0L).whenever(it).additionalNodeInfoPollingFrequencyMsec doReturn(0L).whenever(it).additionalNodeInfoPollingFrequencyMsec
doReturn(null).whenever(it).networkMapService doReturn(null).whenever(it).networkMapService
doReturn(null).whenever(it).devModeOptions
doCallRealMethod().whenever(it).certificatesDirectory doCallRealMethod().whenever(it).certificatesDirectory
doCallRealMethod().whenever(it).trustStoreFile doCallRealMethod().whenever(it).trustStoreFile
doCallRealMethod().whenever(it).sslKeystore doCallRealMethod().whenever(it).sslKeystore