From 6f2659444730fb5601748095e3cd05dc9bcc47a7 Mon Sep 17 00:00:00 2001 From: Jeremy Lakeman Date: Wed, 11 Sep 2013 17:15:43 +0930 Subject: [PATCH] Simplify mavlink link frame creation - build one frame at a time directly from the prepared overlay buffer --- mavlink.c | 40 +++++++++++-------- overlay_buffer.c | 9 +++++ overlay_buffer.h | 1 + overlay_interface.c | 93 +++++++++++++++++++++------------------------ serval.h | 9 +++-- slip.c | 38 ------------------ tests/routing | 1 + 7 files changed, 83 insertions(+), 108 deletions(-) diff --git a/mavlink.c b/mavlink.c index 208617dc..a4706043 100644 --- a/mavlink.c +++ b/mavlink.c @@ -46,6 +46,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. #include "serval.h" #include "conf.h" +#include "overlay_buffer.h" #define MAVLINK_MSG_ID_RADIO 166 #define MAVLINK_MSG_ID_DATASTREAM 67 @@ -134,33 +135,40 @@ struct mavlink_RADIO_v10 { void encode_rs_8(data_t *data, data_t *parity,int pad); int decode_rs_8(data_t *data, int *eras_pos, int no_eras, int pad); -int stream_as_mavlink(int sequence_number,int startP,int endP, - const unsigned char *data,int count, - unsigned char *frame,int *outlen) +int mavlink_encode_packet(struct overlay_interface *interface) { - if (count>252-6-32) return -1; + int count = ob_remaining(interface->tx_packet); + int startP = !ob_position(interface->tx_packet); + int endP = 1; + if (count>252-6-32){ + count = 252-6-32; + endP = 0; + } - frame[0]=0xfe; // mavlink v1.0 frame + interface->txbuffer[0]=0xfe; // mavlink v1.0 frame /* payload len, excluding 6 byte header and 2 byte CRC. But we use a 4-byte CRC, so need to add two to count to make packet lengths be as expected. Note that this construction will result in CRC errors by non-servald programmes, which is probably more helpful than otherwise. */ - frame[1]=count+32-2; // we need 32 bytes for the parity, but this field assumes + interface->txbuffer[1]=count+32-2; // we need 32 bytes for the parity, but this field assumes // that there is a 2 byte CRC, so we can save two bytes - frame[2]=sequence_number; // packet sequence - frame[3]=0x00; // system ID of sender (MAV_TYPE_GENERIC) - frame[4]=0x40; // component ID of sender: we are reusing this to mark start,end of MDP frames - if (startP) frame[4]|=0x01; - if (endP) frame[4]|=0x02; - frame[5]=MAVLINK_MSG_ID_DATASTREAM; // message ID type of this frame: DATA_STREAM + interface->txbuffer[2]=0; // packet sequence + interface->txbuffer[3]=0x00; // system ID of sender (MAV_TYPE_GENERIC) + interface->txbuffer[4]=0x40; // component ID of sender: we are reusing this to mark start,end of MDP frames + if (startP) interface->txbuffer[4]|=0x01; + if (endP) interface->txbuffer[4]|=0x02; + interface->txbuffer[5]=MAVLINK_MSG_ID_DATASTREAM; // message ID type of this frame: DATA_STREAM // payload follows (we reuse the DATA_STREAM message type parameters) - bcopy(data,&frame[6],count); + ob_get_bytes(interface->tx_packet, &interface->txbuffer[6], count); - encode_rs_8(&frame[6],&frame[6+count],(223-count)); - - *outlen=6+count+32; + encode_rs_8(&interface->txbuffer[6],&interface->txbuffer[6+count],(223-count)); + interface->tx_bytes_pending=6+count+32; + if (endP){ + ob_free(interface->tx_packet); + interface->tx_packet=NULL; + } return 0; } diff --git a/overlay_buffer.c b/overlay_buffer.c index ee2e4a00..1f44acf0 100644 --- a/overlay_buffer.c +++ b/overlay_buffer.c @@ -139,6 +139,15 @@ int ob_unlimitsize(struct overlay_buffer *b) return 0; } +int ob_flip(struct overlay_buffer *b) +{ + b->checkpointLength=0; + if (ob_limitsize(b, b->position)) + return -1; + b->position=0; + return 0; +} + int _ob_makespace(struct __sourceloc __whence, struct overlay_buffer *b,int bytes) { if (b->sizeLimit!=-1 && b->position+bytes>b->sizeLimit) { diff --git a/overlay_buffer.h b/overlay_buffer.h index 8c93f01a..49cc8ca8 100644 --- a/overlay_buffer.h +++ b/overlay_buffer.h @@ -50,6 +50,7 @@ int ob_free(struct overlay_buffer *b); int ob_checkpoint(struct overlay_buffer *b); int ob_rewind(struct overlay_buffer *b); int ob_limitsize(struct overlay_buffer *b,int bytes); +int ob_flip(struct overlay_buffer *b); int ob_unlimitsize(struct overlay_buffer *b); int _ob_makespace(struct __sourceloc whence, struct overlay_buffer *b,int bytes); int ob_set(struct overlay_buffer *b, int ofs, unsigned char byte); diff --git a/overlay_interface.c b/overlay_interface.c index 12e5dade..b0e8d60c 100644 --- a/overlay_interface.c +++ b/overlay_interface.c @@ -721,38 +721,50 @@ static void interface_read_stream(struct overlay_interface *interface){ static void write_stream_buffer(overlay_interface *interface){ time_ms_t now = gettime_ms(); - int bytes_allowed=interface->tx_bytes_pending; - if (bytes_allowed>0 && interface->next_tx_allowed < now) { - // Throttle output to a prescribed bit-rate - // first, reduce the number of bytes based on the configured burst size - if (interface->throttle_burst_write_size && bytes_allowed > interface->throttle_burst_write_size) - bytes_allowed = interface->throttle_burst_write_size; + // Throttle output to a prescribed bit-rate + // first, reduce the number of bytes based on the configured burst size + int bytes_allowed=interface->throttle_burst_write_size; + + if (interface->next_tx_allowed < now){ + int total_written=0; + while ((interface->tx_bytes_pending>0 || interface->tx_packet) && + (bytes_allowed>0 || interface->throttle_burst_write_size==0)) { + if (interface->tx_bytes_pending==0 && interface->tx_packet){ + // TODO firmware heartbeat packets + // prepare a new link layer packet in txbuffer + if (mavlink_encode_packet(interface)) + break; + } - if (config.debug.packetradio) - DEBUGF("Trying to write %d bytes",bytes_allowed); - int written=write(interface->alarm.poll.fd,interface->txbuffer, - bytes_allowed); - if (written>0) { + int bytes = interface->tx_bytes_pending; + if (interface->throttle_burst_write_size && bytes>bytes_allowed) + bytes=bytes_allowed; + if (config.debug.packetradio) + DEBUGF("Trying to write %d bytes",bytes); + int written=write(interface->alarm.poll.fd, interface->txbuffer, bytes); + if (written<=0) + break; + interface->tx_bytes_pending-=written; - bcopy(&interface->txbuffer[written],&interface->txbuffer[0], - interface->tx_bytes_pending); + total_written+=written; + bytes_allowed-=written; + if (interface->tx_bytes_pending) + bcopy(&interface->txbuffer[written],&interface->txbuffer[0], + interface->tx_bytes_pending); if (config.debug.packetradio) DEBUGF("Wrote %d bytes (%d left pending)", written, interface->tx_bytes_pending); - - // Now when are we allowed to send more? - if (interface->throttle_bytes_per_second>0) { - int delay = written*1000/interface->throttle_bytes_per_second; - if (config.debug.throttling) - DEBUGF("Throttling for %dms.",delay); - interface->next_tx_allowed = now + delay; - } - } else { - if (config.debug.packetradio) - DEBUGF("Failed to write any data"); + } + + // Now when are we allowed to send more? + if (interface->throttle_bytes_per_second>0) { + int delay = total_written*1000/interface->throttle_bytes_per_second; + if (config.debug.throttling) + DEBUGF("Throttling for %dms.",delay); + interface->next_tx_allowed = now + delay; } } - if (interface->tx_bytes_pending>0){ + if (interface->tx_bytes_pending>0 || interface->tx_packet){ if (interface->next_tx_allowed > now){ // We can't write now, so clear POLLOUT flag interface->alarm.poll.events&=~POLLOUT; @@ -874,40 +886,21 @@ int overlay_broadcast_ensemble(struct network_destination *destination, struct o switch(interface->socket_type){ case SOCK_STREAM: { - if (interface->tx_bytes_pending>0){ + if (interface->tx_packet){ ob_free(buffer); return WHYF("Cannot send two packets to a stream at the same time"); } - /* Encode packet with SLIP escaping. - XXX - Add error correction here also */ - int out_len=0; - int encoded = slip_encode(SLIP_FORMAT_MAVLINK, - bytes, len, - interface->txbuffer+out_len, sizeof(interface->txbuffer) - out_len); - ob_free(buffer); - if (encoded < 0) - return WHY("Buffer overflow"); - if (config.debug.slip){ - // Test decoding of the packet we send - struct slip_decode_state state; - state.encapsulator=SLIP_FORMAT_MAVLINK; - state.src_size=encoded; - state.src_offset=0; - state.src=interface->txbuffer+out_len; - slip_decode(&state); - // clear received packet after processing - state.packet_length=0; - } - - out_len+=encoded; - - interface->tx_bytes_pending=out_len; + // prepare the buffer for reading + ob_flip(buffer); + interface->tx_packet = buffer; write_stream_buffer(interface); + if (interface->alarm.alarm!=-1){ unschedule(&interface->alarm); schedule(&interface->alarm); } + return 0; } diff --git a/serval.h b/serval.h index f3ab79a3..f27c1614 100644 --- a/serval.h +++ b/serval.h @@ -452,14 +452,17 @@ typedef struct overlay_interface { char name[256]; int recv_offset; /* file offset */ + + // stream socket tx state; + struct overlay_buffer *tx_packet; unsigned char txbuffer[OVERLAY_INTERFACE_RX_BUFFER_SIZE]; int tx_bytes_pending; - // Throttle TX rate if required (stream interfaces only for now) uint32_t throttle_bytes_per_second; uint32_t throttle_burst_write_size; uint64_t next_tx_allowed; + struct slip_decode_state slip_decode_state; // copy of ifconfig flags @@ -878,8 +881,6 @@ int generate_nonce(unsigned char *nonce,int bytes); int mavlink_decode(struct slip_decode_state *state,uint8_t c); int mavlink_heartbeat(unsigned char *frame,int *outlen); -int stream_as_mavlink(int sequence_number,int startP,int endP, - const unsigned char *data,int count, - unsigned char *frame,int *outlen); +int mavlink_encode_packet(struct overlay_interface *interface); #endif // __SERVALD_SERVALD_H diff --git a/slip.c b/slip.c index 66395cb5..948e2c67 100644 --- a/slip.c +++ b/slip.c @@ -86,44 +86,6 @@ int slip_encode(int format, const unsigned char *src, int src_bytes, unsigned char *dst, int dst_len) { switch(format) { - case SLIP_FORMAT_MAVLINK: - { - int i; - int dst_offset=0; - // Radio frames are limited to 252 bytes. - // MAVLink header takes 7 bytes - // Reed-Solomon parity takes 32 bytes - // That leaves a maximum size of 252-39=213 bytes - for(i=0;isrc_bytes) slice_bytes=src_bytes-i; - if (dst_offset+slice_bytes+6+2>=dst_len) { - if (config.debug.mavlink) - DEBUGF("Would overflow output buffer."); - return -1; - } else { - int startP=i?0:1; - int endP=i+slice_bytes==src_bytes?1:0; - stream_as_mavlink(0,startP,endP,&src[i],slice_bytes,&dst[dst_offset],&slice_len); - dst_offset+=slice_len; - } - } - if (config.debug.mavlink) { - DEBUGF("Wrote %d byte packet as MAVLink frames",src_bytes); - } - if (config.debug.mavlink_payloads||config.debug.interactive_io) { - DEBUG_packet_visualise("Packet Sent",src,src_bytes); - } - if (config.debug.interactive_io) { - fprintf(stderr,"Press ENTER to continue..."); fflush(stderr); - char buffer[80]; - if (!fgets(buffer,80,stdin)) - FATAL_perror("calling fgets"); - } - return dst_offset; - } - break; case SLIP_FORMAT_SLIP: { int offset=0; diff --git a/tests/routing b/tests/routing index 2c12183a..243a7ee6 100755 --- a/tests/routing +++ b/tests/routing @@ -235,6 +235,7 @@ setup_simulate_extender() { set debug.throttling on \ set debug.packetradio on \ set interfaces.1.type CATEAR \ + set interfaces.1.mdp_tick_ms 5000 \ set interfaces.1.socket_type STREAM \ set interfaces.1.encapsulation SINGLE \ set interfaces.1.point_to_point on \