Merge branch 'release/os/4.4' into dan/os-4.4-to-4.5-merge-2020-03-26

This commit is contained in:
LankyDan 2020-03-26 14:26:41 +00:00
commit d9c1907c88
8 changed files with 271 additions and 2 deletions

View File

@ -0,0 +1,62 @@
FROM azul/zulu-openjdk:8u192
## Add packages, clean cache, create dirs, create corda user and change ownership
RUN apt-get update && \
apt-get -y upgrade && \
apt-get -y install bash curl unzip netstat lsof telnet netcat && \
rm -rf /var/lib/apt/lists/* && \
mkdir -p /opt/corda/cordapps && \
mkdir -p /opt/corda/persistence && \
mkdir -p /opt/corda/certificates && \
mkdir -p /opt/corda/drivers && \
mkdir -p /opt/corda/logs && \
mkdir -p /opt/corda/bin && \
mkdir -p /opt/corda/additional-node-infos && \
mkdir -p /etc/corda && \
addgroup corda && \
useradd corda -g corda -m -d /opt/corda && \
chown -R corda:corda /opt/corda && \
chown -R corda:corda /etc/corda
ENV CORDAPPS_FOLDER="/opt/corda/cordapps" \
PERSISTENCE_FOLDER="/opt/corda/persistence" \
CERTIFICATES_FOLDER="/opt/corda/certificates" \
DRIVERS_FOLDER="/opt/corda/drivers" \
CONFIG_FOLDER="/etc/corda" \
MY_P2P_PORT=10200 \
MY_RPC_PORT=10201 \
MY_RPC_ADMIN_PORT=10202 \
PATH=$PATH:/opt/corda/bin \
JVM_ARGS="-XX:+UseG1GC -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap " \
CORDA_ARGS=""
##CORDAPPS FOLDER
VOLUME ["/opt/corda/cordapps"]
##PERSISTENCE FOLDER
VOLUME ["/opt/corda/persistence"]
##CERTS FOLDER
VOLUME ["/opt/corda/certificates"]
##OPTIONAL JDBC DRIVERS FOLDER
VOLUME ["/opt/corda/drivers"]
##LOG FOLDER
VOLUME ["/opt/corda/logs"]
##ADDITIONAL NODE INFOS FOLDER
VOLUME ["/opt/corda/additional-node-infos"]
##CONFIG LOCATION
VOLUME ["/etc/corda"]
##CORDA JAR
COPY --chown=corda:corda corda.jar /opt/corda/bin/corda.jar
##CONFIG MANIPULATOR JAR
COPY --chown=corda:corda config-exporter.jar /opt/corda/config-exporter.jar
##CONFIG GENERATOR SHELL SCRIPT
COPY --chown=corda:corda generate-config.sh /opt/corda/bin/config-generator
##CORDA RUN SCRIPT
COPY --chown=corda:corda run-corda.sh /opt/corda/bin/run-corda
##BASE CONFIG FOR GENERATOR
COPY --chown=corda:corda starting-node.conf /opt/corda/starting-node.conf
USER "corda"
EXPOSE ${MY_P2P_PORT} ${MY_RPC_PORT} ${MY_RPC_ADMIN_PORT}
WORKDIR /opt/corda
CMD ["run-corda"]

View File

@ -0,0 +1,66 @@
FROM amazonlinux:2
## Add packages, clean cache, create dirs, create corda user and change ownership
RUN amazon-linux-extras enable corretto8 && \
yum -y install java-1.8.0-amazon-corretto-devel && \
yum -y install bash && \
yum -y install curl && \
yum -y install unzip && \
yum -y install lsof telnet net-tools nmap-ncat && \
yum clean all && \
rm -rf /var/cache/yum && \
mkdir -p /opt/corda/cordapps && \
mkdir -p /opt/corda/persistence && \
mkdir -p /opt/corda/certificates && \
mkdir -p /opt/corda/drivers && \
mkdir -p /opt/corda/logs && \
mkdir -p /opt/corda/bin && \
mkdir -p /opt/corda/additional-node-infos && \
mkdir -p /etc/corda && \
groupadd corda && \
useradd corda -g corda -m -d /opt/corda && \
chown -R corda:corda /opt/corda && \
chown -R corda:corda /etc/corda
ENV CORDAPPS_FOLDER="/opt/corda/cordapps" \
PERSISTENCE_FOLDER="/opt/corda/persistence" \
CERTIFICATES_FOLDER="/opt/corda/certificates" \
DRIVERS_FOLDER="/opt/corda/drivers" \
CONFIG_FOLDER="/etc/corda" \
MY_P2P_PORT=10200 \
MY_RPC_PORT=10201 \
MY_RPC_ADMIN_PORT=10202 \
PATH=$PATH:/opt/corda/bin \
JVM_ARGS="-XX:+UseG1GC -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap " \
CORDA_ARGS=""
##CORDAPPS FOLDER
VOLUME ["/opt/corda/cordapps"]
##PERSISTENCE FOLDER
VOLUME ["/opt/corda/persistence"]
##CERTS FOLDER
VOLUME ["/opt/corda/certificates"]
##OPTIONAL JDBC DRIVERS FOLDER
VOLUME ["/opt/corda/drivers"]
##LOG FOLDER
VOLUME ["/opt/corda/logs"]
##ADDITIONAL NODE INFOS FOLDER
VOLUME ["/opt/corda/additional-node-infos"]
##CONFIG LOCATION
VOLUME ["/etc/corda"]
##CORDA JAR
COPY --chown=corda:corda corda.jar /opt/corda/bin/corda.jar
##CONFIG MANIPULATOR JAR
COPY --chown=corda:corda config-exporter.jar /opt/corda/config-exporter.jar
##CONFIG GENERATOR SHELL SCRIPT
COPY --chown=corda:corda generate-config.sh /opt/corda/bin/config-generator
##CORDA RUN SCRIPT
COPY --chown=corda:corda run-corda.sh /opt/corda/bin/run-corda
##BASE CONFIG FOR GENERATOR
COPY --chown=corda:corda starting-node.conf /opt/corda/starting-node.conf
USER "corda"
EXPOSE ${MY_P2P_PORT} ${MY_RPC_PORT} ${MY_RPC_ADMIN_PORT}
WORKDIR /opt/corda
CMD ["run-corda"]

View File

@ -261,6 +261,12 @@ interface WritableTransactionStorage : TransactionStorage {
* ID exists. * ID exists.
*/ */
fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, Boolean>? fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, Boolean>?
/**
* Returns a future that completes with the transaction corresponding to [id] once it has been committed. Do not warn when run inside
* a DB transaction.
*/
fun trackTransactionWithNoWarning(id: SecureHash): CordaFuture<SignedTransaction>
} }
/** /**

View File

@ -140,6 +140,8 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
val actTx = tx.peekableValue ?: return 0 val actTx = tx.peekableValue ?: return 0
return actTx.sigs.sumBy { it.size + transactionSignatureOverheadEstimate } + actTx.txBits.size return actTx.sigs.sumBy { it.size + transactionSignatureOverheadEstimate } + actTx.txBits.size
} }
private val log = contextLogger()
} }
private val txStorage = ThreadBox(createTransactionsMap(cacheFactory, clock)) private val txStorage = ThreadBox(createTransactionsMap(cacheFactory, clock))
@ -219,12 +221,24 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
} }
override fun trackTransaction(id: SecureHash): CordaFuture<SignedTransaction> { override fun trackTransaction(id: SecureHash): CordaFuture<SignedTransaction> {
if (contextTransactionOrNull != null) {
log.warn("trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions.")
}
return trackTransactionWithNoWarning(id)
}
override fun trackTransactionWithNoWarning(id: SecureHash): CordaFuture<SignedTransaction> {
val updateFuture = updates.filter { it.id == id }.toFuture()
return database.transaction { return database.transaction {
txStorage.locked { txStorage.locked {
val existingTransaction = getTransaction(id) val existingTransaction = getTransaction(id)
if (existingTransaction == null) { if (existingTransaction == null) {
updates.filter { it.id == id }.toFuture() updates.filter { it.id == id }.toFuture()
updateFuture
} else { } else {
updateFuture.cancel(false)
doneFuture(existingTransaction) doneFuture(existingTransaction)
} }
} }

View File

@ -89,7 +89,7 @@ class ActionExecutorImpl(
@Suspendable @Suspendable
private fun executeTrackTransaction(fiber: FlowFiber, action: Action.TrackTransaction) { private fun executeTrackTransaction(fiber: FlowFiber, action: Action.TrackTransaction) {
services.validatedTransactions.trackTransaction(action.hash).thenMatch( services.validatedTransactions.trackTransactionWithNoWarning(action.hash).thenMatch(
success = { transaction -> success = { transaction ->
fiber.scheduleEvent(Event.TransactionCommitted(transaction)) fiber.scheduleEvent(Event.TransactionCommitted(transaction))
}, },

View File

@ -733,6 +733,12 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
} }
} }
override fun trackTransactionWithNoWarning(id: SecureHash): CordaFuture<SignedTransaction> {
return database.transaction {
delegate.trackTransactionWithNoWarning(id)
}
}
override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> { override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {
return database.transaction { return database.transaction {
delegate.track() delegate.track()

View File

@ -1,6 +1,7 @@
package net.corda.node.services.persistence package net.corda.node.services.persistence
import junit.framework.TestCase.assertTrue import junit.framework.TestCase.assertTrue
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef import net.corda.core.contracts.StateRef
import net.corda.core.crypto.Crypto import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
@ -20,14 +21,23 @@ import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase import net.corda.testing.internal.configureDatabase
import net.corda.testing.internal.createWireTransaction import net.corda.testing.internal.createWireTransaction
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.core.Appender
import org.apache.logging.log4j.core.LoggerContext
import org.apache.logging.log4j.core.appender.WriterAppender
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.After import org.junit.After
import org.junit.Assert
import org.junit.Before import org.junit.Before
import org.junit.Rule import org.junit.Rule
import org.junit.Test import org.junit.Test
import rx.plugins.RxJavaHooks
import java.io.StringWriter
import java.util.concurrent.Semaphore
import java.time.Clock import java.time.Clock
import java.time.Instant import java.time.Instant
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread
import kotlin.test.assertEquals import kotlin.test.assertEquals
class DBTransactionStorageTests { class DBTransactionStorageTests {
@ -38,7 +48,7 @@ class DBTransactionStorageTests {
@Rule @Rule
@JvmField @JvmField
val testSerialization = SerializationEnvironmentRule() val testSerialization = SerializationEnvironmentRule(inheritable = true)
private lateinit var database: CordaPersistence private lateinit var database: CordaPersistence
private lateinit var transactionStorage: DBTransactionStorage private lateinit var transactionStorage: DBTransactionStorage
@ -313,6 +323,107 @@ class DBTransactionStorageTests {
assertTransactionIsRetrievable(secondTransaction) assertTransactionIsRetrievable(secondTransaction)
} }
@Suppress("UnstableApiUsage")
@Test(timeout=300_000)
fun `race condition - failure path`() {
// Insert a sleep into trackTransaction
RxJavaHooks.setOnObservableCreate {
Thread.sleep(1_000)
it
}
try {
`race condition - ok path`()
} finally {
// Remove sleep so it does not affect other tests
RxJavaHooks.setOnObservableCreate { it }
}
}
@Test(timeout=300_000)
fun `race condition - ok path`() {
// Arrange
val signedTransaction = newTransaction()
val threadCount = 2
val finishedThreadsSemaphore = Semaphore(threadCount)
finishedThreadsSemaphore.acquire(threadCount)
// Act
thread(name = "addTransaction") {
transactionStorage.addTransaction(signedTransaction)
finishedThreadsSemaphore.release()
}
var result: CordaFuture<SignedTransaction>? = null
thread(name = "trackTransaction") {
result = transactionStorage.trackTransaction(signedTransaction.id)
finishedThreadsSemaphore.release()
}
if (!finishedThreadsSemaphore.tryAcquire(threadCount, 1, TimeUnit.MINUTES)) {
Assert.fail("Threads did not finish")
}
// Assert
assertThat(result).isNotNull()
assertThat(result?.get(20, TimeUnit.SECONDS)?.id).isEqualTo(signedTransaction.id)
}
@Test(timeout=300_000)
fun `race condition - transaction warning`() {
// Arrange
val signedTransaction = newTransaction()
// Act
val logMessages = collectLogsFrom {
database.transaction {
val result = transactionStorage.trackTransaction(signedTransaction.id)
result.cancel(false)
}
}
// Assert
assertThat(logMessages).contains("trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions.")
}
private fun collectLogsFrom(statement: () -> Unit): String {
// Create test appender
val stringWriter = StringWriter()
val appenderName = this::collectLogsFrom.name
val appender: Appender = WriterAppender.createAppender(
null,
null,
stringWriter,
appenderName,
false,
true
)
appender.start()
// Add test appender
val context = LogManager.getContext(false) as LoggerContext
val configuration = context.configuration
configuration.addAppender(appender)
configuration.loggers.values.forEach { it.addAppender(appender, null, null) }
try {
statement()
} finally {
// Remove test appender
configuration.loggers.values.forEach { it.removeAppender(appenderName) }
configuration.appenders.remove(appenderName)
appender.stop()
}
return stringWriter.toString()
}
private fun newTransactionStorage(cacheSizeBytesOverride: Long? = null, clock: CordaClock = SimpleClock(Clock.systemUTC())) { private fun newTransactionStorage(cacheSizeBytesOverride: Long? = null, clock: CordaClock = SimpleClock(Clock.systemUTC())) {
transactionStorage = DBTransactionStorage(database, TestingNamedCacheFactory(cacheSizeBytesOverride transactionStorage = DBTransactionStorage(database, TestingNamedCacheFactory(cacheSizeBytesOverride
?: 1024), clock) ?: 1024), clock)

View File

@ -21,6 +21,10 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali
return getTransaction(id)?.let { doneFuture(it) } ?: _updatesPublisher.filter { it.id == id }.toFuture() return getTransaction(id)?.let { doneFuture(it) } ?: _updatesPublisher.filter { it.id == id }.toFuture()
} }
override fun trackTransactionWithNoWarning(id: SecureHash): CordaFuture<SignedTransaction> {
return trackTransaction(id)
}
override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> { override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {
return DataFeed(txns.values.mapNotNull { if (it.isVerified) it.stx else null }, _updatesPublisher) return DataFeed(txns.values.mapNotNull { if (it.isVerified) it.stx else null }, _updatesPublisher)
} }