From 65ff2141302e76747129dfb889b50e319ed7cf41 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Tue, 3 Apr 2018 17:30:53 +0100 Subject: [PATCH 1/5] CORDA-1276: Cleaned up creation of node-info object on node start and when using --just-generate-node-info (#2909) --- .../net/corda/node/internal/AbstractNode.kt | 83 +++++++++++++------ .../services/network/NetworkMapUpdater.kt | 67 +++++---------- .../net/corda/node/internal/NodeTest.kt | 13 +-- .../services/network/NetworkMapUpdaterTest.kt | 40 +-------- 4 files changed, 84 insertions(+), 119 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index ef1bbceeb0..e244117dae 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -59,9 +59,11 @@ import net.corda.node.services.vault.NodeVaultService import net.corda.node.services.vault.VaultSoftLockManager import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.JVMAgentRegistry +import net.corda.node.utilities.NamedThreadFactory import net.corda.node.utilities.NodeBuildProperties import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.NodeInfoAndSigned +import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig @@ -87,6 +89,7 @@ import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit.SECONDS import kotlin.collections.set import kotlin.reflect.KClass @@ -143,7 +146,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected val runOnStop = ArrayList<() -> Any?>() private val _nodeReadyFuture = openFuture() protected var networkMapClient: NetworkMapClient? = null - protected lateinit var networkMapUpdater: NetworkMapUpdater + private lateinit var networkMapUpdater: NetworkMapUpdater lateinit var securityManager: RPCSecurityManager private val shutdownExecutor = Executors.newSingleThreadExecutor() @@ -182,15 +185,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)).use { it.transaction { - // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like a code smell. + // TODO The fact that we need to specify an empty list of notaries just to generate our node info looks + // like a design smell. val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList()) persistentNetworkMapCache.start() - val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair) - val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised -> - val privateKey = keyPairs.single { it.public == publicKey }.private - privateKey.sign(serialised.bytes) - } - NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned) + val (_, nodeInfo) = updateNodeInfo(persistentNetworkMapCache, null, identity, identityKeyPair) nodeInfo } } @@ -204,15 +203,18 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val schemaService = NodeSchemaService(cordappLoader.cordappSchemas, configuration.notary != null) val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null) val identityService = makeIdentityService(identity.certificate) + networkMapClient = configuration.compatibilityZoneURL?.let { NetworkMapClient(it, identityService.trustRoot) } + val networkParameters = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory).networkParameters check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) { "Node's platform version is lower than network's required minimumPlatformVersion" } + // Do all of this in a database transaction so anything that might need a connection has one. val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService).transaction { val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService) - val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair) + val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair) identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts) val metrics = MetricRegistry() val transactionStorage = makeTransactionStorage(database, configuration.transactionCacheSizeBytes) @@ -247,14 +249,16 @@ abstract class AbstractNode(val configuration: NodeConfiguration, flowLogicRefFactory = flowLogicRefFactory, drainingModePollPeriod = configuration.drainingModePollPeriod, nodeProperties = nodeProperties) - if (serverThread is ExecutorService) { + + (serverThread as? ExecutorService)?.let { runOnStop += { // We wait here, even though any in-flight messages should have been drained away because the // server thread can potentially have other non-messaging tasks scheduled onto it. The timeout value is // arbitrary and might be inappropriate. - MoreExecutors.shutdownAndAwaitTermination(serverThread as ExecutorService, 50, SECONDS) + MoreExecutors.shutdownAndAwaitTermination(it, 50, SECONDS) } } + makeVaultObservers(schedulerService, database.hibernateConfig, smm, schemaService, flowLogicRefFactory) val rpcOps = makeRPCOps(flowStarter, database, smm) startMessagingService(rpcOps) @@ -266,6 +270,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, startShell() Pair(StartedNodeImpl(this@AbstractNode, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService) } + networkMapUpdater = NetworkMapUpdater(services.networkMapCache, NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)), networkMapClient, @@ -273,15 +278,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration, configuration.baseDirectory) runOnStop += networkMapUpdater::close - log.info("Node-info for this node: ${services.myInfo}") - - val nodeInfoAndSigned = NodeInfoAndSigned(services.myInfo) { publicKey, serialised -> - services.keyManagementService.sign(serialised.bytes, publicKey).withoutKey() - } - networkMapUpdater.updateNodeInfo(nodeInfoAndSigned) networkMapUpdater.subscribeToNetworkMap() - // If we successfully loaded network data from database, we set this future to Unit. + // If we successfully loaded network data from database, we set this future to Unit. _nodeReadyFuture.captureLater(services.networkMapCache.nodeReady.map { Unit }) return startedImpl.apply { @@ -310,9 +309,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } } - private fun initNodeInfo(networkMapCache: NetworkMapCacheBaseInternal, - identity: PartyAndCertificate, - identityKeyPair: KeyPair): Pair, NodeInfo> { + private fun updateNodeInfo(networkMapCache: NetworkMapCacheBaseInternal, + networkMapClient: NetworkMapClient?, + identity: PartyAndCertificate, + identityKeyPair: KeyPair): Pair, NodeInfo> { val keyPairs = mutableSetOf(identityKeyPair) myNotaryIdentity = configuration.notary?.let { @@ -326,7 +326,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } } - val nodeInfoWithBlankSerial = NodeInfo( + val potentialNodeInfo = NodeInfo( myAddresses(), setOf(identity, myNotaryIdentity).filterNotNull(), versionInfo.platformVersion, @@ -335,16 +335,51 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val nodeInfoFromDb = networkMapCache.getNodeByLegalName(identity.name) - val nodeInfo = if (nodeInfoWithBlankSerial == nodeInfoFromDb?.copy(serial = 0)) { + val nodeInfo = if (potentialNodeInfo == nodeInfoFromDb?.copy(serial = 0)) { // The node info hasn't changed. We use the one from the database to preserve the serial. + log.debug("Node-info hasn't changed") nodeInfoFromDb } else { - nodeInfoWithBlankSerial.copy(serial = platformClock.millis()) + log.info("Node-info has changed so submitting update. Old node-info was $nodeInfoFromDb") + val newNodeInfo = potentialNodeInfo.copy(serial = platformClock.millis()) + networkMapCache.addNode(newNodeInfo) + log.info("New node-info: $newNodeInfo") + newNodeInfo } + + val nodeInfoAndSigned = NodeInfoAndSigned(nodeInfo) { publicKey, serialised -> + val privateKey = keyPairs.single { it.public == publicKey }.private + privateKey.sign(serialised.bytes) + } + + // Write the node-info file even if nothing's changed, just in case the file has been deleted. + NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned) + + if (networkMapClient != null) { + tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient) + } + return Pair(keyPairs, nodeInfo) } + private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) { + val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory())) + + executor.submit(object : Runnable { + override fun run() { + try { + networkMapClient.publish(signedNodeInfo) + } catch (t: Throwable) { + log.warn("Error encountered while publishing node info, will retry again", t) + // TODO: Exponential backoff? + executor.schedule(this, 1, TimeUnit.MINUTES) + } + } + }) + } + protected abstract fun myAddresses(): List + protected open fun makeStateMachineManager(database: CordaPersistence): StateMachineManager { return StateMachineManagerImpl( services, diff --git a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt index a8b3e7c448..806ea71291 100644 --- a/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt +++ b/node/src/main/kotlin/net/corda/node/services/network/NetworkMapUpdater.kt @@ -15,12 +15,7 @@ import net.corda.core.utilities.minutes import net.corda.node.services.api.NetworkMapCacheInternal import net.corda.node.utilities.NamedThreadFactory import net.corda.nodeapi.exceptions.OutdatedNetworkParameterHashException -import net.corda.nodeapi.internal.NodeInfoAndSigned -import net.corda.nodeapi.internal.SignedNodeInfo -import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME -import net.corda.nodeapi.internal.network.ParametersUpdate -import net.corda.nodeapi.internal.network.SignedNetworkParameters -import net.corda.nodeapi.internal.network.verifiedNetworkMapCert +import net.corda.nodeapi.internal.network.* import rx.Subscription import rx.subjects.PublishSubject import java.nio.file.Path @@ -28,6 +23,7 @@ import java.nio.file.StandardCopyOption import java.time.Duration import java.util.concurrent.Executors import java.util.concurrent.TimeUnit +import kotlin.system.exitProcess class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, private val fileWatcher: NodeInfoWatcher, @@ -57,36 +53,6 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, return DataFeed(currentUpdateInfo, parametersUpdatesTrack) } - fun updateNodeInfo(nodeInfoAndSigned: NodeInfoAndSigned) { - // TODO We've already done this lookup and check in AbstractNode.initNodeInfo - val oldNodeInfo = networkMapCache.getNodeByLegalIdentity(nodeInfoAndSigned.nodeInfo.legalIdentities[0]) - // Compare node info without timestamp. - if (nodeInfoAndSigned.nodeInfo.copy(serial = 0L) == oldNodeInfo?.copy(serial = 0L)) return - - logger.info("Node-info has changed so submitting update. Old node-info was $oldNodeInfo") - // Only publish and write to disk if there are changes to the node info. - networkMapCache.addNode(nodeInfoAndSigned.nodeInfo) - fileWatcher.saveToFile(nodeInfoAndSigned) - - if (networkMapClient != null) { - tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient) - } - } - - private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) { - executor.submit(object : Runnable { - override fun run() { - try { - networkMapClient.publish(signedNodeInfo) - } catch (t: Throwable) { - logger.warn("Error encountered while publishing node info, will retry in $defaultRetryInterval", t) - // TODO: Exponential backoff? - executor.schedule(this, defaultRetryInterval.toMillis(), TimeUnit.MILLISECONDS) - } - } - }) - } - fun subscribeToNetworkMap() { require(fileWatcherSubscription == null) { "Should not call this method twice." } // Subscribe to file based networkMap @@ -114,17 +80,7 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, networkMap.parametersUpdate?.let { handleUpdateNetworkParameters(networkMapClient, it) } if (currentParametersHash != networkMap.networkParameterHash) { - val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME - val acceptedHash = if (updatesFile.exists()) updatesFile.readObject().raw.hash else null - if (acceptedHash == networkMap.networkParameterHash) { - logger.info("Flag day occurred. Network map switched to the new network parameters: ${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.") - } else { - // TODO This needs special handling (node omitted update process or didn't accept new parameters) - logger.error("Node is using parameters with hash: $currentParametersHash but network map is " + - "advertising: ${networkMap.networkParameterHash}.\n" + - "Node will shutdown now. Please update node to use correct network parameters file.") - } - System.exit(1) + exitOnParametersMismatch(networkMap) } val currentNodeHashes = networkMapCache.allNodeHashes @@ -151,6 +107,23 @@ class NetworkMapUpdater(private val networkMapCache: NetworkMapCacheInternal, return cacheTimeout } + private fun exitOnParametersMismatch(networkMap: NetworkMap) { + val updatesFile = baseDirectory / NETWORK_PARAMS_UPDATE_FILE_NAME + val acceptedHash = if (updatesFile.exists()) updatesFile.readObject().raw.hash else null + val exitCode = if (acceptedHash == networkMap.networkParameterHash) { + logger.info("Flag day occurred. Network map switched to the new network parameters: " + + "${networkMap.networkParameterHash}. Node will shutdown now and needs to be started again.") + 0 + } else { + // TODO This needs special handling (node omitted update process or didn't accept new parameters) + logger.error("Node is using parameters with hash: $currentParametersHash but network map is " + + "advertising: ${networkMap.networkParameterHash}.\n" + + "Node will shutdown now. Please update node to use correct network parameters file.") + 1 + } + exitProcess(exitCode) + } + private fun handleUpdateNetworkParameters(networkMapClient: NetworkMapClient, update: ParametersUpdate) { if (update.newParametersHash == newNetworkParameters?.first?.newParametersHash) { // This update was handled already. diff --git a/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt index e47ab67a0d..4c734bf17b 100644 --- a/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt +++ b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt @@ -9,7 +9,6 @@ import net.corda.core.serialization.deserialize import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.VersionInfo import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX import net.corda.nodeapi.internal.persistence.DatabaseConfig @@ -21,7 +20,6 @@ import org.junit.Test import org.junit.rules.TemporaryFolder import java.nio.file.Files import kotlin.test.assertEquals -import kotlin.test.assertNotEquals import kotlin.test.assertNull class NodeTest { @@ -66,16 +64,7 @@ class NodeTest { val node = Node(configuration, rigorousMock().also { doReturn(platformVersion).whenever(it).platformVersion }, initialiseSerialization = false) - val nodeInfo = node.generateNodeInfo() - assertEquals(listOf(nodeAddress), nodeInfo.addresses) - assertEquals(listOf(nodeName), nodeInfo.legalIdentitiesAndCerts.map { it.name }) - assertEquals(platformVersion, nodeInfo.platformVersion) - node.generateNodeInfo().let { - assertNotEquals(nodeInfo, it) // Different serial. - assertEquals(nodeInfo, it.copy(serial = nodeInfo.serial)) - } - PersistentNetworkMapCache(database, emptyList()).addNode(nodeInfo) - assertEquals(nodeInfo, node.generateNodeInfo()) + assertEquals(node.generateNodeInfo(), node.generateNodeInfo()) // Node info doesn't change (including the serial) } } } diff --git a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt index 9fc5698821..64c997237a 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/network/NetworkMapUpdaterTest.kt @@ -24,9 +24,11 @@ import net.corda.nodeapi.internal.createDevNetworkMapCa import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair import net.corda.nodeapi.internal.network.* import net.corda.testing.common.internal.testNetworkParameters -import net.corda.testing.core.* +import net.corda.testing.core.SerializationEnvironmentRule +import net.corda.testing.core.expect +import net.corda.testing.core.expectEvents +import net.corda.testing.core.sequence import net.corda.testing.internal.DEV_ROOT_CA -import net.corda.testing.internal.TestNodeInfoBuilder import net.corda.testing.internal.createNodeInfoAndSigned import org.assertj.core.api.Assertions.assertThat import org.junit.After @@ -56,7 +58,6 @@ class NetworkMapUpdaterTest { private val networkParametersHash = SecureHash.randomSHA256() private val fileWatcher = NodeInfoWatcher(baseDir, scheduler) private val updater = NetworkMapUpdater(networkMapCache, fileWatcher, networkMapClient, networkParametersHash, baseDir) - private val nodeInfoBuilder = TestNodeInfoBuilder() private var parametersUpdate: ParametersUpdate? = null @After @@ -65,39 +66,6 @@ class NetworkMapUpdaterTest { fs.close() } - @Test - fun `publish node info`() { - nodeInfoBuilder.addIdentity(ALICE_NAME) - - val nodeInfo1AndSigned = nodeInfoBuilder.buildWithSigned() - val sameNodeInfoDifferentTimeAndSigned = nodeInfoBuilder.buildWithSigned(serial = System.currentTimeMillis()) - - // Publish node info for the first time. - updater.updateNodeInfo(nodeInfo1AndSigned) - // Sleep as publish is asynchronous. - // TODO: Remove sleep in unit test - Thread.sleep(2L * cacheExpiryMs) - verify(networkMapClient, times(1)).publish(any()) - - networkMapCache.addNode(nodeInfo1AndSigned.nodeInfo) - - // Publish the same node info, but with different serial. - updater.updateNodeInfo(sameNodeInfoDifferentTimeAndSigned) - // TODO: Remove sleep in unit test. - Thread.sleep(2L * cacheExpiryMs) - - // Same node info should not publish twice - verify(networkMapClient, times(0)).publish(sameNodeInfoDifferentTimeAndSigned.signed) - - val differentNodeInfoAndSigned = createNodeInfoAndSigned("Bob") - - // Publish different node info. - updater.updateNodeInfo(differentNodeInfoAndSigned) - // TODO: Remove sleep in unit test. - Thread.sleep(200) - verify(networkMapClient, times(1)).publish(differentNodeInfoAndSigned.signed) - } - @Test fun `process add node updates from network map, with additional node infos from dir`() { val (nodeInfo1, signedNodeInfo1) = createNodeInfoAndSigned("Info 1") From 1f5559e3c44883e01e37a593adb05d0bbff47ac3 Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Tue, 3 Apr 2018 17:33:42 +0100 Subject: [PATCH 2/5] Speed-up `NodeRegistrationTest` (#2873) * Improve logging for NetworkMap requests * Allow interrupt in polling if the process started successfully * Put `advertiseNewParameters` back * Additional log line to indicate when all the nodes are started * Improve logging and use concurrent map since it is updated from multiple threads * Change NetworkMap response validity duration and rename parameter accordingly * Changes following code review from @shamsasari --- .../registration/NodeRegistrationTest.kt | 40 ++++++++++++------- .../testing/node/internal/DriverDSLImpl.kt | 16 +++++--- .../node/internal/network/NetworkMapServer.kt | 6 +-- 3 files changed, 39 insertions(+), 23 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt b/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt index c8ab1e3952..21592b7870 100644 --- a/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt @@ -2,11 +2,9 @@ package net.corda.node.utilities.registration import net.corda.core.identity.CordaX500Name import net.corda.core.internal.concurrent.transpose +import net.corda.core.internal.logElapsedTime import net.corda.core.messaging.startFlow -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.OpaqueBytes -import net.corda.core.utilities.getOrThrow -import net.corda.core.utilities.minutes +import net.corda.core.utilities.* import net.corda.finance.DOLLARS import net.corda.finance.flows.CashIssueAndPaymentFlow import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair @@ -49,6 +47,7 @@ class NodeRegistrationTest { private val notaryName = CordaX500Name("NotaryService", "Zurich", "CH") private val aliceName = CordaX500Name("Alice", "London", "GB") private val genevieveName = CordaX500Name("Genevieve", "London", "GB") + private val log = contextLogger() } @Rule @@ -63,7 +62,7 @@ class NodeRegistrationTest { @Before fun startServer() { server = NetworkMapServer( - cacheTimeout = 1.minutes, + pollInterval = 1.seconds, hostAndPort = portAllocation.nextHostAndPort(), myHostNameValue = "localhost", additionalServices = registrationHandler) @@ -93,6 +92,9 @@ class NodeRegistrationTest { startNode(providedName = genevieveName), defaultNotaryNode ).transpose().getOrThrow() + + log.info("Nodes started") + val (alice, genevieve) = nodes assertThat(registrationHandler.idsPolled).containsOnly( @@ -119,25 +121,33 @@ class RegistrationHandler(private val rootCertAndKeyPair: CertificateAndKeyPair) private val certPaths = HashMap() val idsPolled = HashSet() + companion object { + val log = loggerFor() + } + @POST @Consumes(MediaType.APPLICATION_OCTET_STREAM) @Produces(MediaType.TEXT_PLAIN) fun registration(input: InputStream): Response { - val certificationRequest = input.use { JcaPKCS10CertificationRequest(it.readBytes()) } - val (certPath, name) = createSignedClientCertificate( - certificationRequest, - rootCertAndKeyPair.keyPair, - listOf(rootCertAndKeyPair.certificate)) - require(!name.organisation.contains("\\s".toRegex())) { "Whitespace in the organisation name not supported" } - certPaths[name.organisation] = certPath - return Response.ok(name.organisation).build() + return log.logElapsedTime("Registration") { + val certificationRequest = input.use { JcaPKCS10CertificationRequest(it.readBytes()) } + val (certPath, name) = createSignedClientCertificate( + certificationRequest, + rootCertAndKeyPair.keyPair, + listOf(rootCertAndKeyPair.certificate)) + require(!name.organisation.contains("\\s".toRegex())) { "Whitespace in the organisation name not supported" } + certPaths[name.organisation] = certPath + Response.ok(name.organisation).build() + } } @GET @Path("{id}") fun reply(@PathParam("id") id: String): Response { - idsPolled += id - return buildResponse(certPaths[id]!!.certificates) + return log.logElapsedTime("Reply by Id") { + idsPolled += id + buildResponse(certPaths[id]!!.certificates) + } } private fun buildResponse(certificates: List): Response { diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt index 60b925d793..620294a904 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/DriverDSLImpl.kt @@ -73,6 +73,7 @@ import java.time.Instant import java.time.ZoneOffset.UTC import java.time.format.DateTimeFormatter import java.util.* +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit @@ -101,7 +102,7 @@ class DriverDSLImpl( override val shutdownManager get() = _shutdownManager!! private val cordappPackages = extraCordappPackagesToScan + getCallerPackage() // Map from a nodes legal name to an observable emitting the number of nodes in its network map. - private val countObservables = mutableMapOf>() + private val countObservables = ConcurrentHashMap>() private val nodeNames = mutableSetOf() /** * Future which completes when the network map is available, whether a local one or one from the CZ. This future acts @@ -575,15 +576,17 @@ class DriverDSLImpl( } /** + * @nodeName the name of the node which performs counting * @param initial number of nodes currently in the network map of a running node. * @param networkMapCacheChangeObservable an observable returning the updates to the node network map. * @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes * the initial value emitted is always [initial] */ - private fun nodeCountObservable(initial: Int, networkMapCacheChangeObservable: Observable): + private fun nodeCountObservable(nodeName: CordaX500Name, initial: Int, networkMapCacheChangeObservable: Observable): ConnectableObservable { val count = AtomicInteger(initial) return networkMapCacheChangeObservable.map { + log.debug("nodeCountObservable for '$nodeName' received '$it'") when (it) { is NetworkMapCache.MapChange.Added -> count.incrementAndGet() is NetworkMapCache.MapChange.Removed -> count.decrementAndGet() @@ -599,8 +602,9 @@ class DriverDSLImpl( */ private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture { val (snapshot, updates) = rpc.networkMapFeed() - val counterObservable = nodeCountObservable(snapshot.size, updates) - countObservables[rpc.nodeInfo().legalIdentities[0].name] = counterObservable + val nodeName = rpc.nodeInfo().legalIdentities[0].name + val counterObservable = nodeCountObservable(nodeName, snapshot.size, updates) + countObservables[nodeName] = counterObservable /* TODO: this might not always be the exact number of nodes one has to wait for, * for example in the following sequence * 1 start 3 nodes in order, A, B, C. @@ -611,6 +615,7 @@ class DriverDSLImpl( // This is an observable which yield the minimum number of nodes in each node network map. val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array -> + log.debug("smallestSeenNetworkMapSize for '$nodeName' is: ${args.toList()}") args.map { it as Int }.min() ?: 0 } val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture() @@ -701,7 +706,8 @@ class DriverDSLImpl( if (it == processDeathFuture) { throw ListenProcessDeathException(config.corda.p2pAddress, process) } - processDeathFuture.cancel(false) + // Will interrupt polling for process death as this is no longer relevant since the process been successfully started and reflected itself in the NetworkMap. + processDeathFuture.cancel(true) log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress") OutOfProcessImpl(rpc.nodeInfo(), rpc, config.corda, webAddress, useHTTPS, debugPort, process, onNodeExit) } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt index 5884cebb3f..ba8522d9c3 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt @@ -32,7 +32,7 @@ import javax.ws.rs.core.Response import javax.ws.rs.core.Response.ok import javax.ws.rs.core.Response.status -class NetworkMapServer(private val cacheTimeout: Duration, +class NetworkMapServer(private val pollInterval: Duration, hostAndPort: NetworkHostAndPort, private val networkMapCertAndKeyPair: CertificateAndKeyPair = createDevNetworkMapCa(), private val myHostNameValue: String = "test.host.name", @@ -137,7 +137,7 @@ class NetworkMapServer(private val cacheTimeout: Duration, fun getNetworkMap(): Response { val networkMap = NetworkMap(nodeInfoMap.keys.toList(), signedNetParams.raw.hash, parametersUpdate) val signedNetworkMap = networkMapCertAndKeyPair.sign(networkMap) - return Response.ok(signedNetworkMap.serialize().bytes).header("Cache-Control", "max-age=${cacheTimeout.seconds}").build() + return Response.ok(signedNetworkMap.serialize().bytes).header("Cache-Control", "max-age=${pollInterval.seconds}").build() } // Remove nodeInfo for testing. @@ -177,4 +177,4 @@ class NetworkMapServer(private val cacheTimeout: Duration, @Path("my-hostname") fun getHostName(): Response = Response.ok(myHostNameValue).build() } -} +} \ No newline at end of file From 7deb8025476d8bb5ad3d387d6c845c904df2f1e8 Mon Sep 17 00:00:00 2001 From: Joel Dudley Date: Tue, 3 Apr 2018 18:03:07 +0100 Subject: [PATCH 3/5] Adds link from changelog to upgrade notes. --- docs/source/changelog.rst | 6 +++--- samples/irs-demo/README.md | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index b567354c5a..109e50a378 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -1,12 +1,12 @@ Changelog ========= +Here's a summary of what's changed in each Corda release. For guidance on how to upgrade code from the previous +release, see :doc:`upgrade-notes`. + Unreleased ---------- -Here are brief summaries of what's changed between each snapshot release. This includes guidance on how to upgrade code -from the previous milestone release. - * Errors thrown by a Corda node will now reported to a calling RPC client with attention to serialization and obfuscation of internal data. * Serializing an inner class (non-static nested class in Java, inner class in Kotlin) will be rejected explicitly by the serialization diff --git a/samples/irs-demo/README.md b/samples/irs-demo/README.md index a3e5060a98..9a11c0236b 100644 --- a/samples/irs-demo/README.md +++ b/samples/irs-demo/README.md @@ -33,8 +33,8 @@ the time controls at the top left of the home page to run the fixings. Click any view it. *Note:* The IRS web UI currently has a bug when changing the clock time where it may show no numbers or apply fixings -inconsistently. The issues will be addressed in a future milestone release. Meanwhile, you can take a look at a simpler -oracle example here: https://github.com/corda/oracle-example. +inconsistently. The issues will be addressed in a future release. Meanwhile, you can take a look at a simpler oracle +example here: https://github.com/corda/oracle-example. ## Running the system test From 7ec65901b62b87a14f0424a8b55e412003861e02 Mon Sep 17 00:00:00 2001 From: szymonsztuka Date: Tue, 3 Apr 2018 19:13:41 +0100 Subject: [PATCH 4/5] Corda-1286 Fix embedded shell when rpc users from remoted database (#2910) --- .../kotlin/net/corda/node/internal/Node.kt | 7 ++-- .../security/RPCSecurityManagerImpl.kt | 4 +-- .../RPCSecurityManagerWithAdditionalUser.kt | 32 +++++++++++++++++++ .../node/services/config/shell/ShellConfig.kt | 3 +- 4 files changed, 40 insertions(+), 6 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerWithAdditionalUser.kt diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 3ce19af998..66d4308215 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -23,6 +23,7 @@ import net.corda.node.VersionInfo import net.corda.node.internal.artemis.ArtemisBroker import net.corda.node.internal.artemis.BrokerAddresses import net.corda.node.internal.cordapp.CordappLoader +import net.corda.node.internal.security.RPCSecurityManagerWithAdditionalUser import net.corda.node.internal.security.RPCSecurityManagerImpl import net.corda.node.serialization.KryoServerSerializationScheme import net.corda.node.services.api.NodePropertiesStore @@ -30,7 +31,7 @@ import net.corda.node.services.api.SchemaService import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.SecurityConfiguration import net.corda.node.services.config.VerifierType -import net.corda.node.services.config.shell.shellUser +import net.corda.node.services.config.shell.localShellUser import net.corda.node.services.config.shouldInitCrashShell import net.corda.node.services.messaging.* import net.corda.node.services.rpc.ArtemisRpcBroker @@ -162,7 +163,9 @@ open class Node(configuration: NodeConfiguration, val securityManagerConfig = configuration.security?.authService ?: SecurityConfiguration.AuthService.fromUsers(configuration.rpcUsers) - securityManager = RPCSecurityManagerImpl(if (configuration.shouldInitCrashShell()) securityManagerConfig.copyWithAdditionalUser(configuration.shellUser()) else securityManagerConfig) + securityManager = with(RPCSecurityManagerImpl(securityManagerConfig)) { + if (configuration.shouldInitCrashShell()) RPCSecurityManagerWithAdditionalUser(this, localShellUser()) else this + } val serverAddress = configuration.messagingServerAddress ?: makeLocalMessageBroker(networkParameters) val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) { diff --git a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt index eec5d8ff33..cf961636f9 100644 --- a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerImpl.kt @@ -186,7 +186,7 @@ private object RPCPermissionResolver : PermissionResolver { } } -private class ShiroAuthorizingSubject( +class ShiroAuthorizingSubject( private val subjectId: PrincipalCollection, private val manager: DefaultSecurityManager) : AuthorizingSubject { @@ -201,7 +201,7 @@ private fun buildCredentialMatcher(type: PasswordEncryption) = when (type) { PasswordEncryption.SHIRO_1_CRYPT -> PasswordMatcher() } -private class InMemoryRealm(users: List, +class InMemoryRealm(users: List, realmId: String, passwordEncryption: PasswordEncryption = PasswordEncryption.NONE) : AuthorizingRealm() { diff --git a/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerWithAdditionalUser.kt b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerWithAdditionalUser.kt new file mode 100644 index 0000000000..dfadbe6aeb --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/security/RPCSecurityManagerWithAdditionalUser.kt @@ -0,0 +1,32 @@ +package net.corda.node.internal.security + +import net.corda.nodeapi.internal.config.User +import org.apache.shiro.mgt.DefaultSecurityManager +import org.apache.shiro.subject.SimplePrincipalCollection +import javax.security.auth.login.FailedLoginException + +/** + * Wrapper for [RPCSecurityManager] which creates in-memory [AuthorizingSubject] for [User]. + * Can be used to add on a specific [User] on top of the principals provided by the [RPCSecurityManager] realm. + */ +class RPCSecurityManagerWithAdditionalUser(private val delegate: RPCSecurityManager, private val user: User) : RPCSecurityManager by delegate { + + private val realmId = user.username + "Realm" + private val shellAuthorizingSubject = ShiroAuthorizingSubject(subjectId = SimplePrincipalCollection(user.username, id.value), + manager = DefaultSecurityManager(InMemoryRealm(listOf(user), realmId))) + + @Throws(FailedLoginException::class) + override fun authenticate(principal: String, password: Password): AuthorizingSubject = + if (user.username == principal && user.password == password.valueAsString) { + shellAuthorizingSubject + } else { + delegate.authenticate(principal, password) + } + + override fun buildSubject(principal: String): AuthorizingSubject = + if (user.username == principal) { + shellAuthorizingSubject + } else { + delegate.buildSubject(principal) + } +} diff --git a/node/src/main/kotlin/net/corda/node/services/config/shell/ShellConfig.kt b/node/src/main/kotlin/net/corda/node/services/config/shell/ShellConfig.kt index 00fa4e0f81..b4e421a884 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/shell/ShellConfig.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/shell/ShellConfig.kt @@ -40,5 +40,4 @@ fun NodeConfiguration.toShellConfig(): ShellConfiguration { noLocalShell = this.noLocalShell) } -private fun localShellUser() = User("shell", "shell", setOf(Permissions.all())) -fun NodeConfiguration.shellUser() = shouldInitCrashShell()?.let { localShellUser() } +fun localShellUser() = User("shell", "shell", setOf(Permissions.all())) From 0af42bda445715b896b73f67697ab6ff578e3e43 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Wed, 4 Apr 2018 11:23:09 +0100 Subject: [PATCH 5/5] Fix a bug in the AMQP protocol wrapper code which gives incorrect reply address information on received packets. (#2914) --- .../internal/protonwrapper/engine/ConnectionStateMachine.kt | 4 ++-- .../internal/protonwrapper/netty/AMQPChannelHandler.kt | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt index 473c28876c..42575770e0 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -359,9 +359,9 @@ internal class ConnectionStateMachine(serverMode: Boolean, payload, link.source.address, remoteLegalName, - NetworkHostAndPort(localAddress.hostString, localAddress.port), - localLegalName, NetworkHostAndPort(remoteAddress.hostString, remoteAddress.port), + localLegalName, + NetworkHostAndPort(localAddress.hostString, localAddress.port), appProperties, channel, delivery) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt index 25855df558..06237626bb 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt @@ -122,10 +122,10 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, is SendableMessageImpl -> { val inetAddress = InetSocketAddress(msg.destinationLink.host, msg.destinationLink.port) require(inetAddress == remoteAddress) { - "Message for incorrect endpoint" + "Message for incorrect endpoint $inetAddress expected $remoteAddress" } require(CordaX500Name.parse(msg.destinationLegalName) == CordaX500Name.build(remoteCert!!.subjectX500Principal)) { - "Message for incorrect legal identity" + "Message for incorrect legal identity ${msg.destinationLegalName} expected ${remoteCert!!.subjectX500Principal}" } log.debug { "channel write ${msg.applicationProperties["_AMQ_DUPL_ID"]}" } eventProcessor!!.transportWriteMessage(msg)