diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index c94fff6a2c..63e1579e8c 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -315,16 +315,21 @@ class RPCStabilityTests { var terminateHandlerCalled = false var errorHandlerCalled = false + var exceptionMessage: String? = null val subscription = client.subscribe() .doOnTerminate{ terminateHandlerCalled = true } - .doOnError { errorHandlerCalled = true } - .subscribe() + .subscribe({}, { + errorHandlerCalled = true + //log exception + exceptionMessage = it.message + }) serverFollower.shutdown() Thread.sleep(100) assertTrue(terminateHandlerCalled) assertTrue(errorHandlerCalled) + assertEquals("Connection failure detected.", exceptionMessage) assertTrue(subscription.isUnsubscribed) clientFollower.shutdown() // Driver would do this after the new server, causing hang. diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index 4bedf058ce..b495570148 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -99,6 +99,10 @@ interface CordaRPCClientConfiguration { * with an error, the observable is closed and you can't then re-subscribe again: you'll have to re-request a fresh * observable with another RPC. * + * In case of loss of connection to the server, the client will try to reconnect using the settings provided via + * [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw + * [RPCException] and previously returned observables will call onError(). + * * @param hostAndPort The network address to connect to. * @param configuration An optional configuration used to tweak client behaviour. * @param sslConfiguration An optional [SSLConfiguration] used to enable secure communication with the server. diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 36ed362c47..9f66bd36ed 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -7,6 +7,11 @@ Unreleased Here are brief summaries of what's changed between each snapshot release. This includes guidance on how to upgrade code from the previous milestone release. +* Update the fast-classpath-scanner dependent library version from 2.0.21 to 2.12.3 + + .. note:: Whilst this is not the latest version of this library, that being 2.18.1 at time of writing, versions later + than 2.12.3 (including 2.12.4) exhibit a different issue. + * Node can be shut down abruptly by ``shutdown`` function in `CordaRPCOps` or gracefully (draining flows first) through ``gracefulShutdown`` command from shell. * Parsing of ``NodeConfiguration`` will now fail if unknown configuration keys are found. diff --git a/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt b/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt new file mode 100644 index 0000000000..948bd80f22 --- /dev/null +++ b/finance/src/integration-test/kotlin/net/corda/finance/flows/CashSelectionTest.kt @@ -0,0 +1,43 @@ +package net.corda.finance.flows + +import net.corda.core.internal.packageName +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.getOrThrow +import net.corda.finance.DOLLARS +import net.corda.finance.contracts.asset.Cash +import net.corda.finance.contracts.getCashBalance +import net.corda.finance.schemas.CashSchemaV1 +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.driver.internal.InProcessImpl +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test + +class CashSelectionTest { + + @Test + fun unconsumed_cash_states() { + + driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf(Cash::class, CashSchemaV1::class).map { it.packageName })) { + + defaultNotaryNode.getOrThrow() + val node = startNode().getOrThrow() as InProcessImpl + val issuerRef = OpaqueBytes.of(0) + val issuedAmount = 1000.DOLLARS + + node.rpc.startFlow(::CashIssueFlow, issuedAmount, issuerRef, defaultNotaryIdentity).returnValue.getOrThrow() + + val availableBalance = node.rpc.getCashBalance(issuedAmount.token) + + assertThat(availableBalance).isEqualTo(issuedAmount) + + val exitedAmount = 300.DOLLARS + node.rpc.startFlow(::CashExitFlow, exitedAmount, issuerRef).returnValue.getOrThrow() + + val availableBalanceAfterExit = node.rpc.getCashBalance(issuedAmount.token) + + assertThat(availableBalanceAfterExit).isEqualTo(issuedAmount - exitedAmount) + } + } +} \ No newline at end of file diff --git a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt index 255ab6610f..93416d3f5d 100644 --- a/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt +++ b/finance/src/main/kotlin/net/corda/finance/contracts/asset/cash/selection/AbstractCashSelection.kt @@ -68,7 +68,7 @@ abstract class AbstractCashSelection(private val maxRetries : Int = 8, private v * loaded JDBC driver. * Note: the first loaded implementation to pass this check will be used at run-time. */ - abstract fun isCompatible(metadata: DatabaseMetaData): Boolean + protected abstract fun isCompatible(metadata: DatabaseMetaData): Boolean /** * A vendor specific query(ies) to gather Cash states that are available. @@ -84,10 +84,10 @@ abstract class AbstractCashSelection(private val maxRetries : Int = 8, private v * otherwise what is available is returned unlocked for informational purposes. * @return The result of the withResultSet function */ - abstract fun executeQuery(connection: Connection, amount: Amount, lockId: UUID, notary: Party?, + protected abstract fun executeQuery(connection: Connection, amount: Amount, lockId: UUID, notary: Party?, onlyFromIssuerParties: Set, withIssuerRefs: Set, withResultSet: (ResultSet) -> Boolean): Boolean - override abstract fun toString(): String + abstract override fun toString(): String /** * Query to gather Cash states that are available and retry if they are temporarily unavailable. diff --git a/node-api/build.gradle b/node-api/build.gradle index 61d0a37e77..1edbe50840 100644 --- a/node-api/build.gradle +++ b/node-api/build.gradle @@ -56,7 +56,7 @@ dependencies { testCompile "org.apache.curator:curator-test:${curator_version}" // FastClasspathScanner: classpath scanning - needed for the NetworkBootstraper - compile 'io.github.lukehutch:fast-classpath-scanner:2.0.21' + compile 'io.github.lukehutch:fast-classpath-scanner:2.12.3' // Pure-Java Snappy compression compile 'org.iq80.snappy:snappy:0.4' diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index db3c6a01d7..b02eeaf275 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -285,6 +285,12 @@ class NodeSchedulerService(private val clock: CordaClock, schedulerTimerExecutor.join() } + @VisibleForTesting + internal fun cancelAndWait() { + schedulerTimerExecutor.shutdownNow() + schedulerTimerExecutor.join() + } + private inner class FlowStartDeduplicationHandler(val scheduledState: ScheduledStateRef) : DeduplicationHandler { override fun insideDatabaseTransaction() { scheduledStates.remove(scheduledState.ref) diff --git a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt index 10dbec15ea..38919396fe 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt @@ -12,6 +12,7 @@ package net.corda.node.services.events import com.nhaarman.mockito_kotlin.* import net.corda.core.contracts.* +import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogicRef import net.corda.core.flows.FlowLogicRefFactory @@ -20,12 +21,15 @@ import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.uncheckedCast import net.corda.core.node.ServicesForResolution import net.corda.core.utilities.days +import net.corda.node.internal.configureDatabase import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.NodePropertiesStore import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.testing.internal.doLookup import net.corda.testing.internal.rigorousMock +import net.corda.testing.node.MockServices import net.corda.testing.node.TestClock import org.junit.Rule import org.junit.Test @@ -36,9 +40,15 @@ import java.time.Clock import java.time.Duration import java.time.Instant -class NodeSchedulerServiceTest { - private val mark = Instant.now() - private val testClock = TestClock(rigorousMock().also { +open class NodeSchedulerServiceTestBase { + protected class Event(time: Instant) { + val stateRef = rigorousMock() + val flowLogic = rigorousMock>() + val ssr = ScheduledStateRef(stateRef, time) + } + + protected val mark = Instant.now()!! + protected val testClock = TestClock(rigorousMock().also { doReturn(mark).whenever(it).instant() }) private val database = rigorousMock().also { @@ -47,27 +57,54 @@ class NodeSchedulerServiceTest { rigorousMock().block() }.whenever(it).transaction(any()) } - private val flowStarter = rigorousMock().also { + protected val flowStarter = rigorousMock().also { doReturn(openFuture>()).whenever(it).startFlow(any>(), any(), any()) } private val flowsDraingMode = rigorousMock().also { doReturn(false).whenever(it).isEnabled() } - private val nodeProperties = rigorousMock().also { + protected val nodeProperties = rigorousMock().also { doReturn(flowsDraingMode).whenever(it).flowsDrainingMode } - private val transactionStates = mutableMapOf>() - private val servicesForResolution = rigorousMock().also { - doLookup(transactionStates).whenever(it).loadState(any()) - } - private val flows = mutableMapOf>() - private val flowLogicRefFactory = rigorousMock().also { + protected val flows = mutableMapOf>() + protected val flowLogicRefFactory = rigorousMock().also { doLookup(flows).whenever(it).toFlowLogic(any()) } - private val log = rigorousMock().also { + + protected val transactionStates = mutableMapOf>() + protected val servicesForResolution = rigorousMock().also { + doLookup(transactionStates).whenever(it).loadState(any()) + } + protected val log = rigorousMock().also { doReturn(false).whenever(it).isTraceEnabled doNothing().whenever(it).trace(any(), any()) + doNothing().whenever(it).info(any()) + doNothing().whenever(it).error(any(), any()) } + + protected fun assertWaitingFor(ssr: ScheduledStateRef, total: Int = 1) { + // The timeout is to make verify wait, which is necessary as we're racing the NSS thread i.e. we often get here just before the trace: + verify(log, timeout(5000).times(total)).trace(NodeSchedulerService.schedulingAsNextFormat, ssr) + } + + protected fun assertWaitingFor(event: Event, total: Int = 1) = assertWaitingFor(event.ssr, total) + + protected fun assertStarted(flowLogic: FlowLogic<*>) { + // Like in assertWaitingFor, use timeout to make verify wait as we often race the call to startFlow: + verify(flowStarter, timeout(5000)).startFlow(same(flowLogic)!!, any(), any()) + } + + protected fun assertStarted(event: Event) = assertStarted(event.flowLogic) +} + +class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() { + private val database = rigorousMock().also { + doAnswer { + val block: DatabaseTransaction.() -> Any? = uncheckedCast(it.arguments[0]) + rigorousMock().block() + }.whenever(it).transaction(any()) + } + private val scheduler = NodeSchedulerService( testClock, database, @@ -87,12 +124,6 @@ class NodeSchedulerServiceTest { } } - private class Event(time: Instant) { - val stateRef = rigorousMock() - val flowLogic = rigorousMock>() - val ssr = ScheduledStateRef(stateRef, time) - } - private fun schedule(time: Instant) = Event(time).apply { val logicRef = rigorousMock() transactionStates[stateRef] = rigorousMock>().also { @@ -104,16 +135,6 @@ class NodeSchedulerServiceTest { scheduler.scheduleStateActivity(ssr) } - private fun assertWaitingFor(event: Event, total: Int = 1) { - // The timeout is to make verify wait, which is necessary as we're racing the NSS thread i.e. we often get here just before the trace: - verify(log, timeout(5000).times(total)).trace(NodeSchedulerService.schedulingAsNextFormat, event.ssr) - } - - private fun assertStarted(event: Event) { - // Like in assertWaitingFor, use timeout to make verify wait as we often race the call to startFlow: - verify(flowStarter, timeout(5000)).startFlow(same(event.flowLogic)!!, any(), any()) - } - @Test fun `test activity due now`() { assertStarted(schedule(mark)) @@ -191,4 +212,98 @@ class NodeSchedulerServiceTest { scheduler.unscheduleStateActivity(schedule(mark + 1.days).stateRef) testClock.advanceBy(1.days) } +} + +class NodeSchedulerPersistenceTest : NodeSchedulerServiceTestBase() { + private val databaseConfig: DatabaseConfig = DatabaseConfig() + + fun createScheduler(db: CordaPersistence): NodeSchedulerService { + return NodeSchedulerService( + testClock, + db, + flowStarter, + servicesForResolution, + flowLogicRefFactory = flowLogicRefFactory, + nodeProperties = nodeProperties, + drainingModePollPeriod = Duration.ofSeconds(5), + log = log).apply { start() } + } + + fun transactionStateMock(logicRef: FlowLogicRef, time: Instant): TransactionState<*> { + return rigorousMock>().also { + doReturn(rigorousMock().also { + doReturn(ScheduledActivity(logicRef, time)).whenever(it).nextScheduledActivity(any(), any()) + }).whenever(it).data + } + } + + @Test + fun `test that schedule is persisted`() { + val dataSourceProps = MockServices.makeTestDataSourceProperties() + val timeInTheFuture = mark + 1.days + val stateRef = StateRef(SecureHash.zeroHash, 0) + + val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock()) + val scheduler = database.transaction { + createScheduler(database) + } + + val ssr1 = ScheduledStateRef(stateRef, timeInTheFuture) + database.transaction { + scheduler.scheduleStateActivity(ssr1) + } + // XXX: For some reason without the commit the db closes without writing the transactions + database.dataSource.connection.commit() + // Force the thread to shut down with operations waiting + scheduler.cancelAndWait() + database.close() + + val flowLogic = rigorousMock>() + val logicRef = rigorousMock() + + transactionStates[stateRef] = transactionStateMock(logicRef, timeInTheFuture) + flows[logicRef] = flowLogic + + val newDatabase = configureDatabase(dataSourceProps, DatabaseConfig(), rigorousMock()) + val newScheduler = newDatabase.transaction { + createScheduler(newDatabase) + } + testClock.advanceBy(1.days) + assertStarted(flowLogic) + + newScheduler.join() + newDatabase.close() + } + + @Test + fun `test that if schedule is updated then the flow is invoked on the correct schedule`() { + val dataSourceProps = MockServices.makeTestDataSourceProperties() + val timeInTheFuture = mark + 1.days + val stateRef = StateRef(SecureHash.allOnesHash, 0) + + val ssr1 = ScheduledStateRef(stateRef, mark) + val ssr2 = ScheduledStateRef(stateRef, timeInTheFuture) + val logicRef = rigorousMock() + val flowLogic = rigorousMock>() + val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock()) + + val scheduler = database.transaction { + createScheduler(database) + } + + transactionStates[stateRef] = transactionStateMock(logicRef, timeInTheFuture) + flows[logicRef] = flowLogic + + database.transaction { + scheduler.scheduleStateActivity(ssr1) + session.flush() + scheduler.scheduleStateActivity(ssr2) + } + assertWaitingFor(ssr1) + testClock.advanceBy(1.days) + assertStarted(flowLogic) + + scheduler.join() + database.close() + } } \ No newline at end of file diff --git a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt index a123337b7b..93badf7b85 100644 --- a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt +++ b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt @@ -54,6 +54,7 @@ class TraderDemoTest : IntegrationTest() { startFlow(), all())) driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) { + defaultNotaryNode.getOrThrow() val (nodeA, nodeB, bankNode) = listOf( startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)), startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)),