CORDA-2942: Improve tests coverage for Checkpoint dumper (#5830)

* CORDA-2942: Switch to use predictable timestamp

* CORDA-2942: Validate content of dumped checkpoint

* CORDA-2942: First stub on the integration test
(no checkpoints dumped for some reason using RPC)

* CORDA-2942: Reduce checkpointing code to bare minimum

* CORDA-2942: Minor refactoring

* CORDA-2942: Verify dump checkpoint content
This commit is contained in:
Viktor Kolomeyko 2019-12-30 10:43:59 +00:00 committed by Anthony Keenan
parent d5e7c9abbd
commit 7f4c2ca974
3 changed files with 148 additions and 6 deletions

View File

@ -0,0 +1,98 @@
package net.corda.node.services.rpc
import co.paralleluniverse.fibers.Suspendable
import com.natpryce.hamkrest.assertion.assertThat
import com.natpryce.hamkrest.containsSubstring
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.inputStream
import net.corda.core.internal.isRegularFile
import net.corda.core.internal.list
import net.corda.core.internal.messaging.InternalCordaRPCOps
import net.corda.core.internal.readFully
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.NodeStartup
import net.corda.node.services.Permissions
import net.corda.node.services.statemachine.CountUpDownLatch
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import net.corda.testing.node.internal.enclosedCordapp
import org.junit.Test
import java.nio.file.Path
import java.util.concurrent.CountDownLatch
import java.util.zip.ZipInputStream
import kotlin.test.assertEquals
class DumpCheckpointsTest {
companion object {
private val dumpCheckPointLatch = CountDownLatch(1)
private val flowProceedLatch = CountUpDownLatch(1)
}
@Test
fun `verify checkpoint dump via RPC`() {
val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
val proxy = it.proxy as InternalCordaRPCOps
// 1 for GetNumberOfCheckpointsFlow itself
val checkPointCountFuture = proxy.startFlow(::GetNumberOfCheckpointsFlow).returnValue
val logDirPath = nodeAHandle.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME
logDirPath.createDirectories()
dumpCheckPointLatch.await()
proxy.dumpCheckpoints()
flowProceedLatch.countDown()
assertEquals(1, checkPointCountFuture.get())
checkDumpFile(logDirPath)
}
}
}
private fun checkDumpFile(dir: Path) {
// The directory supposed to contain a single ZIP file
val file = dir.list().single { it.isRegularFile() }
ZipInputStream(file.inputStream()).use { zip ->
val entry = zip.nextEntry
assertThat(entry.name, containsSubstring("json"))
val content = zip.readFully()
assertThat(String(content), containsSubstring(GetNumberOfCheckpointsFlow::class.java.name))
}
}
@StartableByRPC
class GetNumberOfCheckpointsFlow : FlowLogic<Int>() {
@Suspendable
override fun call(): Int {
var count = 0
serviceHub.jdbcSession().prepareStatement("select * from node_checkpoints").use { ps ->
ps.executeQuery().use { rs ->
while (rs.next()) {
count++
}
}
}
syncUp()
return count
}
@Suspendable
private fun syncUp() {
dumpCheckPointLatch.countDown()
flowProceedLatch.await()
}
}
}

View File

@ -58,7 +58,7 @@ import java.util.zip.ZipOutputStream
class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private val database: CordaPersistence, private val serviceHub: ServiceHub, val baseDirectory: Path) { class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private val database: CordaPersistence, private val serviceHub: ServiceHub, val baseDirectory: Path) {
companion object { companion object {
private val TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(UTC) internal val TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(UTC)
private val log = contextLogger() private val log = contextLogger()
} }

View File

@ -1,12 +1,25 @@
package net.corda.node.services.rpc package net.corda.node.services.rpc
import com.natpryce.hamkrest.assertion.assertThat
import com.natpryce.hamkrest.containsSubstring
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.context.InvocationContext import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.createDirectories
import net.corda.core.internal.deleteIfExists
import net.corda.core.internal.deleteRecursively
import net.corda.core.internal.div
import net.corda.core.internal.inputStream
import net.corda.core.internal.readFully
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationDefaults import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.internal.checkpointSerialize import net.corda.core.serialization.internal.checkpointSerialize
import net.corda.node.internal.NodeStartup
import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.statemachine.Checkpoint import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.FlowStart import net.corda.node.services.statemachine.FlowStart
@ -15,11 +28,16 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity import net.corda.testing.core.TestIdentity
import net.corda.testing.node.MockServices import net.corda.testing.node.MockServices
import net.corda.testing.node.TestClock
import org.junit.After import org.junit.After
import org.junit.Before import org.junit.Before
import org.junit.Rule import org.junit.Rule
import org.junit.Test import org.junit.Test
import java.nio.file.Files
import java.nio.file.Paths import java.nio.file.Paths
import java.time.Clock
import java.time.Instant
import java.util.zip.ZipInputStream
class CheckpointDumperTest { class CheckpointDumperTest {
@ -27,9 +45,15 @@ class CheckpointDumperTest {
@JvmField @JvmField
val testSerialization = SerializationEnvironmentRule() val testSerialization = SerializationEnvironmentRule()
private val myself = TestIdentity(CordaX500Name("Me", "London", "GB")) private val organisation = "MeTheMyself"
private val myself = TestIdentity(CordaX500Name(organisation, "London", "GB"))
private val currentTimestamp = Instant.parse("2019-12-25T10:15:30.00Z")
private val baseDirectory = Files.createTempDirectory("CheckpointDumperTest")
private val file = baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME /
"checkpoints_dump-${CheckpointDumper.TIME_FORMATTER.format(currentTimestamp)}.zip"
private lateinit var database: CordaPersistence private lateinit var database: CordaPersistence
private lateinit var services: MockServices private lateinit var services: ServiceHub
private lateinit var checkpointStorage: DBCheckpointStorage private lateinit var checkpointStorage: DBCheckpointStorage
@Before @Before
@ -41,18 +65,29 @@ class CheckpointDumperTest {
moreKeys = emptySet() moreKeys = emptySet()
) )
database = db database = db
services = mockServices services = object : ServiceHub by mockServices {
// Set fixed point in time
override val clock: Clock
get() {
return TestClock(mock<Clock>().also {
doReturn(currentTimestamp).whenever(it).instant()
})
}
}
newCheckpointStorage() newCheckpointStorage()
file.parent.createDirectories()
file.deleteIfExists()
} }
@After @After
fun cleanUp() { fun cleanUp() {
database.close() database.close()
baseDirectory.deleteRecursively()
} }
@Test @Test
fun testDumpCheckpoints() { fun testDumpCheckpoints() {
val dumper = CheckpointDumper(checkpointStorage, database, services, Paths.get(".")) val dumper = CheckpointDumper(checkpointStorage, database, services, baseDirectory)
dumper.start(emptyList()) dumper.start(emptyList())
// add a checkpoint // add a checkpoint
@ -62,7 +97,16 @@ class CheckpointDumperTest {
} }
dumper.dump() dumper.dump()
// check existence of output zip file: checkpoints_dump-<data>.zip checkDumpFile()
}
private fun checkDumpFile() {
ZipInputStream(file.inputStream()).use { zip ->
val entry = zip.nextEntry
assertThat(entry.name, containsSubstring("json"))
val content = zip.readFully()
assertThat(String(content), containsSubstring(organisation))
}
} }
// This test will only succeed when the VM startup includes the "checkpoint-agent": // This test will only succeed when the VM startup includes the "checkpoint-agent":