diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 16829873aa..df01661af0 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -274,8 +274,7 @@ abstract class AbstractNode(config: NodeConfiguration, require(customNotaryServiceList.size == 1) { "Attempting to install more than one notary service: ${customNotaryServiceList.joinToString()}" } - } - else return loadedServices - customNotaryServiceList + } else return loadedServices - customNotaryServiceList } return loadedServices } @@ -490,9 +489,9 @@ abstract class AbstractNode(config: NodeConfiguration, protected open fun makeTransactionStorage(): WritableTransactionStorage = DBTransactionStorage() private fun makeVaultObservers() { - VaultSoftLockManager(services.vaultService, smm) - ScheduledActivityObserver(services) - HibernateObserver(services.vaultService.rawUpdates, services.database.hibernateConfig) + VaultSoftLockManager.install(services.vaultService, smm) + ScheduledActivityObserver.install(services.vaultService, services.schedulerService) + HibernateObserver.install(services.vaultService.rawUpdates, database.hibernateConfig) } private fun makeInfo(legalIdentity: PartyAndCertificate): NodeInfo { @@ -758,6 +757,9 @@ abstract class AbstractNode(config: NodeConfiguration, } protected open fun generateKeyPair() = cryptoGenerateKeyPair() + protected open fun makeVaultService(keyManagementService: KeyManagementService, stateLoader: StateLoader): VaultServiceInternal { + return NodeVaultService(platformClock, keyManagementService, stateLoader, database.hibernateConfig) + } private inner class ServiceHubInternalImpl( override val schemaService: SchemaService, @@ -771,7 +773,7 @@ abstract class AbstractNode(config: NodeConfiguration, override val auditService = DummyAuditService() override val transactionVerifierService by lazy { makeTransactionVerifierService() } override val networkMapCache by lazy { PersistentNetworkMapCache(this) } - override val vaultService by lazy { NodeVaultService(platformClock, keyManagementService, stateLoader, this@AbstractNode.database.hibernateConfig) } + override val vaultService by lazy { makeVaultService(keyManagementService, stateLoader) } override val contractUpgradeService by lazy { ContractUpgradeServiceImpl() } // Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because diff --git a/node/src/main/kotlin/net/corda/node/services/events/ScheduledActivityObserver.kt b/node/src/main/kotlin/net/corda/node/services/events/ScheduledActivityObserver.kt index 3509fa2d34..3020a9e528 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/ScheduledActivityObserver.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/ScheduledActivityObserver.kt @@ -4,18 +4,28 @@ import net.corda.core.contracts.ContractState import net.corda.core.contracts.SchedulableState import net.corda.core.contracts.ScheduledStateRef import net.corda.core.contracts.StateAndRef -import net.corda.node.services.api.ServiceHubInternal +import net.corda.core.node.services.VaultService +import net.corda.node.services.api.SchedulerService import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl /** * This observes the vault and schedules and unschedules activities appropriately based on state production and * consumption. */ -class ScheduledActivityObserver(val services: ServiceHubInternal) { - init { - services.vaultService.rawUpdates.subscribe { (consumed, produced) -> - consumed.forEach { services.schedulerService.unscheduleStateActivity(it.ref) } - produced.forEach { scheduleStateActivity(it) } +class ScheduledActivityObserver private constructor(private val schedulerService: SchedulerService) { + companion object { + @JvmStatic + fun install(vaultService: VaultService, schedulerService: SchedulerService) { + val observer = ScheduledActivityObserver(schedulerService) + vaultService.rawUpdates.subscribe { (consumed, produced) -> + consumed.forEach { schedulerService.unscheduleStateActivity(it.ref) } + produced.forEach { observer.scheduleStateActivity(it) } + } + } + + // TODO: Beware we are calling dynamically loaded contract code inside here. + private inline fun <T : Any> sandbox(code: () -> T?): T? { + return code() } } @@ -23,12 +33,7 @@ class ScheduledActivityObserver(val services: ServiceHubInternal) { val producedState = produced.state.data if (producedState is SchedulableState) { val scheduledAt = sandbox { producedState.nextScheduledActivity(produced.ref, FlowLogicRefFactoryImpl)?.scheduledAt } ?: return - services.schedulerService.scheduleStateActivity(ScheduledStateRef(produced.ref, scheduledAt)) + schedulerService.scheduleStateActivity(ScheduledStateRef(produced.ref, scheduledAt)) } } - - // TODO: Beware we are calling dynamically loaded contract code inside here. - private inline fun <T : Any> sandbox(code: () -> T?): T? { - return code() - } } diff --git a/node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt b/node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt index 45a3334ef1..2babcc1d70 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/HibernateObserver.kt @@ -3,6 +3,7 @@ package net.corda.node.services.schema import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateAndRef import net.corda.core.contracts.StateRef +import net.corda.core.internal.VisibleForTesting import net.corda.core.node.services.Vault import net.corda.core.schemas.MappedSchema import net.corda.core.schemas.PersistentStateRef @@ -17,14 +18,15 @@ import rx.Observable * A vault observer that extracts Object Relational Mappings for contract states that support it, and persists them with Hibernate. */ // TODO: Manage version evolution of the schemas via additional tooling. -class HibernateObserver(vaultUpdates: Observable<Vault.Update<ContractState>>, val config: HibernateConfiguration) { - +class HibernateObserver private constructor(private val config: HibernateConfiguration) { companion object { - val logger = loggerFor<HibernateObserver>() - } - - init { - vaultUpdates.subscribe { persist(it.produced) } + private val log = loggerFor<HibernateObserver>() + @JvmStatic + fun install(vaultUpdates: Observable<Vault.Update<ContractState>>, config: HibernateConfiguration): HibernateObserver { + val observer = HibernateObserver(config) + vaultUpdates.subscribe { observer.persist(it.produced) } + return observer + } } private fun persist(produced: Set<StateAndRef<ContractState>>) { @@ -33,11 +35,12 @@ class HibernateObserver(vaultUpdates: Observable<Vault.Update<ContractState>>, v private fun persistState(stateAndRef: StateAndRef<ContractState>) { val state = stateAndRef.state.data - logger.debug { "Asked to persist state ${stateAndRef.ref}" } + log.debug { "Asked to persist state ${stateAndRef.ref}" } config.schemaService.selectSchemas(state).forEach { persistStateWithSchema(state, stateAndRef.ref, it) } } - fun persistStateWithSchema(state: ContractState, stateRef: StateRef, schema: MappedSchema) { + @VisibleForTesting + internal fun persistStateWithSchema(state: ContractState, stateRef: StateRef, schema: MappedSchema) { val sessionFactory = config.sessionFactoryForSchemas(setOf(schema)) val session = sessionFactory.withOptions(). connection(DatabaseTransactionManager.current().connection). diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 89f0bf6dc6..a3df2461c8 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -68,12 +68,10 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId, * is not necessary. */ override val logger: Logger = LoggerFactory.getLogger("net.corda.flow.$id") - - @Transient private var _resultFuture: OpenFuture<R>? = openFuture() + @Transient private var resultFutureTransient: OpenFuture<R>? = openFuture() + private val _resultFuture get() = resultFutureTransient ?: openFuture<R>().also { resultFutureTransient = it } /** This future will complete when the call method returns. */ - override val resultFuture: CordaFuture<R> - get() = _resultFuture ?: openFuture<R>().also { _resultFuture = it } - + override val resultFuture: CordaFuture<R> get() = _resultFuture // This state IS serialised, as we need it to know what the fiber is waiting for. internal val openSessions = HashMap<Pair<FlowLogic<*>, Party>, FlowSessionInternal>() internal var waitingForResponse: WaitingRequest? = null @@ -115,7 +113,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId, recordDuration(startTime) // This is to prevent actionOnEnd being called twice if it throws an exception actionOnEnd(Try.Success(result), false) - _resultFuture?.set(result) + _resultFuture.set(result) logic.progressTracker?.currentStep = ProgressTracker.DONE logger.debug { "Flow finished with result ${result.toString().abbreviate(300)}" } } @@ -128,7 +126,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId, private fun processException(exception: Throwable, propagated: Boolean) { actionOnEnd(Try.Failure(exception), propagated) - _resultFuture?.setException(exception) + _resultFuture.setException(exception) logic.progressTracker?.endWithError(exception) } diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index d5a189b0ee..934cc2cb83 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -269,7 +269,7 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic update.where(stateStatusPredication, lockIdPredicate, *commonPredicates) } if (updatedRows > 0 && updatedRows == stateRefs.size) { - log.trace("Reserving soft lock states for $lockId: $stateRefs") + log.trace { "Reserving soft lock states for $lockId: $stateRefs" } FlowStateMachineImpl.currentStateMachine()?.hasSoftLockedStates = true } else { // revert partial soft locks @@ -280,7 +280,7 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic update.where(lockUpdateTime, lockIdPredicate, *commonPredicates) } if (revertUpdatedRows > 0) { - log.trace("Reverting $revertUpdatedRows partially soft locked states for $lockId") + log.trace { "Reverting $revertUpdatedRows partially soft locked states for $lockId" } } throw StatesNotAvailableException("Attempted to reserve $stateRefs for $lockId but only $updatedRows rows available") } @@ -309,7 +309,7 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic update.where(*commonPredicates) } if (update > 0) { - log.trace("Releasing $update soft locked states for $lockId") + log.trace { "Releasing $update soft locked states for $lockId" } } } else { try { @@ -320,7 +320,7 @@ class NodeVaultService(private val clock: Clock, private val keyManagementServic update.where(*commonPredicates, stateRefsPredicate) } if (updatedRows > 0) { - log.trace("Releasing $updatedRows soft locked states for $lockId and stateRefs $stateRefs") + log.trace { "Releasing $updatedRows soft locked states for $lockId and stateRefs $stateRefs" } } } catch (e: Exception) { log.error("""soft lock update error attempting to release states for $lockId and $stateRefs") diff --git a/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt b/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt index 2d063a1468..8bf233589b 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt @@ -1,8 +1,8 @@ package net.corda.node.services.vault +import net.corda.core.contracts.FungibleAsset import net.corda.core.contracts.StateRef import net.corda.core.flows.FlowLogic -import net.corda.core.flows.StateMachineRunId import net.corda.core.node.services.VaultService import net.corda.core.utilities.NonEmptySet import net.corda.core.utilities.loggerFor @@ -12,50 +12,50 @@ import net.corda.node.services.statemachine.FlowStateMachineImpl import net.corda.node.services.statemachine.StateMachineManager import java.util.* -class VaultSoftLockManager(val vault: VaultService, smm: StateMachineManager) { - - private companion object { - val log = loggerFor<VaultSoftLockManager>() - } - - init { - smm.changes.subscribe { change -> - if (change is StateMachineManager.Change.Removed && (FlowStateMachineImpl.currentStateMachine())?.hasSoftLockedStates == true) { - log.trace { "Remove flow name ${change.logic.javaClass} with id $change.id" } - unregisterSoftLocks(change.logic.runId, change.logic) +class VaultSoftLockManager private constructor(private val vault: VaultService) { + companion object { + private val log = loggerFor<VaultSoftLockManager>() + @JvmStatic + fun install(vault: VaultService, smm: StateMachineManager) { + val manager = VaultSoftLockManager(vault) + smm.changes.subscribe { change -> + if (change is StateMachineManager.Change.Removed) { + val logic = change.logic + // Don't run potentially expensive query if the flow didn't lock any states: + if ((logic.stateMachine as FlowStateMachineImpl<*>).hasSoftLockedStates) { + manager.unregisterSoftLocks(logic.runId.uuid, logic) + } + } } - } - - // Discussion - // - // The intent of the following approach is to support what might be a common pattern in a flow: - // 1. Create state - // 2. Do something with state - // without possibility of another flow intercepting the state between 1 and 2, - // since we cannot lock the state before it exists. e.g. Issue and then Move some Cash. - // - // The downside is we could have a long running flow that holds a lock for a long period of time. - // However, the lock can be programmatically released, like any other soft lock, - // should we want a long running flow that creates a visible state mid way through. - - vault.rawUpdates.subscribe { (_, produced, flowId) -> - flowId?.let { - if (produced.isNotEmpty()) { - registerSoftLocks(flowId, (produced.map { it.ref }).toNonEmptySet()) + // Discussion + // + // The intent of the following approach is to support what might be a common pattern in a flow: + // 1. Create state + // 2. Do something with state + // without possibility of another flow intercepting the state between 1 and 2, + // since we cannot lock the state before it exists. e.g. Issue and then Move some Cash. + // + // The downside is we could have a long running flow that holds a lock for a long period of time. + // However, the lock can be programmatically released, like any other soft lock, + // should we want a long running flow that creates a visible state mid way through. + vault.rawUpdates.subscribe { (_, produced, flowId) -> + if (flowId != null) { + val fungible = produced.filter { it.state.data is FungibleAsset<*> } + if (fungible.isNotEmpty()) { + manager.registerSoftLocks(flowId, fungible.map { it.ref }.toNonEmptySet()) + } } } } } private fun registerSoftLocks(flowId: UUID, stateRefs: NonEmptySet<StateRef>) { - log.trace("Reserving soft locks for flow id $flowId and states $stateRefs") + log.trace { "Reserving soft locks for flow id $flowId and states $stateRefs" } vault.softLockReserve(flowId, stateRefs) } - private fun unregisterSoftLocks(id: StateMachineRunId, logic: FlowLogic<*>) { - val flowClassName = logic.javaClass.simpleName - log.trace("Releasing soft locks for flow $flowClassName with flow id ${id.uuid}") - vault.softLockRelease(id.uuid) - + private fun unregisterSoftLocks(flowId: UUID, logic: FlowLogic<*>) { + log.trace { "Releasing soft locks for flow ${logic.javaClass.simpleName} with flow id $flowId" } + vault.softLockRelease(flowId) } } \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt index 2b9ff96f1c..160ea87bf3 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt @@ -45,7 +45,7 @@ class DBTransactionStorageTests : TestDependencyInjectionBase() { override val vaultService: VaultServiceInternal get() { val vaultService = NodeVaultService(clock, keyManagementService, stateLoader, database.hibernateConfig) - hibernatePersister = HibernateObserver(vaultService.rawUpdates, database.hibernateConfig) + hibernatePersister = HibernateObserver.install(vaultService.rawUpdates, database.hibernateConfig) return vaultService } diff --git a/node/src/test/kotlin/net/corda/node/services/schema/HibernateObserverTests.kt b/node/src/test/kotlin/net/corda/node/services/schema/HibernateObserverTests.kt index 33e8bcdad4..30d877ab93 100644 --- a/node/src/test/kotlin/net/corda/node/services/schema/HibernateObserverTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/schema/HibernateObserverTests.kt @@ -69,8 +69,7 @@ class HibernateObserverTests { } } val database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), ::makeTestIdentityService, schemaService) - @Suppress("UNUSED_VARIABLE") - val observer = HibernateObserver(rawUpdatesPublisher, database.hibernateConfig) + HibernateObserver.install(rawUpdatesPublisher, database.hibernateConfig) database.transaction { rawUpdatesPublisher.onNext(Vault.Update(emptySet(), setOf(StateAndRef(TransactionState(TestState(), DummyContract.PROGRAM_ID, MEGA_CORP), StateRef(SecureHash.sha256("dummy"), 0))))) val parentRowCountResult = DatabaseTransactionManager.current().connection.prepareStatement("select count(*) from Parents").executeQuery() diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt new file mode 100644 index 0000000000..d0d7562234 --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultSoftLockManagerTest.kt @@ -0,0 +1,161 @@ +package net.corda.node.services.vault + +import co.paralleluniverse.fibers.Suspendable +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.verify +import com.nhaarman.mockito_kotlin.verifyNoMoreInteractions +import net.corda.core.contracts.* +import net.corda.core.flows.FinalityFlow +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatingFlow +import net.corda.core.identity.AbstractParty +import net.corda.core.internal.FlowStateMachine +import net.corda.core.internal.packageName +import net.corda.core.internal.uncheckedCast +import net.corda.core.messaging.SingleMessageRecipient +import net.corda.core.node.StateLoader +import net.corda.core.node.services.KeyManagementService +import net.corda.core.node.services.queryBy +import net.corda.core.node.services.vault.QueryCriteria.SoftLockingCondition +import net.corda.core.node.services.vault.QueryCriteria.SoftLockingType.LOCKED_ONLY +import net.corda.core.node.services.vault.QueryCriteria.VaultQueryCriteria +import net.corda.core.transactions.LedgerTransaction +import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.NonEmptySet +import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.unwrap +import net.corda.node.internal.InitiatedFlowFactory +import net.corda.node.services.api.VaultServiceInternal +import net.corda.node.services.config.NodeConfiguration +import net.corda.nodeapi.internal.ServiceInfo +import net.corda.testing.chooseIdentity +import net.corda.testing.node.MockNetwork +import org.junit.After +import org.junit.Test +import java.math.BigInteger +import java.security.KeyPair +import java.util.* +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.reflect.jvm.jvmName +import kotlin.test.assertEquals + +private class NodePair(private val mockNet: MockNetwork) { + private class ServerLogic(private val session: FlowSession, private val running: AtomicBoolean) : FlowLogic<Unit>() { + @Suspendable + override fun call() { + running.set(true) + session.receive<String>().unwrap { assertEquals("ping", it) } + session.send("pong") + } + } + + @InitiatingFlow + abstract class AbstractClientLogic<out T>(nodePair: NodePair) : FlowLogic<T>() { + protected val server = nodePair.server.info.chooseIdentity() + protected abstract fun callImpl(): T + @Suspendable + override fun call() = callImpl().also { + initiateFlow(server).sendAndReceive<String>("ping").unwrap { assertEquals("pong", it) } + } + } + + private val serverRunning = AtomicBoolean() + val server = mockNet.createNode() + var client = mockNet.createNode().apply { + internals.disableDBCloseOnStop() // Otherwise the in-memory database may disappear (taking the checkpoint with it) while we reboot the client. + } + private set + + fun <T> communicate(clientLogic: AbstractClientLogic<T>, rebootClient: Boolean): FlowStateMachine<T> { + server.internals.internalRegisterFlowFactory(AbstractClientLogic::class.java, InitiatedFlowFactory.Core { ServerLogic(it, serverRunning) }, ServerLogic::class.java, false) + client.services.startFlow(clientLogic) + while (!serverRunning.get()) mockNet.runNetwork(1) + if (rebootClient) { + client.dispose() + client = mockNet.createNode(client.internals.id) + } + return uncheckedCast(client.smm.allStateMachines.single().stateMachine) + } +} + +class VaultSoftLockManagerTest { + private val mockVault: VaultServiceInternal = mock() + private val mockNet = MockNetwork(cordappPackages = listOf(ContractImpl::class.packageName), defaultFactory = object : MockNetwork.Factory<MockNetwork.MockNode> { + override fun create(config: NodeConfiguration, network: MockNetwork, networkMapAddr: SingleMessageRecipient?, id: Int, notaryIdentity: Pair<ServiceInfo, KeyPair>?, entropyRoot: BigInteger): MockNetwork.MockNode { + return object : MockNetwork.MockNode(config, network, networkMapAddr, id, notaryIdentity, entropyRoot) { + override fun makeVaultService(keyManagementService: KeyManagementService, stateLoader: StateLoader): VaultServiceInternal { + val realVault = super.makeVaultService(keyManagementService, stateLoader) + return object : VaultServiceInternal by realVault { + override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) { + mockVault.softLockRelease(lockId, stateRefs) // No need to also call the real one for these tests. + } + } + } + } + } + }) + private val nodePair = NodePair(mockNet) + @After + fun tearDown() { + mockNet.stopNodes() + } + + private object CommandDataImpl : CommandData + private class ClientLogic(nodePair: NodePair, private val state: ContractState) : NodePair.AbstractClientLogic<List<ContractState>>(nodePair) { + override fun callImpl() = run { + subFlow(FinalityFlow(serviceHub.signInitialTransaction(TransactionBuilder(notary = ourIdentity).apply { + addOutputState(state, ContractImpl::class.jvmName) + addCommand(CommandDataImpl, ourIdentity.owningKey) + }))) + serviceHub.vaultService.queryBy<ContractState>(VaultQueryCriteria(softLockingCondition = SoftLockingCondition(LOCKED_ONLY))).states.map { + it.state.data + } + } + } + + private abstract class SingleParticipantState(nodePair: NodePair) : ContractState { + override val participants = listOf(nodePair.client.info.chooseIdentity()) + } + + private class PlainOldState(nodePair: NodePair) : SingleParticipantState(nodePair) + private class FungibleAssetImpl(nodePair: NodePair) : SingleParticipantState(nodePair), FungibleAsset<Unit> { + override val owner get() = participants[0] + override fun withNewOwner(newOwner: AbstractParty) = throw UnsupportedOperationException() + override val amount get() = Amount(1, Issued(PartyAndReference(owner, OpaqueBytes.of(1)), Unit)) + override val exitKeys get() = throw UnsupportedOperationException() + override fun withNewOwnerAndAmount(newAmount: Amount<Issued<Unit>>, newOwner: AbstractParty) = throw UnsupportedOperationException() + override fun equals(other: Any?) = other is FungibleAssetImpl && participants == other.participants + override fun hashCode() = participants.hashCode() + } + + class ContractImpl : Contract { + override fun verify(tx: LedgerTransaction) {} + } + + private fun run(expectSoftLock: Boolean, state: ContractState, checkpoint: Boolean) { + val fsm = nodePair.communicate(ClientLogic(nodePair, state), checkpoint) + mockNet.runNetwork() + if (expectSoftLock) { + assertEquals(listOf(state), fsm.resultFuture.getOrThrow()) + verify(mockVault).softLockRelease(fsm.id.uuid, null) + } else { + assertEquals(emptyList(), fsm.resultFuture.getOrThrow()) + // In this case we don't want softLockRelease called so that we avoid its expensive query, even after restore from checkpoint. + } + verifyNoMoreInteractions(mockVault) + } + + @Test + fun `plain old state is not soft locked`() = run(false, PlainOldState(nodePair), false) + + @Test + fun `plain old state is not soft locked with checkpoint`() = run(false, PlainOldState(nodePair), true) + + @Test + fun `fungible asset is soft locked`() = run(true, FungibleAssetImpl(nodePair), false) + + @Test + fun `fungible asset is soft locked with checkpoint`() = run(true, FungibleAssetImpl(nodePair), true) +} diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index fcd5689800..fa881e1703 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -171,7 +171,7 @@ open class MockServices( fun makeVaultService(hibernateConfig: HibernateConfiguration): VaultServiceInternal { val vaultService = NodeVaultService(Clock.systemUTC(), keyManagementService, stateLoader, hibernateConfig) - hibernatePersister = HibernateObserver(vaultService.rawUpdates, hibernateConfig) + hibernatePersister = HibernateObserver.install(vaultService.rawUpdates, hibernateConfig) return vaultService }