diff --git a/.gitignore b/.gitignore
index 8fc24e7bad..5a11950686 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,6 +32,7 @@ lib/dokka.jar
.idea/libraries
.idea/shelf
.idea/dataSources
+.idea/markdown-navigator
/gradle-plugins/.idea/
# Include the -parameters compiler option by default in IntelliJ required for serialization.
@@ -66,7 +67,7 @@ lib/dokka.jar
## Plugin-specific files:
# IntelliJ
-/out/
+**/out/
/classes/
# mpeltonen/sbt-idea plugin
diff --git a/.idea/compiler.xml b/.idea/compiler.xml
index 175866f66d..75643b8eeb 100644
--- a/.idea/compiler.xml
+++ b/.idea/compiler.xml
@@ -40,6 +40,8 @@
+
+
diff --git a/build.gradle b/build.gradle
index 51ab3d5f0e..e3a664f5de 100644
--- a/build.gradle
+++ b/build.gradle
@@ -4,7 +4,7 @@ buildscript {
file("$projectDir/constants.properties").withInputStream { constants.load(it) }
// Our version: bump this on release.
- ext.corda_release_version = "0.14-SNAPSHOT"
+ ext.corda_release_version = "0.15-SNAPSHOT"
// Increment this on any release that changes public APIs anywhere in the Corda platform
// TODO This is going to be difficult until we have a clear separation throughout the code of what is public and what is internal
ext.corda_platform_version = 1
@@ -31,7 +31,6 @@ buildscript {
ext.log4j_version = '2.7'
ext.bouncycastle_version = constants.getProperty("bouncycastleVersion")
ext.guava_version = constants.getProperty("guavaVersion")
- ext.quickcheck_version = '0.7'
ext.okhttp_version = '3.5.0'
ext.netty_version = '4.1.9.Final'
ext.typesafe_config_version = constants.getProperty("typesafeConfigVersion")
@@ -168,22 +167,23 @@ repositories {
}
}
+// TODO: Corda root project currently produces a dummy cordapp when it shouldn't.
// Required for building out the fat JAR.
dependencies {
- compile project(':node')
+ cordaCompile project(':node')
compile "com.google.guava:guava:$guava_version"
- // Set to compile to ensure it exists now deploy nodes no longer relies on build
- compile project(path: ":node:capsule", configuration: 'runtimeArtifacts')
- compile project(path: ":webserver:webcapsule", configuration: 'runtimeArtifacts')
+ // Set to corda compile to ensure it exists now deploy nodes no longer relies on build
+ cordaCompile project(path: ":node:capsule", configuration: 'runtimeArtifacts')
+ cordaCompile project(path: ":webserver:webcapsule", configuration: 'runtimeArtifacts')
// For the buildCordappDependenciesJar task
- runtime project(':client:jfx')
- runtime project(':client:mock')
- runtime project(':client:rpc')
- runtime project(':core')
- runtime project(':finance')
- runtime project(':webserver')
+ cordaRuntime project(':client:jfx')
+ cordaRuntime project(':client:mock')
+ cordaRuntime project(':client:rpc')
+ cordaRuntime project(':core')
+ cordaRuntime project(':finance')
+ cordaRuntime project(':webserver')
testCompile project(':test-utils')
}
@@ -285,7 +285,7 @@ artifactory {
password = System.getenv('CORDA_ARTIFACTORY_PASSWORD')
}
defaults {
- publications('corda-jfx', 'corda-mock', 'corda-rpc', 'corda-core', 'corda', 'cordform-common', 'corda-finance', 'corda-node', 'corda-node-api', 'corda-node-schemas', 'corda-test-utils', 'corda-jackson', 'corda-verifier', 'corda-webserver-impl', 'corda-webserver')
+ publications('corda-jfx', 'corda-mock', 'corda-rpc', 'corda-core', 'corda', 'cordform-common', 'corda-finance', 'corda-node', 'corda-node-api', 'corda-node-schemas', 'corda-test-common', 'corda-test-utils', 'corda-jackson', 'corda-verifier', 'corda-webserver-impl', 'corda-webserver')
}
}
}
diff --git a/client/jackson/build.gradle b/client/jackson/build.gradle
index 234f7e1ae0..f591484aa0 100644
--- a/client/jackson/build.gradle
+++ b/client/jackson/build.gradle
@@ -6,6 +6,8 @@ apply plugin: 'com.jfrog.artifactory'
dependencies {
compile project(':core')
compile project(':finance')
+ testCompile project(':test-utils')
+
compile "org.jetbrains.kotlin:kotlin-stdlib-jre8:$kotlin_version"
testCompile "org.jetbrains.kotlin:kotlin-test:$kotlin_version"
@@ -18,12 +20,6 @@ dependencies {
testCompile project(path: ':core', configuration: 'testArtifacts')
testCompile "junit:junit:$junit_version"
-
- // TODO: Upgrade to junit-quickcheck 0.8, once it is released,
- // because it depends on org.javassist:javassist instead
- // of javassist:javassist.
- testCompile "com.pholser:junit-quickcheck-core:$quickcheck_version"
- testCompile "com.pholser:junit-quickcheck-generators:$quickcheck_version"
}
jar {
@@ -31,5 +27,5 @@ jar {
}
publish {
- name = jar.baseName
+ name jar.baseName
}
\ No newline at end of file
diff --git a/client/jackson/src/main/kotlin/net/corda/jackson/JacksonSupport.kt b/client/jackson/src/main/kotlin/net/corda/jackson/JacksonSupport.kt
index 1ebb881d19..ee8333f06e 100644
--- a/client/jackson/src/main/kotlin/net/corda/jackson/JacksonSupport.kt
+++ b/client/jackson/src/main/kotlin/net/corda/jackson/JacksonSupport.kt
@@ -1,5 +1,7 @@
package net.corda.jackson
+import com.fasterxml.jackson.annotation.JsonIgnore
+import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.core.*
import com.fasterxml.jackson.databind.*
import com.fasterxml.jackson.databind.deser.std.NumberDeserializers
@@ -9,6 +11,8 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import net.corda.contracts.BusinessCalendar
import net.corda.core.contracts.Amount
+import net.corda.core.contracts.ContractState
+import net.corda.core.contracts.StateRef
import net.corda.core.crypto.*
import net.corda.core.crypto.composite.CompositeKey
import net.corda.core.identity.AbstractParty
@@ -17,9 +21,14 @@ import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.IdentityService
-import net.corda.core.utilities.OpaqueBytes
+import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
+import net.corda.core.transactions.CoreTransaction
+import net.corda.core.transactions.NotaryChangeWireTransaction
+import net.corda.core.transactions.SignedTransaction
+import net.corda.core.transactions.WireTransaction
+import net.corda.core.utilities.OpaqueBytes
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.bouncycastle.asn1.x500.X500Name
import java.math.BigDecimal
@@ -38,32 +47,24 @@ object JacksonSupport {
// If you change this API please update the docs in the docsite (json.rst)
interface PartyObjectMapper {
- @Deprecated("Use partyFromX500Name instead")
- fun partyFromName(partyName: String): Party?
fun partyFromX500Name(name: X500Name): Party?
fun partyFromKey(owningKey: PublicKey): Party?
fun partiesFromName(query: String): Set
}
class RpcObjectMapper(val rpc: CordaRPCOps, factory: JsonFactory, val fuzzyIdentityMatch: Boolean) : PartyObjectMapper, ObjectMapper(factory) {
- @Suppress("OverridingDeprecatedMember", "DEPRECATION")
- override fun partyFromName(partyName: String): Party? = rpc.partyFromName(partyName)
override fun partyFromX500Name(name: X500Name): Party? = rpc.partyFromX500Name(name)
override fun partyFromKey(owningKey: PublicKey): Party? = rpc.partyFromKey(owningKey)
override fun partiesFromName(query: String) = rpc.partiesFromName(query, fuzzyIdentityMatch)
}
class IdentityObjectMapper(val identityService: IdentityService, factory: JsonFactory, val fuzzyIdentityMatch: Boolean) : PartyObjectMapper, ObjectMapper(factory) {
- @Suppress("OverridingDeprecatedMember", "DEPRECATION")
- override fun partyFromName(partyName: String): Party? = identityService.partyFromName(partyName)
override fun partyFromX500Name(name: X500Name): Party? = identityService.partyFromX500Name(name)
override fun partyFromKey(owningKey: PublicKey): Party? = identityService.partyFromKey(owningKey)
override fun partiesFromName(query: String) = identityService.partiesFromName(query, fuzzyIdentityMatch)
}
class NoPartyObjectMapper(factory: JsonFactory) : PartyObjectMapper, ObjectMapper(factory) {
- @Suppress("OverridingDeprecatedMember", "DEPRECATION")
- override fun partyFromName(partyName: String): Party? = throw UnsupportedOperationException()
override fun partyFromX500Name(name: X500Name): Party? = throw UnsupportedOperationException()
override fun partyFromKey(owningKey: PublicKey): Party? = throw UnsupportedOperationException()
override fun partiesFromName(query: String) = throw UnsupportedOperationException()
@@ -109,6 +110,10 @@ object JacksonSupport {
// For X.500 distinguished names
addDeserializer(X500Name::class.java, X500NameDeserializer)
addSerializer(X500Name::class.java, X500NameSerializer)
+
+ // Mixins for transaction types to prevent some properties from being serialized
+ setMixInAnnotation(SignedTransaction::class.java, SignedTransactionMixin::class.java)
+ setMixInAnnotation(WireTransaction::class.java, WireTransactionMixin::class.java)
}
}
@@ -278,7 +283,7 @@ object JacksonSupport {
object CalendarSerializer : JsonSerializer() {
override fun serialize(obj: BusinessCalendar, generator: JsonGenerator, context: SerializerProvider) {
val calendarName = BusinessCalendar.calendars.find { BusinessCalendar.getInstance(it) == obj }
- if(calendarName != null) {
+ if (calendarName != null) {
generator.writeString(calendarName)
} else {
generator.writeObject(BusinessCalendarWrapper(obj.holidayDates))
@@ -371,5 +376,24 @@ object JacksonSupport {
gen.writeBinary(value.bytes)
}
}
+
+ abstract class SignedTransactionMixin {
+ @JsonIgnore abstract fun getTxBits(): SerializedBytes
+ @JsonProperty("signatures") protected abstract fun getSigs(): List
+ @JsonProperty protected abstract fun getTransaction(): CoreTransaction
+ @JsonIgnore abstract fun getTx(): WireTransaction
+ @JsonIgnore abstract fun getNotaryChangeTx(): NotaryChangeWireTransaction
+ @JsonIgnore abstract fun getInputs(): List
+ @JsonIgnore abstract fun getNotary(): Party?
+ @JsonIgnore abstract fun getId(): SecureHash
+ @JsonIgnore abstract fun getRequiredSigningKeys(): Set
+ }
+
+ abstract class WireTransactionMixin {
+ @JsonIgnore abstract fun getMerkleTree(): MerkleTree
+ @JsonIgnore abstract fun getAvailableComponents(): List
+ @JsonIgnore abstract fun getAvailableComponentHashes(): List
+ @JsonIgnore abstract fun getOutputStates(): List
+ }
}
diff --git a/client/jackson/src/test/kotlin/net/corda/jackson/JacksonSupportTest.kt b/client/jackson/src/test/kotlin/net/corda/jackson/JacksonSupportTest.kt
index b1bbfa8f67..a92edb7aea 100644
--- a/client/jackson/src/test/kotlin/net/corda/jackson/JacksonSupportTest.kt
+++ b/client/jackson/src/test/kotlin/net/corda/jackson/JacksonSupportTest.kt
@@ -1,27 +1,31 @@
package net.corda.jackson
import com.fasterxml.jackson.databind.SerializationFeature
-import com.pholser.junit.quickcheck.From
-import com.pholser.junit.quickcheck.Property
-import com.pholser.junit.quickcheck.runner.JUnitQuickcheck
import net.corda.core.contracts.Amount
import net.corda.core.contracts.USD
-import net.corda.core.testing.PublicKeyGenerator
+import net.corda.core.crypto.Crypto
+import net.corda.core.crypto.SignatureMetadata
+import net.corda.core.crypto.TransactionSignature
+import net.corda.core.crypto.generateKeyPair
+import net.corda.core.transactions.SignedTransaction
+import net.corda.testing.ALICE_PUBKEY
+import net.corda.testing.DUMMY_NOTARY
+import net.corda.testing.MINI_CORP
+import net.corda.testing.TestDependencyInjectionBase
+import net.corda.testing.contracts.DummyContract
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.junit.Test
-import org.junit.runner.RunWith
-import java.security.PublicKey
import java.util.*
import kotlin.test.assertEquals
-@RunWith(JUnitQuickcheck::class)
-class JacksonSupportTest {
+class JacksonSupportTest : TestDependencyInjectionBase() {
companion object {
val mapper = JacksonSupport.createNonRpcMapper()
}
- @Property
- fun publicKeySerializingWorks(@From(PublicKeyGenerator::class) publicKey: PublicKey) {
+ @Test
+ fun publicKeySerializingWorks() {
+ val publicKey = generateKeyPair().public
val serialized = mapper.writeValueAsString(publicKey)
val parsedKey = mapper.readValue(serialized, EdDSAPublicKey::class.java)
assertEquals(publicKey, parsedKey)
@@ -50,4 +54,24 @@ class JacksonSupportTest {
val writer = mapper.writer().without(SerializationFeature.INDENT_OUTPUT)
assertEquals("""{"notional":"25000000.00 USD"}""", writer.writeValueAsString(Dummy(Amount.parseCurrency("$25000000"))))
}
+
+ @Test
+ fun writeTransaction() {
+ fun makeDummyTx(): SignedTransaction {
+ val wtx = DummyContract.generateInitial(1, DUMMY_NOTARY, MINI_CORP.ref(1)).toWireTransaction()
+ val signatures = TransactionSignature(
+ ByteArray(1),
+ ALICE_PUBKEY,
+ SignatureMetadata(
+ 1,
+ Crypto.findSignatureScheme(ALICE_PUBKEY).schemeNumberID
+ )
+ )
+ return SignedTransaction(wtx, listOf(signatures))
+ }
+
+ val writer = mapper.writer()
+ // We don't particularly care about the serialized format, just need to make sure it completes successfully.
+ writer.writeValueAsString(makeDummyTx())
+ }
}
diff --git a/client/jfx/build.gradle b/client/jfx/build.gradle
index 5d12e01f56..9c70ee7a04 100644
--- a/client/jfx/build.gradle
+++ b/client/jfx/build.gradle
@@ -62,5 +62,5 @@ jar {
}
publish {
- name = jar.baseName
+ name jar.baseName
}
\ No newline at end of file
diff --git a/client/jfx/src/integration-test/kotlin/net/corda/client/jfx/NodeMonitorModelTest.kt b/client/jfx/src/integration-test/kotlin/net/corda/client/jfx/NodeMonitorModelTest.kt
index 5c93d9e820..94662d7378 100644
--- a/client/jfx/src/integration-test/kotlin/net/corda/client/jfx/NodeMonitorModelTest.kt
+++ b/client/jfx/src/integration-test/kotlin/net/corda/client/jfx/NodeMonitorModelTest.kt
@@ -2,15 +2,15 @@ package net.corda.client.jfx
import net.corda.client.jfx.model.NodeMonitorModel
import net.corda.client.jfx.model.ProgressTrackingEvent
-import net.corda.core.bufferUntilSubscribed
+import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.contracts.Amount
+import net.corda.core.contracts.ContractState
import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.USD
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.crypto.keys
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.StateMachineRunId
-import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineTransactionMapping
import net.corda.core.messaging.StateMachineUpdate
@@ -19,12 +19,9 @@ import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.Vault
-import net.corda.core.utilities.OpaqueBytes
import net.corda.core.transactions.SignedTransaction
-import net.corda.testing.ALICE
-import net.corda.testing.BOB
-import net.corda.testing.CHARLIE
-import net.corda.testing.DUMMY_NOTARY
+import net.corda.core.utilities.OpaqueBytes
+import net.corda.core.utilities.getOrThrow
import net.corda.flows.CashExitFlow
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
@@ -32,11 +29,9 @@ import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.nodeapi.User
+import net.corda.testing.*
import net.corda.testing.driver.driver
-import net.corda.testing.expect
-import net.corda.testing.expectEvents
import net.corda.testing.node.DriverBasedTest
-import net.corda.testing.sequence
import org.bouncycastle.asn1.x500.X500Name
import org.junit.Test
import rx.Observable
@@ -53,7 +48,7 @@ class NodeMonitorModelTest : DriverBasedTest() {
lateinit var stateMachineUpdatesBob: Observable
lateinit var progressTracking: Observable
lateinit var transactions: Observable
- lateinit var vaultUpdates: Observable
+ lateinit var vaultUpdates: Observable>
lateinit var networkMapUpdates: Observable
lateinit var newNode: (X500Name) -> NodeInfo
@@ -78,14 +73,14 @@ class NodeMonitorModelTest : DriverBasedTest() {
vaultUpdates = monitor.vaultUpdates.bufferUntilSubscribed()
networkMapUpdates = monitor.networkMap.bufferUntilSubscribed()
- monitor.register(aliceNodeHandle.configuration.rpcAddress!!, cashUser.username, cashUser.password)
+ monitor.register(aliceNodeHandle.configuration.rpcAddress!!, cashUser.username, cashUser.password, initialiseSerialization = false)
rpc = monitor.proxyObservable.value!!
val bobNodeHandle = startNode(BOB.name, rpcUsers = listOf(cashUser)).getOrThrow()
bobNode = bobNodeHandle.nodeInfo
val monitorBob = NodeMonitorModel()
stateMachineUpdatesBob = monitorBob.stateMachineUpdates.bufferUntilSubscribed()
- monitorBob.register(bobNodeHandle.configuration.rpcAddress!!, cashUser.username, cashUser.password)
+ monitorBob.register(bobNodeHandle.configuration.rpcAddress!!, cashUser.username, cashUser.password, initialiseSerialization = false)
rpcBob = monitorBob.proxyObservable.value!!
runTest()
}
@@ -148,7 +143,7 @@ class NodeMonitorModelTest : DriverBasedTest() {
var moveSmId: StateMachineRunId? = null
var issueTx: SignedTransaction? = null
var moveTx: SignedTransaction? = null
- stateMachineUpdates.expectEvents {
+ stateMachineUpdates.expectEvents(isStrict = false) {
sequence(
// ISSUE
expect { add: StateMachineUpdate.Added ->
@@ -159,14 +154,13 @@ class NodeMonitorModelTest : DriverBasedTest() {
expect { remove: StateMachineUpdate.Removed ->
require(remove.id == issueSmId)
},
- // MOVE
- expect { add: StateMachineUpdate.Added ->
+ // MOVE - N.B. There are other framework flows that happen in parallel for the remote resolve transactions flow
+ expect(match = { it is StateMachineUpdate.Added && it.stateMachineInfo.flowLogicClassName == CashPaymentFlow::class.java.name }) { add: StateMachineUpdate.Added ->
moveSmId = add.id
val initiator = add.stateMachineInfo.initiator
require(initiator is FlowInitiator.RPC && initiator.username == "user1")
},
- expect { remove: StateMachineUpdate.Removed ->
- require(remove.id == moveSmId)
+ expect(match = { it is StateMachineUpdate.Removed && it.id == moveSmId }) {
}
)
}
diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/ContractStateModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/ContractStateModel.kt
index bbe3736c22..4944ca31b9 100644
--- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/ContractStateModel.kt
+++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/ContractStateModel.kt
@@ -19,7 +19,7 @@ data class Diff(
* This model exposes the list of owned contract states.
*/
class ContractStateModel {
- private val vaultUpdates: Observable by observable(NodeMonitorModel::vaultUpdates)
+ private val vaultUpdates: Observable> by observable(NodeMonitorModel::vaultUpdates)
private val contractStatesDiff: Observable> = vaultUpdates.map {
Diff(it.produced, it.consumed)
diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt
index e2b134bc8c..33e160c742 100644
--- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt
+++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt
@@ -3,14 +3,13 @@ package net.corda.client.jfx.model
import javafx.beans.property.SimpleObjectProperty
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
+import net.corda.core.contracts.ContractState
import net.corda.core.flows.StateMachineRunId
-import net.corda.core.messaging.CordaRPCOps
-import net.corda.core.messaging.StateMachineInfo
-import net.corda.core.messaging.StateMachineTransactionMapping
-import net.corda.core.messaging.StateMachineUpdate
+import net.corda.core.messaging.*
import net.corda.core.node.services.NetworkMapCache.MapChange
import net.corda.core.node.services.Vault
-import net.corda.core.seconds
+import net.corda.core.node.services.vault.*
+import net.corda.core.utilities.seconds
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NetworkHostAndPort
import rx.Observable
@@ -32,14 +31,14 @@ data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val mess
class NodeMonitorModel {
private val stateMachineUpdatesSubject = PublishSubject.create()
- private val vaultUpdatesSubject = PublishSubject.create()
+ private val vaultUpdatesSubject = PublishSubject.create>()
private val transactionsSubject = PublishSubject.create()
private val stateMachineTransactionMappingSubject = PublishSubject.create()
private val progressTrackingSubject = PublishSubject.create()
private val networkMapSubject = PublishSubject.create()
val stateMachineUpdates: Observable = stateMachineUpdatesSubject
- val vaultUpdates: Observable = vaultUpdatesSubject
+ val vaultUpdates: Observable> = vaultUpdatesSubject
val transactions: Observable = transactionsSubject
val stateMachineTransactionMapping: Observable = stateMachineTransactionMappingSubject
val progressTracking: Observable = progressTrackingSubject
@@ -51,17 +50,18 @@ class NodeMonitorModel {
* Register for updates to/from a given vault.
* TODO provide an unsubscribe mechanism
*/
- fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String) {
+ fun register(nodeHostAndPort: NetworkHostAndPort, username: String, password: String, initialiseSerialization: Boolean = true) {
val client = CordaRPCClient(
hostAndPort = nodeHostAndPort,
configuration = CordaRPCClientConfiguration.default.copy(
connectionMaxRetryInterval = 10.seconds
- )
+ ),
+ initialiseSerialization = initialiseSerialization
)
val connection = client.start(username, password)
val proxy = connection.proxy
- val (stateMachines, stateMachineUpdates) = proxy.stateMachinesAndUpdates()
+ val (stateMachines, stateMachineUpdates) = proxy.stateMachinesFeed()
// Extract the flow tracking stream
// TODO is there a nicer way of doing this? Stream of streams in general results in code like this...
val currentProgressTrackerUpdates = stateMachines.mapNotNull { stateMachine ->
@@ -82,21 +82,22 @@ class NodeMonitorModel {
val currentStateMachines = stateMachines.map { StateMachineUpdate.Added(it) }
stateMachineUpdates.startWith(currentStateMachines).subscribe(stateMachineUpdatesSubject)
- // Vault updates
- val (vault, vaultUpdates) = proxy.vaultAndUpdates()
- val initialVaultUpdate = Vault.Update(setOf(), vault.toSet())
+ // Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
+ val (vaultSnapshot, vaultUpdates) = proxy.vaultTrackBy(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
+ PageSpecification(DEFAULT_PAGE_NUM, MAX_PAGE_SIZE))
+ val initialVaultUpdate = Vault.Update(setOf(), vaultSnapshot.states.toSet())
vaultUpdates.startWith(initialVaultUpdate).subscribe(vaultUpdatesSubject)
// Transactions
- val (transactions, newTransactions) = proxy.verifiedTransactions()
+ val (transactions, newTransactions) = proxy.verifiedTransactionsFeed()
newTransactions.startWith(transactions).subscribe(transactionsSubject)
// SM -> TX mapping
- val (smTxMappings, futureSmTxMappings) = proxy.stateMachineRecordedTransactionMapping()
+ val (smTxMappings, futureSmTxMappings) = proxy.stateMachineRecordedTransactionMappingFeed()
futureSmTxMappings.startWith(smTxMappings).subscribe(stateMachineTransactionMappingSubject)
// Parties on network
- val (parties, futurePartyUpdate) = proxy.networkMapUpdates()
+ val (parties, futurePartyUpdate) = proxy.networkMapFeed()
futurePartyUpdate.startWith(parties.map { MapChange.Added(it) }).subscribe(networkMapSubject)
proxyObservable.set(proxy)
diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/AmountBindings.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/AmountBindings.kt
index 31cb4bf3e6..26387c1663 100644
--- a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/AmountBindings.kt
+++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/AmountBindings.kt
@@ -23,13 +23,14 @@ object AmountBindings {
) { sum -> Amount(sum.toLong(), token) }
fun exchange(
- currency: ObservableValue,
- exchangeRate: ObservableValue
+ observableCurrency: ObservableValue,
+ observableExchangeRate: ObservableValue
): ObservableValue) -> Long>> {
- return EasyBind.combine(currency, exchangeRate) { currency, exchangeRate ->
- Pair(currency) { amount: Amount ->
- (exchangeRate.rate(amount.token, currency) * amount.quantity).toLong()
- }
+ return EasyBind.combine(observableCurrency, observableExchangeRate) { currency, exchangeRate ->
+ Pair) -> Long>(
+ currency,
+ { (quantity, _, token) -> (exchangeRate.rate(token, currency) * quantity).toLong() }
+ )
}
}
diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt
index 5ccc04a7c4..198fa779c5 100644
--- a/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt
+++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/utils/ObservableUtilities.kt
@@ -1,5 +1,6 @@
package net.corda.client.jfx.utils
+import javafx.application.Platform
import javafx.beans.binding.Bindings
import javafx.beans.binding.BooleanBinding
import javafx.beans.property.ReadOnlyObjectWrapper
@@ -10,7 +11,13 @@ import javafx.collections.MapChangeListener
import javafx.collections.ObservableList
import javafx.collections.ObservableMap
import javafx.collections.transformation.FilteredList
+import net.corda.core.contracts.ContractState
+import net.corda.core.contracts.StateAndRef
+import net.corda.core.messaging.DataFeed
+import net.corda.core.node.services.Vault
import org.fxmisc.easybind.EasyBind
+import rx.Observable
+import rx.schedulers.Schedulers
import java.util.function.Predicate
/**
@@ -313,3 +320,36 @@ fun ObservableList.firstOrDefault(default: ObservableValue, predicate
fun ObservableList.firstOrNullObservable(predicate: (A) -> Boolean): ObservableValue {
return Bindings.createObjectBinding({ this.firstOrNull(predicate) }, arrayOf(this))
}
+
+/**
+ * Modifies the given Rx observable such that emissions are run on the JavaFX GUI thread. Use this when you have an Rx
+ * observable that may emit in the background e.g. from the network and you wish to link it to the user interface.
+ *
+ * Note: you should use the returned observable, not the original one this method is called on.
+ */
+fun Observable.observeOnFXThread(): Observable = observeOn(Schedulers.from(Platform::runLater))
+
+/**
+ * Given a [DataFeed] that contains the results of a vault query and a subsequent stream of changes, returns a JavaFX
+ * [ObservableList] that mirrors the streamed results on the UI thread. Note that the paging is *not* respected by this
+ * function: if a state is added that would not have appeared in the page in the initial query, it will still be added
+ * to the observable list.
+ *
+ * @see toFXListOfStates if you want just the state objects and not the ledger pointers too.
+ */
+fun DataFeed, Vault.Update>.toFXListOfStateRefs(): ObservableList> {
+ val list = FXCollections.observableArrayList(snapshot.states)
+ updates.observeOnFXThread().subscribe { (consumed, produced) ->
+ list.removeAll(consumed)
+ list.addAll(produced)
+ }
+ return list
+}
+
+/**
+ * Returns the same list as [toFXListOfStateRefs] but which contains the states instead of [StateAndRef] wrappers.
+ * The same notes apply as with that function.
+ */
+fun DataFeed, Vault.Update>.toFXListOfStates(): ObservableList {
+ return toFXListOfStateRefs().map { it.state.data }
+}
\ No newline at end of file
diff --git a/client/mock/build.gradle b/client/mock/build.gradle
index d709d4c911..0c4a2efbb0 100644
--- a/client/mock/build.gradle
+++ b/client/mock/build.gradle
@@ -30,5 +30,5 @@ jar {
}
publish {
- name = jar.baseName
+ name jar.baseName
}
\ No newline at end of file
diff --git a/client/mock/src/main/kotlin/net/corda/client/mock/Generator.kt b/client/mock/src/main/kotlin/net/corda/client/mock/Generator.kt
index 9748e2a2ca..8da2e4e49b 100644
--- a/client/mock/src/main/kotlin/net/corda/client/mock/Generator.kt
+++ b/client/mock/src/main/kotlin/net/corda/client/mock/Generator.kt
@@ -167,21 +167,23 @@ fun Generator.Companion.replicate(number: Int, generator: Generator): Gen
}
-fun Generator.Companion.replicatePoisson(meanSize: Double, generator: Generator) = Generator> {
+fun Generator.Companion.replicatePoisson(meanSize: Double, generator: Generator, atLeastOne: Boolean = false) = Generator> {
val chance = (meanSize - 1) / meanSize
val result = mutableListOf()
var finish = false
while (!finish) {
- val result = Generator.doubleRange(0.0, 1.0).generate(it).flatMap { value ->
+ val res = Generator.doubleRange(0.0, 1.0).generate(it).flatMap { value ->
if (value < chance) {
generator.generate(it).map { result.add(it) }
} else {
finish = true
- Try.Success(Unit)
+ if (result.isEmpty() && atLeastOne) {
+ generator.generate(it).map { result.add(it) }
+ } else Try.Success(Unit)
}
}
- if (result is Try.Failure) {
- return@Generator result
+ if (res is Try.Failure) {
+ return@Generator res
}
}
Try.Success(result)
diff --git a/client/rpc/build.gradle b/client/rpc/build.gradle
index b2ab10dff4..61159ac74d 100644
--- a/client/rpc/build.gradle
+++ b/client/rpc/build.gradle
@@ -24,6 +24,11 @@ sourceSets {
runtimeClasspath += main.output + test.output
srcDir file('src/integration-test/kotlin')
}
+ java {
+ compileClasspath += main.output + test.output
+ runtimeClasspath += main.output + test.output
+ srcDir file('src/integration-test/java')
+ }
}
smokeTest {
kotlin {
@@ -33,6 +38,11 @@ sourceSets {
runtimeClasspath += main.output
srcDir file('src/smoke-test/kotlin')
}
+ java {
+ compileClasspath += main.output
+ runtimeClasspath += main.output
+ srcDir file('src/smoke-test/java')
+ }
}
}
@@ -82,5 +92,5 @@ jar {
}
publish {
- name = jar.baseName
+ name jar.baseName
}
diff --git a/client/rpc/src/integration-test/java/net/corda/client/rpc/CordaRPCJavaClientTest.java b/client/rpc/src/integration-test/java/net/corda/client/rpc/CordaRPCJavaClientTest.java
new file mode 100644
index 0000000000..9f38c17316
--- /dev/null
+++ b/client/rpc/src/integration-test/java/net/corda/client/rpc/CordaRPCJavaClientTest.java
@@ -0,0 +1,81 @@
+package net.corda.client.rpc;
+
+import net.corda.core.concurrent.CordaFuture;
+import net.corda.client.rpc.internal.RPCClient;
+import net.corda.core.contracts.Amount;
+import net.corda.core.messaging.CordaRPCOps;
+import net.corda.core.messaging.FlowHandle;
+import net.corda.core.node.services.ServiceInfo;
+import net.corda.core.utilities.OpaqueBytes;
+import net.corda.flows.AbstractCashFlow;
+import net.corda.flows.CashIssueFlow;
+import net.corda.flows.CashPaymentFlow;
+import net.corda.node.internal.Node;
+import net.corda.node.services.transactions.ValidatingNotaryService;
+import net.corda.nodeapi.User;
+import net.corda.testing.node.NodeBasedTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+import static kotlin.test.AssertionsKt.assertEquals;
+import static net.corda.client.rpc.CordaRPCClientConfiguration.getDefault;
+import static net.corda.contracts.GetBalances.getCashBalance;
+import static net.corda.node.services.RPCUserServiceKt.startFlowPermission;
+import static net.corda.testing.TestConstants.getALICE;
+
+public class CordaRPCJavaClientTest extends NodeBasedTest {
+ private List perms = Arrays.asList(startFlowPermission(CashPaymentFlow.class), startFlowPermission(CashIssueFlow.class));
+ private Set permSet = new HashSet<>(perms);
+ private User rpcUser = new User("user1", "test", permSet);
+
+ private Node node;
+ private CordaRPCClient client;
+ private RPCClient.RPCConnection connection = null;
+ private CordaRPCOps rpcProxy;
+
+ private void login(String username, String password) {
+ connection = client.start(username, password);
+ rpcProxy = connection.getProxy();
+ }
+
+ @Before
+ public void setUp() throws ExecutionException, InterruptedException {
+ Set services = new HashSet<>(Collections.singletonList(new ServiceInfo(ValidatingNotaryService.Companion.getType(), null)));
+ CordaFuture nodeFuture = startNode(getALICE().getName(), 1, services, Arrays.asList(rpcUser), Collections.emptyMap());
+ node = nodeFuture.get();
+ client = new CordaRPCClient(node.getConfiguration().getRpcAddress(), null, getDefault(), false);
+ }
+
+ @After
+ public void done() throws IOException {
+ connection.close();
+ }
+
+ @Test
+ public void testLogin() {
+ login(rpcUser.getUsername(), rpcUser.getPassword());
+ }
+
+ @Test
+ public void testCashBalances() throws NoSuchFieldException, ExecutionException, InterruptedException {
+ login(rpcUser.getUsername(), rpcUser.getPassword());
+
+ Amount dollars123 = new Amount<>(123, Currency.getInstance("USD"));
+
+ FlowHandle flowHandle = rpcProxy.startFlowDynamic(CashIssueFlow.class,
+ dollars123, OpaqueBytes.of("1".getBytes()),
+ node.info.getLegalIdentity(), node.info.getLegalIdentity());
+ System.out.println("Started issuing cash, waiting on result");
+ flowHandle.getReturnValue().get();
+
+ Amount balance = getCashBalance(rpcProxy, Currency.getInstance("USD"));
+ System.out.print("Balance: " + balance + "\n");
+
+ assertEquals(dollars123, balance, "matching");
+ }
+}
diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt
index 40a179fd29..19e5fdc50e 100644
--- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt
+++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt
@@ -1,13 +1,15 @@
package net.corda.client.rpc
+import net.corda.contracts.getCashBalance
+import net.corda.contracts.getCashBalances
import net.corda.core.contracts.DOLLARS
+import net.corda.core.contracts.USD
+import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.FlowInitiator
-import net.corda.core.getOrThrow
import net.corda.core.messaging.*
import net.corda.core.node.services.ServiceInfo
-import net.corda.core.crypto.random63BitValue
import net.corda.core.utilities.OpaqueBytes
-import net.corda.testing.ALICE
+import net.corda.core.utilities.getOrThrow
import net.corda.flows.CashException
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
@@ -15,13 +17,13 @@ import net.corda.node.internal.Node
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.ValidatingNotaryService
import net.corda.nodeapi.User
+import net.corda.testing.ALICE
import net.corda.testing.node.NodeBasedTest
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Before
import org.junit.Test
-import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
@@ -42,7 +44,7 @@ class CordaRPCClientTest : NodeBasedTest() {
@Before
fun setUp() {
node = startNode(ALICE.name, rpcUsers = listOf(rpcUser), advertisedServices = setOf(ServiceInfo(ValidatingNotaryService.type))).getOrThrow()
- client = CordaRPCClient(node.configuration.rpcAddress!!)
+ client = CordaRPCClient(node.configuration.rpcAddress!!, initialiseSerialization = false)
}
@After
@@ -117,20 +119,18 @@ class CordaRPCClientTest : NodeBasedTest() {
println("Started issuing cash, waiting on result")
flowHandle.returnValue.get()
- val finishCash = proxy.getCashBalances()
- println("Cash Balances: $finishCash")
- assertEquals(1, finishCash.size)
- assertEquals(123.DOLLARS, finishCash.get(Currency.getInstance("USD")))
+ val cashDollars = proxy.getCashBalance(USD)
+ println("Balance: $cashDollars")
+ assertEquals(123.DOLLARS, cashDollars)
}
@Test
fun `flow initiator via RPC`() {
login(rpcUser.username, rpcUser.password)
val proxy = connection!!.proxy
- val smUpdates = proxy.stateMachinesAndUpdates()
var countRpcFlows = 0
var countShellFlows = 0
- smUpdates.second.subscribe {
+ proxy.stateMachinesFeed().updates.subscribe {
if (it is StateMachineUpdate.Added) {
val initiator = it.stateMachineInfo.initiator
if (initiator is FlowInitiator.RPC)
diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt
index ac524995c7..4433ba03be 100644
--- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt
+++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt
@@ -1,24 +1,15 @@
package net.corda.client.rpc
-import com.esotericsoftware.kryo.Kryo
-import com.esotericsoftware.kryo.Serializer
-import com.esotericsoftware.kryo.io.Input
-import com.esotericsoftware.kryo.io.Output
-import com.esotericsoftware.kryo.pool.KryoPool
-import com.google.common.util.concurrent.Futures
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.crypto.random63BitValue
-import net.corda.core.future
-import net.corda.core.getOrThrow
+import net.corda.core.internal.concurrent.fork
+import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.RPCOps
-import net.corda.core.millis
-import net.corda.core.seconds
-import net.corda.core.utilities.NetworkHostAndPort
-import net.corda.core.utilities.Try
+import net.corda.core.serialization.SerializationDefaults
+import net.corda.core.utilities.*
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
-import net.corda.nodeapi.RPCKryo
import net.corda.testing.*
import net.corda.testing.driver.poll
import org.apache.activemq.artemis.api.core.SimpleString
@@ -29,10 +20,7 @@ import rx.Observable
import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject
import java.time.Duration
-import java.util.concurrent.ConcurrentLinkedQueue
-import java.util.concurrent.Executors
-import java.util.concurrent.ScheduledExecutorService
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger
class RPCStabilityTests {
@@ -238,9 +226,7 @@ class RPCStabilityTests {
assertEquals("pong", client.ping())
serverFollower.shutdown()
startRpcServer(ops = ops, customPort = serverPort).getOrThrow()
- val pingFuture = future {
- client.ping()
- }
+ val pingFuture = ForkJoinPool.commonPool().fork(client::ping)
assertEquals("pong", pingFuture.getOrThrow(10.seconds))
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
}
@@ -274,9 +260,9 @@ class RPCStabilityTests {
).get()
val numberOfClients = 4
- val clients = Futures.allAsList((1 .. numberOfClients).map {
+ val clients = (1 .. numberOfClients).map {
startRandomRpcClient(server.broker.hostAndPort!!)
- }).get()
+ }.transpose().get()
// Poll until all clients connect
pollUntilClientNumber(server, numberOfClients)
@@ -305,16 +291,8 @@ class RPCStabilityTests {
return Observable.interval(interval.toMillis(), TimeUnit.MILLISECONDS).map { chunk }
}
}
- val dummyObservableSerialiser = object : Serializer>() {
- override fun write(kryo: Kryo?, output: Output?, `object`: Observable?) {
- }
- override fun read(kryo: Kryo?, input: Input?, type: Class>?): Observable {
- return Observable.empty()
- }
- }
@Test
fun `slow consumers are kicked`() {
- val kryoPool = KryoPool.Builder { RPCKryo(dummyObservableSerialiser) }.build()
rpcDriver {
val server = startRpcServer(maxBufferedBytesPerClient = 10 * 1024 * 1024, ops = SlowConsumerRPCOpsImpl()).get()
@@ -339,7 +317,7 @@ class RPCStabilityTests {
methodName = SlowConsumerRPCOps::streamAtInterval.name,
arguments = listOf(10.millis, 123456)
)
- request.writeToClientMessage(kryoPool, message)
+ request.writeToClientMessage(SerializationDefaults.RPC_SERVER_CONTEXT, message)
producer.send(message)
session.commit()
diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt
index 78baa8d906..3ec8268c3e 100644
--- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt
+++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt
@@ -2,11 +2,17 @@ package net.corda.client.rpc
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration
+import net.corda.client.rpc.serialization.KryoClientSerializationScheme
import net.corda.core.messaging.CordaRPCOps
+import net.corda.core.serialization.SerializationDefaults
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.config.SSLConfiguration
+import net.corda.nodeapi.internal.serialization.AMQPClientSerializationScheme
+import net.corda.nodeapi.internal.serialization.KRYO_P2P_CONTEXT
+import net.corda.nodeapi.internal.serialization.KRYO_RPC_CLIENT_CONTEXT
+import net.corda.nodeapi.internal.serialization.SerializationFactoryImpl
import java.time.Duration
/** @see RPCClient.RPCConnection */
@@ -35,11 +41,22 @@ data class CordaRPCClientConfiguration(
class CordaRPCClient(
hostAndPort: NetworkHostAndPort,
sslConfiguration: SSLConfiguration? = null,
- configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default
+ configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default,
+ initialiseSerialization: Boolean = true
) {
+ init {
+ // Init serialization. It's plausible there are multiple clients in a single JVM, so be tolerant of
+ // others having registered first.
+ // TODO: allow clients to have serialization factory etc injected and align with RPC protocol version?
+ if (initialiseSerialization) {
+ initialiseSerialization()
+ }
+ }
+
private val rpcClient = RPCClient(
tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfiguration),
- configuration.toRpcClientConfiguration()
+ configuration.toRpcClientConfiguration(),
+ KRYO_RPC_CLIENT_CONTEXT
)
fun start(username: String, password: String): CordaRPCConnection {
@@ -49,4 +66,21 @@ class CordaRPCClient(
inline fun use(username: String, password: String, block: (CordaRPCConnection) -> A): A {
return start(username, password).use(block)
}
+
+ companion object {
+ fun initialiseSerialization() {
+ try {
+ SerializationDefaults.SERIALIZATION_FACTORY = SerializationFactoryImpl().apply {
+ registerScheme(KryoClientSerializationScheme(this))
+ registerScheme(AMQPClientSerializationScheme())
+ }
+ SerializationDefaults.P2P_CONTEXT = KRYO_P2P_CONTEXT
+ SerializationDefaults.RPC_CLIENT_CONTEXT = KRYO_RPC_CLIENT_CONTEXT
+ } catch(e: IllegalStateException) {
+ // Check that it's registered as we expect
+ check(SerializationDefaults.SERIALIZATION_FACTORY is SerializationFactoryImpl) { "RPC client encountered conflicting configuration of serialization subsystem." }
+ check((SerializationDefaults.SERIALIZATION_FACTORY as SerializationFactoryImpl).alreadyRegisteredSchemes.any { it is KryoClientSerializationScheme }) { "RPC client encountered conflicting configuration of serialization subsystem." }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt
index a792c24faa..94fa65a018 100644
--- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt
+++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt
@@ -1,10 +1,12 @@
package net.corda.client.rpc.internal
-import net.corda.core.logElapsedTime
-import net.corda.core.messaging.RPCOps
-import net.corda.core.minutes
import net.corda.core.crypto.random63BitValue
-import net.corda.core.seconds
+import net.corda.core.internal.logElapsedTime
+import net.corda.core.messaging.RPCOps
+import net.corda.core.serialization.SerializationContext
+import net.corda.core.serialization.SerializationDefaults
+import net.corda.core.utilities.minutes
+import net.corda.core.utilities.seconds
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
@@ -85,13 +87,15 @@ data class RPCClientConfiguration(
*/
class RPCClient(
val transport: TransportConfiguration,
- val rpcConfiguration: RPCClientConfiguration = RPCClientConfiguration.default
+ val rpcConfiguration: RPCClientConfiguration = RPCClientConfiguration.default,
+ val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
) {
constructor(
hostAndPort: NetworkHostAndPort,
sslConfiguration: SSLConfiguration? = null,
- configuration: RPCClientConfiguration = RPCClientConfiguration.default
- ) : this(tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfiguration), configuration)
+ configuration: RPCClientConfiguration = RPCClientConfiguration.default,
+ serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
+ ) : this(tcpTransport(ConnectionDirection.Outbound(), hostAndPort, sslConfiguration), configuration, serializationContext)
companion object {
private val log = loggerFor>()
@@ -146,7 +150,7 @@ class RPCClient(
minLargeMessageSize = rpcConfiguration.maxFileSize
}
- val proxyHandler = RPCClientProxyHandler(rpcConfiguration, username, password, serverLocator, clientAddress, rpcOpsClass)
+ val proxyHandler = RPCClientProxyHandler(rpcConfiguration, username, password, serverLocator, clientAddress, rpcOpsClass, serializationContext)
try {
proxyHandler.start()
diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
index e83363b7fc..98a9e221ff 100644
--- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
+++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt
@@ -4,18 +4,19 @@ import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
-import com.esotericsoftware.kryo.pool.KryoPool
import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import com.google.common.cache.RemovalCause
import com.google.common.cache.RemovalListener
import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.ThreadFactoryBuilder
-import net.corda.core.ThreadBox
+import net.corda.core.internal.ThreadBox
import net.corda.core.crypto.random63BitValue
-import net.corda.core.getOrThrow
+import net.corda.core.internal.LazyPool
+import net.corda.core.internal.LazyStickyPool
+import net.corda.core.internal.LifeCycle
import net.corda.core.messaging.RPCOps
-import net.corda.core.serialization.KryoPoolWithContext
+import net.corda.core.serialization.SerializationContext
import net.corda.core.utilities.*
import net.corda.nodeapi.*
import org.apache.activemq.artemis.api.core.SimpleString
@@ -61,7 +62,8 @@ class RPCClientProxyHandler(
private val rpcPassword: String,
private val serverLocator: ServerLocator,
private val clientAddress: SimpleString,
- private val rpcOpsClass: Class
+ private val rpcOpsClass: Class,
+ serializationContext: SerializationContext
) : InvocationHandler {
private enum class State {
@@ -74,9 +76,6 @@ class RPCClientProxyHandler(
private companion object {
val log = loggerFor()
- // Note that this KryoPool is not yet capable of deserialising Observables, it requires Proxy-specific context
- // to do that. However it may still be used for serialisation of RPC requests and related messages.
- val kryoPool: KryoPool = KryoPool.Builder { RPCKryo(RpcClientObservableSerializer) }.build()
// To check whether toString() is being invoked
val toStringMethod: Method = Object::toString.javaMethod!!
}
@@ -85,7 +84,7 @@ class RPCClientProxyHandler(
private var reaperExecutor: ScheduledExecutorService? = null
// A sticky pool for running Observable.onNext()s. We need the stickiness to preserve the observation ordering.
- private val observationExecutorThreadFactory = ThreadFactoryBuilder().setNameFormat("rpc-client-observation-pool-%d").build()
+ private val observationExecutorThreadFactory = ThreadFactoryBuilder().setNameFormat("rpc-client-observation-pool-%d").setDaemon(true).build()
private val observationExecutorPool = LazyStickyPool(rpcConfiguration.observationExecutorPoolSize) {
Executors.newFixedThreadPool(1, observationExecutorThreadFactory)
}
@@ -109,11 +108,10 @@ class RPCClientProxyHandler(
private val observablesToReap = ThreadBox(object {
var observables = ArrayList()
})
- // A Kryo pool that automatically adds the observable context when an instance is requested.
- private val kryoPoolWithObservableContext = RpcClientObservableSerializer.createPoolWithContext(kryoPool, observableContext)
+ private val serializationContextWithObservableContext = RpcClientObservableSerializer.createContext(serializationContext, observableContext)
private fun createRpcObservableMap(): RpcObservableMap {
- val onObservableRemove = RemovalListener>> {
+ val onObservableRemove = RemovalListener>> {
val rpcCallSite = callSiteMap?.remove(it.key.toLong)
if (it.cause == RemovalCause.COLLECTED) {
log.warn(listOf(
@@ -194,7 +192,7 @@ class RPCClientProxyHandler(
val replyFuture = SettableFuture.create()
sessionAndProducerPool.run {
val message = it.session.createMessage(false)
- request.writeToClientMessage(kryoPool, message)
+ request.writeToClientMessage(serializationContextWithObservableContext, message)
log.debug {
val argumentsString = arguments?.joinToString() ?: ""
@@ -221,7 +219,7 @@ class RPCClientProxyHandler(
// The handler for Artemis messages.
private fun artemisMessageHandler(message: ClientMessage) {
- val serverToClient = RPCApi.ServerToClient.fromClientMessage(kryoPoolWithObservableContext, message)
+ val serverToClient = RPCApi.ServerToClient.fromClientMessage(serializationContextWithObservableContext, message)
log.debug { "Got message from RPC server $serverToClient" }
when (serverToClient) {
is RPCApi.ServerToClient.RpcReply -> {
@@ -338,7 +336,7 @@ class RPCClientProxyHandler(
}
}
-private typealias RpcObservableMap = Cache>>
+private typealias RpcObservableMap = Cache>>
private typealias RpcReplyMap = ConcurrentHashMap>
private typealias CallSiteMap = ConcurrentHashMap
@@ -348,7 +346,7 @@ private typealias CallSiteMap = ConcurrentHashMap
* @param observableMap holds the Observables that are ultimately exposed to the user.
* @param hardReferenceStore holds references to Observables we want to keep alive while they are subscribed to.
*/
-private data class ObservableContext(
+data class ObservableContext(
val callSiteMap: CallSiteMap?,
val observableMap: RpcObservableMap,
val hardReferenceStore: MutableSet>
@@ -357,17 +355,17 @@ private data class ObservableContext(
/**
* A [Serializer] to deserialise Observables once the corresponding Kryo instance has been provided with an [ObservableContext].
*/
-private object RpcClientObservableSerializer : Serializer>() {
+object RpcClientObservableSerializer : Serializer>() {
private object RpcObservableContextKey
- fun createPoolWithContext(kryoPool: KryoPool, observableContext: ObservableContext): KryoPool {
- return KryoPoolWithContext(kryoPool, RpcObservableContextKey, observableContext)
+
+ fun createContext(serializationContext: SerializationContext, observableContext: ObservableContext): SerializationContext {
+ return serializationContext.withProperty(RpcObservableContextKey, observableContext)
}
- override fun read(kryo: Kryo, input: Input, type: Class>): Observable {
- @Suppress("UNCHECKED_CAST")
+ override fun read(kryo: Kryo, input: Input, type: Class>): Observable {
val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext
val observableId = RPCApi.ObservableId(input.readLong(true))
- val observable = UnicastSubject.create>()
+ val observable = UnicastSubject.create>()
require(observableContext.observableMap.getIfPresent(observableId) == null) {
"Multiple Observables arrived with the same ID $observableId"
}
@@ -384,7 +382,7 @@ private object RpcClientObservableSerializer : Serializer>() {
}.dematerialize()
}
- override fun write(kryo: Kryo, output: Output, observable: Observable) {
+ override fun write(kryo: Kryo, output: Output, observable: Observable<*>) {
throw UnsupportedOperationException("Cannot serialise Observables on the client side")
}
diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/serialization/SerializationScheme.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/serialization/SerializationScheme.kt
new file mode 100644
index 0000000000..0bb26b93fb
--- /dev/null
+++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/serialization/SerializationScheme.kt
@@ -0,0 +1,28 @@
+package net.corda.client.rpc.serialization
+
+import com.esotericsoftware.kryo.pool.KryoPool
+import net.corda.client.rpc.internal.RpcClientObservableSerializer
+import net.corda.core.serialization.SerializationContext
+import net.corda.core.serialization.SerializationFactory
+import net.corda.core.utilities.ByteSequence
+import net.corda.nodeapi.RPCKryo
+import net.corda.nodeapi.internal.serialization.AbstractKryoSerializationScheme
+import net.corda.nodeapi.internal.serialization.DefaultKryoCustomizer
+import net.corda.nodeapi.internal.serialization.KryoHeaderV0_1
+
+class KryoClientSerializationScheme(serializationFactory: SerializationFactory) : AbstractKryoSerializationScheme(serializationFactory) {
+ override fun canDeserializeVersion(byteSequence: ByteSequence, target: SerializationContext.UseCase): Boolean {
+ return byteSequence == KryoHeaderV0_1 && (target == SerializationContext.UseCase.RPCClient || target == SerializationContext.UseCase.P2P)
+ }
+
+ override fun rpcClientKryoPool(context: SerializationContext): KryoPool {
+ return KryoPool.Builder {
+ DefaultKryoCustomizer.customize(RPCKryo(RpcClientObservableSerializer, serializationFactory, context)).apply { classLoader = context.deserializationClassLoader }
+ }.build()
+ }
+
+ // We're on the client and don't have access to server classes.
+ override fun rpcServerKryoPool(context: SerializationContext): KryoPool {
+ throw UnsupportedOperationException()
+ }
+}
\ No newline at end of file
diff --git a/client/rpc/src/smoke-test/java/net/corda/java/rpc/StandaloneCordaRPCJavaClientTest.java b/client/rpc/src/smoke-test/java/net/corda/java/rpc/StandaloneCordaRPCJavaClientTest.java
new file mode 100644
index 0000000000..4d5a250ef6
--- /dev/null
+++ b/client/rpc/src/smoke-test/java/net/corda/java/rpc/StandaloneCordaRPCJavaClientTest.java
@@ -0,0 +1,87 @@
+package net.corda.java.rpc;
+
+import net.corda.client.rpc.CordaRPCConnection;
+import net.corda.core.contracts.Amount;
+import net.corda.core.messaging.CordaRPCOps;
+import net.corda.core.messaging.DataFeed;
+import net.corda.core.messaging.FlowHandle;
+import net.corda.core.node.NodeInfo;
+import net.corda.core.node.services.NetworkMapCache;
+import net.corda.core.utilities.OpaqueBytes;
+import net.corda.flows.AbstractCashFlow;
+import net.corda.flows.CashIssueFlow;
+import net.corda.nodeapi.User;
+import net.corda.smoketesting.NodeConfig;
+import net.corda.smoketesting.NodeProcess;
+import org.bouncycastle.asn1.x500.X500Name;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static kotlin.test.AssertionsKt.assertEquals;
+import static net.corda.contracts.GetBalances.getCashBalance;
+
+public class StandaloneCordaRPCJavaClientTest {
+ private List perms = Collections.singletonList("ALL");
+ private Set permSet = new HashSet<>(perms);
+ private User rpcUser = new User("user1", "test", permSet);
+
+ private AtomicInteger port = new AtomicInteger(15000);
+
+ private NodeProcess notary;
+ private CordaRPCOps rpcProxy;
+ private CordaRPCConnection connection;
+ private NodeInfo notaryNode;
+
+ private NodeConfig notaryConfig = new NodeConfig(
+ new X500Name("CN=Notary Service,O=R3,OU=corda,L=Zurich,C=CH"),
+ port.getAndIncrement(),
+ port.getAndIncrement(),
+ port.getAndIncrement(),
+ Collections.singletonList("corda.notary.validating"),
+ Arrays.asList(rpcUser),
+ null
+ );
+
+ @Before
+ public void setUp() {
+ notary = new NodeProcess.Factory().create(notaryConfig);
+ connection = notary.connect();
+ rpcProxy = connection.getProxy();
+ notaryNode = fetchNotaryIdentity();
+ }
+
+ @After
+ public void done() {
+ try {
+ connection.close();
+ } finally {
+ notary.close();
+ }
+ }
+
+ private NodeInfo fetchNotaryIdentity() {
+ DataFeed, NetworkMapCache.MapChange> nodeDataFeed = rpcProxy.networkMapFeed();
+ return nodeDataFeed.getSnapshot().get(0);
+ }
+
+ @Test
+ public void testCashBalances() throws NoSuchFieldException, ExecutionException, InterruptedException {
+ Amount dollars123 = new Amount<>(123, Currency.getInstance("USD"));
+
+ FlowHandle flowHandle = rpcProxy.startFlowDynamic(CashIssueFlow.class,
+ dollars123, OpaqueBytes.of("1".getBytes()),
+ notaryNode.getLegalIdentity(), notaryNode.getLegalIdentity());
+ System.out.println("Started issuing cash, waiting on result");
+ flowHandle.getReturnValue().get();
+
+ Amount balance = getCashBalance(rpcProxy, Currency.getInstance("USD"));
+ System.out.print("Balance: " + balance + "\n");
+
+ assertEquals(dollars123, balance, "matching");
+ }
+}
diff --git a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt
index 182ca10eea..f26ea31ff8 100644
--- a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt
+++ b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt
@@ -5,19 +5,19 @@ import com.google.common.hash.HashingInputStream
import net.corda.client.rpc.CordaRPCConnection
import net.corda.client.rpc.notUsed
import net.corda.contracts.asset.Cash
-import net.corda.core.contracts.DOLLARS
-import net.corda.core.contracts.POUNDS
-import net.corda.core.contracts.SWISS_FRANCS
+import net.corda.contracts.getCashBalance
+import net.corda.contracts.getCashBalances
+import net.corda.core.contracts.*
import net.corda.core.crypto.SecureHash
-import net.corda.core.getOrThrow
+import net.corda.core.internal.InputStreamAndHash
import net.corda.core.messaging.*
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.*
-import net.corda.core.seconds
import net.corda.core.utilities.OpaqueBytes
-import net.corda.core.sizedInputStreamAndHash
+import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
+import net.corda.core.utilities.seconds
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
import net.corda.nodeapi.User
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNotEquals
+import kotlin.test.assertTrue
class StandaloneCordaRPClientTest {
private companion object {
@@ -78,7 +79,7 @@ class StandaloneCordaRPClientTest {
@Test
fun `test attachments`() {
- val attachment = sizedInputStreamAndHash(attachmentSize)
+ val attachment = InputStreamAndHash.createInMemoryTestZip(attachmentSize, 1)
assertFalse(rpcProxy.attachmentExists(attachment.sha256))
val id = WrapperStream(attachment.inputStream).use { rpcProxy.uploadAttachment(it) }
assertEquals(attachment.sha256, id, "Attachment has incorrect SHA256 hash")
@@ -117,38 +118,38 @@ class StandaloneCordaRPClientTest {
@Test
fun `test state machines`() {
- val (stateMachines, updates) = rpcProxy.stateMachinesAndUpdates()
+ val (stateMachines, updates) = rpcProxy.stateMachinesFeed()
assertEquals(0, stateMachines.size)
- var updateCount = 0
+ val updateCount = AtomicInteger(0)
updates.subscribe { update ->
if (update is StateMachineUpdate.Added) {
log.info("StateMachine>> Id=${update.id}")
- ++updateCount
+ updateCount.incrementAndGet()
}
}
// Now issue some cash
rpcProxy.startFlow(::CashIssueFlow, 513.SWISS_FRANCS, OpaqueBytes.of(0), notaryNode.legalIdentity, notaryNode.notaryIdentity)
.returnValue.getOrThrow(timeout)
- assertEquals(1, updateCount)
+ assertEquals(1, updateCount.get())
}
@Test
fun `test vault track by`() {
- val (vault, vaultUpdates) = rpcProxy.vaultTrackBy()
- assertEquals(0, vault.states.size)
+ val (vault, vaultUpdates) = rpcProxy.vaultTrackBy(paging = PageSpecification(DEFAULT_PAGE_NUM))
+ assertEquals(0, vault.totalStatesAvailable)
- var updateCount = 0
+ val updateCount = AtomicInteger(0)
vaultUpdates.subscribe { update ->
log.info("Vault>> FlowId=${update.flowId}")
- ++updateCount
+ updateCount.incrementAndGet()
}
// Now issue some cash
rpcProxy.startFlow(::CashIssueFlow, 629.POUNDS, OpaqueBytes.of(0), notaryNode.legalIdentity, notaryNode.notaryIdentity)
.returnValue.getOrThrow(timeout)
- assertNotEquals(0, updateCount)
+ assertNotEquals(0, updateCount.get())
// Check that this cash exists in the vault
val cashBalance = rpcProxy.getCashBalances()
@@ -177,10 +178,27 @@ class StandaloneCordaRPClientTest {
assertEquals(3, moreResults.totalStatesAvailable) // 629 - 100 + 100
// Check that this cash exists in the vault
- val cashBalance = rpcProxy.getCashBalances()
- log.info("Cash Balances: $cashBalance")
- assertEquals(1, cashBalance.size)
- assertEquals(629.POUNDS, cashBalance[Currency.getInstance("GBP")])
+ val cashBalances = rpcProxy.getCashBalances()
+ log.info("Cash Balances: $cashBalances")
+ assertEquals(1, cashBalances.size)
+ assertEquals(629.POUNDS, cashBalances[Currency.getInstance("GBP")])
+ }
+
+ @Test
+ fun `test cash balances`() {
+ val startCash = rpcProxy.getCashBalances()
+ assertTrue(startCash.isEmpty(), "Should not start with any cash")
+
+ val flowHandle = rpcProxy.startFlow(::CashIssueFlow,
+ 629.DOLLARS, OpaqueBytes.of(0),
+ notaryNode.legalIdentity, notaryNode.legalIdentity
+ )
+ println("Started issuing cash, waiting on result")
+ flowHandle.returnValue.get()
+
+ val balance = rpcProxy.getCashBalance(USD)
+ println("Balance: " + balance)
+ assertEquals(629.DOLLARS, balance)
}
private fun fetchNotaryIdentity(): NodeInfo {
diff --git a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/ValidateClasspathTest.kt b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/ValidateClasspathTest.kt
index ecc534ca8a..f79e97641a 100644
--- a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/ValidateClasspathTest.kt
+++ b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/ValidateClasspathTest.kt
@@ -1,6 +1,6 @@
package net.corda.kotlin.rpc
-import net.corda.core.div
+import net.corda.core.internal.div
import org.junit.Test
import java.io.File
import java.nio.file.Path
diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt
index 20026ab7c1..c6b5329d8c 100644
--- a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt
+++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt
@@ -1,8 +1,8 @@
package net.corda.client.rpc
import net.corda.client.rpc.internal.RPCClientConfiguration
-import net.corda.core.flatMap
-import net.corda.core.map
+import net.corda.core.internal.concurrent.flatMap
+import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.RPCOps
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.User
@@ -44,13 +44,13 @@ open class AbstractRPCTest {
startInVmRpcClient(rpcUser.username, rpcUser.password, clientConfiguration).map {
TestProxy(it, { startInVmArtemisSession(rpcUser.username, rpcUser.password) })
}
- }.get()
+ }
RPCTestMode.Netty ->
startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { server ->
startRpcClient(server.broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map {
TestProxy(it, { startArtemisSession(server.broker.hostAndPort!!, rpcUser.username, rpcUser.password) })
}
- }.get()
- }
+ }
+ }.get()
}
}
diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt
index 0117504c2e..e798438fa7 100644
--- a/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt
+++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt
@@ -1,11 +1,11 @@
package net.corda.client.rpc
-import com.google.common.util.concurrent.Futures
-import com.google.common.util.concurrent.ListenableFuture
-import com.google.common.util.concurrent.SettableFuture
-import net.corda.core.getOrThrow
+import net.corda.core.concurrent.CordaFuture
+import net.corda.core.internal.concurrent.doneFuture
+import net.corda.core.internal.concurrent.openFuture
+import net.corda.core.internal.concurrent.thenMatch
import net.corda.core.messaging.RPCOps
-import net.corda.core.thenMatch
+import net.corda.core.utilities.getOrThrow
import net.corda.node.services.messaging.getRpcContext
import net.corda.nodeapi.RPCSinceVersion
import net.corda.testing.RPCDriverExposedDSLInterface
@@ -27,7 +27,9 @@ import kotlin.test.assertTrue
class ClientRPCInfrastructureTests : AbstractRPCTest() {
// TODO: Test that timeouts work
- private fun RPCDriverExposedDSLInterface.testProxy() = testProxy(TestOpsImpl()).ops
+ private fun RPCDriverExposedDSLInterface.testProxy(): TestOps {
+ return testProxy(TestOpsImpl()).ops
+ }
interface TestOps : RPCOps {
@Throws(IllegalArgumentException::class)
@@ -41,9 +43,9 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() {
fun makeComplicatedObservable(): Observable>>
- fun makeListenableFuture(): ListenableFuture
+ fun makeListenableFuture(): CordaFuture
- fun makeComplicatedListenableFuture(): ListenableFuture>>
+ fun makeComplicatedListenableFuture(): CordaFuture>>
@RPCSinceVersion(2)
fun addedLater()
@@ -52,7 +54,7 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() {
}
private lateinit var complicatedObservable: Observable>>
- private lateinit var complicatedListenableFuturee: ListenableFuture>>
+ private lateinit var complicatedListenableFuturee: CordaFuture>>
inner class TestOpsImpl : TestOps {
override val protocolVersion = 1
@@ -60,9 +62,9 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() {
override fun void() {}
override fun someCalculation(str: String, num: Int) = "$str $num"
override fun makeObservable(): Observable = Observable.just(1, 2, 3, 4)
- override fun makeListenableFuture(): ListenableFuture = Futures.immediateFuture(1)
+ override fun makeListenableFuture() = doneFuture(1)
override fun makeComplicatedObservable() = complicatedObservable
- override fun makeComplicatedListenableFuture(): ListenableFuture>> = complicatedListenableFuturee
+ override fun makeComplicatedListenableFuture() = complicatedListenableFuturee
override fun addedLater(): Unit = throw IllegalStateException()
override fun captureUser(): String = getRpcContext().currentUser.username
}
@@ -150,10 +152,10 @@ class ClientRPCInfrastructureTests : AbstractRPCTest() {
fun `complex ListenableFuture`() {
rpcDriver {
val proxy = testProxy()
- val serverQuote = SettableFuture.create>>()
+ val serverQuote = openFuture>>()
complicatedListenableFuturee = serverQuote
- val twainQuote = "Mark Twain" to Futures.immediateFuture("I have never let my schooling interfere with my education.")
+ val twainQuote = "Mark Twain" to doneFuture("I have never let my schooling interfere with my education.")
val clientQuotes = LinkedBlockingQueue()
val clientFuture = proxy.makeComplicatedListenableFuture()
diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt
index fb283773d1..ea78ef374e 100644
--- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt
+++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt
@@ -1,10 +1,10 @@
package net.corda.client.rpc
import net.corda.client.rpc.internal.RPCClientConfiguration
-import net.corda.core.future
import net.corda.core.messaging.RPCOps
-import net.corda.core.millis
+import net.corda.core.utilities.millis
import net.corda.core.crypto.random63BitValue
+import net.corda.core.internal.concurrent.fork
import net.corda.core.serialization.CordaSerializable
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.testing.RPCDriverExposedDSLInterface
@@ -17,6 +17,7 @@ import rx.subjects.UnicastSubject
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
+import java.util.concurrent.ForkJoinPool
@RunWith(Parameterized::class)
class RPCConcurrencyTests : AbstractRPCTest() {
@@ -68,7 +69,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
Observable.empty>()
} else {
val publish = UnicastSubject.create>()
- future {
+ ForkJoinPool.commonPool().fork {
(1..branchingFactor).toList().parallelStream().forEach {
publish.onNext(getParallelObservableTree(depth - 1, branchingFactor))
}
@@ -105,7 +106,7 @@ class RPCConcurrencyTests : AbstractRPCTest() {
val done = CountDownLatch(numberOfBlockedCalls)
// Start a couple of blocking RPC calls
(1..numberOfBlockedCalls).forEach {
- future {
+ ForkJoinPool.commonPool().fork {
proxy.ops.waitLatch(id)
done.countDown()
}
diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt
index 362f001232..b9d64a3cab 100644
--- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt
+++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt
@@ -3,12 +3,11 @@ package net.corda.client.rpc
import com.google.common.base.Stopwatch
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.messaging.RPCOps
-import net.corda.core.minutes
-import net.corda.core.seconds
-import net.corda.core.utilities.div
+import net.corda.core.utilities.minutes
+import net.corda.core.utilities.seconds
+import net.corda.testing.performance.div
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.testing.RPCDriverExposedDSLInterface
-import net.corda.testing.driver.ShutdownManager
import net.corda.testing.measure
import net.corda.testing.performance.startPublishingFixedRateInjector
import net.corda.testing.performance.startReporter
diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt
index ebc9cef461..f31469bcb4 100644
--- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt
+++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt
@@ -1,8 +1,8 @@
package net.corda.client.rpc
import net.corda.core.messaging.RPCOps
-import net.corda.node.services.messaging.requirePermission
import net.corda.node.services.messaging.getRpcContext
+import net.corda.node.services.messaging.requirePermission
import net.corda.nodeapi.PermissionException
import net.corda.nodeapi.User
import net.corda.testing.RPCDriverExposedDSLInterface
diff --git a/constants.properties b/constants.properties
index 6993620217..4567dcffbb 100644
--- a/constants.properties
+++ b/constants.properties
@@ -1,4 +1,4 @@
-gradlePluginsVersion=0.13.2
+gradlePluginsVersion=0.13.6
kotlinVersion=1.1.1
guavaVersion=21.0
bouncycastleVersion=1.57
diff --git a/cordform-common/build.gradle b/cordform-common/build.gradle
index 340a4b6ec6..82274e0d09 100644
--- a/cordform-common/build.gradle
+++ b/cordform-common/build.gradle
@@ -17,3 +17,7 @@ dependencies {
// Bouncy Castle: for X.500 distinguished name manipulation
compile "org.bouncycastle:bcprov-jdk15on:$bouncycastle_version"
}
+
+publish {
+ name project.name
+}
\ No newline at end of file
diff --git a/cordform-common/src/main/java/net/corda/cordform/CordformNode.java b/cordform-common/src/main/java/net/corda/cordform/CordformNode.java
index 80a9a3795a..9175bead2f 100644
--- a/cordform-common/src/main/java/net/corda/cordform/CordformNode.java
+++ b/cordform-common/src/main/java/net/corda/cordform/CordformNode.java
@@ -4,6 +4,7 @@ import static java.util.Collections.emptyList;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -86,6 +87,6 @@ public class CordformNode implements NodeDefinition {
* @param id The (0-based) BFT replica ID.
*/
public void bftReplicaId(Integer id) {
- config = config.withValue("bftReplicaId", ConfigValueFactory.fromAnyRef(id));
+ config = config.withValue("bftSMaRt", ConfigValueFactory.fromMap(Collections.singletonMap("replicaId", id)));
}
}
diff --git a/core/build.gradle b/core/build.gradle
index e7dc4fa641..edf8fb1037 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -2,6 +2,7 @@ apply plugin: 'kotlin'
apply plugin: 'kotlin-jpa'
apply plugin: 'net.corda.plugins.quasar-utils'
apply plugin: 'net.corda.plugins.publish-utils'
+apply plugin: 'com.jfrog.artifactory'
description 'Corda core'
@@ -40,22 +41,12 @@ dependencies {
// AssertJ: for fluent assertions for testing
testCompile "org.assertj:assertj-core:${assertj_version}"
- // TODO: Upgrade to junit-quickcheck 0.8, once it is released,
- // because it depends on org.javassist:javassist instead
- // of javassist:javassist.
- testCompile "com.pholser:junit-quickcheck-core:$quickcheck_version"
- testCompile "com.pholser:junit-quickcheck-generators:$quickcheck_version"
-
// Guava: Google utilities library.
compile "com.google.guava:guava:$guava_version"
// RxJava: observable streams of events.
compile "io.reactivex:rxjava:$rxjava_version"
- // Kryo: object graph serialization.
- compile "com.esotericsoftware:kryo:4.0.0"
- compile "de.javakaffee:kryo-serializers:0.41"
-
// Apache JEXL: An embeddable expression evaluation library.
// This may be temporary until we experiment with other ways to do on-the-fly contract specialisation via an API.
compile "org.apache.commons:commons-jexl3:3.0"
@@ -98,5 +89,5 @@ jar {
}
publish {
- name = jar.baseName
+ name jar.baseName
}
diff --git a/core/src/main/java/net/corda/core/internal/package-info.java b/core/src/main/java/net/corda/core/internal/package-info.java
new file mode 100644
index 0000000000..aa06d1bace
--- /dev/null
+++ b/core/src/main/java/net/corda/core/internal/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * WARNING: This is an internal package and not part of the public API. Do not use anything found here or in any sub-package.
+ */
+package net.corda.core.internal;
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/CordaException.kt b/core/src/main/kotlin/net/corda/core/CordaException.kt
index 49ed6b6975..4e5353fa77 100644
--- a/core/src/main/kotlin/net/corda/core/CordaException.kt
+++ b/core/src/main/kotlin/net/corda/core/CordaException.kt
@@ -58,9 +58,9 @@ open class CordaException internal constructor(override var originalExceptionCla
}
}
-open class CordaRuntimeException internal constructor(override var originalExceptionClassName: String?,
- private var _message: String? = null,
- private var _cause: Throwable? = null) : RuntimeException(null, null, true, true), CordaThrowable {
+open class CordaRuntimeException(override var originalExceptionClassName: String?,
+ private var _message: String? = null,
+ private var _cause: Throwable? = null) : RuntimeException(null, null, true, true), CordaThrowable {
constructor(message: String?, cause: Throwable?) : this(null, message, cause)
override val message: String?
diff --git a/core/src/main/kotlin/net/corda/core/Streams.kt b/core/src/main/kotlin/net/corda/core/Streams.kt
deleted file mode 100644
index 2f33522c35..0000000000
--- a/core/src/main/kotlin/net/corda/core/Streams.kt
+++ /dev/null
@@ -1,30 +0,0 @@
-package net.corda.core
-
-import java.util.*
-import java.util.Spliterator.*
-import java.util.stream.IntStream
-import java.util.stream.Stream
-import java.util.stream.StreamSupport
-import kotlin.streams.asSequence
-
-private fun IntProgression.spliteratorOfInt(): Spliterator.OfInt {
- val kotlinIterator = iterator()
- val javaIterator = object : PrimitiveIterator.OfInt {
- override fun nextInt() = kotlinIterator.nextInt()
- override fun hasNext() = kotlinIterator.hasNext()
- override fun remove() = throw UnsupportedOperationException("remove")
- }
- val spliterator = Spliterators.spliterator(
- javaIterator,
- (1 + (last - first) / step).toLong(),
- SUBSIZED or IMMUTABLE or NONNULL or SIZED or ORDERED or SORTED or DISTINCT
- )
- return if (step > 0) spliterator else object : Spliterator.OfInt by spliterator {
- override fun getComparator() = Comparator.reverseOrder()
- }
-}
-
-fun IntProgression.stream(): IntStream = StreamSupport.intStream(spliteratorOfInt(), false)
-
-@Suppress("UNCHECKED_CAST") // When toArray has filled in the array, the component type is no longer T? but T (that may itself be nullable).
-inline fun Stream.toTypedArray() = toArray { size -> arrayOfNulls(size) } as Array
diff --git a/core/src/main/kotlin/net/corda/core/Utils.kt b/core/src/main/kotlin/net/corda/core/Utils.kt
index 24d8247df0..d9702f53aa 100644
--- a/core/src/main/kotlin/net/corda/core/Utils.kt
+++ b/core/src/main/kotlin/net/corda/core/Utils.kt
@@ -1,102 +1,16 @@
-// TODO Move out the Kotlin specific stuff into a separate file
@file:JvmName("Utils")
package net.corda.core
-import com.google.common.base.Throwables
-import com.google.common.io.ByteStreams
-import com.google.common.util.concurrent.*
-import net.corda.core.crypto.SecureHash
-import net.corda.core.crypto.sha256
-import net.corda.core.flows.FlowException
-import net.corda.core.serialization.CordaSerializable
-import org.slf4j.Logger
+import net.corda.core.concurrent.CordaFuture
+import net.corda.core.internal.concurrent.openFuture
+import net.corda.core.internal.concurrent.thenMatch
import rx.Observable
import rx.Observer
-import rx.subjects.PublishSubject
-import rx.subjects.UnicastSubject
-import java.io.*
-import java.math.BigDecimal
-import java.nio.charset.Charset
-import java.nio.charset.StandardCharsets.UTF_8
-import java.nio.file.*
-import java.nio.file.attribute.FileAttribute
-import java.time.Duration
-import java.time.temporal.Temporal
-import java.util.concurrent.CompletableFuture
-import java.util.concurrent.ExecutionException
-import java.util.concurrent.Future
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.locks.ReentrantLock
-import java.util.stream.Stream
-import java.util.zip.Deflater
-import java.util.zip.ZipEntry
-import java.util.zip.ZipInputStream
-import java.util.zip.ZipOutputStream
-import kotlin.concurrent.withLock
-import kotlin.reflect.KClass
-import kotlin.reflect.KProperty
-val Int.days: Duration get() = Duration.ofDays(this.toLong())
-@Suppress("unused") // It's here for completeness
-val Int.hours: Duration get() = Duration.ofHours(this.toLong())
-val Int.minutes: Duration get() = Duration.ofMinutes(this.toLong())
-val Int.seconds: Duration get() = Duration.ofSeconds(this.toLong())
-val Int.millis: Duration get() = Duration.ofMillis(this.toLong())
+// TODO Delete this file once the Future stuff is out of here
-
-// TODO: Review by EOY2016 if we ever found these utilities helpful.
-val Int.bd: BigDecimal get() = BigDecimal(this)
-val Double.bd: BigDecimal get() = BigDecimal(this)
-val String.bd: BigDecimal get() = BigDecimal(this)
-val Long.bd: BigDecimal get() = BigDecimal(this)
-
-fun String.abbreviate(maxWidth: Int): String = if (length <= maxWidth) this else take(maxWidth - 1) + "…"
-
-/** Like the + operator but throws an exception in case of integer overflow. */
-infix fun Int.checkedAdd(b: Int) = Math.addExact(this, b)
-
-/** Like the + operator but throws an exception in case of integer overflow. */
-@Suppress("unused")
-infix fun Long.checkedAdd(b: Long) = Math.addExact(this, b)
-
-/** Same as [Future.get] but with a more descriptive name, and doesn't throw [ExecutionException], instead throwing its cause */
-fun Future.getOrThrow(timeout: Duration? = null): T {
- return try {
- if (timeout == null) get() else get(timeout.toNanos(), TimeUnit.NANOSECONDS)
- } catch (e: ExecutionException) {
- throw e.cause!!
- }
-}
-
-fun future(block: () -> V): Future = CompletableFuture.supplyAsync(block)
-
-fun , V> F.then(block: (F) -> V) = addListener(Runnable { block(this) }, MoreExecutors.directExecutor())
-
-fun Future.match(success: (U) -> V, failure: (Throwable) -> V): V {
- return success(try {
- getOrThrow()
- } catch (t: Throwable) {
- return failure(t)
- })
-}
-
-fun ListenableFuture.thenMatch(success: (U) -> V, failure: (Throwable) -> W) = then { it.match(success, failure) }
-fun ListenableFuture<*>.andForget(log: Logger) = then { it.match({}, { log.error("Background task failed:", it) }) }
-@Suppress("UNCHECKED_CAST") // We need the awkward cast because otherwise F cannot be nullable, even though it's safe.
-infix fun ListenableFuture.map(mapper: (F) -> T): ListenableFuture = Futures.transform(this, { (mapper as (F?) -> T)(it) })
-infix fun ListenableFuture.flatMap(mapper: (F) -> ListenableFuture): ListenableFuture = Futures.transformAsync(this) { mapper(it!!) }
-
-/** Executes the given block and sets the future to either the result, or any exception that was thrown. */
-inline fun SettableFuture.catch(block: () -> T) {
- try {
- set(block())
- } catch (t: Throwable) {
- setException(t)
- }
-}
-
-fun ListenableFuture.toObservable(): Observable {
+fun CordaFuture.toObservable(): Observable {
return Observable.create { subscriber ->
thenMatch({
subscriber.onNext(it)
@@ -107,303 +21,26 @@ fun ListenableFuture.toObservable(): Observable {
}
}
-/** Allows you to write code like: Paths.get("someDir") / "subdir" / "filename" but using the Paths API to avoid platform separator problems. */
-operator fun Path.div(other: String): Path = resolve(other)
-operator fun String.div(other: String): Path = Paths.get(this) / other
-
-fun Path.createDirectory(vararg attrs: FileAttribute<*>): Path = Files.createDirectory(this, *attrs)
-fun Path.createDirectories(vararg attrs: FileAttribute<*>): Path = Files.createDirectories(this, *attrs)
-fun Path.exists(vararg options: LinkOption): Boolean = Files.exists(this, *options)
-fun Path.copyToDirectory(targetDir: Path, vararg options: CopyOption): Path {
- require(targetDir.isDirectory()) { "$targetDir is not a directory" }
- val targetFile = targetDir.resolve(fileName)
- Files.copy(this, targetFile, *options)
- return targetFile
-}
-fun Path.moveTo(target: Path, vararg options: CopyOption): Path = Files.move(this, target, *options)
-fun Path.isRegularFile(vararg options: LinkOption): Boolean = Files.isRegularFile(this, *options)
-fun Path.isDirectory(vararg options: LinkOption): Boolean = Files.isDirectory(this, *options)
-val Path.size: Long get() = Files.size(this)
-inline fun Path.list(block: (Stream) -> R): R = Files.list(this).use(block)
-fun Path.deleteIfExists(): Boolean = Files.deleteIfExists(this)
-fun Path.readAll(): ByteArray = Files.readAllBytes(this)
-inline fun Path.read(vararg options: OpenOption, block: (InputStream) -> R): R = Files.newInputStream(this, *options).use(block)
-inline fun Path.write(createDirs: Boolean = false, vararg options: OpenOption = emptyArray(), block: (OutputStream) -> Unit) {
- if (createDirs) {
- normalize().parent?.createDirectories()
- }
- Files.newOutputStream(this, *options).use(block)
-}
-
-inline fun Path.readLines(charset: Charset = UTF_8, block: (Stream) -> R): R = Files.lines(this, charset).use(block)
-fun Path.readAllLines(charset: Charset = UTF_8): List = Files.readAllLines(this, charset)
-fun Path.writeLines(lines: Iterable, charset: Charset = UTF_8, vararg options: OpenOption): Path = Files.write(this, lines, charset, *options)
-
-fun InputStream.copyTo(target: Path, vararg options: CopyOption): Long = Files.copy(this, target, *options)
-
-// Simple infix function to add back null safety that the JDK lacks: timeA until timeB
-infix fun Temporal.until(endExclusive: Temporal): Duration = Duration.between(this, endExclusive)
-
-/** Returns the index of the given item or throws [IllegalArgumentException] if not found. */
-fun List.indexOfOrThrow(item: T): Int {
- val i = indexOf(item)
- require(i != -1)
- return i
-}
-
/**
- * Returns the single element matching the given [predicate], or `null` if element was not found,
- * or throws if more than one element was found.
- */
-fun Iterable.noneOrSingle(predicate: (T) -> Boolean): T? {
- var single: T? = null
- for (element in this) {
- if (predicate(element)) {
- if (single == null) {
- single = element
- } else throw IllegalArgumentException("Collection contains more than one matching element.")
- }
- }
- return single
-}
-
-/** Returns single element, or `null` if element was not found, or throws if more than one element was found. */
-fun Iterable.noneOrSingle(): T? {
- var single: T? = null
- for (element in this) {
- if (single == null) {
- single = element
- } else throw IllegalArgumentException("Collection contains more than one matching element.")
- }
- return single
-}
-
-/** Returns a random element in the list, or null if empty */
-fun List.randomOrNull(): T? {
- if (size <= 1) return firstOrNull()
- val randomIndex = (Math.random() * size).toInt()
- return get(randomIndex)
-}
-
-/** Returns a random element in the list matching the given predicate, or null if none found */
-fun List.randomOrNull(predicate: (T) -> Boolean) = filter(predicate).randomOrNull()
-
-inline fun elapsedTime(block: () -> Unit): Duration {
- val start = System.nanoTime()
- block()
- val end = System.nanoTime()
- return Duration.ofNanos(end - start)
-}
-
-// TODO: Add inline back when a new Kotlin version is released and check if the java.lang.VerifyError
-// returns in the IRSSimulationTest. If not, commit the inline back.
-fun logElapsedTime(label: String, logger: Logger? = null, body: () -> T): T {
- // Use nanoTime as it's monotonic.
- val now = System.nanoTime()
- try {
- return body()
- } finally {
- val elapsed = Duration.ofNanos(System.nanoTime() - now).toMillis()
- if (logger != null)
- logger.info("$label took $elapsed msec")
- else
- println("$label took $elapsed msec")
- }
-}
-
-fun Logger.logElapsedTime(label: String, body: () -> T): T = logElapsedTime(label, this, body)
-
-/**
- * A threadbox is a simple utility that makes it harder to forget to take a lock before accessing some shared state.
- * Simply define a private class to hold the data that must be grouped under the same lock, and then pass the only
- * instance to the ThreadBox constructor. You can now use the [locked] method with a lambda to take the lock in a
- * way that ensures it'll be released if there's an exception.
- *
- * Note that this technique is not infallible: if you capture a reference to the fields in another lambda which then
- * gets stored and invoked later, there may still be unsafe multi-threaded access going on, so watch out for that.
- * This is just a simple guard rail that makes it harder to slip up.
- *
- * Example:
- *
- * private class MutableState { var i = 5 }
- * private val state = ThreadBox(MutableState())
- *
- * val ii = state.locked { i }
- */
-class ThreadBox(val content: T, val lock: ReentrantLock = ReentrantLock()) {
- inline fun locked(body: T.() -> R): R = lock.withLock { body(content) }
- inline fun alreadyLocked(body: T.() -> R): R {
- check(lock.isHeldByCurrentThread, { "Expected $lock to already be locked." })
- return body(content)
- }
-
- fun checkNotLocked() = check(!lock.isHeldByCurrentThread)
-}
-
-/**
- * This represents a transient exception or condition that might no longer be thrown if the operation is re-run or called
- * again.
- *
- * We avoid the use of the word transient here to hopefully reduce confusion with the term in relation to (Java) serialization.
- */
-@CordaSerializable
-abstract class RetryableException(message: String) : FlowException(message)
-
-/**
- * A simple wrapper that enables the use of Kotlin's "val x by TransientProperty { ... }" syntax. Such a property
- * will not be serialized to disk, and if it's missing (or the first time it's accessed), the initializer will be
- * used to set it up. Note that the initializer will be called with the TransientProperty object locked.
- */
-class TransientProperty(private val initializer: () -> T) {
- @Transient private var v: T? = null
-
- @Synchronized
- operator fun getValue(thisRef: Any?, property: KProperty<*>) = v ?: initializer().also { v = it }
-}
-
-/**
- * Given a path to a zip file, extracts it to the given directory.
- */
-fun extractZipFile(zipFile: Path, toDirectory: Path) = extractZipFile(Files.newInputStream(zipFile), toDirectory)
-
-/**
- * Given a zip file input stream, extracts it to the given directory.
- */
-fun extractZipFile(inputStream: InputStream, toDirectory: Path) {
- val normalisedDirectory = toDirectory.normalize().createDirectories()
- ZipInputStream(BufferedInputStream(inputStream)).use {
- while (true) {
- val e = it.nextEntry ?: break
- val outPath = (normalisedDirectory / e.name).normalize()
-
- // Security checks: we should reject a zip that contains tricksy paths that try to escape toDirectory.
- check(outPath.startsWith(normalisedDirectory)) { "ZIP contained a path that resolved incorrectly: ${e.name}" }
-
- if (e.isDirectory) {
- outPath.createDirectories()
- continue
- }
- outPath.write { out ->
- ByteStreams.copy(it, out)
- }
- it.closeEntry()
- }
- }
-}
-
-/**
- * Get a valid InputStream from an in-memory zip as required for tests.
- * Note that a slightly bigger than numOfExpectedBytes size is expected.
- */
-@Throws(IllegalArgumentException::class)
-fun sizedInputStreamAndHash(numOfExpectedBytes: Int): InputStreamAndHash {
- if (numOfExpectedBytes <= 0) throw IllegalArgumentException("A positive number of numOfExpectedBytes is required.")
- val baos = ByteArrayOutputStream()
- ZipOutputStream(baos).use({ zos ->
- val arraySize = 1024
- val bytes = ByteArray(arraySize)
- val n = (numOfExpectedBytes - 1) / arraySize + 1 // same as Math.ceil(numOfExpectedBytes/arraySize).
- zos.setLevel(Deflater.NO_COMPRESSION)
- zos.putNextEntry(ZipEntry("z"))
- for (i in 0 until n) {
- zos.write(bytes, 0, arraySize)
- }
- zos.closeEntry()
- })
- return getInputStreamAndHashFromOutputStream(baos)
-}
-
-/** Convert a [ByteArrayOutputStream] to [InputStreamAndHash]. */
-fun getInputStreamAndHashFromOutputStream(baos: ByteArrayOutputStream): InputStreamAndHash {
- // TODO: Consider converting OutputStream to InputStream without creating a ByteArray, probably using piped streams.
- val bytes = baos.toByteArray()
- // TODO: Consider calculating sha256 on the fly using a DigestInputStream.
- return InputStreamAndHash(ByteArrayInputStream(bytes), bytes.sha256())
-}
-
-data class InputStreamAndHash(val inputStream: InputStream, val sha256: SecureHash.SHA256)
-
-// TODO: Generic csv printing utility for clases.
-
-val Throwable.rootCause: Throwable get() = Throwables.getRootCause(this)
-
-/**
- * Returns an Observable that buffers events until subscribed.
- * @see UnicastSubject
- */
-fun Observable.bufferUntilSubscribed(): Observable {
- val subject = UnicastSubject.create()
- val subscription = subscribe(subject)
- return subject.doOnUnsubscribe { subscription.unsubscribe() }
-}
-
-/**
- * Copy an [Observer] to multiple other [Observer]s.
- */
-fun Observer.tee(vararg teeTo: Observer): Observer {
- val subject = PublishSubject.create()
- subject.subscribe(this)
- teeTo.forEach { subject.subscribe(it) }
- return subject
-}
-
-/**
- * Returns a [ListenableFuture] bound to the *first* item emitted by this Observable. The future will complete with a
+ * Returns a [CordaFuture] bound to the *first* item emitted by this Observable. The future will complete with a
* NoSuchElementException if no items are emitted or any other error thrown by the Observable. If it's cancelled then
* it will unsubscribe from the observable.
*/
-fun Observable.toFuture(): ListenableFuture = ObservableToFuture(this)
+fun Observable.toFuture(): CordaFuture = openFuture().also {
+ val subscription = first().subscribe(object : Observer {
+ override fun onNext(value: T) {
+ it.set(value)
+ }
-private class ObservableToFuture(observable: Observable) : AbstractFuture(), Observer {
- private val subscription = observable.first().subscribe(this)
- override fun onNext(value: T) {
- set(value)
- }
+ override fun onError(e: Throwable) {
+ it.setException(e)
+ }
- override fun onError(e: Throwable) {
- setException(e)
- }
-
- override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
- subscription.unsubscribe()
- return super.cancel(mayInterruptIfRunning)
- }
-
- override fun onCompleted() {}
-}
-
-/** Return the sum of an Iterable of [BigDecimal]s. */
-fun Iterable.sum(): BigDecimal = fold(BigDecimal.ZERO) { a, b -> a + b }
-
-fun codePointsString(vararg codePoints: Int): String {
- val builder = StringBuilder()
- codePoints.forEach { builder.append(Character.toChars(it)) }
- return builder.toString()
-}
-
-fun Class.checkNotUnorderedHashMap() {
- if (HashMap::class.java.isAssignableFrom(this) && !LinkedHashMap::class.java.isAssignableFrom(this)) {
- throw NotSerializableException("Map type $this is unstable under iteration. Suggested fix: use LinkedHashMap instead.")
- }
-}
-
-fun Class<*>.requireExternal(msg: String = "Internal class")
- = require(!name.startsWith("net.corda.node.") && !name.contains(".internal.")) { "$msg: $name" }
-
-interface DeclaredField {
- companion object {
- inline fun Any?.declaredField(clazz: KClass<*>, name: String): DeclaredField = declaredField(clazz.java, name)
- inline fun Any.declaredField(name: String): DeclaredField = declaredField(javaClass, name)
- inline fun Any?.declaredField(clazz: Class<*>, name: String): DeclaredField {
- val javaField = clazz.getDeclaredField(name).apply { isAccessible = true }
- val receiver = this
- return object : DeclaredField {
- override var value
- get() = javaField.get(receiver) as T
- set(value) = javaField.set(receiver, value)
- }
+ override fun onCompleted() {}
+ })
+ it.then {
+ if (it.isCancelled) {
+ subscription.unsubscribe()
}
}
-
- var value: T
}
diff --git a/core/src/main/kotlin/net/corda/core/concurrent/ConcurrencyUtils.kt b/core/src/main/kotlin/net/corda/core/concurrent/ConcurrencyUtils.kt
index 8ab4d6f4e1..fd0947e1ca 100644
--- a/core/src/main/kotlin/net/corda/core/concurrent/ConcurrencyUtils.kt
+++ b/core/src/main/kotlin/net/corda/core/concurrent/ConcurrencyUtils.kt
@@ -1,34 +1,44 @@
package net.corda.core.concurrent
-import com.google.common.annotations.VisibleForTesting
-import com.google.common.util.concurrent.ListenableFuture
-import com.google.common.util.concurrent.SettableFuture
-import net.corda.core.catch
-import net.corda.core.match
-import net.corda.core.then
+import net.corda.core.internal.concurrent.openFuture
+import net.corda.core.utilities.getOrThrow
+import net.corda.core.internal.VisibleForTesting
import org.slf4j.Logger
import org.slf4j.LoggerFactory
+import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicBoolean
+/** Invoke [getOrThrow] and pass the value/throwable to success/failure respectively. */
+fun Future.match(success: (V) -> W, failure: (Throwable) -> W): W {
+ val value = try {
+ getOrThrow()
+ } catch (t: Throwable) {
+ return failure(t)
+ }
+ return success(value)
+}
+
/**
* As soon as a given future becomes done, the handler is invoked with that future as its argument.
* The result of the handler is copied into the result future, and the handler isn't invoked again.
* If a given future errors after the result future is done, the error is automatically logged.
*/
-fun firstOf(vararg futures: ListenableFuture, handler: (ListenableFuture) -> T) = firstOf(futures, defaultLog, handler)
+fun firstOf(vararg futures: CordaFuture, handler: (CordaFuture) -> W) = firstOf(futures, defaultLog, handler)
private val defaultLog = LoggerFactory.getLogger("net.corda.core.concurrent")
@VisibleForTesting
internal val shortCircuitedTaskFailedMessage = "Short-circuited task failed:"
-internal fun firstOf(futures: Array>, log: Logger, handler: (ListenableFuture) -> T): ListenableFuture {
- val resultFuture = SettableFuture.create()
+internal fun firstOf(futures: Array>, log: Logger, handler: (CordaFuture) -> W): CordaFuture {
+ val resultFuture = openFuture()
val winnerChosen = AtomicBoolean()
futures.forEach {
it.then {
if (winnerChosen.compareAndSet(false, true)) {
- resultFuture.catch { handler(it) }
- } else if (!it.isCancelled) {
+ resultFuture.capture { handler(it) }
+ } else if (it.isCancelled) {
+ // Do nothing.
+ } else {
it.match({}, { log.error(shortCircuitedTaskFailedMessage, it) })
}
}
diff --git a/core/src/main/kotlin/net/corda/core/concurrent/CordaFuture.kt b/core/src/main/kotlin/net/corda/core/concurrent/CordaFuture.kt
new file mode 100644
index 0000000000..3ecab2e395
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/concurrent/CordaFuture.kt
@@ -0,0 +1,22 @@
+package net.corda.core.concurrent
+
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.Future
+
+/**
+ * Same as [Future] with additional methods to provide some of the features of [java.util.concurrent.CompletableFuture] while minimising the API surface area.
+ * In Kotlin, to avoid compile errors, whenever CordaFuture is used in a parameter or extension method receiver type, its type parameter should be specified with out variance.
+ */
+interface CordaFuture : Future {
+ /**
+ * Run the given callback when this future is done, on the completion thread.
+ * If the completion thread is problematic for you e.g. deadlock, you can submit to an executor manually.
+ * If callback fails, its throwable is logged.
+ */
+ fun then(callback: (CordaFuture) -> W): Unit
+
+ /**
+ * @return a new [CompletableFuture] with the same outcome as this Future.
+ */
+ fun toCompletableFuture(): CompletableFuture
+}
diff --git a/core/src/main/kotlin/net/corda/core/contracts/Amount.kt b/core/src/main/kotlin/net/corda/core/contracts/Amount.kt
index 0569c6751c..e08ddce671 100644
--- a/core/src/main/kotlin/net/corda/core/contracts/Amount.kt
+++ b/core/src/main/kotlin/net/corda/core/contracts/Amount.kt
@@ -1,5 +1,8 @@
package net.corda.core.contracts
+import net.corda.core.crypto.composite.CompositeKey
+import net.corda.core.utilities.exactAdd
+import net.corda.core.identity.Party
import net.corda.core.serialization.CordaSerializable
import java.math.BigDecimal
import java.math.RoundingMode
@@ -168,7 +171,7 @@ data class Amount(val quantity: Long, val displayTokenSize: BigDecimal,
*/
operator fun plus(other: Amount): Amount {
checkToken(other)
- return Amount(Math.addExact(quantity, other.quantity), displayTokenSize, token)
+ return Amount(quantity exactAdd other.quantity, displayTokenSize, token)
}
/**
@@ -268,9 +271,9 @@ data class SourceAndAmount(val source: P, val amount: Amou
* but in various scenarios it may be more consistent to allow positive and negative values.
* For example it is common for a bank to code asset flows as gains and losses from its perspective i.e. always the destination.
* @param token represents the type of asset token as would be used to construct Amount objects.
- * @param source is the [Party], [Account], [CompositeKey], or other identifier of the token source if quantityDelta is positive,
+ * @param source is the [Party], [CompositeKey], or other identifier of the token source if quantityDelta is positive,
* or the token sink if quantityDelta is negative. The type P should support value equality.
- * @param destination is the [Party], [Account], [CompositeKey], or other identifier of the token sink if quantityDelta is positive,
+ * @param destination is the [Party], [CompositeKey], or other identifier of the token sink if quantityDelta is positive,
* or the token source if quantityDelta is negative. The type P should support value equality.
*/
@CordaSerializable
@@ -329,7 +332,7 @@ class AmountTransfer(val quantityDelta: Long,
"Only AmountTransfer between the same two parties can be aggregated/netted"
}
return if (other.source == source) {
- AmountTransfer(Math.addExact(quantityDelta, other.quantityDelta), token, source, destination)
+ AmountTransfer(quantityDelta exactAdd other.quantityDelta, token, source, destination)
} else {
AmountTransfer(Math.subtractExact(quantityDelta, other.quantityDelta), token, source, destination)
}
@@ -388,10 +391,10 @@ class AmountTransfer(val quantityDelta: Long,
* relative asset exchange happens, but with each party exchanging versus a central counterparty, or clearing house.
*
* @param centralParty The central party to face the exchange against.
- * @return Returns two new AmountTransfers each between one of the original parties and the centralParty.
+ * @return Returns a list of two new AmountTransfers each between one of the original parties and the centralParty.
* The net total exchange is the same as in the original input.
*/
- fun novate(centralParty: P): Pair, AmountTransfer> = Pair(copy(destination = centralParty), copy(source = centralParty))
+ fun novate(centralParty: P): List> = listOf(copy(destination = centralParty), copy(source = centralParty))
/**
* Applies this AmountTransfer to a list of [SourceAndAmount] objects representing balances.
diff --git a/core/src/main/kotlin/net/corda/core/contracts/ContractsDSL.kt b/core/src/main/kotlin/net/corda/core/contracts/ContractsDSL.kt
index 4446c3c04f..fd38e9aa99 100644
--- a/core/src/main/kotlin/net/corda/core/contracts/ContractsDSL.kt
+++ b/core/src/main/kotlin/net/corda/core/contracts/ContractsDSL.kt
@@ -2,6 +2,7 @@
package net.corda.core.contracts
+import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
import java.math.BigDecimal
import java.security.PublicKey
@@ -54,13 +55,6 @@ object Requirements {
infix inline fun String.using(expr: Boolean) {
if (!expr) throw IllegalArgumentException("Failed requirement: $this")
}
- // Avoid overloading Kotlin keywords
- @Deprecated("This function is deprecated, use 'using' instead",
- ReplaceWith("using (expr)", "net.corda.core.contracts.Requirements.using"))
- @Suppress("NOTHING_TO_INLINE") // Inlining this takes it out of our committed ABI.
- infix inline fun String.by(expr: Boolean) {
- using(expr)
- }
}
inline fun requireThat(body: Requirements.() -> R) = Requirements.body()
@@ -71,7 +65,7 @@ inline fun requireThat(body: Requirements.() -> R) = Requirements.body()
/** Filters the command list by type, party and public key all at once. */
inline fun Collection>.select(signer: PublicKey? = null,
- party: Party? = null) =
+ party: AbstractParty? = null) =
filter { it.value is T }.
filter { if (signer == null) true else signer in it.signers }.
filter { if (party == null) true else party in it.signingParties }.
diff --git a/core/src/main/kotlin/net/corda/core/contracts/Structures.kt b/core/src/main/kotlin/net/corda/core/contracts/Structures.kt
index 0740a4a7e4..f820cdd674 100644
--- a/core/src/main/kotlin/net/corda/core/contracts/Structures.kt
+++ b/core/src/main/kotlin/net/corda/core/contracts/Structures.kt
@@ -1,19 +1,24 @@
+@file:JvmName("Structures")
+
package net.corda.core.contracts
-import net.corda.core.contracts.clauses.Clause
import net.corda.core.crypto.SecureHash
+import net.corda.core.crypto.secureRandomBytes
import net.corda.core.flows.FlowLogicRef
import net.corda.core.flows.FlowLogicRefFactory
import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party
-import net.corda.core.serialization.*
+import net.corda.core.serialization.CordaSerializable
+import net.corda.core.serialization.MissingAttachmentsException
+import net.corda.core.serialization.SerializeAsTokenContext
+import net.corda.core.serialization.serialize
+import net.corda.core.transactions.LedgerTransaction
import net.corda.core.utilities.OpaqueBytes
import java.io.FileNotFoundException
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
import java.security.PublicKey
-import java.time.Duration
import java.time.Instant
import java.util.jar.JarInputStream
@@ -76,7 +81,7 @@ interface ContractState {
* A _participant_ is any party that is able to consume this state in a valid transaction.
*
* The list of participants is required for certain types of transactions. For example, when changing the notary
- * for this state ([TransactionType.NotaryChange]), every participant has to be involved and approve the transaction
+ * for this state, every participant has to be involved and approve the transaction
* so that they receive the updated state, and don't end up in a situation where they can no longer use a state
* they possess, since someone consumed that state during the notary change process.
*
@@ -141,6 +146,12 @@ data class Issued(val issuer: PartyAndReference, val product: P) {
fun Amount>.withoutIssuer(): Amount = Amount(quantity, token.product)
// DOCSTART 3
+
+/**
+ * Return structure for [OwnableState.withNewOwner]
+ */
+data class CommandAndState(val command: CommandData, val ownableState: OwnableState)
+
/**
* A contract state that can have a single owner.
*/
@@ -149,7 +160,7 @@ interface OwnableState : ContractState {
val owner: AbstractParty
/** Copies the underlying data structure, replacing the owner field with this new value and leaving the rest alone */
- fun withNewOwner(newOwner: AbstractParty): Pair
+ fun withNewOwner(newOwner: AbstractParty): CommandAndState
}
// DOCEND 3
@@ -199,26 +210,6 @@ interface LinearState : ContractState {
* True if this should be tracked by our vault(s).
*/
fun isRelevant(ourKeys: Set