From cfe5786d2d0c8ed4956de28b9f3d1f0f5da5d96b Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Thu, 27 Apr 2017 12:29:41 +0100 Subject: [PATCH] Introducing versioning of flows using the FlowVersion annotation. Core flows, which are baked into the platform, are also versioned using the platform version of the node. Several core flows, such as the data vending ones, which were provided via plugins are now instead baked into the node. --- .../net/corda/core/flows/FlowStateMachine.kt | 2 +- .../net/corda/core/flows/FlowVersion.kt | 18 +++ .../net/corda/core/messaging/Messaging.kt | 2 + .../corda/flows/BroadcastTransactionFlow.kt | 3 +- .../net/corda/flows/NotaryChangeFlow.kt | 4 +- .../kotlin/net/corda/core/flows/TxKeyFlow.kt | 4 - .../core/flows/TxKeyFlowUtilitiesTests.kt | 2 +- docs/source/release-notes.rst | 5 + docs/source/versioning.rst | 15 +++ .../statemachine/FlowVersioningTest.kt | 36 +++++ .../net/corda/node/internal/AbstractNode.kt | 87 +++++++----- .../node/services/NotaryChangeService.kt | 23 ---- .../node/services/api/ServiceHubInternal.kt | 13 +- .../services/messaging/NodeMessagingClient.kt | 28 ++-- .../persistence/DataVendingService.kt | 126 ++++++++---------- .../statemachine/FlowStateMachineImpl.kt | 8 +- .../services/statemachine/SessionMessage.kt | 1 + .../statemachine/StateMachineManager.kt | 42 +++--- .../BFTNonValidatingNotaryService.kt | 18 +-- .../services/transactions/NotaryService.kt | 27 +--- .../RaftNonValidatingNotaryService.kt | 11 +- .../RaftValidatingNotaryService.kt | 11 +- .../transactions/SimpleNotaryService.kt | 12 +- .../transactions/ValidatingNotaryService.kt | 11 +- .../net.corda.core.node.CordaPluginRegistry | 3 - .../node/services/MockServiceHubInternal.kt | 12 +- .../persistence/DataVendingServiceTests.kt | 3 +- .../statemachine/StateMachineManagerTests.kt | 58 +++++--- .../net/corda/traderdemo/TraderDemoTest.kt | 2 +- .../kotlin/net/corda/testing/CoreTestUtils.kt | 4 +- .../testing/node/InMemoryMessagingNetwork.kt | 17 ++- .../kotlin/net/corda/testing/node/MockNode.kt | 12 ++ .../net/corda/testing/node/NodeBasedTest.kt | 8 +- 33 files changed, 363 insertions(+), 265 deletions(-) create mode 100644 core/src/main/kotlin/net/corda/core/flows/FlowVersion.kt create mode 100644 node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt delete mode 100644 node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt delete mode 100644 node/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt index b23a79c125..171442be2c 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt @@ -26,7 +26,7 @@ sealed class FlowInitiator { /** Started when we get new session initiation request. */ data class Peer(val party: Party) : FlowInitiator() /** Started as scheduled activity. */ - class Scheduled(val scheduledState: ScheduledStateRef) : FlowInitiator() + data class Scheduled(val scheduledState: ScheduledStateRef) : FlowInitiator() object Shell : FlowInitiator() // TODO When proper ssh access enabled, add username/use RPC? } diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowVersion.kt b/core/src/main/kotlin/net/corda/core/flows/FlowVersion.kt new file mode 100644 index 0000000000..a953aa01eb --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/flows/FlowVersion.kt @@ -0,0 +1,18 @@ +package net.corda.core.flows + +/** + * Annotation for initiating [FlowLogic]s to specify the version of their flow protocol. The version is a single integer + * [value] which increments by one whenever a release is made where the flow protocol changes in any manner which is + * backwards incompatible. This may be a change in the sequence of sends and receives between the client and service flows, + * or it could be a change in the meaning. The version is used when a flow first initiates communication with a party to + * inform them what version they are using. For this reason the annotation is not applicable for the initiated flow. + * + * This flow version integer is not the same as Corda's platform version, though it follows a similar semantic. + * + * Note: Only one version of the same flow can currently be loaded at the same time. Any session request by a client flow for + * a different version will be rejected. + * + * Defaults to a flow version of 1 if not specified. + */ +// TODO Add support for multiple versions once CorDapps are loaded in separate class loaders +annotation class FlowVersion(val value: Int) diff --git a/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt b/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt index 2e0224a6ad..ea948ca162 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt @@ -190,6 +190,8 @@ interface Message { interface ReceivedMessage : Message { /** The authenticated sender. */ val peer: X500Name + /** Platform version of the sender's node. */ + val platformVersion: Int } /** A singleton that's useful for validating topic strings */ diff --git a/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt b/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt index bc2528bfb3..c5b3db232b 100644 --- a/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt @@ -6,7 +6,6 @@ import net.corda.core.flows.FlowLogic import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction - /** * Notify the specified parties about a transaction. The remote peers will download this transaction and its * dependency graph, verifying them all. The flow returns when all peers have acknowledged the transactions @@ -26,7 +25,7 @@ class BroadcastTransactionFlow(val notarisedTransaction: SignedTransaction, // TODO: Messaging layer should handle this broadcast for us val msg = NotifyTxRequest(notarisedTransaction) participants.filter { it != serviceHub.myInfo.legalIdentity }.forEach { participant -> - // This pops out the other side in DataVending.NotifyTransactionHandler. + // This pops out the other side in NotifyTransactionHandler send(participant, msg) } } diff --git a/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt b/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt index af2fcde85d..c42e0038f8 100644 --- a/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt @@ -88,9 +88,7 @@ object NotaryChangeFlow : AbstractStateReplacementFlow() { } - class Acceptor(otherSide: Party, - override val progressTracker: ProgressTracker = tracker()) : AbstractStateReplacementFlow.Acceptor(otherSide) { - + class Acceptor(otherSide: Party) : AbstractStateReplacementFlow.Acceptor(otherSide) { /** * Check the notary change proposal. * diff --git a/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt b/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt index c2b21333d3..fe45b2d27e 100644 --- a/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt +++ b/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt @@ -2,7 +2,6 @@ package net.corda.core.flows import co.paralleluniverse.fibers.Suspendable import net.corda.core.crypto.Party -import net.corda.core.node.PluginServiceHub import net.corda.core.utilities.ProgressTracker import net.corda.flows.TxKeyFlowUtilities import java.security.PublicKey @@ -14,9 +13,6 @@ import java.security.cert.Certificate * DoS of the node, as key generation/storage is vastly more expensive than submitting a request. */ object TxKeyFlow { - fun registerServiceFlow(services: PluginServiceHub) { - services.registerServiceFlow(Requester::class.java, ::Provider) - } class Requester(val otherSide: Party, override val progressTracker: ProgressTracker) : FlowLogic>() { diff --git a/core/src/test/kotlin/net/corda/core/flows/TxKeyFlowUtilitiesTests.kt b/core/src/test/kotlin/net/corda/core/flows/TxKeyFlowUtilitiesTests.kt index 69d5242e6c..40595a0b55 100644 --- a/core/src/test/kotlin/net/corda/core/flows/TxKeyFlowUtilitiesTests.kt +++ b/core/src/test/kotlin/net/corda/core/flows/TxKeyFlowUtilitiesTests.kt @@ -32,7 +32,7 @@ class TxKeyFlowUtilitiesTests { val bobKey: Party = bobNode.services.myInfo.legalIdentity // Run the flows - TxKeyFlow.registerServiceFlow(bobNode.services) + bobNode.registerServiceFlow(TxKeyFlow.Requester::class) { TxKeyFlow.Provider(it) } val requesterFlow = aliceNode.services.startFlow(TxKeyFlow.Requester(bobKey)) // Get the results diff --git a/docs/source/release-notes.rst b/docs/source/release-notes.rst index 5f44a37109..c59339e880 100644 --- a/docs/source/release-notes.rst +++ b/docs/source/release-notes.rst @@ -34,6 +34,11 @@ serialisation, etc. The node exposes the platform version it's on and we envisio run on older versions of the platform to the one they were compiled against. Platform version borrows heavily from Android's API Level. +Flows can now be versioned using the ``FlowVersion`` annotation, which assigns an integer version number to it. For now +this enables a node to restrict usage of a flow to a specific version. Support for multiple verisons of the same flow, +hence achieving backwards compatibility, will be possible once we start loading CorDapps in separate class loaders. Watch +this space... + Milestone 10 ------------ diff --git a/docs/source/versioning.rst b/docs/source/versioning.rst index ff502c0d97..80562490ab 100644 --- a/docs/source/versioning.rst +++ b/docs/source/versioning.rst @@ -27,3 +27,18 @@ for the network. .. note:: A future release may introduce the concept of a target platform version, which would be similar to Android's ``targetSdkVersion``, and would provide a means of maintaining behavioural compatibility for the cases where the platform's behaviour has changed. + +Flow versioning +--------------- + +A platform which can be extended with CorDapps also requires the ability to version these apps as they evolve from +release to release. This allows users of these apps, whether they're other nodes or RPC users, to select which version +they wish to use and enables nodes to control which app versions they support. Flows have their own version numbers, +independent of other versioning, for example of the platform. In particular it is the initiating flow that can be versioned +using the ``FlowVersion`` annotation. This assigns an integer version number, similar in concept to the platform version, +which is used in the session handshake process when a flow communicates with another party for the first time. The other +party will only accept the session request if it, firstly, has that flow loaded, and secondly, for the same version (see +:doc:`flow-state-machine`). + +.. note:: Currently we don't support multiple versions of the same flow loaded in the same node. This will be possible + once we start loading CorDapps in separate class loaders. diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt new file mode 100644 index 0000000000..d9f5cbbcf9 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt @@ -0,0 +1,36 @@ +package net.corda.node.services.statemachine + +import co.paralleluniverse.fibers.Suspendable +import com.google.common.util.concurrent.Futures +import net.corda.core.crypto.Party +import net.corda.core.flows.FlowLogic +import net.corda.core.getOrThrow +import net.corda.core.utilities.unwrap +import net.corda.testing.node.NodeBasedTest +import org.assertj.core.api.Assertions.assertThat +import org.junit.Test + +class FlowVersioningTest : NodeBasedTest() { + @Test + fun `core flows receive platform version of initiator`() { + val (alice, bob) = Futures.allAsList( + startNode("Alice", platformVersion = 2), + startNode("Bob", platformVersion = 3)).getOrThrow() + bob.installCoreFlow(ClientFlow::class, ::SendBackPlatformVersionFlow) + val resultFuture = alice.services.startFlow(ClientFlow(bob.info.legalIdentity)).resultFuture + assertThat(resultFuture.getOrThrow()).isEqualTo(2) + } + + private open class ClientFlow(val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call(): Any { + return sendAndReceive(otherParty, "This is ignored. We only send to kick off the flow on the other side").unwrap { it } + } + } + + private open class SendBackPlatformVersionFlow(val otherParty: Party, val otherPartysPlatformVersion: Any) : FlowLogic() { + @Suspendable + override fun call() = send(otherParty, otherPartysPlatformVersion) + } + +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index d51081a738..ab08631366 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -10,10 +10,7 @@ import net.corda.core.contracts.Amount import net.corda.core.contracts.PartyAndReference import net.corda.core.crypto.Party import net.corda.core.crypto.X509Utilities -import net.corda.core.flows.FlowInitiator -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.FlowLogicRefFactory -import net.corda.core.flows.FlowStateMachine +import net.corda.core.flows.* import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.RPCOps import net.corda.core.messaging.SingleMessageRecipient @@ -25,6 +22,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction +import net.corda.core.utilities.debug import net.corda.flows.* import net.corda.node.services.api.* import net.corda.node.services.config.FullNodeConfiguration @@ -43,6 +41,7 @@ import net.corda.node.services.persistence.* import net.corda.node.services.schema.HibernateObserver import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.statemachine.StateMachineManager +import net.corda.node.services.statemachine.flowVersion import net.corda.node.services.transactions.* import net.corda.node.services.vault.CashBalanceAsMetricsObserver import net.corda.node.services.vault.NodeVaultService @@ -63,7 +62,9 @@ import java.time.Clock import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService -import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeUnit.SECONDS +import kotlin.collections.ArrayList +import kotlin.reflect.KClass import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair /** @@ -107,7 +108,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, // low-performance prototyping period. protected abstract val serverThread: AffinityExecutor - private val serviceFlowFactories = ConcurrentHashMap, (Party) -> FlowLogic<*>>() + protected val serviceFlowFactories = ConcurrentHashMap, ServiceFlowInfo>() protected val partyKeys = mutableSetOf() val services = object : ServiceHubInternal() { @@ -118,7 +119,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, override val keyManagementService: KeyManagementService get() = keyManagement override val identityService: IdentityService get() = identity override val schedulerService: SchedulerService get() = scheduler - override val clock: Clock = platformClock + override val clock: Clock get() = platformClock override val myInfo: NodeInfo get() = info override val schemaService: SchemaService get() = schemas override val transactionVerifierService: TransactionVerifierService get() = txVerifierService @@ -133,11 +134,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, override fun registerServiceFlow(clientFlowClass: Class>, serviceFlowFactory: (Party) -> FlowLogic<*>) { require(clientFlowClass !in serviceFlowFactories) { "${clientFlowClass.name} has already been used to register a service flow" } - log.info("Registering service flow for ${clientFlowClass.name}") - serviceFlowFactories[clientFlowClass] = serviceFlowFactory + val version = clientFlowClass.flowVersion + val info = ServiceFlowInfo.CorDapp(version, serviceFlowFactory) + log.info("Registering service flow for ${clientFlowClass.name}: $info") + serviceFlowFactories[clientFlowClass] = info } - override fun getServiceFlowFactory(clientFlowClass: Class>): ((Party) -> FlowLogic<*>)? { + override fun getServiceFlowFactory(clientFlowClass: Class>): ServiceFlowInfo? { return serviceFlowFactories[clientFlowClass] } @@ -157,7 +160,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, lateinit var vault: VaultService lateinit var keyManagement: KeyManagementService var inNodeNetworkMapService: NetworkMapService? = null - var inNodeNotaryService: NotaryService? = null lateinit var txVerifierService: TransactionVerifierService lateinit var identity: IdentityService lateinit var net: MessagingServiceInternal @@ -224,7 +226,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, // We wait here, even though any in-flight messages should have been drained away because the // server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is // arbitrary and might be inappropriate. - MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, TimeUnit.SECONDS) + MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS) } } @@ -235,7 +237,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, false } startMessagingService(rpcOps) - services.registerServiceFlow(ContractUpgradeFlow.Instigator::class.java) { ContractUpgradeFlow.Acceptor(it) } + installCoreFlows() runOnStop += Runnable { net.stop() } _networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured()) smm.start() @@ -247,6 +249,29 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, return this } + /** + * @suppress + * Installs a flow that's core to the Corda platform. Unlike CorDapp flows which are versioned individually using + * [FlowVersion], core flows have the same version as the node's platform version. To cater for backwards compatibility + * [serviceFlowFactory] provides a second parameter which is the platform version of the initiating party. + */ + @VisibleForTesting + fun installCoreFlow(clientFlowClass: KClass>, serviceFlowFactory: (Party, Int) -> FlowLogic<*>) { + require(!clientFlowClass.java.isAnnotationPresent(FlowVersion::class.java)) { + "${FlowVersion::class.java.name} not applicable for core flows; their version is the node's platform version" + } + serviceFlowFactories[clientFlowClass.java] = ServiceFlowInfo.Core(serviceFlowFactory) + log.debug { "Installed core flow ${clientFlowClass.java.name}" } + } + + private fun installCoreFlows() { + installCoreFlow(FetchTransactionsFlow::class) { otherParty, _ -> FetchTransactionsHandler(otherParty) } + installCoreFlow(FetchAttachmentsFlow::class) { otherParty, _ -> FetchAttachmentsHandler(otherParty) } + installCoreFlow(BroadcastTransactionFlow::class) { otherParty, _ -> NotifyTransactionHandler(otherParty) } + installCoreFlow(NotaryChangeFlow.Instigator::class) { otherParty, _ -> NotaryChangeFlow.Acceptor(otherParty) } + installCoreFlow(ContractUpgradeFlow.Instigator::class) { otherParty, _ -> ContractUpgradeFlow.Acceptor(otherParty) } + } + /** * Builds node internal, advertised, and plugin services. * Returns a list of tokenizable services to be added to the serialisation context. @@ -369,14 +394,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, } private fun makePluginServices(tokenizableServices: MutableList): List { - val pluginServices = pluginRegistries.flatMap { x -> x.servicePlugins } - val serviceList = mutableListOf() - for (serviceConstructor in pluginServices) { - val service = serviceConstructor.apply(services) - serviceList.add(service) - tokenizableServices.add(service) - } - return serviceList + val pluginServices = pluginRegistries.flatMap { it.servicePlugins }.map { it.apply(services) } + tokenizableServices.addAll(pluginServices) + return pluginServices } /** @@ -393,13 +413,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val notaryServiceType = serviceTypes.singleOrNull { it.isNotary() } if (notaryServiceType != null) { - inNodeNotaryService = makeNotaryService(notaryServiceType, tokenizableServices) + makeNotaryService(notaryServiceType, tokenizableServices) } } private fun registerWithNetworkMapIfConfigured(): ListenableFuture { services.networkMapCache.addNode(info) - // In the unit test environment, we may run without any network map service sometimes. + // In the unit test environment, we may sometimes run without any network map service return if (networkMapAddress == null && inNodeNetworkMapService == null) { services.networkMapCache.runWithoutMapService() noNetworkMapConfigured() // TODO This method isn't needed as runWithoutMapService sets the Future in the cache @@ -448,26 +468,28 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, inNodeNetworkMapService = PersistentNetworkMapService(services, configuration.minimumPlatformVersion) } - open protected fun makeNotaryService(type: ServiceType, tokenizableServices: MutableList): NotaryService { + open protected fun makeNotaryService(type: ServiceType, tokenizableServices: MutableList) { val timestampChecker = TimestampChecker(platformClock, 30.seconds) val uniquenessProvider = makeUniquenessProvider(type) tokenizableServices.add(uniquenessProvider) - return when (type) { - SimpleNotaryService.type -> SimpleNotaryService(services, timestampChecker, uniquenessProvider) - ValidatingNotaryService.type -> ValidatingNotaryService(services, timestampChecker, uniquenessProvider) - RaftNonValidatingNotaryService.type -> RaftNonValidatingNotaryService(services, timestampChecker, uniquenessProvider as RaftUniquenessProvider) - RaftValidatingNotaryService.type -> RaftValidatingNotaryService(services, timestampChecker, uniquenessProvider as RaftUniquenessProvider) + val notaryService = when (type) { + SimpleNotaryService.type -> SimpleNotaryService(timestampChecker, uniquenessProvider) + ValidatingNotaryService.type -> ValidatingNotaryService(timestampChecker, uniquenessProvider) + RaftNonValidatingNotaryService.type -> RaftNonValidatingNotaryService(timestampChecker, uniquenessProvider as RaftUniquenessProvider) + RaftValidatingNotaryService.type -> RaftValidatingNotaryService(timestampChecker, uniquenessProvider as RaftUniquenessProvider) BFTNonValidatingNotaryService.type -> with(configuration as FullNodeConfiguration) { val nodeId = notaryNodeId ?: throw IllegalArgumentException("notaryNodeId value must be specified in the configuration") val client = BFTSMaRt.Client(nodeId) - tokenizableServices.add(client) + tokenizableServices += client BFTNonValidatingNotaryService(services, timestampChecker, nodeId, database, client) } else -> { throw IllegalArgumentException("Notary type ${type.id} is not handled by makeNotaryService.") } } + + installCoreFlow(NotaryFlow.Client::class, notaryService.serviceFlowFactory) } protected abstract fun makeUniquenessProvider(type: ServiceType): UniquenessProvider @@ -579,3 +601,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, configuration.baseDirectory.createDirectories() } } + +sealed class ServiceFlowInfo { + data class Core(val factory: (Party, Int) -> FlowLogic<*>) : ServiceFlowInfo() + data class CorDapp(val version: Int, val factory: (Party) -> FlowLogic<*>) : ServiceFlowInfo() +} diff --git a/node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt b/node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt deleted file mode 100644 index 4115ff6dfd..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt +++ /dev/null @@ -1,23 +0,0 @@ -package net.corda.node.services - -import net.corda.core.node.CordaPluginRegistry -import net.corda.core.node.PluginServiceHub -import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.flows.NotaryChangeFlow -import java.util.function.Function - -object NotaryChange { - class Plugin : CordaPluginRegistry() { - override val servicePlugins = listOf(Function(::Service)) - } - - /** - * A service that monitors the network for requests for changing the notary of a state, - * and immediately runs the [NotaryChangeFlow] if the auto-accept criteria are met. - */ - class Service(services: PluginServiceHub) : SingletonSerializeAsToken() { - init { - services.registerServiceFlow(NotaryChangeFlow.Instigator::class.java) { NotaryChangeFlow.Acceptor(it) } - } - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index 56c5c7247e..acff817c4e 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -2,7 +2,6 @@ package net.corda.node.services.api import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.ListenableFuture -import net.corda.core.crypto.Party import net.corda.core.flows.FlowInitiator import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogicRefFactory @@ -11,8 +10,9 @@ import net.corda.core.messaging.MessagingService import net.corda.core.node.PluginServiceHub import net.corda.core.node.services.TxWritableStorageService import net.corda.core.transactions.SignedTransaction +import net.corda.core.utilities.loggerFor +import net.corda.node.internal.ServiceFlowInfo import net.corda.node.services.statemachine.FlowStateMachineImpl -import org.slf4j.LoggerFactory interface MessagingServiceInternal : MessagingService { /** @@ -37,9 +37,12 @@ interface MessagingServiceBuilder { fun start(): ListenableFuture } -private val log = LoggerFactory.getLogger(ServiceHubInternal::class.java) abstract class ServiceHubInternal : PluginServiceHub { + companion object { + private val log = loggerFor() + } + abstract val monitoringService: MonitoringService abstract val flowLogicRefFactory: FlowLogicRefFactory abstract val schemaService: SchemaService @@ -99,5 +102,5 @@ abstract class ServiceHubInternal : PluginServiceHub { return startFlow(logic, flowInitiator) } - abstract fun getServiceFlowFactory(clientFlowClass: Class>): ((Party) -> FlowLogic<*>)? -} + abstract fun getServiceFlowFactory(clientFlowClass: Class>): ServiceFlowInfo? +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index 514e13f451..e4683ca31b 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -38,13 +38,13 @@ import org.bouncycastle.asn1.x500.X500Name import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.statements.InsertStatement +import java.security.PublicKey import java.time.Instant import java.util.* import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import javax.annotation.concurrent.ThreadSafe -import java.security.PublicKey // TODO: Stop the wallet explorer and other clients from using this class and get rid of persistentInbox @@ -296,23 +296,13 @@ class NodeMessagingClient(override val config: NodeConfiguration, try { val topic = message.required(topicProperty) { getStringProperty(it) } val sessionID = message.required(sessionIdProperty) { getLongProperty(it) } + val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" } + val platformVersion = message.required(platformVersionProperty) { getIntProperty(it) } // Use the magic deduplication property built into Artemis as our message identity too val uuid = message.required(HDR_DUPLICATE_DETECTION_ID) { UUID.fromString(message.getStringProperty(it)) } - val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" } log.trace { "Received message from: ${message.address} user: $user topic: $topic sessionID: $sessionID uuid: $uuid" } - val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } - - val msg = object : ReceivedMessage { - override val topicSession = TopicSession(topic, sessionID) - override val data: ByteArray = body - override val peer: X500Name = X500Name(user) - override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp) - override val uniqueMessageId: UUID = uuid - override fun toString() = "$topic#${data.opaque()}" - } - - return msg + return ArtemisReceivedMessage(TopicSession(topic, sessionID), X500Name(user), platformVersion, uuid, message) } catch (e: Exception) { log.error("Unable to process message, ignoring it: $message", e) return null @@ -324,6 +314,16 @@ class NodeMessagingClient(override val config: NodeConfiguration, return extractor(key) } + private class ArtemisReceivedMessage(override val topicSession: TopicSession, + override val peer: X500Name, + override val platformVersion: Int, + override val uniqueMessageId: UUID, + private val message: ClientMessage) : ReceivedMessage { + override val data: ByteArray by lazy { ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } } + override val debugTimestamp: Instant get() = Instant.ofEpochMilli(message.timestamp) + override fun toString() = "${topicSession.topic}#${data.opaque()}" + } + private fun deliver(msg: ReceivedMessage): Boolean { state.checkNotLocked() // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt index e4f8c77ed2..42fb222fb9 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt @@ -5,83 +5,61 @@ import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic -import net.corda.core.node.CordaPluginRegistry -import net.corda.core.node.PluginServiceHub -import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.unwrap import net.corda.flows.* -import java.util.function.Function -import javax.annotation.concurrent.ThreadSafe -object DataVending { - class Plugin : CordaPluginRegistry() { - override val servicePlugins = listOf(Function(::Service)) +/** + * This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple + * glue that sits between the network layer and the database layer. + * + * Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There + * are no access control lists. If you want to keep some data private, then you must be careful who you give its name + * to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have + * its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as + * such the hash of a piece of data can be seen as a type of password allowing access to it. + * + * Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null. + */ +class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler(otherParty) { + override fun getData(id: SecureHash): SignedTransaction? { + return serviceHub.storageService.validatedTransactions.getTransaction(id) + } +} + +// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer. +class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler(otherParty) { + override fun getData(id: SecureHash): ByteArray? { + return serviceHub.storageService.attachments.openAttachment(id)?.open()?.readBytes() + } +} + +abstract class FetchDataHandler(val otherParty: Party) : FlowLogic() { + @Suspendable + @Throws(FetchDataFlow.HashNotFound::class) + override fun call() { + val request = receive(otherParty).unwrap { + if (it.hashes.isEmpty()) throw FlowException("Empty hash list") + it + } + val response = request.hashes.map { + getData(it) ?: throw FetchDataFlow.HashNotFound(it) + } + send(otherParty, response) + } + + protected abstract fun getData(id: SecureHash): T? +} + +// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction +// includes us in any outside that list. Potentially just if it includes any outside that list at all. +// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming +// cash without from unknown parties? +class NotifyTransactionHandler(val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call() { + val request = receive(otherParty).unwrap { it } + subFlow(ResolveTransactionsFlow(request.tx, otherParty), shareParentSessions = true) + serviceHub.recordTransactions(request.tx) } - - /** - * This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple - * glue that sits between the network layer and the database layer. - * - * Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There - * are no access control lists. If you want to keep some data private, then you must be careful who you give its name - * to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have - * its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as - * such the hash of a piece of data can be seen as a type of password allowing access to it. - * - * Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null. - */ - @ThreadSafe - class Service(services: PluginServiceHub) : SingletonSerializeAsToken() { - init { - services.registerServiceFlow(FetchTransactionsFlow::class.java, ::FetchTransactionsHandler) - services.registerServiceFlow(FetchAttachmentsFlow::class.java, ::FetchAttachmentsHandler) - services.registerServiceFlow(BroadcastTransactionFlow::class.java, ::NotifyTransactionHandler) - } - - private class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler(otherParty) { - override fun getData(id: SecureHash): SignedTransaction? { - return serviceHub.storageService.validatedTransactions.getTransaction(id) - } - } - - // TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer. - private class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler(otherParty) { - override fun getData(id: SecureHash): ByteArray? { - return serviceHub.storageService.attachments.openAttachment(id)?.open()?.readBytes() - } - } - - private abstract class FetchDataHandler(val otherParty: Party) : FlowLogic() { - @Suspendable - @Throws(FetchDataFlow.HashNotFound::class) - override fun call() { - val request = receive(otherParty).unwrap { - if (it.hashes.isEmpty()) throw FlowException("Empty hash list") - it - } - val response = request.hashes.map { - getData(it) ?: throw FetchDataFlow.HashNotFound(it) - } - send(otherParty, response) - } - - protected abstract fun getData(id: SecureHash): T? - } - - - // TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction - // includes us in any outside that list. Potentially just if it includes any outside that list at all. - // TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming - // cash without from unknown parties? - class NotifyTransactionHandler(val otherParty: Party) : FlowLogic() { - @Suspendable - override fun call() { - val request = receive(otherParty).unwrap { it } - subFlow(ResolveTransactionsFlow(request.tx, otherParty), shareParentSessions = true) - serviceHub.recordTransactions(request.tx) - } - } - } - } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index de026c0d18..081e239e77 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -298,7 +298,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, openSessions[Pair(sessionFlow, otherParty)] = session // We get the top-most concrete class object to cater for the case where the client flow is customised via a sub-class val clientFlowClass = sessionFlow.topConcreteFlowClass - val sessionInit = SessionInit(session.ourSessionId, clientFlowClass, firstPayload) + val sessionInit = SessionInit(session.ourSessionId, clientFlowClass, clientFlowClass.flowVersion, firstPayload) sendInternal(session, sessionInit) if (waitForConfirmation) { session.waitForConfirmation() @@ -434,6 +434,12 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } } +val Class>.flowVersion: Int get() { + val flowVersion = getDeclaredAnnotation(FlowVersion::class.java) ?: return 1 + require(flowVersion.value > 0) { "Flow versions have to be greater or equal to 1" } + return flowVersion.value +} + // I would prefer for [FlowProgressHandleImpl] to extend [FlowHandleImpl], // but Kotlin doesn't allow this for data classes, not even to create // another data class! diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt index d84dab7988..faeb131298 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt @@ -15,6 +15,7 @@ interface SessionMessage data class SessionInit(val initiatorSessionId: Long, val clientFlowClass: Class>, + val flowVerison: Int, val firstPayload: Any?) : SessionMessage interface ExistingSessionMessage : SessionMessage { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index c62317c66b..3baa199eed 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -13,9 +13,7 @@ import com.esotericsoftware.kryo.pool.KryoPool import com.google.common.collect.HashMultimap import com.google.common.util.concurrent.ListenableFuture import io.requery.util.CloseableIterator -import net.corda.core.ErrorOr -import net.corda.core.ThreadBox -import net.corda.core.bufferUntilSubscribed +import net.corda.core.* import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.crypto.commonName @@ -23,12 +21,11 @@ import net.corda.core.flows.* import net.corda.core.messaging.ReceivedMessage import net.corda.core.messaging.TopicSession import net.corda.core.messaging.send -import net.corda.core.random63BitValue import net.corda.core.serialization.* -import net.corda.core.then import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace +import net.corda.node.internal.ServiceFlowInfo import net.corda.node.services.api.Checkpoint import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal @@ -151,7 +148,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, private val recentlyClosedSessions = ConcurrentHashMap() // Context for tokenized services in checkpoints - private val serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryo(), serviceHub) + private val serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryoPool, serviceHub) /** Returns a list of all state machines executing the given flow logic at the top level (subflows do not count) */ fun

, T> findStateMachines(flowClass: Class

): List>> { @@ -289,7 +286,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, if (sender != null) { when (sessionMessage) { is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, sender) - is SessionInit -> onSessionInit(sessionMessage, sender) + is SessionInit -> onSessionInit(sessionMessage, message, sender) } } else { logger.error("Unknown peer ${message.peer} in $sessionMessage") @@ -335,21 +332,38 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, waitingForResponse is WaitForLedgerCommit && message is ErrorSessionEnd } - private fun onSessionInit(sessionInit: SessionInit, sender: Party) { + private fun onSessionInit(sessionInit: SessionInit, receivedMessage: ReceivedMessage, sender: Party) { logger.trace { "Received $sessionInit from $sender" } val otherPartySessionId = sessionInit.initiatorSessionId fun sendSessionReject(message: String) = sendSessionMessage(sender, SessionReject(otherPartySessionId, message)) - val flowFactory = serviceHub.getServiceFlowFactory(sessionInit.clientFlowClass) - if (flowFactory == null) { + val serviceFlowInfo = serviceHub.getServiceFlowFactory(sessionInit.clientFlowClass) + if (serviceFlowInfo == null) { logger.warn("${sessionInit.clientFlowClass} has not been registered with a service flow: $sessionInit") sendSessionReject("Don't know ${sessionInit.clientFlowClass.name}") return } val session = try { - val flow = flowFactory(sender) + val flow = when (serviceFlowInfo) { + is ServiceFlowInfo.CorDapp -> { + // TODO Add support for multiple versions of the same flow when CorDapps are loaded in separate class loaders + if (sessionInit.flowVerison != serviceFlowInfo.version) { + logger.warn("Version mismatch - ${sessionInit.clientFlowClass} is only registered for version " + + "${serviceFlowInfo.version}: $sessionInit") + sendSessionReject("Version not supported") + return + } + serviceFlowInfo.factory(sender) + } + is ServiceFlowInfo.Core -> serviceFlowInfo.factory(sender, receivedMessage.platformVersion) + } + + if (flow.javaClass.isAnnotationPresent(FlowVersion::class.java)) { + logger.warn("${FlowVersion::class.java.name} is not applicable for service flows: ${flow.javaClass.name}") + } + val fiber = createFiber(flow, FlowInitiator.Peer(sender)) val session = FlowSession(flow, random63BitValue(), sender, FlowSessionState.Initiated(sender, otherPartySessionId)) if (sessionInit.firstPayload != null) { @@ -372,7 +386,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } private fun serializeFiber(fiber: FlowStateMachineImpl<*>): SerializedBytes> { - return quasarKryo().run { kryo -> + return quasarKryoPool.run { kryo -> // add the map of tokens -> tokenizedServices to the kyro context kryo.withSerializationContext(serializationContext) { fiber.serialize(kryo) @@ -381,7 +395,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } private fun deserializeFiber(checkpoint: Checkpoint): FlowStateMachineImpl<*> { - return quasarKryo().run { kryo -> + return quasarKryoPool.run { kryo -> // put the map of token -> tokenized into the kryo context kryo.withSerializationContext(serializationContext) { checkpoint.serializedFiber.deserialize(kryo) @@ -389,8 +403,6 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } } - private fun quasarKryo(): KryoPool = quasarKryoPool - private fun createFiber(logic: FlowLogic, flowInitiator: FlowInitiator): FlowStateMachineImpl { val id = StateMachineRunId.createRandom() return FlowStateMachineImpl(id, logic, scheduler, flowInitiator).apply { initFiber(this) } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt index 128370e7b3..4d9d58de8d 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt @@ -25,7 +25,7 @@ class BFTNonValidatingNotaryService(services: ServiceHubInternal, timestampChecker: TimestampChecker, serverId: Int, db: Database, - val client: BFTSMaRt.Client) : NotaryService(services) { + val client: BFTSMaRt.Client) : NotaryService { init { thread(name = "BFTSmartServer-$serverId", isDaemon = true) { Server(serverId, db, "bft_smart_notary_committed_states", services, timestampChecker) @@ -37,9 +37,11 @@ class BFTNonValidatingNotaryService(services: ServiceHubInternal, private val log = loggerFor() } - override fun createFlow(otherParty: Party) = ServiceFlow(otherParty, client) + override val serviceFlowFactory: (Party, Int) -> FlowLogic = { otherParty, _ -> + ServiceFlow(otherParty, client) + } - class ServiceFlow(val otherSide: Party, val client: BFTSMaRt.Client) : FlowLogic() { + private class ServiceFlow(val otherSide: Party, val client: BFTSMaRt.Client) : FlowLogic() { @Suspendable override fun call(): Void? { val stx = receive(otherSide).unwrap { it } @@ -60,11 +62,11 @@ class BFTNonValidatingNotaryService(services: ServiceHubInternal, } } - class Server(id: Int, - db: Database, - tableName: String, - services: ServiceHubInternal, - timestampChecker: TimestampChecker) : BFTSMaRt.Server(id, db, tableName, services, timestampChecker) { + private class Server(id: Int, + db: Database, + tableName: String, + services: ServiceHubInternal, + timestampChecker: TimestampChecker) : BFTSMaRt.Server(id, db, tableName, services, timestampChecker) { override fun executeCommand(command: ByteArray): ByteArray { val request = command.deserialize() diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt index 621e27737d..941caff33d 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt @@ -2,26 +2,13 @@ package net.corda.node.services.transactions import net.corda.core.crypto.Party import net.corda.core.flows.FlowLogic -import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.flows.NotaryFlow -import net.corda.node.services.api.ServiceHubInternal -/** - * A Notary service acts as the final signer of a transaction ensuring two things: - * - The (optional) timestamp of the transaction is valid. - * - None of the referenced input states have previously been consumed by a transaction signed by this Notary - *O - * A transaction has to be signed by a Notary to be considered valid (except for output-only transactions without a timestamp). - * - * This is the base implementation that can be customised with specific Notary transaction commit flow. - */ -abstract class NotaryService(services: ServiceHubInternal) : SingletonSerializeAsToken() { - - init { - services.registerServiceFlow(NotaryFlow.Client::class.java) { createFlow(it) } - } - - /** Implement a factory that specifies the transaction commit flow for the notary service to use */ - abstract fun createFlow(otherParty: Party): FlowLogic +interface NotaryService { + /** + * Factory for producing notary service flows which have the corresponding sends and receives as NotaryFlow.Client. + * The first parameter is the client [Party] making the request and the second is the platform version of the client's + * node. Use this version parameter to provide backwards compatibility if the notary flow protocol changes. + */ + val serviceFlowFactory: (Party, Int) -> FlowLogic } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt index 0fc78c605a..e61eafc079 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt @@ -1,19 +1,18 @@ package net.corda.node.services.transactions import net.corda.core.crypto.Party +import net.corda.core.flows.FlowLogic import net.corda.core.node.services.TimestampChecker import net.corda.flows.NonValidatingNotaryFlow -import net.corda.node.services.api.ServiceHubInternal /** A non-validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */ -class RaftNonValidatingNotaryService(services: ServiceHubInternal, - val timestampChecker: TimestampChecker, - val uniquenessProvider: RaftUniquenessProvider) : NotaryService(services) { +class RaftNonValidatingNotaryService(val timestampChecker: TimestampChecker, + val uniquenessProvider: RaftUniquenessProvider) : NotaryService { companion object { val type = SimpleNotaryService.type.getSubType("raft") } - override fun createFlow(otherParty: Party): NonValidatingNotaryFlow { - return NonValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider) + override val serviceFlowFactory: (Party, Int) -> FlowLogic = { otherParty, _ -> + NonValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider) } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt index 3ef3e1610f..bb52ee9d5b 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt @@ -1,19 +1,18 @@ package net.corda.node.services.transactions import net.corda.core.crypto.Party +import net.corda.core.flows.FlowLogic import net.corda.core.node.services.TimestampChecker import net.corda.flows.ValidatingNotaryFlow -import net.corda.node.services.api.ServiceHubInternal /** A validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */ -class RaftValidatingNotaryService(services: ServiceHubInternal, - val timestampChecker: TimestampChecker, - val uniquenessProvider: RaftUniquenessProvider) : NotaryService(services) { +class RaftValidatingNotaryService(val timestampChecker: TimestampChecker, + val uniquenessProvider: RaftUniquenessProvider) : NotaryService { companion object { val type = ValidatingNotaryService.type.getSubType("raft") } - override fun createFlow(otherParty: Party): ValidatingNotaryFlow { - return ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider) + override val serviceFlowFactory: (Party, Int) -> FlowLogic = { otherParty, _ -> + ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider) } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt index 062d14d2a5..722a1fed2c 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt @@ -1,22 +1,20 @@ package net.corda.node.services.transactions import net.corda.core.crypto.Party +import net.corda.core.flows.FlowLogic import net.corda.core.node.services.ServiceType import net.corda.core.node.services.TimestampChecker import net.corda.core.node.services.UniquenessProvider import net.corda.flows.NonValidatingNotaryFlow -import net.corda.flows.NotaryFlow -import net.corda.node.services.api.ServiceHubInternal /** A simple Notary service that does not perform transaction validation */ -class SimpleNotaryService(services: ServiceHubInternal, - val timestampChecker: TimestampChecker, - val uniquenessProvider: UniquenessProvider) : NotaryService(services) { +class SimpleNotaryService(val timestampChecker: TimestampChecker, + val uniquenessProvider: UniquenessProvider) : NotaryService { companion object { val type = ServiceType.notary.getSubType("simple") } - override fun createFlow(otherParty: Party): NotaryFlow.Service { - return NonValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider) + override val serviceFlowFactory: (Party, Int) -> FlowLogic = { otherParty, _ -> + NonValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider) } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt index 96d1f1fc72..2b1000983f 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt @@ -1,21 +1,20 @@ package net.corda.node.services.transactions import net.corda.core.crypto.Party +import net.corda.core.flows.FlowLogic import net.corda.core.node.services.ServiceType import net.corda.core.node.services.TimestampChecker import net.corda.core.node.services.UniquenessProvider import net.corda.flows.ValidatingNotaryFlow -import net.corda.node.services.api.ServiceHubInternal /** A Notary service that validates the transaction chain of the submitted transaction before committing it */ -class ValidatingNotaryService(services: ServiceHubInternal, - val timestampChecker: TimestampChecker, - val uniquenessProvider: UniquenessProvider) : NotaryService(services) { +class ValidatingNotaryService(val timestampChecker: TimestampChecker, + val uniquenessProvider: UniquenessProvider) : NotaryService { companion object { val type = ServiceType.notary.getSubType("validating") } - override fun createFlow(otherParty: Party): ValidatingNotaryFlow { - return ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider) + override val serviceFlowFactory: (Party, Int) -> FlowLogic = { otherParty, _ -> + ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider) } } diff --git a/node/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry b/node/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry deleted file mode 100644 index 884cc0cfae..0000000000 --- a/node/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry +++ /dev/null @@ -1,3 +0,0 @@ -# Register a ServiceLoader service extending from net.corda.core.node.CordaPluginRegistry -net.corda.node.services.NotaryChange$Plugin -net.corda.node.services.persistence.DataVending$Plugin \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt index 8fe78212f5..a89f2d05a4 100644 --- a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt +++ b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt @@ -9,12 +9,12 @@ import net.corda.core.flows.FlowStateMachine import net.corda.core.node.NodeInfo import net.corda.core.node.services.* import net.corda.core.transactions.SignedTransaction +import net.corda.node.internal.ServiceFlowInfo import net.corda.node.serialization.NodeClock import net.corda.node.services.api.MessagingServiceInternal import net.corda.node.services.api.MonitoringService import net.corda.node.services.api.SchemaService import net.corda.node.services.api.ServiceHubInternal -import net.corda.node.services.persistence.DataVending import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.transactions.InMemoryTransactionVerifierService @@ -69,14 +69,6 @@ open class MockServiceHubInternal( lateinit var smm: StateMachineManager - init { - if (net != null && storage != null) { - // Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener - // on the networking service, so that will keep it from being collected. - DataVending.Service(this) - } - } - override fun recordTransactions(txs: Iterable) = recordTransactionsInternal(txStorageService, txs) override fun startFlow(logic: FlowLogic, flowInitiator: FlowInitiator): FlowStateMachine { @@ -85,5 +77,5 @@ open class MockServiceHubInternal( override fun registerServiceFlow(clientFlowClass: Class>, serviceFlowFactory: (Party) -> FlowLogic<*>) = Unit - override fun getServiceFlowFactory(clientFlowClass: Class>): ((Party) -> FlowLogic<*>)? = null + override fun getServiceFlowFactory(clientFlowClass: Class>): ServiceFlowInfo? = null } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt index 97f60a1ccb..e87e35d961 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt @@ -12,7 +12,6 @@ import net.corda.core.node.services.unconsumedStates import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.DUMMY_NOTARY import net.corda.flows.BroadcastTransactionFlow.NotifyTxRequest -import net.corda.node.services.persistence.DataVending.Service.NotifyTransactionHandler import net.corda.node.utilities.transaction import net.corda.testing.MEGA_CORP import net.corda.testing.node.MockNetwork @@ -89,7 +88,7 @@ class DataVendingServiceTests { } private fun MockNode.sendNotifyTx(tx: SignedTransaction, walletServiceNode: MockNode) { - walletServiceNode.services.registerServiceFlow(NotifyTxFlow::class.java, ::NotifyTransactionHandler) + walletServiceNode.registerServiceFlow(clientFlowClass = NotifyTxFlow::class, serviceFlowFactory = ::NotifyTransactionHandler) services.startFlow(NotifyTxFlow(walletServiceNode.info.legalIdentity, tx)) network.runNetwork() } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt index 2b903c9b60..aadbf6d472 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt @@ -11,6 +11,7 @@ import net.corda.core.crypto.Party import net.corda.core.crypto.generateKeyPair import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowVersion import net.corda.core.messaging.MessageRecipients import net.corda.core.node.services.PartyInfo import net.corda.core.node.services.ServiceInfo @@ -110,7 +111,7 @@ class StateMachineManagerTests { @Test fun `exception while fiber suspended`() { - node2.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow("Hello", it) } + node2.registerServiceFlow(ReceiveFlow::class) { SendFlow("Hello", it) } val flow = ReceiveFlow(node2.info.legalIdentity) val fiber = node1.services.startFlow(flow) as FlowStateMachineImpl // Before the flow runs change the suspend action to throw an exception @@ -129,7 +130,7 @@ class StateMachineManagerTests { @Test fun `flow restarted just after receiving payload`() { - node2.services.registerServiceFlow(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() } + node2.registerServiceFlow(SendFlow::class) { ReceiveFlow(it).nonTerminating() } node1.services.startFlow(SendFlow("Hello", node2.info.legalIdentity)) // We push through just enough messages to get only the payload sent @@ -179,7 +180,7 @@ class StateMachineManagerTests { @Test fun `flow loaded from checkpoint will respond to messages from before start`() { - node1.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow("Hello", it) } + node1.registerServiceFlow(ReceiveFlow::class) { SendFlow("Hello", it) } node2.services.startFlow(ReceiveFlow(node1.info.legalIdentity).nonTerminating()) // Prepare checkpointed receive flow // Make sure the add() has finished initial processing. node2.smm.executor.flush() @@ -243,8 +244,8 @@ class StateMachineManagerTests { fun `sending to multiple parties`() { val node3 = net.createNode(node1.info.address) net.runNetwork() - node2.services.registerServiceFlow(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() } - node3.services.registerServiceFlow(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() } + node2.registerServiceFlow(SendFlow::class) { ReceiveFlow(it).nonTerminating() } + node3.registerServiceFlow(SendFlow::class) { ReceiveFlow(it).nonTerminating() } val payload = "Hello World" node1.services.startFlow(SendFlow(payload, node2.info.legalIdentity, node3.info.legalIdentity)) net.runNetwork() @@ -254,14 +255,14 @@ class StateMachineManagerTests { assertThat(node3Flow.receivedPayloads[0]).isEqualTo(payload) assertSessionTransfers(node2, - node1 sent sessionInit(SendFlow::class, payload) to node2, + node1 sent sessionInit(SendFlow::class, 1, payload) to node2, node2 sent sessionConfirm to node1, node1 sent normalEnd to node2 //There's no session end from the other flows as they're manually suspended ) assertSessionTransfers(node3, - node1 sent sessionInit(SendFlow::class, payload) to node3, + node1 sent sessionInit(SendFlow::class, 1, payload) to node3, node3 sent sessionConfirm to node1, node1 sent normalEnd to node3 //There's no session end from the other flows as they're manually suspended @@ -277,8 +278,8 @@ class StateMachineManagerTests { net.runNetwork() val node2Payload = "Test 1" val node3Payload = "Test 2" - node2.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow(node2Payload, it) } - node3.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow(node3Payload, it) } + node2.registerServiceFlow(ReceiveFlow::class) { SendFlow(node2Payload, it) } + node3.registerServiceFlow(ReceiveFlow::class) { SendFlow(node3Payload, it) } val multiReceiveFlow = ReceiveFlow(node2.info.legalIdentity, node3.info.legalIdentity).nonTerminating() node1.services.startFlow(multiReceiveFlow) node1.acceptableLiveFiberCountOnStop = 1 @@ -303,12 +304,12 @@ class StateMachineManagerTests { @Test fun `both sides do a send as their first IO request`() { - node2.services.registerServiceFlow(PingPongFlow::class.java) { PingPongFlow(it, 20L) } + node2.registerServiceFlow(PingPongFlow::class) { PingPongFlow(it, 20L) } node1.services.startFlow(PingPongFlow(node2.info.legalIdentity, 10L)) net.runNetwork() assertSessionTransfers( - node1 sent sessionInit(PingPongFlow::class, 10L) to node2, + node1 sent sessionInit(PingPongFlow::class, 1, 10L) to node2, node2 sent sessionConfirm to node1, node2 sent sessionData(20L) to node1, node1 sent sessionData(11L) to node2, @@ -374,7 +375,7 @@ class StateMachineManagerTests { @Test fun `other side ends before doing expected send`() { - node2.services.registerServiceFlow(ReceiveFlow::class.java) { NoOpFlow() } + node2.registerServiceFlow(ReceiveFlow::class) { NoOpFlow() } val resultFuture = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity)).resultFuture net.runNetwork() assertThatExceptionOfType(FlowSessionException::class.java).isThrownBy { @@ -534,7 +535,7 @@ class StateMachineManagerTests { } } - node2.services.registerServiceFlow(AskForExceptionFlow::class.java) { ConditionalExceptionFlow(it, "Hello") } + node2.registerServiceFlow(AskForExceptionFlow::class) { ConditionalExceptionFlow(it, "Hello") } val resultFuture = node1.services.startFlow(RetryOnExceptionFlow(node2.info.legalIdentity)).resultFuture net.runNetwork() assertThat(resultFuture.getOrThrow()).isEqualTo("Hello") @@ -562,7 +563,7 @@ class StateMachineManagerTests { ptx.signWith(node1.services.legalIdentityKey) val stx = ptx.toSignedTransaction() - node1.services.registerServiceFlow(WaitingFlows.Waiter::class.java) { + node1.registerServiceFlow(WaitingFlows.Waiter::class) { WaitingFlows.Committer(it) { throw Exception("Error") } } val waiter = node2.services.startFlow(WaitingFlows.Waiter(stx, node1.info.legalIdentity)).resultFuture @@ -587,6 +588,31 @@ class StateMachineManagerTests { assertThat(receiveFlowFuture.getOrThrow().receivedPayloads).containsOnly("Hello") } + @Test + fun `upgraded flow`() { + node1.services.startFlow(UpgradedFlow(node2.info.legalIdentity)) + net.runNetwork() + assertThat(sessionTransfers).startsWith( + node1 sent sessionInit(UpgradedFlow::class, 2) to node2 + ) + } + + @Test + fun `unsupported new flow version`() { + node2.registerServiceFlow(UpgradedFlow::class, flowVersion = 1) { SendFlow("Hello", it) } + val result = node1.services.startFlow(UpgradedFlow(node2.info.legalIdentity)).resultFuture + net.runNetwork() + assertThatExceptionOfType(FlowSessionException::class.java).isThrownBy { + result.getOrThrow() + }.withMessageContaining("Version") + } + + @FlowVersion(2) + private class UpgradedFlow(val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call(): Any = receive(otherParty).unwrap { it } + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////// //region Helpers @@ -605,8 +631,8 @@ class StateMachineManagerTests { return smm.findStateMachines(P::class.java).single() } - private fun sessionInit(clientFlowClass: KClass>, payload: Any? = null): SessionInit { - return SessionInit(0, clientFlowClass.java, payload) + private fun sessionInit(clientFlowClass: KClass>, flowVersion: Int = 1, payload: Any? = null): SessionInit { + return SessionInit(0, clientFlowClass.java, flowVersion, payload) } private val sessionConfirm = SessionConfirm(0, 0) private fun sessionData(payload: Any) = SessionData(0, payload) diff --git a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt index 76b198eac8..37c68fba2d 100644 --- a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt +++ b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt @@ -30,7 +30,7 @@ class TraderDemoTest : NodeBasedTest() { startNode(DUMMY_BANK_A.name, rpcUsers = demoUser), startNode(DUMMY_BANK_B.name, rpcUsers = demoUser), startNode(BOC.name, rpcUsers = listOf(user)), - startNode(DUMMY_NOTARY.name, setOf(ServiceInfo(SimpleNotaryService.type))) + startNode(DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type))) ).getOrThrow() val (nodeARpc, nodeBRpc) = listOf(nodeA, nodeB).map { diff --git a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt index b64f505a62..c158a06b21 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt @@ -145,9 +145,9 @@ fun getFreeLocalPorts(hostName: String, numberToAlloc: Int): List { */ inline fun > AbstractNode.initiateSingleShotFlow( clientFlowClass: KClass>, - noinline flowFactory: (Party) -> P): ListenableFuture

{ + noinline serviceFlowFactory: (Party) -> P): ListenableFuture

{ val future = smm.changes.filter { it is StateMachineManager.Change.Add && it.logic is P }.map { it.logic as P }.toFuture() - services.registerServiceFlow(clientFlowClass.java, flowFactory) + services.registerServiceFlow(clientFlowClass.java, serviceFlowFactory) return future } diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt index 9cbc1aa1d8..33f1bcb25d 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt @@ -272,12 +272,20 @@ class InMemoryMessagingNetwork( } @CordaSerializable - private data class InMemoryMessage(override val topicSession: TopicSession, override val data: ByteArray, override val uniqueMessageId: UUID, override val debugTimestamp: Instant = Instant.now()) : Message { + private data class InMemoryMessage(override val topicSession: TopicSession, + override val data: ByteArray, + override val uniqueMessageId: UUID, + override val debugTimestamp: Instant = Instant.now()) : Message { override fun toString() = "$topicSession#${String(data)}" } @CordaSerializable - private data class InMemoryReceivedMessage(override val topicSession: TopicSession, override val data: ByteArray, override val uniqueMessageId: UUID, override val debugTimestamp: Instant, override val peer: X500Name) : ReceivedMessage + private data class InMemoryReceivedMessage(override val topicSession: TopicSession, + override val data: ByteArray, + override val platformVersion: Int, + override val uniqueMessageId: UUID, + override val debugTimestamp: Instant, + override val peer: X500Name) : ReceivedMessage /** * An [InMemoryMessaging] provides a [MessagingService] that isn't backed by any kind of network or disk storage @@ -453,6 +461,9 @@ class InMemoryMessagingNetwork( private fun MessageTransfer.toReceivedMessage(): ReceivedMessage = InMemoryReceivedMessage( message.topicSession, message.data.copyOf(), // Kryo messes with the buffer so give each client a unique copy - message.uniqueMessageId, message.debugTimestamp, X509Utilities.getDevX509Name(sender.description)) + 1, + message.uniqueMessageId, + message.debugTimestamp, + X509Utilities.getDevX509Name(sender.description)) } } diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt index 75e1dff548..6ecba4a8f4 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -1,5 +1,6 @@ package net.corda.testing.node +import com.google.common.annotations.VisibleForTesting import com.google.common.jimfs.Configuration.unix import com.google.common.jimfs.Jimfs import com.google.common.util.concurrent.Futures @@ -7,6 +8,7 @@ import com.google.common.util.concurrent.ListenableFuture import net.corda.core.* import net.corda.core.crypto.Party import net.corda.core.crypto.entropyToKeyPair +import net.corda.core.flows.FlowLogic import net.corda.core.messaging.RPCOps import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.CordaPluginRegistry @@ -16,11 +18,13 @@ import net.corda.core.node.services.* import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.core.utilities.loggerFor import net.corda.node.internal.AbstractNode +import net.corda.node.internal.ServiceFlowInfo import net.corda.node.services.api.MessagingServiceInternal import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.keys.E2ETestKeyManagementService import net.corda.node.services.network.InMemoryNetworkMapService import net.corda.node.services.network.NetworkMapService +import net.corda.node.services.statemachine.flowVersion import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.node.services.transactions.InMemoryUniquenessProvider import net.corda.node.services.transactions.SimpleNotaryService @@ -38,6 +42,7 @@ import java.security.KeyPair import java.util.* import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger +import kotlin.reflect.KClass /** * A mock node brings up a suite of in-memory services in a fast manner suitable for unit testing. @@ -224,6 +229,13 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, // It is used from the network visualiser tool. @Suppress("unused") val place: PhysicalLocation get() = findMyLocation()!! + @VisibleForTesting + fun registerServiceFlow(clientFlowClass: KClass>, + flowVersion: Int = clientFlowClass.java.flowVersion, + serviceFlowFactory: (Party) -> FlowLogic<*>) { + serviceFlowFactories[clientFlowClass.java] = ServiceFlowInfo.CorDapp(flowVersion, serviceFlowFactory) + } + fun pumpReceive(block: Boolean = false): InMemoryMessagingNetwork.MessageTransfer? { return (net as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(block) } diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt index fdac833c6b..cb093a4460 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt @@ -60,21 +60,24 @@ abstract class NodeBasedTest { * will automatically be started with the default parameters. */ fun startNetworkMapNode(legalName: String = DUMMY_MAP.name, + platformVersion: Int = 1, advertisedServices: Set = emptySet(), rpcUsers: List = emptyList(), configOverrides: Map = emptyMap()): Node { check(_networkMapNode == null) - return startNodeInternal(legalName, advertisedServices, rpcUsers, configOverrides).apply { + return startNodeInternal(legalName, platformVersion, advertisedServices, rpcUsers, configOverrides).apply { _networkMapNode = this } } fun startNode(legalName: String, + platformVersion: Int = 1, advertisedServices: Set = emptySet(), rpcUsers: List = emptyList(), configOverrides: Map = emptyMap()): ListenableFuture { val node = startNodeInternal( legalName, + platformVersion, advertisedServices, rpcUsers, mapOf( @@ -118,6 +121,7 @@ abstract class NodeBasedTest { } private fun startNodeInternal(legalName: String, + platformVersion: Int, advertisedServices: Set, rpcUsers: List, configOverrides: Map): Node { @@ -141,7 +145,7 @@ abstract class NodeBasedTest { ) + configOverrides ) - val node = config.parseAs().createNode(MOCK_VERSION_INFO) + val node = config.parseAs().createNode(MOCK_VERSION_INFO.copy(platformVersion = platformVersion)) node.start() nodes += node thread(name = legalName) {