mirror of
https://github.com/corda/corda.git
synced 2025-03-10 22:44:20 +00:00
ENT-2431 Lay foundations for caching metrics (#3955)
This commit is contained in:
parent
7159dfcb61
commit
965f9ce528
@ -1,5 +1,6 @@
|
||||
package net.corda.client.jfx.model
|
||||
|
||||
import com.github.benmanes.caffeine.cache.CacheLoader
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import javafx.beans.value.ObservableValue
|
||||
import javafx.collections.FXCollections
|
||||
@ -31,7 +32,7 @@ class NetworkIdentityModel {
|
||||
private val rpcProxy by observableValue(NodeMonitorModel::proxyObservable)
|
||||
|
||||
private val identityCache = Caffeine.newBuilder()
|
||||
.buildNamed<PublicKey, ObservableValue<NodeInfo?>>("NetworkIdentityModel_identity", { publicKey ->
|
||||
.buildNamed<PublicKey, ObservableValue<NodeInfo?>>("NetworkIdentityModel_identity", CacheLoader { publicKey: PublicKey ->
|
||||
publicKey.let { rpcProxy.map { it?.cordaRPCOps?.nodeInfoFromParty(AnonymousParty(publicKey)) } }
|
||||
})
|
||||
val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.cordaRPCOps?.notaryIdentities() ?: emptyList()) }, "notaries")
|
||||
|
@ -29,12 +29,6 @@ fun <K, V> Caffeine<in K, in V>.buildNamed(name: String): Cache<K, V> {
|
||||
return this.build<K, V>()
|
||||
}
|
||||
|
||||
fun <K, V> Caffeine<in K, in V>.buildNamed(name: String, loadFunc: (K) -> V): LoadingCache<K, V> {
|
||||
checkCacheName(name)
|
||||
return this.build<K, V>(loadFunc)
|
||||
}
|
||||
|
||||
|
||||
fun <K, V> Caffeine<in K, in V>.buildNamed(name: String, loader: CacheLoader<K, V>): LoadingCache<K, V> {
|
||||
checkCacheName(name)
|
||||
return this.build<K, V>(loader)
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
@ -13,15 +14,16 @@ import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.network.PersistentNetworkMapCache
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
|
||||
import net.corda.node.utilities.TestingNamedCacheFactory
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.MAX_MESSAGE_SIZE
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import net.corda.testing.node.internal.MOCK_VERSION_INFO
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
|
||||
@ -88,7 +90,7 @@ class ArtemisMessagingTest {
|
||||
}
|
||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
|
||||
networkMapCache = PersistentNetworkMapCache(database, rigorousMock()).apply { start(emptyList()) }
|
||||
networkMapCache = PersistentNetworkMapCache(TestingNamedCacheFactory(), database, rigorousMock()).apply { start(emptyList()) }
|
||||
}
|
||||
|
||||
@After
|
||||
@ -223,6 +225,8 @@ class ArtemisMessagingTest {
|
||||
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
|
||||
database,
|
||||
networkMapCache,
|
||||
MetricRegistry(),
|
||||
TestingNamedCacheFactory(),
|
||||
isDrainingModeOn = { false },
|
||||
drainingModeWasChangedEvents = PublishSubject.create<Pair<Boolean, Boolean>>()).apply {
|
||||
config.configureWithDevSSLCertificate()
|
||||
|
@ -5,6 +5,7 @@ import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.internal.schemas.NodeInfoSchemaV1
|
||||
import net.corda.node.services.identity.InMemoryIdentityService
|
||||
import net.corda.node.utilities.TestingNamedCacheFactory
|
||||
import net.corda.nodeapi.internal.DEV_ROOT_CA
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.core.*
|
||||
@ -27,7 +28,7 @@ class PersistentNetworkMapCacheTest {
|
||||
|
||||
private var portCounter = 1000
|
||||
private val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
|
||||
private val charlieNetMapCache = PersistentNetworkMapCache(database, InMemoryIdentityService(trustRoot = DEV_ROOT_CA.certificate))
|
||||
private val charlieNetMapCache = PersistentNetworkMapCache(TestingNamedCacheFactory(), database, InMemoryIdentityService(trustRoot = DEV_ROOT_CA.certificate))
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
|
@ -16,6 +16,7 @@ import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.utilities.TestingNamedCacheFactory
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
@ -155,7 +156,7 @@ class RaftTransactionCommitLogTests {
|
||||
val address = Address(myAddress.host, myAddress.port)
|
||||
val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }, NodeSchemaService(includeNotarySchemas = true))
|
||||
databases.add(database)
|
||||
val stateMachineFactory = { RaftTransactionCommitLog(database, Clock.systemUTC(), RaftUniquenessProvider.Companion::createMap) }
|
||||
val stateMachineFactory = { RaftTransactionCommitLog(database, Clock.systemUTC(), { RaftUniquenessProvider.createMap(TestingNamedCacheFactory()) }) }
|
||||
|
||||
val server = CopycatServer.builder(address)
|
||||
.withStateMachine(stateMachineFactory)
|
||||
|
@ -64,10 +64,7 @@ import net.corda.node.services.statemachine.*
|
||||
import net.corda.node.services.transactions.*
|
||||
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.JVMAgentRegistry
|
||||
import net.corda.node.utilities.NamedThreadFactory
|
||||
import net.corda.node.utilities.NodeBuildProperties
|
||||
import net.corda.node.utilities.*
|
||||
import net.corda.nodeapi.internal.NodeInfoAndSigned
|
||||
import net.corda.nodeapi.internal.SignedNodeInfo
|
||||
import net.corda.nodeapi.internal.config.CertificateStore
|
||||
@ -116,6 +113,7 @@ import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
|
||||
// TODO Log warning if this node is a notary but not one of the ones specified in the network parameters, both for core and custom
|
||||
abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
val platformClock: CordaClock,
|
||||
cacheFactoryPrototype: NamedCacheFactory,
|
||||
protected val versionInfo: VersionInfo,
|
||||
protected val cordappLoader: CordappLoader,
|
||||
protected val serverThread: AffinityExecutor.ServiceAffinityExecutor,
|
||||
@ -125,6 +123,11 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
|
||||
@Suppress("LeakingThis")
|
||||
private var tokenizableServices: MutableList<Any>? = mutableListOf(platformClock, this)
|
||||
|
||||
protected val metricRegistry = MetricRegistry()
|
||||
protected val cacheFactory = cacheFactoryPrototype.bindWithConfig(configuration).bindWithMetrics(metricRegistry).tokenize()
|
||||
val monitoringService = MonitoringService(metricRegistry).tokenize()
|
||||
|
||||
protected val runOnStop = ArrayList<() -> Any?>()
|
||||
|
||||
init {
|
||||
@ -139,7 +142,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null).tokenize()
|
||||
val identityService = PersistentIdentityService().tokenize()
|
||||
val identityService = PersistentIdentityService(cacheFactory).tokenize()
|
||||
val database: CordaPersistence = createCordaPersistence(
|
||||
configuration.database,
|
||||
identityService::wellKnownPartyFromX500Name,
|
||||
@ -151,13 +154,13 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
// TODO Break cyclic dependency
|
||||
identityService.database = database
|
||||
}
|
||||
val networkMapCache = PersistentNetworkMapCache(database, identityService).tokenize()
|
||||
|
||||
val networkMapCache = PersistentNetworkMapCache(cacheFactory, database, identityService).tokenize()
|
||||
val checkpointStorage = DBCheckpointStorage()
|
||||
@Suppress("LeakingThis")
|
||||
val transactionStorage = makeTransactionStorage(configuration.transactionCacheSizeBytes).tokenize()
|
||||
val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL, versionInfo) }
|
||||
private val metricRegistry = MetricRegistry()
|
||||
val attachments = NodeAttachmentService(metricRegistry, database, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound).tokenize()
|
||||
val attachments = NodeAttachmentService(metricRegistry, cacheFactory, database).tokenize()
|
||||
val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(), attachments).tokenize()
|
||||
@Suppress("LeakingThis")
|
||||
val keyManagementService = makeKeyManagementService(identityService).tokenize()
|
||||
@ -166,7 +169,6 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
val vaultService = makeVaultService(keyManagementService, servicesForResolution, database).tokenize()
|
||||
val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
|
||||
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
|
||||
val monitoringService = MonitoringService(metricRegistry).tokenize()
|
||||
val networkMapUpdater = NetworkMapUpdater(
|
||||
networkMapCache,
|
||||
NodeInfoWatcher(
|
||||
@ -314,7 +316,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
servicesForResolution.start(netParams)
|
||||
networkMapCache.start(netParams.notaries)
|
||||
|
||||
startDatabase(metricRegistry)
|
||||
startDatabase()
|
||||
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
|
||||
identityService.start(trustRoot, listOf(identity.certificate, nodeCa))
|
||||
|
||||
@ -708,7 +710,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
}
|
||||
|
||||
protected open fun makeTransactionStorage(transactionCacheSizeBytes: Long): WritableTransactionStorage {
|
||||
return DBTransactionStorage(transactionCacheSizeBytes, database)
|
||||
return DBTransactionStorage(database, cacheFactory)
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@ -768,7 +770,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
// Specific class so that MockNode can catch it.
|
||||
class DatabaseConfigurationException(message: String) : CordaException(message)
|
||||
|
||||
protected open fun startDatabase(metricRegistry: MetricRegistry? = null) {
|
||||
protected open fun startDatabase() {
|
||||
val props = configuration.dataSourceProperties
|
||||
if (props.isEmpty) throw DatabaseConfigurationException("There must be a database configured.")
|
||||
database.startHikariPool(props, configuration.database, schemaService.internalSchemas(), metricRegistry)
|
||||
@ -792,7 +794,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
|
||||
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
|
||||
// the identity key. But the infrastructure to make that easy isn't here yet.
|
||||
return PersistentKeyManagementService(identityService, database)
|
||||
return PersistentKeyManagementService(cacheFactory, identityService, database)
|
||||
}
|
||||
|
||||
private fun makeCoreNotaryService(notaryConfig: NotaryConfig, myNotaryIdentity: PartyAndCertificate?): NotaryService {
|
||||
@ -801,7 +803,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
return notaryConfig.run {
|
||||
when {
|
||||
raft != null -> {
|
||||
val uniquenessProvider = RaftUniquenessProvider(configuration.baseDirectory, configuration.p2pSslOptions, database, platformClock, monitoringService.metrics, raft)
|
||||
val uniquenessProvider = RaftUniquenessProvider(configuration.baseDirectory, configuration.p2pSslOptions, database, platformClock, monitoringService.metrics, cacheFactory, raft)
|
||||
(if (validating) ::RaftValidatingNotaryService else ::RaftNonValidatingNotaryService)(services, notaryKey, uniquenessProvider)
|
||||
}
|
||||
bftSMaRt != null -> {
|
||||
@ -942,6 +944,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
override val clock: Clock get() = platformClock
|
||||
override val configuration: NodeConfiguration get() = this@AbstractNode.configuration
|
||||
override val networkMapUpdater: NetworkMapUpdater get() = this@AbstractNode.networkMapUpdater
|
||||
override val cacheFactory: NamedCacheFactory get() = this@AbstractNode.cacheFactory
|
||||
|
||||
private lateinit var _myInfo: NodeInfo
|
||||
override val myInfo: NodeInfo get() = _myInfo
|
||||
|
@ -48,6 +48,7 @@ import net.corda.node.services.messaging.*
|
||||
import net.corda.node.services.rpc.ArtemisRpcBroker
|
||||
import net.corda.node.utilities.AddressUtils
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.DefaultNamedCacheFactory
|
||||
import net.corda.node.utilities.DemoClock
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_SHELL_USER
|
||||
import net.corda.nodeapi.internal.ShutdownHook
|
||||
@ -93,6 +94,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
) : AbstractNode<NodeInfo>(
|
||||
configuration,
|
||||
createClock(configuration),
|
||||
DefaultNamedCacheFactory(),
|
||||
versionInfo,
|
||||
cordappLoader,
|
||||
// Under normal (non-test execution) it will always be "1"
|
||||
@ -195,7 +197,9 @@ open class Node(configuration: NodeConfiguration,
|
||||
database = database,
|
||||
networkMap = networkMapCache,
|
||||
isDrainingModeOn = nodeProperties.flowsDrainingMode::isEnabled,
|
||||
drainingModeWasChangedEvents = nodeProperties.flowsDrainingMode.values
|
||||
drainingModeWasChangedEvents = nodeProperties.flowsDrainingMode.values,
|
||||
metricRegistry = metricRegistry,
|
||||
cacheFactory = cacheFactory
|
||||
)
|
||||
}
|
||||
|
||||
@ -332,7 +336,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
* This is not using the H2 "automatic mixed mode" directly but leans on many of the underpinnings. For more details
|
||||
* on H2 URLs and configuration see: http://www.h2database.com/html/features.html#database_url
|
||||
*/
|
||||
override fun startDatabase(metricRegistry: MetricRegistry?) {
|
||||
override fun startDatabase() {
|
||||
val databaseUrl = configuration.dataSourceProperties.getProperty("dataSource.url")
|
||||
val h2Prefix = "jdbc:h2:file:"
|
||||
|
||||
@ -369,7 +373,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
super.startDatabase(metricRegistry)
|
||||
super.startDatabase()
|
||||
database.closeOnStop()
|
||||
}
|
||||
|
||||
@ -418,12 +422,13 @@ open class Node(configuration: NodeConfiguration,
|
||||
// https://jolokia.org/agent/jvm.html
|
||||
JmxReporter.forRegistry(registry).inDomain("net.corda").createsObjectNamesWith { _, domain, name ->
|
||||
// Make the JMX hierarchy a bit better organised.
|
||||
val category = name.substringBefore('.')
|
||||
val category = name.substringBefore('.').substringBeforeLast('/')
|
||||
val component = name.substringBefore('.').substringAfterLast('/', "")
|
||||
val subName = name.substringAfter('.', "")
|
||||
if (subName == "")
|
||||
ObjectName("$domain:name=$category")
|
||||
(if (subName == "")
|
||||
ObjectName("$domain:name=$category${if (component.isNotEmpty()) ",component=$component," else ""}")
|
||||
else
|
||||
ObjectName("$domain:type=$category,name=$subName")
|
||||
ObjectName("$domain:type=$category,${if (component.isNotEmpty()) "component=$component," else ""}name=$subName"))
|
||||
}.build().start()
|
||||
}
|
||||
|
||||
|
@ -25,6 +25,7 @@ import net.corda.node.services.network.NetworkMapUpdater
|
||||
import net.corda.node.services.persistence.AttachmentStorageInternal
|
||||
import net.corda.node.services.statemachine.ExternalEvent
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.utilities.NamedCacheFactory
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import java.security.PublicKey
|
||||
|
||||
@ -132,6 +133,7 @@ interface ServiceHubInternal : ServiceHub {
|
||||
}
|
||||
|
||||
fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>?
|
||||
val cacheFactory: NamedCacheFactory
|
||||
}
|
||||
|
||||
interface FlowStarter {
|
||||
|
@ -10,6 +10,7 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.services.api.IdentityServiceInternal
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.node.utilities.NamedCacheFactory
|
||||
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
|
||||
import net.corda.nodeapi.internal.crypto.x509Certificates
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
@ -29,13 +30,14 @@ import javax.persistence.Lob
|
||||
* cached for efficient lookup.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class PersistentIdentityService : SingletonSerializeAsToken(), IdentityServiceInternal {
|
||||
class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSerializeAsToken(), IdentityServiceInternal {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
|
||||
fun createPKMap(): AppendOnlyPersistentMap<SecureHash, PartyAndCertificate, PersistentIdentity, String> {
|
||||
fun createPKMap(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap<SecureHash, PartyAndCertificate, PersistentIdentity, String> {
|
||||
return AppendOnlyPersistentMap(
|
||||
"PersistentIdentityService_partyByKey",
|
||||
cacheFactory = cacheFactory,
|
||||
name = "PersistentIdentityService_partyByKey",
|
||||
toPersistentEntityKey = { it.toString() },
|
||||
fromPersistentEntity = {
|
||||
Pair(
|
||||
@ -50,9 +52,10 @@ class PersistentIdentityService : SingletonSerializeAsToken(), IdentityServiceIn
|
||||
)
|
||||
}
|
||||
|
||||
fun createX500Map(): AppendOnlyPersistentMap<CordaX500Name, SecureHash, PersistentIdentityNames, String> {
|
||||
fun createX500Map(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap<CordaX500Name, SecureHash, PersistentIdentityNames, String> {
|
||||
return AppendOnlyPersistentMap(
|
||||
"PersistentIdentityService_partyByName",
|
||||
cacheFactory = cacheFactory,
|
||||
name = "PersistentIdentityService_partyByName",
|
||||
toPersistentEntityKey = { it.toString() },
|
||||
fromPersistentEntity = { Pair(CordaX500Name.parse(it.name), SecureHash.parse(it.publicKeyHash)) },
|
||||
toPersistentEntity = { key: CordaX500Name, value: SecureHash ->
|
||||
@ -101,8 +104,8 @@ class PersistentIdentityService : SingletonSerializeAsToken(), IdentityServiceIn
|
||||
// CordaPersistence is not a c'tor parameter to work around the cyclic dependency
|
||||
lateinit var database: CordaPersistence
|
||||
|
||||
private val keyToParties = createPKMap()
|
||||
private val principalToParties = createX500Map()
|
||||
private val keyToParties = createPKMap(cacheFactory)
|
||||
private val principalToParties = createX500Map(cacheFactory)
|
||||
|
||||
fun start(trustRoot: X509Certificate, caCertificates: List<X509Certificate> = emptyList()) {
|
||||
_trustRoot = trustRoot
|
||||
|
@ -6,6 +6,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.node.utilities.NamedCacheFactory
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
|
||||
@ -25,7 +26,7 @@ import javax.persistence.Lob
|
||||
*
|
||||
* This class needs database transactions to be in-flight during method calls and init.
|
||||
*/
|
||||
class PersistentKeyManagementService(val identityService: PersistentIdentityService,
|
||||
class PersistentKeyManagementService(cacheFactory: NamedCacheFactory, val identityService: PersistentIdentityService,
|
||||
private val database: CordaPersistence) : SingletonSerializeAsToken(), KeyManagementServiceInternal {
|
||||
@Entity
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}our_key_pairs")
|
||||
@ -46,11 +47,15 @@ class PersistentKeyManagementService(val identityService: PersistentIdentityServ
|
||||
}
|
||||
|
||||
private companion object {
|
||||
fun createKeyMap(): AppendOnlyPersistentMap<PublicKey, PrivateKey, PersistentKey, String> {
|
||||
fun createKeyMap(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap<PublicKey, PrivateKey, PersistentKey, String> {
|
||||
return AppendOnlyPersistentMap(
|
||||
"PersistentKeyManagementService_keys",
|
||||
cacheFactory = cacheFactory,
|
||||
name = "PersistentKeyManagementService_keys",
|
||||
toPersistentEntityKey = { it.toStringShort() },
|
||||
fromPersistentEntity = { Pair(Crypto.decodePublicKey(it.publicKey), Crypto.decodePrivateKey(it.privateKey)) },
|
||||
fromPersistentEntity = {
|
||||
Pair(Crypto.decodePublicKey(it.publicKey),
|
||||
Crypto.decodePrivateKey(it.privateKey))
|
||||
},
|
||||
toPersistentEntity = { key: PublicKey, value: PrivateKey ->
|
||||
PersistentKey(key, value)
|
||||
},
|
||||
@ -59,7 +64,7 @@ class PersistentKeyManagementService(val identityService: PersistentIdentityServ
|
||||
}
|
||||
}
|
||||
|
||||
private val keysMap = createKeyMap()
|
||||
private val keysMap = createKeyMap(cacheFactory)
|
||||
|
||||
override fun start(initialKeyPairs: Set<KeyPair>) {
|
||||
initialKeyPairs.forEach { keysMap.addWithDuplicatesAllowed(it.public, it.private) }
|
||||
|
@ -4,6 +4,7 @@ import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.node.services.statemachine.DeduplicationId
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.node.utilities.NamedCacheFactory
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import java.time.Instant
|
||||
@ -15,17 +16,18 @@ import javax.persistence.Id
|
||||
/**
|
||||
* Encapsulate the de-duplication logic.
|
||||
*/
|
||||
class P2PMessageDeduplicator(private val database: CordaPersistence) {
|
||||
class P2PMessageDeduplicator(cacheFactory: NamedCacheFactory, private val database: CordaPersistence) {
|
||||
// A temporary in-memory set of deduplication IDs and associated high water mark details.
|
||||
// When we receive a message we don't persist the ID immediately,
|
||||
// so we store the ID here in the meantime (until the persisting db tx has committed). This is because Artemis may
|
||||
// redeliver messages to the same consumer if they weren't ACKed.
|
||||
private val beingProcessedMessages = ConcurrentHashMap<DeduplicationId, MessageMeta>()
|
||||
private val processedMessages = createProcessedMessages()
|
||||
private val processedMessages = createProcessedMessages(cacheFactory)
|
||||
|
||||
private fun createProcessedMessages(): AppendOnlyPersistentMap<DeduplicationId, MessageMeta, ProcessedMessage, String> {
|
||||
private fun createProcessedMessages(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap<DeduplicationId, MessageMeta, ProcessedMessage, String> {
|
||||
return AppendOnlyPersistentMap(
|
||||
"P2PMessageDeduplicator_processedMessages",
|
||||
cacheFactory = cacheFactory,
|
||||
name = "P2PMessageDeduplicator_processedMessages",
|
||||
toPersistentEntityKey = { it.toString },
|
||||
fromPersistentEntity = { Pair(DeduplicationId(it.id), MessageMeta(it.insertionTime, it.hash, it.seqNo)) },
|
||||
toPersistentEntity = { key: DeduplicationId, value: MessageMeta ->
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.ThreadBox
|
||||
@ -26,6 +27,7 @@ import net.corda.node.services.statemachine.DeduplicationId
|
||||
import net.corda.node.services.statemachine.ExternalEvent
|
||||
import net.corda.node.services.statemachine.SenderDeduplicationId
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.NamedCacheFactory
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
|
||||
@ -81,6 +83,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
|
||||
private val database: CordaPersistence,
|
||||
private val networkMap: NetworkMapCacheInternal,
|
||||
@Suppress("UNUSED")
|
||||
private val metricRegistry: MetricRegistry,
|
||||
cacheFactory: NamedCacheFactory,
|
||||
private val isDrainingModeOn: () -> Boolean,
|
||||
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>
|
||||
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver {
|
||||
@ -129,7 +134,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
|
||||
private val handlers = ConcurrentHashMap<String, MessageHandler>()
|
||||
|
||||
private val deduplicator = P2PMessageDeduplicator(database)
|
||||
private val deduplicator = P2PMessageDeduplicator(cacheFactory, database)
|
||||
internal var messagingExecutor: MessagingExecutor? = null
|
||||
|
||||
/**
|
||||
|
@ -23,6 +23,7 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.internal.schemas.NodeInfoSchemaV1
|
||||
import net.corda.node.services.api.NetworkMapCacheInternal
|
||||
import net.corda.node.utilities.NamedCacheFactory
|
||||
import net.corda.node.utilities.NonInvalidatingCache
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
|
||||
@ -36,7 +37,8 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/** Database-based network map cache. */
|
||||
@ThreadSafe
|
||||
open class PersistentNetworkMapCache(private val database: CordaPersistence,
|
||||
open class PersistentNetworkMapCache(cacheFactory: NamedCacheFactory,
|
||||
private val database: CordaPersistence,
|
||||
private val identityService: IdentityService) : NetworkMapCacheInternal, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
@ -124,8 +126,8 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence,
|
||||
override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> = nodesByKeyCache[identityKey]!!
|
||||
|
||||
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(
|
||||
"PersistentNetworkMap_nodesByKey",
|
||||
1024) { key ->
|
||||
cacheFactory = cacheFactory,
|
||||
name = "PersistentNetworkMap_nodesByKey") { key ->
|
||||
database.transaction { queryByIdentityKey(session, key) }
|
||||
}
|
||||
|
||||
@ -144,8 +146,8 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence,
|
||||
}
|
||||
|
||||
private val identityByLegalNameCache = NonInvalidatingCache<CordaX500Name, Optional<PartyAndCertificate>>(
|
||||
"PersistentNetworkMap_idByLegalName",
|
||||
1024) { name ->
|
||||
cacheFactory = cacheFactory,
|
||||
name = "PersistentNetworkMap_idByLegalName") { name ->
|
||||
Optional.ofNullable(database.transaction { queryIdentityByLegalName(session, name) })
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,7 @@ import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.node.services.api.WritableTransactionStorage
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMapBase
|
||||
import net.corda.node.utilities.NamedCacheFactory
|
||||
import net.corda.node.utilities.WeightBasedAppendOnlyPersistentMap
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
@ -31,7 +32,7 @@ typealias TxCacheValue = Pair<SerializedBytes<CoreTransaction>, List<Transaction
|
||||
fun TxCacheValue.toSignedTx() = SignedTransaction(this.first, this.second)
|
||||
fun SignedTransaction.toTxCacheValue() = TxCacheValue(this.txBits, this.sigs)
|
||||
|
||||
class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPersistence) : WritableTransactionStorage, SingletonSerializeAsToken() {
|
||||
class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: NamedCacheFactory) : WritableTransactionStorage, SingletonSerializeAsToken() {
|
||||
|
||||
@Entity
|
||||
@Table(name = "${NODE_DATABASE_PREFIX}transactions")
|
||||
@ -49,9 +50,10 @@ class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPers
|
||||
)
|
||||
|
||||
private companion object {
|
||||
fun createTransactionsMap(maxSizeInBytes: Long)
|
||||
fun createTransactionsMap(cacheFactory: NamedCacheFactory)
|
||||
: AppendOnlyPersistentMapBase<SecureHash, TxCacheValue, DBTransaction, String> {
|
||||
return WeightBasedAppendOnlyPersistentMap<SecureHash, TxCacheValue, DBTransaction, String>(
|
||||
cacheFactory = cacheFactory,
|
||||
name = "DBTransactionStorage_transactions",
|
||||
toPersistentEntityKey = { it.toString() },
|
||||
fromPersistentEntity = {
|
||||
@ -67,7 +69,6 @@ class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPers
|
||||
}
|
||||
},
|
||||
persistentEntityClass = DBTransaction::class.java,
|
||||
maxWeight = maxSizeInBytes,
|
||||
weighingFunc = { hash, tx -> hash.size + weighTx(tx) }
|
||||
)
|
||||
}
|
||||
@ -86,7 +87,7 @@ class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPers
|
||||
}
|
||||
}
|
||||
|
||||
private val txStorage = ThreadBox(createTransactionsMap(cacheSizeBytes))
|
||||
private val txStorage = ThreadBox(createTransactionsMap(cacheFactory))
|
||||
|
||||
override fun addTransaction(transaction: SignedTransaction): Boolean = database.transaction {
|
||||
txStorage.locked {
|
||||
|
@ -18,8 +18,8 @@ import net.corda.core.node.services.vault.AttachmentQueryCriteria
|
||||
import net.corda.core.node.services.vault.AttachmentSort
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser
|
||||
import net.corda.node.utilities.NamedCacheFactory
|
||||
import net.corda.node.utilities.NonInvalidatingCache
|
||||
import net.corda.node.utilities.NonInvalidatingWeightBasedCache
|
||||
import net.corda.nodeapi.exceptions.DuplicateAttachmentException
|
||||
@ -43,9 +43,8 @@ import javax.persistence.*
|
||||
@ThreadSafe
|
||||
class NodeAttachmentService(
|
||||
metrics: MetricRegistry,
|
||||
private val database: CordaPersistence,
|
||||
attachmentContentCacheSize: Long = NodeConfiguration.defaultAttachmentContentCacheSize,
|
||||
attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound
|
||||
cacheFactory: NamedCacheFactory,
|
||||
private val database: CordaPersistence
|
||||
) : AttachmentStorageInternal, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
@ -206,8 +205,8 @@ class NodeAttachmentService(
|
||||
// a problem somewhere else or this needs to be revisited.
|
||||
|
||||
private val attachmentContentCache = NonInvalidatingWeightBasedCache(
|
||||
cacheFactory = cacheFactory,
|
||||
name = "NodeAttachmentService_attachmentContent",
|
||||
maxWeight = attachmentContentCacheSize,
|
||||
weigher = Weigher<SecureHash, Optional<Pair<Attachment, ByteArray>>> { key, value -> key.size + if (value.isPresent) value.get().second.size else 0 },
|
||||
loadFunction = { Optional.ofNullable(loadAttachmentContent(it)) }
|
||||
)
|
||||
@ -228,10 +227,9 @@ class NodeAttachmentService(
|
||||
}
|
||||
|
||||
private val attachmentCache = NonInvalidatingCache<SecureHash, Optional<Attachment>>(
|
||||
"NodeAttachmentService_attachemnt",
|
||||
attachmentCacheBound) { key ->
|
||||
Optional.ofNullable(createAttachment(key))
|
||||
}
|
||||
cacheFactory = cacheFactory,
|
||||
name = "NodeAttachmentService_attachmentPresence",
|
||||
loadFunction = { key -> Optional.ofNullable(createAttachment(key)) })
|
||||
|
||||
private fun createAttachment(key: SecureHash): Attachment? {
|
||||
val content = attachmentContentCache.get(key)!!
|
||||
|
@ -102,7 +102,8 @@ class BFTNonValidatingNotaryService(
|
||||
|
||||
private fun createMap(): AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef> {
|
||||
return AppendOnlyPersistentMap(
|
||||
"BFTNonValidatingNotaryService_transactions",
|
||||
cacheFactory = services.cacheFactory,
|
||||
name = "BFTNonValidatingNotaryService_transactions",
|
||||
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
|
||||
fromPersistentEntity = {
|
||||
//TODO null check will become obsolete after making DB/JPA columns not nullable
|
||||
|
@ -11,7 +11,10 @@ import net.corda.core.flows.StateConsumptionDetails
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.notary.*
|
||||
import net.corda.core.internal.notary.AsyncUniquenessProvider
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.core.internal.notary.isConsumedByTheSameTx
|
||||
import net.corda.core.internal.notary.validateTimeWindow
|
||||
import net.corda.core.schemas.PersistentStateRef
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
@ -19,10 +22,10 @@ import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.node.utilities.NamedCacheFactory
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
import net.corda.nodeapi.internal.persistence.currentDBSession
|
||||
import net.corda.serialization.internal.CordaSerializationEncoding
|
||||
import java.time.Clock
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
@ -33,7 +36,7 @@ import kotlin.concurrent.thread
|
||||
|
||||
/** A RDBMS backed Uniqueness provider */
|
||||
@ThreadSafe
|
||||
class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
|
||||
class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence, cacheFactory: NamedCacheFactory) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
|
||||
|
||||
@MappedSuperclass
|
||||
class BaseComittedState(
|
||||
@ -80,7 +83,7 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
||||
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states")
|
||||
class CommittedState(id: PersistentStateRef, consumingTxHash: String) : BaseComittedState(id, consumingTxHash)
|
||||
|
||||
private val commitLog = createMap()
|
||||
private val commitLog = createMap(cacheFactory)
|
||||
|
||||
private val requestQueue = LinkedBlockingQueue<CommitRequest>(requestQueueSize)
|
||||
|
||||
@ -98,9 +101,10 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
|
||||
companion object {
|
||||
private const val requestQueueSize = 100_000
|
||||
private val log = contextLogger()
|
||||
fun createMap(): AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef> =
|
||||
fun createMap(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef> =
|
||||
AppendOnlyPersistentMap(
|
||||
"PersistentUniquenessProvider_transactions",
|
||||
cacheFactory = cacheFactory,
|
||||
name = "PersistentUniquenessProvider_transactions",
|
||||
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
|
||||
fromPersistentEntity = {
|
||||
//TODO null check will become obsolete after making DB/JPA columns not nullable
|
||||
|
@ -28,6 +28,7 @@ import net.corda.core.utilities.debug
|
||||
import net.corda.node.services.config.RaftConfig
|
||||
import net.corda.node.services.transactions.RaftTransactionCommitLog.Commands.CommitTransaction
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.node.utilities.NamedCacheFactory
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
|
||||
@ -55,13 +56,15 @@ class RaftUniquenessProvider(
|
||||
private val db: CordaPersistence,
|
||||
private val clock: Clock,
|
||||
private val metrics: MetricRegistry,
|
||||
private val cacheFactory: NamedCacheFactory,
|
||||
private val raftConfig: RaftConfig
|
||||
) : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
fun createMap(): AppendOnlyPersistentMap<StateRef, Pair<Long, SecureHash>, CommittedState, PersistentStateRef> =
|
||||
fun createMap(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap<StateRef, Pair<Long, SecureHash>, CommittedState, PersistentStateRef> =
|
||||
AppendOnlyPersistentMap(
|
||||
"RaftUniquenessProvider_transactions",
|
||||
cacheFactory = cacheFactory,
|
||||
name = "RaftUniquenessProvider_transactions",
|
||||
toPersistentEntityKey = { PersistentStateRef(it) },
|
||||
fromPersistentEntity = {
|
||||
val txId = it.id.txId
|
||||
@ -109,7 +112,7 @@ class RaftUniquenessProvider(
|
||||
fun start() {
|
||||
log.info("Creating Copycat server, log stored in: ${storagePath.toAbsolutePath()}")
|
||||
val stateMachineFactory = {
|
||||
RaftTransactionCommitLog(db, clock, RaftUniquenessProvider.Companion::createMap)
|
||||
RaftTransactionCommitLog(db, clock, { createMap(cacheFactory) })
|
||||
}
|
||||
val address = raftConfig.nodeAddress.let { Address(it.host, it.port) }
|
||||
val storage = buildStorage(storagePath)
|
||||
|
@ -8,7 +8,7 @@ import java.security.PublicKey
|
||||
|
||||
/** A simple Notary service that does not perform transaction validation */
|
||||
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database)
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database, services.cacheFactory)
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow = NonValidatingNotaryFlow(otherPartySession, this)
|
||||
|
||||
|
@ -8,7 +8,7 @@ import java.security.PublicKey
|
||||
|
||||
/** A Notary service that validates the transaction chain of the submitted transaction before committing it */
|
||||
class ValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database)
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database, services.cacheFactory)
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow = ValidatingNotaryFlow(otherPartySession, this)
|
||||
|
||||
|
@ -309,21 +309,20 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
|
||||
|
||||
// Open for tests to override
|
||||
open class AppendOnlyPersistentMap<K, V, E, out EK>(
|
||||
cacheFactory: NamedCacheFactory,
|
||||
name: String,
|
||||
toPersistentEntityKey: (K) -> EK,
|
||||
fromPersistentEntity: (E) -> Pair<K, V>,
|
||||
toPersistentEntity: (key: K, value: V) -> E,
|
||||
persistentEntityClass: Class<E>,
|
||||
cacheBound: Long = 1024
|
||||
persistentEntityClass: Class<E>
|
||||
) : AppendOnlyPersistentMapBase<K, V, E, EK>(
|
||||
toPersistentEntityKey,
|
||||
fromPersistentEntity,
|
||||
toPersistentEntity,
|
||||
persistentEntityClass) {
|
||||
//TODO determine cacheBound based on entity class later or with node config allowing tuning, or using some heuristic based on heap size
|
||||
override val cache = NonInvalidatingCache(
|
||||
cacheFactory = cacheFactory,
|
||||
name = name,
|
||||
bound = cacheBound,
|
||||
loadFunction = { key: K ->
|
||||
// This gets called if a value is read and the cache has no Transactional for this key yet.
|
||||
val value: V? = loadValue(key)
|
||||
@ -355,12 +354,12 @@ open class AppendOnlyPersistentMap<K, V, E, out EK>(
|
||||
|
||||
// Same as above, but with weighted values (e.g. memory footprint sensitive).
|
||||
class WeightBasedAppendOnlyPersistentMap<K, V, E, out EK>(
|
||||
cacheFactory: NamedCacheFactory,
|
||||
name: String,
|
||||
toPersistentEntityKey: (K) -> EK,
|
||||
fromPersistentEntity: (E) -> Pair<K, V>,
|
||||
toPersistentEntity: (key: K, value: V) -> E,
|
||||
persistentEntityClass: Class<E>,
|
||||
maxWeight: Long,
|
||||
weighingFunc: (K, Transactional<V>) -> Int
|
||||
) : AppendOnlyPersistentMapBase<K, V, E, EK>(
|
||||
toPersistentEntityKey,
|
||||
@ -368,8 +367,8 @@ class WeightBasedAppendOnlyPersistentMap<K, V, E, out EK>(
|
||||
toPersistentEntity,
|
||||
persistentEntityClass) {
|
||||
override val cache = NonInvalidatingWeightBasedCache(
|
||||
name,
|
||||
maxWeight = maxWeight,
|
||||
cacheFactory = cacheFactory,
|
||||
name = name,
|
||||
weigher = Weigher { key, value -> weighingFunc(key, value) },
|
||||
loadFunction = { key: K ->
|
||||
val value: V? = loadValue(key)
|
||||
|
@ -0,0 +1,54 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.CacheLoader
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache
|
||||
import net.corda.core.internal.buildNamed
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
|
||||
/**
|
||||
* Allow passing metrics and config to caching implementations.
|
||||
*/
|
||||
interface NamedCacheFactory : SerializeAsToken {
|
||||
/**
|
||||
* Build a new cache factory of the same type that incorporates metrics.
|
||||
*/
|
||||
fun bindWithMetrics(metricRegistry: MetricRegistry): NamedCacheFactory
|
||||
|
||||
/**
|
||||
* Build a new cache factory of the same type that incorporates the associated configuration.
|
||||
*/
|
||||
fun bindWithConfig(nodeConfiguration: NodeConfiguration): NamedCacheFactory
|
||||
|
||||
fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String): Cache<K, V>
|
||||
fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String, loader: CacheLoader<K, V>): LoadingCache<K, V>
|
||||
}
|
||||
|
||||
class DefaultNamedCacheFactory private constructor(private val metricRegistry: MetricRegistry?, private val nodeConfiguration: NodeConfiguration?) : NamedCacheFactory, SingletonSerializeAsToken() {
|
||||
constructor() : this(null, null)
|
||||
|
||||
override fun bindWithMetrics(metricRegistry: MetricRegistry): NamedCacheFactory = DefaultNamedCacheFactory(metricRegistry, this.nodeConfiguration)
|
||||
override fun bindWithConfig(nodeConfiguration: NodeConfiguration): NamedCacheFactory = DefaultNamedCacheFactory(this.metricRegistry, nodeConfiguration)
|
||||
|
||||
override fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String): Cache<K, V> {
|
||||
checkNotNull(metricRegistry)
|
||||
checkNotNull(nodeConfiguration)
|
||||
return caffeine.maximumSize(1024).buildNamed<K, V>(name)
|
||||
}
|
||||
|
||||
override fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String, loader: CacheLoader<K, V>): LoadingCache<K, V> {
|
||||
checkNotNull(metricRegistry)
|
||||
checkNotNull(nodeConfiguration)
|
||||
val configuredCaffeine = when (name) {
|
||||
"DBTransactionStorage_transactions" -> caffeine.maximumWeight(nodeConfiguration!!.transactionCacheSizeBytes)
|
||||
"NodeAttachmentService_attachmentContent" -> caffeine.maximumWeight(nodeConfiguration!!.attachmentContentCacheSizeBytes)
|
||||
"NodeAttachmentService_attachmentPresence" -> caffeine.maximumSize(nodeConfiguration!!.attachmentCacheBound)
|
||||
else -> caffeine.maximumSize(1024)
|
||||
}
|
||||
return configuredCaffeine.buildNamed<K, V>(name, loader)
|
||||
}
|
||||
}
|
@ -4,19 +4,18 @@ import com.github.benmanes.caffeine.cache.CacheLoader
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache
|
||||
import com.github.benmanes.caffeine.cache.Weigher
|
||||
import net.corda.core.internal.buildNamed
|
||||
|
||||
class NonInvalidatingCache<K, V> private constructor(
|
||||
val cache: LoadingCache<K, V>
|
||||
) : LoadingCache<K, V> by cache {
|
||||
|
||||
constructor(name: String, bound: Long, loadFunction: (K) -> V) :
|
||||
this(buildCache(name, bound, loadFunction))
|
||||
constructor(cacheFactory: NamedCacheFactory, name: String, loadFunction: (K) -> V) :
|
||||
this(buildCache(cacheFactory, name, loadFunction))
|
||||
|
||||
private companion object {
|
||||
private fun <K, V> buildCache(name: String, bound: Long, loadFunction: (K) -> V): LoadingCache<K, V> {
|
||||
val builder = Caffeine.newBuilder().maximumSize(bound)
|
||||
return builder.buildNamed(name, NonInvalidatingCacheLoader(loadFunction))
|
||||
private fun <K, V> buildCache(cacheFactory: NamedCacheFactory, name: String, loadFunction: (K) -> V): LoadingCache<K, V> {
|
||||
val builder = Caffeine.newBuilder()
|
||||
return cacheFactory.buildNamed(builder, name, NonInvalidatingCacheLoader(loadFunction))
|
||||
}
|
||||
}
|
||||
|
||||
@ -33,13 +32,13 @@ class NonInvalidatingCache<K, V> private constructor(
|
||||
class NonInvalidatingWeightBasedCache<K, V> private constructor(
|
||||
val cache: LoadingCache<K, V>
|
||||
) : LoadingCache<K, V> by cache {
|
||||
constructor (name: String, maxWeight: Long, weigher: Weigher<K, V>, loadFunction: (K) -> V) :
|
||||
this(buildCache(name, maxWeight, weigher, loadFunction))
|
||||
constructor (cacheFactory: NamedCacheFactory, name: String, weigher: Weigher<K, V>, loadFunction: (K) -> V) :
|
||||
this(buildCache(cacheFactory, name, weigher, loadFunction))
|
||||
|
||||
private companion object {
|
||||
private fun <K, V> buildCache(name: String, maxWeight: Long, weigher: Weigher<K, V>, loadFunction: (K) -> V): LoadingCache<K, V> {
|
||||
val builder = Caffeine.newBuilder().maximumWeight(maxWeight).weigher(weigher)
|
||||
return builder.buildNamed(name, NonInvalidatingCache.NonInvalidatingCacheLoader(loadFunction))
|
||||
private fun <K, V> buildCache(cacheFactory: NamedCacheFactory, name: String, weigher: Weigher<K, V>, loadFunction: (K) -> V): LoadingCache<K, V> {
|
||||
val builder = Caffeine.newBuilder().weigher(weigher)
|
||||
return cacheFactory.buildNamed(builder, name, NonInvalidatingCache.NonInvalidatingCacheLoader(loadFunction))
|
||||
}
|
||||
}
|
||||
}
|
@ -8,6 +8,7 @@ import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.node.services.UnknownAnonymousPartyException
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.utilities.TestingNamedCacheFactory
|
||||
import net.corda.nodeapi.internal.crypto.CertificateType
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.crypto.x509Certificates
|
||||
@ -46,7 +47,7 @@ class PersistentIdentityServiceTests {
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
identityService = PersistentIdentityService()
|
||||
identityService = PersistentIdentityService(TestingNamedCacheFactory())
|
||||
database = configureDatabase(
|
||||
makeTestDataSourceProperties(),
|
||||
DatabaseConfig(),
|
||||
@ -218,7 +219,7 @@ class PersistentIdentityServiceTests {
|
||||
identityService.verifyAndRegisterIdentity(anonymousBob)
|
||||
|
||||
// Create new identity service mounted onto same DB
|
||||
val newPersistentIdentityService = PersistentIdentityService().also {
|
||||
val newPersistentIdentityService = PersistentIdentityService(TestingNamedCacheFactory()).also {
|
||||
it.database = database
|
||||
it.start(DEV_ROOT_CA.certificate)
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
import net.corda.node.utilities.TestingNamedCacheFactory
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
import org.junit.After
|
||||
@ -271,7 +272,8 @@ class AppendOnlyPersistentMapTest(var scenario: Scenario) {
|
||||
)
|
||||
|
||||
class TestMap : AppendOnlyPersistentMap<Long, String, PersistentMapEntry, Long>(
|
||||
"ApoendOnlyPersistentMap_test",
|
||||
cacheFactory = TestingNamedCacheFactory(),
|
||||
name = "ApoendOnlyPersistentMap_test",
|
||||
toPersistentEntityKey = { it },
|
||||
fromPersistentEntity = { Pair(it.key, it.value) },
|
||||
toPersistentEntity = { key: Long, value: String ->
|
||||
|
@ -8,8 +8,8 @@ import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.utilities.TestingNamedCacheFactory
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.core.*
|
||||
@ -154,7 +154,7 @@ class DBTransactionStorageTests {
|
||||
}
|
||||
|
||||
private fun newTransactionStorage(cacheSizeBytesOverride: Long? = null) {
|
||||
transactionStorage = DBTransactionStorage(cacheSizeBytesOverride ?: NodeConfiguration.defaultTransactionCacheSize, database)
|
||||
transactionStorage = DBTransactionStorage(database, TestingNamedCacheFactory(cacheSizeBytesOverride ?: 1024))
|
||||
}
|
||||
|
||||
private fun assertTransactionIsRetrievable(transaction: SignedTransaction) {
|
||||
|
@ -9,6 +9,7 @@ import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.finance.flows.CashIssueFlow
|
||||
import net.corda.node.services.identity.PersistentIdentityService
|
||||
import net.corda.node.services.keys.E2ETestKeyManagementService
|
||||
import net.corda.node.utilities.TestingNamedCacheFactory
|
||||
import net.corda.testing.core.BOC_NAME
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork
|
||||
import net.corda.testing.node.MockNetwork
|
||||
@ -47,7 +48,7 @@ class HibernateColumnConverterTests {
|
||||
val ref = OpaqueBytes.of(0x01)
|
||||
|
||||
// Create parallel set of key and identity services so that the values are not cached, forcing the node caches to do a lookup.
|
||||
val identityService = PersistentIdentityService()
|
||||
val identityService = PersistentIdentityService(TestingNamedCacheFactory())
|
||||
val originalIdentityService: PersistentIdentityService = bankOfCordaNode.services.identityService as PersistentIdentityService
|
||||
identityService.database = originalIdentityService.database
|
||||
identityService.start(originalIdentityService.trustRoot)
|
||||
|
@ -15,6 +15,7 @@ import net.corda.core.node.services.vault.Sort
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.utilities.TestingNamedCacheFactory
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.internal.LogHelper
|
||||
@ -51,7 +52,7 @@ class NodeAttachmentServiceTest {
|
||||
val dataSourceProperties = makeTestDataSourceProperties()
|
||||
database = configureDatabase(dataSourceProperties, DatabaseConfig(), { null }, { null })
|
||||
fs = Jimfs.newFileSystem(Configuration.unix())
|
||||
storage = NodeAttachmentService(MetricRegistry(), database).also {
|
||||
storage = NodeAttachmentService(MetricRegistry(), TestingNamedCacheFactory(), database).also {
|
||||
database.transaction {
|
||||
it.start()
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.notary.NotaryInternalException
|
||||
import net.corda.node.internal.configureDatabase
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
import net.corda.node.utilities.TestingNamedCacheFactory
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
@ -49,7 +50,7 @@ class PersistentUniquenessProviderTests {
|
||||
|
||||
@Test
|
||||
fun `should commit a transaction with unused inputs without exception`() {
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database)
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
|
||||
val inputState = generateStateRef()
|
||||
|
||||
provider.commit(listOf(inputState), txID, identity, requestSignature)
|
||||
@ -57,7 +58,7 @@ class PersistentUniquenessProviderTests {
|
||||
|
||||
@Test
|
||||
fun `should report a conflict for a transaction with previously used inputs`() {
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database)
|
||||
val provider = PersistentUniquenessProvider(Clock.systemUTC(), database, TestingNamedCacheFactory())
|
||||
val inputState = generateStateRef()
|
||||
|
||||
val inputs = listOf(inputState)
|
||||
|
@ -0,0 +1,33 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.github.benmanes.caffeine.cache.Cache
|
||||
import com.github.benmanes.caffeine.cache.CacheLoader
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache
|
||||
import net.corda.core.internal.buildNamed
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.node.services.config.MB
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
|
||||
class TestingNamedCacheFactory private constructor(private val sizeOverride: Long, private val metricRegistry: MetricRegistry?, private val nodeConfiguration: NodeConfiguration?) : NamedCacheFactory, SingletonSerializeAsToken() {
|
||||
constructor(sizeOverride: Long = 1024) : this(sizeOverride, null, null)
|
||||
|
||||
override fun bindWithMetrics(metricRegistry: MetricRegistry): NamedCacheFactory = TestingNamedCacheFactory(sizeOverride, metricRegistry, this.nodeConfiguration)
|
||||
override fun bindWithConfig(nodeConfiguration: NodeConfiguration): NamedCacheFactory = TestingNamedCacheFactory(sizeOverride, this.metricRegistry, nodeConfiguration)
|
||||
|
||||
override fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String): Cache<K, V> {
|
||||
// Does not check metricRegistry or nodeConfiguration, because for tests we don't care.
|
||||
return caffeine.maximumSize(sizeOverride).buildNamed<K, V>(name)
|
||||
}
|
||||
|
||||
override fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String, loader: CacheLoader<K, V>): LoadingCache<K, V> {
|
||||
// Does not check metricRegistry or nodeConfiguration, because for tests we don't care.
|
||||
val configuredCaffeine = when (name) {
|
||||
"DBTransactionStorage_transactions" -> caffeine.maximumWeight(1.MB)
|
||||
"NodeAttachmentService_attachmentContent" -> caffeine.maximumWeight(1.MB)
|
||||
else -> caffeine.maximumSize(sizeOverride)
|
||||
}
|
||||
return configuredCaffeine.buildNamed<K, V>(name, loader)
|
||||
}
|
||||
}
|
@ -27,7 +27,7 @@ import java.security.SignatureException
|
||||
// START 1
|
||||
@CordaService
|
||||
class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database)
|
||||
override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database, services.cacheFactory)
|
||||
|
||||
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = MyValidatingNotaryFlow(otherPartySession, this)
|
||||
|
||||
|
@ -1,6 +1,5 @@
|
||||
package net.corda.testing.node.internal
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.google.common.jimfs.Configuration.unix
|
||||
import com.google.common.jimfs.Jimfs
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
@ -47,6 +46,7 @@ import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
|
||||
import net.corda.node.services.transactions.BFTSMaRt
|
||||
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
|
||||
import net.corda.node.utilities.DefaultNamedCacheFactory
|
||||
import net.corda.nodeapi.internal.DevIdentityGenerator
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
import net.corda.nodeapi.internal.network.NetworkParametersCopier
|
||||
@ -54,9 +54,9 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseConfig
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
import net.corda.testing.driver.TestCorDapp
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.internal.setGlobalSerialization
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.testing.internal.testThreadFactory
|
||||
import net.corda.testing.node.*
|
||||
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
|
||||
@ -279,6 +279,7 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
open class MockNode(args: MockNodeArgs, cordappLoader: CordappLoader = JarScanningCordappLoader.fromDirectories(args.config.cordappDirectories)) : AbstractNode<TestStartedNode>(
|
||||
args.config,
|
||||
TestClock(Clock.systemUTC()),
|
||||
DefaultNamedCacheFactory(),
|
||||
args.version,
|
||||
cordappLoader,
|
||||
args.network.getServerThread(args.id),
|
||||
@ -405,8 +406,8 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
|
||||
get() = _serializationWhitelists
|
||||
private var dbCloser: (() -> Any?)? = null
|
||||
|
||||
override fun startDatabase(metricRegistry: MetricRegistry?) {
|
||||
super.startDatabase(metricRegistry)
|
||||
override fun startDatabase() {
|
||||
super.startDatabase()
|
||||
dbCloser = database::close
|
||||
runOnStop += dbCloser!!
|
||||
}
|
||||
|
@ -12,12 +12,14 @@ fun startReporter(shutdownManager: ShutdownManager, metricRegistry: MetricRegist
|
||||
val jmxReporter = thread {
|
||||
JmxReporter.forRegistry(metricRegistry).inDomain("net.corda").createsObjectNamesWith { _, domain, name ->
|
||||
// Make the JMX hierarchy a bit better organised.
|
||||
val category = name.substringBefore('.')
|
||||
val category = name.substringBefore('.').substringBeforeLast('/')
|
||||
val component = name.substringBefore('.').substringAfterLast('/', "")
|
||||
val subName = name.substringAfter('.', "")
|
||||
if (subName == "")
|
||||
ObjectName("$domain:name=$category")
|
||||
ObjectName("$domain:name=$category${if (component.isNotEmpty()) ",component=$component," else ""}")
|
||||
else
|
||||
ObjectName("$domain:type=$category,name=$subName")
|
||||
ObjectName("$domain:type=$category,${if (component.isNotEmpty()) "component=$component," else ""}name=$subName")
|
||||
|
||||
}.build().start()
|
||||
}
|
||||
shutdownManager.registerShutdown { jmxReporter.interrupt() }
|
||||
|
Loading…
x
Reference in New Issue
Block a user