Merge remote-tracking branch 'origin/release/os/4.4' into rni/notick/os-44-45-db

This commit is contained in:
Ross Nicoll 2020-09-01 16:06:03 +01:00
commit aa1b74d510
2 changed files with 29 additions and 51 deletions

View File

@ -95,7 +95,9 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
} }
} }
private companion object { internal companion object {
const val TRANSACTION_ALREADY_IN_PROGRESS_WARNING = "trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions."
// Rough estimate for the average of a public key and the transaction metadata - hard to get exact figures here, // Rough estimate for the average of a public key and the transaction metadata - hard to get exact figures here,
// as public keys can vary in size a lot, and if someone else is holding a reference to the key, it won't add // as public keys can vary in size a lot, and if someone else is holding a reference to the key, it won't add
// to the memory pressure at all here. // to the memory pressure at all here.
@ -111,7 +113,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
} }
} }
fun createTransactionsMap(cacheFactory: NamedCacheFactory, clock: CordaClock) private fun createTransactionsMap(cacheFactory: NamedCacheFactory, clock: CordaClock)
: AppendOnlyPersistentMapBase<SecureHash, TxCacheValue, DBTransaction, String> { : AppendOnlyPersistentMapBase<SecureHash, TxCacheValue, DBTransaction, String> {
return WeightBasedAppendOnlyPersistentMap<SecureHash, TxCacheValue, DBTransaction, String>( return WeightBasedAppendOnlyPersistentMap<SecureHash, TxCacheValue, DBTransaction, String>(
cacheFactory = cacheFactory, cacheFactory = cacheFactory,
@ -221,12 +223,22 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
} }
override fun trackTransaction(id: SecureHash): CordaFuture<SignedTransaction> { override fun trackTransaction(id: SecureHash): CordaFuture<SignedTransaction> {
val (transaction, warning) = trackTransactionInternal(id)
warning?.also { log.warn(it) }
return transaction
}
if (contextTransactionOrNull != null) { /**
log.warn("trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions.") * @return a pair of the signed transaction, and a string containing any warning.
*/
internal fun trackTransactionInternal(id: SecureHash): Pair<CordaFuture<SignedTransaction>, String?> {
val warning: String? = if (contextTransactionOrNull != null) {
TRANSACTION_ALREADY_IN_PROGRESS_WARNING
} else {
null
} }
return trackTransactionWithNoWarning(id) return Pair(trackTransactionWithNoWarning(id), warning)
} }
override fun trackTransactionWithNoWarning(id: SecureHash): CordaFuture<SignedTransaction> { override fun trackTransactionWithNoWarning(id: SecureHash): CordaFuture<SignedTransaction> {

View File

@ -9,22 +9,22 @@ import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.TransactionSignature import net.corda.core.crypto.TransactionSignature
import net.corda.core.toFuture import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.CordaClock import net.corda.node.CordaClock
import net.corda.node.MutableClock import net.corda.node.MutableClock
import net.corda.node.SimpleClock import net.corda.node.SimpleClock
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.* import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.core.dummyCommand
import net.corda.testing.internal.LogHelper import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase import net.corda.testing.internal.configureDatabase
import net.corda.testing.internal.createWireTransaction import net.corda.testing.internal.createWireTransaction
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.core.Appender
import org.apache.logging.log4j.core.LoggerContext
import org.apache.logging.log4j.core.appender.WriterAppender
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.After import org.junit.After
import org.junit.Assert import org.junit.Assert
@ -32,10 +32,9 @@ import org.junit.Before
import org.junit.Rule import org.junit.Rule
import org.junit.Test import org.junit.Test
import rx.plugins.RxJavaHooks import rx.plugins.RxJavaHooks
import java.io.StringWriter
import java.util.concurrent.Semaphore
import java.time.Clock import java.time.Clock
import java.time.Instant import java.time.Instant
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread import kotlin.concurrent.thread
import kotlin.test.assertEquals import kotlin.test.assertEquals
@ -381,47 +380,14 @@ class DBTransactionStorageTests {
val signedTransaction = newTransaction() val signedTransaction = newTransaction()
// Act // Act
val logMessages = collectLogsFrom { val warning = database.transaction {
database.transaction { val (result, warning) = transactionStorage.trackTransactionInternal(signedTransaction.id)
val result = transactionStorage.trackTransaction(signedTransaction.id) result.cancel(false)
result.cancel(false) warning
}
} }
// Assert // Assert
assertThat(logMessages).contains("trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions.") assertThat(warning).isEqualTo(DBTransactionStorage.TRANSACTION_ALREADY_IN_PROGRESS_WARNING)
}
private fun collectLogsFrom(statement: () -> Unit): String {
// Create test appender
val stringWriter = StringWriter()
val appenderName = this::collectLogsFrom.name
val appender: Appender = WriterAppender.createAppender(
null,
null,
stringWriter,
appenderName,
false,
true
)
appender.start()
// Add test appender
val context = LogManager.getContext(false) as LoggerContext
val configuration = context.configuration
configuration.addAppender(appender)
configuration.loggers.values.forEach { it.addAppender(appender, null, null) }
try {
statement()
} finally {
// Remove test appender
configuration.loggers.values.forEach { it.removeAppender(appenderName) }
configuration.appenders.remove(appenderName)
appender.stop()
}
return stringWriter.toString()
} }
private fun newTransactionStorage(cacheSizeBytesOverride: Long? = null, clock: CordaClock = SimpleClock(Clock.systemUTC())) { private fun newTransactionStorage(cacheSizeBytesOverride: Long? = null, clock: CordaClock = SimpleClock(Clock.systemUTC())) {