Handle stream EOF without entering an infinite poll loop

This commit is contained in:
Jeremy Lakeman 2016-01-27 15:46:21 +10:30
parent 0361b99ca7
commit 64eb4f426b
5 changed files with 35 additions and 26 deletions

View File

@ -496,6 +496,11 @@ static void monitor_replies(struct sched_ent *alarm)
reply_bufend = reply_buffer; reply_bufend = reply_buffer;
discarding_until_nl = 1; discarding_until_nl = 1;
} }
} else if(nread==0 || nread==-1){
close(dna_helper_stdout);
dna_helper_stdout = -1;
unwatch(&sched_replies);
sched_replies.poll.fd = -1;
} }
} }
if (sched_replies.poll.revents & (POLLHUP | POLLERR | POLLNVAL)) { if (sched_replies.poll.revents & (POLLHUP | POLLERR | POLLNVAL)) {
@ -520,6 +525,12 @@ static void monitor_errors(struct sched_ent *alarm)
ssize_t nread = read_nonblock(sched_errors.poll.fd, buffer, sizeof buffer); ssize_t nread = read_nonblock(sched_errors.poll.fd, buffer, sizeof buffer);
if (nread > 0) if (nread > 0)
WHYF("DNAHELPER stderr %s", alloca_toprint(-1, buffer, nread)); WHYF("DNAHELPER stderr %s", alloca_toprint(-1, buffer, nread));
if (nread==0 || nread==-1){
close(dna_helper_stderr);
dna_helper_stderr = -1;
unwatch(&sched_errors);
sched_errors.poll.fd = -1;
}
} }
if (sched_errors.poll.revents & (POLLHUP | POLLERR | POLLNVAL)) { if (sched_errors.poll.revents & (POLLHUP | POLLERR | POLLNVAL)) {
DEBUGF(dnahelper, "DNAHELPER closing stderr fd=%d", dna_helper_stderr); DEBUGF(dnahelper, "DNAHELPER closing stderr fd=%d", dna_helper_stderr);

View File

@ -299,8 +299,13 @@ static void call_alarm(struct sched_ent *alarm, int revents)
struct call_stats call_stats; struct call_stats call_stats;
call_stats.totals = alarm->stats; call_stats.totals = alarm->stats;
DEBUGF(io, "Calling alarm/callback %p %s", alarm, alloca_alarm_name(alarm)); DEBUGF(io, "Calling alarm/callback %p %s with revents %x%s%s%s%s",
alarm, alloca_alarm_name(alarm), revents,
revents&POLLIN?" POLLIN":"",
revents&POLLOUT?" POLLOUT":"",
revents&POLLERR?" POLLERR":"",
revents&POLLHUP?" POLLHUP":""
);
if (call_stats.totals) if (call_stats.totals)
fd_func_enter(__HERE__, &call_stats); fd_func_enter(__HERE__, &call_stats);

View File

@ -1565,8 +1565,8 @@ static ssize_t http_request_read(struct http_request *r, char *buf, size_t len)
{ {
sigPipeFlag = 0; sigPipeFlag = 0;
ssize_t bytes = read_nonblock(r->alarm.poll.fd, buf, len); ssize_t bytes = read_nonblock(r->alarm.poll.fd, buf, len);
if (bytes == -1) { if (bytes == -1 || bytes == 0) {
IDEBUG(r->debug, "HTTP socket read error, closing connection"); IDEBUG(r->debug, "HTTP EOF or socket read error, closing connection");
http_request_finalise(r); http_request_finalise(r);
return -1; return -1;
} }
@ -1617,7 +1617,7 @@ static void http_request_receive(struct http_request *r)
if (r->request_content_remaining != CONTENT_LENGTH_UNKNOWN) if (r->request_content_remaining != CONTENT_LENGTH_UNKNOWN)
assert(room <= r->request_content_remaining); assert(room <= r->request_content_remaining);
ssize_t bytes = http_request_read(r, (char *)r->end, room); ssize_t bytes = http_request_read(r, (char *)r->end, room);
if (bytes == -1) if (bytes <0)
RETURNVOID; RETURNVOID;
assert((size_t) bytes <= room); assert((size_t) bytes <= room);
// If no data was read, then just return to polling. Don't drop the connection on an empty read, // If no data was read, then just return to polling. Don't drop the connection on an empty read,
@ -2211,7 +2211,7 @@ static size_t http_request_drain(struct http_request *r)
char buf[8192]; char buf[8192];
size_t drained = 0; size_t drained = 0;
ssize_t bytes; ssize_t bytes;
while ((bytes = http_request_read(r, buf, sizeof buf)) != -1 && bytes != 0) while ((bytes = http_request_read(r, buf, sizeof buf)) >0)
drained += (size_t) bytes; drained += (size_t) bytes;
return drained; return drained;
} }

2
net.c
View File

@ -68,7 +68,7 @@ ssize_t _read_nonblock(int fd, void *buf, size_t len, struct __sourceloc __whenc
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
case EWOULDBLOCK: case EWOULDBLOCK:
#endif #endif
return 0; return -2;
} }
return WHYF_perror("read_nonblock: read(%d,%p,%lu)", fd, buf, (unsigned long)len); return WHYF_perror("read_nonblock: read(%d,%p,%lu)", fd, buf, (unsigned long)len);
} }

