Move payload queueing functions to separate .c file

This commit is contained in:
Jeremy Lakeman 2012-11-20 16:41:06 +10:30
parent c84b7e5db4
commit 36cee7e9bc
6 changed files with 540 additions and 546 deletions

View File

@ -74,8 +74,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
int overlayMode=0;
overlay_txqueue overlay_tx[OQ_MAX];
keyring_file *keyring=NULL;
int overlayServerMode()
@ -94,28 +92,7 @@ int overlayServerMode()
/* put initial identity in if we don't have any visible */
keyring_seed(keyring);
/* Set default congestion levels for queues */
int i;
for(i=0;i<OQ_MAX;i++) {
overlay_tx[i].maxLength=100;
overlay_tx[i].latencyTarget=1000; /* Keep packets in queue for 1 second by default */
overlay_tx[i].transmit_delay=5; /* Hold onto packets for 10ms before trying to send a full packet */
overlay_tx[i].grace_period=100; /* Delay sending a packet for up to 100ms if servald has other processing to do */
}
/* expire voice/video call packets much sooner, as they just aren't any use if late */
overlay_tx[OQ_ISOCHRONOUS_VOICE].latencyTarget=200;
overlay_tx[OQ_ISOCHRONOUS_VIDEO].latencyTarget=200;
/* try to send voice packets without any delay, and before other background processing */
overlay_tx[OQ_ISOCHRONOUS_VOICE].transmit_delay=0;
overlay_tx[OQ_ISOCHRONOUS_VOICE].grace_period=0;
/* Routing payloads, ack's and nacks need to be sent immediately */
overlay_tx[OQ_MESH_MANAGEMENT].transmit_delay=0;
/* opportunistic traffic can be significantly delayed */
overlay_tx[OQ_OPPORTUNISTIC].transmit_delay=200;
overlay_tx[OQ_OPPORTUNISTIC].grace_period=500;
overlay_queue_init();
/* Get the set of socket file descriptors we need to monitor.
Note that end-of-file will trigger select(), so we cannot run select() if we

View File

@ -56,28 +56,12 @@ struct sched_ent sock_any;
struct sockaddr_in sock_any_addr;
struct profile_total sock_any_stats;
struct outgoing_packet{
overlay_interface *interface;
int i;
struct subscriber *unicast_subscriber;
int unicast;
int add_advertisements;
struct sockaddr_in dest;
struct overlay_buffer *buffer;
};
struct sched_ent next_packet;
struct profile_total send_packet;
static int overlay_tick_interface(int i, time_ms_t now);
static void overlay_interface_poll(struct sched_ent *alarm);
static void logServalPacket(int level, struct __sourceloc __whence, const char *message, const unsigned char *packet, size_t len);
static long long parse_quantity(char *q);
#define DEBUG_packet_visualise(M,P,N) logServalPacket(LOG_LEVEL_DEBUG, __WHENCE__, (M), (P), (N))
unsigned char magic_header[]={/* Magic */ 'O',0x10,
/* Version */ 0x00,0x01};
static int overlay_interface_type(char *s)
@ -89,6 +73,17 @@ static int overlay_interface_type(char *s)
return WHY("Invalid interface type -- consider using 'wifi','ethernet' or 'other'");
}
static long long
parse_quantity(char *q)
{
if (strlen(q) >= 80)
return WHY("quantity string >=80 characters");
long long result;
if (str_to_ll_scaled(q, 10, &result, NULL))
return result;
return WHYF("Illegal quantity: %s", alloca_str_toprint(q));
}
int overlay_interface_arg(char *arg)
{
/* Parse an interface argument, of the form:
@ -694,7 +689,7 @@ void overlay_dummy_poll(struct sched_ent *alarm)
return ;
}
static int
int
overlay_broadcast_ensemble(int interface_number,
struct sockaddr_in *recipientaddr,
unsigned char *bytes,int len)
@ -924,377 +919,6 @@ void overlay_interface_discover(struct sched_ent *alarm){
return;
}
/* remove and free a payload from the queue */
static struct overlay_frame *
overlay_queue_remove(overlay_txqueue *queue, struct overlay_frame *frame){
struct overlay_frame *prev = frame->prev;
struct overlay_frame *next = frame->next;
if (prev)
prev->next = next;
else if(frame == queue->first)
queue->first = next;
if (next)
next->prev = prev;
else if(frame == queue->last)
queue->last = prev;
queue->length--;
op_free(frame);
return next;
}
#if 0 /* unused */
static int
overlay_queue_dump(overlay_txqueue *q)
{
strbuf b = strbuf_alloca(8192);
struct overlay_frame *f;
strbuf_sprintf(b,"overlay_txqueue @ 0x%p\n",q);
strbuf_sprintf(b," length=%d\n",q->length);
strbuf_sprintf(b," maxLenght=%d\n",q->maxLength);
strbuf_sprintf(b," latencyTarget=%d milli-seconds\n",q->latencyTarget);
strbuf_sprintf(b," first=%p\n",q->first);
f=q->first;
while(f) {
strbuf_sprintf(b," %p: ->next=%p, ->prev=%p\n",
f,f->next,f->prev);
if (f==f->next) {
strbuf_sprintf(b," LOOP!\n"); break;
}
f=f->next;
}
strbuf_sprintf(b," last=%p\n",q->last);
f=q->last;
while(f) {
strbuf_sprintf(b," %p: ->next=%p, ->prev=%p\n",
f,f->next,f->prev);
if (f==f->prev) {
strbuf_sprintf(b," LOOP!\n"); break;
}
f=f->prev;
}
DEBUG(strbuf_str(b));
return 0;
}
#endif // 0
static void
overlay_init_packet(struct outgoing_packet *packet, overlay_interface *interface, int tick){
packet->interface = interface;
packet->i = (interface - overlay_interfaces);
packet->dest=interface->broadcast_address;
packet->buffer=ob_new();
packet->add_advertisements=1;
ob_limitsize(packet->buffer, packet->interface->mtu);
ob_append_bytes(packet->buffer,magic_header,4);
overlay_address_clear();
if (tick){
/* 1. Send announcement about ourselves, including one SID that we host if we host more than one SID
(the first SID we host becomes our own identity, saving a little bit of data here).
*/
overlay_add_selfannouncement(packet->i, packet->buffer);
}else{
// add a badly formatted dummy self announce payload to tell people we sent this.
ob_append_byte(packet->buffer, OF_TYPE_SELFANNOUNCE);
ob_append_byte(packet->buffer, 1);
ob_append_rfs(packet->buffer, SID_SIZE + 2);
/* from me, to me, via me
(it's shorter than an actual broadcast,
and receivers wont try to process it
since its not going to have a payload body anyway) */
overlay_address_append_self(interface, packet->buffer);
overlay_address_set_sender(my_subscriber);
ob_append_byte(packet->buffer, OA_CODE_PREVIOUS);
ob_append_byte(packet->buffer, OA_CODE_PREVIOUS);
ob_patch_rfs(packet->buffer, COMPUTE_RFS_LENGTH);
}
}
// update the alarm time and return 1 if changed
static int
overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){
int ret=0;
time_ms_t send_time;
// ignore packet if the destination is currently unreachable
if (frame->destination && subscriber_is_reachable(frame->destination)==REACHABLE_NONE)
return 0;
// when is the next packet from this queue due?
send_time=queue->first->enqueued_at + queue->transmit_delay;
if (next_packet.alarm==0 || send_time < next_packet.alarm){
next_packet.alarm=send_time;
ret = 1;
}
// how long can we wait if the server is busy?
send_time += queue->grace_period;
if (next_packet.deadline==0 || send_time < next_packet.deadline){
next_packet.deadline=send_time;
ret = 1;
}
if (!next_packet.function){
next_packet.function=overlay_send_packet;
send_packet.name="overlay_send_packet";
next_packet.stats=&send_packet;
}
return ret;
}
static void
overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, time_ms_t now){
struct overlay_frame *frame = queue->first;
// TODO stop when the packet is nearly full?
while(frame){
if (frame->enqueued_at + queue->latencyTarget < now){
DEBUGF("Dropping frame type %x for %s due to expiry timeout",
frame->type, frame->destination?alloca_tohex_sid(frame->destination->sid):"All");
frame = overlay_queue_remove(queue, frame);
continue;
}
/* Note, once we queue a broadcast packet we are committed to sending it out every interface,
even if we hear it from somewhere else in the mean time
*/
struct subscriber *next_hop = frame->destination;
if (next_hop){
switch(subscriber_is_reachable(next_hop)){
case REACHABLE_NONE:
goto skip;
case REACHABLE_INDIRECT:
next_hop=next_hop->next_hop;
frame->sendBroadcast=0;
break;
case REACHABLE_DEFAULT_ROUTE:
next_hop=directory_service;
frame->sendBroadcast=0;
break;
case REACHABLE_DIRECT:
case REACHABLE_UNICAST:
frame->sendBroadcast=0;
break;
case REACHABLE_BROADCAST:
if (!frame->sendBroadcast){
if (frame->ttl>2)
frame->ttl=2;
frame->sendBroadcast=1;
if (is_all_matching(frame->broadcast_id.id, BROADCAST_LEN, 0)){
overlay_broadcast_generate_address(&frame->broadcast_id);
// mark it as already seen so we don't immediately retransmit it
overlay_broadcast_drop_check(&frame->broadcast_id);
}
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
frame->broadcast_sent_via[i]=0;
}
break;
}
}
if (!packet->buffer){
// use the interface of the first payload we find
if (frame->sendBroadcast){
// find an interface that we haven't broadcast on yet
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].state==INTERFACE_STATE_UP
&& !frame->broadcast_sent_via[i]){
overlay_init_packet(packet, &overlay_interfaces[i], 0);
break;
}
}
if (!packet->buffer){
// oh dear, why is this broadcast still in the queue?
frame = overlay_queue_remove(queue, frame);
continue;
}
}else{
overlay_init_packet(packet, next_hop->interface, 0);
if (next_hop->reachable==REACHABLE_UNICAST){
packet->unicast_subscriber = next_hop;
packet->dest = next_hop->address;
packet->unicast=1;
}
}
}else{
// make sure this payload can be sent via this interface
if (frame->sendBroadcast){
if (frame->broadcast_sent_via[packet->i]){
goto skip;
}
}else{
if(packet->interface != next_hop->interface)
goto skip;
if (next_hop->reachable==REACHABLE_DIRECT && packet->unicast)
goto skip;
if (next_hop->reachable==REACHABLE_UNICAST &&
((!packet->unicast) ||
packet->dest.sin_addr.s_addr != next_hop->address.sin_addr.s_addr))
goto skip;
}
}
if (debug&DEBUG_OVERLAYFRAMES){
DEBUGF("Sending payload type %x len %d for %s via %s", frame->type, frame->payload->position,
frame->destination?alloca_tohex_sid(frame->destination->sid):"All",
frame->sendBroadcast?alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN):alloca_tohex_sid(next_hop->sid));
}
if (overlay_frame_append_payload(packet->interface, frame, next_hop, packet->buffer))
// payload was not queued
goto skip;
// don't send rhizome adverts if the packet contains a voice payload
if (frame->queue==OQ_ISOCHRONOUS_VOICE)
packet->add_advertisements=0;
// mark the payload as sent
int keep_payload = 0;
if (frame->sendBroadcast){
int i;
frame->broadcast_sent_via[packet->i]=1;
// check if there is still a broadcast to be sent
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].state==INTERFACE_STATE_UP)
if (!frame->broadcast_sent_via[i]){
keep_payload=1;
break;
}
}
}else{
frame->send_copies --;
// ignore resend logic for unicast packets, where wifi gives better resilience
if (frame->send_copies>0 && !packet->unicast)
keep_payload=1;
}
if (!keep_payload){
frame = overlay_queue_remove(queue, frame);
continue;
}
skip:
// if we can't send the payload now, check when we should try
overlay_calc_queue_time(queue, frame);
frame = frame->next;
}
}
// fill a packet from our outgoing queues and send it
static int
overlay_fill_send_packet(struct outgoing_packet *packet, time_ms_t now) {
int i;
IN();
// while we're looking at queues, work out when to schedule another packet
unschedule(&next_packet);
next_packet.alarm=0;
next_packet.deadline=0;
for (i=0;i<OQ_MAX;i++){
overlay_txqueue *queue=&overlay_tx[i];
overlay_stuff_packet(packet, queue, now);
}
if (next_packet.alarm)
schedule(&next_packet);
if(packet->buffer){
// send the packet
if (packet->buffer->position>=HEADERFIELDS_LEN){
// stuff rhizome announcements at the last moment
if (packet->add_advertisements)
overlay_rhizome_add_advertisements(packet->i,packet->buffer);
if (debug&DEBUG_PACKETCONSTRUCTION)
dump("assembled packet",&packet->buffer->bytes[0],packet->buffer->position);
if (overlay_broadcast_ensemble(packet->i, &packet->dest, packet->buffer->bytes, packet->buffer->position)){
// sendto failed. We probably don't have a valid route
if (packet->unicast_subscriber){
set_reachable(packet->unicast_subscriber, REACHABLE_NONE);
}
}
}
ob_free(packet->buffer);
overlay_address_clear();
RETURN(1);
}
RETURN(0);
}
// when the queue timer elapses, send a packet
void overlay_send_packet(struct sched_ent *alarm){
struct outgoing_packet packet;
bzero(&packet, sizeof(struct outgoing_packet));
overlay_fill_send_packet(&packet, gettime_ms());
}
// update time for next alarm and reschedule
void overlay_update_queue_schedule(overlay_txqueue *queue, struct overlay_frame *frame){
if (overlay_calc_queue_time(queue, frame)){
unschedule(&next_packet);
schedule(&next_packet);
}
}
static int
overlay_tick_interface(int i, time_ms_t now) {
struct outgoing_packet packet;
IN();
/* An interface with no speed budget is for listening only, so doesn't get ticked */
if (overlay_interfaces[i].bits_per_second<1
|| overlay_interfaces[i].state!=INTERFACE_STATE_UP) {
RETURN(0);
}
if (debug&DEBUG_OVERLAYINTERFACES) DEBUGF("Ticking interface #%d",i);
// initialise the packet buffer
bzero(&packet, sizeof(struct outgoing_packet));
overlay_init_packet(&packet, &overlay_interfaces[i], 1);
/* Add advertisements for ROUTES */
overlay_route_add_advertisements(packet.interface, packet.buffer);
/* Stuff more payloads from queues and send it */
overlay_fill_send_packet(&packet, now);
RETURN(0);
}
static long long
parse_quantity(char *q)
{
if (strlen(q) >= 80)
return WHY("quantity string >=80 characters");
long long result;
if (str_to_ll_scaled(q, 10, &result, NULL))
return result;
return WHYF("Illegal quantity: %s", alloca_str_toprint(q));
}
static void
logServalPacket(int level, struct __sourceloc __whence, const char *message, const unsigned char *packet, size_t len) {
struct mallocbuf mb = STRUCT_MALLOCBUF_NULL;

View File

@ -156,26 +156,6 @@ cleanup:
return -1;
}
int dump_queue(char *msg,int q)
{
overlay_txqueue *qq=&overlay_tx[q];
DEBUGF("Contents of TX queue #%d (%s):",q,msg);
DEBUGF(" length=%d, maxLength=%d",qq->length,qq->maxLength);
struct overlay_frame *f=qq->first,*l=qq->last;
DEBUGF(" head of queue = %p, tail of queue = %p", f, l);
struct overlay_frame *n=f;
int count=0;
while(n) {
DEBUGF(" queue entry #%d : prev=%p, next=%p", count,n->prev,n->next);
if (n==n->next) {
WHY(" ERROR: loop in queue");
return -1;
}
n=n->next;
}
return 0;
}
int dump_payload(struct overlay_frame *p, char *message)
{
DEBUGF( "+++++\nFrame from %s to %s of type 0x%02x %s:",
@ -187,89 +167,6 @@ int dump_payload(struct overlay_frame *p, char *message)
return 0;
}
int overlay_payload_enqueue(struct overlay_frame *p)
{
/* Add payload p to queue q.
Queues get scanned from first to last, so we should append new entries
on the end of the queue.
Complain if there are too many frames in the queue.
*/
if (!p) return WHY("Cannot queue NULL");
if (p->destination){
int r = subscriber_is_reachable(p->destination);
if (r == REACHABLE_SELF || r == REACHABLE_NONE)
return WHYF("Destination %s is unreachable (%d)", alloca_tohex_sid(p->destination->sid), r);
}
if (p->queue>=OQ_MAX)
return WHY("Invalid queue specified");
overlay_txqueue *queue = &overlay_tx[p->queue];
if (debug&DEBUG_PACKETTX)
DEBUGF("Enqueuing packet for %s* (q[%d]length = %d)",
p->destination?alloca_tohex(p->destination->sid, 7): alloca_tohex(p->broadcast_id.id,BROADCAST_LEN),
p->queue, queue->length);
if (p->payload && p->payload->position > p->payload->sizeLimit){
// HACK, maybe should be done in each caller
// set the size of the payload based on the position written
p->payload->sizeLimit=p->payload->position;
}
if (queue->length>=queue->maxLength)
return WHYF("Queue #%d congested (size = %d)",p->queue,queue->maxLength);
if (p->send_copies<=0)
p->send_copies=1;
else if(p->send_copies>5)
return WHY("Too many copies requested");
if (!p->destination){
int i;
int drop=1;
// hook to allow for flooding via olsr
olsr_send(p);
// make sure there is an interface up that allows broadcasts
for(i=0;i<OVERLAY_MAX_INTERFACES;i++){
if (overlay_interfaces[i].state==INTERFACE_STATE_UP
&& overlay_interfaces[i].send_broadcasts){
p->broadcast_sent_via[i]=0;
drop=0;
}else
p->broadcast_sent_via[i]=1;
}
// just drop it now
if (drop)
return -1;
p->sendBroadcast=1;
}
struct overlay_frame *l=queue->last;
if (l) l->next=p;
p->prev=l;
p->next=NULL;
p->enqueued_at=gettime_ms();
queue->last=p;
if (!queue->first) queue->first=p;
queue->length++;
overlay_update_queue_schedule(queue, p);
if (0) dump_queue("after",p->queue);
return 0;
}
int op_free(struct overlay_frame *p)
{
if (!p) return WHY("Asked to free NULL");

519
overlay_queue.c Normal file
View File

@ -0,0 +1,519 @@
#include "serval.h"
#include "overlay_buffer.h"
#include "overlay_packet.h"
#include "str.h"
typedef struct overlay_txqueue {
struct overlay_frame *first;
struct overlay_frame *last;
int length; /* # frames in queue */
int maxLength; /* max # frames in queue before we consider ourselves congested */
/* wait until first->enqueued_at+transmit_delay before trying to force the transmission of a packet */
int transmit_delay;
/* if servald is busy, wait this long before trying to force the transmission of a packet */
int grace_period;
/* Latency target in ms for this traffic class.
Frames older than the latency target will get dropped. */
int latencyTarget;
/* XXX Need to initialise these:
Real-time queue for voice (<200ms ?)
Real-time queue for video (<200ms ?) (lower priority than voice)
Ordinary service queue (<3 sec ?)
Rhizome opportunistic queue (infinity)
(Mesh management doesn't need a queue, as each overlay packet is tagged with some mesh management information)
*/
} overlay_txqueue;
overlay_txqueue overlay_tx[OQ_MAX];
unsigned char magic_header[]={/* Magic */ 'O',0x10,
/* Version */ 0x00,0x01};
struct outgoing_packet{
overlay_interface *interface;
int i;
struct subscriber *unicast_subscriber;
int unicast;
int add_advertisements;
struct sockaddr_in dest;
struct overlay_buffer *buffer;
};
struct sched_ent next_packet;
struct profile_total send_packet;
static void overlay_send_packet(struct sched_ent *alarm);
static void overlay_update_queue_schedule(overlay_txqueue *queue, struct overlay_frame *frame);
int overlay_queue_init(){
/* Set default congestion levels for queues */
int i;
for(i=0;i<OQ_MAX;i++) {
overlay_tx[i].maxLength=100;
overlay_tx[i].latencyTarget=1000; /* Keep packets in queue for 1 second by default */
overlay_tx[i].transmit_delay=5; /* Hold onto packets for 10ms before trying to send a full packet */
overlay_tx[i].grace_period=100; /* Delay sending a packet for up to 100ms if servald has other processing to do */
}
/* expire voice/video call packets much sooner, as they just aren't any use if late */
overlay_tx[OQ_ISOCHRONOUS_VOICE].latencyTarget=200;
overlay_tx[OQ_ISOCHRONOUS_VIDEO].latencyTarget=200;
/* try to send voice packets without any delay, and before other background processing */
overlay_tx[OQ_ISOCHRONOUS_VOICE].transmit_delay=0;
overlay_tx[OQ_ISOCHRONOUS_VOICE].grace_period=0;
/* Routing payloads, ack's and nacks need to be sent immediately */
overlay_tx[OQ_MESH_MANAGEMENT].transmit_delay=0;
/* opportunistic traffic can be significantly delayed */
overlay_tx[OQ_OPPORTUNISTIC].transmit_delay=200;
overlay_tx[OQ_OPPORTUNISTIC].grace_period=500;
return 0;
}
/* remove and free a payload from the queue */
static struct overlay_frame *
overlay_queue_remove(overlay_txqueue *queue, struct overlay_frame *frame){
struct overlay_frame *prev = frame->prev;
struct overlay_frame *next = frame->next;
if (prev)
prev->next = next;
else if(frame == queue->first)
queue->first = next;
if (next)
next->prev = prev;
else if(frame == queue->last)
queue->last = prev;
queue->length--;
op_free(frame);
return next;
}
static int
overlay_queue_dump(overlay_txqueue *q)
{
strbuf b = strbuf_alloca(8192);
struct overlay_frame *f;
strbuf_sprintf(b,"overlay_txqueue @ 0x%p\n",q);
strbuf_sprintf(b," length=%d\n",q->length);
strbuf_sprintf(b," maxLenght=%d\n",q->maxLength);
strbuf_sprintf(b," latencyTarget=%d milli-seconds\n",q->latencyTarget);
strbuf_sprintf(b," first=%p\n",q->first);
f=q->first;
while(f) {
strbuf_sprintf(b," %p: ->next=%p, ->prev=%p\n",
f,f->next,f->prev);
if (f==f->next) {
strbuf_sprintf(b," LOOP!\n"); break;
}
f=f->next;
}
strbuf_sprintf(b," last=%p\n",q->last);
f=q->last;
while(f) {
strbuf_sprintf(b," %p: ->next=%p, ->prev=%p\n",
f,f->next,f->prev);
if (f==f->prev) {
strbuf_sprintf(b," LOOP!\n"); break;
}
f=f->prev;
}
DEBUG(strbuf_str(b));
return 0;
}
int overlay_payload_enqueue(struct overlay_frame *p)
{
/* Add payload p to queue q.
Queues get scanned from first to last, so we should append new entries
on the end of the queue.
Complain if there are too many frames in the queue.
*/
if (!p) return WHY("Cannot queue NULL");
if (p->destination){
int r = subscriber_is_reachable(p->destination);
if (r == REACHABLE_SELF || r == REACHABLE_NONE)
return WHYF("Destination %s is unreachable (%d)", alloca_tohex_sid(p->destination->sid), r);
}
if (p->queue>=OQ_MAX)
return WHY("Invalid queue specified");
overlay_txqueue *queue = &overlay_tx[p->queue];
if (debug&DEBUG_PACKETTX)
DEBUGF("Enqueuing packet for %s* (q[%d]length = %d)",
p->destination?alloca_tohex(p->destination->sid, 7): alloca_tohex(p->broadcast_id.id,BROADCAST_LEN),
p->queue, queue->length);
if (p->payload && p->payload->position > p->payload->sizeLimit){
// HACK, maybe should be done in each caller
// set the size of the payload based on the position written
p->payload->sizeLimit=p->payload->position;
}
if (queue->length>=queue->maxLength)
return WHYF("Queue #%d congested (size = %d)",p->queue,queue->maxLength);
if (p->send_copies<=0)
p->send_copies=1;
else if(p->send_copies>5)
return WHY("Too many copies requested");
if (!p->destination){
int i;
int drop=1;
// hook to allow for flooding via olsr
olsr_send(p);
// make sure there is an interface up that allows broadcasts
for(i=0;i<OVERLAY_MAX_INTERFACES;i++){
if (overlay_interfaces[i].state==INTERFACE_STATE_UP
&& overlay_interfaces[i].send_broadcasts){
p->broadcast_sent_via[i]=0;
drop=0;
}else
p->broadcast_sent_via[i]=1;
}
// just drop it now
if (drop)
return -1;
p->sendBroadcast=1;
}
struct overlay_frame *l=queue->last;
if (l) l->next=p;
p->prev=l;
p->next=NULL;
p->enqueued_at=gettime_ms();
queue->last=p;
if (!queue->first) queue->first=p;
queue->length++;
overlay_update_queue_schedule(queue, p);
if (0) overlay_queue_dump(queue);
return 0;
}
static void
overlay_init_packet(struct outgoing_packet *packet, overlay_interface *interface, int tick){
packet->interface = interface;
packet->i = (interface - overlay_interfaces);
packet->dest=interface->broadcast_address;
packet->buffer=ob_new();
packet->add_advertisements=1;
ob_limitsize(packet->buffer, packet->interface->mtu);
ob_append_bytes(packet->buffer,magic_header,4);
overlay_address_clear();
if (tick){
/* 1. Send announcement about ourselves, including one SID that we host if we host more than one SID
(the first SID we host becomes our own identity, saving a little bit of data here).
*/
overlay_add_selfannouncement(packet->i, packet->buffer);
/* Add advertisements for ROUTES */
overlay_route_add_advertisements(packet->interface, packet->buffer);
}else{
// add a badly formatted dummy self announce payload to tell people we sent this.
ob_append_byte(packet->buffer, OF_TYPE_SELFANNOUNCE);
ob_append_byte(packet->buffer, 1);
ob_append_rfs(packet->buffer, SID_SIZE + 2);
/* from me, to me, via me
(it's shorter than an actual broadcast,
and receivers wont try to process it
since its not going to have a payload body anyway) */
overlay_address_append_self(interface, packet->buffer);
overlay_address_set_sender(my_subscriber);
ob_append_byte(packet->buffer, OA_CODE_PREVIOUS);
ob_append_byte(packet->buffer, OA_CODE_PREVIOUS);
ob_patch_rfs(packet->buffer, COMPUTE_RFS_LENGTH);
}
}
// update the alarm time and return 1 if changed
static int
overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){
int ret=0;
time_ms_t send_time;
// ignore packet if the destination is currently unreachable
if (frame->destination && subscriber_is_reachable(frame->destination)==REACHABLE_NONE)
return 0;
// when is the next packet from this queue due?
send_time=queue->first->enqueued_at + queue->transmit_delay;
if (next_packet.alarm==0 || send_time < next_packet.alarm){
next_packet.alarm=send_time;
ret = 1;
}
// how long can we wait if the server is busy?
send_time += queue->grace_period;
if (next_packet.deadline==0 || send_time < next_packet.deadline){
next_packet.deadline=send_time;
ret = 1;
}
if (!next_packet.function){
next_packet.function=overlay_send_packet;
send_packet.name="overlay_send_packet";
next_packet.stats=&send_packet;
}
return ret;
}
static void
overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, time_ms_t now){
struct overlay_frame *frame = queue->first;
// TODO stop when the packet is nearly full?
while(frame){
if (frame->enqueued_at + queue->latencyTarget < now){
DEBUGF("Dropping frame type %x for %s due to expiry timeout",
frame->type, frame->destination?alloca_tohex_sid(frame->destination->sid):"All");
frame = overlay_queue_remove(queue, frame);
continue;
}
/* Note, once we queue a broadcast packet we are committed to sending it out every interface,
even if we hear it from somewhere else in the mean time
*/
struct subscriber *next_hop = frame->destination;
if (next_hop){
switch(subscriber_is_reachable(next_hop)){
case REACHABLE_NONE:
goto skip;
case REACHABLE_INDIRECT:
next_hop=next_hop->next_hop;
frame->sendBroadcast=0;
break;
case REACHABLE_DEFAULT_ROUTE:
next_hop=directory_service;
frame->sendBroadcast=0;
break;
case REACHABLE_DIRECT:
case REACHABLE_UNICAST:
frame->sendBroadcast=0;
break;
case REACHABLE_BROADCAST:
if (!frame->sendBroadcast){
if (frame->ttl>2)
frame->ttl=2;
frame->sendBroadcast=1;
if (is_all_matching(frame->broadcast_id.id, BROADCAST_LEN, 0)){
overlay_broadcast_generate_address(&frame->broadcast_id);
// mark it as already seen so we don't immediately retransmit it
overlay_broadcast_drop_check(&frame->broadcast_id);
}
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
frame->broadcast_sent_via[i]=0;
}
break;
}
}
if (!packet->buffer){
// use the interface of the first payload we find
if (frame->sendBroadcast){
// find an interface that we haven't broadcast on yet
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].state==INTERFACE_STATE_UP
&& !frame->broadcast_sent_via[i]){
overlay_init_packet(packet, &overlay_interfaces[i], 0);
break;
}
}
if (!packet->buffer){
// oh dear, why is this broadcast still in the queue?
frame = overlay_queue_remove(queue, frame);
continue;
}
}else{
overlay_init_packet(packet, next_hop->interface, 0);
if (next_hop->reachable==REACHABLE_UNICAST){
packet->unicast_subscriber = next_hop;
packet->dest = next_hop->address;
packet->unicast=1;
}
}
}else{
// make sure this payload can be sent via this interface
if (frame->sendBroadcast){
if (frame->broadcast_sent_via[packet->i]){
goto skip;
}
}else{
if(packet->interface != next_hop->interface)
goto skip;
if (next_hop->reachable==REACHABLE_DIRECT && packet->unicast)
goto skip;
if (next_hop->reachable==REACHABLE_UNICAST &&
((!packet->unicast) ||
packet->dest.sin_addr.s_addr != next_hop->address.sin_addr.s_addr))
goto skip;
}
}
if (debug&DEBUG_OVERLAYFRAMES){
DEBUGF("Sending payload type %x len %d for %s via %s", frame->type, frame->payload->position,
frame->destination?alloca_tohex_sid(frame->destination->sid):"All",
frame->sendBroadcast?alloca_tohex(frame->broadcast_id.id, BROADCAST_LEN):alloca_tohex_sid(next_hop->sid));
}
if (overlay_frame_append_payload(packet->interface, frame, next_hop, packet->buffer))
// payload was not queued
goto skip;
// don't send rhizome adverts if the packet contains a voice payload
if (frame->queue==OQ_ISOCHRONOUS_VOICE)
packet->add_advertisements=0;
// mark the payload as sent
int keep_payload = 0;
if (frame->sendBroadcast){
int i;
frame->broadcast_sent_via[packet->i]=1;
// check if there is still a broadcast to be sent
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].state==INTERFACE_STATE_UP)
if (!frame->broadcast_sent_via[i]){
keep_payload=1;
break;
}
}
}else{
frame->send_copies --;
// ignore resend logic for unicast packets, where wifi gives better resilience
if (frame->send_copies>0 && !packet->unicast)
keep_payload=1;
}
if (!keep_payload){
frame = overlay_queue_remove(queue, frame);
continue;
}
skip:
// if we can't send the payload now, check when we should try
overlay_calc_queue_time(queue, frame);
frame = frame->next;
}
}
// fill a packet from our outgoing queues and send it
static int
overlay_fill_send_packet(struct outgoing_packet *packet, time_ms_t now) {
int i;
IN();
// while we're looking at queues, work out when to schedule another packet
unschedule(&next_packet);
next_packet.alarm=0;
next_packet.deadline=0;
for (i=0;i<OQ_MAX;i++){
overlay_txqueue *queue=&overlay_tx[i];
overlay_stuff_packet(packet, queue, now);
}
if (next_packet.alarm)
schedule(&next_packet);
if(packet->buffer){
// send the packet
if (packet->buffer->position>=HEADERFIELDS_LEN){
// stuff rhizome announcements at the last moment
if (packet->add_advertisements)
overlay_rhizome_add_advertisements(packet->i,packet->buffer);
if (debug&DEBUG_PACKETCONSTRUCTION)
dump("assembled packet",&packet->buffer->bytes[0],packet->buffer->position);
if (overlay_broadcast_ensemble(packet->i, &packet->dest, packet->buffer->bytes, packet->buffer->position)){
// sendto failed. We probably don't have a valid route
if (packet->unicast_subscriber){
set_reachable(packet->unicast_subscriber, REACHABLE_NONE);
}
}
}
ob_free(packet->buffer);
overlay_address_clear();
RETURN(1);
}
RETURN(0);
}
// when the queue timer elapses, send a packet
static void overlay_send_packet(struct sched_ent *alarm){
struct outgoing_packet packet;
bzero(&packet, sizeof(struct outgoing_packet));
overlay_fill_send_packet(&packet, gettime_ms());
}
// update time for next alarm and reschedule
static void overlay_update_queue_schedule(overlay_txqueue *queue, struct overlay_frame *frame){
if (overlay_calc_queue_time(queue, frame)){
unschedule(&next_packet);
schedule(&next_packet);
}
}
int
overlay_tick_interface(int i, time_ms_t now) {
struct outgoing_packet packet;
IN();
/* An interface with no speed budget is for listening only, so doesn't get ticked */
if (overlay_interfaces[i].bits_per_second<1
|| overlay_interfaces[i].state!=INTERFACE_STATE_UP) {
RETURN(0);
}
if (debug&DEBUG_OVERLAYINTERFACES) DEBUGF("Ticking interface #%d",i);
// initialise the packet buffer
bzero(&packet, sizeof(struct outgoing_packet));
overlay_init_packet(&packet, &overlay_interfaces[i], 1);
/* Stuff more payloads from queues and send it */
overlay_fill_send_packet(&packet, now);
RETURN(0);
}

