Refactor explorer and friends to use RPC, remove NodeMonitor*

This commit is contained in:
Andras Slemmer
2016-09-29 17:19:44 +01:00
parent 415de1ce1f
commit 5af0e97444
20 changed files with 776 additions and 1324 deletions

View File

@ -33,7 +33,6 @@ import com.r3corda.node.services.events.ScheduledActivityObserver
import com.r3corda.node.services.identity.InMemoryIdentityService
import com.r3corda.node.services.keys.PersistentKeyManagementService
import com.r3corda.node.services.messaging.CordaRPCOps
import com.r3corda.node.services.monitor.NodeMonitorService
import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC
@ -140,7 +139,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
lateinit var vault: VaultService
lateinit var keyManagement: KeyManagementService
var inNodeNetworkMapService: NetworkMapService? = null
var inNodeMonitorService: NodeMonitorService? = null
var inNodeNotaryService: NotaryService? = null
var uniquenessProvider: UniquenessProvider? = null
lateinit var identity: IdentityService
@ -231,7 +229,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
}
}
inNodeMonitorService = makeMonitorService() // Note this HAS to be after smm is set
buildAdvertisedServices()
// TODO: this model might change but for now it provides some de-coupling
@ -410,8 +407,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val networkMap
// TODO: sort out ordering of open & protected modifiers of functions in this class.
protected open fun makeVaultService(): VaultService = NodeVaultService(services)
protected open fun makeMonitorService(): NodeMonitorService = NodeMonitorService(services, smm)
open fun stop() {
// TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the
// network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop()

View File

@ -1,24 +1,33 @@
package com.r3corda.node.internal
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.contracts.asset.Cash
import com.r3corda.contracts.asset.InsufficientBalanceException
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.toStringShort
import com.r3corda.core.node.ServiceHub
import com.r3corda.core.node.services.Vault
import com.r3corda.core.transactions.TransactionBuilder
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.messaging.CordaRPCOps
import com.r3corda.node.services.messaging.StateMachineInfo
import com.r3corda.node.services.messaging.StateMachineUpdate
import com.r3corda.node.services.messaging.TransactionBuildResult
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.utilities.databaseTransaction
import com.r3corda.protocols.BroadcastTransactionProtocol
import com.r3corda.protocols.FinalityProtocol
import org.jetbrains.exposed.sql.Database
import rx.Observable
import java.security.KeyPair
/**
* Server side implementations of RPCs available to MQ based client tools. Execution takes place on the server
* thread (i.e. serially). Arguments are serialised and deserialised automatically.
*/
class ServerRPCOps(
val services: ServiceHubInternal,
val stateMachineManager: StateMachineManager,
val services: ServiceHub,
val smm: StateMachineManager,
val database: Database
) : CordaRPCOps {
override val protocolVersion: Int = 0
@ -31,11 +40,100 @@ class ServerRPCOps(
}
override fun verifiedTransactions() = services.storageService.validatedTransactions.track()
override fun stateMachinesAndUpdates(): Pair<List<StateMachineInfo>, Observable<StateMachineUpdate>> {
val (allStateMachines, changes) = stateMachineManager.track()
val (allStateMachines, changes) = smm.track()
return Pair(
allStateMachines.map { StateMachineInfo.fromProtocolStateMachineImpl(it) },
changes.map { StateMachineUpdate.fromStateMachineChange(it) }
)
}
override fun stateMachineRecordedTransactionMapping() = services.storageService.stateMachineRecordedTransactionMapping.track()
override fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult {
return databaseTransaction(database) {
when (command) {
is ClientToServiceCommand.IssueCash -> issueCash(command)
is ClientToServiceCommand.PayCash -> initiatePayment(command)
is ClientToServiceCommand.ExitCash -> exitCash(command)
}
}
}
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
private fun initiatePayment(req: ClientToServiceCommand.PayCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(null)
// TODO: Have some way of restricting this to states the caller controls
try {
Cash().generateSpend(builder, req.amount.withoutIssuer(), req.recipient.owningKey,
// TODO: Move cash state filtering by issuer down to the contract itself
services.vaultService.currentVault.statesOfType<Cash.State>().filter { it.state.data.amount.token == req.amount.token },
setOf(req.amount.token.issuer.party))
.forEach {
val key = services.keyManagementService.keys[it] ?: throw IllegalStateException("Could not find signing key for ${it.toStringShort()}")
builder.signWith(KeyPair(it, key))
}
val tx = builder.toSignedTransaction(checkSufficientSignatures = false)
val protocol = FinalityProtocol(tx, setOf(req), setOf(req.recipient))
return TransactionBuildResult.ProtocolStarted(
smm.add(BroadcastTransactionProtocol.TOPIC, protocol).id,
tx,
"Cash payment transaction generated"
)
} catch(ex: InsufficientBalanceException) {
return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance")
}
}
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
private fun exitCash(req: ClientToServiceCommand.ExitCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(null)
try {
val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef)
Cash().generateExit(builder, req.amount.issuedBy(issuer),
services.vaultService.currentVault.statesOfType<Cash.State>().filter { it.state.data.owner == issuer.party.owningKey })
builder.signWith(services.storageService.myLegalIdentityKey)
// Work out who the owners of the burnt states were
val inputStatesNullable = services.vaultService.statesForRefs(builder.inputStates())
val inputStates = inputStatesNullable.values.filterNotNull().map { it.data }
if (inputStatesNullable.size != inputStates.size) {
val unresolvedStateRefs = inputStatesNullable.filter { it.value == null }.map { it.key }
throw InputStateRefResolveFailed(unresolvedStateRefs)
}
// TODO: Is it safe to drop participants we don't know how to contact? Does not knowing how to contact them
// count as a reason to fail?
val participants: Set<Party> = inputStates.filterIsInstance<Cash.State>().map { services.identityService.partyFromKey(it.owner) }.filterNotNull().toSet()
// Commit the transaction
val tx = builder.toSignedTransaction(checkSufficientSignatures = false)
val protocol = FinalityProtocol(tx, setOf(req), participants)
return TransactionBuildResult.ProtocolStarted(
smm.add(BroadcastTransactionProtocol.TOPIC, protocol).id,
tx,
"Cash destruction transaction generated"
)
} catch (ex: InsufficientBalanceException) {
return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance")
}
}
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
private fun issueCash(req: ClientToServiceCommand.IssueCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(notary = null)
val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef)
Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary)
builder.signWith(services.storageService.myLegalIdentityKey)
val tx = builder.toSignedTransaction(checkSufficientSignatures = true)
// Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it
val protocol = BroadcastTransactionProtocol(tx, setOf(req), setOf(req.recipient))
return TransactionBuildResult.ProtocolStarted(
smm.add(BroadcastTransactionProtocol.TOPIC, protocol).id,
tx,
"Cash issuance completed"
)
}
class InputStateRefResolveFailed(stateRefs: List<StateRef>) :
Exception("Failed to resolve input StateRefs $stateRefs")
}

