Pre-emptively send small payloads via MDP

This commit is contained in:
Jeremy Lakeman 2013-08-21 09:21:40 +09:30
parent 5c374e9e6e
commit 1468d2deb0
7 changed files with 198 additions and 78 deletions

View File

@ -333,7 +333,7 @@ SUB_STRUCT(mdp_iftypelist, iftype,)
END_STRUCT
STRUCT(olsr)
ATOM(bool_t, enable, 1, boolean,, "If true, OLSR is used for mesh routing")
ATOM(bool_t, enable, 0, boolean,, "If true, OLSR is used for mesh routing")
ATOM(uint16_t, remote_port, 4130, uint16_nonzero,, "Remote port number")
ATOM(uint16_t, local_port, 4131, uint16_nonzero,, "Local port number")
END_STRUCT

View File

@ -32,10 +32,13 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
int rhizome_mdp_send_block(struct subscriber *dest, unsigned char *id, uint64_t version, uint64_t fileOffset, uint32_t bitmap, uint16_t blockLength)
{
IN();
if (blockLength>1024) RETURN(-1);
if (!is_rhizome_mdp_server_running())
RETURN(-1);
if (blockLength<=0 || blockLength>1024)
RETURN(WHYF("Invalid block length %d", blockLength));
if (config.debug.rhizome_tx)
DEBUGF("Requested blocks for %s @%"PRIx64, alloca_tohex_bid(id), fileOffset);
DEBUGF("Requested blocks for %s @%"PRIx64" bitmap %x", alloca_tohex_bid(id), fileOffset, bitmap);
overlay_mdp_frame reply;
bzero(&reply,sizeof(reply));
@ -52,7 +55,7 @@ int rhizome_mdp_send_block(struct subscriber *dest, unsigned char *id, uint64_t
bcopy(my_subscriber->sid,reply.out.src.sid,SID_SIZE);
reply.out.src.port=MDP_PORT_RHIZOME_RESPONSE;
if (dest && dest->reachable&REACHABLE_UNICAST){
if (dest && (dest->reachable==REACHABLE_UNICAST || dest->reachable==REACHABLE_INDIRECT)){
// if we get a request from a peer that we can only talk to via unicast, send data via unicast too.
bcopy(dest->sid, reply.out.dst.sid, SID_SIZE);
}else{
@ -102,11 +105,8 @@ int rhizome_mdp_send_block(struct subscriber *dest, unsigned char *id, uint64_t
OUT();
}
int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp)
int overlay_mdp_service_rhizomerequest(struct overlay_frame *frame, overlay_mdp_frame *mdp)
{
if (!is_rhizome_mdp_server_running())
return -1;
uint64_t version=
read_uint64(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES]);
uint64_t fileOffset=
@ -116,23 +116,23 @@ int overlay_mdp_service_rhizomerequest(overlay_mdp_frame *mdp)
uint16_t blockLength=
read_uint16(&mdp->out.payload[RHIZOME_MANIFEST_ID_BYTES+8+8+4]);
struct subscriber *source = find_subscriber(mdp->out.src.sid, SID_SIZE, 0);
return rhizome_mdp_send_block(source, &mdp->out.payload[0], version, fileOffset, bitmap, blockLength);
return rhizome_mdp_send_block(frame->source, &mdp->out.payload[0], version, fileOffset, bitmap, blockLength);
}
int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp)
{
IN();
if (!mdp->out.payload_length) RETURN(-1);
if (!mdp->out.payload_length)
RETURN(WHYF("No payload?"));
int type=mdp->out.payload[0];
switch (type) {
case 'B': /* data block */
case 'T': /* terminal data block */
{
if (mdp->out.payload_length<(1+16+8+8+1)) RETURN(-1);
if (mdp->out.payload_length<(1+16+8+8+1))
RETURN(WHYF("Payload too short"));
unsigned char *bidprefix=&mdp->out.payload[1];
uint64_t version=read_uint64(&mdp->out.payload[1+16]);
uint64_t offset=read_uint64(&mdp->out.payload[1+16+8]);
@ -148,7 +148,7 @@ int overlay_mdp_service_rhizomeresponse(overlay_mdp_frame *mdp)
*/
rhizome_received_content(bidprefix,version,offset,count,bytes,type);
RETURN(-1);
RETURN(0);
}
break;
}
@ -344,7 +344,7 @@ end:
RETURN(ret);
}
static int overlay_mdp_service_manifest_response(overlay_mdp_frame *mdp){
static int overlay_mdp_service_manifest_requests(struct overlay_frame *frame, overlay_mdp_frame *mdp){
int offset=0;
char id_hex[RHIZOME_MANIFEST_ID_STRLEN];
@ -356,7 +356,13 @@ static int overlay_mdp_service_manifest_response(overlay_mdp_frame *mdp){
if (!m)
return WHY("Unable to allocate manifest");
if (!rhizome_retrieve_manifest(id_hex, m)){
rhizome_advertise_manifest(m);
rhizome_advertise_manifest(frame->source, m);
// pre-emptively send the payload if it will fit in a single packet
if (m->fileLength > 0 && m->fileLength <= 1024){
rhizome_mdp_send_block(frame->source, m->cryptoSignPublic, m->version,
0, 0, m->fileLength);
}
}
rhizome_manifest_free(m);
offset+=RHIZOME_BAR_BYTES;
@ -378,9 +384,9 @@ int overlay_mdp_try_interal_services(struct overlay_frame *frame, overlay_mdp_fr
case MDP_PORT_PROBE: RETURN(overlay_mdp_service_probe(frame, mdp));
case MDP_PORT_STUNREQ: RETURN(overlay_mdp_service_stun_req(mdp));
case MDP_PORT_STUN: RETURN(overlay_mdp_service_stun(mdp));
case MDP_PORT_RHIZOME_REQUEST: RETURN(overlay_mdp_service_rhizomerequest(mdp));
case MDP_PORT_RHIZOME_REQUEST: RETURN(overlay_mdp_service_rhizomerequest(frame, mdp));
case MDP_PORT_RHIZOME_RESPONSE: RETURN(overlay_mdp_service_rhizomeresponse(mdp));
case MDP_PORT_RHIZOME_MANIFEST_REQUEST: RETURN(overlay_mdp_service_manifest_response(mdp));
case MDP_PORT_RHIZOME_MANIFEST_REQUEST: RETURN(overlay_mdp_service_manifest_requests(frame, mdp));
case MDP_PORT_RHIZOME_SYNC: RETURN(overlay_mdp_service_rhizome_sync(frame, mdp));
}

View File

@ -335,7 +335,7 @@ int rhizome_list_manifests(struct cli_context *context, const char *service, con
const char *sender_sid, const char *recipient_sid,
int limit, int offset, char count_rows);
int rhizome_retrieve_manifest(const char *manifestid, rhizome_manifest *m);
int rhizome_advertise_manifest(rhizome_manifest *m);
int rhizome_advertise_manifest(struct subscriber *dest, rhizome_manifest *m);
int rhizome_delete_bundle(const char *manifestid);
int rhizome_delete_manifest(const char *manifestid);
int rhizome_delete_payload(const char *manifestid);
@ -706,6 +706,7 @@ int rhizome_write_file(struct rhizome_write *write, const char *filename);
int rhizome_fail_write(struct rhizome_write *write);
int rhizome_finish_write(struct rhizome_write *write);
int rhizome_import_file(rhizome_manifest *m, const char *filepath);
int rhizome_import_buffer(rhizome_manifest *m, unsigned char *buffer, int length);
int rhizome_stat_file(rhizome_manifest *m, const char *filepath);
int rhizome_add_file(rhizome_manifest *m, const char *filepath);
int rhizome_derive_key(rhizome_manifest *m, rhizome_bk_t *bsk);

View File

@ -226,6 +226,50 @@ static struct rhizome_fetch_slot *rhizome_find_fetch_slot(long long size)
return NULL;
}
// find the first matching active slot for this bundle
static struct rhizome_fetch_slot *fetch_search_slot(unsigned char *id, int prefix_length)
{
int i;
for (i = 0; i < NQUEUES; ++i) {
struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i];
if (q->active.state != RHIZOME_FETCH_FREE &&
memcmp(id, q->active.manifest->cryptoSignPublic, prefix_length) == 0)
return &q->active;
}
return NULL;
}
// find the first matching candidate for this bundle
static struct rhizome_fetch_candidate *fetch_search_candidate(unsigned char *id, int prefix_length)
{
int i, j;
for (i = 0; i < NQUEUES; ++i) {
struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i];
for (j = 0; j < q->candidate_queue_size; j++) {
struct rhizome_fetch_candidate *c = &q->candidate_queue[j];
if (!c->manifest)
continue;
if (memcmp(c->manifest->cryptoSignPublic, id, prefix_length))
continue;
return c;
}
}
return NULL;
}
/* Search all fetch slots, including active downloads, for a matching manifest */
rhizome_manifest * rhizome_fetch_search(unsigned char *id, int prefix_length){
struct rhizome_fetch_slot *s = fetch_search_slot(id, prefix_length);
if (s)
return s->manifest;
struct rhizome_fetch_candidate *c = fetch_search_candidate(id, prefix_length);
if (c)
return c->manifest;
return NULL;
}
/* Insert a candidate into a given queue at a given position. All candidates succeeding the given
* position are copied backward in the queue to open up an empty element at the given position. If
* the queue was full, then the tail element is discarded, freeing the manifest it points to.
@ -274,6 +318,19 @@ static void rhizome_fetch_unqueue(struct rhizome_fetch_queue *q, int i)
c->manifest = NULL;
}
static void candidate_unqueue(struct rhizome_fetch_candidate *c)
{
int i, index;
for (i = 0; i < NQUEUES; ++i) {
struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i];
index = c - q->candidate_queue;
if (index>=0 && index < q->candidate_queue_size){
rhizome_fetch_unqueue(q, index);
return;
}
}
}
/* Return true if there are any active fetches currently in progress.
*
* @author Andrew Bettison <andrew@servalproject.com>
@ -607,11 +664,10 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct
* This avoids the problem of indefinite postponement of fetching if new versions are constantly
* being published faster than we can fetch them.
*/
int i;
for (i = 0; i < NQUEUES; ++i) {
struct rhizome_fetch_slot *as = &rhizome_fetch_queues[i].active;
const rhizome_manifest *am = as->manifest;
if (as->state != RHIZOME_FETCH_FREE && memcmp(m->cryptoSignPublic, am->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES) == 0) {
{
struct rhizome_fetch_slot *as = fetch_search_slot(m->cryptoSignPublic, RHIZOME_MANIFEST_ID_BYTES);
if (as){
const rhizome_manifest *am = as->manifest;
if (am->version < m->version) {
if (config.debug.rhizome_rx)
DEBUGF(" fetch already in progress -- older version");
@ -626,6 +682,11 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct
RETURN(SAMEBUNDLE);
}
}
}
int i;
for (i = 0; i < NQUEUES; ++i) {
struct rhizome_fetch_slot *as = &rhizome_fetch_queues[i].active;
const rhizome_manifest *am = as->manifest;
if (as->state != RHIZOME_FETCH_FREE && strcasecmp(m->fileHexHash, am->fileHexHash) == 0) {
if (config.debug.rhizome_rx)
DEBUGF(" fetch already in progress, slot=%d filehash=%s", i, m->fileHexHash);
@ -747,26 +808,6 @@ static void rhizome_start_next_queued_fetches(struct sched_ent *alarm)
OUT();
}
/* Search all fetch slots, including active downloads, for a matching manifest */
rhizome_manifest * rhizome_fetch_search(unsigned char *id, int prefix_length){
int i, j;
for (i = 0; i < NQUEUES; ++i) {
struct rhizome_fetch_queue *q = &rhizome_fetch_queues[i];
if (q->active.state != RHIZOME_FETCH_FREE &&
memcmp(id, q->active.manifest->cryptoSignPublic, prefix_length) == 0)
return q->active.manifest;
for (j = 0; j < q->candidate_queue_size; j++) {
struct rhizome_fetch_candidate *c = &q->candidate_queue[j];
if (c->manifest && memcmp(id, c->manifest->cryptoSignPublic, prefix_length) == 0)
return c->manifest;
}
}
return NULL;
}
/* Do we have space to add a fetch candidate of this size? */
int rhizome_fetch_has_queue_space(unsigned char log2_size){
int i;
@ -1119,6 +1160,9 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
instances with the same SID.
*/
IN();
if (!is_rhizome_mdp_enabled()){
RETURN(rhizome_fetch_close(slot));
}
if (!my_subscriber) {
DEBUGF("I don't have an identity, so we cannot fall back to MDP");
RETURN(rhizome_fetch_close(slot));
@ -1312,15 +1356,11 @@ int rhizome_received_content(unsigned char *bidprefix,
int count,unsigned char *bytes,int type)
{
IN();
int i;
for(i=0;i<NQUEUES;i++) {
struct rhizome_fetch_slot *slot=&rhizome_fetch_queues[i].active;
if (slot->state!=RHIZOME_FETCH_RXFILEMDP
|| version != slot->bidVersion
|| memcmp(slot->bid,bidprefix,16)!=0)
continue;
if (!is_rhizome_mdp_enabled())
RETURN(-1);
struct rhizome_fetch_slot *slot=fetch_search_slot(bidprefix, 16);
if (slot && slot->bidVersion == version && slot->state == RHIZOME_FETCH_RXFILEMDP){
if (rhizome_random_write(&slot->write_state, offset, bytes, count)){
if (config.debug.rhizome)
DEBUGF("Write failed!");
@ -1332,6 +1372,7 @@ int rhizome_received_content(unsigned char *bidprefix,
DEBUGF("Complete failed!");
RETURN(-1);
}
slot->last_write_time=gettime_ms();
rhizome_fetch_mdp_touch_timeout(slot);
@ -1342,7 +1383,43 @@ int rhizome_received_content(unsigned char *bidprefix,
}
RETURN(0);
}
// if we get a packet containing an entire payload
// we may wish to store it, even if we aren't already fetching this payload via MDP
if (offset == 0){
rhizome_manifest *m = NULL;
struct rhizome_fetch_candidate *c = NULL;
if (slot && slot->bidVersion == version && slot->manifest->fileLength==count
&& slot->state!=RHIZOME_FETCH_RXFILEMDP){
m=slot->manifest;
}else{
slot = NULL;
c = fetch_search_candidate(bidprefix, 16);
if (c && c->manifest->version==version && c->manifest->fileLength==count)
m=c->manifest;
}
if (m){
// the rhizome store API doesn't support concurrent writing, so we have an active slot for this payload
// we need to close it first without freeing the manifest just yet.
if (slot && (slot->write_state.blob_fd>=0 ||
slot->write_state.blob_rowid>=0))
rhizome_fail_write(&slot->write_state);
if (rhizome_import_buffer(m, bytes, count)>=0 && !rhizome_import_received_bundle(m)){
INFOF("Completed MDP transfer in one hit for file %s", m->fileHexHash);
if (c)
candidate_unqueue(c);
}
if (slot)
rhizome_fetch_close(slot);
RETURN(0);
}
}
RETURN(-1);
OUT();
}

View File

@ -237,14 +237,17 @@ end:
#define HAS_MANIFESTS (1<<0)
/* Queue an advertisment for a single manifest */
int rhizome_advertise_manifest(rhizome_manifest *m){
int rhizome_advertise_manifest(struct subscriber *dest, rhizome_manifest *m){
struct overlay_frame *frame = malloc(sizeof(struct overlay_frame));
bzero(frame,sizeof(struct overlay_frame));
frame->type = OF_TYPE_RHIZOME_ADVERT;
frame->source = my_subscriber;
if (dest && (dest->reachable==REACHABLE_UNICAST || dest->reachable==REACHABLE_INDIRECT))
frame->destination = dest;
frame->ttl = 1;
frame->queue = OQ_OPPORTUNISTIC;
frame->payload = ob_new();
ob_limitsize(frame->payload, 800);
if (ob_append_byte(frame->payload, HAS_PORT|HAS_MANIFESTS)) goto error;

View File

@ -176,7 +176,7 @@ static int write_get_lock(struct rhizome_write *write_state){
return 0;
}
if (!sqlite_code_busy(ret))
return WHYF("sqlite3_blob_write() failed: %s",
return WHYF("sqlite3_blob_open() failed: %s",
sqlite3_errmsg(rhizome_db));
if (sqlite_retry(&retry, "sqlite3_blob_open")==0)
return WHYF("Giving up");
@ -384,7 +384,9 @@ int rhizome_write_file(struct rhizome_write *write, const char *filename){
unsigned char buffer[RHIZOME_CRYPT_PAGE_SIZE];
int ret=0;
write_get_lock(write);
ret = write_get_lock(write);
if (ret)
goto end;
while(write->file_offset < write->file_length){
int size=sizeof(buffer);
@ -476,16 +478,16 @@ int rhizome_finish_write(struct rhizome_write *write){
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;") == -1)
goto failure;
goto dbfailure;
if (write->id_known){
if (strcasecmp(write->id, hash_out)){
WHYF("Expected hash=%s, got %s", write->id, hash_out);
goto failure;
goto dbfailure;
}
if (sqlite_exec_void_retry(&retry, "UPDATE FILES SET inserttime=%lld, datavalid=1 WHERE id='%s'",
gettime_ms(), write->id) == -1)
goto failure;
goto dbfailure;
}else{
str_toupper_inplace(hash_out);
@ -500,22 +502,22 @@ int rhizome_finish_write(struct rhizome_write *write){
if (sqlite_exec_void_retry(&retry,
"UPDATE FILES SET id='%s', inserttime=%lld, datavalid=1 WHERE id='%s'",
hash_out, gettime_ms(), write->id) == -1)
goto failure;
goto dbfailure;
if (fd>=0){
char blob_path[1024];
char dest_path[1024];
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, write->id)){
WHYF("Failed to generate file path");
goto failure;
goto dbfailure;
}
if (!FORM_RHIZOME_DATASTORE_PATH(dest_path, hash_out)){
WHYF("Failed to generate file path");
goto failure;
goto dbfailure;
}
if (link(blob_path, dest_path)){
WHY_perror("link");
goto failure;
goto dbfailure;
}
if (unlink(blob_path))
@ -525,19 +527,20 @@ int rhizome_finish_write(struct rhizome_write *write){
if (sqlite_exec_void_retry(&retry,
"UPDATE FILEBLOBS SET id='%s' WHERE rowid=%lld",
hash_out, write->blob_rowid) == -1){
goto failure;
goto dbfailure;
}
}
}
strlcpy(write->id, hash_out, SHA512_DIGEST_STRING_LENGTH);
}
if (sqlite_exec_void_retry(&retry, "COMMIT;") == -1)
goto failure;
goto dbfailure;
write->blob_rowid=-1;
return 0;
failure:
dbfailure:
sqlite_exec_void_retry(&retry, "ROLLBACK;");
failure:
rhizome_fail_write(write);
return -1;
}
@ -570,6 +573,36 @@ int rhizome_import_file(rhizome_manifest *m, const char *filepath)
return 0;
}
// store a whole payload from a single buffer
int rhizome_import_buffer(rhizome_manifest *m, unsigned char *buffer, int length)
{
if (m->fileLength<=0)
return 0;
if (length!=m->fileLength)
return WHYF("Expected %"PRId64" bytes, got %d", m->fileLength, length);
/* Import the file first, checking the hash as we go */
struct rhizome_write write;
bzero(&write, sizeof(write));
int ret=rhizome_open_write(&write, m->fileHexHash, m->fileLength, RHIZOME_PRIORITY_DEFAULT);
if (ret!=0)
return ret;
// file payload is not in the store yet
if (rhizome_write_buffer(&write, buffer, length)){
rhizome_fail_write(&write);
return -1;
}
if (rhizome_finish_write(&write)){
rhizome_fail_write(&write);
return -1;
}
return 0;
}
int rhizome_stat_file(rhizome_manifest *m, const char *filepath)
{
long long existing = rhizome_manifest_get_ll(m, "filesize");
@ -939,7 +972,7 @@ int rhizome_read_cached(unsigned char *bundle_id, uint64_t version, time_ms_t ti
if (rhizome_open_read(&entry->read_state, filehash, 0)){
free(entry);
return -1;
return WHYF("Payload %s not found", filehash);
}
bcopy(bundle_id, entry->bundle_id, sizeof(entry->bundle_id));
entry->version = version;

View File

@ -85,7 +85,7 @@ doc_FileTransfer="New bundle and update transfer to one node"
setup_FileTransfer() {
setup_common
set_instance +A
rhizome_add_file file1
rhizome_add_file file1 2048
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
@ -142,7 +142,7 @@ doc_DisablingHTTPServer="Disabling HTTP rhizome transports works"
setup_DisablingHTTPServer() {
setup_common
set_instance +A
rhizome_add_file file1
rhizome_add_file file1 2048
executeOk_servald config set rhizome.http.enable 0
start_servald_instances +A
}
@ -157,7 +157,7 @@ setup_HTTPTransport() {
executeOk_servald config set rhizome.mdp.enable 0
set_instance +A
executeOk_servald config set rhizome.mdp.enable 0
rhizome_add_file file1
rhizome_add_file file1 2048
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
@ -173,7 +173,7 @@ setup_MDPTransport() {
executeOk_servald config \
set rhizome.http.enable 0
set_instance +A
rhizome_add_file file1
rhizome_add_file file1 2048
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
@ -189,7 +189,7 @@ setup_UnicastTransfer() {
executeOk_servald config \
set rhizome.http.enable 0
set_instance +A
rhizome_add_file file1
rhizome_add_file file1 2048
set_instance +B
executeOk_servald config \
set interfaces.1.file foo \
@ -209,8 +209,8 @@ setup_journalMDP() {
executeOk_servald config \
set rhizome.http.enable 0
set_instance +A
create_file file1 64
create_file file2 64
create_file file1 2048
create_file file2 2048
executeOk_servald rhizome journal append $SIDA "" file1
extract_stdout_manifestid BID
extract_stdout_version VERSION
@ -235,8 +235,8 @@ setup_journalHTTP() {
executeOk_servald config \
set rhizome.mdp.enable 0
set_instance +A
create_file file1 64
create_file file2 64
create_file file1 2048
create_file file2 2048
executeOk_servald rhizome journal append $SIDA "" file1
extract_stdout_manifestid BID
extract_stdout_version VERSION
@ -341,7 +341,7 @@ test_FileTransferBigHTTPExtBlob() {
# common setup and test routines for transfers to 4 nodes
setup_multitransfer_common() {
set_instance +A
rhizome_add_file file1
rhizome_add_file file1 2048
start_servald_instances +A +B +C +D +E
set_instance +A
assert_peers_are_instances +B +C +D +E