Add rhizome test over fakeradio

This commit is contained in:
Jeremy Lakeman 2013-09-13 14:02:35 +09:30
parent 7076b5abb9
commit 2a5ba97b48
8 changed files with 90 additions and 25 deletions

View File

@ -24,6 +24,7 @@ struct radio_state {
int cb_len;
unsigned char txbuffer[2048];
int txb_len;
int wait_count;
unsigned char rxbuffer[512];
int rxb_len;
long long last_char_ms;
@ -316,10 +317,18 @@ int transfer_bytes(struct radio_state *radios)
send=p;
p++;
}
if (send<bytes){
log_time();
fprintf(stderr,"Only sending %d of the available %d bytes for %s\n", send, bytes, t->name);
if (send<bytes && !send){
if (bytes < PACKET_SIZE && t->wait_count++ <5){
log_time();
fprintf(stderr,"Waiting for more bytes for %s\n", t->name);
dump(NULL, t->txbuffer, bytes);
}else
send = bytes;
}
if (send)
t->wait_count=0;
bytes=send;
}

View File

@ -142,7 +142,7 @@ int mavlink_encode_packet(struct overlay_interface *interface)
int count = ob_remaining(interface->tx_packet);
int startP = !ob_position(interface->tx_packet);
int endP = 1;
if (count>255-6-32){
if (count+6+32 > 255){
count = 255-6-32;
endP = 0;
}

View File

@ -283,7 +283,7 @@ static int add_explain_response(struct subscriber *subscriber, void *context){
static int find_subscr_buffer(struct decode_context *context, struct overlay_buffer *b, int len, struct subscriber **subscriber){
if (len<=0 || len>SID_SIZE){
return WHY("Invalid abbreviation length");
return WHYF("Invalid abbreviation length %d", len);
}
unsigned char *id = ob_get_bytes_ptr(b, len);

View File

@ -386,7 +386,7 @@ int rhizome_ignore_manifest_check(unsigned char *bid_prefix, int prefix_len);
#define MAX_CANDIDATES 32
int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sockaddr_in *peerip,const unsigned char peersid[SID_SIZE]);
rhizome_manifest * rhizome_fetch_search(unsigned char *id, int prefix_length);
rhizome_manifest * rhizome_fetch_search(const unsigned char *id, int prefix_length);
/* Rhizome file storage api */
struct rhizome_write_buffer{

View File

@ -228,7 +228,7 @@ static struct rhizome_fetch_slot *rhizome_find_fetch_slot(long long size)
// find the first matching active slot for this bundle
static struct rhizome_fetch_slot *fetch_search_slot(unsigned char *id, int prefix_length)
static struct rhizome_fetch_slot *fetch_search_slot(const unsigned char *id, int prefix_length)
{
int i;
for (i = 0; i < NQUEUES; ++i) {
@ -242,7 +242,7 @@ static struct rhizome_fetch_slot *fetch_search_slot(unsigned char *id, int prefi
}
// find the first matching candidate for this bundle
static struct rhizome_fetch_candidate *fetch_search_candidate(unsigned char *id, int prefix_length)
static struct rhizome_fetch_candidate *fetch_search_candidate(const unsigned char *id, int prefix_length)
{
int i, j;
for (i = 0; i < NQUEUES; ++i) {
@ -260,7 +260,7 @@ static struct rhizome_fetch_candidate *fetch_search_candidate(unsigned char *id,
}
/* Search all fetch slots, including active downloads, for a matching manifest */
rhizome_manifest * rhizome_fetch_search(unsigned char *id, int prefix_length){
rhizome_manifest * rhizome_fetch_search(const unsigned char *id, int prefix_length){
struct rhizome_fetch_slot *s = fetch_search_slot(id, prefix_length);
if (s)
return s->manifest;
@ -1533,7 +1533,7 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
rhizome_fetch_mdp_slot_callback(alarm);
break;
default:
default:
// timeout or socket error, close the socket
if (config.debug.rhizome_rx)
DEBUGF("Closing due to timeout or error %x (%x %x)", alarm->poll.revents, POLLHUP, POLLERR);

View File

@ -246,9 +246,10 @@ int rhizome_advertise_manifest(struct subscriber *dest, rhizome_manifest *m){
bzero(frame,sizeof(struct overlay_frame));
frame->type = OF_TYPE_RHIZOME_ADVERT;
frame->source = my_subscriber;
if (dest && (dest->reachable==REACHABLE_UNICAST || dest->reachable==REACHABLE_INDIRECT))
if (dest && dest->reachable&REACHABLE)
frame->destination = dest;
frame->ttl = 1;
else
frame->ttl = 1;
frame->queue = OQ_OPPORTUNISTIC;
frame->payload = ob_new();
@ -260,6 +261,9 @@ int rhizome_advertise_manifest(struct subscriber *dest, rhizome_manifest *m){
if (ob_append_bytes(frame->payload, m->manifestdata, m->manifest_all_bytes)) goto error;
ob_append_byte(frame->payload, 0xFF);
if (overlay_payload_enqueue(frame)) goto error;
if (config.debug.rhizome_ads)
DEBUGF("Advertising manifest %s %"PRId64" to %s",
alloca_tohex_bid(m->cryptoSignPublic), m->version, dest?alloca_tohex_sid(dest->sid):"broadcast");
return 0;
error:
@ -325,19 +329,13 @@ int overlay_rhizome_saw_advertisements(int i, struct decode_context *context, st
WHY("Error parsing manifest body");
goto next;
}
const char *id=alloca_tohex_bid(m->cryptoSignPublic);
char manifest_id_prefix[RHIZOME_MANIFEST_ID_STRLEN + 1];
if (rhizome_manifest_get(m, "id", manifest_id_prefix, sizeof manifest_id_prefix) == NULL) {
WHY("Manifest does not contain 'id' field");
goto next;
}
/* trim manifest ID to a prefix for ease of debugging
(that is the only use of this */
if (config.debug.rhizome_ads){
manifest_id_prefix[8]=0;
long long version = rhizome_manifest_get_ll(m, "version");
DEBUGF("manifest id=%s* version=%lld", manifest_id_prefix, version);
DEBUGF("manifest id=%s version=%lld", id, version);
}
/* Crude signature presence test */
@ -358,7 +356,7 @@ int overlay_rhizome_saw_advertisements(int i, struct decode_context *context, st
crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES)){
/* Ignoring manifest that has caused us problems recently */
if (config.debug.rhizome_ads)
DEBUGF("Ignoring manifest with errors: %s*", manifest_id_prefix);
DEBUGF("Ignoring manifest with errors: %s", id);
goto next;
}
@ -371,8 +369,13 @@ int overlay_rhizome_saw_advertisements(int i, struct decode_context *context, st
crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES, 60000);
goto next;
}
/* Manifest is okay, so see if it is worth storing */
// are we already fetching this bundle [or later]?
rhizome_manifest *mf=rhizome_fetch_search(id, crypto_sign_edwards25519sha512batch_PUBLICKEYBYTES);
if (mf && mf->version >= m->version)
goto next;
if (!rhizome_is_manifest_interesting(m)) {
/* We already have this version or newer */
if (config.debug.rhizome_ads)

3
slip.c
View File

@ -373,9 +373,6 @@ int slip_decode(struct slip_decode_state *state)
// We have to increment src_offset manually here, because returning
// prevents the post-increment in the for loop from triggering
state->src_offset++;
if (config.debug.mavlink) {
DEBUGF("Read %d byte packet from MAVLink frames",state->packet_length);
}
if (config.debug.mavlink_payloads||config.debug.interactive_io) {
DEBUG_packet_visualise("Received packet",state->dst,state->packet_length);
}

View File

@ -219,6 +219,62 @@ test_UnicastTransfer() {
receive_and_update_bundle
}
doc_SimulatedRadio="MDP Transfer over simulated radio link"
interface_up() {
$GREP "Interface .* is up" $instance_servald_log || return 1
return 0
}
start_radio_instance() {
executeOk_servald config \
set monitor.socket "org.servalproject.servald.monitor.socket.$TFWUNIQUE.$instance_name" \
set mdp.socket "org.servalproject.servald.mdp.socket.$TFWUNIQUE.$instance_name" \
set debug.rhizome on \
set debug.rhizome_ads on \
set debug.rhizome_tx on \
set debug.rhizome_rx on \
set rhizome.advertise.interval 1000 \
set rhizome.rhizome_mdp_block_size 200 \
set log.console.level debug \
set log.console.show_pid on \
set log.console.show_time on \
set interfaces.1.type CATEAR \
set interfaces.1.mdp_tick_ms 5000 \
set interfaces.1.socket_type STREAM \
set interfaces.1.encapsulation SINGLE \
set interfaces.1.point_to_point on \
set interfaces.1.packet_interval 5000 \
set interfaces.1.burst_size 100 \
set interfaces.1.throttle 32000
start_servald_server
wait_until interface_up
}
setup_SimulatedRadio() {
setup_common
$servald_build_root/fakeradio 64 10000000 > "$SERVALD_VAR/radioout" 2> "$SERVALD_VAR/radioerr" &
FAKERADIO_PID=$!
sleep 1
local END1=`head "$SERVALD_VAR/radioout" -n 1`
local END2=`tail "$SERVALD_VAR/radioout" -n 1`
tfw_log "Started fakeradio pid=$FAKERADIO_PID, end1=$END1, end2=$END2"
set_instance +A
rhizome_add_file file1 2048
executeOk_servald config \
set interfaces.1.file "$END1"
set_instance +B
executeOk_servald config \
set interfaces.1.file "$END2"
foreach_instance +A +B start_radio_instance
}
test_SimulatedRadio() {
receive_and_update_bundle
}
teardown_SimulatedRadio() {
teardown
tfw_log "Killing fakeradio, pid=$FAKERADIO_PID"
kill $FAKERADIO_PID
tfw_cat "$SERVALD_VAR/radioerr"
}
doc_journalMDP="Transfer and update a journal bundle via MDP"
setup_journalMDP() {
setup_common