diff --git a/network-management/src/main/kotlin/com/r3/corda/networkmanage/doorman/webservice/NetworkMapWebService.kt b/network-management/src/main/kotlin/com/r3/corda/networkmanage/doorman/webservice/NetworkMapWebService.kt index 6d76ec7493..18afb91cd5 100644 --- a/network-management/src/main/kotlin/com/r3/corda/networkmanage/doorman/webservice/NetworkMapWebService.kt +++ b/network-management/src/main/kotlin/com/r3/corda/networkmanage/doorman/webservice/NetworkMapWebService.kt @@ -10,9 +10,8 @@ package com.r3.corda.networkmanage.doorman.webservice -import com.google.common.cache.CacheBuilder -import com.google.common.cache.CacheLoader -import com.google.common.cache.LoadingCache +import com.github.benmanes.caffeine.cache.Caffeine +import com.github.benmanes.caffeine.cache.LoadingCache import com.r3.corda.networkmanage.common.persistence.NetworkMapStorage import com.r3.corda.networkmanage.common.persistence.NodeInfoStorage import com.r3.corda.networkmanage.common.utils.SignedNetworkMap @@ -51,23 +50,24 @@ class NetworkMapWebService(private val nodeInfoStorage: NodeInfoStorage, const val NETWORK_MAP_PATH = "network-map" } - private val networkMapCache: LoadingCache = CacheBuilder.newBuilder() + private val networkMapCache: LoadingCache = Caffeine.newBuilder() .expireAfterWrite(config.cacheTimeout, TimeUnit.MILLISECONDS) - .build(CacheLoader.from { _ -> networkMapStorage.getCurrentNetworkMap()?.let { + .build({ _ -> + networkMapStorage.getCurrentNetworkMap()?.let { val networkMap = it.verified() CachedData(it, networkMap.nodeInfoHashes.toSet(), networkMapStorage.getSignedNetworkParameters(networkMap.networkParameterHash)?.verified()) } }) - private val nodeInfoCache: LoadingCache = CacheBuilder.newBuilder() + private val nodeInfoCache: LoadingCache = Caffeine.newBuilder() // TODO: Define cache retention policy. .softValues() - .build(CacheLoader.from { key -> + .build({ key -> key?.let { nodeInfoStorage.getNodeInfo(it) } }) - private val currentSignedNetworkMap: SignedNetworkMap? get() = networkMapCache.getOrNull(true)?.signedNetworkMap - private val currentNodeInfoHashes: Set get() = networkMapCache.getOrNull(true)?.nodeInfoHashes ?: emptySet() - private val currentNetworkParameters: NetworkParameters? get() = networkMapCache.getOrNull(true)?.currentNetworkParameter + private val currentSignedNetworkMap: SignedNetworkMap? get() = networkMapCache.get(true)?.signedNetworkMap + private val currentNodeInfoHashes: Set get() = networkMapCache.get(true)?.nodeInfoHashes ?: emptySet() + private val currentNetworkParameters: NetworkParameters? get() = networkMapCache.get(true)?.currentNetworkParameter @POST @Path("publish") @@ -104,7 +104,7 @@ class NetworkMapWebService(private val nodeInfoStorage: NodeInfoStorage, // Only serve node info if its in the current network map, otherwise return 404. logger.trace { "Processing node info request for hash: '$nodeInfoHash'" } val signedNodeInfo = if (SecureHash.parse(nodeInfoHash) in currentNodeInfoHashes) { - nodeInfoCache.getOrNull(SecureHash.parse(nodeInfoHash)) + nodeInfoCache.get(SecureHash.parse(nodeInfoHash)) } else { logger.trace { "Requested node info is not current, returning null." } null @@ -155,13 +155,4 @@ class NetworkMapWebService(private val nodeInfoStorage: NodeInfoStorage, private data class CachedData(val signedNetworkMap: SignedNetworkMap, val nodeInfoHashes: Set, val currentNetworkParameter: NetworkParameters?) - // Guava loading cache will throw if value is null, this helper method returns null instead. - // The loading cache will load the data from persistence again ignoring timeout if previous value was null. - private fun LoadingCache.getOrNull(key: K): V? { - return try { - get(key) - } catch (e: CacheLoader.InvalidCacheLoadException) { - null - } - } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt index 46913bcdc4..63a87f013d 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt @@ -149,7 +149,10 @@ interface ReceivedMessage : Message { val peer: CordaX500Name /** Platform version of the sender's node. */ val platformVersion: Int + /** UUID representing the sending JVM */ val senderSeqNo: Long? + /** True if a flow session init message */ + val isSessionInit: Boolean } /** A singleton that's useful for validating topic strings */ diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt index f4cdbef670..89f8a69d3e 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt @@ -10,7 +10,7 @@ package net.corda.node.services.messaging -import com.google.common.cache.CacheBuilder +import com.github.benmanes.caffeine.cache.Caffeine import net.corda.core.identity.CordaX500Name import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.utilities.AppendOnlyPersistentMap @@ -32,7 +32,7 @@ class P2PMessageDeduplicator(private val database: CordaPersistence) { private val processedMessages = createProcessedMessages() // We add the peer to the key, so other peers cannot attempt malicious meddling with sequence numbers. // Expire after 7 days since we last touched an entry, to avoid infinite growth. - private val senderUUIDSeqNoHWM: MutableMap, Long> = CacheBuilder.newBuilder().expireAfterAccess(7, TimeUnit.DAYS).build, Long>().asMap() + private val senderUUIDSeqNoHWM: MutableMap, Long> = Caffeine.newBuilder().expireAfterAccess(7, TimeUnit.DAYS).build, Long>().asMap() private fun createProcessedMessages(): AppendOnlyPersistentMap { return AppendOnlyPersistentMap( @@ -62,7 +62,7 @@ class P2PMessageDeduplicator(private val database: CordaPersistence) { * We also ensure the UUID cannot be spoofed, by incorporating the authenticated sender into the key of the map/cache. */ private fun isDuplicateWithPotentialOptimization(receivedSenderUUID: String, receivedSenderSeqNo: Long, msg: ReceivedMessage): Boolean { - return senderUUIDSeqNoHWM.compute(Pair(receivedSenderUUID, msg.peer)) { key, existingSeqNoHWM -> + return senderUUIDSeqNoHWM.compute(Triple(receivedSenderUUID, msg.peer, msg.isSessionInit)) { key, existingSeqNoHWM -> val isNewHWM = (existingSeqNoHWM != null && existingSeqNoHWM < receivedSenderSeqNo) if (isNewHWM) { // If we are the new HWM, set the HWM to us. diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 97e7eca293..354b2edd95 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -398,9 +398,10 @@ class P2PMessagingClient(val config: NodeConfiguration, val uniqueMessageId = message.required(HDR_DUPLICATE_DETECTION_ID) { DeduplicationId(message.getStringProperty(it)) } val receivedSenderUUID = message.getStringProperty(P2PMessagingHeaders.senderUUID) val receivedSenderSeqNo = if (message.containsProperty(P2PMessagingHeaders.senderSeqNo)) message.getLongProperty(P2PMessagingHeaders.senderSeqNo) else null - log.trace { "Received message from: ${message.address} user: $user topic: $topic id: $uniqueMessageId senderUUID: $receivedSenderUUID senderSeqNo: $receivedSenderSeqNo" } + val isSessionInit = message.getStringProperty(P2PMessagingHeaders.Type.KEY) == P2PMessagingHeaders.Type.SESSION_INIT_VALUE + log.trace { "Received message from: ${message.address} user: $user topic: $topic id: $uniqueMessageId senderUUID: $receivedSenderUUID senderSeqNo: $receivedSenderSeqNo isSessionInit: $isSessionInit" } - return ArtemisReceivedMessage(topic, CordaX500Name.parse(user), platformVersion, uniqueMessageId, receivedSenderUUID, receivedSenderSeqNo, message) + return ArtemisReceivedMessage(topic, CordaX500Name.parse(user), platformVersion, uniqueMessageId, receivedSenderUUID, receivedSenderSeqNo, isSessionInit, message) } catch (e: Exception) { log.error("Unable to process message, ignoring it: $message", e) return null @@ -418,6 +419,7 @@ class P2PMessagingClient(val config: NodeConfiguration, override val uniqueMessageId: DeduplicationId, override val senderUUID: String?, override val senderSeqNo: Long?, + override val isSessionInit: Boolean, private val message: ClientMessage) : ReceivedMessage { override val data: ByteSequence by lazy { OpaqueBytes(ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }) } override val debugTimestamp: Instant get() = Instant.ofEpochMilli(message.timestamp) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt index 19b72b288b..883c29c1ea 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt @@ -28,12 +28,7 @@ import net.corda.core.utilities.ByteSequence import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.trace -import net.corda.node.services.messaging.AcknowledgeHandle -import net.corda.node.services.messaging.Message -import net.corda.node.services.messaging.MessageHandler -import net.corda.node.services.messaging.MessageHandlerRegistration -import net.corda.node.services.messaging.MessagingService -import net.corda.node.services.messaging.ReceivedMessage +import net.corda.node.services.messaging.* import net.corda.node.services.statemachine.DeduplicationId import net.corda.node.utilities.AffinityExecutor import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -307,7 +302,9 @@ class InMemoryMessagingNetwork private constructor( override val debugTimestamp: Instant, override val peer: CordaX500Name, override val senderUUID: String? = null, - override val senderSeqNo: Long? = null) : ReceivedMessage { + override val senderSeqNo: Long? = null, + /** Note this flag is never set in the in memory network. */ + override val isSessionInit: Boolean = false) : ReceivedMessage { override val additionalHeaders: Map = emptyMap() }