Introducing versioning of flows using the FlowVersion annotation.

Core flows, which are baked into the platform, are also versioned using the platform version of the node. Several core flows, such as the data vending ones, which were provided via plugins are now instead baked into the node.
This commit is contained in:
Shams Asari 2017-04-27 12:29:41 +01:00
parent 3d401d1dcb
commit cfe5786d2d
33 changed files with 363 additions and 265 deletions

View File

@ -26,7 +26,7 @@ sealed class FlowInitiator {
/** Started when we get new session initiation request. */
data class Peer(val party: Party) : FlowInitiator()
/** Started as scheduled activity. */
class Scheduled(val scheduledState: ScheduledStateRef) : FlowInitiator()
data class Scheduled(val scheduledState: ScheduledStateRef) : FlowInitiator()
object Shell : FlowInitiator() // TODO When proper ssh access enabled, add username/use RPC?
}

View File

@ -0,0 +1,18 @@
package net.corda.core.flows
/**
* Annotation for initiating [FlowLogic]s to specify the version of their flow protocol. The version is a single integer
* [value] which increments by one whenever a release is made where the flow protocol changes in any manner which is
* backwards incompatible. This may be a change in the sequence of sends and receives between the client and service flows,
* or it could be a change in the meaning. The version is used when a flow first initiates communication with a party to
* inform them what version they are using. For this reason the annotation is not applicable for the initiated flow.
*
* This flow version integer is not the same as Corda's platform version, though it follows a similar semantic.
*
* Note: Only one version of the same flow can currently be loaded at the same time. Any session request by a client flow for
* a different version will be rejected.
*
* Defaults to a flow version of 1 if not specified.
*/
// TODO Add support for multiple versions once CorDapps are loaded in separate class loaders
annotation class FlowVersion(val value: Int)

View File

