From 0dae1080880a1d274ccd25d858b63eea05039f20 Mon Sep 17 00:00:00 2001 From: josecoll Date: Thu, 10 May 2018 09:48:19 +0100 Subject: [PATCH 1/4] Reordering fix: move now works as depends on creation of html directory from previous step. (#3108) --- docs/make-docsite.sh | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/make-docsite.sh b/docs/make-docsite.sh index bc317c3e63..1030bd766a 100755 --- a/docs/make-docsite.sh +++ b/docs/make-docsite.sh @@ -12,6 +12,12 @@ else fi # TODO: The PDF rendering is pretty ugly and can be improved a lot. +echo "Generating PDF document ..." make pdf -mv build/pdf/corda-developer-site.pdf build/html/_static/corda-developer-site.pdf + +echo "Generating HTML pages ..." make html + +echo "Moving PDF file into place ..." +mv $PWD/build/pdf/corda-developer-site.pdf $PWD/build/html/_static/corda-developer-site.pdf + From 1a6922afe05fed324c23abd0c9ce125e57b2dad9 Mon Sep 17 00:00:00 2001 From: Thomas Schroeter Date: Thu, 10 May 2018 11:37:57 +0100 Subject: [PATCH 2/4] Test notarisation of issue transactions with time-window (#3092) --- .../node/services/BFTNotaryServiceTests.kt | 18 ++++++++++++++++-- .../node/services/RaftNotaryServiceTests.kt | 19 ++++++++++++++++++- .../ValidatingNotaryServiceTests.kt | 12 ++++++++++++ 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt index c6aea81f4e..c3492fbbef 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt @@ -20,6 +20,7 @@ import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.Try import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds import net.corda.node.internal.StartedNode import net.corda.node.services.config.BFTSMaRtConfiguration import net.corda.node.services.config.NotaryConfig @@ -31,7 +32,6 @@ import net.corda.testing.common.internal.testNetworkParameters import net.corda.testing.contracts.DummyContract import net.corda.testing.core.dummyCommand import net.corda.testing.core.singleIdentity -import net.corda.testing.driver.PortAllocation import net.corda.testing.node.TestClock import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.InternalMockNetwork.MockNode @@ -43,7 +43,6 @@ import org.junit.Assert.assertThat import java.nio.file.Paths import java.time.Duration import java.time.Instant -import java.util.* import java.util.concurrent.ExecutionException import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -168,6 +167,21 @@ class BFTNotaryServiceTests { } } + @Test + fun `notarise issue tx with time-window`() { + node.run { + val issueTx = signInitialTransaction(notary) { + setTimeWindow(services.clock.instant(), 30.seconds) + addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) + } + val resultFuture = services.startFlow(NotaryFlow.Client(issueTx)).resultFuture + + mockNet.runNetwork() + val signatures = resultFuture.get() + verifySignatures(signatures, issueTx.id) + } + } + @Test fun `transactions can be re-notarised outside their time window`() { node.run { diff --git a/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt index d656b9022e..5cc23ab47a 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt @@ -10,6 +10,7 @@ import net.corda.core.identity.Party import net.corda.core.internal.concurrent.map import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds import net.corda.testing.core.DUMMY_BANK_A_NAME import net.corda.testing.core.singleIdentity import net.corda.testing.contracts.DummyContract @@ -20,7 +21,6 @@ import net.corda.testing.driver.InProcess import net.corda.testing.driver.internal.InProcessImpl import net.corda.testing.node.ClusterSpec import net.corda.testing.node.NotarySpec -import net.corda.testing.node.internal.startFlow import org.junit.Test import java.util.* import kotlin.test.assertEquals @@ -62,6 +62,23 @@ class RaftNotaryServiceTests { } } + @Test + fun `notarise issue tx with time-window`() { + driver(DriverParameters( + startNodesInProcess = true, + extraCordappPackagesToScan = listOf("net.corda.testing.contracts"), + notarySpecs = listOf(NotarySpec(notaryName, cluster = ClusterSpec.Raft(clusterSize = 3))) + )) { + val bankA = startNode(providedName = DUMMY_BANK_A_NAME).map { (it as InProcess) }.getOrThrow() + val issueTx = (bankA as InProcessImpl).database.transaction { + val builder = DummyContract.generateInitial(Random().nextInt(), defaultNotaryIdentity, bankA.services.myInfo.singleIdentity().ref(0)) + .setTimeWindow(bankA.services.clock.instant(), 30.seconds) + bankA.services.signInitialTransaction(builder) + } + bankA.startFlow(NotaryFlow.Client(issueTx)).getOrThrow() + } + } + private fun issueState(nodeHandle: InProcess, notary: Party): StateAndRef<*> { return (nodeHandle as InProcessImpl).database.transaction { diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt index c3aee36b7b..6c7721cba3 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt @@ -169,6 +169,18 @@ class ValidatingNotaryServiceTests { assertThat(ex.error).isInstanceOf(NotaryError.TimeWindowInvalid::class.java) } + @Test + fun `notarise issue tx with time-window`() { + val stx = run { + val tx = DummyContract.generateInitial(Random().nextInt(), notary, alice.ref(0)) + .setTimeWindow(Instant.now(), 30.seconds) + aliceNode.services.signInitialTransaction(tx) + } + + val sig = runNotaryClient(stx).getOrThrow().single() + assertEquals(sig.by, notary.owningKey) + } + @Test fun `should sign identical transaction multiple times (notarisation is idempotent)`() { val stx = run { From 36d13124d5904e1dd6c339d357afb1daf1157942 Mon Sep 17 00:00:00 2001 From: Chris Rankin Date: Thu, 10 May 2018 12:42:01 +0100 Subject: [PATCH 3/4] ENT-1463: Refactor serialisation slightly for determinisation. (#3110) --- .../internal/serialization/ByteBufferStreams.kt | 6 +++--- .../nodeapi/internal/serialization/ServerContexts.kt | 5 ----- .../nodeapi/internal/serialization/SharedContexts.kt | 9 +++++---- .../serialization/amqp/AMQPSerializationScheme.kt | 10 +++++++++- .../internal/serialization/carpenter/ClassCarpenter.kt | 5 +++-- 5 files changed, 20 insertions(+), 15 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ByteBufferStreams.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ByteBufferStreams.kt index f3710569ed..e7e942c595 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ByteBufferStreams.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ByteBufferStreams.kt @@ -10,9 +10,9 @@ import java.nio.ByteBuffer import kotlin.math.min internal val serializeOutputStreamPool = LazyPool( - clear = ByteBufferOutputStream::reset, - shouldReturnToPool = { it.size() < 256 * 1024 }, // Discard if it grew too large - newInstance = { ByteBufferOutputStream(64 * 1024) }) + clear = ByteBufferOutputStream::reset, + shouldReturnToPool = { it.size() < 256 * 1024 }, // Discard if it grew too large + newInstance = { ByteBufferOutputStream(64 * 1024) }) internal fun byteArrayOutput(task: (ByteBufferOutputStream) -> T): ByteArray { return serializeOutputStreamPool.run { underlying -> diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt index c18d7f4228..ae2e8cdb67 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt @@ -2,16 +2,11 @@ package net.corda.nodeapi.internal.serialization -import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationDefaults import net.corda.nodeapi.internal.serialization.amqp.amqpMagic import net.corda.nodeapi.internal.serialization.kryo.kryoMagic -object QuasarWhitelist : ClassWhitelist { - override fun hasListed(type: Class<*>): Boolean = true -} - /* * Serialisation contexts for the server. * These have been refactored into a separate file to prevent diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt index e54c70cc69..0880af153c 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt @@ -2,10 +2,7 @@ package net.corda.nodeapi.internal.serialization -import net.corda.core.serialization.EncodingWhitelist -import net.corda.core.serialization.SerializationContext -import net.corda.core.serialization.SerializationDefaults -import net.corda.core.serialization.SerializationEncoding +import net.corda.core.serialization.* import net.corda.nodeapi.internal.serialization.amqp.amqpMagic import net.corda.nodeapi.internal.serialization.kryo.kryoMagic @@ -36,3 +33,7 @@ val AMQP_P2P_CONTEXT = SerializationContextImpl(amqpMagic, internal object AlwaysAcceptEncodingWhitelist : EncodingWhitelist { override fun acceptEncoding(encoding: SerializationEncoding) = true } + +object QuasarWhitelist : ClassWhitelist { + override fun hasListed(type: Class<*>): Boolean = true +} diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt index 4c16e1f44a..16ed0fa30d 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt @@ -95,11 +95,11 @@ abstract class AbstractAMQPSerializationScheme( register(net.corda.nodeapi.internal.serialization.amqp.custom.X509CRLSerializer) register(net.corda.nodeapi.internal.serialization.amqp.custom.CertPathSerializer(this)) register(net.corda.nodeapi.internal.serialization.amqp.custom.StringBufferSerializer) - register(net.corda.nodeapi.internal.serialization.amqp.custom.SimpleStringSerializer) register(net.corda.nodeapi.internal.serialization.amqp.custom.InputStreamSerializer) register(net.corda.nodeapi.internal.serialization.amqp.custom.BitSetSerializer(this)) register(net.corda.nodeapi.internal.serialization.amqp.custom.EnumSetSerializer(this)) register(net.corda.nodeapi.internal.serialization.amqp.custom.ContractAttachmentSerializer(this)) + registerNonDeterministicSerializers(factory) } for (whitelistProvider in serializationWhitelists) { factory.addToWhitelist(*whitelistProvider.whitelist.toTypedArray()) @@ -116,7 +116,15 @@ abstract class AbstractAMQPSerializationScheme( factory.registerExternal(CorDappCustomSerializer(customSerializer, factory)) } } + } + /* + * Register the serializers which will be excluded from the DJVM. + */ + private fun registerNonDeterministicSerializers(factory: SerializerFactory) { + with(factory) { + register(net.corda.nodeapi.internal.serialization.amqp.custom.SimpleStringSerializer) + } } private val serializerFactoriesForContexts = ConcurrentHashMap, SerializerFactory>() diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/ClassCarpenter.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/ClassCarpenter.kt index 5ac6c95c7c..b99440a912 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/ClassCarpenter.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/ClassCarpenter.kt @@ -89,8 +89,9 @@ private val toStringHelper: String = Type.getInternalName(MoreObjects.ToStringHe * * Equals/hashCode methods are not yet supported. */ -class ClassCarpenter(cl: ClassLoader = Thread.currentThread().contextClassLoader, - val whitelist: ClassWhitelist) { +class ClassCarpenter(cl: ClassLoader, val whitelist: ClassWhitelist) { + constructor(whitelist: ClassWhitelist) : this(Thread.currentThread().contextClassLoader, whitelist) + // TODO: Generics. // TODO: Sandbox the generated code when a security manager is in use. // TODO: Generate equals/hashCode. From 15e87050c71f661d0340c08276049412dc665477 Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Thu, 10 May 2018 15:20:41 +0100 Subject: [PATCH 4/4] CORDA-1393: Make Explorer GUI recover on RPC connection loss. (#3093) * CORDA-1393: Install `onError()` handler for folding action or else `ErrorNotImplementedAction` will be invoked which is never a good thing * CORDA-1335: Improve exception handling in `cleanUpOnConnectionLoss()` * CORDA-1335: Try to trick the logic to pretend we are running in HA mode to have a chance of re-connecting. * CORDA-1416: Make `NodeMonitorModel` code react to proxy changing. * CORDA-1416: Workaround `CordaRPCOps.equals()` calls when listener dispatching change. * CORDA-1416: Increase re-try interval to allow enough time for server to come back online. * CORDA-1355: Properly close RPC connection we are moving away from. * CORDA-1355: Unsubscribe on Error to prevent propagation of it downstream. * CORDA-1355: For downstream subscribers ignore errors properly. Thanka to @exfalso for the hint. This fixes: Transaction Updates do not flow after re-connect * CORDA-1355: Bugfix eliminate duplicating items on "Transactions" blotter after re-connect. * CORDA-1355: Bugfix eliminate double counting on dashboards. * CORDA-1355: Bugfix eliminate same parties in dropdowns. * CORDA-1355: Stop using `SecureHash.randomSHA256()` for painting widget icon. Instead use combined SHA hash such that icon represents the whole population of trades. That way two transactions blotters can be compared by a single glimpse at corresponding icons. Also minor refactoring. * CORDA-1416: Make RPC re-connection faster/more robust. * CORDA-1416: Properly announce thet Proxy may not be available during re-connect and prevent UI crashing. * CORDA-1416: Disable UI until RPC proxy is available. * CORDA-1416: Correct typo. * CORDA-1416: Unit test fix. * CORDA-1416: GUI cosmetic changes. * CORDA-1416: Correct spaces. * CORDA-1416: Remove un-necessary overrides in CordaRPCOpsWrapper. * CORDA-1416: Switch from using `doOnError` to installing an error handler upon subscription. --- .../corda/client/jfx/NodeMonitorModelTest.kt | 4 +- .../client/jfx/model/ContractStateModel.kt | 3 +- .../client/jfx/model/NetworkIdentityModel.kt | 16 +- .../client/jfx/model/NodeMonitorModel.kt | 180 ++++++++++++++---- .../client/jfx/model/TransactionDataModel.kt | 2 +- .../net/corda/client/jfx/utils/ChosenList.kt | 8 +- .../corda/client/jfx/utils/ObservableFold.kt | 15 +- .../client/jfx/utils/ObservableUtilities.kt | 6 +- .../rpc/internal/RPCClientProxyHandler.kt | 16 +- .../net/corda/core/internal/InternalUtils.kt | 13 +- .../net/corda/explorer/model/IssuerModel.kt | 12 +- .../net/corda/explorer/views/MainView.kt | 20 +- .../net/corda/explorer/views/SearchField.kt | 2 +- .../corda/explorer/views/TransactionViewer.kt | 36 ++-- .../views/cordapps/cash/CashViewer.kt | 2 +- .../views/cordapps/cash/NewTransaction.kt | 8 +- .../explorer/css/corda-dark-color-scheme.css | 8 + .../net/corda/explorer/views/MainView.fxml | 5 +- 18 files changed, 254 insertions(+), 102 deletions(-) diff --git a/client/jfx/src/integration-test/kotlin/net/corda/client/jfx/NodeMonitorModelTest.kt b/client/jfx/src/integration-test/kotlin/net/corda/client/jfx/NodeMonitorModelTest.kt index db1077d8c8..5da68b3af4 100644 --- a/client/jfx/src/integration-test/kotlin/net/corda/client/jfx/NodeMonitorModelTest.kt +++ b/client/jfx/src/integration-test/kotlin/net/corda/client/jfx/NodeMonitorModelTest.kt @@ -78,7 +78,7 @@ class NodeMonitorModelTest { networkMapUpdates = monitor.networkMap.bufferUntilSubscribed() monitor.register(aliceNodeHandle.rpcAddress, cashUser.username, cashUser.password) - rpc = monitor.proxyObservable.value!! + rpc = monitor.proxyObservable.value!!.cordaRPCOps notaryParty = defaultNotaryIdentity val bobNodeHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(cashUser)).getOrThrow() @@ -86,7 +86,7 @@ class NodeMonitorModelTest { val monitorBob = NodeMonitorModel() stateMachineUpdatesBob = monitorBob.stateMachineUpdates.bufferUntilSubscribed() monitorBob.register(bobNodeHandle.rpcAddress, cashUser.username, cashUser.password) - rpcBob = monitorBob.proxyObservable.value!! + rpcBob = monitorBob.proxyObservable.value!!.cordaRPCOps runTest() } } diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/ContractStateModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/ContractStateModel.kt index 85a37acafb..a8a0d04109 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/ContractStateModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/ContractStateModel.kt @@ -2,6 +2,7 @@ package net.corda.client.jfx.model import javafx.collections.FXCollections import javafx.collections.ObservableList +import net.corda.client.jfx.utils.distinctBy import net.corda.client.jfx.utils.fold import net.corda.client.jfx.utils.map import net.corda.core.contracts.ContractState @@ -31,7 +32,7 @@ class ContractStateModel { val cashStates: ObservableList> = cashStatesDiff.fold(FXCollections.observableArrayList()) { list: MutableList>, statesDiff -> list.removeIf { it in statesDiff.removed } list.addAll(statesDiff.added) - } + }.distinctBy { it.ref } val cash = cashStates.map { it.state.data.amount } diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NetworkIdentityModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NetworkIdentityModel.kt index 153c10f462..a4b8e334ed 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NetworkIdentityModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NetworkIdentityModel.kt @@ -4,12 +4,8 @@ import com.github.benmanes.caffeine.cache.Caffeine import javafx.beans.value.ObservableValue import javafx.collections.FXCollections import javafx.collections.ObservableList -import net.corda.client.jfx.utils.ChosenList -import net.corda.client.jfx.utils.filterNotNull -import net.corda.client.jfx.utils.fold -import net.corda.client.jfx.utils.map +import net.corda.client.jfx.utils.* import net.corda.core.identity.AnonymousParty -import net.corda.core.identity.Party import net.corda.core.node.NodeInfo import net.corda.core.node.services.NetworkMapCache.MapChange import java.security.PublicKey @@ -35,13 +31,13 @@ class NetworkIdentityModel { private val identityCache = Caffeine.newBuilder() .build>({ publicKey -> - publicKey?.let { rpcProxy.map { it?.nodeInfoFromParty(AnonymousParty(publicKey)) } } + publicKey.let { rpcProxy.map { it?.cordaRPCOps?.nodeInfoFromParty(AnonymousParty(publicKey)) } } }) - val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.notaryIdentities() ?: emptyList()) }) - val notaryNodes: ObservableList = notaries.map { rpcProxy.value?.nodeInfoFromParty(it) }.filterNotNull() + val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.cordaRPCOps?.notaryIdentities() ?: emptyList()) }, "notaries") + val notaryNodes: ObservableList = notaries.map { rpcProxy.value?.cordaRPCOps?.nodeInfoFromParty(it) }.filterNotNull() val parties: ObservableList = networkIdentities - .filtered { it.legalIdentities.all { it !in notaries } } - val myIdentity = rpcProxy.map { it?.nodeInfo()?.legalIdentitiesAndCerts?.first()?.party } + .filtered { it.legalIdentities.all { it !in notaries } }.unique() + val myIdentity = rpcProxy.map { it?.cordaRPCOps?.nodeInfo()?.legalIdentitiesAndCerts?.first()?.party } fun partyFromPublicKey(publicKey: PublicKey): ObservableValue = identityCache[publicKey]!! } diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt index 2fdcd18550..a49785a101 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt @@ -1,11 +1,15 @@ package net.corda.client.jfx.model +import com.sun.javafx.application.PlatformImpl +import javafx.application.Platform import javafx.beans.property.SimpleObjectProperty import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClientConfiguration +import net.corda.client.rpc.CordaRPCConnection import net.corda.core.contracts.ContractState import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party +import net.corda.core.internal.staticField import net.corda.core.messaging.* import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.core.node.services.Vault @@ -15,9 +19,14 @@ import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.contextLogger import net.corda.core.utilities.seconds +import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import rx.Observable +import rx.Subscription import rx.subjects.PublishSubject +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val message: String) { companion object { @@ -34,6 +43,7 @@ data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val mess */ class NodeMonitorModel { + private val retryableStateMachineUpdatesSubject = PublishSubject.create() private val stateMachineUpdatesSubject = PublishSubject.create() private val vaultUpdatesSubject = PublishSubject.create>() private val transactionsSubject = PublishSubject.create() @@ -48,27 +58,76 @@ class NodeMonitorModel { val progressTracking: Observable = progressTrackingSubject val networkMap: Observable = networkMapSubject - val proxyObservable = SimpleObjectProperty() + val proxyObservable = SimpleObjectProperty() lateinit var notaryIdentities: List + companion object { + val logger = contextLogger() + + private fun runLaterIfInitialized(op: () -> Unit) { + + val initialized = PlatformImpl::class.java.staticField("initialized") + + // Only execute using "runLater()" if JavaFX been initialized. + // It may not be initialized in the unit test. + if(initialized.value.get()) { + Platform.runLater(op) + } else { + op() + } + } + } + + /** + * This is needed as JavaFX listener framework attempts to call `equals()` before dispatching notification change. + * And calling `CordaRPCOps.equals()` results in (unhandled) remote call. + */ + class CordaRPCOpsWrapper(val cordaRPCOps: CordaRPCOps) + /** * Register for updates to/from a given vault. * TODO provide an unsubscribe mechanism */ fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) { - val client = CordaRPCClient( - nodeHostAndPort, - object : CordaRPCClientConfiguration { - override val connectionMaxRetryInterval = 10.seconds - } - ) - val connection = client.start(username, password) - val proxy = connection.proxy - notaryIdentities = proxy.notaryIdentities() - val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed() + // `retryableStateMachineUpdatesSubject` will change it's upstream subscriber in case of RPC connection failure, this `Observable` should + // never produce an error. + // `stateMachineUpdatesSubject` will stay firmly subscribed to `retryableStateMachineUpdatesSubject` + retryableStateMachineUpdatesSubject.subscribe(stateMachineUpdatesSubject) + + // Proxy may change during re-connect, ensure that subject wiring accurately reacts to this activity. + proxyObservable.addListener { _, _, wrapper -> + if(wrapper != null) { + val proxy = wrapper.cordaRPCOps + // Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates + val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL), + PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE)) + val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ -> + statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED + }.toSet() + val consumedStates = statesSnapshot.states.toSet() - unconsumedStates + val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates) + vaultUpdates.startWith(initialVaultUpdate).subscribe({ vaultUpdatesSubject.onNext(it) }, {}) + + // Transactions + val (transactions, newTransactions) = proxy.internalVerifiedTransactionsFeed() + newTransactions.startWith(transactions).subscribe({ transactionsSubject.onNext(it) }, {}) + + // SM -> TX mapping + val (smTxMappings, futureSmTxMappings) = proxy.stateMachineRecordedTransactionMappingFeed() + futureSmTxMappings.startWith(smTxMappings).subscribe({ stateMachineTransactionMappingSubject.onNext(it) }, {}) + + // Parties on network + val (parties, futurePartyUpdate) = proxy.networkMapFeed() + futurePartyUpdate.startWith(parties.map { MapChange.Added(it) }).subscribe({ networkMapSubject.onNext(it) }, {}) + } + } + + val stateMachines = performRpcReconnect(nodeHostAndPort, username, password) + // Extract the flow tracking stream // TODO is there a nicer way of doing this? Stream of streams in general results in code like this... + // TODO `progressTrackingSubject` doesn't seem to be used anymore - should it be removed? val currentProgressTrackerUpdates = stateMachines.mapNotNull { stateMachine -> ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachine) } @@ -82,33 +141,74 @@ class NodeMonitorModel { // We need to retry, because when flow errors, we unsubscribe from progressTrackingSubject. So we end up with stream of state machine updates and no progress trackers. futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.retry().subscribe(progressTrackingSubject) - - // Now the state machines - val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) } - stateMachineUpdates.startWith(currentStateMachines).subscribe(stateMachineUpdatesSubject) - - // Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates - val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL), - PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE)) - val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ -> - statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED - }.toSet() - val consumedStates = statesSnapshot.states.toSet() - unconsumedStates - val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates) - vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject) - - // Transactions - val (transactions, newTransactions) = proxy.internalVerifiedTransactionsFeed() - newTransactions.startWith(transactions).subscribe(transactionsSubject) - - // SM -> TX mapping - val (smTxMappings, futureSmTxMappings) = proxy.stateMachineRecordedTransactionMappingFeed() - futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject) - - // Parties on network - val (parties, futurePartyUpdate) = proxy.networkMapFeed() - futurePartyUpdate.startWith(parties.map { MapChange.Added(it) }).subscribe(networkMapSubject) - - proxyObservable.set(proxy) } -} + + private fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): List { + + val connection = establishConnectionWithRetry(nodeHostAndPort, username, password) + val proxy = connection.proxy + + val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed() + + val retryableStateMachineUpdatesSubscription: AtomicReference = AtomicReference(null) + val subscription: Subscription = stateMachineUpdatesRaw + .startWith(stateMachineInfos.map { StateMachineUpdate.Added(it) }) + .subscribe({ retryableStateMachineUpdatesSubject.onNext(it) }, { + // Terminate subscription such that nothing gets past this point to downstream Observables. + retryableStateMachineUpdatesSubscription.get()?.unsubscribe() + // Flag to everyone that proxy is no longer available. + runLaterIfInitialized { proxyObservable.set(null) } + // It is good idea to close connection to properly mark the end of it. During re-connect we will create a new + // client and a new connection, so no going back to this one. Also the server might be down, so we are + // force closing the connection to avoid propagation of notification to the server side. + connection.forceClose() + // Perform re-connect. + performRpcReconnect(nodeHostAndPort, username, password) + }) + + retryableStateMachineUpdatesSubscription.set(subscription) + runLaterIfInitialized { proxyObservable.set(CordaRPCOpsWrapper(proxy)) } + notaryIdentities = proxy.notaryIdentities() + + return stateMachineInfos + } + + private fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection { + + val retryInterval = 5.seconds + + do { + val connection = try { + logger.info("Connecting to: $nodeHostAndPort") + val client = CordaRPCClient( + nodeHostAndPort, + object : CordaRPCClientConfiguration { + override val connectionMaxRetryInterval = retryInterval + } + ) + val _connection = client.start(username, password) + // Check connection is truly operational before returning it. + val nodeInfo = _connection.proxy.nodeInfo() + require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty()) + _connection + } catch(secEx: ActiveMQSecurityException) { + // Happens when incorrect credentials provided - no point to retry connecting. + throw secEx + } + catch(th: Throwable) { + // Deliberately not logging full stack trace as it will be full of internal stacktraces. + logger.info("Exception upon establishing connection: " + th.message) + null + } + + if(connection != null) { + logger.info("Connection successfully established with: $nodeHostAndPort") + return connection + } + // Could not connect this time round - pause before giving another try. + Thread.sleep(retryInterval.toMillis()) + } while (connection == null) + + throw IllegalArgumentException("Never reaches here") + } +} \ No newline at end of file diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/TransactionDataModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/TransactionDataModel.kt index b2a76661e1..f6c8f5f479 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/TransactionDataModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/TransactionDataModel.kt @@ -83,7 +83,7 @@ data class PartiallyResolvedTransaction( */ class TransactionDataModel { private val transactions by observable(NodeMonitorModel::transactions) - private val collectedTransactions = transactions.recordInSequence() + private val collectedTransactions = transactions.recordInSequence().distinctBy { it.id } private val vaultUpdates by observable(NodeMonitorModel::vaultUpdates) private val stateMap = vaultUpdates.fold(FXCollections.observableHashMap>()) { map, update -> val states = update.consumed + update.produced diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ChosenList.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ChosenList.kt index 61f60e53aa..321f751c3e 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ChosenList.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ChosenList.kt @@ -21,7 +21,8 @@ import javafx.collections.ObservableListBase * The above will create a list that chooses and delegates to the appropriate filtered list based on the type of filter. */ class ChosenList( - private val chosenListObservable: ObservableValue> + private val chosenListObservable: ObservableValue>, + private val logicalName: String? = null ) : ObservableListBase() { private var currentList = chosenListObservable.value @@ -58,4 +59,7 @@ class ChosenList( endChange() } -} + override fun toString(): String { + return "ChosenList: $logicalName" + } +} \ No newline at end of file diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableFold.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableFold.kt index 76c75c38de..455d2b5ee7 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableFold.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableFold.kt @@ -8,6 +8,7 @@ import javafx.beans.value.ObservableValue import javafx.collections.FXCollections import javafx.collections.ObservableList import javafx.collections.ObservableMap +import org.slf4j.LoggerFactory import rx.Observable import java.util.concurrent.TimeUnit @@ -15,6 +16,12 @@ import java.util.concurrent.TimeUnit * Simple utilities for converting an [rx.Observable] into a javafx [ObservableValue]/[ObservableList] */ +private val logger = LoggerFactory.getLogger("ObservableFold") + +private fun onError(th: Throwable) { + logger.debug("OnError when folding", th) +} + /** * [foldToObservableValue] takes an [rx.Observable] stream and creates an [ObservableValue] out of it. * @param initial The initial value of the returned observable. @@ -23,11 +30,11 @@ import java.util.concurrent.TimeUnit */ fun Observable.foldToObservableValue(initial: B, folderFun: (A, B) -> B): ObservableValue { val result = SimpleObjectProperty(initial) - subscribe { + subscribe ({ Platform.runLater { result.set(folderFun(it, result.get())) } - } + }, ::onError) return result } @@ -42,7 +49,7 @@ fun Observable.fold(accumulator: R, folderFun: (R, T) -> Unit): R { * This capture is fine, as [Platform.runLater] runs closures in order. * The buffer is to avoid flooding FX thread with runnable. */ - buffer(1, TimeUnit.SECONDS).subscribe { + buffer(1, TimeUnit.SECONDS).subscribe({ if (it.isNotEmpty()) { Platform.runLater { it.fold(accumulator) { list, item -> @@ -51,7 +58,7 @@ fun Observable.fold(accumulator: R, folderFun: (R, T) -> Unit): R { } } } - } + }, ::onError) return accumulator } diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt index a863af5ade..599d825aa2 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt @@ -273,7 +273,7 @@ fun ObservableList.leftOuterJoin( val rightTableMap = rightTable.associateByAggregation(rightToJoinKey) val joinedMap: ObservableMap, ObservableList>> = LeftOuterJoinedMap(leftTableMap, rightTableMap) { _, left, rightValue -> - Pair(left, ChosenList(rightValue.map { it ?: FXCollections.emptyObservableList() })) + Pair(left, ChosenList(rightValue.map { it ?: FXCollections.emptyObservableList() }, "ChosenList from leftOuterJoin")) } return joinedMap } @@ -300,6 +300,10 @@ fun ObservableList.unique(): ObservableList { return AggregatedList(this, { it }, { key, _ -> key }) } +fun ObservableList.distinctBy(toKey: (T) -> K): ObservableList { + return AggregatedList(this, toKey, { _, entryList -> entryList[0] }) +} + fun ObservableValue<*>.isNotNull(): BooleanBinding { return Bindings.createBooleanBinding({ this.value != null }, arrayOf(this)) } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index b5146bc8ef..f26c6a0951 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -361,7 +361,15 @@ class RPCClientProxyHandler( interrupt() join(1000) } - sessionFactory?.close() + + if (notify) { + // This is going to send remote message, see `org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.doCleanUp()`. + sessionFactory?.close() + } else { + // This performs a cheaper and faster version of local cleanup. + sessionFactory?.cleanup() + } + reaperScheduledFuture?.cancel(false) observableContext.observableMap.invalidateAll() reapObservables(notify) @@ -518,7 +526,11 @@ class RPCClientProxyHandler( val m = observableContext.observableMap.asMap() m.keys.forEach { k -> observationExecutorPool.run(k) { - m[k]?.onError(RPCException("Connection failure detected.")) + try { + m[k]?.onError(RPCException("Connection failure detected.")) + } catch (th: Throwable) { + log.error("Unexpected exception when RPC connection failure handling", th) + } } } observableContext.observableMap.invalidateAll() diff --git a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt index 467b2aa88f..c971b95eab 100644 --- a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt @@ -169,14 +169,21 @@ fun Logger.logElapsedTime(label: String, body: () -> T): T = logElapsedTime( fun logElapsedTime(label: String, logger: Logger? = null, body: () -> T): T { // Use nanoTime as it's monotonic. val now = System.nanoTime() + var failed = false try { return body() - } finally { + } + catch (th: Throwable) { + failed = true + throw th + } + finally { val elapsed = Duration.ofNanos(System.nanoTime() - now).toMillis() + val msg = (if(failed) "Failed " else "") + "$label took $elapsed msec" if (logger != null) - logger.info("$label took $elapsed msec") + logger.info(msg) else - println("$label took $elapsed msec") + println(msg) } } diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/model/IssuerModel.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/model/IssuerModel.kt index b84f092d3a..56049a35ee 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/model/IssuerModel.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/model/IssuerModel.kt @@ -9,17 +9,21 @@ import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.finance.flows.CashConfigDataFlow import tornadofx.* +import java.util.* class IssuerModel { + + private val defaultCurrency = Currency.getInstance("USD") + private val proxy by observableValue(NodeMonitorModel::proxyObservable) - private val cashAppConfiguration = proxy.map { it?.startFlow(::CashConfigDataFlow)?.returnValue?.getOrThrow() } - val supportedCurrencies = ChosenList(cashAppConfiguration.map { it?.supportedCurrencies?.observable() ?: FXCollections.emptyObservableList() }) - val currencyTypes = ChosenList(cashAppConfiguration.map { it?.issuableCurrencies?.observable() ?: FXCollections.emptyObservableList() }) + private val cashAppConfiguration = proxy.map { it?.cordaRPCOps?.startFlow(::CashConfigDataFlow)?.returnValue?.getOrThrow() } + val supportedCurrencies = ChosenList(cashAppConfiguration.map { it?.supportedCurrencies?.observable() ?: FXCollections.singletonObservableList(defaultCurrency) }, "supportedCurrencies") + val currencyTypes = ChosenList(cashAppConfiguration.map { it?.issuableCurrencies?.observable() ?: FXCollections.emptyObservableList() }, "currencyTypes") val transactionTypes = ChosenList(cashAppConfiguration.map { if (it?.issuableCurrencies?.isNotEmpty() == true) CashTransaction.values().asList().observable() else listOf(CashTransaction.Pay).observable() - }) + }, "transactionTypes") } diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/views/MainView.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/views/MainView.kt index 2369a5dfc5..3f394fd696 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/views/MainView.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/views/MainView.kt @@ -6,9 +6,7 @@ import javafx.beans.binding.Bindings import javafx.geometry.Insets import javafx.geometry.Pos import javafx.scene.Parent -import javafx.scene.control.ContentDisplay -import javafx.scene.control.MenuButton -import javafx.scene.control.MenuItem +import javafx.scene.control.* import javafx.scene.input.MouseButton import javafx.scene.layout.BorderPane import javafx.scene.layout.StackPane @@ -17,10 +15,7 @@ import javafx.scene.text.Font import javafx.scene.text.TextAlignment import javafx.stage.Stage import javafx.stage.WindowEvent -import net.corda.client.jfx.model.NetworkIdentityModel -import net.corda.client.jfx.model.objectProperty -import net.corda.client.jfx.model.observableList -import net.corda.client.jfx.model.observableValue +import net.corda.client.jfx.model.* import net.corda.client.jfx.utils.ChosenList import net.corda.client.jfx.utils.map import net.corda.explorer.formatters.PartyNameFormatter @@ -38,11 +33,14 @@ class MainView : View(WINDOW_TITLE) { private val exit by fxid() private val sidebar by fxid() private val selectionBorderPane by fxid() + private val mainSplitPane by fxid() + private val rpcWarnLabel by fxid