rewrite few services to use Hibernate

* DBTransactionMappingStorage
* DBTransactionStorage
* DBCheckpointStorage
* PersistentUniquenessProvider
* PersistentKeyManagementService
This commit is contained in:
szymonsztuka 2017-08-14 15:06:06 +01:00 committed by GitHub
parent 55a4264652
commit a6bf8e35dd
29 changed files with 563 additions and 309 deletions

View File

@ -11,18 +11,22 @@ import javax.persistence.Converter
* Completely anonymous parties are stored as null (to preserve privacy)
*/
@Converter(autoApply = true)
class AbstractPartyToX500NameAsStringConverter(val identitySvc: IdentityService) : AttributeConverter<AbstractParty, String> {
class AbstractPartyToX500NameAsStringConverter(identitySvc: () -> IdentityService) : AttributeConverter<AbstractParty, String> {
private val identityService: IdentityService by lazy {
identitySvc()
}
override fun convertToDatabaseColumn(party: AbstractParty?): String? {
party?.let {
return identitySvc.partyFromAnonymous(party)?.toString()
return identityService.partyFromAnonymous(party)?.toString()
}
return null // non resolvable anonymous parties
}
override fun convertToEntityAttribute(dbData: String?): AbstractParty? {
dbData?.let {
val party = identitySvc.partyFromX500Name(X500Name(dbData))
val party = identityService.partyFromX500Name(X500Name(dbData))
return party as AbstractParty
}
return null // non resolvable anonymous parties are stored as nulls

View File

@ -10,18 +10,13 @@ import com.google.common.collect.testing.features.MapFeature
import com.google.common.collect.testing.features.SetFeature
import com.google.common.collect.testing.testers.*
import junit.framework.TestSuite
import net.corda.testing.TestDependencyInjectionBase
import net.corda.testing.initialiseTestSerialization
import net.corda.testing.*
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.node.makeTestDatabaseProperties
import net.corda.testing.resetTestSerialization
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.junit.*
import org.junit.runner.RunWith
import org.junit.runners.Suite
import java.sql.Connection
import java.util.*
@RunWith(Suite::class)
@ -47,7 +42,7 @@ class JDBCHashMapTestSuite {
@BeforeClass
fun before() {
initialiseTestSerialization()
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties())
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), identitySvc = { throw UnsupportedOperationException("Identity Service should not be in use") })
setUpDatabaseTx()
loadOnInitFalseMap = JDBCHashMap<String, String>("test_map_false", loadOnInit = false)
memoryConstrainedMap = JDBCHashMap<String, String>("test_map_constrained", loadOnInit = false, maxBuckets = 1)
@ -233,7 +228,7 @@ class JDBCHashMapTestSuite {
@Before
fun before() {
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties())
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), identitySvc = { throw UnsupportedOperationException("Identity Service should not be in use") })
}
@After

View File

