mirror of
https://github.com/corda/corda.git
synced 2024-12-19 04:57:58 +00:00
Using the protocol ID for the protocol logger name
This commit is contained in:
parent
11bcaf5fb2
commit
ac01b67549
@ -14,11 +14,16 @@ data class StateMachineRunId private constructor(val uuid: UUID) {
|
||||
fun createRandom(): StateMachineRunId = StateMachineRunId(UUID.randomUUID())
|
||||
}
|
||||
|
||||
override fun toString(): String = "${javaClass.simpleName}($uuid)"
|
||||
override fun toString(): String = "[$uuid]"
|
||||
}
|
||||
|
||||
/**
|
||||
* The interface of [ProtocolStateMachineImpl] exposing methods and properties required by ProtocolLogic for compilation.
|
||||
* A ProtocolStateMachine instance is a suspendable fiber that delegates all actual logic to a [ProtocolLogic] instance.
|
||||
* For any given flow there is only one PSM, even if that protocol invokes subprotocols.
|
||||
*
|
||||
* These classes are created by the [StateMachineManager] when a new protocol is started at the topmost level. If
|
||||
* a protocol invokes a sub-protocol, then it will pass along the PSM to the child. The call method of the topmost
|
||||
* logic element gets to return the value that the entire state machine resolves to.
|
||||
*/
|
||||
interface ProtocolStateMachine<R> {
|
||||
@Suspendable
|
||||
|
@ -45,7 +45,7 @@ class ResolveTransactionsProtocolTest {
|
||||
fun `resolve from two hashes`() {
|
||||
val (stx1, stx2) = makeTransactions()
|
||||
val p = ResolveTransactionsProtocol(setOf(stx2.id), a.info.legalIdentity)
|
||||
val future = b.services.startProtocol("resolve", p)
|
||||
val future = b.services.startProtocol(p)
|
||||
net.runNetwork()
|
||||
val results = future.get()
|
||||
assertEquals(listOf(stx1.id, stx2.id), results.map { it.id })
|
||||
@ -57,7 +57,7 @@ class ResolveTransactionsProtocolTest {
|
||||
fun `dependency with an error`() {
|
||||
val stx = makeTransactions(signFirstTX = false).second
|
||||
val p = ResolveTransactionsProtocol(setOf(stx.id), a.info.legalIdentity)
|
||||
val future = b.services.startProtocol("resolve", p)
|
||||
val future = b.services.startProtocol(p)
|
||||
net.runNetwork()
|
||||
assertFailsWith(SignatureException::class) {
|
||||
rootCauseExceptions { future.get() }
|
||||
@ -68,7 +68,7 @@ class ResolveTransactionsProtocolTest {
|
||||
fun `resolve from a signed transaction`() {
|
||||
val (stx1, stx2) = makeTransactions()
|
||||
val p = ResolveTransactionsProtocol(stx2, a.info.legalIdentity)
|
||||
val future = b.services.startProtocol("resolve", p)
|
||||
val future = b.services.startProtocol(p)
|
||||
net.runNetwork()
|
||||
future.get()
|
||||
assertEquals(stx1, b.storage.validatedTransactions.getTransaction(stx1.id))
|
||||
@ -91,7 +91,7 @@ class ResolveTransactionsProtocolTest {
|
||||
}
|
||||
val p = ResolveTransactionsProtocol(setOf(cursor.id), a.info.legalIdentity)
|
||||
p.transactionCountLimit = 40
|
||||
val future = b.services.startProtocol("resolve", p)
|
||||
val future = b.services.startProtocol(p)
|
||||
net.runNetwork()
|
||||
assertFailsWith<ResolveTransactionsProtocol.ExcessivelyLargeTransactionGraph> {
|
||||
rootCauseExceptions { future.get() }
|
||||
@ -117,7 +117,7 @@ class ResolveTransactionsProtocolTest {
|
||||
a.services.recordTransactions(stx2, stx3)
|
||||
|
||||
val p = ResolveTransactionsProtocol(setOf(stx3.id), a.info.legalIdentity)
|
||||
val future = b.services.startProtocol("resolve", p)
|
||||
val future = b.services.startProtocol(p)
|
||||
net.runNetwork()
|
||||
future.get()
|
||||
}
|
||||
@ -127,7 +127,7 @@ class ResolveTransactionsProtocolTest {
|
||||
val id = a.services.storageService.attachments.importAttachment("Some test file".toByteArray().opaque().open())
|
||||
val stx2 = makeTransactions(withAttachment = id).second
|
||||
val p = ResolveTransactionsProtocol(stx2, a.info.legalIdentity)
|
||||
val future = b.services.startProtocol("resolve", p)
|
||||
val future = b.services.startProtocol(p)
|
||||
net.runNetwork()
|
||||
future.get()
|
||||
assertNotNull(b.services.storageService.attachments.openAttachment(id))
|
||||
|
@ -70,7 +70,7 @@ class APIServerImpl(val node: AbstractNode) : APIServer {
|
||||
if (type is ProtocolClassRef) {
|
||||
val protocolLogicRef = node.services.protocolLogicRefFactory.createKotlin(type.className, args)
|
||||
val protocolInstance = node.services.protocolLogicRefFactory.toProtocolLogic(protocolLogicRef)
|
||||
return node.services.startProtocol(type.className, protocolInstance)
|
||||
return node.services.startProtocol(protocolInstance)
|
||||
} else {
|
||||
throw UnsupportedOperationException("Unsupported ProtocolRef type: $type")
|
||||
}
|
||||
|
@ -111,9 +111,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
|
||||
override val monitoringService: MonitoringService = MonitoringService(MetricRegistry())
|
||||
override val protocolLogicRefFactory: ProtocolLogicRefFactory get() = protocolLogicFactory
|
||||
|
||||
override fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
|
||||
return smm.add(loggerName, logic).resultFuture
|
||||
}
|
||||
override fun <T> startProtocol(logic: ProtocolLogic<T>): ListenableFuture<T> = smm.add(logic).resultFuture
|
||||
|
||||
override fun registerProtocolInitiator(markerClass: KClass<*>, protocolFactory: (Party) -> ProtocolLogic<*>) {
|
||||
require(markerClass !in protocolFactories) { "${markerClass.java.name} has already been used to register a protocol" }
|
||||
|
@ -79,7 +79,7 @@ class ServerRPCOps(
|
||||
val tx = builder.toSignedTransaction(checkSufficientSignatures = false)
|
||||
val protocol = FinalityProtocol(tx, setOf(req), setOf(req.recipient))
|
||||
return TransactionBuildResult.ProtocolStarted(
|
||||
smm.add(BroadcastTransactionProtocol::class.java.simpleName, protocol).id,
|
||||
smm.add(protocol).id,
|
||||
tx,
|
||||
"Cash payment transaction generated"
|
||||
)
|
||||
@ -114,7 +114,7 @@ class ServerRPCOps(
|
||||
val tx = builder.toSignedTransaction(checkSufficientSignatures = false)
|
||||
val protocol = FinalityProtocol(tx, setOf(req), participants)
|
||||
return TransactionBuildResult.ProtocolStarted(
|
||||
smm.add(BroadcastTransactionProtocol::class.java.simpleName, protocol).id,
|
||||
smm.add(protocol).id,
|
||||
tx,
|
||||
"Cash destruction transaction generated"
|
||||
)
|
||||
@ -134,7 +134,7 @@ class ServerRPCOps(
|
||||
// Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it
|
||||
val protocol = BroadcastTransactionProtocol(tx, setOf(req), setOf(req.recipient))
|
||||
return TransactionBuildResult.ProtocolStarted(
|
||||
smm.add(BroadcastTransactionProtocol::class.java.simpleName, protocol).id,
|
||||
smm.add(protocol).id,
|
||||
tx,
|
||||
"Cash issuance completed"
|
||||
)
|
||||
|
@ -69,7 +69,7 @@ abstract class ServiceHubInternal : ServiceHub {
|
||||
* between SMM and the scheduler. That particular problem should also be resolved by the service manager work
|
||||
* itself, at which point this method would not be needed (by the scheduler).
|
||||
*/
|
||||
abstract fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T>
|
||||
abstract fun <T> startProtocol(logic: ProtocolLogic<T>): ListenableFuture<T>
|
||||
|
||||
/**
|
||||
* Register the protocol factory we wish to use when a initiating party attempts to communicate with us. The
|
||||
@ -93,6 +93,6 @@ abstract class ServiceHubInternal : ServiceHub {
|
||||
val logicRef = protocolLogicRefFactory.create(logicType, *args)
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
val logic = protocolLogicRefFactory.toProtocolLogic(logicRef) as ProtocolLogic<T>
|
||||
return startProtocol(logicType.simpleName, logic)
|
||||
return startProtocol(logic)
|
||||
}
|
||||
}
|
||||
|
@ -166,7 +166,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
|
||||
val logic = protocolLogicRefFactory.toProtocolLogic(scheduledActivity.logicRef)
|
||||
log.trace { "Firing ProtocolLogic $logic" }
|
||||
// TODO: ProtocolLogic should be checkpointed by the time this returns
|
||||
services.startProtocol("scheduled", logic)
|
||||
services.startProtocol(logic)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -12,7 +12,6 @@ import com.r3corda.core.protocols.ProtocolSessionException
|
||||
import com.r3corda.core.protocols.ProtocolStateMachine
|
||||
import com.r3corda.core.protocols.StateMachineRunId
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.rootCause
|
||||
import com.r3corda.core.utilities.UntrustworthyData
|
||||
import com.r3corda.core.utilities.trace
|
||||
import com.r3corda.node.services.api.ServiceHubInternal
|
||||
@ -24,25 +23,13 @@ import org.jetbrains.exposed.sql.Transaction
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.PrintWriter
|
||||
import java.io.StringWriter
|
||||
import java.sql.SQLException
|
||||
import java.util.*
|
||||
import java.util.concurrent.ExecutionException
|
||||
|
||||
/**
|
||||
* A ProtocolStateMachine instance is a suspendable fiber that delegates all actual logic to a [ProtocolLogic] instance.
|
||||
* For any given flow there is only one PSM, even if that protocol invokes subprotocols.
|
||||
*
|
||||
* These classes are created by the [StateMachineManager] when a new protocol is started at the topmost level. If
|
||||
* a protocol invokes a sub-protocol, then it will pass along the PSM to the child. The call method of the topmost
|
||||
* logic element gets to return the value that the entire state machine resolves to.
|
||||
*/
|
||||
class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
val logic: ProtocolLogic<R>,
|
||||
scheduler: FiberScheduler,
|
||||
private val loggerName: String)
|
||||
: Fiber<R>("protocol", scheduler), ProtocolStateMachine<R> {
|
||||
scheduler: FiberScheduler) : Fiber<R>("protocol", scheduler), ProtocolStateMachine<R> {
|
||||
|
||||
companion object {
|
||||
// Used to work around a small limitation in Quasar.
|
||||
@ -69,7 +56,7 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
@Transient private var _logger: Logger? = null
|
||||
override val logger: Logger get() {
|
||||
return _logger ?: run {
|
||||
val l = LoggerFactory.getLogger(loggerName)
|
||||
val l = LoggerFactory.getLogger(id.toString())
|
||||
_logger = l
|
||||
return l
|
||||
}
|
||||
@ -208,7 +195,7 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
suspend(receiveRequest)
|
||||
receiveRequest.session.waitingForResponse = false
|
||||
getReceivedMessage()
|
||||
?: throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but got nothing: $id $receiveRequest")
|
||||
?: throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but got nothing: $receiveRequest")
|
||||
}
|
||||
|
||||
if (receivedMessage is SessionEnd) {
|
||||
@ -217,7 +204,7 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
} else if (receiveRequest.receiveType.isInstance(receivedMessage)) {
|
||||
return receiveRequest.receiveType.cast(receivedMessage)
|
||||
} else {
|
||||
throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but got $receivedMessage: $id $receiveRequest")
|
||||
throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but got $receivedMessage: $receiveRequest")
|
||||
}
|
||||
}
|
||||
|
||||
@ -227,7 +214,7 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
txTrampoline = TransactionManager.currentOrNull()
|
||||
StrandLocalTransactionManager.setThreadLocalTx(null)
|
||||
parkAndSerialize { fiber, serializer ->
|
||||
logger.trace { "Suspended $id on $ioRequest" }
|
||||
logger.trace { "Suspended on $ioRequest" }
|
||||
// restore the Tx onto the ThreadLocal so that we can commit the ensuing checkpoint to the DB
|
||||
StrandLocalTransactionManager.setThreadLocalTx(txTrampoline)
|
||||
txTrampoline = null
|
||||
@ -252,23 +239,18 @@ class ProtocolStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
internal fun resume(scheduler: FiberScheduler) {
|
||||
try {
|
||||
if (fromCheckpoint) {
|
||||
logger.info("$id resumed from checkpoint")
|
||||
logger.info("Resumed from checkpoint")
|
||||
fromCheckpoint = false
|
||||
Fiber.unparkDeserialized(this, scheduler)
|
||||
} else if (state == State.NEW) {
|
||||
logger.trace { "$id started" }
|
||||
logger.trace("Started")
|
||||
start()
|
||||
} else {
|
||||
logger.trace { "$id resumed" }
|
||||
logger.trace("Resumed")
|
||||
Fiber.unpark(this, QUASAR_UNBLOCKER)
|
||||
}
|
||||
} catch (t: Throwable) {
|
||||
logger.error("$id threw '${t.rootCause}'")
|
||||
logger.trace {
|
||||
val s = StringWriter()
|
||||
t.rootCause.printStackTrace(PrintWriter(s))
|
||||
"Stack trace of protocol error: $s"
|
||||
}
|
||||
logger.error("Error during resume", t)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,8 +27,8 @@ import com.r3corda.node.services.api.CheckpointStorage
|
||||
import com.r3corda.node.services.api.ServiceHubInternal
|
||||
import com.r3corda.node.utilities.AddOrRemove
|
||||
import com.r3corda.node.utilities.AffinityExecutor
|
||||
import kotlinx.support.jdk8.collections.removeIf
|
||||
import com.r3corda.node.utilities.isolatedTransaction
|
||||
import kotlinx.support.jdk8.collections.removeIf
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
@ -188,7 +188,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
private fun resumeRestoredFiber(fiber: ProtocolStateMachineImpl<*>) {
|
||||
fiber.openSessions.values.forEach { openSessions[it.ourSessionId] = it }
|
||||
if (fiber.openSessions.values.any { it.waitingForResponse }) {
|
||||
fiber.logger.info("Restored fiber pending on receive ${fiber.id}}")
|
||||
fiber.logger.info("Restored, pending on receive")
|
||||
} else {
|
||||
resumeFiber(fiber)
|
||||
}
|
||||
@ -197,7 +197,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
private fun onExistingSessionMessage(message: ExistingSessionMessage) {
|
||||
val session = openSessions[message.recipientSessionId]
|
||||
if (session != null) {
|
||||
session.psm.logger.trace { "${session.psm.id} received $message on $session" }
|
||||
session.psm.logger.trace { "Received $message on $session" }
|
||||
if (message is SessionEnd) {
|
||||
openSessions.remove(message.recipientSessionId)
|
||||
}
|
||||
@ -231,13 +231,13 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
val protocolFactory = serviceHub.getProtocolFactory(markerClass)
|
||||
if (protocolFactory != null) {
|
||||
val protocol = protocolFactory(otherParty)
|
||||
val psm = createFiber(sessionInit.protocolName, protocol)
|
||||
val psm = createFiber(protocol)
|
||||
val session = ProtocolSession(protocol, otherParty, random63BitValue(), otherPartySessionId)
|
||||
openSessions[session.ourSessionId] = session
|
||||
psm.openSessions[Pair(protocol, otherParty)] = session
|
||||
updateCheckpoint(psm)
|
||||
sendSessionMessage(otherParty, SessionConfirm(otherPartySessionId, session.ourSessionId), psm)
|
||||
psm.logger.debug { "Starting new ${psm.id} from $sessionInit on $session" }
|
||||
psm.logger.debug { "Initiated from $sessionInit on $session" }
|
||||
startFiber(psm)
|
||||
} else {
|
||||
logger.warn("Unknown protocol marker class in $sessionInit")
|
||||
@ -268,9 +268,9 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
return createKryo(serializer.kryo)
|
||||
}
|
||||
|
||||
private fun <T> createFiber(loggerName: String, logic: ProtocolLogic<T>): ProtocolStateMachineImpl<T> {
|
||||
private fun <T> createFiber(logic: ProtocolLogic<T>): ProtocolStateMachineImpl<T> {
|
||||
val id = StateMachineRunId.createRandom()
|
||||
return ProtocolStateMachineImpl(id, logic, scheduler, loggerName).apply { initFiber(this) }
|
||||
return ProtocolStateMachineImpl(id, logic, scheduler).apply { initFiber(this) }
|
||||
}
|
||||
|
||||
private fun initFiber(psm: ProtocolStateMachineImpl<*>) {
|
||||
@ -333,12 +333,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
}
|
||||
|
||||
/**
|
||||
* Kicks off a brand new state machine of the given class. It will log with the named logger.
|
||||
* Kicks off a brand new state machine of the given class.
|
||||
* The state machine will be persisted when it suspends, with automated restart if the StateMachineManager is
|
||||
* restarted with checkpointed state machines in the storage service.
|
||||
*/
|
||||
fun <T> add(loggerName: String, logic: ProtocolLogic<T>): ProtocolStateMachine<T> {
|
||||
val fiber = createFiber(loggerName, logic)
|
||||
fun <T> add(logic: ProtocolLogic<T>): ProtocolStateMachine<T> {
|
||||
val fiber = createFiber(logic)
|
||||
// We swap out the parent transaction context as using this frequently leads to a deadlock as we wait
|
||||
// on the protocol completion future inside that context. The problem is that any progress checkpoints are
|
||||
// unable to acquire the table lock and move forward till the calling transaction finishes.
|
||||
@ -389,7 +389,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
val node = serviceHub.networkMapCache.getNodeByPublicKey(party.owningKey)
|
||||
?: throw IllegalArgumentException("Don't know about party $party")
|
||||
val logger = psm?.logger ?: logger
|
||||
logger.trace { "${psm?.id} sending $message to party $party" }
|
||||
logger.trace { "Sending $message to party $party" }
|
||||
serviceHub.networkService.send(sessionTopic, message, node.address)
|
||||
}
|
||||
|
||||
|
@ -54,7 +54,7 @@ class AttachmentTests {
|
||||
|
||||
// Get node one to run a protocol to fetch it and insert it.
|
||||
network.runNetwork()
|
||||
val f1 = n1.services.startProtocol("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.legalIdentity))
|
||||
val f1 = n1.services.startProtocol(FetchAttachmentsProtocol(setOf(id), n0.info.legalIdentity))
|
||||
network.runNetwork()
|
||||
assertEquals(0, f1.get().fromDisk.size)
|
||||
|
||||
@ -65,7 +65,7 @@ class AttachmentTests {
|
||||
// Shut down node zero and ensure node one can still resolve the attachment.
|
||||
n0.stop()
|
||||
|
||||
val response: FetchDataProtocol.Result<Attachment> = n1.services.startProtocol("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.legalIdentity)).get()
|
||||
val response: FetchDataProtocol.Result<Attachment> = n1.services.startProtocol(FetchAttachmentsProtocol(setOf(id), n0.info.legalIdentity)).get()
|
||||
assertEquals(attachment, response.fromDisk[0])
|
||||
}
|
||||
|
||||
@ -76,7 +76,7 @@ class AttachmentTests {
|
||||
// Get node one to fetch a non-existent attachment.
|
||||
val hash = SecureHash.randomSHA256()
|
||||
network.runNetwork()
|
||||
val f1 = n1.services.startProtocol("tests.fetch2", FetchAttachmentsProtocol(setOf(hash), n0.info.legalIdentity))
|
||||
val f1 = n1.services.startProtocol(FetchAttachmentsProtocol(setOf(hash), n0.info.legalIdentity))
|
||||
network.runNetwork()
|
||||
val e = assertFailsWith<FetchDataProtocol.HashNotFound> { rootCauseExceptions { f1.get() } }
|
||||
assertEquals(hash, e.requested)
|
||||
@ -109,7 +109,7 @@ class AttachmentTests {
|
||||
|
||||
// Get n1 to fetch the attachment. Should receive corrupted bytes.
|
||||
network.runNetwork()
|
||||
val f1 = n1.services.startProtocol("tests.fetch1", FetchAttachmentsProtocol(setOf(id), n0.info.legalIdentity))
|
||||
val f1 = n1.services.startProtocol(FetchAttachmentsProtocol(setOf(id), n0.info.legalIdentity))
|
||||
network.runNetwork()
|
||||
assertFailsWith<FetchDataProtocol.DownloadedVsRequestedDataMismatch> {
|
||||
rootCauseExceptions { f1.get() }
|
||||
|
@ -23,13 +23,13 @@ import com.r3corda.node.services.config.NodeConfiguration
|
||||
import com.r3corda.node.services.persistence.NodeAttachmentService
|
||||
import com.r3corda.node.services.persistence.PerFileTransactionStorage
|
||||
import com.r3corda.node.services.persistence.StorageServiceImpl
|
||||
import com.r3corda.node.services.persistence.checkpoints
|
||||
import com.r3corda.node.utilities.databaseTransaction
|
||||
import com.r3corda.protocols.TwoPartyTradeProtocol.Buyer
|
||||
import com.r3corda.protocols.TwoPartyTradeProtocol.Seller
|
||||
import com.r3corda.testing.*
|
||||
import com.r3corda.testing.node.InMemoryMessagingNetwork
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import com.r3corda.node.services.persistence.checkpoints
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
@ -396,7 +396,7 @@ class TwoPartyTradeProtocolTests {
|
||||
Buyer(otherParty, notaryNode.info.notaryIdentity, 1000.DOLLARS, CommercialPaper.State::class.java)
|
||||
}
|
||||
val seller = Seller(bobNode.info.legalIdentity, notaryNode.info, assetToSell, 1000.DOLLARS, ALICE_KEY)
|
||||
val sellerResultFuture = aliceNode.smm.add("seller", seller).resultFuture
|
||||
val sellerResultFuture = aliceNode.smm.add(seller).resultFuture
|
||||
return RunResult(buyerFuture, sellerResultFuture, seller.psm.id)
|
||||
}
|
||||
|
||||
|
@ -12,8 +12,8 @@ import com.r3corda.core.transactions.SignedTransaction
|
||||
import com.r3corda.node.serialization.NodeClock
|
||||
import com.r3corda.node.services.api.MessagingServiceInternal
|
||||
import com.r3corda.node.services.api.MonitoringService
|
||||
import com.r3corda.node.services.api.ServiceHubInternal
|
||||
import com.r3corda.node.services.api.SchemaService
|
||||
import com.r3corda.node.services.api.ServiceHubInternal
|
||||
import com.r3corda.node.services.persistence.DataVending
|
||||
import com.r3corda.node.services.schema.NodeSchemaService
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager
|
||||
@ -79,9 +79,7 @@ open class MockServiceHubInternal(
|
||||
|
||||
override fun recordTransactions(txs: Iterable<SignedTransaction>) = recordTransactionsInternal(txStorageService, txs)
|
||||
|
||||
override fun <T> startProtocol(loggerName: String, logic: ProtocolLogic<T>): ListenableFuture<T> {
|
||||
return smm.add(loggerName, logic).resultFuture
|
||||
}
|
||||
override fun <T> startProtocol(logic: ProtocolLogic<T>): ListenableFuture<T> = smm.add(logic).resultFuture
|
||||
|
||||
override fun registerProtocolInitiator(markerClass: KClass<*>, protocolFactory: (Party) -> ProtocolLogic<*>) {
|
||||
protocolFactories[markerClass.java] = protocolFactory
|
||||
|
@ -49,7 +49,7 @@ class NotaryChangeTests {
|
||||
val state = issueState(clientNodeA, oldNotaryNode)
|
||||
val newNotary = newNotaryNode.info.notaryIdentity
|
||||
val protocol = Instigator(state, newNotary)
|
||||
val future = clientNodeA.services.startProtocol("notary-change", protocol)
|
||||
val future = clientNodeA.services.startProtocol(protocol)
|
||||
|
||||
net.runNetwork()
|
||||
|
||||
@ -62,7 +62,7 @@ class NotaryChangeTests {
|
||||
val state = issueMultiPartyState(clientNodeA, clientNodeB, oldNotaryNode)
|
||||
val newNotary = newNotaryNode.info.notaryIdentity
|
||||
val protocol = Instigator(state, newNotary)
|
||||
val future = clientNodeA.services.startProtocol("notary-change", protocol)
|
||||
val future = clientNodeA.services.startProtocol(protocol)
|
||||
|
||||
net.runNetwork()
|
||||
|
||||
@ -78,7 +78,7 @@ class NotaryChangeTests {
|
||||
val state = issueMultiPartyState(clientNodeA, clientNodeB, oldNotaryNode)
|
||||
val newEvilNotary = Party("Evil Notary", generateKeyPair().public)
|
||||
val protocol = Instigator(state, newEvilNotary)
|
||||
val future = clientNodeA.services.startProtocol("notary-change", protocol)
|
||||
val future = clientNodeA.services.startProtocol(protocol)
|
||||
|
||||
net.runNetwork()
|
||||
|
||||
|
@ -96,8 +96,8 @@ class NotaryServiceTests {
|
||||
|
||||
val firstSpend = NotaryProtocol.Client(stx)
|
||||
val secondSpend = NotaryProtocol.Client(stx)
|
||||
clientNode.services.startProtocol("notary.first", firstSpend)
|
||||
val future = clientNode.services.startProtocol("notary.second", secondSpend)
|
||||
clientNode.services.startProtocol(firstSpend)
|
||||
val future = clientNode.services.startProtocol(secondSpend)
|
||||
|
||||
net.runNetwork()
|
||||
|
||||
@ -110,7 +110,7 @@ class NotaryServiceTests {
|
||||
|
||||
private fun runNotaryClient(stx: SignedTransaction): ListenableFuture<DigitalSignature.LegallyIdentifiable> {
|
||||
val protocol = NotaryProtocol.Client(stx)
|
||||
val future = clientNode.services.startProtocol("notary-test", protocol)
|
||||
val future = clientNode.services.startProtocol(protocol)
|
||||
net.runNetwork()
|
||||
return future
|
||||
}
|
||||
|
@ -78,7 +78,7 @@ class ValidatingNotaryServiceTests {
|
||||
|
||||
private fun runClient(stx: SignedTransaction): ListenableFuture<DigitalSignature.LegallyIdentifiable> {
|
||||
val protocol = NotaryProtocol.Client(stx)
|
||||
val future = clientNode.services.startProtocol("notary", protocol)
|
||||
val future = clientNode.services.startProtocol(protocol)
|
||||
net.runNetwork()
|
||||
return future
|
||||
}
|
||||
|
@ -84,7 +84,7 @@ class DataVendingServiceTests {
|
||||
|
||||
private fun MockNode.sendNotifyTx(tx: SignedTransaction, walletServiceNode: MockNode) {
|
||||
walletServiceNode.services.registerProtocolInitiator(NotifyTxProtocol::class, ::NotifyTransactionHandler)
|
||||
services.startProtocol("notify-tx", NotifyTxProtocol(walletServiceNode.info.legalIdentity, tx))
|
||||
services.startProtocol(NotifyTxProtocol(walletServiceNode.info.legalIdentity, tx))
|
||||
network.runNetwork()
|
||||
}
|
||||
|
||||
|
@ -9,10 +9,10 @@ import com.r3corda.core.protocols.ProtocolLogic
|
||||
import com.r3corda.core.protocols.ProtocolSessionException
|
||||
import com.r3corda.core.random63BitValue
|
||||
import com.r3corda.core.serialization.deserialize
|
||||
import com.r3corda.node.services.persistence.checkpoints
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager.SessionData
|
||||
import com.r3corda.node.services.statemachine.StateMachineManager.SessionMessage
|
||||
import com.r3corda.testing.node.InMemoryMessagingNetwork
|
||||
import com.r3corda.node.services.persistence.checkpoints
|
||||
import com.r3corda.testing.node.MockNetwork
|
||||
import com.r3corda.testing.node.MockNetwork.MockNode
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
@ -44,7 +44,7 @@ class StateMachineManagerTests {
|
||||
|
||||
@Test
|
||||
fun `newly added protocol is preserved on restart`() {
|
||||
node1.smm.add("test", ProtocolWithoutCheckpoints())
|
||||
node1.smm.add(ProtocolWithoutCheckpoints())
|
||||
val restoredProtocol = node1.restartAndGetRestoredProtocol<ProtocolWithoutCheckpoints>()
|
||||
assertThat(restoredProtocol.protocolStarted).isTrue()
|
||||
}
|
||||
@ -52,7 +52,7 @@ class StateMachineManagerTests {
|
||||
@Test
|
||||
fun `protocol can lazily use the serviceHub in its constructor`() {
|
||||
val protocol = ProtocolWithLazyServiceHub()
|
||||
node1.smm.add("test", protocol)
|
||||
node1.smm.add(protocol)
|
||||
assertThat(protocol.lazyTime).isNotNull()
|
||||
}
|
||||
|
||||
@ -60,7 +60,7 @@ class StateMachineManagerTests {
|
||||
fun `protocol restarted just after receiving payload`() {
|
||||
node2.services.registerProtocolInitiator(SendProtocol::class) { ReceiveThenSuspendProtocol(it) }
|
||||
val payload = random63BitValue()
|
||||
node1.smm.add("test", SendProtocol(payload, node2.info.legalIdentity))
|
||||
node1.smm.add(SendProtocol(payload, node2.info.legalIdentity))
|
||||
|
||||
// We push through just enough messages to get only the SessionData sent
|
||||
// TODO We should be able to give runNetwork a predicate for when to stop
|
||||
@ -75,7 +75,7 @@ class StateMachineManagerTests {
|
||||
fun `protocol added before network map does run after init`() {
|
||||
val node3 = net.createNode(node1.info.address) //create vanilla node
|
||||
val protocol = ProtocolNoBlocking()
|
||||
node3.smm.add("test", protocol)
|
||||
node3.smm.add(protocol)
|
||||
assertEquals(false, protocol.protocolStarted) // Not started yet as no network activity has been allowed yet
|
||||
net.runNetwork() // Allow network map messages to flow
|
||||
assertEquals(true, protocol.protocolStarted) // Now we should have run the protocol
|
||||
@ -85,7 +85,7 @@ class StateMachineManagerTests {
|
||||
fun `protocol added before network map will be init checkpointed`() {
|
||||
var node3 = net.createNode(node1.info.address) //create vanilla node
|
||||
val protocol = ProtocolNoBlocking()
|
||||
node3.smm.add("test", protocol)
|
||||
node3.smm.add(protocol)
|
||||
assertEquals(false, protocol.protocolStarted) // Not started yet as no network activity has been allowed yet
|
||||
node3.stop()
|
||||
|
||||
@ -109,7 +109,7 @@ class StateMachineManagerTests {
|
||||
val payload = random63BitValue()
|
||||
node1.services.registerProtocolInitiator(ReceiveThenSuspendProtocol::class) { SendProtocol(payload, it) }
|
||||
val receiveProtocol = ReceiveThenSuspendProtocol(node1.info.legalIdentity)
|
||||
node2.smm.add("test", receiveProtocol) // Prepare checkpointed receive protocol
|
||||
node2.smm.add(receiveProtocol) // Prepare checkpointed receive protocol
|
||||
node2.stop() // kill receiver
|
||||
val restoredProtocol = node2.restartAndGetRestoredProtocol<ReceiveThenSuspendProtocol>(node1.info.address)
|
||||
assertThat(restoredProtocol.receivedPayloads[0]).isEqualTo(payload)
|
||||
@ -135,7 +135,7 @@ class StateMachineManagerTests {
|
||||
}
|
||||
|
||||
// Kick off first send and receive
|
||||
node2.smm.add("test", PingPongProtocol(node3.info.legalIdentity, payload))
|
||||
node2.smm.add(PingPongProtocol(node3.info.legalIdentity, payload))
|
||||
assertEquals(1, node2.checkpointStorage.checkpoints().count())
|
||||
// Restart node and thus reload the checkpoint and resend the message with same UUID
|
||||
node2.stop()
|
||||
@ -165,7 +165,7 @@ class StateMachineManagerTests {
|
||||
node2.services.registerProtocolInitiator(SendProtocol::class) { ReceiveThenSuspendProtocol(it) }
|
||||
node3.services.registerProtocolInitiator(SendProtocol::class) { ReceiveThenSuspendProtocol(it) }
|
||||
val payload = random63BitValue()
|
||||
node1.smm.add("multiple-send", SendProtocol(payload, node2.info.legalIdentity, node3.info.legalIdentity))
|
||||
node1.smm.add(SendProtocol(payload, node2.info.legalIdentity, node3.info.legalIdentity))
|
||||
net.runNetwork()
|
||||
val node2Protocol = node2.getSingleProtocol<ReceiveThenSuspendProtocol>().first
|
||||
val node3Protocol = node3.getSingleProtocol<ReceiveThenSuspendProtocol>().first
|
||||
@ -182,7 +182,7 @@ class StateMachineManagerTests {
|
||||
node2.services.registerProtocolInitiator(ReceiveThenSuspendProtocol::class) { SendProtocol(node2Payload, it) }
|
||||
node3.services.registerProtocolInitiator(ReceiveThenSuspendProtocol::class) { SendProtocol(node3Payload, it) }
|
||||
val multiReceiveProtocol = ReceiveThenSuspendProtocol(node2.info.legalIdentity, node3.info.legalIdentity)
|
||||
node1.smm.add("multiple-receive", multiReceiveProtocol)
|
||||
node1.smm.add(multiReceiveProtocol)
|
||||
net.runNetwork(1) // session handshaking
|
||||
// have the messages arrive in reverse order of receive
|
||||
node3.pumpReceive(false)
|
||||
@ -195,7 +195,7 @@ class StateMachineManagerTests {
|
||||
@Test
|
||||
fun `exception thrown on other side`() {
|
||||
node2.services.registerProtocolInitiator(ReceiveThenSuspendProtocol::class) { ExceptionProtocol }
|
||||
val future = node1.smm.add("exception", ReceiveThenSuspendProtocol(node2.info.legalIdentity)).resultFuture
|
||||
val future = node1.smm.add(ReceiveThenSuspendProtocol(node2.info.legalIdentity)).resultFuture
|
||||
net.runNetwork()
|
||||
assertThatThrownBy { future.get() }.hasCauseInstanceOf(ProtocolSessionException::class.java)
|
||||
}
|
||||
|
@ -84,7 +84,7 @@ fun main(args: Array<String>) {
|
||||
val tx = TransactionType.General.Builder(notaryNode.notaryIdentity)
|
||||
tx.addOutputState(TransactionState(Cash.State(1500.DOLLARS `issued by` node.info.legalIdentity.ref(1), node.info.legalIdentity.owningKey), notaryNode.notaryIdentity))
|
||||
val protocol = RatesFixProtocol(tx, rateOracle.serviceIdentities(InterestRateSwap.oracleType).first(), fixOf, expectedRate, rateTolerance)
|
||||
node.services.startProtocol("demo.ratefix", protocol).get()
|
||||
node.services.startProtocol(protocol).get()
|
||||
node.stop()
|
||||
|
||||
// Show the user the output.
|
||||
|
@ -187,7 +187,7 @@ private fun runSeller(node: Node, amount: Amount<Currency>, otherSide: Party) {
|
||||
tradeTX = node.smm.findStateMachines(TraderDemoProtocolSeller::class.java).single().second
|
||||
} else {
|
||||
val seller = TraderDemoProtocolSeller(otherSide, amount)
|
||||
tradeTX = node.services.startProtocol("demo.seller", seller)
|
||||
tradeTX = node.services.startProtocol(seller)
|
||||
}
|
||||
|
||||
tradeTX.success {
|
||||
|
@ -50,9 +50,6 @@ enum class Role(val legalName: String, val port: Int) {
|
||||
// which holds things like checkpoints, keys, databases, message logs etc.
|
||||
val DEFAULT_BASE_DIRECTORY = "./build/attachment-demo"
|
||||
|
||||
val LOG_RECIPIENT = "demo.recipient"
|
||||
val LOG_SENDER = "demo.sender"
|
||||
|
||||
val PROSPECTUS_HASH = SecureHash.parse("decd098666b9657314870e192ced0c3519c2c9d395507a238338f8d003929de9")
|
||||
|
||||
private val log: Logger = LoggerFactory.getLogger("AttachmentDemo")
|
||||
@ -86,7 +83,7 @@ fun main(args: Array<String>) {
|
||||
//
|
||||
// The first two strings correspond to the first argument to StateMachineManager.add() but the way we handle logging
|
||||
// for protocols will change in future.
|
||||
LogHelper.setLevel("+${LOG_RECIPIENT}", "+${LOG_SENDER}", "-org.apache.activemq")
|
||||
LogHelper.setLevel("-org.apache.activemq")
|
||||
|
||||
val directory = Paths.get(baseDirectory, role.name.toLowerCase())
|
||||
log.info("Using base demo directory $directory")
|
||||
@ -183,7 +180,7 @@ private fun runSender(node: Node, otherSide: Party) {
|
||||
|
||||
// Send the transaction to the other recipient
|
||||
val tx = ptx.toSignedTransaction()
|
||||
serviceHub.startProtocol(LOG_SENDER, FinalityProtocol(tx, emptySet(), setOf(otherSide))).success {
|
||||
serviceHub.startProtocol(FinalityProtocol(tx, emptySet(), setOf(otherSide))).success {
|
||||
thread {
|
||||
Thread.sleep(1000L) // Give the other side time to request the attachment
|
||||
node.stop()
|
||||
|
@ -117,7 +117,7 @@ class IRSSimulation(networkSendManuallyPumped: Boolean, runAsync: Boolean, laten
|
||||
showConsensusFor(listOf(node1, node2, regulators[0]))
|
||||
|
||||
val instigator = Instigator(node2.info.legalIdentity, AutoOffer(notary.info.notaryIdentity, irs), node1.keyPair!!)
|
||||
val instigatorTx = node1.services.startProtocol("instigator", instigator)
|
||||
val instigatorTx = node1.services.startProtocol(instigator)
|
||||
|
||||
return Futures.allAsList(instigatorTx, acceptorTx).flatMap { instigatorTx }
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ class TradeSimulation(runAsync: Boolean, latencyInjector: InMemoryMessagingNetwo
|
||||
showConsensusFor(listOf(buyer, seller, notary))
|
||||
showProgressFor(listOf(buyer, seller))
|
||||
|
||||
val sellerFuture = seller.services.startProtocol("bank.$sellerBankIndex.seller", sellerProtocol)
|
||||
val sellerFuture = seller.services.startProtocol(sellerProtocol)
|
||||
|
||||
return Futures.successfulAsList(buyerFuture, sellerFuture)
|
||||
}
|
||||
|
@ -152,7 +152,7 @@ class NodeInterestRatesTest {
|
||||
val protocol = RatesFixProtocol(tx, n2.info.serviceIdentities(NodeInterestRates.type).first(), fixOf, "0.675".bd, "0.1".bd)
|
||||
LogHelper.setLevel("rates")
|
||||
net.runNetwork()
|
||||
val future = n1.services.startProtocol("rates", protocol)
|
||||
val future = n1.services.startProtocol(protocol)
|
||||
|
||||
net.runNetwork()
|
||||
future.get()
|
||||
|
Loading…
Reference in New Issue
Block a user