[CORDA-1341]: Ensure API can be called concurrently wrt transactions. (#3235)

This commit is contained in:
Michele Sollecito
2018-05-29 15:25:34 +01:00
committed by GitHub
parent 0f82e2df7f
commit f68cf6f712
47 changed files with 852 additions and 849 deletions

View File

@ -252,26 +252,6 @@ The network must then be manually run before retrieving the future's value:
Accessing ``StartedMockNode`` internals
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Creating a node database transaction
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Whenever you query a node's database (e.g. to extract information from the node's vault), you must wrap the query in
a database transaction, as follows:
.. container:: codeset
.. sourcecode:: kotlin
nodeA.database.transaction {
// Perform query here.
}
.. sourcecode:: java
node.getDatabase().transaction(tx -> {
// Perform query here.
}
Querying a node's vault
~~~~~~~~~~~~~~~~~~~~~~~
@ -281,15 +261,11 @@ Recorded states can be retrieved from the vault of a ``StartedMockNode`` using:
.. sourcecode:: kotlin
nodeA.database.transaction {
val myStates = nodeA.services.vaultService.queryBy<MyStateType>().states
}
.. sourcecode:: java
node.getDatabase().transaction(tx -> {
List<MyStateType> myStates = node.getServices().getVaultService().queryBy(MyStateType.class).getStates();
}
This allows you to check whether a given state has (or has not) been stored, and whether it has the correct attributes.

View File

@ -7,6 +7,8 @@ release, see :doc:`upgrade-notes`.
Unreleased
==========
* ``ServiceHub`` and ``CordaRPCOps`` can now safely be used from multiple threads without incurring in database transaction problems.
* Doorman and NetworkMap url's can now be configured individually rather than being assumed to be
the same server. Current ``compatibilityZoneURL`` configurations remain valid. See both :doc:`corda-configuration-file`
and :doc:`permissioning` for details.

View File

@ -42,7 +42,7 @@ enum class TransactionIsolationLevel {
val jdbcValue: Int = java.sql.Connection::class.java.getField("TRANSACTION_$name").get(null) as Int
}
private val _contextDatabase = ThreadLocal<CordaPersistence>()
private val _contextDatabase = InheritableThreadLocal<CordaPersistence>()
var contextDatabase: CordaPersistence
get() = _contextDatabase.get() ?: error("Was expecting to find CordaPersistence set on current thread: ${Strand.currentStrand()}")
set(database) = _contextDatabase.set(database)

View File

@ -6,7 +6,11 @@ import net.corda.core.contracts.AlwaysAcceptAttachmentConstraint
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.*
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.crypto.sha256
import net.corda.core.flows.NotaryError
import net.corda.core.flows.NotaryException
import net.corda.core.flows.NotaryFlow
@ -38,12 +42,26 @@ import net.corda.testing.node.internal.InternalMockNetwork.MockNode
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.startFlow
import org.hamcrest.Matchers.instanceOf
import org.junit.*
import org.junit.AfterClass
import org.junit.Assert.assertThat
import org.junit.BeforeClass
import org.junit.Test
import java.nio.file.Paths
import java.time.Duration
import java.time.Instant
import java.util.concurrent.ExecutionException
import kotlin.collections.List
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.distinct
import kotlin.collections.forEach
import kotlin.collections.last
import kotlin.collections.listOf
import kotlin.collections.map
import kotlin.collections.mapIndexedNotNull
import kotlin.collections.plus
import kotlin.collections.single
import kotlin.collections.zip
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@ -106,9 +124,7 @@ class BFTNotaryServiceTests {
val issueTx = signInitialTransaction(notary) {
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
}
database.transaction {
services.recordTransactions(issueTx)
}
val spendTxs = (1..10).map {
signInitialTransaction(notary) {
addInputState(issueTx.tx.outRef<ContractState>(0))
@ -150,9 +166,7 @@ class BFTNotaryServiceTests {
val issueTx = signInitialTransaction(notary) {
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
}
database.transaction {
services.recordTransactions(issueTx)
}
val spendTx = signInitialTransaction(notary) {
addInputState(issueTx.tx.outRef<ContractState>(0))
setTimeWindow(TimeWindow.fromOnly(Instant.MAX))
@ -188,9 +202,7 @@ class BFTNotaryServiceTests {
val issueTx = signInitialTransaction(notary) {
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
}
database.transaction {
services.recordTransactions(issueTx)
}
val spendTx = signInitialTransaction(notary) {
addInputState(issueTx.tx.outRef<ContractState>(0))
setTimeWindow(TimeWindow.untilOnly(Instant.now() + Duration.ofHours(1)))

View File

@ -11,14 +11,13 @@ import net.corda.core.internal.concurrent.map
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.contracts.DummyContract
import net.corda.testing.driver.driver
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.dummyCommand
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.InProcess
import net.corda.testing.driver.internal.InProcessImpl
import net.corda.testing.driver.driver
import net.corda.testing.node.ClusterSpec
import net.corda.testing.node.NotarySpec
import org.junit.Test
@ -70,22 +69,18 @@ class RaftNotaryServiceTests {
notarySpecs = listOf(NotarySpec(notaryName, cluster = ClusterSpec.Raft(clusterSize = 3)))
)) {
val bankA = startNode(providedName = DUMMY_BANK_A_NAME).map { (it as InProcess) }.getOrThrow()
val issueTx = (bankA as InProcessImpl).database.transaction {
val builder = DummyContract.generateInitial(Random().nextInt(), defaultNotaryIdentity, bankA.services.myInfo.singleIdentity().ref(0))
.setTimeWindow(bankA.services.clock.instant(), 30.seconds)
bankA.services.signInitialTransaction(builder)
}
val issueTx = bankA.services.signInitialTransaction(builder)
bankA.startFlow(NotaryFlow.Client(issueTx)).getOrThrow()
}
}
private fun issueState(nodeHandle: InProcess, notary: Party): StateAndRef<*> {
return (nodeHandle as InProcessImpl).database.transaction {
val builder = DummyContract.generateInitial(Random().nextInt(), notary, nodeHandle.services.myInfo.singleIdentity().ref(0))
val stx = nodeHandle.services.signInitialTransaction(builder)
nodeHandle.services.recordTransactions(stx)
StateAndRef(builder.outputStates().first(), StateRef(stx.id, 0))
}
return StateAndRef(builder.outputStates().first(), StateRef(stx.id, 0))
}
}

View File

@ -7,7 +7,13 @@ import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.Node
import net.corda.node.internal.StartedNode
import net.corda.testing.core.*
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.TestIdentity
import net.corda.testing.core.getTestPartyAndCertificate
import net.corda.testing.core.singleIdentity
import net.corda.testing.node.internal.NodeBasedTest
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
@ -41,22 +47,19 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
fun `unknown legal name`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netMapCache = alice.services.networkMapCache
alice.database.transaction {
assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).isEmpty()
assertThat(netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME)).isNull()
assertThat(netMapCache.getPeerByLegalName(DUMMY_NOTARY_NAME)).isNull()
assertThat(netMapCache.getPeerCertificateByLegalName(DUMMY_NOTARY_NAME)).isNull()
}
}
@Test
fun `nodes in distributed service`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netMapCache = alice.services.networkMapCache
val distServiceNodeInfos = alice.database.transaction {
val distributedIdentity = TestIdentity(DUMMY_NOTARY_NAME).identity
(1..2).map {
val distServiceNodeInfos = (1..2).map {
val nodeInfo = NodeInfo(
addresses = listOf(NetworkHostAndPort("localhost", 1000 + it)),
legalIdentitiesAndCerts = listOf(TestIdentity.fresh("Org-$it").identity, distributedIdentity),
@ -66,37 +69,30 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
netMapCache.addNode(nodeInfo)
nodeInfo
}
}
alice.database.transaction {
assertThat(netMapCache.getNodesByLegalName(DUMMY_NOTARY_NAME)).containsOnlyElementsOf(distServiceNodeInfos)
assertThatExceptionOfType(IllegalArgumentException::class.java)
.isThrownBy { netMapCache.getNodeByLegalName(DUMMY_NOTARY_NAME) }
.withMessageContaining(DUMMY_NOTARY_NAME.toString())
}
}
@Test
fun `get nodes by owning key and by name`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netCache = alice.services.networkMapCache
alice.database.transaction {
val res = netCache.getNodeByLegalIdentity(alice.info.singleIdentity())
assertEquals(alice.info, res)
val res2 = netCache.getNodeByLegalName(DUMMY_REGULATOR.name)
assertEquals(infos.singleOrNull { DUMMY_REGULATOR.name in it.legalIdentities.map { it.name } }, res2)
}
}
@Test
fun `get nodes by address`() {
val alice = startNodesWithPort(listOf(ALICE))[0]
val netCache = alice.services.networkMapCache
alice.database.transaction {
val res = netCache.getNodeByAddress(alice.info.addresses[0])
assertEquals(alice.info, res)
}
}
// This test has to be done as normal node not mock, because MockNodes don't have addresses.
@Test
@ -105,9 +101,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
val charliePartyCert = getTestPartyAndCertificate(CHARLIE_NAME, generateKeyPair().public)
val aliceCache = aliceNode.services.networkMapCache
aliceCache.addNode(aliceNode.info.copy(legalIdentitiesAndCerts = listOf(charliePartyCert)))
val res = aliceNode.database.transaction {
aliceCache.allNodes.filter { aliceNode.info.addresses[0] in it.addresses }
}
val res = aliceCache.allNodes.filter { aliceNode.info.addresses[0] in it.addresses }
assertEquals(2, res.size)
}

View File

@ -20,6 +20,7 @@ import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.NotaryChangeFlow
import net.corda.core.flows.NotaryFlow
import net.corda.core.flows.StartableByService
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
@ -41,7 +42,6 @@ import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.ServicesForResolution
import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.node.services.CordaService
import net.corda.core.node.services.IdentityService
@ -51,7 +51,6 @@ import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.days
import net.corda.core.utilities.debug
@ -160,6 +159,7 @@ import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.atomic.AtomicReference
import kotlin.collections.set
import kotlin.reflect.KClass
import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
@ -233,9 +233,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
@Volatile private var _started: StartedNode<AbstractNode>? = null
/** The implementation of the [CordaRPCOps] interface used by this node. */
open fun makeRPCOps(flowStarter: FlowStarter, database: CordaPersistence, smm: StateMachineManager): CordaRPCOps {
open fun makeRPCOps(flowStarter: FlowStarter, smm: StateMachineManager): CordaRPCOps {
val ops: CordaRPCOps = CordaRPCOpsImpl(services, smm, database, flowStarter, { shutdownExecutor.submit { stop() } })
val ops: CordaRPCOps = CordaRPCOpsImpl(services, smm, flowStarter, { shutdownExecutor.submit { stop() } })
// Mind that order is relevant here.
val proxies = listOf<(CordaRPCOps) -> CordaRPCOps>(::AuthenticatedRpcOpsProxy, { it -> ExceptionSerialisingRpcOpsProxy(it, true) })
return proxies.fold(ops) { delegate, decorate -> decorate(delegate) }
@ -257,7 +257,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
initCertificate()
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)).use {
// Wrapped in an atomic reference just to allow setting it before the closure below gets invoked.
val identityServiceRef = AtomicReference<IdentityService>()
val database = initialiseDatabasePersistence(schemaService, { name -> identityServiceRef.get().wellKnownPartyFromX500Name(name) }, { party -> identityServiceRef.get().wellKnownPartyFromAnonymous(party) })
val identityService = makeIdentityService(identity.certificate, database)
identityServiceRef.set(identityService)
return database.use {
it.transaction {
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks
// like a design smell.
@ -280,8 +285,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
initialiseJVMAgents()
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null)
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
val identityService = makeIdentityService(identity.certificate)
// Wrapped in an atomic reference just to allow setting it before the closure below gets invoked.
val identityServiceRef = AtomicReference<IdentityService>()
// Do all of this in a database transaction so anything that might need a connection has one.
val database = initialiseDatabasePersistence(
schemaService,
{ name -> identityServiceRef.get().wellKnownPartyFromX500Name(name) },
{ party -> identityServiceRef.get().wellKnownPartyFromAnonymous(party) })
val identityService = makeIdentityService(identity.certificate, database).also(identityServiceRef::set)
networkMapClient = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL, identityService.trustRoot) }
val networkParameteresReader = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory)
val networkParameters = networkParameteresReader.networkParameters
@ -289,15 +302,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
"Node's platform version is lower than network's required minimumPlatformVersion"
}
// Do all of this in a database transaction so anything that might need a connection has one.
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService).transaction {
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService)
val (startedImpl, schedulerService) = database.transaction {
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService, database)
val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair)
identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
val metrics = MetricRegistry()
val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes)
log.debug("Transaction storage created")
attachments = NodeAttachmentService(metrics, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound)
attachments = NodeAttachmentService(metrics, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound, database)
log.debug("Attachment service created")
val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(), attachments, networkParameters.whitelistedContractImplementations)
log.debug("Cordapp provider created")
@ -341,7 +353,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
makeVaultObservers(schedulerService, database.hibernateConfig, schemaService, flowLogicRefFactory)
val rpcOps = makeRPCOps(flowStarter, database, smm)
val rpcOps = makeRPCOps(flowStarter, smm)
startMessagingService(rpcOps)
installCoreFlows()
val cordaServices = installCordaServices(flowStarter)
@ -705,7 +717,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
networkParameters: NetworkParameters): MutableList<Any> {
checkpointStorage = DBCheckpointStorage()
val keyManagementService = makeKeyManagementService(identityService, keyPairs)
val keyManagementService = makeKeyManagementService(identityService, keyPairs, database)
_services = ServiceHubInternalImpl(
identityService,
keyManagementService,
@ -727,7 +739,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
services, cordappProvider, this)
}
protected open fun makeTransactionStorage(database: CordaPersistence, transactionCacheSizeBytes: Long): WritableTransactionStorage = DBTransactionStorage(transactionCacheSizeBytes)
protected open fun makeTransactionStorage(database: CordaPersistence, transactionCacheSizeBytes: Long): WritableTransactionStorage = DBTransactionStorage(transactionCacheSizeBytes, database)
private fun makeVaultObservers(schedulerService: SchedulerService, hibernateConfig: HibernateConfiguration, schemaService: SchemaService, flowLogicRefFactory: FlowLogicRefFactory) {
ScheduledActivityObserver.install(services.vaultService, schedulerService, flowLogicRefFactory)
HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig, schemaService)
@ -768,10 +780,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// Specific class so that MockNode can catch it.
class DatabaseConfigurationException(msg: String) : CordaException(msg)
protected open fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService): CordaPersistence {
protected open fun initialiseDatabasePersistence(schemaService: SchemaService,
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?): CordaPersistence {
val props = configuration.dataSourceProperties
if (props.isEmpty) throw DatabaseConfigurationException("There must be a database configured.")
val database = configureDatabase(props, configuration.database, identityService, schemaService)
val database = configureDatabase(props, configuration.database, wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous, schemaService)
// Now log the vendor string as this will also cause a connection to be tested eagerly.
logVendorString(database, log)
runOnStop += database::close
@ -797,8 +811,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
protected open fun makeKeyManagementService(identityService: IdentityService, keyPairs: Set<KeyPair>): KeyManagementService {
return PersistentKeyManagementService(identityService, keyPairs)
protected open fun makeKeyManagementService(identityService: IdentityService, keyPairs: Set<KeyPair>, database: CordaPersistence): KeyManagementService {
return PersistentKeyManagementService(identityService, keyPairs, database)
}
private fun makeCoreNotaryService(notaryConfig: NotaryConfig, database: CordaPersistence): NotaryService {
@ -826,10 +840,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
}
private fun makeIdentityService(identityCert: X509Certificate): PersistentIdentityService {
private fun makeIdentityService(identityCert: X509Certificate, database: CordaPersistence): PersistentIdentityService {
val trustRoot = configuration.loadTrustStore().getCertificate(X509Utilities.CORDA_ROOT_CA)
val nodeCa = configuration.loadNodeKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_CA)
return PersistentIdentityService(trustRoot, listOf(identityCert, nodeCa))
return PersistentIdentityService(trustRoot, database, listOf(identityCert, nodeCa))
}
protected abstract fun makeTransactionVerifierService(): TransactionVerifierService
@ -907,8 +921,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
}
protected open fun generateKeyPair() = cryptoGenerateKeyPair()
protected open fun makeVaultService(keyManagementService: KeyManagementService, services: ServicesForResolution, hibernateConfig: HibernateConfiguration): VaultServiceInternal {
return NodeVaultService(platformClock, keyManagementService, services, hibernateConfig)
protected open fun makeVaultService(keyManagementService: KeyManagementService, services: ServicesForResolution, hibernateConfig: HibernateConfiguration, database: CordaPersistence): VaultServiceInternal {
return NodeVaultService(platformClock, keyManagementService, services, hibernateConfig, database)
}
/** Load configured JVM agents */
@ -945,10 +959,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
private val servicesForResolution: ServicesForResolution
) : SingletonSerializeAsToken(), ServiceHubInternal, ServicesForResolution by servicesForResolution {
override val rpcFlows = ArrayList<Class<out FlowLogic<*>>>()
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage()
override val stateMachineRecordedTransactionMapping = DBTransactionMappingStorage(database)
override val auditService = DummyAuditService()
override val transactionVerifierService by lazy { makeTransactionVerifierService() }
override val vaultService by lazy { makeVaultService(keyManagementService, servicesForResolution, database.hibernateConfig) }
override val vaultService by lazy { makeVaultService(keyManagementService, servicesForResolution, database.hibernateConfig, database) }
override val contractUpgradeService by lazy { ContractUpgradeServiceImpl() }
override val attachments: AttachmentStorage get() = this@AbstractNode.attachments
override val networkService: MessagingService get() = network
@ -964,12 +978,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
return flowFactories[initiatingFlowClass]
}
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
database.transaction {
super.recordTransactions(statesToRecord, txs)
}
}
override fun jdbcSession(): Connection = database.createSession()
// allows services to register handlers to be informed when the node stop method is called
@ -1039,14 +1047,15 @@ internal class NetworkMapCacheEmptyException : Exception()
fun configureDatabase(hikariProperties: Properties,
databaseConfig: DatabaseConfig,
identityService: IdentityService,
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?,
schemaService: SchemaService = NodeSchemaService()): CordaPersistence {
// Register the AbstractPartyDescriptor so Hibernate doesn't warn when encountering AbstractParty. Unfortunately
// Hibernate warns about not being able to find a descriptor if we don't provide one, but won't use it by default
// so we end up providing both descriptor and converter. We should re-examine this in later versions to see if
// either Hibernate can be convinced to stop warning, use the descriptor by default, or something else.
JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(identityService))
JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
val dataSource = DataSourceFactory.createDataSource(hikariProperties)
val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(identityService))
val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
return CordaPersistence(dataSource, databaseConfig, schemaService.schemaOptions.keys, attributeConverters)
}

