Merge branch 'master' into shams-master-merge-291117

# Conflicts:
#	node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
#	node/src/main/kotlin/net/corda/node/services/network/PersistentNetworkMapCache.kt
#	node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt
#	samples/notary-demo/src/main/kotlin/net/corda/notarydemo/BFTNotaryCordform.kt
#	testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt
#	testing/node-driver/src/main/kotlin/net/corda/testing/internal/demorun/DemoRunner.kt
#	testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNode.kt
This commit is contained in:
Shams Asari 2017-11-29 18:00:16 +00:00
commit 71763ff1d3
171 changed files with 2334 additions and 1950 deletions

View File

@ -1,15 +0,0 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="BankOfCordaDriverKt - Issue Web" type="JetRunConfigurationType" factoryName="Kotlin">
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
<option name="MAIN_CLASS_NAME" value="net.corda.bank.BankOfCordaDriverKt" />
<option name="VM_PARAMETERS" value="" />
<option name="PROGRAM_PARAMETERS" value="--role ISSUE_CASH_WEB --quantity 100 --currency USD" />
<option name="WORKING_DIRECTORY" value="" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
<option name="ALTERNATIVE_JRE_PATH" />
<option name="PASS_PARENT_ENVS" value="true" />
<module name="bank-of-corda-demo_main" />
<envs />
<method />
</configuration>
</component>

View File

@ -1,15 +0,0 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="BankOfCordaDriverKt - Run Stack" type="JetRunConfigurationType" factoryName="Kotlin">
<extension name="coverage" enabled="false" merge="false" sample_coverage="true" runner="idea" />
<option name="MAIN_CLASS_NAME" value="net.corda.bank.BankOfCordaDriverKt" />
<option name="VM_PARAMETERS" value="" />
<option name="PROGRAM_PARAMETERS" value="--role ISSUER" />
<option name="WORKING_DIRECTORY" value="" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false" />
<option name="ALTERNATIVE_JRE_PATH" />
<option name="PASS_PARENT_ENVS" value="true" />
<module name="bank-of-corda-demo_main" />
<envs />
<method />
</configuration>
</component>

View File

