Single node notary thread safety (#3924)

This commit is contained in:
Thomas Schroeter
2018-09-12 13:36:04 +01:00
committed by GitHub
parent 90a7dd2bf4
commit 057ee74611
9 changed files with 163 additions and 55 deletions

2
.idea/compiler.xml generated
View File

@ -235,4 +235,4 @@
<component name="JavacSettings"> <component name="JavacSettings">
<option name="ADDITIONAL_OPTIONS_STRING" value="-parameters" /> <option name="ADDITIONAL_OPTIONS_STRING" value="-parameters" />
</component> </component>
</project> </project>

View File

@ -0,0 +1,35 @@
package net.corda.core.internal.notary
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError
import net.corda.core.identity.Party
/**
* A service that records input states of the given transaction and provides conflict information
* if any of the inputs have already been used in another transaction.
*/
interface AsyncUniquenessProvider : UniquenessProvider {
/** Commits all input states of the given transaction. */
fun commitAsync(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>): CordaFuture<Result>
/** Commits all input states of the given transaction synchronously. Use [commitAsync] for better performance. */
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>) {
val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow,references).get()
if (result is Result.Failure) {
throw NotaryInternalException(result.error)
}
}
/** The outcome of committing a transaction. */
sealed class Result {
/** Indicates that all input states have been committed successfully. */
object Success : Result()
/** Indicates that the transaction has not been committed. */
data class Failure(val error: NotaryError) : Result()
}
}

View File

@ -28,4 +28,4 @@ interface AppServiceHub : ServiceHub {
* TODO it is assumed here that the flow object has an appropriate classloader. * TODO it is assumed here that the flow object has an appropriate classloader.
*/ */
fun <T> startTrackedFlow(flow: FlowLogic<T>): FlowProgressHandle<T> fun <T> startTrackedFlow(flow: FlowLogic<T>): FlowProgressHandle<T>
} }

View File

@ -521,7 +521,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
} }
/** /**
* If the [serviceClass] is a notary service, it will only be enable if the "custom" flag is set in * If the [serviceClass] is a notary service, it will only be enabled if the "custom" flag is set in
* the notary configuration. * the notary configuration.
*/ */
private fun isNotaryService(serviceClass: Class<*>) = NotaryService::class.java.isAssignableFrom(serviceClass) private fun isNotaryService(serviceClass: Class<*>) = NotaryService::class.java.isAssignableFrom(serviceClass)
@ -568,28 +568,38 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
private fun <T : SerializeAsToken> installCordaService(flowStarter: FlowStarter, serviceClass: Class<T>, myNotaryIdentity: PartyAndCertificate?) { private fun <T : SerializeAsToken> installCordaService(flowStarter: FlowStarter, serviceClass: Class<T>, myNotaryIdentity: PartyAndCertificate?) {
serviceClass.requireAnnotation<CordaService>() serviceClass.requireAnnotation<CordaService>()
val service = try { val service = try {
val serviceContext = AppServiceHubImpl<T>(services, flowStarter) if (isNotaryService(serviceClass)) {
if (isNotaryService(serviceClass)) { myNotaryIdentity ?: throw IllegalStateException("Trying to install a notary service but no notary identity specified")
myNotaryIdentity ?: throw IllegalStateException("Trying to install a notary service but no notary identity specified") try {
val constructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java, PublicKey::class.java).apply { isAccessible = true } val constructor = serviceClass.getDeclaredConstructor(ServiceHubInternal::class.java, PublicKey::class.java).apply { isAccessible = true }
serviceContext.serviceInstance = constructor.newInstance(serviceContext, myNotaryIdentity.owningKey) constructor.newInstance(services, myNotaryIdentity.owningKey )
serviceContext.serviceInstance } catch (ex: NoSuchMethodException) {
} else { val constructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java, PublicKey::class.java).apply { isAccessible = true }
try { val serviceContext = AppServiceHubImpl<T>(services, flowStarter)
val extendedServiceConstructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java).apply { isAccessible = true } val service = constructor.newInstance(serviceContext, myNotaryIdentity.owningKey)
serviceContext.serviceInstance = extendedServiceConstructor.newInstance(serviceContext) serviceContext.serviceInstance = service
serviceContext.serviceInstance service
} catch (ex: NoSuchMethodException) { }
val constructor = serviceClass.getDeclaredConstructor(ServiceHub::class.java).apply { isAccessible = true } } else {
log.warn("${serviceClass.name} is using legacy CordaService constructor with ServiceHub parameter. " + try {
"Upgrade to an AppServiceHub parameter to enable updated API features.") val serviceContext = AppServiceHubImpl<T>(services, flowStarter)
constructor.newInstance(services) val extendedServiceConstructor = serviceClass.getDeclaredConstructor(AppServiceHub::class.java).apply { isAccessible = true }
} val service = extendedServiceConstructor.newInstance(serviceContext)
} serviceContext.serviceInstance = service
service
} catch (ex: NoSuchMethodException) {
val constructor = serviceClass.getDeclaredConstructor(ServiceHub::class.java).apply { isAccessible = true }
log.warn("${serviceClass.name} is using legacy CordaService constructor with ServiceHub parameter. " +
"Upgrade to an AppServiceHub parameter to enable updated API features.")
constructor.newInstance(services)
}
}
} catch (e: InvocationTargetException) { } catch (e: InvocationTargetException) {
throw ServiceInstantiationException(e.cause) throw ServiceInstantiationException(e.cause)
} }
cordappServices.putInstance(serviceClass, service) cordappServices.putInstance(serviceClass, service)
if (service is NotaryService) handleCustomNotaryService(service) if (service is NotaryService) handleCustomNotaryService(service)

