mirror of
https://github.com/corda/corda.git
synced 2025-03-19 18:45:28 +00:00
Merge remote-tracking branch 'open/master' into aslemmer-os-merge-11-jun
This commit is contained in:
commit
9793662719
@ -6,12 +6,17 @@ import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.fork
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.common.internal.ProjectStructure
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.core.DUMMY_BANK_B_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.OutOfProcess
|
||||
import net.corda.testing.driver.driver
|
||||
@ -20,12 +25,12 @@ import net.corda.testing.internal.IntegrationTestSchemas
|
||||
import net.corda.testing.internal.toDatabaseSchemaName
|
||||
import net.corda.testing.node.User
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
import java.util.*
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.Executors
|
||||
import kotlin.concurrent.thread
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class HardRestartTest : IntegrationTest() {
|
||||
companion object {
|
||||
@ -33,14 +38,20 @@ class HardRestartTest : IntegrationTest() {
|
||||
@JvmField
|
||||
val databaseSchemas = IntegrationTestSchemas(DUMMY_BANK_A_NAME.toDatabaseSchemaName(), DUMMY_BANK_B_NAME.toDatabaseSchemaName(),
|
||||
DUMMY_NOTARY_NAME.toDatabaseSchemaName())
|
||||
val logConfigFile = ProjectStructure.projectRootDir / "config" / "dev" / "log4j2.xml"
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class Ping(val pongParty: Party) : FlowLogic<Unit>() {
|
||||
class Ping(val pongParty: Party, val times: Int) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val pongSession = initiateFlow(pongParty)
|
||||
pongSession.sendAndReceive<Unit>(Unit)
|
||||
pongSession.sendAndReceive<Unit>(times)
|
||||
for (i in 1 .. times) {
|
||||
val j = pongSession.sendAndReceive<Int>(i).unwrap { it }
|
||||
assertEquals(i, j)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -48,14 +59,18 @@ class HardRestartTest : IntegrationTest() {
|
||||
class Pong(val pingSession: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
pingSession.sendAndReceive<Unit>(Unit)
|
||||
val times = pingSession.sendAndReceive<Int>(Unit).unwrap { it }
|
||||
for (i in 1 .. times) {
|
||||
val j = pingSession.sendAndReceive<Int>(i).unwrap { it }
|
||||
assertEquals(i, j)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun restartPingPongFlowRandomly() {
|
||||
fun restartShortPingPongFlowRandomly() {
|
||||
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = false)) {
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) {
|
||||
val (a, b) = listOf(
|
||||
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")),
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
@ -74,12 +89,74 @@ class HardRestartTest : IntegrationTest() {
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
}
|
||||
CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) {
|
||||
val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity()).returnValue
|
||||
val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 1).returnValue
|
||||
latch.countDown()
|
||||
// No matter the kill
|
||||
returnValue.getOrThrow()
|
||||
}
|
||||
|
||||
pongRestartThread.join()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun restartLongPingPongFlowRandomly() {
|
||||
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) {
|
||||
val (a, b) = listOf(
|
||||
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")),
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
).transpose().getOrThrow()
|
||||
|
||||
val latch = CountDownLatch(1)
|
||||
|
||||
// We kill -9 and restart the Pong node after a random sleep
|
||||
val pongRestartThread = thread {
|
||||
latch.await()
|
||||
val ms = Random().nextInt(1000)
|
||||
println("Sleeping $ms ms before kill")
|
||||
Thread.sleep(ms.toLong())
|
||||
(b as OutOfProcess).process.destroyForcibly()
|
||||
b.stop()
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
}
|
||||
CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) {
|
||||
val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100).returnValue
|
||||
latch.countDown()
|
||||
// No matter the kill
|
||||
returnValue.getOrThrow()
|
||||
}
|
||||
|
||||
pongRestartThread.join()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun softRestartLongPingPongFlowRandomly() {
|
||||
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) {
|
||||
val (a, b) = listOf(
|
||||
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")),
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
).transpose().getOrThrow()
|
||||
|
||||
val latch = CountDownLatch(1)
|
||||
|
||||
// We kill -9 and restart the Pong node after a random sleep
|
||||
val pongRestartThread = thread {
|
||||
latch.await()
|
||||
val ms = Random().nextInt(1000)
|
||||
println("Sleeping $ms ms before kill")
|
||||
Thread.sleep(ms.toLong())
|
||||
b.stop()
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
}
|
||||
CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) {
|
||||
val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100).returnValue
|
||||
latch.countDown()
|
||||
// No matter the kill
|
||||
returnValue.getOrThrow()
|
||||
}
|
||||
|
||||
pongRestartThread.join()
|
||||
}
|
||||
@ -133,7 +210,7 @@ class HardRestartTest : IntegrationTest() {
|
||||
@Test
|
||||
fun restartRecursiveFlowRandomly() {
|
||||
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<RecursiveA>(), Permissions.all()))
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = false)) {
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) {
|
||||
val (a, b) = listOf(
|
||||
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")),
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
|
@ -15,6 +15,7 @@ import co.paralleluniverse.strands.SettableFuture
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.services.statemachine.FlowMessagingImpl
|
||||
@ -189,6 +190,10 @@ class MessagingExecutor(
|
||||
}
|
||||
|
||||
private fun acknowledgeJob(job: Job.Acknowledge) {
|
||||
log.debug {
|
||||
val id = job.message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)
|
||||
"Acking $id"
|
||||
}
|
||||
job.message.individualAcknowledge()
|
||||
}
|
||||
}
|
||||
|
@ -149,7 +149,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
val eventQueue = getTransientField(TransientValues::eventQueue)
|
||||
try {
|
||||
eventLoop@ while (true) {
|
||||
val nextEvent = eventQueue.receive()
|
||||
val nextEvent = try {
|
||||
eventQueue.receive()
|
||||
} catch (interrupted: InterruptedException) {
|
||||
log.error("Flow interrupted while waiting for events, aborting immediately")
|
||||
abortFiber()
|
||||
}
|
||||
val continuation = processEvent(transitionExecutor, nextEvent)
|
||||
when (continuation) {
|
||||
is FlowContinuation.Resume -> return continuation.result
|
||||
@ -176,7 +181,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
* processing finished. Purely used for internal invariant checks.
|
||||
*/
|
||||
@Suspendable
|
||||
private fun processEventImmediately(event: Event, isDbTransactionOpenOnEntry: Boolean, isDbTransactionOpenOnExit: Boolean): FlowContinuation {
|
||||
private fun processEventImmediately(
|
||||
event: Event,
|
||||
isDbTransactionOpenOnEntry: Boolean,
|
||||
isDbTransactionOpenOnExit: Boolean): FlowContinuation {
|
||||
checkDbTransaction(isDbTransactionOpenOnEntry)
|
||||
val transitionExecutor = getTransientField(TransientValues::transitionExecutor)
|
||||
val continuation = processEvent(transitionExecutor, event)
|
||||
@ -256,7 +264,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
processEventImmediately(
|
||||
Event.EnterSubFlow(subFlow.javaClass,
|
||||
createSubFlowVersion(
|
||||
serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion)),
|
||||
serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion
|
||||
)
|
||||
),
|
||||
isDbTransactionOpenOnEntry = true,
|
||||
isDbTransactionOpenOnExit = true
|
||||
)
|
||||
|
@ -332,6 +332,7 @@ class SingleThreadedStateMachineManager(
|
||||
mutex.locked { if (flows.containsKey(id)) return@map null }
|
||||
val checkpoint = deserializeCheckpoint(serializedCheckpoint)
|
||||
if (checkpoint == null) return@map null
|
||||
logger.debug { "Restored $checkpoint" }
|
||||
createFlowFromCheckpoint(
|
||||
id = id,
|
||||
checkpoint = checkpoint,
|
||||
@ -380,7 +381,10 @@ class SingleThreadedStateMachineManager(
|
||||
// Just flow initiation message
|
||||
null
|
||||
}
|
||||
externalEventMutex.withLock {
|
||||
mutex.locked {
|
||||
if (stopping) {
|
||||
return
|
||||
}
|
||||
// Remove any sessions the old flow has.
|
||||
for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
|
||||
sessionToFlow.remove(sessionId)
|
||||
@ -401,12 +405,13 @@ class SingleThreadedStateMachineManager(
|
||||
}
|
||||
}
|
||||
|
||||
private val externalEventMutex = ReentrantLock()
|
||||
override fun deliverExternalEvent(event: ExternalEvent) {
|
||||
externalEventMutex.withLock {
|
||||
when (event) {
|
||||
is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event)
|
||||
is ExternalEvent.ExternalStartFlowEvent<*> -> onExternalStartFlow(event)
|
||||
mutex.locked {
|
||||
if (!stopping) {
|
||||
when (event) {
|
||||
is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event)
|
||||
is ExternalEvent.ExternalStartFlowEvent<*> -> onExternalStartFlow(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ crlCheckSoftFail = true
|
||||
lazyBridgeStart = true
|
||||
dataSourceProperties = {
|
||||
dataSourceClassName = org.h2.jdbcx.JdbcDataSource
|
||||
dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;WRITE_DELAY=0;AUTO_SERVER_PORT="${h2port}
|
||||
dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;WRITE_DELAY=0;LOCK_TIMEOUT=10000;AUTO_SERVER_PORT="${h2port}
|
||||
dataSource.user = sa
|
||||
dataSource.password = ""
|
||||
}
|
||||
|
@ -11,6 +11,7 @@
|
||||
package net.corda.traderdemo
|
||||
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.millis
|
||||
import net.corda.finance.DOLLARS
|
||||
@ -25,6 +26,7 @@ import net.corda.testing.core.DUMMY_BANK_B_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.InProcess
|
||||
import net.corda.testing.driver.OutOfProcess
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.internal.IntegrationTest
|
||||
import net.corda.testing.internal.IntegrationTestSchemas
|
||||
@ -91,4 +93,31 @@ class TraderDemoTest : IntegrationTest() {
|
||||
assertThat(clientB.dollarCashBalance).isEqualTo(5.DOLLARS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Tudor test`() {
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = false, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
|
||||
val demoUser = User("demo", "demo", setOf(startFlow<SellerFlow>(), all()))
|
||||
val bankUser = User("user1", "test", permissions = setOf(all()))
|
||||
val (nodeA, nodeB, bankNode) = listOf(
|
||||
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)),
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)),
|
||||
startNode(providedName = BOC_NAME, rpcUsers = listOf(bankUser))
|
||||
).map { (it.getOrThrow() as OutOfProcess) }
|
||||
|
||||
val nodeBRpc = CordaRPCClient(nodeB.rpcAddress).start(demoUser.username, demoUser.password).proxy
|
||||
val nodeARpc = CordaRPCClient(nodeA.rpcAddress).start(demoUser.username, demoUser.password).proxy
|
||||
val nodeBankRpc = let {
|
||||
val client = CordaRPCClient(bankNode.rpcAddress)
|
||||
client.start(bankUser.username, bankUser.password).proxy
|
||||
}
|
||||
|
||||
TraderDemoClientApi(nodeBankRpc).runIssuer(amount = 100.DOLLARS, buyerName = nodeA.nodeInfo.singleIdentity().name, sellerName = nodeB.nodeInfo.singleIdentity().name)
|
||||
val stxFuture = nodeBRpc.startFlow(::SellerFlow, nodeA.nodeInfo.singleIdentity(), 5.DOLLARS).returnValue
|
||||
nodeARpc.stateMachinesFeed().updates.toBlocking().first() // wait until initiated flow starts
|
||||
nodeA.stop()
|
||||
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to nodeA.p2pAddress.toString()))
|
||||
stxFuture.getOrThrow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user