package java.util.concurrent; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.NoSuchElementException; public class LinkedBlockingQueue implements BlockingQueue { private final Object collectionLock; private final LinkedList storage; private final int capacity; public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { collectionLock = new Object(); this.capacity = capacity; storage = new LinkedList(); } // should be synchronized on collectionLock before calling private void handleRemove() { collectionLock.notifyAll(); } // should be synchronized on collectionLock before calling private void handleAdd() { collectionLock.notifyAll(); } // should be synchronized on collectionLock before calling private void blockTillNotFull() throws InterruptedException { blockTillNotFull(Long.MAX_VALUE); } // should be synchronized on collectionLock before calling private void blockTillNotFull(long maxWaitInMillis) throws InterruptedException { if (capacity > storage.size()) { return; } long startTime = 0; if (maxWaitInMillis != Long.MAX_VALUE) { startTime = System.currentTimeMillis(); } long remainingWait = maxWaitInMillis; while (remainingWait > 0) { collectionLock.wait(remainingWait); if (capacity > storage.size()) { return; } else if (maxWaitInMillis != Long.MAX_VALUE) { remainingWait = maxWaitInMillis - (System.currentTimeMillis() - startTime); } } } // should be synchronized on collectionLock before calling private void blockTillNotEmpty() throws InterruptedException { blockTillNotEmpty(Long.MAX_VALUE); } // should be synchronized on collectionLock before calling private void blockTillNotEmpty(long maxWaitInMillis) throws InterruptedException { if (! storage.isEmpty()) { return; } long startTime = 0; if (maxWaitInMillis != Long.MAX_VALUE) { startTime = System.currentTimeMillis(); } long remainingWait = maxWaitInMillis; while (remainingWait > 0) { collectionLock.wait(remainingWait); if (! storage.isEmpty()) { return; } else if (maxWaitInMillis != Long.MAX_VALUE) { remainingWait = maxWaitInMillis - (System.currentTimeMillis() - startTime); } } } @Override public boolean add(T element) { if (! offer(element)) { throw new IllegalStateException("At capacity"); } return true; } @Override public boolean offer(T element) { synchronized (collectionLock) { if (capacity > storage.size()) { storage.addLast(element); handleAdd(); return true; } else { return false; } } } @Override public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException { long timeoutInMillis = unit.toMillis(timeout); synchronized (collectionLock) { // block till we can add or have reached timeout blockTillNotFull(timeoutInMillis); return offer(e); } } @Override public void put(T e) throws InterruptedException { synchronized (collectionLock) { // block till we have space blockTillNotFull(); storage.add(e); handleAdd(); } } @Override public boolean addAll(Collection c) { synchronized (collectionLock) { if (storage.size() + c.size() > capacity) { throw new IllegalStateException("Not enough space"); } if (c.isEmpty()) { return false; } else { storage.addAll(c); return true; } } } @Override public T peek() { synchronized (collectionLock) { if (storage.isEmpty()) { return null; } else { return storage.getFirst(); } } } @Override public T element() { T result = peek(); if (result == null) { throw new NoSuchElementException(); } return result; } // should be synchronized on collectionLock before calling private T removeFirst() { T result = storage.removeFirst(); handleRemove(); return result; } @Override public T poll() { synchronized (collectionLock) { if (storage.isEmpty()) { return null; } else { return removeFirst(); } } } @Override public T poll(long timeout, TimeUnit unit) throws InterruptedException { long timeoutInMillis = unit.toMillis(timeout); synchronized (collectionLock) { // block till we available or timeout blockTillNotEmpty(timeoutInMillis); return poll(); } } @Override public T take() throws InterruptedException { synchronized (collectionLock) { // block till we available blockTillNotEmpty(); return removeFirst(); } } @Override public T remove() { T result = poll(); if (result == null) { throw new NoSuchElementException(); } return result; } @Override public int drainTo(Collection c) { return drainTo(c, Integer.MAX_VALUE); } @Override public int drainTo(Collection c, int maxElements) { int remainingElements = maxElements; synchronized (collectionLock) { while (remainingElements > 0 && ! storage.isEmpty()) { c.add(storage.removeFirst()); remainingElements--; } if (remainingElements != maxElements) { handleRemove(); } return maxElements - remainingElements; } } @Override public int remainingCapacity() { synchronized (collectionLock) { return capacity - storage.size(); } } @Override public int size() { synchronized (collectionLock) { return storage.size(); } } @Override public boolean isEmpty() { return size() == 0; } @Override public boolean contains(Object element) { synchronized (collectionLock) { return storage.contains(element); } } @Override public boolean containsAll(Collection c) { synchronized (collectionLock) { return storage.containsAll(c); } } @Override public boolean remove(Object element) { synchronized (collectionLock) { if (storage.remove(element)) { handleRemove(); return true; } else { return false; } } } @Override public boolean removeAll(Collection c) { synchronized (collectionLock) { if (storage.removeAll(c)) { handleRemove(); return true; } else { return false; } } } @Override public void clear() { synchronized (collectionLock) { storage.clear(); } } @Override public Object[] toArray() { synchronized (collectionLock) { return storage.toArray(); } } @Override public S[] toArray(S[] array) { synchronized (collectionLock) { return storage.toArray(array); } } @Override public Iterator iterator() { throw new UnsupportedOperationException("Not implemented yet"); } }