@ -190,6 +190,8 @@ interface Message {
interface ReceivedMessage : Message {
/** The authenticated sender. */
val peer: X500Name
/** Platform version of the sender's node. */
val platformVersion: Int
}
/** A singleton that's useful for validating topic strings */

View File

@ -6,7 +6,6 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction
/**
* Notify the specified parties about a transaction. The remote peers will download this transaction and its
* dependency graph, verifying them all. The flow returns when all peers have acknowledged the transactions
@ -26,7 +25,7 @@ class BroadcastTransactionFlow(val notarisedTransaction: SignedTransaction,
// TODO: Messaging layer should handle this broadcast for us
val msg = NotifyTxRequest(notarisedTransaction)
participants.filter { it != serviceHub.myInfo.legalIdentity }.forEach { participant ->
// This pops out the other side in DataVending.NotifyTransactionHandler.
// This pops out the other side in NotifyTransactionHandler
send(participant, msg)
}
}

View File

@ -88,9 +88,7 @@ object NotaryChangeFlow : AbstractStateReplacementFlow() {
}
class Acceptor(otherSide: Party,
override val progressTracker: ProgressTracker = tracker()) : AbstractStateReplacementFlow.Acceptor<Party>(otherSide) {
class Acceptor(otherSide: Party) : AbstractStateReplacementFlow.Acceptor<Party>(otherSide) {
/**
* Check the notary change proposal.
*

View File

@ -2,7 +2,6 @@ package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.Party
import net.corda.core.node.PluginServiceHub
import net.corda.core.utilities.ProgressTracker
import net.corda.flows.TxKeyFlowUtilities
import java.security.PublicKey
@ -14,9 +13,6 @@ import java.security.cert.Certificate
* DoS of the node, as key generation/storage is vastly more expensive than submitting a request.
*/
object TxKeyFlow {
fun registerServiceFlow(services: PluginServiceHub) {
services.registerServiceFlow(Requester::class.java, ::Provider)
}
class Requester(val otherSide: Party,
override val progressTracker: ProgressTracker) : FlowLogic<Pair<PublicKey, Certificate?>>() {

View File

@ -32,7 +32,7 @@ class TxKeyFlowUtilitiesTests {
val bobKey: Party = bobNode.services.myInfo.legalIdentity
// Run the flows
TxKeyFlow.registerServiceFlow(bobNode.services)
bobNode.registerServiceFlow(TxKeyFlow.Requester::class) { TxKeyFlow.Provider(it) }
val requesterFlow = aliceNode.services.startFlow(TxKeyFlow.Requester(bobKey))
// Get the results

View File

@ -34,6 +34,11 @@ serialisation, etc. The node exposes the platform version it's on and we envisio
run on older versions of the platform to the one they were compiled against. Platform version borrows heavily from Android's
API Level.
Flows can now be versioned using the ``FlowVersion`` annotation, which assigns an integer version number to it. For now
this enables a node to restrict usage of a flow to a specific version. Support for multiple verisons of the same flow,
hence achieving backwards compatibility, will be possible once we start loading CorDapps in separate class loaders. Watch
this space...
Milestone 10
------------

View File

@ -27,3 +27,18 @@ for the network.
.. note:: A future release may introduce the concept of a target platform version, which would be similar to Android's
``targetSdkVersion``, and would provide a means of maintaining behavioural compatibility for the cases where the
platform's behaviour has changed.
Flow versioning
---------------
A platform which can be extended with CorDapps also requires the ability to version these apps as they evolve from
release to release. This allows users of these apps, whether they're other nodes or RPC users, to select which version
they wish to use and enables nodes to control which app versions they support. Flows have their own version numbers,
independent of other versioning, for example of the platform. In particular it is the initiating flow that can be versioned
using the ``FlowVersion`` annotation. This assigns an integer version number, similar in concept to the platform version,
which is used in the session handshake process when a flow communicates with another party for the first time. The other
party will only accept the session request if it, firstly, has that flow loaded, and secondly, for the same version (see
:doc:`flow-state-machine`).
.. note:: Currently we don't support multiple versions of the same flow loaded in the same node. This will be possible
once we start loading CorDapps in separate class loaders.

View File

@ -0,0 +1,36 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.Futures
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.testing.node.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
class FlowVersioningTest : NodeBasedTest() {
@Test
fun `core flows receive platform version of initiator`() {
val (alice, bob) = Futures.allAsList(
startNode("Alice", platformVersion = 2),
startNode("Bob", platformVersion = 3)).getOrThrow()
bob.installCoreFlow(ClientFlow::class, ::SendBackPlatformVersionFlow)
val resultFuture = alice.services.startFlow(ClientFlow(bob.info.legalIdentity)).resultFuture
assertThat(resultFuture.getOrThrow()).isEqualTo(2)
}
private open class ClientFlow(val otherParty: Party) : FlowLogic<Any>() {
@Suspendable
override fun call(): Any {
return sendAndReceive<Any>(otherParty, "This is ignored. We only send to kick off the flow on the other side").unwrap { it }
}
}
private open class SendBackPlatformVersionFlow(val otherParty: Party, val otherPartysPlatformVersion: Any) : FlowLogic<Unit>() {
@Suspendable
override fun call() = send(otherParty, otherPartysPlatformVersion)
}
}

View File

@ -10,10 +10,7 @@ import net.corda.core.contracts.Amount
import net.corda.core.contracts.PartyAndReference
import net.corda.core.crypto.Party
import net.corda.core.crypto.X509Utilities
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.*
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
@ -25,6 +22,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.debug
import net.corda.flows.*
import net.corda.node.services.api.*
import net.corda.node.services.config.FullNodeConfiguration
@ -43,6 +41,7 @@ import net.corda.node.services.persistence.*
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.statemachine.flowVersion
import net.corda.node.services.transactions.*
import net.corda.node.services.vault.CashBalanceAsMetricsObserver
import net.corda.node.services.vault.NodeVaultService
@ -63,7 +62,9 @@ import java.time.Clock
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.SECONDS
import kotlin.collections.ArrayList
import kotlin.reflect.KClass
import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
/**
@ -107,7 +108,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
// low-performance prototyping period.
protected abstract val serverThread: AffinityExecutor
private val serviceFlowFactories = ConcurrentHashMap<Class<*>, (Party) -> FlowLogic<*>>()
protected val serviceFlowFactories = ConcurrentHashMap<Class<*>, ServiceFlowInfo>()
protected val partyKeys = mutableSetOf<KeyPair>()
val services = object : ServiceHubInternal() {
@ -118,7 +119,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
override val keyManagementService: KeyManagementService get() = keyManagement
override val identityService: IdentityService get() = identity
override val schedulerService: SchedulerService get() = scheduler
override val clock: Clock = platformClock
override val clock: Clock get() = platformClock
override val myInfo: NodeInfo get() = info
override val schemaService: SchemaService get() = schemas
override val transactionVerifierService: TransactionVerifierService get() = txVerifierService
@ -133,11 +134,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
override fun registerServiceFlow(clientFlowClass: Class<out FlowLogic<*>>, serviceFlowFactory: (Party) -> FlowLogic<*>) {
require(clientFlowClass !in serviceFlowFactories) { "${clientFlowClass.name} has already been used to register a service flow" }
log.info("Registering service flow for ${clientFlowClass.name}")
serviceFlowFactories[clientFlowClass] = serviceFlowFactory
val version = clientFlowClass.flowVersion
val info = ServiceFlowInfo.CorDapp(version, serviceFlowFactory)
log.info("Registering service flow for ${clientFlowClass.name}: $info")
serviceFlowFactories[clientFlowClass] = info
}
override fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ((Party) -> FlowLogic<*>)? {
override fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ServiceFlowInfo? {
return serviceFlowFactories[clientFlowClass]
}
@ -157,7 +160,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
lateinit var vault: VaultService
lateinit var keyManagement: KeyManagementService
var inNodeNetworkMapService: NetworkMapService? = null
var inNodeNotaryService: NotaryService? = null
lateinit var txVerifierService: TransactionVerifierService
lateinit var identity: IdentityService
lateinit var net: MessagingServiceInternal
@ -224,7 +226,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
// We wait here, even though any in-flight messages should have been drained away because the
// server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
// arbitrary and might be inappropriate.
MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, TimeUnit.SECONDS)
MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS)
}
}
@ -235,7 +237,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
false
}
startMessagingService(rpcOps)
services.registerServiceFlow(ContractUpgradeFlow.Instigator::class.java) { ContractUpgradeFlow.Acceptor(it) }
installCoreFlows()
runOnStop += Runnable { net.stop() }
_networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured())
smm.start()
@ -247,6 +249,29 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
return this
}
/**
* @suppress
* Installs a flow that's core to the Corda platform. Unlike CorDapp flows which are versioned individually using
* [FlowVersion], core flows have the same version as the node's platform version. To cater for backwards compatibility
* [serviceFlowFactory] provides a second parameter which is the platform version of the initiating party.
*/
@VisibleForTesting
fun installCoreFlow(clientFlowClass: KClass<out FlowLogic<*>>, serviceFlowFactory: (Party, Int) -> FlowLogic<*>) {
require(!clientFlowClass.java.isAnnotationPresent(FlowVersion::class.java)) {
"${FlowVersion::class.java.name} not applicable for core flows; their version is the node's platform version"
}
serviceFlowFactories[clientFlowClass.java] = ServiceFlowInfo.Core(serviceFlowFactory)
log.debug { "Installed core flow ${clientFlowClass.java.name}" }
}
private fun installCoreFlows() {
installCoreFlow(FetchTransactionsFlow::class) { otherParty, _ -> FetchTransactionsHandler(otherParty) }
installCoreFlow(FetchAttachmentsFlow::class) { otherParty, _ -> FetchAttachmentsHandler(otherParty) }
installCoreFlow(BroadcastTransactionFlow::class) { otherParty, _ -> NotifyTransactionHandler(otherParty) }
installCoreFlow(NotaryChangeFlow.Instigator::class) { otherParty, _ -> NotaryChangeFlow.Acceptor(otherParty) }
installCoreFlow(ContractUpgradeFlow.Instigator::class) { otherParty, _ -> ContractUpgradeFlow.Acceptor(otherParty) }
}
/**
* Builds node internal, advertised, and plugin services.
* Returns a list of tokenizable services to be added to the serialisation context.
@ -369,14 +394,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
}
private fun makePluginServices(tokenizableServices: MutableList<Any>): List<Any> {
val pluginServices = pluginRegistries.flatMap { x -> x.servicePlugins }
val serviceList = mutableListOf<Any>()
for (serviceConstructor in pluginServices) {
val service = serviceConstructor.apply(services)
serviceList.add(service)
tokenizableServices.add(service)
}
return serviceList
val pluginServices = pluginRegistries.flatMap { it.servicePlugins }.map { it.apply(services) }
tokenizableServices.addAll(pluginServices)
return pluginServices
}
/**
@ -393,13 +413,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
val notaryServiceType = serviceTypes.singleOrNull { it.isNotary() }
if (notaryServiceType != null) {
inNodeNotaryService = makeNotaryService(notaryServiceType, tokenizableServices)
makeNotaryService(notaryServiceType, tokenizableServices)
}
}
private fun registerWithNetworkMapIfConfigured(): ListenableFuture<Unit> {
services.networkMapCache.addNode(info)
// In the unit test environment, we may run without any network map service sometimes.
// In the unit test environment, we may sometimes run without any network map service
return if (networkMapAddress == null && inNodeNetworkMapService == null) {
services.networkMapCache.runWithoutMapService()
noNetworkMapConfigured() // TODO This method isn't needed as runWithoutMapService sets the Future in the cache
@ -448,26 +468,28 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
inNodeNetworkMapService = PersistentNetworkMapService(services, configuration.minimumPlatformVersion)
}
open protected fun makeNotaryService(type: ServiceType, tokenizableServices: MutableList<Any>): NotaryService {
open protected fun makeNotaryService(type: ServiceType, tokenizableServices: MutableList<Any>) {
val timestampChecker = TimestampChecker(platformClock, 30.seconds)
val uniquenessProvider = makeUniquenessProvider(type)
tokenizableServices.add(uniquenessProvider)
return when (type) {
SimpleNotaryService.type -> SimpleNotaryService(services, timestampChecker, uniquenessProvider)
ValidatingNotaryService.type -> ValidatingNotaryService(services, timestampChecker, uniquenessProvider)
RaftNonValidatingNotaryService.type -> RaftNonValidatingNotaryService(services, timestampChecker, uniquenessProvider as RaftUniquenessProvider)
RaftValidatingNotaryService.type -> RaftValidatingNotaryService(services, timestampChecker, uniquenessProvider as RaftUniquenessProvider)
val notaryService = when (type) {
SimpleNotaryService.type -> SimpleNotaryService(timestampChecker, uniquenessProvider)
ValidatingNotaryService.type -> ValidatingNotaryService(timestampChecker, uniquenessProvider)
RaftNonValidatingNotaryService.type -> RaftNonValidatingNotaryService(timestampChecker, uniquenessProvider as RaftUniquenessProvider)
RaftValidatingNotaryService.type -> RaftValidatingNotaryService(timestampChecker, uniquenessProvider as RaftUniquenessProvider)
BFTNonValidatingNotaryService.type -> with(configuration as FullNodeConfiguration) {
val nodeId = notaryNodeId ?: throw IllegalArgumentException("notaryNodeId value must be specified in the configuration")
val client = BFTSMaRt.Client(nodeId)
tokenizableServices.add(client)
tokenizableServices += client
BFTNonValidatingNotaryService(services, timestampChecker, nodeId, database, client)
}
else -> {
throw IllegalArgumentException("Notary type ${type.id} is not handled by makeNotaryService.")
}
}
installCoreFlow(NotaryFlow.Client::class, notaryService.serviceFlowFactory)
}
protected abstract fun makeUniquenessProvider(type: ServiceType): UniquenessProvider
@ -579,3 +601,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
configuration.baseDirectory.createDirectories()
}
}
sealed class ServiceFlowInfo {
data class Core(val factory: (Party, Int) -> FlowLogic<*>) : ServiceFlowInfo()
data class CorDapp(val version: Int, val factory: (Party) -> FlowLogic<*>) : ServiceFlowInfo()
}

View File

@ -1,23 +0,0 @@
package net.corda.node.services
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.PluginServiceHub
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.flows.NotaryChangeFlow
import java.util.function.Function
object NotaryChange {
class Plugin : CordaPluginRegistry() {
override val servicePlugins = listOf(Function(::Service))
}
/**
* A service that monitors the network for requests for changing the notary of a state,
* and immediately runs the [NotaryChangeFlow] if the auto-accept criteria are met.
*/
class Service(services: PluginServiceHub) : SingletonSerializeAsToken() {
init {
services.registerServiceFlow(NotaryChangeFlow.Instigator::class.java) { NotaryChangeFlow.Acceptor(it) }
}
}
}

View File

@ -2,7 +2,6 @@ package net.corda.node.services.api
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRefFactory
@ -11,8 +10,9 @@ import net.corda.core.messaging.MessagingService
import net.corda.core.node.PluginServiceHub
import net.corda.core.node.services.TxWritableStorageService
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.ServiceFlowInfo
import net.corda.node.services.statemachine.FlowStateMachineImpl
import org.slf4j.LoggerFactory
interface MessagingServiceInternal : MessagingService {
/**
@ -37,9 +37,12 @@ interface MessagingServiceBuilder<out T : MessagingServiceInternal> {
fun start(): ListenableFuture<out T>
}
private val log = LoggerFactory.getLogger(ServiceHubInternal::class.java)
abstract class ServiceHubInternal : PluginServiceHub {
companion object {
private val log = loggerFor<ServiceHubInternal>()
}
abstract val monitoringService: MonitoringService
abstract val flowLogicRefFactory: FlowLogicRefFactory
abstract val schemaService: SchemaService
@ -99,5 +102,5 @@ abstract class ServiceHubInternal : PluginServiceHub {
return startFlow(logic, flowInitiator)
}
abstract fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ((Party) -> FlowLogic<*>)?
}
abstract fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ServiceFlowInfo?
}

View File

@ -38,13 +38,13 @@ import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
import java.security.PublicKey
import java.time.Instant
import java.util.*
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.ThreadSafe
import java.security.PublicKey
// TODO: Stop the wallet explorer and other clients from using this class and get rid of persistentInbox
@ -296,23 +296,13 @@ class NodeMessagingClient(override val config: NodeConfiguration,
try {
val topic = message.required(topicProperty) { getStringProperty(it) }
val sessionID = message.required(sessionIdProperty) { getLongProperty(it) }
val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" }
val platformVersion = message.required(platformVersionProperty) { getIntProperty(it) }
// Use the magic deduplication property built into Artemis as our message identity too
val uuid = message.required(HDR_DUPLICATE_DETECTION_ID) { UUID.fromString(message.getStringProperty(it)) }
val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" }
log.trace { "Received message from: ${message.address} user: $user topic: $topic sessionID: $sessionID uuid: $uuid" }
val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
val msg = object : ReceivedMessage {
override val topicSession = TopicSession(topic, sessionID)
override val data: ByteArray = body
override val peer: X500Name = X500Name(user)
override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
override val uniqueMessageId: UUID = uuid
override fun toString() = "$topic#${data.opaque()}"
}
return msg
return ArtemisReceivedMessage(TopicSession(topic, sessionID), X500Name(user), platformVersion, uuid, message)
} catch (e: Exception) {
log.error("Unable to process message, ignoring it: $message", e)
return null
@ -324,6 +314,16 @@ class NodeMessagingClient(override val config: NodeConfiguration,
return extractor(key)
}
private class ArtemisReceivedMessage(override val topicSession: TopicSession,
override val peer: X500Name,
override val platformVersion: Int,
override val uniqueMessageId: UUID,
private val message: ClientMessage) : ReceivedMessage {
override val data: ByteArray by lazy { ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } }
override val debugTimestamp: Instant get() = Instant.ofEpochMilli(message.timestamp)
override fun toString() = "${topicSession.topic}#${data.opaque()}"
}
private fun deliver(msg: ReceivedMessage): Boolean {
state.checkNotLocked()
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added

View File

@ -5,83 +5,61 @@ import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.PluginServiceHub
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.unwrap
import net.corda.flows.*
import java.util.function.Function
import javax.annotation.concurrent.ThreadSafe
object DataVending {
class Plugin : CordaPluginRegistry() {
override val servicePlugins = listOf(Function(::Service))
/**
* This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple
* glue that sits between the network layer and the database layer.
*
* Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There
* are no access control lists. If you want to keep some data private, then you must be careful who you give its name
* to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have
* its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as
* such the hash of a piece of data can be seen as a type of password allowing access to it.
*
* Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
*/
class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler<SignedTransaction>(otherParty) {
override fun getData(id: SecureHash): SignedTransaction? {
return serviceHub.storageService.validatedTransactions.getTransaction(id)
}
}
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler<ByteArray>(otherParty) {
override fun getData(id: SecureHash): ByteArray? {
return serviceHub.storageService.attachments.openAttachment(id)?.open()?.readBytes()
}
}
abstract class FetchDataHandler<out T>(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
@Throws(FetchDataFlow.HashNotFound::class)
override fun call() {
val request = receive<FetchDataFlow.Request>(otherParty).unwrap {
if (it.hashes.isEmpty()) throw FlowException("Empty hash list")
it
}
val response = request.hashes.map {
getData(it) ?: throw FetchDataFlow.HashNotFound(it)
}
send(otherParty, response)
}
protected abstract fun getData(id: SecureHash): T?
}
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
class NotifyTransactionHandler(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val request = receive<BroadcastTransactionFlow.NotifyTxRequest>(otherParty).unwrap { it }
subFlow(ResolveTransactionsFlow(request.tx, otherParty), shareParentSessions = true)
serviceHub.recordTransactions(request.tx)
}
/**
* This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple
* glue that sits between the network layer and the database layer.
*
* Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There
* are no access control lists. If you want to keep some data private, then you must be careful who you give its name
* to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have
* its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as
* such the hash of a piece of data can be seen as a type of password allowing access to it.
*
* Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
*/
@ThreadSafe
class Service(services: PluginServiceHub) : SingletonSerializeAsToken() {
init {
services.registerServiceFlow(FetchTransactionsFlow::class.java, ::FetchTransactionsHandler)
services.registerServiceFlow(FetchAttachmentsFlow::class.java, ::FetchAttachmentsHandler)
services.registerServiceFlow(BroadcastTransactionFlow::class.java, ::NotifyTransactionHandler)
}
private class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler<SignedTransaction>(otherParty) {
override fun getData(id: SecureHash): SignedTransaction? {
return serviceHub.storageService.validatedTransactions.getTransaction(id)
}
}
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
private class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler<ByteArray>(otherParty) {
override fun getData(id: SecureHash): ByteArray? {
return serviceHub.storageService.attachments.openAttachment(id)?.open()?.readBytes()
}
}
private abstract class FetchDataHandler<out T>(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
@Throws(FetchDataFlow.HashNotFound::class)
override fun call() {
val request = receive<FetchDataFlow.Request>(otherParty).unwrap {
if (it.hashes.isEmpty()) throw FlowException("Empty hash list")
it
}
val response = request.hashes.map {
getData(it) ?: throw FetchDataFlow.HashNotFound(it)
}
send(otherParty, response)
}
protected abstract fun getData(id: SecureHash): T?
}
// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
// includes us in any outside that list. Potentially just if it includes any outside that list at all.
// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
// cash without from unknown parties?
class NotifyTransactionHandler(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val request = receive<BroadcastTransactionFlow.NotifyTxRequest>(otherParty).unwrap { it }
subFlow(ResolveTransactionsFlow(request.tx, otherParty), shareParentSessions = true)
serviceHub.recordTransactions(request.tx)
}
}
}
}

View File

@ -298,7 +298,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
openSessions[Pair(sessionFlow, otherParty)] = session
// We get the top-most concrete class object to cater for the case where the client flow is customised via a sub-class
val clientFlowClass = sessionFlow.topConcreteFlowClass
val sessionInit = SessionInit(session.ourSessionId, clientFlowClass, firstPayload)
val sessionInit = SessionInit(session.ourSessionId, clientFlowClass, clientFlowClass.flowVersion, firstPayload)
sendInternal(session, sessionInit)
if (waitForConfirmation) {
session.waitForConfirmation()
@ -434,6 +434,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
}
val Class<out FlowLogic<*>>.flowVersion: Int get() {
val flowVersion = getDeclaredAnnotation(FlowVersion::class.java) ?: return 1
require(flowVersion.value > 0) { "Flow versions have to be greater or equal to 1" }
return flowVersion.value
}
// I would prefer for [FlowProgressHandleImpl] to extend [FlowHandleImpl],
// but Kotlin doesn't allow this for data classes, not even to create
// another data class!

View File

@ -15,6 +15,7 @@ interface SessionMessage
data class SessionInit(val initiatorSessionId: Long,
val clientFlowClass: Class<out FlowLogic<*>>,
val flowVerison: Int,
val firstPayload: Any?) : SessionMessage
interface ExistingSessionMessage : SessionMessage {

View File

@ -13,9 +13,7 @@ import com.esotericsoftware.kryo.pool.KryoPool
import com.google.common.collect.HashMultimap
import com.google.common.util.concurrent.ListenableFuture
import io.requery.util.CloseableIterator
import net.corda.core.ErrorOr
import net.corda.core.ThreadBox
import net.corda.core.bufferUntilSubscribed
import net.corda.core.*
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.commonName
@ -23,12 +21,11 @@ import net.corda.core.flows.*
import net.corda.core.messaging.ReceivedMessage
import net.corda.core.messaging.TopicSession
import net.corda.core.messaging.send
import net.corda.core.random63BitValue
import net.corda.core.serialization.*
import net.corda.core.then
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.node.internal.ServiceFlowInfo
import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
@ -151,7 +148,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private val recentlyClosedSessions = ConcurrentHashMap<Long, Party>()
// Context for tokenized services in checkpoints
private val serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryo(), serviceHub)
private val serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryoPool, serviceHub)
/** Returns a list of all state machines executing the given flow logic at the top level (subflows do not count) */
fun <P : FlowLogic<T>, T> findStateMachines(flowClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
@ -289,7 +286,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
if (sender != null) {
when (sessionMessage) {
is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, sender)
is SessionInit -> onSessionInit(sessionMessage, sender)
is SessionInit -> onSessionInit(sessionMessage, message, sender)
}
} else {
logger.error("Unknown peer ${message.peer} in $sessionMessage")
@ -335,21 +332,38 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
waitingForResponse is WaitForLedgerCommit && message is ErrorSessionEnd
}
private fun onSessionInit(sessionInit: SessionInit, sender: Party) {
private fun onSessionInit(sessionInit: SessionInit, receivedMessage: ReceivedMessage, sender: Party) {
logger.trace { "Received $sessionInit from $sender" }
val otherPartySessionId = sessionInit.initiatorSessionId
fun sendSessionReject(message: String) = sendSessionMessage(sender, SessionReject(otherPartySessionId, message))
val flowFactory = serviceHub.getServiceFlowFactory(sessionInit.clientFlowClass)
if (flowFactory == null) {
val serviceFlowInfo = serviceHub.getServiceFlowFactory(sessionInit.clientFlowClass)
if (serviceFlowInfo == null) {
logger.warn("${sessionInit.clientFlowClass} has not been registered with a service flow: $sessionInit")
sendSessionReject("Don't know ${sessionInit.clientFlowClass.name}")
return
}
val session = try {
val flow = flowFactory(sender)
val flow = when (serviceFlowInfo) {
is ServiceFlowInfo.CorDapp -> {
// TODO Add support for multiple versions of the same flow when CorDapps are loaded in separate class loaders
if (sessionInit.flowVerison != serviceFlowInfo.version) {
logger.warn("Version mismatch - ${sessionInit.clientFlowClass} is only registered for version " +
"${serviceFlowInfo.version}: $sessionInit")
sendSessionReject("Version not supported")
return
}
serviceFlowInfo.factory(sender)
}
is ServiceFlowInfo.Core -> serviceFlowInfo.factory(sender, receivedMessage.platformVersion)
}
if (flow.javaClass.isAnnotationPresent(FlowVersion::class.java)) {
logger.warn("${FlowVersion::class.java.name} is not applicable for service flows: ${flow.javaClass.name}")
}
val fiber = createFiber(flow, FlowInitiator.Peer(sender))
val session = FlowSession(flow, random63BitValue(), sender, FlowSessionState.Initiated(sender, otherPartySessionId))
if (sessionInit.firstPayload != null) {
@ -372,7 +386,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
private fun serializeFiber(fiber: FlowStateMachineImpl<*>): SerializedBytes<FlowStateMachineImpl<*>> {
return quasarKryo().run { kryo ->
return quasarKryoPool.run { kryo ->
// add the map of tokens -> tokenizedServices to the kyro context
kryo.withSerializationContext(serializationContext) {
fiber.serialize(kryo)
@ -381,7 +395,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
private fun deserializeFiber(checkpoint: Checkpoint): FlowStateMachineImpl<*> {
return quasarKryo().run { kryo ->
return quasarKryoPool.run { kryo ->
// put the map of token -> tokenized into the kryo context
kryo.withSerializationContext(serializationContext) {
checkpoint.serializedFiber.deserialize(kryo)
@ -389,8 +403,6 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun quasarKryo(): KryoPool = quasarKryoPool
private fun <T> createFiber(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachineImpl<T> {
val id = StateMachineRunId.createRandom()
return FlowStateMachineImpl(id, logic, scheduler, flowInitiator).apply { initFiber(this) }

View File

@ -25,7 +25,7 @@ class BFTNonValidatingNotaryService(services: ServiceHubInternal,
timestampChecker: TimestampChecker,
serverId: Int,
db: Database,
val client: BFTSMaRt.Client) : NotaryService(services) {
val client: BFTSMaRt.Client) : NotaryService {
init {
thread(name = "BFTSmartServer-$serverId", isDaemon = true) {
Server(serverId, db, "bft_smart_notary_committed_states", services, timestampChecker)
@ -37,9 +37,11 @@ class BFTNonValidatingNotaryService(services: ServiceHubInternal,
private val log = loggerFor<BFTNonValidatingNotaryService>()
}
override fun createFlow(otherParty: Party) = ServiceFlow(otherParty, client)
override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
ServiceFlow(otherParty, client)
}
class ServiceFlow(val otherSide: Party, val client: BFTSMaRt.Client) : FlowLogic<Void?>() {
private class ServiceFlow(val otherSide: Party, val client: BFTSMaRt.Client) : FlowLogic<Void?>() {
@Suspendable
override fun call(): Void? {
val stx = receive<FilteredTransaction>(otherSide).unwrap { it }
@ -60,11 +62,11 @@ class BFTNonValidatingNotaryService(services: ServiceHubInternal,
}
}
class Server(id: Int,
db: Database,
tableName: String,
services: ServiceHubInternal,
timestampChecker: TimestampChecker) : BFTSMaRt.Server(id, db, tableName, services, timestampChecker) {
private class Server(id: Int,
db: Database,
tableName: String,
services: ServiceHubInternal,
timestampChecker: TimestampChecker) : BFTSMaRt.Server(id, db, tableName, services, timestampChecker) {
override fun executeCommand(command: ByteArray): ByteArray {
val request = command.deserialize<BFTSMaRt.CommitRequest>()

View File

@ -2,26 +2,13 @@ package net.corda.node.services.transactions
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.flows.NotaryFlow
import net.corda.node.services.api.ServiceHubInternal
/**
* A Notary service acts as the final signer of a transaction ensuring two things:
* - The (optional) timestamp of the transaction is valid.
* - None of the referenced input states have previously been consumed by a transaction signed by this Notary
*O
* A transaction has to be signed by a Notary to be considered valid (except for output-only transactions without a timestamp).
*
* This is the base implementation that can be customised with specific Notary transaction commit flow.
*/
abstract class NotaryService(services: ServiceHubInternal) : SingletonSerializeAsToken() {
init {
services.registerServiceFlow(NotaryFlow.Client::class.java) { createFlow(it) }
}
/** Implement a factory that specifies the transaction commit flow for the notary service to use */
abstract fun createFlow(otherParty: Party): FlowLogic<Void?>
interface NotaryService {
/**
* Factory for producing notary service flows which have the corresponding sends and receives as NotaryFlow.Client.
* The first parameter is the client [Party] making the request and the second is the platform version of the client's
* node. Use this version parameter to provide backwards compatibility if the notary flow protocol changes.
*/
val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?>
}

View File

@ -1,19 +1,18 @@
package net.corda.node.services.transactions
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.node.services.TimestampChecker
import net.corda.flows.NonValidatingNotaryFlow
import net.corda.node.services.api.ServiceHubInternal
/** A non-validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */
class RaftNonValidatingNotaryService(services: ServiceHubInternal,
val timestampChecker: TimestampChecker,
val uniquenessProvider: RaftUniquenessProvider) : NotaryService(services) {
class RaftNonValidatingNotaryService(val timestampChecker: TimestampChecker,
val uniquenessProvider: RaftUniquenessProvider) : NotaryService {
companion object {
val type = SimpleNotaryService.type.getSubType("raft")
}
override fun createFlow(otherParty: Party): NonValidatingNotaryFlow {
return NonValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
NonValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
}
}

View File

@ -1,19 +1,18 @@
package net.corda.node.services.transactions
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.node.services.TimestampChecker
import net.corda.flows.ValidatingNotaryFlow
import net.corda.node.services.api.ServiceHubInternal
/** A validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */
class RaftValidatingNotaryService(services: ServiceHubInternal,
val timestampChecker: TimestampChecker,
val uniquenessProvider: RaftUniquenessProvider) : NotaryService(services) {
class RaftValidatingNotaryService(val timestampChecker: TimestampChecker,
val uniquenessProvider: RaftUniquenessProvider) : NotaryService {
companion object {
val type = ValidatingNotaryService.type.getSubType("raft")
}
override fun createFlow(otherParty: Party): ValidatingNotaryFlow {
return ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
}
}

View File

@ -1,22 +1,20 @@
package net.corda.node.services.transactions
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.TimestampChecker
import net.corda.core.node.services.UniquenessProvider
import net.corda.flows.NonValidatingNotaryFlow
import net.corda.flows.NotaryFlow
import net.corda.node.services.api.ServiceHubInternal
/** A simple Notary service that does not perform transaction validation */
class SimpleNotaryService(services: ServiceHubInternal,
val timestampChecker: TimestampChecker,
val uniquenessProvider: UniquenessProvider) : NotaryService(services) {
class SimpleNotaryService(val timestampChecker: TimestampChecker,
val uniquenessProvider: UniquenessProvider) : NotaryService {
companion object {
val type = ServiceType.notary.getSubType("simple")
}
override fun createFlow(otherParty: Party): NotaryFlow.Service {
return NonValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
NonValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
}
}

View File

@ -1,21 +1,20 @@
package net.corda.node.services.transactions
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.TimestampChecker
import net.corda.core.node.services.UniquenessProvider
import net.corda.flows.ValidatingNotaryFlow
import net.corda.node.services.api.ServiceHubInternal
/** A Notary service that validates the transaction chain of the submitted transaction before committing it */
class ValidatingNotaryService(services: ServiceHubInternal,
val timestampChecker: TimestampChecker,
val uniquenessProvider: UniquenessProvider) : NotaryService(services) {
class ValidatingNotaryService(val timestampChecker: TimestampChecker,
val uniquenessProvider: UniquenessProvider) : NotaryService {
companion object {
val type = ServiceType.notary.getSubType("validating")
}
override fun createFlow(otherParty: Party): ValidatingNotaryFlow {
return ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
}
}

View File

@ -1,3 +0,0 @@
# Register a ServiceLoader service extending from net.corda.core.node.CordaPluginRegistry
net.corda.node.services.NotaryChange$Plugin
net.corda.node.services.persistence.DataVending$Plugin

View File

@ -9,12 +9,12 @@ import net.corda.core.flows.FlowStateMachine
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.*
import net.corda.core.transactions.SignedTransaction
import net.corda.node.internal.ServiceFlowInfo
import net.corda.node.serialization.NodeClock
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.persistence.DataVending
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
@ -69,14 +69,6 @@ open class MockServiceHubInternal(
lateinit var smm: StateMachineManager
init {
if (net != null && storage != null) {
// Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener
// on the networking service, so that will keep it from being collected.
DataVending.Service(this)
}
}
override fun recordTransactions(txs: Iterable<SignedTransaction>) = recordTransactionsInternal(txStorageService, txs)
override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachine<T> {
@ -85,5 +77,5 @@ open class MockServiceHubInternal(
override fun registerServiceFlow(clientFlowClass: Class<out FlowLogic<*>>, serviceFlowFactory: (Party) -> FlowLogic<*>) = Unit
override fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ((Party) -> FlowLogic<*>)? = null
override fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ServiceFlowInfo? = null
}

View File

@ -12,7 +12,6 @@ import net.corda.core.node.services.unconsumedStates
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.flows.BroadcastTransactionFlow.NotifyTxRequest
import net.corda.node.services.persistence.DataVending.Service.NotifyTransactionHandler
import net.corda.node.utilities.transaction
import net.corda.testing.MEGA_CORP
import net.corda.testing.node.MockNetwork
@ -89,7 +88,7 @@ class DataVendingServiceTests {
}
private fun MockNode.sendNotifyTx(tx: SignedTransaction, walletServiceNode: MockNode) {
walletServiceNode.services.registerServiceFlow(NotifyTxFlow::class.java, ::NotifyTransactionHandler)
walletServiceNode.registerServiceFlow(clientFlowClass = NotifyTxFlow::class, serviceFlowFactory = ::NotifyTransactionHandler)
services.startFlow(NotifyTxFlow(walletServiceNode.info.legalIdentity, tx))
network.runNetwork()
}

View File

@ -11,6 +11,7 @@ import net.corda.core.crypto.Party
import net.corda.core.crypto.generateKeyPair
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowVersion
import net.corda.core.messaging.MessageRecipients
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.ServiceInfo
@ -110,7 +111,7 @@ class StateMachineManagerTests {
@Test
fun `exception while fiber suspended`() {
node2.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow("Hello", it) }
node2.registerServiceFlow(ReceiveFlow::class) { SendFlow("Hello", it) }
val flow = ReceiveFlow(node2.info.legalIdentity)
val fiber = node1.services.startFlow(flow) as FlowStateMachineImpl
// Before the flow runs change the suspend action to throw an exception
@ -129,7 +130,7 @@ class StateMachineManagerTests {
@Test
fun `flow restarted just after receiving payload`() {
node2.services.registerServiceFlow(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() }
node2.registerServiceFlow(SendFlow::class) { ReceiveFlow(it).nonTerminating() }
node1.services.startFlow(SendFlow("Hello", node2.info.legalIdentity))
// We push through just enough messages to get only the payload sent
@ -179,7 +180,7 @@ class StateMachineManagerTests {
@Test
fun `flow loaded from checkpoint will respond to messages from before start`() {
node1.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow("Hello", it) }
node1.registerServiceFlow(ReceiveFlow::class) { SendFlow("Hello", it) }
node2.services.startFlow(ReceiveFlow(node1.info.legalIdentity).nonTerminating()) // Prepare checkpointed receive flow
// Make sure the add() has finished initial processing.
node2.smm.executor.flush()
@ -243,8 +244,8 @@ class StateMachineManagerTests {
fun `sending to multiple parties`() {
val node3 = net.createNode(node1.info.address)
net.runNetwork()
node2.services.registerServiceFlow(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() }
node3.services.registerServiceFlow(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() }
node2.registerServiceFlow(SendFlow::class) { ReceiveFlow(it).nonTerminating() }
node3.registerServiceFlow(SendFlow::class) { ReceiveFlow(it).nonTerminating() }
val payload = "Hello World"
node1.services.startFlow(SendFlow(payload, node2.info.legalIdentity, node3.info.legalIdentity))
net.runNetwork()
@ -254,14 +255,14 @@ class StateMachineManagerTests {
assertThat(node3Flow.receivedPayloads[0]).isEqualTo(payload)
assertSessionTransfers(node2,
node1 sent sessionInit(SendFlow::class, payload) to node2,
node1 sent sessionInit(SendFlow::class, 1, payload) to node2,
node2 sent sessionConfirm to node1,
node1 sent normalEnd to node2
//There's no session end from the other flows as they're manually suspended
)
assertSessionTransfers(node3,
node1 sent sessionInit(SendFlow::class, payload) to node3,
node1 sent sessionInit(SendFlow::class, 1, payload) to node3,
node3 sent sessionConfirm to node1,
node1 sent normalEnd to node3
//There's no session end from the other flows as they're manually suspended
@ -277,8 +278,8 @@ class StateMachineManagerTests {
net.runNetwork()
val node2Payload = "Test 1"
val node3Payload = "Test 2"
node2.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow(node2Payload, it) }
node3.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow(node3Payload, it) }
node2.registerServiceFlow(ReceiveFlow::class) { SendFlow(node2Payload, it) }
node3.registerServiceFlow(ReceiveFlow::class) { SendFlow(node3Payload, it) }
val multiReceiveFlow = ReceiveFlow(node2.info.legalIdentity, node3.info.legalIdentity).nonTerminating()
node1.services.startFlow(multiReceiveFlow)
node1.acceptableLiveFiberCountOnStop = 1
@ -303,12 +304,12 @@ class StateMachineManagerTests {
@Test
fun `both sides do a send as their first IO request`() {
node2.services.registerServiceFlow(PingPongFlow::class.java) { PingPongFlow(it, 20L) }
node2.registerServiceFlow(PingPongFlow::class) { PingPongFlow(it, 20L) }
node1.services.startFlow(PingPongFlow(node2.info.legalIdentity, 10L))
net.runNetwork()
assertSessionTransfers(
node1 sent sessionInit(PingPongFlow::class, 10L) to node2,
node1 sent sessionInit(PingPongFlow::class, 1, 10L) to node2,
node2 sent sessionConfirm to node1,
node2 sent sessionData(20L) to node1,
node1 sent sessionData(11L) to node2,
@ -374,7 +375,7 @@ class StateMachineManagerTests {
@Test
fun `other side ends before doing expected send`() {
node2.services.registerServiceFlow(ReceiveFlow::class.java) { NoOpFlow() }
node2.registerServiceFlow(ReceiveFlow::class) { NoOpFlow() }
val resultFuture = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity)).resultFuture
net.runNetwork()
assertThatExceptionOfType(FlowSessionException::class.java).isThrownBy {
@ -534,7 +535,7 @@ class StateMachineManagerTests {
}
}
node2.services.registerServiceFlow(AskForExceptionFlow::class.java) { ConditionalExceptionFlow(it, "Hello") }
node2.registerServiceFlow(AskForExceptionFlow::class) { ConditionalExceptionFlow(it, "Hello") }
val resultFuture = node1.services.startFlow(RetryOnExceptionFlow(node2.info.legalIdentity)).resultFuture
net.runNetwork()
assertThat(resultFuture.getOrThrow()).isEqualTo("Hello")
@ -562,7 +563,7 @@ class StateMachineManagerTests {
ptx.signWith(node1.services.legalIdentityKey)
val stx = ptx.toSignedTransaction()
node1.services.registerServiceFlow(WaitingFlows.Waiter::class.java) {
node1.registerServiceFlow(WaitingFlows.Waiter::class) {
WaitingFlows.Committer(it) { throw Exception("Error") }
}
val waiter = node2.services.startFlow(WaitingFlows.Waiter(stx, node1.info.legalIdentity)).resultFuture
@ -587,6 +588,31 @@ class StateMachineManagerTests {
assertThat(receiveFlowFuture.getOrThrow().receivedPayloads).containsOnly("Hello")
}
@Test
fun `upgraded flow`() {
node1.services.startFlow(UpgradedFlow(node2.info.legalIdentity))
net.runNetwork()
assertThat(sessionTransfers).startsWith(
node1 sent sessionInit(UpgradedFlow::class, 2) to node2
)
}
@Test
fun `unsupported new flow version`() {
node2.registerServiceFlow(UpgradedFlow::class, flowVersion = 1) { SendFlow("Hello", it) }
val result = node1.services.startFlow(UpgradedFlow(node2.info.legalIdentity)).resultFuture
net.runNetwork()
assertThatExceptionOfType(FlowSessionException::class.java).isThrownBy {
result.getOrThrow()
}.withMessageContaining("Version")
}
@FlowVersion(2)
private class UpgradedFlow(val otherParty: Party) : FlowLogic<Any>() {
@Suspendable
override fun call(): Any = receive<Any>(otherParty).unwrap { it }
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////
//region Helpers
@ -605,8 +631,8 @@ class StateMachineManagerTests {
return smm.findStateMachines(P::class.java).single()
}
private fun sessionInit(clientFlowClass: KClass<out FlowLogic<*>>, payload: Any? = null): SessionInit {
return SessionInit(0, clientFlowClass.java, payload)
private fun sessionInit(clientFlowClass: KClass<out FlowLogic<*>>, flowVersion: Int = 1, payload: Any? = null): SessionInit {
return SessionInit(0, clientFlowClass.java, flowVersion, payload)
}
private val sessionConfirm = SessionConfirm(0, 0)
private fun sessionData(payload: Any) = SessionData(0, payload)

View File

@ -30,7 +30,7 @@ class TraderDemoTest : NodeBasedTest() {
startNode(DUMMY_BANK_A.name, rpcUsers = demoUser),
startNode(DUMMY_BANK_B.name, rpcUsers = demoUser),
startNode(BOC.name, rpcUsers = listOf(user)),
startNode(DUMMY_NOTARY.name, setOf(ServiceInfo(SimpleNotaryService.type)))
startNode(DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
).getOrThrow()
val (nodeARpc, nodeBRpc) = listOf(nodeA, nodeB).map {

View File

@ -145,9 +145,9 @@ fun getFreeLocalPorts(hostName: String, numberToAlloc: Int): List<HostAndPort> {
*/
inline fun <reified P : FlowLogic<*>> AbstractNode.initiateSingleShotFlow(
clientFlowClass: KClass<out FlowLogic<*>>,
noinline flowFactory: (Party) -> P): ListenableFuture<P> {
noinline serviceFlowFactory: (Party) -> P): ListenableFuture<P> {
val future = smm.changes.filter { it is StateMachineManager.Change.Add && it.logic is P }.map { it.logic as P }.toFuture()
services.registerServiceFlow(clientFlowClass.java, flowFactory)
services.registerServiceFlow(clientFlowClass.java, serviceFlowFactory)
return future
}

View File

@ -272,12 +272,20 @@ class InMemoryMessagingNetwork(
}
@CordaSerializable
private data class InMemoryMessage(override val topicSession: TopicSession, override val data: ByteArray, override val uniqueMessageId: UUID, override val debugTimestamp: Instant = Instant.now()) : Message {
private data class InMemoryMessage(override val topicSession: TopicSession,
override val data: ByteArray,
override val uniqueMessageId: UUID,
override val debugTimestamp: Instant = Instant.now()) : Message {
override fun toString() = "$topicSession#${String(data)}"
}
@CordaSerializable
private data class InMemoryReceivedMessage(override val topicSession: TopicSession, override val data: ByteArray, override val uniqueMessageId: UUID, override val debugTimestamp: Instant, override val peer: X500Name) : ReceivedMessage
private data class InMemoryReceivedMessage(override val topicSession: TopicSession,
override val data: ByteArray,
override val platformVersion: Int,
override val uniqueMessageId: UUID,
override val debugTimestamp: Instant,
override val peer: X500Name) : ReceivedMessage
/**
* An [InMemoryMessaging] provides a [MessagingService] that isn't backed by any kind of network or disk storage
@ -453,6 +461,9 @@ class InMemoryMessagingNetwork(
private fun MessageTransfer.toReceivedMessage(): ReceivedMessage = InMemoryReceivedMessage(
message.topicSession,
message.data.copyOf(), // Kryo messes with the buffer so give each client a unique copy
message.uniqueMessageId, message.debugTimestamp, X509Utilities.getDevX509Name(sender.description))
1,
message.uniqueMessageId,
message.debugTimestamp,
X509Utilities.getDevX509Name(sender.description))
}
}

View File

@ -1,5 +1,6 @@
package net.corda.testing.node
import com.google.common.annotations.VisibleForTesting
import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs
import com.google.common.util.concurrent.Futures
@ -7,6 +8,7 @@ import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.*
import net.corda.core.crypto.Party
import net.corda.core.crypto.entropyToKeyPair
import net.corda.core.flows.FlowLogic
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.CordaPluginRegistry
@ -16,11 +18,13 @@ import net.corda.core.node.services.*
import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.ServiceFlowInfo
import net.corda.node.services.api.MessagingServiceInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.keys.E2ETestKeyManagementService
import net.corda.node.services.network.InMemoryNetworkMapService
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.statemachine.flowVersion
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.services.transactions.InMemoryUniquenessProvider
import net.corda.node.services.transactions.SimpleNotaryService
@ -38,6 +42,7 @@ import java.security.KeyPair
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.reflect.KClass
/**
* A mock node brings up a suite of in-memory services in a fast manner suitable for unit testing.
@ -224,6 +229,13 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
// It is used from the network visualiser tool.
@Suppress("unused") val place: PhysicalLocation get() = findMyLocation()!!
@VisibleForTesting
fun registerServiceFlow(clientFlowClass: KClass<out FlowLogic<*>>,
flowVersion: Int = clientFlowClass.java.flowVersion,
serviceFlowFactory: (Party) -> FlowLogic<*>) {
serviceFlowFactories[clientFlowClass.java] = ServiceFlowInfo.CorDapp(flowVersion, serviceFlowFactory)
}
fun pumpReceive(block: Boolean = false): InMemoryMessagingNetwork.MessageTransfer? {
return (net as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(block)
}

View File

@ -60,21 +60,24 @@ abstract class NodeBasedTest {
* will automatically be started with the default parameters.
*/
fun startNetworkMapNode(legalName: String = DUMMY_MAP.name,
platformVersion: Int = 1,
advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): Node {
check(_networkMapNode == null)
return startNodeInternal(legalName, advertisedServices, rpcUsers, configOverrides).apply {
return startNodeInternal(legalName, platformVersion, advertisedServices, rpcUsers, configOverrides).apply {
_networkMapNode = this
}
}
fun startNode(legalName: String,
platformVersion: Int = 1,
advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): ListenableFuture<Node> {
val node = startNodeInternal(
legalName,
platformVersion,
advertisedServices,
rpcUsers,
mapOf(
@ -118,6 +121,7 @@ abstract class NodeBasedTest {
}
private fun startNodeInternal(legalName: String,
platformVersion: Int,
advertisedServices: Set<ServiceInfo>,
rpcUsers: List<User>,
configOverrides: Map<String, Any>): Node {
@ -141,7 +145,7 @@ abstract class NodeBasedTest {
) + configOverrides
)
val node = config.parseAs<FullNodeConfiguration>().createNode(MOCK_VERSION_INFO)
val node = config.parseAs<FullNodeConfiguration>().createNode(MOCK_VERSION_INFO.copy(platformVersion = platformVersion))
node.start()
nodes += node
thread(name = legalName) {