diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index 7084768069..165e2c1f75 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -95,7 +95,9 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: } } - private companion object { + internal companion object { + const val TRANSACTION_ALREADY_IN_PROGRESS_WARNING = "trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions." + // Rough estimate for the average of a public key and the transaction metadata - hard to get exact figures here, // as public keys can vary in size a lot, and if someone else is holding a reference to the key, it won't add // to the memory pressure at all here. @@ -111,7 +113,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: } } - fun createTransactionsMap(cacheFactory: NamedCacheFactory, clock: CordaClock) + private fun createTransactionsMap(cacheFactory: NamedCacheFactory, clock: CordaClock) : AppendOnlyPersistentMapBase { return WeightBasedAppendOnlyPersistentMap( cacheFactory = cacheFactory, @@ -221,12 +223,22 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: } override fun trackTransaction(id: SecureHash): CordaFuture { + val (transaction, warning) = trackTransactionInternal(id) + warning?.also { log.warn(it) } + return transaction + } - if (contextTransactionOrNull != null) { - log.warn("trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions.") + /** + * @return a pair of the signed transaction, and a string containing any warning. + */ + internal fun trackTransactionInternal(id: SecureHash): Pair, String?> { + val warning: String? = if (contextTransactionOrNull != null) { + TRANSACTION_ALREADY_IN_PROGRESS_WARNING + } else { + null } - return trackTransactionWithNoWarning(id) + return Pair(trackTransactionWithNoWarning(id), warning) } override fun trackTransactionWithNoWarning(id: SecureHash): CordaFuture { diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt index 39155a335a..51b400c321 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt @@ -9,22 +9,22 @@ import net.corda.core.crypto.SignatureMetadata import net.corda.core.crypto.TransactionSignature import net.corda.core.toFuture import net.corda.core.transactions.SignedTransaction -import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.CordaClock import net.corda.node.MutableClock import net.corda.node.SimpleClock +import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig -import net.corda.testing.core.* +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.core.TestIdentity +import net.corda.testing.core.dummyCommand import net.corda.testing.internal.LogHelper import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.configureDatabase import net.corda.testing.internal.createWireTransaction import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties -import org.apache.logging.log4j.LogManager -import org.apache.logging.log4j.core.Appender -import org.apache.logging.log4j.core.LoggerContext -import org.apache.logging.log4j.core.appender.WriterAppender import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Assert @@ -32,10 +32,9 @@ import org.junit.Before import org.junit.Rule import org.junit.Test import rx.plugins.RxJavaHooks -import java.io.StringWriter -import java.util.concurrent.Semaphore import java.time.Clock import java.time.Instant +import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit import kotlin.concurrent.thread import kotlin.test.assertEquals @@ -381,47 +380,14 @@ class DBTransactionStorageTests { val signedTransaction = newTransaction() // Act - val logMessages = collectLogsFrom { - database.transaction { - val result = transactionStorage.trackTransaction(signedTransaction.id) - result.cancel(false) - } + val warning = database.transaction { + val (result, warning) = transactionStorage.trackTransactionInternal(signedTransaction.id) + result.cancel(false) + warning } // Assert - assertThat(logMessages).contains("trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions.") - } - - private fun collectLogsFrom(statement: () -> Unit): String { - // Create test appender - val stringWriter = StringWriter() - val appenderName = this::collectLogsFrom.name - val appender: Appender = WriterAppender.createAppender( - null, - null, - stringWriter, - appenderName, - false, - true - ) - appender.start() - - // Add test appender - val context = LogManager.getContext(false) as LoggerContext - val configuration = context.configuration - configuration.addAppender(appender) - configuration.loggers.values.forEach { it.addAppender(appender, null, null) } - - try { - statement() - } finally { - // Remove test appender - configuration.loggers.values.forEach { it.removeAppender(appenderName) } - configuration.appenders.remove(appenderName) - appender.stop() - } - - return stringWriter.toString() + assertThat(warning).isEqualTo(DBTransactionStorage.TRANSACTION_ALREADY_IN_PROGRESS_WARNING) } private fun newTransactionStorage(cacheSizeBytesOverride: Long? = null, clock: CordaClock = SimpleClock(Clock.systemUTC())) {