Merge pull request from corda/anthony-os-merge-20180326-2

Anthony os merge 20180326
This commit is contained in:
Anthony Keenan 2018-03-26 17:44:22 +01:00 committed by GitHub
commit 8183da3d5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 213 additions and 34 deletions
client/rpc/src
integration-test/kotlin/net/corda/client/rpc
main/kotlin/net/corda/client/rpc
docs/source
finance/src
integration-test/kotlin/net/corda/finance/flows
main/kotlin/net/corda/finance/contracts/asset/cash/selection
node-api
node/src
main/kotlin/net/corda/node/services/events
test/kotlin/net/corda/node/services/events
samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo

View File

@ -315,16 +315,21 @@ class RPCStabilityTests {
var terminateHandlerCalled = false var terminateHandlerCalled = false
var errorHandlerCalled = false var errorHandlerCalled = false
var exceptionMessage: String? = null
val subscription = client.subscribe() val subscription = client.subscribe()
.doOnTerminate{ terminateHandlerCalled = true } .doOnTerminate{ terminateHandlerCalled = true }
.doOnError { errorHandlerCalled = true } .subscribe({}, {
.subscribe() errorHandlerCalled = true
//log exception
exceptionMessage = it.message
})
serverFollower.shutdown() serverFollower.shutdown()
Thread.sleep(100) Thread.sleep(100)
assertTrue(terminateHandlerCalled) assertTrue(terminateHandlerCalled)
assertTrue(errorHandlerCalled) assertTrue(errorHandlerCalled)
assertEquals("Connection failure detected.", exceptionMessage)
assertTrue(subscription.isUnsubscribed) assertTrue(subscription.isUnsubscribed)
clientFollower.shutdown() // Driver would do this after the new server, causing hang. clientFollower.shutdown() // Driver would do this after the new server, causing hang.

View File

@ -99,6 +99,10 @@ interface CordaRPCClientConfiguration {
* with an error, the observable is closed and you can't then re-subscribe again: you'll have to re-request a fresh * with an error, the observable is closed and you can't then re-subscribe again: you'll have to re-request a fresh
* observable with another RPC. * observable with another RPC.
* *
* In case of loss of connection to the server, the client will try to reconnect using the settings provided via
* [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw
* [RPCException] and previously returned observables will call onError().
*
* @param hostAndPort The network address to connect to. * @param hostAndPort The network address to connect to.
* @param configuration An optional configuration used to tweak client behaviour. * @param configuration An optional configuration used to tweak client behaviour.
* @param sslConfiguration An optional [SSLConfiguration] used to enable secure communication with the server. * @param sslConfiguration An optional [SSLConfiguration] used to enable secure communication with the server.

View File

@ -7,6 +7,11 @@ Unreleased
Here are brief summaries of what's changed between each snapshot release. This includes guidance on how to upgrade code Here are brief summaries of what's changed between each snapshot release. This includes guidance on how to upgrade code
from the previous milestone release. from the previous milestone release.
* Update the fast-classpath-scanner dependent library version from 2.0.21 to 2.12.3
.. note:: Whilst this is not the latest version of this library, that being 2.18.1 at time of writing, versions later
than 2.12.3 (including 2.12.4) exhibit a different issue.
* Node can be shut down abruptly by ``shutdown`` function in `CordaRPCOps` or gracefully (draining flows first) through ``gracefulShutdown`` command from shell. * Node can be shut down abruptly by ``shutdown`` function in `CordaRPCOps` or gracefully (draining flows first) through ``gracefulShutdown`` command from shell.
* Parsing of ``NodeConfiguration`` will now fail if unknown configuration keys are found. * Parsing of ``NodeConfiguration`` will now fail if unknown configuration keys are found.

View File

@ -0,0 +1,43 @@
package net.corda.finance.flows
import net.corda.core.internal.packageName
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.contracts.getCashBalance
import net.corda.finance.schemas.CashSchemaV1
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.InProcessImpl
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
class CashSelectionTest {
@Test
fun unconsumed_cash_states() {
driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf(Cash::class, CashSchemaV1::class).map { it.packageName })) {
defaultNotaryNode.getOrThrow()
val node = startNode().getOrThrow() as InProcessImpl
val issuerRef = OpaqueBytes.of(0)
val issuedAmount = 1000.DOLLARS
node.rpc.startFlow(::CashIssueFlow, issuedAmount, issuerRef, defaultNotaryIdentity).returnValue.getOrThrow()
val availableBalance = node.rpc.getCashBalance(issuedAmount.token)
assertThat(availableBalance).isEqualTo(issuedAmount)
val exitedAmount = 300.DOLLARS
node.rpc.startFlow(::CashExitFlow, exitedAmount, issuerRef).returnValue.getOrThrow()
val availableBalanceAfterExit = node.rpc.getCashBalance(issuedAmount.token)
assertThat(availableBalanceAfterExit).isEqualTo(issuedAmount - exitedAmount)
}
}
}

View File

@ -68,7 +68,7 @@ abstract class AbstractCashSelection(private val maxRetries : Int = 8, private v
* loaded JDBC driver. * loaded JDBC driver.
* Note: the first loaded implementation to pass this check will be used at run-time. * Note: the first loaded implementation to pass this check will be used at run-time.
*/ */
abstract fun isCompatible(metadata: DatabaseMetaData): Boolean protected abstract fun isCompatible(metadata: DatabaseMetaData): Boolean
/** /**
* A vendor specific query(ies) to gather Cash states that are available. * A vendor specific query(ies) to gather Cash states that are available.
@ -84,10 +84,10 @@ abstract class AbstractCashSelection(private val maxRetries : Int = 8, private v
* otherwise what is available is returned unlocked for informational purposes. * otherwise what is available is returned unlocked for informational purposes.
* @return The result of the withResultSet function * @return The result of the withResultSet function
*/ */
abstract fun executeQuery(connection: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?, protected abstract fun executeQuery(connection: Connection, amount: Amount<Currency>, lockId: UUID, notary: Party?,
onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>, withResultSet: (ResultSet) -> Boolean): Boolean onlyFromIssuerParties: Set<AbstractParty>, withIssuerRefs: Set<OpaqueBytes>, withResultSet: (ResultSet) -> Boolean): Boolean
override abstract fun toString(): String abstract override fun toString(): String
/** /**
* Query to gather Cash states that are available and retry if they are temporarily unavailable. * Query to gather Cash states that are available and retry if they are temporarily unavailable.

View File

@ -56,7 +56,7 @@ dependencies {
testCompile "org.apache.curator:curator-test:${curator_version}" testCompile "org.apache.curator:curator-test:${curator_version}"
// FastClasspathScanner: classpath scanning - needed for the NetworkBootstraper // FastClasspathScanner: classpath scanning - needed for the NetworkBootstraper
compile 'io.github.lukehutch:fast-classpath-scanner:2.0.21' compile 'io.github.lukehutch:fast-classpath-scanner:2.12.3'
// Pure-Java Snappy compression // Pure-Java Snappy compression
compile 'org.iq80.snappy:snappy:0.4' compile 'org.iq80.snappy:snappy:0.4'

View File

@ -285,6 +285,12 @@ class NodeSchedulerService(private val clock: CordaClock,
schedulerTimerExecutor.join() schedulerTimerExecutor.join()
} }
@VisibleForTesting
internal fun cancelAndWait() {
schedulerTimerExecutor.shutdownNow()
schedulerTimerExecutor.join()
}
private inner class FlowStartDeduplicationHandler(val scheduledState: ScheduledStateRef) : DeduplicationHandler { private inner class FlowStartDeduplicationHandler(val scheduledState: ScheduledStateRef) : DeduplicationHandler {
override fun insideDatabaseTransaction() { override fun insideDatabaseTransaction() {
scheduledStates.remove(scheduledState.ref) scheduledStates.remove(scheduledState.ref)

View File

@ -12,6 +12,7 @@ package net.corda.node.services.events
import com.nhaarman.mockito_kotlin.* import com.nhaarman.mockito_kotlin.*
import net.corda.core.contracts.* import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRef import net.corda.core.flows.FlowLogicRef
import net.corda.core.flows.FlowLogicRefFactory import net.corda.core.flows.FlowLogicRefFactory
@ -20,12 +21,15 @@ import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.uncheckedCast import net.corda.core.internal.uncheckedCast
import net.corda.core.node.ServicesForResolution import net.corda.core.node.ServicesForResolution
import net.corda.core.utilities.days import net.corda.core.utilities.days
import net.corda.node.internal.configureDatabase
import net.corda.node.services.api.FlowStarter import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.NodePropertiesStore import net.corda.node.services.api.NodePropertiesStore
import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.testing.internal.doLookup import net.corda.testing.internal.doLookup
import net.corda.testing.internal.rigorousMock import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices
import net.corda.testing.node.TestClock import net.corda.testing.node.TestClock
import org.junit.Rule import org.junit.Rule
import org.junit.Test import org.junit.Test
@ -36,9 +40,15 @@ import java.time.Clock
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
class NodeSchedulerServiceTest { open class NodeSchedulerServiceTestBase {
private val mark = Instant.now() protected class Event(time: Instant) {
private val testClock = TestClock(rigorousMock<Clock>().also { val stateRef = rigorousMock<StateRef>()
val flowLogic = rigorousMock<FlowLogic<*>>()
val ssr = ScheduledStateRef(stateRef, time)
}
protected val mark = Instant.now()!!
protected val testClock = TestClock(rigorousMock<Clock>().also {
doReturn(mark).whenever(it).instant() doReturn(mark).whenever(it).instant()
}) })
private val database = rigorousMock<CordaPersistence>().also { private val database = rigorousMock<CordaPersistence>().also {
@ -47,27 +57,54 @@ class NodeSchedulerServiceTest {
rigorousMock<DatabaseTransaction>().block() rigorousMock<DatabaseTransaction>().block()
}.whenever(it).transaction(any()) }.whenever(it).transaction(any())
} }
private val flowStarter = rigorousMock<FlowStarter>().also { protected val flowStarter = rigorousMock<FlowStarter>().also {
doReturn(openFuture<FlowStateMachine<*>>()).whenever(it).startFlow(any<FlowLogic<*>>(), any(), any()) doReturn(openFuture<FlowStateMachine<*>>()).whenever(it).startFlow(any<FlowLogic<*>>(), any(), any())
} }
private val flowsDraingMode = rigorousMock<NodePropertiesStore.FlowsDrainingModeOperations>().also { private val flowsDraingMode = rigorousMock<NodePropertiesStore.FlowsDrainingModeOperations>().also {
doReturn(false).whenever(it).isEnabled() doReturn(false).whenever(it).isEnabled()
} }
private val nodeProperties = rigorousMock<NodePropertiesStore>().also { protected val nodeProperties = rigorousMock<NodePropertiesStore>().also {
doReturn(flowsDraingMode).whenever(it).flowsDrainingMode doReturn(flowsDraingMode).whenever(it).flowsDrainingMode
} }
private val transactionStates = mutableMapOf<StateRef, TransactionState<*>>() protected val flows = mutableMapOf<FlowLogicRef, FlowLogic<*>>()
private val servicesForResolution = rigorousMock<ServicesForResolution>().also { protected val flowLogicRefFactory = rigorousMock<FlowLogicRefFactory>().also {
doLookup(transactionStates).whenever(it).loadState(any())
}
private val flows = mutableMapOf<FlowLogicRef, FlowLogic<*>>()
private val flowLogicRefFactory = rigorousMock<FlowLogicRefFactory>().also {
doLookup(flows).whenever(it).toFlowLogic(any()) doLookup(flows).whenever(it).toFlowLogic(any())
} }
private val log = rigorousMock<Logger>().also {
protected val transactionStates = mutableMapOf<StateRef, TransactionState<*>>()
protected val servicesForResolution = rigorousMock<ServicesForResolution>().also {
doLookup(transactionStates).whenever(it).loadState(any())
}
protected val log = rigorousMock<Logger>().also {
doReturn(false).whenever(it).isTraceEnabled doReturn(false).whenever(it).isTraceEnabled
doNothing().whenever(it).trace(any(), any<Any>()) doNothing().whenever(it).trace(any(), any<Any>())
doNothing().whenever(it).info(any())
doNothing().whenever(it).error(any(), any<Throwable>())
} }
protected fun assertWaitingFor(ssr: ScheduledStateRef, total: Int = 1) {
// The timeout is to make verify wait, which is necessary as we're racing the NSS thread i.e. we often get here just before the trace:
verify(log, timeout(5000).times(total)).trace(NodeSchedulerService.schedulingAsNextFormat, ssr)
}
protected fun assertWaitingFor(event: Event, total: Int = 1) = assertWaitingFor(event.ssr, total)
protected fun assertStarted(flowLogic: FlowLogic<*>) {
// Like in assertWaitingFor, use timeout to make verify wait as we often race the call to startFlow:
verify(flowStarter, timeout(5000)).startFlow(same(flowLogic)!!, any(), any())
}
protected fun assertStarted(event: Event) = assertStarted(event.flowLogic)
}
class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() {
private val database = rigorousMock<CordaPersistence>().also {
doAnswer {
val block: DatabaseTransaction.() -> Any? = uncheckedCast(it.arguments[0])
rigorousMock<DatabaseTransaction>().block()
}.whenever(it).transaction(any())
}
private val scheduler = NodeSchedulerService( private val scheduler = NodeSchedulerService(
testClock, testClock,
database, database,
@ -87,12 +124,6 @@ class NodeSchedulerServiceTest {
} }
} }
private class Event(time: Instant) {
val stateRef = rigorousMock<StateRef>()
val flowLogic = rigorousMock<FlowLogic<*>>()
val ssr = ScheduledStateRef(stateRef, time)
}
private fun schedule(time: Instant) = Event(time).apply { private fun schedule(time: Instant) = Event(time).apply {
val logicRef = rigorousMock<FlowLogicRef>() val logicRef = rigorousMock<FlowLogicRef>()
transactionStates[stateRef] = rigorousMock<TransactionState<SchedulableState>>().also { transactionStates[stateRef] = rigorousMock<TransactionState<SchedulableState>>().also {
@ -104,16 +135,6 @@ class NodeSchedulerServiceTest {
scheduler.scheduleStateActivity(ssr) scheduler.scheduleStateActivity(ssr)
} }
private fun assertWaitingFor(event: Event, total: Int = 1) {
// The timeout is to make verify wait, which is necessary as we're racing the NSS thread i.e. we often get here just before the trace:
verify(log, timeout(5000).times(total)).trace(NodeSchedulerService.schedulingAsNextFormat, event.ssr)
}
private fun assertStarted(event: Event) {
// Like in assertWaitingFor, use timeout to make verify wait as we often race the call to startFlow:
verify(flowStarter, timeout(5000)).startFlow(same(event.flowLogic)!!, any(), any())
}
@Test @Test
fun `test activity due now`() { fun `test activity due now`() {
assertStarted(schedule(mark)) assertStarted(schedule(mark))
@ -192,3 +213,97 @@ class NodeSchedulerServiceTest {
testClock.advanceBy(1.days) testClock.advanceBy(1.days)
} }
} }
class NodeSchedulerPersistenceTest : NodeSchedulerServiceTestBase() {
private val databaseConfig: DatabaseConfig = DatabaseConfig()
fun createScheduler(db: CordaPersistence): NodeSchedulerService {
return NodeSchedulerService(
testClock,
db,
flowStarter,
servicesForResolution,
flowLogicRefFactory = flowLogicRefFactory,
nodeProperties = nodeProperties,
drainingModePollPeriod = Duration.ofSeconds(5),
log = log).apply { start() }
}
fun transactionStateMock(logicRef: FlowLogicRef, time: Instant): TransactionState<*> {
return rigorousMock<TransactionState<SchedulableState>>().also {
doReturn(rigorousMock<SchedulableState>().also {
doReturn(ScheduledActivity(logicRef, time)).whenever(it).nextScheduledActivity(any(), any())
}).whenever(it).data
}
}
@Test
fun `test that schedule is persisted`() {
val dataSourceProps = MockServices.makeTestDataSourceProperties()
val timeInTheFuture = mark + 1.days
val stateRef = StateRef(SecureHash.zeroHash, 0)
val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock())
val scheduler = database.transaction {
createScheduler(database)
}
val ssr1 = ScheduledStateRef(stateRef, timeInTheFuture)
database.transaction {
scheduler.scheduleStateActivity(ssr1)
}
// XXX: For some reason without the commit the db closes without writing the transactions
database.dataSource.connection.commit()
// Force the thread to shut down with operations waiting
scheduler.cancelAndWait()
database.close()
val flowLogic = rigorousMock<FlowLogic<*>>()
val logicRef = rigorousMock<FlowLogicRef>()
transactionStates[stateRef] = transactionStateMock(logicRef, timeInTheFuture)
flows[logicRef] = flowLogic
val newDatabase = configureDatabase(dataSourceProps, DatabaseConfig(), rigorousMock())
val newScheduler = newDatabase.transaction {
createScheduler(newDatabase)
}
testClock.advanceBy(1.days)
assertStarted(flowLogic)
newScheduler.join()
newDatabase.close()
}
@Test
fun `test that if schedule is updated then the flow is invoked on the correct schedule`() {
val dataSourceProps = MockServices.makeTestDataSourceProperties()
val timeInTheFuture = mark + 1.days
val stateRef = StateRef(SecureHash.allOnesHash, 0)
val ssr1 = ScheduledStateRef(stateRef, mark)
val ssr2 = ScheduledStateRef(stateRef, timeInTheFuture)
val logicRef = rigorousMock<FlowLogicRef>()
val flowLogic = rigorousMock<FlowLogic<*>>()
val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock())
val scheduler = database.transaction {
createScheduler(database)
}
transactionStates[stateRef] = transactionStateMock(logicRef, timeInTheFuture)
flows[logicRef] = flowLogic
database.transaction {
scheduler.scheduleStateActivity(ssr1)
session.flush()
scheduler.scheduleStateActivity(ssr2)
}
assertWaitingFor(ssr1)
testClock.advanceBy(1.days)
assertStarted(flowLogic)
scheduler.join()
database.close()
}
}

View File

@ -54,6 +54,7 @@ class TraderDemoTest : IntegrationTest() {
startFlow<CommercialPaperIssueFlow>(), startFlow<CommercialPaperIssueFlow>(),
all())) all()))
driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) { driver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
defaultNotaryNode.getOrThrow()
val (nodeA, nodeB, bankNode) = listOf( val (nodeA, nodeB, bankNode) = listOf(
startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)), startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)),
startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)), startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)),