diff --git a/classpath/java/util/concurrent/FutureTask.java b/classpath/java/util/concurrent/FutureTask.java new file mode 100644 index 0000000000..15d0f0161b --- /dev/null +++ b/classpath/java/util/concurrent/FutureTask.java @@ -0,0 +1,159 @@ +package java.util.concurrent; + +import java.util.concurrent.atomic.AtomicReference; + +public class FutureTask implements RunnableFuture { + private enum State { New, Canceling, Canceled, Running, Done }; + + private final AtomicReference currentState; + private final Callable callable; + private final Object notifyLock; + private volatile Thread runningThread; + private volatile T result; + private volatile Throwable failure; + + public FutureTask(final Runnable r, final T result) { + this(new Callable() { + @Override + public T call() { + r.run(); + + return result; + } + }); + } + + public FutureTask(Callable callable) { + currentState = new AtomicReference(State.New); + this.callable = callable; + notifyLock = new Object(); + runningThread = null; + result = null; + failure = null; + } + + @Override + public void run() { + if (currentState.compareAndSet(State.New, State.Running)) { + runningThread = Thread.currentThread(); + try { + result = callable.call(); + } catch (Throwable t) { + failure = t; + } finally { + if (currentState.compareAndSet(State.Running, State.Done) || + currentState.get() == State.Canceled) { + /* in either of these conditions we either were not canceled + * or we already were interrupted. The thread may or MAY NOT + * be in an interrupted status depending on when it was + * interrupted and what the callable did with the state. + */ + } else { + /* Should be in canceling state, so block forever till we are + * interrupted. If state already transitioned into canceled + * and thus thread is in interrupted status, the exception should + * throw immediately on the sleep call. + */ + try { + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException e) { + // expected + } + } + + Thread.interrupted(); // reset interrupted status if set + handleDone(); + runningThread = null; // must be last operation + } + } + } + + private void handleDone() { + done(); + + synchronized (notifyLock) { + notifyLock.notifyAll(); + } + } + + protected void done() { + // default implementation does nothing, designed to be overridden + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (currentState.compareAndSet(State.New, State.Canceled)) { + handleDone(); + + return true; + } else if (mayInterruptIfRunning && + currentState.compareAndSet(State.Running, State.Canceling)) { + // handleDone will be called from running thread + try { + Thread runningThread = this.runningThread; + if (runningThread != null) { + runningThread.interrupt(); + + return true; + } + } finally { + // we can not set to canceled until interrupt status has been set + currentState.set(State.Canceled); + } + } + + return false; + } + + @Override + public boolean isCancelled() { + return currentState.get() == State.Canceled; + } + + @Override + public boolean isDone() { + return currentState.get() == State.Done || isCancelled(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + try { + return get(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // not possible + throw new RuntimeException(e); + } + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, + TimeoutException { + long timeoutInMillis = unit.toMillis(timeout); + long startTime = 0; + if (timeoutInMillis < Long.MAX_VALUE) { + startTime = System.currentTimeMillis(); + } + long remainingTime; + synchronized (notifyLock) { + remainingTime = timeoutInMillis; + while (! isDone() && remainingTime > 0) { + notifyLock.wait(remainingTime); + + if (timeoutInMillis < Long.MAX_VALUE) { + remainingTime = timeoutInMillis - (System.currentTimeMillis() - startTime); + } + } + } + + if (remainingTime <= 0) { + throw new TimeoutException(); + } else if (currentState.get() == State.Canceled) { + throw new CancellationException(); + } else if (failure != null) { + throw new ExecutionException(failure); + } else { + return result; + } + } +} diff --git a/classpath/java/util/concurrent/atomic/AtomicReference.java b/classpath/java/util/concurrent/atomic/AtomicReference.java index 7571d928d6..b7f3c95121 100644 --- a/classpath/java/util/concurrent/atomic/AtomicReference.java +++ b/classpath/java/util/concurrent/atomic/AtomicReference.java @@ -32,6 +32,10 @@ public class AtomicReference implements java.io.Serializable { public T get() { return value; } + + public void set(T newValue) { + this.value = newValue; + } public void lazySet(T newValue) { unsafe.putOrderedObject(this, valueOffset, newValue); diff --git a/classpath/java/util/concurrent/locks/LockSupport.java b/classpath/java/util/concurrent/locks/LockSupport.java index 1f547c465d..7d0237e796 100644 --- a/classpath/java/util/concurrent/locks/LockSupport.java +++ b/classpath/java/util/concurrent/locks/LockSupport.java @@ -23,16 +23,12 @@ public class LockSupport { static { unsafe = Unsafe.getUnsafe(); try { - parkBlockerOffset = unsafe.objectFieldOffset(java.lang.Thread.class.getDeclaredField("parkBlocker")); + parkBlockerOffset = unsafe.objectFieldOffset(Thread.class.getDeclaredField("parkBlocker")); } catch (Exception ex) { throw new Error(ex); } } - private static void setBlocker(Thread t, Object arg) { - unsafe.putObject(t, parkBlockerOffset, arg); - } - public static void unpark(Thread thread) { if (thread != null) { unsafe.unpark(thread); @@ -53,16 +49,16 @@ public class LockSupport { private static void doParkNanos(Object blocker, long nanos) { Thread t = Thread.currentThread(); - setBlocker(t, blocker); + unsafe.putObject(t, parkBlockerOffset, blocker); unsafe.park(false, nanos); - setBlocker(t, null); + unsafe.putObject(t, parkBlockerOffset, null); } public static void parkUntil(Object blocker, long deadline) { Thread t = Thread.currentThread(); - setBlocker(t, blocker); + unsafe.putObject(t, parkBlockerOffset, blocker); unsafe.park(true, deadline); - setBlocker(t, null); + unsafe.putObject(t, parkBlockerOffset, null); } public static Object getBlocker(Thread t) { diff --git a/test/FutureTaskTest.java b/test/FutureTaskTest.java new file mode 100644 index 0000000000..7a8ea9d9a7 --- /dev/null +++ b/test/FutureTaskTest.java @@ -0,0 +1,105 @@ +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class FutureTaskTest { + private static final int DELAY_TIME = 10; + + public static void main(String[] args) throws InterruptedException, ExecutionException { + isDoneTest(false); + isDoneTest(true); + getCallableResultTest(); + getRunnableResultTest(); + getTimeoutFail(); + getExecutionExceptionTest(); + } + + private static void isDoneTest(final boolean throwException) { + RunnableFuture future = new FutureTask(new Runnable() { + @Override + public void run() { + if (throwException) { + throw new RuntimeException(); + } + } + }, null); + + // should finish the future + future.run(); + + if (! future.isDone()) { + throw new RuntimeException("Future should be done"); + } + } + + private static void getCallableResultTest() throws InterruptedException, ExecutionException { + final Object result = new Object(); + FutureTask future = new FutureTask(new Callable() { + @Override + public Object call() throws Exception { + return result; + } + }); + + future.run(); + if (future.get() != result) { + throw new RuntimeException("Bad result returned: " + future.get()); + } + } + + private static void getRunnableResultTest() throws InterruptedException, ExecutionException { + final Object result = new Object(); + FutureTask future = new FutureTask(new Runnable() { + @Override + public void run() { + // nothing here + } + }, result); + + future.run(); + if (future.get() != result) { + throw new RuntimeException("Bad result returned: " + future.get()); + } + } + + private static void getTimeoutFail() throws InterruptedException, + ExecutionException { + RunnableFuture future = new FutureTask(new Runnable() { + @Override + public void run() { + // wont run + } + }, null); + + long startTime = System.currentTimeMillis(); + try { + future.get(DELAY_TIME, TimeUnit.MILLISECONDS); + throw new RuntimeException("Exception should have been thrown"); + } catch (TimeoutException e) { + long catchTime = System.currentTimeMillis(); + if (catchTime - startTime < DELAY_TIME) { + throw new RuntimeException("get with timeout did not block long enough"); + } + } + } + + private static void getExecutionExceptionTest() throws InterruptedException, ExecutionException { + FutureTask future = new FutureTask(new Runnable() { + @Override + public void run() { + throw new RuntimeException(); + } + }, null); + + future.run(); + try { + future.get(); + throw new RuntimeException("Exception should have thrown"); + } catch (ExecutionException e) { + // expected + } + } +}