diff --git a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt index 5cd4529c6d..86bb3b2931 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt @@ -24,6 +24,7 @@ import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.node.services.Permissions import net.corda.node.services.statemachine.StaffedFlowHospital +import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import net.corda.testing.core.singleIdentity @@ -598,7 +599,12 @@ class VaultObserverExceptionTest { @Test(timeout=300_000) fun `Throw user error in VaultService rawUpdates during counterparty FinalityFlow blows up the flow but does not break the Observer`() { var observationCounter = 0 - StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter } + // Semaphore is used to wait until [PassErroneousOwnableStateReceiver] gets hospitalized, only after that moment let testing thread assert 'observationCounter' + val counterPartyHospitalized = Semaphore(0) + StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> + ++observationCounter + counterPartyHospitalized.release() + } val rawUpdatesCount = ConcurrentHashMap() DbListenerService.onNextVisited = { party -> @@ -644,6 +650,7 @@ class VaultObserverExceptionTest { assertEquals(1, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size) assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size) assertEquals(1, notary.getNotarisedTransactionIds().size) + counterPartyHospitalized.acquire() assertEquals(1, observationCounter) assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) assertEquals(1, rawUpdatesCount[bobNode.nodeInfo.singleIdentity()]) @@ -653,6 +660,7 @@ class VaultObserverExceptionTest { assertEquals(2, aliceNode.getAllStates(Vault.StateStatus.CONSUMED).size) assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size) assertEquals(2, notary.getNotarisedTransactionIds().size) + counterPartyHospitalized.acquire() assertEquals(2, observationCounter) assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()]) assertEquals(2, rawUpdatesCount[bobNode.nodeInfo.singleIdentity()]) @@ -833,14 +841,13 @@ class VaultObserverExceptionTest { @StartableByRPC class NotarisedTxs : FlowLogic>() { override fun call(): List { - val session = serviceHub.jdbcSession() - val statement = session.createStatement() - statement.execute("SELECT TRANSACTION_ID FROM NODE_NOTARY_COMMITTED_TXS;") - val result = mutableListOf() - while (statement.resultSet.next()) { - result.add(statement.resultSet.getString(1)) + return serviceHub.withEntityManager { + val criteriaQuery = this.criteriaBuilder.createQuery(String::class.java) + val root = criteriaQuery.from(PersistentUniquenessProvider.CommittedTransaction::class.java) + criteriaQuery.select(root.get(PersistentUniquenessProvider.CommittedTransaction::transactionId.name)) + val query = this.createQuery(criteriaQuery) + query.resultList } - return result } }