Changed FlowLogic.ourIdentity to return Party and added FlowLogic.ourIdentityAndCert which returns PartyAndCertificate. (#1537)

Updated code base to make use of these instead of chooseIdentity(). Also improved the serialisation of fiber checkpoints so that it doesn't store the entire cert parth of this identity.
This commit is contained in:
Shams Asari
2017-09-18 15:28:23 +01:00
committed by josecoll
parent 8f86068807
commit 8e0b8477af
35 changed files with 178 additions and 165 deletions

View File

@ -11,7 +11,6 @@ import net.corda.testing.BOB
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.aliceBobAndNotary
import net.corda.testing.contracts.DUMMY_PROGRAM_ID
import net.corda.testing.chooseIdentity
import net.corda.testing.contracts.DummyState
import net.corda.testing.driver.driver
import net.corda.testing.dummyCommand
@ -24,17 +23,20 @@ import kotlin.test.assertEquals
*/
class LargeTransactionsTest {
@StartableByRPC @InitiatingFlow
class SendLargeTransactionFlow(val hash1: SecureHash, val hash2: SecureHash, val hash3: SecureHash, val hash4: SecureHash) : FlowLogic<Unit>() {
class SendLargeTransactionFlow(private val hash1: SecureHash,
private val hash2: SecureHash,
private val hash3: SecureHash,
private val hash4: SecureHash) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val tx = TransactionBuilder(notary = DUMMY_NOTARY)
.addOutputState(DummyState(), DUMMY_PROGRAM_ID)
.addCommand(dummyCommand(serviceHub.myInfo.chooseIdentity().owningKey))
.addCommand(dummyCommand(ourIdentity.owningKey))
.addAttachment(hash1)
.addAttachment(hash2)
.addAttachment(hash3)
.addAttachment(hash4)
val stx = serviceHub.signInitialTransaction(tx, serviceHub.myInfo.chooseIdentity().owningKey)
val stx = serviceHub.signInitialTransaction(tx, ourIdentity.owningKey)
// Send to the other side and wait for it to trigger resolution from us.
val bob = serviceHub.identityService.partyFromX500Name(BOB.name)!!
subFlow(SendTransactionFlow(bob, stx))

View File

@ -140,7 +140,7 @@ class SendMessageFlow(private val message: Message) : FlowLogic<SignedTransactio
progressTracker.currentStep = GENERATING_TRANSACTION
val messageState = MessageState(message = message, by = serviceHub.myInfo.chooseIdentity())
val messageState = MessageState(message = message, by = ourIdentity)
val txCommand = Command(MessageContract.Commands.Send(), messageState.participants.map { it.owningKey })
val txBuilder = TransactionBuilder(notary).withItems(StateAndContract(messageState, MESSAGE_CONTRACT_PROGRAM_ID), txCommand)

View File

@ -731,9 +731,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
return cordappServices.getInstance(type) ?: throw IllegalArgumentException("Corda service ${type.name} does not exist")
}
override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator, me: PartyAndCertificate?): FlowStateMachineImpl<T> {
check(me == null || me in myInfo.legalIdentitiesAndCerts) { "Attempt to start a flow with legal identity not belonging to this node." }
return serverThread.fetchFrom { smm.add(logic, flowInitiator, me) }
override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator, ourIdentity: Party?): FlowStateMachineImpl<T> {
return serverThread.fetchFrom { smm.add(logic, flowInitiator, ourIdentity) }
}
override fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>? {

View File

@ -10,7 +10,6 @@ import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.messaging.*
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
@ -143,11 +142,11 @@ class CordaRPCOpsImpl(
private fun <T> startFlow(logicType: Class<out FlowLogic<T>>, args: Array<out Any?>): FlowStateMachineImpl<T> {
require(logicType.isAnnotationPresent(StartableByRPC::class.java)) { "${logicType.name} was not designed for RPC" }
val me = services.myInfo.legalIdentitiesAndCerts.first() // TODO RPC flows should have mapping user -> identity that should be resolved automatically on starting flow.
val rpcContext = getRpcContext()
rpcContext.requirePermission(startFlowPermission(logicType))
val currentUser = FlowInitiator.RPC(rpcContext.currentUser.username)
return services.invokeFlowAsync(logicType, currentUser, me, *args)
// TODO RPC flows should have mapping user -> identity that should be resolved automatically on starting flow.
return services.invokeFlowAsync(logicType, currentUser, *args)
}
override fun attachmentExists(id: SecureHash): Boolean {

View File

@ -1,16 +1,12 @@
package net.corda.node.services
import co.paralleluniverse.fibers.Suspendable
import net.corda.confidential.SwapIdentitiesFlow
import net.corda.core.flows.AbstractStateReplacementFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.ReceiveTransactionFlow
import net.corda.core.flows.StateReplacementException
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.unwrap
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
@ -32,7 +28,7 @@ class NotaryChangeHandler(otherSide: Party) : AbstractStateReplacementFlow.Accep
* and is also in a geographically convenient location we can just automatically approve the change.
* TODO: In more difficult cases this should call for human attention to manually verify and approve the proposal
*/
override fun verifyProposal(stx: SignedTransaction, proposal: AbstractStateReplacementFlow.Proposal<Party>): Unit {
override fun verifyProposal(stx: SignedTransaction, proposal: AbstractStateReplacementFlow.Proposal<Party>) {
val state = proposal.stateRef
val proposedTx = stx.resolveNotaryChangeTransaction(serviceHub)
val newNotary = proposal.modification

View File

@ -6,7 +6,6 @@ import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.VisibleForTesting
import net.corda.core.messaging.DataFeed
@ -118,7 +117,7 @@ interface ServiceHubInternal : ServiceHub {
* Starts an already constructed flow. Note that you must be on the server thread to call this method.
* @param flowInitiator indicates who started the flow, see: [FlowInitiator].
*/
fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator, me: PartyAndCertificate? = null): FlowStateMachineImpl<T>
fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator, ourIdentity: Party? = null): FlowStateMachineImpl<T>
/**
* Will check [logicType] and [args] against a whitelist and if acceptable then construct and initiate the flow.
@ -131,12 +130,11 @@ interface ServiceHubInternal : ServiceHub {
fun <T> invokeFlowAsync(
logicType: Class<out FlowLogic<T>>,
flowInitiator: FlowInitiator,
me: PartyAndCertificate? = null,
vararg args: Any?): FlowStateMachineImpl<T> {
val logicRef = FlowLogicRefFactoryImpl.createForRPC(logicType, *args)
@Suppress("UNCHECKED_CAST")
val logic = FlowLogicRefFactoryImpl.toFlowLogic(logicRef) as FlowLogic<T>
return startFlow(logic, flowInitiator, me)
return startFlow(logic, flowInitiator, ourIdentity = null)
}
fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>?

View File

@ -12,9 +12,12 @@ import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.*
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.abbreviate
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.isRegularFile
import net.corda.core.internal.staticField
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.*
import net.corda.node.services.api.FlowAppAuditEvent
@ -37,7 +40,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val logic: FlowLogic<R>,
scheduler: FiberScheduler,
override val flowInitiator: FlowInitiator,
override val ourIdentity: PartyAndCertificate) : Fiber<Unit>(id.toString(), scheduler), FlowStateMachine<R> {
// Store the Party rather than the full cert path with PartyAndCertificate
val ourIdentity: Party) : Fiber<Unit>(id.toString(), scheduler), FlowStateMachine<R> {
companion object {
// Used to work around a small limitation in Quasar.
private val QUASAR_UNBLOCKER = Fiber::class.staticField<Any>("SERIALIZER_BLOCKER").value
@ -67,6 +72,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
// These fields shouldn't be serialised, so they are marked @Transient.
@Transient override lateinit var serviceHub: ServiceHubInternal
@Transient override lateinit var ourIdentityAndCert: PartyAndCertificate
@Transient internal lateinit var database: CordaPersistence
@Transient internal lateinit var actionOnSuspend: (FlowIORequest) -> Unit
@Transient internal lateinit var actionOnEnd: (Try<R>, Boolean) -> Unit

View File

@ -3,7 +3,6 @@ package net.corda.node.services.statemachine
import net.corda.core.flows.FlowException
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.castIfPossible
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.UntrustworthyData
@ -26,9 +25,7 @@ data class SessionInit(val initiatorSessionId: Long,
val initiatingFlowClass: String,
val flowVersion: Int,
val appName: String,
val firstPayload: Any?,
// Left as a placeholder for support of multiple identities on a node. For now we choose the first one as a special one.
val otherIdentity: PartyAndCertificate? = null) : SessionMessage
val firstPayload: Any?) : SessionMessage
data class SessionConfirm(override val initiatorSessionId: Long,
val initiatedSessionId: Long,

View File

@ -14,7 +14,6 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.castIfPossible
@ -371,8 +370,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
session.receivedMessages += ReceivedSessionMessage(sender, SessionData(session.ourSessionId, sessionInit.firstPayload))
}
openSessions[session.ourSessionId] = session
val meIdentity = sessionInit.otherIdentity ?: serviceHub.myInfo.legalIdentitiesAndCerts.first()
val fiber = createFiber(flow, FlowInitiator.Peer(sender), meIdentity)
// TODO Perhaps the session-init will specificy which of our multiple identies to use, which we would have to
// double-check is actually ours. However, what if we want to control how our identities gets used?
val fiber = createFiber(flow, FlowInitiator.Peer(sender))
flowSession.sessionFlow = flow
flowSession.stateMachine = fiber
fiber.openSessions[Pair(flow, sender)] = session
@ -427,15 +427,23 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun <T> createFiber(logic: FlowLogic<T>, flowInitiator: FlowInitiator, me: PartyAndCertificate): FlowStateMachineImpl<T> {
val id = StateMachineRunId.createRandom()
return FlowStateMachineImpl(id, logic, scheduler, flowInitiator, me).apply { initFiber(this) }
private fun <T> createFiber(logic: FlowLogic<T>, flowInitiator: FlowInitiator, ourIdentity: Party? = null): FlowStateMachineImpl<T> {
val fsm = FlowStateMachineImpl(
StateMachineRunId.createRandom(),
logic,
scheduler,
flowInitiator,
ourIdentity ?: serviceHub.myInfo.legalIdentities[0])
initFiber(fsm)
return fsm
}
private fun initFiber(fiber: FlowStateMachineImpl<*>) {
verifyFlowLogicIsSuspendable(fiber.logic)
fiber.database = database
fiber.serviceHub = serviceHub
fiber.ourIdentityAndCert = serviceHub.myInfo.legalIdentitiesAndCerts.find { it.party == fiber.ourIdentity }
?: throw IllegalStateException("Identity specified by ${fiber.id} (${fiber.ourIdentity}) is not one of ours!")
fiber.actionOnSuspend = { ioRequest ->
updateCheckpoint(fiber)
// We commit on the fibers transaction that was copied across ThreadLocals during suspend
@ -514,11 +522,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
*
* Note that you must be on the [executor] thread.
*/
fun <T> add(logic: FlowLogic<T>, flowInitiator: FlowInitiator, me: PartyAndCertificate?): FlowStateMachineImpl<T> {
fun <T> add(logic: FlowLogic<T>, flowInitiator: FlowInitiator, ourIdentity: Party? = null): FlowStateMachineImpl<T> {
// TODO: Check that logic has @Suspendable on its call method.
executor.checkOnThread()
val fiber = database.transaction {
val fiber = createFiber(logic, flowInitiator, me ?: serviceHub.myInfo.legalIdentitiesAndCerts.first())
val fiber = createFiber(logic, flowInitiator, ourIdentity)
updateCheckpoint(fiber)
fiber
}

View File

@ -545,17 +545,17 @@ class TwoPartyTradeFlowTests {
}
@InitiatingFlow
class SellerInitiator(val buyer: Party,
val notary: NodeInfo,
val assetToSell: StateAndRef<OwnableState>,
val price: Amount<Currency>,
val anonymous: Boolean) : FlowLogic<SignedTransaction>() {
class SellerInitiator(private val buyer: Party,
private val notary: NodeInfo,
private val assetToSell: StateAndRef<OwnableState>,
private val price: Amount<Currency>,
private val anonymous: Boolean) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
val myParty = if (anonymous) {
serviceHub.keyManagementService.freshKeyAndCert(serviceHub.myInfo.chooseIdentityAndCert(), false)
val myPartyAndCert = if (anonymous) {
serviceHub.keyManagementService.freshKeyAndCert(ourIdentityAndCert, false)
} else {
serviceHub.myInfo.chooseIdentityAndCert()
ourIdentityAndCert
}
send(buyer, TestTx(notary.notaryIdentity, price, anonymous))
return subFlow(Seller(
@ -563,12 +563,12 @@ class TwoPartyTradeFlowTests {
notary,
assetToSell,
price,
myParty))
myPartyAndCert))
}
}
@InitiatedBy(SellerInitiator::class)
class BuyerAcceptor(val seller: Party) : FlowLogic<SignedTransaction>() {
class BuyerAcceptor(private val seller: Party) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
val (notary, price, anonymous) = receive<TestTx>(seller).unwrap {

View File

@ -37,6 +37,7 @@ class ScheduledFlowTests {
val PAGE_SIZE = 20
val SORTING = Sort(listOf(Sort.SortColumn(SortAttribute.Standard(Sort.CommonStateAttribute.STATE_REF_TXN_ID), Sort.Direction.DESC)))
}
lateinit var mockNet: MockNetwork
lateinit var notaryNode: StartedNode<MockNetwork.MockNode>
lateinit var nodeA: StartedNode<MockNetwork.MockNode>
@ -59,23 +60,22 @@ class ScheduledFlowTests {
override val participants: List<AbstractParty> = listOf(source, destination)
}
class InsertInitialStateFlow(val destination: Party) : FlowLogic<Unit>() {
class InsertInitialStateFlow(private val destination: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val scheduledState = ScheduledState(serviceHub.clock.instant(),
serviceHub.myInfo.chooseIdentity(), destination)
val scheduledState = ScheduledState(serviceHub.clock.instant(), ourIdentity, destination)
val notary = serviceHub.networkMapCache.getAnyNotary()
val builder = TransactionBuilder(notary)
.addOutputState(scheduledState, DUMMY_PROGRAM_ID)
.addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
subFlow(FinalityFlow(tx, setOf(serviceHub.myInfo.chooseIdentity())))
subFlow(FinalityFlow(tx, setOf(ourIdentity)))
}
}
@SchedulableFlow
class ScheduledFlow(val stateRef: StateRef) : FlowLogic<Unit>() {
class ScheduledFlow(private val stateRef: StateRef) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val state = serviceHub.toStateAndRef<ScheduledState>(stateRef)
@ -90,7 +90,7 @@ class ScheduledFlowTests {
val builder = TransactionBuilder(notary)
.addInputState(state)
.addOutputState(newStateOutput, DUMMY_PROGRAM_ID)
.addCommand(dummyCommand(serviceHub.myInfo.chooseIdentity().owningKey))
.addCommand(dummyCommand(ourIdentity.owningKey))
val tx = serviceHub.signInitialTransaction(builder)
subFlow(FinalityFlow(tx, setOf(scheduledState.source, scheduledState.destination)))
}