View File

@ -1,5 +1,6 @@
package net.corda.node.services.transactions package net.corda.node.services.transactions
import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.StateRef import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash import net.corda.core.crypto.SecureHash
@ -8,11 +9,9 @@ import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.flows.NotaryError import net.corda.core.flows.NotaryError
import net.corda.core.flows.StateConsumptionDetails import net.corda.core.flows.StateConsumptionDetails
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.ThreadBox import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.notary.NotaryInternalException import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.notary.UniquenessProvider import net.corda.core.internal.notary.*
import net.corda.core.internal.notary.isConsumedByTheSameTx
import net.corda.core.internal.notary.validateTimeWindow
import net.corda.core.schemas.PersistentStateRef import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
@ -20,17 +19,22 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.serialization.internal.CordaSerializationEncoding
import java.time.Clock import java.time.Clock
import java.time.Instant import java.time.Instant
import java.util.* import java.util.*
import java.util.concurrent.LinkedBlockingQueue
import javax.annotation.concurrent.ThreadSafe import javax.annotation.concurrent.ThreadSafe
import javax.persistence.* import javax.persistence.*
import kotlin.concurrent.thread
/** A RDBMS backed Uniqueness provider */ /** A RDBMS backed Uniqueness provider */
@ThreadSafe @ThreadSafe
class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, SingletonSerializeAsToken() { class PersistentUniquenessProvider(val clock: Clock, val database: CordaPersistence) : AsyncUniquenessProvider, SingletonSerializeAsToken() {
@MappedSuperclass @MappedSuperclass
class BaseComittedState( class BaseComittedState(
@EmbeddedId @EmbeddedId
@ -63,17 +67,36 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
var requestDate: Instant var requestDate: Instant
) )
private data class CommitRequest(
val states: List<StateRef>,
val txId: SecureHash,
val callerIdentity: Party,
val requestSignature: NotarisationRequestSignature,
val timeWindow: TimeWindow?,
val references: List<StateRef>,
val future: OpenFuture<AsyncUniquenessProvider.Result>)
@Entity @Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states") @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}notary_committed_states")
class CommittedState(id: PersistentStateRef, consumingTxHash: String) : BaseComittedState(id, consumingTxHash) class CommittedState(id: PersistentStateRef, consumingTxHash: String) : BaseComittedState(id, consumingTxHash)
private class InnerState { private val commitLog = createMap()
val commitLog = createMap()
private val requestQueue = LinkedBlockingQueue<CommitRequest>(requestQueueSize)
/** A request processor thread. */
private val processorThread = thread(name = "Notary request queue processor", isDaemon = true) {
try {
while (!Thread.interrupted()) {
processRequest(requestQueue.take())
}
} catch (e: InterruptedException) {
}
log.debug { "Shutting down with ${requestQueue.size} in-flight requests unprocessed." }
} }
private val mutex = ThreadBox(InnerState())
companion object { companion object {
private const val requestQueueSize = 100_000
private val log = contextLogger() private val log = contextLogger()
fun createMap(): AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef> = fun createMap(): AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef> =
AppendOnlyPersistentMap( AppendOnlyPersistentMap(
@ -99,23 +122,25 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
) )
} }
override fun commit(
/**
* Generates and adds a [CommitRequest] to the request queue. If the request queue is full, this method will block
* until space is available.
*
* Returns a future that will complete once the request is processed, containing the commit [Result].
*/
override fun commitAsync(
states: List<StateRef>, states: List<StateRef>,
txId: SecureHash, txId: SecureHash,
callerIdentity: Party, callerIdentity: Party,
requestSignature: NotarisationRequestSignature, requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?, timeWindow: TimeWindow?,
references: List<StateRef> references: List<StateRef>
) { ): CordaFuture<AsyncUniquenessProvider.Result> {
mutex.locked { val future = openFuture<AsyncUniquenessProvider.Result>()
logRequest(txId, callerIdentity, requestSignature) val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow, references, future)
val conflictingStates = findAlreadyCommitted(states, references, commitLog) requestQueue.put(request)
if (conflictingStates.isNotEmpty()) { return future
handleConflicts(txId, conflictingStates)
} else {
handleNoConflicts(timeWindow, states, txId, commitLog)
}
}
} }
private fun logRequest(txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) { private fun logRequest(txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature) {
@ -149,6 +174,25 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
return conflictingStates return conflictingStates
} }
private fun commitOne(
states: List<StateRef>,
txId: SecureHash,
callerIdentity: Party,
requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow?,
references: List<StateRef>
) {
database.transaction {
logRequest(txId, callerIdentity, requestSignature)
val conflictingStates = findAlreadyCommitted(states, references, commitLog)
if (conflictingStates.isNotEmpty()) {
handleConflicts(txId, conflictingStates)
} else {
handleNoConflicts(timeWindow, states, txId, commitLog)
}
}
}
private fun handleConflicts(txId: SecureHash, conflictingStates: LinkedHashMap<StateRef, StateConsumptionDetails>) { private fun handleConflicts(txId: SecureHash, conflictingStates: LinkedHashMap<StateRef, StateConsumptionDetails>) {
if (isConsumedByTheSameTx(txId.sha256(), conflictingStates)) { if (isConsumedByTheSameTx(txId.sha256(), conflictingStates)) {
log.debug { "Transaction $txId already notarised" } log.debug { "Transaction $txId already notarised" }
@ -171,4 +215,26 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
throw NotaryInternalException(outsideTimeWindowError) throw NotaryInternalException(outsideTimeWindowError)
} }
} }
private fun processRequest(request: CommitRequest) {
try {
commitOne(request.states, request.txId, request.callerIdentity, request.requestSignature, request.timeWindow, request.references)
respondWithSuccess(request)
} catch (e: Exception) {
log.warn("Error processing commit request", e)
respondWithError(request, e)
}
}
private fun respondWithError(request: CommitRequest, exception: Exception) {
if (exception is NotaryInternalException) {
request.future.set(AsyncUniquenessProvider.Result.Failure(exception.error))
} else {
request.future.setException(NotaryInternalException(NotaryError.General(Exception("Internal service error."))))
}
}
private fun respondWithSuccess(request: CommitRequest) {
request.future.set(AsyncUniquenessProvider.Result.Success)
}
} }

