diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt
index b23a79c125..171442be2c 100644
--- a/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt
+++ b/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt
@@ -26,7 +26,7 @@ sealed class FlowInitiator {
     /** Started when we get new session initiation request. */
     data class Peer(val party: Party) : FlowInitiator()
     /** Started as scheduled activity. */
-    class Scheduled(val scheduledState: ScheduledStateRef) : FlowInitiator()
+    data class Scheduled(val scheduledState: ScheduledStateRef) : FlowInitiator()
     object Shell : FlowInitiator() // TODO When proper ssh access enabled, add username/use RPC?
 }
 
diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowVersion.kt b/core/src/main/kotlin/net/corda/core/flows/FlowVersion.kt
new file mode 100644
index 0000000000..a953aa01eb
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/flows/FlowVersion.kt
@@ -0,0 +1,18 @@
+package net.corda.core.flows
+
+/**
+ * Annotation for initiating [FlowLogic]s to specify the version of their flow protocol. The version is a single integer
+ * [value] which increments by one whenever a release is made where the flow protocol changes in any manner which is
+ * backwards incompatible. This may be a change in the sequence of sends and receives between the client and service flows,
+ * or it could be a change in the meaning. The version is used when a flow first initiates communication with a party to
+ * inform them what version they are using. For this reason the annotation is not applicable for the initiated flow.
+ *
+ * This flow version integer is not the same as Corda's platform version, though it follows a similar semantic.
+ *
+ * Note: Only one version of the same flow can currently be loaded at the same time. Any session request by a client flow for
+ * a different version will be rejected.
+ *
+ * Defaults to a flow version of 1 if not specified.
+ */
+// TODO Add support for multiple versions once CorDapps are loaded in separate class loaders
+annotation class FlowVersion(val value: Int)
diff --git a/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt b/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt
index 2e0224a6ad..ea948ca162 100644
--- a/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt
+++ b/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt
@@ -190,6 +190,8 @@ interface Message {
 interface ReceivedMessage : Message {
     /** The authenticated sender. */
     val peer: X500Name
+    /** Platform version of the sender's node. */
+    val platformVersion: Int
 }
 
 /** A singleton that's useful for validating topic strings */
diff --git a/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt b/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt
index bc2528bfb3..c5b3db232b 100644
--- a/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt
+++ b/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt
@@ -6,7 +6,6 @@ import net.corda.core.flows.FlowLogic
 import net.corda.core.serialization.CordaSerializable
 import net.corda.core.transactions.SignedTransaction
 
-
 /**
  * Notify the specified parties about a transaction. The remote peers will download this transaction and its
  * dependency graph, verifying them all. The flow returns when all peers have acknowledged the transactions
@@ -26,7 +25,7 @@ class BroadcastTransactionFlow(val notarisedTransaction: SignedTransaction,
         // TODO: Messaging layer should handle this broadcast for us
         val msg = NotifyTxRequest(notarisedTransaction)
         participants.filter { it != serviceHub.myInfo.legalIdentity }.forEach { participant ->
-            // This pops out the other side in DataVending.NotifyTransactionHandler.
+            // This pops out the other side in NotifyTransactionHandler
             send(participant, msg)
         }
     }
diff --git a/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt b/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt
index af2fcde85d..c42e0038f8 100644
--- a/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt
+++ b/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt
@@ -88,9 +88,7 @@ object NotaryChangeFlow : AbstractStateReplacementFlow() {
 
     }
 
-    class Acceptor(otherSide: Party,
-                   override val progressTracker: ProgressTracker = tracker()) : AbstractStateReplacementFlow.Acceptor<Party>(otherSide) {
-
+    class Acceptor(otherSide: Party) : AbstractStateReplacementFlow.Acceptor<Party>(otherSide) {
         /**
          * Check the notary change proposal.
          *
diff --git a/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt b/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt
index c2b21333d3..fe45b2d27e 100644
--- a/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt
+++ b/core/src/test/kotlin/net/corda/core/flows/TxKeyFlow.kt
@@ -2,7 +2,6 @@ package net.corda.core.flows
 
 import co.paralleluniverse.fibers.Suspendable
 import net.corda.core.crypto.Party
-import net.corda.core.node.PluginServiceHub
 import net.corda.core.utilities.ProgressTracker
 import net.corda.flows.TxKeyFlowUtilities
 import java.security.PublicKey
@@ -14,9 +13,6 @@ import java.security.cert.Certificate
  * DoS of the node, as key generation/storage is vastly more expensive than submitting a request.
  */
 object TxKeyFlow {
-    fun registerServiceFlow(services: PluginServiceHub) {
-        services.registerServiceFlow(Requester::class.java, ::Provider)
-    }
 
     class Requester(val otherSide: Party,
                     override val progressTracker: ProgressTracker) : FlowLogic<Pair<PublicKey, Certificate?>>() {
diff --git a/core/src/test/kotlin/net/corda/core/flows/TxKeyFlowUtilitiesTests.kt b/core/src/test/kotlin/net/corda/core/flows/TxKeyFlowUtilitiesTests.kt
index 69d5242e6c..40595a0b55 100644
--- a/core/src/test/kotlin/net/corda/core/flows/TxKeyFlowUtilitiesTests.kt
+++ b/core/src/test/kotlin/net/corda/core/flows/TxKeyFlowUtilitiesTests.kt
@@ -32,7 +32,7 @@ class TxKeyFlowUtilitiesTests {
         val bobKey: Party = bobNode.services.myInfo.legalIdentity
 
         // Run the flows
-        TxKeyFlow.registerServiceFlow(bobNode.services)
+        bobNode.registerServiceFlow(TxKeyFlow.Requester::class) { TxKeyFlow.Provider(it) }
         val requesterFlow = aliceNode.services.startFlow(TxKeyFlow.Requester(bobKey))
 
         // Get the results
diff --git a/docs/source/release-notes.rst b/docs/source/release-notes.rst
index 5f44a37109..c59339e880 100644
--- a/docs/source/release-notes.rst
+++ b/docs/source/release-notes.rst
@@ -34,6 +34,11 @@ serialisation, etc. The node exposes the platform version it's on and we envisio
 run on older versions of the platform to the one they were compiled against. Platform version borrows heavily from Android's
 API Level.
 
+Flows can now be versioned using the ``FlowVersion`` annotation, which assigns an integer version number to it. For now
+this enables a node to restrict usage of a flow to a specific version. Support for multiple verisons of the same flow,
+hence achieving backwards compatibility, will be possible once we start loading CorDapps in separate class loaders. Watch
+this space...
+
 Milestone 10
 ------------
 
diff --git a/docs/source/versioning.rst b/docs/source/versioning.rst
index ff502c0d97..80562490ab 100644
--- a/docs/source/versioning.rst
+++ b/docs/source/versioning.rst
@@ -27,3 +27,18 @@ for the network.
 .. note:: A future release may introduce the concept of a target platform version, which would be similar to Android's
    ``targetSdkVersion``, and would provide a means of maintaining behavioural compatibility for the cases where the
    platform's behaviour has changed.
+
+Flow versioning
+---------------
+
+A platform which can be extended with CorDapps also requires the ability to version these apps as they evolve from
+release to release. This allows users of these apps, whether they're other nodes or RPC users, to select which version
+they wish to use and enables nodes to control which app versions they support. Flows have their own version numbers,
+independent of other versioning, for example of the platform. In particular it is the initiating flow that can be versioned
+using the ``FlowVersion`` annotation. This assigns an integer version number, similar in concept to the platform version,
+which is used in the session handshake process when a flow communicates with another party for the first time. The other
+party will only accept the session request if it, firstly, has that flow loaded, and secondly, for the same version (see
+:doc:`flow-state-machine`).
+
+.. note:: Currently we don't support multiple versions of the same flow loaded in the same node. This will be possible
+   once we start loading CorDapps in separate class loaders.
diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt
new file mode 100644
index 0000000000..d9f5cbbcf9
--- /dev/null
+++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt
@@ -0,0 +1,36 @@
+package net.corda.node.services.statemachine
+
+import co.paralleluniverse.fibers.Suspendable
+import com.google.common.util.concurrent.Futures
+import net.corda.core.crypto.Party
+import net.corda.core.flows.FlowLogic
+import net.corda.core.getOrThrow
+import net.corda.core.utilities.unwrap
+import net.corda.testing.node.NodeBasedTest
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.Test
+
+class FlowVersioningTest : NodeBasedTest() {
+    @Test
+    fun `core flows receive platform version of initiator`() {
+        val (alice, bob) = Futures.allAsList(
+                startNode("Alice", platformVersion = 2),
+                startNode("Bob", platformVersion = 3)).getOrThrow()
+        bob.installCoreFlow(ClientFlow::class, ::SendBackPlatformVersionFlow)
+        val resultFuture = alice.services.startFlow(ClientFlow(bob.info.legalIdentity)).resultFuture
+        assertThat(resultFuture.getOrThrow()).isEqualTo(2)
+    }
+
+    private open class ClientFlow(val otherParty: Party) : FlowLogic<Any>() {
+        @Suspendable
+        override fun call(): Any {
+            return sendAndReceive<Any>(otherParty, "This is ignored. We only send to kick off the flow on the other side").unwrap { it }
+        }
+    }
+
+    private open class SendBackPlatformVersionFlow(val otherParty: Party, val otherPartysPlatformVersion: Any) : FlowLogic<Unit>() {
+        @Suspendable
+        override fun call() = send(otherParty, otherPartysPlatformVersion)
+    }
+
+}
\ No newline at end of file
diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
index d51081a738..ab08631366 100644
--- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
+++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
@@ -10,10 +10,7 @@ import net.corda.core.contracts.Amount
 import net.corda.core.contracts.PartyAndReference
 import net.corda.core.crypto.Party
 import net.corda.core.crypto.X509Utilities
-import net.corda.core.flows.FlowInitiator
-import net.corda.core.flows.FlowLogic
-import net.corda.core.flows.FlowLogicRefFactory
-import net.corda.core.flows.FlowStateMachine
+import net.corda.core.flows.*
 import net.corda.core.messaging.CordaRPCOps
 import net.corda.core.messaging.RPCOps
 import net.corda.core.messaging.SingleMessageRecipient
@@ -25,6 +22,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
 import net.corda.core.serialization.deserialize
 import net.corda.core.serialization.serialize
 import net.corda.core.transactions.SignedTransaction
+import net.corda.core.utilities.debug
 import net.corda.flows.*
 import net.corda.node.services.api.*
 import net.corda.node.services.config.FullNodeConfiguration
@@ -43,6 +41,7 @@ import net.corda.node.services.persistence.*
 import net.corda.node.services.schema.HibernateObserver
 import net.corda.node.services.schema.NodeSchemaService
 import net.corda.node.services.statemachine.StateMachineManager
+import net.corda.node.services.statemachine.flowVersion
 import net.corda.node.services.transactions.*
 import net.corda.node.services.vault.CashBalanceAsMetricsObserver
 import net.corda.node.services.vault.NodeVaultService
@@ -63,7 +62,9 @@ import java.time.Clock
 import java.util.*
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.ExecutorService
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeUnit.SECONDS
+import kotlin.collections.ArrayList
+import kotlin.reflect.KClass
 import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
 
 /**
@@ -107,7 +108,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
     // low-performance prototyping period.
     protected abstract val serverThread: AffinityExecutor
 
-    private val serviceFlowFactories = ConcurrentHashMap<Class<*>, (Party) -> FlowLogic<*>>()
+    protected val serviceFlowFactories = ConcurrentHashMap<Class<*>, ServiceFlowInfo>()
     protected val partyKeys = mutableSetOf<KeyPair>()
 
     val services = object : ServiceHubInternal() {
@@ -118,7 +119,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
         override val keyManagementService: KeyManagementService get() = keyManagement
         override val identityService: IdentityService get() = identity
         override val schedulerService: SchedulerService get() = scheduler
-        override val clock: Clock = platformClock
+        override val clock: Clock get() = platformClock
         override val myInfo: NodeInfo get() = info
         override val schemaService: SchemaService get() = schemas
         override val transactionVerifierService: TransactionVerifierService get() = txVerifierService
@@ -133,11 +134,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
 
         override fun registerServiceFlow(clientFlowClass: Class<out FlowLogic<*>>, serviceFlowFactory: (Party) -> FlowLogic<*>) {
             require(clientFlowClass !in serviceFlowFactories) { "${clientFlowClass.name} has already been used to register a service flow" }
-            log.info("Registering service flow for ${clientFlowClass.name}")
-            serviceFlowFactories[clientFlowClass] = serviceFlowFactory
+            val version = clientFlowClass.flowVersion
+            val info = ServiceFlowInfo.CorDapp(version, serviceFlowFactory)
+            log.info("Registering service flow for ${clientFlowClass.name}: $info")
+            serviceFlowFactories[clientFlowClass] = info
         }
 
-        override fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ((Party) -> FlowLogic<*>)? {
+        override fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ServiceFlowInfo? {
             return serviceFlowFactories[clientFlowClass]
         }
 
@@ -157,7 +160,6 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
     lateinit var vault: VaultService
     lateinit var keyManagement: KeyManagementService
     var inNodeNetworkMapService: NetworkMapService? = null
-    var inNodeNotaryService: NotaryService? = null
     lateinit var txVerifierService: TransactionVerifierService
     lateinit var identity: IdentityService
     lateinit var net: MessagingServiceInternal
@@ -224,7 +226,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
                     // We wait here, even though any in-flight messages should have been drained away because the
                     // server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
                     // arbitrary and might be inappropriate.
-                    MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, TimeUnit.SECONDS)
+                    MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS)
                 }
             }
 
@@ -235,7 +237,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
                 false
             }
             startMessagingService(rpcOps)
-            services.registerServiceFlow(ContractUpgradeFlow.Instigator::class.java) { ContractUpgradeFlow.Acceptor(it) }
+            installCoreFlows()
             runOnStop += Runnable { net.stop() }
             _networkMapRegistrationFuture.setFuture(registerWithNetworkMapIfConfigured())
             smm.start()
@@ -247,6 +249,29 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
         return this
     }
 
+    /**
+     * @suppress
+     * Installs a flow that's core to the Corda platform. Unlike CorDapp flows which are versioned individually using
+     * [FlowVersion], core flows have the same version as the node's platform version. To cater for backwards compatibility
+     * [serviceFlowFactory] provides a second parameter which is the platform version of the initiating party.
+     */
+    @VisibleForTesting
+    fun installCoreFlow(clientFlowClass: KClass<out FlowLogic<*>>, serviceFlowFactory: (Party, Int) -> FlowLogic<*>) {
+        require(!clientFlowClass.java.isAnnotationPresent(FlowVersion::class.java)) {
+            "${FlowVersion::class.java.name} not applicable for core flows; their version is the node's platform version"
+        }
+        serviceFlowFactories[clientFlowClass.java] = ServiceFlowInfo.Core(serviceFlowFactory)
+        log.debug { "Installed core flow ${clientFlowClass.java.name}" }
+    }
+
+    private fun installCoreFlows() {
+        installCoreFlow(FetchTransactionsFlow::class) { otherParty, _ -> FetchTransactionsHandler(otherParty) }
+        installCoreFlow(FetchAttachmentsFlow::class) { otherParty, _ -> FetchAttachmentsHandler(otherParty) }
+        installCoreFlow(BroadcastTransactionFlow::class) { otherParty, _ -> NotifyTransactionHandler(otherParty) }
+        installCoreFlow(NotaryChangeFlow.Instigator::class) { otherParty, _ -> NotaryChangeFlow.Acceptor(otherParty) }
+        installCoreFlow(ContractUpgradeFlow.Instigator::class) { otherParty, _ -> ContractUpgradeFlow.Acceptor(otherParty) }
+    }
+
     /**
      * Builds node internal, advertised, and plugin services.
      * Returns a list of tokenizable services to be added to the serialisation context.
@@ -369,14 +394,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
     }
 
     private fun makePluginServices(tokenizableServices: MutableList<Any>): List<Any> {
-        val pluginServices = pluginRegistries.flatMap { x -> x.servicePlugins }
-        val serviceList = mutableListOf<Any>()
-        for (serviceConstructor in pluginServices) {
-            val service = serviceConstructor.apply(services)
-            serviceList.add(service)
-            tokenizableServices.add(service)
-        }
-        return serviceList
+        val pluginServices = pluginRegistries.flatMap { it.servicePlugins }.map { it.apply(services) }
+        tokenizableServices.addAll(pluginServices)
+        return pluginServices
     }
 
     /**
@@ -393,13 +413,13 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
 
         val notaryServiceType = serviceTypes.singleOrNull { it.isNotary() }
         if (notaryServiceType != null) {
-            inNodeNotaryService = makeNotaryService(notaryServiceType, tokenizableServices)
+            makeNotaryService(notaryServiceType, tokenizableServices)
         }
     }
 
     private fun registerWithNetworkMapIfConfigured(): ListenableFuture<Unit> {
         services.networkMapCache.addNode(info)
-        // In the unit test environment, we may run without any network map service sometimes.
+        // In the unit test environment, we may sometimes run without any network map service
         return if (networkMapAddress == null && inNodeNetworkMapService == null) {
             services.networkMapCache.runWithoutMapService()
             noNetworkMapConfigured()  // TODO This method isn't needed as runWithoutMapService sets the Future in the cache
@@ -448,26 +468,28 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
         inNodeNetworkMapService = PersistentNetworkMapService(services, configuration.minimumPlatformVersion)
     }
 
-    open protected fun makeNotaryService(type: ServiceType, tokenizableServices: MutableList<Any>): NotaryService {
+    open protected fun makeNotaryService(type: ServiceType, tokenizableServices: MutableList<Any>) {
         val timestampChecker = TimestampChecker(platformClock, 30.seconds)
         val uniquenessProvider = makeUniquenessProvider(type)
         tokenizableServices.add(uniquenessProvider)
 
-        return when (type) {
-            SimpleNotaryService.type -> SimpleNotaryService(services, timestampChecker, uniquenessProvider)
-            ValidatingNotaryService.type -> ValidatingNotaryService(services, timestampChecker, uniquenessProvider)
-            RaftNonValidatingNotaryService.type -> RaftNonValidatingNotaryService(services, timestampChecker, uniquenessProvider as RaftUniquenessProvider)
-            RaftValidatingNotaryService.type -> RaftValidatingNotaryService(services, timestampChecker, uniquenessProvider as RaftUniquenessProvider)
+        val notaryService = when (type) {
+            SimpleNotaryService.type -> SimpleNotaryService(timestampChecker, uniquenessProvider)
+            ValidatingNotaryService.type -> ValidatingNotaryService(timestampChecker, uniquenessProvider)
+            RaftNonValidatingNotaryService.type -> RaftNonValidatingNotaryService(timestampChecker, uniquenessProvider as RaftUniquenessProvider)
+            RaftValidatingNotaryService.type -> RaftValidatingNotaryService(timestampChecker, uniquenessProvider as RaftUniquenessProvider)
             BFTNonValidatingNotaryService.type -> with(configuration as FullNodeConfiguration) {
                 val nodeId = notaryNodeId ?: throw IllegalArgumentException("notaryNodeId value must be specified in the configuration")
                 val client = BFTSMaRt.Client(nodeId)
-                tokenizableServices.add(client)
+                tokenizableServices += client
                 BFTNonValidatingNotaryService(services, timestampChecker, nodeId, database, client)
             }
             else -> {
                 throw IllegalArgumentException("Notary type ${type.id} is not handled by makeNotaryService.")
             }
         }
+
+        installCoreFlow(NotaryFlow.Client::class, notaryService.serviceFlowFactory)
     }
 
     protected abstract fun makeUniquenessProvider(type: ServiceType): UniquenessProvider
@@ -579,3 +601,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
         configuration.baseDirectory.createDirectories()
     }
 }
+
+sealed class ServiceFlowInfo {
+    data class Core(val factory: (Party, Int) -> FlowLogic<*>) : ServiceFlowInfo()
+    data class CorDapp(val version: Int, val factory: (Party) -> FlowLogic<*>) : ServiceFlowInfo()
+}
diff --git a/node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt b/node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt
deleted file mode 100644
index 4115ff6dfd..0000000000
--- a/node/src/main/kotlin/net/corda/node/services/NotaryChangeService.kt
+++ /dev/null
@@ -1,23 +0,0 @@
-package net.corda.node.services
-
-import net.corda.core.node.CordaPluginRegistry
-import net.corda.core.node.PluginServiceHub
-import net.corda.core.serialization.SingletonSerializeAsToken
-import net.corda.flows.NotaryChangeFlow
-import java.util.function.Function
-
-object NotaryChange {
-    class Plugin : CordaPluginRegistry() {
-        override val servicePlugins = listOf(Function(::Service))
-    }
-
-    /**
-     * A service that monitors the network for requests for changing the notary of a state,
-     * and immediately runs the [NotaryChangeFlow] if the auto-accept criteria are met.
-     */
-    class Service(services: PluginServiceHub) : SingletonSerializeAsToken() {
-        init {
-            services.registerServiceFlow(NotaryChangeFlow.Instigator::class.java) { NotaryChangeFlow.Acceptor(it) }
-        }
-    }
-}
diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt
index 56c5c7247e..acff817c4e 100644
--- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt
+++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt
@@ -2,7 +2,6 @@ package net.corda.node.services.api
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.util.concurrent.ListenableFuture
-import net.corda.core.crypto.Party
 import net.corda.core.flows.FlowInitiator
 import net.corda.core.flows.FlowLogic
 import net.corda.core.flows.FlowLogicRefFactory
@@ -11,8 +10,9 @@ import net.corda.core.messaging.MessagingService
 import net.corda.core.node.PluginServiceHub
 import net.corda.core.node.services.TxWritableStorageService
 import net.corda.core.transactions.SignedTransaction
+import net.corda.core.utilities.loggerFor
+import net.corda.node.internal.ServiceFlowInfo
 import net.corda.node.services.statemachine.FlowStateMachineImpl
-import org.slf4j.LoggerFactory
 
 interface MessagingServiceInternal : MessagingService {
     /**
@@ -37,9 +37,12 @@ interface MessagingServiceBuilder<out T : MessagingServiceInternal> {
     fun start(): ListenableFuture<out T>
 }
 
-private val log = LoggerFactory.getLogger(ServiceHubInternal::class.java)
 
 abstract class ServiceHubInternal : PluginServiceHub {
+    companion object {
+        private val log = loggerFor<ServiceHubInternal>()
+    }
+
     abstract val monitoringService: MonitoringService
     abstract val flowLogicRefFactory: FlowLogicRefFactory
     abstract val schemaService: SchemaService
@@ -99,5 +102,5 @@ abstract class ServiceHubInternal : PluginServiceHub {
         return startFlow(logic, flowInitiator)
     }
 
-    abstract fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ((Party) -> FlowLogic<*>)?
-}
+    abstract fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ServiceFlowInfo?
+}
\ No newline at end of file
diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt
index 514e13f451..e4683ca31b 100644
--- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt
+++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt
@@ -38,13 +38,13 @@ import org.bouncycastle.asn1.x500.X500Name
 import org.jetbrains.exposed.sql.Database
 import org.jetbrains.exposed.sql.ResultRow
 import org.jetbrains.exposed.sql.statements.InsertStatement
+import java.security.PublicKey
 import java.time.Instant
 import java.util.*
 import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.ThreadSafe
-import java.security.PublicKey
 
 // TODO: Stop the wallet explorer and other clients from using this class and get rid of persistentInbox
 
@@ -296,23 +296,13 @@ class NodeMessagingClient(override val config: NodeConfiguration,
         try {
             val topic = message.required(topicProperty) { getStringProperty(it) }
             val sessionID = message.required(sessionIdProperty) { getLongProperty(it) }
+            val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" }
+            val platformVersion = message.required(platformVersionProperty) { getIntProperty(it) }
             // Use the magic deduplication property built into Artemis as our message identity too
             val uuid = message.required(HDR_DUPLICATE_DETECTION_ID) { UUID.fromString(message.getStringProperty(it)) }
-            val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" }
             log.trace { "Received message from: ${message.address} user: $user topic: $topic sessionID: $sessionID uuid: $uuid" }
 
-            val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
-
-            val msg = object : ReceivedMessage {
-                override val topicSession = TopicSession(topic, sessionID)
-                override val data: ByteArray = body
-                override val peer: X500Name = X500Name(user)
-                override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
-                override val uniqueMessageId: UUID = uuid
-                override fun toString() = "$topic#${data.opaque()}"
-            }
-
-            return msg
+            return ArtemisReceivedMessage(TopicSession(topic, sessionID), X500Name(user), platformVersion, uuid, message)
         } catch (e: Exception) {
             log.error("Unable to process message, ignoring it: $message", e)
             return null
@@ -324,6 +314,16 @@ class NodeMessagingClient(override val config: NodeConfiguration,
         return extractor(key)
     }
 
+    private class ArtemisReceivedMessage(override val topicSession: TopicSession,
+                                         override val peer: X500Name,
+                                         override val platformVersion: Int,
+                                         override val uniqueMessageId: UUID,
+                                         private val message: ClientMessage) : ReceivedMessage {
+        override val data: ByteArray by lazy { ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) } }
+        override val debugTimestamp: Instant get() = Instant.ofEpochMilli(message.timestamp)
+        override fun toString() = "${topicSession.topic}#${data.opaque()}"
+    }
+
     private fun deliver(msg: ReceivedMessage): Boolean {
         state.checkNotLocked()
         // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt
index e4f8c77ed2..42fb222fb9 100644
--- a/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt
@@ -5,83 +5,61 @@ import net.corda.core.crypto.Party
 import net.corda.core.crypto.SecureHash
 import net.corda.core.flows.FlowException
 import net.corda.core.flows.FlowLogic
-import net.corda.core.node.CordaPluginRegistry
-import net.corda.core.node.PluginServiceHub
-import net.corda.core.serialization.SingletonSerializeAsToken
 import net.corda.core.transactions.SignedTransaction
 import net.corda.core.utilities.unwrap
 import net.corda.flows.*
-import java.util.function.Function
-import javax.annotation.concurrent.ThreadSafe
 
-object DataVending {
-    class Plugin : CordaPluginRegistry() {
-        override val servicePlugins = listOf(Function(::Service))
+/**
+ * This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple
+ * glue that sits between the network layer and the database layer.
+ *
+ * Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There
+ * are no access control lists. If you want to keep some data private, then you must be careful who you give its name
+ * to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have
+ * its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as
+ * such the hash of a piece of data can be seen as a type of password allowing access to it.
+ *
+ * Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
+ */
+class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler<SignedTransaction>(otherParty) {
+    override fun getData(id: SecureHash): SignedTransaction? {
+        return serviceHub.storageService.validatedTransactions.getTransaction(id)
+    }
+}
+
+// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
+class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler<ByteArray>(otherParty) {
+    override fun getData(id: SecureHash): ByteArray? {
+        return serviceHub.storageService.attachments.openAttachment(id)?.open()?.readBytes()
+    }
+}
+
+abstract class FetchDataHandler<out T>(val otherParty: Party) : FlowLogic<Unit>() {
+    @Suspendable
+    @Throws(FetchDataFlow.HashNotFound::class)
+    override fun call() {
+        val request = receive<FetchDataFlow.Request>(otherParty).unwrap {
+            if (it.hashes.isEmpty()) throw FlowException("Empty hash list")
+            it
+        }
+        val response = request.hashes.map {
+            getData(it) ?: throw FetchDataFlow.HashNotFound(it)
+        }
+        send(otherParty, response)
+    }
+
+    protected abstract fun getData(id: SecureHash): T?
+}
+
+// TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
+//       includes us in any outside that list. Potentially just if it includes any outside that list at all.
+// TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
+//       cash without from unknown parties?
+class NotifyTransactionHandler(val otherParty: Party) : FlowLogic<Unit>() {
+    @Suspendable
+    override fun call() {
+        val request = receive<BroadcastTransactionFlow.NotifyTxRequest>(otherParty).unwrap { it }
+        subFlow(ResolveTransactionsFlow(request.tx, otherParty), shareParentSessions = true)
+        serviceHub.recordTransactions(request.tx)
     }
-
-    /**
-     * This class sets up network message handlers for requests from peers for data keyed by hash. It is a piece of simple
-     * glue that sits between the network layer and the database layer.
-     *
-     * Note that in our data model, to be able to name a thing by hash automatically gives the power to request it. There
-     * are no access control lists. If you want to keep some data private, then you must be careful who you give its name
-     * to, and trust that they will not pass the name onwards. If someone suspects some data might exist but does not have
-     * its name, then the 256-bit search space they'd have to cover makes it physically impossible to enumerate, and as
-     * such the hash of a piece of data can be seen as a type of password allowing access to it.
-     *
-     * Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
-     */
-    @ThreadSafe
-    class Service(services: PluginServiceHub) : SingletonSerializeAsToken() {
-        init {
-            services.registerServiceFlow(FetchTransactionsFlow::class.java, ::FetchTransactionsHandler)
-            services.registerServiceFlow(FetchAttachmentsFlow::class.java, ::FetchAttachmentsHandler)
-            services.registerServiceFlow(BroadcastTransactionFlow::class.java, ::NotifyTransactionHandler)
-        }
-
-        private class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler<SignedTransaction>(otherParty) {
-            override fun getData(id: SecureHash): SignedTransaction? {
-                return serviceHub.storageService.validatedTransactions.getTransaction(id)
-            }
-        }
-
-        // TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
-        private class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler<ByteArray>(otherParty) {
-            override fun getData(id: SecureHash): ByteArray? {
-                return serviceHub.storageService.attachments.openAttachment(id)?.open()?.readBytes()
-            }
-        }
-
-        private abstract class FetchDataHandler<out T>(val otherParty: Party) : FlowLogic<Unit>() {
-            @Suspendable
-            @Throws(FetchDataFlow.HashNotFound::class)
-            override fun call() {
-                val request = receive<FetchDataFlow.Request>(otherParty).unwrap {
-                    if (it.hashes.isEmpty()) throw FlowException("Empty hash list")
-                    it
-                }
-                val response = request.hashes.map {
-                    getData(it) ?: throw FetchDataFlow.HashNotFound(it)
-                }
-                send(otherParty, response)
-            }
-
-            protected abstract fun getData(id: SecureHash): T?
-        }
-
-
-        // TODO: We should have a whitelist of contracts we're willing to accept at all, and reject if the transaction
-        //       includes us in any outside that list. Potentially just if it includes any outside that list at all.
-        // TODO: Do we want to be able to reject specific transactions on more complex rules, for example reject incoming
-        //       cash without from unknown parties?
-        class NotifyTransactionHandler(val otherParty: Party) : FlowLogic<Unit>() {
-            @Suspendable
-            override fun call() {
-                val request = receive<BroadcastTransactionFlow.NotifyTxRequest>(otherParty).unwrap { it }
-                subFlow(ResolveTransactionsFlow(request.tx, otherParty), shareParentSessions = true)
-                serviceHub.recordTransactions(request.tx)
-            }
-        }
-    }
-
 }
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt
index de026c0d18..081e239e77 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt
@@ -298,7 +298,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
         openSessions[Pair(sessionFlow, otherParty)] = session
         // We get the top-most concrete class object to cater for the case where the client flow is customised via a sub-class
         val clientFlowClass = sessionFlow.topConcreteFlowClass
-        val sessionInit = SessionInit(session.ourSessionId, clientFlowClass, firstPayload)
+        val sessionInit = SessionInit(session.ourSessionId, clientFlowClass, clientFlowClass.flowVersion, firstPayload)
         sendInternal(session, sessionInit)
         if (waitForConfirmation) {
             session.waitForConfirmation()
@@ -434,6 +434,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
     }
 }
 
+val Class<out FlowLogic<*>>.flowVersion: Int get() {
+    val flowVersion = getDeclaredAnnotation(FlowVersion::class.java) ?: return 1
+    require(flowVersion.value > 0) { "Flow versions have to be greater or equal to 1" }
+    return flowVersion.value
+}
+
 // I would prefer for [FlowProgressHandleImpl] to extend [FlowHandleImpl],
 // but Kotlin doesn't allow this for data classes, not even to create
 // another data class!
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt
index d84dab7988..faeb131298 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt
@@ -15,6 +15,7 @@ interface SessionMessage
 
 data class SessionInit(val initiatorSessionId: Long,
                        val clientFlowClass: Class<out FlowLogic<*>>,
+                       val flowVerison: Int,
                        val firstPayload: Any?) : SessionMessage
 
 interface ExistingSessionMessage : SessionMessage {
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt
index c62317c66b..3baa199eed 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt
@@ -13,9 +13,7 @@ import com.esotericsoftware.kryo.pool.KryoPool
 import com.google.common.collect.HashMultimap
 import com.google.common.util.concurrent.ListenableFuture
 import io.requery.util.CloseableIterator
-import net.corda.core.ErrorOr
-import net.corda.core.ThreadBox
-import net.corda.core.bufferUntilSubscribed
+import net.corda.core.*
 import net.corda.core.crypto.Party
 import net.corda.core.crypto.SecureHash
 import net.corda.core.crypto.commonName
@@ -23,12 +21,11 @@ import net.corda.core.flows.*
 import net.corda.core.messaging.ReceivedMessage
 import net.corda.core.messaging.TopicSession
 import net.corda.core.messaging.send
-import net.corda.core.random63BitValue
 import net.corda.core.serialization.*
-import net.corda.core.then
 import net.corda.core.utilities.debug
 import net.corda.core.utilities.loggerFor
 import net.corda.core.utilities.trace
+import net.corda.node.internal.ServiceFlowInfo
 import net.corda.node.services.api.Checkpoint
 import net.corda.node.services.api.CheckpointStorage
 import net.corda.node.services.api.ServiceHubInternal
@@ -151,7 +148,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
     private val recentlyClosedSessions = ConcurrentHashMap<Long, Party>()
 
     // Context for tokenized services in checkpoints
-    private val serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryo(), serviceHub)
+    private val serializationContext = SerializeAsTokenContext(tokenizableServices, quasarKryoPool, serviceHub)
 
     /** Returns a list of all state machines executing the given flow logic at the top level (subflows do not count) */
     fun <P : FlowLogic<T>, T> findStateMachines(flowClass: Class<P>): List<Pair<P, ListenableFuture<T>>> {
@@ -289,7 +286,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
         if (sender != null) {
             when (sessionMessage) {
                 is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, sender)
-                is SessionInit -> onSessionInit(sessionMessage, sender)
+                is SessionInit -> onSessionInit(sessionMessage, message, sender)
             }
         } else {
             logger.error("Unknown peer ${message.peer} in $sessionMessage")
@@ -335,21 +332,38 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
                 waitingForResponse is WaitForLedgerCommit && message is ErrorSessionEnd
     }
 
-    private fun onSessionInit(sessionInit: SessionInit, sender: Party) {
+    private fun onSessionInit(sessionInit: SessionInit, receivedMessage: ReceivedMessage, sender: Party) {
         logger.trace { "Received $sessionInit from $sender" }
         val otherPartySessionId = sessionInit.initiatorSessionId
 
         fun sendSessionReject(message: String) = sendSessionMessage(sender, SessionReject(otherPartySessionId, message))
 
-        val flowFactory = serviceHub.getServiceFlowFactory(sessionInit.clientFlowClass)
-        if (flowFactory == null) {
+        val serviceFlowInfo = serviceHub.getServiceFlowFactory(sessionInit.clientFlowClass)
+        if (serviceFlowInfo == null) {
             logger.warn("${sessionInit.clientFlowClass} has not been registered with a service flow: $sessionInit")
             sendSessionReject("Don't know ${sessionInit.clientFlowClass.name}")
             return
         }
 
         val session = try {
-            val flow = flowFactory(sender)
+            val flow = when (serviceFlowInfo) {
+                is ServiceFlowInfo.CorDapp -> {
+                    // TODO Add support for multiple versions of the same flow when CorDapps are loaded in separate class loaders
+                    if (sessionInit.flowVerison != serviceFlowInfo.version) {
+                        logger.warn("Version mismatch - ${sessionInit.clientFlowClass} is only registered for version " +
+                                "${serviceFlowInfo.version}: $sessionInit")
+                        sendSessionReject("Version not supported")
+                        return
+                    }
+                    serviceFlowInfo.factory(sender)
+                }
+                is ServiceFlowInfo.Core -> serviceFlowInfo.factory(sender, receivedMessage.platformVersion)
+            }
+
+            if (flow.javaClass.isAnnotationPresent(FlowVersion::class.java)) {
+                logger.warn("${FlowVersion::class.java.name} is not applicable for service flows: ${flow.javaClass.name}")
+            }
+
             val fiber = createFiber(flow, FlowInitiator.Peer(sender))
             val session = FlowSession(flow, random63BitValue(), sender, FlowSessionState.Initiated(sender, otherPartySessionId))
             if (sessionInit.firstPayload != null) {
@@ -372,7 +386,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
     }
 
     private fun serializeFiber(fiber: FlowStateMachineImpl<*>): SerializedBytes<FlowStateMachineImpl<*>> {
-        return quasarKryo().run { kryo ->
+        return quasarKryoPool.run { kryo ->
             // add the map of tokens -> tokenizedServices to the kyro context
             kryo.withSerializationContext(serializationContext) {
                 fiber.serialize(kryo)
@@ -381,7 +395,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
     }
 
     private fun deserializeFiber(checkpoint: Checkpoint): FlowStateMachineImpl<*> {
-        return quasarKryo().run { kryo ->
+        return quasarKryoPool.run { kryo ->
             // put the map of token -> tokenized into the kryo context
             kryo.withSerializationContext(serializationContext) {
                 checkpoint.serializedFiber.deserialize(kryo)
@@ -389,8 +403,6 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
         }
     }
 
-    private fun quasarKryo(): KryoPool = quasarKryoPool
-
     private fun <T> createFiber(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachineImpl<T> {
         val id = StateMachineRunId.createRandom()
         return FlowStateMachineImpl(id, logic, scheduler, flowInitiator).apply { initFiber(this) }
diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt
index 128370e7b3..4d9d58de8d 100644
--- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt
@@ -25,7 +25,7 @@ class BFTNonValidatingNotaryService(services: ServiceHubInternal,
                                     timestampChecker: TimestampChecker,
                                     serverId: Int,
                                     db: Database,
-                                    val client: BFTSMaRt.Client) : NotaryService(services) {
+                                    val client: BFTSMaRt.Client) : NotaryService {
     init {
         thread(name = "BFTSmartServer-$serverId", isDaemon = true) {
             Server(serverId, db, "bft_smart_notary_committed_states", services, timestampChecker)
@@ -37,9 +37,11 @@ class BFTNonValidatingNotaryService(services: ServiceHubInternal,
         private val log = loggerFor<BFTNonValidatingNotaryService>()
     }
 
-    override fun createFlow(otherParty: Party) = ServiceFlow(otherParty, client)
+    override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
+        ServiceFlow(otherParty, client)
+    }
 
-    class ServiceFlow(val otherSide: Party, val client: BFTSMaRt.Client) : FlowLogic<Void?>() {
+    private class ServiceFlow(val otherSide: Party, val client: BFTSMaRt.Client) : FlowLogic<Void?>() {
         @Suspendable
         override fun call(): Void? {
             val stx = receive<FilteredTransaction>(otherSide).unwrap { it }
@@ -60,11 +62,11 @@ class BFTNonValidatingNotaryService(services: ServiceHubInternal,
         }
     }
 
-    class Server(id: Int,
-                 db: Database,
-                 tableName: String,
-                 services: ServiceHubInternal,
-                 timestampChecker: TimestampChecker) : BFTSMaRt.Server(id, db, tableName, services, timestampChecker) {
+    private class Server(id: Int,
+                         db: Database,
+                         tableName: String,
+                         services: ServiceHubInternal,
+                         timestampChecker: TimestampChecker) : BFTSMaRt.Server(id, db, tableName, services, timestampChecker) {
 
         override fun executeCommand(command: ByteArray): ByteArray {
             val request = command.deserialize<BFTSMaRt.CommitRequest>()
diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt
index 621e27737d..941caff33d 100644
--- a/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt
@@ -2,26 +2,13 @@ package net.corda.node.services.transactions
 
 import net.corda.core.crypto.Party
 import net.corda.core.flows.FlowLogic
-import net.corda.core.serialization.SingletonSerializeAsToken
-import net.corda.flows.NotaryFlow
-import net.corda.node.services.api.ServiceHubInternal
 
-/**
- * A Notary service acts as the final signer of a transaction ensuring two things:
- * - The (optional) timestamp of the transaction is valid.
- * - None of the referenced input states have previously been consumed by a transaction signed by this Notary
- *O
- * A transaction has to be signed by a Notary to be considered valid (except for output-only transactions without a timestamp).
- *
- * This is the base implementation that can be customised with specific Notary transaction commit flow.
- */
-abstract class NotaryService(services: ServiceHubInternal) : SingletonSerializeAsToken() {
-
-    init {
-        services.registerServiceFlow(NotaryFlow.Client::class.java) { createFlow(it) }
-    }
-
-    /** Implement a factory that specifies the transaction commit flow for the notary service to use */
-    abstract fun createFlow(otherParty: Party): FlowLogic<Void?>
+interface NotaryService {
 
+    /**
+     * Factory for producing notary service flows which have the corresponding sends and receives as NotaryFlow.Client.
+     * The first parameter is the client [Party] making the request and the second is the platform version of the client's
+     * node. Use this version parameter to provide backwards compatibility if the notary flow protocol changes.
+     */
+    val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?>
 }
diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt
index 0fc78c605a..e61eafc079 100644
--- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt
@@ -1,19 +1,18 @@
 package net.corda.node.services.transactions
 
 import net.corda.core.crypto.Party
+import net.corda.core.flows.FlowLogic
 import net.corda.core.node.services.TimestampChecker
 import net.corda.flows.NonValidatingNotaryFlow
-import net.corda.node.services.api.ServiceHubInternal
 
 /** A non-validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */
-class RaftNonValidatingNotaryService(services: ServiceHubInternal,
-                                     val timestampChecker: TimestampChecker,
-                                     val uniquenessProvider: RaftUniquenessProvider) : NotaryService(services) {
+class RaftNonValidatingNotaryService(val timestampChecker: TimestampChecker,
+                                     val uniquenessProvider: RaftUniquenessProvider) : NotaryService {
     companion object {
         val type = SimpleNotaryService.type.getSubType("raft")
     }
 
-    override fun createFlow(otherParty: Party): NonValidatingNotaryFlow {
-        return NonValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
+    override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
+        NonValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
     }
 }
diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt
index 3ef3e1610f..bb52ee9d5b 100644
--- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt
@@ -1,19 +1,18 @@
 package net.corda.node.services.transactions
 
 import net.corda.core.crypto.Party
+import net.corda.core.flows.FlowLogic
 import net.corda.core.node.services.TimestampChecker
 import net.corda.flows.ValidatingNotaryFlow
-import net.corda.node.services.api.ServiceHubInternal
 
 /** A validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */
-class RaftValidatingNotaryService(services: ServiceHubInternal,
-                                  val timestampChecker: TimestampChecker,
-                                  val uniquenessProvider: RaftUniquenessProvider) : NotaryService(services) {
+class RaftValidatingNotaryService(val timestampChecker: TimestampChecker,
+                                  val uniquenessProvider: RaftUniquenessProvider) : NotaryService {
     companion object {
         val type = ValidatingNotaryService.type.getSubType("raft")
     }
 
-    override fun createFlow(otherParty: Party): ValidatingNotaryFlow {
-        return ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
+    override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
+        ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
     }
 }
diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt
index 062d14d2a5..722a1fed2c 100644
--- a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt
@@ -1,22 +1,20 @@
 package net.corda.node.services.transactions
 
 import net.corda.core.crypto.Party
+import net.corda.core.flows.FlowLogic
 import net.corda.core.node.services.ServiceType
 import net.corda.core.node.services.TimestampChecker
 import net.corda.core.node.services.UniquenessProvider
 import net.corda.flows.NonValidatingNotaryFlow
-import net.corda.flows.NotaryFlow
-import net.corda.node.services.api.ServiceHubInternal
 
 /** A simple Notary service that does not perform transaction validation */
-class SimpleNotaryService(services: ServiceHubInternal,
-                          val timestampChecker: TimestampChecker,
-                          val uniquenessProvider: UniquenessProvider) : NotaryService(services) {
+class SimpleNotaryService(val timestampChecker: TimestampChecker,
+                          val uniquenessProvider: UniquenessProvider) : NotaryService {
     companion object {
         val type = ServiceType.notary.getSubType("simple")
     }
 
-    override fun createFlow(otherParty: Party): NotaryFlow.Service {
-        return NonValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
+    override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
+        NonValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
     }
 }
diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt
index 96d1f1fc72..2b1000983f 100644
--- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt
@@ -1,21 +1,20 @@
 package net.corda.node.services.transactions
 
 import net.corda.core.crypto.Party
+import net.corda.core.flows.FlowLogic
 import net.corda.core.node.services.ServiceType
 import net.corda.core.node.services.TimestampChecker
 import net.corda.core.node.services.UniquenessProvider
 import net.corda.flows.ValidatingNotaryFlow
-import net.corda.node.services.api.ServiceHubInternal
 
 /** A Notary service that validates the transaction chain of the submitted transaction before committing it */
-class ValidatingNotaryService(services: ServiceHubInternal,
-                              val timestampChecker: TimestampChecker,
-                              val uniquenessProvider: UniquenessProvider) : NotaryService(services) {
+class ValidatingNotaryService(val timestampChecker: TimestampChecker,
+                              val uniquenessProvider: UniquenessProvider) : NotaryService {
     companion object {
         val type = ServiceType.notary.getSubType("validating")
     }
 
-    override fun createFlow(otherParty: Party): ValidatingNotaryFlow {
-        return ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
+    override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
+        ValidatingNotaryFlow(otherParty, timestampChecker, uniquenessProvider)
     }
 }
diff --git a/node/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry b/node/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry
deleted file mode 100644
index 884cc0cfae..0000000000
--- a/node/src/main/resources/META-INF/services/net.corda.core.node.CordaPluginRegistry
+++ /dev/null
@@ -1,3 +0,0 @@
-# Register a ServiceLoader service extending from net.corda.core.node.CordaPluginRegistry
-net.corda.node.services.NotaryChange$Plugin
-net.corda.node.services.persistence.DataVending$Plugin
\ No newline at end of file
diff --git a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt
index 8fe78212f5..a89f2d05a4 100644
--- a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt
+++ b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt
@@ -9,12 +9,12 @@ import net.corda.core.flows.FlowStateMachine
 import net.corda.core.node.NodeInfo
 import net.corda.core.node.services.*
 import net.corda.core.transactions.SignedTransaction
+import net.corda.node.internal.ServiceFlowInfo
 import net.corda.node.serialization.NodeClock
 import net.corda.node.services.api.MessagingServiceInternal
 import net.corda.node.services.api.MonitoringService
 import net.corda.node.services.api.SchemaService
 import net.corda.node.services.api.ServiceHubInternal
-import net.corda.node.services.persistence.DataVending
 import net.corda.node.services.schema.NodeSchemaService
 import net.corda.node.services.statemachine.StateMachineManager
 import net.corda.node.services.transactions.InMemoryTransactionVerifierService
@@ -69,14 +69,6 @@ open class MockServiceHubInternal(
 
     lateinit var smm: StateMachineManager
 
-    init {
-        if (net != null && storage != null) {
-            // Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener
-            // on the networking service, so that will keep it from being collected.
-            DataVending.Service(this)
-        }
-    }
-
     override fun recordTransactions(txs: Iterable<SignedTransaction>) = recordTransactionsInternal(txStorageService, txs)
 
     override fun <T> startFlow(logic: FlowLogic<T>, flowInitiator: FlowInitiator): FlowStateMachine<T> {
@@ -85,5 +77,5 @@ open class MockServiceHubInternal(
 
     override fun registerServiceFlow(clientFlowClass: Class<out FlowLogic<*>>, serviceFlowFactory: (Party) -> FlowLogic<*>) = Unit
 
-    override fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ((Party) -> FlowLogic<*>)? = null
+    override fun getServiceFlowFactory(clientFlowClass: Class<out FlowLogic<*>>): ServiceFlowInfo? = null
 }
diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt
index 97f60a1ccb..e87e35d961 100644
--- a/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt
+++ b/node/src/test/kotlin/net/corda/node/services/persistence/DataVendingServiceTests.kt
@@ -12,7 +12,6 @@ import net.corda.core.node.services.unconsumedStates
 import net.corda.core.transactions.SignedTransaction
 import net.corda.core.utilities.DUMMY_NOTARY
 import net.corda.flows.BroadcastTransactionFlow.NotifyTxRequest
-import net.corda.node.services.persistence.DataVending.Service.NotifyTransactionHandler
 import net.corda.node.utilities.transaction
 import net.corda.testing.MEGA_CORP
 import net.corda.testing.node.MockNetwork
@@ -89,7 +88,7 @@ class DataVendingServiceTests {
     }
 
     private fun MockNode.sendNotifyTx(tx: SignedTransaction, walletServiceNode: MockNode) {
-        walletServiceNode.services.registerServiceFlow(NotifyTxFlow::class.java, ::NotifyTransactionHandler)
+        walletServiceNode.registerServiceFlow(clientFlowClass = NotifyTxFlow::class, serviceFlowFactory = ::NotifyTransactionHandler)
         services.startFlow(NotifyTxFlow(walletServiceNode.info.legalIdentity, tx))
         network.runNetwork()
     }
diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt
index 2b903c9b60..aadbf6d472 100644
--- a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt
+++ b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt
@@ -11,6 +11,7 @@ import net.corda.core.crypto.Party
 import net.corda.core.crypto.generateKeyPair
 import net.corda.core.flows.FlowException
 import net.corda.core.flows.FlowLogic
+import net.corda.core.flows.FlowVersion
 import net.corda.core.messaging.MessageRecipients
 import net.corda.core.node.services.PartyInfo
 import net.corda.core.node.services.ServiceInfo
@@ -110,7 +111,7 @@ class StateMachineManagerTests {
 
     @Test
     fun `exception while fiber suspended`() {
-        node2.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow("Hello", it) }
+        node2.registerServiceFlow(ReceiveFlow::class) { SendFlow("Hello", it) }
         val flow = ReceiveFlow(node2.info.legalIdentity)
         val fiber = node1.services.startFlow(flow) as FlowStateMachineImpl
         // Before the flow runs change the suspend action to throw an exception
@@ -129,7 +130,7 @@ class StateMachineManagerTests {
 
     @Test
     fun `flow restarted just after receiving payload`() {
-        node2.services.registerServiceFlow(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() }
+        node2.registerServiceFlow(SendFlow::class) { ReceiveFlow(it).nonTerminating() }
         node1.services.startFlow(SendFlow("Hello", node2.info.legalIdentity))
 
         // We push through just enough messages to get only the payload sent
@@ -179,7 +180,7 @@ class StateMachineManagerTests {
 
     @Test
     fun `flow loaded from checkpoint will respond to messages from before start`() {
-        node1.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow("Hello", it) }
+        node1.registerServiceFlow(ReceiveFlow::class) { SendFlow("Hello", it) }
         node2.services.startFlow(ReceiveFlow(node1.info.legalIdentity).nonTerminating()) // Prepare checkpointed receive flow
         // Make sure the add() has finished initial processing.
         node2.smm.executor.flush()
@@ -243,8 +244,8 @@ class StateMachineManagerTests {
     fun `sending to multiple parties`() {
         val node3 = net.createNode(node1.info.address)
         net.runNetwork()
-        node2.services.registerServiceFlow(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() }
-        node3.services.registerServiceFlow(SendFlow::class.java) { ReceiveFlow(it).nonTerminating() }
+        node2.registerServiceFlow(SendFlow::class) { ReceiveFlow(it).nonTerminating() }
+        node3.registerServiceFlow(SendFlow::class) { ReceiveFlow(it).nonTerminating() }
         val payload = "Hello World"
         node1.services.startFlow(SendFlow(payload, node2.info.legalIdentity, node3.info.legalIdentity))
         net.runNetwork()
@@ -254,14 +255,14 @@ class StateMachineManagerTests {
         assertThat(node3Flow.receivedPayloads[0]).isEqualTo(payload)
 
         assertSessionTransfers(node2,
-                node1 sent sessionInit(SendFlow::class, payload) to node2,
+                node1 sent sessionInit(SendFlow::class, 1, payload) to node2,
                 node2 sent sessionConfirm to node1,
                 node1 sent normalEnd to node2
                 //There's no session end from the other flows as they're manually suspended
         )
 
         assertSessionTransfers(node3,
-                node1 sent sessionInit(SendFlow::class, payload) to node3,
+                node1 sent sessionInit(SendFlow::class, 1, payload) to node3,
                 node3 sent sessionConfirm to node1,
                 node1 sent normalEnd to node3
                 //There's no session end from the other flows as they're manually suspended
@@ -277,8 +278,8 @@ class StateMachineManagerTests {
         net.runNetwork()
         val node2Payload = "Test 1"
         val node3Payload = "Test 2"
-        node2.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow(node2Payload, it) }
-        node3.services.registerServiceFlow(ReceiveFlow::class.java) { SendFlow(node3Payload, it) }
+        node2.registerServiceFlow(ReceiveFlow::class) { SendFlow(node2Payload, it) }
+        node3.registerServiceFlow(ReceiveFlow::class) { SendFlow(node3Payload, it) }
         val multiReceiveFlow = ReceiveFlow(node2.info.legalIdentity, node3.info.legalIdentity).nonTerminating()
         node1.services.startFlow(multiReceiveFlow)
         node1.acceptableLiveFiberCountOnStop = 1
@@ -303,12 +304,12 @@ class StateMachineManagerTests {
 
     @Test
     fun `both sides do a send as their first IO request`() {
-        node2.services.registerServiceFlow(PingPongFlow::class.java) { PingPongFlow(it, 20L) }
+        node2.registerServiceFlow(PingPongFlow::class) { PingPongFlow(it, 20L) }
         node1.services.startFlow(PingPongFlow(node2.info.legalIdentity, 10L))
         net.runNetwork()
 
         assertSessionTransfers(
-                node1 sent sessionInit(PingPongFlow::class, 10L) to node2,
+                node1 sent sessionInit(PingPongFlow::class, 1, 10L) to node2,
                 node2 sent sessionConfirm to node1,
                 node2 sent sessionData(20L) to node1,
                 node1 sent sessionData(11L) to node2,
@@ -374,7 +375,7 @@ class StateMachineManagerTests {
 
     @Test
     fun `other side ends before doing expected send`() {
-        node2.services.registerServiceFlow(ReceiveFlow::class.java) { NoOpFlow() }
+        node2.registerServiceFlow(ReceiveFlow::class) { NoOpFlow() }
         val resultFuture = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity)).resultFuture
         net.runNetwork()
         assertThatExceptionOfType(FlowSessionException::class.java).isThrownBy {
@@ -534,7 +535,7 @@ class StateMachineManagerTests {
             }
         }
 
-        node2.services.registerServiceFlow(AskForExceptionFlow::class.java) { ConditionalExceptionFlow(it, "Hello") }
+        node2.registerServiceFlow(AskForExceptionFlow::class) { ConditionalExceptionFlow(it, "Hello") }
         val resultFuture = node1.services.startFlow(RetryOnExceptionFlow(node2.info.legalIdentity)).resultFuture
         net.runNetwork()
         assertThat(resultFuture.getOrThrow()).isEqualTo("Hello")
@@ -562,7 +563,7 @@ class StateMachineManagerTests {
         ptx.signWith(node1.services.legalIdentityKey)
         val stx = ptx.toSignedTransaction()
 
-        node1.services.registerServiceFlow(WaitingFlows.Waiter::class.java) {
+        node1.registerServiceFlow(WaitingFlows.Waiter::class) {
             WaitingFlows.Committer(it) { throw Exception("Error") }
         }
         val waiter = node2.services.startFlow(WaitingFlows.Waiter(stx, node1.info.legalIdentity)).resultFuture
@@ -587,6 +588,31 @@ class StateMachineManagerTests {
         assertThat(receiveFlowFuture.getOrThrow().receivedPayloads).containsOnly("Hello")
     }
 
+    @Test
+    fun `upgraded flow`() {
+        node1.services.startFlow(UpgradedFlow(node2.info.legalIdentity))
+        net.runNetwork()
+        assertThat(sessionTransfers).startsWith(
+                node1 sent sessionInit(UpgradedFlow::class, 2) to node2
+        )
+    }
+
+    @Test
+    fun `unsupported new flow version`() {
+        node2.registerServiceFlow(UpgradedFlow::class, flowVersion = 1) { SendFlow("Hello", it) }
+        val result = node1.services.startFlow(UpgradedFlow(node2.info.legalIdentity)).resultFuture
+        net.runNetwork()
+        assertThatExceptionOfType(FlowSessionException::class.java).isThrownBy {
+            result.getOrThrow()
+        }.withMessageContaining("Version")
+    }
+
+    @FlowVersion(2)
+    private class UpgradedFlow(val otherParty: Party) : FlowLogic<Any>() {
+        @Suspendable
+        override fun call(): Any = receive<Any>(otherParty).unwrap { it }
+    }
+
 
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////
     //region Helpers
@@ -605,8 +631,8 @@ class StateMachineManagerTests {
         return smm.findStateMachines(P::class.java).single()
     }
 
-    private fun sessionInit(clientFlowClass: KClass<out FlowLogic<*>>, payload: Any? = null): SessionInit {
-        return SessionInit(0, clientFlowClass.java, payload)
+    private fun sessionInit(clientFlowClass: KClass<out FlowLogic<*>>, flowVersion: Int = 1, payload: Any? = null): SessionInit {
+        return SessionInit(0, clientFlowClass.java, flowVersion, payload)
     }
     private val sessionConfirm = SessionConfirm(0, 0)
     private fun sessionData(payload: Any) = SessionData(0, payload)
diff --git a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt
index 76b198eac8..37c68fba2d 100644
--- a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt
+++ b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt
@@ -30,7 +30,7 @@ class TraderDemoTest : NodeBasedTest() {
                 startNode(DUMMY_BANK_A.name, rpcUsers = demoUser),
                 startNode(DUMMY_BANK_B.name, rpcUsers = demoUser),
                 startNode(BOC.name, rpcUsers = listOf(user)),
-                startNode(DUMMY_NOTARY.name, setOf(ServiceInfo(SimpleNotaryService.type)))
+                startNode(DUMMY_NOTARY.name, advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
         ).getOrThrow()
 
         val (nodeARpc, nodeBRpc) = listOf(nodeA, nodeB).map {
diff --git a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt
index b64f505a62..c158a06b21 100644
--- a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt
+++ b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt
@@ -145,9 +145,9 @@ fun getFreeLocalPorts(hostName: String, numberToAlloc: Int): List<HostAndPort> {
  */
 inline fun <reified P : FlowLogic<*>> AbstractNode.initiateSingleShotFlow(
         clientFlowClass: KClass<out FlowLogic<*>>,
-        noinline flowFactory: (Party) -> P): ListenableFuture<P> {
+        noinline serviceFlowFactory: (Party) -> P): ListenableFuture<P> {
     val future = smm.changes.filter { it is StateMachineManager.Change.Add && it.logic is P }.map { it.logic as P }.toFuture()
-    services.registerServiceFlow(clientFlowClass.java, flowFactory)
+    services.registerServiceFlow(clientFlowClass.java, serviceFlowFactory)
     return future
 }
 
diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt
index 9cbc1aa1d8..33f1bcb25d 100644
--- a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt
+++ b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt
@@ -272,12 +272,20 @@ class InMemoryMessagingNetwork(
     }
 
     @CordaSerializable
-    private data class InMemoryMessage(override val topicSession: TopicSession, override val data: ByteArray, override val uniqueMessageId: UUID, override val debugTimestamp: Instant = Instant.now()) : Message {
+    private data class InMemoryMessage(override val topicSession: TopicSession,
+                                       override val data: ByteArray,
+                                       override val uniqueMessageId: UUID,
+                                       override val debugTimestamp: Instant = Instant.now()) : Message {
         override fun toString() = "$topicSession#${String(data)}"
     }
 
     @CordaSerializable
-    private data class InMemoryReceivedMessage(override val topicSession: TopicSession, override val data: ByteArray, override val uniqueMessageId: UUID, override val debugTimestamp: Instant, override val peer: X500Name) : ReceivedMessage
+    private data class InMemoryReceivedMessage(override val topicSession: TopicSession,
+                                               override val data: ByteArray,
+                                               override val platformVersion: Int,
+                                               override val uniqueMessageId: UUID,
+                                               override val debugTimestamp: Instant,
+                                               override val peer: X500Name) : ReceivedMessage
 
     /**
      * An [InMemoryMessaging] provides a [MessagingService] that isn't backed by any kind of network or disk storage
@@ -453,6 +461,9 @@ class InMemoryMessagingNetwork(
         private fun MessageTransfer.toReceivedMessage(): ReceivedMessage = InMemoryReceivedMessage(
                 message.topicSession,
                 message.data.copyOf(), // Kryo messes with the buffer so give each client a unique copy
-                message.uniqueMessageId, message.debugTimestamp, X509Utilities.getDevX509Name(sender.description))
+                1,
+                message.uniqueMessageId,
+                message.debugTimestamp,
+                X509Utilities.getDevX509Name(sender.description))
     }
 }
diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt
index 75e1dff548..6ecba4a8f4 100644
--- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt
+++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt
@@ -1,5 +1,6 @@
 package net.corda.testing.node
 
+import com.google.common.annotations.VisibleForTesting
 import com.google.common.jimfs.Configuration.unix
 import com.google.common.jimfs.Jimfs
 import com.google.common.util.concurrent.Futures
@@ -7,6 +8,7 @@ import com.google.common.util.concurrent.ListenableFuture
 import net.corda.core.*
 import net.corda.core.crypto.Party
 import net.corda.core.crypto.entropyToKeyPair
+import net.corda.core.flows.FlowLogic
 import net.corda.core.messaging.RPCOps
 import net.corda.core.messaging.SingleMessageRecipient
 import net.corda.core.node.CordaPluginRegistry
@@ -16,11 +18,13 @@ import net.corda.core.node.services.*
 import net.corda.core.utilities.DUMMY_NOTARY_KEY
 import net.corda.core.utilities.loggerFor
 import net.corda.node.internal.AbstractNode
+import net.corda.node.internal.ServiceFlowInfo
 import net.corda.node.services.api.MessagingServiceInternal
 import net.corda.node.services.config.NodeConfiguration
 import net.corda.node.services.keys.E2ETestKeyManagementService
 import net.corda.node.services.network.InMemoryNetworkMapService
 import net.corda.node.services.network.NetworkMapService
+import net.corda.node.services.statemachine.flowVersion
 import net.corda.node.services.transactions.InMemoryTransactionVerifierService
 import net.corda.node.services.transactions.InMemoryUniquenessProvider
 import net.corda.node.services.transactions.SimpleNotaryService
@@ -38,6 +42,7 @@ import java.security.KeyPair
 import java.util.*
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
+import kotlin.reflect.KClass
 
 /**
  * A mock node brings up a suite of in-memory services in a fast manner suitable for unit testing.
@@ -224,6 +229,13 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
         // It is used from the network visualiser tool.
         @Suppress("unused") val place: PhysicalLocation get() = findMyLocation()!!
 
+        @VisibleForTesting
+        fun registerServiceFlow(clientFlowClass: KClass<out FlowLogic<*>>,
+                                flowVersion: Int = clientFlowClass.java.flowVersion,
+                                serviceFlowFactory: (Party) -> FlowLogic<*>) {
+            serviceFlowFactories[clientFlowClass.java] = ServiceFlowInfo.CorDapp(flowVersion, serviceFlowFactory)
+        }
+
         fun pumpReceive(block: Boolean = false): InMemoryMessagingNetwork.MessageTransfer? {
             return (net as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(block)
         }
diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt
index fdac833c6b..cb093a4460 100644
--- a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt
+++ b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt
@@ -60,21 +60,24 @@ abstract class NodeBasedTest {
      * will automatically be started with the default parameters.
      */
     fun startNetworkMapNode(legalName: String = DUMMY_MAP.name,
+                            platformVersion: Int = 1,
                             advertisedServices: Set<ServiceInfo> = emptySet(),
                             rpcUsers: List<User> = emptyList(),
                             configOverrides: Map<String, Any> = emptyMap()): Node {
         check(_networkMapNode == null)
-        return startNodeInternal(legalName, advertisedServices, rpcUsers, configOverrides).apply {
+        return startNodeInternal(legalName, platformVersion, advertisedServices, rpcUsers, configOverrides).apply {
             _networkMapNode = this
         }
     }
 
     fun startNode(legalName: String,
+                  platformVersion: Int = 1,
                   advertisedServices: Set<ServiceInfo> = emptySet(),
                   rpcUsers: List<User> = emptyList(),
                   configOverrides: Map<String, Any> = emptyMap()): ListenableFuture<Node> {
         val node = startNodeInternal(
                 legalName,
+                platformVersion,
                 advertisedServices,
                 rpcUsers,
                 mapOf(
@@ -118,6 +121,7 @@ abstract class NodeBasedTest {
     }
 
     private fun startNodeInternal(legalName: String,
+                                  platformVersion: Int,
                                   advertisedServices: Set<ServiceInfo>,
                                   rpcUsers: List<User>,
                                   configOverrides: Map<String, Any>): Node {
@@ -141,7 +145,7 @@ abstract class NodeBasedTest {
                 ) + configOverrides
         )
 
-        val node = config.parseAs<FullNodeConfiguration>().createNode(MOCK_VERSION_INFO)
+        val node = config.parseAs<FullNodeConfiguration>().createNode(MOCK_VERSION_INFO.copy(platformVersion = platformVersion))
         node.start()
         nodes += node
         thread(name = legalName) {