Close sockets on poll errors

This commit is contained in:
Jeremy Lakeman 2012-08-22 14:50:14 +09:30
parent f6c899e107
commit df0e20408c
5 changed files with 439 additions and 399 deletions

128
monitor.c
View File

@ -168,6 +168,8 @@ void monitor_poll(struct sched_ent *alarm)
void monitor_client_close(struct monitor_context *c){
struct monitor_context *last;
INFO("Tearing down monitor client");
unwatch(&c->alarm);
close(c->alarm.poll.fd);
c->alarm.poll.fd=-1;
@ -189,75 +191,79 @@ void monitor_client_poll(struct sched_ent *alarm)
errno=0;
int bytes;
switch(c->state) {
case MONITOR_STATE_COMMAND:
bytes = 1;
while(bytes == 1) {
if (c->line_length >= MONITOR_LINE_LENGTH) {
/* line too long */
c->line[MONITOR_LINE_LENGTH-1] = 0;
monitor_process_command(c);
bytes = -1;
break;
}
bytes = read(c->alarm.poll.fd, &c->line[c->line_length], 1);
if (bytes < 1) {
switch(errno) {
case EINTR:
case ENOTRECOVERABLE:
/* transient errors */
WHY_perror("read");
if (alarm->poll.revents & POLLIN) {
switch(c->state) {
case MONITOR_STATE_COMMAND:
bytes = 1;
while(bytes == 1) {
if (c->line_length >= MONITOR_LINE_LENGTH) {
/* line too long */
c->line[MONITOR_LINE_LENGTH-1] = 0;
monitor_process_command(c);
bytes = -1;
break;
case EAGAIN:
break;
default:
WHY_perror("read");
/* all other errors; close socket */
INFO("Tearing down monitor client");
monitor_client_close(c);
return;
}
}
if (bytes > 0 && (c->line[c->line_length] != '\r')) {
c->line_length += bytes;
if (c->line[c->line_length-1] == '\n') {
/* got command */
c->line[c->line_length-1] = 0; /* trim new line for easier parsing */
monitor_process_command(c);
bytes = read(c->alarm.poll.fd, &c->line[c->line_length], 1);
if (bytes < 1) {
switch(errno) {
case EINTR:
case ENOTRECOVERABLE:
/* transient errors */
WHY_perror("read");
break;
case EAGAIN:
break;
default:
WHY_perror("read");
/* all other errors; close socket */
monitor_client_close(c);
return;
}
}
}
break;
case MONITOR_STATE_DATA:
bytes = read(c->alarm.poll.fd,
&c->buffer[c->data_offset],
c->data_expected-c->data_offset);
if (bytes < 1) {
switch(errno) {
case EAGAIN: case EINTR:
/* transient errors */
break;
default:
/* all other errors; close socket */
WHYF("Tearing down monitor client due to errno=%d",
errno);
monitor_client_close(c);
return;
if (bytes > 0 && (c->line[c->line_length] != '\r')) {
c->line_length += bytes;
if (c->line[c->line_length-1] == '\n') {
/* got command */
c->line[c->line_length-1] = 0; /* trim new line for easier parsing */
monitor_process_command(c);
break;
}
}
}
} else {
c->data_offset += bytes;
if (c->data_offset >= c->data_expected)
{
/* we have the binary data we were expecting. */
monitor_process_data(c);
c->state = MONITOR_STATE_COMMAND;
break;
case MONITOR_STATE_DATA:
bytes = read(c->alarm.poll.fd,
&c->buffer[c->data_offset],
c->data_expected-c->data_offset);
if (bytes < 1) {
switch(errno) {
case EAGAIN: case EINTR:
/* transient errors */
break;
default:
/* all other errors; close socket */
WHYF("Tearing down monitor client due to errno=%d",
errno);
monitor_client_close(c);
return;
}
} else {
c->data_offset += bytes;
if (c->data_offset >= c->data_expected)
{
/* we have the binary data we were expecting. */
monitor_process_data(c);
c->state = MONITOR_STATE_COMMAND;
}
}
break;
default:
c->state = MONITOR_STATE_COMMAND;
WHY("fixed monitor connection state");
}
break;
default:
c->state = MONITOR_STATE_COMMAND;
WHY("fixed monitor connection state");
}
if (alarm->poll.revents & (POLLHUP | POLLERR)) {
monitor_client_close(c);
}
return;
}

View File

@ -254,50 +254,58 @@ error:
// So we have to bind a socket to INADDR_ANY to receive these packets.
static void
overlay_interface_read_any(struct sched_ent *alarm){
int plen=0;
int recvttl=1;
int i;
unsigned char packet[16384];
overlay_interface *interface=NULL;
struct sockaddr src_addr;
socklen_t addrlen = sizeof(src_addr);
if (alarm->poll.revents & POLLIN) {
int plen=0;
int recvttl=1;
int i;
unsigned char packet[16384];
overlay_interface *interface=NULL;
struct sockaddr src_addr;
socklen_t addrlen = sizeof(src_addr);
/* Read only one UDP packet per call to share resources more fairly, and also
enable stats to accurately count packets received */
plen = recvwithttl(alarm->poll.fd, packet, sizeof(packet), &recvttl, &src_addr, &addrlen);
if (plen == -1) {
WHY_perror("recvwithttl(c)");
unwatch(alarm);
close(alarm->poll.fd);
return;
}
/* Read only one UDP packet per call to share resources more fairly, and also
enable stats to accurately count packets received */
plen = recvwithttl(alarm->poll.fd, packet, sizeof(packet), &recvttl, &src_addr, &addrlen);
if (plen == -1) {
WHY_perror("recvwithttl(c)");
unwatch(alarm);
close(alarm->poll.fd);
return;
}
struct in_addr src = ((struct sockaddr_in *)&src_addr)->sin_addr;
struct in_addr src = ((struct sockaddr_in *)&src_addr)->sin_addr;
/* Try to identify the real interface that the packet arrived on */
for (i=0;i<OVERLAY_MAX_INTERFACES;i++){
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP)
continue;
// TODO test netmask...
if ((overlay_interfaces[i].netmask.s_addr & src.s_addr) == (overlay_interfaces[i].netmask.s_addr & overlay_interfaces[i].address.sin_addr.s_addr)){
interface = &overlay_interfaces[i];
break;
/* Try to identify the real interface that the packet arrived on */
for (i=0;i<OVERLAY_MAX_INTERFACES;i++){
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP)
continue;
// TODO test netmask...
if ((overlay_interfaces[i].netmask.s_addr & src.s_addr) == (overlay_interfaces[i].netmask.s_addr & overlay_interfaces[i].address.sin_addr.s_addr)){
interface = &overlay_interfaces[i];
break;
}
}
/* Should we drop the packet if we don't find a match? */
if (!interface){
if (debug&DEBUG_OVERLAYINTERFACES)
DEBUGF("Could not find matching interface for packet received from %s", inet_ntoa(src));
return;
}
/* We have a frame from this interface */
if (debug&DEBUG_PACKETRX)
DEBUG_packet_visualise("Read from real interface", packet,plen);
if (debug&DEBUG_OVERLAYINTERFACES) DEBUGF("Received %d bytes on interface INADDR_ANY",plen);
if (packetOk(interface,packet,plen,NULL,recvttl,&src_addr,addrlen,1)) {
WHY("Malformed packet");
}
}
/* Should we drop the packet if we don't find a match? */
if (!interface){
if (debug&DEBUG_OVERLAYINTERFACES)
DEBUGF("Could not find matching interface for packet received from %s", inet_ntoa(src));
return;
}
/* We have a frame from this interface */
if (debug&DEBUG_PACKETRX)
DEBUG_packet_visualise("Read from real interface", packet,plen);
if (debug&DEBUG_OVERLAYINTERFACES) DEBUGF("Received %d bytes on interface INADDR_ANY",plen);
if (packetOk(interface,packet,plen,NULL,recvttl,&src_addr,addrlen,1)) {
WHY("Malformed packet");
if (alarm->poll.revents & (POLLHUP | POLLERR)) {
INFO("Closing broadcast socket due to error");
unwatch(alarm);
close(alarm->poll.fd);
alarm->poll.fd=-1;
}
}
@ -478,11 +486,6 @@ overlay_interface_init(char *name, struct in_addr src_addr, struct in_addr netma
static void overlay_interface_poll(struct sched_ent *alarm)
{
struct overlay_interface *interface = (overlay_interface *)alarm;
int plen=0;
unsigned char packet[16384];
struct sockaddr src_addr;
socklen_t addrlen = sizeof(src_addr);
if (alarm->poll.revents==0){
@ -499,24 +502,37 @@ static void overlay_interface_poll(struct sched_ent *alarm)
return;
}
/* Read only one UDP packet per call to share resources more fairly, and also
enable stats to accurately count packets received */
int recvttl=1;
plen = recvwithttl(alarm->poll.fd,packet, sizeof(packet), &recvttl, &src_addr, &addrlen);
if (plen == -1) {
WHY_perror("recvwithttl(c)");
overlay_interface_close(interface);
return;
if (alarm->poll.revents & POLLIN) {
int plen=0;
unsigned char packet[16384];
struct sockaddr src_addr;
socklen_t addrlen = sizeof(src_addr);
/* Read only one UDP packet per call to share resources more fairly, and also
enable stats to accurately count packets received */
int recvttl=1;
plen = recvwithttl(alarm->poll.fd,packet, sizeof(packet), &recvttl, &src_addr, &addrlen);
if (plen == -1) {
WHY_perror("recvwithttl(c)");
overlay_interface_close(interface);
return;
}
/* We have a frame from this interface */
if (debug&DEBUG_PACKETRX)
DEBUG_packet_visualise("Read from real interface", packet,plen);
if (debug&DEBUG_OVERLAYINTERFACES) DEBUGF("Received %d bytes on interface %s",plen,interface->name);
if (packetOk(interface,packet,plen,NULL,recvttl,&src_addr,addrlen,1)) {
WHY("Malformed packet");
// Do we really want to attempt to parse it again?
//DEBUG_packet_visualise("Malformed packet", packet,plen);
}
}
/* We have a frame from this interface */
if (debug&DEBUG_PACKETRX)
DEBUG_packet_visualise("Read from real interface", packet,plen);
if (debug&DEBUG_OVERLAYINTERFACES) DEBUGF("Received %d bytes on interface %s",plen,interface->name);
if (packetOk(interface,packet,plen,NULL,recvttl,&src_addr,addrlen,1)) {
WHY("Malformed packet");
// Do we really want to attempt to parse it again?
//DEBUG_packet_visualise("Malformed packet", packet,plen);
if (alarm->poll.revents & (POLLHUP | POLLERR)) {
overlay_interface_close(interface);
}
}

View File

@ -972,137 +972,143 @@ int search_subscribers(struct subscriber *subscriber, void *context){
void overlay_mdp_poll(struct sched_ent *alarm)
{
unsigned char buffer[16384];
int ttl;
unsigned char recvaddrbuffer[1024];
struct sockaddr *recvaddr=(struct sockaddr *)&recvaddrbuffer[0];
socklen_t recvaddrlen=sizeof(recvaddrbuffer);
struct sockaddr_un *recvaddr_un=NULL;
if (alarm->poll.revents & POLLIN) {
unsigned char buffer[16384];
int ttl;
unsigned char recvaddrbuffer[1024];
struct sockaddr *recvaddr=(struct sockaddr *)&recvaddrbuffer[0];
socklen_t recvaddrlen=sizeof(recvaddrbuffer);
struct sockaddr_un *recvaddr_un=NULL;
ttl=-1;
bzero((void *)recvaddrbuffer,sizeof(recvaddrbuffer));
ttl=-1;
bzero((void *)recvaddrbuffer,sizeof(recvaddrbuffer));
ssize_t len = recvwithttl(alarm->poll.fd,buffer,sizeof(buffer),&ttl, recvaddr, &recvaddrlen);
recvaddr_un=(struct sockaddr_un *)recvaddr;
ssize_t len = recvwithttl(alarm->poll.fd,buffer,sizeof(buffer),&ttl, recvaddr, &recvaddrlen);
recvaddr_un=(struct sockaddr_un *)recvaddr;
if (len>0) {
/* Look at overlay_mdp_frame we have received */
overlay_mdp_frame *mdp=(overlay_mdp_frame *)&buffer[0];
unsigned int mdp_type = mdp->packetTypeAndFlags & MDP_TYPE_MASK;
if (len>0) {
/* Look at overlay_mdp_frame we have received */
overlay_mdp_frame *mdp=(overlay_mdp_frame *)&buffer[0];
unsigned int mdp_type = mdp->packetTypeAndFlags & MDP_TYPE_MASK;
switch (mdp_type) {
case MDP_GOODBYE:
if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_GOODBYE");
overlay_mdp_releasebindings(recvaddr_un,recvaddrlen);
return;
case MDP_NODEINFO:
if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_NODEINFO");
overlay_route_node_info(mdp,recvaddr_un,recvaddrlen);
return;
case MDP_GETADDRS:
if (debug & DEBUG_MDPREQUESTS)
DEBUGF("MDP_GETADDRS first_sid=%u last_sid=%u frame_sid_count=%u mode=%d",
mdp->addrlist.first_sid,
mdp->addrlist.last_sid,
mdp->addrlist.frame_sid_count,
mdp->addrlist.mode
);
{
overlay_mdp_frame mdpreply;
/* Work out which SIDs to get ... */
int sid_num=mdp->addrlist.first_sid;
int max_sid=mdp->addrlist.last_sid;
int max_sids=mdp->addrlist.frame_sid_count;
/* ... and constrain list for sanity */
if (sid_num<0) sid_num=0;
if (max_sids>MDP_MAX_SID_REQUEST) max_sids=MDP_MAX_SID_REQUEST;
if (max_sids<0) max_sids=0;
/* Prepare reply packet */
mdpreply.packetTypeAndFlags = MDP_ADDRLIST;
mdpreply.addrlist.mode = mdp->addrlist.mode;
mdpreply.addrlist.first_sid = sid_num;
mdpreply.addrlist.last_sid = max_sid;
mdpreply.addrlist.frame_sid_count = max_sids;
/* Populate with SIDs */
int i=0;
int count=0;
switch (mdp->addrlist.mode) {
case MDP_ADDRLIST_MODE_SELF: {
int cn=0,in=0,kp=0;
while(keyring_next_identity(keyring,&cn,&in,&kp)) {
if (count>=sid_num&&(i<max_sids))
bcopy(keyring->contexts[cn]->identities[in]
->keypairs[kp]->public_key,
mdpreply.addrlist.sids[i++],SID_SIZE);
in++; kp=0;
count++;
if (i>=max_sids)
break;
}
}
break;
case MDP_ADDRLIST_MODE_ROUTABLE_PEERS:
case MDP_ADDRLIST_MODE_ALL_PEERS: {
/* from peer list */
struct search_state state={
.mdp=mdp,
.mdpreply=&mdpreply,
.first=sid_num,
.max=max_sid,
};
enum_subscribers(NULL, search_subscribers, &state);
i=state.index;
count=state.count;
}
break;
}
mdpreply.addrlist.frame_sid_count = i;
mdpreply.addrlist.last_sid = sid_num + i - 1;
mdpreply.addrlist.server_sid_count = count;
if (debug & DEBUG_MDPREQUESTS)
DEBUGF("reply MDP_ADDRLIST first_sid=%u last_sid=%u frame_sid_count=%u server_sid_count=%u",
mdpreply.addrlist.first_sid,
mdpreply.addrlist.last_sid,
mdpreply.addrlist.frame_sid_count,
mdpreply.addrlist.server_sid_count
);
/* Send back to caller */
overlay_mdp_reply(alarm->poll.fd,
(struct sockaddr_un *)recvaddr,recvaddrlen,
&mdpreply);
switch (mdp_type) {
case MDP_GOODBYE:
if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_GOODBYE");
overlay_mdp_releasebindings(recvaddr_un,recvaddrlen);
return;
case MDP_NODEINFO:
if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_NODEINFO");
overlay_route_node_info(mdp,recvaddr_un,recvaddrlen);
return;
case MDP_GETADDRS:
if (debug & DEBUG_MDPREQUESTS)
DEBUGF("MDP_GETADDRS first_sid=%u last_sid=%u frame_sid_count=%u mode=%d",
mdp->addrlist.first_sid,
mdp->addrlist.last_sid,
mdp->addrlist.frame_sid_count,
mdp->addrlist.mode
);
{
overlay_mdp_frame mdpreply;
/* Work out which SIDs to get ... */
int sid_num=mdp->addrlist.first_sid;
int max_sid=mdp->addrlist.last_sid;
int max_sids=mdp->addrlist.frame_sid_count;
/* ... and constrain list for sanity */
if (sid_num<0) sid_num=0;
if (max_sids>MDP_MAX_SID_REQUEST) max_sids=MDP_MAX_SID_REQUEST;
if (max_sids<0) max_sids=0;
/* Prepare reply packet */
mdpreply.packetTypeAndFlags = MDP_ADDRLIST;
mdpreply.addrlist.mode = mdp->addrlist.mode;
mdpreply.addrlist.first_sid = sid_num;
mdpreply.addrlist.last_sid = max_sid;
mdpreply.addrlist.frame_sid_count = max_sids;
/* Populate with SIDs */
int i=0;
int count=0;
switch (mdp->addrlist.mode) {
case MDP_ADDRLIST_MODE_SELF: {
int cn=0,in=0,kp=0;
while(keyring_next_identity(keyring,&cn,&in,&kp)) {
if (count>=sid_num&&(i<max_sids))
bcopy(keyring->contexts[cn]->identities[in]
->keypairs[kp]->public_key,
mdpreply.addrlist.sids[i++],SID_SIZE);
in++; kp=0;
count++;
if (i>=max_sids)
break;
}
}
break;
case MDP_ADDRLIST_MODE_ROUTABLE_PEERS:
case MDP_ADDRLIST_MODE_ALL_PEERS: {
/* from peer list */
struct search_state state={
.mdp=mdp,
.mdpreply=&mdpreply,
.first=sid_num,
.max=max_sid,
};
enum_subscribers(NULL, search_subscribers, &state);
i=state.index;
count=state.count;
}
break;
}
mdpreply.addrlist.frame_sid_count = i;
mdpreply.addrlist.last_sid = sid_num + i - 1;
mdpreply.addrlist.server_sid_count = count;
if (debug & DEBUG_MDPREQUESTS)
DEBUGF("reply MDP_ADDRLIST first_sid=%u last_sid=%u frame_sid_count=%u server_sid_count=%u",
mdpreply.addrlist.first_sid,
mdpreply.addrlist.last_sid,
mdpreply.addrlist.frame_sid_count,
mdpreply.addrlist.server_sid_count
);
/* Send back to caller */
overlay_mdp_reply(alarm->poll.fd,
(struct sockaddr_un *)recvaddr,recvaddrlen,
&mdpreply);
return;
}
break;
case MDP_TX: /* Send payload (and don't treat it as system privileged) */
if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_TX");
overlay_mdp_dispatch(mdp,1,(struct sockaddr_un*)recvaddr,recvaddrlen);
return;
break;
case MDP_BIND: /* Bind to port */
if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_BIND");
overlay_mdp_process_bind_request(alarm->poll.fd,mdp, recvaddr_un, recvaddrlen);
return;
break;
default:
/* Client is not allowed to send any other frame type */
WARNF("Unsupported MDP frame type: %d", mdp_type);
mdp->packetTypeAndFlags=MDP_ERROR;
mdp->error.error=2;
snprintf(mdp->error.message,128,"Illegal request type. Clients may use only MDP_TX or MDP_BIND.");
int len=4+4+strlen(mdp->error.message)+1;
errno=0;
/* We ignore the result of the following, because it is just sending an
error message back to the client. If this fails, where would we report
the error to? My point exactly. */
sendto(alarm->poll.fd,mdp,len,0,(struct sockaddr *)recvaddr,recvaddrlen);
}
break;
case MDP_TX: /* Send payload (and don't treat it as system privileged) */
if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_TX");
overlay_mdp_dispatch(mdp,1,(struct sockaddr_un*)recvaddr,recvaddrlen);
return;
break;
case MDP_BIND: /* Bind to port */
if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_BIND");
overlay_mdp_process_bind_request(alarm->poll.fd,mdp, recvaddr_un, recvaddrlen);
return;
break;
default:
/* Client is not allowed to send any other frame type */
WARNF("Unsupported MDP frame type: %d", mdp_type);
mdp->packetTypeAndFlags=MDP_ERROR;
mdp->error.error=2;
snprintf(mdp->error.message,128,"Illegal request type. Clients may use only MDP_TX or MDP_BIND.");
int len=4+4+strlen(mdp->error.message)+1;
errno=0;
/* We ignore the result of the following, because it is just sending an
error message back to the client. If this fails, where would we report
the error to? My point exactly. */
sendto(alarm->poll.fd,mdp,len,0,(struct sockaddr *)recvaddr,recvaddrlen);
}
}
if (alarm->poll.revents & (POLLHUP | POLLERR)) {
INFO("Error on mdp socket");
}
return;
}

View File

@ -827,134 +827,140 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
return;
}
switch(q->state) {
case RHIZOME_FETCH_CONNECTING:
case RHIZOME_FETCH_SENDINGHTTPREQUEST:
rhizome_fetch_write(q);
break;
case RHIZOME_FETCH_RXFILE: {
/* Keep reading until we have the promised amount of data */
char buffer[8192];
sigPipeFlag = 0;
int bytes = read_nonblock(q->alarm.poll.fd, buffer, sizeof buffer);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes > 0) {
rhizome_write_content(q, buffer, bytes);
} else {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Empty read, closing connection");
rhizome_fetch_close(q);
return;
}
if (sigPipeFlag) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Received SIGPIPE, closing connection");
rhizome_fetch_close(q);
return;
}
}
break;
case RHIZOME_FETCH_RXHTTPHEADERS: {
/* Keep reading until we have two CR/LFs in a row */
sigPipeFlag = 0;
int bytes = read_nonblock(q->alarm.poll.fd, &q->request[q->request_len], 1024 - q->request_len - 1);
/* If we got some data, see if we have found the end of the HTTP reply */
if (bytes > 0) {
// reset timeout
unschedule(&q->alarm);
q->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT;
q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
q->request_len += bytes;
if (http_header_complete(q->request, q->request_len, bytes + 4)) {
if (alarm->poll.revents & (POLLIN | POLLOUT)) {
switch(q->state) {
case RHIZOME_FETCH_CONNECTING:
case RHIZOME_FETCH_SENDINGHTTPREQUEST:
rhizome_fetch_write(q);
break;
case RHIZOME_FETCH_RXFILE: {
/* Keep reading until we have the promised amount of data */
char buffer[8192];
sigPipeFlag = 0;
int bytes = read_nonblock(q->alarm.poll.fd, buffer, sizeof buffer);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes > 0) {
rhizome_write_content(q, buffer, bytes);
} else {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Got HTTP reply: %s", alloca_toprint(160, q->request, q->request_len));
/* We have all the reply headers, so parse them, taking care of any following bytes of
content. */
char *p = NULL;
if (!str_startswith(q->request, "HTTP/1.0 ", &p)) {
if (debug&DEBUG_RHIZOME_RX)
DEBUGF("Malformed HTTP reply: missing HTTP/1.0 preamble");
rhizome_fetch_close(q);
return;
}
int http_response_code = 0;
char *nump;
for (nump = p; isdigit(*p); ++p)
http_response_code = http_response_code * 10 + *p - '0';
if (p == nump || *p != ' ') {
if (debug&DEBUG_RHIZOME_RX)
DEBUGF("Malformed HTTP reply: missing decimal status code");
rhizome_fetch_close(q);
return;
}
if (http_response_code != 200) {
DEBUG("Empty read, closing connection");
rhizome_fetch_close(q);
return;
}
if (sigPipeFlag) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Received SIGPIPE, closing connection");
rhizome_fetch_close(q);
return;
}
}
break;
case RHIZOME_FETCH_RXHTTPHEADERS: {
/* Keep reading until we have two CR/LFs in a row */
sigPipeFlag = 0;
int bytes = read_nonblock(q->alarm.poll.fd, &q->request[q->request_len], 1024 - q->request_len - 1);
/* If we got some data, see if we have found the end of the HTTP reply */
if (bytes > 0) {
// reset timeout
unschedule(&q->alarm);
q->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT;
q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
q->request_len += bytes;
if (http_header_complete(q->request, q->request_len, bytes + 4)) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed HTTP request: rhizome server returned %d != 200 OK", http_response_code);
rhizome_fetch_close(q);
return;
}
// This loop will terminate, because http_header_complete() above found at least
// "\n\n" at the end of the header, and probably "\r\n\r\n".
while (*p++ != '\n')
;
// Iterate over header lines until the last blank line.
long long content_length = -1;
while (*p != '\r' && *p != '\n') {
if (strcase_startswith(p, "Content-Length:", &p)) {
while (*p == ' ')
++p;
content_length = 0;
for (nump = p; isdigit(*p); ++p)
content_length = content_length * 10 + *p - '0';
if (p == nump || (*p != '\r' && *p != '\n')) {
if (debug & DEBUG_RHIZOME_RX) {
DEBUGF("Invalid HTTP reply: malformed Content-Length header");
rhizome_fetch_close(q);
return;
}
}
DEBUGF("Got HTTP reply: %s", alloca_toprint(160, q->request, q->request_len));
/* We have all the reply headers, so parse them, taking care of any following bytes of
content. */
char *p = NULL;
if (!str_startswith(q->request, "HTTP/1.0 ", &p)) {
if (debug&DEBUG_RHIZOME_RX)
DEBUGF("Malformed HTTP reply: missing HTTP/1.0 preamble");
rhizome_fetch_close(q);
return;
}
int http_response_code = 0;
char *nump;
for (nump = p; isdigit(*p); ++p)
http_response_code = http_response_code * 10 + *p - '0';
if (p == nump || *p != ' ') {
if (debug&DEBUG_RHIZOME_RX)
DEBUGF("Malformed HTTP reply: missing decimal status code");
rhizome_fetch_close(q);
return;
}
if (http_response_code != 200) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed HTTP request: rhizome server returned %d != 200 OK", http_response_code);
rhizome_fetch_close(q);
return;
}
// This loop will terminate, because http_header_complete() above found at least
// "\n\n" at the end of the header, and probably "\r\n\r\n".
while (*p++ != '\n')
;
// Iterate over header lines until the last blank line.
long long content_length = -1;
while (*p != '\r' && *p != '\n') {
if (strcase_startswith(p, "Content-Length:", &p)) {
while (*p == ' ')
++p;
content_length = 0;
for (nump = p; isdigit(*p); ++p)
content_length = content_length * 10 + *p - '0';
if (p == nump || (*p != '\r' && *p != '\n')) {
if (debug & DEBUG_RHIZOME_RX) {
DEBUGF("Invalid HTTP reply: malformed Content-Length header");
rhizome_fetch_close(q);
return;
}
}
}
while (*p++ != '\n')
;
}
if (*p == '\r')
++p;
++p; // skip '\n' at end of blank line
if (content_length == -1) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Invalid HTTP reply: missing Content-Length header");
rhizome_fetch_close(q);
return;
}
q->file_len = content_length;
/* We have all we need. The file is already open, so just write out any initial bytes of
the body we read.
*/
q->state = RHIZOME_FETCH_RXFILE;
int content_bytes = q->request + q->request_len - p;
if (content_bytes > 0)
rhizome_write_content(q, p, content_bytes);
}
if (*p == '\r')
++p;
++p; // skip '\n' at end of blank line
if (content_length == -1) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Invalid HTTP reply: missing Content-Length header");
rhizome_fetch_close(q);
return;
}
q->file_len = content_length;
/* We have all we need. The file is already open, so just write out any initial bytes of
the body we read.
*/
q->state = RHIZOME_FETCH_RXFILE;
int content_bytes = q->request + q->request_len - p;
if (content_bytes > 0)
rhizome_write_content(q, p, content_bytes);
} else {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Empty read, closing connection");
rhizome_fetch_close(q);
return;
}
if (sigPipeFlag) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Received SIGPIPE, closing connection");
rhizome_fetch_close(q);
return;
}
} else {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Empty read, closing connection");
rhizome_fetch_close(q);
return;
}
if (sigPipeFlag) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Received SIGPIPE, closing connection");
rhizome_fetch_close(q);
return;
}
break;
default:
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state.");
rhizome_fetch_close(q);
return;
}
break;
default:
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state.");
rhizome_fetch_close(q);
return;
}
}
if (alarm->poll.revents & (POLLHUP | POLLERR)) {
rhizome_fetch_close(q);
}
return;
}

View File

@ -261,46 +261,52 @@ void rhizome_client_poll(struct sched_ent *alarm)
void rhizome_server_poll(struct sched_ent *alarm)
{
struct sockaddr addr;
unsigned int addr_len = sizeof addr;
int sock;
while ((sock = accept(rhizome_server_socket, &addr, &addr_len)) != -1) {
if (addr.sa_family == AF_INET) {
struct sockaddr_in *peerip = (struct sockaddr_in *)&addr;
INFOF("RHIZOME HTTP SERVER, ACCEPT addrlen=%u family=%u port=%u addr=%u.%u.%u.%u",
addr_len, peerip->sin_family, peerip->sin_port,
((unsigned char*)&peerip->sin_addr.s_addr)[0],
((unsigned char*)&peerip->sin_addr.s_addr)[1],
((unsigned char*)&peerip->sin_addr.s_addr)[2],
((unsigned char*)&peerip->sin_addr.s_addr)[3]
);
} else {
INFOF("RHIZOME HTTP SERVER, ACCEPT addrlen=%u family=%u data=%s",
if (alarm->poll.revents & (POLLIN | POLLOUT)) {
struct sockaddr addr;
unsigned int addr_len = sizeof addr;
int sock;
while ((sock = accept(rhizome_server_socket, &addr, &addr_len)) != -1) {
if (addr.sa_family == AF_INET) {
struct sockaddr_in *peerip = (struct sockaddr_in *)&addr;
INFOF("RHIZOME HTTP SERVER, ACCEPT addrlen=%u family=%u port=%u addr=%u.%u.%u.%u",
addr_len, peerip->sin_family, peerip->sin_port,
((unsigned char*)&peerip->sin_addr.s_addr)[0],
((unsigned char*)&peerip->sin_addr.s_addr)[1],
((unsigned char*)&peerip->sin_addr.s_addr)[2],
((unsigned char*)&peerip->sin_addr.s_addr)[3]
);
} else {
INFOF("RHIZOME HTTP SERVER, ACCEPT addrlen=%u family=%u data=%s",
addr_len, addr.sa_family, alloca_tohex((unsigned char *)addr.sa_data, sizeof addr.sa_data)
);
}
rhizome_http_request *request = calloc(sizeof(rhizome_http_request), 1);
if (request == NULL) {
WHYF_perror("calloc(%u, 1)", sizeof(rhizome_http_request));
WHY("Cannot respond to request, out of memory");
} else {
/* We are now trying to read the HTTP request */
request->request_type=RHIZOME_HTTP_REQUEST_RECEIVING;
request->alarm.function = rhizome_client_poll;
connection_stats.name="rhizome_client_poll";
request->alarm.stats=&connection_stats;
request->alarm.poll.fd=sock;
request->alarm.poll.events=POLLIN;
request->alarm.alarm = gettime_ms()+RHIZOME_IDLE_TIMEOUT;
request->alarm.deadline = request->alarm.alarm+RHIZOME_IDLE_TIMEOUT;
// watch for the incoming http request
watch(&request->alarm);
// set an inactivity timeout to close the connection
schedule(&request->alarm);
}
}
rhizome_http_request *request = calloc(sizeof(rhizome_http_request), 1);
if (request == NULL) {
WHYF_perror("calloc(%u, 1)", sizeof(rhizome_http_request));
WHY("Cannot respond to request, out of memory");
} else {
/* We are now trying to read the HTTP request */
request->request_type=RHIZOME_HTTP_REQUEST_RECEIVING;
request->alarm.function = rhizome_client_poll;
connection_stats.name="rhizome_client_poll";
request->alarm.stats=&connection_stats;
request->alarm.poll.fd=sock;
request->alarm.poll.events=POLLIN;
request->alarm.alarm = gettime_ms()+RHIZOME_IDLE_TIMEOUT;
request->alarm.deadline = request->alarm.alarm+RHIZOME_IDLE_TIMEOUT;
// watch for the incoming http request
watch(&request->alarm);
// set an inactivity timeout to close the connection
schedule(&request->alarm);
if (errno != EAGAIN) {
WARN_perror("accept");
}
}
if (errno != EAGAIN) {
WARN_perror("accept");
if (alarm->poll.revents & (POLLHUP | POLLERR)) {
INFO("Error on tcp listen socket");
}
}