mirror of
https://github.com/corda/corda.git
synced 2024-12-22 06:17:55 +00:00
send and sendAndReceive use Party for the destination
This commit is contained in:
parent
aef111114f
commit
a02263937c
@ -7,7 +7,6 @@ import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.DigitalSignature
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.signWithECDSA
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.random63BitValue
|
||||
@ -61,7 +60,7 @@ object TwoPartyTradeProtocol {
|
||||
class SignaturesFromSeller(val sellerSig: DigitalSignature.WithKey,
|
||||
val notarySig: DigitalSignature.LegallyIdentifiable)
|
||||
|
||||
open class Seller(val otherSide: SingleMessageRecipient,
|
||||
open class Seller(val otherSide: Party,
|
||||
val notaryNode: NodeInfo,
|
||||
val assetToSell: StateAndRef<OwnableState>,
|
||||
val price: Amount<Issued<Currency>>,
|
||||
@ -172,7 +171,7 @@ object TwoPartyTradeProtocol {
|
||||
}
|
||||
}
|
||||
|
||||
open class Buyer(val otherSide: SingleMessageRecipient,
|
||||
open class Buyer(val otherSide: Party,
|
||||
val notary: Party,
|
||||
val acceptablePrice: Amount<Issued<Currency>>,
|
||||
val typeToBuy: Class<out OwnableState>,
|
||||
|
@ -169,6 +169,7 @@ interface StorageService {
|
||||
* Returns the legal identity that this node is configured with. Assumed to be initialised when the node is
|
||||
* first installed.
|
||||
*/
|
||||
//TODO this should be in the IdentityService, or somewhere not here
|
||||
val myLegalIdentity: Party
|
||||
val myLegalIdentityKey: KeyPair
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package com.r3corda.core.protocols
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.r3corda.core.messaging.MessageRecipients
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.node.ServiceHub
|
||||
import com.r3corda.core.utilities.ProgressTracker
|
||||
import com.r3corda.core.utilities.UntrustworthyData
|
||||
@ -39,9 +39,12 @@ abstract class ProtocolLogic<T> {
|
||||
val serviceHub: ServiceHub get() = psm.serviceHub
|
||||
|
||||
// Kotlin helpers that allow the use of generic types.
|
||||
inline fun <reified T : Any> sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long,
|
||||
sessionIDForReceive: Long, obj: Any): UntrustworthyData<T> {
|
||||
return psm.sendAndReceive(topic, destination, sessionIDForSend, sessionIDForReceive, obj, T::class.java)
|
||||
inline fun <reified T : Any> sendAndReceive(topic: String,
|
||||
destination: Party,
|
||||
sessionIDForSend: Long,
|
||||
sessionIDForReceive: Long,
|
||||
payload: Any): UntrustworthyData<T> {
|
||||
return psm.sendAndReceive(topic, destination, sessionIDForSend, sessionIDForReceive, payload, T::class.java)
|
||||
}
|
||||
|
||||
inline fun <reified T : Any> receive(topic: String, sessionIDForReceive: Long): UntrustworthyData<T> {
|
||||
@ -52,8 +55,8 @@ abstract class ProtocolLogic<T> {
|
||||
return psm.receive(topic, sessionIDForReceive, clazz)
|
||||
}
|
||||
|
||||
@Suspendable fun send(topic: String, destination: MessageRecipients, sessionID: Long, obj: Any) {
|
||||
psm.send(topic, destination, sessionID, obj)
|
||||
@Suspendable fun send(topic: String, destination: Party, sessionID: Long, payload: Any) {
|
||||
psm.send(topic, destination, sessionID, payload)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,7 +1,7 @@
|
||||
package com.r3corda.core.protocols
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.r3corda.core.messaging.MessageRecipients
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.node.ServiceHub
|
||||
import com.r3corda.core.utilities.UntrustworthyData
|
||||
import org.slf4j.Logger
|
||||
@ -12,14 +12,14 @@ import org.slf4j.Logger
|
||||
*/
|
||||
interface ProtocolStateMachine<R> {
|
||||
@Suspendable
|
||||
fun <T : Any> sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long, sessionIDForReceive: Long,
|
||||
obj: Any, recvType: Class<T>): UntrustworthyData<T>
|
||||
fun <T : Any> sendAndReceive(topic: String, destination: Party, sessionIDForSend: Long, sessionIDForReceive: Long,
|
||||
payload: Any, recvType: Class<T>): UntrustworthyData<T>
|
||||
|
||||
@Suspendable
|
||||
fun <T : Any> receive(topic: String, sessionIDForReceive: Long, recvType: Class<T>): UntrustworthyData<T>
|
||||
|
||||
@Suspendable
|
||||
fun send(topic: String, destination: MessageRecipients, sessionID: Long, obj: Any)
|
||||
fun send(topic: String, destination: Party, sessionID: Long, payload: Any)
|
||||
|
||||
val serviceHub: ServiceHub
|
||||
val logger: Logger
|
||||
|
@ -1,9 +0,0 @@
|
||||
package com.r3corda.protocols
|
||||
|
||||
import com.r3corda.core.messaging.MessageRecipients
|
||||
|
||||
/**
|
||||
* Abstract superclass for request messages sent to services, which includes common
|
||||
* fields such as replyTo and replyToTopic.
|
||||
*/
|
||||
abstract class AbstractRequestMessage(val replyTo: MessageRecipients, val sessionID: Long?)
|
@ -1,9 +1,9 @@
|
||||
package com.r3corda.protocols
|
||||
|
||||
import com.r3corda.core.contracts.Attachment
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.crypto.sha256
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import java.io.ByteArrayInputStream
|
||||
import java.io.InputStream
|
||||
|
||||
@ -12,7 +12,7 @@ import java.io.InputStream
|
||||
* attachments are saved to local storage automatically.
|
||||
*/
|
||||
class FetchAttachmentsProtocol(requests: Set<SecureHash>,
|
||||
otherSide: SingleMessageRecipient) : FetchDataProtocol<Attachment, ByteArray>(requests, otherSide) {
|
||||
otherSide: Party) : FetchDataProtocol<Attachment, ByteArray>(requests, otherSide) {
|
||||
companion object {
|
||||
const val TOPIC = "platform.fetch.attachment"
|
||||
}
|
||||
|
@ -2,8 +2,8 @@ package com.r3corda.protocols
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.r3corda.core.contracts.NamedByHash
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.utilities.UntrustworthyData
|
||||
@ -27,13 +27,13 @@ import java.util.*
|
||||
*/
|
||||
abstract class FetchDataProtocol<T : NamedByHash, W : Any>(
|
||||
protected val requests: Set<SecureHash>,
|
||||
protected val otherSide: SingleMessageRecipient) : ProtocolLogic<FetchDataProtocol.Result<T>>() {
|
||||
protected val otherSide: Party) : ProtocolLogic<FetchDataProtocol.Result<T>>() {
|
||||
|
||||
open class BadAnswer : Exception()
|
||||
class HashNotFound(val requested: SecureHash) : BadAnswer()
|
||||
class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : BadAnswer()
|
||||
|
||||
class Request(val hashes: List<SecureHash>, replyTo: SingleMessageRecipient, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
|
||||
class Request(val hashes: List<SecureHash>, replyTo: Party, override val sessionID: Long) : AbstractRequestMessage(replyTo)
|
||||
data class Result<T : NamedByHash>(val fromDisk: List<T>, val downloaded: List<T>)
|
||||
|
||||
protected abstract val queryTopic: String
|
||||
@ -49,7 +49,7 @@ abstract class FetchDataProtocol<T : NamedByHash, W : Any>(
|
||||
logger.trace("Requesting ${toFetch.size} dependency(s) for verification")
|
||||
|
||||
val sid = random63BitValue()
|
||||
val fetchReq = Request(toFetch, serviceHub.networkService.myAddress, sid)
|
||||
val fetchReq = Request(toFetch, serviceHub.storageService.myLegalIdentity, sid)
|
||||
// TODO: Support "large message" response streaming so response sizes are not limited by RAM.
|
||||
val maybeItems = sendAndReceive<ArrayList<W?>>(queryTopic, otherSide, 0, sid, fetchReq)
|
||||
// Check for a buggy/malicious peer answering with something that we didn't ask for.
|
||||
|
@ -1,8 +1,8 @@
|
||||
package com.r3corda.protocols
|
||||
|
||||
import com.r3corda.core.contracts.SignedTransaction
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
|
||||
/**
|
||||
* Given a set of tx hashes (IDs), either loads them from local disk or asks the remote peer to provide them.
|
||||
@ -12,7 +12,7 @@ import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
* results in a [FetchDataProtocol.HashNotFound] exception. Note that returned transactions are not inserted into
|
||||
* the database, because it's up to the caller to actually verify the transactions are valid.
|
||||
*/
|
||||
class FetchTransactionsProtocol(requests: Set<SecureHash>, otherSide: SingleMessageRecipient) :
|
||||
class FetchTransactionsProtocol(requests: Set<SecureHash>, otherSide: Party) :
|
||||
FetchDataProtocol<SignedTransaction, SignedTransaction>(requests, otherSide) {
|
||||
companion object {
|
||||
const val TOPIC = "platform.fetch.tx"
|
||||
|
@ -9,8 +9,6 @@ import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.SignedData
|
||||
import com.r3corda.core.crypto.signWithECDSA
|
||||
import com.r3corda.core.messaging.Ack
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.TimestampChecker
|
||||
import com.r3corda.core.node.services.UniquenessException
|
||||
import com.r3corda.core.node.services.UniquenessProvider
|
||||
@ -45,21 +43,21 @@ object NotaryProtocol {
|
||||
fun tracker() = ProgressTracker(REQUESTING, VALIDATING)
|
||||
}
|
||||
|
||||
lateinit var notaryNode: NodeInfo
|
||||
lateinit var notaryParty: Party
|
||||
|
||||
@Suspendable
|
||||
override fun call(): DigitalSignature.LegallyIdentifiable {
|
||||
progressTracker.currentStep = REQUESTING
|
||||
notaryNode = findNotaryNode()
|
||||
notaryParty = findNotaryParty()
|
||||
|
||||
val sendSessionID = random63BitValue()
|
||||
val receiveSessionID = random63BitValue()
|
||||
|
||||
val handshake = Handshake(serviceHub.networkService.myAddress, sendSessionID, receiveSessionID)
|
||||
sendAndReceive<Ack>(TOPIC_INITIATE, notaryNode.address, 0, receiveSessionID, handshake)
|
||||
val handshake = Handshake(serviceHub.storageService.myLegalIdentity, sendSessionID, receiveSessionID)
|
||||
sendAndReceive<Ack>(TOPIC_INITIATE, notaryParty, 0, receiveSessionID, handshake)
|
||||
|
||||
val request = SignRequest(stx, serviceHub.storageService.myLegalIdentity)
|
||||
val response = sendAndReceive<Result>(TOPIC, notaryNode.address, sendSessionID, receiveSessionID, request)
|
||||
val response = sendAndReceive<Result>(TOPIC, notaryParty, sendSessionID, receiveSessionID, request)
|
||||
|
||||
val notaryResult = validateResponse(response)
|
||||
return notaryResult.sig ?: throw NotaryException(notaryResult.error!!)
|
||||
@ -72,17 +70,17 @@ object NotaryProtocol {
|
||||
if (it.sig != null) validateSignature(it.sig, stx.txBits)
|
||||
else if (it.error is NotaryError.Conflict) it.error.conflict.verified()
|
||||
else if (it.error == null || it.error !is NotaryError)
|
||||
throw IllegalStateException("Received invalid result from Notary service '${notaryNode.identity}'")
|
||||
throw IllegalStateException("Received invalid result from Notary service '$notaryParty'")
|
||||
return it
|
||||
}
|
||||
}
|
||||
|
||||
private fun validateSignature(sig: DigitalSignature.LegallyIdentifiable, data: SerializedBytes<WireTransaction>) {
|
||||
check(sig.signer == notaryNode.identity) { "Notary result not signed by the correct service" }
|
||||
check(sig.signer == notaryParty) { "Notary result not signed by the correct service" }
|
||||
sig.verifyWithECDSA(data)
|
||||
}
|
||||
|
||||
private fun findNotaryNode(): NodeInfo {
|
||||
private fun findNotaryParty(): Party {
|
||||
var maybeNotaryKey: PublicKey? = null
|
||||
val wtx = stx.tx
|
||||
|
||||
@ -97,8 +95,8 @@ object NotaryProtocol {
|
||||
}
|
||||
|
||||
val notaryKey = maybeNotaryKey ?: throw IllegalStateException("Transaction does not specify a Notary")
|
||||
val notaryNode = serviceHub.networkMapCache.getNodeByPublicKey(notaryKey)
|
||||
return notaryNode ?: throw IllegalStateException("No Notary node can be found with the specified public key")
|
||||
val notaryParty = serviceHub.networkMapCache.getNodeByPublicKey(notaryKey)?.identity
|
||||
return notaryParty ?: throw IllegalStateException("No Notary node can be found with the specified public key")
|
||||
}
|
||||
}
|
||||
|
||||
@ -110,7 +108,7 @@ object NotaryProtocol {
|
||||
*
|
||||
* TODO: the notary service should only be able to see timestamp commands and inputs
|
||||
*/
|
||||
open class Service(val otherSide: SingleMessageRecipient,
|
||||
open class Service(val otherSide: Party,
|
||||
val sendSessionID: Long,
|
||||
val receiveSessionID: Long,
|
||||
val timestampChecker: TimestampChecker,
|
||||
@ -181,9 +179,9 @@ object NotaryProtocol {
|
||||
}
|
||||
|
||||
class Handshake(
|
||||
replyTo: SingleMessageRecipient,
|
||||
replyTo: Party,
|
||||
val sendSessionID: Long,
|
||||
sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
|
||||
override val sessionID: Long) : AbstractRequestMessage(replyTo)
|
||||
|
||||
/** TODO: The caller must authenticate instead of just specifying its identity */
|
||||
class SignRequest(val tx: SignedTransaction,
|
||||
@ -197,7 +195,7 @@ object NotaryProtocol {
|
||||
}
|
||||
|
||||
interface Factory {
|
||||
fun create(otherSide: SingleMessageRecipient,
|
||||
fun create(otherSide: Party,
|
||||
sendSessionID: Long,
|
||||
receiveSessionID: Long,
|
||||
timestampChecker: TimestampChecker,
|
||||
@ -205,7 +203,7 @@ object NotaryProtocol {
|
||||
}
|
||||
|
||||
object DefaultFactory : Factory {
|
||||
override fun create(otherSide: SingleMessageRecipient,
|
||||
override fun create(otherSide: Party,
|
||||
sendSessionID: Long,
|
||||
receiveSessionID: Long,
|
||||
timestampChecker: TimestampChecker,
|
||||
|
@ -6,8 +6,7 @@ import com.r3corda.core.contracts.FixOf
|
||||
import com.r3corda.core.contracts.TransactionBuilder
|
||||
import com.r3corda.core.contracts.WireTransaction
|
||||
import com.r3corda.core.crypto.DigitalSignature
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.utilities.ProgressTracker
|
||||
@ -28,7 +27,7 @@ import java.util.*
|
||||
* @throws FixOutOfRange if the returned fix was further away from the expected rate by the given amount.
|
||||
*/
|
||||
open class RatesFixProtocol(protected val tx: TransactionBuilder,
|
||||
private val oracle: NodeInfo,
|
||||
private val oracle: Party,
|
||||
private val fixOf: FixOf,
|
||||
private val expectedRate: BigDecimal,
|
||||
private val rateTolerance: BigDecimal,
|
||||
@ -48,8 +47,8 @@ open class RatesFixProtocol(protected val tx: TransactionBuilder,
|
||||
|
||||
class FixOutOfRange(val byAmount: BigDecimal) : Exception()
|
||||
|
||||
class QueryRequest(val queries: List<FixOf>, replyTo: SingleMessageRecipient, sessionID: Long, val deadline: Instant) : AbstractRequestMessage(replyTo, sessionID)
|
||||
class SignRequest(val tx: WireTransaction, replyTo: SingleMessageRecipient, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
|
||||
class QueryRequest(val queries: List<FixOf>, replyTo: Party, override val sessionID: Long, val deadline: Instant) : AbstractRequestMessage(replyTo)
|
||||
class SignRequest(val tx: WireTransaction, replyTo: Party, override val sessionID: Long) : AbstractRequestMessage(replyTo)
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
@ -57,7 +56,7 @@ open class RatesFixProtocol(protected val tx: TransactionBuilder,
|
||||
val fix = query()
|
||||
progressTracker.currentStep = WORKING
|
||||
checkFixIsNearExpected(fix)
|
||||
tx.addCommand(fix, oracle.identity.owningKey)
|
||||
tx.addCommand(fix, oracle.owningKey)
|
||||
beforeSigning(fix)
|
||||
progressTracker.currentStep = SIGNING
|
||||
tx.addSignatureUnchecked(sign())
|
||||
@ -83,11 +82,11 @@ open class RatesFixProtocol(protected val tx: TransactionBuilder,
|
||||
fun sign(): DigitalSignature.LegallyIdentifiable {
|
||||
val sessionID = random63BitValue()
|
||||
val wtx = tx.toWireTransaction()
|
||||
val req = SignRequest(wtx, serviceHub.networkService.myAddress, sessionID)
|
||||
val resp = sendAndReceive<DigitalSignature.LegallyIdentifiable>(TOPIC_SIGN, oracle.address, 0, sessionID, req)
|
||||
val req = SignRequest(wtx, serviceHub.storageService.myLegalIdentity, sessionID)
|
||||
val resp = sendAndReceive<DigitalSignature.LegallyIdentifiable>(TOPIC_SIGN, oracle, 0, sessionID, req)
|
||||
|
||||
return resp.validate { sig ->
|
||||
check(sig.signer == oracle.identity)
|
||||
check(sig.signer == oracle)
|
||||
tx.checkSignature(sig)
|
||||
sig
|
||||
}
|
||||
@ -96,10 +95,10 @@ open class RatesFixProtocol(protected val tx: TransactionBuilder,
|
||||
@Suspendable
|
||||
fun query(): Fix {
|
||||
val sessionID = random63BitValue()
|
||||
val deadline = suggestInterestRateAnnouncementTimeWindow(fixOf.name, oracle.identity.name, fixOf.forDay).end
|
||||
val req = QueryRequest(listOf(fixOf), serviceHub.networkService.myAddress, sessionID, deadline)
|
||||
val deadline = suggestInterestRateAnnouncementTimeWindow(fixOf.name, oracle.name, fixOf.forDay).end
|
||||
val req = QueryRequest(listOf(fixOf), serviceHub.storageService.myLegalIdentity, sessionID, deadline)
|
||||
// TODO: add deadline to receive
|
||||
val resp = sendAndReceive<ArrayList<Fix>>(TOPIC_QUERY, oracle.address, 0, sessionID, req)
|
||||
val resp = sendAndReceive<ArrayList<Fix>>(TOPIC_QUERY, oracle, 0, sessionID, req)
|
||||
|
||||
return resp.validate {
|
||||
val fix = it.first()
|
||||
|
@ -2,8 +2,8 @@ package com.r3corda.protocols
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import java.util.*
|
||||
|
||||
@ -21,7 +21,7 @@ import java.util.*
|
||||
* protocol is helpful when resolving and verifying a finished but partially signed transaction.
|
||||
*/
|
||||
class ResolveTransactionsProtocol(private val txHashes: Set<SecureHash>,
|
||||
private val otherSide: SingleMessageRecipient) : ProtocolLogic<Unit>() {
|
||||
private val otherSide: Party) : ProtocolLogic<Unit>() {
|
||||
|
||||
companion object {
|
||||
private fun dependencyIDs(wtx: WireTransaction) = wtx.inputs.map { it.txhash }.toSet()
|
||||
@ -33,11 +33,11 @@ class ResolveTransactionsProtocol(private val txHashes: Set<SecureHash>,
|
||||
private var stx: SignedTransaction? = null
|
||||
private var wtx: WireTransaction? = null
|
||||
|
||||
constructor(stx: SignedTransaction, otherSide: SingleMessageRecipient) : this(stx.tx, otherSide) {
|
||||
constructor(stx: SignedTransaction, otherSide: Party) : this(stx.tx, otherSide) {
|
||||
this.stx = stx
|
||||
}
|
||||
|
||||
constructor(wtx: WireTransaction, otherSide: SingleMessageRecipient) : this(dependencyIDs(wtx), otherSide) {
|
||||
constructor(wtx: WireTransaction, otherSide: Party) : this(dependencyIDs(wtx), otherSide) {
|
||||
this.wtx = wtx
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,20 @@
|
||||
package com.r3corda.protocols
|
||||
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.messaging.MessageRecipients
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
|
||||
/**
|
||||
* Abstract superclass for request messages sent to services, which includes common
|
||||
* fields such as replyTo and replyToTopic.
|
||||
*/
|
||||
interface ServiceRequestMessage {
|
||||
val sessionID: Long
|
||||
fun getReplyTo(networkMapCache: NetworkMapCache): MessageRecipients
|
||||
}
|
||||
|
||||
abstract class AbstractRequestMessage(val replyToParty: Party): ServiceRequestMessage {
|
||||
override fun getReplyTo(networkMapCache: NetworkMapCache): MessageRecipients {
|
||||
return networkMapCache.partyNodes.single { it.identity == replyToParty }.address
|
||||
}
|
||||
}
|
@ -6,7 +6,6 @@ import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.DigitalSignature
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.signWithECDSA
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.random63BitValue
|
||||
@ -72,7 +71,7 @@ object TwoPartyDealProtocol {
|
||||
|
||||
abstract val payload: U
|
||||
abstract val notaryNode: NodeInfo
|
||||
abstract val otherSide: SingleMessageRecipient
|
||||
abstract val otherSide: Party
|
||||
abstract val otherSessionID: Long
|
||||
abstract val myKeyPair: KeyPair
|
||||
|
||||
@ -153,7 +152,7 @@ object TwoPartyDealProtocol {
|
||||
// Copy the transaction to every regulator in the network. This is obviously completely bogus, it's
|
||||
// just for demo purposes.
|
||||
for (regulator in regulators) {
|
||||
send("regulator.all.seeing.eye", regulator.address, 0, fullySigned)
|
||||
send("regulator.all.seeing.eye", regulator.identity, 0, fullySigned)
|
||||
}
|
||||
}
|
||||
|
||||
@ -203,7 +202,7 @@ object TwoPartyDealProtocol {
|
||||
fun tracker() = ProgressTracker(RECEIVING, VERIFYING, SIGNING, SWAPPING_SIGNATURES, RECORDING)
|
||||
}
|
||||
|
||||
abstract val otherSide: SingleMessageRecipient
|
||||
abstract val otherSide: Party
|
||||
abstract val sessionID: Long
|
||||
|
||||
@Suspendable
|
||||
@ -276,7 +275,7 @@ object TwoPartyDealProtocol {
|
||||
/**
|
||||
* One side of the protocol for inserting a pre-agreed deal.
|
||||
*/
|
||||
open class Instigator<T : DealState>(override val otherSide: SingleMessageRecipient,
|
||||
open class Instigator<T : DealState>(override val otherSide: Party,
|
||||
val notary: Party,
|
||||
override val payload: T,
|
||||
override val myKeyPair: KeyPair,
|
||||
@ -290,7 +289,7 @@ object TwoPartyDealProtocol {
|
||||
/**
|
||||
* One side of the protocol for inserting a pre-agreed deal.
|
||||
*/
|
||||
open class Acceptor<T : DealState>(override val otherSide: SingleMessageRecipient,
|
||||
open class Acceptor<T : DealState>(override val otherSide: Party,
|
||||
val notary: Party,
|
||||
val dealToBuy: T,
|
||||
override val sessionID: Long,
|
||||
@ -342,7 +341,7 @@ object TwoPartyDealProtocol {
|
||||
|
||||
override val sessionID: Long get() = initiation.sessionID
|
||||
|
||||
override val otherSide: SingleMessageRecipient get() = initiation.sender
|
||||
override val otherSide: Party get() = initiation.sender
|
||||
|
||||
private lateinit var txState: TransactionState<*>
|
||||
private lateinit var deal: FixableDealState
|
||||
@ -360,8 +359,6 @@ object TwoPartyDealProtocol {
|
||||
val myName = serviceHub.storageService.myLegalIdentity.name
|
||||
val otherParty = deal.parties.filter { it.name != myName }.single()
|
||||
check(otherParty == initiation.party)
|
||||
val otherPartyAddress = serviceHub.networkMapCache.getNodeByLegalName(otherParty.name)!!.address
|
||||
check(otherPartyAddress == otherSide)
|
||||
// Also check we are one of the parties
|
||||
deal.parties.filter { it.name == myName }.single()
|
||||
|
||||
@ -380,7 +377,7 @@ object TwoPartyDealProtocol {
|
||||
val newDeal = deal
|
||||
|
||||
val ptx = TransactionType.General.Builder()
|
||||
val addFixing = object : RatesFixProtocol(ptx, serviceHub.networkMapCache.ratesOracleNodes[0], fixOf, BigDecimal.ZERO, BigDecimal.ONE, initiation.timeout) {
|
||||
val addFixing = object : RatesFixProtocol(ptx, serviceHub.networkMapCache.ratesOracleNodes[0].identity, fixOf, BigDecimal.ZERO, BigDecimal.ONE, initiation.timeout) {
|
||||
@Suspendable
|
||||
override fun beforeSigning(fix: Fix) {
|
||||
newDeal.generateFix(ptx, StateAndRef(txState, handshake.payload), fix)
|
||||
@ -417,11 +414,10 @@ object TwoPartyDealProtocol {
|
||||
return serviceHub.keyManagementService.toKeyPair(publicKey)
|
||||
}
|
||||
|
||||
override val otherSide: SingleMessageRecipient get() {
|
||||
override val otherSide: Party get() {
|
||||
// TODO: what happens if there's no node? Move to messaging taking Party and then handled in messaging layer
|
||||
val myName = serviceHub.storageService.myLegalIdentity.name
|
||||
val otherParty = dealToFix.state.data.parties.filter { it.name != myName }.single()
|
||||
return serviceHub.networkMapCache.getNodeByLegalName(otherParty.name)!!.address
|
||||
return dealToFix.state.data.parties.filter { it.name != myName }.single()
|
||||
}
|
||||
|
||||
override val notaryNode: NodeInfo get() =
|
||||
@ -432,7 +428,7 @@ object TwoPartyDealProtocol {
|
||||
val FIX_INITIATE_TOPIC = "platform.fix.initiate"
|
||||
|
||||
/** Used to set up the session between [Floater] and [Fixer] */
|
||||
data class FixingSessionInitiation(val sessionID: Long, val party: Party, val sender: SingleMessageRecipient, val timeout: Duration)
|
||||
data class FixingSessionInitiation(val sessionID: Long, val party: Party, val sender: Party, val timeout: Duration)
|
||||
|
||||
/**
|
||||
* This protocol looks at the deal and decides whether to be the Fixer or Floater role in agreeing a fixing.
|
||||
@ -459,10 +455,10 @@ object TwoPartyDealProtocol {
|
||||
if (sortedParties[0].name == serviceHub.storageService.myLegalIdentity.name) {
|
||||
// Generate sessionID
|
||||
val sessionID = random63BitValue()
|
||||
val initation = FixingSessionInitiation(sessionID, sortedParties[0], serviceHub.networkService.myAddress, timeout)
|
||||
val initation = FixingSessionInitiation(sessionID, sortedParties[0], serviceHub.storageService.myLegalIdentity, timeout)
|
||||
|
||||
// Send initiation to other side to launch one side of the fixing protocol (the Fixer).
|
||||
send(FIX_INITIATE_TOPIC, serviceHub.networkMapCache.getNodeByLegalName(sortedParties[1].name)!!.address, 0, initation)
|
||||
send(FIX_INITIATE_TOPIC, sortedParties[1], 0, initation)
|
||||
|
||||
// Then start the other side of the fixing protocol.
|
||||
val protocol = Floater(ref, sessionID)
|
||||
|
@ -6,7 +6,6 @@ import com.r3corda.core.contracts.TransactionVerificationException
|
||||
import com.r3corda.core.contracts.WireTransaction
|
||||
import com.r3corda.core.contracts.toLedgerTransaction
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.services.TimestampChecker
|
||||
import com.r3corda.core.node.services.UniquenessProvider
|
||||
import java.security.SignatureException
|
||||
@ -17,7 +16,7 @@ import java.security.SignatureException
|
||||
* has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was
|
||||
* indeed valid
|
||||
*/
|
||||
class ValidatingNotaryProtocol(otherSide: SingleMessageRecipient,
|
||||
class ValidatingNotaryProtocol(otherSide: Party,
|
||||
sessionIdForSend: Long,
|
||||
sessionIdForReceive: Long,
|
||||
timestampChecker: TimestampChecker,
|
||||
@ -52,7 +51,6 @@ class ValidatingNotaryProtocol(otherSide: SingleMessageRecipient,
|
||||
|
||||
@Suspendable
|
||||
private fun validateDependencies(reqIdentity: Party, wtx: WireTransaction) {
|
||||
val otherSide = serviceHub.networkMapCache.getNodeByPublicKey(reqIdentity.owningKey)!!.address
|
||||
subProtocol(ResolveTransactionsProtocol(wtx, otherSide))
|
||||
subProtocol(ResolveTransactionsProtocol(wtx, reqIdentity))
|
||||
}
|
||||
}
|
@ -6,7 +6,6 @@ import com.r3corda.core.crypto.DigitalSignature
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.signWithECDSA
|
||||
import com.r3corda.core.messaging.Ack
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.random63BitValue
|
||||
@ -34,8 +33,8 @@ abstract class AbstractStateReplacementProtocol<T> {
|
||||
}
|
||||
|
||||
class Handshake(val sessionIdForSend: Long,
|
||||
replyTo: SingleMessageRecipient,
|
||||
replySessionId: Long) : AbstractRequestMessage(replyTo, replySessionId)
|
||||
replyTo: Party,
|
||||
override val sessionID: Long) : AbstractRequestMessage(replyTo)
|
||||
|
||||
abstract class Instigator<S : ContractState, T>(val originalState: StateAndRef<S>,
|
||||
val modification: T,
|
||||
@ -89,7 +88,7 @@ abstract class AbstractStateReplacementProtocol<T> {
|
||||
}
|
||||
|
||||
val allSignatures = participantSignatures + getNotarySignature(stx)
|
||||
sessions.forEach { send(TOPIC_CHANGE, it.key.address, it.value, allSignatures) }
|
||||
sessions.forEach { send(TOPIC_CHANGE, it.key.identity, it.value, allSignatures) }
|
||||
|
||||
return allSignatures
|
||||
}
|
||||
@ -99,10 +98,10 @@ abstract class AbstractStateReplacementProtocol<T> {
|
||||
val sessionIdForReceive = random63BitValue()
|
||||
val proposal = assembleProposal(originalState.ref, modification, stx)
|
||||
|
||||
val handshake = Handshake(sessionIdForSend, serviceHub.networkService.myAddress, sessionIdForReceive)
|
||||
sendAndReceive<Ack>(TOPIC_INITIATE, node.address, 0, sessionIdForReceive, handshake)
|
||||
val handshake = Handshake(sessionIdForSend, serviceHub.storageService.myLegalIdentity, sessionIdForReceive)
|
||||
sendAndReceive<Ack>(TOPIC_INITIATE, node.identity, 0, sessionIdForReceive, handshake)
|
||||
|
||||
val response = sendAndReceive<Result>(TOPIC_CHANGE, node.address, sessionIdForSend, sessionIdForReceive, proposal)
|
||||
val response = sendAndReceive<Result>(TOPIC_CHANGE, node.identity, sessionIdForSend, sessionIdForReceive, proposal)
|
||||
val participantSignature = response.validate {
|
||||
if (it.sig == null) throw StateReplacementException(it.error!!)
|
||||
else {
|
||||
@ -122,7 +121,7 @@ abstract class AbstractStateReplacementProtocol<T> {
|
||||
}
|
||||
}
|
||||
|
||||
abstract class Acceptor<T>(val otherSide: SingleMessageRecipient,
|
||||
abstract class Acceptor<T>(val otherSide: Party,
|
||||
val sessionIdForSend: Long,
|
||||
val sessionIdForReceive: Long,
|
||||
override val progressTracker: ProgressTracker = tracker()) : ProtocolLogic<Unit>() {
|
||||
@ -241,4 +240,4 @@ class StateReplacementRefused(val identity: Party, val state: StateRef, val deta
|
||||
}
|
||||
|
||||
class StateReplacementException(val error: StateReplacementRefused)
|
||||
: Exception("State change failed - ${error}")
|
||||
: Exception("State change failed - $error")
|
@ -2,18 +2,8 @@ package protocols
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.DigitalSignature
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.signWithECDSA
|
||||
import com.r3corda.core.messaging.Ack
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.utilities.ProgressTracker
|
||||
import com.r3corda.protocols.AbstractRequestMessage
|
||||
import com.r3corda.protocols.NotaryProtocol
|
||||
import com.r3corda.protocols.ResolveTransactionsProtocol
|
||||
import java.security.PublicKey
|
||||
|
||||
/**
|
||||
@ -58,7 +48,7 @@ object NotaryChangeProtocol: AbstractStateReplacementProtocol<Party>() {
|
||||
}
|
||||
}
|
||||
|
||||
class Acceptor(otherSide: SingleMessageRecipient,
|
||||
class Acceptor(otherSide: Party,
|
||||
sessionIdForSend: Long,
|
||||
sessionIdForReceive: Long,
|
||||
override val progressTracker: ProgressTracker = tracker())
|
||||
|
@ -33,6 +33,7 @@ import com.r3corda.node.services.keys.E2ETestKeyManagementService
|
||||
import com.r3corda.node.services.network.InMemoryNetworkMapCache
|
||||
import com.r3corda.node.services.network.InMemoryNetworkMapService
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC
|
||||
import com.r3corda.node.services.network.NodeRegistration
|
||||
import com.r3corda.node.services.persistence.*
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||
@ -50,7 +51,6 @@ import java.nio.file.FileAlreadyExistsException
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.security.KeyPair
|
||||
import java.security.Security
|
||||
import java.time.Clock
|
||||
import java.util.*
|
||||
|
||||
@ -160,8 +160,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
|
||||
// This object doesn't need to be referenced from this class because it registers handlers on the network
|
||||
// service and so that keeps it from being collected.
|
||||
DataVendingService(net, storage)
|
||||
NotaryChangeService(net, smm)
|
||||
DataVendingService(net, storage, services.networkMapCache)
|
||||
NotaryChangeService(net, smm, services.networkMapCache)
|
||||
|
||||
buildAdvertisedServices()
|
||||
|
||||
@ -231,9 +231,9 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
val reg = NodeRegistration(info, networkMapSeq++, type, expires)
|
||||
val sessionID = random63BitValue()
|
||||
val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress, sessionID)
|
||||
val message = net.createMessage(NetworkMapService.REGISTER_PROTOCOL_TOPIC + ".0", request.serialize().bits)
|
||||
val message = net.createMessage("$REGISTER_PROTOCOL_TOPIC.0", request.serialize().bits)
|
||||
val future = SettableFuture.create<NetworkMapService.RegistrationResponse>()
|
||||
val topic = NetworkMapService.REGISTER_PROTOCOL_TOPIC + "." + sessionID
|
||||
val topic = "$REGISTER_PROTOCOL_TOPIC.$sessionID"
|
||||
|
||||
net.runOnNextMessage(topic, RunOnCallerThread) { message ->
|
||||
future.set(message.data.deserialize())
|
||||
@ -254,8 +254,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
val timestampChecker = TimestampChecker(platformClock, 30.seconds)
|
||||
|
||||
inNodeNotaryService = when (type) {
|
||||
is SimpleNotaryService.Type -> SimpleNotaryService(smm, net, timestampChecker, uniquenessProvider)
|
||||
is ValidatingNotaryService.Type -> ValidatingNotaryService(smm, net, timestampChecker, uniquenessProvider)
|
||||
is SimpleNotaryService.Type -> SimpleNotaryService(smm, net, timestampChecker, uniquenessProvider, services.networkMapCache)
|
||||
is ValidatingNotaryService.Type -> ValidatingNotaryService(smm, net, timestampChecker, uniquenessProvider, services.networkMapCache)
|
||||
else -> null
|
||||
}
|
||||
}
|
||||
|
@ -130,8 +130,8 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
|
||||
|
||||
val sessionID = random63BitValue()
|
||||
|
||||
val instigator = TwoPartyDealProtocol.Instigator(node2.net.myAddress, notary.info.identity, irs, nodeAKey!!, sessionID)
|
||||
val acceptor = TwoPartyDealProtocol.Acceptor(node1.net.myAddress, notary.info.identity, irs, sessionID)
|
||||
val instigator = TwoPartyDealProtocol.Instigator(node2.info.identity, notary.info.identity, irs, nodeAKey!!, sessionID)
|
||||
val acceptor = TwoPartyDealProtocol.Acceptor(node1.info.identity, notary.info.identity, irs, sessionID)
|
||||
|
||||
showProgressFor(listOf(node1, node2))
|
||||
showConsensusFor(listOf(node1, node2, regulators[0]))
|
||||
|
@ -44,10 +44,19 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo
|
||||
val cashIssuerKey = generateKeyPair()
|
||||
val amount = 1000.DOLLARS `issued by` Party("Big friendly bank", cashIssuerKey.public).ref(1)
|
||||
val sessionID = random63BitValue()
|
||||
val buyerProtocol = TwoPartyTradeProtocol.Buyer(seller.net.myAddress, notary.info.identity,
|
||||
amount, CommercialPaper.State::class.java, sessionID)
|
||||
val sellerProtocol = TwoPartyTradeProtocol.Seller(buyer.net.myAddress, notary.info,
|
||||
issuance.tx.outRef(0), amount, seller.storage.myLegalIdentityKey, sessionID)
|
||||
val buyerProtocol = TwoPartyTradeProtocol.Buyer(
|
||||
seller.info.identity,
|
||||
notary.info.identity,
|
||||
amount,
|
||||
CommercialPaper.State::class.java,
|
||||
sessionID)
|
||||
val sellerProtocol = TwoPartyTradeProtocol.Seller(
|
||||
buyer.info.identity,
|
||||
notary.info,
|
||||
issuance.tx.outRef(0),
|
||||
amount,
|
||||
seller.storage.myLegalIdentityKey,
|
||||
sessionID)
|
||||
|
||||
showConsensusFor(listOf(buyer, seller, notary))
|
||||
showProgressFor(listOf(buyer, seller))
|
||||
|
@ -2,7 +2,7 @@ package com.r3corda.node.services
|
||||
|
||||
import com.r3corda.core.messaging.Ack
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.node.services.api.AbstractNodeService
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||
import protocols.AbstractStateReplacementProtocol
|
||||
@ -12,7 +12,7 @@ import protocols.NotaryChangeProtocol
|
||||
* A service that monitors the network for requests for changing the notary of a state,
|
||||
* and immediately runs the [NotaryChangeProtocol] if the auto-accept criteria are met.
|
||||
*/
|
||||
class NotaryChangeService(net: MessagingService, val smm: StateMachineManager) : AbstractNodeService(net) {
|
||||
class NotaryChangeService(net: MessagingService, val smm: StateMachineManager, networkMapCache: NetworkMapCache) : AbstractNodeService(net, networkMapCache) {
|
||||
init {
|
||||
addMessageHandler(NotaryChangeProtocol.TOPIC_INITIATE,
|
||||
{ req: AbstractStateReplacementProtocol.Handshake -> handleChangeNotaryRequest(req) }
|
||||
@ -21,8 +21,8 @@ class NotaryChangeService(net: MessagingService, val smm: StateMachineManager) :
|
||||
|
||||
private fun handleChangeNotaryRequest(req: AbstractStateReplacementProtocol.Handshake): Ack {
|
||||
val protocol = NotaryChangeProtocol.Acceptor(
|
||||
req.replyTo as SingleMessageRecipient,
|
||||
req.sessionID!!,
|
||||
req.replyToParty,
|
||||
req.sessionID,
|
||||
req.sessionIdForSend)
|
||||
smm.add(NotaryChangeProtocol.TOPIC_CHANGE, protocol)
|
||||
return Ack
|
||||
|
@ -2,18 +2,19 @@ package com.r3corda.node.services.api
|
||||
|
||||
import com.r3corda.core.messaging.Message
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.serialization.serialize
|
||||
import com.r3corda.protocols.AbstractRequestMessage
|
||||
import com.r3corda.protocols.ServiceRequestMessage
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* Abstract superclass for services that a node can host, which provides helper functions.
|
||||
*/
|
||||
@ThreadSafe
|
||||
abstract class AbstractNodeService(val net: MessagingService) : SingletonSerializeAsToken() {
|
||||
abstract class AbstractNodeService(val net: MessagingService, val networkMapCache: NetworkMapCache) : SingletonSerializeAsToken() {
|
||||
|
||||
/**
|
||||
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
|
||||
@ -24,18 +25,18 @@ abstract class AbstractNodeService(val net: MessagingService) : SingletonSeriali
|
||||
* @param handler a function to handle the deserialised request and return an optional response (if return type not Unit)
|
||||
* @param exceptionConsumer a function to which any thrown exception is passed.
|
||||
*/
|
||||
protected inline fun <reified Q : AbstractRequestMessage, reified R : Any>
|
||||
protected inline fun <reified Q : ServiceRequestMessage, reified R : Any>
|
||||
addMessageHandler(topic: String,
|
||||
crossinline handler: (Q) -> R,
|
||||
crossinline exceptionConsumer: (Message, Exception) -> Unit) {
|
||||
net.addMessageHandler(topic + TOPIC_DEFAULT_POSTFIX, null) { message, r ->
|
||||
try {
|
||||
val req = message.data.deserialize<Q>()
|
||||
val data = handler(req)
|
||||
val request = message.data.deserialize<Q>()
|
||||
val response = handler(request)
|
||||
// If the return type R is Unit, then do not send a response
|
||||
if (data.javaClass != Unit.javaClass) {
|
||||
val msg = net.createMessage("$topic.${req.sessionID}", data.serialize().bits)
|
||||
net.send(msg, req.replyTo)
|
||||
if (response.javaClass != Unit.javaClass) {
|
||||
val msg = net.createMessage("$topic.${request.sessionID}", response.serialize().bits)
|
||||
net.send(msg, request.getReplyTo(networkMapCache))
|
||||
}
|
||||
} catch(e: Exception) {
|
||||
exceptionConsumer(message, e)
|
||||
@ -51,7 +52,7 @@ abstract class AbstractNodeService(val net: MessagingService) : SingletonSeriali
|
||||
* @param topic the topic, without the default session ID postfix (".0)
|
||||
* @param handler a function to handle the deserialised request and return an optional response (if return type not Unit)
|
||||
*/
|
||||
protected inline fun <reified Q : AbstractRequestMessage, reified R : Any>
|
||||
protected inline fun <reified Q : ServiceRequestMessage, reified R : Any>
|
||||
addMessageHandler(topic: String,
|
||||
crossinline handler: (Q) -> R) {
|
||||
addMessageHandler(topic, handler, { message: Message, exception: Exception -> throw exception })
|
||||
|
@ -41,7 +41,7 @@ object NodeInterestRates {
|
||||
/**
|
||||
* The Service that wraps [Oracle] and handles messages/network interaction/request scrubbing.
|
||||
*/
|
||||
class Service(node: AbstractNode) : AcceptsFileUpload, AbstractNodeService(node.services.networkService) {
|
||||
class Service(node: AbstractNode) : AcceptsFileUpload, AbstractNodeService(node.services.networkService, node.services.networkMapCache) {
|
||||
val ss = node.services.storageService
|
||||
val oracle = Oracle(ss.myLegalIdentity, ss.myLegalIdentityKey, node.services.clock)
|
||||
|
||||
@ -84,7 +84,7 @@ object NodeInterestRates {
|
||||
override fun call(): Unit {
|
||||
val answers = service.oracle.query(request.queries, request.deadline)
|
||||
progressTracker.currentStep = SENDING
|
||||
send("${RatesFixProtocol.TOPIC}.query", request.replyTo, request.sessionID!!, answers)
|
||||
send("${RatesFixProtocol.TOPIC}.query", request.replyToParty, request.sessionID, answers)
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -7,16 +7,16 @@ import com.r3corda.core.messaging.MessageRecipients
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX
|
||||
import com.r3corda.core.serialization.SerializedBytes
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.serialization.serialize
|
||||
import com.r3corda.node.services.api.AbstractNodeService
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.protocols.ServiceRequestMessage
|
||||
import org.slf4j.LoggerFactory
|
||||
import com.r3corda.protocols.AbstractRequestMessage
|
||||
import java.security.PrivateKey
|
||||
import java.security.SignatureException
|
||||
import java.time.Instant
|
||||
@ -61,20 +61,25 @@ interface NetworkMapService {
|
||||
|
||||
val nodes: List<NodeInfo>
|
||||
|
||||
class FetchMapRequest(val subscribe: Boolean, val ifChangedSinceVersion: Int?, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
|
||||
abstract class NetworkMapRequestMessage(val replyTo: MessageRecipients) : ServiceRequestMessage {
|
||||
override fun getReplyTo(networkMapCache: NetworkMapCache): MessageRecipients = replyTo
|
||||
}
|
||||
|
||||
class FetchMapRequest(val subscribe: Boolean, val ifChangedSinceVersion: Int?, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo)
|
||||
data class FetchMapResponse(val nodes: Collection<NodeRegistration>?, val version: Int)
|
||||
class QueryIdentityRequest(val identity: Party, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
|
||||
class QueryIdentityRequest(val identity: Party, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo)
|
||||
data class QueryIdentityResponse(val node: NodeInfo?)
|
||||
class RegistrationRequest(val wireReg: WireNodeRegistration, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
|
||||
class RegistrationRequest(val wireReg: WireNodeRegistration, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo)
|
||||
data class RegistrationResponse(val success: Boolean)
|
||||
class SubscribeRequest(val subscribe: Boolean, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID)
|
||||
class SubscribeRequest(val subscribe: Boolean, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo)
|
||||
data class SubscribeResponse(val confirmed: Boolean)
|
||||
data class Update(val wireReg: WireNodeRegistration, val replyTo: MessageRecipients)
|
||||
data class UpdateAcknowledge(val wireRegHash: SecureHash, val replyTo: MessageRecipients)
|
||||
}
|
||||
|
||||
|
||||
@ThreadSafe
|
||||
class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, val cache: NetworkMapCache) : NetworkMapService, AbstractNodeService(net) {
|
||||
class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, val cache: NetworkMapCache) : NetworkMapService, AbstractNodeService(net, cache) {
|
||||
private val registeredNodes = ConcurrentHashMap<Party, NodeRegistration>()
|
||||
// Map from subscriber address, to a list of unacknowledged updates
|
||||
private val subscribers = ThreadBox(mutableMapOf<SingleMessageRecipient, MutableList<SecureHash>>())
|
||||
|
@ -2,6 +2,7 @@ package com.r3corda.node.services.persistence
|
||||
|
||||
import com.r3corda.core.contracts.SignedTransaction
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.core.node.services.StorageService
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.services.api.AbstractNodeService
|
||||
@ -24,7 +25,7 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class DataVendingService(net: MessagingService, private val storage: StorageService) : AbstractNodeService(net) {
|
||||
class DataVendingService(net: MessagingService, private val storage: StorageService, networkMapCache: NetworkMapCache) : AbstractNodeService(net, networkMapCache) {
|
||||
companion object {
|
||||
val logger = loggerFor<DataVendingService>()
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ import co.paralleluniverse.fibers.FiberScheduler
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import com.r3corda.core.messaging.MessageRecipients
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.protocols.ProtocolStateMachine
|
||||
import com.r3corda.core.utilities.UntrustworthyData
|
||||
@ -81,9 +81,13 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>,
|
||||
}
|
||||
|
||||
@Suspendable @Suppress("UNCHECKED_CAST")
|
||||
override fun <T : Any> sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long, sessionIDForReceive: Long,
|
||||
obj: Any, recvType: Class<T>): UntrustworthyData<T> {
|
||||
val result = StateMachineManager.FiberRequest.ExpectingResponse(topic, destination, sessionIDForSend, sessionIDForReceive, obj, recvType)
|
||||
override fun <T : Any> sendAndReceive(topic: String,
|
||||
destination: Party,
|
||||
sessionIDForSend: Long,
|
||||
sessionIDForReceive: Long,
|
||||
payload: Any,
|
||||
recvType: Class<T>): UntrustworthyData<T> {
|
||||
val result = StateMachineManager.FiberRequest.ExpectingResponse(topic, destination, sessionIDForSend, sessionIDForReceive, payload, recvType)
|
||||
return suspendAndExpectReceive(result)
|
||||
}
|
||||
|
||||
@ -94,8 +98,8 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>,
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun send(topic: String, destination: MessageRecipients, sessionID: Long, obj: Any) {
|
||||
val result = StateMachineManager.FiberRequest.NotExpectingResponse(topic, destination, sessionID, obj)
|
||||
override fun send(topic: String, destination: Party, sessionID: Long, payload: Any) {
|
||||
val result = StateMachineManager.FiberRequest.NotExpectingResponse(topic, destination, sessionID, payload)
|
||||
suspend(result)
|
||||
}
|
||||
|
||||
|
@ -8,7 +8,7 @@ import com.esotericsoftware.kryo.Kryo
|
||||
import com.google.common.base.Throwables
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.r3corda.core.abbreviate
|
||||
import com.r3corda.core.messaging.MessageRecipients
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.messaging.runOnNextMessage
|
||||
import com.r3corda.core.messaging.send
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
@ -253,7 +253,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
request.payload?.let {
|
||||
val topic = "${request.topic}.${request.sessionIDForSend}"
|
||||
psm.logger.trace { "Sending message of type ${it.javaClass.name} using topic $topic to ${request.destination} (${it.toString().abbreviate(50)})" }
|
||||
serviceHub.networkService.send(topic, it, request.destination!!)
|
||||
val address = serviceHub.networkMapCache.getNodeByLegalName(request.destination!!.name)!!.address
|
||||
serviceHub.networkService.send(topic, it, address)
|
||||
}
|
||||
if (request is FiberRequest.NotExpectingResponse) {
|
||||
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
|
||||
@ -307,7 +308,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
|
||||
// TODO: Clean this up
|
||||
open class FiberRequest(val topic: String,
|
||||
val destination: MessageRecipients?,
|
||||
val destination: Party?,
|
||||
val sessionIDForSend: Long,
|
||||
val sessionIDForReceive: Long,
|
||||
val payload: Any?) {
|
||||
@ -317,7 +318,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
|
||||
class ExpectingResponse<R : Any>(
|
||||
topic: String,
|
||||
destination: MessageRecipients?,
|
||||
destination: Party?,
|
||||
sessionIDForSend: Long,
|
||||
sessionIDForReceive: Long,
|
||||
obj: Any?,
|
||||
@ -326,7 +327,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
||||
|
||||
class NotExpectingResponse(
|
||||
topic: String,
|
||||
destination: MessageRecipients,
|
||||
destination: Party,
|
||||
sessionIDForSend: Long,
|
||||
obj: Any?
|
||||
) : FiberRequest(topic, destination, sessionIDForSend, -1, obj)
|
||||
|
@ -2,7 +2,7 @@ package com.r3corda.node.services.transactions
|
||||
|
||||
import com.r3corda.core.messaging.Ack
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.node.services.TimestampChecker
|
||||
import com.r3corda.core.node.services.UniquenessProvider
|
||||
@ -22,7 +22,8 @@ import com.r3corda.protocols.NotaryProtocol
|
||||
abstract class NotaryService(val smm: StateMachineManager,
|
||||
net: MessagingService,
|
||||
val timestampChecker: TimestampChecker,
|
||||
val uniquenessProvider: UniquenessProvider) : AbstractNodeService(net) {
|
||||
val uniquenessProvider: UniquenessProvider,
|
||||
networkMapCache: NetworkMapCache) : AbstractNodeService(net, networkMapCache) {
|
||||
object Type : ServiceType("corda.notary")
|
||||
|
||||
abstract val logger: org.slf4j.Logger
|
||||
@ -37,8 +38,9 @@ abstract class NotaryService(val smm: StateMachineManager,
|
||||
}
|
||||
|
||||
private fun processRequest(req: NotaryProtocol.Handshake): Ack {
|
||||
val protocol = protocolFactory.create(req.replyTo as SingleMessageRecipient,
|
||||
req.sessionID!!,
|
||||
val protocol = protocolFactory.create(
|
||||
req.replyToParty,
|
||||
req.sessionID,
|
||||
req.sendSessionID,
|
||||
timestampChecker,
|
||||
uniquenessProvider)
|
||||
|
@ -1,6 +1,7 @@
|
||||
package com.r3corda.node.services.transactions
|
||||
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.node.services.TimestampChecker
|
||||
import com.r3corda.core.node.services.UniquenessProvider
|
||||
@ -13,7 +14,8 @@ class SimpleNotaryService(
|
||||
smm: StateMachineManager,
|
||||
net: MessagingService,
|
||||
timestampChecker: TimestampChecker,
|
||||
uniquenessProvider: UniquenessProvider) : NotaryService(smm, net, timestampChecker, uniquenessProvider) {
|
||||
uniquenessProvider: UniquenessProvider,
|
||||
networkMapCache: NetworkMapCache) : NotaryService(smm, net, timestampChecker, uniquenessProvider, networkMapCache) {
|
||||
object Type : ServiceType("corda.notary.simple")
|
||||
|
||||
override val logger = loggerFor<SimpleNotaryService>()
|
||||
|
@ -1,7 +1,8 @@
|
||||
package com.r3corda.node.services.transactions
|
||||
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.services.NetworkMapCache
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.node.services.TimestampChecker
|
||||
import com.r3corda.core.node.services.UniquenessProvider
|
||||
@ -15,14 +16,15 @@ class ValidatingNotaryService(
|
||||
smm: StateMachineManager,
|
||||
net: MessagingService,
|
||||
timestampChecker: TimestampChecker,
|
||||
uniquenessProvider: UniquenessProvider
|
||||
) : NotaryService(smm, net, timestampChecker, uniquenessProvider) {
|
||||
uniquenessProvider: UniquenessProvider,
|
||||
networkMapCache: NetworkMapCache
|
||||
) : NotaryService(smm, net, timestampChecker, uniquenessProvider, networkMapCache) {
|
||||
object Type : ServiceType("corda.notary.validating")
|
||||
|
||||
override val logger = loggerFor<ValidatingNotaryService>()
|
||||
|
||||
override val protocolFactory = object : NotaryProtocol.Factory {
|
||||
override fun create(otherSide: SingleMessageRecipient,
|
||||
override fun create(otherSide: Party,
|
||||
sendSessionID: Long,
|
||||
receiveSessionID: Long,
|
||||
timestampChecker: TimestampChecker,
|
||||
|
@ -59,7 +59,8 @@ class AttachmentTests {
|
||||
val id = n0.storage.attachments.importAttachment(ByteArrayInputStream(fakeAttachment()))
|
||||
|
||||
// Get node one to run a protocol to fetch it and insert it.
|
||||
val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.net.myAddress))
|
||||
network.runNetwork()
|
||||
val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity))
|
||||
network.runNetwork()
|
||||
assertEquals(0, f1.get().fromDisk.size)
|
||||
|
||||
@ -70,7 +71,7 @@ class AttachmentTests {
|
||||
// Shut down node zero and ensure node one can still resolve the attachment.
|
||||
n0.stop()
|
||||
|
||||
val response: FetchDataProtocol.Result<Attachment> = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.net.myAddress)).get()
|
||||
val response: FetchDataProtocol.Result<Attachment> = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity)).get()
|
||||
assertEquals(attachment, response.fromDisk[0])
|
||||
}
|
||||
|
||||
@ -80,14 +81,15 @@ class AttachmentTests {
|
||||
|
||||
// Get node one to fetch a non-existent attachment.
|
||||
val hash = SecureHash.randomSHA256()
|
||||
val f1 = n1.smm.add("tests.fetch2", FetchAttachmentsProtocol(setOf(hash), n0.net.myAddress))
|
||||
network.runNetwork()
|
||||
val f1 = n1.smm.add("tests.fetch2", FetchAttachmentsProtocol(setOf(hash), n0.info.identity))
|
||||
network.runNetwork()
|
||||
val e = assertFailsWith<FetchDataProtocol.HashNotFound> { rootCauseExceptions { f1.get() } }
|
||||
assertEquals(hash, e.requested)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun maliciousResponse() {
|
||||
fun `malicious response`() {
|
||||
// Make a node that doesn't do sanity checking at load time.
|
||||
val n0 = network.createNode(null, -1, object : MockNetwork.Factory {
|
||||
override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?,
|
||||
@ -112,7 +114,8 @@ class AttachmentTests {
|
||||
writer.close()
|
||||
|
||||
// Get n1 to fetch the attachment. Should receive corrupted bytes.
|
||||
val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.net.myAddress))
|
||||
network.runNetwork()
|
||||
val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity))
|
||||
network.runNetwork()
|
||||
assertFailsWith<FetchDataProtocol.DownloadedVsRequestedDataMismatch> {
|
||||
rootCauseExceptions { f1.get() }
|
||||
|
@ -11,7 +11,6 @@ import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.days
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.ServiceHub
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
@ -56,14 +55,14 @@ class TwoPartyTradeProtocolTests {
|
||||
lateinit var net: MockNetwork
|
||||
|
||||
private fun runSeller(smm: StateMachineManager, notary: NodeInfo,
|
||||
otherSide: SingleMessageRecipient, assetToSell: StateAndRef<OwnableState>, price: Amount<Issued<Currency>>,
|
||||
otherSide: Party, assetToSell: StateAndRef<OwnableState>, price: Amount<Issued<Currency>>,
|
||||
myKeyPair: KeyPair, buyerSessionID: Long): ListenableFuture<SignedTransaction> {
|
||||
val seller = TwoPartyTradeProtocol.Seller(otherSide, notary, assetToSell, price, myKeyPair, buyerSessionID)
|
||||
return smm.add("${TwoPartyTradeProtocol.TRADE_TOPIC}.seller", seller)
|
||||
}
|
||||
|
||||
private fun runBuyer(smm: StateMachineManager, notaryNode: NodeInfo,
|
||||
otherSide: SingleMessageRecipient, acceptablePrice: Amount<Issued<Currency>>, typeToBuy: Class<out OwnableState>,
|
||||
otherSide: Party, acceptablePrice: Amount<Issued<Currency>>, typeToBuy: Class<out OwnableState>,
|
||||
sessionID: Long): ListenableFuture<SignedTransaction> {
|
||||
val buyer = TwoPartyTradeProtocol.Buyer(otherSide, notaryNode.identity, acceptablePrice, typeToBuy, sessionID)
|
||||
return smm.add("${TwoPartyTradeProtocol.TRADE_TOPIC}.buyer", buyer)
|
||||
@ -105,7 +104,7 @@ class TwoPartyTradeProtocolTests {
|
||||
val bobResult = runBuyer(
|
||||
bobNode.smm,
|
||||
notaryNode.info,
|
||||
aliceNode.net.myAddress,
|
||||
aliceNode.info.identity,
|
||||
1000.DOLLARS `issued by` issuer,
|
||||
CommercialPaper.State::class.java,
|
||||
buyerSessionID
|
||||
@ -113,7 +112,7 @@ class TwoPartyTradeProtocolTests {
|
||||
val aliceResult = runSeller(
|
||||
aliceNode.smm,
|
||||
notaryNode.info,
|
||||
bobNode.net.myAddress,
|
||||
bobNode.info.identity,
|
||||
lookup("alice's paper"),
|
||||
1000.DOLLARS `issued by` issuer,
|
||||
ALICE_KEY,
|
||||
@ -139,7 +138,6 @@ class TwoPartyTradeProtocolTests {
|
||||
val aliceNode = net.createPartyNode(notaryNode.info, ALICE.name, ALICE_KEY)
|
||||
var bobNode = net.createPartyNode(notaryNode.info, BOB.name, BOB_KEY)
|
||||
|
||||
val aliceAddr = aliceNode.net.myAddress
|
||||
val bobAddr = bobNode.net.myAddress as InMemoryMessagingNetwork.Handle
|
||||
val networkMapAddr = notaryNode.info
|
||||
val issuer = bobNode.services.storageService.myLegalIdentity.ref(0)
|
||||
@ -156,7 +154,7 @@ class TwoPartyTradeProtocolTests {
|
||||
val aliceFuture = runSeller(
|
||||
aliceNode.smm,
|
||||
notaryNode.info,
|
||||
bobAddr,
|
||||
bobNode.info.identity,
|
||||
lookup("alice's paper"),
|
||||
1000.DOLLARS `issued by` issuer,
|
||||
ALICE_KEY,
|
||||
@ -165,7 +163,7 @@ class TwoPartyTradeProtocolTests {
|
||||
runBuyer(
|
||||
bobNode.smm,
|
||||
notaryNode.info,
|
||||
aliceAddr,
|
||||
aliceNode.info.identity,
|
||||
1000.DOLLARS `issued by` issuer,
|
||||
CommercialPaper.State::class.java,
|
||||
buyerSessionID
|
||||
@ -276,7 +274,7 @@ class TwoPartyTradeProtocolTests {
|
||||
runSeller(
|
||||
aliceNode.smm,
|
||||
notaryNode.info,
|
||||
bobNode.net.myAddress,
|
||||
bobNode.info.identity,
|
||||
lookup("alice's paper"),
|
||||
1000.DOLLARS `issued by` issuer,
|
||||
ALICE_KEY,
|
||||
@ -285,7 +283,7 @@ class TwoPartyTradeProtocolTests {
|
||||
runBuyer(
|
||||
bobNode.smm,
|
||||
notaryNode.info,
|
||||
aliceNode.net.myAddress,
|
||||
aliceNode.info.identity,
|
||||
1000.DOLLARS `issued by` issuer,
|
||||
CommercialPaper.State::class.java,
|
||||
buyerSessionID
|
||||
@ -371,9 +369,6 @@ class TwoPartyTradeProtocolTests {
|
||||
val bobNode = net.createPartyNode(notaryNode.info, BOB.name, BOB_KEY)
|
||||
val issuer = MEGA_CORP.ref(1, 2, 3)
|
||||
|
||||
val aliceAddr = aliceNode.net.myAddress
|
||||
val bobAddr = bobNode.net.myAddress as InMemoryMessagingNetwork.Handle
|
||||
|
||||
val bobKey = bobNode.keyManagement.freshKey()
|
||||
val bobsBadCash = fillUpForBuyer(bobError, bobKey.public).second
|
||||
val alicesFakePaper = fillUpForSeller(aliceError, aliceNode.storage.myLegalIdentity.owningKey,
|
||||
@ -389,7 +384,7 @@ class TwoPartyTradeProtocolTests {
|
||||
val aliceResult = runSeller(
|
||||
aliceNode.smm,
|
||||
notaryNode.info,
|
||||
bobAddr,
|
||||
bobNode.info.identity,
|
||||
lookup("alice's paper"),
|
||||
1000.DOLLARS `issued by` issuer,
|
||||
ALICE_KEY,
|
||||
@ -398,7 +393,7 @@ class TwoPartyTradeProtocolTests {
|
||||
val bobResult = runBuyer(
|
||||
bobNode.smm,
|
||||
notaryNode.info,
|
||||
aliceAddr,
|
||||
aliceNode.info.identity,
|
||||
1000.DOLLARS `issued by` issuer,
|
||||
CommercialPaper.State::class.java,
|
||||
buyerSessionID
|
||||
|
@ -59,7 +59,7 @@ class InMemoryNetworkMapServiceTest {
|
||||
assertEquals(2, service.nodes.count())
|
||||
|
||||
// Confirm that de-registering the node succeeds and drops it from the node lists
|
||||
var removeChange = NodeRegistration(registerNode.info, seq, AddOrRemove.REMOVE, expires)
|
||||
val removeChange = NodeRegistration(registerNode.info, seq, AddOrRemove.REMOVE, expires)
|
||||
val removeWireChange = removeChange.toWire(nodeKey.private)
|
||||
assert(service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success)
|
||||
assertNull(service.processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node)
|
||||
@ -73,7 +73,7 @@ class InMemoryNetworkMapServiceTest {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val req = NetworkMapService.UpdateAcknowledge(hash, serviceHub.networkService.myAddress)
|
||||
send(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, server.address, 0, req)
|
||||
send(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, server.identity, 0, req)
|
||||
}
|
||||
}
|
||||
|
||||
@ -84,7 +84,7 @@ class InMemoryNetworkMapServiceTest {
|
||||
val sessionID = random63BitValue()
|
||||
val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVersion, serviceHub.networkService.myAddress, sessionID)
|
||||
return sendAndReceive<NetworkMapService.FetchMapResponse>(
|
||||
NetworkMapService.FETCH_PROTOCOL_TOPIC, server.address, 0, sessionID, req)
|
||||
NetworkMapService.FETCH_PROTOCOL_TOPIC, server.identity, 0, sessionID, req)
|
||||
.validate { it.nodes }
|
||||
}
|
||||
}
|
||||
@ -97,7 +97,7 @@ class InMemoryNetworkMapServiceTest {
|
||||
val req = NetworkMapService.RegistrationRequest(reg.toWire(privateKey), serviceHub.networkService.myAddress, sessionID)
|
||||
|
||||
return sendAndReceive<NetworkMapService.RegistrationResponse>(
|
||||
NetworkMapService.REGISTER_PROTOCOL_TOPIC, server.address, 0, sessionID, req)
|
||||
NetworkMapService.REGISTER_PROTOCOL_TOPIC, server.identity, 0, sessionID, req)
|
||||
.validate { it }
|
||||
}
|
||||
}
|
||||
@ -110,19 +110,20 @@ class InMemoryNetworkMapServiceTest {
|
||||
val req = NetworkMapService.SubscribeRequest(subscribe, serviceHub.networkService.myAddress, sessionID)
|
||||
|
||||
return sendAndReceive<NetworkMapService.SubscribeResponse>(
|
||||
NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, server.address, 0, sessionID, req)
|
||||
NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, server.identity, 0, sessionID, req)
|
||||
.validate { it }
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun successWithNetwork() {
|
||||
fun `success with network`() {
|
||||
val (mapServiceNode, registerNode) = network.createTwoNodes()
|
||||
|
||||
// Confirm there's a network map service on node 0
|
||||
assertNotNull(mapServiceNode.inNodeNetworkMapService)
|
||||
|
||||
// Confirm all nodes have registered themselves
|
||||
network.runNetwork()
|
||||
var fetchPsm = registerNode.smm.add(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false))
|
||||
network.runNetwork()
|
||||
assertEquals(2, fetchPsm.get()?.count())
|
||||
@ -143,11 +144,12 @@ class InMemoryNetworkMapServiceTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
fun subscribeWithNetwork() {
|
||||
fun `subscribe with network`() {
|
||||
val (mapServiceNode, registerNode) = network.createTwoNodes()
|
||||
val service = (mapServiceNode.inNodeNetworkMapService as InMemoryNetworkMapService)
|
||||
|
||||
// Test subscribing to updates
|
||||
network.runNetwork()
|
||||
val subscribePsm = registerNode.smm.add(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC,
|
||||
TestSubscribePSM(mapServiceNode.info, true))
|
||||
network.runNetwork()
|
||||
|
@ -1,8 +1,8 @@
|
||||
package com.r3corda.node.services
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import com.r3corda.core.contracts.SignedTransaction
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.r3corda.core.contracts.SignedTransaction
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.node.services.*
|
||||
import com.r3corda.core.node.services.testing.MockStorageService
|
||||
@ -68,7 +68,7 @@ open class MockServices(
|
||||
if (net != null && storage != null) {
|
||||
// Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener
|
||||
// on the networking service, so that will keep it from being collected.
|
||||
DataVendingService(net, storage)
|
||||
DataVendingService(net, storage, networkMapCache)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -8,9 +8,9 @@ import com.r3corda.contracts.testing.`with notary`
|
||||
import com.r3corda.core.bd
|
||||
import com.r3corda.core.contracts.DOLLARS
|
||||
import com.r3corda.core.contracts.Fix
|
||||
import com.r3corda.core.contracts.TransactionType
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.crypto.generateKeyPair
|
||||
import com.r3corda.core.contracts.TransactionType
|
||||
import com.r3corda.core.testing.ALICE_PUBKEY
|
||||
import com.r3corda.core.testing.DUMMY_NOTARY
|
||||
import com.r3corda.core.testing.MEGA_CORP
|
||||
@ -109,8 +109,9 @@ class NodeInterestRatesTest {
|
||||
|
||||
val tx = TransactionType.General.Builder()
|
||||
val fixOf = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M")
|
||||
val protocol = RatesFixProtocol(tx, n2.info, fixOf, "0.675".bd, "0.1".bd, Duration.ofNanos(1))
|
||||
val protocol = RatesFixProtocol(tx, n2.info.identity, fixOf, "0.675".bd, "0.1".bd, Duration.ofNanos(1))
|
||||
BriefLogFormatter.initVerbose("rates")
|
||||
net.runNetwork()
|
||||
val future = n1.smm.add("rates", protocol)
|
||||
|
||||
net.runNetwork()
|
||||
@ -123,4 +124,4 @@ class NodeInterestRatesTest {
|
||||
}
|
||||
|
||||
private fun makeTX() = TransactionType.General.Builder().withItems(1000.DOLLARS.CASH `issued by` DUMMY_CASH_ISSUER `owned by` ALICE_PUBKEY `with notary` DUMMY_NOTARY)
|
||||
}
|
||||
}
|
||||
|
@ -65,7 +65,7 @@ fun main(args: Array<String>) {
|
||||
val rateTolerance = BigDecimal(options.valueOf(rateToleranceArg))
|
||||
|
||||
// Bring up node.
|
||||
var advertisedServices: Set<ServiceType> = emptySet()
|
||||
val advertisedServices: Set<ServiceType> = emptySet()
|
||||
val myNetAddr = ArtemisMessagingService.toHostAndPort(options.valueOf(networkAddressArg))
|
||||
val config = object : NodeConfiguration {
|
||||
override val myLegalName: String = "Rate fix demo node"
|
||||
@ -84,7 +84,7 @@ fun main(args: Array<String>) {
|
||||
// Make a garbage transaction that includes a rate fix.
|
||||
val tx = TransactionType.General.Builder()
|
||||
tx.addOutputState(TransactionState(Cash.State(1500.DOLLARS `issued by` node.storage.myLegalIdentity.ref(1), node.keyManagement.freshKey().public), notary.identity))
|
||||
val protocol = RatesFixProtocol(tx, oracleNode, fixOf, expectedRate, rateTolerance, 24.hours)
|
||||
val protocol = RatesFixProtocol(tx, oracleNode.identity, fixOf, expectedRate, rateTolerance, 24.hours)
|
||||
node.smm.add("demo.ratefix", protocol).get()
|
||||
node.stop()
|
||||
|
||||
|
@ -11,7 +11,6 @@ import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.crypto.generateKeyPair
|
||||
import com.r3corda.core.days
|
||||
import com.r3corda.core.logElapsedTime
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
@ -163,17 +162,13 @@ fun runTraderDemo(args: Array<String>): Int {
|
||||
if (role == Role.BUYER) {
|
||||
runBuyer(node, amount)
|
||||
} else {
|
||||
val recipient = ArtemisMessagingService.makeRecipient(theirNetAddr)
|
||||
runSeller(myNetAddr, node, recipient, amount)
|
||||
runSeller(node, amount, cashIssuer)
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
private fun runSeller(myNetAddr: HostAndPort,
|
||||
node: Node,
|
||||
recipient: SingleMessageRecipient,
|
||||
amount: Amount<Issued<Currency>>) {
|
||||
private fun runSeller(node: Node, amount: Amount<Issued<Currency>>, otherSide: Party) {
|
||||
// The seller will sell some commercial paper to the buyer, who will pay with (self issued) cash.
|
||||
//
|
||||
// The CP sale transaction comes with a prospectus PDF, which will tag along for the ride in an
|
||||
@ -192,7 +187,7 @@ private fun runSeller(myNetAddr: HostAndPort,
|
||||
it.second.get()
|
||||
}
|
||||
} else {
|
||||
val seller = TraderDemoProtocolSeller(myNetAddr, recipient, amount)
|
||||
val seller = TraderDemoProtocolSeller(otherSide, amount)
|
||||
node.smm.add("demo.seller", seller).get()
|
||||
}
|
||||
|
||||
@ -208,8 +203,7 @@ private fun runBuyer(node: Node, amount: Amount<Issued<Currency>>) {
|
||||
}
|
||||
|
||||
val future = if (node.isPreviousCheckpointsPresent) {
|
||||
val (@Suppress("UNUSED_VARIABLE") buyer, future) = node.smm.findStateMachines(TraderDemoProtocolBuyer::class.java).single()
|
||||
future
|
||||
node.smm.findStateMachines(TraderDemoProtocolBuyer::class.java).single().second
|
||||
} else {
|
||||
// We use a simple scenario-specific wrapper protocol to make things happen.
|
||||
val buyer = TraderDemoProtocolBuyer(attachmentsPath, node.info.identity, amount)
|
||||
@ -249,16 +243,19 @@ private class TraderDemoProtocolBuyer(private val attachmentsPath: Path,
|
||||
// As the seller initiates the two-party trade protocol, here, we will be the buyer.
|
||||
try {
|
||||
progressTracker.currentStep = WAITING_FOR_SELLER_TO_CONNECT
|
||||
val origin = receive<HostAndPort>(DEMO_TOPIC, 0).validate { it.withDefaultPort(Node.DEFAULT_PORT) }
|
||||
val recipient = ArtemisMessagingService.makeRecipient(origin as HostAndPort)
|
||||
val newPartnerParty = receive<Party>(DEMO_TOPIC, 0).validate {
|
||||
val ourVersionOfParty = serviceHub.networkMapCache.getNodeByLegalName(it.name)!!.identity
|
||||
require(ourVersionOfParty == it)
|
||||
it
|
||||
}
|
||||
|
||||
// The session ID disambiguates the test trade.
|
||||
val sessionID = random63BitValue()
|
||||
progressTracker.currentStep = STARTING_BUY
|
||||
send(DEMO_TOPIC, recipient, 0, sessionID)
|
||||
send(DEMO_TOPIC, newPartnerParty, 0, sessionID)
|
||||
|
||||
val notary = serviceHub.networkMapCache.notaryNodes[0]
|
||||
val buyer = TwoPartyTradeProtocol.Buyer(recipient, notary.identity, amount,
|
||||
val buyer = TwoPartyTradeProtocol.Buyer(newPartnerParty, notary.identity, amount,
|
||||
CommercialPaper.State::class.java, sessionID)
|
||||
|
||||
// This invokes the trading protocol and out pops our finished transaction.
|
||||
@ -301,10 +298,9 @@ ${Emoji.renderIfSupported(cpIssuance)}""")
|
||||
}
|
||||
}
|
||||
|
||||
private class TraderDemoProtocolSeller(val myAddress: HostAndPort,
|
||||
val otherSide: SingleMessageRecipient,
|
||||
val amount: Amount<Issued<Currency>>,
|
||||
override val progressTracker: ProgressTracker = TraderDemoProtocolSeller.tracker()) : ProtocolLogic<Unit>() {
|
||||
private class TraderDemoProtocolSeller(val otherSide: Party,
|
||||
val amount: Amount<Issued<Currency>>,
|
||||
override val progressTracker: ProgressTracker = TraderDemoProtocolSeller.tracker()) : ProtocolLogic<Unit>() {
|
||||
companion object {
|
||||
val PROSPECTUS_HASH = SecureHash.parse("decd098666b9657314870e192ced0c3519c2c9d395507a238338f8d003929de9")
|
||||
|
||||
@ -326,7 +322,7 @@ private class TraderDemoProtocolSeller(val myAddress: HostAndPort,
|
||||
override fun call() {
|
||||
progressTracker.currentStep = ANNOUNCING
|
||||
|
||||
val sessionID = sendAndReceive<Long>(DEMO_TOPIC, otherSide, 0, 0, myAddress).validate { it }
|
||||
val sessionID = sendAndReceive<Long>(DEMO_TOPIC, otherSide, 0, 0, serviceHub.storageService.myLegalIdentity).validate { it }
|
||||
|
||||
progressTracker.currentStep = SELF_ISSUING
|
||||
|
||||
|
@ -6,7 +6,6 @@ import com.google.common.util.concurrent.Futures
|
||||
import com.r3corda.core.contracts.DealState
|
||||
import com.r3corda.core.contracts.SignedTransaction
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
@ -24,7 +23,7 @@ import com.r3corda.protocols.TwoPartyDealProtocol
|
||||
object AutoOfferProtocol {
|
||||
val TOPIC = "autooffer.topic"
|
||||
|
||||
data class AutoOfferMessage(val otherSide: SingleMessageRecipient,
|
||||
data class AutoOfferMessage(val otherSide: Party,
|
||||
val notary: Party,
|
||||
val otherSessionID: Long, val dealBeingOffered: DealState)
|
||||
|
||||
@ -67,7 +66,7 @@ object AutoOfferProtocol {
|
||||
|
||||
}
|
||||
|
||||
class Requester<T>(val dealToBeOffered: DealState) : ProtocolLogic<SignedTransaction>() {
|
||||
class Requester(val dealToBeOffered: DealState) : ProtocolLogic<SignedTransaction>() {
|
||||
|
||||
companion object {
|
||||
object RECEIVED : ProgressTracker.Step("Received API call")
|
||||
@ -98,9 +97,9 @@ object AutoOfferProtocol {
|
||||
val otherParty = notUs(*dealToBeOffered.parties).single()
|
||||
val otherNode = (serviceHub.networkMapCache.getNodeByLegalName(otherParty.name))
|
||||
requireNotNull(otherNode) { "Cannot identify other party " + otherParty.name + ", know about: " + serviceHub.networkMapCache.partyNodes.map { it.identity } }
|
||||
val otherSide = otherNode!!.address
|
||||
val otherSide = otherNode!!.identity
|
||||
progressTracker.currentStep = ANNOUNCING
|
||||
send(TOPIC, otherSide, 0, AutoOfferMessage(serviceHub.networkService.myAddress, notary, ourSessionID, dealToBeOffered))
|
||||
send(TOPIC, otherSide, 0, AutoOfferMessage(serviceHub.storageService.myLegalIdentity, notary, ourSessionID, dealToBeOffered))
|
||||
progressTracker.currentStep = DEALING
|
||||
val stx = subProtocol(TwoPartyDealProtocol.Acceptor(otherSide, notary, dealToBeOffered, ourSessionID, progressTracker.getChildProgressTracker(DEALING)!!))
|
||||
return stx
|
||||
|
@ -22,7 +22,7 @@ object ExitServerProtocol {
|
||||
object Handler {
|
||||
|
||||
fun register(node: Node) {
|
||||
node.net.addMessageHandler("${TOPIC}.0") { msg, registration ->
|
||||
node.net.addMessageHandler("$TOPIC.0") { msg, registration ->
|
||||
// Just to validate we got the message
|
||||
if (enabled) {
|
||||
val message = msg.data.deserialize<ExitMessage>()
|
||||
@ -62,7 +62,7 @@ object ExitServerProtocol {
|
||||
} else {
|
||||
// TODO: messaging ourselves seems to trigger a bug for the time being and we continuously receive messages
|
||||
if (recipient.identity != serviceHub.storageService.myLegalIdentity) {
|
||||
send(TOPIC, recipient.address, 0, message)
|
||||
send(TOPIC, recipient.identity, 0, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ object UpdateBusinessDayProtocol {
|
||||
if (recipient.address is MockNetworkMapCache.MockAddress) {
|
||||
// Ignore
|
||||
} else {
|
||||
send(TOPIC, recipient.address, 0, message)
|
||||
send(TOPIC, recipient.identity, 0, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user