mirror of
https://github.com/corda/corda.git
synced 2024-12-27 08:22:35 +00:00
ENT-1594 Remove usages of Guava cache. Replace with Caffeine. Added whether message is session initiation or not to cache key for de-duplication to account for the 2 P2P consumers. (#557)
This commit is contained in:
parent
247f15ec19
commit
a71ab3f1a1
@ -10,9 +10,8 @@
|
||||
|
||||
package com.r3.corda.networkmanage.doorman.webservice
|
||||
|
||||
import com.google.common.cache.CacheBuilder
|
||||
import com.google.common.cache.CacheLoader
|
||||
import com.google.common.cache.LoadingCache
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache
|
||||
import com.r3.corda.networkmanage.common.persistence.NetworkMapStorage
|
||||
import com.r3.corda.networkmanage.common.persistence.NodeInfoStorage
|
||||
import com.r3.corda.networkmanage.common.utils.SignedNetworkMap
|
||||
@ -51,23 +50,24 @@ class NetworkMapWebService(private val nodeInfoStorage: NodeInfoStorage,
|
||||
const val NETWORK_MAP_PATH = "network-map"
|
||||
}
|
||||
|
||||
private val networkMapCache: LoadingCache<Boolean, CachedData> = CacheBuilder.newBuilder()
|
||||
private val networkMapCache: LoadingCache<Boolean, CachedData> = Caffeine.newBuilder()
|
||||
.expireAfterWrite(config.cacheTimeout, TimeUnit.MILLISECONDS)
|
||||
.build(CacheLoader.from { _ -> networkMapStorage.getCurrentNetworkMap()?.let {
|
||||
.build({ _ ->
|
||||
networkMapStorage.getCurrentNetworkMap()?.let {
|
||||
val networkMap = it.verified()
|
||||
CachedData(it, networkMap.nodeInfoHashes.toSet(), networkMapStorage.getSignedNetworkParameters(networkMap.networkParameterHash)?.verified()) }
|
||||
})
|
||||
|
||||
private val nodeInfoCache: LoadingCache<SecureHash, SignedNodeInfo> = CacheBuilder.newBuilder()
|
||||
private val nodeInfoCache: LoadingCache<SecureHash, SignedNodeInfo> = Caffeine.newBuilder()
|
||||
// TODO: Define cache retention policy.
|
||||
.softValues()
|
||||
.build(CacheLoader.from { key ->
|
||||
.build({ key ->
|
||||
key?.let { nodeInfoStorage.getNodeInfo(it) }
|
||||
})
|
||||
|
||||
private val currentSignedNetworkMap: SignedNetworkMap? get() = networkMapCache.getOrNull(true)?.signedNetworkMap
|
||||
private val currentNodeInfoHashes: Set<SecureHash> get() = networkMapCache.getOrNull(true)?.nodeInfoHashes ?: emptySet()
|
||||
private val currentNetworkParameters: NetworkParameters? get() = networkMapCache.getOrNull(true)?.currentNetworkParameter
|
||||
private val currentSignedNetworkMap: SignedNetworkMap? get() = networkMapCache.get(true)?.signedNetworkMap
|
||||
private val currentNodeInfoHashes: Set<SecureHash> get() = networkMapCache.get(true)?.nodeInfoHashes ?: emptySet()
|
||||
private val currentNetworkParameters: NetworkParameters? get() = networkMapCache.get(true)?.currentNetworkParameter
|
||||
|
||||
@POST
|
||||
@Path("publish")
|
||||
@ -104,7 +104,7 @@ class NetworkMapWebService(private val nodeInfoStorage: NodeInfoStorage,
|
||||
// Only serve node info if its in the current network map, otherwise return 404.
|
||||
logger.trace { "Processing node info request for hash: '$nodeInfoHash'" }
|
||||
val signedNodeInfo = if (SecureHash.parse(nodeInfoHash) in currentNodeInfoHashes) {
|
||||
nodeInfoCache.getOrNull(SecureHash.parse(nodeInfoHash))
|
||||
nodeInfoCache.get(SecureHash.parse(nodeInfoHash))
|
||||
} else {
|
||||
logger.trace { "Requested node info is not current, returning null." }
|
||||
null
|
||||
@ -155,13 +155,4 @@ class NetworkMapWebService(private val nodeInfoStorage: NodeInfoStorage,
|
||||
|
||||
private data class CachedData(val signedNetworkMap: SignedNetworkMap, val nodeInfoHashes: Set<SecureHash>, val currentNetworkParameter: NetworkParameters?)
|
||||
|
||||
// Guava loading cache will throw if value is null, this helper method returns null instead.
|
||||
// The loading cache will load the data from persistence again ignoring timeout if previous value was null.
|
||||
private fun <K : Any, V : Any> LoadingCache<K, V>.getOrNull(key: K): V? {
|
||||
return try {
|
||||
get(key)
|
||||
} catch (e: CacheLoader.InvalidCacheLoadException) {
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -149,7 +149,10 @@ interface ReceivedMessage : Message {
|
||||
val peer: CordaX500Name
|
||||
/** Platform version of the sender's node. */
|
||||
val platformVersion: Int
|
||||
/** UUID representing the sending JVM */
|
||||
val senderSeqNo: Long?
|
||||
/** True if a flow session init message */
|
||||
val isSessionInit: Boolean
|
||||
}
|
||||
|
||||
/** A singleton that's useful for validating topic strings */
|
||||
|
@ -10,7 +10,7 @@
|
||||
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.google.common.cache.CacheBuilder
|
||||
import com.github.benmanes.caffeine.cache.Caffeine
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.node.services.statemachine.DeduplicationId
|
||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||
@ -32,7 +32,7 @@ class P2PMessageDeduplicator(private val database: CordaPersistence) {
|
||||
private val processedMessages = createProcessedMessages()
|
||||
// We add the peer to the key, so other peers cannot attempt malicious meddling with sequence numbers.
|
||||
// Expire after 7 days since we last touched an entry, to avoid infinite growth.
|
||||
private val senderUUIDSeqNoHWM: MutableMap<Pair<String, CordaX500Name>, Long> = CacheBuilder.newBuilder().expireAfterAccess(7, TimeUnit.DAYS).build<Pair<String, CordaX500Name>, Long>().asMap()
|
||||
private val senderUUIDSeqNoHWM: MutableMap<Triple<String, CordaX500Name, Boolean>, Long> = Caffeine.newBuilder().expireAfterAccess(7, TimeUnit.DAYS).build<Triple<String, CordaX500Name, Boolean>, Long>().asMap()
|
||||
|
||||
private fun createProcessedMessages(): AppendOnlyPersistentMap<DeduplicationId, Instant, ProcessedMessage, String> {
|
||||
return AppendOnlyPersistentMap(
|
||||
@ -62,7 +62,7 @@ class P2PMessageDeduplicator(private val database: CordaPersistence) {
|
||||
* We also ensure the UUID cannot be spoofed, by incorporating the authenticated sender into the key of the map/cache.
|
||||
*/
|
||||
private fun isDuplicateWithPotentialOptimization(receivedSenderUUID: String, receivedSenderSeqNo: Long, msg: ReceivedMessage): Boolean {
|
||||
return senderUUIDSeqNoHWM.compute(Pair(receivedSenderUUID, msg.peer)) { key, existingSeqNoHWM ->
|
||||
return senderUUIDSeqNoHWM.compute(Triple(receivedSenderUUID, msg.peer, msg.isSessionInit)) { key, existingSeqNoHWM ->
|
||||
val isNewHWM = (existingSeqNoHWM != null && existingSeqNoHWM < receivedSenderSeqNo)
|
||||
if (isNewHWM) {
|
||||
// If we are the new HWM, set the HWM to us.
|
||||
|
@ -398,9 +398,10 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
val uniqueMessageId = message.required(HDR_DUPLICATE_DETECTION_ID) { DeduplicationId(message.getStringProperty(it)) }
|
||||
val receivedSenderUUID = message.getStringProperty(P2PMessagingHeaders.senderUUID)
|
||||
val receivedSenderSeqNo = if (message.containsProperty(P2PMessagingHeaders.senderSeqNo)) message.getLongProperty(P2PMessagingHeaders.senderSeqNo) else null
|
||||
log.trace { "Received message from: ${message.address} user: $user topic: $topic id: $uniqueMessageId senderUUID: $receivedSenderUUID senderSeqNo: $receivedSenderSeqNo" }
|
||||
val isSessionInit = message.getStringProperty(P2PMessagingHeaders.Type.KEY) == P2PMessagingHeaders.Type.SESSION_INIT_VALUE
|
||||
log.trace { "Received message from: ${message.address} user: $user topic: $topic id: $uniqueMessageId senderUUID: $receivedSenderUUID senderSeqNo: $receivedSenderSeqNo isSessionInit: $isSessionInit" }
|
||||
|
||||
return ArtemisReceivedMessage(topic, CordaX500Name.parse(user), platformVersion, uniqueMessageId, receivedSenderUUID, receivedSenderSeqNo, message)
|
||||
return ArtemisReceivedMessage(topic, CordaX500Name.parse(user), platformVersion, uniqueMessageId, receivedSenderUUID, receivedSenderSeqNo, isSessionInit, message)
|
||||
} catch (e: Exception) {
|
||||
log.error("Unable to process message, ignoring it: $message", e)
|
||||
return null
|
||||
@ -418,6 +419,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
override val uniqueMessageId: DeduplicationId,
|
||||
override val senderUUID: String?,
|
||||
override val senderSeqNo: Long?,
|
||||
override val isSessionInit: Boolean,
|
||||
private val message: ClientMessage) : ReceivedMessage {
|
||||
override val data: ByteSequence by lazy { OpaqueBytes(ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }) }
|
||||
override val debugTimestamp: Instant get() = Instant.ofEpochMilli(message.timestamp)
|
||||
|
@ -28,12 +28,7 @@ import net.corda.core.utilities.ByteSequence
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.messaging.AcknowledgeHandle
|
||||
import net.corda.node.services.messaging.Message
|
||||
import net.corda.node.services.messaging.MessageHandler
|
||||
import net.corda.node.services.messaging.MessageHandlerRegistration
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.ReceivedMessage
|
||||
import net.corda.node.services.messaging.*
|
||||
import net.corda.node.services.statemachine.DeduplicationId
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
@ -307,7 +302,9 @@ class InMemoryMessagingNetwork private constructor(
|
||||
override val debugTimestamp: Instant,
|
||||
override val peer: CordaX500Name,
|
||||
override val senderUUID: String? = null,
|
||||
override val senderSeqNo: Long? = null) : ReceivedMessage {
|
||||
override val senderSeqNo: Long? = null,
|
||||
/** Note this flag is never set in the in memory network. */
|
||||
override val isSessionInit: Boolean = false) : ReceivedMessage {
|
||||
|
||||
override val additionalHeaders: Map<String, String> = emptyMap()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user