View File

@ -329,6 +329,7 @@ struct sched_ent{
};
struct overlay_buffer;
struct overlay_frame;
#define STRUCT_SCHED_ENT_UNUSED ((struct sched_ent){NULL, NULL, NULL, NULL, {-1, 0, 0}, 0LL, 0LL, NULL, -1})
@ -398,35 +399,6 @@ extern overlay_interface overlay_interfaces[OVERLAY_MAX_INTERFACES];
extern int overlay_last_interface_number; // used to remember where a packet came from
extern unsigned int overlay_sequence_number;
typedef struct overlay_txqueue {
struct overlay_frame *first;
struct overlay_frame *last;
int length; /* # frames in queue */
int maxLength; /* max # frames in queue before we consider ourselves congested */
/* wait until first->enqueued_at+transmit_delay before trying to force the transmission of a packet */
int transmit_delay;
/* if servald is busy, wait this long before trying to force the transmission of a packet */
int grace_period;
/* Latency target in ms for this traffic class.
Frames older than the latency target will get dropped. */
int latencyTarget;
/* XXX Need to initialise these:
Real-time queue for voice (<200ms ?)
Real-time queue for video (<200ms ?) (lower priority than voice)
Ordinary service queue (<3 sec ?)
Rhizome opportunistic queue (infinity)
(Mesh management doesn't need a queue, as each overlay packet is tagged with some mesh management information)
*/
} overlay_txqueue;
extern overlay_txqueue overlay_tx[OQ_MAX];
ssize_t recvwithttl(int sock, unsigned char *buffer, size_t bufferlen, int *ttl, struct sockaddr *recvaddr, socklen_t *recvaddrlen);
// is the SID entirely 0xFF?
@ -478,8 +450,6 @@ int overlay_frame_append_payload(overlay_interface *interface, struct overlay_fr
int overlay_interface_args(const char *arg);
int overlay_rhizome_add_advertisements(int interface_number,struct overlay_buffer *e);
int overlay_add_local_identity(unsigned char *s);
void overlay_update_queue_schedule(overlay_txqueue *queue, struct overlay_frame *frame);
void overlay_send_packet(struct sched_ent *alarm);
extern int overlay_interface_count;
@ -669,6 +639,9 @@ int overlay_interface_register(char *name,
struct in_addr mask);
overlay_interface * overlay_interface_find(struct in_addr addr);
overlay_interface * overlay_interface_find_name(const char *name);
int overlay_broadcast_ensemble(int interface_number,
struct sockaddr_in *recipientaddr,
unsigned char *bytes,int len);
int directory_registration();
int directory_service_init();
@ -762,6 +735,9 @@ void overlay_mdp_poll(struct sched_ent *alarm);
void fd_periodicstats(struct sched_ent *alarm);
void rhizome_check_connections(struct sched_ent *alarm);
int overlay_tick_interface(int i, time_ms_t now);
int overlay_queue_init();
void monitor_client_poll(struct sched_ent *alarm);
void monitor_poll(struct sched_ent *alarm);
void rhizome_client_poll(struct sched_ent *alarm);

View File

@ -26,6 +26,7 @@ SERVAL_SOURCES = $(SERVAL_BASE)audiodevices.c \
$(SERVAL_BASE)overlay_address.c \
$(SERVAL_BASE)overlay_buffer.c \
$(SERVAL_BASE)overlay_interface.c \
$(SERVAL_BASE)overlay_queue.c \
$(SERVAL_BASE)overlay_mdp.c \
$(SERVAL_BASE)overlay_olsr.c \
$(SERVAL_BASE)overlay_packetformats.c \