diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessageDeduplicatorTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessageDeduplicatorTest.kt index b7e60f1cca..5e68ace6dd 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessageDeduplicatorTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessageDeduplicatorTest.kt @@ -89,7 +89,7 @@ class P2PMessageDeduplicatorTest { processMessage(sessionInitMessage) val sessionDataAfterSessionInit = database.transaction { - entityManager.find(P2PMessageDeduplicator.SessionData::class.java, SESSION_ID.value) + entityManager.find(P2PMessageDeduplicator.SessionData::class.java, SESSION_ID.toHex()) } assertThat(sessionDataAfterSessionInit.firstSenderSeqNo).isEqualTo(FIRST_SENDER_SEQ_NO) assertThat(sessionDataAfterSessionInit.lastSenderSeqNo).isNull() @@ -100,7 +100,7 @@ class P2PMessageDeduplicatorTest { } val sessionDataAfterSessionEnd = database.transaction { - entityManager.find(P2PMessageDeduplicator.SessionData::class.java, SESSION_ID.value) + entityManager.find(P2PMessageDeduplicator.SessionData::class.java, SESSION_ID.toHex()) } assertThat(sessionDataAfterSessionEnd.firstSenderSeqNo).isEqualTo(FIRST_SENDER_SEQ_NO) assertThat(sessionDataAfterSessionEnd.lastSenderSeqNo).isEqualTo(LAST_SENDER_SEQ_NO) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessageIdentifier.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessageIdentifier.kt index 273bd68daa..9eed4f1a93 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessageIdentifier.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessageIdentifier.kt @@ -2,7 +2,6 @@ package net.corda.node.services.messaging import net.corda.node.services.statemachine.MessageType import net.corda.node.services.statemachine.SessionId -import java.math.BigInteger import java.time.Instant /** @@ -31,23 +30,22 @@ data class MessageIdentifier( companion object { const val SHARD_SIZE_IN_CHARS = 8 const val LONG_SIZE_IN_HEX = 16 // 64 / 4 - const val SESSION_ID_SIZE_IN_HEX = SessionId.MAX_BIT_SIZE / 4 - const val HEX_RADIX = 16 + private const val HEX_RADIX = 16 fun parse(id: String): MessageIdentifier { val prefix = id.substring(0, 2) val messageType = MessageType.fromPrefix(prefix) val timestamp = java.lang.Long.parseUnsignedLong(id.substring(3, 19), HEX_RADIX) val shardIdentifier = id.substring(20, 28) - val sessionId = BigInteger(id.substring(29, 61), HEX_RADIX) + val sessionId = SessionId.fromHex(id.substring(29, 61)) val sessionSequenceNumber = Integer.parseInt(id.substring(62), HEX_RADIX) - return MessageIdentifier(messageType, shardIdentifier, SessionId(sessionId), sessionSequenceNumber, Instant.ofEpochMilli(timestamp)) + return MessageIdentifier(messageType, shardIdentifier, sessionId, sessionSequenceNumber, Instant.ofEpochMilli(timestamp)) } } override fun toString(): String { val prefix = messageType.prefix - val encodedSessionIdentifier = String.format("%1$0${SESSION_ID_SIZE_IN_HEX}X", sessionIdentifier.value) + val encodedSessionIdentifier = sessionIdentifier.toHex() val encodedSequenceNumber = Integer.toHexString(sessionSequenceNumber).toUpperCase() val encodedTimestamp = String.format("%1$0${LONG_SIZE_IN_HEX}X", timestamp.toEpochMilli()) return "$prefix-$encodedTimestamp-$shardIdentifier-$encodedSessionIdentifier-$encodedSequenceNumber" diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt index 24524d3edf..becc12e116 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessageDeduplicator.kt @@ -41,15 +41,15 @@ class P2PMessageDeduplicator(cacheFactory: NamedCacheFactory, private val databa */ private val sessionData = createSessionDataMap(cacheFactory) - private fun createSessionDataMap(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap { + private fun createSessionDataMap(cacheFactory: NamedCacheFactory): AppendOnlyPersistentMap { return AppendOnlyPersistentMap( cacheFactory = cacheFactory, name = "P2PMessageDeduplicator_sessionData", - toPersistentEntityKey = { it.value }, - fromPersistentEntity = { Pair(SessionId(it.sessionId), MessageMeta(it.generationTime, it.senderHash, it.firstSenderSeqNo, it.lastSenderSeqNo)) }, + toPersistentEntityKey = { it.toHex() }, + fromPersistentEntity = { Pair(SessionId.fromHex(it.sessionId), MessageMeta(it.generationTime, it.senderHash, it.firstSenderSeqNo, it.lastSenderSeqNo)) }, toPersistentEntity = { key: SessionId, value: MessageMeta -> SessionData().apply { - sessionId = key.value + sessionId = key.toHex() generationTime = value.generationTime senderHash = value.senderHash firstSenderSeqNo = value.firstSenderSeqNo @@ -132,7 +132,7 @@ class P2PMessageDeduplicator(cacheFactory: NamedCacheFactory, private val databa val criteriaUpdate = criteriaBuilder.createCriteriaUpdate(SessionData::class.java) val queryRoot = criteriaUpdate.from(SessionData::class.java) criteriaUpdate.set(SessionData::lastSenderSeqNo.name, value.lastSenderSeqNo) - criteriaUpdate.where(criteriaBuilder.equal(queryRoot.get(SessionData::sessionId.name), key.value)) + criteriaUpdate.where(criteriaBuilder.equal(queryRoot.get(SessionData::sessionId.name), key.toHex())) val update = session.createQuery(criteriaUpdate) val rowsUpdated = update.executeUpdate() return rowsUpdated != 0 @@ -142,9 +142,12 @@ class P2PMessageDeduplicator(cacheFactory: NamedCacheFactory, private val databa @Suppress("MagicNumber") // database column width @Table(name = "${NODE_DATABASE_PREFIX}session_data") class SessionData ( + /** + * The session identifier in hexadecimal form. + */ @Id @Column(name = "session_id", nullable = false) - var sessionId: BigInteger = BigInteger.ZERO, + var sessionId: String = "", /** * The time the corresponding session-init message was originally generated on the sender side. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt index 15b9868197..f72ef86fe4 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt @@ -40,11 +40,23 @@ data class SessionId(val value: BigInteger) { SessionId(this.value.plus(BigInteger.ONE)) } + fun toHex(): String { + return String.format("%1$0${SESSION_ID_SIZE_IN_HEX}X", value) + } + companion object { const val MAX_BIT_SIZE = 128 + const val SESSION_ID_SIZE_IN_HEX = MAX_BIT_SIZE / 4 val LARGEST_SESSION_ID = BigInteger.valueOf(2).pow(MAX_BIT_SIZE).minus(BigInteger.ONE) fun createRandom(secureRandom: SecureRandom) = SessionId(BigInteger(MAX_BIT_SIZE, secureRandom)) + + @Suppress("MagicNumber") + fun fromHex(hexValue: String): SessionId { + require(hexValue.length == SESSION_ID_SIZE_IN_HEX) { "A session identifier in hex form must be $SESSION_ID_SIZE_IN_HEX characters long" } + val value = BigInteger(hexValue, 16) + return SessionId(value) + } } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index ba8d7f3275..6f8ba80148 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -38,9 +38,6 @@ import net.corda.node.internal.InitiatedFlowFactory import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.messaging.DeduplicationHandler -import net.corda.node.services.messaging.MessageIdentifier -import net.corda.node.services.messaging.SenderSequenceNumber -import net.corda.node.services.messaging.SenderUUID import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.currentStateMachine import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor @@ -707,8 +704,7 @@ internal class SingleThreadedStateMachineManager( val sender = serviceHub.networkMapCache.getPeerByLegalName(peer) if (sender != null) { when (sessionMessage) { - is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, sender, event, - event.receivedMessage.uniqueMessageId, event.receivedMessage.senderUUID, event.receivedMessage.senderSeqNo) + is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, sender, event) is InitialSessionMessage -> onSessionInit(sessionMessage, sender, event) } } else { @@ -721,10 +717,7 @@ internal class SingleThreadedStateMachineManager( private fun onExistingSessionMessage( sessionMessage: ExistingSessionMessage, sender: Party, - externalEvent: ExternalEvent.ExternalMessageEvent, - messageIdentifier: MessageIdentifier, - senderUUID: SenderUUID?, - senderSequenceNumber: SenderSequenceNumber? + externalEvent: ExternalEvent.ExternalMessageEvent ) { try { val deduplicationHandler = externalEvent.deduplicationHandler @@ -742,7 +735,8 @@ internal class SingleThreadedStateMachineManager( logger.info("Cannot find flow corresponding to session ID - $recipientId.") } } else { - val event = Event.DeliverSessionMessage(sessionMessage, deduplicationHandler, sender, messageIdentifier, senderUUID, senderSequenceNumber) + val event = Event.DeliverSessionMessage(sessionMessage, deduplicationHandler, sender, + externalEvent.receivedMessage.uniqueMessageId, externalEvent.receivedMessage.senderUUID, externalEvent.receivedMessage.senderSeqNo) innerState.withLock { flows[flowId]?.run { fiber.scheduleEvent(event) } // If flow is not running add it to the list of external events to be processed if/when the flow resumes. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index 91ecc1477f..25d647ec29 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -354,7 +354,7 @@ sealed class SessionState { * Returns the new form of the state */ fun bufferMessage(messageIdentifier: MessageIdentifier, messagePayload: ExistingSessionMessagePayload): SessionState { - return this.copy(bufferedMessages = bufferedMessages + Pair(messageIdentifier, messagePayload), nextSendingSeqNumber = nextSendingSeqNumber + 1) + return bufferMessages(listOf(messageIdentifier to messagePayload)) } /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 7a7b6da50f..4e1d84e4ea 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -245,7 +245,7 @@ class TopLevelTransition( checkpointState = checkpoint.checkpointState.copy( numberOfSuspends = checkpoint.checkpointState.numberOfSuspends + 1, numberOfCommits = checkpoint.checkpointState.numberOfCommits + 1, - suspensionTime = context.time + suspensionTime = context.time ), flowState = FlowState.Finished, result = event.returnValue, diff --git a/node/src/main/resources/migration/node-core.changelog-v21.xml b/node/src/main/resources/migration/node-core.changelog-v21.xml index bf5bf67081..581ca4dbe0 100644 --- a/node/src/main/resources/migration/node-core.changelog-v21.xml +++ b/node/src/main/resources/migration/node-core.changelog-v21.xml @@ -6,7 +6,7 @@ - + diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/SessionIdTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/SessionIdTest.kt index 2be1281a07..ad3307e4b0 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/SessionIdTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/SessionIdTest.kt @@ -39,4 +39,16 @@ class SessionIdTest { assertThat(initiatedSessionId.value.toLong()).isEqualTo(0) } + @Test(timeout=300_000) + fun `conversion from and to hex form works properly`() { + val sessionId = SessionId(BigInteger.valueOf(42)) + val sessionIdHexForm = "0000000000000000000000000000002A" + + assertThat(sessionId.toHex()).isEqualTo(sessionIdHexForm) + assertThat(SessionId.fromHex(sessionIdHexForm)).isEqualTo(sessionId) + assertThatThrownBy { SessionId.fromHex("2A") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining("A session identifier in hex form must be 32 characters long") + } + } \ No newline at end of file