Merge pull request #2147 from corda/shams-doorman-node-persistence-deps

Moved CordaPersistence and dependent classes into internal package in…
This commit is contained in:
Shams Asari
2017-11-29 17:51:35 +00:00
committed by GitHub
63 changed files with 331 additions and 297 deletions

View File

@ -3,6 +3,8 @@ package net.corda.node.internal
import com.codahale.metrics.MetricRegistry
import com.google.common.collect.MutableClassToInstanceMap
import com.google.common.util.concurrent.MoreExecutors
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import net.corda.confidential.SwapIdentitiesFlow
import net.corda.confidential.SwapIdentitiesHandler
import net.corda.core.CordaException
@ -58,10 +60,12 @@ import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSoftLockManager
import net.corda.node.shell.InteractiveShell
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.nodeapi.internal.crypto.*
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import org.apache.activemq.artemis.utils.ReusableLatch
import org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry
import org.slf4j.Logger
import rx.Observable
import java.io.IOException
@ -181,7 +185,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
log.info("Node starting up ...")
initCertificate()
val (keyPairs, info) = initNodeInfo()
val schemaService = NodeSchemaService(cordappLoader)
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
val identityService = makeIdentityService(info)
// Do all of this in a database transaction so anything that might need a connection has one.
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database ->
@ -207,7 +211,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS)
}
}
makeVaultObservers(schedulerService, database.hibernateConfig, smm)
makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService)
val rpcOps = makeRPCOps(flowStarter, database, smm)
startMessagingService(rpcOps)
installCoreFlows()
@ -513,10 +517,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
protected open fun makeTransactionStorage(database: CordaPersistence): WritableTransactionStorage = DBTransactionStorage()
private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration, smm: StateMachineManager) {
private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration, smm: StateMachineManager, schemaService: SchemaService) {
VaultSoftLockManager.install(services.vaultService, smm)
ScheduledActivityObserver.install(services.vaultService, schedulerService)
HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig)
HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig, schemaService)
}
@VisibleForTesting
@ -753,7 +758,23 @@ internal class FlowStarterImpl(private val serverThread: AffinityExecutor, priva
}
class ConfigurationException(message: String) : CordaException(message)
/**
* Thrown when a node is about to start and its network map cache doesn't contain any node.
*/
internal class NetworkMapCacheEmptyException : Exception()
internal class NetworkMapCacheEmptyException : Exception()
fun configureDatabase(dataSourceProperties: Properties,
databaseConfig: DatabaseConfig,
identityService: IdentityService,
schemaService: SchemaService = NodeSchemaService()): CordaPersistence {
// Register the AbstractPartyDescriptor so Hibernate doesn't warn when encountering AbstractParty. Unfortunately
// Hibernate warns about not being able to find a descriptor if we don't provide one, but won't use it by default
// so we end up providing both descriptor and converter. We should re-examine this in later versions to see if
// either Hibernate can be convinced to stop warning, use the descriptor by default, or something else.
JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(identityService))
val config = HikariConfig(dataSourceProperties)
val dataSource = HikariDataSource(config)
val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(identityService))
return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, attributeConverters)
}

View File

@ -26,7 +26,7 @@ import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.context
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
import rx.Observable
import java.io.InputStream
import java.security.PublicKey

View File

@ -10,9 +10,9 @@ import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.node.VersionInfo
import net.corda.node.internal.cordapp.CordappLoader
@ -25,10 +25,10 @@ import net.corda.node.services.messaging.*
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.utilities.AddressUtils
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.DemoClock
import net.corda.nodeapi.internal.ShutdownHook
import net.corda.nodeapi.internal.addShutdownHook
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.serialization.*
import net.corda.nodeapi.internal.serialization.amqp.AMQPServerSerializationScheme
import org.slf4j.Logger

View File

@ -5,7 +5,7 @@ import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.rpcContext
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
/**
* Implementation of [CordaRPCOps] that checks authorisation.

View File

@ -14,7 +14,7 @@ import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
import rx.Observable
interface StartedNode<out N : AbstractNode> {

View File

@ -52,12 +52,14 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
}
}
val cordappSchemas: Set<MappedSchema> get() = cordapps.flatMap { it.customSchemas }.toSet()
companion object {
private val logger = contextLogger()
/**
* Default cordapp dir name
*/
val CORDAPPS_DIR_NAME = "cordapps"
private const val CORDAPPS_DIR_NAME = "cordapps"
/**
* Creates a default CordappLoader intended to be used in non-dev or non-test environments.
@ -94,8 +96,9 @@ class CordappLoader private constructor(private val cordappJarPaths: List<Restri
* CorDapps.
*/
@VisibleForTesting
fun createWithTestPackages(testPackages: List<String>)
= cordappLoadersCache.computeIfAbsent(testPackages, { CordappLoader(testPackages.flatMap(this::createScanPackage)) })
fun createWithTestPackages(testPackages: List<String>): CordappLoader {
return cordappLoadersCache.computeIfAbsent(testPackages, { CordappLoader(testPackages.flatMap(this::createScanPackage)) })
}
/**
* Creates a dev mode CordappLoader intended only to be used in test environments

View File

@ -1,11 +1,11 @@
package net.corda.node.services.api
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowStateMachine
import net.corda.core.context.InvocationContext
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineTransactionMapping
@ -23,7 +23,7 @@ import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.utilities.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBaseInternal
interface NetworkMapCacheBaseInternal : NetworkMapCacheBase {

View File

@ -5,6 +5,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.node.services.messaging.CertificateChainCheckPolicy
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.User
import net.corda.nodeapi.config.NodeSSLConfiguration
import net.corda.nodeapi.config.parseAs
@ -43,25 +44,6 @@ interface NodeConfiguration : NodeSSLConfiguration {
data class DevModeOptions(val disableCheckpointChecker: Boolean = false)
data class DatabaseConfig(
val initialiseSchema: Boolean = true,
val serverNameTablePrefix: String = "",
val transactionIsolationLevel: TransactionIsolationLevel = TransactionIsolationLevel.REPEATABLE_READ
)
enum class TransactionIsolationLevel {
NONE,
READ_UNCOMMITTED,
READ_COMMITTED,
REPEATABLE_READ,
SERIALIZABLE;
/**
* The JDBC constant value of the same name but with prefixed with TRANSACTION_ defined in [java.sql.Connection].
*/
val jdbcValue: Int = java.sql.Connection::class.java.getField("TRANSACTION_$name").get(null) as Int
}
fun NodeConfiguration.shouldCheckCheckpoints(): Boolean {
return this.devMode && this.devModeOptions?.disableCheckpointChecker != true
}

View File

@ -3,6 +3,7 @@ package net.corda.node.services.events
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.context.InvocationContext
import net.corda.core.context.Origin
import net.corda.core.contracts.SchedulableState
import net.corda.core.contracts.ScheduledActivity
import net.corda.core.contracts.ScheduledStateRef
@ -12,7 +13,6 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.context.Origin
import net.corda.core.internal.until
import net.corda.core.node.StateLoader
import net.corda.core.schemas.PersistentStateRef
@ -24,9 +24,9 @@ import net.corda.node.services.api.FlowStarter
import net.corda.node.services.api.SchedulerService
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.apache.activemq.artemis.utils.ReusableLatch
import java.time.Clock
import java.time.Instant

View File

@ -13,7 +13,7 @@ import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
import org.bouncycastle.cert.X509CertificateHolder
import java.security.InvalidAlgorithmParameterException

View File

@ -7,7 +7,7 @@ import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.bouncycastle.operator.ContentSigner
import java.security.KeyPair
import java.security.PrivateKey

View File

@ -17,10 +17,13 @@ import net.corda.core.utilities.trace
import net.corda.node.VersionInfo
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.statemachine.StateMachineManagerImpl
import net.corda.node.utilities.*
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.*
import org.apache.activemq.artemis.api.core.RoutingType

View File

@ -22,9 +22,9 @@ import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.node.services.api.NetworkMapCacheBaseInternal
import net.corda.node.services.api.NetworkMapCacheInternal
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.wrapWithDatabaseTransaction
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import org.hibernate.Session
import rx.Observable
import rx.subjects.PublishSubject

View File

@ -3,8 +3,8 @@ package net.corda.node.services.persistence
import net.corda.core.serialization.SerializedBytes
import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.node.utilities.currentDBSession
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id

View File

@ -7,6 +7,9 @@ import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.node.services.api.StateMachineRecordedTransactionMappingStorage
import net.corda.node.utilities.*
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import rx.subjects.PublishSubject
import java.util.*
import javax.annotation.concurrent.ThreadSafe

