Merge pull request #149 from jentfoo/concurrency_classpath_extension

Concurrency classpath extension (part of the atomic package implementation)
This commit is contained in:
Joel Dice 2014-01-03 16:06:04 -08:00
commit 1f6051bcbc
5 changed files with 601 additions and 0 deletions

View File

@ -0,0 +1,57 @@
package java.util.concurrent.atomic;
public class AtomicBoolean implements java.io.Serializable {
private static final long serialVersionUID = 4654671469794556979L;
private static final int FALSE_VALUE = 0;
private static final int TRUE_VALUE = 1;
private final AtomicInteger value;
public AtomicBoolean() {
this(false);
}
public AtomicBoolean(boolean initialValue) {
value = new AtomicInteger(intValue(initialValue));
}
private static int intValue(boolean value) {
return value ? TRUE_VALUE : FALSE_VALUE;
}
private static boolean booleanValue(int value) {
return value == TRUE_VALUE;
}
public boolean get() {
return booleanValue(value.get());
}
public boolean compareAndSet(boolean expect, boolean update) {
return value.compareAndSet(intValue(expect), intValue(update));
}
public boolean weakCompareAndSet(boolean expect, boolean update) {
return value.weakCompareAndSet(intValue(expect), intValue(update));
}
public void set(boolean newValue) {
value.set(intValue(newValue));
}
public void lazySet(boolean newValue) {
value.lazySet(intValue(newValue));
}
public boolean getAndSet(boolean newValue) {
int intResult = value.getAndSet(intValue(newValue));
return booleanValue(intResult);
}
@Override
public String toString() {
return Boolean.toString(get());
}
}

View File

@ -0,0 +1,126 @@
package java.util.concurrent.atomic;
import java.lang.reflect.Field;
import sun.misc.Unsafe;
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
Field<?> f = AtomicInteger.class.getDeclaredField("value");
valueOffset = unsafe.objectFieldOffset(f);
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
private volatile int value;
public AtomicInteger() {
this(0);
}
public AtomicInteger(int initialValue) {
this.value = initialValue;
}
public int get() {
return value;
}
public void set(int newValue) {
this.value = newValue;
}
public void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}
public boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public int getAndSet(int newValue) {
while (true) {
int current = value;
if (compareAndSet(current, newValue)) {
return current;
}
}
}
public int getAndAdd(int delta) {
while (true) {
int current = value;
int next = current + delta;
if (compareAndSet(current, next)) {
return current;
}
}
}
public int getAndIncrement() {
return getAndAdd(1);
}
public int getAndDecrement() {
return getAndAdd(-1);
}
public int addAndGet(int delta) {
while (true) {
int current = value;
int next = current + delta;
if (compareAndSet(current, next)) {
return next;
}
}
}
public int incrementAndGet() {
return addAndGet(1);
}
public int decrementAndGet() {
return addAndGet(-1);
}
@Override
public byte byteValue() {
return (byte)value;
}
@Override
public short shortValue() {
return (short)value;
}
@Override
public int intValue() {
return value;
}
@Override
public long longValue() {
return value;
}
@Override
public float floatValue() {
return value;
}
@Override
public double doubleValue() {
return value;
}
}

View File

@ -0,0 +1,126 @@
package java.util.concurrent.atomic;
import java.lang.reflect.Field;
import sun.misc.Unsafe;
public class AtomicLong extends Number implements java.io.Serializable {
private static final long serialVersionUID = 1927816293512124184L;
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
Field<?> f = AtomicLong.class.getDeclaredField("value");
valueOffset = unsafe.objectFieldOffset(f);
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
private volatile long value;
public AtomicLong() {
this(0);
}
public AtomicLong(long initialValue) {
this.value = initialValue;
}
public long get() {
return value;
}
public void set(long newValue) {
this.value = newValue;
}
public void lazySet(long newValue) {
unsafe.putOrderedLong(this, valueOffset, newValue);
}
public boolean compareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}
public boolean weakCompareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}
public long getAndSet(long newValue) {
while (true) {
long current = value;
if (compareAndSet(current, newValue)) {
return current;
}
}
}
public long getAndAdd(long delta) {
while (true) {
long current = value;
long next = current + delta;
if (compareAndSet(current, next)) {
return current;
}
}
}
public long getAndIncrement() {
return getAndAdd(1);
}
public long getAndDecrement() {
return getAndAdd(-1);
}
public long addAndGet(long delta) {
while (true) {
long current = value;
long next = current + delta;
if (compareAndSet(current, next)) {
return next;
}
}
}
public long incrementAndGet() {
return addAndGet(1);
}
public long decrementAndGet() {
return addAndGet(-1);
}
@Override
public byte byteValue() {
return (byte)value;
}
@Override
public short shortValue() {
return (short)value;
}
@Override
public int intValue() {
return (int)value;
}
@Override
public long longValue() {
return value;
}
@Override
public float floatValue() {
return value;
}
@Override
public double doubleValue() {
return value;
}
}

View File

