mirror of
https://github.com/corda/corda.git
synced 2025-01-24 13:28:07 +00:00
ccb6083045
We added a fourth state, so we now have both "Canceling" and "Canceled". A task is in the Canceling state if it was running when cancel was requested, and it will not transition to Canceled until after the interrupt has been sent. At the end of run, if the task is neither still running nor already canceled, the thread sleeps, waiting for the interrupt to occur, so we can be sure we handle it before we let the thread complete. This also fixes a condition where cancel returned true for a task that had already been canceled.
160 lines
4.5 KiB
Java
160 lines
4.5 KiB
Java
package java.util.concurrent;
|
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
/**
 * A one-shot, cancellable computation that wraps a {@link Callable} (or a
 * {@link Runnable} plus a fixed result) and exposes its outcome through the
 * {@link RunnableFuture} interface.
 *
 * <p>The task moves through the states {@code New -> Running -> Done}, or is
 * diverted to {@code Canceled}. A running task that is cancelled with
 * interruption passes through the transient {@code Canceling} state, which
 * lasts only while the cancelling thread delivers the interrupt. The worker
 * thread never leaves {@link #run()} while the state is {@code Canceling}, so
 * the interrupt can not arrive after {@code run()} has returned.
 *
 * @param <T> the type of the computed result
 */
public class FutureTask<T> implements RunnableFuture<T> {
  private enum State { New, Canceling, Canceled, Running, Done }

  private final AtomicReference<State> currentState;
  private final Callable<T> callable;
  // monitor that threads blocked in get() wait on
  private final Object notifyLock;
  // non-null only while a thread is inside run() for this task
  private volatile Thread runningThread;
  private volatile T result;
  private volatile Throwable failure;

  /**
   * Creates a task that executes {@code r} and then yields {@code result}.
   *
   * @param r the work to execute
   * @param result the value reported by {@code get} after {@code r} completes
   */
  public FutureTask(final Runnable r, final T result) {
    this(new Callable<T>() {
      @Override
      public T call() {
        r.run();

        return result;
      }
    });
  }

  /**
   * Creates a task that executes {@code callable} and yields its return value.
   *
   * @param callable the computation to execute
   */
  public FutureTask(Callable<T> callable) {
    currentState = new AtomicReference<State>(State.New);
    this.callable = callable;
    notifyLock = new Object();
    runningThread = null;
    result = null;
    failure = null;
  }

  /**
   * Executes the computation on the calling thread unless the task was
   * already started or cancelled, in which case this call does nothing.
   * Anything thrown by the callable is captured and later reported by
   * {@code get} wrapped in an {@link ExecutionException}.
   */
  @Override
  public void run() {
    if (! currentState.compareAndSet(State.New, State.Running)) {
      return;
    }

    runningThread = Thread.currentThread();
    try {
      result = callable.call();
    } catch (Throwable t) {
      failure = t;
    } finally {
      if (! currentState.compareAndSet(State.Running, State.Done)) {
        /* A cancel(true) call is in progress or has finished. The canceller
         * publishes Canceled only after it has issued the interrupt, so
         * waiting for that state (instead of waiting to be interrupted)
         * guarantees the interrupt has been sent before we continue, even if
         * this thread is interrupted by something else in the meantime. The
         * window is only as long as the canceller's interrupt() call.
         */
        while (currentState.get() == State.Canceling) {
          Thread.yield();
        }
      }

      Thread.interrupted(); // reset interrupted status if set
      try {
        handleDone();
      } finally {
        runningThread = null; // must be last operation
      }
    }
  }

  /**
   * Invokes the {@link #done()} hook and then wakes every thread blocked in
   * {@code get}. Waiters are woken even if the hook throws.
   */
  private void handleDone() {
    try {
      done();
    } finally {
      synchronized (notifyLock) {
        notifyLock.notifyAll();
      }
    }
  }

  /**
   * Hook invoked once the task has finished or has been cancelled. It is
   * called from the thread that executed {@code run}, or from the cancelling
   * thread when the task is cancelled before it started.
   */
  protected void done() {
    // default implementation does nothing, designed to be overridden
  }

