added cursor limit pickling, restoration functions. cursor

functions now try to honour these. Not yet tested. #9
This commit is contained in:
gardners 2012-09-09 08:30:38 +09:30
parent 53228f078e
commit 0207d8e541
2 changed files with 135 additions and 21 deletions

View File

@ -389,6 +389,14 @@ int rhizome_http_server_start(int (*http_parse_func)(rhizome_http_request *),
int port_low,int port_high);
typedef struct rhizome_direct_bundle_cursor {
/* Where the current fill started */
long long start_size_high;
unsigned char start_bid_low[RHIZOME_MANIFEST_ID_BYTES];
/* Limit of where this cursor may traverse */
long long limit_size_high;
unsigned char limit_bid_high[RHIZOME_MANIFEST_ID_BYTES];
long long size_low;
long long size_high;
unsigned char bid_low[RHIZOME_MANIFEST_ID_BYTES];
@ -399,12 +407,20 @@ typedef struct rhizome_direct_bundle_cursor {
} rhizome_direct_bundle_cursor;
rhizome_direct_bundle_cursor *rhizome_direct_bundle_iterator(int buffer_size);
void rhizome_direct_bundle_iterator_unlimit(rhizome_direct_bundle_cursor *r);
int rhizome_direct_bundle_iterator_pickle_range(rhizome_direct_bundle_cursor *r,
unsigned char *pickled,
int pickle_buffer_size);
int rhizome_direct_bundle_iterator_unpickle_range(rhizome_direct_bundle_cursor *r,
const unsigned char *pickled,
int pickle_buffer_size);
int rhizome_direct_bundle_iterator_fill(rhizome_direct_bundle_cursor *c,
int max_bars);
void rhizome_direct_bundle_iterator_free(rhizome_direct_bundle_cursor **c);
int rhizome_direct_get_bars(const unsigned char bid_low[RHIZOME_MANIFEST_ID_BYTES],
unsigned char bid_high[RHIZOME_MANIFEST_ID_BYTES],
long long size_low,long long size_high,
const unsigned char bid_max[RHIZOME_MANIFEST_ID_BYTES],
unsigned char *bars_out,
int bars_requested);
int rhizome_direct_process_post_multipart_bytes

View File

@ -584,6 +584,19 @@ int app_rhizome_direct_sync(int argc, const char *const *argv,
{
DEBUGF("Got %d BARs",count);
dump("BARs",c->buffer,c->buffer_used);
/* Build HTTP POST to send to far end presenting these BARs and seeking
feedback from the far end as to which are new, or if the far end has
new content that we do not.
The POST needs to contain the {bundle-size,BID} range that this fill
covers.
XXX - Update cursor function to explicitly provide that information.
Really just needs to remember where the cursor was at the start of a
fill. The position of the cursor at the end of a fill is already
remembered.
*/
}
rhizome_direct_bundle_iterator_free(&c);
@ -600,27 +613,125 @@ rhizome_direct_bundle_cursor *rhizome_direct_bundle_iterator(int buffer_size)
r->size_low=0;
r->size_high=1024;
/* Make cursor initially unlimited in range */
rhizome_direct_bundle_iterator_unlimit(r);
return r;
}
void rhizome_direct_bundle_iterator_unlimit(rhizome_direct_bundle_cursor *r)
{
assert(r!=NULL);
r->limit_size_high=1LL<<48LL;
memset(r->limit_bid_high,0xff,RHIZOME_MANIFEST_ID_BYTES);
return;
}
int rhizome_direct_bundle_iterator_pickle_range(rhizome_direct_bundle_cursor *r,
unsigned char *pickled,
int pickle_buffer_size)
{
assert(r);
assert(pickle_buffer_size>=(1+4+1+4));
/* Pickled cursor ranges use the format:
byte - log2(start_size_high)
4 bytes - first eight bytes of start_bid_low.
byte - log2(size_high)
4 bytes - first eight bytes of bid_high.
For a total of 10 bytes.
We can get away with the short prefixes for the BIDs, because the worst case
scenario is that we include a small part of the BID address space that we
don't need to. That will happen MUCH less often than transferring cursor
ranges, which will happen with every rhizome direct sync.
*/
int v;
int ltwov=0;
v=r->start_size_high;
while(v) { ltwov++; v=v>>1; }
pickled[0]=ltwov;
for(v=0;v<4;v++) pickled[1+v]=r->start_bid_low[v];
v=r->size_high;
while(v) { ltwov++; v=v>>1; }
pickled[1+4]=ltwov;
for(v=0;v<4;v++) pickled[1+4+1+v]=r->bid_high[v];
return 1+4+1+4;
}
int rhizome_direct_bundle_iterator_unpickle_range(rhizome_direct_bundle_cursor *r,
const unsigned char *pickled,
int pickle_buffer_size)
{
assert(r);
if (pickle_buffer_size!=10) {
DEBUGF("pickled rhizome direct cursor ranges should be 10 bytes.");
return -1;
}
int v;
/* Get start of range */
r->size_high=1LL<<pickled[0];
r->size_low=(r->size_high/2)+1;
for(v=0;v<4;v++) r->bid_low[v]=pickled[1+v];
for(;v<RHIZOME_MANIFEST_ID_BYTES;v++) r->bid_low[v]=0x00;
/* Get end of range */
r->limit_size_high=1LL<<pickled[1+4];
for(v=0;v<4;v++) r->limit_bid_high[v]=pickled[1+4+1+v];
for(;v<RHIZOME_MANIFEST_ID_BYTES;v++) r->limit_bid_high[v]=0xff;
return 0;
}
int rhizome_direct_bundle_iterator_fill(rhizome_direct_bundle_cursor *c,int max_bars)
{
int bundles_stuffed=0;
c->buffer_used=0;
/* Note where we are starting the cursor fill from, so that the caller can easily
communicate the range of interest to the far end. We will eventually have a
cursor set function that will allow that information to be loaded back in at
the far end. We will similarly need to have a mechanism to limit the end of
the range that the cursor will cover, so that responses to the exact range
covered can be provided.. But first things first, remembering where the cursor
started.
*/
/* This is the only information required to remember where we started: */
c->start_size_high=c->size_high;
bcopy(c->bid_low,c->start_bid_low,RHIZOME_MANIFEST_ID_BYTES);
/* -1 is magic value for fill right up */
if (max_bars==-1) max_bars=c->buffer_size/RHIZOME_BAR_BYTES;
while (bundles_stuffed<max_bars
/* for now no bundle file can be bigger than 1TB */
&&c->size_low<0x100000000000LL)
while (bundles_stuffed<max_bars&&c->size_high<=c->limit_size_high)
{
/* Don't overrun the cursor's buffer */
int stuffable=(c->buffer_size-c->buffer_used)/RHIZOME_BAR_BYTES;
if (stuffable<=0) break;
/* Make sure we only get the range of BIDs allowed by the cursor limit.
If we are not yet at the bundle data size limit, then any bundle is okay.
If we are at the bundle data size limit, then we need to honour
c->limit_bid_high. */
unsigned char bid_max[RHIZOME_MANIFEST_ID_BYTES];
if (c->size_high==c->limit_size_high)
bcopy(c->limit_bid_high,bid_max,RHIZOME_MANIFEST_ID_BYTES);
else
memset(bid_max,0xff,RHIZOME_MANIFEST_ID_BYTES);
int stuffed_now=rhizome_direct_get_bars(c->bid_low,c->bid_high,
c->size_low,c->size_high,
bid_max,
&c->buffer[c->buffer_used],
stuffable);
bundles_stuffed+=stuffed_now;
@ -670,32 +781,19 @@ void rhizome_direct_bundle_iterator_free(rhizome_direct_bundle_cursor **c)
int rhizome_direct_get_bars(const unsigned char bid_low[RHIZOME_MANIFEST_ID_BYTES],
unsigned char bid_high[RHIZOME_MANIFEST_ID_BYTES],
long long size_low,long long size_high,
const unsigned char bid_max[RHIZOME_MANIFEST_ID_BYTES],
unsigned char *bars_out,
int bars_requested)
{
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
/* Get number of bundles available if required */
char query[1024];
snprintf(query,1024,
"SELECT COUNT(BAR) FROM MANIFESTS"
" WHERE ID>='%s' AND FILESIZE BETWEEN %lld AND %lld ORDER BY BAR LIMIT %d;",
alloca_tohex(bid_low,RHIZOME_MANIFEST_ID_BYTES),
size_low,size_high,
bars_requested);
long long tmp = 0;
if (sqlite_exec_int64_retry(&retry, &tmp, query) != 1)
{ return(WHY("Could not count BARs for advertisement")); }
int bundles_available = (int) tmp;
if(1)
DEBUGF("%d matching bundles in database.",bundles_available);
snprintf(query,1024,
"SELECT BAR,ROWID,ID FROM MANIFESTS"
" WHERE ID>='%s' AND FILESIZE BETWEEN %lld AND %lld"
" WHERE ID>='%s' AND ID<='%s' AND FILESIZE BETWEEN %lld AND %lld"
" ORDER BY BAR LIMIT %d;",
alloca_tohex(bid_low,RHIZOME_MANIFEST_ID_BYTES),
alloca_tohex(bid_max,RHIZOME_MANIFEST_ID_BYTES),
size_low,size_high,
bars_requested);