mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
Merge branch 'release/os/4.6' into nnagy-os-4.6-os-4.7-20200814
# Conflicts: # client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt
This commit is contained in:
commit
c9b2fa11cd
@ -61,8 +61,8 @@ buildscript {
|
||||
|
||||
ext.asm_version = '7.1'
|
||||
ext.artemis_version = '2.6.2'
|
||||
// TODO Upgrade to Jackson 2.10+ only when corda is using kotlin 1.3.10
|
||||
ext.jackson_version = '2.9.8'
|
||||
// TODO Upgrade Jackson only when corda is using kotlin 1.3.10
|
||||
ext.jackson_version = '2.9.7'
|
||||
ext.jetty_version = '9.4.19.v20190610'
|
||||
ext.jersey_version = '2.25'
|
||||
ext.servlet_version = '4.0.1'
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.client.rpcreconnect
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||
import net.corda.client.rpc.CordaRPCClientTest
|
||||
@ -8,10 +9,18 @@ import net.corda.client.rpc.MaxRpcRetryException
|
||||
import net.corda.client.rpc.RPCException
|
||||
import net.corda.client.rpc.UnrecoverableRPCException
|
||||
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.internal.concurrent.doOnComplete
|
||||
import net.corda.core.internal.concurrent.doOnError
|
||||
import net.corda.core.internal.concurrent.thenMatch
|
||||
import net.corda.core.messaging.startFlowWithClientId
|
||||
import net.corda.core.messaging.startTrackedFlow
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.finance.DOLLARS
|
||||
import net.corda.finance.contracts.asset.Cash
|
||||
import net.corda.finance.flows.CashIssueFlow
|
||||
@ -24,16 +33,22 @@ import net.corda.testing.driver.driver
|
||||
import net.corda.testing.driver.internal.incrementalPortAllocation
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.internal.FINANCE_CORDAPPS
|
||||
import net.corda.testing.node.internal.enclosedCordapp
|
||||
import net.corda.testing.node.internal.rpcDriver
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.Test
|
||||
import java.lang.IllegalStateException
|
||||
import java.lang.RuntimeException
|
||||
import java.lang.Thread.sleep
|
||||
import java.time.Duration
|
||||
import java.util.*
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import kotlin.concurrent.thread
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertFalse
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
@ -51,10 +66,6 @@ class CordaRPCClientReconnectionTest {
|
||||
val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `rpc node start when FlowsDrainingModeEnabled throws RejectedCommandException and won't attempt to reconnect`() {
|
||||
driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS)) {
|
||||
@ -373,4 +384,200 @@ class CordaRPCClientReconnectionTest {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `rpc returned flow -started with cient id- result future continue working when the node crashes and restarts`() {
|
||||
driver(DriverParameters(inMemoryDB = false, cordappsForAllNodes = listOf(this.enclosedCordapp()))) {
|
||||
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
|
||||
fun startNode(): NodeHandle {
|
||||
return startNode(
|
||||
providedName = CHARLIE_NAME,
|
||||
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
|
||||
customOverrides = mapOf("rpcSettings.address" to address.toString())
|
||||
).getOrThrow()
|
||||
}
|
||||
|
||||
val node = startNode()
|
||||
val client = CordaRPCClient(node.rpcAddress, config)
|
||||
client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect).use {
|
||||
val rpcOps = it.proxy as ReconnectingCordaRPCOps
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
// assert result reconnectable futures returned work from both 'startFlowWithClientId' and 'reattachFlowWithClientId'
|
||||
val flowHandle0 = rpcOps.startFlowWithClientId(clientId, ::SimpleFlow)
|
||||
val flowHandle1 = rpcOps.reattachFlowWithClientId<Int>(clientId)
|
||||
|
||||
val completedCounter = AtomicInteger(0)
|
||||
flowHandle0.returnValue.doOnComplete {
|
||||
completedCounter.incrementAndGet()
|
||||
}
|
||||
flowHandle1!!.returnValue.doOnComplete {
|
||||
completedCounter.incrementAndGet()
|
||||
}
|
||||
|
||||
flowHandle0.returnValue.thenMatch({
|
||||
completedCounter.incrementAndGet()
|
||||
}, {})
|
||||
flowHandle1.returnValue.thenMatch({
|
||||
completedCounter.incrementAndGet()
|
||||
}, {})
|
||||
|
||||
flowHandle0.returnValue.toCompletableFuture().thenApply {
|
||||
completedCounter.incrementAndGet()
|
||||
}
|
||||
flowHandle1.returnValue.toCompletableFuture().thenApply {
|
||||
completedCounter.incrementAndGet()
|
||||
}
|
||||
|
||||
node.stop()
|
||||
thread {
|
||||
sleep(1000)
|
||||
startNode()
|
||||
}
|
||||
|
||||
var result1: Int? = null
|
||||
thread {
|
||||
result1 = flowHandle1.returnValue.get()
|
||||
}
|
||||
|
||||
val result0 = flowHandle0.returnValue.get()
|
||||
|
||||
sleep(1000)
|
||||
assertEquals(6, completedCounter.get())
|
||||
assertEquals(5, result0!!)
|
||||
assertEquals(5, result1!!)
|
||||
assertThat(rpcOps.reconnectingRPCConnection.isClosed())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `rpc returned flow -started with cient id- exception future continue working when the node crashes and restarts`() {
|
||||
driver(DriverParameters(inMemoryDB = false, cordappsForAllNodes = listOf(this.enclosedCordapp()))) {
|
||||
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
|
||||
fun startNode(): NodeHandle {
|
||||
return startNode(
|
||||
providedName = CHARLIE_NAME,
|
||||
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
|
||||
customOverrides = mapOf("rpcSettings.address" to address.toString())
|
||||
).getOrThrow()
|
||||
}
|
||||
|
||||
val node = startNode()
|
||||
val client = CordaRPCClient(node.rpcAddress, config)
|
||||
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
|
||||
val rpcOps = it.proxy as ReconnectingCordaRPCOps
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
val flowHandle = rpcOps.startFlowWithClientId(clientId, ::ThrowingFlow)
|
||||
|
||||
var erroredCounter = 0
|
||||
flowHandle.returnValue.doOnError {
|
||||
erroredCounter++
|
||||
}
|
||||
|
||||
flowHandle.returnValue.toCompletableFuture().exceptionally {
|
||||
erroredCounter++
|
||||
}
|
||||
|
||||
node.stop()
|
||||
thread {
|
||||
sleep(1000)
|
||||
startNode()
|
||||
}
|
||||
|
||||
assertFailsWith<CordaRuntimeException> {
|
||||
flowHandle.returnValue.getOrThrow()
|
||||
}
|
||||
|
||||
sleep(1000)
|
||||
assertEquals(2, erroredCounter)
|
||||
assertThat(rpcOps.reconnectingRPCConnection.isClosed())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `rpc re attach to flow with client id tries to reconnect when the node is down`() {
|
||||
driver(DriverParameters(inMemoryDB = false, cordappsForAllNodes = listOf(this.enclosedCordapp()))) {
|
||||
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
|
||||
fun startNode(): NodeHandle {
|
||||
return startNode(
|
||||
providedName = CHARLIE_NAME,
|
||||
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
|
||||
customOverrides = mapOf("rpcSettings.address" to address.toString())
|
||||
).getOrThrow()
|
||||
}
|
||||
|
||||
val node = startNode()
|
||||
val client = CordaRPCClient(node.rpcAddress, config)
|
||||
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
|
||||
val rpcOps = it.proxy as ReconnectingCordaRPCOps
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
rpcOps.startFlowWithClientId(clientId, ::SimpleFlow)
|
||||
|
||||
node.stop()
|
||||
thread {
|
||||
sleep(1000)
|
||||
startNode()
|
||||
}
|
||||
val flowHandle = rpcOps.reattachFlowWithClientId<Int>(clientId)
|
||||
|
||||
val result = flowHandle!!.returnValue.get()
|
||||
|
||||
assertEquals(5, result!!)
|
||||
assertThat(rpcOps.reconnectingRPCConnection.isClosed())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `rpc remove client id tries to reconnect when the node is down`() {
|
||||
driver(DriverParameters(inMemoryDB = false, cordappsForAllNodes = listOf(this.enclosedCordapp()))) {
|
||||
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
|
||||
fun startNode(): NodeHandle {
|
||||
return startNode(
|
||||
providedName = CHARLIE_NAME,
|
||||
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
|
||||
customOverrides = mapOf("rpcSettings.address" to address.toString())
|
||||
).getOrThrow()
|
||||
}
|
||||
|
||||
val node = startNode()
|
||||
val client = CordaRPCClient(node.rpcAddress, config)
|
||||
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
|
||||
val rpcOps = it.proxy as ReconnectingCordaRPCOps
|
||||
val clientId = UUID.randomUUID().toString()
|
||||
rpcOps.startFlowWithClientId(clientId, ::SimpleFlow).returnValue.getOrThrow()
|
||||
|
||||
node.stop()
|
||||
thread {
|
||||
sleep(1000)
|
||||
startNode()
|
||||
}
|
||||
val removed = rpcOps.removeClientId(clientId)
|
||||
|
||||
assertTrue(removed)
|
||||
assertThat(rpcOps.reconnectingRPCConnection.isClosed())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class SimpleFlow : FlowLogic<Int>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call(): Int {
|
||||
sleep(10.seconds)
|
||||
return 5
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
class ThrowingFlow : FlowLogic<Unit>() {
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
sleep(10.seconds)
|
||||
throw IllegalStateException("bye")
|
||||
}
|
||||
}
|
||||
}
|
@ -12,4 +12,5 @@ object RPCUtils {
|
||||
fun RPCApi.ClientToServer.RpcRequest.isShutdownCmd() = isShutdownMethodName(methodName)
|
||||
fun Method.isShutdown() = isShutdownMethodName(name)
|
||||
fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")
|
||||
fun Method.isStartFlowWithClientId() = name == "startFlowWithClientId" || name == "startFlowDynamicWithClientId"
|
||||
}
|
@ -12,13 +12,19 @@ import net.corda.client.rpc.RPCException
|
||||
import net.corda.client.rpc.UnrecoverableRPCException
|
||||
import net.corda.client.rpc.internal.RPCUtils.isShutdown
|
||||
import net.corda.client.rpc.internal.RPCUtils.isStartFlow
|
||||
import net.corda.client.rpc.internal.RPCUtils.isStartFlowWithClientId
|
||||
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CLOSED
|
||||
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CONNECTED
|
||||
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CONNECTING
|
||||
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.DIED
|
||||
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.UNCONNECTED
|
||||
import net.corda.client.rpc.reconnect.CouldNotStartFlowException
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.internal.concurrent.OpenFuture
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.concurrent.thenMatch
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.internal.min
|
||||
import net.corda.core.internal.times
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
@ -26,6 +32,8 @@ import net.corda.core.messaging.ClientRpcSslOptions
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.FlowHandle
|
||||
import net.corda.core.messaging.FlowHandleWithClientId
|
||||
import net.corda.core.messaging.FlowHandleWithClientIdImpl
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
@ -294,8 +302,9 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
}
|
||||
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler {
|
||||
private fun checkIfIsStartFlow(method: Method, e: InvocationTargetException) {
|
||||
if (method.isStartFlow()) {
|
||||
// Don't retry flows
|
||||
if (method.isStartFlow() && !method.isStartFlowWithClientId()) {
|
||||
// Only retry flows that have started with a client id. For such flows alone it is safe to recall them since,
|
||||
// on recalling trying to reconnect they will not start a new flow but re-hook to an existing one ,that matches the client id, instead.
|
||||
throw CouldNotStartFlowException(e.targetException)
|
||||
}
|
||||
}
|
||||
@ -382,10 +391,44 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
}
|
||||
initialFeed.copy(updates = observable)
|
||||
}
|
||||
FlowHandleWithClientId::class.java -> {
|
||||
val initialHandle: FlowHandleWithClientId<Any?> = uncheckedCast(doInvoke(method, args,
|
||||
reconnectingRPCConnection.gracefulReconnect.maxAttempts))
|
||||
|
||||
val initialFuture = initialHandle.returnValue
|
||||
// This is the future that is returned to the client. It will get carried until we reconnect to the node.
|
||||
val returnFuture = openFuture<Any?>()
|
||||
|
||||
tryConnect(initialFuture, returnFuture) {
|
||||
val handle: FlowHandleWithClientId<Any?> = uncheckedCast(doInvoke(method, args, reconnectingRPCConnection.gracefulReconnect.maxAttempts))
|
||||
handle.returnValue
|
||||
}
|
||||
|
||||
return (initialHandle as FlowHandleWithClientIdImpl<Any?>).copy(returnValue = returnFuture)
|
||||
}
|
||||
// TODO - add handlers for Observable return types.
|
||||
else -> doInvoke(method, args, reconnectingRPCConnection.gracefulReconnect.maxAttempts)
|
||||
}
|
||||
}
|
||||
|
||||
private fun tryConnect(currentFuture: CordaFuture<*>, returnFuture: OpenFuture<Any?>, doInvoke: () -> CordaFuture<*>) {
|
||||
currentFuture.thenMatch(
|
||||
success = {
|
||||
returnFuture.set(it)
|
||||
} ,
|
||||
failure = {
|
||||
if (it is ConnectionFailureException) {
|
||||
reconnectingRPCConnection.observersPool.execute {
|
||||
val reconnectedFuture = doInvoke()
|
||||
tryConnect(reconnectedFuture, returnFuture, doInvoke)
|
||||
}
|
||||
} else {
|
||||
returnFuture.setException(it)
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fun close() {
|
||||
@ -393,3 +436,4 @@ class ReconnectingCordaRPCOps private constructor(
|
||||
reconnectingRPCConnection.forceClose()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,26 @@
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
|
||||
class JVMAgentUtilities {
|
||||
|
||||
companion object {
|
||||
@VisibleForTesting
|
||||
@Suppress("NestedBlockDepth")
|
||||
fun parseDebugPort(args: Iterable<String>): Short? {
|
||||
val debugArgumentPrefix = "-agentlib:jdwp="
|
||||
for (arg in args) {
|
||||
if (arg.startsWith(debugArgumentPrefix)) {
|
||||
for (keyValuePair in arg.substring(debugArgumentPrefix.length + 1).split(",")) {
|
||||
val equal = keyValuePair.indexOf('=')
|
||||
if (equal >= 0 && keyValuePair.startsWith("address")) {
|
||||
val portBegin = (keyValuePair.lastIndexOf(':').takeUnless { it < 0 } ?: equal) + 1
|
||||
return keyValuePair.substring(portBegin).toShort()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,32 @@
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import org.junit.Assert
|
||||
import org.junit.Test
|
||||
import org.junit.runner.RunWith
|
||||
import org.junit.runners.Parameterized
|
||||
|
||||
@RunWith(Parameterized::class)
|
||||
class ParseDebugPortTest(private val args: Iterable<String>,
|
||||
private val expectedPort: Short?,
|
||||
@Suppress("unused_parameter") description : String) {
|
||||
|
||||
companion object {
|
||||
@JvmStatic
|
||||
@Parameterized.Parameters(name = "{2}")
|
||||
fun load() = arrayOf(
|
||||
arrayOf(emptyList<String>(), null, "No arguments"),
|
||||
arrayOf(listOf("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=1234"), 1234.toShort(), "Debug argument"),
|
||||
arrayOf(listOf("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=0.0.0.0:7777"), 7777.toShort(), "Debug argument with bind address"),
|
||||
arrayOf(listOf("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y"), null, "Debug argument without port"),
|
||||
arrayOf(listOf("-version", "-Dmy.jvm.property=someValue"), null, "Unrelated arguments"),
|
||||
arrayOf(listOf("-Dcapsule.jvm.args=\"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=4321",
|
||||
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=1234"), 1234.toShort(), "Debug argument and capsule arguments")
|
||||
)
|
||||
}
|
||||
|
||||
@Test(timeout = 10_000)
|
||||
fun test() {
|
||||
val port = JVMAgentUtilities.parseDebugPort(args)
|
||||
Assert.assertEquals(expectedPort, port)
|
||||
}
|
||||
}
|
@ -226,9 +226,6 @@ dependencies {
|
||||
// Adding native SSL library to allow using native SSL with Artemis and AMQP
|
||||
compile "io.netty:netty-tcnative-boringssl-static:$tcnative_version"
|
||||
|
||||
// Required by JVMAgentUtil (x-compatible java 8 & 11 agent lookup mechanism)
|
||||
compile files("${System.properties['java.home']}/../lib/tools.jar")
|
||||
|
||||
// Byteman for runtime (termination) rules injection on the running node
|
||||
// Submission tool allowing to install rules on running nodes
|
||||
slowIntegrationTestCompile "org.jboss.byteman:byteman-submit:4.0.11"
|
||||
|
@ -33,8 +33,8 @@ public class CordaCaplet extends Capsule {
|
||||
|
||||
private Config parseConfigFile(List<String> args) {
|
||||
this.baseDir = getBaseDirectory(args);
|
||||
String config = getOption(args, "--config-file");
|
||||
File configFile = (config == null) ? new File(baseDir, "node.conf") : new File(config);
|
||||
|
||||
File configFile = getConfigFile(args, baseDir);
|
||||
try {
|
||||
ConfigParseOptions parseOptions = ConfigParseOptions.defaults().setAllowMissing(false);
|
||||
Config defaultConfig = ConfigFactory.parseResources("corda-reference.conf", parseOptions);
|
||||
|
@ -14,6 +14,7 @@ import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.NodeParameters
|
||||
import net.corda.testing.node.internal.internalDriver
|
||||
import org.junit.Assume.assumeFalse
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNotNull
|
||||
@ -133,6 +134,7 @@ open class SignatureConstraintMigrationFromHashConstraintsTests : SignatureConst
|
||||
assertTrue(consumingTransaction.outputs.single().constraint is HashAttachmentConstraint)
|
||||
}
|
||||
|
||||
@Ignore("ENT-5676: Disabling to isolate Gradle process death cause")
|
||||
@Test(timeout=300_000)
|
||||
fun `HashConstraint cannot be migrated to SignatureConstraint if a HashConstraint is specified for one state and another uses an AutomaticPlaceholderConstraint`() {
|
||||
assumeFalse(System.getProperty("os.name").toLowerCase().startsWith("win")) // See NodeStatePersistenceTests.kt.
|
||||
|
@ -34,6 +34,7 @@ import net.corda.testing.node.internal.FINANCE_CORDAPPS
|
||||
import net.corda.testing.node.internal.enclosedCordapp
|
||||
import org.junit.Test
|
||||
import java.sql.SQLTransientConnectionException
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
@ -43,6 +44,7 @@ import kotlin.test.assertTrue
|
||||
class FlowReloadAfterCheckpointTest {
|
||||
|
||||
private companion object {
|
||||
private val DEFAULT_TIMEOUT = Duration.ofSeconds(10)
|
||||
val cordapps = listOf(enclosedCordapp())
|
||||
}
|
||||
|
||||
@ -98,15 +100,13 @@ class FlowReloadAfterCheckpointTest {
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true and be kept for observation due to failed deserialization`() {
|
||||
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
|
||||
val observations = QueueWithCountdown<StateMachineRunId>(1)
|
||||
val reloads = QueueWithCountdown<StateMachineRunId>(8)
|
||||
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
|
||||
reloads.add(id)
|
||||
}
|
||||
lateinit var flowKeptForObservation: StateMachineRunId
|
||||
val lock = CountDownLatch(1)
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { id, _ ->
|
||||
flowKeptForObservation = id
|
||||
lock.countDown()
|
||||
observations.add(id)
|
||||
}
|
||||
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = cordapps)) {
|
||||
|
||||
@ -121,9 +121,12 @@ class FlowReloadAfterCheckpointTest {
|
||||
.getOrThrow()
|
||||
|
||||
val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), true, false, false)
|
||||
val flowStartedByAlice = handle.id
|
||||
lock.await()
|
||||
assertEquals(flowStartedByAlice, flowKeptForObservation)
|
||||
val flowStartedByAlice: StateMachineRunId = handle.id
|
||||
|
||||
// We can't wait on the flow ending, because it breaks, so we need to wait on internal status changes instead
|
||||
observations.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
|
||||
reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
|
||||
assertEquals(flowStartedByAlice, observations.singleOrNull())
|
||||
assertEquals(4, reloads.filter { it == flowStartedByAlice }.count())
|
||||
assertEquals(4, reloads.filter { it == ReloadFromCheckpointResponder.flowId }.count())
|
||||
}
|
||||
@ -131,7 +134,7 @@ class FlowReloadAfterCheckpointTest {
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `flow will reload from a previous checkpoint after calling suspending function and skipping the persisting the current checkpoint when reloadCheckpointAfterSuspend is true`() {
|
||||
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
|
||||
val reloads = QueueWithCountdown<StateMachineRunId>(11)
|
||||
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
|
||||
reloads.add(id)
|
||||
}
|
||||
@ -150,6 +153,7 @@ class FlowReloadAfterCheckpointTest {
|
||||
val handle = alice.rpc.startFlow(::ReloadFromCheckpointFlow, bob.nodeInfo.singleIdentity(), false, false, true)
|
||||
val flowStartedByAlice = handle.id
|
||||
handle.returnValue.getOrThrow()
|
||||
reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
|
||||
assertEquals(5, reloads.filter { it == flowStartedByAlice }.count())
|
||||
assertEquals(6, reloads.filter { it == ReloadFromCheckpointResponder.flowId }.count())
|
||||
}
|
||||
@ -224,13 +228,11 @@ class FlowReloadAfterCheckpointTest {
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `flow continues reloading from checkpoints after node restart when reloadCheckpointAfterSuspend is true`() {
|
||||
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
|
||||
val reloads = QueueWithCountdown<StateMachineRunId>(5)
|
||||
val firstLatch = CountDownLatch(2)
|
||||
val secondLatch = CountDownLatch(5)
|
||||
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { runId ->
|
||||
reloads.add(runId)
|
||||
firstLatch.countDown()
|
||||
secondLatch.countDown()
|
||||
}
|
||||
driver(
|
||||
DriverParameters(
|
||||
@ -257,7 +259,7 @@ class FlowReloadAfterCheckpointTest {
|
||||
customOverrides = mapOf(NodeConfiguration::reloadCheckpointAfterSuspend.name to true)
|
||||
).getOrThrow()
|
||||
|
||||
assertTrue { secondLatch.await(20, TimeUnit.SECONDS) }
|
||||
reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
|
||||
assertEquals(5, reloads.size)
|
||||
}
|
||||
}
|
||||
@ -266,11 +268,9 @@ class FlowReloadAfterCheckpointTest {
|
||||
fun `idempotent flow continues reloading from checkpoints after node restart when reloadCheckpointAfterSuspend is true`() {
|
||||
// restarts completely from the beginning and forgets the in-memory reload count therefore
|
||||
// it reloads an extra 2 times for checkpoints it had already reloaded before the node shutdown
|
||||
val reloadsExpected = CountDownLatch(7)
|
||||
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
|
||||
val reloads = QueueWithCountdown<StateMachineRunId>(7)
|
||||
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { runId ->
|
||||
reloads.add(runId)
|
||||
reloadsExpected.countDown()
|
||||
}
|
||||
driver(
|
||||
DriverParameters(
|
||||
@ -298,14 +298,14 @@ class FlowReloadAfterCheckpointTest {
|
||||
|
||||
// restarts completely from the beginning and forgets the in-memory reload count therefore
|
||||
// it reloads an extra 2 times for checkpoints it had already reloaded before the node shutdown
|
||||
assertTrue { reloadsExpected.await(20, TimeUnit.SECONDS) }
|
||||
reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
|
||||
assertEquals(7, reloads.size)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `more complicated flow will reload from its checkpoint after suspending when reloadCheckpointAfterSuspend is true`() {
|
||||
val reloads = ConcurrentLinkedQueue<StateMachineRunId>()
|
||||
val reloads = QueueWithCountdown<StateMachineRunId>(13)
|
||||
FlowStateMachineImpl.onReloadFlowFromCheckpoint = { id ->
|
||||
reloads.add(id)
|
||||
}
|
||||
@ -335,7 +335,7 @@ class FlowReloadAfterCheckpointTest {
|
||||
.map(StateMachineTransactionMapping::stateMachineRunId)
|
||||
.toSet()
|
||||
.single()
|
||||
Thread.sleep(10.seconds.toMillis())
|
||||
reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
|
||||
assertEquals(7, reloads.filter { it == flowStartedByAlice }.size)
|
||||
assertEquals(6, reloads.filter { it == flowStartedByBob }.size)
|
||||
}
|
||||
@ -523,4 +523,5 @@ class FlowReloadAfterCheckpointTest {
|
||||
|
||||
internal class BrokenMap<K, V>(delegate: MutableMap<K, V> = mutableMapOf()) : MutableMap<K, V> by delegate {
|
||||
override fun put(key: K, value: V): V? = throw IllegalStateException("Broken on purpose")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -185,7 +185,9 @@ class FlowSessionCloseTest {
|
||||
}
|
||||
|
||||
session.send(responderReaction)
|
||||
sleep(1.seconds)
|
||||
|
||||
// Give time to the other flow to receive the message, close its session and send the end session message back
|
||||
sleep(5.seconds)
|
||||
|
||||
if (accessClosedSessionWithApi != null) {
|
||||
when(accessClosedSessionWithApi) {
|
||||
@ -291,4 +293,4 @@ class FlowSessionCloseTest {
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,28 @@
|
||||
package net.corda.node.flows
|
||||
|
||||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* Helper class for waiting until another thread has put a set number of objects
|
||||
* into a queue.
|
||||
*/
|
||||
internal class QueueWithCountdown<E> private constructor(
|
||||
count: Int = 0,
|
||||
private val queue: ConcurrentLinkedQueue<E>
|
||||
) : Collection<E> by queue {
|
||||
|
||||
constructor(count: Int = 0) : this(count, ConcurrentLinkedQueue<E>())
|
||||
|
||||
private val latch: CountDownLatch = CountDownLatch(count)
|
||||
|
||||
fun add(element: E) {
|
||||
queue.add(element)
|
||||
latch.countDown()
|
||||
}
|
||||
|
||||
fun await() = latch.await()
|
||||
|
||||
fun await(timeout: Long, unit: TimeUnit) = latch.await(timeout, unit)
|
||||
}
|
@ -31,13 +31,10 @@ import org.junit.Assert.assertThat
|
||||
import org.junit.Before
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.runner.RunWith
|
||||
import org.junit.runners.Parameterized
|
||||
import java.net.URL
|
||||
import java.time.Instant
|
||||
|
||||
@RunWith(Parameterized::class)
|
||||
class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneParams) {
|
||||
class NetworkMapTest {
|
||||
@Rule
|
||||
@JvmField
|
||||
val testSerialization = SerializationEnvironmentRule(true)
|
||||
@ -48,40 +45,18 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
private lateinit var networkMapServer: NetworkMapServer
|
||||
private lateinit var compatibilityZone: CompatibilityZoneParams
|
||||
|
||||
companion object {
|
||||
@JvmStatic
|
||||
@Parameterized.Parameters(name = "{0}")
|
||||
fun runParams() = listOf(
|
||||
{
|
||||
addr: URL,
|
||||
nms: NetworkMapServer -> SharedCompatibilityZoneParams(
|
||||
addr,
|
||||
pnm = null,
|
||||
publishNotaries = {
|
||||
nms.networkParameters = testNetworkParameters(it, modifiedTime = Instant.ofEpochMilli(random63BitValue()), epoch = 2)
|
||||
}
|
||||
)
|
||||
},
|
||||
{
|
||||
addr: URL,
|
||||
nms: NetworkMapServer -> SplitCompatibilityZoneParams (
|
||||
doormanURL = URL("http://I/Don't/Exist"),
|
||||
networkMapURL = addr,
|
||||
pnm = null,
|
||||
publishNotaries = {
|
||||
nms.networkParameters = testNetworkParameters(it, modifiedTime = Instant.ofEpochMilli(random63BitValue()), epoch = 2)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
)
|
||||
}
|
||||
|
||||
@Before
|
||||
fun start() {
|
||||
networkMapServer = NetworkMapServer(cacheTimeout, portAllocation.nextHostAndPort())
|
||||
val address = networkMapServer.start()
|
||||
compatibilityZone = initFunc(URL("http://$address"), networkMapServer)
|
||||
compatibilityZone = SplitCompatibilityZoneParams(
|
||||
doormanURL = URL("https://example.org/does/not/exist"),
|
||||
networkMapURL = URL("http://$address"),
|
||||
pnm = null,
|
||||
publishNotaries = {
|
||||
networkMapServer.networkParameters = testNetworkParameters(it, modifiedTime = Instant.ofEpochMilli(random63BitValue()), epoch = 2)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
@After
|
||||
@ -89,8 +64,8 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
networkMapServer.close()
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `parameters update test`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `parameters update test`() {
|
||||
internalDriver(
|
||||
portAllocation = portAllocation,
|
||||
compatibilityZone = compatibilityZone,
|
||||
@ -211,27 +186,6 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `Can not hotload parameters if notary and a non-hotloadable parameter changes and the node will shut down`() {
|
||||
internalDriver(
|
||||
portAllocation = portAllocation,
|
||||
compatibilityZone = compatibilityZone,
|
||||
notarySpecs = emptyList(),
|
||||
allowHibernateToManageAppSchema = false
|
||||
) {
|
||||
|
||||
val oldParams = networkMapServer.networkParameters
|
||||
val notary: Party = TestIdentity.fresh("test notary").party
|
||||
val paramsWithUpdatedMaxMessageSizeAndNotary = oldParams.copy(
|
||||
epoch = 3,
|
||||
modifiedTime = Instant.ofEpochMilli(random63BitValue()),
|
||||
maxMessageSize = oldParams.maxMessageSize + 1).addNotary(notary)
|
||||
startNodeAndRunFlagDay(paramsWithUpdatedMaxMessageSizeAndNotary).use { alice ->
|
||||
eventually { assertThatThrownBy { alice.rpc.networkParameters }.hasMessageContaining("Connection failure detected") }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun DriverDSLImpl.startNodeAndRunFlagDay(newParams: NetworkParameters): NodeHandleInternal {
|
||||
|
||||
val alice = startNode(providedName = ALICE_NAME, devMode = false).getOrThrow() as NodeHandleInternal
|
||||
@ -247,8 +201,8 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
return alice
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `nodes process additions and removals from the network map correctly (and also download the network parameters)`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `nodes process additions and removals from the network map correctly (and also download the network parameters)`() {
|
||||
internalDriver(
|
||||
portAllocation = portAllocation,
|
||||
compatibilityZone = compatibilityZone,
|
||||
@ -277,8 +231,8 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `test node heartbeat`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `test node heartbeat`() {
|
||||
internalDriver(
|
||||
portAllocation = portAllocation,
|
||||
compatibilityZone = compatibilityZone,
|
||||
@ -322,7 +276,8 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
|
||||
val nodeInfosDir = baseDirectory / NODE_INFO_DIRECTORY
|
||||
if (nodeInfosDir.exists()) {
|
||||
assertThat(nodeInfosDir.list().size, `is`(1))
|
||||
assertThat(nodeInfosDir.list().single().readObject<SignedNodeInfo>().verified().legalIdentities.first(), `is`( this.nodeInfo.legalIdentities.first()))
|
||||
assertThat(nodeInfosDir.list().single().readObject<SignedNodeInfo>()
|
||||
.verified().legalIdentities.first(), `is`(this.nodeInfo.legalIdentities.first()))
|
||||
}
|
||||
assertThat(rpc.networkMapSnapshot()).containsOnly(*nodes)
|
||||
}
|
||||
|
@ -174,6 +174,13 @@ open class NodeCmdLineOptions : SharedNodeCmdLineOptions() {
|
||||
)
|
||||
var networkRootTrustStorePassword: String? = null
|
||||
|
||||
@Option(
|
||||
names = ["-s", "--skip-schema-creation"],
|
||||
description = ["DEPRECATED. Prevent database migration scripts to run during initial node registration."],
|
||||
hidden = true
|
||||
)
|
||||
var skipSchemaCreation: Boolean = false
|
||||
|
||||
override fun parseConfiguration(configuration: Config): Valid<NodeConfiguration> {
|
||||
return super.parseConfiguration(configuration).doIfValid { config ->
|
||||
if (isRegistration) {
|
||||
|
@ -28,8 +28,8 @@ import net.corda.node.internal.subcommands.ValidateConfigurationCli.Companion.lo
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.shouldStartLocalShell
|
||||
import net.corda.node.services.config.shouldStartSSHDaemon
|
||||
import net.corda.node.utilities.JVMAgentUtil.getJvmAgentProperties
|
||||
import net.corda.node.utilities.registration.NodeRegistrationException
|
||||
import net.corda.nodeapi.internal.JVMAgentUtilities
|
||||
import net.corda.nodeapi.internal.addShutdownHook
|
||||
import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException
|
||||
import net.corda.nodeapi.internal.persistence.DatabaseIncompatibleException
|
||||
@ -120,6 +120,7 @@ open class NodeStartupCli : CordaCliWrapper("corda", "Runs a Corda Node") {
|
||||
requireNotNull(cmdLineOptions.networkRootTrustStorePassword) { "Network root trust store password must be provided in registration mode using --network-root-truststore-password." }
|
||||
initialRegistrationCli.networkRootTrustStorePassword = cmdLineOptions.networkRootTrustStorePassword!!
|
||||
initialRegistrationCli.networkRootTrustStorePathParameter = cmdLineOptions.networkRootTrustStorePathParameter
|
||||
initialRegistrationCli.skipSchemaCreation = cmdLineOptions.skipSchemaCreation
|
||||
initialRegistrationCli.cmdLineOptions.copyFrom(cmdLineOptions)
|
||||
initialRegistrationCli.runProgram()
|
||||
}
|
||||
@ -153,6 +154,8 @@ open class NodeStartup : NodeStartupLogging {
|
||||
const val LOGS_DIRECTORY_NAME = "logs"
|
||||
const val LOGS_CAN_BE_FOUND_IN_STRING = "Logs can be found in"
|
||||
const val ERROR_CODE_RESOURCE_LOCATION = "error-codes"
|
||||
|
||||
|
||||
}
|
||||
|
||||
lateinit var cmdLineOptions: SharedNodeCmdLineOptions
|
||||
@ -269,10 +272,10 @@ open class NodeStartup : NodeStartupLogging {
|
||||
logger.info("VM ${info.vmName} ${info.vmVendor} ${info.vmVersion}")
|
||||
logger.info("Machine: ${lookupMachineNameAndMaybeWarn()}")
|
||||
logger.info("Working Directory: ${cmdLineOptions.baseDirectory}")
|
||||
val agentProperties = getJvmAgentProperties(logger)
|
||||
if (agentProperties.containsKey("sun.jdwp.listenerAddress")) {
|
||||
logger.info("Debug port: ${agentProperties.getProperty("sun.jdwp.listenerAddress")}")
|
||||
JVMAgentUtilities.parseDebugPort(info.inputArguments) ?.let {
|
||||
logger.info("Debug port: $it")
|
||||
}
|
||||
|
||||
var nodeStartedMessage = "Starting as node on ${conf.p2pAddress}"
|
||||
if (conf.extraNetworkMapKeys.isNotEmpty()) {
|
||||
nodeStartedMessage = "$nodeStartedMessage with additional Network Map keys ${conf.extraNetworkMapKeys.joinToString(prefix = "[", postfix = "]", separator = ", ")}"
|
||||
|
@ -25,9 +25,12 @@ class InitialRegistrationCli(val startup: NodeStartup): CliWrapperBase("initial-
|
||||
@Option(names = ["-p", "--network-root-truststore-password"], description = ["Network root trust store password obtained from network operator."], required = true)
|
||||
var networkRootTrustStorePassword: String = ""
|
||||
|
||||
@Option(names = ["-s", "--skip-schema-creation"], description = ["Prevent database migration scripts to run during initial node registration "], required = false)
|
||||
var skipSchemaCreation: Boolean = false
|
||||
|
||||
override fun runProgram() : Int {
|
||||
val networkRootTrustStorePath: Path = networkRootTrustStorePathParameter ?: cmdLineOptions.baseDirectory / "certificates" / "network-root-truststore.jks"
|
||||
return startup.initialiseAndRun(cmdLineOptions, InitialRegistration(cmdLineOptions.baseDirectory, networkRootTrustStorePath, networkRootTrustStorePassword, startup))
|
||||
return startup.initialiseAndRun(cmdLineOptions, InitialRegistration(cmdLineOptions.baseDirectory, networkRootTrustStorePath, networkRootTrustStorePassword, skipSchemaCreation, startup))
|
||||
}
|
||||
|
||||
override fun initLogging(): Boolean = this.initLogging(cmdLineOptions.baseDirectory)
|
||||
@ -36,7 +39,8 @@ class InitialRegistrationCli(val startup: NodeStartup): CliWrapperBase("initial-
|
||||
val cmdLineOptions = InitialRegistrationCmdLineOptions()
|
||||
}
|
||||
|
||||
class InitialRegistration(val baseDirectory: Path, private val networkRootTrustStorePath: Path, networkRootTrustStorePassword: String, private val startup: NodeStartup) : RunAfterNodeInitialisation, NodeStartupLogging {
|
||||
class InitialRegistration(val baseDirectory: Path, private val networkRootTrustStorePath: Path, networkRootTrustStorePassword: String,
|
||||
private val skipSchemaMigration: Boolean, private val startup: NodeStartup) : RunAfterNodeInitialisation, NodeStartupLogging {
|
||||
companion object {
|
||||
private const val INITIAL_REGISTRATION_MARKER = ".initialregistration"
|
||||
|
||||
@ -76,7 +80,11 @@ class InitialRegistration(val baseDirectory: Path, private val networkRootTrustS
|
||||
|
||||
// Minimal changes to make registration tool create node identity.
|
||||
// TODO: Move node identity generation logic from node to registration helper.
|
||||
startup.createNode(conf, versionInfo).generateAndSaveNodeInfo()
|
||||
val node = startup.createNode(conf, versionInfo)
|
||||
if(!skipSchemaMigration) {
|
||||
node.runDatabaseMigrationScripts(updateCoreSchemas = true, updateAppSchemas = true, updateAppSchemasWithCheckpoints = false)
|
||||
}
|
||||
node.generateAndSaveNodeInfo()
|
||||
|
||||
println("Successfully registered Corda node with compatibility zone, node identity keys and certificates are stored in '${conf.certificatesDirectory}', it is advised to backup the private keys and certificates.")
|
||||
println("Corda node will now terminate.")
|
||||
|
@ -33,7 +33,6 @@ import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.node.AppServiceHub.Companion.SERVICE_PRIORITY_NORMAL
|
||||
import net.corda.core.internal.FlowAsyncOperation
|
||||
import net.corda.core.internal.FlowIORequest
|
||||
import net.corda.core.internal.WaitForStateConsumption
|
||||
@ -43,6 +42,7 @@ import net.corda.core.internal.exists
|
||||
import net.corda.core.internal.objectOrNewInstance
|
||||
import net.corda.core.internal.outputStream
|
||||
import net.corda.core.internal.uncheckedCast
|
||||
import net.corda.core.node.AppServiceHub.Companion.SERVICE_PRIORITY_NORMAL
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
@ -54,9 +54,6 @@ import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.Try
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent
|
||||
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver
|
||||
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver.Companion.reportSuccess
|
||||
import net.corda.node.internal.NodeStartup
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.statemachine.Checkpoint
|
||||
@ -68,7 +65,9 @@ import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.services.statemachine.SessionId
|
||||
import net.corda.node.services.statemachine.SessionState
|
||||
import net.corda.node.services.statemachine.SubFlow
|
||||
import net.corda.node.utilities.JVMAgentUtil.getJvmAgentProperties
|
||||
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent
|
||||
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver
|
||||
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver.Companion.reportSuccess
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
|
||||
import net.corda.serialization.internal.withTokenContext
|
||||
@ -77,21 +76,24 @@ import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.time.ZoneOffset.UTC
|
||||
import java.time.format.DateTimeFormatter
|
||||
import java.util.UUID
|
||||
import java.util.*
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.zip.ZipEntry
|
||||
import java.util.zip.ZipOutputStream
|
||||
import kotlin.reflect.KProperty1
|
||||
import kotlin.reflect.full.companionObject
|
||||
import kotlin.reflect.full.memberProperties
|
||||
|
||||
class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, private val database: CordaPersistence,
|
||||
private val serviceHub: ServiceHub, val baseDirectory: Path) : NodeLifecycleObserver {
|
||||
private val serviceHub: ServiceHub, val baseDirectory: Path) : NodeLifecycleObserver {
|
||||
companion object {
|
||||
internal val TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss").withZone(UTC)
|
||||
private val log = contextLogger()
|
||||
private val DUMPABLE_CHECKPOINTS = setOf(
|
||||
Checkpoint.FlowStatus.RUNNABLE,
|
||||
Checkpoint.FlowStatus.HOSPITALIZED,
|
||||
Checkpoint.FlowStatus.PAUSED
|
||||
Checkpoint.FlowStatus.RUNNABLE,
|
||||
Checkpoint.FlowStatus.HOSPITALIZED,
|
||||
Checkpoint.FlowStatus.PAUSED
|
||||
)
|
||||
}
|
||||
|
||||
@ -102,12 +104,10 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
|
||||
private lateinit var checkpointSerializationContext: CheckpointSerializationContext
|
||||
private lateinit var writer: ObjectWriter
|
||||
|
||||
private val isCheckpointAgentRunning by lazy {
|
||||
checkpointAgentRunning()
|
||||
}
|
||||
private val isCheckpointAgentRunning by lazy(::checkpointAgentRunning)
|
||||
|
||||
override fun update(nodeLifecycleEvent: NodeLifecycleEvent): Try<String> {
|
||||
return when(nodeLifecycleEvent) {
|
||||
return when (nodeLifecycleEvent) {
|
||||
is NodeLifecycleEvent.AfterNodeStart<*> -> Try.on {
|
||||
checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
|
||||
CheckpointSerializeAsTokenContextImpl(
|
||||
@ -190,13 +190,20 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
|
||||
}
|
||||
}
|
||||
|
||||
private fun checkpointAgentRunning(): Boolean {
|
||||
val agentProperties = getJvmAgentProperties(log)
|
||||
val pattern = "(.+)?checkpoint-agent(-.+)?\\.jar.*".toRegex()
|
||||
return agentProperties.values.any { value ->
|
||||
value is String && value.contains(pattern)
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Note that this method dynamically uses [net.corda.tools.CheckpointAgent.running], make sure to keep it up to date with
|
||||
* the checkpoint agent source code
|
||||
*/
|
||||
private fun checkpointAgentRunning() = try {
|
||||
javaClass.classLoader.loadClass("net.corda.tools.CheckpointAgent").kotlin.companionObject
|
||||
} catch (e: ClassNotFoundException) {
|
||||
null
|
||||
}?.let { cls ->
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
cls.memberProperties.find { it.name == "running"}
|
||||
?.let {it as KProperty1<Any, Boolean>}
|
||||
?.get(cls.objectInstance!!)
|
||||
} ?: false
|
||||
|
||||
private fun Checkpoint.toJson(id: UUID, now: Instant): CheckpointJson {
|
||||
val (fiber, flowLogic) = when (flowState) {
|
||||
@ -402,6 +409,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
|
||||
private interface FlowAsyncOperationMixin {
|
||||
@get:JsonIgnore
|
||||
val serviceHub: ServiceHub
|
||||
|
||||
// [Any] used so this single mixin can serialize [FlowExternalOperation] and [FlowExternalAsyncOperation]
|
||||
@get:JsonUnwrapped
|
||||
val operation: Any
|
||||
|
@ -186,13 +186,6 @@ sealed class Event {
|
||||
override fun toString() = "Pause"
|
||||
}
|
||||
|
||||
/**
|
||||
* Terminate the specified [sessions], removing them from in-memory datastructures.
|
||||
*
|
||||
* @param sessions The sessions to terminate
|
||||
*/
|
||||
data class TerminateSessions(val sessions: Set<SessionId>) : Event()
|
||||
|
||||
/**
|
||||
* Indicates that an event was generated by an external event and that external event needs to be replayed if we retry the flow,
|
||||
* even if it has not yet been processed and placed on the pending de-duplication handlers list.
|
||||
|
@ -158,16 +158,17 @@ data class Checkpoint(
|
||||
return copy(checkpointState = checkpointState.copy(sessionsToBeClosed = checkpointState.sessionsToBeClosed + sessionIds))
|
||||
}
|
||||
|
||||
fun removeSessionsToBeClosed(sessionIds: Set<SessionId>): Checkpoint {
|
||||
return copy(checkpointState = checkpointState.copy(sessionsToBeClosed = checkpointState.sessionsToBeClosed - sessionIds))
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a copy of the Checkpoint with the specified session removed from the session map.
|
||||
* @param sessionIds the sessions to remove.
|
||||
*/
|
||||
fun removeSessions(sessionIds: Set<SessionId>): Checkpoint {
|
||||
return copy(checkpointState = checkpointState.copy(sessions = checkpointState.sessions - sessionIds))
|
||||
return copy(
|
||||
checkpointState = checkpointState.copy(
|
||||
sessions = checkpointState.sessions - sessionIds,
|
||||
sessionsToBeClosed = checkpointState.sessionsToBeClosed - sessionIds
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -528,10 +528,14 @@ class StartedFlowTransition(
|
||||
}
|
||||
|
||||
private fun scheduleTerminateSessionsIfRequired(transition: TransitionResult): TransitionResult {
|
||||
// If there are sessions to be closed, close them on a following transition
|
||||
// If there are sessions to be closed, close them on the currently executing transition
|
||||
val sessionsToBeTerminated = findSessionsToBeTerminated(transition.newState)
|
||||
return if (sessionsToBeTerminated.isNotEmpty()) {
|
||||
transition.copy(actions = transition.actions + Action.ScheduleEvent(Event.TerminateSessions(sessionsToBeTerminated.keys)))
|
||||
val checkpointWithSessionsRemoved = transition.newState.checkpoint.removeSessions(sessionsToBeTerminated.keys)
|
||||
transition.copy(
|
||||
newState = transition.newState.copy(checkpoint = checkpointWithSessionsRemoved),
|
||||
actions = transition.actions + Action.RemoveSessionBindings(sessionsToBeTerminated.keys)
|
||||
)
|
||||
} else {
|
||||
transition
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.internal.FlowIORequest
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.utilities.Try
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.node.services.statemachine.Action
|
||||
import net.corda.node.services.statemachine.Checkpoint
|
||||
@ -37,6 +38,10 @@ class TopLevelTransition(
|
||||
val event: Event
|
||||
) : Transition {
|
||||
|
||||
private companion object {
|
||||
val log = contextLogger()
|
||||
}
|
||||
|
||||
@Suppress("ComplexMethod", "TooGenericExceptionCaught")
|
||||
override fun transition(): TransitionResult {
|
||||
return try {
|
||||
@ -63,11 +68,11 @@ class TopLevelTransition(
|
||||
is Event.OvernightObservation -> overnightObservationTransition()
|
||||
is Event.WakeUpFromSleep -> wakeUpFromSleepTransition()
|
||||
is Event.Pause -> pausedFlowTransition()
|
||||
is Event.TerminateSessions -> terminateSessionsTransition(event)
|
||||
}
|
||||
} catch (t: Throwable) {
|
||||
// All errors coming from the transition should be sent back to the flow
|
||||
// Letting the flow re-enter standard error handling
|
||||
log.error("Error occurred while creating transition for event: $event", t)
|
||||
builder { resumeFlowLogic(t) }
|
||||
}
|
||||
}
|
||||
@ -401,16 +406,4 @@ class TopLevelTransition(
|
||||
FlowContinuation.Abort
|
||||
}
|
||||
}
|
||||
|
||||
private fun terminateSessionsTransition(event: Event.TerminateSessions): TransitionResult {
|
||||
return builder {
|
||||
val sessions = event.sessions
|
||||
val newCheckpoint = currentState.checkpoint
|
||||
.removeSessions(sessions)
|
||||
.removeSessionsToBeClosed(sessions)
|
||||
currentState = currentState.copy(checkpoint = newCheckpoint)
|
||||
actions.add(Action.RemoveSessionBindings(sessions))
|
||||
FlowContinuation.ProcessEvents
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -35,7 +35,6 @@ import java.util.*
|
||||
import javax.persistence.Tuple
|
||||
import javax.persistence.criteria.*
|
||||
|
||||
|
||||
abstract class AbstractQueryCriteriaParser<Q : GenericQueryCriteria<Q,P>, in P: BaseQueryCriteriaParser<Q, P, S>, in S: BaseSort> : BaseQueryCriteriaParser<Q, P, S> {
|
||||
|
||||
abstract val criteriaBuilder: CriteriaBuilder
|
||||
@ -277,6 +276,7 @@ class HibernateQueryCriteriaParser(val contractStateType: Class<out ContractStat
|
||||
val vaultStates: Root<VaultSchemaV1.VaultStates>) : AbstractQueryCriteriaParser<QueryCriteria, IQueryCriteriaParser, Sort>(), IQueryCriteriaParser {
|
||||
private companion object {
|
||||
private val log = contextLogger()
|
||||
private val disableCorda3879 = System.getProperty("net.corda.vault.query.disable.corda3879")?.toBoolean() ?: false
|
||||
}
|
||||
|
||||
// incrementally build list of join predicates
|
||||
@ -550,7 +550,6 @@ class HibernateQueryCriteriaParser(val contractStateType: Class<out ContractStat
|
||||
|
||||
// ensure we re-use any existing instance of the same root entity
|
||||
val vaultLinearStatesRoot = getVaultLinearStatesRoot()
|
||||
|
||||
val joinPredicate = criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"),
|
||||
vaultLinearStatesRoot.get<PersistentStateRef>("stateRef"))
|
||||
predicateSet.add(joinPredicate)
|
||||
@ -636,6 +635,7 @@ class HibernateQueryCriteriaParser(val contractStateType: Class<out ContractStat
|
||||
return predicateSet
|
||||
}
|
||||
|
||||
@Suppress("SpreadOperator")
|
||||
override fun parse(criteria: QueryCriteria, sorting: Sort?): Collection<Predicate> {
|
||||
val predicateSet = criteria.visit(this)
|
||||
|
||||
@ -650,12 +650,37 @@ class HibernateQueryCriteriaParser(val contractStateType: Class<out ContractStat
|
||||
else
|
||||
aggregateExpressions
|
||||
criteriaQuery.multiselect(selections)
|
||||
val combinedPredicates = commonPredicates.values.plus(predicateSet).plus(constraintPredicates).plus(joinPredicates)
|
||||
criteriaQuery.where(*combinedPredicates.toTypedArray())
|
||||
val combinedPredicates = commonPredicates.values.plus(predicateSet)
|
||||
.plus(constraintPredicates)
|
||||
.plus(joinPredicates)
|
||||
|
||||
val forceJoinPredicates = joinStateRefPredicate()
|
||||
|
||||
if(forceJoinPredicates.isEmpty() || disableCorda3879) {
|
||||
criteriaQuery.where(*combinedPredicates.toTypedArray())
|
||||
} else {
|
||||
criteriaQuery.where(*combinedPredicates.toTypedArray(), criteriaBuilder.or(*forceJoinPredicates.toTypedArray()))
|
||||
}
|
||||
|
||||
return predicateSet
|
||||
}
|
||||
|
||||
private fun joinStateRefPredicate(): Set<Predicate> {
|
||||
val returnSet = mutableSetOf<Predicate>()
|
||||
|
||||
rootEntities.values.forEach {
|
||||
if (it != vaultStates) {
|
||||
if(IndirectStatePersistable::class.java.isAssignableFrom(it.javaType)) {
|
||||
returnSet.add(criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), it.get<IndirectStatePersistable<*>>("compositeKey").get<PersistentStateRef>("stateRef")))
|
||||
} else {
|
||||
returnSet.add(criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), it.get<PersistentStateRef>("stateRef")))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return returnSet
|
||||
}
|
||||
|
||||
override fun parseCriteria(criteria: CommonQueryCriteria): Collection<Predicate> {
|
||||
log.trace { "Parsing CommonQueryCriteria: $criteria" }
|
||||
|
||||
@ -852,8 +877,6 @@ class HibernateQueryCriteriaParser(val contractStateType: Class<out ContractStat
|
||||
// scenario where sorting on attributes not parsed as criteria
|
||||
val entityRoot = criteriaQuery.from(entityStateClass)
|
||||
rootEntities[entityStateClass] = entityRoot
|
||||
val joinPredicate = criteriaBuilder.equal(vaultStates.get<PersistentStateRef>("stateRef"), entityRoot.get<PersistentStateRef>("stateRef"))
|
||||
joinPredicates.add(joinPredicate)
|
||||
entityRoot
|
||||
}
|
||||
when (direction) {
|
||||
@ -872,7 +895,6 @@ class HibernateQueryCriteriaParser(val contractStateType: Class<out ContractStat
|
||||
}
|
||||
if (orderCriteria.isNotEmpty()) {
|
||||
criteriaQuery.orderBy(orderCriteria)
|
||||
criteriaQuery.where(*joinPredicates.toTypedArray())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,25 +0,0 @@
|
||||
package net.corda.node.utilities
|
||||
|
||||
import com.sun.tools.attach.VirtualMachine
|
||||
import org.slf4j.Logger
|
||||
import java.lang.management.ManagementFactory
|
||||
import java.util.*
|
||||
|
||||
object JVMAgentUtil {
|
||||
/**
|
||||
* Utility to attach to own VM at run-time and obtain agent details.
|
||||
* In Java 9 this requires setting the following run-time jvm flag: -Djdk.attach.allowAttachSelf=true
|
||||
* This mechanism supersedes the usage of VMSupport which is not available from Java 9 onwards.
|
||||
*/
|
||||
fun getJvmAgentProperties(log: Logger): Properties {
|
||||
val jvmPid = ManagementFactory.getRuntimeMXBean().name.substringBefore('@')
|
||||
return try {
|
||||
val vm = VirtualMachine.attach(jvmPid)
|
||||
return vm.agentProperties
|
||||
} catch (e: Throwable) {
|
||||
log.warn("Unable to determine whether agent is running: ${e.message}.\n" +
|
||||
"You may need to pass in -Djdk.attach.allowAttachSelf=true if running on a Java 9 or later VM")
|
||||
Properties()
|
||||
}
|
||||
}
|
||||
}
|
@ -1,30 +1,22 @@
|
||||
package net.corda.node.services.config
|
||||
|
||||
import com.natpryce.hamkrest.assertion.assertThat
|
||||
import com.natpryce.hamkrest.containsSubstring
|
||||
import com.nhaarman.mockito_kotlin.any
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.mock
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import com.nhaarman.mockito_kotlin.spy
|
||||
import com.nhaarman.mockito_kotlin.verify
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.internal.delete
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.internal.Node
|
||||
import org.apache.logging.log4j.LogManager
|
||||
import org.apache.logging.log4j.core.Appender
|
||||
import org.apache.logging.log4j.core.Logger
|
||||
import org.apache.logging.log4j.core.LogEvent
|
||||
import org.junit.After
|
||||
import org.junit.Assert
|
||||
import org.junit.Before
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
import org.mockito.ArgumentMatchers.contains
|
||||
import org.slf4j.Logger
|
||||
import java.lang.reflect.Field
|
||||
import java.lang.reflect.Modifier
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.time.Duration
|
||||
import kotlin.test.assertFalse
|
||||
|
||||
class ConfigHelperTests {
|
||||
@ -74,34 +66,25 @@ class ConfigHelperTests {
|
||||
"corda.sshd.port" to sshPort.toString())
|
||||
}
|
||||
|
||||
@Ignore("CORDA-3981: Test is not stable")
|
||||
@Test(timeout = 300_000)
|
||||
fun `bad keys are ignored and warned for`() {
|
||||
val appender = mock<Appender>()
|
||||
val logMessage = openFuture<String>()
|
||||
whenever(appender.name).doReturn("mock")
|
||||
whenever(appender.isStarted).doReturn(true)
|
||||
whenever(appender.append(any())).thenAnswer {
|
||||
val event: LogEvent = it.getArgument(0)
|
||||
logMessage.set(event.message.format)
|
||||
null
|
||||
}
|
||||
val logger = LogManager.getLogger(Node::class.java.canonicalName) as Logger
|
||||
logger.addAppender(appender)
|
||||
val loggerField = Node::class.java.getDeclaredField("staticLog")
|
||||
loggerField.isAccessible = true
|
||||
val modifiersField = Field::class.java.getDeclaredField("modifiers")
|
||||
modifiersField.isAccessible = true
|
||||
modifiersField.setInt(loggerField, loggerField.modifiers and Modifier.FINAL.inv())
|
||||
val originalLogger = loggerField.get(null) as Logger
|
||||
val spyLogger = spy(originalLogger)
|
||||
loggerField.set(null, spyLogger)
|
||||
|
||||
val baseDirectory = mock<Path>()
|
||||
val configFile = createTempFile()
|
||||
configFile.deleteOnExit()
|
||||
System.setProperty("corda_bad_key", "2077")
|
||||
val config = loadConfig("corda_bad_key" to "2077")
|
||||
|
||||
verify(spyLogger).warn(contains("(property or environment variable) cannot be mapped to an existing Corda"))
|
||||
assertFalse(config?.hasPath("corda_bad_key") ?: true)
|
||||
|
||||
val config = ConfigHelper.loadConfig(baseDirectory, configFile.toPath())
|
||||
val warning = logMessage.getOrThrow(Duration.ofMinutes(3))
|
||||
assertThat(warning, containsSubstring("(property or environment variable) cannot be mapped to an existing Corda"))
|
||||
assertFalse(config.hasPath("corda_bad_key"))
|
||||
|
||||
System.clearProperty("corda_bad_key")
|
||||
loggerField.set(null, originalLogger)
|
||||
}
|
||||
|
||||
/**
|
||||
* Load the node configuration with the given environment variable
|
||||
* overrides.
|
||||
|
@ -0,0 +1,157 @@
|
||||
package net.corda.node.services.vault
|
||||
|
||||
import net.corda.core.contracts.BelongsToContract
|
||||
import net.corda.core.contracts.CommandData
|
||||
import net.corda.core.contracts.Contract
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.identity.AbstractParty
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.queryBy
|
||||
import net.corda.core.node.services.vault.DEFAULT_PAGE_SIZE
|
||||
import net.corda.core.node.services.vault.QueryCriteria
|
||||
import net.corda.core.node.services.vault.Sort
|
||||
import net.corda.core.node.services.vault.SortAttribute
|
||||
import net.corda.core.node.services.vault.builder
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.schemas.PersistentState
|
||||
import net.corda.core.schemas.QueryableState
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.node.MockNetwork
|
||||
import net.corda.testing.node.MockNetworkParameters
|
||||
import net.corda.testing.node.internal.cordappsForPackages
|
||||
import org.junit.BeforeClass
|
||||
import org.junit.Test
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Index
|
||||
import javax.persistence.Table
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class VaultQueryJoinTest {
|
||||
companion object {
|
||||
private val mockNetwork = MockNetwork(
|
||||
MockNetworkParameters(
|
||||
cordappsForAllNodes = cordappsForPackages(
|
||||
listOf(
|
||||
"net.corda.node.services.vault"
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
private val aliceNode = mockNetwork.createPartyNode(ALICE_NAME)
|
||||
private val notaryNode = mockNetwork.defaultNotaryNode
|
||||
private val serviceHubHandle = aliceNode.services
|
||||
private val createdStateRefs = mutableListOf<StateRef>()
|
||||
private const val numObjectsInLedger = DEFAULT_PAGE_SIZE + 1
|
||||
|
||||
@BeforeClass
|
||||
@JvmStatic
|
||||
fun setup() {
|
||||
repeat(numObjectsInLedger) { index ->
|
||||
createdStateRefs.add(addSimpleObjectToLedger(DummyData(index)))
|
||||
}
|
||||
|
||||
System.setProperty("net.corda.vault.query.disable.corda3879", "false");
|
||||
}
|
||||
|
||||
private fun addSimpleObjectToLedger(dummyObject: DummyData): StateRef {
|
||||
val tx = TransactionBuilder(notaryNode.info.legalIdentities.first())
|
||||
tx.addOutputState(
|
||||
DummyState(dummyObject, listOf(aliceNode.info.identityFromX500Name(ALICE_NAME)))
|
||||
)
|
||||
tx.addCommand(DummyContract.Commands.AddDummy(), aliceNode.info.legalIdentitiesAndCerts.first().owningKey)
|
||||
tx.verify(serviceHubHandle)
|
||||
val stx = serviceHubHandle.signInitialTransaction(tx)
|
||||
serviceHubHandle.recordTransactions(listOf(stx))
|
||||
return StateRef(stx.id, 0)
|
||||
}
|
||||
}
|
||||
|
||||
private val queryToCheckId = builder {
|
||||
val conditionToCheckId =
|
||||
DummySchema.DummyState::id
|
||||
.equal(0)
|
||||
QueryCriteria.VaultCustomQueryCriteria(conditionToCheckId, Vault.StateStatus.UNCONSUMED)
|
||||
}
|
||||
|
||||
private val queryToCheckStateRef =
|
||||
QueryCriteria.VaultQueryCriteria(Vault.StateStatus.UNCONSUMED, stateRefs = listOf(createdStateRefs[numObjectsInLedger-1]))
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `filter query with OR operator`() {
|
||||
val results = serviceHubHandle.vaultService.queryBy<DummyState>(
|
||||
queryToCheckId.or(queryToCheckStateRef)
|
||||
)
|
||||
assertEquals(2, results.states.size)
|
||||
assertEquals(2, results.statesMetadata.size)
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `filter query with sorting`() {
|
||||
val sorting = Sort(listOf(Sort.SortColumn(SortAttribute.Custom(DummySchema.DummyState::class.java, "stateRef"), Sort.Direction.DESC)))
|
||||
|
||||
val results = serviceHubHandle.vaultService.queryBy<DummyState>(
|
||||
queryToCheckStateRef, sorting = sorting
|
||||
)
|
||||
|
||||
assertEquals(1, results.states.size)
|
||||
assertEquals(1, results.statesMetadata.size)
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `filter query with OR operator and sorting`() {
|
||||
val sorting = Sort(listOf(Sort.SortColumn(SortAttribute.Custom(DummySchema.DummyState::class.java, "stateRef"), Sort.Direction.DESC)))
|
||||
|
||||
val results = serviceHubHandle.vaultService.queryBy<DummyState>(
|
||||
queryToCheckId.or(queryToCheckStateRef), sorting = sorting
|
||||
)
|
||||
|
||||
assertEquals(2, results.states.size)
|
||||
assertEquals(2, results.statesMetadata.size)
|
||||
}
|
||||
}
|
||||
|
||||
object DummyStatesV
|
||||
|
||||
@Suppress("MagicNumber") // SQL column length
|
||||
@CordaSerializable
|
||||
object DummySchema : MappedSchema(schemaFamily = DummyStatesV.javaClass, version = 1, mappedTypes = listOf(DummyState::class.java)){
|
||||
|
||||
@Entity
|
||||
@Table(name = "dummy_states", indexes = [Index(name = "dummy_id_index", columnList = "id")])
|
||||
class DummyState (
|
||||
@Column(name = "id", length = 4, nullable = false)
|
||||
var id: Int
|
||||
) : PersistentState()
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
data class DummyData(
|
||||
val id: Int
|
||||
)
|
||||
|
||||
@BelongsToContract(DummyContract::class)
|
||||
data class DummyState(val dummyData: DummyData, override val participants: List<AbstractParty>) :
|
||||
ContractState, QueryableState {
|
||||
override fun supportedSchemas(): Iterable<MappedSchema> = listOf(DummySchema)
|
||||
|
||||
|
||||
override fun generateMappedObject(schema: MappedSchema) =
|
||||
when (schema) {
|
||||
is DummySchema -> DummySchema.DummyState(
|
||||
dummyData.id
|
||||
)
|
||||
else -> throw IllegalArgumentException("Unsupported Schema")
|
||||
}
|
||||
}
|
||||
|
||||
class DummyContract : Contract {
|
||||
override fun verify(tx: LedgerTransaction) { }
|
||||
interface Commands : CommandData {
|
||||
class AddDummy : Commands
|
||||
}
|
||||
}
|
@ -31,23 +31,22 @@ apply plugin: 'com.jfrog.artifactory'
|
||||
description 'A javaagent to allow hooking into Kryo checkpoints'
|
||||
|
||||
dependencies {
|
||||
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
|
||||
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
|
||||
compile "javassist:javassist:$javaassist_version"
|
||||
compile "com.esotericsoftware:kryo:$kryo_version"
|
||||
compile "co.paralleluniverse:quasar-core:$quasar_version"
|
||||
compileOnly "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
|
||||
compileOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
|
||||
compileOnly "javassist:javassist:$javaassist_version"
|
||||
compileOnly "com.esotericsoftware:kryo:$kryo_version"
|
||||
compileOnly "co.paralleluniverse:quasar-core:$quasar_version"
|
||||
|
||||
compile (project(':core')) {
|
||||
compileOnly (project(':core')) {
|
||||
transitive = false
|
||||
}
|
||||
|
||||
// Unit testing helpers.
|
||||
compile "org.junit.jupiter:junit-jupiter-api:${junit_jupiter_version}"
|
||||
compile "junit:junit:$junit_version"
|
||||
testCompile "org.junit.jupiter:junit-jupiter-api:${junit_jupiter_version}"
|
||||
testCompile "junit:junit:$junit_version"
|
||||
|
||||
// SLF4J: commons-logging bindings for a SLF4J back end
|
||||
compile "org.slf4j:jcl-over-slf4j:$slf4j_version"
|
||||
compile "org.slf4j:slf4j-api:$slf4j_version"
|
||||
compileOnly "org.slf4j:slf4j-api:$slf4j_version"
|
||||
}
|
||||
|
||||
jar {
|
||||
|
@ -51,8 +51,14 @@ class CheckpointAgent {
|
||||
LoggerFactory.getLogger("CheckpointAgent")
|
||||
}
|
||||
|
||||
val running by lazy {
|
||||
premainExecuted
|
||||
}
|
||||
private var premainExecuted = false
|
||||
|
||||
@JvmStatic
|
||||
fun premain(argumentsString: String?, instrumentation: Instrumentation) {
|
||||
premainExecuted = true
|
||||
parseArguments(argumentsString)
|
||||
instrumentation.addTransformer(CheckpointHook)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user