diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt index 4a63927fad..bb95590de6 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt @@ -24,6 +24,10 @@ class P2PMessageDeduplicator(cacheFactory: NamedCacheFactory, private val databa private val beingProcessedMessages = ConcurrentHashMap() private val processedMessages = createProcessedMessages(cacheFactory) + enum class Outcome { + NEW, DUPLICATE, IN_FLIGHT + } + private fun createProcessedMessages(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap { return AppendOnlyPersistentMap( cacheFactory = cacheFactory, @@ -48,15 +52,17 @@ class P2PMessageDeduplicator(cacheFactory: NamedCacheFactory, private val databa private fun senderHash(senderKey: SenderKey) = SecureHash.sha256(senderKey.peer.toString() + senderKey.isSessionInit.toString() + senderKey.senderUUID).toString() /** - * @return true if we have seen this message before. + * @return IN_FLIGHT if this message is currently being processed by the state machine, otherwise indicate if DUPLICATE or NEW. */ - fun isDuplicate(msg: ReceivedMessage): Boolean { + fun checkDuplicate(msg: ReceivedMessage): Outcome { if (beingProcessedMessages.containsKey(msg.uniqueMessageId)) { - return true + return Outcome.IN_FLIGHT } - return isDuplicateInDatabase(msg) + return booleanToEnum(isDuplicateInDatabase(msg)) } + private fun booleanToEnum(isDuplicate: Boolean): Outcome = if (isDuplicate) Outcome.DUPLICATE else Outcome.NEW + /** * Called the first time we encounter [deduplicationId]. */ diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 7e0aa7dd02..ce0fcfc9f8 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -18,7 +18,15 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.serialization.serialize -import net.corda.core.utilities.* +import net.corda.core.utilities.ByteSequence +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug +import net.corda.core.utilities.loggerFor +import net.corda.core.utilities.minutes +import net.corda.core.utilities.seconds +import net.corda.core.utilities.trace import net.corda.node.VersionInfo import net.corda.node.internal.LifecycleSupport import net.corda.node.internal.artemis.ReactiveArtemisConsumer @@ -31,12 +39,15 @@ import net.corda.node.services.statemachine.SenderDeduplicationId import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.errorAndTerminate import net.corda.nodeapi.internal.ArtemisMessagingComponent -import net.corda.nodeapi.internal.ArtemisMessagingComponent.* +import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX +import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry @@ -65,7 +76,9 @@ import rx.subjects.PublishSubject import java.security.PublicKey import java.time.Duration import java.time.Instant -import java.util.* +import java.util.Collections +import java.util.Timer +import java.util.UUID import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CountDownLatch import javax.annotation.concurrent.ThreadSafe @@ -440,12 +453,15 @@ class P2PMessagingClient(val config: NodeConfiguration, internal fun deliver(artemisMessage: ClientMessage) { artemisToCordaMessage(artemisMessage)?.let { cordaMessage -> - if (!deduplicator.isDuplicate(cordaMessage)) { + val outcome = deduplicator.checkDuplicate(cordaMessage) + if (outcome == P2PMessageDeduplicator.Outcome.NEW) { deduplicator.signalMessageProcessStart(cordaMessage) deliver(cordaMessage, artemisMessage) - } else { - log.trace { "Discard duplicate message ${cordaMessage.uniqueMessageId} for ${cordaMessage.topic}" } + } else if (outcome == P2PMessageDeduplicator.Outcome.DUPLICATE) { + log.debug { "Acknowledge duplicate message id: ${cordaMessage.uniqueMessageId} senderUUID: ${cordaMessage.senderUUID} senderSeqNo: ${cordaMessage.senderSeqNo} isSessionInit: ${cordaMessage.isSessionInit}" } messagingExecutor!!.acknowledge(artemisMessage) + } else { + log.debug { "Discard in-flight message id: ${cordaMessage.uniqueMessageId} senderUUID: ${cordaMessage.senderUUID} senderSeqNo: ${cordaMessage.senderSeqNo} isSessionInit: ${cordaMessage.isSessionInit}" } } } } diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index aeeea1dba8..24046f2941 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -101,7 +101,9 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: // Rough estimate for the average of a public key and the transaction metadata - hard to get exact figures here, // as public keys can vary in size a lot, and if someone else is holding a reference to the key, it won't add // to the memory pressure at all here. - private const val transactionSignatureOverheadEstimate = 1024 + private const val TRANSACTION_SIGNATURE_OVERHEAD_BYTES = 1024 + private const val TXCACHEVALUE_OVERHEAD_BYTES = 80 + private const val SECUREHASH_OVERHEAD_BYTES = 24 private val logger = contextLogger() @@ -134,13 +136,13 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: ) }, persistentEntityClass = DBTransaction::class.java, - weighingFunc = { hash, tx -> hash.size + weighTx(tx) } + weighingFunc = { hash, tx -> SECUREHASH_OVERHEAD_BYTES + hash.size + weighTx(tx) } ) } - private fun weighTx(tx: AppendOnlyPersistentMapBase.Transactional): Int { - val actTx = tx.peekableValue ?: return 0 - return actTx.sigs.sumBy { it.size + transactionSignatureOverheadEstimate } + actTx.txBits.size + private fun weighTx(actTx: TxCacheValue?): Int { + if (actTx == null) return 0 + return TXCACHEVALUE_OVERHEAD_BYTES + actTx.sigs.sumBy { it.size + TRANSACTION_SIGNATURE_OVERHEAD_BYTES } + actTx.txBits.size } private val log = contextLogger() diff --git a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt index f45ddbb7cf..570172fa06 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt @@ -32,8 +32,10 @@ abstract class AppendOnlyPersistentMapBase( private val log = contextLogger() } + protected class PendingKeyValue(val transactions: MutableSet, val estimatedSize: Int) + protected abstract val cache: LoadingCache> - protected val pendingKeys = ConcurrentHashMap>() + protected val pendingKeys = ConcurrentHashMap() /** * Returns the value associated with the key, first loading that value from the storage if necessary. @@ -85,7 +87,8 @@ abstract class AppendOnlyPersistentMapBase( // for cases where the value passed to set differs from that in the cache, but an update function has decided that this // differing value should not be written to the database. if (wasWritten) { - Transactional.InFlight(this, key, _readerValueLoader = { loadValue(key) }).apply { alsoWrite(value) } + Transactional.InFlight(this, key, weight = weight(key, value), _readerValueLoader = { loadValue(key) }) + .apply { alsoWrite(value) } } else { oldValueInCache } @@ -120,7 +123,8 @@ abstract class AppendOnlyPersistentMapBase( Transactional.Committed(oldValue) } else { // Some database transactions, including us, writing, with readers seeing whatever is in the database and writers seeing the (in memory) value. - Transactional.InFlight(this, key, _readerValueLoader = { loadValue(key) }).apply { alsoWrite(value) } + Transactional.InFlight(this, key, weight = weight(key, value), _readerValueLoader = { loadValue(key) }) + .apply { alsoWrite(value) } } } @@ -214,11 +218,12 @@ abstract class AppendOnlyPersistentMapBase( protected fun transactionalLoadValue(key: K): Transactional { // This gets called if a value is read and the cache has no Transactional for this key yet. - return if (anyoneWriting(key)) { + val estimatedSize = anyoneWriting(key) + return if (estimatedSize != -1) { // If someone is writing (but not us) // For those not writing, they need to re-load the value from the database (which their database transaction MIGHT see). // For those writing, they need to re-load the value from the database (which their database transaction CAN see). - Transactional.InFlight(this, key, { loadValue(key) }, { loadValue(key)!! }) + Transactional.InFlight(this, key, estimatedSize, { loadValue(key) }, { loadValue(key)!! }) } else { // If no one is writing, then the value may or may not exist in the database. Transactional.Unknown(this, key) { loadValue(key) } @@ -240,21 +245,24 @@ abstract class AppendOnlyPersistentMapBase( } // Helpers to know if transaction(s) are currently writing the given key. - private fun weAreWriting(key: K): Boolean = pendingKeys[key]?.contains(contextTransaction) ?: false + private fun weAreWriting(key: K): Boolean = pendingKeys[key]?.transactions?.contains(contextTransaction) ?: false - private fun anyoneWriting(key: K): Boolean = pendingKeys[key]?.isNotEmpty() ?: false + private fun anyoneWriting(key: K): Int = pendingKeys[key]?.estimatedSize ?: -1 + + protected open fun weight(key: K, value: V): Int = 1 // Indicate this database transaction is a writer of this key. - private fun addPendingKey(key: K, databaseTransaction: DatabaseTransaction): Boolean { + private fun addPendingKey(key: K, databaseTransaction: DatabaseTransaction, estimatedSize: Int): Boolean { var added = true - pendingKeys.compute(key) { _, oldSet -> + pendingKeys.compute(key) { _, value: PendingKeyValue? -> + val oldSet = value?.transactions if (oldSet == null) { val newSet = HashSet(0) newSet += databaseTransaction - newSet + PendingKeyValue(newSet, estimatedSize) } else { added = oldSet.add(databaseTransaction) - oldSet + value } } return added @@ -262,12 +270,13 @@ abstract class AppendOnlyPersistentMapBase( // Remove this database transaction as a writer of this key, because the transaction committed or rolled back. private fun removePendingKey(key: K, databaseTransaction: DatabaseTransaction) { - pendingKeys.compute(key) { _, oldSet -> + pendingKeys.compute(key) { _, value: PendingKeyValue? -> + val oldSet = value?.transactions if (oldSet == null) { - oldSet + null } else { oldSet -= databaseTransaction - if (oldSet.size == 0) null else oldSet + if (oldSet.size == 0) null else value } } } @@ -278,10 +287,12 @@ abstract class AppendOnlyPersistentMapBase( * There are 3 states. Globally missing, globally visible, and being written in a transaction somewhere now or in * the past (and it rolled back). */ + @Suppress("MagicNumber") sealed class Transactional { abstract val value: T abstract val isPresent: Boolean abstract val peekableValue: T? + abstract val shallowSize: Int fun orElse(alt: T?) = if (isPresent) value else alt @@ -291,6 +302,8 @@ abstract class AppendOnlyPersistentMapBase( get() = true override val peekableValue: T? get() = value + override val shallowSize: Int + get() = 48 } // No one can see it. @@ -301,6 +314,8 @@ abstract class AppendOnlyPersistentMapBase( get() = false override val peekableValue: T? get() = null + override val shallowSize: Int + get() = 16 } // No one is writing, but we haven't looked in the database yet. This can only be when there are no writers. @@ -323,12 +338,15 @@ abstract class AppendOnlyPersistentMapBase( } val isResolved: Boolean get() = valueWithoutIsolationDelegate.isInitialized() override val peekableValue: T? get() = if (isResolved && isPresent) value else null + override val shallowSize: Int + get() = 128 } // Written in a transaction (uncommitted) somewhere, but there's a small window when this might be seen after commit, // hence the committed flag. class InFlight(private val map: AppendOnlyPersistentMapBase, private val key: K, + val weight: Int, private val _readerValueLoader: () -> T?, private val _writerValueLoader: () -> T = { throw IllegalAccessException("No value loader provided") }) : Transactional() { @@ -352,7 +370,7 @@ abstract class AppendOnlyPersistentMapBase( val tx = contextTransaction val strongKey = key val strongMap = map - if (map.addPendingKey(key, tx)) { + if (map.addPendingKey(key, tx, weight)) { // If the transaction commits, update cache to make globally visible if we're first for this key, // and then stop saying the transaction is writing the key. tx.onCommit { @@ -414,6 +432,9 @@ abstract class AppendOnlyPersistentMapBase( // The value from the perspective of the eviction algorithm of the cache. i.e. we want to reveal memory footprint to it etc. override val peekableValue: T? get() = if (writerValueLoader.get() != _writerValueLoader) writerValueLoader.get()() else if (readerValueLoader.get() != _readerValueLoader) readerValueLoader.get()() else null + + override val shallowSize: Int + get() = 256 } } } @@ -445,15 +466,24 @@ class WeightBasedAppendOnlyPersistentMap( fromPersistentEntity: (E) -> Pair, toPersistentEntity: (key: K, value: V) -> E, persistentEntityClass: Class, - weighingFunc: (K, Transactional) -> Int + private val weighingFunc: (K, V?) -> Int ) : AppendOnlyPersistentMapBase( toPersistentEntityKey, fromPersistentEntity, toPersistentEntity, persistentEntityClass) { + + override fun weight(key: K, value: V): Int = weighingFunc(key, value) + override val cache = NonInvalidatingWeightBasedCache( cacheFactory = cacheFactory, name = name, - weigher = Weigher { key, value -> weighingFunc(key, value) }, + weigher = Weigher { key, value: Transactional -> + value.shallowSize + if (value is Transactional.InFlight<*, *>) { + value.weight * 2 + } else { + weighingFunc(key, value.peekableValue) + } + }, loadFunction = { key: K -> transactionalLoadValue(key) }) }