From 83a31314e0842c5754d887120f7c791db3a06e35 Mon Sep 17 00:00:00 2001 From: Mike Jensen Date: Mon, 10 Mar 2014 10:53:49 -0600 Subject: [PATCH] Added implementation and tests for FutureTask. I also was missing the set operation for AtomicReference, and cleaned a couple things up from LockSupport. --- .../java/util/concurrent/FutureTask.java | 136 ++++++++++++++++++ .../concurrent/atomic/AtomicReference.java | 4 + .../util/concurrent/locks/LockSupport.java | 14 +- test/FutureTaskTest.java | 105 ++++++++++++++ 4 files changed, 250 insertions(+), 9 deletions(-) create mode 100644 classpath/java/util/concurrent/FutureTask.java create mode 100644 test/FutureTaskTest.java diff --git a/classpath/java/util/concurrent/FutureTask.java b/classpath/java/util/concurrent/FutureTask.java new file mode 100644 index 0000000000..b8b351f614 --- /dev/null +++ b/classpath/java/util/concurrent/FutureTask.java @@ -0,0 +1,136 @@ +package java.util.concurrent; + +import java.util.concurrent.atomic.AtomicReference; + +public class FutureTask implements RunnableFuture { + private enum State { New, Canceled, Running, Done }; + + private final AtomicReference currentState; + private final Callable callable; + private final Object notifyLock; + private volatile Thread runningThread; + private volatile T result; + private volatile Throwable failure; + + public FutureTask(final Runnable r, final T result) { + this(new Callable() { + @Override + public T call() { + r.run(); + + return result; + } + }); + } + + public FutureTask(Callable callable) { + currentState = new AtomicReference(State.New); + this.callable = callable; + notifyLock = new Object(); + runningThread = null; + result = null; + failure = null; + } + + @Override + public void run() { + // must set running thread before we change the state for cancel to work + runningThread = Thread.currentThread(); + try { + if (currentState.compareAndSet(State.New, State.Running)) { + try { + result = callable.call(); + } catch (Throwable t) { + failure = t; + } finally { + currentState.compareAndSet(State.Running, State.Done); + handleDone(); + } + } + } finally { + runningThread = null; + } + } + + private void handleDone() { + done(); + + synchronized (notifyLock) { + notifyLock.notifyAll(); + } + } + + protected void done() { + // default implementation does nothing, designed to be overridden + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (currentState.compareAndSet(State.New, State.Canceled)) { + handleDone(); + + return true; + } else if (mayInterruptIfRunning) { + Thread runningThread = this.runningThread; + if (runningThread != null) { + runningThread.interrupt(); + + return true; + } + } + + return false; + } + + @Override + public boolean isCancelled() { + return currentState.get() == State.Canceled; + } + + @Override + public boolean isDone() { + return currentState.get() == State.Done || isCancelled(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + try { + return get(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // not possible + throw new RuntimeException(e); + } + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, + TimeoutException { + long timeoutInMillis = unit.toMillis(timeout); + long startTime = 0; + if (timeoutInMillis < Long.MAX_VALUE) { + startTime = System.currentTimeMillis(); + } + long remainingTime; + synchronized (notifyLock) { + remainingTime = timeoutInMillis; + while (! isDone() && remainingTime > 0) { + notifyLock.wait(remainingTime); + + if (timeoutInMillis < Long.MAX_VALUE) { + remainingTime = timeoutInMillis - (System.currentTimeMillis() - startTime); + } + } + } + + if (remainingTime <= 0) { + throw new TimeoutException(); + } else if (currentState.get() == State.Canceled) { + throw new CancellationException(); + } else if (failure != null) { + throw new ExecutionException(failure); + } else { + return result; + } + } +} diff --git a/classpath/java/util/concurrent/atomic/AtomicReference.java b/classpath/java/util/concurrent/atomic/AtomicReference.java index 7571d928d6..b7f3c95121 100644 --- a/classpath/java/util/concurrent/atomic/AtomicReference.java +++ b/classpath/java/util/concurrent/atomic/AtomicReference.java @@ -32,6 +32,10 @@ public class AtomicReference implements java.io.Serializable { public T get() { return value; } + + public void set(T newValue) { + this.value = newValue; + } public void lazySet(T newValue) { unsafe.putOrderedObject(this, valueOffset, newValue); diff --git a/classpath/java/util/concurrent/locks/LockSupport.java b/classpath/java/util/concurrent/locks/LockSupport.java index 1f547c465d..7d0237e796 100644 --- a/classpath/java/util/concurrent/locks/LockSupport.java +++ b/classpath/java/util/concurrent/locks/LockSupport.java @@ -23,16 +23,12 @@ public class LockSupport { static { unsafe = Unsafe.getUnsafe(); try { - parkBlockerOffset = unsafe.objectFieldOffset(java.lang.Thread.class.getDeclaredField("parkBlocker")); + parkBlockerOffset = unsafe.objectFieldOffset(Thread.class.getDeclaredField("parkBlocker")); } catch (Exception ex) { throw new Error(ex); } } - private static void setBlocker(Thread t, Object arg) { - unsafe.putObject(t, parkBlockerOffset, arg); - } - public static void unpark(Thread thread) { if (thread != null) { unsafe.unpark(thread); @@ -53,16 +49,16 @@ public class LockSupport { private static void doParkNanos(Object blocker, long nanos) { Thread t = Thread.currentThread(); - setBlocker(t, blocker); + unsafe.putObject(t, parkBlockerOffset, blocker); unsafe.park(false, nanos); - setBlocker(t, null); + unsafe.putObject(t, parkBlockerOffset, null); } public static void parkUntil(Object blocker, long deadline) { Thread t = Thread.currentThread(); - setBlocker(t, blocker); + unsafe.putObject(t, parkBlockerOffset, blocker); unsafe.park(true, deadline); - setBlocker(t, null); + unsafe.putObject(t, parkBlockerOffset, null); } public static Object getBlocker(Thread t) { diff --git a/test/FutureTaskTest.java b/test/FutureTaskTest.java new file mode 100644 index 0000000000..7a8ea9d9a7 --- /dev/null +++ b/test/FutureTaskTest.java @@ -0,0 +1,105 @@ +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class FutureTaskTest { + private static final int DELAY_TIME = 10; + + public static void main(String[] args) throws InterruptedException, ExecutionException { + isDoneTest(false); + isDoneTest(true); + getCallableResultTest(); + getRunnableResultTest(); + getTimeoutFail(); + getExecutionExceptionTest(); + } + + private static void isDoneTest(final boolean throwException) { + RunnableFuture future = new FutureTask(new Runnable() { + @Override + public void run() { + if (throwException) { + throw new RuntimeException(); + } + } + }, null); + + // should finish the future + future.run(); + + if (! future.isDone()) { + throw new RuntimeException("Future should be done"); + } + } + + private static void getCallableResultTest() throws InterruptedException, ExecutionException { + final Object result = new Object(); + FutureTask future = new FutureTask(new Callable() { + @Override + public Object call() throws Exception { + return result; + } + }); + + future.run(); + if (future.get() != result) { + throw new RuntimeException("Bad result returned: " + future.get()); + } + } + + private static void getRunnableResultTest() throws InterruptedException, ExecutionException { + final Object result = new Object(); + FutureTask future = new FutureTask(new Runnable() { + @Override + public void run() { + // nothing here + } + }, result); + + future.run(); + if (future.get() != result) { + throw new RuntimeException("Bad result returned: " + future.get()); + } + } + + private static void getTimeoutFail() throws InterruptedException, + ExecutionException { + RunnableFuture future = new FutureTask(new Runnable() { + @Override + public void run() { + // wont run + } + }, null); + + long startTime = System.currentTimeMillis(); + try { + future.get(DELAY_TIME, TimeUnit.MILLISECONDS); + throw new RuntimeException("Exception should have been thrown"); + } catch (TimeoutException e) { + long catchTime = System.currentTimeMillis(); + if (catchTime - startTime < DELAY_TIME) { + throw new RuntimeException("get with timeout did not block long enough"); + } + } + } + + private static void getExecutionExceptionTest() throws InterruptedException, ExecutionException { + FutureTask future = new FutureTask(new Runnable() { + @Override + public void run() { + throw new RuntimeException(); + } + }, null); + + future.run(); + try { + future.get(); + throw new RuntimeException("Exception should have thrown"); + } catch (ExecutionException e) { + // expected + } + } +}