mirror of
https://github.com/corda/corda.git
synced 2025-01-19 03:06:36 +00:00
Merged in mnesbit-cor-389-all-transaction-tests-in-db (pull request #411)
Messages requiring redelivery to late registered handler persisted in database.
This commit is contained in:
commit
60c1dcdbde
@ -43,7 +43,7 @@ interface APIServer {
|
||||
fun status(): Response
|
||||
|
||||
/**
|
||||
* Report this nodes configuration and identities.
|
||||
* Report this node's configuration and identities.
|
||||
* Currently tunnels the NodeInfo as an encoding of the Kryo serialised form.
|
||||
* TODO this functionality should be available via the RPC
|
||||
*/
|
||||
|
@ -437,13 +437,9 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
|
||||
|
||||
protected abstract fun startMessagingService(cordaRPCOps: CordaRPCOps)
|
||||
|
||||
protected open fun initialiseCheckpointService(dir: Path): CheckpointStorage {
|
||||
return DBCheckpointStorage()
|
||||
}
|
||||
|
||||
protected open fun initialiseStorageService(dir: Path): Pair<TxWritableStorageService, CheckpointStorage> {
|
||||
val attachments = makeAttachmentStorage(dir)
|
||||
val checkpointStorage = initialiseCheckpointService(dir)
|
||||
val checkpointStorage = DBCheckpointStorage()
|
||||
val transactionStorage = DBTransactionStorage()
|
||||
_servicesThatAcceptUploads += attachments
|
||||
// Populate the partyKeys set.
|
||||
|
@ -86,8 +86,7 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
var rpcConsumer: ClientConsumer? = null
|
||||
var rpcNotificationConsumer: ClientConsumer? = null
|
||||
|
||||
// TODO: This is not robust and needs to be replaced by more intelligently using the message queue server.
|
||||
var undeliveredMessages = listOf<Message>()
|
||||
var pendingRedelivery = JDBCHashSet<Message>("pending_messages",loadOnInit = true)
|
||||
}
|
||||
|
||||
/** A registration to handle messages of different types */
|
||||
@ -254,10 +253,10 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
// without causing log spam.
|
||||
log.warn("Received message for ${msg.topicSession} that doesn't have any registered handlers yet")
|
||||
|
||||
// This is a hack; transient messages held in memory isn't crash resistant.
|
||||
// TODO: Use Artemis API more effectively so we don't pop messages off a queue that we aren't ready to use.
|
||||
state.locked {
|
||||
undeliveredMessages += msg
|
||||
databaseTransaction(database) {
|
||||
pendingRedelivery.add(msg)
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
@ -371,9 +370,12 @@ class NodeMessagingClient(config: NodeConfiguration,
|
||||
val handler = Handler(topicSession, callback)
|
||||
handlers.add(handler)
|
||||
val messagesToRedeliver = state.locked {
|
||||
val messagesToRedeliver = undeliveredMessages
|
||||
undeliveredMessages = listOf()
|
||||
messagesToRedeliver
|
||||
val pending = ArrayList<Message>()
|
||||
databaseTransaction(database) {
|
||||
pending.addAll(pendingRedelivery)
|
||||
pendingRedelivery.clear()
|
||||
}
|
||||
pending
|
||||
}
|
||||
messagesToRedeliver.forEach { deliver(it) }
|
||||
return handler
|
||||
|
@ -1,73 +0,0 @@
|
||||
package com.r3corda.node.services.persistence
|
||||
|
||||
import com.r3corda.core.serialization.SerializedBytes
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.serialization.serialize
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.services.api.Checkpoint
|
||||
import com.r3corda.node.services.api.CheckpointStorage
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardCopyOption
|
||||
import java.util.*
|
||||
import java.util.Collections.synchronizedMap
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
|
||||
/**
|
||||
* File-based checkpoint storage, storing checkpoints per file.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class PerFileCheckpointStorage(val storeDir: Path) : CheckpointStorage {
|
||||
|
||||
companion object {
|
||||
private val logger = loggerFor<PerFileCheckpointStorage>()
|
||||
private val fileExtension = ".checkpoint"
|
||||
}
|
||||
|
||||
private val checkpointFiles = synchronizedMap(IdentityHashMap<Checkpoint, Path>())
|
||||
|
||||
init {
|
||||
logger.trace { "Initialising per file checkpoint storage on $storeDir" }
|
||||
Files.createDirectories(storeDir)
|
||||
Files.list(storeDir)
|
||||
.filter { it.toString().toLowerCase().endsWith(fileExtension) }
|
||||
.forEach {
|
||||
val checkpoint = Files.readAllBytes(it).deserialize<Checkpoint>()
|
||||
checkpointFiles[checkpoint] = it
|
||||
}
|
||||
}
|
||||
|
||||
override fun addCheckpoint(checkpoint: Checkpoint) {
|
||||
val fileName = "${checkpoint.id.toString().toLowerCase()}$fileExtension"
|
||||
val checkpointFile = storeDir.resolve(fileName)
|
||||
atomicWrite(checkpointFile, checkpoint.serialize())
|
||||
logger.trace { "Stored $checkpoint to $checkpointFile" }
|
||||
checkpointFiles[checkpoint] = checkpointFile
|
||||
}
|
||||
|
||||
private fun atomicWrite(checkpointFile: Path, serialisedCheckpoint: SerializedBytes<Checkpoint>) {
|
||||
val tempCheckpointFile = checkpointFile.parent.resolve("${checkpointFile.fileName}.tmp")
|
||||
serialisedCheckpoint.writeToFile(tempCheckpointFile)
|
||||
Files.move(tempCheckpointFile, checkpointFile, StandardCopyOption.ATOMIC_MOVE)
|
||||
}
|
||||
|
||||
override fun removeCheckpoint(checkpoint: Checkpoint) {
|
||||
val checkpointFile = checkpointFiles.remove(checkpoint)
|
||||
require(checkpointFile != null) { "Trying to removing unknown checkpoint: $checkpoint" }
|
||||
Files.delete(checkpointFile)
|
||||
logger.trace { "Removed $checkpoint ($checkpointFile)" }
|
||||
}
|
||||
|
||||
override fun forEach(block: (Checkpoint)->Boolean) {
|
||||
synchronized(checkpointFiles) {
|
||||
for(checkpoint in checkpointFiles.keys) {
|
||||
if (!block(checkpoint)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -1,70 +0,0 @@
|
||||
package com.r3corda.node.services.persistence
|
||||
|
||||
import com.r3corda.core.ThreadBox
|
||||
import com.r3corda.core.bufferUntilSubscribed
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.node.services.TransactionStorage
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.serialization.serialize
|
||||
import com.r3corda.core.transactions.SignedTransaction
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.core.utilities.trace
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* File-based transaction storage, storing transactions per file.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class PerFileTransactionStorage(val storeDir: Path) : TransactionStorage {
|
||||
companion object {
|
||||
private val logger = loggerFor<PerFileCheckpointStorage>()
|
||||
private val fileExtension = ".transaction"
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox(object {
|
||||
val transactionsMap = HashMap<SecureHash, SignedTransaction>()
|
||||
val updatesPublisher = PublishSubject.create<SignedTransaction>()
|
||||
|
||||
fun notify(transaction: SignedTransaction) = updatesPublisher.onNext(transaction)
|
||||
})
|
||||
|
||||
override val updates: Observable<SignedTransaction>
|
||||
get() = mutex.content.updatesPublisher
|
||||
|
||||
init {
|
||||
logger.trace { "Initialising per file transaction storage on $storeDir" }
|
||||
Files.createDirectories(storeDir)
|
||||
mutex.locked {
|
||||
Files.list(storeDir)
|
||||
.filter { it.toString().toLowerCase().endsWith(fileExtension) }
|
||||
.map { Files.readAllBytes(it).deserialize<SignedTransaction>() }
|
||||
.forEach { transactionsMap[it.id] = it }
|
||||
}
|
||||
}
|
||||
|
||||
override fun addTransaction(transaction: SignedTransaction) {
|
||||
val transactionFile = storeDir.resolve("${transaction.id.toString().toLowerCase()}$fileExtension")
|
||||
transaction.serialize().writeToFile(transactionFile)
|
||||
mutex.locked {
|
||||
transactionsMap[transaction.id] = transaction
|
||||
notify(transaction)
|
||||
}
|
||||
logger.trace { "Stored $transaction to $transactionFile" }
|
||||
}
|
||||
|
||||
override fun getTransaction(id: SecureHash): SignedTransaction? = mutex.locked { transactionsMap[id] }
|
||||
|
||||
val transactions: Iterable<SignedTransaction> get() = mutex.locked { transactionsMap.values.toList() }
|
||||
|
||||
override fun track(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> {
|
||||
return mutex.locked {
|
||||
Pair(transactionsMap.values.toList(), updates.bufferUntilSubscribed())
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -155,16 +155,14 @@ class TwoPartyTradeProtocolTests {
|
||||
bobNode.pumpReceive()
|
||||
|
||||
// OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature.
|
||||
assertThat(bobNode.checkpointStorage.checkpoints()).hasSize(1)
|
||||
databaseTransaction(bobNode.database) {
|
||||
assertThat(bobNode.checkpointStorage.checkpoints()).hasSize(1)
|
||||
}
|
||||
|
||||
val storage = bobNode.storage.validatedTransactions
|
||||
val bobTransactionsBeforeCrash = if (storage is PerFileTransactionStorage) {
|
||||
storage.transactions
|
||||
} else if (storage is DBTransactionStorage) {
|
||||
databaseTransaction(bobNode.database) {
|
||||
storage.transactions
|
||||
}
|
||||
} else throw IllegalArgumentException("Unknown storage implementation")
|
||||
val bobTransactionsBeforeCrash = databaseTransaction(bobNode.database) {
|
||||
(storage as DBTransactionStorage).transactions
|
||||
}
|
||||
assertThat(bobTransactionsBeforeCrash).isNotEmpty()
|
||||
|
||||
// .. and let's imagine that Bob's computer has a power cut. He now has nothing now beyond what was on disk.
|
||||
|
@ -12,7 +12,7 @@ import com.r3corda.core.protocols.ProtocolLogicRefFactory
|
||||
import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY
|
||||
import com.r3corda.node.services.events.NodeSchedulerService
|
||||
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
|
||||
import com.r3corda.node.services.persistence.DBCheckpointStorage
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
@ -88,7 +88,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
}
|
||||
scheduler = NodeSchedulerService(database, services, factory, schedulerGatedExecutor)
|
||||
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
|
||||
val mockSMM = StateMachineManager(services, listOf(services, scheduler), PerFileCheckpointStorage(fs.getPath("checkpoints")), smmExecutor, database)
|
||||
val mockSMM = StateMachineManager(services, listOf(services, scheduler), DBCheckpointStorage(), smmExecutor, database)
|
||||
mockSMM.changes.subscribe { change ->
|
||||
if (change.addOrRemove == AddOrRemove.REMOVE && mockSMM.allStateMachines.isEmpty()) {
|
||||
smmHasRemovedAllProtocols.countDown()
|
||||
|
@ -1,99 +0,0 @@
|
||||
package com.r3corda.node.services.persistence
|
||||
|
||||
import com.google.common.jimfs.Configuration.unix
|
||||
import com.google.common.jimfs.Jimfs
|
||||
import com.google.common.primitives.Ints
|
||||
import com.r3corda.core.serialization.SerializedBytes
|
||||
import com.r3corda.node.services.api.Checkpoint
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.nio.file.FileSystem
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
|
||||
class PerFileCheckpointStorageTests {
|
||||
|
||||
val fileSystem: FileSystem = Jimfs.newFileSystem(unix())
|
||||
val storeDir: Path = fileSystem.getPath("store")
|
||||
lateinit var checkpointStorage: PerFileCheckpointStorage
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
newCheckpointStorage()
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
fileSystem.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `add new checkpoint`() {
|
||||
val checkpoint = newCheckpoint()
|
||||
checkpointStorage.addCheckpoint(checkpoint)
|
||||
assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint)
|
||||
newCheckpointStorage()
|
||||
assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `remove checkpoint`() {
|
||||
val checkpoint = newCheckpoint()
|
||||
checkpointStorage.addCheckpoint(checkpoint)
|
||||
checkpointStorage.removeCheckpoint(checkpoint)
|
||||
assertThat(checkpointStorage.checkpoints()).isEmpty()
|
||||
newCheckpointStorage()
|
||||
assertThat(checkpointStorage.checkpoints()).isEmpty()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `remove unknown checkpoint`() {
|
||||
val checkpoint = newCheckpoint()
|
||||
assertThatExceptionOfType(IllegalArgumentException::class.java).isThrownBy {
|
||||
checkpointStorage.removeCheckpoint(checkpoint)
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `add two checkpoints then remove first one`() {
|
||||
val firstCheckpoint = newCheckpoint()
|
||||
checkpointStorage.addCheckpoint(firstCheckpoint)
|
||||
val secondCheckpoint = newCheckpoint()
|
||||
checkpointStorage.addCheckpoint(secondCheckpoint)
|
||||
checkpointStorage.removeCheckpoint(firstCheckpoint)
|
||||
assertThat(checkpointStorage.checkpoints()).containsExactly(secondCheckpoint)
|
||||
newCheckpointStorage()
|
||||
assertThat(checkpointStorage.checkpoints()).containsExactly(secondCheckpoint)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `add checkpoint and then remove after 'restart'`() {
|
||||
val originalCheckpoint = newCheckpoint()
|
||||
checkpointStorage.addCheckpoint(originalCheckpoint)
|
||||
newCheckpointStorage()
|
||||
val reconstructedCheckpoint = checkpointStorage.checkpoints().single()
|
||||
assertThat(reconstructedCheckpoint).isEqualTo(originalCheckpoint).isNotSameAs(originalCheckpoint)
|
||||
checkpointStorage.removeCheckpoint(reconstructedCheckpoint)
|
||||
assertThat(checkpointStorage.checkpoints()).isEmpty()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `non-checkpoint files are ignored`() {
|
||||
val checkpoint = newCheckpoint()
|
||||
checkpointStorage.addCheckpoint(checkpoint)
|
||||
Files.write(storeDir.resolve("random-non-checkpoint-file"), "this is not a checkpoint!!".toByteArray())
|
||||
newCheckpointStorage()
|
||||
assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint)
|
||||
}
|
||||
|
||||
private fun newCheckpointStorage() {
|
||||
checkpointStorage = PerFileCheckpointStorage(storeDir)
|
||||
}
|
||||
|
||||
private var checkpointCount = 1
|
||||
private fun newCheckpoint() = Checkpoint(SerializedBytes(Ints.toByteArray(checkpointCount++)))
|
||||
|
||||
}
|
@ -1,100 +0,0 @@
|
||||
package com.r3corda.node.services.persistence
|
||||
|
||||
import com.google.common.jimfs.Configuration.unix
|
||||
import com.google.common.jimfs.Jimfs
|
||||
import com.google.common.primitives.Ints
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import com.r3corda.core.crypto.DigitalSignature
|
||||
import com.r3corda.core.crypto.NullPublicKey
|
||||
import com.r3corda.core.serialization.SerializedBytes
|
||||
import com.r3corda.core.transactions.SignedTransaction
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.nio.file.FileSystem
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class PerFileTransactionStorageTests {
|
||||
|
||||
val fileSystem: FileSystem = Jimfs.newFileSystem(unix())
|
||||
val storeDir: Path = fileSystem.getPath("store")
|
||||
lateinit var transactionStorage: PerFileTransactionStorage
|
||||
|
||||
@Before
|
||||
fun setUp() {
|
||||
newTransactionStorage()
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
fileSystem.close()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `empty store`() {
|
||||
assertThat(transactionStorage.getTransaction(newTransaction().id)).isNull()
|
||||
assertThat(transactionStorage.transactions).isEmpty()
|
||||
newTransactionStorage()
|
||||
assertThat(transactionStorage.transactions).isEmpty()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `one transaction`() {
|
||||
val transaction = newTransaction()
|
||||
transactionStorage.addTransaction(transaction)
|
||||
assertTransactionIsRetrievable(transaction)
|
||||
assertThat(transactionStorage.transactions).containsExactly(transaction)
|
||||
newTransactionStorage()
|
||||
assertTransactionIsRetrievable(transaction)
|
||||
assertThat(transactionStorage.transactions).containsExactly(transaction)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `two transactions across restart`() {
|
||||
val firstTransaction = newTransaction()
|
||||
val secondTransaction = newTransaction()
|
||||
transactionStorage.addTransaction(firstTransaction)
|
||||
newTransactionStorage()
|
||||
transactionStorage.addTransaction(secondTransaction)
|
||||
assertTransactionIsRetrievable(firstTransaction)
|
||||
assertTransactionIsRetrievable(secondTransaction)
|
||||
assertThat(transactionStorage.transactions).containsOnly(firstTransaction, secondTransaction)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `non-transaction files are ignored`() {
|
||||
val transactions = newTransaction()
|
||||
transactionStorage.addTransaction(transactions)
|
||||
Files.write(storeDir.resolve("random-non-tx-file"), "this is not a transaction!!".toByteArray())
|
||||
newTransactionStorage()
|
||||
assertThat(transactionStorage.transactions).containsExactly(transactions)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `updates are fired`() {
|
||||
val future = SettableFuture.create<SignedTransaction>()
|
||||
transactionStorage.updates.subscribe { tx -> future.set(tx) }
|
||||
val expected = newTransaction()
|
||||
transactionStorage.addTransaction(expected)
|
||||
val actual = future.get(1, TimeUnit.SECONDS)
|
||||
assertEquals(expected, actual)
|
||||
}
|
||||
|
||||
private fun newTransactionStorage() {
|
||||
transactionStorage = PerFileTransactionStorage(storeDir)
|
||||
}
|
||||
|
||||
private fun assertTransactionIsRetrievable(transaction: SignedTransaction) {
|
||||
assertThat(transactionStorage.getTransaction(transaction.id)).isEqualTo(transaction)
|
||||
}
|
||||
|
||||
private var txCount = 0
|
||||
private fun newTransaction() = SignedTransaction(
|
||||
SerializedBytes(Ints.toByteArray(++txCount)),
|
||||
listOf(DigitalSignature.WithKey(NullPublicKey, ByteArray(1))))
|
||||
|
||||
}
|
@ -10,6 +10,7 @@ import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.node.services.persistence.checkpoints
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager.*
|
||||
import com.r3corda.node.utilities.databaseTransaction
|
||||
import com.r3corda.testing.initiateSingleShotProtocol
|
||||
import com.r3corda.testing.node.InMemoryMessagingNetwork
|
||||
import com.r3corda.testing.node.InMemoryMessagingNetwork.MessageTransfer
|
||||
@ -73,6 +74,7 @@ class StateMachineManagerTests {
|
||||
|
||||
// We push through just enough messages to get only the payload sent
|
||||
node2.pumpReceive()
|
||||
node2.disableDBCloseOnStop()
|
||||
node2.stop()
|
||||
net.runNetwork()
|
||||
val restoredProtocol = node2.restartAndGetRestoredProtocol<ReceiveThenSuspendProtocol>(node1)
|
||||
@ -95,6 +97,7 @@ class StateMachineManagerTests {
|
||||
val protocol = NoOpProtocol()
|
||||
node3.smm.add(protocol)
|
||||
assertEquals(false, protocol.protocolStarted) // Not started yet as no network activity has been allowed yet
|
||||
node3.disableDBCloseOnStop()
|
||||
node3.stop()
|
||||
|
||||
node3 = net.createNode(node1.info.address, forcedID = node3.id)
|
||||
@ -103,6 +106,7 @@ class StateMachineManagerTests {
|
||||
net.runNetwork() // Allow network map messages to flow
|
||||
node3.smm.executor.flush()
|
||||
assertEquals(true, restoredProtocol.protocolStarted) // Now we should have run the protocol and hopefully cleared the init checkpoint
|
||||
node3.disableDBCloseOnStop()
|
||||
node3.stop()
|
||||
|
||||
// Now it is completed the protocol should leave no Checkpoint.
|
||||
@ -119,6 +123,7 @@ class StateMachineManagerTests {
|
||||
node2.smm.add(ReceiveThenSuspendProtocol(node1.info.legalIdentity)) // Prepare checkpointed receive protocol
|
||||
// Make sure the add() has finished initial processing.
|
||||
node2.smm.executor.flush()
|
||||
node2.disableDBCloseOnStop()
|
||||
node2.stop() // kill receiver
|
||||
val restoredProtocol = node2.restartAndGetRestoredProtocol<ReceiveThenSuspendProtocol>(node1)
|
||||
assertThat(restoredProtocol.receivedPayloads[0]).isEqualTo(payload)
|
||||
@ -138,16 +143,22 @@ class StateMachineManagerTests {
|
||||
|
||||
// Kick off first send and receive
|
||||
node2.smm.add(PingPongProtocol(node3.info.legalIdentity, payload))
|
||||
assertEquals(1, node2.checkpointStorage.checkpoints().size)
|
||||
databaseTransaction(node2.database) {
|
||||
assertEquals(1, node2.checkpointStorage.checkpoints().size)
|
||||
}
|
||||
// Make sure the add() has finished initial processing.
|
||||
node2.smm.executor.flush()
|
||||
node2.disableDBCloseOnStop()
|
||||
// Restart node and thus reload the checkpoint and resend the message with same UUID
|
||||
node2.stop()
|
||||
databaseTransaction(node2.database) {
|
||||
assertEquals(1, node2.checkpointStorage.checkpoints().size) // confirm checkpoint
|
||||
}
|
||||
val node2b = net.createNode(node1.info.address, node2.id, advertisedServices = *node2.advertisedServices.toTypedArray())
|
||||
node2.manuallyCloseDB()
|
||||
val (firstAgain, fut1) = node2b.getSingleProtocol<PingPongProtocol>()
|
||||
// Run the network which will also fire up the second protocol. First message should get deduped. So message data stays in sync.
|
||||
net.runNetwork()
|
||||
assertEquals(1, node2.checkpointStorage.checkpoints().size)
|
||||
node2b.smm.executor.flush()
|
||||
fut1.get()
|
||||
|
||||
@ -156,8 +167,12 @@ class StateMachineManagerTests {
|
||||
assertEquals(4, receivedCount, "Protocol should have exchanged 4 unique messages")// Two messages each way
|
||||
// can't give a precise value as every addMessageHandler re-runs the undelivered messages
|
||||
assertTrue(sentCount > receivedCount, "Node restart should have retransmitted messages")
|
||||
assertEquals(0, node2b.checkpointStorage.checkpoints().size, "Checkpoints left after restored protocol should have ended")
|
||||
assertEquals(0, node3.checkpointStorage.checkpoints().size, "Checkpoints left after restored protocol should have ended")
|
||||
databaseTransaction(node2b.database) {
|
||||
assertEquals(0, node2b.checkpointStorage.checkpoints().size, "Checkpoints left after restored protocol should have ended")
|
||||
}
|
||||
databaseTransaction(node3.database) {
|
||||
assertEquals(0, node3.checkpointStorage.checkpoints().size, "Checkpoints left after restored protocol should have ended")
|
||||
}
|
||||
assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3")
|
||||
assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3")
|
||||
assertEquals(payload, secondProtocol.get().receivedPayload, "Received payload does not match the (restarted) first value on Node 2")
|
||||
@ -253,8 +268,10 @@ class StateMachineManagerTests {
|
||||
|
||||
private inline fun <reified P : ProtocolLogic<*>> MockNode.restartAndGetRestoredProtocol(
|
||||
networkMapNode: MockNode? = null): P {
|
||||
disableDBCloseOnStop() //Handover DB to new node copy
|
||||
stop()
|
||||
val newNode = mockNet.createNode(networkMapNode?.info?.address, id, advertisedServices = *advertisedServices.toTypedArray())
|
||||
manuallyCloseDB()
|
||||
mockNet.runNetwork() // allow NetworkMapService messages to stabilise and thus start the state machine
|
||||
return newNode.getSingleProtocol<P>().first
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ import com.r3corda.core.serialization.SingletonSerializeAsToken
|
||||
import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.services.api.MessagingServiceBuilder
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import com.r3corda.node.utilities.JDBCHashSet
|
||||
import com.r3corda.node.utilities.databaseTransaction
|
||||
import com.r3corda.testing.node.InMemoryMessagingNetwork.InMemoryMessaging
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
@ -220,7 +221,7 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
||||
|
||||
private inner class InnerState {
|
||||
val handlers: MutableList<Handler> = ArrayList()
|
||||
val pendingRedelivery = LinkedList<MessageTransfer>()
|
||||
val pendingRedelivery = JDBCHashSet<Message>("pending_messages",loadOnInit = true)
|
||||
}
|
||||
|
||||
private val state = ThreadBox(InnerState())
|
||||
@ -246,11 +247,14 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
||||
check(running)
|
||||
val (handler, items) = state.locked {
|
||||
val handler = Handler(topicSession, callback).apply { handlers.add(this) }
|
||||
val items = ArrayList(pendingRedelivery)
|
||||
pendingRedelivery.clear()
|
||||
Pair(handler, items)
|
||||
val pending = ArrayList<Message>()
|
||||
databaseTransaction(database) {
|
||||
pending.addAll(pendingRedelivery)
|
||||
pendingRedelivery.clear()
|
||||
}
|
||||
Pair(handler, pending)
|
||||
}
|
||||
for ((sender, message) in items) {
|
||||
for (message in items) {
|
||||
send(message, handle)
|
||||
}
|
||||
return handler
|
||||
@ -330,7 +334,9 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
|
||||
// up a handler for yet. Most unit tests don't run threaded, but we want to test true parallelism at
|
||||
// least sometimes.
|
||||
log.warn("Message to ${transfer.message.topicSession} could not be delivered")
|
||||
pendingRedelivery.add(transfer)
|
||||
databaseTransaction(database) {
|
||||
pendingRedelivery.add(transfer.message)
|
||||
}
|
||||
null
|
||||
} else {
|
||||
h
|
||||
|
@ -4,7 +4,6 @@ import com.google.common.jimfs.Configuration.unix
|
||||
import com.google.common.jimfs.Jimfs
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.div
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.core.node.PhysicalLocation
|
||||
import com.r3corda.core.node.services.KeyManagementService
|
||||
@ -15,21 +14,17 @@ import com.r3corda.core.testing.InMemoryVaultService
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.internal.AbstractNode
|
||||
import com.r3corda.node.services.api.CheckpointStorage
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.keys.E2ETestKeyManagementService
|
||||
import com.r3corda.node.services.messaging.CordaRPCOps
|
||||
import com.r3corda.node.services.network.InMemoryNetworkMapService
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.persistence.DBCheckpointStorage
|
||||
import com.r3corda.node.services.persistence.PerFileCheckpointStorage
|
||||
import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
|
||||
import com.r3corda.node.services.transactions.SimpleNotaryService
|
||||
import com.r3corda.node.services.transactions.ValidatingNotaryService
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import com.r3corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
|
||||
import com.r3corda.node.utilities.databaseTransaction
|
||||
import org.slf4j.Logger
|
||||
import java.nio.file.FileSystem
|
||||
import java.nio.file.Files
|
||||
@ -128,14 +123,6 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
||||
return mockNet.messagingNetwork.createNodeWithID(!mockNet.threadPerNode, id, serverThread, configuration.myLegalName, database).start().get()
|
||||
}
|
||||
|
||||
override fun initialiseCheckpointService(dir: Path): CheckpointStorage {
|
||||
return if (mockNet.threadPerNode) {
|
||||
DBCheckpointStorage()
|
||||
} else {
|
||||
PerFileCheckpointStorage(dir / "checkpoints")
|
||||
}
|
||||
}
|
||||
|
||||
override fun makeIdentityService() = MockIdentityService(mockNet.identities)
|
||||
|
||||
override fun makeVaultService(): VaultService = InMemoryVaultService(services)
|
||||
|
Loading…
Reference in New Issue
Block a user