mirror of
https://github.com/corda/corda.git
synced 2025-06-10 11:21:45 +00:00
Added a persistent uniqueness provider, backed by a JDBCHashMap.
Enabled a single node persistent notary.
This commit is contained in:
parent
8cad9efd27
commit
0ed6a0ef4d
@ -40,7 +40,6 @@ import com.r3corda.node.services.persistence.PerFileCheckpointStorage
|
|||||||
import com.r3corda.node.services.persistence.PerFileTransactionStorage
|
import com.r3corda.node.services.persistence.PerFileTransactionStorage
|
||||||
import com.r3corda.node.services.persistence.StorageServiceImpl
|
import com.r3corda.node.services.persistence.StorageServiceImpl
|
||||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||||
import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
|
|
||||||
import com.r3corda.node.services.transactions.NotaryService
|
import com.r3corda.node.services.transactions.NotaryService
|
||||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||||
import com.r3corda.node.services.transactions.ValidatingNotaryService
|
import com.r3corda.node.services.transactions.ValidatingNotaryService
|
||||||
@ -128,8 +127,9 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
|||||||
lateinit var wallet: WalletService
|
lateinit var wallet: WalletService
|
||||||
lateinit var keyManagement: E2ETestKeyManagementService
|
lateinit var keyManagement: E2ETestKeyManagementService
|
||||||
var inNodeNetworkMapService: NetworkMapService? = null
|
var inNodeNetworkMapService: NetworkMapService? = null
|
||||||
var inNodeNotaryService: NotaryService? = null
|
|
||||||
var inNodeWalletMonitorService: WalletMonitorService? = null
|
var inNodeWalletMonitorService: WalletMonitorService? = null
|
||||||
|
var inNodeNotaryService: NotaryService? = null
|
||||||
|
var uniquenessProvider: UniquenessProvider? = null
|
||||||
lateinit var identity: IdentityService
|
lateinit var identity: IdentityService
|
||||||
lateinit var net: MessagingServiceInternal
|
lateinit var net: MessagingServiceInternal
|
||||||
lateinit var netMapCache: NetworkMapCache
|
lateinit var netMapCache: NetworkMapCache
|
||||||
@ -187,6 +187,13 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
|||||||
customServices.clear()
|
customServices.clear()
|
||||||
customServices.addAll(buildPluginServices(tokenizableServices))
|
customServices.addAll(buildPluginServices(tokenizableServices))
|
||||||
|
|
||||||
|
// TODO: uniquenessProvider creation should be inside makeNotaryService(), but notary service initialisation
|
||||||
|
// depends on smm, while smm depends on tokenizableServices, which uniquenessProvider is part of
|
||||||
|
advertisedServices.singleOrNull { it.isSubTypeOf(NotaryService.Type) }?.let {
|
||||||
|
uniquenessProvider = makeUniquenessProvider()
|
||||||
|
tokenizableServices.add(uniquenessProvider!!)
|
||||||
|
}
|
||||||
|
|
||||||
smm = StateMachineManager(services,
|
smm = StateMachineManager(services,
|
||||||
listOf(tokenizableServices),
|
listOf(tokenizableServices),
|
||||||
checkpointStorage,
|
checkpointStorage,
|
||||||
@ -326,7 +333,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
|||||||
}
|
}
|
||||||
|
|
||||||
open protected fun makeNotaryService(type: ServiceType): NotaryService {
|
open protected fun makeNotaryService(type: ServiceType): NotaryService {
|
||||||
val uniquenessProvider = InMemoryUniquenessProvider()
|
val uniquenessProvider = makeUniquenessProvider()
|
||||||
val timestampChecker = TimestampChecker(platformClock, 30.seconds)
|
val timestampChecker = TimestampChecker(platformClock, 30.seconds)
|
||||||
|
|
||||||
return when (type) {
|
return when (type) {
|
||||||
@ -338,6 +345,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected abstract fun makeUniquenessProvider(): UniquenessProvider
|
||||||
|
|
||||||
protected open fun makeIdentityService(): IdentityService {
|
protected open fun makeIdentityService(): IdentityService {
|
||||||
val service = InMemoryIdentityService()
|
val service = InMemoryIdentityService()
|
||||||
|
|
||||||
|
@ -3,7 +3,6 @@ package com.r3corda.node.internal
|
|||||||
import com.codahale.metrics.JmxReporter
|
import com.codahale.metrics.JmxReporter
|
||||||
import com.google.common.net.HostAndPort
|
import com.google.common.net.HostAndPort
|
||||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||||
import com.r3corda.core.node.NodeInfo
|
|
||||||
import com.r3corda.core.node.ServiceHub
|
import com.r3corda.core.node.ServiceHub
|
||||||
import com.r3corda.core.node.services.ServiceType
|
import com.r3corda.core.node.services.ServiceType
|
||||||
import com.r3corda.core.utilities.loggerFor
|
import com.r3corda.core.utilities.loggerFor
|
||||||
@ -13,6 +12,7 @@ import com.r3corda.node.services.config.FullNodeConfiguration
|
|||||||
import com.r3corda.node.services.config.NodeConfiguration
|
import com.r3corda.node.services.config.NodeConfiguration
|
||||||
import com.r3corda.node.services.messaging.ArtemisMessagingClient
|
import com.r3corda.node.services.messaging.ArtemisMessagingClient
|
||||||
import com.r3corda.node.services.messaging.ArtemisMessagingServer
|
import com.r3corda.node.services.messaging.ArtemisMessagingServer
|
||||||
|
import com.r3corda.node.services.transactions.PersistentUniquenessProvider
|
||||||
import com.r3corda.node.servlets.AttachmentDownloadServlet
|
import com.r3corda.node.servlets.AttachmentDownloadServlet
|
||||||
import com.r3corda.node.servlets.Config
|
import com.r3corda.node.servlets.Config
|
||||||
import com.r3corda.node.servlets.DataUploadServlet
|
import com.r3corda.node.servlets.DataUploadServlet
|
||||||
@ -234,6 +234,8 @@ class Node(dir: Path, val p2pAddr: HostAndPort, val webServerAddr: HostAndPort,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun makeUniquenessProvider() = PersistentUniquenessProvider()
|
||||||
|
|
||||||
override fun start(): Node {
|
override fun start(): Node {
|
||||||
alreadyRunningNodeCheck()
|
alreadyRunningNodeCheck()
|
||||||
super.start()
|
super.start()
|
||||||
|
@ -0,0 +1,52 @@
|
|||||||
|
package com.r3corda.node.services.transactions
|
||||||
|
|
||||||
|
import com.r3corda.core.ThreadBox
|
||||||
|
import com.r3corda.core.contracts.StateRef
|
||||||
|
import com.r3corda.core.crypto.Party
|
||||||
|
import com.r3corda.core.crypto.SecureHash
|
||||||
|
import com.r3corda.core.node.services.UniquenessException
|
||||||
|
import com.r3corda.core.node.services.UniquenessProvider
|
||||||
|
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||||
|
import com.r3corda.node.utilities.JDBCHashMap
|
||||||
|
import com.r3corda.node.utilities.databaseTransaction
|
||||||
|
import java.util.*
|
||||||
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
|
|
||||||
|
/** A a RDBMS backed Uniqueness provider */
|
||||||
|
@ThreadSafe
|
||||||
|
class PersistentUniquenessProvider() : UniquenessProvider, SingletonSerializeAsToken() {
|
||||||
|
companion object {
|
||||||
|
private val TABLE_NAME = "notary_commit_log"
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For each input state store the consuming transaction information.
|
||||||
|
* TODO: remove databaseTransaction here once node initialisation is wrapped in it
|
||||||
|
*/
|
||||||
|
val committedStates = ThreadBox(databaseTransaction {
|
||||||
|
JDBCHashMap<StateRef, UniquenessProvider.ConsumingTx>(TABLE_NAME, loadOnInit = false)
|
||||||
|
})
|
||||||
|
|
||||||
|
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party) {
|
||||||
|
val conflict = committedStates.locked {
|
||||||
|
// TODO: remove databaseTransaction here once protocols are wrapped in it
|
||||||
|
databaseTransaction {
|
||||||
|
val conflictingStates = LinkedHashMap<StateRef, UniquenessProvider.ConsumingTx>()
|
||||||
|
for (inputState in states) {
|
||||||
|
val consumingTx = get(inputState)
|
||||||
|
if (consumingTx != null) conflictingStates[inputState] = consumingTx
|
||||||
|
}
|
||||||
|
if (conflictingStates.isNotEmpty()) {
|
||||||
|
UniquenessProvider.Conflict(conflictingStates)
|
||||||
|
} else {
|
||||||
|
states.forEachIndexed { i, stateRef ->
|
||||||
|
put(stateRef, UniquenessProvider.ConsumingTx(txId, i, callerIdentity))
|
||||||
|
}
|
||||||
|
null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (conflict != null) throw UniquenessException(conflict)
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,57 @@
|
|||||||
|
package com.r3corda.node.services
|
||||||
|
|
||||||
|
import com.r3corda.core.crypto.SecureHash
|
||||||
|
import com.r3corda.core.node.services.UniquenessException
|
||||||
|
import com.r3corda.core.utilities.LogHelper
|
||||||
|
import com.r3corda.node.services.transactions.PersistentUniquenessProvider
|
||||||
|
import com.r3corda.node.utilities.configureDatabase
|
||||||
|
import com.r3corda.testing.MEGA_CORP
|
||||||
|
import com.r3corda.testing.generateStateRef
|
||||||
|
import com.r3corda.testing.node.makeTestDataSourceProperties
|
||||||
|
import org.junit.After
|
||||||
|
import org.junit.Before
|
||||||
|
import org.junit.Test
|
||||||
|
import java.io.Closeable
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
import kotlin.test.assertFailsWith
|
||||||
|
|
||||||
|
class PersistentUniquenessProviderTests {
|
||||||
|
val identity = MEGA_CORP
|
||||||
|
val txID = SecureHash.randomSHA256()
|
||||||
|
|
||||||
|
lateinit var dataSource: Closeable
|
||||||
|
|
||||||
|
@Before
|
||||||
|
fun setUp() {
|
||||||
|
LogHelper.setLevel(PersistentUniquenessProvider::class)
|
||||||
|
dataSource = configureDatabase(makeTestDataSourceProperties()).first
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
fun tearDown() {
|
||||||
|
dataSource.close()
|
||||||
|
LogHelper.reset(PersistentUniquenessProvider::class)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test fun `should commit a transaction with unused inputs without exception`() {
|
||||||
|
val provider = PersistentUniquenessProvider()
|
||||||
|
val inputState = generateStateRef()
|
||||||
|
|
||||||
|
provider.commit(listOf(inputState), txID, identity)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test fun `should report a conflict for a transaction with previously used inputs`() {
|
||||||
|
val provider = PersistentUniquenessProvider()
|
||||||
|
val inputState = generateStateRef()
|
||||||
|
|
||||||
|
val inputs = listOf(inputState)
|
||||||
|
provider.commit(inputs, txID, identity)
|
||||||
|
|
||||||
|
val ex = assertFailsWith<UniquenessException> { provider.commit(inputs, txID, identity) }
|
||||||
|
|
||||||
|
val consumingTx = ex.error.stateHistory[inputState]!!
|
||||||
|
assertEquals(consumingTx.id, txID)
|
||||||
|
assertEquals(consumingTx.inputIndex, inputs.indexOf(inputState))
|
||||||
|
assertEquals(consumingTx.requestingParty, identity)
|
||||||
|
}
|
||||||
|
}
|
@ -4,7 +4,6 @@ import com.google.common.jimfs.Jimfs
|
|||||||
import com.google.common.util.concurrent.Futures
|
import com.google.common.util.concurrent.Futures
|
||||||
import com.r3corda.core.crypto.Party
|
import com.r3corda.core.crypto.Party
|
||||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||||
import com.r3corda.core.node.NodeInfo
|
|
||||||
import com.r3corda.core.node.PhysicalLocation
|
import com.r3corda.core.node.PhysicalLocation
|
||||||
import com.r3corda.core.node.services.ServiceType
|
import com.r3corda.core.node.services.ServiceType
|
||||||
import com.r3corda.core.node.services.WalletService
|
import com.r3corda.core.node.services.WalletService
|
||||||
@ -12,6 +11,7 @@ import com.r3corda.core.testing.InMemoryWalletService
|
|||||||
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
|
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
|
||||||
import com.r3corda.core.utilities.loggerFor
|
import com.r3corda.core.utilities.loggerFor
|
||||||
import com.r3corda.node.services.config.NodeConfiguration
|
import com.r3corda.node.services.config.NodeConfiguration
|
||||||
|
import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
import java.nio.file.Files
|
import java.nio.file.Files
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
@ -93,6 +93,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
|||||||
// There is no need to slow down the unit tests by initialising CityDatabase
|
// There is no need to slow down the unit tests by initialising CityDatabase
|
||||||
override fun findMyLocation(): PhysicalLocation? = null
|
override fun findMyLocation(): PhysicalLocation? = null
|
||||||
|
|
||||||
|
override fun makeUniquenessProvider() = InMemoryUniquenessProvider()
|
||||||
|
|
||||||
override fun start(): MockNode {
|
override fun start(): MockNode {
|
||||||
super.start()
|
super.start()
|
||||||
mockNet.identities.add(storage.myLegalIdentity)
|
mockNet.identities.add(storage.myLegalIdentity)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user