View File

@ -8,6 +8,9 @@ import net.corda.core.serialization.*
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.utilities.*
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import rx.Observable
import rx.subjects.PublishSubject
import javax.persistence.*

View File

@ -1,155 +0,0 @@
package net.corda.node.services.persistence
import net.corda.core.internal.castIfPossible
import net.corda.core.node.services.IdentityService
import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.toHexString
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.utilities.DatabaseTransactionManager
import org.hibernate.SessionFactory
import org.hibernate.boot.MetadataSources
import org.hibernate.boot.model.naming.Identifier
import org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
import org.hibernate.boot.registry.BootstrapServiceRegistryBuilder
import org.hibernate.cfg.Configuration
import org.hibernate.engine.jdbc.connections.spi.ConnectionProvider
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment
import org.hibernate.service.UnknownUnwrapTypeException
import org.hibernate.type.AbstractSingleColumnStandardBasicType
import org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry
import org.hibernate.type.descriptor.java.PrimitiveByteArrayTypeDescriptor
import org.hibernate.type.descriptor.sql.BlobTypeDescriptor
import org.hibernate.type.descriptor.sql.VarbinaryTypeDescriptor
import java.sql.Connection
import java.util.concurrent.ConcurrentHashMap
class HibernateConfiguration(val schemaService: SchemaService, private val databaseConfig: DatabaseConfig, private val identityService: IdentityService) {
companion object {
private val logger = contextLogger()
}
// TODO: make this a guava cache or similar to limit ability for this to grow forever.
private val sessionFactories = ConcurrentHashMap<Set<MappedSchema>, SessionFactory>()
val sessionFactoryForRegisteredSchemas = schemaService.schemaOptions.keys.let {
logger.info("Init HibernateConfiguration for schemas: $it")
// Register the AbstractPartyDescriptor so Hibernate doesn't warn when encountering AbstractParty. Unfortunately
// Hibernate warns about not being able to find a descriptor if we don't provide one, but won't use it by default
// so we end up providing both descriptor and converter. We should re-examine this in later versions to see if
// either Hibernate can be convinced to stop warning, use the descriptor by default, or something else.
JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(identityService))
sessionFactoryForSchemas(it)
}
/** @param key must be immutable, not just read-only. */
fun sessionFactoryForSchemas(key: Set<MappedSchema>) = sessionFactories.computeIfAbsent(key, { makeSessionFactoryForSchemas(key) })
private fun makeSessionFactoryForSchemas(schemas: Set<MappedSchema>): SessionFactory {
logger.info("Creating session factory for schemas: $schemas")
val serviceRegistry = BootstrapServiceRegistryBuilder().build()
val metadataSources = MetadataSources(serviceRegistry)
// We set a connection provider as the auto schema generation requires it. The auto schema generation will not
// necessarily remain and would likely be replaced by something like Liquibase. For now it is very convenient though.
// TODO: replace auto schema generation as it isn't intended for production use, according to Hibernate docs.
val config = Configuration(metadataSources).setProperty("hibernate.connection.provider_class", NodeDatabaseConnectionProvider::class.java.name)
.setProperty("hibernate.hbm2ddl.auto", if (databaseConfig.initialiseSchema) "update" else "validate")
.setProperty("hibernate.format_sql", "true")
.setProperty("hibernate.connection.isolation", databaseConfig.transactionIsolationLevel.jdbcValue.toString())
schemas.forEach { schema ->
// TODO: require mechanism to set schemaOptions (databaseSchema, tablePrefix) which are not global to session
schema.mappedTypes.forEach { config.addAnnotatedClass(it) }
}
val sessionFactory = buildSessionFactory(config, metadataSources, databaseConfig.serverNameTablePrefix)
logger.info("Created session factory for schemas: $schemas")
return sessionFactory
}
private fun buildSessionFactory(config: Configuration, metadataSources: MetadataSources, tablePrefix: String): SessionFactory {
config.standardServiceRegistryBuilder.applySettings(config.properties)
val metadata = metadataSources.getMetadataBuilder(config.standardServiceRegistryBuilder.build()).run {
applyPhysicalNamingStrategy(object : PhysicalNamingStrategyStandardImpl() {
override fun toPhysicalTableName(name: Identifier?, context: JdbcEnvironment?): Identifier {
val default = super.toPhysicalTableName(name, context)
return Identifier.toIdentifier(tablePrefix + default.text, default.isQuoted)
}
})
// register custom converters
applyAttributeConverter(AbstractPartyToX500NameAsStringConverter(identityService))
// Register a tweaked version of `org.hibernate.type.MaterializedBlobType` that truncates logged messages.
// to avoid OOM when large blobs might get logged.
applyBasicType(CordaMaterializedBlobType, CordaMaterializedBlobType.name)
applyBasicType(CordaWrapperBinaryType, CordaWrapperBinaryType.name)
build()
}
return metadata.sessionFactoryBuilder.run {
allowOutOfTransactionUpdateOperations(true)
applySecondLevelCacheSupport(false)
applyQueryCacheSupport(false)
enableReleaseResourcesOnCloseEnabled(true)
build()
}
}
// Supply Hibernate with connections from our underlying Exposed database integration. Only used
// during schema creation / update.
class NodeDatabaseConnectionProvider : ConnectionProvider {
override fun closeConnection(conn: Connection) {
conn.autoCommit = false
val tx = DatabaseTransactionManager.current()
tx.commit()
tx.close()
}
override fun supportsAggressiveRelease(): Boolean = true
override fun getConnection(): Connection {
return DatabaseTransactionManager.newTransaction().connection
}
override fun <T : Any?> unwrap(unwrapType: Class<T>): T {
return unwrapType.castIfPossible(this) ?: throw UnknownUnwrapTypeException(unwrapType)
}
override fun isUnwrappableAs(unwrapType: Class<*>?): Boolean = unwrapType == NodeDatabaseConnectionProvider::class.java
}
// A tweaked version of `org.hibernate.type.MaterializedBlobType` that truncates logged messages. Also logs in hex.
private object CordaMaterializedBlobType : AbstractSingleColumnStandardBasicType<ByteArray>(BlobTypeDescriptor.DEFAULT, CordaPrimitiveByteArrayTypeDescriptor) {
override fun getName(): String {
return "materialized_blob"
}
}
// A tweaked version of `org.hibernate.type.descriptor.java.PrimitiveByteArrayTypeDescriptor` that truncates logged messages.
private object CordaPrimitiveByteArrayTypeDescriptor : PrimitiveByteArrayTypeDescriptor() {
private val LOG_SIZE_LIMIT = 1024
override fun extractLoggableRepresentation(value: ByteArray?): String {
return if (value == null) {
super.extractLoggableRepresentation(value)
} else {
if (value.size <= LOG_SIZE_LIMIT) {
"[size=${value.size}, value=${value.toHexString()}]"
} else {
"[size=${value.size}, value=${value.copyOfRange(0, LOG_SIZE_LIMIT).toHexString()}...truncated...]"
}
}
}
}
// A tweaked version of `org.hibernate.type.WrapperBinaryType` that deals with ByteArray (java primitive byte[] type).
private object CordaWrapperBinaryType : AbstractSingleColumnStandardBasicType<ByteArray>(VarbinaryTypeDescriptor.INSTANCE, PrimitiveByteArrayTypeDescriptor.INSTANCE) {
override fun getRegistrationKeys(): Array<String> {
return arrayOf(name, "ByteArray", ByteArray::class.java.name)
}
override fun getName(): String {
return "corda-wrapper-binary"
}
}
}

View File

@ -17,9 +17,9 @@ import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.serialization.*
import net.corda.core.utilities.contextLogger
import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser
import net.corda.node.utilities.DatabaseTransactionManager
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.node.utilities.currentDBSession
import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession
import java.io.*
import java.nio.file.Paths
import java.time.Instant

View File