@ -0,0 +1,61 @@
package java.util.concurrent.atomic;
import java.lang.reflect.Field;
import sun.misc.Unsafe;
public class AtomicReference<T> implements java.io.Serializable {
private static final long serialVersionUID = -1848883965231344442L;
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
Field<?> f = AtomicReference.class.getDeclaredField("value");
valueOffset = unsafe.objectFieldOffset(f);
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
private volatile T value;
public AtomicReference() {
this(null);
}
public AtomicReference(T initialValue) {
this.value = initialValue;
}
public T get() {
return value;
}
public void lazySet(T newValue) {
unsafe.putOrderedObject(this, valueOffset, newValue);
}
public boolean compareAndSet(T expect, T update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
public boolean weakCompareAndSet(T expect, T update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}
public final T getAndSet(T newValue) {
while (true) {
T current = value;
if (compareAndSet(current, newValue)) {
return current;
}
}
}
@Override
public String toString() {
return String.valueOf(value);
}
}

231
test/AtomicTests.java Normal file
View File

@ -0,0 +1,231 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class AtomicTests {
private static final int threadCount = 10;
private static final int iterationsPerThread = 100;
public static void main(String[] args) {
runAtomicIntegerTest(true);
runAtomicIntegerTest(false);
runAtomicLongTest(true);
runAtomicLongTest(false);
runAtomicReferenceTest();
}
private static void blockTillThreadsDone(AtomicInteger threadDoneCount) throws InterruptedException {
synchronized (threadDoneCount) {
while (threadDoneCount.get() < threadCount) {
threadDoneCount.wait();
}
}
}
private static void runAtomicIntegerTest(final boolean increment) {
final AtomicInteger result = new AtomicInteger();
final AtomicInteger threadDoneCount = new AtomicInteger();
// only using an AtomicBoolean here so I don't need two variables to do the synchronize/wait/notify
final AtomicBoolean threadsStart = new AtomicBoolean(false);
Runnable operationRunnable = new Runnable() {
@Override
public void run() {
boolean flip = true;
for (int i = 0; i < iterationsPerThread; i++) {
if (flip) {
if (increment) {
result.incrementAndGet();
} else {
result.decrementAndGet();
}
flip = false;
} else {
if (increment) {
result.getAndIncrement();
} else {
result.getAndDecrement();
}
flip = true;
}
}
}
};
for (int i = 0; i < threadCount; i++) {
new Thread(new DelayedRunnable(threadsStart,
operationRunnable,
threadDoneCount)).start();
}
synchronized (threadsStart) {
threadsStart.set(true);
threadsStart.notifyAll();
}
try {
blockTillThreadsDone(threadDoneCount);
} catch (InterruptedException e) {
// let thread exit
return;
}
int expectedResult = threadCount * iterationsPerThread;
if (! increment) {
expectedResult *= -1;
}
int resultValue = result.get();
if (resultValue != expectedResult) {
throw new IllegalStateException(resultValue + " != " + expectedResult);
}
}
private static void runAtomicLongTest(final boolean increment) {
final AtomicLong result = new AtomicLong();
final AtomicInteger threadDoneCount = new AtomicInteger();
// only using an AtomicBoolean here so I don't need two variables to do the synchronize/wait/notify
final AtomicBoolean threadsStart = new AtomicBoolean(false);
Runnable operationRunnable = new Runnable() {
@Override
public void run() {
boolean flip = true;
for (int i = 0; i < iterationsPerThread; i++) {
if (flip) {
if (increment) {
result.incrementAndGet();
} else {
result.decrementAndGet();
}
flip = false;
} else {
if (increment) {
result.getAndIncrement();
} else {
result.getAndDecrement();
}
flip = true;
}
}
}
};
for (int i = 0; i < threadCount; i++) {
new Thread(new DelayedRunnable(threadsStart,
operationRunnable,
threadDoneCount)).start();
}
synchronized (threadsStart) {
threadsStart.set(true);
threadsStart.notifyAll();
}
try {
blockTillThreadsDone(threadDoneCount);
} catch (InterruptedException e) {
// let thread exit
return;
}
long expectedResult = threadCount * iterationsPerThread;
if (! increment) {
expectedResult *= -1;
}
long resultValue = result.get();
if (resultValue != expectedResult) {
throw new IllegalStateException(resultValue + " != " + expectedResult);
}
}
private static void runAtomicReferenceTest() {
final AtomicReference<Integer> result = new AtomicReference<Integer>(0);
final AtomicInteger threadDoneCount = new AtomicInteger(0);
// only using an AtomicBoolean here so I don't need two variables to do the synchronize/wait/notify
final AtomicBoolean threadsStart = new AtomicBoolean(false);
Runnable operationRunnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < iterationsPerThread; i++) {
Integer current = result.get();
while (! result.compareAndSet(current, current + 1)) {
current = result.get();
}
}
}
};
for (int i = 0; i < threadCount; i++) {
new Thread(new DelayedRunnable(threadsStart,
operationRunnable,
threadDoneCount)).start();
}
synchronized (threadsStart) {
threadsStart.set(true);
threadsStart.notifyAll();
}
try {
blockTillThreadsDone(threadDoneCount);
} catch (InterruptedException e) {
// let thread exit
return;
}
long expectedResult = threadCount * iterationsPerThread;
Integer resultValue = result.get();
if (resultValue != expectedResult) {
throw new IllegalStateException(resultValue + " != " + expectedResult);
}
}
private static class DelayedRunnable implements Runnable {
private final AtomicBoolean threadsStart;
private final Runnable operationRunnable;
private final AtomicInteger threadDoneCount;
private DelayedRunnable(AtomicBoolean threadsStart,
Runnable operationRunnable,
AtomicInteger threadDoneCount) {
this.threadsStart = threadsStart;
this.operationRunnable = operationRunnable;
this.threadDoneCount = threadDoneCount;
}
@Override
public void run() {
try {
try {
waitTillReady();
} catch (InterruptedException e) {
// let thread exit
return;
}
operationRunnable.run();
} finally {
synchronized (threadDoneCount) {
threadDoneCount.incrementAndGet();
threadDoneCount.notifyAll();
}
}
}
private void waitTillReady() throws InterruptedException {
synchronized (threadsStart) {
while (! threadsStart.get()) {
threadsStart.wait();
}
}
}
}
}