Fixing post merge issues with reference states

This commit is contained in:
Katelyn Baker 2018-07-27 17:26:55 +01:00
parent bb7d33380f
commit 1fda42c2de
6 changed files with 24 additions and 20 deletions

View File

@ -37,9 +37,10 @@ abstract class AsyncCFTNotaryService : TrustedAuthorityNotaryService() {
txId: SecureHash, txId: SecureHash,
callerIdentity: Party, callerIdentity: Party,
requestSignature: NotarisationRequestSignature, requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow? timeWindow: TimeWindow?,
references: List<StateRef>
): CordaFuture<Result> { ): CordaFuture<Result> {
return asyncUniquenessProvider.commitAsync(states, txId, callerIdentity, requestSignature, timeWindow) return asyncUniquenessProvider.commitAsync(states, txId, callerIdentity, requestSignature, timeWindow, references)
} }
/** /**
@ -53,9 +54,11 @@ abstract class AsyncCFTNotaryService : TrustedAuthorityNotaryService() {
val txId: SecureHash, val txId: SecureHash,
val caller: Party, val caller: Party,
val requestSignature: NotarisationRequestSignature, val requestSignature: NotarisationRequestSignature,
val timeWindow: TimeWindow?) : FlowAsyncOperation<Result> { val timeWindow: TimeWindow?,
val references: List<StateRef>
): FlowAsyncOperation<Result> {
override fun execute(): CordaFuture<Result> { override fun execute(): CordaFuture<Result> {
return service.commitAsync(inputs, txId, caller, requestSignature, timeWindow) return service.commitAsync(inputs, txId, caller, requestSignature, timeWindow, references)
} }
} }
} }

View File

@ -24,11 +24,11 @@ import net.corda.core.identity.Party
*/ */
interface AsyncUniquenessProvider : UniquenessProvider { interface AsyncUniquenessProvider : UniquenessProvider {
/** Commits all input states of the given transaction. */ /** Commits all input states of the given transaction. */
fun commitAsync(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?): CordaFuture<Result> fun commitAsync(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>): CordaFuture<Result>
/** Commits all input states of the given transaction synchronously. Use [commitAsync] for better performance. */ /** Commits all input states of the given transaction synchronously. Use [commitAsync] for better performance. */
override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?) { override fun commit(states: List<StateRef>, txId: SecureHash, callerIdentity: Party, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List<StateRef>) {
val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow).get() val result = commitAsync(states, txId, callerIdentity, requestSignature, timeWindow,references).get()
if (result is Result.Failure) { if (result is Result.Failure) {
throw NotaryInternalException(result.error) throw NotaryInternalException(result.error)
} }

View File

@ -240,7 +240,8 @@ class MySQLNotaryServiceTests : IntegrationTest() {
txId, txId,
callerParty, callerParty,
requestSignature, requestSignature,
null).execute() null,
emptyList()).execute()
} }
return futures.transpose().get() return futures.transpose().get()
} }

View File

@ -98,6 +98,7 @@ class MySQLUniquenessProvider(
val callerIdentity: Party, val callerIdentity: Party,
val requestSignature: NotarisationRequestSignature, val requestSignature: NotarisationRequestSignature,
val timeWindow: TimeWindow?, val timeWindow: TimeWindow?,
val references: List<StateRef>,
val id: UUID = UUID.randomUUID()) val id: UUID = UUID.randomUUID())
private val metricPrefix = MySQLUniquenessProvider::class.simpleName private val metricPrefix = MySQLUniquenessProvider::class.simpleName
@ -176,11 +177,12 @@ class MySQLUniquenessProvider(
txId: SecureHash, txId: SecureHash,
callerIdentity: Party, callerIdentity: Party,
requestSignature: NotarisationRequestSignature, requestSignature: NotarisationRequestSignature,
timeWindow: TimeWindow? timeWindow: TimeWindow?,
references: List<StateRef>
): CordaFuture<Result> { ): CordaFuture<Result> {
inputStateCount.update(states.size) inputStateCount.update(states.size)
val timer = Stopwatch.createStarted() val timer = Stopwatch.createStarted()
val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow) val request = CommitRequest(states, txId, callerIdentity, requestSignature, timeWindow, references)
val future = openFuture<Result>() val future = openFuture<Result>()
requestFutures[request.id] = future requestFutures[request.id] = future
future.then { future.then {

View File

@ -498,11 +498,9 @@ class NodeVaultService(
Vault.Page(states = statesAndRefs, statesMetadata = statesMeta, stateTypes = criteriaParser.stateTypes, totalStatesAvailable = totalStates, otherResults = otherResults) Vault.Page(states = statesAndRefs, statesMetadata = statesMeta, stateTypes = criteriaParser.stateTypes, totalStatesAvailable = totalStates, otherResults = otherResults)
} }
} }
@Throws(VaultQueryException::class) @Throws(VaultQueryException::class)
override fun <T : ContractState> _trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> { override fun <T : ContractState> _trackBy(criteria: QueryCriteria, paging: PageSpecification, sorting: Sort, contractStateType: Class<out T>): DataFeed<Vault.Page<T>, Vault.Update<T>> {
return mutex.locked { return concurrentBox.exclusive {
concurrentBox.exclusive {
val snapshotResults = _queryBy(criteria, paging, sorting, contractStateType) val snapshotResults = _queryBy(criteria, paging, sorting, contractStateType)
val updates: Observable<Vault.Update<T>> = uncheckedCast(_updatesPublisher.bufferUntilSubscribed() val updates: Observable<Vault.Update<T>> = uncheckedCast(_updatesPublisher.bufferUntilSubscribed()
.filter { it.containsType(contractStateType, snapshotResults.stateTypes) } .filter { it.containsType(contractStateType, snapshotResults.stateTypes) }
@ -510,7 +508,6 @@ class NodeVaultService(
DataFeed(snapshotResults, updates) DataFeed(snapshotResults, updates)
} }
} }
}
private fun <T : ContractState> filterContractStates(update: Vault.Update<T>, contractStateType: Class<out T>) = private fun <T : ContractState> filterContractStates(update: Vault.Update<T>, contractStateType: Class<out T>) =
update.copy(consumed = filterByContractState(contractStateType, update.consumed), update.copy(consumed = filterByContractState(contractStateType, update.consumed),

View File

@ -82,7 +82,8 @@ open class AsyncLoadTestFlow<T : AsyncCFTNotaryService>(
val inputs = inputGenerator.generateOrFail(random) val inputs = inputGenerator.generateOrFail(random)
val requestSignature = NotarisationRequest(inputs, txId).generateSignature(serviceHub) val requestSignature = NotarisationRequest(inputs, txId).generateSignature(serviceHub)
futures += AsyncCFTNotaryService.CommitOperation(service, inputs, txId, callerParty, requestSignature, null).execute() futures += AsyncCFTNotaryService.CommitOperation(service, inputs, txId, callerParty, requestSignature,
null, emptyList()).execute()
} }
futures.transpose().get() futures.transpose().get()