Refactor radio link for better encapsulation

This commit is contained in:
Jeremy Lakeman 2013-11-25 15:05:33 +10:30
parent cd639ba3b6
commit d45470ce81
6 changed files with 228 additions and 243 deletions

View File

@ -476,8 +476,6 @@ ATOM(bool_t, debug, 0, boolean,, "If true, log details
ATOM(bool_t, point_to_point, 0, boolean,, "If true, assume there will only be two devices on this interface") ATOM(bool_t, point_to_point, 0, boolean,, "If true, assume there will only be two devices on this interface")
ATOM(bool_t, ctsrts, 0, boolean,, "If true, enable CTS/RTS hardware handshaking") ATOM(bool_t, ctsrts, 0, boolean,, "If true, enable CTS/RTS hardware handshaking")
ATOM(int32_t, uartbps, 57600, int32_rs232baudrate,, "Speed of serial UART link speed (which may be different to serial device link speed)") ATOM(int32_t, uartbps, 57600, int32_rs232baudrate,, "Speed of serial UART link speed (which may be different to serial device link speed)")
ATOM(int32_t, throttle, 0, int32_nonneg,, "Limit transmit speed of serial interface (bytes per second)")
ATOM(int32_t, burst_size, 0, int32_nonneg,, "Write no more than this many bytes at a time to a serial interface")
END_STRUCT END_STRUCT
ARRAY(interface_list, NO_DUPLICATES) ARRAY(interface_list, NO_DUPLICATES)

View File

@ -49,7 +49,6 @@ struct profile_total sock_any_stats;
static void overlay_interface_poll(struct sched_ent *alarm); static void overlay_interface_poll(struct sched_ent *alarm);
static int re_init_socket(int interface_index); static int re_init_socket(int interface_index);
static void write_stream_buffer(overlay_interface *interface);
static void static void
overlay_interface_close(overlay_interface *interface){ overlay_interface_close(overlay_interface *interface){
@ -59,10 +58,6 @@ overlay_interface_close(overlay_interface *interface){
unschedule(&interface->alarm); unschedule(&interface->alarm);
unwatch(&interface->alarm); unwatch(&interface->alarm);
close(interface->alarm.poll.fd); close(interface->alarm.poll.fd);
if (interface->txbuffer){
free(interface->txbuffer);
interface->txbuffer=NULL;
}
if (interface->radio_link_state) if (interface->radio_link_state)
radio_link_free(interface); radio_link_free(interface);
interface->alarm.poll.fd=-1; interface->alarm.poll.fd=-1;
@ -82,8 +77,7 @@ void interface_state_html(struct strbuf *b, struct overlay_interface *interface)
switch(interface->type){ switch(interface->type){
case OVERLAY_INTERFACE_PACKETRADIO: case OVERLAY_INTERFACE_PACKETRADIO:
strbuf_puts(b, "Type: Packet Radio<br>"); strbuf_puts(b, "Type: Packet Radio<br>");
strbuf_sprintf(b, "RSSI: %ddB<br>",interface->radio_rssi); radio_link_state_html(b, interface);
strbuf_sprintf(b, "Remote RSSI: %ddB<br>",interface->remote_rssi);
break; break;
case OVERLAY_INTERFACE_ETHERNET: case OVERLAY_INTERFACE_ETHERNET:
strbuf_puts(b, "Type: Ethernet<br>"); strbuf_puts(b, "Type: Ethernet<br>");
@ -417,8 +411,6 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
set_destination_ref(&interface->destination, NULL); set_destination_ref(&interface->destination, NULL);
interface->destination = new_destination(interface, ifconfig->encapsulation); interface->destination = new_destination(interface, ifconfig->encapsulation);
interface->throttle_bytes_per_second = ifconfig->throttle;
interface->throttle_burst_write_size = ifconfig->burst_size;
/* Pick a reasonable default MTU. /* Pick a reasonable default MTU.
This will ultimately get tuned by the bandwidth and other properties of the interface */ This will ultimately get tuned by the bandwidth and other properties of the interface */
interface->mtu = 1200; interface->mtu = 1200;
@ -722,8 +714,6 @@ static void interface_read_stream(struct overlay_interface *interface){
return; return;
} }
if (config.debug.packetradio)
dump("read bytes", buffer, nread);
int i; int i;
for (i=0;i<nread;i++) for (i=0;i<nread;i++)
@ -732,104 +722,6 @@ static void interface_read_stream(struct overlay_interface *interface){
OUT(); OUT();
} }
static void write_stream_buffer(overlay_interface *interface){
time_ms_t now = gettime_ms();
// Throttle output to a prescribed bit-rate
// first, reduce the number of bytes based on the configured burst size
int bytes_allowed=interface->throttle_burst_write_size;
int total_written=0;
while (interface->tx_bytes_pending>0 || interface->tx_packet || interface->next_heartbeat <= now) {
if (interface->tx_bytes_pending==0){
// allocate tx buffer on first use
if (!interface->txbuffer){
interface->txbuffer=emalloc(OVERLAY_INTERFACE_RX_BUFFER_SIZE);
if (!interface->txbuffer)
break;
}
if (interface->next_heartbeat <= now){
// Queue a hearbeat now
radio_link_heartbeat(interface->txbuffer,&interface->tx_bytes_pending);
if (config.debug.packetradio)
DEBUGF("Sending heartbeat");
interface->next_heartbeat = now+1000;
}else if(interface->remaining_space >= LINK_MTU + HEARTBEAT_SIZE){
// prepare a new link layer packet in txbuffer
if (radio_link_encode_packet(interface))
break;
if (interface->remaining_space - interface->tx_bytes_pending < LINK_MTU + HEARTBEAT_SIZE)
interface->next_heartbeat = now;
}
// nothing interesting to send, just break
if (interface->tx_bytes_pending==0)
break;
}
if (interface->next_tx_allowed > now)
break;
int bytes = interface->tx_bytes_pending;
if (interface->throttle_burst_write_size && bytes>bytes_allowed)
bytes=bytes_allowed;
if (bytes<=0)
break;
int written=write(interface->alarm.poll.fd, interface->txbuffer, bytes);
if (written<=0){
DEBUGF("Blocking for POLLOUT");
break;
}
interface->remaining_space-=written;
interface->tx_bytes_pending-=written;
total_written+=written;
bytes_allowed-=written;
if (interface->tx_bytes_pending){
bcopy(&interface->txbuffer[written],&interface->txbuffer[0],
interface->tx_bytes_pending);
DEBUGF("Partial write, %d left", interface->tx_bytes_pending);
}
}
if (total_written>0){
// Now when are we allowed to send more?
int rate = interface->throttle_bytes_per_second;
if (interface->remaining_space<=0)
rate = 600;
if (rate){
int delay = total_written*1000/rate;
if (config.debug.throttling)
DEBUGF("Throttling for %dms (%d).", delay, interface->remaining_space);
interface->next_tx_allowed = now + delay;
}
}
time_ms_t next_write = interface->next_tx_allowed;
if (interface->tx_bytes_pending<=0){
next_write = interface->next_heartbeat;
}
if (interface->alarm.alarm==-1 || next_write < interface->alarm.alarm){
interface->alarm.alarm = next_write;
interface->alarm.deadline = interface->alarm.alarm+10;
}
if (interface->tx_bytes_pending>0 && next_write <= now){
// more to write, so set the POLLOUT flag
interface->alarm.poll.events|=POLLOUT;
} else {
// Nothing to write, so clear POLLOUT flag
interface->alarm.poll.events&=~POLLOUT;
}
watch(&interface->alarm);
}
static void overlay_interface_poll(struct sched_ent *alarm) static void overlay_interface_poll(struct sched_ent *alarm)
{ {
struct overlay_interface *interface = (overlay_interface *)alarm; struct overlay_interface *interface = (overlay_interface *)alarm;
@ -841,7 +733,7 @@ static void overlay_interface_poll(struct sched_ent *alarm)
if (interface->state==INTERFACE_STATE_UP if (interface->state==INTERFACE_STATE_UP
&& interface->destination->tick_ms>0 && interface->destination->tick_ms>0
&& interface->send_broadcasts && interface->send_broadcasts
&& !interface->tx_packet){ && !radio_link_is_busy(interface)){
if (now >= interface->destination->last_tx+interface->destination->tick_ms) if (now >= interface->destination->last_tx+interface->destination->tick_ms)
overlay_send_tick_packet(interface->destination); overlay_send_tick_packet(interface->destination);
@ -852,8 +744,8 @@ static void overlay_interface_poll(struct sched_ent *alarm)
switch(interface->socket_type){ switch(interface->socket_type){
case SOCK_STREAM: case SOCK_STREAM:
write_stream_buffer(interface); radio_link_tx(interface);
break; return;
case SOCK_DGRAM: case SOCK_DGRAM:
break; break;
case SOCK_FILE: case SOCK_FILE:
@ -873,14 +765,8 @@ static void overlay_interface_poll(struct sched_ent *alarm)
if (alarm->poll.revents & POLLOUT){ if (alarm->poll.revents & POLLOUT){
switch(interface->socket_type){ switch(interface->socket_type){
case SOCK_STREAM: case SOCK_STREAM:
write_stream_buffer(interface); radio_link_tx(interface);
if (alarm->alarm!=-1 && interface->state==INTERFACE_STATE_UP) { return;
if (alarm->alarm < now)
alarm->alarm = now;
unschedule(alarm);
schedule(alarm);
}
break;
case SOCK_DGRAM: case SOCK_DGRAM:
case SOCK_FILE: case SOCK_FILE:
//XXX error? fatal? //XXX error? fatal?
@ -896,14 +782,9 @@ static void overlay_interface_poll(struct sched_ent *alarm)
case SOCK_STREAM: case SOCK_STREAM:
interface_read_stream(interface); interface_read_stream(interface);
// if we read a valid heartbeat packet, we may be able to write more bytes now. // if we read a valid heartbeat packet, we may be able to write more bytes now.
if (interface->state==INTERFACE_STATE_UP && interface->remaining_space>0){ if (interface->state==INTERFACE_STATE_UP){
write_stream_buffer(interface); radio_link_tx(interface);
if (alarm->alarm!=-1 && interface->state==INTERFACE_STATE_UP) { return;
if (alarm->alarm < now)
alarm->alarm = now;
unschedule(alarm);
schedule(alarm);
}
} }
break; break;
case SOCK_FILE: case SOCK_FILE:
@ -943,24 +824,7 @@ int overlay_broadcast_ensemble(struct network_destination *destination, struct o
switch(interface->socket_type){ switch(interface->socket_type){
case SOCK_STREAM: case SOCK_STREAM:
{ return radio_link_queue_packet(interface, buffer);
if (interface->tx_packet){
ob_free(buffer);
return WHYF("Cannot send two packets to a stream at the same time");
}
// prepare the buffer for reading
ob_flip(buffer);
interface->tx_packet = buffer;
write_stream_buffer(interface);
if (interface->alarm.alarm!=-1){
unschedule(&interface->alarm);
schedule(&interface->alarm);
}
return 0;
}
case SOCK_FILE: case SOCK_FILE:
{ {

View File

@ -22,6 +22,7 @@
#include "conf.h" #include "conf.h"
#include "overlay_buffer.h" #include "overlay_buffer.h"
#include "overlay_packet.h" #include "overlay_packet.h"
#include "radio_link.h"
#include "str.h" #include "str.h"
#include "strbuf.h" #include "strbuf.h"
@ -290,7 +291,7 @@ overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){
int i; int i;
for(i=0;i<frame->destination_count;i++) for(i=0;i<frame->destination_count;i++)
{ {
if (frame->destinations[i].destination->interface->tx_packet) if (radio_link_is_busy(frame->destinations[i].destination->interface))
continue; continue;
time_ms_t next_packet = limit_next_allowed(&frame->destinations[i].destination->transfer_limit); time_ms_t next_packet = limit_next_allowed(&frame->destinations[i].destination->transfer_limit);
if (frame->destinations[i].transmit_time){ if (frame->destinations[i].transmit_time){
@ -403,8 +404,7 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
} }
}else{ }else{
// skip this interface if the stream tx buffer has data // skip this interface if the stream tx buffer has data
if (dest->interface->socket_type==SOCK_STREAM if (radio_link_is_busy(dest->interface))
&& dest->interface->tx_packet)
continue; continue;
// can we send a packet on this interface now? // can we send a packet on this interface now?

View File

@ -74,8 +74,9 @@ struct mavlink_RADIO_v10 {
*/ */
#define FEC_LENGTH 32 #define FEC_LENGTH 32
#define FEC_BYTES 223 #define FEC_MAX_BYTES 223
#define RADIO_HEADER_LENGTH 6 #define RADIO_HEADER_LENGTH 6
#define RADIO_USED_HEADER_LENGTH 4
#define RADIO_CRC_LENGTH 2 #define RADIO_CRC_LENGTH 2
#define LINK_PAYLOAD_MTU (LINK_MTU - FEC_LENGTH - RADIO_HEADER_LENGTH - RADIO_CRC_LENGTH) #define LINK_PAYLOAD_MTU (LINK_MTU - FEC_LENGTH - RADIO_HEADER_LENGTH - RADIO_CRC_LENGTH)
@ -83,7 +84,15 @@ struct mavlink_RADIO_v10 {
struct radio_link_state{ struct radio_link_state{
// next seq for transmission // next seq for transmission
int tx_seq; int tx_seq;
// small buffer for parsing incoming bytes from the serial interface,
// looking for recoverable link layer packets
// should be large enough to hold at least one packet from the remote end
// plus one heartbeat packet from the local firmware
uint8_t payload[LINK_MTU*3];
// decoded length of next link layer packet // decoded length of next link layer packet
// including all header and footer bytes
int payload_length; int payload_length;
// last rx seq for reassembly // last rx seq for reassembly
int seq; int seq;
@ -92,15 +101,31 @@ struct radio_link_state{
// offset after payload_start for incoming bytes // offset after payload_start for incoming bytes
int payload_offset; int payload_offset;
// small buffer for parsing incoming bytes from the serial interface,
// looking for recoverable link layer packets
uint8_t payload[LINK_MTU*3];
// small buffer for assembling mdp payloads. // small buffer for assembling mdp payloads.
// should be large enough to hold MDP_MTU uint8_t dst[MDP_MTU];
uint8_t dst[OVERLAY_INTERFACE_RX_BUFFER_SIZE];
// length of recovered packet // length of recovered packet
int packet_length; int packet_length;
// next firmware heartbeat
time_ms_t next_heartbeat;
time_ms_t last_packet;
// parsed rssi
int radio_rssi;
int remote_rssi;
// estimated firmware buffer space
int32_t remaining_space;
// next serial write
uint64_t next_tx_allowed;
// partially sent packet
struct overlay_buffer *tx_packet;
// serial write buffer
uint8_t txbuffer[LINK_MTU];
int tx_bytes;
int tx_pos;
}; };
/* /*
@ -126,8 +151,10 @@ int decode_rs_8(data_t *data, int *eras_pos, int no_eras, int pad);
int radio_link_free(struct overlay_interface *interface) int radio_link_free(struct overlay_interface *interface)
{ {
free(interface->radio_link_state); if (interface->radio_link_state){
interface->radio_link_state=NULL; free(interface->radio_link_state);
interface->radio_link_state=NULL;
}
return 0; return 0;
} }
@ -137,78 +164,183 @@ int radio_link_init(struct overlay_interface *interface)
return 0; return 0;
} }
void radio_link_state_html(struct strbuf *b, struct overlay_interface *interface)
{
struct radio_link_state *state = interface->radio_link_state;
strbuf_sprintf(b, "RSSI: %ddB<br>", state->radio_rssi);
strbuf_sprintf(b, "Remote RSSI: %ddB<br>", state->remote_rssi);
}
// write a new link layer packet to interface->txbuffer // write a new link layer packet to interface->txbuffer
// consuming more bytes from the next interface->tx_packet if required // consuming more bytes from the next interface->tx_packet if required
int radio_link_encode_packet(struct overlay_interface *interface) static int radio_link_encode_packet(struct radio_link_state *link_state)
{ {
// if we have nothing interesting left to send, don't create a packet at all // if we have nothing interesting left to send, don't create a packet at all
if (!interface->tx_packet) if (!link_state->tx_packet)
return 0; return 0;
int count = ob_remaining(interface->tx_packet); int count = ob_remaining(link_state->tx_packet);
int startP = (ob_position(interface->tx_packet) == 0); int startP = (ob_position(link_state->tx_packet) == 0);
int endP = 1; int endP = 1;
if (count > LINK_PAYLOAD_MTU){ if (count > LINK_PAYLOAD_MTU){
count = LINK_PAYLOAD_MTU; count = LINK_PAYLOAD_MTU;
endP = 0; endP = 0;
} }
interface->txbuffer[0]=0xfe; // mavlink v1.0 magic header link_state->txbuffer[0]=0xfe; // mavlink v1.0 magic header
// we need to add FEC_LENGTH for FEC, but the length field doesn't include the expected headers or CRC // we need to add FEC_LENGTH for FEC, but the length field doesn't include the expected headers or CRC
int len = count + FEC_LENGTH - RADIO_CRC_LENGTH; int len = count + FEC_LENGTH - RADIO_CRC_LENGTH;
interface->txbuffer[1]=len; // mavlink payload length link_state->txbuffer[1]=len; // mavlink payload length
interface->txbuffer[2]=(len & 0xF); link_state->txbuffer[2]=(len & 0xF);
interface->txbuffer[3]=0; link_state->txbuffer[3]=0;
// add golay encoding so that decoding the actual length is more reliable // add golay encoding so that decoding the actual length is more reliable
golay_encode(&interface->txbuffer[1]); golay_encode(&link_state->txbuffer[1]);
interface->txbuffer[4]=(interface->radio_link_state->tx_seq++) & 0x3f; link_state->txbuffer[4]=(link_state->tx_seq++) & 0x3f;
if (startP) interface->txbuffer[4]|=0x40; if (startP) link_state->txbuffer[4]|=0x40;
if (endP) interface->txbuffer[4]|=0x80; if (endP) link_state->txbuffer[4]|=0x80;
interface->txbuffer[5]=MAVLINK_MSG_ID_DATASTREAM; link_state->txbuffer[5]=MAVLINK_MSG_ID_DATASTREAM;
ob_get_bytes(interface->tx_packet, &interface->txbuffer[6], count); ob_get_bytes(link_state->tx_packet, &link_state->txbuffer[6], count);
encode_rs_8(&interface->txbuffer[4], &interface->txbuffer[6+count], FEC_BYTES - (count+2)); encode_rs_8(&link_state->txbuffer[4], &link_state->txbuffer[6+count], FEC_MAX_BYTES - (count+2));
interface->tx_bytes_pending=len + RADIO_CRC_LENGTH + RADIO_HEADER_LENGTH; link_state->tx_bytes=len + RADIO_CRC_LENGTH + RADIO_HEADER_LENGTH;
if (endP){ if (endP){
ob_free(interface->tx_packet); ob_free(link_state->tx_packet);
interface->tx_packet=NULL; link_state->tx_packet=NULL;
overlay_queue_schedule_next(gettime_ms()); overlay_queue_schedule_next(gettime_ms());
} }
return 0; return 0;
} }
int radio_link_heartbeat(unsigned char *frame, int *outlen) int radio_link_is_busy(struct overlay_interface *interface)
{ {
int count=9; if (interface->radio_link_state && interface->radio_link_state->tx_packet)
bzero(frame, count + RADIO_CRC_LENGTH + RADIO_HEADER_LENGTH); return 1;
return 0;
frame[0]=0xfe; // mavlink v1.0 frame }
// Must be 9 to indicate heartbeat
frame[1]=count; // payload len, excluding 6 byte header and 2 byte CRC
frame[2]=(count & 0xF); // packet sequence
frame[3]=0x00; // system ID of sender (MAV_TYPE_GENERIC)
// we're golay encoding the length to improve the probability of skipping it correctly
golay_encode(&frame[1]);
frame[4]=0xf1; // component ID of sender (MAV_COMP_ID_UART_BRIDGE)
// Must be zero to indicate heartbeat
frame[5]=0; // message ID type of this frame: DATA_STREAM
// extra magic number to help correctly detect remote heartbeat requests int radio_link_queue_packet(struct overlay_interface *interface, struct overlay_buffer *buffer)
frame[14]=0x55; {
frame[15]=0x05; struct radio_link_state *link_state = interface->radio_link_state;
golay_encode(&frame[14]);
*outlen=count + RADIO_CRC_LENGTH + RADIO_HEADER_LENGTH; if (link_state->tx_packet){
ob_free(buffer);
return WHYF("Cannot send two packets to a stream at the same time");
}
// prepare the buffer for reading
ob_flip(buffer);
link_state->tx_packet = buffer;
radio_link_tx(interface);
return 0; return 0;
} }
static int parse_heartbeat(struct overlay_interface *interface, const unsigned char *payload) static int build_heartbeat(struct radio_link_state *link_state)
{
int count=9;
bzero(link_state->txbuffer, count + RADIO_CRC_LENGTH + RADIO_HEADER_LENGTH);
link_state->txbuffer[0]=0xfe; // mavlink v1.0 link_state->txbuffer
// Must be 9 to indicate heartbeat
link_state->txbuffer[1]=count; // payload len, excluding 6 byte header and 2 byte CRC
link_state->txbuffer[2]=(count & 0xF); // packet sequence
link_state->txbuffer[3]=0x00; // system ID of sender (MAV_TYPE_GENERIC)
// we're golay encoding the length to improve the probability of skipping it correctly
golay_encode(&link_state->txbuffer[1]);
link_state->txbuffer[4]=0xf1; // component ID of sender (MAV_COMP_ID_UART_BRIDGE)
// Must be zero to indicate heartbeat
link_state->txbuffer[5]=0; // message ID type of this link_state->txbuffer: DATA_STREAM
// extra magic number to help correctly detect remote heartbeat requests
link_state->txbuffer[14]=0x55;
link_state->txbuffer[15]=0x05;
golay_encode(&link_state->txbuffer[14]);
link_state->tx_bytes = count + RADIO_CRC_LENGTH + RADIO_HEADER_LENGTH;
if (config.debug.radio_link)
DEBUGF("Produced heartbeat");
return 0;
}
// write a new link layer packet to interface->txbuffer
// consuming more bytes from the next interface->tx_packet if required
int radio_link_tx(struct overlay_interface *interface)
{
struct radio_link_state *link_state = interface->radio_link_state;
unschedule(&interface->alarm);
interface->alarm.alarm = 0;
time_ms_t next_tick = interface->destination->last_tx+interface->destination->tick_ms;
time_ms_t now = gettime_ms();
while(1){
if (link_state->tx_bytes){
if (link_state->next_tx_allowed > now){
interface->alarm.alarm = link_state->next_tx_allowed;
break;
}
int written=write(interface->alarm.poll.fd, &link_state->txbuffer[link_state->tx_pos], link_state->tx_bytes);
if (written<=0){
interface->alarm.poll.events|=POLLOUT;
break;
}
link_state->remaining_space-=written;
link_state->tx_bytes-=written;
if (link_state->tx_bytes)
link_state->tx_pos+=written;
else
link_state->tx_pos=0;
continue;
}
interface->alarm.poll.events&=~POLLOUT;
if (link_state->next_heartbeat<=now){
build_heartbeat(link_state);
link_state->next_heartbeat = now + 1000;
continue;
}
// out of space? Don't bother to send anything interesting
// until we hear the next heartbeat response
if (link_state->remaining_space < LINK_MTU + HEARTBEAT_SIZE){
interface->alarm.alarm = link_state->next_heartbeat;
break;
}
if (link_state->remaining_space < LINK_MTU + HEARTBEAT_SIZE)
link_state->next_heartbeat = now;
if (!link_state->tx_packet){
// finished current packet, wait for more.
interface->alarm.alarm = next_tick;
break;
}
// encode another packet fragment
radio_link_encode_packet(link_state);
link_state->last_packet = now;
}
watch(&interface->alarm);
if (interface->alarm.alarm<now)
interface->alarm.alarm=now;
if (interface->alarm.alarm){
interface->alarm.deadline = interface->alarm.alarm+100;
schedule(&interface->alarm);
}
return 0;
}
static int parse_heartbeat(struct radio_link_state *state, const unsigned char *payload)
{ {
if (payload[0]==0xFE if (payload[0]==0xFE
&& payload[1]==9 && payload[1]==9
@ -217,59 +349,56 @@ static int parse_heartbeat(struct overlay_interface *interface, const unsigned c
&& payload[5]==MAVLINK_MSG_ID_RADIO){ && payload[5]==MAVLINK_MSG_ID_RADIO){
// we can assume that radio status packets arrive without corruption // we can assume that radio status packets arrive without corruption
interface->radio_rssi=(1.0*payload[10]-payload[13])/1.9; state->radio_rssi=(1.0*payload[10]-payload[13])/1.9;
interface->remote_rssi=(1.0*payload[11] - payload[14])/1.9; state->remote_rssi=(1.0*payload[11] - payload[14])/1.9;
int free_space = payload[12]; int free_space = payload[12];
int free_bytes = (free_space * 1280) / 100 - 30; int free_bytes = (free_space * 1280) / 100 - 30;
interface->remaining_space = free_bytes; state->remaining_space = free_bytes;
if (free_bytes>0) if (free_bytes>0)
interface->next_tx_allowed = gettime_ms(); state->next_tx_allowed = gettime_ms();
if (free_bytes>720) if (free_bytes>720)
interface->next_heartbeat=gettime_ms()+1000; state->next_heartbeat=gettime_ms()+1000;
if (config.debug.packetradio) { if (config.debug.packetradio)
INFOF("Link budget = %+ddB, remote link budget = %+ddB, buffer space = %d%% (approx %d)", INFOF("Link budget = %+ddB, remote link budget = %+ddB, buffer space = %d%% (approx %d)",
interface->radio_rssi, state->radio_rssi,
interface->remote_rssi, state->remote_rssi,
free_space, free_bytes); free_space, free_bytes);
}
return 1; return 1;
} }
return 0; return 0;
} }
static int radio_link_parse(struct overlay_interface *interface, struct radio_link_state *state, static int radio_link_parse(struct overlay_interface *interface, struct radio_link_state *state,
int packet_length, unsigned char *payload, int *backtrack) size_t packet_length, uint8_t *payload, int *backtrack)
{ {
*backtrack=0; *backtrack=0;
if (packet_length==9){ if (packet_length==17){
// make sure we've heard the start and end of a remote heartbeat request // if we've heard the start and end of a remote heartbeat request
// we can skip it without checking anything else
int errs=0; int errs=0;
int tail = golay_decode(&errs, &payload[14]); int tail = golay_decode(&errs, &payload[14]);
if (tail == 0x555){ if (tail == 0x555){
if (config.debug.radio_link)
DEBUGF("Decoded remote heartbeat request");
return 1; return 1;
} }
return 0; return 0;
} }
int data_bytes = packet_length - (FEC_LENGTH - RADIO_CRC_LENGTH); size_t data_bytes = packet_length - (RADIO_USED_HEADER_LENGTH + FEC_LENGTH);
// preserve the last 16 bytes of data
unsigned char old_footer[FEC_LENGTH];
unsigned char *payload_footer=&payload[packet_length + RADIO_HEADER_LENGTH + RADIO_CRC_LENGTH - sizeof(old_footer)];
bcopy(payload_footer, old_footer, sizeof(old_footer));
int pad=FEC_BYTES - (data_bytes + RADIO_CRC_LENGTH); int errors=decode_rs_8(&payload[4], NULL, 0, FEC_MAX_BYTES - data_bytes);
int errors=decode_rs_8(&payload[4], NULL, 0, pad);
if (errors==-1){ if (errors==-1){
if (config.debug.radio_link) if (config.debug.radio_link)
DEBUGF("Reed-Solomon error correction failed"); DEBUGF("Reed-Solomon error correction failed");
return 0; return 0;
} }
*backtrack=errors; *backtrack=errors;
data_bytes -= 2;
int seq=payload[4]&0x3f; int seq=payload[4]&0x3f;
if (config.debug.radio_link){ if (config.debug.radio_link){
DEBUGF("Received RS protected message, len: %d, errors: %d, seq: %d, flags:%s%s", DEBUGF("Received RS protected message, len: %zd, errors: %d, seq: %d, flags:%s%s",
data_bytes, data_bytes,
errors, errors,
seq, seq,
@ -318,7 +447,9 @@ static int decode_length(struct radio_link_state *state, unsigned char *p)
if (length<0 || ((length >>8) & 0xF) != (length&0xF)) if (length<0 || ((length >>8) & 0xF) != (length&0xF))
return -1; return -1;
length=length&0xFF; length=length&0xFF;
if (length!=9 && (length <= FEC_LENGTH - RADIO_CRC_LENGTH || length + RADIO_HEADER_LENGTH + RADIO_CRC_LENGTH > LINK_MTU)) length += RADIO_HEADER_LENGTH + RADIO_CRC_LENGTH;
if (length!=17 && (length <= FEC_LENGTH || length > LINK_MTU))
return -1; return -1;
if (config.debug.radio_link && (errs || state->payload_length!=*p)) if (config.debug.radio_link && (errs || state->payload_length!=*p))
@ -328,8 +459,10 @@ static int decode_length(struct radio_link_state *state, unsigned char *p)
return 0; return 0;
} }
// add one byte at a time from the serial link, and attempt to decode packets
int radio_link_decode(struct overlay_interface *interface, uint8_t c) int radio_link_decode(struct overlay_interface *interface, uint8_t c)
{ {
IN();
struct radio_link_state *state=interface->radio_link_state; struct radio_link_state *state=interface->radio_link_state;
if (state->payload_start + state->payload_offset >= sizeof(state->payload)){ if (state->payload_start + state->payload_offset >= sizeof(state->payload)){
@ -353,7 +486,7 @@ int radio_link_decode(struct overlay_interface *interface, uint8_t c)
&& p[4]==RADIO_SOURCE_COMPONENT && p[4]==RADIO_SOURCE_COMPONENT
&& p[5]==MAVLINK_MSG_ID_RADIO){ && p[5]==MAVLINK_MSG_ID_RADIO){
//looks like a valid heartbeat response header, read the rest and process it //looks like a valid heartbeat response header, read the rest and process it
state->payload_length=9; state->payload_length=17;
break; break;
} }
@ -366,15 +499,15 @@ int radio_link_decode(struct overlay_interface *interface, uint8_t c)
} }
// wait for a whole packet // wait for a whole packet
if (!state->payload_length || state->payload_offset < state->payload_length + RADIO_HEADER_LENGTH + RADIO_CRC_LENGTH) if (!state->payload_length || state->payload_offset < state->payload_length)
return 0; RETURN(0);
if (parse_heartbeat(interface, p)){ if (parse_heartbeat(state, p)){
// cut the bytes of the heartbeat out of the buffer // cut the bytes of the heartbeat out of the buffer
state->payload_offset -= state->payload_length + RADIO_HEADER_LENGTH + RADIO_CRC_LENGTH; state->payload_offset -= state->payload_length;
if (state->payload_offset){ if (state->payload_offset){
// shuffle bytes backwards // shuffle bytes backwards
bcopy(&p[state->payload_length + RADIO_HEADER_LENGTH + RADIO_CRC_LENGTH], p, state->payload_offset); bcopy(&p[state->payload_length], p, state->payload_offset);
} }
// restart parsing for a valid header from the beginning of out buffer // restart parsing for a valid header from the beginning of out buffer
state->payload_offset+=state->payload_start; state->payload_offset+=state->payload_start;
@ -394,10 +527,10 @@ int radio_link_decode(struct overlay_interface *interface, uint8_t c)
// If the packet is truncated by less than 16 bytes, RS protection should be enough to recover the packet, // If the packet is truncated by less than 16 bytes, RS protection should be enough to recover the packet,
// but we may need to examine the last few bytes to find the start of the next packet. // but we may need to examine the last few bytes to find the start of the next packet.
state->payload_offset -= state->payload_length + RADIO_HEADER_LENGTH + RADIO_CRC_LENGTH - backtrack; state->payload_offset -= state->payload_length - backtrack;
if (state->payload_offset){ if (state->payload_offset){
// shuffle all remaining bytes back to the start of the buffer // shuffle all remaining bytes back to the start of the buffer
bcopy(&state->payload[state->payload_start + state->payload_length + RADIO_HEADER_LENGTH + RADIO_CRC_LENGTH - backtrack], bcopy(&state->payload[state->payload_start + state->payload_length - backtrack],
state->payload, state->payload_offset); state->payload, state->payload_offset);
} }
state->payload_start=0; state->payload_start=0;
@ -409,4 +542,5 @@ int radio_link_decode(struct overlay_interface *interface, uint8_t c)
} }
state->payload_length=0; state->payload_length=0;
}; };
RETURN(0);
} }

View File

@ -7,7 +7,9 @@
int radio_link_free(struct overlay_interface *interface); int radio_link_free(struct overlay_interface *interface);
int radio_link_init(struct overlay_interface *interface); int radio_link_init(struct overlay_interface *interface);
int radio_link_decode(struct overlay_interface *interface, uint8_t c); int radio_link_decode(struct overlay_interface *interface, uint8_t c);
int radio_link_heartbeat(unsigned char *frame, int *outlen); int radio_link_tx(struct overlay_interface *interface);
int radio_link_encode_packet(struct overlay_interface *interface); void radio_link_state_html(struct strbuf *b, struct overlay_interface *interface);
int radio_link_is_busy(struct overlay_interface *interface);
int radio_link_queue_packet(struct overlay_interface *interface, struct overlay_buffer *buffer);
#endif #endif

View File

@ -344,19 +344,6 @@ typedef struct overlay_interface {
int recv_count; int recv_count;
int tx_count; int tx_count;
// stream socket tx state;
struct overlay_buffer *tx_packet;
uint8_t *txbuffer;
int tx_bytes_pending;
// Throttle TX rate if required (stream interfaces only for now)
uint32_t throttle_bytes_per_second;
uint32_t throttle_burst_write_size;
uint64_t next_tx_allowed;
int32_t remaining_space;
time_ms_t next_heartbeat;
int radio_rssi;
int remote_rssi;
struct radio_link_state *radio_link_state; struct radio_link_state *radio_link_state;
// copy of ifconfig flags // copy of ifconfig flags