@ -9,8 +9,9 @@ import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.services.persistence.HibernateConfiguration
import net.corda.node.utilities.DatabaseTransactionManager
import net.corda.node.services.api.SchemaService
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager
import org.hibernate.FlushMode
import rx.Observable
@ -18,12 +19,11 @@ import rx.Observable
* A vault observer that extracts Object Relational Mappings for contract states that support it, and persists them with Hibernate.
*/
// TODO: Manage version evolution of the schemas via additional tooling.
class HibernateObserver private constructor(private val config: HibernateConfiguration) {
class HibernateObserver private constructor(private val config: HibernateConfiguration, private val schemaService: SchemaService) {
companion object {
private val log = contextLogger()
@JvmStatic
fun install(vaultUpdates: Observable<Vault.Update<ContractState>>, config: HibernateConfiguration): HibernateObserver {
val observer = HibernateObserver(config)
fun install(vaultUpdates: Observable<Vault.Update<ContractState>>, config: HibernateConfiguration, schemaService: SchemaService): HibernateObserver {
val observer = HibernateObserver(config, schemaService)
vaultUpdates.subscribe { observer.persist(it.produced) }
return observer
}
@ -36,7 +36,7 @@ class HibernateObserver private constructor(private val config: HibernateConfigu
private fun persistState(stateAndRef: StateAndRef<ContractState>) {
val state = stateAndRef.state.data
log.debug { "Asked to persist state ${stateAndRef.ref}" }
config.schemaService.selectSchemas(state).forEach { persistStateWithSchema(state, stateAndRef.ref, it) }
schemaService.selectSchemas(state).forEach { persistStateWithSchema(state, stateAndRef.ref, it) }
}
@VisibleForTesting
@ -47,7 +47,7 @@ class HibernateObserver private constructor(private val config: HibernateConfigu
flushMode(FlushMode.MANUAL).
openSession()
session.use {
val mappedObject = config.schemaService.generateMappedObject(state, schema)
val mappedObject = schemaService.generateMappedObject(state, schema)
mappedObject.stateRef = PersistentStateRef(stateRef)
it.persist(mappedObject)
it.flush()

View File

@ -9,8 +9,8 @@ import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.services.api.SchemaService
import net.corda.node.services.api.SchemaService.SchemaOptions
import net.corda.node.services.events.NodeSchedulerService
import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService
@ -27,14 +27,12 @@ import net.corda.node.services.vault.VaultSchemaV1
/**
* Most basic implementation of [SchemaService].
* @param cordappLoader if not null, custom schemas will be extracted from its cordapps.
* TODO: support loading schema options from node configuration.
* TODO: support configuring what schemas are to be selected for persistence.
* TODO: support plugins for schema version upgrading or custom mapping not supported by original [QueryableState].
* TODO: create whitelisted tables when a CorDapp is first installed
*/
class NodeSchemaService(cordappLoader: CordappLoader?) : SchemaService, SingletonSerializeAsToken() {
class NodeSchemaService(extraSchemas: Set<MappedSchema> = emptySet()) : SchemaService, SingletonSerializeAsToken() {
// Entities for compulsory services
object NodeServices
@ -60,17 +58,12 @@ class NodeSchemaService(cordappLoader: CordappLoader?) : SchemaService, Singleto
// Required schemas are those used by internal Corda services
// For example, cash is used by the vault for coin selection (but will be extracted as a standalone CorDapp in future)
private val requiredSchemas: Map<MappedSchema, SchemaService.SchemaOptions> =
mapOf(Pair(CommonSchemaV1, SchemaService.SchemaOptions()),
Pair(VaultSchemaV1, SchemaService.SchemaOptions()),
Pair(NodeInfoSchemaV1, SchemaService.SchemaOptions()),
Pair(NodeServicesV1, SchemaService.SchemaOptions()))
mapOf(Pair(CommonSchemaV1, SchemaOptions()),
Pair(VaultSchemaV1, SchemaOptions()),
Pair(NodeInfoSchemaV1, SchemaOptions()),
Pair(NodeServicesV1, SchemaOptions()))
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = if (cordappLoader == null) {
requiredSchemas
} else {
val customSchemas = cordappLoader.cordapps.flatMap { it.customSchemas }.toSet()
requiredSchemas.plus(customSchemas.map { mappedSchema -> Pair(mappedSchema, SchemaService.SchemaOptions()) })
}
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = requiredSchemas + extraSchemas.associateBy({ it }, { SchemaOptions() })
// Currently returns all schemas supported by the state, with no filtering or enrichment.
override fun selectSchemas(state: ContractState): Iterable<MappedSchema> {

View File

@ -7,6 +7,7 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import com.google.common.primitives.Primitives
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.*
@ -15,7 +16,6 @@ import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.context.InvocationContext
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
@ -25,9 +25,9 @@ import net.corda.node.services.api.FlowPermissionAuditEvent
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.logging.pushToLoggingContext
import net.corda.node.services.statemachine.FlowSessionState.Initiating
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.DatabaseTransaction
import net.corda.node.utilities.DatabaseTransactionManager
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.nio.file.Paths

View File

@ -27,7 +27,10 @@ import net.corda.core.serialization.SerializationDefaults.SERIALIZATION_FACTORY
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.*
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.trace
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage
@ -35,7 +38,11 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.shouldCheckCheckpoints
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.messaging.TopicSession
import net.corda.node.utilities.*
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.newNamedSingleThreadExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import net.corda.nodeapi.internal.serialization.SerializeAsTokenContextImpl
import net.corda.nodeapi.internal.serialization.withTokenContext
import org.apache.activemq.artemis.utils.ReusableLatch

View File

@ -23,7 +23,7 @@ import net.corda.core.utilities.*
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.BFTSMaRtConfiguration
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import java.security.PublicKey
import javax.persistence.Entity
import javax.persistence.Table

View File

@ -5,8 +5,9 @@ import io.atomix.copycat.Query
import io.atomix.copycat.server.Commit
import io.atomix.copycat.server.StateMachine
import net.corda.core.utilities.contextLogger
import net.corda.node.utilities.*
import java.util.LinkedHashMap
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
import java.util.*
/**
* A distributed map state machine that doesn't allow overriding values. The state machine is replicated

View File

@ -12,7 +12,7 @@ import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import java.io.Serializable
import java.util.*
import javax.annotation.concurrent.ThreadSafe

View File

@ -29,10 +29,10 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.node.services.config.RaftConfig
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.nodeapi.config.NodeSSLConfiguration
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import java.nio.file.Path
import java.util.concurrent.CompletableFuture
import javax.annotation.concurrent.ThreadSafe

View File

@ -4,7 +4,7 @@ import net.corda.core.contracts.StateRef
import net.corda.core.contracts.UpgradedContract
import net.corda.core.node.services.ContractUpgradeService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.node.utilities.PersistentMap
import javax.persistence.Column
import javax.persistence.Entity

View File

@ -17,12 +17,12 @@ import net.corda.core.transactions.NotaryChangeWireTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.*
import net.corda.node.services.api.VaultServiceInternal
import net.corda.node.services.persistence.HibernateConfiguration
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.utilities.DatabaseTransactionManager
import net.corda.node.utilities.bufferUntilDatabaseCommit
import net.corda.node.utilities.currentDBSession
import net.corda.node.utilities.wrapWithDatabaseTransaction
import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import org.hibernate.Session
import rx.Observable
import rx.subjects.PublishSubject

View File

@ -22,9 +22,7 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.utilities.getOrThrow
import net.corda.core.node.services.IdentityService
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.node.services.RPCUserService
@ -33,8 +31,8 @@ import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT
import net.corda.node.services.messaging.RpcAuthContext
import net.corda.node.services.messaging.RpcPermissions
import net.corda.node.utilities.ANSIProgressRenderer
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.StdoutANSIProgressRenderer
import net.corda.nodeapi.internal.persistence.CordaPersistence
import org.crsh.command.InvocationContext
import org.crsh.console.jline.JLineProcessor
import org.crsh.console.jline.TerminalFactory
@ -94,7 +92,7 @@ object InteractiveShell {
* Starts an interactive shell connected to the local terminal. This shell gives administrator access to the node
* internals.
*/
fun startShell(configuration:NodeConfiguration, cordaRPCOps: CordaRPCOps, userService: RPCUserService, identityService: IdentityService, database:CordaPersistence) {
fun startShell(configuration:NodeConfiguration, cordaRPCOps: CordaRPCOps, userService: RPCUserService, identityService: IdentityService, database: CordaPersistence) {
this.rpcOps = cordaRPCOps
this.userService = userService
this.identityService = identityService

View File

@ -1,6 +1,7 @@
package net.corda.node.utilities
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.persistence.currentDBSession
import java.util.*

View File

@ -1,218 +0,0 @@
package net.corda.node.utilities
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import net.corda.core.node.services.IdentityService
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.services.persistence.HibernateConfiguration
import net.corda.node.services.schema.NodeSchemaService
import rx.Observable
import rx.Subscriber
import rx.subjects.UnicastSubject
import java.io.Closeable
import java.sql.Connection
import java.sql.SQLException
import java.util.*
import java.util.concurrent.CopyOnWriteArrayList
/**
* Table prefix for all tables owned by the node module.
*/
const val NODE_DATABASE_PREFIX = "node_"
//HikariDataSource implements Closeable which allows CordaPersistence to be Closeable
class CordaPersistence(
val dataSource: HikariDataSource,
private val schemaService: SchemaService,
private val identityService: IdentityService,
databaseConfig: DatabaseConfig
) : Closeable {
val transactionIsolationLevel = databaseConfig.transactionIsolationLevel.jdbcValue
val hibernateConfig: HibernateConfiguration by lazy {
transaction {
HibernateConfiguration(schemaService, databaseConfig, identityService)
}
}
val entityManagerFactory get() = hibernateConfig.sessionFactoryForRegisteredSchemas
companion object {
fun connect(dataSource: HikariDataSource, schemaService: SchemaService, identityService: IdentityService, databaseConfig: DatabaseConfig): CordaPersistence {
return CordaPersistence(dataSource, schemaService, identityService, databaseConfig).apply {
DatabaseTransactionManager(this)
}
}
}
/**
* Creates an instance of [DatabaseTransaction], with the given isolation level.
* @param isolationLevel isolation level for the transaction. If not specified the default (i.e. provided at the creation time) is used.
*/
fun createTransaction(isolationLevel: Int): DatabaseTransaction {
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
DatabaseTransactionManager.dataSource = this
return DatabaseTransactionManager.currentOrNew(isolationLevel)
}
/**
* Creates an instance of [DatabaseTransaction], with the transaction isolation level specified at the creation time.
*/
fun createTransaction(): DatabaseTransaction = createTransaction(transactionIsolationLevel)
fun createSession(): Connection {
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
DatabaseTransactionManager.dataSource = this
val ctx = DatabaseTransactionManager.currentOrNull()
return ctx?.connection ?: throw IllegalStateException("Was expecting to find database transaction: must wrap calling code within a transaction.")
}
/**
* Executes given statement in the scope of transaction, with the given isolation level.
* @param isolationLevel isolation level for the transaction.
* @param statement to be executed in the scope of this transaction.
*/
fun <T> transaction(isolationLevel: Int, statement: DatabaseTransaction.() -> T): T {
DatabaseTransactionManager.dataSource = this
return transaction(isolationLevel, 3, statement)
}
/**
* Executes given statement in the scope of transaction with the transaction level specified at the creation time.
* @param statement to be executed in the scope of this transaction.
*/
fun <T> transaction(statement: DatabaseTransaction.() -> T): T = transaction(transactionIsolationLevel, statement)
private fun <T> transaction(transactionIsolation: Int, repetitionAttempts: Int, statement: DatabaseTransaction.() -> T): T {
val outer = DatabaseTransactionManager.currentOrNull()
return if (outer != null) {
outer.statement()
} else {
inTopLevelTransaction(transactionIsolation, repetitionAttempts, statement)
}
}
private fun <T> inTopLevelTransaction(transactionIsolation: Int, repetitionAttempts: Int, statement: DatabaseTransaction.() -> T): T {
var repetitions = 0
while (true) {
val transaction = DatabaseTransactionManager.currentOrNew(transactionIsolation)
try {
val answer = transaction.statement()
transaction.commit()
return answer
} catch (e: SQLException) {
transaction.rollback()
repetitions++
if (repetitions >= repetitionAttempts) {
throw e
}
} catch (e: Throwable) {
transaction.rollback()
throw e
} finally {
transaction.close()
}
}
}
override fun close() {
dataSource.close()
}
}
fun configureDatabase(dataSourceProperties: Properties, databaseConfig: DatabaseConfig, identityService: IdentityService, schemaService: SchemaService = NodeSchemaService(null)): CordaPersistence {
val config = HikariConfig(dataSourceProperties)
val dataSource = HikariDataSource(config)
val persistence = CordaPersistence.connect(dataSource, schemaService, identityService, databaseConfig)
// Check not in read-only mode.
persistence.transaction {
persistence.dataSource.connection.use {
check(!it.metaData.isReadOnly) { "Database should not be readonly." }
}
}
return persistence
}
/**
* Buffer observations until after the current database transaction has been closed. Observations are never
* dropped, simply delayed.
*
* Primarily for use by component authors to publish observations during database transactions without racing against
* closing the database transaction.
*
* For examples, see the call hierarchy of this function.
*/
fun <T : Any> rx.Observer<T>.bufferUntilDatabaseCommit(): rx.Observer<T> {
val currentTxId = DatabaseTransactionManager.transactionId
val databaseTxBoundary: Observable<DatabaseTransactionManager.Boundary> = DatabaseTransactionManager.transactionBoundaries.filter { it.txId == currentTxId }.first()
val subject = UnicastSubject.create<T>()
subject.delaySubscription(databaseTxBoundary).subscribe(this)
databaseTxBoundary.doOnCompleted { subject.onCompleted() }
return subject
}
// A subscriber that delegates to multiple others, wrapping a database transaction around the combination.
private class DatabaseTransactionWrappingSubscriber<U>(val db: CordaPersistence?) : Subscriber<U>() {
// Some unsubscribes happen inside onNext() so need something that supports concurrent modification.
val delegates = CopyOnWriteArrayList<Subscriber<in U>>()
fun forEachSubscriberWithDbTx(block: Subscriber<in U>.() -> Unit) {
(db ?: DatabaseTransactionManager.dataSource).transaction {
delegates.filter { !it.isUnsubscribed }.forEach {
it.block()
}
}
}
override fun onCompleted() = forEachSubscriberWithDbTx { onCompleted() }
override fun onError(e: Throwable?) = forEachSubscriberWithDbTx { onError(e) }
override fun onNext(s: U) = forEachSubscriberWithDbTx { onNext(s) }
override fun onStart() = forEachSubscriberWithDbTx { onStart() }
fun cleanUp() {
if (delegates.removeIf { it.isUnsubscribed }) {
if (delegates.isEmpty()) {
unsubscribe()
}
}
}
}
// A subscriber that wraps another but does not pass on observations to it.
private class NoOpSubscriber<U>(t: Subscriber<in U>) : Subscriber<U>(t) {
override fun onCompleted() {
}
override fun onError(e: Throwable?) {
}
override fun onNext(s: U) {
}
}
/**
* Wrap delivery of observations in a database transaction. Multiple subscribers will receive the observations inside
* the same database transaction. This also lazily subscribes to the source [rx.Observable] to preserve any buffering
* that might be in place.
*/
fun <T : Any> rx.Observable<T>.wrapWithDatabaseTransaction(db: CordaPersistence? = null): rx.Observable<T> {
var wrappingSubscriber = DatabaseTransactionWrappingSubscriber<T>(db)
// Use lift to add subscribers to a special subscriber that wraps a database transaction around observations.
// Each subscriber will be passed to this lambda when they subscribe, at which point we add them to wrapping subscriber.
return this.lift { toBeWrappedInDbTx: Subscriber<in T> ->
// Add the subscriber to the wrapping subscriber, which will invoke the original subscribers together inside a database transaction.
wrappingSubscriber.delegates.add(toBeWrappedInDbTx)
// If we are the first subscriber, return the shared subscriber, otherwise return a subscriber that does nothing.
if (wrappingSubscriber.delegates.size == 1) wrappingSubscriber else NoOpSubscriber(toBeWrappedInDbTx)
// Clean up the shared list of subscribers when they unsubscribe.
}.doOnUnsubscribe {
wrappingSubscriber.cleanUp()
// If cleanup removed the last subscriber reset the system, as future subscribers might need the stream again
if (wrappingSubscriber.delegates.isEmpty()) {
wrappingSubscriber = DatabaseTransactionWrappingSubscriber(db)
}
}
}

View File

@ -1,127 +0,0 @@
package net.corda.node.utilities
import co.paralleluniverse.strands.Strand
import org.hibernate.Session
import org.hibernate.Transaction
import rx.subjects.PublishSubject
import rx.subjects.Subject
import java.sql.Connection
import java.util.*
import java.util.concurrent.ConcurrentHashMap
class DatabaseTransaction(isolation: Int, val threadLocal: ThreadLocal<DatabaseTransaction>,
val transactionBoundaries: Subject<DatabaseTransactionManager.Boundary, DatabaseTransactionManager.Boundary>,
val cordaPersistence: CordaPersistence) {
val id: UUID = UUID.randomUUID()
val connection: Connection by lazy(LazyThreadSafetyMode.NONE) {
cordaPersistence.dataSource.connection
.apply {
autoCommit = false
transactionIsolation = isolation
}
}
private val sessionDelegate = lazy {
val session = cordaPersistence.entityManagerFactory.withOptions().connection(connection).openSession()
hibernateTransaction = session.beginTransaction()
session
}
val session: Session by sessionDelegate
private lateinit var hibernateTransaction: Transaction
private val outerTransaction: DatabaseTransaction? = threadLocal.get()
fun commit() {
if (sessionDelegate.isInitialized()) {
hibernateTransaction.commit()
}
connection.commit()
}
fun rollback() {
if (sessionDelegate.isInitialized() && session.isOpen) {
session.clear()
}
if (!connection.isClosed) {
connection.rollback()
}
}
fun close() {
if (sessionDelegate.isInitialized() && session.isOpen) {
session.close()
}
connection.close()
threadLocal.set(outerTransaction)
if (outerTransaction == null) {
transactionBoundaries.onNext(DatabaseTransactionManager.Boundary(id))
}
}
}
fun currentDBSession() = DatabaseTransactionManager.current().session
class DatabaseTransactionManager(initDataSource: CordaPersistence) {
companion object {
private val threadLocalDb = ThreadLocal<CordaPersistence>()
private val threadLocalTx = ThreadLocal<DatabaseTransaction>()
private val databaseToInstance = ConcurrentHashMap<CordaPersistence, DatabaseTransactionManager>()
fun setThreadLocalTx(tx: DatabaseTransaction?): DatabaseTransaction? {
val oldTx = threadLocalTx.get()
threadLocalTx.set(tx)
return oldTx
}
fun restoreThreadLocalTx(context: DatabaseTransaction?) {
if (context != null) {
threadLocalDb.set(context.cordaPersistence)
}
threadLocalTx.set(context)
}
var dataSource: CordaPersistence
get() = threadLocalDb.get() ?: throw IllegalStateException("Was expecting to find CordaPersistence set on current thread: ${Strand.currentStrand()}")
set(value) = threadLocalDb.set(value)
val transactionId: UUID
get() = threadLocalTx.get()?.id ?: throw IllegalStateException("Was expecting to find transaction set on current strand: ${Strand.currentStrand()}")
val manager: DatabaseTransactionManager get() = databaseToInstance[dataSource]!!
val transactionBoundaries: Subject<Boundary, Boundary> get() = manager._transactionBoundaries
fun currentOrNull(): DatabaseTransaction? = manager.currentOrNull()
fun currentOrNew(isolation: Int = dataSource.transactionIsolationLevel) = currentOrNull() ?: manager.newTransaction(isolation)
fun current(): DatabaseTransaction = currentOrNull() ?: error("No transaction in context.")
fun newTransaction(isolation: Int = dataSource.transactionIsolationLevel) = manager.newTransaction(isolation)
}
data class Boundary(val txId: UUID)
private val _transactionBoundaries = PublishSubject.create<Boundary>().toSerialized()
init {
// Found a unit test that was forgetting to close the database transactions. When you close() on the top level
// database transaction it will reset the threadLocalTx back to null, so if it isn't then there is still a
// database transaction open. The [transaction] helper above handles this in a finally clause for you
// but any manual database transaction management is liable to have this problem.
if (threadLocalTx.get() != null) {
throw IllegalStateException("Was not expecting to find existing database transaction on current strand when setting database: ${Strand.currentStrand()}, ${threadLocalTx.get()}")
}
dataSource = initDataSource
databaseToInstance[dataSource] = this
}
private fun newTransaction(isolation: Int) =
DatabaseTransaction(isolation, threadLocalTx, transactionBoundaries, dataSource).apply {
threadLocalTx.set(this)
}
private fun currentOrNull(): DatabaseTransaction? = threadLocalTx.get()
}

View File

@ -1,13 +1,12 @@
package net.corda.node.utilities
import com.google.common.cache.RemovalCause
import com.google.common.cache.RemovalListener
import com.google.common.cache.RemovalNotification
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.persistence.currentDBSession
import java.util.*
/**
* Implements an unbound caching layer on top of a table accessed via Hibernate mapping.
*/

View File

@ -20,8 +20,8 @@ import net.corda.finance.contracts.DealState;
import net.corda.finance.contracts.asset.Cash;
import net.corda.finance.schemas.CashSchemaV1;
import net.corda.node.services.identity.InMemoryIdentityService;
import net.corda.node.utilities.CordaPersistence;
import net.corda.node.utilities.DatabaseTransaction;
import net.corda.nodeapi.internal.persistence.CordaPersistence;
import net.corda.nodeapi.internal.persistence.DatabaseTransaction;
import net.corda.testing.SerializationEnvironmentRule;
import net.corda.testing.contracts.DummyLinearContract;
import net.corda.testing.contracts.VaultFiller;

View File

@ -10,10 +10,10 @@ import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.FlowProgressHandleImpl
import net.corda.core.utilities.ProgressTracker
import net.corda.node.services.config.DatabaseConfig
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.shell.InteractiveShell
import net.corda.node.utilities.configureDatabase
import net.corda.node.internal.configureDatabase
import net.corda.testing.DEV_TRUST_ROOT
import net.corda.testing.MEGA_CORP
import net.corda.testing.MEGA_CORP_IDENTITY

View File

@ -36,7 +36,7 @@ import net.corda.node.internal.StartedNode
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.checkpoints
import net.corda.node.utilities.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.*
import net.corda.testing.contracts.VaultFiller
import net.corda.testing.node.*

View File

@ -1,6 +1,7 @@
package net.corda.node.services.config
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.ALICE
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThatThrownBy

View File

@ -23,7 +23,6 @@ import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.internal.cordapp.CordappProviderImpl
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.network.NetworkMapCacheImpl
import net.corda.node.services.persistence.DBCheckpointStorage
@ -32,8 +31,9 @@ import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.statemachine.StateMachineManagerImpl
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.*
import net.corda.testing.contracts.DummyContract
import net.corda.testing.node.*

View File

@ -10,11 +10,11 @@ import net.corda.core.internal.cert
import net.corda.core.internal.toX509CertHolder
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.UnknownAnonymousPartyException
import net.corda.node.utilities.CordaPersistence
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
import net.corda.nodeapi.internal.crypto.CertificateType
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.*
import net.corda.testing.node.MockServices
import org.junit.After

View File

@ -4,15 +4,15 @@ import net.corda.core.crypto.generateKeyPair
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.RPCUserService
import net.corda.node.services.RPCUserServiceImpl
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.network.NetworkMapCacheImpl
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.*
import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties

View File

@ -4,10 +4,10 @@ import com.google.common.primitives.Ints
import net.corda.core.serialization.SerializedBytes
import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.LogHelper
import net.corda.testing.SerializationEnvironmentRule
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties

View File

@ -10,12 +10,13 @@ import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.node.services.api.VaultServiceInternal
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.*
import net.corda.testing.node.MockServices
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
@ -40,13 +41,14 @@ class DBTransactionStorageTests {
fun setUp() {
LogHelper.setLevel(PersistentUniquenessProvider::class)
val dataSourceProps = makeTestDataSourceProperties()
database = configureDatabase(dataSourceProps, DatabaseConfig(), rigorousMock())
val schemaService = NodeSchemaService()
database = configureDatabase(dataSourceProps, DatabaseConfig(), rigorousMock(), schemaService)
database.transaction {
services = object : MockServices(BOB_KEY) {
override val vaultService: VaultServiceInternal
get() {
val vaultService = NodeVaultService(clock, keyManagementService, stateLoader, database.hibernateConfig)
hibernatePersister = HibernateObserver.install(vaultService.rawUpdates, database.hibernateConfig)
hibernatePersister = HibernateObserver.install(vaultService.rawUpdates, database.hibernateConfig, schemaService)
return vaultService
}

View File

@ -23,16 +23,21 @@ import net.corda.core.utilities.toBase58String
import net.corda.finance.DOLLARS
import net.corda.finance.POUNDS
import net.corda.finance.SWISS_FRANCS
import net.corda.finance.contracts.asset.*
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.contracts.asset.DUMMY_CASH_ISSUER_KEY
import net.corda.finance.contracts.asset.DUMMY_CASH_ISSUER_NAME
import net.corda.finance.contracts.asset.DummyFungibleContract
import net.corda.finance.schemas.CashSchemaV1
import net.corda.finance.schemas.SampleCashSchemaV2
import net.corda.finance.schemas.SampleCashSchemaV3
import net.corda.finance.utils.sumCash
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.vault.VaultSchemaV1
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.testing.*
import net.corda.testing.contracts.*
import net.corda.testing.node.MockServices
@ -92,12 +97,13 @@ class HibernateConfigurationTest {
doReturn(it).whenever(mock).wellKnownPartyFromX500Name(it.name)
}
}
database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService)
val schemaService = NodeSchemaService()
database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService, schemaService)
database.transaction {
hibernateConfig = database.hibernateConfig
// `consumeCash` expects we can self-notarise transactions
services = object : MockServices(cordappPackages, BOB_NAME, generateKeyPair(), DUMMY_NOTARY_KEY) {
override val vaultService = makeVaultService(database.hibernateConfig)
override val vaultService = makeVaultService(database.hibernateConfig, schemaService)
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
for (stx in txs) {
validatedTransactions.addTransaction(stx)

View File

@ -13,10 +13,10 @@ import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.node.services.vault.Builder
import net.corda.core.node.services.vault.Sort
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.LogHelper
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.rigorousMock

View File

@ -11,9 +11,9 @@ import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
import net.corda.node.services.api.SchemaService
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.utilities.DatabaseTransactionManager
import net.corda.node.utilities.configureDatabase
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.DatabaseTransactionManager
import net.corda.testing.LogHelper
import net.corda.testing.MEGA_CORP
import net.corda.testing.contracts.DummyContract
@ -66,7 +66,7 @@ class HibernateObserverTests {
}
}
val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock(), schemaService)
HibernateObserver.install(rawUpdatesPublisher, database.hibernateConfig)
HibernateObserver.install(rawUpdatesPublisher, database.hibernateConfig, schemaService)
database.transaction {
rawUpdatesPublisher.onNext(Vault.Update(emptySet(), setOf(StateAndRef(TransactionState(TestState(), DummyContract.PROGRAM_ID, MEGA_CORP), StateRef(SecureHash.sha256("dummy"), 0)))))
val parentRowCountResult = DatabaseTransactionManager.current().connection.prepareStatement("select count(*) from Parents").executeQuery()

View File

@ -10,9 +10,9 @@ import net.corda.core.internal.concurrent.asCordaFuture
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.LogHelper
import net.corda.testing.SerializationEnvironmentRule
import net.corda.testing.freeLocalHostAndPort

View File

@ -2,9 +2,9 @@ package net.corda.node.services.transactions
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.UniquenessException
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.*
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.junit.After

View File

@ -34,7 +34,7 @@ import net.corda.finance.contracts.asset.DUMMY_CASH_ISSUER_NAME
import net.corda.finance.contracts.getCashBalance
import net.corda.finance.schemas.CashSchemaV1
import net.corda.finance.utils.sumCash
import net.corda.node.utilities.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.*
import net.corda.testing.contracts.VaultFiller
import net.corda.testing.node.MockServices

View File

@ -28,9 +28,9 @@ import net.corda.finance.schemas.CashSchemaV1
import net.corda.finance.schemas.CashSchemaV1.PersistentCashState
import net.corda.finance.schemas.CommercialPaperSchemaV1
import net.corda.finance.schemas.SampleCashSchemaV3
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.*
import net.corda.testing.contracts.*
import net.corda.testing.node.MockServices

View File

@ -25,7 +25,7 @@ import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.api.VaultServiceInternal
import net.corda.node.services.persistence.HibernateConfiguration
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.testing.chooseIdentity
import net.corda.testing.node.MockNetwork
import net.corda.testing.rigorousMock

View File

@ -24,7 +24,7 @@ import net.corda.finance.contracts.asset.DUMMY_CASH_ISSUER_KEY
import net.corda.finance.contracts.asset.DUMMY_CASH_ISSUER_NAME
import net.corda.finance.contracts.getCashBalance
import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.utilities.CordaPersistence
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.*
import net.corda.testing.contracts.*
import net.corda.testing.node.MockServices

View File

@ -3,7 +3,8 @@ package net.corda.node.utilities
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.tee
import net.corda.node.services.config.DatabaseConfig
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.*
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.rigorousMock
import org.assertj.core.api.Assertions.assertThat

View File

@ -1,442 +0,0 @@
package net.corda.node.utilities
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.Crypto.EDDSA_ED25519_SHA512
import net.corda.core.crypto.Crypto.generateKeyPair
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.cert
import net.corda.core.internal.div
import net.corda.core.internal.toTypedArray
import net.corda.core.internal.x500Name
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.node.serialization.KryoServerSerializationScheme
import net.corda.node.services.config.createKeystoreForCordaNode
import net.corda.nodeapi.internal.crypto.*
import net.corda.nodeapi.internal.serialization.AllWhitelist
import net.corda.nodeapi.internal.serialization.SerializationContextImpl
import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
import net.corda.nodeapi.internal.serialization.kryo.KryoHeaderV0_1
import net.corda.testing.ALICE
import net.corda.testing.BOB
import net.corda.testing.BOB_PUBKEY
import net.corda.testing.MEGA_CORP
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.asn1.x509.BasicConstraints
import org.bouncycastle.asn1.x509.Extension
import org.bouncycastle.asn1.x509.KeyUsage
import org.bouncycastle.cert.X509CertificateHolder
import org.bouncycastle.operator.jcajce.JcaContentVerifierProviderBuilder
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.io.DataInputStream
import java.io.DataOutputStream
import java.io.IOException
import java.net.InetAddress
import java.net.InetSocketAddress
import java.nio.file.Path
import java.security.KeyStore
import java.security.PrivateKey
import java.security.SecureRandom
import java.security.cert.CertPath
import java.security.cert.Certificate
import java.security.cert.X509Certificate
import java.util.*
import java.util.stream.Stream
import javax.net.ssl.*
import kotlin.concurrent.thread
import kotlin.test.*
class X509UtilitiesTest {
@Rule
@JvmField
val tempFolder: TemporaryFolder = TemporaryFolder()
@Test
fun `create valid self-signed CA certificate`() {
val caKey = generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
val caCert = X509Utilities.createSelfSignedCACertificate(CordaX500Name(commonName = "Test Cert", organisation = "R3 Ltd", locality = "London", country = "GB"), caKey)
assertEquals(X500Name("CN=Test Cert,O=R3 Ltd,L=London,C=GB"), caCert.subject) // using our subject common name
assertEquals(caCert.issuer, caCert.subject) //self-signed
caCert.isValidOn(Date()) // throws on verification problems
caCert.isSignatureValid(JcaContentVerifierProviderBuilder().build(caKey.public)) // throws on verification problems
val basicConstraints = BasicConstraints.getInstance(caCert.getExtension(Extension.basicConstraints).parsedValue)
val keyUsage = KeyUsage.getInstance(caCert.getExtension(Extension.keyUsage).parsedValue)
assertFalse { keyUsage.hasUsages(5) } // Bit 5 == keyCertSign according to ASN.1 spec (see full comment on KeyUsage property)
assertNull(basicConstraints.pathLenConstraint) // No length constraint specified on this CA certificate
}
@Test
fun `load and save a PEM file certificate`() {
val tmpCertificateFile = tempFile("cacert.pem")
val caKey = generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
val caCert = X509Utilities.createSelfSignedCACertificate(CordaX500Name(commonName = "Test Cert", organisation = "R3 Ltd", locality = "London", country = "GB"), caKey)
X509Utilities.saveCertificateAsPEMFile(caCert, tmpCertificateFile)
val readCertificate = X509Utilities.loadCertificateFromPEMFile(tmpCertificateFile)
assertEquals(caCert, readCertificate)
}
@Test
fun `create valid server certificate chain`() {
val caKey = generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
val caCert = X509Utilities.createSelfSignedCACertificate(CordaX500Name(commonName = "Test CA Cert", organisation = "R3 Ltd", locality = "London", country = "GB"), caKey)
val subject = CordaX500Name(commonName = "Server Cert", organisation = "R3 Ltd", locality = "London", country = "GB")
val keyPair = generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
val serverCert = X509Utilities.createCertificate(CertificateType.TLS, caCert, caKey, subject, keyPair.public)
assertEquals(X500Name("C=GB,L=London,O=R3 Ltd,CN=Server Cert"), serverCert.subject) // using our subject common name
assertEquals(caCert.issuer, serverCert.issuer) // Issued by our CA cert
serverCert.isValidOn(Date()) // throws on verification problems
serverCert.isSignatureValid(JcaContentVerifierProviderBuilder().build(caKey.public)) // throws on verification problems
val basicConstraints = BasicConstraints.getInstance(serverCert.getExtension(Extension.basicConstraints).parsedValue)
val keyUsage = KeyUsage.getInstance(serverCert.getExtension(Extension.keyUsage).parsedValue)
assertFalse { keyUsage.hasUsages(5) } // Bit 5 == keyCertSign according to ASN.1 spec (see full comment on KeyUsage property)
assertNull(basicConstraints.pathLenConstraint) // Non-CA certificate
}
@Test
fun `storing EdDSA key in java keystore`() {
val tmpKeyStore = tempFile("keystore.jks")
val keyPair = generateKeyPair(EDDSA_ED25519_SHA512)
val testName = CordaX500Name(commonName = "Test", organisation = "R3 Ltd", locality = "London", country = "GB")
val selfSignCert = X509Utilities.createSelfSignedCACertificate(testName, keyPair)
assertTrue(Arrays.equals(selfSignCert.subjectPublicKeyInfo.encoded, keyPair.public.encoded))
// Save the EdDSA private key with self sign cert in the keystore.
val keyStore = loadOrCreateKeyStore(tmpKeyStore, "keystorepass")
keyStore.setKeyEntry("Key", keyPair.private, "password".toCharArray(),
Stream.of(selfSignCert).map { it.cert }.toTypedArray())
keyStore.save(tmpKeyStore, "keystorepass")
// Load the keystore from file and make sure keys are intact.
val keyStore2 = loadOrCreateKeyStore(tmpKeyStore, "keystorepass")
val privateKey = keyStore2.getKey("Key", "password".toCharArray())
val pubKey = keyStore2.getCertificate("Key").publicKey
assertNotNull(pubKey)
assertNotNull(privateKey)
assertEquals(keyPair.public, pubKey)
assertEquals(keyPair.private, privateKey)
}
@Test
fun `signing EdDSA key with EcDSA certificate`() {
val tmpKeyStore = tempFile("keystore.jks")
val ecDSAKey = generateKeyPair(Crypto.ECDSA_SECP256R1_SHA256)
val testName = CordaX500Name(commonName = "Test", organisation = "R3 Ltd", locality = "London", country = "GB")
val ecDSACert = X509Utilities.createSelfSignedCACertificate(testName, ecDSAKey)
val edDSAKeypair = generateKeyPair(EDDSA_ED25519_SHA512)
val edDSACert = X509Utilities.createCertificate(CertificateType.TLS, ecDSACert, ecDSAKey, X500Name("CN=TestEdDSA"), edDSAKeypair.public)
// Save the EdDSA private key with cert chains.
val keyStore = loadOrCreateKeyStore(tmpKeyStore, "keystorepass")
keyStore.setKeyEntry("Key", edDSAKeypair.private, "password".toCharArray(),
Stream.of(ecDSACert, edDSACert).map { it.cert }.toTypedArray())
keyStore.save(tmpKeyStore, "keystorepass")
// Load the keystore from file and make sure keys are intact.
val keyStore2 = loadOrCreateKeyStore(tmpKeyStore, "keystorepass")
val privateKey = keyStore2.getKey("Key", "password".toCharArray())
val certs = keyStore2.getCertificateChain("Key")
val pubKey = certs.last().publicKey
assertEquals(2, certs.size)
assertNotNull(pubKey)
assertNotNull(privateKey)
assertEquals(edDSAKeypair.public, pubKey)
assertEquals(edDSAKeypair.private, privateKey)
}
@Test
fun `create full CA keystore`() {
val tmpKeyStore = tempFile("keystore.jks")
val tmpTrustStore = tempFile("truststore.jks")
// Generate Root and Intermediate CA cert and put both into key store and root ca cert into trust store
createCAKeyStoreAndTrustStore(tmpKeyStore, "keystorepass", "keypass", tmpTrustStore, "trustpass")
// Load back generated root CA Cert and private key from keystore and check against copy in truststore
val keyStore = loadKeyStore(tmpKeyStore, "keystorepass")
val trustStore = loadKeyStore(tmpTrustStore, "trustpass")
val rootCaCert = keyStore.getCertificate(X509Utilities.CORDA_ROOT_CA) as X509Certificate
val rootCaPrivateKey = keyStore.getKey(X509Utilities.CORDA_ROOT_CA, "keypass".toCharArray()) as PrivateKey
val rootCaFromTrustStore = trustStore.getCertificate(X509Utilities.CORDA_ROOT_CA) as X509Certificate
assertEquals(rootCaCert, rootCaFromTrustStore)
rootCaCert.checkValidity(Date())
rootCaCert.verify(rootCaCert.publicKey)
// Now sign something with private key and verify against certificate public key
val testData = "12345".toByteArray()
val caSignature = Crypto.doSign(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME, rootCaPrivateKey, testData)
assertTrue { Crypto.isValid(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME, rootCaCert.publicKey, caSignature, testData) }
// Load back generated intermediate CA Cert and private key
val intermediateCaCert = keyStore.getCertificate(X509Utilities.CORDA_INTERMEDIATE_CA) as X509Certificate
val intermediateCaCertPrivateKey = keyStore.getKey(X509Utilities.CORDA_INTERMEDIATE_CA, "keypass".toCharArray()) as PrivateKey
intermediateCaCert.checkValidity(Date())
intermediateCaCert.verify(rootCaCert.publicKey)
// Now sign something with private key and verify against certificate public key
val intermediateSignature = Crypto.doSign(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME, intermediateCaCertPrivateKey, testData)
assertTrue { Crypto.isValid(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME, intermediateCaCert.publicKey, intermediateSignature, testData) }
}
@Test
fun `create server certificate in keystore for SSL`() {
val tmpCAKeyStore = tempFile("keystore.jks")
val tmpTrustStore = tempFile("truststore.jks")
val tmpSSLKeyStore = tempFile("sslkeystore.jks")
val tmpServerKeyStore = tempFile("serverkeystore.jks")
// Generate Root and Intermediate CA cert and put both into key store and root ca cert into trust store
createCAKeyStoreAndTrustStore(tmpCAKeyStore,
"cakeystorepass",
"cakeypass",
tmpTrustStore,
"trustpass")
// Load signing intermediate CA cert
val caKeyStore = loadKeyStore(tmpCAKeyStore, "cakeystorepass")
val caCertAndKey = caKeyStore.getCertificateAndKeyPair(X509Utilities.CORDA_INTERMEDIATE_CA, "cakeypass")
// Generate server cert and private key and populate another keystore suitable for SSL
createKeystoreForCordaNode(tmpSSLKeyStore, tmpServerKeyStore, "serverstorepass", "serverkeypass", caKeyStore, "cakeypass", MEGA_CORP.name)
// Load back server certificate
val serverKeyStore = loadKeyStore(tmpServerKeyStore, "serverstorepass")
val serverCertAndKey = serverKeyStore.getCertificateAndKeyPair(X509Utilities.CORDA_CLIENT_CA, "serverkeypass")
serverCertAndKey.certificate.isValidOn(Date())
serverCertAndKey.certificate.isSignatureValid(JcaContentVerifierProviderBuilder().build(caCertAndKey.certificate.subjectPublicKeyInfo))
assertTrue { serverCertAndKey.certificate.subject.toString().contains(MEGA_CORP.name.organisation) }
// Load back server certificate
val sslKeyStore = loadKeyStore(tmpSSLKeyStore, "serverstorepass")
val sslCertAndKey = sslKeyStore.getCertificateAndKeyPair(X509Utilities.CORDA_CLIENT_TLS, "serverkeypass")
sslCertAndKey.certificate.isValidOn(Date())
sslCertAndKey.certificate.isSignatureValid(JcaContentVerifierProviderBuilder().build(serverCertAndKey.certificate.subjectPublicKeyInfo))
assertTrue { sslCertAndKey.certificate.subject.toString().contains(MEGA_CORP.name.organisation) }
// Now sign something with private key and verify against certificate public key
val testData = "123456".toByteArray()
val signature = Crypto.doSign(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME, serverCertAndKey.keyPair.private, testData)
val publicKey = Crypto.toSupportedPublicKey(serverCertAndKey.certificate.subjectPublicKeyInfo)
assertTrue { Crypto.isValid(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME, publicKey, signature, testData) }
}
@Test
fun `create server cert and use in SSL socket`() {
val tmpCAKeyStore = tempFile("keystore.jks")
val tmpTrustStore = tempFile("truststore.jks")
val tmpSSLKeyStore = tempFile("sslkeystore.jks")
val tmpServerKeyStore = tempFile("serverkeystore.jks")
// Generate Root and Intermediate CA cert and put both into key store and root ca cert into trust store
val caKeyStore = createCAKeyStoreAndTrustStore(tmpCAKeyStore,
"cakeystorepass",
"cakeypass",
tmpTrustStore,
"trustpass")
// Generate server cert and private key and populate another keystore suitable for SSL
createKeystoreForCordaNode(tmpSSLKeyStore, tmpServerKeyStore, "serverstorepass", "serverstorepass", caKeyStore, "cakeypass", MEGA_CORP.name)
val keyStore = loadKeyStore(tmpSSLKeyStore, "serverstorepass")
val trustStore = loadKeyStore(tmpTrustStore, "trustpass")
val context = SSLContext.getInstance("TLS")
val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
keyManagerFactory.init(keyStore, "serverstorepass".toCharArray())
val keyManagers = keyManagerFactory.keyManagers
val trustMgrFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
trustMgrFactory.init(trustStore)
val trustManagers = trustMgrFactory.trustManagers
context.init(keyManagers, trustManagers, SecureRandom())
val serverSocketFactory = context.serverSocketFactory
val clientSocketFactory = context.socketFactory
val serverSocket = serverSocketFactory.createServerSocket(0) as SSLServerSocket // use 0 to get first free socket
val serverParams = SSLParameters(arrayOf("TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256",
"TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256",
"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
"TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA",
"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256"),
arrayOf("TLSv1.2"))
serverParams.wantClientAuth = true
serverParams.needClientAuth = true
serverParams.endpointIdentificationAlgorithm = null // Reconfirm default no server name indication, use our own validator.
serverSocket.sslParameters = serverParams
serverSocket.useClientMode = false
val clientSocket = clientSocketFactory.createSocket() as SSLSocket
val clientParams = SSLParameters(arrayOf("TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256",
"TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256",
"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
"TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA",
"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
"TLS_ECDH_ECDSA_WITH_AES_128_GCM_SHA256"),
arrayOf("TLSv1.2"))
clientParams.endpointIdentificationAlgorithm = null // Reconfirm default no server name indication, use our own validator.
clientSocket.sslParameters = clientParams
clientSocket.useClientMode = true
// We need to specify this explicitly because by default the client binds to 'localhost' and we want it to bind
// to whatever <hostname> resolves to(as that's what the server binds to). In particular on Debian <hostname>
// resolves to 127.0.1.1 instead of the external address of the interface, so the ssl handshake fails.
clientSocket.bind(InetSocketAddress(InetAddress.getLocalHost(), 0))
val lock = Object()
var done = false
var serverError = false
val serverThread = thread {
try {
val sslServerSocket = serverSocket.accept()
assertTrue(sslServerSocket.isConnected)
val serverInput = DataInputStream(sslServerSocket.inputStream)
val receivedString = serverInput.readUTF()
assertEquals("Hello World", receivedString)
synchronized(lock) {
done = true
lock.notifyAll()
}
sslServerSocket.close()
} catch (ex: Throwable) {
serverError = true
}
}
clientSocket.connect(InetSocketAddress(InetAddress.getLocalHost(), serverSocket.localPort))
assertTrue(clientSocket.isConnected)
// Double check hostname manually
val peerChain = clientSocket.session.peerCertificates
val peerX500Principal = (peerChain[0] as X509Certificate).subjectX500Principal
assertEquals(MEGA_CORP.name.x500Principal, peerX500Principal)
X509Utilities.validateCertificateChain(trustStore.getX509Certificate(X509Utilities.CORDA_ROOT_CA), *peerChain)
val output = DataOutputStream(clientSocket.outputStream)
output.writeUTF("Hello World")
var timeout = 0
synchronized(lock) {
while (!done) {
timeout++
if (timeout > 10) throw IOException("Timed out waiting for server to complete")
lock.wait(1000)
}
}
clientSocket.close()
serverThread.join(1000)
assertFalse { serverError }
serverSocket.close()
assertTrue(done)
}
private fun tempFile(name: String): Path = tempFolder.root.toPath() / name
/**
* All in one wrapper to manufacture a root CA cert and an Intermediate CA cert.
* Normally this would be run once and then the outputs would be re-used repeatedly to manufacture the server certs
* @param keyStoreFilePath The output KeyStore path to publish the private keys of the CA root and intermediate certs into.
* @param storePassword The storage password to protect access to the generated KeyStore and public certificates
* @param keyPassword The password that protects the CA private keys.
* Unlike the SSL libraries that tend to assume the password is the same as the keystore password.
* These CA private keys should be protected more effectively with a distinct password.
* @param trustStoreFilePath The output KeyStore to place the Root CA public certificate, which can be used as an SSL truststore
* @param trustStorePassword The password to protect the truststore
* @return The KeyStore object that was saved to file
*/
private fun createCAKeyStoreAndTrustStore(keyStoreFilePath: Path,
storePassword: String,
keyPassword: String,
trustStoreFilePath: Path,
trustStorePassword: String
): KeyStore {
val rootCAKey = generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
val baseName = CordaX500Name(organisation = "R3CEV", locality = "London", country = "GB")
val rootCACert = X509Utilities.createSelfSignedCACertificate(baseName.copy(commonName = "Corda Node Root CA"), rootCAKey)
val intermediateCAKeyPair = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
val intermediateCACert = X509Utilities.createCertificate(
CertificateType.INTERMEDIATE_CA,
rootCACert,
rootCAKey,
baseName.copy(commonName = "Corda Node Intermediate CA"),
intermediateCAKeyPair.public)
val keyPass = keyPassword.toCharArray()
val keyStore = loadOrCreateKeyStore(keyStoreFilePath, storePassword)
keyStore.addOrReplaceKey(X509Utilities.CORDA_ROOT_CA, rootCAKey.private, keyPass, arrayOf<Certificate>(rootCACert.cert))
keyStore.addOrReplaceKey(X509Utilities.CORDA_INTERMEDIATE_CA,
intermediateCAKeyPair.private,
keyPass,
Stream.of(intermediateCACert, rootCACert).map { it.cert }.toTypedArray<Certificate>())
keyStore.save(keyStoreFilePath, storePassword)
val trustStore = loadOrCreateKeyStore(trustStoreFilePath, trustStorePassword)
trustStore.addOrReplaceCertificate(X509Utilities.CORDA_ROOT_CA, rootCACert.cert)
trustStore.addOrReplaceCertificate(X509Utilities.CORDA_INTERMEDIATE_CA, intermediateCACert.cert)
trustStore.save(trustStoreFilePath, trustStorePassword)
return keyStore
}
@Test
fun `Get correct private key type from Keystore`() {
val keyPair = generateKeyPair(Crypto.ECDSA_SECP256R1_SHA256)
val testName = CordaX500Name(commonName = "Test", organisation = "R3 Ltd", locality = "London", country = "GB")
val selfSignCert = X509Utilities.createSelfSignedCACertificate(testName, keyPair)
val keyStore = loadOrCreateKeyStore(tempFile("testKeystore.jks"), "keystorepassword")
keyStore.setKeyEntry("Key", keyPair.private, "keypassword".toCharArray(), arrayOf(selfSignCert.cert))
val keyFromKeystore = keyStore.getKey("Key", "keypassword".toCharArray())
val keyFromKeystoreCasted = keyStore.getSupportedKey("Key", "keypassword")
assertTrue(keyFromKeystore is java.security.interfaces.ECPrivateKey) // by default JKS returns SUN EC key
assertTrue(keyFromKeystoreCasted is org.bouncycastle.jce.interfaces.ECPrivateKey)
}
@Test
fun `serialize - deserialize X509CertififcateHolder`() {
val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) }
val context = SerializationContextImpl(KryoHeaderV0_1,
javaClass.classLoader,
AllWhitelist,
emptyMap(),
true,
SerializationContext.UseCase.P2P)
val expected: X509CertificateHolder = X509Utilities.createSelfSignedCACertificate(ALICE.name, Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME))
val serialized = expected.serialize(factory, context).bytes
val actual: X509CertificateHolder = serialized.deserialize(factory, context)
assertEquals(expected, actual)
}
@Test
fun `serialize - deserialize X509CertPath`() {
val factory = SerializationFactoryImpl().apply { registerScheme(KryoServerSerializationScheme()) }
val context = SerializationContextImpl(KryoHeaderV0_1,
javaClass.classLoader,
AllWhitelist,
emptyMap(),
true,
SerializationContext.UseCase.P2P)
val rootCAKey = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
val rootCACert = X509Utilities.createSelfSignedCACertificate(ALICE.name, rootCAKey)
val certificate = X509Utilities.createCertificate(CertificateType.TLS, rootCACert, rootCAKey, BOB.name.x500Name, BOB_PUBKEY)
val expected = X509CertificateFactory().delegate.generateCertPath(listOf(certificate.cert, rootCACert.cert))
val serialized = expected.serialize(factory, context).bytes
val actual: CertPath = serialized.deserialize(factory, context)
assertEquals(expected, actual)
}
}