diff --git a/monitor.c b/monitor.c index 1771eddb..2e16845f 100644 --- a/monitor.c +++ b/monitor.c @@ -168,6 +168,8 @@ void monitor_poll(struct sched_ent *alarm) void monitor_client_close(struct monitor_context *c){ struct monitor_context *last; + INFO("Tearing down monitor client"); + unwatch(&c->alarm); close(c->alarm.poll.fd); c->alarm.poll.fd=-1; @@ -189,75 +191,79 @@ void monitor_client_poll(struct sched_ent *alarm) errno=0; int bytes; - switch(c->state) { - case MONITOR_STATE_COMMAND: - bytes = 1; - while(bytes == 1) { - if (c->line_length >= MONITOR_LINE_LENGTH) { - /* line too long */ - c->line[MONITOR_LINE_LENGTH-1] = 0; - monitor_process_command(c); - bytes = -1; - break; - } - bytes = read(c->alarm.poll.fd, &c->line[c->line_length], 1); - if (bytes < 1) { - switch(errno) { - case EINTR: - case ENOTRECOVERABLE: - /* transient errors */ - WHY_perror("read"); + if (alarm->poll.revents & POLLIN) { + switch(c->state) { + case MONITOR_STATE_COMMAND: + bytes = 1; + while(bytes == 1) { + if (c->line_length >= MONITOR_LINE_LENGTH) { + /* line too long */ + c->line[MONITOR_LINE_LENGTH-1] = 0; + monitor_process_command(c); + bytes = -1; break; - case EAGAIN: - break; - default: - WHY_perror("read"); - /* all other errors; close socket */ - INFO("Tearing down monitor client"); - monitor_client_close(c); - return; } - } - if (bytes > 0 && (c->line[c->line_length] != '\r')) { - c->line_length += bytes; - if (c->line[c->line_length-1] == '\n') { - /* got command */ - c->line[c->line_length-1] = 0; /* trim new line for easier parsing */ - monitor_process_command(c); + bytes = read(c->alarm.poll.fd, &c->line[c->line_length], 1); + if (bytes < 1) { + switch(errno) { + case EINTR: + case ENOTRECOVERABLE: + /* transient errors */ + WHY_perror("read"); break; + case EAGAIN: + break; + default: + WHY_perror("read"); + /* all other errors; close socket */ + monitor_client_close(c); + return; } } - } - break; - case MONITOR_STATE_DATA: - bytes = read(c->alarm.poll.fd, - &c->buffer[c->data_offset], - c->data_expected-c->data_offset); - if (bytes < 1) { - switch(errno) { - case EAGAIN: case EINTR: - /* transient errors */ - break; - default: - /* all other errors; close socket */ - WHYF("Tearing down monitor client due to errno=%d", - errno); - monitor_client_close(c); - return; + if (bytes > 0 && (c->line[c->line_length] != '\r')) { + c->line_length += bytes; + if (c->line[c->line_length-1] == '\n') { + /* got command */ + c->line[c->line_length-1] = 0; /* trim new line for easier parsing */ + monitor_process_command(c); + break; + } + } } - } else { - c->data_offset += bytes; - if (c->data_offset >= c->data_expected) - { - /* we have the binary data we were expecting. */ - monitor_process_data(c); - c->state = MONITOR_STATE_COMMAND; + break; + case MONITOR_STATE_DATA: + bytes = read(c->alarm.poll.fd, + &c->buffer[c->data_offset], + c->data_expected-c->data_offset); + if (bytes < 1) { + switch(errno) { + case EAGAIN: case EINTR: + /* transient errors */ + break; + default: + /* all other errors; close socket */ + WHYF("Tearing down monitor client due to errno=%d", + errno); + monitor_client_close(c); + return; } + } else { + c->data_offset += bytes; + if (c->data_offset >= c->data_expected) + { + /* we have the binary data we were expecting. */ + monitor_process_data(c); + c->state = MONITOR_STATE_COMMAND; + } + } + break; + default: + c->state = MONITOR_STATE_COMMAND; + WHY("fixed monitor connection state"); } - break; - default: - c->state = MONITOR_STATE_COMMAND; - WHY("fixed monitor connection state"); + } + if (alarm->poll.revents & (POLLHUP | POLLERR)) { + monitor_client_close(c); } return; } diff --git a/overlay_interface.c b/overlay_interface.c index 3f1caea7..c52a8f39 100644 --- a/overlay_interface.c +++ b/overlay_interface.c @@ -254,50 +254,58 @@ error: // So we have to bind a socket to INADDR_ANY to receive these packets. static void overlay_interface_read_any(struct sched_ent *alarm){ - int plen=0; - int recvttl=1; - int i; - unsigned char packet[16384]; - overlay_interface *interface=NULL; - struct sockaddr src_addr; - socklen_t addrlen = sizeof(src_addr); + if (alarm->poll.revents & POLLIN) { + int plen=0; + int recvttl=1; + int i; + unsigned char packet[16384]; + overlay_interface *interface=NULL; + struct sockaddr src_addr; + socklen_t addrlen = sizeof(src_addr); - /* Read only one UDP packet per call to share resources more fairly, and also - enable stats to accurately count packets received */ - plen = recvwithttl(alarm->poll.fd, packet, sizeof(packet), &recvttl, &src_addr, &addrlen); - if (plen == -1) { - WHY_perror("recvwithttl(c)"); - unwatch(alarm); - close(alarm->poll.fd); - return; - } + /* Read only one UDP packet per call to share resources more fairly, and also + enable stats to accurately count packets received */ + plen = recvwithttl(alarm->poll.fd, packet, sizeof(packet), &recvttl, &src_addr, &addrlen); + if (plen == -1) { + WHY_perror("recvwithttl(c)"); + unwatch(alarm); + close(alarm->poll.fd); + return; + } - struct in_addr src = ((struct sockaddr_in *)&src_addr)->sin_addr; + struct in_addr src = ((struct sockaddr_in *)&src_addr)->sin_addr; - /* Try to identify the real interface that the packet arrived on */ - for (i=0;ipoll.revents & (POLLHUP | POLLERR)) { + INFO("Closing broadcast socket due to error"); + unwatch(alarm); + close(alarm->poll.fd); + alarm->poll.fd=-1; } } @@ -478,11 +486,6 @@ overlay_interface_init(char *name, struct in_addr src_addr, struct in_addr netma static void overlay_interface_poll(struct sched_ent *alarm) { struct overlay_interface *interface = (overlay_interface *)alarm; - int plen=0; - unsigned char packet[16384]; - - struct sockaddr src_addr; - socklen_t addrlen = sizeof(src_addr); if (alarm->poll.revents==0){ @@ -499,24 +502,37 @@ static void overlay_interface_poll(struct sched_ent *alarm) return; } - /* Read only one UDP packet per call to share resources more fairly, and also - enable stats to accurately count packets received */ - int recvttl=1; - plen = recvwithttl(alarm->poll.fd,packet, sizeof(packet), &recvttl, &src_addr, &addrlen); - if (plen == -1) { - WHY_perror("recvwithttl(c)"); - overlay_interface_close(interface); - return; + if (alarm->poll.revents & POLLIN) { + int plen=0; + unsigned char packet[16384]; + + struct sockaddr src_addr; + socklen_t addrlen = sizeof(src_addr); + + + /* Read only one UDP packet per call to share resources more fairly, and also + enable stats to accurately count packets received */ + int recvttl=1; + plen = recvwithttl(alarm->poll.fd,packet, sizeof(packet), &recvttl, &src_addr, &addrlen); + if (plen == -1) { + WHY_perror("recvwithttl(c)"); + overlay_interface_close(interface); + return; + } + + /* We have a frame from this interface */ + if (debug&DEBUG_PACKETRX) + DEBUG_packet_visualise("Read from real interface", packet,plen); + if (debug&DEBUG_OVERLAYINTERFACES) DEBUGF("Received %d bytes on interface %s",plen,interface->name); + if (packetOk(interface,packet,plen,NULL,recvttl,&src_addr,addrlen,1)) { + WHY("Malformed packet"); + // Do we really want to attempt to parse it again? + //DEBUG_packet_visualise("Malformed packet", packet,plen); + } } - /* We have a frame from this interface */ - if (debug&DEBUG_PACKETRX) - DEBUG_packet_visualise("Read from real interface", packet,plen); - if (debug&DEBUG_OVERLAYINTERFACES) DEBUGF("Received %d bytes on interface %s",plen,interface->name); - if (packetOk(interface,packet,plen,NULL,recvttl,&src_addr,addrlen,1)) { - WHY("Malformed packet"); - // Do we really want to attempt to parse it again? - //DEBUG_packet_visualise("Malformed packet", packet,plen); + if (alarm->poll.revents & (POLLHUP | POLLERR)) { + overlay_interface_close(interface); } } diff --git a/overlay_mdp.c b/overlay_mdp.c index cca26a62..e6be633c 100644 --- a/overlay_mdp.c +++ b/overlay_mdp.c @@ -972,137 +972,143 @@ int search_subscribers(struct subscriber *subscriber, void *context){ void overlay_mdp_poll(struct sched_ent *alarm) { - unsigned char buffer[16384]; - int ttl; - unsigned char recvaddrbuffer[1024]; - struct sockaddr *recvaddr=(struct sockaddr *)&recvaddrbuffer[0]; - socklen_t recvaddrlen=sizeof(recvaddrbuffer); - struct sockaddr_un *recvaddr_un=NULL; + if (alarm->poll.revents & POLLIN) { + unsigned char buffer[16384]; + int ttl; + unsigned char recvaddrbuffer[1024]; + struct sockaddr *recvaddr=(struct sockaddr *)&recvaddrbuffer[0]; + socklen_t recvaddrlen=sizeof(recvaddrbuffer); + struct sockaddr_un *recvaddr_un=NULL; - ttl=-1; - bzero((void *)recvaddrbuffer,sizeof(recvaddrbuffer)); + ttl=-1; + bzero((void *)recvaddrbuffer,sizeof(recvaddrbuffer)); - ssize_t len = recvwithttl(alarm->poll.fd,buffer,sizeof(buffer),&ttl, recvaddr, &recvaddrlen); - recvaddr_un=(struct sockaddr_un *)recvaddr; + ssize_t len = recvwithttl(alarm->poll.fd,buffer,sizeof(buffer),&ttl, recvaddr, &recvaddrlen); + recvaddr_un=(struct sockaddr_un *)recvaddr; - if (len>0) { - /* Look at overlay_mdp_frame we have received */ - overlay_mdp_frame *mdp=(overlay_mdp_frame *)&buffer[0]; - unsigned int mdp_type = mdp->packetTypeAndFlags & MDP_TYPE_MASK; + if (len>0) { + /* Look at overlay_mdp_frame we have received */ + overlay_mdp_frame *mdp=(overlay_mdp_frame *)&buffer[0]; + unsigned int mdp_type = mdp->packetTypeAndFlags & MDP_TYPE_MASK; - switch (mdp_type) { - case MDP_GOODBYE: - if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_GOODBYE"); - overlay_mdp_releasebindings(recvaddr_un,recvaddrlen); - return; - case MDP_NODEINFO: - if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_NODEINFO"); - overlay_route_node_info(mdp,recvaddr_un,recvaddrlen); - return; - case MDP_GETADDRS: - if (debug & DEBUG_MDPREQUESTS) - DEBUGF("MDP_GETADDRS first_sid=%u last_sid=%u frame_sid_count=%u mode=%d", - mdp->addrlist.first_sid, - mdp->addrlist.last_sid, - mdp->addrlist.frame_sid_count, - mdp->addrlist.mode - ); - { - overlay_mdp_frame mdpreply; - - /* Work out which SIDs to get ... */ - int sid_num=mdp->addrlist.first_sid; - int max_sid=mdp->addrlist.last_sid; - int max_sids=mdp->addrlist.frame_sid_count; - /* ... and constrain list for sanity */ - if (sid_num<0) sid_num=0; - if (max_sids>MDP_MAX_SID_REQUEST) max_sids=MDP_MAX_SID_REQUEST; - if (max_sids<0) max_sids=0; - - /* Prepare reply packet */ - mdpreply.packetTypeAndFlags = MDP_ADDRLIST; - mdpreply.addrlist.mode = mdp->addrlist.mode; - mdpreply.addrlist.first_sid = sid_num; - mdpreply.addrlist.last_sid = max_sid; - mdpreply.addrlist.frame_sid_count = max_sids; - - /* Populate with SIDs */ - int i=0; - int count=0; - switch (mdp->addrlist.mode) { - case MDP_ADDRLIST_MODE_SELF: { - int cn=0,in=0,kp=0; - while(keyring_next_identity(keyring,&cn,&in,&kp)) { - if (count>=sid_num&&(icontexts[cn]->identities[in] - ->keypairs[kp]->public_key, - mdpreply.addrlist.sids[i++],SID_SIZE); - in++; kp=0; - count++; - if (i>=max_sids) - break; - } - } - break; - case MDP_ADDRLIST_MODE_ROUTABLE_PEERS: - case MDP_ADDRLIST_MODE_ALL_PEERS: { - /* from peer list */ - struct search_state state={ - .mdp=mdp, - .mdpreply=&mdpreply, - .first=sid_num, - .max=max_sid, - }; - - enum_subscribers(NULL, search_subscribers, &state); - i=state.index; - count=state.count; - } - break; - } - mdpreply.addrlist.frame_sid_count = i; - mdpreply.addrlist.last_sid = sid_num + i - 1; - mdpreply.addrlist.server_sid_count = count; - - if (debug & DEBUG_MDPREQUESTS) - DEBUGF("reply MDP_ADDRLIST first_sid=%u last_sid=%u frame_sid_count=%u server_sid_count=%u", - mdpreply.addrlist.first_sid, - mdpreply.addrlist.last_sid, - mdpreply.addrlist.frame_sid_count, - mdpreply.addrlist.server_sid_count - ); - - /* Send back to caller */ - overlay_mdp_reply(alarm->poll.fd, - (struct sockaddr_un *)recvaddr,recvaddrlen, - &mdpreply); + switch (mdp_type) { + case MDP_GOODBYE: + if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_GOODBYE"); + overlay_mdp_releasebindings(recvaddr_un,recvaddrlen); return; + case MDP_NODEINFO: + if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_NODEINFO"); + overlay_route_node_info(mdp,recvaddr_un,recvaddrlen); + return; + case MDP_GETADDRS: + if (debug & DEBUG_MDPREQUESTS) + DEBUGF("MDP_GETADDRS first_sid=%u last_sid=%u frame_sid_count=%u mode=%d", + mdp->addrlist.first_sid, + mdp->addrlist.last_sid, + mdp->addrlist.frame_sid_count, + mdp->addrlist.mode + ); + { + overlay_mdp_frame mdpreply; + + /* Work out which SIDs to get ... */ + int sid_num=mdp->addrlist.first_sid; + int max_sid=mdp->addrlist.last_sid; + int max_sids=mdp->addrlist.frame_sid_count; + /* ... and constrain list for sanity */ + if (sid_num<0) sid_num=0; + if (max_sids>MDP_MAX_SID_REQUEST) max_sids=MDP_MAX_SID_REQUEST; + if (max_sids<0) max_sids=0; + + /* Prepare reply packet */ + mdpreply.packetTypeAndFlags = MDP_ADDRLIST; + mdpreply.addrlist.mode = mdp->addrlist.mode; + mdpreply.addrlist.first_sid = sid_num; + mdpreply.addrlist.last_sid = max_sid; + mdpreply.addrlist.frame_sid_count = max_sids; + + /* Populate with SIDs */ + int i=0; + int count=0; + switch (mdp->addrlist.mode) { + case MDP_ADDRLIST_MODE_SELF: { + int cn=0,in=0,kp=0; + while(keyring_next_identity(keyring,&cn,&in,&kp)) { + if (count>=sid_num&&(icontexts[cn]->identities[in] + ->keypairs[kp]->public_key, + mdpreply.addrlist.sids[i++],SID_SIZE); + in++; kp=0; + count++; + if (i>=max_sids) + break; + } + } + break; + case MDP_ADDRLIST_MODE_ROUTABLE_PEERS: + case MDP_ADDRLIST_MODE_ALL_PEERS: { + /* from peer list */ + struct search_state state={ + .mdp=mdp, + .mdpreply=&mdpreply, + .first=sid_num, + .max=max_sid, + }; + + enum_subscribers(NULL, search_subscribers, &state); + i=state.index; + count=state.count; + } + break; + } + mdpreply.addrlist.frame_sid_count = i; + mdpreply.addrlist.last_sid = sid_num + i - 1; + mdpreply.addrlist.server_sid_count = count; + + if (debug & DEBUG_MDPREQUESTS) + DEBUGF("reply MDP_ADDRLIST first_sid=%u last_sid=%u frame_sid_count=%u server_sid_count=%u", + mdpreply.addrlist.first_sid, + mdpreply.addrlist.last_sid, + mdpreply.addrlist.frame_sid_count, + mdpreply.addrlist.server_sid_count + ); + + /* Send back to caller */ + overlay_mdp_reply(alarm->poll.fd, + (struct sockaddr_un *)recvaddr,recvaddrlen, + &mdpreply); + return; + } + break; + case MDP_TX: /* Send payload (and don't treat it as system privileged) */ + if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_TX"); + overlay_mdp_dispatch(mdp,1,(struct sockaddr_un*)recvaddr,recvaddrlen); + return; + break; + case MDP_BIND: /* Bind to port */ + if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_BIND"); + overlay_mdp_process_bind_request(alarm->poll.fd,mdp, recvaddr_un, recvaddrlen); + return; + break; + default: + /* Client is not allowed to send any other frame type */ + WARNF("Unsupported MDP frame type: %d", mdp_type); + mdp->packetTypeAndFlags=MDP_ERROR; + mdp->error.error=2; + snprintf(mdp->error.message,128,"Illegal request type. Clients may use only MDP_TX or MDP_BIND."); + int len=4+4+strlen(mdp->error.message)+1; + errno=0; + /* We ignore the result of the following, because it is just sending an + error message back to the client. If this fails, where would we report + the error to? My point exactly. */ + sendto(alarm->poll.fd,mdp,len,0,(struct sockaddr *)recvaddr,recvaddrlen); } - break; - case MDP_TX: /* Send payload (and don't treat it as system privileged) */ - if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_TX"); - overlay_mdp_dispatch(mdp,1,(struct sockaddr_un*)recvaddr,recvaddrlen); - return; - break; - case MDP_BIND: /* Bind to port */ - if (debug & DEBUG_MDPREQUESTS) DEBUG("MDP_BIND"); - overlay_mdp_process_bind_request(alarm->poll.fd,mdp, recvaddr_un, recvaddrlen); - return; - break; - default: - /* Client is not allowed to send any other frame type */ - WARNF("Unsupported MDP frame type: %d", mdp_type); - mdp->packetTypeAndFlags=MDP_ERROR; - mdp->error.error=2; - snprintf(mdp->error.message,128,"Illegal request type. Clients may use only MDP_TX or MDP_BIND."); - int len=4+4+strlen(mdp->error.message)+1; - errno=0; - /* We ignore the result of the following, because it is just sending an - error message back to the client. If this fails, where would we report - the error to? My point exactly. */ - sendto(alarm->poll.fd,mdp,len,0,(struct sockaddr *)recvaddr,recvaddrlen); } } + + if (alarm->poll.revents & (POLLHUP | POLLERR)) { + INFO("Error on mdp socket"); + } return; } diff --git a/rhizome_fetch.c b/rhizome_fetch.c index 814f5d4d..861d98e9 100644 --- a/rhizome_fetch.c +++ b/rhizome_fetch.c @@ -827,134 +827,140 @@ void rhizome_fetch_poll(struct sched_ent *alarm) return; } - switch(q->state) { - case RHIZOME_FETCH_CONNECTING: - case RHIZOME_FETCH_SENDINGHTTPREQUEST: - rhizome_fetch_write(q); - break; - case RHIZOME_FETCH_RXFILE: { - /* Keep reading until we have the promised amount of data */ - char buffer[8192]; - sigPipeFlag = 0; - int bytes = read_nonblock(q->alarm.poll.fd, buffer, sizeof buffer); - /* If we got some data, see if we have found the end of the HTTP request */ - if (bytes > 0) { - rhizome_write_content(q, buffer, bytes); - } else { - if (debug & DEBUG_RHIZOME_RX) - DEBUG("Empty read, closing connection"); - rhizome_fetch_close(q); - return; - } - if (sigPipeFlag) { - if (debug & DEBUG_RHIZOME_RX) - DEBUG("Received SIGPIPE, closing connection"); - rhizome_fetch_close(q); - return; - } - } - break; - case RHIZOME_FETCH_RXHTTPHEADERS: { - /* Keep reading until we have two CR/LFs in a row */ - sigPipeFlag = 0; - int bytes = read_nonblock(q->alarm.poll.fd, &q->request[q->request_len], 1024 - q->request_len - 1); - /* If we got some data, see if we have found the end of the HTTP reply */ - if (bytes > 0) { - // reset timeout - unschedule(&q->alarm); - q->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT; - q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT; - schedule(&q->alarm); - q->request_len += bytes; - if (http_header_complete(q->request, q->request_len, bytes + 4)) { + if (alarm->poll.revents & (POLLIN | POLLOUT)) { + switch(q->state) { + case RHIZOME_FETCH_CONNECTING: + case RHIZOME_FETCH_SENDINGHTTPREQUEST: + rhizome_fetch_write(q); + break; + case RHIZOME_FETCH_RXFILE: { + /* Keep reading until we have the promised amount of data */ + char buffer[8192]; + sigPipeFlag = 0; + int bytes = read_nonblock(q->alarm.poll.fd, buffer, sizeof buffer); + /* If we got some data, see if we have found the end of the HTTP request */ + if (bytes > 0) { + rhizome_write_content(q, buffer, bytes); + } else { if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Got HTTP reply: %s", alloca_toprint(160, q->request, q->request_len)); - /* We have all the reply headers, so parse them, taking care of any following bytes of - content. */ - char *p = NULL; - if (!str_startswith(q->request, "HTTP/1.0 ", &p)) { - if (debug&DEBUG_RHIZOME_RX) - DEBUGF("Malformed HTTP reply: missing HTTP/1.0 preamble"); - rhizome_fetch_close(q); - return; - } - int http_response_code = 0; - char *nump; - for (nump = p; isdigit(*p); ++p) - http_response_code = http_response_code * 10 + *p - '0'; - if (p == nump || *p != ' ') { - if (debug&DEBUG_RHIZOME_RX) - DEBUGF("Malformed HTTP reply: missing decimal status code"); - rhizome_fetch_close(q); - return; - } - if (http_response_code != 200) { + DEBUG("Empty read, closing connection"); + rhizome_fetch_close(q); + return; + } + if (sigPipeFlag) { + if (debug & DEBUG_RHIZOME_RX) + DEBUG("Received SIGPIPE, closing connection"); + rhizome_fetch_close(q); + return; + } + } + break; + case RHIZOME_FETCH_RXHTTPHEADERS: { + /* Keep reading until we have two CR/LFs in a row */ + sigPipeFlag = 0; + int bytes = read_nonblock(q->alarm.poll.fd, &q->request[q->request_len], 1024 - q->request_len - 1); + /* If we got some data, see if we have found the end of the HTTP reply */ + if (bytes > 0) { + // reset timeout + unschedule(&q->alarm); + q->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT; + q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT; + schedule(&q->alarm); + q->request_len += bytes; + if (http_header_complete(q->request, q->request_len, bytes + 4)) { if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Failed HTTP request: rhizome server returned %d != 200 OK", http_response_code); - rhizome_fetch_close(q); - return; - } - // This loop will terminate, because http_header_complete() above found at least - // "\n\n" at the end of the header, and probably "\r\n\r\n". - while (*p++ != '\n') - ; - // Iterate over header lines until the last blank line. - long long content_length = -1; - while (*p != '\r' && *p != '\n') { - if (strcase_startswith(p, "Content-Length:", &p)) { - while (*p == ' ') - ++p; - content_length = 0; - for (nump = p; isdigit(*p); ++p) - content_length = content_length * 10 + *p - '0'; - if (p == nump || (*p != '\r' && *p != '\n')) { - if (debug & DEBUG_RHIZOME_RX) { - DEBUGF("Invalid HTTP reply: malformed Content-Length header"); - rhizome_fetch_close(q); - return; - } - } + DEBUGF("Got HTTP reply: %s", alloca_toprint(160, q->request, q->request_len)); + /* We have all the reply headers, so parse them, taking care of any following bytes of + content. */ + char *p = NULL; + if (!str_startswith(q->request, "HTTP/1.0 ", &p)) { + if (debug&DEBUG_RHIZOME_RX) + DEBUGF("Malformed HTTP reply: missing HTTP/1.0 preamble"); + rhizome_fetch_close(q); + return; } + int http_response_code = 0; + char *nump; + for (nump = p; isdigit(*p); ++p) + http_response_code = http_response_code * 10 + *p - '0'; + if (p == nump || *p != ' ') { + if (debug&DEBUG_RHIZOME_RX) + DEBUGF("Malformed HTTP reply: missing decimal status code"); + rhizome_fetch_close(q); + return; + } + if (http_response_code != 200) { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("Failed HTTP request: rhizome server returned %d != 200 OK", http_response_code); + rhizome_fetch_close(q); + return; + } + // This loop will terminate, because http_header_complete() above found at least + // "\n\n" at the end of the header, and probably "\r\n\r\n". while (*p++ != '\n') ; + // Iterate over header lines until the last blank line. + long long content_length = -1; + while (*p != '\r' && *p != '\n') { + if (strcase_startswith(p, "Content-Length:", &p)) { + while (*p == ' ') + ++p; + content_length = 0; + for (nump = p; isdigit(*p); ++p) + content_length = content_length * 10 + *p - '0'; + if (p == nump || (*p != '\r' && *p != '\n')) { + if (debug & DEBUG_RHIZOME_RX) { + DEBUGF("Invalid HTTP reply: malformed Content-Length header"); + rhizome_fetch_close(q); + return; + } + } + } + while (*p++ != '\n') + ; + } + if (*p == '\r') + ++p; + ++p; // skip '\n' at end of blank line + if (content_length == -1) { + if (debug & DEBUG_RHIZOME_RX) + DEBUGF("Invalid HTTP reply: missing Content-Length header"); + rhizome_fetch_close(q); + return; + } + q->file_len = content_length; + /* We have all we need. The file is already open, so just write out any initial bytes of + the body we read. + */ + q->state = RHIZOME_FETCH_RXFILE; + int content_bytes = q->request + q->request_len - p; + if (content_bytes > 0) + rhizome_write_content(q, p, content_bytes); } - if (*p == '\r') - ++p; - ++p; // skip '\n' at end of blank line - if (content_length == -1) { - if (debug & DEBUG_RHIZOME_RX) - DEBUGF("Invalid HTTP reply: missing Content-Length header"); - rhizome_fetch_close(q); - return; - } - q->file_len = content_length; - /* We have all we need. The file is already open, so just write out any initial bytes of - the body we read. - */ - q->state = RHIZOME_FETCH_RXFILE; - int content_bytes = q->request + q->request_len - p; - if (content_bytes > 0) - rhizome_write_content(q, p, content_bytes); + } else { + if (debug & DEBUG_RHIZOME_RX) + DEBUG("Empty read, closing connection"); + rhizome_fetch_close(q); + return; + } + if (sigPipeFlag) { + if (debug & DEBUG_RHIZOME_RX) + DEBUG("Received SIGPIPE, closing connection"); + rhizome_fetch_close(q); + return; } - } else { - if (debug & DEBUG_RHIZOME_RX) - DEBUG("Empty read, closing connection"); - rhizome_fetch_close(q); - return; - } - if (sigPipeFlag) { - if (debug & DEBUG_RHIZOME_RX) - DEBUG("Received SIGPIPE, closing connection"); - rhizome_fetch_close(q); - return; } + break; + default: + if (debug & DEBUG_RHIZOME_RX) + DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state."); + rhizome_fetch_close(q); + return; } - break; - default: - if (debug & DEBUG_RHIZOME_RX) - DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state."); - rhizome_fetch_close(q); - return; - } + } + + if (alarm->poll.revents & (POLLHUP | POLLERR)) { + rhizome_fetch_close(q); + } return; } diff --git a/rhizome_http.c b/rhizome_http.c index fcca6a2d..1f77736d 100644 --- a/rhizome_http.c +++ b/rhizome_http.c @@ -261,46 +261,52 @@ void rhizome_client_poll(struct sched_ent *alarm) void rhizome_server_poll(struct sched_ent *alarm) { - struct sockaddr addr; - unsigned int addr_len = sizeof addr; - int sock; - while ((sock = accept(rhizome_server_socket, &addr, &addr_len)) != -1) { - if (addr.sa_family == AF_INET) { - struct sockaddr_in *peerip = (struct sockaddr_in *)&addr; - INFOF("RHIZOME HTTP SERVER, ACCEPT addrlen=%u family=%u port=%u addr=%u.%u.%u.%u", - addr_len, peerip->sin_family, peerip->sin_port, - ((unsigned char*)&peerip->sin_addr.s_addr)[0], - ((unsigned char*)&peerip->sin_addr.s_addr)[1], - ((unsigned char*)&peerip->sin_addr.s_addr)[2], - ((unsigned char*)&peerip->sin_addr.s_addr)[3] - ); - } else { - INFOF("RHIZOME HTTP SERVER, ACCEPT addrlen=%u family=%u data=%s", + if (alarm->poll.revents & (POLLIN | POLLOUT)) { + struct sockaddr addr; + unsigned int addr_len = sizeof addr; + int sock; + while ((sock = accept(rhizome_server_socket, &addr, &addr_len)) != -1) { + if (addr.sa_family == AF_INET) { + struct sockaddr_in *peerip = (struct sockaddr_in *)&addr; + INFOF("RHIZOME HTTP SERVER, ACCEPT addrlen=%u family=%u port=%u addr=%u.%u.%u.%u", + addr_len, peerip->sin_family, peerip->sin_port, + ((unsigned char*)&peerip->sin_addr.s_addr)[0], + ((unsigned char*)&peerip->sin_addr.s_addr)[1], + ((unsigned char*)&peerip->sin_addr.s_addr)[2], + ((unsigned char*)&peerip->sin_addr.s_addr)[3] + ); + } else { + INFOF("RHIZOME HTTP SERVER, ACCEPT addrlen=%u family=%u data=%s", addr_len, addr.sa_family, alloca_tohex((unsigned char *)addr.sa_data, sizeof addr.sa_data) ); + } + rhizome_http_request *request = calloc(sizeof(rhizome_http_request), 1); + if (request == NULL) { + WHYF_perror("calloc(%u, 1)", sizeof(rhizome_http_request)); + WHY("Cannot respond to request, out of memory"); + } else { + /* We are now trying to read the HTTP request */ + request->request_type=RHIZOME_HTTP_REQUEST_RECEIVING; + request->alarm.function = rhizome_client_poll; + connection_stats.name="rhizome_client_poll"; + request->alarm.stats=&connection_stats; + request->alarm.poll.fd=sock; + request->alarm.poll.events=POLLIN; + request->alarm.alarm = gettime_ms()+RHIZOME_IDLE_TIMEOUT; + request->alarm.deadline = request->alarm.alarm+RHIZOME_IDLE_TIMEOUT; + // watch for the incoming http request + watch(&request->alarm); + // set an inactivity timeout to close the connection + schedule(&request->alarm); + } } - rhizome_http_request *request = calloc(sizeof(rhizome_http_request), 1); - if (request == NULL) { - WHYF_perror("calloc(%u, 1)", sizeof(rhizome_http_request)); - WHY("Cannot respond to request, out of memory"); - } else { - /* We are now trying to read the HTTP request */ - request->request_type=RHIZOME_HTTP_REQUEST_RECEIVING; - request->alarm.function = rhizome_client_poll; - connection_stats.name="rhizome_client_poll"; - request->alarm.stats=&connection_stats; - request->alarm.poll.fd=sock; - request->alarm.poll.events=POLLIN; - request->alarm.alarm = gettime_ms()+RHIZOME_IDLE_TIMEOUT; - request->alarm.deadline = request->alarm.alarm+RHIZOME_IDLE_TIMEOUT; - // watch for the incoming http request - watch(&request->alarm); - // set an inactivity timeout to close the connection - schedule(&request->alarm); + if (errno != EAGAIN) { + WARN_perror("accept"); } } - if (errno != EAGAIN) { - WARN_perror("accept"); + + if (alarm->poll.revents & (POLLHUP | POLLERR)) { + INFO("Error on tcp listen socket"); } }