Improve HTTP server generated content logic

Content generator functions now take arguments describing the buffer
they are to fill, and respond with a struct containing the number of
bytes filled, and the number of free bytes needed before being called
again.

The HTTP response logic now fills the buffer as much as possible before
calling write(2) by topping it up instead of waiting for it to be
completely emptied before generating more content.
This commit is contained in:
Andrew Bettison 2013-11-08 18:30:11 +10:30
parent a62b6f9250
commit d5b48f5a9e
4 changed files with 215 additions and 139 deletions

View File

@ -118,6 +118,8 @@ void http_request_init(struct http_request *r, int sockfd)
void http_request_free_response_buffer(struct http_request *r)
{
if (r->response_free_buffer) {
if (r->debug_flag && *r->debug_flag)
DEBUGF("Free response buffer of %zu bytes", r->response_buffer_size);
r->response_free_buffer(r->response_buffer);
r->response_free_buffer = NULL;
}
@ -127,6 +129,8 @@ void http_request_free_response_buffer(struct http_request *r)
int http_request_set_response_bufsize(struct http_request *r, size_t bufsiz)
{
// Don't allocate a new buffer if the existing one contains content.
assert(r->response_buffer_sent == r->response_buffer_length);
const char *const bufe = r->buffer + sizeof r->buffer;
assert(r->received < bufe);
size_t rbufsiz = bufe - r->received;
@ -134,6 +138,8 @@ int http_request_set_response_bufsize(struct http_request *r, size_t bufsiz)
http_request_free_response_buffer(r);
r->response_buffer = (char *) r->received;
r->response_buffer_size = rbufsiz;
if (r->debug_flag && *r->debug_flag)
DEBUGF("Static response buffer %zu bytes", r->response_buffer_size);
return 0;
}
if (bufsiz != r->response_buffer_size) {
@ -142,6 +148,8 @@ int http_request_set_response_bufsize(struct http_request *r, size_t bufsiz)
return -1;
r->response_free_buffer = free;
r->response_buffer_size = bufsiz;
if (r->debug_flag && *r->debug_flag)
DEBUGF("Allocated response buffer %zu bytes", r->response_buffer_size);
}
assert(r->response_buffer_size >= bufsiz);
assert(r->response_buffer != NULL);
@ -1642,56 +1650,87 @@ static void http_request_receive(struct http_request *r)
*/
static void http_request_send_response(struct http_request *r)
{
if (r->response_length != CONTENT_LENGTH_UNKNOWN)
assert(r->response_sent <= r->response_length);
int g = 1;
while (g) {
if (r->response_length != CONTENT_LENGTH_UNKNOWN && r->response_sent == r->response_length)
break;
while (1) {
if (r->response_length != CONTENT_LENGTH_UNKNOWN)
assert(r->response_sent <= r->response_length);
assert(r->response_buffer_sent <= r->response_buffer_length);
if (r->response_buffer_sent == r->response_buffer_length) {
g = 0;
if (r->response.content_generator) {
// Content generator must fill or partly fill response_buffer and set response_buffer_sent
// and response_buffer_length. May also malloc() a bigger buffer and set response_buffer to
// point to it.
r->response_buffer_sent = r->response_buffer_length = 0;
g = r->response.content_generator(r);
if (g == -1) {
if (r->debug_flag && *r->debug_flag)
DEBUG("Content generation error, closing connection");
http_request_finalise(r);
return;
}
assert(r->response_buffer_sent <= r->response_buffer_length);
if (r->response_buffer_sent == r->response_buffer_length) {
if (g == 0)
break;
if (r->response_length != CONTENT_LENGTH_UNKNOWN)
WHYF("HTTP response generator produced no content at offset %"PRIhttp_size_t"/%"PRIhttp_size_t" (%"PRIhttp_size_t" bytes remaining)",
r->response_sent, r->response_length, r->response_length - r->response_sent);
else
WHYF("HTTP response generator produced no content at offset %"PRIhttp_size_t"", r->response_sent);
http_request_finalise(r);
return;
}
} else if (r->response_length != CONTENT_LENGTH_UNKNOWN) {
WHYF("HTTP response is short of total length (%"PRIhttp_size_t") by %"PRIhttp_size_t" bytes",
r->response_length, r->response_length - r->response_sent);
http_request_finalise(r);
return;
}
uint64_t remaining = CONTENT_LENGTH_UNKNOWN;
size_t unsent = r->response_buffer_length - r->response_buffer_sent;
if (r->response_length != CONTENT_LENGTH_UNKNOWN) {
remaining = r->response_length - r->response_sent;
assert(unsent <= remaining);
assert(r->response_buffer_need <= remaining);
if (remaining == 0)
break; // no more to generate
}
assert(r->response_buffer_sent < r->response_buffer_length);
size_t bytes = r->response_buffer_length - r->response_buffer_sent;
if (r->response_sent + bytes > r->response_length) {
WHYF("HTTP response overruns total length (%"PRIhttp_size_t") by %"PRIhttp_size_t" bytes -- truncating",
r->response_length,
r->response_sent + bytes - r->response_length);
bytes = r->response_length - r->response_sent;
if (unsent == 0)
r->response_buffer_sent = r->response_buffer_length = 0;
if (r->response.content_generator) {
// If the buffer is smaller than the content generator needs, and it contains no unsent
// content, then allocate a larger buffer.
if (r->response_buffer_need > r->response_buffer_size && unsent == 0) {
if (http_request_set_response_bufsize(r, r->response_buffer_need) == -1) {
WHYF("HTTP response truncated at offset=%"PRIhttp_size_t" due to insufficient buffer space",
r->response_sent);
http_request_finalise(r);
return;
}
}
// If there are some sent bytes at the start of the buffer and only a few unsent bytes, then
// move the unsent content to the start of the buffer to make more room.
if (r->response_buffer_sent > 0 && unsent < 128) {
memmove(r->response_buffer, r->response_buffer + r->response_buffer_sent, unsent);
r->response_buffer_length -= r->response_buffer_sent;
r->response_buffer_sent = 0;
}
// If there is enough unfilled room at the end of the buffer, then fill the buffer with some
// more content.
assert(r->response_buffer_length <= r->response_buffer_size);
size_t unfilled = r->response_buffer_size - r->response_buffer_length;
if (unfilled > 0 && unfilled >= r->response_buffer_need) {
// The content generator must fill or partly fill the part of the buffer we indicate and
// return the number of bytes appended. If it returns zero, it means it has no more
// content (EOF), and must not be called again. If the return value exceeds the buffer size
// we supply, it gives the amount of free space the generator needs in order to append; the
// generator will not append any bytes until that much free space is available. If returns
// -1, it means an unrecoverable error occurred, and the generator must not be called again.
struct http_content_generator_result result;
bzero(&result, sizeof result);
int ret = r->response.content_generator(r, (unsigned char *) r->response_buffer + r->response_buffer_length, unfilled, &result);
if (ret == -1) {
WHY("Content generation error, closing connection");
http_request_finalise(r);
return;
}
if (result.generated == 0 && result.need <= unfilled) {
WHYF("HTTP response generator produced no content at offset %"PRIhttp_size_t, r->response_sent);
http_request_finalise(r);
return;
}
if (r->debug_flag && *r->debug_flag)
DEBUGF("Generated HTTP %zu bytes of content, need %zu bytes of buffer", result.generated, result.need);
assert(result.generated <= unfilled);
r->response_buffer_length += result.generated;
r->response_buffer_need = result.need;
if (ret == 0)
r->response.content_generator = NULL;
continue;
}
} else if (remaining != CONTENT_LENGTH_UNKNOWN && unsent < remaining) {
WHYF("HTTP response generator finished prematurely at offset %"PRIhttp_size_t"/%"PRIhttp_size_t" (%"PRIhttp_size_t" bytes remaining)",
r->response_sent, r->response_length, remaining);
http_request_finalise(r);
return;
} else if (unsent == 0)
break;
assert(unsent > 0);
if (remaining != CONTENT_LENGTH_UNKNOWN && unsent > remaining) {
WHYF("HTTP response overruns Content-Length (%"PRIhttp_size_t") by %"PRIhttp_size_t" bytes -- truncating",
r->response_length, unsent - remaining);
unsent = remaining;
}
sigPipeFlag = 0;
ssize_t written = write_nonblock(r->alarm.poll.fd, r->response_buffer + r->response_buffer_sent, bytes);
ssize_t written = write_nonblock(r->alarm.poll.fd, r->response_buffer + r->response_buffer_sent, unsent);
if (written == -1) {
if (r->debug_flag && *r->debug_flag)
DEBUG("HTTP socket write error, closing connection");
@ -1719,8 +1758,8 @@ static void http_request_send_response(struct http_request *r)
r->alarm.deadline = r->alarm.alarm + r->idle_timeout;
unschedule(&r->alarm);
schedule(&r->alarm);
// If we wrote less than we tried, then go back to polling.
if (written < (size_t) bytes)
// If we wrote less than we tried, then go back to polling, otherwise keep generating content.
if (written < (size_t) unsent)
return;
}
if (r->debug_flag && *r->debug_flag)
@ -1929,19 +1968,22 @@ static int _render_response(struct http_request *r)
r->response_length = strbuf_count(sb) + hr.header.content_length;
else
r->response_length = CONTENT_LENGTH_UNKNOWN;
r->response_buffer_need = strbuf_count(sb) + 1; // the header and the strbuf terminating NUL
if (hr.content) {
assert(r->response_length != CONTENT_LENGTH_UNKNOWN);
r->response_buffer_need = r->response_length + 1;
} else {
if (r->response_buffer_need < r->response_length)
r->response_buffer_need = r->response_length;
} else
assert(hr.content_generator);
r->response_buffer_need = strbuf_count(sb) + 1;
}
if (r->response_buffer_size < r->response_buffer_need)
return 0;
return 0; // doesn't fit
assert(!strbuf_overrun(sb));
if (hr.content)
if (hr.content) {
bcopy(hr.content, strbuf_end(sb), hr.header.content_length);
r->response_buffer_length = r->response_buffer_need;
r->response_buffer_length = r->response_length;
} else {
r->response_buffer_length = strbuf_count(sb);
}
r->response_buffer_sent = 0;
return 1;
}
@ -2022,6 +2064,7 @@ static void http_request_start_response(struct http_request *r)
return;
}
}
r->response_buffer_need = 0;
r->response_sent = 0;
if (r->debug_flag && *r->debug_flag)
DEBUGF("Sending HTTP response: %s", alloca_toprint(160, (const char *)r->response_buffer, r->response_buffer_length));

View File

@ -96,13 +96,18 @@ struct http_response_headers {
struct http_www_authenticate www_authenticate;
};
typedef int (*HTTP_CONTENT_GENERATOR)(struct http_request *);
struct http_content_generator_result {
size_t generated;
size_t need;
};
typedef int (HTTP_CONTENT_GENERATOR)(struct http_request *, unsigned char *, size_t, struct http_content_generator_result *);
struct http_response {
uint16_t result_code;
struct http_response_headers header;
const char *content;
HTTP_CONTENT_GENERATOR content_generator; // callback to produce more content
HTTP_CONTENT_GENERATOR *content_generator; // callback to produce more content
};
#define MIME_FILENAME_MAXLEN 127
@ -139,7 +144,7 @@ void http_request_free_response_buffer(struct http_request *r);
int http_request_set_response_bufsize(struct http_request *r, size_t bufsiz);
void http_request_finalise(struct http_request *r);
void http_request_response_static(struct http_request *r, int result, const char *mime_type, const char *body, uint64_t bytes);
void http_request_response_generated(struct http_request *r, int result, const char *mime_type, HTTP_CONTENT_GENERATOR);
void http_request_response_generated(struct http_request *r, int result, const char *mime_type, HTTP_CONTENT_GENERATOR *);
void http_request_simple_response(struct http_request *r, uint16_t result, const char *body);
typedef int (*HTTP_REQUEST_PARSER)(struct http_request *);

View File

@ -735,6 +735,7 @@ typedef struct rhizome_http_request
/* For responses that list manifests.
*/
struct {
enum { LIST_HEADER = 0, LIST_BODY, LIST_DONE } phase;
size_t rowcount;
struct rhizome_list_cursor cursor;
} list;

View File

@ -335,7 +335,7 @@ static int is_authorized(struct http_client_authorization *auth)
return 0;
}
static int restful_rhizome_bundlelist_json_content(struct http_request *);
static HTTP_CONTENT_GENERATOR restful_rhizome_bundlelist_json_content;
static int restful_rhizome_bundlelist_json(rhizome_http_request *r, const char *remainder)
{
@ -353,19 +353,16 @@ static int restful_rhizome_bundlelist_json(rhizome_http_request *r, const char *
http_request_simple_response(&r->http, 401, NULL);
return 0;
}
r->u.list.phase = LIST_HEADER;
r->u.list.rowcount = 0;
bzero(&r->u.list.cursor, sizeof r->u.list.cursor);
http_request_response_generated(&r->http, 200, "application/json", restful_rhizome_bundlelist_json_content);
return 0;
}
static int restful_rhizome_bundlelist_json_content(struct http_request *hr)
static int restful_rhizome_bundlelist_json_content_chunk(sqlite_retry_state *retry, struct rhizome_http_request *r, strbuf b)
{
rhizome_http_request *r = (rhizome_http_request *) hr;
assert(r->http.response_buffer_sent == 0);
assert(r->http.response_buffer_length == 0);
strbuf b = strbuf_local(r->http.response_buffer, r->http.response_buffer_size);
const char *headers[]={
const char *headers[] = {
"_id",
"service",
"id",
@ -380,66 +377,102 @@ static int restful_rhizome_bundlelist_json_content(struct http_request *hr)
"recipient",
"name"
};
if (r->u.list.rowcount == 0) {
strbuf_puts(b, "[[");
unsigned i;
for (i = 0; i != NELS(headers); ++i) {
if (i)
switch (r->u.list.phase) {
case LIST_HEADER:
strbuf_puts(b, "[[");
unsigned i;
for (i = 0; i != NELS(headers); ++i) {
if (i)
strbuf_putc(b, ',');
strbuf_json_string(b, headers[i]);
}
strbuf_puts(b, "]");
if (strbuf_overrun(b))
return 0;
r->u.list.phase = LIST_BODY;
return 1;
case LIST_BODY:
{
int ret = rhizome_list_next(retry, &r->u.list.cursor);
if (ret == -1)
return -1;
if (ret == 0) {
strbuf_puts(b, "\n]\n");
if (strbuf_overrun(b))
return 0;
r->u.list.phase = LIST_DONE;
return 0;
}
rhizome_manifest *m = r->u.list.cursor.manifest;
assert(m->filesize != RHIZOME_SIZE_UNSET);
rhizome_lookup_author(m);
strbuf_puts(b, ",\n [");
strbuf_sprintf(b, "%"PRIu64, r->u.list.cursor.rowid);
strbuf_putc(b, ',');
strbuf_json_string(b, headers[i]);
}
strbuf_puts(b, "]");
strbuf_json_string(b, m->service);
strbuf_putc(b, ',');
strbuf_json_hex(b, m->cryptoSignPublic.binary, sizeof m->cryptoSignPublic.binary);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRId64, m->version);
strbuf_putc(b, ',');
if (m->has_date)
strbuf_sprintf(b, "%"PRItime_ms_t, m->date);
else
strbuf_json_null(b);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRItime_ms_t",", m->inserttime);
switch (m->authorship) {
case AUTHOR_LOCAL:
case AUTHOR_AUTHENTIC:
strbuf_json_hex(b, m->author.binary, sizeof m->author.binary);
strbuf_puts(b, ",1,");
break;
default:
strbuf_json_null(b);
strbuf_puts(b, ",1,");
break;
}
strbuf_sprintf(b, "%"PRIu64, m->filesize);
strbuf_putc(b, ',');
strbuf_json_hex(b, m->filesize ? m->filehash.binary : NULL, sizeof m->filehash.binary);
strbuf_putc(b, ',');
strbuf_json_hex(b, m->has_sender ? m->sender.binary : NULL, sizeof m->sender.binary);
strbuf_putc(b, ',');
strbuf_json_hex(b, m->has_recipient ? m->recipient.binary : NULL, sizeof m->recipient.binary);
strbuf_putc(b, ',');
strbuf_json_string(b, m->name);
strbuf_puts(b, "]");
if (strbuf_overrun(b))
return 0;
rhizome_list_commit(&r->u.list.cursor);
++r->u.list.rowcount;
return 1;
}
case LIST_DONE:
break;
}
return 0;
}
static int restful_rhizome_bundlelist_json_content(struct http_request *hr, unsigned char *buf, size_t bufsz, struct http_content_generator_result *result)
{
rhizome_http_request *r = (rhizome_http_request *) hr;
assert(bufsz > 0);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
int ret = rhizome_list_next(&retry, &r->u.list.cursor);
if (ret == 0) {
strbuf_puts(b, "\n]\n");
} else if (ret == 1) {
++r->u.list.rowcount;
rhizome_manifest *m = r->u.list.cursor.manifest;
assert(m->filesize != RHIZOME_SIZE_UNSET);
rhizome_lookup_author(m);
strbuf_puts(b, ",\n [");
strbuf_sprintf(b, "%"PRIu64, r->u.list.cursor.rowid);
strbuf_putc(b, ',');
strbuf_json_string(b, m->service);
strbuf_putc(b, ',');
strbuf_json_hex(b, m->cryptoSignPublic.binary, sizeof m->cryptoSignPublic.binary);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRId64, m->version);
strbuf_putc(b, ',');
if (m->has_date)
strbuf_sprintf(b, "%"PRItime_ms_t, m->date);
else
strbuf_json_null(b);
strbuf_putc(b, ',');
strbuf_sprintf(b, "%"PRItime_ms_t",", m->inserttime);
switch (m->authorship) {
case AUTHOR_LOCAL:
case AUTHOR_AUTHENTIC:
strbuf_json_hex(b, m->author.binary, sizeof m->author.binary);
strbuf_puts(b, ",1,");
break;
default:
strbuf_json_null(b);
strbuf_puts(b, ",1,");
break;
int ret = rhizome_list_open(&retry, &r->u.list.cursor);
if (ret == -1)
return -1;
strbuf b = strbuf_local((char *)buf, bufsz);
while ((ret = restful_rhizome_bundlelist_json_content_chunk(&retry, r, b)) != -1) {
if (strbuf_overrun(b)) {
result->need = strbuf_count(b) + 1 - result->generated;
break;
}
strbuf_sprintf(b, "%"PRIu64, m->filesize);
strbuf_putc(b, ',');
strbuf_json_hex(b, m->filesize ? m->filehash.binary : NULL, sizeof m->filehash.binary);
strbuf_putc(b, ',');
strbuf_json_hex(b, m->has_sender ? m->sender.binary : NULL, sizeof m->sender.binary);
strbuf_putc(b, ',');
strbuf_json_hex(b, m->has_recipient ? m->recipient.binary : NULL, sizeof m->recipient.binary);
strbuf_putc(b, ',');
strbuf_json_string(b, m->name);
strbuf_puts(b, "]");
result->generated = strbuf_len(b);
if (ret == 0)
break;
}
if (strbuf_overrun(b))
ret = WHY("HTTP response buffer overrun");
rhizome_list_release(&r->u.list.cursor);
r->http.response_buffer_length = strbuf_len(b);
return ret;
}
@ -509,30 +542,24 @@ static int rhizome_status_page(rhizome_http_request *r, const char *remainder)
return 0;
}
static int rhizome_file_content(struct http_request *hr)
static int rhizome_file_content(struct http_request *hr, unsigned char *buf, size_t bufsz, struct http_content_generator_result *result)
{
// Reads the next part of the payload into the supplied buffer.
rhizome_http_request *r = (rhizome_http_request *) hr;
assert(r->http.response_buffer_sent == 0);
assert(r->http.response_buffer_length == 0);
assert(r->u.read_state.offset < r->u.read_state.length);
uint64_t readlen = r->u.read_state.length - r->u.read_state.offset;
size_t suggested_size = 64 * 1024;
if (suggested_size > readlen)
suggested_size = readlen;
if (r->http.response_buffer_size < suggested_size)
http_request_set_response_bufsize(&r->http, suggested_size);
if (r->http.response_buffer == NULL)
http_request_set_response_bufsize(&r->http, 1);
if (r->http.response_buffer == NULL)
size_t readlen = r->u.read_state.length - r->u.read_state.offset;
if (readlen > bufsz)
readlen = bufsz;
ssize_t n = rhizome_read(&r->u.read_state, buf, readlen);
if (n == -1)
return -1;
ssize_t len = rhizome_read(&r->u.read_state,
(unsigned char *)r->http.response_buffer,
r->http.response_buffer_size);
if (len == -1)
return -1;
assert((size_t) len <= r->http.response_buffer_size);
r->http.response_buffer_length += (size_t) len;
return 1;
result->generated = (size_t) n;
// Ask for a large buffer for all future reads.
const size_t preferred_bufsz = 64 * 1024;
assert(r->u.read_state.offset < r->u.read_state.length);
size_t remain = r->u.read_state.length - r->u.read_state.offset;
result->need = remain < preferred_bufsz ? remain : preferred_bufsz;
return remain ? 1 : 0;
}
static int rhizome_file_page(rhizome_http_request *r, const char *remainder)