@ -486,7 +486,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
private fun makeVaultObservers() {
VaultSoftLockManager(services.vaultService, smm)
ScheduledActivityObserver(services)
HibernateObserver(services.vaultService.rawUpdates, HibernateConfiguration(services.schemaService, configuration.database ?: Properties(), services.identityService))
HibernateObserver(services.vaultService.rawUpdates, HibernateConfiguration(services.schemaService, configuration.database ?: Properties(), {services.identityService}))
}
private fun makeInfo(): NodeInfo {
@ -545,7 +545,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
protected open fun initialiseDatabasePersistence(insideTransaction: () -> Unit) {
val props = configuration.dataSourceProperties
if (props.isNotEmpty()) {
this.database = configureDatabase(props, configuration.database)
this.database = configureDatabase(props, configuration.database, identitySvc = { _services.identityService })
// Now log the vendor string as this will also cause a connection to be tested eagerly.
database.transaction {
log.info("Connected to ${database.database.vendor} database.")
@ -773,7 +773,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
override val networkMapCache by lazy { InMemoryNetworkMapCache(this) }
override val vaultService by lazy { NodeVaultService(this, configuration.dataSourceProperties, configuration.database) }
override val vaultQueryService by lazy {
HibernateVaultQueryImpl(HibernateConfiguration(schemaService, configuration.database ?: Properties(), identityService), vaultService.updatesPublisher)
HibernateVaultQueryImpl(HibernateConfiguration(schemaService, configuration.database ?: Properties(), { identityService }), vaultService.updatesPublisher)
}
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with

View File

@ -20,7 +20,7 @@ import java.sql.Connection
import java.util.*
import java.util.concurrent.ConcurrentHashMap
class HibernateConfiguration(val schemaService: SchemaService, val databaseProperties: Properties, val identitySvc: IdentityService) {
class HibernateConfiguration(val schemaService: SchemaService, val databaseProperties: Properties, private val identitySvc: () -> IdentityService) {
companion object {
val logger = loggerFor<HibernateConfiguration>()
}

View File

@ -6,14 +6,13 @@ import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.ThreadBox
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.*
import net.corda.node.utilities.*
import org.bouncycastle.operator.ContentSigner
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
import java.security.KeyPair
import java.security.PrivateKey
import java.security.PublicKey
import javax.persistence.*
/**
* A persistent re-implementation of [E2ETestKeyManagementService] to support node re-start.
@ -25,60 +24,62 @@ import java.security.PublicKey
class PersistentKeyManagementService(val identityService: IdentityService,
initialKeys: Set<KeyPair>) : SingletonSerializeAsToken(), KeyManagementService {
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}our_key_pairs") {
val publicKey = publicKey("public_key")
val privateKey = blob("private_key")
}
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}our_key_pairs")
class PersistentKey(
private class InnerState {
val keys = object : AbstractJDBCHashMap<PublicKey, PrivateKey, Table>(Table, loadOnInit = false) {
override fun keyFromRow(row: ResultRow): PublicKey = row[table.publicKey]
@Id
@Column(name = "public_key")
var publicKey: String = "",
override fun valueFromRow(row: ResultRow): PrivateKey = deserializeFromBlob(row[table.privateKey])
@Lob
@Column(name = "private_key")
var privateKey: ByteArray = ByteArray(0)
)
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<PublicKey, PrivateKey>, finalizables: MutableList<() -> Unit>) {
insert[table.publicKey] = entry.key
}
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<PublicKey, PrivateKey>, finalizables: MutableList<() -> Unit>) {
insert[table.privateKey] = serializeToBlob(entry.value, finalizables)
}
private companion object {
fun createKeyMap(): AppendOnlyPersistentMap<PublicKey, PrivateKey, PersistentKey, String> {
return AppendOnlyPersistentMap(
toPersistentEntityKey = { it.toBase58String() },
fromPersistentEntity = { Pair(parsePublicKeyBase58(it.publicKey),
it.privateKey.deserialize<PrivateKey>(context = SerializationDefaults.STORAGE_CONTEXT)) },
toPersistentEntity = { key: PublicKey, value: PrivateKey ->
PersistentKey().apply {
publicKey = key.toBase58String()
privateKey = value.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
}
},
persistentEntityClass = PersistentKey::class.java
)
}
}
private val mutex = ThreadBox(InnerState())
val keysMap = createKeyMap()
init {
mutex.locked {
keys.putAll(initialKeys.associate { Pair(it.public, it.private) })
}
initialKeys.forEach({ it -> keysMap.addWithDuplicatesAllowed(it.public, it.private) })
}
override val keys: Set<PublicKey> get() = mutex.locked { keys.keys }
override val keys: Set<PublicKey> get() = keysMap.allPersisted().map { it.first }.toSet()
override fun filterMyKeys(candidateKeys: Iterable<PublicKey>): Iterable<PublicKey> {
return mutex.locked { candidateKeys.filter { it in this.keys } }
}
override fun filterMyKeys(candidateKeys: Iterable<PublicKey>): Iterable<PublicKey> =
candidateKeys.filter { keysMap[it] != null }
override fun freshKey(): PublicKey {
val keyPair = generateKeyPair()
mutex.locked {
keys[keyPair.public] = keyPair.private
}
keysMap[keyPair.public] = keyPair.private
return keyPair.public
}
override fun freshKeyAndCert(identity: PartyAndCertificate, revocationEnabled: Boolean): AnonymousPartyAndPath {
return freshCertificate(identityService, freshKey(), identity, getSigner(identity.owningKey), revocationEnabled)
}
override fun freshKeyAndCert(identity: PartyAndCertificate, revocationEnabled: Boolean): AnonymousPartyAndPath =
freshCertificate(identityService, freshKey(), identity, getSigner(identity.owningKey), revocationEnabled)
private fun getSigner(publicKey: PublicKey): ContentSigner = getSigner(getSigningKeyPair(publicKey))
//It looks for the PublicKey in the (potentially) CompositeKey that is ours, and then returns the associated PrivateKey to use in signing
private fun getSigningKeyPair(publicKey: PublicKey): KeyPair {
return mutex.locked {
val pk = publicKey.keys.first { keys.containsKey(it) }
KeyPair(pk, keys[pk]!!)
}
val pk = publicKey.keys.first { keysMap[it] != null } //TODO here for us to re-write this using an actual query if publicKey.keys.size > 1
return KeyPair(pk, keysMap[pk]!!)
}
override fun sign(bytes: ByteArray, publicKey: PublicKey): DigitalSignature.WithKey {

View File

@ -1,57 +1,60 @@
package net.corda.node.services.persistence
import net.corda.core.crypto.SecureHash
import net.corda.core.serialization.SerializationDefaults.CHECKPOINT_CONTEXT
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.utilities.*
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
import java.util.Collections.synchronizedMap
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Lob
/**
* Simple checkpoint key value storage in DB using the underlying JDBCHashMap and transactional context of the call sites.
* Simple checkpoint key value storage in DB.
*/
class DBCheckpointStorage : CheckpointStorage {
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}checkpoints") {
val checkpointId = secureHash("checkpoint_id")
val checkpoint = blob("checkpoint")
}
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints")
class DBCheckpoint(
@Id
@Column(name = "checkpoint_id", length = 64)
var checkpointId: String = "",
private class CheckpointMap : AbstractJDBCHashMap<SecureHash, SerializedBytes<Checkpoint>, Table>(Table, loadOnInit = false) {
override fun keyFromRow(row: ResultRow): SecureHash = row[table.checkpointId]
@Lob
@Column(name = "checkpoint")
var checkpoint: ByteArray = ByteArray(0)
)
override fun valueFromRow(row: ResultRow): SerializedBytes<Checkpoint> = bytesFromBlob(row[table.checkpoint])
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<SecureHash, SerializedBytes<Checkpoint>>, finalizables: MutableList<() -> Unit>) {
insert[table.checkpointId] = entry.key
}
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<SecureHash, SerializedBytes<Checkpoint>>, finalizables: MutableList<() -> Unit>) {
insert[table.checkpoint] = bytesToBlob(entry.value, finalizables)
}
}
private val checkpointStorage = synchronizedMap(CheckpointMap())
override fun addCheckpoint(checkpoint: Checkpoint) {
checkpointStorage.put(checkpoint.id, checkpoint.serialize(context = CHECKPOINT_CONTEXT))
override fun addCheckpoint(value: Checkpoint) {
val session = DatabaseTransactionManager.current().session
session.save(DBCheckpoint().apply {
checkpointId = value.id.toString()
checkpoint = value.serialize(context = SerializationDefaults.CHECKPOINT_CONTEXT).bytes
})
}
override fun removeCheckpoint(checkpoint: Checkpoint) {
checkpointStorage.remove(checkpoint.id) ?: throw IllegalArgumentException("Checkpoint not found")
val session = DatabaseTransactionManager.current().session
val criteriaBuilder = session.criteriaBuilder
val delete = criteriaBuilder.createCriteriaDelete(DBCheckpoint::class.java)
val root = delete.from(DBCheckpoint::class.java)
delete.where(criteriaBuilder.equal(root.get<String>(DBCheckpoint::checkpointId.name), checkpoint.id.toString()))
session.createQuery(delete).executeUpdate()
}
override fun forEach(block: (Checkpoint) -> Boolean) {
synchronized(checkpointStorage) {
for (checkpoint in checkpointStorage.values) {
if (!block(checkpoint.deserialize(context = CHECKPOINT_CONTEXT))) {
break
}
val session = DatabaseTransactionManager.current().session
val criteriaQuery = session.criteriaBuilder.createQuery(DBCheckpoint::class.java)
val root = criteriaQuery.from(DBCheckpoint::class.java)
criteriaQuery.select(root)
val query = session.createQuery(criteriaQuery)
val checkpoints = query.resultList.map { e -> e.checkpoint.deserialize<Checkpoint>(context = SerializationDefaults.CHECKPOINT_CONTEXT) }.asSequence()
for (e in checkpoints) {
if (!block(e)) {
break
}
}
}

View File

@ -1,6 +1,5 @@
package net.corda.node.services.persistence
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.StateMachineRunId
@ -8,59 +7,57 @@ import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.node.services.api.StateMachineRecordedTransactionMappingStorage
import net.corda.node.utilities.*
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
import rx.subjects.PublishSubject
import java.util.*
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.*
/**
* Database storage of a txhash -> state machine id mapping.
*
* Mappings are added as transactions are persisted by [ServiceHub.recordTransaction], and never deleted. Used in the
* RPC API to correlate transaction creation with flows.
*
*/
@ThreadSafe
class DBTransactionMappingStorage : StateMachineRecordedTransactionMappingStorage {
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}transaction_mappings") {
val txId = secureHash("tx_id")
val stateMachineRunId = uuidString("state_machine_run_id")
}
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}transaction_mappings")
class DBTransactionMapping(
@Id
@Column(name = "tx_id", length = 64)
var txId: String = "",
private class TransactionMappingsMap : AbstractJDBCHashMap<SecureHash, StateMachineRunId, Table>(Table, loadOnInit = false) {
override fun keyFromRow(row: ResultRow): SecureHash = row[table.txId]
@Column(name = "state_machine_run_id", length = 36)
var stateMachineRunId: String = ""
)
override fun valueFromRow(row: ResultRow): StateMachineRunId = StateMachineRunId(row[table.stateMachineRunId])
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<SecureHash, StateMachineRunId>, finalizables: MutableList<() -> Unit>) {
insert[table.txId] = entry.key
}
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<SecureHash, StateMachineRunId>, finalizables: MutableList<() -> Unit>) {
insert[table.stateMachineRunId] = entry.value.uuid
}
}
private class InnerState {
val stateMachineTransactionMap = TransactionMappingsMap()
val updates: PublishSubject<StateMachineTransactionMapping> = PublishSubject.create()
}
private val mutex = ThreadBox(InnerState())
override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) {
mutex.locked {
stateMachineTransactionMap[transactionId] = stateMachineRunId
updates.bufferUntilDatabaseCommit().onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId))
}
}
override fun track(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
mutex.locked {
return DataFeed(
stateMachineTransactionMap.map { StateMachineTransactionMapping(it.value, it.key) },
updates.bufferUntilSubscribed().wrapWithDatabaseTransaction()
private companion object {
fun createMap(): AppendOnlyPersistentMap<SecureHash, StateMachineRunId, DBTransactionMapping, String> {
return AppendOnlyPersistentMap(
toPersistentEntityKey = { it.toString() },
fromPersistentEntity = { Pair(SecureHash.parse(it.txId), StateMachineRunId(UUID.fromString(it.stateMachineRunId))) },
toPersistentEntity = { key: SecureHash, value: StateMachineRunId ->
DBTransactionMapping().apply {
txId = key.toString()
stateMachineRunId = value.uuid.toString()
}
},
persistentEntityClass = DBTransactionMapping::class.java
)
}
}
val stateMachineTransactionMap = createMap()
val updates: PublishSubject<StateMachineTransactionMapping> = PublishSubject.create()
override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) {
stateMachineTransactionMap[transactionId] = stateMachineRunId
updates.bufferUntilDatabaseCommit().onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId))
}
override fun track(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> =
DataFeed(stateMachineTransactionMap.allPersisted().map { StateMachineTransactionMapping(it.second, it.first) }.toList(),
updates.bufferUntilSubscribed().wrapWithDatabaseTransaction())
}

