mirror of
https://github.com/corda/corda.git
synced 2025-01-21 03:55:00 +00:00
Rename wallet monitor service to node monitor service
This commit is contained in:
parent
1d53e59c7d
commit
d38392093f
@ -15,9 +15,9 @@ import org.slf4j.LoggerFactory
|
||||
import rx.subjects.PublishSubject
|
||||
import kotlin.test.fail
|
||||
|
||||
val log: Logger = LoggerFactory.getLogger(WalletMonitorClientTests::class.java)
|
||||
val log: Logger = LoggerFactory.getLogger(NodeMonitorClientTests::class.java)
|
||||
|
||||
class WalletMonitorClientTests {
|
||||
class NodeMonitorClientTests {
|
||||
@Test
|
||||
fun cashIssueWorksEndToEnd() {
|
||||
driver {
|
||||
@ -34,7 +34,7 @@ class WalletMonitorClientTests {
|
||||
val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
|
||||
val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
|
||||
|
||||
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create())
|
||||
val aliceMonitorClient = NodeMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create())
|
||||
require(aliceMonitorClient.register().get())
|
||||
|
||||
aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
|
||||
@ -77,7 +77,7 @@ class WalletMonitorClientTests {
|
||||
val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
|
||||
val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
|
||||
|
||||
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create())
|
||||
val aliceMonitorClient = NodeMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create())
|
||||
require(aliceMonitorClient.register().get())
|
||||
|
||||
aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
|
||||
@ -185,7 +185,7 @@ class WalletMonitorClientTests {
|
||||
val aliceInStream = PublishSubject.create<ServiceToClientEvent>()
|
||||
val aliceOutStream = PublishSubject.create<ClientToServiceCommand>()
|
||||
|
||||
val aliceMonitorClient = WalletMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create())
|
||||
val aliceMonitorClient = NodeMonitorClient(client, aliceNode, aliceOutStream, aliceInStream, PublishSubject.create())
|
||||
require(aliceMonitorClient.register().get())
|
||||
|
||||
aliceOutStream.onNext(ClientToServiceCommand.IssueCash(
|
@ -18,9 +18,9 @@ import rx.Observer
|
||||
* Worked example of a client which communicates with the wallet monitor service.
|
||||
*/
|
||||
|
||||
private val log: Logger = LoggerFactory.getLogger("WalletMonitorClient")
|
||||
private val log: Logger = LoggerFactory.getLogger(NodeMonitorClient::class.java)
|
||||
|
||||
class WalletMonitorClient(
|
||||
class NodeMonitorClient(
|
||||
val net: MessagingService,
|
||||
val node: NodeInfo,
|
||||
val outEvents: Observable<ClientToServiceCommand>,
|
||||
@ -33,29 +33,29 @@ class WalletMonitorClient(
|
||||
|
||||
val future = SettableFuture.create<Boolean>()
|
||||
log.info("Registering with ID $sessionID. I am ${net.myAddress}")
|
||||
net.addMessageHandler(WalletMonitorService.REGISTER_TOPIC, sessionID) { msg, reg ->
|
||||
net.addMessageHandler(NodeMonitorService.REGISTER_TOPIC, sessionID) { msg, reg ->
|
||||
val resp = msg.data.deserialize<RegisterResponse>()
|
||||
net.removeMessageHandler(reg)
|
||||
future.set(resp.success)
|
||||
}
|
||||
net.addMessageHandler(WalletMonitorService.STATE_TOPIC, sessionID) { msg, reg ->
|
||||
net.addMessageHandler(NodeMonitorService.STATE_TOPIC, sessionID) { msg, reg ->
|
||||
val snapshotMessage = msg.data.deserialize<StateSnapshotMessage>()
|
||||
net.removeMessageHandler(reg)
|
||||
snapshot.onNext(snapshotMessage)
|
||||
}
|
||||
|
||||
net.addMessageHandler(WalletMonitorService.IN_EVENT_TOPIC, sessionID) { msg, reg ->
|
||||
net.addMessageHandler(NodeMonitorService.IN_EVENT_TOPIC, sessionID) { msg, reg ->
|
||||
val event = msg.data.deserialize<ServiceToClientEvent>()
|
||||
inEvents.onNext(event)
|
||||
}
|
||||
|
||||
val req = RegisterRequest(net.myAddress, sessionID)
|
||||
val registerMessage = net.createMessage(WalletMonitorService.REGISTER_TOPIC, 0, req.serialize().bits)
|
||||
val registerMessage = net.createMessage(NodeMonitorService.REGISTER_TOPIC, 0, req.serialize().bits)
|
||||
net.send(registerMessage, node.address)
|
||||
|
||||
outEvents.subscribe { event ->
|
||||
val envelope = ClientToServiceCommandMessage(sessionID, net.myAddress, event)
|
||||
val message = net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, 0, envelope.serialize().bits)
|
||||
val message = net.createMessage(NodeMonitorService.OUT_EVENT_TOPIC, 0, envelope.serialize().bits)
|
||||
net.send(message, node.address)
|
||||
}
|
||||
|
@ -23,8 +23,8 @@ sealed class StatesModification<out T : ContractState>{
|
||||
* This model exposes the list of owned contract states.
|
||||
*/
|
||||
class ContractStateModel {
|
||||
private val serviceToClient: Observable<ServiceToClientEvent> by observable(WalletMonitorModel::serviceToClient)
|
||||
private val snapshot: Observable<StateSnapshotMessage> by observable(WalletMonitorModel::snapshot)
|
||||
private val serviceToClient: Observable<ServiceToClientEvent> by observable(NodeMonitorModel::serviceToClient)
|
||||
private val snapshot: Observable<StateSnapshotMessage> by observable(NodeMonitorModel::snapshot)
|
||||
private val outputStates = serviceToClient.ofType(ServiceToClientEvent.OutputState::class.java)
|
||||
|
||||
val contractStatesDiff: Observable<StatesModification.Diff<ContractState>> =
|
||||
|
@ -57,7 +57,7 @@ data class GatheredTransactionDataWritable(
|
||||
*/
|
||||
class GatheredTransactionDataModel {
|
||||
|
||||
private val serviceToClient: Observable<ServiceToClientEvent> by observable(WalletMonitorModel::serviceToClient)
|
||||
private val serviceToClient: Observable<ServiceToClientEvent> by observable(NodeMonitorModel::serviceToClient)
|
||||
|
||||
/**
|
||||
* Aggregation of updates to transactions. We use the observable list as the only container and do linear search for
|
||||
|
@ -1,6 +1,6 @@
|
||||
package com.r3corda.client.model
|
||||
|
||||
import com.r3corda.client.WalletMonitorClient
|
||||
import com.r3corda.client.NodeMonitorClient
|
||||
import com.r3corda.core.contracts.ClientToServiceCommand
|
||||
import com.r3corda.core.messaging.MessagingService
|
||||
import com.r3corda.core.node.NodeInfo
|
||||
@ -11,9 +11,9 @@ import rx.Observer
|
||||
import rx.subjects.PublishSubject
|
||||
|
||||
/**
|
||||
* This model exposes raw event streams to and from the [WalletMonitorService] through a [WalletMonitorClient]
|
||||
* This model exposes raw event streams to and from the [NodeMonitorService] through a [NodeMonitorClient]
|
||||
*/
|
||||
class WalletMonitorModel {
|
||||
class NodeMonitorModel {
|
||||
private val clientToServiceSource = PublishSubject.create<ClientToServiceCommand>()
|
||||
val clientToService: Observer<ClientToServiceCommand> = clientToServiceSource
|
||||
|
||||
@ -26,13 +26,13 @@ class WalletMonitorModel {
|
||||
/**
|
||||
* Register for updates to/from a given wallet.
|
||||
* @param messagingService The messaging to use for communication.
|
||||
* @param walletMonitorNodeInfo the [Node] to connect to.
|
||||
* @param monitorNodeInfo the [Node] to connect to.
|
||||
* TODO provide an unsubscribe mechanism
|
||||
*/
|
||||
fun register(messagingService: MessagingService, walletMonitorNodeInfo: NodeInfo) {
|
||||
val monitorClient = WalletMonitorClient(
|
||||
fun register(messagingService: MessagingService, monitorNodeInfo: NodeInfo) {
|
||||
val monitorClient = NodeMonitorClient(
|
||||
messagingService,
|
||||
walletMonitorNodeInfo,
|
||||
monitorNodeInfo,
|
||||
clientToServiceSource,
|
||||
serviceToClientSource,
|
||||
snapshotSource
|
@ -1,11 +1,11 @@
|
||||
package com.r3corda.explorer
|
||||
|
||||
import com.r3corda.client.WalletMonitorClient
|
||||
import com.r3corda.client.NodeMonitorClient
|
||||
import com.r3corda.client.mock.EventGenerator
|
||||
import com.r3corda.client.mock.Generator
|
||||
import com.r3corda.client.mock.oneOf
|
||||
import com.r3corda.client.model.Models
|
||||
import com.r3corda.client.model.WalletMonitorModel
|
||||
import com.r3corda.client.model.NodeMonitorModel
|
||||
import com.r3corda.client.model.observer
|
||||
import com.r3corda.core.contracts.ClientToServiceCommand
|
||||
import com.r3corda.explorer.model.IdentityModel
|
||||
@ -22,7 +22,7 @@ import java.util.*
|
||||
|
||||
class Main : App() {
|
||||
override val primaryView = MainWindow::class
|
||||
val aliceOutStream: Observer<ClientToServiceCommand> by observer(WalletMonitorModel::clientToService)
|
||||
val aliceOutStream: Observer<ClientToServiceCommand> by observer(NodeMonitorModel::clientToService)
|
||||
|
||||
override fun start(stage: Stage) {
|
||||
|
||||
@ -51,13 +51,13 @@ class Main : App() {
|
||||
val aliceClient = startClient(aliceNode).get()
|
||||
|
||||
Models.get<IdentityModel>(Main::class).myIdentity.set(aliceNode.identity)
|
||||
Models.get<WalletMonitorModel>(Main::class).register(aliceClient, aliceNode)
|
||||
Models.get<NodeMonitorModel>(Main::class).register(aliceClient, aliceNode)
|
||||
|
||||
val bobInStream = PublishSubject.create<ServiceToClientEvent>()
|
||||
val bobOutStream = PublishSubject.create<ClientToServiceCommand>()
|
||||
|
||||
val bobClient = startClient(bobNode).get()
|
||||
val bobMonitorClient = WalletMonitorClient(bobClient, bobNode, bobOutStream, bobInStream, PublishSubject.create())
|
||||
val bobMonitorClient = NodeMonitorClient(bobClient, bobNode, bobOutStream, bobInStream, PublishSubject.create())
|
||||
assert(bobMonitorClient.register().get())
|
||||
|
||||
for (i in 0 .. 10000) {
|
||||
|
@ -29,7 +29,7 @@ import com.r3corda.node.services.events.NodeSchedulerService
|
||||
import com.r3corda.node.services.events.ScheduledActivityObserver
|
||||
import com.r3corda.node.services.identity.InMemoryIdentityService
|
||||
import com.r3corda.node.services.keys.PersistentKeyManagementService
|
||||
import com.r3corda.node.services.monitor.WalletMonitorService
|
||||
import com.r3corda.node.services.monitor.NodeMonitorService
|
||||
import com.r3corda.node.services.network.InMemoryNetworkMapCache
|
||||
import com.r3corda.node.services.network.NetworkMapService
|
||||
import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC
|
||||
@ -124,7 +124,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
lateinit var vault: VaultService
|
||||
lateinit var keyManagement: KeyManagementService
|
||||
var inNodeNetworkMapService: NetworkMapService? = null
|
||||
var inNodeWalletMonitorService: WalletMonitorService? = null
|
||||
var inNodeMonitorService: NodeMonitorService? = null
|
||||
var inNodeNotaryService: NotaryService? = null
|
||||
var uniquenessProvider: UniquenessProvider? = null
|
||||
lateinit var identity: IdentityService
|
||||
@ -205,7 +205,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
inNodeWalletMonitorService = makeWalletMonitorService() // Note this HAS to be after smm is set
|
||||
inNodeMonitorService = makeMonitorService() // Note this HAS to be after smm is set
|
||||
buildAdvertisedServices()
|
||||
|
||||
// TODO: this model might change but for now it provides some de-coupling
|
||||
@ -374,7 +374,7 @@ abstract class AbstractNode(val dir: Path, val configuration: NodeConfiguration,
|
||||
// TODO: sort out ordering of open & protected modifiers of functions in this class.
|
||||
protected open fun makeVaultService(): VaultService = NodeVaultService(services)
|
||||
|
||||
protected open fun makeWalletMonitorService(): WalletMonitorService = WalletMonitorService(services, smm)
|
||||
protected open fun makeMonitorService(): NodeMonitorService = NodeMonitorService(services, smm)
|
||||
|
||||
open fun stop() {
|
||||
// TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the
|
||||
|
@ -26,7 +26,7 @@ import java.util.*
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* Service which allows external clients to monitor the wallet service and state machine manager, as well as trigger
|
||||
* Service which allows external clients to monitor the node's vault and state machine manager, as well as trigger
|
||||
* actions within the node. The service also sends requests for user input back to clients, for example to enter
|
||||
* additional information while a protocol runs, or confirm an action.
|
||||
*
|
||||
@ -38,15 +38,15 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
// TODO: Clients need to be able to indicate whether they support interactivity (no point in sending requests for input
|
||||
// to a monitoring tool)
|
||||
@ThreadSafe
|
||||
class WalletMonitorService(services: ServiceHubInternal, val smm: StateMachineManager) : AbstractNodeService(services) {
|
||||
class NodeMonitorService(services: ServiceHubInternal, val smm: StateMachineManager) : AbstractNodeService(services) {
|
||||
companion object {
|
||||
val REGISTER_TOPIC = "platform.wallet_monitor.register"
|
||||
val DEREGISTER_TOPIC = "platform.wallet_monitor.deregister"
|
||||
val STATE_TOPIC = "platform.wallet_monitor.state_snapshot"
|
||||
val IN_EVENT_TOPIC = "platform.wallet_monitor.in"
|
||||
val OUT_EVENT_TOPIC = "platform.wallet_monitor.out"
|
||||
val REGISTER_TOPIC = "platform.monitor.register"
|
||||
val DEREGISTER_TOPIC = "platform.monitor.deregister"
|
||||
val STATE_TOPIC = "platform.monitor.state_snapshot"
|
||||
val IN_EVENT_TOPIC = "platform.monitor.in"
|
||||
val OUT_EVENT_TOPIC = "platform.monitor.out"
|
||||
|
||||
val logger = loggerFor<WalletMonitorService>()
|
||||
val logger = loggerFor<NodeMonitorService>()
|
||||
}
|
||||
|
||||
val listeners: MutableSet<RegisteredListener> = HashSet()
|
||||
@ -130,7 +130,7 @@ class WalletMonitorService(services: ServiceHubInternal, val smm: StateMachineMa
|
||||
|
||||
/**
|
||||
* Process a request from a monitor to add them to the subscribers. This includes hooks to authenticate the request,
|
||||
* but currently all requests pass (and there's no access control on wallets, so it has no actual meaning).
|
||||
* but currently all requests pass (and there's no access control on vaults, so it has no actual meaning).
|
||||
*/
|
||||
fun processRegisterRequest(req: RegisterRequest) {
|
||||
try {
|
@ -14,8 +14,8 @@ import com.r3corda.core.serialization.serialize
|
||||
import com.r3corda.core.utilities.DUMMY_NOTARY
|
||||
import com.r3corda.core.utilities.DUMMY_PUBKEY_1
|
||||
import com.r3corda.node.services.monitor.*
|
||||
import com.r3corda.node.services.monitor.WalletMonitorService.Companion.IN_EVENT_TOPIC
|
||||
import com.r3corda.node.services.monitor.WalletMonitorService.Companion.REGISTER_TOPIC
|
||||
import com.r3corda.node.services.monitor.NodeMonitorService.Companion.IN_EVENT_TOPIC
|
||||
import com.r3corda.node.services.monitor.NodeMonitorService.Companion.REGISTER_TOPIC
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.testing.expect
|
||||
import com.r3corda.testing.expectEvents
|
||||
@ -33,9 +33,9 @@ import kotlin.test.assertNotNull
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
/**
|
||||
* Unit tests for the wallet monitoring service.
|
||||
* Unit tests for the node monitoring service.
|
||||
*/
|
||||
class WalletMonitorServiceTests {
|
||||
class NodeMonitorServiceTests {
|
||||
lateinit var network: MockNetwork
|
||||
|
||||
@Before
|
||||
@ -78,7 +78,7 @@ class WalletMonitorServiceTests {
|
||||
val sessionID = authenticate(monitorServiceNode, registerNode)
|
||||
var receivePsm = receiveWalletUpdate(registerNode, sessionID)
|
||||
var expected = Vault.Update(emptySet(), emptySet())
|
||||
monitorServiceNode.inNodeWalletMonitorService!!.notifyVaultUpdate(expected)
|
||||
monitorServiceNode.inNodeMonitorService!!.notifyVaultUpdate(expected)
|
||||
network.runNetwork()
|
||||
var actual = receivePsm.get(1, TimeUnit.SECONDS)
|
||||
assertEquals(expected.consumed, actual.consumed)
|
||||
@ -90,7 +90,7 @@ class WalletMonitorServiceTests {
|
||||
val producedState = TransactionState(DummyContract.SingleOwnerState(newSecureRandom().nextInt(), DUMMY_PUBKEY_1), DUMMY_NOTARY)
|
||||
val produced = setOf(StateAndRef(producedState, StateRef(SecureHash.randomSHA256(), 0)))
|
||||
expected = Vault.Update(consumed, produced)
|
||||
monitorServiceNode.inNodeWalletMonitorService!!.notifyVaultUpdate(expected)
|
||||
monitorServiceNode.inNodeMonitorService!!.notifyVaultUpdate(expected)
|
||||
network.runNetwork()
|
||||
actual = receivePsm.get(1, TimeUnit.SECONDS)
|
||||
assertEquals(expected.produced, actual.produced)
|
||||
@ -115,7 +115,7 @@ class WalletMonitorServiceTests {
|
||||
// Tell the monitoring service node to issue some cash
|
||||
val recipient = monitorServiceNode.services.storageService.myLegalIdentity
|
||||
val outEvent = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), ref, recipient, DUMMY_NOTARY)
|
||||
val message = registerNode.net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
|
||||
val message = registerNode.net.createMessage(NodeMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
|
||||
ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, outEvent).serialize().bits)
|
||||
registerNode.net.send(message, monitorServiceNode.net.myAddress)
|
||||
network.runNetwork()
|
||||
@ -166,11 +166,11 @@ class WalletMonitorServiceTests {
|
||||
|
||||
// Tell the monitoring service node to issue some cash so we can spend it later
|
||||
val issueCommand = ClientToServiceCommand.IssueCash(Amount(quantity, GBP), OpaqueBytes.of(0), recipient, recipient)
|
||||
val issueMessage = registerNode.net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
|
||||
val issueMessage = registerNode.net.createMessage(NodeMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
|
||||
ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, issueCommand).serialize().bits)
|
||||
registerNode.net.send(issueMessage, monitorServiceNode.net.myAddress)
|
||||
val payCommand = ClientToServiceCommand.PayCash(Amount(quantity, Issued(recipient.ref(0), GBP)), recipient)
|
||||
val payMessage = registerNode.net.createMessage(WalletMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
|
||||
val payMessage = registerNode.net.createMessage(NodeMonitorService.OUT_EVENT_TOPIC, DEFAULT_SESSION_ID,
|
||||
ClientToServiceCommandMessage(sessionID, registerNode.net.myAddress, payCommand).serialize().bits)
|
||||
registerNode.net.send(payMessage, monitorServiceNode.net.myAddress)
|
||||
network.runNetwork()
|
Loading…
Reference in New Issue
Block a user