Merge remote-tracking branch 'open/master' into os-merge-757181e

# Conflicts:
#	node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt
#	node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt
#	node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
#	node/src/main/kotlin/net/corda/node/internal/Node.kt
#	node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt
#	node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt
#	node/src/test/kotlin/net/corda/node/internal/NodeTest.kt
#	node/src/test/kotlin/net/corda/node/services/identity/PersistentIdentityServiceTests.kt
#	testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt
This commit is contained in:
Shams Asari 2018-07-24 17:04:51 +01:00
commit 02fae5f385
54 changed files with 844 additions and 821 deletions

View File

@ -71,13 +71,13 @@ task patchCore(type: Zip, dependsOn: coreJarTask) {
import proguard.gradle.ProGuardTask
task predeterminise(type: ProGuardTask) {
injars patchCore
outjars "$buildDir/proguard/pre-deterministic-${project.version}.jar"
outjars file("$buildDir/proguard/pre-deterministic-${project.version}.jar")
libraryjars "$javaHome/lib/rt.jar"
libraryjars "$javaHome/lib/jce.jar"
libraryjars file("$javaHome/lib/rt.jar")
libraryjars file("$javaHome/lib/jce.jar")
configurations.compileOnly.forEach {
if (originalJar.path != it.path) {
libraryjars it.path, filter: '!META-INF/versions/**'
if (originalJar != it) {
libraryjars it, filter: '!META-INF/versions/**'
}
}
@ -120,12 +120,12 @@ task jarFilter(type: JarFilterTask) {
task determinise(type: ProGuardTask) {
injars jarFilter
outjars "$buildDir/proguard/$jarBaseName-${project.version}.jar"
outjars file("$buildDir/proguard/$jarBaseName-${project.version}.jar")
libraryjars "$javaHome/lib/rt.jar"
libraryjars "$javaHome/lib/jce.jar"
libraryjars file("$javaHome/lib/rt.jar")
libraryjars file("$javaHome/lib/jce.jar")
configurations.runtimeLibraries.forEach {
libraryjars it.path, filter: '!META-INF/versions/**'
libraryjars it, filter: '!META-INF/versions/**'
}
// Analyse the JAR for dead code, and remove (some of) it.
@ -165,7 +165,7 @@ task checkDeterminism(type: ProGuardTask, dependsOn: jdkTask) {
libraryjars deterministic_rt_jar
configurations.runtimeLibraries.forEach {
libraryjars it.path, filter: '!META-INF/versions/**'
libraryjars it, filter: '!META-INF/versions/**'
}
keepattributes '*'

View File

@ -41,8 +41,7 @@ Let's open the example CorDapp in IntelliJ IDEA:
* Open IntelliJ
* A splash screen will appear. Click ``open``, navigate to the folder where you cloned the ``cordapp-example``, and
click ``OK``
* A splash screen will appear. Click ``open``, select the cloned ``cordapp-example`` folder, and click ``OK``
* Once the project is open, click ``File``, then ``Project Structure``. Under ``Project SDK:``, set the project SDK by
clicking ``New...``, clicking ``JDK``, and navigating to ``C:\Program Files\Java\jdk1.8.0_XXX`` (where ``XXX`` is the

View File

@ -53,10 +53,14 @@ class CashSelectionH2Impl : AbstractCashSelection() {
""" +
(if (notary != null)
" AND vs.notary_name = ?" else "") +
(if (onlyFromIssuerParties.isNotEmpty())
" AND ccs.issuer_key_hash IN (?)" else "") +
(if (withIssuerRefs.isNotEmpty())
" AND ccs.issuer_ref IN (?)" else "")
(if (onlyFromIssuerParties.isNotEmpty()) {
val repeats = generateSequence { "?" }.take(onlyFromIssuerParties.size).joinToString(",")
" AND ccs.issuer_key_hash IN ($repeats)"
} else "") +
(if (withIssuerRefs.isNotEmpty()) {
val repeats = generateSequence { "?" }.take(withIssuerRefs.size).joinToString(",")
" AND ccs.issuer_ref IN ($repeats)"
} else "")
// Use prepared statement for protection against SQL Injection (http://www.h2database.com/html/advanced.html#sql_injection)
connection.prepareStatement(selectJoin).use { psSelectJoin ->
@ -66,10 +70,12 @@ class CashSelectionH2Impl : AbstractCashSelection() {
psSelectJoin.setString(++pIndex, lockId.toString())
if (notary != null)
psSelectJoin.setString(++pIndex, notary.name.toString())
if (onlyFromIssuerParties.isNotEmpty())
psSelectJoin.setObject(++pIndex, onlyFromIssuerParties.map { it.owningKey.toStringShort() as Any }.toTypedArray())
if (withIssuerRefs.isNotEmpty())
psSelectJoin.setObject(++pIndex, withIssuerRefs.map { it.bytes as Any }.toTypedArray())
onlyFromIssuerParties.forEach {
psSelectJoin.setString(++pIndex, it.owningKey.toStringShort())
}
withIssuerRefs.forEach {
psSelectJoin.setBytes(++pIndex, it.bytes)
}
log.debug { psSelectJoin.toString() }
psSelectJoin.executeQuery().use { rs ->

View File

@ -79,4 +79,13 @@ class CashSelectionH2ImplTest {
val paymentResult = node.startFlow(CashPaymentFlow(999.POUNDS, node.info.legalIdentities[0], false)).getOrThrow()
assertNotNull(paymentResult.recipient)
}
@Test
fun `multiple issuers in issuerConstraint condition`() {
val node = mockNet.createNode()
node.startFlow(CashIssueFlow(1.POUNDS, OpaqueBytes.of(1), mockNet.defaultNotaryIdentity)).getOrThrow()
val request = CashPaymentFlow.PaymentRequest(1.POUNDS, node.info.legalIdentities[0], true, setOf(node.info.legalIdentities[0], mockNet.defaultNotaryIdentity))
val paymentResult = node.startFlow(CashPaymentFlow(request)).getOrThrow()
assertNotNull(paymentResult.recipient)
}
}

View File

@ -61,7 +61,6 @@ var contextDatabase: CordaPersistence
val contextDatabaseOrNull: CordaPersistence? get() = _contextDatabase.get()
class CordaPersistence(
val dataSource: DataSource,
databaseConfig: DatabaseConfig,
schemas: Set<MappedSchema>,
val jdbcUrl: String,
@ -82,7 +81,11 @@ class CordaPersistence(
data class Boundary(val txId: UUID, val success: Boolean)
init {
private var _dataSource: DataSource? = null
val dataSource: DataSource get() = checkNotNull(_dataSource) { "CordaPersistence not started" }
fun start(dataSource: DataSource) {
_dataSource = dataSource
// Found a unit test that was forgetting to close the database transactions. When you close() on the top level
// database transaction it will reset the threadLocalTx back to null, so if it isn't then there is still a
// database transaction open. The [transaction] helper above handles this in a finally clause for you
@ -94,10 +97,10 @@ class CordaPersistence(
// Check not in read-only mode.
transaction {
check(!connection.metaData.isReadOnly) { "Database should not be readonly." }
}
}
object DataSourceConfigTag {
}
}
object DataSourceConfigTag {
const val DATA_SOURCE_URL = "dataSource.url"
}
@ -189,7 +192,7 @@ class CordaPersistence(
override fun close() {
// DataSource doesn't implement AutoCloseable so we just have to hope that the implementation does so that we can close it
(dataSource as? AutoCloseable)?.close()
(_dataSource as? AutoCloseable)?.close()
}
}

View File

@ -77,7 +77,9 @@ class AttachmentsClassLoaderStaticContractTests {
}
private val serviceHub = rigorousMock<ServicesForResolution>().also {
doReturn(CordappProviderImpl(cordappLoaderForPackages(listOf("net.corda.nodeapi.internal")), MockCordappConfigProvider(), MockAttachmentStorage(), testNetworkParameters().whitelistedContractImplementations)).whenever(it).cordappProvider
val cordappProviderImpl = CordappProviderImpl(cordappLoaderForPackages(listOf("net.corda.nodeapi.internal")), MockCordappConfigProvider(), MockAttachmentStorage())
cordappProviderImpl.start(testNetworkParameters().whitelistedContractImplementations)
doReturn(cordappProviderImpl).whenever(it).cordappProvider
doReturn(testNetworkParameters()).whenever(it).networkParameters
}

View File

@ -51,7 +51,9 @@ class AttachmentLoadingTests : IntegrationTest() {
@JvmField
val testSerialization = SerializationEnvironmentRule()
private val attachments = MockAttachmentStorage()
private val provider = CordappProviderImpl(JarScanningCordappLoader.fromJarUrls(listOf(isolatedJAR)), MockCordappConfigProvider(), attachments, testNetworkParameters().whitelistedContractImplementations)
private val provider = CordappProviderImpl(JarScanningCordappLoader.fromJarUrls(listOf(isolatedJAR)), MockCordappConfigProvider(), attachments).apply {
start(testNetworkParameters().whitelistedContractImplementations)
}
private val cordapp get() = provider.cordapps.first()
private val attachmentId get() = provider.getCordappAttachmentId(cordapp)!!
private val appContext get() = provider.getAppContext(cordapp)

View File

@ -25,7 +25,9 @@ import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.*
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.rigorousMock
@ -89,7 +91,8 @@ class ArtemisMessagingTest {
}
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeInternalTestDataSourceProperties(configSupplier = { ConfigFactory.empty() }), DatabaseConfig(runMigration = true), { null }, { null })
networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()), rigorousMock(), database)
val persistentNetworkMapCache = PersistentNetworkMapCache(database).apply { start(emptyList()) }
networkMapCache = NetworkMapCacheImpl(persistentNetworkMapCache, rigorousMock(), database).apply { start() }
}
@After
@ -356,8 +359,8 @@ class ArtemisMessagingTest {
}
}
private fun startNodeMessagingClient() {
messagingClient!!.start()
private fun startNodeMessagingClient(maxMessageSize: Int = MAX_MESSAGE_SIZE) {
messagingClient!!.start(identity.public, null, maxMessageSize)
}
private fun createAndStartClientAndServer(platformVersion: Int = 1, serverMaxMessageSize: Int = MAX_MESSAGE_SIZE,
@ -368,14 +371,14 @@ class ArtemisMessagingTest {
createMessagingServer(maxMessageSize = serverMaxMessageSize).start()
val messagingClient = createMessagingClient(platformVersion = platformVersion, maxMessageSize = clientMaxMessageSize)
val messagingClient = createMessagingClient(platformVersion = platformVersion)
messagingClient.addMessageHandler(TOPIC) { message, _, handle ->
if (dontAckCondition(message)) return@addMessageHandler
database.transaction { handle.insideDatabaseTransaction() }
handle.afterDatabaseTransaction() // We ACK first so that if it fails we won't get a duplicate in [receivedMessages]
receivedMessages.add(message)
}
startNodeMessagingClient()
startNodeMessagingClient(maxMessageSize = clientMaxMessageSize)
// Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.
thread(isDaemon = true) { messagingClient.run() }
@ -383,20 +386,15 @@ class ArtemisMessagingTest {
return Pair(messagingClient, receivedMessages)
}
private fun createMessagingClient(server: NetworkHostAndPort = NetworkHostAndPort("localhost", serverPort), platformVersion: Int = 1, maxMessageSize: Int = MAX_MESSAGE_SIZE): P2PMessagingClient {
private fun createMessagingClient(server: NetworkHostAndPort = NetworkHostAndPort("localhost", serverPort), platformVersion: Int = 1): P2PMessagingClient {
return database.transaction {
P2PMessagingClient(
config,
MOCK_VERSION_INFO.copy(platformVersion = platformVersion),
server,
identity.public,
null,
ServiceAffinityExecutor("ArtemisMessagingTests", 1),
database,
networkMapCache,
MetricRegistry(),
ALICE_NAME.toString(),
maxMessageSize = maxMessageSize,
isDrainingModeOn = { false },
drainingModeWasChangedEvents = PublishSubject.create()).apply {
config.configureWithDevSSLCertificate()

View File

@ -27,7 +27,6 @@ import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.Emoji
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.map
@ -40,12 +39,10 @@ import net.corda.core.node.services.*
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize
import net.corda.core.utilities.*
import net.corda.node.CordaClock
import net.corda.node.VersionInfo
import net.corda.node.cordapp.CordappLoader
import net.corda.node.internal.CheckpointVerifier.verifyCheckpointsCompatible
import net.corda.node.internal.classloading.requireAnnotation
import net.corda.node.internal.cordapp.CordappConfigFileProvider
import net.corda.node.internal.cordapp.CordappProviderImpl
@ -53,7 +50,6 @@ import net.corda.node.internal.cordapp.CordappProviderInternal
import net.corda.node.internal.rpc.proxies.AuthenticatedRpcOpsProxy
import net.corda.node.internal.rpc.proxies.ExceptionMaskingRpcOpsProxy
import net.corda.node.internal.rpc.proxies.ExceptionSerialisingRpcOpsProxy
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.ContractUpgradeHandler
import net.corda.node.services.FinalityHandler
import net.corda.node.services.NotaryChangeHandler
@ -63,6 +59,7 @@ import net.corda.node.services.config.shell.toShellConfig
import net.corda.node.services.events.NodeSchedulerService
import net.corda.node.services.events.ScheduledActivityObserver
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.KeyManagementServiceInternal
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.MessagingService
@ -82,7 +79,10 @@ import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.NodeInfoAndSigned
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.persistence.*
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.IncompatibleAttachmentsContractsTableName
import net.corda.nodeapi.internal.storeLegalIdentity
import net.corda.tools.shell.InteractiveShell
import org.apache.activemq.artemis.utils.ReusableLatch
@ -109,6 +109,8 @@ import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.TimeUnit.MINUTES
import java.util.concurrent.TimeUnit.SECONDS
import kotlin.collections.set
import kotlin.reflect.KClass
import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
@ -128,50 +130,116 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val platformClock: CordaClock,
protected val versionInfo: VersionInfo,
protected val cordappLoader: CordappLoader,
protected val serverThread: AffinityExecutor.ServiceAffinityExecutor,
protected val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() {
private class StartedNodeImpl<out N : AbstractNode>(
override val internals: N,
services: ServiceHubInternalImpl,
override val info: NodeInfo,
override val checkpointStorage: CheckpointStorage,
override val smm: StateMachineManager,
override val attachments: NodeAttachmentService,
override val network: MessagingService,
override val database: CordaPersistence,
override val rpcOps: CordaRPCOps,
flowStarter: FlowStarter,
override val notaryService: NotaryService?) : StartedNode<N> {
override val services: StartedNodeServices = object : StartedNodeServices, ServiceHubInternal by services, FlowStarter by flowStarter {}
override val smm: StateMachineManager get() = internals.smm
override val attachments: NodeAttachmentService get() = internals.attachments
override val network: MessagingService get() = internals.network
override val database: CordaPersistence get() = internals.database
override val services: StartedNodeServices = object : StartedNodeServices, ServiceHubInternal by internals.services, FlowStarter by internals.flowStarter {}
}
protected abstract val log: Logger
// We will run as much stuff in this single thread as possible to keep the risk of thread safety bugs low during the
// low-performance prototyping period.
protected abstract val serverThread: AffinityExecutor.ServiceAffinityExecutor
@Suppress("LeakingThis")
private var tokenizableServices: MutableList<Any>? = mutableListOf(platformClock, this)
protected val runOnStop = ArrayList<() -> Any?>()
init {
(serverThread as? ExecutorService)?.let {
runOnStop += {
// We wait here, even though any in-flight messages should have been drained away because the
// server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
// arbitrary and might be inappropriate.
MoreExecutors.shutdownAndAwaitTermination(it, 50, SECONDS)
}
}
}
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null).tokenize()
val identityService = PersistentIdentityService().tokenize()
val database: CordaPersistence = createCordaPersistence(
configuration.database,
identityService::wellKnownPartyFromX500Name,
identityService::wellKnownPartyFromAnonymous,
schemaService
)
init {
// TODO Break cyclic dependency
identityService.database = database
}
private val persistentNetworkMapCache = PersistentNetworkMapCache(database)
val networkMapCache = NetworkMapCacheImpl(persistentNetworkMapCache, identityService, database).tokenize()
val checkpointStorage = DBCheckpointStorage()
@Suppress("LeakingThis")
val transactionStorage = makeTransactionStorage(configuration.transactionCacheSizeBytes).tokenize()
val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL) }
private val metricRegistry = MetricRegistry()
val attachments = NodeAttachmentService(metricRegistry, database, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound).tokenize()
val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(), attachments).tokenize()
@Suppress("LeakingThis")
val keyManagementService = makeKeyManagementService(identityService).tokenize()
val servicesForResolution = ServicesForResolutionImpl(identityService, attachments, cordappProvider, transactionStorage)
@Suppress("LeakingThis")
val vaultService = makeVaultService(keyManagementService, servicesForResolution, database).tokenize()
val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
val monitoringService = MonitoringService(metricRegistry).tokenize()
val networkMapUpdater = NetworkMapUpdater(
networkMapCache,
NodeInfoWatcher(
configuration.baseDirectory,
@Suppress("LeakingThis")
rxIoScheduler,
Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)
),
networkMapClient,
configuration.baseDirectory,
configuration.extraNetworkMapKeys
).closeOnStop()
@Suppress("LeakingThis")
val transactionVerifierService = InMemoryTransactionVerifierService(transactionVerifierWorkerCount).tokenize()
val contractUpgradeService = ContractUpgradeServiceImpl().tokenize()
val auditService = DummyAuditService().tokenize()
val services = ServiceHubInternalImpl().tokenize()
@Suppress("LeakingThis")
val smm = makeStateMachineManager()
private val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory)
private val schedulerService = NodeSchedulerService(
platformClock,
database,
flowStarter,
servicesForResolution,
flowLogicRefFactory,
nodeProperties,
configuration.drainingModePollPeriod,
unfinishedSchedules = busyNodeLatch
).tokenize().closeOnStop()
// TODO Making this non-lateinit requires MockNode being able to create a blank InMemoryMessaging instance
protected lateinit var network: MessagingService
private val cordappServices = MutableClassToInstanceMap.create<SerializeAsToken>()
private val flowFactories = ConcurrentHashMap<Class<out FlowLogic<*>>, InitiatedFlowFactory<*>>()
protected val services: ServiceHubInternal get() = _services
private lateinit var _services: ServiceHubInternalImpl
protected var myNotaryIdentity: PartyAndCertificate? = null
protected lateinit var checkpointStorage: CheckpointStorage
private lateinit var tokenizableServices: List<Any>
protected lateinit var attachments: NodeAttachmentService
protected lateinit var network: MessagingService
protected val runOnStop = ArrayList<() -> Any?>()
private val _nodeReadyFuture = openFuture<Unit>()
protected var networkMapClient: NetworkMapClient? = null
private lateinit var networkMapUpdater: NetworkMapUpdater
lateinit var securityManager: RPCSecurityManager
private val shutdownExecutor = Executors.newSingleThreadExecutor()
/** Completes once the node has successfully registered with the network map service
* or has loaded network map data from local database */
val nodeReadyFuture: CordaFuture<Unit> get() = _nodeReadyFuture
protected abstract val transactionVerifierWorkerCount: Int
/**
* Should be [rx.schedulers.Schedulers.io] for production,
* or [rx.internal.schedulers.CachedThreadScheduler] (with shutdown registered with [runOnStop]) for shared-JVM testing.
*/
protected abstract val rxIoScheduler: Scheduler
/**
* Completes once the node has successfully registered with the network map service
* or has loaded network map data from local database
*/
val nodeReadyFuture: CordaFuture<Unit> get() = networkMapCache.nodeReady.map { Unit }
open val serializationWhitelists: List<SerializationWhitelist> by lazy {
cordappLoader.cordapps.flatMap { it.serializationWhitelists }
@ -182,10 +250,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
@Volatile
private var _started: StartedNode<AbstractNode>? = null
/** The implementation of the [CordaRPCOps] interface used by this node. */
open fun makeRPCOps(flowStarter: FlowStarter, smm: StateMachineManager): CordaRPCOps {
private fun <T : Any> T.tokenize(): T {
tokenizableServices?.add(this) ?: throw IllegalStateException("The tokenisable services list has already been finialised")
return this
}
val ops: CordaRPCOps = CordaRPCOpsImpl(services, smm, flowStarter, { shutdownExecutor.submit { stop() } })
protected fun <T : AutoCloseable> T.closeOnStop(): T {
runOnStop += this::close
return this
}
/** The implementation of the [CordaRPCOps] interface used by this node. */
open fun makeRPCOps(): CordaRPCOps {
val ops: CordaRPCOps = CordaRPCOpsImpl(services, smm, flowStarter) { shutdownExecutor.submit { stop() } }
val proxies = mutableListOf<(CordaRPCOps) -> CordaRPCOps>()
// Mind that order is relevant here.
proxies += ::AuthenticatedRpcOpsProxy
@ -196,34 +273,29 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
return proxies.fold(ops) { delegate, decorate -> decorate(delegate) }
}
private fun initCertificate() {
private fun initKeyStore(): X509Certificate {
if (configuration.devMode) {
log.warn("The Corda node is running in developer mode. This is not suitable for production usage.")
configuration.configureWithDevSSLCertificate()
} else {
log.info("The Corda node is running in production mode. If this is a developer environment you can set 'devMode=true' in the node.conf file.")
}
validateKeystore()
return validateKeyStore()
}
open fun generateAndSaveNodeInfo(): NodeInfo {
check(started == null) { "Node has already been started" }
log.info("Generating nodeInfo ...")
initCertificate()
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
persistentNetworkMapCache.start(notaries = emptyList())
val trustRoot = initKeyStore()
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
// Wrapped in an atomic reference just to allow setting it before the closure below gets invoked.
val identityServiceRef = AtomicReference<IdentityService>()
val database = initialiseDatabasePersistence(schemaService, { name -> identityServiceRef.get().wellKnownPartyFromX500Name(name) }, { party -> identityServiceRef.get().wellKnownPartyFromAnonymous(party) })
val identityService = makeIdentityService(identity.certificate, database)
identityServiceRef.set(identityService)
startDatabase()
val nodeCa = configuration.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
identityService.start(trustRoot, listOf(identity.certificate, nodeCa))
return database.use {
it.transaction {
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks
// like a design smell.
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
val (_, nodeInfo) = updateNodeInfo(persistentNetworkMapCache, null, identity, identityKeyPair)
nodeInfo
val (_, nodeInfoAndSigned) = updateNodeInfo(identity, identityKeyPair, publish = false)
nodeInfoAndSigned.nodeInfo
}
}
}
@ -231,36 +303,53 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
fun clearNetworkMapCache() {
Node.printBasicNodeInfo("Clearing network map cache entries")
log.info("Starting clearing of network map cache entries...")
configureDatabase(configuration.dataSourceProperties, configuration.database, { null }, { null }).use {
PersistentNetworkMapCache(it, emptyList()).clearNetworkMapCache()
persistentNetworkMapCache.start(notaries = emptyList())
startDatabase()
database.use {
persistentNetworkMapCache.clearNetworkMapCache()
}
}
open fun start(): StartedNode<AbstractNode> {
check(started == null) { "Node has already been started" }
if (configuration.devMode) {
System.setProperty("co.paralleluniverse.fibers.verifyInstrumentation", "true")
Emoji.renderIfSupported { Node.printWarning("This node is running in developer mode! ${Emoji.developer} This is not safe for production deployment.") }
}
log.info("Node starting up ...")
initCertificate()
// TODO First thing we do is create the MessagingService. This should have been done by the c'tor but it's not
// possible (yet) to due restriction from MockNode
network = makeMessagingService().tokenize()
val trustRoot = initKeyStore()
val nodeCa = configuration.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
initialiseJVMAgents()
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null)
schemaService.mappedSchemasWarnings().forEach {
val warning = it.toWarning()
log.warn(warning)
Node.printWarning(warning)
}
installCoreFlows()
registerCordappFlows()
services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
val rpcOps = makeRPCOps()
startShell()
networkMapClient?.start(trustRoot)
val (netParams, signedNetParams) = NetworkParametersReader(trustRoot, networkMapClient, configuration.baseDirectory).read()
log.info("Loaded network parameters: $netParams")
check(netParams.minimumPlatformVersion <= versionInfo.platformVersion) {
"Node's platform version is lower than network's required minimumPlatformVersion"
}
servicesForResolution.start(netParams)
persistentNetworkMapCache.start(netParams.notaries)
startDatabase()
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
// Wrapped in an atomic reference just to allow setting it before the closure below gets invoked.
val identityServiceRef = AtomicReference<IdentityService>()
// Do all of this in a database transaction so anything that might need a connection has one.
val database = initialiseDatabasePersistence(
schemaService,
{ name -> identityServiceRef.get().wellKnownPartyFromX500Name(name) },
{ party -> identityServiceRef.get().wellKnownPartyFromAnonymous(party) })
identityService.start(trustRoot, listOf(identity.certificate, nodeCa))
val mutualExclusionConfiguration = configuration.enterpriseConfiguration.mutualExclusionConfiguration
if (mutualExclusionConfiguration.on) {
@ -277,122 +366,61 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
val identityService = makeIdentityService(identity.certificate, database).also(identityServiceRef::set)
networkMapClient = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL, identityService.trustRoot) }
val networkParameteresReader = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory)
val networkParameters = networkParameteresReader.networkParameters
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
"Node's platform version is lower than network's required minimumPlatformVersion"
val (keyPairs, nodeInfoAndSigned, myNotaryIdentity) = database.transaction {
networkMapCache.start()
updateNodeInfo(identity, identityKeyPair, publish = true)
}
val (startedImpl, schedulerService) = database.transaction {
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries), identityService, database)
val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair)
val (nodeInfo, signedNodeInfo) = nodeInfoAndSigned
networkMapUpdater.start(trustRoot, signedNetParams.raw.hash, signedNodeInfo.raw.hash)
startMessagingService(rpcOps, nodeInfo, myNotaryIdentity, netParams)
// Do all of this in a database transaction so anything that might need a connection has one.
return database.transaction {
services.start(nodeInfo, netParams)
identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
val metrics = MetricRegistry()
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
log.debug("Transaction storage created")
attachments = NodeAttachmentService(metrics, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound, database)
log.debug("Attachment service created")
val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(), attachments, networkParameters.whitelistedContractImplementations)
log.debug("Cordapp provider created")
val servicesForResolution = ServicesForResolutionImpl(identityService, attachments, cordappProvider, networkParameters, transactionStorage)
val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
val nodeServices = makeServices(
keyPairs,
schemaService,
transactionStorage,
metrics,
servicesForResolution,
database,
nodeInfo,
identityService,
networkMapCache,
nodeProperties,
cordappProvider,
networkParameters)
val notaryService = makeNotaryService(nodeServices, database)
val smm = makeStateMachineManager(database)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory)
val cordaServices = installCordaServices(flowStarter)
val schedulerService = NodeSchedulerService(
platformClock,
database,
flowStarter,
servicesForResolution,
unfinishedSchedules = busyNodeLatch,
flowLogicRefFactory = flowLogicRefFactory,
drainingModePollPeriod = configuration.drainingModePollPeriod,
nodeProperties = nodeProperties)
runOnStop += { schedulerService.join() }
attachments.start()
cordappProvider.start(netParams.whitelistedContractImplementations)
nodeProperties.start()
keyManagementService.start(keyPairs)
val notaryService = makeNotaryService(myNotaryIdentity)
installCordaServices(myNotaryIdentity)
contractUpgradeService.start()
vaultService.start()
ScheduledActivityObserver.install(vaultService, schedulerService, flowLogicRefFactory)
HibernateObserver.install(vaultService.rawUpdates, database.hibernateConfig, schemaService)
tokenizableServices = nodeServices + cordaServices + schedulerService
val frozenTokenizableServices = tokenizableServices!!
tokenizableServices = null
try {
verifyCheckpointsCompatible(checkpointStorage, cordappProvider.cordapps, versionInfo.platformVersion, _services, tokenizableServices)
} catch (e: CheckpointIncompatibleException) {
if (configuration.devMode) {
Node.printWarning(e.message)
} else {
throw e
}
verifyCheckpointsCompatible(frozenTokenizableServices)
smm.start(frozenTokenizableServices)
// Shut down the SMM so no Fibers are scheduled.
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
(smm as? StateMachineManagerInternal)?.let {
val flowMonitor = FlowMonitor(smm::snapshot, configuration.flowMonitorPeriodMillis, configuration.flowMonitorSuspensionLoggingThresholdMillis)
runOnStop += flowMonitor::stop
flowMonitor.start()
}
(serverThread as? ExecutorService)?.let {
runOnStop += {
// We wait here, even though any in-flight messages should have been drained away because the
// server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is
// arbitrary and might be inappropriate.
MoreExecutors.shutdownAndAwaitTermination(it, 50, TimeUnit.SECONDS)
}
}
schedulerService.start()
makeVaultObservers(schedulerService, database.hibernateConfig, schemaService, flowLogicRefFactory)
val rpcOps = makeRPCOps(flowStarter, smm)
startMessagingService(rpcOps)
installCoreFlows()
registerCordappFlows(smm)
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
startShell()
Pair(StartedNodeImpl(this@AbstractNode, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
}
networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
networkMapClient,
networkParameteresReader.hash,
services.myInfo.serialize().hash,
configuration.baseDirectory,
configuration.extraNetworkMapKeys)
runOnStop += networkMapUpdater::close
networkMapUpdater.subscribeToNetworkMap()
// If we successfully loaded network data from database, we set this future to Unit.
_nodeReadyFuture.captureLater(services.networkMapCache.nodeReady.map { Unit })
return startedImpl.apply {
database.transaction {
smm.start(tokenizableServices)
// Shut down the SMM so no Fibers are scheduled.
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
(smm as? StateMachineManagerInternal)?.let {
val flowMonitor = FlowMonitor(smm::snapshot, configuration.flowMonitorPeriodMillis, configuration.flowMonitorSuspensionLoggingThresholdMillis)
runOnStop += { flowMonitor.stop() }
flowMonitor.start()
}
schedulerService.start()
}
_started = this
StartedNodeImpl(this@AbstractNode, nodeInfo, rpcOps, notaryService).also { _started = it }
}
}
/**
* Should be [rx.schedulers.Schedulers.io] for production,
* or [rx.internal.schedulers.CachedThreadScheduler] (with shutdown registered with [runOnStop]) for shared-JVM testing.
*/
protected abstract fun getRxIoScheduler(): Scheduler
private fun verifyCheckpointsCompatible(tokenizableServices: List<Any>) {
try {
CheckpointVerifier.verifyCheckpointsCompatible(checkpointStorage, cordappProvider.cordapps, versionInfo.platformVersion, services, tokenizableServices)
} catch (e: CheckpointIncompatibleException) {
if (configuration.devMode) {
Node.printWarning(e.message)
} else {
throw e
}
}
}
open fun startShell() {
if (configuration.shouldInitCrashShell()) {
@ -404,13 +432,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
private fun updateNodeInfo(networkMapCache: NetworkMapCacheBaseInternal,
networkMapClient: NetworkMapClient?,
identity: PartyAndCertificate,
identityKeyPair: KeyPair): Pair<Set<KeyPair>, NodeInfo> {
private fun updateNodeInfo(identity: PartyAndCertificate,
identityKeyPair: KeyPair,
publish: Boolean): Triple<MutableSet<KeyPair>, NodeInfoAndSigned, PartyAndCertificate?> {
val keyPairs = mutableSetOf(identityKeyPair)
myNotaryIdentity = configuration.notary?.let {
val myNotaryIdentity = configuration.notary?.let {
if (it.isClusterConfig) {
val (notaryIdentity, notaryIdentityKeyPair) = obtainIdentity(it)
keyPairs += notaryIdentityKeyPair
@ -428,7 +455,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
serial = 0
)
val nodeInfoFromDb = getPreviousNodeInfoIfPresent(networkMapCache, identity)
val nodeInfoFromDb = getPreviousNodeInfoIfPresent(identity)
val nodeInfo = if (potentialNodeInfo == nodeInfoFromDb?.copy(serial = 0)) {
@ -452,14 +479,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned)
// Always republish on startup, it's treated by network map server as a heartbeat.
if (networkMapClient != null) {
if (publish && networkMapClient != null) {
tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient)
}
return Pair(keyPairs, nodeInfo)
return Triple(keyPairs, nodeInfoAndSigned, myNotaryIdentity)
}
private fun getPreviousNodeInfoIfPresent(networkMapCache: NetworkMapCacheBaseInternal, identity: PartyAndCertificate): NodeInfo? {
private fun getPreviousNodeInfoIfPresent(identity: PartyAndCertificate): NodeInfo? {
val nodeInfosFromDb = networkMapCache.getNodesByLegalName(identity.name)
return when (nodeInfosFromDb.size) {
@ -496,14 +523,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// TODO: Exponential backoff? It should reach max interval of eventHorizon/2.
1.minutes
}
executor.schedule(this, republishInterval.toMinutes(), TimeUnit.MINUTES)
executor.schedule(this, republishInterval.toMinutes(), MINUTES)
}
})
}
protected abstract fun myAddresses(): List<NetworkHostAndPort>
protected open fun makeStateMachineManager(database: CordaPersistence): StateMachineManager {
protected open fun makeStateMachineManager(): StateMachineManager {
return SingleThreadedStateMachineManager(
services,
checkpointStorage,
@ -517,21 +544,18 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
private class ServiceInstantiationException(cause: Throwable?) : CordaException("Service Instantiation Error", cause)
private fun installCordaServices(flowStarter: FlowStarter): List<SerializeAsToken> {
private fun installCordaServices(myNotaryIdentity: PartyAndCertificate?) {
val loadedServices = cordappLoader.cordapps.flatMap { it.services }
return filterServicesToInstall(loadedServices).mapNotNull {
filterServicesToInstall(loadedServices).forEach {
try {
installCordaService(flowStarter, it)
installCordaService(flowStarter, it, myNotaryIdentity)
} catch (e: NoSuchMethodException) {
log.error("${it.name}, as a Corda service, must have a constructor with a single parameter of type " +
ServiceHub::class.java.name)
null
} catch (e: ServiceInstantiationException) {
log.error("Corda service ${it.name} failed to instantiate", e.cause)
null
} catch (e: Exception) {
log.error("Unable to install Corda service ${it.name}", e)
null
}
}
}
@ -557,6 +581,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
/**
* This customizes the ServiceHub for each CordaService that is initiating flows
*/
// TODO Move this into its own file
private class AppServiceHubImpl<T : SerializeAsToken>(private val serviceHub: ServiceHub, private val flowStarter: FlowStarter) : AppServiceHub, ServiceHub by serviceHub {
lateinit var serviceInstance: T
override fun <T> startTrackedFlow(flow: FlowLogic<T>): FlowProgressHandle<T> {
@ -593,14 +618,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
override fun hashCode() = Objects.hash(serviceHub, flowStarter, serviceInstance)
}
private fun <T : SerializeAsToken> installCordaService(flowStarter: FlowStarter, serviceClass: Class<T>): T {
private fun <T : SerializeAsToken> installCordaService(flowStarter: FlowStarter, serviceClass: Class<T>, myNotaryIdentity: PartyAndCertificate?) {
serviceClass.requireAnnotation<CordaService>()
val service = try {
val serviceContext = AppServiceHubImpl<T>(services, flowStarter)
if (isNotaryService(serviceClass)) {
check(myNotaryIdentity != null) { "Trying to install a notary service but no notary identity specified" }
myNotaryIdentity ?: throw IllegalStateException("Trying to install a notary service but no notary identity specified")
val constructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java, PublicKey::class.java).apply { isAccessible = true }
serviceContext.serviceInstance = constructor.newInstance(serviceContext, myNotaryIdentity!!.owningKey)
serviceContext.serviceInstance = constructor.newInstance(serviceContext, myNotaryIdentity.owningKey)
serviceContext.serviceInstance
} else {
try {
@ -609,7 +634,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
serviceContext.serviceInstance
} catch (ex: NoSuchMethodException) {
val constructor = serviceClass.getDeclaredConstructor(ServiceHub::class.java).apply { isAccessible = true }
log.warn("${serviceClass.name} is using legacy CordaService constructor with ServiceHub parameter. Upgrade to an AppServiceHub parameter to enable updated API features.")
log.warn("${serviceClass.name} is using legacy CordaService constructor with ServiceHub parameter. " +
"Upgrade to an AppServiceHub parameter to enable updated API features.")
constructor.newInstance(services)
}
}
@ -619,18 +645,17 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
cordappServices.putInstance(serviceClass, service)
if (service is NotaryService) handleCustomNotaryService(service)
service.tokenize()
log.info("Installed ${serviceClass.name} Corda service")
return service
}
private fun handleCustomNotaryService(service: NotaryService) {
runOnStop += service::stop
service.start()
installCoreFlow(NotaryFlow.Client::class, service::createServiceFlow)
service.start()
}
private fun registerCordappFlows(smm: StateMachineManager) {
private fun registerCordappFlows() {
cordappLoader.cordapps.flatMap { it.initiatedFlows }
.forEach {
try {
@ -708,7 +733,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
log.debug { "Installed core flow ${clientFlowClass.java.name}" }
}
private fun installCoreFlows() {
installCoreFlow(FinalityFlow::class, ::FinalityHandler)
installCoreFlow(NotaryChangeFlow::class, ::NotaryChangeHandler)
@ -716,59 +740,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
installCoreFlow(SwapIdentitiesFlow::class, ::SwapIdentitiesHandler)
}
/**
* Builds node internal, advertised, and plugin services.
* Returns a list of tokenizable services to be added to the serialisation context.
*/
private fun makeServices(keyPairs: Set<KeyPair>,
schemaService: SchemaService,
transactionStorage: WritableTransactionStorage,
metrics: MetricRegistry,
servicesForResolution: ServicesForResolution,
database: CordaPersistence,
nodeInfo: NodeInfo,
identityService: IdentityServiceInternal,
networkMapCache: NetworkMapCacheInternal,
nodeProperties: NodePropertiesStore,
cordappProvider: CordappProviderInternal,
networkParameters: NetworkParameters): MutableList<Any> {
checkpointStorage = DBCheckpointStorage()
val keyManagementService = makeKeyManagementService(identityService, keyPairs, database)
_services = ServiceHubInternalImpl(
identityService,
keyManagementService,
schemaService,
transactionStorage,
MonitoringService(metrics),
cordappProvider,
database,
nodeInfo,
networkMapCache,
nodeProperties,
networkParameters,
servicesForResolution)
network = makeMessagingService(database, nodeInfo, nodeProperties, networkParameters)
return mutableListOf(attachments, network, services.vaultService,
services.keyManagementService, services.identityService, platformClock,
services.auditService, services.monitoringService, services.networkMapCache, services.schemaService,
services.transactionVerifierService, services.validatedTransactions, services.contractUpgradeService,
services, cordappProvider, this)
}
protected open fun makeTransactionStorage(database: CordaPersistence, transactionCacheSizeBytes: Long): WritableTransactionStorage = DBTransactionStorage(transactionCacheSizeBytes, database)
private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration, schemaService: SchemaService, flowLogicRefFactory: FlowLogicRefFactory) {
ScheduledActivityObserver.install(services.vaultService, schedulerService, flowLogicRefFactory)
HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig, schemaService)
protected open fun makeTransactionStorage(transactionCacheSizeBytes: Long): WritableTransactionStorage {
return DBTransactionStorage(transactionCacheSizeBytes, database)
}
@VisibleForTesting
protected open fun acceptableLiveFiberCountOnStop(): Int = 0
private fun validateKeystore() {
private fun validateKeyStore(): X509Certificate {
val containCorrectKeys = try {
// This will throw IOException if key file not found or KeyStoreException if keystore password is incorrect.
val sslKeystore = configuration.loadSslKeyStore()
@ -798,36 +777,33 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
if (configuration.devMode) {
val blacklisted = isCRLDistributionPointBlacklisted(configuration.loadNodeKeyStore().getCertificateChain(X509Utilities.CORDA_CLIENT_CA))
if(blacklisted){
if (blacklisted) {
log.warn("The format of the autogenerated dev. mode certificate this system uses has been deprecated. Please contact support@r3.com for information on how to upgrade.")
}
}
return trustRoot
}
// Specific class so that MockNode can catch it.
class DatabaseConfigurationException(msg: String) : CordaException(msg)
protected open fun initialiseDatabasePersistence(schemaService: SchemaService,
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?): CordaPersistence {
protected open fun startDatabase() {
log.debug {
val driverClasses = DriverManager.getDrivers().asSequence().map { it.javaClass.name }
"Available JDBC drivers: $driverClasses"
}
val props = configuration.dataSourceProperties
if (props.isEmpty) throw DatabaseConfigurationException("There must be a database configured.")
val database = configureDatabase(props, configuration.database, wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous, schemaService)
database.hikariStart(props)
// Now log the vendor string as this will also cause a connection to be tested eagerly.
logVendorString(database, log)
runOnStop += database::close
return database
}
private fun makeNotaryService(tokenizableServices: MutableList<Any>, database: CordaPersistence): NotaryService? {
private fun makeNotaryService(myNotaryIdentity: PartyAndCertificate?): NotaryService? {
return configuration.notary?.let {
makeCoreNotaryService(it, database).also {
tokenizableServices.add(it)
makeCoreNotaryService(it, myNotaryIdentity).also {
it.tokenize()
runOnStop += it::stop
installCoreFlow(NotaryFlow.Client::class, it::createServiceFlow)
log.info("Running core notary: ${it.javaClass.name}")
@ -836,17 +812,20 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
protected open fun makeKeyManagementService(identityService: IdentityService, keyPairs: Set<KeyPair>, database: CordaPersistence): KeyManagementService {
return PersistentKeyManagementService(identityService, keyPairs, database)
protected open fun makeKeyManagementService(identityService: IdentityService): KeyManagementServiceInternal {
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
// the identity key. But the infrastructure to make that easy isn't here yet.
return PersistentKeyManagementService(identityService, database)
}
private fun makeCoreNotaryService(notaryConfig: NotaryConfig, database: CordaPersistence): NotaryService {
private fun makeCoreNotaryService(notaryConfig: NotaryConfig, myNotaryIdentity: PartyAndCertificate?): NotaryService {
val notaryKey = myNotaryIdentity?.owningKey
?: throw IllegalArgumentException("No notary identity initialized when creating a notary service")
return notaryConfig.run {
when {
raft != null -> {
val uniquenessProvider = RaftUniquenessProvider(configuration, database, services.clock, services.monitoringService.metrics, raft)
val uniquenessProvider = RaftUniquenessProvider(configuration, database, platformClock, monitoringService.metrics, raft)
(if (validating) ::RaftValidatingNotaryService else ::RaftNonValidatingNotaryService)(services, notaryKey, uniquenessProvider)
}
bftSMaRt != null -> {
@ -869,14 +848,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
private fun makeIdentityService(identityCert: X509Certificate, database: CordaPersistence): PersistentIdentityService {
val trustRoot = configuration.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA)
val nodeCa = configuration.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
return PersistentIdentityService(trustRoot, database, listOf(identityCert, nodeCa))
}
protected abstract fun makeTransactionVerifierService(): TransactionVerifierService
open fun stop() {
// TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the
// network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop()
@ -893,8 +864,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
_started = null
}
protected abstract fun makeMessagingService(database: CordaPersistence, info: NodeInfo, nodeProperties: NodePropertiesStore, networkParameters: NetworkParameters): MessagingService
protected abstract fun startMessagingService(rpcOps: RPCOps)
protected abstract fun makeMessagingService(): MessagingService
protected abstract fun startMessagingService(rpcOps: RPCOps,
nodeInfo: NodeInfo,
myNotaryIdentity: PartyAndCertificate?,
networkParameters: NetworkParameters)
private fun obtainIdentity(notaryConfig: NotaryConfig?): Pair<PartyAndCertificate, KeyPair> {
val keyStore = configuration.loadNodeKeyStore()
@ -941,8 +916,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
throw ConfigurationException("The name '$singleName' for $id doesn't match what's in the key store: $subject")
} else if (notaryConfig != null && notaryConfig.isClusterConfig && notaryConfig.serviceLegalName != null && subject != notaryConfig.serviceLegalName) {
// Note that we're not checking if `notaryConfig.serviceLegalName` is not present for backwards compatibility.
throw ConfigurationException("The name of the notary service '${notaryConfig.serviceLegalName}' for $id doesn't match what's in the key store: $subject. " +
"You might need to adjust the configuration of `notary.serviceLegalName`.")
throw ConfigurationException("The name of the notary service '${notaryConfig.serviceLegalName}' for $id doesn't " +
"match what's in the key store: $subject. You might need to adjust the configuration of `notary.serviceLegalName`.")
}
val certPath = X509Utilities.buildCertPath(certificates)
@ -950,8 +925,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
protected open fun generateKeyPair() = cryptoGenerateKeyPair()
protected open fun makeVaultService(keyManagementService: KeyManagementService, services: ServicesForResolution, hibernateConfig: HibernateConfiguration, database: CordaPersistence): VaultServiceInternal {
return NodeVaultService(platformClock, keyManagementService, services, hibernateConfig, database)
protected open fun makeVaultService(keyManagementService: KeyManagementService,
services: ServicesForResolution,
database: CordaPersistence): VaultServiceInternal {
return NodeVaultService(platformClock, keyManagementService, services, database)
}
/** Load configured JVM agents */
@ -970,34 +948,39 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
private inner class ServiceHubInternalImpl(
override val identityService: IdentityService,
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
// the identity key. But the infrastructure to make that easy isn't here yet.
override val keyManagementService: KeyManagementService,
override val schemaService: SchemaService,
override val validatedTransactions: WritableTransactionStorage,
override val monitoringService: MonitoringService,
override val cordappProvider: CordappProviderInternal,
override val database: CordaPersistence,
override val myInfo: NodeInfo,
override val networkMapCache: NetworkMapCacheInternal,
override val nodeProperties: NodePropertiesStore,
override val networkParameters: NetworkParameters,
private val servicesForResolution: ServicesForResolution
) : SingletonSerializeAsToken(), ServiceHubInternal, ServicesForResolution by servicesForResolution {
inner class ServiceHubInternalImpl : SingletonSerializeAsToken(), ServiceHubInternal, ServicesForResolution by servicesForResolution {
override val rpcFlows = ArrayList<Class<out FlowLogic<*>>>()
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage(database)
override val auditService = DummyAuditService()
override val transactionVerifierService by lazy { makeTransactionVerifierService() }
override val vaultService by lazy { makeVaultService(keyManagementService, servicesForResolution, database.hibernateConfig, database) }
override val contractUpgradeService by lazy { ContractUpgradeServiceImpl() }
override val identityService: IdentityService get() = this@AbstractNode.identityService
override val keyManagementService: KeyManagementService get() = this@AbstractNode.keyManagementService
override val schemaService: SchemaService get() = this@AbstractNode.schemaService
override val validatedTransactions: WritableTransactionStorage get() = this@AbstractNode.transactionStorage
override val cordappProvider: CordappProviderInternal get() = this@AbstractNode.cordappProvider
override val networkMapCache: NetworkMapCacheInternal get() = this@AbstractNode.networkMapCache
override val vaultService: VaultServiceInternal get() = this@AbstractNode.vaultService
override val nodeProperties: NodePropertiesStore get() = this@AbstractNode.nodeProperties
override val database: CordaPersistence get() = this@AbstractNode.database
override val monitoringService: MonitoringService get() = this@AbstractNode.monitoringService
override val transactionVerifierService: TransactionVerifierService get() = this@AbstractNode.transactionVerifierService
override val contractUpgradeService: ContractUpgradeService get() = this@AbstractNode.contractUpgradeService
override val auditService: AuditService get() = this@AbstractNode.auditService
override val attachments: AttachmentStorage get() = this@AbstractNode.attachments
override val networkService: MessagingService get() = network
override val clock: Clock get() = platformClock
override val configuration: NodeConfiguration get() = this@AbstractNode.configuration
override val networkMapUpdater: NetworkMapUpdater get() = this@AbstractNode.networkMapUpdater
private lateinit var _myInfo: NodeInfo
override val myInfo: NodeInfo get() = _myInfo
private lateinit var _networkParameters: NetworkParameters
override val networkParameters: NetworkParameters get() = _networkParameters
fun start(myInfo: NodeInfo, networkParameters: NetworkParameters) {
this._myInfo = myInfo
this._networkParameters = networkParameters
}
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {
require(type.isAnnotationPresent(CordaService::class.java)) { "${type.name} is not a Corda service" }
return cordappServices.getInstance(type)
@ -1024,6 +1007,7 @@ internal fun logVendorString(database: CordaPersistence, log: Logger) {
}
}
// TODO Move this into its own file
internal class FlowStarterImpl(private val smm: StateMachineManager, private val flowLogicRefFactory: FlowLogicRefFactory) : FlowStarter {
override fun <T> startFlow(event: ExternalEvent.ExternalStartFlowEvent<T>): CordaFuture<FlowStateMachine<T>> {
smm.deliverExternalEvent(event)
@ -1070,31 +1054,38 @@ internal class FlowStarterImpl(private val smm: StateMachineManager, private val
class ConfigurationException(message: String) : CordaException(message)
/**
* Creates the connection pool to the database.
*
*@throws [CouldNotCreateDataSourceException]
*/
// TODO This is no longer used by AbstractNode and can be moved elsewhere
fun configureDatabase(hikariProperties: Properties,
databaseConfig: DatabaseConfig,
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?,
schemaService: SchemaService = NodeSchemaService()): CordaPersistence {
val persistence = createCordaPersistence(databaseConfig, wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous, schemaService)
persistence.hikariStart(hikariProperties)
return persistence
}
fun createCordaPersistence(databaseConfig: DatabaseConfig,
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?,
schemaService: SchemaService): CordaPersistence {
// Register the AbstractPartyDescriptor so Hibernate doesn't warn when encountering AbstractParty. Unfortunately
// Hibernate warns about not being able to find a descriptor if we don't provide one, but won't use it by default
// so we end up providing both descriptor and converter. We should re-examine this in later versions to see if
// either Hibernate can be convinced to stop warning, use the descriptor by default, or something else.
JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
try {
val dataSource = DataSourceFactory.createDataSource(hikariProperties)
val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
SchemaMigration(
schemaService.schemaOptions.keys,
dataSource,
!isH2Database(jdbcUrl),
databaseConfig).nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L })
return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, attributeConverters)
databaseConfig).nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L })return CordaPersistence( databaseConfig, schemaService.schemaOptions.keys, jdbcUrl,attributeConverters)}
fun CordaPersistence.hikariStart(hikariProperties: Properties) {
try {
start(DataSourceFactory.createDataSource(hikariProperties))
} catch (ex: Exception) {
when {
ex is HikariPool.PoolInitializationException -> throw CouldNotCreateDataSourceException("Could not connect to the database. Please check your JDBC connection URL, or the connectivity to the database.", ex)

View File

@ -44,14 +44,9 @@ class NetworkParametersReader(private val trustRoot: X509Certificate,
)
}
private data class NetworkParamsAndHash(val networkParameters: NetworkParameters, val hash: SecureHash)
private val networkParamsFile = baseDirectory / NETWORK_PARAMS_FILE_NAME
private val parametersUpdateFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
private val netParamsAndHash by lazy { retrieveNetworkParameters() }
val networkParameters get() = netParamsAndHash.networkParameters
val hash get() = netParamsAndHash.hash
private fun retrieveNetworkParameters(): NetworkParamsAndHash {
fun read(): NetworkParametersAndSigned {
val advertisedParametersHash = try {
networkMapClient?.getNetworkMap()?.payload?.networkParameterHash
} catch (e: Exception) {
@ -64,7 +59,7 @@ class NetworkParametersReader(private val trustRoot: X509Certificate,
} else {
null
}
val parameters = if (advertisedParametersHash != null) {
val signedParameters = if (advertisedParametersHash != null) {
// TODO On one hand we have node starting without parameters and just accepting them by default,
// on the other we have parameters update process - it needs to be unified. Say you start the node, you don't have matching parameters,
// you get them from network map, but you have to run the approval step.
@ -78,11 +73,12 @@ class NetworkParametersReader(private val trustRoot: X509Certificate,
} else { // No compatibility zone configured. Node should proceed with parameters from file.
signedParametersFromFile ?: throw Error.ParamsNotConfigured()
}
logger.info("Loaded network parameters: $parameters")
return NetworkParamsAndHash(parameters.verifiedNetworkMapCert(trustRoot), parameters.raw.hash)
return NetworkParametersAndSigned(signedParameters, trustRoot)
}
private fun readParametersUpdate(advertisedParametersHash: SecureHash, previousParametersHash: SecureHash): SignedNetworkParameters {
val parametersUpdateFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME
if (!parametersUpdateFile.exists()) {
throw Error.OldParams(previousParametersHash, advertisedParametersHash)
}
@ -98,9 +94,17 @@ class NetworkParametersReader(private val trustRoot: X509Certificate,
// Used only when node joins for the first time.
private fun downloadParameters(parametersHash: SecureHash): SignedNetworkParameters {
logger.info("No network-parameters file found. Expecting network parameters to be available from the network map.")
val networkMapClient = networkMapClient ?: throw Error.NetworkMapNotConfigured()
networkMapClient ?: throw Error.NetworkMapNotConfigured()
val signedParams = networkMapClient.getNetworkParameters(parametersHash)
signedParams.serialize().open().copyTo(baseDirectory / NETWORK_PARAMS_FILE_NAME)
return signedParams
}
// By passing in just the SignedNetworkParameters object, this class guarantees that the networkParameters property
// could have only been derived from it.
class NetworkParametersAndSigned(val signed: SignedNetworkParameters, trustRoot: X509Certificate) {
val networkParameters: NetworkParameters = signed.verifiedNetworkMapCert(trustRoot)
operator fun component1() = networkParameters
operator fun component2() = signed
}
}

View File

@ -13,19 +13,18 @@ package net.corda.node.internal
import com.codahale.metrics.JmxReporter
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
import net.corda.core.concurrent.CordaFuture
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.Emoji
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.internal.div
import net.corda.core.internal.errors.AddressBindingException
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.RPCOps
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
@ -33,30 +32,23 @@ import net.corda.core.utilities.contextLogger
import net.corda.node.CordaClock
import net.corda.node.SimpleClock
import net.corda.node.VersionInfo
import net.corda.node.cordapp.CordappLoader
import net.corda.node.internal.artemis.ArtemisBroker
import net.corda.node.internal.artemis.BrokerAddresses
import net.corda.node.internal.cordapp.JarScanningCordappLoader
import net.corda.core.internal.errors.AddressBindingException
import net.corda.node.cordapp.CordappLoader
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser
import net.corda.node.serialization.amqp.AMQPServerSerializationScheme
import net.corda.node.serialization.kryo.KRYO_CHECKPOINT_CONTEXT
import net.corda.node.serialization.kryo.KryoServerSerializationScheme
import net.corda.node.services.Permissions
import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.SecurityConfiguration
import net.corda.node.services.config.shouldInitCrashShell
import net.corda.node.services.config.shouldStartLocalShell
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.InternalRPCMessagingClient
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.P2PMessagingClient
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.node.services.messaging.*
import net.corda.node.services.rpc.ArtemisRpcBroker
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.utilities.AddressUtils
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.DemoClock
@ -66,12 +58,7 @@ import net.corda.nodeapi.internal.addShutdownHook
import net.corda.nodeapi.internal.bridging.BridgeControlListener
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.serialization.internal.AMQP_P2P_CONTEXT
import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT
import net.corda.serialization.internal.AMQP_RPC_SERVER_CONTEXT
import net.corda.serialization.internal.AMQP_STORAGE_CONTEXT
import net.corda.serialization.internal.SerializationFactoryImpl
import net.corda.serialization.internal.*
import org.h2.jdbc.JdbcSQLException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@ -79,7 +66,6 @@ import rx.Scheduler
import rx.schedulers.Schedulers
import java.net.BindException
import java.nio.file.Path
import java.security.PublicKey
import java.time.Clock
import java.util.concurrent.atomic.AtomicInteger
import javax.management.ObjectName
@ -95,7 +81,14 @@ open class Node(configuration: NodeConfiguration,
versionInfo: VersionInfo,
private val initialiseSerialization: Boolean = true,
cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo)
) : AbstractNode(configuration, createClock(configuration), versionInfo, cordappLoader) {
) : AbstractNode(
configuration,
createClock(configuration),
versionInfo,
cordappLoader,
// Under normal (non-test execution) it will always be "1"
AffinityExecutor.ServiceAffinityExecutor("Node thread-${sameVmNodeCounter.incrementAndGet()}", 1)
) {
companion object {
private val staticLog = contextLogger()
var renderBasicInfoToConsole = true
@ -136,9 +129,12 @@ open class Node(configuration: NodeConfiguration,
}
override val log: Logger get() = staticLog
override fun makeTransactionVerifierService(): TransactionVerifierService = InMemoryTransactionVerifierService(numberOfWorkers = 4)
override val transactionVerifierWorkerCount: Int get() = 4
private val sameVmNodeNumber = sameVmNodeCounter.incrementAndGet() // Under normal (non-test execution) it will always be "1"
private var internalRpcMessagingClient: InternalRPCMessagingClient? = null
private var rpcBroker: ArtemisBroker? = null
private var shutdownHook: ShutdownHook? = null
// DISCUSSION
//
@ -177,47 +173,56 @@ open class Node(configuration: NodeConfiguration,
//
// The primary work done by the server thread is execution of flow logics, and related
// serialisation/deserialisation work.
override lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor
private var messageBroker: ArtemisMessagingServer? = null
private var bridgeControlListener: BridgeControlListener? = null
private var rpcBroker: ArtemisBroker? = null
override fun makeMessagingService(): MessagingService {
return P2PMessagingClient(
config = configuration,
versionInfo = versionInfo,
serverAddress = configuration.messagingServerAddress ?: NetworkHostAndPort("localhost", configuration.p2pAddress.port),
nodeExecutor = serverThread,
database = database,
networkMap = networkMapCache,
isDrainingModeOn = nodeProperties.flowsDrainingMode::isEnabled,
drainingModeWasChangedEvents = nodeProperties.flowsDrainingMode.values
)
}
private var shutdownHook: ShutdownHook? = null
override fun startMessagingService(rpcOps: RPCOps, nodeInfo: NodeInfo, myNotaryIdentity: PartyAndCertificate?, networkParameters: NetworkParameters) {
require(nodeInfo.legalIdentities.size in 1..2) { "Currently nodes must have a primary address and optionally one serviced address" }
val client = network as P2PMessagingClient
override fun makeMessagingService(database: CordaPersistence,
info: NodeInfo,
nodeProperties: NodePropertiesStore,
networkParameters: NetworkParameters): MessagingService {
// Construct security manager reading users data either from the 'security' config section
// if present or from rpcUsers list if the former is missing from config.
val securityManagerConfig = configuration.security?.authService
?: SecurityConfiguration.AuthService.fromUsers(configuration.rpcUsers)
securityManager = with(RPCSecurityManagerImpl(securityManagerConfig)) {
val securityManager = with(RPCSecurityManagerImpl(securityManagerConfig)) {
if (configuration.shouldStartLocalShell()) RPCSecurityManagerWithAdditionalUser(this, User(INTERNAL_SHELL_USER, INTERNAL_SHELL_USER, setOf(Permissions.all()))) else this
}
if (!configuration.messagingServerExternal) {
val messageBroker = if (!configuration.messagingServerExternal) {
val brokerBindAddress = configuration.messagingServerAddress ?: NetworkHostAndPort("0.0.0.0", configuration.p2pAddress.port)
messageBroker = ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize)
ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize)
} else {
null
}
val serverAddress = configuration.messagingServerAddress
?: NetworkHostAndPort("localhost", configuration.p2pAddress.port)
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) {
BrokerAddresses(configuration.rpcOptions.address, configuration.rpcOptions.adminAddress)
} else {
startLocalRpcBroker()
startLocalRpcBroker(securityManager)
}
val advertisedAddress = info.addresses.single()
val externalBridge = configuration.enterpriseConfiguration.externalBridge
if (externalBridge == null || !externalBridge) {
bridgeControlListener = BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize)
val bridgeControlListener = if (externalBridge == null || !externalBridge) {
BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize)
} else {
null
}
printBasicNodeInfo("Advertised P2P messaging addresses", info.addresses.joinToString())
printBasicNodeInfo("Advertised P2P messaging addresses", nodeInfo.addresses.joinToString())
val rpcServerConfiguration = RPCServerConfiguration.DEFAULT.copy(
rpcThreadPoolSize = configuration.enterpriseConfiguration.tuning.rpcThreadPoolSize
@ -227,26 +232,36 @@ open class Node(configuration: NodeConfiguration,
printBasicNodeInfo("RPC connection address", it.primary.toString())
printBasicNodeInfo("RPC admin connection address", it.admin.toString())
}
require(info.legalIdentities.size in 1..2) { "Currently nodes must have a primary address and optionally one serviced address" }
val serviceIdentity: PublicKey? = if (info.legalIdentities.size == 1) null else info.legalIdentities[1].owningKey
return P2PMessagingClient(
configuration,
versionInfo,
serverAddress,
info.legalIdentities[0].owningKey,
serviceIdentity,
serverThread,
database,
services.networkMapCache,
services.monitoringService.metrics,
info.legalIdentities[0].name.toString(),
advertisedAddress,
networkParameters.maxMessageSize,
nodeProperties.flowsDrainingMode::isEnabled,
nodeProperties.flowsDrainingMode.values)
// Start up the embedded MQ server
messageBroker?.apply {
closeOnStop()
start()
}
rpcBroker?.apply {
closeOnStop()
start()
}
// Start P2P bridge service
bridgeControlListener.apply {
closeOnStop()
start()
}
// Start up the MQ clients.
internalRpcMessagingClient?.run {
closeOnStop()
init(rpcOps, securityManager)
}
client.closeOnStop()
client.start(
myIdentity = nodeInfo.legalIdentities[0].owningKey,
serviceIdentity = if (nodeInfo.legalIdentities.size == 1) null else nodeInfo.legalIdentities[1].owningKey,
advertisedAddress = nodeInfo.addresses[0],
maxMessageSize = networkParameters.maxMessageSize
)
}
private fun startLocalRpcBroker(): BrokerAddresses? {
private fun startLocalRpcBroker(securityManager: RPCSecurityManager): BrokerAddresses? {
return with(configuration) {
rpcOptions.address.let {
val rpcBrokerDirectory: Path = baseDirectory / "brokers" / "rpc"
@ -257,6 +272,7 @@ open class Node(configuration: NodeConfiguration,
ArtemisRpcBroker.withoutSsl(configuration, this.address, adminAddress, securityManager, MAX_RPC_MESSAGE_SIZE, jmxMonitoringHttpPort != null, rpcBrokerDirectory, shouldStartLocalShell())
}
}
rpcBroker!!.closeOnStop()
rpcBroker!!.addresses
}
}
@ -308,32 +324,6 @@ open class Node(configuration: NodeConfiguration,
}
}
override fun startMessagingService(rpcOps: RPCOps) {
// Start up the embedded MQ server
messageBroker?.apply {
runOnStop += this::close
start()
}
rpcBroker?.apply {
runOnStop += this::close
start()
}
// Start P2P bridge service
bridgeControlListener?.apply {
runOnStop += this::stop
start()
}
// Start up the MQ clients.
internalRpcMessagingClient?.run {
runOnStop += this::close
init(rpcOps, securityManager)
}
(network as P2PMessagingClient).apply {
runOnStop += this::stop
start()
}
}
/**
* If the node is persisting to an embedded H2 database, then expose this via TCP with a DB URL of the form:
* jdbc:h2:tcp://<host>:<port>/node
@ -344,9 +334,7 @@ open class Node(configuration: NodeConfiguration,
* This is not using the H2 "automatic mixed mode" directly but leans on many of the underpinnings. For more details
* on H2 URLs and configuration see: http://www.h2database.com/html/features.html#database_url
*/
override fun initialiseDatabasePersistence(schemaService: SchemaService,
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?): CordaPersistence {
override fun startDatabase() {
val databaseUrl = configuration.dataSourceProperties.getProperty("dataSource.url")
val h2Prefix = "jdbc:h2:file:"
@ -374,11 +362,12 @@ open class Node(configuration: NodeConfiguration,
}
printBasicNodeInfo("Database connection url is", "jdbc:h2:$url/node")
}
}
else if (databaseUrl != null) {
} else if (databaseUrl != null) {
printBasicNodeInfo("Database connection url is", databaseUrl)
}
return super.initialiseDatabasePersistence(schemaService, wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous)
super.startDatabase()
database.closeOnStop()
}
private val _startupComplete = openFuture<Unit>()
@ -390,7 +379,6 @@ open class Node(configuration: NodeConfiguration,
}
override fun start(): StartedNode<Node> {
serverThread = AffinityExecutor.ServiceAffinityExecutor("Node thread-$sameVmNodeNumber", 1)
initialiseSerialization()
val started: StartedNode<Node> = uncheckedCast(super.start())
nodeReadyFuture.thenMatch({
@ -419,7 +407,7 @@ open class Node(configuration: NodeConfiguration,
return started
}
override fun getRxIoScheduler(): Scheduler = Schedulers.io()
override val rxIoScheduler: Scheduler get() = Schedulers.io()
private fun initialiseSerialization() {
if (!initialiseSerialization) return
@ -437,8 +425,6 @@ open class Node(configuration: NodeConfiguration,
rpcClientContext = if (configuration.shouldInitCrashShell()) AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classloader) else null) //even Shell embeded in the node connects via RPC to the node
}
private var internalRpcMessagingClient: InternalRPCMessagingClient? = null
/** Starts a blocking event loop for message dispatch. */
fun run() {
internalRpcMessagingClient?.start(rpcBroker!!.serverControl)

View File

@ -20,14 +20,9 @@ import joptsimple.OptionParser
import joptsimple.util.PathConverter
import net.corda.core.cordapp.Cordapp
import net.corda.core.crypto.Crypto
import net.corda.core.internal.Emoji
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.errors.AddressBindingException
import net.corda.core.internal.exists
import net.corda.core.internal.location
import net.corda.core.internal.randomOrNull
import net.corda.core.utilities.Try
import net.corda.core.utilities.loggerFor
import net.corda.node.*
@ -336,6 +331,12 @@ open class NodeStartup(val args: Array<String>) {
return
}
if (conf.devMode) {
Emoji.renderIfSupported {
Node.printWarning("This node is running in developer mode! ${Emoji.developer} This is not safe for production deployment.")
}
}
val startedNode = node.start()
logLoadedCorDapps(startedNode.services.cordappProvider.cordapps)
startedNode.internals.nodeReadyFuture.thenMatch({

View File

@ -22,9 +22,15 @@ data class ServicesForResolutionImpl(
override val identityService: IdentityService,
override val attachments: AttachmentStorage,
override val cordappProvider: CordappProvider,
override val networkParameters: NetworkParameters,
private val validatedTransactions: TransactionStorage
) : ServicesForResolution {
private lateinit var _networkParameters: NetworkParameters
override val networkParameters: NetworkParameters get() = _networkParameters
fun start(networkParameters: NetworkParameters) {
_networkParameters = networkParameters
}
@Throws(TransactionResolutionException::class)
override fun loadState(stateRef: StateRef): TransactionState<*> {
val stx = validatedTransactions.getTransaction(stateRef.txhash) ?: throw TransactionResolutionException(stateRef.txhash)

View File

@ -16,7 +16,6 @@ import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.notary.NotaryService
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.persistence.NodeAttachmentService
@ -28,14 +27,15 @@ interface StartedNode<out N : AbstractNode> {
val internals: N
val services: StartedNodeServices
val info: NodeInfo
val checkpointStorage: CheckpointStorage
val smm: StateMachineManager
val attachments: NodeAttachmentService
val network: MessagingService
val database: CordaPersistence
val rpcOps: CordaRPCOps
val notaryService: NotaryService?
fun dispose() = internals.stop()
/**
* Use this method to register your initiated flows in your tests. This is automatically done by the node when it
* starts up for all [FlowLogic] classes it finds which are annotated with [InitiatedBy].

View File

@ -22,7 +22,7 @@ import net.corda.core.internal.createCordappContext
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.contextLogger
import net.corda.node.cordapp.CordappLoader
import java.net.URL
import java.util.concurrent.ConcurrentHashMap
@ -32,26 +32,25 @@ import java.util.concurrent.ConcurrentHashMap
*/
open class CordappProviderImpl(private val cordappLoader: CordappLoader,
private val cordappConfigProvider: CordappConfigProvider,
attachmentStorage: AttachmentStorage,
private val whitelistedContractImplementations: Map<String, List<AttachmentId>>) : SingletonSerializeAsToken(), CordappProviderInternal {
private val attachmentStorage: AttachmentStorage) : SingletonSerializeAsToken(), CordappProviderInternal {
companion object {
private val log = loggerFor<CordappProviderImpl>()
private val log = contextLogger()
}
private val contextCache = ConcurrentHashMap<Cordapp, CordappContext>()
private val cordappAttachments = HashBiMap.create<SecureHash, URL>()
/**
* Current known CorDapps loaded on this node
*/
override val cordapps get() = cordappLoader.cordapps
private val cordappAttachments = HashBiMap.create(loadContractsIntoAttachmentStore(attachmentStorage))
init {
verifyInstalledCordapps(attachmentStorage)
fun start(whitelistedContractImplementations: Map<String, List<AttachmentId>>) {
cordappAttachments.putAll(loadContractsIntoAttachmentStore())
verifyInstalledCordapps(whitelistedContractImplementations)
}
private fun verifyInstalledCordapps(attachmentStorage: AttachmentStorage) {
private fun verifyInstalledCordapps(whitelistedContractImplementations: Map<String, List<AttachmentId>>) {
// This will invoke the lazy flowCordappMap property, thus triggering the MultipleCordappsForFlow check.
cordappLoader.flowCordappMap
@ -96,7 +95,7 @@ open class CordappProviderImpl(private val cordappLoader: CordappLoader,
*/
fun getCordappAttachmentId(cordapp: Cordapp): SecureHash? = cordappAttachments.inverse()[cordapp.jarPath]
private fun loadContractsIntoAttachmentStore(attachmentStorage: AttachmentStorage): Map<SecureHash, URL> =
private fun loadContractsIntoAttachmentStore(): Map<SecureHash, URL> =
cordapps.filter { !it.contractClassNames.isEmpty() }.map {
it.jarPath.openStream().use { stream ->
try {
@ -114,14 +113,14 @@ open class CordappProviderImpl(private val cordappLoader: CordappLoader,
* @return A cordapp context for the given CorDapp
*/
fun getAppContext(cordapp: Cordapp): CordappContext {
return contextCache.computeIfAbsent(cordapp, {
return contextCache.computeIfAbsent(cordapp) {
createCordappContext(
cordapp,
getCordappAttachmentId(cordapp),
cordappLoader.appClassLoader,
TypesafeCordappConfig(cordappConfigProvider.getConfigByName(cordapp.name))
)
})
}
}
/**

View File

@ -17,6 +17,8 @@ import net.corda.core.transactions.NotaryChangeWireTransaction
import net.corda.core.transactions.WireTransaction
interface VaultServiceInternal : VaultService {
fun start()
/**
* Splits the provided [txns] into batches of [WireTransaction] and [NotaryChangeWireTransaction].
* This is required because the batches get aggregated into single updates, and we want to be able to

View File

@ -70,13 +70,13 @@ class NodeSchedulerService(private val clock: CordaClock,
private val database: CordaPersistence,
private val flowStarter: FlowStarter,
private val servicesForResolution: ServicesForResolution,
private val unfinishedSchedules: ReusableLatch = ReusableLatch(),
private val flowLogicRefFactory: FlowLogicRefFactory,
private val nodeProperties: NodePropertiesStore,
private val drainingModePollPeriod: Duration,
private val log: Logger = staticLog,
private val unfinishedSchedules: ReusableLatch = ReusableLatch(),
private val schedulerRepo: ScheduledFlowRepository = PersistentScheduledFlowRepository(database))
: SchedulerService, SingletonSerializeAsToken() {
: SchedulerService, AutoCloseable, SingletonSerializeAsToken() {
companion object {
private val staticLog get() = contextLogger()
@ -234,8 +234,7 @@ class NodeSchedulerService(private val clock: CordaClock,
}
}
@VisibleForTesting
internal fun join() {
override fun close() {
mutex.locked {
running = false
rescheduleWakeUp()

View File

@ -37,15 +37,9 @@ import javax.persistence.Lob
/**
* An identity service that stores parties and their identities to a key value tables in the database. The entries are
* cached for efficient lookup.
*
* @param trustRoot certificate from the zone operator for identity on the network.
* @param caCertificates list of additional certificates.
*/
@ThreadSafe
class PersistentIdentityService(override val trustRoot: X509Certificate,
private val database: CordaPersistence,
caCertificates: List<X509Certificate> = emptyList()) : SingletonSerializeAsToken(), IdentityServiceInternal {
class PersistentIdentityService : SingletonSerializeAsToken(), IdentityServiceInternal {
companion object {
private val log = contextLogger()
@ -103,30 +97,37 @@ class PersistentIdentityService(override val trustRoot: X509Certificate,
var publicKeyHash: String? = ""
)
override val caCertStore: CertStore
override val trustAnchor: TrustAnchor = TrustAnchor(trustRoot, null)
private lateinit var _caCertStore: CertStore
override val caCertStore: CertStore get() = _caCertStore
private lateinit var _trustRoot: X509Certificate
override val trustRoot: X509Certificate get() = _trustRoot
private lateinit var _trustAnchor: TrustAnchor
override val trustAnchor: TrustAnchor get() = _trustAnchor
// CordaPersistence is not a c'tor parameter to work around the cyclic dependency
lateinit var database: CordaPersistence
private val keyToParties = createPKMap()
private val principalToParties = createX500Map()
init {
val caCertificatesWithRoot: Set<X509Certificate> = caCertificates.toSet() + trustRoot
caCertStore = CertStore.getInstance("Collection", CollectionCertStoreParameters(caCertificatesWithRoot))
fun start(trustRoot: X509Certificate, caCertificates: List<X509Certificate> = emptyList()) {
_trustRoot = trustRoot
_trustAnchor = TrustAnchor(trustRoot, null)
_caCertStore = CertStore.getInstance("Collection", CollectionCertStoreParameters(caCertificates.toSet() + trustRoot))
}
/** Requires a database transaction. */
fun loadIdentities(identities: Iterable<PartyAndCertificate> = emptySet(), confidentialIdentities: Iterable<PartyAndCertificate> = emptySet()) {
database.transaction {
identities.forEach {
val key = mapToKey(it)
keyToParties.addWithDuplicatesAllowed(key, it, false)
principalToParties.addWithDuplicatesAllowed(it.name, key, false)
}
confidentialIdentities.forEach {
principalToParties.addWithDuplicatesAllowed(it.name, mapToKey(it), false)
}
log.debug("Identities loaded")
fun loadIdentities(identities: Collection<PartyAndCertificate> = emptySet(), confidentialIdentities: Collection<PartyAndCertificate> = emptySet()) {
identities.forEach {
val key = mapToKey(it)
keyToParties.addWithDuplicatesAllowed(key, it, false)
principalToParties.addWithDuplicatesAllowed(it.name, key, false)
}
confidentialIdentities.forEach {
principalToParties.addWithDuplicatesAllowed(it.name, mapToKey(it), false)
}
log.debug("Identities loaded")
}
@Throws(CertificateExpiredException::class, CertificateNotYetValidException::class, InvalidAlgorithmParameterException::class)
@ -177,5 +178,4 @@ class PersistentIdentityService(override val trustRoot: X509Certificate,
@Throws(UnknownAnonymousPartyException::class)
override fun assertOwnership(party: Party, anonymousParty: AnonymousParty) = database.transaction { super.assertOwnership(party, anonymousParty) }
}

View File

@ -14,7 +14,6 @@ import net.corda.core.crypto.*
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.ThreadBox
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.SingletonSerializeAsToken
import org.bouncycastle.operator.ContentSigner
import java.security.KeyPair
@ -35,25 +34,23 @@ import javax.annotation.concurrent.ThreadSafe
* etc.
*/
@ThreadSafe
class E2ETestKeyManagementService(val identityService: IdentityService,
initialKeys: Set<KeyPair>) : SingletonSerializeAsToken(), KeyManagementService {
class E2ETestKeyManagementService(val identityService: IdentityService) : SingletonSerializeAsToken(), KeyManagementServiceInternal {
private class InnerState {
val keys = HashMap<PublicKey, PrivateKey>()
}
private val mutex = ThreadBox(InnerState())
// Accessing this map clones it.
override val keys: Set<PublicKey> get() = mutex.locked { keys.keys }
init {
override fun start(initialKeyPairs: Set<KeyPair>) {
mutex.locked {
for (key in initialKeys) {
for (key in initialKeyPairs) {
keys[key.public] = key.private
}
}
}
// Accessing this map clones it.
override val keys: Set<PublicKey> get() = mutex.locked { keys.keys }
override fun freshKey(): PublicKey {
val keyPair = generateKeyPair()
mutex.locked {

View File

@ -0,0 +1,8 @@
package net.corda.node.services.keys
import net.corda.core.node.services.KeyManagementService
import java.security.KeyPair
interface KeyManagementServiceInternal : KeyManagementService {
fun start(initialKeyPairs: Set<KeyPair>)
}

View File

@ -13,7 +13,6 @@ package net.corda.node.services.keys
import net.corda.core.crypto.*
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.node.utilities.AppendOnlyPersistentMap
@ -37,8 +36,7 @@ import javax.persistence.Lob
* This class needs database transactions to be in-flight during method calls and init.
*/
class PersistentKeyManagementService(val identityService: IdentityService,
initialKeys: Set<KeyPair>,
private val database: CordaPersistence) : SingletonSerializeAsToken(), KeyManagementService {
private val database: CordaPersistence) : SingletonSerializeAsToken(), KeyManagementServiceInternal {
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}our_key_pairs")
@ -74,13 +72,10 @@ class PersistentKeyManagementService(val identityService: IdentityService,
}
}
val keysMap = createKeyMap()
private val keysMap = createKeyMap()
init {
// TODO this should be in a start function, not in an init block.
database.transaction {
initialKeys.forEach({ it -> keysMap.addWithDuplicatesAllowed(it.public, it.private) })
}
override fun start(initialKeyPairs: Set<KeyPair>) {
initialKeyPairs.forEach { keysMap.addWithDuplicatesAllowed(it.public, it.private) }
}
override val keys: Set<PublicKey> get() = database.transaction { keysMap.allPersisted().map { it.first }.toSet() }

View File

@ -36,7 +36,7 @@ import javax.annotation.concurrent.ThreadSafe
* is *reliable* and as such messages may be stored to disk once queued.
*/
@ThreadSafe
interface MessagingService {
interface MessagingService : AutoCloseable {
/**
* A unique identifier for this sender that changes whenever a node restarts. This is used in conjunction with a sequence
* number for message de-duplication at the recipient.

View File

@ -82,38 +82,32 @@ import javax.annotation.concurrent.ThreadSafe
* @param config The configuration of the node, which is used for controlling the message redelivery options.
* @param versionInfo All messages from the node carry the version info and received messages are checked against this for compatibility.
* @param serverAddress The host and port of the Artemis broker.
* @param myIdentity The primary identity of the node, which defines the messaging address for externally received messages.
* It is also used to construct the myAddress field, which is ultimately advertised in the network map.
* @param serviceIdentity An optional second identity if the node is also part of a group address, for example a notary.
* @param nodeExecutor The received messages are marshalled onto the server executor to prevent Netty buffers leaking during fiber suspends.
* @param database The nodes database, which is used to deduplicate messages.
* @param advertisedAddress The externally advertised version of the Artemis broker address used to construct myAddress and included
* in the network map data.
* @param maxMessageSize A bound applied to the message size.
*/
@ThreadSafe
class P2PMessagingClient(val config: NodeConfiguration,
private val versionInfo: VersionInfo,
private val serverAddress: NetworkHostAndPort,
private val myIdentity: PublicKey,
private val serviceIdentity: PublicKey?,
val serverAddress: NetworkHostAndPort,
private val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
private val database: CordaPersistence,
private val networkMap: NetworkMapCacheInternal,
private val metricRegistry: MetricRegistry,
val legalName: String,
advertisedAddress: NetworkHostAndPort = serverAddress,
private val maxMessageSize: Int,
private val isDrainingModeOn: () -> Boolean,
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, AutoCloseable {
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver {
companion object {
private val log = contextLogger()
}
class NodeClientMessage(override val topic: String, override val data: ByteSequence, override val uniqueMessageId: DeduplicationId, override val senderUUID: String?, override val additionalHeaders: Map<String, String>) : Message {
override val debugTimestamp: Instant = Instant.now()
override fun toString() = "$topic#${String(data.bytes)}"
}
private class NodeClientMessage(override val topic: String,
override val data: ByteSequence,
override val uniqueMessageId: DeduplicationId,
override val senderUUID: String?,
override val additionalHeaders: Map<String, String>) : Message {
override val debugTimestamp: Instant = Instant.now()
override fun toString() = "$topic#${String(data.bytes)}"
}
private class InnerState {
@ -134,7 +128,12 @@ class P2PMessagingClient(val config: NodeConfiguration,
/** A registration to handle messages of different types */
data class HandlerRegistration(val topic: String, val callback: Any) : MessageHandlerRegistration
override val myAddress: SingleMessageRecipient = NodeAddress(myIdentity, advertisedAddress)
private lateinit var myIdentity: PublicKey
private var serviceIdentity: PublicKey? = null
private lateinit var advertisedAddress: NetworkHostAndPort
private var maxMessageSize: Int = -1
override val myAddress: SingleMessageRecipient get() = NodeAddress(myIdentity, advertisedAddress)
override val ourSenderUUID = UUID.randomUUID().toString()
private val state = ThreadBox(InnerState())
@ -147,7 +146,19 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val deduplicator = P2PMessageDeduplicator(database)
var messagingExecutor: MessagingExecutor? = null
fun start() {
/**
* @param myIdentity The primary identity of the node, which defines the messaging address for externally received messages.
* It is also used to construct the myAddress field, which is ultimately advertised in the network map.
* @param serviceIdentity An optional second identity if the node is also part of a group address, for example a notary.
* @param advertisedAddress The externally advertised version of the Artemis broker address used to construct myAddress and included
* in the network map data.
* @param maxMessageSize A bound applied to the message size.
*/
fun start(myIdentity: PublicKey, serviceIdentity: PublicKey?, maxMessageSize: Int, advertisedAddress: NetworkHostAndPort = serverAddress) {
this.myIdentity = myIdentity
this.serviceIdentity = serviceIdentity
this.advertisedAddress = advertisedAddress
this.maxMessageSize = maxMessageSize
state.locked {
started = true
log.info("Connecting to message broker: $serverAddress")

View File

@ -33,12 +33,17 @@ import java.security.cert.X509Certificate
import java.time.Duration
import java.util.*
class NetworkMapClient(compatibilityZoneURL: URL, val trustedRoot: X509Certificate) {
class NetworkMapClient(compatibilityZoneURL: URL) {
companion object {
private val logger = contextLogger()
}
private val networkMapUrl = URL("$compatibilityZoneURL/network-map")
private lateinit var trustRoot: X509Certificate
fun start(trustRoot: X509Certificate) {
this.trustRoot = trustRoot
}
fun publish(signedNodeInfo: SignedNodeInfo) {
val publishURL = URL("$networkMapUrl/publish")
@ -59,7 +64,7 @@ class NetworkMapClient(compatibilityZoneURL: URL, val trustedRoot: X509Certifica
logger.trace { "Fetching network map update from $url." }
val connection = url.openHttpConnection()
val signedNetworkMap = connection.responseAs<SignedNetworkMap>()
val networkMap = signedNetworkMap.verifiedNetworkMapCert(trustedRoot)
val networkMap = signedNetworkMap.verifiedNetworkMapCert(trustRoot)
val timeout = connection.cacheControl.maxAgeSeconds().seconds
logger.trace { "Fetched network map update from $url successfully: $networkMap" }
return NetworkMapResponse(networkMap, timeout)

View File

@ -26,16 +26,12 @@ import net.corda.core.utilities.minutes
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.NamedThreadFactory
import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.NetworkMap
import net.corda.nodeapi.internal.network.ParametersUpdate
import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.nodeapi.internal.network.verifiedNetworkMapCert
import net.corda.nodeapi.internal.network.*
import rx.Subscription
import rx.subjects.PublishSubject
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.security.cert.X509Certificate
import java.time.Duration
import java.util.*
import java.util.concurrent.Executors
@ -46,8 +42,6 @@ import kotlin.system.exitProcess
class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val fileWatcher: NodeInfoWatcher,
private val networkMapClient: NetworkMapClient?,
private val currentParametersHash: SecureHash,
private val ourNodeInfoHash: SecureHash?,
private val baseDirectory: Path,
private val extraNetworkMapKeys: List<UUID>
) : AutoCloseable {
@ -60,21 +54,20 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
private val executor = ScheduledThreadPoolExecutor(1, NamedThreadFactory("Network Map Updater Thread", Executors.defaultThreadFactory()))
private var newNetworkParameters: Pair<ParametersUpdate, SignedNetworkParameters>? = null
private var fileWatcherSubscription: Subscription? = null
private lateinit var trustRoot: X509Certificate
private lateinit var currentParametersHash: SecureHash
private lateinit var ourNodeInfoHash: SecureHash
override fun close() {
fileWatcherSubscription?.unsubscribe()
MoreExecutors.shutdownAndAwaitTermination(executor, 50, TimeUnit.SECONDS)
}
fun trackParametersUpdate(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> {
val currentUpdateInfo = newNetworkParameters?.let {
ParametersUpdateInfo(it.first.newParametersHash, it.second.verified(), it.first.description, it.first.updateDeadline)
}
return DataFeed(currentUpdateInfo, parametersUpdatesTrack)
}
fun subscribeToNetworkMap() {
fun start(trustRoot: X509Certificate, currentParametersHash: SecureHash, ourNodeInfoHash: SecureHash) {
require(fileWatcherSubscription == null) { "Should not call this method twice." }
this.trustRoot = trustRoot
this.currentParametersHash = currentParametersHash
this.ourNodeInfoHash = ourNodeInfoHash
// Subscribe to file based networkMap
fileWatcherSubscription = fileWatcher.nodeInfoUpdates().subscribe {
when (it) {
@ -108,6 +101,13 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal,
}) // The check may be expensive, so always run it in the background even the first time.
}
fun trackParametersUpdate(): DataFeed<ParametersUpdateInfo?, ParametersUpdateInfo> {
val currentUpdateInfo = newNetworkParameters?.let {
ParametersUpdateInfo(it.first.newParametersHash, it.second.verified(), it.first.description, it.first.updateDeadline)
}
return DataFeed(currentUpdateInfo, parametersUpdatesTrack)
}
fun updateNetworkMapCache(): Duration {
if (networkMapClient == null) throw CordaRuntimeException("Network map cache can be updated only if network map/compatibility zone URL is specified")
val (globalNetworkMap, cacheTimeout) = networkMapClient.getNetworkMap()
@ -178,7 +178,7 @@ The node will shutdown now.""")
return
}
val newSignedNetParams = networkMapClient.getNetworkParameters(update.newParametersHash)
val newNetParams = newSignedNetParams.verifiedNetworkMapCert(networkMapClient.trustedRoot)
val newNetParams = newSignedNetParams.verifiedNetworkMapCert(trustRoot)
logger.info("Downloaded new network parameters: $newNetParams from the update: $update")
newNetworkParameters = Pair(update, newSignedNetParams)
val updateInfo = ParametersUpdateInfo(
@ -195,7 +195,7 @@ The node will shutdown now.""")
// Add persisting of newest parameters from update.
val (update, signedNewNetParams) = requireNotNull(newNetworkParameters) { "Couldn't find parameters update for the hash: $parametersHash" }
// We should check that we sign the right data structure hash.
val newNetParams = signedNewNetParams.verifiedNetworkMapCert(networkMapClient.trustedRoot)
val newNetParams = signedNewNetParams.verifiedNetworkMapCert(trustRoot)
val newParametersHash = signedNewNetParams.raw.hash
if (parametersHash == newParametersHash) {
// The latest parameters have priority.

View File

@ -30,7 +30,6 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
@ -44,18 +43,17 @@ import rx.subjects.PublishSubject
import java.security.PublicKey
import java.util.*
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.HashSet
class NetworkMapCacheImpl(
networkMapCacheBase: NetworkMapCacheBaseInternal,
private val networkMapCacheBase: NetworkMapCacheBaseInternal,
private val identityService: IdentityService,
private val database: CordaPersistence
) : NetworkMapCacheBaseInternal by networkMapCacheBase, NetworkMapCacheInternal, SingletonSerializeAsToken() {
companion object {
private val logger = loggerFor<NetworkMapCacheImpl>()
private val logger = contextLogger()
}
init {
fun start() {
networkMapCacheBase.allNodes.forEach { it.legalIdentitiesAndCerts.forEach { identityService.verifyAndRegisterIdentity(it) } }
networkMapCacheBase.changed.subscribe { mapChange ->
// TODO how should we handle network map removal
@ -86,10 +84,7 @@ class NetworkMapCacheImpl(
* Extremely simple in-memory cache of the network map.
*/
@ThreadSafe
open class PersistentNetworkMapCache(
private val database: CordaPersistence,
notaries: List<NotaryInfo>
) : SingletonSerializeAsToken(), NetworkMapCacheBaseInternal {
open class PersistentNetworkMapCache(private val database: CordaPersistence) : SingletonSerializeAsToken(), NetworkMapCacheBaseInternal {
companion object {
private val logger = contextLogger()
}
@ -103,9 +98,9 @@ open class PersistentNetworkMapCache(
// with the NetworkMapService redesign their meaning is not too well defined.
private val _nodeReady = openFuture<Void?>()
override val nodeReady: CordaFuture<Void?> = _nodeReady
private lateinit var notaries: List<NotaryInfo>
override val notaryIdentities: List<Party> = notaries.map { it.identity }
private val validatingNotaries = notaries.mapNotNullTo(HashSet()) { if (it.validating) it.identity else null }
override val notaryIdentities: List<Party> get() = notaries.map { it.identity }
override val allNodeHashes: List<SecureHash>
get() {
@ -120,6 +115,10 @@ open class PersistentNetworkMapCache(
}
}
fun start(notaries: List<NotaryInfo>) {
this.notaries = notaries
}
override fun getNodeByHash(nodeHash: SecureHash): NodeInfo? {
return database.transaction {
val builder = session.criteriaBuilder
@ -132,7 +131,7 @@ open class PersistentNetworkMapCache(
}
}
override fun isValidatingNotary(party: Party): Boolean = party in validatingNotaries
override fun isValidatingNotary(party: Party): Boolean = notaries.any { it.validating && it.identity == party }
override fun getPartyInfo(party: Party): PartyInfo? {
val nodes = getNodesByLegalIdentityKey(party.owningKey)

View File

@ -57,12 +57,10 @@ import javax.persistence.*
@ThreadSafe
class NodeAttachmentService(
metrics: MetricRegistry,
private val database: CordaPersistence,
attachmentContentCacheSize: Long = NodeConfiguration.defaultAttachmentContentCacheSize,
attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound,
private val database: CordaPersistence
) : AttachmentStorage, SingletonSerializeAsToken(
) {
attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound
) : AttachmentStorage, SingletonSerializeAsToken() {
companion object {
private val log = contextLogger()
@ -119,14 +117,12 @@ class NodeAttachmentService(
private val attachmentCount = metrics.counter("Attachments")
fun start() {
database.transaction {
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(Long::class.java)
criteriaQuery.select(criteriaBuilder.count(criteriaQuery.from(NodeAttachmentService.DBAttachment::class.java)))
val count = session.createQuery(criteriaQuery).singleResult
attachmentCount.inc(count)
}
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(Long::class.java)
criteriaQuery.select(criteriaBuilder.count(criteriaQuery.from(NodeAttachmentService.DBAttachment::class.java)))
val count = session.createQuery(criteriaQuery).singleResult
attachmentCount.inc(count)
}
@CordaSerializable

View File

@ -10,8 +10,8 @@
package net.corda.node.services.persistence
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.NodePropertiesStore.FlowsDrainingModeOperations
import net.corda.node.utilities.PersistentMap
@ -27,13 +27,16 @@ import javax.persistence.Table
/**
* Simple node properties key value store in DB.
*/
class NodePropertiesPersistentStore(readPhysicalNodeId: () -> String, persistence: CordaPersistence) : NodePropertiesStore {
class NodePropertiesPersistentStore(readPhysicalNodeId: () -> String, database: CordaPersistence) : NodePropertiesStore {
private companion object {
val logger = loggerFor<NodePropertiesStore>()
val logger = contextLogger()
}
override val flowsDrainingMode: FlowsDrainingModeOperations = FlowsDrainingModeOperationsImpl(readPhysicalNodeId, persistence, logger)
override val flowsDrainingMode = FlowsDrainingModeOperationsImpl(readPhysicalNodeId, database, logger)
fun start() {
flowsDrainingMode.map.preload()
}
@Entity
@Table(name = "${NODE_DATABASE_PREFIX}properties")
@ -47,20 +50,23 @@ class NodePropertiesPersistentStore(readPhysicalNodeId: () -> String, persistenc
)
}
private class FlowsDrainingModeOperationsImpl(readPhysicalNodeId: () -> String, private val persistence: CordaPersistence, logger: Logger) : FlowsDrainingModeOperations {
class FlowsDrainingModeOperationsImpl(readPhysicalNodeId: () -> String, private val persistence: CordaPersistence, logger: Logger) : FlowsDrainingModeOperations {
private val nodeSpecificFlowsExecutionModeKey = "${readPhysicalNodeId()}_flowsExecutionMode"
init {
logger.debug { "Node's flow execution mode property key: $nodeSpecificFlowsExecutionModeKey" }
}
private val map = PersistentMap({ key -> key }, { entity -> entity.key to entity.value!! }, NodePropertiesPersistentStore::DBNodeProperty, NodePropertiesPersistentStore.DBNodeProperty::class.java)
internal val map = PersistentMap(
{ key -> key },
{ entity -> entity.key to entity.value!! },
NodePropertiesPersistentStore::DBNodeProperty,
NodePropertiesPersistentStore.DBNodeProperty::class.java
)
override val values = PublishSubject.create<Pair<Boolean, Boolean>>()!!
override fun setEnabled(enabled: Boolean) {
var oldValue: Boolean? = null
persistence.transaction {
oldValue = map.put(nodeSpecificFlowsExecutionModeKey, enabled.toString())?.toBoolean() ?: false
@ -69,9 +75,8 @@ private class FlowsDrainingModeOperationsImpl(readPhysicalNodeId: () -> String,
}
override fun isEnabled(): Boolean {
return persistence.transaction {
map[nodeSpecificFlowsExecutionModeKey]?.toBoolean() ?: false
}
}
}
}

View File

@ -111,7 +111,7 @@ class SingleThreadedStateMachineManager(
private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub)
private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null
private val transitionExecutor = makeTransitionExecutor()
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
private val ourSenderUUID get() = serviceHub.networkService.ourSenderUUID // This is a getter since AbstractNode.network is still lateinit
private var checkpointSerializationContext: SerializationContext? = null
private var actionExecutor: ActionExecutor? = null

View File

@ -16,7 +16,8 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.LedgerTransaction
import java.util.concurrent.Executors
class InMemoryTransactionVerifierService(numberOfWorkers: Int) : SingletonSerializeAsToken(), TransactionVerifierService {
class InMemoryTransactionVerifierService(numberOfWorkers: Int) : SingletonSerializeAsToken(), TransactionVerifierService, AutoCloseable {
private val workerPool = Executors.newFixedThreadPool(numberOfWorkers)
override fun verify(transaction: LedgerTransaction) = workerPool.fork(transaction::verify)
override fun close() = workerPool.shutdown()
}

View File

@ -53,6 +53,10 @@ class ContractUpgradeServiceImpl : ContractUpgradeService, SingletonSerializeAsT
private val authorisedUpgrade = createContractUpgradesMap()
fun start() {
authorisedUpgrade.preload()
}
override fun getAuthorisedContractUpgrade(ref: StateRef) = authorisedUpgrade[ref.toString()]
override fun storeAuthorisedContractUpgrade(ref: StateRef, upgradedContractClass: Class<out UpgradedContract<*, *>>) {

View File

@ -12,47 +12,21 @@ package net.corda.node.services.vault
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import net.corda.core.contracts.Amount
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.FungibleAsset
import net.corda.core.contracts.OwnableState
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.internal.*
import net.corda.core.messaging.DataFeed
import net.corda.core.node.ServicesForResolution
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.KeyManagementService
import net.corda.core.node.services.StatesNotAvailableException
import net.corda.core.node.services.Vault
import net.corda.core.node.services.VaultQueryException
import net.corda.core.node.services.queryBy
import net.corda.core.node.services.vault.DEFAULT_PAGE_NUM
import net.corda.core.node.services.vault.DEFAULT_PAGE_SIZE
import net.corda.core.node.services.vault.MAX_PAGE_SIZE
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.core.node.services.vault.SortAttribute
import net.corda.core.node.services.vault.builder
import net.corda.core.node.services.*
import net.corda.core.node.services.vault.*
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.ContractUpgradeWireTransaction
import net.corda.core.transactions.CoreTransaction
import net.corda.core.transactions.FullTransaction
import net.corda.core.transactions.NotaryChangeWireTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.toHexString
import net.corda.core.utilities.toNonEmptySet
import net.corda.core.utilities.trace
import net.corda.core.transactions.*
import net.corda.core.utilities.*
import net.corda.node.services.api.VaultServiceInternal
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
@ -87,7 +61,6 @@ class NodeVaultService(
private val clock: Clock,
private val keyManagementService: KeyManagementService,
private val servicesForResolution: ServicesForResolution,
hibernateConfig: HibernateConfiguration,
private val database: CordaPersistence
) : SingletonSerializeAsToken(), VaultServiceInternal {
private companion object {
@ -104,6 +77,32 @@ class NodeVaultService(
}
private val concurrentBox = ConcurrentBox(InnerState())
private lateinit var criteriaBuilder: CriteriaBuilder
/**
* Maintain a list of contract state interfaces to concrete types stored in the vault
* for usage in generic queries of type queryBy<LinearState> or queryBy<FungibleState<*>>
*/
private val contractStateTypeMappings = mutableMapOf<String, MutableSet<String>>()
override fun start() {
criteriaBuilder = database.hibernateConfig.sessionFactoryForRegisteredSchemas.criteriaBuilder
bootstrapContractStateTypes()
rawUpdates.subscribe { update ->
update.produced.forEach {
val concreteType = it.state.data.javaClass
log.trace { "State update of type: $concreteType" }
val seen = contractStateTypeMappings.any { it.value.contains(concreteType.name) }
if (!seen) {
val contractInterfaces = deriveContractInterfaces(concreteType)
contractInterfaces.map {
val contractInterface = contractStateTypeMappings.getOrPut(it.name) { mutableSetOf() }
contractInterface.add(concreteType.name)
}
}
}
}
}
private fun recordUpdate(update: Vault.Update<ContractState>): Vault.Update<ContractState> {
if (!update.isEmpty()) {
@ -413,31 +412,6 @@ class NodeVaultService(
return keysToCheck.any { it in myKeys }
}
private val sessionFactory = hibernateConfig.sessionFactoryForRegisteredSchemas
private val criteriaBuilder = sessionFactory.criteriaBuilder
/**
* Maintain a list of contract state interfaces to concrete types stored in the vault
* for usage in generic queries of type queryBy<LinearState> or queryBy<FungibleState<*>>
*/
private val contractStateTypeMappings = bootstrapContractStateTypes()
init {
rawUpdates.subscribe { update ->
update.produced.forEach {
val concreteType = it.state.data.javaClass
log.trace { "State update of type: $concreteType" }
val seen = contractStateTypeMappings.any { it.value.contains(concreteType.name) }
if (!seen) {
val contractInterfaces = deriveContractInterfaces(concreteType)
contractInterfaces.map {
val contractInterface = contractStateTypeMappings.getOrPut(it.name) { mutableSetOf() }
contractInterface.add(concreteType.name)
}
}
}
}
}
@Throws(VaultQueryException::class)
override fun <T : ContractState> _queryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>): Vault.Page<T> {
return _queryBy(criteria, paging, sorting, contractStateType, false)
@ -549,7 +523,7 @@ class NodeVaultService(
/**
* Derive list from existing vault states and then incrementally update using vault observables
*/
private fun bootstrapContractStateTypes(): MutableMap<String, MutableSet<String>> {
private fun bootstrapContractStateTypes() {
val criteria = criteriaBuilder.createQuery(String::class.java)
val vaultStates = criteria.from(VaultSchemaV1.VaultStates::class.java)
criteria.select(vaultStates.get("contractStateClassName")).distinct(true)
@ -559,7 +533,6 @@ class NodeVaultService(
val results = query.resultList
val distinctTypes = results.map { it }
val contractInterfaceToConcreteTypes = mutableMapOf<String, MutableSet<String>>()
val unknownTypes = mutableSetOf<String>()
distinctTypes.forEach { type ->
val concreteType: Class<ContractState>? = try {
@ -571,7 +544,7 @@ class NodeVaultService(
concreteType?.let {
val contractInterfaces = deriveContractInterfaces(it)
contractInterfaces.map {
val contractInterface = contractInterfaceToConcreteTypes.getOrPut(it.name) { mutableSetOf() }
val contractInterface = contractStateTypeMappings.getOrPut(it.name) { mutableSetOf() }
contractInterface.add(it.name)
}
}
@ -579,7 +552,6 @@ class NodeVaultService(
if (unknownTypes.isNotEmpty()) {
log.warn("There are unknown contract state types in the vault, which will prevent these states from being used. The relevant CorDapps must be loaded for these states to be used. The types not on the classpath are ${unknownTypes.joinToString(", ", "[", "]")}.")
}
return contractInterfaceToConcreteTypes
}
private fun <T : ContractState> deriveContractInterfaces(clazz: Class<T>): Set<Class<T>> {

View File

@ -33,12 +33,14 @@ class PersistentMap<K : Any, V, E, out EK>(
private val cache = NonInvalidatingUnboundCache(
loadFunction = { key -> Optional.ofNullable(loadValue(key)) },
removalListener = ExplicitRemoval(toPersistentEntityKey, persistentEntityClass)
).apply {
//preload to allow all() to take data only from the cache (cache is unbound)
)
/** Preload to allow [all] to take data only from the cache (cache is unbound) */
fun preload() {
val session = currentDBSession()
val criteriaQuery = session.criteriaBuilder.createQuery(persistentEntityClass)
criteriaQuery.select(criteriaQuery.from(persistentEntityClass))
getAll(session.createQuery(criteriaQuery).resultList.map { e -> fromPersistentEntity(e as E).first }.asIterable())
cache.getAll(session.createQuery(criteriaQuery).resultList.map { e -> fromPersistentEntity(e as E).first }.asIterable())
}
class ExplicitRemoval<K, V, E, EK>(private val toPersistentEntityKey: (K) -> EK, private val persistentEntityClass: Class<E>) : RemovalListener<K, V> {

View File

@ -21,7 +21,7 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.VersionInfo
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.*
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -36,12 +36,11 @@ import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.nio.file.Path
import java.time.Duration
import kotlin.test.assertEquals
import kotlin.test.assertNull
class NodeTest {
private abstract class AbstractNodeConfiguration : NodeConfiguration
@Rule
@JvmField
val temporaryFolder = TemporaryFolder()
@ -71,7 +70,10 @@ class NodeTest {
val configuration = createConfig(ALICE_NAME)
val info = VersionInfo(789, "3.0", "SNAPSHOT", "R3")
configureDatabase(configuration.dataSourceProperties, configuration.database, { null }, { null }).use {
val node = Node(configuration, info, initialiseSerialization = false)
val versionInfo = rigorousMock<VersionInfo>().also {
doReturn(platformVersion).whenever(it).platformVersion
}
val node = Node(configuration, versionInfo, initialiseSerialization = false)
assertEquals(node.generateNodeInfo(), node.generateNodeInfo()) // Node info doesn't change (including the serial)
}
}
@ -160,20 +162,24 @@ class NodeTest {
}
private fun createConfig(nodeName: CordaX500Name): NodeConfiguration {
val dataSourceProperties = makeTestDataSourceProperties()
val databaseConfig = DatabaseConfig()
val nodeAddress = NetworkHostAndPort("0.1.2.3", 456)
return rigorousMock<AbstractNodeConfiguration>().also {
doReturn(null).whenever(it).relay
doReturn(nodeAddress).whenever(it).p2pAddress
doReturn(nodeName).whenever(it).myLegalName
doReturn(null).whenever(it).notary // Don't add notary identity.
doReturn(dataSourceProperties).whenever(it).dataSourceProperties
doReturn(databaseConfig).whenever(it).database
doReturn(temporaryFolder.root.toPath()).whenever(it).baseDirectory
doReturn(true).whenever(it).devMode // Needed for identity cert.
doReturn("tsp").whenever(it).trustStorePassword
doReturn("ksp").whenever(it).keyStorePassword
}
val fakeAddress = NetworkHostAndPort("0.1.2.3", 456)
return NodeConfigurationImpl(
baseDirectory = temporaryFolder.root.toPath(),
myLegalName = nodeName,
devMode = true, // Needed for identity cert.
emailAddress = "",
p2pAddress = fakeAddress,
keyStorePassword = "ksp",
trustStorePassword = "tsp",
crlCheckSoftFail = true,
dataSourceProperties = makeTestDataSourceProperties(),
database = DatabaseConfig(),
rpcUsers = emptyList(),
verifierType = VerifierType.InMemory,
flowTimeout = FlowTimeoutConfiguration(timeout = Duration.ZERO, backoffBase = 1.0, maxRestartCount = 1),
rpcSettings = NodeRpcSettings(address = fakeAddress, adminAddress = null, ssl = null),
messagingServerAddress = null,
notary = null
)
}
}

View File

@ -86,7 +86,7 @@ class CordappProviderImplTests {
val configProvider = MockCordappConfigProvider()
configProvider.cordappConfigs[isolatedCordappName] = validConfig
val loader = JarScanningCordappLoader.fromJarUrls(listOf(isolatedJAR))
val provider = CordappProviderImpl(loader, configProvider, attachmentStore, whitelistedContractImplementations)
val provider = CordappProviderImpl(loader, configProvider, attachmentStore).apply { start(whitelistedContractImplementations) }
val expected = provider.getAppContext(provider.cordapps.first()).config
@ -95,6 +95,6 @@ class CordappProviderImplTests {
private fun newCordappProvider(vararg urls: URL): CordappProviderImpl {
val loader = JarScanningCordappLoader.fromJarUrls(urls.toList())
return CordappProviderImpl(loader, stubConfigProvider, attachmentStore, whitelistedContractImplementations)
return CordappProviderImpl(loader, stubConfigProvider, attachmentStore).apply { start(whitelistedContractImplementations) }
}
}

View File

@ -60,11 +60,7 @@ import net.corda.testing.internal.rigorousMock
import net.corda.testing.internal.vault.VaultFiller
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.MockServices
import net.corda.testing.node.internal.cordappsForPackages
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.pumpReceive
import net.corda.testing.node.internal.startFlow
import net.corda.testing.node.internal.*
import net.corda.testing.node.ledger
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
@ -109,7 +105,9 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
@After
fun after() {
mockNet.stopNodes()
if (::mockNet.isInitialized) {
mockNet.stopNodes()
}
LogHelper.reset("platform.trade", "core.contract.TransactionGroup", "recordingmap")
}
@ -158,11 +156,11 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
bobNode.dispose()
aliceNode.database.transaction {
assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty()
assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty()
}
aliceNode.internals.manuallyCloseDB()
bobNode.database.transaction {
assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty()
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
}
bobNode.internals.manuallyCloseDB()
}
@ -216,11 +214,11 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
bobNode.dispose()
aliceNode.database.transaction {
assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty()
assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty()
}
aliceNode.internals.manuallyCloseDB()
bobNode.database.transaction {
assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty()
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
}
bobNode.internals.manuallyCloseDB()
}
@ -272,7 +270,7 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
// OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature.
bobNode.database.transaction {
assertThat(bobNode.checkpointStorage.checkpoints()).hasSize(1)
assertThat(bobNode.internals.checkpointStorage.checkpoints()).hasSize(1)
}
val storage = bobNode.services.validatedTransactions
@ -305,10 +303,10 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
assertThat(bobNode.smm.findStateMachines(Buyer::class.java)).isEmpty()
bobNode.database.transaction {
assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty()
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
}
aliceNode.database.transaction {
assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty()
assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty()
}
bobNode.database.transaction {
@ -331,15 +329,15 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
if (cordappLoader != null) {
object : InternalMockNetwork.MockNode(args, cordappLoader) {
// That constructs a recording tx storage
override fun makeTransactionStorage(database: CordaPersistence, transactionCacheSizeBytes: Long): WritableTransactionStorage {
return RecordingTransactionStorage(database, super.makeTransactionStorage(database, transactionCacheSizeBytes))
override fun makeTransactionStorage(transactionCacheSizeBytes: Long): WritableTransactionStorage {
return RecordingTransactionStorage(database, super.makeTransactionStorage(transactionCacheSizeBytes))
}
}
} else {
object : InternalMockNetwork.MockNode(args) {
// That constructs a recording tx storage
override fun makeTransactionStorage(database: CordaPersistence, transactionCacheSizeBytes: Long): WritableTransactionStorage {
return RecordingTransactionStorage(database, super.makeTransactionStorage(database, transactionCacheSizeBytes))
override fun makeTransactionStorage(transactionCacheSizeBytes: Long): WritableTransactionStorage {
return RecordingTransactionStorage(database, super.makeTransactionStorage(transactionCacheSizeBytes))
}
}
}

View File

@ -150,10 +150,10 @@ class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() {
database,
flowStarter,
servicesForResolution,
flowLogicRefFactory = flowLogicRefFactory,
nodeProperties = nodeProperties,
drainingModePollPeriod = Duration.ofSeconds(5),
log = log,
flowLogicRefFactory,
nodeProperties,
Duration.ofSeconds(5),
log,
schedulerRepo = MockScheduledFlowRepository()
).apply { start() }
@ -161,7 +161,7 @@ class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() {
@JvmField
val tearDown = object : TestWatcher() {
override fun succeeded(description: Description) {
scheduler.join()
scheduler.close()
verifyNoMoreInteractions(flowStarter)
}
}
@ -337,7 +337,7 @@ class NodeSchedulerPersistenceTest : NodeSchedulerServiceTestBase() {
testClock.advanceBy(1.days)
assertStarted(flowLogic)
newScheduler.join()
newScheduler.close()
}
}
@ -370,7 +370,7 @@ class NodeSchedulerPersistenceTest : NodeSchedulerServiceTestBase() {
testClock.advanceBy(1.days)
assertStarted(flowLogic)
scheduler.join()
scheduler.close()
}
}
}

View File

@ -16,7 +16,6 @@ import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.UnknownAnonymousPartyException
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.crypto.CertificateType
@ -24,11 +23,7 @@ import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.crypto.x509Certificates
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.core.getTestPartyAndCertificate
import net.corda.testing.core.*
import net.corda.testing.internal.DEV_INTERMEDIATE_CA
import net.corda.testing.internal.DEV_ROOT_CA
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
@ -37,14 +32,10 @@ import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.atomic.AtomicReference
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNull
/**
* Tests for the in memory identity service.
*/
class PersistentIdentityServiceTests {
private companion object {
val alice = TestIdentity(ALICE_NAME, 70)
@ -61,16 +52,19 @@ class PersistentIdentityServiceTests {
@JvmField
val testSerialization = SerializationEnvironmentRule()
private lateinit var database: CordaPersistence
private lateinit var identityService: IdentityService
private lateinit var identityService: PersistentIdentityService
@Before
fun setup() {
val identityServiceRef = AtomicReference<IdentityService>()
// Do all of this in a database transaction so anything that might need a connection has one.
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true),
{ name -> identityServiceRef.get().wellKnownPartyFromX500Name(name) },
{ party -> identityServiceRef.get().wellKnownPartyFromAnonymous(party) })
identityService = PersistentIdentityService(DEV_ROOT_CA.certificate, database).also(identityServiceRef::set)
identityService = PersistentIdentityService()
database = configureDatabase(
makeTestDataSourceProperties(),
DatabaseConfig(runMigration = true),
identityService::wellKnownPartyFromX500Name,
identityService::wellKnownPartyFromAnonymous
)
identityService.database = database
identityService.start(DEV_ROOT_CA.certificate)
}
@After
@ -214,7 +208,10 @@ class PersistentIdentityServiceTests {
identityService.verifyAndRegisterIdentity(anonymousBob)
// Create new identity service mounted onto same DB
val newPersistentIdentityService = PersistentIdentityService(DEV_ROOT_CA.certificate, database)
val newPersistentIdentityService = PersistentIdentityService().also {
it.database = database
it.start(DEV_ROOT_CA.certificate)
}
newPersistentIdentityService.assertOwnership(alice.party, anonymousAlice.party.anonymise())
newPersistentIdentityService.assertOwnership(bob.party, anonymousBob.party.anonymise())

View File

@ -50,7 +50,7 @@ class NetworkMapClientTest {
fun setUp() {
server = NetworkMapServer(cacheTimeout)
val address = server.start()
networkMapClient = NetworkMapClient(URL("http://$address"), DEV_ROOT_CA.certificate)
networkMapClient = NetworkMapClient(URL("http://$address")).apply { start(DEV_ROOT_CA.certificate) }
}
@After

View File

@ -73,7 +73,7 @@ class NetworkMapUpdaterTest {
fun setUp() {
server = NetworkMapServer(cacheExpiryMs.millis)
val address = server.start()
networkMapClient = NetworkMapClient(URL("http://$address"), DEV_ROOT_CA.certificate)
networkMapClient = NetworkMapClient(URL("http://$address")).apply { start(DEV_ROOT_CA.certificate) }
}
@After
@ -83,8 +83,12 @@ class NetworkMapUpdaterTest {
server.close()
}
private fun setUpdater(ourNodeHash: SecureHash? = null, extraNetworkMapKeys: List<UUID> = emptyList(), netMapClient: NetworkMapClient? = networkMapClient) {
updater = NetworkMapUpdater(networkMapCache, fileWatcher, netMapClient, server.networkParameters.serialize().hash, ourNodeHash, baseDir, extraNetworkMapKeys)
private fun setUpdater(extraNetworkMapKeys: List<UUID> = emptyList(), netMapClient: NetworkMapClient? = networkMapClient) {
updater = NetworkMapUpdater(networkMapCache, fileWatcher, netMapClient, baseDir, extraNetworkMapKeys)
}
private fun startUpdater(ourNodeHash: SecureHash = SecureHash.randomSHA256()) {
updater.start(DEV_ROOT_CA.certificate, server.networkParameters.serialize().hash, ourNodeHash)
}
@Test
@ -101,7 +105,7 @@ class NetworkMapUpdaterTest {
// Not subscribed yet.
verify(networkMapCache, times(0)).addNode(any())
updater.subscribeToNetworkMap()
startUpdater()
networkMapClient.publish(signedNodeInfo2)
// TODO: Remove sleep in unit test.
@ -139,7 +143,7 @@ class NetworkMapUpdaterTest {
networkMapClient.publish(signedNodeInfo3)
networkMapClient.publish(signedNodeInfo4)
updater.subscribeToNetworkMap()
startUpdater()
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
@ -172,7 +176,7 @@ class NetworkMapUpdaterTest {
// Not subscribed yet.
verify(networkMapCache, times(0)).addNode(any())
updater.subscribeToNetworkMap()
startUpdater()
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned)
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
@ -193,7 +197,7 @@ class NetworkMapUpdaterTest {
val newParameters = testNetworkParameters(epoch = 2)
val updateDeadline = Instant.now().plus(1, ChronoUnit.DAYS)
server.scheduleParametersUpdate(newParameters, "Test update", updateDeadline)
updater.subscribeToNetworkMap()
startUpdater()
updates.expectEvents(isStrict = false) {
sequence(
expect { update: ParametersUpdateInfo ->
@ -211,12 +215,12 @@ class NetworkMapUpdaterTest {
setUpdater()
val newParameters = testNetworkParameters(epoch = 314)
server.scheduleParametersUpdate(newParameters, "Test update", Instant.MIN)
updater.subscribeToNetworkMap()
startUpdater()
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
val newHash = newParameters.serialize().hash
val keyPair = Crypto.generateKeyPair()
updater.acceptNewNetworkParameters(newHash, { hash -> hash.serialize().sign(keyPair) })
updater.acceptNewNetworkParameters(newHash) { it.serialize().sign(keyPair) }
val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME
val signedNetworkParams = updateFile.readObject<SignedNetworkParameters>()
val paramsFromFile = signedNetworkParams.verifiedNetworkMapCert(DEV_ROOT_CA.certificate)
@ -245,7 +249,7 @@ class NetworkMapUpdaterTest {
setUpdater(netMapClient = null)
val fileNodeInfoAndSigned1 = createNodeInfoAndSigned("Info from file 1")
val fileNodeInfoAndSigned2 = createNodeInfoAndSigned("Info from file 2")
updater.subscribeToNetworkMap()
startUpdater()
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned1)
NodeInfoWatcher.saveToFile(nodeInfoDir, fileNodeInfoAndSigned2)
@ -278,7 +282,7 @@ class NetworkMapUpdaterTest {
NodeInfoWatcher.saveToFile(nodeInfoDir, localSignedNodeInfo)
// Publish to network map the one with lower serial.
networkMapClient.publish(serverSignedNodeInfo)
updater.subscribeToNetworkMap()
startUpdater()
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
verify(networkMapCache, times(1)).addNode(localNodeInfo)
Thread.sleep(2L * cacheExpiryMs)
@ -301,10 +305,10 @@ class NetworkMapUpdaterTest {
fun `not remove own node info when it is not in network map yet`() {
val (myInfo, signedMyInfo) = createNodeInfoAndSigned("My node info")
val (_, signedOtherInfo) = createNodeInfoAndSigned("Other info")
setUpdater(ourNodeHash = signedMyInfo.raw.hash)
setUpdater()
networkMapCache.addNode(myInfo) // Simulate behaviour on node startup when our node info is added to cache
networkMapClient.publish(signedOtherInfo)
updater.subscribeToNetworkMap()
startUpdater(ourNodeHash = signedMyInfo.raw.hash)
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapCache, never()).removeNode(myInfo)
assertThat(server.networkMapHashes()).containsOnly(signedOtherInfo.raw.hash)
@ -327,7 +331,7 @@ class NetworkMapUpdaterTest {
// Not subscribed yet.
verify(networkMapCache, times(0)).addNode(any())
updater.subscribeToNetworkMap()
startUpdater()
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)

View File

@ -51,7 +51,8 @@ class NetworkParametersReaderTest {
fun setUp() {
server = NetworkMapServer(cacheTimeout)
val address = server.start()
networkMapClient = NetworkMapClient(URL("http://$address"), DEV_ROOT_CA.certificate)
networkMapClient = NetworkMapClient(URL("http://$address"))
networkMapClient.start(DEV_ROOT_CA.certificate)
}
@After
@ -66,7 +67,7 @@ class NetworkParametersReaderTest {
val oldParameters = testNetworkParameters(epoch = 1)
NetworkParametersCopier(oldParameters).install(baseDirectory)
NetworkParametersCopier(server.networkParameters, update = true).install(baseDirectory) // Parameters update file.
val parameters = NetworkParametersReader(DEV_ROOT_CA.certificate, networkMapClient, baseDirectory).networkParameters
val parameters = NetworkParametersReader(DEV_ROOT_CA.certificate, networkMapClient, baseDirectory).read().networkParameters
assertFalse((baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME).exists())
assertEquals(server.networkParameters, parameters)
// Parameters from update should be moved to `network-parameters` file.
@ -82,7 +83,7 @@ class NetworkParametersReaderTest {
val baseDirectory = fs.getPath("/node").createDirectories()
val fileParameters = testNetworkParameters(epoch = 1)
NetworkParametersCopier(fileParameters).install(baseDirectory)
val parameters = NetworkParametersReader(DEV_ROOT_CA.certificate, networkMapClient, baseDirectory).networkParameters
val parameters = NetworkParametersReader(DEV_ROOT_CA.certificate, networkMapClient, baseDirectory).read().networkParameters
assertThat(parameters).isEqualTo(fileParameters)
}

View File

@ -129,7 +129,7 @@ class HibernateConfigurationTest {
services = object : MockServices(cordappPackages, BOB_NAME, rigorousMock<IdentityServiceInternal>().also {
doNothing().whenever(it).justVerifyAndRegisterIdentity(argThat { name == BOB_NAME })
}, generateKeyPair(), dummyNotary.keyPair) {
override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, hibernateConfig, database)
override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, database).apply { start() }
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
for (stx in txs) {
(validatedTransactions as WritableTransactionStorage).addTransaction(stx)

View File

@ -53,7 +53,11 @@ class NodeAttachmentStorageTest {
val dataSourceProperties = makeTestDataSourceProperties()
database = configureDatabase(dataSourceProperties, DatabaseConfig(runMigration = true), { null }, { null })
fs = Jimfs.newFileSystem(Configuration.unix())
storage = NodeAttachmentService(MetricRegistry(), database = database).also { it.start() }
storage = NodeAttachmentService(MetricRegistry(), database).also {
database.transaction {
it.start()
}
}
}
@After

View File

@ -205,7 +205,7 @@ class FlowFrameworkTests {
.withMessage("Nothing useful")
.withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow
bobNode.database.transaction {
assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty()
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
}
assertThat(receivingFiber.state).isEqualTo(Strand.State.WAITING)
@ -722,14 +722,14 @@ class FlowFrameworkPersistenceTests {
// Kick off first send and receive
bobNode.services.startFlow(PingPongFlow(charlie, payload))
bobNode.database.transaction {
assertEquals(1, bobNode.checkpointStorage.checkpoints().size)
assertEquals(1, bobNode.internals.checkpointStorage.checkpoints().size)
}
// Make sure the add() has finished initial processing.
bobNode.internals.disableDBCloseOnStop()
// Restart node and thus reload the checkpoint and resend the message with same UUID
bobNode.dispose()
bobNode.database.transaction {
assertEquals(1, bobNode.checkpointStorage.checkpoints().size) // confirm checkpoint
assertEquals(1, bobNode.internals.checkpointStorage.checkpoints().size) // confirm checkpoint
bobNode.services.networkMapCache.clearNetworkMapCache()
}
val node2b = mockNet.createNode(InternalMockNodeParameters(bobNode.internals.id))
@ -745,10 +745,10 @@ class FlowFrameworkPersistenceTests {
// can't give a precise value as every addMessageHandler re-runs the undelivered messages
assertTrue(sentCount > receivedCount, "Node restart should have retransmitted messages")
node2b.database.transaction {
assertEquals(0, node2b.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended")
assertEquals(0, node2b.internals.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended")
}
charlieNode.database.transaction {
assertEquals(0, charlieNode.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended")
assertEquals(0, charlieNode.internals.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended")
}
assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3")
assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3")

View File

@ -36,7 +36,6 @@ import net.corda.core.utilities.unwrap
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.api.VaultServiceInternal
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.testing.core.singleIdentity
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.internal.cordappsForPackages
@ -93,9 +92,9 @@ class VaultSoftLockManagerTest {
}
private val mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages(ContractImpl::class.packageName), defaultFactory = { args, _ ->
object : InternalMockNetwork.MockNode(args) {
override fun makeVaultService(keyManagementService: KeyManagementService, services: ServicesForResolution, hibernateConfig: HibernateConfiguration, database: CordaPersistence): VaultServiceInternal {
override fun makeVaultService(keyManagementService: KeyManagementService, services: ServicesForResolution, database: CordaPersistence): VaultServiceInternal {
val node = this
val realVault = super.makeVaultService(keyManagementService, services, hibernateConfig, database)
val realVault = super.makeVaultService(keyManagementService, services, database)
return object : VaultServiceInternal by realVault {
override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) {
// Should be called before flow is removed

View File

@ -25,7 +25,7 @@ class PersistentMapTests {
}
},
persistentEntityClass = ContractUpgradeServiceImpl.DBContractUpgrade::class.java
)
).apply { preload() }
}
@Test

View File

@ -68,14 +68,14 @@ task patchSerialization(type: Zip, dependsOn: serializationJarTask) {
import proguard.gradle.ProGuardTask
task predeterminise(type: ProGuardTask, dependsOn: project(':core-deterministic').assemble) {
injars patchSerialization
outjars "$buildDir/proguard/pre-deterministic-${project.version}.jar"
outjars file("$buildDir/proguard/pre-deterministic-${project.version}.jar")
libraryjars "$javaHome/lib/rt.jar"
libraryjars "$javaHome/lib/jce.jar"
libraryjars "$javaHome/lib/ext/sunec.jar"
libraryjars file("$javaHome/lib/rt.jar")
libraryjars file("$javaHome/lib/jce.jar")
libraryjars file("$javaHome/lib/ext/sunec.jar")
configurations.compileOnly.forEach {
if (originalJar.path != it.path) {
libraryjars it.path, filter: '!META-INF/versions/**'
if (originalJar != it) {
libraryjars it, filter: '!META-INF/versions/**'
}
}
@ -113,12 +113,12 @@ task jarFilter(type: JarFilterTask) {
task determinise(type: ProGuardTask) {
injars jarFilter
outjars "$buildDir/proguard/$jarBaseName-${project.version}.jar"
outjars file("$buildDir/proguard/$jarBaseName-${project.version}.jar")
libraryjars "$javaHome/lib/rt.jar"
libraryjars "$javaHome/lib/jce.jar"
libraryjars file("$javaHome/lib/rt.jar")
libraryjars file("$javaHome/lib/jce.jar")
configurations.runtimeLibraries.forEach {
libraryjars it.path, filter: '!META-INF/versions/**'
libraryjars it, filter: '!META-INF/versions/**'
}
// Analyse the JAR for dead code, and remove (some of) it.
@ -152,7 +152,7 @@ task checkDeterminism(type: ProGuardTask, dependsOn: jdkTask) {
libraryjars deterministic_rt_jar
configurations.runtimeLibraries.forEach {
libraryjars it.path, filter: '!META-INF/versions/**'
libraryjars it, filter: '!META-INF/versions/**'
}
keepattributes '*'

View File

@ -67,7 +67,9 @@ class AttachmentsClassLoaderTests {
val testSerialization = SerializationEnvironmentRule()
private val attachments = MockAttachmentStorage()
private val networkParameters = testNetworkParameters()
private val cordappProvider = CordappProviderImpl(JarScanningCordappLoader.fromJarUrls(listOf(ISOLATED_CONTRACTS_JAR_PATH)), MockCordappConfigProvider(), attachments, networkParameters.whitelistedContractImplementations)
private val cordappProvider = CordappProviderImpl(JarScanningCordappLoader.fromJarUrls(listOf(ISOLATED_CONTRACTS_JAR_PATH)), MockCordappConfigProvider(), attachments).apply {
start(networkParameters.whitelistedContractImplementations)
}
private val cordapp get() = cordappProvider.cordapps.first()
private val attachmentId get() = cordappProvider.getCordappAttachmentId(cordapp)!!
private val appContext get() = cordappProvider.getAppContext(cordapp)

View File

@ -28,17 +28,11 @@ import net.corda.core.utilities.ByteSequence
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.trace
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.Message
import net.corda.node.services.messaging.MessageHandler
import net.corda.node.services.messaging.MessageHandlerRegistration
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.messaging.*
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.ExternalEvent
import net.corda.node.services.statemachine.SenderDeduplicationId
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.node.internal.InMemoryMessage
import net.corda.testing.node.internal.InternalMockMessagingService
import org.apache.activemq.artemis.utils.ReusableLatch
@ -52,7 +46,6 @@ import java.util.concurrent.LinkedBlockingQueue
import javax.annotation.concurrent.ThreadSafe
import kotlin.concurrent.schedule
import kotlin.concurrent.thread
import kotlin.jvm.Volatile
/**
* An in-memory network allows you to manufacture [InternalMockMessagingService]s for a set of participants. Each
@ -142,24 +135,29 @@ class InMemoryMessagingNetwork private constructor(
manuallyPumped: Boolean,
id: Int,
executor: AffinityExecutor,
notaryService: PartyAndCertificate?,
description: CordaX500Name = CordaX500Name(organisation = "In memory node $id", locality = "London", country = "UK"))
: InternalMockMessagingService {
description: CordaX500Name = CordaX500Name(organisation = "In memory node $id", locality = "London", country = "UK")
): InternalMockMessagingService {
val peerHandle = PeerHandle(id, description)
peersMapping[peerHandle.name] = peerHandle // Assume that the same name - the same entity in MockNetwork.
notaryService?.let { if (it.owningKey !is CompositeKey) peersMapping[it.name] = peerHandle }
val serviceHandles = notaryService?.let { listOf(DistributedServiceHandle(it.party)) }
?: emptyList() //TODO only notary can be distributed?
synchronized(this) {
return synchronized(this) {
val node = InMemoryMessaging(manuallyPumped, peerHandle, executor)
val oldNode = handleEndpointMap.put(peerHandle, node)
if (oldNode != null) {
node.inheritPendingRedelivery(oldNode)
}
node
}
}
internal fun onNotaryIdentity(node: InternalMockMessagingService, notaryService: PartyAndCertificate?) {
val peerHandle = (node as InMemoryMessaging).peerHandle
notaryService?.let { if (it.owningKey !is CompositeKey) peersMapping[it.name] = peerHandle }
val serviceHandles = notaryService?.let { listOf(DistributedServiceHandle(it.party)) }
?: emptyList() //TODO only notary can be distributed?
synchronized(this) {
serviceHandles.forEach {
serviceToPeersMapping.getOrPut(it) { LinkedHashSet() }.add(peerHandle)
}
return node
}
}
@ -206,8 +204,7 @@ class InMemoryMessagingNetwork private constructor(
handleEndpointMap.values.toList()
}
for (node in nodes)
node.stop()
nodes.forEach { it.close() }
handleEndpointMap.clear()
messageReceiveQueues.clear()
@ -368,7 +365,7 @@ class InMemoryMessagingNetwork private constructor(
@ThreadSafe
private inner class InMemoryMessaging(private val manuallyPumped: Boolean,
private val peerHandle: PeerHandle,
val peerHandle: PeerHandle,
private val executor: AffinityExecutor) : SingletonSerializeAsToken(), InternalMockMessagingService {
private inner class Handler(val topicSession: String, val callback: MessageHandler) : MessageHandlerRegistration
@ -444,7 +441,7 @@ class InMemoryMessagingNetwork private constructor(
}
}
override fun stop() {
override fun close() {
if (backgroundThread != null) {
backgroundThread.interrupt()
backgroundThread.join()

View File

@ -241,15 +241,20 @@ open class MockServices private constructor(
return NodeInfo(listOf(NetworkHostAndPort("mock.node.services", 10000)), listOf(initialIdentity.identity), 1, serial = 1L)
}
override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2)
private val mockCordappProvider: MockCordappProvider = MockCordappProvider(cordappLoader, attachments, networkParameters.whitelistedContractImplementations)
private val mockCordappProvider: MockCordappProvider = MockCordappProvider(cordappLoader, attachments).also {
it.start( networkParameters.whitelistedContractImplementations)
}
override val cordappProvider: CordappProvider get() = mockCordappProvider
protected val servicesForResolution: ServicesForResolution get() = ServicesForResolutionImpl(identityService, attachments, cordappProvider, networkParameters, validatedTransactions)
protected val servicesForResolution: ServicesForResolution
get() {
return ServicesForResolutionImpl(identityService, attachments, cordappProvider, validatedTransactions).also {
it.start(networkParameters)
}
}
internal fun makeVaultService(hibernateConfig: HibernateConfiguration, schemaService: SchemaService, database: CordaPersistence): VaultServiceInternal {
val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, hibernateConfig, database)
val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, database).apply { start() }
HibernateObserver.install(vaultService.rawUpdates, hibernateConfig, schemaService)
return vaultService
}

View File

@ -18,7 +18,6 @@ import net.corda.core.DoNotImplement
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
@ -30,7 +29,6 @@ import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.node.NotaryInfo
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.utilities.*
import net.corda.node.VersionInfo
@ -38,10 +36,9 @@ import net.corda.node.cordapp.CordappLoader
import net.corda.node.internal.AbstractNode
import net.corda.node.internal.StartedNode
import net.corda.node.internal.cordapp.JarScanningCordappLoader
import net.corda.node.services.api.NodePropertiesStore
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.*
import net.corda.node.services.keys.E2ETestKeyManagementService
import net.corda.node.services.keys.KeyManagementServiceInternal
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.transactions.BFTNonValidatingNotaryService
import net.corda.node.services.transactions.BFTSMaRt
@ -50,7 +47,6 @@ import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.nodeapi.internal.DevIdentityGenerator
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.driver.TestCorDapp
@ -60,6 +56,7 @@ import net.corda.testing.internal.testThreadFactory
import net.corda.testing.node.*
import org.apache.activemq.artemis.utils.ReusableLatch
import org.apache.sshd.common.util.security.SecurityUtils
import rx.Scheduler
import rx.internal.schedulers.CachedThreadScheduler
import java.math.BigInteger
import java.nio.file.Path
@ -234,11 +231,21 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
}
}
private fun getServerThread(id: Int): ServiceAffinityExecutor {
return if (threadPerNode) {
ServiceAffinityExecutor("Mock node $id thread", 1)
} else {
sharedUserCount.incrementAndGet()
sharedServerThread
}
}
open class MockNode(args: MockNodeArgs, cordappLoader: CordappLoader = JarScanningCordappLoader.fromDirectories(args.config.cordappDirectories)) : AbstractNode(
args.config,
TestClock(Clock.systemUTC()),
args.version,
cordappLoader,
args.network.getServerThread(args.id),
args.network.busyLatch
) {
companion object {
@ -250,13 +257,16 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
private val entropyRoot = args.entropyRoot
var counter = entropyRoot
override val log get() = staticLog
override val serverThread =
if (mockNet.threadPerNode) {
ServiceAffinityExecutor("Mock node $id thread", 1)
} else {
mockNet.sharedUserCount.incrementAndGet()
mockNet.sharedServerThread
override val transactionVerifierWorkerCount: Int get() = 1
private var _rxIoScheduler: Scheduler? = null
override val rxIoScheduler: Scheduler
get() {
return _rxIoScheduler ?: CachedThreadScheduler(testThreadFactory()).also {
runOnStop += it::shutdown
_rxIoScheduler = it
}
}
override val started: StartedNode<MockNode>? get() = uncheckedCast(super.started)
@ -267,7 +277,6 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
return started
}
override fun getRxIoScheduler() = CachedThreadScheduler(testThreadFactory()).also { runOnStop += it::shutdown }
private fun advertiseNodeToNetwork(newNode: StartedNode<MockNode>) {
mockNet.nodes
.mapNotNull { it.started }
@ -277,34 +286,38 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
}
}
// We only need to override the messaging service here, as currently everything that hits disk does so
// through the java.nio API which we are already mocking via Jimfs.
override fun makeMessagingService(database: CordaPersistence, info: NodeInfo, nodeProperties: NodePropertiesStore, networkParameters: NetworkParameters): MessagingService {
override fun makeMessagingService(): MessagingService {
require(id >= 0) { "Node ID must be zero or positive, was passed: $id" }
// TODO AbstractNode is forced to call this method in start(), and not in the c'tor, because the mockNet
// c'tor parameter isn't available. We need to be able to return a InternalMockMessagingService
// here that can be populated properly in startMessagingService.
return mockNet.messagingNetwork.createNodeWithID(
!mockNet.threadPerNode,
id,
serverThread,
myNotaryIdentity,
configuration.myLegalName).also { runOnStop += it::stop }
configuration.myLegalName
).closeOnStop()
}
override fun startMessagingService(rpcOps: RPCOps,
nodeInfo: NodeInfo,
myNotaryIdentity: PartyAndCertificate?,
networkParameters: NetworkParameters) {
mockNet.messagingNetwork.onNotaryIdentity(network as InternalMockMessagingService, myNotaryIdentity)
}
fun setMessagingServiceSpy(messagingServiceSpy: MessagingServiceSpy) {
network = messagingServiceSpy
}
override fun makeKeyManagementService(identityService: IdentityService, keyPairs: Set<KeyPair>, database: CordaPersistence): KeyManagementService {
return E2ETestKeyManagementService(identityService, keyPairs)
override fun makeKeyManagementService(identityService: IdentityService): KeyManagementServiceInternal {
return E2ETestKeyManagementService(identityService)
}
override fun startShell() {
//No mock shell
}
override fun startMessagingService(rpcOps: RPCOps) {
// Nothing to do
}
// This is not thread safe, but node construction is done on a single thread, so that should always be fine
override fun generateKeyPair(): KeyPair {
counter = counter.add(BigInteger.ONE)
@ -312,8 +325,6 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
return Crypto.deriveKeyPairFromEntropy(Crypto.EDDSA_ED25519_SHA512, counter)
}
override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1)
// NodeInfo requires a non-empty addresses list and so we give it a dummy value for mock nodes.
// The non-empty addresses check is important to have and so we tolerate the ugliness here.
override fun myAddresses(): List<NetworkHostAndPort> = listOf(NetworkHostAndPort("mock.node", 1000))
@ -324,10 +335,11 @@ open class InternalMockNetwork(defaultParameters: MockNetworkParameters = MockNe
override val serializationWhitelists: List<SerializationWhitelist>
get() = _serializationWhitelists
private var dbCloser: (() -> Any?)? = null
override fun initialiseDatabasePersistence(schemaService: SchemaService,
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?): CordaPersistence {
return super.initialiseDatabasePersistence(schemaService, wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous).also { dbCloser = it::close }
override fun startDatabase() {
super.startDatabase()
dbCloser = database::close
runOnStop += dbCloser!!
}
fun disableDBCloseOnStop() {

View File

@ -132,8 +132,6 @@ fun InMemoryMessagingNetwork.MessageTransfer.getMessage(): Message = message
internal interface InternalMockMessagingService : MessagingService {
fun pumpReceive(block: Boolean): InMemoryMessagingNetwork.MessageTransfer?
fun stop()
}
/**

View File

@ -167,5 +167,5 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
class InProcessNode(configuration: NodeConfiguration, versionInfo: VersionInfo) : EnterpriseNode(configuration, versionInfo, false) {
override fun getRxIoScheduler() = CachedThreadScheduler(testThreadFactory()).also { runOnStop += it::shutdown }
override val rxIoScheduler get() = CachedThreadScheduler(testThreadFactory()).also { runOnStop += it::shutdown }
}

View File

@ -27,12 +27,10 @@ import java.util.*
class MockCordappProvider(
cordappLoader: CordappLoader,
attachmentStorage: AttachmentStorage,
whitelistedContractImplementations: Map<String, List<AttachmentId>>,
cordappConfigProvider: MockCordappConfigProvider = MockCordappConfigProvider()
) : CordappProviderImpl(cordappLoader, cordappConfigProvider, attachmentStorage, whitelistedContractImplementations) {
constructor(cordappLoader: CordappLoader, attachmentStorage: AttachmentStorage, whitelistedContractImplementations: Map<String, List<AttachmentId>>) : this(cordappLoader, attachmentStorage, whitelistedContractImplementations, MockCordappConfigProvider())
) : CordappProviderImpl(cordappLoader, cordappConfigProvider, attachmentStorage) {
val cordappRegistry = mutableListOf<Pair<Cordapp, AttachmentId>>()
private val cordappRegistry = mutableListOf<Pair<Cordapp, AttachmentId>>()
fun addMockCordapp(contractClassName: ContractClassName, attachments: MockAttachmentStorage) {
val cordapp = CordappImpl(