From 673f02d635af3253d7106533e5a4f0b28ee6d68e Mon Sep 17 00:00:00 2001 From: Dimos Raptis Date: Thu, 24 Sep 2020 12:01:29 +0100 Subject: [PATCH] Address Rick's comments --- .../messaging/P2PMessageDeduplicatorTest.kt | 4 +-- .../services/messaging/MessageIdentifier.kt | 34 ++----------------- .../node/services/messaging/Messaging.kt | 6 ++-- .../services/messaging/P2PMessagingClient.kt | 4 +-- .../services/statemachine/SessionMessage.kt | 21 ++++++++---- .../statemachine/sharding/ShardIdGenerator.kt | 12 +++++++ .../transitions/StartedFlowTransition.kt | 8 ++--- .../node/internal/MockNodeMessagingService.kt | 2 +- 8 files changed, 40 insertions(+), 51 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/services/statemachine/sharding/ShardIdGenerator.kt diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessageDeduplicatorTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessageDeduplicatorTest.kt index 83db356a8d..b7e60f1cca 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessageDeduplicatorTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessageDeduplicatorTest.kt @@ -5,9 +5,9 @@ import net.corda.core.utilities.ByteSequence import net.corda.node.services.messaging.MessageIdentifier import net.corda.node.services.messaging.P2PMessageDeduplicator import net.corda.node.services.messaging.ReceivedMessage -import net.corda.node.services.messaging.generateShardId import net.corda.node.services.statemachine.MessageType import net.corda.node.services.statemachine.SessionId +import net.corda.node.services.statemachine.sharding.ShardIdGenerator import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.testing.core.SerializationEnvironmentRule @@ -29,7 +29,7 @@ class P2PMessageDeduplicatorTest { companion object { private const val TOPIC = "whatever" private val DATA = ByteSequence.of("blah blah blah".toByteArray()) - private val SHARD_ID = generateShardId("some-flow-id") + private val SHARD_ID = ShardIdGenerator.generateShardId("some-flow-id") private val SESSION_ID = SessionId(BigInteger.ONE) private val TIMESTAMP = Instant.now() private val SENDER = CordaX500Name("CordaWorld", "The Sea Devil", "NeverLand", "NL") diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessageIdentifier.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessageIdentifier.kt index d5fbc91aaf..273bd68daa 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessageIdentifier.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessageIdentifier.kt @@ -1,10 +1,7 @@ package net.corda.node.services.messaging -import net.corda.core.crypto.SecureHash -import net.corda.node.services.messaging.MessageIdentifier.Companion.SHARD_SIZE_IN_CHARS import net.corda.node.services.statemachine.MessageType import net.corda.node.services.statemachine.SessionId -import java.lang.IllegalStateException import java.math.BigInteger import java.time.Instant @@ -39,40 +36,17 @@ data class MessageIdentifier( fun parse(id: String): MessageIdentifier { val prefix = id.substring(0, 2) - val messageType = prefixToMessageType(prefix) + val messageType = MessageType.fromPrefix(prefix) val timestamp = java.lang.Long.parseUnsignedLong(id.substring(3, 19), HEX_RADIX) val shardIdentifier = id.substring(20, 28) val sessionId = BigInteger(id.substring(29, 61), HEX_RADIX) val sessionSequenceNumber = Integer.parseInt(id.substring(62), HEX_RADIX) return MessageIdentifier(messageType, shardIdentifier, SessionId(sessionId), sessionSequenceNumber, Instant.ofEpochMilli(timestamp)) } - - private fun messageTypeToPrefix(messageType: MessageType): String { - return when(messageType) { - MessageType.SESSION_INIT -> "XI" - MessageType.SESSION_CONFIRM -> "XC" - MessageType.SESSION_REJECT -> "XR" - MessageType.DATA_MESSAGE -> "XD" - MessageType.SESSION_END -> "XE" - MessageType.SESSION_ERROR -> "XX" - } - } - - private fun prefixToMessageType(prefix: String): MessageType { - return when(prefix) { - "XI" -> MessageType.SESSION_INIT - "XC" -> MessageType.SESSION_CONFIRM - "XR" -> MessageType.SESSION_REJECT - "XD" -> MessageType.DATA_MESSAGE - "XE" -> MessageType.SESSION_END - "XX" -> MessageType.SESSION_ERROR - else -> throw IllegalStateException("Invalid prefix: $prefix") - } - } } override fun toString(): String { - val prefix = messageTypeToPrefix(messageType) + val prefix = messageType.prefix val encodedSessionIdentifier = String.format("%1$0${SESSION_ID_SIZE_IN_HEX}X", sessionIdentifier.value) val encodedSequenceNumber = Integer.toHexString(sessionSequenceNumber).toUpperCase() val encodedTimestamp = String.format("%1$0${LONG_SIZE_IN_HEX}X", timestamp.toEpochMilli()) @@ -81,10 +55,6 @@ data class MessageIdentifier( } -fun generateShardId(flowIdentifier: String): String { - return SecureHash.sha256(flowIdentifier).prefixChars(SHARD_SIZE_IN_CHARS) -} - /** * A unique identifier for a sender that might be different across restarts. * It is used to help identify when messages are being sent continuously without errors or message are sent after the sender recovered from an error. diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt index 7e342b6eea..829f1b5610 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/Messaging.kt @@ -94,11 +94,11 @@ interface MessagingService : ServiceLifecycleSupport { * Signal that a session has ended to the messaging layer, so that any necessary cleanup is performed. * * @param sessionId the identifier of the session that ended. - * @param senderUUID the sender UUID of the last message seen in the session or null if there was no sender UUID in that message. - * @param senderSequenceNumber the sender sequence number of the last message seen in the session or null if there was no sender sequence number in that message. + * @param theirSenderUUID the sender UUID last seen from the other side in this session or null if there was no sender UUID seen at all. + * @param senderSequenceNumber the last sender sequence number seen from the other side in the session or null if there was no sender sequence number seen at all. */ @Suspendable - fun sessionEnded(sessionId: SessionId, senderUUID: SenderUUID?, senderSequenceNumber: SenderSequenceNumber?) + fun sessionEnded(sessionId: SessionId, theirSenderUUID: SenderUUID?, senderSequenceNumber: SenderSequenceNumber?) /** * Returns an initialised [Message] with the current time, etc, already filled in. diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 3fc9562c7e..bc106ec069 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -551,8 +551,8 @@ class P2PMessagingClient(val config: NodeConfiguration, } @Suspendable - override fun sessionEnded(sessionId: SessionId, senderUUID: SenderUUID?, senderSequenceNumber: SenderSequenceNumber?) { - deduplicator.signalSessionEnd(sessionId, senderUUID, senderSequenceNumber) + override fun sessionEnded(sessionId: SessionId, theirSenderUUID: SenderUUID?, senderSequenceNumber: SenderSequenceNumber?) { + deduplicator.signalSessionEnd(sessionId, theirSenderUUID, senderSequenceNumber) } override fun resolveTargetToArtemisQueue(address: MessageRecipients): String { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt index ec76925ffb..15b9868197 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt @@ -4,6 +4,7 @@ import net.corda.core.flows.FlowException import net.corda.core.flows.FlowInfo import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SerializedBytes +import java.lang.IllegalStateException import java.math.BigInteger import java.security.SecureRandom @@ -136,15 +137,21 @@ data class RejectSessionMessage(val message: String, val errorId: Long) : Existi */ object EndSessionMessage : ExistingSessionMessagePayload() -enum class MessageType { - SESSION_INIT, - SESSION_CONFIRM, - SESSION_REJECT, - DATA_MESSAGE, - SESSION_END, - SESSION_ERROR; +enum class MessageType(val prefix: String) { + SESSION_INIT("XI"), + SESSION_CONFIRM("XC"), + SESSION_REJECT("XR"), + DATA_MESSAGE("XD"), + SESSION_END("XE"), + SESSION_ERROR("XX"); companion object { + private val reverseMap = values().associateBy(MessageType::prefix) + + fun fromPrefix(prefix: String): MessageType { + return reverseMap[prefix] ?: throw IllegalStateException("Invalid prefix: $prefix") + } + fun inferFromMessage(message: SessionMessage): MessageType { return when (message) { is InitialSessionMessage -> SESSION_INIT diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/sharding/ShardIdGenerator.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/sharding/ShardIdGenerator.kt new file mode 100644 index 0000000000..ccc01663ce --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/sharding/ShardIdGenerator.kt @@ -0,0 +1,12 @@ +package net.corda.node.services.statemachine.sharding + +import net.corda.core.crypto.SecureHash +import net.corda.node.services.messaging.MessageIdentifier + +class ShardIdGenerator { + companion object { + fun generateShardId(flowIdentifier: String): String { + return SecureHash.sha256(flowIdentifier).prefixChars(MessageIdentifier.SHARD_SIZE_IN_CHARS) + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index c42ac0eaf9..94b4948e48 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -12,8 +12,8 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.toNonEmptySet import net.corda.node.services.messaging.MessageIdentifier import net.corda.node.services.messaging.SenderDeduplicationInfo -import net.corda.node.services.messaging.generateShardId import net.corda.node.services.statemachine.* +import net.corda.node.services.statemachine.sharding.ShardIdGenerator import org.slf4j.Logger import kotlin.collections.LinkedHashMap @@ -184,7 +184,7 @@ class StartedFlowTransition( val sinkSessionId = (sessionState as SessionState.Initiated).peerSinkSessionId val message = ExistingSessionMessage(sinkSessionId, EndSessionMessage) val messageType = MessageType.inferFromMessage(message) - val messageIdentifier = MessageIdentifier(messageType, generateShardId(context.id.toString()), sinkSessionId, sessionState.nextSendingSeqNumber, currentState.checkpoint.checkpointState.suspensionTime) + val messageIdentifier = MessageIdentifier(messageType, ShardIdGenerator.generateShardId(context.id.toString()), sinkSessionId, sessionState.nextSendingSeqNumber, currentState.checkpoint.checkpointState.suspensionTime) Action.SendExisting(sessionState.peerParty, message, SenderDeduplicationInfo(messageIdentifier, currentState.senderUUID)) } val signalSessionsEndMap = existingSessionsToRemove.map { (sessionId, _) -> @@ -290,7 +290,7 @@ class StartedFlowTransition( if (sessionState !is SessionState.Uninitiated) { continue } - val shardId = generateShardId(context.id.toString()) + val shardId = ShardIdGenerator.generateShardId(context.id.toString()) val counterpartySessionId = sourceSessionId.calculateInitiatedSessionId() val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, null) val newSessionState = SessionState.Initiating( @@ -333,7 +333,7 @@ class StartedFlowTransition( val sendInitialActions = messagesByType[SessionState.Uninitiated::class]?.mapNotNull { (sourceSessionId, sessionState, message) -> val uninitiatedSessionState = sessionState as SessionState.Uninitiated - val shardId = generateShardId(context.id.toString()) + val shardId = ShardIdGenerator.generateShardId(context.id.toString()) if (sessionState.hasBeenAcknowledged != null) { newSessions[sourceSessionId] = SessionState.Initiated( peerParty = sessionState.hasBeenAcknowledged.first, diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt index 5a10a2a71c..3bc04b40c7 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockNodeMessagingService.kt @@ -169,7 +169,7 @@ class MockNodeMessagingService(private val configuration: NodeConfiguration, } @Suspendable - override fun sessionEnded(sessionId: SessionId, senderUUID: SenderUUID?, senderSequenceNumber: SenderSequenceNumber?) { + override fun sessionEnded(sessionId: SessionId, theirSenderUUID: SenderUUID?, senderSequenceNumber: SenderSequenceNumber?) { // nothing to do here. }