From d040945b0e9293f67d50e84e749aca3b1c8dd27d Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Tue, 24 Jul 2018 17:47:20 +0100 Subject: [PATCH] Merge fixes --- .../internal/persistence/CordaPersistence.kt | 8 +++-- .../messaging/ArtemisMessagingTest.kt | 3 +- .../net/corda/node/internal/AbstractNode.kt | 31 +++++++++---------- .../net/corda/node/internal/EnterpriseNode.kt | 5 ++- .../kotlin/net/corda/node/internal/Node.kt | 11 ++++--- .../services/messaging/P2PMessagingClient.kt | 3 +- .../MultiThreadedStateMachineManager.kt | 2 +- .../net/corda/node/internal/NodeTest.kt | 11 ++++--- .../flows/TwoPartyTradeFlowTest.kt | 18 +++++------ 9 files changed, 47 insertions(+), 45 deletions(-) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt index 581cba4cc2..82f8ac918d 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/CordaPersistence.kt @@ -63,7 +63,6 @@ val contextDatabaseOrNull: CordaPersistence? get() = _contextDatabase.get() class CordaPersistence( databaseConfig: DatabaseConfig, schemas: Set, - val jdbcUrl: String, attributeConverters: Collection> = emptySet() ) : Closeable { companion object { @@ -72,11 +71,11 @@ class CordaPersistence( private val defaultIsolationLevel = databaseConfig.transactionIsolationLevel val hibernateConfig: HibernateConfiguration by lazy { - transaction { HibernateConfiguration(schemas, databaseConfig, attributeConverters, jdbcUrl) } } + val entityManagerFactory get() = hibernateConfig.sessionFactoryForRegisteredSchemas data class Boundary(val txId: UUID, val success: Boolean) @@ -84,8 +83,11 @@ class CordaPersistence( private var _dataSource: DataSource? = null val dataSource: DataSource get() = checkNotNull(_dataSource) { "CordaPersistence not started" } - fun start(dataSource: DataSource) { + private lateinit var jdbcUrl: String + + fun start(dataSource: DataSource, jdbcUrl: String) { _dataSource = dataSource + this.jdbcUrl = jdbcUrl // Found a unit test that was forgetting to close the database transactions. When you close() on the top level // database transaction it will reset the threadLocalTx back to null, so if it isn't then there is still a // database transaction open. The [transaction] helper above handles this in a finally clause for you diff --git a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index 0bc7e1409d..1951e2a578 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -360,7 +360,7 @@ class ArtemisMessagingTest { } private fun startNodeMessagingClient(maxMessageSize: Int = MAX_MESSAGE_SIZE) { - messagingClient!!.start(identity.public, null, maxMessageSize) + messagingClient!!.start(identity.public, null, maxMessageSize, legalName = ALICE_NAME.toString()) } private fun createAndStartClientAndServer(platformVersion: Int = 1, serverMaxMessageSize: Int = MAX_MESSAGE_SIZE, @@ -395,6 +395,7 @@ class ArtemisMessagingTest { ServiceAffinityExecutor("ArtemisMessagingTests", 1), database, networkMapCache, + MetricRegistry(), isDrainingModeOn = { false }, drainingModeWasChangedEvents = PublishSubject.create()).apply { config.configureWithDevSSLCertificate() diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 9511ae72dc..03154d5afd 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -79,10 +79,7 @@ import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.NodeInfoAndSigned import net.corda.nodeapi.internal.SignedNodeInfo import net.corda.nodeapi.internal.crypto.X509Utilities -import net.corda.nodeapi.internal.persistence.CordaPersistence -import net.corda.nodeapi.internal.persistence.CouldNotCreateDataSourceException -import net.corda.nodeapi.internal.persistence.DatabaseConfig -import net.corda.nodeapi.internal.persistence.IncompatibleAttachmentsContractsTableName +import net.corda.nodeapi.internal.persistence.* import net.corda.nodeapi.internal.storeLegalIdentity import net.corda.tools.shell.InteractiveShell import org.apache.activemq.artemis.utils.ReusableLatch @@ -107,8 +104,6 @@ import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.TimeUnit.MINUTES import java.util.concurrent.TimeUnit.SECONDS import kotlin.collections.set @@ -180,7 +175,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, @Suppress("LeakingThis") val transactionStorage = makeTransactionStorage(configuration.transactionCacheSizeBytes).tokenize() val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL) } - private val metricRegistry = MetricRegistry() + val metricRegistry = MetricRegistry() val attachments = NodeAttachmentService(metricRegistry, database, configuration.attachmentContentCacheSizeBytes, configuration.attachmentCacheBound).tokenize() val cordappProvider = CordappProviderImpl(cordappLoader, CordappConfigFileProvider(), attachments).tokenize() @Suppress("LeakingThis") @@ -795,7 +790,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } val props = configuration.dataSourceProperties if (props.isEmpty) throw DatabaseConfigurationException("There must be a database configured.") - database.hikariStart(props) + database.hikariStart(props, configuration.database, schemaService) // Now log the vendor string as this will also cause a connection to be tested eagerly. logVendorString(database, log) } @@ -1061,7 +1056,7 @@ fun configureDatabase(hikariProperties: Properties, wellKnownPartyFromAnonymous: (AbstractParty) -> Party?, schemaService: SchemaService = NodeSchemaService()): CordaPersistence { val persistence = createCordaPersistence(databaseConfig, wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous, schemaService) - persistence.hikariStart(hikariProperties) + persistence.hikariStart(hikariProperties, databaseConfig, schemaService) return persistence } @@ -1074,18 +1069,22 @@ fun createCordaPersistence(databaseConfig: DatabaseConfig, // so we end up providing both descriptor and converter. We should re-examine this in later versions to see if // either Hibernate can be convinced to stop warning, use the descriptor by default, or something else. JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous)) + val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous)) + return CordaPersistence(databaseConfig, schemaService.schemaOptions.keys, attributeConverters) +} - val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous)) +fun CordaPersistence.hikariStart(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemaService: SchemaService) { + try { + val dataSource = DataSourceFactory.createDataSource(hikariProperties) val jdbcUrl = hikariProperties.getProperty("dataSource.url", "") - SchemaMigration( + val schemaMigration = SchemaMigration( schemaService.schemaOptions.keys, dataSource, !isH2Database(jdbcUrl), - databaseConfig).nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L })return CordaPersistence( databaseConfig, schemaService.schemaOptions.keys, jdbcUrl,attributeConverters)} - -fun CordaPersistence.hikariStart(hikariProperties: Properties) { - try { - start(DataSourceFactory.createDataSource(hikariProperties)) + databaseConfig + ) + schemaMigration.nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L }) + start(dataSource, jdbcUrl) } catch (ex: Exception) { when { ex is HikariPool.PoolInitializationException -> throw CouldNotCreateDataSourceException("Could not connect to the database. Please check your JDBC connection URL, or the connectivity to the database.", ex) diff --git a/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt b/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt index 7204f04b5a..e1b10ec301 100644 --- a/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/EnterpriseNode.kt @@ -27,7 +27,6 @@ import net.corda.node.services.config.RelayConfiguration import net.corda.node.services.statemachine.MultiThreadedStateMachineExecutor import net.corda.node.services.statemachine.MultiThreadedStateMachineManager import net.corda.node.services.statemachine.StateMachineManager -import net.corda.nodeapi.internal.persistence.CordaPersistence import org.fusesource.jansi.Ansi import org.fusesource.jansi.AnsiConsole import java.io.IOException @@ -184,7 +183,7 @@ D""".trimStart() return MultiThreadedStateMachineExecutor(configuration.enterpriseConfiguration.tuning.flowThreadPoolSize) } - override fun makeStateMachineManager(database: CordaPersistence): StateMachineManager { + override fun makeStateMachineManager(): StateMachineManager { if (configuration.enterpriseConfiguration.useMultiThreadedSMM) { val executor = makeStateMachineExecutorService() runOnStop += { executor.shutdown() } @@ -199,7 +198,7 @@ D""".trimStart() ) } else { log.info("Single-threaded state machine manager with 1 thread.") - return super.makeStateMachineManager(database) + return super.makeStateMachineManager() } } } diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index f1c35696b1..595c92d781 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -182,6 +182,7 @@ open class Node(configuration: NodeConfiguration, nodeExecutor = serverThread, database = database, networkMap = networkMapCache, + metricRegistry = metricRegistry, isDrainingModeOn = nodeProperties.flowsDrainingMode::isEnabled, drainingModeWasChangedEvents = nodeProperties.flowsDrainingMode.values ) @@ -214,10 +215,9 @@ open class Node(configuration: NodeConfiguration, startLocalRpcBroker(securityManager) } - val advertisedAddress = info.addresses.single() val externalBridge = configuration.enterpriseConfiguration.externalBridge val bridgeControlListener = if (externalBridge == null || !externalBridge) { - BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize) + BridgeControlListener(configuration, client.serverAddress, networkParameters.maxMessageSize) } else { null } @@ -243,7 +243,7 @@ open class Node(configuration: NodeConfiguration, start() } // Start P2P bridge service - bridgeControlListener.apply { + bridgeControlListener?.apply { closeOnStop() start() } @@ -256,8 +256,9 @@ open class Node(configuration: NodeConfiguration, client.start( myIdentity = nodeInfo.legalIdentities[0].owningKey, serviceIdentity = if (nodeInfo.legalIdentities.size == 1) null else nodeInfo.legalIdentities[1].owningKey, - advertisedAddress = nodeInfo.addresses[0], - maxMessageSize = networkParameters.maxMessageSize + advertisedAddress = nodeInfo.addresses.single(), + maxMessageSize = networkParameters.maxMessageSize, + legalName = nodeInfo.legalIdentities[0].name.toString() ) } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 907fecc6da..b175f7792d 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -93,7 +93,6 @@ class P2PMessagingClient(val config: NodeConfiguration, private val database: CordaPersistence, private val networkMap: NetworkMapCacheInternal, private val metricRegistry: MetricRegistry, - val legalName: String, private val isDrainingModeOn: () -> Boolean, private val drainingModeWasChangedEvents: Observable> ) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver { @@ -154,7 +153,7 @@ class P2PMessagingClient(val config: NodeConfiguration, * in the network map data. * @param maxMessageSize A bound applied to the message size. */ - fun start(myIdentity: PublicKey, serviceIdentity: PublicKey?, maxMessageSize: Int, advertisedAddress: NetworkHostAndPort = serverAddress) { + fun start(myIdentity: PublicKey, serviceIdentity: PublicKey?, maxMessageSize: Int, advertisedAddress: NetworkHostAndPort = serverAddress, legalName: String) { this.myIdentity = myIdentity this.serviceIdentity = serviceIdentity this.advertisedAddress = advertisedAddress diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index a3ab52e4e8..bdd6e2b5bc 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -118,7 +118,7 @@ class MultiThreadedStateMachineManager( private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub) private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null private val transitionExecutor = makeTransitionExecutor() - private val ourSenderUUID = serviceHub.networkService.ourSenderUUID + private val ourSenderUUID get() = serviceHub.networkService.ourSenderUUID private var checkpointSerializationContext: SerializationContext? = null private var tokenizableServices: List? = null diff --git a/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt index 0a4bba71ce..70e607aaf9 100644 --- a/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt +++ b/node/src/test/kotlin/net/corda/node/internal/NodeTest.kt @@ -70,10 +70,7 @@ class NodeTest { val configuration = createConfig(ALICE_NAME) val info = VersionInfo(789, "3.0", "SNAPSHOT", "R3") configureDatabase(configuration.dataSourceProperties, configuration.database, { null }, { null }).use { - val versionInfo = rigorousMock().also { - doReturn(platformVersion).whenever(it).platformVersion - } - val node = Node(configuration, versionInfo, initialiseSerialization = false) + val node = Node(configuration, info, initialiseSerialization = false) assertEquals(node.generateNodeInfo(), node.generateNodeInfo()) // Node info doesn't change (including the serial) } } @@ -179,7 +176,11 @@ class NodeTest { flowTimeout = FlowTimeoutConfiguration(timeout = Duration.ZERO, backoffBase = 1.0, maxRestartCount = 1), rpcSettings = NodeRpcSettings(address = fakeAddress, adminAddress = null, ssl = null), messagingServerAddress = null, - notary = null + notary = null, + enterpriseConfiguration = EnterpriseConfiguration( + mutualExclusionConfiguration = MutualExclusionConfiguration(updateInterval = 0, waitInterval = 0) + ), + relay = null ) } } diff --git a/perftestcordapp/src/test/kotlin/com/r3/corda/enterprise/perftestcordapp/flows/TwoPartyTradeFlowTest.kt b/perftestcordapp/src/test/kotlin/com/r3/corda/enterprise/perftestcordapp/flows/TwoPartyTradeFlowTest.kt index 5af88e64b0..2181cea55c 100644 --- a/perftestcordapp/src/test/kotlin/com/r3/corda/enterprise/perftestcordapp/flows/TwoPartyTradeFlowTest.kt +++ b/perftestcordapp/src/test/kotlin/com/r3/corda/enterprise/perftestcordapp/flows/TwoPartyTradeFlowTest.kt @@ -166,11 +166,11 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) { bobNode.dispose() aliceNode.database.transaction { - assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty() + assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty() } aliceNode.internals.manuallyCloseDB() bobNode.database.transaction { - assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty() + assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty() } bobNode.internals.manuallyCloseDB() } @@ -224,11 +224,11 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) { bobNode.dispose() aliceNode.database.transaction { - assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty() + assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty() } aliceNode.internals.manuallyCloseDB() bobNode.database.transaction { - assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty() + assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty() } bobNode.internals.manuallyCloseDB() } @@ -281,7 +281,7 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) { // OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature. bobNode.database.transaction { - assertThat(bobNode.checkpointStorage.checkpoints()).hasSize(1) + assertThat(bobNode.internals.checkpointStorage.checkpoints()).hasSize(1) } val storage = bobNode.services.validatedTransactions @@ -314,10 +314,10 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) { assertThat(bobNode.smm.findStateMachines(Buyer::class.java)).isEmpty() bobNode.database.transaction { - assertThat(bobNode.checkpointStorage.checkpoints()).isEmpty() + assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty() } aliceNode.database.transaction { - assertThat(aliceNode.checkpointStorage.checkpoints()).isEmpty() + assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty() } bobNode.database.transaction { @@ -340,8 +340,8 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) { return mockNet.createNode(InternalMockNodeParameters(legalName = name), nodeFactory = { args, _ -> object : InternalMockNetwork.MockNode(args) { // That constructs a recording tx storage - override fun makeTransactionStorage(database: CordaPersistence, transactionCacheSizeBytes: Long): WritableTransactionStorage { - return RecordingTransactionStorage(database, super.makeTransactionStorage(database, transactionCacheSizeBytes)) + override fun makeTransactionStorage(transactionCacheSizeBytes: Long): WritableTransactionStorage { + return RecordingTransactionStorage(database, super.makeTransactionStorage(transactionCacheSizeBytes)) } } })