From 30b7eec18c554584427cb5a23cda79bfa198d807 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Wed, 19 Oct 2016 15:39:39 +0100 Subject: [PATCH] Remove the file based checkpoints and transactions from the tests. Messages requiring redelivery to late registered handler persisted in database. Remove spurious comment and make local val not var --- .../kotlin/com/r3corda/node/api/APIServer.kt | 2 +- .../com/r3corda/node/internal/AbstractNode.kt | 6 +- .../services/messaging/NodeMessagingClient.kt | 18 ++-- .../persistence/PerFileCheckpointStorage.kt | 73 ------------- .../persistence/PerFileTransactionStorage.kt | 70 ------------ .../messaging/TwoPartyTradeProtocolTests.kt | 14 ++- .../node/services/NodeSchedulerServiceTest.kt | 4 +- .../PerFileCheckpointStorageTests.kt | 99 ----------------- .../PerFileTransactionStorageTests.kt | 100 ------------------ .../statemachine/StateMachineManagerTests.kt | 25 ++++- .../testing/node/InMemoryMessagingNetwork.kt | 18 ++-- .../com/r3corda/testing/node/MockNode.kt | 13 --- 12 files changed, 53 insertions(+), 389 deletions(-) delete mode 100644 node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileCheckpointStorage.kt delete mode 100644 node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt delete mode 100644 node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileCheckpointStorageTests.kt delete mode 100644 node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt diff --git a/node/src/main/kotlin/com/r3corda/node/api/APIServer.kt b/node/src/main/kotlin/com/r3corda/node/api/APIServer.kt index 1245d7c846..60d7f75aa7 100644 --- a/node/src/main/kotlin/com/r3corda/node/api/APIServer.kt +++ b/node/src/main/kotlin/com/r3corda/node/api/APIServer.kt @@ -43,7 +43,7 @@ interface APIServer { fun status(): Response /** - * Report this nodes configuration and identities. + * Report this node's configuration and identities. * Currently tunnels the NodeInfo as an encoding of the Kryo serialised form. * TODO this functionality should be available via the RPC */ diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index d2cc9c71cf..a9d023fe6b 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -437,13 +437,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo protected abstract fun startMessagingService(cordaRPCOps: CordaRPCOps) - protected open fun initialiseCheckpointService(dir: Path): CheckpointStorage { - return DBCheckpointStorage() - } - protected open fun initialiseStorageService(dir: Path): Pair { val attachments = makeAttachmentStorage(dir) - val checkpointStorage = initialiseCheckpointService(dir) + val checkpointStorage = DBCheckpointStorage() val transactionStorage = DBTransactionStorage() _servicesThatAcceptUploads += attachments // Populate the partyKeys set. diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt index d69663bc53..b22e35772a 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt @@ -86,8 +86,7 @@ class NodeMessagingClient(config: NodeConfiguration, var rpcConsumer: ClientConsumer? = null var rpcNotificationConsumer: ClientConsumer? = null - // TODO: This is not robust and needs to be replaced by more intelligently using the message queue server. - var undeliveredMessages = listOf() + var pendingRedelivery = JDBCHashSet("pending_messages",loadOnInit = true) } /** A registration to handle messages of different types */ @@ -254,10 +253,10 @@ class NodeMessagingClient(config: NodeConfiguration, // without causing log spam. log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet") - // This is a hack; transient messages held in memory isn't crash resistant. - // TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use. state.locked { - undeliveredMessages += msg + databaseTransaction(database) { + pendingRedelivery.add(msg) + } } return false } @@ -371,9 +370,12 @@ class NodeMessagingClient(config: NodeConfiguration, val handler = Handler(topicSession, callback) handlers.add(handler) val messagesToRedeliver = state.locked { - val messagesToRedeliver = undeliveredMessages - undeliveredMessages = listOf() - messagesToRedeliver + val pending = ArrayList() + databaseTransaction(database) { + pending.addAll(pendingRedelivery) + pendingRedelivery.clear() + } + pending } messagesToRedeliver.forEach { deliver(it) } return handler diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileCheckpointStorage.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileCheckpointStorage.kt deleted file mode 100644 index 8d515a09cc..0000000000 --- a/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileCheckpointStorage.kt +++ /dev/null @@ -1,73 +0,0 @@ -package com.r3corda.node.services.persistence - -import com.r3corda.core.serialization.SerializedBytes -import com.r3corda.core.serialization.deserialize -import com.r3corda.core.serialization.serialize -import com.r3corda.core.utilities.loggerFor -import com.r3corda.core.utilities.trace -import com.r3corda.node.services.api.Checkpoint -import com.r3corda.node.services.api.CheckpointStorage -import java.nio.file.Files -import java.nio.file.Path -import java.nio.file.StandardCopyOption -import java.util.* -import java.util.Collections.synchronizedMap -import javax.annotation.concurrent.ThreadSafe - - -/** - * File-based checkpoint storage, storing checkpoints per file. - */ -@ThreadSafe -class PerFileCheckpointStorage(val storeDir: Path) : CheckpointStorage { - - companion object { - private val logger = loggerFor() - private val fileExtension = ".checkpoint" - } - - private val checkpointFiles = synchronizedMap(IdentityHashMap()) - - init { - logger.trace { "Initialising per file checkpoint storage on $storeDir" } - Files.createDirectories(storeDir) - Files.list(storeDir) - .filter { it.toString().toLowerCase().endsWith(fileExtension) } - .forEach { - val checkpoint = Files.readAllBytes(it).deserialize() - checkpointFiles[checkpoint] = it - } - } - - override fun addCheckpoint(checkpoint: Checkpoint) { - val fileName = "${checkpoint.id.toString().toLowerCase()}$fileExtension" - val checkpointFile = storeDir.resolve(fileName) - atomicWrite(checkpointFile, checkpoint.serialize()) - logger.trace { "Stored $checkpoint to $checkpointFile" } - checkpointFiles[checkpoint] = checkpointFile - } - - private fun atomicWrite(checkpointFile: Path, serialisedCheckpoint: SerializedBytes) { - val tempCheckpointFile = checkpointFile.parent.resolve("${checkpointFile.fileName}.tmp") - serialisedCheckpoint.writeToFile(tempCheckpointFile) - Files.move(tempCheckpointFile, checkpointFile, StandardCopyOption.ATOMIC_MOVE) - } - - override fun removeCheckpoint(checkpoint: Checkpoint) { - val checkpointFile = checkpointFiles.remove(checkpoint) - require(checkpointFile != null) { "Trying to removing unknown checkpoint: $checkpoint" } - Files.delete(checkpointFile) - logger.trace { "Removed $checkpoint ($checkpointFile)" } - } - - override fun forEach(block: (Checkpoint)->Boolean) { - synchronized(checkpointFiles) { - for(checkpoint in checkpointFiles.keys) { - if (!block(checkpoint)) { - break - } - } - } - } - -} diff --git a/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt b/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt deleted file mode 100644 index 0e320cbabf..0000000000 --- a/node/src/main/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorage.kt +++ /dev/null @@ -1,70 +0,0 @@ -package com.r3corda.node.services.persistence - -import com.r3corda.core.ThreadBox -import com.r3corda.core.bufferUntilSubscribed -import com.r3corda.core.crypto.SecureHash -import com.r3corda.core.node.services.TransactionStorage -import com.r3corda.core.serialization.deserialize -import com.r3corda.core.serialization.serialize -import com.r3corda.core.transactions.SignedTransaction -import com.r3corda.core.utilities.loggerFor -import com.r3corda.core.utilities.trace -import rx.Observable -import rx.subjects.PublishSubject -import java.nio.file.Files -import java.nio.file.Path -import java.util.* -import javax.annotation.concurrent.ThreadSafe - -/** - * File-based transaction storage, storing transactions per file. - */ -@ThreadSafe -class PerFileTransactionStorage(val storeDir: Path) : TransactionStorage { - companion object { - private val logger = loggerFor() - private val fileExtension = ".transaction" - } - - private val mutex = ThreadBox(object { - val transactionsMap = HashMap() - val updatesPublisher = PublishSubject.create() - - fun notify(transaction: SignedTransaction) = updatesPublisher.onNext(transaction) - }) - - override val updates: Observable - get() = mutex.content.updatesPublisher - - init { - logger.trace { "Initialising per file transaction storage on $storeDir" } - Files.createDirectories(storeDir) - mutex.locked { - Files.list(storeDir) - .filter { it.toString().toLowerCase().endsWith(fileExtension) } - .map { Files.readAllBytes(it).deserialize() } - .forEach { transactionsMap[it.id] = it } - } - } - - override fun addTransaction(transaction: SignedTransaction) { - val transactionFile = storeDir.resolve("${transaction.id.toString().toLowerCase()}$fileExtension") - transaction.serialize().writeToFile(transactionFile) - mutex.locked { - transactionsMap[transaction.id] = transaction - notify(transaction) - } - logger.trace { "Stored $transaction to $transactionFile" } - } - - override fun getTransaction(id: SecureHash): SignedTransaction? = mutex.locked { transactionsMap[id] } - - val transactions: Iterable get() = mutex.locked { transactionsMap.values.toList() } - - override fun track(): Pair, Observable> { - return mutex.locked { - Pair(transactionsMap.values.toList(), updates.bufferUntilSubscribed()) - } - } - -} diff --git a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt index ec90d92797..1aa6d712a4 100644 --- a/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/messaging/TwoPartyTradeProtocolTests.kt @@ -155,16 +155,14 @@ class TwoPartyTradeProtocolTests { bobNode.pumpReceive() // OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature. - assertThat(bobNode.checkpointStorage.checkpoints()).hasSize(1) + databaseTransaction(bobNode.database) { + assertThat(bobNode.checkpointStorage.checkpoints()).hasSize(1) + } val storage = bobNode.storage.validatedTransactions - val bobTransactionsBeforeCrash = if (storage is PerFileTransactionStorage) { - storage.transactions - } else if (storage is DBTransactionStorage) { - databaseTransaction(bobNode.database) { - storage.transactions - } - } else throw IllegalArgumentException("Unknown storage implementation") + val bobTransactionsBeforeCrash = databaseTransaction(bobNode.database) { + (storage as DBTransactionStorage).transactions + } assertThat(bobTransactionsBeforeCrash).isNotEmpty() // .. and let's imagine that Bob's computer has a power cut. He now has nothing now beyond what was on disk. diff --git a/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt index 48c7b24d7b..cf7194eb1c 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/NodeSchedulerServiceTest.kt @@ -12,7 +12,7 @@ import com.r3corda.core.protocols.ProtocolLogicRefFactory import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.utilities.DUMMY_NOTARY import com.r3corda.node.services.events.NodeSchedulerService -import com.r3corda.node.services.persistence.PerFileCheckpointStorage +import com.r3corda.node.services.persistence.DBCheckpointStorage import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.utilities.AddOrRemove import com.r3corda.node.utilities.AffinityExecutor @@ -88,7 +88,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() { } scheduler = NodeSchedulerService(database, services, factory, schedulerGatedExecutor) smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1) - val mockSMM = StateMachineManager(services, listOf(services, scheduler), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor, database) + val mockSMM = StateMachineManager(services, listOf(services, scheduler), DBCheckpointStorage(), smmExecutor, database) mockSMM.changes.subscribe { change -> if (change.addOrRemove == AddOrRemove.REMOVE && mockSMM.allStateMachines.isEmpty()) { smmHasRemovedAllProtocols.countDown() diff --git a/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileCheckpointStorageTests.kt b/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileCheckpointStorageTests.kt deleted file mode 100644 index 65311009e2..0000000000 --- a/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileCheckpointStorageTests.kt +++ /dev/null @@ -1,99 +0,0 @@ -package com.r3corda.node.services.persistence - -import com.google.common.jimfs.Configuration.unix -import com.google.common.jimfs.Jimfs -import com.google.common.primitives.Ints -import com.r3corda.core.serialization.SerializedBytes -import com.r3corda.node.services.api.Checkpoint -import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.assertThatExceptionOfType -import org.junit.After -import org.junit.Before -import org.junit.Test -import java.nio.file.FileSystem -import java.nio.file.Files -import java.nio.file.Path - -class PerFileCheckpointStorageTests { - - val fileSystem: FileSystem = Jimfs.newFileSystem(unix()) - val storeDir: Path = fileSystem.getPath("store") - lateinit var checkpointStorage: PerFileCheckpointStorage - - @Before - fun setUp() { - newCheckpointStorage() - } - - @After - fun cleanUp() { - fileSystem.close() - } - - @Test - fun `add new checkpoint`() { - val checkpoint = newCheckpoint() - checkpointStorage.addCheckpoint(checkpoint) - assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint) - newCheckpointStorage() - assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint) - } - - @Test - fun `remove checkpoint`() { - val checkpoint = newCheckpoint() - checkpointStorage.addCheckpoint(checkpoint) - checkpointStorage.removeCheckpoint(checkpoint) - assertThat(checkpointStorage.checkpoints()).isEmpty() - newCheckpointStorage() - assertThat(checkpointStorage.checkpoints()).isEmpty() - } - - @Test - fun `remove unknown checkpoint`() { - val checkpoint = newCheckpoint() - assertThatExceptionOfType(IllegalArgumentException::class.java).isThrownBy { - checkpointStorage.removeCheckpoint(checkpoint) - } - } - - @Test - fun `add two checkpoints then remove first one`() { - val firstCheckpoint = newCheckpoint() - checkpointStorage.addCheckpoint(firstCheckpoint) - val secondCheckpoint = newCheckpoint() - checkpointStorage.addCheckpoint(secondCheckpoint) - checkpointStorage.removeCheckpoint(firstCheckpoint) - assertThat(checkpointStorage.checkpoints()).containsExactly(secondCheckpoint) - newCheckpointStorage() - assertThat(checkpointStorage.checkpoints()).containsExactly(secondCheckpoint) - } - - @Test - fun `add checkpoint and then remove after 'restart'`() { - val originalCheckpoint = newCheckpoint() - checkpointStorage.addCheckpoint(originalCheckpoint) - newCheckpointStorage() - val reconstructedCheckpoint = checkpointStorage.checkpoints().single() - assertThat(reconstructedCheckpoint).isEqualTo(originalCheckpoint).isNotSameAs(originalCheckpoint) - checkpointStorage.removeCheckpoint(reconstructedCheckpoint) - assertThat(checkpointStorage.checkpoints()).isEmpty() - } - - @Test - fun `non-checkpoint files are ignored`() { - val checkpoint = newCheckpoint() - checkpointStorage.addCheckpoint(checkpoint) - Files.write(storeDir.resolve("random-non-checkpoint-file"), "this is not a checkpoint!!".toByteArray()) - newCheckpointStorage() - assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint) - } - - private fun newCheckpointStorage() { - checkpointStorage = PerFileCheckpointStorage(storeDir) - } - - private var checkpointCount = 1 - private fun newCheckpoint() = Checkpoint(SerializedBytes(Ints.toByteArray(checkpointCount++))) - -} \ No newline at end of file diff --git a/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt b/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt deleted file mode 100644 index aa928bd335..0000000000 --- a/node/src/test/kotlin/com/r3corda/node/services/persistence/PerFileTransactionStorageTests.kt +++ /dev/null @@ -1,100 +0,0 @@ -package com.r3corda.node.services.persistence - -import com.google.common.jimfs.Configuration.unix -import com.google.common.jimfs.Jimfs -import com.google.common.primitives.Ints -import com.google.common.util.concurrent.SettableFuture -import com.r3corda.core.crypto.DigitalSignature -import com.r3corda.core.crypto.NullPublicKey -import com.r3corda.core.serialization.SerializedBytes -import com.r3corda.core.transactions.SignedTransaction -import org.assertj.core.api.Assertions.assertThat -import org.junit.After -import org.junit.Before -import org.junit.Test -import java.nio.file.FileSystem -import java.nio.file.Files -import java.nio.file.Path -import java.util.concurrent.TimeUnit -import kotlin.test.assertEquals - -class PerFileTransactionStorageTests { - - val fileSystem: FileSystem = Jimfs.newFileSystem(unix()) - val storeDir: Path = fileSystem.getPath("store") - lateinit var transactionStorage: PerFileTransactionStorage - - @Before - fun setUp() { - newTransactionStorage() - } - - @After - fun cleanUp() { - fileSystem.close() - } - - @Test - fun `empty store`() { - assertThat(transactionStorage.getTransaction(newTransaction().id)).isNull() - assertThat(transactionStorage.transactions).isEmpty() - newTransactionStorage() - assertThat(transactionStorage.transactions).isEmpty() - } - - @Test - fun `one transaction`() { - val transaction = newTransaction() - transactionStorage.addTransaction(transaction) - assertTransactionIsRetrievable(transaction) - assertThat(transactionStorage.transactions).containsExactly(transaction) - newTransactionStorage() - assertTransactionIsRetrievable(transaction) - assertThat(transactionStorage.transactions).containsExactly(transaction) - } - - @Test - fun `two transactions across restart`() { - val firstTransaction = newTransaction() - val secondTransaction = newTransaction() - transactionStorage.addTransaction(firstTransaction) - newTransactionStorage() - transactionStorage.addTransaction(secondTransaction) - assertTransactionIsRetrievable(firstTransaction) - assertTransactionIsRetrievable(secondTransaction) - assertThat(transactionStorage.transactions).containsOnly(firstTransaction, secondTransaction) - } - - @Test - fun `non-transaction files are ignored`() { - val transactions = newTransaction() - transactionStorage.addTransaction(transactions) - Files.write(storeDir.resolve("random-non-tx-file"), "this is not a transaction!!".toByteArray()) - newTransactionStorage() - assertThat(transactionStorage.transactions).containsExactly(transactions) - } - - @Test - fun `updates are fired`() { - val future = SettableFuture.create() - transactionStorage.updates.subscribe { tx -> future.set(tx) } - val expected = newTransaction() - transactionStorage.addTransaction(expected) - val actual = future.get(1, TimeUnit.SECONDS) - assertEquals(expected, actual) - } - - private fun newTransactionStorage() { - transactionStorage = PerFileTransactionStorage(storeDir) - } - - private fun assertTransactionIsRetrievable(transaction: SignedTransaction) { - assertThat(transactionStorage.getTransaction(transaction.id)).isEqualTo(transaction) - } - - private var txCount = 0 - private fun newTransaction() = SignedTransaction( - SerializedBytes(Ints.toByteArray(++txCount)), - listOf(DigitalSignature.WithKey(NullPublicKey, ByteArray(1)))) - -} \ No newline at end of file diff --git a/node/src/test/kotlin/com/r3corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/com/r3corda/node/services/statemachine/StateMachineManagerTests.kt index de8dc0d04e..f4b8c7b4f3 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/statemachine/StateMachineManagerTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/statemachine/StateMachineManagerTests.kt @@ -10,6 +10,7 @@ import com.r3corda.core.random63BitValue import com.r3corda.core.serialization.deserialize import com.r3corda.node.services.persistence.checkpoints import com.r3corda.node.services.statemachine.StateMachineManager.* +import com.r3corda.node.utilities.databaseTransaction import com.r3corda.testing.initiateSingleShotProtocol import com.r3corda.testing.node.InMemoryMessagingNetwork import com.r3corda.testing.node.InMemoryMessagingNetwork.MessageTransfer @@ -73,6 +74,7 @@ class StateMachineManagerTests { // We push through just enough messages to get only the payload sent node2.pumpReceive() + node2.disableDBCloseOnStop() node2.stop() net.runNetwork() val restoredProtocol = node2.restartAndGetRestoredProtocol(node1) @@ -95,6 +97,7 @@ class StateMachineManagerTests { val protocol = NoOpProtocol() node3.smm.add(protocol) assertEquals(false, protocol.protocolStarted) // Not started yet as no network activity has been allowed yet + node3.disableDBCloseOnStop() node3.stop() node3 = net.createNode(node1.info.address, forcedID = node3.id) @@ -103,6 +106,7 @@ class StateMachineManagerTests { net.runNetwork() // Allow network map messages to flow node3.smm.executor.flush() assertEquals(true, restoredProtocol.protocolStarted) // Now we should have run the protocol and hopefully cleared the init checkpoint + node3.disableDBCloseOnStop() node3.stop() // Now it is completed the protocol should leave no Checkpoint. @@ -119,6 +123,7 @@ class StateMachineManagerTests { node2.smm.add(ReceiveThenSuspendProtocol(node1.info.legalIdentity)) // Prepare checkpointed receive protocol // Make sure the add() has finished initial processing. node2.smm.executor.flush() + node2.disableDBCloseOnStop() node2.stop() // kill receiver val restoredProtocol = node2.restartAndGetRestoredProtocol(node1) assertThat(restoredProtocol.receivedPayloads[0]).isEqualTo(payload) @@ -138,16 +143,22 @@ class StateMachineManagerTests { // Kick off first send and receive node2.smm.add(PingPongProtocol(node3.info.legalIdentity, payload)) - assertEquals(1, node2.checkpointStorage.checkpoints().size) + databaseTransaction(node2.database) { + assertEquals(1, node2.checkpointStorage.checkpoints().size) + } // Make sure the add() has finished initial processing. node2.smm.executor.flush() + node2.disableDBCloseOnStop() // Restart node and thus reload the checkpoint and resend the message with same UUID node2.stop() + databaseTransaction(node2.database) { + assertEquals(1, node2.checkpointStorage.checkpoints().size) // confirm checkpoint + } val node2b = net.createNode(node1.info.address, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray()) + node2.manuallyCloseDB() val (firstAgain, fut1) = node2b.getSingleProtocol() // Run the network which will also fire up the second protocol. First message should get deduped. So message data stays in sync. net.runNetwork() - assertEquals(1, node2.checkpointStorage.checkpoints().size) node2b.smm.executor.flush() fut1.get() @@ -156,8 +167,12 @@ class StateMachineManagerTests { assertEquals(4, receivedCount, "Protocol should have exchanged 4 unique messages")// Two messages each way // can't give a precise value as every addMessageHandler re-runs the undelivered messages assertTrue(sentCount > receivedCount, "Node restart should have retransmitted messages") - assertEquals(0, node2b.checkpointStorage.checkpoints().size, "Checkpoints left after restored protocol should have ended") - assertEquals(0, node3.checkpointStorage.checkpoints().size, "Checkpoints left after restored protocol should have ended") + databaseTransaction(node2b.database) { + assertEquals(0, node2b.checkpointStorage.checkpoints().size, "Checkpoints left after restored protocol should have ended") + } + databaseTransaction(node3.database) { + assertEquals(0, node3.checkpointStorage.checkpoints().size, "Checkpoints left after restored protocol should have ended") + } assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3") assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3") assertEquals(payload, secondProtocol.get().receivedPayload, "Received payload does not match the (restarted) first value on Node 2") @@ -253,8 +268,10 @@ class StateMachineManagerTests { private inline fun > MockNode.restartAndGetRestoredProtocol( networkMapNode: MockNode? = null): P { + disableDBCloseOnStop() //Handover DB to new node copy stop() val newNode = mockNet.createNode(networkMapNode?.info?.address, id, advertisedServices = *advertisedServices.toTypedArray()) + manuallyCloseDB() mockNet.runNetwork() // allow NetworkMapService messages to stabilise and thus start the state machine return newNode.getSingleProtocol

().first } diff --git a/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt b/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt index 66c5a3a248..d32c52560a 100644 --- a/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt +++ b/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt @@ -9,6 +9,7 @@ import com.r3corda.core.serialization.SingletonSerializeAsToken import com.r3corda.core.utilities.trace import com.r3corda.node.services.api.MessagingServiceBuilder import com.r3corda.node.utilities.AffinityExecutor +import com.r3corda.node.utilities.JDBCHashSet import com.r3corda.node.utilities.databaseTransaction import com.r3corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging import org.jetbrains.exposed.sql.Database @@ -220,7 +221,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria private inner class InnerState { val handlers: MutableList = ArrayList() - val pendingRedelivery = LinkedList() + val pendingRedelivery = JDBCHashSet("pending_messages",loadOnInit = true) } private val state = ThreadBox(InnerState()) @@ -246,11 +247,14 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria check(running) val (handler, items) = state.locked { val handler = Handler(topicSession, callback).apply { handlers.add(this) } - val items = ArrayList(pendingRedelivery) - pendingRedelivery.clear() - Pair(handler, items) + val pending = ArrayList() + databaseTransaction(database) { + pending.addAll(pendingRedelivery) + pendingRedelivery.clear() + } + Pair(handler, pending) } - for ((sender, message) in items) { + for (message in items) { send(message, handle) } return handler @@ -330,7 +334,9 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria // up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at // least sometimes. log.warn("Message to ${transfer.message.topicSession} could not be delivered") - pendingRedelivery.add(transfer) + databaseTransaction(database) { + pendingRedelivery.add(transfer.message) + } null } else { h diff --git a/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt index 18b08d6c1e..7cace026c4 100644 --- a/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt @@ -4,7 +4,6 @@ import com.google.common.jimfs.Configuration.unix import com.google.common.jimfs.Jimfs import com.google.common.util.concurrent.Futures import com.r3corda.core.crypto.Party -import com.r3corda.core.div import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.PhysicalLocation import com.r3corda.core.node.services.KeyManagementService @@ -15,21 +14,17 @@ import com.r3corda.core.testing.InMemoryVaultService import com.r3corda.core.utilities.DUMMY_NOTARY_KEY import com.r3corda.core.utilities.loggerFor import com.r3corda.node.internal.AbstractNode -import com.r3corda.node.services.api.CheckpointStorage import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.keys.E2ETestKeyManagementService import com.r3corda.node.services.messaging.CordaRPCOps import com.r3corda.node.services.network.InMemoryNetworkMapService import com.r3corda.node.services.network.NetworkMapService -import com.r3corda.node.services.persistence.DBCheckpointStorage -import com.r3corda.node.services.persistence.PerFileCheckpointStorage import com.r3corda.node.services.transactions.InMemoryUniquenessProvider import com.r3corda.node.services.transactions.SimpleNotaryService import com.r3corda.node.services.transactions.ValidatingNotaryService import com.r3corda.node.utilities.AffinityExecutor import com.r3corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor -import com.r3corda.node.utilities.databaseTransaction import org.slf4j.Logger import java.nio.file.FileSystem import java.nio.file.Files @@ -128,14 +123,6 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, serverThread, configuration.myLegalName, database).start().get() } - override fun initialiseCheckpointService(dir: Path): CheckpointStorage { - return if (mockNet.threadPerNode) { - DBCheckpointStorage() - } else { - PerFileCheckpointStorage(dir / "checkpoints") - } - } - override fun makeIdentityService() = MockIdentityService(mockNet.identities) override fun makeVaultService(): VaultService = InMemoryVaultService(services)