From 68fca60d21004387b1abadc20da8a4ceec5cdc32 Mon Sep 17 00:00:00 2001 From: Mike Jensen Date: Mon, 10 Mar 2014 19:06:37 -0600 Subject: [PATCH] Added interface BlockingDeque, and implementation for ExecutorCompletionService and LinkedBlockingQueue. I had to implement a blocking queue for ExecutorCompletionService. LinkedBlockingQueue could be very easily extended right now to implement the java 7 LinkedBlockingDeque. Right now LinkedBlockingQueue just synchronizes and depends on LinkedList implementation. But I wrote a very complete unit test suite so we if we want to put a more concurrent design here, we have a complete test suite to verify against.# Please enter the commit message for your changes. Lines starting --- .../java/util/concurrent/BlockingDeque.java | 13 + .../concurrent/ExecutorCompletionService.java | 63 +++ .../util/concurrent/LinkedBlockingQueue.java | 327 +++++++++++++ test/CompleteionServiceTest.java | 56 +++ test/LinkedBlockingQueueTest.java | 431 ++++++++++++++++++ 5 files changed, 890 insertions(+) create mode 100644 classpath/java/util/concurrent/BlockingDeque.java create mode 100644 classpath/java/util/concurrent/ExecutorCompletionService.java create mode 100644 classpath/java/util/concurrent/LinkedBlockingQueue.java create mode 100644 test/CompleteionServiceTest.java create mode 100644 test/LinkedBlockingQueueTest.java diff --git a/classpath/java/util/concurrent/BlockingDeque.java b/classpath/java/util/concurrent/BlockingDeque.java new file mode 100644 index 0000000000..7d8fc9945e --- /dev/null +++ b/classpath/java/util/concurrent/BlockingDeque.java @@ -0,0 +1,13 @@ +package java.util.concurrent; + +import java.util.Deque; + +public interface BlockingDeque extends Deque, BlockingQueue { + public T takeFirst() throws InterruptedException; + + public T takeLast() throws InterruptedException; + + public T pollFirst(long timeout, TimeUnit unit) throws InterruptedException; + + public T pollLast(long timeout, TimeUnit unit) throws InterruptedException; +} diff --git a/classpath/java/util/concurrent/ExecutorCompletionService.java b/classpath/java/util/concurrent/ExecutorCompletionService.java new file mode 100644 index 0000000000..fc5bb91b51 --- /dev/null +++ b/classpath/java/util/concurrent/ExecutorCompletionService.java @@ -0,0 +1,63 @@ +package java.util.concurrent; + +public class ExecutorCompletionService implements CompletionService { + private final Executor executor; + private final BlockingQueue> completionQueue; + + public ExecutorCompletionService(Executor executor) { + this(executor, new LinkedBlockingQueue>()); + } + + public ExecutorCompletionService(Executor executor, BlockingQueue> completionQueue) { + this.executor = executor; + this.completionQueue = completionQueue; + } + + @Override + public Future submit(Callable task) { + ECSFuture f = new ECSFuture(task); + + executor.execute(f); + + return f; + } + + @Override + public Future submit(Runnable task, T result) { + ECSFuture f = new ECSFuture(task, result); + + executor.execute(f); + + return f; + } + + @Override + public Future take() throws InterruptedException { + return completionQueue.take(); + } + + @Override + public Future poll() { + return completionQueue.poll(); + } + + @Override + public Future poll(long timeout, TimeUnit unit) throws InterruptedException { + return completionQueue.poll(timeout, unit); + } + + private class ECSFuture extends FutureTask implements Future { + private ECSFuture(Runnable r, T result) { + super(r, result); + } + + private ECSFuture(Callable callable) { + super(callable); + } + + @Override + protected void done() { + completionQueue.add(this); + } + } +} diff --git a/classpath/java/util/concurrent/LinkedBlockingQueue.java b/classpath/java/util/concurrent/LinkedBlockingQueue.java new file mode 100644 index 0000000000..bf655c2d5b --- /dev/null +++ b/classpath/java/util/concurrent/LinkedBlockingQueue.java @@ -0,0 +1,327 @@ +package java.util.concurrent; + +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.NoSuchElementException; + +public class LinkedBlockingQueue implements BlockingQueue { + private final Object collectionLock; + private final LinkedList storage; + private final int capacity; + + public LinkedBlockingQueue() { + this(Integer.MAX_VALUE); + } + + public LinkedBlockingQueue(int capacity) { + collectionLock = new Object(); + this.capacity = capacity; + storage = new LinkedList(); + } + + // should be synchronized on collectionLock before calling + private void handleRemove() { + collectionLock.notifyAll(); + } + + // should be synchronized on collectionLock before calling + private void handleAdd() { + collectionLock.notifyAll(); + } + + // should be synchronized on collectionLock before calling + private void blockTillNotFull() throws InterruptedException { + blockTillNotFull(Long.MAX_VALUE); + } + + // should be synchronized on collectionLock before calling + private void blockTillNotFull(long maxWaitInMillis) throws InterruptedException { + if (capacity > storage.size()) { + return; + } + + long startTime = 0; + if (maxWaitInMillis != Long.MAX_VALUE) { + startTime = System.currentTimeMillis(); + } + long remainingWait = maxWaitInMillis; + while (remainingWait > 0) { + collectionLock.wait(remainingWait); + + if (capacity > storage.size()) { + return; + } else if (maxWaitInMillis != Long.MAX_VALUE) { + remainingWait = maxWaitInMillis - (System.currentTimeMillis() - startTime); + } + } + } + + // should be synchronized on collectionLock before calling + private void blockTillNotEmpty() throws InterruptedException { + blockTillNotEmpty(Long.MAX_VALUE); + } + + // should be synchronized on collectionLock before calling + private void blockTillNotEmpty(long maxWaitInMillis) throws InterruptedException { + if (! storage.isEmpty()) { + return; + } + + long startTime = 0; + if (maxWaitInMillis != Long.MAX_VALUE) { + startTime = System.currentTimeMillis(); + } + long remainingWait = maxWaitInMillis; + while (remainingWait > 0) { + collectionLock.wait(remainingWait); + + if (! storage.isEmpty()) { + return; + } else if (maxWaitInMillis != Long.MAX_VALUE) { + remainingWait = maxWaitInMillis - (System.currentTimeMillis() - startTime); + } + } + } + + @Override + public boolean add(T element) { + if (! offer(element)) { + throw new IllegalStateException("At capacity"); + } + + return true; + } + + @Override + public boolean offer(T element) { + synchronized (collectionLock) { + if (capacity > storage.size()) { + storage.addLast(element); + + handleAdd(); + + return true; + } else { + return false; + } + } + } + + @Override + public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException { + long timeoutInMillis = unit.toMillis(timeout); + synchronized (collectionLock) { + // block till we can add or have reached timeout + blockTillNotFull(timeoutInMillis); + + return offer(e); + } + } + + @Override + public void put(T e) throws InterruptedException { + synchronized (collectionLock) { + // block till we have space + blockTillNotFull(); + + storage.add(e); + handleAdd(); + } + } + + @Override + public boolean addAll(Collection c) { + synchronized (collectionLock) { + if (storage.size() + c.size() > capacity) { + throw new IllegalStateException("Not enough space"); + } + + if (c.isEmpty()) { + return false; + } else { + storage.addAll(c); + + return true; + } + } + } + + @Override + public T peek() { + synchronized (collectionLock) { + if (storage.isEmpty()) { + return null; + } else { + return storage.getFirst(); + } + } + } + + @Override + public T element() { + T result = peek(); + + if (result == null) { + throw new NoSuchElementException(); + } + + return result; + } + + // should be synchronized on collectionLock before calling + private T removeFirst() { + T result = storage.removeFirst(); + handleRemove(); + + return result; + } + + @Override + public T poll() { + synchronized (collectionLock) { + if (storage.isEmpty()) { + return null; + } else { + return removeFirst(); + } + } + } + + @Override + public T poll(long timeout, TimeUnit unit) throws InterruptedException { + long timeoutInMillis = unit.toMillis(timeout); + synchronized (collectionLock) { + // block till we available or timeout + blockTillNotEmpty(timeoutInMillis); + + return poll(); + } + } + + @Override + public T take() throws InterruptedException { + synchronized (collectionLock) { + // block till we available + blockTillNotEmpty(); + + return removeFirst(); + } + } + + @Override + public T remove() { + T result = poll(); + + if (result == null) { + throw new NoSuchElementException(); + } + + return result; + } + + @Override + public int drainTo(Collection c) { + return drainTo(c, Integer.MAX_VALUE); + } + + @Override + public int drainTo(Collection c, int maxElements) { + int remainingElements = maxElements; + synchronized (collectionLock) { + while (remainingElements > 0 && ! storage.isEmpty()) { + c.add(storage.removeFirst()); + remainingElements--; + } + + if (remainingElements != maxElements) { + handleRemove(); + } + + return maxElements - remainingElements; + } + } + + @Override + public int remainingCapacity() { + synchronized (collectionLock) { + return capacity - storage.size(); + } + } + + @Override + public int size() { + synchronized (collectionLock) { + return storage.size(); + } + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public boolean contains(Object element) { + synchronized (collectionLock) { + return storage.contains(element); + } + } + + @Override + public boolean containsAll(Collection c) { + synchronized (collectionLock) { + return storage.containsAll(c); + } + } + + @Override + public boolean remove(Object element) { + synchronized (collectionLock) { + if (storage.remove(element)) { + handleRemove(); + return true; + } else { + return false; + } + } + } + + @Override + public boolean removeAll(Collection c) { + synchronized (collectionLock) { + if (storage.removeAll(c)) { + handleRemove(); + return true; + } else { + return false; + } + } + } + + @Override + public void clear() { + synchronized (collectionLock) { + storage.clear(); + } + } + + @Override + public Object[] toArray() { + synchronized (collectionLock) { + return storage.toArray(); + } + } + + @Override + public S[] toArray(S[] array) { + synchronized (collectionLock) { + return storage.toArray(array); + } + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException("Not implemented yet"); + } +} diff --git a/test/CompleteionServiceTest.java b/test/CompleteionServiceTest.java new file mode 100644 index 0000000000..257fd5a0ce --- /dev/null +++ b/test/CompleteionServiceTest.java @@ -0,0 +1,56 @@ +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.TimeUnit; + +public class CompleteionServiceTest { + public static void main(String args[]) throws InterruptedException, ExecutionException { + Executor dumbExecutor = new Executor() { + @Override + public void execute(Runnable task) { + new Thread(task).start(); + } + }; + + pollNoResultTest(dumbExecutor); + pollTimeoutNoResultTest(dumbExecutor); + takeTest(dumbExecutor); + } + + private static void pollNoResultTest(Executor executor) { + ExecutorCompletionService ecs = new ExecutorCompletionService(executor); + + if (ecs.poll() != null) { + throw new RuntimeException(); + } + } + + private static void pollTimeoutNoResultTest(Executor executor) throws InterruptedException { + long delayTime = 0; + ExecutorCompletionService ecs = new ExecutorCompletionService(executor); + + long startTime = System.currentTimeMillis(); + if (ecs.poll(delayTime, TimeUnit.MILLISECONDS) != null) { + throw new RuntimeException(); + } + if (System.currentTimeMillis() - startTime < delayTime) { + throw new RuntimeException(); + } + } + + private static void takeTest(Executor executor) throws InterruptedException, ExecutionException { + ExecutorCompletionService ecs = new ExecutorCompletionService(executor); + final Object result = new Object(); + ecs.submit(new Callable() { + @Override + public Object call() throws Exception { + return result; + } + }); + + if (ecs.take().get() != result) { + throw new RuntimeException(); + } + } +} diff --git a/test/LinkedBlockingQueueTest.java b/test/LinkedBlockingQueueTest.java new file mode 100644 index 0000000000..664643ec17 --- /dev/null +++ b/test/LinkedBlockingQueueTest.java @@ -0,0 +1,431 @@ +import java.util.LinkedList; +import java.util.NoSuchElementException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + + +public class LinkedBlockingQueueTest { + private static final int DELAY_TILL_ACTION = 10; + + public static void main(String[] args) throws InterruptedException { + remainingCapacityTest(); + sizeTest(); + isEmptyTest(); + addTest(); + addCapacityFail(); + offerTest(); + offerWithTimeoutTest(); + offerTimeoutTest(); + putTest(); + addAllTest(); + addAllFail(); + elementTest(); + elementFail(); + pollEmptyTest(); + pollTest(); + pollTimeoutTest(); + takeTest(); + removeEmptyTest(); + removeTest(); + drainToTest(); + drainToLimitTest(); + containsTest(); + containsAllTest(); + removeObjectTest(); + removeAllTest(); + clearTest(); + toArrayTest(); + } + + private static void remainingCapacityTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(2); + if (lbq.remainingCapacity() != 2) { + throw new RuntimeException(); + } + + lbq.add(new Object()); + if (lbq.remainingCapacity() != 1) { + throw new RuntimeException(); + } + } + + private static void sizeTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + if (lbq.size() != 0) { + throw new RuntimeException(); + } + + lbq.add(new Object()); + if (lbq.size() != 1) { + throw new RuntimeException(); + } + } + + private static void isEmptyTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + if (! lbq.isEmpty()) { + throw new RuntimeException(); + } + + lbq.add(new Object()); + if (lbq.isEmpty()) { + throw new RuntimeException(); + } + } + + private static void addTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + Object testObject = new Object(); + lbq.add(testObject); + + if (lbq.size() != 1) { + throw new RuntimeException(); + } else if (lbq.peek() != testObject) { + throw new RuntimeException(); + } + } + + private static void addCapacityFail() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(1); + Object testObject = new Object(); + lbq.add(testObject); + + try { + lbq.add(new Object()); + throw new RuntimeException("Exception should have thrown"); + } catch (IllegalStateException e) { + // expected + } + + if (lbq.size() != 1) { + throw new RuntimeException(); + } else if (lbq.peek() != testObject) { + throw new RuntimeException(); + } + } + + private static void offerTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(1); + Object testObject = new Object(); + if (! lbq.offer(testObject)) { + throw new RuntimeException(); + } + if (lbq.offer(new Object())) { + throw new RuntimeException(); + } + + if (lbq.size() != 1) { + throw new RuntimeException(); + } else if (lbq.peek() != testObject) { + throw new RuntimeException(); + } + } + + private static void offerWithTimeoutTest() throws InterruptedException { + final LinkedBlockingQueue lbq = new LinkedBlockingQueue(1); + lbq.add(new Object()); + + new Thread(new Runnable() { + @Override + public void run() { + try { + // sleep to make sure offer call starts first + Thread.sleep(DELAY_TILL_ACTION); + lbq.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }).start(); + + // should accept once thread starts + if (! lbq.offer(new Object(), 10, TimeUnit.SECONDS)) { + throw new RuntimeException(); + } + } + + private static void offerTimeoutTest() throws InterruptedException { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(1); + lbq.add(new Object()); + + if (lbq.offer(new Object(), 10, TimeUnit.MILLISECONDS)) { + throw new RuntimeException(); + } + } + + private static void putTest() throws InterruptedException { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + Object testObject = new Object(); + lbq.put(testObject); + + if (lbq.size() != 1) { + throw new RuntimeException(); + } else if (lbq.peek() != testObject) { + throw new RuntimeException(); + } + } + + private static void addAllTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + LinkedList toAdd = new LinkedList(); + toAdd.add(new Object()); + toAdd.add(new Object()); + + lbq.addAll(toAdd); + + if (lbq.size() != toAdd.size()) { + throw new RuntimeException(); + } + while (! lbq.isEmpty()) { + if (lbq.remove() != toAdd.remove()) { + throw new RuntimeException(); + } + } + } + + private static void addAllFail() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(1); + LinkedList toAdd = new LinkedList(); + toAdd.add(new Object()); + toAdd.add(new Object()); + + try { + lbq.addAll(toAdd); + throw new RuntimeException("Exception should have thrown"); + } catch (IllegalStateException e) { + // expected + } + } + + private static void elementTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + Object testObject = new Object(); + lbq.add(testObject); + + if (lbq.element() != testObject) { + throw new RuntimeException(); + } + } + + private static void elementFail() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + + try { + lbq.element(); + throw new RuntimeException("Exception should have thrown"); + } catch (NoSuchElementException e) { + // expected + } + } + + private static void pollEmptyTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + + if (lbq.poll() != null) { + throw new RuntimeException(); + } + } + + private static void pollTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + Object testObject = new Object(); + lbq.add(testObject); + + if (lbq.poll() != testObject) { + throw new RuntimeException(); + } + } + + private static void pollTimeoutTest() throws InterruptedException { + final LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + final Object testObject = new Object(); + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(DELAY_TILL_ACTION); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + lbq.add(testObject); + } + }).start(); + + + if (lbq.poll(DELAY_TILL_ACTION * 2, TimeUnit.MILLISECONDS) != testObject) { + throw new RuntimeException(); + } + } + + private static void takeTest() throws InterruptedException { + final LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + final Object testObject = new Object(); + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(DELAY_TILL_ACTION); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + lbq.add(testObject); + } + }).start(); + + + if (lbq.take() != testObject) { + throw new RuntimeException(); + } + } + + private static void removeEmptyTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + + try { + lbq.remove(); + throw new RuntimeException("Exception should have thrown"); + } catch (NoSuchElementException e) { + // expected + } + } + + private static void removeTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + Object testObject = new Object(); + lbq.add(testObject); + + if (lbq.remove() != testObject) { + throw new RuntimeException(); + } + } + + private static void drainToTest() { + int objQty = 2; + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + for (int i = 0; i < objQty; i++) { + lbq.add(new Object()); + } + + LinkedList drainToResult = new LinkedList(); + if (lbq.drainTo(drainToResult) != objQty) { + throw new RuntimeException(); + } else if (drainToResult.size() != objQty) { + throw new RuntimeException(); + } + } + + private static void drainToLimitTest() { + int objQty = 4; + int limit = 2; + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + for (int i = 0; i < objQty; i++) { + lbq.add(new Object()); + } + + LinkedList drainToResult = new LinkedList(); + if (lbq.drainTo(drainToResult, limit) != limit) { + throw new RuntimeException(); + } else if (drainToResult.size() != limit) { + throw new RuntimeException(); + } else if (lbq.size() != objQty - limit) { + throw new RuntimeException(); + } + } + + private static void containsTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + Object testObject = new Object(); + + if (lbq.contains(testObject)) { + throw new RuntimeException(); + } + + lbq.add(testObject); + if (! lbq.contains(testObject)) { + throw new RuntimeException(); + } + } + + private static void containsAllTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + Object testObject = new Object(); + lbq.add(testObject); + + LinkedList testList = new LinkedList(); + testList.add(testObject); + testList.add(new Object()); + + if (lbq.containsAll(testList)) { + throw new RuntimeException(); + } + + lbq.addAll(testList); + if (! lbq.containsAll(testList)) { + throw new RuntimeException(); + } + } + + private static void removeObjectTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + Object testObject = new Object(); + + if (lbq.remove(testObject)) { + throw new RuntimeException(); + } + + lbq.add(testObject); + if (! lbq.remove(testObject)) { + throw new RuntimeException(); + } + } + + private static void removeAllTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + Object testObject = new Object(); + lbq.add(testObject); + + LinkedList testList = new LinkedList(); + testList.add(testObject); + testList.add(new Object()); + + if (! lbq.removeAll(testList)) { + throw new RuntimeException(); + } + + lbq.addAll(testList); + if (! lbq.removeAll(testList)) { + throw new RuntimeException(); + } + } + + private static void clearTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + lbq.add(new Object()); + + lbq.clear(); + + if (! lbq.isEmpty()) { + throw new RuntimeException(); + } + } + + private static void toArrayTest() { + LinkedBlockingQueue lbq = new LinkedBlockingQueue(); + + if (lbq.toArray().length != 0) { + throw new RuntimeException(); + } + + Object testObject = new Object(); + lbq.add(testObject); + + Object[] result = lbq.toArray(); + if (result.length != 1) { + throw new RuntimeException(); + } else if (result[0] != testObject) { + throw new RuntimeException(); + } + } +}