From e9d63b2662358d5f3b98b4cbb441565413f7b977 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Mon, 6 Mar 2017 14:19:04 +0000 Subject: [PATCH] Added bits of versioning info to the node --- build.gradle | 5 ++ core/src/main/kotlin/net/corda/core/Utils.kt | 1 + .../kotlin/net/corda/core/node/NodeInfo.kt | 1 + .../kotlin/net/corda/core/node/Version.kt | 28 +++++++++++ .../kotlin/net/corda/core/node/VersionTest.kt | 31 ++++++++++++ node/build.gradle | 3 ++ node/capsule/build.gradle | 3 +- .../services/messaging/P2PSecurityTest.kt | 3 +- .../main/kotlin/net/corda/node/ArgsParser.kt | 7 ++- node/src/main/kotlin/net/corda/node/Corda.kt | 31 +++++++++--- .../net/corda/node/internal/AbstractNode.kt | 3 +- .../kotlin/net/corda/node/internal/Node.kt | 44 ++++++++-------- .../node/services/config/NodeConfiguration.kt | 5 +- .../services/messaging/NodeMessagingClient.kt | 50 ++++++++++--------- .../statemachine/FlowStateMachineImpl.kt | 2 +- .../kotlin/net/corda/node/ArgsParserTest.kt | 43 ++++++++++------ .../messaging/ArtemisMessagingTests.kt | 2 + .../kotlin/net/corda/testing/CoreTestUtils.kt | 5 ++ .../corda/testing/node/MockNetworkMapCache.kt | 7 +-- .../kotlin/net/corda/testing/node/MockNode.kt | 6 ++- .../net/corda/testing/node/MockServices.kt | 9 ++-- .../net/corda/testing/node/NodeBasedTest.kt | 3 +- .../net/corda/testing/node/SimpleNode.kt | 2 + 23 files changed, 211 insertions(+), 83 deletions(-) create mode 100644 core/src/main/kotlin/net/corda/core/node/Version.kt create mode 100644 core/src/test/kotlin/net/corda/core/node/VersionTest.kt diff --git a/build.gradle b/build.gradle index c22a2bb950..efd52641fa 100644 --- a/build.gradle +++ b/build.gradle @@ -47,6 +47,7 @@ buildscript { classpath 'com.github.ben-manes:gradle-versions-plugin:0.13.0' classpath "org.jetbrains.kotlin:kotlin-noarg:$kotlin_version" classpath "org.jetbrains.dokka:dokka-gradle-plugin:${dokka_version}" + classpath "org.ajoberstar:grgit:1.1.0" } } @@ -56,6 +57,10 @@ plugins { id "us.kirchmeier.capsule" version "1.0.2" } +ext { + corda_revision = org.ajoberstar.grgit.Grgit.open(file('.')).head().id +} + apply plugin: 'kotlin' apply plugin: 'project-report' apply plugin: 'com.github.ben-manes.versions' diff --git a/core/src/main/kotlin/net/corda/core/Utils.kt b/core/src/main/kotlin/net/corda/core/Utils.kt index 91d8a67bf3..6b4969bdce 100644 --- a/core/src/main/kotlin/net/corda/core/Utils.kt +++ b/core/src/main/kotlin/net/corda/core/Utils.kt @@ -146,6 +146,7 @@ inline fun Path.write(createDirs: Boolean = false, vararg options: OpenOption = } inline fun Path.readLines(charset: Charset = UTF_8, block: (Stream) -> R): R = Files.lines(this, charset).use(block) +fun Path.readAllLines(charset: Charset = UTF_8): List = Files.readAllLines(this, charset) fun Path.writeLines(lines: Iterable, charset: Charset = UTF_8, vararg options: OpenOption): Path = Files.write(this, lines, charset, *options) fun InputStream.copyTo(target: Path, vararg options: CopyOption): Long = Files.copy(this, target, *options) diff --git a/core/src/main/kotlin/net/corda/core/node/NodeInfo.kt b/core/src/main/kotlin/net/corda/core/node/NodeInfo.kt index cfab38e427..c86cf30bd4 100644 --- a/core/src/main/kotlin/net/corda/core/node/NodeInfo.kt +++ b/core/src/main/kotlin/net/corda/core/node/NodeInfo.kt @@ -19,6 +19,7 @@ data class ServiceEntry(val info: ServiceInfo, val identity: Party) @CordaSerializable data class NodeInfo(val address: SingleMessageRecipient, val legalIdentity: Party, + val version: Version, var advertisedServices: List = emptyList(), val physicalLocation: PhysicalLocation? = null) { init { diff --git a/core/src/main/kotlin/net/corda/core/node/Version.kt b/core/src/main/kotlin/net/corda/core/node/Version.kt new file mode 100644 index 0000000000..8576e7a235 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/node/Version.kt @@ -0,0 +1,28 @@ +package net.corda.core.node + +import net.corda.core.serialization.CordaSerializable +import java.util.regex.Pattern + +/** + * Versions of the same [major] version but with different [minor] versions are considered compatible with each other. One + * exception to this is when the major version is 0 - each different minor version should be considered incompatible. + * + * If two [Version]s are equal (i.e. [equals] returns true) but they are both [snapshot] then they may refer to different + * builds of the node. [NodeVersionInfo.revision] would be required to differentiate the two. + */ +@CordaSerializable +data class Version(val major: Int, val minor: Int, val snapshot: Boolean) { + companion object { + private val pattern = Pattern.compile("""(\d+)\.(\d+)(-SNAPSHOT)?""") + + fun parse(string: String): Version { + val matcher = pattern.matcher(string) + require(matcher.matches()) + return Version(matcher.group(1).toInt(), matcher.group(2).toInt(), matcher.group(3) != null) + } + } + + override fun toString(): String = if (snapshot) "$major.$minor-SNAPSHOT" else "$major.$minor" +} + +data class NodeVersionInfo(val version: Version, val revision: String, val vendor: String) \ No newline at end of file diff --git a/core/src/test/kotlin/net/corda/core/node/VersionTest.kt b/core/src/test/kotlin/net/corda/core/node/VersionTest.kt new file mode 100644 index 0000000000..2b48d28b8a --- /dev/null +++ b/core/src/test/kotlin/net/corda/core/node/VersionTest.kt @@ -0,0 +1,31 @@ +package net.corda.core.node + +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.Test + +class VersionTest { + @Test + fun `parse valid non-SNAPSHOT string`() { + assertThat(Version.parse("1.2")).isEqualTo(Version(1, 2, false)) + } + + @Test + fun `parse valid SNAPSHOT string`() { + assertThat(Version.parse("2.23-SNAPSHOT")).isEqualTo(Version(2, 23, true)) + } + + @Test + fun `parse string with just major number`() { + assertThatThrownBy { + Version.parse("2") + }.isInstanceOf(IllegalArgumentException::class.java) + } + + @Test + fun `parse string with unknown qualifier`() { + assertThatThrownBy { + Version.parse("2.3-TEST") + }.isInstanceOf(IllegalArgumentException::class.java) + } +} \ No newline at end of file diff --git a/node/build.gradle b/node/build.gradle index 46834b09b0..3fa6816db5 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -87,6 +87,9 @@ dependencies { // JAnsi: for drawing things to the terminal in nicely coloured ways. compile "org.fusesource.jansi:jansi:$jansi_version" + // Manifests: for reading stuff from the manifest file + compile "com.jcabi:jcabi-manifests:1.1" + // GraphStream: For visualisation testCompile "org.graphstream:gs-core:1.3" testCompile("org.graphstream:gs-ui:1.3") { diff --git a/node/capsule/build.gradle b/node/capsule/build.gradle index a05f7b2eff..79d9d6cd4f 100644 --- a/node/capsule/build.gradle +++ b/node/capsule/build.gradle @@ -59,7 +59,6 @@ task buildCordaJAR(type: FatCapsule) { appClassPath = ["jolokia-agent-war-${project.rootProject.ext.jolokia_version}.war"] javaAgents = ["quasar-core-${quasar_version}-jdk8.jar"] systemProperties['visualvm.display.name'] = 'Corda' - systemProperties['corda.version'] = corda_version minJavaVersion = '1.8.0' // This version is known to work and avoids earlier 8u versions that have bugs. minUpdateVersion['1.8'] = '102' @@ -80,6 +79,8 @@ task buildCordaJAR(type: FatCapsule) { manifest { attributes('Corda-Version': corda_version) + attributes('Corda-Revision': corda_revision) + attributes('Corda-Vendor': 'Corda Open Source') } } diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt index d51286cb45..6ab3268eed 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt @@ -15,6 +15,7 @@ import net.corda.node.services.network.NetworkMapService import net.corda.node.services.network.NetworkMapService.RegistrationRequest import net.corda.node.services.network.NodeRegistration import net.corda.node.utilities.AddOrRemove +import net.corda.testing.MOCK_VERSION import net.corda.testing.TestNodeConfiguration import net.corda.testing.node.NodeBasedTest import net.corda.testing.node.SimpleNode @@ -62,7 +63,7 @@ class P2PSecurityTest : NodeBasedTest() { } private fun SimpleNode.registerWithNetworkMap(registrationName: String): ListenableFuture { - val nodeInfo = NodeInfo(net.myAddress, Party(registrationName, identity.public)) + val nodeInfo = NodeInfo(net.myAddress, Party(registrationName, identity.public), MOCK_VERSION) val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX) val request = RegistrationRequest(registration.toWire(identity.private), net.myAddress) return net.sendRequest(NetworkMapService.REGISTER_TOPIC, request, networkMapNode.net.myAddress) diff --git a/node/src/main/kotlin/net/corda/node/ArgsParser.kt b/node/src/main/kotlin/net/corda/node/ArgsParser.kt index a4af516614..e8c30d11c8 100644 --- a/node/src/main/kotlin/net/corda/node/ArgsParser.kt +++ b/node/src/main/kotlin/net/corda/node/ArgsParser.kt @@ -29,6 +29,7 @@ class ArgsParser { .defaultsTo(Level.INFO) private val logToConsoleArg = optionParser.accepts("log-to-console", "If set, prints logging to the console as well as to a file.") private val isRegistrationArg = optionParser.accepts("initial-registration", "Start initial node registration with Corda network to obtain certificate from the permissioning server.") + private val isVersionArg = optionParser.accepts("version", "Print the version and exit") private val helpArg = optionParser.accepts("help").forHelp() fun parse(vararg args: String): CmdLineOptions { @@ -42,7 +43,8 @@ class ArgsParser { val loggingLevel = optionSet.valueOf(loggerLevel) val logToConsole = optionSet.has(logToConsoleArg) val isRegistration = optionSet.has(isRegistrationArg) - return CmdLineOptions(baseDirectory, configFile, help, loggingLevel, logToConsole, isRegistration) + val isVersion = optionSet.has(isVersionArg) + return CmdLineOptions(baseDirectory, configFile, help, loggingLevel, logToConsole, isRegistration, isVersion) } fun printHelp(sink: PrintStream) = optionParser.printHelpOn(sink) @@ -53,7 +55,8 @@ data class CmdLineOptions(val baseDirectory: Path, val help: Boolean, val loggingLevel: Level, val logToConsole: Boolean, - val isRegistration: Boolean) { + val isRegistration: Boolean, + val isVersion: Boolean) { fun loadConfig(allowMissingConfig: Boolean = false, configOverrides: Map = emptyMap()): Config { return ConfigHelper.loadConfig(baseDirectory, configFile, allowMissingConfig, configOverrides) } diff --git a/node/src/main/kotlin/net/corda/node/Corda.kt b/node/src/main/kotlin/net/corda/node/Corda.kt index 3ba3c6abb1..d2438c2c9d 100644 --- a/node/src/main/kotlin/net/corda/node/Corda.kt +++ b/node/src/main/kotlin/net/corda/node/Corda.kt @@ -1,9 +1,12 @@ @file:JvmName("Corda") package net.corda.node +import com.jcabi.manifests.Manifests import com.typesafe.config.ConfigException import joptsimple.OptionException import net.corda.core.* +import net.corda.core.node.NodeVersionInfo +import net.corda.core.node.Version import net.corda.core.utilities.Emoji import net.corda.node.internal.Node import net.corda.node.services.config.FullNodeConfiguration @@ -35,6 +38,16 @@ fun main(args: Array) { val startTime = System.currentTimeMillis() checkJavaVersion() + val nodeVersionInfo = if (Manifests.exists("Corda-Version")) { + NodeVersionInfo( + Version.parse(Manifests.read("Corda-Version")), + Manifests.read("Corda-Revision"), + Manifests.read("Corda-Vendor")) + } else { + // If the manifest properties aren't available then we're running from within an IDE + NodeVersionInfo(Version(0, 0, false), "~Git revision unavailable~", "Unknown vendor") + } + val argsParser = ArgsParser() val cmdlineOptions = try { @@ -45,6 +58,12 @@ fun main(args: Array) { exitProcess(1) } + if (cmdlineOptions.isVersion) { + println("${nodeVersionInfo.vendor} ${nodeVersionInfo.version}") + println("Revision ${nodeVersionInfo.revision}") + exitProcess(0) + } + // Maybe render command line help. if (cmdlineOptions.help) { argsParser.printHelp(System.out) @@ -58,7 +77,7 @@ fun main(args: Array) { renderBasicInfoToConsole = false } - drawBanner() + drawBanner(nodeVersionInfo) System.setProperty("log-path", (cmdlineOptions.baseDirectory / "logs").toString()) @@ -97,7 +116,7 @@ fun main(args: Array) { try { cmdlineOptions.baseDirectory.createDirectories() - val node = conf.createNode() + val node = conf.createNode(nodeVersionInfo) node.start() printPluginsAndServices(node) @@ -116,6 +135,7 @@ fun main(args: Array) { log.error("Exception during node startup", e) exitProcess(1) } + exitProcess(0) } @@ -154,13 +174,12 @@ private fun messageOfTheDay(): Pair { "Computer science and finance together.\nYou should see our crazy Christmas parties!" ) if (Emoji.hasEmojiTerminal) - messages += - "Kind of like a regular database but\nwith emojis, colours and ascii art. ${Emoji.coolGuy}" + messages += "Kind of like a regular database but\nwith emojis, colours and ascii art. ${Emoji.coolGuy}" val (a, b) = messages.randomOrNull()!!.split('\n') return Pair(a, b) } -private fun drawBanner() { +private fun drawBanner(nodeVersionInfo: NodeVersionInfo) { // This line makes sure ANSI escapes work on Windows, where they aren't supported out of the box. AnsiConsole.systemInstall() @@ -174,7 +193,7 @@ private fun drawBanner() { / / __ / ___/ __ / __ `/ """).fgBrightBlue().a(msg1).newline().fgBrightRed().a( "/ /___ /_/ / / / /_/ / /_/ / ").fgBrightBlue().a(msg2).newline().fgBrightRed().a( """\____/ /_/ \__,_/\__,_/""").reset().newline().newline().fgBrightDefault().bold(). - a("--- MILESTONE 9 -------------------------------------------------------------------"). + a("--- ${nodeVersionInfo.vendor} ${nodeVersionInfo.version} (${nodeVersionInfo.revision.take(6)}) -----------------------------------------------"). newline(). newline(). a("${Emoji.books}New! ").reset().a("Training now available worldwide, see https://corda.net/corda-training/"). diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 297c113c11..6bc7c0bb7e 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -97,6 +97,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, protected abstract val log: Logger protected abstract val networkMapAddress: SingleMessageRecipient? + protected abstract val version: Version // We will run as much stuff in this single thread as possible to keep the risk of thread safety bugs low during the // low-performance prototyping period. @@ -282,7 +283,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, private fun makeInfo(): NodeInfo { val advertisedServiceEntries = makeServiceEntries() val legalIdentity = obtainLegalIdentity() - return NodeInfo(net.myAddress, legalIdentity, advertisedServiceEntries, findMyLocation()) + return NodeInfo(net.myAddress, legalIdentity, version, advertisedServiceEntries, findMyLocation()) } /** diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 28c0ffc245..cf19f11724 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -4,11 +4,11 @@ import com.codahale.metrics.JmxReporter import com.google.common.net.HostAndPort import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.ListenableFuture -import net.corda.core.div -import net.corda.core.flatMap -import net.corda.core.getOrThrow +import net.corda.core.* import net.corda.core.messaging.RPCOps +import net.corda.core.node.NodeVersionInfo import net.corda.core.node.ServiceHub +import net.corda.core.node.Version import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceType import net.corda.core.node.services.UniquenessProvider @@ -22,14 +22,15 @@ import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.NodeMessagingClient -import net.corda.node.services.transactions.* +import net.corda.node.services.transactions.PersistentUniquenessProvider +import net.corda.node.services.transactions.RaftUniquenessProvider +import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.utilities.AddressUtils import net.corda.node.utilities.AffinityExecutor +import org.slf4j.Logger import java.io.RandomAccessFile import java.lang.management.ManagementFactory import java.nio.channels.FileLock -import java.nio.file.Files -import java.nio.file.Paths import java.time.Clock import javax.management.ObjectName import kotlin.concurrent.thread @@ -45,8 +46,14 @@ import kotlin.concurrent.thread */ class Node(override val configuration: FullNodeConfiguration, advertisedServices: Set, + val nodeVersionInfo: NodeVersionInfo, clock: Clock = NodeClock()) : AbstractNode(configuration, advertisedServices, clock) { - override val log = loggerFor() + companion object { + private val logger = loggerFor() + } + + override val log: Logger get() = logger + override val version: Version get() = nodeVersionInfo.version override val networkMapAddress: NetworkMapAddress? get() = configuration.networkMapService?.address?.let(::NetworkMapAddress) // DISCUSSION @@ -103,35 +110,32 @@ class Node(override val configuration: FullNodeConfiguration, } /** - * Abort starting the node if an existing deployment with a different version is detected in the current directory. - * The current version is expected to be specified as a system property. If not provided, the check will be ignored. + * Abort starting the node if an existing deployment with a different version is detected in the base directory. */ private fun checkVersionUnchanged() { - val currentVersion = System.getProperty("corda.version") ?: return - val versionFile = Paths.get("version") - if (Files.exists(versionFile)) { - val existingVersion = Files.readAllLines(versionFile)[0] - check(existingVersion == currentVersion) { - "Version change detected - current: $currentVersion, existing: $existingVersion. Node upgrades are not yet supported." + val versionFile = configuration.baseDirectory / "version" + if (versionFile.exists()) { + val previousVersion = Version.parse(versionFile.readAllLines()[0]) + check(nodeVersionInfo.version.major == previousVersion.major) { + "Major version change detected - current: ${nodeVersionInfo.version}, previous: $previousVersion. " + + "Node upgrades across major versions are not yet supported." } - } else { - Files.write(versionFile, currentVersion.toByteArray()) } + versionFile.writeLines(listOf(nodeVersionInfo.version.toString())) } override fun makeMessagingService(): MessagingServiceInternal { userService = RPCUserServiceImpl(configuration) val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker() val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) obtainLegalIdentity().owningKey else null - return NodeMessagingClient( configuration, + nodeVersionInfo, serverAddress, myIdentityOrNullIfNetworkMapService, serverThread, database, - networkMapRegistrationFuture - ) + networkMapRegistrationFuture) } private fun makeLocalMessageBroker(): HostAndPort { diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 405ee32429..159eec7916 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -3,6 +3,7 @@ package net.corda.node.services.config import com.google.common.net.HostAndPort import com.typesafe.config.Config import net.corda.core.div +import net.corda.core.node.NodeVersionInfo import net.corda.core.node.services.ServiceInfo import net.corda.node.internal.NetworkMapInfo import net.corda.node.internal.Node @@ -77,7 +78,7 @@ class FullNodeConfiguration(override val baseDirectory: Path, val config: Config .getListOrElse("notaryClusterAddresses") { emptyList() } .map { HostAndPort.fromString(it) } - fun createNode(): Node { + fun createNode(nodeVersionInfo: NodeVersionInfo): Node { // This is a sanity feature do not remove. require(!useTestClock || devMode) { "Cannot use test clock outside of dev mode" } @@ -87,7 +88,7 @@ class FullNodeConfiguration(override val baseDirectory: Path, val config: Config .toMutableSet() if (networkMapService == null) advertisedServices.add(ServiceInfo(NetworkMapService.type)) - return Node(this, advertisedServices, if (useTestClock) TestClock() else NodeClock()) + return Node(this, advertisedServices, nodeVersionInfo, if (useTestClock) TestClock() else NodeClock()) } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index 2c3bc0eb4a..f9aee2122b 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -5,6 +5,7 @@ import com.google.common.util.concurrent.ListenableFuture import net.corda.core.ThreadBox import net.corda.core.crypto.CompositeKey import net.corda.core.messaging.* +import net.corda.core.node.NodeVersionInfo import net.corda.core.node.services.PartyInfo import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.opaque @@ -21,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.Message.* import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.* +import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE import org.bouncycastle.asn1.x500.X500Name import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.ResultRow @@ -53,6 +55,7 @@ import javax.annotation.concurrent.ThreadSafe */ @ThreadSafe class NodeMessagingClient(override val config: NodeConfiguration, + nodeVersionInfo: NodeVersionInfo, val serverHostPort: HostAndPort, val myIdentity: CompositeKey?, val nodeExecutor: AffinityExecutor, @@ -65,9 +68,11 @@ class NodeMessagingClient(override val config: NodeConfiguration, // We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint // that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid // confusion. - const val TOPIC_PROPERTY = "platform-topic" - const val SESSION_ID_PROPERTY = "session-id" - private val AMQ_DELAY: Int = Integer.valueOf(System.getProperty("amq.delivery.delay.ms", "0")) + private val topicProperty = SimpleString("platform-topic") + private val sessionIdProperty = SimpleString("session-id") + private val nodeVersionProperty = SimpleString("node-version") + private val nodeVendorProperty = SimpleString("node-vendor") + private val amqDelay: Int = Integer.valueOf(System.getProperty("amq.delivery.delay.ms", "0")) } private class InnerState { @@ -87,6 +92,8 @@ class NodeMessagingClient(override val config: NodeConfiguration, data class Handler(val topicSession: TopicSession, val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration + private val nodeVendor = SimpleString(nodeVersionInfo.vendor) + private val version = SimpleString(nodeVersionInfo.version.toString()) /** An executor for sending messages */ private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1) @@ -130,7 +137,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, // using our TLS certificate. // Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer // size of 1MB is acknowledged. - val session = clientFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) + val session = clientFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE) this.session = session session.start() @@ -165,7 +172,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, private fun makeP2PConsumer(session: ClientSession, networkMapOnly: Boolean): ClientConsumer { return if (networkMapOnly) { // Filter for just the network map messages. - val messageFilter = "hyphenated_props:$TOPIC_PROPERTY like 'platform.network_map.%'" + val messageFilter = "hyphenated_props:$topicProperty like 'platform.network_map.%'" session.createConsumer(P2P_QUEUE, messageFilter) } else session.createConsumer(P2P_QUEUE) @@ -249,18 +256,10 @@ class NodeMessagingClient(override val config: NodeConfiguration, private fun artemisToCordaMessage(message: ClientMessage): ReceivedMessage? { try { - if (!message.containsProperty(TOPIC_PROPERTY)) { - log.warn("Received message without a $TOPIC_PROPERTY property, ignoring") - return null - } - if (!message.containsProperty(SESSION_ID_PROPERTY)) { - log.warn("Received message without a $SESSION_ID_PROPERTY property, ignoring") - return null - } - val topic = message.getStringProperty(TOPIC_PROPERTY) - val sessionID = message.getLongProperty(SESSION_ID_PROPERTY) + val topic = message.required(topicProperty) { getStringProperty(it) } + val sessionID = message.required(sessionIdProperty) { getLongProperty(it) } // Use the magic deduplication property built into Artemis as our message identity too - val uuid = UUID.fromString(message.getStringProperty(HDR_DUPLICATE_DETECTION_ID)) + val uuid = message.required(HDR_DUPLICATE_DETECTION_ID) { UUID.fromString(message.getStringProperty(it)) } val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" } log.trace { "Received message from: ${message.address} user: $user topic: $topic sessionID: $sessionID uuid: $uuid" } @@ -277,11 +276,16 @@ class NodeMessagingClient(override val config: NodeConfiguration, return msg } catch (e: Exception) { - log.error("Internal error whilst reading MQ message", e) + log.error("Unable to process message, ignoring it: $message", e) return null } } + private inline fun ClientMessage.required(key: SimpleString, extractor: ClientMessage.(SimpleString) -> T): T { + require(containsProperty(key)) { "Missing $key" } + return extractor(key) + } + private fun deliver(msg: ReceivedMessage): Boolean { state.checkNotLocked() // Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added @@ -368,16 +372,17 @@ class NodeMessagingClient(override val config: NodeConfiguration, state.locked { val mqAddress = getMQAddress(target) val artemisMessage = session!!.createMessage(true).apply { - val sessionID = message.topicSession.sessionID - putStringProperty(TOPIC_PROPERTY, message.topicSession.topic) - putLongProperty(SESSION_ID_PROPERTY, sessionID) + putStringProperty(nodeVendorProperty, nodeVendor) + putStringProperty(nodeVersionProperty, version) + putStringProperty(topicProperty, SimpleString(message.topicSession.topic)) + putLongProperty(sessionIdProperty, message.topicSession.sessionID) writeBodyBufferBytes(message.data) // Use the magic deduplication property built into Artemis as our message identity too putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString())) // For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended - if (AMQ_DELAY > 0 && message.topicSession.topic == StateMachineManager.sessionTopic.topic) { - putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + AMQ_DELAY) + if (amqDelay > 0 && message.topicSession.topic == StateMachineManager.sessionTopic.topic) { + putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelay) } } log.trace { "Send to: $mqAddress topic: ${message.topicSession.topic} " + @@ -387,7 +392,6 @@ class NodeMessagingClient(override val config: NodeConfiguration, } } - private fun getMQAddress(target: MessageRecipients): String { return if (target == myAddress) { // If we are sending to ourselves then route the message directly to our P2P queue. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 864049e787..91138fa7a2 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -215,7 +215,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, state = FlowSessionState.Initiated(peerParty, sessionInitResponse.initiatedSessionId) } else { sessionInitResponse as SessionReject - throw FlowException("Party ${state.sendToParty} rejected session request: ${sessionInitResponse.errorMessage}") + throw FlowSessionException("Party ${state.sendToParty} rejected session request: ${sessionInitResponse.errorMessage}") } } diff --git a/node/src/test/kotlin/net/corda/node/ArgsParserTest.kt b/node/src/test/kotlin/net/corda/node/ArgsParserTest.kt index fbb042d717..f0a8beca95 100644 --- a/node/src/test/kotlin/net/corda/node/ArgsParserTest.kt +++ b/node/src/test/kotlin/net/corda/node/ArgsParserTest.kt @@ -20,7 +20,8 @@ class ArgsParserTest { help = false, logToConsole = false, loggingLevel = Level.INFO, - isRegistration = false)) + isRegistration = false, + isVersion = false)) } @Test @@ -54,20 +55,6 @@ class ArgsParserTest { assertThat(cmdLineOptions.configFile).isEqualTo(configFile) } - @Test - fun `log-to-console`() { - val cmdLineOptions = parser.parse("--log-to-console") - assertThat(cmdLineOptions.logToConsole).isTrue() - } - - @Test - fun `logging-level`() { - for (level in Level.values()) { - val cmdLineOptions = parser.parse("--logging-level", level.name) - assertThat(cmdLineOptions.loggingLevel).isEqualTo(level) - } - } - @Test fun `both base-directory and config-file`() { assertThatExceptionOfType(IllegalArgumentException::class.java).isThrownBy { @@ -89,6 +76,20 @@ class ArgsParserTest { }.withMessageContaining("config-file") } + @Test + fun `log-to-console`() { + val cmdLineOptions = parser.parse("--log-to-console") + assertThat(cmdLineOptions.logToConsole).isTrue() + } + + @Test + fun `logging-level`() { + for (level in Level.values()) { + val cmdLineOptions = parser.parse("--logging-level", level.name) + assertThat(cmdLineOptions.loggingLevel).isEqualTo(level) + } + } + @Test fun `logging-level without argument`() { assertThatExceptionOfType(OptionException::class.java).isThrownBy { @@ -102,4 +103,16 @@ class ArgsParserTest { parser.parse("--logging-level", "not-a-level") }.withMessageContaining("logging-level") } + + @Test + fun `initial-registration`() { + val cmdLineOptions = parser.parse("--initial-registration") + assertThat(cmdLineOptions.isRegistration).isTrue() + } + + @Test + fun version() { + val cmdLineOptions = parser.parse("--version") + assertThat(cmdLineOptions.isVersion).isTrue() + } } diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index 15cf460d42..de31839105 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -23,6 +23,7 @@ import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.databaseTransaction +import net.corda.testing.MOCK_NODE_VERSION_INFO import net.corda.testing.TestNodeConfiguration import net.corda.testing.freeLocalHostAndPort import net.corda.testing.node.makeTestDataSourceProperties @@ -219,6 +220,7 @@ class ArtemisMessagingTests { return databaseTransaction(database) { NodeMessagingClient( config, + MOCK_NODE_VERSION_INFO, server, identity.public.composite, ServiceAffinityExecutor("ArtemisMessagingTests", 1), diff --git a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt index 08de15a6d9..bc431165fc 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/CoreTestUtils.kt @@ -9,7 +9,9 @@ import com.typesafe.config.Config import net.corda.core.contracts.StateRef import net.corda.core.crypto.* import net.corda.core.flows.FlowLogic +import net.corda.core.node.NodeVersionInfo import net.corda.core.node.ServiceHub +import net.corda.core.node.Version import net.corda.core.serialization.OpaqueBytes import net.corda.core.toFuture import net.corda.core.transactions.TransactionBuilder @@ -82,6 +84,9 @@ val ALL_TEST_KEYS: List get() = listOf(MEGA_CORP_KEY, MINI_CORP_KEY, AL val MOCK_IDENTITY_SERVICE: MockIdentityService get() = MockIdentityService(listOf(MEGA_CORP, MINI_CORP, DUMMY_NOTARY)) +val MOCK_VERSION = Version(0, 0, false) +val MOCK_NODE_VERSION_INFO = NodeVersionInfo(MOCK_VERSION, "Mock revision", "Mock Vendor") + fun generateStateRef() = StateRef(SecureHash.randomSHA256(), 0) /** diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt index c200fc3cf8..893df6c542 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNetworkMapCache.kt @@ -7,20 +7,21 @@ import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.NodeInfo import net.corda.core.node.services.NetworkMapCache import net.corda.node.services.network.InMemoryNetworkMapCache +import net.corda.testing.MOCK_VERSION import rx.Observable import rx.subjects.PublishSubject /** * Network map cache with no backing map service. */ -class MockNetworkMapCache() : InMemoryNetworkMapCache() { +class MockNetworkMapCache : InMemoryNetworkMapCache() { override val changed: Observable = PublishSubject.create() data class MockAddress(val id: String): SingleMessageRecipient init { - val mockNodeA = NodeInfo(MockAddress("bankC:8080"), Party("Bank C", DummyPublicKey("Bank C"))) - val mockNodeB = NodeInfo(MockAddress("bankD:8080"), Party("Bank D", DummyPublicKey("Bank D"))) + val mockNodeA = NodeInfo(MockAddress("bankC:8080"), Party("Bank C", DummyPublicKey("Bank C")), MOCK_VERSION) + val mockNodeB = NodeInfo(MockAddress("bankD:8080"), Party("Bank D", DummyPublicKey("Bank D")), MOCK_VERSION) registeredNodes[mockNodeA.legalIdentity.owningKey] = mockNodeA registeredNodes[mockNodeB.legalIdentity.owningKey] = mockNodeB runWithoutMapService() diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt index e17e4262da..1e28762ac6 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -12,6 +12,7 @@ import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.CordaPluginRegistry import net.corda.core.node.PhysicalLocation import net.corda.core.node.ServiceEntry +import net.corda.core.node.Version import net.corda.core.node.services.* import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.core.utilities.loggerFor @@ -27,6 +28,7 @@ import net.corda.node.services.transactions.ValidatingNotaryService import net.corda.node.services.vault.NodeVaultService import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor +import net.corda.testing.MOCK_VERSION import net.corda.testing.TestNodeConfiguration import org.apache.activemq.artemis.utils.ReusableLatch import org.slf4j.Logger @@ -129,9 +131,11 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, advertisedServices: Set, val id: Int, val overrideServices: Map?, - val entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue())) : AbstractNode(config, advertisedServices, TestClock(), mockNet.busyLatch) { + val entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue())) : + AbstractNode(config, advertisedServices, TestClock(), mockNet.busyLatch) { var counter = entropyRoot override val log: Logger = loggerFor() + override val version: Version get() = MOCK_VERSION override val serverThread: AffinityExecutor = if (mockNet.threadPerNode) ServiceAffinityExecutor("Mock node $id thread", 1) diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt index 5d04793a01..6426d5c59b 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -17,6 +17,7 @@ import net.corda.core.utilities.DUMMY_NOTARY import net.corda.node.services.persistence.InMemoryStateMachineRecordedTransactionMappingStorage import net.corda.testing.MEGA_CORP import net.corda.testing.MINI_CORP +import net.corda.testing.MOCK_VERSION import rx.Observable import rx.subjects.PublishSubject import java.io.ByteArrayInputStream @@ -63,7 +64,7 @@ open class MockServices(val key: KeyPair = generateKeyPair()) : ServiceHub { override val networkMapCache: NetworkMapCache get() = throw UnsupportedOperationException() override val clock: Clock get() = Clock.systemUTC() override val schedulerService: SchedulerService get() = throw UnsupportedOperationException() - override val myInfo: NodeInfo get() = NodeInfo(object : SingleMessageRecipient {}, Party("MegaCorp", key.public.composite)) + override val myInfo: NodeInfo get() = NodeInfo(object : SingleMessageRecipient {}, Party("MegaCorp", key.public.composite), MOCK_VERSION) } @ThreadSafe @@ -83,11 +84,7 @@ class MockIdentityService(val identities: List) : IdentityService, Single class MockKeyManagementService(vararg initialKeys: KeyPair) : SingletonSerializeAsToken(), KeyManagementService { - override val keys: MutableMap - - init { - keys = initialKeys.map { it.public to it.private }.toMap(HashMap()) - } + override val keys: MutableMap = initialKeys.associateByTo(HashMap(), { it.public }, { it.private }) val nextKeys = LinkedList() diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt index 0056a816d5..ffb9abb661 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt @@ -14,6 +14,7 @@ import net.corda.node.services.config.ConfigHelper import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.utilities.ServiceIdentityGenerator +import net.corda.testing.MOCK_NODE_VERSION_INFO import net.corda.testing.freeLocalHostAndPort import net.corda.testing.getFreeLocalPorts import org.junit.After @@ -132,7 +133,7 @@ abstract class NodeBasedTest { ) + configOverrides ) - val node = FullNodeConfiguration(baseDirectory, config).createNode() + val node = FullNodeConfiguration(baseDirectory, config).createNode(MOCK_NODE_VERSION_INFO) node.start() nodes += node thread(name = legalName) { diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt index 3794a3a189..fe665bcfe1 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/SimpleNode.kt @@ -5,6 +5,7 @@ import com.google.common.util.concurrent.SettableFuture import net.corda.core.crypto.composite import net.corda.core.crypto.generateKeyPair import net.corda.core.messaging.RPCOps +import net.corda.testing.MOCK_NODE_VERSION_INFO import net.corda.node.services.RPCUserServiceImpl import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.messaging.ArtemisMessagingServer @@ -35,6 +36,7 @@ class SimpleNode(val config: NodeConfiguration, val address: HostAndPort = freeL val net = databaseTransaction(database) { NodeMessagingClient( config, + MOCK_NODE_VERSION_INFO, address, identity.public.composite, executor,