diff --git a/.idea/compiler.xml b/.idea/compiler.xml
index ef6407f0b3..d981d5fd35 100644
--- a/.idea/compiler.xml
+++ b/.idea/compiler.xml
@@ -235,4 +235,4 @@
-
\ No newline at end of file
+
diff --git a/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt
new file mode 100644
index 0000000000..1d9328de88
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/internal/notary/AsyncUniquenessProvider.kt
@@ -0,0 +1,35 @@
+package net.corda.core.internal.notary
+
+import net.corda.core.concurrent.CordaFuture
+import net.corda.core.contracts.StateRef
+import net.corda.core.contracts.TimeWindow
+import net.corda.core.crypto.SecureHash
+import net.corda.core.flows.NotarisationRequestSignature
+import net.corda.core.flows.NotaryError
+import net.corda.core.identity.Party
+
+/**
+ * A service that records input states of the given transaction and provides conflict information
+ * if any of the inputs have already been used in another transaction.
+ */
+interface AsyncUniquenessProvider : UniquenessProvider {
+ /** Commits all input states of the given transaction. */
+ fun commitAsync(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List): CordaFuture
+
+ /** Commits all input states of the given transaction synchronously. Use [commitAsync] for better performance. */
+ override fun commit(states: List, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List) {
+ val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow,references).get()
+ if (result is Result.Failure) {
+ throw NotaryInternalException(result.error)
+ }
+ }
+
+ /** The outcome of committing a transaction. */
+ sealed class Result {
+ /** Indicates that all input states have been committed successfully. */
+ object Success : Result()
+ /** Indicates that the transaction has not been committed. */
+ data class Failure(val error: NotaryError) : Result()
+ }
+}
+
diff --git a/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt
index 62e5e2f1f6..60816009cb 100644
--- a/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt
+++ b/core/src/main/kotlin/net/corda/core/node/AppServiceHub.kt
@@ -28,4 +28,4 @@ interface AppServiceHub : ServiceHub {
* TODO it is assumed here that the flow object has an appropriate classloader.
*/
fun startTrackedFlow(flow: FlowLogic): FlowProgressHandle
-}
\ No newline at end of file
+}
diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
index cb008ac462..5590c1fe74 100644
--- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
+++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
@@ -521,7 +521,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
/**
- * If the [serviceClass] is a notary service, it will only be enable if the "custom" flag is set in
+ * If the [serviceClass] is a notary service, it will only be enabled if the "custom" flag is set in
* the notary configuration.
*/
private fun isNotaryService(serviceClass: Class<*>) = NotaryService::class.java.isAssignableFrom(serviceClass)
@@ -568,28 +568,38 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
private fun installCordaService(flowStarter: FlowStarter, serviceClass: Class, myNotaryIdentity: PartyAndCertificate?) {
serviceClass.requireAnnotation()
+
val service = try {
- val serviceContext = AppServiceHubImpl(services, flowStarter)
- if (isNotaryService(serviceClass)) {
- myNotaryIdentity ?: throw IllegalStateException("Trying to install a notary service but no notary identity specified")
- val constructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java, PublicKey::class.java).apply { isAccessible = true }
- serviceContext.serviceInstance = constructor.newInstance(serviceContext, myNotaryIdentity.owningKey)
- serviceContext.serviceInstance
- } else {
- try {
- val extendedServiceConstructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java).apply { isAccessible = true }
- serviceContext.serviceInstance = extendedServiceConstructor.newInstance(serviceContext)
- serviceContext.serviceInstance
- } catch (ex: NoSuchMethodException) {
- val constructor = serviceClass.getDeclaredConstructor(ServiceHub::class.java).apply { isAccessible = true }
- log.warn("${serviceClass.name} is using legacy CordaService constructor with ServiceHub parameter. " +
- "Upgrade to an AppServiceHub parameter to enable updated API features.")
- constructor.newInstance(services)
- }
- }
+ if (isNotaryService(serviceClass)) {
+ myNotaryIdentity ?: throw IllegalStateException("Trying to install a notary service but no notary identity specified")
+ try {
+ val constructor = serviceClass.getDeclaredConstructor(ServiceHubInternal::class.java, PublicKey::class.java).apply { isAccessible = true }
+ constructor.newInstance(services, myNotaryIdentity.owningKey )
+ } catch (ex: NoSuchMethodException) {
+ val constructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java, PublicKey::class.java).apply { isAccessible = true }
+ val serviceContext = AppServiceHubImpl(services, flowStarter)
+ val service = constructor.newInstance(serviceContext, myNotaryIdentity.owningKey)
+ serviceContext.serviceInstance = service
+ service
+ }
+ } else {
+ try {
+ val serviceContext = AppServiceHubImpl(services, flowStarter)
+ val extendedServiceConstructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java).apply { isAccessible = true }
+ val service = extendedServiceConstructor.newInstance(serviceContext)
+ serviceContext.serviceInstance = service
+ service
+ } catch (ex: NoSuchMethodException) {
+ val constructor = serviceClass.getDeclaredConstructor(ServiceHub::class.java).apply { isAccessible = true }
+ log.warn("${serviceClass.name} is using legacy CordaService constructor with ServiceHub parameter. " +
+ "Upgrade to an AppServiceHub parameter to enable updated API features.")
+ constructor.newInstance(services)
+ }
+ }
} catch (e: InvocationTargetException) {
throw ServiceInstantiationException(e.cause)
}
+
cordappServices.putInstance(serviceClass, service)
if (service is NotaryService) handleCustomNotaryService(service)
diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt
index e0ab98b9e5..eadedd4e64 100644
--- a/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt
+++ b/node/src/main/kotlin/net/corda/node/services/transactions/PersistentUniquenessProvider.kt
@@ -1,5 +1,6 @@
package net.corda.node.services.transactions
+import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
@@ -8,11 +9,9 @@ import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError
import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.identity.Party
-import net.corda.core.internal.ThreadBox
-import net.corda.core.internal.notary.NotaryInternalException
-import net.corda.core.internal.notary.UniquenessProvider
-import net.corda.core.internal.notary.isConsumedByTheSameTx
-import net.corda.core.internal.notary.validateTimeWindow
+import net.corda.core.internal.concurrent.OpenFuture
+import net.corda.core.internal.concurrent.openFuture
+import net.corda.core.internal.notary.*
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
@@ -20,17 +19,22 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.utilities.AppendOnlyPersistentMap
+import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession
+import net.corda.serialization.internal.CordaSerializationEncoding
import java.time.Clock
import java.time.Instant
import java.util.*
+import java.util.concurrent.LinkedBlockingQueue
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.*
+import kotlin.concurrent.thread
/** A RDBMS backed Uniqueness provider */
@ThreadSafe
-class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, SingletonSerializeAsToken() {
+class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
+
@MappedSuperclass
class BaseComittedState(
@EmbeddedId
@@ -63,17 +67,36 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
var requestDate: Instant
)
+ private data class CommitRequest(
+ val states: List,
+ val txId: SecureHash,
+ val callerIdentity: Party,
+ val requestSignature: NotarisationRequestSignature,
+ val timeWindow: TimeWindow?,
+ val references: List,
+ val future: OpenFuture)
+
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states")
class CommittedState(id: PersistentStateRef, consumingTxHash: String) : BaseComittedState(id, consumingTxHash)
- private class InnerState {
- val commitLog = createMap()
+ private val commitLog = createMap()
+
+ private val requestQueue = LinkedBlockingQueue(requestQueueSize)
+
+ /** A request processor thread. */
+ private val processorThread = thread(name = "Notary request queue processor", isDaemon = true) {
+ try {
+ while (!Thread.interrupted()) {
+ processRequest(requestQueue.take())
+ }
+ } catch (e: InterruptedException) {
+ }
+ log.debug { "Shutting down with ${requestQueue.size} in-flight requests unprocessed." }
}
- private val mutex = ThreadBox(InnerState())
-
companion object {
+ private const val requestQueueSize = 100_000
private val log = contextLogger()
fun createMap(): AppendOnlyPersistentMap =
AppendOnlyPersistentMap(
@@ -99,23 +122,25 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
)
}
- override fun commit(
+
+ /**
+ * Generates and adds a [CommitRequest] to the request queue. If the request queue is full, this method will block
+ * until space is available.
+ *
+ * Returns a future that will complete once the request is processed, containing the commit [Result].
+ */
+ override fun commitAsync(
states: List,
txId: SecureHash,
callerIdentity: Party,
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?,
references: List
- ) {
- mutex.locked {
- logRequest(txId, callerIdentity, requestSignature)
- val conflictingStates = findAlreadyCommitted(states, references, commitLog)
- if (conflictingStates.isNotEmpty()) {
- handleConflicts(txId, conflictingStates)
- } else {
- handleNoConflicts(timeWindow, states, txId, commitLog)
- }
- }
+ ): CordaFuture {
+ val future = openFuture()
+ val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow, references, future)
+ requestQueue.put(request)
+ return future
}
private fun logRequest(txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
@@ -149,6 +174,25 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
return conflictingStates
}
+ private fun commitOne(
+ states: List,
+ txId: SecureHash,
+ callerIdentity: Party,
+ requestSignature: NotarisationRequestSignature,
+ timeWindow: TimeWindow?,
+ references: List
+ ) {
+ database.transaction {
+ logRequest(txId, callerIdentity, requestSignature)
+ val conflictingStates = findAlreadyCommitted(states, references, commitLog)
+ if (conflictingStates.isNotEmpty()) {
+ handleConflicts(txId, conflictingStates)
+ } else {
+ handleNoConflicts(timeWindow, states, txId, commitLog)
+ }
+ }
+ }
+
private fun handleConflicts(txId: SecureHash, conflictingStates: LinkedHashMap) {
if (isConsumedByTheSameTx(txId.sha256(), conflictingStates)) {
log.debug { "Transaction $txId already notarised" }
@@ -171,4 +215,26 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
throw NotaryInternalException(outsideTimeWindowError)
}
}
+
+ private fun processRequest(request: CommitRequest) {
+ try {
+ commitOne(request.states, request.txId, request.callerIdentity, request.requestSignature, request.timeWindow, request.references)
+ respondWithSuccess(request)
+ } catch (e: Exception) {
+ log.warn("Error processing commit request", e)
+ respondWithError(request, e)
+ }
+ }
+
+ private fun respondWithError(request: CommitRequest, exception: Exception) {
+ if (exception is NotaryInternalException) {
+ request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error))
+ } else {
+ request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
+ }
+ }
+
+ private fun respondWithSuccess(request: CommitRequest) {
+ request.future.set(AsyncUniquenessProvider.Result.Success)
+ }
}
diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt
index 093c0d905b..16ad4464af 100644
--- a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt
@@ -8,10 +8,10 @@ import java.security.PublicKey
/** A simple Notary service that does not perform transaction validation */
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
- override val uniquenessProvider = PersistentUniquenessProvider(services.clock)
+ override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database)
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow = NonValidatingNotaryFlow(otherPartySession, this)
override fun start() {}
override fun stop() {}
-}
\ No newline at end of file
+}
diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt
index 08b4fcb211..4a6f46b2ce 100644
--- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt
@@ -8,10 +8,10 @@ import java.security.PublicKey
/** A Notary service that validates the transaction chain of the submitted transaction before committing it */
class ValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
- override val uniquenessProvider = PersistentUniquenessProvider(services.clock)
+ override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database)
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow = ValidatingNotaryFlow(otherPartySession, this)
override fun start() {}
override fun stop() {}
-}
\ No newline at end of file
+}
diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt
index 17fe106c99..bdc2b642d0 100644
--- a/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt
+++ b/node/src/test/kotlin/net/corda/node/services/transactions/PersistentUniquenessProviderTests.kt
@@ -28,7 +28,7 @@ import kotlin.test.assertFailsWith
class PersistentUniquenessProviderTests {
@Rule
@JvmField
- val testSerialization = SerializationEnvironmentRule()
+ val testSerialization = SerializationEnvironmentRule(inheritable = true)
private val identity = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party
private val txID = SecureHash.randomSHA256()
private val requestSignature = NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0)
@@ -49,18 +49,15 @@ class PersistentUniquenessProviderTests {
@Test
fun `should commit a transaction with unused inputs without exception`() {
- database.transaction {
- val provider = PersistentUniquenessProvider(Clock.systemUTC())
+ val provider = PersistentUniquenessProvider(Clock.systemUTC(), database)
val inputState = generateStateRef()
provider.commit(listOf(inputState), txID, identity, requestSignature)
- }
}
@Test
fun `should report a conflict for a transaction with previously used inputs`() {
- database.transaction {
- val provider = PersistentUniquenessProvider(Clock.systemUTC())
+ val provider = PersistentUniquenessProvider(Clock.systemUTC(), database)
val inputState = generateStateRef()
val inputs = listOf(inputState)
@@ -76,5 +73,4 @@ class PersistentUniquenessProviderTests {
val conflictCause = error.consumedStates[inputState]!!
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
}
- }
}
diff --git a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt
index dd1f424d1d..666844691c 100644
--- a/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt
+++ b/samples/notary-demo/src/main/kotlin/net/corda/notarydemo/MyCustomNotaryService.kt
@@ -13,6 +13,7 @@ import net.corda.core.node.services.CordaService
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionWithSignatures
import net.corda.core.transactions.WireTransaction
+import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.PersistentUniquenessProvider
import java.security.PublicKey
import java.security.SignatureException
@@ -25,8 +26,8 @@ import java.security.SignatureException
*/
// START 1
@CordaService
-class MyCustomValidatingNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
- override val uniquenessProvider = PersistentUniquenessProvider(services.clock)
+class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
+ override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database)
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic = MyValidatingNotaryFlow(otherPartySession, this)