mirror of
https://github.com/corda/corda.git
synced 2025-02-20 17:33:15 +00:00
[CORDA-1828] Documentation update for observer nodes (#4443)
CORDA-1828 Documentation update for observer nodes * Update observer node documentation * Added test to catch original issue if seen again
This commit is contained in:
parent
3c13c07c09
commit
39e5dc5749
@ -46,3 +46,9 @@ Caveats
|
||||
* Nodes only record each transaction once. If a node has already recorded a transaction in non-observer mode, it cannot
|
||||
later re-record the same transaction as an observer. This issue is tracked here:
|
||||
https://r3-cev.atlassian.net/browse/CORDA-883
|
||||
|
||||
* Observer nodes will only record the states of the transactions sent to them, and not any states from any previous
|
||||
transactions in the chain. If the observer node is required to follow the creation and deletion of states, then each
|
||||
transaction in the chain involving those states must be sent individually. This is because the observer node does not
|
||||
necessarily have any visibility into the states of intermediate transactions, and so cannot always determine whether
|
||||
a previous state has been consumed when a new transaction is received.
|
||||
|
@ -0,0 +1,70 @@
|
||||
package net.corda.node.services.persistence
|
||||
|
||||
import net.corda.core.contracts.*
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.schemas.QueryableState
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Table
|
||||
|
||||
@CordaSerializable
|
||||
data class MessageData(val value: String)
|
||||
|
||||
data class MessageChainState(val message: MessageData, val by: Party, override val linearId: UniqueIdentifier = UniqueIdentifier()) : LinearState, QueryableState {
|
||||
override val participants: List<AbstractParty> = listOf(by)
|
||||
|
||||
override fun generateMappedObject(schema: MappedSchema): PersistentState {
|
||||
return when (schema) {
|
||||
is MessageChainSchemaV1 -> MessageChainSchemaV1.PersistentMessage(
|
||||
by = by.name.toString(),
|
||||
value = message.value
|
||||
)
|
||||
else -> throw IllegalArgumentException("Unrecognised schema $schema")
|
||||
}
|
||||
}
|
||||
|
||||
override fun supportedSchemas(): Iterable<MappedSchema> = listOf(MessageChainSchemaV1)
|
||||
}
|
||||
|
||||
object MessageChainSchema
|
||||
object MessageChainSchemaV1 : MappedSchema(
|
||||
schemaFamily = MessageChainSchema.javaClass,
|
||||
version = 1,
|
||||
mappedTypes = listOf(PersistentMessage::class.java)) {
|
||||
|
||||
@Entity
|
||||
@Table(name = "messages")
|
||||
class PersistentMessage(
|
||||
@Column(name = "message_by", nullable = false)
|
||||
var by: String,
|
||||
|
||||
@Column(name = "message_value", nullable = false)
|
||||
var value: String
|
||||
) : PersistentState()
|
||||
}
|
||||
|
||||
const val MESSAGE_CHAIN_CONTRACT_PROGRAM_ID = "net.corda.node.services.persistence.MessageChainContract"
|
||||
|
||||
open class MessageChainContract : Contract {
|
||||
override fun verify(tx: LedgerTransaction) {
|
||||
val command = tx.commands.requireSingleCommand<Commands.Send>()
|
||||
requireThat {
|
||||
// Generic constraints around the IOU transaction.
|
||||
"Only one output state should be created." using (tx.outputs.size == 1)
|
||||
val out = tx.outputsOfType<MessageChainState>().single()
|
||||
"Message sender must sign." using (command.signers.containsAll(out.participants.map { it.owningKey }))
|
||||
|
||||
"Message value must not be empty." using (out.message.value.isNotBlank())
|
||||
}
|
||||
}
|
||||
|
||||
interface Commands : CommandData {
|
||||
class Send : Commands
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,186 @@
|
||||
package net.corda.node.services.persistence
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.Command
|
||||
import net.corda.core.contracts.StateAndContract
|
||||
import net.corda.core.contracts.StateAndRef
|
||||
import net.corda.core.flows.*
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.packageName
|
||||
import net.corda.core.node.StatesToRecord
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.cordappWithPackages
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNotNull
|
||||
|
||||
class ObserverNodeTransactionTests {
|
||||
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
|
||||
@Before
|
||||
fun start() {
|
||||
mockNet = InternalMockNetwork(
|
||||
cordappsForAllNodes = listOf(cordappWithPackages(MessageChainState::class.packageName)),
|
||||
networkSendManuallyPumped = false,
|
||||
threadPerNode = true)
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
mockNet.stopNodes()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `Broadcasting an old transaction does not cause 2 unconsumed states`() {
|
||||
val node = mockNet.createPartyNode(ALICE_NAME)
|
||||
val regulator = mockNet.createPartyNode(BOB_NAME)
|
||||
val notary = mockNet.defaultNotaryIdentity
|
||||
regulator.registerInitiatedFlow(ReceiveReportedTransaction::class.java)
|
||||
|
||||
fun buildTransactionChain(initialMessage: MessageData, chainLength: Int) {
|
||||
node.services.startFlow(StartMessageChainFlow(initialMessage, notary)).resultFuture.getOrThrow()
|
||||
var result = node.services.vaultService.queryBy(MessageChainState::class.java).states.filter {
|
||||
it.state.data.message.value.startsWith(initialMessage.value)
|
||||
}.singleOrNull()
|
||||
|
||||
for (_i in 0.until(chainLength -1 )) {
|
||||
node.services.startFlow(ContinueMessageChainFlow(result!!, notary)).resultFuture.getOrThrow()
|
||||
result = node.services.vaultService.queryBy(MessageChainState::class.java).states.filter {
|
||||
it.state.data.message.value.startsWith(initialMessage.value)
|
||||
}.singleOrNull()
|
||||
}
|
||||
}
|
||||
|
||||
fun sendTransactionToObserver(transactionIdx: Int) {
|
||||
val transactionList = node.services.validatedTransactions.track().snapshot
|
||||
node.services.startFlow(ReportToCounterparty(regulator.info.singleIdentity(), transactionList[transactionIdx])).resultFuture.getOrThrow()
|
||||
}
|
||||
|
||||
fun checkObserverTransactions(expectedMessage: MessageData) {
|
||||
val regulatorStates = regulator.services.vaultService.queryBy(MessageChainState::class.java).states.filter {
|
||||
it.state.data.message.value.startsWith(expectedMessage.value[0])
|
||||
}
|
||||
|
||||
assertNotNull(regulatorStates, "Could not find any regulator states")
|
||||
assertEquals(1, regulatorStates.size, "Incorrect number of unconsumed regulator states")
|
||||
val retrievedMessage = regulatorStates.singleOrNull()!!.state.data.message
|
||||
assertEquals(expectedMessage, retrievedMessage, "Final unconsumed regulator state is incorrect")
|
||||
}
|
||||
|
||||
// Check that sending an old transaction doesn't result in a new unconsumed state
|
||||
val message = MessageData("A")
|
||||
buildTransactionChain(message, 4)
|
||||
sendTransactionToObserver(3)
|
||||
sendTransactionToObserver(1)
|
||||
val outputMessage = MessageData("AAAA")
|
||||
checkObserverTransactions(outputMessage)
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class StartMessageChainFlow(private val message: MessageData, private val notary: Party) : FlowLogic<SignedTransaction>() {
|
||||
companion object {
|
||||
object GENERATING_TRANSACTION : ProgressTracker.Step("Generating transaction based on the message.")
|
||||
object VERIFYING_TRANSACTION : ProgressTracker.Step("Verifying contract constraints.")
|
||||
object SIGNING_TRANSACTION : ProgressTracker.Step("Signing transaction with our private key.")
|
||||
object FINALISING_TRANSACTION : ProgressTracker.Step("Obtaining notary signature and recording transaction.") {
|
||||
override fun childProgressTracker() = FinalityFlow.tracker()
|
||||
}
|
||||
|
||||
fun tracker() = ProgressTracker(GENERATING_TRANSACTION, VERIFYING_TRANSACTION, SIGNING_TRANSACTION, FINALISING_TRANSACTION)
|
||||
}
|
||||
|
||||
override val progressTracker = tracker()
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
progressTracker.currentStep = GENERATING_TRANSACTION
|
||||
|
||||
val messageState = MessageChainState(message = message, by = ourIdentity)
|
||||
val txCommand = Command(MessageChainContract.Commands.Send(), messageState.participants.map { it.owningKey })
|
||||
val txBuilder = TransactionBuilder(notary).withItems(StateAndContract(messageState, MESSAGE_CHAIN_CONTRACT_PROGRAM_ID), txCommand)
|
||||
|
||||
progressTracker.currentStep = VERIFYING_TRANSACTION
|
||||
txBuilder.toWireTransaction(serviceHub).toLedgerTransaction(serviceHub).verify()
|
||||
|
||||
progressTracker.currentStep = SIGNING_TRANSACTION
|
||||
val signedTx = serviceHub.signInitialTransaction(txBuilder)
|
||||
|
||||
progressTracker.currentStep = FINALISING_TRANSACTION
|
||||
return subFlow(FinalityFlow(signedTx, emptyList(), FINALISING_TRANSACTION.childProgressTracker()))
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class ContinueMessageChainFlow(private val stateRef: StateAndRef<MessageChainState>,
|
||||
private val notary: Party) : FlowLogic<SignedTransaction>() {
|
||||
companion object {
|
||||
object GENERATING_TRANSACTION : ProgressTracker.Step("Generating transaction based on the message.")
|
||||
object VERIFYING_TRANSACTION : ProgressTracker.Step("Verifying contract constraints.")
|
||||
object SIGNING_TRANSACTION : ProgressTracker.Step("Signing transaction with our private key.")
|
||||
object FINALISING_TRANSACTION : ProgressTracker.Step("Obtaining notary signature and recording transaction.") {
|
||||
override fun childProgressTracker() = FinalityFlow.tracker()
|
||||
}
|
||||
|
||||
fun tracker() = ProgressTracker(GENERATING_TRANSACTION, VERIFYING_TRANSACTION, SIGNING_TRANSACTION, FINALISING_TRANSACTION)
|
||||
}
|
||||
|
||||
override val progressTracker = tracker()
|
||||
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
progressTracker.currentStep = GENERATING_TRANSACTION
|
||||
|
||||
val oldMessageState = stateRef.state.data
|
||||
val messageState = MessageChainState(MessageData(oldMessageState.message.value + "A"),
|
||||
ourIdentity,
|
||||
stateRef.state.data.linearId)
|
||||
val txCommand = Command(MessageChainContract.Commands.Send(), messageState.participants.map { it.owningKey })
|
||||
val txBuilder = TransactionBuilder(notary).withItems(
|
||||
StateAndContract(messageState, MESSAGE_CHAIN_CONTRACT_PROGRAM_ID),
|
||||
txCommand,
|
||||
stateRef)
|
||||
|
||||
progressTracker.currentStep = VERIFYING_TRANSACTION
|
||||
txBuilder.toWireTransaction(serviceHub).toLedgerTransaction(serviceHub).verify()
|
||||
|
||||
progressTracker.currentStep = SIGNING_TRANSACTION
|
||||
val signedTx = serviceHub.signInitialTransaction(txBuilder)
|
||||
|
||||
progressTracker.currentStep = FINALISING_TRANSACTION
|
||||
return subFlow(FinalityFlow(signedTx, emptyList(), FINALISING_TRANSACTION.childProgressTracker()))
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
@StartableByRPC
|
||||
class ReportToCounterparty(
|
||||
private val regulator: Party,
|
||||
private val signedTx: SignedTransaction) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val session = initiateFlow(regulator)
|
||||
subFlow(SendTransactionFlow(session, signedTx))
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(ReportToCounterparty::class)
|
||||
class ReceiveReportedTransaction(private val otherSideSession: FlowSession) : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
subFlow(ReceiveTransactionFlow(otherSideSession, true, StatesToRecord.ALL_VISIBLE))
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user