From 00b272906abe8159a40d855a983f4e32601347df Mon Sep 17 00:00:00 2001 From: Andrius Dagys Date: Thu, 29 Jun 2017 17:25:10 +0100 Subject: [PATCH] Decouple notary implementations from AbstractNode. Allow custom notaries to be provided via CorDapps. --- .../corda/core/node/services/NotaryService.kt | 78 +++++++++++++++++++ .../main/kotlin/net/corda/flows/NotaryFlow.kt | 51 ++---------- .../net/corda/node/internal/AbstractNode.kt | 57 +++++++------- .../kotlin/net/corda/node/internal/Node.kt | 18 ----- .../node/services/api/ServiceHubInternal.kt | 4 + .../BFTNonValidatingNotaryService.kt | 72 +++++++++-------- .../node/services/transactions/BFTSMaRt.kt | 12 ++- .../InMemoryUniquenessProvider.kt | 36 --------- .../transactions/NonValidatingNotaryFlow.kt | 7 +- .../services/transactions/NotaryService.kt | 15 ---- .../RaftNonValidatingNotaryService.kt | 24 ++++-- .../transactions/RaftUniquenessProvider.kt | 53 ++++++++----- .../RaftValidatingNotaryService.kt | 22 ++++-- .../transactions/SimpleNotaryService.kt | 20 +++-- .../transactions/ValidatingNotaryFlow.kt | 8 +- .../transactions/ValidatingNotaryService.kt | 20 +++-- .../node/services/MockServiceHubInternal.kt | 7 +- .../InMemoryUniquenessProviderTests.kt | 36 --------- .../kotlin/net/corda/testing/node/MockNode.kt | 3 - 19 files changed, 270 insertions(+), 273 deletions(-) create mode 100644 core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt delete mode 100644 node/src/main/kotlin/net/corda/node/services/transactions/InMemoryUniquenessProvider.kt delete mode 100644 node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt delete mode 100644 node/src/test/kotlin/net/corda/node/services/transactions/InMemoryUniquenessProviderTests.kt diff --git a/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt b/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt new file mode 100644 index 0000000000..aa3def742c --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/node/services/NotaryService.kt @@ -0,0 +1,78 @@ +package net.corda.core.node.services + +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TimeWindow +import net.corda.core.crypto.DigitalSignature +import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.SignedData +import net.corda.core.flows.FlowLogic +import net.corda.core.identity.Party +import net.corda.core.node.ServiceHub +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.serialization.serialize +import net.corda.core.utilities.loggerFor +import net.corda.flows.NotaryError +import net.corda.flows.NotaryException +import org.slf4j.Logger + +abstract class NotaryService : SingletonSerializeAsToken() { + abstract val services: ServiceHub + + abstract fun start() + abstract fun stop() + + /** + * Produces a notary service flow which has the corresponding sends and receives as [NotaryFlow.Client]. + * The first parameter is the client [Party] making the request and the second is the platform version + * of the client's node. Use this version parameter to provide backwards compatibility if the notary flow protocol + * changes. + */ + abstract fun createServiceFlow(otherParty: Party, platformVersion: Int): FlowLogic +} + +/** + * A base notary service implementation that provides functionality for cases where a signature by a single member + * of the cluster is sufficient for transaction notarisation. For example, a single-node or a Raft notary. + */ +abstract class TrustedAuthorityNotaryService : NotaryService() { + protected open val log: Logger = loggerFor() + + // TODO: specify the valid time window in config, and convert TimeWindowChecker to a utility method + protected abstract val timeWindowChecker: TimeWindowChecker + protected abstract val uniquenessProvider: UniquenessProvider + + fun validateTimeWindow(t: TimeWindow?) { + if (t != null && !timeWindowChecker.isValid(t)) + throw NotaryException(NotaryError.TimeWindowInvalid) + } + + /** + * A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that + * this method does not throw an exception when input states are present multiple times within the transaction. + */ + fun commitInputStates(inputs: List, txId: SecureHash, caller: Party) { + try { + uniquenessProvider.commit(inputs, txId, caller) + } catch (e: UniquenessException) { + val conflicts = inputs.filterIndexed { i, stateRef -> + val consumingTx = e.error.stateHistory[stateRef] + consumingTx != null && consumingTx != UniquenessProvider.ConsumingTx(txId, i, caller) + } + if (conflicts.isNotEmpty()) { + // TODO: Create a new UniquenessException that only contains the conflicts filtered above. + log.warn("Notary conflicts for $txId: $conflicts") + throw notaryException(txId, e) + } + } + } + + private fun notaryException(txId: SecureHash, e: UniquenessException): NotaryException { + val conflictData = e.error.serialize() + val signedConflict = SignedData(conflictData, sign(conflictData.bytes)) + return NotaryException(NotaryError.Conflict(txId, signedConflict)) + } + + fun sign(bits: ByteArray): DigitalSignature.WithKey { + return services.keyManagementService.sign(bits, services.notaryIdentityKey) + } +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt index a12770ca1f..579f9c8125 100644 --- a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt @@ -11,11 +11,8 @@ import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic import net.corda.core.flows.InitiatingFlow import net.corda.core.identity.Party -import net.corda.core.node.services.TimeWindowChecker -import net.corda.core.node.services.UniquenessException -import net.corda.core.node.services.UniquenessProvider +import net.corda.core.node.services.* import net.corda.core.serialization.CordaSerializable -import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.unwrap @@ -97,14 +94,13 @@ object NotaryFlow { * Additional transaction validation logic can be added when implementing [receiveAndVerifyTx]. */ // See AbstractStateReplacementFlow.Acceptor for why it's Void? - abstract class Service(val otherSide: Party, - val timeWindowChecker: TimeWindowChecker, - val uniquenessProvider: UniquenessProvider) : FlowLogic() { + abstract class Service(val otherSide: Party, val service: TrustedAuthorityNotaryService) : FlowLogic() { + @Suspendable override fun call(): Void? { val (id, inputs, timeWindow) = receiveAndVerifyTx() - validateTimeWindow(timeWindow) - commitInputStates(inputs, id) + service.validateTimeWindow(timeWindow) + service.commitInputStates(inputs, id, otherSide) signAndSendResponse(id) return null } @@ -118,44 +114,9 @@ object NotaryFlow { @Suspendable private fun signAndSendResponse(txId: SecureHash) { - val signature = sign(txId.bytes) + val signature = service.sign(txId.bytes) send(otherSide, listOf(signature)) } - - private fun validateTimeWindow(t: TimeWindow?) { - if (t != null && !timeWindowChecker.isValid(t)) - throw NotaryException(NotaryError.TimeWindowInvalid) - } - - /** - * A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that - * this method does not throw an exception when input states are present multiple times within the transaction. - */ - private fun commitInputStates(inputs: List, txId: SecureHash) { - try { - uniquenessProvider.commit(inputs, txId, otherSide) - } catch (e: UniquenessException) { - val conflicts = inputs.filterIndexed { i, stateRef -> - val consumingTx = e.error.stateHistory[stateRef] - consumingTx != null && consumingTx != UniquenessProvider.ConsumingTx(txId, i, otherSide) - } - if (conflicts.isNotEmpty()) { - // TODO: Create a new UniquenessException that only contains the conflicts filtered above. - logger.warn("Notary conflicts for $txId: $conflicts") - throw notaryException(txId, e) - } - } - } - - private fun sign(bits: ByteArray): DigitalSignature.WithKey { - return serviceHub.keyManagementService.sign(bits, serviceHub.notaryIdentityKey) - } - - private fun notaryException(txId: SecureHash, e: UniquenessException): NotaryException { - val conflictData = e.error.serialize() - val signedConflict = SignedData(conflictData, sign(conflictData.bytes)) - return NotaryException(NotaryError.Conflict(txId, signedConflict)) - } } } diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 1f28e00a0b..18fef52f56 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -20,6 +20,7 @@ import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.* import net.corda.core.node.services.* import net.corda.core.node.services.NetworkMapCache.MapChange +import net.corda.core.node.services.NotaryService import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize @@ -59,7 +60,6 @@ import net.corda.node.utilities.AddOrRemove.ADD import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.transaction -import net.corda.nodeapi.ArtemisMessagingComponent import org.apache.activemq.artemis.utils.ReusableLatch import org.bouncycastle.asn1.x500.X500Name import org.jetbrains.exposed.sql.Database @@ -133,7 +133,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, override val myInfo: NodeInfo get() = info override val schemaService: SchemaService get() = schemas override val transactionVerifierService: TransactionVerifierService get() = txVerifierService - override val auditService: AuditService get() = auditService + override val auditService: AuditService get() = this@AbstractNode.auditService + override val database: Database get() = this@AbstractNode.database + override val configuration: NodeConfiguration get() = this@AbstractNode.configuration override fun cordaService(type: Class): T { require(type.isAnnotationPresent(CordaService::class.java)) { "${type.name} is not a Corda service" } @@ -329,10 +331,19 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, } cordappServices.putInstance(serviceClass, service) smm.tokenizableServices += service + + if (service is NotaryService) handleCustomNotaryService(service) + log.info("Installed ${serviceClass.name} Corda service") return service } + private fun handleCustomNotaryService(service: NotaryService) { + runOnStop += service::stop + service.start() + installCoreFlow(NotaryFlow.Client::class, { party: Party, version: Int -> service.createServiceFlow(party, version) }) + } + private inline fun Class<*>.requireAnnotation(): A { return requireNotNull(getDeclaredAnnotation(A::class.java)) { "$name needs to be annotated with ${A::class.java.name}" } } @@ -624,7 +635,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val notaryServiceType = serviceTypes.singleOrNull { it.isNotary() } if (notaryServiceType != null) { - makeNotaryService(notaryServiceType, tokenizableServices) + makeCoreNotaryService(notaryServiceType, tokenizableServices) } } @@ -685,35 +696,27 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, inNodeNetworkMapService = PersistentNetworkMapService(services, configuration.minimumPlatformVersion) } - open protected fun makeNotaryService(type: ServiceType, tokenizableServices: MutableList) { - val timeWindowChecker = TimeWindowChecker(platformClock, 30.seconds) - val uniquenessProvider = makeUniquenessProvider(type) - tokenizableServices.add(uniquenessProvider) - - val notaryService = when (type) { - SimpleNotaryService.type -> SimpleNotaryService(timeWindowChecker, uniquenessProvider) - ValidatingNotaryService.type -> ValidatingNotaryService(timeWindowChecker, uniquenessProvider) - RaftNonValidatingNotaryService.type -> RaftNonValidatingNotaryService(timeWindowChecker, uniquenessProvider as RaftUniquenessProvider) - RaftValidatingNotaryService.type -> RaftValidatingNotaryService(timeWindowChecker, uniquenessProvider as RaftUniquenessProvider) - BFTNonValidatingNotaryService.type -> with(configuration) { - val replicaId = bftReplicaId ?: throw IllegalArgumentException("bftReplicaId value must be specified in the configuration") - BFTSMaRtConfig(notaryClusterAddresses).use { config -> - BFTNonValidatingNotaryService(config, services, timeWindowChecker, replicaId, database).also { - tokenizableServices += it.client - runOnStop += it::dispose - } - } - } + open protected fun makeCoreNotaryService(type: ServiceType, tokenizableServices: MutableList) { + val service: NotaryService = when (type) { + SimpleNotaryService.type -> SimpleNotaryService(services) + ValidatingNotaryService.type -> ValidatingNotaryService(services) + RaftNonValidatingNotaryService.type -> RaftNonValidatingNotaryService(services) + RaftValidatingNotaryService.type -> RaftValidatingNotaryService(services) + BFTNonValidatingNotaryService.type -> BFTNonValidatingNotaryService(services) else -> { - throw IllegalArgumentException("Notary type ${type.id} is not handled by makeNotaryService.") + log.info("Notary type ${type.id} does not match any built-in notary types. " + + "It is expected to be loaded via a CorDapp") + return } } - - installCoreFlow(NotaryFlow.Client::class, notaryService.serviceFlowFactory) + service.apply { + tokenizableServices.add(this) + runOnStop += this::stop + start() + } + installCoreFlow(NotaryFlow.Client::class, { party: Party, version: Int -> service.createServiceFlow(party, version) }) } - protected abstract fun makeUniquenessProvider(type: ServiceType): UniquenessProvider - protected open fun makeIdentityService(trustRoot: X509Certificate, clientCa: CertificateAndKeyPair?, legalIdentity: PartyAndCertificate): IdentityService { diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 347f35edd2..993fe6391b 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -11,8 +11,6 @@ import net.corda.core.minutes import net.corda.core.node.ServiceHub import net.corda.core.node.VersionInfo import net.corda.core.node.services.ServiceInfo -import net.corda.core.node.services.ServiceType -import net.corda.core.node.services.UniquenessProvider import net.corda.core.seconds import net.corda.core.success import net.corda.core.utilities.loggerFor @@ -26,10 +24,6 @@ import net.corda.node.services.messaging.ArtemisMessagingServer.Companion.ipDete import net.corda.node.services.messaging.ArtemisMessagingServer.Companion.ipDetectResponseProperty import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.NodeMessagingClient -import net.corda.node.services.transactions.PersistentUniquenessProvider -import net.corda.node.services.transactions.RaftNonValidatingNotaryService -import net.corda.node.services.transactions.RaftUniquenessProvider -import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.node.utilities.AddressUtils import net.corda.node.utilities.AffinityExecutor import net.corda.nodeapi.ArtemisMessagingComponent @@ -263,18 +257,6 @@ open class Node(override val configuration: FullNodeConfiguration, return networkMapConnection.flatMap { super.registerWithNetworkMap() } } - override fun makeUniquenessProvider(type: ServiceType): UniquenessProvider { - return when (type) { - RaftValidatingNotaryService.type, RaftNonValidatingNotaryService.type -> with(configuration) { - val provider = RaftUniquenessProvider(baseDirectory, notaryNodeAddress!!, notaryClusterAddresses, database, configuration) - provider.start() - runOnStop += provider::stop - provider - } - else -> PersistentUniquenessProvider() - } - } - override fun myAddresses(): List { val address = network.myAddress as ArtemisMessagingComponent.ArtemisPeerAddress return listOf(address.hostAndPort) diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt index abcda505f4..65c48f39b6 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt @@ -15,9 +15,11 @@ import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.loggerFor import net.corda.node.internal.InitiatedFlowFactory +import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.messaging.MessagingService import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl import net.corda.node.services.statemachine.FlowStateMachineImpl +import org.jetbrains.exposed.sql.Database interface NetworkMapCacheInternal : NetworkMapCache { /** @@ -68,6 +70,8 @@ abstract class ServiceHubInternal : PluginServiceHub { abstract val auditService: AuditService abstract val rpcFlows: List>> abstract val networkService: MessagingService + abstract val database: Database + abstract val configuration: NodeConfiguration /** * Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt index 4dd60e71a9..a64cfaaa9d 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTNonValidatingNotaryService.kt @@ -6,6 +6,7 @@ import net.corda.core.crypto.DigitalSignature import net.corda.core.flows.FlowLogic import net.corda.core.getOrThrow import net.corda.core.identity.Party +import net.corda.core.node.services.NotaryService import net.corda.core.node.services.TimeWindowChecker import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize @@ -15,7 +16,6 @@ import net.corda.core.utilities.loggerFor import net.corda.core.utilities.unwrap import net.corda.flows.NotaryException import net.corda.node.services.api.ServiceHubInternal -import org.jetbrains.exposed.sql.Database import kotlin.concurrent.thread /** @@ -23,40 +23,42 @@ import kotlin.concurrent.thread * * A transaction is notarised when the consensus is reached by the cluster on its uniqueness, and time-window validity. */ -class BFTNonValidatingNotaryService(config: BFTSMaRtConfig, - services: ServiceHubInternal, - timeWindowChecker: TimeWindowChecker, - replicaId: Int, - db: Database) : NotaryService { - val client = BFTSMaRt.Client(config, replicaId) // (Ab)use replicaId for clientId. - private val replicaHolder = SettableFuture.create() - - init { - // Replica startup must be in parallel with other replicas, otherwise the constructor may not return: - val configHandle = config.handle() - thread(name = "BFT SMaRt replica $replicaId init", isDaemon = true) { - configHandle.use { - replicaHolder.set(Replica(it, replicaId, db, "bft_smart_notary_committed_states", services, timeWindowChecker)) - log.info("BFT SMaRt replica $replicaId is running.") - } - } - } - - fun dispose() { - replicaHolder.getOrThrow().dispose() - client.dispose() - } - +class BFTNonValidatingNotaryService(override val services: ServiceHubInternal) : NotaryService() { companion object { val type = SimpleNotaryService.type.getSubType("bft") private val log = loggerFor() } - override val serviceFlowFactory: (Party, Int) -> FlowLogic = { otherParty, _ -> - ServiceFlow(otherParty, client) + private val client: BFTSMaRt.Client + private val replicaHolder = SettableFuture.create() + + init { + val replicaId = services.configuration.bftReplicaId ?: throw IllegalArgumentException("bftReplicaId value must be specified in the configuration") + val config = BFTSMaRtConfig(services.configuration.notaryClusterAddresses) + + client = config.use { + val configHandle = config.handle() + // Replica startup must be in parallel with other replicas, otherwise the constructor may not return: + thread(name = "BFT SMaRt replica $replicaId init", isDaemon = true) { + configHandle.use { + val timeWindowChecker = TimeWindowChecker(services.clock) + val replica = Replica(it, replicaId, "bft_smart_notary_committed_states", services, timeWindowChecker) + replicaHolder.set(replica) + log.info("BFT SMaRt replica $replicaId is running.") + } + } + + BFTSMaRt.Client(it, replicaId) + } } - private class ServiceFlow(val otherSide: Party, val client: BFTSMaRt.Client) : FlowLogic() { + fun commitTransaction(tx: Any, otherSide: Party) = client.commitTransaction(tx, otherSide) + + override fun createServiceFlow(otherParty: Party, platformVersion: Int): FlowLogic { + return ServiceFlow(otherParty, this) + } + + private class ServiceFlow(val otherSide: Party, val service: BFTNonValidatingNotaryService) : FlowLogic() { @Suspendable override fun call(): Void? { val stx = receive(otherSide).unwrap { it } @@ -66,7 +68,7 @@ class BFTNonValidatingNotaryService(config: BFTSMaRtConfig, } private fun commit(stx: FilteredTransaction): List { - val response = client.commitTransaction(stx, otherSide) + val response = service.commitTransaction(stx, otherSide) when (response) { is BFTSMaRt.ClusterResponse.Error -> throw NotaryException(response.error) is BFTSMaRt.ClusterResponse.Signatures -> { @@ -79,10 +81,9 @@ class BFTNonValidatingNotaryService(config: BFTSMaRtConfig, private class Replica(config: BFTSMaRtConfig, replicaId: Int, - db: Database, tableName: String, services: ServiceHubInternal, - timeWindowChecker: TimeWindowChecker) : BFTSMaRt.Replica(config, replicaId, db, tableName, services, timeWindowChecker) { + timeWindowChecker: TimeWindowChecker) : BFTSMaRt.Replica(config, replicaId, tableName, services, timeWindowChecker) { override fun executeCommand(command: ByteArray): ByteArray { val request = command.deserialize() @@ -107,5 +108,14 @@ class BFTNonValidatingNotaryService(config: BFTSMaRtConfig, BFTSMaRt.ReplicaResponse.Error(e.error) } } + + } + + override fun start() { + } + + override fun stop() { + replicaHolder.getOrThrow().dispose() + client.dispose() } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt index 2a13f0cab4..d95f768a4a 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/BFTSMaRt.kt @@ -37,7 +37,6 @@ import net.corda.node.services.transactions.BFTSMaRt.Client import net.corda.node.services.transactions.BFTSMaRt.Replica import net.corda.node.utilities.JDBCHashMap import net.corda.node.utilities.transaction -import org.jetbrains.exposed.sql.Database import java.nio.file.Path import java.util.* @@ -170,7 +169,6 @@ object BFTSMaRt { */ abstract class Replica(config: BFTSMaRtConfig, replicaId: Int, - private val db: Database, tableName: String, private val services: ServiceHubInternal, private val timeWindowChecker: TimeWindowChecker) : DefaultRecoverable() { @@ -180,7 +178,7 @@ object BFTSMaRt { // TODO: Use Requery with proper DB schema instead of JDBCHashMap. // Must be initialised before ServiceReplica is started - private val commitLog = db.transaction { JDBCHashMap(tableName) } + private val commitLog = services.database.transaction { JDBCHashMap(tableName) } @Suppress("LeakingThis") private val replica = CordaServiceReplica(replicaId, config.path, this) @@ -205,7 +203,7 @@ object BFTSMaRt { protected fun commitInputStates(states: List, txId: SecureHash, callerIdentity: Party) { log.debug { "Attempting to commit inputs for transaction: $txId" } val conflicts = mutableMapOf() - db.transaction { + services.database.transaction { states.forEach { state -> commitLog[state]?.let { conflicts[state] = it } } @@ -231,7 +229,7 @@ object BFTSMaRt { } protected fun sign(bytes: ByteArray): DigitalSignature.WithKey { - return db.transaction { services.keyManagementService.sign(bytes, services.notaryIdentityKey) } + return services.database.transaction { services.keyManagementService.sign(bytes, services.notaryIdentityKey) } } // TODO: @@ -240,7 +238,7 @@ object BFTSMaRt { override fun getSnapshot(): ByteArray { // LinkedHashMap for deterministic serialisation val m = LinkedHashMap() - db.transaction { + services.database.transaction { commitLog.forEach { m[it.key] = it.value } } return m.serialize().bytes @@ -248,7 +246,7 @@ object BFTSMaRt { override fun installSnapshot(bytes: ByteArray) { val m = bytes.deserialize>() - db.transaction { + services.database.transaction { commitLog.clear() commitLog.putAll(m) } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/InMemoryUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/InMemoryUniquenessProvider.kt deleted file mode 100644 index 17021d1c1b..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/transactions/InMemoryUniquenessProvider.kt +++ /dev/null @@ -1,36 +0,0 @@ -package net.corda.node.services.transactions - -import net.corda.core.ThreadBox -import net.corda.core.contracts.StateRef -import net.corda.core.identity.Party -import net.corda.core.crypto.SecureHash -import net.corda.core.node.services.UniquenessException -import net.corda.core.node.services.UniquenessProvider -import java.util.* -import javax.annotation.concurrent.ThreadSafe - -/** A dummy Uniqueness provider that stores the whole history of consumed states in memory */ -@ThreadSafe -class InMemoryUniquenessProvider : UniquenessProvider { - /** For each input state store the consuming transaction information */ - private val committedStates = ThreadBox(HashMap()) - - override fun commit(states: List, txId: SecureHash, callerIdentity: Party) { - committedStates.locked { - val conflictingStates = LinkedHashMap() - for (inputState in states) { - val consumingTx = get(inputState) - if (consumingTx != null) conflictingStates[inputState] = consumingTx - } - if (conflictingStates.isNotEmpty()) { - val conflict = UniquenessProvider.Conflict(conflictingStates) - throw UniquenessException(conflict) - } else { - states.forEachIndexed { i, stateRef -> - put(stateRef, UniquenessProvider.ConsumingTx(txId, i, callerIdentity)) - } - } - - } - } -} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt b/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt index 470ad66a8e..354ef7799d 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/NonValidatingNotaryFlow.kt @@ -2,16 +2,13 @@ package net.corda.node.services.transactions import co.paralleluniverse.fibers.Suspendable import net.corda.core.identity.Party -import net.corda.core.node.services.TimeWindowChecker -import net.corda.core.node.services.UniquenessProvider +import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.transactions.FilteredTransaction import net.corda.core.utilities.unwrap import net.corda.flows.NotaryFlow import net.corda.flows.TransactionParts -class NonValidatingNotaryFlow(otherSide: Party, - timeWindowChecker: TimeWindowChecker, - uniquenessProvider: UniquenessProvider) : NotaryFlow.Service(otherSide, timeWindowChecker, uniquenessProvider) { +class NonValidatingNotaryFlow(otherSide: Party, service: TrustedAuthorityNotaryService) : NotaryFlow.Service(otherSide, service) { /** * The received transaction is not checked for contract-validity, as that would require fully * resolving it into a [TransactionForVerification], for which the caller would have to reveal the whole transaction diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt deleted file mode 100644 index 9abef4eb50..0000000000 --- a/node/src/main/kotlin/net/corda/node/services/transactions/NotaryService.kt +++ /dev/null @@ -1,15 +0,0 @@ -package net.corda.node.services.transactions - -import net.corda.core.flows.FlowLogic -import net.corda.core.identity.Party - -interface NotaryService { - - /** - * Factory for producing notary service flows which have the corresponding sends and receives as NotaryFlow.Client. - * The first parameter is the client [Party] making the request and the second is the platform version - * of the client's node. Use this version parameter to provide backwards compatibility if the notary flow protocol - * changes. - */ - val serviceFlowFactory: (Party, Int) -> FlowLogic -} diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt index 614dfdeb36..05bcabe172 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftNonValidatingNotaryService.kt @@ -1,17 +1,29 @@ package net.corda.node.services.transactions -import net.corda.core.flows.FlowLogic import net.corda.core.identity.Party +import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.node.services.TimeWindowChecker +import net.corda.flows.NotaryFlow +import net.corda.node.services.api.ServiceHubInternal /** A non-validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */ -class RaftNonValidatingNotaryService(val timeWindowChecker: TimeWindowChecker, - val uniquenessProvider: RaftUniquenessProvider) : NotaryService { +class RaftNonValidatingNotaryService(override val services: ServiceHubInternal) : TrustedAuthorityNotaryService() { companion object { val type = SimpleNotaryService.type.getSubType("raft") } - override val serviceFlowFactory: (Party, Int) -> FlowLogic = { otherParty, _ -> - NonValidatingNotaryFlow(otherParty, timeWindowChecker, uniquenessProvider) + override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock) + override val uniquenessProvider: RaftUniquenessProvider = RaftUniquenessProvider(services) + + override fun createServiceFlow(otherParty: Party, platformVersion: Int): NotaryFlow.Service { + return NonValidatingNotaryFlow(otherParty, this) } -} + + override fun start() { + uniquenessProvider.start() + } + + override fun stop() { + uniquenessProvider.stop() + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt index 87a09a22e3..e0f514cc32 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt @@ -23,6 +23,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.loggerFor +import net.corda.node.services.api.ServiceHubInternal import net.corda.nodeapi.config.SSLConfiguration import org.jetbrains.exposed.sql.Database import java.nio.file.Path @@ -36,27 +37,29 @@ import javax.annotation.concurrent.ThreadSafe * The uniqueness provider maintains both a Copycat cluster node (server) and a client through which it can submit * requests to the cluster. In Copycat, a client request is first sent to the server it's connected to and then redirected * to the cluster leader to be actioned. - * - * @param storagePath Directory storing the Raft log and state machine snapshots - * @param myAddress Address of the Copycat node run by this Corda node - * @param clusterAddresses List of node addresses in the existing Copycat cluster. At least one active node must be - * provided to join the cluster. If empty, a new cluster will be bootstrapped. - * @param db The database to store the state machine state in - * @param config SSL configuration */ @ThreadSafe -class RaftUniquenessProvider( - val storagePath: Path, - val myAddress: HostAndPort, - val clusterAddresses: List, - val db: Database, - val config: SSLConfiguration -) : UniquenessProvider, SingletonSerializeAsToken() { +class RaftUniquenessProvider(services: ServiceHubInternal) : UniquenessProvider, SingletonSerializeAsToken() { companion object { private val log = loggerFor() private val DB_TABLE_NAME = "notary_committed_states" } + /** Directory storing the Raft log and state machine snapshots */ + private val storagePath: Path = services.configuration.baseDirectory + /** Address of the Copycat node run by this Corda node */ + private val myAddress: HostAndPort = services.configuration.notaryNodeAddress + ?: throw IllegalArgumentException("notaryNodeAddress must be specified in configuration") + /** + * List of node addresses in the existing Copycat cluster. At least one active node must be + * provided to join the cluster. If empty, a new cluster will be bootstrapped. + */ + private val clusterAddresses: List = services.configuration.notaryClusterAddresses + /** The database to store the state machine state in */ + private val db: Database = services.database + /** SSL configuration */ + private val transportConfiguration: SSLConfiguration = services.configuration + private lateinit var _clientFuture: CompletableFuture private lateinit var server: CopycatServer /** @@ -71,13 +74,21 @@ class RaftUniquenessProvider( val stateMachineFactory = { DistributedImmutableMap(db, DB_TABLE_NAME) } val address = Address(myAddress.host, myAddress.port) val storage = buildStorage(storagePath) - val transport = buildTransport(config) + val transport = buildTransport(transportConfiguration) val serializer = Serializer().apply { // Add serializers so Catalyst doesn't attempt to fall back on Java serialization for these types, which is disabled process-wide: register(DistributedImmutableMap.Commands.PutAll::class.java) { object : TypeSerializer> { - override fun write(obj: DistributedImmutableMap.Commands.PutAll<*, *>, buffer: BufferOutput>, serializer: Serializer) = writeMap(obj.entries, buffer, serializer) - override fun read(type: Class>, buffer: BufferInput>, serializer: Serializer) = DistributedImmutableMap.Commands.PutAll(readMap(buffer, serializer)) + override fun write(obj: DistributedImmutableMap.Commands.PutAll<*, *>, + buffer: BufferOutput>, + serializer: Serializer) { + writeMap(obj.entries, buffer, serializer) + } + override fun read(type: Class>, + buffer: BufferInput>, + serializer: Serializer): DistributedImmutableMap.Commands.PutAll { + return DistributedImmutableMap.Commands.PutAll(readMap(buffer, serializer)) + } } } register(LinkedHashMap::class.java) { @@ -170,4 +181,10 @@ private fun writeMap(map: Map<*, *>, buffer: BufferOutput>, } } -private fun readMap(buffer: BufferInput>, serializer: Serializer) = LinkedHashMap().apply { repeat(buffer.readInt()) { put(serializer.readObject(buffer), serializer.readObject(buffer)) } } +private fun readMap(buffer: BufferInput>, serializer: Serializer): LinkedHashMap { + return LinkedHashMap().apply { + repeat(buffer.readInt()) { + put(serializer.readObject(buffer), serializer.readObject(buffer)) + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt index ff0217d12d..deba64d1a3 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftValidatingNotaryService.kt @@ -1,17 +1,29 @@ package net.corda.node.services.transactions -import net.corda.core.flows.FlowLogic import net.corda.core.identity.Party +import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.node.services.TimeWindowChecker +import net.corda.flows.NotaryFlow +import net.corda.node.services.api.ServiceHubInternal /** A validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */ -class RaftValidatingNotaryService(val timeWindowChecker: TimeWindowChecker, - val uniquenessProvider: RaftUniquenessProvider) : NotaryService { +class RaftValidatingNotaryService(override val services: ServiceHubInternal) : TrustedAuthorityNotaryService() { companion object { val type = ValidatingNotaryService.type.getSubType("raft") } - override val serviceFlowFactory: (Party, Int) -> FlowLogic = { otherParty, _ -> - ValidatingNotaryFlow(otherParty, timeWindowChecker, uniquenessProvider) + override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock) + override val uniquenessProvider: RaftUniquenessProvider = RaftUniquenessProvider(services) + + override fun createServiceFlow(otherParty: Party, platformVersion: Int): NotaryFlow.Service { + return ValidatingNotaryFlow(otherParty, this) + } + + override fun start() { + uniquenessProvider.start() + } + + override fun stop() { + uniquenessProvider.stop() } } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt index 5ac707bc9e..c23d19532b 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/SimpleNotaryService.kt @@ -1,19 +1,25 @@ package net.corda.node.services.transactions -import net.corda.core.flows.FlowLogic import net.corda.core.identity.Party +import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.node.services.ServiceType import net.corda.core.node.services.TimeWindowChecker -import net.corda.core.node.services.UniquenessProvider +import net.corda.flows.NotaryFlow +import net.corda.node.services.api.ServiceHubInternal /** A simple Notary service that does not perform transaction validation */ -class SimpleNotaryService(val timeWindowChecker: TimeWindowChecker, - val uniquenessProvider: UniquenessProvider) : NotaryService { +class SimpleNotaryService(override val services: ServiceHubInternal) : TrustedAuthorityNotaryService() { companion object { val type = ServiceType.notary.getSubType("simple") } - override val serviceFlowFactory: (Party, Int) -> FlowLogic = { otherParty, _ -> - NonValidatingNotaryFlow(otherParty, timeWindowChecker, uniquenessProvider) + override val timeWindowChecker = TimeWindowChecker(services.clock) + override val uniquenessProvider = PersistentUniquenessProvider() + + override fun createServiceFlow(otherParty: Party, platformVersion: Int): NotaryFlow.Service { + return NonValidatingNotaryFlow(otherParty, this) } -} + + override fun start() {} + override fun stop() {} +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt index d30180db05..dca4e5f5ad 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryFlow.kt @@ -3,8 +3,7 @@ package net.corda.node.services.transactions import co.paralleluniverse.fibers.Suspendable import net.corda.core.contracts.TransactionVerificationException import net.corda.core.identity.Party -import net.corda.core.node.services.TimeWindowChecker -import net.corda.core.node.services.UniquenessProvider +import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction import net.corda.core.utilities.unwrap @@ -17,10 +16,7 @@ import java.security.SignatureException * has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was * indeed valid. */ -class ValidatingNotaryFlow(otherSide: Party, - timeWindowChecker: TimeWindowChecker, - uniquenessProvider: UniquenessProvider) : - NotaryFlow.Service(otherSide, timeWindowChecker, uniquenessProvider) { +class ValidatingNotaryFlow(otherSide: Party, service: TrustedAuthorityNotaryService) : NotaryFlow.Service(otherSide, service) { /** * The received transaction is checked for contract-validity, which requires fully resolving it into a * [TransactionForVerification], for which the caller also has to to reveal the whole transaction diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt index 72b819e90a..c996a8979d 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/ValidatingNotaryService.kt @@ -1,19 +1,25 @@ package net.corda.node.services.transactions -import net.corda.core.flows.FlowLogic import net.corda.core.identity.Party +import net.corda.core.node.services.TrustedAuthorityNotaryService import net.corda.core.node.services.ServiceType import net.corda.core.node.services.TimeWindowChecker -import net.corda.core.node.services.UniquenessProvider +import net.corda.flows.NotaryFlow +import net.corda.node.services.api.ServiceHubInternal /** A Notary service that validates the transaction chain of the submitted transaction before committing it */ -class ValidatingNotaryService(val timeWindowChecker: TimeWindowChecker, - val uniquenessProvider: UniquenessProvider) : NotaryService { +class ValidatingNotaryService(override val services: ServiceHubInternal) : TrustedAuthorityNotaryService() { companion object { val type = ServiceType.notary.getSubType("validating") } - override val serviceFlowFactory: (Party, Int) -> FlowLogic = { otherParty, _ -> - ValidatingNotaryFlow(otherParty, timeWindowChecker, uniquenessProvider) + override val timeWindowChecker = TimeWindowChecker(services.clock) + override val uniquenessProvider = PersistentUniquenessProvider() + + override fun createServiceFlow(otherParty: Party, platformVersion: Int): NotaryFlow.Service { + return ValidatingNotaryFlow(otherParty, this) } -} + + override fun start() {} + override fun stop() {} +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt index b7653f2a58..d82203d01a 100644 --- a/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt +++ b/node/src/test/kotlin/net/corda/node/services/MockServiceHubInternal.kt @@ -10,6 +10,7 @@ import net.corda.core.transactions.SignedTransaction import net.corda.node.internal.InitiatedFlowFactory import net.corda.node.serialization.NodeClock import net.corda.node.services.api.* +import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.messaging.MessagingService import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.statemachine.FlowStateMachineImpl @@ -18,6 +19,7 @@ import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.testing.MOCK_IDENTITY_SERVICE import net.corda.testing.node.MockNetworkMapCache import net.corda.testing.node.MockStorageService +import org.jetbrains.exposed.sql.Database import java.time.Clock open class MockServiceHubInternal( @@ -55,7 +57,10 @@ open class MockServiceHubInternal( get() = overrideClock ?: throw UnsupportedOperationException() override val myInfo: NodeInfo get() = throw UnsupportedOperationException() - + override val database: Database + get() = throw UnsupportedOperationException() + override val configuration: NodeConfiguration + get() = throw UnsupportedOperationException() override val monitoringService: MonitoringService = MonitoringService(MetricRegistry()) override val rpcFlows: List>> get() = throw UnsupportedOperationException() diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/InMemoryUniquenessProviderTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/InMemoryUniquenessProviderTests.kt deleted file mode 100644 index 8634d885be..0000000000 --- a/node/src/test/kotlin/net/corda/node/services/transactions/InMemoryUniquenessProviderTests.kt +++ /dev/null @@ -1,36 +0,0 @@ -package net.corda.node.services.transactions - -import net.corda.core.crypto.SecureHash -import net.corda.core.node.services.UniquenessException -import net.corda.testing.MEGA_CORP -import net.corda.testing.generateStateRef -import org.junit.Test -import kotlin.test.assertEquals -import kotlin.test.assertFailsWith - -class InMemoryUniquenessProviderTests { - val identity = MEGA_CORP - val txID = SecureHash.randomSHA256() - - @Test fun `should commit a transaction with unused inputs without exception`() { - val provider = InMemoryUniquenessProvider() - val inputState = generateStateRef() - - provider.commit(listOf(inputState), txID, identity) - } - - @Test fun `should report a conflict for a transaction with previously used inputs`() { - val provider = InMemoryUniquenessProvider() - val inputState = generateStateRef() - - val inputs = listOf(inputState) - provider.commit(inputs, txID, identity) - - val ex = assertFailsWith { provider.commit(inputs, txID, identity) } - - val consumingTx = ex.error.stateHistory[inputState]!! - assertEquals(consumingTx.id, txID) - assertEquals(consumingTx.inputIndex, inputs.indexOf(inputState)) - assertEquals(consumingTx.requestingParty, identity) - } -} diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt index 44e762f1f8..2762a20236 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -30,7 +30,6 @@ import net.corda.node.services.messaging.MessagingService import net.corda.node.services.network.InMemoryNetworkMapService import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.InMemoryTransactionVerifierService -import net.corda.node.services.transactions.InMemoryUniquenessProvider import net.corda.node.services.transactions.SimpleNotaryService import net.corda.node.services.transactions.ValidatingNotaryService import net.corda.node.services.vault.NodeVaultService @@ -225,8 +224,6 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, // There is no need to slow down the unit tests by initialising CityDatabase override fun findMyLocation(): WorldMapLocation? = null - override fun makeUniquenessProvider(type: ServiceType): UniquenessProvider = InMemoryUniquenessProvider() - override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1) override fun myAddresses(): List = listOf(HostAndPort.fromHost("mockHost"))