diff --git a/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt b/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt index 4b1b49cb11..b262eefef5 100644 --- a/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt @@ -9,8 +9,8 @@ import net.corda.core.readLines import net.corda.node.LOGS_DIRECTORY_NAME import net.corda.node.services.api.RegulatorService import net.corda.node.services.transactions.SimpleNotaryService -import org.assertj.core.api.Assertions.assertThat import net.corda.nodeapi.ArtemisMessagingComponent +import org.assertj.core.api.Assertions.assertThat import org.junit.Test import java.nio.file.Paths import java.util.concurrent.Executors diff --git a/node/src/main/kotlin/net/corda/node/Corda.kt b/node/src/main/kotlin/net/corda/node/Corda.kt index 5b50c8dd24..73192c16d7 100644 --- a/node/src/main/kotlin/net/corda/node/Corda.kt +++ b/node/src/main/kotlin/net/corda/node/Corda.kt @@ -1,4 +1,5 @@ @file:JvmName("Corda") + package net.corda.node import com.jcabi.manifests.Manifests @@ -116,7 +117,7 @@ fun main(args: Array) { log.info("Machine: ${InetAddress.getLocalHost().hostName}") log.info("Working Directory: ${cmdlineOptions.baseDirectory}") val agentProperties = sun.misc.VMSupport.getAgentProperties() - if(agentProperties.containsKey("sun.jdwp.listenerAddress")) { + if (agentProperties.containsKey("sun.jdwp.listenerAddress")) { log.info("Debug port: ${agentProperties.getProperty("sun.jdwp.listenerAddress")}") } log.info("Starting as node on ${conf.p2pAddress}") @@ -172,7 +173,7 @@ private fun printPluginsAndServices(node: Node) { } val plugins = node.pluginRegistries .map { it.javaClass.name } - .filterNot { it.startsWith("net.corda.node.") || it.startsWith("net.corda.core.") || it.startsWith("net.corda.nodeapi.")} + .filterNot { it.startsWith("net.corda.node.") || it.startsWith("net.corda.core.") || it.startsWith("net.corda.nodeapi.") } .map { it.substringBefore('$') } if (plugins.isNotEmpty()) printBasicNodeInfo("Loaded plugins", plugins.joinToString()) @@ -217,7 +218,7 @@ private fun drawBanner(nodeVersionInfo: NodeVersionInfo) { val (msg1, msg2) = messageOfTheDay() println(Ansi.ansi().fgBrightRed().a( -""" + """ ______ __ / ____/ _________/ /___ _ / / __ / ___/ __ / __ `/ """).fgBrightBlue().a(msg1).newline().fgBrightRed().a( diff --git a/node/src/main/kotlin/net/corda/node/driver/Driver.kt b/node/src/main/kotlin/net/corda/node/driver/Driver.kt index f1cd67b32a..8256e55bbd 100644 --- a/node/src/main/kotlin/net/corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/net/corda/node/driver/Driver.kt @@ -1,4 +1,5 @@ @file:JvmName("Driver") + package net.corda.node.driver import com.google.common.net.HostAndPort @@ -287,6 +288,7 @@ class ShutdownManager(private val executorService: ExecutorService) { val registeredShutdowns = ArrayList Unit>>() var isShutdown = false } + private val state = ThreadBox(State()) fun shutdown() { @@ -302,7 +304,7 @@ class ShutdownManager(private val executorService: ExecutorService) { /** Could not get all of them, collect what we have */ shutdownFutures.filter { it.isDone }.map { it.get() } } - shutdowns.reversed().forEach{ it() } + shutdowns.reversed().forEach { it() } } fun registerShutdown(shutdown: ListenableFuture<() -> Unit>) { @@ -432,9 +434,9 @@ class DriverDSL( ) + customOverrides val config = ConfigHelper.loadConfig( - baseDirectory = baseDirectory, - allowMissingConfig = true, - configOverrides = configOverrides) + baseDirectory = baseDirectory, + allowMissingConfig = true, + configOverrides = configOverrides) val configuration = config.parseAs() val processFuture = startNode(executorService, configuration, config, quasarJarPath, debugPort, systemProperties) diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 69cbaec270..27a8243b10 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -199,10 +199,12 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, log.warn("Corda node is running in dev mode.") configuration.configureWithDevSSLCertificate() } - require(hasSSLCertificates()) { "Identity certificate not found. " + - "Please either copy your existing identity key and certificate from another node, " + - "or if you don't have one yet, fill out the config file and run corda.jar --initial-registration. " + - "Read more at: https://docs.corda.net/permissioning.html" } + require(hasSSLCertificates()) { + "Identity certificate not found. " + + "Please either copy your existing identity key and certificate from another node, " + + "or if you don't have one yet, fill out the config file and run corda.jar --initial-registration. " + + "Read more at: https://docs.corda.net/permissioning.html" + } log.info("Node starting up ...") @@ -487,7 +489,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, protected open fun makeSchemaService(): SchemaService = NodeSchemaService() - protected abstract fun makeTransactionVerifierService() : TransactionVerifierService + protected abstract fun makeTransactionVerifierService(): TransactionVerifierService open fun stop() { // TODO: We need a good way of handling "nice to have" shutdown events, especially those that deal with the diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 7064f5c8f2..5b13c0723e 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -111,7 +111,7 @@ class CordaRPCOpsImpl( override fun attachmentExists(id: SecureHash): Boolean { // TODO: this operation should not require an explicit transaction - return databaseTransaction(database){ + return databaseTransaction(database) { services.storageService.attachments.openAttachment(id) != null } } @@ -125,10 +125,11 @@ class CordaRPCOpsImpl( override fun uploadAttachment(jar: InputStream): SecureHash { // TODO: this operation should not require an explicit transaction - return databaseTransaction(database){ + return databaseTransaction(database) { services.storageService.attachments.importAttachment(jar) } } + override fun authoriseContractUpgrade(state: StateAndRef<*>, upgradedContractClass: Class>) = services.vaultService.authoriseContractUpgrade(state, upgradedContractClass) override fun deauthoriseContractUpgrade(state: StateAndRef<*>) = services.vaultService.deauthoriseContractUpgrade(state) override fun currentNodeTime(): Instant = Instant.now(services.clock) diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 1b28b8b46b..b8dfd9e5e2 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -23,10 +23,9 @@ import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.NodeMessagingClient import net.corda.node.services.transactions.PersistentUniquenessProvider +import net.corda.node.services.transactions.RaftNonValidatingNotaryService import net.corda.node.services.transactions.RaftUniquenessProvider import net.corda.node.services.transactions.RaftValidatingNotaryService -import net.corda.node.services.transactions.RaftNonValidatingNotaryService -import net.corda.node.services.transactions.* import net.corda.node.utilities.AddressUtils import net.corda.node.utilities.AffinityExecutor import net.corda.nodeapi.ArtemisMessagingComponent.NetworkMapAddress diff --git a/node/src/main/kotlin/net/corda/node/services/api/AcceptsFileUpload.kt b/node/src/main/kotlin/net/corda/node/services/api/AcceptsFileUpload.kt index 933e204dc1..24d787f27c 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/AcceptsFileUpload.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/AcceptsFileUpload.kt @@ -7,7 +7,7 @@ import net.corda.core.node.services.FileUploader * * TODO: In future, also accept uploads over the MQ interface too. */ -interface AcceptsFileUpload: FileUploader { +interface AcceptsFileUpload : FileUploader { /** A string that prefixes the URLs, e.g. "attachments" or "interest-rates". Should be OK for URLs. */ val dataTypePrefix: String diff --git a/node/src/main/kotlin/net/corda/node/services/database/KotlinConfigurationTransactionWrapper.kt b/node/src/main/kotlin/net/corda/node/services/database/KotlinConfigurationTransactionWrapper.kt index 20e99c58dd..a122022a62 100644 --- a/node/src/main/kotlin/net/corda/node/services/database/KotlinConfigurationTransactionWrapper.kt +++ b/node/src/main/kotlin/net/corda/node/services/database/KotlinConfigurationTransactionWrapper.kt @@ -9,7 +9,10 @@ import io.requery.sql.* import io.requery.sql.platform.H2 import io.requery.util.function.Function import io.requery.util.function.Supplier -import net.corda.core.schemas.requery.converters.* +import net.corda.core.schemas.requery.converters.InstantConverter +import net.corda.core.schemas.requery.converters.SecureHashConverter +import net.corda.core.schemas.requery.converters.StateRefConverter +import net.corda.core.schemas.requery.converters.VaultStateStatusConverter import org.jetbrains.exposed.sql.transactions.TransactionManager import java.sql.Connection import java.util.* @@ -128,8 +131,7 @@ class KotlinConfigurationTransactionWrapper(private val model: EntityModel, override fun getConnection(): Connection { val tx = TransactionManager.manager.currentOrNull() return CordaConnection( - tx?.connection ?: - TransactionManager.manager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ).connection + tx?.connection ?: TransactionManager.manager.newTransaction(Connection.TRANSACTION_REPEATABLE_READ).connection ) } } diff --git a/node/src/main/kotlin/net/corda/node/services/database/RequeryConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/database/RequeryConfiguration.kt index 409b309ede..908661d837 100644 --- a/node/src/main/kotlin/net/corda/node/services/database/RequeryConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/database/RequeryConfiguration.kt @@ -4,7 +4,9 @@ import com.zaxxer.hikari.HikariConfig import com.zaxxer.hikari.HikariDataSource import io.requery.Persistable import io.requery.meta.EntityModel -import io.requery.sql.* +import io.requery.sql.KotlinEntityDataStore +import io.requery.sql.SchemaModifier +import io.requery.sql.TableCreationMode import net.corda.core.utilities.loggerFor import org.jetbrains.exposed.sql.transactions.TransactionManager import java.sql.Connection diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index d0076ec40a..ba08978632 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -104,6 +104,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, var rpcNotificationConsumer: ClientConsumer? = null var verificationResponseConsumer: ClientConsumer? = null } + val verifierService = when (config.verifierType) { VerifierType.InMemory -> InMemoryTransactionVerifierService(numberOfWorkers = 4) VerifierType.OutOfProcess -> createOutOfProcessVerifierService() @@ -421,8 +422,10 @@ class NodeMessagingClient(override val config: NodeConfiguration, putLongProperty(HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + amqDelay) } } - log.trace { "Send to: $mqAddress topic: ${message.topicSession.topic} " + - "sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}" } + log.trace { + "Send to: $mqAddress topic: ${message.topicSession.topic} " + + "sessionID: ${message.topicSession.sessionID} uuid: ${message.uniqueMessageId}" + } producer!!.send(mqAddress, artemisMessage) } } @@ -499,19 +502,19 @@ class NodeMessagingClient(override val config: NodeConfiguration, private fun createOutOfProcessVerifierService(): TransactionVerifierService { return object : OutOfProcessTransactionVerifierService(monitoringService) { - override fun sendRequest(nonce: Long, transaction: LedgerTransaction) { - messagingExecutor.fetchFrom { - state.locked { - val message = session!!.createMessage(false) - val request = VerifierApi.VerificationRequest(nonce, transaction, SimpleString(verifierResponseAddress)) - request.writeToClientMessage(message) - producer!!.send(VERIFICATION_REQUESTS_QUEUE_NAME, message) - } + override fun sendRequest(nonce: Long, transaction: LedgerTransaction) { + messagingExecutor.fetchFrom { + state.locked { + val message = session!!.createMessage(false) + val request = VerifierApi.VerificationRequest(nonce, transaction, SimpleString(verifierResponseAddress)) + request.writeToClientMessage(message) + producer!!.send(VERIFICATION_REQUESTS_QUEUE_NAME, message) } } - } + } + } override fun getAddressOfParty(partyInfo: PartyInfo): MessageRecipients { diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt index b2199873d1..ed0eae56c0 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCDispatcher.kt @@ -70,7 +70,9 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService, v // representing what happened, which is useful for us to send over the wire. val subscription = obj.materialize().subscribe { materialised: Notification -> val newKryo = createRPCKryoForSerialization(qName, dispatcher) - val bits = try { MarshalledObservation(handle, materialised).serialize(newKryo) } finally { + val bits = try { + MarshalledObservation(handle, materialised).serialize(newKryo) + } finally { releaseRPCKryoForSerialization(newKryo) } rpcLog.debug("RPC sending observation: $materialised") @@ -91,7 +93,9 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService, v throw RPCException("Received RPC without any destination for observations, but the RPC returns observables") val kryo = createRPCKryoForSerialization(observationsTo, this) - val args = try { argsBytes.deserialize(kryo) } finally { + val args = try { + argsBytes.deserialize(kryo) + } finally { releaseRPCKryoForSerialization(kryo) } @@ -173,6 +177,7 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService, v // TODO remove this User once webserver doesn't need it private val nodeUser = User(NODE_USER, NODE_USER, setOf()) + @VisibleForTesting protected open fun getUser(message: ClientMessage): User { val validatedUser = message.requiredString(Message.HDR_VALIDATED_USER.toString()) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSession.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSession.kt index 11dcd6f493..b0ee2906d3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSession.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSession.kt @@ -10,8 +10,7 @@ class FlowSession( val flow: FlowLogic<*>, val ourSessionId: Long, val initiatingParty: Party?, - var state: FlowSessionState) -{ + var state: FlowSessionState) { val receivedMessages = ConcurrentLinkedQueue>() val fiber: FlowStateMachineImpl<*> get() = flow.stateMachine as FlowStateMachineImpl<*> diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt index 2f6e44eb94..5b74b0330d 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt @@ -26,6 +26,7 @@ interface SessionInitResponse : ExistingSessionMessage { val initiatorSessionId: Long override val recipientSessionId: Long get() = initiatorSessionId } + data class SessionConfirm(override val initiatorSessionId: Long, val initiatedSessionId: Long) : SessionInitResponse data class SessionReject(override val initiatorSessionId: Long, val errorMessage: String) : SessionInitResponse diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 3693a58b89..fc2b8383f8 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -5,7 +5,6 @@ import co.paralleluniverse.fibers.FiberExecutorScheduler import co.paralleluniverse.io.serialization.kryo.KryoSerializer import co.paralleluniverse.strands.Strand import com.codahale.metrics.Gauge -import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.pool.KryoPool import com.google.common.collect.HashMultimap import com.google.common.util.concurrent.ListenableFuture @@ -79,6 +78,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, companion object { private val logger = loggerFor() internal val sessionTopic = TopicSession("platform.session") + init { Fiber.setDefaultUncaughtExceptionHandler { fiber, throwable -> (fiber as FlowStateMachineImpl<*>).logger.error("Caught exception from flow", throwable) @@ -101,12 +101,13 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, var started = false val stateMachines = LinkedHashMap, Checkpoint>() val changesPublisher = PublishSubject.create()!! - val fibersWaitingForLedgerCommit = HashMultimap.create>()!! + val fibersWaitingForLedgerCommit = HashMultimap.create>()!! fun notifyChangeObservers(fiber: FlowStateMachineImpl<*>, addOrRemove: AddOrRemove) { changesPublisher.bufferUntilDatabaseCommit().onNext(Change(fiber.logic, addOrRemove, fiber.id)) } } + private val mutex = ThreadBox(InnerState()) // True if we're shutting down, so don't resume anything. diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt b/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt index 5711458fb6..e0d2999526 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/OutOfProcessTransactionVerifierService.kt @@ -9,7 +9,6 @@ import net.corda.core.node.services.TransactionVerifierService import net.corda.core.random63BitValue import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.LedgerTransaction -import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor import net.corda.node.services.api.MonitoringService import net.corda.nodeapi.VerifierApi @@ -22,15 +21,18 @@ abstract class OutOfProcessTransactionVerifierService( companion object { val log = loggerFor() } + private data class VerificationHandle( val transactionId: SecureHash, val resultFuture: SettableFuture, val durationTimerContext: Timer.Context ) + private val verificationHandles = ConcurrentHashMap() // Metrics private fun metric(name: String) = "OutOfProcessTransactionVerifierService.$name" + private val durationTimer = monitoringService.metrics.timer(metric("Verification.Duration")) private val successMeter = monitoringService.metrics.meter(metric("Verification.Success")) private val failureMeter = monitoringService.metrics.meter(metric("Verification.Failure")) diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index ae36165e32..e598519696 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -13,7 +13,10 @@ import net.corda.contracts.asset.Cash import net.corda.core.ThreadBox import net.corda.core.bufferUntilSubscribed import net.corda.core.contracts.* -import net.corda.core.crypto.* +import net.corda.core.crypto.AbstractParty +import net.corda.core.crypto.CompositeKey +import net.corda.core.crypto.Party +import net.corda.core.crypto.SecureHash import net.corda.core.node.ServiceHub import net.corda.core.node.services.StatesNotAvailableException import net.corda.core.node.services.Vault @@ -55,7 +58,7 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P val log = loggerFor() // Define composite primary key used in Requery Expression - val stateRefCompositeColumn : RowExpression = RowExpression.of(listOf(VaultStatesEntity.TX_ID, VaultStatesEntity.INDEX)) + val stateRefCompositeColumn: RowExpression = RowExpression.of(listOf(VaultStatesEntity.TX_ID, VaultStatesEntity.INDEX)) } val configuration = RequeryConfiguration(dataSourceProperties) @@ -136,7 +139,7 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P } upsert(state ?: cashBalanceEntity) val total = state?.amount ?: cashBalanceEntity.amount - log.trace{"Updating Cash balance for $currency by ${cashBalanceEntity.amount} pennies (total: $total)"} + log.trace { "Updating Cash balance for $currency by ${cashBalanceEntity.amount} pennies (total: $total)" } } } } @@ -171,24 +174,24 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P } } - override fun states(clazzes: Set>, statuses: EnumSet, includeSoftLockedStates: Boolean): Iterable> { + override fun states(clazzes: Set>, statuses: EnumSet, includeSoftLockedStates: Boolean): Iterable> { val stateAndRefs = - session.withTransaction(TransactionIsolation.REPEATABLE_READ) { - var query = select(VaultSchema.VaultStates::class) - .where(VaultSchema.VaultStates::stateStatus `in` statuses) - // TODO: temporary fix to continue supporting track() function (until becomes Typed) - if (!clazzes.map {it.name}.contains(ContractState::class.java.name)) - query.and (VaultSchema.VaultStates::contractStateClassName `in` (clazzes.map { it.name })) - if (!includeSoftLockedStates) - query.and(VaultSchema.VaultStates::lockId.isNull()) - val iterator = query.get().iterator() - Sequence{iterator} - .map { it -> - val stateRef = StateRef(SecureHash.parse(it.txId), it.index) - val state = it.contractState.deserialize>(storageKryo()) - StateAndRef(state, stateRef) - } - } + session.withTransaction(TransactionIsolation.REPEATABLE_READ) { + var query = select(VaultSchema.VaultStates::class) + .where(VaultSchema.VaultStates::stateStatus `in` statuses) + // TODO: temporary fix to continue supporting track() function (until becomes Typed) + if (!clazzes.map { it.name }.contains(ContractState::class.java.name)) + query.and(VaultSchema.VaultStates::contractStateClassName `in` (clazzes.map { it.name })) + if (!includeSoftLockedStates) + query.and(VaultSchema.VaultStates::lockId.isNull()) + val iterator = query.get().iterator() + Sequence { iterator } + .map { it -> + val stateRef = StateRef(SecureHash.parse(it.txId), it.index) + val state = it.contractState.deserialize>(storageKryo()) + StateAndRef(state, stateRef) + } + } return stateAndRefs.asIterable() } @@ -286,14 +289,13 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P val update = update(VaultStatesEntity::class) .set(VaultStatesEntity.LOCK_ID, null) .set(VaultStatesEntity.LOCK_UPDATE_TIME, services.clock.instant()) - .where (VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED) - .and (VaultStatesEntity.LOCK_ID eq lockId.toString()).get() + .where(VaultStatesEntity.STATE_STATUS eq Vault.StateStatus.UNCONSUMED) + .and(VaultStatesEntity.LOCK_ID eq lockId.toString()).get() if (update.value() > 0) { log.trace("Releasing ${update.value()} soft locked states for $lockId") } } - } - else if (stateRefs.isNotEmpty()) { + } else if (stateRefs.isNotEmpty()) { try { session.withTransaction(TransactionIsolation.REPEATABLE_READ) { val updatedRows = update(VaultStatesEntity::class) @@ -412,21 +414,21 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P override fun softLockedStates(lockId: UUID?): List> { val stateAndRefs = - session.withTransaction(TransactionIsolation.REPEATABLE_READ) { - var query = select(VaultSchema.VaultStates::class) - .where(VaultSchema.VaultStates::stateStatus eq Vault.StateStatus.UNCONSUMED) - .and(VaultSchema.VaultStates::contractStateClassName eq Cash.State::class.java.name) - if (lockId != null) - query.and(VaultSchema.VaultStates::lockId eq lockId) - else - query.and(VaultSchema.VaultStates::lockId.notNull()) - query.get() - .map { it -> - val stateRef = StateRef(SecureHash.parse(it.txId), it.index) - val state = it.contractState.deserialize>(storageKryo()) - StateAndRef(state, stateRef) - }.toList() - } + session.withTransaction(TransactionIsolation.REPEATABLE_READ) { + var query = select(VaultSchema.VaultStates::class) + .where(VaultSchema.VaultStates::stateStatus eq Vault.StateStatus.UNCONSUMED) + .and(VaultSchema.VaultStates::contractStateClassName eq Cash.State::class.java.name) + if (lockId != null) + query.and(VaultSchema.VaultStates::lockId eq lockId) + else + query.and(VaultSchema.VaultStates::lockId.notNull()) + query.get() + .map { it -> + val stateRef = StateRef(SecureHash.parse(it.txId), it.index) + val state = it.contractState.deserialize>(storageKryo()) + StateAndRef(state, stateRef) + }.toList() + } return stateAndRefs } @@ -561,8 +563,8 @@ class NodeVaultService(private val services: ServiceHub, dataSourceProperties: P if (tx.inputs.isNotEmpty()) { session.withTransaction(TransactionIsolation.REPEATABLE_READ) { val result = select(VaultStatesEntity::class). - where (stateRefCompositeColumn.`in`(stateRefArgs(tx.inputs))). - and (VaultSchema.VaultStates::stateStatus eq Vault.StateStatus.UNCONSUMED) + where(stateRefCompositeColumn.`in`(stateRefArgs(tx.inputs))). + and(VaultSchema.VaultStates::stateStatus eq Vault.StateStatus.UNCONSUMED) result.get().forEach { val txHash = SecureHash.parse(it.txId) val index = it.index diff --git a/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt b/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt index a034c042ca..f8b14034d3 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/VaultSoftLockManager.kt @@ -1,4 +1,5 @@ package net.corda.node.services.vault + import net.corda.core.contracts.StateRef import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId @@ -19,7 +20,7 @@ class VaultSoftLockManager(val vault: VaultService, smm: StateMachineManager) { init { smm.changes.subscribe { change -> if (change.addOrRemove == AddOrRemove.REMOVE && trackingFlowIds.contains(change.id.uuid)) { - log.trace( "${change.addOrRemove} Flow name ${change.logic.javaClass} with id ${change.id}") + log.trace("${change.addOrRemove} Flow name ${change.logic.javaClass} with id ${change.id}") unregisterSoftLocks(change.id, change.logic) } trackingFlowIds.remove(change.id.uuid) diff --git a/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt b/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt index 0d5ba454cd..af8b96bab2 100644 --- a/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt +++ b/node/src/main/kotlin/net/corda/node/shell/InteractiveShell.kt @@ -217,7 +217,7 @@ object InteractiveShell { return } else if (matches.size > 1) { output.println("Ambigous name provided, please be more specific. Your options are:") - matches.forEachIndexed { i, s -> output.println("${i+1}. $s", Color.yellow) } + matches.forEachIndexed { i, s -> output.println("${i + 1}. $s", Color.yellow) } return } val match = matches.single() diff --git a/node/src/main/kotlin/net/corda/node/utilities/AffinityExecutor.kt b/node/src/main/kotlin/net/corda/node/utilities/AffinityExecutor.kt index 818ec902c3..273a861d56 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/AffinityExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/AffinityExecutor.kt @@ -1,11 +1,13 @@ package net.corda.node.utilities -import com.google.common.util.concurrent.ListeningScheduledExecutorService import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.Uninterruptibles import net.corda.core.utilities.loggerFor import java.util.* -import java.util.concurrent.* +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executor +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.ScheduledThreadPoolExecutor import java.util.function.Supplier /** diff --git a/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt b/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt index 0fc0f38f9a..5873b52fe6 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/DatabaseSupport.kt @@ -261,6 +261,7 @@ fun rx.Observable.wrapWithDatabaseTransaction(db: Database? = null) // Composite columns for use with below Exposed helpers. data class PartyColumns(val name: Column, val owningKey: Column) + data class StateRefColumns(val txId: Column, val index: Column) data class TxnNoteColumns(val txId: Column, val note: Column) diff --git a/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt b/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt index 370ba5ff0e..11ce7f8639 100644 --- a/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt +++ b/node/src/test/kotlin/net/corda/node/InteractiveShellTest.kt @@ -28,6 +28,7 @@ class InteractiveShellTest { constructor(amount: Amount) : this(amount.toString()) constructor(pair: Pair, SecureHash.SHA256>) : this(pair.toString()) constructor(party: Party) : this(party.name) + override fun call() = a } diff --git a/node/src/test/kotlin/net/corda/node/messaging/AttachmentTests.kt b/node/src/test/kotlin/net/corda/node/messaging/AttachmentTests.kt index 67d078c3ca..1975010503 100644 --- a/node/src/test/kotlin/net/corda/node/messaging/AttachmentTests.kt +++ b/node/src/test/kotlin/net/corda/node/messaging/AttachmentTests.kt @@ -14,8 +14,8 @@ import net.corda.node.services.network.NetworkMapService import net.corda.node.services.persistence.NodeAttachmentService import net.corda.node.services.persistence.schemas.AttachmentEntity import net.corda.node.services.transactions.SimpleNotaryService -import net.corda.testing.node.MockNetwork import net.corda.node.utilities.databaseTransaction +import net.corda.testing.node.MockNetwork import net.corda.testing.node.makeTestDataSourceProperties import org.jetbrains.exposed.sql.Database import org.junit.Before diff --git a/node/src/test/kotlin/net/corda/node/services/AbstractNetworkMapServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/AbstractNetworkMapServiceTest.kt index 070cbeba8b..2fb39b0b6a 100644 --- a/node/src/test/kotlin/net/corda/node/services/AbstractNetworkMapServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/AbstractNetworkMapServiceTest.kt @@ -256,6 +256,7 @@ abstract class AbstractNetworkMapServiceTest data class Added(val node: NodeInfo) : Changed() { constructor(node: MockNode) : this(node.info) } + data class Removed(val node: NodeInfo) : Changed() { constructor(node: MockNode) : this(node.info) } diff --git a/node/src/test/kotlin/net/corda/node/services/InMemoryIdentityServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/InMemoryIdentityServiceTests.kt index 7ed80dc192..6417754abd 100644 --- a/node/src/test/kotlin/net/corda/node/services/InMemoryIdentityServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/InMemoryIdentityServiceTests.kt @@ -4,7 +4,10 @@ import net.corda.core.crypto.Party import net.corda.core.crypto.composite import net.corda.core.crypto.generateKeyPair import net.corda.node.services.identity.InMemoryIdentityService -import net.corda.testing.* +import net.corda.testing.ALICE +import net.corda.testing.ALICE_PUBKEY +import net.corda.testing.BOB +import net.corda.testing.BOB_PUBKEY import org.junit.Test import kotlin.test.assertEquals import kotlin.test.assertNull diff --git a/node/src/test/kotlin/net/corda/node/services/NotaryChangeTests.kt b/node/src/test/kotlin/net/corda/node/services/NotaryChangeTests.kt index 178758757f..a8b78d2419 100644 --- a/node/src/test/kotlin/net/corda/node/services/NotaryChangeTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/NotaryChangeTests.kt @@ -8,7 +8,6 @@ import net.corda.core.node.services.ServiceInfo import net.corda.core.seconds import net.corda.core.transactions.WireTransaction import net.corda.core.utilities.DUMMY_NOTARY -import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.flows.NotaryChangeFlow.Instigator import net.corda.flows.StateReplacementException import net.corda.node.internal.AbstractNode diff --git a/node/src/test/kotlin/net/corda/node/services/NotaryServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/NotaryServiceTests.kt index c6c9110860..f55b9ecc3a 100644 --- a/node/src/test/kotlin/net/corda/node/services/NotaryServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/NotaryServiceTests.kt @@ -11,14 +11,12 @@ import net.corda.core.node.services.ServiceInfo import net.corda.core.seconds import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.DUMMY_NOTARY -import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.flows.NotaryError import net.corda.flows.NotaryException import net.corda.flows.NotaryFlow import net.corda.node.internal.AbstractNode import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.SimpleNotaryService -import net.corda.testing.MINI_CORP_KEY import net.corda.testing.node.MockNetwork import org.assertj.core.api.Assertions.assertThat import org.junit.Before diff --git a/node/src/test/kotlin/net/corda/node/services/ValidatingNotaryServiceTests.kt b/node/src/test/kotlin/net/corda/node/services/ValidatingNotaryServiceTests.kt index 3e7a311dda..14989ae104 100644 --- a/node/src/test/kotlin/net/corda/node/services/ValidatingNotaryServiceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/ValidatingNotaryServiceTests.kt @@ -8,7 +8,6 @@ import net.corda.core.getOrThrow import net.corda.core.node.services.ServiceInfo import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.DUMMY_NOTARY -import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.flows.NotaryError import net.corda.flows.NotaryException import net.corda.flows.NotaryFlow @@ -16,7 +15,6 @@ import net.corda.node.internal.AbstractNode import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.ValidatingNotaryService import net.corda.testing.MEGA_CORP_KEY -import net.corda.testing.MINI_CORP_KEY import net.corda.testing.node.MockNetwork import org.assertj.core.api.Assertions.assertThat import org.junit.Before diff --git a/node/src/test/kotlin/net/corda/node/services/VaultWithCashTest.kt b/node/src/test/kotlin/net/corda/node/services/VaultWithCashTest.kt index 735ade1011..9c028eff0d 100644 --- a/node/src/test/kotlin/net/corda/node/services/VaultWithCashTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/VaultWithCashTest.kt @@ -14,9 +14,6 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.core.utilities.LogHelper -import net.corda.node.services.schema.HibernateObserver -import net.corda.node.services.schema.NodeSchemaService -import net.corda.node.services.vault.NodeVaultService import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.databaseTransaction import net.corda.testing.BOB_KEY @@ -138,9 +135,9 @@ class VaultWithCashTest { databaseTransaction(database) { // A tx that sends us money. services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 10, 10, Random(0L), - issuedBy = MEGA_CORP.ref(1), - issuerKey = MEGA_CORP_KEY, - ownedBy = freshKey.public.composite) + issuedBy = MEGA_CORP.ref(1), + issuerKey = MEGA_CORP_KEY, + ownedBy = freshKey.public.composite) println("Cash balance: ${vault.cashBalances[USD]}") assertThat(vault.unconsumedStates()).hasSize(10) @@ -173,8 +170,7 @@ class VaultWithCashTest { LOCKED: ${vault.softLockedStates().count()} : ${vault.softLockedStates()} """) txn1 - } - catch(e: Exception) { + } catch(e: Exception) { println(e) } } @@ -206,8 +202,7 @@ class VaultWithCashTest { LOCKED: ${vault.softLockedStates().count()} : ${vault.softLockedStates()} """) txn2 - } - catch(e: Exception) { + } catch(e: Exception) { println(e) } } @@ -219,7 +214,7 @@ class VaultWithCashTest { countDown.await() databaseTransaction(database) { println("Cash balance: ${vault.cashBalances[USD]}") - assertThat(vault.cashBalances[USD]).isIn(DOLLARS(20),DOLLARS(40)) + assertThat(vault.cashBalances[USD]).isIn(DOLLARS(20), DOLLARS(40)) } } @@ -287,7 +282,7 @@ class VaultWithCashTest { val cash = vault.unconsumedStates() cash.forEach { println(it.state.data.amount) } - services.fillWithSomeTestDeals(listOf("123","456","789")) + services.fillWithSomeTestDeals(listOf("123", "456", "789")) val deals = vault.unconsumedStates() deals.forEach { println(it.state.data.ref) } } @@ -315,7 +310,7 @@ class VaultWithCashTest { val freshKey = services.keyManagementService.freshKey() databaseTransaction(database) { - services.fillWithSomeTestDeals(listOf("123","456","789")) + services.fillWithSomeTestDeals(listOf("123", "456", "789")) val deals = vault.unconsumedStates().toList() deals.forEach { println(it.state.data.ref) } diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt index 99994f4ed2..3a77aa0626 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTests.kt @@ -15,7 +15,6 @@ import net.corda.core.utilities.LogHelper import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserServiceImpl import net.corda.node.services.api.MonitoringService -import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.network.InMemoryNetworkMapCache diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt index 0909c6ac83..2f8a67987a 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt @@ -324,7 +324,7 @@ class StateMachineManagerTests { node1.info.legalIdentity, notary1.info.notaryIdentity)) // We pay a couple of times, the notary picking should go round robin - for (i in 1 .. 3) { + for (i in 1..3) { node1.services.startFlow(CashPaymentFlow(500.DOLLARS, node2.info.legalIdentity)) net.runNetwork() } diff --git a/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt index d1dd8ed608..27075838a8 100644 --- a/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/vault/NodeVaultServiceTest.kt @@ -303,7 +303,7 @@ class NodeVaultServiceTest { val spendableStatesUSD = (services.vaultService as NodeVaultService).unconsumedStatesForSpending(100.DOLLARS, lockId = UUID.randomUUID()) spendableStatesUSD.forEach(::println) assertThat(spendableStatesUSD).hasSize(1) - assertThat(spendableStatesUSD[0].state.data.amount.quantity).isEqualTo(100L*100) + assertThat(spendableStatesUSD[0].state.data.amount.quantity).isEqualTo(100L * 100) assertThat(services.vaultService.softLockedStates()).hasSize(1) } } @@ -316,7 +316,7 @@ class NodeVaultServiceTest { services.fillWithSomeTestCash(100.DOLLARS, DUMMY_NOTARY, 1, 1, Random(0L), issuedBy = (BOC.ref(1)), issuerKey = BOC_KEY) val spendableStatesUSD = services.vaultService.unconsumedStatesForSpending(200.DOLLARS, lockId = UUID.randomUUID(), - onlyFromIssuerParties = setOf(DUMMY_CASH_ISSUER.party, BOC)).toList() + onlyFromIssuerParties = setOf(DUMMY_CASH_ISSUER.party, BOC)).toList() spendableStatesUSD.forEach(::println) assertThat(spendableStatesUSD).hasSize(2) assertThat(spendableStatesUSD[0].state.data.amount.token.issuer).isEqualTo(DUMMY_CASH_ISSUER) @@ -373,7 +373,7 @@ class NodeVaultServiceTest { val spendableStatesUSD = (services.vaultService as NodeVaultService).unconsumedStatesForSpending(1.DOLLARS, lockId = UUID.randomUUID()) spendableStatesUSD.forEach(::println) assertThat(spendableStatesUSD).hasSize(1) - assertThat(spendableStatesUSD[0].state.data.amount.quantity).isGreaterThanOrEqualTo(1L*100) + assertThat(spendableStatesUSD[0].state.data.amount.quantity).isGreaterThanOrEqualTo(1L * 100) assertThat(services.vaultService.softLockedStates()).hasSize(1) } } diff --git a/node/src/test/kotlin/net/corda/node/utilities/AffinityExecutorTests.kt b/node/src/test/kotlin/net/corda/node/utilities/AffinityExecutorTests.kt index ef63bc7cd5..1c6bce0a77 100644 --- a/node/src/test/kotlin/net/corda/node/utilities/AffinityExecutorTests.kt +++ b/node/src/test/kotlin/net/corda/node/utilities/AffinityExecutorTests.kt @@ -5,7 +5,6 @@ import org.junit.Test import java.util.* import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference -import kotlin.concurrent.thread import kotlin.test.assertEquals import kotlin.test.assertFails import kotlin.test.assertNotEquals