From e3a5e8c353f72bc55e3ce7629811440cbb8e6046 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Fri, 20 Sep 2013 12:24:06 +0930 Subject: [PATCH] Use heartbeat packets to control serial buffering - Improve simulation of fakeradio - Backtrack on partial RS errors in case of truncation / heartbeat insertion --- fakeradio.c | 45 ++++++++------- fec-3.0.1/decode_rs.h | 2 +- mavlink.c | 58 +++++++++++-------- overlay_interface.c | 127 ++++++++++++++++++++++++++---------------- serval.h | 1 + tests/rhizomeprotocol | 15 +++-- tests/routing | 6 +- 7 files changed, 149 insertions(+), 105 deletions(-) diff --git a/fakeradio.c b/fakeradio.c index 48df11ea..5a441d15 100644 --- a/fakeradio.c +++ b/fakeradio.c @@ -22,8 +22,9 @@ struct radio_state { const char *name; char commandbuffer[128]; int cb_len; - unsigned char txbuffer[2048]; + unsigned char txbuffer[1280]; int txb_len; + int tx_count; int wait_count; unsigned char rxbuffer[512]; int rxb_len; @@ -127,9 +128,19 @@ int dump(char *name, unsigned char *addr, int len) return 0; } +static void store_char(struct radio_state *s, unsigned char c) +{ + if(s->txb_lentxbuffer)){ + s->txbuffer[s->txb_len++]=c; + }else{ + log_time(); + fprintf(stderr, "*** Dropped char %02x\n", c); + } +} + int read_bytes(struct radio_state *s) { - unsigned char buff[256]; + unsigned char buff[8]; int i; int bytes=read(s->fd,buff,sizeof(buff)); if (bytes<=0) @@ -162,25 +173,13 @@ int read_bytes(struct radio_state *s) // or watch for "+++" if (buff[i]=='+'){ - // count +'s - if (s->state < STATE_PLUSPLUSPLUS){ + if (s->state < STATE_PLUSPLUSPLUS) s->state++; - }else if(s->txb_lentxbuffer)){ - s->txbuffer[s->txb_len++]=buff[i]; - } - continue; - } - - // regenerate any +'s we consumed - while(s->state > STATE_ONLINE){ - if(s->txb_lentxbuffer)) - s->txbuffer[s->txb_len++]='+'; - s->state--; - } + }else + s->state=STATE_ONLINE; // or append to the transmit buffer if there's room - if(s->txb_lentxbuffer)) - s->txbuffer[s->txb_len++]=buff[i]; + store_char(s,buff[i]); } return bytes; } @@ -188,6 +187,8 @@ int read_bytes(struct radio_state *s) int write_bytes(struct radio_state *s) { int wrote=s->rxb_len; + if (wrote>8) + wrote=8; if (s->last_char_ms) wrote = write(s->fd, s->rxbuffer, wrote); if (wrote>0){ @@ -353,9 +354,11 @@ int transfer_bytes(struct radio_state *radios) bcopy(&t->txbuffer[bytes], t->txbuffer, t->txb_len - bytes); t->txb_len-=bytes; - // swap who's turn it is to transmit - transmitter = receiver; - + if (bytes==0 || --t->tx_count<=0){ + // swap who's turn it is to transmit + transmitter = receiver; + r->tx_count=6; + } // set the wait time for the next transmission next_transmit_time = gettime_ms() + (bytes+10)/chars_per_ms; return bytes; diff --git a/fec-3.0.1/decode_rs.h b/fec-3.0.1/decode_rs.h index c165cf3d..8d8a5e71 100644 --- a/fec-3.0.1/decode_rs.h +++ b/fec-3.0.1/decode_rs.h @@ -285,7 +285,7 @@ } #endif /* Apply error to data */ - if (num1 != 0 && loc[j] >= PAD) { + if (num1 != 0 && loc[j] >= PAD && loc[j] < NN-NROOTS) { data[loc[j]-PAD] ^= ALPHA_TO[MODNN(INDEX_OF[num1] + INDEX_OF[num2] + NN - INDEX_OF[den])]; } } diff --git a/mavlink.c b/mavlink.c index b65943df..02eb9b49 100644 --- a/mavlink.c +++ b/mavlink.c @@ -147,7 +147,6 @@ int mavlink_encode_packet(struct overlay_interface *interface) count = 255-6-32; endP = 0; } - interface->txbuffer[0]=0xfe; // mavlink v1.0 frame /* payload len, excluding 6 byte header and 2 byte CRC. But we use a 4-byte CRC, so need to add two to count to make packet lengths @@ -210,7 +209,7 @@ extern int last_radio_rssi; extern int last_radio_temperature; extern int last_radio_rxpackets; -static int parse_heartbeat(const unsigned char *payload) +static int parse_heartbeat(struct overlay_interface *interface, const unsigned char *payload) { if (payload[0]==0xFE && payload[1]==9 @@ -222,11 +221,18 @@ static int parse_heartbeat(const unsigned char *payload) last_radio_rssi=(1.0*payload[10]-payload[13])/1.9; last_radio_temperature=-999; // doesn't get reported last_radio_rxpackets=-999; // doesn't get reported - if (1||config.debug.mavlink||gettime_ms()-last_rssi_time>30000) { - INFOF("Link budget = %+ddB, remote link budget = %+ddB, buffer space = %d%%", + int free_space = payload[12]; + int free_bytes = (free_space * 1280) / 100 - 30; + interface->remaining_space = free_bytes; + if (free_bytes>0) + interface->next_tx_allowed = gettime_ms(); + if (free_bytes>720) + interface->next_heartbeat=gettime_ms()+1000; + if (config.debug.mavlink||gettime_ms()-last_rssi_time>30000) { + INFOF("Link budget = %+ddB, remote link budget = %+ddB, buffer space = %d%% (approx %d)", last_radio_rssi, (int)((1.0*payload[11] - payload[14])/1.9), - payload[12]); + free_space, free_bytes); last_rssi_time=gettime_ms(); } return 1; @@ -234,43 +240,45 @@ static int parse_heartbeat(const unsigned char *payload) return 0; } -int mavlink_parse(struct overlay_interface *interface, struct slip_decode_state *state, - int packet_length, unsigned char *payload) +static int mavlink_parse(struct overlay_interface *interface, struct slip_decode_state *state, + int packet_length, unsigned char *payload, int *backtrack) { + *backtrack=0; if (packet_length==9){ // make sure we've heard the start and end of a remote heartbeat request int errs=0; int tail = golay_decode(&errs, &payload[14]); if (tail == 0x555){ - // if we've lost sync and have skipped bytes, drop any previous packet contents - if (state->mavlink_payload_start) - state->packet_length=sizeof(state->dst)+1; return 1; } return 0; } int data_bytes = packet_length - (32 - 2); - int errcount=decode_rs_8(&payload[4], NULL, 0, 223 - (data_bytes + 2)); - if (errcount==-1){ + // preserve the last 16 bytes of data + unsigned char old_footer[32]; + unsigned char *payload_footer=&payload[packet_length+8-sizeof(old_footer)]; + bcopy(payload_footer, old_footer, sizeof(old_footer)); + + int pad=223 - (data_bytes + 2); + int errors=decode_rs_8(&payload[4], NULL, 0, pad); + if (errors==-1){ if (config.debug.mavlink) DEBUGF("Reed-Solomon error correction failed"); return 0; } + *backtrack=errors; if (config.debug.mavlink){ DEBUGF("Received RS protected message, len: %d, errors: %d, flags:%s%s", data_bytes, - errcount, + errors, payload[4]&0x01?" start":"", payload[4]&0x02?" end":""); } if (payload[4]&0x01) state->packet_length=0; - else if (state->mavlink_payload_start) - // if we've lost sync and have skipped bytes, drop any previous packet contents - state->packet_length=sizeof(state->dst)+1; if (state->packet_length + data_bytes > sizeof(state->dst)){ if (config.debug.mavlink) @@ -315,8 +323,8 @@ int mavlink_decode(struct overlay_interface *interface, struct slip_decode_state { if (state->mavlink_payload_start + state->mavlink_payload_offset >= sizeof(state->mavlink_payload)){ // drop one byte if we run out of space - if (1||config.debug.mavlink) - DEBUGF("Dropped %02x", state->mavlink_payload[0]); + if (config.debug.mavlink) + DEBUGF("Dropped %02x, buffer full", state->mavlink_payload[0]); bcopy(state->mavlink_payload+1, state->mavlink_payload, sizeof(state->mavlink_payload) -1); state->mavlink_payload_start--; } @@ -337,6 +345,7 @@ int mavlink_decode(struct overlay_interface *interface, struct slip_decode_state state->mavlink_payload_length=9; break; } + if (decode_length(state, &p[1])==0) break; @@ -349,7 +358,7 @@ int mavlink_decode(struct overlay_interface *interface, struct slip_decode_state if (!state->mavlink_payload_length || state->mavlink_payload_offset < state->mavlink_payload_length+8) return 0; - if (parse_heartbeat(p)){ + if (parse_heartbeat(interface, p)){ // cut the bytes of the heartbeat out of the buffer state->mavlink_payload_offset -= state->mavlink_payload_length+8; if (state->mavlink_payload_offset){ @@ -364,17 +373,20 @@ int mavlink_decode(struct overlay_interface *interface, struct slip_decode_state } // is this a well formed packet? - if (mavlink_parse(interface, state, state->mavlink_payload_length, p)==1){ + int backtrack=0; + if (mavlink_parse(interface, state, state->mavlink_payload_length, p, &backtrack)==1){ // Since we know we've synced with the remote party, // and there's nothing we can do about any earlier data // throw away everything before the end of this packet - if (state->mavlink_payload_start && (1||config.debug.mavlink)) + if (state->mavlink_payload_start && config.debug.mavlink) dump("Skipped", state->mavlink_payload, state->mavlink_payload_start); - state->mavlink_payload_offset -= state->mavlink_payload_length+8; + // If the packet is truncated by less than 16 bytes, RS protection should be enough to recover the packet, + // but we may need to examine the last few bytes to find the start of the next packet. + state->mavlink_payload_offset -= state->mavlink_payload_length+8-backtrack; if (state->mavlink_payload_offset){ // shuffle all remaining bytes back to the start of the buffer - bcopy(&state->mavlink_payload[state->mavlink_payload_start + state->mavlink_payload_length+8], + bcopy(&state->mavlink_payload[state->mavlink_payload_start + state->mavlink_payload_length+8-backtrack], state->mavlink_payload, state->mavlink_payload_offset); } state->mavlink_payload_start=0; diff --git a/overlay_interface.c b/overlay_interface.c index c9d90821..f711df5d 100644 --- a/overlay_interface.c +++ b/overlay_interface.c @@ -672,63 +672,84 @@ static void write_stream_buffer(overlay_interface *interface){ // Throttle output to a prescribed bit-rate // first, reduce the number of bytes based on the configured burst size int bytes_allowed=interface->throttle_burst_write_size; - - if (interface->next_tx_allowed < now){ - int total_written=0; - while ((interface->tx_bytes_pending>0 || interface->tx_packet) && - (bytes_allowed>0 || interface->throttle_burst_write_size==0)) { - - if (interface->tx_bytes_pending==0 && interface->tx_packet){ - if (interface->next_heartbeat <= now){ - // Queue a hearbeat now - mavlink_heartbeat(interface->txbuffer,&interface->tx_bytes_pending); - interface->next_heartbeat = now+5000; - }else{ - // prepare a new link layer packet in txbuffer - if (mavlink_encode_packet(interface)) - break; - } - } - int bytes = interface->tx_bytes_pending; - if (interface->throttle_burst_write_size && bytes>bytes_allowed) - bytes=bytes_allowed; - if (config.debug.packetradio) - DEBUGF("Trying to write %d bytes of %d%s", bytes, interface->tx_bytes_pending, interface->tx_packet?", pending packet":""); - int written=write(interface->alarm.poll.fd, interface->txbuffer, bytes); - if (written<=0) - break; - - interface->tx_bytes_pending-=written; - total_written+=written; - bytes_allowed-=written; - if (interface->tx_bytes_pending) - bcopy(&interface->txbuffer[written],&interface->txbuffer[0], - interface->tx_bytes_pending); - if (config.debug.packetradio) - DEBUGF("Wrote %d bytes (%d left pending)", written, interface->tx_bytes_pending); + int total_written=0; + while (interface->tx_bytes_pending>0 || interface->tx_packet || interface->next_heartbeat <= now) { + + if (interface->tx_bytes_pending==0){ + if (interface->next_heartbeat <= now){ + // Queue a hearbeat now + mavlink_heartbeat(interface->txbuffer,&interface->tx_bytes_pending); + if (config.debug.packetradio) + DEBUGF("Built %d byte heartbeat", interface->tx_bytes_pending); + interface->next_heartbeat = now+1000; + }else if(interface->tx_packet && interface->remaining_space >= 256 + 8+9){ + // prepare a new link layer packet in txbuffer + if (mavlink_encode_packet(interface)) + break; + if (config.debug.packetradio) + DEBUGF("Built %d byte payload from packet (%d)", interface->tx_bytes_pending, interface->remaining_space); + if (interface->remaining_space - interface->tx_bytes_pending < 256 + 8+9) + interface->next_heartbeat = now; + } } + if (interface->next_tx_allowed > now) + break; + + int bytes = interface->tx_bytes_pending; + if (interface->throttle_burst_write_size && bytes>bytes_allowed) + bytes=bytes_allowed; + if (bytes<=0) + break; + + if (config.debug.packetradio) + DEBUGF("Trying to write %d bytes of %d%s", bytes, interface->tx_bytes_pending, interface->tx_packet?", pending packet":""); + int written=write(interface->alarm.poll.fd, interface->txbuffer, bytes); + if (written<=0){ + DEBUGF("Blocking for POLLOUT"); + break; + } + + interface->remaining_space-=written; + interface->tx_bytes_pending-=written; + total_written+=written; + bytes_allowed-=written; + if (interface->tx_bytes_pending){ + bcopy(&interface->txbuffer[written],&interface->txbuffer[0], + interface->tx_bytes_pending); + DEBUGF("Partial write, %d left", interface->tx_bytes_pending); + } + if (config.debug.packetradio) + DEBUGF("Wrote %d bytes (%d left pending, %d remains)", written, interface->tx_bytes_pending, interface->remaining_space); + } + + if (total_written>0){ // Now when are we allowed to send more? - if (interface->throttle_bytes_per_second>0) { - int delay = total_written*1000/interface->throttle_bytes_per_second; + int rate = interface->throttle_bytes_per_second; + if (interface->remaining_space<=0) + rate = 600; + if (rate){ + int delay = total_written*1000/rate; if (config.debug.throttling) - DEBUGF("Throttling for %dms.",delay); + DEBUGF("Throttling for %dms (%d).", delay, interface->remaining_space); interface->next_tx_allowed = now + delay; } } - if (interface->tx_bytes_pending>0 || interface->tx_packet){ - if (interface->next_tx_allowed > now){ - // We can't write now, so clear POLLOUT flag - interface->alarm.poll.events&=~POLLOUT; - // set the interface alarm to trigger another write - interface->alarm.alarm = interface->next_tx_allowed; - interface->alarm.deadline = interface->alarm.alarm+10; - } else { - // more to write, so set the POLLOUT flag - interface->alarm.poll.events|=POLLOUT; - } + time_ms_t next_write = interface->next_tx_allowed; + if (interface->tx_bytes_pending<=0){ + next_write = interface->next_heartbeat; + } + + if (interface->alarm.alarm==-1 || next_write < interface->alarm.alarm){ + interface->alarm.alarm = next_write; + interface->alarm.deadline = interface->alarm.alarm+10; + } + + if (interface->tx_bytes_pending>0 && next_write <= now){ + // more to write, so set the POLLOUT flag + interface->alarm.poll.events|=POLLOUT; } else { // Nothing to write, so clear POLLOUT flag interface->alarm.poll.events&=~POLLOUT; @@ -802,6 +823,16 @@ static void overlay_interface_poll(struct sched_ent *alarm) break; case SOCK_STREAM: interface_read_stream(interface); + // if we read a valid heartbeat packet, we may be able to write more bytes now. + if (interface->state==INTERFACE_STATE_UP && interface->remaining_space>0){ + write_stream_buffer(interface); + if (alarm->alarm!=-1 && interface->state==INTERFACE_STATE_UP) { + if (alarm->alarm < now) + alarm->alarm = now; + unschedule(alarm); + schedule(alarm); + } + } break; case SOCK_FILE: interface_read_file(interface); diff --git a/serval.h b/serval.h index 14827316..61ab30ab 100644 --- a/serval.h +++ b/serval.h @@ -456,6 +456,7 @@ typedef struct overlay_interface { uint32_t throttle_bytes_per_second; uint32_t throttle_burst_write_size; uint64_t next_tx_allowed; + int32_t remaining_space; time_ms_t next_heartbeat; diff --git a/tests/rhizomeprotocol b/tests/rhizomeprotocol index ab00e1a5..a4620ada 100755 --- a/tests/rhizomeprotocol +++ b/tests/rhizomeprotocol @@ -232,8 +232,10 @@ start_radio_instance() { set debug.rhizome_ads on \ set debug.rhizome_tx on \ set debug.rhizome_rx on \ - set rhizome.advertise.interval 1000 \ - set rhizome.rhizome_mdp_block_size 200 \ + set debug.throttling on \ + set debug.mavlink on \ + set rhizome.advertise.interval 5000 \ + set rhizome.rhizome_mdp_block_size 350 \ set log.console.level debug \ set log.console.show_pid on \ set log.console.show_time on \ @@ -241,23 +243,20 @@ start_radio_instance() { set interfaces.1.mdp_tick_ms 5000 \ set interfaces.1.socket_type STREAM \ set interfaces.1.encapsulation SINGLE \ - set interfaces.1.point_to_point on \ - set interfaces.1.packet_interval 5000 \ - set interfaces.1.burst_size 100 \ - set interfaces.1.throttle 32000 + set interfaces.1.point_to_point on start_servald_server wait_until interface_up } setup_SimulatedRadio() { setup_common - $servald_build_root/fakeradio 64 10000000 > "$SERVALD_VAR/radioout" 2> "$SERVALD_VAR/radioerr" & + $servald_build_root/fakeradio 6 10000000 > "$SERVALD_VAR/radioout" 2> "$SERVALD_VAR/radioerr" & FAKERADIO_PID=$! sleep 1 local END1=`head "$SERVALD_VAR/radioout" -n 1` local END2=`tail "$SERVALD_VAR/radioout" -n 1` tfw_log "Started fakeradio pid=$FAKERADIO_PID, end1=$END1, end2=$END2" set_instance +A - rhizome_add_file file1 2048 + rhizome_add_file file1 10000 executeOk_servald config \ set interfaces.1.file "$END1" set_instance +B diff --git a/tests/routing b/tests/routing index 440f9066..d7d838aa 100755 --- a/tests/routing +++ b/tests/routing @@ -218,7 +218,7 @@ setup_simulate_extender() { setup_servald assert_no_servald_processes foreach_instance +A +B create_single_identity - $servald_build_root/fakeradio 1 10000000 > "$SERVALD_VAR/radioout" 2> "$SERVALD_VAR/radioerr" & + $servald_build_root/fakeradio 1 20000000 > "$SERVALD_VAR/radioout" 2> "$SERVALD_VAR/radioerr" & FAKERADIO_PID=$! sleep 1 local END1=`head "$SERVALD_VAR/radioout" -n 1` @@ -240,9 +240,7 @@ setup_simulate_extender() { set interfaces.1.socket_type STREAM \ set interfaces.1.encapsulation SINGLE \ set interfaces.1.point_to_point on \ - set interfaces.1.packet_interval 5000 \ - set interfaces.1.burst_size 100 \ - set interfaces.1.throttle 1000 + set interfaces.1.packet_interval 5000 foreach_instance +A +B start_routing_instance } test_simulate_extender() {