diff --git a/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt b/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt index 0d05bfd743..43da9925fe 100644 --- a/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt +++ b/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt @@ -1,6 +1,7 @@ package com.r3corda.core.messaging import com.google.common.util.concurrent.ListenableFuture +import com.r3corda.core.serialization.DeserializeAsKotlinObjectDef import com.r3corda.core.serialization.serialize import java.time.Instant import java.util.concurrent.Executor @@ -133,3 +134,9 @@ interface MessageRecipientGroup : MessageRecipients /** A special base class for the set of all possible recipients, without having to identify who they all are. */ interface AllPossibleRecipients : MessageRecipients + +/** + * A general Ack message that conveys no content other than it's presence for use when you want an acknowledgement + * from a recipient. Using [Unit] can be ambiguous as it is similar to [Void] and so could mean no response. + */ +object Ack : DeserializeAsKotlinObjectDef \ No newline at end of file diff --git a/core/src/main/kotlin/com/r3corda/core/serialization/Kryo.kt b/core/src/main/kotlin/com/r3corda/core/serialization/Kryo.kt index 064cfda45b..2e555156e7 100644 --- a/core/src/main/kotlin/com/r3corda/core/serialization/Kryo.kt +++ b/core/src/main/kotlin/com/r3corda/core/serialization/Kryo.kt @@ -37,7 +37,6 @@ import java.util.* import javax.annotation.concurrent.ThreadSafe import kotlin.reflect.* import kotlin.reflect.jvm.javaType -import java.security.PrivateKey /** * Serialization utilities, using the Kryo framework with a custom serialiser for immutable data classes and a dead @@ -305,6 +304,19 @@ object Ed25519PublicKeySerializer : Serializer() { } } +/** Marker interface for kotlin object definitions so that they are deserialized as the singleton instance. */ +interface DeserializeAsKotlinObjectDef + +/** Serializer to deserialize kotlin object definitions marked with [DeserializeAsKotlinObjectDef]. */ +object KotlinObjectSerializer : Serializer() { + override fun read(kryo: Kryo, input: Input, type: Class): DeserializeAsKotlinObjectDef { + // read the public static INSTANCE field that kotlin compiler generates. + return type.getField("INSTANCE").get(null) as DeserializeAsKotlinObjectDef + } + + override fun write(kryo: Kryo, output: Output, obj: DeserializeAsKotlinObjectDef) { + } +} fun createKryo(k: Kryo = Kryo()): Kryo { return k.apply { @@ -354,6 +366,9 @@ fun createKryo(k: Kryo = Kryo()): Kryo { // This ensures a NonEmptySetSerializer is constructed with an initial value. register(NonEmptySet::class.java, NonEmptySetSerializer) + + /** This ensures any kotlin objects that implement [DeserializeAsKotlinObjectDef] are read back in as singletons. */ + addDefaultSerializer(DeserializeAsKotlinObjectDef::class.java, KotlinObjectSerializer) noReferencesWithin() } diff --git a/core/src/main/kotlin/com/r3corda/protocols/NotaryProtocol.kt b/core/src/main/kotlin/com/r3corda/protocols/NotaryProtocol.kt index 6345e4ecbb..d8a6b9f833 100644 --- a/core/src/main/kotlin/com/r3corda/protocols/NotaryProtocol.kt +++ b/core/src/main/kotlin/com/r3corda/protocols/NotaryProtocol.kt @@ -7,6 +7,7 @@ import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SignedData import com.r3corda.core.crypto.signWithECDSA +import com.r3corda.core.messaging.Ack import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.TimestampChecker @@ -55,7 +56,7 @@ object NotaryProtocol { val receiveSessionID = random63BitValue() val handshake = Handshake(serviceHub.networkService.myAddress, sendSessionID, receiveSessionID) - sendAndReceive(TOPIC_INITIATE, notaryNode.address, 0, receiveSessionID, handshake) + sendAndReceive(TOPIC_INITIATE, notaryNode.address, 0, receiveSessionID, handshake) val request = SignRequest(wtx.serialized, serviceHub.storageService.myLegalIdentity) val response = sendAndReceive(TOPIC, notaryNode.address, sendSessionID, receiveSessionID, request) diff --git a/core/src/main/kotlin/protocols/NotaryChangeProtocol.kt b/core/src/main/kotlin/protocols/NotaryChangeProtocol.kt index 9acebf7aa6..665a1343a5 100644 --- a/core/src/main/kotlin/protocols/NotaryChangeProtocol.kt +++ b/core/src/main/kotlin/protocols/NotaryChangeProtocol.kt @@ -5,6 +5,7 @@ import com.r3corda.core.contracts.* import com.r3corda.core.crypto.DigitalSignature import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.signWithECDSA +import com.r3corda.core.messaging.Ack import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.NodeInfo import com.r3corda.core.protocols.ProtocolLogic @@ -104,7 +105,7 @@ object NotaryChangeProtocol { val proposal = Proposal(originalState.ref, newNotary, stx) val handshake = Handshake(sessionIdForSend, serviceHub.networkService.myAddress, sessionIdForReceive) - sendAndReceive(TOPIC_INITIATE, node.address, 0, sessionIdForReceive, handshake) + sendAndReceive(TOPIC_INITIATE, node.address, 0, sessionIdForReceive, handshake) val response = sendAndReceive(TOPIC_CHANGE, node.address, sessionIdForSend, sessionIdForReceive, proposal) val participantSignature = response.validate { diff --git a/core/src/test/kotlin/com/r3corda/core/serialization/KryoTests.kt b/core/src/test/kotlin/com/r3corda/core/serialization/KryoTests.kt index 3b5d29ee82..6adc5aa7e9 100644 --- a/core/src/test/kotlin/com/r3corda/core/serialization/KryoTests.kt +++ b/core/src/test/kotlin/com/r3corda/core/serialization/KryoTests.kt @@ -3,6 +3,7 @@ package com.r3corda.core.serialization import com.google.common.primitives.Ints import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.crypto.signWithECDSA +import com.r3corda.core.messaging.Ack import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.Test @@ -73,6 +74,14 @@ class KryoTests { assertThatThrownBy { deserialisedSignature.verifyWithECDSA(wrongBits) } } + @Test + fun `write and read Ack`() { + val tokenizableBefore = Ack + val serializedBytes = tokenizableBefore.serialize(kryo) + val tokenizableAfter = serializedBytes.deserialize(kryo) + assertThat(tokenizableAfter).isSameAs(tokenizableBefore) + } + private data class Person(val name: String, val birthday: Instant?) @Suppress("unused") diff --git a/node/src/main/kotlin/com/r3corda/node/services/NotaryChangeService.kt b/node/src/main/kotlin/com/r3corda/node/services/NotaryChangeService.kt index 7b13f01e44..ab0b6581aa 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/NotaryChangeService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/NotaryChangeService.kt @@ -1,5 +1,6 @@ package com.r3corda.node.services +import com.r3corda.core.messaging.Ack import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.node.services.api.AbstractNodeService @@ -17,11 +18,12 @@ class NotaryChangeService(net: MessagingService, val smm: StateMachineManager) : ) } - private fun handleChangeNotaryRequest(req: NotaryChangeProtocol.Handshake) { + private fun handleChangeNotaryRequest(req: NotaryChangeProtocol.Handshake): Ack { val protocol = NotaryChangeProtocol.Acceptor( req.replyTo as SingleMessageRecipient, req.sessionID!!, req.sessionIdForSend) smm.add(NotaryChangeProtocol.TOPIC_CHANGE, protocol) + return Ack } } diff --git a/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt b/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt index 10e0379643..0827dc1066 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/api/AbstractNodeService.kt @@ -3,6 +3,7 @@ package com.r3corda.node.services.api import com.r3corda.core.messaging.Message import com.r3corda.core.messaging.MessagingService import com.r3corda.core.node.services.TOPIC_DEFAULT_POSTFIX +import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize import com.r3corda.protocols.AbstractRequestMessage @@ -12,14 +13,15 @@ import javax.annotation.concurrent.ThreadSafe * Abstract superclass for services that a node can host, which provides helper functions. */ @ThreadSafe -abstract class AbstractNodeService(val net: MessagingService) { +abstract class AbstractNodeService(val net: MessagingService) : SingletonSerializeAsToken() { /** * Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of - * common boilerplate code. Exceptions are caught and passed to the provided consumer. + * common boilerplate code. Exceptions are caught and passed to the provided consumer. If you just want a simple + * acknowledgement response with no content, use [com.r3corda.core.messaging.Ack] * * @param topic the topic, without the default session ID postfix (".0) - * @param handler a function to handle the deserialised request and return a response + * @param handler a function to handle the deserialised request and return an optional response (if return type not Unit) * @param exceptionConsumer a function to which any thrown exception is passed. */ protected inline fun @@ -30,8 +32,11 @@ abstract class AbstractNodeService(val net: MessagingService) { try { val req = message.data.deserialize() val data = handler(req) - val msg = net.createMessage(topic + "." + req.sessionID, data.serialize().bits) - net.send(msg, req.replyTo) + // If the return type R is Unit, then do not send a response + if (data.javaClass != Unit.javaClass) { + val msg = net.createMessage("$topic.${req.sessionID}", data.serialize().bits) + net.send(msg, req.replyTo) + } } catch(e: Exception) { exceptionConsumer(message, e) } @@ -40,19 +45,15 @@ abstract class AbstractNodeService(val net: MessagingService) { /** * Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of - * common boilerplate code. Exceptions are propagated to the messaging layer. + * common boilerplate code. Exceptions are propagated to the messaging layer. If you just want a simple + * acknowledgement response with no content, use [com.r3corda.core.messaging.Ack] * * @param topic the topic, without the default session ID postfix (".0) - * @param handler a function to handle the deserialised request and return a response + * @param handler a function to handle the deserialised request and return an optional response (if return type not Unit) */ protected inline fun addMessageHandler(topic: String, crossinline handler: (Q) -> R) { - net.addMessageHandler(topic + TOPIC_DEFAULT_POSTFIX, null) { message, r -> - val req = message.data.deserialize() - val data = handler(req) - val msg = net.createMessage(topic + "." + req.sessionID, data.serialize().bits) - net.send(msg, req.replyTo) - } + addMessageHandler(topic, handler, { message: Message, exception: Exception -> throw exception }) } } diff --git a/node/src/main/kotlin/com/r3corda/node/services/transactions/NotaryService.kt b/node/src/main/kotlin/com/r3corda/node/services/transactions/NotaryService.kt index b845b36675..c549734ad0 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/transactions/NotaryService.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/transactions/NotaryService.kt @@ -1,5 +1,6 @@ package com.r3corda.node.services.transactions +import com.r3corda.core.messaging.Ack import com.r3corda.core.messaging.MessagingService import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.services.ServiceType @@ -35,12 +36,13 @@ abstract class NotaryService(val smm: StateMachineManager, ) } - private fun processRequest(req: NotaryProtocol.Handshake) { + private fun processRequest(req: NotaryProtocol.Handshake): Ack { val protocol = protocolFactory.create(req.replyTo as SingleMessageRecipient, req.sessionID!!, req.sendSessionID, timestampChecker, uniquenessProvider) smm.add(NotaryProtocol.TOPIC, protocol) + return Ack } }