From 8120bee4dc5f9ae2dec75a907778f1479ad398bd Mon Sep 17 00:00:00 2001 From: Joel Dice Date: Mon, 1 Feb 2010 17:56:09 -0700 Subject: [PATCH] reimplement Java object monitors to avoid running out of OS handles Due to SWT's nasty habit of creating a new object monitor for every task added to Display.asyncExec, we've found that, on Windows at least, we tend to run out of OS handles due to the large number of mutexes we create between garbage collections. One way to address this might be to trigger a GC when either the number of monitors created since the last GC exceeds a certain number or when the total number of monitors in the VM reaches a certain number. Both of these risk hurting performance, especially if they force major collections which would otherwise be infrequent. Also, it's hard to know what the values of such thresholds should be on a given system. Instead, we reimplement Java monitors using atomic compare-and-swap (CAS) and thread-specific native locks for blocking in the case of contention. This way, we can create an arbitrary number of monitors without creating any new native locks. The total number of native locks needed by the VM is bounded instead by the number of live threads plus a small constant. Note that if we ever add support for an architecture which does not support CAS, we'll need to provide a fallback monitor implementation. --- src/machine.cpp | 51 ++++---- src/machine.h | 316 +++++++++++++++++++++++++++++++++++++++++++++--- src/types.def | 12 ++ 3 files changed, 338 insertions(+), 41 deletions(-) diff --git a/src/machine.cpp b/src/machine.cpp index 8507e9c256..76953e480b 100644 --- a/src/machine.cpp +++ b/src/machine.cpp @@ -1808,17 +1808,13 @@ removeMonitor(Thread* t, object o) hash = objectHash(t, o); } - object p = hashMapRemove(t, t->m->monitorMap, o, objectHash, objectEqual); + object m = hashMapRemove(t, t->m->monitorMap, o, objectHash, objectEqual); - expect(t, p); + expect(t, m); if (DebugMonitors) { - fprintf(stderr, "dispose monitor %p for object %x\n", - static_cast(pointerValue(t, p)), - hash); + fprintf(stderr, "dispose monitor %p for object %x\n", m, hash); } - - static_cast(pointerValue(t, p))->dispose(); } void @@ -2185,9 +2181,11 @@ Thread::Thread(Machine* m, object javaThread, Thread* parent): parent(parent), peer((parent ? parent->child : 0)), child(0), + waitNext(0), state(NoState), criticalLevel(0), systemThread(0), + lock(0), javaThread(javaThread), exception(0), heapIndex(0), @@ -2201,6 +2199,7 @@ Thread::Thread(Machine* m, object javaThread, Thread* parent): backupHeap(0), backupHeapIndex(0), backupHeapSizeInWords(0), + waiting(false), tracing(false) #ifdef VM_STRESS , stress(false) @@ -2255,6 +2254,8 @@ Thread::init() parent->child = this; } + expect(this, m->system->success(m->system->make(&lock))); + if (javaThread == 0) { this->javaThread = makeJavaThread(this, parent); } @@ -2282,6 +2283,8 @@ Thread::exit() void Thread::dispose() { + lock->dispose(); + if (systemThread) { systemThread->dispose(); } @@ -3419,49 +3422,43 @@ addFinalizer(Thread* t, object target, void (*finalize)(Thread*, object)) t->m->finalizers = f; } -System::Monitor* +object objectMonitor(Thread* t, object o, bool createNew) { assert(t, t->state == Thread::ActiveState); - object p = hashMapFind(t, t->m->monitorMap, o, objectHash, objectEqual); + object m = hashMapFind(t, t->m->monitorMap, o, objectHash, objectEqual); - if (p) { + if (m) { if (DebugMonitors) { - fprintf(stderr, "found monitor %p for object %x\n", - static_cast(pointerValue(t, p)), - objectHash(t, o)); + fprintf(stderr, "found monitor %p for object %x\n", m, objectHash(t, o)); } - return static_cast(pointerValue(t, p)); + return m; } else if (createNew) { PROTECT(t, o); ENTER(t, Thread::ExclusiveState); - p = hashMapFind(t, t->m->monitorMap, o, objectHash, objectEqual); - if (p) { + m = hashMapFind(t, t->m->monitorMap, o, objectHash, objectEqual); + if (m) { if (DebugMonitors) { fprintf(stderr, "found monitor %p for object %x\n", - static_cast(pointerValue(t, p)), - objectHash(t, o)); + m, objectHash(t, o)); } - return static_cast(pointerValue(t, p)); + return m; } - System::Monitor* m; - System::Status s = t->m->system->make(&m); - expect(t, t->m->system->success(s)); + object head = makeMonitorNode(t, 0, 0); + object m = makeMonitor(t, 0, 0, 0, head, head, 0); + PROTECT(t, m); if (DebugMonitors) { - fprintf(stderr, "made monitor %p for object %x\n", - m, - objectHash(t, o)); + fprintf(stderr, "made monitor %p for object %x\n", m, objectHash(t, o)); } - p = makePointer(t, m); - hashMapInsert(t, t->m->monitorMap, o, p, objectHash); + hashMapInsert(t, t->m->monitorMap, o, m, objectHash); addFinalizer(t, o, removeMonitor); diff --git a/src/machine.h b/src/machine.h index fa8d99936d..24fe9ab70b 100644 --- a/src/machine.h +++ b/src/machine.h @@ -17,6 +17,7 @@ #include "finder.h" #include "processor.h" #include "constants.h" +#include "arch.h" #ifdef PLATFORM_WINDOWS # define JNICALL __stdcall @@ -1212,6 +1213,7 @@ class Machine { JNIEnvVTable jniEnvVTable; uintptr_t* heapPool[ThreadHeapPoolSize]; unsigned heapPoolIndex; + unsigned recentMonitorCount; }; void @@ -1345,9 +1347,11 @@ class Thread { Thread* parent; Thread* peer; Thread* child; + Thread* waitNext; State state; unsigned criticalLevel; System::Thread* systemThread; + System::Monitor* lock; object javaThread; object exception; unsigned heapIndex; @@ -1360,6 +1364,7 @@ class Thread { uintptr_t* backupHeap; unsigned backupHeapIndex; unsigned backupHeapSizeInWords; + bool waiting; bool tracing; #ifdef VM_STRESS bool stress; @@ -2296,7 +2301,288 @@ parameterFootprint(Thread* t, const char* s, bool static_); void addFinalizer(Thread* t, object target, void (*finalize)(Thread*, object)); -System::Monitor* +inline bool +monitorTryAcquire(Thread* t, object monitor) +{ + if (monitorOwner(t, monitor) == t + or atomicCompareAndSwap + (reinterpret_cast(&monitorOwner(t, monitor)), 0, + reinterpret_cast(t))) + { + ++ monitorDepth(t, monitor); + return true; + } else { + return false; + } +} + +inline bool +atomicCompareAndSwapObject(Thread* t, object target, unsigned offset, + object old, object new_) +{ + if (atomicCompareAndSwap(&cast(target, offset), + reinterpret_cast(old), + reinterpret_cast(new_))) + { + mark(t, target, offset); + return true; + } else { + return false; + } +} + +// The following two methods (monitorAtomicAppendAcquire and +// monitorAtomicPollAcquire) use the Michael and Scott Non-Blocking +// Queue Algorithm: http://www.cs.rochester.edu/u/michael/PODC96.html + +inline void +monitorAtomicAppendAcquire(Thread* t, object monitor) +{ + PROTECT(t, monitor); + + object node = makeMonitorNode(t, t, 0); + + while (true) { + object tail = monitorAcquireTail(t, monitor); + + loadMemoryBarrier(); + + object next = monitorNodeNext(t, tail); + + loadMemoryBarrier(); + + if (tail == monitorAcquireTail(t, monitor)) { + if (next) { + atomicCompareAndSwapObject + (t, monitor, MonitorAcquireTail, tail, next); + } else if (atomicCompareAndSwapObject + (t, tail, MonitorNodeNext, 0, node)) + { + atomicCompareAndSwapObject + (t, monitor, MonitorAcquireTail, tail, node); + return; + } + } + } +} + +inline Thread* +monitorAtomicPollAcquire(Thread* t, object monitor, bool remove) +{ + while (true) { + object head = monitorAcquireHead(t, monitor); + + loadMemoryBarrier(); + + object tail = monitorAcquireTail(t, monitor); + + loadMemoryBarrier(); + + object next = monitorNodeNext(t, head); + + loadMemoryBarrier(); + + if (head == monitorAcquireHead(t, monitor)) { + if (head == tail) { + if (next) { + atomicCompareAndSwapObject + (t, monitor, MonitorAcquireTail, tail, next); + } else { + return 0; + } + } else { + Thread* value = static_cast(monitorNodeValue(t, next)); + if ((not remove) + or atomicCompareAndSwapObject + (t, monitor, MonitorAcquireHead, head, next)) + { + return value; + } + } + } + } +} + +inline void +monitorAcquire(Thread* t, object monitor) +{ + if (not monitorTryAcquire(t, monitor)) { + PROTECT(t, monitor); + + ACQUIRE(t, t->lock); + + monitorAtomicAppendAcquire(t, monitor); + + // note that we don't try to acquire the lock until we're first in + // line, both because it's fair and because we don't support + // removing elements from arbitrary positions in the queue + + while (not (t == monitorAtomicPollAcquire(t, monitor, false) + and atomicCompareAndSwap + (reinterpret_cast(&monitorOwner(t, monitor)), 0, + reinterpret_cast(t)))) + { + ENTER(t, Thread::IdleState); + + t->lock->wait(t->systemThread, 0); + } + + expect(t, t == monitorAtomicPollAcquire(t, monitor, true)); + + ++ monitorDepth(t, monitor); + } +} + +inline void +monitorRelease(Thread* t, object monitor) +{ + expect(t, monitorOwner(t, monitor) == t); + + if (-- monitorDepth(t, monitor) == 0) { + monitorOwner(t, monitor) = 0; + + storeLoadMemoryBarrier(); + + Thread* next = monitorAtomicPollAcquire(t, monitor, false); + + if (next) { + ACQUIRE(t, next->lock); + + next->lock->notify(t->systemThread); + } + } +} + +inline void +monitorAppendWait(Thread* t, object monitor) +{ + expect(t, not t->waiting); + expect(t, t->waitNext == 0); + + t->waiting = true; + + if (monitorWaitTail(t, monitor)) { + static_cast(monitorWaitTail(t, monitor))->waitNext = t; + } else { + monitorWaitHead(t, monitor) = t; + } + + monitorWaitTail(t, monitor) = t; +} + +inline void +monitorRemoveWait(Thread* t, object monitor) +{ + Thread* previous = 0; + for (Thread* current = static_cast(monitorWaitHead(t, monitor)); + current;) + { + if (t == current) { + if (current == monitorWaitHead(t, monitor)) { + monitorWaitHead(t, monitor) = t->waitNext; + } else { + previous->waitNext = t->waitNext; + } + + if (current == monitorWaitTail(t, monitor)) { + monitorWaitTail(t, monitor) = previous; + } + + t->waitNext = 0; + t->waiting = false; + + return; + } else { + previous = current; + current = current->waitNext; + } + } + + abort(t); +} + +inline bool +monitorWait(Thread* t, object monitor, int64_t time) +{ + expect(t, monitorOwner(t, monitor) == t); + + bool interrupted; + unsigned depth; + bool stillWaiting; + + PROTECT(t, monitor); + + { ACQUIRE(t, t->lock); + + monitorAppendWait(t, monitor); + + depth = monitorDepth(t, monitor); + monitorDepth(t, monitor) = 1; + + monitorRelease(t, monitor); + + ENTER(t, Thread::IdleState); + + interrupted = t->lock->wait(t->systemThread, time); + + stillWaiting = t->waiting; + } + + monitorAcquire(t, monitor); + + if (stillWaiting) { + monitorRemoveWait(t, monitor); + } + + monitorOwner(t, monitor) = t; + monitorDepth(t, monitor) = depth; + + return interrupted; +} + +inline Thread* +monitorPollWait(Thread* t, object monitor) +{ + Thread* next = static_cast(monitorWaitHead(t, monitor)); + + if (next) { + monitorWaitHead(t, monitor) = next->waitNext; + next->waitNext = 0; + if (next == monitorWaitTail(t, monitor)) { + monitorWaitTail(t, monitor) = 0; + } + } + + return next; +} + +inline bool +monitorNotify(Thread* t, object monitor) +{ + expect(t, monitorOwner(t, monitor) == t); + + Thread* next = monitorPollWait(t, monitor); + + if (next) { + ACQUIRE_RAW(t, next->lock); + + next->waiting = false; + + next->lock->notify(t->systemThread); + + return true; + } else { + return false; + } +} + +inline void +monitorNotifyAll(Thread* t, object monitor) +{ + while (monitorNotify(t, monitor)) { } +} + +object objectMonitor(Thread* t, object o, bool createNew); inline void @@ -2307,14 +2593,14 @@ acquire(Thread* t, object o) hash = objectHash(t, o); } - System::Monitor* m = objectMonitor(t, o, true); + object m = objectMonitor(t, o, true); if (DebugMonitors) { fprintf(stderr, "thread %p acquires %p for %x\n", t, m, hash); } - acquire(t, m); + monitorAcquire(t, m); } inline void @@ -2325,14 +2611,14 @@ release(Thread* t, object o) hash = objectHash(t, o); } - System::Monitor* m = objectMonitor(t, o, false); + object m = objectMonitor(t, o, false); if (DebugMonitors) { fprintf(stderr, "thread %p releases %p for %x\n", t, m, hash); } - release(t, m); + monitorRelease(t, m); } inline void @@ -2343,17 +2629,19 @@ wait(Thread* t, object o, int64_t milliseconds) hash = objectHash(t, o); } - System::Monitor* m = objectMonitor(t, o, false); + object m = objectMonitor(t, o, false); if (DebugMonitors) { fprintf(stderr, "thread %p waits %d millis on %p for %x\n", t, static_cast(milliseconds), m, hash); } - if (m and m->owner() == t->systemThread) { + if (m and monitorOwner(t, m) == t) { + PROTECT(t, m); + bool interrupted; { ENTER(t, Thread::IdleState); - interrupted = m->wait(t->systemThread, milliseconds); + interrupted = monitorWait(t, m, milliseconds); } if (interrupted) { @@ -2379,15 +2667,15 @@ notify(Thread* t, object o) hash = objectHash(t, o); } - System::Monitor* m = objectMonitor(t, o, false); + object m = objectMonitor(t, o, false); if (DebugMonitors) { fprintf(stderr, "thread %p notifies on %p for %x\n", t, m, hash); } - if (m and m->owner() == t->systemThread) { - m->notify(t->systemThread); + if (m and monitorOwner(t, m) == t) { + monitorNotify(t, m); } else { t->exception = makeIllegalMonitorStateException(t); } @@ -2396,15 +2684,15 @@ notify(Thread* t, object o) inline void notifyAll(Thread* t, object o) { - System::Monitor* m = objectMonitor(t, o, false); + object m = objectMonitor(t, o, false); if (DebugMonitors) { fprintf(stderr, "thread %p notifies all on %p for %x\n", t, m, objectHash(t, o)); } - if (m and m->owner() == t->systemThread) { - m->notifyAll(t->systemThread); + if (m and monitorOwner(t, m) == t) { + monitorNotifyAll(t, m); } else { t->exception = makeIllegalMonitorStateException(t); } diff --git a/src/types.def b/src/types.def index 9a704c3b91..184df800a0 100644 --- a/src/types.def +++ b/src/types.def @@ -111,6 +111,18 @@ (object first) (object second)) +(type monitor + (void* owner) + (void* waitHead) + (void* waitTail) + (object acquireHead) + (object acquireTail) + (unsigned depth)) + +(type monitorNode + (void* value) + (object next)) + (type continuationContext (object next) (object before)