diff --git a/docker/src/docker/Dockerfile-debug b/docker/src/docker/Dockerfile-debug new file mode 100644 index 0000000000..5b0c7bbb1f --- /dev/null +++ b/docker/src/docker/Dockerfile-debug @@ -0,0 +1,62 @@ +FROM azul/zulu-openjdk:8u192 + +## Add packages, clean cache, create dirs, create corda user and change ownership +RUN apt-get update && \ + apt-get -y upgrade && \ + apt-get -y install bash curl unzip netstat lsof telnet netcat && \ + rm -rf /var/lib/apt/lists/* && \ + mkdir -p /opt/corda/cordapps && \ + mkdir -p /opt/corda/persistence && \ + mkdir -p /opt/corda/certificates && \ + mkdir -p /opt/corda/drivers && \ + mkdir -p /opt/corda/logs && \ + mkdir -p /opt/corda/bin && \ + mkdir -p /opt/corda/additional-node-infos && \ + mkdir -p /etc/corda && \ + addgroup corda && \ + useradd corda -g corda -m -d /opt/corda && \ + chown -R corda:corda /opt/corda && \ + chown -R corda:corda /etc/corda + +ENV CORDAPPS_FOLDER="/opt/corda/cordapps" \ + PERSISTENCE_FOLDER="/opt/corda/persistence" \ + CERTIFICATES_FOLDER="/opt/corda/certificates" \ + DRIVERS_FOLDER="/opt/corda/drivers" \ + CONFIG_FOLDER="/etc/corda" \ + MY_P2P_PORT=10200 \ + MY_RPC_PORT=10201 \ + MY_RPC_ADMIN_PORT=10202 \ + PATH=$PATH:/opt/corda/bin \ + JVM_ARGS="-XX:+UseG1GC -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap " \ + CORDA_ARGS="" + +##CORDAPPS FOLDER +VOLUME ["/opt/corda/cordapps"] +##PERSISTENCE FOLDER +VOLUME ["/opt/corda/persistence"] +##CERTS FOLDER +VOLUME ["/opt/corda/certificates"] +##OPTIONAL JDBC DRIVERS FOLDER +VOLUME ["/opt/corda/drivers"] +##LOG FOLDER +VOLUME ["/opt/corda/logs"] +##ADDITIONAL NODE INFOS FOLDER +VOLUME ["/opt/corda/additional-node-infos"] +##CONFIG LOCATION +VOLUME ["/etc/corda"] + +##CORDA JAR +COPY --chown=corda:corda corda.jar /opt/corda/bin/corda.jar +##CONFIG MANIPULATOR JAR +COPY --chown=corda:corda config-exporter.jar /opt/corda/config-exporter.jar +##CONFIG GENERATOR SHELL SCRIPT +COPY --chown=corda:corda generate-config.sh /opt/corda/bin/config-generator +##CORDA RUN SCRIPT +COPY --chown=corda:corda run-corda.sh /opt/corda/bin/run-corda +##BASE CONFIG FOR GENERATOR +COPY --chown=corda:corda starting-node.conf /opt/corda/starting-node.conf + +USER "corda" +EXPOSE ${MY_P2P_PORT} ${MY_RPC_PORT} ${MY_RPC_ADMIN_PORT} +WORKDIR /opt/corda +CMD ["run-corda"] diff --git a/docker/src/docker/DockerfileAL-debug b/docker/src/docker/DockerfileAL-debug new file mode 100644 index 0000000000..3cc6a9f0e7 --- /dev/null +++ b/docker/src/docker/DockerfileAL-debug @@ -0,0 +1,66 @@ +FROM amazonlinux:2 + +## Add packages, clean cache, create dirs, create corda user and change ownership +RUN amazon-linux-extras enable corretto8 && \ + yum -y install java-1.8.0-amazon-corretto-devel && \ + yum -y install bash && \ + yum -y install curl && \ + yum -y install unzip && \ + yum -y install lsof telnet net-tools nmap-ncat && \ + yum clean all && \ + rm -rf /var/cache/yum && \ + mkdir -p /opt/corda/cordapps && \ + mkdir -p /opt/corda/persistence && \ + mkdir -p /opt/corda/certificates && \ + mkdir -p /opt/corda/drivers && \ + mkdir -p /opt/corda/logs && \ + mkdir -p /opt/corda/bin && \ + mkdir -p /opt/corda/additional-node-infos && \ + mkdir -p /etc/corda && \ + groupadd corda && \ + useradd corda -g corda -m -d /opt/corda && \ + chown -R corda:corda /opt/corda && \ + chown -R corda:corda /etc/corda + +ENV CORDAPPS_FOLDER="/opt/corda/cordapps" \ + PERSISTENCE_FOLDER="/opt/corda/persistence" \ + CERTIFICATES_FOLDER="/opt/corda/certificates" \ + DRIVERS_FOLDER="/opt/corda/drivers" \ + CONFIG_FOLDER="/etc/corda" \ + MY_P2P_PORT=10200 \ + MY_RPC_PORT=10201 \ + MY_RPC_ADMIN_PORT=10202 \ + PATH=$PATH:/opt/corda/bin \ + JVM_ARGS="-XX:+UseG1GC -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap " \ + CORDA_ARGS="" + +##CORDAPPS FOLDER +VOLUME ["/opt/corda/cordapps"] +##PERSISTENCE FOLDER +VOLUME ["/opt/corda/persistence"] +##CERTS FOLDER +VOLUME ["/opt/corda/certificates"] +##OPTIONAL JDBC DRIVERS FOLDER +VOLUME ["/opt/corda/drivers"] +##LOG FOLDER +VOLUME ["/opt/corda/logs"] +##ADDITIONAL NODE INFOS FOLDER +VOLUME ["/opt/corda/additional-node-infos"] +##CONFIG LOCATION +VOLUME ["/etc/corda"] + +##CORDA JAR +COPY --chown=corda:corda corda.jar /opt/corda/bin/corda.jar +##CONFIG MANIPULATOR JAR +COPY --chown=corda:corda config-exporter.jar /opt/corda/config-exporter.jar +##CONFIG GENERATOR SHELL SCRIPT +COPY --chown=corda:corda generate-config.sh /opt/corda/bin/config-generator +##CORDA RUN SCRIPT +COPY --chown=corda:corda run-corda.sh /opt/corda/bin/run-corda +##BASE CONFIG FOR GENERATOR +COPY --chown=corda:corda starting-node.conf /opt/corda/starting-node.conf + +USER "corda" +EXPOSE ${MY_P2P_PORT} ${MY_RPC_PORT} ${MY_RPC_ADMIN_PORT} +WORKDIR /opt/corda +CMD ["run-corda"] diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 33ae457877..6fa3ed5869 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -261,6 +261,12 @@ interface WritableTransactionStorage : TransactionStorage { * ID exists. */ fun getTransactionInternal(id: SecureHash): Pair? + + /** + * Returns a future that completes with the transaction corresponding to [id] once it has been committed. Do not warn when run inside + * a DB transaction. + */ + fun trackTransactionWithNoWarning(id: SecureHash): CordaFuture } /** diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt index b0e271e642..7084768069 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt @@ -140,6 +140,8 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: val actTx = tx.peekableValue ?: return 0 return actTx.sigs.sumBy { it.size + transactionSignatureOverheadEstimate } + actTx.txBits.size } + + private val log = contextLogger() } private val txStorage = ThreadBox(createTransactionsMap(cacheFactory, clock)) @@ -219,12 +221,24 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: } override fun trackTransaction(id: SecureHash): CordaFuture { + + if (contextTransactionOrNull != null) { + log.warn("trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions.") + } + + return trackTransactionWithNoWarning(id) + } + + override fun trackTransactionWithNoWarning(id: SecureHash): CordaFuture { + val updateFuture = updates.filter { it.id == id }.toFuture() return database.transaction { txStorage.locked { val existingTransaction = getTransaction(id) if (existingTransaction == null) { updates.filter { it.id == id }.toFuture() + updateFuture } else { + updateFuture.cancel(false) doneFuture(existingTransaction) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index 9af0caab1f..882f961876 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -89,7 +89,7 @@ class ActionExecutorImpl( @Suspendable private fun executeTrackTransaction(fiber: FlowFiber, action: Action.TrackTransaction) { - services.validatedTransactions.trackTransaction(action.hash).thenMatch( + services.validatedTransactions.trackTransactionWithNoWarning(action.hash).thenMatch( success = { transaction -> fiber.scheduleEvent(Event.TransactionCommitted(transaction)) }, diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt index 18fa7c40c4..85a99576f2 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt @@ -733,6 +733,12 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) { } } + override fun trackTransactionWithNoWarning(id: SecureHash): CordaFuture { + return database.transaction { + delegate.trackTransactionWithNoWarning(id) + } + } + override fun track(): DataFeed, SignedTransaction> { return database.transaction { delegate.track() diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt index 0dabdd1069..39155a335a 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt @@ -1,6 +1,7 @@ package net.corda.node.services.persistence import junit.framework.TestCase.assertTrue +import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.StateRef import net.corda.core.crypto.Crypto import net.corda.core.crypto.SecureHash @@ -20,14 +21,23 @@ import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.configureDatabase import net.corda.testing.internal.createWireTransaction import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.core.Appender +import org.apache.logging.log4j.core.LoggerContext +import org.apache.logging.log4j.core.appender.WriterAppender import org.assertj.core.api.Assertions.assertThat import org.junit.After +import org.junit.Assert import org.junit.Before import org.junit.Rule import org.junit.Test +import rx.plugins.RxJavaHooks +import java.io.StringWriter +import java.util.concurrent.Semaphore import java.time.Clock import java.time.Instant import java.util.concurrent.TimeUnit +import kotlin.concurrent.thread import kotlin.test.assertEquals class DBTransactionStorageTests { @@ -38,7 +48,7 @@ class DBTransactionStorageTests { @Rule @JvmField - val testSerialization = SerializationEnvironmentRule() + val testSerialization = SerializationEnvironmentRule(inheritable = true) private lateinit var database: CordaPersistence private lateinit var transactionStorage: DBTransactionStorage @@ -313,6 +323,107 @@ class DBTransactionStorageTests { assertTransactionIsRetrievable(secondTransaction) } + @Suppress("UnstableApiUsage") + @Test(timeout=300_000) + fun `race condition - failure path`() { + + // Insert a sleep into trackTransaction + RxJavaHooks.setOnObservableCreate { + Thread.sleep(1_000) + it + } + try { + `race condition - ok path`() + } finally { + // Remove sleep so it does not affect other tests + RxJavaHooks.setOnObservableCreate { it } + } + } + + @Test(timeout=300_000) + fun `race condition - ok path`() { + + // Arrange + + val signedTransaction = newTransaction() + + val threadCount = 2 + val finishedThreadsSemaphore = Semaphore(threadCount) + finishedThreadsSemaphore.acquire(threadCount) + + // Act + + thread(name = "addTransaction") { + transactionStorage.addTransaction(signedTransaction) + finishedThreadsSemaphore.release() + } + + var result: CordaFuture? = null + thread(name = "trackTransaction") { + result = transactionStorage.trackTransaction(signedTransaction.id) + finishedThreadsSemaphore.release() + } + + if (!finishedThreadsSemaphore.tryAcquire(threadCount, 1, TimeUnit.MINUTES)) { + Assert.fail("Threads did not finish") + } + + // Assert + + assertThat(result).isNotNull() + assertThat(result?.get(20, TimeUnit.SECONDS)?.id).isEqualTo(signedTransaction.id) + } + + @Test(timeout=300_000) + fun `race condition - transaction warning`() { + + // Arrange + val signedTransaction = newTransaction() + + // Act + val logMessages = collectLogsFrom { + database.transaction { + val result = transactionStorage.trackTransaction(signedTransaction.id) + result.cancel(false) + } + } + + // Assert + assertThat(logMessages).contains("trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions.") + } + + private fun collectLogsFrom(statement: () -> Unit): String { + // Create test appender + val stringWriter = StringWriter() + val appenderName = this::collectLogsFrom.name + val appender: Appender = WriterAppender.createAppender( + null, + null, + stringWriter, + appenderName, + false, + true + ) + appender.start() + + // Add test appender + val context = LogManager.getContext(false) as LoggerContext + val configuration = context.configuration + configuration.addAppender(appender) + configuration.loggers.values.forEach { it.addAppender(appender, null, null) } + + try { + statement() + } finally { + // Remove test appender + configuration.loggers.values.forEach { it.removeAppender(appenderName) } + configuration.appenders.remove(appenderName) + appender.stop() + } + + return stringWriter.toString() + } + private fun newTransactionStorage(cacheSizeBytesOverride: Long? = null, clock: CordaClock = SimpleClock(Clock.systemUTC())) { transactionStorage = DBTransactionStorage(database, TestingNamedCacheFactory(cacheSizeBytesOverride ?: 1024), clock) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt index d9d090b62f..c1cebf95e1 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt @@ -21,6 +21,10 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali return getTransaction(id)?.let { doneFuture(it) } ?: _updatesPublisher.filter { it.id == id }.toFuture() } + override fun trackTransactionWithNoWarning(id: SecureHash): CordaFuture { + return trackTransaction(id) + } + override fun track(): DataFeed, SignedTransaction> { return DataFeed(txns.values.mapNotNull { if (it.isVerified) it.stx else null }, _updatesPublisher) }