Merge branch 'release/os/4.4' into chrisr3-44-merge

This commit is contained in:
Chris Rankin 2020-03-04 16:27:44 +00:00
commit ffa2caed32
43 changed files with 1808 additions and 624 deletions

View File

@ -23,34 +23,51 @@ pipeline {
}
stages {
stage('Generate Build Image') {
steps {
withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) {
sh "./gradlew " +
"-Dkubenetize=true " +
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " +
"-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage --stacktrace"
stage('Corda Pull Request - Generate Build Image') {
steps {
withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) {
sh "./gradlew " +
"-Dkubenetize=true " +
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " +
"-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage --stacktrace"
}
sh "kubectl auth can-i get pods"
}
sh "kubectl auth can-i get pods"
}
}
stage('Regression Test') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " +
"-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
" parallelRegressionTest --stacktrace"
stage('Testing phase') {
parallel {
stage('Regression Test') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " +
"-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
" parallelRegressionTest --stacktrace"
}
}
stage('Slow Integration Test') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " +
"-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
" allParallelSlowIntegrationTest --stacktrace"
}
}
}
}
}
}
post {

View File

@ -1,4 +1,5 @@
integration: { allParallelIntegrationTest }
pr-merge: { parallelRegressionTest }
smoke: { allParallelSmokeTest }
slow: { allParallelSlowIntegrationTest }
unit: { allParallelUnitTest }

View File

@ -33,17 +33,34 @@ pipeline {
}
}
stage('Regression Test') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " +
"-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
" parallelRegressionTest --stacktrace"
stage('Testing phase') {
parallel {
stage('Regression Test') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " +
"-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
" parallelRegressionTest --stacktrace"
}
}
stage('Slow Integration Test') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " +
"-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
" allParallelSlowIntegrationTest --stacktrace"
}
}
}
}
}

View File

