Merge pull request #3285 from corda/CORDA-1191/aslemmer-fix-smm-bugs

CORDA-1191 SMM bugs
This commit is contained in:
Andras Slemmer 2018-06-11 15:46:56 +01:00 committed by GitHub
commit c66228adf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 301 additions and 10 deletions

View File

@ -0,0 +1,240 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.div
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions
import net.corda.testing.common.internal.ProjectStructure
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.OutOfProcess
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import org.junit.Test
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import kotlin.concurrent.thread
import kotlin.test.assertEquals
class HardRestartTest {
@StartableByRPC
@InitiatingFlow
class Ping(val pongParty: Party, val times: Int) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val pongSession = initiateFlow(pongParty)
pongSession.sendAndReceive<Unit>(times)
for (i in 1 .. times) {
val j = pongSession.sendAndReceive<Int>(i).unwrap { it }
assertEquals(i, j)
}
}
}
@InitiatedBy(Ping::class)
class Pong(val pingSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val times = pingSession.sendAndReceive<Int>(Unit).unwrap { it }
for (i in 1 .. times) {
val j = pingSession.sendAndReceive<Int>(i).unwrap { it }
assertEquals(i, j)
}
}
}
companion object {
val logConfigFile = ProjectStructure.projectRootDir / "config" / "dev" / "log4j2.xml"
}
@Test
fun restartShortPingPongFlowRandomly() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) {
val (a, b) = listOf(
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")),
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
).transpose().getOrThrow()
val latch = CountDownLatch(1)
// We kill -9 and restart the Pong node after a random sleep
val pongRestartThread = thread {
latch.await()
val ms = Random().nextInt(1000)
println("Sleeping $ms ms before kill")
Thread.sleep(ms.toLong())
(b as OutOfProcess).process.destroyForcibly()
b.stop()
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
}
CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) {
val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 1).returnValue
latch.countDown()
// No matter the kill
returnValue.getOrThrow()
}
pongRestartThread.join()
}
}
@Test
fun restartLongPingPongFlowRandomly() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) {
val (a, b) = listOf(
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")),
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
).transpose().getOrThrow()
val latch = CountDownLatch(1)
// We kill -9 and restart the Pong node after a random sleep
val pongRestartThread = thread {
latch.await()
val ms = Random().nextInt(1000)
println("Sleeping $ms ms before kill")
Thread.sleep(ms.toLong())
(b as OutOfProcess).process.destroyForcibly()
b.stop()
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
}
CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) {
val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100).returnValue
latch.countDown()
// No matter the kill
returnValue.getOrThrow()
}
pongRestartThread.join()
}
}
@Test
fun softRestartLongPingPongFlowRandomly() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) {
val (a, b) = listOf(
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")),
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
).transpose().getOrThrow()
val latch = CountDownLatch(1)
// We kill -9 and restart the Pong node after a random sleep
val pongRestartThread = thread {
latch.await()
val ms = Random().nextInt(1000)
println("Sleeping $ms ms before kill")
Thread.sleep(ms.toLong())
b.stop()
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
}
CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) {
val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100).returnValue
latch.countDown()
// No matter the kill
returnValue.getOrThrow()
}
pongRestartThread.join()
}
}
sealed class RecursiveMode {
data class Top(val otherParty: Party, val initialDepth: Int) : RecursiveMode()
data class Recursive(val otherSession: FlowSession) : RecursiveMode()
}
@StartableByRPC
@InitiatingFlow
@InitiatedBy(RecursiveB::class)
class RecursiveA(val mode: RecursiveMode) : FlowLogic<String>() {
constructor(otherSession: FlowSession) : this(RecursiveMode.Recursive(otherSession))
constructor(otherParty: Party, initialDepth: Int) : this(RecursiveMode.Top(otherParty, initialDepth))
@Suspendable
override fun call(): String {
return when (mode) {
is HardRestartTest.RecursiveMode.Top -> {
val session = initiateFlow(mode.otherParty)
session.sendAndReceive<String>(mode.initialDepth).unwrap { it }
}
is HardRestartTest.RecursiveMode.Recursive -> {
val depth = mode.otherSession.receive<Int>().unwrap { it }
val string = if (depth > 0) {
val newSession = initiateFlow(mode.otherSession.counterparty)
newSession.sendAndReceive<String>(depth).unwrap { it }
} else {
"-"
}
mode.otherSession.send(string)
string
}
}
}
}
@InitiatingFlow
@InitiatedBy(RecursiveA::class)
class RecursiveB(val otherSession: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val depth = otherSession.receive<Int>().unwrap { it }
val newSession = initiateFlow(otherSession.counterparty)
val string = newSession.sendAndReceive<String>(depth - 1).unwrap { it }
otherSession.send(string + ":" + depth)
}
}
@Test
fun restartRecursiveFlowRandomly() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<RecursiveA>(), Permissions.all()))
driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) {
val (a, b) = listOf(
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")),
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
).transpose().getOrThrow()
val latch = CountDownLatch(1)
// We kill -9 and restart the node B after a random sleep
val bRestartThread = thread {
latch.await()
val ms = Random().nextInt(1000)
println("Sleeping $ms ms before kill")
Thread.sleep(ms.toLong())
(b as OutOfProcess).process.destroyForcibly()
b.stop()
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
}
val executor = Executors.newFixedThreadPool(8)
try {
val tlRpc = ThreadLocal<CordaRPCOps>()
(1 .. 10).map { num ->
executor.fork {
val rpc = tlRpc.get() ?: CordaRPCClient(a.rpcAddress).start(demoUser.username, demoUser.password).proxy.also { tlRpc.set(it) }
val string = rpc.startFlow(::RecursiveA, b.nodeInfo.singleIdentity(), 10).returnValue.getOrThrow()
latch.countDown()
println("$num: $string")
}
}.transpose().getOrThrow()
bRestartThread.join()
} finally {
executor.shutdown()
}
}
}
}

