Introducing Platform Version and its use by the NMS for min version requirements for the network

This commit is contained in:
Shams Asari
2017-04-19 11:05:27 +01:00
parent 684d1089f0
commit b5e022f350
35 changed files with 247 additions and 237 deletions

View File

@ -15,7 +15,7 @@ import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.network.NetworkMapService.RegistrationRequest
import net.corda.node.services.network.NodeRegistration
import net.corda.node.utilities.AddOrRemove
import net.corda.testing.MOCK_VERSION
import net.corda.testing.MOCK_VERSION_INFO
import net.corda.testing.TestNodeConfiguration
import net.corda.testing.node.NodeBasedTest
import net.corda.testing.node.SimpleNode
@ -63,7 +63,7 @@ class P2PSecurityTest : NodeBasedTest() {
}
private fun SimpleNode.registerWithNetworkMap(registrationName: String): ListenableFuture<NetworkMapService.RegistrationResponse> {
val nodeInfo = NodeInfo(net.myAddress, Party(registrationName, identity.public), MOCK_VERSION)
val nodeInfo = NodeInfo(net.myAddress, Party(registrationName, identity.public), MOCK_VERSION_INFO.platformVersion)
val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX)
val request = RegistrationRequest(registration.toWire(identity.private), net.myAddress)
return net.sendRequest<NetworkMapService.RegistrationResponse>(NetworkMapService.REGISTER_TOPIC, request, networkMapNode.net.myAddress)

View File