View File

@ -4,73 +4,60 @@ import com.google.common.annotations.VisibleForTesting
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.crypto.SecureHash
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.*
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.utilities.*
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.exposedLogger
import org.jetbrains.exposed.sql.statements.InsertStatement
import rx.Observable
import rx.subjects.PublishSubject
import java.util.Collections.synchronizedMap
import javax.persistence.*
class DBTransactionStorage : WritableTransactionStorage, SingletonSerializeAsToken() {
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}transactions") {
val txId = secureHash("tx_id")
val transaction = blob("transaction")
}
private class TransactionsMap : AbstractJDBCHashMap<SecureHash, SignedTransaction, Table>(Table, loadOnInit = false) {
override fun keyFromRow(row: ResultRow): SecureHash = row[table.txId]
@Entity
@Table(name = "${NODE_DATABASE_PREFIX}transactions")
class DBTransaction(
@Id
@Column(name = "tx_id", length = 64)
var txId: String = "",
override fun valueFromRow(row: ResultRow): SignedTransaction = deserializeFromBlob(row[table.transaction])
@Lob
@Column
var transaction: ByteArray = ByteArray(0)
)
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<SecureHash, SignedTransaction>, finalizables: MutableList<() -> Unit>) {
insert[table.txId] = entry.key
}
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<SecureHash, SignedTransaction>, finalizables: MutableList<() -> Unit>) {
insert[table.transaction] = serializeToBlob(entry.value, finalizables)
private companion object {
fun createTransactionsMap(): AppendOnlyPersistentMap<SecureHash, SignedTransaction, DBTransaction, String> {
return AppendOnlyPersistentMap(
toPersistentEntityKey = { it.toString() },
fromPersistentEntity = { Pair(SecureHash.parse(it.txId),
it.transaction.deserialize<SignedTransaction>( context = SerializationDefaults.STORAGE_CONTEXT)) },
toPersistentEntity = { key: SecureHash, value: SignedTransaction ->
DBTransaction().apply {
txId = key.toString()
transaction = value.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
}
},
persistentEntityClass = DBTransaction::class.java
)
}
}
private val txStorage = synchronizedMap(TransactionsMap())
private val txStorage = createTransactionsMap()
override fun addTransaction(transaction: SignedTransaction): Boolean {
val recorded = synchronized(txStorage) {
val old = txStorage[transaction.id]
if (old == null) {
txStorage.put(transaction.id, transaction)
updatesPublisher.bufferUntilDatabaseCommit().onNext(transaction)
true
} else {
false
}
override fun addTransaction(transaction: SignedTransaction): Boolean =
txStorage.addWithDuplicatesAllowed(transaction.id, transaction).apply {
updatesPublisher.bufferUntilDatabaseCommit().onNext(transaction)
}
if (!recorded) {
exposedLogger.warn("Duplicate recording of transaction ${transaction.id}")
}
return recorded
}
override fun getTransaction(id: SecureHash): SignedTransaction? {
synchronized(txStorage) {
return txStorage[id]
}
}
override fun getTransaction(id: SecureHash): SignedTransaction? = txStorage[id]
private val updatesPublisher = PublishSubject.create<SignedTransaction>().toSerialized()
override val updates: Observable<SignedTransaction> = updatesPublisher.wrapWithDatabaseTransaction()
override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {
synchronized(txStorage) {
return DataFeed(txStorage.values.toList(), updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
}
}
override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> =
DataFeed(txStorage.allPersisted().map { it.second }.toList(), updatesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction())
@VisibleForTesting
val transactions: Iterable<SignedTransaction> get() = synchronized(txStorage) {
txStorage.values.toList()
}
val transactions: Iterable<SignedTransaction> get() = txStorage.allPersisted().map { it.second }.toList()
}

View File

@ -10,6 +10,11 @@ import net.corda.core.schemas.QueryableState
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.node.services.api.SchemaService
import net.corda.core.schemas.CommonSchemaV1
import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.vault.VaultSchemaV1
import net.corda.schemas.CashSchemaV1
@ -23,14 +28,25 @@ import net.corda.schemas.CashSchemaV1
*/
class NodeSchemaService(customSchemas: Set<MappedSchema> = emptySet()) : SchemaService, SingletonSerializeAsToken() {
// Currently does not support configuring schema options.
// Entities for compulsory services
object NodeServices
object NodeServicesV1 : MappedSchema(schemaFamily = NodeServices.javaClass, version = 1,
mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java,
DBTransactionStorage.DBTransaction::class.java,
DBTransactionMappingStorage.DBTransactionMapping::class.java,
PersistentKeyManagementService.PersistentKey::class.java,
PersistentUniquenessProvider.PersistentUniqueness::class.java
))
// Required schemas are those used by internal Corda services
// For example, cash is used by the vault for coin selection (but will be extracted as a standalone CorDapp in future)
val requiredSchemas: Map<MappedSchema, SchemaService.SchemaOptions> =
mapOf(Pair(CashSchemaV1, SchemaService.SchemaOptions()),
Pair(CommonSchemaV1, SchemaService.SchemaOptions()),
Pair(VaultSchemaV1, SchemaService.SchemaOptions()))
Pair(VaultSchemaV1, SchemaService.SchemaOptions()),
Pair(NodeServicesV1, SchemaService.SchemaOptions()))
override val schemaOptions: Map<MappedSchema, SchemaService.SchemaOptions> = requiredSchemas.plus(customSchemas.map {
mappedSchema -> Pair(mappedSchema, SchemaService.SchemaOptions())

View File

@ -1,82 +1,110 @@
package net.corda.node.services.transactions
import net.corda.core.internal.ThreadBox
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.parsePublicKeyBase58
import net.corda.core.identity.Party
import net.corda.core.internal.ThreadBox
import net.corda.core.node.services.UniquenessException
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.loggerFor
import net.corda.node.utilities.*
import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
import java.io.Serializable
import java.util.*
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.*
/** A RDBMS backed Uniqueness provider */
@ThreadSafe
class PersistentUniquenessProvider : UniquenessProvider, SingletonSerializeAsToken() {
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_commit_log")
class PersistentUniqueness (
@EmbeddedId
var id: StateRef = StateRef(),
@Column(name = "consuming_transaction_id")
var consumingTxHash: String = "",
@Column(name = "consuming_input_index", length = 36)
var consumingIndex: Int = 0,
@Embedded
var party: Party = Party()
) {
@Embeddable
data class StateRef (
@Column(name = "transaction_id")
var txId: String = "",
@Column(name = "output_index", length = 36)
var index: Int = 0
) : Serializable
@Embeddable
data class Party (
@Column(name = "requesting_party_name")
var name: String = "",
@Column(name = "requesting_party_key", length = 255)
var owningKey: String = ""
) : Serializable
}
private class InnerState {
val committedStates = createMap()
}
private val mutex = ThreadBox(InnerState())
companion object {
private val TABLE_NAME = "${NODE_DATABASE_PREFIX}notary_commit_log"
private val log = loggerFor<PersistentUniquenessProvider>()
}
/**
* For each input state store the consuming transaction information.
*/
private object Table : JDBCHashedTable(TABLE_NAME) {
val output = stateRef("transaction_id", "output_index")
val consumingTxHash = secureHash("consuming_transaction_id")
val consumingIndex = integer("consuming_input_index")
val requestingParty = party("requesting_party_name", "requesting_party_key")
}
private val committedStates = ThreadBox(object : AbstractJDBCHashMap<StateRef, UniquenessProvider.ConsumingTx, Table>(Table, loadOnInit = false) {
override fun keyFromRow(row: ResultRow): StateRef = StateRef(row[table.output.txId], row[table.output.index])
override fun valueFromRow(row: ResultRow): UniquenessProvider.ConsumingTx = UniquenessProvider.ConsumingTx(
row[table.consumingTxHash],
row[table.consumingIndex],
Party(X500Name(row[table.requestingParty.name]), row[table.requestingParty.owningKey])
)
override fun addKeyToInsert(insert: InsertStatement,
entry: Map.Entry<StateRef, UniquenessProvider.ConsumingTx>,
finalizables: MutableList<() -> Unit>) {
insert[table.output.txId] = entry.key.txhash
insert[table.output.index] = entry.key.index
fun createMap(): AppendOnlyPersistentMap<StateRef, UniquenessProvider.ConsumingTx, PersistentUniqueness, PersistentUniqueness.StateRef> {
return AppendOnlyPersistentMap(
toPersistentEntityKey = { PersistentUniqueness.StateRef(it.txhash.toString(), it.index) },
fromPersistentEntity = {
Pair(StateRef(SecureHash.parse(it.id.txId), it.id.index),
UniquenessProvider.ConsumingTx(SecureHash.parse(it.consumingTxHash), it.consumingIndex,
Party(X500Name(it.party.name), parsePublicKeyBase58(it.party.owningKey))))
},
toPersistentEntity = { key: StateRef, value: UniquenessProvider.ConsumingTx ->
PersistentUniqueness().apply {
id = PersistentUniqueness.StateRef(key.txhash.toString(), key.index)
consumingTxHash = value.id.toString()
consumingIndex = value.inputIndex
party = PersistentUniqueness.Party(value.requestingParty.name.toString())
}
},
persistentEntityClass = PersistentUniqueness::class.java
)
}
override fun addValueToInsert(insert: InsertStatement,
entry: Map.Entry<StateRef, UniquenessProvider.ConsumingTx>,
finalizables: MutableList<() -> Unit>) {
insert[table.consumingTxHash] = entry.value.id
insert[table.consumingIndex] = entry.value.inputIndex
insert[table.requestingParty.name] = entry.value.requestingParty.name.toString()
insert[table.requestingParty.owningKey] = entry.value.requestingParty.owningKey
}
})
}
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
val conflict = committedStates.locked {
val conflictingStates = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
for (inputState in states) {
val consumingTx = get(inputState)
if (consumingTx != null) conflictingStates[inputState] = consumingTx
}
if (conflictingStates.isNotEmpty()) {
log.debug("Failure, input states already committed: ${conflictingStates.keys}")
UniquenessProvider.Conflict(conflictingStates)
} else {
states.forEachIndexed { i, stateRef ->
put(stateRef, UniquenessProvider.ConsumingTx(txId, i, callerIdentity))
val conflict = mutex.locked {
val conflictingStates = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
for (inputState in states) {
val consumingTx = committedStates.get(inputState)
if (consumingTx != null) conflictingStates[inputState] = consumingTx
}
if (conflictingStates.isNotEmpty()) {
log.debug("Failure, input states already committed: ${conflictingStates.keys}")
UniquenessProvider.Conflict(conflictingStates)
} else {
states.forEachIndexed { i, stateRef ->
committedStates[stateRef] = UniquenessProvider.ConsumingTx(txId, i, callerIdentity)
}
log.debug("Successfully committed all input states: $states")
null
}
}
log.debug("Successfully committed all input states: $states")
null
}
}
if (conflict != null) throw UniquenessException(conflict)
}

View File

@ -0,0 +1,113 @@
package net.corda.node.utilities
import net.corda.core.utilities.loggerFor
import java.util.*
/**
* Implements a caching layer on top of an *append-only* table accessed via Hibernate mapping. Note that if the same key is [put] twice the
* behaviour is unpredictable! There is a best-effort check for double inserts, but this should *not* be relied on, so
* ONLY USE THIS IF YOUR TABLE IS APPEND-ONLY
*/
class AppendOnlyPersistentMap<K, V, E, EK> (
val toPersistentEntityKey: (K) -> EK,
val fromPersistentEntity: (E) -> Pair<K,V>,
val toPersistentEntity: (key: K, value: V) -> E,
val persistentEntityClass: Class<E>,
cacheBound: Long = 1024
) { //TODO determine cacheBound based on entity class later or with node config allowing tuning, or using some heuristic based on heap size
private companion object {
val log = loggerFor<AppendOnlyPersistentMap<*, *, *, *>>()
}
private val cache = NonInvalidatingCache<K, Optional<V>>(
bound = cacheBound,
concurrencyLevel = 8,
loadFunction = { key -> Optional.ofNullable(loadValue(key)) }
)
/**
* Returns the value associated with the key, first loading that value from the storage if necessary.
*/
operator fun get(key: K): V? {
return cache.get(key).orElse(null)
}
/**
* Returns all key/value pairs from the underlying storage.
*/
fun allPersisted(): Sequence<Pair<K, V>> {
val criteriaQuery = DatabaseTransactionManager.current().session.criteriaBuilder.createQuery(persistentEntityClass)
val root = criteriaQuery.from(persistentEntityClass)
criteriaQuery.select(root)
val query = DatabaseTransactionManager.current().session.createQuery(criteriaQuery)
val result = query.resultList
return result.map { x -> fromPersistentEntity(x) }.asSequence()
}
private tailrec fun set(key: K, value: V, logWarning: Boolean = true, store: (K,V) -> V?): Boolean {
var insertionAttempt = false
var isUnique = true
val existingInCache = cache.get(key) { // Thread safe, if multiple threads may wait until the first one has loaded.
insertionAttempt = true
// Key wasn't in the cache and might be in the underlying storage.
// Depending on 'store' method, this may insert without checking key duplication or it may avoid inserting a duplicated key.
val existingInDb = store(key, value)
if (existingInDb != null) { // Always reuse an existing value from the storage of a duplicated key.
Optional.of(existingInDb)
} else {
Optional.of(value)
}
}
if (!insertionAttempt) {
if (existingInCache.isPresent) {
// Key already exists in cache, do nothing.
isUnique = false
} else {
// This happens when the key was queried before with no value associated. We invalidate the cached null
// value and recursively call set again. This is to avoid race conditions where another thread queries after
// the invalidate but before the set.
cache.invalidate(key)
return set(key, value, logWarning, store)
}
}
if (logWarning && !isUnique) {
log.warn("Double insert in ${this.javaClass.name} for entity class $persistentEntityClass key $key, not inserting the second time")
}
return isUnique
}
/**
* Puts the value into the map and the underlying storage.
* Inserting the duplicated key may be unpredictable.
*/
operator fun set(key: K, value: V) =
set(key, value, logWarning = false) {
key,value -> DatabaseTransactionManager.current().session.save(toPersistentEntity(key,value))
null
}
/**
* Puts the value into the map and underlying storage.
* Duplicated key is not added into the map and underlying storage.
* @return true if added key was unique, otherwise false
*/
fun addWithDuplicatesAllowed(key: K, value: V): Boolean =
set(key, value) {
key, value ->
val existingEntry = DatabaseTransactionManager.current().session.find(persistentEntityClass, toPersistentEntityKey(key))
if (existingEntry == null) {
DatabaseTransactionManager.current().session.save(toPersistentEntity(key,value))
null
} else {
fromPersistentEntity(existingEntry).second
}
}
private fun loadValue(key: K): V? {
val result = DatabaseTransactionManager.current().session.find(persistentEntityClass, toPersistentEntityKey(key))
return result?.let(fromPersistentEntity)?.second
}
}

View File

@ -2,6 +2,11 @@ package net.corda.node.utilities
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import net.corda.core.node.services.IdentityService
import net.corda.core.schemas.MappedSchema
import net.corda.node.services.database.HibernateConfiguration
import net.corda.node.services.schema.NodeSchemaService
import org.hibernate.SessionFactory
import org.jetbrains.exposed.sql.Database
import rx.Observable
@ -15,15 +20,21 @@ import java.util.concurrent.CopyOnWriteArrayList
//HikariDataSource implements Closeable which allows CordaPersistence to be Closeable
class CordaPersistence(var dataSource: HikariDataSource, databaseProperties: Properties): Closeable {
class CordaPersistence(var dataSource: HikariDataSource, var nodeSchemaService: NodeSchemaService, val identitySvc: ()-> IdentityService, databaseProperties: Properties): Closeable {
/** Holds Exposed database, the field will be removed once Exposed library is removed */
lateinit var database: Database
var transactionIsolationLevel = parserTransactionIsolationLevel(databaseProperties.getProperty("transactionIsolationLevel"))
val entityManagerFactory: SessionFactory by lazy(LazyThreadSafetyMode.NONE) {
transaction {
HibernateConfiguration(nodeSchemaService, databaseProperties, identitySvc).sessionFactoryForRegisteredSchemas()
}
}
companion object {
fun connect(dataSource: HikariDataSource, databaseProperties: Properties): CordaPersistence {
return CordaPersistence(dataSource, databaseProperties).apply {
fun connect(dataSource: HikariDataSource, nodeSchemaService: NodeSchemaService, identitySvc: () -> IdentityService, databaseProperties: Properties): CordaPersistence {
return CordaPersistence(dataSource, nodeSchemaService, identitySvc, databaseProperties).apply {
DatabaseTransactionManager(this)
}
}
@ -89,10 +100,10 @@ class CordaPersistence(var dataSource: HikariDataSource, databaseProperties: Pro
}
}
fun configureDatabase(dataSourceProperties: Properties, databaseProperties: Properties?): CordaPersistence {
fun configureDatabase(dataSourceProperties: Properties, databaseProperties: Properties?, entitySchemas: Set<MappedSchema> = emptySet<MappedSchema>(), identitySvc: ()-> IdentityService): CordaPersistence {
val config = HikariConfig(dataSourceProperties)
val dataSource = HikariDataSource(config)
val persistence = CordaPersistence.connect(dataSource, databaseProperties ?: Properties())
val persistence = CordaPersistence.connect(dataSource, NodeSchemaService(entitySchemas), identitySvc, databaseProperties ?: Properties())
//org.jetbrains.exposed.sql.Database will be removed once Exposed library is removed
val database = Database.connect(dataSource) { _ -> ExposedTransactionManager() }
@ -156,7 +167,7 @@ private class DatabaseTransactionWrappingSubscriber<U>(val db: CordaPersistence?
}
// A subscriber that wraps another but does not pass on observations to it.
private class NoOpSubscriber<U>(t: Subscriber<in U>) : Subscriber<U>(t) {
private class NoOpSubscriber<U>(t: Subscriber<in U>): Subscriber<U>(t) {
override fun onCompleted() {
}
@ -191,15 +202,14 @@ fun <T : Any> rx.Observable<T>.wrapWithDatabaseTransaction(db: CordaPersistence?
}
}
fun parserTransactionIsolationLevel(property: String?) : Int =
when (property) {
"none" -> Connection.TRANSACTION_NONE
"readUncommitted" -> Connection.TRANSACTION_READ_UNCOMMITTED
"readCommitted" -> Connection.TRANSACTION_READ_COMMITTED
"repeatableRead" -> Connection.TRANSACTION_REPEATABLE_READ
"serializable" -> Connection.TRANSACTION_SERIALIZABLE
else -> {
Connection.TRANSACTION_REPEATABLE_READ
fun parserTransactionIsolationLevel(property: String?): Int =
when (property) {
"none" -> Connection.TRANSACTION_NONE
"readUncommitted" -> Connection.TRANSACTION_READ_UNCOMMITTED
"readCommitted" -> Connection.TRANSACTION_READ_COMMITTED
"repeatableRead" -> Connection.TRANSACTION_REPEATABLE_READ
"serializable" -> Connection.TRANSACTION_SERIALIZABLE
else -> {
Connection.TRANSACTION_REPEATABLE_READ
}
}
}

View File

@ -1,6 +1,8 @@
package net.corda.node.utilities
import co.paralleluniverse.strands.Strand
import org.hibernate.Session
import org.hibernate.Transaction
import rx.subjects.PublishSubject
import rx.subjects.Subject
import java.sql.Connection
@ -21,13 +23,28 @@ class DatabaseTransaction(isolation: Int, val threadLocal: ThreadLocal<DatabaseT
}
}
private val sessionDelegate = lazy {
val session = cordaPersistence.entityManagerFactory.withOptions().connection(connection).openSession()
hibernateTransaction = session.beginTransaction()
session
}
val session: Session by sessionDelegate
private lateinit var hibernateTransaction : Transaction
val outerTransaction: DatabaseTransaction? = threadLocal.get()
fun commit() {
if (sessionDelegate.isInitialized()) {
hibernateTransaction.commit()
}
connection.commit()
}
fun rollback() {
if (sessionDelegate.isInitialized() && session.isOpen) {
session.clear()
}
if (!connection.isClosed) {
connection.rollback()
}

View File

@ -0,0 +1,33 @@
package net.corda.node.utilities
import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.google.common.cache.LoadingCache
import com.google.common.util.concurrent.ListenableFuture
class NonInvalidatingCache<K, V> private constructor(
val cache: LoadingCache<K, V>
): LoadingCache<K, V> by cache {
constructor(bound: Long, concurrencyLevel: Int, loadFunction: (K) -> V) :
this(buildCache(bound, concurrencyLevel, loadFunction))
private companion object {
private fun <K, V> buildCache(bound: Long, concurrencyLevel: Int, loadFunction: (K) -> V): LoadingCache<K, V> {
val builder = CacheBuilder.newBuilder().maximumSize(bound).concurrencyLevel(concurrencyLevel)
return builder.build(NonInvalidatingCacheLoader(loadFunction))
}
}
// TODO look into overriding loadAll() if we ever use it
private class NonInvalidatingCacheLoader<K, V>(val loadFunction: (K) -> V) : CacheLoader<K, V>() {
override fun reload(key: K, oldValue: V): ListenableFuture<V> {
throw IllegalStateException("Non invalidating cache refreshed")
}
override fun load(key: K) = loadFunction(key)
override fun loadAll(keys: Iterable<K>): MutableMap<K, V> {
return super.loadAll(keys)
}
}
}

View File

@ -10,7 +10,6 @@ import net.corda.core.schemas.CommonSchemaV1
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.deserialize
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.vault.VaultSchemaV1
@ -27,6 +26,7 @@ import net.corda.testing.contracts.fillWithSomeTestLinearStates
import net.corda.testing.node.MockServices
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.node.makeTestDatabaseProperties
import net.corda.testing.node.makeTestIdentityService
import net.corda.testing.schemas.DummyLinearStateSchemaV1
import net.corda.testing.schemas.DummyLinearStateSchemaV2
import org.assertj.core.api.Assertions
@ -65,11 +65,10 @@ class HibernateConfigurationTest : TestDependencyInjectionBase() {
issuerServices = MockServices(DUMMY_CASH_ISSUER_KEY, BOB_KEY, BOC_KEY)
val dataSourceProps = makeTestDataSourceProperties()
val defaultDatabaseProperties = makeTestDatabaseProperties()
database = configureDatabase(dataSourceProps, defaultDatabaseProperties)
database = configureDatabase(dataSourceProps, defaultDatabaseProperties, identitySvc = ::makeTestIdentityService)
val customSchemas = setOf(VaultSchemaV1, CashSchemaV1, SampleCashSchemaV2, SampleCashSchemaV3)
database.transaction {
val identityService = InMemoryIdentityService(MOCK_IDENTITIES, trustRoot = DUMMY_CA.certificate)
hibernateConfig = HibernateConfiguration(NodeSchemaService(customSchemas), makeTestDatabaseProperties(), identityService)
hibernateConfig = HibernateConfiguration(NodeSchemaService(customSchemas), makeTestDatabaseProperties(), ::makeTestIdentityService)
services = object : MockServices(BOB_KEY, BOC_KEY, DUMMY_NOTARY_KEY) {
override val vaultService: VaultService = makeVaultService(dataSourceProps, hibernateConfig)

View File

@ -17,13 +17,11 @@ import net.corda.node.services.vault.schemas.requery.VaultSchema
import net.corda.node.services.vault.schemas.requery.VaultStatesEntity
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.testing.ALICE_PUBKEY
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.DUMMY_PUBKEY_1
import net.corda.testing.TestDependencyInjectionBase
import net.corda.testing.*
import net.corda.testing.contracts.DummyContract
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.node.makeTestDatabaseProperties
import net.corda.testing.node.makeTestIdentityService
import org.assertj.core.api.Assertions
import org.junit.After
import org.junit.Assert.assertEquals
@ -42,7 +40,7 @@ class RequeryConfigurationTest : TestDependencyInjectionBase() {
@Before
fun setUp() {
val dataSourceProperties = makeTestDataSourceProperties()
database = configureDatabase(dataSourceProperties, makeTestDatabaseProperties())
database = configureDatabase(dataSourceProperties, makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
newTransactionStorage()
newRequeryStorage(dataSourceProperties)
}

View File

@ -10,9 +10,6 @@ import net.corda.core.node.ServiceHub
import net.corda.core.node.services.VaultService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.TransactionBuilder
import net.corda.testing.ALICE_KEY
import net.corda.testing.DUMMY_CA
import net.corda.testing.DUMMY_NOTARY
import net.corda.node.services.MockServiceHubInternal
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.persistence.DBCheckpointStorage
@ -22,14 +19,11 @@ import net.corda.node.services.vault.NodeVaultService
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.testing.*
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.MockKeyManagementService
import net.corda.testing.getTestX509Name
import net.corda.testing.testNodeConfiguration
import net.corda.testing.initialiseTestSerialization
import net.corda.testing.node.*
import net.corda.testing.node.TestClock
import net.corda.testing.resetTestSerialization
import org.assertj.core.api.Assertions.assertThat
import org.bouncycastle.asn1.x500.X500Name
import org.junit.After
@ -77,7 +71,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
smmHasRemovedAllFlows = CountDownLatch(1)
calls = 0
val dataSourceProps = makeTestDataSourceProperties()
database = configureDatabase(dataSourceProps, makeTestDatabaseProperties())
database = configureDatabase(dataSourceProps, makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
val identityService = InMemoryIdentityService(trustRoot = DUMMY_CA.certificate)
val kms = MockKeyManagementService(identityService, ALICE_KEY)

View File

@ -13,6 +13,7 @@ import net.corda.node.services.api.DEFAULT_SESSION_ID
import net.corda.node.services.api.MonitoringService
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.network.InMemoryNetworkMapCache
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.PersistentUniquenessProvider
@ -23,6 +24,7 @@ import net.corda.testing.*
import net.corda.testing.node.MOCK_VERSION_INFO
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.node.makeTestDatabaseProperties
import net.corda.testing.node.makeTestIdentityService
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
@ -69,7 +71,7 @@ class ArtemisMessagingTests : TestDependencyInjectionBase() {
baseDirectory = baseDirectory,
myLegalName = ALICE.name)
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties())
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
networkMapRegistrationFuture = doneFuture(Unit)
}

View File

@ -11,8 +11,8 @@ import net.corda.testing.LogHelper
import net.corda.testing.TestDependencyInjectionBase
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.node.makeTestDatabaseProperties
import net.corda.testing.node.makeTestIdentityService
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Before
import org.junit.Test
@ -33,7 +33,7 @@ class DBCheckpointStorageTests : TestDependencyInjectionBase() {
@Before
fun setUp() {
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties())
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
newCheckpointStorage()
}
@ -94,16 +94,6 @@ class DBCheckpointStorageTests : TestDependencyInjectionBase() {
}
}
@Test
fun `remove unknown checkpoint`() {
val checkpoint = newCheckpoint()
database.transaction {
assertThatExceptionOfType(IllegalArgumentException::class.java).isThrownBy {
checkpointStorage.removeCheckpoint(checkpoint)
}
}
}
@Test
fun `add two checkpoints then remove first one`() {
val firstCheckpoint = newCheckpoint()

View File

@ -4,19 +4,28 @@ import net.corda.core.contracts.StateRef
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.node.services.VaultService
import net.corda.core.crypto.TransactionSignature
import net.corda.core.schemas.MappedSchema
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.node.services.database.HibernateConfiguration
import net.corda.node.services.schema.HibernateObserver
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.vault.NodeVaultService
import net.corda.node.services.vault.VaultSchemaV1
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.testing.ALICE_PUBKEY
import net.corda.testing.DUMMY_NOTARY
import net.corda.testing.LogHelper
import net.corda.testing.TestDependencyInjectionBase
import net.corda.schemas.CashSchemaV1
import net.corda.schemas.SampleCashSchemaV2
import net.corda.schemas.SampleCashSchemaV3
import net.corda.testing.*
import net.corda.testing.node.MockServices
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.node.makeTestDatabaseProperties
import net.corda.testing.node.makeTestIdentityService
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
@ -27,11 +36,43 @@ import kotlin.test.assertEquals
class DBTransactionStorageTests : TestDependencyInjectionBase() {
lateinit var database: CordaPersistence
lateinit var transactionStorage: DBTransactionStorage
lateinit var services: MockServices
val vault: VaultService get() = services.vaultService
// Hibernate configuration objects
lateinit var hibernateConfig: HibernateConfiguration
@Before
fun setUp() {
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties())
val dataSourceProps = makeTestDataSourceProperties()
val transactionSchema = MappedSchema(schemaFamily = javaClass, version = 1,
mappedTypes = listOf(DBTransactionStorage.DBTransaction::class.java))
val customSchemas = setOf(VaultSchemaV1, CashSchemaV1, SampleCashSchemaV2, SampleCashSchemaV3, transactionSchema)
database = configureDatabase(dataSourceProps, makeTestDatabaseProperties(), customSchemas, identitySvc = ::makeTestIdentityService)
database.transaction {
hibernateConfig = HibernateConfiguration(NodeSchemaService(customSchemas), makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
services = object : MockServices(BOB_KEY) {
override val vaultService: VaultService get() {
val vaultService = NodeVaultService(this, dataSourceProps, makeTestDatabaseProperties())
hibernatePersister = HibernateObserver(vaultService.rawUpdates, hibernateConfig)
return vaultService
}
override fun recordTransactions(txs: Iterable<SignedTransaction>) {
for (stx in txs) {
validatedTransactions.addTransaction(stx)
}
// Refactored to use notifyAll() as we have no other unit test for that method with multiple transactions.
vaultService.notifyAll(txs.map { it.tx })
}
}
}
newTransactionStorage()
}
@ -120,6 +161,37 @@ class DBTransactionStorageTests : TestDependencyInjectionBase() {
}
}
@Test
fun `transaction saved twice in same DB transaction scope`() {
val firstTransaction = newTransaction()
database.transaction {
transactionStorage.addTransaction(firstTransaction)
transactionStorage.addTransaction(firstTransaction)
}
assertTransactionIsRetrievable(firstTransaction)
database.transaction {
assertThat(transactionStorage.transactions).containsOnly(firstTransaction)
}
}
@Test
fun `transaction saved twice in two DB transaction scopes`() {
val firstTransaction = newTransaction()
val secondTransaction = newTransaction()
database.transaction {
transactionStorage.addTransaction(firstTransaction)
}
database.transaction {
transactionStorage.addTransaction(secondTransaction)
transactionStorage.addTransaction(firstTransaction)
}
assertTransactionIsRetrievable(firstTransaction)
database.transaction {
assertThat(transactionStorage.transactions).containsOnly(firstTransaction, secondTransaction)
}
}
@Test
fun `updates are fired`() {
val future = transactionStorage.updates.toFuture()

View File

@ -17,6 +17,7 @@ import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.node.makeTestDatabaseProperties
import net.corda.testing.node.makeTestIdentityService
import org.junit.After
import org.junit.Before
import org.junit.Test
@ -43,7 +44,7 @@ class NodeAttachmentStorageTest {
LogHelper.setLevel(PersistentUniquenessProvider::class)
dataSourceProperties = makeTestDataSourceProperties()
database = configureDatabase(dataSourceProperties, makeTestDatabaseProperties())
database = configureDatabase(dataSourceProperties, makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
configuration = RequeryConfiguration(dataSourceProperties, databaseProperties = makeTestDatabaseProperties())
fs = Jimfs.newFileSystem(Configuration.unix())

View File

@ -18,6 +18,7 @@ import net.corda.testing.MEGA_CORP
import net.corda.testing.MOCK_IDENTITIES
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.node.makeTestDatabaseProperties
import net.corda.testing.node.makeTestIdentityService
import org.hibernate.annotations.Cascade
import org.hibernate.annotations.CascadeType
import org.jetbrains.exposed.sql.transactions.TransactionManager
@ -35,7 +36,7 @@ class HibernateObserverTests {
@Before
fun setUp() {
LogHelper.setLevel(HibernateObserver::class)
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties())
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
}
@After
@ -105,8 +106,7 @@ class HibernateObserverTests {
}
@Suppress("UNUSED_VARIABLE")
val identityService = InMemoryIdentityService(MOCK_IDENTITIES, trustRoot = DUMMY_CA.certificate)
val observer = HibernateObserver(rawUpdatesPublisher, HibernateConfiguration(schemaService, makeTestDatabaseProperties(), identityService))
val observer = HibernateObserver(rawUpdatesPublisher, HibernateConfiguration(schemaService, makeTestDatabaseProperties(), ::makeTestIdentityService))
database.transaction {
rawUpdatesPublisher.onNext(Vault.Update(emptySet(), setOf(StateAndRef(TransactionState(TestState(), MEGA_CORP), StateRef(SecureHash.sha256("dummy"), 0)))))
val parentRowCountResult = TransactionManager.current().connection.prepareStatement("select count(*) from Parents").executeQuery()

View File

@ -11,11 +11,10 @@ import net.corda.core.utilities.getOrThrow
import net.corda.node.services.network.NetworkMapService
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.testing.LogHelper
import net.corda.testing.TestDependencyInjectionBase
import net.corda.testing.freeLocalHostAndPort
import net.corda.testing.*
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.node.makeTestDatabaseProperties
import net.corda.testing.node.makeTestIdentityService
import org.jetbrains.exposed.sql.Transaction
import org.junit.After
import org.junit.Before
@ -35,7 +34,7 @@ class DistributedImmutableMapTests : TestDependencyInjectionBase() {
fun setup() {
LogHelper.setLevel("-org.apache.activemq")
LogHelper.setLevel(NetworkMapService::class)
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties())
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
cluster = setUpCluster()
}

View File

@ -4,12 +4,10 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.UniquenessException
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.testing.LogHelper
import net.corda.testing.MEGA_CORP
import net.corda.testing.TestDependencyInjectionBase
import net.corda.testing.generateStateRef
import net.corda.testing.*
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.node.makeTestDatabaseProperties
import net.corda.testing.node.makeTestIdentityService
import org.junit.After
import org.junit.Before
import org.junit.Test
@ -25,7 +23,7 @@ class PersistentUniquenessProviderTests : TestDependencyInjectionBase() {
@Before
fun setUp() {
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties())
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
}
@After

View File

@ -15,13 +15,9 @@ import net.corda.core.node.services.*
import net.corda.core.node.services.vault.*
import net.corda.core.node.services.vault.QueryCriteria.*
import net.corda.core.utilities.seconds
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.toHexString
import net.corda.node.services.database.HibernateConfiguration
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.schema.NodeSchemaService
import net.corda.core.utilities.*
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
@ -34,6 +30,7 @@ import net.corda.testing.contracts.*
import net.corda.testing.node.MockServices
import net.corda.testing.node.makeTestDatabaseAndMockServices
import net.corda.testing.node.makeTestDatabaseProperties
import net.corda.testing.node.makeTestIdentityService
import net.corda.testing.schemas.DummyLinearStateSchemaV1
import org.assertj.core.api.Assertions
import org.assertj.core.api.Assertions.assertThat
@ -77,7 +74,7 @@ class VaultQueryTests : TestDependencyInjectionBase() {
@Ignore
@Test
fun createPersistentTestDb() {
val database = configureDatabase(makePersistentDataSourceProperties(), makeTestDatabaseProperties())
val database = configureDatabase(makePersistentDataSourceProperties(), makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
setUpDb(database, 5000)

View File

@ -5,6 +5,7 @@ import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.tee
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.node.makeTestDatabaseProperties
import net.corda.testing.node.makeTestIdentityService
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Test
@ -20,7 +21,7 @@ class ObservablesTests {
val toBeClosed = mutableListOf<Closeable>()
fun createDatabase(): CordaPersistence {
val database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties())
val database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
toBeClosed += database
return database
}

View File

@ -18,10 +18,7 @@ import net.corda.irs.flows.RatesFixFlow
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.testing.*
import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockServices
import net.corda.testing.node.makeTestDataSourceProperties
import net.corda.testing.node.makeTestDatabaseProperties
import net.corda.testing.node.*
import org.bouncycastle.asn1.x500.X500Name
import org.junit.After
import org.junit.Assert
@ -60,7 +57,7 @@ class NodeInterestRatesTest : TestDependencyInjectionBase() {
@Before
fun setUp() {
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties())
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), identitySvc = ::makeTestIdentityService)
database.transaction {
oracle = NodeInterestRates.Oracle(
MEGA_CORP,

View File

@ -88,7 +88,7 @@ open class MockServices(vararg val keys: KeyPair) : ServiceHub {
lateinit var hibernatePersister: HibernateObserver
fun makeVaultService(dataSourceProps: Properties, hibernateConfig: HibernateConfiguration = HibernateConfiguration(NodeSchemaService(), makeTestDatabaseProperties(), identityService)): VaultService {
fun makeVaultService(dataSourceProps: Properties, hibernateConfig: HibernateConfiguration = HibernateConfiguration(NodeSchemaService(), makeTestDatabaseProperties(), { identityService })): VaultService {
val vaultService = NodeVaultService(this, dataSourceProps, makeTestDatabaseProperties())
hibernatePersister = HibernateObserver(vaultService.rawUpdates, hibernateConfig)
return vaultService
@ -216,13 +216,15 @@ fun makeTestDatabaseProperties(): Properties {
return props
}
fun makeTestIdentityService() = InMemoryIdentityService(MOCK_IDENTITIES, trustRoot = DUMMY_CA.certificate)
fun makeTestDatabaseAndMockServices(customSchemas: Set<MappedSchema> = setOf(CommercialPaperSchemaV1, DummyLinearStateSchemaV1, CashSchemaV1), keys: List<KeyPair> = listOf(MEGA_CORP_KEY)): Pair<CordaPersistence, MockServices> {
val dataSourceProps = makeTestDataSourceProperties()
val databaseProperties = makeTestDatabaseProperties()
val database = configureDatabase(dataSourceProps, databaseProperties)
val database = configureDatabase(dataSourceProps, databaseProperties, identitySvc = ::makeTestIdentityService)
val mockService = database.transaction {
val identityService = InMemoryIdentityService(MOCK_IDENTITIES, trustRoot = DUMMY_CA.certificate)
val hibernateConfig = HibernateConfiguration(NodeSchemaService(customSchemas), databaseProperties, identityService)
val hibernateConfig = HibernateConfiguration(NodeSchemaService(customSchemas), databaseProperties, identitySvc = ::makeTestIdentityService)
object : MockServices(*(keys.toTypedArray())) {
override val vaultService: VaultService = makeVaultService(dataSourceProps, hibernateConfig)

View File

@ -32,11 +32,11 @@ class SimpleNode(val config: NodeConfiguration, val address: NetworkHostAndPort
rpcAddress: NetworkHostAndPort = freeLocalHostAndPort(),
trustRoot: X509Certificate) : AutoCloseable {
val database: CordaPersistence = configureDatabase(config.dataSourceProperties, config.database)
val userService = RPCUserServiceImpl(config.rpcUsers)
val monitoringService = MonitoringService(MetricRegistry())
val identity: KeyPair = generateKeyPair()
val identityService: IdentityService = InMemoryIdentityService(trustRoot = trustRoot)
val database: CordaPersistence = configureDatabase(config.dataSourceProperties, config.database, identitySvc = {InMemoryIdentityService(trustRoot = trustRoot)})
val keyService: KeyManagementService = E2ETestKeyManagementService(identityService, setOf(identity))
val executor = ServiceAffinityExecutor(config.myLegalName.commonName, 1)
// TODO: We should have a dummy service hub rather than change behaviour in tests