From a02263937c26d4f8c9c7d234126309179e15ef80 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Thu, 30 Jun 2016 11:18:28 +0100 Subject: [PATCH] send and sendAndReceive use Party for the destination --- .../protocols/TwoPartyTradeProtocol.kt | 5 ++- .../r3corda/core/node/services/Services.kt | 1 + .../r3corda/core/protocols/ProtocolLogic.kt | 15 ++++---- .../core/protocols/ProtocolStateMachine.kt | 8 ++--- .../protocols/AbstractRequestMessage.kt | 9 ----- .../protocols/FetchAttachmentsProtocol.kt | 4 +-- .../r3corda/protocols/FetchDataProtocol.kt | 8 ++--- .../protocols/FetchTransactionsProtocol.kt | 4 +-- .../com/r3corda/protocols/NotaryProtocol.kt | 32 ++++++++--------- .../com/r3corda/protocols/RatesFixProtocol.kt | 23 ++++++------- .../protocols/ResolveTransactionsProtocol.kt | 8 ++--- .../protocols/ServiceRequestMessage.kt | 20 +++++++++++ .../r3corda/protocols/TwoPartyDealProtocol.kt | 28 +++++++-------- .../protocols/ValidatingNotaryProtocol.kt | 6 ++-- .../AbstractStateReplacementProtocol.kt | 17 +++++----- .../kotlin/protocols/NotaryChangeProtocol.kt | 12 +------ .../com/r3corda/node/internal/AbstractNode.kt | 14 ++++---- .../node/internal/testing/IRSSimulation.kt | 4 +-- .../node/internal/testing/TradeSimulation.kt | 17 +++++++--- .../node/services/NotaryChangeService.kt | 8 ++--- .../node/services/api/AbstractNodeService.kt | 19 ++++++----- .../services/clientapi/NodeInterestRates.kt | 4 +-- .../services/network/NetworkMapService.kt | 19 +++++++---- .../persistence/DataVendingService.kt | 3 +- .../statemachine/ProtocolStateMachineImpl.kt | 16 +++++---- .../statemachine/StateMachineManager.kt | 11 +++--- .../services/transactions/NotaryService.kt | 10 +++--- .../transactions/SimpleNotaryService.kt | 4 ++- .../transactions/ValidatingNotaryService.kt | 10 +++--- .../r3corda/node/messaging/AttachmentTests.kt | 13 ++++--- .../messaging/TwoPartyTradeProtocolTests.kt | 25 ++++++-------- .../services/InMemoryNetworkMapServiceTest.kt | 16 +++++---- .../com/r3corda/node/services/MockServices.kt | 4 +-- .../node/services/NodeInterestRatesTest.kt | 7 ++-- .../kotlin/com/r3corda/demos/RateFixDemo.kt | 4 +-- .../kotlin/com/r3corda/demos/TraderDemo.kt | 34 ++++++++----------- .../demos/protocols/AutoOfferProtocol.kt | 9 +++-- .../demos/protocols/ExitServerProtocol.kt | 4 +-- .../protocols/UpdateBusinessDayProtocol.kt | 2 +- 39 files changed, 237 insertions(+), 220 deletions(-) delete mode 100644 core/src/main/kotlin/com/r3corda/protocols/AbstractRequestMessage.kt create mode 100644 core/src/main/kotlin/com/r3corda/protocols/ServiceRequestMessage.kt diff --git a/contracts/src/main/kotlin/com/r3corda/protocols/TwoPartyTradeProtocol.kt b/contracts/src/main/kotlin/com/r3corda/protocols/TwoPartyTradeProtocol.kt index 514f230675..9959de2fb8 100644 --- a/contracts/src/main/kotlin/com/r3corda/protocols/TwoPartyTradeProtocol.kt +++ b/contracts/src/main/kotlin/com/r3corda/protocols/TwoPartyTradeProtocol.kt @@ -7,7 +7,6 @@ import com.r3corda.core.contracts.* import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.signWithECDSA -import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue @@ -61,7 +60,7 @@ object TwoPartyTradeProtocol { class SignaturesFromSeller(val sellerSig: DigitalSignature.WithKey, val notarySig: DigitalSignature.LegallyIdentifiable) - open class Seller(val otherSide: SingleMessageRecipient, + open class Seller(val otherSide: Party, val notaryNode: NodeInfo, val assetToSell: StateAndRef, val price: Amount>, @@ -172,7 +171,7 @@ object TwoPartyTradeProtocol { } } - open class Buyer(val otherSide: SingleMessageRecipient, + open class Buyer(val otherSide: Party, val notary: Party, val acceptablePrice: Amount>, val typeToBuy: Class, diff --git a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt index 7596967949..deff8a419a 100644 --- a/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/com/r3corda/core/node/services/Services.kt @@ -169,6 +169,7 @@ interface StorageService { * Returns the legal identity that this node is configured with. Assumed to be initialised when the node is * first installed. */ + //TODO this should be in the IdentityService, or somewhere not here val myLegalIdentity: Party val myLegalIdentityKey: KeyPair } diff --git a/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolLogic.kt b/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolLogic.kt index 0ab15a3be2..25f500d7fa 100644 --- a/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolLogic.kt +++ b/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolLogic.kt @@ -1,7 +1,7 @@ package com.r3corda.core.protocols import co.paralleluniverse.fibers.Suspendable -import com.r3corda.core.messaging.MessageRecipients +import com.r3corda.core.crypto.Party import com.r3corda.core.node.ServiceHub import com.r3corda.core.utilities.ProgressTracker import com.r3corda.core.utilities.UntrustworthyData @@ -39,9 +39,12 @@ abstract class ProtocolLogic { val serviceHub: ServiceHub get() = psm.serviceHub // Kotlin helpers that allow the use of generic types. - inline fun sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long, - sessionIDForReceive: Long, obj: Any): UntrustworthyData { - return psm.sendAndReceive(topic, destination, sessionIDForSend, sessionIDForReceive, obj, T::class.java) + inline fun sendAndReceive(topic: String, + destination: Party, + sessionIDForSend: Long, + sessionIDForReceive: Long, + payload: Any): UntrustworthyData { + return psm.sendAndReceive(topic, destination, sessionIDForSend, sessionIDForReceive, payload, T::class.java) } inline fun receive(topic: String, sessionIDForReceive: Long): UntrustworthyData { @@ -52,8 +55,8 @@ abstract class ProtocolLogic { return psm.receive(topic, sessionIDForReceive, clazz) } - @Suspendable fun send(topic: String, destination: MessageRecipients, sessionID: Long, obj: Any) { - psm.send(topic, destination, sessionID, obj) + @Suspendable fun send(topic: String, destination: Party, sessionID: Long, payload: Any) { + psm.send(topic, destination, sessionID, payload) } /** diff --git a/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt b/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt index fc4f8039e8..217df1752d 100644 --- a/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt +++ b/core/src/main/kotlin/com/r3corda/core/protocols/ProtocolStateMachine.kt @@ -1,7 +1,7 @@ package com.r3corda.core.protocols import co.paralleluniverse.fibers.Suspendable -import com.r3corda.core.messaging.MessageRecipients +import com.r3corda.core.crypto.Party import com.r3corda.core.node.ServiceHub import com.r3corda.core.utilities.UntrustworthyData import org.slf4j.Logger @@ -12,14 +12,14 @@ import org.slf4j.Logger */ interface ProtocolStateMachine { @Suspendable - fun sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long, sessionIDForReceive: Long, - obj: Any, recvType: Class): UntrustworthyData + fun sendAndReceive(topic: String, destination: Party, sessionIDForSend: Long, sessionIDForReceive: Long, + payload: Any, recvType: Class): UntrustworthyData @Suspendable fun receive(topic: String, sessionIDForReceive: Long, recvType: Class): UntrustworthyData @Suspendable - fun send(topic: String, destination: MessageRecipients, sessionID: Long, obj: Any) + fun send(topic: String, destination: Party, sessionID: Long, payload: Any) val serviceHub: ServiceHub val logger: Logger diff --git a/core/src/main/kotlin/com/r3corda/protocols/AbstractRequestMessage.kt b/core/src/main/kotlin/com/r3corda/protocols/AbstractRequestMessage.kt deleted file mode 100644 index 414df561f5..0000000000 --- a/core/src/main/kotlin/com/r3corda/protocols/AbstractRequestMessage.kt +++ /dev/null @@ -1,9 +0,0 @@ -package com.r3corda.protocols - -import com.r3corda.core.messaging.MessageRecipients - -/** - * Abstract superclass for request messages sent to services, which includes common - * fields such as replyTo and replyToTopic. - */ -abstract class AbstractRequestMessage(val replyTo: MessageRecipients, val sessionID: Long?) \ No newline at end of file diff --git a/core/src/main/kotlin/com/r3corda/protocols/FetchAttachmentsProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/FetchAttachmentsProtocol.kt index dd19927281..993c078206 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/FetchAttachmentsProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/FetchAttachmentsProtocol.kt @@ -1,9 +1,9 @@ package com.r3corda.protocols import com.r3corda.core.contracts.Attachment +import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.sha256 -import com.r3corda.core.messaging.SingleMessageRecipient import java.io.ByteArrayInputStream import java.io.InputStream @@ -12,7 +12,7 @@ import java.io.InputStream * attachments are saved to local storage automatically. */ class FetchAttachmentsProtocol(requests: Set, - otherSide: SingleMessageRecipient) : FetchDataProtocol(requests, otherSide) { + otherSide: Party) : FetchDataProtocol(requests, otherSide) { companion object { const val TOPIC = "platform.fetch.attachment" } diff --git a/core/src/main/kotlin/com/r3corda/protocols/FetchDataProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/FetchDataProtocol.kt index 2530269bd9..ee4dc1d31c 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/FetchDataProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/FetchDataProtocol.kt @@ -2,8 +2,8 @@ package com.r3corda.protocols import co.paralleluniverse.fibers.Suspendable import com.r3corda.core.contracts.NamedByHash +import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash -import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue import com.r3corda.core.utilities.UntrustworthyData @@ -27,13 +27,13 @@ import java.util.* */ abstract class FetchDataProtocol( protected val requests: Set, - protected val otherSide: SingleMessageRecipient) : ProtocolLogic>() { + protected val otherSide: Party) : ProtocolLogic>() { open class BadAnswer : Exception() class HashNotFound(val requested: SecureHash) : BadAnswer() class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : BadAnswer() - class Request(val hashes: List, replyTo: SingleMessageRecipient, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID) + class Request(val hashes: List, replyTo: Party, override val sessionID: Long) : AbstractRequestMessage(replyTo) data class Result(val fromDisk: List, val downloaded: List) protected abstract val queryTopic: String @@ -49,7 +49,7 @@ abstract class FetchDataProtocol( logger.trace("Requesting ${toFetch.size} dependency(s) for verification") val sid = random63BitValue() - val fetchReq = Request(toFetch, serviceHub.networkService.myAddress, sid) + val fetchReq = Request(toFetch, serviceHub.storageService.myLegalIdentity, sid) // TODO: Support "large message" response streaming so response sizes are not limited by RAM. val maybeItems = sendAndReceive>(queryTopic, otherSide, 0, sid, fetchReq) // Check for a buggy/malicious peer answering with something that we didn't ask for. diff --git a/core/src/main/kotlin/com/r3corda/protocols/FetchTransactionsProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/FetchTransactionsProtocol.kt index 032fa9b583..667937c1be 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/FetchTransactionsProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/FetchTransactionsProtocol.kt @@ -1,8 +1,8 @@ package com.r3corda.protocols import com.r3corda.core.contracts.SignedTransaction +import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash -import com.r3corda.core.messaging.SingleMessageRecipient /** * Given a set of tx hashes (IDs), either loads them from local disk or asks the remote peer to provide them. @@ -12,7 +12,7 @@ import com.r3corda.core.messaging.SingleMessageRecipient * results in a [FetchDataProtocol.HashNotFound] exception. Note that returned transactions are not inserted into * the database, because it's up to the caller to actually verify the transactions are valid. */ -class FetchTransactionsProtocol(requests: Set, otherSide: SingleMessageRecipient) : +class FetchTransactionsProtocol(requests: Set, otherSide: Party) : FetchDataProtocol(requests, otherSide) { companion object { const val TOPIC = "platform.fetch.tx" diff --git a/core/src/main/kotlin/com/r3corda/protocols/NotaryProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/NotaryProtocol.kt index f7e50cf3d4..1da1d4d674 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/NotaryProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/NotaryProtocol.kt @@ -9,8 +9,6 @@ import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SignedData import com.r3corda.core.crypto.signWithECDSA import com.r3corda.core.messaging.Ack -import com.r3corda.core.messaging.SingleMessageRecipient -import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.TimestampChecker import com.r3corda.core.node.services.UniquenessException import com.r3corda.core.node.services.UniquenessProvider @@ -45,21 +43,21 @@ object NotaryProtocol { fun tracker() = ProgressTracker(REQUESTING, VALIDATING) } - lateinit var notaryNode: NodeInfo + lateinit var notaryParty: Party @Suspendable override fun call(): DigitalSignature.LegallyIdentifiable { progressTracker.currentStep = REQUESTING - notaryNode = findNotaryNode() + notaryParty = findNotaryParty() val sendSessionID = random63BitValue() val receiveSessionID = random63BitValue() - val handshake = Handshake(serviceHub.networkService.myAddress, sendSessionID, receiveSessionID) - sendAndReceive(TOPIC_INITIATE, notaryNode.address, 0, receiveSessionID, handshake) + val handshake = Handshake(serviceHub.storageService.myLegalIdentity, sendSessionID, receiveSessionID) + sendAndReceive(TOPIC_INITIATE, notaryParty, 0, receiveSessionID, handshake) val request = SignRequest(stx, serviceHub.storageService.myLegalIdentity) - val response = sendAndReceive(TOPIC, notaryNode.address, sendSessionID, receiveSessionID, request) + val response = sendAndReceive(TOPIC, notaryParty, sendSessionID, receiveSessionID, request) val notaryResult = validateResponse(response) return notaryResult.sig ?: throw NotaryException(notaryResult.error!!) @@ -72,17 +70,17 @@ object NotaryProtocol { if (it.sig != null) validateSignature(it.sig, stx.txBits) else if (it.error is NotaryError.Conflict) it.error.conflict.verified() else if (it.error == null || it.error !is NotaryError) - throw IllegalStateException("Received invalid result from Notary service '${notaryNode.identity}'") + throw IllegalStateException("Received invalid result from Notary service '$notaryParty'") return it } } private fun validateSignature(sig: DigitalSignature.LegallyIdentifiable, data: SerializedBytes) { - check(sig.signer == notaryNode.identity) { "Notary result not signed by the correct service" } + check(sig.signer == notaryParty) { "Notary result not signed by the correct service" } sig.verifyWithECDSA(data) } - private fun findNotaryNode(): NodeInfo { + private fun findNotaryParty(): Party { var maybeNotaryKey: PublicKey? = null val wtx = stx.tx @@ -97,8 +95,8 @@ object NotaryProtocol { } val notaryKey = maybeNotaryKey ?: throw IllegalStateException("Transaction does not specify a Notary") - val notaryNode = serviceHub.networkMapCache.getNodeByPublicKey(notaryKey) - return notaryNode ?: throw IllegalStateException("No Notary node can be found with the specified public key") + val notaryParty = serviceHub.networkMapCache.getNodeByPublicKey(notaryKey)?.identity + return notaryParty ?: throw IllegalStateException("No Notary node can be found with the specified public key") } } @@ -110,7 +108,7 @@ object NotaryProtocol { * * TODO: the notary service should only be able to see timestamp commands and inputs */ - open class Service(val otherSide: SingleMessageRecipient, + open class Service(val otherSide: Party, val sendSessionID: Long, val receiveSessionID: Long, val timestampChecker: TimestampChecker, @@ -181,9 +179,9 @@ object NotaryProtocol { } class Handshake( - replyTo: SingleMessageRecipient, + replyTo: Party, val sendSessionID: Long, - sessionID: Long) : AbstractRequestMessage(replyTo, sessionID) + override val sessionID: Long) : AbstractRequestMessage(replyTo) /** TODO: The caller must authenticate instead of just specifying its identity */ class SignRequest(val tx: SignedTransaction, @@ -197,7 +195,7 @@ object NotaryProtocol { } interface Factory { - fun create(otherSide: SingleMessageRecipient, + fun create(otherSide: Party, sendSessionID: Long, receiveSessionID: Long, timestampChecker: TimestampChecker, @@ -205,7 +203,7 @@ object NotaryProtocol { } object DefaultFactory : Factory { - override fun create(otherSide: SingleMessageRecipient, + override fun create(otherSide: Party, sendSessionID: Long, receiveSessionID: Long, timestampChecker: TimestampChecker, diff --git a/core/src/main/kotlin/com/r3corda/protocols/RatesFixProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/RatesFixProtocol.kt index 1b910c8698..fd1378f548 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/RatesFixProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/RatesFixProtocol.kt @@ -6,8 +6,7 @@ import com.r3corda.core.contracts.FixOf import com.r3corda.core.contracts.TransactionBuilder import com.r3corda.core.contracts.WireTransaction import com.r3corda.core.crypto.DigitalSignature -import com.r3corda.core.messaging.SingleMessageRecipient -import com.r3corda.core.node.NodeInfo +import com.r3corda.core.crypto.Party import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue import com.r3corda.core.utilities.ProgressTracker @@ -28,7 +27,7 @@ import java.util.* * @throws FixOutOfRange if the returned fix was further away from the expected rate by the given amount. */ open class RatesFixProtocol(protected val tx: TransactionBuilder, - private val oracle: NodeInfo, + private val oracle: Party, private val fixOf: FixOf, private val expectedRate: BigDecimal, private val rateTolerance: BigDecimal, @@ -48,8 +47,8 @@ open class RatesFixProtocol(protected val tx: TransactionBuilder, class FixOutOfRange(val byAmount: BigDecimal) : Exception() - class QueryRequest(val queries: List, replyTo: SingleMessageRecipient, sessionID: Long, val deadline: Instant) : AbstractRequestMessage(replyTo, sessionID) - class SignRequest(val tx: WireTransaction, replyTo: SingleMessageRecipient, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID) + class QueryRequest(val queries: List, replyTo: Party, override val sessionID: Long, val deadline: Instant) : AbstractRequestMessage(replyTo) + class SignRequest(val tx: WireTransaction, replyTo: Party, override val sessionID: Long) : AbstractRequestMessage(replyTo) @Suspendable override fun call() { @@ -57,7 +56,7 @@ open class RatesFixProtocol(protected val tx: TransactionBuilder, val fix = query() progressTracker.currentStep = WORKING checkFixIsNearExpected(fix) - tx.addCommand(fix, oracle.identity.owningKey) + tx.addCommand(fix, oracle.owningKey) beforeSigning(fix) progressTracker.currentStep = SIGNING tx.addSignatureUnchecked(sign()) @@ -83,11 +82,11 @@ open class RatesFixProtocol(protected val tx: TransactionBuilder, fun sign(): DigitalSignature.LegallyIdentifiable { val sessionID = random63BitValue() val wtx = tx.toWireTransaction() - val req = SignRequest(wtx, serviceHub.networkService.myAddress, sessionID) - val resp = sendAndReceive(TOPIC_SIGN, oracle.address, 0, sessionID, req) + val req = SignRequest(wtx, serviceHub.storageService.myLegalIdentity, sessionID) + val resp = sendAndReceive(TOPIC_SIGN, oracle, 0, sessionID, req) return resp.validate { sig -> - check(sig.signer == oracle.identity) + check(sig.signer == oracle) tx.checkSignature(sig) sig } @@ -96,10 +95,10 @@ open class RatesFixProtocol(protected val tx: TransactionBuilder, @Suspendable fun query(): Fix { val sessionID = random63BitValue() - val deadline = suggestInterestRateAnnouncementTimeWindow(fixOf.name, oracle.identity.name, fixOf.forDay).end - val req = QueryRequest(listOf(fixOf), serviceHub.networkService.myAddress, sessionID, deadline) + val deadline = suggestInterestRateAnnouncementTimeWindow(fixOf.name, oracle.name, fixOf.forDay).end + val req = QueryRequest(listOf(fixOf), serviceHub.storageService.myLegalIdentity, sessionID, deadline) // TODO: add deadline to receive - val resp = sendAndReceive>(TOPIC_QUERY, oracle.address, 0, sessionID, req) + val resp = sendAndReceive>(TOPIC_QUERY, oracle, 0, sessionID, req) return resp.validate { val fix = it.first() diff --git a/core/src/main/kotlin/com/r3corda/protocols/ResolveTransactionsProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/ResolveTransactionsProtocol.kt index 7639a0befb..f6bc21fd56 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/ResolveTransactionsProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/ResolveTransactionsProtocol.kt @@ -2,8 +2,8 @@ package com.r3corda.protocols import co.paralleluniverse.fibers.Suspendable import com.r3corda.core.contracts.* +import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash -import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.protocols.ProtocolLogic import java.util.* @@ -21,7 +21,7 @@ import java.util.* * protocol is helpful when resolving and verifying a finished but partially signed transaction. */ class ResolveTransactionsProtocol(private val txHashes: Set, - private val otherSide: SingleMessageRecipient) : ProtocolLogic() { + private val otherSide: Party) : ProtocolLogic() { companion object { private fun dependencyIDs(wtx: WireTransaction) = wtx.inputs.map { it.txhash }.toSet() @@ -33,11 +33,11 @@ class ResolveTransactionsProtocol(private val txHashes: Set, private var stx: SignedTransaction? = null private var wtx: WireTransaction? = null - constructor(stx: SignedTransaction, otherSide: SingleMessageRecipient) : this(stx.tx, otherSide) { + constructor(stx: SignedTransaction, otherSide: Party) : this(stx.tx, otherSide) { this.stx = stx } - constructor(wtx: WireTransaction, otherSide: SingleMessageRecipient) : this(dependencyIDs(wtx), otherSide) { + constructor(wtx: WireTransaction, otherSide: Party) : this(dependencyIDs(wtx), otherSide) { this.wtx = wtx } diff --git a/core/src/main/kotlin/com/r3corda/protocols/ServiceRequestMessage.kt b/core/src/main/kotlin/com/r3corda/protocols/ServiceRequestMessage.kt new file mode 100644 index 0000000000..6f6fbb21ee --- /dev/null +++ b/core/src/main/kotlin/com/r3corda/protocols/ServiceRequestMessage.kt @@ -0,0 +1,20 @@ +package com.r3corda.protocols + +import com.r3corda.core.crypto.Party +import com.r3corda.core.messaging.MessageRecipients +import com.r3corda.core.node.services.NetworkMapCache + +/** + * Abstract superclass for request messages sent to services, which includes common + * fields such as replyTo and replyToTopic. + */ +interface ServiceRequestMessage { + val sessionID: Long + fun getReplyTo(networkMapCache: NetworkMapCache): MessageRecipients +} + +abstract class AbstractRequestMessage(val replyToParty: Party): ServiceRequestMessage { + override fun getReplyTo(networkMapCache: NetworkMapCache): MessageRecipients { + return networkMapCache.partyNodes.single { it.identity == replyToParty }.address + } +} \ No newline at end of file diff --git a/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt index d78fe77fef..f716dfa314 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/TwoPartyDealProtocol.kt @@ -6,7 +6,6 @@ import com.r3corda.core.contracts.* import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.signWithECDSA -import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue @@ -72,7 +71,7 @@ object TwoPartyDealProtocol { abstract val payload: U abstract val notaryNode: NodeInfo - abstract val otherSide: SingleMessageRecipient + abstract val otherSide: Party abstract val otherSessionID: Long abstract val myKeyPair: KeyPair @@ -153,7 +152,7 @@ object TwoPartyDealProtocol { // Copy the transaction to every regulator in the network. This is obviously completely bogus, it's // just for demo purposes. for (regulator in regulators) { - send("regulator.all.seeing.eye", regulator.address, 0, fullySigned) + send("regulator.all.seeing.eye", regulator.identity, 0, fullySigned) } } @@ -203,7 +202,7 @@ object TwoPartyDealProtocol { fun tracker() = ProgressTracker(RECEIVING, VERIFYING, SIGNING, SWAPPING_SIGNATURES, RECORDING) } - abstract val otherSide: SingleMessageRecipient + abstract val otherSide: Party abstract val sessionID: Long @Suspendable @@ -276,7 +275,7 @@ object TwoPartyDealProtocol { /** * One side of the protocol for inserting a pre-agreed deal. */ - open class Instigator(override val otherSide: SingleMessageRecipient, + open class Instigator(override val otherSide: Party, val notary: Party, override val payload: T, override val myKeyPair: KeyPair, @@ -290,7 +289,7 @@ object TwoPartyDealProtocol { /** * One side of the protocol for inserting a pre-agreed deal. */ - open class Acceptor(override val otherSide: SingleMessageRecipient, + open class Acceptor(override val otherSide: Party, val notary: Party, val dealToBuy: T, override val sessionID: Long, @@ -342,7 +341,7 @@ object TwoPartyDealProtocol { override val sessionID: Long get() = initiation.sessionID - override val otherSide: SingleMessageRecipient get() = initiation.sender + override val otherSide: Party get() = initiation.sender private lateinit var txState: TransactionState<*> private lateinit var deal: FixableDealState @@ -360,8 +359,6 @@ object TwoPartyDealProtocol { val myName = serviceHub.storageService.myLegalIdentity.name val otherParty = deal.parties.filter { it.name != myName }.single() check(otherParty == initiation.party) - val otherPartyAddress = serviceHub.networkMapCache.getNodeByLegalName(otherParty.name)!!.address - check(otherPartyAddress == otherSide) // Also check we are one of the parties deal.parties.filter { it.name == myName }.single() @@ -380,7 +377,7 @@ object TwoPartyDealProtocol { val newDeal = deal val ptx = TransactionType.General.Builder() - val addFixing = object : RatesFixProtocol(ptx, serviceHub.networkMapCache.ratesOracleNodes[0], fixOf, BigDecimal.ZERO, BigDecimal.ONE, initiation.timeout) { + val addFixing = object : RatesFixProtocol(ptx, serviceHub.networkMapCache.ratesOracleNodes[0].identity, fixOf, BigDecimal.ZERO, BigDecimal.ONE, initiation.timeout) { @Suspendable override fun beforeSigning(fix: Fix) { newDeal.generateFix(ptx, StateAndRef(txState, handshake.payload), fix) @@ -417,11 +414,10 @@ object TwoPartyDealProtocol { return serviceHub.keyManagementService.toKeyPair(publicKey) } - override val otherSide: SingleMessageRecipient get() { + override val otherSide: Party get() { // TODO: what happens if there's no node? Move to messaging taking Party and then handled in messaging layer val myName = serviceHub.storageService.myLegalIdentity.name - val otherParty = dealToFix.state.data.parties.filter { it.name != myName }.single() - return serviceHub.networkMapCache.getNodeByLegalName(otherParty.name)!!.address + return dealToFix.state.data.parties.filter { it.name != myName }.single() } override val notaryNode: NodeInfo get() = @@ -432,7 +428,7 @@ object TwoPartyDealProtocol { val FIX_INITIATE_TOPIC = "platform.fix.initiate" /** Used to set up the session between [Floater] and [Fixer] */ - data class FixingSessionInitiation(val sessionID: Long, val party: Party, val sender: SingleMessageRecipient, val timeout: Duration) + data class FixingSessionInitiation(val sessionID: Long, val party: Party, val sender: Party, val timeout: Duration) /** * This protocol looks at the deal and decides whether to be the Fixer or Floater role in agreeing a fixing. @@ -459,10 +455,10 @@ object TwoPartyDealProtocol { if (sortedParties[0].name == serviceHub.storageService.myLegalIdentity.name) { // Generate sessionID val sessionID = random63BitValue() - val initation = FixingSessionInitiation(sessionID, sortedParties[0], serviceHub.networkService.myAddress, timeout) + val initation = FixingSessionInitiation(sessionID, sortedParties[0], serviceHub.storageService.myLegalIdentity, timeout) // Send initiation to other side to launch one side of the fixing protocol (the Fixer). - send(FIX_INITIATE_TOPIC, serviceHub.networkMapCache.getNodeByLegalName(sortedParties[1].name)!!.address, 0, initation) + send(FIX_INITIATE_TOPIC, sortedParties[1], 0, initation) // Then start the other side of the fixing protocol. val protocol = Floater(ref, sessionID) diff --git a/core/src/main/kotlin/com/r3corda/protocols/ValidatingNotaryProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/ValidatingNotaryProtocol.kt index ea36c2fe66..0b45e7f15a 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/ValidatingNotaryProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/ValidatingNotaryProtocol.kt @@ -6,7 +6,6 @@ import com.r3corda.core.contracts.TransactionVerificationException import com.r3corda.core.contracts.WireTransaction import com.r3corda.core.contracts.toLedgerTransaction import com.r3corda.core.crypto.Party -import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.services.TimestampChecker import com.r3corda.core.node.services.UniquenessProvider import java.security.SignatureException @@ -17,7 +16,7 @@ import java.security.SignatureException * has its input states "blocked" by a transaction from another party, and needs to establish whether that transaction was * indeed valid */ -class ValidatingNotaryProtocol(otherSide: SingleMessageRecipient, +class ValidatingNotaryProtocol(otherSide: Party, sessionIdForSend: Long, sessionIdForReceive: Long, timestampChecker: TimestampChecker, @@ -52,7 +51,6 @@ class ValidatingNotaryProtocol(otherSide: SingleMessageRecipient, @Suspendable private fun validateDependencies(reqIdentity: Party, wtx: WireTransaction) { - val otherSide = serviceHub.networkMapCache.getNodeByPublicKey(reqIdentity.owningKey)!!.address - subProtocol(ResolveTransactionsProtocol(wtx, otherSide)) + subProtocol(ResolveTransactionsProtocol(wtx, reqIdentity)) } } \ No newline at end of file diff --git a/core/src/main/kotlin/protocols/AbstractStateReplacementProtocol.kt b/core/src/main/kotlin/protocols/AbstractStateReplacementProtocol.kt index d1dbd6b7be..ba302424d9 100644 --- a/core/src/main/kotlin/protocols/AbstractStateReplacementProtocol.kt +++ b/core/src/main/kotlin/protocols/AbstractStateReplacementProtocol.kt @@ -6,7 +6,6 @@ import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.signWithECDSA import com.r3corda.core.messaging.Ack -import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue @@ -34,8 +33,8 @@ abstract class AbstractStateReplacementProtocol { } class Handshake(val sessionIdForSend: Long, - replyTo: SingleMessageRecipient, - replySessionId: Long) : AbstractRequestMessage(replyTo, replySessionId) + replyTo: Party, + override val sessionID: Long) : AbstractRequestMessage(replyTo) abstract class Instigator(val originalState: StateAndRef, val modification: T, @@ -89,7 +88,7 @@ abstract class AbstractStateReplacementProtocol { } val allSignatures = participantSignatures + getNotarySignature(stx) - sessions.forEach { send(TOPIC_CHANGE, it.key.address, it.value, allSignatures) } + sessions.forEach { send(TOPIC_CHANGE, it.key.identity, it.value, allSignatures) } return allSignatures } @@ -99,10 +98,10 @@ abstract class AbstractStateReplacementProtocol { val sessionIdForReceive = random63BitValue() val proposal = assembleProposal(originalState.ref, modification, stx) - val handshake = Handshake(sessionIdForSend, serviceHub.networkService.myAddress, sessionIdForReceive) - sendAndReceive(TOPIC_INITIATE, node.address, 0, sessionIdForReceive, handshake) + val handshake = Handshake(sessionIdForSend, serviceHub.storageService.myLegalIdentity, sessionIdForReceive) + sendAndReceive(TOPIC_INITIATE, node.identity, 0, sessionIdForReceive, handshake) - val response = sendAndReceive(TOPIC_CHANGE, node.address, sessionIdForSend, sessionIdForReceive, proposal) + val response = sendAndReceive(TOPIC_CHANGE, node.identity, sessionIdForSend, sessionIdForReceive, proposal) val participantSignature = response.validate { if (it.sig == null) throw StateReplacementException(it.error!!) else { @@ -122,7 +121,7 @@ abstract class AbstractStateReplacementProtocol { } } - abstract class Acceptor(val otherSide: SingleMessageRecipient, + abstract class Acceptor(val otherSide: Party, val sessionIdForSend: Long, val sessionIdForReceive: Long, override val progressTracker: ProgressTracker = tracker()) : ProtocolLogic() { @@ -241,4 +240,4 @@ class StateReplacementRefused(val identity: Party, val state: StateRef, val deta } class StateReplacementException(val error: StateReplacementRefused) -: Exception("State change failed - ${error}") \ No newline at end of file +: Exception("State change failed - $error") \ No newline at end of file diff --git a/core/src/main/kotlin/protocols/NotaryChangeProtocol.kt b/core/src/main/kotlin/protocols/NotaryChangeProtocol.kt index 2caf6069c2..f4427aee15 100644 --- a/core/src/main/kotlin/protocols/NotaryChangeProtocol.kt +++ b/core/src/main/kotlin/protocols/NotaryChangeProtocol.kt @@ -2,18 +2,8 @@ package protocols import co.paralleluniverse.fibers.Suspendable import com.r3corda.core.contracts.* -import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.Party -import com.r3corda.core.crypto.signWithECDSA -import com.r3corda.core.messaging.Ack -import com.r3corda.core.messaging.SingleMessageRecipient -import com.r3corda.core.node.NodeInfo -import com.r3corda.core.protocols.ProtocolLogic -import com.r3corda.core.random63BitValue import com.r3corda.core.utilities.ProgressTracker -import com.r3corda.protocols.AbstractRequestMessage -import com.r3corda.protocols.NotaryProtocol -import com.r3corda.protocols.ResolveTransactionsProtocol import java.security.PublicKey /** @@ -58,7 +48,7 @@ object NotaryChangeProtocol: AbstractStateReplacementProtocol() { } } - class Acceptor(otherSide: SingleMessageRecipient, + class Acceptor(otherSide: Party, sessionIdForSend: Long, sessionIdForReceive: Long, override val progressTracker: ProgressTracker = tracker()) diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 06abdc96b1..7838807b5a 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -33,6 +33,7 @@ import com.r3corda.node.services.keys.E2ETestKeyManagementService import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.InMemoryNetworkMapService import com.r3corda.node.services.network.NetworkMapService +import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC import com.r3corda.node.services.network.NodeRegistration import com.r3corda.node.services.persistence.* import com.r3corda.node.services.statemachine.StateMachineManager @@ -50,7 +51,6 @@ import java.nio.file.FileAlreadyExistsException import java.nio.file.Files import java.nio.file.Path import java.security.KeyPair -import java.security.Security import java.time.Clock import java.util.* @@ -160,8 +160,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, // This object doesn't need to be referenced from this class because it registers handlers on the network // service and so that keeps it from being collected. - DataVendingService(net, storage) - NotaryChangeService(net, smm) + DataVendingService(net, storage, services.networkMapCache) + NotaryChangeService(net, smm, services.networkMapCache) buildAdvertisedServices() @@ -231,9 +231,9 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val reg = NodeRegistration(info, networkMapSeq++, type, expires) val sessionID = random63BitValue() val request = NetworkMapService.RegistrationRequest(reg.toWire(storage.myLegalIdentityKey.private), net.myAddress, sessionID) - val message = net.createMessage(NetworkMapService.REGISTER_PROTOCOL_TOPIC + ".0", request.serialize().bits) + val message = net.createMessage("$REGISTER_PROTOCOL_TOPIC.0", request.serialize().bits) val future = SettableFuture.create() - val topic = NetworkMapService.REGISTER_PROTOCOL_TOPIC + "." + sessionID + val topic = "$REGISTER_PROTOCOL_TOPIC.$sessionID" net.runOnNextMessage(topic, RunOnCallerThread) { message -> future.set(message.data.deserialize()) @@ -254,8 +254,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration, val timestampChecker = TimestampChecker(platformClock, 30.seconds) inNodeNotaryService = when (type) { - is SimpleNotaryService.Type -> SimpleNotaryService(smm, net, timestampChecker, uniquenessProvider) - is ValidatingNotaryService.Type -> ValidatingNotaryService(smm, net, timestampChecker, uniquenessProvider) + is SimpleNotaryService.Type -> SimpleNotaryService(smm, net, timestampChecker, uniquenessProvider, services.networkMapCache) + is ValidatingNotaryService.Type -> ValidatingNotaryService(smm, net, timestampChecker, uniquenessProvider, services.networkMapCache) else -> null } } diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/IRSSimulation.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/IRSSimulation.kt index d3bf112590..674d663d4d 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/IRSSimulation.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/IRSSimulation.kt @@ -130,8 +130,8 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten val sessionID = random63BitValue() - val instigator = TwoPartyDealProtocol.Instigator(node2.net.myAddress, notary.info.identity, irs, nodeAKey!!, sessionID) - val acceptor = TwoPartyDealProtocol.Acceptor(node1.net.myAddress, notary.info.identity, irs, sessionID) + val instigator = TwoPartyDealProtocol.Instigator(node2.info.identity, notary.info.identity, irs, nodeAKey!!, sessionID) + val acceptor = TwoPartyDealProtocol.Acceptor(node1.info.identity, notary.info.identity, irs, sessionID) showProgressFor(listOf(node1, node2)) showConsensusFor(listOf(node1, node2, regulators[0])) diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt index 1acf61bc08..f86236517e 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/TradeSimulation.kt @@ -44,10 +44,19 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo val cashIssuerKey = generateKeyPair() val amount = 1000.DOLLARS `issued by` Party("Big friendly bank", cashIssuerKey.public).ref(1) val sessionID = random63BitValue() - val buyerProtocol = TwoPartyTradeProtocol.Buyer(seller.net.myAddress, notary.info.identity, - amount, CommercialPaper.State::class.java, sessionID) - val sellerProtocol = TwoPartyTradeProtocol.Seller(buyer.net.myAddress, notary.info, - issuance.tx.outRef(0), amount, seller.storage.myLegalIdentityKey, sessionID) + val buyerProtocol = TwoPartyTradeProtocol.Buyer( + seller.info.identity, + notary.info.identity, + amount, + CommercialPaper.State::class.java, + sessionID) + val sellerProtocol = TwoPartyTradeProtocol.Seller( + buyer.info.identity, + notary.info, + issuance.tx.outRef(0), + amount, + seller.storage.myLegalIdentityKey, + sessionID) showConsensusFor(listOf(buyer, seller, notary)) showProgressFor(listOf(buyer, seller)) diff --git a/node/src/main/kotlin/com/r3corda/node/services/NotaryChangeService.kt b/node/src/main/kotlin/com/r3corda/node/services/NotaryChangeService.kt index 6e0378c52f..e1b88f1c36 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/NotaryChangeService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/NotaryChangeService.kt @@ -2,7 +2,7 @@ package com.r3corda.node.services import com.r3corda.core.messaging.Ack import com.r3corda.core.messaging.MessagingService -import com.r3corda.core.messaging.SingleMessageRecipient +import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.node.services.api.AbstractNodeService import com.r3corda.node.services.statemachine.StateMachineManager import protocols.AbstractStateReplacementProtocol @@ -12,7 +12,7 @@ import protocols.NotaryChangeProtocol * A service that monitors the network for requests for changing the notary of a state, * and immediately runs the [NotaryChangeProtocol] if the auto-accept criteria are met. */ -class NotaryChangeService(net: MessagingService, val smm: StateMachineManager) : AbstractNodeService(net) { +class NotaryChangeService(net: MessagingService, val smm: StateMachineManager, networkMapCache: NetworkMapCache) : AbstractNodeService(net, networkMapCache) { init { addMessageHandler(NotaryChangeProtocol.TOPIC_INITIATE, { req: AbstractStateReplacementProtocol.Handshake -> handleChangeNotaryRequest(req) } @@ -21,8 +21,8 @@ class NotaryChangeService(net: MessagingService, val smm: StateMachineManager) : private fun handleChangeNotaryRequest(req: AbstractStateReplacementProtocol.Handshake): Ack { val protocol = NotaryChangeProtocol.Acceptor( - req.replyTo as SingleMessageRecipient, - req.sessionID!!, + req.replyToParty, + req.sessionID, req.sessionIdForSend) smm.add(NotaryChangeProtocol.TOPIC_CHANGE, protocol) return Ack diff --git a/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt b/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt index 0827dc1066..d688a8e834 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt @@ -2,18 +2,19 @@ package com.r3corda.node.services.api import com.r3corda.core.messaging.Message import com.r3corda.core.messaging.MessagingService +import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize -import com.r3corda.protocols.AbstractRequestMessage +import com.r3corda.protocols.ServiceRequestMessage import javax.annotation.concurrent.ThreadSafe /** * Abstract superclass for services that a node can host, which provides helper functions. */ @ThreadSafe -abstract class AbstractNodeService(val net: MessagingService) : SingletonSerializeAsToken() { +abstract class AbstractNodeService(val net: MessagingService, val networkMapCache: NetworkMapCache) : SingletonSerializeAsToken() { /** * Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of @@ -24,18 +25,18 @@ abstract class AbstractNodeService(val net: MessagingService) : SingletonSeriali * @param handler a function to handle the deserialised request and return an optional response (if return type not Unit) * @param exceptionConsumer a function to which any thrown exception is passed. */ - protected inline fun + protected inline fun addMessageHandler(topic: String, crossinline handler: (Q) -> R, crossinline exceptionConsumer: (Message, Exception) -> Unit) { net.addMessageHandler(topic + TOPIC_DEFAULT_POSTFIX, null) { message, r -> try { - val req = message.data.deserialize() - val data = handler(req) + val request = message.data.deserialize() + val response = handler(request) // If the return type R is Unit, then do not send a response - if (data.javaClass != Unit.javaClass) { - val msg = net.createMessage("$topic.${req.sessionID}", data.serialize().bits) - net.send(msg, req.replyTo) + if (response.javaClass != Unit.javaClass) { + val msg = net.createMessage("$topic.${request.sessionID}", response.serialize().bits) + net.send(msg, request.getReplyTo(networkMapCache)) } } catch(e: Exception) { exceptionConsumer(message, e) @@ -51,7 +52,7 @@ abstract class AbstractNodeService(val net: MessagingService) : SingletonSeriali * @param topic the topic, without the default session ID postfix (".0) * @param handler a function to handle the deserialised request and return an optional response (if return type not Unit) */ - protected inline fun + protected inline fun addMessageHandler(topic: String, crossinline handler: (Q) -> R) { addMessageHandler(topic, handler, { message: Message, exception: Exception -> throw exception }) diff --git a/node/src/main/kotlin/com/r3corda/node/services/clientapi/NodeInterestRates.kt b/node/src/main/kotlin/com/r3corda/node/services/clientapi/NodeInterestRates.kt index 83102bd4ff..f4cd859470 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/clientapi/NodeInterestRates.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/clientapi/NodeInterestRates.kt @@ -41,7 +41,7 @@ object NodeInterestRates { /** * The Service that wraps [Oracle] and handles messages/network interaction/request scrubbing. */ - class Service(node: AbstractNode) : AcceptsFileUpload, AbstractNodeService(node.services.networkService) { + class Service(node: AbstractNode) : AcceptsFileUpload, AbstractNodeService(node.services.networkService, node.services.networkMapCache) { val ss = node.services.storageService val oracle = Oracle(ss.myLegalIdentity, ss.myLegalIdentityKey, node.services.clock) @@ -84,7 +84,7 @@ object NodeInterestRates { override fun call(): Unit { val answers = service.oracle.query(request.queries, request.deadline) progressTracker.currentStep = SENDING - send("${RatesFixProtocol.TOPIC}.query", request.replyTo, request.sessionID!!, answers) + send("${RatesFixProtocol.TOPIC}.query", request.replyToParty, request.sessionID, answers) } } diff --git a/node/src/main/kotlin/com/r3corda/node/services/network/NetworkMapService.kt b/node/src/main/kotlin/com/r3corda/node/services/network/NetworkMapService.kt index ede0a51205..ccd6a3545c 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/network/NetworkMapService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/network/NetworkMapService.kt @@ -7,16 +7,16 @@ import com.r3corda.core.messaging.MessageRecipients import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo -import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.NetworkMapCache +import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX import com.r3corda.core.serialization.SerializedBytes import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize import com.r3corda.node.services.api.AbstractNodeService import com.r3corda.node.utilities.AddOrRemove +import com.r3corda.protocols.ServiceRequestMessage import org.slf4j.LoggerFactory -import com.r3corda.protocols.AbstractRequestMessage import java.security.PrivateKey import java.security.SignatureException import java.time.Instant @@ -61,20 +61,25 @@ interface NetworkMapService { val nodes: List - class FetchMapRequest(val subscribe: Boolean, val ifChangedSinceVersion: Int?, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID) + abstract class NetworkMapRequestMessage(val replyTo: MessageRecipients) : ServiceRequestMessage { + override fun getReplyTo(networkMapCache: NetworkMapCache): MessageRecipients = replyTo + } + + class FetchMapRequest(val subscribe: Boolean, val ifChangedSinceVersion: Int?, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo) data class FetchMapResponse(val nodes: Collection?, val version: Int) - class QueryIdentityRequest(val identity: Party, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID) + class QueryIdentityRequest(val identity: Party, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo) data class QueryIdentityResponse(val node: NodeInfo?) - class RegistrationRequest(val wireReg: WireNodeRegistration, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID) + class RegistrationRequest(val wireReg: WireNodeRegistration, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo) data class RegistrationResponse(val success: Boolean) - class SubscribeRequest(val subscribe: Boolean, replyTo: MessageRecipients, sessionID: Long) : AbstractRequestMessage(replyTo, sessionID) + class SubscribeRequest(val subscribe: Boolean, replyTo: MessageRecipients, override val sessionID: Long) : NetworkMapRequestMessage(replyTo) data class SubscribeResponse(val confirmed: Boolean) data class Update(val wireReg: WireNodeRegistration, val replyTo: MessageRecipients) data class UpdateAcknowledge(val wireRegHash: SecureHash, val replyTo: MessageRecipients) } + @ThreadSafe -class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, val cache: NetworkMapCache) : NetworkMapService, AbstractNodeService(net) { +class InMemoryNetworkMapService(net: MessagingService, home: NodeRegistration, val cache: NetworkMapCache) : NetworkMapService, AbstractNodeService(net, cache) { private val registeredNodes = ConcurrentHashMap() // Map from subscriber address, to a list of unacknowledged updates private val subscribers = ThreadBox(mutableMapOf>()) diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt index 43a09867ab..1468897f9a 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/persistence/DataVendingService.kt @@ -2,6 +2,7 @@ package com.r3corda.node.services.persistence import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.messaging.MessagingService +import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.StorageService import com.r3corda.core.utilities.loggerFor import com.r3corda.node.services.api.AbstractNodeService @@ -24,7 +25,7 @@ import javax.annotation.concurrent.ThreadSafe * Additionally, because nodes do not store invalid transactions, requesting such a transaction will always yield null. */ @ThreadSafe -class DataVendingService(net: MessagingService, private val storage: StorageService) : AbstractNodeService(net) { +class DataVendingService(net: MessagingService, private val storage: StorageService, networkMapCache: NetworkMapCache) : AbstractNodeService(net, networkMapCache) { companion object { val logger = loggerFor() } diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt index 56000f4700..43a5aa98cc 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/ProtocolStateMachineImpl.kt @@ -5,7 +5,7 @@ import co.paralleluniverse.fibers.FiberScheduler import co.paralleluniverse.fibers.Suspendable import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.SettableFuture -import com.r3corda.core.messaging.MessageRecipients +import com.r3corda.core.crypto.Party import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.protocols.ProtocolStateMachine import com.r3corda.core.utilities.UntrustworthyData @@ -81,9 +81,13 @@ class ProtocolStateMachineImpl(val logic: ProtocolLogic, } @Suspendable @Suppress("UNCHECKED_CAST") - override fun sendAndReceive(topic: String, destination: MessageRecipients, sessionIDForSend: Long, sessionIDForReceive: Long, - obj: Any, recvType: Class): UntrustworthyData { - val result = StateMachineManager.FiberRequest.ExpectingResponse(topic, destination, sessionIDForSend, sessionIDForReceive, obj, recvType) + override fun sendAndReceive(topic: String, + destination: Party, + sessionIDForSend: Long, + sessionIDForReceive: Long, + payload: Any, + recvType: Class): UntrustworthyData { + val result = StateMachineManager.FiberRequest.ExpectingResponse(topic, destination, sessionIDForSend, sessionIDForReceive, payload, recvType) return suspendAndExpectReceive(result) } @@ -94,8 +98,8 @@ class ProtocolStateMachineImpl(val logic: ProtocolLogic, } @Suspendable - override fun send(topic: String, destination: MessageRecipients, sessionID: Long, obj: Any) { - val result = StateMachineManager.FiberRequest.NotExpectingResponse(topic, destination, sessionID, obj) + override fun send(topic: String, destination: Party, sessionID: Long, payload: Any) { + val result = StateMachineManager.FiberRequest.NotExpectingResponse(topic, destination, sessionID, payload) suspend(result) } diff --git a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt index 2616585dec..dfb752cfd0 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/statemachine/StateMachineManager.kt @@ -8,7 +8,7 @@ import com.esotericsoftware.kryo.Kryo import com.google.common.base.Throwables import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.abbreviate -import com.r3corda.core.messaging.MessageRecipients +import com.r3corda.core.crypto.Party import com.r3corda.core.messaging.runOnNextMessage import com.r3corda.core.messaging.send import com.r3corda.core.protocols.ProtocolLogic @@ -253,7 +253,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService request.payload?.let { val topic = "${request.topic}.${request.sessionIDForSend}" psm.logger.trace { "Sending message of type ${it.javaClass.name} using topic $topic to ${request.destination} (${it.toString().abbreviate(50)})" } - serviceHub.networkService.send(topic, it, request.destination!!) + val address = serviceHub.networkMapCache.getNodeByLegalName(request.destination!!.name)!!.address + serviceHub.networkService.send(topic, it, address) } if (request is FiberRequest.NotExpectingResponse) { // We sent a message, but don't expect a response, so re-enter the continuation to let it keep going. @@ -307,7 +308,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService // TODO: Clean this up open class FiberRequest(val topic: String, - val destination: MessageRecipients?, + val destination: Party?, val sessionIDForSend: Long, val sessionIDForReceive: Long, val payload: Any?) { @@ -317,7 +318,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService class ExpectingResponse( topic: String, - destination: MessageRecipients?, + destination: Party?, sessionIDForSend: Long, sessionIDForReceive: Long, obj: Any?, @@ -326,7 +327,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService class NotExpectingResponse( topic: String, - destination: MessageRecipients, + destination: Party, sessionIDForSend: Long, obj: Any? ) : FiberRequest(topic, destination, sessionIDForSend, -1, obj) diff --git a/node/src/main/kotlin/com/r3corda/node/services/transactions/NotaryService.kt b/node/src/main/kotlin/com/r3corda/node/services/transactions/NotaryService.kt index c549734ad0..aa69394e70 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/transactions/NotaryService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/transactions/NotaryService.kt @@ -2,7 +2,7 @@ package com.r3corda.node.services.transactions import com.r3corda.core.messaging.Ack import com.r3corda.core.messaging.MessagingService -import com.r3corda.core.messaging.SingleMessageRecipient +import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.TimestampChecker import com.r3corda.core.node.services.UniquenessProvider @@ -22,7 +22,8 @@ import com.r3corda.protocols.NotaryProtocol abstract class NotaryService(val smm: StateMachineManager, net: MessagingService, val timestampChecker: TimestampChecker, - val uniquenessProvider: UniquenessProvider) : AbstractNodeService(net) { + val uniquenessProvider: UniquenessProvider, + networkMapCache: NetworkMapCache) : AbstractNodeService(net, networkMapCache) { object Type : ServiceType("corda.notary") abstract val logger: org.slf4j.Logger @@ -37,8 +38,9 @@ abstract class NotaryService(val smm: StateMachineManager, } private fun processRequest(req: NotaryProtocol.Handshake): Ack { - val protocol = protocolFactory.create(req.replyTo as SingleMessageRecipient, - req.sessionID!!, + val protocol = protocolFactory.create( + req.replyToParty, + req.sessionID, req.sendSessionID, timestampChecker, uniquenessProvider) diff --git a/node/src/main/kotlin/com/r3corda/node/services/transactions/SimpleNotaryService.kt b/node/src/main/kotlin/com/r3corda/node/services/transactions/SimpleNotaryService.kt index 850d84d40f..3cf38589f8 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/transactions/SimpleNotaryService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/transactions/SimpleNotaryService.kt @@ -1,6 +1,7 @@ package com.r3corda.node.services.transactions import com.r3corda.core.messaging.MessagingService +import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.TimestampChecker import com.r3corda.core.node.services.UniquenessProvider @@ -13,7 +14,8 @@ class SimpleNotaryService( smm: StateMachineManager, net: MessagingService, timestampChecker: TimestampChecker, - uniquenessProvider: UniquenessProvider) : NotaryService(smm, net, timestampChecker, uniquenessProvider) { + uniquenessProvider: UniquenessProvider, + networkMapCache: NetworkMapCache) : NotaryService(smm, net, timestampChecker, uniquenessProvider, networkMapCache) { object Type : ServiceType("corda.notary.simple") override val logger = loggerFor() diff --git a/node/src/main/kotlin/com/r3corda/node/services/transactions/ValidatingNotaryService.kt b/node/src/main/kotlin/com/r3corda/node/services/transactions/ValidatingNotaryService.kt index 3b0fc6faf1..d6f8fac88a 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/transactions/ValidatingNotaryService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/transactions/ValidatingNotaryService.kt @@ -1,7 +1,8 @@ package com.r3corda.node.services.transactions +import com.r3corda.core.crypto.Party import com.r3corda.core.messaging.MessagingService -import com.r3corda.core.messaging.SingleMessageRecipient +import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.ServiceType import com.r3corda.core.node.services.TimestampChecker import com.r3corda.core.node.services.UniquenessProvider @@ -15,14 +16,15 @@ class ValidatingNotaryService( smm: StateMachineManager, net: MessagingService, timestampChecker: TimestampChecker, - uniquenessProvider: UniquenessProvider -) : NotaryService(smm, net, timestampChecker, uniquenessProvider) { + uniquenessProvider: UniquenessProvider, + networkMapCache: NetworkMapCache +) : NotaryService(smm, net, timestampChecker, uniquenessProvider, networkMapCache) { object Type : ServiceType("corda.notary.validating") override val logger = loggerFor() override val protocolFactory = object : NotaryProtocol.Factory { - override fun create(otherSide: SingleMessageRecipient, + override fun create(otherSide: Party, sendSessionID: Long, receiveSessionID: Long, timestampChecker: TimestampChecker, diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt index b38dc92be6..eddd1f3f94 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/AttachmentTests.kt @@ -59,7 +59,8 @@ class AttachmentTests { val id = n0.storage.attachments.importAttachment(ByteArrayInputStream(fakeAttachment())) // Get node one to run a protocol to fetch it and insert it. - val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.net.myAddress)) + network.runNetwork() + val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity)) network.runNetwork() assertEquals(0, f1.get().fromDisk.size) @@ -70,7 +71,7 @@ class AttachmentTests { // Shut down node zero and ensure node one can still resolve the attachment. n0.stop() - val response: FetchDataProtocol.Result = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.net.myAddress)).get() + val response: FetchDataProtocol.Result = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity)).get() assertEquals(attachment, response.fromDisk[0]) } @@ -80,14 +81,15 @@ class AttachmentTests { // Get node one to fetch a non-existent attachment. val hash = SecureHash.randomSHA256() - val f1 = n1.smm.add("tests.fetch2", FetchAttachmentsProtocol(setOf(hash), n0.net.myAddress)) + network.runNetwork() + val f1 = n1.smm.add("tests.fetch2", FetchAttachmentsProtocol(setOf(hash), n0.info.identity)) network.runNetwork() val e = assertFailsWith { rootCauseExceptions { f1.get() } } assertEquals(hash, e.requested) } @Test - fun maliciousResponse() { + fun `malicious response`() { // Make a node that doesn't do sanity checking at load time. val n0 = network.createNode(null, -1, object : MockNetwork.Factory { override fun create(dir: Path, config: NodeConfiguration, network: MockNetwork, networkMapAddr: NodeInfo?, @@ -112,7 +114,8 @@ class AttachmentTests { writer.close() // Get n1 to fetch the attachment. Should receive corrupted bytes. - val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.net.myAddress)) + network.runNetwork() + val f1 = n1.smm.add("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.identity)) network.runNetwork() assertFailsWith { rootCauseExceptions { f1.get() } diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt index 7975972f8b..0934eed820 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt @@ -11,7 +11,6 @@ import com.r3corda.core.contracts.* import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash import com.r3corda.core.days -import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.ServiceHub import com.r3corda.core.node.services.ServiceType @@ -56,14 +55,14 @@ class TwoPartyTradeProtocolTests { lateinit var net: MockNetwork private fun runSeller(smm: StateMachineManager, notary: NodeInfo, - otherSide: SingleMessageRecipient, assetToSell: StateAndRef, price: Amount>, + otherSide: Party, assetToSell: StateAndRef, price: Amount>, myKeyPair: KeyPair, buyerSessionID: Long): ListenableFuture { val seller = TwoPartyTradeProtocol.Seller(otherSide, notary, assetToSell, price, myKeyPair, buyerSessionID) return smm.add("${TwoPartyTradeProtocol.TRADE_TOPIC}.seller", seller) } private fun runBuyer(smm: StateMachineManager, notaryNode: NodeInfo, - otherSide: SingleMessageRecipient, acceptablePrice: Amount>, typeToBuy: Class, + otherSide: Party, acceptablePrice: Amount>, typeToBuy: Class, sessionID: Long): ListenableFuture { val buyer = TwoPartyTradeProtocol.Buyer(otherSide, notaryNode.identity, acceptablePrice, typeToBuy, sessionID) return smm.add("${TwoPartyTradeProtocol.TRADE_TOPIC}.buyer", buyer) @@ -105,7 +104,7 @@ class TwoPartyTradeProtocolTests { val bobResult = runBuyer( bobNode.smm, notaryNode.info, - aliceNode.net.myAddress, + aliceNode.info.identity, 1000.DOLLARS `issued by` issuer, CommercialPaper.State::class.java, buyerSessionID @@ -113,7 +112,7 @@ class TwoPartyTradeProtocolTests { val aliceResult = runSeller( aliceNode.smm, notaryNode.info, - bobNode.net.myAddress, + bobNode.info.identity, lookup("alice's paper"), 1000.DOLLARS `issued by` issuer, ALICE_KEY, @@ -139,7 +138,6 @@ class TwoPartyTradeProtocolTests { val aliceNode = net.createPartyNode(notaryNode.info, ALICE.name, ALICE_KEY) var bobNode = net.createPartyNode(notaryNode.info, BOB.name, BOB_KEY) - val aliceAddr = aliceNode.net.myAddress val bobAddr = bobNode.net.myAddress as InMemoryMessagingNetwork.Handle val networkMapAddr = notaryNode.info val issuer = bobNode.services.storageService.myLegalIdentity.ref(0) @@ -156,7 +154,7 @@ class TwoPartyTradeProtocolTests { val aliceFuture = runSeller( aliceNode.smm, notaryNode.info, - bobAddr, + bobNode.info.identity, lookup("alice's paper"), 1000.DOLLARS `issued by` issuer, ALICE_KEY, @@ -165,7 +163,7 @@ class TwoPartyTradeProtocolTests { runBuyer( bobNode.smm, notaryNode.info, - aliceAddr, + aliceNode.info.identity, 1000.DOLLARS `issued by` issuer, CommercialPaper.State::class.java, buyerSessionID @@ -276,7 +274,7 @@ class TwoPartyTradeProtocolTests { runSeller( aliceNode.smm, notaryNode.info, - bobNode.net.myAddress, + bobNode.info.identity, lookup("alice's paper"), 1000.DOLLARS `issued by` issuer, ALICE_KEY, @@ -285,7 +283,7 @@ class TwoPartyTradeProtocolTests { runBuyer( bobNode.smm, notaryNode.info, - aliceNode.net.myAddress, + aliceNode.info.identity, 1000.DOLLARS `issued by` issuer, CommercialPaper.State::class.java, buyerSessionID @@ -371,9 +369,6 @@ class TwoPartyTradeProtocolTests { val bobNode = net.createPartyNode(notaryNode.info, BOB.name, BOB_KEY) val issuer = MEGA_CORP.ref(1, 2, 3) - val aliceAddr = aliceNode.net.myAddress - val bobAddr = bobNode.net.myAddress as InMemoryMessagingNetwork.Handle - val bobKey = bobNode.keyManagement.freshKey() val bobsBadCash = fillUpForBuyer(bobError, bobKey.public).second val alicesFakePaper = fillUpForSeller(aliceError, aliceNode.storage.myLegalIdentity.owningKey, @@ -389,7 +384,7 @@ class TwoPartyTradeProtocolTests { val aliceResult = runSeller( aliceNode.smm, notaryNode.info, - bobAddr, + bobNode.info.identity, lookup("alice's paper"), 1000.DOLLARS `issued by` issuer, ALICE_KEY, @@ -398,7 +393,7 @@ class TwoPartyTradeProtocolTests { val bobResult = runBuyer( bobNode.smm, notaryNode.info, - aliceAddr, + aliceNode.info.identity, 1000.DOLLARS `issued by` issuer, CommercialPaper.State::class.java, buyerSessionID diff --git a/node/src/test/kotlin/com/r3corda/node/services/InMemoryNetworkMapServiceTest.kt b/node/src/test/kotlin/com/r3corda/node/services/InMemoryNetworkMapServiceTest.kt index cbfabb5bfe..29c1221d98 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/InMemoryNetworkMapServiceTest.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/InMemoryNetworkMapServiceTest.kt @@ -59,7 +59,7 @@ class InMemoryNetworkMapServiceTest { assertEquals(2, service.nodes.count()) // Confirm that de-registering the node succeeds and drops it from the node lists - var removeChange = NodeRegistration(registerNode.info, seq, AddOrRemove.REMOVE, expires) + val removeChange = NodeRegistration(registerNode.info, seq, AddOrRemove.REMOVE, expires) val removeWireChange = removeChange.toWire(nodeKey.private) assert(service.processRegistrationChangeRequest(NetworkMapService.RegistrationRequest(removeWireChange, mapServiceNode.info.address, Long.MIN_VALUE)).success) assertNull(service.processQueryRequest(NetworkMapService.QueryIdentityRequest(registerNode.info.identity, mapServiceNode.info.address, Long.MIN_VALUE)).node) @@ -73,7 +73,7 @@ class InMemoryNetworkMapServiceTest { @Suspendable override fun call() { val req = NetworkMapService.UpdateAcknowledge(hash, serviceHub.networkService.myAddress) - send(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, server.address, 0, req) + send(NetworkMapService.PUSH_ACK_PROTOCOL_TOPIC, server.identity, 0, req) } } @@ -84,7 +84,7 @@ class InMemoryNetworkMapServiceTest { val sessionID = random63BitValue() val req = NetworkMapService.FetchMapRequest(subscribe, ifChangedSinceVersion, serviceHub.networkService.myAddress, sessionID) return sendAndReceive( - NetworkMapService.FETCH_PROTOCOL_TOPIC, server.address, 0, sessionID, req) + NetworkMapService.FETCH_PROTOCOL_TOPIC, server.identity, 0, sessionID, req) .validate { it.nodes } } } @@ -97,7 +97,7 @@ class InMemoryNetworkMapServiceTest { val req = NetworkMapService.RegistrationRequest(reg.toWire(privateKey), serviceHub.networkService.myAddress, sessionID) return sendAndReceive( - NetworkMapService.REGISTER_PROTOCOL_TOPIC, server.address, 0, sessionID, req) + NetworkMapService.REGISTER_PROTOCOL_TOPIC, server.identity, 0, sessionID, req) .validate { it } } } @@ -110,19 +110,20 @@ class InMemoryNetworkMapServiceTest { val req = NetworkMapService.SubscribeRequest(subscribe, serviceHub.networkService.myAddress, sessionID) return sendAndReceive( - NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, server.address, 0, sessionID, req) + NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, server.identity, 0, sessionID, req) .validate { it } } } @Test - fun successWithNetwork() { + fun `success with network`() { val (mapServiceNode, registerNode) = network.createTwoNodes() // Confirm there's a network map service on node 0 assertNotNull(mapServiceNode.inNodeNetworkMapService) // Confirm all nodes have registered themselves + network.runNetwork() var fetchPsm = registerNode.smm.add(NetworkMapService.FETCH_PROTOCOL_TOPIC, TestFetchPSM(mapServiceNode.info, false)) network.runNetwork() assertEquals(2, fetchPsm.get()?.count()) @@ -143,11 +144,12 @@ class InMemoryNetworkMapServiceTest { } @Test - fun subscribeWithNetwork() { + fun `subscribe with network`() { val (mapServiceNode, registerNode) = network.createTwoNodes() val service = (mapServiceNode.inNodeNetworkMapService as InMemoryNetworkMapService) // Test subscribing to updates + network.runNetwork() val subscribePsm = registerNode.smm.add(NetworkMapService.SUBSCRIPTION_PROTOCOL_TOPIC, TestSubscribePSM(mapServiceNode.info, true)) network.runNetwork() diff --git a/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt b/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt index 72bd57df44..242d41dbf2 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/MockServices.kt @@ -1,8 +1,8 @@ package com.r3corda.node.services import com.codahale.metrics.MetricRegistry -import com.r3corda.core.contracts.SignedTransaction import com.google.common.util.concurrent.ListenableFuture +import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.services.* import com.r3corda.core.node.services.testing.MockStorageService @@ -68,7 +68,7 @@ open class MockServices( if (net != null && storage != null) { // Creating this class is sufficient, we don't have to store it anywhere, because it registers a listener // on the networking service, so that will keep it from being collected. - DataVendingService(net, storage) + DataVendingService(net, storage, networkMapCache) } } } diff --git a/node/src/test/kotlin/com/r3corda/node/services/NodeInterestRatesTest.kt b/node/src/test/kotlin/com/r3corda/node/services/NodeInterestRatesTest.kt index 983db252d8..2cb5ba4f38 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/NodeInterestRatesTest.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/NodeInterestRatesTest.kt @@ -8,9 +8,9 @@ import com.r3corda.contracts.testing.`with notary` import com.r3corda.core.bd import com.r3corda.core.contracts.DOLLARS import com.r3corda.core.contracts.Fix +import com.r3corda.core.contracts.TransactionType import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.generateKeyPair -import com.r3corda.core.contracts.TransactionType import com.r3corda.core.testing.ALICE_PUBKEY import com.r3corda.core.testing.DUMMY_NOTARY import com.r3corda.core.testing.MEGA_CORP @@ -109,8 +109,9 @@ class NodeInterestRatesTest { val tx = TransactionType.General.Builder() val fixOf = NodeInterestRates.parseFixOf("LIBOR 2016-03-16 1M") - val protocol = RatesFixProtocol(tx, n2.info, fixOf, "0.675".bd, "0.1".bd, Duration.ofNanos(1)) + val protocol = RatesFixProtocol(tx, n2.info.identity, fixOf, "0.675".bd, "0.1".bd, Duration.ofNanos(1)) BriefLogFormatter.initVerbose("rates") + net.runNetwork() val future = n1.smm.add("rates", protocol) net.runNetwork() @@ -123,4 +124,4 @@ class NodeInterestRatesTest { } private fun makeTX() = TransactionType.General.Builder().withItems(1000.DOLLARS.CASH `issued by` DUMMY_CASH_ISSUER `owned by` ALICE_PUBKEY `with notary` DUMMY_NOTARY) -} \ No newline at end of file +} diff --git a/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt b/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt index 007cc51e36..e795510e85 100644 --- a/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/RateFixDemo.kt @@ -65,7 +65,7 @@ fun main(args: Array) { val rateTolerance = BigDecimal(options.valueOf(rateToleranceArg)) // Bring up node. - var advertisedServices: Set = emptySet() + val advertisedServices: Set = emptySet() val myNetAddr = ArtemisMessagingService.toHostAndPort(options.valueOf(networkAddressArg)) val config = object : NodeConfiguration { override val myLegalName: String = "Rate fix demo node" @@ -84,7 +84,7 @@ fun main(args: Array) { // Make a garbage transaction that includes a rate fix. val tx = TransactionType.General.Builder() tx.addOutputState(TransactionState(Cash.State(1500.DOLLARS `issued by` node.storage.myLegalIdentity.ref(1), node.keyManagement.freshKey().public), notary.identity)) - val protocol = RatesFixProtocol(tx, oracleNode, fixOf, expectedRate, rateTolerance, 24.hours) + val protocol = RatesFixProtocol(tx, oracleNode.identity, fixOf, expectedRate, rateTolerance, 24.hours) node.smm.add("demo.ratefix", protocol).get() node.stop() diff --git a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt index 8178309c4b..c15bfd82e7 100644 --- a/src/main/kotlin/com/r3corda/demos/TraderDemo.kt +++ b/src/main/kotlin/com/r3corda/demos/TraderDemo.kt @@ -11,7 +11,6 @@ import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.days import com.r3corda.core.logElapsedTime -import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.ServiceType import com.r3corda.core.protocols.ProtocolLogic @@ -163,17 +162,13 @@ fun runTraderDemo(args: Array): Int { if (role == Role.BUYER) { runBuyer(node, amount) } else { - val recipient = ArtemisMessagingService.makeRecipient(theirNetAddr) - runSeller(myNetAddr, node, recipient, amount) + runSeller(node, amount, cashIssuer) } return 0 } -private fun runSeller(myNetAddr: HostAndPort, - node: Node, - recipient: SingleMessageRecipient, - amount: Amount>) { +private fun runSeller(node: Node, amount: Amount>, otherSide: Party) { // The seller will sell some commercial paper to the buyer, who will pay with (self issued) cash. // // The CP sale transaction comes with a prospectus PDF, which will tag along for the ride in an @@ -192,7 +187,7 @@ private fun runSeller(myNetAddr: HostAndPort, it.second.get() } } else { - val seller = TraderDemoProtocolSeller(myNetAddr, recipient, amount) + val seller = TraderDemoProtocolSeller(otherSide, amount) node.smm.add("demo.seller", seller).get() } @@ -208,8 +203,7 @@ private fun runBuyer(node: Node, amount: Amount>) { } val future = if (node.isPreviousCheckpointsPresent) { - val (@Suppress("UNUSED_VARIABLE") buyer, future) = node.smm.findStateMachines(TraderDemoProtocolBuyer::class.java).single() - future + node.smm.findStateMachines(TraderDemoProtocolBuyer::class.java).single().second } else { // We use a simple scenario-specific wrapper protocol to make things happen. val buyer = TraderDemoProtocolBuyer(attachmentsPath, node.info.identity, amount) @@ -249,16 +243,19 @@ private class TraderDemoProtocolBuyer(private val attachmentsPath: Path, // As the seller initiates the two-party trade protocol, here, we will be the buyer. try { progressTracker.currentStep = WAITING_FOR_SELLER_TO_CONNECT - val origin = receive(DEMO_TOPIC, 0).validate { it.withDefaultPort(Node.DEFAULT_PORT) } - val recipient = ArtemisMessagingService.makeRecipient(origin as HostAndPort) + val newPartnerParty = receive(DEMO_TOPIC, 0).validate { + val ourVersionOfParty = serviceHub.networkMapCache.getNodeByLegalName(it.name)!!.identity + require(ourVersionOfParty == it) + it + } // The session ID disambiguates the test trade. val sessionID = random63BitValue() progressTracker.currentStep = STARTING_BUY - send(DEMO_TOPIC, recipient, 0, sessionID) + send(DEMO_TOPIC, newPartnerParty, 0, sessionID) val notary = serviceHub.networkMapCache.notaryNodes[0] - val buyer = TwoPartyTradeProtocol.Buyer(recipient, notary.identity, amount, + val buyer = TwoPartyTradeProtocol.Buyer(newPartnerParty, notary.identity, amount, CommercialPaper.State::class.java, sessionID) // This invokes the trading protocol and out pops our finished transaction. @@ -301,10 +298,9 @@ ${Emoji.renderIfSupported(cpIssuance)}""") } } -private class TraderDemoProtocolSeller(val myAddress: HostAndPort, - val otherSide: SingleMessageRecipient, - val amount: Amount>, - override val progressTracker: ProgressTracker = TraderDemoProtocolSeller.tracker()) : ProtocolLogic() { +private class TraderDemoProtocolSeller(val otherSide: Party, + val amount: Amount>, + override val progressTracker: ProgressTracker = TraderDemoProtocolSeller.tracker()) : ProtocolLogic() { companion object { val PROSPECTUS_HASH = SecureHash.parse("decd098666b9657314870e192ced0c3519c2c9d395507a238338f8d003929de9") @@ -326,7 +322,7 @@ private class TraderDemoProtocolSeller(val myAddress: HostAndPort, override fun call() { progressTracker.currentStep = ANNOUNCING - val sessionID = sendAndReceive(DEMO_TOPIC, otherSide, 0, 0, myAddress).validate { it } + val sessionID = sendAndReceive(DEMO_TOPIC, otherSide, 0, 0, serviceHub.storageService.myLegalIdentity).validate { it } progressTracker.currentStep = SELF_ISSUING diff --git a/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt b/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt index 2be25b79b4..a2f5a72fa3 100644 --- a/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt +++ b/src/main/kotlin/com/r3corda/demos/protocols/AutoOfferProtocol.kt @@ -6,7 +6,6 @@ import com.google.common.util.concurrent.Futures import com.r3corda.core.contracts.DealState import com.r3corda.core.contracts.SignedTransaction import com.r3corda.core.crypto.Party -import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.protocols.ProtocolLogic import com.r3corda.core.random63BitValue import com.r3corda.core.serialization.deserialize @@ -24,7 +23,7 @@ import com.r3corda.protocols.TwoPartyDealProtocol object AutoOfferProtocol { val TOPIC = "autooffer.topic" - data class AutoOfferMessage(val otherSide: SingleMessageRecipient, + data class AutoOfferMessage(val otherSide: Party, val notary: Party, val otherSessionID: Long, val dealBeingOffered: DealState) @@ -67,7 +66,7 @@ object AutoOfferProtocol { } - class Requester(val dealToBeOffered: DealState) : ProtocolLogic() { + class Requester(val dealToBeOffered: DealState) : ProtocolLogic() { companion object { object RECEIVED : ProgressTracker.Step("Received API call") @@ -98,9 +97,9 @@ object AutoOfferProtocol { val otherParty = notUs(*dealToBeOffered.parties).single() val otherNode = (serviceHub.networkMapCache.getNodeByLegalName(otherParty.name)) requireNotNull(otherNode) { "Cannot identify other party " + otherParty.name + ", know about: " + serviceHub.networkMapCache.partyNodes.map { it.identity } } - val otherSide = otherNode!!.address + val otherSide = otherNode!!.identity progressTracker.currentStep = ANNOUNCING - send(TOPIC, otherSide, 0, AutoOfferMessage(serviceHub.networkService.myAddress, notary, ourSessionID, dealToBeOffered)) + send(TOPIC, otherSide, 0, AutoOfferMessage(serviceHub.storageService.myLegalIdentity, notary, ourSessionID, dealToBeOffered)) progressTracker.currentStep = DEALING val stx = subProtocol(TwoPartyDealProtocol.Acceptor(otherSide, notary, dealToBeOffered, ourSessionID, progressTracker.getChildProgressTracker(DEALING)!!)) return stx diff --git a/src/main/kotlin/com/r3corda/demos/protocols/ExitServerProtocol.kt b/src/main/kotlin/com/r3corda/demos/protocols/ExitServerProtocol.kt index abf20e3809..a3b9d6a01c 100644 --- a/src/main/kotlin/com/r3corda/demos/protocols/ExitServerProtocol.kt +++ b/src/main/kotlin/com/r3corda/demos/protocols/ExitServerProtocol.kt @@ -22,7 +22,7 @@ object ExitServerProtocol { object Handler { fun register(node: Node) { - node.net.addMessageHandler("${TOPIC}.0") { msg, registration -> + node.net.addMessageHandler("$TOPIC.0") { msg, registration -> // Just to validate we got the message if (enabled) { val message = msg.data.deserialize() @@ -62,7 +62,7 @@ object ExitServerProtocol { } else { // TODO: messaging ourselves seems to trigger a bug for the time being and we continuously receive messages if (recipient.identity != serviceHub.storageService.myLegalIdentity) { - send(TOPIC, recipient.address, 0, message) + send(TOPIC, recipient.identity, 0, message) } } } diff --git a/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt b/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt index c702be19fc..00ede632fa 100644 --- a/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt +++ b/src/main/kotlin/com/r3corda/demos/protocols/UpdateBusinessDayProtocol.kt @@ -52,7 +52,7 @@ object UpdateBusinessDayProtocol { if (recipient.address is MockNetworkMapCache.MockAddress) { // Ignore } else { - send(TOPIC, recipient.address, 0, message) + send(TOPIC, recipient.identity, 0, message) } } }