View File

@ -2,6 +2,7 @@ package net.corda.node.services.messaging
import net.corda.core.messaging.MessageRecipients
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.trace
import net.corda.node.VersionInfo
import net.corda.node.services.statemachine.FlowMessagingImpl
@ -41,6 +42,7 @@ class MessagingExecutor(
val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt()
}
@Synchronized
fun send(message: Message, target: MessageRecipients) {
val mqAddress = resolver.resolveTargetToArtemisQueue(target)
val artemisMessage = cordaToArtemisMessage(message)
@ -51,7 +53,12 @@ class MessagingExecutor(
producer.send(SimpleString(mqAddress), artemisMessage)
}
@Synchronized
fun acknowledge(message: ClientMessage) {
log.debug {
val id = message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)
"Acking $id"
}
message.individualAcknowledge()
}

View File

@ -139,7 +139,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val eventQueue = getTransientField(TransientValues::eventQueue)
try {
eventLoop@ while (true) {
val nextEvent = eventQueue.receive()
val nextEvent = try {
eventQueue.receive()
} catch (interrupted: InterruptedException) {
log.error("Flow interrupted while waiting for events, aborting immediately")
abortFiber()
}
val continuation = processEvent(transitionExecutor, nextEvent)
when (continuation) {
is FlowContinuation.Resume -> return continuation.result
@ -166,7 +171,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
* processing finished. Purely used for internal invariant checks.
*/
@Suspendable
private fun processEventImmediately(event: Event, isDbTransactionOpenOnEntry: Boolean, isDbTransactionOpenOnExit: Boolean): FlowContinuation {
private fun processEventImmediately(
event: Event,
isDbTransactionOpenOnEntry: Boolean,
isDbTransactionOpenOnExit: Boolean): FlowContinuation {
checkDbTransaction(isDbTransactionOpenOnEntry)
val transitionExecutor = getTransientField(TransientValues::transitionExecutor)
val continuation = processEvent(transitionExecutor, event)
@ -246,7 +254,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
processEventImmediately(
Event.EnterSubFlow(subFlow.javaClass,
createSubFlowVersion(
serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion)),
serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion
)
),
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnExit = true
)

View File

@ -322,6 +322,7 @@ class SingleThreadedStateMachineManager(
mutex.locked { if (flows.containsKey(id)) return@map null }
val checkpoint = deserializeCheckpoint(serializedCheckpoint)
if (checkpoint == null) return@map null
logger.debug { "Restored $checkpoint" }
createFlowFromCheckpoint(
id = id,
checkpoint = checkpoint,
@ -370,7 +371,10 @@ class SingleThreadedStateMachineManager(
// Just flow initiation message
null
}
externalEventMutex.withLock {
mutex.locked {
if (stopping) {
return
}
// Remove any sessions the old flow has.
for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
sessionToFlow.remove(sessionId)
@ -391,12 +395,13 @@ class SingleThreadedStateMachineManager(
}
}
private val externalEventMutex = ReentrantLock()
override fun deliverExternalEvent(event: ExternalEvent) {
externalEventMutex.withLock {
when (event) {
is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event)
is ExternalEvent.ExternalStartFlowEvent<*> -> onExternalStartFlow(event)
mutex.locked {
if (!stopping) {
when (event) {
is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event)
is ExternalEvent.ExternalStartFlowEvent<*> -> onExternalStartFlow(event)
}
}
}
}

View File

@ -5,7 +5,7 @@ crlCheckSoftFail = true
lazyBridgeStart = true
dataSourceProperties = {
dataSourceClassName = org.h2.jdbcx.JdbcDataSource
dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;WRITE_DELAY=100;AUTO_SERVER_PORT="${h2port}
dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;WRITE_DELAY=0;LOCK_TIMEOUT=10000;AUTO_SERVER_PORT="${h2port}
dataSource.user = sa
dataSource.password = ""
}

View File

@ -1,6 +1,7 @@
package net.corda.traderdemo
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.millis
import net.corda.finance.DOLLARS
@ -14,6 +15,7 @@ import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.InProcess
import net.corda.testing.driver.OutOfProcess
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import net.corda.testing.node.internal.poll
@ -71,4 +73,31 @@ class TraderDemoTest {
assertThat(clientB.dollarCashBalance).isEqualTo(5.DOLLARS)
}
}
@Test
fun `Tudor test`() {
driver(DriverParameters(isDebug = true, startNodesInProcess = false, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
val demoUser = User("demo", "demo", setOf(startFlow<SellerFlow>(), all()))
val bankUser = User("user1", "test", permissions = setOf(all()))
val (nodeA, nodeB, bankNode) = listOf(
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)),
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)),
startNode(providedName = BOC_NAME, rpcUsers = listOf(bankUser))
).map { (it.getOrThrow() as OutOfProcess) }
val nodeBRpc = CordaRPCClient(nodeB.rpcAddress).start(demoUser.username, demoUser.password).proxy
val nodeARpc = CordaRPCClient(nodeA.rpcAddress).start(demoUser.username, demoUser.password).proxy
val nodeBankRpc = let {
val client = CordaRPCClient(bankNode.rpcAddress)
client.start(bankUser.username, bankUser.password).proxy
}
TraderDemoClientApi(nodeBankRpc).runIssuer(amount = 100.DOLLARS, buyerName = nodeA.nodeInfo.singleIdentity().name, sellerName = nodeB.nodeInfo.singleIdentity().name)
val stxFuture = nodeBRpc.startFlow(::SellerFlow, nodeA.nodeInfo.singleIdentity(), 5.DOLLARS).returnValue
nodeARpc.stateMachinesFeed().updates.toBlocking().first() // wait until initiated flow starts
nodeA.stop()
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to nodeA.p2pAddress.toString()))
stxFuture.getOrThrow()
}
}
}