Add per-interface packet transmit limits

This commit is contained in:
Jeremy Lakeman 2012-12-13 17:50:09 +10:30
parent 7be03f15e7
commit e517e3a59e
6 changed files with 166 additions and 56 deletions

View File

@ -439,21 +439,7 @@ static int cf_opt_network_interface_legacy(struct config_network_interface *nifp
}
}
if (*p == ':') {
const char *const speed = p + 1;
p = endtext;
len = p - speed;
if (len) {
char buf[len + 1];
strncpy(buf, speed, len)[len] = '\0';
int result = cf_opt_uint64_scaled(&nif.speed, buf);
switch (result) {
case CFERROR: return CFERROR;
case CFOK: break;
default: return result; // "Invalid interface speed"
}
if (nif.speed < 1)
return CFINVALID; // "Interfaces must be capable of at least 1 bit per second"
}
}
if (*p)
return CFINVALID; // "Extra junk at end of interface specification"

View File

@ -227,7 +227,8 @@ ATOM(int, uid, -1, cf_opt_int,, "Allowed UID for monito
END_STRUCT
STRUCT(mdp_iftype)
ATOM(uint32_t, tick_ms, 0, cf_opt_uint32_nonzero,, "Tick interval for this interface type")
ATOM(uint32_t, tick_ms, -1, cf_opt_uint32_nonzero,, "Tick interval for this interface type")
ATOM(int, packet_interval, -1, cf_opt_int,, "Minimum interval between packets in microseconds")
END_STRUCT
ARRAY(mdp_iftypelist, NO_DUPLICATES)
@ -336,8 +337,9 @@ ATOM(struct in_addr, dummy_netmask, (struct in_addr){htonl(0xFFFFFF00)
ATOM(int, dummy_filter_broadcasts, 0, cf_opt_int_boolean,, "If true, drop all incoming broadcast packets")
ATOM(short, type, OVERLAY_INTERFACE_WIFI, cf_opt_interface_type,, "Type of network interface")
ATOM(uint16_t, port, RHIZOME_HTTP_PORT, cf_opt_uint16_nonzero,, "Port number for network interface")
ATOM(uint64_t, speed, 1000000, cf_opt_uint64_scaled,, "Speed in bits per second")
ATOM(int, packet_interval, -1, cf_opt_int,, "Minimum interval between packets in microseconds")
ATOM(int, mdp_tick_ms, -1, cf_opt_int32_nonneg,, "Override MDP tick interval for this interface")
ATOM(char, send_broadcasts, 1, cf_opt_char_boolean,, "If false, don't send any broadcast packets")
ATOM(int, default_route, 0, cf_opt_int_boolean,, "If true, use this interface as a default route")
END_STRUCT

View File

@ -335,49 +335,69 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
This will ultimately get tuned by the bandwidth and other properties of the interface */
interface->mtu=1200;
interface->state=INTERFACE_STATE_DOWN;
interface->bits_per_second = ifconfig->speed;
interface->port= ifconfig->port;
interface->type= ifconfig->type;
interface->default_route = ifconfig->default_route;
DEBUGF("interface->default_route=%d",interface->default_route);
interface->last_tick_ms= -1; // not ticked yet
interface->alarm.poll.fd=0;
// How often do we announce ourselves on this interface?
int32_t tick_ms = ifconfig->mdp_tick_ms;
if (tick_ms < 0) {
int i = config_mdp_iftypelist__get(&config.mdp.iftype, &ifconfig->type);
if (i != -1)
tick_ms = config.mdp.iftype.av[i].value.tick_ms;
}
if (tick_ms < 0) {
switch (ifconfig->type) {
interface->tick_ms=-1;
int packet_interval=-1;
// hard coded defaults:
switch (ifconfig->type) {
case OVERLAY_INTERFACE_PACKETRADIO:
tick_ms = 15000;
interface->tick_ms = 15000;
packet_interval = 1000;
break;
case OVERLAY_INTERFACE_ETHERNET:
tick_ms = 500;
interface->tick_ms = 500;
packet_interval = 100;
break;
case OVERLAY_INTERFACE_WIFI:
tick_ms = 500;
interface->tick_ms = 500;
packet_interval = 400;
break;
case OVERLAY_INTERFACE_UNKNOWN:
tick_ms = 500;
interface->tick_ms = 500;
packet_interval = 100;
break;
default:
return WHYF("Unsupported interface type %d", ifconfig->type);
}
// configurable defaults per interface
{
int i = config_mdp_iftypelist__get(&config.mdp.iftype, &ifconfig->type);
if (i != -1){
if (config.mdp.iftype.av[i].value.tick_ms>=0)
interface->tick_ms = config.mdp.iftype.av[i].value.tick_ms;
if (config.mdp.iftype.av[i].value.packet_interval>=0)
packet_interval=config.mdp.iftype.av[i].value.packet_interval;
}
}
assert(tick_ms >= 0);
interface->tick_ms = tick_ms;
// disable announcements and other broadcasts if tick_ms=0.
if (interface->tick_ms > 0)
interface->send_broadcasts=1;
else{
// specific value for this interface
if (ifconfig->mdp_tick_ms>=0)
interface->tick_ms = ifconfig->mdp_tick_ms;
if (ifconfig->packet_interval>=0)
packet_interval=ifconfig->packet_interval;
interface->send_broadcasts=ifconfig->send_broadcasts;
if (packet_interval<0)
return WHYF("Invalid packet interval %d specified for interface %s", packet_interval, name);
if (packet_interval==0){
INFOF("Interface %s is not sending any traffic!", name);
interface->send_broadcasts=0;
interface->tick_ms=0;
}else if (!interface->send_broadcasts){
INFOF("Interface %s is not sending any broadcast traffic!", name);
// no broadcast traffic implies no ticks
interface->tick_ms=0;
}else if (interface->tick_ms==0)
INFOF("Interface %s is running tickless", name);
}
if (interface->tick_ms<0)
return WHYF("Invalid tick interval %d specified for interface %s", interface->tick_ms, name);
limit_init(&interface->transfer_limit, packet_interval);
if (ifconfig->dummy[0]) {
interface->fileP = 1;
char dummyfile[1024];
@ -436,6 +456,7 @@ overlay_interface_init(const char *name, struct in_addr src_addr, struct in_addr
if (overlay_interface_init_socket(overlay_interface_count))
return WHY("overlay_interface_init_socket() failed");
}
INFOF("Allowing a maximum of %d packets every %lldms", interface->transfer_limit.burst_size, interface->transfer_limit.burst_length);
overlay_interface_count++;
return 0;
@ -451,8 +472,10 @@ static void overlay_interface_poll(struct sched_ent *alarm)
// tick the interface
time_ms_t now = gettime_ms();
int i = (interface - overlay_interfaces);
overlay_tick_interface(i, now);
alarm->alarm=now+interface->tick_ms;
if (overlay_tick_interface(i, now))
alarm->alarm=limit_next_allowed(&overlay_interfaces[i].transfer_limit);
else
alarm->alarm=now+interface->tick_ms;
alarm->deadline=alarm->alarm+interface->tick_ms/2;
schedule(alarm);
}

View File

@ -5,11 +5,58 @@
#include "overlay_buffer.h"
#include "overlay_packet.h"
#define MIN_BURST_LENGTH 5000
struct probe_contents{
struct sockaddr_in addr;
unsigned char interface;
};
static void update_limit_state(struct limit_state *state, time_ms_t now){
if (state->next_interval > now || state->burst_size==0){
return;
}
if (state->next_interval + state->burst_length>now)
state->next_interval+=state->burst_length;
else
state->next_interval=now + state->burst_length;
state->sent = 0;
}
/* When should we next allow this thing to occur? */
time_ms_t limit_next_allowed(struct limit_state *state){
time_ms_t now = gettime_ms();
update_limit_state(state, now);
if (state->sent < state->burst_size)
return now;
return state->next_interval;
}
/* Can we do this now? if so, track it */
int limit_is_allowed(struct limit_state *state){
time_ms_t now = gettime_ms();
update_limit_state(state, now);
if (state->sent >= state->burst_size){
return -1;
}
state->sent ++;
return 0;
}
/* Initialise burst size and length based on the number we can do in one MIN_BURST */
int limit_init(struct limit_state *state, int rate_micro_seconds){
if (rate_micro_seconds==0){
state->burst_size=0;
}else{
state->burst_size = (MIN_BURST_LENGTH / rate_micro_seconds)+1;
state->burst_length = (state->burst_size * rate_micro_seconds) / 1000.0;
}
return 0;
}
// quick test to make sure the specified route is valid.
int subscriber_is_reachable(struct subscriber *subscriber){
if (!subscriber)

View File

@ -296,6 +296,23 @@ overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){
// when is the next packet from this queue due?
send_time=queue->first->enqueued_at + queue->transmit_delay;
time_ms_t next_allowed_packet=0;
if (frame->interface){
next_allowed_packet = limit_next_allowed(&frame->interface->transfer_limit);
}else{
// check all interfaces
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP)
continue;
time_ms_t next_packet = limit_next_allowed(&overlay_interfaces[i].transfer_limit);
if (next_allowed_packet==0||next_packet < next_allowed_packet)
next_allowed_packet = next_packet;
}
}
if (next_allowed_packet > send_time)
send_time = next_allowed_packet;
if (next_packet.alarm==0 || send_time < next_packet.alarm){
next_packet.alarm=send_time;
@ -329,10 +346,15 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
frame = overlay_queue_remove(queue, frame);
continue;
}
/* Note, once we queue a broadcast packet we are committed to sending it out every interface,
even if we hear it from somewhere else in the mean time
*/
// quickly skip payloads that have no chance of fitting
if (packet->buffer && ob_position(frame->payload) > ob_remaining(packet->buffer))
goto skip;
if (!frame->destination_resolved){
frame->next_hop = frame->destination;
@ -383,27 +405,37 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
}else{
// find an interface that we haven't broadcast on yet
frame->interface = NULL;
int i;
int i, keep=0;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].state==INTERFACE_STATE_UP
&& !frame->broadcast_sent_via[i]){
frame->interface = &overlay_interfaces[i];
frame->recvaddr = overlay_interfaces[i].broadcast_address;
break;
}
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP || frame->broadcast_sent_via[i])
continue;
keep=1;
time_ms_t next_allowed = limit_next_allowed(&overlay_interfaces[i].transfer_limit);
if (next_allowed > now)
continue;
frame->interface = &overlay_interfaces[i];
frame->recvaddr = overlay_interfaces[i].broadcast_address;
break;
}
if (!frame->interface){
if (!keep){
// huh, we don't need to send it anywhere?
frame = overlay_queue_remove(queue, frame);
continue;
}
if (!frame->interface)
goto skip;
}
}
}
if (!packet->buffer){
// can we send a packet on this interface now?
if (limit_is_allowed(&frame->interface->transfer_limit))
goto skip;
if (frame->source_full)
my_subscriber->send_full=1;
overlay_init_packet(packet, frame->next_hop, frame->unicast, frame->interface, frame->recvaddr, 0);
@ -535,10 +567,13 @@ overlay_tick_interface(int i, time_ms_t now) {
struct outgoing_packet packet;
IN();
/* An interface with no speed budget is for listening only, so doesn't get ticked */
if (overlay_interfaces[i].bits_per_second<1
|| overlay_interfaces[i].state!=INTERFACE_STATE_UP) {
RETURN(0);
if (overlay_interfaces[i].state!=INTERFACE_STATE_UP) {
RETURN(-1);
}
if (limit_is_allowed(&overlay_interfaces[i].transfer_limit)){
WARN("Throttling has blocked a tick packet");
RETURN(-1);
}
if (config.debug.overlayinterfaces) DEBUGF("Ticking interface #%d",i);

View File

@ -324,6 +324,18 @@ struct sched_ent{
int _poll_index;
};
struct limit_state{
// length of time for a burst
time_ms_t burst_length;
// how many in a burst
int burst_size;
// how many have we sent in this burst so far
int sent;
// when can we allow another burst
time_ms_t next_interval;
};
struct overlay_buffer;
struct overlay_frame;
struct broadcast;
@ -343,7 +355,6 @@ typedef struct overlay_interface {
int recv_offset;
int fileP; // dummyP
int drop_broadcasts;
int bits_per_second;
int port;
int type;
/* Number of milli-seconds per tick for this interface, which is basically related to the
@ -370,6 +381,8 @@ typedef struct overlay_interface {
int sequence_number;
/* XXX need recent packet buffers to support the above */
struct limit_state transfer_limit;
/* We need to make sure that interface name and broadcast address is unique for all interfaces that are UP.
We bind a separate socket per interface / broadcast address Broadcast address and netmask, if known
We really only case about distinct broadcast addresses on interfaces.
@ -735,6 +748,10 @@ int overlay_mdp_service_stun_req(overlay_mdp_frame *mdp);
int overlay_mdp_service_stun(overlay_mdp_frame *mdp);
int overlay_mdp_service_probe(overlay_mdp_frame *mdp);
time_ms_t limit_next_allowed(struct limit_state *state);
int limit_is_allowed(struct limit_state *state);
int limit_init(struct limit_state *state, int rate_micro_seconds);
/* function timing routines */
int fd_clearstats();
int fd_showstats();