View File

@ -1,8 +1,8 @@
package com.r3corda.node.services.messaging
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.StateMachineTransactionMapping
import com.r3corda.core.node.services.Vault
import com.r3corda.core.protocols.StateMachineRunId
@ -28,9 +28,9 @@ data class StateMachineInfo(
}
}
sealed class StateMachineUpdate {
class Added(val stateMachineInfo: StateMachineInfo) : StateMachineUpdate()
class Removed(val stateMachineRunId: StateMachineRunId) : StateMachineUpdate()
sealed class StateMachineUpdate(val id: StateMachineRunId) {
class Added(val stateMachineInfo: StateMachineInfo) : StateMachineUpdate(stateMachineInfo.id)
class Removed(id: StateMachineRunId) : StateMachineUpdate(id)
companion object {
fun fromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate {
@ -51,6 +51,28 @@ sealed class StateMachineUpdate {
}
}
sealed class TransactionBuildResult {
/**
* State indicating that a protocol is managing this request, and that the client should track protocol state machine
* updates for further information. The monitor will separately receive notification of the state machine having been
* added, as it would any other state machine. This response is used solely to enable the monitor to identify
* the state machine (and its progress) as associated with the request.
*
* @param transaction the transaction created as a result, in the case where the protocol has completed.
*/
class ProtocolStarted(val id: StateMachineRunId, val transaction: SignedTransaction?, val message: String?) : TransactionBuildResult() {
override fun toString() = "Started($message)"
}
/**
* State indicating the action undertaken failed, either directly (it is not something which requires a
* state machine), or before a state machine was started.
*/
class Failed(val message: String?) : TransactionBuildResult() {
override fun toString() = "Failed($message)"
}
}
/**
* RPC operations that the node exposes to clients using the Java client library. These can be called from
* client apps and are implemented by the node in the [ServerRPCOps] class.
@ -73,6 +95,15 @@ interface CordaRPCOps : RPCOps {
*/
@RPCReturnsObservables
fun verifiedTransactions(): Pair<List<SignedTransaction>, Observable<SignedTransaction>>
/**
* Returns a pair of state machine id - recorded transaction hash pairs
*/
@RPCReturnsObservables
fun stateMachineRecordedTransactionMapping(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>>
/**
* Executes the given command, possibly triggering cash creation etc.
*/
fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult
}

View File

@ -5,13 +5,23 @@ import com.esotericsoftware.kryo.Registration
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.serializers.DefaultSerializers
import com.r3corda.contracts.asset.Cash
import com.r3corda.core.ErrorOr
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.DigitalSignature
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.node.services.StateMachineTransactionMapping
import com.r3corda.core.node.services.Vault
import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.core.serialization.*
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.transactions.WireTransaction
import de.javakaffee.kryoserializers.ArraysAsListSerializer
import de.javakaffee.kryoserializers.guava.*
import net.i2p.crypto.eddsa.EdDSAPrivateKey
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.objenesis.strategy.StdInstantiatorStrategy
import org.slf4j.LoggerFactory
@ -118,7 +128,41 @@ private class RPCKryo(private val observableSerializer: Serializer<Observable<An
register(Notification::class.java)
register(Notification.Kind::class.java)
register(kotlin.Pair::class.java)
register(ArrayList::class.java)
register(listOf<Any>().javaClass) // EmptyList
register(IllegalStateException::class.java)
register(Pair::class.java)
register(StateMachineUpdate.Added::class.java)
register(StateMachineUpdate.Removed::class.java)
register(StateMachineInfo::class.java)
register(DigitalSignature.WithKey::class.java)
register(DigitalSignature.LegallyIdentifiable::class.java)
register(ByteArray::class.java)
register(EdDSAPublicKey::class.java, Ed25519PublicKeySerializer)
register(EdDSAPrivateKey::class.java, Ed25519PrivateKeySerializer)
register(Vault::class.java)
register(Vault.Update::class.java)
register(StateMachineRunId::class.java)
register(StateMachineTransactionMapping::class.java)
register(UUID::class.java)
register(LinkedHashSet::class.java)
register(StateAndRef::class.java)
register(setOf<Unit>().javaClass) // EmptySet
register(StateRef::class.java)
register(SecureHash.SHA256::class.java)
register(TransactionState::class.java)
register(Cash.State::class.java)
register(Amount::class.java)
register(Issued::class.java)
register(PartyAndReference::class.java)
register(OpaqueBytes::class.java)
register(Currency::class.java)
register(Cash::class.java)
register(Cash.Clauses.ConserveAmount::class.java)
register(listOf(Unit).javaClass) // SingletonList
register(setOf(Unit).javaClass) // SingletonSet
register(TransactionBuildResult.ProtocolStarted::class.java)
register(TransactionBuildResult.Failed::class.java)
// Exceptions. We don't bother sending the stack traces as the client will fill in its own anyway.
register(IllegalArgumentException::class.java)
@ -139,4 +183,4 @@ private class RPCKryo(private val observableSerializer: Serializer<Observable<An
}
}
fun createRPCKryo(observableSerializer: Serializer<Observable<Any>>? = null): Kryo = RPCKryo(observableSerializer)
fun createRPCKryo(observableSerializer: Serializer<Observable<Any>>? = null): Kryo = RPCKryo(observableSerializer)

View File

@ -1,60 +0,0 @@
package com.r3corda.node.services.monitor
import com.r3corda.core.contracts.*
import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.node.utilities.AddOrRemove
import java.time.Instant
import java.util.*
/**
* Events triggered by changes in the node, and sent to monitoring client(s).
*/
sealed class ServiceToClientEvent(val time: Instant) {
class Transaction(time: Instant, val transaction: SignedTransaction) : ServiceToClientEvent(time) {
override fun toString() = "Transaction(${transaction.tx.commands})"
}
class OutputState(
time: Instant,
val consumed: Set<StateRef>,
val produced: Set<StateAndRef<ContractState>>
) : ServiceToClientEvent(time) {
override fun toString() = "OutputState(consumed=$consumed, produced=${produced.map { it.state.data.javaClass.simpleName } })"
}
class StateMachine(
time: Instant,
val id: StateMachineRunId,
val label: String,
val addOrRemove: AddOrRemove
) : ServiceToClientEvent(time) {
override fun toString() = "StateMachine($label, ${addOrRemove.name})"
}
class Progress(time: Instant, val id: StateMachineRunId, val message: String) : ServiceToClientEvent(time) {
override fun toString() = "Progress($message)"
}
class TransactionBuild(time: Instant, val id: UUID, val state: TransactionBuildResult) : ServiceToClientEvent(time) {
override fun toString() = "TransactionBuild($state)"
}
}
sealed class TransactionBuildResult {
/**
* State indicating that a protocol is managing this request, and that the client should track protocol state machine
* updates for further information. The monitor will separately receive notification of the state machine having been
* added, as it would any other state machine. This response is used solely to enable the monitor to identify
* the state machine (and its progress) as associated with the request.
*
* @param transaction the transaction created as a result, in the case where the protocol has completed.
*/
class ProtocolStarted(val id: StateMachineRunId, val transaction: SignedTransaction?, val message: String?) : TransactionBuildResult() {
override fun toString() = "Started($message)"
}
/**
* State indicating the action undertaken failed, either directly (it is not something which requires a
* state machine), or before a state machine was started.
*/
class Failed(val message: String?) : TransactionBuildResult() {
override fun toString() = "Failed($message)"
}
}

View File

@ -1,20 +0,0 @@
package com.r3corda.node.services.monitor
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.protocols.DirectRequestMessage
data class RegisterRequest(override val replyToRecipient: SingleMessageRecipient,
override val sessionID: Long) : DirectRequestMessage
data class RegisterResponse(val success: Boolean)
// TODO: This should have a shared secret the monitor was sent in the registration response, for security
data class DeregisterRequest(override val replyToRecipient: SingleMessageRecipient,
override val sessionID: Long) : DirectRequestMessage
data class DeregisterResponse(val success: Boolean)
data class StateSnapshotMessage(val contractStates: Collection<StateAndRef<ContractState>>, val protocolStates: Collection<String>)
data class ClientToServiceCommandMessage(override val sessionID: Long, override val replyToRecipient: SingleMessageRecipient, val command: ClientToServiceCommand) : DirectRequestMessage

View File

@ -1,233 +0,0 @@
package com.r3corda.node.services.monitor
import co.paralleluniverse.common.util.VisibleForTesting
import com.r3corda.contracts.asset.Cash
import com.r3corda.contracts.asset.InsufficientBalanceException
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.toStringShort
import com.r3corda.core.messaging.MessageRecipients
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.Vault
import com.r3corda.core.protocols.ProtocolLogic
import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.core.serialization.serialize
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.transactions.TransactionBuilder
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.api.AbstractNodeService
import com.r3corda.node.services.api.ServiceHubInternal
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.protocols.BroadcastTransactionProtocol
import com.r3corda.protocols.FinalityProtocol
import java.security.KeyPair
import java.time.Instant
import java.util.*
import javax.annotation.concurrent.ThreadSafe
/**
* Service which allows external clients to monitor the node's vault and state machine manager, as well as trigger
* actions within the node. The service also sends requests for user input back to clients, for example to enter
* additional information while a protocol runs, or confirm an action.
*
* This is intended to enable a range of tools from end user UI to ops tools which monitor health across a number of nodes.
*/
// TODO: Implement authorization controls+
// TODO: Replace this entirely with a publish/subscribe based solution on a to-be-written service (likely JMS or similar),
// rather than implement authentication and publish/subscribe ourselves.
// TODO: Clients need to be able to indicate whether they support interactivity (no point in sending requests for input
// to a monitoring tool)
@ThreadSafe
class NodeMonitorService(services: ServiceHubInternal, val smm: StateMachineManager) : AbstractNodeService(services) {
companion object {
val REGISTER_TOPIC = "platform.monitor.register"
val DEREGISTER_TOPIC = "platform.monitor.deregister"
val STATE_TOPIC = "platform.monitor.state_snapshot"
val IN_EVENT_TOPIC = "platform.monitor.in"
val OUT_EVENT_TOPIC = "platform.monitor.out"
val logger = loggerFor<NodeMonitorService>()
}
val listeners: MutableSet<RegisteredListener> = HashSet()
data class RegisteredListener(val recipients: MessageRecipients, val sessionID: Long)
init {
addMessageHandler(REGISTER_TOPIC) { req: RegisterRequest -> processRegisterRequest(req) }
addMessageHandler(DEREGISTER_TOPIC) { req: DeregisterRequest -> processDeregisterRequest(req) }
addMessageHandler(OUT_EVENT_TOPIC) { req: ClientToServiceCommandMessage -> processEventRequest(req) }
// Notify listeners on state changes
services.storageService.validatedTransactions.updates.subscribe { tx -> notifyTransaction(tx) }
services.vaultService.updates.subscribe { update -> notifyVaultUpdate(update) }
smm.changes.subscribe { change ->
val id: StateMachineRunId = change.id
val logic: ProtocolLogic<*> = change.logic
val progressTracker = logic.progressTracker
notifyEvent(ServiceToClientEvent.StateMachine(Instant.now(), id, logic.javaClass.name, change.addOrRemove))
if (progressTracker != null) {
when (change.addOrRemove) {
AddOrRemove.ADD -> progressTracker.changes.subscribe { progress ->
notifyEvent(ServiceToClientEvent.Progress(Instant.now(), id, progress.toString()))
}
AddOrRemove.REMOVE -> {
// Nothing to do
}
}
}
}
}
@VisibleForTesting
internal fun notifyVaultUpdate(update: Vault.Update)
= notifyEvent(ServiceToClientEvent.OutputState(Instant.now(), update.consumed, update.produced))
@VisibleForTesting
internal fun notifyTransaction(transaction: SignedTransaction)
= notifyEvent(ServiceToClientEvent.Transaction(Instant.now(), transaction))
private fun processEventRequest(reqMessage: ClientToServiceCommandMessage) {
val req = reqMessage.command
val result: TransactionBuildResult? =
try {
when (req) {
is ClientToServiceCommand.IssueCash -> issueCash(req)
is ClientToServiceCommand.PayCash -> initiatePayment(req)
is ClientToServiceCommand.ExitCash -> exitCash(req)
else -> throw IllegalArgumentException("Unknown request type ${req.javaClass.name}")
}
} catch(ex: Exception) {
logger.warn("Exception while processing message of type ${req.javaClass.simpleName}", ex)
TransactionBuildResult.Failed(ex.message)
}
// Send back any result from the event. Not all events (especially TransactionInput) produce a
// result.
if (result != null) {
val event = ServiceToClientEvent.TransactionBuild(Instant.now(), req.id, result)
val respMessage = net.createMessage(IN_EVENT_TOPIC, reqMessage.sessionID,
event.serialize().bits)
net.send(respMessage, reqMessage.getReplyTo(services.networkMapCache))
}
}
/**
* Process a request from a monitor to remove them from the subscribers.
*/
fun processDeregisterRequest(req: DeregisterRequest) {
val message = try {
// TODO: Session ID should be managed by the messaging layer, so it handles ensuring that the
// request comes from the same endpoint that registered at the start.
listeners.remove(RegisteredListener(req.replyToRecipient, req.sessionID))
net.createMessage(DEREGISTER_TOPIC, req.sessionID, DeregisterResponse(true).serialize().bits)
} catch (ex: IllegalStateException) {
net.createMessage(DEREGISTER_TOPIC, req.sessionID, DeregisterResponse(false).serialize().bits)
}
net.send(message, req.replyToRecipient)
}
/**
* Process a request from a monitor to add them to the subscribers. This includes hooks to authenticate the request,
* but currently all requests pass (and there's no access control on vaults, so it has no actual meaning).
*/
fun processRegisterRequest(req: RegisterRequest) {
try {
listeners.add(RegisteredListener(req.replyToRecipient, req.sessionID))
val stateMessage = StateSnapshotMessage(services.vaultService.currentVault.states.toList(),
smm.allStateMachines.map { it.javaClass.name })
net.send(net.createMessage(STATE_TOPIC, DEFAULT_SESSION_ID, stateMessage.serialize().bits), req.replyToRecipient)
val message = net.createMessage(REGISTER_TOPIC, req.sessionID, RegisterResponse(true).serialize().bits)
net.send(message, req.replyToRecipient)
} catch (ex: IllegalStateException) {
val message = net.createMessage(REGISTER_TOPIC, req.sessionID, RegisterResponse(false).serialize().bits)
net.send(message, req.replyToRecipient)
}
}
private fun notifyEvent(event: ServiceToClientEvent) = listeners.forEach { monitor ->
net.send(net.createMessage(IN_EVENT_TOPIC, monitor.sessionID, event.serialize().bits), monitor.recipients)
}
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
private fun initiatePayment(req: ClientToServiceCommand.PayCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(null)
// TODO: Have some way of restricting this to states the caller controls
try {
Cash().generateSpend(builder, req.amount.withoutIssuer(), req.recipient.owningKey,
// TODO: Move cash state filtering by issuer down to the contract itself
services.vaultService.currentVault.statesOfType<Cash.State>().filter { it.state.data.amount.token == req.amount.token },
setOf(req.amount.token.issuer.party))
.forEach {
val key = services.keyManagementService.keys[it] ?: throw IllegalStateException("Could not find signing key for ${it.toStringShort()}")
builder.signWith(KeyPair(it, key))
}
val tx = builder.toSignedTransaction(checkSufficientSignatures = false)
val protocol = FinalityProtocol(tx, setOf(req), setOf(req.recipient))
return TransactionBuildResult.ProtocolStarted(
smm.add("broadcast", protocol).id,
tx,
"Cash payment transaction generated"
)
} catch(ex: InsufficientBalanceException) {
return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance")
}
}
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
private fun exitCash(req: ClientToServiceCommand.ExitCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(null)
try {
val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef)
Cash().generateExit(builder, req.amount.issuedBy(issuer),
services.vaultService.currentVault.statesOfType<Cash.State>().filter { it.state.data.owner == issuer.party.owningKey })
builder.signWith(services.storageService.myLegalIdentityKey)
// Work out who the owners of the burnt states were
val inputStatesNullable = services.vaultService.statesForRefs(builder.inputStates())
val inputStates = inputStatesNullable.values.filterNotNull().map { it.data }
if (inputStatesNullable.size != inputStates.size) {
val unresolvedStateRefs = inputStatesNullable.filter { it.value == null }.map { it.key }
throw InputStateRefResolveFailed(unresolvedStateRefs)
}
// TODO: Is it safe to drop participants we don't know how to contact? Does not knowing how to contact them
// count as a reason to fail?
val participants: Set<Party> = inputStates.filterIsInstance<Cash.State>().map { services.identityService.partyFromKey(it.owner) }.filterNotNull().toSet()
// Commit the transaction
val tx = builder.toSignedTransaction(checkSufficientSignatures = false)
val protocol = FinalityProtocol(tx, setOf(req), participants)
return TransactionBuildResult.ProtocolStarted(
smm.add("broadcast", protocol).id,
tx,
"Cash destruction transaction generated"
)
} catch (ex: InsufficientBalanceException) {
return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance")
}
}
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
private fun issueCash(req: ClientToServiceCommand.IssueCash): TransactionBuildResult {
val builder: TransactionBuilder = TransactionType.General.Builder(notary = null)
val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef)
Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary)
builder.signWith(services.storageService.myLegalIdentityKey)
val tx = builder.toSignedTransaction(checkSufficientSignatures = true)
// Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it
val protocol = BroadcastTransactionProtocol(tx, setOf(req), setOf(req.recipient))
return TransactionBuildResult.ProtocolStarted(
smm.add("broadcast", protocol).id,
tx,
"Cash issuance completed"
)
}
class InputStateRefResolveFailed(stateRefs: List<StateRef>) :
Exception("Failed to resolve input StateRefs $stateRefs")
}

