Add AbstractTransactionSubstitutionProtocol

Abstracts the NotaryChangeProtocol to be extensible for other use-cases.
This commit is contained in:
Ross Nicoll
2016-06-29 13:07:28 +01:00
parent 22e5a5dddc
commit 779034691e
4 changed files with 278 additions and 201 deletions

View File

@ -0,0 +1,244 @@
package protocols
import co.paralleluniverse.fibers.Suspendable
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.signWithECDSA
import com.r3corda.core.messaging.Ack
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.random63BitValue
import com.r3corda.core.utilities.ProgressTracker
import com.r3corda.protocols.AbstractRequestMessage
import com.r3corda.protocols.NotaryProtocol
import com.r3corda.protocols.ResolveTransactionsProtocol
import java.security.PublicKey
/**
* Abstract protocol to be used for replacing one state with another, for example when changing the notary of a state.
* Notably this requires a one to one replacement of states, states cannot be split, merged or issued as part of these
* protocols.
*
* The [Instigator] assembles the transaction for state replacement and sends out change proposals to all participants
* ([Acceptor]) of that state. If participants agree to the proposed change, they each sign the transaction.
* Finally, [Instigator] sends the transaction containing all signatures back to each participant so they can record it and
* use the new updated state for future transactions.
*/
abstract class AbstractStateReplacementProtocol<T> {
interface Proposal<T> {
val stateRef: StateRef
val modification: T
val stx: SignedTransaction
}
class Handshake(val sessionIdForSend: Long,
replyTo: SingleMessageRecipient,
replySessionId: Long) : AbstractRequestMessage(replyTo, replySessionId)
abstract class Instigator<S : ContractState, T>(val originalState: StateAndRef<S>,
val modification: T,
override val progressTracker: ProgressTracker = tracker()) : ProtocolLogic<StateAndRef<S>>() {
companion object {
object SIGNING : ProgressTracker.Step("Requesting signatures from other parties")
object NOTARY : ProgressTracker.Step("Requesting notary signature")
fun tracker() = ProgressTracker(SIGNING, NOTARY)
}
abstract val TOPIC_CHANGE: String
abstract val TOPIC_INITIATE: String
@Suspendable
override fun call(): StateAndRef<S> {
val (stx, participants) = assembleTx()
progressTracker.currentStep = SIGNING
val myKey = serviceHub.storageService.myLegalIdentity.owningKey
val me = listOf(myKey)
val signatures = if (participants == me) {
listOf(getNotarySignature(stx))
} else {
collectSignatures(participants - me, stx)
}
val finalTx = stx + signatures
serviceHub.recordTransactions(listOf(finalTx))
return finalTx.tx.outRef(0)
}
abstract internal fun assembleProposal(stateRef: StateRef, modification: T, stx: SignedTransaction): Proposal<T>
abstract internal fun assembleTx(): Pair<SignedTransaction, List<PublicKey>>
@Suspendable
private fun collectSignatures(participants: List<PublicKey>, stx: SignedTransaction): List<DigitalSignature.WithKey> {
val sessions = mutableMapOf<NodeInfo, Long>()
val participantSignatures = participants.map {
val participantNode = serviceHub.networkMapCache.getNodeByPublicKey(it) ?:
throw IllegalStateException("Participant $it to state $originalState not found on the network")
val sessionIdForSend = random63BitValue()
sessions[participantNode] = sessionIdForSend
getParticipantSignature(participantNode, stx, sessionIdForSend)
}
val allSignatures = participantSignatures + getNotarySignature(stx)
sessions.forEach { send(TOPIC_CHANGE, it.key.address, it.value, allSignatures) }
return allSignatures
}
@Suspendable
private fun getParticipantSignature(node: NodeInfo, stx: SignedTransaction, sessionIdForSend: Long): DigitalSignature.WithKey {
val sessionIdForReceive = random63BitValue()
val proposal = assembleProposal(originalState.ref, modification, stx)
val handshake = Handshake(sessionIdForSend, serviceHub.networkService.myAddress, sessionIdForReceive)
sendAndReceive<Ack>(TOPIC_INITIATE, node.address, 0, sessionIdForReceive, handshake)
val response = sendAndReceive<Result>(TOPIC_CHANGE, node.address, sessionIdForSend, sessionIdForReceive, proposal)
val participantSignature = response.validate {
if (it.sig == null) throw StateReplacementException(it.error!!)
else {
check(it.sig.by == node.identity.owningKey) { "Not signed by the required participant" }
it.sig.verifyWithECDSA(stx.txBits)
it.sig
}
}
return participantSignature
}
@Suspendable
private fun getNotarySignature(stx: SignedTransaction): DigitalSignature.LegallyIdentifiable {
progressTracker.currentStep = NOTARY
return subProtocol(NotaryProtocol.Client(stx))
}
}
abstract class Acceptor<T>(val otherSide: SingleMessageRecipient,
val sessionIdForSend: Long,
val sessionIdForReceive: Long,
override val progressTracker: ProgressTracker = tracker()) : ProtocolLogic<Unit>() {
companion object {
object VERIFYING : ProgressTracker.Step("Verifying state replacement proposal")
object APPROVING : ProgressTracker.Step("State replacement approved")
object REJECTING : ProgressTracker.Step("State replacement rejected")
fun tracker() = ProgressTracker(VERIFYING, APPROVING, REJECTING)
}
abstract val TOPIC_CHANGE: String
abstract val TOPIC_INITIATE: String
@Suspendable
override fun call() {
progressTracker.currentStep = VERIFYING
val proposal = receive<Proposal<T>>(TOPIC_CHANGE, sessionIdForReceive).validate { it }
try {
verifyProposal(proposal)
verifyTx(proposal.stx)
} catch(e: Exception) {
// TODO: catch only specific exceptions. However, there are numerous validation exceptions
// that might occur (tx validation/resolution, invalid proposal). Need to rethink how
// we manage exceptions and maybe introduce some platform exception hierarchy
val myIdentity = serviceHub.storageService.myLegalIdentity
val state = proposal.stateRef
val reason = StateReplacementRefused(myIdentity, state, e.message)
reject(reason)
return
}
approve(proposal.stx)
}
@Suspendable
private fun approve(stx: SignedTransaction) {
progressTracker.currentStep = APPROVING
val mySignature = sign(stx)
val response = Result.noError(mySignature)
val swapSignatures = sendAndReceive<List<DigitalSignature.WithKey>>(TOPIC_CHANGE, otherSide, sessionIdForSend, sessionIdForReceive, response)
val allSignatures = swapSignatures.validate { signatures ->
signatures.forEach { it.verifyWithECDSA(stx.txBits) }
signatures
}
val finalTx = stx + allSignatures
finalTx.verify()
serviceHub.recordTransactions(listOf(finalTx))
}
@Suspendable
private fun reject(e: StateReplacementRefused) {
progressTracker.currentStep = REJECTING
val response = Result.withError(e)
send(TOPIC_CHANGE, otherSide, sessionIdForSend, response)
}
/**
* Check the state change proposal to confirm that it's acceptable to this node. Rules for verification depend
* on the change proposed, and may further depend on the node itself (for example configuration).
*/
abstract internal fun verifyProposal(proposal: Proposal<T>)
@Suspendable
private fun verifyTx(stx: SignedTransaction) {
checkMySignatureRequired(stx.tx)
checkDependenciesValid(stx)
checkValid(stx)
}
private fun checkMySignatureRequired(tx: WireTransaction) {
// TODO: use keys from the keyManagementService instead
val myKey = serviceHub.storageService.myLegalIdentity.owningKey
require(tx.signers.contains(myKey)) { "Party is not a participant for any of the input states of transaction ${tx.id}" }
}
@Suspendable
private fun checkDependenciesValid(stx: SignedTransaction) {
val dependencyTxIDs = stx.tx.inputs.map { it.txhash }.toSet()
subProtocol(ResolveTransactionsProtocol(dependencyTxIDs, otherSide))
}
private fun checkValid(stx: SignedTransaction) {
val ltx = stx.tx.toLedgerTransaction(serviceHub.identityService, serviceHub.storageService.attachments)
serviceHub.verifyTransaction(ltx)
}
private fun sign(stx: SignedTransaction): DigitalSignature.WithKey {
val myKeyPair = serviceHub.storageService.myLegalIdentityKey
return myKeyPair.signWithECDSA(stx.txBits)
}
}
// TODO: similar classes occur in other places (NotaryProtocol), need to consolidate
data class Result private constructor(val sig: DigitalSignature.WithKey?, val error: StateReplacementRefused?) {
companion object {
fun withError(error: StateReplacementRefused) = Result(null, error)
fun noError(sig: DigitalSignature.WithKey) = Result(sig, null)
}
}
}
/** Thrown when a participant refuses proposed the state replacement */
class StateReplacementRefused(val identity: Party, val state: StateRef, val detail: String?) {
override fun toString(): String
= "A participant $identity refused to change state $state"
}
class StateReplacementException(val error: StateReplacementRefused)
: Exception("State change failed - ${error}")

