Rename protocol to flow.

This commit is contained in:
rick.parker
2016-11-22 11:56:47 +00:00
parent d8e09f7174
commit f68529d1fd
120 changed files with 1157 additions and 1164 deletions

View File

@ -14,6 +14,9 @@ import net.corda.core.node.services.ServiceInfo
import net.corda.core.random63BitValue
import net.corda.core.serialization.serialize
import net.corda.core.utilities.LogHelper
import net.corda.flows.NotaryError
import net.corda.flows.NotaryException
import net.corda.flows.NotaryFlow
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.Node
import net.corda.node.services.config.ConfigHelper
@ -21,9 +24,6 @@ import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.databaseTransaction
import net.corda.protocols.NotaryError
import net.corda.protocols.NotaryException
import net.corda.protocols.NotaryProtocol
import net.corda.testing.freeLocalHostAndPort
import org.junit.After
import org.junit.Before
@ -76,12 +76,12 @@ class DistributedNotaryTests {
tx.toSignedTransaction(false)
}
val buildProtocol = { NotaryProtocol.Client(stx) }
val buildFlow = { NotaryFlow.Client(stx) }
val firstSpend = alice.services.startProtocol(buildProtocol())
val firstSpend = alice.services.startFlow(buildFlow())
firstSpend.resultFuture.get()
val secondSpend = alice.services.startProtocol(buildProtocol())
val secondSpend = alice.services.startFlow(buildFlow())
val ex = assertFailsWith(ExecutionException::class) { secondSpend.resultFuture.get() }
val error = (ex.cause as NotaryException).error as NotaryError.Conflict

View File

