mirror of
https://github.com/corda/corda.git
synced 2025-06-17 06:38:21 +00:00
Retire verifierDriver serialization init. (#2026)
This commit is contained in:
@ -1,5 +1,6 @@
|
|||||||
package net.corda.core.internal
|
package net.corda.core.internal
|
||||||
|
|
||||||
|
import net.corda.core.utilities.loggerFor
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
import kotlin.reflect.KProperty
|
import kotlin.reflect.KProperty
|
||||||
|
|
||||||
@ -44,14 +45,31 @@ class ThreadLocalToggleField<T>(name: String) : ToggleField<T>(name) {
|
|||||||
class ThreadLeakException : RuntimeException("Leaked thread detected: ${Thread.currentThread().name}")
|
class ThreadLeakException : RuntimeException("Leaked thread detected: ${Thread.currentThread().name}")
|
||||||
|
|
||||||
class InheritableThreadLocalToggleField<T>(name: String) : ToggleField<T>(name) {
|
class InheritableThreadLocalToggleField<T>(name: String) : ToggleField<T>(name) {
|
||||||
|
companion object {
|
||||||
|
private val log = loggerFor<InheritableThreadLocalToggleField<*>>()
|
||||||
|
private fun ThreadLeakException.isProblematic(): Boolean {
|
||||||
|
stackTrace.forEach {
|
||||||
|
// A dying Netty thread's death event restarting the Netty global executor:
|
||||||
|
it.className == "io.netty.util.concurrent.GlobalEventExecutor" && it.methodName == "startThread" && return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private class Holder<T>(value: T) : AtomicReference<T?>(value) {
|
private class Holder<T>(value: T) : AtomicReference<T?>(value) {
|
||||||
fun valueOrDeclareLeak() = get() ?: throw ThreadLeakException()
|
fun valueOrDeclareLeak() = get() ?: throw ThreadLeakException()
|
||||||
|
fun maybeFailFastIfCurrentThreadIsLeaked() {
|
||||||
|
get() != null && return // Current thread isn't leaked.
|
||||||
|
val e = ThreadLeakException()
|
||||||
|
e.isProblematic() && throw e
|
||||||
|
log.warn(e.message) // The exception on value retrieval is still enabled.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private val threadLocal = object : InheritableThreadLocal<Holder<T>?>() {
|
private val threadLocal = object : InheritableThreadLocal<Holder<T>?>() {
|
||||||
override fun childValue(holder: Holder<T>?): Holder<T>? {
|
override fun childValue(holder: Holder<T>?): Holder<T>? {
|
||||||
// The Holder itself may be null due to prior events, a leak is not implied in that case:
|
// The Holder itself may be null due to prior events, a leak is not implied in that case:
|
||||||
holder?.valueOrDeclareLeak() // Fail fast.
|
holder?.maybeFailFastIfCurrentThreadIsLeaked()
|
||||||
return holder // What super does.
|
return holder // What super does.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -212,13 +212,14 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
|||||||
log.info("Connecting to message broker: $serverAddress")
|
log.info("Connecting to message broker: $serverAddress")
|
||||||
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
|
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
|
||||||
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config)
|
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config)
|
||||||
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
|
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
||||||
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
||||||
// would be the default and the two lines below can be deleted.
|
// would be the default and the two lines below can be deleted.
|
||||||
locator.connectionTTL = -1
|
connectionTTL = -1
|
||||||
locator.clientFailureCheckPeriod = -1
|
clientFailureCheckPeriod = -1
|
||||||
locator.minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE
|
minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE
|
||||||
locator.isUseGlobalPools = nodeSerializationEnv != null
|
isUseGlobalPools = nodeSerializationEnv != null
|
||||||
|
}
|
||||||
sessionFactory = locator.createSessionFactory()
|
sessionFactory = locator.createSessionFactory()
|
||||||
|
|
||||||
// Login using the node username. The broker will authentiate us as its node (as opposed to another peer)
|
// Login using the node username. The broker will authentiate us as its node (as opposed to another peer)
|
||||||
|
@ -8,6 +8,7 @@ import net.corda.core.identity.CordaX500Name
|
|||||||
import net.corda.core.internal.concurrent.*
|
import net.corda.core.internal.concurrent.*
|
||||||
import net.corda.core.internal.createDirectories
|
import net.corda.core.internal.createDirectories
|
||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
|
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||||
import net.corda.core.transactions.LedgerTransaction
|
import net.corda.core.transactions.LedgerTransaction
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.loggerFor
|
import net.corda.core.utilities.loggerFor
|
||||||
@ -96,7 +97,8 @@ fun <A> verifierDriver(
|
|||||||
)
|
)
|
||||||
),
|
),
|
||||||
coerce = { it },
|
coerce = { it },
|
||||||
dsl = dsl
|
dsl = dsl,
|
||||||
|
initialiseSerialization = false
|
||||||
)
|
)
|
||||||
|
|
||||||
/** A handle for a verifier */
|
/** A handle for a verifier */
|
||||||
@ -210,8 +212,9 @@ data class VerifierDriverDSL(
|
|||||||
driverDSL.shutdownManager.registerShutdown(doneFuture {
|
driverDSL.shutdownManager.registerShutdown(doneFuture {
|
||||||
server.stop()
|
server.stop()
|
||||||
})
|
})
|
||||||
|
val locator = ActiveMQClient.createServerLocatorWithoutHA().apply {
|
||||||
val locator = ActiveMQClient.createServerLocatorWithoutHA()
|
isUseGlobalPools = nodeSerializationEnv != null
|
||||||
|
}
|
||||||
val transport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfig)
|
val transport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfig)
|
||||||
val sessionFactory = locator.createSessionFactory(transport)
|
val sessionFactory = locator.createSessionFactory(transport)
|
||||||
val session = sessionFactory.createSession()
|
val session = sessionFactory.createSession()
|
||||||
|
@ -15,7 +15,9 @@ import net.corda.node.services.config.VerifierType
|
|||||||
import net.corda.testing.ALICE
|
import net.corda.testing.ALICE
|
||||||
import net.corda.testing.ALICE_NAME
|
import net.corda.testing.ALICE_NAME
|
||||||
import net.corda.testing.DUMMY_NOTARY
|
import net.corda.testing.DUMMY_NOTARY
|
||||||
|
import net.corda.testing.SerializationEnvironmentRule
|
||||||
import net.corda.testing.node.NotarySpec
|
import net.corda.testing.node.NotarySpec
|
||||||
|
import org.junit.Rule
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
@ -23,6 +25,10 @@ import kotlin.test.assertNotNull
|
|||||||
import kotlin.test.assertTrue
|
import kotlin.test.assertTrue
|
||||||
|
|
||||||
class VerifierTests {
|
class VerifierTests {
|
||||||
|
@Rule
|
||||||
|
@JvmField
|
||||||
|
val testSerialization = SerializationEnvironmentRule(true)
|
||||||
|
|
||||||
private fun generateTransactions(number: Int): List<LedgerTransaction> {
|
private fun generateTransactions(number: Int): List<LedgerTransaction> {
|
||||||
var currentLedger = GeneratedLedger.empty
|
var currentLedger = GeneratedLedger.empty
|
||||||
val transactions = arrayListOf<WireTransaction>()
|
val transactions = arrayListOf<WireTransaction>()
|
||||||
|
@ -53,6 +53,7 @@ class Verifier {
|
|||||||
require(args.isNotEmpty()) { "Usage: <binary> BASE_DIR_CONTAINING_VERIFIER_CONF" }
|
require(args.isNotEmpty()) { "Usage: <binary> BASE_DIR_CONTAINING_VERIFIER_CONF" }
|
||||||
val baseDirectory = Paths.get(args[0])
|
val baseDirectory = Paths.get(args[0])
|
||||||
val verifierConfig = loadConfiguration(baseDirectory, baseDirectory / "verifier.conf")
|
val verifierConfig = loadConfiguration(baseDirectory, baseDirectory / "verifier.conf")
|
||||||
|
initialiseSerialization()
|
||||||
val locator = ActiveMQClient.createServerLocatorWithHA(
|
val locator = ActiveMQClient.createServerLocatorWithHA(
|
||||||
tcpTransport(ConnectionDirection.Outbound(), verifierConfig.nodeHostAndPort, verifierConfig)
|
tcpTransport(ConnectionDirection.Outbound(), verifierConfig.nodeHostAndPort, verifierConfig)
|
||||||
)
|
)
|
||||||
@ -65,7 +66,6 @@ class Verifier {
|
|||||||
session.close()
|
session.close()
|
||||||
sessionFactory.close()
|
sessionFactory.close()
|
||||||
}
|
}
|
||||||
initialiseSerialization()
|
|
||||||
val consumer = session.createConsumer(VERIFICATION_REQUESTS_QUEUE_NAME)
|
val consumer = session.createConsumer(VERIFICATION_REQUESTS_QUEUE_NAME)
|
||||||
val replyProducer = session.createProducer()
|
val replyProducer = session.createProducer()
|
||||||
consumer.setMessageHandler {
|
consumer.setMessageHandler {
|
||||||
|
Reference in New Issue
Block a user