reimplement Java object monitors to avoid running out of OS handles

Due to SWT's nasty habit of creating a new object monitor for every
task added to Display.asyncExec, we've found that, on Windows at
least, we tend to run out of OS handles due to the large number of
mutexes we create between garbage collections.

One way to address this might be to trigger a GC when either the
number of monitors created since the last GC exceeds a certain number
or when the total number of monitors in the VM reaches a certain
number.  Both of these risk hurting performance, especially if they
force major collections which would otherwise be infrequent.  Also,
it's hard to know what the values of such thresholds should be on a
given system.

Instead, we reimplement Java monitors using atomic compare-and-swap
(CAS) and thread-specific native locks for blocking in the case of
contention.  This way, we can create an arbitrary number of monitors
without creating any new native locks.  The total number of native
locks needed by the VM is bounded instead by the number of live
threads plus a small constant.

Note that if we ever add support for an architecture which does not
support CAS, we'll need to provide a fallback monitor implementation.
This commit is contained in:
Joel Dice 2010-02-01 17:56:09 -07:00
parent 45476eb591
commit 8120bee4dc
3 changed files with 338 additions and 41 deletions

View File

@ -1808,17 +1808,13 @@ removeMonitor(Thread* t, object o)
hash = objectHash(t, o);
}
object p = hashMapRemove(t, t->m->monitorMap, o, objectHash, objectEqual);
object m = hashMapRemove(t, t->m->monitorMap, o, objectHash, objectEqual);
expect(t, p);
expect(t, m);
if (DebugMonitors) {
fprintf(stderr, "dispose monitor %p for object %x\n",
static_cast<System::Monitor*>(pointerValue(t, p)),
hash);
fprintf(stderr, "dispose monitor %p for object %x\n", m, hash);
}
static_cast<System::Monitor*>(pointerValue(t, p))->dispose();
}
void
@ -2185,9 +2181,11 @@ Thread::Thread(Machine* m, object javaThread, Thread* parent):
parent(parent),
peer((parent ? parent->child : 0)),
child(0),
waitNext(0),
state(NoState),
criticalLevel(0),
systemThread(0),
lock(0),
javaThread(javaThread),
exception(0),
heapIndex(0),
@ -2201,6 +2199,7 @@ Thread::Thread(Machine* m, object javaThread, Thread* parent):
backupHeap(0),
backupHeapIndex(0),
backupHeapSizeInWords(0),
waiting(false),
tracing(false)
#ifdef VM_STRESS
, stress(false)
@ -2255,6 +2254,8 @@ Thread::init()
parent->child = this;
}
expect(this, m->system->success(m->system->make(&lock)));
if (javaThread == 0) {
this->javaThread = makeJavaThread(this, parent);
}
@ -2282,6 +2283,8 @@ Thread::exit()
void
Thread::dispose()
{
lock->dispose();
if (systemThread) {
systemThread->dispose();
}
@ -3419,49 +3422,43 @@ addFinalizer(Thread* t, object target, void (*finalize)(Thread*, object))
t->m->finalizers = f;
}
System::Monitor*
object
objectMonitor(Thread* t, object o, bool createNew)
{
assert(t, t->state == Thread::ActiveState);
object p = hashMapFind(t, t->m->monitorMap, o, objectHash, objectEqual);
object m = hashMapFind(t, t->m->monitorMap, o, objectHash, objectEqual);
if (p) {
if (m) {
if (DebugMonitors) {
fprintf(stderr, "found monitor %p for object %x\n",
static_cast<System::Monitor*>(pointerValue(t, p)),
objectHash(t, o));
fprintf(stderr, "found monitor %p for object %x\n", m, objectHash(t, o));
}
return static_cast<System::Monitor*>(pointerValue(t, p));
return m;
} else if (createNew) {
PROTECT(t, o);
ENTER(t, Thread::ExclusiveState);
p = hashMapFind(t, t->m->monitorMap, o, objectHash, objectEqual);
if (p) {
m = hashMapFind(t, t->m->monitorMap, o, objectHash, objectEqual);
if (m) {
if (DebugMonitors) {
fprintf(stderr, "found monitor %p for object %x\n",
static_cast<System::Monitor*>(pointerValue(t, p)),
objectHash(t, o));
m, objectHash(t, o));
}
return static_cast<System::Monitor*>(pointerValue(t, p));
return m;
}
System::Monitor* m;
System::Status s = t->m->system->make(&m);
expect(t, t->m->system->success(s));
object head = makeMonitorNode(t, 0, 0);
object m = makeMonitor(t, 0, 0, 0, head, head, 0);
PROTECT(t, m);
if (DebugMonitors) {
fprintf(stderr, "made monitor %p for object %x\n",
m,
objectHash(t, o));
fprintf(stderr, "made monitor %p for object %x\n", m, objectHash(t, o));
}
p = makePointer(t, m);
hashMapInsert(t, t->m->monitorMap, o, p, objectHash);
hashMapInsert(t, t->m->monitorMap, o, m, objectHash);
addFinalizer(t, o, removeMonitor);

View File

@ -17,6 +17,7 @@
#include "finder.h"
#include "processor.h"
#include "constants.h"
#include "arch.h"
#ifdef PLATFORM_WINDOWS
# define JNICALL __stdcall
@ -1212,6 +1213,7 @@ class Machine {
JNIEnvVTable jniEnvVTable;
uintptr_t* heapPool[ThreadHeapPoolSize];
unsigned heapPoolIndex;
unsigned recentMonitorCount;
};
void
@ -1345,9 +1347,11 @@ class Thread {
Thread* parent;
Thread* peer;
Thread* child;
Thread* waitNext;
State state;
unsigned criticalLevel;
System::Thread* systemThread;
System::Monitor* lock;
object javaThread;
object exception;
unsigned heapIndex;
@ -1360,6 +1364,7 @@ class Thread {
uintptr_t* backupHeap;
unsigned backupHeapIndex;
unsigned backupHeapSizeInWords;
bool waiting;
bool tracing;
#ifdef VM_STRESS
bool stress;
@ -2296,7 +2301,288 @@ parameterFootprint(Thread* t, const char* s, bool static_);
void
addFinalizer(Thread* t, object target, void (*finalize)(Thread*, object));
System::Monitor*
inline bool
monitorTryAcquire(Thread* t, object monitor)
{
if (monitorOwner(t, monitor) == t
or atomicCompareAndSwap
(reinterpret_cast<uintptr_t*>(&monitorOwner(t, monitor)), 0,
reinterpret_cast<uintptr_t>(t)))
{
++ monitorDepth(t, monitor);
return true;
} else {
return false;
}
}
inline bool
atomicCompareAndSwapObject(Thread* t, object target, unsigned offset,
object old, object new_)
{
if (atomicCompareAndSwap(&cast<uintptr_t>(target, offset),
reinterpret_cast<uintptr_t>(old),
reinterpret_cast<uintptr_t>(new_)))
{
mark(t, target, offset);
return true;
} else {
return false;
}
}
// The following two methods (monitorAtomicAppendAcquire and
// monitorAtomicPollAcquire) use the Michael and Scott Non-Blocking
// Queue Algorithm: http://www.cs.rochester.edu/u/michael/PODC96.html
inline void
monitorAtomicAppendAcquire(Thread* t, object monitor)
{
PROTECT(t, monitor);
object node = makeMonitorNode(t, t, 0);
while (true) {
object tail = monitorAcquireTail(t, monitor);
loadMemoryBarrier();
object next = monitorNodeNext(t, tail);
loadMemoryBarrier();
if (tail == monitorAcquireTail(t, monitor)) {
if (next) {
atomicCompareAndSwapObject
(t, monitor, MonitorAcquireTail, tail, next);
} else if (atomicCompareAndSwapObject
(t, tail, MonitorNodeNext, 0, node))
{
atomicCompareAndSwapObject
(t, monitor, MonitorAcquireTail, tail, node);
return;
}
}
}
}
inline Thread*
monitorAtomicPollAcquire(Thread* t, object monitor, bool remove)
{
while (true) {
object head = monitorAcquireHead(t, monitor);
loadMemoryBarrier();
object tail = monitorAcquireTail(t, monitor);
loadMemoryBarrier();
object next = monitorNodeNext(t, head);
loadMemoryBarrier();
if (head == monitorAcquireHead(t, monitor)) {
if (head == tail) {
if (next) {
atomicCompareAndSwapObject
(t, monitor, MonitorAcquireTail, tail, next);
} else {
return 0;
}
} else {
Thread* value = static_cast<Thread*>(monitorNodeValue(t, next));
if ((not remove)
or atomicCompareAndSwapObject
(t, monitor, MonitorAcquireHead, head, next))
{
return value;
}
}
}
}
}
inline void
monitorAcquire(Thread* t, object monitor)
{
if (not monitorTryAcquire(t, monitor)) {
PROTECT(t, monitor);
ACQUIRE(t, t->lock);
monitorAtomicAppendAcquire(t, monitor);
// note that we don't try to acquire the lock until we're first in
// line, both because it's fair and because we don't support
// removing elements from arbitrary positions in the queue
while (not (t == monitorAtomicPollAcquire(t, monitor, false)
and atomicCompareAndSwap
(reinterpret_cast<uintptr_t*>(&monitorOwner(t, monitor)), 0,
reinterpret_cast<uintptr_t>(t))))
{
ENTER(t, Thread::IdleState);
t->lock->wait(t->systemThread, 0);
}
expect(t, t == monitorAtomicPollAcquire(t, monitor, true));
++ monitorDepth(t, monitor);
}
}
inline void
monitorRelease(Thread* t, object monitor)
{
expect(t, monitorOwner(t, monitor) == t);
if (-- monitorDepth(t, monitor) == 0) {
monitorOwner(t, monitor) = 0;
storeLoadMemoryBarrier();
Thread* next = monitorAtomicPollAcquire(t, monitor, false);
if (next) {
ACQUIRE(t, next->lock);
next->lock->notify(t->systemThread);
}
}
}
inline void
monitorAppendWait(Thread* t, object monitor)
{
expect(t, not t->waiting);
expect(t, t->waitNext == 0);
t->waiting = true;
if (monitorWaitTail(t, monitor)) {
static_cast<Thread*>(monitorWaitTail(t, monitor))->waitNext = t;
} else {
monitorWaitHead(t, monitor) = t;
}
monitorWaitTail(t, monitor) = t;
}
inline void
monitorRemoveWait(Thread* t, object monitor)
{
Thread* previous = 0;
for (Thread* current = static_cast<Thread*>(monitorWaitHead(t, monitor));
current;)
{
if (t == current) {
if (current == monitorWaitHead(t, monitor)) {
monitorWaitHead(t, monitor) = t->waitNext;
} else {
previous->waitNext = t->waitNext;
}
if (current == monitorWaitTail(t, monitor)) {
monitorWaitTail(t, monitor) = previous;
}
t->waitNext = 0;
t->waiting = false;
return;
} else {
previous = current;
current = current->waitNext;
}
}
abort(t);
}
inline bool
monitorWait(Thread* t, object monitor, int64_t time)
{
expect(t, monitorOwner(t, monitor) == t);
bool interrupted;
unsigned depth;
bool stillWaiting;
PROTECT(t, monitor);
{ ACQUIRE(t, t->lock);
monitorAppendWait(t, monitor);
depth = monitorDepth(t, monitor);
monitorDepth(t, monitor) = 1;
monitorRelease(t, monitor);
ENTER(t, Thread::IdleState);
interrupted = t->lock->wait(t->systemThread, time);
stillWaiting = t->waiting;
}
monitorAcquire(t, monitor);
if (stillWaiting) {
monitorRemoveWait(t, monitor);
}
monitorOwner(t, monitor) = t;
monitorDepth(t, monitor) = depth;
return interrupted;
}
inline Thread*
monitorPollWait(Thread* t, object monitor)
{
Thread* next = static_cast<Thread*>(monitorWaitHead(t, monitor));
if (next) {
monitorWaitHead(t, monitor) = next->waitNext;
next->waitNext = 0;
if (next == monitorWaitTail(t, monitor)) {
monitorWaitTail(t, monitor) = 0;
}
}
return next;
}
inline bool
monitorNotify(Thread* t, object monitor)
{
expect(t, monitorOwner(t, monitor) == t);
Thread* next = monitorPollWait(t, monitor);
if (next) {
ACQUIRE_RAW(t, next->lock);
next->waiting = false;
next->lock->notify(t->systemThread);
return true;
} else {
return false;
}
}
inline void
monitorNotifyAll(Thread* t, object monitor)
{
while (monitorNotify(t, monitor)) { }
}
object
objectMonitor(Thread* t, object o, bool createNew);
inline void
@ -2307,14 +2593,14 @@ acquire(Thread* t, object o)
hash = objectHash(t, o);
}
System::Monitor* m = objectMonitor(t, o, true);
object m = objectMonitor(t, o, true);
if (DebugMonitors) {
fprintf(stderr, "thread %p acquires %p for %x\n",
t, m, hash);
}
acquire(t, m);
monitorAcquire(t, m);
}
inline void
@ -2325,14 +2611,14 @@ release(Thread* t, object o)
hash = objectHash(t, o);
}
System::Monitor* m = objectMonitor(t, o, false);
object m = objectMonitor(t, o, false);
if (DebugMonitors) {
fprintf(stderr, "thread %p releases %p for %x\n",
t, m, hash);
}
release(t, m);
monitorRelease(t, m);
}
inline void
@ -2343,17 +2629,19 @@ wait(Thread* t, object o, int64_t milliseconds)
hash = objectHash(t, o);
}
System::Monitor* m = objectMonitor(t, o, false);
object m = objectMonitor(t, o, false);
if (DebugMonitors) {
fprintf(stderr, "thread %p waits %d millis on %p for %x\n",
t, static_cast<int>(milliseconds), m, hash);
}
if (m and m->owner() == t->systemThread) {
if (m and monitorOwner(t, m) == t) {
PROTECT(t, m);
bool interrupted;
{ ENTER(t, Thread::IdleState);
interrupted = m->wait(t->systemThread, milliseconds);
interrupted = monitorWait(t, m, milliseconds);
}
if (interrupted) {
@ -2379,15 +2667,15 @@ notify(Thread* t, object o)
hash = objectHash(t, o);
}
System::Monitor* m = objectMonitor(t, o, false);
object m = objectMonitor(t, o, false);
if (DebugMonitors) {
fprintf(stderr, "thread %p notifies on %p for %x\n",
t, m, hash);
}
if (m and m->owner() == t->systemThread) {
m->notify(t->systemThread);
if (m and monitorOwner(t, m) == t) {
monitorNotify(t, m);
} else {
t->exception = makeIllegalMonitorStateException(t);
}
@ -2396,15 +2684,15 @@ notify(Thread* t, object o)
inline void
notifyAll(Thread* t, object o)
{
System::Monitor* m = objectMonitor(t, o, false);
object m = objectMonitor(t, o, false);
if (DebugMonitors) {
fprintf(stderr, "thread %p notifies all on %p for %x\n",
t, m, objectHash(t, o));
}
if (m and m->owner() == t->systemThread) {
m->notifyAll(t->systemThread);
if (m and monitorOwner(t, m) == t) {
monitorNotifyAll(t, m);
} else {
t->exception = makeIllegalMonitorStateException(t);
}

View File

@ -111,6 +111,18 @@
(object first)
(object second))
(type monitor
(void* owner)
(void* waitHead)
(void* waitTail)
(object acquireHead)
(object acquireTail)
(unsigned depth))
(type monitorNode
(void* value)
(object next))
(type continuationContext
(object next)
(object before)