mirror of
https://github.com/corda/corda.git
synced 2024-12-29 09:18:58 +00:00
Add WalletMonitorService
This commit is contained in:
parent
3e121f3930
commit
cf40e0db70
node/src
main/kotlin/com/r3corda/node
test/kotlin/com/r3corda/node/services
@ -27,6 +27,7 @@ import com.r3corda.node.services.events.NodeSchedulerService
|
||||
import com.r3corda.node.services.events.ScheduledActivityObserver
|
||||
import com.r3corda.node.services.identity.InMemoryIdentityService
|
||||
import com.r3corda.node.services.keys.E2ETestKeyManagementService
|
||||
import com.r3corda.node.services.monitor.WalletMonitorService
|
||||
import com.r3corda.node.services.network.InMemoryNetworkMapCache
|
||||
import com.r3corda.node.services.network.InMemoryNetworkMapService
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
@ -125,6 +126,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
lateinit var keyManagement: E2ETestKeyManagementService
|
||||
var inNodeNetworkMapService: NetworkMapService? = null
|
||||
var inNodeNotaryService: NotaryService? = null
|
||||
var inNodeWalletMonitorService: WalletMonitorService? = null
|
||||
lateinit var identity: IdentityService
|
||||
lateinit var net: MessagingServiceInternal
|
||||
lateinit var netMapCache: NetworkMapCache
|
||||
@ -167,6 +169,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
wallet = makeWalletService()
|
||||
|
||||
identity = makeIdentityService()
|
||||
|
||||
// Place the long term identity key in the KMS. Eventually, this is likely going to be separated again because
|
||||
// the KMS is meant for derived temporary keys used in transactions, and we're not supposed to sign things with
|
||||
// the identity key. But the infrastructure to make that easy isn't here yet.
|
||||
@ -186,6 +189,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
checkpointStorage,
|
||||
serverThread)
|
||||
|
||||
inNodeWalletMonitorService = makeWalletMonitorService() // Note this HAS to be after smm is set
|
||||
buildAdvertisedServices()
|
||||
|
||||
// TODO: this model might change but for now it provides some de-coupling
|
||||
@ -338,6 +342,8 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
// TODO: sort out ordering of open & protected modifiers of functions in this class.
|
||||
protected open fun makeWalletService(): WalletService = NodeWalletService(services)
|
||||
|
||||
protected open fun makeWalletMonitorService(): WalletMonitorService = WalletMonitorService(net, smm, services)
|
||||
|
||||
open fun stop() {
|
||||
// TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the
|
||||
// network, including unsubscribing from updates from remote services. Possibly some sort of parameter to stop()
|
||||
|
@ -0,0 +1,86 @@
|
||||
package com.r3corda.node.services.monitor
|
||||
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.Party
|
||||
import com.r3corda.core.serialization.OpaqueBytes
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* Events triggered by changes in the node, and sent to monitoring client(s).
|
||||
*/
|
||||
sealed class ServiceToClientEvent(val time: Instant) {
|
||||
class OutputState(time: Instant, val consumed: Set<StateRef>, val produced: Set<StateAndRef<ContractState>>) : ServiceToClientEvent(time)
|
||||
class StateMachine(time: Instant, val fiberId: Long, val label: String, val addOrRemove: AddOrRemove) : ServiceToClientEvent(time)
|
||||
class Progress(time: Instant, val fiberId: Long, val message: String) : ServiceToClientEvent(time)
|
||||
class TransactionBuild(time: Instant, val id: UUID, val state: TransactionBuildResult) : ServiceToClientEvent(time)
|
||||
}
|
||||
|
||||
sealed class TransactionBuildResult {
|
||||
/**
|
||||
* State indicating the action undertaken has been completed (it was not complex enough to require a
|
||||
* state machine starting).
|
||||
*
|
||||
* @param transaction the transaction created as a result.
|
||||
*/
|
||||
// TODO: We should have a consistent "Transaction your request triggered has been built" event, rather than these
|
||||
// once-off results from a request. Unclear if that means all requests need to trigger a protocol state machine,
|
||||
// so the client sees a consistent process, or if some other solution can be found.
|
||||
class Complete(val transaction: SignedTransaction, val message: String?) : TransactionBuildResult()
|
||||
|
||||
/**
|
||||
* State indicating that a protocol is managing this request, and that the client should track protocol state machine
|
||||
* updates for further information. The monitor will separately receive notification of the state machine having been
|
||||
* added, as it would any other state machine. This response is used solely to enable the monitor to identify
|
||||
* the state machine (and its progress) as associated with the request.
|
||||
*
|
||||
* @param transaction the transaction created as a result, in the case where the protocol has completed.
|
||||
*/
|
||||
class ProtocolStarted(val fiberId: Long, val transaction: SignedTransaction?, val message: String?) : TransactionBuildResult()
|
||||
|
||||
/**
|
||||
* State indicating the action undertaken failed, either directly (it is not something which requires a
|
||||
* state machine), or before a state machine was started.
|
||||
*/
|
||||
class Failed(val message: String?) : TransactionBuildResult()
|
||||
}
|
||||
|
||||
/**
|
||||
* A command from the monitoring client, to the node.
|
||||
*
|
||||
* @param id ID used to tag event(s) resulting from a command.
|
||||
*/
|
||||
sealed class ClientToServiceCommand(val id: UUID) {
|
||||
// TODO: Replace with a generic event for starting a protocol which then passes back required information, rather
|
||||
// than using an event for every conceivable action.
|
||||
/**
|
||||
* Issue cash state objects.
|
||||
*
|
||||
* @param currency the currency to issue.
|
||||
* @param issueRef the reference to specify on the issuance, used to differentiate pools of cash. Convention is
|
||||
* to use the single byte "0x01" as a default.
|
||||
* @param pennies the amount to issue, in the smallest unit of the currency.
|
||||
* @param recipient the public key of the recipient.
|
||||
* @param notary the notary to use for this transaction.
|
||||
* @param id the ID to be provided in events resulting from this request.
|
||||
*/
|
||||
class IssueCash(val currency: Currency,
|
||||
val issueRef: OpaqueBytes,
|
||||
val pennies: Long,
|
||||
val recipient: PublicKey,
|
||||
val notary: Party,
|
||||
id: UUID = UUID.randomUUID())
|
||||
: ClientToServiceCommand(id)
|
||||
class PayCash(val tokenDef: Issued<Currency>, val pennies: Long, val owner: PublicKey,
|
||||
id: UUID = UUID.randomUUID())
|
||||
: ClientToServiceCommand(id)
|
||||
|
||||
/**
|
||||
* @param id the ID to be provided in events resulting from this request.
|
||||
*/
|
||||
class ExitCash(val currency: Currency, val issueRef: OpaqueBytes, val pennies: Long,
|
||||
id: UUID = UUID.randomUUID())
|
||||
: ClientToServiceCommand(id)
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
package com.r3corda.node.services.monitor
|
||||
|
||||
import com.r3corda.core.contracts.ContractState
|
||||
import com.r3corda.core.messaging.SingleMessageRecipient
|
||||
import com.r3corda.protocols.DirectRequestMessage
|
||||
|
||||
data class RegisterRequest(override val replyToRecipient: SingleMessageRecipient,
|
||||
override val sessionID: Long) : DirectRequestMessage
|
||||
|
||||
data class RegisterResponse(val success: Boolean)
|
||||
// TODO: This should have a shared secret the monitor was sent in the registration response, for security
|
||||
data class DeregisterRequest(override val replyToRecipient: SingleMessageRecipient,
|
||||
override val sessionID: Long) : DirectRequestMessage
|
||||
|
||||
data class DeregisterResponse(val success: Boolean)
|
||||
data class StateSnapshotMessage(val contractStates: Collection<ContractState>, val protocolStates: Collection<String>)
|
||||
|
||||
data class ClientToServiceCommandMessage(override val sessionID: Long, override val replyToRecipient: SingleMessageRecipient, val command: ClientToServiceCommand) : DirectRequestMessage
|
@ -0,0 +1,225 @@
|
||||
package com.r3corda.node.services.monitor
|
||||
|
||||
import co.paralleluniverse.common.util.VisibleForTesting
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.r3corda.contracts.asset.Cash
|
||||
import com.r3corda.contracts.asset.InsufficientBalanceException
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.toStringShort
|
||||
import com.r3corda.core.messaging.Message
|
||||
import com.r3corda.core.messaging.MessageRecipients
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.node.ServiceHub
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.core.node.services.ServiceType
|
||||
import com.r3corda.core.node.services.Wallet
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.serialization.serialize
|
||||
import com.r3corda.core.utilities.loggerFor
|
||||
import com.r3corda.node.services.api.AbstractNodeService
|
||||
import com.r3corda.node.services.persistence.DataVending
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.security.KeyPair
|
||||
import java.security.PublicKey
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* Service which allows external clients to monitor the wallet service and state machine manager, as well as trigger
|
||||
* actions within the node. The service also sends requests for user input back to clients, for example to enter
|
||||
* additional information while a protocol runs, or confirm an action.
|
||||
*
|
||||
* This is intended to enable a range of tools from end user UI to ops tools which monitor health across a number of nodes.
|
||||
*/
|
||||
// TODO: Implement authorization controls+
|
||||
// TODO: Replace this entirely with a publish/subscribe based solution on a to-be-written service (likely JMS or similar),
|
||||
// rather than implement authentication and publish/subscribe ourselves.
|
||||
// TODO: Clients need to be able to indicate whether they support interactivity (no point in sending requests for input
|
||||
// to a monitoring tool)
|
||||
@ThreadSafe
|
||||
class WalletMonitorService(net: MessagingService, val smm: StateMachineManager, val services: ServiceHub)
|
||||
: AbstractNodeService(net, services.networkMapCache) {
|
||||
companion object {
|
||||
val REGISTER_TOPIC = "platform.wallet_monitor.register"
|
||||
val DEREGISTER_TOPIC = "platform.wallet_monitor.deregister"
|
||||
val STATE_TOPIC = "platform.wallet_monitor.state_snapshot"
|
||||
val IN_EVENT_TOPIC = "platform.wallet_monitor.in"
|
||||
val OUT_EVENT_TOPIC = "platform.wallet_monitor.out"
|
||||
|
||||
val logger = loggerFor<WalletMonitorService>()
|
||||
}
|
||||
|
||||
val listeners: MutableSet<RegisteredListener> = HashSet()
|
||||
|
||||
data class RegisteredListener(val recipients: MessageRecipients, val sessionID: Long)
|
||||
|
||||
init {
|
||||
addMessageHandler(REGISTER_TOPIC) { req: RegisterRequest -> processRegisterRequest(req) }
|
||||
addMessageHandler(DEREGISTER_TOPIC) { req: DeregisterRequest -> processDeregisterRequest(req) }
|
||||
addMessageHandler(OUT_EVENT_TOPIC) { req: ClientToServiceCommandMessage -> processEventRequest(req) }
|
||||
|
||||
// Notify listeners on state changes
|
||||
services.walletService.updates.subscribe { update -> notifyWalletUpdate(update) }
|
||||
smm.changes.subscribe { change ->
|
||||
val fiberId: Long = change.third
|
||||
val logic: ProtocolLogic<*> = change.first
|
||||
val progressTracker = logic.progressTracker
|
||||
|
||||
notifyEvent(ServiceToClientEvent.StateMachine(Instant.now(), fiberId, logic.javaClass.name, change.second))
|
||||
if (progressTracker != null) {
|
||||
when (change.second) {
|
||||
AddOrRemove.ADD -> progressTracker.changes.subscribe { progress ->
|
||||
notifyEvent(ServiceToClientEvent.Progress(Instant.now(), fiberId, progress.toString()))
|
||||
}
|
||||
AddOrRemove.REMOVE -> {
|
||||
// Nothing to do
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
internal fun notifyWalletUpdate(update: Wallet.Update)
|
||||
= notifyEvent(ServiceToClientEvent.OutputState(Instant.now(), update.consumed, update.produced))
|
||||
|
||||
private fun processEventRequest(reqMessage: ClientToServiceCommandMessage) {
|
||||
val req = reqMessage.command
|
||||
val result: TransactionBuildResult? =
|
||||
try {
|
||||
when (req) {
|
||||
is ClientToServiceCommand.IssueCash -> issueCash(req)
|
||||
is ClientToServiceCommand.PayCash -> initatePayment(req)
|
||||
is ClientToServiceCommand.ExitCash -> exitCash(req)
|
||||
else -> throw IllegalArgumentException("Unknown request type ${req.javaClass.name}")
|
||||
}
|
||||
} catch(ex: Exception) {
|
||||
TransactionBuildResult.Failed(ex.message)
|
||||
}
|
||||
|
||||
// Send back any result from the event. Not all events (especially TransactionInput) produce a
|
||||
// result.
|
||||
if (result != null) {
|
||||
val event = ServiceToClientEvent.TransactionBuild(Instant.now(), req.id, result)
|
||||
val respMessage = net.createMessage(IN_EVENT_TOPIC, reqMessage.sessionID,
|
||||
event.serialize().bits)
|
||||
net.send(respMessage, reqMessage.getReplyTo(services.networkMapCache))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a request from a monitor to remove them from the subscribers.
|
||||
*/
|
||||
fun processDeregisterRequest(req: DeregisterRequest) {
|
||||
val message: Message
|
||||
try {
|
||||
// TODO: Session ID should be managed by the messaging layer, so it handles ensuring that the
|
||||
// request comes from the same endpoint that registered at the start.
|
||||
listeners.remove(RegisteredListener(req.replyToRecipient, req.sessionID))
|
||||
message = net.createMessage(DEREGISTER_TOPIC, req.sessionID, DeregisterResponse(true).serialize().bits)
|
||||
} catch (ex: IllegalStateException) {
|
||||
message = net.createMessage(DEREGISTER_TOPIC, req.sessionID, DeregisterResponse(false).serialize().bits)
|
||||
}
|
||||
net.send(message, req.replyToRecipient)
|
||||
}
|
||||
|
||||
/**
|
||||
* Process a request from a monitor to add them to the subscribers. This includes hooks to authenticate the request,
|
||||
* but currently all requests pass (and there's no access control on wallets, so it has no actual meaning).
|
||||
*/
|
||||
fun processRegisterRequest(req: RegisterRequest) {
|
||||
val message: Message
|
||||
try {
|
||||
message = net.createMessage(REGISTER_TOPIC, req.sessionID, RegisterResponse(true).serialize().bits)
|
||||
listeners.add(RegisteredListener(req.replyToRecipient, req.sessionID))
|
||||
val stateMessage = StateSnapshotMessage(services.walletService.currentWallet.states.map { it.state.data }.toList(),
|
||||
smm.allStateMachines.map { it.javaClass.name })
|
||||
net.send(net.createMessage(STATE_TOPIC, DEFAULT_SESSION_ID, stateMessage.serialize().bits), req.replyToRecipient)
|
||||
} catch (ex: IllegalStateException) {
|
||||
message = net.createMessage(REGISTER_TOPIC, req.sessionID, RegisterResponse(false).serialize().bits)
|
||||
}
|
||||
net.send(message, req.replyToRecipient)
|
||||
}
|
||||
|
||||
private fun notifyEvent(event: ServiceToClientEvent) = listeners.forEach { monitor ->
|
||||
net.send(net.createMessage(IN_EVENT_TOPIC, monitor.sessionID, event.serialize().bits),
|
||||
monitor.recipients)
|
||||
}
|
||||
|
||||
/**
|
||||
* Notifies the node associated with the [recipient] public key. Returns a future holding a Boolean of whether the
|
||||
* node accepted the transaction or not.
|
||||
*/
|
||||
private fun notifyRecipientAboutTransaction(
|
||||
recipient: PublicKey,
|
||||
transaction: SignedTransaction
|
||||
): ListenableFuture<Unit> {
|
||||
val recipientNodeInfo = services.networkMapCache.getNodeByPublicKey(recipient) ?: throw PublicKeyLookupFailed(recipient)
|
||||
return DataVending.Service.notify(net, services.storageService.myLegalIdentity,
|
||||
recipientNodeInfo, transaction)
|
||||
}
|
||||
|
||||
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
|
||||
private fun initatePayment(req: ClientToServiceCommand.PayCash): TransactionBuildResult {
|
||||
val builder: TransactionBuilder = TransactionType.General.Builder()
|
||||
// TODO: Have some way of restricting this to states the caller controls
|
||||
try {
|
||||
Cash().generateSpend(builder, Amount(req.pennies, req.tokenDef.product), req.owner,
|
||||
services.walletService.currentWallet.statesOfType<Cash.State>(),
|
||||
setOf(req.tokenDef.issuer.party))
|
||||
.forEach {
|
||||
val key = services.keyManagementService.keys[it] ?: throw IllegalStateException("Could not find signing key for ${it.toStringShort()}")
|
||||
builder.signWith(KeyPair(it, key))
|
||||
}
|
||||
val tx = builder.toSignedTransaction()
|
||||
services.walletService.notify(tx.tx)
|
||||
notifyRecipientAboutTransaction(req.owner, tx)
|
||||
return TransactionBuildResult.Complete(tx, "Cash payment completed")
|
||||
} catch(ex: InsufficientBalanceException) {
|
||||
return TransactionBuildResult.Failed(ex.message ?: "Insufficient balance")
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
|
||||
private fun exitCash(req: ClientToServiceCommand.ExitCash): TransactionBuildResult {
|
||||
val builder: TransactionBuilder = TransactionType.General.Builder()
|
||||
val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef)
|
||||
Cash().generateExit(builder, Amount(req.pennies, Issued(issuer, req.currency)),
|
||||
issuer.party.owningKey, services.walletService.currentWallet.statesOfType<Cash.State>())
|
||||
builder.signWith(services.storageService.myLegalIdentityKey)
|
||||
val tx = builder.toSignedTransaction()
|
||||
services.walletService.notify(tx.tx)
|
||||
// Notify the owners
|
||||
val inputStatesNullable = services.walletService.statesForRefs(tx.tx.inputs)
|
||||
val inputStates = inputStatesNullable.values.filterNotNull().map { it.data }
|
||||
if (inputStatesNullable.size != inputStates.size) {
|
||||
val unresolvedStateRefs = inputStatesNullable.filter { it.value == null }.map { it.key }
|
||||
throw InputStateRefResolveFailed(unresolvedStateRefs)
|
||||
}
|
||||
inputStates.filterIsInstance<Cash.State>().map { it.owner }.toSet().forEach {
|
||||
notifyRecipientAboutTransaction(it, tx)
|
||||
}
|
||||
return TransactionBuildResult.Complete(tx, "Cash destruction completed")
|
||||
}
|
||||
|
||||
// TODO: Make a lightweight protocol that manages this workflow, rather than embedding it directly in the service
|
||||
private fun issueCash(req: ClientToServiceCommand.IssueCash): TransactionBuildResult {
|
||||
val builder: TransactionBuilder = TransactionType.General.Builder(notary = req.notary)
|
||||
val issuer = PartyAndReference(services.storageService.myLegalIdentity, req.issueRef)
|
||||
Cash().generateIssue(builder, Amount(req.pennies, Issued(issuer, req.currency)), req.recipient, req.notary)
|
||||
builder.signWith(services.storageService.myLegalIdentityKey)
|
||||
val tx = builder.toSignedTransaction()
|
||||
services.walletService.notify(tx.tx)
|
||||
notifyRecipientAboutTransaction(req.recipient, tx)
|
||||
return TransactionBuildResult.Complete(tx, "Cash issuance completed")
|
||||
}
|
||||
|
||||
class PublicKeyLookupFailed(failedPublicKey: PublicKey) :
|
||||
Exception("Failed to lookup public keys $failedPublicKey")
|
||||
|
||||
class InputStateRefResolveFailed(stateRefs: List<StateRef>) :
|
||||
Exception("Failed to resolve input StateRefs $stateRefs")
|
||||
}
|
@ -0,0 +1,209 @@
|
||||
package com.r3corda.node.services
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.r3corda.contracts.asset.Cash
|
||||
import com.r3corda.core.contracts.*
|
||||
import com.r3corda.core.crypto.SecureHash
|
||||
import com.r3corda.core.crypto.newSecureRandom
|
||||
import com.r3corda.core.messaging.MessageHandlerRegistration
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
|
||||
import com.r3corda.core.node.services.Wallet
|
||||
import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.serialization.OpaqueBytes
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.core.serialization.serialize
|
||||
import com.r3corda.core.testing.DUMMY_NOTARY
|
||||
import com.r3corda.core.testing.DUMMY_PUBKEY_1
|
||||
import com.r3corda.node.internal.testing.MockNetwork
|
||||
import com.r3corda.node.services.monitor.*
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.util.*
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFalse
|
||||
import kotlin.test.assertTrue
|
||||
import kotlin.test.fail
|
||||
|
||||
/**
|
||||
* Unit tests for the wallet monitoring service.
|
||||
*/
|
||||
class WalletMonitorServiceTests {
|
||||
lateinit var network: MockNetwork
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
network = MockNetwork()
|
||||
}
|
||||
|
||||
/**
|
||||
* Authenticate the register node with the monitor service node.
|
||||
*/
|
||||
private fun authenticate(monitorServiceNode: MockNetwork.MockNode, registerNode: MockNetwork.MockNode): Long {
|
||||
network.runNetwork()
|
||||
val sessionID = random63BitValue()
|
||||
val authenticatePsm = registerNode.smm.add(WalletMonitorService.REGISTER_TOPIC,
|
||||
TestRegisterPSM(monitorServiceNode.info, sessionID))
|
||||
network.runNetwork()
|
||||
authenticatePsm.get(1, TimeUnit.SECONDS)
|
||||
return sessionID
|
||||
}
|
||||
|
||||
class TestReceiveWalletUpdatePSM(val sessionID: Long)
|
||||
: ProtocolLogic<ServiceToClientEvent.OutputState>() {
|
||||
override val topic: String get() = WalletMonitorService.IN_EVENT_TOPIC
|
||||
@Suspendable
|
||||
override fun call(): ServiceToClientEvent.OutputState
|
||||
= receive<ServiceToClientEvent.OutputState>(sessionID).validate { it }
|
||||
}
|
||||
|
||||
class TestRegisterPSM(val server: NodeInfo, val sessionID: Long)
|
||||
: ProtocolLogic<RegisterResponse>() {
|
||||
override val topic: String get() = WalletMonitorService.REGISTER_TOPIC
|
||||
@Suspendable
|
||||
override fun call(): RegisterResponse {
|
||||
val req = RegisterRequest(serviceHub.networkService.myAddress, sessionID)
|
||||
return sendAndReceive<RegisterResponse>(server.identity, 0, sessionID, req).validate { it }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test a very simple case of trying to register against the service.
|
||||
*/
|
||||
@Test
|
||||
fun `success with network`() {
|
||||
val (monitorServiceNode, registerNode) = network.createTwoNodes()
|
||||
|
||||
network.runNetwork()
|
||||
val sessionID = random63BitValue()
|
||||
val authenticatePsm = registerNode.smm.add(WalletMonitorService.REGISTER_TOPIC,
|
||||
TestRegisterPSM(monitorServiceNode.info, sessionID))
|
||||
network.runNetwork()
|
||||
val result = authenticatePsm.get(1, TimeUnit.SECONDS)
|
||||
assertTrue(result.success)
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that having registered, changes are relayed correctly.
|
||||
*/
|
||||
@Test
|
||||
fun `event received`() {
|
||||
val (monitorServiceNode, registerNode) = network.createTwoNodes()
|
||||
val sessionID = authenticate(monitorServiceNode, registerNode)
|
||||
var receivePsm = registerNode.smm.add(WalletMonitorService.IN_EVENT_TOPIC,
|
||||
TestReceiveWalletUpdatePSM(sessionID))
|
||||
var expected = Wallet.Update(emptySet(), emptySet())
|
||||
monitorServiceNode.inNodeWalletMonitorService!!.notifyWalletUpdate(expected)
|
||||
network.runNetwork()
|
||||
var actual = receivePsm.get(1, TimeUnit.SECONDS)
|
||||
assertEquals(expected.consumed, actual.consumed)
|
||||
assertEquals(expected.produced, actual.produced)
|
||||
|
||||
// Check that states are passed through correctly
|
||||
receivePsm = registerNode.smm.add(WalletMonitorService.IN_EVENT_TOPIC,
|
||||
TestReceiveWalletUpdatePSM(sessionID))
|
||||
val consumed = setOf(StateRef(SecureHash.randomSHA256(), 0))
|
||||
val producedState = TransactionState(DummyContract.SingleOwnerState(newSecureRandom().nextInt(), DUMMY_PUBKEY_1), DUMMY_NOTARY)
|
||||
val produced = setOf(StateAndRef(producedState, StateRef(SecureHash.randomSHA256(), 0)))
|
||||
expected = Wallet.Update(consumed, produced)
|
||||
monitorServiceNode.inNodeWalletMonitorService!!.notifyWalletUpdate(expected)
|
||||
network.runNetwork()
|
||||
actual = receivePsm.get(1, TimeUnit.SECONDS)
|
||||
assertEquals(expected.produced, actual.produced)
|
||||
assertEquals(expected.consumed, actual.consumed)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `cash issue accepted`() {
|
||||
val (monitorServiceNode, registerNode) = network.createTwoNodes()
|
||||
val sessionID = authenticate(monitorServiceNode, registerNode)
|
||||
val quantity = 1000L
|
||||
val events = Collections.synchronizedList(ArrayList<ServiceToClientEvent>())
|
||||
val ref = OpaqueBytes(ByteArray(1) {1})
|
||||
|
||||
registerNode.net.addMessageHandler(WalletMonitorService.IN_EVENT_TOPIC + ".0") { msg, reg ->
|
||||
events.add(msg.data.deserialize<ServiceToClientEvent>())
|
||||
}
|
||||
|
||||
// Check the monitoring service wallet is empty
|
||||
assertFalse(monitorServiceNode.services.walletService.currentWallet.states.iterator().hasNext())
|
||||
|
||||
// Tell the monitoring service node to issue some cash
|
||||
val recipientKey = monitorServiceNode.services.storageService.myLegalIdentityKey.public
|
||||
val outEvent = ClientToServiceCommand.IssueCash(GBP, ref, quantity, recipientKey, DUMMY_NOTARY)
|
||||
val message = registerNode.net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
|
||||
ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, outEvent).serialize().bits)
|
||||
registerNode.net.send(message, monitorServiceNode.net.myAddress)
|
||||
network.runNetwork()
|
||||
|
||||
// Check we've received a response
|
||||
events.forEach { event ->
|
||||
when (event) {
|
||||
is ServiceToClientEvent.TransactionBuild -> {
|
||||
// Check the returned event is correct
|
||||
val actual = event.state as TransactionBuildResult.Complete
|
||||
val expected = TransactionBuildResult.Complete(actual.transaction, null)
|
||||
assertEquals(expected, actual)
|
||||
}
|
||||
is ServiceToClientEvent.OutputState -> {
|
||||
// Check the generated state is correct
|
||||
val actual = event.produced.single().state.data
|
||||
val expected = Cash.State(Amount(quantity,
|
||||
Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)),
|
||||
recipientKey)
|
||||
assertEquals(expected, actual)
|
||||
}
|
||||
else -> fail("Unexpected in event ${event}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `cash move accepted`() {
|
||||
val (monitorServiceNode, registerNode) = network.createTwoNodes()
|
||||
val sessionID = authenticate(monitorServiceNode, registerNode)
|
||||
val quantity = 1000L
|
||||
val events = Collections.synchronizedList(ArrayList<ServiceToClientEvent>())
|
||||
val ref = OpaqueBytes(ByteArray(1) {1})
|
||||
var handlerReg: MessageHandlerRegistration? = null
|
||||
|
||||
registerNode.net.addMessageHandler(WalletMonitorService.IN_EVENT_TOPIC + ".0") { msg, reg ->
|
||||
events.add(msg.data.deserialize<ServiceToClientEvent>())
|
||||
handlerReg = reg
|
||||
}
|
||||
|
||||
// Check the monitoring service wallet is empty
|
||||
assertFalse(monitorServiceNode.services.walletService.currentWallet.states.iterator().hasNext())
|
||||
|
||||
// Tell the monitoring service node to issue some cash
|
||||
val recipientKey = monitorServiceNode.services.storageService.myLegalIdentityKey.public
|
||||
val outEvent = ClientToServiceCommand.IssueCash(GBP, ref, quantity, recipientKey, DUMMY_NOTARY)
|
||||
val message = registerNode.net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
|
||||
ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, outEvent).serialize().bits)
|
||||
registerNode.net.send(message, monitorServiceNode.net.myAddress)
|
||||
network.runNetwork()
|
||||
|
||||
// Check we've received a response
|
||||
events.forEach { event ->
|
||||
when (event) {
|
||||
is ServiceToClientEvent.TransactionBuild -> {
|
||||
// Check the returned event is correct
|
||||
val actual = event.state as TransactionBuildResult.Complete
|
||||
val expected = TransactionBuildResult.Complete(actual.transaction, null)
|
||||
assertEquals(expected, actual)
|
||||
}
|
||||
is ServiceToClientEvent.OutputState -> {
|
||||
// Check the generated state is correct
|
||||
val actual = event.produced.single().state.data
|
||||
val expected = Cash.State(Amount(quantity,
|
||||
Issued(monitorServiceNode.services.storageService.myLegalIdentity.ref(ref), GBP)),
|
||||
recipientKey)
|
||||
assertEquals(expected, actual)
|
||||
}
|
||||
else -> fail("Unexpected in event ${event}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user