From 83a31314e0842c5754d887120f7c791db3a06e35 Mon Sep 17 00:00:00 2001 From: Mike Jensen Date: Mon, 10 Mar 2014 10:53:49 -0600 Subject: [PATCH 1/3] Added implementation and tests for FutureTask. I also was missing the set operation for AtomicReference, and cleaned a couple things up from LockSupport. --- .../java/util/concurrent/FutureTask.java | 136 ++++++++++++++++++ .../concurrent/atomic/AtomicReference.java | 4 + .../util/concurrent/locks/LockSupport.java | 14 +- test/FutureTaskTest.java | 105 ++++++++++++++ 4 files changed, 250 insertions(+), 9 deletions(-) create mode 100644 classpath/java/util/concurrent/FutureTask.java create mode 100644 test/FutureTaskTest.java diff --git a/classpath/java/util/concurrent/FutureTask.java b/classpath/java/util/concurrent/FutureTask.java new file mode 100644 index 0000000000..b8b351f614 --- /dev/null +++ b/classpath/java/util/concurrent/FutureTask.java @@ -0,0 +1,136 @@ +package java.util.concurrent; + +import java.util.concurrent.atomic.AtomicReference; + +public class FutureTask implements RunnableFuture { + private enum State { New, Canceled, Running, Done }; + + private final AtomicReference currentState; + private final Callable callable; + private final Object notifyLock; + private volatile Thread runningThread; + private volatile T result; + private volatile Throwable failure; + + public FutureTask(final Runnable r, final T result) { + this(new Callable() { + @Override + public T call() { + r.run(); + + return result; + } + }); + } + + public FutureTask(Callable callable) { + currentState = new AtomicReference(State.New); + this.callable = callable; + notifyLock = new Object(); + runningThread = null; + result = null; + failure = null; + } + + @Override + public void run() { + // must set running thread before we change the state for cancel to work + runningThread = Thread.currentThread(); + try { + if (currentState.compareAndSet(State.New, State.Running)) { + try { + result = callable.call(); + } catch (Throwable t) { + failure = t; + } finally { + currentState.compareAndSet(State.Running, State.Done); + handleDone(); + } + } + } finally { + runningThread = null; + } + } + + private void handleDone() { + done(); + + synchronized (notifyLock) { + notifyLock.notifyAll(); + } + } + + protected void done() { + // default implementation does nothing, designed to be overridden + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (currentState.compareAndSet(State.New, State.Canceled)) { + handleDone(); + + return true; + } else if (mayInterruptIfRunning) { + Thread runningThread = this.runningThread; + if (runningThread != null) { + runningThread.interrupt(); + + return true; + } + } + + return false; + } + + @Override + public boolean isCancelled() { + return currentState.get() == State.Canceled; + } + + @Override + public boolean isDone() { + return currentState.get() == State.Done || isCancelled(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + try { + return get(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // not possible + throw new RuntimeException(e); + } + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, + TimeoutException { + long timeoutInMillis = unit.toMillis(timeout); + long startTime = 0; + if (timeoutInMillis < Long.MAX_VALUE) { + startTime = System.currentTimeMillis(); + } + long remainingTime; + synchronized (notifyLock) { + remainingTime = timeoutInMillis; + while (! isDone() && remainingTime > 0) { + notifyLock.wait(remainingTime); + + if (timeoutInMillis < Long.MAX_VALUE) { + remainingTime = timeoutInMillis - (System.currentTimeMillis() - startTime); + } + } + } + + if (remainingTime <= 0) { + throw new TimeoutException(); + } else if (currentState.get() == State.Canceled) { + throw new CancellationException(); + } else if (failure != null) { + throw new ExecutionException(failure); + } else { + return result; + } + } +} diff --git a/classpath/java/util/concurrent/atomic/AtomicReference.java b/classpath/java/util/concurrent/atomic/AtomicReference.java index 7571d928d6..b7f3c95121 100644 --- a/classpath/java/util/concurrent/atomic/AtomicReference.java +++ b/classpath/java/util/concurrent/atomic/AtomicReference.java @@ -32,6 +32,10 @@ public class AtomicReference implements java.io.Serializable { public T get() { return value; } + + public void set(T newValue) { + this.value = newValue; + } public void lazySet(T newValue) { unsafe.putOrderedObject(this, valueOffset, newValue); diff --git a/classpath/java/util/concurrent/locks/LockSupport.java b/classpath/java/util/concurrent/locks/LockSupport.java index 1f547c465d..7d0237e796 100644 --- a/classpath/java/util/concurrent/locks/LockSupport.java +++ b/classpath/java/util/concurrent/locks/LockSupport.java @@ -23,16 +23,12 @@ public class LockSupport { static { unsafe = Unsafe.getUnsafe(); try { - parkBlockerOffset = unsafe.objectFieldOffset(java.lang.Thread.class.getDeclaredField("parkBlocker")); + parkBlockerOffset = unsafe.objectFieldOffset(Thread.class.getDeclaredField("parkBlocker")); } catch (Exception ex) { throw new Error(ex); } } - private static void setBlocker(Thread t, Object arg) { - unsafe.putObject(t, parkBlockerOffset, arg); - } - public static void unpark(Thread thread) { if (thread != null) { unsafe.unpark(thread); @@ -53,16 +49,16 @@ public class LockSupport { private static void doParkNanos(Object blocker, long nanos) { Thread t = Thread.currentThread(); - setBlocker(t, blocker); + unsafe.putObject(t, parkBlockerOffset, blocker); unsafe.park(false, nanos); - setBlocker(t, null); + unsafe.putObject(t, parkBlockerOffset, null); } public static void parkUntil(Object blocker, long deadline) { Thread t = Thread.currentThread(); - setBlocker(t, blocker); + unsafe.putObject(t, parkBlockerOffset, blocker); unsafe.park(true, deadline); - setBlocker(t, null); + unsafe.putObject(t, parkBlockerOffset, null); } public static Object getBlocker(Thread t) { diff --git a/test/FutureTaskTest.java b/test/FutureTaskTest.java new file mode 100644 index 0000000000..7a8ea9d9a7 --- /dev/null +++ b/test/FutureTaskTest.java @@ -0,0 +1,105 @@ +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class FutureTaskTest { + private static final int DELAY_TIME = 10; + + public static void main(String[] args) throws InterruptedException, ExecutionException { + isDoneTest(false); + isDoneTest(true); + getCallableResultTest(); + getRunnableResultTest(); + getTimeoutFail(); + getExecutionExceptionTest(); + } + + private static void isDoneTest(final boolean throwException) { + RunnableFuture future = new FutureTask(new Runnable() { + @Override + public void run() { + if (throwException) { + throw new RuntimeException(); + } + } + }, null); + + // should finish the future + future.run(); + + if (! future.isDone()) { + throw new RuntimeException("Future should be done"); + } + } + + private static void getCallableResultTest() throws InterruptedException, ExecutionException { + final Object result = new Object(); + FutureTask future = new FutureTask(new Callable() { + @Override + public Object call() throws Exception { + return result; + } + }); + + future.run(); + if (future.get() != result) { + throw new RuntimeException("Bad result returned: " + future.get()); + } + } + + private static void getRunnableResultTest() throws InterruptedException, ExecutionException { + final Object result = new Object(); + FutureTask future = new FutureTask(new Runnable() { + @Override + public void run() { + // nothing here + } + }, result); + + future.run(); + if (future.get() != result) { + throw new RuntimeException("Bad result returned: " + future.get()); + } + } + + private static void getTimeoutFail() throws InterruptedException, + ExecutionException { + RunnableFuture future = new FutureTask(new Runnable() { + @Override + public void run() { + // wont run + } + }, null); + + long startTime = System.currentTimeMillis(); + try { + future.get(DELAY_TIME, TimeUnit.MILLISECONDS); + throw new RuntimeException("Exception should have been thrown"); + } catch (TimeoutException e) { + long catchTime = System.currentTimeMillis(); + if (catchTime - startTime < DELAY_TIME) { + throw new RuntimeException("get with timeout did not block long enough"); + } + } + } + + private static void getExecutionExceptionTest() throws InterruptedException, ExecutionException { + FutureTask future = new FutureTask(new Runnable() { + @Override + public void run() { + throw new RuntimeException(); + } + }, null); + + future.run(); + try { + future.get(); + throw new RuntimeException("Exception should have thrown"); + } catch (ExecutionException e) { + // expected + } + } +} From d56087240d9ca6ea126da4f281d48bf3759b40a1 Mon Sep 17 00:00:00 2001 From: Mike Jensen Date: Mon, 10 Mar 2014 12:43:22 -0600 Subject: [PATCH 2/3] Changes so that we only set the running thread if we actually ARE the running thread --- .../java/util/concurrent/FutureTask.java | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/classpath/java/util/concurrent/FutureTask.java b/classpath/java/util/concurrent/FutureTask.java index b8b351f614..2fb4d10e15 100644 --- a/classpath/java/util/concurrent/FutureTask.java +++ b/classpath/java/util/concurrent/FutureTask.java @@ -34,21 +34,17 @@ public class FutureTask implements RunnableFuture { @Override public void run() { - // must set running thread before we change the state for cancel to work - runningThread = Thread.currentThread(); - try { - if (currentState.compareAndSet(State.New, State.Running)) { - try { - result = callable.call(); - } catch (Throwable t) { - failure = t; - } finally { - currentState.compareAndSet(State.Running, State.Done); - handleDone(); - } + if (currentState.compareAndSet(State.New, State.Running)) { + runningThread = Thread.currentThread(); + try { + result = callable.call(); + } catch (Throwable t) { + failure = t; + } finally { + currentState.compareAndSet(State.Running, State.Done); + handleDone(); + runningThread = null; // must be set after state changed to done } - } finally { - runningThread = null; } } From ccb608304519866cab63aa15a1a788a3a69a5b44 Mon Sep 17 00:00:00 2001 From: Mike Jensen Date: Mon, 10 Mar 2014 16:14:10 -0600 Subject: [PATCH 3/3] Attempting to prevent interrupting threads after future has completed. We added a 4th state, so we have "Canceling and Canceled". We are in canceling state if we previously were running, and will not transition to canceled till after the interrupt has been sent. So at the end if we are not running, or already canceled, we will sleep, waiting for the interrupt to occur so we can be sure we handle it before we let the thread complete. This also fixes a condition where we returned true on a cancel after a task has already been canceled --- .../java/util/concurrent/FutureTask.java | 45 +++++++++++++++---- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/classpath/java/util/concurrent/FutureTask.java b/classpath/java/util/concurrent/FutureTask.java index 2fb4d10e15..15d0f0161b 100644 --- a/classpath/java/util/concurrent/FutureTask.java +++ b/classpath/java/util/concurrent/FutureTask.java @@ -3,7 +3,7 @@ package java.util.concurrent; import java.util.concurrent.atomic.AtomicReference; public class FutureTask implements RunnableFuture { - private enum State { New, Canceled, Running, Done }; + private enum State { New, Canceling, Canceled, Running, Done }; private final AtomicReference currentState; private final Callable callable; @@ -41,9 +41,29 @@ public class FutureTask implements RunnableFuture { } catch (Throwable t) { failure = t; } finally { - currentState.compareAndSet(State.Running, State.Done); + if (currentState.compareAndSet(State.Running, State.Done) || + currentState.get() == State.Canceled) { + /* in either of these conditions we either were not canceled + * or we already were interrupted. The thread may or MAY NOT + * be in an interrupted status depending on when it was + * interrupted and what the callable did with the state. + */ + } else { + /* Should be in canceling state, so block forever till we are + * interrupted. If state already transitioned into canceled + * and thus thread is in interrupted status, the exception should + * throw immediately on the sleep call. + */ + try { + Thread.sleep(Long.MAX_VALUE); + } catch (InterruptedException e) { + // expected + } + } + + Thread.interrupted(); // reset interrupted status if set handleDone(); - runningThread = null; // must be set after state changed to done + runningThread = null; // must be last operation } } } @@ -66,12 +86,19 @@ public class FutureTask implements RunnableFuture { handleDone(); return true; - } else if (mayInterruptIfRunning) { - Thread runningThread = this.runningThread; - if (runningThread != null) { - runningThread.interrupt(); - - return true; + } else if (mayInterruptIfRunning && + currentState.compareAndSet(State.Running, State.Canceling)) { + // handleDone will be called from running thread + try { + Thread runningThread = this.runningThread; + if (runningThread != null) { + runningThread.interrupt(); + + return true; + } + } finally { + // we can not set to canceled until interrupt status has been set + currentState.set(State.Canceled); } }