diff --git a/docs/source/messaging.rst b/docs/source/messaging.rst index 242e77b674..ebba7c3013 100644 --- a/docs/source/messaging.rst +++ b/docs/source/messaging.rst @@ -24,21 +24,17 @@ Network Map Service Supporting the messaging layer is a network map service, which is responsible for tracking public nodes on the network. -Nodes have an internal component, the network map cache, which contains a copy of the network map (which is just a -document). When a node starts up its cache fetches a copy of the full network map, and requests to be notified of -changes. The node then registers itself with the network map service, and the service notifies subscribers that a new -node has joined the network. Nodes do not automatically deregister themselves, so (for example) nodes going offline -briefly for maintenance are retained in the network map, and messages for them will be queued, minimising disruption. +Nodes have an internal component, the network map cache, which contains a copy of the network map (which is backed up in the database +to persist that information across restarts in case the network map server is down). When a node starts up, its cache +fetches a copy of the full network map (from the server, or from the filesystem in development mode). After that it polls at +regular intervals for the network map and applies any related changes locally. +Nodes do not automatically deregister themselves, so (for example) nodes going offline briefly for maintenance are retained +in the network map, and messages for them will be queued, minimising disruption. -Nodes submit signed changes to the map service, which then forwards update notifications on to nodes which have -requested to be notified of changes. 
- -The network map currently supports: - -* Looking up nodes by service -* Looking up node for a party -* Suggesting a node providing a specific service, based on suitability for a contract and parties, for example suggesting - an appropriate interest rates oracle for an interest rate swap contract. Currently no recommendation logic is in place. +Additionally, on every restart and on a daily basis, nodes submit signed `NodeInfo`s to the map service. When the network map gets +signed, these changes are distributed as new network data. `NodeInfo` republishing is treated as a heartbeat from the node; +based on that, the network map service is able to figure out which nodes can be considered stale and removed from the network +map document after the `eventHorizon` time. Message queues -------------- diff --git a/docs/source/network-map.rst b/docs/source/network-map.rst index 409b91f84a..e5f227960f 100644 --- a/docs/source/network-map.rst +++ b/docs/source/network-map.rst @@ -191,3 +191,19 @@ Then node can be started as usual. At some point in time, nodes will gradually j information on business relations with operators. Private networks are not separate networks, nodes are still part of bigger compatibility zone, only hidden. We reuse all the infrastructure of the compatibility zone like notaries, permissioning service, so the interoperability between nodes is kept. + +Cleaning the network map cache +------------------------------ + +Occasionally a node may end up with an inconsistent view of the network. This can occur due to changes in deployment +leading to stale data in the database, differing data distribution times, or mistakes in configuration. For these unlikely +events both an RPC method and a command line option for clearing the local network map cache database exist. To use them +you need to either run from the command line: + +.. 
code-block:: shell + + java -jar corda.jar --clear-network-map-cache + +or call the RPC method `clearNetworkMapCache` (it can be invoked through the node's shell as `run clearNetworkMapCache`; for more information on +how to log into the node's shell see :doc:`shell`). As we are testing and hardening the implementation, this step shouldn't be required. +After cleaning the cache, network map data is restored on the next poll from the server or filesystem. diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 7bf7e9e005..eb7b1ad4e9 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -364,6 +364,10 @@ class SingleThreadedStateMachineManager( null } externalEventMutex.withLock { + // Remove any sessions the old flow has. + for (sessionId in getFlowSessionIds(currentState.checkpoint)) { + sessionToFlow.remove(sessionId) + } if (flow != null) addAndStartFlow(flowId, flow) // Deliver all the external events from the old flow instance. 
val unprocessedExternalEvents = mutableListOf() diff --git a/tools/shell/src/main/java/net/corda/tools/shell/FlowShellCommand.java b/tools/shell/src/main/java/net/corda/tools/shell/FlowShellCommand.java index f56c80cd54..6e54d85f41 100644 --- a/tools/shell/src/main/java/net/corda/tools/shell/FlowShellCommand.java +++ b/tools/shell/src/main/java/net/corda/tools/shell/FlowShellCommand.java @@ -46,7 +46,7 @@ public class FlowShellCommand extends InteractiveShellCommand { @Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name, @Usage("The data to pass as input") @Argument(unquote = false) List input ) { - logger.info("Executing command \"flow start {} {}\",", name, input.stream().collect(joining(" "))); + logger.info("Executing command \"flow start {} {}\",", name, (input != null) ? input.stream().collect(joining(" ")) : ""); startFlow(name, input, out, ops(), ansiProgressRenderer(), objectMapper()); } diff --git a/tools/shell/src/main/java/net/corda/tools/shell/RunShellCommand.java b/tools/shell/src/main/java/net/corda/tools/shell/RunShellCommand.java index 2ea6b67ac3..d25a2cf028 100644 --- a/tools/shell/src/main/java/net/corda/tools/shell/RunShellCommand.java +++ b/tools/shell/src/main/java/net/corda/tools/shell/RunShellCommand.java @@ -47,7 +47,7 @@ public class RunShellCommand extends InteractiveShellCommand { ) @Usage("runs a method from the CordaRPCOps interface on the node.") public Object main(InvocationContext context, @Usage("The command to run") @Argument(unquote = false) List command) { - logger.info("Executing command \"run {}\",", command.stream().collect(joining(" "))); + logger.info("Executing command \"run {}\",", (command != null) ? 
command.stream().collect(joining(" ")) : ""); StringToMethodCallParser parser = new StringToMethodCallParser<>(CordaRPCOps.class, objectMapper()); if (command == null) { diff --git a/tools/shell/src/main/java/net/corda/tools/shell/StartShellCommand.java b/tools/shell/src/main/java/net/corda/tools/shell/StartShellCommand.java index 148e26bf84..b0aad905ca 100644 --- a/tools/shell/src/main/java/net/corda/tools/shell/StartShellCommand.java +++ b/tools/shell/src/main/java/net/corda/tools/shell/StartShellCommand.java @@ -31,7 +31,7 @@ public class StartShellCommand extends InteractiveShellCommand { public void main(@Usage("The class name of the flow to run, or an unambiguous substring") @Argument String name, @Usage("The data to pass as input") @Argument(unquote = false) List input) { - logger.info("Executing command \"start {} {}\",", name, input.stream().collect(joining(" "))); + logger.info("Executing command \"start {} {}\",", name, (input != null) ? input.stream().collect(joining(" ")) : ""); ANSIProgressRenderer ansiProgressRenderer = ansiProgressRenderer(); FlowShellCommand.startFlow(name, input, out, ops(), ansiProgressRenderer != null ? ansiProgressRenderer : new CRaSHANSIProgressRenderer(out), objectMapper()); } diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt index ce4dcc711a..0c807c5ea5 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt @@ -291,6 +291,7 @@ object InteractiveShell { while (!Thread.currentThread().isInterrupted) { try { latch.await() + break } catch (e: InterruptedException) { try { rpcOps.killFlow(stateObservable.id)