mirror of
https://github.com/corda/corda.git
synced 2025-01-19 11:16:54 +00:00
various threading bugfixes
This commit is contained in:
parent
56a5cf2503
commit
5fa7b074b4
108
src/machine.cpp
108
src/machine.cpp
@ -32,27 +32,79 @@ join(Thread* t, Thread* o)
|
||||
}
|
||||
}
|
||||
|
||||
unsigned
|
||||
count(Thread* t, Thread* o)
|
||||
{
|
||||
unsigned c = 0;
|
||||
|
||||
if (t != o) ++ c;
|
||||
|
||||
for (Thread* p = t->peer; p; p = p->peer) {
|
||||
c += count(p, o);
|
||||
}
|
||||
|
||||
if (t->child) c += count(t->child, o);
|
||||
|
||||
return c;
|
||||
}
|
||||
|
||||
Thread**
|
||||
fill(Thread* t, Thread* o, Thread** array)
|
||||
{
|
||||
if (t != o) *(array++) = t;
|
||||
|
||||
for (Thread* p = t->peer; p; p = p->peer) {
|
||||
array = fill(p, o, array);
|
||||
}
|
||||
|
||||
if (t->child) array = fill(t->child, o, array);
|
||||
|
||||
return array;
|
||||
}
|
||||
|
||||
void
|
||||
dispose(Thread* t, Thread* o, bool remove)
|
||||
{
|
||||
if (remove) {
|
||||
// debug
|
||||
expect(t, find(t->m->rootThread, o));
|
||||
|
||||
unsigned c = count(t->m->rootThread, o);
|
||||
Thread* threads[c];
|
||||
fill(t->m->rootThread, o, threads);
|
||||
// end debug
|
||||
|
||||
if (o->parent) {
|
||||
if (o->child) {
|
||||
o->parent->child = o->child;
|
||||
if (o->peer) {
|
||||
o->peer->peer = o->child->peer;
|
||||
o->child->peer = o->peer;
|
||||
Thread* previous;
|
||||
for (Thread* p = o->parent->child; p;) {
|
||||
if (p == o) {
|
||||
if (p == o->parent->child) {
|
||||
o->parent->child = p->peer;
|
||||
} else {
|
||||
previous->peer = p->peer;
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
previous = p;
|
||||
p = p->peer;
|
||||
}
|
||||
} else if (o->peer) {
|
||||
o->parent->child = o->peer;
|
||||
} else {
|
||||
o->parent->child = 0;
|
||||
}
|
||||
|
||||
for (Thread* p = o->child; p;) {
|
||||
Thread* next = p->peer;
|
||||
p->peer = o->parent->child;
|
||||
o->parent->child = p;
|
||||
p->parent = o->parent;
|
||||
p = next;
|
||||
}
|
||||
} else if (o->child) {
|
||||
t->m->rootThread = o->child;
|
||||
if (o->peer) {
|
||||
o->peer->peer = o->child->peer;
|
||||
o->child->peer = o->peer;
|
||||
|
||||
for (Thread* p = o->peer; p;) {
|
||||
Thread* next = p->peer;
|
||||
p->peer = t->m->rootThread;
|
||||
t->m->rootThread = p;
|
||||
p = next;
|
||||
}
|
||||
} else if (o->peer) {
|
||||
t->m->rootThread = o->peer;
|
||||
@ -60,7 +112,13 @@ dispose(Thread* t, Thread* o, bool remove)
|
||||
abort(t);
|
||||
}
|
||||
|
||||
assert(t, not find(t->m->rootThread, o));
|
||||
// debug
|
||||
expect(t, not find(t->m->rootThread, o));
|
||||
|
||||
for (unsigned i = 0; i < c; ++i) {
|
||||
expect(t, find(t->m->rootThread, threads[i]));
|
||||
}
|
||||
// end debug
|
||||
}
|
||||
|
||||
o->dispose();
|
||||
@ -1290,12 +1348,17 @@ parseMethodTable(Thread* t, Stream& s, object class_, object pool)
|
||||
if (declaredVirtualCount == 0
|
||||
and (classFlags(t, class_) & ACC_INTERFACE) == 0)
|
||||
{
|
||||
// inherit interface table and virtual table from superclass
|
||||
|
||||
set(t, class_, ClassInterfaceTable,
|
||||
classInterfaceTable(t, classSuper(t, class_)));
|
||||
|
||||
// inherit virtual table from superclass
|
||||
set(t, class_, ClassVirtualTable, superVirtualTable);
|
||||
|
||||
if (classInterfaceTable(t, classSuper(t, class_))
|
||||
and arrayLength(t, classInterfaceTable(t, class_))
|
||||
== arrayLength(t, classInterfaceTable(t, classSuper(t, class_))))
|
||||
{
|
||||
// inherit interface table from superclass
|
||||
set(t, class_, ClassInterfaceTable,
|
||||
classInterfaceTable(t, classSuper(t, class_)));
|
||||
}
|
||||
} else if (virtualCount) {
|
||||
// generate class vtable
|
||||
|
||||
@ -1892,6 +1955,7 @@ Thread::init()
|
||||
#include "type-java-initializations.cpp"
|
||||
}
|
||||
} else {
|
||||
peer = parent->child;
|
||||
parent->child = this;
|
||||
}
|
||||
|
||||
@ -2048,6 +2112,8 @@ enter(Thread* t, Thread::State s)
|
||||
case Thread::ExclusiveState: {
|
||||
assert(t, t->m->exclusive == t);
|
||||
t->m->exclusive = 0;
|
||||
|
||||
t->m->stateLock->notifyAll(t->systemThread);
|
||||
} break;
|
||||
|
||||
case Thread::ActiveState: break;
|
||||
@ -2632,7 +2698,7 @@ addFinalizer(Thread* t, object target, void (*finalize)(Thread*, object))
|
||||
}
|
||||
|
||||
System::Monitor*
|
||||
objectMonitor(Thread* t, object o)
|
||||
objectMonitor(Thread* t, object o, bool createNew)
|
||||
{
|
||||
object p = hashMapFind(t, t->m->monitorMap, o, objectHash, objectEqual);
|
||||
|
||||
@ -2644,7 +2710,7 @@ objectMonitor(Thread* t, object o)
|
||||
}
|
||||
|
||||
return static_cast<System::Monitor*>(pointerValue(t, p));
|
||||
} else {
|
||||
} else if (createNew) {
|
||||
PROTECT(t, o);
|
||||
|
||||
ENTER(t, Thread::ExclusiveState);
|
||||
@ -2665,6 +2731,8 @@ objectMonitor(Thread* t, object o)
|
||||
addFinalizer(t, o, removeMonitor);
|
||||
|
||||
return m;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1998,12 +1998,12 @@ void
|
||||
addFinalizer(Thread* t, object target, void (*finalize)(Thread*, object));
|
||||
|
||||
System::Monitor*
|
||||
objectMonitor(Thread* t, object o);
|
||||
objectMonitor(Thread* t, object o, bool createNew);
|
||||
|
||||
inline void
|
||||
acquire(Thread* t, object o)
|
||||
{
|
||||
System::Monitor* m = objectMonitor(t, o);
|
||||
System::Monitor* m = objectMonitor(t, o, true);
|
||||
|
||||
if (DebugMonitors) {
|
||||
fprintf(stderr, "thread %p acquires %p for %x\n",
|
||||
@ -2016,7 +2016,7 @@ acquire(Thread* t, object o)
|
||||
inline void
|
||||
release(Thread* t, object o)
|
||||
{
|
||||
System::Monitor* m = objectMonitor(t, o);
|
||||
System::Monitor* m = objectMonitor(t, o, false);
|
||||
|
||||
if (DebugMonitors) {
|
||||
fprintf(stderr, "thread %p releases %p for %x\n",
|
||||
@ -2029,14 +2029,14 @@ release(Thread* t, object o)
|
||||
inline void
|
||||
wait(Thread* t, object o, int64_t milliseconds)
|
||||
{
|
||||
System::Monitor* m = objectMonitor(t, o);
|
||||
System::Monitor* m = objectMonitor(t, o, false);
|
||||
|
||||
if (DebugMonitors) {
|
||||
fprintf(stderr, "thread %p waits %"LLD" millis on %p for %x\n",
|
||||
t, milliseconds, m, objectHash(t, o));
|
||||
}
|
||||
|
||||
if (m->owner() == t->systemThread) {
|
||||
if (m and m->owner() == t->systemThread) {
|
||||
ENTER(t, Thread::IdleState);
|
||||
|
||||
bool interrupted = m->wait(t->systemThread, milliseconds);
|
||||
@ -2058,14 +2058,14 @@ wait(Thread* t, object o, int64_t milliseconds)
|
||||
inline void
|
||||
notify(Thread* t, object o)
|
||||
{
|
||||
System::Monitor* m = objectMonitor(t, o);
|
||||
System::Monitor* m = objectMonitor(t, o, false);
|
||||
|
||||
if (DebugMonitors) {
|
||||
fprintf(stderr, "thread %p notifies on %p for %x\n",
|
||||
t, m, objectHash(t, o));
|
||||
}
|
||||
|
||||
if (m->owner() == t->systemThread) {
|
||||
if (m and m->owner() == t->systemThread) {
|
||||
m->notify(t->systemThread);
|
||||
} else {
|
||||
t->exception = makeIllegalMonitorStateException(t);
|
||||
@ -2075,14 +2075,14 @@ notify(Thread* t, object o)
|
||||
inline void
|
||||
notifyAll(Thread* t, object o)
|
||||
{
|
||||
System::Monitor* m = objectMonitor(t, o);
|
||||
System::Monitor* m = objectMonitor(t, o, false);
|
||||
|
||||
if (DebugMonitors) {
|
||||
fprintf(stderr, "thread %p notifies all on %p for %x\n",
|
||||
t, m, objectHash(t, o));
|
||||
}
|
||||
|
||||
if (m->owner() == t->systemThread) {
|
||||
if (m and m->owner() == t->systemThread) {
|
||||
m->notifyAll(t->systemThread);
|
||||
} else {
|
||||
t->exception = makeIllegalMonitorStateException(t);
|
||||
|
@ -80,7 +80,7 @@ class MySystem: public System {
|
||||
|
||||
virtual void join() {
|
||||
int rv UNUSED = pthread_join(thread, 0);
|
||||
assert(s, rv == 0);
|
||||
expect(s, rv == 0);
|
||||
}
|
||||
|
||||
virtual void dispose() {
|
||||
@ -150,21 +150,32 @@ class MySystem: public System {
|
||||
void append(Thread* t) {
|
||||
if (last) {
|
||||
last->next = t;
|
||||
last = t;
|
||||
} else {
|
||||
first = last = t;
|
||||
}
|
||||
}
|
||||
|
||||
void remove(Thread* t) {
|
||||
for (Thread** p = &first; *p;) {
|
||||
if (t == *p) {
|
||||
*p = t->next;
|
||||
if (last == t) {
|
||||
last = 0;
|
||||
Thread* previous;
|
||||
for (Thread* current = first; current;) {
|
||||
if (t == current) {
|
||||
if (current == first) {
|
||||
first = t->next;
|
||||
} else {
|
||||
previous->next = t->next;
|
||||
}
|
||||
|
||||
if (current == last) {
|
||||
last = previous;
|
||||
}
|
||||
|
||||
t->next = 0;
|
||||
|
||||
break;
|
||||
} else {
|
||||
p = &((*p)->next);
|
||||
previous = current;
|
||||
current = current->next;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -194,10 +205,10 @@ class MySystem: public System {
|
||||
timespec ts = { then / 1000, (then % 1000) * 1000 * 1000 };
|
||||
int rv UNUSED = pthread_cond_timedwait
|
||||
(&(t->condition), &(t->mutex), &ts);
|
||||
assert(s, rv == 0 or rv == ETIMEDOUT or rv == EINTR);
|
||||
expect(s, rv == 0 or rv == ETIMEDOUT or rv == EINTR);
|
||||
} else {
|
||||
int rv UNUSED = pthread_cond_wait(&(t->condition), &(t->mutex));
|
||||
assert(s, rv == 0 or rv == EINTR);
|
||||
expect(s, rv == 0 or rv == EINTR);
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&mutex);
|
||||
@ -227,7 +238,7 @@ class MySystem: public System {
|
||||
|
||||
t->flags |= Notified;
|
||||
int rv UNUSED = pthread_cond_signal(&(t->condition));
|
||||
assert(s, rv == 0);
|
||||
expect(s, rv == 0);
|
||||
}
|
||||
|
||||
virtual void notify(System::Thread* context) {
|
||||
@ -266,7 +277,7 @@ class MySystem: public System {
|
||||
}
|
||||
|
||||
virtual void dispose() {
|
||||
assert(s, owner_ == 0);
|
||||
expect(s, owner_ == 0);
|
||||
pthread_mutex_destroy(&mutex);
|
||||
s->free(this);
|
||||
}
|
||||
@ -283,7 +294,7 @@ class MySystem: public System {
|
||||
public:
|
||||
Local(System* s): s(s) {
|
||||
int r UNUSED = pthread_key_create(&key, 0);
|
||||
assert(s, r == 0);
|
||||
expect(s, r == 0);
|
||||
}
|
||||
|
||||
virtual void* get() {
|
||||
@ -292,12 +303,12 @@ class MySystem: public System {
|
||||
|
||||
virtual void set(void* p) {
|
||||
int r UNUSED = pthread_setspecific(key, p);
|
||||
assert(s, r == 0);
|
||||
expect(s, r == 0);
|
||||
}
|
||||
|
||||
virtual void dispose() {
|
||||
int r UNUSED = pthread_key_delete(key);
|
||||
assert(s, r == 0);
|
||||
expect(s, r == 0);
|
||||
|
||||
s->free(this);
|
||||
}
|
||||
@ -395,7 +406,7 @@ class MySystem: public System {
|
||||
sa.sa_handler = handleSignal;
|
||||
|
||||
int rv UNUSED = sigaction(InterruptSignal, &sa, 0);
|
||||
assert(this, rv == 0);
|
||||
expect(this, rv == 0);
|
||||
}
|
||||
|
||||
virtual bool success(Status s) {
|
||||
@ -455,7 +466,7 @@ class MySystem: public System {
|
||||
Thread* t = new (System::allocate(sizeof(Thread))) Thread(this, r);
|
||||
r->attach(t);
|
||||
int rv UNUSED = pthread_create(&(t->thread), 0, run, r);
|
||||
assert(this, rv == 0);
|
||||
expect(this, rv == 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -151,21 +151,32 @@ class MySystem: public System {
|
||||
void append(Thread* t) {
|
||||
if (last) {
|
||||
last->next = t;
|
||||
last = t;
|
||||
} else {
|
||||
first = last = t;
|
||||
}
|
||||
}
|
||||
|
||||
void remove(Thread* t) {
|
||||
for (Thread** p = &first; *p;) {
|
||||
if (t == *p) {
|
||||
*p = t->next;
|
||||
if (last == t) {
|
||||
last = 0;
|
||||
Thread* previous;
|
||||
for (Thread* current = first; current;) {
|
||||
if (t == current) {
|
||||
if (current == first) {
|
||||
first = t->next;
|
||||
} else {
|
||||
previous->next = t->next;
|
||||
}
|
||||
|
||||
if (current == last) {
|
||||
last = previous;
|
||||
}
|
||||
|
||||
t->next = 0;
|
||||
|
||||
break;
|
||||
} else {
|
||||
p = &((*p)->next);
|
||||
previous = current;
|
||||
current = current->next;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user