corda/classpath/java/util/concurrent/FutureTask.java

170 lines
4.8 KiB
Java
Raw Normal View History

2014-04-20 20:14:48 -06:00
/* Copyright (c) 2008-2014, Avian Contributors
Permission to use, copy, modify, and/or distribute this software
for any purpose with or without fee is hereby granted, provided
that the above copyright notice and this permission notice appear
in all copies.
There is NO WARRANTY for this software. See license.txt for
details. */
package java.util.concurrent;
import java.util.concurrent.atomic.AtomicReference;
/**
 * A cancellable computation that wraps a {@link Callable} (or a
 * {@link Runnable} plus a fixed result) and exposes its outcome through the
 * {@link Future} methods.
 *
 * <p>The life cycle is tracked in a single atomic {@code State}. Result and
 * failure are published through volatile fields that are written before the
 * state moves to {@code Done}. Threads blocked in {@code get} wait on a
 * private monitor that is notified once the task has finished or has been
 * canceled.
 *
 * @param <T> type of the value produced by the task
 */
public class FutureTask<T> implements RunnableFuture<T> {
  /**
   * Task life cycle. {@code New -> Running -> Done} is the normal path.
   * {@code New -> Canceled} happens when the task is canceled before it
   * starts. {@code Running -> Canceling -> Canceled} happens when a running
   * task is canceled with interruption.
   */
  private enum State { New, Canceling, Canceled, Running, Done }

  private final AtomicReference<State> currentState;
  private final Callable<T> callable;
  /** Monitor that {@code get} waits on and {@code handleDone} notifies. */
  private final Object notifyLock;
  /** Thread currently executing {@code run}, or null when none is. */
  private volatile Thread runningThread;
  private volatile T result;
  private volatile Throwable failure;

  /**
   * Creates a task that runs {@code r} and then yields {@code result}.
   *
   * @param r      the work to execute
   * @param result the value returned by {@code get} after {@code r} completes
   */
  public FutureTask(final Runnable r, final T result) {
    this(new Callable<T>() {
      @Override
      public T call() {
        r.run();
        return result;
      }
    });
  }

  /**
   * Creates a task that executes {@code callable} when {@link #run()} is
   * invoked.
   *
   * @param callable the computation to execute
   */
  public FutureTask(Callable<T> callable) {
    currentState = new AtomicReference<State>(State.New);
    this.callable = callable;
    notifyLock = new Object();
    runningThread = null;
    result = null;
    failure = null;
  }

  /**
   * Executes the callable unless the task has already been started or
   * canceled. Anything thrown by the callable is captured and later reported
   * by {@code get} as an {@link ExecutionException}. The calling thread's
   * interrupted status is cleared before this method returns.
   */
  @Override
  public void run() {
    if (currentState.compareAndSet(State.New, State.Running)) {
      runningThread = Thread.currentThread();
      try {
        result = callable.call();
      } catch (Throwable t) {
        failure = t;
      } finally {
        if (currentState.compareAndSet(State.Running, State.Done) ||
            currentState.get() == State.Canceled) {
          /* in either of these conditions we either were not canceled
           * or we already were interrupted.  The thread may or MAY NOT
           * be in an interrupted status depending on when it was
           * interrupted and what the callable did with the state.
           */
        } else {
          /* Should be in canceling state, so block forever till we are
           * interrupted.  If state already transitioned into canceled
           * and thus thread is in interrupted status, the exception should
           * throw immediately on the sleep call.
           */
          try {
            Thread.sleep(Long.MAX_VALUE);
          } catch (InterruptedException e) {
            // expected
          }
        }

        Thread.interrupted(); // reset interrupted status if set

        try {
          handleDone();
        } finally {
          // must be last operation; cleared even if the done() hook throws
          runningThread = null;
        }
      }
    }
  }

