mirror of
https://github.com/corda/corda.git
synced 2025-01-22 12:28:11 +00:00
avoid running out of OS resources due to zombie thread accumulation (part 2)
My previous attempt wasn't quite sufficient, since it was too late to call join on a thread which had already exited given the code was written to aggressively dispose of system handles as soon as the thread exited. The solution is to delay disposing these handles until after we're able to join the thread.
This commit is contained in:
parent
67a3ca881b
commit
be6896b8a0
@ -45,18 +45,10 @@ void
|
|||||||
join(Thread* t, Thread* o)
|
join(Thread* t, Thread* o)
|
||||||
{
|
{
|
||||||
if (t != o) {
|
if (t != o) {
|
||||||
// todo: There's potentially a leak here on systems where we must
|
assert(t, o->state != Thread::JoinedState);
|
||||||
// call join on a thread in order to clean up all resources
|
assert(t, (o->flags & Thread::SystemFlag) == 0);
|
||||||
// associated with it. If a thread has already been zombified by
|
if (o->flags & Thread::JoinFlag) {
|
||||||
// the time we get here, acquireSystem will return false, which
|
|
||||||
// means we can't safely join it because the System::Thread may
|
|
||||||
// already have been disposed. In that case, the thread has
|
|
||||||
// already exited (or will soon), but the OS will never free all
|
|
||||||
// its resources because it doesn't know we're completely done
|
|
||||||
// with it.
|
|
||||||
if (acquireSystem(t, o)) {
|
|
||||||
o->systemThread->join();
|
o->systemThread->join();
|
||||||
releaseSystem(t, o);
|
|
||||||
}
|
}
|
||||||
o->state = Thread::JoinedState;
|
o->state = Thread::JoinedState;
|
||||||
}
|
}
|
||||||
@ -257,6 +249,7 @@ killZombies(Thread* t, Thread* o)
|
|||||||
killZombies(t, child);
|
killZombies(t, child);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((o->flags & Thread::SystemFlag) == 0) {
|
||||||
switch (o->state) {
|
switch (o->state) {
|
||||||
case Thread::ZombieState:
|
case Thread::ZombieState:
|
||||||
join(t, o);
|
join(t, o);
|
||||||
@ -267,6 +260,7 @@ killZombies(Thread* t, Thread* o)
|
|||||||
|
|
||||||
default: break;
|
default: break;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
unsigned
|
unsigned
|
||||||
@ -2623,31 +2617,14 @@ Thread::exit()
|
|||||||
} else {
|
} else {
|
||||||
threadPeer(this, javaThread) = 0;
|
threadPeer(this, javaThread) = 0;
|
||||||
|
|
||||||
System::Monitor* myLock = lock;
|
|
||||||
System::Thread* mySystemThread = systemThread;
|
|
||||||
|
|
||||||
{ ACQUIRE_RAW(this, m->stateLock);
|
|
||||||
|
|
||||||
while (flags & SystemFlag) {
|
|
||||||
m->stateLock->wait(systemThread, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
atomicOr(&flags, Thread::DisposeFlag);
|
|
||||||
|
|
||||||
enter(this, Thread::ZombieState);
|
enter(this, Thread::ZombieState);
|
||||||
}
|
}
|
||||||
|
|
||||||
myLock->dispose();
|
|
||||||
|
|
||||||
mySystemThread->dispose();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
Thread::dispose()
|
Thread::dispose()
|
||||||
{
|
{
|
||||||
if ((flags & Thread::DisposeFlag) == 0) {
|
|
||||||
if (lock) {
|
if (lock) {
|
||||||
lock->dispose();
|
lock->dispose();
|
||||||
}
|
}
|
||||||
@ -2655,7 +2632,6 @@ Thread::dispose()
|
|||||||
if (systemThread) {
|
if (systemThread) {
|
||||||
systemThread->dispose();
|
systemThread->dispose();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
-- m->threadCount;
|
-- m->threadCount;
|
||||||
|
|
||||||
|
@ -1392,7 +1392,7 @@ class Thread {
|
|||||||
static const unsigned StressFlag = 1 << 4;
|
static const unsigned StressFlag = 1 << 4;
|
||||||
static const unsigned ActiveFlag = 1 << 5;
|
static const unsigned ActiveFlag = 1 << 5;
|
||||||
static const unsigned SystemFlag = 1 << 6;
|
static const unsigned SystemFlag = 1 << 6;
|
||||||
static const unsigned DisposeFlag = 1 << 7;
|
static const unsigned JoinFlag = 1 << 7;
|
||||||
|
|
||||||
class Protector {
|
class Protector {
|
||||||
public:
|
public:
|
||||||
@ -1988,6 +1988,7 @@ runThread(Thread* t, uintptr_t*)
|
|||||||
inline bool
|
inline bool
|
||||||
startThread(Thread* t, Thread* p)
|
startThread(Thread* t, Thread* p)
|
||||||
{
|
{
|
||||||
|
p->flags |= Thread::JoinFlag;
|
||||||
return t->m->system->success(t->m->system->start(&(p->runnable)));
|
return t->m->system->success(t->m->system->start(&(p->runnable)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2784,19 +2785,12 @@ parameterFootprint(Thread* t, const char* s, bool static_);
|
|||||||
void
|
void
|
||||||
addFinalizer(Thread* t, object target, void (*finalize)(Thread*, object));
|
addFinalizer(Thread* t, object target, void (*finalize)(Thread*, object));
|
||||||
|
|
||||||
inline bool
|
|
||||||
zombified(Thread* t)
|
|
||||||
{
|
|
||||||
return t->state == Thread::ZombieState
|
|
||||||
or t->state == Thread::JoinedState;
|
|
||||||
}
|
|
||||||
|
|
||||||
inline bool
|
inline bool
|
||||||
acquireSystem(Thread* t, Thread* target)
|
acquireSystem(Thread* t, Thread* target)
|
||||||
{
|
{
|
||||||
ACQUIRE_RAW(t, t->m->stateLock);
|
ACQUIRE_RAW(t, t->m->stateLock);
|
||||||
|
|
||||||
if (not zombified(target)) {
|
if (t->state != Thread::JoinedState) {
|
||||||
atomicOr(&(target->flags), Thread::SystemFlag);
|
atomicOr(&(target->flags), Thread::SystemFlag);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
@ -2809,7 +2803,7 @@ releaseSystem(Thread* t, Thread* target)
|
|||||||
{
|
{
|
||||||
ACQUIRE_RAW(t, t->m->stateLock);
|
ACQUIRE_RAW(t, t->m->stateLock);
|
||||||
|
|
||||||
assert(t, not zombified(target));
|
assert(t, t->state != Thread::JoinedState);
|
||||||
|
|
||||||
atomicAnd(&(target->flags), ~Thread::SystemFlag);
|
atomicAnd(&(target->flags), ~Thread::SystemFlag);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user