Merge from Corda/OS

This commit is contained in:
IgorNitto 2018-03-14 17:24:47 +00:00
commit 0a0c8538c4
33 changed files with 155 additions and 146 deletions

View File

@ -58,6 +58,7 @@ buildscript {
ext.log4j_version = '2.9.1'
ext.bouncycastle_version = constants.getProperty("bouncycastleVersion")
ext.guava_version = constants.getProperty("guavaVersion")
ext.caffeine_version = constants.getProperty("caffeineVersion")
ext.okhttp_version = '3.5.0'
ext.netty_version = '4.1.9.Final'
ext.typesafe_config_version = constants.getProperty("typesafeConfigVersion")

View File

@ -10,8 +10,7 @@
package net.corda.client.jfx.model
import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import javafx.beans.value.ObservableValue
import javafx.collections.FXCollections
import javafx.collections.ObservableList
@ -42,8 +41,8 @@ class NetworkIdentityModel {
private val rpcProxy by observableValue(NodeMonitorModel::proxyObservable)
private val identityCache = CacheBuilder.newBuilder()
.build<PublicKey, ObservableValue<NodeInfo?>>(CacheLoader.from { publicKey ->
private val identityCache = Caffeine.newBuilder()
.build<PublicKey, ObservableValue<NodeInfo?>>({ publicKey ->
publicKey?.let { rpcProxy.map { it?.nodeInfoFromParty(AnonymousParty(publicKey)) } }
})
val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.notaryIdentities() ?: emptyList()) })
@ -52,5 +51,5 @@ class NetworkIdentityModel {
.filtered { it.legalIdentities.all { it !in notaries } }
val myIdentity = rpcProxy.map { it?.nodeInfo()?.legalIdentitiesAndCerts?.first()?.party }
fun partyFromPublicKey(publicKey: PublicKey): ObservableValue<NodeInfo?> = identityCache[publicKey]
fun partyFromPublicKey(publicKey: PublicKey): ObservableValue<NodeInfo?> = identityCache[publicKey]!!
}

View File

@ -81,6 +81,9 @@ dependencies {
compile project(':core')
compile project(':node-api')
// For caches rather than guava
compile "com.github.ben-manes.caffeine:caffeine:$caffeine_version"
// Unit testing helpers.
testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
testCompile "junit:junit:$junit_version"

View File

@ -54,12 +54,6 @@ data class RPCClientConfiguration(
val reapInterval: Duration,
/** The number of threads to use for observations (for executing [Observable.onNext]) */
val observationExecutorPoolSize: Int,
/**
* Determines the concurrency level of the Observable Cache. This is exposed because it implicitly determines
* the limit on the number of leaked observables reaped because of garbage collection per reaping.
* See the implementation of [com.google.common.cache.LocalCache] for details.
*/
val cacheConcurrencyLevel: Int,
/** The retry interval of artemis connections in milliseconds */
val connectionRetryInterval: Duration,
/** The retry interval multiplier for exponential backoff */
@ -81,7 +75,6 @@ data class RPCClientConfiguration(
trackRpcCallSites = false,
reapInterval = 1.seconds,
observationExecutorPoolSize = 4,
cacheConcurrencyLevel = 8,
connectionRetryInterval = 5.seconds,
connectionRetryIntervalMultiplier = 1.5,
connectionMaxRetryInterval = 3.minutes,

View File

@ -10,14 +10,15 @@
package net.corda.client.rpc.internal
import co.paralleluniverse.common.util.SameThreadExecutor
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import com.google.common.cache.RemovalCause
import com.google.common.cache.RemovalListener
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.RemovalCause
import com.github.benmanes.caffeine.cache.RemovalListener
import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.client.rpc.RPCException
@ -152,10 +153,10 @@ class RPCClientProxyHandler(
private val serializationContextWithObservableContext = RpcClientObservableSerializer.createContext(serializationContext, observableContext)
private fun createRpcObservableMap(): RpcObservableMap {
val onObservableRemove = RemovalListener<InvocationId, UnicastSubject<Notification<*>>> {
val observableId = it.key!!
val onObservableRemove = RemovalListener<InvocationId, UnicastSubject<Notification<*>>> { key, value, cause ->
val observableId = key!!
val rpcCallSite = callSiteMap?.remove(observableId)
if (it.cause == RemovalCause.COLLECTED) {
if (cause == RemovalCause.COLLECTED) {
log.warn(listOf(
"A hot observable returned from an RPC was never subscribed to.",
"This wastes server-side resources because it was queueing observations for retrieval.",
@ -166,10 +167,9 @@ class RPCClientProxyHandler(
}
observablesToReap.locked { observables.add(observableId) }
}
return CacheBuilder.newBuilder().
return Caffeine.newBuilder().
weakValues().
removalListener(onObservableRemove).
concurrencyLevel(rpcConfiguration.cacheConcurrencyLevel).
removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).
build()
}

View File

@ -101,8 +101,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
return testProxy<TestOps>(
TestOpsImpl(pool),
clientConfiguration = RPCClientConfiguration.default.copy(
reapInterval = 100.millis,
cacheConcurrencyLevel = 16
reapInterval = 100.millis
),
serverConfiguration = RPCServerConfiguration.default.copy(
rpcThreadPoolSize = 4

View File

@ -98,7 +98,6 @@ class RPCPerformanceTests : AbstractRPCTest() {
rpcDriver {
val proxy = testProxy(
RPCClientConfiguration.default.copy(
cacheConcurrencyLevel = 16,
observationExecutorPoolSize = 2
),
RPCServerConfiguration.default.copy(
@ -138,8 +137,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
val metricRegistry = startReporter(shutdownManager)
val proxy = testProxy(
RPCClientConfiguration.default.copy(
reapInterval = 1.seconds,
cacheConcurrencyLevel = 16
reapInterval = 1.seconds
),
RPCServerConfiguration.default.copy(
rpcThreadPoolSize = 8

View File

@ -17,3 +17,4 @@ typesafeConfigVersion=1.3.1
jsr305Version=3.0.2
artifactoryPluginVersion=4.4.18
snakeYamlVersion=1.19
caffeineVersion=2.6.2

View File

@ -62,13 +62,19 @@ data class LedgerTransaction @JvmOverloads constructor(
private companion object {
@JvmStatic
private fun createContractFor(className: ContractClassName): Try<Contract> {
return Try.on { this::class.java.classLoader.loadClass(className).asSubclass(Contract::class.java).getConstructor().newInstance() }
private fun createContractFor(className: ContractClassName, classLoader: ClassLoader?): Try<Contract> {
return Try.on {
(classLoader ?: this::class.java.classLoader)
.loadClass(className)
.asSubclass(Contract::class.java)
.getConstructor()
.newInstance()
}
}
}
private val contracts: Map<ContractClassName, Try<Contract>> = (inputs.map { it.state.contract } + outputs.map { it.contract })
.toSet().map { it to createContractFor(it) }.toMap()
private val contracts: Map<ContractClassName, Try<Contract>> = (inputs.map { it.state } + outputs)
.map { it.contract to createContractFor(it.contract, it.data::class.java.classLoader) }.toMap()
val inputStates: List<ContractState> get() = inputs.map { it.state.data }

View File

@ -107,11 +107,12 @@ Mac
Java
^^^^
1. Open "System Preferences > Java"
2. In the Java Control Panel, if an update is available, click "Update Now"
3. In the "Software Update" window, click "Install Update". If required, enter your password and click "Install Helper" when prompted
4. Wait for a pop-up window indicating that you have successfully installed the update, and click "Close"
5. Open a new terminal and type ``java -version`` to test that Java is installed correctly
1. Visit http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
2. Scroll down to "Java SE Development Kit 8uXXX" (where "XXX" is the latest minor version number)
3. Toggle "Accept License Agreement"
4. Click the download link for jdk-8uXXX-macosx-x64.dmg (where "XXX" is the latest minor version number)
5. Download and run the executable to install Java (use the default settings)
6. Open a new terminal window and run ``java -version`` to test that Java is installed correctly
IntelliJ
^^^^^^^^
@ -173,4 +174,4 @@ By then, you'll be ready to start writing your own CorDapps. Learn how to do thi
:doc:`flow cookbook <flow-cookbook>` and the `samples <https://www.corda.net/samples/>`_ along the way.
If you encounter any issues, please see the :doc:`troubleshooting` page, or get in touch with us on the
`forums <https://discourse.corda.net/>`_ or via `slack <http://slack.corda.net/>`_.
`forums <https://discourse.corda.net/>`_ or via `slack <http://slack.corda.net/>`_.

View File

@ -46,7 +46,7 @@ abstract class AbstractCashSelection(private val maxRetries : Int = 8, private v
fun getInstance(metadata: () -> java.sql.DatabaseMetaData): AbstractCashSelection {
return instance.get() ?: {
val _metadata = metadata()
val cashSelectionAlgos = ServiceLoader.load(AbstractCashSelection::class.java).toList()
val cashSelectionAlgos = ServiceLoader.load(AbstractCashSelection::class.java, this::class.java.classLoader).toList()
val cashSelectionAlgo = cashSelectionAlgos.firstOrNull { it.isCompatible(_metadata) }
cashSelectionAlgo?.let {
instance.set(cashSelectionAlgo)

View File

@ -63,6 +63,9 @@ dependencies {
compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
// For caches rather than guava
compile "com.github.ben-manes.caffeine:caffeine:$caffeine_version"
// Unit testing helpers.
testCompile "junit:junit:$junit_version"
testCompile "org.assertj:assertj-core:$assertj_version"

View File

@ -10,8 +10,8 @@
package net.corda.nodeapi.internal
import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import java.time.Duration
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
@ -21,11 +21,11 @@ import java.util.concurrent.atomic.AtomicLong
*/
class DeduplicationChecker(cacheExpiry: Duration) {
// dedupe identity -> watermark cache
private val watermarkCache = CacheBuilder.newBuilder()
private val watermarkCache = Caffeine.newBuilder()
.expireAfterAccess(cacheExpiry.toNanos(), TimeUnit.NANOSECONDS)
.build(WatermarkCacheLoader)
private object WatermarkCacheLoader : CacheLoader<Any, AtomicLong>() {
private object WatermarkCacheLoader : CacheLoader<Any, AtomicLong> {
override fun load(key: Any) = AtomicLong(-1)
}
@ -35,6 +35,7 @@ class DeduplicationChecker(cacheExpiry: Duration) {
* @return true if the message is unique, false if it's a duplicate.
*/
fun checkDuplicateMessageId(identity: Any, sequenceNumber: Long): Boolean {
return watermarkCache[identity].getAndUpdate { maxOf(sequenceNumber, it) } >= sequenceNumber
return watermarkCache[identity]!!.getAndUpdate { maxOf(sequenceNumber, it) } >= sequenceNumber
}
}

View File

@ -64,7 +64,8 @@ class CordaPersistence(
databaseConfig: DatabaseConfig,
schemas: Set<MappedSchema>,
val jdbcUrl: String,
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet()
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet(),
val cordappClassLoader: ClassLoader? = null
) : Closeable {
companion object {
private val log = contextLogger()
@ -74,7 +75,7 @@ class CordaPersistence(
val hibernateConfig: HibernateConfiguration by lazy {
transaction {
HibernateConfiguration(schemas, databaseConfig, attributeConverters, jdbcUrl)
HibernateConfiguration(schemas, databaseConfig, attributeConverters, jdbcUrl, cordappClassLoader)
}
}
val entityManagerFactory get() = hibernateConfig.sessionFactoryForRegisteredSchemas

View File

@ -19,6 +19,8 @@ import org.hibernate.boot.Metadata
import org.hibernate.boot.MetadataBuilder
import org.hibernate.boot.MetadataSources
import org.hibernate.boot.registry.BootstrapServiceRegistryBuilder
import org.hibernate.boot.registry.classloading.internal.ClassLoaderServiceImpl
import org.hibernate.boot.registry.classloading.spi.ClassLoaderService
import org.hibernate.cfg.Configuration
import org.hibernate.engine.jdbc.connections.spi.ConnectionProvider
import org.hibernate.service.UnknownUnwrapTypeException
@ -36,7 +38,8 @@ class HibernateConfiguration(
schemas: Set<MappedSchema>,
private val databaseConfig: DatabaseConfig,
private val attributeConverters: Collection<AttributeConverter<*, *>>,
private val jdbcUrl: String
private val jdbcUrl: String,
val cordappClassLoader: ClassLoader? = null
) {
companion object {
private val logger = contextLogger()
@ -93,7 +96,7 @@ class HibernateConfiguration(
schema.mappedTypes.forEach { config.addAnnotatedClass(it) }
}
val sessionFactory = buildSessionFactory(config, metadataSources)
val sessionFactory = buildSessionFactory(config, metadataSources, cordappClassLoader)
logger.info("Created session factory for schemas: $schemas")
// export Hibernate JMX statistics
@ -119,11 +122,17 @@ class HibernateConfiguration(
}
}
private fun buildSessionFactory(config: Configuration, metadataSources: MetadataSources): SessionFactory {
private fun buildSessionFactory(config: Configuration, metadataSources: MetadataSources, cordappClassLoader: ClassLoader?): SessionFactory {
config.standardServiceRegistryBuilder.applySettings(config.properties)
if (cordappClassLoader != null) {
config.standardServiceRegistryBuilder.addService(
ClassLoaderService::class.java,
ClassLoaderServiceImpl(cordappClassLoader))
}
val metadataBuilder = metadataSources.getMetadataBuilder(config.standardServiceRegistryBuilder.build())
val metadata = buildHibernateMetadata(metadataBuilder, jdbcUrl, attributeConverters)
return metadata.sessionFactoryBuilder.run {
allowOutOfTransactionUpdateOperations(true)
applySecondLevelCacheSupport(false)

View File

@ -10,8 +10,8 @@
package net.corda.nodeapi.internal.serialization
import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import net.corda.core.contracts.Attachment
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.copyBytes
@ -40,7 +40,7 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe
override val useCase: SerializationContext.UseCase,
override val encoding: SerializationEncoding?,
override val encodingWhitelist: EncodingWhitelist = NullEncodingWhitelist) : SerializationContext {
private val cache: Cache<List<SecureHash>, AttachmentsClassLoader> = CacheBuilder.newBuilder().weakValues().maximumSize(1024).build()
private val cache: Cache<List<SecureHash>, AttachmentsClassLoader> = Caffeine.newBuilder().weakValues().maximumSize(1024).build()
/**
* {@inheritDoc}
@ -59,7 +59,7 @@ data class SerializationContextImpl @JvmOverloads constructor(override val prefe
}
missing.isNotEmpty() && throw MissingAttachmentsException(missing)
AttachmentsClassLoader(attachments, parent = deserializationClassLoader)
})
}!!)
} catch (e: ExecutionException) {
// Caught from within the cache get, so unwrap.
throw e.cause!!

View File

@ -28,7 +28,8 @@ private typealias MapCreationFunction = (Map<*, *>) -> Map<*, *>
* Serialization / deserialization of certain supported [Map] types.
*/
class MapSerializer(private val declaredType: ParameterizedType, factory: SerializerFactory) : AMQPSerializer<Any> {
override val type: Type = declaredType as? DeserializedParameterizedType ?: DeserializedParameterizedType.make(SerializerFactory.nameForType(declaredType))
override val type: Type = (declaredType as? DeserializedParameterizedType) ?:
DeserializedParameterizedType.make(SerializerFactory.nameForType(declaredType), factory.classloader)
override val typeDescriptor: Symbol = Symbol.valueOf(
"$DESCRIPTOR_DOMAIN:${factory.fingerPrinter.fingerprint(type)}")

View File

@ -91,6 +91,9 @@ dependencies {
compile "com.google.guava:guava:$guava_version"
// For caches rather than guava
compile "com.github.ben-manes.caffeine:caffeine:$caffeine_version"
// JOpt: for command line flags.
compile "net.sf.jopt-simple:jopt-simple:$jopt_simple_version"

View File

@ -91,8 +91,6 @@ public class CordaCaplet extends Capsule {
T cp = super.attribute(attr);
(new File(baseDir, "cordapps")).mkdir();
augmentClasspath((List<Path>) cp, new File(baseDir, "cordapps"));
augmentClasspath((List<Path>) cp, new File(baseDir, "plugins"));
// Add additional directories of JARs to the classpath (at the end). e.g. for JDBC drivers
try {
List<String> jarDirs = nodeConfig.getStringList("jarDirs");

View File

@ -657,7 +657,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val props = configuration.dataSourceProperties
if (props.isEmpty()) throw DatabaseConfigurationException("There must be a database configured.")
val database = configureDatabase(props, configuration.database, identityService, schemaService)
val database = configureDatabase(props, configuration.database, identityService, schemaService, cordappLoader.appClassLoader)
// Now log the vendor string as this will also cause a connection to be tested eagerly.
logVendorString(database, log)
runOnStop += database::close
@ -897,7 +897,8 @@ internal class NetworkMapCacheEmptyException : Exception()
fun configureDatabase(hikariProperties: Properties,
databaseConfig: DatabaseConfig,
identityService: IdentityService,
schemaService: SchemaService = NodeSchemaService()): CordaPersistence {
schemaService: SchemaService = NodeSchemaService(),
cordappClassLoader: ClassLoader? = null): CordaPersistence {
// Register the AbstractPartyDescriptor so Hibernate doesn't warn when encountering AbstractParty. Unfortunately
// Hibernate warns about not being able to find a descriptor if we don't provide one, but won't use it by default
// so we end up providing both descriptor and converter. We should re-examine this in later versions to see if
@ -907,8 +908,6 @@ fun configureDatabase(hikariProperties: Properties,
val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(identityService))
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
SchemaMigration(schemaService.schemaOptions.keys, dataSource, !isH2Database(jdbcUrl), databaseConfig).nodeStartup()
return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, attributeConverters)
return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, attributeConverters, cordappClassLoader)
}

View File

@ -53,7 +53,6 @@ import kotlin.streams.toList
*/
class CordappLoader private constructor(private val cordappJarPaths: List<RestrictedURL>) {
val cordapps: List<Cordapp> by lazy { loadCordapps() + coreCordapp }
val appClassLoader: ClassLoader = URLClassLoader(cordappJarPaths.stream().map { it.url }.toTypedArray(), javaClass.classLoader)
init {

View File

@ -10,8 +10,9 @@
package net.corda.node.internal.security
import com.google.common.cache.CacheBuilder
import com.google.common.cache.Cache
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.google.common.primitives.Ints
import net.corda.core.context.AuthServiceId
import net.corda.core.utilities.loggerFor
@ -112,7 +113,7 @@ class RPCSecurityManagerImpl(private val config: AuthServiceConfig) : RPCSecurit
return DefaultSecurityManager(realm).also {
// Setup optional cache layer if configured
it.cacheManager = config.options?.cache?.let {
GuavaCacheManager(
CaffeineCacheManager(
timeToLiveSeconds = it.expireAfterSecs,
maxSize = it.maxEntries)
}
@ -275,9 +276,9 @@ private class NodeJdbcRealm(config: SecurityConfiguration.AuthService.DataSource
private typealias ShiroCache<K, V> = org.apache.shiro.cache.Cache<K, V>
/*
* Adapts a [com.google.common.cache.Cache] to a [org.apache.shiro.cache.Cache] implementation.
* Adapts a [com.github.benmanes.caffeine.cache.Cache] to a [org.apache.shiro.cache.Cache] implementation.
*/
private fun <K, V> Cache<K, V>.toShiroCache(name: String) = object : ShiroCache<K, V> {
private fun <K : Any, V> Cache<K, V>.toShiroCache(name: String) = object : ShiroCache<K, V> {
val name = name
private val impl = this@toShiroCache
@ -300,7 +301,7 @@ private fun <K, V> Cache<K, V>.toShiroCache(name: String) = object : ShiroCache<
impl.invalidateAll()
}
override fun size() = Ints.checkedCast(impl.size())
override fun size() = Ints.checkedCast(impl.estimatedSize())
override fun keys() = impl.asMap().keys
override fun values() = impl.asMap().values
override fun toString() = "Guava cache adapter [$impl]"
@ -308,22 +309,22 @@ private fun <K, V> Cache<K, V>.toShiroCache(name: String) = object : ShiroCache<
/*
* Implementation of [org.apache.shiro.cache.CacheManager] based on
* cache implementation in [com.google.common.cache]
* cache implementation in [com.github.benmanes.caffeine.cache.Cache]
*/
private class GuavaCacheManager(val maxSize: Long,
val timeToLiveSeconds: Long) : CacheManager {
private class CaffeineCacheManager(val maxSize: Long,
val timeToLiveSeconds: Long) : CacheManager {
private val instances = ConcurrentHashMap<String, ShiroCache<*, *>>()
override fun <K, V> getCache(name: String): ShiroCache<K, V> {
override fun <K : Any, V> getCache(name: String): ShiroCache<K, V> {
val result = instances[name] ?: buildCache<K, V>(name)
instances.putIfAbsent(name, result)
return result as ShiroCache<K, V>
}
private fun <K, V> buildCache(name: String) : ShiroCache<K, V> {
private fun <K : Any, V> buildCache(name: String): ShiroCache<K, V> {
logger.info("Constructing cache '$name' with maximumSize=$maxSize, TTL=${timeToLiveSeconds}s")
return CacheBuilder.newBuilder()
return Caffeine.newBuilder()
.expireAfterWrite(timeToLiveSeconds, TimeUnit.SECONDS)
.maximumSize(maxSize)
.build<K, V>()
@ -331,6 +332,6 @@ private class GuavaCacheManager(val maxSize: Long,
}
companion object {
private val logger = loggerFor<GuavaCacheManager>()
private val logger = loggerFor<CaffeineCacheManager>()
}
}

View File

@ -10,13 +10,14 @@
package net.corda.node.services.messaging
import co.paralleluniverse.common.util.SameThreadExecutor
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import com.google.common.cache.RemovalListener
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.RemovalListener
import com.google.common.collect.HashMultimap
import com.google.common.collect.Multimaps
import com.google.common.collect.SetMultimap
@ -155,11 +156,11 @@ class RPCServer(
}
private fun createObservableSubscriptionMap(): ObservableSubscriptionMap {
val onObservableRemove = RemovalListener<InvocationId, ObservableSubscription> {
log.debug { "Unsubscribing from Observable with id ${it.key} because of ${it.cause}" }
it.value.subscription.unsubscribe()
val onObservableRemove = RemovalListener<InvocationId, ObservableSubscription> { key, value, cause ->
log.debug { "Unsubscribing from Observable with id ${key} because of ${cause}" }
value!!.subscription.unsubscribe()
}
return CacheBuilder.newBuilder().removalListener(onObservableRemove).build()
return Caffeine.newBuilder().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).build()
}
fun start(activeMqServerControl: ActiveMQServerControl) {

View File

@ -175,9 +175,9 @@ open class PersistentNetworkMapCache(
override fun getNodesByLegalName(name: CordaX500Name): List<NodeInfo> = database.transaction { queryByLegalName(session, name) }
override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> = nodesByKeyCache[identityKey]
override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> = nodesByKeyCache[identityKey]!!
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024, 8, { key -> database.transaction { queryByIdentityKey(session, key) } })
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024, { key -> database.transaction { queryByIdentityKey(session, key) } })
override fun getNodesByOwningKeyIndex(identityKeyIndex: String): List<NodeInfo> {
return database.transaction {
@ -187,9 +187,9 @@ open class PersistentNetworkMapCache(
override fun getNodeByAddress(address: NetworkHostAndPort): NodeInfo? = database.transaction { queryByAddress(session, address) }
override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = identityByLegalNameCache.get(name).orElse(null)
override fun getPeerCertificateByLegalName(name: CordaX500Name): PartyAndCertificate? = identityByLegalNameCache.get(name)!!.orElse(null)
private val identityByLegalNameCache = NonInvalidatingCache<CordaX500Name, Optional<PartyAndCertificate>>(1024, 8, { name -> Optional.ofNullable(database.transaction { queryIdentityByLegalName(session, name) }) })
private val identityByLegalNameCache = NonInvalidatingCache<CordaX500Name, Optional<PartyAndCertificate>>(1024, { name -> Optional.ofNullable(database.transaction { queryIdentityByLegalName(session, name) }) })
override fun track(): DataFeed<List<NodeInfo>, MapChange> {
synchronized(_changed) {

View File

@ -11,7 +11,7 @@
package net.corda.node.services.persistence
import com.codahale.metrics.MetricRegistry
import com.google.common.cache.Weigher
import com.github.benmanes.caffeine.cache.Weigher
import com.google.common.hash.HashCode
import com.google.common.hash.Hashing
import com.google.common.hash.HashingInputStream
@ -34,7 +34,6 @@ import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser
import net.corda.node.utilities.NonInvalidatingCache
import net.corda.node.utilities.NonInvalidatingWeightBasedCache
import net.corda.node.utilities.defaultCordaCacheConcurrencyLevel
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.nodeapi.internal.withContractsInJar
@ -219,7 +218,6 @@ class NodeAttachmentService(
private val attachmentContentCache = NonInvalidatingWeightBasedCache<SecureHash, Optional<Pair<Attachment, ByteArray>>>(
maxWeight = attachmentContentCacheSize,
concurrencyLevel = defaultCordaCacheConcurrencyLevel,
weigher = object : Weigher<SecureHash, Optional<Pair<Attachment, ByteArray>>> {
override fun weigh(key: SecureHash, value: Optional<Pair<Attachment, ByteArray>>): Int {
return key.size + if (value.isPresent) value.get().second.size else 0
@ -244,12 +242,11 @@ class NodeAttachmentService(
private val attachmentCache = NonInvalidatingCache<SecureHash, Optional<Attachment>>(
attachmentCacheBound,
defaultCordaCacheConcurrencyLevel,
{ key -> Optional.ofNullable(createAttachment(key)) }
)
private fun createAttachment(key: SecureHash): Attachment? {
val content = attachmentContentCache.get(key)
val content = attachmentContentCache.get(key)!!
if (content.isPresent) {
return content.get().first
}
@ -259,7 +256,7 @@ class NodeAttachmentService(
}
override fun openAttachment(id: SecureHash): Attachment? {
val attachment = attachmentCache.get(id)
val attachment = attachmentCache.get(id)!!
if (attachment.isPresent) {
return attachment.get()
}

View File

@ -10,8 +10,8 @@
package net.corda.node.utilities
import com.google.common.cache.LoadingCache
import com.google.common.cache.Weigher
import com.github.benmanes.caffeine.cache.LoadingCache
import com.github.benmanes.caffeine.cache.Weigher
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.persistence.currentDBSession
import java.util.*
@ -39,7 +39,7 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
* Returns the value associated with the key, first loading that value from the storage if necessary.
*/
operator fun get(key: K): V? {
return cache.get(key).orElse(null)
return cache.get(key)!!.orElse(null)
}
val size get() = allPersisted().toList().size
@ -72,7 +72,7 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
} else {
Optional.of(value)
}
}
}!!
if (!insertionAttempt) {
if (existingInCache.isPresent) {
// Key already exists in cache, do nothing.
@ -81,7 +81,7 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
// This happens when the key was queried before with no value associated. We invalidate the cached null
// value and recursively call set again. This is to avoid race conditions where another thread queries after
// the invalidate but before the set.
cache.invalidate(key)
cache.invalidate(key!!)
return set(key, value, logWarning, store)
}
}
@ -158,7 +158,6 @@ class AppendOnlyPersistentMap<K, V, E, out EK>(
//TODO determine cacheBound based on entity class later or with node config allowing tuning, or using some heuristic based on heap size
override val cache = NonInvalidatingCache<K, Optional<V>>(
bound = cacheBound,
concurrencyLevel = 8,
loadFunction = { key -> Optional.ofNullable(loadValue(key)) })
}
@ -176,7 +175,6 @@ class WeightBasedAppendOnlyPersistentMap<K, V, E, out EK>(
persistentEntityClass) {
override val cache = NonInvalidatingWeightBasedCache<K, Optional<V>>(
maxWeight = maxWeight,
concurrencyLevel = 8,
weigher = object : Weigher<K, Optional<V>> {
override fun weigh(key: K, value: Optional<V>): Int {
return weighingFunc(key, value)

View File

@ -53,7 +53,7 @@ object JVMAgentRegistry {
} else {
(this::class.java.classLoader as? URLClassLoader)
?.urLs
?.map { Paths.get(it.path) }
?.map { Paths.get(it.toURI()) }
?.firstOrNull { it.fileName.toString() == jarFileName }
}
}

View File

@ -10,30 +10,29 @@
package net.corda.node.utilities
import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.google.common.cache.LoadingCache
import com.google.common.cache.Weigher
import com.google.common.util.concurrent.ListenableFuture
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import com.github.benmanes.caffeine.cache.Weigher
class NonInvalidatingCache<K, V> private constructor(
val cache: LoadingCache<K, V>
) : LoadingCache<K, V> by cache {
constructor(bound: Long, concurrencyLevel: Int, loadFunction: (K) -> V) :
this(buildCache(bound, concurrencyLevel, loadFunction))
constructor(bound: Long, loadFunction: (K) -> V) :
this(buildCache(bound, loadFunction))
private companion object {
private fun <K, V> buildCache(bound: Long, concurrencyLevel: Int, loadFunction: (K) -> V): LoadingCache<K, V> {
val builder = CacheBuilder.newBuilder().maximumSize(bound).concurrencyLevel(concurrencyLevel)
private fun <K, V> buildCache(bound: Long, loadFunction: (K) -> V): LoadingCache<K, V> {
val builder = Caffeine.newBuilder().maximumSize(bound)
return builder.build(NonInvalidatingCacheLoader(loadFunction))
}
}
// TODO look into overriding loadAll() if we ever use it
class NonInvalidatingCacheLoader<K, V>(val loadFunction: (K) -> V) : CacheLoader<K, V>() {
override fun reload(key: K, oldValue: V): ListenableFuture<V> {
class NonInvalidatingCacheLoader<K, V>(val loadFunction: (K) -> V) : CacheLoader<K, V> {
override fun reload(key: K, oldValue: V): V {
throw IllegalStateException("Non invalidating cache refreshed")
}
@ -44,16 +43,14 @@ class NonInvalidatingCache<K, V> private constructor(
class NonInvalidatingWeightBasedCache<K, V> private constructor(
val cache: LoadingCache<K, V>
) : LoadingCache<K, V> by cache {
constructor (maxWeight: Long, concurrencyLevel: Int, weigher: Weigher<K, V>, loadFunction: (K) -> V) :
this(buildCache(maxWeight, concurrencyLevel, weigher, loadFunction))
constructor (maxWeight: Long, weigher: Weigher<K, V>, loadFunction: (K) -> V) :
this(buildCache(maxWeight, weigher, loadFunction))
private companion object {
private fun <K, V> buildCache(maxWeight: Long, concurrencyLevel: Int, weigher: Weigher<K, V>, loadFunction: (K) -> V): LoadingCache<K, V> {
val builder = CacheBuilder.newBuilder().maximumWeight(maxWeight).weigher(weigher).concurrencyLevel(concurrencyLevel)
private fun <K, V> buildCache(maxWeight: Long, weigher: Weigher<K, V>, loadFunction: (K) -> V): LoadingCache<K, V> {
val builder = Caffeine.newBuilder().maximumWeight(maxWeight).weigher(weigher)
return builder.build(NonInvalidatingCache.NonInvalidatingCacheLoader(loadFunction))
}
}
}
val defaultCordaCacheConcurrencyLevel: Int = 8
}

View File

@ -10,22 +10,24 @@
package net.corda.node.utilities
import com.google.common.cache.*
import com.google.common.util.concurrent.ListenableFuture
import co.paralleluniverse.common.util.SameThreadExecutor
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import com.github.benmanes.caffeine.cache.RemovalListener
class NonInvalidatingUnboundCache<K, V> private constructor(
val cache: LoadingCache<K, V>
) : LoadingCache<K, V> by cache {
constructor(concurrencyLevel: Int, loadFunction: (K) -> V, removalListener: RemovalListener<K, V> = RemovalListener {},
constructor(loadFunction: (K) -> V, removalListener: RemovalListener<K, V> = RemovalListener { key, value, cause -> },
keysToPreload: () -> Iterable<K> = { emptyList() }) :
this(buildCache(concurrencyLevel, loadFunction, removalListener, keysToPreload))
this(buildCache(loadFunction, removalListener, keysToPreload))
private companion object {
private fun <K, V> buildCache(concurrencyLevel: Int, loadFunction: (K) -> V, removalListener: RemovalListener<K, V>,
private fun <K, V> buildCache(loadFunction: (K) -> V, removalListener: RemovalListener<K, V>,
keysToPreload: () -> Iterable<K>): LoadingCache<K, V> {
val builder = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).removalListener(removalListener)
val builder = Caffeine.newBuilder().removalListener(removalListener).executor(SameThreadExecutor.getExecutor())
return builder.build(NonInvalidatingCacheLoader(loadFunction)).apply {
getAll(keysToPreload())
}
@ -33,8 +35,8 @@ class NonInvalidatingUnboundCache<K, V> private constructor(
}
// TODO look into overriding loadAll() if we ever use it
private class NonInvalidatingCacheLoader<K, V>(val loadFunction: (K) -> V) : CacheLoader<K, V>() {
override fun reload(key: K, oldValue: V): ListenableFuture<V> {
private class NonInvalidatingCacheLoader<K, V>(val loadFunction: (K) -> V) : CacheLoader<K, V> {
override fun reload(key: K, oldValue: V): V {
throw IllegalStateException("Non invalidating cache refreshed")
}

View File

@ -10,9 +10,8 @@
package net.corda.node.utilities
import com.google.common.cache.RemovalCause
import com.google.common.cache.RemovalListener
import com.google.common.cache.RemovalNotification
import com.github.benmanes.caffeine.cache.RemovalCause
import com.github.benmanes.caffeine.cache.RemovalListener
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.persistence.currentDBSession
import java.util.*
@ -20,7 +19,7 @@ import java.util.*
/**
* Implements an unbound caching layer on top of a table accessed via Hibernate mapping.
*/
class PersistentMap<K, V, E, out EK>(
class PersistentMap<K : Any, V, E, out EK>(
val toPersistentEntityKey: (K) -> EK,
val fromPersistentEntity: (E) -> Pair<K, V>,
val toPersistentEntity: (key: K, value: V) -> E,
@ -32,7 +31,6 @@ class PersistentMap<K, V, E, out EK>(
}
private val cache = NonInvalidatingUnboundCache(
concurrencyLevel = 8,
loadFunction = { key -> Optional.ofNullable(loadValue(key)) },
removalListener = ExplicitRemoval(toPersistentEntityKey, persistentEntityClass)
).apply {
@ -44,11 +42,11 @@ class PersistentMap<K, V, E, out EK>(
}
class ExplicitRemoval<K, V, E, EK>(private val toPersistentEntityKey: (K) -> EK, private val persistentEntityClass: Class<E>) : RemovalListener<K, V> {
override fun onRemoval(notification: RemovalNotification<K, V>?) {
when (notification?.cause) {
override fun onRemoval(key: K?, value: V?, cause: RemovalCause) {
when (cause) {
RemovalCause.EXPLICIT -> {
val session = currentDBSession()
val elem = session.find(persistentEntityClass, toPersistentEntityKey(notification.key))
val elem = session.find(persistentEntityClass, toPersistentEntityKey(key!!))
if (elem != null) {
session.remove(elem)
}
@ -63,14 +61,14 @@ class PersistentMap<K, V, E, out EK>(
}
override operator fun get(key: K): V? {
return cache.get(key).orElse(null)
return cache.get(key)!!.orElse(null)
}
fun all(): Sequence<Pair<K, V>> {
return cache.asMap().asSequence().filter { it.value.isPresent }.map { Pair(it.key, it.value.get()) }
}
override val size get() = cache.size().toInt()
override val size get() = cache.estimatedSize().toInt()
private tailrec fun set(key: K, value: V, logWarning: Boolean = true, store: (K, V) -> V?, replace: (K, V) -> Unit): Boolean {
var insertionAttempt = false
@ -82,7 +80,7 @@ class PersistentMap<K, V, E, out EK>(
// Store the value, depending on store implementation this may replace existing entry in DB.
store(key, value)
Optional.of(value)
}
}!!
if (!insertionAttempt) {
if (existingInCache.isPresent) {
// Key already exists in cache, store the new value in the DB (depends on tore implementation) and refresh cache.
@ -175,7 +173,7 @@ class PersistentMap<K, V, E, out EK>(
* Removes the mapping for the specified key from this map and underlying storage if present.
*/
override fun remove(key: K): V? {
val result = cache.get(key).orElse(null)
val result = cache.get(key)!!.orElse(null)
cache.invalidate(key)
return result
}
@ -263,7 +261,7 @@ class PersistentMap<K, V, E, out EK>(
override fun put(key: K, value: V): V? {
val old = cache.get(key)
addWithDuplicatesReplaced(key, value)
return old.orElse(null)
return old!!.orElse(null)
}
fun load() {

View File

@ -170,7 +170,7 @@ object NodeInterestRates {
}
private fun addDefaultFixes() {
knownFixes = parseFile(IOUtils.toString(Thread.currentThread().contextClassLoader.getResourceAsStream("net/corda/irs/simulation/example.rates.txt"), Charsets.UTF_8.name()))
knownFixes = parseFile(IOUtils.toString(this::class.java.classLoader.getResourceAsStream("net/corda/irs/simulation/example.rates.txt"), Charsets.UTF_8.name()))
}
}

View File

@ -142,7 +142,7 @@ open class MockServices private constructor(
val cordappLoader = CordappLoader.createWithTestPackages(cordappPackages)
val dataSourceProps = makeTestDataSourceProperties(initialIdentity.name.organisation)
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
val database = configureDatabase(dataSourceProps, makeTestDatabaseProperties(initialIdentity.name.organisation), identityService, schemaService)
val database = configureDatabase(dataSourceProps, makeTestDatabaseProperties(initialIdentity.name.organisation), identityService, schemaService, cordappLoader.appClassLoader)
val mockService = database.transaction {
object : MockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys) {
override val vaultService: VaultService = makeVaultService(database.hibernateConfig, schemaService)

View File

@ -10,9 +10,9 @@
package net.corda.explorer.identicon
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.google.common.base.Splitter
import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import javafx.scene.SnapshotParameters
import javafx.scene.canvas.Canvas
import javafx.scene.canvas.GraphicsContext
@ -85,7 +85,7 @@ object IdenticonRenderer {
private val renderingSize = 30.0
private val cache = CacheBuilder.newBuilder().build(CacheLoader.from<SecureHash, Image> { key ->
private val cache = Caffeine.newBuilder().build(CacheLoader<SecureHash, Image> { key ->
key?.let { render(key.hashCode(), renderingSize) }
})
@ -102,7 +102,7 @@ object IdenticonRenderer {
}
fun getIdenticon(hash: SecureHash): Image {
return cache.get(hash)
return cache.get(hash)!!
}
/**