NOTICK: Fix Initiate Flow with Anonymous party (#5579)

* delete buildSrc block configuring multiple plugins

* remove outer stage block

* fix issues around initiateFlow with anonymous party

* code checks

* disable unit tests

* fix flowframeworktest

* undo some extraneous changes
This commit is contained in:
Stefano Franz 2019-10-14 11:32:22 +01:00 committed by Jonathan Locke
parent 6de6702cb4
commit 970f60c625
10 changed files with 105 additions and 88 deletions

View File

@ -29,20 +29,18 @@ pipeline {
}
}
stage('Corda Pull Request - Run Tests') {
stage('Unit Tests') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelUnitTest"
if (env.CHANGE_ID) {
pullRequest.createStatus(status: 'success',
context: 'continuous-integration/jenkins/pr-merge/unitTest',
description: 'Unit Tests Passed',
targetUrl: "${env.JOB_URL}/testResults")
}
stage('Unit Tests') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelUnitTest"
if (env.CHANGE_ID) {
pullRequest.createStatus(status: 'success',
context: 'continuous-integration/jenkins/pr-merge/unitTest',
description: 'Unit Tests Passed',
targetUrl: "${env.JOB_URL}/testResults")
}
}
}

1
Jenkinsfile vendored
View File

@ -1,3 +1,4 @@
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
@Library('existing-build-control')
import static com.r3.build.BuildControl.killAllExistingBuildsForJob

View File

@ -6,6 +6,8 @@ import net.corda.core.CordaInternal
import net.corda.core.DeleteForDJVM
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.*
@ -120,14 +122,18 @@ abstract class FlowLogic<out T> {
* is routed depends on the [Destination] type, including whether this call does any initial communication.
*/
@Suspendable
fun initiateFlow(destination: Destination): FlowSession = stateMachine.initiateFlow(destination)
fun initiateFlow(destination: Destination): FlowSession {
require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" }
return stateMachine.initiateFlow(destination, serviceHub.identityService.wellKnownPartyFromAnonymous(destination as AbstractParty)
?: throw IllegalArgumentException("Could not resolve destination: $destination"))
}
/**
* Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note
* that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
*/
@Suspendable
fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party)
fun initiateFlow(party: Party): FlowSession = stateMachine.initiateFlow(party, party)
/**
* Specifies the identity, with certificate, to use for this flow. This will be one of the multiple identities that

View File

@ -18,7 +18,7 @@ interface FlowStateMachine<FLOWRETURN> {
fun <SUSPENDRETURN : Any> suspend(ioRequest: FlowIORequest<SUSPENDRETURN>, maySkipCheckpoint: Boolean): SUSPENDRETURN
@Suspendable
fun initiateFlow(destination: Destination): FlowSession
fun initiateFlow(destination: Destination, wellKnownParty: Party): FlowSession
fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>)

View File

@ -71,7 +71,7 @@ sealed class Event {
* Initiate a flow. This causes a new session object to be created and returned to the flow. Note that no actual
* communication takes place at this time, only on the first send/receive operation on the session.
*/
data class InitiateFlow(val destination: Destination) : Event()
data class InitiateFlow(val destination: Destination, val wellKnownParty: Party) : Event()
/**
* Signal the entering into a subflow.

View File

@ -17,10 +17,11 @@ import net.corda.core.utilities.UntrustworthyData
class FlowSessionImpl(
override val destination: Destination,
private val wellKnownParty: Party,
val sourceSessionId: SessionId
) : FlowSession() {
override val counterparty: Party get() = checkNotNull(destination as? Party) { "$destination is not a Party" }
override val counterparty: Party get() = wellKnownParty
override fun toString(): String = "FlowSessionImpl(destination=$destination, sourceSessionId=$sourceSessionId)"

View File

@ -355,10 +355,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
@Suspendable
override fun initiateFlow(destination: Destination): FlowSession {
override fun initiateFlow(destination: Destination, wellKnownParty: Party): FlowSession {
require(destination is Party || destination is AnonymousParty) { "Unsupported destination type ${destination.javaClass.name}" }
val resume = processEventImmediately(
Event.InitiateFlow(destination),
Event.InitiateFlow(destination, wellKnownParty),
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnExit = true
) as FlowContinuation.Resume

View File

@ -466,7 +466,7 @@ class SingleThreadedStateMachineManager(
try {
val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage)
val initiatedSessionId = SessionId.createRandom(secureRandom)
val senderSession = FlowSessionImpl(sender, initiatedSessionId)
val senderSession = FlowSessionImpl(sender, sender, initiatedSessionId)
val flowLogic = initiatedFlowFactory.createFlow(senderSession)
val initiatedFlowInfo = when (initiatedFlowFactory) {
is InitiatedFlowFactory.Core -> FlowInfo(serviceHub.myInfo.platformVersion, "corda")

View File

@ -235,7 +235,7 @@ class TopLevelTransition(
return@builder FlowContinuation.ProcessEvents
}
val sourceSessionId = SessionId.createRandom(context.secureRandom)
val sessionImpl = FlowSessionImpl(event.destination, sourceSessionId)
val sessionImpl = FlowSessionImpl(event.destination, event.wellKnownParty, sourceSessionId)
val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.destination, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
actions.add(Action.AddSessionBinding(context.id, sourceSessionId))

View File

@ -69,8 +69,8 @@ class FlowFrameworkTests {
@Before
fun setUpMockNet() {
mockNet = InternalMockNetwork(
cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP),
servicePeerAllocationStrategy = RoundRobin()
cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP),
servicePeerAllocationStrategy = RoundRobin()
)
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
@ -139,13 +139,13 @@ class FlowFrameworkTests {
mockNet.runNetwork()
assertSessionTransfers(
aliceNode sent sessionInit(PingPongFlow::class, payload = 10L) to bobNode,
bobNode sent sessionConfirm() to aliceNode,
bobNode sent sessionData(20L) to aliceNode,
aliceNode sent sessionData(11L) to bobNode,
bobNode sent sessionData(21L) to aliceNode,
aliceNode sent normalEnd to bobNode,
bobNode sent normalEnd to aliceNode
aliceNode sent sessionInit(PingPongFlow::class, payload = 10L) to bobNode,
bobNode sent sessionConfirm() to aliceNode,
bobNode sent sessionData(20L) to aliceNode,
aliceNode sent sessionData(11L) to bobNode,
bobNode sent sessionData(21L) to aliceNode,
aliceNode sent normalEnd to bobNode,
bobNode sent normalEnd to aliceNode
)
}
@ -167,7 +167,8 @@ class FlowFrameworkTests {
it.message is ExistingSessionMessage && it.message.payload === EndSessionMessage
}.subscribe { sessionEndReceived.release() }
val resultFuture = aliceNode.services.startFlow(
WaitForOtherSideEndBeforeSendAndReceive(bob, sessionEndReceived)).resultFuture
WaitForOtherSideEndBeforeSendAndReceive(bob, sessionEndReceived)
).resultFuture
mockNet.runNetwork()
assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy {
resultFuture.getOrThrow()
@ -186,10 +187,10 @@ class FlowFrameworkTests {
mockNet.runNetwork()
assertThatExceptionOfType(MyFlowException::class.java)
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
.withMessage("Nothing useful")
.withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow
.withStackTraceContaining("Received counter-flow exception from peer")
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
.withMessage("Nothing useful")
.withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow
.withStackTraceContaining("Received counter-flow exception from peer")
bobNode.database.transaction {
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
}
@ -197,15 +198,15 @@ class FlowFrameworkTests {
assertThat(receivingFiber.state).isEqualTo(Strand.State.WAITING)
assertThat((erroringFlow.get().stateMachine as FlowStateMachineImpl).state).isEqualTo(Strand.State.WAITING)
assertThat(erroringFlowSteps.get()).containsExactly(
Notification.createOnNext(ProgressTracker.STARTING),
Notification.createOnNext(ExceptionFlow.START_STEP),
Notification.createOnError(erroringFlow.get().exceptionThrown)
Notification.createOnNext(ProgressTracker.STARTING),
Notification.createOnNext(ExceptionFlow.START_STEP),
Notification.createOnError(erroringFlow.get().exceptionThrown)
)
assertSessionTransfers(
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
bobNode sent sessionConfirm() to aliceNode,
bobNode sent errorMessage(erroringFlow.get().exceptionThrown) to aliceNode
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
bobNode sent sessionConfirm() to aliceNode,
bobNode sent errorMessage(erroringFlow.get().exceptionThrown) to aliceNode
)
// Make sure the original stack trace isn't sent down the wire
val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage
@ -296,8 +297,8 @@ class FlowFrameworkTests {
@Test
fun waitForLedgerCommit() {
val ptx = TransactionBuilder(notary = notaryIdentity)
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
.addCommand(dummyCommand(alice.owningKey))
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
.addCommand(dummyCommand(alice.owningKey))
val stx = aliceNode.services.signInitialTransaction(ptx)
val committerStx = aliceNode.registerCordappFlowFactory(CommitterFlow::class) {
@ -313,8 +314,8 @@ class FlowFrameworkTests {
@Test
fun `waitForLedgerCommit throws exception if any active session ends in error`() {
val ptx = TransactionBuilder(notary = notaryIdentity)
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
.addCommand(dummyCommand())
.addOutputState(DummyState(), DummyContract.PROGRAM_ID)
.addCommand(dummyCommand())
val stx = aliceNode.services.signInitialTransaction(ptx)
aliceNode.registerCordappFlowFactory(WaitForLedgerCommitFlow::class) { ExceptionFlow { throw Exception("Error") } }
@ -354,8 +355,8 @@ class FlowFrameworkTests {
val result = aliceNode.services.startFlow(UpgradedFlow(bob)).resultFuture
mockNet.runNetwork()
assertThat(receivedSessionMessages).startsWith(
aliceNode sent sessionInit(UpgradedFlow::class, flowVersion = 2) to bobNode,
bobNode sent sessionConfirm(flowVersion = 1) to aliceNode
aliceNode sent sessionInit(UpgradedFlow::class, flowVersion = 2) to bobNode,
bobNode sent sessionConfirm(flowVersion = 1) to aliceNode
)
val (receivedPayload, node2FlowVersion) = result.getOrThrow()
assertThat(receivedPayload).isEqualTo("Old initiated")
@ -369,8 +370,8 @@ class FlowFrameworkTests {
val flowInfo = aliceNode.services.startFlow(initiatingFlow).resultFuture
mockNet.runNetwork()
assertThat(receivedSessionMessages).startsWith(
aliceNode sent sessionInit(SendFlow::class, flowVersion = 1, payload = "Old initiating") to bobNode,
bobNode sent sessionConfirm(flowVersion = 2) to aliceNode
aliceNode sent sessionInit(SendFlow::class, flowVersion = 1, payload = "Old initiating") to bobNode,
bobNode sent sessionConfirm(flowVersion = 2) to aliceNode
)
assertThat(flowInfo.get().flowVersion).isEqualTo(2)
}
@ -380,8 +381,8 @@ class FlowFrameworkTests {
val future = aliceNode.services.startFlow(NeverRegisteredFlow("Hello", bob)).resultFuture
mockNet.runNetwork()
assertThatExceptionOfType(UnexpectedFlowEndException::class.java)
.isThrownBy { future.getOrThrow() }
.withMessageEndingWith("${NeverRegisteredFlow::class.java.name} is not registered")
.isThrownBy { future.getOrThrow() }
.withMessageEndingWith("${NeverRegisteredFlow::class.java.name} is not registered")
}
@Test
@ -441,9 +442,9 @@ class FlowFrameworkTests {
erroringFlowFuture.getOrThrow()
val flowSteps = erroringFlowSteps.get()
assertThat(flowSteps).containsExactly(
Notification.createOnNext(ProgressTracker.STARTING),
Notification.createOnNext(ExceptionFlow.START_STEP),
Notification.createOnError(erroringFlowFuture.get().exceptionThrown)
Notification.createOnNext(ProgressTracker.STARTING),
Notification.createOnNext(ExceptionFlow.START_STEP),
Notification.createOnError(erroringFlowFuture.get().exceptionThrown)
)
val receiveFlowException = assertFailsWith(UnexpectedFlowEndException::class) {
@ -451,28 +452,28 @@ class FlowFrameworkTests {
}
assertThat(receiveFlowException.message).doesNotContain("evil bug!")
assertThat(receiveFlowSteps.get()).containsExactly(
Notification.createOnNext(ProgressTracker.STARTING),
Notification.createOnNext(ReceiveFlow.START_STEP),
Notification.createOnError(receiveFlowException)
Notification.createOnNext(ProgressTracker.STARTING),
Notification.createOnNext(ReceiveFlow.START_STEP),
Notification.createOnError(receiveFlowException)
)
assertSessionTransfers(
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
bobNode sent sessionConfirm() to aliceNode,
bobNode sent errorMessage() to aliceNode
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
bobNode sent sessionConfirm() to aliceNode,
bobNode sent errorMessage() to aliceNode
)
}
@Test
fun `initiating flow using unknown AnonymousParty`() {
val anonymousBob = bobNode.services.keyManagementService.freshKeyAndCert(bobNode.info.legalIdentitiesAndCerts.single(), false)
.party.anonymise()
.party.anonymise()
bobNode.registerCordappFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) }
val result = aliceNode.services.startFlow(SendAndReceiveFlow(anonymousBob, "Hello")).resultFuture
mockNet.runNetwork()
assertThatIllegalArgumentException()
.isThrownBy { result.getOrThrow() }
.withMessage("We do not know who $anonymousBob belongs to")
.isThrownBy { result.getOrThrow() }
.withMessage("Could not resolve destination: $anonymousBob")
}
@Test
@ -497,16 +498,18 @@ class FlowFrameworkTests {
private val FlowLogic<*>.progressSteps: CordaFuture<List<Notification<ProgressTracker.Step>>>
get() {
return progressTracker!!.changes
.ofType(Change.Position::class.java)
.map { it.newStep }
.materialize()
.toList()
.toFuture()
.ofType(Change.Position::class.java)
.map { it.newStep }
.materialize()
.toList()
.toFuture()
}
@InitiatingFlow
private class WaitForOtherSideEndBeforeSendAndReceive(val otherParty: Party,
@Transient val receivedOtherFlowEnd: Semaphore) : FlowLogic<Unit>() {
private class WaitForOtherSideEndBeforeSendAndReceive(
val otherParty: Party,
@Transient val receivedOtherFlowEnd: Semaphore
) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
// Kick off the flow on the other side ...
@ -626,7 +629,8 @@ class FlowFrameworkTests {
//endregion Helpers
}
internal fun sessionConfirm(flowVersion: Int = 1) = ExistingSessionMessage(SessionId(0), ConfirmSessionMessage(SessionId(0), FlowInfo(flowVersion, "")))
internal fun sessionConfirm(flowVersion: Int = 1) =
ExistingSessionMessage(SessionId(0), ConfirmSessionMessage(SessionId(0), FlowInfo(flowVersion, "")))
internal inline fun <reified P : FlowLogic<*>> TestStartedNode.getSingleFlow(): Pair<P, CordaFuture<*>> {
return smm.findStateMachines(P::class.java).single()
@ -637,17 +641,17 @@ private fun sanitise(message: SessionMessage) = when (message) {
is ExistingSessionMessage -> {
val payload = message.payload
message.copy(
recipientSessionId = SessionId(0),
payload = when (payload) {
is ConfirmSessionMessage -> payload.copy(
initiatedSessionId = SessionId(0),
initiatedFlowInfo = payload.initiatedFlowInfo.copy(appName = "")
)
is ErrorSessionMessage -> payload.copy(
errorId = 0
)
else -> payload
}
recipientSessionId = SessionId(0),
payload = when (payload) {
is ConfirmSessionMessage -> payload.copy(
initiatedSessionId = SessionId(0),
initiatedFlowInfo = payload.initiatedFlowInfo.copy(appName = "")
)
is ErrorSessionMessage -> payload.copy(
errorId = 0
)
else -> payload
}
)
}
}
@ -667,10 +671,12 @@ internal fun TestStartedNode.sendSessionMessage(message: SessionMessage, destina
}
}
internal fun errorMessage(errorResponse: FlowException? = null) = ExistingSessionMessage(SessionId(0), ErrorSessionMessage(errorResponse, 0))
internal fun errorMessage(errorResponse: FlowException? = null) =
ExistingSessionMessage(SessionId(0), ErrorSessionMessage(errorResponse, 0))
internal infix fun TestStartedNode.sent(message: SessionMessage): Pair<Int, SessionMessage> = Pair(internals.id, message)
internal infix fun Pair<Int, SessionMessage>.to(node: TestStartedNode): SessionTransfer = SessionTransfer(first, second, node.network.myAddress)
internal infix fun Pair<Int, SessionMessage>.to(node: TestStartedNode): SessionTransfer =
SessionTransfer(first, second, node.network.myAddress)
internal data class SessionTransfer(val from: Int, val message: SessionMessage, val to: MessageRecipients) {
val isPayloadTransfer: Boolean
@ -785,7 +791,11 @@ internal class MyFlowException(override val message: String) : FlowException() {
internal class MyPeerFlowException(override val message: String, val peer: Party) : FlowException()
@InitiatingFlow
internal class SendAndReceiveFlow(private val destination: Destination, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic<Any>() {
internal class SendAndReceiveFlow(
private val destination: Destination,
private val payload: Any,
private val otherPartySession: FlowSession? = null
) : FlowLogic<Any>() {
constructor(otherPartySession: FlowSession, payload: Any) : this(otherPartySession.counterparty, payload, otherPartySession)
@Suspendable
@ -795,7 +805,8 @@ internal class SendAndReceiveFlow(private val destination: Destination, private
}
@InitiatingFlow
internal class PingPongFlow(private val otherParty: Party, private val payload: Long, private val otherPartySession: FlowSession? = null) : FlowLogic<Unit>() {
internal class PingPongFlow(private val otherParty: Party, private val payload: Long, private val otherPartySession: FlowSession? = null) :
FlowLogic<Unit>() {
constructor(otherPartySession: FlowSession, payload: Long) : this(otherPartySession.counterparty, payload, otherPartySession)
@Transient