mirror of
https://github.com/corda/corda.git
synced 2025-03-01 04:16:21 +00:00
CORDA-1208: MySQL notary service - store request signature
This commit is contained in:
parent
49146b1cb5
commit
3db40bb392
@ -27,7 +27,6 @@ abstract class MySQLNotaryService(
|
||||
/** Database table will be automatically created in dev mode */
|
||||
val devMode: Boolean) : TrustedAuthorityNotaryService() {
|
||||
|
||||
override val timeWindowChecker = TimeWindowChecker(services.clock)
|
||||
override val uniquenessProvider = MySQLUniquenessProvider(
|
||||
services.monitoringService.metrics,
|
||||
configuration
|
||||
|
@ -48,20 +48,26 @@ class MySQLUniquenessProvider(
|
||||
private val log = loggerFor<MySQLUniquenessProvider>()
|
||||
|
||||
// TODO: optimize table schema for InnoDB
|
||||
private val createTableStatement =
|
||||
"CREATE TABLE IF NOT EXISTS committed_states (" +
|
||||
"issue_tx_id BINARY(32) NOT NULL," +
|
||||
"issue_tx_output_id INT UNSIGNED NOT NULL," +
|
||||
"consuming_tx_id BINARY(32) NOT NULL," +
|
||||
"consuming_tx_input_id INT UNSIGNED NOT NULL," +
|
||||
"consuming_party_name TEXT NOT NULL," +
|
||||
// TODO: do we need to store the key? X500 name should be sufficient
|
||||
"consuming_party_key BLOB NOT NULL," +
|
||||
"commit_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP," +
|
||||
"CONSTRAINT id PRIMARY KEY (issue_tx_id, issue_tx_output_id)" +
|
||||
private val createCommittedStateTable =
|
||||
"CREATE TABLE IF NOT EXISTS notary_committed_states (" +
|
||||
"issue_transaction_id BINARY(32) NOT NULL," +
|
||||
"issue_transaction_output_id INT UNSIGNED NOT NULL," +
|
||||
"consuming_transaction_id BINARY(32) NOT NULL," +
|
||||
"CONSTRAINT id PRIMARY KEY (issue_transaction_id, issue_transaction_output_id)" +
|
||||
")"
|
||||
private val insertStatement = "INSERT INTO committed_states (issue_tx_id, issue_tx_output_id, consuming_tx_id, consuming_tx_input_id, consuming_party_name, consuming_party_key) VALUES (?, ?, ?, ?, ?, ?)"
|
||||
private val findStatement = "SELECT consuming_tx_id, consuming_tx_input_id, consuming_party_name, consuming_party_key FROM committed_states WHERE issue_tx_id = ? AND issue_tx_output_id = ?"
|
||||
private val insertStateStatement = "INSERT INTO notary_committed_states (issue_transaction_id, issue_transaction_output_id, consuming_transaction_id) VALUES (?, ?, ?)"
|
||||
private val findStatement = "SELECT consuming_transaction_id FROM notary_committed_states WHERE issue_transaction_id = ? AND issue_transaction_output_id = ?"
|
||||
|
||||
private val createRequestLogTable =
|
||||
"CREATE TABLE IF NOT EXISTS notary_request_log (" +
|
||||
"consuming_transaction_id BINARY(32) NOT NULL," +
|
||||
"requesting_party_name TEXT NOT NULL," +
|
||||
"request_signature BLOB NOT NULL," +
|
||||
"request_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP," +
|
||||
"request_id INT UNSIGNED NOT NULL AUTO_INCREMENT," +
|
||||
"CONSTRAINT rid PRIMARY KEY (request_id)" +
|
||||
")"
|
||||
private val insertRequestStatement = "INSERT INTO notary_request_log (consuming_transaction_id, requesting_party_name, request_signature) VALUES (?, ?, ?)"
|
||||
}
|
||||
|
||||
private val metricPrefix = MySQLUniquenessProvider::class.simpleName
|
||||
@ -99,9 +105,10 @@ class MySQLUniquenessProvider(
|
||||
}
|
||||
|
||||
fun createTable() {
|
||||
log.debug("Attempting to create DB table if it does not yet exist: $createTableStatement")
|
||||
log.debug("Attempting to create DB table if it does not yet exist: $createCommittedStateTable")
|
||||
connection.use {
|
||||
it.createStatement().execute(createTableStatement)
|
||||
it.createStatement().execute(createCommittedStateTable)
|
||||
it.createStatement().execute(createRequestLogTable)
|
||||
it.commit()
|
||||
}
|
||||
}
|
||||
@ -113,7 +120,7 @@ class MySQLUniquenessProvider(
|
||||
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
|
||||
val s = Stopwatch.createStarted()
|
||||
try {
|
||||
retryTransaction(CommitAll(states, txId, callerIdentity))
|
||||
retryTransaction(CommitAll(states, txId, callerIdentity, requestSignature))
|
||||
nrInputStates.update(states.size)
|
||||
} catch (e: BatchUpdateException) {
|
||||
log.info("Unable to commit input states, finding conflicts, txId: $txId", e)
|
||||
@ -152,19 +159,25 @@ class MySQLUniquenessProvider(
|
||||
fun run(conn: Connection)
|
||||
}
|
||||
|
||||
private class CommitAll(val states: List<StateRef>, val txId: SecureHash, val callerIdentity: Party) : RetryableTransaction {
|
||||
private class CommitAll(val states: List<StateRef>, val txId: SecureHash, val callerIdentity: Party, val requestSignature: NotarisationRequestSignature) : RetryableTransaction {
|
||||
override fun run(conn: Connection) {
|
||||
conn.prepareStatement(insertStatement).apply {
|
||||
states.forEachIndexed { index, stateRef ->
|
||||
conn.prepareStatement(insertRequestStatement).apply {
|
||||
setBytes(1, txId.bytes)
|
||||
setString(2, callerIdentity.name.toString())
|
||||
setBytes(3, requestSignature.serialize().bytes)
|
||||
execute()
|
||||
close()
|
||||
}
|
||||
// We commit here, since we want to make sure it doesn't get rolled back in case of a conflict
|
||||
// when committing inputs
|
||||
conn.commit()
|
||||
conn.prepareStatement(insertStateStatement).apply {
|
||||
states.forEach { stateRef ->
|
||||
// StateRef
|
||||
setBytes(1, stateRef.txhash.bytes)
|
||||
setInt(2, stateRef.index)
|
||||
// Consuming transaction
|
||||
setBytes(3, txId.bytes)
|
||||
setInt(4, index)
|
||||
setString(5, callerIdentity.name.toString())
|
||||
setBytes(6, callerIdentity.owningKey.serialize().bytes)
|
||||
|
||||
addBatch()
|
||||
clearParameters()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user