mirror of
https://github.com/corda/corda.git
synced 2025-01-07 13:38:47 +00:00
tweak System::Monitor::wait to avoid notify deadlock
This commit is contained in:
parent
f5f7b01689
commit
6ba7852a62
@ -274,50 +274,56 @@ class MySystem: public System {
|
||||
Thread* t = static_cast<Thread*>(context);
|
||||
|
||||
if (owner_ == t) {
|
||||
ACQUIRE(t->mutex);
|
||||
bool interrupted;
|
||||
unsigned depth;
|
||||
|
||||
{ ACQUIRE(t->mutex);
|
||||
|
||||
if (t->r->interrupted()) {
|
||||
t->r->setInterrupted(false);
|
||||
return true;
|
||||
}
|
||||
if (t->r->interrupted()) {
|
||||
t->r->setInterrupted(false);
|
||||
return true;
|
||||
}
|
||||
|
||||
t->flags |= Waiting;
|
||||
t->flags |= Waiting;
|
||||
|
||||
append(t);
|
||||
append(t);
|
||||
|
||||
unsigned depth = this->depth;
|
||||
this->depth = 0;
|
||||
owner_ = 0;
|
||||
pthread_mutex_unlock(&mutex);
|
||||
depth = this->depth;
|
||||
this->depth = 0;
|
||||
owner_ = 0;
|
||||
pthread_mutex_unlock(&mutex);
|
||||
|
||||
if (time) {
|
||||
int64_t then = s->now() + time;
|
||||
timespec ts = { then / 1000, (then % 1000) * 1000 * 1000 };
|
||||
int rv UNUSED = pthread_cond_timedwait
|
||||
(&(t->condition), &(t->mutex), &ts);
|
||||
expect(s, rv == 0 or rv == ETIMEDOUT or rv == EINTR);
|
||||
} else {
|
||||
int rv UNUSED = pthread_cond_wait(&(t->condition), &(t->mutex));
|
||||
expect(s, rv == 0 or rv == EINTR);
|
||||
if (time) {
|
||||
int64_t then = s->now() + time;
|
||||
timespec ts = { then / 1000, (then % 1000) * 1000 * 1000 };
|
||||
int rv UNUSED = pthread_cond_timedwait
|
||||
(&(t->condition), &(t->mutex), &ts);
|
||||
expect(s, rv == 0 or rv == ETIMEDOUT or rv == EINTR);
|
||||
} else {
|
||||
int rv UNUSED = pthread_cond_wait(&(t->condition), &(t->mutex));
|
||||
expect(s, rv == 0 or rv == EINTR);
|
||||
}
|
||||
|
||||
if ((t->flags & Notified) == 0) {
|
||||
remove(t);
|
||||
}
|
||||
|
||||
t->flags = 0;
|
||||
t->next = 0;
|
||||
|
||||
if (t->r->interrupted()) {
|
||||
t->r->setInterrupted(false);
|
||||
interrupted = true;
|
||||
} else {
|
||||
interrupted = false;
|
||||
}
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&mutex);
|
||||
owner_ = t;
|
||||
this->depth = depth;
|
||||
|
||||
if ((t->flags & Notified) == 0) {
|
||||
remove(t);
|
||||
}
|
||||
|
||||
t->flags = 0;
|
||||
t->next = 0;
|
||||
|
||||
if (t->r->interrupted()) {
|
||||
t->r->setInterrupted(false);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
return interrupted;
|
||||
} else {
|
||||
sysAbort(s);
|
||||
}
|
||||
|
@ -240,55 +240,62 @@ class MySystem: public System {
|
||||
assert(s, t);
|
||||
|
||||
if (owner_ == t) {
|
||||
ACQUIRE(s, t->mutex);
|
||||
bool interrupted;
|
||||
unsigned depth;
|
||||
int r UNUSED;
|
||||
|
||||
{ ACQUIRE(s, t->mutex);
|
||||
|
||||
if (t->r->interrupted()) {
|
||||
t->r->setInterrupted(false);
|
||||
return true;
|
||||
if (t->r->interrupted()) {
|
||||
t->r->setInterrupted(false);
|
||||
return true;
|
||||
}
|
||||
|
||||
t->flags |= Waiting;
|
||||
|
||||
append(t);
|
||||
|
||||
depth = this->depth;
|
||||
this->depth = 0;
|
||||
owner_ = 0;
|
||||
|
||||
bool success UNUSED = ReleaseMutex(mutex);
|
||||
assert(s, success);
|
||||
|
||||
success = ResetEvent(t->event);
|
||||
assert(s, success);
|
||||
|
||||
success = ReleaseMutex(t->mutex);
|
||||
assert(s, success);
|
||||
|
||||
r = WaitForSingleObject(t->event, (time ? time : INFINITE));
|
||||
assert(s, r == WAIT_OBJECT_0 or r == WAIT_TIMEOUT);
|
||||
|
||||
r = WaitForSingleObject(t->mutex, INFINITE);
|
||||
assert(s, r == WAIT_OBJECT_0);
|
||||
|
||||
if ((t->flags & Notified) == 0) {
|
||||
remove(t);
|
||||
}
|
||||
|
||||
t->flags = 0;
|
||||
t->next = 0;
|
||||
|
||||
if (t->r->interrupted()) {
|
||||
t->r->setInterrupted(false);
|
||||
interrupted = true;
|
||||
} else {
|
||||
interrupted = false;
|
||||
}
|
||||
}
|
||||
|
||||
t->flags |= Waiting;
|
||||
|
||||
append(t);
|
||||
|
||||
unsigned depth = this->depth;
|
||||
this->depth = 0;
|
||||
owner_ = 0;
|
||||
|
||||
bool success UNUSED = ReleaseMutex(mutex);
|
||||
assert(s, success);
|
||||
|
||||
success = ResetEvent(t->event);
|
||||
assert(s, success);
|
||||
|
||||
success = ReleaseMutex(t->mutex);
|
||||
assert(s, success);
|
||||
|
||||
int r UNUSED = WaitForSingleObject(t->event, (time ? time : INFINITE));
|
||||
assert(s, r == WAIT_OBJECT_0 or r == WAIT_TIMEOUT);
|
||||
|
||||
r = WaitForSingleObject(t->mutex, INFINITE);
|
||||
assert(s, r == WAIT_OBJECT_0);
|
||||
|
||||
r = WaitForSingleObject(mutex, INFINITE);
|
||||
assert(s, r == WAIT_OBJECT_0);
|
||||
|
||||
owner_ = t;
|
||||
this->depth = depth;
|
||||
|
||||
if ((t->flags & Notified) == 0) {
|
||||
remove(t);
|
||||
}
|
||||
|
||||
t->flags = 0;
|
||||
t->next = 0;
|
||||
|
||||
if (t->r->interrupted()) {
|
||||
t->r->setInterrupted(false);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
return interrupted;
|
||||
} else {
|
||||
sysAbort(s);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user