View File

@ -17,12 +17,26 @@ import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.RPC_UPLOADER
import net.corda.core.internal.STRUCTURAL_STEP_PREFIX
import net.corda.core.internal.sign
import net.corda.core.messaging.*
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowHandleImpl
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.FlowProgressHandleImpl
import net.corda.core.messaging.ParametersUpdateInfo
import net.corda.core.messaging.RPCReturnsObservables
import net.corda.core.messaging.StateMachineInfo
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.*
import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.node.services.vault.PageSpecification
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.serialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.getOrThrow
@ -32,7 +46,6 @@ import net.corda.node.services.messaging.context
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.nodeapi.exceptions.NonRpcFlowException
import net.corda.nodeapi.exceptions.RejectedCommandException
import net.corda.nodeapi.internal.persistence.CordaPersistence
import rx.Observable
import java.io.InputStream
import java.security.PublicKey
@ -45,7 +58,6 @@ import java.time.Instant
internal class CordaRPCOpsImpl(
private val services: ServiceHubInternal,
private val smm: StateMachineManager,
private val database: CordaPersistence,
private val flowStarter: FlowStarter,
private val shutdownNode: () -> Unit
) : CordaRPCOps {
@ -68,9 +80,7 @@ internal class CordaRPCOpsImpl(
}
override fun networkMapFeed(): DataFeed<List<NodeInfo>, NetworkMapCache.MapChange> {
return database.transaction {
services.networkMapCache.track()
}
return services.networkMapCache.track()
}
override fun <T : ContractState> vaultQueryBy(criteria: QueryCriteria,
@ -78,9 +88,7 @@ internal class CordaRPCOpsImpl(
sorting: Sort,
contractStateType: Class<out T>): Vault.Page<T> {
contractStateType.checkIsA<ContractState>()
return database.transaction {
services.vaultService._queryBy(criteria, paging, sorting, contractStateType)
}
return services.vaultService._queryBy(criteria, paging, sorting, contractStateType)
}
@RPCReturnsObservables
@ -89,9 +97,7 @@ internal class CordaRPCOpsImpl(
sorting: Sort,
contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
contractStateType.checkIsA<ContractState>()
return database.transaction {
services.vaultService._trackBy(criteria, paging, sorting, contractStateType)
}
return services.vaultService._trackBy(criteria, paging, sorting, contractStateType)
}
@Suppress("OverridingDeprecatedMember")
@ -103,9 +109,7 @@ internal class CordaRPCOpsImpl(
@Suppress("OverridingDeprecatedMember")
override fun internalVerifiedTransactionsFeed(): DataFeed<List<SignedTransaction>, SignedTransaction> {
return database.transaction {
services.validatedTransactions.track()
}
return services.validatedTransactions.track()
}
override fun stateMachinesSnapshot(): List<StateMachineInfo> {
@ -117,14 +121,12 @@ internal class CordaRPCOpsImpl(
override fun killFlow(id: StateMachineRunId) = smm.killFlow(id)
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
return database.transaction {
val (allStateMachines, changes) = smm.track()
DataFeed(
return DataFeed(
allStateMachines.map { stateMachineInfoFromFlowLogic(it) },
changes.map { stateMachineUpdateFromStateMachineChange(it) }
)
}
}
override fun stateMachineRecordedTransactionMappingSnapshot(): List<StateMachineTransactionMapping> {
val (snapshot, updates) = stateMachineRecordedTransactionMappingFeed()
@ -133,9 +135,7 @@ internal class CordaRPCOpsImpl(
}
override fun stateMachineRecordedTransactionMappingFeed(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> {
return database.transaction {
services.stateMachineRecordedTransactionMapping.track()
}
return services.stateMachineRecordedTransactionMapping.track()
}
override fun nodeInfo(): NodeInfo {
@ -147,15 +147,11 @@ internal class CordaRPCOpsImpl(
}
override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) {
return database.transaction {
services.vaultService.addNoteToTransaction(txnId, txnNote)
}
}
override fun getVaultTransactionNotes(txnId: SecureHash): Iterable<String> {
return database.transaction {
services.vaultService.getTransactionNotes(txnId)
}
return services.vaultService.getTransactionNotes(txnId)
}
override fun <T> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> {
@ -183,38 +179,23 @@ internal class CordaRPCOpsImpl(
}
override fun attachmentExists(id: SecureHash): Boolean {
// TODO: this operation should not require an explicit transaction
return database.transaction {
services.attachments.openAttachment(id) != null
}
return services.attachments.openAttachment(id) != null
}
override fun openAttachment(id: SecureHash): InputStream {
// TODO: this operation should not require an explicit transaction
return database.transaction {
services.attachments.openAttachment(id)!!.open()
}
return services.attachments.openAttachment(id)!!.open()
}
override fun uploadAttachment(jar: InputStream): SecureHash {
// TODO: this operation should not require an explicit transaction
return database.transaction {
services.attachments.importAttachment(jar, RPC_UPLOADER, null)
}
return services.attachments.importAttachment(jar, RPC_UPLOADER, null)
}
override fun uploadAttachmentWithMetadata(jar: InputStream, uploader:String, filename:String): SecureHash {
// TODO: this operation should not require an explicit transaction
return database.transaction {
services.attachments.importAttachment(jar, uploader, filename)
}
override fun uploadAttachmentWithMetadata(jar: InputStream, uploader: String, filename: String): SecureHash {
return services.attachments.importAttachment(jar, uploader, filename)
}
override fun queryAttachments(query: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> {
// TODO: this operation should not require an explicit transaction
return database.transaction {
services.attachments.queryAttachments(query, sorting)
}
return services.attachments.queryAttachments(query, sorting)
}
override fun currentNodeTime(): Instant = Instant.now(services.clock)
@ -222,44 +203,32 @@ internal class CordaRPCOpsImpl(
override fun waitUntilNetworkReady(): CordaFuture<Void?> = services.networkMapCache.nodeReady
override fun wellKnownPartyFromAnonymous(party: AbstractParty): Party? {
return database.transaction {
services.identityService.wellKnownPartyFromAnonymous(party)
}
return services.identityService.wellKnownPartyFromAnonymous(party)
}
override fun partyFromKey(key: PublicKey): Party? {
return database.transaction {
services.identityService.partyFromKey(key)
}
return services.identityService.partyFromKey(key)
}
override fun wellKnownPartyFromX500Name(x500Name: CordaX500Name): Party? {
return database.transaction {
services.identityService.wellKnownPartyFromX500Name(x500Name)
}
return services.identityService.wellKnownPartyFromX500Name(x500Name)
}
override fun notaryPartyFromX500Name(x500Name: CordaX500Name): Party? = services.networkMapCache.getNotary(x500Name)
override fun partiesFromName(query: String, exactMatch: Boolean): Set<Party> {
return database.transaction {
services.identityService.partiesFromName(query, exactMatch)
}
return services.identityService.partiesFromName(query, exactMatch)
}
override fun nodeInfoFromParty(party: AbstractParty): NodeInfo? {
return database.transaction {
services.networkMapCache.getNodeByLegalIdentity(party)
}
return services.networkMapCache.getNodeByLegalIdentity(party)
}
override fun registeredFlows(): List<String> = services.rpcFlows.map { it.name }.sorted()
override fun clearNetworkMapCache() {
database.transaction {
services.networkMapCache.clearNetworkMapCache()
}
}
override fun <T : ContractState> vaultQuery(contractStateType: Class<out T>): Vault.Page<T> {
return vaultQueryBy(QueryCriteria.VaultQueryCriteria(), PageSpecification(), Sort(emptySet()), contractStateType)

View File

@ -3,7 +3,9 @@ package net.corda.node.internal
import com.codahale.metrics.JmxReporter
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
import net.corda.core.concurrent.CordaFuture
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.Emoji
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.thenMatch
@ -13,7 +15,6 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.IdentityService
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.serialization.internal.SerializationEnvironmentImpl
import net.corda.core.serialization.internal.nodeSerializationEnv
@ -316,7 +317,9 @@ open class Node(configuration: NodeConfiguration,
* This is not using the H2 "automatic mixed mode" directly but leans on many of the underpinnings. For more details
* on H2 URLs and configuration see: http://www.h2database.com/html/features.html#database_url
*/
override fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService): CordaPersistence {
override fun initialiseDatabasePersistence(schemaService: SchemaService,
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?): CordaPersistence {
val databaseUrl = configuration.dataSourceProperties.getProperty("dataSource.url")
val h2Prefix = "jdbc:h2:file:"
if (databaseUrl != null && databaseUrl.startsWith(h2Prefix)) {
@ -333,7 +336,7 @@ open class Node(configuration: NodeConfiguration,
printBasicNodeInfo("Database connection url is", "jdbc:h2:$url/node")
}
}
return super.initialiseDatabasePersistence(schemaService, identityService)
return super.initialiseDatabasePersistence(schemaService, wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous)
}
private val _startupComplete = openFuture<Unit>()

View File

@ -24,6 +24,7 @@ import net.corda.node.services.network.NetworkMapUpdater
import net.corda.node.services.statemachine.ExternalEvent
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.contextDatabase
interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBaseInternal
interface NetworkMapCacheBaseInternal : NetworkMapCacheBase {
@ -52,8 +53,10 @@ interface ServiceHubInternal : ServiceHub {
fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>,
validatedTransactions: WritableTransactionStorage,
stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage,
vaultService: VaultServiceInternal) {
vaultService: VaultServiceInternal,
database: CordaPersistence) {
database.transaction {
require(txs.any()) { "No transactions passed in for recording" }
val recordedTransactions = txs.filter { validatedTransactions.addTransaction(it) }
val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
@ -104,6 +107,7 @@ interface ServiceHubInternal : ServiceHub {
}
}
}
}
override val vaultService: VaultServiceInternal
/**
@ -125,7 +129,7 @@ interface ServiceHubInternal : ServiceHub {
val networkMapUpdater: NetworkMapUpdater
override val cordappProvider: CordappProviderInternal
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
recordTransactions(statesToRecord, txs, validatedTransactions, stateMachineRecordedTransactionMapping, vaultService)
recordTransactions(statesToRecord, txs, validatedTransactions, stateMachineRecordedTransactionMapping, vaultService, database)
}
fun getFlowFactory(initiatingFlowClass: Class<out FlowLogic<*>>): InitiatedFlowFactory<*>?

View File

@ -16,6 +16,7 @@ import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.crypto.X509CertificateFactory
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.crypto.x509Certificates
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
import java.io.Serializable
@ -38,6 +39,7 @@ import javax.persistence.Lob
// TODO There is duplicated logic between this and InMemoryIdentityService
@ThreadSafe
class PersistentIdentityService(override val trustRoot: X509Certificate,
private val database: CordaPersistence,
caCertificates: List<X509Certificate> = emptyList()) : SingletonSerializeAsToken(), IdentityServiceInternal {
companion object {
@ -110,6 +112,7 @@ class PersistentIdentityService(override val trustRoot: X509Certificate,
/** Requires a database transaction. */
fun loadIdentities(identities: Iterable<PartyAndCertificate> = emptySet(), confidentialIdentities: Iterable<PartyAndCertificate> = emptySet()) {
database.transaction {
identities.forEach {
val key = mapToKey(it)
keyToParties.addWithDuplicatesAllowed(key, it, false)
@ -120,9 +123,12 @@ class PersistentIdentityService(override val trustRoot: X509Certificate,
}
log.debug("Identities loaded")
}
}
@Throws(CertificateExpiredException::class, CertificateNotYetValidException::class, InvalidAlgorithmParameterException::class)
override fun verifyAndRegisterIdentity(identity: PartyAndCertificate): PartyAndCertificate? {
return database.transaction {
// Validate the chain first, before we do anything clever with it
val identityCertChain = identity.certPath.x509Certificates
try {
@ -150,34 +156,40 @@ class PersistentIdentityService(override val trustRoot: X509Certificate,
// Always keep the first party we registered, as that's the well known identity
principalToParties.addWithDuplicatesAllowed(identity.name, key, false)
val parentId = mapToKey(identityCertChain[1].publicKey)
return keyToParties[parentId]
keyToParties[parentId]
}
}
override fun certificateFromKey(owningKey: PublicKey): PartyAndCertificate? = keyToParties[mapToKey(owningKey)]
override fun certificateFromKey(owningKey: PublicKey): PartyAndCertificate? = database.transaction { keyToParties[mapToKey(owningKey)] }
private fun certificateFromCordaX500Name(name: CordaX500Name): PartyAndCertificate? {
return database.transaction {
val partyId = principalToParties[name]
return if (partyId != null) {
if (partyId != null) {
keyToParties[partyId]
} else null
}
}
// We give the caller a copy of the data set to avoid any locking problems
override fun getAllIdentities(): Iterable<PartyAndCertificate> = keyToParties.allPersisted().map { it.second }.asIterable()
override fun getAllIdentities(): Iterable<PartyAndCertificate> = database.transaction { keyToParties.allPersisted().map { it.second }.asIterable() }
override fun partyFromKey(key: PublicKey): Party? = certificateFromKey(key)?.party
override fun wellKnownPartyFromX500Name(name: CordaX500Name): Party? = certificateFromCordaX500Name(name)?.party
override fun wellKnownPartyFromAnonymous(party: AbstractParty): Party? {
return database.transaction {
// The original version of this would return the party as-is if it was a Party (rather than AnonymousParty),
// however that means that we don't verify that we know who owns the key. As such as now enforce turning the key
// into a party, and from there figure out the well known party.
val candidate = partyFromKey(party.owningKey)
// TODO: This should be done via the network map cache, which is the authoritative source of well known identities
return if (candidate != null) {
if (candidate != null) {
wellKnownPartyFromX500Name(candidate.name)
} else {
null
}
}
}
override fun wellKnownPartyFromAnonymous(partyRef: PartyAndReference) = wellKnownPartyFromAnonymous(partyRef.party)
override fun requireWellKnownPartyFromAnonymous(party: AbstractParty): Party {
@ -185,20 +197,23 @@ class PersistentIdentityService(override val trustRoot: X509Certificate,
}
override fun partiesFromName(query: String, exactMatch: Boolean): Set<Party> {
return database.transaction {
val results = LinkedHashSet<Party>()
for ((x500name, partyId) in principalToParties.allPersisted()) {
partiesFromName(query, exactMatch, x500name, results, keyToParties[partyId]!!.party)
}
return results
results
}
}
@Throws(UnknownAnonymousPartyException::class)
override fun assertOwnership(party: Party, anonymousParty: AnonymousParty) {
val anonymousIdentity = certificateFromKey(anonymousParty.owningKey) ?:
throw UnknownAnonymousPartyException("Unknown $anonymousParty")
database.transaction {
val anonymousIdentity = certificateFromKey(anonymousParty.owningKey) ?: throw UnknownAnonymousPartyException("Unknown $anonymousParty")
val issuingCert = anonymousIdentity.certPath.certificates[1]
require(issuingCert.publicKey == party.owningKey) {
"Issuing certificate's public key must match the party key ${party.owningKey.toStringShort()}."
}
}
}
}

View File

@ -7,6 +7,7 @@ import net.corda.core.node.services.KeyManagementService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.MAX_HASH_HEX_SIZE
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
import org.bouncycastle.operator.ContentSigner
@ -27,7 +28,8 @@ import javax.persistence.Lob
* This class needs database transactions to be in-flight during method calls and init.
*/
class PersistentKeyManagementService(val identityService: IdentityService,
initialKeys: Set<KeyPair>) : SingletonSerializeAsToken(), KeyManagementService {
initialKeys: Set<KeyPair>,
private val database: CordaPersistence) : SingletonSerializeAsToken(), KeyManagementService {
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}our_key_pairs")
@ -65,17 +67,23 @@ class PersistentKeyManagementService(val identityService: IdentityService,
val keysMap = createKeyMap()
init {
// TODO this should be in a start function, not in an init block.
database.transaction {
initialKeys.forEach({ it -> keysMap.addWithDuplicatesAllowed(it.public, it.private) })
}
}
override val keys: Set<PublicKey> get() = keysMap.allPersisted().map { it.first }.toSet()
override val keys: Set<PublicKey> get() = database.transaction { keysMap.allPersisted().map { it.first }.toSet() }
override fun filterMyKeys(candidateKeys: Iterable<PublicKey>): Iterable<PublicKey> =
override fun filterMyKeys(candidateKeys: Iterable<PublicKey>): Iterable<PublicKey> = database.transaction {
candidateKeys.filter { keysMap[it] != null }
}
override fun freshKey(): PublicKey {
val keyPair = generateKeyPair()
database.transaction {
keysMap[keyPair.public] = keyPair.private
}
return keyPair.public
}
@ -86,8 +94,10 @@ class PersistentKeyManagementService(val identityService: IdentityService,
//It looks for the PublicKey in the (potentially) CompositeKey that is ours, and then returns the associated PrivateKey to use in signing
private fun getSigningKeyPair(publicKey: PublicKey): KeyPair {
return database.transaction {
val pk = publicKey.keys.first { keysMap[it] != null } //TODO here for us to re-write this using an actual query if publicKey.keys.size > 1
return KeyPair(pk, keysMap[pk]!!)
KeyPair(pk, keysMap[pk]!!)
}
}
override fun sign(bytes: ByteArray, publicKey: PublicKey): DigitalSignature.WithKey {

View File

@ -150,9 +150,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
fun sendMessage(address: String, message: ClientMessage) = producer!!.send(address, message)
}
private val messagesToRedeliver = database.transaction {
createMessageToRedeliver()
}
private val messagesToRedeliver = createMessageToRedeliver()
private val scheduledMessageRedeliveries = ConcurrentHashMap<Long, ScheduledFuture<*>>()

View File

@ -37,8 +37,9 @@ import kotlin.collections.HashSet
class NetworkMapCacheImpl(
networkMapCacheBase: NetworkMapCacheBaseInternal,
private val identityService: IdentityService
) : NetworkMapCacheBaseInternal by networkMapCacheBase, NetworkMapCacheInternal {
private val identityService: IdentityService,
private val database: CordaPersistence
) : NetworkMapCacheBaseInternal by networkMapCacheBase, NetworkMapCacheInternal, SingletonSerializeAsToken() {
companion object {
private val logger = loggerFor<NetworkMapCacheImpl>()
}
@ -61,11 +62,13 @@ class NetworkMapCacheImpl(
}
override fun getNodeByLegalIdentity(party: AbstractParty): NodeInfo? {
return database.transaction {
val wellKnownParty = identityService.wellKnownPartyFromAnonymous(party)
return wellKnownParty?.let {
wellKnownParty?.let {
getNodesByLegalIdentityKey(it.owningKey).firstOrNull()
}
}
}
}
/**
@ -182,7 +185,7 @@ open class PersistentNetworkMapCache(
override fun track(): DataFeed<List<NodeInfo>, MapChange> {
synchronized(_changed) {
val allInfos = database.transaction { getAllInfos(session) }.map { it.toNodeInfo() }
val allInfos = database.transaction { getAllInfos(session).map { it.toNodeInfo() } }
return DataFeed(allInfos, _changed.bufferUntilSubscribed().wrapWithDatabaseTransaction())
}
}

View File

@ -2,22 +2,23 @@ package net.corda.node.services.persistence
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.uncheckedCast
import net.corda.core.node.services.IdentityService
import net.corda.core.utilities.contextLogger
import org.hibernate.type.descriptor.WrapperOptions
import org.hibernate.type.descriptor.java.AbstractTypeDescriptor
import org.hibernate.type.descriptor.java.ImmutableMutabilityPlan
import org.hibernate.type.descriptor.java.MutabilityPlan
class AbstractPartyDescriptor(private val identityService: IdentityService) : AbstractTypeDescriptor<AbstractParty>(AbstractParty::class.java) {
class AbstractPartyDescriptor(private val wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
private val wellKnownPartyFromAnonymous: (AbstractParty) -> Party?) : AbstractTypeDescriptor<AbstractParty>(AbstractParty::class.java) {
companion object {
private val log = contextLogger()
}
override fun fromString(dbData: String?): AbstractParty? {
return if (dbData != null) {
val party = identityService.wellKnownPartyFromX500Name(CordaX500Name.parse(dbData))
val party = wellKnownPartyFromX500Name(CordaX500Name.parse(dbData))
if (party == null) log.warn("Identity service unable to resolve X500name: $dbData")
party
} else {
@ -29,7 +30,7 @@ class AbstractPartyDescriptor(private val identityService: IdentityService) : Ab
override fun toString(party: AbstractParty?): String? {
return if (party != null) {
val partyName = party.nameOrNull() ?: identityService.wellKnownPartyFromAnonymous(party)?.name
val partyName = party.nameOrNull() ?: wellKnownPartyFromAnonymous(party)?.name
if (partyName == null) log.warn("Identity service unable to resolve AbstractParty: $party")
partyName.toString()
} else {

View File

@ -2,7 +2,7 @@ package net.corda.node.services.persistence
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.node.services.IdentityService
import net.corda.core.identity.Party
import net.corda.core.utilities.contextLogger
import javax.persistence.AttributeConverter
import javax.persistence.Converter
@ -12,14 +12,15 @@ import javax.persistence.Converter
* Completely anonymous parties are stored as null (to preserve privacy).
*/
@Converter(autoApply = true)
class AbstractPartyToX500NameAsStringConverter(private val identityService: IdentityService) : AttributeConverter<AbstractParty, String> {
class AbstractPartyToX500NameAsStringConverter(private val wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
private val wellKnownPartyFromAnonymous: (AbstractParty) -> Party?) : AttributeConverter<AbstractParty, String> {
companion object {
private val log = contextLogger()
}
override fun convertToDatabaseColumn(party: AbstractParty?): String? {
if (party != null) {
val partyName = identityService.wellKnownPartyFromAnonymous(party)?.toString()
val partyName = wellKnownPartyFromAnonymous(party)?.toString()
if (partyName != null) return partyName
log.warn("Identity service unable to resolve AbstractParty: $party")
}
@ -28,7 +29,7 @@ class AbstractPartyToX500NameAsStringConverter(private val identityService: Iden
override fun convertToEntityAttribute(dbData: String?): AbstractParty? {
if (dbData != null) {
val party = identityService.wellKnownPartyFromX500Name(CordaX500Name.parse(dbData))
val party = wellKnownPartyFromX500Name(CordaX500Name.parse(dbData))
if (party != null) return party
log.warn("Identity service unable to resolve X500name: $dbData")
}

View File

@ -7,6 +7,7 @@ import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.node.services.api.StateMachineRecordedTransactionMappingStorage
import net.corda.node.utilities.*
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
@ -23,7 +24,7 @@ import javax.persistence.*
* RPC API to correlate transaction creation with flows.
*/
@ThreadSafe
class DBTransactionMappingStorage : StateMachineRecordedTransactionMappingStorage {
class DBTransactionMappingStorage(private val database: CordaPersistence) : StateMachineRecordedTransactionMappingStorage {
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}transaction_mappings")
@ -56,11 +57,14 @@ class DBTransactionMappingStorage : StateMachineRecordedTransactionMappingStorag
val updates: PublishSubject<StateMachineTransactionMapping> = PublishSubject.create()
override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) {
database.transaction {
stateMachineTransactionMap.addWithDuplicatesAllowed(transactionId, stateMachineRunId)
updates.bufferUntilDatabaseCommit().onNext(StateMachineTransactionMapping(stateMachineRunId, transactionId))
}
}
override fun track(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> =
override fun track(): DataFeed<List<StateMachineTransactionMapping>, StateMachineTransactionMapping> = database.transaction {
DataFeed(stateMachineTransactionMap.allPersisted().map { StateMachineTransactionMapping(it.second, it.first) }.toList(),
updates.bufferUntilSubscribed().wrapWithDatabaseTransaction())
}
}

View File

@ -1,4 +1,5 @@
package net.corda.node.services.persistence
import net.corda.core.concurrent.CordaFuture
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.TransactionSignature
@ -7,13 +8,18 @@ import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.*
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.toFuture
import net.corda.core.transactions.CoreTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.utilities.AppendOnlyPersistentMapBase
import net.corda.node.utilities.WeightBasedAppendOnlyPersistentMap
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
@ -21,14 +27,19 @@ import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
import rx.Observable
import rx.subjects.PublishSubject
import java.io.Serializable
import javax.persistence.*
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import javax.persistence.Lob
import javax.persistence.Table
// cache value type to just store the immutable bits of a signed transaction plus conversion helpers
typealias TxCacheValue = Pair<SerializedBytes<CoreTransaction>, List<TransactionSignature>>
fun TxCacheValue.toSignedTx() = SignedTransaction(this.first, this.second)
fun SignedTransaction.toTxCacheValue() = TxCacheValue(this.txBits, this.sigs)
class DBTransactionStorage(cacheSizeBytes: Long) : WritableTransactionStorage, SingletonSerializeAsToken() {
class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPersistence) : WritableTransactionStorage, SingletonSerializeAsToken() {
@Entity
@Table(name = "${NODE_DATABASE_PREFIX}transactions")
@ -55,8 +66,7 @@ class DBTransactionStorage(cacheSizeBytes: Long) : WritableTransactionStorage, S
toPersistentEntity = { key: SecureHash, value: TxCacheValue ->
DBTransaction().apply {
txId = key.toString()
transaction = value.toSignedTx().
serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
transaction = value.toSignedTx().serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
}
},
persistentEntityClass = DBTransaction::class.java,
@ -81,26 +91,30 @@ class DBTransactionStorage(cacheSizeBytes: Long) : WritableTransactionStorage, S
private val txStorage = ThreadBox(createTransactionsMap(cacheSizeBytes))
override fun addTransaction(transaction: SignedTransaction): Boolean =
override fun addTransaction(transaction: SignedTransaction): Boolean = database.transaction {
txStorage.locked {
addWithDuplicatesAllowed(transaction.id, transaction.toTxCacheValue()).apply {
updatesPublisher.bufferUntilDatabaseCommit().onNext(transaction)
}
}
}
override fun getTransaction(id: SecureHash): SignedTransaction? = txStorage.content[id]?.toSignedTx()
override fun getTransaction(id: SecureHash): SignedTransaction? = database.transaction { txStorage.content[id]?.toSignedTx() }
private val updatesPublisher = PublishSubject.create<SignedTransaction>().toSerialized()
override val updates: Observable<SignedTransaction> = updatesPublisher.wrapWithDatabaseTransaction()
override fun track(): DataFeed<List<SignedTransaction>, SignedTransaction> {
return txStorage.locked {
return database.transaction {
txStorage.locked {
DataFeed(allPersisted().map { it.second.toSignedTx() }.toList(), updates.bufferUntilSubscribed())
}
}
}
override fun trackTransaction(id: SecureHash): CordaFuture<SignedTransaction> {
return txStorage.locked {
return database.transaction {
txStorage.locked {
val existingTransaction = get(id)
if (existingTransaction == null) {
updates.filter { it.id == id }.toFuture()
@ -109,8 +123,9 @@ class DBTransactionStorage(cacheSizeBytes: Long) : WritableTransactionStorage, S
}
}
}
}
@VisibleForTesting
val transactions: Iterable<SignedTransaction>
get() = txStorage.content.allPersisted().map { it.second.toSignedTx() }.toList()
get() = database.transaction { txStorage.content.allPersisted().map { it.second.toSignedTx() }.toList() }
}

View File

@ -20,13 +20,18 @@ import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.AttachmentStorage
import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.serialization.*
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationToken
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SerializeAsTokenContext
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.vault.HibernateAttachmentQueryCriteriaParser
import net.corda.node.utilities.NonInvalidatingCache
import net.corda.node.utilities.NonInvalidatingWeightBasedCache
import net.corda.nodeapi.exceptions.DuplicateAttachmentException
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.nodeapi.internal.withContractsInJar
@ -39,7 +44,16 @@ import java.time.Instant
import java.util.*
import java.util.jar.JarInputStream
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.*
import javax.persistence.CollectionTable
import javax.persistence.Column
import javax.persistence.ElementCollection
import javax.persistence.Entity
import javax.persistence.ForeignKey
import javax.persistence.Id
import javax.persistence.Index
import javax.persistence.JoinColumn
import javax.persistence.Lob
import javax.persistence.Table
/**
* Stores attachments using Hibernate to database.
@ -48,7 +62,8 @@ import javax.persistence.*
class NodeAttachmentService(
metrics: MetricRegistry,
attachmentContentCacheSize: Long = NodeConfiguration.defaultAttachmentContentCacheSize,
attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound
attachmentCacheBound: Long = NodeConfiguration.defaultAttachmentCacheBound,
private val database: CordaPersistence
) : AttachmentStorage, SingletonSerializeAsToken(
) {
@ -107,7 +122,8 @@ class NodeAttachmentService(
private val attachmentCount = metrics.counter("Attachments")
init {
fun start() {
database.transaction {
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(Long::class.java)
@ -115,6 +131,7 @@ class NodeAttachmentService(
val count = session.createQuery(criteriaQuery).singleResult
attachmentCount.inc(count)
}
}
@CordaSerializable
class HashMismatchException(val expected: SecureHash, val actual: SecureHash) : CordaRuntimeException("File $expected hashed to $actual: corruption in attachment store?")
@ -194,10 +211,8 @@ class NodeAttachmentService(
}
override fun toToken(context: SerializeAsTokenContext) = Token(id, checkOnLoad)
}
// slightly complex 2 level approach to attachment caching:
// On the first level we cache attachment contents loaded from the DB by their key. This is a weight based
// cache (we don't want to waste too much memory on this) and could be evicted quite aggressively. If we fail
@ -217,8 +232,8 @@ class NodeAttachmentService(
)
private fun loadAttachmentContent(id: SecureHash): Pair<Attachment, ByteArray>? {
val attachment = currentDBSession().get(NodeAttachmentService.DBAttachment::class.java, id.toString())
?: return null
return database.transaction {
val attachment = currentDBSession().get(NodeAttachmentService.DBAttachment::class.java, id.toString()) ?: return@transaction null
val attachmentImpl = AttachmentImpl(id, { attachment.content }, checkAttachmentsOnLoad).let {
val contracts = attachment.contractClassNames
if (contracts != null && contracts.isNotEmpty()) {
@ -227,7 +242,8 @@ class NodeAttachmentService(
it
}
}
return Pair(attachmentImpl, attachment.content)
Pair(attachmentImpl, attachment.content)
}
}
private val attachmentCache = NonInvalidatingCache<SecureHash, Optional<Attachment>>(
@ -263,12 +279,14 @@ class NodeAttachmentService(
return import(jar, uploader, filename)
}
override fun hasAttachment(attachmentId: AttachmentId): Boolean =
override fun hasAttachment(attachmentId: AttachmentId): Boolean = database.transaction {
currentDBSession().find(NodeAttachmentService.DBAttachment::class.java, attachmentId.toString()) != null
}
// TODO: PLT-147: The attachment should be randomised to prevent brute force guessing and thus privacy leaks.
private fun import(jar: InputStream, uploader: String?, filename: String?): AttachmentId {
return withContractsInJar(jar) { contractClassNames, inputStream ->
return database.transaction {
withContractsInJar(jar) { contractClassNames, inputStream ->
require(inputStream !is JarInputStream)
// Read the file into RAM and then calculate its hash. The attachment must fit into memory.
@ -292,6 +310,7 @@ class NodeAttachmentService(
}
}
}
}
@Suppress("OverridingDeprecatedMember")
override fun importOrGetAttachment(jar: InputStream): AttachmentId = try {
@ -302,6 +321,7 @@ class NodeAttachmentService(
override fun queryAttachments(criteria: AttachmentQueryCriteria, sorting: AttachmentSort?): List<AttachmentId> {
log.info("Attachment query criteria: $criteria, sorting: $sorting")
return database.transaction {
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder
@ -317,9 +337,7 @@ class NodeAttachmentService(
val query = session.createQuery(criteriaQuery)
// execution
val results = query.resultList
return results.map { AttachmentId.parse(it.attId) }
query.resultList.map { AttachmentId.parse(it.attId) }
}
}
}

View File

@ -37,6 +37,7 @@ import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.services.statemachine.transitions.StateMachineConfiguration
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import net.corda.serialization.internal.SerializeAsTokenContextImpl
import net.corda.serialization.internal.withTokenContext
import org.apache.activemq.artemis.utils.ReusableLatch
@ -175,7 +176,9 @@ class SingleThreadedStateMachineManager(
*/
override fun track(): DataFeed<List<FlowLogic<*>>, StateMachineManager.Change> {
return mutex.locked {
DataFeed(flows.values.map { it.fiber.logic }, changesPublisher.bufferUntilSubscribed())
database.transaction {
DataFeed(flows.values.map { it.fiber.logic }, changesPublisher.bufferUntilSubscribed().wrapWithDatabaseTransaction(database))
}
}
}

View File

@ -48,7 +48,8 @@ class NodeVaultService(
private val clock: Clock,
private val keyManagementService: KeyManagementService,
private val servicesForResolution: ServicesForResolution,
hibernateConfig: HibernateConfiguration
hibernateConfig: HibernateConfiguration,
private val database: CordaPersistence
) : SingletonSerializeAsToken(), VaultServiceInternal {
private companion object {
private val log = contextLogger()
@ -225,11 +226,14 @@ class NodeVaultService(
}
override fun addNoteToTransaction(txnId: SecureHash, noteText: String) {
database.transaction {
val txnNoteEntity = VaultSchemaV1.VaultTxnNote(txnId.toString(), noteText)
currentDBSession().save(txnNoteEntity)
}
}
override fun getTransactionNotes(txnId: SecureHash): Iterable<String> {
return database.transaction {
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(VaultSchemaV1.VaultTxnNote::class.java)
@ -237,7 +241,8 @@ class NodeVaultService(
val txIdPredicate = criteriaBuilder.equal(vaultStates.get<Vault.StateStatus>(VaultSchemaV1.VaultTxnNote::txId.name), txnId.toString())
criteriaQuery.where(txIdPredicate)
val results = session.createQuery(criteriaQuery).resultList
return results.asIterable().map { it.note }
results.asIterable().map { it.note }
}
}
@Throws(StatesNotAvailableException::class)
@ -403,6 +408,7 @@ class NodeVaultService(
@Throws(VaultQueryException::class)
private fun <T : ContractState> _queryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>, skipPagingChecks: Boolean): Vault.Page<T> {
log.info("Vault Query for contract type: $contractStateType, criteria: $criteria, pagination: $paging, sorting: $sorting")
return database.transaction {
// calculate total results where a page specification has been defined
var totalStates = -1L
if (!skipPagingChecks && !paging.isDefault) {
@ -473,19 +479,22 @@ class NodeVaultService(
if (stateRefs.isNotEmpty())
statesAndRefs.addAll(uncheckedCast(servicesForResolution.loadStates(stateRefs)))
return Vault.Page(states = statesAndRefs, statesMetadata = statesMeta, stateTypes = criteriaParser.stateTypes, totalStatesAvailable = totalStates, otherResults = otherResults)
Vault.Page(states = statesAndRefs, statesMetadata = statesMeta, stateTypes = criteriaParser.stateTypes, totalStatesAvailable = totalStates, otherResults = otherResults)
}
}
@Throws(VaultQueryException::class)
override fun <T : ContractState> _trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return mutex.locked {
return database.transaction {
mutex.locked {
val snapshotResults = _queryBy(criteria, paging, sorting, contractStateType)
val updates: Observable<Vault.Update<T>> = uncheckedCast(_updatesPublisher.bufferUntilSubscribed().filter { it.containsType(contractStateType, snapshotResults.stateTypes) })
DataFeed(snapshotResults, updates)
}
}
}
private fun getSession() = contextDatabase.currentOrNew().session
private fun getSession() = database.currentOrNew().session
/**
* Derive list from existing vault states and then incrementally update using vault observables
*/

View File

@ -32,7 +32,7 @@ class AbstractNodeTests {
@Test
fun `logVendorString does not leak connection`() {
// Note this test also covers a transaction that CordaPersistence does while it's instantiating:
val database = configureDatabase(hikariProperties(freshURL()), DatabaseConfig(), rigorousMock())
val database = configureDatabase(hikariProperties(freshURL()), DatabaseConfig(), { null }, { null })
val log = mock<Logger>() // Don't care what happens here.
// Actually 10 is enough to reproduce old code hang, as pool size is 10 and we leaked 9 connections and 1 is in flight:
repeat(100) {

View File

@ -68,7 +68,7 @@ class NodeTest {
doReturn("tsp").whenever(it).trustStorePassword
doReturn("ksp").whenever(it).keyStorePassword
}
configureDatabase(dataSourceProperties, databaseConfig, rigorousMock()).use { database ->
configureDatabase(dataSourceProperties, databaseConfig, { null }, { null }).use { database ->
val node = Node(configuration, rigorousMock<VersionInfo>().also {
doReturn(platformVersion).whenever(it).platformVersion
}, initialiseSerialization = false)

View File

@ -0,0 +1,70 @@
package net.corda.node.services
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.ContractState
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.core.internal.packageName
import net.corda.core.node.services.queryBy
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.issuedBy
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.startFlow
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Test
import rx.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
class ServiceHubConcurrentUsageTest {
private val mockNet = InternalMockNetwork(listOf(Cash::class.packageName))
@After
fun stopNodes() {
mockNet.stopNodes()
}
@Test
fun `operations requiring a transaction work from another thread`() {
val latch = CountDownLatch(1)
var successful = false
val initiatingFlow = TestFlow(mockNet.defaultNotaryIdentity)
val node = mockNet.createPartyNode()
node.services.validatedTransactions.updates.observeOn(Schedulers.io()).subscribe { _ ->
try {
node.services.vaultService.queryBy<ContractState>().states
successful = true
} finally {
latch.countDown()
}
}
val flow = node.services.startFlow(initiatingFlow)
mockNet.runNetwork()
flow.resultFuture.getOrThrow()
latch.await()
assertThat(successful).isTrue()
}
class TestFlow(private val notary: Party) : FlowLogic<SignedTransaction>() {
@Suspendable
override fun call(): SignedTransaction {
val builder = TransactionBuilder(notary)
val issuer = ourIdentity.ref(OpaqueBytes.of(0))
Cash().generateIssue(builder, 10.DOLLARS.issuedBy(issuer), ourIdentity, notary)
val stx = serviceHub.signInitialTransaction(builder)
return subFlow(FinalityFlow(stx))
}
}
}

View File

@ -145,7 +145,7 @@ class MockScheduledFlowRepository : ScheduledFlowRepository {
}
class NodeSchedulerServiceTest : NodeSchedulerServiceTestBase() {
private val database = configureDatabase(MockServices.makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock())
private val database = configureDatabase(MockServices.makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
@After
fun closeDatabase() {
@ -296,7 +296,7 @@ class NodeSchedulerPersistenceTest : NodeSchedulerServiceTestBase() {
@Test
fun `test that correct item is returned`() {
val dataSourceProps = MockServices.makeTestDataSourceProperties()
val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock())
val database = configureDatabase(dataSourceProps, databaseConfig, { null }, { null })
database.transaction {
val repo = PersistentScheduledFlowRepository(database)
val stateRef = StateRef(SecureHash.randomSHA256(), 0)
@ -315,7 +315,7 @@ class NodeSchedulerPersistenceTest : NodeSchedulerServiceTestBase() {
val timeInTheFuture = mark + 1.days
val stateRef = StateRef(SecureHash.zeroHash, 0)
configureDatabase(dataSourceProps, databaseConfig, rigorousMock()).use { database ->
configureDatabase(dataSourceProps, databaseConfig, { null }, { null }).use { database ->
val scheduler = database.transaction {
createScheduler(database)
}
@ -337,7 +337,7 @@ class NodeSchedulerPersistenceTest : NodeSchedulerServiceTestBase() {
transactionStates[stateRef] = transactionStateMock(logicRef, timeInTheFuture)
flows[logicRef] = flowLogic
configureDatabase(dataSourceProps, DatabaseConfig(), rigorousMock()).use { database ->
configureDatabase(dataSourceProps, DatabaseConfig(), { null }, { null }).use { database ->
val newScheduler = database.transaction {
createScheduler(database)
}
@ -360,7 +360,7 @@ class NodeSchedulerPersistenceTest : NodeSchedulerServiceTestBase() {
val logicRef = rigorousMock<FlowLogicRef>()
val flowLogic = rigorousMock<FlowLogic<*>>()
configureDatabase(dataSourceProps, databaseConfig, rigorousMock()).use { database ->
configureDatabase(dataSourceProps, databaseConfig, { null }, { null }).use { database ->
val scheduler = database.transaction {
createScheduler(database)
}

View File

@ -21,7 +21,7 @@ class PersistentScheduledFlowRepositoryTest {
fun `test that earliest item is returned`() {
val laterTime = mark + 1.days
val dataSourceProps = MockServices.makeTestDataSourceProperties()
val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock())
val database = configureDatabase(dataSourceProps, databaseConfig, { null }, { null })
database.transaction {
val repo = PersistentScheduledFlowRepository(database)
@ -43,7 +43,7 @@ class PersistentScheduledFlowRepositoryTest {
fun `test that item is rescheduled`() {
val laterTime = mark + 1.days
val dataSourceProps = MockServices.makeTestDataSourceProperties()
val database = configureDatabase(dataSourceProps, databaseConfig, rigorousMock())
val database = configureDatabase(dataSourceProps, databaseConfig, { null }, { null })
database.transaction {
val repo = PersistentScheduledFlowRepository(database)
val stateRef = StateRef(SecureHash.randomSHA256(), 0)

View File

@ -14,7 +14,11 @@ import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.crypto.x509Certificates
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.*
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.core.getTestPartyAndCertificate
import net.corda.testing.internal.DEV_INTERMEDIATE_CA
import net.corda.testing.internal.DEV_ROOT_CA
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
@ -23,6 +27,7 @@ import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.atomic.AtomicReference
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNull
@ -50,8 +55,12 @@ class PersistentIdentityServiceTests {
@Before
fun setup() {
identityService = PersistentIdentityService(DEV_ROOT_CA.certificate)
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), identityService)
val identityServiceRef = AtomicReference<IdentityService>()
// Do all of this in a database transaction so anything that might need a connection has one.
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(),
{ name -> identityServiceRef.get().wellKnownPartyFromX500Name(name) },
{ party -> identityServiceRef.get().wellKnownPartyFromAnonymous(party) })
identityService = PersistentIdentityService(DEV_ROOT_CA.certificate, database).also(identityServiceRef::set)
}
@After
@ -62,80 +71,57 @@ class PersistentIdentityServiceTests {
@Test
fun `get all identities`() {
// Nothing registered, so empty set
database.transaction {
assertNull(identityService.getAllIdentities().firstOrNull())
}
database.transaction {
identityService.verifyAndRegisterIdentity(ALICE_IDENTITY)
}
var expected = setOf(ALICE)
var actual = database.transaction {
identityService.getAllIdentities().map { it.party }.toHashSet()
}
var actual = identityService.getAllIdentities().map { it.party }.toHashSet()
assertEquals(expected, actual)
// Add a second party and check we get both back
database.transaction {
identityService.verifyAndRegisterIdentity(BOB_IDENTITY)
}
expected = setOf(ALICE, BOB)
actual = database.transaction {
identityService.getAllIdentities().map { it.party }.toHashSet()
}
actual = identityService.getAllIdentities().map { it.party }.toHashSet()
assertEquals(expected, actual)
}
@Test
fun `get identity by key`() {
database.transaction {
assertNull(identityService.partyFromKey(ALICE_PUBKEY))
identityService.verifyAndRegisterIdentity(ALICE_IDENTITY)
assertEquals(ALICE, identityService.partyFromKey(ALICE_PUBKEY))
assertNull(identityService.partyFromKey(BOB_PUBKEY))
}
}
@Test
fun `get identity by name with no registered identities`() {
database.transaction {
assertNull(identityService.wellKnownPartyFromX500Name(ALICE.name))
}
}
@Test
fun `get identity by substring match`() {
database.transaction {
identityService.verifyAndRegisterIdentity(ALICE_IDENTITY)
identityService.verifyAndRegisterIdentity(BOB_IDENTITY)
}
val alicente = getTestPartyAndCertificate(CordaX500Name(organisation = "Alicente Worldwide", locality = "London", country = "GB"), generateKeyPair().public)
database.transaction {
identityService.verifyAndRegisterIdentity(alicente)
assertEquals(setOf(ALICE, alicente.party), identityService.partiesFromName("Alice", false))
assertEquals(setOf(ALICE), identityService.partiesFromName("Alice Corp", true))
assertEquals(setOf(BOB), identityService.partiesFromName("Bob Plc", true))
}
}
@Test
fun `get identity by name`() {
val identities = listOf("Organisation A", "Organisation B", "Organisation C")
.map { getTestPartyAndCertificate(CordaX500Name(organisation = it, locality = "London", country = "GB"), generateKeyPair().public) }
database.transaction {
assertNull(identityService.wellKnownPartyFromX500Name(identities.first().name))
}
identities.forEach {
database.transaction {
identityService.verifyAndRegisterIdentity(it)
}
}
identities.forEach {
database.transaction {
assertEquals(it.party, identityService.wellKnownPartyFromX500Name(it.name))
}
}
}
/**
* Generate a certificate path from a root CA, down to a transaction key, store and verify the association.
@ -149,11 +135,9 @@ class PersistentIdentityServiceTests {
val txIdentity = AnonymousParty(txKey.public)
assertFailsWith<UnknownAnonymousPartyException> {
database.transaction {
identityService.assertOwnership(identity, txIdentity)
}
}
}
/**
* Generate a pair of certificate paths from a root CA, down to a transaction key, store and verify the associations.
@ -165,25 +149,15 @@ class PersistentIdentityServiceTests {
val (_, bobTxIdentity) = createConfidentialIdentity(ALICE.name)
// Now we have identities, construct the service and let it know about both
database.transaction {
identityService.verifyAndRegisterIdentity(alice)
identityService.verifyAndRegisterIdentity(aliceTxIdentity)
}
var actual = database.transaction {
identityService.certificateFromKey(aliceTxIdentity.party.owningKey)
}
var actual = identityService.certificateFromKey(aliceTxIdentity.party.owningKey)
assertEquals(aliceTxIdentity, actual!!)
database.transaction {
assertNull(identityService.certificateFromKey(bobTxIdentity.party.owningKey))
}
database.transaction {
identityService.verifyAndRegisterIdentity(bobTxIdentity)
}
actual = database.transaction {
identityService.certificateFromKey(bobTxIdentity.party.owningKey)
}
actual = identityService.certificateFromKey(bobTxIdentity.party.owningKey)
assertEquals(bobTxIdentity, actual!!)
}
@ -196,69 +170,50 @@ class PersistentIdentityServiceTests {
val (alice, anonymousAlice) = createConfidentialIdentity(ALICE.name)
val (bob, anonymousBob) = createConfidentialIdentity(BOB.name)
database.transaction {
// Now we have identities, construct the service and let it know about both
identityService.verifyAndRegisterIdentity(anonymousAlice)
identityService.verifyAndRegisterIdentity(anonymousBob)
}
// Verify that paths are verified
database.transaction {
identityService.assertOwnership(alice.party, anonymousAlice.party.anonymise())
identityService.assertOwnership(bob.party, anonymousBob.party.anonymise())
}
assertFailsWith<IllegalArgumentException> {
database.transaction {
identityService.assertOwnership(alice.party, anonymousBob.party.anonymise())
}
}
assertFailsWith<IllegalArgumentException> {
database.transaction {
identityService.assertOwnership(bob.party, anonymousAlice.party.anonymise())
}
}
assertFailsWith<IllegalArgumentException> {
val owningKey = DEV_INTERMEDIATE_CA.certificate.publicKey
database.transaction {
val subject = CordaX500Name.build(DEV_INTERMEDIATE_CA.certificate.subjectX500Principal)
identityService.assertOwnership(Party(subject, owningKey), anonymousAlice.party.anonymise())
}
}
}
@Test
fun `Test Persistence`() {
val (alice, anonymousAlice) = createConfidentialIdentity(ALICE.name)
val (bob, anonymousBob) = createConfidentialIdentity(BOB.name)
database.transaction {
// Register well known identities
identityService.verifyAndRegisterIdentity(alice)
identityService.verifyAndRegisterIdentity(bob)
// Register an anonymous identities
identityService.verifyAndRegisterIdentity(anonymousAlice)
identityService.verifyAndRegisterIdentity(anonymousBob)
}
// Create new identity service mounted onto same DB
val newPersistentIdentityService = database.transaction {
PersistentIdentityService(DEV_ROOT_CA.certificate)
}
val newPersistentIdentityService = PersistentIdentityService(DEV_ROOT_CA.certificate, database)
database.transaction {
newPersistentIdentityService.assertOwnership(alice.party, anonymousAlice.party.anonymise())
newPersistentIdentityService.assertOwnership(bob.party, anonymousBob.party.anonymise())
}
val aliceParent = database.transaction {
newPersistentIdentityService.wellKnownPartyFromAnonymous(anonymousAlice.party.anonymise())
}
val aliceParent = newPersistentIdentityService.wellKnownPartyFromAnonymous(anonymousAlice.party.anonymise())
assertEquals(alice.party, aliceParent!!)
val bobReload = database.transaction {
newPersistentIdentityService.certificateFromKey(anonymousBob.party.owningKey)
}
val bobReload = newPersistentIdentityService.certificateFromKey(anonymousBob.party.owningKey)
assertEquals(anonymousBob, bobReload!!)
}

View File

@ -76,8 +76,8 @@ class ArtemisMessagingTest {
doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).p2pMessagingRetry
}
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock())
networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()).start(), rigorousMock())
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, emptyList()).start(), rigorousMock(), database)
}
@After

View File

@ -6,7 +6,7 @@ import net.corda.node.internal.configureDatabase
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.junit.After
import org.junit.Assert.*
@ -58,7 +58,7 @@ class AppendOnlyPersistentMapTest(var scenario: Scenario) {
private val database = configureDatabase(makeTestDataSourceProperties(),
DatabaseConfig(),
rigorousMock(),
{ null }, { null },
NodeSchemaService(setOf(MappedSchema(AppendOnlyPersistentMapTest::class.java, 1, listOf(PersistentMapEntry::class.java)))))
@After

View File

@ -17,7 +17,6 @@ import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
@ -45,7 +44,7 @@ class DBCheckpointStorageTests {
@Before
fun setUp() {
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock())
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
newCheckpointStorage()
}

View File

@ -8,14 +8,17 @@ import net.corda.core.crypto.TransactionSignature
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.internal.configureDatabase
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.*
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.core.dummyCommand
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
@ -41,7 +44,7 @@ class DBTransactionStorageTests {
fun setUp() {
LogHelper.setLevel(PersistentUniquenessProvider::class)
val dataSourceProps = makeTestDataSourceProperties()
database = configureDatabase(dataSourceProps, DatabaseConfig(), rigorousMock())
database = configureDatabase(dataSourceProps, DatabaseConfig(), { null }, { null })
newTransactionStorage()
}
@ -53,52 +56,34 @@ class DBTransactionStorageTests {
@Test
fun `empty store`() {
database.transaction {
assertThat(transactionStorage.getTransaction(newTransaction().id)).isNull()
}
database.transaction {
assertThat(transactionStorage.transactions).isEmpty()
}
newTransactionStorage()
database.transaction {
assertThat(transactionStorage.transactions).isEmpty()
}
}
@Test
fun `one transaction`() {
val transaction = newTransaction()
database.transaction {
transactionStorage.addTransaction(transaction)
}
assertTransactionIsRetrievable(transaction)
database.transaction {
assertThat(transactionStorage.transactions).containsExactly(transaction)
}
newTransactionStorage()
assertTransactionIsRetrievable(transaction)
database.transaction {
assertThat(transactionStorage.transactions).containsExactly(transaction)
}
}
@Test
fun `two transactions across restart`() {
val firstTransaction = newTransaction()
val secondTransaction = newTransaction()
database.transaction {
transactionStorage.addTransaction(firstTransaction)
}
newTransactionStorage()
database.transaction {
transactionStorage.addTransaction(secondTransaction)
}
assertTransactionIsRetrievable(firstTransaction)
assertTransactionIsRetrievable(secondTransaction)
database.transaction {
assertThat(transactionStorage.transactions).containsOnly(firstTransaction, secondTransaction)
}
}
@Test
fun `two transactions with rollback`() {
@ -110,25 +95,19 @@ class DBTransactionStorageTests {
rollback()
}
database.transaction {
assertThat(transactionStorage.transactions).isEmpty()
}
}
@Test
fun `two transactions in same DB transaction scope`() {
val firstTransaction = newTransaction()
val secondTransaction = newTransaction()
database.transaction {
transactionStorage.addTransaction(firstTransaction)
transactionStorage.addTransaction(secondTransaction)
}
assertTransactionIsRetrievable(firstTransaction)
assertTransactionIsRetrievable(secondTransaction)
database.transaction {
assertThat(transactionStorage.transactions).containsOnly(firstTransaction, secondTransaction)
}
}
@Test
fun `transaction saved twice in same DB transaction scope`() {
@ -138,36 +117,29 @@ class DBTransactionStorageTests {
transactionStorage.addTransaction(firstTransaction)
}
assertTransactionIsRetrievable(firstTransaction)
database.transaction {
assertThat(transactionStorage.transactions).containsOnly(firstTransaction)
}
}
@Test
fun `transaction saved twice in two DB transaction scopes`() {
val firstTransaction = newTransaction()
val secondTransaction = newTransaction()
database.transaction {
transactionStorage.addTransaction(firstTransaction)
}
database.transaction {
transactionStorage.addTransaction(secondTransaction)
transactionStorage.addTransaction(firstTransaction)
}
assertTransactionIsRetrievable(firstTransaction)
database.transaction {
assertThat(transactionStorage.transactions).containsOnly(firstTransaction, secondTransaction)
}
}
@Test
fun `updates are fired`() {
val future = transactionStorage.updates.toFuture()
val expected = newTransaction()
database.transaction {
transactionStorage.addTransaction(expected)
}
val actual = future.get(1, TimeUnit.SECONDS)
assertEquals(expected, actual)
}
@ -186,16 +158,12 @@ class DBTransactionStorageTests {
}
private fun newTransactionStorage(cacheSizeBytesOverride: Long? = null) {
database.transaction {
transactionStorage = DBTransactionStorage(cacheSizeBytesOverride ?: NodeConfiguration.defaultTransactionCacheSize)
}
transactionStorage = DBTransactionStorage(cacheSizeBytesOverride ?: NodeConfiguration.defaultTransactionCacheSize, database)
}
private fun assertTransactionIsRetrievable(transaction: SignedTransaction) {
database.transaction {
assertThat(transactionStorage.getTransaction(transaction.id)).isEqualTo(transaction)
}
}
private fun newTransaction(): SignedTransaction {
val wtx = WireTransaction(

View File

@ -111,7 +111,7 @@ class HibernateConfigurationTest {
}
}
val schemaService = NodeSchemaService()
database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService, schemaService)
database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService::wellKnownPartyFromX500Name, identityService::wellKnownPartyFromAnonymous, schemaService)
database.transaction {
hibernateConfig = database.hibernateConfig
@ -119,7 +119,7 @@ class HibernateConfigurationTest {
services = object : MockServices(cordappPackages, BOB_NAME, rigorousMock<IdentityServiceInternal>().also {
doNothing().whenever(it).justVerifyAndRegisterIdentity(argThat { name == BOB_NAME })
}, generateKeyPair(), dummyNotary.keyPair) {
override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, hibernateConfig)
override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, hibernateConfig, database)
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
for (stx in txs) {
(validatedTransactions as WritableTransactionStorage).addTransaction(stx)

View File

@ -5,7 +5,11 @@ import com.google.common.jimfs.Configuration
import com.google.common.jimfs.Jimfs
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sha256
import net.corda.core.internal.*
import net.corda.core.internal.read
import net.corda.core.internal.readAll
import net.corda.core.internal.readFully
import net.corda.core.internal.write
import net.corda.core.internal.writeLines
import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.AttachmentSort
import net.corda.core.node.services.vault.Builder
@ -15,7 +19,6 @@ import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.junit.After
import org.junit.Before
@ -35,14 +38,16 @@ class NodeAttachmentStorageTest {
// Use an in memory file system for testing attachment storage.
private lateinit var fs: FileSystem
private lateinit var database: CordaPersistence
private lateinit var storage: NodeAttachmentService
@Before
fun setUp() {
LogHelper.setLevel(PersistentUniquenessProvider::class)
val dataSourceProperties = makeTestDataSourceProperties()
database = configureDatabase(dataSourceProperties, DatabaseConfig(), rigorousMock())
database = configureDatabase(dataSourceProperties, DatabaseConfig(), { null }, { null })
fs = Jimfs.newFileSystem(Configuration.unix())
storage = NodeAttachmentService(MetricRegistry(), database = database).also { it.start() }
}
@After
@ -52,10 +57,8 @@ class NodeAttachmentStorageTest {
@Test
fun `insert and retrieve`() {
val (testJar,expectedHash) = makeTestJar()
val (testJar, expectedHash) = makeTestJar()
database.transaction {
val storage = NodeAttachmentService(MetricRegistry())
val id = testJar.read { storage.importAttachment(it) }
assertEquals(expectedHash, id)
@ -75,15 +78,12 @@ class NodeAttachmentStorageTest {
it.readBytes()
}
}
}
@Test
fun `missing is not cached`() {
val (testJar, expectedHash) = makeTestJar()
val (jarB, hashB) = makeTestJar(listOf(Pair("file", "content")))
database.transaction {
val storage = NodeAttachmentService(MetricRegistry())
val id = testJar.read { storage.importAttachment(it) }
assertEquals(expectedHash, id)
@ -112,17 +112,12 @@ class NodeAttachmentStorageTest {
it.readBytes()
}
}
}
@Test
fun `metadata can be used to search`() {
val (jarA, _) = makeTestJar()
val (jarB, hashB) = makeTestJar(listOf(Pair("file","content")))
val (jarC, hashC) = makeTestJar(listOf(Pair("magic_file","magic_content_puff")))
database.transaction {
val storage = NodeAttachmentService(MetricRegistry())
val (jarB, hashB) = makeTestJar(listOf(Pair("file", "content")))
val (jarC, hashC) = makeTestJar(listOf(Pair("magic_file", "magic_content_puff")))
jarA.read { storage.importAttachment(it) }
jarB.read { storage.importAttachment(it, "uploaderB", "fileB.zip") }
@ -130,30 +125,26 @@ class NodeAttachmentStorageTest {
assertEquals(
listOf(hashB),
storage.queryAttachments( AttachmentQueryCriteria.AttachmentsQueryCriteria( Builder.equal("uploaderB")))
storage.queryAttachments(AttachmentQueryCriteria.AttachmentsQueryCriteria(Builder.equal("uploaderB")))
)
assertEquals (
assertEquals(
listOf(hashB, hashC),
storage.queryAttachments( AttachmentQueryCriteria.AttachmentsQueryCriteria( Builder.like ("%uploader%")))
storage.queryAttachments(AttachmentQueryCriteria.AttachmentsQueryCriteria(Builder.like("%uploader%")))
)
}
}
@Test
fun `sorting and compound conditions work`() {
val (jarA,hashA) = makeTestJar(listOf(Pair("a","a")))
val (jarB,hashB) = makeTestJar(listOf(Pair("b","b")))
val (jarC,hashC) = makeTestJar(listOf(Pair("c","c")))
val (jarA, hashA) = makeTestJar(listOf(Pair("a", "a")))
val (jarB, hashB) = makeTestJar(listOf(Pair("b", "b")))
val (jarC, hashC) = makeTestJar(listOf(Pair("c", "c")))
fun uploaderCondition(s:String) = AttachmentQueryCriteria.AttachmentsQueryCriteria(uploaderCondition = Builder.equal(s))
fun filenamerCondition(s:String) = AttachmentQueryCriteria.AttachmentsQueryCriteria(filenameCondition = Builder.equal(s))
fun uploaderCondition(s: String) = AttachmentQueryCriteria.AttachmentsQueryCriteria(uploaderCondition = Builder.equal(s))
fun filenamerCondition(s: String) = AttachmentQueryCriteria.AttachmentsQueryCriteria(filenameCondition = Builder.equal(s))
fun filenameSort(direction: Sort.Direction) = AttachmentSort(listOf(AttachmentSort.AttachmentSortColumn(AttachmentSort.AttachmentSortAttribute.FILENAME, direction)))
database.transaction {
val storage = NodeAttachmentService(MetricRegistry())
jarA.read { storage.importAttachment(it, "complexA", "archiveA.zip") }
jarB.read { storage.importAttachment(it, "complexB", "archiveB.zip") }
jarC.read { storage.importAttachment(it, "complexC", "archiveC.zip") }
@ -180,24 +171,20 @@ class NodeAttachmentStorageTest {
// DOCEND AttachmentQueryExample1
assertEquals (
assertEquals(
listOf(hashB, hashC),
storage.queryAttachments(complexCondition, sorting = filenameSort(Sort.Direction.ASC))
)
assertEquals (
assertEquals(
listOf(hashC, hashB),
storage.queryAttachments(complexCondition, sorting = filenameSort(Sort.Direction.DESC))
)
}
}
@Ignore("We need to be able to restart nodes - make importing attachments idempotent?")
@Test
fun `duplicates not allowed`() {
val (testJar,_) = makeTestJar()
database.transaction {
val storage = NodeAttachmentService(MetricRegistry())
val (testJar, _) = makeTestJar()
testJar.read {
storage.importAttachment(it)
}
@ -207,13 +194,11 @@ class NodeAttachmentStorageTest {
}
}
}
}
@Test
fun `corrupt entry throws exception`() {
val (testJar,_) = makeTestJar()
val (testJar, _) = makeTestJar()
val id = database.transaction {
val storage = NodeAttachmentService(MetricRegistry())
val id = testJar.read { storage.importAttachment(it) }
// Corrupt the file in the store.
@ -224,8 +209,6 @@ class NodeAttachmentStorageTest {
session.merge(corruptAttachment)
id
}
database.transaction {
val storage = NodeAttachmentService(MetricRegistry())
val e = assertFailsWith<NodeAttachmentService.HashMismatchException> {
storage.openAttachment(id)!!.open().readFully()
}
@ -237,12 +220,9 @@ class NodeAttachmentStorageTest {
it.readBytes()
}
}
}
@Test
fun `non jar rejected`() {
database.transaction {
val storage = NodeAttachmentService(MetricRegistry())
val path = fs.getPath("notajar")
path.writeLines(listOf("Hey", "there!"))
path.read {
@ -251,10 +231,9 @@ class NodeAttachmentStorageTest {
}
}
}
}
private var counter = 0
private fun makeTestJar(extraEntries: List<Pair<String,String>> = emptyList()): Pair<Path, SecureHash> {
private fun makeTestJar(extraEntries: List<Pair<String, String>> = emptyList()): Pair<Path, SecureHash> {
counter++
val file = fs.getPath("$counter.jar")
file.write {

View File

@ -2,7 +2,6 @@ package net.corda.node.services.persistence
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.junit.After
import org.junit.Test
@ -10,7 +9,7 @@ import kotlin.test.assertEquals
class TransactionCallbackTest {
private val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock())
private val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
@After
fun closeDatabase() {

View File

@ -18,7 +18,6 @@ import net.corda.testing.internal.LogHelper
import net.corda.testing.core.TestIdentity
import net.corda.testing.contracts.DummyContract
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.internal.rigorousMock
import org.junit.After
import org.junit.Before
import org.junit.Test
@ -65,7 +64,7 @@ class HibernateObserverTests {
return parent
}
}
val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock(), schemaService)
val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }, schemaService)
HibernateObserver.install(rawUpdatesPublisher, database.hibernateConfig, schemaService)
database.transaction {
val MEGA_CORP = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party

View File

@ -16,7 +16,6 @@ import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.core.generateStateRef
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.junit.After
import org.junit.Before
@ -39,7 +38,7 @@ class PersistentUniquenessProviderTests {
@Before
fun setUp() {
LogHelper.setLevel(PersistentUniquenessProvider::class)
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock(), NodeSchemaService(includeNotarySchemas = true))
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null }, NodeSchemaService(includeNotarySchemas = true))
}
@After

View File

@ -22,7 +22,6 @@ import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.freeLocalHostAndPort
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.hamcrest.Matchers.instanceOf
import org.junit.*
@ -149,7 +148,7 @@ class RaftTransactionCommitLogTests {
private fun createReplica(myAddress: NetworkHostAndPort, clusterAddress: NetworkHostAndPort? = null): CompletableFuture<Member> {
val storage = Storage.builder().withStorageLevel(StorageLevel.MEMORY).build()
val address = Address(myAddress.host, myAddress.port)
val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(serverNameTablePrefix = "PORT_${myAddress.port}_"), rigorousMock(), NodeSchemaService(includeNotarySchemas = true))
val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(serverNameTablePrefix = "PORT_${myAddress.port}_"), { null }, { null }, NodeSchemaService(includeNotarySchemas = true))
databases.add(database)
val stateMachineFactory = { RaftTransactionCommitLog(database, Clock.systemUTC(), RaftUniquenessProvider.Companion::createMap) }

View File

@ -185,7 +185,7 @@ abstract class VaultQueryTestsBase : VaultQueryParties {
@Ignore
@Test
fun createPersistentTestDb() {
val database = configureDatabase(makePersistentDataSourceProperties(), DatabaseConfig(), identitySvc)
val database = configureDatabase(makePersistentDataSourceProperties(), DatabaseConfig(), identitySvc::wellKnownPartyFromX500Name, identitySvc::wellKnownPartyFromAnonymous)
setUpDb(database, 5000)
database.close()

View File

@ -25,6 +25,7 @@ import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.api.VaultServiceInternal
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.HibernateConfiguration
import net.corda.testing.core.singleIdentity
import net.corda.testing.internal.rigorousMock
@ -83,9 +84,9 @@ class VaultSoftLockManagerTest {
}
private val mockNet = InternalMockNetwork(cordappPackages = listOf(ContractImpl::class.packageName), defaultFactory = { args ->
object : InternalMockNetwork.MockNode(args) {
override fun makeVaultService(keyManagementService: KeyManagementService, services: ServicesForResolution, hibernateConfig: HibernateConfiguration): VaultServiceInternal {
override fun makeVaultService(keyManagementService: KeyManagementService, services: ServicesForResolution, hibernateConfig: HibernateConfiguration, database: CordaPersistence): VaultServiceInternal {
val node = this
val realVault = super.makeVaultService(keyManagementService, services, hibernateConfig)
val realVault = super.makeVaultService(keyManagementService, services, hibernateConfig, database)
return object : VaultServiceInternal by realVault {
override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) {
// Should be called before flow is removed

View File

@ -5,7 +5,6 @@ import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.tee
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.*
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
@ -21,7 +20,7 @@ class ObservablesTests {
private val toBeClosed = mutableListOf<Closeable>()
private fun createDatabase(): CordaPersistence {
val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock())
val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
toBeClosed += database
return database
}

View File

@ -4,14 +4,13 @@ import net.corda.core.crypto.SecureHash
import net.corda.node.internal.configureDatabase
import net.corda.node.services.upgrade.ContractUpgradeServiceImpl
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices
import org.junit.Test
import kotlin.test.assertEquals
class PersistentMapTests {
private val databaseConfig = DatabaseConfig()
private val database get() = configureDatabase(dataSourceProps, databaseConfig, rigorousMock())
private val database get() = configureDatabase(dataSourceProps, databaseConfig, { null }, { null })
private val dataSourceProps = MockServices.makeTestDataSourceProperties()
//create a test map using an existing db table

View File

@ -67,7 +67,7 @@ class NodeInterestRatesTest {
@Before
fun setUp() {
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), rigorousMock())
database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(), { null }, { null })
database.transaction {
oracle = createMockCordaService(services, NodeInterestRates::Oracle)
oracle.knownFixes = TEST_DATA

View File

@ -94,10 +94,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
val node1 = banks[i].started!!
val node2 = banks[j].started!!
val swaps =
node1.database.transaction {
node1.services.vaultService.queryBy<InterestRateSwap.State>().states
}
val swaps = node1.services.vaultService.queryBy<InterestRateSwap.State>().states
val theDealRef: StateAndRef<InterestRateSwap.State> = swaps.single()
// Do we have any more days left in this deal's lifetime? If not, return.

View File

@ -63,12 +63,10 @@ abstract class Simulation(val networkSendManuallyPumped: Boolean,
registerInitiatedFlow(NodeInterestRates.FixQueryHandler::class.java)
registerInitiatedFlow(NodeInterestRates.FixSignHandler::class.java)
javaClass.classLoader.getResourceAsStream("net/corda/irs/simulation/example.rates.txt").use {
database.transaction {
services.cordaService(NodeInterestRates.Oracle::class.java).uploadFixes(it.reader().readText())
}
}
}
}
val mockNet = InternalMockNetwork(
cordappPackages = listOf("net.corda.finance.contract", "net.corda.irs"),

View File

@ -18,7 +18,12 @@ import net.corda.core.utilities.ByteSequence
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.trace
import net.corda.node.services.messaging.*
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.Message
import net.corda.node.services.messaging.MessageHandler
import net.corda.node.services.messaging.MessageHandlerRegistration
import net.corda.node.services.messaging.MessagingService
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.ExternalEvent
import net.corda.node.services.statemachine.SenderDeduplicationId
@ -37,6 +42,7 @@ import java.util.concurrent.LinkedBlockingQueue
import javax.annotation.concurrent.ThreadSafe
import kotlin.concurrent.schedule
import kotlin.concurrent.thread
import kotlin.jvm.Volatile
/**
* An in-memory network allows you to manufacture [InternalMockMessagingService]s for a set of participants. Each
@ -127,8 +133,7 @@ class InMemoryMessagingNetwork private constructor(
id: Int,
executor: AffinityExecutor,
notaryService: PartyAndCertificate?,
description: CordaX500Name = CordaX500Name(organisation = "In memory node $id", locality = "London", country = "UK"),
database: CordaPersistence)
description: CordaX500Name = CordaX500Name(organisation = "In memory node $id", locality = "London", country = "UK"))
: InternalMockMessagingService {
val peerHandle = PeerHandle(id, description)
peersMapping[peerHandle.name] = peerHandle // Assume that the same name - the same entity in MockNetwork.
@ -136,7 +141,7 @@ class InMemoryMessagingNetwork private constructor(
val serviceHandles = notaryService?.let { listOf(DistributedServiceHandle(it.party)) }
?: emptyList() //TODO only notary can be distributed?
synchronized(this) {
val node = InMemoryMessaging(manuallyPumped, peerHandle, executor, database)
val node = InMemoryMessaging(manuallyPumped, peerHandle, executor)
val oldNode = handleEndpointMap.put(peerHandle, node)
if (oldNode != null) {
node.inheritPendingRedelivery(oldNode)
@ -354,8 +359,7 @@ class InMemoryMessagingNetwork private constructor(
@ThreadSafe
private inner class InMemoryMessaging(private val manuallyPumped: Boolean,
private val peerHandle: PeerHandle,
private val executor: AffinityExecutor,
private val database: CordaPersistence) : SingletonSerializeAsToken(), InternalMockMessagingService {
private val executor: AffinityExecutor) : SingletonSerializeAsToken(), InternalMockMessagingService {
private inner class Handler(val topicSession: String, val callback: MessageHandler) : MessageHandlerRegistration
@Volatile
@ -396,10 +400,8 @@ class InMemoryMessagingNetwork private constructor(
val (handler, transfers) = state.locked {
val handler = Handler(topic, callback).apply { handlers.add(this) }
val pending = ArrayList<MessageTransfer>()
database.transaction {
pending.addAll(pendingRedelivery)
pendingRedelivery.clear()
}
Pair(handler, pending)
}
@ -486,9 +488,7 @@ class InMemoryMessagingNetwork private constructor(
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
// least sometimes.
log.warn("Message to ${transfer.message.topic} could not be delivered")
database.transaction {
pendingRedelivery.add(transfer)
}
null
} else {
matchingHandlers
@ -506,7 +506,6 @@ class InMemoryMessagingNetwork private constructor(
val (transfer, deliverTo) = getNextQueue(q, block) ?: return null
if (transfer.message.uniqueMessageId !in processedMessages) {
executor.execute {
database.transaction {
for (handler in deliverTo) {
try {
val receivedMessage = transfer.toReceivedMessage()
@ -519,7 +518,6 @@ class InMemoryMessagingNetwork private constructor(
_receivedMessages.onNext(transfer)
messagesInFlight.countDown()
}
}
} else {
log.info("Drop duplicate message ${transfer.message.uniqueMessageId}")
}

View File

@ -100,16 +100,17 @@ open class MockServices private constructor(
val cordappLoader = CordappLoader.createWithTestPackages(cordappPackages)
val dataSourceProps = makeTestDataSourceProperties()
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
val database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService, schemaService)
val database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService::wellKnownPartyFromX500Name, identityService::wellKnownPartyFromAnonymous, schemaService)
val mockService = database.transaction {
object : MockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys) {
override val vaultService: VaultService = makeVaultService(database.hibernateConfig, schemaService)
override val vaultService: VaultService = makeVaultService(database.hibernateConfig, schemaService, database)
override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
ServiceHubInternal.recordTransactions(statesToRecord, txs,
validatedTransactions as WritableTransactionStorage,
mockStateMachineRecordedTransactionMappingStorage,
vaultService as VaultServiceInternal)
vaultService as VaultServiceInternal,
database)
}
override fun jdbcSession(): Connection = database.createSession()
@ -240,8 +241,8 @@ open class MockServices private constructor(
protected val servicesForResolution: ServicesForResolution get() = ServicesForResolutionImpl(identityService, attachments, cordappProvider, networkParameters, validatedTransactions)
internal fun makeVaultService(hibernateConfig: HibernateConfiguration, schemaService: SchemaService): VaultServiceInternal {
val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, hibernateConfig)
internal fun makeVaultService(hibernateConfig: HibernateConfiguration, schemaService: SchemaService, database: CordaPersistence): VaultServiceInternal {
val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, hibernateConfig, database)
HibernateObserver.install(vaultService.rawUpdates, hibernateConfig, schemaService)
return vaultService
}

View File

@ -8,6 +8,7 @@ import net.corda.core.DoNotImplement
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
@ -272,15 +273,14 @@ open class InternalMockNetwork(private val cordappPackages: List<String>,
id,
serverThread,
myNotaryIdentity,
configuration.myLegalName,
database).also { runOnStop += it::stop }
configuration.myLegalName).also { runOnStop += it::stop }
}
fun setMessagingServiceSpy(messagingServiceSpy: MessagingServiceSpy) {
network = messagingServiceSpy
}
override fun makeKeyManagementService(identityService: IdentityService, keyPairs: Set<KeyPair>): KeyManagementService {
override fun makeKeyManagementService(identityService: IdentityService, keyPairs: Set<KeyPair>, database: CordaPersistence): KeyManagementService {
return E2ETestKeyManagementService(identityService, keyPairs)
}
@ -317,8 +317,10 @@ open class InternalMockNetwork(private val cordappPackages: List<String>,
override val serializationWhitelists: List<SerializationWhitelist>
get() = _serializationWhitelists
private var dbCloser: (() -> Any?)? = null
override fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService): CordaPersistence {
return super.initialiseDatabasePersistence(schemaService, identityService).also { dbCloser = it::close }
override fun initialiseDatabasePersistence(schemaService: SchemaService,
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?): CordaPersistence {
return super.initialiseDatabasePersistence(schemaService, wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous).also { dbCloser = it::close }
}
fun disableDBCloseOnStop() {