mirror of
https://github.com/corda/corda.git
synced 2024-12-24 07:06:44 +00:00
Clean up compiler warning and make database table names and columns more meaningful.
This commit is contained in:
parent
7a440d890d
commit
50e613bb75
@ -4,7 +4,9 @@ import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.crypto.generateKeyPair
|
||||
import com.r3corda.core.node.services.KeyManagementService
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.node.utilities.JDBCHashMap
|
||||
import com.r3corda.node.utilities.*
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import java.security.KeyPair
|
||||
import java.security.PrivateKey
|
||||
import java.security.PublicKey
|
||||
@ -18,8 +20,26 @@ import java.util.*
|
||||
* This class needs database transactions to be in-flight during method calls and init.
|
||||
*/
|
||||
class PersistentKeyManagementService(initialKeys: Set<KeyPair>) : SingletonSerializeAsToken(), KeyManagementService {
|
||||
|
||||
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}our_key_pairs") {
|
||||
val publicKey = publicKey("public_key")
|
||||
val privateKey = blob("private_key")
|
||||
}
|
||||
|
||||
private class InnerState {
|
||||
val keys = JDBCHashMap<PublicKey, PrivateKey>("key_pairs", loadOnInit = false)
|
||||
val keys = object : AbstractJDBCHashMap<PublicKey, PrivateKey, Table>(Table, loadOnInit = false) {
|
||||
override fun keyFromRow(row: ResultRow): PublicKey = row[table.publicKey]
|
||||
|
||||
override fun valueFromRow(row: ResultRow): PrivateKey = deserializeFromBlob(row[table.privateKey])
|
||||
|
||||
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<PublicKey, PrivateKey>, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.publicKey] = entry.key
|
||||
}
|
||||
|
||||
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<PublicKey, PrivateKey>, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.privateKey] = serializeToBlob(entry.value, finalizables)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
|
@ -9,11 +9,12 @@ import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import com.r3corda.node.utilities.JDBCHashSet
|
||||
import com.r3corda.node.utilities.*
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import java.nio.file.FileSystems
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
@ -102,8 +103,19 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
|
||||
private val state = ThreadBox(InnerState())
|
||||
private val handlers = CopyOnWriteArrayList<Handler>()
|
||||
|
||||
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}message_ids") {
|
||||
val uuid = uuidString("message_id")
|
||||
}
|
||||
|
||||
private val processedMessages: MutableSet<UUID> = Collections.synchronizedSet(if (persistentInbox) {
|
||||
JDBCHashSet<UUID>("message_id", loadOnInit = true)
|
||||
object : AbstractJDBCHashSet<UUID, Table>(Table, loadOnInit = true) {
|
||||
override fun elementFromRow(row: ResultRow): UUID = row[table.uuid]
|
||||
|
||||
override fun addElementToInsert(insert: InsertStatement, entry: UUID, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.uuid] = entry
|
||||
}
|
||||
}
|
||||
} else {
|
||||
HashSet<UUID>()
|
||||
})
|
||||
|
@ -4,8 +4,10 @@ import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.node.services.api.ServiceHubInternal
|
||||
import com.r3corda.node.utilities.JDBCHashMap
|
||||
import java.util.*
|
||||
import com.r3corda.node.utilities.*
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import java.util.Collections.synchronizedMap
|
||||
|
||||
/**
|
||||
* A network map service backed by a database to survive restarts of the node hosting it.
|
||||
@ -16,10 +18,27 @@ import java.util.*
|
||||
* exceptions.
|
||||
*/
|
||||
class PersistentNetworkMapService(services: ServiceHubInternal) : AbstractNetworkMapService(services) {
|
||||
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}network_map_nodes") {
|
||||
val nodeParty = party("node_party_name", "node_party_key")
|
||||
val registrationInfo = blob("node_registration_info")
|
||||
}
|
||||
|
||||
override val registeredNodes: MutableMap<Party, NodeRegistrationInfo> = Collections.synchronizedMap(JDBCHashMap("network_map_nodes", loadOnInit = true))
|
||||
override val registeredNodes: MutableMap<Party, NodeRegistrationInfo> = synchronizedMap(object : AbstractJDBCHashMap<Party, NodeRegistrationInfo, Table>(Table, loadOnInit = true) {
|
||||
override fun keyFromRow(row: ResultRow): Party = Party(row[table.nodeParty.name], row[table.nodeParty.owningKey])
|
||||
|
||||
override val subscribers = ThreadBox(JDBCHashMap<SingleMessageRecipient, LastAcknowledgeInfo>("network_map_subscribers", loadOnInit = true))
|
||||
override fun valueFromRow(row: ResultRow): NodeRegistrationInfo = deserializeFromBlob(row[table.registrationInfo])
|
||||
|
||||
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<Party, NodeRegistrationInfo>, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.nodeParty.name] = entry.key.name
|
||||
insert[table.nodeParty.owningKey] = entry.key.owningKey
|
||||
}
|
||||
|
||||
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<Party, NodeRegistrationInfo>, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.registrationInfo] = serializeToBlob(entry.value, finalizables)
|
||||
}
|
||||
})
|
||||
|
||||
override val subscribers = ThreadBox(JDBCHashMap<SingleMessageRecipient, LastAcknowledgeInfo>("${NODE_DATABASE_PREFIX}network_map_subscribers", loadOnInit = true))
|
||||
|
||||
init {
|
||||
// Initialise the network map version with the current highest persisted version, or zero if there are no entries.
|
||||
|
@ -6,14 +6,36 @@ import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.serialization.serialize
|
||||
import com.r3corda.node.services.api.Checkpoint
|
||||
import com.r3corda.node.services.api.CheckpointStorage
|
||||
import com.r3corda.node.utilities.JDBCHashMap
|
||||
import com.r3corda.node.utilities.*
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import java.util.Collections.synchronizedMap
|
||||
|
||||
/**
|
||||
* Simple checkpoint key value storage in DB using the underlying JDBCHashMap and transactional context of the call sites.
|
||||
*/
|
||||
class DBCheckpointStorage : CheckpointStorage {
|
||||
private val checkpointStorage = synchronizedMap(JDBCHashMap<SecureHash, SerializedBytes<Checkpoint>>("checkpoints", loadOnInit = false))
|
||||
|
||||
private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}checkpoints") {
|
||||
val checkpointId = secureHash("checkpoint_id")
|
||||
val checkpoint = blob("checkpoint")
|
||||
}
|
||||
|
||||
private class CheckpointMap : AbstractJDBCHashMap<SecureHash, SerializedBytes<Checkpoint>, Table>(Table, loadOnInit = false) {
|
||||
override fun keyFromRow(row: ResultRow): SecureHash = row[table.checkpointId]
|
||||
|
||||
override fun valueFromRow(row: ResultRow): SerializedBytes<Checkpoint> = bytesFromBlob(row[table.checkpoint])
|
||||
|
||||
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<SecureHash, SerializedBytes<Checkpoint>>, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.checkpointId] = entry.key
|
||||
}
|
||||
|
||||
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<SecureHash, SerializedBytes<Checkpoint>>, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.checkpoint] = bytesToBlob(entry.value, finalizables)
|
||||
}
|
||||
}
|
||||
|
||||
private val checkpointStorage = synchronizedMap(CheckpointMap())
|
||||
|
||||
override fun addCheckpoint(checkpoint: Checkpoint) {
|
||||
checkpointStorage.put(checkpoint.id, checkpoint.serialize())
|
||||
|
@ -49,24 +49,22 @@ class HibernateObserver(services: ServiceHubInternal) {
|
||||
// TODO: replace auto schema generation as it isn't intended for production use, according to Hibernate docs.
|
||||
val config = Configuration().setProperty("hibernate.connection.provider_class", NodeDatabaseConnectionProvider::class.java.name)
|
||||
.setProperty("hibernate.hbm2ddl.auto", "update")
|
||||
.setProperty("hibernate.show_sql", "true")
|
||||
.setProperty("hibernate_format_sql", "true")
|
||||
.setProperty("hibernate.show_sql", "false")
|
||||
.setProperty("hibernate.format_sql", "true")
|
||||
val options = schemaService.schemaOptions[schema]
|
||||
val databaseSchema = options?.databaseSchema
|
||||
if (databaseSchema != null) {
|
||||
logger.debug { "Database schema = $databaseSchema" }
|
||||
config.setProperty("hibernate.default_schema", databaseSchema)
|
||||
}
|
||||
val tablePrefix = options?.tablePrefix
|
||||
if (tablePrefix != null) {
|
||||
logger.debug { "Table prefix = $tablePrefix" }
|
||||
config.setPhysicalNamingStrategy(object : PhysicalNamingStrategyStandardImpl() {
|
||||
override fun toPhysicalTableName(name: Identifier?, context: JdbcEnvironment?): Identifier {
|
||||
val default = super.toPhysicalTableName(name, context)
|
||||
return Identifier.toIdentifier(tablePrefix + default.text, default.isQuoted)
|
||||
}
|
||||
})
|
||||
}
|
||||
val tablePrefix = options?.tablePrefix ?: "contract_" // We always have this as the default for aesthetic reasons.
|
||||
logger.debug { "Table prefix = $tablePrefix" }
|
||||
config.setPhysicalNamingStrategy(object : PhysicalNamingStrategyStandardImpl() {
|
||||
override fun toPhysicalTableName(name: Identifier?, context: JdbcEnvironment?): Identifier {
|
||||
val default = super.toPhysicalTableName(name, context)
|
||||
return Identifier.toIdentifier(tablePrefix + default.text, default.isQuoted)
|
||||
}
|
||||
})
|
||||
schema.mappedTypes.forEach { config.addAnnotatedClass(it) }
|
||||
val sessionFactory = config.buildSessionFactory()
|
||||
logger.info("Created session factory for schema $schema")
|
||||
|
@ -8,7 +8,9 @@ import com.r3corda.core.node.services.UniquenessException
|
||||
import com.r3corda.core.node.services.UniquenessProvider
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.utilities.JDBCHashMap
|
||||
import com.r3corda.node.utilities.*
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
@ -16,14 +18,45 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
@ThreadSafe
|
||||
class PersistentUniquenessProvider() : UniquenessProvider, SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
private val TABLE_NAME = "notary_commit_log"
|
||||
private val TABLE_NAME = "${NODE_DATABASE_PREFIX}notary_commit_log"
|
||||
private val log = loggerFor<PersistentUniquenessProvider>()
|
||||
}
|
||||
|
||||
/**
|
||||
* For each input state store the consuming transaction information.
|
||||
*/
|
||||
val committedStates = ThreadBox(JDBCHashMap<StateRef, UniquenessProvider.ConsumingTx>(TABLE_NAME, loadOnInit = false))
|
||||
private object Table : JDBCHashedTable(TABLE_NAME) {
|
||||
val output = stateRef("transaction_id", "output_index")
|
||||
val consumingTxHash = secureHash("consuming_transaction_id")
|
||||
val consumingIndex = integer("consuming_input_index")
|
||||
val requestingParty = party("requesting_party_name", "requesting_party_key")
|
||||
}
|
||||
|
||||
private val committedStates = ThreadBox(object : AbstractJDBCHashMap<StateRef, UniquenessProvider.ConsumingTx, Table>(Table, loadOnInit = false) {
|
||||
override fun keyFromRow(row: ResultRow): StateRef = StateRef(row[table.output.txId], row[table.output.index])
|
||||
|
||||
override fun valueFromRow(row: ResultRow): UniquenessProvider.ConsumingTx = UniquenessProvider.ConsumingTx(
|
||||
row[table.consumingTxHash],
|
||||
row[table.consumingIndex],
|
||||
Party(row[table.requestingParty.name], row[table.requestingParty.owningKey])
|
||||
)
|
||||
|
||||
override fun addKeyToInsert(insert: InsertStatement,
|
||||
entry: Map.Entry<StateRef, UniquenessProvider.ConsumingTx>,
|
||||
finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.output.txId] = entry.key.txhash
|
||||
insert[table.output.index] = entry.key.index
|
||||
}
|
||||
|
||||
override fun addValueToInsert(insert: InsertStatement,
|
||||
entry: Map.Entry<StateRef, UniquenessProvider.ConsumingTx>,
|
||||
finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.consumingTxHash] = entry.value.id
|
||||
insert[table.consumingIndex] = entry.value.inputIndex
|
||||
insert[table.requestingParty.name] = entry.value.requestingParty.name
|
||||
insert[table.requestingParty.owningKey] = entry.value.requestingParty.owningKey
|
||||
}
|
||||
})
|
||||
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||
val conflict = committedStates.locked {
|
||||
|
@ -4,18 +4,17 @@ import com.google.common.collect.Sets
|
||||
import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.bufferUntilSubscribed
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.node.ServiceHub
|
||||
import com.r3corda.core.node.services.Vault
|
||||
import com.r3corda.core.node.services.VaultService
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.serialization.parseAsHex
|
||||
import com.r3corda.core.serialization.toHexString
|
||||
import com.r3corda.core.transactions.WireTransaction
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.utilities.AbstractJDBCHashSet
|
||||
import com.r3corda.node.utilities.JDBCHashedTable
|
||||
import com.r3corda.node.utilities.NODE_DATABASE_PREFIX
|
||||
import com.r3corda.node.utilities.stateRef
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import rx.Observable
|
||||
@ -39,20 +38,20 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
|
||||
val log = loggerFor<NodeVaultService>()
|
||||
}
|
||||
|
||||
private object StatesSetTable : JDBCHashedTable("vault_unconsumed_states") {
|
||||
val txhash = varchar("transaction_id", 64)
|
||||
val index = integer("output_index")
|
||||
private object StatesSetTable : JDBCHashedTable("${NODE_DATABASE_PREFIX}vault_unconsumed_states") {
|
||||
val stateRef = stateRef("transaction_id", "output_index")
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox(object {
|
||||
val unconsumedStates = object : AbstractJDBCHashSet<StateRef, StatesSetTable>(StatesSetTable) {
|
||||
override fun elementFromRow(it: ResultRow): StateRef = StateRef(SecureHash.SHA256(it[table.txhash].parseAsHex()), it[table.index])
|
||||
override fun elementFromRow(row: ResultRow): StateRef = StateRef(row[table.stateRef.txId], row[table.stateRef.index])
|
||||
|
||||
override fun addElementToInsert(it: InsertStatement, entry: StateRef, finalizables: MutableList<() -> Unit>) {
|
||||
it[table.txhash] = entry.txhash.bits.toHexString()
|
||||
it[table.index] = entry.index
|
||||
override fun addElementToInsert(insert: InsertStatement, entry: StateRef, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.stateRef.txId] = entry.txhash
|
||||
insert[table.stateRef.index] = entry.index
|
||||
}
|
||||
}
|
||||
|
||||
val _updatesPublisher = PublishSubject.create<Vault.Update>()
|
||||
|
||||
fun allUnconsumedStates(): Iterable<StateAndRef<ContractState>> {
|
||||
|
@ -1,16 +1,27 @@
|
||||
package com.r3corda.node.utilities
|
||||
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.crypto.parsePublicKeyBase58
|
||||
import com.r3corda.core.crypto.toBase58String
|
||||
import com.zaxxer.hikari.HikariConfig
|
||||
import com.zaxxer.hikari.HikariDataSource
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Transaction
|
||||
import org.jetbrains.exposed.sql.*
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionInterface
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import java.io.Closeable
|
||||
import java.security.PublicKey
|
||||
import java.sql.Connection
|
||||
import java.time.Instant
|
||||
import java.time.LocalDate
|
||||
import java.time.ZoneOffset
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* Table prefix for all tables owned by the node module.
|
||||
*/
|
||||
const val NODE_DATABASE_PREFIX = "node_"
|
||||
|
||||
// TODO: Handle commit failure due to database unavailable. Better to shutdown and await database reconnect/recovery.
|
||||
fun <T> databaseTransaction(db: Database, statement: Transaction.() -> T): T {
|
||||
// We need to set the database for the current [Thread] or [Fiber] here as some tests share threads across databases.
|
||||
@ -121,6 +132,82 @@ class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionMan
|
||||
connection.close()
|
||||
threadLocal.set(outerTransaction)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// Composite columns for use with below Exposed helpers.
|
||||
data class PartyColumns(val name: Column<String>, val owningKey: Column<PublicKey>)
|
||||
data class StateRefColumns(val txId: Column<SecureHash>, val index: Column<Int>)
|
||||
|
||||
/**
|
||||
* [Table] column helpers for use with Exposed, as per [varchar] etc.
|
||||
*/
|
||||
fun Table.publicKey(name: String) = this.registerColumn<PublicKey>(name, PublicKeyColumnType)
|
||||
fun Table.secureHash(name: String) = this.registerColumn<SecureHash>(name, SecureHashColumnType)
|
||||
fun Table.party(nameColumnName: String, keyColumnName: String) = PartyColumns(this.varchar(nameColumnName, length = 255), this.publicKey(keyColumnName))
|
||||
fun Table.uuidString(name: String) = this.registerColumn<UUID>(name, UUIDStringColumnType)
|
||||
fun Table.localDate(name: String) = this.registerColumn<LocalDate>(name, LocalDateColumnType)
|
||||
fun Table.stateRef(txIdColumnName: String, indexColumnName: String) = StateRefColumns(this.secureHash(txIdColumnName), this.integer(indexColumnName))
|
||||
|
||||
/**
|
||||
* [ColumnType] for marshalling to/from database on behalf of [PublicKey].
|
||||
*/
|
||||
object PublicKeyColumnType : ColumnType() {
|
||||
override fun sqlType(): String = "VARCHAR(255)"
|
||||
|
||||
override fun valueFromDB(value: Any): Any = parsePublicKeyBase58(value.toString())
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = if (value is PublicKey) value.toBase58String() else value
|
||||
}
|
||||
|
||||
/**
|
||||
* [ColumnType] for marshalling to/from database on behalf of [SecureHash].
|
||||
*/
|
||||
object SecureHashColumnType : ColumnType() {
|
||||
override fun sqlType(): String = "VARCHAR(64)"
|
||||
|
||||
override fun valueFromDB(value: Any): Any = SecureHash.parse(value.toString())
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = if (value is SecureHash) value.toString() else value
|
||||
}
|
||||
|
||||
/**
|
||||
* [ColumnType] for marshalling to/from database on behalf of [UUID], always using a string representation.
|
||||
*/
|
||||
object UUIDStringColumnType : ColumnType() {
|
||||
override fun sqlType(): String = "VARCHAR(36)"
|
||||
|
||||
override fun valueFromDB(value: Any): Any = UUID.fromString(value.toString())
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = if (value is UUID) value.toString() else value
|
||||
}
|
||||
|
||||
/**
|
||||
* [ColumnType] for marshalling to/from database on behalf of [java.time.LocalDate].
|
||||
*/
|
||||
object LocalDateColumnType : ColumnType() {
|
||||
override fun sqlType(): String = "DATE"
|
||||
|
||||
override fun nonNullValueToString(value: Any): String {
|
||||
if (value is String) return value
|
||||
|
||||
val localDate = when (value) {
|
||||
is LocalDate -> value
|
||||
is java.sql.Date -> value.toLocalDate()
|
||||
is java.sql.Timestamp -> value.toLocalDateTime().toLocalDate()
|
||||
else -> error("Unexpected value: $value")
|
||||
}
|
||||
return "'$localDate'"
|
||||
}
|
||||
|
||||
override fun valueFromDB(value: Any): Any = when (value) {
|
||||
is java.sql.Date -> value.toLocalDate()
|
||||
is java.sql.Timestamp -> value.toLocalDateTime().toLocalDate()
|
||||
is Long -> LocalDate.from(Instant.ofEpochMilli(value))
|
||||
else -> value
|
||||
}
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = if (value is LocalDate) {
|
||||
java.sql.Date(value.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli())
|
||||
} else value
|
||||
}
|
@ -1,5 +1,6 @@
|
||||
package com.r3corda.node.utilities
|
||||
|
||||
import com.r3corda.core.serialization.SerializedBytes
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.serialization.serialize
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
@ -32,35 +33,38 @@ class JDBCHashMap<K : Any, V : Any>(tableName: String, loadOnInit: Boolean = fal
|
||||
val value = blob("value")
|
||||
}
|
||||
|
||||
override fun keyFromRow(it: ResultRow): K = deserializeFromBlob(it[table.key])
|
||||
override fun valueFromRow(it: ResultRow): V = deserializeFromBlob(it[table.value])
|
||||
override fun keyFromRow(row: ResultRow): K = deserializeFromBlob(row[table.key])
|
||||
override fun valueFromRow(row: ResultRow): V = deserializeFromBlob(row[table.value])
|
||||
|
||||
override fun addKeyToInsert(it: InsertStatement, entry: Map.Entry<K, V>, finalizables: MutableList<() -> Unit>) {
|
||||
it[table.key] = serializeToBlob(entry.key, finalizables)
|
||||
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<K, V>, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.key] = serializeToBlob(entry.key, finalizables)
|
||||
}
|
||||
|
||||
override fun addValueToInsert(it: InsertStatement, entry: Map.Entry<K, V>, finalizables: MutableList<() -> Unit>) {
|
||||
it[table.value] = serializeToBlob(entry.value, finalizables)
|
||||
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<K, V>, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.value] = serializeToBlob(entry.value, finalizables)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fun <T: Any> serializeToBlob(value: T, finalizables: MutableList<() -> Unit>): Blob {
|
||||
val blob = TransactionManager.currentOrNull()!!.connection.createBlob()
|
||||
fun bytesToBlob(value: SerializedBytes<*>, finalizables: MutableList<() -> Unit>): Blob {
|
||||
val blob = TransactionManager.current().connection.createBlob()
|
||||
finalizables += { blob.free() }
|
||||
blob.setBytes(1, value.serialize().bits)
|
||||
blob.setBytes(1, value.bits)
|
||||
return blob
|
||||
}
|
||||
|
||||
fun <T : Any> deserializeFromBlob(blob: Blob): T {
|
||||
fun serializeToBlob(value: Any, finalizables: MutableList<() -> Unit>): Blob = bytesToBlob(value.serialize(), finalizables)
|
||||
|
||||
fun <T: Any> bytesFromBlob(blob: Blob): SerializedBytes<T> {
|
||||
try {
|
||||
val bytes = blob.getBytes(0, blob.length().toInt())
|
||||
return bytes.deserialize()
|
||||
return SerializedBytes(blob.getBytes(0, blob.length().toInt()))
|
||||
} finally {
|
||||
blob.free()
|
||||
}
|
||||
}
|
||||
|
||||
fun <T : Any> deserializeFromBlob(blob: Blob): T = bytesFromBlob<T>(blob).deserialize()
|
||||
|
||||
/**
|
||||
* A convenient JDBC table backed hash set with iteration order based on insertion order.
|
||||
* See [AbstractJDBCHashSet] and [AbstractJDBCHashMap] for further implementation details.
|
||||
@ -75,10 +79,10 @@ class JDBCHashSet<K : Any>(tableName: String, loadOnInit: Boolean = false) : Abs
|
||||
val key = blob("key")
|
||||
}
|
||||
|
||||
override fun elementFromRow(it: ResultRow): K = deserializeFromBlob(it[table.key])
|
||||
override fun elementFromRow(row: ResultRow): K = deserializeFromBlob(row[table.key])
|
||||
|
||||
override fun addElementToInsert(it: InsertStatement, entry: K, finalizables: MutableList<() -> Unit>) {
|
||||
it[table.key] = serializeToBlob(entry, finalizables)
|
||||
override fun addElementToInsert(insert: InsertStatement, entry: K, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.key] = serializeToBlob(entry, finalizables)
|
||||
}
|
||||
}
|
||||
|
||||
@ -88,18 +92,18 @@ class JDBCHashSet<K : Any>(tableName: String, loadOnInit: Boolean = false) : Abs
|
||||
*
|
||||
* See [AbstractJDBCHashMap] for implementation details.
|
||||
*/
|
||||
abstract class AbstractJDBCHashSet<K : Any, T : JDBCHashedTable>(protected val table: T, loadOnInit: Boolean = false) : MutableSet<K>, AbstractSet<K>() {
|
||||
abstract class AbstractJDBCHashSet<K : Any, out T : JDBCHashedTable>(protected val table: T, loadOnInit: Boolean = false) : MutableSet<K>, AbstractSet<K>() {
|
||||
protected val innerMap = object : AbstractJDBCHashMap<K, Unit, T>(table, loadOnInit) {
|
||||
override fun keyFromRow(it: ResultRow): K = this@AbstractJDBCHashSet.elementFromRow(it)
|
||||
override fun keyFromRow(row: ResultRow): K = this@AbstractJDBCHashSet.elementFromRow(row)
|
||||
|
||||
// Return constant.
|
||||
override fun valueFromRow(it: ResultRow) = Unit
|
||||
override fun valueFromRow(row: ResultRow) = Unit
|
||||
|
||||
override fun addKeyToInsert(it: InsertStatement, entry: Map.Entry<K, Unit>, finalizables: MutableList<() -> Unit>) =
|
||||
this@AbstractJDBCHashSet.addElementToInsert(it, entry.key, finalizables)
|
||||
override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<K, Unit>, finalizables: MutableList<() -> Unit>) =
|
||||
this@AbstractJDBCHashSet.addElementToInsert(insert, entry.key, finalizables)
|
||||
|
||||
// No op as not actually persisted.
|
||||
override fun addValueToInsert(it: InsertStatement, entry: Map.Entry<K, Unit>, finalizables: MutableList<() -> Unit>) {
|
||||
override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<K, Unit>, finalizables: MutableList<() -> Unit>) {
|
||||
}
|
||||
|
||||
}
|
||||
@ -133,7 +137,7 @@ abstract class AbstractJDBCHashSet<K : Any, T : JDBCHashedTable>(protected val t
|
||||
*
|
||||
* See example implementations in [JDBCHashSet].
|
||||
*/
|
||||
protected abstract fun elementFromRow(it: ResultRow): K
|
||||
protected abstract fun elementFromRow(row: ResultRow): K
|
||||
|
||||
/**
|
||||
* Implementation should marshall the element to the insert statement.
|
||||
@ -143,7 +147,7 @@ abstract class AbstractJDBCHashSet<K : Any, T : JDBCHashedTable>(protected val t
|
||||
*
|
||||
* See example implementations in [JDBCHashSet].
|
||||
*/
|
||||
protected abstract fun addElementToInsert(it: InsertStatement, entry: K, finalizables: MutableList<() -> Unit>)
|
||||
protected abstract fun addElementToInsert(insert: InsertStatement, entry: K, finalizables: MutableList<() -> Unit>)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -177,7 +181,7 @@ abstract class AbstractJDBCHashSet<K : Any, T : JDBCHashedTable>(protected val t
|
||||
* TODO: if iterators are used extensively when loadOnInit=true, consider maintaining a collection of keys in iteration order to avoid sorting each time.
|
||||
* TODO: revisit whether we need the loadOnInit==true functionality and remove if not.
|
||||
*/
|
||||
abstract class AbstractJDBCHashMap<K : Any, V : Any, T : JDBCHashedTable>(val table: T, val loadOnInit: Boolean = false) : MutableMap<K, V>, AbstractMap<K,V>() {
|
||||
abstract class AbstractJDBCHashMap<K : Any, V : Any, out T : JDBCHashedTable>(val table: T, val loadOnInit: Boolean = false) : MutableMap<K, V>, AbstractMap<K,V>() {
|
||||
|
||||
companion object {
|
||||
protected val log = loggerFor<AbstractJDBCHashMap<*,*,*>>()
|
||||
@ -199,7 +203,7 @@ abstract class AbstractJDBCHashMap<K : Any, V : Any, T : JDBCHashedTable>(val ta
|
||||
bucket.add(entry)
|
||||
}
|
||||
}
|
||||
log.trace { "Loaded ${size} entries on init for ${table.tableName} in ${elapsedMillis} millis." }
|
||||
log.trace { "Loaded $size entries on init for ${table.tableName} in $elapsedMillis millis." }
|
||||
}
|
||||
}
|
||||
|
||||
@ -271,10 +275,7 @@ abstract class AbstractJDBCHashMap<K : Any, V : Any, T : JDBCHashedTable>(val ta
|
||||
}
|
||||
|
||||
override fun remove() {
|
||||
val savedCurrent = current
|
||||
if (savedCurrent == null) {
|
||||
throw IllegalStateException("Not called next() yet or already removed.")
|
||||
}
|
||||
val savedCurrent = current ?: throw IllegalStateException("Not called next() yet or already removed.")
|
||||
current = null
|
||||
remove(savedCurrent.key)
|
||||
}
|
||||
@ -381,9 +382,9 @@ abstract class AbstractJDBCHashMap<K : Any, V : Any, T : JDBCHashedTable>(val ta
|
||||
}
|
||||
|
||||
override fun get(key: K): V? {
|
||||
for (entry in getBucket(key)) {
|
||||
if (entry.key == key) {
|
||||
return entry.value
|
||||
for ((entryKey, value) in getBucket(key)) {
|
||||
if (entryKey == key) {
|
||||
return value
|
||||
}
|
||||
}
|
||||
return null
|
||||
@ -412,14 +413,14 @@ abstract class AbstractJDBCHashMap<K : Any, V : Any, T : JDBCHashedTable>(val ta
|
||||
*
|
||||
* See example implementations in [JDBCHashMap].
|
||||
*/
|
||||
protected abstract fun keyFromRow(it: ResultRow): K
|
||||
protected abstract fun keyFromRow(row: ResultRow): K
|
||||
|
||||
/**
|
||||
* Implementation should return the value object marshalled from the database table row.
|
||||
*
|
||||
* See example implementations in [JDBCHashMap].
|
||||
*/
|
||||
protected abstract fun valueFromRow(it: ResultRow): V
|
||||
protected abstract fun valueFromRow(row: ResultRow): V
|
||||
|
||||
/**
|
||||
* Implementation should marshall the key to the insert statement.
|
||||
@ -429,7 +430,7 @@ abstract class AbstractJDBCHashMap<K : Any, V : Any, T : JDBCHashedTable>(val ta
|
||||
*
|
||||
* See example implementations in [JDBCHashMap].
|
||||
*/
|
||||
protected abstract fun addKeyToInsert(it: InsertStatement, entry: Map.Entry<K, V>, finalizables: MutableList<() -> Unit>)
|
||||
protected abstract fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<K, V>, finalizables: MutableList<() -> Unit>)
|
||||
|
||||
/**
|
||||
* Implementation should marshall the value to the insert statement.
|
||||
@ -439,7 +440,7 @@ abstract class AbstractJDBCHashMap<K : Any, V : Any, T : JDBCHashedTable>(val ta
|
||||
*
|
||||
* See example implementations in [JDBCHashMap].
|
||||
*/
|
||||
protected abstract fun addValueToInsert(it: InsertStatement, entry: Map.Entry<K, V>, finalizables: MutableList<() -> Unit>)
|
||||
protected abstract fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<K, V>, finalizables: MutableList<() -> Unit>)
|
||||
|
||||
private fun createEntry(it: ResultRow) = NotReallyMutableEntry<K, V>(keyFromRow(it), valueFromRow(it), it[table.seqNo])
|
||||
|
||||
|
@ -17,11 +17,14 @@ import com.r3corda.core.transactions.WireTransaction
|
||||
import com.r3corda.core.utilities.ProgressTracker
|
||||
import com.r3corda.node.services.api.AcceptsFileUpload
|
||||
import com.r3corda.node.services.api.ServiceHubInternal
|
||||
import com.r3corda.node.utilities.AbstractJDBCHashSet
|
||||
import com.r3corda.node.utilities.FiberBox
|
||||
import com.r3corda.node.utilities.JDBCHashSet
|
||||
import com.r3corda.node.utilities.JDBCHashedTable
|
||||
import com.r3corda.node.utilities.localDate
|
||||
import com.r3corda.protocols.RatesFixProtocol.*
|
||||
import com.r3corda.protocols.ServiceRequestMessage
|
||||
import com.r3corda.protocols.TwoPartyDealProtocol
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import java.io.InputStream
|
||||
import java.math.BigDecimal
|
||||
import java.security.KeyPair
|
||||
@ -125,8 +128,26 @@ object NodeInterestRates {
|
||||
@ThreadSafe
|
||||
class Oracle(val identity: Party, private val signingKey: KeyPair, val clock: Clock) {
|
||||
|
||||
private object Table : JDBCHashedTable("demo_interest_rate_fixes") {
|
||||
val name = varchar("index_name", length = 255)
|
||||
val forDay = localDate("for_day")
|
||||
val ofTenor = varchar("of_tenor", length = 16)
|
||||
val value = decimal("value", scale = 20, precision = 16)
|
||||
}
|
||||
|
||||
private class InnerState {
|
||||
val fixes = JDBCHashSet<Fix>("interest_rate_fixes")
|
||||
val fixes = object : AbstractJDBCHashSet<Fix, Table>(Table) {
|
||||
override fun elementFromRow(row: ResultRow): Fix {
|
||||
return Fix(FixOf(row[table.name], row[table.forDay], Tenor(row[table.ofTenor])), row[table.value])
|
||||
}
|
||||
|
||||
override fun addElementToInsert(insert: InsertStatement, entry: Fix, finalizables: MutableList<() -> Unit>) {
|
||||
insert[table.name] = entry.of.name
|
||||
insert[table.forDay] = entry.of.forDay
|
||||
insert[table.ofTenor] = entry.of.ofTenor.name
|
||||
insert[table.value] = entry.value
|
||||
}
|
||||
}
|
||||
var container: FixContainer = FixContainer(fixes)
|
||||
}
|
||||
private val mutex = FiberBox(InnerState())
|
||||
|
Loading…
Reference in New Issue
Block a user