mirror of
https://github.com/corda/corda.git
synced 2025-01-29 15:43:55 +00:00
Added bits of versioning info to the node
This commit is contained in:
parent
570b871524
commit
e9d63b2662
@ -47,6 +47,7 @@ buildscript {
|
||||
classpath 'com.github.ben-manes:gradle-versions-plugin:0.13.0'
|
||||
classpath "org.jetbrains.kotlin:kotlin-noarg:$kotlin_version"
|
||||
classpath "org.jetbrains.dokka:dokka-gradle-plugin:${dokka_version}"
|
||||
classpath "org.ajoberstar:grgit:1.1.0"
|
||||
}
|
||||
}
|
||||
|
||||
@ -56,6 +57,10 @@ plugins {
|
||||
id "us.kirchmeier.capsule" version "1.0.2"
|
||||
}
|
||||
|
||||
ext {
|
||||
corda_revision = org.ajoberstar.grgit.Grgit.open(file('.')).head().id
|
||||
}
|
||||
|
||||
apply plugin: 'kotlin'
|
||||
apply plugin: 'project-report'
|
||||
apply plugin: 'com.github.ben-manes.versions'
|
||||
|
@ -146,6 +146,7 @@ inline fun Path.write(createDirs: Boolean = false, vararg options: OpenOption =
|
||||
}
|
||||
|
||||
inline fun <R> Path.readLines(charset: Charset = UTF_8, block: (Stream<String>) -> R): R = Files.lines(this, charset).use(block)
|
||||
fun Path.readAllLines(charset: Charset = UTF_8): List<String> = Files.readAllLines(this, charset)
|
||||
fun Path.writeLines(lines: Iterable<CharSequence>, charset: Charset = UTF_8, vararg options: OpenOption): Path = Files.write(this, lines, charset, *options)
|
||||
|
||||
fun InputStream.copyTo(target: Path, vararg options: CopyOption): Long = Files.copy(this, target, *options)
|
||||
|
@ -19,6 +19,7 @@ data class ServiceEntry(val info: ServiceInfo, val identity: Party)
|
||||
@CordaSerializable
|
||||
data class NodeInfo(val address: SingleMessageRecipient,
|
||||
val legalIdentity: Party,
|
||||
val version: Version,
|
||||
var advertisedServices: List<ServiceEntry> = emptyList(),
|
||||
val physicalLocation: PhysicalLocation? = null) {
|
||||
init {
|
||||
|
28
core/src/main/kotlin/net/corda/core/node/Version.kt
Normal file
28
core/src/main/kotlin/net/corda/core/node/Version.kt
Normal file
@ -0,0 +1,28 @@
|
||||
package net.corda.core.node
|
||||
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import java.util.regex.Pattern
|
||||
|
||||
/**
|
||||
* Versions of the same [major] version but with different [minor] versions are considered compatible with each other. One
|
||||
* exception to this is when the major version is 0 - each different minor version should be considered incompatible.
|
||||
*
|
||||
* If two [Version]s are equal (i.e. [equals] returns true) but they are both [snapshot] then they may refer to different
|
||||
* builds of the node. [NodeVersionInfo.revision] would be required to differentiate the two.
|
||||
*/
|
||||
@CordaSerializable
|
||||
data class Version(val major: Int, val minor: Int, val snapshot: Boolean) {
|
||||
companion object {
|
||||
private val pattern = Pattern.compile("""(\d+)\.(\d+)(-SNAPSHOT)?""")
|
||||
|
||||
fun parse(string: String): Version {
|
||||
val matcher = pattern.matcher(string)
|
||||
require(matcher.matches())
|
||||
return Version(matcher.group(1).toInt(), matcher.group(2).toInt(), matcher.group(3) != null)
|
||||
}
|
||||
}
|
||||
|
||||
override fun toString(): String = if (snapshot) "$major.$minor-SNAPSHOT" else "$major.$minor"
|
||||
}
|
||||
|
||||
data class NodeVersionInfo(val version: Version, val revision: String, val vendor: String)
|
31
core/src/test/kotlin/net/corda/core/node/VersionTest.kt
Normal file
31
core/src/test/kotlin/net/corda/core/node/VersionTest.kt
Normal file
@ -0,0 +1,31 @@
|
||||
package net.corda.core.node
|
||||
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.Test
|
||||
|
||||
class VersionTest {
|
||||
@Test
|
||||
fun `parse valid non-SNAPSHOT string`() {
|
||||
assertThat(Version.parse("1.2")).isEqualTo(Version(1, 2, false))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `parse valid SNAPSHOT string`() {
|
||||
assertThat(Version.parse("2.23-SNAPSHOT")).isEqualTo(Version(2, 23, true))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `parse string with just major number`() {
|
||||
assertThatThrownBy {
|
||||
Version.parse("2")
|
||||
}.isInstanceOf(IllegalArgumentException::class.java)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `parse string with unknown qualifier`() {
|
||||
assertThatThrownBy {
|
||||
Version.parse("2.3-TEST")
|
||||
}.isInstanceOf(IllegalArgumentException::class.java)
|
||||
}
|
||||
}
|
@ -87,6 +87,9 @@ dependencies {
|
||||
// JAnsi: for drawing things to the terminal in nicely coloured ways.
|
||||
compile "org.fusesource.jansi:jansi:$jansi_version"
|
||||
|
||||
// Manifests: for reading stuff from the manifest file
|
||||
compile "com.jcabi:jcabi-manifests:1.1"
|
||||
|
||||
// GraphStream: For visualisation
|
||||
testCompile "org.graphstream:gs-core:1.3"
|
||||
testCompile("org.graphstream:gs-ui:1.3") {
|
||||
|
@ -59,7 +59,6 @@ task buildCordaJAR(type: FatCapsule) {
|
||||
appClassPath = ["jolokia-agent-war-${project.rootProject.ext.jolokia_version}.war"]
|
||||
javaAgents = ["quasar-core-${quasar_version}-jdk8.jar"]
|
||||
systemProperties['visualvm.display.name'] = 'Corda'
|
||||
systemProperties['corda.version'] = corda_version
|
||||
minJavaVersion = '1.8.0'
|
||||
// This version is known to work and avoids earlier 8u versions that have bugs.
|
||||
minUpdateVersion['1.8'] = '102'
|
||||
@ -80,6 +79,8 @@ task buildCordaJAR(type: FatCapsule) {
|
||||
|
||||
manifest {
|
||||
attributes('Corda-Version': corda_version)
|
||||
attributes('Corda-Revision': corda_revision)
|
||||
attributes('Corda-Vendor': 'Corda Open Source')
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -15,6 +15,7 @@ import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.network.NetworkMapService.RegistrationRequest
|
||||
import net.corda.node.services.network.NodeRegistration
|
||||
import net.corda.node.utilities.AddOrRemove
|
||||
import net.corda.testing.MOCK_VERSION
|
||||
import net.corda.testing.TestNodeConfiguration
|
||||
import net.corda.testing.node.NodeBasedTest
|
||||
import net.corda.testing.node.SimpleNode
|
||||
@ -62,7 +63,7 @@ class P2PSecurityTest : NodeBasedTest() {
|
||||
}
|
||||
|
||||
private fun SimpleNode.registerWithNetworkMap(registrationName: String): ListenableFuture<NetworkMapService.RegistrationResponse> {
|
||||
val nodeInfo = NodeInfo(net.myAddress, Party(registrationName, identity.public))
|
||||
val nodeInfo = NodeInfo(net.myAddress, Party(registrationName, identity.public), MOCK_VERSION)
|
||||
val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX)
|
||||
val request = RegistrationRequest(registration.toWire(identity.private), net.myAddress)
|
||||
return net.sendRequest<NetworkMapService.RegistrationResponse>(NetworkMapService.REGISTER_TOPIC, request, networkMapNode.net.myAddress)
|
||||
|
@ -29,6 +29,7 @@ class ArgsParser {
|
||||
.defaultsTo(Level.INFO)
|
||||
private val logToConsoleArg = optionParser.accepts("log-to-console", "If set, prints logging to the console as well as to a file.")
|
||||
private val isRegistrationArg = optionParser.accepts("initial-registration", "Start initial node registration with Corda network to obtain certificate from the permissioning server.")
|
||||
private val isVersionArg = optionParser.accepts("version", "Print the version and exit")
|
||||
private val helpArg = optionParser.accepts("help").forHelp()
|
||||
|
||||
fun parse(vararg args: String): CmdLineOptions {
|
||||
@ -42,7 +43,8 @@ class ArgsParser {
|
||||
val loggingLevel = optionSet.valueOf(loggerLevel)
|
||||
val logToConsole = optionSet.has(logToConsoleArg)
|
||||
val isRegistration = optionSet.has(isRegistrationArg)
|
||||
return CmdLineOptions(baseDirectory, configFile, help, loggingLevel, logToConsole, isRegistration)
|
||||
val isVersion = optionSet.has(isVersionArg)
|
||||
return CmdLineOptions(baseDirectory, configFile, help, loggingLevel, logToConsole, isRegistration, isVersion)
|
||||
}
|
||||
|
||||
fun printHelp(sink: PrintStream) = optionParser.printHelpOn(sink)
|
||||
@ -53,7 +55,8 @@ data class CmdLineOptions(val baseDirectory: Path,
|
||||
val help: Boolean,
|
||||
val loggingLevel: Level,
|
||||
val logToConsole: Boolean,
|
||||
val isRegistration: Boolean) {
|
||||
val isRegistration: Boolean,
|
||||
val isVersion: Boolean) {
|
||||
fun loadConfig(allowMissingConfig: Boolean = false, configOverrides: Map<String, Any?> = emptyMap()): Config {
|
||||
return ConfigHelper.loadConfig(baseDirectory, configFile, allowMissingConfig, configOverrides)
|
||||
}
|
||||
|
@ -1,9 +1,12 @@
|
||||
@file:JvmName("Corda")
|
||||
package net.corda.node
|
||||
|
||||
import com.jcabi.manifests.Manifests
|
||||
import com.typesafe.config.ConfigException
|
||||
import joptsimple.OptionException
|
||||
import net.corda.core.*
|
||||
import net.corda.core.node.NodeVersionInfo
|
||||
import net.corda.core.node.Version
|
||||
import net.corda.core.utilities.Emoji
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.services.config.FullNodeConfiguration
|
||||
@ -35,6 +38,16 @@ fun main(args: Array<String>) {
|
||||
val startTime = System.currentTimeMillis()
|
||||
checkJavaVersion()
|
||||
|
||||
val nodeVersionInfo = if (Manifests.exists("Corda-Version")) {
|
||||
NodeVersionInfo(
|
||||
Version.parse(Manifests.read("Corda-Version")),
|
||||
Manifests.read("Corda-Revision"),
|
||||
Manifests.read("Corda-Vendor"))
|
||||
} else {
|
||||
// If the manifest properties aren't available then we're running from within an IDE
|
||||
NodeVersionInfo(Version(0, 0, false), "~Git revision unavailable~", "Unknown vendor")
|
||||
}
|
||||
|
||||
val argsParser = ArgsParser()
|
||||
|
||||
val cmdlineOptions = try {
|
||||
@ -45,6 +58,12 @@ fun main(args: Array<String>) {
|
||||
exitProcess(1)
|
||||
}
|
||||
|
||||
if (cmdlineOptions.isVersion) {
|
||||
println("${nodeVersionInfo.vendor} ${nodeVersionInfo.version}")
|
||||
println("Revision ${nodeVersionInfo.revision}")
|
||||
exitProcess(0)
|
||||
}
|
||||
|
||||
// Maybe render command line help.
|
||||
if (cmdlineOptions.help) {
|
||||
argsParser.printHelp(System.out)
|
||||
@ -58,7 +77,7 @@ fun main(args: Array<String>) {
|
||||
renderBasicInfoToConsole = false
|
||||
}
|
||||
|
||||
drawBanner()
|
||||
drawBanner(nodeVersionInfo)
|
||||
|
||||
System.setProperty("log-path", (cmdlineOptions.baseDirectory / "logs").toString())
|
||||
|
||||
@ -97,7 +116,7 @@ fun main(args: Array<String>) {
|
||||
try {
|
||||
cmdlineOptions.baseDirectory.createDirectories()
|
||||
|
||||
val node = conf.createNode()
|
||||
val node = conf.createNode(nodeVersionInfo)
|
||||
node.start()
|
||||
printPluginsAndServices(node)
|
||||
|
||||
@ -116,6 +135,7 @@ fun main(args: Array<String>) {
|
||||
log.error("Exception during node startup", e)
|
||||
exitProcess(1)
|
||||
}
|
||||
|
||||
exitProcess(0)
|
||||
}
|
||||
|
||||
@ -154,13 +174,12 @@ private fun messageOfTheDay(): Pair<String, String> {
|
||||
"Computer science and finance together.\nYou should see our crazy Christmas parties!"
|
||||
)
|
||||
if (Emoji.hasEmojiTerminal)
|
||||
messages +=
|
||||
"Kind of like a regular database but\nwith emojis, colours and ascii art. ${Emoji.coolGuy}"
|
||||
messages += "Kind of like a regular database but\nwith emojis, colours and ascii art. ${Emoji.coolGuy}"
|
||||
val (a, b) = messages.randomOrNull()!!.split('\n')
|
||||
return Pair(a, b)
|
||||
}
|
||||
|
||||
private fun drawBanner() {
|
||||
private fun drawBanner(nodeVersionInfo: NodeVersionInfo) {
|
||||
// This line makes sure ANSI escapes work on Windows, where they aren't supported out of the box.
|
||||
AnsiConsole.systemInstall()
|
||||
|
||||
@ -174,7 +193,7 @@ private fun drawBanner() {
|
||||
/ / __ / ___/ __ / __ `/ """).fgBrightBlue().a(msg1).newline().fgBrightRed().a(
|
||||
"/ /___ /_/ / / / /_/ / /_/ / ").fgBrightBlue().a(msg2).newline().fgBrightRed().a(
|
||||
"""\____/ /_/ \__,_/\__,_/""").reset().newline().newline().fgBrightDefault().bold().
|
||||
a("--- MILESTONE 9 -------------------------------------------------------------------").
|
||||
a("--- ${nodeVersionInfo.vendor} ${nodeVersionInfo.version} (${nodeVersionInfo.revision.take(6)}) -----------------------------------------------").
|
||||
newline().
|
||||
newline().
|
||||
a("${Emoji.books}New! ").reset().a("Training now available worldwide, see https://corda.net/corda-training/").
|
||||
|
@ -97,6 +97,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
|
||||
protected abstract val log: Logger
|
||||
protected abstract val networkMapAddress: SingleMessageRecipient?
|
||||
protected abstract val version: Version
|
||||
|
||||
// We will run as much stuff in this single thread as possible to keep the risk of thread safety bugs low during the
|
||||
// low-performance prototyping period.
|
||||
@ -282,7 +283,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
private fun makeInfo(): NodeInfo {
|
||||
val advertisedServiceEntries = makeServiceEntries()
|
||||
val legalIdentity = obtainLegalIdentity()
|
||||
return NodeInfo(net.myAddress, legalIdentity, advertisedServiceEntries, findMyLocation())
|
||||
return NodeInfo(net.myAddress, legalIdentity, version, advertisedServiceEntries, findMyLocation())
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -4,11 +4,11 @@ import com.codahale.metrics.JmxReporter
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import net.corda.core.div
|
||||
import net.corda.core.flatMap
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.*
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.node.NodeVersionInfo
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.Version
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.node.services.ServiceType
|
||||
import net.corda.core.node.services.UniquenessProvider
|
||||
@ -22,14 +22,15 @@ import net.corda.node.services.config.FullNodeConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.transactions.*
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.services.transactions.RaftUniquenessProvider
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
import net.corda.node.utilities.AddressUtils
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import org.slf4j.Logger
|
||||
import java.io.RandomAccessFile
|
||||
import java.lang.management.ManagementFactory
|
||||
import java.nio.channels.FileLock
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Paths
|
||||
import java.time.Clock
|
||||
import javax.management.ObjectName
|
||||
import kotlin.concurrent.thread
|
||||
@ -45,8 +46,14 @@ import kotlin.concurrent.thread
|
||||
*/
|
||||
class Node(override val configuration: FullNodeConfiguration,
|
||||
advertisedServices: Set<ServiceInfo>,
|
||||
val nodeVersionInfo: NodeVersionInfo,
|
||||
clock: Clock = NodeClock()) : AbstractNode(configuration, advertisedServices, clock) {
|
||||
override val log = loggerFor<Node>()
|
||||
companion object {
|
||||
private val logger = loggerFor<Node>()
|
||||
}
|
||||
|
||||
override val log: Logger get() = logger
|
||||
override val version: Version get() = nodeVersionInfo.version
|
||||
override val networkMapAddress: NetworkMapAddress? get() = configuration.networkMapService?.address?.let(::NetworkMapAddress)
|
||||
|
||||
// DISCUSSION
|
||||
@ -103,35 +110,32 @@ class Node(override val configuration: FullNodeConfiguration,
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort starting the node if an existing deployment with a different version is detected in the current directory.
|
||||
* The current version is expected to be specified as a system property. If not provided, the check will be ignored.
|
||||
* Abort starting the node if an existing deployment with a different version is detected in the base directory.
|
||||
*/
|
||||
private fun checkVersionUnchanged() {
|
||||
val currentVersion = System.getProperty("corda.version") ?: return
|
||||
val versionFile = Paths.get("version")
|
||||
if (Files.exists(versionFile)) {
|
||||
val existingVersion = Files.readAllLines(versionFile)[0]
|
||||
check(existingVersion == currentVersion) {
|
||||
"Version change detected - current: $currentVersion, existing: $existingVersion. Node upgrades are not yet supported."
|
||||
val versionFile = configuration.baseDirectory / "version"
|
||||
if (versionFile.exists()) {
|
||||
val previousVersion = Version.parse(versionFile.readAllLines()[0])
|
||||
check(nodeVersionInfo.version.major == previousVersion.major) {
|
||||
"Major version change detected - current: ${nodeVersionInfo.version}, previous: $previousVersion. " +
|
||||
"Node upgrades across major versions are not yet supported."
|
||||
}
|
||||
} else {
|
||||
Files.write(versionFile, currentVersion.toByteArray())
|
||||
}
|
||||
versionFile.writeLines(listOf(nodeVersionInfo.version.toString()))
|
||||
}
|
||||
|
||||
override fun makeMessagingService(): MessagingServiceInternal {
|
||||
userService = RPCUserServiceImpl(configuration)
|
||||
val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker()
|
||||
val myIdentityOrNullIfNetworkMapService = if (networkMapAddress != null) obtainLegalIdentity().owningKey else null
|
||||
|
||||
return NodeMessagingClient(
|
||||
configuration,
|
||||
nodeVersionInfo,
|
||||
serverAddress,
|
||||
myIdentityOrNullIfNetworkMapService,
|
||||
serverThread,
|
||||
database,
|
||||
networkMapRegistrationFuture
|
||||
)
|
||||
networkMapRegistrationFuture)
|
||||
}
|
||||
|
||||
private fun makeLocalMessageBroker(): HostAndPort {
|
||||
|
@ -3,6 +3,7 @@ package net.corda.node.services.config
|
||||
import com.google.common.net.HostAndPort
|
||||
import com.typesafe.config.Config
|
||||
import net.corda.core.div
|
||||
import net.corda.core.node.NodeVersionInfo
|
||||
import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.node.internal.NetworkMapInfo
|
||||
import net.corda.node.internal.Node
|
||||
@ -77,7 +78,7 @@ class FullNodeConfiguration(override val baseDirectory: Path, val config: Config
|
||||
.getListOrElse<String>("notaryClusterAddresses") { emptyList() }
|
||||
.map { HostAndPort.fromString(it) }
|
||||
|
||||
fun createNode(): Node {
|
||||
fun createNode(nodeVersionInfo: NodeVersionInfo): Node {
|
||||
// This is a sanity feature do not remove.
|
||||
require(!useTestClock || devMode) { "Cannot use test clock outside of dev mode" }
|
||||
|
||||
@ -87,7 +88,7 @@ class FullNodeConfiguration(override val baseDirectory: Path, val config: Config
|
||||
.toMutableSet()
|
||||
if (networkMapService == null) advertisedServices.add(ServiceInfo(NetworkMapService.type))
|
||||
|
||||
return Node(this, advertisedServices, if (useTestClock) TestClock() else NodeClock())
|
||||
return Node(this, advertisedServices, nodeVersionInfo, if (useTestClock) TestClock() else NodeClock())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,7 @@ import com.google.common.util.concurrent.ListenableFuture
|
||||
import net.corda.core.ThreadBox
|
||||
import net.corda.core.crypto.CompositeKey
|
||||
import net.corda.core.messaging.*
|
||||
import net.corda.core.node.NodeVersionInfo
|
||||
import net.corda.core.node.services.PartyInfo
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.opaque
|
||||
@ -21,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
import org.apache.activemq.artemis.api.core.Message.*
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
import org.bouncycastle.asn1.x500.X500Name
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
@ -53,6 +55,7 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
*/
|
||||
@ThreadSafe
|
||||
class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
nodeVersionInfo: NodeVersionInfo,
|
||||
val serverHostPort: HostAndPort,
|
||||
val myIdentity: CompositeKey?,
|
||||
val nodeExecutor: AffinityExecutor,
|
||||
@ -65,9 +68,11 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
|
||||
// that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid
|
||||
// confusion.
|
||||
const val TOPIC_PROPERTY = "platform-topic"
|
||||
const val SESSION_ID_PROPERTY = "session-id"
|
||||
private val AMQ_DELAY: Int = Integer.valueOf(System.getProperty("amq.delivery.delay.ms", "0"))
|
||||
private val topicProperty = SimpleString("platform-topic")
|
||||
private val sessionIdProperty = SimpleString("session-id")
|
||||
private val nodeVersionProperty = SimpleString("node-version")
|
||||
private val nodeVendorProperty = SimpleString("node-vendor")
|
||||
private val amqDelay: Int = Integer.valueOf(System.getProperty("amq.delivery.delay.ms", "0"))
|
||||
}
|
||||
|
||||
private class InnerState {
|
||||
@ -87,6 +92,8 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
data class Handler(val topicSession: TopicSession,
|
||||
val callback: (ReceivedMessage, MessageHandlerRegistration) -> Unit) : MessageHandlerRegistration
|
||||
|
||||
private val nodeVendor = SimpleString(nodeVersionInfo.vendor)
|
||||
private val version = SimpleString(nodeVersionInfo.version.toString())
|
||||
/** An executor for sending messages */
|
||||
private val messagingExecutor = AffinityExecutor.ServiceAffinityExecutor("Messaging", 1)
|
||||
|
||||
@ -130,7 +137,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
// using our TLS certificate.
|
||||
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
|
||||
// size of 1MB is acknowledged.
|
||||
val session = clientFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
|
||||
val session = clientFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE)
|
||||
this.session = session
|
||||
session.start()
|
||||
|
||||
@ -165,7 +172,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
private fun makeP2PConsumer(session: ClientSession, networkMapOnly: Boolean): ClientConsumer {
|
||||
return if (networkMapOnly) {
|
||||
// Filter for just the network map messages.
|
||||
val messageFilter = "hyphenated_props:$TOPIC_PROPERTY like 'platform.network_map.%'"
|
||||
val messageFilter = "hyphenated_props:$topicProperty like 'platform.network_map.%'"
|
||||
session.createConsumer(P2P_QUEUE, messageFilter)
|
||||
} else
|
||||
session.createConsumer(P2P_QUEUE)
|
||||
@ -249,18 +256,10 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
|
||||
private fun artemisToCordaMessage(message: ClientMessage): ReceivedMessage? {
|
||||
try {
|
||||
if (!message.containsProperty(TOPIC_PROPERTY)) {
|
||||
log.warn("Received message without a $TOPIC_PROPERTY property, ignoring")
|
||||
return null
|
||||
}
|
||||
if (!message.containsProperty(SESSION_ID_PROPERTY)) {
|
||||
log.warn("Received message without a $SESSION_ID_PROPERTY property, ignoring")
|
||||
return null
|
||||
}
|
||||
val topic = message.getStringProperty(TOPIC_PROPERTY)
|
||||
val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
|
||||
val topic = message.required(topicProperty) { getStringProperty(it) }
|
||||
val sessionID = message.required(sessionIdProperty) { getLongProperty(it) }
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
val uuid = UUID.fromString(message.getStringProperty(HDR_DUPLICATE_DETECTION_ID))
|
||||
val uuid = message.required(HDR_DUPLICATE_DETECTION_ID) { UUID.fromString(message.getStringProperty(it)) }
|
||||
val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" }
|
||||
log.trace { "Received message from: ${message.address} user: $user topic: $topic sessionID: $sessionID uuid: $uuid" }
|
||||
|
||||
@ -277,11 +276,16 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
|
||||
return msg
|
||||
} catch (e: Exception) {
|
||||
log.error("Internal error whilst reading MQ message", e)
|
||||
log.error("Unable to process message, ignoring it: $message", e)
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
private inline fun <T> ClientMessage.required(key: SimpleString, extractor: ClientMessage.(SimpleString) -> T): T {
|
||||
require(containsProperty(key)) { "Missing $key" }
|
||||
return extractor(key)
|
||||
}
|
||||
|
||||
private fun deliver(msg: ReceivedMessage): Boolean {
|
||||
state.checkNotLocked()
|
||||
// Because handlers is a COW list, the loop inside filter will operate on a snapshot. Handlers being added
|
||||
@ -368,16 +372,17 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
state.locked {
|
||||
val mqAddress = getMQAddress(target)
|
||||
val artemisMessage = session!!.createMessage(true).apply {
|
||||
val sessionID = message.topicSession.sessionID
|
||||
putStringProperty(TOPIC_PROPERTY, message.topicSession.topic)
|
||||
putLongProperty(SESSION_ID_PROPERTY, sessionID)
|
||||
putStringProperty(nodeVendorProperty, nodeVendor)
|
||||
putStringProperty(nodeVersionProperty, version)
|
||||
putStringProperty(topicProperty, SimpleString(message.topicSession.topic))
|
||||
putLongProperty(sessionIdProperty, message.topicSession.sessionID)
|
||||
writeBodyBufferBytes(message.data)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString()))
|
||||
|
||||
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
|
||||
if (AMQ_DELAY > 0 && message.topicSession.topic == StateMachineManager.sessionTopic.topic) {
|
||||
putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + AMQ_DELAY)
|
||||
if (amqDelay > 0 && message.topicSession.topic == StateMachineManager.sessionTopic.topic) {
|
||||
putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelay)
|
||||
}
|
||||
}
|
||||
log.trace { "Send to: $mqAddress topic: ${message.topicSession.topic} " +
|
||||
@ -387,7 +392,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private fun getMQAddress(target: MessageRecipients): String {
|
||||
return if (target == myAddress) {
|
||||
// If we are sending to ourselves then route the message directly to our P2P queue.
|
||||
|
@ -215,7 +215,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
state = FlowSessionState.Initiated(peerParty, sessionInitResponse.initiatedSessionId)
|
||||
} else {
|
||||
sessionInitResponse as SessionReject
|
||||
throw FlowException("Party ${state.sendToParty} rejected session request: ${sessionInitResponse.errorMessage}")
|
||||
throw FlowSessionException("Party ${state.sendToParty} rejected session request: ${sessionInitResponse.errorMessage}")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,8 @@ class ArgsParserTest {
|
||||
help = false,
|
||||
logToConsole = false,
|
||||
loggingLevel = Level.INFO,
|
||||
isRegistration = false))
|
||||
isRegistration = false,
|
||||
isVersion = false))
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -54,20 +55,6 @@ class ArgsParserTest {
|
||||
assertThat(cmdLineOptions.configFile).isEqualTo(configFile)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `log-to-console`() {
|
||||
val cmdLineOptions = parser.parse("--log-to-console")
|
||||
assertThat(cmdLineOptions.logToConsole).isTrue()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `logging-level`() {
|
||||
for (level in Level.values()) {
|
||||
val cmdLineOptions = parser.parse("--logging-level", level.name)
|
||||
assertThat(cmdLineOptions.loggingLevel).isEqualTo(level)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `both base-directory and config-file`() {
|
||||
assertThatExceptionOfType(IllegalArgumentException::class.java).isThrownBy {
|
||||
@ -89,6 +76,20 @@ class ArgsParserTest {
|
||||
}.withMessageContaining("config-file")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `log-to-console`() {
|
||||
val cmdLineOptions = parser.parse("--log-to-console")
|
||||
assertThat(cmdLineOptions.logToConsole).isTrue()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `logging-level`() {
|
||||
for (level in Level.values()) {
|
||||
val cmdLineOptions = parser.parse("--logging-level", level.name)
|
||||
assertThat(cmdLineOptions.loggingLevel).isEqualTo(level)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `logging-level without argument`() {
|
||||
assertThatExceptionOfType(OptionException::class.java).isThrownBy {
|
||||
@ -102,4 +103,16 @@ class ArgsParserTest {
|
||||
parser.parse("--logging-level", "not-a-level")
|
||||
}.withMessageContaining("logging-level")
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `initial-registration`() {
|
||||
val cmdLineOptions = parser.parse("--initial-registration")
|
||||
assertThat(cmdLineOptions.isRegistration).isTrue()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun version() {
|
||||
val cmdLineOptions = parser.parse("--version")
|
||||
assertThat(cmdLineOptions.isVersion).isTrue()
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.databaseTransaction
|
||||
import net.corda.testing.MOCK_NODE_VERSION_INFO
|
||||
import net.corda.testing.TestNodeConfiguration
|
||||
import net.corda.testing.freeLocalHostAndPort
|
||||
import net.corda.testing.node.makeTestDataSourceProperties
|
||||
@ -219,6 +220,7 @@ class ArtemisMessagingTests {
|
||||
return databaseTransaction(database) {
|
||||
NodeMessagingClient(
|
||||
config,
|
||||
MOCK_NODE_VERSION_INFO,
|
||||
server,
|
||||
identity.public.composite,
|
||||
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
|
||||
|
@ -9,7 +9,9 @@ import com.typesafe.config.Config
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.crypto.*
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.node.NodeVersionInfo
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.Version
|
||||
import net.corda.core.serialization.OpaqueBytes
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
@ -82,6 +84,9 @@ val ALL_TEST_KEYS: List<KeyPair> get() = listOf(MEGA_CORP_KEY, MINI_CORP_KEY, AL
|
||||
|
||||
val MOCK_IDENTITY_SERVICE: MockIdentityService get() = MockIdentityService(listOf(MEGA_CORP, MINI_CORP, DUMMY_NOTARY))
|
||||
|
||||
val MOCK_VERSION = Version(0, 0, false)
|
||||
val MOCK_NODE_VERSION_INFO = NodeVersionInfo(MOCK_VERSION, "Mock revision", "Mock Vendor")
|
||||
|
||||
fun generateStateRef() = StateRef(SecureHash.randomSHA256(), 0)
|
||||
|
||||
/**
|
||||
|
@ -7,20 +7,21 @@ import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.node.services.network.InMemoryNetworkMapCache
|
||||
import net.corda.testing.MOCK_VERSION
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
|
||||
/**
|
||||
* Network map cache with no backing map service.
|
||||
*/
|
||||
class MockNetworkMapCache() : InMemoryNetworkMapCache() {
|
||||
class MockNetworkMapCache : InMemoryNetworkMapCache() {
|
||||
override val changed: Observable<NetworkMapCache.MapChange> = PublishSubject.create<NetworkMapCache.MapChange>()
|
||||
|
||||
data class MockAddress(val id: String): SingleMessageRecipient
|
||||
|
||||
init {
|
||||
val mockNodeA = NodeInfo(MockAddress("bankC:8080"), Party("Bank C", DummyPublicKey("Bank C")))
|
||||
val mockNodeB = NodeInfo(MockAddress("bankD:8080"), Party("Bank D", DummyPublicKey("Bank D")))
|
||||
val mockNodeA = NodeInfo(MockAddress("bankC:8080"), Party("Bank C", DummyPublicKey("Bank C")), MOCK_VERSION)
|
||||
val mockNodeB = NodeInfo(MockAddress("bankD:8080"), Party("Bank D", DummyPublicKey("Bank D")), MOCK_VERSION)
|
||||
registeredNodes[mockNodeA.legalIdentity.owningKey] = mockNodeA
|
||||
registeredNodes[mockNodeB.legalIdentity.owningKey] = mockNodeB
|
||||
runWithoutMapService()
|
||||
|
@ -12,6 +12,7 @@ import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.node.CordaPluginRegistry
|
||||
import net.corda.core.node.PhysicalLocation
|
||||
import net.corda.core.node.ServiceEntry
|
||||
import net.corda.core.node.Version
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.utilities.DUMMY_NOTARY_KEY
|
||||
import net.corda.core.utilities.loggerFor
|
||||
@ -27,6 +28,7 @@ import net.corda.node.services.transactions.ValidatingNotaryService
|
||||
import net.corda.node.services.vault.NodeVaultService
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
|
||||
import net.corda.testing.MOCK_VERSION
|
||||
import net.corda.testing.TestNodeConfiguration
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.slf4j.Logger
|
||||
@ -129,9 +131,11 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
advertisedServices: Set<ServiceInfo>,
|
||||
val id: Int,
|
||||
val overrideServices: Map<ServiceInfo, KeyPair>?,
|
||||
val entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue())) : AbstractNode(config, advertisedServices, TestClock(), mockNet.busyLatch) {
|
||||
val entropyRoot: BigInteger = BigInteger.valueOf(random63BitValue())) :
|
||||
AbstractNode(config, advertisedServices, TestClock(), mockNet.busyLatch) {
|
||||
var counter = entropyRoot
|
||||
override val log: Logger = loggerFor<MockNode>()
|
||||
override val version: Version get() = MOCK_VERSION
|
||||
override val serverThread: AffinityExecutor =
|
||||
if (mockNet.threadPerNode)
|
||||
ServiceAffinityExecutor("Mock node $id thread", 1)
|
||||
|
@ -17,6 +17,7 @@ import net.corda.core.utilities.DUMMY_NOTARY
|
||||
import net.corda.node.services.persistence.InMemoryStateMachineRecordedTransactionMappingStorage
|
||||
import net.corda.testing.MEGA_CORP
|
||||
import net.corda.testing.MINI_CORP
|
||||
import net.corda.testing.MOCK_VERSION
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.io.ByteArrayInputStream
|
||||
@ -63,7 +64,7 @@ open class MockServices(val key: KeyPair = generateKeyPair()) : ServiceHub {
|
||||
override val networkMapCache: NetworkMapCache get() = throw UnsupportedOperationException()
|
||||
override val clock: Clock get() = Clock.systemUTC()
|
||||
override val schedulerService: SchedulerService get() = throw UnsupportedOperationException()
|
||||
override val myInfo: NodeInfo get() = NodeInfo(object : SingleMessageRecipient {}, Party("MegaCorp", key.public.composite))
|
||||
override val myInfo: NodeInfo get() = NodeInfo(object : SingleMessageRecipient {}, Party("MegaCorp", key.public.composite), MOCK_VERSION)
|
||||
}
|
||||
|
||||
@ThreadSafe
|
||||
@ -83,11 +84,7 @@ class MockIdentityService(val identities: List<Party>) : IdentityService, Single
|
||||
|
||||
|
||||
class MockKeyManagementService(vararg initialKeys: KeyPair) : SingletonSerializeAsToken(), KeyManagementService {
|
||||
override val keys: MutableMap<PublicKey, PrivateKey>
|
||||
|
||||
init {
|
||||
keys = initialKeys.map { it.public to it.private }.toMap(HashMap())
|
||||
}
|
||||
override val keys: MutableMap<PublicKey, PrivateKey> = initialKeys.associateByTo(HashMap(), { it.public }, { it.private })
|
||||
|
||||
val nextKeys = LinkedList<KeyPair>()
|
||||
|
||||
|
@ -14,6 +14,7 @@ import net.corda.node.services.config.ConfigHelper
|
||||
import net.corda.node.services.config.FullNodeConfiguration
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
import net.corda.node.utilities.ServiceIdentityGenerator
|
||||
import net.corda.testing.MOCK_NODE_VERSION_INFO
|
||||
import net.corda.testing.freeLocalHostAndPort
|
||||
import net.corda.testing.getFreeLocalPorts
|
||||
import org.junit.After
|
||||
@ -132,7 +133,7 @@ abstract class NodeBasedTest {
|
||||
) + configOverrides
|
||||
)
|
||||
|
||||
val node = FullNodeConfiguration(baseDirectory, config).createNode()
|
||||
val node = FullNodeConfiguration(baseDirectory, config).createNode(MOCK_NODE_VERSION_INFO)
|
||||
node.start()
|
||||
nodes += node
|
||||
thread(name = legalName) {
|
||||
|
@ -5,6 +5,7 @@ import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.crypto.composite
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.testing.MOCK_NODE_VERSION_INFO
|
||||
import net.corda.node.services.RPCUserServiceImpl
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
@ -35,6 +36,7 @@ class SimpleNode(val config: NodeConfiguration, val address: HostAndPort = freeL
|
||||
val net = databaseTransaction(database) {
|
||||
NodeMessagingClient(
|
||||
config,
|
||||
MOCK_NODE_VERSION_INFO,
|
||||
address,
|
||||
identity.public.composite,
|
||||
executor,
|
||||
|
Loading…
x
Reference in New Issue
Block a user