mirror of
https://github.com/corda/corda.git
synced 2025-06-17 14:48:16 +00:00
Cleaned up NonEmptySet and expanded its usage in the codebase
This commit is contained in:
@ -2,12 +2,12 @@ package net.corda.services.messaging
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.crypto.X509Utilities
|
||||
import net.corda.core.crypto.cert
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.seconds
|
||||
import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.node.internal.NetworkMapInfo
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.messaging.sendRequest
|
||||
@ -30,7 +30,7 @@ class P2PSecurityTest : NodeBasedTest() {
|
||||
|
||||
@Test
|
||||
fun `incorrect legal name for the network map service config`() {
|
||||
val incorrectNetworkMapName = X509Utilities.getDevX509Name("NetworkMap-${random63BitValue()}")
|
||||
val incorrectNetworkMapName = getTestX509Name("NetworkMap-${random63BitValue()}")
|
||||
val node = startNode(BOB.name, configOverrides = mapOf(
|
||||
"networkMapService" to mapOf(
|
||||
"address" to networkMapNode.configuration.p2pAddress.toString(),
|
||||
@ -67,7 +67,7 @@ class P2PSecurityTest : NodeBasedTest() {
|
||||
|
||||
private fun SimpleNode.registerWithNetworkMap(registrationName: X500Name): ListenableFuture<NetworkMapService.RegistrationResponse> {
|
||||
val legalIdentity = getTestPartyAndCertificate(registrationName, identity.public)
|
||||
val nodeInfo = NodeInfo(listOf(MOCK_HOST_AND_PORT), legalIdentity, setOf(legalIdentity), 1)
|
||||
val nodeInfo = NodeInfo(listOf(MOCK_HOST_AND_PORT), legalIdentity, NonEmptySet.of(legalIdentity), 1)
|
||||
val registration = NodeRegistration(nodeInfo, System.currentTimeMillis(), AddOrRemove.ADD, Instant.MAX)
|
||||
val request = RegistrationRequest(registration.toWire(keyService, identity.public), network.myAddress)
|
||||
return network.sendRequest<NetworkMapService.RegistrationResponse>(NetworkMapService.REGISTER_TOPIC, request, networkMapNode.network.myAddress)
|
||||
|
@ -26,6 +26,7 @@ import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.toNonEmptySet
|
||||
import net.corda.flows.*
|
||||
import net.corda.node.services.*
|
||||
import net.corda.node.services.api.*
|
||||
@ -495,7 +496,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
private fun makeInfo(): NodeInfo {
|
||||
val advertisedServiceEntries = makeServiceEntries()
|
||||
val legalIdentity = obtainLegalIdentity()
|
||||
val allIdentitiesSet = advertisedServiceEntries.map { it.identity }.toSet() + legalIdentity
|
||||
val allIdentitiesSet = (advertisedServiceEntries.map { it.identity } + legalIdentity).toNonEmptySet()
|
||||
val addresses = myAddresses() // TODO There is no support for multiple IP addresses yet.
|
||||
return NodeInfo(addresses, legalIdentity, allIdentitiesSet, platformVersion, advertisedServiceEntries, findMyLocation())
|
||||
}
|
||||
|
@ -33,10 +33,7 @@ import net.corda.core.serialization.storageKryo
|
||||
import net.corda.core.tee
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.toHexString
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.node.services.database.RequeryConfiguration
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.services.vault.schemas.requery.*
|
||||
@ -261,44 +258,42 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
}
|
||||
|
||||
@Throws(StatesNotAvailableException::class)
|
||||
override fun softLockReserve(lockId: UUID, stateRefs: Set<StateRef>) {
|
||||
if (stateRefs.isNotEmpty()) {
|
||||
val softLockTimestamp = services.clock.instant()
|
||||
val stateRefArgs = stateRefArgs(stateRefs)
|
||||
try {
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
val updatedRows = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, lockId.toString())
|
||||
.set(VaultStatesEntity.LOCK_UPDATE_TIME, softLockTimestamp)
|
||||
.where(VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED)
|
||||
.and((VaultStatesEntity.LOCK_ID eq lockId.toString()) or (VaultStatesEntity.LOCK_ID.isNull()))
|
||||
override fun softLockReserve(lockId: UUID, stateRefs: NonEmptySet<StateRef>) {
|
||||
val softLockTimestamp = services.clock.instant()
|
||||
val stateRefArgs = stateRefArgs(stateRefs)
|
||||
try {
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
val updatedRows = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, lockId.toString())
|
||||
.set(VaultStatesEntity.LOCK_UPDATE_TIME, softLockTimestamp)
|
||||
.where(VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED)
|
||||
.and((VaultStatesEntity.LOCK_ID eq lockId.toString()) or (VaultStatesEntity.LOCK_ID.isNull()))
|
||||
.and(stateRefCompositeColumn.`in`(stateRefArgs)).get().value()
|
||||
if (updatedRows > 0 && updatedRows == stateRefs.size) {
|
||||
log.trace("Reserving soft lock states for $lockId: $stateRefs")
|
||||
FlowStateMachineImpl.currentStateMachine()?.hasSoftLockedStates = true
|
||||
} else {
|
||||
// revert partial soft locks
|
||||
val revertUpdatedRows = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, null)
|
||||
.where(VaultStatesEntity.LOCK_UPDATE_TIME eq softLockTimestamp)
|
||||
.and(VaultStatesEntity.LOCK_ID eq lockId.toString())
|
||||
.and(stateRefCompositeColumn.`in`(stateRefArgs)).get().value()
|
||||
if (updatedRows > 0 && updatedRows == stateRefs.size) {
|
||||
log.trace("Reserving soft lock states for $lockId: $stateRefs")
|
||||
FlowStateMachineImpl.currentStateMachine()?.hasSoftLockedStates = true
|
||||
} else {
|
||||
// revert partial soft locks
|
||||
val revertUpdatedRows = update(VaultStatesEntity::class)
|
||||
.set(VaultStatesEntity.LOCK_ID, null)
|
||||
.where(VaultStatesEntity.LOCK_UPDATE_TIME eq softLockTimestamp)
|
||||
.and(VaultStatesEntity.LOCK_ID eq lockId.toString())
|
||||
.and(stateRefCompositeColumn.`in`(stateRefArgs)).get().value()
|
||||
if (revertUpdatedRows > 0) {
|
||||
log.trace("Reverting $revertUpdatedRows partially soft locked states for $lockId")
|
||||
}
|
||||
throw StatesNotAvailableException("Attempted to reserve $stateRefs for $lockId but only $updatedRows rows available")
|
||||
if (revertUpdatedRows > 0) {
|
||||
log.trace("Reverting $revertUpdatedRows partially soft locked states for $lockId")
|
||||
}
|
||||
throw StatesNotAvailableException("Attempted to reserve $stateRefs for $lockId but only $updatedRows rows available")
|
||||
}
|
||||
} catch (e: PersistenceException) {
|
||||
log.error("""soft lock update error attempting to reserve states for $lockId and $stateRefs")
|
||||
}
|
||||
} catch (e: PersistenceException) {
|
||||
log.error("""soft lock update error attempting to reserve states for $lockId and $stateRefs")
|
||||
$e.
|
||||
""")
|
||||
if (e.cause is StatesNotAvailableException) throw (e.cause as StatesNotAvailableException)
|
||||
}
|
||||
if (e.cause is StatesNotAvailableException) throw (e.cause as StatesNotAvailableException)
|
||||
}
|
||||
}
|
||||
|
||||
override fun softLockRelease(lockId: UUID, stateRefs: Set<StateRef>?) {
|
||||
override fun softLockRelease(lockId: UUID, stateRefs: NonEmptySet<StateRef>?) {
|
||||
if (stateRefs == null) {
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
val update = update(VaultStatesEntity::class)
|
||||
@ -310,7 +305,7 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
log.trace("Releasing ${update.value()} soft locked states for $lockId")
|
||||
}
|
||||
}
|
||||
} else if (stateRefs.isNotEmpty()) {
|
||||
} else {
|
||||
try {
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
val updatedRows = update(VaultStatesEntity::class)
|
||||
@ -398,7 +393,7 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
log.trace("Coin selection for $amount retrieved ${stateAndRefs.count()} states totalling $totalPennies pennies: $stateAndRefs")
|
||||
|
||||
// update database
|
||||
softLockReserve(lockId, stateAndRefs.map { it.ref }.toSet())
|
||||
softLockReserve(lockId, (stateAndRefs.map { it.ref }).toNonEmptySet())
|
||||
return stateAndRefs
|
||||
}
|
||||
log.trace("Coin selection requested $amount but retrieved $totalPennies pennies with state refs: ${stateAndRefs.map { it.ref }}")
|
||||
|
@ -4,7 +4,9 @@ import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.toNonEmptySet
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
@ -36,18 +38,18 @@ class VaultSoftLockManager(val vault: VaultService, smm: StateMachineManager) {
|
||||
// However, the lock can be programmatically released, like any other soft lock,
|
||||
// should we want a long running flow that creates a visible state mid way through.
|
||||
|
||||
vault.rawUpdates.subscribe { update ->
|
||||
update.flowId?.let {
|
||||
if (update.produced.isNotEmpty()) {
|
||||
registerSoftLocks(update.flowId as UUID, update.produced.map { it.ref })
|
||||
vault.rawUpdates.subscribe { (_, produced, flowId) ->
|
||||
flowId?.let {
|
||||
if (produced.isNotEmpty()) {
|
||||
registerSoftLocks(flowId, (produced.map { it.ref }).toNonEmptySet())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun registerSoftLocks(flowId: UUID, stateRefs: List<StateRef>) {
|
||||
private fun registerSoftLocks(flowId: UUID, stateRefs: NonEmptySet<StateRef>) {
|
||||
log.trace("Reserving soft locks for flow id $flowId and states $stateRefs")
|
||||
vault.softLockReserve(flowId, stateRefs.toSet())
|
||||
vault.softLockReserve(flowId, stateRefs)
|
||||
}
|
||||
|
||||
private fun unregisterSoftLocks(id: StateMachineRunId, logic: FlowLogic<*>) {
|
||||
|
@ -26,7 +26,7 @@ import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.testing.LogHelper
|
||||
import net.corda.core.utilities.toNonEmptySet
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.flows.TwoPartyTradeFlow.Buyer
|
||||
import net.corda.flows.TwoPartyTradeFlow.Seller
|
||||
@ -155,7 +155,10 @@ class TwoPartyTradeFlowTests {
|
||||
val cashLockId = UUID.randomUUID()
|
||||
bobNode.database.transaction {
|
||||
// lock the cash states with an arbitrary lockId (to prevent the Buyer flow from claiming the states)
|
||||
bobNode.services.vaultService.softLockReserve(cashLockId, cashStates.states.map { it.ref }.toSet())
|
||||
val refs = cashStates.states.map { it.ref }
|
||||
if (refs.isNotEmpty()) {
|
||||
bobNode.services.vaultService.softLockReserve(cashLockId, refs.toNonEmptySet())
|
||||
}
|
||||
}
|
||||
|
||||
val (bobStateMachine, aliceResult) = runBuyerAndSeller(notaryNode, aliceNode, bobNode,
|
||||
|
@ -11,7 +11,9 @@ import net.corda.core.node.services.VaultService
|
||||
import net.corda.core.node.services.unconsumedStates
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.toNonEmptySet
|
||||
import net.corda.node.utilities.configureDatabase
|
||||
import net.corda.node.utilities.transaction
|
||||
import net.corda.testing.*
|
||||
@ -116,7 +118,7 @@ class NodeVaultServiceTest {
|
||||
val unconsumedStates = vaultSvc.unconsumedStates<Cash.State>().toList()
|
||||
assertThat(unconsumedStates).hasSize(3)
|
||||
|
||||
val stateRefsToSoftLock = setOf(unconsumedStates[1].ref, unconsumedStates[2].ref)
|
||||
val stateRefsToSoftLock = NonEmptySet.of(unconsumedStates[1].ref, unconsumedStates[2].ref)
|
||||
|
||||
// soft lock two of the three states
|
||||
val softLockId = UUID.randomUUID()
|
||||
@ -132,7 +134,7 @@ class NodeVaultServiceTest {
|
||||
assertThat(unlockedStates1).hasSize(1)
|
||||
|
||||
// soft lock release one of the states explicitly
|
||||
vaultSvc.softLockRelease(softLockId, setOf(unconsumedStates[1].ref))
|
||||
vaultSvc.softLockRelease(softLockId, NonEmptySet.of(unconsumedStates[1].ref))
|
||||
val unlockedStates2 = vaultSvc.unconsumedStates<Cash.State>(includeSoftLockedStates = false).toList()
|
||||
assertThat(unlockedStates2).hasSize(2)
|
||||
|
||||
@ -160,7 +162,7 @@ class NodeVaultServiceTest {
|
||||
assertNull(vaultSvc.cashBalances[USD])
|
||||
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
}
|
||||
val stateRefsToSoftLock = vaultStates.states.map { it.ref }.toSet()
|
||||
val stateRefsToSoftLock = (vaultStates.states.map { it.ref }).toNonEmptySet()
|
||||
println("State Refs:: $stateRefsToSoftLock")
|
||||
|
||||
// 1st tx locks states
|
||||
@ -216,19 +218,19 @@ class NodeVaultServiceTest {
|
||||
assertNull(vaultSvc.cashBalances[USD])
|
||||
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
}
|
||||
val stateRefsToSoftLock = vaultStates.states.map { it.ref }.toSet()
|
||||
val stateRefsToSoftLock = vaultStates.states.map { it.ref }
|
||||
println("State Refs:: $stateRefsToSoftLock")
|
||||
|
||||
// lock 1st state with LockId1
|
||||
database.transaction {
|
||||
vaultSvc.softLockReserve(softLockId1, setOf(stateRefsToSoftLock.first()))
|
||||
vaultSvc.softLockReserve(softLockId1, NonEmptySet.of(stateRefsToSoftLock.first()))
|
||||
assertThat(vaultSvc.softLockedStates<Cash.State>(softLockId1)).hasSize(1)
|
||||
}
|
||||
|
||||
// attempt to lock all 3 states with LockId2
|
||||
database.transaction {
|
||||
assertThatExceptionOfType(StatesNotAvailableException::class.java).isThrownBy(
|
||||
{ vaultSvc.softLockReserve(softLockId2, stateRefsToSoftLock) }
|
||||
{ vaultSvc.softLockReserve(softLockId2, stateRefsToSoftLock.toNonEmptySet()) }
|
||||
).withMessageContaining("only 2 rows available").withNoCause()
|
||||
}
|
||||
}
|
||||
@ -243,7 +245,7 @@ class NodeVaultServiceTest {
|
||||
assertNull(vaultSvc.cashBalances[USD])
|
||||
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
}
|
||||
val stateRefsToSoftLock = vaultStates.states.map { it.ref }.toSet()
|
||||
val stateRefsToSoftLock = (vaultStates.states.map { it.ref }).toNonEmptySet()
|
||||
println("State Refs:: $stateRefsToSoftLock")
|
||||
|
||||
// lock states with LockId1
|
||||
@ -269,18 +271,18 @@ class NodeVaultServiceTest {
|
||||
assertNull(vaultSvc.cashBalances[USD])
|
||||
services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 3, 3, Random(0L))
|
||||
}
|
||||
val stateRefsToSoftLock = vaultStates.states.map { it.ref }.toSet()
|
||||
val stateRefsToSoftLock = vaultStates.states.map { it.ref }
|
||||
println("State Refs:: $stateRefsToSoftLock")
|
||||
|
||||
// lock states with LockId1
|
||||
database.transaction {
|
||||
vaultSvc.softLockReserve(softLockId1, setOf(stateRefsToSoftLock.first()))
|
||||
vaultSvc.softLockReserve(softLockId1, NonEmptySet.of(stateRefsToSoftLock.first()))
|
||||
assertThat(vaultSvc.softLockedStates<Cash.State>(softLockId1)).hasSize(1)
|
||||
}
|
||||
|
||||
// attempt to lock all states with LockId1 (including previously already locked one)
|
||||
database.transaction {
|
||||
vaultSvc.softLockReserve(softLockId1, stateRefsToSoftLock)
|
||||
vaultSvc.softLockReserve(softLockId1, stateRefsToSoftLock.toNonEmptySet())
|
||||
assertThat(vaultSvc.softLockedStates<Cash.State>(softLockId1)).hasSize(3)
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ import net.corda.core.node.services.vault.QueryCriteria.*
|
||||
import net.corda.core.seconds
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.core.utilities.toHexString
|
||||
import net.corda.node.services.database.HibernateConfiguration
|
||||
import net.corda.node.services.schema.NodeSchemaService
|
||||
@ -470,7 +471,7 @@ class VaultQueryTests {
|
||||
database.transaction {
|
||||
|
||||
val issuedStates = services.fillWithSomeTestCash(100.DOLLARS, CASH_NOTARY, 3, 3, Random(0L))
|
||||
vaultSvc.softLockReserve(UUID.randomUUID(), setOf(issuedStates.states.first().ref, issuedStates.states.last().ref))
|
||||
vaultSvc.softLockReserve(UUID.randomUUID(), NonEmptySet.of(issuedStates.states.first().ref, issuedStates.states.last().ref))
|
||||
|
||||
val criteria = VaultQueryCriteria(includeSoftlockedStates = false)
|
||||
val results = vaultQuerySvc.queryBy<ContractState>(criteria)
|
||||
|
Reference in New Issue
Block a user