View File

@ -0,0 +1,174 @@
package com.r3corda.node
import com.r3corda.contracts.asset.Cash
import com.r3corda.core.contracts.*
import com.r3corda.core.node.services.Vault
import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.core.serialization.OpaqueBytes
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.utilities.DUMMY_NOTARY
import com.r3corda.node.internal.ServerRPCOps
import com.r3corda.node.services.messaging.StateMachineUpdate
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.ValidatingNotaryService
import com.r3corda.testing.expect
import com.r3corda.testing.expectEvents
import com.r3corda.testing.node.MockNetwork
import com.r3corda.testing.node.MockNetwork.MockNode
import com.r3corda.testing.sequence
import org.junit.Before
import org.junit.Test
import rx.Observable
import kotlin.test.assertEquals
import kotlin.test.assertFalse
/**
* Unit tests for the node monitoring service.
*/
class ServerRPCTest {
lateinit var network: MockNetwork
lateinit var aliceNode: MockNode
lateinit var notaryNode: MockNode
lateinit var rpc: ServerRPCOps
lateinit var stateMachineUpdates: Observable<StateMachineUpdate>
lateinit var transactions: Observable<SignedTransaction>
lateinit var vaultUpdates: Observable<Vault.Update>
@Before
fun setup() {
network = MockNetwork()
val networkMap = network.createNode(advertisedServices = NetworkMapService.Type)
aliceNode = network.createNode(networkMapAddress = networkMap.info.address)
notaryNode = network.createNode(advertisedServices = ValidatingNotaryService.Type, networkMapAddress = networkMap.info.address)
rpc = ServerRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database)
stateMachineUpdates = rpc.stateMachinesAndUpdates().second
transactions = rpc.verifiedTransactions().second
vaultUpdates = rpc.vaultAndUpdates().second
}
@Test
fun `cash issue accepted`() {
val quantity = 1000L
val ref = OpaqueBytes(ByteArray(1) {1})
// Check the monitoring service wallet is empty
assertFalse(aliceNode.services.vaultService.currentVault.states.iterator().hasNext())
// Tell the monitoring service node to issue some cash
val recipient = aliceNode.services.storageService.myLegalIdentity
val outEvent = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), ref, recipient, DUMMY_NOTARY)
rpc.executeCommand(outEvent)
network.runNetwork()
val expectedState = Cash.State(Amount(quantity,
Issued(aliceNode.services.storageService.myLegalIdentity.ref(ref), GBP)),
recipient.owningKey)
var issueSmId: StateMachineRunId? = null
stateMachineUpdates.expectEvents {
sequence(
// ISSUE
expect { add: StateMachineUpdate.Added ->
issueSmId = add.id
},
expect { remove: StateMachineUpdate.Removed ->
require(remove.id == issueSmId)
}
)
}
transactions.expectEvents {
expect { tx ->
assertEquals(expectedState, tx.tx.outputs.single().data)
}
}
vaultUpdates.expectEvents {
expect { update ->
val actual = update.produced.single().state.data
assertEquals(expectedState, actual)
}
}
}
@Test
fun issueAndMoveWorks() {
rpc.executeCommand(ClientToServiceCommand.IssueCash(
amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.services.storageService.myLegalIdentity,
notary = notaryNode.services.storageService.myLegalIdentity
))
network.runNetwork()
rpc.executeCommand(ClientToServiceCommand.PayCash(
amount = Amount(100, Issued(PartyAndReference(aliceNode.services.storageService.myLegalIdentity, OpaqueBytes(ByteArray(1, { 1 }))), USD)),
recipient = aliceNode.services.storageService.myLegalIdentity
))
network.runNetwork()
var issueSmId: StateMachineRunId? = null
var moveSmId: StateMachineRunId? = null
stateMachineUpdates.expectEvents {
sequence(
// ISSUE
expect { add: StateMachineUpdate.Added ->
issueSmId = add.id
},
expect { remove: StateMachineUpdate.Removed ->
require(remove.id == issueSmId)
},
// MOVE
expect { add: StateMachineUpdate.Added ->
moveSmId = add.id
},
expect { remove: StateMachineUpdate.Removed ->
require(remove.id == moveSmId)
}
)
}
transactions.expectEvents {
sequence(
// ISSUE
expect { tx ->
require(tx.tx.inputs.isEmpty())
require(tx.tx.outputs.size == 1)
val signaturePubKeys = tx.sigs.map { it.by }.toSet()
// Only Alice signed
require(signaturePubKeys.size == 1)
require(signaturePubKeys.contains(aliceNode.services.storageService.myLegalIdentity.owningKey))
},
// MOVE
expect { tx ->
require(tx.tx.inputs.size == 1)
require(tx.tx.outputs.size == 1)
val signaturePubKeys = tx.sigs.map { it.by }.toSet()
// Alice and Notary signed
require(signaturePubKeys.size == 2)
require(signaturePubKeys.contains(aliceNode.services.storageService.myLegalIdentity.owningKey))
require(signaturePubKeys.contains(notaryNode.services.storageService.myLegalIdentity.owningKey))
}
)
}
vaultUpdates.expectEvents {
sequence(
// ISSUE
expect { update ->
require(update.consumed.size == 0) { update.consumed.size }
require(update.produced.size == 1) { update.produced.size }
},
// MOVE
expect { update ->
require(update.consumed.size == 1) { update.consumed.size }
require(update.produced.size == 1) { update.produced.size }
}
)
}
}
}

