diff --git a/.ci/api-current.txt b/.ci/api-current.txt index cf5f0a1573..976152d456 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -6517,6 +6517,10 @@ public final class net.corda.testing.node.NotarySpec extends java.lang.Object public int hashCode() public String toString() ## +public interface net.corda.testing.node.ResponderFlowFactory + @NotNull + public abstract F invoke(net.corda.core.flows.FlowSession) +## public final class net.corda.testing.node.StartedMockNode extends java.lang.Object @NotNull public final java.util.List>> findStateMachines(Class) diff --git a/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt b/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt index eac8e32ecb..981b217a53 100644 --- a/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt +++ b/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt @@ -22,6 +22,7 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.identity.excludeHostNode import net.corda.core.identity.groupAbstractPartyByWellKnownParty +import net.corda.core.node.services.IdentityService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.testing.contracts.DummyContract @@ -37,7 +38,7 @@ import org.junit.Test class CollectSignaturesFlowTests : WithContracts { companion object { private val miniCorp = TestIdentity(CordaX500Name("MiniCorp", "London", "GB")) - private val miniCorpServices = MockServices(listOf("net.corda.testing.contracts"), miniCorp, rigorousMock()) + private val miniCorpServices = MockServices(listOf("net.corda.testing.contracts"), miniCorp, rigorousMock()) private val classMockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts", "net.corda.core.flows")) private const val MAGIC_NUMBER = 1337 diff --git a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt index cfac4fb280..40d8624644 100644 --- a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt +++ b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt @@ -43,7 +43,7 @@ import java.util.* class ContractUpgradeFlowTest : WithContracts, WithFinality { companion object { - private val classMockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts", "net.corda.finance.contracts.asset", "net.corda.core.flows")) + private val classMockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts", "net.corda.finance.contracts.asset", "net.corda.core.flows", "net.corda.finance.schemas")) @JvmStatic @AfterClass diff --git a/core/src/test/kotlin/net/corda/core/flows/FinalityFlowTests.kt b/core/src/test/kotlin/net/corda/core/flows/FinalityFlowTests.kt index 6e57b350cb..c30e47ae73 100644 --- a/core/src/test/kotlin/net/corda/core/flows/FinalityFlowTests.kt +++ b/core/src/test/kotlin/net/corda/core/flows/FinalityFlowTests.kt @@ -31,7 +31,7 @@ import org.junit.Test class FinalityFlowTests : WithFinality { companion object { private val CHARLIE = TestIdentity(CHARLIE_NAME, 90).party - private val classMockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts.asset")) + private val classMockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts.asset","net.corda.finance.schemas")) @JvmStatic @AfterClass diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 52c093174b..09af78383f 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -6,6 +6,8 @@ release, see :doc:`upgrade-notes`. Unreleased ---------- +* Added ``registerResponderFlow`` method to ``StartedMockNode``, to support isolated testing of responder flow behaviour. + * Introduced ``TestCorDapp`` and utilities to support asymmetric setups for nodes through ``DriverDSL``, ``MockNetwork`` and ``MockServices``. * Change type of the `checkpoint_value` column. Please check the upgrade-notes on how to update your database. diff --git a/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt index 0339fc3962..13da729582 100644 --- a/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt +++ b/experimental/flow-worker/src/main/kotlin/net/corda/flowworker/FlowWorkerServiceHub.kt @@ -44,7 +44,6 @@ import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.P2PMessagingClient import net.corda.node.services.network.* import net.corda.node.services.persistence.* -import net.corda.node.services.schema.HibernateObserver import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.statemachine.MultiThreadedStateMachineExecutor import net.corda.node.services.statemachine.MultiThreadedStateMachineManager @@ -111,7 +110,7 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri override val keyManagementService = PersistentKeyManagementService(identityService, database).tokenize() private val servicesForResolution = ServicesForResolutionImpl(identityService, attachments, cordappProvider, validatedTransactions) @Suppress("LeakingThis") - override val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, database).tokenize() + override val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService).tokenize() override val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database) override val monitoringService = MonitoringService(metricRegistry).tokenize() override val networkMapUpdater = NetworkMapUpdater( @@ -264,7 +263,6 @@ class FlowWorkerServiceHub(override val configuration: NodeConfiguration, overri contractUpgradeService.start() vaultService.start() - HibernateObserver.install(vaultService.rawUpdates, database.hibernateConfig, schemaService) val frozenTokenizableServices = tokenizableServices!! tokenizableServices = null diff --git a/finance/src/test/kotlin/net/corda/finance/contracts/CommercialPaperTests.kt b/finance/src/test/kotlin/net/corda/finance/contracts/CommercialPaperTests.kt index c5aadb2c37..a49724af7c 100644 --- a/finance/src/test/kotlin/net/corda/finance/contracts/CommercialPaperTests.kt +++ b/finance/src/test/kotlin/net/corda/finance/contracts/CommercialPaperTests.kt @@ -119,7 +119,7 @@ class CommercialPaperTestsGeneric { val testSerialization = SerializationEnvironmentRule() private val megaCorpRef = megaCorp.ref(123) - private val ledgerServices = MockServices(megaCorp, miniCorp) + private val ledgerServices = MockServices(listOf("net.corda.finance.schemas"), megaCorp, miniCorp) @Test fun `trade lifecycle test`() { @@ -255,8 +255,8 @@ class CommercialPaperTestsGeneric { // of the dummy cash issuer. val allIdentities = arrayOf(megaCorp.identity, alice.identity, dummyCashIssuer.identity, dummyNotary.identity) - val notaryServices = MockServices(dummyNotary) - val issuerServices = MockServices(dummyCashIssuer, dummyNotary) + val notaryServices = MockServices(listOf("net.corda.finance.contracts", "net.corda.finance.contracts.asset", "net.corda.finance.schemas"), dummyNotary) + val issuerServices = MockServices(listOf("net.corda.finance.contracts", "net.corda.finance.contracts.asset", "net.corda.finance.schemas"), dummyCashIssuer, dummyNotary) val (aliceDatabase, aliceServices) = makeTestDatabaseAndMockServices( listOf("net.corda.finance.contracts", "net.corda.finance.schemas"), makeTestIdentityService(*allIdentities), diff --git a/finance/src/test/kotlin/net/corda/finance/contracts/asset/CashTests.kt b/finance/src/test/kotlin/net/corda/finance/contracts/asset/CashTests.kt index 14019d271d..e4eab8924f 100644 --- a/finance/src/test/kotlin/net/corda/finance/contracts/asset/CashTests.kt +++ b/finance/src/test/kotlin/net/corda/finance/contracts/asset/CashTests.kt @@ -89,12 +89,13 @@ class CashTests { // TODO: Optimise this so that we don't throw away and rebuild state that can be shared across tests. @Before fun setUp() { + val cordapps = listOf("net.corda.finance.contracts.asset", "net.corda.finance.schemas") LogHelper.setLevel(NodeVaultService::class) - megaCorpServices = MockServices(megaCorp) - miniCorpServices = MockServices(miniCorp) + megaCorpServices = MockServices(cordapps, megaCorp) + miniCorpServices = MockServices(cordapps, miniCorp) val myself = TestIdentity(CordaX500Name("Me", "London", "GB")) val databaseAndServices = makeTestDatabaseAndMockServices( - listOf("net.corda.finance.contracts.asset", "net.corda.finance.schemas"), + cordapps, makeTestIdentityService(megaCorp.identity, miniCorp.identity, dummyCashIssuer.identity, dummyNotary.identity, myself.identity), myself ) diff --git a/finance/src/test/kotlin/net/corda/finance/contracts/asset/ObligationTests.kt b/finance/src/test/kotlin/net/corda/finance/contracts/asset/ObligationTests.kt index 054bcc99c6..0eb09e4fc3 100644 --- a/finance/src/test/kotlin/net/corda/finance/contracts/asset/ObligationTests.kt +++ b/finance/src/test/kotlin/net/corda/finance/contracts/asset/ObligationTests.kt @@ -19,6 +19,7 @@ import net.corda.core.crypto.sha256 import net.corda.core.identity.AbstractParty import net.corda.core.identity.AnonymousParty import net.corda.core.identity.CordaX500Name +import net.corda.core.node.services.IdentityService import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.NonEmptySet import net.corda.core.utilities.OpaqueBytes @@ -90,7 +91,7 @@ class ObligationTests { beneficiary = CHARLIE ) private val outState = inState.copy(beneficiary = AnonymousParty(BOB_PUBKEY)) - private val miniCorpServices = MockServices(listOf("net.corda.finance.contracts.asset"), miniCorp, rigorousMock()) + private val miniCorpServices = MockServices(listOf("net.corda.finance.contracts.asset"), miniCorp, rigorousMock()) private val notaryServices = MockServices(emptyList(), MEGA_CORP.name, rigorousMock(), dummyNotary.keyPair) private val identityService = rigorousMock().also { doReturn(null).whenever(it).partyFromKey(ALICE_PUBKEY) diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 14b8de1eaf..788d367754 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -65,7 +65,6 @@ import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.MessagingService import net.corda.node.services.network.* import net.corda.node.services.persistence.* -import net.corda.node.services.schema.HibernateObserver import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.statemachine.* import net.corda.node.services.transactions.* @@ -364,7 +363,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, contractUpgradeService.start() vaultService.start() ScheduledActivityObserver.install(vaultService, schedulerService, flowLogicRefFactory) - HibernateObserver.install(vaultService.rawUpdates, database.hibernateConfig, schemaService) val frozenTokenizableServices = tokenizableServices!! tokenizableServices = null @@ -908,7 +906,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected open fun makeVaultService(keyManagementService: KeyManagementService, services: ServicesForResolution, database: CordaPersistence): VaultServiceInternal { - return NodeVaultService(platformClock, keyManagementService, services, database) + return NodeVaultService(platformClock, keyManagementService, services, database, schemaService) } /** Load configured JVM agents */ diff --git a/node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt b/node/src/main/kotlin/net/corda/node/services/schema/PersistentStateService.kt similarity index 62% rename from node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt rename to node/src/main/kotlin/net/corda/node/services/schema/PersistentStateService.kt index 01f4ac0040..755679f912 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/PersistentStateService.kt @@ -14,16 +14,12 @@ import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateRef import net.corda.core.internal.VisibleForTesting -import net.corda.core.node.services.Vault import net.corda.core.schemas.MappedSchema import net.corda.core.schemas.PersistentStateRef import net.corda.core.utilities.contextLogger import net.corda.core.utilities.debug import net.corda.node.services.api.SchemaService -import net.corda.nodeapi.internal.persistence.HibernateConfiguration -import net.corda.nodeapi.internal.persistence.contextTransaction -import org.hibernate.FlushMode -import rx.Observable +import net.corda.nodeapi.internal.persistence.currentDBSession /** * Small data class bundling together a ContractState and a StateRef (as opposed to a TransactionState and StateRef @@ -35,17 +31,12 @@ data class ContractStateAndRef(val state: ContractState, val ref: StateRef) * A vault observer that extracts Object Relational Mappings for contract states that support it, and persists them with Hibernate. */ // TODO: Manage version evolution of the schemas via additional tooling. -class HibernateObserver private constructor(private val config: HibernateConfiguration, private val schemaService: SchemaService) { +class PersistentStateService(private val schemaService: SchemaService) { companion object { private val log = contextLogger() - fun install(vaultUpdates: Observable>, config: HibernateConfiguration, schemaService: SchemaService): HibernateObserver { - val observer = HibernateObserver(config, schemaService) - vaultUpdates.subscribe { observer.persist(it.produced) } - return observer - } } - private fun persist(produced: Set>) { + fun persist(produced: Set>) { val stateBySchema: MutableMap> = mutableMapOf() // map all states by their referenced schemas produced.forEach { @@ -61,15 +52,10 @@ class HibernateObserver private constructor(private val config: HibernateConfigu @VisibleForTesting internal fun persistStatesWithSchema(statesAndRefs: List, schema: MappedSchema) { - val sessionFactory = config.sessionFactoryForSchemas(setOf(schema)) - val session = sessionFactory.withOptions().connection(contextTransaction.connection).flushMode(FlushMode.MANUAL).openSession() - session.use { thisSession -> - statesAndRefs.forEach { - val mappedObject = schemaService.generateMappedObject(it.state, schema) - mappedObject.stateRef = PersistentStateRef(it.ref) - thisSession.persist(mappedObject) - } - thisSession.flush() + statesAndRefs.forEach { + val mappedObject = schemaService.generateMappedObject(it.state, schema) + mappedObject.stateRef = PersistentStateRef(it.ref) + currentDBSession().persist(mappedObject) } } } diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 65d2995b05..76520eaf88 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -24,12 +24,11 @@ import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.* import net.corda.core.utilities.* +import net.corda.node.services.api.SchemaService import net.corda.node.services.api.VaultServiceInternal +import net.corda.node.services.schema.PersistentStateService import net.corda.node.services.statemachine.FlowStateMachineImpl -import net.corda.nodeapi.internal.persistence.CordaPersistence -import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit -import net.corda.nodeapi.internal.persistence.currentDBSession -import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction +import net.corda.nodeapi.internal.persistence.* import org.hibernate.Session import rx.Observable import rx.subjects.PublishSubject @@ -61,7 +60,8 @@ class NodeVaultService( private val clock: Clock, private val keyManagementService: KeyManagementService, private val servicesForResolution: ServicesForResolution, - private val database: CordaPersistence + private val database: CordaPersistence, + private val schemaService: SchemaService ) : SingletonSerializeAsToken(), VaultServiceInternal { private companion object { private val log = contextLogger() @@ -78,6 +78,7 @@ class NodeVaultService( private val concurrentBox = ConcurrentBox(InnerState()) private lateinit var criteriaBuilder: CriteriaBuilder + private val persistentStateService = PersistentStateService(schemaService) /** * Maintain a list of contract state interfaces to concrete types stored in the vault @@ -257,6 +258,7 @@ class NodeVaultService( softLockReserve(uuid, stateRefs) } } + persistentStateService.persist(vaultUpdate.produced) updatesPublisher.onNext(vaultUpdate) } } diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt index 497184620e..7a990c272f 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt @@ -84,7 +84,7 @@ import kotlin.test.assertTrue @RunWith(Parameterized::class) class TwoPartyTradeFlowTests(private val anonymous: Boolean) { companion object { - private val cordappPackages = setOf("net.corda.finance.contracts") + private val cordappPackages = listOf("net.corda.finance.contracts", "net.corda.finance.schemas") @JvmStatic @Parameterized.Parameters(name = "Anonymous = {0}") fun data(): Collection = listOf(true, false) diff --git a/node/src/test/kotlin/net/corda/node/services/ServiceHubConcurrentUsageTest.kt b/node/src/test/kotlin/net/corda/node/services/ServiceHubConcurrentUsageTest.kt index 3329cc7259..4d2b8147b7 100644 --- a/node/src/test/kotlin/net/corda/node/services/ServiceHubConcurrentUsageTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/ServiceHubConcurrentUsageTest.kt @@ -25,7 +25,7 @@ import java.util.concurrent.CountDownLatch class ServiceHubConcurrentUsageTest { - private val mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages(Cash::class.packageName)) + private val mockNet = InternalMockNetwork(cordappsForAllNodes = cordappsForPackages("net.corda.finance.schemas", "net.corda.node.services.vault.VaultQueryExceptionsTests", Cash::class.packageName)) @After fun stopNodes() { diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/HibernateConfigurationTest.kt b/node/src/test/kotlin/net/corda/node/services/persistence/HibernateConfigurationTest.kt index db39dff72f..a3474124f2 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/HibernateConfigurationTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/HibernateConfigurationTest.kt @@ -33,16 +33,17 @@ import net.corda.finance.POUNDS import net.corda.finance.SWISS_FRANCS import net.corda.finance.contracts.asset.Cash import net.corda.finance.contracts.asset.DummyFungibleContract +import net.corda.finance.schemas.CashSchemaV1 +import net.corda.finance.schemas.SampleCashSchemaV1 import net.corda.finance.schemas.SampleCashSchemaV2 import net.corda.finance.schemas.SampleCashSchemaV3 -import net.corda.finance.schemas.CashSchemaV1 import net.corda.finance.utils.sumCash import net.corda.node.internal.configureDatabase import net.corda.node.services.api.IdentityServiceInternal import net.corda.node.services.api.WritableTransactionStorage import net.corda.node.services.schema.ContractStateAndRef -import net.corda.node.services.schema.HibernateObserver import net.corda.node.services.schema.NodeSchemaService +import net.corda.node.services.schema.PersistentStateService import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.VaultSchemaV1 import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -92,7 +93,7 @@ class HibernateConfigurationTest { // Hibernate configuration objects lateinit var hibernateConfig: HibernateConfiguration - private lateinit var hibernatePersister: HibernateObserver + private lateinit var hibernatePersister: PersistentStateService private lateinit var sessionFactory: SessionFactory private lateinit var entityManager: EntityManager private lateinit var criteriaBuilder: CriteriaBuilder @@ -107,10 +108,10 @@ class HibernateConfigurationTest { @Before fun setUp() { - val cordappPackages = listOf("net.corda.testing.internal.vault", "net.corda.finance.contracts.asset") + val cordappPackages = listOf("net.corda.testing.internal.vault", "net.corda.finance.contracts.asset", "net.corda.finance.schemas") bankServices = MockServices(cordappPackages, BOC.name, rigorousMock(), BOC_KEY) - issuerServices = MockServices(cordappPackages, dummyCashIssuer, rigorousMock()) - notaryServices = MockServices(cordappPackages, dummyNotary, rigorousMock()) + issuerServices = MockServices(cordappPackages, dummyCashIssuer, rigorousMock()) + notaryServices = MockServices(cordappPackages, dummyNotary, rigorousMock()) notary = notaryServices.myInfo.singleIdentity() val dataSourceProps = makeTestDataSourceProperties() val identityService = rigorousMock().also { mock -> @@ -120,7 +121,7 @@ class HibernateConfigurationTest { doReturn(it.party).whenever(mock).wellKnownPartyFromX500Name(it.name) } } - val schemaService = NodeSchemaService(extraSchemas = setOf(CashSchemaV1, SampleCashSchemaV2, SampleCashSchemaV3, DummyLinearStateSchemaV1, DummyLinearStateSchemaV2, DummyDealStateSchemaV1 )) + val schemaService = NodeSchemaService(extraSchemas = setOf(CashSchemaV1, SampleCashSchemaV1, SampleCashSchemaV2, SampleCashSchemaV3, DummyLinearStateSchemaV1, DummyLinearStateSchemaV2, DummyDealStateSchemaV1)) database = configureDatabase(dataSourceProps, DatabaseConfig(), identityService::wellKnownPartyFromX500Name, identityService::wellKnownPartyFromAnonymous, schemaService) database.transaction { hibernateConfig = database.hibernateConfig @@ -129,7 +130,7 @@ class HibernateConfigurationTest { services = object : MockServices(cordappPackages, BOB_NAME, rigorousMock().also { doNothing().whenever(it).justVerifyAndRegisterIdentity(argThat { name == BOB_NAME }) }, generateKeyPair(), dummyNotary.keyPair) { - override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, database).apply { start() } + override val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, servicesForResolution, database, schemaService).apply { start() } override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable) { for (stx in txs) { (validatedTransactions as WritableTransactionStorage).addTransaction(stx) @@ -141,7 +142,7 @@ class HibernateConfigurationTest { override fun jdbcSession() = database.createSession() } vaultFiller = VaultFiller(services, dummyNotary, notary, ::Random) - hibernatePersister = HibernateObserver.install(services.vaultService.rawUpdates, hibernateConfig, schemaService) + hibernatePersister = PersistentStateService(schemaService) } identity = services.myInfo.singleIdentity() diff --git a/node/src/test/kotlin/net/corda/node/services/schema/HibernateObserverTests.kt b/node/src/test/kotlin/net/corda/node/services/schema/PersistentStateServiceTests.kt similarity index 84% rename from node/src/test/kotlin/net/corda/node/services/schema/HibernateObserverTests.kt rename to node/src/test/kotlin/net/corda/node/services/schema/PersistentStateServiceTests.kt index e338f199dd..6d4800777d 100644 --- a/node/src/test/kotlin/net/corda/node/services/schema/HibernateObserverTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/schema/PersistentStateServiceTests.kt @@ -25,25 +25,28 @@ import net.corda.core.schemas.QueryableState import net.corda.node.services.api.SchemaService import net.corda.node.internal.configureDatabase import net.corda.nodeapi.internal.persistence.DatabaseConfig +import net.corda.nodeapi.internal.persistence.currentDBSession import net.corda.testing.internal.LogHelper import net.corda.testing.core.TestIdentity import net.corda.testing.contracts.DummyContract +import net.corda.testing.internal.rigorousMock import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import org.junit.After import org.junit.Before +import org.junit.Ignore import org.junit.Test import rx.subjects.PublishSubject import kotlin.test.assertEquals -class HibernateObserverTests { +class PersistentStateServiceTests { @Before fun setUp() { - LogHelper.setLevel(HibernateObserver::class) + LogHelper.setLevel(PersistentStateService::class) } @After fun cleanUp() { - LogHelper.reset(HibernateObserver::class) + LogHelper.reset(PersistentStateService::class) } class TestState : QueryableState { @@ -62,11 +65,8 @@ class HibernateObserverTests { @Test fun `test child objects are persisted`() { val testSchema = TestSchema - val rawUpdatesPublisher = PublishSubject.create>() val schemaService = object : SchemaService { - override val schemaOptions: Map = mapOf( - CommonSchemaV1 to SchemaService.SchemaOptions(), - testSchema to SchemaService.SchemaOptions()) + override val schemaOptions: Map = mapOf(testSchema to SchemaService.SchemaOptions()) override fun selectSchemas(state: ContractState): Iterable = setOf(testSchema) @@ -78,10 +78,11 @@ class HibernateObserverTests { } } val database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true), { null }, { null }, schemaService) - HibernateObserver.install(rawUpdatesPublisher, database.hibernateConfig, schemaService) + val persistentStateService = PersistentStateService(schemaService) database.transaction { val MEGA_CORP = TestIdentity(CordaX500Name("MegaCorp", "London", "GB")).party - rawUpdatesPublisher.onNext(Vault.Update(emptySet(), setOf(StateAndRef(TransactionState(TestState(), DummyContract.PROGRAM_ID, MEGA_CORP), StateRef(SecureHash.sha256("dummy"), 0))))) + persistentStateService.persist(setOf(StateAndRef(TransactionState(TestState(), DummyContract.PROGRAM_ID, MEGA_CORP), StateRef(SecureHash.sha256("dummy"), 0)))) + currentDBSession().flush() val parentRowCountResult = connection.prepareStatement("select count(*) from Parents").executeQuery() parentRowCountResult.next() val parentRows = parentRowCountResult.getInt(1) diff --git a/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt index 82d9601953..3021545c6a 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt @@ -21,10 +21,7 @@ import net.corda.core.identity.* import net.corda.core.internal.NotaryChangeTransactionBuilder import net.corda.core.internal.packageName import net.corda.core.node.StatesToRecord -import net.corda.core.node.services.StatesNotAvailableException -import net.corda.core.node.services.Vault -import net.corda.core.node.services.VaultService -import net.corda.core.node.services.queryBy +import net.corda.core.node.services.* import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.QueryCriteria.* @@ -107,8 +104,8 @@ class NodeVaultServiceTest { vaultFiller = VaultFiller(services, dummyNotary) // This is safe because MockServices only ever have a single identity identity = services.myInfo.singleIdentityAndCert() - issuerServices = MockServices(cordappPackages, dummyCashIssuer, rigorousMock()) - bocServices = MockServices(cordappPackages, bankOfCorda, rigorousMock()) + issuerServices = MockServices(cordappPackages, dummyCashIssuer, rigorousMock()) + bocServices = MockServices(cordappPackages, bankOfCorda, rigorousMock()) services.identityService.verifyAndRegisterIdentity(DUMMY_CASH_ISSUER_IDENTITY) services.identityService.verifyAndRegisterIdentity(BOC_IDENTITY) } diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryExceptionsTests.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryExceptionsTests.kt index 2890420db3..88701d7254 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryExceptionsTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryExceptionsTests.kt @@ -33,8 +33,7 @@ class VaultQueryExceptionsTests : VaultQueryParties by rule { override val cordappPackages = listOf( "net.corda.testing.contracts", "net.corda.finance.contracts", - CashSchemaV1::class.packageName, - DummyLinearStateSchemaV1::class.packageName) - SampleCashSchemaV3::class.packageName + DummyLinearStateSchemaV1::class.packageName) } } @@ -49,9 +48,6 @@ class VaultQueryExceptionsTests : VaultQueryParties by rule { @Test fun `query attempting to use unregistered schema`() { database.transaction { - vaultFiller.fillWithSomeTestCash(100.DOLLARS, notaryServices, 1, DUMMY_CASH_ISSUER) - vaultFiller.fillWithSomeTestCash(100.POUNDS, notaryServices, 1, DUMMY_CASH_ISSUER) - vaultFiller.fillWithSomeTestCash(100.SWISS_FRANCS, notaryServices, 1, DUMMY_CASH_ISSUER) // CashSchemaV3 NOT registered with NodeSchemaService val logicalExpression = builder { SampleCashSchemaV3.PersistentCashState::currency.equal(GBP.currencyCode) } val criteria = VaultCustomQueryCriteria(logicalExpression) diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt index a70bef5fef..b680237a63 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt @@ -34,6 +34,7 @@ import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.node.internal.InitiatedFlowFactory +import net.corda.node.services.api.SchemaService import net.corda.node.services.api.VaultServiceInternal import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.testing.core.singleIdentity diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultWithCashTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultWithCashTest.kt index 19e4c5e7e5..bb7ee8f2f4 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultWithCashTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultWithCashTest.kt @@ -94,7 +94,7 @@ class VaultWithCashTest { services = databaseAndServices.second vaultFiller = VaultFiller(services, dummyNotary) issuerServices = MockServices(cordappPackages, dummyCashIssuer, rigorousMock(), MEGA_CORP_KEY) - notaryServices = MockServices(cordappPackages, dummyNotary, rigorousMock()) + notaryServices = MockServices(cordappPackages, dummyNotary, rigorousMock()) notary = notaryServices.myInfo.legalIdentitiesAndCerts.single().party } diff --git a/samples/trader-demo/src/test/kotlin/net/corda/traderdemo/TransactionGraphSearchTests.kt b/samples/trader-demo/src/test/kotlin/net/corda/traderdemo/TransactionGraphSearchTests.kt index 9ffd8e2550..ee45877c9d 100644 --- a/samples/trader-demo/src/test/kotlin/net/corda/traderdemo/TransactionGraphSearchTests.kt +++ b/samples/trader-demo/src/test/kotlin/net/corda/traderdemo/TransactionGraphSearchTests.kt @@ -13,6 +13,7 @@ package net.corda.traderdemo import net.corda.core.contracts.CommandData import net.corda.core.crypto.newSecureRandom import net.corda.core.identity.CordaX500Name +import net.corda.core.node.services.IdentityService import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.WireTransaction @@ -56,8 +57,8 @@ class TransactionGraphSearchTests { * @param signer signer for the two transactions and their commands. */ fun buildTransactions(command: CommandData): GraphTransactionStorage { - val megaCorpServices = MockServices(listOf("net.corda.testing.contracts"), megaCorp, rigorousMock()) - val notaryServices = MockServices(listOf("net.corda.testing.contracts"), dummyNotary, rigorousMock()) + val megaCorpServices = MockServices(listOf("net.corda.testing.contracts"), megaCorp, rigorousMock()) + val notaryServices = MockServices(listOf("net.corda.testing.contracts"), dummyNotary, rigorousMock()) val originBuilder = TransactionBuilder(dummyNotary.party) .addOutputState(DummyState(random31BitValue()), DummyContract.PROGRAM_ID) .addCommand(command, megaCorp.publicKey) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNetwork.kt index 47407e76b3..3dca5276b9 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockNetwork.kt @@ -14,12 +14,15 @@ import com.google.common.jimfs.Jimfs import net.corda.core.concurrent.CordaFuture import net.corda.core.crypto.random63BitValue import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.node.NetworkParameters import net.corda.core.node.NodeInfo import net.corda.core.node.ServiceHub +import net.corda.core.toFuture import net.corda.core.utilities.getOrThrow +import net.corda.node.internal.InitiatedFlowFactory import net.corda.node.services.config.NodeConfiguration import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.core.DUMMY_NOTARY_NAME @@ -28,6 +31,7 @@ import net.corda.testing.node.internal.* import rx.Observable import java.math.BigInteger import java.nio.file.Path +import java.util.concurrent.Future /** * Immutable builder for configuring a [StartedMockNode] or an [UnstartedMockNode] via [MockNetwork.createNode] and @@ -187,8 +191,69 @@ class StartedMockNode private constructor(private val node: TestStartedNode) { statement() } } + + /** + * Register an [InitiatedFlowFactory], to control relationship between initiating and receiving flow classes + * explicitly on a node-by-node basis. This is used when we want to manually specify that a particular initiating + * flow class will have a particular responder. + * + * An [ResponderFlowFactory] is responsible for converting a [FlowSession] into the [FlowLogic] that will respond + * to the initiated flow. The registry records one responder type, and hence one factory, for each initiator flow + * type. If a factory is already registered for the type, it is overwritten in the registry when a new factory is + * registered. + * + * Note that this change affects _only_ the node on which this method is called, and not the entire network. + * + * @property initiatingFlowClass The [FlowLogic]-inheriting class to register a new responder for. + * @property flowFactory The flow factory that will create the responding flow. + * @property responderFlowClass The class of the responder flow. + * @return A [CordaFuture] that will complete the first time the responding flow is created. + */ + fun > registerResponderFlow(initiatingFlowClass: Class>, + flowFactory: ResponderFlowFactory, + responderFlowClass: Class): CordaFuture = + node.registerFlowFactory( + initiatingFlowClass, + InitiatedFlowFactory.CorDapp(flowVersion = 0, appName = "", factory = flowFactory::invoke), + responderFlowClass, true) + .toFuture() } +/** + * Responsible for converting a [FlowSession] into the [FlowLogic] that will respond to an initiated flow. + * + * @param F The [FlowLogic]-inherited type of the responder class this factory creates. + */ +@FunctionalInterface +interface ResponderFlowFactory> { + /** + * Given the provided [FlowSession], create a responder [FlowLogic] of the desired type. + * + * @param flowSession The [FlowSession] to use to create the responder flow object. + * @return The constructed responder flow object. + */ + fun invoke(flowSession: FlowSession): F +} + +/** + * Kotlin-only utility function using a reified type parameter and a lambda parameter to simplify the + * [InitiatedFlowFactory.registerFlowFactory] function. + * + * @param F The [FlowLogic]-inherited type of the responder to register. + * @property initiatingFlowClass The [FlowLogic]-inheriting class to register a new responder for. + * @property flowFactory A lambda converting a [FlowSession] into an instance of the responder class [F]. + * @return A [CordaFuture] that will complete the first time the responding flow is created. + */ +inline fun > StartedMockNode.registerResponderFlow( + initiatingFlowClass: Class>, + noinline flowFactory: (FlowSession) -> F): Future = + registerResponderFlow( + initiatingFlowClass, + object : ResponderFlowFactory { + override fun invoke(flowSession: FlowSession) = flowFactory(flowSession) + }, + F::class.java) + /** * A mock node brings up a suite of in-memory services in a fast manner suitable for unit testing. * Components that do IO are either swapped out for mocks, or pointed to a [Jimfs] in memory filesystem or an in diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index a1eb0da81e..847bdcbe7c 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -34,7 +34,6 @@ import net.corda.node.internal.configureDatabase import net.corda.node.internal.cordapp.JarScanningCordappLoader import net.corda.node.services.api.* import net.corda.node.services.identity.InMemoryIdentityService -import net.corda.node.services.schema.HibernateObserver import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.transactions.InMemoryTransactionVerifierService import net.corda.node.services.vault.NodeVaultService @@ -117,7 +116,7 @@ open class MockServices private constructor( val database = configureDatabase(dataSourceProps, makeTestDatabaseProperties(initialIdentity.name.organisation), identityService::wellKnownPartyFromX500Name, identityService::wellKnownPartyFromAnonymous, schemaService) val mockService = database.transaction { object : MockServices(cordappLoader, identityService, networkParameters, initialIdentity, moreKeys) { - override val vaultService: VaultService = makeVaultService(database.hibernateConfig, schemaService, database) + override val vaultService: VaultService = makeVaultService(schemaService, database) override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable) { ServiceHubInternal.recordTransactions(statesToRecord, txs, @@ -214,6 +213,12 @@ open class MockServices private constructor( constructor(firstIdentity: TestIdentity, vararg moreIdentities: TestIdentity) : this( listOf(getCallerPackage(MockServices::class)!!), firstIdentity, + *moreIdentities + ) + + constructor(cordappPackages: List, firstIdentity: TestIdentity, vararg moreIdentities: TestIdentity) : this( + cordappPackages, + firstIdentity, makeTestIdentityService(*listOf(firstIdentity, *moreIdentities).map { it.identity }.toTypedArray()), firstIdentity.keyPair ) @@ -253,10 +258,8 @@ open class MockServices private constructor( } } - internal fun makeVaultService(hibernateConfig: HibernateConfiguration, schemaService: SchemaService, database: CordaPersistence): VaultServiceInternal { - val vaultService = NodeVaultService(clock, keyManagementService, servicesForResolution, database).apply { start() } - HibernateObserver.install(vaultService.rawUpdates, hibernateConfig, schemaService) - return vaultService + internal fun makeVaultService(schemaService: SchemaService, database: CordaPersistence): VaultServiceInternal { + return NodeVaultService(clock, keyManagementService, servicesForResolution, database, schemaService).apply { start() } } // This needs to be internal as MutableClassToInstanceMap is a guava type and shouldn't be part of our public API diff --git a/testing/node-driver/src/test/java/net/corda/testing/node/TestResponseFlowInIsolationInJava.java b/testing/node-driver/src/test/java/net/corda/testing/node/TestResponseFlowInIsolationInJava.java new file mode 100644 index 0000000000..af4bb5a14b --- /dev/null +++ b/testing/node-driver/src/test/java/net/corda/testing/node/TestResponseFlowInIsolationInJava.java @@ -0,0 +1,115 @@ +package net.corda.testing.node; + +import co.paralleluniverse.fibers.Suspendable; +import net.corda.core.concurrent.CordaFuture; +import net.corda.core.flows.*; +import net.corda.core.identity.Party; +import net.corda.core.utilities.UntrustworthyData; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.concurrent.Future; + +import static java.util.Collections.singletonList; +import static org.hamcrest.Matchers.instanceOf; + +/** + * Java version of test based on the example given as an answer to this SO question: + * + * https://stackoverflow.com/questions/48166626/how-can-an-acceptor-flow-in-corda-be-isolated-for-unit-testing/ + * + * but using the `registerFlowFactory` method implemented in response to https://r3-cev.atlassian.net/browse/CORDA-916 + */ +public class TestResponseFlowInIsolationInJava { + + private final MockNetwork network = new MockNetwork(singletonList("com.template")); + private final StartedMockNode a = network.createNode(); + private final StartedMockNode b = network.createNode(); + + @After + public void tearDown() { + network.stopNodes(); + } + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + @Test + public void test() throws Exception { + // This method returns the Responder flow object used by node B. + Future initiatedResponderFlowFuture = b.registerResponderFlow( + // We tell node B to respond to BadInitiator with Responder. + // We want to observe the Responder flow object to check for errors. + BadInitiator.class, Responder::new, Responder.class); + + // We run the BadInitiator flow on node A. + BadInitiator flow = new BadInitiator(b.getInfo().getLegalIdentities().get(0)); + CordaFuture future = a.startFlow(flow); + network.runNetwork(); + future.get(); + + // We check that the invocation of the Responder flow object has caused an ExecutionException. + Responder initiatedResponderFlow = initiatedResponderFlowFuture.get(); + CordaFuture initiatedResponderFlowResultFuture = initiatedResponderFlow.getStateMachine().getResultFuture(); + exception.expectCause(instanceOf(FlowException.class)); + exception.expectMessage("String did not contain the expected message."); + initiatedResponderFlowResultFuture.get(); + } + + // This is the real implementation of Initiator. + @InitiatingFlow + @StartableByRPC + public static class Initiator extends FlowLogic { + private Party counterparty; + + public Initiator(Party counterparty) { + this.counterparty = counterparty; + } + + @Suspendable + @Override public Void call() { + FlowSession session = initiateFlow(counterparty); + session.send("goodString"); + return null; + } + } + + // This is the response flow that we want to isolate for testing. + @InitiatedBy(Initiator.class) + public static class Responder extends FlowLogic { + private final FlowSession counterpartySession; + + Responder(FlowSession counterpartySession) { + this.counterpartySession = counterpartySession; + } + + @Suspendable + @Override + public Void call() throws FlowException { + UntrustworthyData packet = counterpartySession.receive(String.class); + String string = packet.unwrap(data -> data); + if (!string.equals("goodString")) { + throw new FlowException("String did not contain the expected message."); + } + return null; + } + } + + @InitiatingFlow + public static final class BadInitiator extends FlowLogic { + private final Party counterparty; + + BadInitiator(Party counterparty) { + this.counterparty = counterparty; + } + + @Suspendable + @Override public Void call() { + FlowSession session = initiateFlow(counterparty); + session.send("badString"); + return null; + } + } +} diff --git a/testing/node-driver/src/test/kotlin/net/corda/testing/node/internal/TestResponseFlowInIsolation.kt b/testing/node-driver/src/test/kotlin/net/corda/testing/node/internal/TestResponseFlowInIsolation.kt new file mode 100644 index 0000000000..07e260019d --- /dev/null +++ b/testing/node-driver/src/test/kotlin/net/corda/testing/node/internal/TestResponseFlowInIsolation.kt @@ -0,0 +1,89 @@ +package net.corda.testing.node.internal + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.* +import net.corda.core.identity.Party +import net.corda.core.utilities.unwrap +import net.corda.testing.internal.chooseIdentity +import net.corda.testing.node.MockNetwork +import net.corda.testing.node.registerResponderFlow +import org.assertj.core.api.Assertions.assertThat +import org.junit.After +import org.junit.Test +import java.util.concurrent.ExecutionException +import kotlin.test.assertFailsWith + +/** + * Test based on the example given as an answer to this SO question: + * + * https://stackoverflow.com/questions/48166626/how-can-an-acceptor-flow-in-corda-be-isolated-for-unit-testing/ + * + * but using the `registerFlowFactory` method implemented in response to https://r3-cev.atlassian.net/browse/CORDA-916 + */ +class TestResponseFlowInIsolation { + + private val network: MockNetwork = MockNetwork(listOf("com.template")) + private val a = network.createNode() + private val b = network.createNode() + + @After + fun tearDown() = network.stopNodes() + + // This is the real implementation of Initiator. + @InitiatingFlow + open class Initiator(val counterparty: Party) : FlowLogic() { + @Suspendable + override fun call() { + val session = initiateFlow(counterparty) + session.send("goodString") + } + } + + // This is the response flow that we want to isolate for testing. + @InitiatedBy(Initiator::class) + class Responder(val counterpartySession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val string = counterpartySession.receive().unwrap { contents -> contents } + if (string != "goodString") { + throw FlowException("String did not contain the expected message.") + } + } + } + + // This is a fake implementation of Initiator to check how Responder responds to non-golden-path scenarios. + @InitiatingFlow + class BadInitiator(val counterparty: Party): FlowLogic() { + @Suspendable + override fun call() { + val session = initiateFlow(counterparty) + session.send("badString") + } + } + + @Test + fun `test`() { + // This method returns the Responder flow object used by node B. + // We tell node B to respond to BadInitiator with Responder. + val initiatedResponderFlowFuture = b.registerResponderFlow( + initiatingFlowClass = BadInitiator::class.java, + flowFactory = ::Responder) + + // We run the BadInitiator flow on node A. + val flow = BadInitiator(b.info.chooseIdentity()) + val future = a.startFlow(flow) + network.runNetwork() + future.get() + + // We check that the invocation of the Responder flow object has caused an ExecutionException. + val initiatedResponderFlow = initiatedResponderFlowFuture.get() + val initiatedResponderFlowResultFuture = initiatedResponderFlow.stateMachine.resultFuture + + val exceptionFromFlow = assertFailsWith { + initiatedResponderFlowResultFuture.get() + }.cause + assertThat(exceptionFromFlow) + .isInstanceOf(FlowException::class.java) + .hasMessage("String did not contain the expected message.") + } +} \ No newline at end of file