@ -6,8 +6,7 @@ buildscript {
// Our version: bump this on release.
ext.corda_release_version = "3.0-NETWORKMAP-SNAPSHOT"
// Increment this on any release that changes public APIs anywhere in the Corda platform
// TODO This is going to be difficult until we have a clear separation throughout the code of what is public and what is internal
ext.corda_platform_version = 2
ext.corda_platform_version = constants.getProperty("platformVersion")
ext.gradle_plugins_version = constants.getProperty("gradlePluginsVersion")
// Dependency versions. Can run 'gradle dependencyUpdates' to find new versions of things.
@ -49,6 +48,7 @@ buildscript {
ext.commons_collections_version = '4.1'
ext.beanutils_version = '1.9.3'
ext.crash_version = 'faba68332800f21278c5b600bf14ad55cef5989e'
ext.jsr305_version = constants.getProperty("jsr305Version")
// Update 121 is required for ObjectInputFilter and at time of writing 131 was latest:
ext.java8_minUpdateVersion = '131'

View File

@ -12,11 +12,14 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.*
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.testing.SerializationEnvironmentRule
import net.corda.testing.driver.poll
import net.corda.testing.internal.*
import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Rule
import org.junit.Test
import rx.Observable
import rx.subjects.PublishSubject
@ -26,6 +29,14 @@ import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger
class RPCStabilityTests {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
private val pool = Executors.newFixedThreadPool(10, testThreadFactory())
@After
fun shutdown() {
pool.shutdown()
}
object DummyOps : RPCOps {
override val protocolVersion = 0
@ -197,9 +208,9 @@ class RPCStabilityTests {
val proxy = startRpcClient<LeakObservableOps>(server.get().broker.hostAndPort!!).get()
// Leak many observables
val N = 200
(1..N).toList().parallelStream().forEach {
proxy.leakObservable()
}
(1..N).map {
pool.fork { proxy.leakObservable(); Unit }
}.transpose().getOrThrow()
// In a loop force GC and check whether the server is notified
while (true) {
System.gc()
@ -231,7 +242,7 @@ class RPCStabilityTests {
assertEquals("pong", client.ping())
serverFollower.shutdown()
startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow()
val pingFuture = ForkJoinPool.commonPool().fork(client::ping)
val pingFuture = pool.fork(client::ping)
assertEquals("pong", pingFuture.getOrThrow(10.seconds))
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
}

View File

@ -74,7 +74,7 @@ public class StandaloneCordaRPCJavaClientTest {
}
private void copyFinanceCordapp() {
Path cordappsDir = (factory.baseDirectory(notaryConfig).resolve("cordapps"));
Path cordappsDir = (factory.baseDirectory(notaryConfig).resolve(NodeProcess.CORDAPPS_DIR_NAME));
try {
Files.createDirectories(cordappsDir);
} catch (IOException ex) {

View File

@ -86,7 +86,7 @@ class StandaloneCordaRPClientTest {
}
private fun copyFinanceCordapp() {
val cordappsDir = (factory.baseDirectory(notaryConfig) / "cordapps").createDirectories()
val cordappsDir = (factory.baseDirectory(notaryConfig) / NodeProcess.CORDAPPS_DIR_NAME).createDirectories()
// Find the finance jar file for the smoke tests of this module
val financeJar = Paths.get("build", "resources", "smokeTest").list {
it.filter { "corda-finance" in it.toString() }.toList().single()

View File

@ -6,14 +6,20 @@ import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.RPCOps
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.User
import net.corda.testing.SerializationEnvironmentRule
import net.corda.testing.internal.RPCDriverExposedDSLInterface
import net.corda.testing.internal.rpcTestUser
import net.corda.testing.internal.startInVmRpcClient
import net.corda.testing.internal.startRpcClient
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.junit.Rule
import org.junit.runners.Parameterized
open class AbstractRPCTest {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
enum class RPCTestMode {
InVm,
Netty

View File

@ -5,19 +5,22 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.millis
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.testing.internal.RPCDriverExposedDSLInterface
import net.corda.testing.internal.rpcDriver
import net.corda.testing.internal.testThreadFactory
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet
import org.junit.After
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import rx.Observable
import rx.subjects.UnicastSubject
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.*
@RunWith(Parameterized::class)
class RPCConcurrencyTests : AbstractRPCTest() {
@ -36,7 +39,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
fun getParallelObservableTree(depth: Int, branchingFactor: Int): ObservableRose<Int>
}
class TestOpsImpl : TestOps {
class TestOpsImpl(private val pool: Executor) : TestOps {
private val latches = ConcurrentHashMap<Long, CountDownLatch>()
override val protocolVersion = 0
@ -68,24 +71,22 @@ class RPCConcurrencyTests : AbstractRPCTest() {
val branches = if (depth == 0) {
Observable.empty<ObservableRose<Int>>()
} else {
val publish = UnicastSubject.create<ObservableRose<Int>>()
ForkJoinPool.commonPool().fork {
(1..branchingFactor).toList().parallelStream().forEach {
publish.onNext(getParallelObservableTree(depth - 1, branchingFactor))
UnicastSubject.create<ObservableRose<Int>>().also { publish ->
(1..branchingFactor).map {
pool.fork { publish.onNext(getParallelObservableTree(depth - 1, branchingFactor)) }
}.transpose().then {
it.getOrThrow()
publish.onCompleted()
}
publish.onCompleted()
}
publish
}
return ObservableRose(depth, branches)
}
}
private lateinit var testOpsImpl: TestOpsImpl
private fun RPCDriverExposedDSLInterface.testProxy(): TestProxy<TestOps> {
testOpsImpl = TestOpsImpl()
return testProxy<TestOps>(
testOpsImpl,
TestOpsImpl(pool),
clientConfiguration = RPCClientConfiguration.default.copy(
reapInterval = 100.millis,
cacheConcurrencyLevel = 16
@ -96,6 +97,12 @@ class RPCConcurrencyTests : AbstractRPCTest() {
)
}
private val pool = Executors.newFixedThreadPool(10, testThreadFactory())
@After
fun shutdown() {
pool.shutdown()
}
@Test
fun `call multiple RPCs in parallel`() {
rpcDriver {
@ -103,19 +110,17 @@ class RPCConcurrencyTests : AbstractRPCTest() {
val numberOfBlockedCalls = 2
val numberOfDownsRequired = 100
val id = proxy.ops.newLatch(numberOfDownsRequired)
val done = CountDownLatch(numberOfBlockedCalls)
// Start a couple of blocking RPC calls
(1..numberOfBlockedCalls).forEach {
ForkJoinPool.commonPool().fork {
val done = (1..numberOfBlockedCalls).map {
pool.fork {
proxy.ops.waitLatch(id)
done.countDown()
}
}
}.transpose()
// Down the latch that the others are waiting for concurrently
(1..numberOfDownsRequired).toList().parallelStream().forEach {
proxy.ops.downLatch(id)
}
done.await()
(1..numberOfDownsRequired).map {
pool.fork { proxy.ops.downLatch(id) }
}.transpose().getOrThrow()
done.getOrThrow()
}
}
@ -146,7 +151,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
fun ObservableRose<Int>.subscribeToAll() {
remainingLatch.countDown()
this.branches.subscribe { tree ->
(tree.value + 1..treeDepth - 1).forEach {
(tree.value + 1 until treeDepth).forEach {
require(it in depthsSeen) { "Got ${tree.value} before $it" }
}
depthsSeen.add(tree.value)
@ -165,11 +170,11 @@ class RPCConcurrencyTests : AbstractRPCTest() {
val treeDepth = 2
val treeBranchingFactor = 10
val remainingLatch = CountDownLatch((intPower(treeBranchingFactor, treeDepth + 1) - 1) / (treeBranchingFactor - 1))
val depthsSeen = Collections.synchronizedSet(HashSet<Int>())
val depthsSeen = ConcurrentHashSet<Int>()
fun ObservableRose<Int>.subscribeToAll() {
remainingLatch.countDown()
branches.subscribe { tree ->
(tree.value + 1..treeDepth - 1).forEach {
(tree.value + 1 until treeDepth).forEach {
require(it in depthsSeen) { "Got ${tree.value} before $it" }
}
depthsSeen.add(tree.value)

View File

@ -5,12 +5,18 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.*
import net.corda.core.utilities.getOrThrow
import net.corda.testing.SerializationEnvironmentRule
import net.corda.testing.internal.rpcDriver
import net.corda.testing.internal.startRpcClient
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Rule
import org.junit.Test
class RPCFailureTests {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
class Unserializable
interface Ops : RPCOps {
fun getUnserializable(): Unserializable

View File

@ -49,8 +49,7 @@ class IdentitySyncFlowTests {
val alice: Party = aliceNode.info.singleIdentity()
val bob: Party = bobNode.info.singleIdentity()
val notary = mockNet.defaultNotaryIdentity
bobNode.internals.registerInitiatedFlow(Receive::class.java)
bobNode.registerInitiatedFlow(Receive::class.java)
// Alice issues then pays some cash to a new confidential identity that Bob doesn't know about
val anonymous = true
val ref = OpaqueBytes.of(0x01)
@ -80,8 +79,7 @@ class IdentitySyncFlowTests {
val bob: Party = bobNode.info.singleIdentity()
val charlie: Party = charlieNode.info.singleIdentity()
val notary = mockNet.defaultNotaryIdentity
bobNode.internals.registerInitiatedFlow(Receive::class.java)
bobNode.registerInitiatedFlow(Receive::class.java)
// Charlie issues then pays some cash to a new confidential identity
val anonymous = true
val ref = OpaqueBytes.of(0x01)

View File

@ -1,5 +1,7 @@
gradlePluginsVersion=2.0.9
gradlePluginsVersion=3.0.0
kotlinVersion=1.1.60
platformVersion=2
guavaVersion=21.0
bouncycastleVersion=1.57
typesafeConfigVersion=1.3.1
typesafeConfigVersion=1.3.1
jsr305Version=3.0.2

View File

@ -78,7 +78,7 @@ dependencies {
compileOnly "co.paralleluniverse:quasar-core:$quasar_version:jdk8"
// Thread safety annotations
compile "com.google.code.findbugs:jsr305:3.0.1"
compile "com.google.code.findbugs:jsr305:$jsr305_version"
// Log4J: logging framework (ONLY explicitly referenced by net.corda.core.utilities.Logging.kt)
compile "org.apache.logging.log4j:log4j-core:${log4j_version}"

View File

@ -31,6 +31,8 @@ import java.time.Duration
import java.time.temporal.Temporal
import java.util.*
import java.util.Spliterator.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
import java.util.stream.IntStream
import java.util.stream.Stream
import java.util.stream.StreamSupport
@ -307,3 +309,10 @@ fun TransactionBuilder.toLedgerTransaction(services: ServiceHub, serializationCo
val KClass<*>.packageName: String get() = java.`package`.name
fun URL.openHttpConnection(): HttpURLConnection = openConnection() as HttpURLConnection
/** Analogous to [Thread.join]. */
fun ExecutorService.join() {
shutdown() // Do not change to shutdownNow, tests use this method to assert the executor has no more tasks.
while (!awaitTermination(1, TimeUnit.SECONDS)) {
// Try forever. Do not give up, tests use this method to assert the executor has no more tasks.
}
}

View File

@ -43,7 +43,7 @@ class ThreadLocalToggleField<T>(name: String) : ToggleField<T>(name) {
}
/** The named thread has leaked from a previous test. */
class ThreadLeakException : RuntimeException("Leaked thread detected: ${Thread.currentThread().name}")
class ThreadLeakException(valueToString: String) : RuntimeException("Leaked thread '${Thread.currentThread().name}' detected, value was: $valueToString")
/** @param isAGlobalThreadBeingCreated whether a global thread (that should not inherit any value) is being created. */
class InheritableThreadLocalToggleField<T>(name: String,
@ -54,16 +54,12 @@ class InheritableThreadLocalToggleField<T>(name: String,
}
private inner class Holder(value: T) : AtomicReference<T?>(value) {
fun valueOrDeclareLeak() = get() ?: throw ThreadLeakException()
private val valueToString = value.toString() // We never set another non-null value.
fun valueOrDeclareLeak() = get() ?: throw ThreadLeakException(valueToString)
fun childValue(): Holder? {
val e = ThreadLeakException() // Expensive, but so is starting the new thread.
return if (isAGlobalThreadBeingCreated(e.stackTrace)) {
get() ?: log.warn(e.message)
null
} else {
get() ?: log.error(e.message)
this
}
val e = ThreadLeakException(valueToString) // Expensive, but so is starting the new thread.
get() ?: log.warn(e.message)
return if (isAGlobalThreadBeingCreated(e.stackTrace)) null else this
}
}

View File

@ -16,7 +16,7 @@ interface SerializationEnvironment {
val checkpointContext: SerializationContext
}
class SerializationEnvironmentImpl(
open class SerializationEnvironmentImpl(
override val serializationFactory: SerializationFactory,
override val p2pContext: SerializationContext,
rpcServerContext: SerializationContext? = null,

View File

@ -0,0 +1,75 @@
package net.corda.core
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.*
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.nodeapi.User
import net.corda.smoketesting.NodeConfig
import net.corda.smoketesting.NodeProcess
import net.corda.testing.common.internal.ProjectStructure
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import java.nio.file.Paths
import java.util.*
import java.util.concurrent.atomic.AtomicInteger
import java.util.jar.JarFile
import kotlin.streams.toList
class NodeVersioningTest {
private companion object {
val user = User("user1", "test", permissions = setOf("ALL"))
val port = AtomicInteger(15100)
val expectedPlatformVersion = (ProjectStructure.projectRootDir / "constants.properties").read {
val constants = Properties()
constants.load(it)
constants.getProperty("platformVersion").toInt()
}
}
private val factory = NodeProcess.Factory()
private val aliceConfig = NodeConfig(
legalName = CordaX500Name(organisation = "Alice Corp", locality = "Madrid", country = "ES"),
p2pPort = port.andIncrement,
rpcPort = port.andIncrement,
webPort = port.andIncrement,
isNotary = false,
users = listOf(user)
)
@Test
fun `platform version in manifest file`() {
val manifest = JarFile(factory.cordaJar.toFile()).manifest
assertThat(manifest.mainAttributes.getValue("Corda-Platform-Version").toInt()).isEqualTo(expectedPlatformVersion)
}
@Test
fun `platform version from RPC`() {
val cordappsDir = (factory.baseDirectory(aliceConfig) / NodeProcess.CORDAPPS_DIR_NAME).createDirectories()
// Find the jar file for the smoke tests of this module
val selfCordapp = Paths.get("build", "libs").list {
it.filter { "-smokeTests" in it.toString() }.toList().single()
}
selfCordapp.copyToDirectory(cordappsDir)
factory.create(aliceConfig).use { alice ->
alice.connect().use {
val rpc = it.proxy
assertThat(rpc.protocolVersion).isEqualTo(expectedPlatformVersion)
assertThat(rpc.nodeInfo().platformVersion).isEqualTo(expectedPlatformVersion)
assertThat(rpc.startFlow(NodeVersioningTest::GetPlatformVersionFlow).returnValue.getOrThrow()).isEqualTo(expectedPlatformVersion)
}
}
}
@StartableByRPC
class GetPlatformVersionFlow : FlowLogic<Int>() {
@Suspendable
override fun call(): Int = serviceHub.myInfo.platformVersion
}
}

View File

@ -14,6 +14,7 @@ import net.corda.core.utilities.unwrap
import net.corda.nodeapi.User
import net.corda.smoketesting.NodeConfig
import net.corda.smoketesting.NodeProcess
import net.corda.smoketesting.NodeProcess.Companion.CORDAPPS_DIR_NAME
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import java.nio.file.Paths
@ -22,7 +23,6 @@ import kotlin.streams.toList
class CordappSmokeTest {
private companion object {
private const val CORDAPPS_DIR_NAME = "cordapps"
val user = User("user1", "test", permissions = setOf("ALL"))
val port = AtomicInteger(15100)
}
@ -38,7 +38,6 @@ class CordappSmokeTest {
users = listOf(user)
)
@Test
fun `FlowContent appName returns the filename of the CorDapp jar`() {
val cordappsDir = (factory.baseDirectory(aliceConfig) / CORDAPPS_DIR_NAME).createDirectories()

View File

@ -38,7 +38,7 @@ public class FlowsInJavaTest {
@Test
public void suspendableActionInsideUnwrap() throws Exception {
bobNode.getInternals().registerInitiatedFlow(SendHelloAndThenReceive.class);
bobNode.registerInitiatedFlow(SendHelloAndThenReceive.class);
Future<String> result = startFlow(aliceNode.getServices(), new SendInUnwrapFlow(bob)).getResultFuture();
mockNet.runNetwork();
assertThat(result.get()).isEqualTo("Hello");

View File

@ -52,10 +52,8 @@ class AttachmentTests {
val bobNode = mockNet.createPartyNode(BOB.name)
val alice = aliceNode.info.singleIdentity()
aliceNode.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
bobNode.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
aliceNode.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
bobNode.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
// Insert an attachment into node zero's store directly.
val id = aliceNode.database.transaction {
aliceNode.attachments.importAttachment(ByteArrayInputStream(fakeAttachment()))
@ -85,10 +83,8 @@ class AttachmentTests {
fun `missing`() {
val aliceNode = mockNet.createPartyNode(ALICE.name)
val bobNode = mockNet.createPartyNode(BOB.name)
aliceNode.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
bobNode.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
aliceNode.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
bobNode.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
// Get node one to fetch a non-existent attachment.
val hash = SecureHash.randomSHA256()
val alice = aliceNode.info.singleIdentity()
@ -108,10 +104,8 @@ class AttachmentTests {
})
val bobNode = mockNet.createNode(MockNodeParameters(legalName = BOB.name))
val alice = aliceNode.services.myInfo.identityFromX500Name(ALICE_NAME)
aliceNode.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
bobNode.internals.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
aliceNode.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
bobNode.registerInitiatedFlow(FetchAttachmentsResponse::class.java)
val attachment = fakeAttachment()
// Insert an attachment into node zero's store directly.
val id = aliceNode.database.transaction {

View File

@ -50,7 +50,7 @@ class CollectSignaturesFlowTests {
private fun registerFlowOnAllNodes(flowClass: KClass<out FlowLogic<*>>) {
listOf(aliceNode, bobNode, charlieNode).forEach {
it.internals.registerInitiatedFlow(flowClass.java)
it.registerInitiatedFlow(flowClass.java)
}
}

View File

@ -133,7 +133,7 @@ class ContractUpgradeFlowTest {
@Test
fun `2 parties contract upgrade using RPC`() {
rpcDriver(initialiseSerialization = false) {
rpcDriver {
// Create dummy contract.
val twoPartyDummyContract = DummyContract.generateInitial(0, notary, alice.ref(1), bob.ref(1))
val signedByA = aliceNode.services.signInitialTransaction(twoPartyDummyContract)

View File

@ -38,14 +38,14 @@ class NoAnswer(private val closure: () -> Unit = {}) : FlowLogic<Unit>() {
* Allows to register a flow of type [R] against an initiating flow of type [I].
*/
inline fun <I : FlowLogic<*>, reified R : FlowLogic<*>> StartedNode<*>.registerInitiatedFlow(initiatingFlowType: KClass<I>, crossinline construct: (session: FlowSession) -> R) {
internals.internalRegisterFlowFactory(initiatingFlowType.java, InitiatedFlowFactory.Core { session -> construct(session) }, R::class.javaObjectType, true)
internalRegisterFlowFactory(initiatingFlowType.java, InitiatedFlowFactory.Core { session -> construct(session) }, R::class.javaObjectType, true)
}
/**
* Allows to register a flow of type [Answer] against an initiating flow of type [I], returning a valure of type [R].
*/
inline fun <I : FlowLogic<*>, reified R : Any> StartedNode<*>.registerAnswer(initiatingFlowType: KClass<I>, value: R) {
internals.internalRegisterFlowFactory(initiatingFlowType.java, InitiatedFlowFactory.Core { session -> Answer(session, value) }, Answer::class.javaObjectType, true)
internalRegisterFlowFactory(initiatingFlowType.java, InitiatedFlowFactory.Core { session -> Answer(session, value) }, Answer::class.javaObjectType, true)
}
/**

View File

@ -42,8 +42,8 @@ class ResolveTransactionsFlowTest {
notaryNode = mockNet.defaultNotaryNode
megaCorpNode = mockNet.createPartyNode(MEGA_CORP.name)
miniCorpNode = mockNet.createPartyNode(MINI_CORP.name)
megaCorpNode.internals.registerInitiatedFlow(TestResponseFlow::class.java)
miniCorpNode.internals.registerInitiatedFlow(TestResponseFlow::class.java)
megaCorpNode.registerInitiatedFlow(TestResponseFlow::class.java)
miniCorpNode.registerInitiatedFlow(TestResponseFlow::class.java)
notary = mockNet.defaultNotaryIdentity
megaCorp = megaCorpNode.info.singleIdentity()
miniCorp = miniCorpNode.info.singleIdentity()

View File

@ -14,7 +14,6 @@ import org.junit.runners.model.Statement
import org.slf4j.Logger
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertNull
@ -23,10 +22,7 @@ private fun <T> withSingleThreadExecutor(callable: ExecutorService.() -> T) = Ex
fork {}.getOrThrow() // Start the thread.
callable()
} finally {
shutdown()
while (!awaitTermination(1, TimeUnit.SECONDS)) {
// Do nothing.
}
join()
}
}
@ -134,6 +130,7 @@ class ToggleFieldTest {
assertThatThrownBy { future.getOrThrow() }
.isInstanceOf(ThreadLeakException::class.java)
.hasMessageContaining(threadName)
.hasMessageContaining("hello")
}
}
withSingleThreadExecutor {
@ -141,9 +138,9 @@ class ToggleFieldTest {
}
}
/** We log an error rather than failing-fast as the new thread may be an undetected global. */
/** We log a warning rather than failing-fast as the new thread may be an undetected global. */
@Test
fun `leaked thread propagates holder to non-global thread, with error`() {
fun `leaked thread propagates holder to non-global thread, with warning`() {
val field = inheritableThreadLocalToggleField<String>()
field.set("hello")
withSingleThreadExecutor {
@ -153,17 +150,18 @@ class ToggleFieldTest {
val leakedThreadName = Thread.currentThread().name
verifyNoMoreInteractions(log)
withSingleThreadExecutor {
// If ThreadLeakException is seen in practice, these errors form a trail of where the holder has been:
verify(log).error(argThat { contains(leakedThreadName) })
// If ThreadLeakException is seen in practice, these warnings form a trail of where the holder has been:
verify(log).warn(argThat { contains(leakedThreadName) && contains("hello") })
val newThreadName = fork { Thread.currentThread().name }.getOrThrow()
val future = fork(field::get)
assertThatThrownBy { future.getOrThrow() }
.isInstanceOf(ThreadLeakException::class.java)
.hasMessageContaining(newThreadName)
.hasMessageContaining("hello")
fork {
verifyNoMoreInteractions(log)
withSingleThreadExecutor {
verify(log).error(argThat { contains(newThreadName) })
verify(log).warn(argThat { contains(newThreadName) && contains("hello") })
}
}.getOrThrow()
}
@ -183,7 +181,7 @@ class ToggleFieldTest {
globalThreadCreationMethod {
verifyNoMoreInteractions(log)
withSingleThreadExecutor {
verify(log).warn(argThat { contains(leakedThreadName) })
verify(log).warn(argThat { contains(leakedThreadName) && contains("hello") })
// In practice the new thread is for example a static thread we can't get rid of:
assertNull(fork(field::get).getOrThrow())
}

View File

@ -2,13 +2,13 @@ package net.corda.core.internal.concurrent
import com.nhaarman.mockito_kotlin.*
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.join
import net.corda.core.utilities.getOrThrow
import net.corda.testing.rigorousMock
import org.assertj.core.api.Assertions
import org.junit.Test
import org.slf4j.Logger
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.test.assertEquals
import kotlin.test.assertFalse
@ -108,10 +108,7 @@ class CordaFutureTest {
val throwable = Exception("Boom")
val executor = Executors.newSingleThreadExecutor()
executor.fork { throw throwable }.andForget(log)
executor.shutdown()
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
// Do nothing.
}
executor.join()
verify(log).error(any(), same(throwable))
}

View File

@ -15,7 +15,7 @@ import net.corda.core.utilities.unwrap
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.internal.StartedNode
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.utilities.currentDBSession
import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.testing.ALICE_NAME
import net.corda.testing.BOB_NAME
import net.corda.testing.node.MockNetwork
@ -148,7 +148,7 @@ class AttachmentSerializationTest {
}
private fun launchFlow(clientLogic: ClientLogic, rounds: Int, sendData: Boolean = false) {
server.internals.internalRegisterFlowFactory(
server.internalRegisterFlowFactory(
ClientLogic::class.java,
InitiatedFlowFactory.Core { ServerLogic(it, sendData) },
ServerLogic::class.java,

View File

@ -6,6 +6,9 @@ from the previous milestone release.
UNRELEASED
----------
* Removed confusing property database.initDatabase, enabling its guarded behaviour with the dev-mode.
In devMode Hibernate will try to create or update database schemas, otherwise it will expect relevant schemas to be present
in the database (pre configured via DDL scripts or equivalent), and validate these are correct.
* ``AttachmentStorage`` now allows providing metadata on attachments upload - username and filename, currently as plain
strings. Those can be then used for querying, utilizing ``queryAttachments`` method of the same interface.

View File

@ -70,7 +70,6 @@ path to the node's base directory.
:database: Database configuration:
:initDatabase: Boolean on whether to initialise the database or just validate the schema. Defaults to true.
:serverNameTablePrefix: Prefix string to apply to all the database tables. The default is no prefix.
:transactionIsolationLevel: Transaction isolation level as defined by the ``TRANSACTION_`` constants in
``java.sql.Connection``, but without the "TRANSACTION_" prefix. Defaults to REPEATABLE_READ.
@ -103,24 +102,26 @@ path to the node's base directory.
:notary: Optional configuration object which if present configures the node to run as a notary. If part of a Raft or BFT SMaRt
cluster then specify ``raft`` or ``bftSMaRt`` respectively as described below. If a single node notary then omit both.
:validating: Boolean to determine whether the notary is a validating or non-validating one.
:validating: Boolean to determine whether the notary is a validating or non-validating one.
:raft: If part of a distributed Raft cluster specify this config object, with the following settings:
:raft: If part of a distributed Raft cluster specify this config object, with the following settings:
:nodeAddress: The host and port to which to bind the embedded Raft server. Note that the Raft cluster uses a
separate transport layer for communication that does not integrate with ArtemisMQ messaging services.
:nodeAddress: The host and port to which to bind the embedded Raft server. Note that the Raft cluster uses a
separate transport layer for communication that does not integrate with ArtemisMQ messaging services.
:clusterAddresses: List of Raft cluster member addresses used to join the cluster. At least one of the specified
members must be active and be able to communicate with the cluster leader for joining. If empty, a new
cluster will be bootstrapped.
:clusterAddresses: Must list the addresses of all the members in the cluster. At least one of the members must
be active and be able to communicate with the cluster leader for the node to join the cluster. If empty, a
new cluster will be bootstrapped.
:bftSMaRt: If part of a distributed BFT-SMaRt cluster specify this config object, with the following settings:
:bftSMaRt: If part of a distributed BFT-SMaRt cluster specify this config object, with the following settings:
:replicaId: The zero-based index of the current replica. All replicas must specify a unique replica id.
:replicaId: The zero-based index of the current replica. All replicas must specify a unique replica id.
:clusterAddresses: List of all BFT-SMaRt cluster member addresses.
:clusterAddresses: Must list the addresses of all the members in the cluster. At least one of the members must
be active and be able to communicate with the cluster leader for the node to join the cluster. If empty, a
new cluster will be bootstrapped.
:custom: If `true`, will load and install a notary service from a CorDapp. See :doc:`tutorial-custom-notary`.
:custom: If `true`, will load and install a notary service from a CorDapp. See :doc:`tutorial-custom-notary`.
Only one of ``raft``, ``bftSMaRt`` or ``custom`` configuration values may be specified.
@ -131,17 +132,19 @@ path to the node's base directory.
:rpcUsers: A list of users who are authorised to access the RPC system. Each user in the list is a config object with the
following fields:
:username: Username consisting only of word characters (a-z, A-Z, 0-9 and _)
:password: The password
:permissions: A list of permission strings which RPC methods can use to control access
If this field is absent or an empty list then RPC is effectively locked down. Alternatively, if it contains the string
``ALL`` then the user is permitted to use *any* RPC method. This value is intended for administrator users and for developers.
:username: Username consisting only of word characters (a-z, A-Z, 0-9 and _)
:password: The password
:permissions: A list of permissions for starting flows via RPC. To give the user the permission to start the flow
``foo.bar.FlowClass``, add the string ``StartFlow.foo.bar.FlowClass`` to the list. If the list
contains the string ``ALL``, the user can start any flow via RPC. This value is intended for administrator
users and for development.
:devMode: This flag sets the node to run in development mode. On startup, if the keystore ``<workspace>/certificates/sslkeystore.jks``
does not exist, a developer keystore will be used if ``devMode`` is true. The node will exit if ``devMode`` is false
and the keystore does not exist. ``devMode`` also turns on background checking of flow checkpoints to shake out any
bugs in the checkpointing process.
bugs in the checkpointing process. Also, if ``devMode`` is true, Hibernate will try to automatically create the schema required by Corda
or update an existing schema in the SQL database; if ``devMode`` is false, Hibernate will simply validate an existing schema
failing on node start if this schema is either not present or not compatible.
:detectPublicIp: This flag toggles the auto IP detection behaviour, it is enabled by default. On startup the node will
attempt to discover its externally visible IP address first by looking for any public addresses on its network
@ -163,4 +166,4 @@ path to the node's base directory.
:sshd: If provided, node will start internal SSH server which will provide a management shell. It uses the same credentials
and permissions as RPC subsystem. It has one required parameter.
:port: - the port to start SSH server on
:port: The port to start SSH server on

View File

@ -4,8 +4,9 @@ Corda nodes
.. toctree::
:maxdepth: 1
deploying-a-node
generating-a-node
running-a-node
deploying-a-node
corda-configuration-file
clientrpc
shell

View File

@ -118,9 +118,9 @@ Creating the CorDapp JAR
The gradle ``jar`` task included in the CorDapp template build file will automatically build your CorDapp JAR correctly
as long as your dependencies are set correctly.
Note that the hash of the resulting CorDapp JAR is not deterministic, as it depends on variables such as the timestamp
at creation. Nodes running the same CorDapp must therefore ensure they are using the exact same CorDapp jar, and not
different versions of the JAR created from identical sources.
.. warning:: The hash of the generated CorDapp JAR is not deterministic, as it depends on variables such as the
timestamp at creation. Nodes running the same CorDapp must therefore ensure they are using the exact same CorDapp
jar, and not different versions of the JAR created from identical sources.
The filename of the JAR must include a unique identifier to deduplicate it from other releases of the same CorDapp.
This is typically done by appending the version string to the CorDapp's name. This unique identifier should not change
@ -131,7 +131,7 @@ Installing the CorDapp jar
--------------------------
.. note:: Before installing a CorDapp, you must create one or more nodes to install it on. For instructions, please see
:doc:`deploying-a-node`.
:doc:`generating-a-node`.
At runtime, nodes will load any CorDapps present in their ``cordapps`` folder. Therefore in order to install a CorDapp on
a node, the CorDapp JAR must be added to the ``<node_dir>/cordapps/`` folder, where ``node_dir`` is the folder in which

View File

@ -1,141 +1,242 @@
Deploying a node
================
Node structure
--------------
Each Corda node has the following structure:
.. contents::
.. sourcecode:: none
.. note:: These instructions are intended for people who want to deploy a Corda node to a server,
whether they have developed and tested a CorDapp following the instructions in :doc:`generating-a-node`
or are deploying a third-party CorDapp.
.
├── certificates // The node's doorman certificates
├── corda-webserver.jar // The built-in node webserver
├── corda.jar // The core Corda libraries
├── logs // The node logs
├── node.conf // The node's configuration files
├── persistence.mv.db // The node's database
└── cordapps // The CorDapps jars installed on the node
Linux (systemd): Installing and running Corda as a systemd service
------------------------------------------------------------------
We recommend creating systemd services to run a node and the optional webserver. This provides logging and service
handling, and ensures the Corda service is run at boot.
The node is configured by editing its ``node.conf`` file. You install CorDapps on the node by dropping the CorDapp JARs
into the ``cordapps`` folder.
**Prerequisites**:
Node naming
-----------
A node's name must be a valid X500 name that obeys the following additional constraints:
* Oracle Java 8. The supported versions are listed in :doc:`getting-set-up`
* The fields of the name have the following maximum character lengths:
1. Add a system user which will be used to run Corda:
* Common name: 64
* Organisation: 128
* Organisation unit: 64
* Locality: 64
* State: 64
``sudo adduser --system --no-create-home --group corda``
* The country code is a valid ISO 3166-1 two letter code in upper-case
2. Create a directory called ``/opt/corda`` and change its ownership to the user you want to use to run Corda:
* The organisation, locality and country attributes are present
``mkdir /opt/corda; chown corda:corda /opt/corda``
* The organisation field of the name obeys the following constraints:
3. Download the `Corda jar <https://r3.bintray.com/corda/net/corda/corda/>`_
(under ``/VERSION_NUMBER/corda-VERSION_NUMBER.jar``) and place it in ``/opt/corda``
* Has at least two letters
* No leading or trailing whitespace
* No double-spacing
* Upper-case first letter
* Does not contain the words "node" or "server"
* Does not include the characters ',' or '=' or '$' or '"' or '\'' or '\\'
* Is in NFKC normalization form
* Only the latin, common and inherited unicode scripts are supported
3. Create a directory called ``plugins`` in ``/opt/corda`` and save your CorDapp jar file to it. Alternatively, download one of
our `sample CorDapps <https://www.corda.net/samples/>`_ to the ``plugins`` directory
The deployNodes task
--------------------
The CorDapp template defines a ``deployNodes`` task that allows you to automatically generate and configure a set of
nodes:
4. Save the below as ``/opt/corda/node.conf``. See :doc:`corda-configuration-file` for a description of these options
.. sourcecode:: groovy
.. code-block:: json
task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar']) {
directory "./build/nodes"
networkMap "O=Controller,L=London,C=GB"
node {
name "O=Controller,L=London,C=GB"
// The notary will offer a validating notary service.
notary = [validating : true]
p2pPort 10002
rpcPort 10003
// No webport property, so no webserver will be created.
h2Port 10004
sshdPort 22
// Includes the corda-finance CorDapp on our node.
cordapps = ["net.corda:corda-finance:$corda_release_version"]
basedir : "/opt/corda"
p2pAddress : "example.com:10002"
rpcAddress : "example.com:10003"
webAddress : "0.0.0.0:10004"
h2port : 11000
emailAddress : "you@example.com"
myLegalName : "O=Bank of Breakfast Tea, L=London, C=GB"
keyStorePassword : "cordacadevpass"
trustStorePassword : "trustpass"
useHTTPS : false
devMode : false
networkMapService {
address="networkmap.foo.bar.com:10002"
legalName="O=FooBar NetworkMap, L=Dublin, C=IE"
}
rpcUsers=[
{
user=corda
password=portal_password
permissions=[
ALL
]
}
]
5. Make the following changes to ``/opt/corda/node.conf``:
* Change the ``p2pAddress`` and ``rpcAddress`` values to start with your server's hostname or external IP address.
This is the address other nodes or RPC interfaces will use to communicate with your node
* Change the ports if necessary, for example if you are running multiple nodes on one server (see below)
* Enter an email address which will be used as an administrative contact during the registration process. This is
only visible to the permissioning service
* Enter your node's desired legal name. This will be used during the issuance of your certificate and should rarely
change as it should represent the legal identity of your node
* Organization (``O=``) should be a unique and meaningful identifier (e.g. Bank of Breakfast Tea)
* Location (``L=``) is your nearest city
* Country (``C=``) is the `ISO 3166-1 alpha-2 code <https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2>`_
* Change the RPC username and password
6. Create a ``corda.service`` file based on the example below and save it in the ``/etc/systemd/system/`` directory
.. code-block:: shell
[Unit]
Description=Corda Node - Bank of Breakfast Tea
Requires=network.target
[Service]
Type=simple
User=corda
WorkingDirectory=/opt/corda
ExecStart=/usr/bin/java -Xmx2048m -jar /opt/corda/corda.jar
Restart=on-failure
[Install]
WantedBy=multi-user.target
7. Make the following changes to ``corda.service``:
* Make sure the service description is informative - particularly if you plan to run multiple nodes.
* Change the username to the user account you want to use to run Corda. **We recommend that this is not root**
* Set the maximum amount of memory available to the Corda process by changing the ``-Xmx2048m`` parameter
* Make sure the ``corda.service`` file is owned by root with the correct permissions:
* ``sudo chown root:root /etc/systemd/system/corda.service``
* ``sudo chmod 644 /etc/systemd/system/corda.service``
.. note:: The Corda webserver provides a simple interface for interacting with your installed CorDapps in a browser.
Running the webserver is optional.
8. Create a ``corda-webserver.service`` file based on the example below and save it in the ``/etc/systemd/system/``
directory.
.. code-block:: shell
[Unit]
Description=Webserver for Corda Node - Bank of Breakfast Tea
Requires=network.target
[Service]
Type=simple
User=username
WorkingDirectory=/opt/corda
ExecStart=/usr/bin/java -jar /opt/corda/corda-webserver.jar
Restart=on-failure
[Install]
WantedBy=multi-user.target
9. Provision the required certificates to your node. Contact the network permissioning service or see
:doc:`permissioning`
10. You can now start a node and its webserver by running the following ``systemctl`` commands:
* ``sudo systemctl daemon-reload``
* ``sudo systemctl corda start``
* ``sudo systemctl corda-webserver start``
You can run multiple nodes by creating multiple directories and Corda services, modifying the ``node.conf`` and
``service`` files so they are unique.
Windows: Installing and running Corda as a Windows service
----------------------------------------------------------
We recommend running Corda as a Windows service. This provides service handling, ensures the Corda service is run
at boot, and means the Corda service stays running with no users connected to the server.
**Prerequisites**:
* Oracle Java 8. The supported versions are listed in :doc:`getting-set-up`
1. Create a Corda directory and download the Corda jar. Replace ``VERSION_NUMBER`` with the desired version. Here's an
example using PowerShell:
.. code-block:: PowerShell
mkdir C:\Corda
wget http://jcenter.bintray.com/net/corda/corda/VERSION_NUMBER/corda-VERSION_NUMBER.jar -OutFile C:\Corda\corda.jar
2. Create a directory called ``plugins`` in ``/opt/corda`` and save your CorDapp jar file to it. Alternatively,
download one of our `sample CorDapps <https://www.corda.net/samples/>`_ to the ``plugins`` directory
3. Save the below as ``C:\Corda\node.conf``. See :doc:`corda-configuration-file` for a description of these options
.. code-block:: json
basedir : "C:\\Corda"
p2pAddress : "example.com:10002"
rpcAddress : "example.com:10003"
webAddress : "0.0.0.0:10004"
h2port : 11000
emailAddress: "you@example.com"
myLegalName : "O=Bank of Breakfast Tea, L=London, C=GB"
keyStorePassword : "cordacadevpass"
trustStorePassword : "trustpass"
extraAdvertisedServiceIds: [ "" ]
useHTTPS : false
devMode : false
networkMapService {
address="networkmap.foo.bar.com:10002"
legalName="O=FooBar NetworkMap, L=Dublin, C=IE"
}
node {
name "O=PartyA,L=London,C=GB"
advertisedServices = []
p2pPort 10005
rpcPort 10006
webPort 10007
h2Port 10008
sshdPort 22
cordapps = ["net.corda:corda-finance:$corda_release_version"]
// Grants user1 all RPC permissions.
rpcUsers = [[ user: "user1", "password": "test", "permissions": ["ALL"]]]
}
node {
name "O=PartyB,L=New York,C=US"
advertisedServices = []
p2pPort 10009
rpcPort 10010
webPort 10011
h2Port 10012
sshdPort 22
cordapps = ["net.corda:corda-finance:$corda_release_version"]
// Grants user1 the ability to start the MyFlow flow.
rpcUsers = [[ user: "user1", "password": "test", "permissions": ["StartFlow.net.corda.flows.MyFlow"]]]
}
}
rpcUsers=[
{
user=corda
password=portal_password
permissions=[
ALL
]
}
]
Running this task will create three nodes in the ``build/nodes`` folder:
4. Make the following changes to ``C:\Corda\node.conf``:
* A ``Controller`` node that:
* Change the ``p2pAddress`` and ``rpcAddress`` values to start with your server's hostname or external IP address.
This is the address other nodes or RPC interfaces will use to communicate with your node
* Change the ports if necessary, for example if you are running multiple nodes on one server (see below)
* Enter an email address which will be used as an administrative contact during the registration process. This is
only visible to the permissioning service
* Enter your node's desired legal name. This will be used during the issuance of your certificate and should rarely
change as it should represent the legal identity of your node
* Serves as the network map
* Offers a validating notary service
* Will not have a webserver (since ``webPort`` is not defined)
* Is running the ``corda-finance`` CorDapp
* Organization (``O=``) should be a unique and meaningful identifier (e.g. Bank of Breakfast Tea)
* Location (``L=``) is your nearest city
* Country (``C=``) is the `ISO 3166-1 alpha-2 code <https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2>`_
* Change the RPC username and password
* ``PartyA`` and ``PartyB`` nodes that:
5. Copy the required Java keystores to the node. See :doc:`permissioning`
* Are pointing at the ``Controller`` as the network map service
* Are not offering any services
* Will have a webserver (since ``webPort`` is defined)
* Are running the ``corda-finance`` CorDapp
* Have an RPC user, ``user1``, that can be used to log into the node via RPC
6. Download the `NSSM service manager <nssm.cc>`_
Additionally, all three nodes will include any CorDapps defined in the project's source folders, even though these
CorDapps are not listed in each node's ``cordapps`` entry. This means that running the ``deployNodes`` task from the
template CorDapp, for example, would automatically build and add the template CorDapp to each node.
7. Unzip ``nssm-2.24\win64\nssm.exe`` to ``C:\Corda``
You can extend ``deployNodes`` to generate additional nodes. The only requirement is that you must specify
a single node to run the network map service, by putting their name in the ``networkMap`` field.
8. Save the following as ``C:\Corda\nssm.bat``:
.. warning:: When adding nodes, make sure that there are no port clashes!
.. code-block:: batch
Running deployNodes
-------------------
To create the nodes defined in our ``deployNodes`` task, we'd run the following command in a terminal window from the
root of the project:
nssm install cordanode1 C:\ProgramData\Oracle\Java\javapath\java.exe
nssm set cordanode1 AppDirectory C:\Corda
nssm set cordanode1 AppParameters "-jar corda.jar -Xmx2048m --config-file=C:\corda\node.conf"
nssm set cordanode1 AppStdout C:\Corda\service.log
nssm set cordanode1 AppStderr C:\Corda\service.log
nssm set cordanode1 Description Corda Node - Bank of Breakfast Tea
sc start cordanode1
* Unix/Mac OSX: ``./gradlew deployNodes``
* Windows: ``gradlew.bat deployNodes``
9. Modify the batch file:
This will create the nodes in the ``build/nodes`` folder.
* If you are installing multiple nodes, use a different service name (``cordanode1``) for each node
* Set the amount of Java heap memory available to this node by modifying the -Xmx argument
* Set an informative description
.. note:: During the build process each node generates a NodeInfo file which is written in its own root directory,
the plug-in proceeds and copies each node NodeInfo to every other node ``additional-node-infos`` directory.
The NodeInfo file contains a node hostname and port, legal name and security certificate.
10. Run the batch file by clicking on it or from a command prompt
There will be a node folder generated for each node you defined, plus a ``runnodes`` shell script (or batch file on
Windows) to run all the nodes at once. If you make any changes to your ``deployNodes`` task, you will need to re-run
the task to see the changes take effect.
11. Run ``services.msc`` and verify that a service called ``cordanode1`` is present and running
You can now run the nodes by following the instructions in :doc:`Running a node <running-a-node>`.
12. Run ``netstat -ano`` and check for the ports you configured in ``node.conf``
13. You may need to open the ports on the Windows firewall
Testing your installation
-------------------------
You can verify Corda is running by connecting to your RPC port from another host, e.g.:
``telnet your-hostname.example.com 10002``
If you receive the message "Escape character is ^]", Corda is running and accessible. Press Ctrl-] and Ctrl-D to exit
telnet.

View File

@ -28,7 +28,7 @@ class CustomVaultQueryTest {
mockNet = MockNetwork(threadPerNode = true, cordappPackages = listOf("net.corda.finance", "net.corda.docs"))
nodeA = mockNet.createPartyNode()
nodeB = mockNet.createPartyNode()
nodeA.internals.registerInitiatedFlow(TopupIssuerFlow.TopupIssuer::class.java)
nodeA.registerInitiatedFlow(TopupIssuerFlow.TopupIssuer::class.java)
notary = mockNet.defaultNotaryIdentity
}

View File

@ -27,7 +27,7 @@ class FxTransactionBuildTutorialTest {
mockNet = MockNetwork(threadPerNode = true, cordappPackages = listOf("net.corda.finance"))
nodeA = mockNet.createPartyNode()
nodeB = mockNet.createPartyNode()
nodeB.internals.registerInitiatedFlow(ForeignExchangeRemoteFlow::class.java)
nodeB.registerInitiatedFlow(ForeignExchangeRemoteFlow::class.java)
notary = mockNet.defaultNotaryIdentity
}

View File

@ -35,7 +35,7 @@ class WorkflowTransactionBuildTutorialTest {
mockNet = MockNetwork(threadPerNode = true, cordappPackages = listOf("net.corda.docs"))
val aliceNode = mockNet.createPartyNode(ALICE_NAME)
val bobNode = mockNet.createPartyNode(BOB_NAME)
aliceNode.internals.registerInitiatedFlow(RecordCompletionFlow::class.java)
aliceNode.registerInitiatedFlow(RecordCompletionFlow::class.java)
aliceServices = aliceNode.services
bobServices = bobNode.services
alice = aliceNode.services.myInfo.identityFromX500Name(ALICE_NAME)

View File

@ -0,0 +1,136 @@
Creating nodes locally
======================
.. contents::
Node structure
--------------
Each Corda node has the following structure:
.. sourcecode:: none
.
├── certificates // The node's certificates
├── corda-webserver.jar // The built-in node webserver
├── corda.jar // The core Corda libraries
├── logs // The node logs
├── node.conf // The node's configuration files
├── persistence.mv.db // The node's database
└── cordapps // The CorDapps jars installed on the node
The node is configured by editing its ``node.conf`` file. You install CorDapps on the node by dropping the CorDapp JARs
into the ``cordapps`` folder.
Node naming
-----------
A node's name must be a valid X.500 name that obeys the following additional constraints:
* The fields of the name have the following maximum character lengths:
* Common name: 64
* Organisation: 128
* Organisation unit: 64
* Locality: 64
* State: 64
* The country code is a valid ISO 3166-1 two letter code in upper-case
* The organisation, locality and country attributes are present
* The organisation field of the name obeys the following constraints:
* Has at least two letters
* No leading or trailing whitespace
* No double-spacing
* Upper-case first letter
* Does not contain the words "node" or "server"
* Does not include the characters ',' or '=' or '$' or '"' or '\'' or '\\'
* Is in NFKC normalization form
* Only the latin, common and inherited unicode scripts are supported
The Cordform task
-----------------
Corda provides a gradle plugin called ``Cordform`` that allows you to automatically generate and configure a set of
nodes. Here is an example ``Cordform`` task called ``deployNodes`` that creates three nodes, defined in the
`Kotlin CorDapp Template <https://github.com/corda/cordapp-template-kotlin/blob/release-V2/build.gradle#L97>`_:
.. sourcecode:: groovy
task deployNodes(type: net.corda.plugins.Cordform, dependsOn: ['jar']) {
directory "./build/nodes"
networkMap "O=Controller,L=London,C=GB"
node {
name "O=Controller,L=London,C=GB"
// The notary will offer a validating notary service.
notary = [validating : true]
p2pPort 10002
rpcPort 10003
// No webport property, so no webserver will be created.
h2Port 10004
// Includes the corda-finance CorDapp on our node.
cordapps = ["net.corda:corda-finance:$corda_release_version"]
}
node {
name "O=PartyA,L=London,C=GB"
advertisedServices = []
p2pPort 10005
rpcPort 10006
webPort 10007
h2Port 10008
cordapps = ["net.corda:corda-finance:$corda_release_version"]
// Grants user1 all RPC permissions.
rpcUsers = [[ user: "user1", "password": "test", "permissions": ["ALL"]]]
}
node {
name "O=PartyB,L=New York,C=US"
advertisedServices = []
p2pPort 10009
rpcPort 10010
webPort 10011
h2Port 10012
cordapps = ["net.corda:corda-finance:$corda_release_version"]
// Grants user1 the ability to start the MyFlow flow.
rpcUsers = [[ user: "user1", "password": "test", "permissions": ["StartFlow.net.corda.flows.MyFlow"]]]
}
}
Running this task will create three nodes in the ``build/nodes`` folder:
* A ``Controller`` node that:
* Serves as the network map
* Offers a validating notary service
* Will not have a webserver (since ``webPort`` is not defined)
* Is running the ``corda-finance`` CorDapp
* ``PartyA`` and ``PartyB`` nodes that:
* Are pointing at the ``Controller`` as the network map service
* Are not offering any services
* Will have a webserver (since ``webPort`` is defined)
* Are running the ``corda-finance`` CorDapp
* Have an RPC user, ``user1``, that can be used to log into the node via RPC
Additionally, all three nodes will include any CorDapps defined in the project's source folders, even though these
CorDapps are not listed in each node's ``cordapps`` entry. This means that running the ``deployNodes`` task from the
template CorDapp, for example, would automatically build and add the template CorDapp to each node.
You can extend ``deployNodes`` to generate additional nodes. The only requirement is that you must specify
a single node to run the network map service, by putting its name in the ``networkMap`` field.
.. warning:: When adding nodes, make sure that there are no port clashes!
Running deployNodes
-------------------
To create the nodes defined in our ``deployNodes`` task, run the following command in a terminal window from the root
of the project where the ``deployNodes`` task is defined:
* Linux/macOS: ``./gradlew deployNodes``
* Windows: ``gradlew.bat deployNodes``
This will create the nodes in the ``build/nodes`` folder. There will be a node folder generated for each node defined
in the ``deployNodes`` task, plus a ``runnodes`` shell script (or batch file on Windows) to run all the nodes at once
for testing and development purposes. If you make any changes to your CorDapp source or ``deployNodes`` task, you will
need to re-run the task to see the changes take effect.
You can now run the nodes by following the instructions in :doc:`Running a node <running-a-node>`.

View File

@ -1,25 +1,49 @@
Running a node
==============
Running nodes locally
=====================
Starting your node
------------------
After following the steps in :doc:`deploying-a-node`, you should have deployed your node(s) with any chosen CorDapps
already installed. You run each node by navigating to ``<node_dir>`` in a terminal window and running:
.. contents::
.. note:: You should already have generated your node(s) with their CorDapps installed by following the instructions in
:doc:`generating-a-node`.
There are several ways to run a Corda node locally for testing purposes.
Starting all nodes at once
--------------------------
.. note:: ``runnodes`` is a shell script (or batch file on Windows) that is generated by ``deployNodes`` to allow you
to quickly start up all nodes and their webservers. ``runnodes`` should only be used for testing purposes.
Start the nodes with ``runnodes`` by running the following command from the root of the project:
* Linux/macOS: ``build/nodes/runnodes``
* Windows: ``call build\nodes\runnodes.bat``
.. warn:: On macOS, do not click/change focus until all the node terminal windows have opened, or some processes may
fail to start.
Starting an individual Corda node
---------------------------------
Run the node by opening a terminal window in the node's folder and running:
.. code-block:: shell
java -jar corda.jar
.. warning:: If your working directory is not ``<node_dir>`` your cordapps and configuration will not be used.
.. warning:: By default, the node will look for a configuration file called ``node.conf`` and a CorDapps folder called
``cordapps`` in the current working directory. You can override the configuration file and workspace paths on the
command line (e.g. ``./corda.jar --config-file=test.conf --base-directory=/opt/r3corda/nodes/test``).
The configuration file and workspace paths can be overridden on the command line. For example:
Optionally run the node's webserver as well by opening a terminal window in the node's folder and running:
``./corda.jar --config-file=test.conf --base-directory=/opt/r3corda/nodes/test``.
.. code-block:: shell
Otherwise the workspace folder for the node is the current working path.
java -jar corda-webserver.jar
Debugging your node
-------------------
.. warning:: The node webserver is for testing purposes only and will be removed soon.
Starting a node with remote debugging enabled
---------------------------------------------
To enable remote debugging of the node, run the following from the terminal window:
``java -Dcapsule.jvm.args="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005" -jar corda.jar``

View File

@ -137,7 +137,7 @@ which could be represented as ``{ first: foo, second: 123 }``.
.. note:: If your CorDapp is written in Java,
named arguments won't work unless you compiled using the ``-parameters`` argument to javac.
See :doc:`deploying-a-node` for how to specify it via Gradle.
See :doc:`generating-a-node` for how to specify it via Gradle.
The same syntax is also used to specify the parameters for RPCs, accessed via the ``run`` command, like this:

View File

@ -26,7 +26,8 @@ for gradle and IntelliJ, but it's possible this option is not present in your en
"No matching constructor found: - [arg0: int, arg1: Party]: missing parameter arg0"
***********************************************************************************
Your CorDapp is written in Java and you haven't specified the ``-parameters`` compiler argument. See :doc:`deploying-a-node` for how it can be done using Gradle.
Your CorDapp is written in Java and you haven't specified the ``-parameters`` compiler argument. See
:doc:`generating-a-node` for how it can be done using Gradle.
IDEA issues
-----------

View File

@ -176,7 +176,7 @@ There are two ways to run the example CorDapp:
* Via IntelliJ
In both cases, we will deploy a set of test nodes with our CorDapp installed, then run the nodes. You can read more
about how we define the nodes to be deployed :doc:`here <deploying-a-node>`.
about how we define the nodes to be deployed :doc:`here <generating-a-node>`.
Terminal
~~~~~~~~

View File

@ -13,7 +13,7 @@ import net.corda.finance.DOLLARS
import net.corda.finance.`issued by`
import net.corda.finance.contracts.asset.*
import net.corda.testing.*
import net.corda.testing.contracts.fillWithSomeTestCash
import net.corda.testing.contracts.VaultFiller
import net.corda.testing.node.MockServices
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseAndMockServices
import org.junit.Ignore
@ -240,7 +240,7 @@ class CommercialPaperTestsGeneric {
aliceVaultService = aliceServices.vaultService
databaseAlice.transaction {
alicesVault = aliceServices.fillWithSomeTestCash(9000.DOLLARS, issuerServices, atLeastThisManyStates = 1, atMostThisManyStates = 1, issuedBy = DUMMY_CASH_ISSUER)
alicesVault = VaultFiller(aliceServices, DUMMY_NOTARY, DUMMY_NOTARY_KEY, rngFactory = ::Random).fillWithSomeTestCash(9000.DOLLARS, issuerServices, 1, DUMMY_CASH_ISSUER)
aliceVaultService = aliceServices.vaultService
}
@ -250,7 +250,7 @@ class CommercialPaperTestsGeneric {
bigCorpVaultService = bigCorpServices.vaultService
databaseBigCorp.transaction {
bigCorpVault = bigCorpServices.fillWithSomeTestCash(13000.DOLLARS, issuerServices, atLeastThisManyStates = 1, atMostThisManyStates = 1, issuedBy = DUMMY_CASH_ISSUER)
bigCorpVault = VaultFiller(bigCorpServices, DUMMY_NOTARY, DUMMY_NOTARY_KEY, rngFactory = ::Random).fillWithSomeTestCash(13000.DOLLARS, issuerServices, 1, DUMMY_CASH_ISSUER)
bigCorpVaultService = bigCorpServices.vaultService
}

View File

@ -3,7 +3,10 @@ package net.corda.finance.contracts.asset
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.generateKeyPair
import net.corda.core.identity.*
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.VaultService
import net.corda.core.node.services.queryBy
@ -16,10 +19,10 @@ import net.corda.finance.utils.sumCashBy
import net.corda.finance.utils.sumCashOrNull
import net.corda.finance.utils.sumCashOrZero
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.*
import net.corda.testing.contracts.DummyState
import net.corda.testing.contracts.fillWithSomeTestCash
import net.corda.testing.contracts.VaultFiller
import net.corda.testing.node.MockServices
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseAndMockServices
import org.junit.After
@ -81,15 +84,12 @@ class CashTests {
}
// Create some cash. Any attempt to spend >$500 will require multiple issuers to be involved.
database.transaction {
ourServices.fillWithSomeTestCash(howMuch = 100.DOLLARS, atLeastThisManyStates = 1, atMostThisManyStates = 1,
owner = ourIdentity, issuedBy = MEGA_CORP.ref(1), issuerServices = megaCorpServices)
ourServices.fillWithSomeTestCash(howMuch = 400.DOLLARS, atLeastThisManyStates = 1, atMostThisManyStates = 1,
owner = ourIdentity, issuedBy = MEGA_CORP.ref(1), issuerServices = megaCorpServices)
ourServices.fillWithSomeTestCash(howMuch = 80.DOLLARS, atLeastThisManyStates = 1, atMostThisManyStates = 1,
owner = ourIdentity, issuedBy = MINI_CORP.ref(1), issuerServices = miniCorpServices)
ourServices.fillWithSomeTestCash(howMuch = 80.SWISS_FRANCS, atLeastThisManyStates = 1, atMostThisManyStates = 1,
owner = ourIdentity, issuedBy = MINI_CORP.ref(1), issuerServices = miniCorpServices)
database.transaction {
val vaultFiller = VaultFiller(ourServices, DUMMY_NOTARY, DUMMY_NOTARY_KEY, rngFactory = ::Random)
vaultFiller.fillWithSomeTestCash(100.DOLLARS, megaCorpServices, 1, MEGA_CORP.ref(1), ourIdentity)
vaultFiller.fillWithSomeTestCash(400.DOLLARS, megaCorpServices, 1, MEGA_CORP.ref(1), ourIdentity)
vaultFiller.fillWithSomeTestCash(80.DOLLARS, miniCorpServices, 1, MINI_CORP.ref(1), ourIdentity)
vaultFiller.fillWithSomeTestCash(80.SWISS_FRANCS, miniCorpServices, 1, MINI_CORP.ref(1), ourIdentity)
}
database.transaction {
vaultStatesUnconsumed = ourServices.vaultService.queryBy<Cash.State>().states

View File

@ -10,6 +10,7 @@ buildscript {
ext.gradle_plugins_version = constants.getProperty("gradlePluginsVersion")
ext.bouncycastle_version = constants.getProperty("bouncycastleVersion")
ext.typesafe_config_version = constants.getProperty("typesafeConfigVersion")
ext.jsr305_version = constants.getProperty("jsr305Version")
ext.kotlin_version = constants.getProperty("kotlinVersion")
repositories {

View File

@ -11,6 +11,9 @@ version gradle_plugins_version
group 'net.corda.plugins'
dependencies {
// JSR 305: Nullability annotations
compile "com.google.code.findbugs:jsr305:$jsr305_version"
// TypeSafe Config: for simple and human friendly config files.
compile "com.typesafe:config:$typesafe_config_version"

View File

@ -1,24 +1,40 @@
package net.corda.cordform;
import javax.annotation.Nonnull;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
public abstract class CordformDefinition {
public final Path driverDirectory;
public final ArrayList<Consumer<? super CordformNode>> nodeConfigurers = new ArrayList<>();
private Path nodesDirectory = Paths.get("build", "nodes");
private final List<Consumer<CordformNode>> nodeConfigurers = new ArrayList<>();
private final List<String> cordappPackages = new ArrayList<>();
public CordformDefinition(Path driverDirectory) {
this.driverDirectory = driverDirectory;
public Path getNodesDirectory() {
return nodesDirectory;
}
public void addNode(Consumer<? super CordformNode> configurer) {
public void setNodesDirectory(Path nodesDirectory) {
this.nodesDirectory = nodesDirectory;
}
public List<Consumer<CordformNode>> getNodeConfigurers() {
return nodeConfigurers;
}
public void addNode(Consumer<CordformNode> configurer) {
nodeConfigurers.add(configurer);
}
public List<String> getCordappPackages() {
return cordappPackages;
}
/**
* Make arbitrary changes to the node directories before they are started.
* @param context Lookup of node directory by node name.
*/
public abstract void setup(CordformContext context);
public abstract void setup(@Nonnull CordformContext context);
}

View File

@ -2,6 +2,9 @@ package net.corda.cordform;
import static java.util.Collections.emptyList;
import com.typesafe.config.*;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -54,7 +57,7 @@ public class CordformNode implements NodeDefinition {
*/
public void name(String name) {
this.name = name;
config = config.withValue("myLegalName", ConfigValueFactory.fromAnyRef(name));
setValue("myLegalName", name);
}
/**
@ -62,6 +65,7 @@ public class CordformNode implements NodeDefinition {
*
* @return This node's P2P address.
*/
@Nonnull
public String getP2pAddress() {
return config.getString("p2pAddress");
}
@ -71,8 +75,8 @@ public class CordformNode implements NodeDefinition {
*
* @param p2pPort The Artemis messaging queue port.
*/
public void p2pPort(Integer p2pPort) {
config = config.withValue("p2pAddress", ConfigValueFactory.fromAnyRef(DEFAULT_HOST + ':' + p2pPort));
public void p2pPort(int p2pPort) {
p2pAddress(DEFAULT_HOST + ':' + p2pPort);
}
/**
@ -81,7 +85,15 @@ public class CordformNode implements NodeDefinition {
* @param p2pAddress The Artemis messaging queue host and port.
*/
public void p2pAddress(String p2pAddress) {
config = config.withValue("p2pAddress", ConfigValueFactory.fromAnyRef(p2pAddress));
setValue("p2pAddress", p2pAddress);
}
/**
* Returns the RPC address for this node, or null if one hasn't been specified.
*/
@Nullable
public String getRpcAddress() {
return getOptionalString("rpcAddress");
}
/**
@ -89,8 +101,8 @@ public class CordformNode implements NodeDefinition {
*
* @param rpcPort The Artemis RPC queue port.
*/
public void rpcPort(Integer rpcPort) {
config = config.withValue("rpcAddress", ConfigValueFactory.fromAnyRef(DEFAULT_HOST + ':' + rpcPort));
public void rpcPort(int rpcPort) {
rpcAddress(DEFAULT_HOST + ':' + rpcPort);
}
/**
@ -99,7 +111,31 @@ public class CordformNode implements NodeDefinition {
* @param rpcAddress The Artemis RPC queue host and port.
*/
public void rpcAddress(String rpcAddress) {
config = config.withValue("rpcAddress", ConfigValueFactory.fromAnyRef(rpcAddress));
setValue("rpcAddress", rpcAddress);
}
/**
* Returns the address of the web server that will connect to the node, or null if one hasn't been specified.
*/
@Nullable
public String getWebAddress() {
return getOptionalString("webAddress");
}
/**
* Configure a webserver to connect to the node via RPC. This port will specify the port it will listen on. The node
* must have an RPC address configured.
*/
public void webPort(int webPort) {
webAddress(DEFAULT_HOST + ':' + webPort);
}
/**
* Configure a webserver to connect to the node via RPC. This address will specify the port it will listen on. The node
* must have an RPC address configured.
*/
public void webAddress(String webAddress) {
setValue("webAddress", webAddress);
}
/**
@ -108,6 +144,14 @@ public class CordformNode implements NodeDefinition {
* @param configFile The file path.
*/
public void configFile(String configFile) {
config = config.withValue("configFile", ConfigValueFactory.fromAnyRef(configFile));
setValue("configFile", configFile);
}
private String getOptionalString(String path) {
return config.hasPath(path) ? config.getString(path) : null;
}
private void setValue(String path, Object value) {
config = config.withValue(path, ConfigValueFactory.fromAnyRef(value));
}
}

View File

@ -11,10 +11,10 @@ import org.gradle.api.tasks.SourceSet.MAIN_SOURCE_SET_NAME
import org.gradle.api.tasks.TaskAction
import java.io.File
import java.net.URLClassLoader
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.TimeUnit
import java.util.jar.JarInputStream
/**
* Creates nodes based on the configuration of this task in the gradle configuration DSL.
@ -23,12 +23,16 @@ import java.util.concurrent.TimeUnit
*/
@Suppress("unused")
open class Cordform : DefaultTask() {
private companion object {
private val defaultDirectory: Path = Paths.get("build", "nodes")
}
/**
* Optionally the name of a CordformDefinition subclass to which all configuration will be delegated.
*/
@Suppress("MemberVisibilityCanPrivate")
var definitionClass: String? = null
private var directory = Paths.get("build", "nodes")
private var directory = defaultDirectory
private val nodes = mutableListOf<Node>()
/**
@ -116,7 +120,6 @@ open class Cordform : DefaultTask() {
/**
* This task action will create and install the nodes based on the node configurations added.
*/
@Suppress("unused")
@TaskAction
fun build() {
project.logger.info("Running Cordform task")
@ -129,10 +132,18 @@ open class Cordform : DefaultTask() {
private fun initializeConfiguration() {
if (definitionClass != null) {
val cd = loadCordformDefinition()
// If the user has specified their own directory (even if it's the same default path) then let them know
// it's not used and should just rely on the one in CordformDefinition
require(directory === defaultDirectory) {
"'directory' cannot be used when 'definitionClass' is specified. Use CordformDefinition.nodesDirectory instead."
}
directory = cd.nodesDirectory
val cordapps = cd.getMatchingCordapps()
cd.nodeConfigurers.forEach {
val node = node { }
it.accept(node)
node.rootDir(directory)
node.installCordapps(cordapps)
}
cd.setup { nodeName -> project.projectDir.toPath().resolve(getNodeByName(nodeName)!!.nodeDir.toPath()) }
} else {
@ -142,6 +153,30 @@ open class Cordform : DefaultTask() {
}
}
private fun CordformDefinition.getMatchingCordapps(): List<File> {
val cordappJars = project.configuration("cordapp").files
return cordappPackages.map { `package` ->
val cordappsWithPackage = cordappJars.filter { it.containsPackage(`package`) }
when (cordappsWithPackage.size) {
0 -> throw IllegalArgumentException("There are no cordapp dependencies containing the package $`package`")
1 -> cordappsWithPackage[0]
else -> throw IllegalArgumentException("More than one cordapp dependency contains the package $`package`: $cordappsWithPackage")
}
}
}
private fun File.containsPackage(`package`: String): Boolean {
JarInputStream(inputStream()).use {
while (true) {
val name = it.nextJarEntry?.name ?: break
if (name.endsWith(".class") && name.replace('/', '.').startsWith(`package`)) {
return true
}
}
return false
}
}
private fun generateAndInstallNodeInfos() {
generateNodeInfos()
installNodeInfos()
@ -149,7 +184,7 @@ open class Cordform : DefaultTask() {
private fun generateNodeInfos() {
project.logger.info("Generating node infos")
var nodeProcesses = buildNodeProcesses()
val nodeProcesses = buildNodeProcesses()
try {
validateNodeProcessess(nodeProcesses)
} finally {
@ -177,7 +212,7 @@ open class Cordform : DefaultTask() {
private fun buildNodeProcess(node: Node): Pair<Node, Process> {
node.makeLogDirectory()
var process = ProcessBuilder(generateNodeInfoCommand())
val process = ProcessBuilder(generateNodeInfoCommand())
.directory(node.fullPath().toFile())
.redirectErrorStream(true)
// InheritIO causes hangs on windows due the gradle buffer also not being flushed.
@ -224,6 +259,8 @@ open class Cordform : DefaultTask() {
}
}
}
private fun Node.logFile(): Path = this.logDirectory().resolve("generate-info.log")
private fun ProcessBuilder.addEnvironment(key: String, value: String) = this.apply { environment().put(key, value) }
}

View File

@ -3,7 +3,6 @@ package net.corda.plugins
import com.typesafe.config.*
import net.corda.cordform.CordformNode
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.asn1.x500.RDN
import org.bouncycastle.asn1.x500.style.BCStyle
import org.gradle.api.Project
import java.io.File
@ -39,6 +38,7 @@ class Node(private val project: Project) : CordformNode() {
private val releaseVersion = project.rootProject.ext<String>("corda_release_version")
internal lateinit var nodeDir: File
private set
/**
* Sets whether this node will use HTTPS communication.
@ -60,26 +60,6 @@ class Node(private val project: Project) : CordformNode() {
config = config.withValue("useTestClock", ConfigValueFactory.fromAnyRef(useTestClock))
}
/**
* Set the HTTP web server port for this node. Will use localhost as the address.
*
* @param webPort The web port number for this node.
*/
fun webPort(webPort: Int) {
config = config.withValue("webAddress",
ConfigValueFactory.fromAnyRef("$DEFAULT_HOST:$webPort"))
}
/**
* Set the HTTP web server address and port for this node.
*
* @param webAddress The web address for this node.
*/
fun webAddress(webAddress: String) {
config = config.withValue("webAddress",
ConfigValueFactory.fromAnyRef(webAddress))
}
/**
* Set the network map address for this node.
*
@ -104,7 +84,6 @@ class Node(private val project: Project) : CordformNode() {
config = config.withValue("sshd.port", ConfigValueFactory.fromAnyRef(sshdPort))
}
internal fun build() {
configureProperties()
installCordaJar()
@ -118,19 +97,15 @@ class Node(private val project: Project) : CordformNode() {
}
internal fun rootDir(rootDir: Path) {
if(name == null) {
if (name == null) {
project.logger.error("Node has a null name - cannot create node")
throw IllegalStateException("Node has a null name - cannot create node")
}
val dirName = try {
val o = X500Name(name).getRDNs(BCStyle.O)
if (o.size > 0) {
o.first().first.value.toString()
} else {
name
}
} catch(_ : IllegalArgumentException) {
if (o.isNotEmpty()) o.first().first.value.toString() else name
} catch (_ : IllegalArgumentException) {
// Can't parse as an X500 name, use the full string
name
}
@ -192,9 +167,8 @@ class Node(private val project: Project) : CordformNode() {
/**
* Installs other cordapps to this node's cordapps directory.
*/
private fun installCordapps() {
internal fun installCordapps(cordapps: Collection<File> = getCordappList()) {
val cordappsDir = File(nodeDir, "cordapps")
val cordapps = getCordappList()
project.copy {
it.apply {
from(cordapps)
@ -280,7 +254,7 @@ class Node(private val project: Project) : CordformNode() {
throw RuntimeException("No Corda Webserver JAR found. Have you deployed the Corda project to Maven? Looked for \"corda-webserver-$releaseVersion.jar\"")
} else {
val jar = maybeJar.singleFile
assert(jar.isFile)
require(jar.isFile)
return jar
}
}

View File

@ -1,5 +1,6 @@
#Sat Nov 25 22:21:50 GMT 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.3-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.3-all.zip

View File

@ -3,7 +3,7 @@ apply plugin: 'net.corda.plugins.quasar-utils'
apply plugin: 'net.corda.plugins.publish-utils'
apply plugin: 'com.jfrog.artifactory'
description 'Corda node Artemis API'
description 'Corda node API'
dependencies {
compile project(":core")

View File

@ -1,63 +1,79 @@
package net.corda.node.utilities
package net.corda.nodeapi.internal.persistence
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import net.corda.core.node.services.IdentityService
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.services.persistence.HibernateConfiguration
import net.corda.node.services.schema.NodeSchemaService
import net.corda.core.schemas.MappedSchema
import rx.Observable
import rx.Subscriber
import rx.subjects.UnicastSubject
import java.io.Closeable
import java.sql.Connection
import java.sql.SQLException
import java.util.*
import java.util.concurrent.CopyOnWriteArrayList
import javax.persistence.AttributeConverter
import javax.sql.DataSource
/**
* Table prefix for all tables owned by the node module.
*/
const val NODE_DATABASE_PREFIX = "node_"
//HikariDataSource implements Closeable which allows CordaPersistence to be Closeable
// This class forms part of the node config and so any changes to it must be handled with care
data class DatabaseConfig(
val initialiseSchema: Boolean = true,
val serverNameTablePrefix: String = "",
val transactionIsolationLevel: TransactionIsolationLevel = TransactionIsolationLevel.REPEATABLE_READ
)
// This class forms part of the node config and so any changes to it must be handled with care
enum class TransactionIsolationLevel {
NONE,
READ_UNCOMMITTED,
READ_COMMITTED,
REPEATABLE_READ,
SERIALIZABLE;
/**
* The JDBC constant value of the same name but prefixed with TRANSACTION_ defined in [java.sql.Connection].
*/
val jdbcValue: Int = java.sql.Connection::class.java.getField("TRANSACTION_$name").get(null) as Int
}
class CordaPersistence(
val dataSource: HikariDataSource,
private val schemaService: SchemaService,
private val identityService: IdentityService,
databaseConfig: DatabaseConfig
val dataSource: DataSource,
databaseConfig: DatabaseConfig,
schemas: Set<MappedSchema>,
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet()
) : Closeable {
val transactionIsolationLevel = databaseConfig.transactionIsolationLevel.jdbcValue
val defaultIsolationLevel = databaseConfig.transactionIsolationLevel
val hibernateConfig: HibernateConfiguration by lazy {
transaction {
HibernateConfiguration(schemaService, databaseConfig, identityService)
HibernateConfiguration(schemas, databaseConfig, attributeConverters)
}
}
val entityManagerFactory get() = hibernateConfig.sessionFactoryForRegisteredSchemas
companion object {
fun connect(dataSource: HikariDataSource, schemaService: SchemaService, identityService: IdentityService, databaseConfig: DatabaseConfig): CordaPersistence {
return CordaPersistence(dataSource, schemaService, identityService, databaseConfig).apply {
DatabaseTransactionManager(this)
init {
DatabaseTransactionManager(this)
// Check not in read-only mode.
transaction {
dataSource.connection.use {
check(!it.metaData.isReadOnly) { "Database should not be readonly." }
}
}
}
/**
* Creates an instance of [DatabaseTransaction], with the given isolation level.
* @param isolationLevel isolation level for the transaction. If not specified the default (i.e. provided at the creation time) is used.
* Creates an instance of [DatabaseTransaction], with the given transaction isolation level.
*/
fun createTransaction(isolationLevel: Int): DatabaseTransaction {
fun createTransaction(isolationLevel: TransactionIsolationLevel): DatabaseTransaction {
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
DatabaseTransactionManager.dataSource = this
return DatabaseTransactionManager.currentOrNew(isolationLevel)
}
/**
* Creates an instance of [DatabaseTransaction], with the transaction isolation level specified at the creation time.
* Creates an instance of [DatabaseTransaction], with the default transaction isolation level.
*/
fun createTransaction(): DatabaseTransaction = createTransaction(transactionIsolationLevel)
fun createTransaction(): DatabaseTransaction = createTransaction(defaultIsolationLevel)
fun createSession(): Connection {
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
@ -71,7 +87,7 @@ class CordaPersistence(
* @param isolationLevel isolation level for the transaction.
* @param statement to be executed in the scope of this transaction.
*/
fun <T> transaction(isolationLevel: Int, statement: DatabaseTransaction.() -> T): T {
fun <T> transaction(isolationLevel: TransactionIsolationLevel, statement: DatabaseTransaction.() -> T): T {
DatabaseTransactionManager.dataSource = this
return transaction(isolationLevel, 3, statement)
}
@ -80,22 +96,21 @@ class CordaPersistence(
* Executes given statement in the scope of transaction with the transaction level specified at the creation time.
* @param statement to be executed in the scope of this transaction.
*/
fun <T> transaction(statement: DatabaseTransaction.() -> T): T = transaction(transactionIsolationLevel, statement)
fun <T> transaction(statement: DatabaseTransaction.() -> T): T = transaction(defaultIsolationLevel, statement)
private fun <T> transaction(transactionIsolation: Int, repetitionAttempts: Int, statement: DatabaseTransaction.() -> T): T {
private fun <T> transaction(isolationLevel: TransactionIsolationLevel, repetitionAttempts: Int, statement: DatabaseTransaction.() -> T): T {
val outer = DatabaseTransactionManager.currentOrNull()
return if (outer != null) {
outer.statement()
} else {
inTopLevelTransaction(transactionIsolation, repetitionAttempts, statement)
inTopLevelTransaction(isolationLevel, repetitionAttempts, statement)
}
}
private fun <T> inTopLevelTransaction(transactionIsolation: Int, repetitionAttempts: Int, statement: DatabaseTransaction.() -> T): T {
private fun <T> inTopLevelTransaction(isolationLevel: TransactionIsolationLevel, repetitionAttempts: Int, statement: DatabaseTransaction.() -> T): T {
var repetitions = 0
while (true) {
val transaction = DatabaseTransactionManager.currentOrNew(transactionIsolation)
val transaction = DatabaseTransactionManager.currentOrNew(isolationLevel)
try {
val answer = transaction.statement()
transaction.commit()
@ -116,23 +131,11 @@ class CordaPersistence(
}
override fun close() {
dataSource.close()
// DataSource doesn't implement AutoCloseable so we just have to hope that the implementation does so that we can close it
(dataSource as? AutoCloseable)?.close()
}
}
fun configureDatabase(dataSourceProperties: Properties, databaseConfig: DatabaseConfig, identityService: IdentityService, schemaService: SchemaService = NodeSchemaService(null)): CordaPersistence {
val config = HikariConfig(dataSourceProperties)
val dataSource = HikariDataSource(config)
val persistence = CordaPersistence.connect(dataSource, schemaService, identityService, databaseConfig)
// Check not in read-only mode.
persistence.transaction {
persistence.dataSource.connection.use {
check(!it.metaData.isReadOnly) { "Database should not be readonly." }
}
}
return persistence
}
/**
* Buffer observations until after the current database transaction has been closed. Observations are never
* dropped, simply delayed.
@ -144,7 +147,7 @@ fun configureDatabase(dataSourceProperties: Properties, databaseConfig: Database
*/
fun <T : Any> rx.Observer<T>.bufferUntilDatabaseCommit(): rx.Observer<T> {
val currentTxId = DatabaseTransactionManager.transactionId
val databaseTxBoundary: Observable<DatabaseTransactionManager.Boundary> = DatabaseTransactionManager.transactionBoundaries.filter { it.txId == currentTxId }.first()
val databaseTxBoundary: Observable<DatabaseTransactionManager.Boundary> = DatabaseTransactionManager.transactionBoundaries.first { it.txId == currentTxId }
val subject = UnicastSubject.create<T>()
subject.delaySubscription(databaseTxBoundary).subscribe(this)
databaseTxBoundary.doOnCompleted { subject.onCompleted() }
@ -183,14 +186,9 @@ private class DatabaseTransactionWrappingSubscriber<U>(val db: CordaPersistence?
// A subscriber that wraps another but does not pass on observations to it.
private class NoOpSubscriber<U>(t: Subscriber<in U>) : Subscriber<U>(t) {
override fun onCompleted() {
}
override fun onError(e: Throwable?) {
}
override fun onNext(s: U) {
}
override fun onCompleted() {}
override fun onError(e: Throwable?) {}
override fun onNext(s: U) {}
}
/**

View File

@ -0,0 +1,61 @@
package net.corda.nodeapi.internal.persistence
import org.hibernate.Session
import org.hibernate.Transaction
import rx.subjects.Subject
import java.sql.Connection
import java.util.*
class DatabaseTransaction(
isolation: Int,
private val threadLocal: ThreadLocal<DatabaseTransaction>,
private val transactionBoundaries: Subject<DatabaseTransactionManager.Boundary, DatabaseTransactionManager.Boundary>,
val cordaPersistence: CordaPersistence
) {
val id: UUID = UUID.randomUUID()
val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
cordaPersistence.dataSource.connection.apply {
autoCommit = false
transactionIsolation = isolation
}
}
private val sessionDelegate = lazy {
val session = cordaPersistence.entityManagerFactory.withOptions().connection(connection).openSession()
hibernateTransaction = session.beginTransaction()
session
}
val session: Session by sessionDelegate
private lateinit var hibernateTransaction: Transaction
private val outerTransaction: DatabaseTransaction? = threadLocal.get()
fun commit() {
if (sessionDelegate.isInitialized()) {
hibernateTransaction.commit()
}
connection.commit()
}
fun rollback() {
if (sessionDelegate.isInitialized() && session.isOpen) {
session.clear()
}
if (!connection.isClosed) {
connection.rollback()
}
}
fun close() {
if (sessionDelegate.isInitialized() && session.isOpen) {
session.close()
}
connection.close()
threadLocal.set(outerTransaction)
if (outerTransaction == null) {
transactionBoundaries.onNext(DatabaseTransactionManager.Boundary(id))
}
}
}

View File

@ -1,68 +1,14 @@
package net.corda.node.utilities
package net.corda.nodeapi.internal.persistence
import co.paralleluniverse.strands.Strand
import org.hibernate.Session
import org.hibernate.Transaction
import rx.subjects.PublishSubject
import rx.subjects.Subject
import java.sql.Connection
import java.util.*
import java.util.concurrent.ConcurrentHashMap
class DatabaseTransaction(isolation: Int, val threadLocal: ThreadLocal<DatabaseTransaction>,
val transactionBoundaries: Subject<DatabaseTransactionManager.Boundary, DatabaseTransactionManager.Boundary>,
val cordaPersistence: CordaPersistence) {
fun currentDBSession(): Session = DatabaseTransactionManager.current().session
val id: UUID = UUID.randomUUID()
val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
cordaPersistence.dataSource.connection
.apply {
autoCommit = false
transactionIsolation = isolation
}
}
private val sessionDelegate = lazy {
val session = cordaPersistence.entityManagerFactory.withOptions().connection(connection).openSession()
hibernateTransaction = session.beginTransaction()
session
}
val session: Session by sessionDelegate
private lateinit var hibernateTransaction: Transaction
private val outerTransaction: DatabaseTransaction? = threadLocal.get()
fun commit() {
if (sessionDelegate.isInitialized()) {
hibernateTransaction.commit()
}
connection.commit()
}
fun rollback() {
if (sessionDelegate.isInitialized() && session.isOpen) {
session.clear()
}
if (!connection.isClosed) {
connection.rollback()
}
}
fun close() {
if (sessionDelegate.isInitialized() && session.isOpen) {
session.close()
}
connection.close()
threadLocal.set(outerTransaction)
if (outerTransaction == null) {
transactionBoundaries.onNext(DatabaseTransactionManager.Boundary(id))
}
}
}
fun currentDBSession() = DatabaseTransactionManager.current().session
class DatabaseTransactionManager(initDataSource: CordaPersistence) {
companion object {
private val threadLocalDb = ThreadLocal<CordaPersistence>()
@ -95,11 +41,15 @@ class DatabaseTransactionManager(initDataSource: CordaPersistence) {
fun currentOrNull(): DatabaseTransaction? = manager.currentOrNull()
fun currentOrNew(isolation: Int = dataSource.transactionIsolationLevel) = currentOrNull() ?: manager.newTransaction(isolation)
fun currentOrNew(isolation: TransactionIsolationLevel = dataSource.defaultIsolationLevel): DatabaseTransaction {
return currentOrNull() ?: manager.newTransaction(isolation.jdbcValue)
}
fun current(): DatabaseTransaction = currentOrNull() ?: error("No transaction in context.")
fun newTransaction(isolation: Int = dataSource.transactionIsolationLevel) = manager.newTransaction(isolation)
fun newTransaction(isolation: TransactionIsolationLevel = dataSource.defaultIsolationLevel): DatabaseTransaction {
return manager.newTransaction(isolation.jdbcValue)
}
}
data class Boundary(val txId: UUID)

View File

@ -1,13 +1,9 @@
package net.corda.node.services.persistence
package net.corda.nodeapi.internal.persistence
import net.corda.core.internal.castIfPossible
import net.corda.core.node.services.IdentityService
import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.toHexString
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.utilities.DatabaseTransactionManager
import org.hibernate.SessionFactory
import org.hibernate.boot.MetadataSources
import org.hibernate.boot.model.naming.Identifier
@ -18,14 +14,18 @@ import org.hibernate.engine.jdbc.connections.spi.ConnectionProvider
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment
import org.hibernate.service.UnknownUnwrapTypeException
import org.hibernate.type.AbstractSingleColumnStandardBasicType
import org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry
import org.hibernate.type.descriptor.java.PrimitiveByteArrayTypeDescriptor
import org.hibernate.type.descriptor.sql.BlobTypeDescriptor
import org.hibernate.type.descriptor.sql.VarbinaryTypeDescriptor
import java.sql.Connection
import java.util.concurrent.ConcurrentHashMap
import javax.persistence.AttributeConverter
class HibernateConfiguration(val schemaService: SchemaService, private val databaseConfig: DatabaseConfig, private val identityService: IdentityService) {
class HibernateConfiguration(
schemas: Set<MappedSchema>,
private val databaseConfig: DatabaseConfig,
private val attributeConverters: Collection<AttributeConverter<*, *>>
) {
companion object {
private val logger = contextLogger()
}
@ -33,13 +33,8 @@ class HibernateConfiguration(val schemaService: SchemaService, private val datab
// TODO: make this a guava cache or similar to limit ability for this to grow forever.
private val sessionFactories = ConcurrentHashMap<Set<MappedSchema>, SessionFactory>()
val sessionFactoryForRegisteredSchemas = schemaService.schemaOptions.keys.let {
val sessionFactoryForRegisteredSchemas = schemas.let {
logger.info("Init HibernateConfiguration for schemas: $it")
// Register the AbstractPartyDescriptor so Hibernate doesn't warn when encountering AbstractParty. Unfortunately
// Hibernate warns about not being able to find a descriptor if we don't provide one, but won't use it by default
// so we end up providing both descriptor and converter. We should re-examine this in later versions to see if
// either Hibernate can be convinced to stop warning, use the descriptor by default, or something else.
JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(identityService))
sessionFactoryForSchemas(it)
}
@ -54,7 +49,7 @@ class HibernateConfiguration(val schemaService: SchemaService, private val datab
// necessarily remain and would likely be replaced by something like Liquibase. For now it is very convenient though.
// TODO: replace auto schema generation as it isn't intended for production use, according to Hibernate docs.
val config = Configuration(metadataSources).setProperty("hibernate.connection.provider_class", NodeDatabaseConnectionProvider::class.java.name)
.setProperty("hibernate.hbm2ddl.auto", if (databaseConfig.initDatabase) "update" else "validate")
.setProperty("hibernate.hbm2ddl.auto", if (databaseConfig.initialiseSchema) "update" else "validate")
.setProperty("hibernate.format_sql", "true")
.setProperty("hibernate.connection.isolation", databaseConfig.transactionIsolationLevel.jdbcValue.toString())
@ -78,7 +73,7 @@ class HibernateConfiguration(val schemaService: SchemaService, private val datab
}
})
// register custom converters
applyAttributeConverter(AbstractPartyToX500NameAsStringConverter(identityService))
attributeConverters.forEach { applyAttributeConverter(it) }
// Register a tweaked version of `org.hibernate.type.MaterializedBlobType` that truncates logged messages.
// to avoid OOM when large blobs might get logged.
applyBasicType(CordaMaterializedBlobType, CordaMaterializedBlobType.name)

View File

@ -26,5 +26,8 @@ class AMQPPrimitiveSerializer(clazz: Class<*>) : AMQPSerializer<Any> {
}
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): Any = (obj as? Binary)?.array ?: obj
override fun readObject(
obj: Any,
schemas: SerializationSchemas,
input: DeserializationInput): Any = (obj as? Binary)?.array ?: obj
}

View File

@ -35,5 +35,5 @@ interface AMQPSerializer<out T> {
/**
* Read the given object from the input. The envelope is provided in case the schema is required.
*/
fun readObject(obj: Any, schema: Schema, input: DeserializationInput): T
fun readObject(obj: Any, schema: SerializationSchemas, input: DeserializationInput): T
}

View File

@ -56,9 +56,9 @@ open class ArraySerializer(override val type: Type, factory: SerializerFactory)
}
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): Any {
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput): Any {
if (obj is List<*>) {
return obj.map { input.readObjectOrNull(it, schema, elementType) }.toArrayOfType(elementType)
return obj.map { input.readObjectOrNull(it, schemas, elementType) }.toArrayOfType(elementType)
} else throw NotSerializableException("Expected a List but found $obj")
}

View File

@ -77,8 +77,8 @@ class CollectionSerializer(val declaredType: ParameterizedType, factory: Seriali
}
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): Any = ifThrowsAppend({ declaredType.typeName }) {
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput): Any = ifThrowsAppend({ declaredType.typeName }) {
// TODO: Can we verify the entries in the list?
concreteBuilder((obj as List<*>).map { input.readObjectOrNull(it, schema, declaredType.actualTypeArguments[0]) })
concreteBuilder((obj as List<*>).map { input.readObjectOrNull(it, schemas, declaredType.actualTypeArguments[0]) })
}
}

View File

@ -67,8 +67,8 @@ abstract class CustomSerializer<T : Any> : AMQPSerializer<T> {
superClassSerializer.writeDescribedObject(obj, data, type, output)
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): T {
return superClassSerializer.readObject(obj, schema, input)
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput): T {
return superClassSerializer.readObject(obj, schemas, input)
}
}
@ -133,8 +133,8 @@ abstract class CustomSerializer<T : Any> : AMQPSerializer<T> {
}
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): T {
val proxy: P = uncheckedCast(proxySerializer.readObject(obj, schema, input))
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput): T {
val proxy: P = uncheckedCast(proxySerializer.readObject(obj, schemas, input))
return fromProxy(proxy)
}
}
@ -166,7 +166,7 @@ abstract class CustomSerializer<T : Any> : AMQPSerializer<T> {
data.putString(unmaker(obj))
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): T {
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput): T {
val proxy = obj as String
return maker(proxy)
}

View File

@ -97,21 +97,21 @@ class DeserializationInput(internal val serializerFactory: SerializerFactory) {
@Throws(NotSerializableException::class)
fun <T : Any> deserialize(bytes: ByteSequence, clazz: Class<T>): T = des {
val envelope = getEnvelope(bytes)
clazz.cast(readObjectOrNull(envelope.obj, envelope.schema, clazz))
clazz.cast(readObjectOrNull(envelope.obj, SerializationSchemas(envelope.schema, envelope.transformsSchema), clazz))
}
@Throws(NotSerializableException::class)
fun <T : Any> deserializeAndReturnEnvelope(bytes: SerializedBytes<T>, clazz: Class<T>): ObjectAndEnvelope<T> = des {
val envelope = getEnvelope(bytes)
// Now pick out the obj and schema from the envelope.
ObjectAndEnvelope(clazz.cast(readObjectOrNull(envelope.obj, envelope.schema, clazz)), envelope)
ObjectAndEnvelope(clazz.cast(readObjectOrNull(envelope.obj, SerializationSchemas(envelope.schema, envelope.transformsSchema), clazz)), envelope)
}
internal fun readObjectOrNull(obj: Any?, schema: Schema, type: Type): Any? {
internal fun readObjectOrNull(obj: Any?, schema: SerializationSchemas, type: Type): Any? {
return if (obj == null) null else readObject(obj, schema, type)
}
internal fun readObject(obj: Any, schema: Schema, type: Type): Any =
internal fun readObject(obj: Any, schemas: SerializationSchemas, type: Type): Any =
if (obj is DescribedType && ReferencedObject.DESCRIPTOR == obj.descriptor) {
// It must be a reference to an instance that has already been read, cheaply and quickly returning it by reference.
val objectIndex = (obj.described as UnsignedInteger).toInt()
@ -127,11 +127,11 @@ class DeserializationInput(internal val serializerFactory: SerializerFactory) {
val objectRead = when (obj) {
is DescribedType -> {
// Look up serializer in factory by descriptor
val serializer = serializerFactory.get(obj.descriptor, schema)
val serializer = serializerFactory.get(obj.descriptor, schemas)
if (SerializerFactory.AnyType != type && serializer.type != type && with(serializer.type) { !isSubClassOf(type) && !materiallyEquivalentTo(type) })
throw NotSerializableException("Described type with descriptor ${obj.descriptor} was " +
"expected to be of type $type but was ${serializer.type}")
serializer.readObject(obj.described, schema, this)
serializer.readObject(obj.described, schemas, this)
}
is Binary -> obj.array
else -> obj // this will be the case for primitive types like [boolean] et al.

View File

@ -27,7 +27,7 @@ class EnumSerializer(declaredType: Type, declaredClass: Class<*>, factory: Seria
output.writeTypeNotations(typeNotation)
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): Any {
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput): Any {
val enumName = (obj as List<*>)[0] as String
val enumOrd = obj[1] as Int
val fromOrd = type.asClass()!!.enumConstants[enumOrd] as Enum<*>?

View File

@ -32,8 +32,8 @@ class EvolutionSerializer(
* @param property object to read the actual property value
*/
data class OldParam(val type: Type, val idx: Int, val property: PropertySerializer) {
fun readProperty(paramValues: List<*>, schema: Schema, input: DeserializationInput) =
property.readProperty(paramValues[idx], schema, input)
fun readProperty(paramValues: List<*>, schemas: SerializationSchemas, input: DeserializationInput) =
property.readProperty(paramValues[idx], schemas, input)
}
companion object {
@ -121,10 +121,10 @@ class EvolutionSerializer(
*
* TODO: Object references
*/
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): Any {
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput): Any {
if (obj !is List<*>) throw NotSerializableException("Body of described type is unexpected $obj")
return construct(readers.map { it?.readProperty(obj, schema, input) })
return construct(readers.map { it?.readProperty(obj, schemas, input) })
}
}

View File

@ -88,15 +88,15 @@ class MapSerializer(private val declaredType: ParameterizedType, factory: Serial
}
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): Any = ifThrowsAppend({ declaredType.typeName }) {
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput): Any = ifThrowsAppend({ declaredType.typeName }) {
// TODO: General generics question. Do we need to validate that entries in Maps and Collections match the generic type? Is it a security hole?
val entries: Iterable<Pair<Any?, Any?>> = (obj as Map<*, *>).map { readEntry(schema, input, it) }
val entries: Iterable<Pair<Any?, Any?>> = (obj as Map<*, *>).map { readEntry(schemas, input, it) }
concreteBuilder(entries.toMap())
}
private fun readEntry(schema: Schema, input: DeserializationInput, entry: Map.Entry<Any?, Any?>) =
input.readObjectOrNull(entry.key, schema, declaredType.actualTypeArguments[0]) to
input.readObjectOrNull(entry.value, schema, declaredType.actualTypeArguments[1])
private fun readEntry(schemas: SerializationSchemas, input: DeserializationInput, entry: Map.Entry<Any?, Any?>) =
input.readObjectOrNull(entry.key, schemas, declaredType.actualTypeArguments[0]) to
input.readObjectOrNull(entry.value, schemas, declaredType.actualTypeArguments[1])
// Cannot use * as a bound for EnumMap and EnumSet since * is not an enum. So, we use a sample enum instead.
// We don't actually care about the type, we just need to make the compiler happier.

View File

@ -55,10 +55,15 @@ open class ObjectSerializer(val clazz: Type, factory: SerializerFactory) : AMQPS
}
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): Any = ifThrowsAppend({ clazz.typeName }) {
override fun readObject(
obj: Any,
schemas: SerializationSchemas,
input: DeserializationInput): Any = ifThrowsAppend({ clazz.typeName }) {
if (obj is List<*>) {
if (obj.size > propertySerializers.size) throw NotSerializableException("Too many properties in described type $typeName")
val params = obj.zip(propertySerializers).map { it.second.readProperty(it.first, schema, input) }
if (obj.size > propertySerializers.size) {
throw NotSerializableException("Too many properties in described type $typeName")
}
val params = obj.zip(propertySerializers).map { it.second.readProperty(it.first, schemas, input) }
construct(params)
} else throw NotSerializableException("Body of described type is unexpected $obj")
}

View File

@ -14,7 +14,7 @@ import kotlin.reflect.jvm.javaGetter
sealed class PropertySerializer(val name: String, val readMethod: Method?, val resolvedType: Type) {
abstract fun writeClassInfo(output: SerializationOutput)
abstract fun writeProperty(obj: Any?, data: Data, output: SerializationOutput)
abstract fun readProperty(obj: Any?, schema: Schema, input: DeserializationInput): Any?
abstract fun readProperty(obj: Any?, schemas: SerializationSchemas, input: DeserializationInput): Any?
val type: String = generateType()
val requires: List<String> = generateRequires()
@ -91,8 +91,8 @@ sealed class PropertySerializer(val name: String, val readMethod: Method?, val r
}
}
override fun readProperty(obj: Any?, schema: Schema, input: DeserializationInput): Any? = ifThrowsAppend({ nameForDebug }) {
input.readObjectOrNull(obj, schema, resolvedType)
override fun readProperty(obj: Any?, schemas: SerializationSchemas, input: DeserializationInput): Any? = ifThrowsAppend({ nameForDebug }) {
input.readObjectOrNull(obj, schemas, resolvedType)
}
override fun writeProperty(obj: Any?, data: Data, output: SerializationOutput) = ifThrowsAppend({ nameForDebug }) {
@ -108,7 +108,7 @@ sealed class PropertySerializer(val name: String, val readMethod: Method?, val r
class AMQPPrimitivePropertySerializer(name: String, readMethod: Method?, resolvedType: Type) : PropertySerializer(name, readMethod, resolvedType) {
override fun writeClassInfo(output: SerializationOutput) {}
override fun readProperty(obj: Any?, schema: Schema, input: DeserializationInput): Any? {
override fun readProperty(obj: Any?, schemas: SerializationSchemas, input: DeserializationInput): Any? {
return if (obj is Binary) obj.array else obj
}
@ -131,7 +131,7 @@ sealed class PropertySerializer(val name: String, val readMethod: Method?, val r
PropertySerializer(name, readMethod, Character::class.java) {
override fun writeClassInfo(output: SerializationOutput) {}
override fun readProperty(obj: Any?, schema: Schema, input: DeserializationInput): Any? {
override fun readProperty(obj: Any?, schemas: SerializationSchemas, input: DeserializationInput): Any? {
return if (obj == null) null else (obj as Short).toChar()
}

View File

@ -4,7 +4,9 @@ import com.google.common.primitives.Primitives
import com.google.common.reflect.TypeResolver
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.ClassWhitelist
import net.corda.nodeapi.internal.serialization.carpenter.*
import net.corda.nodeapi.internal.serialization.carpenter.CarpenterMetaSchema
import net.corda.nodeapi.internal.serialization.carpenter.ClassCarpenter
import net.corda.nodeapi.internal.serialization.carpenter.MetaCarpenter
import org.apache.qpid.proton.amqp.*
import java.io.NotSerializableException
import java.lang.reflect.*
@ -13,7 +15,8 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import javax.annotation.concurrent.ThreadSafe
data class FactorySchemaAndDescriptor(val schema: Schema, val typeDescriptor: Any)
data class SerializationSchemas(val schema: Schema, val transforms: TransformsSchema)
data class FactorySchemaAndDescriptor(val schemas: SerializationSchemas, val typeDescriptor: Any)
/**
* Factory of serializers designed to be shared across threads and invocations.
@ -40,7 +43,10 @@ open class SerializerFactory(val whitelist: ClassWhitelist, cl: ClassLoader) {
val classloader: ClassLoader
get() = classCarpenter.classloader
private fun getEvolutionSerializer(typeNotation: TypeNotation, newSerializer: AMQPSerializer<Any>): AMQPSerializer<Any> {
private fun getEvolutionSerializer(
typeNotation: TypeNotation,
newSerializer: AMQPSerializer<Any>,
transforms: TransformsSchema): AMQPSerializer<Any> {
return serializersByDescriptor.computeIfAbsent(typeNotation.descriptor.name!!) {
when (typeNotation) {
is CompositeType -> EvolutionSerializer.make(typeNotation, newSerializer as ObjectSerializer, this)
@ -168,7 +174,7 @@ open class SerializerFactory(val whitelist: ClassWhitelist, cl: ClassLoader) {
* contained in the [Schema].
*/
@Throws(NotSerializableException::class)
fun get(typeDescriptor: Any, schema: Schema): AMQPSerializer<Any> {
fun get(typeDescriptor: Any, schema: SerializationSchemas): AMQPSerializer<Any> {
return serializersByDescriptor[typeDescriptor] ?: {
processSchema(FactorySchemaAndDescriptor(schema, typeDescriptor))
serializersByDescriptor[typeDescriptor] ?: throw NotSerializableException(
@ -194,9 +200,9 @@ open class SerializerFactory(val whitelist: ClassWhitelist, cl: ClassLoader) {
* Iterate over an AMQP schema, for each type ascertain weather it's on ClassPath of [classloader] amd
* if not use the [ClassCarpenter] to generate a class to use in it's place
*/
private fun processSchema(schema: FactorySchemaAndDescriptor, sentinel: Boolean = false) {
private fun processSchema(schemaAndDescriptor: FactorySchemaAndDescriptor, sentinel: Boolean = false) {
val metaSchema = CarpenterMetaSchema.newInstance()
for (typeNotation in schema.schema.types) {
for (typeNotation in schemaAndDescriptor.schemas.schema.types) {
try {
val serialiser = processSchemaEntry(typeNotation)
@ -204,7 +210,7 @@ open class SerializerFactory(val whitelist: ClassWhitelist, cl: ClassLoader) {
// doesn't match that of the serialised object then we are dealing with different
// instance of the class, as such we need to build an EvolutionSerialiser
if (serialiser.typeDescriptor != typeNotation.descriptor.name) {
getEvolutionSerializer(typeNotation, serialiser)
getEvolutionSerializer(typeNotation, serialiser, schemaAndDescriptor.schemas.transforms)
}
} catch (e: ClassNotFoundException) {
if (sentinel) throw e
@ -215,7 +221,7 @@ open class SerializerFactory(val whitelist: ClassWhitelist, cl: ClassLoader) {
if (metaSchema.isNotEmpty()) {
val mc = MetaCarpenter(metaSchema, classCarpenter)
mc.build()
processSchema(schema, true)
processSchema(schemaAndDescriptor, true)
}
}

View File

@ -28,7 +28,7 @@ class SingletonSerializer(override val type: Class<*>, val singleton: Any, facto
}
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): Any {
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput): Any {
return singleton
}
}

View File

@ -85,6 +85,9 @@ class UnknownTransform : Transform() {
override val name: String get() = typeName
}
/**
* Used by the unit testing framework
*/
class UnknownTestTransform(val a: Int, val b: Int, val c: Int) : Transform() {
companion object : DescribedTypeConstructor<UnknownTestTransform> {
val typeName = "UnknownTest"

View File

@ -34,8 +34,8 @@ object InputStreamSerializer : CustomSerializer.Implements<InputStream>(InputStr
}
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): InputStream {
val bits = input.readObject(obj, schema, ByteArray::class.java) as ByteArray
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput): InputStream {
val bits = input.readObject(obj, schemas, ByteArray::class.java) as ByteArray
return ByteArrayInputStream(bits)
}
}

View File

@ -20,8 +20,8 @@ object PrivateKeySerializer : CustomSerializer.Implements<PrivateKey>(PrivateKey
output.writeObject(obj.encoded, data, clazz)
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): PrivateKey {
val bits = input.readObject(obj, schema, ByteArray::class.java) as ByteArray
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput): PrivateKey {
val bits = input.readObject(obj, schemas, ByteArray::class.java) as ByteArray
return Crypto.decodePrivateKey(bits)
}
}

View File

@ -17,8 +17,8 @@ object PublicKeySerializer : CustomSerializer.Implements<PublicKey>(PublicKey::c
output.writeObject(obj.encoded, data, clazz)
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): PublicKey {
val bits = input.readObject(obj, schema, ByteArray::class.java) as ByteArray
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput): PublicKey {
val bits = input.readObject(obj, schemas, ByteArray::class.java) as ByteArray
return Crypto.decodePublicKey(bits)
}
}

View File

@ -20,8 +20,8 @@ object X509CertificateSerializer : CustomSerializer.Implements<X509Certificate>(
output.writeObject(obj.encoded, data, clazz)
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): X509Certificate {
val bits = input.readObject(obj, schema, ByteArray::class.java) as ByteArray
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput): X509Certificate {
val bits = input.readObject(obj, schemas, ByteArray::class.java) as ByteArray
return X509CertificateFactory().generateCertificate(bits.inputStream())
}
}

View File

@ -1,4 +1,4 @@
package net.corda.node.utilities
package net.corda.nodeapi.internal.crypto
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.Crypto.EDDSA_ED25519_SHA512
@ -13,7 +13,6 @@ import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.node.serialization.KryoServerSerializationScheme
import net.corda.node.services.config.createKeystoreForCordaNode
import net.corda.nodeapi.internal.crypto.*
import net.corda.nodeapi.internal.serialization.AllWhitelist
import net.corda.nodeapi.internal.serialization.SerializationContextImpl
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl

View File

@ -0,0 +1,43 @@
package net.corda.nodeapi.internal.serialization.amqp
import net.corda.core.serialization.CordaSerializationTransformEnumDefault
import net.corda.core.serialization.SerializedBytes
import net.corda.testing.common.internal.ProjectStructure.projectRootDir
import org.assertj.core.api.Assertions
import org.junit.Test
import java.io.File
import java.io.NotSerializableException
import java.net.URI
// NOTE: To recreate the test files used by these tests uncomment the original test classes and comment
// the new ones out, then change each test to write out the serialized bytes rather than read
// the file.
class EnumEvolveTests {
var localPath = projectRootDir.toUri().resolve(
"node-api/src/test/resources/net/corda/nodeapi/internal/serialization/amqp")
// Version of the class as it was serialised
//
// @CordaSerializationTransformEnumDefault("D", "C")
// enum class DeserializeNewerSetToUnknown { A, B, C, D }
//
// Version of the class as it's used in the test
enum class DeserializeNewerSetToUnknown { A, B, C }
@Test
fun deserialiseNewerSetToUnknown() {
val resource = "${this.javaClass.simpleName}.${testName()}"
val sf = testDefaultFactory()
data class C (val e : DeserializeNewerSetToUnknown)
// Uncomment to re-generate test files
// File(URI("$localPath/$resource")).writeBytes(
// SerializationOutput(sf).serialize(C(DeserializeNewerSetToUnknown.D)).bytes)
Assertions.assertThatThrownBy {
DeserializationInput(sf).deserialize(SerializedBytes<C>(
File(EvolvabilityTests::class.java.getResource(resource).toURI()).readBytes()))
}.isInstanceOf(NotSerializableException::class.java)
}
}

View File

@ -17,7 +17,7 @@ class OverridePKSerializerTest {
throw SerializerTestException("Custom write call")
}
override fun readObject(obj: Any, schema: Schema, input: DeserializationInput): PublicKey {
override fun readObject(obj: Any, schemas: SerializationSchemas, input: DeserializationInput): PublicKey {
throw SerializerTestException("Custom read call")
}

View File

@ -54,8 +54,6 @@ dependencies {
compile project(':client:rpc')
compile "net.corda.plugins:cordform-common:$gradle_plugins_version"
compile "com.google.code.findbugs:jsr305:3.0.1"
// Log4J: logging framework (with SLF4J bindings)
compile "org.apache.logging.log4j:log4j-slf4j-impl:${log4j_version}"
compile "org.apache.logging.log4j:log4j-web:${log4j_version}"

View File

@ -10,7 +10,7 @@ import net.corda.node.internal.NodeStartup
import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.nodeapi.User
import net.corda.testing.ALICE
import net.corda.testing.ProjectStructure.projectRootDir
import net.corda.testing.common.internal.ProjectStructure.projectRootDir
import net.corda.testing.driver.driver
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy

View File

@ -213,7 +213,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
private fun startBobAndCommunicateWithAlice(): Party {
val bob = startNode(BOB.name)
bob.internals.registerInitiatedFlow(ReceiveFlow::class.java)
bob.registerInitiatedFlow(ReceiveFlow::class.java)
val bobParty = bob.info.chooseIdentity()
// Perform a protocol exchange to force the peer queue to be created
alice.services.startFlow(SendFlow(bobParty, 0)).resultFuture.getOrThrow()

View File

@ -65,6 +65,40 @@ class NodeStatePersistenceTests {
val retrievedMessage = stateAndRef!!.state.data.message
assertEquals(message, retrievedMessage)
}
@Test
fun `persistent state survives node restart without reinitialising database schema`() {
// Temporary disable this test when executed on Windows. It is known to be sporadically failing.
// More investigation is needed to establish why.
assumeFalse(System.getProperty("os.name").toLowerCase().startsWith("win"))
val user = User("mark", "dadada", setOf(startFlow<SendMessageFlow>(), invokeRpc("vaultQuery")))
val message = Message("Hello world!")
val stateAndRef: StateAndRef<MessageState>? = driver(isDebug = true, startNodesInProcess = isQuasarAgentSpecified()) {
val nodeName = {
val nodeHandle = startNode(rpcUsers = listOf(user)).getOrThrow()
val nodeName = nodeHandle.nodeInfo.chooseIdentity().name
// Ensure the notary node has finished starting up, before starting a flow that needs a notary
defaultNotaryNode.getOrThrow()
nodeHandle.rpcClientToNode().start(user.username, user.password).use {
it.proxy.startFlow(::SendMessageFlow, message, defaultNotaryIdentity).returnValue.getOrThrow()
}
nodeHandle.stop()
nodeName
}()
val nodeHandle = startNode(providedName = nodeName, rpcUsers = listOf(user), customOverrides = mapOf("devMode" to "false")).getOrThrow()
val result = nodeHandle.rpcClientToNode().start(user.username, user.password).use {
val page = it.proxy.vaultQuery(MessageState::class.java)
page.states.singleOrNull()
}
nodeHandle.stop()
result
}
assertNotNull(stateAndRef)
val retrievedMessage = stateAndRef!!.state.data.message
assertEquals(message, retrievedMessage)
}
}
fun isQuasarAgentSpecified(): Boolean {

View File

@ -3,6 +3,8 @@ package net.corda.node.internal
import com.codahale.metrics.MetricRegistry
import com.google.common.collect.MutableClassToInstanceMap
import com.google.common.util.concurrent.MoreExecutors
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import net.corda.confidential.SwapIdentitiesFlow
import net.corda.confidential.SwapIdentitiesHandler
import net.corda.core.CordaException
@ -55,11 +57,13 @@ import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSoftLockManager
import net.corda.node.shell.InteractiveShell
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.nodeapi.internal.NetworkParameters
import net.corda.nodeapi.internal.crypto.*
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import org.apache.activemq.artemis.utils.ReusableLatch
import org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry
import org.slf4j.Logger
import rx.Observable
import java.io.IOException
@ -125,7 +129,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
private lateinit var _services: ServiceHubInternalImpl
protected var myNotaryIdentity: PartyAndCertificate? = null
protected lateinit var checkpointStorage: CheckpointStorage
protected lateinit var smm: StateMachineManager
private lateinit var tokenizableServices: List<Any>
protected lateinit var attachments: NodeAttachmentService
protected lateinit var network: MessagingService
@ -158,7 +161,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
@Volatile private var _started: StartedNode<AbstractNode>? = null
/** The implementation of the [CordaRPCOps] interface used by this node. */
open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence): CordaRPCOps {
open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence, smm: StateMachineManager): CordaRPCOps {
return SecureCordaRPCOps(services, smm, database, flowStarter)
}
@ -188,7 +191,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
initCertificate()
val (keyPairs, info) = initNodeInfo()
readNetworkParameters()
val schemaService = NodeSchemaService(cordappLoader)
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
val identityService = makeIdentityService(info)
// Do all of this in a database transaction so anything that might need a connection has one.
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database ->
@ -197,7 +200,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val stateLoader = StateLoaderImpl(transactionStorage)
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, stateLoader, database, info, identityService)
val notaryService = makeNotaryService(nodeServices, database)
smm = makeStateMachineManager(database)
val smm = makeStateMachineManager(database)
val flowStarter = FlowStarterImpl(serverThread, smm)
val schedulerService = NodeSchedulerService(
platformClock,
@ -214,13 +217,13 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS)
}
}
makeVaultObservers(schedulerService, database.hibernateConfig)
val rpcOps = makeRPCOps(flowStarter, database)
makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService)
val rpcOps = makeRPCOps(flowStarter, database, smm)
startMessagingService(rpcOps)
installCoreFlows()
val cordaServices = installCordaServices(flowStarter)
tokenizableServices = nodeServices + cordaServices + schedulerService
registerCordappFlows()
registerCordappFlows(smm)
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
FlowLogicRefFactoryImpl.classloader = cordappLoader.appClassLoader
startShell(rpcOps)
@ -408,11 +411,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
installCoreFlow(NotaryFlow.Client::class, service::createServiceFlow)
}
private fun registerCordappFlows() {
private fun registerCordappFlows(smm: StateMachineManager) {
cordappLoader.cordapps.flatMap { it.initiatedFlows }
.forEach {
try {
registerInitiatedFlowInternal(it, track = false)
registerInitiatedFlowInternal(smm, it, track = false)
} catch (e: NoSuchMethodException) {
log.error("${it.name}, as an initiated flow, must have a constructor with a single parameter " +
"of type ${Party::class.java.name}")
@ -422,13 +425,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
/**
* Use this method to register your initiated flows in your tests. This is automatically done by the node when it
* starts up for all [FlowLogic] classes it finds which are annotated with [InitiatedBy].
* @return An [Observable] of the initiated flows started by counter-parties.
*/
fun <T : FlowLogic<*>> registerInitiatedFlow(initiatedFlowClass: Class<T>): Observable<T> {
return registerInitiatedFlowInternal(initiatedFlowClass, track = true)
internal fun <T : FlowLogic<*>> registerInitiatedFlow(smm: StateMachineManager, initiatedFlowClass: Class<T>): Observable<T> {
return registerInitiatedFlowInternal(smm, initiatedFlowClass, track = true)
}
// TODO remove once not needed
@ -437,7 +435,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
"It should accept a ${FlowSession::class.java.simpleName} instead"
}
private fun <F : FlowLogic<*>> registerInitiatedFlowInternal(initiatedFlow: Class<F>, track: Boolean): Observable<F> {
private fun <F : FlowLogic<*>> registerInitiatedFlowInternal(smm: StateMachineManager, initiatedFlow: Class<F>, track: Boolean): Observable<F> {
val constructors = initiatedFlow.declaredConstructors.associateBy { it.parameterTypes.toList() }
val flowSessionCtor = constructors[listOf(FlowSession::class.java)]?.apply { isAccessible = true }
val ctor: (FlowSession) -> F = if (flowSessionCtor == null) {
@ -458,16 +456,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
"${InitiatedBy::class.java.name} must point to ${classWithAnnotation.name} and not ${initiatingFlow.name}"
}
val flowFactory = InitiatedFlowFactory.CorDapp(version, initiatedFlow.appName, ctor)
val observable = internalRegisterFlowFactory(initiatingFlow, flowFactory, initiatedFlow, track)
val observable = internalRegisterFlowFactory(smm, initiatingFlow, flowFactory, initiatedFlow, track)
log.info("Registered ${initiatingFlow.name} to initiate ${initiatedFlow.name} (version $version)")
return observable
}
@VisibleForTesting
fun <F : FlowLogic<*>> internalRegisterFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>,
flowFactory: InitiatedFlowFactory<F>,
initiatedFlowClass: Class<F>,
track: Boolean): Observable<F> {
internal fun <F : FlowLogic<*>> internalRegisterFlowFactory(smm: StateMachineManager,
initiatingFlowClass: Class<out FlowLogic<*>>,
flowFactory: InitiatedFlowFactory<F>,
initiatedFlowClass: Class<F>,
track: Boolean): Observable<F> {
val observable = if (track) {
smm.changes.filter { it is StateMachineManager.Change.Add }.map { it.logic }.ofType(initiatedFlowClass)
} else {
@ -530,10 +528,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
protected open fun makeTransactionStorage(database: CordaPersistence): WritableTransactionStorage = DBTransactionStorage()
private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration) {
private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration, smm: StateMachineManager, schemaService: SchemaService) {
VaultSoftLockManager.install(services.vaultService, smm)
ScheduledActivityObserver.install(services.vaultService, schedulerService)
HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig)
HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig, schemaService)
}
@VisibleForTesting
@ -777,7 +776,23 @@ internal class FlowStarterImpl(private val serverThread: AffinityExecutor, priva
}
class ConfigurationException(message: String) : CordaException(message)
/**
* Thrown when a node is about to start and its network map cache doesn't contain any node.
*/
internal class NetworkMapCacheEmptyException : Exception()
internal class NetworkMapCacheEmptyException : Exception()
fun configureDatabase(dataSourceProperties: Properties,
databaseConfig: DatabaseConfig,
identityService: IdentityService,
schemaService: SchemaService = NodeSchemaService()): CordaPersistence {
// Register the AbstractPartyDescriptor so Hibernate doesn't warn when encountering AbstractParty. Unfortunately
// Hibernate warns about not being able to find a descriptor if we don't provide one, but won't use it by default
// so we end up providing both descriptor and converter. We should re-examine this in later versions to see if
// either Hibernate can be convinced to stop warning, use the descriptor by default, or something else.
JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(identityService))
val config = HikariConfig(dataSourceProperties)
val dataSource = HikariDataSource(config)
val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(identityService))
return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, attributeConverters)
}

View File

@ -26,7 +26,7 @@ import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.context
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
import rx.Observable
import java.io.InputStream
import java.security.PublicKey

View File

@ -10,9 +10,9 @@ import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.node.VersionInfo
import net.corda.node.internal.cordapp.CordappLoader
@ -25,10 +25,10 @@ import net.corda.node.services.messaging.*
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.utilities.AddressUtils
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.DemoClock
import net.corda.nodeapi.internal.ShutdownHook
import net.corda.nodeapi.internal.addShutdownHook
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.serialization.*
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
import org.slf4j.Logger

View File

@ -5,7 +5,7 @@ import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.rpcContext
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
/**
* Implementation of [CordaRPCOps] that checks authorisation.

View File

@ -2,6 +2,8 @@ package net.corda.node.internal
import net.corda.core.contracts.*
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.internal.VisibleForTesting
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.StateLoader
@ -12,7 +14,8 @@ import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
import rx.Observable
interface StartedNode<out N : AbstractNode> {
val internals: N
@ -26,7 +29,20 @@ interface StartedNode<out N : AbstractNode> {
val rpcOps: CordaRPCOps
val notaryService: NotaryService?
fun dispose() = internals.stop()
fun <T : FlowLogic<*>> registerInitiatedFlow(initiatedFlowClass: Class<T>) = internals.registerInitiatedFlow(initiatedFlowClass)
/**
* Use this method to register your initiated flows in your tests. This is automatically done by the node when it
* starts up for all [FlowLogic] classes it finds which are annotated with [InitiatedBy].
* @return An [Observable] of the initiated flows started by counter-parties.
*/
fun <T : FlowLogic<*>> registerInitiatedFlow(initiatedFlowClass: Class<T>) = internals.registerInitiatedFlow(smm, initiatedFlowClass)
@VisibleForTesting
fun <F : FlowLogic<*>> internalRegisterFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>,
flowFactory: InitiatedFlowFactory<F>,
initiatedFlowClass: Class<F>,
track: Boolean): Observable<F> {
return internals.internalRegisterFlowFactory(smm, initiatingFlowClass, flowFactory, initiatedFlowClass, track)
}
}
class StateLoaderImpl(private val validatedTransactions: TransactionStorage) : StateLoader {

View File

@ -52,12 +52,14 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
}
}
val cordappSchemas: Set<MappedSchema> get() = cordapps.flatMap { it.customSchemas }.toSet()
companion object {
private val logger = contextLogger()
/**
* Default cordapp dir name
*/
val CORDAPPS_DIR_NAME = "cordapps"
private const val CORDAPPS_DIR_NAME = "cordapps"
/**
* Creates a default CordappLoader intended to be used in non-dev or non-test environments.
@ -79,7 +81,9 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
*/
@VisibleForTesting
fun createDefaultWithTestPackages(configuration: NodeConfiguration, testPackages: List<String>): CordappLoader {
check(configuration.devMode) { "Package scanning can only occur in dev mode" }
if (!configuration.devMode) {
logger.warn("Package scanning should only occur in dev mode!")
}
val paths = getCordappsInDirectory(getCordappsPath(configuration.baseDirectory)) + testPackages.flatMap(this::createScanPackage)
return cordappLoadersCache.computeIfAbsent(paths, { CordappLoader(paths) })
}
@ -92,8 +96,9 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
* CorDapps.
*/
@VisibleForTesting
fun createWithTestPackages(testPackages: List<String>)
= cordappLoadersCache.computeIfAbsent(testPackages, { CordappLoader(testPackages.flatMap(this::createScanPackage)) })
fun createWithTestPackages(testPackages: List<String>): CordappLoader {
return cordappLoadersCache.computeIfAbsent(testPackages, { CordappLoader(testPackages.flatMap(this::createScanPackage)) })
}
/**
* Creates a dev mode CordappLoader intended only to be used in test environments

View File

@ -1,11 +1,11 @@
package net.corda.node.services.api
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowStateMachine
import net.corda.core.context.InvocationContext
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineTransactionMapping
@ -23,7 +23,7 @@ import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.utilities.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBaseInternal
interface NetworkMapCacheBaseInternal : NetworkMapCacheBase {

View File

@ -5,6 +5,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.node.services.messaging.CertificateChainCheckPolicy
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.User
import net.corda.nodeapi.config.NodeSSLConfiguration
import net.corda.nodeapi.config.parseAs
@ -19,7 +20,6 @@ interface NodeConfiguration : NodeSSLConfiguration {
val emailAddress: String
val exportJMXto: String
val dataSourceProperties: Properties
val database: DatabaseConfig
val rpcUsers: List<User>
val devMode: Boolean
val devModeOptions: DevModeOptions?
@ -30,6 +30,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
val notary: NotaryConfig?
val activeMQServer: ActiveMqServerConfiguration
val additionalNodeInfoPollingFrequencyMsec: Long
// TODO Remove as this is only used by the driver
val useHTTPS: Boolean
val p2pAddress: NetworkHostAndPort
val rpcAddress: NetworkHostAndPort?
@ -38,29 +39,11 @@ interface NodeConfiguration : NodeSSLConfiguration {
val useTestClock: Boolean get() = false
val detectPublicIp: Boolean get() = true
val sshd: SSHDConfiguration?
val database: DatabaseConfig
}
data class DevModeOptions(val disableCheckpointChecker: Boolean = false)
data class DatabaseConfig(
val initDatabase: Boolean = true,
val serverNameTablePrefix: String = "",
val transactionIsolationLevel: TransactionIsolationLevel = TransactionIsolationLevel.REPEATABLE_READ
)
enum class TransactionIsolationLevel {
NONE,
READ_UNCOMMITTED,
READ_COMMITTED,
REPEATABLE_READ,
SERIALIZABLE;
/**
* The JDBC constant value of the same name but with prefixed with TRANSACTION_ defined in [java.sql.Connection].
*/
val jdbcValue: Int = java.sql.Connection::class.java.getField("TRANSACTION_$name").get(null) as Int
}
fun NodeConfiguration.shouldCheckCheckpoints(): Boolean {
return this.devMode && this.devModeOptions?.disableCheckpointChecker != true
}
@ -108,7 +91,6 @@ data class NodeConfigurationImpl(
override val keyStorePassword: String,
override val trustStorePassword: String,
override val dataSourceProperties: Properties,
override val database: DatabaseConfig = DatabaseConfig(),
override val compatibilityZoneURL: URL? = null,
override val rpcUsers: List<User>,
override val verifierType: VerifierType,
@ -130,9 +112,9 @@ data class NodeConfigurationImpl(
override val activeMQServer: ActiveMqServerConfiguration,
// TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(),
override val sshd: SSHDConfiguration? = null
) : NodeConfiguration {
override val sshd: SSHDConfiguration? = null,
override val database: DatabaseConfig = DatabaseConfig(initialiseSchema = devMode)
) : NodeConfiguration {
override val exportJMXto: String get() = "http"
init {

View File

@ -3,6 +3,7 @@ package net.corda.node.services.events
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.context.InvocationContext
import net.corda.core.context.Origin
import net.corda.core.contracts.SchedulableState
import net.corda.core.contracts.ScheduledActivity
import net.corda.core.contracts.ScheduledStateRef
@ -12,7 +13,6 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.context.Origin
import net.corda.core.internal.until
import net.corda.core.node.StateLoader
import net.corda.core.schemas.PersistentStateRef
@ -24,9 +24,9 @@ import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.SchedulerService
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.apache.activemq.artemis.utils.ReusableLatch
import java.time.Clock
import java.time.Instant

View File

@ -13,7 +13,7 @@ import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
import org.bouncycastle.cert.X509CertificateHolder
import java.security.InvalidAlgorithmParameterException

View File

@ -7,7 +7,7 @@ import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.bouncycastle.operator.ContentSigner
import java.security.KeyPair
import java.security.PrivateKey

View File

@ -190,11 +190,13 @@ fun <M : Any> MessagingService.onNext(topic: String, sessionId: Long): CordaFutu
return messageFuture
}
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID())
= send(TopicSession(topic, sessionID), payload, to, uuid)
fun MessagingService.send(topic: String, sessionID: Long, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID()) {
send(TopicSession(topic, sessionID), payload, to, uuid)
}
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID(), retryId: Long? = null)
= send(createMessage(topicSession, payload.serialize().bytes, uuid), to, retryId)
fun MessagingService.send(topicSession: TopicSession, payload: Any, to: MessageRecipients, uuid: UUID = UUID.randomUUID(), retryId: Long? = null) {
send(createMessage(topicSession, payload.serialize().bytes, uuid), to, retryId)
}
interface MessageHandlerRegistration

View File

@ -17,10 +17,13 @@ import net.corda.core.utilities.trace
import net.corda.node.VersionInfo
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.statemachine.StateMachineManagerImpl
import net.corda.node.utilities.*
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.*
import org.apache.activemq.artemis.api.core.RoutingType

View File

@ -20,6 +20,7 @@ import net.corda.core.context.Trace.InvocationId
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.LazyStickyPool
import net.corda.core.internal.LifeCycle
import net.corda.core.internal.join
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT
@ -207,6 +208,7 @@ class RPCServer(
}
fun close() {
observationSendExecutor?.join()
reaperScheduledFuture?.cancel(false)
rpcExecutor?.shutdownNow()
reaperExecutor?.shutdownNow()

View File

@ -21,9 +21,9 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.wrapWithDatabaseTransaction
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import net.corda.nodeapi.internal.NotaryInfo
import org.hibernate.Session
import rx.Observable
@ -162,13 +162,13 @@ open class PersistentNetworkMapCache(
if (previousNode == null) {
logger.info("No previous node found")
database.transaction {
updateInfoDB(node)
updateInfoDB(node, session)
changePublisher.onNext(MapChange.Added(node))
}
} else if (previousNode != node) {
logger.info("Previous node was found as: $previousNode")
database.transaction {
updateInfoDB(node)
updateInfoDB(node, session)
changePublisher.onNext(MapChange.Modified(node, previousNode))
}
} else {
@ -223,25 +223,14 @@ open class PersistentNetworkMapCache(
}
}
private fun updateInfoDB(nodeInfo: NodeInfo) {
// TODO Temporary workaround to force isolated transaction (otherwise it causes race conditions when processing
// network map registration on network map node)
database.dataSource.connection.use {
val session = database.entityManagerFactory.withOptions().connection(it.apply {
transactionIsolation = 1
}).openSession()
session.use {
val tx = session.beginTransaction()
// TODO For now the main legal identity is left in NodeInfo, this should be set comparision/come up with index for NodeInfo?
val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey)
val nodeInfoEntry = generateMappedObject(nodeInfo)
if (info.isNotEmpty()) {
nodeInfoEntry.id = info[0].id
}
session.merge(nodeInfoEntry)
tx.commit()
}
private fun updateInfoDB(nodeInfo: NodeInfo, session: Session) {
// TODO For now the main legal identity is left in NodeInfo, this should be set comparision/come up with index for NodeInfo?
val info = findByIdentityKey(session, nodeInfo.legalIdentitiesAndCerts.first().owningKey)
val nodeInfoEntry = generateMappedObject(nodeInfo)
if (info.isNotEmpty()) {
nodeInfoEntry.id = info.first().id
}
session.merge(nodeInfoEntry)
}
private fun removeInfoDB(session: Session, nodeInfo: NodeInfo) {

View File

@ -3,8 +3,8 @@ package net.corda.node.services.persistence
import net.corda.core.serialization.SerializedBytes
import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.node.utilities.currentDBSession
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id

View File

@ -7,6 +7,9 @@ import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.node.services.api.StateMachineRecordedTransactionMappingStorage
import net.corda.node.utilities.*
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import rx.subjects.PublishSubject
import java.util.*
import javax.annotation.concurrent.ThreadSafe

View File

@ -8,6 +8,9 @@ import net.corda.core.serialization.*
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.utilities.*
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import rx.Observable
import rx.subjects.PublishSubject
import javax.persistence.*

View File

@ -17,9 +17,9 @@ import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.serialization.*
import net.corda.core.utilities.contextLogger
import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser
import net.corda.node.utilities.DatabaseTransactionManager
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.node.utilities.currentDBSession
import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession
import java.io.*
import java.nio.file.Paths
import java.time.Instant

View File

@ -9,8 +9,9 @@ import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.services.persistence.HibernateConfiguration
import net.corda.node.utilities.DatabaseTransactionManager
import net.corda.node.services.api.SchemaService
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager
import org.hibernate.FlushMode
import rx.Observable
@ -18,12 +19,11 @@ import rx.Observable
* A vault observer that extracts Object Relational Mappings for contract states that support it, and persists them with Hibernate.
*/
// TODO: Manage version evolution of the schemas via additional tooling.
class HibernateObserver private constructor(private val config: HibernateConfiguration) {
class HibernateObserver private constructor(private val config: HibernateConfiguration, private val schemaService: SchemaService) {
companion object {
private val log = contextLogger()
@JvmStatic
fun install(vaultUpdates: Observable<Vault.Update<ContractState>>, config: HibernateConfiguration): HibernateObserver {
val observer = HibernateObserver(config)
fun install(vaultUpdates: Observable<Vault.Update<ContractState>>, config: HibernateConfiguration, schemaService: SchemaService): HibernateObserver {
val observer = HibernateObserver(config, schemaService)
vaultUpdates.subscribe { observer.persist(it.produced) }
return observer
}
@ -36,7 +36,7 @@ class HibernateObserver private constructor(private val config: HibernateConfigu
private fun persistState(stateAndRef: StateAndRef<ContractState>) {
val state = stateAndRef.state.data
log.debug { "Asked to persist state ${stateAndRef.ref}" }
config.schemaService.selectSchemas(state).forEach { persistStateWithSchema(state, stateAndRef.ref, it) }
schemaService.selectSchemas(state).forEach { persistStateWithSchema(state, stateAndRef.ref, it) }
}
@VisibleForTesting
@ -47,7 +47,7 @@ class HibernateObserver private constructor(private val config: HibernateConfigu
flushMode(FlushMode.MANUAL).
openSession()
session.use {
val mappedObject = config.schemaService.generateMappedObject(state, schema)
val mappedObject = schemaService.generateMappedObject(state, schema)
mappedObject.stateRef = PersistentStateRef(stateRef)
it.persist(mappedObject)
it.flush()

View File

@ -9,8 +9,8 @@ import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.SchemaService.SchemaOptions
import net.corda.node.services.events.NodeSchedulerService
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService
@ -27,14 +27,12 @@ import net.corda.node.services.vault.VaultSchemaV1
/**
* Most basic implementation of [SchemaService].
* @param cordappLoader if not null, custom schemas will be extracted from its cordapps.
* TODO: support loading schema options from node configuration.
* TODO: support configuring what schemas are to be selected for persistence.
* TODO: support plugins for schema version upgrading or custom mapping not supported by original [QueryableState].
* TODO: create whitelisted tables when a CorDapp is first installed
*/
class NodeSchemaService(cordappLoader: CordappLoader?) : SchemaService, SingletonSerializeAsToken() {
class NodeSchemaService(extraSchemas: Set<MappedSchema> = emptySet()) : SchemaService, SingletonSerializeAsToken() {
// Entities for compulsory services
object NodeServices
@ -60,17 +58,12 @@ class NodeSchemaService(cordappLoader: CordappLoader?) : SchemaService, Singleto
// Required schemas are those used by internal Corda services
// For example, cash is used by the vault for coin selection (but will be extracted as a standalone CorDapp in future)
private val requiredSchemas: Map<MappedSchema, SchemaService.SchemaOptions> =
mapOf(Pair(CommonSchemaV1, SchemaService.SchemaOptions()),
Pair(VaultSchemaV1, SchemaService.SchemaOptions()),
Pair(NodeInfoSchemaV1, SchemaService.SchemaOptions()),
Pair(NodeServicesV1, SchemaService.SchemaOptions()))
mapOf(Pair(CommonSchemaV1, SchemaOptions()),
Pair(VaultSchemaV1, SchemaOptions()),
Pair(NodeInfoSchemaV1, SchemaOptions()),
Pair(NodeServicesV1, SchemaOptions()))
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = if (cordappLoader == null) {
requiredSchemas
} else {
val customSchemas = cordappLoader.cordapps.flatMap { it.customSchemas }.toSet()
requiredSchemas.plus(customSchemas.map { mappedSchema -> Pair(mappedSchema, SchemaService.SchemaOptions()) })
}
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = requiredSchemas + extraSchemas.associateBy({ it }, { SchemaOptions() })
// Currently returns all schemas supported by the state, with no filtering or enrichment.
override fun selectSchemas(state: ContractState): Iterable<MappedSchema> {

Some files were not shown because too many files have changed in this diff Show More