diff --git a/classpath/java/lang/Thread.java b/classpath/java/lang/Thread.java index 6110b4c91d..a8c3d45b93 100644 --- a/classpath/java/lang/Thread.java +++ b/classpath/java/lang/Thread.java @@ -7,14 +7,18 @@ public class Thread implements Runnable { private long peer; private final Runnable task; private Map locals; - private Object sleepLock; private boolean interrupted; + private Object sleepLock; public Thread(Runnable task) { this.task = task; } public synchronized void start() { + if (peer != 0) { + throw new IllegalStateException("thread already started"); + } + Map map = currentThread().locals; if (map != null) { for (Map.Entry e: map.entrySet()) { @@ -45,6 +49,26 @@ public class Thread implements Runnable { public static native Thread currentThread(); + private static native void interrupt(long peer); + + public synchronized void interrupt() { + if (peer != 0) { + interrupt(peer); + } else { + interrupted = true; + } + } + + public static boolean interrupted() { + Thread t = currentThread(); + + synchronized (t) { + boolean v = t.interrupted; + t.interrupted = false; + return v; + } + } + public static void sleep(long milliseconds) throws InterruptedException { Thread t = currentThread(); if (t.sleepLock == null) { diff --git a/src/builtin.cpp b/src/builtin.cpp index 31f7b4e417..69f102738b 100644 --- a/src/builtin.cpp +++ b/src/builtin.cpp @@ -598,51 +598,31 @@ Thread_currentThread(Thread* t, jclass) } void -Thread_start(Thread* t, jobject this_) +Thread_doStart(Thread* t, jobject this_) { - Thread* p = reinterpret_cast(threadPeer(t, *this_)); - if (p) { - object message = makeString(t, "thread already started"); - t->exception = makeIllegalStateException(t, message); - } else { - p = new (t->vm->system->allocate(sizeof(Thread))) Thread(t->vm, *this_, t); + Thread* p = new (t->vm->system->allocate(sizeof(Thread))) + Thread(t->vm, *this_, t); - enter(p, Thread::ActiveState); + enter(p, Thread::ActiveState); - class Runnable: public System::Runnable { - public: - Runnable(System* s, Thread* t): s(s), t(t) { } + threadPeer(t, *this_) = reinterpret_cast(p); - virtual void run(System::Thread* st) { - t->systemThread = st; + if (not t->vm->system->success(t->vm->system->start(&(p->runnable)))) { + threadPeer(t, *this_) = -1; - vm::run(t, "java/lang/Thread", "run", "()V", t->javaThread); + p->exit(); - if (t->exception) { - printTrace(t, t->exception); - } - - t->exit(); - } - - virtual void dispose() { - s->free(this); - } - - System* s; - Thread* t; - }* r = new (t->vm->system->allocate(sizeof(Runnable))) - Runnable(t->vm->system, p); - - if (not t->vm->system->success(t->vm->system->start(r))) { - p->exit(); - - object message = makeString(t, "unable to start native thread"); - t->exception = makeRuntimeException(t, message); - } + object message = makeString(t, "unable to start native thread"); + t->exception = makeRuntimeException(t, message); } } +void +Thread_interrupt(Thread* t, jclass, jlong peer) +{ + interrupt(t, reinterpret_cast(peer)); +} + } // namespace namespace vm { @@ -670,7 +650,9 @@ populateBuiltinMap(Thread* t, object map) reinterpret_cast(::Runtime_exit) }, { "Java_java_lang_Thread_doStart", - reinterpret_cast(::Thread_start) }, + reinterpret_cast(::Thread_doStart) }, + { "Java_java_lang_Thread_interrupt", + reinterpret_cast(::Thread_interrupt) }, { "Java_java_lang_Thread_currentThread", reinterpret_cast(::Thread_currentThread) }, diff --git a/src/machine.cpp b/src/machine.cpp index c4c061bf0d..8e48e72dbb 100644 --- a/src/machine.cpp +++ b/src/machine.cpp @@ -1322,7 +1322,8 @@ Thread::Thread(Machine* m, object javaThread, Thread* parent): sp(0), frame(-1), heapIndex(0), - protector(0) + protector(0), + runnable(this) #ifdef VM_STRESS , stress(false), heap(static_cast(m->system->allocate(HeapSizeInBytes))) @@ -1335,7 +1336,7 @@ Thread::Thread(Machine* m, object javaThread, Thread* parent): m->rootThread = this; m->unsafe = true; - if (not m->system->success(m->system->attach(&systemThread))) { + if (not m->system->success(m->system->attach(&runnable))) { abort(this); } @@ -1472,7 +1473,7 @@ enter(Thread* t, Thread::State s) t->vm->exclusive = t; while (t->vm->activeCount > 1) { - t->vm->stateLock->wait(t, 0); + t->vm->stateLock->wait(t->systemThread, 0); } } break; @@ -1498,7 +1499,7 @@ enter(Thread* t, Thread::State s) } t->state = s; - t->vm->stateLock->notifyAll(t); + t->vm->stateLock->notifyAll(t->systemThread); } break; case Thread::ActiveState: { @@ -1509,13 +1510,13 @@ enter(Thread* t, Thread::State s) t->state = s; t->vm->exclusive = 0; - t->vm->stateLock->notifyAll(t); + t->vm->stateLock->notifyAll(t->systemThread); } break; case Thread::NoState: case Thread::IdleState: { while (t->vm->exclusive) { - t->vm->stateLock->wait(t, 0); + t->vm->stateLock->wait(t->systemThread, 0); } ++ t->vm->activeCount; @@ -1547,7 +1548,7 @@ enter(Thread* t, Thread::State s) t->state = s; while (t->vm->liveCount > 1) { - t->vm->stateLock->wait(t, 0); + t->vm->stateLock->wait(t->systemThread, 0); } } break; diff --git a/src/machine.h b/src/machine.h index 015097c913..961d861c1a 100644 --- a/src/machine.h +++ b/src/machine.h @@ -22,7 +22,7 @@ namespace vm { const bool Verbose = false; const bool DebugRun = false; const bool DebugStack = false; -const bool DebugMonitors = true; +const bool DebugMonitors = false; const uintptr_t HashTakenMark = 1; const uintptr_t ExtendedMark = 2; @@ -1124,6 +1124,16 @@ class Machine { JNIEnvVTable jniEnvVTable; }; +object +run(Thread* t, const char* className, const char* methodName, + const char* methodSpec, object this_, ...); + +void +printTrace(Thread* t, object exception); + +uint8_t& +threadInterrupted(Thread* t, object thread); + class Thread { public: enum State { @@ -1151,6 +1161,35 @@ class Thread { Protector* next; }; + class Runnable: public System::Runnable { + public: + Runnable(Thread* t): t(t) { } + + virtual void attach(System::Thread* st) { + t->systemThread = st; + } + + virtual void run() { + vm::run(t, "java/lang/Thread", "run", "()V", t->javaThread); + + if (t->exception) { + printTrace(t, t->exception); + } + + t->exit(); + } + + virtual bool interrupted() { + return threadInterrupted(t, t->javaThread); + } + + virtual void setInterrupted(bool v) { + threadInterrupted(t, t->javaThread) = v; + } + + Thread* t; + }; + static const unsigned HeapSizeInBytes = 64 * 1024; static const unsigned StackSizeInBytes = 64 * 1024; @@ -1178,6 +1217,7 @@ class Thread { int frame; unsigned heapIndex; Protector* protector; + Runnable runnable; #ifdef VM_STRESS bool stress; object* heap; @@ -1248,13 +1288,13 @@ class MonitorResource { MonitorResource(Thread* t, System::Monitor* m): t(t), m(m) { stress(t); - if (not m->tryAcquire(t)) { + if (not m->tryAcquire(t->systemThread)) { ENTER(t, Thread::IdleState); - m->acquire(t); + m->acquire(t->systemThread); } } - ~MonitorResource() { m->release(t); } + ~MonitorResource() { m->release(t->systemThread); } private: Thread* t; @@ -1264,10 +1304,10 @@ class MonitorResource { class RawMonitorResource { public: RawMonitorResource(Thread* t, System::Monitor* m): t(t), m(m) { - m->acquire(t); + m->acquire(t->systemThread); } - ~RawMonitorResource() { m->release(t); } + ~RawMonitorResource() { m->release(t->systemThread); } private: Thread* t; @@ -1440,6 +1480,12 @@ makeInvocationTargetException(Thread* t, object targetException) return makeRuntimeException(t, 0, trace, targetException); } +inline object +makeInterruptedException(Thread* t) +{ + return makeInterruptedException(t, 0, makeTrace(t), 0); +} + inline object makeStackOverflowError(Thread* t) { @@ -1986,9 +2032,9 @@ acquire(Thread* t, object o) t, m, objectHash(t, o)); } - if (not m->tryAcquire(t)) { + if (not m->tryAcquire(t->systemThread)) { ENTER(t, Thread::IdleState); - m->acquire(t); + m->acquire(t->systemThread); } stress(t); @@ -2004,7 +2050,7 @@ release(Thread* t, object o) t, m, objectHash(t, o)); } - m->release(t); + m->release(t->systemThread); } inline void @@ -2017,9 +2063,13 @@ wait(Thread* t, object o, int64_t milliseconds) t, milliseconds, m, objectHash(t, o)); } - if (m->owner() == t) { + if (m->owner() == t->systemThread) { ENTER(t, Thread::IdleState); - m->wait(t, milliseconds); + + bool interrupted = m->wait(t->systemThread, milliseconds); + if (interrupted) { + t->exception = makeInterruptedException(t); + } } else { t->exception = makeIllegalMonitorStateException(t); } @@ -2032,12 +2082,6 @@ wait(Thread* t, object o, int64_t milliseconds) stress(t); } -inline void -vmWait(Thread* t, object o, int64_t milliseconds) -{ - wait(t, o, milliseconds); -} - inline void notify(Thread* t, object o) { @@ -2048,19 +2092,13 @@ notify(Thread* t, object o) t, m, objectHash(t, o)); } - if (m->owner() == t) { - m->notify(t); + if (m->owner() == t->systemThread) { + m->notify(t->systemThread); } else { t->exception = makeIllegalMonitorStateException(t); } } -inline void -vmNotify(Thread* t, object o) -{ - notify(t, o); -} - inline void notifyAll(Thread* t, object o) { @@ -2071,22 +2109,19 @@ notifyAll(Thread* t, object o) t, m, objectHash(t, o)); } - if (m->owner() == t) { - m->notifyAll(t); + if (m->owner() == t->systemThread) { + m->notifyAll(t->systemThread); } else { t->exception = makeIllegalMonitorStateException(t); } } inline void -vmNotifyAll(Thread* t, object o) +interrupt(Thread*, Thread* target) { - notifyAll(t, o); + target->systemThread->interrupt(); } -void -printTrace(Thread* t, object exception); - void exit(Thread* t); diff --git a/src/system.cpp b/src/system.cpp index 9353515c9d..5fb504b14d 100644 --- a/src/system.cpp +++ b/src/system.cpp @@ -7,10 +7,14 @@ #include "dlfcn.h" #include "errno.h" #include "pthread.h" +#include "signal.h" #include "stdint.h" #include "system.h" +#define ACQUIRE(x) MutexResource MAKE_NAME(mutexResource_) (x) + + #ifdef __i386__ extern "C" uint64_t @@ -86,6 +90,28 @@ using namespace vm; namespace { +class MutexResource { + public: + MutexResource(pthread_mutex_t& m): m(&m) { + pthread_mutex_lock(&m); + } + + ~MutexResource() { + pthread_mutex_unlock(m); + } + + private: + pthread_mutex_t* m; +}; + +const int InterruptSignal = SIGUSR2; + +void +handleSignal(int) +{ + // ignore +} + int64_t now() { @@ -96,22 +122,39 @@ now() } void* -run(void* t) +run(void* r) { - static_cast(t)->run(); + static_cast(r)->run(); return 0; } const bool Verbose = false; +const unsigned Waiting = 1 << 0; +const unsigned Notified = 1 << 1; + class MySystem: public System { public: class Thread: public System::Thread { public: - Thread(System* s, System::Runnable* r): s(s), r(r) { } + Thread(System* s, System::Runnable* r): + s(s), + r(r), + next(0), + flags(0) + { + pthread_mutex_init(&mutex, 0); + pthread_cond_init(&condition, 0); + } - virtual void run() { - r->run(this); + virtual void interrupt() { + ACQUIRE(mutex); + + r->setInterrupted(true); + + if (flags & Waiting) { + pthread_kill(thread, InterruptSignal); + } } virtual void join() { @@ -120,26 +163,28 @@ class MySystem: public System { } virtual void dispose() { - if (r) { - r->dispose(); - } s->free(this); } + pthread_t thread; + pthread_mutex_t mutex; + pthread_cond_t condition; System* s; System::Runnable* r; - pthread_t thread; + Thread* next; + unsigned flags; }; class Monitor: public System::Monitor { public: - Monitor(System* s): s(s), context(0), depth(0) { - pthread_mutex_init(&mutex, 0); - pthread_cond_init(&condition, 0); + Monitor(System* s): s(s), owner_(0), first(0), last(0), depth(0) { + pthread_mutex_init(&mutex, 0); } - virtual bool tryAcquire(void* context) { - if (this->context == context) { + virtual bool tryAcquire(System::Thread* context) { + Thread* t = static_cast(context); + + if (owner_ == t) { ++ depth; return true; } else { @@ -148,7 +193,7 @@ class MySystem: public System { return false; case 0: - this->context = context; + owner_ = t; ++ depth; return true; @@ -158,18 +203,22 @@ class MySystem: public System { } } - virtual void acquire(void* context) { - if (this->context != context) { + virtual void acquire(System::Thread* context) { + Thread* t = static_cast(context); + + if (owner_ != t) { pthread_mutex_lock(&mutex); - this->context = context; + owner_ = t; } ++ depth; } - virtual void release(void* context) { - if (this->context == context) { + virtual void release(System::Thread* context) { + Thread* t = static_cast(context); + + if (owner_ == t) { if (-- depth == 0) { - this->context = 0; + owner_ = 0; pthread_mutex_unlock(&mutex); } } else { @@ -177,60 +226,139 @@ class MySystem: public System { } } - virtual void wait(void* context, int64_t time) { - if (this->context == context) { + void append(Thread* t) { + if (last) { + last->next = t; + } else { + first = last = t; + } + } + + void remove(Thread* t) { + for (Thread** p = &first; *p;) { + if (t == *p) { + *p = t->next; + if (last == t) { + last = 0; + } + break; + } else { + p = &((*p)->next); + } + } + } + + virtual bool wait(System::Thread* context, int64_t time) { + Thread* t = static_cast(context); + + if (owner_ == t) { + ACQUIRE(t->mutex); + + if (t->r->interrupted()) { + t->r->setInterrupted(false); + return true; + } + + t->flags |= Waiting; + + append(t); + unsigned depth = this->depth; this->depth = 0; - this->context = 0; + owner_ = 0; + pthread_mutex_unlock(&mutex); + if (time) { int64_t then = now() + time; timespec ts = { then / 1000, (then % 1000) * 1000 * 1000 }; - int rv = pthread_cond_timedwait(&condition, &mutex, &ts); - assert(s, rv == 0 or rv == ETIMEDOUT); + int rv = pthread_cond_timedwait + (&(t->condition), &(t->mutex), &ts); + assert(s, rv == 0 or rv == ETIMEDOUT or rv == EINTR); } else { - int rv = pthread_cond_wait(&condition, &mutex); - assert(s, rv == 0); + int rv = pthread_cond_wait(&(t->condition), &(t->mutex)); + assert(s, rv == 0 or rv == EINTR); } - this->context = context; + + pthread_mutex_lock(&mutex); + owner_ = t; this->depth = depth; + + if ((t->flags & Notified) == 0) { + remove(t); + } + + t->flags = 0; + + if (t->r->interrupted()) { + t->r->setInterrupted(false); + return true; + } else { + return false; + } } else { sysAbort(s); } } - virtual void notify(void* context) { - if (this->context == context) { - int rv = pthread_cond_signal(&condition); - assert(s, rv == 0); + void doNotify(Thread* t) { + ACQUIRE(t->mutex); + + t->flags |= Notified; + int rv = pthread_cond_signal(&(t->condition)); + assert(s, rv == 0); + } + + virtual void notify(System::Thread* context) { + Thread* t = static_cast(context); + + if (owner_ == t) { + if (first) { + Thread* t = first; + first = first->next; + if (t == last) { + last = 0; + } + + doNotify(t); + } } else { sysAbort(s); } } - virtual void notifyAll(void* context) { - if (this->context == context) { - int rv = pthread_cond_broadcast(&condition); - assert(s, rv == 0); + virtual void notifyAll(System::Thread* context) { + Thread* t = static_cast(context); + + if (owner_ == t) { + for (Thread** p = &first; *p;) { + Thread* t = *p; + p = &(t->next); + if (t == last) { + last = 0; + } + + doNotify(t); + } } else { sysAbort(s); } } - virtual void* owner() { - return context; + virtual System::Thread* owner() { + return owner_; } virtual void dispose() { - assert(s, context == 0); + assert(s, owner_ == 0); pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&condition); s->free(this); } System* s; pthread_mutex_t mutex; - pthread_cond_t condition; - void* context; + Thread* owner_; + Thread* first; + Thread* last; unsigned depth; }; @@ -278,6 +406,14 @@ class MySystem: public System { MySystem(unsigned limit): limit(limit), count(0) { pthread_mutex_init(&mutex, 0); + + struct sigaction sa; + memset(&sa, 0, sizeof(struct sigaction)); + sigemptyset(&(sa.sa_mask)); + sa.sa_handler = handleSignal; + + int rv = sigaction(InterruptSignal, &sa, 0); + assert(this, rv == 0); } virtual bool success(Status s) { @@ -332,16 +468,17 @@ class MySystem: public System { pthread_mutex_unlock(&mutex); } - virtual Status attach(System::Thread** tp) { - Thread* t = new (System::allocate(sizeof(Thread))) Thread(this, 0); + virtual Status attach(Runnable* r) { + Thread* t = new (System::allocate(sizeof(Thread))) Thread(this, r); t->thread = pthread_self(); - *tp = t; + r->attach(t); return 0; } virtual Status start(Runnable* r) { Thread* t = new (System::allocate(sizeof(Thread))) Thread(this, r); - int rv = pthread_create(&(t->thread), 0, run, t); + r->attach(t); + int rv = pthread_create(&(t->thread), 0, run, r); assert(this, rv == 0); return 0; } diff --git a/src/system.h b/src/system.h index fcb6956565..de8afadf04 100644 --- a/src/system.h +++ b/src/system.h @@ -20,7 +20,7 @@ class System: public Allocator { class Thread { public: virtual ~Thread() { } - virtual void run() = 0; + virtual void interrupt() = 0; virtual void join() = 0; virtual void dispose() = 0; }; @@ -28,20 +28,22 @@ class System: public Allocator { class Runnable { public: virtual ~Runnable() { } - virtual void run(Thread*) = 0; - virtual void dispose() = 0; + virtual void attach(Thread*) = 0; + virtual void run() = 0; + virtual bool interrupted() = 0; + virtual void setInterrupted(bool v) = 0; }; class Monitor { public: virtual ~Monitor() { } - virtual bool tryAcquire(void* context) = 0; - virtual void acquire(void* context) = 0; - virtual void release(void* context) = 0; - virtual void wait(void* context, int64_t time) = 0; - virtual void notify(void* context) = 0; - virtual void notifyAll(void* context) = 0; - virtual void* owner() = 0; + virtual bool tryAcquire(Thread* context) = 0; + virtual void acquire(Thread* context) = 0; + virtual void release(Thread* context) = 0; + virtual bool wait(Thread* context, int64_t time) = 0; + virtual void notify(Thread* context) = 0; + virtual void notifyAll(Thread* context) = 0; + virtual Thread* owner() = 0; virtual void dispose() = 0; }; @@ -57,7 +59,7 @@ class System: public Allocator { virtual ~System() { } virtual bool success(Status) = 0; - virtual Status attach(Thread**) = 0; + virtual Status attach(Runnable*) = 0; virtual Status start(Runnable*) = 0; virtual Status make(Monitor**) = 0; virtual uint64_t call(void* function, uintptr_t* arguments, uint8_t* types, @@ -97,16 +99,20 @@ expect(System* s, bool v) } #ifdef NDEBUG + inline void assert(System*, bool) { } -#else + +#else // not NDEBUG + inline void assert(System* s, bool v) { expect(s, v); } -#endif + +#endif // not NDEBUG System* makeSystem(unsigned heapSize); diff --git a/src/types.def b/src/types.def index f5ff26d9f0..dee9bccdc9 100644 --- a/src/types.def +++ b/src/types.def @@ -128,8 +128,8 @@ (int64_t peer) (object task) (object locals) - (object sleepLock) - (uint8_t interrupted)) + (uint8_t interrupted) + (object sleepLock)) (type stackTraceElement java/lang/StackTraceElement (extends jobject) @@ -181,6 +181,9 @@ (type invocationTargetException java/lang/InvocationTargetException (extends exception)) +(type interruptedException java/lang/InterruptedException + (extends exception)) + (type error java/lang/Error (extends throwable))