View File

@ -607,7 +607,7 @@ status_ok:
); );
slot->alarm.poll.fd = sock; slot->alarm.poll.fd = sock;
/* Watch for activity on the socket */ /* Watch for activity on the socket */
slot->alarm.poll.events = POLLIN|POLLOUT; slot->alarm.poll.events = POLLOUT;
watch(&slot->alarm); watch(&slot->alarm);
/* And schedule a timeout alarm */ /* And schedule a timeout alarm */
unschedule(&slot->alarm); unschedule(&slot->alarm);
@ -1428,7 +1428,6 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
case RHIZOME_FETCH_RXFILE: { case RHIZOME_FETCH_RXFILE: {
/* Keep reading until we have the promised amount of data */ /* Keep reading until we have the promised amount of data */
unsigned char buffer[8192]; unsigned char buffer[8192];
sigPipeFlag = 0;
errno=0; errno=0;
int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer); int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer);
/* If we got some data, see if we have found the end of the HTTP request */ /* If we got some data, see if we have found the end of the HTTP request */
@ -1439,9 +1438,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
slot->alarm.alarm=gettime_ms() + config.rhizome.idle_timeout; slot->alarm.alarm=gettime_ms() + config.rhizome.idle_timeout;
slot->alarm.deadline = slot->alarm.alarm + config.rhizome.idle_timeout; slot->alarm.deadline = slot->alarm.alarm + config.rhizome.idle_timeout;
schedule(&slot->alarm); schedule(&slot->alarm);
return; } else if (bytes==0 || bytes==-1){
} else {
if (errno!=EAGAIN) {
DEBUGF(rhizome_rx, "Empty read, closing connection: received %"PRIu64" of %"PRIu64" bytes", DEBUGF(rhizome_rx, "Empty read, closing connection: received %"PRIu64" of %"PRIu64" bytes",
slot->write_state.file_offset, slot->write_state.file_offset,
slot->write_state.file_length); slot->write_state.file_length);
@ -1449,19 +1446,12 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
} }
return; return;
} }
if (sigPipeFlag) {
DEBUG(rhizome_rx, "Received SIGPIPE, closing connection");
rhizome_fetch_switch_to_mdp(slot);
return;
}
}
break;
case RHIZOME_FETCH_RXHTTPHEADERS: { case RHIZOME_FETCH_RXHTTPHEADERS: {
/* Keep reading until we have two CR/LFs in a row */ /* Keep reading until we have two CR/LFs in a row */
sigPipeFlag = 0; errno=0;
int bytes = read_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_len], 1024 - slot->request_len - 1); int bytes = read_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_len], 1024 - slot->request_len - 1);
/* If we got some data, see if we have found the end of the HTTP reply */
if (bytes>0){ if (bytes>0){
/* If we got some data, see if we have found the end of the HTTP reply */
// reset timeout // reset timeout
unschedule(&slot->alarm); unschedule(&slot->alarm);
slot->alarm.alarm = gettime_ms() + config.rhizome.idle_timeout; slot->alarm.alarm = gettime_ms() + config.rhizome.idle_timeout;
@ -1515,6 +1505,9 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
return; return;
} }
} }
}else if (bytes==0 || bytes==-1){
rhizome_fetch_switch_to_mdp(slot);
return;
} }
break; break;
default: default: