diff --git a/client/jfx/src/integration-test/kotlin/net/corda/client/jfx/NodeMonitorModelTest.kt b/client/jfx/src/integration-test/kotlin/net/corda/client/jfx/NodeMonitorModelTest.kt index ab0c4cf204..de3e38557d 100644 --- a/client/jfx/src/integration-test/kotlin/net/corda/client/jfx/NodeMonitorModelTest.kt +++ b/client/jfx/src/integration-test/kotlin/net/corda/client/jfx/NodeMonitorModelTest.kt @@ -99,7 +99,7 @@ class NodeMonitorModelTest : IntegrationTest() { networkMapUpdates = monitor.networkMap.bufferUntilSubscribed() monitor.register(aliceNodeHandle.rpcAddress, cashUser.username, cashUser.password) - rpc = monitor.proxyObservable.value!! + rpc = monitor.proxyObservable.value!!.cordaRPCOps notaryParty = defaultNotaryIdentity val bobNodeHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(cashUser)).getOrThrow() @@ -107,7 +107,7 @@ class NodeMonitorModelTest : IntegrationTest() { val monitorBob = NodeMonitorModel() stateMachineUpdatesBob = monitorBob.stateMachineUpdates.bufferUntilSubscribed() monitorBob.register(bobNodeHandle.rpcAddress, cashUser.username, cashUser.password) - rpcBob = monitorBob.proxyObservable.value!! + rpcBob = monitorBob.proxyObservable.value!!.cordaRPCOps runTest() } } diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/ContractStateModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/ContractStateModel.kt index 53a31b4da2..f0c35b48b8 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/ContractStateModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/ContractStateModel.kt @@ -12,6 +12,7 @@ package net.corda.client.jfx.model import javafx.collections.FXCollections import javafx.collections.ObservableList +import net.corda.client.jfx.utils.distinctBy import net.corda.client.jfx.utils.fold import net.corda.client.jfx.utils.map import net.corda.core.contracts.ContractState @@ -41,7 +42,7 @@ class ContractStateModel { val cashStates: ObservableList> = cashStatesDiff.fold(FXCollections.observableArrayList()) { list: MutableList>, statesDiff -> list.removeIf { it in statesDiff.removed } list.addAll(statesDiff.added) - } + }.distinctBy { it.ref } val cash = cashStates.map { it.state.data.amount } diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NetworkIdentityModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NetworkIdentityModel.kt index bbd5e46eff..c4245ae4bb 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NetworkIdentityModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NetworkIdentityModel.kt @@ -14,12 +14,8 @@ import com.github.benmanes.caffeine.cache.Caffeine import javafx.beans.value.ObservableValue import javafx.collections.FXCollections import javafx.collections.ObservableList -import net.corda.client.jfx.utils.ChosenList -import net.corda.client.jfx.utils.filterNotNull -import net.corda.client.jfx.utils.fold -import net.corda.client.jfx.utils.map +import net.corda.client.jfx.utils.* import net.corda.core.identity.AnonymousParty -import net.corda.core.identity.Party import net.corda.core.node.NodeInfo import net.corda.core.node.services.NetworkMapCache.MapChange import java.security.PublicKey @@ -45,13 +41,13 @@ class NetworkIdentityModel { private val identityCache = Caffeine.newBuilder() .build>({ publicKey -> - publicKey?.let { rpcProxy.map { it?.nodeInfoFromParty(AnonymousParty(publicKey)) } } + publicKey.let { rpcProxy.map { it?.cordaRPCOps?.nodeInfoFromParty(AnonymousParty(publicKey)) } } }) - val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.notaryIdentities() ?: emptyList()) }) - val notaryNodes: ObservableList = notaries.map { rpcProxy.value?.nodeInfoFromParty(it) }.filterNotNull() + val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.cordaRPCOps?.notaryIdentities() ?: emptyList()) }, "notaries") + val notaryNodes: ObservableList = notaries.map { rpcProxy.value?.cordaRPCOps?.nodeInfoFromParty(it) }.filterNotNull() val parties: ObservableList = networkIdentities - .filtered { it.legalIdentities.all { it !in notaries } } - val myIdentity = rpcProxy.map { it?.nodeInfo()?.legalIdentitiesAndCerts?.first()?.party } + .filtered { it.legalIdentities.all { it !in notaries } }.unique() + val myIdentity = rpcProxy.map { it?.cordaRPCOps?.nodeInfo()?.legalIdentitiesAndCerts?.first()?.party } fun partyFromPublicKey(publicKey: PublicKey): ObservableValue = identityCache[publicKey]!! } diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt index abb98d90af..b15cfcae12 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt @@ -10,12 +10,16 @@ package net.corda.client.jfx.model +import com.sun.javafx.application.PlatformImpl +import javafx.application.Platform import javafx.beans.property.SimpleObjectProperty import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClientConfiguration +import net.corda.client.rpc.CordaRPCConnection import net.corda.core.contracts.ContractState import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party +import net.corda.core.internal.staticField import net.corda.core.messaging.* import net.corda.core.node.services.NetworkMapCache.MapChange import net.corda.core.node.services.Vault @@ -25,9 +29,14 @@ import net.corda.core.node.services.vault.PageSpecification import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.contextLogger import net.corda.core.utilities.seconds +import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import rx.Observable +import rx.Subscription import rx.subjects.PublishSubject +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val message: String) { companion object { @@ -44,6 +53,7 @@ data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val mess */ class NodeMonitorModel { + private val retryableStateMachineUpdatesSubject = PublishSubject.create() private val stateMachineUpdatesSubject = PublishSubject.create() private val vaultUpdatesSubject = PublishSubject.create>() private val transactionsSubject = PublishSubject.create() @@ -58,27 +68,76 @@ class NodeMonitorModel { val progressTracking: Observable = progressTrackingSubject val networkMap: Observable = networkMapSubject - val proxyObservable = SimpleObjectProperty() + val proxyObservable = SimpleObjectProperty() lateinit var notaryIdentities: List + companion object { + val logger = contextLogger() + + private fun runLaterIfInitialized(op: () -> Unit) { + + val initialized = PlatformImpl::class.java.staticField("initialized") + + // Only execute using "runLater()" if JavaFX been initialized. + // It may not be initialized in the unit test. + if(initialized.value.get()) { + Platform.runLater(op) + } else { + op() + } + } + } + + /** + * This is needed as JavaFX listener framework attempts to call `equals()` before dispatching notification change. + * And calling `CordaRPCOps.equals()` results in (unhandled) remote call. + */ + class CordaRPCOpsWrapper(val cordaRPCOps: CordaRPCOps) + /** * Register for updates to/from a given vault. * TODO provide an unsubscribe mechanism */ fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) { - val client = CordaRPCClient( - nodeHostAndPort, - object : CordaRPCClientConfiguration { - override val connectionMaxRetryInterval = 10.seconds - } - ) - val connection = client.start(username, password) - val proxy = connection.proxy - notaryIdentities = proxy.notaryIdentities() - val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed() + // `retryableStateMachineUpdatesSubject` will change it's upstream subscriber in case of RPC connection failure, this `Observable` should + // never produce an error. + // `stateMachineUpdatesSubject` will stay firmly subscribed to `retryableStateMachineUpdatesSubject` + retryableStateMachineUpdatesSubject.subscribe(stateMachineUpdatesSubject) + + // Proxy may change during re-connect, ensure that subject wiring accurately reacts to this activity. + proxyObservable.addListener { _, _, wrapper -> + if(wrapper != null) { + val proxy = wrapper.cordaRPCOps + // Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates + val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL), + PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE)) + val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ -> + statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED + }.toSet() + val consumedStates = statesSnapshot.states.toSet() - unconsumedStates + val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates) + vaultUpdates.startWith(initialVaultUpdate).subscribe({ vaultUpdatesSubject.onNext(it) }, {}) + + // Transactions + val (transactions, newTransactions) = proxy.internalVerifiedTransactionsFeed() + newTransactions.startWith(transactions).subscribe({ transactionsSubject.onNext(it) }, {}) + + // SM -> TX mapping + val (smTxMappings, futureSmTxMappings) = proxy.stateMachineRecordedTransactionMappingFeed() + futureSmTxMappings.startWith(smTxMappings).subscribe({ stateMachineTransactionMappingSubject.onNext(it) }, {}) + + // Parties on network + val (parties, futurePartyUpdate) = proxy.networkMapFeed() + futurePartyUpdate.startWith(parties.map { MapChange.Added(it) }).subscribe({ networkMapSubject.onNext(it) }, {}) + } + } + + val stateMachines = performRpcReconnect(nodeHostAndPort, username, password) + // Extract the flow tracking stream // TODO is there a nicer way of doing this? Stream of streams in general results in code like this... + // TODO `progressTrackingSubject` doesn't seem to be used anymore - should it be removed? val currentProgressTrackerUpdates = stateMachines.mapNotNull { stateMachine -> ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachine) } @@ -92,33 +151,74 @@ class NodeMonitorModel { // We need to retry, because when flow errors, we unsubscribe from progressTrackingSubject. So we end up with stream of state machine updates and no progress trackers. futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.retry().subscribe(progressTrackingSubject) - - // Now the state machines - val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) } - stateMachineUpdates.startWith(currentStateMachines).subscribe(stateMachineUpdatesSubject) - - // Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates - val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL), - PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE)) - val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ -> - statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED - }.toSet() - val consumedStates = statesSnapshot.states.toSet() - unconsumedStates - val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates) - vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject) - - // Transactions - val (transactions, newTransactions) = proxy.internalVerifiedTransactionsFeed() - newTransactions.startWith(transactions).subscribe(transactionsSubject) - - // SM -> TX mapping - val (smTxMappings, futureSmTxMappings) = proxy.stateMachineRecordedTransactionMappingFeed() - futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject) - - // Parties on network - val (parties, futurePartyUpdate) = proxy.networkMapFeed() - futurePartyUpdate.startWith(parties.map { MapChange.Added(it) }).subscribe(networkMapSubject) - - proxyObservable.set(proxy) } -} + + private fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): List { + + val connection = establishConnectionWithRetry(nodeHostAndPort, username, password) + val proxy = connection.proxy + + val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed() + + val retryableStateMachineUpdatesSubscription: AtomicReference = AtomicReference(null) + val subscription: Subscription = stateMachineUpdatesRaw + .startWith(stateMachineInfos.map { StateMachineUpdate.Added(it) }) + .subscribe({ retryableStateMachineUpdatesSubject.onNext(it) }, { + // Terminate subscription such that nothing gets past this point to downstream Observables. + retryableStateMachineUpdatesSubscription.get()?.unsubscribe() + // Flag to everyone that proxy is no longer available. + runLaterIfInitialized { proxyObservable.set(null) } + // It is good idea to close connection to properly mark the end of it. During re-connect we will create a new + // client and a new connection, so no going back to this one. Also the server might be down, so we are + // force closing the connection to avoid propagation of notification to the server side. + connection.forceClose() + // Perform re-connect. + performRpcReconnect(nodeHostAndPort, username, password) + }) + + retryableStateMachineUpdatesSubscription.set(subscription) + runLaterIfInitialized { proxyObservable.set(CordaRPCOpsWrapper(proxy)) } + notaryIdentities = proxy.notaryIdentities() + + return stateMachineInfos + } + + private fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection { + + val retryInterval = 5.seconds + + do { + val connection = try { + logger.info("Connecting to: $nodeHostAndPort") + val client = CordaRPCClient( + nodeHostAndPort, + object : CordaRPCClientConfiguration { + override val connectionMaxRetryInterval = retryInterval + } + ) + val _connection = client.start(username, password) + // Check connection is truly operational before returning it. + val nodeInfo = _connection.proxy.nodeInfo() + require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty()) + _connection + } catch(secEx: ActiveMQSecurityException) { + // Happens when incorrect credentials provided - no point to retry connecting. + throw secEx + } + catch(th: Throwable) { + // Deliberately not logging full stack trace as it will be full of internal stacktraces. + logger.info("Exception upon establishing connection: " + th.message) + null + } + + if(connection != null) { + logger.info("Connection successfully established with: $nodeHostAndPort") + return connection + } + // Could not connect this time round - pause before giving another try. + Thread.sleep(retryInterval.toMillis()) + } while (connection == null) + + throw IllegalArgumentException("Never reaches here") + } +} \ No newline at end of file diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/TransactionDataModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/TransactionDataModel.kt index 6c5ff06851..b8f8486b71 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/TransactionDataModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/TransactionDataModel.kt @@ -93,7 +93,7 @@ data class PartiallyResolvedTransaction( */ class TransactionDataModel { private val transactions by observable(NodeMonitorModel::transactions) - private val collectedTransactions = transactions.recordInSequence() + private val collectedTransactions = transactions.recordInSequence().distinctBy { it.id } private val vaultUpdates by observable(NodeMonitorModel::vaultUpdates) private val stateMap = vaultUpdates.fold(FXCollections.observableHashMap>()) { map, update -> val states = update.consumed + update.produced diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ChosenList.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ChosenList.kt index 36f48d7b47..06a5b027ca 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ChosenList.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ChosenList.kt @@ -31,7 +31,8 @@ import javafx.collections.ObservableListBase * The above will create a list that chooses and delegates to the appropriate filtered list based on the type of filter. */ class ChosenList( - private val chosenListObservable: ObservableValue> + private val chosenListObservable: ObservableValue>, + private val logicalName: String? = null ) : ObservableListBase() { private var currentList = chosenListObservable.value @@ -68,4 +69,7 @@ class ChosenList( endChange() } -} + override fun toString(): String { + return "ChosenList: $logicalName" + } +} \ No newline at end of file diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableFold.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableFold.kt index ac82943048..d8be074ba2 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableFold.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableFold.kt @@ -18,6 +18,7 @@ import javafx.beans.value.ObservableValue import javafx.collections.FXCollections import javafx.collections.ObservableList import javafx.collections.ObservableMap +import org.slf4j.LoggerFactory import rx.Observable import java.util.concurrent.TimeUnit @@ -25,6 +26,12 @@ import java.util.concurrent.TimeUnit * Simple utilities for converting an [rx.Observable] into a javafx [ObservableValue]/[ObservableList] */ +private val logger = LoggerFactory.getLogger("ObservableFold") + +private fun onError(th: Throwable) { + logger.debug("OnError when folding", th) +} + /** * [foldToObservableValue] takes an [rx.Observable] stream and creates an [ObservableValue] out of it. * @param initial The initial value of the returned observable. @@ -33,11 +40,11 @@ import java.util.concurrent.TimeUnit */ fun Observable.foldToObservableValue(initial: B, folderFun: (A, B) -> B): ObservableValue { val result = SimpleObjectProperty(initial) - subscribe { + subscribe ({ Platform.runLater { result.set(folderFun(it, result.get())) } - } + }, ::onError) return result } @@ -52,7 +59,7 @@ fun Observable.fold(accumulator: R, folderFun: (R, T) -> Unit): R { * This capture is fine, as [Platform.runLater] runs closures in order. * The buffer is to avoid flooding FX thread with runnable. */ - buffer(1, TimeUnit.SECONDS).subscribe { + buffer(1, TimeUnit.SECONDS).subscribe({ if (it.isNotEmpty()) { Platform.runLater { it.fold(accumulator) { list, item -> @@ -61,7 +68,7 @@ fun Observable.fold(accumulator: R, folderFun: (R, T) -> Unit): R { } } } - } + }, ::onError) return accumulator } diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt index e891b373e4..58f944ba61 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt @@ -283,7 +283,7 @@ fun ObservableList.leftOuterJoin( val rightTableMap = rightTable.associateByAggregation(rightToJoinKey) val joinedMap: ObservableMap, ObservableList>> = LeftOuterJoinedMap(leftTableMap, rightTableMap) { _, left, rightValue -> - Pair(left, ChosenList(rightValue.map { it ?: FXCollections.emptyObservableList() })) + Pair(left, ChosenList(rightValue.map { it ?: FXCollections.emptyObservableList() }, "ChosenList from leftOuterJoin")) } return joinedMap } @@ -310,6 +310,10 @@ fun ObservableList.unique(): ObservableList { return AggregatedList(this, { it }, { key, _ -> key }) } +fun ObservableList.distinctBy(toKey: (T) -> K): ObservableList { + return AggregatedList(this, toKey, { _, entryList -> entryList[0] }) +} + fun ObservableValue<*>.isNotNull(): BooleanBinding { return Bindings.createBooleanBinding({ this.value != null }, arrayOf(this)) } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index fb3aa356d3..564e378287 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -371,7 +371,15 @@ class RPCClientProxyHandler( interrupt() join(1000) } - sessionFactory?.close() + + if (notify) { + // This is going to send remote message, see `org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.doCleanUp()`. + sessionFactory?.close() + } else { + // This performs a cheaper and faster version of local cleanup. + sessionFactory?.cleanup() + } + reaperScheduledFuture?.cancel(false) observableContext.observableMap.invalidateAll() reapObservables(notify) @@ -528,7 +536,11 @@ class RPCClientProxyHandler( val m = observableContext.observableMap.asMap() m.keys.forEach { k -> observationExecutorPool.run(k) { - m[k]?.onError(RPCException("Connection failure detected.")) + try { + m[k]?.onError(RPCException("Connection failure detected.")) + } catch (th: Throwable) { + log.error("Unexpected exception when RPC connection failure handling", th) + } } } observableContext.observableMap.invalidateAll() diff --git a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt index 8acb3a3469..6d7714c663 100644 --- a/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/InternalUtils.kt @@ -179,14 +179,21 @@ fun Logger.logElapsedTime(label: String, body: () -> T): T = logElapsedTime( fun logElapsedTime(label: String, logger: Logger? = null, body: () -> T): T { // Use nanoTime as it's monotonic. val now = System.nanoTime() + var failed = false try { return body() - } finally { + } + catch (th: Throwable) { + failed = true + throw th + } + finally { val elapsed = Duration.ofNanos(System.nanoTime() - now).toMillis() + val msg = (if(failed) "Failed " else "") + "$label took $elapsed msec" if (logger != null) - logger.info("$label took $elapsed msec") + logger.info(msg) else - println("$label took $elapsed msec") + println(msg) } } diff --git a/docs/make-docsite.sh b/docs/make-docsite.sh index bc317c3e63..1030bd766a 100755 --- a/docs/make-docsite.sh +++ b/docs/make-docsite.sh @@ -12,6 +12,12 @@ else fi # TODO: The PDF rendering is pretty ugly and can be improved a lot. +echo "Generating PDF document ..." make pdf -mv build/pdf/corda-developer-site.pdf build/html/_static/corda-developer-site.pdf + +echo "Generating HTML pages ..." make html + +echo "Moving PDF file into place ..." +mv $PWD/build/pdf/corda-developer-site.pdf $PWD/build/html/_static/corda-developer-site.pdf + diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ByteBufferStreams.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ByteBufferStreams.kt index f3710569ed..e7e942c595 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ByteBufferStreams.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ByteBufferStreams.kt @@ -10,9 +10,9 @@ import java.nio.ByteBuffer import kotlin.math.min internal val serializeOutputStreamPool = LazyPool( - clear = ByteBufferOutputStream::reset, - shouldReturnToPool = { it.size() < 256 * 1024 }, // Discard if it grew too large - newInstance = { ByteBufferOutputStream(64 * 1024) }) + clear = ByteBufferOutputStream::reset, + shouldReturnToPool = { it.size() < 256 * 1024 }, // Discard if it grew too large + newInstance = { ByteBufferOutputStream(64 * 1024) }) internal fun byteArrayOutput(task: (ByteBufferOutputStream) -> T): ByteArray { return serializeOutputStreamPool.run { underlying -> diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt index b99263b528..b76e755e13 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/ServerContexts.kt @@ -12,16 +12,11 @@ package net.corda.nodeapi.internal.serialization -import net.corda.core.serialization.ClassWhitelist import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationDefaults import net.corda.nodeapi.internal.serialization.amqp.amqpMagic import net.corda.nodeapi.internal.serialization.kryo.kryoMagic -object QuasarWhitelist : ClassWhitelist { - override fun hasListed(type: Class<*>): Boolean = true -} - /* * Serialisation contexts for the server. * These have been refactored into a separate file to prevent diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt index f16de30d43..f2888ed907 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt @@ -12,10 +12,7 @@ package net.corda.nodeapi.internal.serialization -import net.corda.core.serialization.EncodingWhitelist -import net.corda.core.serialization.SerializationContext -import net.corda.core.serialization.SerializationDefaults -import net.corda.core.serialization.SerializationEncoding +import net.corda.core.serialization.* import net.corda.nodeapi.internal.serialization.CordaSerializationEncoding.SNAPPY import net.corda.nodeapi.internal.serialization.amqp.amqpMagic import net.corda.nodeapi.internal.serialization.kryo.kryoMagic @@ -47,3 +44,7 @@ val AMQP_P2P_CONTEXT = SerializationContextImpl(amqpMagic, internal object AlwaysAcceptEncodingWhitelist : EncodingWhitelist { override fun acceptEncoding(encoding: SerializationEncoding) = true } + +object QuasarWhitelist : ClassWhitelist { + override fun hasListed(type: Class<*>): Boolean = true +} diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt index 1a7dbebdb4..377c530837 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt @@ -105,11 +105,11 @@ abstract class AbstractAMQPSerializationScheme( register(net.corda.nodeapi.internal.serialization.amqp.custom.X509CRLSerializer) register(net.corda.nodeapi.internal.serialization.amqp.custom.CertPathSerializer(this)) register(net.corda.nodeapi.internal.serialization.amqp.custom.StringBufferSerializer) - register(net.corda.nodeapi.internal.serialization.amqp.custom.SimpleStringSerializer) register(net.corda.nodeapi.internal.serialization.amqp.custom.InputStreamSerializer) register(net.corda.nodeapi.internal.serialization.amqp.custom.BitSetSerializer(this)) register(net.corda.nodeapi.internal.serialization.amqp.custom.EnumSetSerializer(this)) register(net.corda.nodeapi.internal.serialization.amqp.custom.ContractAttachmentSerializer(this)) + registerNonDeterministicSerializers(factory) } for (whitelistProvider in serializationWhitelists) { factory.addToWhitelist(*whitelistProvider.whitelist.toTypedArray()) @@ -126,7 +126,15 @@ abstract class AbstractAMQPSerializationScheme( factory.registerExternal(CorDappCustomSerializer(customSerializer, factory)) } } + } + /* + * Register the serializers which will be excluded from the DJVM. + */ + private fun registerNonDeterministicSerializers(factory: SerializerFactory) { + with(factory) { + register(net.corda.nodeapi.internal.serialization.amqp.custom.SimpleStringSerializer) + } } private val serializerFactoriesForContexts = ConcurrentHashMap, SerializerFactory>() diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/ClassCarpenter.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/ClassCarpenter.kt index 00b79d7a5b..f6315957cf 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/ClassCarpenter.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/ClassCarpenter.kt @@ -99,8 +99,9 @@ private val toStringHelper: String = Type.getInternalName(MoreObjects.ToStringHe * * Equals/hashCode methods are not yet supported. */ -class ClassCarpenter(cl: ClassLoader = Thread.currentThread().contextClassLoader, - val whitelist: ClassWhitelist) { +class ClassCarpenter(cl: ClassLoader, val whitelist: ClassWhitelist) { + constructor(whitelist: ClassWhitelist) : this(Thread.currentThread().contextClassLoader, whitelist) + // TODO: Generics. // TODO: Sandbox the generated code when a security manager is in use. // TODO: Generate equals/hashCode. diff --git a/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt index 61990e44e7..26ed5e119b 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt @@ -30,6 +30,7 @@ import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.Try import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.seconds import net.corda.node.internal.StartedNode import net.corda.node.services.config.BFTSMaRtConfiguration import net.corda.node.services.config.NotaryConfig @@ -187,6 +188,21 @@ class BFTNotaryServiceTests { } } + @Test + fun `notarise issue tx with time-window`() { + node.run { + val issueTx = signInitialTransaction(notary) { + setTimeWindow(services.clock.instant(), 30.seconds) + addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint) + } + val resultFuture = services.startFlow(NotaryFlow.Client(issueTx)).resultFuture + + mockNet.runNetwork() + val signatures = resultFuture.get() + verifySignatures(signatures, issueTx.id) + } + } + @Test fun `transactions can be re-notarised outside their time window`() { node.run { diff --git a/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt index fbdde16e6d..e6d98c3e20 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt @@ -20,10 +20,11 @@ import net.corda.core.identity.Party import net.corda.core.internal.concurrent.map import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.getOrThrow -import net.corda.testing.contracts.DummyContract +import net.corda.core.utilities.seconds import net.corda.testing.core.DUMMY_BANK_A_NAME -import net.corda.testing.core.dummyCommand import net.corda.testing.core.singleIdentity +import net.corda.testing.contracts.DummyContract +import net.corda.testing.core.dummyCommand import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.driver import net.corda.testing.driver.internal.InProcessImpl @@ -80,6 +81,23 @@ class RaftNotaryServiceTests : IntegrationTest() { } } + @Test + fun `notarise issue tx with time-window`() { + driver(DriverParameters( + startNodesInProcess = true, + extraCordappPackagesToScan = listOf("net.corda.testing.contracts"), + notarySpecs = listOf(NotarySpec(notaryName, cluster = ClusterSpec.Raft(clusterSize = 3))) + )) { + val bankA = startNode(providedName = DUMMY_BANK_A_NAME).map { (it as InProcessImpl) }.getOrThrow() + val issueTx = bankA.database.transaction { + val builder = DummyContract.generateInitial(Random().nextInt(), defaultNotaryIdentity, bankA.services.myInfo.singleIdentity().ref(0)) + .setTimeWindow(bankA.services.clock.instant(), 30.seconds) + bankA.services.signInitialTransaction(builder) + } + bankA.startFlow(NotaryFlow.Client(issueTx)).getOrThrow() + } + } + private fun issueState(nodeHandle: InProcessImpl, notary: Party): StateAndRef<*> { return nodeHandle.database.transaction { diff --git a/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt index 04b336e6d5..ed85b659d1 100644 --- a/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/transactions/ValidatingNotaryServiceTests.kt @@ -179,6 +179,18 @@ class ValidatingNotaryServiceTests { assertThat(ex.error).isInstanceOf(NotaryError.TimeWindowInvalid::class.java) } + @Test + fun `notarise issue tx with time-window`() { + val stx = run { + val tx = DummyContract.generateInitial(Random().nextInt(), notary, alice.ref(0)) + .setTimeWindow(Instant.now(), 30.seconds) + aliceNode.services.signInitialTransaction(tx) + } + + val sig = runNotaryClient(stx).getOrThrow().single() + assertEquals(sig.by, notary.owningKey) + } + @Test fun `should sign identical transaction multiple times (notarisation is idempotent)`() { val stx = run { diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/model/IssuerModel.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/model/IssuerModel.kt index 8b41bff7d8..2d4b281453 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/model/IssuerModel.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/model/IssuerModel.kt @@ -19,17 +19,21 @@ import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.finance.flows.CashConfigDataFlow import tornadofx.* +import java.util.* class IssuerModel { + + private val defaultCurrency = Currency.getInstance("USD") + private val proxy by observableValue(NodeMonitorModel::proxyObservable) - private val cashAppConfiguration = proxy.map { it?.startFlow(::CashConfigDataFlow)?.returnValue?.getOrThrow() } - val supportedCurrencies = ChosenList(cashAppConfiguration.map { it?.supportedCurrencies?.observable() ?: FXCollections.emptyObservableList() }) - val currencyTypes = ChosenList(cashAppConfiguration.map { it?.issuableCurrencies?.observable() ?: FXCollections.emptyObservableList() }) + private val cashAppConfiguration = proxy.map { it?.cordaRPCOps?.startFlow(::CashConfigDataFlow)?.returnValue?.getOrThrow() } + val supportedCurrencies = ChosenList(cashAppConfiguration.map { it?.supportedCurrencies?.observable() ?: FXCollections.singletonObservableList(defaultCurrency) }, "supportedCurrencies") + val currencyTypes = ChosenList(cashAppConfiguration.map { it?.issuableCurrencies?.observable() ?: FXCollections.emptyObservableList() }, "currencyTypes") val transactionTypes = ChosenList(cashAppConfiguration.map { if (it?.issuableCurrencies?.isNotEmpty() == true) CashTransaction.values().asList().observable() else listOf(CashTransaction.Pay).observable() - }) + }, "transactionTypes") } diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/model/MembershipListModel.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/model/MembershipListModel.kt index e612b142c8..42f6bbf3ea 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/model/MembershipListModel.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/model/MembershipListModel.kt @@ -24,7 +24,7 @@ import net.corda.sample.businessnetwork.membership.flow.ObtainMembershipListCont class MembershipListModel { private val proxy by observableValue(NodeMonitorModel::proxyObservable) - private val members = proxy.map { it?.startFlow(::ObtainMembershipListContentFlow, IOUFlow.allowedMembershipName)?.returnValue?.getOrThrow() } + private val members = proxy.map { it?.cordaRPCOps?.startFlow(::ObtainMembershipListContentFlow, IOUFlow.allowedMembershipName)?.returnValue?.getOrThrow() } private val observableValueOfParties = members.map { FXCollections.observableList(it?.toList() ?: emptyList()) } diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/views/MainView.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/views/MainView.kt index e6f6409dd1..852acc438f 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/views/MainView.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/views/MainView.kt @@ -16,9 +16,7 @@ import javafx.beans.binding.Bindings import javafx.geometry.Insets import javafx.geometry.Pos import javafx.scene.Parent -import javafx.scene.control.ContentDisplay -import javafx.scene.control.MenuButton -import javafx.scene.control.MenuItem +import javafx.scene.control.* import javafx.scene.input.MouseButton import javafx.scene.layout.BorderPane import javafx.scene.layout.StackPane @@ -27,10 +25,7 @@ import javafx.scene.text.Font import javafx.scene.text.TextAlignment import javafx.stage.Stage import javafx.stage.WindowEvent -import net.corda.client.jfx.model.NetworkIdentityModel -import net.corda.client.jfx.model.objectProperty -import net.corda.client.jfx.model.observableList -import net.corda.client.jfx.model.observableValue +import net.corda.client.jfx.model.* import net.corda.client.jfx.utils.ChosenList import net.corda.client.jfx.utils.map import net.corda.explorer.formatters.PartyNameFormatter @@ -48,11 +43,14 @@ class MainView : View(WINDOW_TITLE) { private val exit by fxid() private val sidebar by fxid() private val selectionBorderPane by fxid() + private val mainSplitPane by fxid() + private val rpcWarnLabel by fxid