View File

@ -25,53 +25,30 @@ import java.security.PublicKey
* Finally, [Instigator] sends the transaction containing all signatures back to each participant so they can record it and * Finally, [Instigator] sends the transaction containing all signatures back to each participant so they can record it and
* use the new updated state for future transactions. * use the new updated state for future transactions.
*/ */
object NotaryChangeProtocol { object NotaryChangeProtocol: AbstractStateReplacementProtocol<Party>() {
val TOPIC_INITIATE = "platform.notary.change.initiate" val TOPIC_INITIATE = "platform.notary.change.initiate"
val TOPIC_CHANGE = "platform.notary.change.execute" val TOPIC_CHANGE = "platform.notary.change.execute"
data class Proposal(val stateRef: StateRef, data class Proposal(override val stateRef: StateRef,
val newNotary: Party, override val modification: Party,
val stx: SignedTransaction) override val stx: SignedTransaction) : AbstractStateReplacementProtocol.Proposal<Party>
class Handshake(val sessionIdForSend: Long, class Instigator<T : ContractState>(originalState: StateAndRef<T>,
replyTo: SingleMessageRecipient, newNotary: Party,
replySessionId: Long) : AbstractRequestMessage(replyTo, replySessionId) progressTracker: ProgressTracker = tracker())
: AbstractStateReplacementProtocol.Instigator<T, Party>(originalState, newNotary, progressTracker) {
class Instigator<T : ContractState>(val originalState: StateAndRef<T>, override val TOPIC_CHANGE: String
val newNotary: Party, get() = NotaryChangeProtocol.TOPIC_CHANGE
override val progressTracker: ProgressTracker = tracker()) : ProtocolLogic<StateAndRef<T>>() { override val TOPIC_INITIATE: String
companion object { get() = NotaryChangeProtocol.TOPIC_INITIATE
object SIGNING : ProgressTracker.Step("Requesting signatures from other parties") override fun assembleProposal(stateRef: StateRef, modification: Party, stx: SignedTransaction): AbstractStateReplacementProtocol.Proposal<Party>
= NotaryChangeProtocol.Proposal(stateRef, modification, stx)
object NOTARY : ProgressTracker.Step("Requesting current Notary signature") override fun assembleTx(): Pair<SignedTransaction, List<PublicKey>> {
fun tracker() = ProgressTracker(SIGNING, NOTARY)
}
@Suspendable
override fun call(): StateAndRef<T> {
val (stx, participants) = assembleTx()
progressTracker.currentStep = SIGNING
val myKey = serviceHub.storageService.myLegalIdentity.owningKey
val me = listOf(myKey)
val signatures = if (participants == me) {
listOf(getNotarySignature(stx))
} else {
collectSignatures(participants - me, stx)
}
val finalTx = stx + signatures
serviceHub.recordTransactions(listOf(finalTx))
return finalTx.tx.outRef(0)
}
private fun assembleTx(): Pair<SignedTransaction, List<PublicKey>> {
val state = originalState.state val state = originalState.state
val newState = state.withNewNotary(newNotary) val newState = state.withNewNotary(modification)
val participants = state.data.participants val participants = state.data.participants
val tx = TransactionType.NotaryChange.Builder().withItems(originalState, newState) val tx = TransactionType.NotaryChange.Builder().withItems(originalState, newState)
tx.signWith(serviceHub.storageService.myLegalIdentityKey) tx.signWith(serviceHub.storageService.myLegalIdentityKey)
@ -79,116 +56,17 @@ object NotaryChangeProtocol {
val stx = tx.toSignedTransaction(false) val stx = tx.toSignedTransaction(false)
return Pair(stx, participants) return Pair(stx, participants)
} }
@Suspendable
private fun collectSignatures(participants: List<PublicKey>, stx: SignedTransaction): List<DigitalSignature.WithKey> {
val sessions = mutableMapOf<NodeInfo, Long>()
val participantSignatures = participants.map {
val participantNode = serviceHub.networkMapCache.getNodeByPublicKey(it) ?:
throw IllegalStateException("Participant $it to state $originalState not found on the network")
val sessionIdForSend = random63BitValue()
sessions[participantNode] = sessionIdForSend
getParticipantSignature(participantNode, stx, sessionIdForSend)
}
val allSignatures = participantSignatures + getNotarySignature(stx)
sessions.forEach { send(TOPIC_CHANGE, it.key.address, it.value, allSignatures) }
return allSignatures
}
@Suspendable
private fun getParticipantSignature(node: NodeInfo, stx: SignedTransaction, sessionIdForSend: Long): DigitalSignature.WithKey {
val sessionIdForReceive = random63BitValue()
val proposal = Proposal(originalState.ref, newNotary, stx)
val handshake = Handshake(sessionIdForSend, serviceHub.networkService.myAddress, sessionIdForReceive)
sendAndReceive<Ack>(TOPIC_INITIATE, node.address, 0, sessionIdForReceive, handshake)
val response = sendAndReceive<Result>(TOPIC_CHANGE, node.address, sessionIdForSend, sessionIdForReceive, proposal)
val participantSignature = response.validate {
if (it.sig == null) throw NotaryChangeException(it.error!!)
else {
check(it.sig.by == node.identity.owningKey) { "Not signed by the required participant" }
it.sig.verifyWithECDSA(stx.txBits)
it.sig
}
}
return participantSignature
}
@Suspendable
private fun getNotarySignature(stx: SignedTransaction): DigitalSignature.LegallyIdentifiable {
progressTracker.currentStep = NOTARY
return subProtocol(NotaryProtocol.Client(stx))
}
} }
class Acceptor(val otherSide: SingleMessageRecipient, class Acceptor(otherSide: SingleMessageRecipient,
val sessionIdForSend: Long, sessionIdForSend: Long,
val sessionIdForReceive: Long, sessionIdForReceive: Long,
override val progressTracker: ProgressTracker = tracker()) : ProtocolLogic<Unit>() { override val progressTracker: ProgressTracker = tracker())
: AbstractStateReplacementProtocol.Acceptor<Party>(otherSide, sessionIdForSend, sessionIdForReceive) {
companion object { override val TOPIC_CHANGE: String
object VERIFYING : ProgressTracker.Step("Verifying Notary change proposal") get() = NotaryChangeProtocol.TOPIC_CHANGE
override val TOPIC_INITIATE: String
object APPROVING : ProgressTracker.Step("Notary change approved") get() = NotaryChangeProtocol.TOPIC_INITIATE
object REJECTING : ProgressTracker.Step("Notary change rejected")
fun tracker() = ProgressTracker(VERIFYING, APPROVING, REJECTING)
}
@Suspendable
override fun call() {
progressTracker.currentStep = VERIFYING
val proposal = receive<Proposal>(TOPIC_CHANGE, sessionIdForReceive).validate { it }
try {
verifyProposal(proposal)
verifyTx(proposal.stx)
} catch(e: Exception) {
// TODO: catch only specific exceptions. However, there are numerous validation exceptions
// that might occur (tx validation/resolution, invalid proposal). Need to rethink how
// we manage exceptions and maybe introduce some platform exception hierarchy
val myIdentity = serviceHub.storageService.myLegalIdentity
val state = proposal.stateRef
val reason = NotaryChangeRefused(myIdentity, state, e.message)
reject(reason)
return
}
approve(proposal.stx)
}
@Suspendable
private fun approve(stx: SignedTransaction) {
progressTracker.currentStep = APPROVING
val mySignature = sign(stx)
val response = Result.noError(mySignature)
val swapSignatures = sendAndReceive<List<DigitalSignature.WithKey>>(TOPIC_CHANGE, otherSide, sessionIdForSend, sessionIdForReceive, response)
val allSignatures = swapSignatures.validate { signatures ->
signatures.forEach { it.verifyWithECDSA(stx.txBits) }
signatures
}
val finalTx = stx + allSignatures
finalTx.verify()
serviceHub.recordTransactions(listOf(finalTx))
}
@Suspendable
private fun reject(e: NotaryChangeRefused) {
progressTracker.currentStep = REJECTING
val response = Result.withError(e)
send(TOPIC_CHANGE, otherSide, sessionIdForSend, response)
}
/** /**
* Check the notary change proposal. * Check the notary change proposal.
@ -198,8 +76,8 @@ object NotaryChangeProtocol {
* TODO: In more difficult cases this should call for human attention to manually verify and approve the proposal * TODO: In more difficult cases this should call for human attention to manually verify and approve the proposal
*/ */
@Suspendable @Suspendable
private fun verifyProposal(proposal: NotaryChangeProtocol.Proposal) { override fun verifyProposal(proposal: AbstractStateReplacementProtocol.Proposal<Party>) {
val newNotary = proposal.newNotary val newNotary = proposal.modification
val isNotary = serviceHub.networkMapCache.notaryNodes.any { it.identity == newNotary } val isNotary = serviceHub.networkMapCache.notaryNodes.any { it.identity == newNotary }
require(isNotary) { "The proposed node $newNotary does not run a Notary service " } require(isNotary) { "The proposed node $newNotary does not run a Notary service " }
@ -211,51 +89,5 @@ object NotaryChangeProtocol {
val blacklist = listOf("Evil Notary") val blacklist = listOf("Evil Notary")
require(!blacklist.contains(newNotary.name)) { "The proposed new notary $newNotary is not trusted by the party" } require(!blacklist.contains(newNotary.name)) { "The proposed new notary $newNotary is not trusted by the party" }
} }
@Suspendable
private fun verifyTx(stx: SignedTransaction) {
checkMySignatureRequired(stx.tx)
checkDependenciesValid(stx)
checkValid(stx)
}
private fun checkMySignatureRequired(tx: WireTransaction) {
// TODO: use keys from the keyManagementService instead
val myKey = serviceHub.storageService.myLegalIdentity.owningKey
require(tx.signers.contains(myKey)) { "Party is not a participant for any of the input states of transaction ${tx.id}" }
}
@Suspendable
private fun checkDependenciesValid(stx: SignedTransaction) {
val dependencyTxIDs = stx.tx.inputs.map { it.txhash }.toSet()
subProtocol(ResolveTransactionsProtocol(dependencyTxIDs, otherSide))
}
private fun checkValid(stx: SignedTransaction) {
val ltx = stx.tx.toLedgerTransaction(serviceHub.identityService, serviceHub.storageService.attachments)
serviceHub.verifyTransaction(ltx)
}
private fun sign(stx: SignedTransaction): DigitalSignature.WithKey {
val myKeyPair = serviceHub.storageService.myLegalIdentityKey
return myKeyPair.signWithECDSA(stx.txBits)
}
} }
// TODO: similar classes occur in other places (NotaryProtocol), need to consolidate
data class Result private constructor(val sig: DigitalSignature.WithKey?, val error: NotaryChangeRefused?) {
companion object {
fun withError(error: NotaryChangeRefused) = Result(null, error)
fun noError(sig: DigitalSignature.WithKey) = Result(sig, null)
}
}
}
/** Thrown when a participant refuses to change the notary of the state */
class NotaryChangeRefused(val identity: Party, val state: StateRef, val cause: String?) {
override fun toString() = "A participant $identity refused to change the notary of state $state"
}
class NotaryChangeException(val error: NotaryChangeRefused) : Exception() {
override fun toString() = "${super.toString()}: Notary change failed - ${error.toString()}"
} }

View File

@ -5,6 +5,7 @@ import com.r3corda.core.messaging.MessagingService
import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.node.services.api.AbstractNodeService import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.services.statemachine.StateMachineManager
import protocols.AbstractStateReplacementProtocol
import protocols.NotaryChangeProtocol import protocols.NotaryChangeProtocol
/** /**
@ -14,11 +15,11 @@ import protocols.NotaryChangeProtocol
class NotaryChangeService(net: MessagingService, val smm: StateMachineManager) : AbstractNodeService(net) { class NotaryChangeService(net: MessagingService, val smm: StateMachineManager) : AbstractNodeService(net) {
init { init {
addMessageHandler(NotaryChangeProtocol.TOPIC_INITIATE, addMessageHandler(NotaryChangeProtocol.TOPIC_INITIATE,
{ req: NotaryChangeProtocol.Handshake -> handleChangeNotaryRequest(req) } { req: AbstractStateReplacementProtocol.Handshake -> handleChangeNotaryRequest(req) }
) )
} }
private fun handleChangeNotaryRequest(req: NotaryChangeProtocol.Handshake): Ack { private fun handleChangeNotaryRequest(req: AbstractStateReplacementProtocol.Handshake): Ack {
val protocol = NotaryChangeProtocol.Acceptor( val protocol = NotaryChangeProtocol.Acceptor(
req.replyTo as SingleMessageRecipient, req.replyTo as SingleMessageRecipient,
req.sessionID!!, req.sessionID!!,

View File

@ -11,10 +11,10 @@ import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.node.services.transactions.SimpleNotaryService
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
import protocols.NotaryChangeException import protocols.StateReplacementException
import protocols.StateReplacementRefused
import protocols.NotaryChangeProtocol import protocols.NotaryChangeProtocol
import protocols.NotaryChangeProtocol.Instigator import protocols.NotaryChangeProtocol.Instigator
import protocols.NotaryChangeRefused
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFailsWith import kotlin.test.assertFailsWith
@ -80,8 +80,8 @@ class NotaryChangeTests {
net.runNetwork() net.runNetwork()
val ex = assertFailsWith(ExecutionException::class) { future.get() } val ex = assertFailsWith(ExecutionException::class) { future.get() }
val error = (ex.cause as NotaryChangeException).error val error = (ex.cause as StateReplacementException).error
assertTrue(error is NotaryChangeRefused) assertTrue(error is StateReplacementRefused)
} }
// TODO: Add more test cases once we have a general protocol/service exception handling mechanism: // TODO: Add more test cases once we have a general protocol/service exception handling mechanism: