Decouple notary implementations from AbstractNode. Allow custom notaries to be provided via CorDapps.

This commit is contained in:
Andrius Dagys 2017-06-29 17:25:10 +01:00
parent 083b8265b5
commit 00b272906a
19 changed files with 270 additions and 273 deletions

View File

@ -0,0 +1,78 @@
package net.corda.core.node.services
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.SignedData
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.flows.NotaryError
import net.corda.flows.NotaryException
import org.slf4j.Logger
abstract class NotaryService : SingletonSerializeAsToken() {
abstract val services: ServiceHub
abstract fun start()
abstract fun stop()
/**
* Produces a notary service flow which has the corresponding sends and receives as [NotaryFlow.Client].
* The first parameter is the client [Party] making the request and the second is the platform version
* of the client's node. Use this version parameter to provide backwards compatibility if the notary flow protocol
* changes.
*/
abstract fun createServiceFlow(otherParty: Party, platformVersion: Int): FlowLogic<Void?>
}
/**
* A base notary service implementation that provides functionality for cases where a signature by a single member
* of the cluster is sufficient for transaction notarisation. For example, a single-node or a Raft notary.
*/
abstract class TrustedAuthorityNotaryService : NotaryService() {
protected open val log: Logger = loggerFor<TrustedAuthorityNotaryService>()
// TODO: specify the valid time window in config, and convert TimeWindowChecker to a utility method
protected abstract val timeWindowChecker: TimeWindowChecker
protected abstract val uniquenessProvider: UniquenessProvider
fun validateTimeWindow(t: TimeWindow?) {
if (t != null && !timeWindowChecker.isValid(t))
throw NotaryException(NotaryError.TimeWindowInvalid)
}
/**
* A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that
* this method does not throw an exception when input states are present multiple times within the transaction.
*/
fun commitInputStates(inputs: List<StateRef>, txId: SecureHash, caller: Party) {
try {
uniquenessProvider.commit(inputs, txId, caller)
} catch (e: UniquenessException) {
val conflicts = inputs.filterIndexed { i, stateRef ->
val consumingTx = e.error.stateHistory[stateRef]
consumingTx != null && consumingTx != UniquenessProvider.ConsumingTx(txId, i, caller)
}
if (conflicts.isNotEmpty()) {
// TODO: Create a new UniquenessException that only contains the conflicts filtered above.
log.warn("Notary conflicts for $txId: $conflicts")
throw notaryException(txId, e)
}
}
}
private fun notaryException(txId: SecureHash, e: UniquenessException): NotaryException {
val conflictData = e.error.serialize()
val signedConflict = SignedData(conflictData, sign(conflictData.bytes))
return NotaryException(NotaryError.Conflict(txId, signedConflict))
}
fun sign(bits: ByteArray): DigitalSignature.WithKey {
return services.keyManagementService.sign(bits, services.notaryIdentityKey)
}
}

View File

@ -11,11 +11,8 @@ import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow
import net.corda.core.identity.Party
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.UniquenessException
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.node.services.*
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.unwrap
@ -97,14 +94,13 @@ object NotaryFlow {
* Additional transaction validation logic can be added when implementing [receiveAndVerifyTx].
*/
// See AbstractStateReplacementFlow.Acceptor for why it's Void?
abstract class Service(val otherSide: Party,
val timeWindowChecker: TimeWindowChecker,
val uniquenessProvider: UniquenessProvider) : FlowLogic<Void?>() {
abstract class Service(val otherSide: Party, val service: TrustedAuthorityNotaryService) : FlowLogic<Void?>() {
@Suspendable
override fun call(): Void? {
val (id, inputs, timeWindow) = receiveAndVerifyTx()
validateTimeWindow(timeWindow)
commitInputStates(inputs, id)
service.validateTimeWindow(timeWindow)
service.commitInputStates(inputs, id, otherSide)
signAndSendResponse(id)
return null
}
@ -118,44 +114,9 @@ object NotaryFlow {
@Suspendable
private fun signAndSendResponse(txId: SecureHash) {
val signature = sign(txId.bytes)
val signature = service.sign(txId.bytes)
send(otherSide, listOf(signature))
}
private fun validateTimeWindow(t: TimeWindow?) {
if (t != null && !timeWindowChecker.isValid(t))
throw NotaryException(NotaryError.TimeWindowInvalid)
}
/**
* A NotaryException is thrown if any of the states have been consumed by a different transaction. Note that
* this method does not throw an exception when input states are present multiple times within the transaction.
*/
private fun commitInputStates(inputs: List<StateRef>, txId: SecureHash) {
try {
uniquenessProvider.commit(inputs, txId, otherSide)
} catch (e: UniquenessException) {
val conflicts = inputs.filterIndexed { i, stateRef ->
val consumingTx = e.error.stateHistory[stateRef]
consumingTx != null && consumingTx != UniquenessProvider.ConsumingTx(txId, i, otherSide)
}
if (conflicts.isNotEmpty()) {
// TODO: Create a new UniquenessException that only contains the conflicts filtered above.
logger.warn("Notary conflicts for $txId: $conflicts")
throw notaryException(txId, e)
}
}
}
private fun sign(bits: ByteArray): DigitalSignature.WithKey {
return serviceHub.keyManagementService.sign(bits, serviceHub.notaryIdentityKey)
}
private fun notaryException(txId: SecureHash, e: UniquenessException): NotaryException {
val conflictData = e.error.serialize()
val signedConflict = SignedData(conflictData, sign(conflictData.bytes))
return NotaryException(NotaryError.Conflict(txId, signedConflict))
}
}
}

View File

@ -20,6 +20,7 @@ import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.NotaryService
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
@ -59,7 +60,6 @@ import net.corda.node.utilities.AddOrRemove.ADD
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.configureDatabase
import net.corda.node.utilities.transaction
import net.corda.nodeapi.ArtemisMessagingComponent
import org.apache.activemq.artemis.utils.ReusableLatch
import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.Database
@ -133,7 +133,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
override val myInfo: NodeInfo get() = info
override val schemaService: SchemaService get() = schemas
override val transactionVerifierService: TransactionVerifierService get() = txVerifierService
override val auditService: AuditService get() = auditService
override val auditService: AuditService get() = this@AbstractNode.auditService
override val database: Database get() = this@AbstractNode.database
override val configuration: NodeConfiguration get() = this@AbstractNode.configuration
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {
require(type.isAnnotationPresent(CordaService::class.java)) { "${type.name} is not a Corda service" }
@ -329,10 +331,19 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
}
cordappServices.putInstance(serviceClass, service)
smm.tokenizableServices += service
if (service is NotaryService) handleCustomNotaryService(service)
log.info("Installed ${serviceClass.name} Corda service")
return service
}
private fun handleCustomNotaryService(service: NotaryService) {
runOnStop += service::stop
service.start()
installCoreFlow(NotaryFlow.Client::class, { party: Party, version: Int -> service.createServiceFlow(party, version) })
}
private inline fun <reified A : Annotation> Class<*>.requireAnnotation(): A {
return requireNotNull(getDeclaredAnnotation(A::class.java)) { "$name needs to be annotated with ${A::class.java.name}" }
}
@ -624,7 +635,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
val notaryServiceType = serviceTypes.singleOrNull { it.isNotary() }
if (notaryServiceType != null) {
makeNotaryService(notaryServiceType, tokenizableServices)
makeCoreNotaryService(notaryServiceType, tokenizableServices)
}
}
@ -685,35 +696,27 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
inNodeNetworkMapService = PersistentNetworkMapService(services, configuration.minimumPlatformVersion)
}
open protected fun makeNotaryService(type: ServiceType, tokenizableServices: MutableList<Any>) {
val timeWindowChecker = TimeWindowChecker(platformClock, 30.seconds)
val uniquenessProvider = makeUniquenessProvider(type)
tokenizableServices.add(uniquenessProvider)
val notaryService = when (type) {
SimpleNotaryService.type -> SimpleNotaryService(timeWindowChecker, uniquenessProvider)
ValidatingNotaryService.type -> ValidatingNotaryService(timeWindowChecker, uniquenessProvider)
RaftNonValidatingNotaryService.type -> RaftNonValidatingNotaryService(timeWindowChecker, uniquenessProvider as RaftUniquenessProvider)
RaftValidatingNotaryService.type -> RaftValidatingNotaryService(timeWindowChecker, uniquenessProvider as RaftUniquenessProvider)
BFTNonValidatingNotaryService.type -> with(configuration) {
val replicaId = bftReplicaId ?: throw IllegalArgumentException("bftReplicaId value must be specified in the configuration")
BFTSMaRtConfig(notaryClusterAddresses).use { config ->
BFTNonValidatingNotaryService(config, services, timeWindowChecker, replicaId, database).also {
tokenizableServices += it.client
runOnStop += it::dispose
}
}
}
open protected fun makeCoreNotaryService(type: ServiceType, tokenizableServices: MutableList<Any>) {
val service: NotaryService = when (type) {
SimpleNotaryService.type -> SimpleNotaryService(services)
ValidatingNotaryService.type -> ValidatingNotaryService(services)
RaftNonValidatingNotaryService.type -> RaftNonValidatingNotaryService(services)
RaftValidatingNotaryService.type -> RaftValidatingNotaryService(services)
BFTNonValidatingNotaryService.type -> BFTNonValidatingNotaryService(services)
else -> {
throw IllegalArgumentException("Notary type ${type.id} is not handled by makeNotaryService.")
log.info("Notary type ${type.id} does not match any built-in notary types. " +
"It is expected to be loaded via a CorDapp")
return
}
}
installCoreFlow(NotaryFlow.Client::class, notaryService.serviceFlowFactory)
service.apply {
tokenizableServices.add(this)
runOnStop += this::stop
start()
}
installCoreFlow(NotaryFlow.Client::class, { party: Party, version: Int -> service.createServiceFlow(party, version) })
}
protected abstract fun makeUniquenessProvider(type: ServiceType): UniquenessProvider
protected open fun makeIdentityService(trustRoot: X509Certificate,
clientCa: CertificateAndKeyPair?,
legalIdentity: PartyAndCertificate): IdentityService {

View File

@ -11,8 +11,6 @@ import net.corda.core.minutes
import net.corda.core.node.ServiceHub
import net.corda.core.node.VersionInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.seconds
import net.corda.core.success
import net.corda.core.utilities.loggerFor
@ -26,10 +24,6 @@ import net.corda.node.services.messaging.ArtemisMessagingServer.Companion.ipDete
import net.corda.node.services.messaging.ArtemisMessagingServer.Companion.ipDetectResponseProperty
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.transactions.RaftNonValidatingNotaryService
import net.corda.node.services.transactions.RaftUniquenessProvider
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.utilities.AddressUtils
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.ArtemisMessagingComponent
@ -263,18 +257,6 @@ open class Node(override val configuration: FullNodeConfiguration,
return networkMapConnection.flatMap { super.registerWithNetworkMap() }
}
override fun makeUniquenessProvider(type: ServiceType): UniquenessProvider {
return when (type) {
RaftValidatingNotaryService.type, RaftNonValidatingNotaryService.type -> with(configuration) {
val provider = RaftUniquenessProvider(baseDirectory, notaryNodeAddress!!, notaryClusterAddresses, database, configuration)
provider.start()
runOnStop += provider::stop
provider
}
else -> PersistentUniquenessProvider()
}
}
override fun myAddresses(): List<HostAndPort> {
val address = network.myAddress as ArtemisMessagingComponent.ArtemisPeerAddress
return listOf(address.hostAndPort)

View File

@ -15,9 +15,11 @@ import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.FlowStateMachineImpl
import org.jetbrains.exposed.sql.Database
interface NetworkMapCacheInternal : NetworkMapCache {
/**
@ -68,6 +70,8 @@ abstract class ServiceHubInternal : PluginServiceHub {
abstract val auditService: AuditService
abstract val rpcFlows: List<Class<out FlowLogic<*>>>
abstract val networkService: MessagingService
abstract val database: Database
abstract val configuration: NodeConfiguration
/**
* Given a list of [SignedTransaction]s, writes them to the given storage for validated transactions and then

View File

@ -6,6 +6,7 @@ import net.corda.core.crypto.DigitalSignature
import net.corda.core.flows.FlowLogic
import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.node.services.NotaryService
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
@ -15,7 +16,6 @@ import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.unwrap
import net.corda.flows.NotaryException
import net.corda.node.services.api.ServiceHubInternal
import org.jetbrains.exposed.sql.Database
import kotlin.concurrent.thread
/**
@ -23,40 +23,42 @@ import kotlin.concurrent.thread
*
* A transaction is notarised when the consensus is reached by the cluster on its uniqueness, and time-window validity.
*/
class BFTNonValidatingNotaryService(config: BFTSMaRtConfig,
services: ServiceHubInternal,
timeWindowChecker: TimeWindowChecker,
replicaId: Int,
db: Database) : NotaryService {
val client = BFTSMaRt.Client(config, replicaId) // (Ab)use replicaId for clientId.
private val replicaHolder = SettableFuture.create<Replica>()
init {
// Replica startup must be in parallel with other replicas, otherwise the constructor may not return:
val configHandle = config.handle()
thread(name = "BFT SMaRt replica $replicaId init", isDaemon = true) {
configHandle.use {
replicaHolder.set(Replica(it, replicaId, db, "bft_smart_notary_committed_states", services, timeWindowChecker))
log.info("BFT SMaRt replica $replicaId is running.")
}
}
}
fun dispose() {
replicaHolder.getOrThrow().dispose()
client.dispose()
}
class BFTNonValidatingNotaryService(override val services: ServiceHubInternal) : NotaryService() {
companion object {
val type = SimpleNotaryService.type.getSubType("bft")
private val log = loggerFor<BFTNonValidatingNotaryService>()
}
override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
ServiceFlow(otherParty, client)
private val client: BFTSMaRt.Client
private val replicaHolder = SettableFuture.create<Replica>()
init {
val replicaId = services.configuration.bftReplicaId ?: throw IllegalArgumentException("bftReplicaId value must be specified in the configuration")
val config = BFTSMaRtConfig(services.configuration.notaryClusterAddresses)
client = config.use {
val configHandle = config.handle()
// Replica startup must be in parallel with other replicas, otherwise the constructor may not return:
thread(name = "BFT SMaRt replica $replicaId init", isDaemon = true) {
configHandle.use {
val timeWindowChecker = TimeWindowChecker(services.clock)
val replica = Replica(it, replicaId, "bft_smart_notary_committed_states", services, timeWindowChecker)
replicaHolder.set(replica)
log.info("BFT SMaRt replica $replicaId is running.")
}
}
BFTSMaRt.Client(it, replicaId)
}
}
private class ServiceFlow(val otherSide: Party, val client: BFTSMaRt.Client) : FlowLogic<Void?>() {
fun commitTransaction(tx: Any, otherSide: Party) = client.commitTransaction(tx, otherSide)
override fun createServiceFlow(otherParty: Party, platformVersion: Int): FlowLogic<Void?> {
return ServiceFlow(otherParty, this)
}
private class ServiceFlow(val otherSide: Party, val service: BFTNonValidatingNotaryService) : FlowLogic<Void?>() {
@Suspendable
override fun call(): Void? {
val stx = receive<FilteredTransaction>(otherSide).unwrap { it }
@ -66,7 +68,7 @@ class BFTNonValidatingNotaryService(config: BFTSMaRtConfig,
}
private fun commit(stx: FilteredTransaction): List<DigitalSignature> {
val response = client.commitTransaction(stx, otherSide)
val response = service.commitTransaction(stx, otherSide)
when (response) {
is BFTSMaRt.ClusterResponse.Error -> throw NotaryException(response.error)
is BFTSMaRt.ClusterResponse.Signatures -> {
@ -79,10 +81,9 @@ class BFTNonValidatingNotaryService(config: BFTSMaRtConfig,
private class Replica(config: BFTSMaRtConfig,
replicaId: Int,
db: Database,
tableName: String,
services: ServiceHubInternal,
timeWindowChecker: TimeWindowChecker) : BFTSMaRt.Replica(config, replicaId, db, tableName, services, timeWindowChecker) {
timeWindowChecker: TimeWindowChecker) : BFTSMaRt.Replica(config, replicaId, tableName, services, timeWindowChecker) {
override fun executeCommand(command: ByteArray): ByteArray {
val request = command.deserialize<BFTSMaRt.CommitRequest>()
@ -107,5 +108,14 @@ class BFTNonValidatingNotaryService(config: BFTSMaRtConfig,
BFTSMaRt.ReplicaResponse.Error(e.error)
}
}
}
override fun start() {
}
override fun stop() {
replicaHolder.getOrThrow().dispose()
client.dispose()
}
}

View File

@ -37,7 +37,6 @@ import net.corda.node.services.transactions.BFTSMaRt.Client
import net.corda.node.services.transactions.BFTSMaRt.Replica
import net.corda.node.utilities.JDBCHashMap
import net.corda.node.utilities.transaction
import org.jetbrains.exposed.sql.Database
import java.nio.file.Path
import java.util.*
@ -170,7 +169,6 @@ object BFTSMaRt {
*/
abstract class Replica(config: BFTSMaRtConfig,
replicaId: Int,
private val db: Database,
tableName: String,
private val services: ServiceHubInternal,
private val timeWindowChecker: TimeWindowChecker) : DefaultRecoverable() {
@ -180,7 +178,7 @@ object BFTSMaRt {
// TODO: Use Requery with proper DB schema instead of JDBCHashMap.
// Must be initialised before ServiceReplica is started
private val commitLog = db.transaction { JDBCHashMap<StateRef, UniquenessProvider.ConsumingTx>(tableName) }
private val commitLog = services.database.transaction { JDBCHashMap<StateRef, UniquenessProvider.ConsumingTx>(tableName) }
@Suppress("LeakingThis")
private val replica = CordaServiceReplica(replicaId, config.path, this)
@ -205,7 +203,7 @@ object BFTSMaRt {
protected fun commitInputStates(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
log.debug { "Attempting to commit inputs for transaction: $txId" }
val conflicts = mutableMapOf<StateRef, UniquenessProvider.ConsumingTx>()
db.transaction {
services.database.transaction {
states.forEach { state ->
commitLog[state]?.let { conflicts[state] = it }
}
@ -231,7 +229,7 @@ object BFTSMaRt {
}
protected fun sign(bytes: ByteArray): DigitalSignature.WithKey {
return db.transaction { services.keyManagementService.sign(bytes, services.notaryIdentityKey) }
return services.database.transaction { services.keyManagementService.sign(bytes, services.notaryIdentityKey) }
}
// TODO:
@ -240,7 +238,7 @@ object BFTSMaRt {
override fun getSnapshot(): ByteArray {
// LinkedHashMap for deterministic serialisation
val m = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
db.transaction {
services.database.transaction {
commitLog.forEach { m[it.key] = it.value }
}
return m.serialize().bytes
@ -248,7 +246,7 @@ object BFTSMaRt {
override fun installSnapshot(bytes: ByteArray) {
val m = bytes.deserialize<LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>>()
db.transaction {
services.database.transaction {
commitLog.clear()
commitLog.putAll(m)
}

View File

@ -1,36 +0,0 @@
package net.corda.node.services.transactions
import net.corda.core.ThreadBox
import net.corda.core.contracts.StateRef
import net.corda.core.identity.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.UniquenessException
import net.corda.core.node.services.UniquenessProvider
import java.util.*
import javax.annotation.concurrent.ThreadSafe
/** A dummy Uniqueness provider that stores the whole history of consumed states in memory */
@ThreadSafe
class InMemoryUniquenessProvider : UniquenessProvider {
/** For each input state store the consuming transaction information */
private val committedStates = ThreadBox(HashMap<StateRef, UniquenessProvider.ConsumingTx>())
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
committedStates.locked {
val conflictingStates = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
for (inputState in states) {
val consumingTx = get(inputState)
if (consumingTx != null) conflictingStates[inputState] = consumingTx
}
if (conflictingStates.isNotEmpty()) {
val conflict = UniquenessProvider.Conflict(conflictingStates)
throw UniquenessException(conflict)
} else {
states.forEachIndexed { i, stateRef ->
put(stateRef, UniquenessProvider.ConsumingTx(txId, i, callerIdentity))
}
}
}
}
}

View File

@ -2,16 +2,13 @@ package net.corda.node.services.transactions
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.identity.Party
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.core.transactions.FilteredTransaction
import net.corda.core.utilities.unwrap
import net.corda.flows.NotaryFlow
import net.corda.flows.TransactionParts
class NonValidatingNotaryFlow(otherSide: Party,
timeWindowChecker: TimeWindowChecker,
uniquenessProvider: UniquenessProvider) : NotaryFlow.Service(otherSide, timeWindowChecker, uniquenessProvider) {
class NonValidatingNotaryFlow(otherSide: Party, service: TrustedAuthorityNotaryService) : NotaryFlow.Service(otherSide, service) {
/**
* The received transaction is not checked for contract-validity, as that would require fully
* resolving it into a [TransactionForVerification], for which the caller would have to reveal the whole transaction

View File

@ -1,15 +0,0 @@
package net.corda.node.services.transactions
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
interface NotaryService {
/**
* Factory for producing notary service flows which have the corresponding sends and receives as NotaryFlow.Client.
* The first parameter is the client [Party] making the request and the second is the platform version
* of the client's node. Use this version parameter to provide backwards compatibility if the notary flow protocol
* changes.
*/
val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?>
}

View File

@ -1,17 +1,29 @@
package net.corda.node.services.transactions
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.core.node.services.TimeWindowChecker
import net.corda.flows.NotaryFlow
import net.corda.node.services.api.ServiceHubInternal
/** A non-validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */
class RaftNonValidatingNotaryService(val timeWindowChecker: TimeWindowChecker,
val uniquenessProvider: RaftUniquenessProvider) : NotaryService {
class RaftNonValidatingNotaryService(override val services: ServiceHubInternal) : TrustedAuthorityNotaryService() {
companion object {
val type = SimpleNotaryService.type.getSubType("raft")
}
override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
NonValidatingNotaryFlow(otherParty, timeWindowChecker, uniquenessProvider)
override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider: RaftUniquenessProvider = RaftUniquenessProvider(services)
override fun createServiceFlow(otherParty: Party, platformVersion: Int): NotaryFlow.Service {
return NonValidatingNotaryFlow(otherParty, this)
}
}
override fun start() {
uniquenessProvider.start()
}
override fun stop() {
uniquenessProvider.stop()
}
}

View File

@ -23,6 +23,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.loggerFor
import net.corda.node.services.api.ServiceHubInternal
import net.corda.nodeapi.config.SSLConfiguration
import org.jetbrains.exposed.sql.Database
import java.nio.file.Path
@ -36,27 +37,29 @@ import javax.annotation.concurrent.ThreadSafe
* The uniqueness provider maintains both a Copycat cluster node (server) and a client through which it can submit
* requests to the cluster. In Copycat, a client request is first sent to the server it's connected to and then redirected
* to the cluster leader to be actioned.
*
* @param storagePath Directory storing the Raft log and state machine snapshots
* @param myAddress Address of the Copycat node run by this Corda node
* @param clusterAddresses List of node addresses in the existing Copycat cluster. At least one active node must be
* provided to join the cluster. If empty, a new cluster will be bootstrapped.
* @param db The database to store the state machine state in
* @param config SSL configuration
*/
@ThreadSafe
class RaftUniquenessProvider(
val storagePath: Path,
val myAddress: HostAndPort,
val clusterAddresses: List<HostAndPort>,
val db: Database,
val config: SSLConfiguration
) : UniquenessProvider, SingletonSerializeAsToken() {
class RaftUniquenessProvider(services: ServiceHubInternal) : UniquenessProvider, SingletonSerializeAsToken() {
companion object {
private val log = loggerFor<RaftUniquenessProvider>()
private val DB_TABLE_NAME = "notary_committed_states"
}
/** Directory storing the Raft log and state machine snapshots */
private val storagePath: Path = services.configuration.baseDirectory
/** Address of the Copycat node run by this Corda node */
private val myAddress: HostAndPort = services.configuration.notaryNodeAddress
?: throw IllegalArgumentException("notaryNodeAddress must be specified in configuration")
/**
* List of node addresses in the existing Copycat cluster. At least one active node must be
* provided to join the cluster. If empty, a new cluster will be bootstrapped.
*/
private val clusterAddresses: List<HostAndPort> = services.configuration.notaryClusterAddresses
/** The database to store the state machine state in */
private val db: Database = services.database
/** SSL configuration */
private val transportConfiguration: SSLConfiguration = services.configuration
private lateinit var _clientFuture: CompletableFuture<CopycatClient>
private lateinit var server: CopycatServer
/**
@ -71,13 +74,21 @@ class RaftUniquenessProvider(
val stateMachineFactory = { DistributedImmutableMap<String, ByteArray>(db, DB_TABLE_NAME) }
val address = Address(myAddress.host, myAddress.port)
val storage = buildStorage(storagePath)
val transport = buildTransport(config)
val transport = buildTransport(transportConfiguration)
val serializer = Serializer().apply {
// Add serializers so Catalyst doesn't attempt to fall back on Java serialization for these types, which is disabled process-wide:
register(DistributedImmutableMap.Commands.PutAll::class.java) {
object : TypeSerializer<DistributedImmutableMap.Commands.PutAll<*, *>> {
override fun write(obj: DistributedImmutableMap.Commands.PutAll<*, *>, buffer: BufferOutput<out BufferOutput<*>>, serializer: Serializer) = writeMap(obj.entries, buffer, serializer)
override fun read(type: Class<DistributedImmutableMap.Commands.PutAll<*, *>>, buffer: BufferInput<out BufferInput<*>>, serializer: Serializer) = DistributedImmutableMap.Commands.PutAll(readMap(buffer, serializer))
override fun write(obj: DistributedImmutableMap.Commands.PutAll<*, *>,
buffer: BufferOutput<out BufferOutput<*>>,
serializer: Serializer) {
writeMap(obj.entries, buffer, serializer)
}
override fun read(type: Class<DistributedImmutableMap.Commands.PutAll<*, *>>,
buffer: BufferInput<out BufferInput<*>>,
serializer: Serializer): DistributedImmutableMap.Commands.PutAll<Any, Any> {
return DistributedImmutableMap.Commands.PutAll(readMap(buffer, serializer))
}
}
}
register(LinkedHashMap::class.java) {
@ -170,4 +181,10 @@ private fun writeMap(map: Map<*, *>, buffer: BufferOutput<out BufferOutput<*>>,
}
}
private fun readMap(buffer: BufferInput<out BufferInput<*>>, serializer: Serializer) = LinkedHashMap<Any, Any>().apply { repeat(buffer.readInt()) { put(serializer.readObject(buffer), serializer.readObject(buffer)) } }
private fun readMap(buffer: BufferInput<out BufferInput<*>>, serializer: Serializer): LinkedHashMap<Any, Any> {
return LinkedHashMap<Any, Any>().apply {
repeat(buffer.readInt()) {
put(serializer.readObject(buffer), serializer.readObject(buffer))
}
}
}

View File

@ -1,17 +1,29 @@
package net.corda.node.services.transactions
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.core.node.services.TimeWindowChecker
import net.corda.flows.NotaryFlow
import net.corda.node.services.api.ServiceHubInternal
/** A validating notary service operated by a group of mutually trusting parties, uses the Raft algorithm to achieve consensus. */
class RaftValidatingNotaryService(val timeWindowChecker: TimeWindowChecker,
val uniquenessProvider: RaftUniquenessProvider) : NotaryService {
class RaftValidatingNotaryService(override val services: ServiceHubInternal) : TrustedAuthorityNotaryService() {
companion object {
val type = ValidatingNotaryService.type.getSubType("raft")
}
override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
ValidatingNotaryFlow(otherParty, timeWindowChecker, uniquenessProvider)
override val timeWindowChecker: TimeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider: RaftUniquenessProvider = RaftUniquenessProvider(services)
override fun createServiceFlow(otherParty: Party, platformVersion: Int): NotaryFlow.Service {
return ValidatingNotaryFlow(otherParty, this)
}
override fun start() {
uniquenessProvider.start()
}
override fun stop() {
uniquenessProvider.stop()
}
}

View File

@ -1,19 +1,25 @@
package net.corda.node.services.transactions
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.UniquenessProvider
import net.corda.flows.NotaryFlow
import net.corda.node.services.api.ServiceHubInternal
/** A simple Notary service that does not perform transaction validation */
class SimpleNotaryService(val timeWindowChecker: TimeWindowChecker,
val uniquenessProvider: UniquenessProvider) : NotaryService {
class SimpleNotaryService(override val services: ServiceHubInternal) : TrustedAuthorityNotaryService() {
companion object {
val type = ServiceType.notary.getSubType("simple")
}
override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
NonValidatingNotaryFlow(otherParty, timeWindowChecker, uniquenessProvider)
override val timeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider = PersistentUniquenessProvider()
override fun createServiceFlow(otherParty: Party, platformVersion: Int): NotaryFlow.Service {
return NonValidatingNotaryFlow(otherParty, this)
}
}
override fun start() {}
override fun stop() {}
}

View File

@ -3,8 +3,7 @@ package net.corda.node.services.transactions
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.TransactionVerificationException
import net.corda.core.identity.Party
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.UniquenessProvider
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.unwrap
@ -17,10 +16,7 @@ import java.security.SignatureException
* has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was
* indeed valid.
*/
class ValidatingNotaryFlow(otherSide: Party,
timeWindowChecker: TimeWindowChecker,
uniquenessProvider: UniquenessProvider) :
NotaryFlow.Service(otherSide, timeWindowChecker, uniquenessProvider) {
class ValidatingNotaryFlow(otherSide: Party, service: TrustedAuthorityNotaryService) : NotaryFlow.Service(otherSide, service) {
/**
* The received transaction is checked for contract-validity, which requires fully resolving it into a
* [TransactionForVerification], for which the caller also has to to reveal the whole transaction

View File

@ -1,19 +1,25 @@
package net.corda.node.services.transactions
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.core.node.services.TrustedAuthorityNotaryService
import net.corda.core.node.services.ServiceType
import net.corda.core.node.services.TimeWindowChecker
import net.corda.core.node.services.UniquenessProvider
import net.corda.flows.NotaryFlow
import net.corda.node.services.api.ServiceHubInternal
/** A Notary service that validates the transaction chain of the submitted transaction before committing it */
class ValidatingNotaryService(val timeWindowChecker: TimeWindowChecker,
val uniquenessProvider: UniquenessProvider) : NotaryService {
class ValidatingNotaryService(override val services: ServiceHubInternal) : TrustedAuthorityNotaryService() {
companion object {
val type = ServiceType.notary.getSubType("validating")
}
override val serviceFlowFactory: (Party, Int) -> FlowLogic<Void?> = { otherParty, _ ->
ValidatingNotaryFlow(otherParty, timeWindowChecker, uniquenessProvider)
override val timeWindowChecker = TimeWindowChecker(services.clock)
override val uniquenessProvider = PersistentUniquenessProvider()
override fun createServiceFlow(otherParty: Party, platformVersion: Int): NotaryFlow.Service {
return ValidatingNotaryFlow(otherParty, this)
}
}
override fun start() {}
override fun stop() {}
}

View File

@ -10,6 +10,7 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.serialization.NodeClock
import net.corda.node.services.api.*
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.FlowStateMachineImpl
@ -18,6 +19,7 @@ import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.testing.MOCK_IDENTITY_SERVICE
import net.corda.testing.node.MockNetworkMapCache
import net.corda.testing.node.MockStorageService
import org.jetbrains.exposed.sql.Database
import java.time.Clock
open class MockServiceHubInternal(
@ -55,7 +57,10 @@ open class MockServiceHubInternal(
get() = overrideClock ?: throw UnsupportedOperationException()
override val myInfo: NodeInfo
get() = throw UnsupportedOperationException()
override val database: Database
get() = throw UnsupportedOperationException()
override val configuration: NodeConfiguration
get() = throw UnsupportedOperationException()
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
override val rpcFlows: List<Class<out FlowLogic<*>>>
get() = throw UnsupportedOperationException()

View File

@ -1,36 +0,0 @@
package net.corda.node.services.transactions
import net.corda.core.crypto.SecureHash
import net.corda.core.node.services.UniquenessException
import net.corda.testing.MEGA_CORP
import net.corda.testing.generateStateRef
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class InMemoryUniquenessProviderTests {
val identity = MEGA_CORP
val txID = SecureHash.randomSHA256()
@Test fun `should commit a transaction with unused inputs without exception`() {
val provider = InMemoryUniquenessProvider()
val inputState = generateStateRef()
provider.commit(listOf(inputState), txID, identity)
}
@Test fun `should report a conflict for a transaction with previously used inputs`() {
val provider = InMemoryUniquenessProvider()
val inputState = generateStateRef()
val inputs = listOf(inputState)
provider.commit(inputs, txID, identity)
val ex = assertFailsWith<UniquenessException> { provider.commit(inputs, txID, identity) }
val consumingTx = ex.error.stateHistory[inputState]!!
assertEquals(consumingTx.id, txID)
assertEquals(consumingTx.inputIndex, inputs.indexOf(inputState))
assertEquals(consumingTx.requestingParty, identity)
}
}

View File

@ -30,7 +30,6 @@ import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.network.InMemoryNetworkMapService
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.services.transactions.InMemoryUniquenessProvider
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.node.services.vault.NodeVaultService
@ -225,8 +224,6 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
// There is no need to slow down the unit tests by initialising CityDatabase
override fun findMyLocation(): WorldMapLocation? = null
override fun makeUniquenessProvider(type: ServiceType): UniquenessProvider = InMemoryUniquenessProvider()
override fun makeTransactionVerifierService() = InMemoryTransactionVerifierService(1)
override fun myAddresses(): List<HostAndPort> = listOf(HostAndPort.fromHost("mockHost"))