Use heartbeat packets to control serial buffering

- Improve simulation of fakeradio
- Backtrack on partial RS errors in case of truncation / heartbeat insertion
This commit is contained in:
Jeremy Lakeman 2013-09-20 12:24:06 +09:30
parent 4b9817218d
commit e3a5e8c353
7 changed files with 149 additions and 105 deletions

View File

@ -22,8 +22,9 @@ struct radio_state {
const char *name; const char *name;
char commandbuffer[128]; char commandbuffer[128];
int cb_len; int cb_len;
unsigned char txbuffer[2048]; unsigned char txbuffer[1280];
int txb_len; int txb_len;
int tx_count;
int wait_count; int wait_count;
unsigned char rxbuffer[512]; unsigned char rxbuffer[512];
int rxb_len; int rxb_len;
@ -127,9 +128,19 @@ int dump(char *name, unsigned char *addr, int len)
return 0; return 0;
} }
static void store_char(struct radio_state *s, unsigned char c)
{
if(s->txb_len<sizeof(s->txbuffer)){
s->txbuffer[s->txb_len++]=c;
}else{
log_time();
fprintf(stderr, "*** Dropped char %02x\n", c);
}
}
int read_bytes(struct radio_state *s) int read_bytes(struct radio_state *s)
{ {
unsigned char buff[256]; unsigned char buff[8];
int i; int i;
int bytes=read(s->fd,buff,sizeof(buff)); int bytes=read(s->fd,buff,sizeof(buff));
if (bytes<=0) if (bytes<=0)
@ -162,25 +173,13 @@ int read_bytes(struct radio_state *s)
// or watch for "+++" // or watch for "+++"
if (buff[i]=='+'){ if (buff[i]=='+'){
// count +'s if (s->state < STATE_PLUSPLUSPLUS)
if (s->state < STATE_PLUSPLUSPLUS){
s->state++; s->state++;
}else if(s->txb_len<sizeof(s->txbuffer)){ }else
s->txbuffer[s->txb_len++]=buff[i]; s->state=STATE_ONLINE;
}
continue;
}
// regenerate any +'s we consumed
while(s->state > STATE_ONLINE){
if(s->txb_len<sizeof(s->txbuffer))
s->txbuffer[s->txb_len++]='+';
s->state--;
}
// or append to the transmit buffer if there's room // or append to the transmit buffer if there's room
if(s->txb_len<sizeof(s->txbuffer)) store_char(s,buff[i]);
s->txbuffer[s->txb_len++]=buff[i];
} }
return bytes; return bytes;
} }
@ -188,6 +187,8 @@ int read_bytes(struct radio_state *s)
int write_bytes(struct radio_state *s) int write_bytes(struct radio_state *s)
{ {
int wrote=s->rxb_len; int wrote=s->rxb_len;
if (wrote>8)
wrote=8;
if (s->last_char_ms) if (s->last_char_ms)
wrote = write(s->fd, s->rxbuffer, wrote); wrote = write(s->fd, s->rxbuffer, wrote);
if (wrote>0){ if (wrote>0){
@ -353,9 +354,11 @@ int transfer_bytes(struct radio_state *radios)
bcopy(&t->txbuffer[bytes], t->txbuffer, t->txb_len - bytes); bcopy(&t->txbuffer[bytes], t->txbuffer, t->txb_len - bytes);
t->txb_len-=bytes; t->txb_len-=bytes;
if (bytes==0 || --t->tx_count<=0){
// swap who's turn it is to transmit // swap who's turn it is to transmit
transmitter = receiver; transmitter = receiver;
r->tx_count=6;
}
// set the wait time for the next transmission // set the wait time for the next transmission
next_transmit_time = gettime_ms() + (bytes+10)/chars_per_ms; next_transmit_time = gettime_ms() + (bytes+10)/chars_per_ms;
return bytes; return bytes;

View File

@ -285,7 +285,7 @@
} }
#endif #endif
/* Apply error to data */ /* Apply error to data */
if (num1 != 0 && loc[j] >= PAD) { if (num1 != 0 && loc[j] >= PAD && loc[j] < NN-NROOTS) {
data[loc[j]-PAD] ^= ALPHA_TO[MODNN(INDEX_OF[num1] + INDEX_OF[num2] + NN - INDEX_OF[den])]; data[loc[j]-PAD] ^= ALPHA_TO[MODNN(INDEX_OF[num1] + INDEX_OF[num2] + NN - INDEX_OF[den])];
} }
} }

