mirror of
https://github.com/corda/corda.git
synced 2025-04-07 11:27:01 +00:00
Merge branch 'master' of https://bitbucket.org/R3-CEV/r3prototyping
This commit is contained in:
commit
0bcecac7f6
@ -1,12 +0,0 @@
|
||||
package com.r3corda.contracts.tradefinance
|
||||
|
||||
import java.security.PublicKey
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* A notice which can be attached to a receivable.
|
||||
*/
|
||||
sealed class Notice(val id: UUID, val owner: PublicKey) {
|
||||
class OwnershipInterest(id: UUID, owner: PublicKey) : Notice(id, owner)
|
||||
class Objection(id: UUID, owner: PublicKey) : Notice(id, owner)
|
||||
}
|
@ -1,305 +0,0 @@
|
||||
package com.r3corda.contracts.tradefinance
|
||||
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.contracts.clauses.*
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.isOrderedAndUnique
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.serialization.OpaqueBytes
|
||||
import com.r3corda.core.utilities.NonEmptySet
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
import java.time.LocalDate
|
||||
import java.time.ZonedDateTime
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* Contract for managing lifecycle of a receivable which is recorded on the distributed ledger. These are entered by
|
||||
* a third party (typically a potential creditor), and then shared by the trade finance registry, allowing others to
|
||||
* attach/detach notices of ownership interest/objection.
|
||||
*
|
||||
* States of this contract *are not* fungible, and as such special rules apply. States must be unique within the
|
||||
* inputs/outputs, and strictly ordered, in order to make it easy to verify that outputs match the inputs except where
|
||||
* commands mean there are changes.
|
||||
*/
|
||||
class Receivable : Contract {
|
||||
data class State(override val linearId: UniqueIdentifier = UniqueIdentifier(),
|
||||
val created: ZonedDateTime, // When the underlying receivable was raised
|
||||
val registered: Instant, // When the receivable was added to the registry
|
||||
val payer: Party,
|
||||
val payee: Party,
|
||||
val payerRef: OpaqueBytes?,
|
||||
val payeeRef: OpaqueBytes?,
|
||||
val value: Amount<Issued<Currency>>,
|
||||
val attachments: Set<SecureHash>,
|
||||
val notices: List<Notice>,
|
||||
override val owner: PublicKey) : OwnableState, LinearState {
|
||||
override val contract: Contract = Receivable()
|
||||
override val participants: List<PublicKey> = listOf(owner)
|
||||
override fun isRelevant(ourKeys: Set<PublicKey>): Boolean
|
||||
= ourKeys.contains(payer.owningKey) || ourKeys.contains(payee.owningKey) || ourKeys.contains(owner)
|
||||
override fun withNewOwner(newOwner: PublicKey): Pair<CommandData, OwnableState>
|
||||
= Pair(Commands.Move(null, mapOf(Pair(linearId, newOwner))), copy(owner = newOwner))
|
||||
}
|
||||
|
||||
interface Commands : CommandData {
|
||||
val changed: Iterable<UniqueIdentifier>
|
||||
data class Issue(override val changed: NonEmptySet<UniqueIdentifier>,
|
||||
override val nonce: Long = random63BitValue()) : IssueCommand, Commands
|
||||
data class Move(override val contractHash: SecureHash?, val changes: Map<UniqueIdentifier, PublicKey>) : MoveCommand, Commands {
|
||||
override val changed: Iterable<UniqueIdentifier> = changes.keys
|
||||
}
|
||||
data class Note(val changes: Map<UniqueIdentifier, Diff<Notice>>) : Commands {
|
||||
override val changed: Iterable<UniqueIdentifier> = changes.keys
|
||||
}
|
||||
// TODO: Write Amend clause, possibly to merge into Move
|
||||
/* data class Amend(val id: UniqueIdentifier,
|
||||
val payer: Party,
|
||||
val payee: Party,
|
||||
val payerRef: OpaqueBytes?,
|
||||
val payeeRef: OpaqueBytes?,
|
||||
val value: Amount<Issued<Currency>>,
|
||||
val attachments: Set<SecureHash>) : Commands */
|
||||
data class Exit(override val changed: NonEmptySet<UniqueIdentifier>) : Commands
|
||||
}
|
||||
|
||||
data class Diff<T : Any>(val added: List<T>, val removed: List<T>)
|
||||
|
||||
interface Clauses {
|
||||
/**
|
||||
* Assert that each input/output state is unique within that list of states, and that states are ordered. There
|
||||
* should never be the same receivable twice in a transaction. Uniqueness is also enforced by the notary,
|
||||
* but we get the check as a side-effect of comparing states, so the duplication is acceptable.
|
||||
*/
|
||||
class StatesAreOrderedAndUnique : Clause<State, Commands, Unit>() {
|
||||
override fun verify(tx: TransactionForContract,
|
||||
inputs: List<State>,
|
||||
outputs: List<State>,
|
||||
commands: List<AuthenticatedObject<Commands>>,
|
||||
groupingKey: Unit?): Set<Commands> {
|
||||
// Enforce that states are ordered, so that the transaction can only be assembled in one way
|
||||
requireThat {
|
||||
"input receivables are ordered and unique" by inputs.isOrderedAndUnique { linearId }
|
||||
"output receivables are ordered and unique" by outputs.isOrderedAndUnique { linearId }
|
||||
}
|
||||
return emptySet()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that all inputs are present as outputs, and that all owners for new outputs have signed the command.
|
||||
*/
|
||||
class Issue : Clause<State, Commands, Unit>() {
|
||||
override val requiredCommands: Set<Class<out CommandData>> = setOf(Commands.Issue::class.java)
|
||||
|
||||
override fun verify(tx: TransactionForContract,
|
||||
inputs: List<State>,
|
||||
outputs: List<State>,
|
||||
commands: List<AuthenticatedObject<Commands>>,
|
||||
groupingKey: Unit?): Set<Commands> {
|
||||
require(groupingKey == null)
|
||||
// TODO: Take in matched commands as a parameter
|
||||
val command = commands.requireSingleCommand<Commands.Issue>()
|
||||
val timestamp = tx.timestamp
|
||||
|
||||
// Records for receivables are never fungible, so we just want to make sure all inputs exist as
|
||||
// outputs, and there are new outputs.
|
||||
requireThat {
|
||||
"there are more output states than input states" by (outputs.size > inputs.size)
|
||||
// TODO: Should timestamps perhaps be enforced on all receivable transactions?
|
||||
"the transaction has a timestamp" by (timestamp != null)
|
||||
}
|
||||
|
||||
val expectedOutputs = ArrayList(inputs)
|
||||
val keysThatSigned = command.signers
|
||||
val owningPubKeys = HashSet<PublicKey>()
|
||||
outputs
|
||||
.filter { it.linearId in command.value.changed }
|
||||
.forEach { state ->
|
||||
val registrationInLocalZone = state.registered.atZone(state.created.zone)
|
||||
requireThat {
|
||||
"the receivable is registered after it was created" by (state.created < registrationInLocalZone)
|
||||
// TODO: Should narrow the window on how long ago the registration can be compared to the transaction
|
||||
"the receivable is registered before the transaction date" by (state.registered < timestamp?.before)
|
||||
}
|
||||
owningPubKeys.add(state.owner)
|
||||
expectedOutputs.add(state)
|
||||
}
|
||||
// Re-sort the outputs now we've finished changing them
|
||||
expectedOutputs.sortBy { state -> state.linearId }
|
||||
requireThat {
|
||||
"the owning keys are the same as the signing keys" by keysThatSigned.containsAll(owningPubKeys)
|
||||
"outputs match inputs with expected changes applied" by outputs.equals(expectedOutputs)
|
||||
}
|
||||
|
||||
return setOf(command.value as Commands)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that inputs and outputs are exactly the same, except for ownership changes specified in the command.
|
||||
* The command must be signed by the previous owners of all changed input states.
|
||||
*/
|
||||
class Move : Clause<State, Commands, Unit>() {
|
||||
override val requiredCommands: Set<Class<out CommandData>> = setOf(Commands.Move::class.java)
|
||||
|
||||
override fun verify(tx: TransactionForContract,
|
||||
inputs: List<State>,
|
||||
outputs: List<State>,
|
||||
commands: List<AuthenticatedObject<Commands>>,
|
||||
groupingKey: Unit?): Set<Commands> {
|
||||
require(groupingKey == null)
|
||||
// TODO: Take in matched commands as a parameter
|
||||
val moveCommand = commands.requireSingleCommand<Commands.Move>()
|
||||
val changes = moveCommand.value.changes
|
||||
// Rebuild the outputs we expect, then compare. Receivables are not fungible, so inputs and outputs
|
||||
// must match one to one
|
||||
val expectedOutputs: List<State> = inputs.map { input ->
|
||||
val newOwner = changes[input.linearId]
|
||||
if (newOwner != null) {
|
||||
input.copy(owner = newOwner)
|
||||
} else {
|
||||
input
|
||||
}
|
||||
}
|
||||
requireThat {
|
||||
"inputs are not empty" by inputs.isNotEmpty()
|
||||
"outputs match inputs with expected changes applied" by outputs.equals(expectedOutputs)
|
||||
}
|
||||
// Do standard move command checks including the signature checks
|
||||
verifyMoveCommand<Commands.Move>(inputs, commands)
|
||||
return setOf(moveCommand.value as Commands)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add and/or remove notices on receivables. All input states must match output states, except for the
|
||||
* changed notices.
|
||||
*/
|
||||
class Note : Clause<State, Commands, Unit>() {
|
||||
override val requiredCommands: Set<Class<out CommandData>> = setOf(Commands.Note::class.java)
|
||||
|
||||
override fun verify(tx: TransactionForContract,
|
||||
inputs: List<State>,
|
||||
outputs: List<State>,
|
||||
commands: List<AuthenticatedObject<Commands>>,
|
||||
groupingKey: Unit?): Set<Commands> {
|
||||
require(groupingKey == null)
|
||||
// TODO: Take in matched commands as a parameter
|
||||
val command = commands.requireSingleCommand<Commands.Note>()
|
||||
// Rebuild the outputs we expect, then compare. Receivables are not fungible, so inputs and outputs
|
||||
// must match one to one
|
||||
val (expectedOutputs, owningPubKeys) = deriveOutputStates(inputs, command)
|
||||
val keysThatSigned = command.signers
|
||||
requireThat {
|
||||
"inputs are not empty" by inputs.isNotEmpty()
|
||||
"outputs match inputs with expected changes applied" by outputs.equals(expectedOutputs)
|
||||
"the owning keys are the same as the signing keys" by keysThatSigned.containsAll(owningPubKeys)
|
||||
}
|
||||
return setOf(command.value as Commands)
|
||||
}
|
||||
|
||||
fun deriveOutputStates(inputs: List<State>,
|
||||
command: AuthenticatedObject<Commands.Note>): Pair<List<State>, Set<PublicKey>> {
|
||||
val changes = command.value.changes
|
||||
val seenNotices = HashSet<Notice>()
|
||||
val outputs = inputs.map { input ->
|
||||
val stateChanges = changes[input.linearId]
|
||||
if (stateChanges != null) {
|
||||
val notices = ArrayList<Notice>(input.notices)
|
||||
stateChanges.added.forEach { notice ->
|
||||
require(!seenNotices.contains(notice)) { "Notices can only appear once in the add and/or remove lists" }
|
||||
require(!notices.contains(notice)) { "Notice is already present on the receivable" }
|
||||
seenNotices.add(notice)
|
||||
notices.add(notice)
|
||||
}
|
||||
stateChanges.removed.forEach { notice ->
|
||||
require(!seenNotices.contains(notice)) { "Notices can only appear once in the add and/or remove lists" }
|
||||
require(notices.remove(notice)) { "Notice is not present on the receivable" }
|
||||
seenNotices.add(notice)
|
||||
}
|
||||
input.copy(notices = notices)
|
||||
} else {
|
||||
input
|
||||
}
|
||||
}
|
||||
return Pair(outputs, seenNotices.map { it.owner }.toSet() )
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a receivable from the ledger. This can only be done once all notices have been removed.
|
||||
*/
|
||||
class Exit : Clause<State, Commands, Unit>() {
|
||||
override val requiredCommands: Set<Class<out CommandData>> = setOf(Commands.Exit::class.java)
|
||||
|
||||
override fun verify(tx: TransactionForContract,
|
||||
inputs: List<State>,
|
||||
outputs: List<State>,
|
||||
commands: List<AuthenticatedObject<Commands>>,
|
||||
groupingKey: Unit?): Set<Commands> {
|
||||
require(groupingKey == null)
|
||||
// TODO: Take in matched commands as a parameter
|
||||
val command = commands.requireSingleCommand<Commands.Exit>()
|
||||
val unmatchedIds = HashSet<UniqueIdentifier>(command.value.changed)
|
||||
val owningPubKeys = HashSet<PublicKey>()
|
||||
val expectedOutputs = inputs.filter { input ->
|
||||
if (unmatchedIds.contains(input.linearId)) {
|
||||
requireThat {
|
||||
"there are no notices on receivables to be removed from the ledger" by input.notices.isEmpty()
|
||||
}
|
||||
unmatchedIds.remove(input.linearId)
|
||||
owningPubKeys.add(input.owner)
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
val keysThatSigned = command.signers
|
||||
requireThat {
|
||||
"inputs are not empty" by inputs.isNotEmpty()
|
||||
"outputs match inputs with expected changes applied" by outputs.equals(expectedOutputs)
|
||||
"the owning keys are the same as the signing keys" by keysThatSigned.containsAll(owningPubKeys)
|
||||
}
|
||||
return setOf(command.value as Commands)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Amend clause, which replaces the Move clause
|
||||
|
||||
/**
|
||||
* Default clause, which checks the inputs and outputs match. Normally this wouldn't be expected to trigger,
|
||||
* as other commands would handle the transaction, but this exists in case the states need to be witnessed by
|
||||
* other contracts within the transaction but not modified.
|
||||
*/
|
||||
class InputsAndOutputsMatch : Clause<State, Commands, Unit>() {
|
||||
override fun verify(tx: TransactionForContract,
|
||||
inputs: List<State>,
|
||||
outputs: List<State>,
|
||||
commands: List<AuthenticatedObject<Commands>>,
|
||||
groupingKey: Unit?): Set<Commands> {
|
||||
require(groupingKey == null)
|
||||
require(inputs.equals(outputs)) { "Inputs and outputs must match unless commands indicate otherwise" }
|
||||
return emptySet()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override val legalContractReference: SecureHash = SecureHash.sha256("https://www.big-book-of-banking-law.gov/receivables.html")
|
||||
fun extractCommands(commands: Collection<AuthenticatedObject<CommandData>>): List<AuthenticatedObject<Commands>>
|
||||
= commands.select<Commands>()
|
||||
override fun verify(tx: TransactionForContract)
|
||||
= verifyClause(tx, FilterOn<State, Commands, Unit>(
|
||||
AllComposition(
|
||||
Clauses.StatesAreOrderedAndUnique(), // TODO: This is varient of the LinearState.ClauseVerifier, and we should move it up there
|
||||
FirstComposition(
|
||||
Clauses.Issue(),
|
||||
Clauses.Exit(),
|
||||
Clauses.Note(),
|
||||
Clauses.Move(),
|
||||
Clauses.InputsAndOutputsMatch()
|
||||
)
|
||||
), { states -> states.filterIsInstance<State>() }),
|
||||
extractCommands(tx.commands))
|
||||
}
|
@ -1,153 +0,0 @@
|
||||
package com.r3corda.contracts.tradefinance
|
||||
|
||||
import com.r3corda.contracts.asset.DUMMY_CASH_ISSUER
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.serialization.OpaqueBytes
|
||||
import com.r3corda.core.utilities.NonEmptySet
|
||||
import com.r3corda.core.utilities.TEST_TX_TIME
|
||||
import com.r3corda.testing.*
|
||||
import org.junit.Test
|
||||
import java.time.Duration
|
||||
import java.time.ZoneId
|
||||
import java.util.*
|
||||
|
||||
class ReceivableTests {
|
||||
val inStates = arrayOf(
|
||||
Receivable.State(
|
||||
UniqueIdentifier.fromString("9e688c58-a548-3b8e-af69-c9e1005ad0bf"),
|
||||
(TEST_TX_TIME - Duration.ofDays(2)).atZone(ZoneId.of("UTC")),
|
||||
TEST_TX_TIME - Duration.ofDays(1),
|
||||
ALICE,
|
||||
BOB,
|
||||
OpaqueBytes(ByteArray(1, { 1 })),
|
||||
OpaqueBytes(ByteArray(1, { 2 })),
|
||||
Amount<Issued<Currency>>(1000L, USD `issued by` DUMMY_CASH_ISSUER),
|
||||
emptySet(),
|
||||
emptyList(),
|
||||
MEGA_CORP_PUBKEY
|
||||
),
|
||||
Receivable.State(
|
||||
UniqueIdentifier.fromString("55a54008-ad1b-3589-aa21-0d2629c1df41"),
|
||||
(TEST_TX_TIME - Duration.ofDays(2)).atZone(ZoneId.of("UTC")),
|
||||
TEST_TX_TIME - Duration.ofDays(1),
|
||||
ALICE,
|
||||
BOB,
|
||||
OpaqueBytes(ByteArray(1, { 3 })),
|
||||
OpaqueBytes(ByteArray(1, { 4 })),
|
||||
Amount<Issued<Currency>>(2000L, GBP `issued by` DUMMY_CASH_ISSUER),
|
||||
emptySet(),
|
||||
emptyList(),
|
||||
MEGA_CORP_PUBKEY
|
||||
)
|
||||
)
|
||||
|
||||
@Test
|
||||
fun trivial() {
|
||||
transaction {
|
||||
input { inStates[0] }
|
||||
timestamp(TEST_TX_TIME)
|
||||
this `fails with` "Inputs and outputs must match unless commands indicate otherwise"
|
||||
|
||||
tweak {
|
||||
output { inStates[0] }
|
||||
verifies()
|
||||
}
|
||||
|
||||
tweak {
|
||||
output { inStates[1] }
|
||||
this `fails with` "Inputs and outputs must match unless commands indicate otherwise"
|
||||
}
|
||||
}
|
||||
|
||||
transaction {
|
||||
output { inStates[0] }
|
||||
timestamp(TEST_TX_TIME)
|
||||
this `fails with` "Inputs and outputs must match unless commands indicate otherwise"
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `order and uniqueness is enforced`() {
|
||||
transaction {
|
||||
input { inStates[0] }
|
||||
input { inStates[1] }
|
||||
output { inStates[0] }
|
||||
output { inStates[1] }
|
||||
timestamp(TEST_TX_TIME)
|
||||
verifies()
|
||||
}
|
||||
|
||||
transaction {
|
||||
input { inStates[1] }
|
||||
input { inStates[0] }
|
||||
output { inStates[0] }
|
||||
output { inStates[1] }
|
||||
timestamp(TEST_TX_TIME)
|
||||
this `fails with` "receivables are ordered and unique"
|
||||
}
|
||||
|
||||
transaction {
|
||||
input { inStates[0] }
|
||||
input { inStates[0] }
|
||||
output { inStates[0] }
|
||||
output { inStates[0] }
|
||||
timestamp(TEST_TX_TIME)
|
||||
this `fails with` "receivables are ordered and unique"
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `issue`() {
|
||||
// Testing that arbitrary new outputs are rejected is covered in trivial()
|
||||
transaction {
|
||||
output { inStates[0] }
|
||||
command(MEGA_CORP_PUBKEY, Receivable.Commands.Issue(NonEmptySet(inStates[0].linearId)))
|
||||
timestamp(TEST_TX_TIME)
|
||||
verifies()
|
||||
}
|
||||
transaction {
|
||||
output { inStates[0] }
|
||||
command(ALICE_PUBKEY, Receivable.Commands.Issue(NonEmptySet(inStates[0].linearId)))
|
||||
timestamp(TEST_TX_TIME)
|
||||
this `fails with` "the owning keys are the same as the signing keys"
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `move`() {
|
||||
transaction {
|
||||
input { inStates[0] }
|
||||
output { inStates[0].copy(owner = MINI_CORP_PUBKEY) }
|
||||
timestamp(TEST_TX_TIME)
|
||||
this `fails with` "Inputs and outputs must match unless commands indicate otherwise"
|
||||
tweak {
|
||||
command(MEGA_CORP_PUBKEY, Receivable.Commands.Move(null, mapOf(Pair(inStates[0].linearId, MINI_CORP_PUBKEY))))
|
||||
verifies()
|
||||
}
|
||||
// Test that moves enforce the correct new owner
|
||||
tweak {
|
||||
command(MEGA_CORP_PUBKEY, Receivable.Commands.Move(null, mapOf(Pair(inStates[0].linearId, ALICE_PUBKEY))))
|
||||
this `fails with` "outputs match inputs with expected changes applied"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `exit`() {
|
||||
// Testing that arbitrary disappearing outputs are rejected is covered in trivial()
|
||||
transaction {
|
||||
input { inStates[0] }
|
||||
timestamp(TEST_TX_TIME)
|
||||
command(MEGA_CORP_PUBKEY, Receivable.Commands.Exit(NonEmptySet(inStates[0].linearId)))
|
||||
verifies()
|
||||
}
|
||||
transaction {
|
||||
input { inStates[0] }
|
||||
timestamp(TEST_TX_TIME)
|
||||
command(ALICE_PUBKEY, Receivable.Commands.Exit(NonEmptySet(inStates[0].linearId)))
|
||||
this `fails with` "the owning keys are the same as the signing keys"
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Test adding and removing notices
|
||||
}
|
@ -1,35 +1,30 @@
|
||||
package com.r3corda.protocols
|
||||
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.messaging.MessageRecipients
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.core.messaging.onNext
|
||||
import com.r3corda.core.messaging.send
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import java.util.concurrent.Executor
|
||||
|
||||
/**
|
||||
* Abstract superclass for request messages sent to services, which includes common
|
||||
* fields such as replyTo and sessionID.
|
||||
* Abstract superclass for request messages sent to services which expect a reply.
|
||||
*/
|
||||
interface ServiceRequestMessage {
|
||||
val sessionID: Long
|
||||
fun getReplyTo(networkMapCache: NetworkMapCache): MessageRecipients
|
||||
val replyTo: SingleMessageRecipient
|
||||
}
|
||||
|
||||
/**
|
||||
* A message which specifies reply destination as a specific endpoint such as a monitoring client. This is of particular
|
||||
* use where we want to address a specific endpoint, not necessarily a specific user (for example if the same user logs
|
||||
* in on two machines, we want to consistently deliver messages as part of a session, to the same machine the session
|
||||
* started on).
|
||||
* Sends a [ServiceRequestMessage] to [target] and returns a [ListenableFuture] of the response.
|
||||
* @param R The type of the response.
|
||||
*/
|
||||
interface DirectRequestMessage: ServiceRequestMessage {
|
||||
val replyToRecipient: SingleMessageRecipient
|
||||
override fun getReplyTo(networkMapCache: NetworkMapCache): MessageRecipients = replyToRecipient
|
||||
}
|
||||
|
||||
interface PartyRequestMessage : ServiceRequestMessage {
|
||||
|
||||
val replyToParty: Party
|
||||
|
||||
override fun getReplyTo(networkMapCache: NetworkMapCache): MessageRecipients {
|
||||
return networkMapCache.partyNodes.single { it.identity == replyToParty }.address
|
||||
}
|
||||
fun <R : Any> MessagingService.sendRequest(topic: String,
|
||||
request: ServiceRequestMessage,
|
||||
target: SingleMessageRecipient,
|
||||
executor: Executor? = null): ListenableFuture<R> {
|
||||
val responseFuture = onNext<R>(topic, request.sessionID, executor)
|
||||
send(topic, DEFAULT_SESSION_ID, request, target)
|
||||
return responseFuture
|
||||
}
|
@ -75,6 +75,18 @@ To enable remote debugging of the corda process use a command line such as:
|
||||
|
||||
This command line will start the debugger on port 5005 and pause the process awaiting debugger attachment.
|
||||
|
||||
Viewing persisted state of your Node
|
||||
------------------------------------
|
||||
|
||||
To make examining the persisted contract states of your node or the internal node database tables easier, and providing you are
|
||||
using the default database configuration used for demos, you should be able to connect to the internal node database over
|
||||
a JDBC connection at the URL that is output to the logs at node start up. That URL will be of the form ``jdbc:h2:tcp://<host>:<port>/node``.
|
||||
|
||||
The user name and password for the login are as per the node data source configuration.
|
||||
|
||||
The name and column layout of the internal node tables is in a state of flux and should not be relied upon to remain static
|
||||
at the present time, and should certainly be treated as read-only.
|
||||
|
||||
.. _CordaPluginRegistry: api/com.r3corda.core.node/-corda-plugin-registry/index.html
|
||||
.. _ServiceHubInternal: api/com.r3corda.node.services.api/-service-hub-internal/index.html
|
||||
.. _ServiceHub: api/com.r3corda.node.services.api/-service-hub/index.html
|
||||
|
@ -11,7 +11,9 @@ as annotations and is converted to database table rows by the node automatically
|
||||
node's local vault as part of a transaction.
|
||||
|
||||
.. note:: Presently the node includes an instance of the H2 database but any database that supports JDBC is a candidate and
|
||||
the node will in the future support a range of database implementations via their JDBC drivers.
|
||||
the node will in the future support a range of database implementations via their JDBC drivers. Much of the node
|
||||
internal state is also persisted there. If a node is using the default H2 JDBC configuration you should be able to connect
|
||||
to the H2 instance using the JDBC URL output to the logs at startup of the form ``jdbc:h2:tcp://<host>:<port>/node``
|
||||
|
||||
Schemas
|
||||
-------
|
||||
|
@ -8,8 +8,6 @@ import com.r3corda.core.RunOnCallerThread
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.X509Utilities
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.messaging.createMessage
|
||||
import com.r3corda.core.messaging.onNext
|
||||
import com.r3corda.core.node.CityDatabase
|
||||
import com.r3corda.core.node.CordaPluginRegistry
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
@ -18,7 +16,6 @@ import com.r3corda.core.node.services.*
|
||||
import com.r3corda.core.node.services.NetworkMapCache.MapChangeType
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.protocols.ProtocolLogicRefFactory
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.seconds
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
@ -49,6 +46,7 @@ import com.r3corda.node.services.transactions.ValidatingNotaryService
|
||||
import com.r3corda.node.services.vault.CashBalanceAsMetricsObserver
|
||||
import com.r3corda.node.services.vault.NodeVaultService
|
||||
import com.r3corda.node.utilities.*
|
||||
import com.r3corda.protocols.sendRequest
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.slf4j.Logger
|
||||
import java.nio.file.FileAlreadyExistsException
|
||||
@ -366,12 +364,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
|
||||
val instant = platformClock.instant()
|
||||
val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
|
||||
val reg = NodeRegistration(info, instant.toEpochMilli(), type, expires)
|
||||
val sessionID = random63BitValue()
|
||||
val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress, sessionID)
|
||||
val message = net.createMessage(REGISTER_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, request.serialize().bits)
|
||||
val future = net.onNext<RegistrationResponse>(REGISTER_PROTOCOL_TOPIC, sessionID, RunOnCallerThread)
|
||||
net.send(message, networkMapAddr)
|
||||
return future
|
||||
val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress)
|
||||
return net.sendRequest(REGISTER_PROTOCOL_TOPIC, request, networkMapAddr, RunOnCallerThread)
|
||||
}
|
||||
|
||||
protected open fun makeKeyManagementService(): KeyManagementService = PersistentKeyManagementService(setOf(storage.myLegalIdentityKey))
|
||||
|
@ -235,6 +235,35 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
|
||||
|
||||
override fun makeUniquenessProvider() = PersistentUniquenessProvider()
|
||||
|
||||
/**
|
||||
* If the node is persisting to an embedded H2 database, then expose this via TCP with a JDBC URL of the form:
|
||||
* jdbc:h2:tcp://<host>:<port>/node
|
||||
* with username and password as per the DataSource connection details. The key element to enabling this support is to
|
||||
* ensure that you specify a JDBC connection URL of the form jdbc:h2:file: in the node config and that you include
|
||||
* the H2 option AUTO_SERVER_PORT set to the port you desire to use (0 will give a dynamically allocated port number)
|
||||
* but exclude the H2 option AUTO_SERVER=TRUE.
|
||||
* This is not using the H2 "automatic mixed mode" directly but leans on many of the underpinnings. For more details
|
||||
* on H2 URLs and configuration see: http://www.h2database.com/html/features.html#database_url
|
||||
*/
|
||||
override fun initialiseDatabasePersistence(insideTransaction: () -> Unit) {
|
||||
val databaseUrl = configuration.dataSourceProperties.getProperty("dataSource.url")
|
||||
val h2Prefix = "jdbc:h2:file:"
|
||||
if (databaseUrl != null && databaseUrl.startsWith(h2Prefix)) {
|
||||
val h2Port = databaseUrl.substringAfter(";AUTO_SERVER_PORT=", "").substringBefore(';')
|
||||
if (h2Port.isNotBlank()) {
|
||||
val databaseName = databaseUrl.removePrefix(h2Prefix).substringBefore(';')
|
||||
val server = org.h2.tools.Server.createTcpServer(
|
||||
"-tcpPort", h2Port,
|
||||
"-tcpAllowOthers",
|
||||
"-tcpDaemon",
|
||||
"-key", "node", databaseName)
|
||||
val url = server.start().url
|
||||
log.info("H2 JDBC url is jdbc:h2:$url/node")
|
||||
}
|
||||
}
|
||||
super.initialiseDatabasePersistence(insideTransaction)
|
||||
}
|
||||
|
||||
override fun start(): Node {
|
||||
alreadyRunningNodeCheck()
|
||||
super.start()
|
||||
|
@ -38,7 +38,7 @@ abstract class AbstractNodeService(val services: ServiceHubInternal) : Singleton
|
||||
// If the return type R is Unit, then do not send a response
|
||||
if (response.javaClass != Unit.javaClass) {
|
||||
val msg = net.createMessage(topic, request.sessionID, response.serialize().bits)
|
||||
net.send(msg, request.getReplyTo(services.networkMapCache))
|
||||
net.send(msg, request.replyTo)
|
||||
}
|
||||
} catch(e: Exception) {
|
||||
exceptionConsumer(message, e)
|
||||
|
@ -7,7 +7,9 @@ import com.r3corda.core.RunOnCallerThread
|
||||
import com.r3corda.core.contracts.Contract
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.map
|
||||
import com.r3corda.core.messaging.*
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.messaging.createMessage
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.core.node.services.NetworkCacheError
|
||||
@ -15,7 +17,6 @@ import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.core.node.services.NetworkMapCache.MapChange
|
||||
import com.r3corda.core.node.services.NetworkMapCache.MapChangeType
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.serialization.serialize
|
||||
@ -26,6 +27,7 @@ import com.r3corda.node.services.network.NetworkMapService.FetchMapResponse
|
||||
import com.r3corda.node.services.network.NetworkMapService.SubscribeResponse
|
||||
import com.r3corda.node.services.transactions.NotaryService
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.protocols.sendRequest
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.security.PublicKey
|
||||
@ -82,17 +84,12 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
|
||||
}
|
||||
|
||||
// Fetch the network map and register for updates at the same time
|
||||
val sessionID = random63BitValue()
|
||||
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress, sessionID)
|
||||
|
||||
// Add a message handler for the response, and prepare a future to put the data into.
|
||||
// Note that the message handler will run on the network thread (not this one).
|
||||
val future = net.onNext<FetchMapResponse>(FETCH_PROTOCOL_TOPIC, sessionID, RunOnCallerThread).map { resp ->
|
||||
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVer, net.myAddress)
|
||||
val future = net.sendRequest<FetchMapResponse>(FETCH_PROTOCOL_TOPIC, req, networkMapAddress, RunOnCallerThread).map { resp ->
|
||||
// We may not receive any nodes back, if the map hasn't changed since the version specified
|
||||
resp.nodes?.forEach { processRegistration(it) }
|
||||
Unit
|
||||
}
|
||||
net.send(FETCH_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, networkMapAddress)
|
||||
_registrationFuture.setFuture(future)
|
||||
|
||||
return future
|
||||
@ -119,19 +116,11 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
|
||||
*/
|
||||
override fun deregisterForUpdates(net: MessagingService, service: NodeInfo): ListenableFuture<Unit> {
|
||||
// Fetch the network map and register for updates at the same time
|
||||
val sessionID = random63BitValue()
|
||||
val req = NetworkMapService.SubscribeRequest(false, net.myAddress, sessionID)
|
||||
|
||||
// Add a message handler for the response, and prepare a future to put the data into.
|
||||
// Note that the message handler will run on the network thread (not this one).
|
||||
|
||||
val future = net.onNext<SubscribeResponse>(SUBSCRIPTION_PROTOCOL_TOPIC, sessionID, RunOnCallerThread).map {
|
||||
val req = NetworkMapService.SubscribeRequest(false, net.myAddress)
|
||||
val future = net.sendRequest<SubscribeResponse>(SUBSCRIPTION_PROTOCOL_TOPIC, req, service.address, RunOnCallerThread).map {
|
||||
if (it.confirmed) Unit else throw NetworkCacheError.DeregistrationFailed()
|
||||
}
|
||||
_registrationFuture.setFuture(future)
|
||||
|
||||
net.send(SUBSCRIPTION_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, service.address)
|
||||
|
||||
return future
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,7 @@ import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.serialization.SerializedBytes
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.serialization.serialize
|
||||
@ -32,7 +33,6 @@ import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
|
||||
/**
|
||||
* A network map contains lists of nodes on the network along with information about their identity keys, services
|
||||
* they provide and host names or IP addresses where they can be connected to. This information is cached locally within
|
||||
@ -67,18 +67,27 @@ interface NetworkMapService {
|
||||
|
||||
val nodes: List<NodeInfo>
|
||||
|
||||
abstract class NetworkMapRequestMessage(val replyTo: MessageRecipients) : ServiceRequestMessage {
|
||||
override fun getReplyTo(networkMapCache: NetworkMapCache): MessageRecipients = replyTo
|
||||
}
|
||||
|
||||
class FetchMapRequest(val subscribe: Boolean, val ifChangedSinceVersion: Int?, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo)
|
||||
class FetchMapRequest(val subscribe: Boolean,
|
||||
val ifChangedSinceVersion: Int?,
|
||||
override val replyTo: SingleMessageRecipient,
|
||||
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
|
||||
data class FetchMapResponse(val nodes: Collection<NodeRegistration>?, val version: Int)
|
||||
class QueryIdentityRequest(val identity: Party, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo)
|
||||
|
||||
class QueryIdentityRequest(val identity: Party,
|
||||
override val replyTo: SingleMessageRecipient,
|
||||
override val sessionID: Long) : ServiceRequestMessage
|
||||
data class QueryIdentityResponse(val node: NodeInfo?)
|
||||
class RegistrationRequest(val wireReg: WireNodeRegistration, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo)
|
||||
|
||||
class RegistrationRequest(val wireReg: WireNodeRegistration,
|
||||
override val replyTo: SingleMessageRecipient,
|
||||
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
|
||||
data class RegistrationResponse(val success: Boolean)
|
||||
class SubscribeRequest(val subscribe: Boolean, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo)
|
||||
|
||||
class SubscribeRequest(val subscribe: Boolean,
|
||||
override val replyTo: SingleMessageRecipient,
|
||||
override val sessionID: Long = random63BitValue()) : ServiceRequestMessage
|
||||
data class SubscribeResponse(val confirmed: Boolean)
|
||||
|
||||
data class Update(val wireReg: WireNodeRegistration, val mapVersion: Int, val replyTo: MessageRecipients)
|
||||
data class UpdateAcknowledge(val mapVersion: Int, val replyTo: MessageRecipients)
|
||||
}
|
||||
|
@ -6,10 +6,11 @@ keyStorePassword = "cordacadevpass"
|
||||
trustStorePassword = "trustpass"
|
||||
dataSourceProperties = {
|
||||
dataSourceClassName = org.h2.jdbcx.JdbcDataSource
|
||||
"dataSource.url" = "jdbc:h2:"${basedir}"/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;MVCC=true;MV_STORE=true;WRITE_DELAY=0"
|
||||
"dataSource.url" = "jdbc:h2:file:"${basedir}"/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;MVCC=true;MV_STORE=true;WRITE_DELAY=0;AUTO_SERVER_PORT="${h2port}
|
||||
"dataSource.user" = sa
|
||||
"dataSource.password" = ""
|
||||
}
|
||||
devMode = true
|
||||
certificateSigningService = "https://cordaci-netperm.corda.r3cev.com"
|
||||
useHTTPS = false
|
||||
useHTTPS = false
|
||||
h2port = 0
|
@ -2,13 +2,21 @@ package com.r3corda.node.services
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.r3corda.core.map
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.messaging.send
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.node.services.network.AbstractNetworkMapService
|
||||
import com.r3corda.node.services.network.InMemoryNetworkMapService
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.network.NetworkMapService.*
|
||||
import com.r3corda.node.services.network.NetworkMapService.Companion.FETCH_PROTOCOL_TOPIC
|
||||
import com.r3corda.node.services.network.NetworkMapService.Companion.PUSH_ACK_PROTOCOL_TOPIC
|
||||
import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC
|
||||
import com.r3corda.node.services.network.NetworkMapService.Companion.SUBSCRIPTION_PROTOCOL_TOPIC
|
||||
import com.r3corda.node.services.network.NodeRegistration
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.protocols.sendRequest
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import com.r3corda.testing.node.MockNetwork.MockNode
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.security.PrivateKey
|
||||
@ -24,8 +32,8 @@ import kotlin.test.assertTrue
|
||||
*/
|
||||
abstract class AbstractNetworkMapServiceTest {
|
||||
|
||||
protected fun success(mapServiceNode: MockNetwork.MockNode,
|
||||
registerNode: MockNetwork.MockNode,
|
||||
protected fun success(mapServiceNode: MockNode,
|
||||
registerNode: MockNode,
|
||||
service: () -> AbstractNetworkMapService,
|
||||
swizzle: () -> Unit) {
|
||||
// For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved.
|
||||
@ -41,14 +49,14 @@ abstract class AbstractNetworkMapServiceTest {
|
||||
val nodeKey = registerNode.storage.myLegalIdentityKey
|
||||
val addChange = NodeRegistration(registerNode.info, instant.toEpochMilli(), AddOrRemove.ADD, expires)
|
||||
val addWireChange = addChange.toWire(nodeKey.private)
|
||||
service().processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE))
|
||||
service().processRegistrationChangeRequest(RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE))
|
||||
swizzle()
|
||||
|
||||
assertEquals(1, service().nodes.count())
|
||||
assertEquals(registerNode.info, service().processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node)
|
||||
|
||||
// Re-registering should be a no-op
|
||||
service().processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE))
|
||||
service().processRegistrationChangeRequest(RegistrationRequest(addWireChange, mapServiceNode.info.address, Long.MIN_VALUE))
|
||||
swizzle()
|
||||
|
||||
assertEquals(1, service().nodes.count())
|
||||
@ -56,26 +64,26 @@ abstract class AbstractNetworkMapServiceTest {
|
||||
// Confirm that de-registering the node succeeds and drops it from the node lists
|
||||
val removeChange = NodeRegistration(registerNode.info, instant.toEpochMilli()+1, AddOrRemove.REMOVE, expires)
|
||||
val removeWireChange = removeChange.toWire(nodeKey.private)
|
||||
assert(service().processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
|
||||
assert(service().processRegistrationChangeRequest(RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
|
||||
swizzle()
|
||||
|
||||
assertNull(service().processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node)
|
||||
swizzle()
|
||||
|
||||
// Trying to de-register a node that doesn't exist should fail
|
||||
assert(!service().processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
|
||||
assert(!service().processRegistrationChangeRequest(RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
|
||||
}
|
||||
|
||||
protected fun `success with network`(network: MockNetwork,
|
||||
mapServiceNode: MockNetwork.MockNode,
|
||||
registerNode: MockNetwork.MockNode,
|
||||
mapServiceNode: MockNode,
|
||||
registerNode: MockNode,
|
||||
swizzle: () -> Unit) {
|
||||
// For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved.
|
||||
swizzle()
|
||||
|
||||
// Confirm all nodes have registered themselves
|
||||
network.runNetwork()
|
||||
var fetchPsm = fetchMap(registerNode, mapServiceNode, false)
|
||||
var fetchPsm = registerNode.fetchMap(mapServiceNode, false)
|
||||
network.runNetwork()
|
||||
assertEquals(2, fetchPsm.get()?.count())
|
||||
|
||||
@ -84,21 +92,21 @@ abstract class AbstractNetworkMapServiceTest {
|
||||
val instant = Instant.now()
|
||||
val expires = instant + NetworkMapService.DEFAULT_EXPIRATION_PERIOD
|
||||
val reg = NodeRegistration(registerNode.info, instant.toEpochMilli()+1, AddOrRemove.REMOVE, expires)
|
||||
val registerPsm = registration(registerNode, mapServiceNode, reg, nodeKey.private)
|
||||
val registerPsm = registerNode.registration(mapServiceNode, reg, nodeKey.private)
|
||||
network.runNetwork()
|
||||
assertTrue(registerPsm.get().success)
|
||||
|
||||
swizzle()
|
||||
|
||||
// Now only map service node should be registered
|
||||
fetchPsm = fetchMap(registerNode, mapServiceNode, false)
|
||||
fetchPsm = registerNode.fetchMap(mapServiceNode, false)
|
||||
network.runNetwork()
|
||||
assertEquals(mapServiceNode.info, fetchPsm.get()?.filter { it.type == AddOrRemove.ADD }?.map { it.node }?.single())
|
||||
}
|
||||
|
||||
protected fun `subscribe with network`(network: MockNetwork,
|
||||
mapServiceNode: MockNetwork.MockNode,
|
||||
registerNode: MockNetwork.MockNode,
|
||||
mapServiceNode: MockNode,
|
||||
registerNode: MockNode,
|
||||
service: () -> AbstractNetworkMapService,
|
||||
swizzle: () -> Unit) {
|
||||
// For persistent service, switch out the implementation for a newly instantiated one so we can check the state is preserved.
|
||||
@ -106,7 +114,7 @@ abstract class AbstractNetworkMapServiceTest {
|
||||
|
||||
// Test subscribing to updates
|
||||
network.runNetwork()
|
||||
val subscribePsm = subscribe(registerNode, mapServiceNode, true)
|
||||
val subscribePsm = registerNode.subscribe(mapServiceNode, true)
|
||||
network.runNetwork()
|
||||
subscribePsm.get()
|
||||
|
||||
@ -131,7 +139,7 @@ abstract class AbstractNetworkMapServiceTest {
|
||||
assertEquals(1, service().getUnacknowledgedCount(registerNode.info.address, startingMapVersion + 1))
|
||||
|
||||
// Send in an acknowledgment and verify the count goes down
|
||||
updateAcknowlege(registerNode, mapServiceNode, startingMapVersion + 1)
|
||||
registerNode.updateAcknowlege(mapServiceNode, startingMapVersion + 1)
|
||||
network.runNetwork()
|
||||
|
||||
swizzle()
|
||||
@ -155,24 +163,25 @@ abstract class AbstractNetworkMapServiceTest {
|
||||
}
|
||||
}
|
||||
|
||||
private fun registration(registerNode: MockNetwork.MockNode, mapServiceNode: MockNetwork.MockNode, reg: NodeRegistration, privateKey: PrivateKey): ListenableFuture<NetworkMapService.RegistrationResponse> {
|
||||
val req = NetworkMapService.RegistrationRequest(reg.toWire(privateKey), registerNode.services.networkService.myAddress, random63BitValue())
|
||||
return registerNode.sendAndReceive<NetworkMapService.RegistrationResponse>(NetworkMapService.REGISTER_PROTOCOL_TOPIC, mapServiceNode, req)
|
||||
private fun MockNode.registration(mapServiceNode: MockNode, reg: NodeRegistration, privateKey: PrivateKey): ListenableFuture<RegistrationResponse> {
|
||||
val req = RegistrationRequest(reg.toWire(privateKey), services.networkService.myAddress)
|
||||
return services.networkService.sendRequest(REGISTER_PROTOCOL_TOPIC, req, mapServiceNode.info.address)
|
||||
}
|
||||
|
||||
private fun subscribe(registerNode: MockNetwork.MockNode, mapServiceNode: MockNetwork.MockNode, subscribe: Boolean): ListenableFuture<NetworkMapService.SubscribeResponse> {
|
||||
val req = NetworkMapService.SubscribeRequest(subscribe, registerNode.services.networkService.myAddress, random63BitValue())
|
||||
return registerNode.sendAndReceive<NetworkMapService.SubscribeResponse>(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, mapServiceNode, req)
|
||||
private fun MockNode.subscribe(mapServiceNode: MockNode, subscribe: Boolean): ListenableFuture<SubscribeResponse> {
|
||||
val req = SubscribeRequest(subscribe, services.networkService.myAddress)
|
||||
return services.networkService.sendRequest(SUBSCRIPTION_PROTOCOL_TOPIC, req, mapServiceNode.info.address)
|
||||
}
|
||||
|
||||
private fun updateAcknowlege(registerNode: MockNetwork.MockNode, mapServiceNode: MockNetwork.MockNode, mapVersion: Int) {
|
||||
val req = NetworkMapService.UpdateAcknowledge(mapVersion, registerNode.services.networkService.myAddress)
|
||||
registerNode.send(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, mapServiceNode, req)
|
||||
private fun MockNode.updateAcknowlege(mapServiceNode: MockNode, mapVersion: Int) {
|
||||
val req = UpdateAcknowledge(mapVersion, services.networkService.myAddress)
|
||||
services.networkService.send(PUSH_ACK_PROTOCOL_TOPIC, DEFAULT_SESSION_ID, req, mapServiceNode.info.address)
|
||||
}
|
||||
|
||||
private fun fetchMap(registerNode: MockNetwork.MockNode, mapServiceNode: MockNetwork.MockNode, subscribe: Boolean, ifChangedSinceVersion: Int? = null): Future<Collection<NodeRegistration>?> {
|
||||
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVersion, registerNode.services.networkService.myAddress, random63BitValue())
|
||||
return registerNode.sendAndReceive<NetworkMapService.FetchMapResponse>(NetworkMapService.FETCH_PROTOCOL_TOPIC, mapServiceNode, req).map { it.nodes }
|
||||
private fun MockNode.fetchMap(mapServiceNode: MockNode, subscribe: Boolean, ifChangedSinceVersion: Int? = null): Future<Collection<NodeRegistration>?> {
|
||||
val net = services.networkService
|
||||
val req = FetchMapRequest(subscribe, ifChangedSinceVersion, net.myAddress)
|
||||
return net.sendRequest<FetchMapResponse>(FETCH_PROTOCOL_TOPIC, req, mapServiceNode.info.address).map { it.nodes }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -66,7 +66,8 @@ private fun startNode(baseDirectory: Path,
|
||||
"--base-directory", baseDirectory.toString(),
|
||||
"--network-address", nodeAddr.toString(),
|
||||
"--network-map-address", networkMapAddr.toString(),
|
||||
"--api-address", apiAddr.toString())
|
||||
"--api-address", apiAddr.toString(),
|
||||
"--h2-port", "0")
|
||||
val proc = spawn("com.r3corda.demos.IRSDemoKt", args, "IRSDemo$nodeType")
|
||||
NodeApi.ensureNodeStartsOrKill(proc, apiAddr)
|
||||
return proc
|
||||
|
@ -26,7 +26,8 @@ class TraderDemoTest {
|
||||
"--role", "BUYER",
|
||||
"--network-address", buyerAddr.toString(),
|
||||
"--api-address", buyerApiAddr.toString(),
|
||||
"--base-directory", baseDirectory
|
||||
"--base-directory", baseDirectory,
|
||||
"--h2-port", "0"
|
||||
)
|
||||
val proc = spawn("com.r3corda.demos.TraderDemoKt", args, "TradeDemoBuyer")
|
||||
NodeApi.ensureNodeStartsOrKill(proc, buyerApiAddr)
|
||||
@ -42,7 +43,8 @@ class TraderDemoTest {
|
||||
"--network-address", sellerAddr.toString(),
|
||||
"--api-address", sellerApiAddr.toString(),
|
||||
"--other-network-address", buyerAddr.toString(),
|
||||
"--base-directory", baseDirectory
|
||||
"--base-directory", baseDirectory,
|
||||
"--h2-port", "0"
|
||||
)
|
||||
val proc = spawn("com.r3corda.demos.TraderDemoKt", args, "TradeDemoSeller")
|
||||
assertExitOrKill(proc)
|
||||
|
@ -87,7 +87,8 @@ sealed class CliParams {
|
||||
val tradeWithIdentities: List<Path>,
|
||||
val uploadRates: Boolean,
|
||||
val defaultLegalName: String,
|
||||
val autoSetup: Boolean // Run Setup for both nodes automatically with default arguments
|
||||
val autoSetup: Boolean, // Run Setup for both nodes automatically with default arguments
|
||||
val h2Port: Int
|
||||
) : CliParams()
|
||||
|
||||
/**
|
||||
@ -151,6 +152,12 @@ sealed class CliParams {
|
||||
IRSDemoNode.NodeB -> Node.DEFAULT_PORT + 3
|
||||
}
|
||||
|
||||
private fun defaultH2Port(node: IRSDemoNode) =
|
||||
when (node) {
|
||||
IRSDemoNode.NodeA -> Node.DEFAULT_PORT + 4
|
||||
IRSDemoNode.NodeB -> Node.DEFAULT_PORT + 5
|
||||
}
|
||||
|
||||
private fun parseRunNode(options: OptionSet, node: IRSDemoNode): RunNode {
|
||||
val dir = nodeDirectory(options, node)
|
||||
|
||||
@ -171,7 +178,8 @@ sealed class CliParams {
|
||||
},
|
||||
uploadRates = node == IRSDemoNode.NodeB,
|
||||
defaultLegalName = legalName(node),
|
||||
autoSetup = !options.has(CliParamsSpec.baseDirectoryArg) && !options.has(CliParamsSpec.fakeTradeWithIdentityFile)
|
||||
autoSetup = !options.has(CliParamsSpec.baseDirectoryArg) && !options.has(CliParamsSpec.fakeTradeWithIdentityFile),
|
||||
h2Port = options.valueOf(CliParamsSpec.h2PortArg.defaultsTo(defaultH2Port(node)))
|
||||
)
|
||||
}
|
||||
|
||||
@ -263,6 +271,7 @@ object CliParamsSpec {
|
||||
val fakeTradeWithIdentityFile =
|
||||
parser.accepts("fake-trade-with-identity-file", "Extra identities to be registered with the identity service")
|
||||
.withOptionalArg()
|
||||
val h2PortArg = parser.accepts("h2-port").withRequiredArg().ofType(Int::class.java)
|
||||
val nonOptions = parser.nonOptions()
|
||||
val help = parser.accepts("help", "Prints this help").forHelp()
|
||||
}
|
||||
@ -449,7 +458,8 @@ private fun getNodeConfig(cliParams: CliParams.RunNode): FullNodeConfiguration {
|
||||
val configFile = cliParams.dir.resolve("config")
|
||||
val configOverrides = mapOf(
|
||||
"artemisAddress" to cliParams.networkAddress.toString(),
|
||||
"webAddress" to cliParams.apiAddress.toString()
|
||||
"webAddress" to cliParams.apiAddress.toString(),
|
||||
"h2port" to cliParams.h2Port.toString()
|
||||
)
|
||||
return loadConfigFile(cliParams.dir, configFile, configOverrides, cliParams.defaultLegalName)
|
||||
}
|
||||
|
@ -80,7 +80,7 @@ fun main(args: Array<String>) {
|
||||
val theirNetworkAddress = parser.accepts("other-network-address").withRequiredArg().defaultsTo("localhost")
|
||||
val apiNetworkAddress = parser.accepts("api-address").withRequiredArg().defaultsTo("localhost")
|
||||
val baseDirectoryArg = parser.accepts("base-directory").withRequiredArg().defaultsTo(DEFAULT_BASE_DIRECTORY)
|
||||
|
||||
val h2PortArg = parser.accepts("h2-port").withRequiredArg().ofType(Int::class.java).defaultsTo(-1)
|
||||
val options = try {
|
||||
parser.parse(*args)
|
||||
} catch (e: Exception) {
|
||||
@ -104,6 +104,9 @@ fun main(args: Array<String>) {
|
||||
}
|
||||
)
|
||||
val apiNetAddr = HostAndPort.fromString(options.valueOf(apiNetworkAddress)).withDefaultPort(myNetAddr.port + 1)
|
||||
val h2Port = if (options.valueOf(h2PortArg) < 0) {
|
||||
myNetAddr.port + 2
|
||||
} else options.valueOf(h2PortArg)
|
||||
|
||||
val baseDirectory = options.valueOf(baseDirectoryArg)!!
|
||||
|
||||
@ -125,7 +128,8 @@ fun main(args: Array<String>) {
|
||||
val configOverrides = mapOf(
|
||||
"myLegalName" to myLegalName,
|
||||
"artemisAddress" to myNetAddr.toString(),
|
||||
"webAddress" to apiNetAddr.toString()
|
||||
"webAddress" to apiNetAddr.toString(),
|
||||
"h2port" to h2Port.toString()
|
||||
)
|
||||
FullNodeConfiguration(NodeConfiguration.loadConfig(directory, allowMissingConfig = true, configOverrides = configOverrides))
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
apply plugin: 'kotlin'
|
||||
apply plugin: QuasarPlugin
|
||||
apply plugin: DefaultPublishTasks
|
||||
|
||||
repositories {
|
||||
mavenLocal()
|
||||
@ -46,3 +47,15 @@ dependencies {
|
||||
}
|
||||
|
||||
quasarScan.dependsOn('classes', ':core:classes', ':contracts:classes')
|
||||
|
||||
publishing {
|
||||
publications {
|
||||
testutils(MavenPublication) {
|
||||
from components.java
|
||||
artifactId 'test-utils'
|
||||
|
||||
artifact sourceJar
|
||||
artifact javadocJar
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,13 +2,9 @@ package com.r3corda.testing.node
|
||||
|
||||
import com.google.common.jimfs.Jimfs
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.div
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.messaging.TopicSession
|
||||
import com.r3corda.core.messaging.onNext
|
||||
import com.r3corda.core.messaging.send
|
||||
import com.r3corda.core.node.PhysicalLocation
|
||||
import com.r3corda.core.node.services.KeyManagementService
|
||||
import com.r3corda.core.node.services.ServiceInfo
|
||||
@ -23,13 +19,12 @@ import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.keys.E2ETestKeyManagementService
|
||||
import com.r3corda.node.services.messaging.CordaRPCOps
|
||||
import com.r3corda.node.services.network.InMemoryNetworkMapService
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.persistence.DBCheckpointStorage
|
||||
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.r3corda.node.utilities.databaseTransaction
|
||||
import com.r3corda.protocols.ServiceRequestMessage
|
||||
import org.slf4j.Logger
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
@ -146,21 +141,6 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
return (net as InMemoryMessagingNetwork.InMemoryMessaging).pumpReceive(block)
|
||||
}
|
||||
|
||||
fun send(topic: String, target: MockNode, payload: Any) {
|
||||
services.networkService.send(TopicSession(topic), payload, target.info.address)
|
||||
}
|
||||
|
||||
fun <M : Any> receive(topic: String, sessionId: Long): ListenableFuture<M> {
|
||||
return services.networkService.onNext<M>(topic, sessionId)
|
||||
}
|
||||
|
||||
inline fun <reified T : Any> sendAndReceive(topic: String,
|
||||
target: MockNode,
|
||||
payload: ServiceRequestMessage): ListenableFuture<T> {
|
||||
send(topic, target, payload)
|
||||
return receive(topic, payload.sessionID)
|
||||
}
|
||||
|
||||
fun disableDBCloseOnStop() {
|
||||
runOnStop.remove(dbCloser)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user