Add IdentitySyncFlow

Add IdentitySyncFlow for synchronizing the certificate paths for confidential identities in a transaction with all counterparties of the transaction.
This commit is contained in:
Ross Nicoll 2017-08-25 17:53:31 +01:00 committed by GitHub
parent 701c4f3c60
commit 8f0ea714b3
3 changed files with 188 additions and 0 deletions

View File

@ -0,0 +1,96 @@
package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.ContractState
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.unwrap
object IdentitySyncFlow {
/**
* Flow for ensuring that one or more counterparties to a transaction have the full certificate paths of confidential
* identities used in the transaction. This is intended for use as a subflow of another flow, typically between
* transaction assembly and signing. An example of where this is useful is where a recipient of a [Cash] state wants
* to know that it is being paid by the correct party, and the owner of the state is a confidential identity of that
* party. This flow would send a copy of the confidential identity path to the recipient, enabling them to verify that
* identity.
*
* @return a mapping of well known identities to the confidential identities used in the transaction.
*/
// TODO: Can this be triggered automatically from [SendTransactionFlow]
class Send(val otherSides: Set<Party>,
val tx: WireTransaction,
override val progressTracker: ProgressTracker) : FlowLogic<Unit>() {
constructor(otherSide: Party, tx: WireTransaction) : this(setOf(otherSide), tx, tracker())
companion object {
object SYNCING_IDENTITIES : ProgressTracker.Step("Syncing identities")
fun tracker() = ProgressTracker(SYNCING_IDENTITIES)
}
@Suspendable
override fun call() {
progressTracker.currentStep = SYNCING_IDENTITIES
val states: List<ContractState> = (tx.inputs.map { serviceHub.loadState(it) }.requireNoNulls().map { it.data } + tx.outputs.map { it.data })
val identities: Set<AbstractParty> = states.flatMap { it.participants }.toSet()
// Filter participants down to the set of those not in the network map (are not well known)
val confidentialIdentities = identities
.filter { serviceHub.networkMapCache.getNodeByLegalIdentityKey(it.owningKey) == null }
.toList()
val identityCertificates: Map<AbstractParty, PartyAndCertificate?> = identities
.map { Pair(it, serviceHub.identityService.certificateFromKey(it.owningKey)) }.toMap()
otherSides.forEach { otherSide ->
val requestedIdentities: List<AbstractParty> = sendAndReceive<List<AbstractParty>>(otherSide, confidentialIdentities).unwrap { req ->
require(req.all { it in identityCertificates.keys }) { "${otherSide} requested a confidential identity not part of transaction: ${tx.id}" }
req
}
val sendIdentities: List<PartyAndCertificate?> = requestedIdentities.map {
val identityCertificate = identityCertificates[it]
if (identityCertificate != null)
identityCertificate
else
throw IllegalStateException("Counterparty requested a confidential identity for which we do not have the certificate path: ${tx.id}")
}
send(otherSide, sendIdentities)
}
}
}
/**
* Handle an offer to provide proof of identity (in the form of certificate paths) for confidential identities which
* we do not yet know about.
*/
class Receive(val otherSide: Party) : FlowLogic<Unit>() {
companion object {
object RECEIVING_IDENTITIES : ProgressTracker.Step("Receiving confidential identities")
object RECEIVING_CERTIFICATES : ProgressTracker.Step("Receiving certificates for unknown identities")
}
override val progressTracker: ProgressTracker = ProgressTracker(RECEIVING_IDENTITIES, RECEIVING_CERTIFICATES)
@Suspendable
override fun call(): Unit {
progressTracker.currentStep = RECEIVING_IDENTITIES
val allIdentities = receive<List<AbstractParty>>(otherSide).unwrap { it }
val unknownIdentities = allIdentities.filter { serviceHub.identityService.partyFromAnonymous(it) == null }
progressTracker.currentStep = RECEIVING_CERTIFICATES
val missingIdentities = sendAndReceive<List<PartyAndCertificate>>(otherSide, unknownIdentities)
// Batch verify the identities we've received, so we know they're all correct before we start storing them in
// the identity service
missingIdentities.unwrap { identities ->
identities.forEach { it.verify(serviceHub.identityService.trustAnchor) }
identities
}.forEach { identity ->
// Store the received confidential identities in the identity service so we have a record of which well known identity they map to.
serviceHub.identityService.verifyAndRegisterIdentity(identity)
}
}
}
}

View File

@ -0,0 +1,87 @@
package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.identity.Party
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.finance.DOLLARS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueAndPaymentFlow
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.node.MockNetwork
import org.junit.After
import org.junit.Before
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.assertNull
class IdentitySyncFlowTests {
lateinit var mockNet: MockNetwork
@Before
fun before() {
// We run this in parallel threads to help catch any race conditions that may exist.
mockNet = MockNetwork(networkSendManuallyPumped = false, threadPerNode = true)
}
@After
fun cleanUp() {
mockNet.stopNodes()
}
@Test
fun `sync confidential identities`() {
// Set up values we'll need
val notaryNode = mockNet.createNotaryNode(null, DUMMY_NOTARY.name)
val aliceNode = mockNet.createPartyNode(notaryNode.network.myAddress, ALICE.name)
val bobNode = mockNet.createPartyNode(notaryNode.network.myAddress, BOB.name)
val alice: Party = aliceNode.services.myInfo.legalIdentity
val bob: Party = bobNode.services.myInfo.legalIdentity
aliceNode.services.identityService.verifyAndRegisterIdentity(bobNode.info.legalIdentityAndCert)
aliceNode.services.identityService.verifyAndRegisterIdentity(notaryNode.info.legalIdentityAndCert)
bobNode.services.identityService.verifyAndRegisterIdentity(aliceNode.info.legalIdentityAndCert)
bobNode.services.identityService.verifyAndRegisterIdentity(notaryNode.info.legalIdentityAndCert)
bobNode.registerInitiatedFlow(Receive::class.java)
// Alice issues then pays some cash to a new confidential identity that Bob doesn't know about
val anonymous = true
val ref = OpaqueBytes.of(0x01)
val issueFlow = aliceNode.services.startFlow(CashIssueAndPaymentFlow(1000.DOLLARS, ref, alice, anonymous, notaryNode.services.myInfo.notaryIdentity))
val issueTx = issueFlow.resultFuture.getOrThrow().stx
val confidentialIdentity = issueTx.tx.outputs.map { it.data }.filterIsInstance<Cash.State>().single().owner
assertNull(bobNode.services.identityService.partyFromAnonymous(confidentialIdentity))
// Run the flow to sync up the identities
aliceNode.services.startFlow(Initiator(bob, issueTx.tx)).resultFuture.getOrThrow()
val expected = aliceNode.services.identityService.partyFromAnonymous(confidentialIdentity)
val actual = bobNode.services.identityService.partyFromAnonymous(confidentialIdentity)
assertEquals(expected, actual)
}
/**
* Very lightweight wrapping flow to trigger the counterparty flow that receives the identities.
*/
@InitiatingFlow
class Initiator(val otherSide: Party, val tx: WireTransaction): FlowLogic<Boolean>() {
@Suspendable
override fun call(): Boolean {
subFlow(IdentitySyncFlow.Send(otherSide, tx))
// Wait for the counterparty to indicate they're done
return receive<Boolean>(otherSide).unwrap { it }
}
}
@InitiatedBy(IdentitySyncFlowTests.Initiator::class)
class Receive(val otherSide: Party): FlowLogic<Unit>() {
@Suspendable
override fun call() {
subFlow(IdentitySyncFlow.Receive(otherSide))
// Notify the initiator that we've finished syncing
send(otherSide, true)
}
}
}

View File

@ -431,6 +431,11 @@ class TwoPartyTradeFlowTests {
val bankNode = makeNodeWithTracking(notaryNode.network.myAddress, BOC.name) val bankNode = makeNodeWithTracking(notaryNode.network.myAddress, BOC.name)
val issuer = bankNode.info.legalIdentity.ref(1, 2, 3) val issuer = bankNode.info.legalIdentity.ref(1, 2, 3)
val allNodes = listOf(notaryNode, aliceNode, bobNode, bankNode)
allNodes.forEach { node ->
allNodes.map { it.services.myInfo.legalIdentityAndCert }.forEach { identity -> node.services.identityService.verifyAndRegisterIdentity(identity) }
}
ledger(aliceNode.services, initialiseSerialization = false) { ledger(aliceNode.services, initialiseSerialization = false) {
// Insert a prospectus type attachment into the commercial paper transaction. // Insert a prospectus type attachment into the commercial paper transaction.