@ -91,31 +91,31 @@ interface APIServer {
fun commitTransaction(tx: SerializedBytes<WireTransaction>, signatures: List<DigitalSignature.WithKey>): SecureHash
/**
* This method would not return until the protocol is finished (hence the "Sync").
* This method would not return until the flow is finished (hence the "Sync").
*
* Longer term we'd add an Async version that returns some kind of ProtocolInvocationRef that could be queried and
* Longer term we'd add an Async version that returns some kind of FlowInvocationRef that could be queried and
* would appear on some kind of event message that is broadcast informing of progress.
*
* Will throw exception if protocol fails.
* Will throw exception if flow fails.
*/
fun invokeProtocolSync(type: ProtocolRef, args: Map<String, Any?>): Any?
fun invokeFlowSync(type: FlowRef, args: Map<String, Any?>): Any?
// fun invokeProtocolAsync(type: ProtocolRef, args: Map<String, Any?>): ProtocolInstanceRef
// fun invokeFlowAsync(type: FlowRef, args: Map<String, Any?>): FlowInstanceRef
/**
* Fetch protocols that require a response to some prompt/question by a human (on the "bank" side).
* Fetch flows that require a response to some prompt/question by a human (on the "bank" side).
*/
fun fetchProtocolsRequiringAttention(query: StatesQuery): Map<StateRef, ProtocolRequiringAttention>
fun fetchFlowsRequiringAttention(query: StatesQuery): Map<StateRef, FlowRequiringAttention>
/**
* Provide the response that a protocol is waiting for.
* Provide the response that a flow is waiting for.
*
* @param protocol Should refer to a previously supplied ProtocolRequiringAttention.
* @param stepId Which step of the protocol are we referring too.
* @param choice Should be one of the choices presented in the ProtocolRequiringAttention.
* @param flow Should refer to a previously supplied FlowRequiringAttention.
* @param stepId Which step of the flow are we referring too.
* @param choice Should be one of the choices presented in the FlowRequiringAttention.
* @param args Any arguments required.
*/
fun provideProtocolResponse(protocol: ProtocolInstanceRef, choice: SecureHash, args: Map<String, Any?>)
fun provideFlowResponse(flow: FlowInstanceRef, choice: SecureHash, args: Map<String, Any?>)
}
@ -131,20 +131,20 @@ data class ContractLedgerRef(val hash: SecureHash) : ContractDefRef
/**
* Encapsulates the protocol to be instantiated. e.g. TwoPartyTradeProtocol.Buyer.
* Encapsulates the flow to be instantiated. e.g. TwoPartyTradeFlow.Buyer.
*/
interface ProtocolRef {
interface FlowRef {
}
data class ProtocolClassRef(val className: String) : ProtocolRef
data class FlowClassRef(val className: String) : FlowRef
data class ProtocolInstanceRef(val protocolInstance: SecureHash, val protocolClass: ProtocolClassRef, val protocolStepId: String)
data class FlowInstanceRef(val flowInstance: SecureHash, val flowClass: FlowClassRef, val flowStepId: String)
/**
* Thinking that Instant is OK for short lived protocol deadlines.
* Thinking that Instant is OK for short lived flow deadlines.
*/
data class ProtocolRequiringAttention(val ref: ProtocolInstanceRef, val prompt: String, val choiceIdsToMessages: Map<SecureHash, String>, val dueBy: Instant)
data class FlowRequiringAttention(val ref: FlowInstanceRef, val prompt: String, val choiceIdsToMessages: Map<SecureHash, String>, val dueBy: Instant)
/**

View File

@ -1,7 +1,10 @@
package net.corda.node.internal
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.*
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.DealState
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.linearHeadsOfType
@ -64,25 +67,25 @@ class APIServerImpl(val node: AbstractNode) : APIServer {
throw UnsupportedOperationException()
}
override fun invokeProtocolSync(type: ProtocolRef, args: Map<String, Any?>): Any? {
return invokeProtocolAsync(type, args).get()
override fun invokeFlowSync(type: FlowRef, args: Map<String, Any?>): Any? {
return invokeFlowAsync(type, args).get()
}
private fun invokeProtocolAsync(type: ProtocolRef, args: Map<String, Any?>): ListenableFuture<out Any?> {
if (type is ProtocolClassRef) {
val protocolLogicRef = node.services.protocolLogicRefFactory.createKotlin(type.className, args)
val protocolInstance = node.services.protocolLogicRefFactory.toProtocolLogic(protocolLogicRef)
return node.services.startProtocol(protocolInstance).resultFuture
private fun invokeFlowAsync(type: FlowRef, args: Map<String, Any?>): ListenableFuture<out Any?> {
if (type is FlowClassRef) {
val flowLogicRef = node.services.flowLogicRefFactory.createKotlin(type.className, args)
val flowInstance = node.services.flowLogicRefFactory.toFlowLogic(flowLogicRef)
return node.services.startFlow(flowInstance).resultFuture
} else {
throw UnsupportedOperationException("Unsupported ProtocolRef type: $type")
throw UnsupportedOperationException("Unsupported FlowRef type: $type")
}
}
override fun fetchProtocolsRequiringAttention(query: StatesQuery): Map<StateRef, ProtocolRequiringAttention> {
override fun fetchFlowsRequiringAttention(query: StatesQuery): Map<StateRef, FlowRequiringAttention> {
throw UnsupportedOperationException()
}
override fun provideProtocolResponse(protocol: ProtocolInstanceRef, choice: SecureHash, args: Map<String, Any?>) {
override fun provideFlowResponse(flow: FlowInstanceRef, choice: SecureHash, args: Map<String, Any?>) {
throw UnsupportedOperationException()
}

View File

@ -8,17 +8,20 @@ import com.google.common.util.concurrent.SettableFuture
import net.corda.core.*
import net.corda.core.crypto.Party
import net.corda.core.crypto.X509Utilities
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.FlowStateMachine
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.node.services.NetworkMapCache.MapChangeType
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.ProtocolLogicRefFactory
import net.corda.core.protocols.ProtocolStateMachine
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.flows.sendRequest
import net.corda.node.api.APIServer
import net.corda.node.services.api.*
import net.corda.node.services.config.NodeConfiguration
@ -30,7 +33,7 @@ import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.RPCOps
import net.corda.node.services.network.InMemoryNetworkMapCache
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.RegistrationResponse
import net.corda.node.services.network.NodeRegistration
import net.corda.node.services.network.PersistentNetworkMapService
@ -45,9 +48,6 @@ import net.corda.node.utilities.AddOrRemove
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.configureDatabase
import net.corda.node.utilities.databaseTransaction
import net.corda.protocols.CashCommand
import net.corda.protocols.CashProtocol
import net.corda.protocols.sendRequest
import org.jetbrains.exposed.sql.Database
import org.slf4j.Logger
import java.nio.file.FileAlreadyExistsException
@ -66,7 +66,7 @@ import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
* I/O), or a mock implementation suitable for unit test environments.
*
* Marked as SingletonSerializeAsToken to prevent the invisible reference to AbstractNode in the ServiceHub accidentally
* sweeping up the Node into the Kryo checkpoint serialization via any protocols holding a reference to ServiceHub.
* sweeping up the Node into the Kryo checkpoint serialization via any flows holding a reference to ServiceHub.
*/
// TODO: Where this node is the initial network map service, currently no networkMapService is provided.
// In theory the NodeInfo for the node should be passed in, instead, however currently this is constructed by the
@ -95,7 +95,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
protected val _servicesThatAcceptUploads = ArrayList<AcceptsFileUpload>()
val servicesThatAcceptUploads: List<AcceptsFileUpload> = _servicesThatAcceptUploads
private val protocolFactories = ConcurrentHashMap<Class<*>, (Party) -> ProtocolLogic<*>>()
private val flowFactories = ConcurrentHashMap<Class<*>, (Party) -> FlowLogic<*>>()
protected val partyKeys = mutableSetOf<KeyPair>()
val services = object : ServiceHubInternal() {
@ -112,18 +112,18 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
// Internal only
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val protocolLogicRefFactory: ProtocolLogicRefFactory get() = protocolLogicFactory
override val flowLogicRefFactory: FlowLogicRefFactory get() = flowLogicFactory
override fun <T> startProtocol(logic: ProtocolLogic<T>): ProtocolStateMachine<T> = smm.add(logic)
override fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> = smm.add(logic)
override fun registerProtocolInitiator(markerClass: KClass<*>, protocolFactory: (Party) -> ProtocolLogic<*>) {
require(markerClass !in protocolFactories) { "${markerClass.java.name} has already been used to register a protocol" }
log.info("Registering protocol ${markerClass.java.name}")
protocolFactories[markerClass.java] = protocolFactory
override fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>) {
require(markerClass !in flowFactories) { "${markerClass.java.name} has already been used to register a flow" }
log.info("Registering flow ${markerClass.java.name}")
flowFactories[markerClass.java] = flowFactory
}
override fun getProtocolFactory(markerClass: Class<*>): ((Party) -> ProtocolLogic<*>)? {
return protocolFactories[markerClass]
override fun getFlowFactory(markerClass: Class<*>): ((Party) -> FlowLogic<*>)? {
return flowFactories[markerClass]
}
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
@ -149,7 +149,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
lateinit var netMapCache: NetworkMapCache
lateinit var api: APIServer
lateinit var scheduler: NodeSchedulerService
lateinit var protocolLogicFactory: ProtocolLogicRefFactory
lateinit var flowLogicFactory: FlowLogicRefFactory
lateinit var schemas: SchemaService
val customServices: ArrayList<Any> = ArrayList()
protected val runOnStop: ArrayList<Runnable> = ArrayList()
@ -204,8 +204,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
// the identity key. But the infrastructure to make that easy isn't here yet.
keyManagement = makeKeyManagementService()
api = APIServerImpl(this@AbstractNode)
protocolLogicFactory = initialiseProtocolLogicFactory()
scheduler = NodeSchedulerService(database, services, protocolLogicFactory)
flowLogicFactory = initialiseFlowLogicFactory()
scheduler = NodeSchedulerService(database, services, flowLogicFactory)
val tokenizableServices = mutableListOf(storage, net, vault, keyManagement, identity, platformClock, scheduler)
@ -309,31 +309,32 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
}
}
private val defaultProtocolWhiteList: Map<Class<out ProtocolLogic<*>>, Set<Class<*>>> = mapOf(
CashProtocol::class.java to setOf(
private val defaultFlowWhiteList: Map<Class<out FlowLogic<*>>, Set<Class<*>>> = mapOf(
CashFlow::class.java to setOf(
CashCommand.IssueCash::class.java,
CashCommand.PayCash::class.java,
CashCommand.ExitCash::class.java
)
)
private fun initialiseProtocolLogicFactory(): ProtocolLogicRefFactory {
val protocolWhitelist = HashMap<String, Set<String>>()
for ((protocolClass, extraArgumentTypes) in defaultProtocolWhiteList) {
private fun initialiseFlowLogicFactory(): FlowLogicRefFactory {
val flowWhitelist = HashMap<String, Set<String>>()
for ((flowClass, extraArgumentTypes) in defaultFlowWhiteList) {
val argumentWhitelistClassNames = HashSet(extraArgumentTypes.map { it.name })
protocolClass.constructors.forEach {
flowClass.constructors.forEach {
it.parameters.mapTo(argumentWhitelistClassNames) { it.type.name }
}
protocolWhitelist.merge(protocolClass.name, argumentWhitelistClassNames, { x, y -> x + y })
flowWhitelist.merge(flowClass.name, argumentWhitelistClassNames, { x, y -> x + y })
}
for (plugin in pluginRegistries) {
for ((className, classWhitelist) in plugin.requiredProtocols) {
protocolWhitelist.merge(className, classWhitelist, { x, y -> x + y })
for ((className, classWhitelist) in plugin.requiredFlows) {
flowWhitelist.merge(className, classWhitelist, { x, y -> x + y })
}
}
return ProtocolLogicRefFactory(protocolWhitelist)
return FlowLogicRefFactory(flowWhitelist)
}
private fun buildPluginServices(tokenizableServices: MutableList<Any>): List<Any> {
@ -406,7 +407,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
val reg = NodeRegistration(info, instant.toEpochMilli(), type, expires)
val legalIdentityKey = obtainLegalIdentityKey()
val request = NetworkMapService.RegistrationRequest(reg.toWire(legalIdentityKey.private), net.myAddress)
return net.sendRequest(REGISTER_PROTOCOL_TOPIC, request, networkMapAddr)
return net.sendRequest(REGISTER_FLOW_TOPIC, request, networkMapAddr)
}
protected open fun makeKeyManagementService(): KeyManagementService = PersistentKeyManagementService(partyKeys)

View File

@ -3,20 +3,18 @@ package net.corda.node.internal
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.keys
import net.corda.core.crypto.toStringShort
import net.corda.core.flows.FlowLogic
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.node.services.Vault
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.toObservable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.node.services.messaging.*
import net.corda.node.services.startProtocolPermission
import net.corda.node.services.statemachine.ProtocolStateMachineImpl
import net.corda.node.services.startFlowPermission
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.databaseTransaction
import org.jetbrains.exposed.sql.Database
@ -52,7 +50,7 @@ class CordaRPCOpsImpl(
override fun stateMachinesAndUpdates(): Pair<List<StateMachineInfo>, Observable<StateMachineUpdate>> {
val (allStateMachines, changes) = smm.track()
return Pair(
allStateMachines.map { StateMachineInfo.fromProtocolStateMachineImpl(it) },
allStateMachines.map { StateMachineInfo.fromFlowStateMachineImpl(it) },
changes.map { StateMachineUpdate.fromStateMachineChange(it) }
)
}
@ -79,11 +77,11 @@ class CordaRPCOpsImpl(
}
}
// TODO: Check that this protocol is annotated as being intended for RPC invocation
override fun <T: Any> startProtocolDynamic(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ProtocolHandle<T> {
requirePermission(startProtocolPermission(logicType))
val stateMachine = services.invokeProtocolAsync(logicType, *args) as ProtocolStateMachineImpl<T>
return ProtocolHandle(
// TODO: Check that this flow is annotated as being intended for RPC invocation
override fun <T : Any> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T> {
requirePermission(startFlowPermission(logicType))
val stateMachine = services.invokeFlowAsync(logicType, *args) as FlowStateMachineImpl<T>
return FlowHandle(
id = stateMachine.id,
progress = stateMachine.logic.progressTracker?.changes ?: Observable.empty<ProgressTracker.Change>(),
returnValue = stateMachine.resultFuture.toObservable()

View File

@ -61,7 +61,7 @@ class ConfigurationException(message: String) : Exception(message)
* network map service, while bootstrapping a network.
* @param advertisedServices The services this node advertises. This must be a subset of the services it runs,
* but nodes are not required to advertise services they run (hence subset).
* @param clock The clock used within the node and by all protocols etc.
* @param clock The clock used within the node and by all flows etc.
*/
class Node(override val configuration: FullNodeConfiguration, networkMapAddress: SingleMessageRecipient?,
advertisedServices: Set<ServiceInfo>, clock: Clock = NodeClock()) : AbstractNode(configuration, networkMapAddress, advertisedServices, clock) {
@ -102,7 +102,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
// layer, which can then react to the backpressure. Artemis MQ in particular knows how to do flow control by paging
// messages to disk rather than letting us run out of RAM.
//
// The primary work done by the server thread is execution of protocol logics, and related
// The primary work done by the server thread is execution of flow logics, and related
// serialisation/deserialisation work.
override val serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread", 1)

View File

@ -3,8 +3,7 @@ package net.corda.node.services
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.PluginServiceHub
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.node.services.api.ServiceHubInternal
import net.corda.protocols.NotaryChangeProtocol
import net.corda.flows.NotaryChangeFlow
object NotaryChange {
class Plugin : CordaPluginRegistry() {
@ -13,11 +12,11 @@ object NotaryChange {
/**
* A service that monitors the network for requests for changing the notary of a state,
* and immediately runs the [NotaryChangeProtocol] if the auto-accept criteria are met.
* and immediately runs the [NotaryChangeFlow] if the auto-accept criteria are met.
*/
class Service(services: PluginServiceHub) : SingletonSerializeAsToken() {
init {
services.registerProtocolInitiator(NotaryChangeProtocol.Instigator::class) { NotaryChangeProtocol.Acceptor(it) }
services.registerFlowInitiator(NotaryChangeFlow.Instigator::class) { NotaryChangeFlow.Acceptor(it) }
}
}
}

View File

@ -1,7 +1,7 @@
package net.corda.node.services
import com.typesafe.config.Config
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.flows.FlowLogic
import net.corda.node.services.config.getListOrElse
/**
@ -41,5 +41,5 @@ data class User(val username: String, val password: String, val permissions: Set
override fun toString(): String = "${javaClass.simpleName}($username, permissions=$permissions)"
}
fun <P : ProtocolLogic<*>> startProtocolPermission(clazz: Class<P>) = "StartProtocol.${clazz.name}"
inline fun <reified P : ProtocolLogic<*>> startProtocolPermission(): String = startProtocolPermission(P::class.java)
fun <P : FlowLogic<*>> startFlowPermission(clazz: Class<P>) = "StartFlow.${clazz.name}"
inline fun <reified P : FlowLogic<*>> startFlowPermission(): String = startFlowPermission(P::class.java)

View File

@ -7,7 +7,7 @@ import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.protocols.ServiceRequestMessage
import net.corda.flows.ServiceRequestMessage
import javax.annotation.concurrent.ThreadSafe
/**

View File

@ -2,7 +2,7 @@ package net.corda.node.services.api
import net.corda.core.crypto.SecureHash
import net.corda.core.serialization.SerializedBytes
import net.corda.node.services.statemachine.ProtocolStateMachineImpl
import net.corda.node.services.statemachine.FlowStateMachineImpl
/**
* Thread-safe storage of fiber checkpoints.
@ -30,7 +30,7 @@ interface CheckpointStorage {
}
// This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo).
class Checkpoint(val serializedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>) {
class Checkpoint(val serializedFiber: SerializedBytes<FlowStateMachineImpl<*>>) {
val id: SecureHash get() = serializedFiber.hash

View File

@ -1,14 +1,14 @@
package net.corda.node.services.api
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.FlowStateMachine
import net.corda.core.messaging.MessagingService
import net.corda.core.node.PluginServiceHub
import net.corda.core.node.services.TxWritableStorageService
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.ProtocolLogicRefFactory
import net.corda.core.protocols.ProtocolStateMachine
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.statemachine.ProtocolStateMachineImpl
import net.corda.node.services.statemachine.FlowStateMachineImpl
import org.slf4j.LoggerFactory
interface MessagingServiceInternal : MessagingService {
@ -38,7 +38,7 @@ private val log = LoggerFactory.getLogger(ServiceHubInternal::class.java)
abstract class ServiceHubInternal : PluginServiceHub {
abstract val monitoringService: MonitoringService
abstract val protocolLogicRefFactory: ProtocolLogicRefFactory
abstract val flowLogicRefFactory: FlowLogicRefFactory
abstract val schemaService: SchemaService
abstract override val networkService: MessagingServiceInternal
@ -51,7 +51,7 @@ abstract class ServiceHubInternal : PluginServiceHub {
* @param txs The transactions to record.
*/
internal fun recordTransactionsInternal(writableStorageService: TxWritableStorageService, txs: Iterable<SignedTransaction>) {
val stateMachineRunId = ProtocolStateMachineImpl.currentStateMachine()?.id
val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
if (stateMachineRunId != null) {
txs.forEach {
storageService.stateMachineRecordedTransactionMapping.addMapping(stateMachineRunId, it.id)
@ -68,12 +68,12 @@ abstract class ServiceHubInternal : PluginServiceHub {
* between SMM and the scheduler. That particular problem should also be resolved by the service manager work
* itself, at which point this method would not be needed (by the scheduler).
*/
abstract fun <T> startProtocol(logic: ProtocolLogic<T>): ProtocolStateMachine<T>
abstract fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T>
override fun <T : Any> invokeProtocolAsync(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ProtocolStateMachine<T> {
val logicRef = protocolLogicRefFactory.create(logicType, *args)
override fun <T : Any> invokeFlowAsync(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowStateMachine<T> {
val logicRef = flowLogicRefFactory.create(logicType, *args)
@Suppress("UNCHECKED_CAST")
val logic = protocolLogicRefFactory.toProtocolLogic(logicRef) as ProtocolLogic<T>
return startProtocol(logic)
val logic = flowLogicRefFactory.toFlowLogic(logicRef) as FlowLogic<T>
return startFlow(logic)
}
}

View File

@ -2,21 +2,21 @@ package net.corda.node.services.events
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.SettableFuture
import kotlinx.support.jdk8.collections.compute
import net.corda.core.ThreadBox
import net.corda.core.contracts.SchedulableState
import net.corda.core.contracts.ScheduledActivity
import net.corda.core.contracts.ScheduledStateRef
import net.corda.core.contracts.StateRef
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.node.services.SchedulerService
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.ProtocolLogicRefFactory
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.utilities.*
import kotlinx.support.jdk8.collections.compute
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
@ -38,14 +38,14 @@ import javax.annotation.concurrent.ThreadSafe
* but that starts to sound a lot like off-ledger state.
*
* @param services Core node services.
* @param protocolLogicRefFactory Factory for restoring [ProtocolLogic] instances from references.
* @param flowLogicRefFactory Factory for restoring [FlowLogic] instances from references.
* @param schedulerTimerExecutor The executor the scheduler blocks on waiting for the clock to advance to the next
* activity. Only replace this for unit testing purposes. This is not the executor the [ProtocolLogic] is launched on.
* activity. Only replace this for unit testing purposes. This is not the executor the [FlowLogic] is launched on.
*/
@ThreadSafe
class NodeSchedulerService(private val database: Database,
private val services: ServiceHubInternal,
private val protocolLogicRefFactory: ProtocolLogicRefFactory,
private val flowLogicRefFactory: FlowLogicRefFactory,
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor())
: SchedulerService, SingletonSerializeAsToken() {
@ -87,7 +87,7 @@ class NodeSchedulerService(private val database: Database,
private val mutex = ThreadBox(InnerState())
// We need the [StateMachineManager] to be constructed before this is called in case it schedules a protocol.
// We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow.
fun start() {
mutex.locked {
recomputeEarliest()
@ -152,10 +152,10 @@ class NodeSchedulerService(private val database: Database,
}
private fun onTimeReached(scheduledState: ScheduledStateRef) {
services.startProtocol(RunScheduled(scheduledState, this@NodeSchedulerService))
services.startFlow(RunScheduled(scheduledState, this@NodeSchedulerService))
}
class RunScheduled(val scheduledState: ScheduledStateRef, val scheduler: NodeSchedulerService) : ProtocolLogic<Unit>() {
class RunScheduled(val scheduledState: ScheduledStateRef, val scheduler: NodeSchedulerService) : FlowLogic<Unit>() {
companion object {
object RUNNING : ProgressTracker.Step("Running scheduled...")
@ -169,9 +169,9 @@ class NodeSchedulerService(private val database: Database,
progressTracker.currentStep = RUNNING
// Ensure we are still scheduled.
val scheduledLogic: ProtocolLogic<*>? = getScheduledLogic()
val scheduledLogic: FlowLogic<*>? = getScheduledLogic()
if(scheduledLogic != null) {
subProtocol(scheduledLogic)
subFlow(scheduledLogic)
}
}
@ -180,16 +180,16 @@ class NodeSchedulerService(private val database: Database,
val state = txState.data as SchedulableState
return try {
// This can throw as running contract code.
state.nextScheduledActivity(scheduledState.ref, scheduler.protocolLogicRefFactory)
state.nextScheduledActivity(scheduledState.ref, scheduler.flowLogicRefFactory)
} catch(e: Exception) {
logger.error("Attempt to run scheduled state $scheduledState resulted in error.", e)
null
}
}
private fun getScheduledLogic(): ProtocolLogic<*>? {
private fun getScheduledLogic(): FlowLogic<*>? {
val scheduledActivity = getScheduledaActivity()
var scheduledLogic: ProtocolLogic<*>? = null
var scheduledLogic: FlowLogic<*>? = null
scheduler.mutex.locked {
// need to remove us from those scheduled, but only if we are still next
scheduledStates.compute(scheduledState.ref) { ref, value ->
@ -201,11 +201,11 @@ class NodeSchedulerService(private val database: Database,
logger.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.")
ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt)
} else {
// TODO: ProtocolLogicRefFactory needs to sort out the class loader etc
val logic = scheduler.protocolLogicRefFactory.toProtocolLogic(scheduledActivity.logicRef)
logger.trace { "Scheduler starting ProtocolLogic $logic" }
// ProtocolLogic will be checkpointed by the time this returns.
//scheduler.services.startProtocolAndForget(logic)
// TODO: FlowLogicRefFactory needs to sort out the class loader etc
val logic = scheduler.flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef)
logger.trace { "Scheduler starting FlowLogic $logic" }
// FlowLogic will be checkpointed by the time this returns.
//scheduler.services.startFlowAndForget(logic)
scheduledLogic = logic
null
}

View File

@ -4,7 +4,7 @@ import net.corda.core.contracts.ContractState
import net.corda.core.contracts.SchedulableState
import net.corda.core.contracts.ScheduledStateRef
import net.corda.core.contracts.StateAndRef
import net.corda.core.protocols.ProtocolLogicRefFactory
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.node.services.api.ServiceHubInternal
/**
@ -15,14 +15,14 @@ class ScheduledActivityObserver(val services: ServiceHubInternal) {
init {
services.vaultService.updates.subscribe { update ->
update.consumed.forEach { services.schedulerService.unscheduleStateActivity(it) }
update.produced.forEach { scheduleStateActivity(it, services.protocolLogicRefFactory) }
update.produced.forEach { scheduleStateActivity(it, services.flowLogicRefFactory) }
}
}
private fun scheduleStateActivity(produced: StateAndRef<ContractState>, protocolLogicRefFactory: ProtocolLogicRefFactory) {
private fun scheduleStateActivity(produced: StateAndRef<ContractState>, flowLogicRefFactory: FlowLogicRefFactory) {
val producedState = produced.state.data
if (producedState is SchedulableState) {
val scheduledAt = sandbox { producedState.nextScheduledActivity(produced.ref, protocolLogicRefFactory)?.scheduledAt } ?: return
val scheduledAt = sandbox { producedState.nextScheduledActivity(produced.ref, flowLogicRefFactory)?.scheduledAt } ?: return
services.schedulerService.scheduleStateActivity(ScheduledStateRef(produced.ref, scheduledAt))
}
}

View File

@ -15,7 +15,7 @@ import javax.annotation.concurrent.ThreadSafe
*
* - Probably be accessed via the network layer as an internal node service i.e. via a message queue, so it can run
* on a separate/firewalled service.
* - Use the protocol framework so requests to fetch keys can be suspended whilst a human signs off on the request.
* - Use the flow framework so requests to fetch keys can be suspended whilst a human signs off on the request.
* - Use deterministic key derivation.
* - Possibly have some sort of TREZOR-like two-factor authentication ability.
*

View File

@ -158,7 +158,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
log.error("Queue created for a peer that we don't know from the network map: $queueName")
}
} catch (e: AddressFormatException) {
log.error("Protocol violation: Could not parse queue name as Base 58: $queueName")
log.error("Flow violation: Could not parse queue name as Base 58: $queueName")
}
}
}

View File

@ -3,29 +3,29 @@ package net.corda.node.services.messaging
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.node.services.Vault
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.StateMachineRunId
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.node.services.statemachine.ProtocolStateMachineImpl
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.AddOrRemove
import rx.Observable
data class StateMachineInfo(
val id: StateMachineRunId,
val protocolLogicClassName: String,
val flowLogicClassName: String,
val progressTrackerStepAndUpdates: Pair<String, Observable<String>>?
) {
companion object {
fun fromProtocolStateMachineImpl(psm: ProtocolStateMachineImpl<*>): StateMachineInfo {
fun fromFlowStateMachineImpl(psm: FlowStateMachineImpl<*>): StateMachineInfo {
return StateMachineInfo(
id = psm.id,
protocolLogicClassName = psm.logic.javaClass.simpleName,
flowLogicClassName = psm.logic.javaClass.simpleName,
progressTrackerStepAndUpdates = psm.logic.track()
)
}
@ -42,7 +42,7 @@ sealed class StateMachineUpdate(val id: StateMachineRunId) {
AddOrRemove.ADD -> {
val stateMachineInfo = StateMachineInfo(
id = change.id,
protocolLogicClassName = change.logic.javaClass.simpleName,
flowLogicClassName = change.logic.javaClass.simpleName,
progressTrackerStepAndUpdates = change.logic.track()
)
StateMachineUpdate.Added(stateMachineInfo)
@ -92,11 +92,11 @@ interface CordaRPCOps : RPCOps {
fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>>
/**
* Start the given protocol with the given arguments, returning an [Observable] with a single observation of the
* result of running the protocol.
* Start the given flow with the given arguments, returning an [Observable] with a single observation of the
* result of running the flow.
*/
@RPCReturnsObservables
fun <T: Any> startProtocolDynamic(logicType: Class<out ProtocolLogic<T>>, vararg args: Any?): ProtocolHandle<T>
fun <T : Any> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T>
/**
* Returns Node's identity, assuming this will not change while the node is running.
@ -115,46 +115,50 @@ interface CordaRPCOps : RPCOps {
}
/**
* These allow type safe invocations of protocols from Kotlin, e.g.:
* These allow type safe invocations of flows from Kotlin, e.g.:
*
* val rpc: CordaRPCOps = (..)
* rpc.startProtocol(::ResolveTransactionsProtocol, setOf<SecureHash>(), aliceIdentity)
* rpc.startFlow(::ResolveTransactionsFlow, setOf<SecureHash>(), aliceIdentity)
*
* Note that the passed in constructor function is only used for unification of other type parameters and reification of
* the Class instance of the protocol. This could be changed to use the constructor function directly.
* the Class instance of the flow. This could be changed to use the constructor function directly.
*/
inline fun <T : Any, reified R : ProtocolLogic<T>> CordaRPCOps.startProtocol(
inline fun <T : Any, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
protocolConstructor: () -> R
) = startProtocolDynamic(R::class.java)
inline fun <T : Any, A, reified R : ProtocolLogic<T>> CordaRPCOps.startProtocol(
flowConstructor: () -> R
) = startFlowDynamic(R::class.java)
inline fun <T : Any, A, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
protocolConstructor: (A) -> R,
flowConstructor: (A) -> R,
arg0: A
) = startProtocolDynamic(R::class.java, arg0)
inline fun <T : Any, A, B, reified R : ProtocolLogic<T>> CordaRPCOps.startProtocol(
) = startFlowDynamic(R::class.java, arg0)
inline fun <T : Any, A, B, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
protocolConstructor: (A, B) -> R,
flowConstructor: (A, B) -> R,
arg0: A,
arg1: B
) = startProtocolDynamic(R::class.java, arg0, arg1)
inline fun <T : Any, A, B, C, reified R: ProtocolLogic<T>> CordaRPCOps.startProtocol(
) = startFlowDynamic(R::class.java, arg0, arg1)
inline fun <T : Any, A, B, C, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
protocolConstructor: (A, B, C) -> R,
flowConstructor: (A, B, C) -> R,
arg0: A,
arg1: B,
arg2: C
) = startProtocolDynamic(R::class.java, arg0, arg1, arg2)
inline fun <T : Any, A, B, C, D, reified R : ProtocolLogic<T>> CordaRPCOps.startProtocol(
) = startFlowDynamic(R::class.java, arg0, arg1, arg2)
inline fun <T : Any, A, B, C, D, reified R : FlowLogic<T>> CordaRPCOps.startFlow(
@Suppress("UNUSED_PARAMETER")
protocolConstructor: (A, B, C, D) -> R,
flowConstructor: (A, B, C, D) -> R,
arg0: A,
arg1: B,
arg2: C,
arg3: D
) = startProtocolDynamic(R::class.java, arg0, arg1, arg2, arg3)
) = startFlowDynamic(R::class.java, arg0, arg1, arg2, arg3)
data class ProtocolHandle<A>(
data class FlowHandle<A>(
val id: StateMachineRunId,
val progress: Observable<ProgressTracker.Change>,
val returnValue: Observable<A>

View File

@ -19,14 +19,14 @@ import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.StateMachineRunId
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.protocols.StateMachineRunId
import net.corda.core.serialization.*
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.flows.CashFlowResult
import net.corda.node.services.User
import net.corda.protocols.CashProtocolResult
import net.i2p.crypto.eddsa.EdDSAPrivateKey
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.apache.activemq.artemis.api.core.SimpleString
@ -181,8 +181,8 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(Cash.Clauses.ConserveAmount::class.java)
register(listOf(Unit).javaClass) // SingletonList
register(setOf(Unit).javaClass) // SingletonSet
register(CashProtocolResult.Success::class.java)
register(CashProtocolResult.Failed::class.java)
register(CashFlowResult.Success::class.java)
register(CashFlowResult.Failed::class.java)
register(ServiceEntry::class.java)
register(NodeInfo::class.java)
register(PhysicalLocation::class.java)
@ -215,7 +215,7 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(Array<StackTraceElement>::class.java, read = { kryo, input -> emptyArray() }, write = { kryo, output, o -> })
register(Collections.unmodifiableList(emptyList<String>()).javaClass)
register(PermissionException::class.java)
register(ProtocolHandle::class.java)
register(FlowHandle::class.java)
register(KryoException::class.java)
register(StringBuffer::class.java)
pluginRegistries.forEach { it.registerRPCKryoTypes(this) }

View File

@ -22,12 +22,12 @@ import net.corda.core.randomOrNull
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.node.services.network.NetworkMapService.Companion.FETCH_PROTOCOL_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_PROTOCOL_TOPIC
import net.corda.flows.sendRequest
import net.corda.node.services.network.NetworkMapService.Companion.FETCH_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.FetchMapResponse
import net.corda.node.services.network.NetworkMapService.SubscribeResponse
import net.corda.node.utilities.AddOrRemove
import net.corda.protocols.sendRequest
import rx.Observable
import rx.subjects.PublishSubject
import java.security.SignatureException
@ -106,10 +106,10 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
ifChangedSinceVer: Int?): ListenableFuture<Unit> {
if (subscribe && !registeredForPush) {
// Add handler to the network, for updates received from the remote network map service.
net.addMessageHandler(NetworkMapService.PUSH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID) { message, r ->
net.addMessageHandler(NetworkMapService.PUSH_FLOW_TOPIC, DEFAULT_SESSION_ID) { message, r ->
try {
val req = message.data.deserialize<NetworkMapService.Update>()
val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, DEFAULT_SESSION_ID,
val ackMessage = net.createMessage(NetworkMapService.PUSH_ACK_FLOW_TOPIC, DEFAULT_SESSION_ID,
NetworkMapService.UpdateAcknowledge(req.mapVersion, net.myAddress).serialize().bytes)
net.send(ackMessage, req.replyTo)
processUpdatePush(req)
@ -124,7 +124,7 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
// Fetch the network map and register for updates at the same time
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress)
val future = net.sendRequest<FetchMapResponse>(FETCH_PROTOCOL_TOPIC, req, networkMapAddress).map { resp ->
val future = net.sendRequest<FetchMapResponse>(FETCH_FLOW_TOPIC, req, networkMapAddress).map { resp ->
// We may not receive any nodes back, if the map hasn't changed since the version specified
resp.nodes?.forEach { processRegistration(it) }
Unit
@ -161,7 +161,7 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
override fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit> {
// Fetch the network map and register for updates at the same time
val req = NetworkMapService.SubscribeRequest(false, net.myAddress)
val future = net.sendRequest<SubscribeResponse>(SUBSCRIPTION_PROTOCOL_TOPIC, req, service.address).map {
val future = net.sendRequest<SubscribeResponse>(SUBSCRIPTION_FLOW_TOPIC, req, service.address).map {
if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed()
}
_registrationFuture.setFuture(future)

View File

@ -20,10 +20,10 @@ import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.flows.ServiceRequestMessage
import net.corda.node.services.api.AbstractNodeService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.utilities.AddOrRemove
import net.corda.protocols.ServiceRequestMessage
import java.security.PrivateKey
import java.security.SignatureException
import java.time.Instant
@ -51,15 +51,15 @@ interface NetworkMapService {
companion object {
val DEFAULT_EXPIRATION_PERIOD = Period.ofWeeks(4)
val FETCH_PROTOCOL_TOPIC = "platform.network_map.fetch"
val QUERY_PROTOCOL_TOPIC = "platform.network_map.query"
val REGISTER_PROTOCOL_TOPIC = "platform.network_map.register"
val SUBSCRIPTION_PROTOCOL_TOPIC = "platform.network_map.subscribe"
val FETCH_FLOW_TOPIC = "platform.network_map.fetch"
val QUERY_FLOW_TOPIC = "platform.network_map.query"
val REGISTER_FLOW_TOPIC = "platform.network_map.register"
val SUBSCRIPTION_FLOW_TOPIC = "platform.network_map.subscribe"
// Base topic used when pushing out updates to the network map. Consumed, for example, by the map cache.
// When subscribing to these updates, remember they must be acknowledged
val PUSH_PROTOCOL_TOPIC = "platform.network_map.push"
val PUSH_FLOW_TOPIC = "platform.network_map.push"
// Base topic for messages acknowledging pushed updates
val PUSH_ACK_PROTOCOL_TOPIC = "platform.network_map.push_ack"
val PUSH_ACK_FLOW_TOPIC = "platform.network_map.push_ack"
val logger = loggerFor<NetworkMapService>()
@ -142,19 +142,19 @@ abstract class AbstractNetworkMapService
protected fun setup() {
// Register message handlers
handlers += addMessageHandler(NetworkMapService.FETCH_PROTOCOL_TOPIC,
handlers += addMessageHandler(NetworkMapService.FETCH_FLOW_TOPIC,
{ req: NetworkMapService.FetchMapRequest -> processFetchAllRequest(req) }
)
handlers += addMessageHandler(NetworkMapService.QUERY_PROTOCOL_TOPIC,
handlers += addMessageHandler(NetworkMapService.QUERY_FLOW_TOPIC,
{ req: NetworkMapService.QueryIdentityRequest -> processQueryRequest(req) }
)
handlers += addMessageHandler(NetworkMapService.REGISTER_PROTOCOL_TOPIC,
handlers += addMessageHandler(NetworkMapService.REGISTER_FLOW_TOPIC,
{ req: NetworkMapService.RegistrationRequest -> processRegistrationChangeRequest(req) }
)
handlers += addMessageHandler(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC,
handlers += addMessageHandler(NetworkMapService.SUBSCRIPTION_FLOW_TOPIC,
{ req: NetworkMapService.SubscribeRequest -> processSubscriptionRequest(req) }
)
handlers += net.addMessageHandler(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, DEFAULT_SESSION_ID) { message, r ->
handlers += net.addMessageHandler(NetworkMapService.PUSH_ACK_FLOW_TOPIC, DEFAULT_SESSION_ID) { message, r ->
val req = message.data.deserialize<NetworkMapService.UpdateAcknowledge>()
processAcknowledge(req)
}
@ -200,7 +200,7 @@ abstract class AbstractNetworkMapService
// to a MessageRecipientGroup that nodes join/leave, rather than the network map
// service itself managing the group
val update = NetworkMapService.Update(wireReg, mapVersion, net.myAddress).serialize().bytes
val message = net.createMessage(NetworkMapService.PUSH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, update)
val message = net.createMessage(NetworkMapService.PUSH_FLOW_TOPIC, DEFAULT_SESSION_ID, update)
subscribers.locked {
val toRemove = mutableListOf<SingleMessageRecipient>()

View File

@ -3,9 +3,9 @@ package net.corda.node.services.persistence
import net.corda.core.ThreadBox
import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.StateMachineRunId
import net.corda.core.node.services.StateMachineRecordedTransactionMappingStorage
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.protocols.StateMachineRunId
import net.corda.node.utilities.*
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
@ -17,7 +17,7 @@ import javax.annotation.concurrent.ThreadSafe
* Database storage of a txhash -> state machine id mapping.
*
* Mappings are added as transactions are persisted by [ServiceHub.recordTransaction], and never deleted. Used in the
* RPC API to correlate transaction creation with protocols.
* RPC API to correlate transaction creation with flows.
*
*/
@ThreadSafe

View File

@ -2,14 +2,13 @@ package net.corda.node.services.persistence
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.PluginServiceHub
import net.corda.core.node.recordTransactions
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.ServiceHubInternal
import net.corda.protocols.*
import net.corda.flows.*
import java.io.InputStream
import javax.annotation.concurrent.ThreadSafe
@ -41,16 +40,16 @@ object DataVending {
class TransactionRejectedError(msg: String) : Exception(msg)
init {
services.registerProtocolInitiator(FetchTransactionsProtocol::class, ::FetchTransactionsHandler)
services.registerProtocolInitiator(FetchAttachmentsProtocol::class, ::FetchAttachmentsHandler)
services.registerProtocolInitiator(BroadcastTransactionProtocol::class, ::NotifyTransactionHandler)
services.registerFlowInitiator(FetchTransactionsFlow::class, ::FetchTransactionsHandler)
services.registerFlowInitiator(FetchAttachmentsFlow::class, ::FetchAttachmentsHandler)
services.registerFlowInitiator(BroadcastTransactionFlow::class, ::NotifyTransactionHandler)
}
private class FetchTransactionsHandler(val otherParty: Party) : ProtocolLogic<Unit>() {
private class FetchTransactionsHandler(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val request = receive<FetchDataProtocol.Request>(otherParty).unwrap {
val request = receive<FetchDataFlow.Request>(otherParty).unwrap {
require(it.hashes.isNotEmpty())
it
}
@ -66,10 +65,10 @@ object DataVending {
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
private class FetchAttachmentsHandler(val otherParty: Party) : ProtocolLogic<Unit>() {
private class FetchAttachmentsHandler(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val request = receive<FetchDataProtocol.Request>(otherParty).unwrap {
val request = receive<FetchDataFlow.Request>(otherParty).unwrap {
require(it.hashes.isNotEmpty())
it
}
@ -91,11 +90,11 @@ object DataVending {
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
class NotifyTransactionHandler(val otherParty: Party) : ProtocolLogic<Unit>() {
class NotifyTransactionHandler(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val request = receive<BroadcastTransactionProtocol.NotifyTxRequest>(otherParty).unwrap { it }
subProtocol(ResolveTransactionsProtocol(request.tx, otherParty), shareParentSessions = true)
val request = receive<BroadcastTransactionFlow.NotifyTxRequest>(otherParty).unwrap { it }
subFlow(ResolveTransactionsFlow(request.tx, otherParty), shareParentSessions = true)
serviceHub.recordTransactions(request.tx)
}
}

View File

@ -5,7 +5,7 @@ import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.StateMachineRecordedTransactionMappingStorage
import net.corda.core.node.services.StateMachineTransactionMapping
import net.corda.core.protocols.StateMachineRunId
import net.corda.core.flows.StateMachineRunId
import rx.Observable
import rx.subjects.PublishSubject
import java.util.*

View File

@ -7,10 +7,10 @@ import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.crypto.Party
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.ProtocolSessionException
import net.corda.core.protocols.ProtocolStateMachine
import net.corda.core.protocols.StateMachineRunId
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSessionException
import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.StateMachineRunId
import net.corda.core.random63BitValue
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.trace
@ -28,9 +28,9 @@ import java.sql.SQLException
import java.util.*
import java.util.concurrent.ExecutionException
class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
val logic: ProtocolLogic<R>,
scheduler: FiberScheduler) : Fiber<R>("protocol", scheduler), ProtocolStateMachine<R> {
class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val logic: FlowLogic<R>,
scheduler: FiberScheduler) : Fiber<R>("flow", scheduler), FlowStateMachine<R> {
companion object {
// Used to work around a small limitation in Quasar.
@ -41,14 +41,14 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
}
/**
* Return the current [ProtocolStateMachineImpl] or null if executing outside of one.
* Return the current [FlowStateMachineImpl] or null if executing outside of one.
*/
fun currentStateMachine(): ProtocolStateMachineImpl<*>? = Strand.currentStrand() as? ProtocolStateMachineImpl<*>
fun currentStateMachine(): FlowStateMachineImpl<*>? = Strand.currentStrand() as? FlowStateMachineImpl<*>
}
// These fields shouldn't be serialised, so they are marked @Transient.
@Transient lateinit override var serviceHub: ServiceHubInternal
@Transient internal lateinit var actionOnSuspend: (ProtocolIORequest) -> Unit
@Transient internal lateinit var actionOnSuspend: (FlowIORequest) -> Unit
@Transient internal lateinit var actionOnEnd: () -> Unit
@Transient internal lateinit var database: Database
@Transient internal var fromCheckpoint: Boolean = false
@ -73,10 +73,10 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
}
}
internal val openSessions = HashMap<Pair<ProtocolLogic<*>, Party>, ProtocolSession>()
internal val openSessions = HashMap<Pair<FlowLogic<*>, Party>, FlowSession>()
init {
logic.psm = this
logic.fsm = this
name = id.toString()
}
@ -122,8 +122,8 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
override fun <T : Any> sendAndReceive(otherParty: Party,
payload: Any,
receiveType: Class<T>,
sessionProtocol: ProtocolLogic<*>): UntrustworthyData<T> {
val (session, new) = getSession(otherParty, sessionProtocol, payload)
sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
val (session, new) = getSession(otherParty, sessionFlow, payload)
val receivedSessionData = if (new) {
// Only do a receive here as the session init has carried the payload
receiveInternal<SessionData>(session)
@ -137,48 +137,48 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
@Suspendable
override fun <T : Any> receive(otherParty: Party,
receiveType: Class<T>,
sessionProtocol: ProtocolLogic<*>): UntrustworthyData<T> {
val session = getSession(otherParty, sessionProtocol, null).first
sessionFlow: FlowLogic<*>): UntrustworthyData<T> {
val session = getSession(otherParty, sessionFlow, null).first
val receivedSessionData = receiveInternal<SessionData>(session)
return UntrustworthyData(receiveType.cast(receivedSessionData.payload))
}
@Suspendable
override fun send(otherParty: Party, payload: Any, sessionProtocol: ProtocolLogic<*>) {
val (session, new) = getSession(otherParty, sessionProtocol, payload)
override fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>) {
val (session, new) = getSession(otherParty, sessionFlow, payload)
if (!new) {
// Don't send the payload again if it was already piggy-backed on a session init
sendInternal(session, createSessionData(session, payload))
}
}
private fun createSessionData(session: ProtocolSession, payload: Any): SessionData {
private fun createSessionData(session: FlowSession, payload: Any): SessionData {
val otherPartySessionId = session.otherPartySessionId
?: throw IllegalStateException("We've somehow held onto an unconfirmed session: $session")
return SessionData(otherPartySessionId, payload)
}
@Suspendable
private fun sendInternal(session: ProtocolSession, message: SessionMessage) {
private fun sendInternal(session: FlowSession, message: SessionMessage) {
suspend(SendOnly(session, message))
}
@Suspendable
private inline fun <reified M : SessionMessage> receiveInternal(session: ProtocolSession): M {
private inline fun <reified M : SessionMessage> receiveInternal(session: FlowSession): M {
return suspendAndExpectReceive(ReceiveOnly(session, M::class.java))
}
private inline fun <reified M : SessionMessage> sendAndReceiveInternal(session: ProtocolSession, message: SessionMessage): M {
private inline fun <reified M : SessionMessage> sendAndReceiveInternal(session: FlowSession, message: SessionMessage): M {
return suspendAndExpectReceive(SendAndReceive(session, message, M::class.java))
}
@Suspendable
private fun getSession(otherParty: Party, sessionProtocol: ProtocolLogic<*>, firstPayload: Any?): Pair<ProtocolSession, Boolean> {
val session = openSessions[Pair(sessionProtocol, otherParty)]
private fun getSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?): Pair<FlowSession, Boolean> {
val session = openSessions[Pair(sessionFlow, otherParty)]
return if (session != null) {
Pair(session, false)
} else {
Pair(startNewSession(otherParty, sessionProtocol, firstPayload), true)
Pair(startNewSession(otherParty, sessionFlow, firstPayload), true)
}
}
@ -189,21 +189,21 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
* multiple public keys, but we **don't support multiple nodes advertising the same legal identity**.
*/
@Suspendable
private fun startNewSession(otherParty: Party, sessionProtocol: ProtocolLogic<*>, firstPayload: Any?) : ProtocolSession {
private fun startNewSession(otherParty: Party, sessionFlow: FlowLogic<*>, firstPayload: Any?): FlowSession {
val node = serviceHub.networkMapCache.getRepresentativeNode(otherParty) ?: throw IllegalArgumentException("Don't know about party $otherParty")
val nodeIdentity = node.legalIdentity
logger.trace { "Initiating a new session with $nodeIdentity (representative of $otherParty)" }
val session = ProtocolSession(sessionProtocol, nodeIdentity, random63BitValue(), null)
openSessions[Pair(sessionProtocol, nodeIdentity)] = session
val counterpartyProtocol = sessionProtocol.getCounterpartyMarker(nodeIdentity).name
val sessionInit = SessionInit(session.ourSessionId, serviceHub.myInfo.legalIdentity, counterpartyProtocol, firstPayload)
val session = FlowSession(sessionFlow, nodeIdentity, random63BitValue(), null)
openSessions[Pair(sessionFlow, nodeIdentity)] = session
val counterpartyFlow = sessionFlow.getCounterpartyMarker(nodeIdentity).name
val sessionInit = SessionInit(session.ourSessionId, serviceHub.myInfo.legalIdentity, counterpartyFlow, firstPayload)
val sessionInitResponse = sendAndReceiveInternal<SessionInitResponse>(session, sessionInit)
if (sessionInitResponse is SessionConfirm) {
session.otherPartySessionId = sessionInitResponse.initiatedSessionId
return session
} else {
sessionInitResponse as SessionReject
throw ProtocolSessionException("Party $nodeIdentity rejected session attempt: ${sessionInitResponse.errorMessage}")
throw FlowSessionException("Party $nodeIdentity rejected session attempt: ${sessionInitResponse.errorMessage}")
}
}
@ -227,7 +227,7 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
if (receivedMessage is SessionEnd) {
openSessions.values.remove(receiveRequest.session)
throw ProtocolSessionException("Counterparty on ${receiveRequest.session.otherParty} has prematurely ended on $receiveRequest")
throw FlowSessionException("Counterparty on ${receiveRequest.session.otherParty} has prematurely ended on $receiveRequest")
} else if (receiveRequest.receiveType.isInstance(receivedMessage)) {
return receiveRequest.receiveType.cast(receivedMessage)
} else {
@ -236,7 +236,7 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
}
@Suspendable
private fun suspend(ioRequest: ProtocolIORequest) {
private fun suspend(ioRequest: FlowIORequest) {
// we have to pass the Thread local Transaction across via a transient field as the Fiber Park swaps them out.
txTrampoline = TransactionManager.currentOrNull()
StrandLocalTransactionManager.setThreadLocalTx(null)

View File

@ -1,38 +1,38 @@
package net.corda.node.services.statemachine
import net.corda.node.services.statemachine.StateMachineManager.ProtocolSession
import net.corda.node.services.statemachine.StateMachineManager.FlowSession
import net.corda.node.services.statemachine.StateMachineManager.SessionMessage
// TODO revisit when Kotlin 1.1 is released and data classes can extend other classes
interface ProtocolIORequest {
interface FlowIORequest {
// This is used to identify where we suspended, in case of message mismatch errors and other things where we
// don't have the original stack trace because it's in a suspended fiber.
val stackTraceInCaseOfProblems: StackSnapshot
val session: ProtocolSession
val session: FlowSession
}
interface SendRequest : ProtocolIORequest {
interface SendRequest : FlowIORequest {
val message: SessionMessage
}
interface ReceiveRequest<T : SessionMessage> : ProtocolIORequest {
interface ReceiveRequest<T : SessionMessage> : FlowIORequest {
val receiveType: Class<T>
}
data class SendAndReceive<T : SessionMessage>(override val session: ProtocolSession,
data class SendAndReceive<T : SessionMessage>(override val session: FlowSession,
override val message: SessionMessage,
override val receiveType: Class<T>) : SendRequest, ReceiveRequest<T> {
@Transient
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
}
data class ReceiveOnly<T : SessionMessage>(override val session: ProtocolSession,
data class ReceiveOnly<T : SessionMessage>(override val session: FlowSession,
override val receiveType: Class<T>) : ReceiveRequest<T> {
@Transient
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
}
data class SendOnly(override val session: ProtocolSession, override val message: SessionMessage) : SendRequest {
data class SendOnly(override val session: FlowSession, override val message: SessionMessage) : SendRequest {
@Transient
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
}

View File

@ -11,11 +11,11 @@ import kotlinx.support.jdk8.collections.removeIf
import net.corda.core.ThreadBox
import net.corda.core.abbreviate
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.TopicSession
import net.corda.core.messaging.send
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.ProtocolStateMachine
import net.corda.core.protocols.StateMachineRunId
import net.corda.core.random63BitValue
import net.corda.core.serialization.*
import net.corda.core.then
@ -41,8 +41,8 @@ import java.util.concurrent.ExecutionException
import javax.annotation.concurrent.ThreadSafe
/**
* A StateMachineManager is responsible for coordination and persistence of multiple [ProtocolStateMachine] objects.
* Each such object represents an instantiation of a (two-party) protocol that has reached a particular point.
* A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachine] objects.
* Each such object represents an instantiation of a (two-party) flow that has reached a particular point.
*
* An implementation of this class will persist state machines to long term storage so they can survive process restarts
* and, if run with a single-threaded executor, will ensure no two state machines run concurrently with each other
@ -51,7 +51,7 @@ import javax.annotation.concurrent.ThreadSafe
* A "state machine" is a class with a single call method. The call method and any others it invokes are rewritten by
* a bytecode rewriting engine called Quasar, to ensure the code can be suspended and resumed at any point.
*
* The SMM will always invoke the protocol fibers on the given [AffinityExecutor], regardless of which thread actually
* The SMM will always invoke the flow fibers on the given [AffinityExecutor], regardless of which thread actually
* starts them via [add].
*
* TODO: Consider the issue of continuation identity more deeply: is it a safe assumption that a serialised
@ -79,7 +79,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
val scheduler = FiberScheduler()
data class Change(
val logic: ProtocolLogic<*>,
val logic: FlowLogic<*>,
val addOrRemove: AddOrRemove,
val id: StateMachineRunId
)
@ -88,10 +88,10 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
// property.
private val mutex = ThreadBox(object {
var started = false
val stateMachines = LinkedHashMap<ProtocolStateMachineImpl<*>, Checkpoint>()
val stateMachines = LinkedHashMap<FlowStateMachineImpl<*>, Checkpoint>()
val changesPublisher = PublishSubject.create<Change>()
fun notifyChangeObservers(psm: ProtocolStateMachineImpl<*>, addOrRemove: AddOrRemove) {
fun notifyChangeObservers(psm: FlowStateMachineImpl<*>, addOrRemove: AddOrRemove) {
changesPublisher.onNext(Change(psm.logic, addOrRemove, psm.id))
}
})
@ -105,35 +105,35 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private val metrics = serviceHub.monitoringService.metrics
init {
metrics.register("Protocols.InFlight", Gauge<Int> { mutex.content.stateMachines.size })
metrics.register("Flows.InFlight", Gauge<Int> { mutex.content.stateMachines.size })
}
private val checkpointingMeter = metrics.meter("Protocols.Checkpointing Rate")
private val totalStartedProtocols = metrics.counter("Protocols.Started")
private val totalFinishedProtocols = metrics.counter("Protocols.Finished")
private val checkpointingMeter = metrics.meter("Flows.Checkpointing Rate")
private val totalStartedFlows = metrics.counter("Flows.Started")
private val totalFinishedFlows = metrics.counter("Flows.Finished")
private val openSessions = ConcurrentHashMap<Long, ProtocolSession>()
private val openSessions = ConcurrentHashMap<Long, FlowSession>()
private val recentlyClosedSessions = ConcurrentHashMap<Long, Party>()
// Context for tokenized services in checkpoints
private val serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryo())
/** Returns a list of all state machines executing the given protocol logic at the top level (subprotocols do not count) */
fun <P : ProtocolLogic<T>, T> findStateMachines(protocolClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
/** Returns a list of all state machines executing the given flow logic at the top level (subflows do not count) */
fun <P : FlowLogic<T>, T> findStateMachines(flowClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
@Suppress("UNCHECKED_CAST")
return mutex.locked {
stateMachines.keys
.map { it.logic }
.filterIsInstance(protocolClass)
.map { it to (it.psm as ProtocolStateMachineImpl<T>).resultFuture }
.filterIsInstance(flowClass)
.map { it to (it.fsm as FlowStateMachineImpl<T>).resultFuture }
}
}
val allStateMachines: List<ProtocolLogic<*>>
val allStateMachines: List<FlowLogic<*>>
get() = mutex.locked { stateMachines.keys.map { it.logic } }
/**
* An observable that emits triples of the changing protocol, the type of change, and a process-specific ID number
* An observable that emits triples of the changing flow, the type of change, and a process-specific ID number
* which may change across restarts.
*/
val changes: Observable<Change>
@ -141,7 +141,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
init {
Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable ->
(fiber as ProtocolStateMachineImpl<*>).logger.error("Caught exception from protocol", throwable)
(fiber as FlowStateMachineImpl<*>).logger.error("Caught exception from flow", throwable)
}
}
@ -179,7 +179,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
* Atomic get snapshot + subscribe. This is needed so we don't miss updates between subscriptions to [changes] and
* calls to [allStateMachines]
*/
fun track(): Pair<List<ProtocolStateMachineImpl<*>>, Observable<Change>> {
fun track(): Pair<List<FlowStateMachineImpl<*>>, Observable<Change>> {
return mutex.locked {
val bufferedChanges = UnicastSubject.create<Change>()
changesPublisher.subscribe(bufferedChanges)
@ -190,7 +190,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun restoreFibersFromCheckpoints() {
mutex.locked {
checkpointStorage.forEach {
// If a protocol is added before start() then don't attempt to restore it
// If a flow is added before start() then don't attempt to restore it
if (!stateMachines.containsValue(it)) {
val fiber = deserializeFiber(it.serializedFiber)
initFiber(fiber)
@ -216,7 +216,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun resumeRestoredFiber(fiber: ProtocolStateMachineImpl<*>) {
private fun resumeRestoredFiber(fiber: FlowStateMachineImpl<*>) {
fiber.openSessions.values.forEach { openSessions[it.ourSessionId] = it }
if (fiber.openSessions.values.any { it.waitingForResponse }) {
fiber.logger.info("Restored, pending on receive")
@ -260,23 +260,23 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
val otherParty = sessionInit.initiatorParty
val otherPartySessionId = sessionInit.initiatorSessionId
try {
val markerClass = Class.forName(sessionInit.protocolName)
val protocolFactory = serviceHub.getProtocolFactory(markerClass)
if (protocolFactory != null) {
val protocol = protocolFactory(otherParty)
val psm = createFiber(protocol)
val session = ProtocolSession(protocol, otherParty, random63BitValue(), otherPartySessionId)
val markerClass = Class.forName(sessionInit.flowName)
val flowFactory = serviceHub.getFlowFactory(markerClass)
if (flowFactory != null) {
val flow = flowFactory(otherParty)
val psm = createFiber(flow)
val session = FlowSession(flow, otherParty, random63BitValue(), otherPartySessionId)
if (sessionInit.firstPayload != null) {
session.receivedMessages += SessionData(session.ourSessionId, sessionInit.firstPayload)
}
openSessions[session.ourSessionId] = session
psm.openSessions[Pair(protocol, otherParty)] = session
psm.openSessions[Pair(flow, otherParty)] = session
updateCheckpoint(psm)
sendSessionMessage(otherParty, SessionConfirm(otherPartySessionId, session.ourSessionId), psm)
psm.logger.debug { "Initiated from $sessionInit on $session" }
startFiber(psm)
} else {
logger.warn("Unknown protocol marker class in $sessionInit")
logger.warn("Unknown flow marker class in $sessionInit")
sendSessionMessage(otherParty, SessionReject(otherPartySessionId, "Don't know ${markerClass.name}"), null)
}
} catch (e: Exception) {
@ -285,14 +285,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun serializeFiber(fiber: ProtocolStateMachineImpl<*>): SerializedBytes<ProtocolStateMachineImpl<*>> {
private fun serializeFiber(fiber: FlowStateMachineImpl<*>): SerializedBytes<FlowStateMachineImpl<*>> {
val kryo = quasarKryo()
// add the map of tokens -> tokenizedServices to the kyro context
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
return fiber.serialize(kryo)
}
private fun deserializeFiber(serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>): ProtocolStateMachineImpl<*> {
private fun deserializeFiber(serialisedFiber: SerializedBytes<FlowStateMachineImpl<*>>): FlowStateMachineImpl<*> {
val kryo = quasarKryo()
// put the map of token -> tokenized into the kryo context
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
@ -304,12 +304,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
return createKryo(serializer.kryo)
}
private fun <T> createFiber(logic: ProtocolLogic<T>): ProtocolStateMachineImpl<T> {
private fun <T> createFiber(logic: FlowLogic<T>): FlowStateMachineImpl<T> {
val id = StateMachineRunId.createRandom()
return ProtocolStateMachineImpl(id, logic, scheduler).apply { initFiber(this) }
return FlowStateMachineImpl(id, logic, scheduler).apply { initFiber(this) }
}
private fun initFiber(psm: ProtocolStateMachineImpl<*>) {
private fun initFiber(psm: FlowStateMachineImpl<*>) {
psm.database = database
psm.serviceHub = serviceHub
psm.actionOnSuspend = { ioRequest ->
@ -325,7 +325,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
psm.logic.progressTracker?.currentStep = ProgressTracker.DONE
mutex.locked {
stateMachines.remove(psm)?.let { checkpointStorage.removeCheckpoint(it) }
totalFinishedProtocols.inc()
totalFinishedFlows.inc()
notifyChangeObservers(psm, AddOrRemove.REMOVE)
}
endAllFiberSessions(psm)
@ -334,12 +334,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
mutex.locked {
totalStartedProtocols.inc()
totalStartedFlows.inc()
notifyChangeObservers(psm, AddOrRemove.ADD)
}
}
private fun endAllFiberSessions(psm: ProtocolStateMachineImpl<*>) {
private fun endAllFiberSessions(psm: FlowStateMachineImpl<*>) {
openSessions.values.removeIf { session ->
if (session.psm == psm) {
val otherPartySessionId = session.otherPartySessionId
@ -354,17 +354,17 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun startFiber(fiber: ProtocolStateMachineImpl<*>) {
private fun startFiber(fiber: FlowStateMachineImpl<*>) {
try {
resumeFiber(fiber)
} catch (e: ExecutionException) {
// There are two ways we can take exceptions in this method:
//
// 1) A bug in the SMM code itself whilst setting up the new protocol. In that case the exception will
// 1) A bug in the SMM code itself whilst setting up the new flow. In that case the exception will
// propagate out of this method as it would for any method.
//
// 2) An exception in the first part of the fiber after it's been invoked for the first time via
// fiber.start(). In this case the exception will be caught and stashed in the protocol logic future,
// fiber.start(). In this case the exception will be caught and stashed in the flow logic future,
// then sent to the unhandled exception handler above which logs it, and is then rethrown wrapped
// in an ExecutionException or RuntimeException+EE so we can just catch it here and ignore it.
} catch (e: RuntimeException) {
@ -378,10 +378,10 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
* The state machine will be persisted when it suspends, with automated restart if the StateMachineManager is
* restarted with checkpointed state machines in the storage service.
*/
fun <T> add(logic: ProtocolLogic<T>): ProtocolStateMachine<T> {
fun <T> add(logic: FlowLogic<T>): FlowStateMachine<T> {
val fiber = createFiber(logic)
// We swap out the parent transaction context as using this frequently leads to a deadlock as we wait
// on the protocol completion future inside that context. The problem is that any progress checkpoints are
// on the flow completion future inside that context. The problem is that any progress checkpoints are
// unable to acquire the table lock and move forward till the calling transaction finishes.
// Committing in line here on a fresh context ensure we can progress.
isolatedTransaction(database) {
@ -396,7 +396,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
return fiber
}
private fun updateCheckpoint(psm: ProtocolStateMachineImpl<*>) {
private fun updateCheckpoint(psm: FlowStateMachineImpl<*>) {
check(psm.state != Strand.State.RUNNING) { "Fiber cannot be running when checkpointing" }
val newCheckpoint = Checkpoint(serializeFiber(psm))
val previousCheckpoint = mutex.locked { stateMachines.put(psm, newCheckpoint) }
@ -407,7 +407,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
checkpointingMeter.mark()
}
private fun resumeFiber(psm: ProtocolStateMachineImpl<*>) {
private fun resumeFiber(psm: FlowStateMachineImpl<*>) {
// Avoid race condition when setting stopping to true and then checking liveFibers
incrementLiveFibers()
if (!stopping) executor.executeASAP {
@ -418,7 +418,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun processIORequest(ioRequest: ProtocolIORequest) {
private fun processIORequest(ioRequest: FlowIORequest) {
if (ioRequest is SendRequest) {
if (ioRequest.message is SessionInit) {
openSessions[ioRequest.session.ourSessionId] = ioRequest.session
@ -431,7 +431,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun sendSessionMessage(party: Party, message: SessionMessage, psm: ProtocolStateMachineImpl<*>?) {
private fun sendSessionMessage(party: Party, message: SessionMessage, psm: FlowStateMachineImpl<*>?) {
val node = serviceHub.networkMapCache.getNodeByCompositeKey(party.owningKey)
?: throw IllegalArgumentException("Don't know about party $party")
val logger = psm?.logger ?: logger
@ -448,7 +448,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
data class SessionInit(val initiatorSessionId: Long,
val initiatorParty: Party,
val protocolName: String,
val flowName: String,
val firstPayload: Any?) : SessionMessage
interface SessionInitResponse : ExistingSessionMessage
@ -470,14 +470,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
data class SessionEnd(override val recipientSessionId: Long) : ExistingSessionMessage
data class ProtocolSession(val protocol: ProtocolLogic<*>,
val otherParty: Party,
val ourSessionId: Long,
var otherPartySessionId: Long?,
@Volatile var waitingForResponse: Boolean = false) {
data class FlowSession(val flow: FlowLogic<*>,
val otherParty: Party,
val ourSessionId: Long,
var otherPartySessionId: Long?,
@Volatile var waitingForResponse: Boolean = false) {
val receivedMessages = ConcurrentLinkedQueue<ExistingSessionMessage>()
val psm: ProtocolStateMachineImpl<*> get() = protocol.psm as ProtocolStateMachineImpl<*>
val psm: FlowStateMachineImpl<*> get() = flow.fsm as FlowStateMachineImpl<*>
}

View File

@ -1,11 +1,9 @@
package net.corda.node.services.transactions
import net.corda.core.crypto.Party
import net.corda.core.node.services.ServiceType
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.flows.NotaryFlow
import net.corda.node.services.api.ServiceHubInternal
import net.corda.protocols.NotaryProtocol
import kotlin.reflect.KClass
/**
* A Notary service acts as the final signer of a transaction ensuring two things:
@ -14,15 +12,15 @@ import kotlin.reflect.KClass
*O
* A transaction has to be signed by a Notary to be considered valid (except for output-only transactions without a timestamp).
*
* This is the base implementation that can be customised with specific Notary transaction commit protocol.
* This is the base implementation that can be customised with specific Notary transaction commit flow.
*/
abstract class NotaryService(services: ServiceHubInternal) : SingletonSerializeAsToken() {
init {
services.registerProtocolInitiator(NotaryProtocol.Client::class) { createProtocol(it) }
services.registerFlowInitiator(NotaryFlow.Client::class) { createFlow(it) }
}
/** Implement a factory that specifies the transaction commit protocol for the notary service to use */
abstract fun createProtocol(otherParty: Party): NotaryProtocol.Service
/** Implement a factory that specifies the transaction commit flow for the notary service to use */
abstract fun createFlow(otherParty: Party): NotaryFlow.Service
}

View File

@ -2,8 +2,8 @@ package net.corda.node.services.transactions
import net.corda.core.crypto.Party
import net.corda.core.node.services.TimestampChecker
import net.corda.flows.ValidatingNotaryFlow
import net.corda.node.services.api.ServiceHubInternal
import net.corda.protocols.ValidatingNotaryProtocol
/** A validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */
class RaftValidatingNotaryService(services: ServiceHubInternal,
@ -13,7 +13,7 @@ class RaftValidatingNotaryService(services: ServiceHubInternal,
val type = ValidatingNotaryService.type.getSubType("raft")
}
override fun createProtocol(otherParty: Party): ValidatingNotaryProtocol {
return ValidatingNotaryProtocol(otherParty, timestampChecker, uniquenessProvider)
override fun createFlow(otherParty: Party): ValidatingNotaryFlow {
return ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
}
}

View File

@ -4,9 +4,8 @@ import net.corda.core.crypto.Party
import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.TimestampChecker
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.utilities.loggerFor
import net.corda.flows.NotaryFlow
import net.corda.node.services.api.ServiceHubInternal
import net.corda.protocols.NotaryProtocol
/** A simple Notary service that does not perform transaction validation */
class SimpleNotaryService(services: ServiceHubInternal,
@ -16,7 +15,7 @@ class SimpleNotaryService(services: ServiceHubInternal,
val type = ServiceType.notary.getSubType("simple")
}
override fun createProtocol(otherParty: Party): NotaryProtocol.Service {
return NotaryProtocol.Service(otherParty, timestampChecker, uniquenessProvider)
override fun createFlow(otherParty: Party): NotaryFlow.Service {
return NotaryFlow.Service(otherParty, timestampChecker, uniquenessProvider)
}
}

View File

@ -4,8 +4,8 @@ import net.corda.core.crypto.Party
import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.TimestampChecker
import net.corda.core.node.services.UniquenessProvider
import net.corda.flows.ValidatingNotaryFlow
import net.corda.node.services.api.ServiceHubInternal
import net.corda.protocols.ValidatingNotaryProtocol
/** A Notary service that validates the transaction chain of he submitted transaction before committing it */
class ValidatingNotaryService(services: ServiceHubInternal,
@ -15,7 +15,7 @@ class ValidatingNotaryService(services: ServiceHubInternal,
val type = ServiceType.notary.getSubType("validating")
}
override fun createProtocol(otherParty: Party): ValidatingNotaryProtocol {
return ValidatingNotaryProtocol(otherParty, timestampChecker, uniquenessProvider)
override fun createFlow(otherParty: Party): ValidatingNotaryFlow {
return ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
}
}

View File

@ -1,28 +1,28 @@
package net.corda.node.utilities
import net.corda.core.ThreadBox
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.flows.FlowLogic
import net.corda.core.utilities.ProgressTracker
import net.corda.node.services.statemachine.StateMachineManager
import java.util.*
/**
* This observes the [StateMachineManager] and follows the progress of [ProtocolLogic]s until they complete in the order
* This observes the [StateMachineManager] and follows the progress of [FlowLogic]s until they complete in the order
* they are added to the [StateMachineManager].
*/
class ANSIProgressObserver(val smm: StateMachineManager) {
init {
smm.changes.subscribe { change ->
when (change.addOrRemove) {
AddOrRemove.ADD -> addProtocolLogic(change.logic)
AddOrRemove.REMOVE -> removeProtocolLogic(change.logic)
AddOrRemove.ADD -> addFlowLogic(change.logic)
AddOrRemove.REMOVE -> removeFlowLogic(change.logic)
}
}
}
private class Content {
var currentlyRendering: ProtocolLogic<*>? = null
val pending = ArrayDeque<ProtocolLogic<*>>()
var currentlyRendering: FlowLogic<*>? = null
val pending = ArrayDeque<FlowLogic<*>>()
}
private val state = ThreadBox(Content())
@ -39,18 +39,18 @@ class ANSIProgressObserver(val smm: StateMachineManager) {
}
}
private fun removeProtocolLogic(protocolLogic: ProtocolLogic<*>) {
private fun removeFlowLogic(flowLogic: FlowLogic<*>) {
state.locked {
protocolLogic.progressTracker?.currentStep = ProgressTracker.DONE
if (currentlyRendering == protocolLogic) {
flowLogic.progressTracker?.currentStep = ProgressTracker.DONE
if (currentlyRendering == flowLogic) {
wireUpProgressRendering()
}
}
}
private fun addProtocolLogic(protocolLogic: ProtocolLogic<*>) {
private fun addFlowLogic(flowLogic: FlowLogic<*>) {
state.locked {
pending.add(protocolLogic)
pending.add(flowLogic)
if ((currentlyRendering?.progressTracker?.currentStep ?: ProgressTracker.DONE) == ProgressTracker.DONE) {
wireUpProgressRendering()
}

View File

@ -27,9 +27,9 @@ import kotlin.concurrent.withLock
* or testing.
*
* Currently this is intended for use within a node as a simplified way for Oracles to implement subscriptions for changing
* data by running a protocol internally to implement the request handler (see [NodeInterestRates.Oracle]), which can then
* data by running a flow internally to implement the request handler (see [NodeInterestRates.Oracle]), which can then
* effectively relinquish control until the data becomes available. This isn't the most scalable design and is intended
* to be temporary. In addition, it's enitrely possible to envisage a time when we want public [ProtocolLogic]
* to be temporary. In addition, it's enitrely possible to envisage a time when we want public [FlowLogic]
* implementations to be able to wait for some condition to become true outside of message send/receive. At that point
* we may revisit this implementation and indeed the whole model for this, when we understand that requirement more fully.
*

View File

@ -2,23 +2,23 @@ package net.corda.node
import net.corda.contracts.asset.Cash
import net.corda.core.contracts.*
import net.corda.core.flows.StateMachineRunId
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.Vault
import net.corda.core.protocols.StateMachineRunId
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.node.internal.CordaRPCOpsImpl
import net.corda.node.services.User
import net.corda.node.services.messaging.CURRENT_RPC_USER
import net.corda.node.services.messaging.PermissionException
import net.corda.node.services.messaging.StateMachineUpdate
import net.corda.node.services.messaging.startProtocol
import net.corda.node.services.messaging.startFlow
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.startProtocolPermission
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.node.utilities.databaseTransaction
import net.corda.protocols.CashCommand
import net.corda.protocols.CashProtocol
import net.corda.testing.expect
import net.corda.testing.expectEvents
import net.corda.testing.node.MockNetwork
@ -48,7 +48,7 @@ class CordaRPCOpsImplTest {
aliceNode = network.createNode(networkMapAddress = networkMap.info.address)
notaryNode = network.createNode(advertisedServices = ServiceInfo(SimpleNotaryService.type), networkMapAddress = networkMap.info.address)
rpc = CordaRPCOpsImpl(aliceNode.services, aliceNode.smm, aliceNode.database)
CURRENT_RPC_USER.set(User("user", "pwd", permissions = setOf(startProtocolPermission<CashProtocol>())))
CURRENT_RPC_USER.set(User("user", "pwd", permissions = setOf(startFlowPermission<CashFlow>())))
stateMachineUpdates = rpc.stateMachinesAndUpdates().second
transactions = rpc.verifiedTransactions().second
@ -68,7 +68,7 @@ class CordaRPCOpsImplTest {
// Tell the monitoring service node to issue some cash
val recipient = aliceNode.info.legalIdentity
val outEvent = CashCommand.IssueCash(Amount(quantity, GBP), ref, recipient, notaryNode.info.notaryIdentity)
rpc.startProtocol(::CashProtocol, outEvent)
rpc.startFlow(::CashFlow, outEvent)
network.runNetwork()
val expectedState = Cash.State(Amount(quantity,
@ -105,7 +105,7 @@ class CordaRPCOpsImplTest {
@Test
fun `issue and move`() {
rpc.startProtocol(::CashProtocol, CashCommand.IssueCash(
rpc.startFlow(::CashFlow, CashCommand.IssueCash(
amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.info.legalIdentity,
@ -114,7 +114,7 @@ class CordaRPCOpsImplTest {
network.runNetwork()
rpc.startProtocol(::CashProtocol, CashCommand.PayCash(
rpc.startFlow(::CashFlow, CashCommand.PayCash(
amount = Amount(100, Issued(PartyAndReference(aliceNode.info.legalIdentity, OpaqueBytes(ByteArray(1, { 1 }))), USD)),
recipient = aliceNode.info.legalIdentity
))
@ -186,7 +186,7 @@ class CordaRPCOpsImplTest {
fun `cash command by user not permissioned for cash`() {
CURRENT_RPC_USER.set(User("user", "pwd", permissions = emptySet()))
assertThatExceptionOfType(PermissionException::class.java).isThrownBy {
rpc.startProtocol(::CashProtocol, CashCommand.IssueCash(
rpc.startFlow(::CashFlow, CashCommand.IssueCash(
amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.info.legalIdentity,

View File

@ -6,12 +6,12 @@ import net.corda.core.crypto.sha256
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.ServiceInfo
import net.corda.core.write
import net.corda.flows.FetchAttachmentsFlow
import net.corda.flows.FetchDataFlow
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.protocols.FetchAttachmentsProtocol
import net.corda.protocols.FetchDataProtocol
import net.corda.testing.node.MockNetwork
import net.corda.testing.rootCauseExceptions
import org.junit.Before
@ -49,9 +49,9 @@ class AttachmentTests {
// Insert an attachment into node zero's store directly.
val id = n0.storage.attachments.importAttachment(ByteArrayInputStream(fakeAttachment()))
// Get node one to run a protocol to fetch it and insert it.
// Get node one to run a flow to fetch it and insert it.
network.runNetwork()
val f1 = n1.services.startProtocol(FetchAttachmentsProtocol(setOf(id), n0.info.legalIdentity))
val f1 = n1.services.startFlow(FetchAttachmentsFlow(setOf(id), n0.info.legalIdentity))
network.runNetwork()
assertEquals(0, f1.resultFuture.get().fromDisk.size)
@ -62,7 +62,7 @@ class AttachmentTests {
// Shut down node zero and ensure node one can still resolve the attachment.
n0.stop()
val response: FetchDataProtocol.Result<Attachment> = n1.services.startProtocol(FetchAttachmentsProtocol(setOf(id), n0.info.legalIdentity)).resultFuture.get()
val response: FetchDataFlow.Result<Attachment> = n1.services.startFlow(FetchAttachmentsFlow(setOf(id), n0.info.legalIdentity)).resultFuture.get()
assertEquals(attachment, response.fromDisk[0])
}
@ -73,9 +73,9 @@ class AttachmentTests {
// Get node one to fetch a non-existent attachment.
val hash = SecureHash.randomSHA256()
network.runNetwork()
val f1 = n1.services.startProtocol(FetchAttachmentsProtocol(setOf(hash), n0.info.legalIdentity))
val f1 = n1.services.startFlow(FetchAttachmentsFlow(setOf(hash), n0.info.legalIdentity))
network.runNetwork()
val e = assertFailsWith<FetchDataProtocol.HashNotFound> { rootCauseExceptions { f1.resultFuture.get() } }
val e = assertFailsWith<FetchDataFlow.HashNotFound> { rootCauseExceptions { f1.resultFuture.get() } }
assertEquals(hash, e.requested)
}
@ -104,9 +104,9 @@ class AttachmentTests {
// Get n1 to fetch the attachment. Should receive corrupted bytes.
network.runNetwork()
val f1 = n1.services.startProtocol(FetchAttachmentsProtocol(setOf(id), n0.info.legalIdentity))
val f1 = n1.services.startFlow(FetchAttachmentsFlow(setOf(id), n0.info.legalIdentity))
network.runNetwork()
assertFailsWith<FetchDataProtocol.DownloadedVsRequestedDataMismatch> {
assertFailsWith<FetchDataFlow.DownloadedVsRequestedDataMismatch> {
rootCauseExceptions { f1.resultFuture.get() }
}
}

View File

@ -9,11 +9,11 @@ import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.composite
import net.corda.core.days
import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.StateMachineRunId
import net.corda.core.map
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.services.*
import net.corda.core.protocols.ProtocolStateMachine
import net.corda.core.protocols.StateMachineRunId
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.transactions.WireTransaction
@ -21,6 +21,8 @@ import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.core.utilities.LogHelper
import net.corda.core.utilities.TEST_TX_TIME
import net.corda.flows.TwoPartyTradeFlow.Buyer
import net.corda.flows.TwoPartyTradeFlow.Seller
import net.corda.node.internal.AbstractNode
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.persistence.DBTransactionStorage
@ -28,8 +30,6 @@ import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.persistence.StorageServiceImpl
import net.corda.node.services.persistence.checkpoints
import net.corda.node.utilities.databaseTransaction
import net.corda.protocols.TwoPartyTradeProtocol.Buyer
import net.corda.protocols.TwoPartyTradeProtocol.Seller
import net.corda.testing.*
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.MockNetwork
@ -58,7 +58,7 @@ import kotlin.test.assertTrue
*
* We assume that Alice and Bob already found each other via some market, and have agreed the details already.
*/
class TwoPartyTradeProtocolTests {
class TwoPartyTradeFlowTests {
lateinit var net: MockNetwork
lateinit var notaryNode: MockNetwork.MockNode
@ -146,7 +146,7 @@ class TwoPartyTradeProtocolTests {
insertFakeTransactions(alicesFakePaper, aliceNode, aliceKey, notaryKey)
val aliceFuture = runBuyerAndSeller("alice's paper".outputStateAndRef()).sellerResult
// Everything is on this thread so we can now step through the protocol one step at a time.
// Everything is on this thread so we can now step through the flow one step at a time.
// Seller Alice already sent a message to Buyer Bob. Pump once:
bobNode.pumpReceive()
@ -408,18 +408,18 @@ class TwoPartyTradeProtocolTests {
private data class RunResult(
// The buyer is not created immediately, only when the seller starts running
val buyer: Future<ProtocolStateMachine<*>>,
val buyer: Future<FlowStateMachine<*>>,
val sellerResult: Future<SignedTransaction>,
val sellerId: StateMachineRunId
)
private fun runBuyerAndSeller(assetToSell: StateAndRef<OwnableState>) : RunResult {
val buyerFuture = bobNode.initiateSingleShotProtocol(Seller::class) { otherParty ->
val buyerFuture = bobNode.initiateSingleShotFlow(Seller::class) { otherParty ->
Buyer(otherParty, notaryNode.info.notaryIdentity, 1000.DOLLARS, CommercialPaper.State::class.java)
}.map { it.psm }
}.map { it.fsm }
val seller = Seller(bobNode.info.legalIdentity, notaryNode.info, assetToSell, 1000.DOLLARS, ALICE_KEY)
val sellerResultFuture = aliceNode.smm.add(seller).resultFuture
return RunResult(buyerFuture, sellerResultFuture, seller.psm.id)
return RunResult(buyerFuture, sellerResultFuture, seller.fsm.id)
}
private fun LedgerDSL<TestTransactionDSLInterpreter, TestLedgerDSLInterpreter>.runWithError(

View File

@ -152,7 +152,7 @@ class ArtemisMessagingTests {
val message = messagingClient.createMessage(topic, DEFAULT_SESSION_ID, "first msg".toByteArray())
messagingClient.send(message, messagingClient.myAddress)
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_FLOW_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
messagingClient.send(networkMapMessage, messagingClient.myAddress)
val actual: Message = receivedMessages.take()
@ -179,7 +179,7 @@ class ArtemisMessagingTests {
messagingClient.send(message, messagingClient.myAddress)
}
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
val networkMapMessage = messagingClient.createMessage(NetworkMapService.FETCH_FLOW_TOPIC, DEFAULT_SESSION_ID, "second msg".toByteArray())
messagingClient.send(networkMapMessage, messagingClient.myAddress)
val actual: Message = receivedMessages.take()
@ -212,7 +212,7 @@ class ArtemisMessagingTests {
messagingClient.addMessageHandler(topic) { message, r ->
receivedMessages.add(message)
}
messagingClient.addMessageHandler(NetworkMapService.FETCH_PROTOCOL_TOPIC) { message, r ->
messagingClient.addMessageHandler(NetworkMapService.FETCH_FLOW_TOPIC) { message, r ->
receivedMessages.add(message)
}
// Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.

View File

@ -4,17 +4,17 @@ import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.map
import net.corda.core.messaging.send
import net.corda.core.node.services.DEFAULT_SESSION_ID
import net.corda.flows.sendRequest
import net.corda.node.services.network.AbstractNetworkMapService
import net.corda.node.services.network.InMemoryNetworkMapService
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.*
import net.corda.node.services.network.NetworkMapService.Companion.FETCH_PROTOCOL_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.PUSH_ACK_PROTOCOL_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_PROTOCOL_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.FETCH_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.PUSH_ACK_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.REGISTER_FLOW_TOPIC
import net.corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_FLOW_TOPIC
import net.corda.node.services.network.NodeRegistration
import net.corda.node.utilities.AddOrRemove
import net.corda.protocols.sendRequest
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import org.junit.Before
@ -165,23 +165,23 @@ abstract class AbstractNetworkMapServiceTest {
private fun MockNode.registration(mapServiceNode: MockNode, reg: NodeRegistration, privateKey: PrivateKey): ListenableFuture<RegistrationResponse> {
val req = RegistrationRequest(reg.toWire(privateKey), services.networkService.myAddress)
return services.networkService.sendRequest(REGISTER_PROTOCOL_TOPIC, req, mapServiceNode.info.address)
return services.networkService.sendRequest(REGISTER_FLOW_TOPIC, req, mapServiceNode.info.address)
}
private fun MockNode.subscribe(mapServiceNode: MockNode, subscribe: Boolean): ListenableFuture<SubscribeResponse> {
val req = SubscribeRequest(subscribe, services.networkService.myAddress)
return services.networkService.sendRequest(SUBSCRIPTION_PROTOCOL_TOPIC, req, mapServiceNode.info.address)
return services.networkService.sendRequest(SUBSCRIPTION_FLOW_TOPIC, req, mapServiceNode.info.address)
}
private fun MockNode.updateAcknowlege(mapServiceNode: MockNode, mapVersion: Int) {
val req = UpdateAcknowledge(mapVersion, services.networkService.myAddress)
services.networkService.send(PUSH_ACK_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, mapServiceNode.info.address)
services.networkService.send(PUSH_ACK_FLOW_TOPIC, DEFAULT_SESSION_ID, req, mapServiceNode.info.address)
}
private fun MockNode.fetchMap(mapServiceNode: MockNode, subscribe: Boolean, ifChangedSinceVersion: Int? = null): Future<Collection<NodeRegistration>?> {
val net = services.networkService
val req = FetchMapRequest(subscribe, ifChangedSinceVersion, net.myAddress)
return net.sendRequest<FetchMapResponse>(FETCH_PROTOCOL_TOPIC, req, mapServiceNode.info.address).map { it.nodes }
return net.sendRequest<FetchMapResponse>(FETCH_FLOW_TOPIC, req, mapServiceNode.info.address).map { it.nodes }
}
}

View File

@ -2,11 +2,11 @@ package net.corda.node.services
import com.codahale.metrics.MetricRegistry
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.FlowStateMachine
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.*
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.ProtocolLogicRefFactory
import net.corda.core.protocols.ProtocolStateMachine
import net.corda.core.transactions.SignedTransaction
import net.corda.node.serialization.NodeClock
import net.corda.node.services.api.MessagingServiceInternal
@ -34,7 +34,7 @@ open class MockServiceHubInternal(
val mapCache: NetworkMapCache? = MockNetworkMapCache(),
val scheduler: SchedulerService? = null,
val overrideClock: Clock? = NodeClock(),
val protocolFactory: ProtocolLogicRefFactory? = ProtocolLogicRefFactory(),
val flowFactory: FlowLogicRefFactory? = FlowLogicRefFactory(),
val schemas: SchemaService? = NodeSchemaService()
) : ServiceHubInternal() {
override val vaultService: VaultService = customVault ?: NodeVaultService(this)
@ -56,8 +56,8 @@ open class MockServiceHubInternal(
get() = throw UnsupportedOperationException()
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val protocolLogicRefFactory: ProtocolLogicRefFactory
get() = protocolFactory ?: throw UnsupportedOperationException()
override val flowLogicRefFactory: FlowLogicRefFactory
get() = flowFactory ?: throw UnsupportedOperationException()
override val schemaService: SchemaService
get() = schemas ?: throw UnsupportedOperationException()
@ -65,7 +65,7 @@ open class MockServiceHubInternal(
private val txStorageService: TxWritableStorageService
get() = storage ?: throw UnsupportedOperationException()
private val protocolFactories = ConcurrentHashMap<Class<*>, (Party) -> ProtocolLogic<*>>()
private val flowFactories = ConcurrentHashMap<Class<*>, (Party) -> FlowLogic<*>>()
lateinit var smm: StateMachineManager
@ -79,13 +79,13 @@ open class MockServiceHubInternal(
override fun recordTransactions(txs: Iterable<SignedTransaction>) = recordTransactionsInternal(txStorageService, txs)
override fun <T> startProtocol(logic: ProtocolLogic<T>): ProtocolStateMachine<T> = smm.add(logic)
override fun <T> startFlow(logic: FlowLogic<T>): FlowStateMachine<T> = smm.add(logic)
override fun registerProtocolInitiator(markerClass: KClass<*>, protocolFactory: (Party) -> ProtocolLogic<*>) {
protocolFactories[markerClass.java] = protocolFactory
override fun registerFlowInitiator(markerClass: KClass<*>, flowFactory: (Party) -> FlowLogic<*>) {
flowFactories[markerClass.java] = flowFactory
}
override fun getProtocolFactory(markerClass: Class<*>): ((Party) -> ProtocolLogic<*>)? {
return protocolFactories[markerClass]
override fun getFlowFactory(markerClass: Class<*>): ((Party) -> FlowLogic<*>)? {
return flowFactories[markerClass]
}
}

View File

@ -4,11 +4,11 @@ import net.corda.core.contracts.*
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.composite
import net.corda.core.days
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRef
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.node.ServiceHub
import net.corda.core.node.recordTransactions
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.ProtocolLogicRef
import net.corda.core.protocols.ProtocolLogicRefFactory
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.node.services.events.NodeSchedulerService
@ -47,7 +47,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
// We have to allow Java boxed primitives but Kotlin warns we shouldn't be using them
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
val factory = ProtocolLogicRefFactory(mapOf(Pair(TestProtocolLogic::class.java.name, setOf(NodeSchedulerServiceTest::class.java.name, Integer::class.java.name))))
val factory = FlowLogicRefFactory(mapOf(Pair(TestFlowLogic::class.java.name, setOf(NodeSchedulerServiceTest::class.java.name, Integer::class.java.name))))
lateinit var services: MockServiceHubInternal
@ -56,12 +56,12 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
lateinit var dataSource: Closeable
lateinit var database: Database
lateinit var countDown: CountDownLatch
lateinit var smmHasRemovedAllProtocols: CountDownLatch
lateinit var smmHasRemovedAllFlows: CountDownLatch
var calls: Int = 0
/**
* Have a reference to this test added to [ServiceHub] so that when the [ProtocolLogic] runs it can access the test instance.
* Have a reference to this test added to [ServiceHub] so that when the [FlowLogic] runs it can access the test instance.
* The [TestState] is serialized and deserialized so attempting to use a transient field won't work, as it just
* results in NPE.
*/
@ -73,7 +73,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
@Before
fun setup() {
countDown = CountDownLatch(1)
smmHasRemovedAllProtocols = CountDownLatch(1)
smmHasRemovedAllFlows = CountDownLatch(1)
calls = 0
val dataSourceAndDatabase = configureDatabase(makeTestDataSourceProperties())
dataSource = dataSourceAndDatabase.first
@ -90,7 +90,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
val mockSMM = StateMachineManager(services, listOf(services, scheduler), DBCheckpointStorage(), smmExecutor, database)
mockSMM.changes.subscribe { change ->
if (change.addOrRemove == AddOrRemove.REMOVE && mockSMM.allStateMachines.isEmpty()) {
smmHasRemovedAllProtocols.countDown()
smmHasRemovedAllFlows.countDown()
}
}
mockSMM.start()
@ -103,14 +103,14 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
fun tearDown() {
// We need to make sure the StateMachineManager is done before shutting down executors.
if (services.smm.allStateMachines.isNotEmpty()) {
smmHasRemovedAllProtocols.await()
smmHasRemovedAllFlows.await()
}
smmExecutor.shutdown()
smmExecutor.awaitTermination(60, TimeUnit.SECONDS)
dataSource.close()
}
class TestState(val protocolLogicRef: ProtocolLogicRef, val instant: Instant) : LinearState, SchedulableState {
class TestState(val flowLogicRef: FlowLogicRef, val instant: Instant) : LinearState, SchedulableState {
override val participants: List<CompositeKey>
get() = throw UnsupportedOperationException()
@ -118,13 +118,13 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
override fun isRelevant(ourKeys: Set<PublicKey>): Boolean = true
override fun nextScheduledActivity(thisStateRef: StateRef, protocolLogicRefFactory: ProtocolLogicRefFactory): ScheduledActivity? = ScheduledActivity(protocolLogicRef, instant)
override fun nextScheduledActivity(thisStateRef: StateRef, flowLogicRefFactory: FlowLogicRefFactory): ScheduledActivity? = ScheduledActivity(flowLogicRef, instant)
override val contract: Contract
get() = throw UnsupportedOperationException()
}
class TestProtocolLogic(val increment: Int = 1) : ProtocolLogic<Unit>() {
class TestFlowLogic(val increment: Int = 1) : FlowLogic<Unit>() {
override fun call() {
(serviceHub as TestReference).testReference.calls += increment
(serviceHub as TestReference).testReference.countDown.countDown()
@ -267,7 +267,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
databaseTransaction(database) {
apply {
val freshKey = services.keyManagementService.freshKey()
val state = TestState(factory.create(TestProtocolLogic::class.java, increment), instant)
val state = TestState(factory.create(TestFlowLogic::class.java, increment), instant)
val usefulTX = TransactionType.General.Builder(null).apply {
addOutputState(state, DUMMY_NOTARY)
addCommand(Command(), freshKey.public.composite)

View File

@ -7,12 +7,12 @@ import net.corda.core.node.services.ServiceInfo
import net.corda.core.seconds
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.flows.NotaryChangeFlow.Instigator
import net.corda.flows.StateReplacementException
import net.corda.flows.StateReplacementRefused
import net.corda.node.internal.AbstractNode
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.protocols.NotaryChangeProtocol.Instigator
import net.corda.protocols.StateReplacementException
import net.corda.protocols.StateReplacementRefused
import net.corda.testing.node.MockNetwork
import org.junit.Before
import org.junit.Test
@ -48,8 +48,8 @@ class NotaryChangeTests {
fun `should change notary for a state with single participant`() {
val state = issueState(clientNodeA, oldNotaryNode)
val newNotary = newNotaryNode.info.notaryIdentity
val protocol = Instigator(state, newNotary)
val future = clientNodeA.services.startProtocol(protocol)
val flow = Instigator(state, newNotary)
val future = clientNodeA.services.startFlow(flow)
net.runNetwork()
@ -61,8 +61,8 @@ class NotaryChangeTests {
fun `should change notary for a state with multiple participants`() {
val state = issueMultiPartyState(clientNodeA, clientNodeB, oldNotaryNode)
val newNotary = newNotaryNode.info.notaryIdentity
val protocol = Instigator(state, newNotary)
val future = clientNodeA.services.startProtocol(protocol)
val flow = Instigator(state, newNotary)
val future = clientNodeA.services.startFlow(flow)
net.runNetwork()
@ -77,8 +77,8 @@ class NotaryChangeTests {
fun `should throw when a participant refuses to change Notary`() {
val state = issueMultiPartyState(clientNodeA, clientNodeB, oldNotaryNode)
val newEvilNotary = Party("Evil Notary", generateKeyPair().public)
val protocol = Instigator(state, newEvilNotary)
val future = clientNodeA.services.startProtocol(protocol)
val flow = Instigator(state, newEvilNotary)
val future = clientNodeA.services.startFlow(flow)
net.runNetwork()
@ -87,7 +87,7 @@ class NotaryChangeTests {
assertTrue(error is StateReplacementRefused)
}
// TODO: Add more test cases once we have a general protocol/service exception handling mechanism:
// TODO: Add more test cases once we have a general flow/service exception handling mechanism:
// - A participant is offline/can't be found on the network
// - The requesting party is not a participant
// - The requesting party wants to change additional state fields

View File

@ -11,12 +11,12 @@ import net.corda.core.seconds
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.flows.NotaryError
import net.corda.flows.NotaryException
import net.corda.flows.NotaryFlow
import net.corda.node.internal.AbstractNode
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.protocols.NotaryError
import net.corda.protocols.NotaryException
import net.corda.protocols.NotaryProtocol
import net.corda.testing.MINI_CORP_KEY
import net.corda.testing.node.MockNetwork
import org.junit.Before
@ -94,10 +94,10 @@ class NotaryServiceTests {
tx.toSignedTransaction(false)
}
val firstSpend = NotaryProtocol.Client(stx)
val secondSpend = NotaryProtocol.Client(stx)
clientNode.services.startProtocol(firstSpend)
val future = clientNode.services.startProtocol(secondSpend)
val firstSpend = NotaryFlow.Client(stx)
val secondSpend = NotaryFlow.Client(stx)
clientNode.services.startFlow(firstSpend)
val future = clientNode.services.startFlow(secondSpend)
net.runNetwork()
@ -109,8 +109,8 @@ class NotaryServiceTests {
private fun runNotaryClient(stx: SignedTransaction): ListenableFuture<DigitalSignature.WithKey> {
val protocol = NotaryProtocol.Client(stx)
val future = clientNode.services.startProtocol(protocol).resultFuture
val flow = NotaryFlow.Client(stx)
val future = clientNode.services.startFlow(flow).resultFuture
net.runNetwork()
return future
}

View File

@ -8,12 +8,12 @@ import net.corda.core.node.services.ServiceInfo
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.flows.NotaryError
import net.corda.flows.NotaryException
import net.corda.flows.NotaryFlow
import net.corda.node.internal.AbstractNode
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.protocols.NotaryError
import net.corda.protocols.NotaryException
import net.corda.protocols.NotaryProtocol
import net.corda.testing.MEGA_CORP_KEY
import net.corda.testing.MINI_CORP_KEY
import net.corda.testing.node.MockNetwork
@ -79,8 +79,8 @@ class ValidatingNotaryServiceTests {
}
private fun runClient(stx: SignedTransaction): ListenableFuture<DigitalSignature.WithKey> {
val protocol = NotaryProtocol.Client(stx)
val future = clientNode.services.startProtocol(protocol).resultFuture
val flow = NotaryFlow.Client(stx)
val future = clientNode.services.startFlow(flow).resultFuture
net.runNetwork()
return future
}

View File

@ -7,12 +7,12 @@ import net.corda.core.contracts.Issued
import net.corda.core.contracts.TransactionType
import net.corda.core.contracts.USD
import net.corda.core.crypto.Party
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.flows.FlowLogic
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.flows.BroadcastTransactionFlow.NotifyTxRequest
import net.corda.node.services.persistence.DataVending.Service.NotifyTransactionHandler
import net.corda.node.utilities.databaseTransaction
import net.corda.protocols.BroadcastTransactionProtocol.NotifyTxRequest
import net.corda.testing.MEGA_CORP
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
@ -88,13 +88,13 @@ class DataVendingServiceTests {
}
private fun MockNode.sendNotifyTx(tx: SignedTransaction, walletServiceNode: MockNode) {
walletServiceNode.services.registerProtocolInitiator(NotifyTxProtocol::class, ::NotifyTransactionHandler)
services.startProtocol(NotifyTxProtocol(walletServiceNode.info.legalIdentity, tx))
walletServiceNode.services.registerFlowInitiator(NotifyTxFlow::class, ::NotifyTransactionHandler)
services.startFlow(NotifyTxFlow(walletServiceNode.info.legalIdentity, tx))
network.runNetwork()
}
private class NotifyTxProtocol(val otherParty: Party, val stx: SignedTransaction) : ProtocolLogic<Unit>() {
private class NotifyTxFlow(val otherParty: Party, val stx: SignedTransaction) : FlowLogic<Unit>() {
@Suspendable
override fun call() = send(otherParty, NotifyTxRequest(stx))
}

View File

@ -4,14 +4,14 @@ import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.crypto.Party
import net.corda.core.protocols.ProtocolLogic
import net.corda.core.protocols.ProtocolSessionException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSessionException
import net.corda.core.random63BitValue
import net.corda.core.serialization.deserialize
import net.corda.node.services.persistence.checkpoints
import net.corda.node.services.statemachine.StateMachineManager.*
import net.corda.node.utilities.databaseTransaction
import net.corda.testing.initiateSingleShotProtocol
import net.corda.testing.initiateSingleShotFlow
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer
import net.corda.testing.node.MockNetwork
@ -49,29 +49,29 @@ class StateMachineManagerTests {
}
@Test
fun `newly added protocol is preserved on restart`() {
node1.smm.add(NoOpProtocol(nonTerminating = true))
fun `newly added flow is preserved on restart`() {
node1.smm.add(NoOpFlow(nonTerminating = true))
node1.acceptableLiveFiberCountOnStop = 1
val restoredProtocol = node1.restartAndGetRestoredProtocol<NoOpProtocol>()
assertThat(restoredProtocol.protocolStarted).isTrue()
val restoredFlow = node1.restartAndGetRestoredFlow<NoOpFlow>()
assertThat(restoredFlow.flowStarted).isTrue()
}
@Test
fun `protocol can lazily use the serviceHub in its constructor`() {
val protocol = object : ProtocolLogic<Unit>() {
fun `flow can lazily use the serviceHub in its constructor`() {
val flow = object : FlowLogic<Unit>() {
val lazyTime by lazy { serviceHub.clock.instant() }
@Suspendable
override fun call() = Unit
}
node1.smm.add(protocol)
assertThat(protocol.lazyTime).isNotNull()
node1.smm.add(flow)
assertThat(flow.lazyTime).isNotNull()
}
@Test
fun `protocol restarted just after receiving payload`() {
node2.services.registerProtocolInitiator(SendProtocol::class) { ReceiveThenSuspendProtocol(it) }
fun `flow restarted just after receiving payload`() {
node2.services.registerFlowInitiator(SendFlow::class) { ReceiveThenSuspendFlow(it) }
val payload = random63BitValue()
node1.smm.add(SendProtocol(payload, node2.info.legalIdentity))
node1.smm.add(SendFlow(payload, node2.info.legalIdentity))
// We push through just enough messages to get only the payload sent
node2.pumpReceive()
@ -79,60 +79,60 @@ class StateMachineManagerTests {
node2.acceptableLiveFiberCountOnStop = 1
node2.stop()
net.runNetwork()
val restoredProtocol = node2.restartAndGetRestoredProtocol<ReceiveThenSuspendProtocol>(node1)
assertThat(restoredProtocol.receivedPayloads[0]).isEqualTo(payload)
val restoredFlow = node2.restartAndGetRestoredFlow<ReceiveThenSuspendFlow>(node1)
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo(payload)
}
@Test
fun `protocol added before network map does run after init`() {
fun `flow added before network map does run after init`() {
val node3 = net.createNode(node1.info.address) //create vanilla node
val protocol = NoOpProtocol()
node3.smm.add(protocol)
assertEquals(false, protocol.protocolStarted) // Not started yet as no network activity has been allowed yet
val flow = NoOpFlow()
node3.smm.add(flow)
assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet
net.runNetwork() // Allow network map messages to flow
assertEquals(true, protocol.protocolStarted) // Now we should have run the protocol
assertEquals(true, flow.flowStarted) // Now we should have run the flow
}
@Test
fun `protocol added before network map will be init checkpointed`() {
fun `flow added before network map will be init checkpointed`() {
var node3 = net.createNode(node1.info.address) //create vanilla node
val protocol = NoOpProtocol()
node3.smm.add(protocol)
assertEquals(false, protocol.protocolStarted) // Not started yet as no network activity has been allowed yet
val flow = NoOpFlow()
node3.smm.add(flow)
assertEquals(false, flow.flowStarted) // Not started yet as no network activity has been allowed yet
node3.disableDBCloseOnStop()
node3.stop()
node3 = net.createNode(node1.info.address, forcedID = node3.id)
val restoredProtocol = node3.getSingleProtocol<NoOpProtocol>().first
assertEquals(false, restoredProtocol.protocolStarted) // Not started yet as no network activity has been allowed yet
val restoredFlow = node3.getSingleFlow<NoOpFlow>().first
assertEquals(false, restoredFlow.flowStarted) // Not started yet as no network activity has been allowed yet
net.runNetwork() // Allow network map messages to flow
node3.smm.executor.flush()
assertEquals(true, restoredProtocol.protocolStarted) // Now we should have run the protocol and hopefully cleared the init checkpoint
assertEquals(true, restoredFlow.flowStarted) // Now we should have run the flow and hopefully cleared the init checkpoint
node3.disableDBCloseOnStop()
node3.stop()
// Now it is completed the protocol should leave no Checkpoint.
// Now it is completed the flow should leave no Checkpoint.
node3 = net.createNode(node1.info.address, forcedID = node3.id)
net.runNetwork() // Allow network map messages to flow
node3.smm.executor.flush()
assertTrue(node3.smm.findStateMachines(NoOpProtocol::class.java).isEmpty())
assertTrue(node3.smm.findStateMachines(NoOpFlow::class.java).isEmpty())
}
@Test
fun `protocol loaded from checkpoint will respond to messages from before start`() {
fun `flow loaded from checkpoint will respond to messages from before start`() {
val payload = random63BitValue()
node1.services.registerProtocolInitiator(ReceiveThenSuspendProtocol::class) { SendProtocol(payload, it) }
node2.smm.add(ReceiveThenSuspendProtocol(node1.info.legalIdentity)) // Prepare checkpointed receive protocol
node1.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { SendFlow(payload, it) }
node2.smm.add(ReceiveThenSuspendFlow(node1.info.legalIdentity)) // Prepare checkpointed receive flow
// Make sure the add() has finished initial processing.
node2.smm.executor.flush()
node2.disableDBCloseOnStop()
node2.stop() // kill receiver
val restoredProtocol = node2.restartAndGetRestoredProtocol<ReceiveThenSuspendProtocol>(node1)
assertThat(restoredProtocol.receivedPayloads[0]).isEqualTo(payload)
val restoredFlow = node2.restartAndGetRestoredFlow<ReceiveThenSuspendFlow>(node1)
assertThat(restoredFlow.receivedPayloads[0]).isEqualTo(payload)
}
@Test
fun `protocol with send will resend on interrupted restart`() {
fun `flow with send will resend on interrupted restart`() {
val payload = random63BitValue()
val payload2 = random63BitValue()
@ -140,11 +140,11 @@ class StateMachineManagerTests {
net.messagingNetwork.sentMessages.toSessionTransfers().filter { it.isPayloadTransfer }.forEach { sentCount++ }
val node3 = net.createNode(node1.info.address)
val secondProtocol = node3.initiateSingleShotProtocol(PingPongProtocol::class) { PingPongProtocol(it, payload2) }
val secondFlow = node3.initiateSingleShotFlow(PingPongFlow::class) { PingPongFlow(it, payload2) }
net.runNetwork()
// Kick off first send and receive
node2.smm.add(PingPongProtocol(node3.info.legalIdentity, payload))
node2.smm.add(PingPongFlow(node3.info.legalIdentity, payload))
databaseTransaction(node2.database) {
assertEquals(1, node2.checkpointStorage.checkpoints().size)
}
@ -158,55 +158,55 @@ class StateMachineManagerTests {
}
val node2b = net.createNode(node1.info.address, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray())
node2.manuallyCloseDB()
val (firstAgain, fut1) = node2b.getSingleProtocol<PingPongProtocol>()
// Run the network which will also fire up the second protocol. First message should get deduped. So message data stays in sync.
val (firstAgain, fut1) = node2b.getSingleFlow<PingPongFlow>()
// Run the network which will also fire up the second flow. First message should get deduped. So message data stays in sync.
net.runNetwork()
node2b.smm.executor.flush()
fut1.get()
val receivedCount = sessionTransfers.count { it.isPayloadTransfer }
// Check protocols completed cleanly and didn't get out of phase
assertEquals(4, receivedCount, "Protocol should have exchanged 4 unique messages")// Two messages each way
// Check flows completed cleanly and didn't get out of phase
assertEquals(4, receivedCount, "Flow should have exchanged 4 unique messages")// Two messages each way
// can't give a precise value as every addMessageHandler re-runs the undelivered messages
assertTrue(sentCount > receivedCount, "Node restart should have retransmitted messages")
databaseTransaction(node2b.database) {
assertEquals(0, node2b.checkpointStorage.checkpoints().size, "Checkpoints left after restored protocol should have ended")
assertEquals(0, node2b.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended")
}
databaseTransaction(node3.database) {
assertEquals(0, node3.checkpointStorage.checkpoints().size, "Checkpoints left after restored protocol should have ended")
assertEquals(0, node3.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended")
}
assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3")
assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3")
assertEquals(payload, secondProtocol.get().receivedPayload, "Received payload does not match the (restarted) first value on Node 2")
assertEquals(payload + 1, secondProtocol.get().receivedPayload2, "Received payload does not match the expected second value on Node 2")
assertEquals(payload, secondFlow.get().receivedPayload, "Received payload does not match the (restarted) first value on Node 2")
assertEquals(payload + 1, secondFlow.get().receivedPayload2, "Received payload does not match the expected second value on Node 2")
}
@Test
fun `sending to multiple parties`() {
val node3 = net.createNode(node1.info.address)
net.runNetwork()
node2.services.registerProtocolInitiator(SendProtocol::class) { ReceiveThenSuspendProtocol(it) }
node3.services.registerProtocolInitiator(SendProtocol::class) { ReceiveThenSuspendProtocol(it) }
node2.services.registerFlowInitiator(SendFlow::class) { ReceiveThenSuspendFlow(it) }
node3.services.registerFlowInitiator(SendFlow::class) { ReceiveThenSuspendFlow(it) }
val payload = random63BitValue()
node1.smm.add(SendProtocol(payload, node2.info.legalIdentity, node3.info.legalIdentity))
node1.smm.add(SendFlow(payload, node2.info.legalIdentity, node3.info.legalIdentity))
net.runNetwork()
val node2Protocol = node2.getSingleProtocol<ReceiveThenSuspendProtocol>().first
val node3Protocol = node3.getSingleProtocol<ReceiveThenSuspendProtocol>().first
assertThat(node2Protocol.receivedPayloads[0]).isEqualTo(payload)
assertThat(node3Protocol.receivedPayloads[0]).isEqualTo(payload)
val node2Flow = node2.getSingleFlow<ReceiveThenSuspendFlow>().first
val node3Flow = node3.getSingleFlow<ReceiveThenSuspendFlow>().first
assertThat(node2Flow.receivedPayloads[0]).isEqualTo(payload)
assertThat(node3Flow.receivedPayloads[0]).isEqualTo(payload)
assertSessionTransfers(node2,
node1 sent sessionInit(node1, SendProtocol::class, payload) to node2,
node1 sent sessionInit(node1, SendFlow::class, payload) to node2,
node2 sent sessionConfirm() to node1,
node1 sent sessionEnd() to node2
//There's no session end from the other protocols as they're manually suspended
//There's no session end from the other flows as they're manually suspended
)
assertSessionTransfers(node3,
node1 sent sessionInit(node1, SendProtocol::class, payload) to node3,
node1 sent sessionInit(node1, SendFlow::class, payload) to node3,
node3 sent sessionConfirm() to node1,
node1 sent sessionEnd() to node3
//There's no session end from the other protocols as they're manually suspended
//There's no session end from the other flows as they're manually suspended
)
node2.acceptableLiveFiberCountOnStop = 1
@ -219,24 +219,24 @@ class StateMachineManagerTests {
net.runNetwork()
val node2Payload = random63BitValue()
val node3Payload = random63BitValue()
node2.services.registerProtocolInitiator(ReceiveThenSuspendProtocol::class) { SendProtocol(node2Payload, it) }
node3.services.registerProtocolInitiator(ReceiveThenSuspendProtocol::class) { SendProtocol(node3Payload, it) }
val multiReceiveProtocol = ReceiveThenSuspendProtocol(node2.info.legalIdentity, node3.info.legalIdentity)
node1.smm.add(multiReceiveProtocol)
node2.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { SendFlow(node2Payload, it) }
node3.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { SendFlow(node3Payload, it) }
val multiReceiveFlow = ReceiveThenSuspendFlow(node2.info.legalIdentity, node3.info.legalIdentity)
node1.smm.add(multiReceiveFlow)
node1.acceptableLiveFiberCountOnStop = 1
net.runNetwork()
assertThat(multiReceiveProtocol.receivedPayloads[0]).isEqualTo(node2Payload)
assertThat(multiReceiveProtocol.receivedPayloads[1]).isEqualTo(node3Payload)
assertThat(multiReceiveFlow.receivedPayloads[0]).isEqualTo(node2Payload)
assertThat(multiReceiveFlow.receivedPayloads[1]).isEqualTo(node3Payload)
assertSessionTransfers(node2,
node1 sent sessionInit(node1, ReceiveThenSuspendProtocol::class) to node2,
node1 sent sessionInit(node1, ReceiveThenSuspendFlow::class) to node2,
node2 sent sessionConfirm() to node1,
node2 sent sessionData(node2Payload) to node1,
node2 sent sessionEnd() to node1
)
assertSessionTransfers(node3,
node1 sent sessionInit(node1, ReceiveThenSuspendProtocol::class) to node3,
node1 sent sessionInit(node1, ReceiveThenSuspendFlow::class) to node3,
node3 sent sessionConfirm() to node1,
node3 sent sessionData(node3Payload) to node1,
node3 sent sessionEnd() to node1
@ -245,12 +245,12 @@ class StateMachineManagerTests {
@Test
fun `both sides do a send as their first IO request`() {
node2.services.registerProtocolInitiator(PingPongProtocol::class) { PingPongProtocol(it, 20L) }
node1.smm.add(PingPongProtocol(node2.info.legalIdentity, 10L))
node2.services.registerFlowInitiator(PingPongFlow::class) { PingPongFlow(it, 20L) }
node1.smm.add(PingPongFlow(node2.info.legalIdentity, 10L))
net.runNetwork()
assertSessionTransfers(
node1 sent sessionInit(node1, PingPongProtocol::class, 10L) to node2,
node1 sent sessionInit(node1, PingPongFlow::class, 10L) to node2,
node2 sent sessionConfirm() to node1,
node2 sent sessionData(20L) to node1,
node1 sent sessionData(11L) to node2,
@ -261,18 +261,18 @@ class StateMachineManagerTests {
@Test
fun `exception thrown on other side`() {
node2.services.registerProtocolInitiator(ReceiveThenSuspendProtocol::class) { ExceptionProtocol }
val future = node1.smm.add(ReceiveThenSuspendProtocol(node2.info.legalIdentity)).resultFuture
node2.services.registerFlowInitiator(ReceiveThenSuspendFlow::class) { ExceptionFlow }
val future = node1.smm.add(ReceiveThenSuspendFlow(node2.info.legalIdentity)).resultFuture
net.runNetwork()
assertThatThrownBy { future.get() }.hasCauseInstanceOf(ProtocolSessionException::class.java)
assertThatThrownBy { future.get() }.hasCauseInstanceOf(FlowSessionException::class.java)
assertSessionTransfers(
node1 sent sessionInit(node1, ReceiveThenSuspendProtocol::class) to node2,
node1 sent sessionInit(node1, ReceiveThenSuspendFlow::class) to node2,
node2 sent sessionConfirm() to node1,
node2 sent sessionEnd() to node1
)
}
private inline fun <reified P : ProtocolLogic<*>> MockNode.restartAndGetRestoredProtocol(
private inline fun <reified P : FlowLogic<*>> MockNode.restartAndGetRestoredFlow(
networkMapNode: MockNode? = null): P {
disableDBCloseOnStop() //Handover DB to new node copy
stop()
@ -280,15 +280,15 @@ class StateMachineManagerTests {
newNode.acceptableLiveFiberCountOnStop = 1
manuallyCloseDB()
mockNet.runNetwork() // allow NetworkMapService messages to stabilise and thus start the state machine
return newNode.getSingleProtocol<P>().first
return newNode.getSingleFlow<P>().first
}
private inline fun <reified P : ProtocolLogic<*>> MockNode.getSingleProtocol(): Pair<P, ListenableFuture<*>> {
private inline fun <reified P : FlowLogic<*>> MockNode.getSingleFlow(): Pair<P, ListenableFuture<*>> {
return smm.findStateMachines(P::class.java).single()
}
private fun sessionInit(initiatorNode: MockNode, protocolMarker: KClass<*>, payload: Any? = null): SessionInit {
return SessionInit(0, initiatorNode.info.legalIdentity, protocolMarker.java.name, payload)
private fun sessionInit(initiatorNode: MockNode, flowMarker: KClass<*>, payload: Any? = null): SessionInit {
return SessionInit(0, initiatorNode.info.legalIdentity, flowMarker.java.name, payload)
}
private fun sessionConfirm() = SessionConfirm(0, 0)
@ -334,13 +334,13 @@ class StateMachineManagerTests {
private infix fun Pair<Int, SessionMessage>.to(node: MockNode): SessionTransfer = SessionTransfer(first, second, node.id)
private class NoOpProtocol(val nonTerminating: Boolean = false) : ProtocolLogic<Unit>() {
private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic<Unit>() {
@Transient var protocolStarted = false
@Transient var flowStarted = false
@Suspendable
override fun call() {
protocolStarted = true
flowStarted = true
if (nonTerminating) {
Fiber.park()
}
@ -348,7 +348,7 @@ class StateMachineManagerTests {
}
private class SendProtocol(val payload: Any, vararg val otherParties: Party) : ProtocolLogic<Unit>() {
private class SendFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic<Unit>() {
init {
require(otherParties.isNotEmpty())
@ -359,7 +359,7 @@ class StateMachineManagerTests {
}
private class ReceiveThenSuspendProtocol(vararg val otherParties: Party) : ProtocolLogic<Unit>() {
private class ReceiveThenSuspendFlow(vararg val otherParties: Party) : FlowLogic<Unit>() {
init {
require(otherParties.isNotEmpty())
@ -375,7 +375,7 @@ class StateMachineManagerTests {
}
}
private class PingPongProtocol(val otherParty: Party, val payload: Long) : ProtocolLogic<Unit>() {
private class PingPongFlow(val otherParty: Party, val payload: Long) : FlowLogic<Unit>() {
@Transient var receivedPayload: Long? = null
@Transient var receivedPayload2: Long? = null
@ -383,13 +383,13 @@ class StateMachineManagerTests {
@Suspendable
override fun call() {
receivedPayload = sendAndReceive<Long>(otherParty, payload).unwrap { it }
println("${psm.id} Received $receivedPayload")
println("${fsm.id} Received $receivedPayload")
receivedPayload2 = sendAndReceive<Long>(otherParty, payload + 1).unwrap { it }
println("${psm.id} Received $receivedPayload2")
println("${fsm.id} Received $receivedPayload2")
}
}
private object ExceptionProtocol : ProtocolLogic<Nothing>() {
private object ExceptionFlow : FlowLogic<Nothing>() {
override fun call(): Nothing = throw Exception()
}