Add tx throttling for packet radios

avoids missing data due to lack of flow control.
This commit is contained in:
gardners 2013-08-27 20:43:56 +09:30 committed by Jeremy Lakeman
parent 2b1ec5232c
commit 0cbebedc77
5 changed files with 76 additions and 19 deletions

View File

@ -260,6 +260,7 @@ ATOM(bool_t, rhizome_rx, 0, boolean,, "")
ATOM(bool_t, rhizome_ads, 0, boolean,, "") ATOM(bool_t, rhizome_ads, 0, boolean,, "")
ATOM(bool_t, rhizome_nohttptx, 0, boolean,, "") ATOM(bool_t, rhizome_nohttptx, 0, boolean,, "")
ATOM(bool_t, rhizome_mdp_rx, 0, boolean,, "") ATOM(bool_t, rhizome_mdp_rx, 0, boolean,, "")
ATOM(bool_t, throttling, 0, boolean,, "")
ATOM(bool_t, meshms, 0, boolean,, "") ATOM(bool_t, meshms, 0, boolean,, "")
ATOM(bool_t, manifests, 0, boolean,, "") ATOM(bool_t, manifests, 0, boolean,, "")
ATOM(bool_t, vomp, 0, boolean,, "") ATOM(bool_t, vomp, 0, boolean,, "")
@ -451,6 +452,8 @@ ATOM(bool_t, debug, 0, boolean,, "If true, log details
ATOM(bool_t, point_to_point, 0, boolean,, "If true, assume there will only be two devices on this interface") ATOM(bool_t, point_to_point, 0, boolean,, "If true, assume there will only be two devices on this interface")
ATOM(bool_t, ctsrts, 0, boolean,, "If true, enable CTS/RTS hardware handshaking") ATOM(bool_t, ctsrts, 0, boolean,, "If true, enable CTS/RTS hardware handshaking")
ATOM(int32_t, uartbps, 57600, int32_rs232baudrate,, "Speed of serial UART link speed (which may be different to serial device link speed)") ATOM(int32_t, uartbps, 57600, int32_rs232baudrate,, "Speed of serial UART link speed (which may be different to serial device link speed)")
ATOM(int32_t, throttle, 0, int32_nonneg,, "Limit transmit speed of serial interface (bytes per second)")
ATOM(int32_t, burst_size, 0, int32_nonneg,, "Write no more than this many bytes at a time to a serial interface")
END_STRUCT END_STRUCT
ARRAY(interface_list, NO_DUPLICATES) ARRAY(interface_list, NO_DUPLICATES)

View File

@ -378,6 +378,9 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
interface->ctsrts = ifconfig->ctsrts; interface->ctsrts = ifconfig->ctsrts;
set_destination_ref(&interface->destination, NULL); set_destination_ref(&interface->destination, NULL);
interface->destination = new_destination(interface, ifconfig->encapsulation); interface->destination = new_destination(interface, ifconfig->encapsulation);
interface->throttle_bytes_per_second = ifconfig->throttle;
interface->throttle_burst_write_size = ifconfig->burst_size;
/* Pick a reasonable default MTU. /* Pick a reasonable default MTU.
This will ultimately get tuned by the bandwidth and other properties of the interface */ This will ultimately get tuned by the bandwidth and other properties of the interface */
interface->mtu = 1200; interface->mtu = 1200;
@ -717,27 +720,52 @@ static void interface_read_stream(struct overlay_interface *interface){
} }
static void write_stream_buffer(overlay_interface *interface){ static void write_stream_buffer(overlay_interface *interface){
if (interface->tx_bytes_pending>0) { time_ms_t now = gettime_ms();
int bytes_allowed=interface->tx_bytes_pending;
if (bytes_allowed>0 && interface->next_tx_allowed < now) {
// Throttle output to a prescribed bit-rate
// first, reduce the number of bytes based on the configured burst size
if (interface->throttle_burst_write_size && bytes_allowed > interface->throttle_burst_write_size)
bytes_allowed = interface->throttle_burst_write_size;
if (config.debug.packetradio)
DEBUGF("Trying to write %d bytes",bytes_allowed);
int written=write(interface->alarm.poll.fd,interface->txbuffer, int written=write(interface->alarm.poll.fd,interface->txbuffer,
interface->tx_bytes_pending); bytes_allowed);
if (config.debug.packetradio) DEBUGF("Trying to write %d bytes",
interface->tx_bytes_pending);
if (written>0) { if (written>0) {
interface->tx_bytes_pending-=written; interface->tx_bytes_pending-=written;
bcopy(&interface->txbuffer[written],&interface->txbuffer[0], bcopy(&interface->txbuffer[written],&interface->txbuffer[0],
interface->tx_bytes_pending); interface->tx_bytes_pending);
if (config.debug.packetradio) DEBUGF("Wrote %d bytes (%d left pending)", if (config.debug.packetradio)
written,interface->tx_bytes_pending); DEBUGF("Wrote %d bytes (%d left pending)", written, interface->tx_bytes_pending);
// Now when are we allowed to send more?
if (interface->throttle_bytes_per_second>0) {
int delay = written*1000/interface->throttle_bytes_per_second;
if (config.debug.throttling)
DEBUGF("Throttling for %dms.",delay);
interface->next_tx_allowed = now + delay;
}
} else { } else {
if (config.debug.packetradio) DEBUGF("Failed to write any data"); if (config.debug.packetradio)
DEBUGF("Failed to write any data");
} }
} }
if (interface->tx_bytes_pending>0) { if (interface->tx_bytes_pending>0){
// more to write, so keep POLLOUT flag if (interface->next_tx_allowed > now){
interface->alarm.poll.events|=POLLOUT; // We can't write now, so clear POLLOUT flag
interface->alarm.poll.events&=~POLLOUT;
// set the interface alarm to trigger another write
interface->alarm.alarm = interface->next_tx_allowed;
interface->alarm.deadline = interface->alarm.alarm+10;
} else {
// more to write, so set the POLLOUT flag
interface->alarm.poll.events|=POLLOUT;
}
} else { } else {
// nothing more to write, so clear POLLOUT flag // Nothing to write, so clear POLLOUT flag
interface->alarm.poll.events&=~POLLOUT; interface->alarm.poll.events&=~POLLOUT;
// try to empty another packet from the queue ASAP // try to empty another packet from the queue ASAP
overlay_queue_schedule_next(gettime_ms()); overlay_queue_schedule_next(gettime_ms());
@ -749,14 +777,15 @@ static void write_stream_buffer(overlay_interface *interface){
static void overlay_interface_poll(struct sched_ent *alarm) static void overlay_interface_poll(struct sched_ent *alarm)
{ {
struct overlay_interface *interface = (overlay_interface *)alarm; struct overlay_interface *interface = (overlay_interface *)alarm;
time_ms_t now = gettime_ms();
if (alarm->poll.revents==0){ if (alarm->poll.revents==0){
alarm->alarm=-1; alarm->alarm=-1;
time_ms_t now = gettime_ms();
if (interface->state==INTERFACE_STATE_UP if (interface->state==INTERFACE_STATE_UP
&& interface->destination->tick_ms>0 && interface->destination->tick_ms>0
&& interface->send_broadcasts){ && interface->send_broadcasts
&& interface->tx_bytes_pending<=0){
if (now >= interface->destination->last_tx+interface->destination->tick_ms) if (now >= interface->destination->last_tx+interface->destination->tick_ms)
overlay_send_tick_packet(interface->destination); overlay_send_tick_packet(interface->destination);
@ -766,8 +795,10 @@ static void overlay_interface_poll(struct sched_ent *alarm)
} }
switch(interface->socket_type){ switch(interface->socket_type){
case SOCK_DGRAM:
case SOCK_STREAM: case SOCK_STREAM:
write_stream_buffer(interface);
break;
case SOCK_DGRAM:
break; break;
case SOCK_FILE: case SOCK_FILE:
interface_read_file(interface); interface_read_file(interface);
@ -786,6 +817,12 @@ static void overlay_interface_poll(struct sched_ent *alarm)
switch(interface->socket_type){ switch(interface->socket_type){
case SOCK_STREAM: case SOCK_STREAM:
write_stream_buffer(interface); write_stream_buffer(interface);
if (alarm->alarm!=-1 && interface->state==INTERFACE_STATE_UP) {
if (alarm->alarm < now)
alarm->alarm = now;
unschedule(alarm);
schedule(alarm);
}
break; break;
case SOCK_DGRAM: case SOCK_DGRAM:
case SOCK_FILE: case SOCK_FILE:
@ -865,7 +902,10 @@ overlay_broadcast_ensemble(struct network_destination *destination,
interface->tx_bytes_pending=out_len; interface->tx_bytes_pending=out_len;
write_stream_buffer(interface); write_stream_buffer(interface);
if (interface->alarm.alarm!=-1){
unschedule(&interface->alarm);
schedule(&interface->alarm);
}
return 0; return 0;
} }

View File

@ -288,6 +288,8 @@ overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){
int i; int i;
for(i=0;i<frame->destination_count;i++) for(i=0;i<frame->destination_count;i++)
{ {
if (frame->destinations[i].destination->interface->tx_bytes_pending>0)
continue;
time_ms_t next_packet = limit_next_allowed(&frame->destinations[i].destination->transfer_limit); time_ms_t next_packet = limit_next_allowed(&frame->destinations[i].destination->transfer_limit);
if (frame->destinations[i].transmit_time){ if (frame->destinations[i].transmit_time){
time_ms_t delay_until = frame->destinations[i].transmit_time + frame->destinations[i].destination->resend_delay; time_ms_t delay_until = frame->destinations[i].transmit_time + frame->destinations[i].destination->resend_delay;

View File

@ -444,6 +444,11 @@ typedef struct overlay_interface {
unsigned char txbuffer[OVERLAY_INTERFACE_RX_BUFFER_SIZE]; unsigned char txbuffer[OVERLAY_INTERFACE_RX_BUFFER_SIZE];
int tx_bytes_pending; int tx_bytes_pending;
// Throttle TX rate if required (stream interfaces only for now)
uint32_t throttle_bytes_per_second;
uint32_t throttle_burst_write_size;
uint64_t next_tx_allowed;
struct slip_decode_state slip_decode_state; struct slip_decode_state slip_decode_state;
// copy of ifconfig flags // copy of ifconfig flags

View File

@ -232,16 +232,23 @@ setup_simulate_extender() {
set interfaces.1.file "$END2" set interfaces.1.file "$END2"
foreach_instance +A +B \ foreach_instance +A +B \
executeOk_servald config \ executeOk_servald config \
set debug.throttling on \
set debug.packetradio on \
set interfaces.1.type CATEAR \ set interfaces.1.type CATEAR \
set interfaces.1.socket_type STREAM \ set interfaces.1.socket_type STREAM \
set interfaces.1.encapsulation SINGLE \ set interfaces.1.encapsulation SINGLE \
set interfaces.1.point_to_point on \ set interfaces.1.point_to_point on \
set interfaces.1.packet_interval 5000 set interfaces.1.packet_interval 5000 \
set interfaces.1.burst_size 100 \
set interfaces.1.throttle 1000
foreach_instance +A +B start_routing_instance foreach_instance +A +B start_routing_instance
} }
test_simulate_extender() { test_simulate_extender() {
wait_until path_exists +A +B wait_until path_exists +A +B
wait_until path_exists +B +A wait_until path_exists +B +A
set_instance +A
executeOk_servald mdp ping --timeout=3 $SIDB 1
tfw_cat --stdout --stderr
} }
teardown_simulate_extender() { teardown_simulate_extender() {
teardown teardown