mirror of
https://github.com/corda/corda.git
synced 2025-02-15 15:12:46 +00:00
Merge remote-tracking branch 'remotes/open/master' into feature/vkolomeyko/os-merge
# Conflicts: # node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SharedContexts.kt # node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt # tools/explorer/src/main/kotlin/net/corda/explorer/views/SearchField.kt # tools/explorer/src/main/kotlin/net/corda/explorer/views/TransactionViewer.kt
This commit is contained in:
commit
355c65ad62
@ -99,7 +99,7 @@ class NodeMonitorModelTest : IntegrationTest() {
|
|||||||
networkMapUpdates = monitor.networkMap.bufferUntilSubscribed()
|
networkMapUpdates = monitor.networkMap.bufferUntilSubscribed()
|
||||||
|
|
||||||
monitor.register(aliceNodeHandle.rpcAddress, cashUser.username, cashUser.password)
|
monitor.register(aliceNodeHandle.rpcAddress, cashUser.username, cashUser.password)
|
||||||
rpc = monitor.proxyObservable.value!!
|
rpc = monitor.proxyObservable.value!!.cordaRPCOps
|
||||||
notaryParty = defaultNotaryIdentity
|
notaryParty = defaultNotaryIdentity
|
||||||
|
|
||||||
val bobNodeHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(cashUser)).getOrThrow()
|
val bobNodeHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(cashUser)).getOrThrow()
|
||||||
@ -107,7 +107,7 @@ class NodeMonitorModelTest : IntegrationTest() {
|
|||||||
val monitorBob = NodeMonitorModel()
|
val monitorBob = NodeMonitorModel()
|
||||||
stateMachineUpdatesBob = monitorBob.stateMachineUpdates.bufferUntilSubscribed()
|
stateMachineUpdatesBob = monitorBob.stateMachineUpdates.bufferUntilSubscribed()
|
||||||
monitorBob.register(bobNodeHandle.rpcAddress, cashUser.username, cashUser.password)
|
monitorBob.register(bobNodeHandle.rpcAddress, cashUser.username, cashUser.password)
|
||||||
rpcBob = monitorBob.proxyObservable.value!!
|
rpcBob = monitorBob.proxyObservable.value!!.cordaRPCOps
|
||||||
runTest()
|
runTest()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -12,6 +12,7 @@ package net.corda.client.jfx.model
|
|||||||
|
|
||||||
import javafx.collections.FXCollections
|
import javafx.collections.FXCollections
|
||||||
import javafx.collections.ObservableList
|
import javafx.collections.ObservableList
|
||||||
|
import net.corda.client.jfx.utils.distinctBy
|
||||||
import net.corda.client.jfx.utils.fold
|
import net.corda.client.jfx.utils.fold
|
||||||
import net.corda.client.jfx.utils.map
|
import net.corda.client.jfx.utils.map
|
||||||
import net.corda.core.contracts.ContractState
|
import net.corda.core.contracts.ContractState
|
||||||
@ -41,7 +42,7 @@ class ContractStateModel {
|
|||||||
val cashStates: ObservableList<StateAndRef<Cash.State>> = cashStatesDiff.fold(FXCollections.observableArrayList()) { list: MutableList<StateAndRef<Cash.State>>, statesDiff ->
|
val cashStates: ObservableList<StateAndRef<Cash.State>> = cashStatesDiff.fold(FXCollections.observableArrayList()) { list: MutableList<StateAndRef<Cash.State>>, statesDiff ->
|
||||||
list.removeIf { it in statesDiff.removed }
|
list.removeIf { it in statesDiff.removed }
|
||||||
list.addAll(statesDiff.added)
|
list.addAll(statesDiff.added)
|
||||||
}
|
}.distinctBy { it.ref }
|
||||||
|
|
||||||
val cash = cashStates.map { it.state.data.amount }
|
val cash = cashStates.map { it.state.data.amount }
|
||||||
|
|
||||||
|
@ -14,12 +14,8 @@ import com.github.benmanes.caffeine.cache.Caffeine
|
|||||||
import javafx.beans.value.ObservableValue
|
import javafx.beans.value.ObservableValue
|
||||||
import javafx.collections.FXCollections
|
import javafx.collections.FXCollections
|
||||||
import javafx.collections.ObservableList
|
import javafx.collections.ObservableList
|
||||||
import net.corda.client.jfx.utils.ChosenList
|
import net.corda.client.jfx.utils.*
|
||||||
import net.corda.client.jfx.utils.filterNotNull
|
|
||||||
import net.corda.client.jfx.utils.fold
|
|
||||||
import net.corda.client.jfx.utils.map
|
|
||||||
import net.corda.core.identity.AnonymousParty
|
import net.corda.core.identity.AnonymousParty
|
||||||
import net.corda.core.identity.Party
|
|
||||||
import net.corda.core.node.NodeInfo
|
import net.corda.core.node.NodeInfo
|
||||||
import net.corda.core.node.services.NetworkMapCache.MapChange
|
import net.corda.core.node.services.NetworkMapCache.MapChange
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
@ -45,13 +41,13 @@ class NetworkIdentityModel {
|
|||||||
|
|
||||||
private val identityCache = Caffeine.newBuilder()
|
private val identityCache = Caffeine.newBuilder()
|
||||||
.build<PublicKey, ObservableValue<NodeInfo?>>({ publicKey ->
|
.build<PublicKey, ObservableValue<NodeInfo?>>({ publicKey ->
|
||||||
publicKey?.let { rpcProxy.map { it?.nodeInfoFromParty(AnonymousParty(publicKey)) } }
|
publicKey.let { rpcProxy.map { it?.cordaRPCOps?.nodeInfoFromParty(AnonymousParty(publicKey)) } }
|
||||||
})
|
})
|
||||||
val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.notaryIdentities() ?: emptyList()) })
|
val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.cordaRPCOps?.notaryIdentities() ?: emptyList()) }, "notaries")
|
||||||
val notaryNodes: ObservableList<NodeInfo> = notaries.map { rpcProxy.value?.nodeInfoFromParty(it) }.filterNotNull()
|
val notaryNodes: ObservableList<NodeInfo> = notaries.map { rpcProxy.value?.cordaRPCOps?.nodeInfoFromParty(it) }.filterNotNull()
|
||||||
val parties: ObservableList<NodeInfo> = networkIdentities
|
val parties: ObservableList<NodeInfo> = networkIdentities
|
||||||
.filtered { it.legalIdentities.all { it !in notaries } }
|
.filtered { it.legalIdentities.all { it !in notaries } }.unique()
|
||||||
val myIdentity = rpcProxy.map { it?.nodeInfo()?.legalIdentitiesAndCerts?.first()?.party }
|
val myIdentity = rpcProxy.map { it?.cordaRPCOps?.nodeInfo()?.legalIdentitiesAndCerts?.first()?.party }
|
||||||
|
|
||||||
fun partyFromPublicKey(publicKey: PublicKey): ObservableValue<NodeInfo?> = identityCache[publicKey]!!
|
fun partyFromPublicKey(publicKey: PublicKey): ObservableValue<NodeInfo?> = identityCache[publicKey]!!
|
||||||
}
|
}
|
||||||
|
@ -10,12 +10,16 @@
|
|||||||
|
|
||||||
package net.corda.client.jfx.model
|
package net.corda.client.jfx.model
|
||||||
|
|
||||||
|
import com.sun.javafx.application.PlatformImpl
|
||||||
|
import javafx.application.Platform
|
||||||
import javafx.beans.property.SimpleObjectProperty
|
import javafx.beans.property.SimpleObjectProperty
|
||||||
import net.corda.client.rpc.CordaRPCClient
|
import net.corda.client.rpc.CordaRPCClient
|
||||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||||
|
import net.corda.client.rpc.CordaRPCConnection
|
||||||
import net.corda.core.contracts.ContractState
|
import net.corda.core.contracts.ContractState
|
||||||
import net.corda.core.flows.StateMachineRunId
|
import net.corda.core.flows.StateMachineRunId
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
|
import net.corda.core.internal.staticField
|
||||||
import net.corda.core.messaging.*
|
import net.corda.core.messaging.*
|
||||||
import net.corda.core.node.services.NetworkMapCache.MapChange
|
import net.corda.core.node.services.NetworkMapCache.MapChange
|
||||||
import net.corda.core.node.services.Vault
|
import net.corda.core.node.services.Vault
|
||||||
@ -25,9 +29,14 @@ import net.corda.core.node.services.vault.PageSpecification
|
|||||||
import net.corda.core.node.services.vault.QueryCriteria
|
import net.corda.core.node.services.vault.QueryCriteria
|
||||||
import net.corda.core.transactions.SignedTransaction
|
import net.corda.core.transactions.SignedTransaction
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.core.utilities.seconds
|
import net.corda.core.utilities.seconds
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
|
import rx.Subscription
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
|
||||||
data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val message: String) {
|
data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val message: String) {
|
||||||
companion object {
|
companion object {
|
||||||
@ -44,6 +53,7 @@ data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val mess
|
|||||||
*/
|
*/
|
||||||
class NodeMonitorModel {
|
class NodeMonitorModel {
|
||||||
|
|
||||||
|
private val retryableStateMachineUpdatesSubject = PublishSubject.create<StateMachineUpdate>()
|
||||||
private val stateMachineUpdatesSubject = PublishSubject.create<StateMachineUpdate>()
|
private val stateMachineUpdatesSubject = PublishSubject.create<StateMachineUpdate>()
|
||||||
private val vaultUpdatesSubject = PublishSubject.create<Vault.Update<ContractState>>()
|
private val vaultUpdatesSubject = PublishSubject.create<Vault.Update<ContractState>>()
|
||||||
private val transactionsSubject = PublishSubject.create<SignedTransaction>()
|
private val transactionsSubject = PublishSubject.create<SignedTransaction>()
|
||||||
@ -58,27 +68,76 @@ class NodeMonitorModel {
|
|||||||
val progressTracking: Observable<ProgressTrackingEvent> = progressTrackingSubject
|
val progressTracking: Observable<ProgressTrackingEvent> = progressTrackingSubject
|
||||||
val networkMap: Observable<MapChange> = networkMapSubject
|
val networkMap: Observable<MapChange> = networkMapSubject
|
||||||
|
|
||||||
val proxyObservable = SimpleObjectProperty<CordaRPCOps?>()
|
val proxyObservable = SimpleObjectProperty<CordaRPCOpsWrapper?>()
|
||||||
lateinit var notaryIdentities: List<Party>
|
lateinit var notaryIdentities: List<Party>
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
val logger = contextLogger()
|
||||||
|
|
||||||
|
private fun runLaterIfInitialized(op: () -> Unit) {
|
||||||
|
|
||||||
|
val initialized = PlatformImpl::class.java.staticField<AtomicBoolean>("initialized")
|
||||||
|
|
||||||
|
// Only execute using "runLater()" if JavaFX been initialized.
|
||||||
|
// It may not be initialized in the unit test.
|
||||||
|
if(initialized.value.get()) {
|
||||||
|
Platform.runLater(op)
|
||||||
|
} else {
|
||||||
|
op()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is needed as JavaFX listener framework attempts to call `equals()` before dispatching notification change.
|
||||||
|
* And calling `CordaRPCOps.equals()` results in (unhandled) remote call.
|
||||||
|
*/
|
||||||
|
class CordaRPCOpsWrapper(val cordaRPCOps: CordaRPCOps)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register for updates to/from a given vault.
|
* Register for updates to/from a given vault.
|
||||||
* TODO provide an unsubscribe mechanism
|
* TODO provide an unsubscribe mechanism
|
||||||
*/
|
*/
|
||||||
fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
|
fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
|
||||||
val client = CordaRPCClient(
|
|
||||||
nodeHostAndPort,
|
|
||||||
object : CordaRPCClientConfiguration {
|
|
||||||
override val connectionMaxRetryInterval = 10.seconds
|
|
||||||
}
|
|
||||||
)
|
|
||||||
val connection = client.start(username, password)
|
|
||||||
val proxy = connection.proxy
|
|
||||||
notaryIdentities = proxy.notaryIdentities()
|
|
||||||
|
|
||||||
val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed()
|
// `retryableStateMachineUpdatesSubject` will change it's upstream subscriber in case of RPC connection failure, this `Observable` should
|
||||||
|
// never produce an error.
|
||||||
|
// `stateMachineUpdatesSubject` will stay firmly subscribed to `retryableStateMachineUpdatesSubject`
|
||||||
|
retryableStateMachineUpdatesSubject.subscribe(stateMachineUpdatesSubject)
|
||||||
|
|
||||||
|
// Proxy may change during re-connect, ensure that subject wiring accurately reacts to this activity.
|
||||||
|
proxyObservable.addListener { _, _, wrapper ->
|
||||||
|
if(wrapper != null) {
|
||||||
|
val proxy = wrapper.cordaRPCOps
|
||||||
|
// Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
|
||||||
|
val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
|
||||||
|
PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE))
|
||||||
|
val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ ->
|
||||||
|
statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED
|
||||||
|
}.toSet()
|
||||||
|
val consumedStates = statesSnapshot.states.toSet() - unconsumedStates
|
||||||
|
val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates)
|
||||||
|
vaultUpdates.startWith(initialVaultUpdate).subscribe({ vaultUpdatesSubject.onNext(it) }, {})
|
||||||
|
|
||||||
|
// Transactions
|
||||||
|
val (transactions, newTransactions) = proxy.internalVerifiedTransactionsFeed()
|
||||||
|
newTransactions.startWith(transactions).subscribe({ transactionsSubject.onNext(it) }, {})
|
||||||
|
|
||||||
|
// SM -> TX mapping
|
||||||
|
val (smTxMappings, futureSmTxMappings) = proxy.stateMachineRecordedTransactionMappingFeed()
|
||||||
|
futureSmTxMappings.startWith(smTxMappings).subscribe({ stateMachineTransactionMappingSubject.onNext(it) }, {})
|
||||||
|
|
||||||
|
// Parties on network
|
||||||
|
val (parties, futurePartyUpdate) = proxy.networkMapFeed()
|
||||||
|
futurePartyUpdate.startWith(parties.map { MapChange.Added(it) }).subscribe({ networkMapSubject.onNext(it) }, {})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val stateMachines = performRpcReconnect(nodeHostAndPort, username, password)
|
||||||
|
|
||||||
// Extract the flow tracking stream
|
// Extract the flow tracking stream
|
||||||
// TODO is there a nicer way of doing this? Stream of streams in general results in code like this...
|
// TODO is there a nicer way of doing this? Stream of streams in general results in code like this...
|
||||||
|
// TODO `progressTrackingSubject` doesn't seem to be used anymore - should it be removed?
|
||||||
val currentProgressTrackerUpdates = stateMachines.mapNotNull { stateMachine ->
|
val currentProgressTrackerUpdates = stateMachines.mapNotNull { stateMachine ->
|
||||||
ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachine)
|
ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachine)
|
||||||
}
|
}
|
||||||
@ -92,33 +151,74 @@ class NodeMonitorModel {
|
|||||||
|
|
||||||
// We need to retry, because when flow errors, we unsubscribe from progressTrackingSubject. So we end up with stream of state machine updates and no progress trackers.
|
// We need to retry, because when flow errors, we unsubscribe from progressTrackingSubject. So we end up with stream of state machine updates and no progress trackers.
|
||||||
futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.retry().subscribe(progressTrackingSubject)
|
futureProgressTrackerUpdates.startWith(currentProgressTrackerUpdates).flatMap { it }.retry().subscribe(progressTrackingSubject)
|
||||||
|
}
|
||||||
|
|
||||||
// Now the state machines
|
private fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): List<StateMachineInfo> {
|
||||||
val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) }
|
|
||||||
stateMachineUpdates.startWith(currentStateMachines).subscribe(stateMachineUpdatesSubject)
|
|
||||||
|
|
||||||
// Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
|
val connection = establishConnectionWithRetry(nodeHostAndPort, username, password)
|
||||||
val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
|
val proxy = connection.proxy
|
||||||
PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE))
|
|
||||||
val unconsumedStates = statesSnapshot.states.filterIndexed { index, _ ->
|
|
||||||
statesSnapshot.statesMetadata[index].status == Vault.StateStatus.UNCONSUMED
|
|
||||||
}.toSet()
|
|
||||||
val consumedStates = statesSnapshot.states.toSet() - unconsumedStates
|
|
||||||
val initialVaultUpdate = Vault.Update(consumedStates, unconsumedStates)
|
|
||||||
vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject)
|
|
||||||
|
|
||||||
// Transactions
|
val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()
|
||||||
val (transactions, newTransactions) = proxy.internalVerifiedTransactionsFeed()
|
|
||||||
newTransactions.startWith(transactions).subscribe(transactionsSubject)
|
|
||||||
|
|
||||||
// SM -> TX mapping
|
val retryableStateMachineUpdatesSubscription: AtomicReference<Subscription?> = AtomicReference(null)
|
||||||
val (smTxMappings, futureSmTxMappings) = proxy.stateMachineRecordedTransactionMappingFeed()
|
val subscription: Subscription = stateMachineUpdatesRaw
|
||||||
futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject)
|
.startWith(stateMachineInfos.map { StateMachineUpdate.Added(it) })
|
||||||
|
.subscribe({ retryableStateMachineUpdatesSubject.onNext(it) }, {
|
||||||
|
// Terminate subscription such that nothing gets past this point to downstream Observables.
|
||||||
|
retryableStateMachineUpdatesSubscription.get()?.unsubscribe()
|
||||||
|
// Flag to everyone that proxy is no longer available.
|
||||||
|
runLaterIfInitialized { proxyObservable.set(null) }
|
||||||
|
// It is good idea to close connection to properly mark the end of it. During re-connect we will create a new
|
||||||
|
// client and a new connection, so no going back to this one. Also the server might be down, so we are
|
||||||
|
// force closing the connection to avoid propagation of notification to the server side.
|
||||||
|
connection.forceClose()
|
||||||
|
// Perform re-connect.
|
||||||
|
performRpcReconnect(nodeHostAndPort, username, password)
|
||||||
|
})
|
||||||
|
|
||||||
// Parties on network
|
retryableStateMachineUpdatesSubscription.set(subscription)
|
||||||
val (parties, futurePartyUpdate) = proxy.networkMapFeed()
|
runLaterIfInitialized { proxyObservable.set(CordaRPCOpsWrapper(proxy)) }
|
||||||
futurePartyUpdate.startWith(parties.map { MapChange.Added(it) }).subscribe(networkMapSubject)
|
notaryIdentities = proxy.notaryIdentities()
|
||||||
|
|
||||||
proxyObservable.set(proxy)
|
return stateMachineInfos
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String): CordaRPCConnection {
|
||||||
|
|
||||||
|
val retryInterval = 5.seconds
|
||||||
|
|
||||||
|
do {
|
||||||
|
val connection = try {
|
||||||
|
logger.info("Connecting to: $nodeHostAndPort")
|
||||||
|
val client = CordaRPCClient(
|
||||||
|
nodeHostAndPort,
|
||||||
|
object : CordaRPCClientConfiguration {
|
||||||
|
override val connectionMaxRetryInterval = retryInterval
|
||||||
|
}
|
||||||
|
)
|
||||||
|
val _connection = client.start(username, password)
|
||||||
|
// Check connection is truly operational before returning it.
|
||||||
|
val nodeInfo = _connection.proxy.nodeInfo()
|
||||||
|
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
|
||||||
|
_connection
|
||||||
|
} catch(secEx: ActiveMQSecurityException) {
|
||||||
|
// Happens when incorrect credentials provided - no point to retry connecting.
|
||||||
|
throw secEx
|
||||||
|
}
|
||||||
|
catch(th: Throwable) {
|
||||||
|
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||||
|
logger.info("Exception upon establishing connection: " + th.message)
|
||||||
|
null
|
||||||
|
}
|
||||||
|
|
||||||
|
if(connection != null) {
|
||||||
|
logger.info("Connection successfully established with: $nodeHostAndPort")
|
||||||
|
return connection
|
||||||
|
}
|
||||||
|
// Could not connect this time round - pause before giving another try.
|
||||||
|
Thread.sleep(retryInterval.toMillis())
|
||||||
|
} while (connection == null)
|
||||||
|
|
||||||
|
throw IllegalArgumentException("Never reaches here")
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -93,7 +93,7 @@ data class PartiallyResolvedTransaction(
|
|||||||
*/
|
*/
|
||||||
class TransactionDataModel {
|
class TransactionDataModel {
|
||||||
private val transactions by observable(NodeMonitorModel::transactions)
|
private val transactions by observable(NodeMonitorModel::transactions)
|
||||||
private val collectedTransactions = transactions.recordInSequence()
|
private val collectedTransactions = transactions.recordInSequence().distinctBy { it.id }
|
||||||
private val vaultUpdates by observable(NodeMonitorModel::vaultUpdates)
|
private val vaultUpdates by observable(NodeMonitorModel::vaultUpdates)
|
||||||
private val stateMap = vaultUpdates.fold(FXCollections.observableHashMap<StateRef, StateAndRef<ContractState>>()) { map, update ->
|
private val stateMap = vaultUpdates.fold(FXCollections.observableHashMap<StateRef, StateAndRef<ContractState>>()) { map, update ->
|
||||||
val states = update.consumed + update.produced
|
val states = update.consumed + update.produced
|
||||||
|
@ -31,7 +31,8 @@ import javafx.collections.ObservableListBase
|
|||||||
* The above will create a list that chooses and delegates to the appropriate filtered list based on the type of filter.
|
* The above will create a list that chooses and delegates to the appropriate filtered list based on the type of filter.
|
||||||
*/
|
*/
|
||||||
class ChosenList<E>(
|
class ChosenList<E>(
|
||||||
private val chosenListObservable: ObservableValue<out ObservableList<out E>>
|
private val chosenListObservable: ObservableValue<out ObservableList<out E>>,
|
||||||
|
private val logicalName: String? = null
|
||||||
) : ObservableListBase<E>() {
|
) : ObservableListBase<E>() {
|
||||||
|
|
||||||
private var currentList = chosenListObservable.value
|
private var currentList = chosenListObservable.value
|
||||||
@ -68,4 +69,7 @@ class ChosenList<E>(
|
|||||||
endChange()
|
endChange()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun toString(): String {
|
||||||
|
return "ChosenList: $logicalName"
|
||||||
|
}
|
||||||
}
|
}
|
@ -18,6 +18,7 @@ import javafx.beans.value.ObservableValue
|
|||||||
import javafx.collections.FXCollections
|
import javafx.collections.FXCollections
|
||||||
import javafx.collections.ObservableList
|
import javafx.collections.ObservableList
|
||||||
import javafx.collections.ObservableMap
|
import javafx.collections.ObservableMap
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
@ -25,6 +26,12 @@ import java.util.concurrent.TimeUnit
|
|||||||
* Simple utilities for converting an [rx.Observable] into a javafx [ObservableValue]/[ObservableList]
|
* Simple utilities for converting an [rx.Observable] into a javafx [ObservableValue]/[ObservableList]
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
private val logger = LoggerFactory.getLogger("ObservableFold")
|
||||||
|
|
||||||
|
private fun onError(th: Throwable) {
|
||||||
|
logger.debug("OnError when folding", th)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* [foldToObservableValue] takes an [rx.Observable] stream and creates an [ObservableValue] out of it.
|
* [foldToObservableValue] takes an [rx.Observable] stream and creates an [ObservableValue] out of it.
|
||||||
* @param initial The initial value of the returned observable.
|
* @param initial The initial value of the returned observable.
|
||||||
@ -33,11 +40,11 @@ import java.util.concurrent.TimeUnit
|
|||||||
*/
|
*/
|
||||||
fun <A, B> Observable<A>.foldToObservableValue(initial: B, folderFun: (A, B) -> B): ObservableValue<B> {
|
fun <A, B> Observable<A>.foldToObservableValue(initial: B, folderFun: (A, B) -> B): ObservableValue<B> {
|
||||||
val result = SimpleObjectProperty<B>(initial)
|
val result = SimpleObjectProperty<B>(initial)
|
||||||
subscribe {
|
subscribe ({
|
||||||
Platform.runLater {
|
Platform.runLater {
|
||||||
result.set(folderFun(it, result.get()))
|
result.set(folderFun(it, result.get()))
|
||||||
}
|
}
|
||||||
}
|
}, ::onError)
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -52,7 +59,7 @@ fun <T, R> Observable<T>.fold(accumulator: R, folderFun: (R, T) -> Unit): R {
|
|||||||
* This capture is fine, as [Platform.runLater] runs closures in order.
|
* This capture is fine, as [Platform.runLater] runs closures in order.
|
||||||
* The buffer is to avoid flooding FX thread with runnable.
|
* The buffer is to avoid flooding FX thread with runnable.
|
||||||
*/
|
*/
|
||||||
buffer(1, TimeUnit.SECONDS).subscribe {
|
buffer(1, TimeUnit.SECONDS).subscribe({
|
||||||
if (it.isNotEmpty()) {
|
if (it.isNotEmpty()) {
|
||||||
Platform.runLater {
|
Platform.runLater {
|
||||||
it.fold(accumulator) { list, item ->
|
it.fold(accumulator) { list, item ->
|
||||||
@ -61,7 +68,7 @@ fun <T, R> Observable<T>.fold(accumulator: R, folderFun: (R, T) -> Unit): R {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}, ::onError)
|
||||||
return accumulator
|
return accumulator
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -283,7 +283,7 @@ fun <A : Any, B : Any, K : Any> ObservableList<A>.leftOuterJoin(
|
|||||||
val rightTableMap = rightTable.associateByAggregation(rightToJoinKey)
|
val rightTableMap = rightTable.associateByAggregation(rightToJoinKey)
|
||||||
val joinedMap: ObservableMap<K, Pair<ObservableList<A>, ObservableList<B>>> =
|
val joinedMap: ObservableMap<K, Pair<ObservableList<A>, ObservableList<B>>> =
|
||||||
LeftOuterJoinedMap(leftTableMap, rightTableMap) { _, left, rightValue ->
|
LeftOuterJoinedMap(leftTableMap, rightTableMap) { _, left, rightValue ->
|
||||||
Pair(left, ChosenList(rightValue.map { it ?: FXCollections.emptyObservableList() }))
|
Pair(left, ChosenList(rightValue.map { it ?: FXCollections.emptyObservableList() }, "ChosenList from leftOuterJoin"))
|
||||||
}
|
}
|
||||||
return joinedMap
|
return joinedMap
|
||||||
}
|
}
|
||||||
@ -310,6 +310,10 @@ fun <T : Any> ObservableList<T>.unique(): ObservableList<T> {
|
|||||||
return AggregatedList(this, { it }, { key, _ -> key })
|
return AggregatedList(this, { it }, { key, _ -> key })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun <T : Any, K : Any> ObservableList<T>.distinctBy(toKey: (T) -> K): ObservableList<T> {
|
||||||
|
return AggregatedList(this, toKey, { _, entryList -> entryList[0] })
|
||||||
|
}
|
||||||
|
|
||||||
fun ObservableValue<*>.isNotNull(): BooleanBinding {
|
fun ObservableValue<*>.isNotNull(): BooleanBinding {
|
||||||
return Bindings.createBooleanBinding({ this.value != null }, arrayOf(this))
|
return Bindings.createBooleanBinding({ this.value != null }, arrayOf(this))
|
||||||
}
|
}
|
||||||
|
@ -371,7 +371,15 @@ class RPCClientProxyHandler(
|
|||||||
interrupt()
|
interrupt()
|
||||||
join(1000)
|
join(1000)
|
||||||
}
|
}
|
||||||
sessionFactory?.close()
|
|
||||||
|
if (notify) {
|
||||||
|
// This is going to send remote message, see `org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.doCleanUp()`.
|
||||||
|
sessionFactory?.close()
|
||||||
|
} else {
|
||||||
|
// This performs a cheaper and faster version of local cleanup.
|
||||||
|
sessionFactory?.cleanup()
|
||||||
|
}
|
||||||
|
|
||||||
reaperScheduledFuture?.cancel(false)
|
reaperScheduledFuture?.cancel(false)
|
||||||
observableContext.observableMap.invalidateAll()
|
observableContext.observableMap.invalidateAll()
|
||||||
reapObservables(notify)
|
reapObservables(notify)
|
||||||
@ -528,7 +536,11 @@ class RPCClientProxyHandler(
|
|||||||
val m = observableContext.observableMap.asMap()
|
val m = observableContext.observableMap.asMap()
|
||||||
m.keys.forEach { k ->
|
m.keys.forEach { k ->
|
||||||
observationExecutorPool.run(k) {
|
observationExecutorPool.run(k) {
|
||||||
m[k]?.onError(RPCException("Connection failure detected."))
|
try {
|
||||||
|
m[k]?.onError(RPCException("Connection failure detected."))
|
||||||
|
} catch (th: Throwable) {
|
||||||
|
log.error("Unexpected exception when RPC connection failure handling", th)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
observableContext.observableMap.invalidateAll()
|
observableContext.observableMap.invalidateAll()
|
||||||
|
@ -179,14 +179,21 @@ fun <T> Logger.logElapsedTime(label: String, body: () -> T): T = logElapsedTime(
|
|||||||
fun <T> logElapsedTime(label: String, logger: Logger? = null, body: () -> T): T {
|
fun <T> logElapsedTime(label: String, logger: Logger? = null, body: () -> T): T {
|
||||||
// Use nanoTime as it's monotonic.
|
// Use nanoTime as it's monotonic.
|
||||||
val now = System.nanoTime()
|
val now = System.nanoTime()
|
||||||
|
var failed = false
|
||||||
try {
|
try {
|
||||||
return body()
|
return body()
|
||||||
} finally {
|
}
|
||||||
|
catch (th: Throwable) {
|
||||||
|
failed = true
|
||||||
|
throw th
|
||||||
|
}
|
||||||
|
finally {
|
||||||
val elapsed = Duration.ofNanos(System.nanoTime() - now).toMillis()
|
val elapsed = Duration.ofNanos(System.nanoTime() - now).toMillis()
|
||||||
|
val msg = (if(failed) "Failed " else "") + "$label took $elapsed msec"
|
||||||
if (logger != null)
|
if (logger != null)
|
||||||
logger.info("$label took $elapsed msec")
|
logger.info(msg)
|
||||||
else
|
else
|
||||||
println("$label took $elapsed msec")
|
println(msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,6 +12,12 @@ else
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
# TODO: The PDF rendering is pretty ugly and can be improved a lot.
|
# TODO: The PDF rendering is pretty ugly and can be improved a lot.
|
||||||
|
echo "Generating PDF document ..."
|
||||||
make pdf
|
make pdf
|
||||||
mv build/pdf/corda-developer-site.pdf build/html/_static/corda-developer-site.pdf
|
|
||||||
|
echo "Generating HTML pages ..."
|
||||||
make html
|
make html
|
||||||
|
|
||||||
|
echo "Moving PDF file into place ..."
|
||||||
|
mv $PWD/build/pdf/corda-developer-site.pdf $PWD/build/html/_static/corda-developer-site.pdf
|
||||||
|
|
||||||
|
@ -10,9 +10,9 @@ import java.nio.ByteBuffer
|
|||||||
import kotlin.math.min
|
import kotlin.math.min
|
||||||
|
|
||||||
internal val serializeOutputStreamPool = LazyPool(
|
internal val serializeOutputStreamPool = LazyPool(
|
||||||
clear = ByteBufferOutputStream::reset,
|
clear = ByteBufferOutputStream::reset,
|
||||||
shouldReturnToPool = { it.size() < 256 * 1024 }, // Discard if it grew too large
|
shouldReturnToPool = { it.size() < 256 * 1024 }, // Discard if it grew too large
|
||||||
newInstance = { ByteBufferOutputStream(64 * 1024) })
|
newInstance = { ByteBufferOutputStream(64 * 1024) })
|
||||||
|
|
||||||
internal fun <T> byteArrayOutput(task: (ByteBufferOutputStream) -> T): ByteArray {
|
internal fun <T> byteArrayOutput(task: (ByteBufferOutputStream) -> T): ByteArray {
|
||||||
return serializeOutputStreamPool.run { underlying ->
|
return serializeOutputStreamPool.run { underlying ->
|
||||||
|
@ -12,16 +12,11 @@
|
|||||||
|
|
||||||
package net.corda.nodeapi.internal.serialization
|
package net.corda.nodeapi.internal.serialization
|
||||||
|
|
||||||
import net.corda.core.serialization.ClassWhitelist
|
|
||||||
import net.corda.core.serialization.SerializationContext
|
import net.corda.core.serialization.SerializationContext
|
||||||
import net.corda.core.serialization.SerializationDefaults
|
import net.corda.core.serialization.SerializationDefaults
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.amqpMagic
|
import net.corda.nodeapi.internal.serialization.amqp.amqpMagic
|
||||||
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
||||||
|
|
||||||
object QuasarWhitelist : ClassWhitelist {
|
|
||||||
override fun hasListed(type: Class<*>): Boolean = true
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Serialisation contexts for the server.
|
* Serialisation contexts for the server.
|
||||||
* These have been refactored into a separate file to prevent
|
* These have been refactored into a separate file to prevent
|
||||||
|
@ -12,10 +12,7 @@
|
|||||||
|
|
||||||
package net.corda.nodeapi.internal.serialization
|
package net.corda.nodeapi.internal.serialization
|
||||||
|
|
||||||
import net.corda.core.serialization.EncodingWhitelist
|
import net.corda.core.serialization.*
|
||||||
import net.corda.core.serialization.SerializationContext
|
|
||||||
import net.corda.core.serialization.SerializationDefaults
|
|
||||||
import net.corda.core.serialization.SerializationEncoding
|
|
||||||
import net.corda.nodeapi.internal.serialization.CordaSerializationEncoding.SNAPPY
|
import net.corda.nodeapi.internal.serialization.CordaSerializationEncoding.SNAPPY
|
||||||
import net.corda.nodeapi.internal.serialization.amqp.amqpMagic
|
import net.corda.nodeapi.internal.serialization.amqp.amqpMagic
|
||||||
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
import net.corda.nodeapi.internal.serialization.kryo.kryoMagic
|
||||||
@ -47,3 +44,7 @@ val AMQP_P2P_CONTEXT = SerializationContextImpl(amqpMagic,
|
|||||||
internal object AlwaysAcceptEncodingWhitelist : EncodingWhitelist {
|
internal object AlwaysAcceptEncodingWhitelist : EncodingWhitelist {
|
||||||
override fun acceptEncoding(encoding: SerializationEncoding) = true
|
override fun acceptEncoding(encoding: SerializationEncoding) = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
object QuasarWhitelist : ClassWhitelist {
|
||||||
|
override fun hasListed(type: Class<*>): Boolean = true
|
||||||
|
}
|
||||||
|
@ -105,11 +105,11 @@ abstract class AbstractAMQPSerializationScheme(
|
|||||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.X509CRLSerializer)
|
register(net.corda.nodeapi.internal.serialization.amqp.custom.X509CRLSerializer)
|
||||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.CertPathSerializer(this))
|
register(net.corda.nodeapi.internal.serialization.amqp.custom.CertPathSerializer(this))
|
||||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.StringBufferSerializer)
|
register(net.corda.nodeapi.internal.serialization.amqp.custom.StringBufferSerializer)
|
||||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.SimpleStringSerializer)
|
|
||||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.InputStreamSerializer)
|
register(net.corda.nodeapi.internal.serialization.amqp.custom.InputStreamSerializer)
|
||||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.BitSetSerializer(this))
|
register(net.corda.nodeapi.internal.serialization.amqp.custom.BitSetSerializer(this))
|
||||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.EnumSetSerializer(this))
|
register(net.corda.nodeapi.internal.serialization.amqp.custom.EnumSetSerializer(this))
|
||||||
register(net.corda.nodeapi.internal.serialization.amqp.custom.ContractAttachmentSerializer(this))
|
register(net.corda.nodeapi.internal.serialization.amqp.custom.ContractAttachmentSerializer(this))
|
||||||
|
registerNonDeterministicSerializers(factory)
|
||||||
}
|
}
|
||||||
for (whitelistProvider in serializationWhitelists) {
|
for (whitelistProvider in serializationWhitelists) {
|
||||||
factory.addToWhitelist(*whitelistProvider.whitelist.toTypedArray())
|
factory.addToWhitelist(*whitelistProvider.whitelist.toTypedArray())
|
||||||
@ -126,7 +126,15 @@ abstract class AbstractAMQPSerializationScheme(
|
|||||||
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
|
factory.registerExternal(CorDappCustomSerializer(customSerializer, factory))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Register the serializers which will be excluded from the DJVM.
|
||||||
|
*/
|
||||||
|
private fun registerNonDeterministicSerializers(factory: SerializerFactory) {
|
||||||
|
with(factory) {
|
||||||
|
register(net.corda.nodeapi.internal.serialization.amqp.custom.SimpleStringSerializer)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private val serializerFactoriesForContexts = ConcurrentHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>()
|
private val serializerFactoriesForContexts = ConcurrentHashMap<Pair<ClassWhitelist, ClassLoader>, SerializerFactory>()
|
||||||
|
@ -99,8 +99,9 @@ private val toStringHelper: String = Type.getInternalName(MoreObjects.ToStringHe
|
|||||||
*
|
*
|
||||||
* Equals/hashCode methods are not yet supported.
|
* Equals/hashCode methods are not yet supported.
|
||||||
*/
|
*/
|
||||||
class ClassCarpenter(cl: ClassLoader = Thread.currentThread().contextClassLoader,
|
class ClassCarpenter(cl: ClassLoader, val whitelist: ClassWhitelist) {
|
||||||
val whitelist: ClassWhitelist) {
|
constructor(whitelist: ClassWhitelist) : this(Thread.currentThread().contextClassLoader, whitelist)
|
||||||
|
|
||||||
// TODO: Generics.
|
// TODO: Generics.
|
||||||
// TODO: Sandbox the generated code when a security manager is in use.
|
// TODO: Sandbox the generated code when a security manager is in use.
|
||||||
// TODO: Generate equals/hashCode.
|
// TODO: Generate equals/hashCode.
|
||||||
|
@ -30,6 +30,7 @@ import net.corda.core.transactions.TransactionBuilder
|
|||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.Try
|
import net.corda.core.utilities.Try
|
||||||
import net.corda.core.utilities.getOrThrow
|
import net.corda.core.utilities.getOrThrow
|
||||||
|
import net.corda.core.utilities.seconds
|
||||||
import net.corda.node.internal.StartedNode
|
import net.corda.node.internal.StartedNode
|
||||||
import net.corda.node.services.config.BFTSMaRtConfiguration
|
import net.corda.node.services.config.BFTSMaRtConfiguration
|
||||||
import net.corda.node.services.config.NotaryConfig
|
import net.corda.node.services.config.NotaryConfig
|
||||||
@ -187,6 +188,21 @@ class BFTNotaryServiceTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `notarise issue tx with time-window`() {
|
||||||
|
node.run {
|
||||||
|
val issueTx = signInitialTransaction(notary) {
|
||||||
|
setTimeWindow(services.clock.instant(), 30.seconds)
|
||||||
|
addOutputState(DummyContract.SingleOwnerState(owner = info.singleIdentity()), DummyContract.PROGRAM_ID, AlwaysAcceptAttachmentConstraint)
|
||||||
|
}
|
||||||
|
val resultFuture = services.startFlow(NotaryFlow.Client(issueTx)).resultFuture
|
||||||
|
|
||||||
|
mockNet.runNetwork()
|
||||||
|
val signatures = resultFuture.get()
|
||||||
|
verifySignatures(signatures, issueTx.id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `transactions can be re-notarised outside their time window`() {
|
fun `transactions can be re-notarised outside their time window`() {
|
||||||
node.run {
|
node.run {
|
||||||
|
@ -20,10 +20,11 @@ import net.corda.core.identity.Party
|
|||||||
import net.corda.core.internal.concurrent.map
|
import net.corda.core.internal.concurrent.map
|
||||||
import net.corda.core.transactions.TransactionBuilder
|
import net.corda.core.transactions.TransactionBuilder
|
||||||
import net.corda.core.utilities.getOrThrow
|
import net.corda.core.utilities.getOrThrow
|
||||||
import net.corda.testing.contracts.DummyContract
|
import net.corda.core.utilities.seconds
|
||||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||||
import net.corda.testing.core.dummyCommand
|
|
||||||
import net.corda.testing.core.singleIdentity
|
import net.corda.testing.core.singleIdentity
|
||||||
|
import net.corda.testing.contracts.DummyContract
|
||||||
|
import net.corda.testing.core.dummyCommand
|
||||||
import net.corda.testing.driver.DriverParameters
|
import net.corda.testing.driver.DriverParameters
|
||||||
import net.corda.testing.driver.driver
|
import net.corda.testing.driver.driver
|
||||||
import net.corda.testing.driver.internal.InProcessImpl
|
import net.corda.testing.driver.internal.InProcessImpl
|
||||||
@ -80,6 +81,23 @@ class RaftNotaryServiceTests : IntegrationTest() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `notarise issue tx with time-window`() {
|
||||||
|
driver(DriverParameters(
|
||||||
|
startNodesInProcess = true,
|
||||||
|
extraCordappPackagesToScan = listOf("net.corda.testing.contracts"),
|
||||||
|
notarySpecs = listOf(NotarySpec(notaryName, cluster = ClusterSpec.Raft(clusterSize = 3)))
|
||||||
|
)) {
|
||||||
|
val bankA = startNode(providedName = DUMMY_BANK_A_NAME).map { (it as InProcessImpl) }.getOrThrow()
|
||||||
|
val issueTx = bankA.database.transaction {
|
||||||
|
val builder = DummyContract.generateInitial(Random().nextInt(), defaultNotaryIdentity, bankA.services.myInfo.singleIdentity().ref(0))
|
||||||
|
.setTimeWindow(bankA.services.clock.instant(), 30.seconds)
|
||||||
|
bankA.services.signInitialTransaction(builder)
|
||||||
|
}
|
||||||
|
bankA.startFlow(NotaryFlow.Client(issueTx)).getOrThrow()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private fun issueState(nodeHandle: InProcessImpl, notary: Party): StateAndRef<*> {
|
private fun issueState(nodeHandle: InProcessImpl, notary: Party): StateAndRef<*> {
|
||||||
return nodeHandle.database.transaction {
|
return nodeHandle.database.transaction {
|
||||||
|
|
||||||
|
@ -179,6 +179,18 @@ class ValidatingNotaryServiceTests {
|
|||||||
assertThat(ex.error).isInstanceOf(NotaryError.TimeWindowInvalid::class.java)
|
assertThat(ex.error).isInstanceOf(NotaryError.TimeWindowInvalid::class.java)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `notarise issue tx with time-window`() {
|
||||||
|
val stx = run {
|
||||||
|
val tx = DummyContract.generateInitial(Random().nextInt(), notary, alice.ref(0))
|
||||||
|
.setTimeWindow(Instant.now(), 30.seconds)
|
||||||
|
aliceNode.services.signInitialTransaction(tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
val sig = runNotaryClient(stx).getOrThrow().single()
|
||||||
|
assertEquals(sig.by, notary.owningKey)
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `should sign identical transaction multiple times (notarisation is idempotent)`() {
|
fun `should sign identical transaction multiple times (notarisation is idempotent)`() {
|
||||||
val stx = run {
|
val stx = run {
|
||||||
|
@ -19,17 +19,21 @@ import net.corda.core.messaging.startFlow
|
|||||||
import net.corda.core.utilities.getOrThrow
|
import net.corda.core.utilities.getOrThrow
|
||||||
import net.corda.finance.flows.CashConfigDataFlow
|
import net.corda.finance.flows.CashConfigDataFlow
|
||||||
import tornadofx.*
|
import tornadofx.*
|
||||||
|
import java.util.*
|
||||||
|
|
||||||
class IssuerModel {
|
class IssuerModel {
|
||||||
|
|
||||||
|
private val defaultCurrency = Currency.getInstance("USD")
|
||||||
|
|
||||||
private val proxy by observableValue(NodeMonitorModel::proxyObservable)
|
private val proxy by observableValue(NodeMonitorModel::proxyObservable)
|
||||||
private val cashAppConfiguration = proxy.map { it?.startFlow(::CashConfigDataFlow)?.returnValue?.getOrThrow() }
|
private val cashAppConfiguration = proxy.map { it?.cordaRPCOps?.startFlow(::CashConfigDataFlow)?.returnValue?.getOrThrow() }
|
||||||
val supportedCurrencies = ChosenList(cashAppConfiguration.map { it?.supportedCurrencies?.observable() ?: FXCollections.emptyObservableList() })
|
val supportedCurrencies = ChosenList(cashAppConfiguration.map { it?.supportedCurrencies?.observable() ?: FXCollections.singletonObservableList(defaultCurrency) }, "supportedCurrencies")
|
||||||
val currencyTypes = ChosenList(cashAppConfiguration.map { it?.issuableCurrencies?.observable() ?: FXCollections.emptyObservableList() })
|
val currencyTypes = ChosenList(cashAppConfiguration.map { it?.issuableCurrencies?.observable() ?: FXCollections.emptyObservableList() }, "currencyTypes")
|
||||||
|
|
||||||
val transactionTypes = ChosenList(cashAppConfiguration.map {
|
val transactionTypes = ChosenList(cashAppConfiguration.map {
|
||||||
if (it?.issuableCurrencies?.isNotEmpty() == true)
|
if (it?.issuableCurrencies?.isNotEmpty() == true)
|
||||||
CashTransaction.values().asList().observable()
|
CashTransaction.values().asList().observable()
|
||||||
else
|
else
|
||||||
listOf(CashTransaction.Pay).observable()
|
listOf(CashTransaction.Pay).observable()
|
||||||
})
|
}, "transactionTypes")
|
||||||
}
|
}
|
||||||
|
@ -16,9 +16,7 @@ import javafx.beans.binding.Bindings
|
|||||||
import javafx.geometry.Insets
|
import javafx.geometry.Insets
|
||||||
import javafx.geometry.Pos
|
import javafx.geometry.Pos
|
||||||
import javafx.scene.Parent
|
import javafx.scene.Parent
|
||||||
import javafx.scene.control.ContentDisplay
|
import javafx.scene.control.*
|
||||||
import javafx.scene.control.MenuButton
|
|
||||||
import javafx.scene.control.MenuItem
|
|
||||||
import javafx.scene.input.MouseButton
|
import javafx.scene.input.MouseButton
|
||||||
import javafx.scene.layout.BorderPane
|
import javafx.scene.layout.BorderPane
|
||||||
import javafx.scene.layout.StackPane
|
import javafx.scene.layout.StackPane
|
||||||
@ -27,10 +25,7 @@ import javafx.scene.text.Font
|
|||||||
import javafx.scene.text.TextAlignment
|
import javafx.scene.text.TextAlignment
|
||||||
import javafx.stage.Stage
|
import javafx.stage.Stage
|
||||||
import javafx.stage.WindowEvent
|
import javafx.stage.WindowEvent
|
||||||
import net.corda.client.jfx.model.NetworkIdentityModel
|
import net.corda.client.jfx.model.*
|
||||||
import net.corda.client.jfx.model.objectProperty
|
|
||||||
import net.corda.client.jfx.model.observableList
|
|
||||||
import net.corda.client.jfx.model.observableValue
|
|
||||||
import net.corda.client.jfx.utils.ChosenList
|
import net.corda.client.jfx.utils.ChosenList
|
||||||
import net.corda.client.jfx.utils.map
|
import net.corda.client.jfx.utils.map
|
||||||
import net.corda.explorer.formatters.PartyNameFormatter
|
import net.corda.explorer.formatters.PartyNameFormatter
|
||||||
@ -48,11 +43,14 @@ class MainView : View(WINDOW_TITLE) {
|
|||||||
private val exit by fxid<MenuItem>()
|
private val exit by fxid<MenuItem>()
|
||||||
private val sidebar by fxid<VBox>()
|
private val sidebar by fxid<VBox>()
|
||||||
private val selectionBorderPane by fxid<BorderPane>()
|
private val selectionBorderPane by fxid<BorderPane>()
|
||||||
|
private val mainSplitPane by fxid<SplitPane>()
|
||||||
|
private val rpcWarnLabel by fxid<Label>()
|
||||||
|
|
||||||
// Inject data.
|
// Inject data.
|
||||||
private val myIdentity by observableValue(NetworkIdentityModel::myIdentity)
|
private val myIdentity by observableValue(NetworkIdentityModel::myIdentity)
|
||||||
private val selectedView by objectProperty(CordaViewModel::selectedView)
|
private val selectedView by objectProperty(CordaViewModel::selectedView)
|
||||||
private val registeredViews by observableList(CordaViewModel::registeredViews)
|
private val registeredViews by observableList(CordaViewModel::registeredViews)
|
||||||
|
private val proxy by observableValue(NodeMonitorModel::proxyObservable)
|
||||||
|
|
||||||
private val menuItemCSS = "sidebar-menu-item"
|
private val menuItemCSS = "sidebar-menu-item"
|
||||||
private val menuItemArrowCSS = "sidebar-menu-item-arrow"
|
private val menuItemArrowCSS = "sidebar-menu-item-arrow"
|
||||||
@ -69,7 +67,7 @@ class MainView : View(WINDOW_TITLE) {
|
|||||||
// This needed to be declared val or else it will get GCed and listener unregistered.
|
// This needed to be declared val or else it will get GCed and listener unregistered.
|
||||||
val buttonStyle = ChosenList(selectedView.map { selected ->
|
val buttonStyle = ChosenList(selectedView.map { selected ->
|
||||||
if (selected == it) listOf(menuItemCSS, menuItemSelectedCSS).observable() else listOf(menuItemCSS).observable()
|
if (selected == it) listOf(menuItemCSS, menuItemSelectedCSS).observable() else listOf(menuItemCSS).observable()
|
||||||
})
|
}, "buttonStyle")
|
||||||
stackpane {
|
stackpane {
|
||||||
button(it.title) {
|
button(it.title) {
|
||||||
graphic = FontAwesomeIconView(it.icon).apply {
|
graphic = FontAwesomeIconView(it.icon).apply {
|
||||||
@ -103,5 +101,9 @@ class MainView : View(WINDOW_TITLE) {
|
|||||||
Bindings.bindContent(sidebar.children, menuItems)
|
Bindings.bindContent(sidebar.children, menuItems)
|
||||||
// Main view
|
// Main view
|
||||||
selectionBorderPane.centerProperty().bind(selectedView.map { it?.root })
|
selectionBorderPane.centerProperty().bind(selectedView.map { it?.root })
|
||||||
|
// Trigger depending on RPC connectivity status.
|
||||||
|
val proxyNotAvailable = proxy.map { it == null }
|
||||||
|
mainSplitPane.disableProperty().bind(proxyNotAvailable)
|
||||||
|
rpcWarnLabel.visibleProperty().bind(proxyNotAvailable)
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -51,7 +51,7 @@ class SearchField<T>(private val data: ObservableList<T>, vararg filterCriteria:
|
|||||||
filterCriteria.toMap()[category]?.invoke(data, text) == true
|
filterCriteria.toMap()[category]?.invoke(data, text) == true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, arrayOf<Observable>(textField.textProperty(), searchCategory.valueProperty(), textField.visibleProperty())))
|
}, arrayOf<Observable>(textField.textProperty(), searchCategory.valueProperty(), textField.visibleProperty())), "filteredData")
|
||||||
|
|
||||||
init {
|
init {
|
||||||
clearButton.setOnMouseClicked { event: MouseEvent ->
|
clearButton.setOnMouseClicked { event: MouseEvent ->
|
||||||
|
@ -12,6 +12,7 @@ package net.corda.explorer.views
|
|||||||
|
|
||||||
import de.jensd.fx.glyphs.fontawesome.FontAwesomeIcon
|
import de.jensd.fx.glyphs.fontawesome.FontAwesomeIcon
|
||||||
import javafx.beans.binding.Bindings
|
import javafx.beans.binding.Bindings
|
||||||
|
import javafx.beans.binding.ObjectBinding
|
||||||
import javafx.beans.value.ObservableValue
|
import javafx.beans.value.ObservableValue
|
||||||
import javafx.collections.ObservableList
|
import javafx.collections.ObservableList
|
||||||
import javafx.geometry.HPos
|
import javafx.geometry.HPos
|
||||||
@ -27,17 +28,13 @@ import javafx.scene.layout.BorderPane
|
|||||||
import javafx.scene.layout.Pane
|
import javafx.scene.layout.Pane
|
||||||
import javafx.scene.layout.VBox
|
import javafx.scene.layout.VBox
|
||||||
import net.corda.client.jfx.model.*
|
import net.corda.client.jfx.model.*
|
||||||
import net.corda.client.jfx.utils.filterNotNull
|
import net.corda.client.jfx.utils.*
|
||||||
import net.corda.client.jfx.utils.lift
|
|
||||||
import net.corda.client.jfx.utils.map
|
|
||||||
import net.corda.client.jfx.utils.sequence
|
|
||||||
import net.corda.core.contracts.*
|
import net.corda.core.contracts.*
|
||||||
import net.corda.core.crypto.SecureHash
|
import net.corda.core.crypto.SecureHash
|
||||||
import net.corda.core.crypto.toStringShort
|
import net.corda.core.crypto.toStringShort
|
||||||
import net.corda.core.identity.AbstractParty
|
import net.corda.core.identity.AbstractParty
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.transactions.SignedTransaction
|
|
||||||
import net.corda.core.transactions.WireTransaction
|
import net.corda.core.transactions.WireTransaction
|
||||||
import net.corda.core.utilities.toBase58String
|
import net.corda.core.utilities.toBase58String
|
||||||
import net.corda.sample.businessnetwork.iou.IOUState
|
import net.corda.sample.businessnetwork.iou.IOUState
|
||||||
@ -74,7 +71,7 @@ class TransactionViewer : CordaView("Transactions") {
|
|||||||
|
|
||||||
private var scrollPosition: Int = 0
|
private var scrollPosition: Int = 0
|
||||||
private lateinit var expander: ExpanderColumn<TransactionViewer.Transaction>
|
private lateinit var expander: ExpanderColumn<TransactionViewer.Transaction>
|
||||||
var txIdToScroll: SecureHash? = null // Passed as param.
|
private var txIdToScroll: SecureHash? = null // Passed as param.
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is what holds data for a single transaction node. Note how a lot of these are nullable as we often simply don't
|
* This is what holds data for a single transaction node. Note how a lot of these are nullable as we often simply don't
|
||||||
@ -149,7 +146,7 @@ class TransactionViewer : CordaView("Transactions") {
|
|||||||
resolvedInputs.map { it.state.data }.lift(),
|
resolvedInputs.map { it.state.data }.lift(),
|
||||||
resolvedOutputs.map { it.state.data }.lift())
|
resolvedOutputs.map { it.state.data }.lift())
|
||||||
)
|
)
|
||||||
}
|
}.distinctBy { it.id }
|
||||||
|
|
||||||
val searchField = SearchField(transactions,
|
val searchField = SearchField(transactions,
|
||||||
"Transaction ID" to { tx, s -> "${tx.id}".contains(s, true) },
|
"Transaction ID" to { tx, s -> "${tx.id}".contains(s, true) },
|
||||||
@ -206,7 +203,7 @@ class TransactionViewer : CordaView("Transactions") {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
column("Command type", Transaction::commandTypes).cellFormat { text = it.map { it.simpleName }.joinToString() }
|
column("Command type", Transaction::commandTypes).cellFormat { text = it.joinToString { it.simpleName } }
|
||||||
column("Total value", Transaction::totalValueEquiv).cellFormat {
|
column("Total value", Transaction::totalValueEquiv).cellFormat {
|
||||||
text = "${it.positivity.sign}${AmountFormatter.boring.format(it.amount)}"
|
text = "${it.positivity.sign}${AmountFormatter.boring.format(it.amount)}"
|
||||||
titleProperty.bind(reportingCurrency.map { "Total value ($it equiv)" })
|
titleProperty.bind(reportingCurrency.map { "Total value ($it equiv)" })
|
||||||
@ -228,9 +225,9 @@ class TransactionViewer : CordaView("Transactions") {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun ObservableList<List<ObservableValue<Party?>>>.formatJoinPartyNames(separator: String = ",", formatter: Formatter<CordaX500Name>): String {
|
private fun ObservableList<List<ObservableValue<Party?>>>.formatJoinPartyNames(separator: String = ",", formatter: Formatter<CordaX500Name>): String {
|
||||||
return flatten().map {
|
return flatten().mapNotNull {
|
||||||
it.value?.let { formatter.format(it.name) }
|
it.value?.let { formatter.format(it.name) }
|
||||||
}.filterNotNull().toSet().joinToString(separator)
|
}.toSet().joinToString(separator)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun ObservableList<StateAndRef<ContractState>>.getParties() = map { it.state.data.participants.map { it.owningKey.toKnownParty() } }
|
private fun ObservableList<StateAndRef<ContractState>>.getParties() = map { it.state.data.participants.map { it.owningKey.toKnownParty() } }
|
||||||
@ -244,8 +241,17 @@ class TransactionViewer : CordaView("Transactions") {
|
|||||||
init {
|
init {
|
||||||
right {
|
right {
|
||||||
label {
|
label {
|
||||||
val hash = SecureHash.randomSHA256()
|
val hashList = partiallyResolvedTransactions.map { it.id }
|
||||||
graphic = identicon(hash, 30.0)
|
val hashBinding = object : ObjectBinding<SecureHash>() {
|
||||||
|
init {
|
||||||
|
bind(hashList)
|
||||||
|
}
|
||||||
|
override fun computeValue(): SecureHash {
|
||||||
|
return if (hashList.isEmpty()) SecureHash.zeroHash
|
||||||
|
else hashList.fold(hashList[0], { one, another -> one.hashConcat(another) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
graphicProperty().bind(hashBinding.map { identicon(it, 30.0) })
|
||||||
textProperty().bind(Bindings.size(partiallyResolvedTransactions).map(Number::toString))
|
textProperty().bind(Bindings.size(partiallyResolvedTransactions).map(Number::toString))
|
||||||
BorderPane.setAlignment(this, Pos.BOTTOM_RIGHT)
|
BorderPane.setAlignment(this, Pos.BOTTOM_RIGHT)
|
||||||
}
|
}
|
||||||
@ -367,8 +373,7 @@ private fun calculateTotalEquiv(myIdentity: Party?,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// For issuing cash, if I am the issuer and not the owner (e.g. issuing cash to other party), count it as negative.
|
// For issuing cash, if I am the issuer and not the owner (e.g. issuing cash to other party), count it as negative.
|
||||||
val issuedAmount = if (inputs.isEmpty()) outputs.map { it as? Cash.State }
|
val issuedAmount = if (inputs.isEmpty()) outputs.mapNotNull { it as? Cash.State }
|
||||||
.filterNotNull()
|
|
||||||
.filter { it.amount.token.issuer.party.owningKey.toKnownParty().value == myIdentity && it.owner.owningKey.toKnownParty().value != myIdentity }
|
.filter { it.amount.token.issuer.party.owningKey.toKnownParty().value == myIdentity && it.owner.owningKey.toKnownParty().value != myIdentity }
|
||||||
.map { exchange(it.amount.withoutIssuer()).quantity }
|
.map { exchange(it.amount.withoutIssuer()).quantity }
|
||||||
.sum() else 0
|
.sum() else 0
|
||||||
|
@ -87,7 +87,7 @@ class CashViewer : CordaView("Cash") {
|
|||||||
null -> FXCollections.observableArrayList(leftPane)
|
null -> FXCollections.observableArrayList(leftPane)
|
||||||
else -> FXCollections.observableArrayList(leftPane, rightPane)
|
else -> FXCollections.observableArrayList(leftPane, rightPane)
|
||||||
}
|
}
|
||||||
})
|
}, "CashViewerSplitPane")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This holds the data for each row in the TreeTable.
|
* This holds the data for each row in the TreeTable.
|
||||||
|
@ -94,7 +94,7 @@ class NewTransaction : Fragment() {
|
|||||||
CashTransaction.Exit -> currencyTypes
|
CashTransaction.Exit -> currencyTypes
|
||||||
else -> FXCollections.emptyObservableList()
|
else -> FXCollections.emptyObservableList()
|
||||||
}
|
}
|
||||||
})
|
}, "NewTransactionCurrencyItems")
|
||||||
|
|
||||||
fun show(window: Window) {
|
fun show(window: Window) {
|
||||||
newTransactionDialog(window).showAndWait().ifPresent { request ->
|
newTransactionDialog(window).showAndWait().ifPresent { request ->
|
||||||
@ -106,9 +106,9 @@ class NewTransaction : Fragment() {
|
|||||||
show()
|
show()
|
||||||
}
|
}
|
||||||
val handle: FlowHandle<AbstractCashFlow.Result> = when (request) {
|
val handle: FlowHandle<AbstractCashFlow.Result> = when (request) {
|
||||||
is IssueAndPaymentRequest -> rpcProxy.value!!.startFlow(::CashIssueAndPaymentFlow, request)
|
is IssueAndPaymentRequest -> rpcProxy.value!!.cordaRPCOps.startFlow(::CashIssueAndPaymentFlow, request)
|
||||||
is PaymentRequest -> rpcProxy.value!!.startFlow(::CashPaymentFlow, request)
|
is PaymentRequest -> rpcProxy.value!!.cordaRPCOps.startFlow(::CashPaymentFlow, request)
|
||||||
is ExitRequest -> rpcProxy.value!!.startFlow(::CashExitFlow, request)
|
is ExitRequest -> rpcProxy.value!!.cordaRPCOps.startFlow(::CashExitFlow, request)
|
||||||
else -> throw IllegalArgumentException("Unexpected request type: $request")
|
else -> throw IllegalArgumentException("Unexpected request type: $request")
|
||||||
}
|
}
|
||||||
runAsync {
|
runAsync {
|
||||||
|
@ -34,3 +34,11 @@
|
|||||||
.corda-text-logo {
|
.corda-text-logo {
|
||||||
-fx-image: url("../images/Logo-04.png");
|
-fx-image: url("../images/Logo-04.png");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
.warning-label {
|
||||||
|
-fx-text-fill: red;
|
||||||
|
-fx-font-size: 14;
|
||||||
|
-fx-font-family: 'sans-serif';
|
||||||
|
-fx-font-weight: bold;
|
||||||
|
-fx-label-padding: 5;
|
||||||
|
}
|
@ -23,6 +23,9 @@
|
|||||||
<ImageView styleClass="corda-text-logo" fitHeight="35" preserveRatio="true" GridPane.hgrow="ALWAYS"
|
<ImageView styleClass="corda-text-logo" fitHeight="35" preserveRatio="true" GridPane.hgrow="ALWAYS"
|
||||||
fx:id="cordaLogo"/>
|
fx:id="cordaLogo"/>
|
||||||
|
|
||||||
|
<!-- Normally hidden warning label -->
|
||||||
|
<Label fx:id="rpcWarnLabel" styleClass="warning-label" text="Status: RPC connection not available" GridPane.columnIndex="1" visible="true"/>
|
||||||
|
|
||||||
<!-- User account menu -->
|
<!-- User account menu -->
|
||||||
<MenuButton fx:id="userButton" mnemonicParsing="false" GridPane.columnIndex="3">
|
<MenuButton fx:id="userButton" mnemonicParsing="false" GridPane.columnIndex="3">
|
||||||
<items>
|
<items>
|
||||||
@ -35,7 +38,7 @@
|
|||||||
</GridPane>
|
</GridPane>
|
||||||
</top>
|
</top>
|
||||||
<center>
|
<center>
|
||||||
<SplitPane id="mainSplitPane" dividerPositions="0.0">
|
<SplitPane fx:id="mainSplitPane" dividerPositions="0.0">
|
||||||
<VBox styleClass="sidebar" fx:id="sidebar" SplitPane.resizableWithParent="false">
|
<VBox styleClass="sidebar" fx:id="sidebar" SplitPane.resizableWithParent="false">
|
||||||
<StackPane>
|
<StackPane>
|
||||||
<Button fx:id="template" text="Template" styleClass="sidebar-menu-item"/>
|
<Button fx:id="template" text="Template" styleClass="sidebar-menu-item"/>
|
||||||
|
Loading…
x
Reference in New Issue
Block a user