Merged in mnesbit-cleanup-remove-clashingthreads (pull request #281)

Mnesbit cleanup remove clashingthreads
This commit is contained in:
Matthew Nesbit 2016-08-19 16:05:50 +01:00
commit 21092e2bb6
11 changed files with 118 additions and 88 deletions

View File

@ -450,14 +450,18 @@ class InterestRateSwap() : Contract {
fun extractCommands(tx: TransactionForContract): Collection<AuthenticatedObject<CommandData>>
= tx.commands.select<Commands>()
override fun verify(tx: TransactionForContract) = verifyClauses(tx, listOf(Clause.Timestamped(), Clause.Group()), extractCommands(tx))
override fun verify(tx: TransactionForContract) {
verifyClauses(tx,
listOf(Clause.Timestamped(), Clause.Group(), LinearState.ClauseVerifier(State::class.java)),
extractCommands(tx))
}
interface Clause {
/**
* Common superclass for IRS contract clauses, which defines behaviour on match/no-match, and provides
* helper functions for the clauses.
*/
abstract class AbstractIRSClause : GroupClause<State, String> {
abstract class AbstractIRSClause : GroupClause<State, UniqueIdentifier> {
override val ifMatched = MatchBehaviour.END
override val ifNotMatched = MatchBehaviour.CONTINUE
@ -502,13 +506,13 @@ class InterestRateSwap() : Contract {
}
}
class Group : GroupClauseVerifier<State, String>() {
class Group : GroupClauseVerifier<State, UniqueIdentifier>() {
override val ifMatched = MatchBehaviour.END
override val ifNotMatched = MatchBehaviour.ERROR
override fun groupStates(tx: TransactionForContract): List<TransactionForContract.InOutGroup<State, String>>
override fun groupStates(tx: TransactionForContract): List<TransactionForContract.InOutGroup<State, UniqueIdentifier>>
// Group by Trade ID for in / out states
= tx.groupStates() { state -> state.common.tradeID }
= tx.groupStates() { state -> state.linearId }
override val clauses = listOf(Agree(), Fix(), Pay(), Mature())
}
@ -532,7 +536,7 @@ class InterestRateSwap() : Contract {
inputs: List<State>,
outputs: List<State>,
commands: Collection<AuthenticatedObject<CommandData>>,
token: String): Set<CommandData> {
token: UniqueIdentifier): Set<CommandData> {
val command = tx.commands.requireSingleCommand<Commands.Agree>()
val irs = outputs.filterIsInstance<State>().single()
requireThat {
@ -568,7 +572,7 @@ class InterestRateSwap() : Contract {
inputs: List<State>,
outputs: List<State>,
commands: Collection<AuthenticatedObject<CommandData>>,
token: String): Set<CommandData> {
token: UniqueIdentifier): Set<CommandData> {
val command = tx.commands.requireSingleCommand<Commands.Refix>()
val irs = outputs.filterIsInstance<State>().single()
val prevIrs = inputs.filterIsInstance<State>().single()
@ -613,7 +617,7 @@ class InterestRateSwap() : Contract {
inputs: List<State>,
outputs: List<State>,
commands: Collection<AuthenticatedObject<CommandData>>,
token: String): Set<CommandData> {
token: UniqueIdentifier): Set<CommandData> {
val command = tx.commands.requireSingleCommand<Commands.Pay>()
requireThat {
"Payments not supported / verifiable yet" by false
@ -629,11 +633,12 @@ class InterestRateSwap() : Contract {
inputs: List<State>,
outputs: List<State>,
commands: Collection<AuthenticatedObject<CommandData>>,
token: String): Set<CommandData> {
token: UniqueIdentifier): Set<CommandData> {
val command = tx.commands.requireSingleCommand<Commands.Mature>()
val irs = inputs.filterIsInstance<State>().single()
requireThat {
"No more fixings to be applied" by (irs.calculation.nextFixingDate() == null)
"The irs is fully consumed and there is no id matched output state" by outputs.isEmpty()
}
return setOf(command.value)
@ -656,11 +661,12 @@ class InterestRateSwap() : Contract {
val fixedLeg: FixedLeg,
val floatingLeg: FloatingLeg,
val calculation: Calculation,
val common: Common
val common: Common,
override val linearId: UniqueIdentifier = UniqueIdentifier(common.tradeID)
) : FixableDealState, SchedulableState {
override val contract = IRS_PROGRAM_ID
override val thread = SecureHash.sha256(common.tradeID)
override val ref = common.tradeID
override val participants: List<PublicKey>

View File

@ -1,6 +1,7 @@
package com.r3corda.contracts
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.testing.MockServices
import com.r3corda.core.seconds
import com.r3corda.core.testing.*
@ -394,9 +395,10 @@ class IRSTests {
@Test
fun `ensure failure occurs when there are inbound states for an agreement command`() {
val irs = singleIRS()
transaction {
input() { singleIRS() }
output("irs post agreement") { singleIRS() }
input() { irs }
output("irs post agreement") { irs }
command(MEGA_CORP_PUBKEY) { InterestRateSwap.Commands.Agree() }
timestamp(TEST_TX_TIME)
this `fails with` "There are no in states for an agreement"
@ -665,10 +667,11 @@ class IRSTests {
transaction("Agreement") {
output("irs post agreement2") {
irs.copy(
irs.fixedLeg,
irs.floatingLeg,
irs.calculation,
irs.common.copy(tradeID = "t2")
linearId = UniqueIdentifier("t2"),
fixedLeg = irs.fixedLeg,
floatingLeg = irs.floatingLeg,
calculation = irs.calculation,
common = irs.common.copy(tradeID = "t2")
)
}
command(MEGA_CORP_PUBKEY) { InterestRateSwap.Commands.Agree() }

View File

@ -434,4 +434,22 @@ data class Commodity(val symbol: String,
fun getInstance(symbol: String): Commodity?
= registry[symbol]
}
}
/**
* This class provides a truly unique identifier of a trade, state, or other business object.
* @param externalId If there is an existing weak identifer e.g. trade reference id.
* This should be set here the first time a UniqueIdentifier identifier is created as part of an issue,
* or ledger on-boarding activity. This ensure that the human readable identity is paired with the strong id.
* @param id Should never be set by user code and left as default initialised.
* So that the first time a state is issued this should be given a new UUID.
* Subsequent copies and evolutions of a state should just copy the externalId and Id fields unmodified.
*/
data class UniqueIdentifier(val externalId: String? = null, val id: UUID = UUID.randomUUID()) {
override fun toString(): String {
if (externalId != null) {
return "${externalId}_${id.toString()}"
}
return id.toString()
}
}

View File

@ -1,5 +1,7 @@
package com.r3corda.core.contracts
import com.r3corda.core.contracts.clauses.MatchBehaviour
import com.r3corda.core.contracts.clauses.SingleClause
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.crypto.toStringShort
@ -196,16 +198,41 @@ data class ScheduledStateRef(val ref: StateRef, override val scheduledAt: Instan
data class ScheduledActivity(val logicRef: ProtocolLogicRef, override val scheduledAt: Instant) : Scheduled
/**
* A state that evolves by superseding itself, all of which share the common "thread".
* A state that evolves by superseding itself, all of which share the common "linearId".
*
* This simplifies the job of tracking the current version of certain types of state in e.g. a wallet.
*/
interface LinearState : ContractState {
/** Unique thread id within the wallets of all parties */
val thread: SecureHash
interface LinearState: ContractState {
/**
* Unique id shared by all LinearState states throughout history within the wallets of all parties.
* Verify methods should check that one input and one output share the id in a transaction,
* except at issuance/termination.
*/
val linearId: UniqueIdentifier
/** true if this should be tracked by our wallet(s) */
/**
* True if this should be tracked by our wallet(s).
* */
fun isRelevant(ourKeys: Set<PublicKey>): Boolean
/**
* Standard clause to verify the LinearState safety properties.
*/
class ClauseVerifier<S: LinearState>(val stateClass: Class<S>) : SingleClause {
override val ifMatched = MatchBehaviour.CONTINUE
override val ifNotMatched = MatchBehaviour.ERROR
override val requiredCommands = emptySet<Class<out CommandData>>()
override fun verify(tx: TransactionForContract, commands: Collection<AuthenticatedObject<CommandData>>): Set<CommandData> {
val inputs = tx.inputs.filterIsInstance(stateClass)
val inputIds = inputs.map { it.linearId }.distinct()
require(inputIds.count() == inputs.count()) { "LinearStates cannot be merged" }
val outputs = tx.outputs.filterIsInstance(stateClass)
val outputIds = outputs.map { it.linearId }.distinct()
require(outputIds.count() == outputs.count()) { "LinearStates cannot be split" }
return emptySet()
}
}
}
interface SchedulableState : ContractState {

View File

@ -85,13 +85,13 @@ interface WalletService {
/**
* Returns a snapshot of the heads of LinearStates.
*/
val linearHeads: Map<SecureHash, StateAndRef<LinearState>>
val linearHeads: Map<UniqueIdentifier, StateAndRef<LinearState>>
// TODO: When KT-10399 is fixed, rename this and remove the inline version below.
/** Returns the [linearHeads] only when the type of the state would be considered an 'instanceof' the given type. */
@Suppress("UNCHECKED_CAST")
fun <T : LinearState> linearHeadsOfType_(stateType: Class<T>): Map<SecureHash, StateAndRef<T>> {
fun <T : LinearState> linearHeadsOfType_(stateType: Class<T>): Map<UniqueIdentifier, StateAndRef<T>> {
return linearHeads.filterValues { stateType.isInstance(it.state.data) }.mapValues { StateAndRef(it.value.state as TransactionState<T>, it.value.ref) }
}

View File

@ -2,16 +2,30 @@ package com.r3corda.core.testing
import com.r3corda.core.contracts.Contract
import com.r3corda.core.contracts.LinearState
import com.r3corda.core.contracts.UniqueIdentifier
import com.r3corda.core.contracts.TransactionForContract
import com.r3corda.core.contracts.clauses.verifyClauses
import com.r3corda.core.crypto.SecureHash
import java.security.PublicKey
class DummyLinearState(
override val thread: SecureHash = SecureHash.randomSHA256(),
override val contract: Contract = AlwaysSucceedContract(),
override val participants: List<PublicKey> = listOf(),
val nonce: SecureHash = SecureHash.randomSHA256()) : LinearState {
class DummyLinearContract: Contract {
override val legalContractReference: SecureHash = SecureHash.sha256("Test")
override fun isRelevant(ourKeys: Set<PublicKey>): Boolean {
return participants.any { ourKeys.contains(it) }
override fun verify(tx: TransactionForContract) {
verifyClauses(tx,
listOf(LinearState.ClauseVerifier(State::class.java)),
emptyList())
}
class State(
override val linearId: UniqueIdentifier = UniqueIdentifier(),
override val contract: Contract = DummyLinearContract(),
override val participants: List<PublicKey> = listOf(),
val nonce: SecureHash = SecureHash.randomSHA256()) : LinearState {
override fun isRelevant(ourKeys: Set<PublicKey>): Boolean {
return participants.any { ourKeys.contains(it) }
}
}
}

View File

@ -22,9 +22,6 @@ import javax.annotation.concurrent.ThreadSafe
*/
@ThreadSafe
open class InMemoryWalletService(protected val services: ServiceHub) : SingletonSerializeAsToken(), WalletService {
class ClashingThreads(threads: Set<SecureHash>, transactions: Iterable<WireTransaction>) :
Exception("There are multiple linear head states after processing transactions $transactions. The clashing thread(s): $threads")
open protected val log = loggerFor<InMemoryWalletService>()
// Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're
@ -46,9 +43,9 @@ open class InMemoryWalletService(protected val services: ServiceHub) : Singleton
/**
* Returns a snapshot of the heads of LinearStates.
*/
override val linearHeads: Map<SecureHash, StateAndRef<LinearState>>
override val linearHeads: Map<UniqueIdentifier, StateAndRef<LinearState>>
get() = currentWallet.let { wallet ->
wallet.states.filterStatesOfType<LinearState>().associateBy { it.state.data.thread }.mapValues { it.value }
wallet.states.filterStatesOfType<LinearState>().associateBy { it.state.data.linearId }.mapValues { it.value }
}
override fun notifyAll(txns: Iterable<WireTransaction>): Wallet {
@ -78,14 +75,6 @@ open class InMemoryWalletService(protected val services: ServiceHub) : Singleton
Pair(wallet, combinedDelta)
}
// TODO: we need to remove the clashing threads concepts and support potential duplicate threads
// because two different nodes can have two different sets of threads and so currently it's possible
// for only one party to have a clash which interferes with determinism of the transactions.
val clashingThreads = walletAndNetDelta.first.clashingThreads
if (!clashingThreads.isEmpty()) {
throw ClashingThreads(clashingThreads, txns)
}
wallet = walletAndNetDelta.first
netDelta = walletAndNetDelta.second
return@locked wallet
@ -133,23 +122,4 @@ open class InMemoryWalletService(protected val services: ServiceHub) : Singleton
return Pair(Wallet(newStates), change)
}
companion object {
// Returns the set of LinearState threads that clash in the wallet
val Wallet.clashingThreads: Set<SecureHash> get() {
val clashingThreads = HashSet<SecureHash>()
val threadsSeen = HashSet<SecureHash>()
for (linearState in states.filterStatesOfType<LinearState>()) {
val thread = linearState.state.data.thread
if (threadsSeen.contains(thread)) {
clashingThreads.add(thread)
} else {
threadsSeen.add(thread)
}
}
return clashingThreads
}
}
}

View File

@ -64,8 +64,8 @@ class Invoice : Contract {
val owner: Party,
val buyer: Party,
val assigned: Boolean,
val props: InvoiceProperties
val props: InvoiceProperties,
override val linearId: UniqueIdentifier = UniqueIdentifier()
) : LinearState {
override val contract = INVOICE_PROGRAM_ID
@ -84,8 +84,6 @@ class Invoice : Contract {
// iterate over the goods list and sum up the price for each
val amount: Amount<Issued<Currency>> get() = props.amount
override val thread = SecureHash.Companion.sha256(props.invoiceID)
override fun isRelevant(ourKeys: Set<PublicKey>): Boolean {
return owner.owningKey in ourKeys || buyer.owningKey in ourKeys
}

View File

@ -9,7 +9,7 @@ import com.r3corda.contracts.InterestRateSwap
import com.r3corda.core.RunOnCallerThread
import com.r3corda.core.contracts.SignedTransaction
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.contracts.UniqueIdentifier
import com.r3corda.core.failure
import com.r3corda.core.node.services.linearHeadsOfType
import com.r3corda.core.node.services.testing.MockIdentityService
@ -80,7 +80,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
val node1: SimulatedNode = banks[i]
val node2: SimulatedNode = banks[j]
val swaps: Map<SecureHash, StateAndRef<InterestRateSwap.State>> = node1.services.walletService.linearHeadsOfType<InterestRateSwap.State>()
val swaps: Map<UniqueIdentifier, StateAndRef<InterestRateSwap.State>> = node1.services.walletService.linearHeadsOfType<InterestRateSwap.State>()
val theDealRef: StateAndRef<InterestRateSwap.State> = swaps.values.single()
// Do we have any more days left in this deal's lifetime? If not, return.

View File

@ -82,7 +82,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
override val participants: List<PublicKey>
get() = throw UnsupportedOperationException()
override val thread = SecureHash.sha256("does not matter but we need it to be unique ${Math.random()}")
override val linearId = UniqueIdentifier()
override fun isRelevant(ourKeys: Set<PublicKey>): Boolean = true

View File

@ -104,56 +104,50 @@ class WalletWithCashTest {
@Test
fun branchingLinearStatesFails() {
fun branchingLinearStatesFailsToVerify() {
val freshKey = services.keyManagementService.freshKey()
val thread = SecureHash.sha256("thread")
val linearId = UniqueIdentifier()
// Issue a linear state
val dummyIssue = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
addOutputState(DummyLinearState(thread = thread, participants = listOf(freshKey.public)))
signWith(freshKey)
signWith(DUMMY_NOTARY_KEY)
}.toSignedTransaction()
wallet.notify(dummyIssue.tx)
assertEquals(1, wallet.currentWallet.states.toList().size)
// Issue another linear state of the same thread (nonce different)
val dummyIssue2 = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
addOutputState(DummyLinearState(thread = thread, participants = listOf(freshKey.public)))
addOutputState(DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public)))
addOutputState(DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public)))
signWith(freshKey)
signWith(DUMMY_NOTARY_KEY)
}.toSignedTransaction()
assertThatThrownBy {
wallet.notify(dummyIssue2.tx)
dummyIssue.toLedgerTransaction(services).verify()
}
assertEquals(1, wallet.currentWallet.states.toList().size)
}
@Test
fun sequencingLinearStatesWorks() {
val freshKey = services.keyManagementService.freshKey()
val thread = SecureHash.sha256("thread")
val linearId = UniqueIdentifier()
// Issue a linear state
val dummyIssue = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
addOutputState(DummyLinearState(thread = thread, participants = listOf(freshKey.public)))
addOutputState(DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public)))
signWith(freshKey)
signWith(DUMMY_NOTARY_KEY)
}.toSignedTransaction()
dummyIssue.toLedgerTransaction(services).verify()
wallet.notify(dummyIssue.tx)
assertEquals(1, wallet.currentWallet.states.toList().size)
// Move the same state
val dummyMove = TransactionType.General.Builder(notary = DUMMY_NOTARY).apply {
addOutputState(DummyLinearState(thread = thread, participants = listOf(freshKey.public)))
addOutputState(DummyLinearContract.State(linearId = linearId, participants = listOf(freshKey.public)))
addInputState(dummyIssue.tx.outRef<LinearState>(0))
signWith(DUMMY_NOTARY_KEY)
}.toSignedTransaction()
dummyIssue.toLedgerTransaction(services).verify()
wallet.notify(dummyMove.tx)
assertEquals(1, wallet.currentWallet.states.toList().size)
}