View File

@ -147,7 +147,6 @@ int mavlink_encode_packet(struct overlay_interface *interface)
count = 255-6-32; count = 255-6-32;
endP = 0; endP = 0;
} }
interface->txbuffer[0]=0xfe; // mavlink v1.0 frame interface->txbuffer[0]=0xfe; // mavlink v1.0 frame
/* payload len, excluding 6 byte header and 2 byte CRC. /* payload len, excluding 6 byte header and 2 byte CRC.
But we use a 4-byte CRC, so need to add two to count to make packet lengths But we use a 4-byte CRC, so need to add two to count to make packet lengths
@ -210,7 +209,7 @@ extern int last_radio_rssi;
extern int last_radio_temperature; extern int last_radio_temperature;
extern int last_radio_rxpackets; extern int last_radio_rxpackets;
static int parse_heartbeat(const unsigned char *payload) static int parse_heartbeat(struct overlay_interface *interface, const unsigned char *payload)
{ {
if (payload[0]==0xFE if (payload[0]==0xFE
&& payload[1]==9 && payload[1]==9
@ -222,11 +221,18 @@ static int parse_heartbeat(const unsigned char *payload)
last_radio_rssi=(1.0*payload[10]-payload[13])/1.9; last_radio_rssi=(1.0*payload[10]-payload[13])/1.9;
last_radio_temperature=-999; // doesn't get reported last_radio_temperature=-999; // doesn't get reported
last_radio_rxpackets=-999; // doesn't get reported last_radio_rxpackets=-999; // doesn't get reported
if (1||config.debug.mavlink||gettime_ms()-last_rssi_time>30000) { int free_space = payload[12];
INFOF("Link budget = %+ddB, remote link budget = %+ddB, buffer space = %d%%", int free_bytes = (free_space * 1280) / 100 - 30;
interface->remaining_space = free_bytes;
if (free_bytes>0)
interface->next_tx_allowed = gettime_ms();
if (free_bytes>720)
interface->next_heartbeat=gettime_ms()+1000;
if (config.debug.mavlink||gettime_ms()-last_rssi_time>30000) {
INFOF("Link budget = %+ddB, remote link budget = %+ddB, buffer space = %d%% (approx %d)",
last_radio_rssi, last_radio_rssi,
(int)((1.0*payload[11] - payload[14])/1.9), (int)((1.0*payload[11] - payload[14])/1.9),
payload[12]); free_space, free_bytes);
last_rssi_time=gettime_ms(); last_rssi_time=gettime_ms();
} }
return 1; return 1;
@ -234,43 +240,45 @@ static int parse_heartbeat(const unsigned char *payload)
return 0; return 0;
} }
int mavlink_parse(struct overlay_interface *interface, struct slip_decode_state *state, static int mavlink_parse(struct overlay_interface *interface, struct slip_decode_state *state,
int packet_length, unsigned char *payload) int packet_length, unsigned char *payload, int *backtrack)
{ {
*backtrack=0;
if (packet_length==9){ if (packet_length==9){
// make sure we've heard the start and end of a remote heartbeat request // make sure we've heard the start and end of a remote heartbeat request
int errs=0; int errs=0;
int tail = golay_decode(&errs, &payload[14]); int tail = golay_decode(&errs, &payload[14]);
if (tail == 0x555){ if (tail == 0x555){
// if we've lost sync and have skipped bytes, drop any previous packet contents
if (state->mavlink_payload_start)
state->packet_length=sizeof(state->dst)+1;
return 1; return 1;
} }
return 0; return 0;
} }
int data_bytes = packet_length - (32 - 2); int data_bytes = packet_length - (32 - 2);
int errcount=decode_rs_8(&payload[4], NULL, 0, 223 - (data_bytes + 2)); // preserve the last 16 bytes of data
if (errcount==-1){ unsigned char old_footer[32];
unsigned char *payload_footer=&payload[packet_length+8-sizeof(old_footer)];
bcopy(payload_footer, old_footer, sizeof(old_footer));
int pad=223 - (data_bytes + 2);
int errors=decode_rs_8(&payload[4], NULL, 0, pad);
if (errors==-1){
if (config.debug.mavlink) if (config.debug.mavlink)
DEBUGF("Reed-Solomon error correction failed"); DEBUGF("Reed-Solomon error correction failed");
return 0; return 0;
} }
*backtrack=errors;
if (config.debug.mavlink){ if (config.debug.mavlink){
DEBUGF("Received RS protected message, len: %d, errors: %d, flags:%s%s", DEBUGF("Received RS protected message, len: %d, errors: %d, flags:%s%s",
data_bytes, data_bytes,
errcount, errors,
payload[4]&0x01?" start":"", payload[4]&0x01?" start":"",
payload[4]&0x02?" end":""); payload[4]&0x02?" end":"");
} }
if (payload[4]&0x01) if (payload[4]&0x01)
state->packet_length=0; state->packet_length=0;
else if (state->mavlink_payload_start)
// if we've lost sync and have skipped bytes, drop any previous packet contents
state->packet_length=sizeof(state->dst)+1;
if (state->packet_length + data_bytes > sizeof(state->dst)){ if (state->packet_length + data_bytes > sizeof(state->dst)){
if (config.debug.mavlink) if (config.debug.mavlink)
@ -315,8 +323,8 @@ int mavlink_decode(struct overlay_interface *interface, struct slip_decode_state
{ {
if (state->mavlink_payload_start + state->mavlink_payload_offset >= sizeof(state->mavlink_payload)){ if (state->mavlink_payload_start + state->mavlink_payload_offset >= sizeof(state->mavlink_payload)){
// drop one byte if we run out of space // drop one byte if we run out of space
if (1||config.debug.mavlink) if (config.debug.mavlink)
DEBUGF("Dropped %02x", state->mavlink_payload[0]); DEBUGF("Dropped %02x, buffer full", state->mavlink_payload[0]);
bcopy(state->mavlink_payload+1, state->mavlink_payload, sizeof(state->mavlink_payload) -1); bcopy(state->mavlink_payload+1, state->mavlink_payload, sizeof(state->mavlink_payload) -1);
state->mavlink_payload_start--; state->mavlink_payload_start--;
} }
@ -337,6 +345,7 @@ int mavlink_decode(struct overlay_interface *interface, struct slip_decode_state
state->mavlink_payload_length=9; state->mavlink_payload_length=9;
break; break;
} }
if (decode_length(state, &p[1])==0) if (decode_length(state, &p[1])==0)
break; break;
@ -349,7 +358,7 @@ int mavlink_decode(struct overlay_interface *interface, struct slip_decode_state
if (!state->mavlink_payload_length || state->mavlink_payload_offset < state->mavlink_payload_length+8) if (!state->mavlink_payload_length || state->mavlink_payload_offset < state->mavlink_payload_length+8)
return 0; return 0;
if (parse_heartbeat(p)){ if (parse_heartbeat(interface, p)){
// cut the bytes of the heartbeat out of the buffer // cut the bytes of the heartbeat out of the buffer
state->mavlink_payload_offset -= state->mavlink_payload_length+8; state->mavlink_payload_offset -= state->mavlink_payload_length+8;
if (state->mavlink_payload_offset){ if (state->mavlink_payload_offset){
@ -364,17 +373,20 @@ int mavlink_decode(struct overlay_interface *interface, struct slip_decode_state
} }
// is this a well formed packet? // is this a well formed packet?
if (mavlink_parse(interface, state, state->mavlink_payload_length, p)==1){ int backtrack=0;
if (mavlink_parse(interface, state, state->mavlink_payload_length, p, &backtrack)==1){
// Since we know we've synced with the remote party, // Since we know we've synced with the remote party,
// and there's nothing we can do about any earlier data // and there's nothing we can do about any earlier data
// throw away everything before the end of this packet // throw away everything before the end of this packet
if (state->mavlink_payload_start && (1||config.debug.mavlink)) if (state->mavlink_payload_start && config.debug.mavlink)
dump("Skipped", state->mavlink_payload, state->mavlink_payload_start); dump("Skipped", state->mavlink_payload, state->mavlink_payload_start);
state->mavlink_payload_offset -= state->mavlink_payload_length+8; // If the packet is truncated by less than 16 bytes, RS protection should be enough to recover the packet,
// but we may need to examine the last few bytes to find the start of the next packet.
state->mavlink_payload_offset -= state->mavlink_payload_length+8-backtrack;
if (state->mavlink_payload_offset){ if (state->mavlink_payload_offset){
// shuffle all remaining bytes back to the start of the buffer // shuffle all remaining bytes back to the start of the buffer
bcopy(&state->mavlink_payload[state->mavlink_payload_start + state->mavlink_payload_length+8], bcopy(&state->mavlink_payload[state->mavlink_payload_start + state->mavlink_payload_length+8-backtrack],
state->mavlink_payload, state->mavlink_payload_offset); state->mavlink_payload, state->mavlink_payload_offset);
} }
state->mavlink_payload_start=0; state->mavlink_payload_start=0;

View File

@ -673,62 +673,83 @@ static void write_stream_buffer(overlay_interface *interface){
// first, reduce the number of bytes based on the configured burst size // first, reduce the number of bytes based on the configured burst size
int bytes_allowed=interface->throttle_burst_write_size; int bytes_allowed=interface->throttle_burst_write_size;
if (interface->next_tx_allowed < now){
int total_written=0; int total_written=0;
while ((interface->tx_bytes_pending>0 || interface->tx_packet) && while (interface->tx_bytes_pending>0 || interface->tx_packet || interface->next_heartbeat <= now) {
(bytes_allowed>0 || interface->throttle_burst_write_size==0)) {
if (interface->tx_bytes_pending==0 && interface->tx_packet){ if (interface->tx_bytes_pending==0){
if (interface->next_heartbeat <= now){ if (interface->next_heartbeat <= now){
// Queue a hearbeat now // Queue a hearbeat now
mavlink_heartbeat(interface->txbuffer,&interface->tx_bytes_pending); mavlink_heartbeat(interface->txbuffer,&interface->tx_bytes_pending);
interface->next_heartbeat = now+5000; if (config.debug.packetradio)
}else{ DEBUGF("Built %d byte heartbeat", interface->tx_bytes_pending);
interface->next_heartbeat = now+1000;
}else if(interface->tx_packet && interface->remaining_space >= 256 + 8+9){
// prepare a new link layer packet in txbuffer // prepare a new link layer packet in txbuffer
if (mavlink_encode_packet(interface)) if (mavlink_encode_packet(interface))
break; break;
if (config.debug.packetradio)
DEBUGF("Built %d byte payload from packet (%d)", interface->tx_bytes_pending, interface->remaining_space);
if (interface->remaining_space - interface->tx_bytes_pending < 256 + 8+9)
interface->next_heartbeat = now;
} }
} }
if (interface->next_tx_allowed > now)
break;
int bytes = interface->tx_bytes_pending; int bytes = interface->tx_bytes_pending;
if (interface->throttle_burst_write_size && bytes>bytes_allowed) if (interface->throttle_burst_write_size && bytes>bytes_allowed)
bytes=bytes_allowed; bytes=bytes_allowed;
if (bytes<=0)
break;
if (config.debug.packetradio) if (config.debug.packetradio)
DEBUGF("Trying to write %d bytes of %d%s", bytes, interface->tx_bytes_pending, interface->tx_packet?", pending packet":""); DEBUGF("Trying to write %d bytes of %d%s", bytes, interface->tx_bytes_pending, interface->tx_packet?", pending packet":"");
int written=write(interface->alarm.poll.fd, interface->txbuffer, bytes); int written=write(interface->alarm.poll.fd, interface->txbuffer, bytes);
if (written<=0) if (written<=0){
DEBUGF("Blocking for POLLOUT");
break; break;
}
interface->remaining_space-=written;
interface->tx_bytes_pending-=written; interface->tx_bytes_pending-=written;
total_written+=written; total_written+=written;
bytes_allowed-=written; bytes_allowed-=written;
if (interface->tx_bytes_pending) if (interface->tx_bytes_pending){
bcopy(&interface->txbuffer[written],&interface->txbuffer[0], bcopy(&interface->txbuffer[written],&interface->txbuffer[0],
interface->tx_bytes_pending); interface->tx_bytes_pending);
DEBUGF("Partial write, %d left", interface->tx_bytes_pending);
}
if (config.debug.packetradio) if (config.debug.packetradio)
DEBUGF("Wrote %d bytes (%d left pending)", written, interface->tx_bytes_pending); DEBUGF("Wrote %d bytes (%d left pending, %d remains)", written, interface->tx_bytes_pending, interface->remaining_space);
} }
if (total_written>0){
// Now when are we allowed to send more? // Now when are we allowed to send more?
if (interface->throttle_bytes_per_second>0) { int rate = interface->throttle_bytes_per_second;
int delay = total_written*1000/interface->throttle_bytes_per_second; if (interface->remaining_space<=0)
rate = 600;
if (rate){
int delay = total_written*1000/rate;
if (config.debug.throttling) if (config.debug.throttling)
DEBUGF("Throttling for %dms.",delay); DEBUGF("Throttling for %dms (%d).", delay, interface->remaining_space);
interface->next_tx_allowed = now + delay; interface->next_tx_allowed = now + delay;
} }
} }
if (interface->tx_bytes_pending>0 || interface->tx_packet){ time_ms_t next_write = interface->next_tx_allowed;
if (interface->next_tx_allowed > now){ if (interface->tx_bytes_pending<=0){
// We can't write now, so clear POLLOUT flag next_write = interface->next_heartbeat;
interface->alarm.poll.events&=~POLLOUT; }
// set the interface alarm to trigger another write
interface->alarm.alarm = interface->next_tx_allowed; if (interface->alarm.alarm==-1 || next_write < interface->alarm.alarm){
interface->alarm.alarm = next_write;
interface->alarm.deadline = interface->alarm.alarm+10; interface->alarm.deadline = interface->alarm.alarm+10;
} else { }
if (interface->tx_bytes_pending>0 && next_write <= now){
// more to write, so set the POLLOUT flag // more to write, so set the POLLOUT flag
interface->alarm.poll.events|=POLLOUT; interface->alarm.poll.events|=POLLOUT;
}
} else { } else {
// Nothing to write, so clear POLLOUT flag // Nothing to write, so clear POLLOUT flag
interface->alarm.poll.events&=~POLLOUT; interface->alarm.poll.events&=~POLLOUT;
@ -802,6 +823,16 @@ static void overlay_interface_poll(struct sched_ent *alarm)
break; break;
case SOCK_STREAM: case SOCK_STREAM:
interface_read_stream(interface); interface_read_stream(interface);
// if we read a valid heartbeat packet, we may be able to write more bytes now.
if (interface->state==INTERFACE_STATE_UP && interface->remaining_space>0){
write_stream_buffer(interface);
if (alarm->alarm!=-1 && interface->state==INTERFACE_STATE_UP) {
if (alarm->alarm < now)
alarm->alarm = now;
unschedule(alarm);
schedule(alarm);
}
}
break; break;
case SOCK_FILE: case SOCK_FILE:
interface_read_file(interface); interface_read_file(interface);

View File

@ -456,6 +456,7 @@ typedef struct overlay_interface {
uint32_t throttle_bytes_per_second; uint32_t throttle_bytes_per_second;
uint32_t throttle_burst_write_size; uint32_t throttle_burst_write_size;
uint64_t next_tx_allowed; uint64_t next_tx_allowed;
int32_t remaining_space;
time_ms_t next_heartbeat; time_ms_t next_heartbeat;

View File

@ -232,8 +232,10 @@ start_radio_instance() {
set debug.rhizome_ads on \ set debug.rhizome_ads on \
set debug.rhizome_tx on \ set debug.rhizome_tx on \
set debug.rhizome_rx on \ set debug.rhizome_rx on \
set rhizome.advertise.interval 1000 \ set debug.throttling on \
set rhizome.rhizome_mdp_block_size 200 \ set debug.mavlink on \
set rhizome.advertise.interval 5000 \
set rhizome.rhizome_mdp_block_size 350 \
set log.console.level debug \ set log.console.level debug \
set log.console.show_pid on \ set log.console.show_pid on \
set log.console.show_time on \ set log.console.show_time on \
@ -241,23 +243,20 @@ start_radio_instance() {
set interfaces.1.mdp_tick_ms 5000 \ set interfaces.1.mdp_tick_ms 5000 \
set interfaces.1.socket_type STREAM \ set interfaces.1.socket_type STREAM \
set interfaces.1.encapsulation SINGLE \ set interfaces.1.encapsulation SINGLE \
set interfaces.1.point_to_point on \ set interfaces.1.point_to_point on
set interfaces.1.packet_interval 5000 \
set interfaces.1.burst_size 100 \
set interfaces.1.throttle 32000
start_servald_server start_servald_server
wait_until interface_up wait_until interface_up
} }
setup_SimulatedRadio() { setup_SimulatedRadio() {
setup_common setup_common
$servald_build_root/fakeradio 64 10000000 > "$SERVALD_VAR/radioout" 2> "$SERVALD_VAR/radioerr" & $servald_build_root/fakeradio 6 10000000 > "$SERVALD_VAR/radioout" 2> "$SERVALD_VAR/radioerr" &
FAKERADIO_PID=$! FAKERADIO_PID=$!
sleep 1 sleep 1
local END1=`head "$SERVALD_VAR/radioout" -n 1` local END1=`head "$SERVALD_VAR/radioout" -n 1`
local END2=`tail "$SERVALD_VAR/radioout" -n 1` local END2=`tail "$SERVALD_VAR/radioout" -n 1`
tfw_log "Started fakeradio pid=$FAKERADIO_PID, end1=$END1, end2=$END2" tfw_log "Started fakeradio pid=$FAKERADIO_PID, end1=$END1, end2=$END2"
set_instance +A set_instance +A
rhizome_add_file file1 2048 rhizome_add_file file1 10000
executeOk_servald config \ executeOk_servald config \
set interfaces.1.file "$END1" set interfaces.1.file "$END1"
set_instance +B set_instance +B

View File

@ -218,7 +218,7 @@ setup_simulate_extender() {
setup_servald setup_servald
assert_no_servald_processes assert_no_servald_processes
foreach_instance +A +B create_single_identity foreach_instance +A +B create_single_identity
$servald_build_root/fakeradio 1 10000000 > "$SERVALD_VAR/radioout" 2> "$SERVALD_VAR/radioerr" & $servald_build_root/fakeradio 1 20000000 > "$SERVALD_VAR/radioout" 2> "$SERVALD_VAR/radioerr" &
FAKERADIO_PID=$! FAKERADIO_PID=$!
sleep 1 sleep 1
local END1=`head "$SERVALD_VAR/radioout" -n 1` local END1=`head "$SERVALD_VAR/radioout" -n 1`
@ -240,9 +240,7 @@ setup_simulate_extender() {
set interfaces.1.socket_type STREAM \ set interfaces.1.socket_type STREAM \
set interfaces.1.encapsulation SINGLE \ set interfaces.1.encapsulation SINGLE \
set interfaces.1.point_to_point on \ set interfaces.1.point_to_point on \
set interfaces.1.packet_interval 5000 \ set interfaces.1.packet_interval 5000
set interfaces.1.burst_size 100 \
set interfaces.1.throttle 1000
foreach_instance +A +B start_routing_instance foreach_instance +A +B start_routing_instance
} }
test_simulate_extender() { test_simulate_extender() {