View File

@ -8,10 +8,10 @@ import java.security.PublicKey
/** A simple Notary service that does not perform transaction validation */ /** A simple Notary service that does not perform transaction validation */
class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { class SimpleNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
override val uniquenessProvider = PersistentUniquenessProvider(services.clock) override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database)
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow = NonValidatingNotaryFlow(otherPartySession, this) override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow = NonValidatingNotaryFlow(otherPartySession, this)
override fun start() {} override fun start() {}
override fun stop() {} override fun stop() {}
} }

View File

@ -8,10 +8,10 @@ import java.security.PublicKey
/** A Notary service that validates the transaction chain of the submitted transaction before committing it */ /** A Notary service that validates the transaction chain of the submitted transaction before committing it */
class ValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { class ValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
override val uniquenessProvider = PersistentUniquenessProvider(services.clock) override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database)
override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow = ValidatingNotaryFlow(otherPartySession, this) override fun createServiceFlow(otherPartySession: FlowSession): NotaryServiceFlow = ValidatingNotaryFlow(otherPartySession, this)
override fun start() {} override fun start() {}
override fun stop() {} override fun stop() {}
} }

View File

@ -28,7 +28,7 @@ import kotlin.test.assertFailsWith
class PersistentUniquenessProviderTests { class PersistentUniquenessProviderTests {
@Rule @Rule
@JvmField @JvmField
val testSerialization = SerializationEnvironmentRule() val testSerialization = SerializationEnvironmentRule(inheritable = true)
private val identity = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party private val identity = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party
private val txID = SecureHash.randomSHA256() private val txID = SecureHash.randomSHA256()
private val requestSignature = NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0) private val requestSignature = NotarisationRequestSignature(DigitalSignature.WithKey(NullKeys.NullPublicKey, ByteArray(32)), 0)
@ -49,18 +49,15 @@ class PersistentUniquenessProviderTests {
@Test @Test
fun `should commit a transaction with unused inputs without exception`() { fun `should commit a transaction with unused inputs without exception`() {
database.transaction { val provider = PersistentUniquenessProvider(Clock.systemUTC(), database)
val provider = PersistentUniquenessProvider(Clock.systemUTC())
val inputState = generateStateRef() val inputState = generateStateRef()
provider.commit(listOf(inputState), txID, identity, requestSignature) provider.commit(listOf(inputState), txID, identity, requestSignature)
}
} }
@Test @Test
fun `should report a conflict for a transaction with previously used inputs`() { fun `should report a conflict for a transaction with previously used inputs`() {
database.transaction { val provider = PersistentUniquenessProvider(Clock.systemUTC(), database)
val provider = PersistentUniquenessProvider(Clock.systemUTC())
val inputState = generateStateRef() val inputState = generateStateRef()
val inputs = listOf(inputState) val inputs = listOf(inputState)
@ -76,5 +73,4 @@ class PersistentUniquenessProviderTests {
val conflictCause = error.consumedStates[inputState]!! val conflictCause = error.consumedStates[inputState]!!
assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256()) assertEquals(conflictCause.hashOfTransactionId, firstTxId.sha256())
} }
}
} }

View File

@ -13,6 +13,7 @@ import net.corda.core.node.services.CordaService
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionWithSignatures import net.corda.core.transactions.TransactionWithSignatures
import net.corda.core.transactions.WireTransaction import net.corda.core.transactions.WireTransaction
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.PersistentUniquenessProvider
import java.security.PublicKey import java.security.PublicKey
import java.security.SignatureException import java.security.SignatureException
@ -25,8 +26,8 @@ import java.security.SignatureException
*/ */
// START 1 // START 1
@CordaService @CordaService
class MyCustomValidatingNotaryService(override val services: AppServiceHub, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() { class MyCustomValidatingNotaryService(override val services: ServiceHubInternal, override val notaryIdentityKey: PublicKey) : TrustedAuthorityNotaryService() {
override val uniquenessProvider = PersistentUniquenessProvider(services.clock) override val uniquenessProvider = PersistentUniquenessProvider(services.clock, services.database)
override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = MyValidatingNotaryFlow(otherPartySession, this) override fun createServiceFlow(otherPartySession: FlowSession): FlowLogic<Void?> = MyValidatingNotaryFlow(otherPartySession, this)