  /**
   * Invokes the {@link #done()} hook and wakes every thread blocked in
   * {@code get}. Waiters are notified even when the hook throws, so an
   * exception from a subclass cannot leave them blocked.
   */
  private void handleDone() {
    try {
      done();
    } finally {
      synchronized (notifyLock) {
        notifyLock.notifyAll();
      }
    }
  }

  /**
   * Hook invoked once the task has finished or has been canceled, before
   * waiting threads are notified.
   */
  protected void done() {
    // default implementation does nothing, designed to be overridden
  }

  /**
   * Attempts to cancel the task.
   *
   * <p>A task that has not started is canceled immediately. A running task
   * is only canceled when {@code mayInterruptIfRunning} is true, in which
   * case the running thread is interrupted if it is known.
   *
   * @param mayInterruptIfRunning whether a running task may be interrupted
   * @return true if this call moved the task into the canceled state, false
   *         if the task was already finished or canceled, or is running and
   *         {@code mayInterruptIfRunning} is false
   */
  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
    if (currentState.compareAndSet(State.New, State.Canceled)) {
      handleDone();
      return true;
    }

    if (mayInterruptIfRunning &&
        currentState.compareAndSet(State.Running, State.Canceling)) {
      // handleDone will be called from running thread
      try {
        // May still be null if the runner won the New -> Running transition
        // but has not published itself yet; the task is canceled regardless.
        Thread runner = this.runningThread;
        if (runner != null) {
          runner.interrupt();
        }
      } finally {
        // we can not set to canceled until interrupt status has been set
        currentState.set(State.Canceled);
      }
      // The state is now Canceled, so report success consistently with
      // isCancelled() even when there was no thread to interrupt.
      return true;
    }

    return false;
  }

  /** Returns true if the task is in the canceled state. */
  @Override
  public boolean isCancelled() {
    return currentState.get() == State.Canceled;
  }

  /** Returns true if the task completed (normally or by failure) or was canceled. */
  @Override
  public boolean isDone() {
    return currentState.get() == State.Done || isCancelled();
  }

  /**
   * Waits without a time limit for the task to finish and returns its result.
   *
   * @return the value produced by the callable
   * @throws InterruptedException  if the waiting thread is interrupted
   * @throws ExecutionException    if the callable threw; the cause is attached
   * @throws CancellationException if the task was canceled
   */
  @Override
  public T get() throws InterruptedException, ExecutionException {
    try {
      return get(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // not possible
      throw new RuntimeException(e);
    }
  }

  /**
   * Waits up to the given time for the task to finish and returns its result.
   *
   * <p>A task that is already finished is reported immediately, even when
   * the timeout is zero or negative, or has just expired.
   *
   * @param timeout maximum time to wait
   * @param unit    unit of {@code timeout}
   * @return the value produced by the callable
   * @throws InterruptedException  if the waiting thread is interrupted
   * @throws ExecutionException    if the callable threw; the cause is attached
   * @throws TimeoutException      if the task did not finish within the timeout
   * @throws CancellationException if the task was canceled
   */
  @Override
  public T get(long timeout, TimeUnit unit) throws InterruptedException,
                                                   ExecutionException,
                                                   TimeoutException {
    long timeoutInMillis = unit.toMillis(timeout);
    // Long.MAX_VALUE is treated as "wait forever": no clock reads needed.
    boolean bounded = timeoutInMillis < Long.MAX_VALUE;
    long startTime = bounded ? System.currentTimeMillis() : 0;

    synchronized (notifyLock) {
      long remainingTime = timeoutInMillis;
      while (! isDone() && remainingTime > 0) {
        notifyLock.wait(remainingTime);

        if (bounded) {
          remainingTime =
            timeoutInMillis - (System.currentTimeMillis() - startTime);
        }
      }
    }

    // Decide from the task state rather than from the remaining time, so a
    // finished task is never misreported as a timeout.
    State state = currentState.get();
    if (state == State.Canceled) {
      throw new CancellationException();
    } else if (state != State.Done) {
      throw new TimeoutException();
    } else if (failure != null) {
      throw new ExecutionException(failure);
    } else {
      return result;
    }
  }
}