From 50e613bb75bd493346fd7e11e0c027632309fa2b Mon Sep 17 00:00:00 2001 From: "rick.parker" Date: Wed, 5 Oct 2016 16:58:52 +0100 Subject: [PATCH] Clean up compiler warning and make database table names and columns more meaningful. --- .../keys/PersistentKeyManagementService.kt | 24 ++++- .../services/messaging/NodeMessagingClient.kt | 18 +++- .../network/PersistentNetworkMapService.kt | 27 +++++- .../persistence/DBCheckpointStorage.kt | 26 +++++- .../node/services/schema/HibernateObserver.kt | 22 ++--- .../PersistentUniquenessProvider.kt | 39 +++++++- .../node/services/vault/NodeVaultService.kt | 19 ++-- .../r3corda/node/utilities/DatabaseSupport.kt | 93 ++++++++++++++++++- .../com/r3corda/node/utilities/JDBCHashMap.kt | 73 ++++++++------- .../r3corda/demos/api/NodeInterestRates.kt | 27 +++++- 10 files changed, 290 insertions(+), 78 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/services/keys/PersistentKeyManagementService.kt b/node/src/main/kotlin/com/r3corda/node/services/keys/PersistentKeyManagementService.kt index e867d864d2..a43e957ef6 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/keys/PersistentKeyManagementService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/keys/PersistentKeyManagementService.kt @@ -4,7 +4,9 @@ import com.r3corda.core.ThreadBox import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.node.services.KeyManagementService import com.r3corda.core.serialization.SingletonSerializeAsToken -import com.r3corda.node.utilities.JDBCHashMap +import com.r3corda.node.utilities.* +import org.jetbrains.exposed.sql.ResultRow +import org.jetbrains.exposed.sql.statements.InsertStatement import java.security.KeyPair import java.security.PrivateKey import java.security.PublicKey @@ -18,8 +20,26 @@ import java.util.* * This class needs database transactions to be in-flight during method calls and init. */ class PersistentKeyManagementService(initialKeys: Set) : SingletonSerializeAsToken(), KeyManagementService { + + private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}our_key_pairs") { + val publicKey = publicKey("public_key") + val privateKey = blob("private_key") + } + private class InnerState { - val keys = JDBCHashMap("key_pairs", loadOnInit = false) + val keys = object : AbstractJDBCHashMap(Table, loadOnInit = false) { + override fun keyFromRow(row: ResultRow): PublicKey = row[table.publicKey] + + override fun valueFromRow(row: ResultRow): PrivateKey = deserializeFromBlob(row[table.privateKey]) + + override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { + insert[table.publicKey] = entry.key + } + + override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { + insert[table.privateKey] = serializeToBlob(entry.value, finalizables) + } + } } private val mutex = ThreadBox(InnerState()) diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt index f8869f75fe..778d06e404 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt @@ -9,11 +9,12 @@ import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.trace import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.config.NodeConfiguration -import com.r3corda.node.utilities.AffinityExecutor -import com.r3corda.node.utilities.JDBCHashSet +import com.r3corda.node.utilities.* import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.* +import org.jetbrains.exposed.sql.ResultRow +import org.jetbrains.exposed.sql.statements.InsertStatement import java.nio.file.FileSystems import java.security.PublicKey import java.time.Instant @@ -102,8 +103,19 @@ class NodeMessagingClient(config: NodeConfiguration, private val state = ThreadBox(InnerState()) private val handlers = CopyOnWriteArrayList() + + private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}message_ids") { + val uuid = uuidString("message_id") + } + private val processedMessages: MutableSet = Collections.synchronizedSet(if (persistentInbox) { - JDBCHashSet("message_id", loadOnInit = true) + object : AbstractJDBCHashSet(Table, loadOnInit = true) { + override fun elementFromRow(row: ResultRow): UUID = row[table.uuid] + + override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) { + insert[table.uuid] = entry + } + } } else { HashSet() }) diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/PersistentNetworkMapService.kt b/node/src/main/kotlin/com/r3corda/node/services/network/PersistentNetworkMapService.kt index bc4b294e54..fc3cb0f6d8 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/PersistentNetworkMapService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/PersistentNetworkMapService.kt @@ -4,8 +4,10 @@ import com.r3corda.core.ThreadBox import com.r3corda.core.crypto.Party import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.node.services.api.ServiceHubInternal -import com.r3corda.node.utilities.JDBCHashMap -import java.util.* +import com.r3corda.node.utilities.* +import org.jetbrains.exposed.sql.ResultRow +import org.jetbrains.exposed.sql.statements.InsertStatement +import java.util.Collections.synchronizedMap /** * A network map service backed by a database to survive restarts of the node hosting it. @@ -16,10 +18,27 @@ import java.util.* * exceptions. */ class PersistentNetworkMapService(services: ServiceHubInternal) : AbstractNetworkMapService(services) { + private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}network_map_nodes") { + val nodeParty = party("node_party_name", "node_party_key") + val registrationInfo = blob("node_registration_info") + } - override val registeredNodes: MutableMap = Collections.synchronizedMap(JDBCHashMap("network_map_nodes", loadOnInit = true)) + override val registeredNodes: MutableMap = synchronizedMap(object : AbstractJDBCHashMap(Table, loadOnInit = true) { + override fun keyFromRow(row: ResultRow): Party = Party(row[table.nodeParty.name], row[table.nodeParty.owningKey]) - override val subscribers = ThreadBox(JDBCHashMap("network_map_subscribers", loadOnInit = true)) + override fun valueFromRow(row: ResultRow): NodeRegistrationInfo = deserializeFromBlob(row[table.registrationInfo]) + + override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { + insert[table.nodeParty.name] = entry.key.name + insert[table.nodeParty.owningKey] = entry.key.owningKey + } + + override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { + insert[table.registrationInfo] = serializeToBlob(entry.value, finalizables) + } + }) + + override val subscribers = ThreadBox(JDBCHashMap("${NODE_DATABASE_PREFIX}network_map_subscribers", loadOnInit = true)) init { // Initialise the network map version with the current highest persisted version, or zero if there are no entries. diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/DBCheckpointStorage.kt index 69829b2683..71fa968bbc 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/DBCheckpointStorage.kt @@ -6,14 +6,36 @@ import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize import com.r3corda.node.services.api.Checkpoint import com.r3corda.node.services.api.CheckpointStorage -import com.r3corda.node.utilities.JDBCHashMap +import com.r3corda.node.utilities.* +import org.jetbrains.exposed.sql.ResultRow +import org.jetbrains.exposed.sql.statements.InsertStatement import java.util.Collections.synchronizedMap /** * Simple checkpoint key value storage in DB using the underlying JDBCHashMap and transactional context of the call sites. */ class DBCheckpointStorage : CheckpointStorage { - private val checkpointStorage = synchronizedMap(JDBCHashMap>("checkpoints", loadOnInit = false)) + + private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}checkpoints") { + val checkpointId = secureHash("checkpoint_id") + val checkpoint = blob("checkpoint") + } + + private class CheckpointMap : AbstractJDBCHashMap, Table>(Table, loadOnInit = false) { + override fun keyFromRow(row: ResultRow): SecureHash = row[table.checkpointId] + + override fun valueFromRow(row: ResultRow): SerializedBytes = bytesFromBlob(row[table.checkpoint]) + + override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry>, finalizables: MutableList<() -> Unit>) { + insert[table.checkpointId] = entry.key + } + + override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry>, finalizables: MutableList<() -> Unit>) { + insert[table.checkpoint] = bytesToBlob(entry.value, finalizables) + } + } + + private val checkpointStorage = synchronizedMap(CheckpointMap()) override fun addCheckpoint(checkpoint: Checkpoint) { checkpointStorage.put(checkpoint.id, checkpoint.serialize()) diff --git a/node/src/main/kotlin/com/r3corda/node/services/schema/HibernateObserver.kt b/node/src/main/kotlin/com/r3corda/node/services/schema/HibernateObserver.kt index fd977dad1e..a8e4986458 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/schema/HibernateObserver.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/schema/HibernateObserver.kt @@ -49,24 +49,22 @@ class HibernateObserver(services: ServiceHubInternal) { // TODO: replace auto schema generation as it isn't intended for production use, according to Hibernate docs. val config = Configuration().setProperty("hibernate.connection.provider_class", NodeDatabaseConnectionProvider::class.java.name) .setProperty("hibernate.hbm2ddl.auto", "update") - .setProperty("hibernate.show_sql", "true") - .setProperty("hibernate_format_sql", "true") + .setProperty("hibernate.show_sql", "false") + .setProperty("hibernate.format_sql", "true") val options = schemaService.schemaOptions[schema] val databaseSchema = options?.databaseSchema if (databaseSchema != null) { logger.debug { "Database schema = $databaseSchema" } config.setProperty("hibernate.default_schema", databaseSchema) } - val tablePrefix = options?.tablePrefix - if (tablePrefix != null) { - logger.debug { "Table prefix = $tablePrefix" } - config.setPhysicalNamingStrategy(object : PhysicalNamingStrategyStandardImpl() { - override fun toPhysicalTableName(name: Identifier?, context: JdbcEnvironment?): Identifier { - val default = super.toPhysicalTableName(name, context) - return Identifier.toIdentifier(tablePrefix + default.text, default.isQuoted) - } - }) - } + val tablePrefix = options?.tablePrefix ?: "contract_" // We always have this as the default for aesthetic reasons. + logger.debug { "Table prefix = $tablePrefix" } + config.setPhysicalNamingStrategy(object : PhysicalNamingStrategyStandardImpl() { + override fun toPhysicalTableName(name: Identifier?, context: JdbcEnvironment?): Identifier { + val default = super.toPhysicalTableName(name, context) + return Identifier.toIdentifier(tablePrefix + default.text, default.isQuoted) + } + }) schema.mappedTypes.forEach { config.addAnnotatedClass(it) } val sessionFactory = config.buildSessionFactory() logger.info("Created session factory for schema $schema") diff --git a/node/src/main/kotlin/com/r3corda/node/services/transactions/PersistentUniquenessProvider.kt b/node/src/main/kotlin/com/r3corda/node/services/transactions/PersistentUniquenessProvider.kt index 25e06a19d0..78e966b0e5 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/transactions/PersistentUniquenessProvider.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/transactions/PersistentUniquenessProvider.kt @@ -8,7 +8,9 @@ import com.r3corda.core.node.services.UniquenessException import com.r3corda.core.node.services.UniquenessProvider import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.utilities.loggerFor -import com.r3corda.node.utilities.JDBCHashMap +import com.r3corda.node.utilities.* +import org.jetbrains.exposed.sql.ResultRow +import org.jetbrains.exposed.sql.statements.InsertStatement import java.util.* import javax.annotation.concurrent.ThreadSafe @@ -16,14 +18,45 @@ import javax.annotation.concurrent.ThreadSafe @ThreadSafe class PersistentUniquenessProvider() : UniquenessProvider, SingletonSerializeAsToken() { companion object { - private val TABLE_NAME = "notary_commit_log" + private val TABLE_NAME = "${NODE_DATABASE_PREFIX}notary_commit_log" private val log = loggerFor() } /** * For each input state store the consuming transaction information. */ - val committedStates = ThreadBox(JDBCHashMap(TABLE_NAME, loadOnInit = false)) + private object Table : JDBCHashedTable(TABLE_NAME) { + val output = stateRef("transaction_id", "output_index") + val consumingTxHash = secureHash("consuming_transaction_id") + val consumingIndex = integer("consuming_input_index") + val requestingParty = party("requesting_party_name", "requesting_party_key") + } + + private val committedStates = ThreadBox(object : AbstractJDBCHashMap(Table, loadOnInit = false) { + override fun keyFromRow(row: ResultRow): StateRef = StateRef(row[table.output.txId], row[table.output.index]) + + override fun valueFromRow(row: ResultRow): UniquenessProvider.ConsumingTx = UniquenessProvider.ConsumingTx( + row[table.consumingTxHash], + row[table.consumingIndex], + Party(row[table.requestingParty.name], row[table.requestingParty.owningKey]) + ) + + override fun addKeyToInsert(insert: InsertStatement, + entry: Map.Entry, + finalizables: MutableList<() -> Unit>) { + insert[table.output.txId] = entry.key.txhash + insert[table.output.index] = entry.key.index + } + + override fun addValueToInsert(insert: InsertStatement, + entry: Map.Entry, + finalizables: MutableList<() -> Unit>) { + insert[table.consumingTxHash] = entry.value.id + insert[table.consumingIndex] = entry.value.inputIndex + insert[table.requestingParty.name] = entry.value.requestingParty.name + insert[table.requestingParty.owningKey] = entry.value.requestingParty.owningKey + } + }) override fun commit(states: List, txId: SecureHash, callerIdentity: Party) { val conflict = committedStates.locked { diff --git a/node/src/main/kotlin/com/r3corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/com/r3corda/node/services/vault/NodeVaultService.kt index a94d6938e8..995826cac6 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/vault/NodeVaultService.kt @@ -4,18 +4,17 @@ import com.google.common.collect.Sets import com.r3corda.core.ThreadBox import com.r3corda.core.bufferUntilSubscribed import com.r3corda.core.contracts.* -import com.r3corda.core.crypto.SecureHash import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.Vault import com.r3corda.core.node.services.VaultService import com.r3corda.core.serialization.SingletonSerializeAsToken -import com.r3corda.core.serialization.parseAsHex -import com.r3corda.core.serialization.toHexString import com.r3corda.core.transactions.WireTransaction import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.trace import com.r3corda.node.utilities.AbstractJDBCHashSet import com.r3corda.node.utilities.JDBCHashedTable +import com.r3corda.node.utilities.NODE_DATABASE_PREFIX +import com.r3corda.node.utilities.stateRef import org.jetbrains.exposed.sql.ResultRow import org.jetbrains.exposed.sql.statements.InsertStatement import rx.Observable @@ -39,20 +38,20 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT val log = loggerFor() } - private object StatesSetTable : JDBCHashedTable("vault_unconsumed_states") { - val txhash = varchar("transaction_id", 64) - val index = integer("output_index") + private object StatesSetTable : JDBCHashedTable("${NODE_DATABASE_PREFIX}vault_unconsumed_states") { + val stateRef = stateRef("transaction_id", "output_index") } private val mutex = ThreadBox(object { val unconsumedStates = object : AbstractJDBCHashSet(StatesSetTable) { - override fun elementFromRow(it: ResultRow): StateRef = StateRef(SecureHash.SHA256(it[table.txhash].parseAsHex()), it[table.index]) + override fun elementFromRow(row: ResultRow): StateRef = StateRef(row[table.stateRef.txId], row[table.stateRef.index]) - override fun addElementToInsert(it: InsertStatement, entry: StateRef, finalizables: MutableList<() -> Unit>) { - it[table.txhash] = entry.txhash.bits.toHexString() - it[table.index] = entry.index + override fun addElementToInsert(insert: InsertStatement, entry: StateRef, finalizables: MutableList<() -> Unit>) { + insert[table.stateRef.txId] = entry.txhash + insert[table.stateRef.index] = entry.index } } + val _updatesPublisher = PublishSubject.create() fun allUnconsumedStates(): Iterable> { diff --git a/node/src/main/kotlin/com/r3corda/node/utilities/DatabaseSupport.kt b/node/src/main/kotlin/com/r3corda/node/utilities/DatabaseSupport.kt index ba1ca4fce1..f79722bf28 100644 --- a/node/src/main/kotlin/com/r3corda/node/utilities/DatabaseSupport.kt +++ b/node/src/main/kotlin/com/r3corda/node/utilities/DatabaseSupport.kt @@ -1,16 +1,27 @@ package com.r3corda.node.utilities import co.paralleluniverse.strands.Strand +import com.r3corda.core.crypto.SecureHash +import com.r3corda.core.crypto.parsePublicKeyBase58 +import com.r3corda.core.crypto.toBase58String import com.zaxxer.hikari.HikariConfig import com.zaxxer.hikari.HikariDataSource -import org.jetbrains.exposed.sql.Database -import org.jetbrains.exposed.sql.Transaction +import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.transactions.TransactionInterface import org.jetbrains.exposed.sql.transactions.TransactionManager import java.io.Closeable +import java.security.PublicKey import java.sql.Connection +import java.time.Instant +import java.time.LocalDate +import java.time.ZoneOffset import java.util.* +/** + * Table prefix for all tables owned by the node module. + */ +const val NODE_DATABASE_PREFIX = "node_" + // TODO: Handle commit failure due to database unavailable. Better to shutdown and await database reconnect/recovery. fun databaseTransaction(db: Database, statement: Transaction.() -> T): T { // We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases. @@ -121,6 +132,82 @@ class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionMan connection.close() threadLocal.set(outerTransaction) } - } +} + +// Composite columns for use with below Exposed helpers. +data class PartyColumns(val name: Column, val owningKey: Column) +data class StateRefColumns(val txId: Column, val index: Column) + +/** + * [Table] column helpers for use with Exposed, as per [varchar] etc. + */ +fun Table.publicKey(name: String) = this.registerColumn(name, PublicKeyColumnType) +fun Table.secureHash(name: String) = this.registerColumn(name, SecureHashColumnType) +fun Table.party(nameColumnName: String, keyColumnName: String) = PartyColumns(this.varchar(nameColumnName, length = 255), this.publicKey(keyColumnName)) +fun Table.uuidString(name: String) = this.registerColumn(name, UUIDStringColumnType) +fun Table.localDate(name: String) = this.registerColumn(name, LocalDateColumnType) +fun Table.stateRef(txIdColumnName: String, indexColumnName: String) = StateRefColumns(this.secureHash(txIdColumnName), this.integer(indexColumnName)) + +/** + * [ColumnType] for marshalling to/from database on behalf of [PublicKey]. + */ +object PublicKeyColumnType : ColumnType() { + override fun sqlType(): String = "VARCHAR(255)" + + override fun valueFromDB(value: Any): Any = parsePublicKeyBase58(value.toString()) + + override fun notNullValueToDB(value: Any): Any = if (value is PublicKey) value.toBase58String() else value +} + +/** + * [ColumnType] for marshalling to/from database on behalf of [SecureHash]. + */ +object SecureHashColumnType : ColumnType() { + override fun sqlType(): String = "VARCHAR(64)" + + override fun valueFromDB(value: Any): Any = SecureHash.parse(value.toString()) + + override fun notNullValueToDB(value: Any): Any = if (value is SecureHash) value.toString() else value +} + +/** + * [ColumnType] for marshalling to/from database on behalf of [UUID], always using a string representation. + */ +object UUIDStringColumnType : ColumnType() { + override fun sqlType(): String = "VARCHAR(36)" + + override fun valueFromDB(value: Any): Any = UUID.fromString(value.toString()) + + override fun notNullValueToDB(value: Any): Any = if (value is UUID) value.toString() else value +} + +/** + * [ColumnType] for marshalling to/from database on behalf of [java.time.LocalDate]. + */ +object LocalDateColumnType : ColumnType() { + override fun sqlType(): String = "DATE" + + override fun nonNullValueToString(value: Any): String { + if (value is String) return value + + val localDate = when (value) { + is LocalDate -> value + is java.sql.Date -> value.toLocalDate() + is java.sql.Timestamp -> value.toLocalDateTime().toLocalDate() + else -> error("Unexpected value: $value") + } + return "'$localDate'" + } + + override fun valueFromDB(value: Any): Any = when (value) { + is java.sql.Date -> value.toLocalDate() + is java.sql.Timestamp -> value.toLocalDateTime().toLocalDate() + is Long -> LocalDate.from(Instant.ofEpochMilli(value)) + else -> value + } + + override fun notNullValueToDB(value: Any): Any = if (value is LocalDate) { + java.sql.Date(value.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli()) + } else value } \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/utilities/JDBCHashMap.kt b/node/src/main/kotlin/com/r3corda/node/utilities/JDBCHashMap.kt index 4d382ce651..353779b819 100644 --- a/node/src/main/kotlin/com/r3corda/node/utilities/JDBCHashMap.kt +++ b/node/src/main/kotlin/com/r3corda/node/utilities/JDBCHashMap.kt @@ -1,5 +1,6 @@ package com.r3corda.node.utilities +import com.r3corda.core.serialization.SerializedBytes import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize import com.r3corda.core.utilities.loggerFor @@ -32,35 +33,38 @@ class JDBCHashMap(tableName: String, loadOnInit: Boolean = fal val value = blob("value") } - override fun keyFromRow(it: ResultRow): K = deserializeFromBlob(it[table.key]) - override fun valueFromRow(it: ResultRow): V = deserializeFromBlob(it[table.value]) + override fun keyFromRow(row: ResultRow): K = deserializeFromBlob(row[table.key]) + override fun valueFromRow(row: ResultRow): V = deserializeFromBlob(row[table.value]) - override fun addKeyToInsert(it: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { - it[table.key] = serializeToBlob(entry.key, finalizables) + override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { + insert[table.key] = serializeToBlob(entry.key, finalizables) } - override fun addValueToInsert(it: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { - it[table.value] = serializeToBlob(entry.value, finalizables) + override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { + insert[table.value] = serializeToBlob(entry.value, finalizables) } } -fun serializeToBlob(value: T, finalizables: MutableList<() -> Unit>): Blob { - val blob = TransactionManager.currentOrNull()!!.connection.createBlob() +fun bytesToBlob(value: SerializedBytes<*>, finalizables: MutableList<() -> Unit>): Blob { + val blob = TransactionManager.current().connection.createBlob() finalizables += { blob.free() } - blob.setBytes(1, value.serialize().bits) + blob.setBytes(1, value.bits) return blob } -fun deserializeFromBlob(blob: Blob): T { +fun serializeToBlob(value: Any, finalizables: MutableList<() -> Unit>): Blob = bytesToBlob(value.serialize(), finalizables) + +fun bytesFromBlob(blob: Blob): SerializedBytes { try { - val bytes = blob.getBytes(0, blob.length().toInt()) - return bytes.deserialize() + return SerializedBytes(blob.getBytes(0, blob.length().toInt())) } finally { blob.free() } } +fun deserializeFromBlob(blob: Blob): T = bytesFromBlob(blob).deserialize() + /** * A convenient JDBC table backed hash set with iteration order based on insertion order. * See [AbstractJDBCHashSet] and [AbstractJDBCHashMap] for further implementation details. @@ -75,10 +79,10 @@ class JDBCHashSet(tableName: String, loadOnInit: Boolean = false) : Abs val key = blob("key") } - override fun elementFromRow(it: ResultRow): K = deserializeFromBlob(it[table.key]) + override fun elementFromRow(row: ResultRow): K = deserializeFromBlob(row[table.key]) - override fun addElementToInsert(it: InsertStatement, entry: K, finalizables: MutableList<() -> Unit>) { - it[table.key] = serializeToBlob(entry, finalizables) + override fun addElementToInsert(insert: InsertStatement, entry: K, finalizables: MutableList<() -> Unit>) { + insert[table.key] = serializeToBlob(entry, finalizables) } } @@ -88,18 +92,18 @@ class JDBCHashSet(tableName: String, loadOnInit: Boolean = false) : Abs * * See [AbstractJDBCHashMap] for implementation details. */ -abstract class AbstractJDBCHashSet(protected val table: T, loadOnInit: Boolean = false) : MutableSet, AbstractSet() { +abstract class AbstractJDBCHashSet(protected val table: T, loadOnInit: Boolean = false) : MutableSet, AbstractSet() { protected val innerMap = object : AbstractJDBCHashMap(table, loadOnInit) { - override fun keyFromRow(it: ResultRow): K = this@AbstractJDBCHashSet.elementFromRow(it) + override fun keyFromRow(row: ResultRow): K = this@AbstractJDBCHashSet.elementFromRow(row) // Return constant. - override fun valueFromRow(it: ResultRow) = Unit + override fun valueFromRow(row: ResultRow) = Unit - override fun addKeyToInsert(it: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) = - this@AbstractJDBCHashSet.addElementToInsert(it, entry.key, finalizables) + override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) = + this@AbstractJDBCHashSet.addElementToInsert(insert, entry.key, finalizables) // No op as not actually persisted. - override fun addValueToInsert(it: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { + override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) { } } @@ -133,7 +137,7 @@ abstract class AbstractJDBCHashSet(protected val t * * See example implementations in [JDBCHashSet]. */ - protected abstract fun elementFromRow(it: ResultRow): K + protected abstract fun elementFromRow(row: ResultRow): K /** * Implementation should marshall the element to the insert statement. @@ -143,7 +147,7 @@ abstract class AbstractJDBCHashSet(protected val t * * See example implementations in [JDBCHashSet]. */ - protected abstract fun addElementToInsert(it: InsertStatement, entry: K, finalizables: MutableList<() -> Unit>) + protected abstract fun addElementToInsert(insert: InsertStatement, entry: K, finalizables: MutableList<() -> Unit>) } /** @@ -177,7 +181,7 @@ abstract class AbstractJDBCHashSet(protected val t * TODO: if iterators are used extensively when loadOnInit=true, consider maintaining a collection of keys in iteration order to avoid sorting each time. * TODO: revisit whether we need the loadOnInit==true functionality and remove if not. */ -abstract class AbstractJDBCHashMap(val table: T, val loadOnInit: Boolean = false) : MutableMap, AbstractMap() { +abstract class AbstractJDBCHashMap(val table: T, val loadOnInit: Boolean = false) : MutableMap, AbstractMap() { companion object { protected val log = loggerFor>() @@ -199,7 +203,7 @@ abstract class AbstractJDBCHashMap(val ta bucket.add(entry) } } - log.trace { "Loaded ${size} entries on init for ${table.tableName} in ${elapsedMillis} millis." } + log.trace { "Loaded $size entries on init for ${table.tableName} in $elapsedMillis millis." } } } @@ -271,10 +275,7 @@ abstract class AbstractJDBCHashMap(val ta } override fun remove() { - val savedCurrent = current - if (savedCurrent == null) { - throw IllegalStateException("Not called next() yet or already removed.") - } + val savedCurrent = current ?: throw IllegalStateException("Not called next() yet or already removed.") current = null remove(savedCurrent.key) } @@ -381,9 +382,9 @@ abstract class AbstractJDBCHashMap(val ta } override fun get(key: K): V? { - for (entry in getBucket(key)) { - if (entry.key == key) { - return entry.value + for ((entryKey, value) in getBucket(key)) { + if (entryKey == key) { + return value } } return null @@ -412,14 +413,14 @@ abstract class AbstractJDBCHashMap(val ta * * See example implementations in [JDBCHashMap]. */ - protected abstract fun keyFromRow(it: ResultRow): K + protected abstract fun keyFromRow(row: ResultRow): K /** * Implementation should return the value object marshalled from the database table row. * * See example implementations in [JDBCHashMap]. */ - protected abstract fun valueFromRow(it: ResultRow): V + protected abstract fun valueFromRow(row: ResultRow): V /** * Implementation should marshall the key to the insert statement. @@ -429,7 +430,7 @@ abstract class AbstractJDBCHashMap(val ta * * See example implementations in [JDBCHashMap]. */ - protected abstract fun addKeyToInsert(it: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) + protected abstract fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) /** * Implementation should marshall the value to the insert statement. @@ -439,7 +440,7 @@ abstract class AbstractJDBCHashMap(val ta * * See example implementations in [JDBCHashMap]. */ - protected abstract fun addValueToInsert(it: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) + protected abstract fun addValueToInsert(insert: InsertStatement, entry: Map.Entry, finalizables: MutableList<() -> Unit>) private fun createEntry(it: ResultRow) = NotReallyMutableEntry(keyFromRow(it), valueFromRow(it), it[table.seqNo]) diff --git a/src/main/kotlin/com/r3corda/demos/api/NodeInterestRates.kt b/src/main/kotlin/com/r3corda/demos/api/NodeInterestRates.kt index c9d193293e..c4b78deb5d 100644 --- a/src/main/kotlin/com/r3corda/demos/api/NodeInterestRates.kt +++ b/src/main/kotlin/com/r3corda/demos/api/NodeInterestRates.kt @@ -17,11 +17,14 @@ import com.r3corda.core.transactions.WireTransaction import com.r3corda.core.utilities.ProgressTracker import com.r3corda.node.services.api.AcceptsFileUpload import com.r3corda.node.services.api.ServiceHubInternal +import com.r3corda.node.utilities.AbstractJDBCHashSet import com.r3corda.node.utilities.FiberBox -import com.r3corda.node.utilities.JDBCHashSet +import com.r3corda.node.utilities.JDBCHashedTable +import com.r3corda.node.utilities.localDate import com.r3corda.protocols.RatesFixProtocol.* -import com.r3corda.protocols.ServiceRequestMessage import com.r3corda.protocols.TwoPartyDealProtocol +import org.jetbrains.exposed.sql.ResultRow +import org.jetbrains.exposed.sql.statements.InsertStatement import java.io.InputStream import java.math.BigDecimal import java.security.KeyPair @@ -125,8 +128,26 @@ object NodeInterestRates { @ThreadSafe class Oracle(val identity: Party, private val signingKey: KeyPair, val clock: Clock) { + private object Table : JDBCHashedTable("demo_interest_rate_fixes") { + val name = varchar("index_name", length = 255) + val forDay = localDate("for_day") + val ofTenor = varchar("of_tenor", length = 16) + val value = decimal("value", scale = 20, precision = 16) + } + private class InnerState { - val fixes = JDBCHashSet("interest_rate_fixes") + val fixes = object : AbstractJDBCHashSet(Table) { + override fun elementFromRow(row: ResultRow): Fix { + return Fix(FixOf(row[table.name], row[table.forDay], Tenor(row[table.ofTenor])), row[table.value]) + } + + override fun addElementToInsert(insert: InsertStatement, entry: Fix, finalizables: MutableList<() -> Unit>) { + insert[table.name] = entry.of.name + insert[table.forDay] = entry.of.forDay + insert[table.ofTenor] = entry.of.ofTenor.name + insert[table.value] = entry.value + } + } var container: FixContainer = FixContainer(fixes) } private val mutex = FiberBox(InnerState())