mirror of
https://github.com/corda/corda.git
synced 2025-03-14 08:16:32 +00:00
Merge pull request #973 from corda/aslemmer-os-merge-11-jun
Aslemmer os merge 11 jun
This commit is contained in:
commit
8038343065
@ -114,12 +114,14 @@ absolute path to the node's base directory.
|
||||
here must be externally accessible when running nodes across a cluster of machines. If the provided host is unreachable,
|
||||
the node will try to auto-discover its public one.
|
||||
|
||||
:p2pMessagingRetry: Only used for notarisation requests. When the response doesn't arrive in time, the message is
|
||||
resent to a different notary-replica round-robin in case of clustered notaries.
|
||||
:flowTimeout: When a flow implementing the ``TimedFlow`` interface does not complete in time, it is restarted from the
|
||||
initial checkpoint. Currently only used for notarisation requests: if a notary replica dies while processing a notarisation request,
|
||||
the client flow eventually times out and gets restarted. On restart the request is resent to a different notary replica
|
||||
in a round-robin fashion (assuming the notary is clustered).
|
||||
|
||||
:messageRedeliveryDelay: The initial retry delay, e.g. `30 seconds`.
|
||||
:maxRetryCount: How many retries to attempt.
|
||||
:backoffBase: The base of the exponential backoff, `t_{wait} = messageRedeliveryDelay * backoffBase^{retryCount}`.
|
||||
:timeout: The initial flow timeout period, e.g. `30 seconds`.
|
||||
:maxRestartCount: Maximum number of times the flow will restart before resulting in an error.
|
||||
:backoffBase: The base of the exponential backoff, `t_{wait} = timeout * backoffBase^{retryCount}`.
|
||||
|
||||
:rpcAddress: The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC. This is now deprecated in favour of the ``rpcSettings`` block.
|
||||
|
||||
|
@ -61,9 +61,9 @@ class TimedFlowMultiThreadedSMMTests : IntegrationTest() {
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = true,
|
||||
portAllocation = RandomFree)) {
|
||||
|
||||
val configOverrides = mapOf("p2pMessagingRetry" to mapOf(
|
||||
"messageRedeliveryDelay" to Duration.ofSeconds(1),
|
||||
"maxRetryCount" to 2,
|
||||
val configOverrides = mapOf("flowTimeout" to mapOf(
|
||||
"timeout" to Duration.ofSeconds(1),
|
||||
"maxRestartCount" to 2,
|
||||
"backoffBase" to 1.0
|
||||
))
|
||||
|
||||
|
@ -31,10 +31,9 @@ import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.internal.StartedNode
|
||||
import net.corda.node.services.config.FlowTimeoutConfiguration
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.NotaryConfig
|
||||
import net.corda.node.services.config.P2PMessagingRetryConfiguration
|
||||
import net.corda.node.services.vault.VaultQueryIntegrationTests
|
||||
import net.corda.nodeapi.internal.DevIdentityGenerator
|
||||
import net.corda.nodeapi.internal.network.NetworkParametersCopier
|
||||
import net.corda.testing.common.internal.testNetworkParameters
|
||||
@ -43,13 +42,14 @@ import net.corda.testing.core.dummyCommand
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.internal.GlobalDatabaseRule
|
||||
import net.corda.testing.internal.LogHelper
|
||||
import net.corda.testing.internal.toDatabaseSchemaName
|
||||
import net.corda.testing.node.InMemoryMessagingNetwork
|
||||
import net.corda.testing.node.MockNetworkParameters
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import org.junit.*
|
||||
import org.junit.Before
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.ExternalResource
|
||||
import org.junit.rules.RuleChain
|
||||
import org.slf4j.MDC
|
||||
@ -85,8 +85,8 @@ class TimedFlowTestRule(val clusterSize: Int) : ExternalResource() {
|
||||
InternalMockNodeParameters(
|
||||
legalName = CordaX500Name("Alice", "AliceCorp", "GB"),
|
||||
configOverrides = { conf: NodeConfiguration ->
|
||||
val retryConfig = P2PMessagingRetryConfiguration(10.seconds, 3, 1.0)
|
||||
doReturn(retryConfig).whenever(conf).p2pMessagingRetry
|
||||
val flowTimeoutConfig = FlowTimeoutConfiguration(10.seconds, 3, 1.0)
|
||||
doReturn(flowTimeoutConfig).whenever(conf).flowTimeout
|
||||
}
|
||||
)
|
||||
)
|
||||
|
@ -6,12 +6,17 @@ import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.fork
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.testing.core.*
|
||||
import net.corda.testing.common.internal.ProjectStructure
|
||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||
import net.corda.testing.core.DUMMY_BANK_B_NAME
|
||||
import net.corda.testing.core.DUMMY_NOTARY_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.OutOfProcess
|
||||
import net.corda.testing.driver.driver
|
||||
@ -20,12 +25,12 @@ import net.corda.testing.internal.IntegrationTestSchemas
|
||||
import net.corda.testing.internal.toDatabaseSchemaName
|
||||
import net.corda.testing.node.User
|
||||
import org.junit.ClassRule
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
import java.util.*
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.Executors
|
||||
import kotlin.concurrent.thread
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class HardRestartTest : IntegrationTest() {
|
||||
companion object {
|
||||
@ -33,14 +38,20 @@ class HardRestartTest : IntegrationTest() {
|
||||
@JvmField
|
||||
val databaseSchemas = IntegrationTestSchemas(DUMMY_BANK_A_NAME.toDatabaseSchemaName(), DUMMY_BANK_B_NAME.toDatabaseSchemaName(),
|
||||
DUMMY_NOTARY_NAME.toDatabaseSchemaName())
|
||||
val logConfigFile = ProjectStructure.projectRootDir / "config" / "dev" / "log4j2.xml"
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class Ping(val pongParty: Party) : FlowLogic<Unit>() {
|
||||
class Ping(val pongParty: Party, val times: Int) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val pongSession = initiateFlow(pongParty)
|
||||
pongSession.sendAndReceive<Unit>(Unit)
|
||||
pongSession.sendAndReceive<Unit>(times)
|
||||
for (i in 1 .. times) {
|
||||
val j = pongSession.sendAndReceive<Int>(i).unwrap { it }
|
||||
assertEquals(i, j)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -48,14 +59,18 @@ class HardRestartTest : IntegrationTest() {
|
||||
class Pong(val pingSession: FlowSession) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
pingSession.sendAndReceive<Unit>(Unit)
|
||||
val times = pingSession.sendAndReceive<Int>(Unit).unwrap { it }
|
||||
for (i in 1 .. times) {
|
||||
val j = pingSession.sendAndReceive<Int>(i).unwrap { it }
|
||||
assertEquals(i, j)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun restartPingPongFlowRandomly() {
|
||||
fun restartShortPingPongFlowRandomly() {
|
||||
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = false)) {
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) {
|
||||
val (a, b) = listOf(
|
||||
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")),
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
@ -74,12 +89,74 @@ class HardRestartTest : IntegrationTest() {
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
}
|
||||
CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) {
|
||||
val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity()).returnValue
|
||||
val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 1).returnValue
|
||||
latch.countDown()
|
||||
// No matter the kill
|
||||
returnValue.getOrThrow()
|
||||
}
|
||||
|
||||
pongRestartThread.join()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun restartLongPingPongFlowRandomly() {
|
||||
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) {
|
||||
val (a, b) = listOf(
|
||||
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")),
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
).transpose().getOrThrow()
|
||||
|
||||
val latch = CountDownLatch(1)
|
||||
|
||||
// We kill -9 and restart the Pong node after a random sleep
|
||||
val pongRestartThread = thread {
|
||||
latch.await()
|
||||
val ms = Random().nextInt(1000)
|
||||
println("Sleeping $ms ms before kill")
|
||||
Thread.sleep(ms.toLong())
|
||||
(b as OutOfProcess).process.destroyForcibly()
|
||||
b.stop()
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
}
|
||||
CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) {
|
||||
val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100).returnValue
|
||||
latch.countDown()
|
||||
// No matter the kill
|
||||
returnValue.getOrThrow()
|
||||
}
|
||||
|
||||
pongRestartThread.join()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun softRestartLongPingPongFlowRandomly() {
|
||||
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) {
|
||||
val (a, b) = listOf(
|
||||
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")),
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
).transpose().getOrThrow()
|
||||
|
||||
val latch = CountDownLatch(1)
|
||||
|
||||
// We kill -9 and restart the Pong node after a random sleep
|
||||
val pongRestartThread = thread {
|
||||
latch.await()
|
||||
val ms = Random().nextInt(1000)
|
||||
println("Sleeping $ms ms before kill")
|
||||
Thread.sleep(ms.toLong())
|
||||
b.stop()
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
}
|
||||
CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) {
|
||||
val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100).returnValue
|
||||
latch.countDown()
|
||||
// No matter the kill
|
||||
returnValue.getOrThrow()
|
||||
}
|
||||
|
||||
pongRestartThread.join()
|
||||
}
|
||||
@ -133,7 +210,7 @@ class HardRestartTest : IntegrationTest() {
|
||||
@Test
|
||||
fun restartRecursiveFlowRandomly() {
|
||||
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<RecursiveA>(), Permissions.all()))
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = false)) {
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) {
|
||||
val (a, b) = listOf(
|
||||
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")),
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000"))
|
||||
|
@ -50,7 +50,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
|
||||
val networkServices: NetworkServicesConfig?
|
||||
val certificateChainCheckPolicies: List<CertChainPolicyConfig>
|
||||
val verifierType: VerifierType
|
||||
val p2pMessagingRetry: P2PMessagingRetryConfiguration
|
||||
val flowTimeout: FlowTimeoutConfiguration
|
||||
val notary: NotaryConfig?
|
||||
val additionalNodeInfoPollingFrequencyMsec: Long
|
||||
val p2pAddress: NetworkHostAndPort
|
||||
@ -193,12 +193,11 @@ data class NetworkServicesConfig(
|
||||
/**
|
||||
* Currently only used for notarisation requests.
|
||||
*
|
||||
* When the response doesn't arrive in time, the message is resent to a different notary-replica round-robin
|
||||
* in case of clustered notaries.
|
||||
* Specifies the configuration for timing out and restarting a [TimedFlow].
|
||||
*/
|
||||
data class P2PMessagingRetryConfiguration(
|
||||
val messageRedeliveryDelay: Duration,
|
||||
val maxRetryCount: Int,
|
||||
data class FlowTimeoutConfiguration(
|
||||
val timeout: Duration,
|
||||
val maxRestartCount: Int,
|
||||
val backoffBase: Double
|
||||
)
|
||||
|
||||
@ -221,7 +220,7 @@ data class NodeConfigurationImpl(
|
||||
override val rpcUsers: List<User>,
|
||||
override val security: SecurityConfiguration? = null,
|
||||
override val verifierType: VerifierType,
|
||||
override val p2pMessagingRetry: P2PMessagingRetryConfiguration,
|
||||
override val flowTimeout: FlowTimeoutConfiguration,
|
||||
override val p2pAddress: NetworkHostAndPort,
|
||||
private val rpcAddress: NetworkHostAndPort? = null,
|
||||
private val rpcSettings: NodeRpcSettings,
|
||||
|
@ -15,6 +15,7 @@ import co.paralleluniverse.strands.SettableFuture
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.VersionInfo
|
||||
import net.corda.node.services.statemachine.FlowMessagingImpl
|
||||
@ -189,6 +190,10 @@ class MessagingExecutor(
|
||||
}
|
||||
|
||||
private fun acknowledgeJob(job: Job.Acknowledge) {
|
||||
log.debug {
|
||||
val id = job.message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)
|
||||
"Acking $id"
|
||||
}
|
||||
job.message.individualAcknowledge()
|
||||
}
|
||||
}
|
||||
|
@ -149,7 +149,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
val eventQueue = getTransientField(TransientValues::eventQueue)
|
||||
try {
|
||||
eventLoop@ while (true) {
|
||||
val nextEvent = eventQueue.receive()
|
||||
val nextEvent = try {
|
||||
eventQueue.receive()
|
||||
} catch (interrupted: InterruptedException) {
|
||||
log.error("Flow interrupted while waiting for events, aborting immediately")
|
||||
abortFiber()
|
||||
}
|
||||
val continuation = processEvent(transitionExecutor, nextEvent)
|
||||
when (continuation) {
|
||||
is FlowContinuation.Resume -> return continuation.result
|
||||
@ -176,7 +181,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
* processing finished. Purely used for internal invariant checks.
|
||||
*/
|
||||
@Suspendable
|
||||
private fun processEventImmediately(event: Event, isDbTransactionOpenOnEntry: Boolean, isDbTransactionOpenOnExit: Boolean): FlowContinuation {
|
||||
private fun processEventImmediately(
|
||||
event: Event,
|
||||
isDbTransactionOpenOnEntry: Boolean,
|
||||
isDbTransactionOpenOnExit: Boolean): FlowContinuation {
|
||||
checkDbTransaction(isDbTransactionOpenOnEntry)
|
||||
val transitionExecutor = getTransientField(TransientValues::transitionExecutor)
|
||||
val continuation = processEvent(transitionExecutor, event)
|
||||
@ -256,7 +264,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
processEventImmediately(
|
||||
Event.EnterSubFlow(subFlow.javaClass,
|
||||
createSubFlowVersion(
|
||||
serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion)),
|
||||
serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion
|
||||
)
|
||||
),
|
||||
isDbTransactionOpenOnEntry = true,
|
||||
isDbTransactionOpenOnExit = true
|
||||
)
|
||||
|
@ -602,10 +602,10 @@ class MultiThreadedStateMachineManager(
|
||||
|
||||
/** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */
|
||||
private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> {
|
||||
return with(serviceHub.configuration.p2pMessagingRetry) {
|
||||
val timeoutDelaySeconds = messageRedeliveryDelay.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong()
|
||||
return with(serviceHub.configuration.flowTimeout) {
|
||||
val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong()
|
||||
timeoutScheduler.schedule({
|
||||
val event = Event.Error(FlowTimeoutException(maxRetryCount))
|
||||
val event = Event.Error(FlowTimeoutException(maxRestartCount))
|
||||
flow.fiber.scheduleEvent(event)
|
||||
}, timeoutDelaySeconds, TimeUnit.SECONDS)
|
||||
}
|
||||
|
@ -332,6 +332,7 @@ class SingleThreadedStateMachineManager(
|
||||
mutex.locked { if (flows.containsKey(id)) return@map null }
|
||||
val checkpoint = deserializeCheckpoint(serializedCheckpoint)
|
||||
if (checkpoint == null) return@map null
|
||||
logger.debug { "Restored $checkpoint" }
|
||||
createFlowFromCheckpoint(
|
||||
id = id,
|
||||
checkpoint = checkpoint,
|
||||
@ -380,7 +381,10 @@ class SingleThreadedStateMachineManager(
|
||||
// Just flow initiation message
|
||||
null
|
||||
}
|
||||
externalEventMutex.withLock {
|
||||
mutex.locked {
|
||||
if (stopping) {
|
||||
return
|
||||
}
|
||||
// Remove any sessions the old flow has.
|
||||
for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
|
||||
sessionToFlow.remove(sessionId)
|
||||
@ -401,12 +405,13 @@ class SingleThreadedStateMachineManager(
|
||||
}
|
||||
}
|
||||
|
||||
private val externalEventMutex = ReentrantLock()
|
||||
override fun deliverExternalEvent(event: ExternalEvent) {
|
||||
externalEventMutex.withLock {
|
||||
when (event) {
|
||||
is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event)
|
||||
is ExternalEvent.ExternalStartFlowEvent<*> -> onExternalStartFlow(event)
|
||||
mutex.locked {
|
||||
if (!stopping) {
|
||||
when (event) {
|
||||
is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event)
|
||||
is ExternalEvent.ExternalStartFlowEvent<*> -> onExternalStartFlow(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -614,10 +619,10 @@ class SingleThreadedStateMachineManager(
|
||||
|
||||
/** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */
|
||||
private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> {
|
||||
return with(serviceHub.configuration.p2pMessagingRetry) {
|
||||
val timeoutDelaySeconds = messageRedeliveryDelay.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong()
|
||||
return with(serviceHub.configuration.flowTimeout) {
|
||||
val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong()
|
||||
timeoutScheduler.schedule({
|
||||
val event = Event.Error(FlowTimeoutException(maxRetryCount))
|
||||
val event = Event.Error(FlowTimeoutException(maxRestartCount))
|
||||
flow.fiber.scheduleEvent(event)
|
||||
}, timeoutDelaySeconds, TimeUnit.SECONDS)
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ crlCheckSoftFail = true
|
||||
lazyBridgeStart = true
|
||||
dataSourceProperties = {
|
||||
dataSourceClassName = org.h2.jdbcx.JdbcDataSource
|
||||
dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;WRITE_DELAY=0;AUTO_SERVER_PORT="${h2port}
|
||||
dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;WRITE_DELAY=0;LOCK_TIMEOUT=10000;AUTO_SERVER_PORT="${h2port}
|
||||
dataSource.user = sa
|
||||
dataSource.password = ""
|
||||
}
|
||||
@ -44,8 +44,8 @@ rpcSettings = {
|
||||
useSsl = false
|
||||
standAloneBroker = false
|
||||
}
|
||||
p2pMessagingRetry {
|
||||
messageRedeliveryDelay = 30 seconds
|
||||
maxRetryCount = 3
|
||||
flowTimeout {
|
||||
timeout = 30 seconds
|
||||
maxRestartCount = 3
|
||||
backoffBase = 2.0
|
||||
}
|
||||
|
@ -280,7 +280,7 @@ class NodeConfigurationImplTest {
|
||||
verifierType = VerifierType.InMemory,
|
||||
p2pAddress = NetworkHostAndPort("localhost", 0),
|
||||
messagingServerAddress = null,
|
||||
p2pMessagingRetry = P2PMessagingRetryConfiguration(5.seconds, 3, 1.0),
|
||||
flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0),
|
||||
notary = null,
|
||||
devMode = true,
|
||||
noLocalShell = false,
|
||||
|
@ -83,8 +83,7 @@ class ArtemisMessagingTest {
|
||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
||||
doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).p2pMessagingRetry
|
||||
|
||||
doReturn(FlowTimeoutConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).flowTimeout
|
||||
}
|
||||
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true), { null }, { null })
|
||||
|
@ -24,9 +24,9 @@ rpcSettings = {
|
||||
useSsl = false
|
||||
standAloneBroker = false
|
||||
}
|
||||
p2pMessagingRetry {
|
||||
messageRedeliveryDelay = 30 seconds
|
||||
maxRetryCount = 3
|
||||
flowTimeout {
|
||||
timeout = 30 seconds
|
||||
maxRestartCount = 3
|
||||
backoffBase = 2.0
|
||||
}
|
||||
enterpriseConfiguration = {
|
||||
|
@ -11,6 +11,7 @@
|
||||
package net.corda.traderdemo
|
||||
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.millis
|
||||
import net.corda.finance.DOLLARS
|
||||
@ -25,6 +26,7 @@ import net.corda.testing.core.DUMMY_BANK_B_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.InProcess
|
||||
import net.corda.testing.driver.OutOfProcess
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.internal.IntegrationTest
|
||||
import net.corda.testing.internal.IntegrationTestSchemas
|
||||
@ -91,4 +93,31 @@ class TraderDemoTest : IntegrationTest() {
|
||||
assertThat(clientB.dollarCashBalance).isEqualTo(5.DOLLARS)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Tudor test`() {
|
||||
driver(DriverParameters(isDebug = true, startNodesInProcess = false, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
|
||||
val demoUser = User("demo", "demo", setOf(startFlow<SellerFlow>(), all()))
|
||||
val bankUser = User("user1", "test", permissions = setOf(all()))
|
||||
val (nodeA, nodeB, bankNode) = listOf(
|
||||
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)),
|
||||
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)),
|
||||
startNode(providedName = BOC_NAME, rpcUsers = listOf(bankUser))
|
||||
).map { (it.getOrThrow() as OutOfProcess) }
|
||||
|
||||
val nodeBRpc = CordaRPCClient(nodeB.rpcAddress).start(demoUser.username, demoUser.password).proxy
|
||||
val nodeARpc = CordaRPCClient(nodeA.rpcAddress).start(demoUser.username, demoUser.password).proxy
|
||||
val nodeBankRpc = let {
|
||||
val client = CordaRPCClient(bankNode.rpcAddress)
|
||||
client.start(bankUser.username, bankUser.password).proxy
|
||||
}
|
||||
|
||||
TraderDemoClientApi(nodeBankRpc).runIssuer(amount = 100.DOLLARS, buyerName = nodeA.nodeInfo.singleIdentity().name, sellerName = nodeB.nodeInfo.singleIdentity().name)
|
||||
val stxFuture = nodeBRpc.startFlow(::SellerFlow, nodeA.nodeInfo.singleIdentity(), 5.DOLLARS).returnValue
|
||||
nodeARpc.stateMachinesFeed().updates.toBlocking().first() // wait until initiated flow starts
|
||||
nodeA.stop()
|
||||
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to nodeA.p2pAddress.toString()))
|
||||
stxFuture.getOrThrow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -493,7 +493,7 @@ private fun mockNodeConfiguration(): NodeConfiguration {
|
||||
doReturn(null).whenever(it).networkServices
|
||||
doReturn(VerifierType.InMemory).whenever(it).verifierType
|
||||
// Set to be long enough so retries don't trigger unless we override it
|
||||
doReturn(P2PMessagingRetryConfiguration(1.hours, 3, backoffBase = 2.0)).whenever(it).p2pMessagingRetry
|
||||
doReturn(FlowTimeoutConfiguration(1.hours, 3, backoffBase = 1.0)).whenever(it).flowTimeout
|
||||
doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec
|
||||
doReturn(null).whenever(it).devModeOptions
|
||||
doReturn(EnterpriseConfiguration(
|
||||
|
Loading…
x
Reference in New Issue
Block a user