@ -6,8 +6,7 @@ import com.jcabi.manifests.Manifests
import com.typesafe.config.ConfigException
import joptsimple.OptionException
import net.corda.core.*
import net.corda.core.node.NodeVersionInfo
import net.corda.core.node.Version
import net.corda.core.node.VersionInfo
import net.corda.core.utilities.Emoji
import net.corda.core.utilities.LogHelper.withLevel
import net.corda.node.internal.Node
@ -69,15 +68,17 @@ fun main(args: Array<String>) {
// Manifest properties are only available if running from the corda jar
fun manifestValue(name: String): String? = if (Manifests.exists(name)) Manifests.read(name) else null
val nodeVersionInfo = NodeVersionInfo(
manifestValue("Corda-Version")?.let { Version.parse(it) } ?: Version(0, 0, 0, false),
val versionInfo = VersionInfo(
manifestValue("Corda-Platform-Version")?.toInt() ?: 1,
manifestValue("Corda-Release-Version") ?: "Unknown",
manifestValue("Corda-Revision") ?: "Unknown",
manifestValue("Corda-Vendor") ?: "Unknown"
)
if (cmdlineOptions.isVersion) {
println("${nodeVersionInfo.vendor} ${nodeVersionInfo.version}")
println("Revision ${nodeVersionInfo.revision}")
println("${versionInfo.vendor} ${versionInfo.releaseVersion}")
println("Revision ${versionInfo.revision}")
println("Platform Version ${versionInfo.platformVersion}")
exitProcess(0)
}
@ -87,7 +88,7 @@ fun main(args: Array<String>) {
exitProcess(0)
}
drawBanner(nodeVersionInfo)
drawBanner(versionInfo)
val log = LoggerFactory.getLogger("Main")
printBasicNodeInfo("Logs can be found in", System.getProperty("log-path"))
@ -110,9 +111,10 @@ fun main(args: Array<String>) {
exitProcess(0)
}
log.info("Version: ${nodeVersionInfo.version}")
log.info("Vendor: ${nodeVersionInfo.vendor}")
log.info("Revision: ${nodeVersionInfo.revision}")
log.info("Vendor: ${versionInfo.vendor}")
log.info("Release: ${versionInfo.releaseVersion}")
log.info("Platform Version: ${versionInfo.platformVersion}")
log.info("Revision: ${versionInfo.revision}")
val info = ManagementFactory.getRuntimeMXBean()
log.info("PID: ${info.name.split("@").firstOrNull()}") // TODO Java 9 has better support for this
log.info("Main class: ${FullNodeConfiguration::class.java.protectionDomain.codeSource.location.toURI().path}")
@ -132,7 +134,7 @@ fun main(args: Array<String>) {
try {
cmdlineOptions.baseDirectory.createDirectories()
val node = conf.createNode(nodeVersionInfo)
val node = conf.createNode(versionInfo)
node.start()
printPluginsAndServices(node)
@ -236,7 +238,7 @@ private fun messageOfTheDay(): Pair<String, String> {
return Pair(a, b)
}
private fun drawBanner(nodeVersionInfo: NodeVersionInfo) {
private fun drawBanner(versionInfo: VersionInfo) {
// This line makes sure ANSI escapes work on Windows, where they aren't supported out of the box.
AnsiConsole.systemInstall()
@ -249,7 +251,7 @@ private fun drawBanner(nodeVersionInfo: NodeVersionInfo) {
/ / __ / ___/ __ / __ `/ """).fgBrightBlue().a(msg1).newline().fgBrightRed().a(
"/ /___ /_/ / / / /_/ / /_/ / ").fgBrightBlue().a(msg2).newline().fgBrightRed().a(
"""\____/ /_/ \__,_/\__,_/""").reset().newline().newline().fgBrightDefault().bold().
a("--- ${nodeVersionInfo.vendor} ${nodeVersionInfo.version} (${nodeVersionInfo.revision.take(7)}) -----------------------------------------------").
a("--- ${versionInfo.vendor} ${versionInfo.releaseVersion} (${versionInfo.revision.take(7)}) -----------------------------------------------").
newline().
newline().
a("${Emoji.books}New! ").reset().a("Training now available worldwide, see https://corda.net/corda-training/").

View File

@ -101,7 +101,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
protected abstract val log: Logger
protected abstract val networkMapAddress: SingleMessageRecipient?
protected abstract val version: Version
protected abstract val platformVersion: Int
// We will run as much stuff in this single thread as possible to keep the risk of thread safety bugs low during the
// low-performance prototyping period.
@ -296,7 +296,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
private fun makeInfo(): NodeInfo {
val advertisedServiceEntries = makeServiceEntries()
val legalIdentity = obtainLegalIdentity()
return NodeInfo(net.myAddress, legalIdentity, version, advertisedServiceEntries, findMyLocation())
return NodeInfo(net.myAddress, legalIdentity, platformVersion, advertisedServiceEntries, findMyLocation())
}
/**
@ -445,7 +445,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
protected open fun makeKeyManagementService(): KeyManagementService = PersistentKeyManagementService(partyKeys)
open protected fun makeNetworkMapService() {
inNodeNetworkMapService = PersistentNetworkMapService(services)
inNodeNetworkMapService = PersistentNetworkMapService(services, configuration.minimumPlatformVersion)
}
open protected fun makeNotaryService(type: ServiceType, tokenizableServices: MutableList<Any>): NotaryService {

View File

@ -5,14 +5,15 @@ import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.*
import net.corda.core.div
import net.corda.core.flatMap
import net.corda.core.messaging.RPCOps
import net.corda.core.node.NodeVersionInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.Version
import net.corda.core.node.VersionInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.success
import net.corda.core.utilities.loggerFor
import net.corda.node.printBasicNodeInfo
import net.corda.node.serialization.NodeClock
@ -48,14 +49,14 @@ import kotlin.concurrent.thread
*/
class Node(override val configuration: FullNodeConfiguration,
advertisedServices: Set<ServiceInfo>,
val nodeVersionInfo: NodeVersionInfo,
val versionInfo: VersionInfo,
clock: Clock = NodeClock()) : AbstractNode(configuration, advertisedServices, clock) {
companion object {
private val logger = loggerFor<Node>()
}
override val log: Logger get() = logger
override val version: Version get() = nodeVersionInfo.version
override val platformVersion: Int get() = versionInfo.platformVersion
override val networkMapAddress: NetworkMapAddress? get() = configuration.networkMapService?.address?.let(::NetworkMapAddress)
override fun makeTransactionVerifierService() = (net as NodeMessagingClient).verifierService
@ -108,32 +109,13 @@ class Node(override val configuration: FullNodeConfiguration,
private lateinit var userService: RPCUserService
init {
checkVersionUnchanged()
}
/**
* Abort starting the node if an existing deployment with a different version is detected in the base directory.
*/
private fun checkVersionUnchanged() {
val versionFile = configuration.baseDirectory / "version"
if (versionFile.exists()) {
val previousVersion = Version.parse(versionFile.readAllLines()[0])
check(nodeVersionInfo.version.major == previousVersion.major) {
"Major version change detected - current: ${nodeVersionInfo.version}, previous: $previousVersion. " +
"Node upgrades across major versions are not yet supported."
}
}
versionFile.writeLines(listOf(nodeVersionInfo.version.toString()))
}
override fun makeMessagingService(): MessagingServiceInternal {
userService = RPCUserServiceImpl(configuration.rpcUsers)
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) obtainLegalIdentity().owningKey else null
return NodeMessagingClient(
configuration,
nodeVersionInfo,
versionInfo,
serverAddress,
myIdentityOrNullIfNetworkMapService,
serverThread,

View File

@ -2,7 +2,7 @@ package net.corda.node.services.config
import com.google.common.net.HostAndPort
import net.corda.core.div
import net.corda.core.node.NodeVersionInfo
import net.corda.core.node.VersionInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.node.internal.NetworkMapInfo
import net.corda.node.internal.Node
@ -22,6 +22,7 @@ interface NodeConfiguration : SSLConfiguration {
override val certificatesDirectory: Path get() = baseDirectory / "certificates"
val myLegalName: String
val networkMapService: NetworkMapInfo?
val minimumPlatformVersion: Int
val nearestCity: String
val emailAddress: String
val exportJMXto: String
@ -47,6 +48,7 @@ data class FullNodeConfiguration(
override val dataSourceProperties: Properties,
override val certificateSigningService: URL,
override val networkMapService: NetworkMapInfo?,
override val minimumPlatformVersion: Int = 1,
override val rpcUsers: List<User>,
override val verifierType: VerifierType,
val useHTTPS: Boolean,
@ -78,14 +80,14 @@ data class FullNodeConfiguration(
}
}
fun createNode(nodeVersionInfo: NodeVersionInfo): Node {
fun createNode(versionInfo: VersionInfo): Node {
val advertisedServices = extraAdvertisedServiceIds
.filter(String::isNotBlank)
.map { ServiceInfo.parse(it) }
.toMutableSet()
if (networkMapService == null) advertisedServices += ServiceInfo(NetworkMapService.type)
return Node(this, advertisedServices, nodeVersionInfo, if (useTestClock) TestClock() else NodeClock())
return Node(this, advertisedServices, versionInfo, if (useTestClock) TestClock() else NodeClock())
}
}

View File

@ -4,7 +4,7 @@ import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.ThreadBox
import net.corda.core.messaging.*
import net.corda.core.node.NodeVersionInfo
import net.corda.core.node.VersionInfo
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.random63BitValue
@ -68,7 +68,7 @@ import java.security.PublicKey
*/
@ThreadSafe
class NodeMessagingClient(override val config: NodeConfiguration,
nodeVersionInfo: NodeVersionInfo,
val versionInfo: VersionInfo,
val serverHostPort: HostAndPort,
val myIdentity: PublicKey?,
val nodeExecutor: AffinityExecutor,
@ -85,9 +85,10 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// confusion.
private val topicProperty = SimpleString("platform-topic")
private val sessionIdProperty = SimpleString("session-id")
private val nodeVersionProperty = SimpleString("node-version")
private val nodeVendorProperty = SimpleString("node-vendor")
private val amqDelay: Int = Integer.valueOf(System.getProperty("amq.delivery.delay.ms", "0"))
private val cordaVendorProperty = SimpleString("corda-vendor")
private val releaseVersionProperty = SimpleString("release-version")
private val platformVersionProperty = SimpleString("platform-version")
private val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt()
private val verifierResponseAddress = "$VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX.${random63BitValue()}"
}
@ -114,8 +115,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
data class Handler(val topicSession: TopicSession,
val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
private val nodeVendor = SimpleString(nodeVersionInfo.vendor)
private val version = SimpleString(nodeVersionInfo.version.toString())
private val cordaVendor = SimpleString(versionInfo.vendor)
private val releaseVersion = SimpleString(versionInfo.releaseVersion)
/** An executor for sending messages */
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1)
@ -409,8 +410,9 @@ class NodeMessagingClient(override val config: NodeConfiguration,
state.locked {
val mqAddress = getMQAddress(target)
val artemisMessage = session!!.createMessage(true).apply {
putStringProperty(nodeVendorProperty, nodeVendor)
putStringProperty(nodeVersionProperty, version)
putStringProperty(cordaVendorProperty, cordaVendor)
putStringProperty(releaseVersionProperty, releaseVersion)
putIntProperty(platformVersionProperty, versionInfo.platformVersion)
putStringProperty(topicProperty, SimpleString(message.topicSession.topic))
putLongProperty(sessionIdProperty, message.topicSession.sessionID)
writeBodyBufferBytes(message.data)
@ -418,8 +420,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString()))
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
if (amqDelay > 0 && message.topicSession.topic == StateMachineManager.sessionTopic.topic) {
putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelay)
if (amqDelayMillis > 0 && message.topicSession.topic == StateMachineManager.sessionTopic.topic) {
putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelayMillis)
}
}
log.trace {

View File

@ -109,7 +109,8 @@ interface NetworkMapService {
}
@ThreadSafe
class InMemoryNetworkMapService(services: ServiceHubInternal) : AbstractNetworkMapService(services) {
class InMemoryNetworkMapService(services: ServiceHubInternal, minimumPlatformVersion: Int)
: AbstractNetworkMapService(services, minimumPlatformVersion) {
override val nodeRegistrations: MutableMap<Party, NodeRegistrationInfo> = ConcurrentHashMap()
override val subscribers = ThreadBox(mutableMapOf<SingleMessageRecipient, LastAcknowledgeInfo>())
@ -126,7 +127,8 @@ class InMemoryNetworkMapService(services: ServiceHubInternal) : AbstractNetworkM
* subscriber clean up and is simpler to persist than the previous implementation based on a set of missing messages acks.
*/
@ThreadSafe
abstract class AbstractNetworkMapService(services: ServiceHubInternal) : NetworkMapService, AbstractNodeService(services) {
abstract class AbstractNetworkMapService(services: ServiceHubInternal,
val minimumPlatformVersion: Int) : NetworkMapService, AbstractNodeService(services) {
companion object {
/**
* Maximum credible size for a registration request. Generally requests are around 500-600 bytes, so this gives a
@ -152,6 +154,13 @@ abstract class AbstractNetworkMapService(services: ServiceHubInternal) : Network
private val handlers = ArrayList<MessageHandlerRegistration>()
init {
require(minimumPlatformVersion >= 1) { "minimumPlatformVersion cannot be less than 1" }
require(minimumPlatformVersion <= services.myInfo.platformVersion) {
"minimumPlatformVersion cannot be greater than the node's own version"
}
}
protected fun setup() {
// Register message handlers
handlers += addMessageHandler(FETCH_TOPIC) { req: FetchMapRequest -> processFetchAllRequest(req) }
@ -219,21 +228,27 @@ abstract class AbstractNetworkMapService(services: ServiceHubInternal) : Network
}
private fun processRegistrationRequest(request: RegistrationRequest): RegistrationResponse {
if (request.wireReg.raw.size > MAX_SIZE_REGISTRATION_REQUEST_BYTES) return RegistrationResponse("Request is too big")
if (request.wireReg.raw.size > MAX_SIZE_REGISTRATION_REQUEST_BYTES) {
return RegistrationResponse("Request is too big")
}
val change = try {
request.wireReg.verified()
} catch(e: SignatureException) {
} catch (e: SignatureException) {
return RegistrationResponse("Invalid signature on request")
}
val node = change.node
if (node.platformVersion < minimumPlatformVersion) {
return RegistrationResponse("Minimum platform version requirement not met: $minimumPlatformVersion")
}
// Update the current value atomically, so that if multiple updates come
// in on different threads, there is no risk of a race condition while checking
// sequence numbers.
val registrationInfo = try {
nodeRegistrations.compute(node.legalIdentity) { _: Party, existing: NodeRegistrationInfo? ->
nodeRegistrations.compute(node.legalIdentity) { _, existing: NodeRegistrationInfo? ->
require(!((existing == null || existing.reg.type == REMOVE) && change.type == REMOVE)) {
"Attempting to de-register unknown node"
}
@ -263,16 +278,16 @@ abstract class AbstractNetworkMapService(services: ServiceHubInternal) : Network
return RegistrationResponse(null)
}
private fun notifySubscribers(wireReg: WireNodeRegistration, mapVersion: Int) {
private fun notifySubscribers(wireReg: WireNodeRegistration, newMapVersion: Int) {
// TODO: Once we have a better established messaging system, we can probably send
// to a MessageRecipientGroup that nodes join/leave, rather than the network map
// service itself managing the group
val update = NetworkMapService.Update(wireReg, mapVersion, net.myAddress).serialize().bytes
val update = NetworkMapService.Update(wireReg, newMapVersion, net.myAddress).serialize().bytes
val message = net.createMessage(PUSH_TOPIC, DEFAULT_SESSION_ID, update)
subscribers.locked {
// Remove any stale subscribers
values.removeIf { lastAckInfo -> mapVersion - lastAckInfo.mapVersion > maxUnacknowledgedUpdates }
values.removeIf { (mapVersion) -> newMapVersion - mapVersion > maxUnacknowledgedUpdates }
// TODO: introduce some concept of time in the condition to avoid unsubscribes when there's a message burst.
keys.forEach { recipient -> net.send(message, recipient) }
}

View File

@ -17,7 +17,9 @@ import java.util.Collections.synchronizedMap
* This class needs database transactions to be in-flight during method calls and init, otherwise it will throw
* exceptions.
*/
class PersistentNetworkMapService(services: ServiceHubInternal) : AbstractNetworkMapService(services) {
class PersistentNetworkMapService(services: ServiceHubInternal, minimumPlatformVersion: Int)
: AbstractNetworkMapService(services, minimumPlatformVersion) {
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}network_map_nodes") {
val nodeParty = party("node_party_name", "node_party_key")
val registrationInfo = blob("node_registration_info")

View File

@ -25,7 +25,7 @@ import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.node.utilities.configureDatabase
import net.corda.node.utilities.transaction
import net.corda.testing.MOCK_NODE_VERSION_INFO
import net.corda.testing.MOCK_VERSION_INFO
import net.corda.testing.TestNodeConfiguration
import net.corda.testing.freeLocalHostAndPort
import net.corda.testing.node.makeTestDataSourceProperties
@ -225,7 +225,7 @@ class ArtemisMessagingTests {
return database.transaction {
NodeMessagingClient(
config,
MOCK_NODE_VERSION_INFO,
MOCK_VERSION_INFO,
server,
identity.public,
ServiceAffinityExecutor("ArtemisMessagingTests", 1),

View File

@ -5,6 +5,7 @@ import net.corda.core.node.services.ServiceInfo
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.utilities.transaction
import net.corda.testing.MOCK_VERSION_INFO
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import java.math.BigInteger
@ -48,11 +49,11 @@ class PersistentNetworkMapServiceTest : AbstractNetworkMapServiceTest<Persistent
* state within it is correctly restored.
*/
private class SwizzleNetworkMapService(val services: ServiceHubInternal) : NetworkMapService {
var delegate: PersistentNetworkMapService = PersistentNetworkMapService(services)
var delegate: PersistentNetworkMapService = PersistentNetworkMapService(services, MOCK_VERSION_INFO.platformVersion)
fun swizzle() {
delegate.unregisterNetworkHandlers()
delegate = PersistentNetworkMapService(services)
delegate = PersistentNetworkMapService(services, MOCK_VERSION_INFO.platformVersion)
}
}
}