further work on rhizome direct dispatching etc.

For some reason finds the same manifest several times (size bin
filtering seems to not be working right).
Also sync doesn't realise it has finished, and so doesn't return
when done.
This commit is contained in:
gardners 2012-09-10 09:16:27 +09:30
parent d796a482b7
commit dd01f7939d
3 changed files with 212 additions and 25 deletions

View File

@ -427,5 +427,55 @@ int rhizome_direct_get_bars(const unsigned char bid_low[RHIZOME_MANIFEST_ID_BYTE
int rhizome_direct_process_post_multipart_bytes
(rhizome_http_request *r,const char *bytes,int count);
typedef struct rhizome_direct_sync_request {
struct sched_ent alarm;
rhizome_direct_bundle_cursor *cursor;
int pushP;
int pullP;
/* Sync interval in seconds. zero = sync only once */
int interval;
/* The dispatch function will be called each time a sync request can
be sent off, i.e., one cursor->buffer full of data.
Will differ based on underlying transport. HTTP is the initial
supported transport, but deLorme inReach will likely follow soon after.
*/
void (*dispatch_function)(struct rhizome_direct_sync_request *);
/* General purpose pointer for transport-dependent state */
void *transport_specific_state;
/* Statistics.
Each sync will consist of one or more "fills" of the cursor buffer, which
will then be dispatched by the transport-specific dispatch function.
Each of those dispatches may then result in zero or
*/
int syncs_started;
int syncs_completed;
int fills_sent;
int fill_responses_processed;
int bundles_pushed;
int bundles_pulled;
int bundle_transfers_in_progress;
} rhizome_direct_sync_request;
#define RHIZOME_DIRECT_MAX_SYNC_HANDLES 16
extern rhizome_direct_sync_request *rd_sync_handles[RHIZOME_DIRECT_MAX_SYNC_HANDLES];
extern int rd_sync_handle_count;
rhizome_direct_sync_request
*rhizome_direct_new_sync_request(
void (*transport_specific_dispatch_function)
(struct rhizome_direct_sync_request *),
int buffer_size,int interval, int mode,
void *transport_specific_state);
int rhizome_direct_continue_sync_request(rhizome_direct_sync_request *r);
int rhizome_direct_conclude_sync_request(rhizome_direct_sync_request *r);
void rhizome_direct_http_dispatch(rhizome_direct_sync_request *);
extern unsigned char favicon_bytes[];
extern int favicon_len;

View File

@ -112,6 +112,141 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "str.h"
#include <assert.h>
rhizome_direct_sync_request *rd_sync_handles[RHIZOME_DIRECT_MAX_SYNC_HANDLES];
int rd_sync_handle_count=0;
/* Create (but don't start) a rhizome direct sync request.
This creates the record to say that we want to undertake this synchronisation,
either once or at intervals as specified.
The start process actually triggers the first filling of a cursor buffer, and
then calls the transport specific dispatch function. The transport specific
dispatch function is expected to be asynchronous, and to call the continue
process.
The transport specific dispatch function is also expected to tell rhizome
direct about which bundles to send or receive, or to fetch/push them itself.
For IP-based transports, the built-in http transport will be suitable in
many cases. For non-IP transports the transport will have to take care of
the bundle transport as well.
*/
rhizome_direct_sync_request
*rhizome_direct_new_sync_request(
void (*transport_specific_dispatch_function)
(struct rhizome_direct_sync_request *),
int buffer_size,int interval, int mode, void *state)
{
assert(mode&3);
if (rd_sync_handle_count>=RHIZOME_DIRECT_MAX_SYNC_HANDLES)
{
DEBUGF("Too many Rhizome Direct synchronisation policies.");
return NULL;
}
rhizome_direct_sync_request *r=calloc(sizeof(rhizome_direct_sync_request),1);
assert(r!=NULL);
r->dispatch_function=transport_specific_dispatch_function;
r->transport_specific_state=state;
r->pushP=mode&1;
r->pullP=mode&2;
r->interval=interval;
r->cursor=rhizome_direct_bundle_iterator(buffer_size);
assert(r->cursor);
rd_sync_handles[rd_sync_handle_count++]=r;
return r;
}
/*
Initiate a synchronisation episode.
*/
int rhizome_direct_start_sync_request(rhizome_direct_sync_request *r)
{
assert(r);
assert(r->syncs_started==r->syncs_completed);
r->syncs_started++;
return rhizome_direct_continue_sync_request(r);
}
int rhizome_direct_continue_sync_request(rhizome_direct_sync_request *r)
{
assert(r);
assert(r->syncs_started==r->syncs_completed+1);
/* We might not get any BARs in the final fill, but it doesn't mean that
this cursor fill didn't cover a part of the BAR address space, so we
still have to send it.
We detect completion solely by whether on entering the call we have no
more BAR address space or bundle data size bin space left to explore.
In short, if the cursor's current position is the limit position,
then we can stop.
*/
if (r->cursor->size_high>=r->cursor->limit_size_high)
{
if (memcmp(r->cursor->bid_low,r->cursor->limit_bid_high,
RHIZOME_MANIFEST_ID_BYTES)>=0)
{
/* Sync has finished.
The transport may have initiated one or more transfers, so
we cannot declare the sync complete until we know the transport
has finished transferring. */
if (!r->bundle_transfers_in_progress)
{
/* seems that all is done */
return rhizome_direct_conclude_sync_request(r);
}
} else
DEBUGF("bid_low<limit_bid_high");
}
int count=rhizome_direct_bundle_iterator_fill(r->cursor,-1);
DEBUGF("Got %d BARs",count);
dump("BARs",r->cursor->buffer,
r->cursor->buffer_used+r->cursor->buffer_offset_bytes);
r->dispatch_function(r);
r->fills_sent++;
return count;
}
int rhizome_direct_conclude_sync_request(rhizome_direct_sync_request *r)
{
assert(r);
r->syncs_completed++;
/* reschedule if interval driven?
if one-shot, should we remove from the list of active sync requests?
*/
if (r->interval==0) {
int i;
for(i=0;i<rd_sync_handle_count;i++)
if (r==rd_sync_handles[i])
{
rhizome_direct_bundle_iterator_free(&r->cursor);
free(r);
if (i!=rd_sync_handle_count-1)
rd_sync_handles[i]=rd_sync_handles[rd_sync_handle_count-1];
rd_sync_handle_count--;
return 0;
}
DEBUGF("Couldn't find sync request handle in list.");
return -1;
}
return 0;
}
int app_rhizome_direct_sync(int argc, const char *const *argv,
struct command_line_option *o)
{
@ -124,33 +259,18 @@ int app_rhizome_direct_sync(int argc, const char *const *argv,
DEBUGF("sync direction = %d",mode);
/* Get iterator capable of 64KB buffering */
rhizome_direct_bundle_cursor *c=rhizome_direct_bundle_iterator(0x10000);
int count;
while((count=rhizome_direct_bundle_iterator_fill(c,-1)))
{
DEBUGF("Got %d BARs",count);
dump("BARs",c->buffer,c->buffer_used+c->buffer_offset_bytes);
/* Get iterator capable of 64KB buffering.
In future we should parse the sync URL and base the buffer size on the
transport and allowable traffic volumes. */
rhizome_direct_sync_request
*s = rhizome_direct_new_sync_request(rhizome_direct_http_dispatch,
65536,0,mode,NULL);
/* Build HTTP POST to send to far end presenting these BARs and seeking
feedback from the far end as to which are new, or if the far end has
new content that we do not.
rhizome_direct_start_sync_request(s);
The iterator prepares the buffer entirely, including putting the cursor
range covered, so that the far end can unpack it, search their corresponding
space and return their results.
*/
rhizome_direct_bundle_cursor *t=rhizome_direct_bundle_iterator(0x10000);
rhizome_direct_bundle_iterator_unpickle_range(t,c->buffer,10);
DEBUGF("Unpickled: size bins=%lld..%lld, %08x - %08x",
t->size_high,t->limit_size_high,
*(int*)&t->bid_low[0],
*(int*)&t->limit_bid_high[0]);
rhizome_direct_bundle_iterator_free(&t);
}
rhizome_direct_bundle_iterator_free(&c);
while(fd_poll()&&(rd_sync_handle_count>0)) continue;
return -1;
return 0;
}
rhizome_direct_bundle_cursor *rhizome_direct_bundle_iterator(int buffer_size)
@ -211,6 +331,7 @@ int rhizome_direct_bundle_iterator_pickle_range(rhizome_direct_bundle_cursor *r,
for(v=0;v<4;v++) pickled[1+v]=r->start_bid_low[v];
v=r->size_high;
DEBUGF("pickling size_high=%lld",r->size_high);
ltwov=0;
while(v>1) { ltwov++; v=v>>1; }
pickled[1+4]=ltwov;
for(v=0;v<4;v++) pickled[1+4+1+v]=r->bid_high[v];
@ -294,12 +415,18 @@ int rhizome_direct_bundle_iterator_fill(rhizome_direct_bundle_cursor *c,int max_
c->buffer_used+=RHIZOME_BAR_BYTES*stuffed_now;
if (!stuffed_now) {
/* no more matches in this size bin, so move up a size bin */
DEBUGF("Continue from next size bin");
c->size_low=c->size_high+1;
c->size_high*=2;
/* Record that we covered to the end of that size bin */
memset(c->bid_high,0xff,RHIZOME_MANIFEST_ID_BYTES);
if (c->size_high>c->limit_size_high)
memset(c->bid_low,0xff,RHIZOME_MANIFEST_ID_BYTES);
else
memset(c->bid_low,0x00,RHIZOME_MANIFEST_ID_BYTES);
} else {
/* Continue from next BID */
DEBUGF("Continue from next BID");
bcopy(c->bid_high,c->bid_low,RHIZOME_MANIFEST_ID_BYTES);
int i;
for(i=RHIZOME_BAR_BYTES-1;i>=0;i--)

View File

@ -476,3 +476,13 @@ int rhizome_direct_parse_http_request(rhizome_http_request *r)
return 0;
}
void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
{
DEBUGF("Dispatch size_high=%lld",r->cursor->size_high);
/* Warning: tail recursion when done this way.
Should be triggered by an asynchronous event.
But this will do for now. */
rhizome_direct_continue_sync_request(r);
}