diff --git a/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt new file mode 100644 index 0000000000..103d75dfda --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/rpc/DumpCheckpointsTest.kt @@ -0,0 +1,98 @@ +package net.corda.node.services.rpc + +import co.paralleluniverse.fibers.Suspendable +import com.natpryce.hamkrest.assertion.assertThat +import com.natpryce.hamkrest.containsSubstring +import net.corda.client.rpc.CordaRPCClient +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC +import net.corda.core.internal.createDirectories +import net.corda.core.internal.div +import net.corda.core.internal.inputStream +import net.corda.core.internal.isRegularFile +import net.corda.core.internal.list +import net.corda.core.internal.messaging.InternalCordaRPCOps +import net.corda.core.internal.readFully +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.getOrThrow +import net.corda.node.internal.NodeStartup +import net.corda.node.services.Permissions +import net.corda.node.services.statemachine.CountUpDownLatch +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.driver +import net.corda.testing.node.User +import net.corda.testing.node.internal.enclosedCordapp +import org.junit.Test +import java.nio.file.Path +import java.util.concurrent.CountDownLatch +import java.util.zip.ZipInputStream +import kotlin.test.assertEquals + +class DumpCheckpointsTest { + + companion object { + private val dumpCheckPointLatch = CountDownLatch(1) + private val flowProceedLatch = CountUpDownLatch(1) + } + + @Test + fun `verify checkpoint dump via RPC`() { + val user = User("mark", "dadada", setOf(Permissions.all())) + driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) { + + val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() + + CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use { + val proxy = it.proxy as InternalCordaRPCOps + + // 1 for GetNumberOfCheckpointsFlow itself + val checkPointCountFuture = proxy.startFlow(::GetNumberOfCheckpointsFlow).returnValue + + val logDirPath = nodeAHandle.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME + logDirPath.createDirectories() + dumpCheckPointLatch.await() + proxy.dumpCheckpoints() + + flowProceedLatch.countDown() + assertEquals(1, checkPointCountFuture.get()) + checkDumpFile(logDirPath) + } + } + } + + private fun checkDumpFile(dir: Path) { + // The directory supposed to contain a single ZIP file + val file = dir.list().single { it.isRegularFile() } + + ZipInputStream(file.inputStream()).use { zip -> + val entry = zip.nextEntry + assertThat(entry.name, containsSubstring("json")) + val content = zip.readFully() + assertThat(String(content), containsSubstring(GetNumberOfCheckpointsFlow::class.java.name)) + } + } + + @StartableByRPC + class GetNumberOfCheckpointsFlow : FlowLogic() { + @Suspendable + override fun call(): Int { + var count = 0 + serviceHub.jdbcSession().prepareStatement("select * from node_checkpoints").use { ps -> + ps.executeQuery().use { rs -> + while (rs.next()) { + count++ + } + } + } + syncUp() + return count + } + + @Suspendable + private fun syncUp() { + dumpCheckPointLatch.countDown() + flowProceedLatch.await() + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt index 5a2ef80d56..870c649c77 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumper.kt @@ -58,7 +58,7 @@ import java.util.zip.ZipOutputStream class CheckpointDumper(private val checkpointStorage: CheckpointStorage, private val database: CordaPersistence, private val serviceHub: ServiceHub, val baseDirectory: Path) { companion object { - private val TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(UTC) + internal val TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(UTC) private val log = contextLogger() } diff --git a/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperTest.kt b/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperTest.kt index d0767b627f..37fb63e857 100644 --- a/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperTest.kt @@ -1,12 +1,25 @@ package net.corda.node.services.rpc +import com.natpryce.hamkrest.assertion.assertThat +import com.natpryce.hamkrest.containsSubstring +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.whenever import net.corda.core.context.InvocationContext import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.createDirectories +import net.corda.core.internal.deleteIfExists +import net.corda.core.internal.deleteRecursively +import net.corda.core.internal.div +import net.corda.core.internal.inputStream +import net.corda.core.internal.readFully +import net.corda.core.node.ServiceHub import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.internal.CheckpointSerializationDefaults import net.corda.core.serialization.internal.checkpointSerialize +import net.corda.node.internal.NodeStartup import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.statemachine.Checkpoint import net.corda.node.services.statemachine.FlowStart @@ -15,11 +28,16 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.TestIdentity import net.corda.testing.node.MockServices +import net.corda.testing.node.TestClock import org.junit.After import org.junit.Before import org.junit.Rule import org.junit.Test +import java.nio.file.Files import java.nio.file.Paths +import java.time.Clock +import java.time.Instant +import java.util.zip.ZipInputStream class CheckpointDumperTest { @@ -27,9 +45,15 @@ class CheckpointDumperTest { @JvmField val testSerialization = SerializationEnvironmentRule() - private val myself = TestIdentity(CordaX500Name("Me", "London", "GB")) + private val organisation = "MeTheMyself" + private val myself = TestIdentity(CordaX500Name(organisation, "London", "GB")) + private val currentTimestamp = Instant.parse("2019-12-25T10:15:30.00Z") + private val baseDirectory = Files.createTempDirectory("CheckpointDumperTest") + private val file = baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / + "checkpoints_dump-${CheckpointDumper.TIME_FORMATTER.format(currentTimestamp)}.zip" + private lateinit var database: CordaPersistence - private lateinit var services: MockServices + private lateinit var services: ServiceHub private lateinit var checkpointStorage: DBCheckpointStorage @Before @@ -41,18 +65,29 @@ class CheckpointDumperTest { moreKeys = emptySet() ) database = db - services = mockServices + services = object : ServiceHub by mockServices { + // Set fixed point in time + override val clock: Clock + get() { + return TestClock(mock().also { + doReturn(currentTimestamp).whenever(it).instant() + }) + } + } newCheckpointStorage() + file.parent.createDirectories() + file.deleteIfExists() } @After fun cleanUp() { database.close() + baseDirectory.deleteRecursively() } @Test fun testDumpCheckpoints() { - val dumper = CheckpointDumper(checkpointStorage, database, services, Paths.get(".")) + val dumper = CheckpointDumper(checkpointStorage, database, services, baseDirectory) dumper.start(emptyList()) // add a checkpoint @@ -62,7 +97,16 @@ class CheckpointDumperTest { } dumper.dump() - // check existence of output zip file: checkpoints_dump-.zip + checkDumpFile() + } + + private fun checkDumpFile() { + ZipInputStream(file.inputStream()).use { zip -> + val entry = zip.nextEntry + assertThat(entry.name, containsSubstring("json")) + val content = zip.readFully() + assertThat(String(content), containsSubstring(organisation)) + } } // This test will only succeed when the VM startup includes the "checkpoint-agent":