From 9d822cdbe8954e34b50e12435cc17cb3f9a564d1 Mon Sep 17 00:00:00 2001 From: josecoll Date: Mon, 14 May 2018 15:34:21 +0100 Subject: [PATCH 01/11] CORDA-861 Pagination failure with aggregate groupBy query (#3135) * Pagination relies on a recursive call to count total results, this sub-query should NOT perform pagination checks. * Fix using defaulted parameter. * Make internal method private. --- .../node/services/vault/NodeVaultService.kt | 13 ++++++--- .../node/services/vault/VaultQueryTests.kt | 29 ++++++++++++++++--- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index 84957f4230..81e8c0bdc6 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -397,13 +397,18 @@ class NodeVaultService( @Throws(VaultQueryException::class) override fun _queryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class): Vault.Page { + return _queryBy(criteria, paging, sorting, contractStateType, false) + } + + @Throws(VaultQueryException::class) + private fun _queryBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class, skipPagingChecks: Boolean): Vault.Page { log.info("Vault Query for contract type: $contractStateType, criteria: $criteria, pagination: $paging, sorting: $sorting") // calculate total results where a page specification has been defined var totalStates = -1L - if (!paging.isDefault) { + if (!skipPagingChecks && !paging.isDefault) { val count = builder { VaultSchemaV1.VaultStates::recordedTime.count() } val countCriteria = QueryCriteria.VaultCustomQueryCriteria(count, Vault.StateStatus.ALL) - val results = queryBy(contractStateType, criteria.and(countCriteria)) + val results = _queryBy(criteria.and(countCriteria), PageSpecification(), Sort(emptyList()), contractStateType, true) // only skip pagination checks for total results count query totalStates = results.otherResults.last() as Long } @@ -422,7 +427,7 @@ class NodeVaultService( val query = session.createQuery(criteriaQuery) // pagination checks - if (!paging.isDefault) { + if (!skipPagingChecks && !paging.isDefault) { // pagination if (paging.pageNumber < DEFAULT_PAGE_NUM) throw VaultQueryException("Page specification: invalid page number ${paging.pageNumber} [page numbers start from $DEFAULT_PAGE_NUM]") if (paging.pageSize < 1) throw VaultQueryException("Page specification: invalid page size ${paging.pageSize} [must be a value between 1 and $MAX_PAGE_SIZE]") @@ -435,7 +440,7 @@ class NodeVaultService( val results = query.resultList // final pagination check (fail-fast on too many results when no pagination specified) - if (paging.isDefault && results.size > DEFAULT_PAGE_SIZE) + if (!skipPagingChecks && paging.isDefault && results.size > DEFAULT_PAGE_SIZE) throw VaultQueryException("Please specify a `PageSpecification` as there are more results [${results.size}] than the default page size [$DEFAULT_PAGE_SIZE]") val statesAndRefs: MutableList> = mutableListOf() diff --git a/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt b/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt index 4c021b48c2..cdba4ced28 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/VaultQueryTests.kt @@ -28,10 +28,7 @@ import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.testing.core.* import net.corda.testing.internal.TEST_TX_TIME import net.corda.testing.internal.rigorousMock -import net.corda.testing.internal.vault.DUMMY_LINEAR_CONTRACT_PROGRAM_ID -import net.corda.testing.internal.vault.DummyLinearContract -import net.corda.testing.internal.vault.DummyLinearStateSchemaV1 -import net.corda.testing.internal.vault.VaultFiller +import net.corda.testing.internal.vault.* import net.corda.testing.node.MockServices import net.corda.testing.node.MockServices.Companion.makeTestDatabaseAndMockServices import net.corda.testing.node.makeTestIdentityService @@ -1182,6 +1179,30 @@ abstract class VaultQueryTestsBase : VaultQueryParties { } } + // test paging with aggregate function and group by clause + @Test + fun `test paging with aggregate function and group by clause`() { + database.transaction { + (0..200).forEach { + vaultFiller.fillWithSomeTestLinearStates(1, linearNumber = it.toLong(), linearString = it.toString()) + } + val max = builder { DummyLinearStateSchemaV1.PersistentDummyLinearState::linearTimestamp.max( + groupByColumns = listOf(DummyLinearStateSchemaV1.PersistentDummyLinearState::linearNumber) + ) + } + val maxCriteria = VaultCustomQueryCriteria(max) + val pageSpec = PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE) + + val results = vaultService.queryBy(maxCriteria, paging = pageSpec) + println("Total states available: ${results.totalStatesAvailable}") + results.otherResults.forEachIndexed { index, any -> + println("$index : $any") + } + assertThat(results.otherResults.size).isEqualTo(402) + assertThat(results.otherResults.last()).isEqualTo(200L) + } + } + // sorting @Test fun `sorting - all states sorted by contract type, state status, consumed time`() { From 6e59a694c1235ab92def8d3e93b6f5952ace4b8b Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Mon, 14 May 2018 16:39:44 +0100 Subject: [PATCH 02/11] CORDA-1461 fix merge (#3139) --- docs/source/changelog.rst | 4 ---- 1 file changed, 4 deletions(-) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 988cbcb6a4..ced5c7738f 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -12,10 +12,6 @@ Unreleased * Added smart detection logic for the development mode setting and an option to override it from the command line. -* Fixed an error thrown by NodeVaultService upon recording a transaction with a number of inputs greater than the default page size. - -* Fixed incorrect computation of ``totalStates`` from ``otherResults`` in ``NodeVaultService``. - * Changes to the JSON/YAML serialisation format from ``JacksonSupport``, which also applies to the node shell: * ``Instant`` and ``Date`` objects are serialised as ISO-8601 formatted strings rather than timestamps From 4f9bbc8820759d156d836683fc28643dd4ddffea Mon Sep 17 00:00:00 2001 From: Chris Rankin Date: Mon, 14 May 2018 16:50:43 +0100 Subject: [PATCH 03/11] ENT-1463: Isolate more non-deterministic code from AMQP serialisation. (#3138) --- .../serialization/amqp/CustomSerializer.kt | 2 +- .../serialization/amqp/EvolutionSerializer.kt | 2 +- .../serialization/amqp/SerializerFactory.kt | 66 +++++++++++-------- .../serialization/amqp/TransformsSchema.kt | 2 +- .../serialization/carpenter/ClassCarpenter.kt | 2 +- .../serialization/carpenter/SchemaFields.kt | 4 +- .../amqp/JavaPrivatePropertyTests.java | 10 ++- .../serialization/amqp/GenericsTests.kt | 19 +++--- 8 files changed, 58 insertions(+), 49 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/CustomSerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/CustomSerializer.kt index ccdd67a718..46de47d12a 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/CustomSerializer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/CustomSerializer.kt @@ -58,7 +58,7 @@ abstract class CustomSerializer : AMQPSerializer, SerializerFor { * subclass in the schema, so that we can distinguish between subclasses. */ // TODO: should this be a custom serializer at all, or should it just be a plain AMQPSerializer? - class SubClass(protected val clazz: Class<*>, protected val superClassSerializer: CustomSerializer) : CustomSerializer() { + class SubClass(private val clazz: Class<*>, private val superClassSerializer: CustomSerializer) : CustomSerializer() { // TODO: should this be empty or contain the schema of the super? override val schemaForDocumentation = Schema(emptyList()) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/EvolutionSerializer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/EvolutionSerializer.kt index 28a72d984a..691921bc8c 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/EvolutionSerializer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/EvolutionSerializer.kt @@ -239,7 +239,7 @@ class EvolutionSerializerGetter : EvolutionSerializerGetterBase() { typeNotation: TypeNotation, newSerializer: AMQPSerializer, schemas: SerializationSchemas): AMQPSerializer { - return factory.getSerializersByDescriptor().computeIfAbsent(typeNotation.descriptor.name!!) { + return factory.serializersByDescriptor.computeIfAbsent(typeNotation.descriptor.name!!) { when (typeNotation) { is CompositeType -> EvolutionSerializer.make(typeNotation, newSerializer as ObjectSerializer, factory) is RestrictedType -> { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt index b1e27f8b36..8a56baf1b0 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializerFactory.kt @@ -40,32 +40,40 @@ open class SerializerFactory( val whitelist: ClassWhitelist, val classCarpenter: ClassCarpenter, private val evolutionSerializerGetter: EvolutionSerializerGetterBase = EvolutionSerializerGetter(), - val fingerPrinter: FingerPrinter = SerializerFingerPrinter()) { + val fingerPrinter: FingerPrinter = SerializerFingerPrinter(), + private val serializersByType: MutableMap>, + val serializersByDescriptor: MutableMap>, + private val customSerializers: MutableList, + val transformsCache: MutableMap>>) { + constructor(whitelist: ClassWhitelist, + classCarpenter: ClassCarpenter, + evolutionSerializerGetter: EvolutionSerializerGetterBase = EvolutionSerializerGetter(), + fingerPrinter: FingerPrinter = SerializerFingerPrinter() + ) : this(whitelist, classCarpenter, evolutionSerializerGetter, fingerPrinter, + serializersByType = ConcurrentHashMap(), + serializersByDescriptor = ConcurrentHashMap(), + customSerializers = CopyOnWriteArrayList(), + transformsCache = ConcurrentHashMap()) constructor(whitelist: ClassWhitelist, classLoader: ClassLoader, evolutionSerializerGetter: EvolutionSerializerGetterBase = EvolutionSerializerGetter(), fingerPrinter: FingerPrinter = SerializerFingerPrinter() - ) : this(whitelist, ClassCarpenterImpl(classLoader, whitelist), evolutionSerializerGetter, fingerPrinter) + ) : this(whitelist, ClassCarpenterImpl(classLoader, whitelist), evolutionSerializerGetter, fingerPrinter, + serializersByType = ConcurrentHashMap(), + serializersByDescriptor = ConcurrentHashMap(), + customSerializers = CopyOnWriteArrayList(), + transformsCache = ConcurrentHashMap()) init { fingerPrinter.setOwner(this) } - private val serializersByType = ConcurrentHashMap>() - private val serializersByDescriptor = ConcurrentHashMap>() - private val customSerializers = CopyOnWriteArrayList() - private val transformsCache = ConcurrentHashMap>>() - val classloader: ClassLoader get() = classCarpenter.classloader private fun getEvolutionSerializer(typeNotation: TypeNotation, newSerializer: AMQPSerializer, schemas: SerializationSchemas) = evolutionSerializerGetter.getEvolutionSerializer(this, typeNotation, newSerializer, schemas) - fun getSerializersByDescriptor() = serializersByDescriptor - - fun getTransformsCache() = transformsCache - /** * Look up, and manufacture if necessary, a serializer for the given type. * @@ -219,7 +227,7 @@ open class SerializerFactory( /** * Iterate over an AMQP schema, for each type ascertain whether it's on ClassPath of [classloader] and, - * if not, use the [ClassCarpenter] to generate a class to use in it's place. + * if not, use the [ClassCarpenter] to generate a class to use in its place. */ private fun processSchema(schemaAndDescriptor: FactorySchemaAndDescriptor, sentinel: Boolean = false) { val metaSchema = CarpenterMetaSchema.newInstance() @@ -239,24 +247,28 @@ open class SerializerFactory( } if (metaSchema.isNotEmpty()) { - val mc = MetaCarpenter(metaSchema, classCarpenter) - try { - mc.build() - } catch (e: MetaCarpenterException) { - // preserve the actual message locally - loggerFor().apply { - error("${e.message} [hint: enable trace debugging for the stack trace]") - trace("", e) - } - - // prevent carpenter exceptions escaping into the world, convert things into a nice - // NotSerializableException for when this escapes over the wire - throw NotSerializableException(e.name) - } - processSchema(schemaAndDescriptor, true) + runCarpentry(schemaAndDescriptor, metaSchema) } } + private fun runCarpentry(schemaAndDescriptor: FactorySchemaAndDescriptor, metaSchema: CarpenterMetaSchema) { + val mc = MetaCarpenter(metaSchema, classCarpenter) + try { + mc.build() + } catch (e: MetaCarpenterException) { + // preserve the actual message locally + loggerFor().apply { + error("${e.message} [hint: enable trace debugging for the stack trace]") + trace("", e) + } + + // prevent carpenter exceptions escaping into the world, convert things into a nice + // NotSerializableException for when this escapes over the wire + throw NotSerializableException(e.name) + } + processSchema(schemaAndDescriptor, true) + } + private fun processSchemaEntry(typeNotation: TypeNotation) = when (typeNotation) { is CompositeType -> processCompositeType(typeNotation) // java.lang.Class (whether a class or interface) is RestrictedType -> processRestrictedType(typeNotation) // Collection / Map, possibly with generics diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/TransformsSchema.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/TransformsSchema.kt index edb7f2711d..ee695fd7bb 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/TransformsSchema.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/TransformsSchema.kt @@ -200,7 +200,7 @@ data class TransformsSchema(val types: Map>(TransformTypes::class.java) try { val clazz = sf.classloader.loadClass(name) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/ClassCarpenter.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/ClassCarpenter.kt index 42eb64f824..f828041a36 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/ClassCarpenter.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/ClassCarpenter.kt @@ -48,7 +48,7 @@ private val toStringHelper: String = Type.getInternalName(MoreObjects.ToStringHe // Allow us to create alternative ClassCarpenters. interface ClassCarpenter { val whitelist: ClassWhitelist - val classloader: CarpenterClassLoader + val classloader: ClassLoader fun build(schema: Schema): Class<*> } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/SchemaFields.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/SchemaFields.kt index 0ce9a8f3d1..351aaecb70 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/SchemaFields.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/carpenter/SchemaFields.kt @@ -30,8 +30,8 @@ abstract class ClassField(field: Class) : Field(field) { abstract val nullabilityAnnotation: String abstract fun nullTest(mv: MethodVisitor, slot: Int) - override var descriptor = Type.getDescriptor(this.field) - override val type: String get() = if (this.field.isPrimitive) this.descriptor else "Ljava/lang/Object;" + override var descriptor: String? = Type.getDescriptor(this.field) + override val type: String get() = if (this.field.isPrimitive) this.descriptor!! else "Ljava/lang/Object;" fun addNullabilityAnnotation(mv: MethodVisitor) { mv.visitAnnotation(nullabilityAnnotation, true).visitEnd() diff --git a/node-api/src/test/java/net/corda/nodeapi/internal/serialization/amqp/JavaPrivatePropertyTests.java b/node-api/src/test/java/net/corda/nodeapi/internal/serialization/amqp/JavaPrivatePropertyTests.java index 75e6d44ea7..2c0fe80a9e 100644 --- a/node-api/src/test/java/net/corda/nodeapi/internal/serialization/amqp/JavaPrivatePropertyTests.java +++ b/node-api/src/test/java/net/corda/nodeapi/internal/serialization/amqp/JavaPrivatePropertyTests.java @@ -7,7 +7,7 @@ import static org.junit.Assert.*; import java.io.NotSerializableException; import java.lang.reflect.Field; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Map; public class JavaPrivatePropertyTests { static class C { @@ -116,7 +116,7 @@ public class JavaPrivatePropertyTests { B3 b2 = des.deserialize(ser.serialize(b, TestSerializationContext.testSerializationContext), B3.class, TestSerializationContext.testSerializationContext); // since we can't find a getter for b (isb != isB) then we won't serialize that parameter - assertEquals (null, b2.b); + assertNull (b2.b); } @Test @@ -154,8 +154,7 @@ public class JavaPrivatePropertyTests { Field f = SerializerFactory.class.getDeclaredField("serializersByDescriptor"); f.setAccessible(true); - ConcurrentHashMap> serializersByDescriptor = - (ConcurrentHashMap>) f.get(factory); + Map> serializersByDescriptor = (Map>) f.get(factory); assertEquals(1, serializersByDescriptor.size()); ObjectSerializer cSerializer = ((ObjectSerializer)serializersByDescriptor.values().toArray()[0]); @@ -185,8 +184,7 @@ public class JavaPrivatePropertyTests { // Field f = SerializerFactory.class.getDeclaredField("serializersByDescriptor"); f.setAccessible(true); - ConcurrentHashMap> serializersByDescriptor = - (ConcurrentHashMap>) f.get(factory); + Map> serializersByDescriptor = (Map>) f.get(factory); assertEquals(1, serializersByDescriptor.size()); ObjectSerializer cSerializer = ((ObjectSerializer)serializersByDescriptor.values().toArray()[0]); diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/GenericsTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/GenericsTests.kt index 590cdfc5ca..3dd237b89e 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/GenericsTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/GenericsTests.kt @@ -10,7 +10,6 @@ import net.corda.core.identity.CordaX500Name import net.corda.nodeapi.internal.serialization.amqp.testutils.* import net.corda.testing.core.TestIdentity import java.util.* -import java.util.concurrent.ConcurrentHashMap import kotlin.test.assertEquals data class TestContractState( @@ -36,7 +35,7 @@ class GenericsTests { private fun BytesAndSchemas.printSchema() = if (VERBOSE) println("${this.schema}\n") else Unit - private fun ConcurrentHashMap>.printKeyToType() { + private fun MutableMap>.printKeyToType() { if (!VERBOSE) return forEach { @@ -53,11 +52,11 @@ class GenericsTests { val bytes1 = SerializationOutput(factory).serializeAndReturnSchema(G("hi")).apply { printSchema() } - factory.getSerializersByDescriptor().printKeyToType() + factory.serializersByDescriptor.printKeyToType() val bytes2 = SerializationOutput(factory).serializeAndReturnSchema(G(121)).apply { printSchema() } - factory.getSerializersByDescriptor().printKeyToType() + factory.serializersByDescriptor.printKeyToType() listOf(factory, testDefaultFactory()).forEach { f -> DeserializationInput(f).deserialize(bytes1.obj).apply { assertEquals("hi", this.a) } @@ -90,14 +89,14 @@ class GenericsTests { val bytes = ser.serializeAndReturnSchema(G("hi")).apply { printSchema() } - factory.getSerializersByDescriptor().printKeyToType() + factory.serializersByDescriptor.printKeyToType() assertEquals("hi", DeserializationInput(factory).deserialize(bytes.obj).a) assertEquals("hi", DeserializationInput(altContextFactory).deserialize(bytes.obj).a) val bytes2 = ser.serializeAndReturnSchema(Wrapper(1, G("hi"))).apply { printSchema() } - factory.getSerializersByDescriptor().printKeyToType() + factory.serializersByDescriptor.printKeyToType() printSeparator() @@ -149,21 +148,21 @@ class GenericsTests { ser.serialize(Wrapper(Container(InnerA(1)))).apply { factories.forEach { DeserializationInput(it).deserialize(this).apply { assertEquals(1, c.b.a_a) } - it.getSerializersByDescriptor().printKeyToType(); printSeparator() + it.serializersByDescriptor.printKeyToType(); printSeparator() } } ser.serialize(Wrapper(Container(InnerB(1)))).apply { factories.forEach { DeserializationInput(it).deserialize(this).apply { assertEquals(1, c.b.a_b) } - it.getSerializersByDescriptor().printKeyToType(); printSeparator() + it.serializersByDescriptor.printKeyToType(); printSeparator() } } ser.serialize(Wrapper(Container(InnerC("Ho ho ho")))).apply { factories.forEach { DeserializationInput(it).deserialize(this).apply { assertEquals("Ho ho ho", c.b.a_c) } - it.getSerializersByDescriptor().printKeyToType(); printSeparator() + it.serializersByDescriptor.printKeyToType(); printSeparator() } } } @@ -199,7 +198,7 @@ class GenericsTests { a: ForceWildcard<*>, factory: SerializerFactory = SerializerFactory(AllWhitelist, ClassLoader.getSystemClassLoader())): SerializedBytes<*> { val bytes = SerializationOutput(factory).serializeAndReturnSchema(a) - factory.getSerializersByDescriptor().printKeyToType() + factory.serializersByDescriptor.printKeyToType() bytes.printSchema() return bytes.obj } From 759ed7c3d02c7c26641768c87c7b63e133cfc573 Mon Sep 17 00:00:00 2001 From: Tudor Malene Date: Mon, 14 May 2018 17:26:08 +0100 Subject: [PATCH 04/11] CORDA-1369 set co.paralleluniverse.fibers.verifyInstrumentation=true in devMode (#3137) --- docs/source/changelog.rst | 2 ++ node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt | 1 + 2 files changed, 3 insertions(+) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index ced5c7738f..61a4ffdf91 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -6,6 +6,8 @@ release, see :doc:`upgrade-notes`. Unreleased ========== +* Set co.paralleluniverse.fibers.verifyInstrumentation=true in devMode. + * Node will now gracefully fail to start if one of the required ports is already in use. * Node will now gracefully fail to start if ``devMode`` is true and ``compatibilityZoneURL`` is specified. diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index e5b4835aeb..47db47e03a 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -203,6 +203,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, open fun start(): StartedNode { check(started == null) { "Node has already been started" } if (configuration.devMode) { + System.setProperty("co.paralleluniverse.fibers.verifyInstrumentation", "true") Emoji.renderIfSupported { Node.printWarning("This node is running in developer mode! ${Emoji.developer} This is not safe for production deployment.") } } log.info("Node starting up ...") From e1dc57ba9d41543abdc49d4d31a7ac74e09b0eb7 Mon Sep 17 00:00:00 2001 From: Joel Dudley Date: Mon, 14 May 2018 17:40:52 +0100 Subject: [PATCH 05/11] Update CONTRIBUTORS.md --- CONTRIBUTORS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 13fc420f42..af41c64516 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -138,7 +138,7 @@ see changes to this list. * Przemyslaw Bak (R3) * quiark * RangerOfFire -* renlulu +* renlulu (s-labs) * Rex Maudsley (Société Générale) * Rhett Brewer (Goldman Sachs) * Richard Crook (RBS) From b2e9a427a868f8b0a7b1704a0fecd9e98a4c40b4 Mon Sep 17 00:00:00 2001 From: renlulu Date: Tue, 15 May 2018 01:00:07 +0800 Subject: [PATCH 06/11] Extracted a tool class to reduce duplicated logic between InMemoryIdentityService and PersistentIdentityService (#3141) --- .../services/identity/IdentityServiceUtil.kt | 24 +++++++++++++++++++ .../identity/InMemoryIdentityService.kt | 17 +------------ .../identity/PersistentIdentityService.kt | 17 +------------ 3 files changed, 26 insertions(+), 32 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/services/identity/IdentityServiceUtil.kt diff --git a/node/src/main/kotlin/net/corda/node/services/identity/IdentityServiceUtil.kt b/node/src/main/kotlin/net/corda/node/services/identity/IdentityServiceUtil.kt new file mode 100644 index 0000000000..b5f065cc85 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/identity/IdentityServiceUtil.kt @@ -0,0 +1,24 @@ +package net.corda.node.services.identity + +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party + + +fun partiesFromName(query: String, exactMatch: Boolean, x500name: CordaX500Name, results: LinkedHashSet, party: Party) { + + val components = listOfNotNull(x500name.commonName, x500name.organisationUnit, x500name.organisation, x500name.locality, x500name.state, x500name.country) + components.forEach { component -> + if (exactMatch && component == query) { + results += party + } else if (!exactMatch) { + // We can imagine this being a query over a lucene index in future. + // + // Kostas says: We can easily use the Jaro-Winkler distance metric as it is best suited for short + // strings such as entity/company names, and to detect small typos. We can also apply it for city + // or any keyword related search in lists of records (not raw text - for raw text we need indexing) + // and we can return results in hierarchical order (based on normalised String similarity 0.0-1.0). + if (component.contains(query, ignoreCase = true)) + results += party + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt index 92e06f437d..4487128d27 100644 --- a/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt +++ b/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt @@ -103,22 +103,7 @@ class InMemoryIdentityService(identities: Array, override fun partiesFromName(query: String, exactMatch: Boolean): Set { val results = LinkedHashSet() for ((x500name, partyAndCertificate) in principalToParties) { - val party = partyAndCertificate.party - val components = listOfNotNull(x500name.commonName, x500name.organisationUnit, x500name.organisation, x500name.locality, x500name.state, x500name.country) - components.forEach { component -> - if (exactMatch && component == query) { - results += party - } else if (!exactMatch) { - // We can imagine this being a query over a lucene index in future. - // - // Kostas says: We can easily use the Jaro-Winkler distance metric as it is best suited for short - // strings such as entity/company names, and to detect small typos. We can also apply it for city - // or any keyword related search in lists of records (not raw text - for raw text we need indexing) - // and we can return results in hierarchical order (based on normalised String similarity 0.0-1.0). - if (component.contains(query, ignoreCase = true)) - results += party - } - } + partiesFromName(query, exactMatch, x500name, results, partyAndCertificate.party) } return results } diff --git a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt index a8ff0525e0..f2e01d2079 100644 --- a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt +++ b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt @@ -180,22 +180,7 @@ class PersistentIdentityService(override val trustRoot: X509Certificate, override fun partiesFromName(query: String, exactMatch: Boolean): Set { val results = LinkedHashSet() for ((x500name, partyId) in principalToParties.allPersisted()) { - val party = keyToParties[partyId]!!.party - val components = listOfNotNull(x500name.commonName, x500name.organisationUnit, x500name.organisation, x500name.locality, x500name.state, x500name.country) - components.forEach { component -> - if (exactMatch && component == query) { - results += party - } else if (!exactMatch) { - // We can imagine this being a query over a lucene index in future. - // - // Kostas says: We can easily use the Jaro-Winkler distance metric as it is best suited for short - // strings such as entity/company names, and to detect small typos. We can also apply it for city - // or any keyword related search in lists of records (not raw text - for raw text we need indexing) - // and we can return results in hierarchical order (based on normalised String similarity 0.0-1.0). - if (component.contains(query, ignoreCase = true)) - results += party - } - } + partiesFromName(query, exactMatch, x500name, results, keyToParties[partyId]!!.party) } return results } From dce0956f376a8eda9ffa27caf1739128e048c719 Mon Sep 17 00:00:00 2001 From: Joel Dudley Date: Tue, 15 May 2018 09:40:50 +0100 Subject: [PATCH 07/11] Update FlowLogic.kt (#3144) --- core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index 29a84a6812..3fd3a32256 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -431,7 +431,8 @@ abstract class FlowLogic { */ var stateMachine: FlowStateMachine<*> @CordaInternal - get() = _stateMachine ?: throw IllegalStateException("This can only be done after the flow has been started.") + get() = _stateMachine ?: throw IllegalStateException( + "You cannot access the flow's state machine until the flow has been started.") @CordaInternal set(value) { _stateMachine = value From 323e809f8cb145ba5fbd77fdfff85feeb4225c00 Mon Sep 17 00:00:00 2001 From: Joel Dudley Date: Tue, 15 May 2018 11:23:15 +0100 Subject: [PATCH 08/11] Update CONTRIBUTORS.md --- CONTRIBUTORS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index af41c64516..3be2d6bd9e 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -108,6 +108,7 @@ see changes to this list. * Lars Stage Thomsen (Danske Bank) * Lee Braine (Barclays) * Lucas Salmen (Itau) +* Lulu Ren (S-Labs) * Maksymilian Pawlak (R3) * Marek Scocovsky (ABSA) * marekdapps @@ -138,7 +139,6 @@ see changes to this list. * Przemyslaw Bak (R3) * quiark * RangerOfFire -* renlulu (s-labs) * Rex Maudsley (Société Générale) * Rhett Brewer (Goldman Sachs) * Richard Crook (RBS) From 84d94d44addc9a795799e832df487e50797d6da4 Mon Sep 17 00:00:00 2001 From: Chris Rankin Date: Tue, 15 May 2018 12:03:33 +0100 Subject: [PATCH 09/11] ENT-1463: Hide more AMQP ConcurrentHashMaps behind interfaces. (#3147) --- .../serialization/SerializationScheme.kt | 10 ++++--- .../amqp/AMQPSerializationScheme.kt | 29 ++++++++++++------- .../amqp/SerializationSchemaTests.kt | 3 +- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt index 33962ab02b..a94aad8786 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/SerializationScheme.kt @@ -92,7 +92,12 @@ internal class AttachmentsClassLoaderBuilder(private val properties: Map, SerializationScheme> +) : SerializationFactory() { + constructor() : this(ConcurrentHashMap()) + companion object { val magicSize = sequenceOf(kryoMagic, amqpMagic).map { it.size }.distinct().single() } @@ -103,9 +108,6 @@ open class SerializationFactoryImpl : SerializationFactory() { private val logger = LoggerFactory.getLogger(javaClass) - // TODO: This is read-mostly. Probably a faster implementation to be found. - private val schemes: ConcurrentHashMap, SerializationScheme> = ConcurrentHashMap() - private fun schemeFor(byteSequence: ByteSequence, target: SerializationContext.UseCase): Pair { // truncate sequence to at most magicSize, and make sure it's a copy to avoid holding onto large ByteArrays val magic = CordaSerializationMagic(byteSequence.slice(end = magicSize).copyBytes()) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt index 8811a63eac..505228a377 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/serialization/amqp/AMQPSerializationScheme.kt @@ -35,10 +35,11 @@ interface SerializerFactoryFactory { } abstract class AbstractAMQPSerializationScheme( - private val cordappCustomSerializers: Set>, - val sff: SerializerFactoryFactory = createSerializerFactoryFactory() + private val cordappCustomSerializers: Set>, + private val serializerFactoriesForContexts: MutableMap, SerializerFactory>, + val sff: SerializerFactoryFactory = createSerializerFactoryFactory() ) : SerializationScheme { - constructor(cordapps: List) : this(cordapps.customSerializers) + constructor(cordapps: List) : this(cordapps.customSerializers, ConcurrentHashMap()) // TODO: This method of initialisation for the Whitelist and plugin serializers will have to change // when we have per-cordapp contexts and dynamic app reloading but for now it's the easiest way @@ -128,8 +129,6 @@ abstract class AbstractAMQPSerializationScheme( } } - private val serializerFactoriesForContexts = ConcurrentHashMap, SerializerFactory>() - protected abstract fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory protected abstract fun rpcServerSerializerFactory(context: SerializationContext): SerializerFactory protected open val publicKeySerializer: CustomSerializer.Implements = net.corda.nodeapi.internal.serialization.amqp.custom.PublicKeySerializer @@ -165,9 +164,13 @@ abstract class AbstractAMQPSerializationScheme( } // TODO: This will eventually cover server RPC as well and move to node module, but for now this is not implemented -class AMQPServerSerializationScheme(cordappCustomSerializers: Set> = emptySet()) - : AbstractAMQPSerializationScheme(cordappCustomSerializers) { - constructor(cordapps: List) : this(cordapps.customSerializers) +class AMQPServerSerializationScheme( + cordappCustomSerializers: Set>, + serializerFactoriesForContexts: MutableMap, SerializerFactory> +) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { + constructor(cordapps: List) : this(cordapps.customSerializers, ConcurrentHashMap()) + + constructor() : this(emptySet(), ConcurrentHashMap()) override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { throw UnsupportedOperationException() @@ -185,9 +188,13 @@ class AMQPServerSerializationScheme(cordappCustomSerializers: Set> = emptySet()) - : AbstractAMQPSerializationScheme(cordappCustomSerializers) { - constructor(cordapps: List) : this(cordapps.customSerializers) +class AMQPClientSerializationScheme( + cordappCustomSerializers: Set>, + serializerFactoriesForContexts: MutableMap, SerializerFactory> +) : AbstractAMQPSerializationScheme(cordappCustomSerializers, serializerFactoriesForContexts) { + constructor(cordapps: List) : this(cordapps.customSerializers, ConcurrentHashMap()) + + constructor() : this(emptySet(), ConcurrentHashMap()) override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { throw UnsupportedOperationException() diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationSchemaTests.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationSchemaTests.kt index a9c75359b7..cd49520e42 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationSchemaTests.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/serialization/amqp/SerializationSchemaTests.kt @@ -4,6 +4,7 @@ import net.corda.core.serialization.* import net.corda.core.utilities.ByteSequence import net.corda.nodeapi.internal.serialization.* import org.junit.Test +import java.util.concurrent.ConcurrentHashMap import kotlin.test.assertEquals // Make sure all serialization calls in this test don't get stomped on by anything else @@ -43,7 +44,7 @@ class TestSerializerFactoryFactory : SerializerFactoryFactoryImpl() { } } -class AMQPTestSerializationScheme : AbstractAMQPSerializationScheme(emptySet(), TestSerializerFactoryFactory()) { +class AMQPTestSerializationScheme : AbstractAMQPSerializationScheme(emptySet(), ConcurrentHashMap(), TestSerializerFactoryFactory()) { override fun rpcClientSerializerFactory(context: SerializationContext): SerializerFactory { throw UnsupportedOperationException() } From 24fa695ca0ef72fa851abc5b1630d722f32577ec Mon Sep 17 00:00:00 2001 From: Katarzyna Streich Date: Tue, 15 May 2018 12:10:04 +0100 Subject: [PATCH 10/11] CORDA-866: Implement removal of stale nodes from network - backport (#3128) * CORDA-866: Implement removal of stale nodes from network Backported * Implement removal of stale nodes from network Add eventHorizon to NetworkParameters structure. Add republishing of node info on 1 day intervals - it is treated by network map as heartbeat from node indicating if it's alive or not. Add removal of old node infos on network map signing. * Add copy method to NetworkParameters data class Add JvmOverloads annotation to the constructor, because it's data class exposed in API * Fix test --- .../net/corda/core/node/NetworkParameters.kt | 31 ++++++++++++++++--- docs/source/network-map.rst | 3 ++ .../internal/network/NetworkBootstrapper.kt | 4 ++- .../node/services/network/NetworkMapTest.kt | 17 ++++++++++ .../net/corda/node/internal/AbstractNode.kt | 27 +++++++++++----- .../node/internal/network/NetworkMapServer.kt | 9 ++++-- .../common/internal/ParametersUtilities.kt | 8 +++-- 7 files changed, 83 insertions(+), 16 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt b/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt index 2b8662d348..becd7e9d29 100644 --- a/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt +++ b/core/src/main/kotlin/net/corda/core/node/NetworkParameters.kt @@ -3,6 +3,8 @@ package net.corda.core.node import net.corda.core.identity.Party import net.corda.core.node.services.AttachmentId import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.days +import java.time.Duration import java.time.Instant /** @@ -17,18 +19,19 @@ import java.time.Instant * of parameters. * @property whitelistedContractImplementations List of whitelisted jars containing contract code for each contract class. * This will be used by [net.corda.core.contracts.WhitelistedByZoneAttachmentConstraint]. Read more about contract constraints here: + * @property eventHorizon Time after which nodes will be removed from the network map if they have not been seen + * during this period */ -// TODO Add eventHorizon - how many days a node can be offline before being automatically ejected from the network. -// It needs separate design. @CordaSerializable -data class NetworkParameters( +data class NetworkParameters @JvmOverloads constructor( val minimumPlatformVersion: Int, val notaries: List, val maxMessageSize: Int, val maxTransactionSize: Int, val modifiedTime: Instant, val epoch: Int, - val whitelistedContractImplementations: Map> + val whitelistedContractImplementations: Map>, + val eventHorizon: Duration = Int.MAX_VALUE.days ) { init { require(minimumPlatformVersion > 0) { "minimumPlatformVersion must be at least 1" } @@ -36,6 +39,25 @@ data class NetworkParameters( require(epoch > 0) { "epoch must be at least 1" } require(maxMessageSize > 0) { "maxMessageSize must be at least 1" } require(maxTransactionSize > 0) { "maxTransactionSize must be at least 1" } + require(!eventHorizon.isNegative) { "eventHorizon must be positive value" } + } + + fun copy(minimumPlatformVersion: Int, + notaries: List, + maxMessageSize: Int, + maxTransactionSize: Int, + modifiedTime: Instant, + epoch: Int, + whitelistedContractImplementations: Map> + ): NetworkParameters { + return copy(minimumPlatformVersion = minimumPlatformVersion, + notaries = notaries, + maxMessageSize = maxMessageSize, + maxTransactionSize = maxTransactionSize, + modifiedTime = modifiedTime, + epoch = epoch, + whitelistedContractImplementations = whitelistedContractImplementations, + eventHorizon = eventHorizon) } override fun toString(): String { @@ -47,6 +69,7 @@ data class NetworkParameters( whitelistedContractImplementations { ${whitelistedContractImplementations.entries.joinToString("\n ")} } + eventHorizon=$eventHorizon modifiedTime=$modifiedTime epoch=$epoch }""" diff --git a/docs/source/network-map.rst b/docs/source/network-map.rst index c7691e5e33..1c56b37ca8 100644 --- a/docs/source/network-map.rst +++ b/docs/source/network-map.rst @@ -121,6 +121,9 @@ The current set of network parameters: For each contract class there is a list of hashes of the approved CorDapp jar versions containing that contract. Read more about *Zone constraints* here :doc:`api-contract-constraints` +:eventHorizon: Time after which nodes are considered to be unresponsive and removed from network map. Nodes republish their + ``NodeInfo`` on a regular interval. Network map treats that as a heartbeat from the node. + More parameters will be added in future releases to regulate things like allowed port numbers, how long a node can be offline before it is evicted from the zone, whether or not IPv6 connectivity is required for zone members, required cryptographic algorithms and rollout schedules (e.g. for moving to post quantum cryptography), parameters related to diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt index e80bbc31a6..395c08ddfa 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/network/NetworkBootstrapper.kt @@ -15,6 +15,7 @@ import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.SerializationEnvironmentImpl import net.corda.core.serialization.internal._contextSerializationEnv +import net.corda.core.utilities.days import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.nodeapi.internal.ContractsJar @@ -222,7 +223,8 @@ class NetworkBootstrapper { maxMessageSize = 10485760, maxTransactionSize = Int.MAX_VALUE, whitelistedContractImplementations = whitelist, - epoch = 1 + epoch = 1, + eventHorizon = 30.days ) } val copier = NetworkParametersCopier(networkParameters, overwriteFile = true) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt index 62647424fa..683fc19fce 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt @@ -199,6 +199,23 @@ class NetworkMapTest { } } + @Test + fun `test node heartbeat`() { + internalDriver( + portAllocation = portAllocation, + compatibilityZone = compatibilityZone, + initialiseSerialization = false, + systemProperties = mapOf("net.corda.node.internal.nodeinfo.publish.interval" to 1.seconds.toString()) + ) { + val aliceNode = startNode(providedName = ALICE_NAME, devMode = false).getOrThrow() + assertThat(networkMapServer.networkMapHashes()).contains(aliceNode.nodeInfo.serialize().hash) + networkMapServer.removeNodeInfo(aliceNode.nodeInfo) + assertThat(networkMapServer.networkMapHashes()).doesNotContain(aliceNode.nodeInfo.serialize().hash) + Thread.sleep(2000) + assertThat(networkMapServer.networkMapHashes()).contains(aliceNode.nodeInfo.serialize().hash) + } + } + private fun NodeHandle.onlySees(vararg nodes: NodeInfo) { // Make sure the nodes aren't getting the node infos from their additional directories val nodeInfosDir = baseDirectory / CordformNode.NODE_INFO_DIRECTORY diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 47db47e03a..96dafeecc8 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -29,9 +29,7 @@ import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.debug -import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.* import net.corda.node.CordaClock import net.corda.node.VersionInfo import net.corda.node.internal.classloading.requireAnnotation @@ -88,6 +86,7 @@ import java.security.cert.X509Certificate import java.sql.Connection import java.time.Clock import java.time.Duration +import java.time.format.DateTimeParseException import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService @@ -367,6 +366,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, // Write the node-info file even if nothing's changed, just in case the file has been deleted. NodeInfoWatcher.saveToFile(configuration.baseDirectory, nodeInfoAndSigned) + // Always republish on startup, it's treated by network map server as a heartbeat. if (networkMapClient != null) { tryPublishNodeInfoAsync(nodeInfoAndSigned.signed, networkMapClient) } @@ -374,18 +374,31 @@ abstract class AbstractNode(val configuration: NodeConfiguration, return Pair(keyPairs, nodeInfo) } + // Publish node info on startup and start task that sends every day a heartbeat - republishes node info. private fun tryPublishNodeInfoAsync(signedNodeInfo: SignedNodeInfo, networkMapClient: NetworkMapClient) { + // By default heartbeat interval should be set to 1 day, but for testing we may change it. + val republishProperty = System.getProperty("net.corda.node.internal.nodeinfo.publish.interval") + val heartbeatInterval = if (republishProperty != null) { + try { + Duration.parse(republishProperty) + } catch (e: DateTimeParseException) { + 1.days + } + } else { + 1.days + } val executor = Executors.newSingleThreadScheduledExecutor(NamedThreadFactory("Network Map Updater", Executors.defaultThreadFactory())) - executor.submit(object : Runnable { override fun run() { - try { + val republishInterval = try { networkMapClient.publish(signedNodeInfo) + heartbeatInterval } catch (t: Throwable) { log.warn("Error encountered while publishing node info, will retry again", t) - // TODO: Exponential backoff? - executor.schedule(this, 1, TimeUnit.MINUTES) + // TODO: Exponential backoff? It should reach max interval of eventHorizon/2. + 1.minutes } + executor.schedule(this, republishInterval.toMinutes(), TimeUnit.MINUTES) } }) } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt index 0af7ff6458..d74b35053c 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/network/NetworkMapServer.kt @@ -8,11 +8,14 @@ import net.corda.core.node.NetworkParameters import net.corda.core.node.NodeInfo import net.corda.core.serialization.serialize import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.days import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.createDevNetworkMapCa import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair import net.corda.nodeapi.internal.network.NetworkMap import net.corda.nodeapi.internal.network.ParametersUpdate +import net.corda.testing.common.internal.testNetworkParameters import org.eclipse.jetty.server.Server import org.eclipse.jetty.server.ServerConnector import org.eclipse.jetty.server.handler.HandlerCollection @@ -40,7 +43,7 @@ class NetworkMapServer(private val pollInterval: Duration, private val myHostNameValue: String = "test.host.name", vararg additionalServices: Any) : Closeable { companion object { - private val stubNetworkParameters = NetworkParameters(1, emptyList(), 10485760, Int.MAX_VALUE, Instant.now(), 10, emptyMap()) + private val stubNetworkParameters = testNetworkParameters(epoch = 10) } private val server: Server @@ -78,6 +81,8 @@ class NetworkMapServer(private val pollInterval: Duration, .let { NetworkHostAndPort(it.host, it.localPort) } } + fun networkMapHashes(): List = service.nodeInfoMap.keys.toList() + fun removeNodeInfo(nodeInfo: NodeInfo) { service.removeNodeInfo(nodeInfo) } @@ -108,7 +113,7 @@ class NetworkMapServer(private val pollInterval: Duration, @Path("network-map") inner class InMemoryNetworkMapService { private val nodeNamesUUID = mutableMapOf() - private val nodeInfoMap = mutableMapOf() + val nodeInfoMap = mutableMapOf() // Mapping from the UUID of the network (null for global one) to hashes of the nodes in network private val networkMaps = mutableMapOf>() val latestAcceptedParametersMap = mutableMapOf() diff --git a/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/ParametersUtilities.kt b/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/ParametersUtilities.kt index a1f51ca2d3..e9f4902c58 100644 --- a/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/ParametersUtilities.kt +++ b/testing/test-common/src/main/kotlin/net/corda/testing/common/internal/ParametersUtilities.kt @@ -3,6 +3,8 @@ package net.corda.testing.common.internal import net.corda.core.node.NetworkParameters import net.corda.core.node.NotaryInfo import net.corda.core.node.services.AttachmentId +import net.corda.core.utilities.days +import java.time.Duration import java.time.Instant fun testNetworkParameters( @@ -13,7 +15,8 @@ fun testNetworkParameters( // TODO: Make this configurable and consistence across driver, bootstrapper, demobench and NetworkMapServer maxTransactionSize: Int = maxMessageSize, whitelistedContractImplementations: Map> = emptyMap(), - epoch: Int = 1 + epoch: Int = 1, + eventHorizon: Duration = 30.days ): NetworkParameters { return NetworkParameters( minimumPlatformVersion = minimumPlatformVersion, @@ -22,6 +25,7 @@ fun testNetworkParameters( maxTransactionSize = maxTransactionSize, whitelistedContractImplementations = whitelistedContractImplementations, modifiedTime = modifiedTime, - epoch = epoch + epoch = epoch, + eventHorizon = eventHorizon ) } \ No newline at end of file From d0d07287e7790649cd772fdeda422703f33e5f62 Mon Sep 17 00:00:00 2001 From: Chris Rankin Date: Tue, 15 May 2018 12:35:00 +0100 Subject: [PATCH 11/11] Remove duplicate Netty classes from Node. (#3142) * Remove duplicate Netty classes from Node. * Force all Netty modules to our given version. --- build.gradle | 11 +++++++++++ node-api/build.gradle | 5 +---- node/build.gradle | 6 ------ 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/build.gradle b/build.gradle index d8f71ddd81..d823ed5246 100644 --- a/build.gradle +++ b/build.gradle @@ -220,12 +220,23 @@ allprojects { force "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version" force "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" force "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version" + + // Demand that everything uses our given version of Netty. + eachDependency { details -> + if (details.requested.group == 'io.netty' && details.requested.name.startsWith('netty-')) { + details.useVersion netty_version + } + } } } compile { // We want to use SLF4J's version of these bindings: jcl-over-slf4j // Remove any transitive dependency on Apache's version. exclude group: 'commons-logging', module: 'commons-logging' + + // Netty-All is an uber-jar which contains every Netty module. + // Exclude it to force us to use the individual Netty modules instead. + exclude group: 'io.netty', module: 'netty-all' } runtime { // We never want isolated.jar on classPath, since we want to test jar being dynamically loaded as an attachment diff --git a/node-api/build.gradle b/node-api/build.gradle index 745749c4ae..7311e7d403 100644 --- a/node-api/build.gradle +++ b/node-api/build.gradle @@ -18,9 +18,6 @@ dependencies { compile "org.apache.activemq:artemis-core-client:${artemis_version}" compile "org.apache.activemq:artemis-commons:${artemis_version}" - // Netty: All of it. - compile "io.netty:netty-all:$netty_version" - // For adding serialisation of file upload streams to RPC // TODO: Remove this dependency and the code that requires it compile "commons-fileupload:commons-fileupload:$fileupload_version" @@ -54,7 +51,7 @@ dependencies { } configurations { - testArtifacts.extendsFrom testRuntime + testArtifacts.extendsFrom testRuntime } task testJar(type: Jar) { diff --git a/node/build.gradle b/node/build.gradle index 72c440f0e3..a7e910df04 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -19,12 +19,6 @@ buildscript { //noinspection GroovyAssignabilityCheck configurations { - compile { - // We don't need these because we already include netty-all. - exclude group: 'io.netty', module: 'netty-transport' - exclude group: 'io.netty', module: 'netty-handler' - } - integrationTestCompile.extendsFrom testCompile integrationTestRuntime.extendsFrom testRuntime }