Cause rhizome add to poke the daemon and trigger all bundle add notifications

This commit is contained in:
Jeremy Lakeman 2017-03-06 14:01:14 +10:30
parent c739555e8c
commit 2540c9e633
6 changed files with 147 additions and 82 deletions

View File

@ -102,6 +102,9 @@ struct mdp_identity_request {
#define MDP_ROUTE_TABLE 5 #define MDP_ROUTE_TABLE 5
/* Tell the daemon to check for new manifests after adding a bundle from any other process */
#define MDP_SYNC_RHIZOME 6
struct overlay_mdp_scan{ struct overlay_mdp_scan{
struct in_addr addr; struct in_addr addr;
}; };

View File

@ -1569,6 +1569,11 @@ static void mdp_process_packet(struct socket_address *client, struct mdp_header
server_config_reload(NULL); server_config_reload(NULL);
mdp_reply_ok(client, header); mdp_reply_ok(client, header);
break; break;
case MDP_SYNC_RHIZOME:
DEBUGF(mdprequests, "Processing MDP_SYNC_RHIZOME from %s", alloca_socket_address(client));
server_rhizome_add_bundle(INT64_MAX);
mdp_reply_ok(client, header);
break;
case MDP_INTERFACE: case MDP_INTERFACE:
DEBUGF(mdprequests, "Processing MDP_INTERFACE from %s", alloca_socket_address(client)); DEBUGF(mdprequests, "Processing MDP_INTERFACE from %s", alloca_socket_address(client));
mdp_interface_packet(client, header, payload); mdp_interface_packet(client, header, payload);

View File

@ -31,6 +31,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "keyring.h" #include "keyring.h"
#include "server.h" #include "server.h"
#include "commandline.h" #include "commandline.h"
#include "mdp_client.h"
static int rhizome_delete_manifest_retry(sqlite_retry_state *retry, const rhizome_bid_t *bidp); static int rhizome_delete_manifest_retry(sqlite_retry_state *retry, const rhizome_bid_t *bidp);
@ -60,6 +61,7 @@ int is_debug_rhizome_ads()
static int (*sqlite_trace_func)() = is_debug_rhizome; static int (*sqlite_trace_func)() = is_debug_rhizome;
const struct __sourceloc *sqlite_trace_whence = NULL; const struct __sourceloc *sqlite_trace_whence = NULL;
static int sqlite_trace_done; static int sqlite_trace_done;
static uint64_t max_rowid=0;
/* This callback conditionally logs all rendered SQL statements. This function is registered with /* This callback conditionally logs all rendered SQL statements. This function is registered with
* SQLite as the 'trace callback'. SQLite invokes it with mask == SQLITE_TRACE_STMT when about to * SQLite as the 'trace callback'. SQLite invokes it with mask == SQLITE_TRACE_STMT when about to
@ -370,6 +372,13 @@ int rhizome_opendb()
// We can't delete a file that is being transferred in another process at this very moment... // We can't delete a file that is being transferred in another process at this very moment...
if (config.rhizome.clean_on_open) if (config.rhizome.clean_on_open)
rhizome_cleanup(NULL); rhizome_cleanup(NULL);
if (serverMode){
sqlite_exec_uint64_retry(&retry, &max_rowid,
"SELECT max(rowid) "
"FROM manifests", END);
}
INFOF("Opened Rhizome database %s, UUID=%s", dbpath, alloca_uuid_str(rhizome_db_uuid)); INFOF("Opened Rhizome database %s, UUID=%s", dbpath, alloca_uuid_str(rhizome_db_uuid));
RETURN(0); RETURN(0);
OUT(); OUT();
@ -1312,6 +1321,40 @@ int rhizome_cleanup(struct rhizome_cleanup_report *report)
OUT(); OUT();
} }
static void sync_rhizome(){
if (server_pid()<=0)
return;
/* Bind to MDP socket and await confirmation */
struct mdp_header mdp_header = {
.remote.port = MDP_SYNC_RHIZOME,
};
int mdpsock = mdp_socket();
if (mdpsock == -1)
WARN("cannot create MDP socket");
set_nonblock(mdpsock);
int r = mdp_send(mdpsock, &mdp_header, NULL, 0);
if (r == -1)
goto end;
time_ms_t deadline = gettime_ms() + 10000; // TODO add --timeout option?
struct mdp_header rev_header;
do {
ssize_t len = mdp_poll_recv(mdpsock, deadline, &rev_header, NULL, 0);
if (len == -1){
r = -1;
goto end;
}
if (len == -2) {
WHYF("timeout while synchronising daemon bundle list");
r = -1;
goto end;
}
} while (!(rev_header.flags & MDP_FLAG_CLOSE));
r = 0;
end:
mdp_close(mdpsock);
}
/* /*
Store the specified manifest into the sqlite database. Store the specified manifest into the sqlite database.
We assume that sufficient space has been made for us. We assume that sufficient space has been made for us.
@ -1409,8 +1452,18 @@ int rhizome_store_manifest(rhizome_manifest *m)
alloca_tohex_rhizome_bid_t(m->keypair.public_key), alloca_tohex_rhizome_bid_t(m->keypair.public_key),
m->version m->version
); );
if (serverMode) if (serverMode){
assert(max_rowid < m->rowid);
// detect any bundles added by the CLI
// due to potential race conditions, we have to do this here
// even though the CLI will try to send us a MDP_SYNC_RHIZOME message
if (m->rowid > max_rowid+1)
server_rhizome_add_bundle(m->rowid);
max_rowid = m->rowid;
CALL_TRIGGER(bundle_add, m); CALL_TRIGGER(bundle_add, m);
}else{
sync_rhizome();
}
return 0; return 0;
} }
rollback: rollback:
@ -1710,16 +1763,7 @@ next:
* Returns RHIZOME_BUNDLE_STATUS_BUSY if the database is locked * Returns RHIZOME_BUNDLE_STATUS_BUSY if the database is locked
* Caller is responsible for allocating and freeing rhizome_manifest * Caller is responsible for allocating and freeing rhizome_manifest
*/ */
static enum rhizome_bundle_status unpack_manifest_row(sqlite_retry_state *retry, rhizome_manifest *m, sqlite3_stmt *statement) static int unpack_manifest_row(rhizome_manifest *m, sqlite3_stmt *statement){
{
int r=sqlite_step_retry(retry, statement);
if (sqlite_code_busy(r))
return RHIZOME_BUNDLE_STATUS_BUSY;
if (!sqlite_code_ok(r))
return RHIZOME_BUNDLE_STATUS_ERROR;
if (r!=SQLITE_ROW)
return RHIZOME_BUNDLE_STATUS_NEW;
const char *q_id = (const char *) sqlite3_column_text(statement, 0); const char *q_id = (const char *) sqlite3_column_text(statement, 0);
const char *q_blob = (char *) sqlite3_column_blob(statement, 1); const char *q_blob = (char *) sqlite3_column_blob(statement, 1);
uint64_t q_version = sqlite3_column_int64(statement, 2); uint64_t q_version = sqlite3_column_int64(statement, 2);
@ -1742,6 +1786,21 @@ static enum rhizome_bundle_status unpack_manifest_row(sqlite_retry_state *retry,
WARNF("Version mismatch, manifest is %"PRIu64", database is %"PRIu64, m->version, q_version); WARNF("Version mismatch, manifest is %"PRIu64", database is %"PRIu64, m->version, q_version);
rhizome_manifest_set_rowid(m, q_rowid); rhizome_manifest_set_rowid(m, q_rowid);
rhizome_manifest_set_inserttime(m, q_inserttime); rhizome_manifest_set_inserttime(m, q_inserttime);
return 0;
}
static enum rhizome_bundle_status step_unpack_manifest_row(sqlite_retry_state *retry, rhizome_manifest *m, sqlite3_stmt *statement)
{
int r=sqlite_step_retry(retry, statement);
if (sqlite_code_busy(r))
return RHIZOME_BUNDLE_STATUS_BUSY;
if (!sqlite_code_ok(r))
return RHIZOME_BUNDLE_STATUS_ERROR;
if (r!=SQLITE_ROW)
return RHIZOME_BUNDLE_STATUS_NEW;
if (unpack_manifest_row(m, statement)==-1)
return RHIZOME_BUNDLE_STATUS_ERROR;
return RHIZOME_BUNDLE_STATUS_SAME; return RHIZOME_BUNDLE_STATUS_SAME;
} }
@ -1763,7 +1822,7 @@ enum rhizome_bundle_status rhizome_retrieve_manifest(const rhizome_bid_t *bidp,
END); END);
if (!statement) if (!statement)
return RHIZOME_BUNDLE_STATUS_ERROR; return RHIZOME_BUNDLE_STATUS_ERROR;
enum rhizome_bundle_status ret = unpack_manifest_row(&retry, m, statement); enum rhizome_bundle_status ret = step_unpack_manifest_row(&retry, m, statement);
sqlite3_finalize(statement); sqlite3_finalize(statement);
return ret; return ret;
} }
@ -1790,7 +1849,7 @@ enum rhizome_bundle_status rhizome_retrieve_manifest_by_prefix(const unsigned ch
END); END);
if (!statement) if (!statement)
return RHIZOME_BUNDLE_STATUS_ERROR; return RHIZOME_BUNDLE_STATUS_ERROR;
enum rhizome_bundle_status ret = unpack_manifest_row(&retry, m, statement); enum rhizome_bundle_status ret = step_unpack_manifest_row(&retry, m, statement);
sqlite3_finalize(statement); sqlite3_finalize(statement);
return ret; return ret;
} }
@ -1809,7 +1868,7 @@ enum rhizome_bundle_status rhizome_retrieve_manifest_by_hash_prefix(const uint8_
END); END);
if (!statement) if (!statement)
return RHIZOME_BUNDLE_STATUS_ERROR; return RHIZOME_BUNDLE_STATUS_ERROR;
enum rhizome_bundle_status ret = unpack_manifest_row(&retry, m, statement); enum rhizome_bundle_status ret = step_unpack_manifest_row(&retry, m, statement);
sqlite3_finalize(statement); sqlite3_finalize(statement);
return ret; return ret;
} }
@ -1860,6 +1919,30 @@ end:
return ret; return ret;
} }
// Detect bundles added from the cmdline, and call trigger functions
void server_rhizome_add_bundle(uint64_t rowid){
assert(serverMode);
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare_bind(&retry,
"SELECT id, manifest, version, inserttime, author, rowid FROM manifests WHERE rowid > ? AND rowid < ?"
"ORDER BY rowid",
INT64, max_rowid, INT64, rowid, END);
while (sqlite_step_retry(&retry, statement) == SQLITE_ROW) {
rhizome_manifest *m = rhizome_new_manifest();
if (!m)
break;
if (unpack_manifest_row(m, statement)!=-1){
if (rhizome_manifest_verify(m)){
assert(max_rowid < m->rowid);
max_rowid = m->rowid;
CALL_TRIGGER(bundle_add, m);
}
}
rhizome_manifest_free(m);
}
sqlite3_finalize(statement);
}
static int rhizome_delete_manifest_retry(sqlite_retry_state *retry, const rhizome_bid_t *bidp) static int rhizome_delete_manifest_retry(sqlite_retry_state *retry, const rhizome_bid_t *bidp)
{ {
sqlite3_stmt *statement = sqlite_prepare_bind(retry, sqlite3_stmt *statement = sqlite_prepare_bind(retry,

View File

@ -65,8 +65,6 @@ struct rhizome_sync
int bar_count; int bar_count;
}; };
static uint64_t max_token=0;
DEFINE_ALARM(rhizome_sync_announce); DEFINE_ALARM(rhizome_sync_announce);
void rhizome_sync_status_html(struct strbuf *b, struct subscriber *subscriber) void rhizome_sync_status_html(struct strbuf *b, struct subscriber *subscriber)
@ -252,45 +250,8 @@ static int sync_bundle_inserted(void **record, void *context)
return 0; return 0;
} }
static void annouce_cli_bundle_add(uint64_t row_id)
{
if (row_id<=max_token)
return;
if (max_token!=0){
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
sqlite3_stmt *statement = sqlite_prepare_bind(&retry,
"SELECT manifest FROM manifests WHERE rowid > ? AND rowid <= ? ORDER BY rowid ASC;",
INT64, max_token, INT64, row_id, END);
while (sqlite_step_retry(&retry, statement) == SQLITE_ROW) {
const void *blob = sqlite3_column_blob(statement, 0);
size_t blob_length = sqlite3_column_bytes(statement, 0);
rhizome_manifest *m = rhizome_new_manifest();
if (m) {
memcpy(m->manifestdata, blob, blob_length);
m->manifest_all_bytes = blob_length;
if ( rhizome_manifest_parse(m) != -1
&& rhizome_manifest_validate(m)
&& rhizome_manifest_verify(m)
) {
assert(m->finalised);
CALL_TRIGGER(bundle_add, m);
}
rhizome_manifest_free(m);
}
}
sqlite3_finalize(statement);
}
max_token = row_id;
}
static void rhizome_sync_bundle_inserted(rhizome_manifest *m) static void rhizome_sync_bundle_inserted(rhizome_manifest *m)
{ {
annouce_cli_bundle_add(m->rowid - 1);
if (m->rowid > max_token)
max_token = m->rowid;
rhizome_bar_t bar; rhizome_bar_t bar;
rhizome_manifest_to_bar(m, &bar); rhizome_manifest_to_bar(m, &bar);
enum_subscribers(NULL, sync_bundle_inserted, (void *)&bar); enum_subscribers(NULL, sync_bundle_inserted, (void *)&bar);
@ -311,20 +272,25 @@ static int sync_cache_bar(struct rhizome_sync *state, const rhizome_bar_t *bar,
if (state->bar_count>=CACHE_BARS) if (state->bar_count>=CACHE_BARS)
return 0; return 0;
// check the database before adding the BAR to the list // check the database before adding the BAR to the list
if (token!=0 && rhizome_is_bar_interesting(bar)!=0){ if (token!=0){
if (!state->bars){ enum rhizome_bundle_status status = rhizome_is_bar_interesting(bar);
state->bars = emalloc(sizeof(struct bar_entry) * CACHE_BARS); if (status == RHIZOME_BUNDLE_STATUS_NEW){
if (!state->bars) if (!state->bars){
return -1; state->bars = emalloc(sizeof(struct bar_entry) * CACHE_BARS);
if (!state->bars)
return -1;
}
DEBUGF(rhizome_sync, "Remembering BAR %s", alloca_tohex_rhizome_bar_t(bar));
state->bars[state->bar_count].bar = *bar;
state->bars[state->bar_count].next_request = gettime_ms();
state->bars[state->bar_count].tries = MAX_TRIES;
state->bar_count++;
ret=1;
}else if(status != RHIZOME_BUNDLE_STATUS_SAME){
return -1;
} }
DEBUGF(rhizome_sync, "Remembering BAR %s", alloca_tohex_rhizome_bar_t(bar));
state->bars[state->bar_count].bar = *bar;
state->bars[state->bar_count].next_request = gettime_ms();
state->bars[state->bar_count].tries = MAX_TRIES;
state->bar_count++;
ret=1;
} }
if (state->sync_end < token){ if (state->sync_end < token){
state->sync_end = token; state->sync_end = token;
@ -490,7 +456,7 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
ob_append_byte(b, MSG_TYPE_BARS); ob_append_byte(b, MSG_TYPE_BARS);
ob_checkpoint(b); ob_checkpoint(b);
while(sqlite_step_retry(&retry, statement)==SQLITE_ROW){ while(count < max_count && sqlite_step_retry(&retry, statement)==SQLITE_ROW){
uint64_t rowid = sqlite3_column_int64(statement, 0); uint64_t rowid = sqlite3_column_int64(statement, 0);
const unsigned char *bar = sqlite3_column_blob(statement, 1); const unsigned char *bar = sqlite3_column_blob(statement, 1);
size_t bar_size = sqlite3_column_bytes(statement, 1); size_t bar_size = sqlite3_column_bytes(statement, 1);
@ -504,35 +470,28 @@ static void sync_send_response(struct subscriber *dest, int forwards, uint64_t t
if (token!=HEAD_FLAG){ if (token!=HEAD_FLAG){
ob_checkpoint(b); ob_checkpoint(b);
append_response(b, token, NULL); append_response(b, token, NULL);
if (ob_overrun(b)) if (ob_overrun(b)){
ob_rewind(b); ob_rewind(b);
else { break;
count++;
last = token;
} }
count++;
last = token;
}else }else
token = rowid; token = rowid;
} }
ob_checkpoint(b); ob_checkpoint(b);
append_response(b, rowid, bar); append_response(b, rowid, bar);
if (ob_overrun(b)) if (ob_overrun(b)){
ob_rewind(b); ob_rewind(b);
else { break;
last = rowid;
count++;
} }
last = rowid;
count++;
} }
if (count >= max_count && rowid <= max_token)
break;
} }
sqlite3_finalize(statement); sqlite3_finalize(statement);
if (token != HEAD_FLAG && token > max_token){
// report bundles added by cli
annouce_cli_bundle_add(token);
}
// send a zero lower bound if we reached the end of our manifest list // send a zero lower bound if we reached the end of our manifest list
if (count && count < max_count && !forwards){ if (count && count < max_count && !forwards){
ob_checkpoint(b); ob_checkpoint(b);

View File

@ -46,4 +46,6 @@ int server_bind();
void server_loop(time_ms_t (*waiting)(time_ms_t, time_ms_t, time_ms_t), void (*wokeup)()); void server_loop(time_ms_t (*waiting)(time_ms_t, time_ms_t, time_ms_t), void (*wokeup)());
void server_rhizome_add_bundle(uint64_t rowid);
#endif // __SERVAL_DNA__SERVER_H #endif // __SERVAL_DNA__SERVER_H

View File

@ -91,6 +91,19 @@ test_FileTransfer() {
receive_and_update_bundle receive_and_update_bundle
} }
doc_FirstFileTransfer="First bundle added to running daemon transfers to one node"
setup_FirstFileTransfer() {
setup_common
start_servald_instances +A +B
foreach_instance +A assert_peers_are_instances +B
foreach_instance +B assert_peers_are_instances +A
}
test_FirstFileTransfer(){
set_instance +A
rhizome_add_file file1 250000
receive_and_update_bundle
}
doc_EncryptedTransfer="Encrypted payload can be opened by destination" doc_EncryptedTransfer="Encrypted payload can be opened by destination"
setup_EncryptedTransfer() { setup_EncryptedTransfer() {
setup_common setup_common