mirror of
https://github.com/corda/corda.git
synced 2025-02-21 01:42:24 +00:00
Minor: node: fix inspector warnings and delete dead code.
This commit is contained in:
parent
1a86ac481f
commit
bea799c60d
@ -267,7 +267,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
// the identity key. But the infrastructure to make that easy isn't here yet.
|
||||
keyManagement = makeKeyManagementService()
|
||||
flowLogicFactory = initialiseFlowLogicFactory()
|
||||
scheduler = NodeSchedulerService(database, services, flowLogicFactory, unfinishedSchedules = busyNodeLatch)
|
||||
scheduler = NodeSchedulerService(services, flowLogicFactory, unfinishedSchedules = busyNodeLatch)
|
||||
|
||||
val tokenizableServices = mutableListOf(storage, net, vault, keyManagement, identity, platformClock, scheduler)
|
||||
makeAdvertisedServices(tokenizableServices)
|
||||
@ -417,8 +417,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||
}
|
||||
val address = networkMapAddress ?: info.address
|
||||
// Register for updates, even if we're the one running the network map.
|
||||
return sendNetworkMapRegistration(address).flatMap { response ->
|
||||
check(response.error == null) { "Unable to register with the network map service: ${response.error}" }
|
||||
return sendNetworkMapRegistration(address).flatMap { (error) ->
|
||||
check(error == null) { "Unable to register with the network map service: $error" }
|
||||
// The future returned addMapService will complete on the same executor as sendNetworkMapRegistration, namely the one used by net
|
||||
services.networkMapCache.addMapService(net, address, true, null)
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ abstract class AbstractNodeService(val services: ServiceHubInternal) : Singleton
|
||||
/**
|
||||
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
|
||||
* common boilerplate code. Exceptions are caught and passed to the provided consumer. If you just want a simple
|
||||
* acknowledgement response with no content, use [Ack].
|
||||
* acknowledgement response with no content, use [net.corda.core.messaging.Ack].
|
||||
*
|
||||
* @param topic the topic, without the default session ID postfix (".0).
|
||||
* @param handler a function to handle the deserialised request and return an optional response (if return type not Unit)
|
||||
@ -49,7 +49,7 @@ abstract class AbstractNodeService(val services: ServiceHubInternal) : Singleton
|
||||
/**
|
||||
* Register a handler for a message topic. In comparison to using net.addMessageHandler() this manages a lot of
|
||||
* common boilerplate code. Exceptions are propagated to the messaging layer. If you just want a simple
|
||||
* acknowledgement response with no content, use [Ack].
|
||||
* acknowledgement response with no content, use [net.corda.core.messaging.Ack].
|
||||
*
|
||||
* @param topic the topic, without the default session ID postfix (".0).
|
||||
* @param handler a function to handle the deserialised request and return an optional response (if return type not Unit).
|
||||
|
@ -15,8 +15,8 @@ interface MessagingServiceInternal : MessagingService {
|
||||
/**
|
||||
* Initiates shutdown: if called from a thread that isn't controlled by the executor passed to the constructor
|
||||
* then this will block until all in-flight messages have finished being handled and acknowledged. If called
|
||||
* from a thread that's a part of the [AffinityExecutor] given to the constructor, it returns immediately and
|
||||
* shutdown is asynchronous.
|
||||
* from a thread that's a part of the [net.corda.node.utilities.AffinityExecutor] given to the constructor,
|
||||
* it returns immediately and shutdown is asynchronous.
|
||||
*/
|
||||
fun stop()
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* A first pass of a simple [SchedulerService] that works with [MutableClock]s for testing, demonstrations and simulations
|
||||
* that also encompasses the [Vault] observer for processing transactions.
|
||||
* that also encompasses the [net.corda.core.node.services.Vault] observer for processing transactions.
|
||||
*
|
||||
* This will observe transactions as they are stored and schedule and unschedule activities based on the States consumed
|
||||
* or produced.
|
||||
@ -43,8 +43,7 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* activity. Only replace this for unit testing purposes. This is not the executor the [FlowLogic] is launched on.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class NodeSchedulerService(private val database: Database,
|
||||
private val services: ServiceHubInternal,
|
||||
class NodeSchedulerService(private val services: ServiceHubInternal,
|
||||
private val flowLogicRefFactory: FlowLogicRefFactory,
|
||||
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(),
|
||||
private val unfinishedSchedules: ReusableLatch = ReusableLatch())
|
||||
@ -129,11 +128,12 @@ class NodeSchedulerService(private val database: Database,
|
||||
}
|
||||
|
||||
/**
|
||||
* This method first cancels the [Future] for any pending action so that the [awaitWithDeadline] used below
|
||||
* drops through without running the action. We then create a new [Future] for the new action (so it too can be
|
||||
* cancelled), and then await the arrival of the scheduled time. If we reach the scheduled time (the deadline)
|
||||
* without the [Future] being cancelled then we run the scheduled action. Finally we remove that action from the
|
||||
* scheduled actions and recompute the next scheduled action.
|
||||
* This method first cancels the [java.util.concurrent.Future] for any pending action so that the
|
||||
* [awaitWithDeadline] used below drops through without running the action. We then create a new
|
||||
* [java.util.concurrent.Future] for the new action (so it too can be cancelled), and then await the arrival of the
|
||||
* scheduled time. If we reach the scheduled time (the deadline) without the [java.util.concurrent.Future] being
|
||||
* cancelled then we run the scheduled action. Finally we remove that action from the scheduled actions and
|
||||
* recompute the next scheduled action.
|
||||
*/
|
||||
internal fun rescheduleWakeUp() {
|
||||
// Note, we already have the mutex but we need the scope again here
|
||||
|
@ -25,7 +25,7 @@ class InMemoryIdentityService : SingletonSerializeAsToken(), IdentityService {
|
||||
private val nameToParties = ConcurrentHashMap<String, Party>()
|
||||
|
||||
override fun registerIdentity(party: Party) {
|
||||
log.trace { "Registering identity ${party}" }
|
||||
log.trace { "Registering identity $party" }
|
||||
keyToParties[party.owningKey] = party
|
||||
nameToParties[party.name] = party
|
||||
}
|
||||
|
@ -59,8 +59,8 @@ open class InMemoryNetworkMapCache : SingletonSerializeAsToken(), NetworkMapCach
|
||||
if (node != null) {
|
||||
return PartyInfo.Node(node)
|
||||
}
|
||||
for (entry in registeredNodes) {
|
||||
for (service in entry.value.advertisedServices) {
|
||||
for ((_, value) in registeredNodes) {
|
||||
for (service in value.advertisedServices) {
|
||||
if (service.identity == party) {
|
||||
return PartyInfo.Service(service)
|
||||
}
|
||||
|
@ -42,10 +42,11 @@ class DBTransactionMappingStorage : StateMachineRecordedTransactionMappingStorag
|
||||
}
|
||||
}
|
||||
|
||||
private val mutex = ThreadBox(object {
|
||||
private class InnerState {
|
||||
val stateMachineTransactionMap = TransactionMappingsMap()
|
||||
val updates = PublishSubject.create<StateMachineTransactionMapping>()
|
||||
})
|
||||
val updates = PublishSubject.create<StateMachineTransactionMapping>()!!
|
||||
}
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
|
||||
override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) {
|
||||
mutex.locked {
|
||||
|
@ -37,7 +37,7 @@ class DBTransactionStorage : TransactionStorage {
|
||||
|
||||
override fun addTransaction(transaction: SignedTransaction): Boolean {
|
||||
val recorded = synchronized(txStorage) {
|
||||
val old = txStorage.get(transaction.id)
|
||||
val old = txStorage[transaction.id]
|
||||
if (old == null) {
|
||||
txStorage.put(transaction.id, transaction)
|
||||
updatesPublisher.bufferUntilDatabaseCommit().onNext(transaction)
|
||||
@ -54,7 +54,7 @@ class DBTransactionStorage : TransactionStorage {
|
||||
|
||||
override fun getTransaction(id: SecureHash): SignedTransaction? {
|
||||
synchronized(txStorage) {
|
||||
return txStorage.get(id)
|
||||
return txStorage[id]
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -18,11 +18,11 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
*/
|
||||
@ThreadSafe
|
||||
class InMemoryStateMachineRecordedTransactionMappingStorage : StateMachineRecordedTransactionMappingStorage {
|
||||
|
||||
private val mutex = ThreadBox(object {
|
||||
private class InnerState {
|
||||
val stateMachineTransactionMap = HashMap<StateMachineRunId, HashSet<SecureHash>>()
|
||||
val updates = PublishSubject.create<StateMachineTransactionMapping>()
|
||||
})
|
||||
val updates = PublishSubject.create<StateMachineTransactionMapping>()!!
|
||||
}
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
|
||||
override fun addMapping(stateMachineRunId: StateMachineRunId, transactionId: SecureHash) {
|
||||
mutex.locked {
|
||||
|
@ -43,11 +43,9 @@ class DistributedImmutableMap<K : Any, V : Any>(val db: Database, tableName: Str
|
||||
|
||||
/** Gets a value for the given [Commands.Get.key] */
|
||||
fun get(commit: Commit<Commands.Get<K, V>>): V? {
|
||||
try {
|
||||
commit.use {
|
||||
val key = commit.operation().key
|
||||
return databaseTransaction(db) { map[key] }
|
||||
} finally {
|
||||
commit.close()
|
||||
}
|
||||
}
|
||||
|
||||
@ -57,7 +55,7 @@ class DistributedImmutableMap<K : Any, V : Any>(val db: Database, tableName: Str
|
||||
* @return map containing conflicting entries
|
||||
*/
|
||||
fun put(commit: Commit<Commands.PutAll<K, V>>): Map<K, V> {
|
||||
try {
|
||||
commit.use { commit ->
|
||||
val conflicts = LinkedHashMap<K, V>()
|
||||
databaseTransaction(db) {
|
||||
val entries = commit.operation().entries
|
||||
@ -66,16 +64,12 @@ class DistributedImmutableMap<K : Any, V : Any>(val db: Database, tableName: Str
|
||||
if (conflicts.isEmpty()) map.putAll(entries)
|
||||
}
|
||||
return conflicts
|
||||
} finally {
|
||||
commit.close()
|
||||
}
|
||||
}
|
||||
|
||||
fun size(commit: Commit<Commands.Size>): Int {
|
||||
try {
|
||||
commit.use { commit ->
|
||||
return databaseTransaction(db) { map.size }
|
||||
} finally {
|
||||
commit.close()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -66,15 +66,15 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
val configuration = RequeryConfiguration(dataSourceProperties)
|
||||
val session = configuration.sessionForModel(Models.VAULT)
|
||||
|
||||
private val mutex = ThreadBox(content = object {
|
||||
|
||||
val _updatesPublisher = PublishSubject.create<Vault.Update>()
|
||||
val _rawUpdatesPublisher = PublishSubject.create<Vault.Update>()
|
||||
val _updatesInDbTx = _updatesPublisher.wrapWithDatabaseTransaction().asObservable()
|
||||
private class InnerState {
|
||||
val _updatesPublisher = PublishSubject.create<Vault.Update>()!!
|
||||
val _rawUpdatesPublisher = PublishSubject.create<Vault.Update>()!!
|
||||
val _updatesInDbTx = _updatesPublisher.wrapWithDatabaseTransaction().asObservable()!!
|
||||
|
||||
// For use during publishing only.
|
||||
val updatesPublisher: rx.Observer<Vault.Update> get() = _updatesPublisher.bufferUntilDatabaseCommit().tee(_rawUpdatesPublisher)
|
||||
})
|
||||
}
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
|
||||
private fun recordUpdate(update: Vault.Update): Vault.Update {
|
||||
if (update != Vault.NoUpdate) {
|
||||
@ -329,7 +329,7 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
val issuerKeysStr = onlyFromIssuerParties?.fold("") { left, right -> left + "('${right.owningKey.toBase58String()}')," }?.dropLast(1)
|
||||
val issuerRefsStr = withIssuerRefs?.fold("") { left, right -> left + "('${right.bytes.toHexString()}')," }?.dropLast(1)
|
||||
|
||||
var stateAndRefs = mutableListOf<StateAndRef<T>>()
|
||||
val stateAndRefs = mutableListOf<StateAndRef<T>>()
|
||||
|
||||
// TODO: Need to provide a database provider independent means of performing this function.
|
||||
// We are using an H2 specific means of selecting a minimum set of rows that match a request amount of coins:
|
||||
@ -417,7 +417,7 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P
|
||||
override fun <T : ContractState> softLockedStates(lockId: UUID?): List<StateAndRef<T>> {
|
||||
val stateAndRefs =
|
||||
session.withTransaction(TransactionIsolation.REPEATABLE_READ) {
|
||||
var query = select(VaultSchema.VaultStates::class)
|
||||
val query = select(VaultSchema.VaultStates::class)
|
||||
.where(VaultSchema.VaultStates::stateStatus eq Vault.StateStatus.UNCONSUMED)
|
||||
.and(VaultSchema.VaultStates::contractStateClassName eq Cash.State::class.java.name)
|
||||
if (lockId != null)
|
||||
|
@ -19,12 +19,12 @@ class VaultSoftLockManager(val vault: VaultService, smm: StateMachineManager) {
|
||||
private val trackingFlowIds: MutableSet<UUID> = Collections.synchronizedSet(HashSet())
|
||||
|
||||
init {
|
||||
smm.changes.subscribe { change ->
|
||||
if (change.addOrRemove == AddOrRemove.REMOVE && trackingFlowIds.contains(change.id.uuid)) {
|
||||
log.trace { "${change.addOrRemove} Flow name ${change.logic.javaClass} with id ${change.id}" }
|
||||
unregisterSoftLocks(change.id, change.logic)
|
||||
smm.changes.subscribe { (logic, addOrRemove, id) ->
|
||||
if (addOrRemove == AddOrRemove.REMOVE && trackingFlowIds.contains(id.uuid)) {
|
||||
log.trace { "$addOrRemove Flow name ${logic.javaClass} with id $id" }
|
||||
unregisterSoftLocks(id, logic)
|
||||
}
|
||||
trackingFlowIds.remove(change.id.uuid)
|
||||
trackingFlowIds.remove(id.uuid)
|
||||
}
|
||||
|
||||
// Discussion
|
||||
|
@ -111,8 +111,6 @@ interface AffinityExecutor : Executor {
|
||||
runnable.run()
|
||||
}
|
||||
|
||||
val taskQueueSize: Int get() = commandQ.size
|
||||
|
||||
override fun flush() {
|
||||
throw UnsupportedOperationException()
|
||||
}
|
||||
|
@ -33,14 +33,6 @@ abstract class MutableClock : Clock() {
|
||||
|
||||
private val _version = AtomicLong(0L)
|
||||
|
||||
/**
|
||||
* This tracks how many direct mutations of "now" have occurred for this [Clock], but not the passage of time.
|
||||
*
|
||||
* It starts at zero, and increments by one per mutation.
|
||||
*/
|
||||
val mutationCount: Long
|
||||
get() = _version.get()
|
||||
|
||||
/**
|
||||
* This is an observer on the mutation count of this [Clock], which reflects the occurence of mutations.
|
||||
*/
|
||||
@ -71,7 +63,8 @@ abstract class MutableClock : Clock() {
|
||||
|
||||
/**
|
||||
* Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations
|
||||
* used in demos or testing. This will substitute a Fiber compatible Future so the current [Strand] is not blocked.
|
||||
* used in demos or testing. This will substitute a Fiber compatible Future so the current
|
||||
* [co.paralleluniverse.strands.Strand] is not blocked.
|
||||
*
|
||||
* @return true if the [Future] is complete, false if the deadline was reached.
|
||||
*/
|
||||
|
@ -3,6 +3,7 @@ package net.corda.node.utilities
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import com.zaxxer.hikari.HikariConfig
|
||||
import com.zaxxer.hikari.HikariDataSource
|
||||
import net.corda.core.crypto.CompositeKey
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.parsePublicKeyBase58
|
||||
import net.corda.core.crypto.toBase58String
|
||||
@ -65,8 +66,8 @@ fun <T> isolatedTransaction(database: Database, block: Transaction.() -> T): T {
|
||||
}
|
||||
|
||||
/**
|
||||
* A relatively close copy of the [ThreadLocalTransactionManager] in Exposed but with the following adjustments to suit
|
||||
* our environment:
|
||||
* A relatively close copy of the [org.jetbrains.exposed.sql.transactions.ThreadLocalTransactionManager]
|
||||
* in Exposed but with the following adjustments to suit our environment:
|
||||
*
|
||||
* Because the construction of a [Database] instance results in replacing the singleton [TransactionManager] instance,
|
||||
* our tests involving two [MockNode]s effectively replace the database instances of each other and continue to trample
|
||||
@ -100,7 +101,7 @@ class StrandLocalTransactionManager(initWithDatabase: Database) : TransactionMan
|
||||
|
||||
var database: Database
|
||||
get() = threadLocalDb.get() ?: throw IllegalStateException("Was expecting to find database set on current strand: ${Strand.currentStrand()}")
|
||||
set(value: Database) {
|
||||
set(value) {
|
||||
threadLocalDb.set(value)
|
||||
}
|
||||
|
||||
@ -253,7 +254,7 @@ fun <T : Any> rx.Observable<T>.wrapWithDatabaseTransaction(db: Database? = null)
|
||||
// Add the subscriber to the wrapping subscriber, which will invoke the original subscribers together inside a database transaction.
|
||||
wrappingSubscriber.delegates.add(toBeWrappedInDbTx)
|
||||
// If we are the first subscriber, return the shared subscriber, otherwise return a subscriber that does nothing.
|
||||
if (wrappingSubscriber.delegates.size == 1) wrappingSubscriber else NoOpSubscriber<T>(toBeWrappedInDbTx)
|
||||
if (wrappingSubscriber.delegates.size == 1) wrappingSubscriber else NoOpSubscriber(toBeWrappedInDbTx)
|
||||
// Clean up the shared list of subscribers when they unsubscribe.
|
||||
}.doOnUnsubscribe { wrappingSubscriber.cleanUp() }
|
||||
}
|
||||
@ -287,7 +288,7 @@ object PublicKeyColumnType : ColumnType() {
|
||||
|
||||
override fun valueFromDB(value: Any): Any = parsePublicKeyBase58(value.toString())
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = if (value is PublicKey) value.toBase58String() else value
|
||||
override fun notNullValueToDB(value: Any): Any = (value as? PublicKey)?.toBase58String() ?: value
|
||||
}
|
||||
|
||||
/**
|
||||
@ -298,7 +299,7 @@ object SecureHashColumnType : ColumnType() {
|
||||
|
||||
override fun valueFromDB(value: Any): Any = SecureHash.parse(value.toString())
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = if (value is SecureHash) value.toString() else value
|
||||
override fun notNullValueToDB(value: Any): Any = (value as? SecureHash)?.toString() ?: value
|
||||
}
|
||||
|
||||
/**
|
||||
@ -309,7 +310,7 @@ object UUIDStringColumnType : ColumnType() {
|
||||
|
||||
override fun valueFromDB(value: Any): Any = UUID.fromString(value.toString())
|
||||
|
||||
override fun notNullValueToDB(value: Any): Any = if (value is UUID) value.toString() else value
|
||||
override fun notNullValueToDB(value: Any): Any = (value as? UUID)?.toString() ?: value
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -10,14 +10,15 @@ import java.util.concurrent.Future
|
||||
import java.util.concurrent.locks.Lock
|
||||
import kotlin.concurrent.withLock
|
||||
|
||||
// TODO: We should consider using a Semaphore or CountDownLatch here to make it a little easier to understand, but it seems as though the current version of Quasar does not support suspending on either of their implementations.
|
||||
|
||||
/**
|
||||
* Modelled on [ThreadBox], but with support for waiting that is compatible with Quasar [Fiber]s and [MutableClock]s.
|
||||
* Modelled on [net.corda.core.ThreadBox], but with support for waiting that is compatible with Quasar [Fiber]s and [MutableClock]s.
|
||||
*
|
||||
* It supports 3 main operations, all of which operate in a similar context to the [locked] method
|
||||
* of [ThreadBox]. i.e. in the context of the content.
|
||||
* of [net.corda.core.ThreadBox]. i.e. in the context of the content.
|
||||
* * [read] operations which acquire the associated lock but do not notify any waiters (see [readWithDeadline])
|
||||
* and is a direct equivalent of [ThreadBox.locked].
|
||||
* and is a direct equivalent of [net.corda.core.ThreadBox.locked].
|
||||
* * [write] operations which are the same as [read] operations but additionally notify any waiters that the content may have changed.
|
||||
* * [readWithDeadline] operations acquire the lock and are evaluated repeatedly until they no longer throw any subclass
|
||||
* of [RetryableException]. Between iterations it will wait until woken by a [write] or the deadline is reached. It will eventually
|
||||
@ -27,14 +28,12 @@ import kotlin.concurrent.withLock
|
||||
* or testing.
|
||||
*
|
||||
* Currently this is intended for use within a node as a simplified way for Oracles to implement subscriptions for changing
|
||||
* data by running a flow internally to implement the request handler (see [NodeInterestRates.Oracle]), which can then
|
||||
* data by running a flow internally to implement the request handler which can then
|
||||
* effectively relinquish control until the data becomes available. This isn't the most scalable design and is intended
|
||||
* to be temporary. In addition, it's enitrely possible to envisage a time when we want public [FlowLogic]
|
||||
* to be temporary. In addition, it's enitrely possible to envisage a time when we want public [net.corda.core.flows.FlowLogic]
|
||||
* implementations to be able to wait for some condition to become true outside of message send/receive. At that point
|
||||
* we may revisit this implementation and indeed the whole model for this, when we understand that requirement more fully.
|
||||
*
|
||||
* TODO: We should consider using a [Semaphore] or [CountDownLatch] here to make it a little easier to understand, but it seems
|
||||
* as though the current version of Qasar does not support suspending on either of their implementations.
|
||||
*/
|
||||
class FiberBox<out T>(private val content: T, private val lock: Lock = ReentrantLock()) {
|
||||
private var mutated: SettableFuture<Boolean>? = null
|
||||
|
@ -15,10 +15,10 @@ import java.security.cert.Certificate
|
||||
import kotlin.system.exitProcess
|
||||
|
||||
/**
|
||||
* This checks the [config.certificatesDirectory] for certificates required to connect to a Corda network.
|
||||
* If the certificates are not found, a [PKCS10CertificationRequest] will be submitted to Corda network permissioning server using [NetworkRegistrationService].
|
||||
* This process will enter a polling loop until the request has been approved, and then
|
||||
* the certificate chain will be downloaded and stored in [Keystore] reside in [config.certificatesDirectory].
|
||||
* This checks the config.certificatesDirectory field for certificates required to connect to a Corda network.
|
||||
* If the certificates are not found, a [org.bouncycastle.pkcs.PKCS10CertificationRequest] will be submitted to
|
||||
* Corda network permissioning server using [NetworkRegistrationService]. This process will enter a polling loop until the request has been approved, and then
|
||||
* the certificate chain will be downloaded and stored in [Keystore] reside in the certificates directory.
|
||||
*/
|
||||
class NetworkRegistrationHelper(val config: NodeConfiguration, val certService: NetworkRegistrationService) {
|
||||
companion object {
|
||||
|
@ -181,7 +181,7 @@ class CordaRPCOpsImplTest {
|
||||
sequence(
|
||||
// ISSUE
|
||||
expect { update ->
|
||||
require(update.consumed.size == 0) { update.consumed.size }
|
||||
require(update.consumed.isEmpty()) { update.consumed.size }
|
||||
require(update.produced.size == 1) { update.produced.size }
|
||||
},
|
||||
// MOVE
|
||||
|
@ -85,7 +85,7 @@ class NodeSchedulerServiceTest : SingletonSerializeAsToken() {
|
||||
override val vaultService: VaultService = NodeVaultService(this, dataSourceProps)
|
||||
override val testReference = this@NodeSchedulerServiceTest
|
||||
}
|
||||
scheduler = NodeSchedulerService(database, services, factory, schedulerGatedExecutor)
|
||||
scheduler = NodeSchedulerService(services, factory, schedulerGatedExecutor)
|
||||
smmExecutor = AffinityExecutor.ServiceAffinityExecutor("test", 1)
|
||||
val mockSMM = StateMachineManager(services, listOf(services, scheduler), DBCheckpointStorage(), smmExecutor, database)
|
||||
mockSMM.changes.subscribe { change ->
|
||||
|
@ -1,91 +0,0 @@
|
||||
package net.corda.node.visualiser
|
||||
|
||||
import org.graphstream.graph.Edge
|
||||
import org.graphstream.graph.Element
|
||||
import org.graphstream.graph.Graph
|
||||
import org.graphstream.graph.Node
|
||||
import org.graphstream.graph.implementations.SingleGraph
|
||||
import org.graphstream.ui.layout.Layout
|
||||
import org.graphstream.ui.layout.springbox.implementations.SpringBox
|
||||
import org.graphstream.ui.swingViewer.DefaultView
|
||||
import org.graphstream.ui.view.Viewer
|
||||
import org.graphstream.ui.view.ViewerListener
|
||||
import java.util.*
|
||||
import javax.swing.JFrame
|
||||
import kotlin.reflect.KProperty
|
||||
|
||||
// Some utilities to make the GraphStream API a bit nicer to work with. For some reason GS likes to use a non-type safe
|
||||
// string->value map type API for configuring common things. We fix it up here:
|
||||
|
||||
class GSPropertyDelegate<T>(private val prefix: String) {
|
||||
operator fun getValue(thisRef: Element, property: KProperty<*>): T = thisRef.getAttribute("$prefix.${property.name}")
|
||||
operator fun setValue(thisRef: Element, property: KProperty<*>, value: T) = thisRef.setAttribute("$prefix.${property.name}", value)
|
||||
}
|
||||
|
||||
var Node.label: String by GSPropertyDelegate<String>("ui")
|
||||
var Graph.stylesheet: String by GSPropertyDelegate<String>("ui")
|
||||
var Edge.weight: Double by GSPropertyDelegate<Double>("layout")
|
||||
|
||||
// Do this one by hand as 'class' is a reserved word.
|
||||
var Node.styleClass: String
|
||||
set(value) = setAttribute("ui.class", value)
|
||||
get() = getAttribute("ui.class")
|
||||
|
||||
fun createGraph(name: String, styles: String): SingleGraph {
|
||||
System.setProperty("org.graphstream.ui.renderer", "org.graphstream.ui.j2dviewer.J2DGraphRenderer")
|
||||
return SingleGraph(name).apply {
|
||||
stylesheet = styles
|
||||
setAttribute("ui.quality")
|
||||
setAttribute("ui.antialias")
|
||||
setAttribute("layout.quality", 0)
|
||||
setAttribute("layout.force", 0.9)
|
||||
}
|
||||
}
|
||||
|
||||
class MyViewer(graph: Graph) : Viewer(graph, ThreadingModel.GRAPH_IN_ANOTHER_THREAD) {
|
||||
override fun enableAutoLayout(layoutAlgorithm: Layout) {
|
||||
super.enableAutoLayout(layoutAlgorithm)
|
||||
|
||||
// Setting shortNap to 1 stops things bouncing around horribly at the start.
|
||||
optLayout.setNaps(50, 1)
|
||||
}
|
||||
}
|
||||
|
||||
fun runGraph(graph: SingleGraph, nodeOnClick: (Node) -> Unit) {
|
||||
// Use a bit of custom code here instead of calling graph.display() so we can maximize the window.
|
||||
val viewer = MyViewer(graph)
|
||||
val view: DefaultView = object : DefaultView(viewer, Viewer.DEFAULT_VIEW_ID, Viewer.newGraphRenderer()) {
|
||||
override fun openInAFrame(on: Boolean) {
|
||||
super.openInAFrame(on)
|
||||
if (frame != null) {
|
||||
frame.extendedState = frame.extendedState or JFrame.MAXIMIZED_BOTH
|
||||
}
|
||||
}
|
||||
}
|
||||
viewer.addView(view)
|
||||
|
||||
var loop: Boolean = true
|
||||
val viewerPipe = viewer.newViewerPipe()
|
||||
viewerPipe.addViewerListener(object : ViewerListener {
|
||||
override fun buttonPushed(id: String?) {
|
||||
}
|
||||
|
||||
override fun buttonReleased(id: String?) {
|
||||
val node = graph.getNode<Node>(id)
|
||||
nodeOnClick(node)
|
||||
}
|
||||
|
||||
override fun viewClosed(viewName: String?) {
|
||||
loop = false
|
||||
}
|
||||
})
|
||||
|
||||
view.openInAFrame(true)
|
||||
// Seed determined through trial and error: it gives a reasonable layout for the Wednesday demo.
|
||||
val springBox = SpringBox(false, Random(-103468310429824593L))
|
||||
viewer.enableAutoLayout(springBox)
|
||||
|
||||
while (loop) {
|
||||
viewerPipe.blockingPump()
|
||||
}
|
||||
}
|
@ -1,78 +0,0 @@
|
||||
package net.corda.node.visualiser
|
||||
|
||||
import net.corda.core.contracts.CommandData
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.testing.LedgerDSL
|
||||
import net.corda.testing.TestLedgerDSLInterpreter
|
||||
import net.corda.testing.TestTransactionDSLInterpreter
|
||||
import org.graphstream.graph.Edge
|
||||
import org.graphstream.graph.Node
|
||||
import org.graphstream.graph.implementations.SingleGraph
|
||||
import kotlin.reflect.full.memberProperties
|
||||
|
||||
@Suppress("unused") // TODO: Re-evaluate by EOY2016 if this code is still useful and if not, delete.
|
||||
class GraphVisualiser(val dsl: LedgerDSL<TestTransactionDSLInterpreter, TestLedgerDSLInterpreter>) {
|
||||
companion object {
|
||||
val css = GraphVisualiser::class.java.getResourceAsStream("graph.css").bufferedReader().readText()
|
||||
}
|
||||
|
||||
fun convert(): SingleGraph {
|
||||
val testLedger: TestLedgerDSLInterpreter = dsl.interpreter
|
||||
val graph = createGraph("Transaction group", css)
|
||||
|
||||
// Map all the transactions, including the bogus non-verified ones (with no inputs) to graph nodes.
|
||||
for ((txIndex, tx) in (testLedger.transactionsToVerify + testLedger.transactionsUnverified).withIndex()) {
|
||||
val txNode = graph.addNode<Node>("tx$txIndex")
|
||||
if (tx !in testLedger.transactionsUnverified)
|
||||
txNode.label = dsl.interpreter.transactionName(tx.id).let { it ?: "TX[${tx.id.prefixChars()}]" }
|
||||
txNode.styleClass = "tx"
|
||||
|
||||
// Now create a vertex for each output state.
|
||||
for (outIndex in tx.outputs.indices) {
|
||||
val node = graph.addNode<Node>(tx.outRef<ContractState>(outIndex).ref.toString())
|
||||
val state = tx.outputs[outIndex]
|
||||
node.label = stateToLabel(state.data)
|
||||
node.styleClass = stateToCSSClass(state.data) + ",state"
|
||||
node.setAttribute("state", state)
|
||||
val edge = graph.addEdge<Edge>("tx$txIndex-out$outIndex", txNode, node, true)
|
||||
edge.weight = 0.7
|
||||
}
|
||||
|
||||
// And a vertex for each command.
|
||||
for ((index, cmd) in tx.commands.withIndex()) {
|
||||
val node = graph.addNode<Node>(SecureHash.randomSHA256().prefixChars())
|
||||
node.label = commandToTypeName(cmd.value)
|
||||
node.styleClass = "command"
|
||||
val edge = graph.addEdge<Edge>("tx$txIndex-cmd-$index", node, txNode)
|
||||
edge.weight = 0.4
|
||||
}
|
||||
}
|
||||
// And now all states and transactions were mapped to graph nodes, hook up the input edges.
|
||||
for ((txIndex, tx) in testLedger.transactionsToVerify.withIndex()) {
|
||||
for ((inputIndex, ref) in tx.inputs.withIndex()) {
|
||||
val edge = graph.addEdge<Edge>("tx$txIndex-in$inputIndex", ref.toString(), "tx$txIndex", true)
|
||||
edge.weight = 1.2
|
||||
}
|
||||
}
|
||||
return graph
|
||||
}
|
||||
|
||||
private fun stateToLabel(state: ContractState): String {
|
||||
return dsl.interpreter.outputToLabel(state) ?: stateToTypeName(state)
|
||||
}
|
||||
|
||||
private fun commandToTypeName(state: CommandData) = state.javaClass.canonicalName.removePrefix("contracts.").replace('$', '.')
|
||||
private fun stateToTypeName(state: ContractState) = state.javaClass.canonicalName.removePrefix("contracts.").removeSuffix(".State")
|
||||
private fun stateToCSSClass(state: ContractState) = stateToTypeName(state).replace('.', '_').toLowerCase()
|
||||
|
||||
fun display() {
|
||||
runGraph(convert(), nodeOnClick = { node ->
|
||||
val state: ContractState? = node.getAttribute("state")
|
||||
if (state != null) {
|
||||
val props: List<Pair<String, Any?>> = state.javaClass.kotlin.memberProperties.map { it.name to it.getter.call(state) }
|
||||
StateViewer.show(props)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -1,37 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<form xmlns="http://www.intellij.com/uidesigner/form/" version="1" bind-to-class="net.corda.node.visualiser.StateViewer">
|
||||
<grid id="27dc6" binding="root" layout-manager="BorderLayout" hgap="15" vgap="15">
|
||||
<constraints>
|
||||
<xy x="20" y="20" width="500" height="400"/>
|
||||
</constraints>
|
||||
<properties>
|
||||
<background color="-1"/>
|
||||
</properties>
|
||||
<border type="empty">
|
||||
<size top="15" left="15" bottom="15" right="15"/>
|
||||
</border>
|
||||
<children>
|
||||
<component id="c1614" class="javax.swing.JLabel">
|
||||
<constraints border-constraint="North"/>
|
||||
<properties>
|
||||
<font style="1"/>
|
||||
<text value="State viewer"/>
|
||||
</properties>
|
||||
</component>
|
||||
<scrollpane id="2974d">
|
||||
<constraints border-constraint="Center"/>
|
||||
<properties/>
|
||||
<border type="none"/>
|
||||
<children>
|
||||
<component id="8f1af" class="javax.swing.JTable" binding="propsTable">
|
||||
<constraints/>
|
||||
<properties>
|
||||
<autoResizeMode value="3"/>
|
||||
<showHorizontalLines value="false"/>
|
||||
</properties>
|
||||
</component>
|
||||
</children>
|
||||
</scrollpane>
|
||||
</children>
|
||||
</grid>
|
||||
</form>
|
@ -1,109 +0,0 @@
|
||||
package net.corda.node.visualiser;
|
||||
|
||||
import kotlin.*;
|
||||
|
||||
import javax.swing.*;
|
||||
import javax.swing.table.*;
|
||||
import java.awt.*;
|
||||
import java.util.*;
|
||||
import java.util.List;
|
||||
|
||||
public class StateViewer {
|
||||
private JPanel root;
|
||||
private JTable propsTable;
|
||||
|
||||
public static void main(String[] args) {
|
||||
JFrame frame = new JFrame("StateViewer");
|
||||
List<Pair<String, Object>> props = new ArrayList<Pair<String, Object>>();
|
||||
props.add(new Pair<String, Object>("a", 123));
|
||||
props.add(new Pair<String, Object>("things", "bar"));
|
||||
frame.setContentPane(new StateViewer(props).root);
|
||||
frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
|
||||
frame.pack();
|
||||
frame.setVisible(true);
|
||||
frame.setSize(800, 600);
|
||||
}
|
||||
|
||||
public static void show(List<Pair<String, Object>> props) {
|
||||
JFrame frame = new JFrame("StateViewer");
|
||||
StateViewer viewer = new StateViewer(props);
|
||||
frame.setContentPane(viewer.root);
|
||||
frame.pack();
|
||||
frame.setSize(600, 300);
|
||||
|
||||
viewer.propsTable.getColumnModel().getColumn(0).setMinWidth(150);
|
||||
viewer.propsTable.getColumnModel().getColumn(0).setMaxWidth(150);
|
||||
|
||||
frame.setVisible(true);
|
||||
}
|
||||
|
||||
private StateViewer(final List<Pair<String, Object>> props) {
|
||||
propsTable.setModel(new AbstractTableModel() {
|
||||
@Override
|
||||
public int getRowCount() {
|
||||
return props.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getColumnCount() {
|
||||
return 2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getColumnName(int column) {
|
||||
if (column == 0)
|
||||
return "Attribute";
|
||||
else if (column == 1)
|
||||
return "Value";
|
||||
else
|
||||
return "?";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getValueAt(int rowIndex, int columnIndex) {
|
||||
if (columnIndex == 0)
|
||||
return props.get(rowIndex).getFirst();
|
||||
else
|
||||
return props.get(rowIndex).getSecond();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
{
|
||||
// GUI initializer generated by IntelliJ IDEA GUI Designer
|
||||
// >>> IMPORTANT!! <<<
|
||||
// DO NOT EDIT OR ADD ANY CODE HERE!
|
||||
$$$setupUI$$$();
|
||||
}
|
||||
|
||||
/**
|
||||
* Method generated by IntelliJ IDEA GUI Designer
|
||||
* >>> IMPORTANT!! <<<
|
||||
* DO NOT edit this method OR call it in your code!
|
||||
*
|
||||
* @noinspection ALL
|
||||
*/
|
||||
private void $$$setupUI$$$() {
|
||||
root = new JPanel();
|
||||
root.setLayout(new BorderLayout(15, 15));
|
||||
root.setBackground(new Color(-1));
|
||||
root.setBorder(BorderFactory.createTitledBorder(BorderFactory.createEmptyBorder(15, 15, 15, 15), null));
|
||||
final JLabel label1 = new JLabel();
|
||||
label1.setFont(new Font(label1.getFont().getName(), Font.BOLD, label1.getFont().getSize()));
|
||||
label1.setText("State viewer");
|
||||
root.add(label1, BorderLayout.NORTH);
|
||||
final JScrollPane scrollPane1 = new JScrollPane();
|
||||
root.add(scrollPane1, BorderLayout.CENTER);
|
||||
propsTable = new JTable();
|
||||
propsTable.setAutoResizeMode(3);
|
||||
propsTable.setShowHorizontalLines(false);
|
||||
scrollPane1.setViewportView(propsTable);
|
||||
}
|
||||
|
||||
/**
|
||||
* @noinspection ALL
|
||||
*/
|
||||
public JComponent $$$getRootComponent$$$() {
|
||||
return root;
|
||||
}
|
||||
}
|
@ -1,41 +0,0 @@
|
||||
node.tx {
|
||||
size: 10px;
|
||||
fill-color: blue;
|
||||
shape: rounded-box;
|
||||
z-index: 4;
|
||||
}
|
||||
|
||||
node.state {
|
||||
size: 25px;
|
||||
fill-color: beige;
|
||||
stroke-width: 2px;
|
||||
stroke-color: black;
|
||||
stroke-mode: plain;
|
||||
}
|
||||
|
||||
node {
|
||||
text-background-mode: rounded-box;
|
||||
text-background-color: darkslategrey;
|
||||
text-padding: 5px;
|
||||
text-offset: 10px;
|
||||
text-color: white;
|
||||
text-alignment: under;
|
||||
text-size: 16;
|
||||
}
|
||||
|
||||
node.command {
|
||||
text-size: 12;
|
||||
size: 8px;
|
||||
fill-color: white;
|
||||
stroke-width: 2px;
|
||||
stroke-color: black;
|
||||
stroke-mode: plain;
|
||||
}
|
||||
|
||||
node.cash {
|
||||
fill-color: red;
|
||||
}
|
||||
|
||||
graph {
|
||||
padding: 100px;
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user