mirror of
https://github.com/corda/corda.git
synced 2024-12-20 13:33:12 +00:00
Merged in cor-133-ack-support (pull request #174)
Break out message handler changes
This commit is contained in:
commit
5c6d604815
@ -1,6 +1,7 @@
|
|||||||
package com.r3corda.core.messaging
|
package com.r3corda.core.messaging
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture
|
import com.google.common.util.concurrent.ListenableFuture
|
||||||
|
import com.r3corda.core.serialization.DeserializeAsKotlinObjectDef
|
||||||
import com.r3corda.core.serialization.serialize
|
import com.r3corda.core.serialization.serialize
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.concurrent.Executor
|
import java.util.concurrent.Executor
|
||||||
@ -133,3 +134,9 @@ interface MessageRecipientGroup : MessageRecipients
|
|||||||
|
|
||||||
/** A special base class for the set of all possible recipients, without having to identify who they all are. */
|
/** A special base class for the set of all possible recipients, without having to identify who they all are. */
|
||||||
interface AllPossibleRecipients : MessageRecipients
|
interface AllPossibleRecipients : MessageRecipients
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A general Ack message that conveys no content other than it's presence for use when you want an acknowledgement
|
||||||
|
* from a recipient. Using [Unit] can be ambiguous as it is similar to [Void] and so could mean no response.
|
||||||
|
*/
|
||||||
|
object Ack : DeserializeAsKotlinObjectDef
|
@ -37,7 +37,6 @@ import java.util.*
|
|||||||
import javax.annotation.concurrent.ThreadSafe
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
import kotlin.reflect.*
|
import kotlin.reflect.*
|
||||||
import kotlin.reflect.jvm.javaType
|
import kotlin.reflect.jvm.javaType
|
||||||
import java.security.PrivateKey
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Serialization utilities, using the Kryo framework with a custom serialiser for immutable data classes and a dead
|
* Serialization utilities, using the Kryo framework with a custom serialiser for immutable data classes and a dead
|
||||||
@ -305,6 +304,19 @@ object Ed25519PublicKeySerializer : Serializer<EdDSAPublicKey>() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Marker interface for kotlin object definitions so that they are deserialized as the singleton instance. */
|
||||||
|
interface DeserializeAsKotlinObjectDef
|
||||||
|
|
||||||
|
/** Serializer to deserialize kotlin object definitions marked with [DeserializeAsKotlinObjectDef]. */
|
||||||
|
object KotlinObjectSerializer : Serializer<DeserializeAsKotlinObjectDef>() {
|
||||||
|
override fun read(kryo: Kryo, input: Input, type: Class<DeserializeAsKotlinObjectDef>): DeserializeAsKotlinObjectDef {
|
||||||
|
// read the public static INSTANCE field that kotlin compiler generates.
|
||||||
|
return type.getField("INSTANCE").get(null) as DeserializeAsKotlinObjectDef
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun write(kryo: Kryo, output: Output, obj: DeserializeAsKotlinObjectDef) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fun createKryo(k: Kryo = Kryo()): Kryo {
|
fun createKryo(k: Kryo = Kryo()): Kryo {
|
||||||
return k.apply {
|
return k.apply {
|
||||||
@ -355,6 +367,9 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
|
|||||||
// This ensures a NonEmptySetSerializer is constructed with an initial value.
|
// This ensures a NonEmptySetSerializer is constructed with an initial value.
|
||||||
register(NonEmptySet::class.java, NonEmptySetSerializer)
|
register(NonEmptySet::class.java, NonEmptySetSerializer)
|
||||||
|
|
||||||
|
/** This ensures any kotlin objects that implement [DeserializeAsKotlinObjectDef] are read back in as singletons. */
|
||||||
|
addDefaultSerializer(DeserializeAsKotlinObjectDef::class.java, KotlinObjectSerializer)
|
||||||
|
|
||||||
noReferencesWithin<WireTransaction>()
|
noReferencesWithin<WireTransaction>()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,7 @@ import com.r3corda.core.crypto.DigitalSignature
|
|||||||
import com.r3corda.core.crypto.Party
|
import com.r3corda.core.crypto.Party
|
||||||
import com.r3corda.core.crypto.SignedData
|
import com.r3corda.core.crypto.SignedData
|
||||||
import com.r3corda.core.crypto.signWithECDSA
|
import com.r3corda.core.crypto.signWithECDSA
|
||||||
|
import com.r3corda.core.messaging.Ack
|
||||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||||
import com.r3corda.core.node.NodeInfo
|
import com.r3corda.core.node.NodeInfo
|
||||||
import com.r3corda.core.node.services.TimestampChecker
|
import com.r3corda.core.node.services.TimestampChecker
|
||||||
@ -55,7 +56,7 @@ object NotaryProtocol {
|
|||||||
val receiveSessionID = random63BitValue()
|
val receiveSessionID = random63BitValue()
|
||||||
|
|
||||||
val handshake = Handshake(serviceHub.networkService.myAddress, sendSessionID, receiveSessionID)
|
val handshake = Handshake(serviceHub.networkService.myAddress, sendSessionID, receiveSessionID)
|
||||||
sendAndReceive<Unit>(TOPIC_INITIATE, notaryNode.address, 0, receiveSessionID, handshake)
|
sendAndReceive<Ack>(TOPIC_INITIATE, notaryNode.address, 0, receiveSessionID, handshake)
|
||||||
|
|
||||||
val request = SignRequest(wtx.serialized, serviceHub.storageService.myLegalIdentity)
|
val request = SignRequest(wtx.serialized, serviceHub.storageService.myLegalIdentity)
|
||||||
val response = sendAndReceive<Result>(TOPIC, notaryNode.address, sendSessionID, receiveSessionID, request)
|
val response = sendAndReceive<Result>(TOPIC, notaryNode.address, sendSessionID, receiveSessionID, request)
|
||||||
|
@ -5,6 +5,7 @@ import com.r3corda.core.contracts.*
|
|||||||
import com.r3corda.core.crypto.DigitalSignature
|
import com.r3corda.core.crypto.DigitalSignature
|
||||||
import com.r3corda.core.crypto.Party
|
import com.r3corda.core.crypto.Party
|
||||||
import com.r3corda.core.crypto.signWithECDSA
|
import com.r3corda.core.crypto.signWithECDSA
|
||||||
|
import com.r3corda.core.messaging.Ack
|
||||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||||
import com.r3corda.core.node.NodeInfo
|
import com.r3corda.core.node.NodeInfo
|
||||||
import com.r3corda.core.protocols.ProtocolLogic
|
import com.r3corda.core.protocols.ProtocolLogic
|
||||||
@ -104,7 +105,7 @@ object NotaryChangeProtocol {
|
|||||||
val proposal = Proposal(originalState.ref, newNotary, stx)
|
val proposal = Proposal(originalState.ref, newNotary, stx)
|
||||||
|
|
||||||
val handshake = Handshake(sessionIdForSend, serviceHub.networkService.myAddress, sessionIdForReceive)
|
val handshake = Handshake(sessionIdForSend, serviceHub.networkService.myAddress, sessionIdForReceive)
|
||||||
sendAndReceive<Unit>(TOPIC_INITIATE, node.address, 0, sessionIdForReceive, handshake)
|
sendAndReceive<Ack>(TOPIC_INITIATE, node.address, 0, sessionIdForReceive, handshake)
|
||||||
|
|
||||||
val response = sendAndReceive<Result>(TOPIC_CHANGE, node.address, sessionIdForSend, sessionIdForReceive, proposal)
|
val response = sendAndReceive<Result>(TOPIC_CHANGE, node.address, sessionIdForSend, sessionIdForReceive, proposal)
|
||||||
val participantSignature = response.validate {
|
val participantSignature = response.validate {
|
||||||
|
@ -3,6 +3,7 @@ package com.r3corda.core.serialization
|
|||||||
import com.google.common.primitives.Ints
|
import com.google.common.primitives.Ints
|
||||||
import com.r3corda.core.crypto.generateKeyPair
|
import com.r3corda.core.crypto.generateKeyPair
|
||||||
import com.r3corda.core.crypto.signWithECDSA
|
import com.r3corda.core.crypto.signWithECDSA
|
||||||
|
import com.r3corda.core.messaging.Ack
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
@ -73,6 +74,14 @@ class KryoTests {
|
|||||||
assertThatThrownBy { deserialisedSignature.verifyWithECDSA(wrongBits) }
|
assertThatThrownBy { deserialisedSignature.verifyWithECDSA(wrongBits) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `write and read Ack`() {
|
||||||
|
val tokenizableBefore = Ack
|
||||||
|
val serializedBytes = tokenizableBefore.serialize(kryo)
|
||||||
|
val tokenizableAfter = serializedBytes.deserialize(kryo)
|
||||||
|
assertThat(tokenizableAfter).isSameAs(tokenizableBefore)
|
||||||
|
}
|
||||||
|
|
||||||
private data class Person(val name: String, val birthday: Instant?)
|
private data class Person(val name: String, val birthday: Instant?)
|
||||||
|
|
||||||
@Suppress("unused")
|
@Suppress("unused")
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package com.r3corda.node.services
|
package com.r3corda.node.services
|
||||||
|
|
||||||
|
import com.r3corda.core.messaging.Ack
|
||||||
import com.r3corda.core.messaging.MessagingService
|
import com.r3corda.core.messaging.MessagingService
|
||||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||||
import com.r3corda.node.services.api.AbstractNodeService
|
import com.r3corda.node.services.api.AbstractNodeService
|
||||||
@ -17,11 +18,12 @@ class NotaryChangeService(net: MessagingService, val smm: StateMachineManager) :
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun handleChangeNotaryRequest(req: NotaryChangeProtocol.Handshake) {
|
private fun handleChangeNotaryRequest(req: NotaryChangeProtocol.Handshake): Ack {
|
||||||
val protocol = NotaryChangeProtocol.Acceptor(
|
val protocol = NotaryChangeProtocol.Acceptor(
|
||||||
req.replyTo as SingleMessageRecipient,
|
req.replyTo as SingleMessageRecipient,
|
||||||
req.sessionID!!,
|
req.sessionID!!,
|
||||||
req.sessionIdForSend)
|
req.sessionIdForSend)
|
||||||
smm.add(NotaryChangeProtocol.TOPIC_CHANGE, protocol)
|
smm.add(NotaryChangeProtocol.TOPIC_CHANGE, protocol)
|
||||||
|
return Ack
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package com.r3corda.node.services.api
|
|||||||
import com.r3corda.core.messaging.Message
|
import com.r3corda.core.messaging.Message
|
||||||
import com.r3corda.core.messaging.MessagingService
|
import com.r3corda.core.messaging.MessagingService
|
||||||
import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX
|
import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX
|
||||||
|
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||||
import com.r3corda.core.serialization.deserialize
|
import com.r3corda.core.serialization.deserialize
|
||||||
import com.r3corda.core.serialization.serialize
|
import com.r3corda.core.serialization.serialize
|
||||||
import com.r3corda.protocols.AbstractRequestMessage
|
import com.r3corda.protocols.AbstractRequestMessage
|
||||||
@ -12,14 +13,15 @@ import javax.annotation.concurrent.ThreadSafe
|
|||||||
* Abstract superclass for services that a node can host, which provides helper functions.
|
* Abstract superclass for services that a node can host, which provides helper functions.
|
||||||
*/
|
*/
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
abstract class AbstractNodeService(val net: MessagingService) {
|
abstract class AbstractNodeService(val net: MessagingService) : SingletonSerializeAsToken() {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
|
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
|
||||||
* common boilerplate code. Exceptions are caught and passed to the provided consumer.
|
* common boilerplate code. Exceptions are caught and passed to the provided consumer. If you just want a simple
|
||||||
|
* acknowledgement response with no content, use [com.r3corda.core.messaging.Ack]
|
||||||
*
|
*
|
||||||
* @param topic the topic, without the default session ID postfix (".0)
|
* @param topic the topic, without the default session ID postfix (".0)
|
||||||
* @param handler a function to handle the deserialised request and return a response
|
* @param handler a function to handle the deserialised request and return an optional response (if return type not Unit)
|
||||||
* @param exceptionConsumer a function to which any thrown exception is passed.
|
* @param exceptionConsumer a function to which any thrown exception is passed.
|
||||||
*/
|
*/
|
||||||
protected inline fun <reified Q : AbstractRequestMessage, reified R : Any>
|
protected inline fun <reified Q : AbstractRequestMessage, reified R : Any>
|
||||||
@ -30,8 +32,11 @@ abstract class AbstractNodeService(val net: MessagingService) {
|
|||||||
try {
|
try {
|
||||||
val req = message.data.deserialize<Q>()
|
val req = message.data.deserialize<Q>()
|
||||||
val data = handler(req)
|
val data = handler(req)
|
||||||
val msg = net.createMessage(topic + "." + req.sessionID, data.serialize().bits)
|
// If the return type R is Unit, then do not send a response
|
||||||
|
if (data.javaClass != Unit.javaClass) {
|
||||||
|
val msg = net.createMessage("$topic.${req.sessionID}", data.serialize().bits)
|
||||||
net.send(msg, req.replyTo)
|
net.send(msg, req.replyTo)
|
||||||
|
}
|
||||||
} catch(e: Exception) {
|
} catch(e: Exception) {
|
||||||
exceptionConsumer(message, e)
|
exceptionConsumer(message, e)
|
||||||
}
|
}
|
||||||
@ -40,19 +45,15 @@ abstract class AbstractNodeService(val net: MessagingService) {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
|
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
|
||||||
* common boilerplate code. Exceptions are propagated to the messaging layer.
|
* common boilerplate code. Exceptions are propagated to the messaging layer. If you just want a simple
|
||||||
|
* acknowledgement response with no content, use [com.r3corda.core.messaging.Ack]
|
||||||
*
|
*
|
||||||
* @param topic the topic, without the default session ID postfix (".0)
|
* @param topic the topic, without the default session ID postfix (".0)
|
||||||
* @param handler a function to handle the deserialised request and return a response
|
* @param handler a function to handle the deserialised request and return an optional response (if return type not Unit)
|
||||||
*/
|
*/
|
||||||
protected inline fun <reified Q : AbstractRequestMessage, reified R : Any>
|
protected inline fun <reified Q : AbstractRequestMessage, reified R : Any>
|
||||||
addMessageHandler(topic: String,
|
addMessageHandler(topic: String,
|
||||||
crossinline handler: (Q) -> R) {
|
crossinline handler: (Q) -> R) {
|
||||||
net.addMessageHandler(topic + TOPIC_DEFAULT_POSTFIX, null) { message, r ->
|
addMessageHandler(topic, handler, { message: Message, exception: Exception -> throw exception })
|
||||||
val req = message.data.deserialize<Q>()
|
|
||||||
val data = handler(req)
|
|
||||||
val msg = net.createMessage(topic + "." + req.sessionID, data.serialize().bits)
|
|
||||||
net.send(msg, req.replyTo)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package com.r3corda.node.services.transactions
|
package com.r3corda.node.services.transactions
|
||||||
|
|
||||||
|
import com.r3corda.core.messaging.Ack
|
||||||
import com.r3corda.core.messaging.MessagingService
|
import com.r3corda.core.messaging.MessagingService
|
||||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||||
import com.r3corda.core.node.services.ServiceType
|
import com.r3corda.core.node.services.ServiceType
|
||||||
@ -35,12 +36,13 @@ abstract class NotaryService(val smm: StateMachineManager,
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun processRequest(req: NotaryProtocol.Handshake) {
|
private fun processRequest(req: NotaryProtocol.Handshake): Ack {
|
||||||
val protocol = protocolFactory.create(req.replyTo as SingleMessageRecipient,
|
val protocol = protocolFactory.create(req.replyTo as SingleMessageRecipient,
|
||||||
req.sessionID!!,
|
req.sessionID!!,
|
||||||
req.sendSessionID,
|
req.sendSessionID,
|
||||||
timestampChecker,
|
timestampChecker,
|
||||||
uniquenessProvider)
|
uniquenessProvider)
|
||||||
smm.add(NotaryProtocol.TOPIC, protocol)
|
smm.add(NotaryProtocol.TOPIC, protocol)
|
||||||
|
return Ack
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user