Merge pull request #194 from jentfoo/FutureTask

Added implementation and tests for FutureTask.
This commit is contained in:
Joshua Warner 2014-03-10 16:51:36 -06:00
commit ed89e0c67d
4 changed files with 273 additions and 9 deletions

View File

@ -0,0 +1,159 @@
package java.util.concurrent;
import java.util.concurrent.atomic.AtomicReference;
public class FutureTask<T> implements RunnableFuture<T> {
private enum State { New, Canceling, Canceled, Running, Done };
private final AtomicReference<State> currentState;
private final Callable<T> callable;
private final Object notifyLock;
private volatile Thread runningThread;
private volatile T result;
private volatile Throwable failure;
public FutureTask(final Runnable r, final T result) {
this(new Callable<T>() {
@Override
public T call() {
r.run();
return result;
}
});
}
public FutureTask(Callable<T> callable) {
currentState = new AtomicReference<State>(State.New);
this.callable = callable;
notifyLock = new Object();
runningThread = null;
result = null;
failure = null;
}
@Override
public void run() {
if (currentState.compareAndSet(State.New, State.Running)) {
runningThread = Thread.currentThread();
try {
result = callable.call();
} catch (Throwable t) {
failure = t;
} finally {
if (currentState.compareAndSet(State.Running, State.Done) ||
currentState.get() == State.Canceled) {
/* in either of these conditions we either were not canceled
* or we already were interrupted. The thread may or MAY NOT
* be in an interrupted status depending on when it was
* interrupted and what the callable did with the state.
*/
} else {
/* Should be in canceling state, so block forever till we are
* interrupted. If state already transitioned into canceled
* and thus thread is in interrupted status, the exception should
* throw immediately on the sleep call.
*/
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
// expected
}
}
Thread.interrupted(); // reset interrupted status if set
handleDone();
runningThread = null; // must be last operation
}
}
}
private void handleDone() {
done();
synchronized (notifyLock) {
notifyLock.notifyAll();
}
}
protected void done() {
// default implementation does nothing, designed to be overridden
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (currentState.compareAndSet(State.New, State.Canceled)) {
handleDone();
return true;
} else if (mayInterruptIfRunning &&
currentState.compareAndSet(State.Running, State.Canceling)) {
// handleDone will be called from running thread
try {
Thread runningThread = this.runningThread;
if (runningThread != null) {
runningThread.interrupt();
return true;
}
} finally {
// we can not set to canceled until interrupt status has been set
currentState.set(State.Canceled);
}
}
return false;
}
@Override
public boolean isCancelled() {
return currentState.get() == State.Canceled;
}
@Override
public boolean isDone() {
return currentState.get() == State.Done || isCancelled();
}
@Override
public T get() throws InterruptedException, ExecutionException {
try {
return get(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// not possible
throw new RuntimeException(e);
}
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException,
TimeoutException {
long timeoutInMillis = unit.toMillis(timeout);
long startTime = 0;
if (timeoutInMillis < Long.MAX_VALUE) {
startTime = System.currentTimeMillis();
}
long remainingTime;
synchronized (notifyLock) {
remainingTime = timeoutInMillis;
while (! isDone() && remainingTime > 0) {
notifyLock.wait(remainingTime);
if (timeoutInMillis < Long.MAX_VALUE) {
remainingTime = timeoutInMillis - (System.currentTimeMillis() - startTime);
}
}
}
if (remainingTime <= 0) {
throw new TimeoutException();
} else if (currentState.get() == State.Canceled) {
throw new CancellationException();
} else if (failure != null) {
throw new ExecutionException(failure);
} else {
return result;
}
}
}

View File

@ -32,6 +32,10 @@ public class AtomicReference<T> implements java.io.Serializable {
public T get() {
return value;
}
public void set(T newValue) {
this.value = newValue;
}
public void lazySet(T newValue) {
unsafe.putOrderedObject(this, valueOffset, newValue);

View File

@ -23,16 +23,12 @@ public class LockSupport {
static {
unsafe = Unsafe.getUnsafe();
try {
parkBlockerOffset = unsafe.objectFieldOffset(java.lang.Thread.class.getDeclaredField("parkBlocker"));
parkBlockerOffset = unsafe.objectFieldOffset(Thread.class.getDeclaredField("parkBlocker"));
} catch (Exception ex) {
throw new Error(ex);
}
}
private static void setBlocker(Thread t, Object arg) {
unsafe.putObject(t, parkBlockerOffset, arg);
}
public static void unpark(Thread thread) {
if (thread != null) {
unsafe.unpark(thread);
@ -53,16 +49,16 @@ public class LockSupport {
private static void doParkNanos(Object blocker, long nanos) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.putObject(t, parkBlockerOffset, blocker);
unsafe.park(false, nanos);
setBlocker(t, null);
unsafe.putObject(t, parkBlockerOffset, null);
}
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.putObject(t, parkBlockerOffset, blocker);
unsafe.park(true, deadline);
setBlocker(t, null);
unsafe.putObject(t, parkBlockerOffset, null);
}
public static Object getBlocker(Thread t) {

105
test/FutureTaskTest.java Normal file
View File

@ -0,0 +1,105 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureTaskTest {
private static final int DELAY_TIME = 10;
public static void main(String[] args) throws InterruptedException, ExecutionException {
isDoneTest(false);
isDoneTest(true);
getCallableResultTest();
getRunnableResultTest();
getTimeoutFail();
getExecutionExceptionTest();
}
private static void isDoneTest(final boolean throwException) {
RunnableFuture<?> future = new FutureTask<Object>(new Runnable() {
@Override
public void run() {
if (throwException) {
throw new RuntimeException();
}
}
}, null);
// should finish the future
future.run();
if (! future.isDone()) {
throw new RuntimeException("Future should be done");
}
}
private static void getCallableResultTest() throws InterruptedException, ExecutionException {
final Object result = new Object();
FutureTask<Object> future = new FutureTask<Object>(new Callable<Object>() {
@Override
public Object call() throws Exception {
return result;
}
});
future.run();
if (future.get() != result) {
throw new RuntimeException("Bad result returned: " + future.get());
}
}
private static void getRunnableResultTest() throws InterruptedException, ExecutionException {
final Object result = new Object();
FutureTask<Object> future = new FutureTask<Object>(new Runnable() {
@Override
public void run() {
// nothing here
}
}, result);
future.run();
if (future.get() != result) {
throw new RuntimeException("Bad result returned: " + future.get());
}
}
private static void getTimeoutFail() throws InterruptedException,
ExecutionException {
RunnableFuture<?> future = new FutureTask<Object>(new Runnable() {
@Override
public void run() {
// wont run
}
}, null);
long startTime = System.currentTimeMillis();
try {
future.get(DELAY_TIME, TimeUnit.MILLISECONDS);
throw new RuntimeException("Exception should have been thrown");
} catch (TimeoutException e) {
long catchTime = System.currentTimeMillis();
if (catchTime - startTime < DELAY_TIME) {
throw new RuntimeException("get with timeout did not block long enough");
}
}
}
private static void getExecutionExceptionTest() throws InterruptedException, ExecutionException {
FutureTask<Object> future = new FutureTask<Object>(new Runnable() {
@Override
public void run() {
throw new RuntimeException();
}
}, null);
future.run();
try {
future.get();
throw new RuntimeException("Exception should have thrown");
} catch (ExecutionException e) {
// expected
}
}
}