mirror of
https://github.com/corda/corda.git
synced 2025-06-16 14:18:20 +00:00
Code cleanup, mostly shortening long lines (#4070)
This commit is contained in:
@ -1,10 +1,8 @@
|
||||
package net.corda.node.modes.draining
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.testMessage.MESSAGE_CONTRACT_PROGRAM_ID
|
||||
import net.corda.testMessage.Message
|
||||
import net.corda.testMessage.MessageContract
|
||||
import net.corda.testMessage.MessageState
|
||||
import net.corda.RpcInfo
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.core.contracts.Command
|
||||
import net.corda.core.contracts.StateAndContract
|
||||
import net.corda.core.flows.*
|
||||
@ -15,9 +13,11 @@ import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.RpcInfo
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.node.services.Permissions.Companion.all
|
||||
import net.corda.testMessage.MESSAGE_CONTRACT_PROGRAM_ID
|
||||
import net.corda.testMessage.Message
|
||||
import net.corda.testMessage.MessageContract
|
||||
import net.corda.testMessage.MessageState
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
@ -53,7 +53,11 @@ class FlowsDrainingModeContentionTest {
|
||||
@Test
|
||||
fun `draining mode does not deadlock with acks between 2 nodes`() {
|
||||
val message = "Ground control to Major Tom"
|
||||
driver(DriverParameters(startNodesInProcess = true, portAllocation = portAllocation, extraCordappPackagesToScan = listOf(MessageState::class.packageName))) {
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = true,
|
||||
portAllocation = portAllocation,
|
||||
extraCordappPackagesToScan = listOf(MessageState::class.packageName)
|
||||
)) {
|
||||
val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow()
|
||||
val nodeB = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow()
|
||||
|
||||
@ -70,11 +74,12 @@ class FlowsDrainingModeContentionTest {
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class ProposeTransactionAndWaitForCommit(private val data: String, private val myRpcInfo: RpcInfo, private val counterParty: Party, private val notary: Party) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
class ProposeTransactionAndWaitForCommit(private val data: String,
|
||||
private val myRpcInfo: RpcInfo,
|
||||
private val counterParty: Party,
|
||||
private val notary: Party) : FlowLogic<SignedTransaction>() {
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
|
||||
val session = initiateFlow(counterParty)
|
||||
val messageState = MessageState(message = Message(data), by = ourIdentity)
|
||||
val command = Command(MessageContract.Commands.Send(), messageState.participants.map { it.owningKey })
|
||||
@ -91,10 +96,8 @@ class ProposeTransactionAndWaitForCommit(private val data: String, private val m
|
||||
|
||||
@InitiatedBy(ProposeTransactionAndWaitForCommit::class)
|
||||
class SignTransactionTriggerDrainingModeAndFinality(private val session: FlowSession) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
|
||||
val tx = subFlow(ReceiveTransactionFlow(session))
|
||||
val signedTx = serviceHub.addSignature(tx)
|
||||
val initiatingRpcInfo = session.receive<RpcInfo>().unwrap { it }
|
||||
@ -105,9 +108,8 @@ class SignTransactionTriggerDrainingModeAndFinality(private val session: FlowSes
|
||||
}
|
||||
|
||||
private fun triggerDrainingModeForInitiatingNode(initiatingRpcInfo: RpcInfo) {
|
||||
|
||||
CordaRPCClient(initiatingRpcInfo.address).start(initiatingRpcInfo.username, initiatingRpcInfo.password).use {
|
||||
it.proxy.setFlowsDrainingModeEnabled(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -32,11 +32,14 @@ import kotlin.test.assertEquals
|
||||
|
||||
class ScheduledFlowIntegrationTests {
|
||||
@StartableByRPC
|
||||
class InsertInitialStateFlow(private val destination: Party, private val notary: Party, private val identity: Int = 1, private val scheduledFor: Instant? = null) : FlowLogic<Unit>() {
|
||||
class InsertInitialStateFlow(private val destination: Party,
|
||||
private val notary: Party,
|
||||
private val identity: Int = 1,
|
||||
private val scheduledFor: Instant? = null) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val scheduledState = ScheduledState(scheduledFor
|
||||
?: serviceHub.clock.instant(), ourIdentity, destination, identity.toString())
|
||||
val creationTime = scheduledFor ?: serviceHub.clock.instant()
|
||||
val scheduledState = ScheduledState(creationTime, ourIdentity, destination, identity.toString())
|
||||
val builder = TransactionBuilder(notary)
|
||||
.addOutputState(scheduledState, DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand(ourIdentity.owningKey))
|
||||
@ -90,8 +93,20 @@ class ScheduledFlowIntegrationTests {
|
||||
val scheduledFor = Instant.now().plusSeconds(10)
|
||||
val initialiseFutures = mutableListOf<CordaFuture<*>>()
|
||||
for (i in 0 until N) {
|
||||
initialiseFutures.add(aliceClient.proxy.startFlow(::InsertInitialStateFlow, bob.nodeInfo.legalIdentities.first(), defaultNotaryIdentity, i, scheduledFor).returnValue)
|
||||
initialiseFutures.add(bobClient.proxy.startFlow(::InsertInitialStateFlow, alice.nodeInfo.legalIdentities.first(), defaultNotaryIdentity, i + 100, scheduledFor).returnValue)
|
||||
initialiseFutures.add(aliceClient.proxy.startFlow(
|
||||
::InsertInitialStateFlow,
|
||||
bob.nodeInfo.legalIdentities.first(),
|
||||
defaultNotaryIdentity,
|
||||
i,
|
||||
scheduledFor
|
||||
).returnValue)
|
||||
initialiseFutures.add(bobClient.proxy.startFlow(
|
||||
::InsertInitialStateFlow,
|
||||
alice.nodeInfo.legalIdentities.first(),
|
||||
defaultNotaryIdentity,
|
||||
i + 100,
|
||||
scheduledFor
|
||||
).returnValue)
|
||||
}
|
||||
initialiseFutures.getOrThrowAll()
|
||||
|
||||
|
@ -46,12 +46,12 @@ class SendMessageFlow(private val message: Message, private val notary: Party, p
|
||||
|
||||
progressTracker.currentStep = FINALISING_TRANSACTION
|
||||
|
||||
if (reciepent != null) {
|
||||
return if (reciepent != null) {
|
||||
val session = initiateFlow(reciepent)
|
||||
subFlow(SendTransactionFlow(session, signedTx))
|
||||
return subFlow(FinalityFlow(signedTx, setOf(reciepent), FINALISING_TRANSACTION.childProgressTracker()))
|
||||
subFlow(FinalityFlow(signedTx, setOf(reciepent), FINALISING_TRANSACTION.childProgressTracker()))
|
||||
} else {
|
||||
return subFlow(FinalityFlow(signedTx, FINALISING_TRANSACTION.childProgressTracker()))
|
||||
subFlow(FinalityFlow(signedTx, FINALISING_TRANSACTION.childProgressTracker()))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -59,10 +59,9 @@ class SendMessageFlow(private val message: Message, private val notary: Party, p
|
||||
|
||||
@InitiatedBy(SendMessageFlow::class)
|
||||
class Record(private val session: FlowSession) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val tx = subFlow(ReceiveTransactionFlow(session, statesToRecord = StatesToRecord.ALL_VISIBLE))
|
||||
serviceHub.addSignature(tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -8,8 +8,8 @@ import net.corda.core.flows.FlowLogicRefFactory
|
||||
import net.corda.core.flows.SchedulableFlow
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
@ -28,6 +28,9 @@ import kotlin.reflect.jvm.jvmName
|
||||
import kotlin.test.fail
|
||||
|
||||
class ScheduledFlowsDrainingModeTest {
|
||||
companion object {
|
||||
private val logger = contextLogger()
|
||||
}
|
||||
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
private lateinit var aliceNode: TestStartedNode
|
||||
@ -38,10 +41,6 @@ class ScheduledFlowsDrainingModeTest {
|
||||
|
||||
private var executor: ScheduledExecutorService? = null
|
||||
|
||||
companion object {
|
||||
private val logger = loggerFor<ScheduledFlowsDrainingModeTest>()
|
||||
}
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts"), threadPerNode = true)
|
||||
@ -61,7 +60,6 @@ class ScheduledFlowsDrainingModeTest {
|
||||
|
||||
@Test
|
||||
fun `flows draining mode ignores scheduled flows until unset`() {
|
||||
|
||||
val latch = CountDownLatch(1)
|
||||
var shouldFail = true
|
||||
|
||||
@ -73,7 +71,8 @@ class ScheduledFlowsDrainingModeTest {
|
||||
.map { update -> update.produced.single().state.data as ScheduledState }
|
||||
|
||||
scheduledStates.filter { state -> !state.processed }.doOnNext { _ ->
|
||||
// this is needed because there is a delay between the moment a SchedulableState gets in the Vault and the first time nextScheduledActivity is called
|
||||
// This is needed because there is a delay between the moment a SchedulableState gets in the Vault and the
|
||||
// first time nextScheduledActivity is called
|
||||
executor!!.schedule({
|
||||
logger.info("Disabling flows draining mode")
|
||||
shouldFail = false
|
||||
@ -96,8 +95,11 @@ class ScheduledFlowsDrainingModeTest {
|
||||
latch.await()
|
||||
}
|
||||
|
||||
data class ScheduledState(private val creationTime: Instant, val source: Party, val destination: Party, val processed: Boolean = false, override val linearId: UniqueIdentifier = UniqueIdentifier()) : SchedulableState, LinearState {
|
||||
|
||||
data class ScheduledState(private val creationTime: Instant,
|
||||
val source: Party,
|
||||
val destination: Party,
|
||||
val processed: Boolean = false,
|
||||
override val linearId: UniqueIdentifier = UniqueIdentifier()) : SchedulableState, LinearState {
|
||||
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? {
|
||||
return if (!processed) {
|
||||
val logicRef = flowLogicRefFactory.create(ScheduledFlow::class.jvmName, thisStateRef)
|
||||
@ -111,12 +113,12 @@ class ScheduledFlowsDrainingModeTest {
|
||||
}
|
||||
|
||||
class InsertInitialStateFlow(private val destination: Party, private val notary: Party) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
|
||||
val scheduledState = ScheduledState(serviceHub.clock.instant(), ourIdentity, destination)
|
||||
val builder = TransactionBuilder(notary).addOutputState(scheduledState, DummyContract.PROGRAM_ID).addCommand(dummyCommand(ourIdentity.owningKey))
|
||||
val builder = TransactionBuilder(notary)
|
||||
.addOutputState(scheduledState, DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand(ourIdentity.owningKey))
|
||||
val tx = serviceHub.signInitialTransaction(builder)
|
||||
subFlow(FinalityFlow(tx))
|
||||
}
|
||||
@ -124,10 +126,8 @@ class ScheduledFlowsDrainingModeTest {
|
||||
|
||||
@SchedulableFlow
|
||||
class ScheduledFlow(private val stateRef: StateRef) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
|
||||
val state = serviceHub.toStateAndRef<ScheduledState>(stateRef)
|
||||
val scheduledState = state.state.data
|
||||
// Only run flow over states originating on this node
|
||||
@ -137,9 +137,12 @@ class ScheduledFlowsDrainingModeTest {
|
||||
require(!scheduledState.processed) { "State should not have been previously processed" }
|
||||
val notary = state.state.notary
|
||||
val newStateOutput = scheduledState.copy(processed = true)
|
||||
val builder = TransactionBuilder(notary).addInputState(state).addOutputState(newStateOutput, DummyContract.PROGRAM_ID).addCommand(dummyCommand(ourIdentity.owningKey))
|
||||
val builder = TransactionBuilder(notary)
|
||||
.addInputState(state)
|
||||
.addOutputState(newStateOutput, DummyContract.PROGRAM_ID)
|
||||
.addCommand(dummyCommand(ourIdentity.owningKey))
|
||||
val tx = serviceHub.signInitialTransaction(builder)
|
||||
subFlow(FinalityFlow(tx, setOf(scheduledState.destination)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ import org.junit.After
|
||||
import org.junit.Test
|
||||
|
||||
class FinalityHandlerTest {
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
private val mockNet = InternalMockNetwork()
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
@ -32,8 +32,6 @@ class FinalityHandlerTest {
|
||||
fun `sent to flow hospital on error and attempted retry on node restart`() {
|
||||
// Setup a network where only Alice has the finance CorDapp and it sends a cash tx to Bob who doesn't have the
|
||||
// CorDapp. Bob's FinalityHandler will error when validating the tx.
|
||||
mockNet = InternalMockNetwork()
|
||||
|
||||
val alice = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME, additionalCordapps = setOf(FINANCE_CORDAPP)))
|
||||
|
||||
var bob = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME))
|
||||
@ -82,8 +80,6 @@ class FinalityHandlerTest {
|
||||
}
|
||||
|
||||
private fun TestStartedNode.getTransaction(id: SecureHash): SignedTransaction? {
|
||||
return database.transaction {
|
||||
services.validatedTransactions.getTransaction(id)
|
||||
}
|
||||
return services.validatedTransactions.getTransaction(id)
|
||||
}
|
||||
}
|
||||
|
@ -24,8 +24,11 @@ import rx.schedulers.Schedulers
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
class ServiceHubConcurrentUsageTest {
|
||||
|
||||
private val mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.finance.schemas", "net.corda.node.services.vault.VaultQueryExceptionsTests", Cash::class.packageName))
|
||||
private val mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages(
|
||||
"net.corda.finance.schemas",
|
||||
"net.corda.node.services.vault.VaultQueryExceptionsTests",
|
||||
Cash::class.packageName
|
||||
))
|
||||
|
||||
@After
|
||||
fun stopNodes() {
|
||||
@ -34,7 +37,6 @@ class ServiceHubConcurrentUsageTest {
|
||||
|
||||
@Test
|
||||
fun `operations requiring a transaction work from another thread`() {
|
||||
|
||||
val latch = CountDownLatch(1)
|
||||
var successful = false
|
||||
val initiatingFlow = TestFlow(mockNet.defaultNotaryIdentity)
|
||||
@ -57,10 +59,8 @@ class ServiceHubConcurrentUsageTest {
|
||||
}
|
||||
|
||||
class TestFlow(private val notary: Party) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
|
||||
val builder = TransactionBuilder(notary)
|
||||
val issuer = ourIdentity.ref(OpaqueBytes.of(0))
|
||||
Cash().generateIssue(builder, 10.DOLLARS.issuedBy(issuer), ourIdentity, notary)
|
||||
@ -68,4 +68,4 @@ class ServiceHubConcurrentUsageTest {
|
||||
return subFlow(FinalityFlow(stx))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -33,7 +33,6 @@ import kotlin.test.assertEquals
|
||||
|
||||
class ScheduledFlowTests {
|
||||
companion object {
|
||||
const val PAGE_SIZE = 20
|
||||
val SORTING = Sort(listOf(Sort.SortColumn(SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF_TXN_ID), Sort.Direction.DESC)))
|
||||
}
|
||||
|
||||
@ -168,6 +167,7 @@ class ScheduledFlowTests {
|
||||
assertTrue("Expect all states have run the scheduled task", statesFromB.all { it.state.data.processed })
|
||||
}
|
||||
|
||||
private fun queryStates(vaultService: VaultService): List<StateAndRef<ScheduledState>> =
|
||||
vaultService.queryBy<ScheduledState>(VaultQueryCriteria(), sorting = SORTING).states
|
||||
private fun queryStates(vaultService: VaultService): List<StateAndRef<ScheduledState>> {
|
||||
return vaultService.queryBy<ScheduledState>(VaultQueryCriteria(), sorting = SORTING).states
|
||||
}
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ class NodePair(private val mockNet: InternalMockNetwork) {
|
||||
|
||||
@InitiatingFlow
|
||||
abstract class AbstractClientLogic<out T>(nodePair: NodePair) : FlowLogic<T>() {
|
||||
protected val server = nodePair.server.info.singleIdentity()
|
||||
private val server = nodePair.server.info.singleIdentity()
|
||||
protected abstract fun callImpl(): T
|
||||
@Suspendable
|
||||
override fun call() = callImpl().also {
|
||||
@ -82,9 +82,12 @@ class VaultSoftLockManagerTest {
|
||||
private val mockVault = rigorousMock<VaultServiceInternal>().also {
|
||||
doNothing().whenever(it).softLockRelease(any(), anyOrNull())
|
||||
}
|
||||
|
||||
private val mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages(ContractImpl::class.packageName), defaultFactory = { args ->
|
||||
object : InternalMockNetwork.MockNode(args) {
|
||||
override fun makeVaultService(keyManagementService: KeyManagementService, services: ServicesForResolution, database: CordaPersistence): VaultServiceInternal {
|
||||
override fun makeVaultService(keyManagementService: KeyManagementService,
|
||||
services: ServicesForResolution,
|
||||
database: CordaPersistence): VaultServiceInternal {
|
||||
val node = this
|
||||
val realVault = super.makeVaultService(keyManagementService, services, database)
|
||||
return object : VaultServiceInternal by realVault {
|
||||
@ -97,13 +100,11 @@ class VaultSoftLockManagerTest {
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
private val nodePair = NodePair(mockNet)
|
||||
@After
|
||||
fun tearDown() {
|
||||
mockNet.stopNodes()
|
||||
}
|
||||
|
||||
object CommandDataImpl : CommandData
|
||||
|
||||
class ClientLogic(nodePair: NodePair, val state: ContractState) : NodePair.AbstractClientLogic<List<ContractState>>(nodePair) {
|
||||
override fun callImpl() = run {
|
||||
subFlow(FinalityFlow(serviceHub.signInitialTransaction(TransactionBuilder(notary = ourIdentity).apply {
|
||||
@ -151,6 +152,11 @@ class VaultSoftLockManagerTest {
|
||||
verifyNoMoreInteractions(mockVault)
|
||||
}
|
||||
|
||||
@After
|
||||
fun tearDown() {
|
||||
mockNet.stopNodes()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `plain old state is not soft locked`() = run(false, PlainOldState(nodePair), false)
|
||||
|
||||
|
Reference in New Issue
Block a user