Simplify mavlink link frame creation

- build one frame at a time directly from the prepared overlay buffer
This commit is contained in:
Jeremy Lakeman 2013-09-11 17:15:43 +09:30
parent e29564bc6c
commit 6f26594447
7 changed files with 83 additions and 108 deletions

View File

@ -46,6 +46,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "serval.h"
#include "conf.h"
#include "overlay_buffer.h"
#define MAVLINK_MSG_ID_RADIO 166
#define MAVLINK_MSG_ID_DATASTREAM 67
@ -134,33 +135,40 @@ struct mavlink_RADIO_v10 {
void encode_rs_8(data_t *data, data_t *parity,int pad);
int decode_rs_8(data_t *data, int *eras_pos, int no_eras, int pad);
int stream_as_mavlink(int sequence_number,int startP,int endP,
const unsigned char *data,int count,
unsigned char *frame,int *outlen)
int mavlink_encode_packet(struct overlay_interface *interface)
{
if (count>252-6-32) return -1;
int count = ob_remaining(interface->tx_packet);
int startP = !ob_position(interface->tx_packet);
int endP = 1;
if (count>252-6-32){
count = 252-6-32;
endP = 0;
}
frame[0]=0xfe; // mavlink v1.0 frame
interface->txbuffer[0]=0xfe; // mavlink v1.0 frame
/* payload len, excluding 6 byte header and 2 byte CRC.
But we use a 4-byte CRC, so need to add two to count to make packet lengths
be as expected.
Note that this construction will result in CRC errors by non-servald
programmes, which is probably more helpful than otherwise.
*/
frame[1]=count+32-2; // we need 32 bytes for the parity, but this field assumes
interface->txbuffer[1]=count+32-2; // we need 32 bytes for the parity, but this field assumes
// that there is a 2 byte CRC, so we can save two bytes
frame[2]=sequence_number; // packet sequence
frame[3]=0x00; // system ID of sender (MAV_TYPE_GENERIC)
frame[4]=0x40; // component ID of sender: we are reusing this to mark start,end of MDP frames
if (startP) frame[4]|=0x01;
if (endP) frame[4]|=0x02;
frame[5]=MAVLINK_MSG_ID_DATASTREAM; // message ID type of this frame: DATA_STREAM
interface->txbuffer[2]=0; // packet sequence
interface->txbuffer[3]=0x00; // system ID of sender (MAV_TYPE_GENERIC)
interface->txbuffer[4]=0x40; // component ID of sender: we are reusing this to mark start,end of MDP frames
if (startP) interface->txbuffer[4]|=0x01;
if (endP) interface->txbuffer[4]|=0x02;
interface->txbuffer[5]=MAVLINK_MSG_ID_DATASTREAM; // message ID type of this frame: DATA_STREAM
// payload follows (we reuse the DATA_STREAM message type parameters)
bcopy(data,&frame[6],count);
ob_get_bytes(interface->tx_packet, &interface->txbuffer[6], count);
encode_rs_8(&frame[6],&frame[6+count],(223-count));
*outlen=6+count+32;
encode_rs_8(&interface->txbuffer[6],&interface->txbuffer[6+count],(223-count));
interface->tx_bytes_pending=6+count+32;
if (endP){
ob_free(interface->tx_packet);
interface->tx_packet=NULL;
}
return 0;
}

View File

@ -139,6 +139,15 @@ int ob_unlimitsize(struct overlay_buffer *b)
return 0;
}
int ob_flip(struct overlay_buffer *b)
{
b->checkpointLength=0;
if (ob_limitsize(b, b->position))
return -1;
b->position=0;
return 0;
}
int _ob_makespace(struct __sourceloc __whence, struct overlay_buffer *b,int bytes)
{
if (b->sizeLimit!=-1 && b->position+bytes>b->sizeLimit) {

View File

@ -50,6 +50,7 @@ int ob_free(struct overlay_buffer *b);
int ob_checkpoint(struct overlay_buffer *b);
int ob_rewind(struct overlay_buffer *b);
int ob_limitsize(struct overlay_buffer *b,int bytes);
int ob_flip(struct overlay_buffer *b);
int ob_unlimitsize(struct overlay_buffer *b);
int _ob_makespace(struct __sourceloc whence, struct overlay_buffer *b,int bytes);
int ob_set(struct overlay_buffer *b, int ofs, unsigned char byte);

View File

@ -721,38 +721,50 @@ static void interface_read_stream(struct overlay_interface *interface){
static void write_stream_buffer(overlay_interface *interface){
time_ms_t now = gettime_ms();
int bytes_allowed=interface->tx_bytes_pending;
if (bytes_allowed>0 && interface->next_tx_allowed < now) {
// Throttle output to a prescribed bit-rate
// first, reduce the number of bytes based on the configured burst size
if (interface->throttle_burst_write_size && bytes_allowed > interface->throttle_burst_write_size)
bytes_allowed = interface->throttle_burst_write_size;
// Throttle output to a prescribed bit-rate
// first, reduce the number of bytes based on the configured burst size
int bytes_allowed=interface->throttle_burst_write_size;
if (interface->next_tx_allowed < now){
int total_written=0;
while ((interface->tx_bytes_pending>0 || interface->tx_packet) &&
(bytes_allowed>0 || interface->throttle_burst_write_size==0)) {
if (interface->tx_bytes_pending==0 && interface->tx_packet){
// TODO firmware heartbeat packets
// prepare a new link layer packet in txbuffer
if (mavlink_encode_packet(interface))
break;
}
if (config.debug.packetradio)
DEBUGF("Trying to write %d bytes",bytes_allowed);
int written=write(interface->alarm.poll.fd,interface->txbuffer,
bytes_allowed);
if (written>0) {
int bytes = interface->tx_bytes_pending;
if (interface->throttle_burst_write_size && bytes>bytes_allowed)
bytes=bytes_allowed;
if (config.debug.packetradio)
DEBUGF("Trying to write %d bytes",bytes);
int written=write(interface->alarm.poll.fd, interface->txbuffer, bytes);
if (written<=0)
break;
interface->tx_bytes_pending-=written;
bcopy(&interface->txbuffer[written],&interface->txbuffer[0],
interface->tx_bytes_pending);
total_written+=written;
bytes_allowed-=written;
if (interface->tx_bytes_pending)
bcopy(&interface->txbuffer[written],&interface->txbuffer[0],
interface->tx_bytes_pending);
if (config.debug.packetradio)
DEBUGF("Wrote %d bytes (%d left pending)", written, interface->tx_bytes_pending);
// Now when are we allowed to send more?
if (interface->throttle_bytes_per_second>0) {
int delay = written*1000/interface->throttle_bytes_per_second;
if (config.debug.throttling)
DEBUGF("Throttling for %dms.",delay);
interface->next_tx_allowed = now + delay;
}
} else {
if (config.debug.packetradio)
DEBUGF("Failed to write any data");
}
// Now when are we allowed to send more?
if (interface->throttle_bytes_per_second>0) {
int delay = total_written*1000/interface->throttle_bytes_per_second;
if (config.debug.throttling)
DEBUGF("Throttling for %dms.",delay);
interface->next_tx_allowed = now + delay;
}
}
if (interface->tx_bytes_pending>0){
if (interface->tx_bytes_pending>0 || interface->tx_packet){
if (interface->next_tx_allowed > now){
// We can't write now, so clear POLLOUT flag
interface->alarm.poll.events&=~POLLOUT;
@ -874,40 +886,21 @@ int overlay_broadcast_ensemble(struct network_destination *destination, struct o
switch(interface->socket_type){
case SOCK_STREAM:
{
if (interface->tx_bytes_pending>0){
if (interface->tx_packet){
ob_free(buffer);
return WHYF("Cannot send two packets to a stream at the same time");
}
/* Encode packet with SLIP escaping.
XXX - Add error correction here also */
int out_len=0;
int encoded = slip_encode(SLIP_FORMAT_MAVLINK,
bytes, len,
interface->txbuffer+out_len, sizeof(interface->txbuffer) - out_len);
ob_free(buffer);
if (encoded < 0)
return WHY("Buffer overflow");
if (config.debug.slip){
// Test decoding of the packet we send
struct slip_decode_state state;
state.encapsulator=SLIP_FORMAT_MAVLINK;
state.src_size=encoded;
state.src_offset=0;
state.src=interface->txbuffer+out_len;
slip_decode(&state);
// clear received packet after processing
state.packet_length=0;
}
out_len+=encoded;
interface->tx_bytes_pending=out_len;
// prepare the buffer for reading
ob_flip(buffer);
interface->tx_packet = buffer;
write_stream_buffer(interface);
if (interface->alarm.alarm!=-1){
unschedule(&interface->alarm);
schedule(&interface->alarm);
}
return 0;
}

View File

@ -452,14 +452,17 @@ typedef struct overlay_interface {
char name[256];
int recv_offset; /* file offset */
// stream socket tx state;
struct overlay_buffer *tx_packet;
unsigned char txbuffer[OVERLAY_INTERFACE_RX_BUFFER_SIZE];
int tx_bytes_pending;
// Throttle TX rate if required (stream interfaces only for now)
uint32_t throttle_bytes_per_second;
uint32_t throttle_burst_write_size;
uint64_t next_tx_allowed;
struct slip_decode_state slip_decode_state;
// copy of ifconfig flags
@ -878,8 +881,6 @@ int generate_nonce(unsigned char *nonce,int bytes);
int mavlink_decode(struct slip_decode_state *state,uint8_t c);
int mavlink_heartbeat(unsigned char *frame,int *outlen);
int stream_as_mavlink(int sequence_number,int startP,int endP,
const unsigned char *data,int count,
unsigned char *frame,int *outlen);
int mavlink_encode_packet(struct overlay_interface *interface);
#endif // __SERVALD_SERVALD_H

38
slip.c
View File

@ -86,44 +86,6 @@ int slip_encode(int format,
const unsigned char *src, int src_bytes, unsigned char *dst, int dst_len)
{
switch(format) {
case SLIP_FORMAT_MAVLINK:
{
int i;
int dst_offset=0;
// Radio frames are limited to 252 bytes.
// MAVLink header takes 7 bytes
// Reed-Solomon parity takes 32 bytes
// That leaves a maximum size of 252-39=213 bytes
for(i=0;i<src_bytes;i+=213) {
int slice_len=0;
int slice_bytes=213;
if (i+slice_bytes>src_bytes) slice_bytes=src_bytes-i;
if (dst_offset+slice_bytes+6+2>=dst_len) {
if (config.debug.mavlink)
DEBUGF("Would overflow output buffer.");
return -1;
} else {
int startP=i?0:1;
int endP=i+slice_bytes==src_bytes?1:0;
stream_as_mavlink(0,startP,endP,&src[i],slice_bytes,&dst[dst_offset],&slice_len);
dst_offset+=slice_len;
}
}
if (config.debug.mavlink) {
DEBUGF("Wrote %d byte packet as MAVLink frames",src_bytes);
}
if (config.debug.mavlink_payloads||config.debug.interactive_io) {
DEBUG_packet_visualise("Packet Sent",src,src_bytes);
}
if (config.debug.interactive_io) {
fprintf(stderr,"Press ENTER to continue..."); fflush(stderr);
char buffer[80];
if (!fgets(buffer,80,stdin))
FATAL_perror("calling fgets");
}
return dst_offset;
}
break;
case SLIP_FORMAT_SLIP:
{
int offset=0;

View File

@ -235,6 +235,7 @@ setup_simulate_extender() {
set debug.throttling on \
set debug.packetradio on \
set interfaces.1.type CATEAR \
set interfaces.1.mdp_tick_ms 5000 \
set interfaces.1.socket_type STREAM \
set interfaces.1.encapsulation SINGLE \
set interfaces.1.point_to_point on \