mirror of
https://github.com/corda/corda.git
synced 2024-12-19 21:17:58 +00:00
Merged in temporary-fix-inmemory-wallet-thread-clash (pull request #184)
Temporary fix inmemory wallet thread clash
This commit is contained in:
commit
8b24acf69e
@ -23,7 +23,11 @@ val TOPIC_DEFAULT_POSTFIX = ".0"
|
||||
* change out from underneath you, even though the canonical currently-best-known wallet may change as we learn
|
||||
* about new transactions from our peers and generate new transactions that consume states ourselves.
|
||||
*
|
||||
* This absract class has no references to Cash contracts.
|
||||
* This abstract class has no references to Cash contracts.
|
||||
*
|
||||
* [states] Holds the list of states that are *active* and *relevant*.
|
||||
* Active means they haven't been consumed yet (or we don't know about it).
|
||||
* Relevant means they contain at least one of our pubkeys
|
||||
*/
|
||||
class Wallet(val states: List<StateAndRef<ContractState>>) {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
|
@ -0,0 +1,10 @@
|
||||
package com.r3corda.core.testing
|
||||
|
||||
import com.r3corda.core.contracts.Contract
|
||||
import com.r3corda.core.contracts.TransactionForContract
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
|
||||
class AlwaysSucceedContract(override val legalContractReference: SecureHash = SecureHash.sha256("Always succeed contract")) : Contract {
|
||||
override fun verify(tx: TransactionForContract) {
|
||||
}
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
package com.r3corda.core.testing
|
||||
|
||||
import com.r3corda.core.contracts.Contract
|
||||
import com.r3corda.core.contracts.LinearState
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import java.security.PublicKey
|
||||
import java.util.*
|
||||
|
||||
class DummyLinearState(
|
||||
override val thread: SecureHash = SecureHash.randomSHA256(),
|
||||
override val contract: Contract = AlwaysSucceedContract(),
|
||||
override val participants: List<PublicKey> = listOf(),
|
||||
val nonce: SecureHash = SecureHash.randomSHA256()) : LinearState {
|
||||
|
||||
override fun isRelevant(ourKeys: Set<PublicKey>): Boolean {
|
||||
return participants.any { ourKeys.contains(it) }
|
||||
}
|
||||
}
|
@ -22,6 +22,8 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
*/
|
||||
@ThreadSafe
|
||||
open class InMemoryWalletService(private val services: ServiceHub) : SingletonSerializeAsToken(), WalletService {
|
||||
class ClashingThreads(threads: Set<SecureHash>, transactions: Iterable<WireTransaction>) :
|
||||
Exception("There are multiple linear head states after processing transactions $transactions. The clashing thread(s): $threads")
|
||||
private val log = loggerFor<InMemoryWalletService>()
|
||||
|
||||
// Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're
|
||||
@ -44,7 +46,7 @@ open class InMemoryWalletService(private val services: ServiceHub) : SingletonSe
|
||||
* Returns a snapshot of the heads of LinearStates
|
||||
*/
|
||||
override val linearHeads: Map<SecureHash, StateAndRef<LinearState>>
|
||||
get() = mutex.locked { wallet }.let { wallet ->
|
||||
get() = currentWallet.let { wallet ->
|
||||
wallet.states.filterStatesOfType<LinearState>().associateBy { it.state.data.thread }.mapValues { it.value }
|
||||
}
|
||||
|
||||
@ -74,10 +76,17 @@ open class InMemoryWalletService(private val services: ServiceHub) : SingletonSe
|
||||
val combinedDelta = delta + walletAndDelta.second
|
||||
Pair(wallet, combinedDelta)
|
||||
}
|
||||
|
||||
val clashingThreads = walletAndNetDelta.first.clashingThreads
|
||||
if (!clashingThreads.isEmpty()) {
|
||||
throw ClashingThreads(clashingThreads, txns)
|
||||
}
|
||||
|
||||
wallet = walletAndNetDelta.first
|
||||
netDelta = walletAndNetDelta.second
|
||||
return@locked wallet
|
||||
}
|
||||
|
||||
if (netDelta != Wallet.NoUpdate) {
|
||||
_updatesPublisher.onNext(netDelta)
|
||||
}
|
||||
@ -120,4 +129,23 @@ open class InMemoryWalletService(private val services: ServiceHub) : SingletonSe
|
||||
|
||||
return Pair(Wallet(newStates), change)
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
// Returns the set of LinearState threads that clash in the wallet
|
||||
val Wallet.clashingThreads: Set<SecureHash> get() {
|
||||
val clashingThreads = HashSet<SecureHash>()
|
||||
val threadsSeen = HashSet<SecureHash>()
|
||||
for (linearState in states.filterStatesOfType<LinearState>()) {
|
||||
val thread = linearState.state.data.thread
|
||||
if (threadsSeen.contains(thread)) {
|
||||
clashingThreads.add(thread)
|
||||
} else {
|
||||
threadsSeen.add(thread)
|
||||
}
|
||||
}
|
||||
return clashingThreads
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import com.r3corda.contracts.cash.Cash
|
||||
import com.r3corda.contracts.cash.cashBalances
|
||||
import com.r3corda.contracts.testing.fillWithSomeTestCash
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.node.ServiceHub
|
||||
import com.r3corda.core.node.services.testing.MockKeyManagementService
|
||||
import com.r3corda.core.node.services.testing.MockStorageService
|
||||
@ -14,6 +15,7 @@ import com.r3corda.node.services.wallet.NodeWalletService
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNull
|
||||
@ -96,4 +98,62 @@ class WalletWithCashTest {
|
||||
|
||||
// TODO: Flesh out these tests as needed.
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
fun branchingLinearStatesFails() {
|
||||
val (wallet, services) = make()
|
||||
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
|
||||
val thread = SecureHash.sha256("thread")
|
||||
|
||||
// Issue a linear state
|
||||
val dummyIssue = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
|
||||
addOutputState(DummyLinearState(thread = thread, participants = listOf(freshKey.public)))
|
||||
signWith(freshKey)
|
||||
}.toSignedTransaction()
|
||||
|
||||
wallet.notify(dummyIssue.tx)
|
||||
assertEquals(1, wallet.currentWallet.states.size)
|
||||
|
||||
// Issue another linear state of the same thread (nonce different)
|
||||
val dummyIssue2 = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
|
||||
addOutputState(DummyLinearState(thread = thread, participants = listOf(freshKey.public)))
|
||||
signWith(freshKey)
|
||||
}.toSignedTransaction()
|
||||
|
||||
assertThatThrownBy {
|
||||
wallet.notify(dummyIssue2.tx)
|
||||
}
|
||||
assertEquals(1, wallet.currentWallet.states.size)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun sequencingLinearStatesWorks() {
|
||||
val (wallet, services) = make()
|
||||
|
||||
val freshKey = services.keyManagementService.freshKey()
|
||||
|
||||
val thread = SecureHash.sha256("thread")
|
||||
|
||||
// Issue a linear state
|
||||
val dummyIssue = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
|
||||
addOutputState(DummyLinearState(thread = thread, participants = listOf(freshKey.public)))
|
||||
signWith(freshKey)
|
||||
}.toSignedTransaction()
|
||||
|
||||
wallet.notify(dummyIssue.tx)
|
||||
assertEquals(1, wallet.currentWallet.states.size)
|
||||
|
||||
// Move the same state
|
||||
val dummyMove = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
|
||||
addOutputState(DummyLinearState(thread = thread, participants = listOf(freshKey.public)))
|
||||
addInputState(dummyIssue.tx.outRef<LinearState>(0))
|
||||
signWith(DUMMY_NOTARY_KEY)
|
||||
}.toSignedTransaction()
|
||||
|
||||
wallet.notify(dummyMove.tx)
|
||||
assertEquals(1, wallet.currentWallet.states.size)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user