Fetch journal range via HTTP

This commit is contained in:
Jeremy Lakeman 2013-08-15 17:00:43 +09:30
parent bc8ff888e9
commit 771cb4151b
4 changed files with 52 additions and 4 deletions

View File

@ -688,6 +688,7 @@ int rhizome_fetch_has_queue_space(unsigned char log2_size);
struct http_response_parts {
int code;
char *reason;
int64_t range_start;
int64_t content_length;
char *content_start;
};

View File

@ -437,6 +437,9 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
|| slot->previous->fileLength + slot->previous->journalTail < slot->manifest->journalTail){
rhizome_manifest_free(slot->previous);
slot->previous=NULL;
}else{
strbuf_sprintf(r, "Range: bytes=%"PRId64"-%"PRId64"\r\n",
slot->previous->fileLength - slot->manifest->journalTail, slot->manifest->fileLength);
}
}
@ -1425,12 +1428,19 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
}
if (slot->write_state.file_length==-1)
slot->write_state.file_length=parts.content_length;
else if (parts.content_length != slot->write_state.file_length)
WARNF("Expected content length %"PRId64", got %"PRId64, slot->write_state.file_length, parts.content_length);
else if (parts.content_length + parts.range_start != slot->write_state.file_length)
WARNF("Expected content length %"PRId64", got %"PRId64" + %"PRId64,
slot->write_state.file_length, parts.content_length, parts.range_start);
/* We have all we need. The file is already open, so just write out any initial bytes of
the body we read.
*/
slot->state = RHIZOME_FETCH_RXFILE;
if (slot->previous && parts.range_start){
if (parts.range_start != slot->previous->fileLength - slot->manifest->journalTail)
WARNF("Expected Content-Range header to start @%"PRId64, slot->previous->fileLength - slot->manifest->journalTail);
pipe_journal(slot);
}
int content_bytes = slot->request + slot->request_len - parts.content_start;
if (content_bytes > 0){
rhizome_write_content(slot, (unsigned char*)parts.content_start, content_bytes);
@ -1487,6 +1497,7 @@ int unpack_http_response(char *response, struct http_response_parts *parts)
IN();
parts->code = -1;
parts->reason = NULL;
parts->range_start=0;
parts->content_length = -1;
parts->content_start = NULL;
char *p = NULL;
@ -1510,6 +1521,16 @@ int unpack_http_response(char *response, struct http_response_parts *parts)
*p++ = '\0';
// Iterate over header lines until the last blank line.
while (!(p[0] == '\n' || (p[0] == '\r' && p[1] == '\n'))) {
if (strcase_startswith(p, "Content-Range: bytes ", (const char **)&p)) {
char *nump = p;
while (isdigit(*p))
parts->range_start = parts->range_start * 10 + *p++ - '0';
if (p == nump) {
if (config.debug.rhizome_rx)
DEBUGF("Invalid HTTP reply: malformed Content-Range header");
RETURN(-1);
}
}
if (strcase_startswith(p, "Content-Length:", (const char **)&p)) {
while (*p == ' ')
++p;

View File

@ -735,9 +735,9 @@ static strbuf strbuf_build_http_response(strbuf sb, const struct http_response *
{
strbuf_sprintf(sb, "HTTP/1.0 %03u %s\r\n", h->result_code, httpResultString(h->result_code));
strbuf_sprintf(sb, "Content-type: %s\r\n", h->content_type);
if (h->content_end && h->content_length)
if (h->content_end && h->content_length && (h->content_start!=0 || h->content_end!=h->content_length))
strbuf_sprintf(sb,
"Content-range: %"PRIu64"-%"PRIu64"/%"PRIu64"\r\n"
"Content-range: bytes %"PRIu64"-%"PRIu64"/%"PRIu64"\r\n"
"Content-length: %"PRIu64"\r\n",
h->content_start, h->content_end, h->content_length, h->content_end - h->content_start);
else if (h->content_length)

View File

@ -228,6 +228,32 @@ test_journalMDP() {
assertGrep $instance_servald_log "Copying [0-9]\+ bytes from previous journal"
}
doc_journalHTTP="Transfer and update a journal bundle via HTTP"
setup_journalHTTP() {
setup_common
foreach_instance +A +B \
executeOk_servald config \
set rhizome.mdp.enable 0
set_instance +A
create_file file1 64
create_file file2 64
executeOk_servald rhizome journal append $SIDA "" file1
extract_stdout_manifestid BID
extract_stdout_version VERSION
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
}
test_journalHTTP() {
wait_until bundle_received_by $BID:$VERSION +B
set_instance +A
executeOk_servald rhizome journal append $SIDA $BID file2
extract_stdout_version VERSION2
set_instance +B
wait_until bundle_received_by $BID:$VERSION2 +B
assertGrep $instance_servald_log "Copying [0-9]\+ bytes from previous journal"
}
#common setup and test routines for transferring a 1MB file
setup_bigfile_common() {
set_instance +A