added code to try switching to mdp if http fetching fails for

any reason.
This commit is contained in:
gardners 2012-11-29 16:15:51 +10:30
parent faad1f26b1
commit a07e5761c4

View File

@ -990,6 +990,25 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
return 0;
}
static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
{
/* close socket and stop watching it */
unwatch(&slot->alarm);
unschedule(&slot->alarm);
if (slot->alarm.poll.fd!=-1) {
close(slot->alarm.poll.fd);
slot->alarm.poll.fd = -1;
}
/* Begin MDP fetch process.
1. Send initial request.
2. Set timeout for next request (if fetching a file).
3. Set timeout for no traffic received.
*/
DEBUGF("Fetch via MDP not implemented");
return rhizome_fetch_close(slot);
}
void rhizome_fetch_write(struct rhizome_fetch_slot *slot)
{
if (debug & DEBUG_RHIZOME_RX)
@ -997,7 +1016,7 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot)
int bytes = write_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_ofs], slot->request_len-slot->request_ofs);
if (bytes == -1) {
WHY("Got error while sending HTTP request. Closing.");
rhizome_fetch_close(slot);
rhizome_fetch_switch_to_mdp(slot);
} else {
// reset timeout
unschedule(&slot->alarm);
@ -1086,88 +1105,93 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
if (alarm->poll.revents & (POLLIN | POLLOUT)) {
switch (slot->state) {
case RHIZOME_FETCH_CONNECTING:
case RHIZOME_FETCH_SENDINGHTTPREQUEST:
rhizome_fetch_write(slot);
case RHIZOME_FETCH_CONNECTING:
case RHIZOME_FETCH_SENDINGHTTPREQUEST:
rhizome_fetch_write(slot);
return;
case RHIZOME_FETCH_RXFILEMDP:
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Fetching via MDP not implemented");
rhizome_fetch_close(slot);
break;
case RHIZOME_FETCH_RXFILE: {
/* Keep reading until we have the promised amount of data */
char buffer[8192];
sigPipeFlag = 0;
int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes > 0) {
rhizome_write_content(slot, buffer, bytes);
return;
case RHIZOME_FETCH_RXFILE: {
/* Keep reading until we have the promised amount of data */
char buffer[8192];
sigPipeFlag = 0;
int bytes = read_nonblock(slot->alarm.poll.fd, buffer, sizeof buffer);
/* If we got some data, see if we have found the end of the HTTP request */
if (bytes > 0) {
rhizome_write_content(slot, buffer, bytes);
return;
} else {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Empty read, closing connection");
rhizome_fetch_close(slot);
return;
}
if (sigPipeFlag) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Received SIGPIPE, closing connection");
rhizome_fetch_close(slot);
return;
}
}
break;
case RHIZOME_FETCH_RXHTTPHEADERS: {
/* Keep reading until we have two CR/LFs in a row */
sigPipeFlag = 0;
int bytes = read_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_len], 1024 - slot->request_len - 1);
/* If we got some data, see if we have found the end of the HTTP reply */
if (bytes > 0) {
// reset timeout
unschedule(&slot->alarm);
slot->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT;
slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&slot->alarm);
slot->request_len += bytes;
if (http_header_complete(slot->request, slot->request_len, bytes)) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Got HTTP reply: %s", alloca_toprint(160, slot->request, slot->request_len));
/* We have all the reply headers, so parse them, taking care of any following bytes of
content. */
struct http_response_parts parts;
if (unpack_http_response(slot->request, &parts) == -1) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed HTTP request: failed to unpack http response");
rhizome_fetch_close(slot);
return;
}
if (parts.code != 200) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed HTTP request: rhizome server returned %d != 200 OK", parts.code);
rhizome_fetch_close(slot);
return;
}
if (parts.content_length == -1) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Invalid HTTP reply: missing Content-Length header");
rhizome_fetch_close(slot);
return;
}
slot->file_len = parts.content_length;
/* We have all we need. The file is already open, so just write out any initial bytes of
the body we read.
*/
slot->state = RHIZOME_FETCH_RXFILE;
int content_bytes = slot->request + slot->request_len - parts.content_start;
if (content_bytes > 0){
rhizome_write_content(slot, parts.content_start, content_bytes);
return;
}
}
}
break;
default:
} else {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Empty read, closing connection");
rhizome_fetch_switch_to_mdp(slot);
return;
}
if (sigPipeFlag) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Received SIGPIPE, closing connection");
rhizome_fetch_switch_to_mdp(slot);
return;
}
}
break;
case RHIZOME_FETCH_RXHTTPHEADERS: {
/* Keep reading until we have two CR/LFs in a row */
sigPipeFlag = 0;
int bytes = read_nonblock(slot->alarm.poll.fd, &slot->request[slot->request_len], 1024 - slot->request_len - 1);
/* If we got some data, see if we have found the end of the HTTP reply */
if (bytes > 0) {
// reset timeout
unschedule(&slot->alarm);
slot->alarm.alarm = gettime_ms() + RHIZOME_IDLE_TIMEOUT;
slot->alarm.deadline = slot->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&slot->alarm);
slot->request_len += bytes;
if (http_header_complete(slot->request, slot->request_len, bytes)) {
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state.");
rhizome_fetch_close(slot);
return;
DEBUGF("Got HTTP reply: %s", alloca_toprint(160, slot->request, slot->request_len));
/* We have all the reply headers, so parse them, taking care of any following bytes of
content. */
struct http_response_parts parts;
if (unpack_http_response(slot->request, &parts) == -1) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed HTTP request: failed to unpack http response");
rhizome_fetch_switch_to_mdp(slot);
return;
}
if (parts.code != 200) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Failed HTTP request: rhizome server returned %d != 200 OK", parts.code);
rhizome_fetch_switch_to_mdp(slot);
return;
}
if (parts.content_length == -1) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Invalid HTTP reply: missing Content-Length header");
rhizome_fetch_switch_to_mdp(slot);
return;
}
slot->file_len = parts.content_length;
/* We have all we need. The file is already open, so just write out any initial bytes of
the body we read.
*/
slot->state = RHIZOME_FETCH_RXFILE;
int content_bytes = slot->request + slot->request_len - parts.content_start;
if (content_bytes > 0){
rhizome_write_content(slot, parts.content_start, content_bytes);
return;
}
}
}
break;
default:
if (debug & DEBUG_RHIZOME_RX)
DEBUG("Closing rhizome fetch connection due to illegal/unimplemented state.");
rhizome_fetch_close(slot);
return;
}
}
}
if (alarm->poll.revents==0 || alarm->poll.revents & (POLLHUP | POLLERR)){