ENT-4610 Move tx signing to the Uniqueness provider (#5773)

* ENT-4610 Move tx signing to the Uniqueness provider

* Make detekt happy

* Remove unused imports

* Address review comment
This commit is contained in:
Christian Sailer 2019-11-29 17:30:33 +00:00 committed by Matthew Nesbit
parent 81a60377fa
commit 06f97cfed5
11 changed files with 146 additions and 48 deletions

View File

@ -4,12 +4,15 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.IdempotentFlow
import net.corda.core.internal.MIN_PLATFORM_VERSION_FOR_BACKPRESSURE_MESSAGE
import net.corda.core.internal.checkParameterHash
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import java.lang.IllegalStateException
import java.time.Duration
/**
@ -25,7 +28,7 @@ import java.time.Duration
* @param etaThreshold If the ETA for processing the request, according to the service, is greater than this, notify the client.
*/
// See AbstractStateReplacementFlow.Acceptor for why it's Void?
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService, private val etaThreshold: Duration) : FlowLogic<Void?>() {
abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service: SinglePartyNotaryService, private val etaThreshold: Duration) : FlowLogic<Void?>(), IdempotentFlow {
companion object {
// TODO: Determine an appropriate limit and also enforce in the network parameters and the transaction builder.
private const val maxAllowedInputsAndReferences = 10_000
@ -47,7 +50,7 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
override fun call(): Void? {
val requestPayload = otherSideSession.receive<NotarisationPayload>().unwrap { it }
try {
val commitStatus = try {
val tx: TransactionParts = validateRequest(requestPayload)
val request = NotarisationRequest(tx.inputs, tx.id)
validateRequestSignature(request, requestPayload.requestSignature)
@ -73,7 +76,13 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
throw NotaryException(e.error, transactionId)
}
signTransactionAndSendResponse(transactionId!!)
if (commitStatus is UniquenessProvider.Result.Success) {
sendSignedResponse(transactionId!!, commitStatus.signature)
}
else {
val error = IllegalStateException("Request that failed uniqueness reached signing code! Ignoring.")
throw NotaryException(NotaryError.General(error))
}
return null
}
@ -124,8 +133,7 @@ abstract class NotaryServiceFlow(val otherSideSession: FlowSession, val service:
abstract fun verifyTransaction(requestPayload: NotarisationPayload)
@Suspendable
private fun signTransactionAndSendResponse(txId: SecureHash) {
val signature = service.signTransaction(txId)
private fun sendSignedResponse(txId: SecureHash, signature: TransactionSignature) {
logger.info("Transaction [$txId] successfully notarised, sending signature back to [${otherSideSession.counterparty.name}]")
otherSideSession.send(NotarisationResponse(listOf(signature)))
}

View File

@ -4,7 +4,11 @@ import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.flows.*
import net.corda.core.flows.NotarisationRequest
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotarisationResponse
import net.corda.core.flows.NotaryError
import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.serialize

View File

@ -40,7 +40,7 @@ abstract class SinglePartyNotaryService : NotaryService() {
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?,
references: List<StateRef>
) {
): Result {
// TODO: Log the request here. Benchmarking shows that logging is expensive and we might get better performance
// when we concurrently log requests here as part of the flows, instead of logging sequentially in the
// `UniquenessProvider`.
@ -60,9 +60,11 @@ abstract class SinglePartyNotaryService : NotaryService() {
)
)
if (result is UniquenessProvider.Result.Failure) {
if (result is Result.Failure) {
throw NotaryInternalException(result.error)
}
return result
}
/**
@ -97,4 +99,5 @@ abstract class SinglePartyNotaryService : NotaryService() {
val signableData = SignableData(txId, SignatureMetadata(services.myInfo.platformVersion, Crypto.findSignatureScheme(notaryIdentityKey).schemeNumberID))
return services.keyManagementService.sign(signableData, notaryIdentityKey)
}
}

View File

@ -4,11 +4,14 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError
import net.corda.core.identity.Party
import java.time.Duration
typealias SigningFunction = (SecureHash) -> TransactionSignature
/**
* A service that records input states of the given transaction and provides conflict information
* if any of the inputs have already been used in another transaction.
@ -36,11 +39,12 @@ interface UniquenessProvider {
return NotaryServiceFlow.defaultEstimatedWaitTime
}
/** The outcome of committing a transaction. */
/** The outcome of committing and signing a transaction. */
sealed class Result {
/** Indicates that all input states have been committed successfully. */
object Success : Result()
data class Success(val signature: TransactionSignature) : Result()
/** Indicates that the transaction has not been committed. */
data class Failure(val error: NotaryError) : Result()
}
}
}

View File

@ -14,7 +14,12 @@ import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.elapsedTime
import net.corda.core.internal.notary.*
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.core.internal.notary.NotaryServiceFlow
import net.corda.core.internal.notary.SigningFunction
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.internal.notary.isConsumedByTheSameTx
import net.corda.core.internal.notary.validateTimeWindow
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
@ -33,12 +38,18 @@ import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.*
import javax.persistence.Column
import javax.persistence.EmbeddedId
import javax.persistence.Entity
import javax.persistence.GeneratedValue
import javax.persistence.Id
import javax.persistence.Lob
import javax.persistence.MappedSuperclass
import kotlin.concurrent.thread
/** A RDBMS backed Uniqueness provider */
@ThreadSafe
class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence, cacheFactory: NamedCacheFactory) : UniquenessProvider, SingletonSerializeAsToken() {
class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence, cacheFactory: NamedCacheFactory, val signTransaction : SigningFunction) : UniquenessProvider, SingletonSerializeAsToken() {
@MappedSuperclass
class BaseComittedState(
@ -315,6 +326,7 @@ class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersiste
}
private fun respondWithSuccess(request: CommitRequest) {
request.future.set(UniquenessProvider.Result.Success)
val signedTx = signTransaction(request.txId)
request.future.set(UniquenessProvider.Result.Success(signedTx))
}
}

View File

@ -18,7 +18,11 @@ class SimpleNotaryService(override val services: ServiceHubInternal, override va
log.info("Starting notary in $mode mode")
}
override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database, services.cacheFactory)
override val uniquenessProvider = PersistentUniquenessProvider(
services.clock,
services.database,
services.cacheFactory,
::signTransaction)
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow {
return if (notaryConfig.validating) {

View File

@ -28,7 +28,8 @@ class RaftNotaryService(
clock,
monitoringService.metrics,
services.cacheFactory,
raftConfig
raftConfig,
::signTransaction
)
}

View File

@ -21,6 +21,7 @@ import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.identity.Party
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.SigningFunction
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SingletonSerializeAsToken
@ -37,7 +38,11 @@ import java.nio.file.Path
import java.time.Clock
import java.util.concurrent.CompletableFuture
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.*
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Lob
import javax.persistence.Table
/**
* A uniqueness provider that records committed input states in a distributed collection replicated and
@ -56,7 +61,8 @@ class RaftUniquenessProvider(
private val clock: Clock,
private val metrics: MetricRegistry,
private val cacheFactory: NamedCacheFactory,
private val raftConfig: RaftConfig
private val raftConfig: RaftConfig,
private val signTransaction: SigningFunction
) : UniquenessProvider, SingletonSerializeAsToken() {
companion object {
private val log = contextLogger()
@ -228,7 +234,7 @@ class RaftUniquenessProvider(
UniquenessProvider.Result.Failure(commitError)
} else {
log.info("All input states of transaction $txId have been committed")
UniquenessProvider.Result.Success
UniquenessProvider.Result.Success(signTransaction(txId))
}
future.set(result)
}

View File

@ -272,7 +272,10 @@ class TimedFlowTests {
/** A dummy commit method that immediately returns a success message. */
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>): CordaFuture<UniquenessProvider.Result> {
return openFuture<UniquenessProvider.Result>().apply {
set(UniquenessProvider.Result.Success)
val signature = services.database.transaction {
signTransaction(txId)
}
set(UniquenessProvider.Result.Success(signature))
}
}
@ -280,7 +283,14 @@ class TimedFlowTests {
}
@Suspendable
override fun commitInputStates(inputs: List<StateRef>, txId: SecureHash, caller: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>) {
override fun commitInputStates(
inputs: List<StateRef>,
txId: SecureHash,
caller: Party,
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?,
references: List<StateRef>
) : UniquenessProvider.Result {
val callingFlow = FlowLogic.currentTopLevel
?: throw IllegalStateException("This method should be invoked in a flow context.")
@ -290,8 +300,9 @@ class TimedFlowTests {
callingFlow.stateMachine.suspend(FlowIORequest.WaitForLedgerCommit(SecureHash.randomSHA256()), false)
} else {
log.info("Processing")
super.commitInputStates(inputs, txId, caller, requestSignature, timeWindow, references)
return super.commitInputStates(inputs, txId, caller, requestSignature, timeWindow, references)
}
return UniquenessProvider.Result.Failure(NotaryError.General(Throwable("leave me alone")))
}
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = NonValidatingNotaryFlow(otherPartySession, this, waitEtaThreshold)

View File

@ -2,9 +2,12 @@ package net.corda.node.services.transactions
import com.codahale.metrics.MetricRegistry
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.NullKeys
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.sha256
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError
@ -14,6 +17,7 @@ import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.minutes
import net.corda.node.services.schema.NodeSchemaService
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.notary.experimental.raft.RaftConfig
@ -28,12 +32,15 @@ import net.corda.testing.internal.configureDatabase
import net.corda.testing.internal.configureTestSSL
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.TestClock
import net.corda.testing.node.internal.MockKeyManagementService
import net.corda.testing.node.makeTestIdentityService
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import java.security.KeyPair
import java.time.Clock
import kotlin.test.assertEquals
@ -93,12 +100,12 @@ class UniquenessProviderTests(
val firstTxId = SecureHash.randomSHA256()
val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes))
val result = uniquenessProvider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow).get()
assertEquals(UniquenessProvider.Result.Success, result)
assert(result is UniquenessProvider.Result.Success)
// Idempotency: can re-notarise successfully later.
testClock.advanceBy(90.minutes)
val result2 = uniquenessProvider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow).get()
assertEquals(UniquenessProvider.Result.Success, result2)
assert(result2 is UniquenessProvider.Result.Success)
}
@Test
@ -120,12 +127,12 @@ class UniquenessProviderTests(
val result = uniquenessProvider.commit(emptyList(), firstTxId, identity, requestSignature, references = listOf(referenceState))
.get()
assertEquals(UniquenessProvider.Result.Success, result)
assert(result is UniquenessProvider.Result.Success)
// Idempotency: can re-notarise successfully.
val result2 = uniquenessProvider.commit(emptyList(), firstTxId, identity, requestSignature, references = listOf(referenceState))
.get()
assertEquals(UniquenessProvider.Result.Success, result2)
assert(result2 is UniquenessProvider.Result.Success)
}
@Test
@ -135,7 +142,7 @@ class UniquenessProviderTests(
val result = uniquenessProvider.commit(listOf(referenceState), firstTxId, identity, requestSignature, references = emptyList())
.get()
assertEquals(UniquenessProvider.Result.Success, result)
assert(result is UniquenessProvider.Result.Success)
// Transaction referencing the spent sate fails.
val secondTxId = SecureHash.randomSHA256()
@ -157,18 +164,18 @@ class UniquenessProviderTests(
val result = uniquenessProvider.commit(emptyList(), firstTxId, identity, requestSignature, timeWindow, references = listOf(referenceState))
.get()
assertEquals(UniquenessProvider.Result.Success, result)
assert(result is UniquenessProvider.Result.Success)
// The reference state gets consumed.
val result2 = uniquenessProvider.commit(listOf(referenceState), SecureHash.randomSHA256(), identity, requestSignature, timeWindow)
.get()
assertEquals(UniquenessProvider.Result.Success, result2)
assert(result2 is UniquenessProvider.Result.Success)
// Idempotency: can re-notarise successfully.
testClock.advanceBy(90.minutes)
val result3 = uniquenessProvider.commit(emptyList(), firstTxId, identity, requestSignature, timeWindow, references = listOf(referenceState))
.get()
assertEquals(UniquenessProvider.Result.Success, result3)
assert(result3 is UniquenessProvider.Result.Success)
}
@Test
@ -190,7 +197,7 @@ class UniquenessProviderTests(
val result = uniquenessProvider.commit(listOf(referenceState), firstTxId, identity, requestSignature, references = emptyList())
.get()
assertEquals(UniquenessProvider.Result.Success, result)
assert(result is UniquenessProvider.Result.Success)
// Transaction referencing the spent sate fails.
val secondTxId = SecureHash.randomSHA256()
@ -210,7 +217,7 @@ class UniquenessProviderTests(
val result = uniquenessProvider.commit(listOf(referenceState), firstTxId, identity, requestSignature, references = emptyList())
.get()
assertEquals(UniquenessProvider.Result.Success, result)
assert(result is UniquenessProvider.Result.Success)
// Transaction referencing the spent sate fails.
val secondTxId = SecureHash.randomSHA256()
@ -230,11 +237,11 @@ class UniquenessProviderTests(
val inputState = generateStateRef()
val result = uniquenessProvider.commit(listOf(inputState), txID, identity, requestSignature).get()
assertEquals(UniquenessProvider.Result.Success, result)
assert(result is UniquenessProvider.Result.Success)
// Idempotency: can re-notarise successfully.
val result2 = uniquenessProvider.commit(listOf(inputState), txID, identity, requestSignature).get()
assertEquals(UniquenessProvider.Result.Success, result2)
assert(result2 is UniquenessProvider.Result.Success)
}
@Test
@ -244,7 +251,7 @@ class UniquenessProviderTests(
val inputs = listOf(inputState)
val firstTxId = txID
val result = uniquenessProvider.commit(inputs, firstTxId, identity, requestSignature).get()
assertEquals(UniquenessProvider.Result.Success, result)
assert(result is UniquenessProvider.Result.Success)
val secondTxId = SecureHash.randomSHA256()
@ -263,12 +270,12 @@ class UniquenessProviderTests(
val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes))
val result = uniquenessProvider.commit(listOf(inputState), txID, identity, requestSignature, timeWindow).get()
assertEquals(UniquenessProvider.Result.Success, result)
assert(result is UniquenessProvider.Result.Success)
// Idempotency: can re-notarise successfully later.
testClock.advanceBy(90.minutes)
val result2 = uniquenessProvider.commit(listOf(inputState), txID, identity, requestSignature, timeWindow).get()
assertEquals(UniquenessProvider.Result.Success, result2)
assert(result2 is UniquenessProvider.Result.Success)
}
@Test
@ -287,7 +294,7 @@ class UniquenessProviderTests(
val inputs = listOf(inputState)
val firstTxId = txID
val result = uniquenessProvider.commit(inputs, firstTxId, identity, requestSignature).get()
assertEquals(UniquenessProvider.Result.Success, result)
assert(result is UniquenessProvider.Result.Success)
val secondTxId = SecureHash.randomSHA256()
@ -306,7 +313,7 @@ class UniquenessProviderTests(
val inputs = listOf(inputState)
val firstTxId = txID
val result = uniquenessProvider.commit(inputs, firstTxId, identity, requestSignature).get()
assertEquals(UniquenessProvider.Result.Success, result)
assert(result is UniquenessProvider.Result.Success)
val secondTxId = SecureHash.randomSHA256()
@ -330,13 +337,13 @@ class UniquenessProviderTests(
val result = uniquenessProvider.commit(listOf(inputState), firstTxId, identity, requestSignature, timeWindow, references = listOf(referenceState))
.get()
assertEquals(UniquenessProvider.Result.Success, result)
assert(result is UniquenessProvider.Result.Success)
// Idempotency: can re-notarise successfully.
testClock.advanceBy(90.minutes)
val result2 = uniquenessProvider.commit(listOf(inputState), firstTxId, identity, requestSignature, timeWindow, references = listOf(referenceState))
.get()
assertEquals(UniquenessProvider.Result.Success, result2)
assert(result2 is UniquenessProvider.Result.Success)
}
@Test
@ -346,7 +353,7 @@ class UniquenessProviderTests(
val referenceState = generateStateRef()
val result = uniquenessProvider.commit(listOf(inputState), firstTxId, identity, requestSignature, references = emptyList()).get()
assertEquals(UniquenessProvider.Result.Success, result)
assert(result is UniquenessProvider.Result.Success)
// Transaction referencing the spent sate fails.
val secondTxId = SecureHash.randomSHA256()
@ -367,7 +374,7 @@ class UniquenessProviderTests(
val result = uniquenessProvider.commit(listOf(referenceState), firstTxId, identity, requestSignature, references = emptyList())
.get()
assertEquals(UniquenessProvider.Result.Success, result)
assert(result is UniquenessProvider.Result.Success)
// Transaction referencing the spent sate fails.
val secondTxId = SecureHash.randomSHA256()
@ -381,6 +388,22 @@ class UniquenessProviderTests(
}
/* Group G: input, reference states and time window covered by previous tests. */
/* Transaction signing tests. */
@Test
fun `signs transactions correctly`() {
(1..10).map {
val inputState1 = generateStateRef()
val firstTxId = SecureHash.randomSHA256()
val timeWindow = TimeWindow.untilOnly(Clock.systemUTC().instant().plus(30.minutes))
Pair(firstTxId, uniquenessProvider.commit(listOf(inputState1), firstTxId, identity, requestSignature, timeWindow))
}.forEach {
val result = it.second.get()
assert(result is UniquenessProvider.Result.Success)
val signature = (result as UniquenessProvider.Result.Success).signature
assert(signature.verify(it.first))
}
}
}
interface UniquenessProviderFactory {
@ -394,7 +417,7 @@ class PersistentUniquenessProviderFactory : UniquenessProviderFactory {
override fun create(clock: Clock): UniquenessProvider {
database?.close()
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }, NodeSchemaService(extraSchemas = setOf(NodeNotarySchemaV1)))
return PersistentUniquenessProvider(clock, database!!, TestingNamedCacheFactory())
return PersistentUniquenessProvider(clock, database!!, TestingNamedCacheFactory(), ::signSingle)
}
override fun cleanUp() {
@ -420,7 +443,8 @@ class RaftUniquenessProviderFactory : UniquenessProviderFactory {
clock,
MetricRegistry(),
TestingNamedCacheFactory(),
RaftConfig(NetworkHostAndPort("localhost", raftNodePort), emptyList())
RaftConfig(NetworkHostAndPort("localhost", raftNodePort), emptyList()),
::signSingle
).apply {
start()
provider = this
@ -432,3 +456,17 @@ class RaftUniquenessProviderFactory : UniquenessProviderFactory {
database?.close()
}
}
var ourKeyPair: KeyPair = Crypto.generateKeyPair(X509Utilities.DEFAULT_TLS_SIGNATURE_SCHEME)
val keyService = MockKeyManagementService(makeTestIdentityService(), ourKeyPair)
val pubKey = keyService.freshKey()
fun signSingle(it: SecureHash) = keyService.sign(
SignableData(
txId = it,
signatureMetadata = SignatureMetadata(
4,
Crypto.findSignatureScheme(pubKey).schemeNumberID
)
), pubKey
)

View File

@ -22,8 +22,15 @@ import java.security.PublicKey
* The notary-related APIs might change in the future.
*/
// START 1
class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : SinglePartyNotaryService() {
override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database, services.cacheFactory)
class MyCustomValidatingNotaryService(
override val services: ServiceHubInternal,
override val notaryIdentityKey: PublicKey)
: SinglePartyNotaryService() {
override val uniquenessProvider = PersistentUniquenessProvider(
services.clock,
services.database,
services.cacheFactory,
::signTransaction)
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = MyValidatingNotaryFlow(otherPartySession, this)