View File

@ -1,234 +0,0 @@
package com.r3corda.node.services
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.contracts.asset.Cash
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.crypto.newSecureRandom
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.Vault
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.OpaqueBytes
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.DUMMY_NOTARY
import com.r3corda.core.utilities.DUMMY_PUBKEY_1
import com.r3corda.node.services.monitor.*
import com.r3corda.node.services.monitor.NodeMonitorService.Companion.IN_EVENT_TOPIC
import com.r3corda.node.services.monitor.NodeMonitorService.Companion.REGISTER_TOPIC
import com.r3corda.node.utilities.AddOrRemove
import com.r3corda.testing.expect
import com.r3corda.testing.expectEvents
import com.r3corda.testing.node.MockNetwork
import com.r3corda.testing.node.MockNetwork.MockNode
import com.r3corda.testing.parallel
import com.r3corda.testing.sequence
import org.junit.Before
import org.junit.Test
import rx.subjects.ReplaySubject
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
/**
* Unit tests for the node monitoring service.
*/
class NodeMonitorServiceTests {
lateinit var network: MockNetwork
@Before
fun setup() {
network = MockNetwork()
}
/**
* Authenticate the register node with the monitor service node.
*/
private fun authenticate(monitorServiceNode: MockNode, registerNode: MockNode): Long {
network.runNetwork()
val sessionId = random63BitValue()
val authenticatePsm = register(registerNode, monitorServiceNode, sessionId)
network.runNetwork()
authenticatePsm.get(1, TimeUnit.SECONDS)
return sessionId
}
/**
* Test a very simple case of trying to register against the service.
*/
@Test
fun `success with network`() {
val (monitorServiceNode, registerNode) = network.createTwoNodes()
network.runNetwork()
val authenticatePsm = register(registerNode, monitorServiceNode, random63BitValue())
network.runNetwork()
val result = authenticatePsm.get(1, TimeUnit.SECONDS)
assertTrue(result.success)
}
/**
* Test that having registered, changes are relayed correctly.
*/
@Test
fun `event received`() {
val (monitorServiceNode, registerNode) = network.createTwoNodes()
val sessionID = authenticate(monitorServiceNode, registerNode)
var receivePsm = receiveWalletUpdate(registerNode, sessionID)
var expected = Vault.Update(emptySet(), emptySet())
monitorServiceNode.inNodeMonitorService!!.notifyVaultUpdate(expected)
network.runNetwork()
var actual = receivePsm.get(1, TimeUnit.SECONDS)
assertEquals(expected.consumed, actual.consumed)
assertEquals(expected.produced, actual.produced)
// Check that states are passed through correctly
receivePsm = receiveWalletUpdate(registerNode, sessionID)
val consumed = setOf(StateRef(SecureHash.randomSHA256(), 0))
val producedState = TransactionState(DummyContract.SingleOwnerState(newSecureRandom().nextInt(), DUMMY_PUBKEY_1), DUMMY_NOTARY)
val produced = setOf(StateAndRef(producedState, StateRef(SecureHash.randomSHA256(), 0)))
expected = Vault.Update(consumed, produced)
monitorServiceNode.inNodeMonitorService!!.notifyVaultUpdate(expected)
network.runNetwork()
actual = receivePsm.get(1, TimeUnit.SECONDS)
assertEquals(expected.produced, actual.produced)
assertEquals(expected.consumed, actual.consumed)
}
@Test
fun `cash issue accepted`() {
val (monitorServiceNode, registerNode) = network.createTwoNodes()
val sessionID = authenticate(monitorServiceNode, registerNode)
val quantity = 1000L
val events = ReplaySubject.create<ServiceToClientEvent>()
val ref = OpaqueBytes(ByteArray(1) {1})
registerNode.net.addMessageHandler(IN_EVENT_TOPIC, sessionID) { msg, reg ->
events.onNext(msg.data.deserialize<ServiceToClientEvent>())
}
// Check the monitoring service wallet is empty
assertFalse(monitorServiceNode.services.vaultService.currentVault.states.iterator().hasNext())
// Tell the monitoring service node to issue some cash
val recipient = monitorServiceNode.services.storageService.myLegalIdentity
val outEvent = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), ref, recipient, DUMMY_NOTARY)
val message = registerNode.net.createMessage(NodeMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, outEvent).serialize().bits)
registerNode.net.send(message, monitorServiceNode.net.myAddress)
network.runNetwork()
val expectedState = Cash.State(Amount(quantity,
Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)),
recipient.owningKey)
// Check we've received a response
events.expectEvents {
parallel(
sequence(
expect { event: ServiceToClientEvent.StateMachine ->
require(event.addOrRemove == AddOrRemove.ADD)
},
expect { event: ServiceToClientEvent.StateMachine ->
require(event.addOrRemove == AddOrRemove.REMOVE)
}
),
expect { event: ServiceToClientEvent.Transaction -> },
expect { event: ServiceToClientEvent.TransactionBuild ->
// Check the returned event is correct
val tx = (event.state as TransactionBuildResult.ProtocolStarted).transaction
assertNotNull(tx)
assertEquals(expectedState, tx!!.tx.outputs.single().data)
},
expect { event: ServiceToClientEvent.OutputState ->
// Check the generated state is correct
val actual = event.produced.single().state.data
assertEquals(expectedState, actual)
}
)
}
}
@Test
fun `cash move accepted`() {
val (monitorServiceNode, registerNode) = network.createTwoNodes()
val sessionID = authenticate(monitorServiceNode, registerNode)
val quantity = 1000L
val events = ReplaySubject.create<ServiceToClientEvent>()
registerNode.net.addMessageHandler(IN_EVENT_TOPIC, sessionID) { msg, reg ->
events.onNext(msg.data.deserialize<ServiceToClientEvent>())
}
val recipient = monitorServiceNode.services.storageService.myLegalIdentity
// Tell the monitoring service node to issue some cash so we can spend it later
val issueCommand = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), OpaqueBytes.of(0), recipient, recipient)
val issueMessage = registerNode.net.createMessage(NodeMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, issueCommand).serialize().bits)
registerNode.net.send(issueMessage, monitorServiceNode.net.myAddress)
val payCommand = ClientToServiceCommand.PayCash(Amount(quantity, Issued(recipient.ref(0), GBP)), recipient)
val payMessage = registerNode.net.createMessage(NodeMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, payCommand).serialize().bits)
registerNode.net.send(payMessage, monitorServiceNode.net.myAddress)
network.runNetwork()
events.expectEvents(isStrict = false) {
sequence(
// ISSUE
parallel(
sequence(
expect { event: ServiceToClientEvent.StateMachine ->
require(event.addOrRemove == AddOrRemove.ADD)
},
expect { event: ServiceToClientEvent.StateMachine ->
require(event.addOrRemove == AddOrRemove.REMOVE)
}
),
expect { event: ServiceToClientEvent.Transaction -> },
expect { event: ServiceToClientEvent.TransactionBuild -> },
expect { event: ServiceToClientEvent.OutputState -> }
),
// MOVE
parallel(
sequence(
expect { event: ServiceToClientEvent.StateMachine ->
require(event.addOrRemove == AddOrRemove.ADD)
},
expect { event: ServiceToClientEvent.StateMachine ->
require(event.addOrRemove == AddOrRemove.REMOVE)
}
),
expect { event: ServiceToClientEvent.Transaction ->
require(event.transaction.sigs.size == 1)
event.transaction.sigs.map { it.by }.containsAll(
listOf(
monitorServiceNode.services.storageService.myLegalIdentity.owningKey
)
)
},
expect { event: ServiceToClientEvent.TransactionBuild ->
require(event.state is TransactionBuildResult.ProtocolStarted)
},
expect { event: ServiceToClientEvent.OutputState ->
require(event.consumed.size == 1)
require(event.produced.size == 1)
}
)
)
}
}
private fun register(registerNode: MockNode, monitorServiceNode: MockNode, sessionId: Long): ListenableFuture<RegisterResponse> {
val req = RegisterRequest(registerNode.services.networkService.myAddress, sessionId)
return registerNode.sendAndReceive<RegisterResponse>(REGISTER_TOPIC, monitorServiceNode, req)
}
private fun receiveWalletUpdate(registerNode: MockNode, sessionId: Long): ListenableFuture<ServiceToClientEvent.OutputState> {
return registerNode.receive<ServiceToClientEvent.OutputState>(IN_EVENT_TOPIC, sessionId)
}
}