@ -658,7 +658,7 @@ task allParallelUnitAndIntegrationTest(type: ParallelTestGroup) {
nodeTaints "big"
}
task parallelRegressionTest(type: ParallelTestGroup) {
testGroups "test", "integrationTest", "slowIntegrationTest", "smokeTest"
testGroups "test", "integrationTest", "smokeTest"
numberOfShards 15
streamOutput false
coresPerFork 2
@ -667,7 +667,16 @@ task parallelRegressionTest(type: ParallelTestGroup) {
nodeTaints "big"
}
task allParallelSmokeTest(type: ParallelTestGroup) {
testGroups "slowIntegrationTest", "smokeTest"
testGroups "smokeTest"
numberOfShards 4
streamOutput false
coresPerFork 6
memoryInGbPerFork 10
distribute DistributeTestsBy.CLASS
nodeTaints "big"
}
task allParallelSlowIntegrationTest(type: ParallelTestGroup) {
testGroups "slowIntegrationTest"
numberOfShards 4
streamOutput false
coresPerFork 6

View File

@ -13,7 +13,6 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.cordapp.CordappResolver
import net.corda.core.internal.warnOnce
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.CordaSerializable
@ -95,7 +94,7 @@ private constructor(private val otherSideSession: FlowSession?,
override fun call(): LinkedHashMap<Party, AnonymousParty> {
val session = if (otherParty != null && otherParty != otherSideSession?.counterparty) {
logger.warnOnce("The current usage of SwapIdentitiesFlow is unsafe. Please consider upgrading your CorDapp to use " +
"SwapIdentitiesFlow with FlowSessions. (${CordappResolver.currentCordapp?.info})")
"SwapIdentitiesFlow with FlowSessions. (${serviceHub.getAppContext().cordapp.info})")
initiateFlow(otherParty)
} else {
otherSideSession!!

View File

@ -30,7 +30,7 @@ snakeYamlVersion=1.19
caffeineVersion=2.7.0
metricsVersion=4.1.0
metricsNewRelicVersion=1.1.1
djvmVersion=1.0-RC09
djvmVersion=1.0-RC10
deterministicRtVersion=1.0-RC02
openSourceBranch=https://github.com/corda/corda/blob/release/os/4.4
openSourceSamplesBranch=https://github.com/corda/samples/blob/release-V4

View File

@ -4,7 +4,6 @@ import com.natpryce.hamkrest.and
import com.natpryce.hamkrest.assertion.assertThat
import net.corda.core.flows.FinalityFlow
import net.corda.core.identity.Party
import net.corda.core.internal.cordapp.CordappResolver
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
@ -25,7 +24,8 @@ class FinalityFlowTests : WithFinality {
private val CHARLIE = TestIdentity(CHARLIE_NAME, 90).party
}
override val mockNet = InternalMockNetwork(cordappsForAllNodes = listOf(FINANCE_CONTRACTS_CORDAPP, enclosedCordapp()))
override val mockNet = InternalMockNetwork(cordappsForAllNodes = listOf(FINANCE_CONTRACTS_CORDAPP, enclosedCordapp(),
CustomCordapp(targetPlatformVersion = 3, classes = setOf(FinalityFlow::class.java))))
private val aliceNode = makeNode(ALICE_NAME)
@ -60,11 +60,8 @@ class FinalityFlowTests : WithFinality {
fun `allow use of the old API if the CorDapp target version is 3`() {
val oldBob = createBob(cordapps = listOf(tokenOldCordapp()))
val stx = aliceNode.issuesCashTo(oldBob)
val resultFuture = CordappResolver.withTestCordapp(targetPlatformVersion = 3) {
@Suppress("DEPRECATION")
aliceNode.startFlowAndRunNetwork(FinalityFlow(stx)).resultFuture
}
resultFuture.getOrThrow()
@Suppress("DEPRECATION")
aliceNode.startFlowAndRunNetwork(FinalityFlow(stx)).resultFuture.getOrThrow()
assertThat(oldBob.services.validatedTransactions.getTransaction(stx.id)).isNotNull()
}

View File

@ -5,7 +5,6 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.identity.Party
import net.corda.core.identity.groupAbstractPartyByWellKnownParty
import net.corda.core.internal.cordapp.CordappResolver
import net.corda.core.internal.pushToLoggingContext
import net.corda.core.internal.warnOnce
import net.corda.core.node.StatesToRecord
@ -136,7 +135,7 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
override fun call(): SignedTransaction {
if (!newApi) {
logger.warnOnce("The current usage of FinalityFlow is unsafe. Please consider upgrading your CorDapp to use " +
"FinalityFlow with FlowSessions. (${CordappResolver.currentCordapp?.info})")
"FinalityFlow with FlowSessions. (${serviceHub.getAppContext().cordapp.info})")
} else {
require(sessions.none { serviceHub.myInfo.isLegalIdentity(it.counterparty) }) {
"Do not provide flow sessions for the local node. FinalityFlow will record the notarised transaction locally."

View File

@ -192,6 +192,9 @@ fun <T> Observable<T>.bufferUntilSubscribed(): Observable<T> {
@DeleteForDJVM
fun <T> Observer<T>.tee(vararg teeTo: Observer<T>): Observer<T> {
val subject = PublishSubject.create<T>()
// use unsafe subscribe, so that the teed subscribers will not get wrapped with SafeSubscribers,
// therefore a potential raw exception (non Rx) coming from a child -unsafe subscribed- observer
// will not unsubscribe all of the subscribers under the PublishSubject.
subject.unsafeSubscribe(Subscribers.from(this))
teeTo.forEach { subject.unsafeSubscribe(Subscribers.from(it)) }
return subject

View File

@ -1,133 +0,0 @@
package net.corda.core.internal.cordapp
import net.corda.core.cordapp.Cordapp
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.warnOnce
import net.corda.core.utilities.loggerFor
import java.util.concurrent.ConcurrentHashMap
/**
* Provides a way to acquire information about the calling CorDapp.
*/
object CordappResolver {
private val logger = loggerFor<CordappResolver>()
private val cordappClasses: ConcurrentHashMap<String, Set<Cordapp>> = ConcurrentHashMap()
private val insideInMemoryTest: Boolean by lazy { insideInMemoryTest() }
// TODO Use the StackWalker API once we migrate to Java 9+
private var cordappResolver: () -> Cordapp? = {
Exception().stackTrace
.mapNotNull { cordappClasses[it.className] }
// in case there are multiple classes matched, we select the first one having a single CorDapp registered against it.
.firstOrNull { it.size == 1 }
// otherwise we return null, signalling we cannot reliably determine the current CorDapp.
?.single()
}
/**
* Associates class names with CorDapps or logs a warning when a CorDapp is already registered for a given class.
* This could happen when trying to run different versions of the same CorDapp on the same node.
*
* @throws IllegalStateException when multiple CorDapps are registered for the same contract class,
* since this can lead to undefined behaviour.
*/
@Synchronized
fun register(cordapp: Cordapp) {
val contractClasses = cordapp.contractClassNames.toSet()
val existingClasses = cordappClasses.keys
val classesToRegister = cordapp.cordappClasses.toSet()
val notAlreadyRegisteredClasses = classesToRegister - existingClasses
val alreadyRegistered= HashMap(cordappClasses).apply { keys.retainAll(classesToRegister) }
notAlreadyRegisteredClasses.forEach { cordappClasses[it] = setOf(cordapp) }
for ((registeredClassName, registeredCordapps) in alreadyRegistered) {
val duplicateCordapps = registeredCordapps.filter { it.jarHash == cordapp.jarHash }.toSet()
if (duplicateCordapps.isNotEmpty()) {
logger.warnOnce("The CorDapp (name: ${cordapp.info.shortName}, file: ${cordapp.name}) " +
"is installed multiple times on the node. The following files correspond to the exact same content: " +
"${duplicateCordapps.map { it.name }}")
continue
}
// During in-memory tests, the spawned nodes share the same CordappResolver, so detected conflicts can be spurious.
if (registeredClassName in contractClasses && !insideInMemoryTest) {
throw IllegalStateException("More than one CorDapp installed on the node for contract $registeredClassName. " +
"Please remove the previous version when upgrading to a new version.")
}
cordappClasses[registeredClassName] = registeredCordapps + cordapp
}
}
private fun insideInMemoryTest(): Boolean {
return Exception().stackTrace.any {
it.className.startsWith("net.corda.testing.node.internal.InternalMockNetwork") ||
it.className.startsWith("net.corda.testing.node.internal.InProcessNode") ||
it.className.startsWith("net.corda.testing.node.MockServices")
}
}
/*
* This should only be used when making a change that would break compatibility with existing CorDapps. The change
* can then be version-gated, meaning the old behaviour is used if the calling CorDapp's target version is lower
* than the platform version that introduces the new behaviour.
* In situations where a `[CordappProvider]` is available the CorDapp context should be obtained from there.
*
* @return Information about the CorDapp from which the invoker is called, null if called outside a CorDapp or the
* calling CorDapp cannot be reliably determined.
*/
val currentCordapp: Cordapp? get() = cordappResolver()
/**
* Returns the target version of the current calling CorDapp. Defaults to platform version 1 if there isn't one,
* assuming only basic platform capabilities.
*/
val currentTargetVersion: Int get() = currentCordapp?.targetPlatformVersion ?: 1
// A list of extra CorDapps added to the current CorDapps list for testing purposes.
private var extraCordappsForTesting = listOf<Cordapp>()
/**
* Return all the CorDapps that were involved in the call stack at the point the provided exception was generated.
*
* This is provided to allow splitting the cost of generating the exception and retrieving the CorDapps involved.
*/
fun cordappsFromException(exception: Exception): List<Cordapp> {
val apps = exception.stackTrace
.mapNotNull { cordappClasses[it.className] }
.flatten()
.distinct()
return (apps + extraCordappsForTesting)
}
/**
* Temporarily apply a fake CorDapp with the given parameters. For use in testing.
*/
@Synchronized
@VisibleForTesting
fun <T> withTestCordapp(minimumPlatformVersion: Int = 1,
targetPlatformVersion: Int = PLATFORM_VERSION,
extraApps: List<CordappImpl> = listOf(),
block: () -> T): T {
val currentResolver = cordappResolver
cordappResolver = {
CordappImpl.TEST_INSTANCE.copy(minimumPlatformVersion = minimumPlatformVersion, targetPlatformVersion = targetPlatformVersion)
}
extraCordappsForTesting = listOf(cordappResolver()!!) + extraApps
try {
return block()
} finally {
cordappResolver = currentResolver
extraCordappsForTesting = listOf()
}
}
@VisibleForTesting
internal fun clear() {
cordappClasses.clear()
}
}

View File

@ -0,0 +1,13 @@
@file:JvmName("Observables")
package net.corda.core.observable
import net.corda.core.observable.internal.OnResilientSubscribe
import rx.Observable
/**
* [Observable.continueOnError] is used to return an Observable, through which we can subscribe non unsubscribing [rx.Observer]s
* to the source [Observable]. Namely, it makes the [rx.Observer]s resilient to exceptions coming out of [rx.Observer.onNext].
*
* [Observable.continueOnError] should be called before every subscribe to have the aforementioned effect.
*/
fun <T> Observable<T>.continueOnError(): Observable<T> = Observable.unsafeCreate(OnResilientSubscribe(this, true))

View File

@ -0,0 +1,120 @@
package net.corda.core.observable.internal
import net.corda.core.internal.VisibleForTesting
import rx.Observable
import rx.Observer
import rx.Subscriber
import rx.exceptions.CompositeException
import rx.exceptions.Exceptions
import rx.exceptions.OnErrorFailedException
import rx.exceptions.OnErrorNotImplementedException
import rx.internal.util.ActionSubscriber
import rx.observers.SafeSubscriber
import rx.plugins.RxJavaHooks
import rx.plugins.RxJavaPlugins
import rx.subjects.Subject
/**
* Extends [SafeSubscriber] to override [SafeSubscriber.onNext], [SafeSubscriber.onError] and [SafeSubscriber._onError].
*
* [ResilientSubscriber] will not set [SafeSubscriber.done] flag to true nor will call [SafeSubscriber.unsubscribe] upon
* error inside [Observer.onNext]. This way, the [ResilientSubscriber] will not get unsubscribed and therefore the underlying [Observer]
* will not get removed.
*
* An [Observer] that will not get removed due to errors in [onNext] events becomes useful when an unsubscribe could
* lead to a malfunctioning CorDapp, due to a single isolated error. If the [Observer] gets removed,
* it will no longer be available the next time any events are pushed from the base [Subject].
*/
@VisibleForTesting
class ResilientSubscriber<T>(actual: Subscriber<in T>) : SafeSubscriber<T>(actual) {
/**
* Duplicate of [SafeSubscriber.onNext]. However, it ignores [SafeSubscriber.done] flag.
* It only delegates to [SafeSubscriber.onError] if it wraps an [ActionSubscriber] which is
* a leaf in an Subscribers' tree structure.
*/
@Suppress("TooGenericExceptionCaught")
override fun onNext(t: T) {
try {
actual.onNext(t)
} catch (e: Throwable) {
if (actual is ActionSubscriber) {
// this Subscriber wraps an ActionSubscriber which is always a leaf Observer, then call user-defined onError
Exceptions.throwOrReport(e, this)
} else {
// this Subscriber may wrap a non leaf Observer. In case the wrapped Observer is a PublishSubject then we
// should not call onError because PublishSubjectState.onError will shut down all of the Observers under it
throw OnNextFailedException(
"Observer.onNext failed, this is a non leaf ResilientSubscriber, therefore onError will be skipped", e
)
}
}
}
/**
* Duplicate of [SafeSubscriber.onError]. However, it will not set [SafeSubscriber.done] flag to true.
*/
override fun onError(e: Throwable) {
Exceptions.throwIfFatal(e)
_onError(e)
}
/**
* Duplicate of [SafeSubscriber._onError]. However, it will not call [Subscriber.unsubscribe].
*/
@Suppress("TooGenericExceptionCaught")
override fun _onError(e: Throwable) {
@Suppress("DEPRECATION")
RxJavaPlugins.getInstance().errorHandler.handleError(e)
try {
actual.onError(e)
} catch (e: OnErrorNotImplementedException) {
throw e
} catch (e2: Throwable) {
RxJavaHooks.onError(e2)
throw OnErrorFailedException(
"Error occurred when trying to propagate error to Observer.onError", CompositeException(listOf(e, e2))
)
}
}
}
/**
* We throw [OnNextFailedException] to pass the exception back through the preceding [Subscriber] chain
* without triggering any [SafeSubscriber.onError]s. Since we are extending an [OnErrorNotImplementedException]
* the exception will be re-thrown at [Exceptions.throwOrReport].
*/
@VisibleForTesting
class OnNextFailedException(message: String, cause: Throwable) : OnErrorNotImplementedException(message, cause)
/**
* [OnResilientSubscribe] returns an [Observable] holding a reference to the source [Observable]. Upon subscribing to it,
* when reaching [call] method, if the subscriber passed in [isSafeSubscriber] it will unwrap the [Observer] from
* the [SafeSubscriber], re-wrap it with [ResilientSubscriber] and then subscribe it to the source [Observable].
*
* In case we need to subscribe with a [SafeSubscriber] to the source [Observable] via [OnResilientSubscribe], we have to:
* 1. Declare a custom SafeSubscriber extending [SafeSubscriber].
* 2. Wrap our [rx.Observer] -to be subscribed to the source [Observable]- with the custom SafeSubscriber.
* 3. Create a [OnResilientSubscribe] object with [strictMode] = false.
* 3. Call [Observable.unsafeCreate] passing in as argument the [OnResilientSubscribe].
* 4. Subscribe to the returned [Observable] passing in as argument the custom SafeSubscriber.
*/
class OnResilientSubscribe<T>(val source: Observable<T>, private val strictMode: Boolean): Observable.OnSubscribe<T> {
override fun call(subscriber: Subscriber<in T>) {
if (isSafeSubscriber(subscriber)) {
source.unsafeSubscribe(ResilientSubscriber((subscriber as SafeSubscriber).actual))
} else {
source.unsafeSubscribe(subscriber)
}
}
private fun isSafeSubscriber(subscriber: Subscriber<*>): Boolean {
return if (strictMode) {
// In strictMode mode we capture SafeSubscriber subclasses as well
SafeSubscriber::class.java.isAssignableFrom(subscriber::class.java)
} else {
subscriber::class == SafeSubscriber::class
}
}
}

View File

@ -1,92 +0,0 @@
package net.corda.core.internal.cordapp
import net.corda.core.crypto.SecureHash
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.lang.IllegalStateException
import kotlin.test.assertEquals
class CordappResolverTest {
@Before
@After
fun clearCordappInfoResolver() {
CordappResolver.clear()
}
@Test(timeout=300_000)
fun `the correct cordapp resolver is used after calling withCordappInfo`() {
val defaultTargetVersion = 222
CordappResolver.register(CordappImpl.TEST_INSTANCE.copy(
contractClassNames = listOf(javaClass.name),
minimumPlatformVersion = 3,
targetPlatformVersion = defaultTargetVersion
))
assertEquals(defaultTargetVersion, CordappResolver.currentTargetVersion)
val expectedTargetVersion = 555
CordappResolver.withTestCordapp(targetPlatformVersion = expectedTargetVersion) {
val actualTargetVersion = CordappResolver.currentTargetVersion
assertEquals(expectedTargetVersion, actualTargetVersion)
}
assertEquals(defaultTargetVersion, CordappResolver.currentTargetVersion)
}
@Test(timeout=300_000)
fun `when the same cordapp is registered for the same class multiple times, the resolver deduplicates and returns it as the current one`() {
CordappResolver.register(CordappImpl.TEST_INSTANCE.copy(
contractClassNames = listOf(javaClass.name),
minimumPlatformVersion = 3,
targetPlatformVersion = 222
))
CordappResolver.register(CordappImpl.TEST_INSTANCE.copy(
contractClassNames = listOf(javaClass.name),
minimumPlatformVersion = 2,
targetPlatformVersion = 456
))
assertThat(CordappResolver.currentCordapp).isNotNull()
}
@Test(timeout=300_000)
fun `when different cordapps are registered for the same (non-contract) class, the resolver returns null`() {
CordappResolver.register(CordappImpl.TEST_INSTANCE.copy(
contractClassNames = listOf("ContractClass1"),
minimumPlatformVersion = 3,
targetPlatformVersion = 222,
jarHash = SecureHash.randomSHA256()
))
CordappResolver.register(CordappImpl.TEST_INSTANCE.copy(
contractClassNames = listOf("ContractClass2"),
minimumPlatformVersion = 2,
targetPlatformVersion = 456,
jarHash = SecureHash.randomSHA256()
))
assertThat(CordappResolver.currentCordapp).isNull()
}
@Test(timeout=300_000)
fun `when different cordapps are registered for the same (contract) class, the resolver throws an exception`() {
val firstCordapp = CordappImpl.TEST_INSTANCE.copy(
contractClassNames = listOf(javaClass.name),
minimumPlatformVersion = 3,
targetPlatformVersion = 222,
jarHash = SecureHash.randomSHA256()
)
val secondCordapp = CordappImpl.TEST_INSTANCE.copy(
contractClassNames = listOf(javaClass.name),
minimumPlatformVersion = 2,
targetPlatformVersion = 456,
jarHash = SecureHash.randomSHA256()
)
CordappResolver.register(firstCordapp)
assertThatThrownBy { CordappResolver.register(secondCordapp) }
.isInstanceOf(IllegalStateException::class.java)
.hasMessageContaining("More than one CorDapp installed on the node for contract ${javaClass.name}. " +
"Please remove the previous version when upgrading to a new version.")
}
}

View File

@ -1,19 +1,23 @@
#!/usr/bin/env bash
GENERATE_TEST_NET=0
GENERATE_GENERIC=0
EXIT_ON_GENERATE=0
die() {
printf '%s\n' "$1" >&2
exit 1
printf '%s\n' "$1" >&2
exit 1
}
show_help(){
echo "usage: generate-config <--testnet>|<--generic>"
echo -e "\t --testnet is used to generate config and certificates for joining TestNet"
echo -e "\t --generic is used to generate config and certificates for joining an existing Corda Compatibility Zone"
show_help() {
echo "usage: generate-config <--testnet>|<--generic>"
echo -e "\t --testnet is used to generate config and certificates for joining TestNet"
echo -e "\t --generic is used to generate config and certificates for joining an existing Corda Compatibility Zone"
}
function generateTestnetConfig() {
: ${RPC_PASSWORD=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1)}
RPC_PASSWORD=${RPC_PASSWORD} \
: ${RPC_PASSWORD=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1)}
RPC_PASSWORD=${RPC_PASSWORD} \
DB_PASSWORD=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1) \
MY_PUBLIC_ADDRESS=${MY_PUBLIC_ADDRESS} \
MY_P2P_PORT=${MY_P2P_PORT} \
@ -24,101 +28,105 @@ function generateTestnetConfig() {
java -jar config-exporter.jar "TEST-NET-COMBINE" "node.conf" "/opt/corda/starting-node.conf" "${CONFIG_FOLDER}/node.conf"
}
function generateGenericCZConfig(){
if ! [[ -f ${CONFIG_FOLDER}/node.conf ]] ; then
echo 'INFO: no existing node config detected, generating config skeleton'
: ${NETWORKMAP_URL:? '$NETWORKMAP_URL, the Compatibility Zone to join must be set as environment variable'}
: ${DOORMAN_URL:? '$DOORMAN_URL, the Doorman to use when joining must be set as environment variable'}
: ${MY_LEGAL_NAME:? '$MY_LEGAL_NAME, the X500 name to use when joining must be set as environment variable'}
: ${MY_EMAIL_ADDRESS:? '$MY_EMAIL_ADDRESS, the email to use when joining must be set as an environment variable'}
: ${NETWORK_TRUST_PASSWORD=:? '$NETWORK_TRUST_PASSWORD, the password to the network store to use when joining must be set as environment variable'}
function generateGenericCZConfig() {
if ! [[ -f ${CONFIG_FOLDER}/node.conf ]]; then
echo 'INFO: no existing node config detected, generating config skeleton'
: ${NETWORKMAP_URL:? '$NETWORKMAP_URL, the Compatibility Zone to join must be set as environment variable'}
: ${DOORMAN_URL:? '$DOORMAN_URL, the Doorman to use when joining must be set as environment variable'}
: ${MY_LEGAL_NAME:? '$MY_LEGAL_NAME, the X500 name to use when joining must be set as environment variable'}
: ${MY_EMAIL_ADDRESS:? '$MY_EMAIL_ADDRESS, the email to use when joining must be set as an environment variable'}
: ${NETWORK_TRUST_PASSWORD=:? '$NETWORK_TRUST_PASSWORD, the password to the network store to use when joining must be set as environment variable'}
if [[ ! -f ${CERTIFICATES_FOLDER}/${TRUST_STORE_NAME} ]]; then
die "Network Trust Root file not found"
fi
: ${RPC_PASSWORD=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1)}
RPC_PASSWORD=${RPC_PASSWORD} \
DB_PASSWORD=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1) \
MY_PUBLIC_ADDRESS=${MY_PUBLIC_ADDRESS} \
MY_P2P_PORT=${MY_P2P_PORT} \
MY_RPC_PORT=${MY_RPC_PORT} \
MY_RPC_ADMIN_PORT=${MY_RPC_ADMIN_PORT} \
java -jar config-exporter.jar "GENERIC-CZ" "/opt/corda/starting-node.conf" "${CONFIG_FOLDER}/node.conf"
if [[ ! -f ${CERTIFICATES_FOLDER}/${TRUST_STORE_NAME} ]]; then
die "Network Trust Root file not found"
fi
: ${RPC_PASSWORD=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1)}
RPC_PASSWORD=${RPC_PASSWORD} \
DB_PASSWORD=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 32 | head -n 1) \
MY_PUBLIC_ADDRESS=${MY_PUBLIC_ADDRESS} \
MY_P2P_PORT=${MY_P2P_PORT} \
MY_RPC_PORT=${MY_RPC_PORT} \
MY_RPC_ADMIN_PORT=${MY_RPC_ADMIN_PORT} \
java -jar config-exporter.jar "GENERIC-CZ" "/opt/corda/starting-node.conf" "${CONFIG_FOLDER}/node.conf"
fi
java -Djava.security.egd=file:/dev/./urandom -Dcapsule.jvm.args="${JVM_ARGS}" -jar /opt/corda/bin/corda.jar \
--initial-registration \
--base-directory /opt/corda \
--config-file ${CONFIG_FOLDER}/node.conf \
--network-root-truststore-password ${NETWORK_TRUST_PASSWORD} \
--network-root-truststore ${CERTIFICATES_FOLDER}/${TRUST_STORE_NAME} &&
echo "Successfully registered with ${DOORMAN_URL}, starting corda"
if [[ ${EXIT_ON_GENERATE} == 1 ]]; then
exit 0
else
run-corda
fi
java -Djava.security.egd=file:/dev/./urandom -Dcapsule.jvm.args="${JVM_ARGS}" -jar /opt/corda/bin/corda.jar \
--initial-registration \
--base-directory /opt/corda \
--config-file ${CONFIG_FOLDER}/node.conf \
--network-root-truststore-password ${NETWORK_TRUST_PASSWORD} \
--network-root-truststore ${CERTIFICATES_FOLDER}/${TRUST_STORE_NAME} && \
echo "Successfully registered with ${DOORMAN_URL}, starting corda" && \
run-corda
}
function downloadTestnetCerts() {
if [[ ! -f ${CERTIFICATES_FOLDER}/certs.zip ]]; then
: ${ONE_TIME_DOWNLOAD_KEY:? '$ONE_TIME_DOWNLOAD_KEY must be set as environment variable'}
: ${LOCALITY:? '$LOCALITY (the locality used when registering for Testnet) must be set as environment variable'}
: ${COUNTRY:? '$COUNTRY (the country used when registering for Testnet) must be set as environment variable'}
curl \
-X POST "https://onboarder.prod.ws.r3.com/api/user/node/generate/one-time-key/redeem/$ONE_TIME_DOWNLOAD_KEY" \
-o "${CERTIFICATES_FOLDER}/certs.zip"
fi
rm -rf ${CERTIFICATES_FOLDER}/*.jks
unzip ${CERTIFICATES_FOLDER}/certs.zip
if [[ ! -f ${CERTIFICATES_FOLDER}/certs.zip ]]; then
: ${ONE_TIME_DOWNLOAD_KEY:? '$ONE_TIME_DOWNLOAD_KEY must be set as environment variable'}
: ${LOCALITY:? '$LOCALITY (the locality used when registering for Testnet) must be set as environment variable'}
: ${COUNTRY:? '$COUNTRY (the country used when registering for Testnet) must be set as environment variable'}
curl \
-X POST "https://onboarder.prod.ws.r3.com/api/user/node/generate/one-time-key/redeem/$ONE_TIME_DOWNLOAD_KEY" \
-o "${CERTIFICATES_FOLDER}/certs.zip"
fi
rm -rf ${CERTIFICATES_FOLDER}/*.jks
unzip ${CERTIFICATES_FOLDER}/certs.zip
}
GENERATE_TEST_NET=0
GENERATE_GENERIC=0
while :; do
case $1 in
-h|-\?|--help)
show_help # Display a usage synopsis.
exit
;;
-t|--testnet)
if [[ ${GENERATE_GENERIC} = 0 ]]; then
GENERATE_TEST_NET=1
else
die 'ERROR: cannot generate config for multiple networks'
fi
;;
-g|--generic)
if [[ ${GENERATE_TEST_NET} = 0 ]]; then
GENERATE_GENERIC=1
else
die 'ERROR: cannot generate config for multiple networks'
fi
;;
--) # End of all options.
shift
break
;;
-?*)
printf 'WARN: Unknown option (ignored): %s\n' "$1" >&2
;;
*) # Default case: No more options, so break out of the loop.
break
esac
case $1 in
-h | -\? | --help)
show_help # Display a usage synopsis.
exit
;;
-t | --testnet)
if [[ ${GENERATE_GENERIC} == 0 ]]; then
GENERATE_TEST_NET=1
else
die 'ERROR: cannot generate config for multiple networks'
fi
;;
-g | --generic)
if [[ ${GENERATE_TEST_NET} == 0 ]]; then
GENERATE_GENERIC=1
else
die 'ERROR: cannot generate config for multiple networks'
fi
;;
-e | --exit-on-generate)
if [[ ${EXIT_ON_GENERATE} == 0 ]]; then
EXIT_ON_GENERATE=1
else
die 'ERROR: cannot set exit on generate flag'
fi
;;
--) # End of all options.
shift
break
;;
-?*)
printf 'WARN: Unknown option (ignored): %s\n' "$1" >&2
;;
*) # Default case: No more options, so break out of the loop.
break ;;
esac
shift
done
: ${TRUST_STORE_NAME="network-root-truststore.jks"}
: ${JVM_ARGS='-Xmx4g -Xms2g -XX:+UseG1GC'}
if [[ ${GENERATE_TEST_NET} == 1 ]]
then
: ${MY_PUBLIC_ADDRESS:? 'MY_PUBLIC_ADDRESS must be set as environment variable'}
downloadTestnetCerts
generateTestnetConfig
elif [[ ${GENERATE_GENERIC} == 1 ]]
then
: ${MY_PUBLIC_ADDRESS:? 'MY_PUBLIC_ADDRESS must be set as environment variable'}
generateGenericCZConfig
if [[ ${GENERATE_TEST_NET} == 1 ]]; then
: ${MY_PUBLIC_ADDRESS:? 'MY_PUBLIC_ADDRESS must be set as environment variable'}
downloadTestnetCerts
generateTestnetConfig
elif [[ ${GENERATE_GENERIC} == 1 ]]; then
: ${MY_PUBLIC_ADDRESS:? 'MY_PUBLIC_ADDRESS must be set as environment variable'}
generateGenericCZConfig
else
show_help
die "No Valid Configuration requested"
show_help
die "No Valid Configuration requested"
fi

View File

@ -92,55 +92,73 @@ Corda utility classes, providing a broad range of functionality to help implemen
Some simple testing utilities like pre-defined top-level values for common currencies. Mostly useful for
writing unit tests in Kotlin.
WARNING: NOT API STABLE.
__WARNING:__ This library is not suitable for production use and should not be used in real CorDapps.
Instead, use the [Token SDK](https://github.com/corda/token-sdk), or implement your own library. This
library may be removed in a future release without warning.
# Package net.corda.finance.utils
A collection of utilities for summing financial states, for example, summing obligations to get total debts.
WARNING: NOT API STABLE.
__WARNING:__ This library is not suitable for production use and should not be used in real CorDapps.
Instead, use the [Token SDK](https://github.com/corda/token-sdk), or implement your own library. This
library may be removed in a future release without warning.
# Package net.corda.finance.contracts
Various types for common financial concepts like day roll conventions, fixes, etc.
WARNING: NOT API STABLE.
__WARNING:__ This library is not suitable for production use and should not be used in real CorDapps.
Instead, use the [Token SDK](https://github.com/corda/token-sdk), or implement your own library. This
library may be removed in a future release without warning.
# Package net.corda.finance.contracts.asset
Cash states, obligations and commodities.
WARNING: NOT API STABLE.
__WARNING:__ This library is not suitable for production use and should not be used in real CorDapps.
Instead, use the [Token SDK](https://github.com/corda/token-sdk), or implement your own library. This
library may be removed in a future release without warning.
# Package net.corda.finance.contracts.asset.cash.selection
Provisional support for pluggable cash selectors, needed for different database backends.
WARNING: NOT API STABLE.
__WARNING:__ This library is not suitable for production use and should not be used in real CorDapps.
Instead, use the [Token SDK](https://github.com/corda/token-sdk), or implement your own library. This
library may be removed in a future release without warning.
# Package net.corda.finance.contracts.math
Splines and interpolation.
WARNING: NOT API STABLE.
__WARNING:__ This library is not suitable for production use and should not be used in real CorDapps.
Instead, use the [Token SDK](https://github.com/corda/token-sdk), or implement your own library. This
library may be removed in a future release without warning.
# Package net.corda.finance.flows
Cash payments and issuances. Two party "delivery vs payment" atomic asset swaps.
WARNING: NOT API STABLE.
__WARNING:__ This library is not suitable for production use and should not be used in real CorDapps.
Instead, use the [Token SDK](https://github.com/corda/token-sdk), or implement your own library. This
library may be removed in a future release without warning.
# Package net.corda.finance.plugin
JSON/Jackson plugin for business calendars.
WARNING: NOT API STABLE.
__WARNING:__ This library is not suitable for production use and should not be used in real CorDapps.
Instead, use the [Token SDK](https://github.com/corda/token-sdk), or implement your own library. This
library may be removed in a future release without warning.
# Package net.corda.finance.schemas
JPA (Java Persistence Architecture) schemas for the financial state types.
WARNING: NOT API STABLE.
__WARNING:__ This library is not suitable for production use and should not be used in real CorDapps.
Instead, use the [Token SDK](https://github.com/corda/token-sdk), or implement your own library. This
library may be removed in a future release without warning.
# Package net.corda.testing.core

View File

@ -132,7 +132,7 @@ It is possible to configure the name of the Trust Root file by setting the ``TRU
-e MY_EMAIL_ADDRESS="cordauser@r3.com" \
-v /home/user/docker/config:/etc/corda \
-v /home/user/docker/certificates:/opt/corda/certificates \
corda/corda-zulu-java1.8-|corda_version_lower|:latest config-generator --generic
corda/corda-zulu-java1.8-|corda_version_lower|:latest config-generator --generic --exit-on-generate
Several environment variables must also be passed to the container to allow it to register:

View File

@ -1,6 +1,5 @@
package net.corda.docs.java.tutorial.test;
import kotlin.Unit;
import net.corda.client.rpc.CordaRPCClient;
import net.corda.core.messaging.CordaRPCOps;
import net.corda.core.utilities.KotlinUtilsKt;
@ -10,24 +9,24 @@ import net.corda.testing.driver.*;
import net.corda.testing.node.User;
import org.junit.Test;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.Future;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static net.corda.testing.core.TestConstants.ALICE_NAME;
import static net.corda.testing.driver.Driver.driver;
import static net.corda.testing.node.internal.InternalTestUtilsKt.cordappWithPackages;
import static org.junit.Assert.assertEquals;
public final class TutorialFlowAsyncOperationTest {
public class TutorialFlowAsyncOperationTest {
// DOCSTART summingWorks
@Test
public final void summingWorks() {
Driver.driver(new DriverParameters(), (DriverDSL dsl) -> {
User aliceUser = new User("aliceUser", "testPassword1",
new HashSet<>(Collections.singletonList(Permissions.all()))
);
public void summingWorks() {
driver(new DriverParameters(singletonList(cordappWithPackages("net.corda.docs.java.tutorial.flowstatemachines"))), (DriverDSL dsl) -> {
User aliceUser = new User("aliceUser", "testPassword1", singleton(Permissions.all()));
Future<NodeHandle> aliceFuture = dsl.startNode(new NodeParameters()
.withProvidedName(ALICE_NAME)
.withRpcUsers(Collections.singletonList(aliceUser))
.withRpcUsers(singletonList(aliceUser))
);
NodeHandle alice = KotlinUtilsKt.getOrThrow(aliceFuture, null);
CordaRPCClient aliceClient = new CordaRPCClient(alice.getRpcAddress());
@ -35,7 +34,7 @@ public final class TutorialFlowAsyncOperationTest {
Future<Integer> answerFuture = aliceProxy.startFlowDynamic(ExampleSummingFlow.class).getReturnValue();
int answer = KotlinUtilsKt.getOrThrow(answerFuture, null);
assertEquals(3, answer);
return Unit.INSTANCE;
return null;
});
}
// DOCEND summingWorks

View File

@ -10,7 +10,6 @@ import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import net.corda.testing.node.internal.cordappWithPackages
import net.corda.testing.node.internal.findCordapp
import org.junit.Test
import kotlin.test.assertEquals

View File

@ -27,9 +27,11 @@ The name must also obey the following constraints:
* The ``organisation``, ``locality`` and ``country`` attributes are present
* The ``state``, ``organisational-unit`` and ``common name`` attributes are optional
* The ``state``, ``organisational-unit`` and ``common name`` attributes are optional
* The fields of the name have the following maximum character lengths:
* The maximum number of characters in the whole x500 name string is 128 characters
* The fields of the name have character lengths **less** than the following maximum values:
* Common name: 64
* Organisation: 128
@ -40,12 +42,18 @@ The name must also obey the following constraints:
* The ``country`` attribute is a valid `ISO 3166-1<https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2>` two letter code in upper-case
* The ``organisation`` field of the name obeys the following constraints:
* Has at least two letters
* Does not include the following characters: ``,`` , ``"``, ``\``
* All data fields adhere to the following constraints:
* Upper-case first letter
* Does not include the following characters: ``,``, ``=``, ``$``, ``"``, ``'``, ``\``
* Is in NFKC normalization form
* Does not contain the null character
* Only the latin, common and inherited unicode scripts are supported
* No double-spacing
* No leading or trailing whitespace
This is to avoid right-to-left issues, debugging issues when we can't pronounce names over the phone, and
character confusability attacks.

View File

@ -4,7 +4,65 @@ Release notes
.. contents::
:depth: 2
Welcome to the Corda 4.3 release notes. Please read these carefully to understand whats new in this release and how the features can help you. Just as prior releases have brought with them commitments to wire and API stability, Corda 4.3 comes with those same guarantees. States and apps valid in Corda 3.0 are transparently usable in Corda 4.3.
Welcome to the Corda 4.4 release notes. Please read these carefully to understand whats new in this release and how the features can help you. Just as prior releases have brought with them commitments to wire and API stability, Corda 4.4 comes with those same guarantees. States and apps valid in Corda 3.0 are usable in Corda 4.4.
.. _release_notes_v4_4:
Corda 4.4
=========
Corda 4.4 lays the foundation of a new open-core approach for the Corda codebase. This involved a refactoring of the main functional components of Corda. Please consult :doc:`cordapp-overview.rst` to get an overview of the practical impact on CorDapp development.
Furthermore, Corda 4.4 introduces improvements to the flow framework API, a new diagnostic ``ServiceHub`` call and includes a number of security enhancements.
Changes for developers in Corda 4.4
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Flows API improvements
+++++++++++++++++++++++
Corda 4.4 introduces a new ``FlowLogic.await`` API that allows a CorDapp developer to suspend their flow when executing user-defined long-running operations (e.g. call-outs to external services). This prevents these long-running operations from blocking the flow thread, allowing other flows to progress in the interim. Previously, these operations had to be executed synchronously, blocking the flow thread.
The CorDapp developer can decide whether to run these asynchronous flow operations in a dedicated thread pool, or to handle the threading themselves directly.
Note that as before, the flow framework suspends automatically for certain operations (e.g. when waiting to receive a message from a counterparty). These suspensions do not have to be triggered explicitly.
The node operator can configure the number of threads in the threadpool to dedicate to external operations.
Corda 4.4 also introduces a new ``HospitalizeFlowException`` exception type that, when thrown, causes a flow to halt execution and send itself to the flow hospital for observation. The flow will automatically be retried on the next node start.
This exception gives user code a way to retry a flow from its last checkpoint if a known intermittent failure occurred.
New utility APIs
+++++++++++++++++++++++
Corda 4.4 introduces a new call (``ServiceHub.DiagnosticsService``) available to CorDapp developers that allows them to access:
* The edition of Corda being run (e.g. Open Source, Enterprise)
* The version of Corda being run including the patch number (eg. 3.2.20190215)
Corda 4.4 also provides a callback (``AppServiceHub.register``) to allow Corda services to register custom actions to be performed once the node is fully started-up. This pattern prevents issues caused by the service trying to immediately access a part of the node that hadn't yet been initialised .
Security enhancements
+++++++++++++++++++++++
* The SSH server in the :doc:`shell` has been updated to remove outdated weak ciphers and algorithms.
* The ability to SSH into the standalone shell has been removed
* A new read-only RPC user role template has been documented in :doc:`shell`
Platform version change
~~~~~~~~~~~~~~~~~~~~~~~
Given the addition of new APIs, the platform version of Corda 4.4 has been bumped up from 5 to 6. This is to prevent CorDapps that use it being deployed onto nodes unable to host them. Note that the minimum platform version has not been changed - this means that older Corda nodes can still interoperate with Corda 4.4 nodes. Since the APIs added do not affect the wire protocol or have other zone-level implications, applications can take advantage of these new platform version 6 features even if the Corda 4.4 node is running on a network whose minimum platform version is 4.
For more information on platform version, please see :doc:`versioning`. For more details on upgrading a CorDapp to use platform version 5, please see :doc:`app-upgrade-notes`.
Issues Fixed
~~~~~~~~~~~~
.. _release_notes_v4_3:
@ -446,7 +504,7 @@ Corda 4
Welcome to the Corda 4 release notes. Please read these carefully to understand what's new in this
release and how the changes can help you. Just as prior releases have brought with them commitments
to wire and API stability, Corda 4 comes with those same guarantees. States and apps valid in
Corda 3 are transparently usable in Corda 4.
Corda 3 are usable in Corda 4.
For app developers, we strongly recommend reading ":doc:`app-upgrade-notes`". This covers the upgrade
procedure, along with how you can adjust your app to opt-in to new features making your app more secure and

View File

@ -91,6 +91,8 @@ The host key is loaded from the ``<node root directory>/sshkey/hostkey.pem`` fil
generated automatically. In development mode, the seed may be specified to give the same results on the same computer
in order to avoid host-checking errors.
Only RSA key is currently supported as a host key. If ``hostkey.pem`` is not RSA, it will be replaced by the newly generated RSA key.
Connecting to the shell
***********************

View File

@ -0,0 +1,5 @@
package net.corda.nodeapi.internal
object ArtemisConstants {
const val MESSAGE_ID_KEY = "_AMQ_DUPL_ID"
}

View File

@ -7,6 +7,8 @@ import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.toHexString
import net.corda.nodeapi.internal.ArtemisConstants.MESSAGE_ID_KEY
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
@ -26,6 +28,8 @@ import org.slf4j.MDC
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.*
import kotlin.math.max
import kotlin.math.min
/**
* This ConnectionStateMachine class handles the events generated by the proton-j library to track
@ -51,6 +55,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
MDC.put("serverMode", serverMode.toString())
MDC.put("localLegalName", localLegalName)
MDC.put("remoteLegalName", remoteLegalName)
MDC.put("conn", connection.prettyPrint)
block()
} finally {
MDC.setContextMap(oldMDC)
@ -73,12 +78,22 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
private val transport: Transport
private val id = UUID.randomUUID().toString()
private var session: Session? = null
/**
* Key is message topic and value is the list of messages
*/
private val messageQueues = mutableMapOf<String, LinkedList<SendableMessageImpl>>()
private val unackedQueue = LinkedList<SendableMessageImpl>()
private val receivers = mutableMapOf<String, Receiver>()
private val senders = mutableMapOf<String, Sender>()
private var tagId: Int = 0
private val Connection?.prettyPrint: String
get() = this?.context?.toString() ?: "<n/a>"
private val Transport?.prettyPrint: String
// Inside Transport's context - there is Connection, inside Connection's context there is NIO channel that has useful information
get() = (this?.context as? Endpoint)?.context?.toString() ?: "<n/a>"
init {
connection = Engine.connection()
connection.container = "CORDA:$id"
@ -116,12 +131,12 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
override fun onConnectionInit(event: Event) {
val connection = event.connection
logDebugWithMDC { "Connection init $connection" }
logDebugWithMDC { "Connection init ${connection.prettyPrint}" }
}
override fun onConnectionLocalOpen(event: Event) {
val connection = event.connection
logInfoWithMDC("Connection local open $connection")
logInfoWithMDC("Connection local open ${connection.prettyPrint}")
val session = connection.session()
session.open()
this.session = session
@ -132,13 +147,15 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
override fun onConnectionLocalClose(event: Event) {
val connection = event.connection
logInfoWithMDC("Connection local close $connection")
logInfoWithMDC("Connection local close ${connection.prettyPrint}")
connection.close()
connection.free()
}
override fun onConnectionUnbound(event: Event) {
if (event.connection == this.connection) {
val connection = event.connection
logInfoWithMDC("Connection unbound ${connection.prettyPrint}")
if (connection == this.connection) {
val channel = connection.context as? Channel
if (channel != null) {
if (channel.isActive) {
@ -150,12 +167,13 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
override fun onConnectionFinal(event: Event) {
val connection = event.connection
logDebugWithMDC { "Connection final $connection" }
logDebugWithMDC { "Connection final ${connection.prettyPrint}" }
if (connection == this.connection) {
this.connection.context = null
for (queue in messageQueues.values) {
// clear any dead messages
while (true) {
logDebugWithMDC { "Queue size: ${queue.size}" }
val msg = queue.poll()
if (msg != null) {
msg.doComplete(MessageStatus.Rejected)
@ -167,6 +185,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
}
messageQueues.clear()
while (true) {
logDebugWithMDC { "Unacked queue size: ${unackedQueue.size}" }
val msg = unackedQueue.poll()
if (msg != null) {
msg.doComplete(MessageStatus.Rejected)
@ -185,26 +204,28 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
session = null
receivers.clear()
senders.clear()
} else {
logDebugWithMDC { "Connection from the event: ${connection.prettyPrint} is not the connection owned: ${this.connection.prettyPrint}" }
}
}
override fun onTransportHeadClosed(event: Event) {
val transport = event.transport
logDebugWithMDC { "Transport Head Closed $transport" }
logDebugWithMDC { "Transport Head Closed ${transport.prettyPrint}" }
transport.close_tail()
onTransportInternal(transport)
}
override fun onTransportTailClosed(event: Event) {
val transport = event.transport
logDebugWithMDC { "Transport Tail Closed $transport" }
logDebugWithMDC { "Transport Tail Closed ${transport.prettyPrint}" }
transport.close_head()
onTransportInternal(transport)
}
override fun onTransportClosed(event: Event) {
val transport = event.transport
logDebugWithMDC { "Transport Closed $transport" }
logDebugWithMDC { "Transport Closed ${transport.prettyPrint}" }
if (transport == this.transport) {
transport.unbind()
transport.free()
@ -214,7 +235,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
override fun onTransportError(event: Event) {
val transport = event.transport
logInfoWithMDC("Transport Error $transport")
logInfoWithMDC("Transport Error ${transport.prettyPrint}")
val condition = event.transport.condition
if (condition != null) {
logInfoWithMDC("Error: ${condition.description}")
@ -226,7 +247,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
override fun onTransport(event: Event) {
val transport = event.transport
logDebugWithMDC { "Transport $transport" }
logDebugWithMDC { "Transport ${transport.prettyPrint}" }
onTransportInternal(transport)
}
@ -284,6 +305,17 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
logDebugWithMDC { "Session final $session" }
if (session == this.session) {
this.session = null
// If TRANSPORT_CLOSED event was already processed, the 'transport' in all subsequent events is set to null.
// There is, however, a chance of missing TRANSPORT_CLOSED event, e.g. when disconnect occurs before opening remote session.
// In such cases we must explicitly cleanup the 'transport' in order to guarantee the delivery of CONNECTION_FINAL event.
val transport = event.transport
if (transport == this.transport) {
logDebugWithMDC { "Missed TRANSPORT_CLOSED: force cleanup ${transport.prettyPrint}" }
transport.unbind()
transport.free()
transport.context = null
}
}
}
@ -361,7 +393,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
delivery.context = nextMessage
sender.send(messageBuf.array(), messageBuf.arrayOffset() + messageBuf.readerIndex(), messageBuf.readableBytes())
nextMessage.status = MessageStatus.Sent
logDebugWithMDC { "Put tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)} on wire uuid: ${nextMessage.applicationProperties["_AMQ_DUPL_ID"]}" }
logDebugWithMDC { "Put tag ${delivery.tag.toHexString()} on wire uuid: ${nextMessage.applicationProperties[MESSAGE_ID_KEY]}" }
unackedQueue.offer(nextMessage)
sender.advance()
} finally {
@ -398,7 +430,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
appProperties,
channel,
delivery)
logDebugWithMDC { "Full message received uuid: ${appProperties["_AMQ_DUPL_ID"]}" }
logDebugWithMDC { "Full message received uuid: ${appProperties[MESSAGE_ID_KEY]}" }
channel.writeAndFlush(receivedMessage)
if (link.current() == delivery) {
link.advance()
@ -409,7 +441,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
}
}
} else if (link is Sender) {
logDebugWithMDC { "Sender delivery confirmed tag ${javax.xml.bind.DatatypeConverter.printHexBinary(delivery.tag)}" }
logDebugWithMDC { "Sender delivery confirmed tag ${delivery.tag.toHexString()}" }
val ok = delivery.remotelySettled() && delivery.remoteState == Accepted.getInstance()
val sourceMessage = delivery.context as? SendableMessageImpl
unackedQueue.remove(sourceMessage)
@ -462,6 +494,8 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
if (session != null) {
val sender = getSender(msg.topic)
transmitMessages(sender)
} else {
logInfoWithMDC("Session been closed already")
}
}
@ -470,7 +504,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
try {
do {
val buffer = transport.inputBuffer
val limit = Math.min(buffer.remaining(), source.remaining())
val limit = min(buffer.remaining(), source.remaining())
val duplicate = source.duplicate()
duplicate.limit(source.position() + limit)
buffer.put(duplicate)
@ -483,7 +517,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
condition.description = ex.message
transport.condition = condition
transport.close_tail()
transport.pop(Math.max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
transport.pop(max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
}
}
@ -508,7 +542,7 @@ internal class ConnectionStateMachine(private val serverMode: Boolean,
condition.description = ex.message
transport.condition = condition
transport.close_head()
transport.pop(Math.max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
transport.pop(max(0, transport.pending())) // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
}
}
}

View File

@ -3,6 +3,8 @@ package net.corda.nodeapi.internal.protonwrapper.engine
import io.netty.buffer.ByteBuf
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import net.corda.core.internal.declaredField
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.messages.impl.ReceivedMessageImpl
@ -13,7 +15,6 @@ import org.apache.qpid.proton.amqp.messaging.Rejected
import org.apache.qpid.proton.amqp.transport.DeliveryState
import org.apache.qpid.proton.amqp.transport.ErrorCondition
import org.apache.qpid.proton.engine.*
import org.apache.qpid.proton.engine.impl.CollectorImpl
import org.apache.qpid.proton.reactor.FlowController
import org.apache.qpid.proton.reactor.Handshaker
import org.slf4j.MDC
@ -21,6 +22,7 @@ import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.math.max
/**
* The EventProcessor class converts calls on the netty scheduler/pipeline
@ -29,12 +31,12 @@ import kotlin.concurrent.withLock
* and simple sliding window flow control, so that these events don't have to live inside ConnectionStateMachine.
* Everything here is single threaded, because the proton-j library has to be run that way.
*/
internal class EventProcessor(channel: Channel,
internal class EventProcessor(private val channel: Channel,
private val serverMode: Boolean,
private val localLegalName: String,
private val remoteLegalName: String,
userName: String?,
password: String?) : BaseHandler() {
password: String?) {
companion object {
private const val FLOW_WINDOW_SIZE = 10
private val log = contextLogger()
@ -45,7 +47,9 @@ internal class EventProcessor(channel: Channel,
try {
MDC.put("serverMode", serverMode.toString())
MDC.put("localLegalName", localLegalName)
MDC.put("localAddress", channel.localAddress()?.toString())
MDC.put("remoteLegalName", remoteLegalName)
MDC.put("remoteAddress", channel.remoteAddress()?.toString())
block()
} finally {
MDC.setContextMap(oldMDC)
@ -59,10 +63,13 @@ internal class EventProcessor(channel: Channel,
}
private val lock = ReentrantLock()
@Volatile
private var pendingExecute: Boolean = false
@Volatile
private var processorClosed: Boolean = false
private val executor: ScheduledExecutorService = channel.eventLoop()
private val collector = Proton.collector() as CollectorImpl
private val handlers = mutableListOf<Handler>()
private val collector = Proton.collector()
private val handlers: List<Handler>
private val stateMachine: ConnectionStateMachine = ConnectionStateMachine(serverMode,
collector,
localLegalName,
@ -73,15 +80,11 @@ internal class EventProcessor(channel: Channel,
val connection: Connection = stateMachine.connection
init {
addHandler(Handshaker())
addHandler(FlowController(FLOW_WINDOW_SIZE))
addHandler(stateMachine)
handlers = listOf(Handshaker(), FlowController(FLOW_WINDOW_SIZE), stateMachine)
connection.context = channel
tick(stateMachine.connection)
}
fun addHandler(handler: Handler) = handlers.add(handler)
private fun popEvent(): Event? {
var ev = collector.peek()
if (ev != null) {
@ -93,23 +96,28 @@ internal class EventProcessor(channel: Channel,
private fun tick(connection: Connection) {
lock.withLock {
logDebugWithMDC { "Tick" }
try {
if ((connection.localState != EndpointState.CLOSED) && !connection.transport.isClosed) {
val now = System.currentTimeMillis()
val tickDelay = Math.max(0L, connection.transport.tick(now) - now)
val tickDelay = max(0L, connection.transport.tick(now) - now)
executor.schedule({
tick(connection)
processEvents()
}, tickDelay, TimeUnit.MILLISECONDS)
logDebugWithMDC {"Tick done. Next tick scheduled in $tickDelay ms"}
} else {
logDebugWithMDC { "Connection closed - no more ticking" }
}
} catch (ex: Exception) {
withMDC { log.info("Tick failed", ex) }
connection.transport.close()
connection.condition = ErrorCondition()
}
}
}
fun processEvents() {
private fun processEvents() {
lock.withLock {
pendingExecute = false
logDebugWithMDC { "Process Events" }
@ -135,11 +143,27 @@ internal class EventProcessor(channel: Channel,
}
fun close() {
if (connection.localState != EndpointState.CLOSED) {
connection.close()
processEvents()
connection.free()
processEvents()
lock.withLock {
if (!processorClosed) {
processorClosed = true
connection.logLocalState("Before close")
connection.close()
processEvents()
logDebugWithMDC { "Freeing-up connection" }
connection.free()
processEvents()
connection.logLocalState("After close")
} else {
logDebugWithMDC { "Processor is already closed" }
}
}
}
private fun Connection.logLocalState(prefix: String) {
if (log.isDebugEnabled) {
val freedTry = Try.on { declaredField<Boolean>("freed").value }
val refcountTry = Try.on { declaredField<Int>("refcount").value }
logDebugWithMDC { "$prefix, local state: $localState, freed: $freedTry, refcount: $refcountTry" }
}
}

View File

@ -0,0 +1,71 @@
package net.corda.nodeapi.internal.protonwrapper.engine
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.whenever
import io.netty.channel.Channel
import io.netty.channel.ChannelFuture
import io.netty.channel.DefaultEventLoop
import io.netty.channel.EventLoop
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.internal.rigorousMock
import org.apache.qpid.proton.amqp.transport.Begin
import org.apache.qpid.proton.amqp.transport.Open
import org.apache.qpid.proton.engine.impl.TransportImpl
import org.junit.Test
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
class EventProcessorTest {
@Test(timeout=300_000)
fun `reject unacknowledged message on disconnect`() {
val executor = DefaultEventLoop()
val channel = channel(executor)
val eventProcessor = EventProcessor(channel, false, ALICE_NAME.toString(), BOB_NAME.toString(), "username", "password")
eventProcessor.processEventsAsync()
val msg = SendableMessageImpl("test".toByteArray(), "topic", BOB_NAME.toString(), mock(), mapOf())
eventProcessor.transportWriteMessage(msg)
eventProcessor.processEventsAsync()
// Open remote connection and session
(eventProcessor.connection.transport as TransportImpl).also {
Open().invoke(it, null, 0)
Begin().invoke(it, null, 0)
}
eventProcessor.processEventsAsync()
executor.execute { eventProcessor.close() }
assertEquals(MessageStatus.Rejected, msg.onComplete.get(5, TimeUnit.SECONDS))
}
@Test(timeout=300_000)
fun `reject unacknowledged message on disconnect without remote session being open`() {
val executor = DefaultEventLoop()
val channel = channel(executor)
val eventProcessor = EventProcessor(channel, false, ALICE_NAME.toString(), BOB_NAME.toString(), "username", "password")
eventProcessor.processEventsAsync()
val msg = SendableMessageImpl("test".toByteArray(), "topic", BOB_NAME.toString(), mock(), mapOf())
eventProcessor.transportWriteMessage(msg)
eventProcessor.processEventsAsync()
executor.execute { eventProcessor.close() }
assertEquals(MessageStatus.Rejected, msg.onComplete.get(5, TimeUnit.SECONDS))
}
private fun channel(executor: EventLoop) = rigorousMock<Channel>().also {
doReturn(executor).whenever(it).eventLoop()
doReturn(mock<ChannelFuture>()).whenever(it).writeAndFlush(any())
doReturn(true).whenever(it).isActive
doReturn(mock<ChannelFuture>()).whenever(it).close()
doReturn(null).whenever(it).localAddress()
doReturn(null).whenever(it).remoteAddress()
}
}

View File

@ -1,6 +1,5 @@
package net.corda.node.logging
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
@ -23,7 +22,13 @@ class ErrorCodeLoggingTests {
node.rpc.startFlow(::MyFlow).waitForCompletion()
val logFile = node.logFile()
val linesWithErrorCode = logFile.useLines { lines -> lines.filter { line -> line.contains("[errorCode=") }.filter { line -> line.contains("moreInformationAt=https://errors.corda.net/") }.toList() }
val linesWithErrorCode = logFile.useLines { lines ->
lines.filter { line ->
line.contains("[errorCode=")
}.filter { line ->
line.contains("moreInformationAt=https://errors.corda.net/")
}.toList()
}
assertThat(linesWithErrorCode).isNotEmpty
}
@ -35,10 +40,11 @@ class ErrorCodeLoggingTests {
fun `When logging is set to error level, there are no other levels logged after node startup`() {
driver(DriverParameters(notarySpecs = emptyList())) {
val node = startNode(startInSameProcess = false, logLevelOverride = "ERROR").getOrThrow()
node.rpc.startFlow(::MyFlow).waitForCompletion()
val logFile = node.logFile()
val lengthAfterStart = logFile.length()
node.rpc.startFlow(::MyFlow).waitForCompletion()
// An exception thrown in a flow will log at the "INFO" level.
assertThat(logFile.length()).isEqualTo(0)
assertThat(logFile.length()).isEqualTo(lengthAfterStart)
}
}

View File

@ -1,34 +1,49 @@
package net.corda.node.services.vault
import co.paralleluniverse.strands.concurrent.Semaphore
import com.r3.dbfailure.contracts.DbFailureContract
import com.r3.dbfailure.workflows.CreateStateFlow
import com.r3.dbfailure.workflows.CreateStateFlow.Initiator
import com.r3.dbfailure.workflows.CreateStateFlow.errorTargetsToNum
import com.r3.dbfailure.workflows.DbListenerService
import com.r3.dbfailure.workflows.DbListenerService.MakeServiceThrowErrorFlow
import com.r3.dbfailure.workflows.SendStateFlow
import com.r3.transactionfailure.workflows.ErrorHandling
import com.r3.transactionfailure.workflows.ErrorHandling.CheckpointAfterErrorFlow
import net.corda.core.CordaRuntimeException
import net.corda.core.contracts.StateAndRef
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.startFlow
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.Permissions
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.OutOfProcess
import net.corda.testing.driver.driver
import net.corda.testing.node.User
import net.corda.testing.node.internal.findCordapp
import org.assertj.core.api.Assertions
import org.junit.After
import org.junit.Assert
import org.junit.Test
import java.lang.IllegalStateException
import java.sql.SQLException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import javax.persistence.PersistenceException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@ -49,6 +64,10 @@ class VaultObserverExceptionTest {
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
StaffedFlowHospital.onFlowAdmitted.clear()
DbListenerService.onError = null
DbListenerService.safeSubscription = true
DbListenerService.onNextVisited = {}
DbListenerService.onErrorVisited = null
DbListenerService.withCustomSafeSubscriber = false
}
/**
@ -74,9 +93,43 @@ class VaultObserverExceptionTest {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
aliceNode.rpc.startFlow(
::Initiator,
"Syntax Error in Custom SQL",
CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError)
CreateStateFlow::Initiator,
"Syntax Error in Custom SQL",
CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError)
).returnValue.then { testControlFuture.complete(false) }
val foundExpectedException = testControlFuture.getOrThrow(30.seconds)
Assert.assertTrue(foundExpectedException)
}
}
/**
* Causing an SqlException via a syntax error in a vault observer causes the flow to hit the
* DatabsaseEndocrinologist in the FlowHospital and being kept for overnight observation - Unsafe subscribe
*/
@Test(timeout=300_000)
fun unhandledSqlExceptionFromVaultObserverGetsHospitalisedUnsafeSubscription() {
DbListenerService.safeSubscription = false
val testControlFuture = openFuture<Boolean>().toCompletableFuture()
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add {
when (it) {
is SQLException -> {
testControlFuture.complete(true)
}
}
false
}
driver(DriverParameters(
startNodesInProcess = true,
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
aliceNode.rpc.startFlow(
CreateStateFlow::Initiator,
"Syntax Error in Custom SQL",
CreateStateFlow.errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError)
).returnValue.then { testControlFuture.complete(false) }
val foundExpectedException = testControlFuture.getOrThrow(30.seconds)
@ -103,7 +156,7 @@ class VaultObserverExceptionTest {
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
aliceNode.rpc.startFlow(::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions,
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
@ -131,9 +184,9 @@ class VaultObserverExceptionTest {
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
aliceNode.rpc.startFlow(::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "InvalidParameterException", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
}
@ -167,7 +220,7 @@ class VaultObserverExceptionTest {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
assertFailsWith<TimeoutException>("PersistenceException") {
aliceNode.rpc.startFlow(::Initiator, "EntityManager", errorTargetsToNum(
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", errorTargetsToNum(
CreateStateFlow.ErrorTarget.TxInvalidState))
.returnValue.getOrThrow(30.seconds)
}
@ -200,7 +253,7 @@ class VaultObserverExceptionTest {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
val flowHandle = aliceNode.rpc.startFlow(
::Initiator, "EntityManager",
CreateStateFlow::Initiator, "EntityManager",
CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.TxInvalidState,
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
@ -228,7 +281,7 @@ class VaultObserverExceptionTest {
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
val flowHandle = aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError,
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
val flowResult = flowHandle.returnValue
@ -251,7 +304,7 @@ class VaultObserverExceptionTest {
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
val flowHandle = aliceNode.rpc.startFlow(::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
val flowHandle = aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "EntityManager", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError,
CreateStateFlow.ErrorTarget.ServiceSwallowErrors))
val flowResult = flowHandle.returnValue
@ -281,20 +334,20 @@ class VaultObserverExceptionTest {
}
driver(DriverParameters(
inMemoryDB = false,
startNodesInProcess = true,
isDebug = true,
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.transactionfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")))) {
inMemoryDB = false,
startNodesInProcess = true,
isDebug = true,
cordappsForAllNodes = listOf(findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.transactionfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")))) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val node = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
node.rpc.startFlow(::CheckpointAfterErrorFlow, CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions, // throw not persistence exception
CreateStateFlow.ErrorTarget.FlowSwallowErrors
)
)
)
waitUntilHospitalised.acquire()
@ -329,9 +382,9 @@ class VaultObserverExceptionTest {
cordappsForAllNodes = testCordapps())) {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser)).getOrThrow()
aliceNode.rpc.startFlow(::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "Exception", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter,
CreateStateFlow.ErrorTarget.FlowSwallowErrors))
waitUntilHospitalised.acquire() // wait here until flow gets hospitalised
}
@ -344,7 +397,7 @@ class VaultObserverExceptionTest {
val aliceUser = User("user", "foo", setOf(Permissions.all()))
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(aliceUser), startInSameProcess = false).getOrThrow()
aliceNode.rpc.startFlow(::MakeServiceThrowErrorFlow).returnValue.getOrThrow()
aliceNode.rpc.startFlow(::Initiator, "UnrecoverableError", CreateStateFlow.errorTargetsToNum(
aliceNode.rpc.startFlow(CreateStateFlow::Initiator, "UnrecoverableError", CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError))
val terminated = (aliceNode as OutOfProcess).process.waitFor(30, TimeUnit.SECONDS)
@ -363,4 +416,459 @@ class VaultObserverExceptionTest {
}
}
/**
* An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the initiating node.
*
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
* also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
* counterparty node.
*
* More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe.
*
* Check onNext is visited the correct number of times.
*
* This test causes 2 failures inside of the observer to ensure that the observer is still subscribed.
*/
@Test(timeout=300_000)
fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer - onNext check`() {
var observationCounter = 0
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
val rawUpdatesCount = ConcurrentHashMap<Party, Int>()
DbListenerService.onNextVisited = { party ->
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
}
}
val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf(
findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")
),inMemoryDB = false)
) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
val startErrorInObservableWhenConsumingState = {
val stateId = aliceNode.rpc.startFlow(
CreateStateFlow::Initiator,
"AllGood",
errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed)
).returnValue.getOrThrow(30.seconds)
println("Created new state")
val flowHandle = aliceNode.rpc.startFlow(
SendStateFlow::PassErroneousOwnableState, // throws at consumed state -> should end up in hospital -> flow should hang
stateId,
errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError),
bobNode.nodeInfo.legalIdentities.first()
)
Assertions.assertThatExceptionOfType(TimeoutException::class.java)
.isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) }
stateId
}
assertEquals(0, notary.getNotarisedTransactionIds().size)
println("First set of flows")
val stateId = startErrorInObservableWhenConsumingState()
assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
assertEquals(1, notary.getNotarisedTransactionIds().size)
assertEquals(1, observationCounter)
assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
println("Second set of flows")
val stateId2 = startErrorInObservableWhenConsumingState()
assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
assertEquals(2, notary.getNotarisedTransactionIds().size)
assertEquals(2, observationCounter)
assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
}
}
/**
* An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the initiating node.
*
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
* also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
* counterparty node.
*
* More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe.
*
* Check onNext and onError are visited the correct number of times.
*
* This test causes 2 failures inside of the observer to ensure that the observer is still subscribed.
*/
@Test(timeout=300_000)
fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer - onNext and onError check`() {
var observationCounter = 0
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
val rawUpdatesCount = ConcurrentHashMap<Party, Int>()
DbListenerService.onNextVisited = { party ->
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
}
}
DbListenerService.onError = {/*just rethrow - we just want to check that onError gets visited by parties*/ throw it}
DbListenerService.onErrorVisited = { party ->
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
}
}
val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf(
findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")
),
inMemoryDB = false)
) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
val startErrorInObservableWhenConsumingState = {
val stateId = aliceNode.rpc.startFlow(
CreateStateFlow::Initiator,
"AllGood",
// should be a hospital exception
errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed)
).returnValue.getOrThrow(30.seconds)
val flowHandle = aliceNode.rpc.startFlow(
SendStateFlow::PassErroneousOwnableState,
stateId,
errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError),
bobNode.nodeInfo.legalIdentities.first()
)
Assertions.assertThatExceptionOfType(TimeoutException::class.java)
.isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) }
stateId
}
assertEquals(0, notary.getNotarisedTransactionIds().size)
val stateId = startErrorInObservableWhenConsumingState()
assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
assertEquals(1, notary.getNotarisedTransactionIds().size)
assertEquals(1, observationCounter)
assertEquals(3, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
val stateId2 = startErrorInObservableWhenConsumingState()
assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
assertEquals(2, notary.getNotarisedTransactionIds().size)
assertEquals(2, observationCounter)
assertEquals(6, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
}
}
/**
* An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the counterparty node.
*
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent.
* Observer events are recorded on both the initiating node and the counterparty node.
*
* More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe.
*
* This test causes 2 failures inside of the observer to ensure that the observer is still subscribed.
*/
@Test(timeout=300_000)
fun `Throw user error in VaultService rawUpdates during counterparty FinalityFlow blows up the flow but does not break the Observer`() {
var observationCounter = 0
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
val rawUpdatesCount = ConcurrentHashMap<Party, Int>()
DbListenerService.onNextVisited = { party ->
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
}
}
val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf(
findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")
),
inMemoryDB = false)
) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
val startErrorInObservableWhenCreatingSecondState = {
val stateId = aliceNode.rpc.startFlow(
CreateStateFlow::Initiator,
"AllGood",
errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError)
).returnValue.getOrThrow(30.seconds)
aliceNode.rpc.startFlow(
SendStateFlow::PassErroneousOwnableState,
stateId,
errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError),
bobNode.nodeInfo.legalIdentities.first()
).returnValue.getOrThrow(20.seconds)
stateId
}
assertEquals(0, notary.getNotarisedTransactionIds().size)
val stateId = startErrorInObservableWhenCreatingSecondState()
assertEquals(1, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
assertEquals(1, notary.getNotarisedTransactionIds().size)
assertEquals(1, observationCounter)
assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(1, rawUpdatesCount[bobNode.nodeInfo.singleIdentity()])
val stateId2 = startErrorInObservableWhenCreatingSecondState()
assertEquals(1, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
assertEquals(2, aliceNode.getAllStates(Vault.StateStatus.CONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
assertEquals(2, notary.getNotarisedTransactionIds().size)
assertEquals(2, observationCounter)
assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(2, rawUpdatesCount[bobNode.nodeInfo.singleIdentity()])
}
}
/**
* An error is thrown inside of the [VaultService.updates] observable while recording a transaction inside of the initiating node.
*
* This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
* also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
* counterparty node.
*
* More importantly, the observer listening to the [VaultService.updates] observable should not unsubscribe.
*
* This test causes 2 failures inside of the [rx.Observer] to ensure that the Observer is still subscribed.
*/
@Test(timeout=300_000)
fun `Throw user error in VaultService rawUpdates during FinalityFlow blows up the flow but does not break the Observer`() {
var observationCounter = 0
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
val rawUpdatesCount = ConcurrentHashMap<Party, Int>()
DbListenerService.onNextVisited = { party ->
if (rawUpdatesCount.putIfAbsent(party, 1) != null) {
rawUpdatesCount.computeIfPresent(party) { _, count -> count + 1 }
}
}
val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf(
findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")
),
inMemoryDB = false)
) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
val startErrorInObservableWhenConsumingState = {
val stateId = aliceNode.rpc.startFlow(
CreateStateFlow::Initiator,
"AllGood",
errorTargetsToNum(CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed)
).returnValue.getOrThrow(30.seconds)
val flowHandle = aliceNode.rpc.startFlow(
SendStateFlow::PassErroneousOwnableState,
stateId,
errorTargetsToNum(CreateStateFlow.ErrorTarget.NoError),
bobNode.nodeInfo.legalIdentities.first()
)
Assertions.assertThatExceptionOfType(TimeoutException::class.java)
.isThrownBy { flowHandle.returnValue.getOrThrow(20.seconds) }
stateId
}
assertEquals(0, notary.getNotarisedTransactionIds().size)
val stateId = startErrorInObservableWhenConsumingState()
assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
assertEquals(1, aliceNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
assertEquals(1, notary.getNotarisedTransactionIds().size)
assertEquals(1, observationCounter)
assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
val stateId2 = startErrorInObservableWhenConsumingState()
assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
assertEquals(2, aliceNode.getAllStates(Vault.StateStatus.UNCONSUMED).size)
assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
assertEquals(2, notary.getNotarisedTransactionIds().size)
assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
}
}
@Test(timeout=300_000)
fun `Accessing NodeVaultService rawUpdates from a flow is not allowed` () {
val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf(
findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")
),
inMemoryDB = false)
) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val flowHandle = aliceNode.rpc.startFlow(::SubscribingRawUpdatesFlow)
assertFailsWith<CordaRuntimeException>(
"Flow ${SubscribingRawUpdatesFlow::class.java.name} tried to access VaultService.rawUpdates " +
"- Rx.Observables should only be accessed outside the context of a flow "
) {
flowHandle.returnValue.getOrThrow(30.seconds)
}
}
}
@Test(timeout=300_000)
fun `Failing Observer wrapped with ResilientSubscriber will survive and be re-called upon flow retry`() {
var onNextCount = 0
var onErrorCount = 0
DbListenerService.onNextVisited = { _ -> onNextCount++ }
DbListenerService.onError = {/*just rethrow - we just want to check that onError gets visited by parties*/ throw it}
DbListenerService.onErrorVisited = { _ -> onErrorCount++ }
val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf(
findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.transactionfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")),
inMemoryDB = false)
) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
assertFailsWith<TimeoutException> {
aliceNode.rpc.startFlow(
ErrorHandling::CheckpointAfterErrorFlow,
CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceConstraintViolationException,
CreateStateFlow.ErrorTarget.FlowSwallowErrors
)
).returnValue.getOrThrow(20.seconds)
}
assertEquals(4, onNextCount)
assertEquals(4, onErrorCount)
}
}
@Test(timeout=300_000)
fun `Users may subscribe to NodeVaultService rawUpdates with their own custom SafeSubscribers`() {
var onNextCount = 0
DbListenerService.onNextVisited = { _ -> onNextCount++ }
val user = User("user", "foo", setOf(Permissions.all()))
driver(DriverParameters(startNodesInProcess = true,
cordappsForAllNodes = listOf(
findCordapp("com.r3.dbfailure.contracts"),
findCordapp("com.r3.dbfailure.workflows"),
findCordapp("com.r3.transactionfailure.workflows"),
findCordapp("com.r3.dbfailure.schemas")),
inMemoryDB = false)
) {
// Subscribing with custom SafeSubscriber; the custom SafeSubscriber will not get replaced by a ResilientSubscriber
// meaning that it will behave as a SafeSubscriber; it will get unsubscribed upon throwing an error.
// Because we throw a ConstraintViolationException, the Rx Observer will get unsubscribed but the flow will retry
// from previous checkpoint, however the Observer will no longer be there.
DbListenerService.withCustomSafeSubscriber = true
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
aliceNode.rpc.startFlow(
ErrorHandling::CheckpointAfterErrorFlow,
CreateStateFlow.errorTargetsToNum(
CreateStateFlow.ErrorTarget.ServiceConstraintViolationException,
CreateStateFlow.ErrorTarget.FlowSwallowErrors
)
).returnValue.getOrThrow(20.seconds)
assertEquals(1, onNextCount)
}
}
private fun NodeHandle.getNotarisedTransactionIds(): List<String> {
@StartableByRPC
class NotarisedTxs : FlowLogic<List<String>>() {
override fun call(): List<String> {
val session = serviceHub.jdbcSession()
val statement = session.createStatement()
statement.execute("SELECT TRANSACTION_ID FROM NODE_NOTARY_COMMITTED_TXS;")
val result = mutableListOf<String>()
while (statement.resultSet.next()) {
result.add(statement.resultSet.getString(1))
}
return result
}
}
return rpc.startFlowDynamic(NotarisedTxs::class.java).returnValue.getOrThrow()
}
private fun NodeHandle.getStatesById(id: UniqueIdentifier?, status: Vault.StateStatus): List<StateAndRef<DbFailureContract.TestState>> {
return rpc.vaultQueryByCriteria(
QueryCriteria.LinearStateQueryCriteria(
linearId = if (id != null) listOf(id) else null,
status = status
), DbFailureContract.TestState::class.java
).states
}
private fun NodeHandle.getAllStates(status: Vault.StateStatus): List<StateAndRef<DbFailureContract.TestState>> {
return getStatesById(null, status)
}
@StartableByRPC
class SubscribingRawUpdatesFlow: FlowLogic<Unit>() {
override fun call() {
logger.info("Accessing rawUpdates within a flow will throw! ")
val rawUpdates = serviceHub.vaultService.rawUpdates // throws
logger.info("Code flow should never reach this logging or the following segment! ")
rawUpdates.subscribe {
println("Code flow should never get in here!")
}
}
}
}

View File

@ -9,7 +9,6 @@ import net.corda.core.flows.*
import net.corda.core.internal.*
import net.corda.core.internal.cordapp.CordappImpl
import net.corda.core.internal.cordapp.CordappImpl.Companion.UNKNOWN_INFO
import net.corda.core.internal.cordapp.CordappResolver
import net.corda.core.internal.cordapp.get
import net.corda.core.internal.notary.NotaryService
import net.corda.core.internal.notary.SinglePartyNotaryService
@ -23,13 +22,13 @@ import net.corda.node.VersionInfo
import net.corda.nodeapi.internal.cordapp.CordappLoader
import net.corda.nodeapi.internal.coreContractClasses
import net.corda.serialization.internal.DefaultWhitelist
import org.apache.commons.collections4.map.LRUMap
import java.lang.reflect.Modifier
import java.math.BigInteger
import java.net.URL
import java.net.URLClassLoader
import java.nio.file.Path
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.jar.JarInputStream
import java.util.jar.Manifest
import java.util.zip.ZipInputStream
@ -52,7 +51,7 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths:
logger.info("Loading CorDapps from ${cordappJarPaths.joinToString()}")
}
}
private val cordappClasses: ConcurrentHashMap<String, Set<Cordapp>> = ConcurrentHashMap()
override val cordapps: List<CordappImpl> by lazy { loadCordapps() + extraCordapps }
override val appClassLoader: URLClassLoader = URLClassLoader(cordappJarPaths.stream().map { it.url }.toTypedArray(), javaClass.classLoader)
@ -128,10 +127,35 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths:
}
}
}
cordapps.forEach(CordappResolver::register)
cordapps.forEach(::register)
return cordapps
}
private fun register(cordapp: Cordapp) {
val contractClasses = cordapp.contractClassNames.toSet()
val existingClasses = cordappClasses.keys
val classesToRegister = cordapp.cordappClasses.toSet()
val notAlreadyRegisteredClasses = classesToRegister - existingClasses
val alreadyRegistered= HashMap(cordappClasses).apply { keys.retainAll(classesToRegister) }
notAlreadyRegisteredClasses.forEach { cordappClasses[it] = setOf(cordapp) }
for ((registeredClassName, registeredCordapps) in alreadyRegistered) {
val duplicateCordapps = registeredCordapps.filter { it.jarHash == cordapp.jarHash }.toSet()
if (duplicateCordapps.isNotEmpty()) {
throw IllegalStateException("The CorDapp (name: ${cordapp.info.shortName}, file: ${cordapp.name}) " +
"is installed multiple times on the node. The following files correspond to the exact same content: " +
"${duplicateCordapps.map { it.name }}")
}
if (registeredClassName in contractClasses) {
throw IllegalStateException("More than one CorDapp installed on the node for contract $registeredClassName. " +
"Please remove the previous version when upgrading to a new version.")
}
cordappClasses[registeredClassName] = registeredCordapps + cordapp
}
}
private fun RestrictedScanResult.toCordapp(url: RestrictedURL): CordappImpl {
val manifest: Manifest? = url.url.openStream().use { JarInputStream(it).manifest }
val info = parseCordappInfo(manifest, CordappImpl.jarName(url.url))
@ -268,9 +292,7 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths:
}
private fun findWhitelists(cordappJarPath: RestrictedURL): List<SerializationWhitelist> {
val whitelists = URLClassLoader(arrayOf(cordappJarPath.url)).use {
ServiceLoader.load(SerializationWhitelist::class.java, it).toList()
}
val whitelists = ServiceLoader.load(SerializationWhitelist::class.java, appClassLoader).toList()
return whitelists.filter {
it.javaClass.location == cordappJarPath.url && it.javaClass.name.startsWith(cordappJarPath.qualifiedNamePrefix)
} + DefaultWhitelist // Always add the DefaultWhitelist to the whitelist for an app.
@ -284,19 +306,21 @@ class JarScanningCordappLoader private constructor(private val cordappJarPaths:
return scanResult.getClassesWithSuperclass(MappedSchema::class).instances().toSet()
}
private val cachedScanResult = LRUMap<RestrictedURL, RestrictedScanResult>(1000)
private fun scanCordapp(cordappJarPath: RestrictedURL): RestrictedScanResult {
logger.info("Scanning CorDapp in ${cordappJarPath.url}")
return cachedScanResult.computeIfAbsent(cordappJarPath) {
val scanResult = ClassGraph().addClassLoader(appClassLoader).overrideClasspath(cordappJarPath.url).enableAllInfo().pooledScan()
RestrictedScanResult(scanResult, cordappJarPath.qualifiedNamePrefix)
}
val cordappElement = cordappJarPath.url.toString()
logger.info("Scanning CorDapp in $cordappElement")
val scanResult = ClassGraph()
.filterClasspathElements { elt -> elt == cordappElement }
.overrideClassLoaders(appClassLoader)
.ignoreParentClassLoaders()
.enableAllInfo()
.pooledScan()
return RestrictedScanResult(scanResult, cordappJarPath.qualifiedNamePrefix)
}
private fun <T : Any> loadClass(className: String, type: KClass<T>): Class<out T>? {
return try {
appClassLoader.loadClass(className).asSubclass(type.java)
Class.forName(className, false, appClassLoader).asSubclass(type.java)
} catch (e: ClassCastException) {
logger.warn("As $className must be a sub-type of ${type.java.name}")
null

View File

@ -2,6 +2,7 @@ package net.corda.node.services.vault
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import net.corda.core.CordaRuntimeException
import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.containsAny
@ -13,6 +14,7 @@ import net.corda.core.node.StatesToRecord
import net.corda.core.node.services.*
import net.corda.core.node.services.Vault.ConstraintInfo.Companion.constraintInfo
import net.corda.core.node.services.vault.*
import net.corda.core.observable.internal.OnResilientSubscribe
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.*
@ -209,7 +211,27 @@ class NodeVaultService(
}
override val rawUpdates: Observable<Vault.Update<ContractState>>
get() = mutex.locked { _rawUpdatesPublisher }
get() = mutex.locked {
FlowStateMachineImpl.currentStateMachine()?.let {
// we are inside a flow; we cannot allow flows to subscribe Rx Observers,
// because the Observer could reference flow's properties, essentially fiber's properties then,
// since it does not unsubscribe on flow's/ fiber's completion,
// it could prevent the flow/ fiber -object- get garbage collected.
log.error(
"Flow ${it.logic::class.java.name} tried to access VaultService.rawUpdates " +
"- Rx.Observables should only be accessed outside the context of a flow " +
"- aborting the flow "
)
throw CordaRuntimeException(
"Flow ${it.logic::class.java.name} tried to access VaultService.rawUpdates " +
"- Rx.Observables should only be accessed outside the context of a flow "
)
}
// we are not inside a flow, we are most likely inside a CordaService;
// we will expose, by default, subscribing of -non unsubscribing- rx.Observers to rawUpdates.
return _rawUpdatesPublisher.resilientOnError()
}
override val updates: Observable<Vault.Update<ContractState>>
get() = mutex.locked { _updatesInDbTx }
@ -414,7 +436,7 @@ class NodeVaultService(
HospitalizeFlowException(wrapped)
}
}
} ?: HospitalizeFlowException(e)
} ?: (e as? SQLException ?: (e as? PersistenceException ?: HospitalizeFlowException(e)))
}
}
}
@ -795,4 +817,7 @@ class NodeVaultService(
}
return myTypes
}
}
}
/** The Observable returned allows subscribing with custom SafeSubscribers to source [Observable]. */
internal fun<T> Observable<T>.resilientOnError(): Observable<T> = Observable.unsafeCreate(OnResilientSubscribe(this, false))

View File

@ -5,7 +5,8 @@ import com.typesafe.config.ConfigFactory
import net.corda.core.node.services.AttachmentId
import net.corda.core.node.services.AttachmentStorage
import net.corda.node.VersionInfo
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.internal.ContractJarTestUtils
import net.corda.testing.core.internal.SelfCleaningDir
import net.corda.testing.internal.MockCordappConfigProvider
import net.corda.testing.services.MockAttachmentStorage
import org.assertj.core.api.Assertions.assertThat
@ -14,7 +15,9 @@ import org.junit.Before
import org.junit.Test
import java.io.File
import java.io.FileOutputStream
import java.lang.IllegalStateException
import java.net.URL
import java.nio.file.Files
import java.util.jar.JarOutputStream
import java.util.zip.Deflater.NO_COMPRESSION
import java.util.zip.ZipEntry
@ -56,7 +59,6 @@ class CordappProviderImplTests {
}
private lateinit var attachmentStore: AttachmentStorage
private val whitelistedContractImplementations = testNetworkParameters().whitelistedContractImplementations
@Before
fun setup() {
@ -189,6 +191,33 @@ class CordappProviderImplTests {
assertThat(fixedIDs).containsExactlyInAnyOrder(ID2, ID4)
}
@Test(timeout=300_000)
fun `test an exception is raised when we have two jars with the same hash`() {
SelfCleaningDir().use { file ->
val jarAndSigner = ContractJarTestUtils.makeTestSignedContractJar(file.path, "com.example.MyContract")
val signedJarPath = jarAndSigner.first
val duplicateJarPath = signedJarPath.parent.resolve("duplicate-" + signedJarPath.fileName)
Files.copy(signedJarPath, duplicateJarPath)
assertFailsWith<IllegalStateException> {
newCordappProvider(signedJarPath.toUri().toURL(), duplicateJarPath.toUri().toURL())
}
}
}
@Test(timeout=300_000)
fun `test an exception is raised when two jars share a contract`() {
SelfCleaningDir().use { file ->
val jarA = ContractJarTestUtils.makeTestContractJar(file.path, listOf("com.example.MyContract", "com.example.AnotherContractForA"), generateManifest = false, jarFileName = "sampleA.jar")
val jarB = ContractJarTestUtils.makeTestContractJar(file.path, listOf("com.example.MyContract", "com.example.AnotherContractForB"), generateManifest = false, jarFileName = "sampleB.jar")
assertFailsWith<IllegalStateException> {
newCordappProvider(jarA.toUri().toURL(), jarB.toUri().toURL())
}
}
}
private fun File.writeFixupRules(vararg lines: String): File {
JarOutputStream(FileOutputStream(this)).use { jar ->
jar.setMethod(DEFLATED)

View File

@ -5,7 +5,6 @@ import net.corda.core.contracts.TransactionVerificationException
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.cordapp.CordappResolver
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
@ -35,7 +34,8 @@ class FinalityHandlerTest {
fun `sent to flow hospital on error and attempted retry on node restart`() {
// Setup a network where only Alice has the finance CorDapp and it sends a cash tx to Bob who doesn't have the
// CorDapp. Bob's FinalityHandler will error when validating the tx.
val alice = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME, additionalCordapps = FINANCE_CORDAPPS))
val alice = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME,
additionalCordapps = FINANCE_CORDAPPS + CustomCordapp(targetPlatformVersion = 3, classes = setOf(FinalityFlow::class.java))))
var bob = mockNet.createNode(InternalMockNodeParameters(
legalName = BOB_NAME,
@ -82,11 +82,9 @@ class FinalityHandlerTest {
}
private fun TestStartedNode.finaliseWithOldApi(stx: SignedTransaction): CordaFuture<SignedTransaction> {
return CordappResolver.withTestCordapp(targetPlatformVersion = 3) {
@Suppress("DEPRECATION")
services.startFlow(FinalityFlow(stx)).resultFuture.apply {
@Suppress("DEPRECATION")
return services.startFlow(FinalityFlow(stx)).resultFuture.apply {
mockNet.runNetwork()
}
}
}

View File

@ -2,7 +2,6 @@ package net.corda.node.services.vault
import co.paralleluniverse.fibers.Suspendable
import com.nhaarman.mockito_kotlin.argThat
import com.nhaarman.mockito_kotlin.doNothing
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.contracts.*
@ -11,7 +10,6 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.generateKeyPair
import net.corda.core.identity.*
import net.corda.core.internal.NotaryChangeTransactionBuilder
import net.corda.core.internal.cordapp.CordappResolver
import net.corda.core.internal.packageName
import net.corda.core.node.NotaryInfo
import net.corda.core.node.StatesToRecord
@ -887,7 +885,7 @@ class NodeVaultServiceTest {
}
@Test(timeout=300_000)
fun `V3 vault queries return all states by default`() {
fun `Vault queries return all states by default`() {
fun createTx(number: Int, vararg participants: Party): SignedTransaction {
return services.signInitialTransaction(TransactionBuilder(DUMMY_NOTARY).apply {
addOutputState(DummyState(number, participants.toList()), DummyContract.PROGRAM_ID)
@ -897,20 +895,18 @@ class NodeVaultServiceTest {
fun List<StateAndRef<DummyState>>.getNumbers() = map { it.state.data.magicNumber }.toSet()
CordappResolver.withTestCordapp(targetPlatformVersion = 3) {
services.recordTransactions(StatesToRecord.ONLY_RELEVANT, listOf(createTx(1, megaCorp.party)))
services.recordTransactions(StatesToRecord.ONLY_RELEVANT, listOf(createTx(2, miniCorp.party)))
services.recordTransactions(StatesToRecord.ONLY_RELEVANT, listOf(createTx(3, miniCorp.party, megaCorp.party)))
services.recordTransactions(StatesToRecord.ALL_VISIBLE, listOf(createTx(4, miniCorp.party)))
services.recordTransactions(StatesToRecord.ALL_VISIBLE, listOf(createTx(5, bankOfCorda.party)))
services.recordTransactions(StatesToRecord.ALL_VISIBLE, listOf(createTx(6, megaCorp.party, bankOfCorda.party)))
services.recordTransactions(StatesToRecord.NONE, listOf(createTx(7, bankOfCorda.party)))
services.recordTransactions(StatesToRecord.ONLY_RELEVANT, listOf(createTx(1, megaCorp.party)))
services.recordTransactions(StatesToRecord.ONLY_RELEVANT, listOf(createTx(2, miniCorp.party)))
services.recordTransactions(StatesToRecord.ONLY_RELEVANT, listOf(createTx(3, miniCorp.party, megaCorp.party)))
services.recordTransactions(StatesToRecord.ALL_VISIBLE, listOf(createTx(4, miniCorp.party)))
services.recordTransactions(StatesToRecord.ALL_VISIBLE, listOf(createTx(5, bankOfCorda.party)))
services.recordTransactions(StatesToRecord.ALL_VISIBLE, listOf(createTx(6, megaCorp.party, bankOfCorda.party)))
services.recordTransactions(StatesToRecord.NONE, listOf(createTx(7, bankOfCorda.party)))
// Test one.
// RelevancyStatus is ALL by default. This should return five states.
val resultOne = vaultService.queryBy<DummyState>().states.getNumbers()
assertEquals(setOf(1, 3, 4, 5, 6), resultOne)
}
// Test one.
// RelevancyStatus is ALL by default. This should return five states.
val resultOne = vaultService.queryBy<DummyState>().states.getNumbers()
assertEquals(setOf(1, 3, 4, 5, 6), resultOne)
// We should never see 2 or 7.
}

View File

@ -3,6 +3,10 @@ package net.corda.node.utilities
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.tee
import net.corda.core.observable.internal.ResilientSubscriber
import net.corda.core.observable.internal.OnNextFailedException
import net.corda.core.observable.continueOnError
import net.corda.node.services.vault.resilientOnError
import net.corda.nodeapi.internal.persistence.*
import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
@ -10,9 +14,17 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Test
import rx.Observable
import rx.Subscriber
import rx.exceptions.CompositeException
import rx.exceptions.OnErrorFailedException
import rx.exceptions.OnErrorNotImplementedException
import rx.internal.util.ActionSubscriber
import rx.observers.SafeSubscriber
import rx.observers.Subscribers
import rx.subjects.PublishSubject
import java.io.Closeable
import java.lang.IllegalArgumentException
import java.lang.IllegalStateException
import java.lang.RuntimeException
import java.util.*
import kotlin.test.assertEquals
@ -194,7 +206,7 @@ class ObservablesTests {
* tee combines [PublishSubject]s under one PublishSubject. We need to make sure that they are not wrapped with a [SafeSubscriber].
* Otherwise, if a non Rx exception gets thrown from a subscriber under one of the PublishSubject it will get caught by the
* SafeSubscriber wrapping that PublishSubject and will call [PublishSubject.PublishSubjectState.onError], which will
* eventually shut down all of the subscribers under that PublishSubjectState.
* eventually shut down all of the subscribers under that PublishSubject.
*/
@Test(timeout=300_000)
fun `error in unsafe subscriber won't shutdown subscribers under same publish subject, after tee`() {
@ -214,6 +226,200 @@ class ObservablesTests {
assertEquals(2, count)
}
@Test(timeout=300_000)
fun `continueOnError subscribes ResilientSubscribers, wrapped Observers will survive errors from onNext`() {
var heartBeat1 = 0
var heartBeat2 = 0
val source = PublishSubject.create<Int>()
val continueOnError = source.continueOnError()
continueOnError.subscribe { runNo ->
// subscribes with a ResilientSubscriber
heartBeat1++
if (runNo == 1) {
throw IllegalStateException()
}
}
continueOnError.subscribe { runNo ->
// subscribes with a ResilientSubscriber
heartBeat2++
if (runNo == 2) {
throw IllegalStateException()
}
}
assertFailsWith<OnErrorNotImplementedException> {
source.onNext(1) // first observer only will run and throw
}
assertFailsWith<OnErrorNotImplementedException> {
source.onNext(2) // first observer will run, second observer will run and throw
}
source.onNext(3) // both observers will run
assertEquals(3, heartBeat1)
assertEquals(2, heartBeat2)
}
@Test(timeout=300_000)
fun `PublishSubject unsubscribes ResilientSubscribers only upon explicitly calling onError`() {
var heartBeat = 0
val source = PublishSubject.create<Int>()
source.continueOnError().subscribe { heartBeat += it }
source.continueOnError().subscribe { heartBeat += it }
source.onNext(1)
// send an onError event
assertFailsWith<CompositeException> {
source.onError(IllegalStateException()) // all ResilientSubscribers under PublishSubject get unsubscribed here
}
source.onNext(1)
assertEquals(2, heartBeat)
}
@Test(timeout=300_000)
fun `PublishSubject wrapped with a SafeSubscriber shuts down the whole structure, if one of them is unsafe and it throws`() {
var heartBeat = 0
val source = PublishSubject.create<Int>()
source.unsafeSubscribe(Subscribers.create { runNo -> // subscribes unsafe; It does not wrap with ResilientSubscriber
heartBeat++
if (runNo == 1) {
throw IllegalStateException()
}
})
source.continueOnError().subscribe { heartBeat += it }
// wrapping PublishSubject with a SafeSubscriber
val sourceWrapper = SafeSubscriber(Subscribers.from(source))
assertFailsWith<OnErrorFailedException> {
sourceWrapper.onNext(1)
}
sourceWrapper.onNext(2)
assertEquals(1, heartBeat)
}
/**
* A [ResilientSubscriber] that is NOT a leaf in a subscribers structure will not call [onError]
* if an error occurs during its [onNext] event processing.
*
* The reason why it should not call its onError is: if it wraps a [PublishSubject], calling [ResilientSubscriber.onError]
* will then call [PublishSubject.onError] which will shut down all the subscribers under the [PublishSubject].
*/
@Test(timeout=300_000)
fun `PublishSubject wrapped with a ResilientSubscriber will preserve the structure, if one of its children subscribers is unsafe and it throws`() {
var heartBeat = 0
val source = PublishSubject.create<Int>()
source.unsafeSubscribe(Subscribers.create { runNo ->
heartBeat++
if (runNo == 1) {
throw IllegalStateException()
}
})
source.continueOnError().subscribe { heartBeat++ }
// wrap PublishSubject with a ResilientSubscriber
val sourceWrapper = ResilientSubscriber(Subscribers.from(source))
assertFailsWith<OnNextFailedException>("Observer.onNext failed, this is a non leaf ResilientSubscriber, therefore onError will be skipped") {
sourceWrapper.onNext(1)
}
sourceWrapper.onNext(2)
assertEquals(3, heartBeat)
}
@Test(timeout=300_000)
fun `throwing inside onNext of a ResilientSubscriber leaf subscriber will call onError`() {
var heartBeatOnNext = 0
var heartBeatOnError = 0
val source = PublishSubject.create<Int>()
// add a leaf ResilientSubscriber
source.continueOnError().subscribe({
heartBeatOnNext++
throw IllegalStateException()
}, {
heartBeatOnError++
})
source.onNext(1)
source.onNext(1)
assertEquals(2, heartBeatOnNext)
assertEquals(2, heartBeatOnError)
}
/**
* In this test ResilientSubscriber throws an OnNextFailedException which is a OnErrorNotImplementedException.
* Because its underlying subscriber is not an ActionSubscriber, it will not be considered as a leaf ResilientSubscriber.
*/
@Test(timeout=300_000)
fun `throwing ResilientSubscriber at onNext will wrap with a Rx OnErrorNotImplementedException`() {
val resilientSubscriber = ResilientSubscriber<Int>(Subscribers.create { throw IllegalStateException() })
assertFailsWith<OnErrorNotImplementedException> { // actually fails with an OnNextFailedException
resilientSubscriber.onNext(1)
}
}
@Test(timeout=300_000)
fun `throwing inside ResilientSubscriber onError will wrap with a Rx OnErrorFailedException`() {
val resilientSubscriber = ResilientSubscriber<Int>(
ActionSubscriber(
{ throw IllegalStateException() },
{ throw IllegalStateException() },
null
)
)
assertFailsWith<OnErrorFailedException> {
resilientSubscriber.onNext(1)
}
}
/**
* In this test we create a chain of Subscribers with this the following order:
* ResilientSubscriber_X -> PublishSubject -> ResilientSubscriber_Y
*
* ResilientSubscriber_Y.onNext throws an error, since ResilientSubscriber_Y.onError is not defined,
* it will throw a OnErrorNotImplementedException. Then it will be propagated back until ResilientSubscriber_X.
* ResilientSubscriber_X will identify it is a not leaf subscriber and therefore will rethrow it as OnNextFailedException.
*/
@Test(timeout=300_000)
fun `propagated Rx exception will be rethrown at ResilientSubscriber onError`() {
val source = PublishSubject.create<Int>()
source.continueOnError().subscribe { throw IllegalStateException("123") } // will give a leaf ResilientSubscriber
val sourceWrapper = ResilientSubscriber(Subscribers.from(source)) // will give an inner ResilientSubscriber
assertFailsWith<OnNextFailedException>("Observer.onNext failed, this is a non leaf ResilientSubscriber, therefore onError will be skipped") {
// IllegalStateException will be wrapped and rethrown as a OnErrorNotImplementedException in leaf ResilientSubscriber,
// will be caught by inner ResilientSubscriber and just be rethrown
sourceWrapper.onNext(1)
}
}
@Test(timeout=300_000)
fun `test OnResilientSubscribe strictMode = true replaces SafeSubscriber subclass`() {
var heartBeat = 0
val customSafeSubscriber = CustomSafeSubscriber(
Subscribers.create<Int> {
heartBeat++
throw IllegalArgumentException()
})
val source = PublishSubject.create<Int>()
source.continueOnError().subscribe(customSafeSubscriber) // it should replace CustomSafeSubscriber with ResilientSubscriber
assertFailsWith<OnErrorNotImplementedException> { source.onNext(1) }
assertFailsWith<OnErrorNotImplementedException> { source.onNext(1) }
assertEquals(2, heartBeat)
}
@Test(timeout=300_000)
fun `test OnResilientSubscribe strictMode = false will not replace SafeSubscriber subclass`() {
var heartBeat = 0
val customSafeSubscriber = CustomSafeSubscriber(
Subscribers.create<Int> {
heartBeat++
throw IllegalArgumentException()
})
val source = PublishSubject.create<Int>()
source.resilientOnError().subscribe(customSafeSubscriber) // it should not replace CustomSafeSubscriber with ResilientSubscriber
assertFailsWith<OnErrorNotImplementedException> { source.onNext(1) }
source.onNext(1)
assertEquals(1, heartBeat)
}
@Test(timeout=300_000)
fun `combine tee and bufferUntilDatabaseCommit`() {
val database = createDatabase()
@ -359,4 +565,6 @@ class ObservablesTests {
subscription3.unsubscribe()
}
class CustomSafeSubscriber<T>(actual: Subscriber<in T>): SafeSubscriber<T>(actual)
}

View File

@ -1,14 +1,6 @@
buildscript {
ext {
springBootVersion = '1.5.21.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath "org.springframework.boot:spring-boot-gradle-plugin:$springBootVersion"
classpath "io.spring.gradle:dependency-management-plugin:1.0.8.RELEASE"
}
plugins {
id "org.springframework.boot" version "1.5.21.RELEASE"
id 'io.spring.dependency-management' version '1.0.9.RELEASE' apply false
}
// Spring Boot plugin adds a numerous hardcoded dependencies in the version much lower then Corda expects
@ -23,7 +15,6 @@ ext['mockito.version'] = "$mockito_version"
apply plugin: 'kotlin'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'net.corda.plugins.quasar-utils'
apply plugin: 'application'

View File

@ -56,7 +56,7 @@ jar {
}
task testJar(type: Jar) {
classifier "test"
classifier "tests"
from sourceSets.main.output
from sourceSets.test.output
}

View File

@ -12,8 +12,9 @@ buildscript {
}
plugins {
id 'io.spring.dependency-management'
id 'com.craigburke.client-dependencies' version '1.4.0'
id 'io.spring.dependency-management'
id 'org.springframework.boot'
}
group = "${parent.group}.irs-demo"
@ -55,7 +56,6 @@ ext['jackson.version'] = jackson_version
apply plugin: 'kotlin'
apply plugin: 'kotlin-spring'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'project-report'
apply plugin: 'application'

View File

@ -1,12 +1,13 @@
package com.r3.dbfailure.contracts
import com.r3.dbfailure.schemas.DbFailureSchemaV1
import net.corda.core.contracts.CommandAndState
import net.corda.core.contracts.CommandData
import net.corda.core.contracts.Contract
import net.corda.core.contracts.LinearState
import net.corda.core.contracts.OwnableState
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import net.corda.core.schemas.MappedSchema
import net.corda.core.schemas.PersistentState
import net.corda.core.schemas.QueryableState
@ -21,23 +22,31 @@ class DbFailureContract : Contract {
class TestState(
override val linearId: UniqueIdentifier,
val particpant: Party,
override val participants: List<AbstractParty>,
val randomValue: String?,
val errorTarget: Int = 0
) : LinearState, QueryableState {
val errorTarget: Int = 0,
override val owner: AbstractParty
) : LinearState, QueryableState, OwnableState {
override val participants: List<AbstractParty> = listOf(particpant)
override fun supportedSchemas(): Iterable<MappedSchema> = listOf(DbFailureSchemaV1)
override fun generateMappedObject(schema: MappedSchema): PersistentState {
return if (schema is DbFailureSchemaV1){
DbFailureSchemaV1.PersistentTestState( particpant.name.toString(), randomValue, errorTarget, linearId.id)
DbFailureSchemaV1.PersistentTestState( participants.toString(), randomValue, errorTarget, linearId.id)
}
else {
throw IllegalArgumentException("Unsupported schema $schema")
}
}
override fun withNewOwner(newOwner: AbstractParty): CommandAndState {
return CommandAndState(Commands.Send(), TestState(this.linearId, this.participants.plus(newOwner).toSet().toList(), this.randomValue, this.errorTarget, newOwner))
}
fun withNewOwnerAndErrorTarget(newOwner: AbstractParty, errorTarget: Int): CommandAndState {
return CommandAndState(Commands.Send(), TestState(this.linearId, this.participants.plus(newOwner).toSet().toList(), this.randomValue, errorTarget, newOwner))
}
}
override fun verify(tx: LedgerTransaction) {
@ -46,5 +55,6 @@ class DbFailureContract : Contract {
interface Commands : CommandData{
class Create: Commands
class Send : Commands
}
}

View File

@ -20,14 +20,16 @@ object CreateStateFlow {
// 1000s control exception handlling in the service/vault listener
enum class ErrorTarget(val targetNumber: Int) {
NoError(0),
ServiceSqlSyntaxError(1),
ServiceNullConstraintViolation(2),
ServiceValidUpdate(3),
ServiceReadState(4),
ServiceCheckForState(5),
ServiceThrowInvalidParameter(6),
ServiceThrowMotherOfAllExceptions(7),
ServiceThrowUnrecoverableError(8),
ServiceSqlSyntaxError(10000),
ServiceNullConstraintViolation(20000),
ServiceValidUpdate(30000),
ServiceReadState(40000),
ServiceCheckForState(50000),
ServiceThrowInvalidParameter(60000),
ServiceThrowMotherOfAllExceptions(70000),
ServiceThrowUnrecoverableError(80000),
ServiceSqlSyntaxErrorOnConsumed(90000),
ServiceConstraintViolationException(1000000),
TxInvalidState(10),
FlowSwallowErrors(100),
ServiceSwallowErrors(1000)
@ -40,7 +42,7 @@ object CreateStateFlow {
private val targetMap = ErrorTarget.values().associateBy(ErrorTarget::targetNumber)
fun getServiceTarget(target: Int?): ErrorTarget {
return target?.let { targetMap.getValue(it % 10) } ?: CreateStateFlow.ErrorTarget.NoError
return target?.let { targetMap.getValue(((it/10000) % 1000)*10000) } ?: CreateStateFlow.ErrorTarget.NoError
}
fun getServiceExceptionHandlingTarget(target: Int?): ErrorTarget {
@ -69,10 +71,11 @@ object CreateStateFlow {
val txTarget = getTxTarget(errorTarget)
logger.info("Test flow: The tx error target is $txTarget")
val state = DbFailureContract.TestState(
UniqueIdentifier(),
ourIdentity,
if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else randomValue,
errorTarget)
UniqueIdentifier(),
listOf(ourIdentity),
if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else randomValue,
errorTarget, ourIdentity
)
val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey)
logger.info("Test flow: tx builder")

View File

@ -4,33 +4,145 @@ import com.r3.dbfailure.contracts.DbFailureContract
import net.corda.core.contracts.ContractState
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.node.services.Vault
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import org.hibernate.exception.ConstraintViolationException
import rx.Subscriber
import rx.observers.SafeSubscriber
import rx.observers.Subscribers
import java.lang.IllegalStateException
import java.security.InvalidParameterException
import java.sql.SQLException
@CordaService
class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
companion object {
val log = contextLogger()
var onError: ((Throwable) -> Unit)? = null
// make the service throw an unrecoverable error (should be executed in an outOfProcess node so that it wont halt testing jvm)
var throwUnrecoverableError = false
var safeSubscription = true
var withCustomSafeSubscriber = false
var onNextVisited: (Party) -> Unit = {}
var onErrorVisited: ((Party) -> Unit)? = null
}
init {
val onNext: (Vault.Update<ContractState>) -> Unit =
{ (_, produced) ->
produced.forEach {
val contractState = it.state.data as? DbFailureContract.TestState
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions
try {
{ (consumed, produced) ->
onNextVisited(services.myInfo.legalIdentities.first())
produced.forEach {
val contractState = it.state.data as? DbFailureContract.TestState
@Suppress("TooGenericExceptionCaught") // this is fully intentional here, to allow twiddling with exceptions
try {
when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) {
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> {
log.info("Fail with syntax error on raw statement")
val session = services.jdbcSession()
val statement = session.createStatement()
statement.execute(
"UPDATE FAIL_TEST_STATES \n" +
"BLAAA RANDOM_VALUE = NULL\n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation -> {
log.info("Fail with null constraint violation on raw statement")
val session = services.jdbcSession()
val statement = session.createStatement()
statement.execute(
"UPDATE FAIL_TEST_STATES \n" +
"SET RANDOM_VALUE = NULL\n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceValidUpdate -> {
log.info("Update current statement")
val session = services.jdbcSession()
val statement = session.createStatement()
statement.execute(
"UPDATE FAIL_TEST_STATES \n" +
"SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceReadState -> {
log.info("Read current state from db")
val session = services.jdbcSession()
val statement = session.createStatement()
statement.execute(
"SELECT * FROM FAIL_TEST_STATES \n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceCheckForState -> {
log.info("Check for currently written state in the db")
val session = services.jdbcSession()
val statement = session.createStatement()
val rs = statement.executeQuery(
"SELECT COUNT(*) FROM FAIL_TEST_STATES \n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0
log.info(
"Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in " +
"TEST_FAIL_STATES: ${if (numOfRows > 0) "Yes" else "No"}"
)
}
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> {
log.info("Throw InvalidParameterException")
throw InvalidParameterException("Toys out of pram")
}
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions -> {
log.info("Throw Exception")
throw Exception("Mother of all exceptions")
}
CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError -> {
// this bit of code should only work in a OutOfProcess node,
// otherwise it will kill the testing jvm (including the testing thread)
if (throwUnrecoverableError) {
log.info("Throw Unrecoverable error")
throw OutOfMemoryError("Unrecoverable error")
}
}
CreateStateFlow.ErrorTarget.ServiceConstraintViolationException -> {
log.info("Throw ConstraintViolationException")
throw ConstraintViolationException("Dummy Hibernate Exception ", SQLException(), " Will cause flow retry!")
}
else -> {
// do nothing, everything else must be handled elsewhere
}
}
} catch (t: Throwable) {
if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget)
== CreateStateFlow.ErrorTarget.ServiceSwallowErrors
) {
log.warn("Service not letting errors escape", t)
} else {
throw t
}
}
}
consumed.forEach {
val contractState = it.state.data as? DbFailureContract.TestState
log.info("Test Service: Got state ${if (contractState == null) "null" else " test state with error target ${contractState.errorTarget}"}")
when (CreateStateFlow.getServiceTarget(contractState?.errorTarget)) {
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxError -> {
CreateStateFlow.ErrorTarget.ServiceSqlSyntaxErrorOnConsumed -> {
log.info("Fail with syntax error on raw statement")
val session = services.jdbcSession()
val statement = session.createStatement()
@ -41,85 +153,33 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceNullConstraintViolation -> {
log.info("Fail with null constraint violation on raw statement")
val session = services.jdbcSession()
val statement = session.createStatement()
statement.execute(
"UPDATE FAIL_TEST_STATES \n" +
"SET RANDOM_VALUE = NULL\n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceValidUpdate -> {
log.info("Update current statement")
val session = services.jdbcSession()
val statement = session.createStatement()
statement.execute(
"UPDATE FAIL_TEST_STATES \n" +
"SET RANDOM_VALUE = '${contractState!!.randomValue} Updated by service'\n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceReadState -> {
log.info("Read current state from db")
val session = services.jdbcSession()
val statement = session.createStatement()
statement.execute(
"SELECT * FROM FAIL_TEST_STATES \n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
log.info("SQL result: ${statement.resultSet}")
}
CreateStateFlow.ErrorTarget.ServiceCheckForState -> {
log.info("Check for currently written state in the db")
val session = services.jdbcSession()
val statement = session.createStatement()
val rs = statement.executeQuery(
"SELECT COUNT(*) FROM FAIL_TEST_STATES \n" +
"WHERE transaction_id = '${it.ref.txhash}' AND output_index = ${it.ref.index};"
)
val numOfRows = if (rs.next()) rs.getInt("COUNT(*)") else 0
log.info("Found a state with tx:ind ${it.ref.txhash}:${it.ref.index} in " +
"TEST_FAIL_STATES: ${if (numOfRows > 0) "Yes" else "No"}")
}
CreateStateFlow.ErrorTarget.ServiceThrowInvalidParameter -> {
log.info("Throw InvalidParameterException")
throw InvalidParameterException("Toys out of pram")
}
CreateStateFlow.ErrorTarget.ServiceThrowMotherOfAllExceptions -> {
log.info("Throw Exception")
throw Exception("Mother of all exceptions")
}
CreateStateFlow.ErrorTarget.ServiceThrowUnrecoverableError -> {
// this bit of code should only work in a OutOfProcess node,
// otherwise it will kill the testing jvm (including the testing thread)
if (throwUnrecoverableError) {
log.info("Throw Unrecoverable error")
throw OutOfMemoryError("Unrecoverable error")
}
}
else -> {
// do nothing, everything else must be handled elsewhere
}
}
} catch (t: Throwable) {
if (CreateStateFlow.getServiceExceptionHandlingTarget(contractState?.errorTarget)
== CreateStateFlow.ErrorTarget.ServiceSwallowErrors) {
log.warn("Service not letting errors escape", t)
} else {
throw t
}
}
}
}
if (onError != null) {
services.vaultService.rawUpdates.subscribe(onNext, onError) // onError is defined
val onErrorWrapper: ((Throwable) -> Unit)? = {
onErrorVisited?.let {
it(services.myInfo.legalIdentities.first())
}
onError!!(it)
}
services.vaultService.rawUpdates.subscribe(onNext, onErrorWrapper) // onError is defined
} else if (onErrorVisited != null) {
throw IllegalStateException("A DbListenerService.onError needs to be defined!")
} else {
services.vaultService.rawUpdates.subscribe(onNext)
if (safeSubscription) {
if (withCustomSafeSubscriber) {
services.vaultService.rawUpdates.subscribe(CustomSafeSubscriber(Subscribers.create(onNext)))
} else {
services.vaultService.rawUpdates.subscribe(onNext)
}
} else {
services.vaultService.rawUpdates.unsafeSubscribe(Subscribers.create(onNext))
}
}
}
@ -130,4 +190,6 @@ class DbListenerService(services: AppServiceHub) : SingletonSerializeAsToken() {
throwUnrecoverableError = true
}
}
class CustomSafeSubscriber<T>(actual: Subscriber<in T>): SafeSubscriber<T>(actual)
}

View File

@ -0,0 +1,88 @@
package com.r3.dbfailure.workflows
import co.paralleluniverse.fibers.Suspendable
import com.r3.dbfailure.contracts.DbFailureContract
import net.corda.core.contracts.UniqueIdentifier
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.unwrap
object SendStateFlow {
/**
* Creates a [DbFailureContract.TestState], signs it, collects a signature from a separate node and then calls [FinalityFlow] flow.
* Can throw in various stages
*/
@StartableByRPC
@InitiatingFlow
class PassErroneousOwnableState(private val stateId: UniqueIdentifier, private val errorTarget: Int, private val counterParty: Party) :
FlowLogic<Unit>() {
@Suspendable
override fun call() {
logger.info("Test flow: starting")
val notary = serviceHub.networkMapCache.notaryIdentities[0]
logger.info("Test flow: create counterparty session")
val recipientSession = initiateFlow(counterParty)
val queryCriteria = QueryCriteria.LinearStateQueryCriteria(linearId = listOf(stateId), status = Vault.StateStatus.UNCONSUMED)
val inputState = serviceHub.vaultService.queryBy(DbFailureContract.TestState::class.java, queryCriteria).states.singleOrNull()
?: throw FlowException("Failed to find single state for linear id $stateId")
logger.info("Test flow: tx builder")
val commandAndState = inputState.state.data.withNewOwnerAndErrorTarget(counterParty, errorTarget)
val txBuilder = TransactionBuilder(notary)
.addInputState(inputState)
.addOutputState(commandAndState.ownableState)
.addCommand(commandAndState.command, listOf(ourIdentity.owningKey, counterParty.owningKey))
logger.info("Test flow: verify")
txBuilder.verify(serviceHub)
val signedTx = serviceHub.signInitialTransaction(txBuilder)
logger.info("Test flow: send for counterparty signing")
recipientSession.send(signedTx)
logger.info("Test flow: Waiting to receive counter signed transaction")
val counterSignedTx = recipientSession.receive<SignedTransaction>().unwrap { it }
logger.info("Test flow: Received counter sigend transaction, invoking finality")
subFlow(FinalityFlow(counterSignedTx, recipientSession))
logger.info("Test flow: Finishing")
}
}
@InitiatedBy(PassErroneousOwnableState::class)
class PassErroneousOwnableStateReceiver(private val otherSide: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
logger.info("Test flow counterparty: starting")
val signedTx = otherSide.receive<SignedTransaction>().unwrap { it }
logger.info("Test flow counterparty: received TX, signing")
val counterSignedTx = serviceHub.addSignature(signedTx)
logger.info("Test flow counterparty: calling hookBeforeCounterPartyAnswers")
logger.info("Test flow counterparty: Answer with countersigned transaction")
otherSide.send(counterSignedTx)
logger.info("Test flow counterparty: calling hookAfterCounterPartyAnswers")
// Not ideal that we have to do this check, but we must as FinalityFlow does not send locally
if (!serviceHub.myInfo.isLegalIdentity(otherSide.counterparty)) {
logger.info("Test flow counterparty: Waiting for finality")
subFlow(ReceiveFinalityFlow(otherSide))
}
logger.info("Test flow counterparty: Finishing")
}
}
}

View File

@ -30,9 +30,10 @@ object ErrorHandling {
val txTarget = CreateStateFlow.getTxTarget(errorTarget)
val state = DbFailureContract.TestState(
UniqueIdentifier(),
ourIdentity,
listOf(ourIdentity),
if (txTarget == CreateStateFlow.ErrorTarget.TxInvalidState) null else "valid hibernate value",
errorTarget)
errorTarget,
ourIdentity)
val txCommand = Command(DbFailureContract.Commands.Create(), ourIdentity.owningKey)
val txBuilder = TransactionBuilder(notary).addOutputState(state).addCommand(txCommand)
val signedTx = serviceHub.signInitialTransaction(txBuilder)
@ -50,5 +51,4 @@ object ErrorHandling {
hookAfterSecondCheckpoint.invoke() // should be never executed
}
}
}

View File

@ -21,8 +21,20 @@ import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_CONTRACT_NAME
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_CONTRACT_LICENCE
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_CONTRACT_VENDOR
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_CONTRACT_VERSION
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_WORKFLOW_NAME
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_WORKFLOW_LICENCE
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_WORKFLOW_VENDOR
import net.corda.core.internal.cordapp.CordappImpl.Companion.CORDAPP_WORKFLOW_VERSION
import net.corda.core.internal.cordapp.CordappImpl.Companion.MIN_PLATFORM_VERSION
import net.corda.core.internal.cordapp.CordappImpl.Companion.TARGET_PLATFORM_VERSION
import net.corda.core.internal.cordapp.get
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.isRegularFile
import net.corda.core.internal.list
import net.corda.core.internal.packageName_
import net.corda.core.internal.readObject
@ -80,24 +92,26 @@ import okhttp3.OkHttpClient
import okhttp3.Request
import rx.Subscription
import rx.schedulers.Schedulers
import java.io.File
import java.net.ConnectException
import java.net.URL
import java.net.URLClassLoader
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.security.cert.X509Certificate
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.util.Random
import java.util.UUID
import java.util.*
import java.util.Collections.unmodifiableList
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import java.util.jar.JarInputStream
import kotlin.collections.ArrayList
import kotlin.collections.HashMap
import kotlin.collections.HashSet
@ -792,6 +806,17 @@ class DriverDSLImpl(
Permissions.invokeRpc(CordaRPCOps::killFlow)
)
private val CORDAPP_MANIFEST_ATTRIBUTES: List<String> = unmodifiableList(listOf(
CORDAPP_CONTRACT_NAME,
CORDAPP_CONTRACT_LICENCE,
CORDAPP_CONTRACT_VENDOR,
CORDAPP_CONTRACT_VERSION,
CORDAPP_WORKFLOW_NAME,
CORDAPP_WORKFLOW_LICENCE,
CORDAPP_WORKFLOW_VENDOR,
CORDAPP_WORKFLOW_VERSION
))
/**
* Add the DJVM's sources to the node's configuration file.
* These will all be ignored unless devMode is also true.
@ -923,12 +948,11 @@ class DriverDSLImpl(
// The following dependencies are excluded from the classpath of the created JVM,
// so that the environment resembles a real one as close as possible.
// These are either classes that will be added as attachments to the node (i.e. samples, finance, opengamma etc.)
// or irrelevant testing libraries (test, corda-mock etc.).
// TODO: There is pending work to fix this issue without custom blacklisting. See: https://r3-cev.atlassian.net/browse/CORDA-2164.
val exclude = listOf("samples", "finance", "integrationTest", "test", "corda-mock", "com.opengamma.strata")
val cp = ProcessUtilities.defaultClassPath.filterNot { cpEntry ->
exclude.any { token -> cpEntry.contains("${File.separatorChar}$token") } || cpEntry.endsWith("-tests.jar")
val cp = ProcessUtilities.defaultClassPath.filter { cpEntry ->
val cpPathEntry = Paths.get(cpEntry)
cpPathEntry.isRegularFile()
&& !isTestArtifact(cpPathEntry.fileName.toString())
&& !cpPathEntry.isCorDapp
}
return ProcessUtilities.startJavaProcess(
@ -944,6 +968,27 @@ class DriverDSLImpl(
)
}
// Obvious test artifacts. This is NOT intended to be an exhaustive list!
// It is only intended to remove those FEW jars which BLATANTLY do not
// belong inside a Corda Node.
private fun isTestArtifact(name: String): Boolean {
return name.endsWith("-tests.jar")
|| name.endsWith("-test.jar")
|| name.startsWith("corda-mock")
|| name.startsWith("junit")
|| name.startsWith("testng")
|| name.startsWith("mockito")
}
// Identify CorDapp JARs by their attributes in MANIFEST.MF.
private val Path.isCorDapp: Boolean get() {
return JarInputStream(Files.newInputStream(this).buffered()).use { jar ->
val manifest = jar.manifest ?: return false
CORDAPP_MANIFEST_ATTRIBUTES.any { manifest[it] != null }
|| (manifest[TARGET_PLATFORM_VERSION] != null && manifest[MIN_PLATFORM_VERSION] != null)
}
}
private fun startWebserver(handle: NodeHandleInternal, debugPort: Int?, maximumHeapSize: String): Process {
val className = "net.corda.webserver.WebServer"
writeConfig(handle.baseDirectory, "web-server.conf", handle.toWebServerConfig())

View File

@ -6,6 +6,7 @@ import net.corda.client.jackson.JacksonSupport
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.RPCException
import net.corda.core.internal.errors.AddressBindingException
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.contextLogger
@ -34,7 +35,8 @@ import javax.servlet.http.HttpServletRequest
class NodeWebServer(val config: WebServerConfig) {
private companion object {
private val log = contextLogger()
const val retryDelay = 1000L // Milliseconds
private const val NODE_CONNECT_RETRY_COUNT = 30
private const val NODE_CONNECT_WAIT_BETWEEN_RETRYS = 2000L
}
val address = config.webAddress
@ -186,13 +188,26 @@ class NodeWebServer(val config: WebServerConfig) {
private lateinit var rpc: CordaRPCConnection
private fun reconnectingCordaRPCOps(): CordaRPCOps {
rpc = CordaRPCClient(config.rpcAddress, null, javaClass.classLoader)
.start(
config.runAs.username,
config.runAs.password,
GracefulReconnect()
)
return rpc.proxy
var retryCount = NODE_CONNECT_RETRY_COUNT
while (true) {
try {
rpc = CordaRPCClient(config.rpcAddress, null, javaClass.classLoader)
.start(
config.runAs.username,
config.runAs.password,
GracefulReconnect()
)
return rpc.proxy
}
catch (ex: RPCException) {
if (retryCount-- == 0) {
throw ex
}
else {
Thread.sleep(NODE_CONNECT_WAIT_BETWEEN_RETRYS)
}
}
}
}
/**