mirror of
https://github.com/servalproject/serval-dna.git
synced 2024-12-19 05:07:56 +00:00
Start using network coding over the radio link
This commit is contained in:
parent
d67dfa22b2
commit
76267904d9
@ -980,7 +980,6 @@ int app_mdp_ping(const struct cli_parsed *parsed, struct cli_context *context)
|
||||
int r = mdp_send(mdp_sockfd, &mdp_header, payload, sizeof(payload));
|
||||
if (r<0)
|
||||
WHY_perror("mdp_send");
|
||||
else
|
||||
tx_count++;
|
||||
}
|
||||
|
||||
|
@ -193,14 +193,14 @@ int write_bytes(struct radio_state *s)
|
||||
wrote=8;
|
||||
if (s->last_char_ms)
|
||||
wrote = write(s->fd, s->rxbuffer, wrote);
|
||||
if (wrote>0){
|
||||
if (wrote<=0)
|
||||
abort();
|
||||
log_time();
|
||||
fprintf(stderr, "Wrote to %s\n", s->name);
|
||||
dump(NULL, s->rxbuffer, wrote);
|
||||
if (wrote < s->rxb_len)
|
||||
bcopy(&s->rxbuffer[wrote], s->rxbuffer, s->rxb_len - wrote);
|
||||
s->rxb_len -= wrote;
|
||||
}
|
||||
return wrote;
|
||||
}
|
||||
|
||||
|
187
network_coding.c
187
network_coding.c
@ -26,21 +26,25 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
#include <stdarg.h>
|
||||
#include "network_coding.h"
|
||||
#include "dataformats.h"
|
||||
#include "log.h"
|
||||
#include "mem.h"
|
||||
|
||||
#define FLAG_NEW 1
|
||||
|
||||
|
||||
struct nc_packet{
|
||||
uint8_t sequence;
|
||||
uint32_t combination;
|
||||
size_t len;
|
||||
uint8_t flags;
|
||||
uint8_t *payload;
|
||||
};
|
||||
|
||||
struct nc_half {
|
||||
uint8_t last_ack;
|
||||
|
||||
// Define the size parameters of the network coding data structures
|
||||
uint8_t window_size; // limited to the number of bits we can fit in a uint32_t
|
||||
uint8_t window_start; // sequence of first datagram in sending window
|
||||
uint8_t unseen; // sequence of first unseen packet
|
||||
size_t datagram_size; // number of bytes in each fixed sized unit
|
||||
uint8_t deliver_next; // sequence of next packet that should be delivered
|
||||
// dynamically sized array of pointers to packet buffers
|
||||
@ -51,6 +55,7 @@ struct nc_half {
|
||||
// At the receiver, this should be 2*maximum window size to allow for older packets we can't decode yet
|
||||
uint8_t max_queue_size;
|
||||
uint8_t queue_size; // # of packets currently in the array
|
||||
int count_new; // number of un-sent degrees of freedom
|
||||
};
|
||||
|
||||
struct nc{
|
||||
@ -115,24 +120,24 @@ int nc_tx_has_room(struct nc *n)
|
||||
|
||||
int nc_tx_enqueue_datagram(struct nc *n, unsigned char *d, size_t len)
|
||||
{
|
||||
if (len!=n->tx.datagram_size){
|
||||
fprintf(stderr, "Invalid length %zd (%zd)\n", len, n->tx.datagram_size);
|
||||
return -1;
|
||||
}
|
||||
if (len==0 || len>n->tx.datagram_size)
|
||||
return WHYF("Invalid length %zd (%zd)", len, n->tx.datagram_size);
|
||||
if (!nc_tx_has_room(n))
|
||||
return 1;
|
||||
|
||||
// Add datagram to queue
|
||||
uint8_t seq = n->tx.window_start + n->tx.queue_size;
|
||||
int index = seq & (n->tx.max_queue_size -1);
|
||||
if (n->tx.packets[index].payload){
|
||||
fprintf(stderr, "Attempted to replace TX payload %d (%d) with %d without freeing it first\n",index,n->tx.packets[index].sequence, seq);
|
||||
exit(-1);
|
||||
}
|
||||
n->tx.packets[index].payload = malloc(len);
|
||||
if (n->tx.packets[index].payload)
|
||||
FATALF("Attempted to replace TX payload %d (%d) with %d without freeing it first",index,n->tx.packets[index].sequence, seq);
|
||||
n->tx.packets[index].payload = emalloc(len);
|
||||
if (!n->tx.packets[index].payload)
|
||||
return 1;
|
||||
n->tx.packets[index].sequence = seq;
|
||||
n->tx.packets[index].combination = 0x80000000;
|
||||
n->tx.packets[index].flags = FLAG_NEW;
|
||||
n->tx.count_new++;
|
||||
n->tx.packets[index].len = len;
|
||||
bcopy(d, n->tx.packets[index].payload, len);
|
||||
n->tx.queue_size++;
|
||||
return 0;
|
||||
@ -147,23 +152,6 @@ static int _compare_uint8(uint8_t one, uint8_t two)
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int _nc_get_ack(struct nc_half *n, uint8_t *first_unseen, uint8_t *window_size)
|
||||
{
|
||||
uint8_t seq;
|
||||
|
||||
for (seq = n->window_start; ;seq++){
|
||||
int index = seq & (n->max_queue_size -1);
|
||||
if (n->packets[index].sequence != seq || !n->packets[index].payload)
|
||||
break;
|
||||
}
|
||||
*window_size = 32 - (seq - n->deliver_next);
|
||||
if (*window_size > n->max_queue_size)
|
||||
*window_size = n->max_queue_size;
|
||||
*first_unseen = seq;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int _nc_ack(struct nc_half *n, uint8_t first_unseen, uint8_t window_size)
|
||||
{
|
||||
if (window_size>n->max_queue_size)
|
||||
@ -200,12 +188,13 @@ static uint32_t _combine_masks(const struct nc_packet *src, const struct nc_pack
|
||||
return dst->combination ^ mask;
|
||||
}
|
||||
|
||||
static void _combine_packets(const struct nc_packet *src, struct nc_packet *dst, size_t datagram_size)
|
||||
static void _combine_packets(const struct nc_packet *src, struct nc_packet *dst)
|
||||
{
|
||||
// TODO verify that this combination mask is set correctly.
|
||||
if (dst->len < src->len)
|
||||
FATALF("Expected the destination buffer to be at least %zu", src->len);
|
||||
dst->combination = _combine_masks(src, dst);
|
||||
size_t i;
|
||||
for(i=0;i<datagram_size;i++)
|
||||
for(i=0; i < src->len; i++)
|
||||
dst->payload[i]^=src->payload[i];
|
||||
}
|
||||
|
||||
@ -234,20 +223,36 @@ static int _nc_tx_combine_random_payloads(struct nc_half *n, struct nc_packet *p
|
||||
if (n->packets[index].flags & FLAG_NEW){
|
||||
if (added_new)
|
||||
continue;
|
||||
_combine_packets(&n->packets[index], packet, n->datagram_size);
|
||||
_combine_packets(&n->packets[index], packet);
|
||||
added_new = 1;
|
||||
n->count_new --;
|
||||
n->packets[index].flags =0;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (combination&1)
|
||||
_combine_packets(&n->packets[index], packet, n->datagram_size);
|
||||
_combine_packets(&n->packets[index], packet);
|
||||
combination>>=1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int nc_tx_packet_urgency(struct nc *n)
|
||||
{
|
||||
// new data? send asap
|
||||
if (n->tx.count_new)
|
||||
return URGENCY_ASAP;
|
||||
// ack required? send soon
|
||||
if (n->rx.unseen != n->rx.last_ack)
|
||||
return URGENCY_ASAP;
|
||||
// no new data at either end? don't care
|
||||
if (!n->tx.queue_size && !n->rx.queue_size)
|
||||
return URGENCY_IDLE;
|
||||
// send soon-ish
|
||||
return URGENCY_SOON;
|
||||
}
|
||||
|
||||
// construct a packet and return the payload size
|
||||
int nc_tx_produce_packet(struct nc *n, uint8_t *datagram, uint32_t buffer_size)
|
||||
{
|
||||
@ -255,32 +260,57 @@ int nc_tx_produce_packet(struct nc *n, uint8_t *datagram, uint32_t buffer_size)
|
||||
if (buffer_size < n->tx.datagram_size+NC_HEADER_LEN)
|
||||
return -1;
|
||||
|
||||
if (_nc_get_ack(&n->rx, &datagram[0], &datagram[1]))
|
||||
return -1;
|
||||
uint8_t window_size = 32 - (n->rx.unseen - n->rx.deliver_next);
|
||||
if (window_size > n->rx.max_queue_size)
|
||||
window_size = n->rx.max_queue_size;
|
||||
|
||||
if (!n->tx.queue_size){
|
||||
// No data to send, just send an ack
|
||||
// TODO don't ack too often
|
||||
return 2;
|
||||
}
|
||||
datagram[0]=n->rx.unseen;
|
||||
datagram[1]=window_size;
|
||||
n->rx.last_ack = n->rx.unseen;
|
||||
size_t len=2;
|
||||
|
||||
if (n->tx.queue_size){
|
||||
// Produce linear combination
|
||||
struct nc_packet packet={
|
||||
.sequence = n->tx.window_start,
|
||||
.combination = 0,
|
||||
.payload = &datagram[NC_HEADER_LEN],
|
||||
.len = buffer_size - NC_HEADER_LEN,
|
||||
};
|
||||
bzero(packet.payload, packet.len);
|
||||
|
||||
if (_nc_tx_combine_random_payloads(&n->tx, &packet))
|
||||
return -1;
|
||||
|
||||
// TODO assert actual_combination? (should never be zero)
|
||||
// Write out bitmap of actual combinations involved
|
||||
datagram[2] = packet.sequence;
|
||||
write_uint32(&datagram[3], packet.combination);
|
||||
return NC_HEADER_LEN+n->tx.datagram_size;
|
||||
len = packet.len + NC_HEADER_LEN;
|
||||
// truncate zero bytes from the end
|
||||
while(!datagram[len-1])
|
||||
len--;
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
static int _nc_dump_half(struct nc_half *n)
|
||||
{
|
||||
DEBUGF(" window start; %d", n->window_start);
|
||||
DEBUGF(" queue size; %d", n->queue_size);
|
||||
DEBUGF(" max queue size; %d", n->max_queue_size);
|
||||
int i;
|
||||
for (i=0;i<n->max_queue_size;i++){
|
||||
if (!n->packets[i].payload)
|
||||
continue;
|
||||
DEBUGF(" %02d: 0x%02x, 0x%08x",
|
||||
i, n->packets[i].sequence, n->packets[i].combination);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
// note, the incoming packet buffer must be allocated from the heap
|
||||
// this function will take responsibility for releasing it
|
||||
static int _nc_rx_combine_packet(struct nc_half *n, struct nc_packet *packet)
|
||||
{
|
||||
int i;
|
||||
@ -296,12 +326,12 @@ static int _nc_rx_combine_packet(struct nc_half *n, struct nc_packet *packet)
|
||||
|
||||
// rx packet doesn't add any new information
|
||||
if (new_mask==0){
|
||||
free(packet->payload);
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (new_mask < packet->combination){
|
||||
_combine_packets(&n->packets[i], packet, n->datagram_size);
|
||||
}
|
||||
if (new_mask < packet->combination)
|
||||
_combine_packets(&n->packets[i], packet);
|
||||
}
|
||||
|
||||
// the new packet must contain new information that will cause a new packet to be seen.
|
||||
@ -311,15 +341,12 @@ static int _nc_rx_combine_packet(struct nc_half *n, struct nc_packet *packet)
|
||||
|
||||
int index = packet->sequence & (n->max_queue_size -1);
|
||||
if (n->packets[index].payload){
|
||||
fprintf(stderr, "Attempted to replace RX payload %d (%d) with %d without freeing it first\n",index,n->packets[index].sequence, packet->sequence);
|
||||
exit(-1);
|
||||
_nc_dump_half(n);
|
||||
free(packet->payload);
|
||||
FATALF("Attempted to replace RX payload %d (%d) with %d without freeing it first",index,n->packets[index].sequence, packet->sequence);
|
||||
return 1;
|
||||
}
|
||||
|
||||
// try to duplicate the payload first, we don't want to reduce existing packets if this fails.
|
||||
unsigned char *dup_payload = malloc(n->datagram_size);
|
||||
if (!dup_payload)
|
||||
return -1;
|
||||
bcopy(packet->payload, dup_payload, n->datagram_size);
|
||||
// reduce other stored packets
|
||||
for (i=0;i<n->max_queue_size;i++){
|
||||
if (!n->packets[i].payload || _compare_uint8(n->packets[i].sequence, packet->sequence) > 0)
|
||||
@ -328,15 +355,23 @@ static int _nc_rx_combine_packet(struct nc_half *n, struct nc_packet *packet)
|
||||
// n->packets[i].sequence, n->packets[i].combination,
|
||||
// packet->sequence, packet->combination);
|
||||
uint32_t new_mask = _combine_masks(packet, &n->packets[i]);
|
||||
if (new_mask < n->packets[i].combination){
|
||||
_combine_packets(packet, &n->packets[i], n->datagram_size);
|
||||
}
|
||||
if (new_mask < n->packets[i].combination)
|
||||
_combine_packets(packet, &n->packets[i]);
|
||||
}
|
||||
|
||||
// add the packet to our incoming list
|
||||
n->packets[index]=*packet;
|
||||
n->packets[index].payload = dup_payload;
|
||||
n->queue_size++;
|
||||
|
||||
// find the first missing seq
|
||||
uint8_t seq;
|
||||
for (seq = n->window_start; ;seq++){
|
||||
int index = seq & (n->max_queue_size -1);
|
||||
if (n->packets[index].sequence != seq || !n->packets[index].payload)
|
||||
break;
|
||||
}
|
||||
n->unseen = seq;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -357,28 +392,27 @@ static void _nc_rx_advance_window(struct nc_half *n, uint8_t new_window_start)
|
||||
}
|
||||
}
|
||||
|
||||
int nc_rx_packet(struct nc *n, uint8_t *payload, size_t len)
|
||||
int nc_rx_packet(struct nc *n, const uint8_t *payload, size_t len)
|
||||
{
|
||||
if (len!=2 && len != NC_HEADER_LEN+n->rx.datagram_size){
|
||||
fprintf(stderr, "len=%zd\n",len);
|
||||
return -1;
|
||||
if (len>=2){
|
||||
_nc_ack(&n->tx, payload[0], payload[1]);
|
||||
}
|
||||
|
||||
_nc_ack(&n->tx, payload[0], payload[1]);
|
||||
|
||||
if (len < NC_HEADER_LEN+n->rx.datagram_size)
|
||||
return 0;
|
||||
if (len<NC_HEADER_LEN)
|
||||
return 1;
|
||||
|
||||
uint8_t new_window_start = payload[2];
|
||||
|
||||
// assume incoming packets can be padded with zero's up to n->rx.datagram_size
|
||||
struct nc_packet packet={
|
||||
.sequence = new_window_start,
|
||||
.combination = read_uint32(&payload[3]),
|
||||
.payload = &payload[NC_HEADER_LEN],
|
||||
.payload = emalloc_zero(n->rx.datagram_size),
|
||||
.len = n->rx.datagram_size,
|
||||
};
|
||||
bcopy(&payload[NC_HEADER_LEN], packet.payload, len - NC_HEADER_LEN);
|
||||
|
||||
int r = _nc_rx_combine_packet(&n->rx, &packet);
|
||||
|
||||
_nc_rx_advance_window(&n->rx, new_window_start);
|
||||
|
||||
return r;
|
||||
@ -414,31 +448,12 @@ int nc_rx_next_delivered(struct nc *n, uint8_t *payload, int buffer_size)
|
||||
queue is full.
|
||||
4. nc_tx_ack_dof() works, rejects bad input, and correctly releases buffers.
|
||||
5. nc_tx_random_linear_combination() works, rejects bad input, and produces valid
|
||||
linear combinations of the enqueued datagrams, and never produces all zeroes.
|
||||
linear combinations of the enqueuedumd datagrams, and never produces all zeroes.
|
||||
6. nc_rx_linear_combination() works, rejects bad input
|
||||
7. nc_rx_linear_combination() rejects when RX queue full, when combination starts
|
||||
before current window.
|
||||
*/
|
||||
|
||||
static int _nc_dump_half(struct nc_half *n)
|
||||
{
|
||||
fprintf(stderr, " window start; %d\n", n->window_start);
|
||||
fprintf(stderr, " queue size; %d\n", n->queue_size);
|
||||
fprintf(stderr, " max queue size; %d\n", n->max_queue_size);
|
||||
int i;
|
||||
for (i=0;i<n->max_queue_size;i++){
|
||||
if (!n->packets[i].payload)
|
||||
continue;
|
||||
fprintf(stderr, " %02d: 0x%02x, 0x%08x ",
|
||||
i, n->packets[i].sequence, n->packets[i].combination);
|
||||
int j;
|
||||
for(j=0;j<32;j++)
|
||||
fprintf(stderr, "%0d",(n->packets[i].combination>>(31-j))&1);
|
||||
fprintf(stderr, "\n");
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void _nc_dump(struct nc *n)
|
||||
{
|
||||
fprintf(stderr, "TX\n");
|
||||
|
@ -3,4 +3,19 @@
|
||||
|
||||
#define NC_HEADER_LEN 7
|
||||
|
||||
#define URGENCY_ASAP 0
|
||||
#define URGENCY_SOON 1
|
||||
#define URGENCY_IDLE 2
|
||||
|
||||
struct nc;
|
||||
|
||||
struct nc *nc_new(uint8_t max_window_size, uint8_t datagram_size);
|
||||
int nc_free(struct nc *n);
|
||||
int nc_tx_has_room(struct nc *n);
|
||||
int nc_tx_enqueue_datagram(struct nc *n, unsigned char *d, size_t len);
|
||||
int nc_tx_produce_packet(struct nc *n, uint8_t *datagram, uint32_t buffer_size);
|
||||
int nc_rx_packet(struct nc *n, const uint8_t *payload, size_t len);
|
||||
int nc_rx_next_delivered(struct nc *n, uint8_t *payload, int buffer_size);
|
||||
int nc_tx_packet_urgency(struct nc *n);
|
||||
|
||||
#endif
|
197
radio_link.c
197
radio_link.c
@ -49,6 +49,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
#include "overlay_buffer.h"
|
||||
#include "golay.h"
|
||||
#include "radio_link.h"
|
||||
#include "network_coding.h"
|
||||
|
||||
#define MAVLINK_MSG_ID_RADIO 166
|
||||
#define MAVLINK_MSG_ID_DATASTREAM 67
|
||||
@ -73,17 +74,18 @@ struct mavlink_RADIO_v10 {
|
||||
|
||||
*/
|
||||
|
||||
#define PAYLOAD_FRAGMENT 0xFF
|
||||
#define FEC_LENGTH 32
|
||||
#define FEC_MAX_BYTES 223
|
||||
#define RADIO_HEADER_LENGTH 6
|
||||
#define RADIO_USED_HEADER_LENGTH 4
|
||||
#define RADIO_ACTUAL_HEADER_LENGTH 4
|
||||
#define RADIO_CRC_LENGTH 2
|
||||
|
||||
#define LINK_PAYLOAD_MTU (LINK_MTU - FEC_LENGTH - RADIO_HEADER_LENGTH - RADIO_CRC_LENGTH)
|
||||
#define LINK_NC_MTU (LINK_MTU - FEC_LENGTH - RADIO_ACTUAL_HEADER_LENGTH)
|
||||
#define LINK_PAYLOAD_MTU (LINK_NC_MTU - NC_HEADER_LEN)
|
||||
|
||||
struct radio_link_state{
|
||||
// next seq for transmission
|
||||
int tx_seq;
|
||||
struct nc *network_coding;
|
||||
|
||||
// small buffer for parsing incoming bytes from the serial interface,
|
||||
// looking for recoverable link layer packets
|
||||
@ -94,8 +96,6 @@ struct radio_link_state{
|
||||
// decoded length of next link layer packet
|
||||
// including all header and footer bytes
|
||||
int payload_length;
|
||||
// last rx seq for reassembly
|
||||
int seq;
|
||||
// offset within payload that we have found a valid looking header
|
||||
int payload_start;
|
||||
// offset after payload_start for incoming bytes
|
||||
@ -152,6 +152,7 @@ int decode_rs_8(data_t *data, int *eras_pos, int no_eras, int pad);
|
||||
int radio_link_free(struct overlay_interface *interface)
|
||||
{
|
||||
if (interface->radio_link_state){
|
||||
nc_free(interface->radio_link_state->network_coding);
|
||||
free(interface->radio_link_state);
|
||||
interface->radio_link_state=NULL;
|
||||
}
|
||||
@ -161,6 +162,7 @@ int radio_link_free(struct overlay_interface *interface)
|
||||
int radio_link_init(struct overlay_interface *interface)
|
||||
{
|
||||
interface->radio_link_state = emalloc_zero(sizeof(struct radio_link_state));
|
||||
interface->radio_link_state->network_coding = nc_new(16, LINK_PAYLOAD_MTU);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -171,48 +173,35 @@ void radio_link_state_html(struct strbuf *b, struct overlay_interface *interface
|
||||
strbuf_sprintf(b, "Remote RSSI: %ddB<br>", state->remote_rssi);
|
||||
}
|
||||
|
||||
// write a new link layer packet to interface->txbuffer
|
||||
// consuming more bytes from the next interface->tx_packet if required
|
||||
static int radio_link_encode_packet(struct radio_link_state *link_state)
|
||||
static int encode_next_packet(struct radio_link_state *link_state)
|
||||
{
|
||||
// if we have nothing interesting left to send, don't create a packet at all
|
||||
if (!link_state->tx_packet)
|
||||
return 0;
|
||||
while (link_state->tx_packet && nc_tx_has_room(link_state->network_coding)){
|
||||
// queue one packet
|
||||
uint8_t next_packet[LINK_PAYLOAD_MTU];
|
||||
bzero(next_packet, sizeof(next_packet));
|
||||
ob_checkpoint(link_state->tx_packet);
|
||||
|
||||
int count = ob_remaining(link_state->tx_packet);
|
||||
int startP = (ob_position(link_state->tx_packet) == 0);
|
||||
int endP = 1;
|
||||
if (count > LINK_PAYLOAD_MTU){
|
||||
count = LINK_PAYLOAD_MTU;
|
||||
endP = 0;
|
||||
if (count > LINK_PAYLOAD_MTU -1){
|
||||
count = LINK_PAYLOAD_MTU -1;
|
||||
next_packet[0]=PAYLOAD_FRAGMENT;
|
||||
}else
|
||||
next_packet[0]=count;
|
||||
|
||||
ob_get_bytes(link_state->tx_packet, &next_packet[1], count);
|
||||
if (nc_tx_enqueue_datagram(link_state->network_coding, next_packet, LINK_PAYLOAD_MTU)==0){
|
||||
if (config.debug.radio_link)
|
||||
DEBUGF("Enqueued fragment len %d", count+1);
|
||||
}else{
|
||||
ob_rewind(link_state->tx_packet);
|
||||
break;
|
||||
}
|
||||
|
||||
link_state->txbuffer[0]=0xfe; // mavlink v1.0 magic header
|
||||
|
||||
// we need to add FEC_LENGTH for FEC, but the length field doesn't include the expected headers or CRC
|
||||
int len = count + FEC_LENGTH - RADIO_CRC_LENGTH;
|
||||
link_state->txbuffer[1]=len; // mavlink payload length
|
||||
link_state->txbuffer[2]=(len & 0xF);
|
||||
link_state->txbuffer[3]=0;
|
||||
|
||||
// add golay encoding so that decoding the actual length is more reliable
|
||||
golay_encode(&link_state->txbuffer[1]);
|
||||
|
||||
|
||||
link_state->txbuffer[4]=(link_state->tx_seq++) & 0x3f;
|
||||
if (startP) link_state->txbuffer[4]|=0x40;
|
||||
if (endP) link_state->txbuffer[4]|=0x80;
|
||||
link_state->txbuffer[5]=MAVLINK_MSG_ID_DATASTREAM;
|
||||
|
||||
ob_get_bytes(link_state->tx_packet, &link_state->txbuffer[6], count);
|
||||
|
||||
encode_rs_8(&link_state->txbuffer[4], &link_state->txbuffer[6+count], FEC_MAX_BYTES - (count+2));
|
||||
link_state->tx_bytes=len + RADIO_CRC_LENGTH + RADIO_HEADER_LENGTH;
|
||||
if (endP){
|
||||
if (!ob_remaining(link_state->tx_packet)){
|
||||
ob_free(link_state->tx_packet);
|
||||
link_state->tx_packet=NULL;
|
||||
overlay_queue_schedule_next(gettime_ms());
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -240,6 +229,41 @@ int radio_link_queue_packet(struct overlay_interface *interface, struct overlay_
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int send_link_packet(struct overlay_interface *interface)
|
||||
{
|
||||
struct radio_link_state *link_state = interface->radio_link_state;
|
||||
|
||||
int data_length = nc_tx_produce_packet(link_state->network_coding,
|
||||
&link_state->txbuffer[RADIO_ACTUAL_HEADER_LENGTH], LINK_NC_MTU);
|
||||
|
||||
// if we have nothing interesting to send, don't create a packet at all
|
||||
if (data_length <=0)
|
||||
return 0;
|
||||
|
||||
link_state->txbuffer[0]=0xfe; // mavlink v1.0 magic header
|
||||
|
||||
// the current firmware assumes that the whole packet contains 6 bytes of header and 2 bytes of crc
|
||||
// that are not counted in the length.
|
||||
int whole_packet = data_length + FEC_LENGTH + RADIO_ACTUAL_HEADER_LENGTH;
|
||||
int radio_length = whole_packet - RADIO_HEADER_LENGTH - RADIO_CRC_LENGTH;
|
||||
link_state->txbuffer[1]=radio_length; // mavlink payload length
|
||||
link_state->txbuffer[2]=(radio_length & 0xF);
|
||||
link_state->txbuffer[3]=0;
|
||||
|
||||
// add golay encoding so that decoding the actual length is more reliable
|
||||
golay_encode(&link_state->txbuffer[1]);
|
||||
|
||||
encode_rs_8(&link_state->txbuffer[RADIO_ACTUAL_HEADER_LENGTH],
|
||||
&link_state->txbuffer[RADIO_ACTUAL_HEADER_LENGTH + data_length],
|
||||
FEC_MAX_BYTES - data_length);
|
||||
|
||||
link_state->tx_bytes=whole_packet;
|
||||
if (config.debug.radio_link)
|
||||
DEBUGF("Produced packet len %d", whole_packet);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int build_heartbeat(struct radio_link_state *link_state)
|
||||
{
|
||||
int count=9;
|
||||
@ -280,6 +304,9 @@ int radio_link_tx(struct overlay_interface *interface)
|
||||
|
||||
while(1){
|
||||
|
||||
// encode more data if we have a packet waiting, and have space
|
||||
encode_next_packet(link_state);
|
||||
|
||||
if (link_state->tx_bytes){
|
||||
if (link_state->next_tx_allowed > now){
|
||||
interface->alarm.alarm = link_state->next_tx_allowed;
|
||||
@ -318,14 +345,25 @@ int radio_link_tx(struct overlay_interface *interface)
|
||||
if (link_state->remaining_space < LINK_MTU + HEARTBEAT_SIZE)
|
||||
link_state->next_heartbeat = now;
|
||||
|
||||
if (!link_state->tx_packet){
|
||||
// finished current packet, wait for more.
|
||||
int urgency = nc_tx_packet_urgency(link_state->network_coding);
|
||||
time_ms_t delay = 500;
|
||||
switch (urgency){
|
||||
case URGENCY_ASAP:
|
||||
delay=20;
|
||||
break;
|
||||
case URGENCY_SOON:
|
||||
delay=200;
|
||||
break;
|
||||
}
|
||||
|
||||
if (link_state->last_packet + delay > now){
|
||||
interface->alarm.alarm = link_state->last_packet + delay;
|
||||
if (interface->alarm.alarm > next_tick)
|
||||
interface->alarm.alarm = next_tick;
|
||||
break;
|
||||
}
|
||||
|
||||
// encode another packet fragment
|
||||
radio_link_encode_packet(link_state);
|
||||
send_link_packet(interface);
|
||||
link_state->last_packet = now;
|
||||
}
|
||||
|
||||
@ -369,7 +407,7 @@ static int parse_heartbeat(struct radio_link_state *state, const unsigned char *
|
||||
}
|
||||
|
||||
static int radio_link_parse(struct overlay_interface *interface, struct radio_link_state *state,
|
||||
size_t packet_length, uint8_t *payload, int *backtrack)
|
||||
int packet_length, unsigned char *payload, int *backtrack)
|
||||
{
|
||||
*backtrack=0;
|
||||
if (packet_length==17){
|
||||
@ -385,57 +423,58 @@ static int radio_link_parse(struct overlay_interface *interface, struct radio_li
|
||||
return 0;
|
||||
}
|
||||
|
||||
size_t data_bytes = packet_length - (RADIO_USED_HEADER_LENGTH + FEC_LENGTH);
|
||||
payload += RADIO_ACTUAL_HEADER_LENGTH;
|
||||
packet_length -= RADIO_ACTUAL_HEADER_LENGTH + FEC_LENGTH;
|
||||
|
||||
int errors=decode_rs_8(&payload[4], NULL, 0, FEC_MAX_BYTES - data_bytes);
|
||||
int errors=decode_rs_8(payload, NULL, 0, FEC_MAX_BYTES - packet_length);
|
||||
if (errors==-1){
|
||||
if (config.debug.radio_link)
|
||||
DEBUGF("Reed-Solomon error correction failed");
|
||||
return 0;
|
||||
}
|
||||
*backtrack=errors;
|
||||
data_bytes -= 2;
|
||||
int seq=payload[4]&0x3f;
|
||||
|
||||
if (config.debug.radio_link){
|
||||
DEBUGF("Received RS protected message, len: %zd, errors: %d, seq: %d, flags:%s%s",
|
||||
data_bytes,
|
||||
errors,
|
||||
seq,
|
||||
payload[4]&0x40?" start":"",
|
||||
payload[4]&0x80?" end":"");
|
||||
}
|
||||
|
||||
if (seq != ((state->seq+1)&0x3f)){
|
||||
// reject partial packet if we missed a sequence number
|
||||
int rx = nc_rx_packet(state->network_coding, payload, packet_length);
|
||||
if (config.debug.radio_link)
|
||||
DEBUGF("Rejecting packet, sequence jumped from %d to %d", state->seq, seq);
|
||||
state->packet_length=sizeof(state->dst)+1;
|
||||
}
|
||||
DEBUGF("RX returned %d", rx);
|
||||
|
||||
if (payload[4]&0x40){
|
||||
// start a new packet
|
||||
state->packet_length=0;
|
||||
}
|
||||
if (rx==0){
|
||||
// we received an interesting packet, can we deliver anything?
|
||||
uint8_t fragment[LINK_PAYLOAD_MTU];
|
||||
while(1){
|
||||
int len=nc_rx_next_delivered(state->network_coding, fragment, sizeof(fragment));
|
||||
if (len<=0)
|
||||
break;
|
||||
int fragment_len=fragment[0];
|
||||
if (fragment_len == PAYLOAD_FRAGMENT)
|
||||
fragment_len = len -1;
|
||||
|
||||
state->seq=payload[4]&0x3f;
|
||||
if (state->packet_length + data_bytes > sizeof(state->dst)){
|
||||
if (config.debug.radio_link)
|
||||
DEBUG("Fragmented packet is too long or a previous piece was missed - discarding");
|
||||
state->packet_length=sizeof(state->dst)+1;
|
||||
return 1;
|
||||
}
|
||||
// is this fragment length invalid?
|
||||
if (fragment_len > len -1)
|
||||
state->packet_length=sizeof(state->dst);
|
||||
|
||||
bcopy(&payload[RADIO_HEADER_LENGTH], &state->dst[state->packet_length], data_bytes);
|
||||
state->packet_length+=data_bytes;
|
||||
// can we fit this fragment into our payload buffer?
|
||||
if (fragment_len+state->packet_length < sizeof(state->dst)){
|
||||
bcopy(&fragment[1], &state->dst[state->packet_length], fragment_len);
|
||||
state->packet_length+=fragment_len;
|
||||
|
||||
if (payload[4]&0x80) {
|
||||
// is this the last fragment?
|
||||
if (fragment[0] != PAYLOAD_FRAGMENT){
|
||||
if (config.debug.radio_link)
|
||||
DEBUGF("PDU Complete (length=%d)",state->packet_length);
|
||||
|
||||
packetOkOverlay(interface, state->dst, state->packet_length, -1, NULL, 0);
|
||||
state->packet_length=sizeof(state->dst)+1;
|
||||
if (packetOkOverlay(interface, state->dst, state->packet_length, -1, NULL, 0)){
|
||||
dump("Invalid packet?", state->dst, state->packet_length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// reset the buffer for the next packet
|
||||
if (fragment[0] != PAYLOAD_FRAGMENT)
|
||||
state->packet_length=0;
|
||||
}
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -702,7 +702,6 @@ start_radio_instance() {
|
||||
set debug.rhizome_tx on \
|
||||
set debug.rhizome_rx on \
|
||||
set debug.throttling on \
|
||||
set debug.radio_link on \
|
||||
set rhizome.advertise.interval 5000 \
|
||||
set rhizome.rhizome_mdp_block_size 375 \
|
||||
set log.console.level debug \
|
||||
|
@ -296,10 +296,10 @@ setup_simulate_extender() {
|
||||
foreach_instance +A +B start_routing_instance
|
||||
}
|
||||
test_simulate_extender() {
|
||||
wait_until path_exists +A +B
|
||||
wait_until path_exists +B +A
|
||||
wait_until --timeout=20 path_exists +A +B
|
||||
wait_until --timeout=20 path_exists +B +A
|
||||
set_instance +A
|
||||
executeOk_servald mdp ping --timeout=3 $SIDB 1
|
||||
executeOk_servald mdp ping --interval=0.500 --timeout=10 $SIDB 20
|
||||
tfw_cat --stdout --stderr
|
||||
}
|
||||
teardown_simulate_extender() {
|
||||
|
Loading…
Reference in New Issue
Block a user