implement Thread.interrupt()

This commit is contained in:
Joel Dice 2007-07-28 15:28:25 -06:00
parent abd9c2bc8d
commit 0e373727a2
7 changed files with 327 additions and 139 deletions

View File

@ -7,14 +7,18 @@ public class Thread implements Runnable {
private long peer;
private final Runnable task;
private Map<ThreadLocal, Object> locals;
private Object sleepLock;
private boolean interrupted;
private Object sleepLock;
public Thread(Runnable task) {
this.task = task;
}
public synchronized void start() {
if (peer != 0) {
throw new IllegalStateException("thread already started");
}
Map<ThreadLocal, Object> map = currentThread().locals;
if (map != null) {
for (Map.Entry<ThreadLocal, Object> e: map.entrySet()) {
@ -45,6 +49,26 @@ public class Thread implements Runnable {
public static native Thread currentThread();
private static native void interrupt(long peer);
public synchronized void interrupt() {
if (peer != 0) {
interrupt(peer);
} else {
interrupted = true;
}
}
public static boolean interrupted() {
Thread t = currentThread();
synchronized (t) {
boolean v = t.interrupted;
t.interrupted = false;
return v;
}
}
public static void sleep(long milliseconds) throws InterruptedException {
Thread t = currentThread();
if (t.sleepLock == null) {

View File

@ -598,51 +598,31 @@ Thread_currentThread(Thread* t, jclass)
}
void
Thread_start(Thread* t, jobject this_)
Thread_doStart(Thread* t, jobject this_)
{
Thread* p = reinterpret_cast<Thread*>(threadPeer(t, *this_));
if (p) {
object message = makeString(t, "thread already started");
t->exception = makeIllegalStateException(t, message);
} else {
p = new (t->vm->system->allocate(sizeof(Thread))) Thread(t->vm, *this_, t);
Thread* p = new (t->vm->system->allocate(sizeof(Thread)))
Thread(t->vm, *this_, t);
enter(p, Thread::ActiveState);
enter(p, Thread::ActiveState);
class Runnable: public System::Runnable {
public:
Runnable(System* s, Thread* t): s(s), t(t) { }
threadPeer(t, *this_) = reinterpret_cast<jlong>(p);
virtual void run(System::Thread* st) {
t->systemThread = st;
if (not t->vm->system->success(t->vm->system->start(&(p->runnable)))) {
threadPeer(t, *this_) = -1;
vm::run(t, "java/lang/Thread", "run", "()V", t->javaThread);
p->exit();
if (t->exception) {
printTrace(t, t->exception);
}
t->exit();
}
virtual void dispose() {
s->free(this);
}
System* s;
Thread* t;
}* r = new (t->vm->system->allocate(sizeof(Runnable)))
Runnable(t->vm->system, p);
if (not t->vm->system->success(t->vm->system->start(r))) {
p->exit();
object message = makeString(t, "unable to start native thread");
t->exception = makeRuntimeException(t, message);
}
object message = makeString(t, "unable to start native thread");
t->exception = makeRuntimeException(t, message);
}
}
void
Thread_interrupt(Thread* t, jclass, jlong peer)
{
interrupt(t, reinterpret_cast<Thread*>(peer));
}
} // namespace
namespace vm {
@ -670,7 +650,9 @@ populateBuiltinMap(Thread* t, object map)
reinterpret_cast<void*>(::Runtime_exit) },
{ "Java_java_lang_Thread_doStart",
reinterpret_cast<void*>(::Thread_start) },
reinterpret_cast<void*>(::Thread_doStart) },
{ "Java_java_lang_Thread_interrupt",
reinterpret_cast<void*>(::Thread_interrupt) },
{ "Java_java_lang_Thread_currentThread",
reinterpret_cast<void*>(::Thread_currentThread) },

View File

@ -1322,7 +1322,8 @@ Thread::Thread(Machine* m, object javaThread, Thread* parent):
sp(0),
frame(-1),
heapIndex(0),
protector(0)
protector(0),
runnable(this)
#ifdef VM_STRESS
, stress(false),
heap(static_cast<object*>(m->system->allocate(HeapSizeInBytes)))
@ -1335,7 +1336,7 @@ Thread::Thread(Machine* m, object javaThread, Thread* parent):
m->rootThread = this;
m->unsafe = true;
if (not m->system->success(m->system->attach(&systemThread))) {
if (not m->system->success(m->system->attach(&runnable))) {
abort(this);
}
@ -1472,7 +1473,7 @@ enter(Thread* t, Thread::State s)
t->vm->exclusive = t;
while (t->vm->activeCount > 1) {
t->vm->stateLock->wait(t, 0);
t->vm->stateLock->wait(t->systemThread, 0);
}
} break;
@ -1498,7 +1499,7 @@ enter(Thread* t, Thread::State s)
}
t->state = s;
t->vm->stateLock->notifyAll(t);
t->vm->stateLock->notifyAll(t->systemThread);
} break;
case Thread::ActiveState: {
@ -1509,13 +1510,13 @@ enter(Thread* t, Thread::State s)
t->state = s;
t->vm->exclusive = 0;
t->vm->stateLock->notifyAll(t);
t->vm->stateLock->notifyAll(t->systemThread);
} break;
case Thread::NoState:
case Thread::IdleState: {
while (t->vm->exclusive) {
t->vm->stateLock->wait(t, 0);
t->vm->stateLock->wait(t->systemThread, 0);
}
++ t->vm->activeCount;
@ -1547,7 +1548,7 @@ enter(Thread* t, Thread::State s)
t->state = s;
while (t->vm->liveCount > 1) {
t->vm->stateLock->wait(t, 0);
t->vm->stateLock->wait(t->systemThread, 0);
}
} break;

View File

@ -22,7 +22,7 @@ namespace vm {
const bool Verbose = false;
const bool DebugRun = false;
const bool DebugStack = false;
const bool DebugMonitors = true;
const bool DebugMonitors = false;
const uintptr_t HashTakenMark = 1;
const uintptr_t ExtendedMark = 2;
@ -1124,6 +1124,16 @@ class Machine {
JNIEnvVTable jniEnvVTable;
};
object
run(Thread* t, const char* className, const char* methodName,
const char* methodSpec, object this_, ...);
void
printTrace(Thread* t, object exception);
uint8_t&
threadInterrupted(Thread* t, object thread);
class Thread {
public:
enum State {
@ -1151,6 +1161,35 @@ class Thread {
Protector* next;
};
class Runnable: public System::Runnable {
public:
Runnable(Thread* t): t(t) { }
virtual void attach(System::Thread* st) {
t->systemThread = st;
}
virtual void run() {
vm::run(t, "java/lang/Thread", "run", "()V", t->javaThread);
if (t->exception) {
printTrace(t, t->exception);
}
t->exit();
}
virtual bool interrupted() {
return threadInterrupted(t, t->javaThread);
}
virtual void setInterrupted(bool v) {
threadInterrupted(t, t->javaThread) = v;
}
Thread* t;
};
static const unsigned HeapSizeInBytes = 64 * 1024;
static const unsigned StackSizeInBytes = 64 * 1024;
@ -1178,6 +1217,7 @@ class Thread {
int frame;
unsigned heapIndex;
Protector* protector;
Runnable runnable;
#ifdef VM_STRESS
bool stress;
object* heap;
@ -1248,13 +1288,13 @@ class MonitorResource {
MonitorResource(Thread* t, System::Monitor* m): t(t), m(m) {
stress(t);
if (not m->tryAcquire(t)) {
if (not m->tryAcquire(t->systemThread)) {
ENTER(t, Thread::IdleState);
m->acquire(t);
m->acquire(t->systemThread);
}
}
~MonitorResource() { m->release(t); }
~MonitorResource() { m->release(t->systemThread); }
private:
Thread* t;
@ -1264,10 +1304,10 @@ class MonitorResource {
class RawMonitorResource {
public:
RawMonitorResource(Thread* t, System::Monitor* m): t(t), m(m) {
m->acquire(t);
m->acquire(t->systemThread);
}
~RawMonitorResource() { m->release(t); }
~RawMonitorResource() { m->release(t->systemThread); }
private:
Thread* t;
@ -1440,6 +1480,12 @@ makeInvocationTargetException(Thread* t, object targetException)
return makeRuntimeException(t, 0, trace, targetException);
}
inline object
makeInterruptedException(Thread* t)
{
return makeInterruptedException(t, 0, makeTrace(t), 0);
}
inline object
makeStackOverflowError(Thread* t)
{
@ -1986,9 +2032,9 @@ acquire(Thread* t, object o)
t, m, objectHash(t, o));
}
if (not m->tryAcquire(t)) {
if (not m->tryAcquire(t->systemThread)) {
ENTER(t, Thread::IdleState);
m->acquire(t);
m->acquire(t->systemThread);
}
stress(t);
@ -2004,7 +2050,7 @@ release(Thread* t, object o)
t, m, objectHash(t, o));
}
m->release(t);
m->release(t->systemThread);
}
inline void
@ -2017,9 +2063,13 @@ wait(Thread* t, object o, int64_t milliseconds)
t, milliseconds, m, objectHash(t, o));
}
if (m->owner() == t) {
if (m->owner() == t->systemThread) {
ENTER(t, Thread::IdleState);
m->wait(t, milliseconds);
bool interrupted = m->wait(t->systemThread, milliseconds);
if (interrupted) {
t->exception = makeInterruptedException(t);
}
} else {
t->exception = makeIllegalMonitorStateException(t);
}
@ -2032,12 +2082,6 @@ wait(Thread* t, object o, int64_t milliseconds)
stress(t);
}
inline void
vmWait(Thread* t, object o, int64_t milliseconds)
{
wait(t, o, milliseconds);
}
inline void
notify(Thread* t, object o)
{
@ -2048,19 +2092,13 @@ notify(Thread* t, object o)
t, m, objectHash(t, o));
}
if (m->owner() == t) {
m->notify(t);
if (m->owner() == t->systemThread) {
m->notify(t->systemThread);
} else {
t->exception = makeIllegalMonitorStateException(t);
}
}
inline void
vmNotify(Thread* t, object o)
{
notify(t, o);
}
inline void
notifyAll(Thread* t, object o)
{
@ -2071,22 +2109,19 @@ notifyAll(Thread* t, object o)
t, m, objectHash(t, o));
}
if (m->owner() == t) {
m->notifyAll(t);
if (m->owner() == t->systemThread) {
m->notifyAll(t->systemThread);
} else {
t->exception = makeIllegalMonitorStateException(t);
}
}
inline void
vmNotifyAll(Thread* t, object o)
interrupt(Thread*, Thread* target)
{
notifyAll(t, o);
target->systemThread->interrupt();
}
void
printTrace(Thread* t, object exception);
void
exit(Thread* t);

View File

@ -7,10 +7,14 @@
#include "dlfcn.h"
#include "errno.h"
#include "pthread.h"
#include "signal.h"
#include "stdint.h"
#include "system.h"
#define ACQUIRE(x) MutexResource MAKE_NAME(mutexResource_) (x)
#ifdef __i386__
extern "C" uint64_t
@ -86,6 +90,28 @@ using namespace vm;
namespace {
class MutexResource {
public:
MutexResource(pthread_mutex_t& m): m(&m) {
pthread_mutex_lock(&m);
}
~MutexResource() {
pthread_mutex_unlock(m);
}
private:
pthread_mutex_t* m;
};
const int InterruptSignal = SIGUSR2;
void
handleSignal(int)
{
// ignore
}
int64_t
now()
{
@ -96,22 +122,39 @@ now()
}
void*
run(void* t)
run(void* r)
{
static_cast<System::Thread*>(t)->run();
static_cast<System::Runnable*>(r)->run();
return 0;
}
const bool Verbose = false;
const unsigned Waiting = 1 << 0;
const unsigned Notified = 1 << 1;
class MySystem: public System {
public:
class Thread: public System::Thread {
public:
Thread(System* s, System::Runnable* r): s(s), r(r) { }
Thread(System* s, System::Runnable* r):
s(s),
r(r),
next(0),
flags(0)
{
pthread_mutex_init(&mutex, 0);
pthread_cond_init(&condition, 0);
}
virtual void run() {
r->run(this);
virtual void interrupt() {
ACQUIRE(mutex);
r->setInterrupted(true);
if (flags & Waiting) {
pthread_kill(thread, InterruptSignal);
}
}
virtual void join() {
@ -120,26 +163,28 @@ class MySystem: public System {
}
virtual void dispose() {
if (r) {
r->dispose();
}
s->free(this);
}
pthread_t thread;
pthread_mutex_t mutex;
pthread_cond_t condition;
System* s;
System::Runnable* r;
pthread_t thread;
Thread* next;
unsigned flags;
};
class Monitor: public System::Monitor {
public:
Monitor(System* s): s(s), context(0), depth(0) {
Monitor(System* s): s(s), owner_(0), first(0), last(0), depth(0) {
pthread_mutex_init(&mutex, 0);
pthread_cond_init(&condition, 0);
}
virtual bool tryAcquire(void* context) {
if (this->context == context) {
virtual bool tryAcquire(System::Thread* context) {
Thread* t = static_cast<Thread*>(context);
if (owner_ == t) {
++ depth;
return true;
} else {
@ -148,7 +193,7 @@ class MySystem: public System {
return false;
case 0:
this->context = context;
owner_ = t;
++ depth;
return true;
@ -158,18 +203,22 @@ class MySystem: public System {
}
}
virtual void acquire(void* context) {
if (this->context != context) {
virtual void acquire(System::Thread* context) {
Thread* t = static_cast<Thread*>(context);
if (owner_ != t) {
pthread_mutex_lock(&mutex);
this->context = context;
owner_ = t;
}
++ depth;
}
virtual void release(void* context) {
if (this->context == context) {
virtual void release(System::Thread* context) {
Thread* t = static_cast<Thread*>(context);
if (owner_ == t) {
if (-- depth == 0) {
this->context = 0;
owner_ = 0;
pthread_mutex_unlock(&mutex);
}
} else {
@ -177,60 +226,139 @@ class MySystem: public System {
}
}
virtual void wait(void* context, int64_t time) {
if (this->context == context) {
void append(Thread* t) {
if (last) {
last->next = t;
} else {
first = last = t;
}
}
void remove(Thread* t) {
for (Thread** p = &first; *p;) {
if (t == *p) {
*p = t->next;
if (last == t) {
last = 0;
}
break;
} else {
p = &((*p)->next);
}
}
}
virtual bool wait(System::Thread* context, int64_t time) {
Thread* t = static_cast<Thread*>(context);
if (owner_ == t) {
ACQUIRE(t->mutex);
if (t->r->interrupted()) {
t->r->setInterrupted(false);
return true;
}
t->flags |= Waiting;
append(t);
unsigned depth = this->depth;
this->depth = 0;
this->context = 0;
owner_ = 0;
pthread_mutex_unlock(&mutex);
if (time) {
int64_t then = now() + time;
timespec ts = { then / 1000, (then % 1000) * 1000 * 1000 };
int rv = pthread_cond_timedwait(&condition, &mutex, &ts);
assert(s, rv == 0 or rv == ETIMEDOUT);
int rv = pthread_cond_timedwait
(&(t->condition), &(t->mutex), &ts);
assert(s, rv == 0 or rv == ETIMEDOUT or rv == EINTR);
} else {
int rv = pthread_cond_wait(&condition, &mutex);
assert(s, rv == 0);
int rv = pthread_cond_wait(&(t->condition), &(t->mutex));
assert(s, rv == 0 or rv == EINTR);
}
this->context = context;
pthread_mutex_lock(&mutex);
owner_ = t;
this->depth = depth;
if ((t->flags & Notified) == 0) {
remove(t);
}
t->flags = 0;
if (t->r->interrupted()) {
t->r->setInterrupted(false);
return true;
} else {
return false;
}
} else {
sysAbort(s);
}
}
virtual void notify(void* context) {
if (this->context == context) {
int rv = pthread_cond_signal(&condition);
assert(s, rv == 0);
void doNotify(Thread* t) {
ACQUIRE(t->mutex);
t->flags |= Notified;
int rv = pthread_cond_signal(&(t->condition));
assert(s, rv == 0);
}
virtual void notify(System::Thread* context) {
Thread* t = static_cast<Thread*>(context);
if (owner_ == t) {
if (first) {
Thread* t = first;
first = first->next;
if (t == last) {
last = 0;
}
doNotify(t);
}
} else {
sysAbort(s);
}
}
virtual void notifyAll(void* context) {
if (this->context == context) {
int rv = pthread_cond_broadcast(&condition);
assert(s, rv == 0);
virtual void notifyAll(System::Thread* context) {
Thread* t = static_cast<Thread*>(context);
if (owner_ == t) {
for (Thread** p = &first; *p;) {
Thread* t = *p;
p = &(t->next);
if (t == last) {
last = 0;
}
doNotify(t);
}
} else {
sysAbort(s);
}
}
virtual void* owner() {
return context;
virtual System::Thread* owner() {
return owner_;
}
virtual void dispose() {
assert(s, context == 0);
assert(s, owner_ == 0);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&condition);
s->free(this);
}
System* s;
pthread_mutex_t mutex;
pthread_cond_t condition;
void* context;
Thread* owner_;
Thread* first;
Thread* last;
unsigned depth;
};
@ -278,6 +406,14 @@ class MySystem: public System {
MySystem(unsigned limit): limit(limit), count(0) {
pthread_mutex_init(&mutex, 0);
struct sigaction sa;
memset(&sa, 0, sizeof(struct sigaction));
sigemptyset(&(sa.sa_mask));
sa.sa_handler = handleSignal;
int rv = sigaction(InterruptSignal, &sa, 0);
assert(this, rv == 0);
}
virtual bool success(Status s) {
@ -332,16 +468,17 @@ class MySystem: public System {
pthread_mutex_unlock(&mutex);
}
virtual Status attach(System::Thread** tp) {
Thread* t = new (System::allocate(sizeof(Thread))) Thread(this, 0);
virtual Status attach(Runnable* r) {
Thread* t = new (System::allocate(sizeof(Thread))) Thread(this, r);
t->thread = pthread_self();
*tp = t;
r->attach(t);
return 0;
}
virtual Status start(Runnable* r) {
Thread* t = new (System::allocate(sizeof(Thread))) Thread(this, r);
int rv = pthread_create(&(t->thread), 0, run, t);
r->attach(t);
int rv = pthread_create(&(t->thread), 0, run, r);
assert(this, rv == 0);
return 0;
}

View File

@ -20,7 +20,7 @@ class System: public Allocator {
class Thread {
public:
virtual ~Thread() { }
virtual void run() = 0;
virtual void interrupt() = 0;
virtual void join() = 0;
virtual void dispose() = 0;
};
@ -28,20 +28,22 @@ class System: public Allocator {
class Runnable {
public:
virtual ~Runnable() { }
virtual void run(Thread*) = 0;
virtual void dispose() = 0;
virtual void attach(Thread*) = 0;
virtual void run() = 0;
virtual bool interrupted() = 0;
virtual void setInterrupted(bool v) = 0;
};
class Monitor {
public:
virtual ~Monitor() { }
virtual bool tryAcquire(void* context) = 0;
virtual void acquire(void* context) = 0;
virtual void release(void* context) = 0;
virtual void wait(void* context, int64_t time) = 0;
virtual void notify(void* context) = 0;
virtual void notifyAll(void* context) = 0;
virtual void* owner() = 0;
virtual bool tryAcquire(Thread* context) = 0;
virtual void acquire(Thread* context) = 0;
virtual void release(Thread* context) = 0;
virtual bool wait(Thread* context, int64_t time) = 0;
virtual void notify(Thread* context) = 0;
virtual void notifyAll(Thread* context) = 0;
virtual Thread* owner() = 0;
virtual void dispose() = 0;
};
@ -57,7 +59,7 @@ class System: public Allocator {
virtual ~System() { }
virtual bool success(Status) = 0;
virtual Status attach(Thread**) = 0;
virtual Status attach(Runnable*) = 0;
virtual Status start(Runnable*) = 0;
virtual Status make(Monitor**) = 0;
virtual uint64_t call(void* function, uintptr_t* arguments, uint8_t* types,
@ -97,16 +99,20 @@ expect(System* s, bool v)
}
#ifdef NDEBUG
inline void
assert(System*, bool)
{ }
#else
#else // not NDEBUG
inline void
assert(System* s, bool v)
{
expect(s, v);
}
#endif
#endif // not NDEBUG
System*
makeSystem(unsigned heapSize);

View File

@ -128,8 +128,8 @@
(int64_t peer)
(object task)
(object locals)
(object sleepLock)
(uint8_t interrupted))
(uint8_t interrupted)
(object sleepLock))
(type stackTraceElement java/lang/StackTraceElement
(extends jobject)
@ -181,6 +181,9 @@
(type invocationTargetException java/lang/InvocationTargetException
(extends exception))
(type interruptedException java/lang/InterruptedException
(extends exception))
(type error java/lang/Error
(extends throwable))