mirror of
https://github.com/corda/corda.git
synced 2025-02-22 18:12:53 +00:00
Merge remote-tracking branch 'remotes/origin/master' into mnesbit-cor-261-artemis-over-ssl
# Conflicts: # build.gradle
This commit is contained in:
commit
e3aef96b09
12
build.gradle
12
build.gradle
@ -25,8 +25,8 @@ buildscript {
|
|||||||
ext.jetty_version = '9.3.9.v20160517'
|
ext.jetty_version = '9.3.9.v20160517'
|
||||||
ext.jersey_version = '2.23.1'
|
ext.jersey_version = '2.23.1'
|
||||||
ext.jolokia_version = '2.0.0-M1'
|
ext.jolokia_version = '2.0.0-M1'
|
||||||
ext.slf4j_version = '1.7.21'
|
|
||||||
ext.assertj_version = '3.5.1'
|
ext.assertj_version = '3.5.1'
|
||||||
|
ext.log4j_version = '2.6.2'
|
||||||
ext.bouncycastle_version = '1.54'
|
ext.bouncycastle_version = '1.54'
|
||||||
|
|
||||||
repositories {
|
repositories {
|
||||||
@ -64,6 +64,16 @@ sourceSets {
|
|||||||
srcDir file('src/integration-test/kotlin')
|
srcDir file('src/integration-test/kotlin')
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
main {
|
||||||
|
resources {
|
||||||
|
srcDir "config/dev"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
test {
|
||||||
|
resources {
|
||||||
|
srcDir "config/test"
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//noinspection GroovyAssignabilityCheck
|
//noinspection GroovyAssignabilityCheck
|
||||||
|
55
config/dev/log4j2.xml
Normal file
55
config/dev/log4j2.xml
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<Configuration status="info">
|
||||||
|
|
||||||
|
<Properties>
|
||||||
|
<Property name="log-path">logs</Property>
|
||||||
|
<Property name="log-name">node-${hostName}</Property>
|
||||||
|
<Property name="archive">${log-path}/archive</Property>
|
||||||
|
</Properties>
|
||||||
|
|
||||||
|
<ThresholdFilter level="trace"/>
|
||||||
|
|
||||||
|
<Appenders>
|
||||||
|
<Console name="Console-Appender" target="SYSTEM_OUT">
|
||||||
|
<PatternLayout>
|
||||||
|
<pattern>
|
||||||
|
[%-5level] %d{HH:mm:ss.SSS} [%t] %c{1}.%M - %msg%n
|
||||||
|
</pattern>>
|
||||||
|
</PatternLayout>
|
||||||
|
</Console>
|
||||||
|
|
||||||
|
<!-- Will generate up to 10 log files for a given day. During every rollover it will delete
|
||||||
|
those that are older than 60 days, but keep the most recent 10 GB -->
|
||||||
|
<RollingFile name="RollingFile-Appender"
|
||||||
|
fileName="${log-path}/${log-name}.log"
|
||||||
|
filePattern="${archive}/${log-name}.%d{yyyy-MM-dd}-%i.log.gz">
|
||||||
|
|
||||||
|
<PatternLayout pattern="[%-5level] %d{ISO8601}{GMT+0} [%t] %c{1} - %msg%n"/>
|
||||||
|
|
||||||
|
<Policies>
|
||||||
|
<TimeBasedTriggeringPolicy/>
|
||||||
|
<SizeBasedTriggeringPolicy size="10MB"/>
|
||||||
|
</Policies>
|
||||||
|
|
||||||
|
<DefaultRolloverStrategy min="1" max="10">
|
||||||
|
<Delete basePath="${archive}" maxDepth="1">
|
||||||
|
<IfFileName glob="${log-name}*.log.gz"/>
|
||||||
|
<IfLastModified age="60d">
|
||||||
|
<IfAny>
|
||||||
|
<IfAccumulatedFileSize exceeds="10 GB"/>
|
||||||
|
</IfAny>
|
||||||
|
</IfLastModified>
|
||||||
|
</Delete>
|
||||||
|
</DefaultRolloverStrategy>
|
||||||
|
|
||||||
|
</RollingFile>
|
||||||
|
</Appenders>
|
||||||
|
|
||||||
|
<Loggers>
|
||||||
|
<Root level="trace">
|
||||||
|
<AppenderRef ref="Console-Appender" level="info"/>
|
||||||
|
<AppenderRef ref="RollingFile-Appender" level="debug"/>
|
||||||
|
</Root>
|
||||||
|
</Loggers>
|
||||||
|
|
||||||
|
</Configuration>
|
17
config/test/log4j2.xml
Normal file
17
config/test/log4j2.xml
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<Configuration status="info">
|
||||||
|
<Appenders>
|
||||||
|
<Console name="Console-Appender" target="SYSTEM_OUT">
|
||||||
|
<PatternLayout>
|
||||||
|
<pattern>
|
||||||
|
[%-5level] %d{HH:mm:ss.SSS} [%t] %c{1}.%M - %msg%n
|
||||||
|
</pattern>>
|
||||||
|
</PatternLayout>
|
||||||
|
</Console>
|
||||||
|
</Appenders>
|
||||||
|
<Loggers>
|
||||||
|
<Root level="info">
|
||||||
|
<AppenderRef ref="Console-Appender" level="info"/>
|
||||||
|
</Root>
|
||||||
|
</Loggers>
|
||||||
|
</Configuration>
|
@ -81,3 +81,11 @@ dependencies {
|
|||||||
|
|
||||||
testCompile 'junit:junit:4.12'
|
testCompile 'junit:junit:4.12'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sourceSets {
|
||||||
|
test {
|
||||||
|
resources {
|
||||||
|
srcDir "../config/test"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -79,3 +79,11 @@ repositories {
|
|||||||
dependencies {
|
dependencies {
|
||||||
compile project(':core')
|
compile project(':core')
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sourceSets {
|
||||||
|
test {
|
||||||
|
resources {
|
||||||
|
srcDir "../../config/test"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -21,6 +21,14 @@ repositories {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sourceSets {
|
||||||
|
test {
|
||||||
|
resources {
|
||||||
|
srcDir "../config/test"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
testCompile 'junit:junit:4.12'
|
testCompile 'junit:junit:4.12'
|
||||||
testCompile "commons-fileupload:commons-fileupload:1.3.2"
|
testCompile "commons-fileupload:commons-fileupload:1.3.2"
|
||||||
@ -35,12 +43,13 @@ dependencies {
|
|||||||
// Thread safety annotations
|
// Thread safety annotations
|
||||||
compile "com.google.code.findbugs:jsr305:3.0.1"
|
compile "com.google.code.findbugs:jsr305:3.0.1"
|
||||||
|
|
||||||
|
// Log4J: logging framework (with SLF4J bindings)
|
||||||
|
compile "org.apache.logging.log4j:log4j-slf4j-impl:${log4j_version}"
|
||||||
|
compile "org.apache.logging.log4j:log4j-core:${log4j_version}"
|
||||||
|
|
||||||
// AssertJ: for fluent assertions for testing
|
// AssertJ: for fluent assertions for testing
|
||||||
testCompile "org.assertj:assertj-core:${assertj_version}"
|
testCompile "org.assertj:assertj-core:${assertj_version}"
|
||||||
|
|
||||||
// SLF4J: Logging framework.
|
|
||||||
compile "org.slf4j:slf4j-jdk14:${slf4j_version}"
|
|
||||||
|
|
||||||
// Guava: Google utilities library.
|
// Guava: Google utilities library.
|
||||||
compile "com.google.guava:guava:19.0"
|
compile "com.google.guava:guava:19.0"
|
||||||
|
|
||||||
|
@ -19,6 +19,14 @@ configurations {
|
|||||||
runtime.exclude module: 'isolated'
|
runtime.exclude module: 'isolated'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sourceSets {
|
||||||
|
test {
|
||||||
|
resources {
|
||||||
|
srcDir "../config/test"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// To find potential version conflicts, run "gradle htmlDependencyReport" and then look in
|
// To find potential version conflicts, run "gradle htmlDependencyReport" and then look in
|
||||||
// build/reports/project/dependencies/index.html for green highlighted parts of the tree.
|
// build/reports/project/dependencies/index.html for green highlighted parts of the tree.
|
||||||
|
|
||||||
@ -26,7 +34,10 @@ dependencies {
|
|||||||
compile project(':contracts')
|
compile project(':contracts')
|
||||||
|
|
||||||
compile "com.google.code.findbugs:jsr305:3.0.1"
|
compile "com.google.code.findbugs:jsr305:3.0.1"
|
||||||
compile "org.slf4j:slf4j-jdk14:${slf4j_version}"
|
|
||||||
|
// Log4J: logging framework (with SLF4J bindings)
|
||||||
|
compile "org.apache.logging.log4j:log4j-slf4j-impl:${log4j_version}"
|
||||||
|
compile "org.apache.logging.log4j:log4j-core:${log4j_version}"
|
||||||
|
|
||||||
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
|
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
|
||||||
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
|
compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
package com.r3corda.node.services.api
|
package com.r3corda.node.services.api
|
||||||
|
|
||||||
import com.r3corda.core.serialization.SerializedBytes
|
import com.r3corda.core.serialization.SerializedBytes
|
||||||
|
import com.r3corda.node.services.statemachine.FiberRequest
|
||||||
import com.r3corda.node.services.statemachine.ProtocolStateMachineImpl
|
import com.r3corda.node.services.statemachine.ProtocolStateMachineImpl
|
||||||
|
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thread-safe storage of fiber checkpoints.
|
* Thread-safe storage of fiber checkpoints.
|
||||||
@ -31,7 +33,5 @@ interface CheckpointStorage {
|
|||||||
// This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo).
|
// This class will be serialised, so everything it points to transitively must also be serialisable (with Kryo).
|
||||||
data class Checkpoint(
|
data class Checkpoint(
|
||||||
val serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>,
|
val serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>,
|
||||||
val awaitingTopic: String?,
|
val request: FiberRequest?
|
||||||
val awaitingPayloadType: String?,
|
|
||||||
val receivedPayload: Any?
|
|
||||||
)
|
)
|
@ -0,0 +1,81 @@
|
|||||||
|
package com.r3corda.node.services.statemachine
|
||||||
|
|
||||||
|
import com.r3corda.core.crypto.Party
|
||||||
|
|
||||||
|
// TODO: Clean this up
|
||||||
|
sealed class FiberRequest(val topic: String,
|
||||||
|
val destination: Party?,
|
||||||
|
val sessionIDForSend: Long,
|
||||||
|
val sessionIDForReceive: Long,
|
||||||
|
val payload: Any?) {
|
||||||
|
// This is used to identify where we suspended, in case of message mismatch errors and other things where we
|
||||||
|
// don't have the original stack trace because it's in a suspended fiber.
|
||||||
|
@Transient
|
||||||
|
val stackTraceInCaseOfProblems: StackSnapshot? = StackSnapshot()
|
||||||
|
|
||||||
|
val receiveTopic: String
|
||||||
|
get() = topic + "." + sessionIDForReceive
|
||||||
|
|
||||||
|
|
||||||
|
override fun equals(other: Any?): Boolean
|
||||||
|
= if (other is FiberRequest) {
|
||||||
|
topic == other.topic
|
||||||
|
&& destination == other.destination
|
||||||
|
&& sessionIDForSend == other.sessionIDForSend
|
||||||
|
&& sessionIDForReceive == other.sessionIDForReceive
|
||||||
|
&& payload == other.payload
|
||||||
|
} else
|
||||||
|
false
|
||||||
|
|
||||||
|
override fun hashCode(): Int {
|
||||||
|
var hash = 1L
|
||||||
|
|
||||||
|
hash = (hash * 31) + topic.hashCode()
|
||||||
|
hash = (hash * 31) + if (destination == null)
|
||||||
|
0
|
||||||
|
else
|
||||||
|
destination.hashCode()
|
||||||
|
hash = (hash * 31) + sessionIDForReceive
|
||||||
|
hash = (hash * 31) + sessionIDForReceive
|
||||||
|
hash = (hash * 31) + if (payload == null)
|
||||||
|
0
|
||||||
|
else
|
||||||
|
payload.hashCode()
|
||||||
|
|
||||||
|
return hash.toInt()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A fiber which is expecting a message response.
|
||||||
|
*/
|
||||||
|
class ExpectingResponse<R : Any>(
|
||||||
|
topic: String,
|
||||||
|
destination: Party?,
|
||||||
|
sessionIDForSend: Long,
|
||||||
|
sessionIDForReceive: Long,
|
||||||
|
obj: Any?,
|
||||||
|
type: Class<R>
|
||||||
|
) : FiberRequest(topic, destination, sessionIDForSend, sessionIDForReceive, obj) {
|
||||||
|
private val responseTypeName: String = type.name
|
||||||
|
|
||||||
|
override fun equals(other: Any?): Boolean
|
||||||
|
= if (other is ExpectingResponse<*>) {
|
||||||
|
super.equals(other)
|
||||||
|
&& responseTypeName == other.responseTypeName
|
||||||
|
} else
|
||||||
|
false
|
||||||
|
|
||||||
|
override fun toString(): String {
|
||||||
|
return "Expecting response via topic ${receiveTopic} of type ${responseTypeName}"
|
||||||
|
}
|
||||||
|
val responseType: Class<R>
|
||||||
|
get() = Class.forName(responseTypeName) as Class<R>
|
||||||
|
}
|
||||||
|
|
||||||
|
class NotExpectingResponse(
|
||||||
|
topic: String,
|
||||||
|
destination: Party,
|
||||||
|
sessionIDForSend: Long,
|
||||||
|
obj: Any?
|
||||||
|
) : FiberRequest(topic, destination, sessionIDForSend, -1, obj)
|
||||||
|
}
|
@ -28,7 +28,7 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>,
|
|||||||
|
|
||||||
// These fields shouldn't be serialised, so they are marked @Transient.
|
// These fields shouldn't be serialised, so they are marked @Transient.
|
||||||
@Transient lateinit override var serviceHub: ServiceHubInternal
|
@Transient lateinit override var serviceHub: ServiceHubInternal
|
||||||
@Transient internal lateinit var suspendAction: (StateMachineManager.FiberRequest) -> Unit
|
@Transient internal lateinit var suspendAction: (FiberRequest) -> Unit
|
||||||
@Transient internal lateinit var actionOnEnd: () -> Unit
|
@Transient internal lateinit var actionOnEnd: () -> Unit
|
||||||
@Transient internal var receivedPayload: Any? = null
|
@Transient internal var receivedPayload: Any? = null
|
||||||
|
|
||||||
@ -72,7 +72,7 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>,
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Suspendable @Suppress("UNCHECKED_CAST")
|
@Suspendable @Suppress("UNCHECKED_CAST")
|
||||||
private fun <T : Any> suspendAndExpectReceive(with: StateMachineManager.FiberRequest): UntrustworthyData<T> {
|
private fun <T : Any> suspendAndExpectReceive(with: FiberRequest): UntrustworthyData<T> {
|
||||||
suspend(with)
|
suspend(with)
|
||||||
check(receivedPayload != null) { "Expected to receive something" }
|
check(receivedPayload != null) { "Expected to receive something" }
|
||||||
val untrustworthy = UntrustworthyData(receivedPayload as T)
|
val untrustworthy = UntrustworthyData(receivedPayload as T)
|
||||||
@ -87,24 +87,24 @@ class ProtocolStateMachineImpl<R>(val logic: ProtocolLogic<R>,
|
|||||||
sessionIDForReceive: Long,
|
sessionIDForReceive: Long,
|
||||||
payload: Any,
|
payload: Any,
|
||||||
recvType: Class<T>): UntrustworthyData<T> {
|
recvType: Class<T>): UntrustworthyData<T> {
|
||||||
val result = StateMachineManager.FiberRequest.ExpectingResponse(topic, destination, sessionIDForSend, sessionIDForReceive, payload, recvType)
|
val result = FiberRequest.ExpectingResponse(topic, destination, sessionIDForSend, sessionIDForReceive, payload, recvType)
|
||||||
return suspendAndExpectReceive(result)
|
return suspendAndExpectReceive(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
override fun <T : Any> receive(topic: String, sessionIDForReceive: Long, recvType: Class<T>): UntrustworthyData<T> {
|
override fun <T : Any> receive(topic: String, sessionIDForReceive: Long, recvType: Class<T>): UntrustworthyData<T> {
|
||||||
val result = StateMachineManager.FiberRequest.ExpectingResponse(topic, null, -1, sessionIDForReceive, null, recvType)
|
val result = FiberRequest.ExpectingResponse(topic, null, -1, sessionIDForReceive, null, recvType)
|
||||||
return suspendAndExpectReceive(result)
|
return suspendAndExpectReceive(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
override fun send(topic: String, destination: Party, sessionID: Long, payload: Any) {
|
override fun send(topic: String, destination: Party, sessionID: Long, payload: Any) {
|
||||||
val result = StateMachineManager.FiberRequest.NotExpectingResponse(topic, destination, sessionID, payload)
|
val result = FiberRequest.NotExpectingResponse(topic, destination, sessionID, payload)
|
||||||
suspend(result)
|
suspend(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
private fun suspend(with: StateMachineManager.FiberRequest) {
|
private fun suspend(with: FiberRequest) {
|
||||||
parkAndSerialize { fiber, serializer ->
|
parkAndSerialize { fiber, serializer ->
|
||||||
try {
|
try {
|
||||||
suspendAction(with)
|
suspendAction(with)
|
||||||
|
@ -8,7 +8,6 @@ import com.esotericsoftware.kryo.Kryo
|
|||||||
import com.google.common.base.Throwables
|
import com.google.common.base.Throwables
|
||||||
import com.google.common.util.concurrent.ListenableFuture
|
import com.google.common.util.concurrent.ListenableFuture
|
||||||
import com.r3corda.core.abbreviate
|
import com.r3corda.core.abbreviate
|
||||||
import com.r3corda.core.crypto.Party
|
|
||||||
import com.r3corda.core.messaging.runOnNextMessage
|
import com.r3corda.core.messaging.runOnNextMessage
|
||||||
import com.r3corda.core.messaging.send
|
import com.r3corda.core.messaging.send
|
||||||
import com.r3corda.core.protocols.ProtocolLogic
|
import com.r3corda.core.protocols.ProtocolLogic
|
||||||
@ -123,21 +122,34 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
|||||||
val fiber = deserializeFiber(checkpoint.serialisedFiber)
|
val fiber = deserializeFiber(checkpoint.serialisedFiber)
|
||||||
initFiber(fiber, { checkpoint })
|
initFiber(fiber, { checkpoint })
|
||||||
|
|
||||||
val topic = checkpoint.awaitingTopic
|
when (checkpoint.request) {
|
||||||
if (topic != null) {
|
is FiberRequest.ExpectingResponse<*> -> {
|
||||||
val awaitingPayloadType = Class.forName(checkpoint.awaitingPayloadType)
|
val topic = checkpoint.request.receiveTopic
|
||||||
|
val awaitingPayloadType = checkpoint.request.responseType
|
||||||
fiber.logger.info("Restored ${fiber.logic} - it was previously waiting for message of type ${awaitingPayloadType.name} on topic $topic")
|
fiber.logger.info("Restored ${fiber.logic} - it was previously waiting for message of type ${awaitingPayloadType.name} on topic $topic")
|
||||||
iterateOnResponse(fiber, awaitingPayloadType, checkpoint.serialisedFiber, topic) {
|
iterateOnResponse(fiber, awaitingPayloadType, checkpoint.serialisedFiber, checkpoint.request) {
|
||||||
try {
|
try {
|
||||||
Fiber.unparkDeserialized(fiber, scheduler)
|
Fiber.unparkDeserialized(fiber, scheduler)
|
||||||
} catch (e: Throwable) {
|
} catch (e: Throwable) {
|
||||||
logError(e, it, topic, fiber)
|
logError(e, it, topic, fiber)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
fiber.logger.info("Restored ${fiber.logic} - it was not waiting on any message; received payload: ${checkpoint.receivedPayload.toString().abbreviate(50)}")
|
is FiberRequest.NotExpectingResponse -> restoreNotExpectingResponse(fiber, checkpoint.request)
|
||||||
|
null -> restoreNotExpectingResponse(fiber)
|
||||||
|
else -> throw IllegalStateException("Unknown fiber request type " + checkpoint.request)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Restore a Fiber which was not expecting a response (either because it hasn't asked for one, such as a new
|
||||||
|
* Fiber, or because the detail it needed has arrived).
|
||||||
|
*/
|
||||||
|
private fun restoreNotExpectingResponse(fiber: ProtocolStateMachineImpl<*>, request: FiberRequest? = null) {
|
||||||
|
val payload = request?.payload
|
||||||
|
fiber.logger.info("Restored ${fiber.logic} - it was not waiting on any message; received payload: ${payload.toString().abbreviate(50)}")
|
||||||
executor.executeASAP {
|
executor.executeASAP {
|
||||||
iterateStateMachine(fiber, checkpoint.receivedPayload) {
|
iterateStateMachine(fiber, payload) {
|
||||||
try {
|
try {
|
||||||
Fiber.unparkDeserialized(fiber, scheduler)
|
Fiber.unparkDeserialized(fiber, scheduler)
|
||||||
} catch (e: Throwable) {
|
} catch (e: Throwable) {
|
||||||
@ -146,7 +158,6 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
private fun serializeFiber(fiber: ProtocolStateMachineImpl<*>): SerializedBytes<ProtocolStateMachineImpl<*>> {
|
private fun serializeFiber(fiber: ProtocolStateMachineImpl<*>): SerializedBytes<ProtocolStateMachineImpl<*>> {
|
||||||
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
|
// We don't use the passed-in serializer here, because we need to use our own augmented Kryo.
|
||||||
@ -207,7 +218,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
|||||||
val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName)
|
val fiber = ProtocolStateMachineImpl(logic, scheduler, loggerName)
|
||||||
// Need to add before iterating in case of immediate completion
|
// Need to add before iterating in case of immediate completion
|
||||||
initFiber(fiber) {
|
initFiber(fiber) {
|
||||||
val checkpoint = Checkpoint(serializeFiber(fiber), null, null, null)
|
val checkpoint = Checkpoint(serializeFiber(fiber), null)
|
||||||
checkpointStorage.addCheckpoint(checkpoint)
|
checkpointStorage.addCheckpoint(checkpoint)
|
||||||
checkpoint
|
checkpoint
|
||||||
}
|
}
|
||||||
@ -226,10 +237,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
|||||||
|
|
||||||
private fun updateCheckpoint(psm: ProtocolStateMachineImpl<*>,
|
private fun updateCheckpoint(psm: ProtocolStateMachineImpl<*>,
|
||||||
serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>,
|
serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>,
|
||||||
awaitingTopic: String?,
|
request: FiberRequest) {
|
||||||
awaitingPayloadType: Class<*>?,
|
val newCheckpoint = Checkpoint(serialisedFiber, request)
|
||||||
receivedPayload: Any?) {
|
|
||||||
val newCheckpoint = Checkpoint(serialisedFiber, awaitingTopic, awaitingPayloadType?.name, receivedPayload)
|
|
||||||
val previousCheckpoint = stateMachines.put(psm, newCheckpoint)
|
val previousCheckpoint = stateMachines.put(psm, newCheckpoint)
|
||||||
if (previousCheckpoint != null) {
|
if (previousCheckpoint != null) {
|
||||||
checkpointStorage.removeCheckpoint(previousCheckpoint)
|
checkpointStorage.removeCheckpoint(previousCheckpoint)
|
||||||
@ -249,10 +258,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
|||||||
|
|
||||||
private fun onNextSuspend(psm: ProtocolStateMachineImpl<*>, request: FiberRequest) {
|
private fun onNextSuspend(psm: ProtocolStateMachineImpl<*>, request: FiberRequest) {
|
||||||
// We have a request to do something: send, receive, or send-and-receive.
|
// We have a request to do something: send, receive, or send-and-receive.
|
||||||
if (request is FiberRequest.ExpectingResponse<*>) {
|
when (request) {
|
||||||
|
is FiberRequest.ExpectingResponse<*> -> {
|
||||||
// Prepare a listener on the network that runs in the background thread when we receive a message.
|
// Prepare a listener on the network that runs in the background thread when we receive a message.
|
||||||
checkpointOnExpectingResponse(psm, request)
|
checkpointOnExpectingResponse(psm, request)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// If a non-null payload to send was provided, send it now.
|
// If a non-null payload to send was provided, send it now.
|
||||||
request.payload?.let {
|
request.payload?.let {
|
||||||
val topic = "${request.topic}.${request.sessionIDForSend}"
|
val topic = "${request.topic}.${request.sessionIDForSend}"
|
||||||
@ -277,9 +288,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
|||||||
executor.checkOnThread()
|
executor.checkOnThread()
|
||||||
val topic = "${request.topic}.${request.sessionIDForReceive}"
|
val topic = "${request.topic}.${request.sessionIDForReceive}"
|
||||||
val serialisedFiber = serializeFiber(psm)
|
val serialisedFiber = serializeFiber(psm)
|
||||||
updateCheckpoint(psm, serialisedFiber, topic, request.responseType, null)
|
updateCheckpoint(psm, serialisedFiber, request)
|
||||||
psm.logger.trace { "Preparing to receive message of type ${request.responseType.name} on topic $topic" }
|
psm.logger.trace { "Preparing to receive message of type ${request.responseType.name} on topic $topic" }
|
||||||
iterateOnResponse(psm, request.responseType, serialisedFiber, topic) {
|
iterateOnResponse(psm, request.responseType, serialisedFiber, request) {
|
||||||
try {
|
try {
|
||||||
Fiber.unpark(psm, QUASAR_UNBLOCKER)
|
Fiber.unpark(psm, QUASAR_UNBLOCKER)
|
||||||
} catch(e: Throwable) {
|
} catch(e: Throwable) {
|
||||||
@ -288,11 +299,16 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a trigger to the [MessagingService] to deserialize the fiber and pass message content to it, once a message is
|
||||||
|
* received.
|
||||||
|
*/
|
||||||
private fun iterateOnResponse(psm: ProtocolStateMachineImpl<*>,
|
private fun iterateOnResponse(psm: ProtocolStateMachineImpl<*>,
|
||||||
responseType: Class<*>,
|
responseType: Class<*>,
|
||||||
serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>,
|
serialisedFiber: SerializedBytes<ProtocolStateMachineImpl<*>>,
|
||||||
topic: String,
|
request: FiberRequest.ExpectingResponse<*>,
|
||||||
resumeAction: (Any?) -> Unit) {
|
resumeAction: (Any?) -> Unit) {
|
||||||
|
val topic = request.receiveTopic
|
||||||
serviceHub.networkService.runOnNextMessage(topic, executor) { netMsg ->
|
serviceHub.networkService.runOnNextMessage(topic, executor) { netMsg ->
|
||||||
// Assertion to ensure we don't execute on the wrong thread.
|
// Assertion to ensure we don't execute on the wrong thread.
|
||||||
executor.checkOnThread()
|
executor.checkOnThread()
|
||||||
@ -305,38 +321,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableService
|
|||||||
val payload = netMsg.data.deserialize<Any>()
|
val payload = netMsg.data.deserialize<Any>()
|
||||||
check(responseType.isInstance(payload)) { "Expected message of type ${responseType.name} but got ${payload.javaClass.name}" }
|
check(responseType.isInstance(payload)) { "Expected message of type ${responseType.name} but got ${payload.javaClass.name}" }
|
||||||
// Update the fiber's checkpoint so that it's no longer waiting on a response, but rather has the received payload
|
// Update the fiber's checkpoint so that it's no longer waiting on a response, but rather has the received payload
|
||||||
updateCheckpoint(psm, serialisedFiber, null, null, payload)
|
updateCheckpoint(psm, serialisedFiber, request)
|
||||||
psm.logger.trace { "Received message of type ${payload.javaClass.name} on topic $topic (${payload.toString().abbreviate(50)})" }
|
psm.logger.trace { "Received message of type ${payload.javaClass.name} on topic $topic (${payload.toString().abbreviate(50)})" }
|
||||||
iterateStateMachine(psm, payload, resumeAction)
|
iterateStateMachine(psm, payload, resumeAction)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Clean this up
|
|
||||||
open class FiberRequest(val topic: String,
|
|
||||||
val destination: Party?,
|
|
||||||
val sessionIDForSend: Long,
|
|
||||||
val sessionIDForReceive: Long,
|
|
||||||
val payload: Any?) {
|
|
||||||
// This is used to identify where we suspended, in case of message mismatch errors and other things where we
|
|
||||||
// don't have the original stack trace because it's in a suspended fiber.
|
|
||||||
val stackTraceInCaseOfProblems = StackSnapshot()
|
|
||||||
|
|
||||||
class ExpectingResponse<R : Any>(
|
|
||||||
topic: String,
|
|
||||||
destination: Party?,
|
|
||||||
sessionIDForSend: Long,
|
|
||||||
sessionIDForReceive: Long,
|
|
||||||
obj: Any?,
|
|
||||||
val responseType: Class<R>
|
|
||||||
) : FiberRequest(topic, destination, sessionIDForSend, sessionIDForReceive, obj)
|
|
||||||
|
|
||||||
class NotExpectingResponse(
|
|
||||||
topic: String,
|
|
||||||
destination: Party,
|
|
||||||
sessionIDForSend: Long,
|
|
||||||
obj: Any?
|
|
||||||
) : FiberRequest(topic, destination, sessionIDForSend, -1, obj)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class StackSnapshot : Throwable("This is a stack trace to help identify the source of the underlying problem")
|
class StackSnapshot : Throwable("This is a stack trace to help identify the source of the underlying problem")
|
||||||
|
@ -3,8 +3,11 @@ package com.r3corda.node.services.persistence
|
|||||||
import com.google.common.jimfs.Configuration.unix
|
import com.google.common.jimfs.Configuration.unix
|
||||||
import com.google.common.jimfs.Jimfs
|
import com.google.common.jimfs.Jimfs
|
||||||
import com.google.common.primitives.Ints
|
import com.google.common.primitives.Ints
|
||||||
|
import com.r3corda.core.random63BitValue
|
||||||
import com.r3corda.core.serialization.SerializedBytes
|
import com.r3corda.core.serialization.SerializedBytes
|
||||||
import com.r3corda.node.services.api.Checkpoint
|
import com.r3corda.node.services.api.Checkpoint
|
||||||
|
import com.r3corda.node.services.statemachine.FiberRequest
|
||||||
|
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||||
import org.junit.After
|
import org.junit.After
|
||||||
@ -92,6 +95,8 @@ class PerFileCheckpointStorageTests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private var checkpointCount = 1
|
private var checkpointCount = 1
|
||||||
private fun newCheckpoint() = Checkpoint(SerializedBytes(Ints.toByteArray(checkpointCount++)), "topic", "javaType", null)
|
private val request = FiberRequest.ExpectingResponse("topic", null, random63BitValue(), random63BitValue(), null,
|
||||||
|
java.lang.String::class.java)
|
||||||
|
private fun newCheckpoint() = Checkpoint(SerializedBytes(Ints.toByteArray(checkpointCount++)), request)
|
||||||
|
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user