  /**
   * Attempts to cancel this task.
   *
   * @param mayInterruptIfRunning whether a task that is already running
   *        should be interrupted; when {@code false} a running task is left
   *        untouched
   * @return {@code true} if this call cancelled the task; {@code false} if
   *         the task had already completed or been cancelled, or if it is
   *         running and {@code mayInterruptIfRunning} is {@code false}
   */
  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
    if (currentState.compareAndSet(State.New, State.Canceled)) {
      handleDone();

      return true;
    }

    if (mayInterruptIfRunning
        && currentState.compareAndSet(State.Running, State.Canceling)) {
      // handleDone will be called from running thread
      try {
        /* run() stores its thread immediately after moving the state to
         * Running, and does not clear it again while the state is Canceling,
         * so this wait is brief and always ends with the worker thread.
         */
        Thread worker = runningThread;
        while (worker == null) {
          Thread.yield();
          worker = runningThread;
        }
        worker.interrupt();
      } finally {
        // we can not set to canceled until interrupt status has been set
        currentState.set(State.Canceled);
      }

      return true;
    }

    return false;
  }

  /**
   * @return {@code true} if the task reached the cancelled state
   */
  @Override
  public boolean isCancelled() {
    return currentState.get() == State.Canceled;
  }

  /**
   * @return {@code true} if the task completed (normally or with a failure)
   *         or was cancelled
   */
  @Override
  public boolean isDone() {
    return currentState.get() == State.Done || isCancelled();
  }

  /**
   * Waits without a time limit for the task to finish and reports its outcome.
   *
   * @return the computed result
   * @throws InterruptedException if the calling thread is interrupted while
   *         waiting
   * @throws ExecutionException if the computation threw; the thrown object is
   *         the cause
   * @throws CancellationException if the task was cancelled
   */
  @Override
  public T get() throws InterruptedException, ExecutionException {
    awaitDone(Long.MAX_VALUE);

    return report();
  }

  /**
   * Waits at most the given time for the task to finish and reports its
   * outcome. A task that is already finished is reported immediately,
   * whatever the timeout value (including zero and negative values).
   *
   * @param timeout the maximum time to wait; it is converted to milliseconds,
   *        and a value that converts to {@code Long.MAX_VALUE} milliseconds
   *        means "wait without limit"
   * @param unit the unit of {@code timeout}
   * @return the computed result
   * @throws InterruptedException if the calling thread is interrupted while
   *         waiting
   * @throws ExecutionException if the computation threw; the thrown object is
   *         the cause
   * @throws TimeoutException if the task had not finished when the wait ended
   * @throws CancellationException if the task was cancelled
   */
  @Override
  public T get(long timeout, TimeUnit unit) throws InterruptedException,
                                                   ExecutionException,
                                                   TimeoutException {
    if (! awaitDone(unit.toMillis(timeout))) {
      throw new TimeoutException();
    }

    return report();
  }

  /**
   * Blocks until the task is done or the timeout has elapsed.
   *
   * @param timeoutInMillis the maximum wait; {@code Long.MAX_VALUE} disables
   *        the deadline
   * @return whether the task is done when the wait ends
   */
  private boolean awaitDone(long timeoutInMillis) throws InterruptedException {
    boolean unbounded = timeoutInMillis == Long.MAX_VALUE;
    long startTime = unbounded ? 0 : System.currentTimeMillis();
    long remainingTime = timeoutInMillis;

    synchronized (notifyLock) {
      while (! isDone() && remainingTime > 0) {
        notifyLock.wait(remainingTime);

        if (! unbounded) {
          remainingTime
            = timeoutInMillis - (System.currentTimeMillis() - startTime);
        }
      }

      // decide on the task's state, not on the clock, so that a finished
      // task is never reported as timed out
      return isDone();
    }
  }

  /**
   * Converts the final state of a finished task into a return value or an
   * exception. Must only be called once {@link #isDone()} is true.
   */
  private T report() throws ExecutionException {
    if (currentState.get() == State.Canceled) {
      throw new CancellationException();
    }

    Throwable cause = failure;
    if (cause != null) {
      throw new ExecutionException(cause);
    }

    return result;
  }
}
|