From 057ee74611c683fc9aa18647297e30686694cc2e Mon Sep 17 00:00:00 2001 From: Thomas Schroeter Date: Wed, 12 Sep 2018 13:36:04 +0100 Subject: [PATCH] Single node notary thread safety (#3924) --- .idea/compiler.xml | 2 +- .../notary/AsyncUniquenessProvider.kt | 35 ++++++ .../net/corda/core/node/AppServiceHub.kt | 2 +- .../net/corda/node/internal/AbstractNode.kt | 48 +++++--- .../PersistentUniquenessProvider.kt | 108 ++++++++++++++---- .../transactions/SimpleNotaryService.kt | 4 +- .../transactions/ValidatingNotaryService.kt | 4 +- .../PersistentUniquenessProviderTests.kt | 10 +- .../corda/notarydemo/MyCustomNotaryService.kt | 5 +- 9 files changed, 163 insertions(+), 55 deletions(-) create mode 100644 core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt diff --git a/.idea/compiler.xml b/.idea/compiler.xml index ef6407f0b3..d981d5fd35 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -235,4 +235,4 @@ - \ No newline at end of file + diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt new file mode 100644 index 0000000000..1d9328de88 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt @@ -0,0 +1,35 @@ +package net.corda.core.internal.notary + +import net.corda.core.concurrent.CordaFuture +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.NotarisationRequestSignature +import net.corda.core.flows.NotaryError +import net.corda.core.identity.Party + +/** + * A service that records input states of the given transaction and provides conflict information + * if any of the inputs have already been used in another transaction. + */ +interface AsyncUniquenessProvider : UniquenessProvider { + /** Commits all input states of the given transaction. */ + fun commitAsync(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List): CordaFuture + + /** Commits all input states of the given transaction synchronously. Use [commitAsync] for better performance. */ + override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List) { + val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow,references).get() + if (result is Result.Failure) { + throw NotaryInternalException(result.error) + } + } + + /** The outcome of committing a transaction. */ + sealed class Result { + /** Indicates that all input states have been committed successfully. */ + object Success : Result() + /** Indicates that the transaction has not been committed. */ + data class Failure(val error: NotaryError) : Result() + } +} + diff --git a/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt index 62e5e2f1f6..60816009cb 100644 --- a/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt @@ -28,4 +28,4 @@ interface AppServiceHub : ServiceHub { * TODO it is assumed here that the flow object has an appropriate classloader. */ fun startTrackedFlow(flow: FlowLogic): FlowProgressHandle -} \ No newline at end of file +} diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index cb008ac462..5590c1fe74 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -521,7 +521,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } /** - * If the [serviceClass] is a notary service, it will only be enable if the "custom" flag is set in + * If the [serviceClass] is a notary service, it will only be enabled if the "custom" flag is set in * the notary configuration. */ private fun isNotaryService(serviceClass: Class<*>) = NotaryService::class.java.isAssignableFrom(serviceClass) @@ -568,28 +568,38 @@ abstract class AbstractNode(val configuration: NodeConfiguration, private fun installCordaService(flowStarter: FlowStarter, serviceClass: Class, myNotaryIdentity: PartyAndCertificate?) { serviceClass.requireAnnotation() + val service = try { - val serviceContext = AppServiceHubImpl(services, flowStarter) - if (isNotaryService(serviceClass)) { - myNotaryIdentity ?: throw IllegalStateException("Trying to install a notary service but no notary identity specified") - val constructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java, PublicKey::class.java).apply { isAccessible = true } - serviceContext.serviceInstance = constructor.newInstance(serviceContext, myNotaryIdentity.owningKey) - serviceContext.serviceInstance - } else { - try { - val extendedServiceConstructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java).apply { isAccessible = true } - serviceContext.serviceInstance = extendedServiceConstructor.newInstance(serviceContext) - serviceContext.serviceInstance - } catch (ex: NoSuchMethodException) { - val constructor = serviceClass.getDeclaredConstructor(ServiceHub::class.java).apply { isAccessible = true } - log.warn("${serviceClass.name} is using legacy CordaService constructor with ServiceHub parameter. " + - "Upgrade to an AppServiceHub parameter to enable updated API features.") - constructor.newInstance(services) - } - } + if (isNotaryService(serviceClass)) { + myNotaryIdentity ?: throw IllegalStateException("Trying to install a notary service but no notary identity specified") + try { + val constructor = serviceClass.getDeclaredConstructor(ServiceHubInternal::class.java, PublicKey::class.java).apply { isAccessible = true } + constructor.newInstance(services, myNotaryIdentity.owningKey ) + } catch (ex: NoSuchMethodException) { + val constructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java, PublicKey::class.java).apply { isAccessible = true } + val serviceContext = AppServiceHubImpl(services, flowStarter) + val service = constructor.newInstance(serviceContext, myNotaryIdentity.owningKey) + serviceContext.serviceInstance = service + service + } + } else { + try { + val serviceContext = AppServiceHubImpl(services, flowStarter) + val extendedServiceConstructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java).apply { isAccessible = true } + val service = extendedServiceConstructor.newInstance(serviceContext) + serviceContext.serviceInstance = service + service + } catch (ex: NoSuchMethodException) { + val constructor = serviceClass.getDeclaredConstructor(ServiceHub::class.java).apply { isAccessible = true } + log.warn("${serviceClass.name} is using legacy CordaService constructor with ServiceHub parameter. " + + "Upgrade to an AppServiceHub parameter to enable updated API features.") + constructor.newInstance(services) + } + } } catch (e: InvocationTargetException) { throw ServiceInstantiationException(e.cause) } + cordappServices.putInstance(serviceClass, service) if (service is NotaryService) handleCustomNotaryService(service) diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt index e0ab98b9e5..eadedd4e64 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt @@ -1,5 +1,6 @@ package net.corda.node.services.transactions +import net.corda.core.concurrent.CordaFuture import net.corda.core.contracts.StateRef import net.corda.core.contracts.TimeWindow import net.corda.core.crypto.SecureHash @@ -8,11 +9,9 @@ import net.corda.core.flows.NotarisationRequestSignature import net.corda.core.flows.NotaryError import net.corda.core.flows.StateConsumptionDetails import net.corda.core.identity.Party -import net.corda.core.internal.ThreadBox -import net.corda.core.internal.notary.NotaryInternalException -import net.corda.core.internal.notary.UniquenessProvider -import net.corda.core.internal.notary.isConsumedByTheSameTx -import net.corda.core.internal.notary.validateTimeWindow +import net.corda.core.internal.concurrent.OpenFuture +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.notary.* import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SingletonSerializeAsToken @@ -20,17 +19,22 @@ import net.corda.core.serialization.serialize import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug import net.corda.node.utilities.AppendOnlyPersistentMap +import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.currentDBSession +import net.corda.serialization.internal.CordaSerializationEncoding import java.time.Clock import java.time.Instant import java.util.* +import java.util.concurrent.LinkedBlockingQueue import javax.annotation.concurrent.ThreadSafe import javax.persistence.* +import kotlin.concurrent.thread /** A RDBMS backed Uniqueness provider */ @ThreadSafe -class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, SingletonSerializeAsToken() { +class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence) : AsyncUniquenessProvider, SingletonSerializeAsToken() { + @MappedSuperclass class BaseComittedState( @EmbeddedId @@ -63,17 +67,36 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl var requestDate: Instant ) + private data class CommitRequest( + val states: List, + val txId: SecureHash, + val callerIdentity: Party, + val requestSignature: NotarisationRequestSignature, + val timeWindow: TimeWindow?, + val references: List, + val future: OpenFuture) + @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states") class CommittedState(id: PersistentStateRef, consumingTxHash: String) : BaseComittedState(id, consumingTxHash) - private class InnerState { - val commitLog = createMap() + private val commitLog = createMap() + + private val requestQueue = LinkedBlockingQueue(requestQueueSize) + + /** A request processor thread. */ + private val processorThread = thread(name = "Notary request queue processor", isDaemon = true) { + try { + while (!Thread.interrupted()) { + processRequest(requestQueue.take()) + } + } catch (e: InterruptedException) { + } + log.debug { "Shutting down with ${requestQueue.size} in-flight requests unprocessed." } } - private val mutex = ThreadBox(InnerState()) - companion object { + private const val requestQueueSize = 100_000 private val log = contextLogger() fun createMap(): AppendOnlyPersistentMap = AppendOnlyPersistentMap( @@ -99,23 +122,25 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl ) } - override fun commit( + + /** + * Generates and adds a [CommitRequest] to the request queue. If the request queue is full, this method will block + * until space is available. + * + * Returns a future that will complete once the request is processed, containing the commit [Result]. + */ + override fun commitAsync( states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List - ) { - mutex.locked { - logRequest(txId, callerIdentity, requestSignature) - val conflictingStates = findAlreadyCommitted(states, references, commitLog) - if (conflictingStates.isNotEmpty()) { - handleConflicts(txId, conflictingStates) - } else { - handleNoConflicts(timeWindow, states, txId, commitLog) - } - } + ): CordaFuture { + val future = openFuture() + val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow, references, future) + requestQueue.put(request) + return future } private fun logRequest(txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) { @@ -149,6 +174,25 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl return conflictingStates } + private fun commitOne( + states: List, + txId: SecureHash, + callerIdentity: Party, + requestSignature: NotarisationRequestSignature, + timeWindow: TimeWindow?, + references: List + ) { + database.transaction { + logRequest(txId, callerIdentity, requestSignature) + val conflictingStates = findAlreadyCommitted(states, references, commitLog) + if (conflictingStates.isNotEmpty()) { + handleConflicts(txId, conflictingStates) + } else { + handleNoConflicts(timeWindow, states, txId, commitLog) + } + } + } + private fun handleConflicts(txId: SecureHash, conflictingStates: LinkedHashMap) { if (isConsumedByTheSameTx(txId.sha256(), conflictingStates)) { log.debug { "Transaction $txId already notarised" } @@ -171,4 +215,26 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl throw NotaryInternalException(outsideTimeWindowError) } } + + private fun processRequest(request: CommitRequest) { + try { + commitOne(request.states, request.txId, request.callerIdentity, request.requestSignature, request.timeWindow, request.references) + respondWithSuccess(request) + } catch (e: Exception) { + log.warn("Error processing commit request", e) + respondWithError(request, e) + } + } + + private fun respondWithError(request: CommitRequest, exception: Exception) { + if (exception is NotaryInternalException) { + request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error)) + } else { + request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error.")))) + } + } + + private fun respondWithSuccess(request: CommitRequest) { + request.future.set(AsyncUniquenessProvider.Result.Success) + } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt index 093c0d905b..16ad4464af 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt @@ -8,10 +8,10 @@ import java.security.PublicKey /** A simple Notary service that does not perform transaction validation */ class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { - override val uniquenessProvider = PersistentUniquenessProvider(services.clock) + override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database) override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow = NonValidatingNotaryFlow(otherPartySession, this) override fun start() {} override fun stop() {} -} \ No newline at end of file +} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt index 08b4fcb211..4a6f46b2ce 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt @@ -8,10 +8,10 @@ import java.security.PublicKey /** A Notary service that validates the transaction chain of the submitted transaction before committing it */ class ValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { - override val uniquenessProvider = PersistentUniquenessProvider(services.clock) + override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database) override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow = ValidatingNotaryFlow(otherPartySession, this) override fun start() {} override fun stop() {} -} \ No newline at end of file +} diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt index 17fe106c99..bdc2b642d0 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt @@ -28,7 +28,7 @@ import kotlin.test.assertFailsWith class PersistentUniquenessProviderTests { @Rule @JvmField - val testSerialization = SerializationEnvironmentRule() + val testSerialization = SerializationEnvironmentRule(inheritable = true) private val identity = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party private val txID = SecureHash.randomSHA256() private val requestSignature = NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0) @@ -49,18 +49,15 @@ class PersistentUniquenessProviderTests { @Test fun `should commit a transaction with unused inputs without exception`() { - database.transaction { - val provider = PersistentUniquenessProvider(Clock.systemUTC()) + val provider = PersistentUniquenessProvider(Clock.systemUTC(), database) val inputState = generateStateRef() provider.commit(listOf(inputState), txID, identity, requestSignature) - } } @Test fun `should report a conflict for a transaction with previously used inputs`() { - database.transaction { - val provider = PersistentUniquenessProvider(Clock.systemUTC()) + val provider = PersistentUniquenessProvider(Clock.systemUTC(), database) val inputState = generateStateRef() val inputs = listOf(inputState) @@ -76,5 +73,4 @@ class PersistentUniquenessProviderTests { val conflictCause = error.consumedStates[inputState]!! assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256()) } - } } diff --git a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt index dd1f424d1d..666844691c 100644 --- a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt +++ b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt @@ -13,6 +13,7 @@ import net.corda.core.node.services.CordaService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionWithSignatures import net.corda.core.transactions.WireTransaction +import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.transactions.PersistentUniquenessProvider import java.security.PublicKey import java.security.SignatureException @@ -25,8 +26,8 @@ import java.security.SignatureException */ // START 1 @CordaService -class MyCustomValidatingNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { - override val uniquenessProvider = PersistentUniquenessProvider(services.clock) +class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { + override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database) override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = MyValidatingNotaryFlow(otherPartySession, this)