Refactor packet construction and sending

This commit is contained in:
Jeremy Lakeman 2012-07-12 10:36:41 +09:30
parent e9566de0af
commit d36ba78afe
6 changed files with 283 additions and 416 deletions

View File

@ -292,7 +292,6 @@ int overlay_frame_process(struct overlay_interface *interface,overlay_frame *f)
if (((!ultimatelyForMe)||broadcast)&&(f->ttl>1))
{
/* Yes, it is. */
int len=0;
if (broadcast&&(!duplicateBroadcast)&&
((f->type==OF_TYPE_SELFANNOUNCE)
@ -310,8 +309,7 @@ int overlay_frame_process(struct overlay_interface *interface,overlay_frame *f)
if (debug&DEBUG_OVERLAYFRAMES) DEBUG("Forwarding frame");
int dontForward=0;
if (!broadcast) {
if (overlay_get_nexthop(f->destination,f->nexthop,&len,
&f->nexthop_interface))
if (overlay_get_nexthop(f->destination,f->nexthop,&f->nexthop_interface))
WHYF("Could not find next hop for %s* - dropping frame",
alloca_tohex(f->destination, 7));
dontForward=1;

View File

@ -45,8 +45,17 @@ struct interface_rules *interface_filter=NULL;
struct profile_total interface_poll_stats;
struct profile_total dummy_poll_stats;
struct outgoing_packet{
overlay_interface *interface;
int i;
overlay_buffer *buffer;
};
int overlay_tick_interface(int i, long long now);
unsigned char magic_header[]={/* Magic */ 'O',0x10,
/* Version */ 0x00,0x01};
/* Return milliseconds since server started. First call will always return zero.
Must use long long, not time_t, as time_t can be 32bits, which is too small for
@ -667,117 +676,25 @@ void overlay_interface_discover(struct sched_ent *alarm){
return;
}
int overlay_stuff_packet_from_queue(int i,overlay_buffer *e,int q,long long now,overlay_frame *pax[],int *frame_pax,int frame_max_pax)
{
if (0) DEBUGF("Stuffing from queue #%d on interface #%d",q,i);
overlay_frame **p=&overlay_tx[q].first;
if (0) DEBUGF("A p=%p, *p=%p, queue=%d",p,*p,q);
while(p&&(*p))
{
if (0) DEBUGF("B p=%p, *p=%p, queue=%d",p,*p,q);
/* Throw away any stale frames */
overlay_frame *pp;
if (p) pp=*p;
if (!pp) break;
if (0) DEBUGF("now=%lld, *p=%p, q=%d, overlay_tx[q]=%p",
now,*p,q,&overlay_tx[q]);
if (0) overlay_queue_dump(&overlay_tx[q]);
if (now>((*p)->enqueued_at+overlay_tx[q].latencyTarget)) {
/* Stale, so remove from queue. */
/* Get pointer to stale entry */
overlay_frame *stale=*p;
if (0)
DEBUGF("Removing stale frame at %p (now=%lld, expiry=%lld)",
stale,
now,((*p)->enqueued_at+overlay_tx[q].latencyTarget));
if (0) DEBUGF("now=%lld, *p=%p, q=%d, overlay_tx[q]=%p",
now,*p,q,&overlay_tx[q]);
/* Make ->next pointer that points to the stale node skip the stale node */
if (0) DEBUGF("p=%p, stale=%p, stale->next=%p",p,stale,stale->next);
*p=stale->next;
/* If there is an entry after the stale now, make it's prev point to the
node before the stale node */
if (*p) (*p)->prev=stale->prev;
if (overlay_tx[q].first==stale) overlay_tx[q].first=stale->next;
if (overlay_tx[q].last==stale) overlay_tx[q].last=stale->prev;
op_free(stale);
overlay_tx[q].length--;
}
else
{
/* We keep trying to queue frames in case they will fit, as not all
frames are of equal size. This means that lower bit-rate codecs will
get higher priority, which is probably not all bad. The only hard
limit is the maximum number of payloads we allow in a frame, which is
set so high as to be irrelevant, even on loopback or gigabit ethernet
interface */
/* Filter for those which should be sent via this interface.
To do that we need to know the nexthop, and the best route to the
next hop. */
int dontSend=1;
/* See if this interface has the best path to this node */
if (!(*p)->isBroadcast) {
unsigned char nexthop[SID_SIZE];
int len=0;
int next_hop_interface=-1;
int r=overlay_get_nexthop((*p)->destination,nexthop,&len,
&next_hop_interface);
if (!r) {
if (next_hop_interface==i) {
if (0) DEBUGF("unicast pax %p",*p);
dontSend=0; } else {
if (0)
DEBUGF("Packet should go via interface #%d, but I am interface #%d",next_hop_interface,i);
}
} else {
DEBUG("bummer, I couldn't find an open route to that node");
DEBUGF("sid=%s", alloca_tohex_sid((*p)->destination));
}
} else if (!(*p)->broadcast_sent_via[i])
{
/* Broadcast frames are easy to work out if they go via this interface,
just make sure that they haven't been previously sent via this
interface. We then have some magic that only dequeues broadcast packets
once they have been sent via all open interfaces (or gone stale) */
dontSend=0;
(*p)->broadcast_sent_via[i]=1;
if (0) DEBUGF("broadcast pax %p",*p);
}
if (dontSend==0) {
/* Try sending by this queue */
if (*frame_pax>=frame_max_pax) break;
if (!overlay_frame_package_fmt1(*p,e))
{
/* Add payload to list of payloads we are sending with this frame so that we can dequeue them
if we send them. */
if (0) {
DEBUGF(" paxed#%d %p%s",*frame_pax,*p,
(*p)->isBroadcast?"(broadcast)":"");
dump("payload of pax",(*p)->payload->bytes,(*p)->payload->length);
}
pax[(*frame_pax)++]=*p;
}
}
}
if (0) DEBUGF("C p=%p, *p=%p, queue=%d",p,*p,q);
if (*p)
/* Consider next in queue */
p=&(*p)->next;
if (0) DEBUGF("D p=%p, *p=%p, queue=%d",p,p?*p:NULL,q);
}
if (0) DEBUG("returning from stuffing");
return 0;
/* remove and free a payload from the queue */
overlay_frame *overlay_queue_remove(overlay_txqueue *queue, overlay_frame *frame){
overlay_frame *prev = frame->prev;
overlay_frame *next = frame->next;
if (prev)
prev->next = next;
else if(frame == queue->first)
queue->first = next;
if (next)
next->prev = prev;
else if(frame == queue->last)
queue->last = prev;
queue->length--;
op_free(frame);
return next;
}
int overlay_queue_dump(overlay_txqueue *q)
@ -791,8 +708,8 @@ int overlay_queue_dump(overlay_txqueue *q)
strbuf_sprintf(b," first=%p\n",q->first);
f=q->first;
while(f) {
strbuf_sprintf(b," %p: ->next=%p, ->prev=%p ->dequeue=%d\n",
f,f->next,f->prev,f->dequeue);
strbuf_sprintf(b," %p: ->next=%p, ->prev=%p\n",
f,f->next,f->prev);
if (f==f->next) {
strbuf_sprintf(b," LOOP!\n"); break;
}
@ -812,161 +729,184 @@ int overlay_queue_dump(overlay_txqueue *q)
return 0;
}
int overlay_resolve_next_hop(overlay_frame *frame){
if (frame->nexthop_address_status==OA_RESOLVED)
return 0;
if (frame->isBroadcast)
bcopy(&frame->destination,&frame->nexthop,SID_SIZE);
else if (overlay_get_nexthop((unsigned char *)frame->destination,frame->nexthop,&frame->nexthop_interface)){
// TODO new code?
frame->nexthop_address_status=OA_UNSUPPORTED;
return -1;
}
frame->nexthop_address_status=OA_RESOLVED;
return 0;
}
void overlay_init_packet(struct outgoing_packet *packet, int interface){
packet->i = interface;
packet->interface = &overlay_interfaces[packet->i];
packet->buffer=ob_new(packet->interface->mtu);
ob_limitsize(packet->buffer, packet->interface->mtu);
ob_append_bytes(packet->buffer,magic_header,4);
}
void overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, long long now){
overlay_frame *frame = queue->first;
// TODO stop when the packet is nearly full?
while(frame){
int drop =0;
frame->isBroadcast = overlay_address_is_broadcast(frame->destination);
if (frame->enqueued_at + queue->latencyTarget < now)
drop=1;
else if(frame->isBroadcast)
drop=overlay_broadcast_drop_check(frame->destination);
if (drop){
DEBUG("Dropping frame due to expiry timeout");
frame = overlay_queue_remove(queue, frame);
continue;
}
if (overlay_resolve_next_hop(frame))
goto skip;
if (!packet->buffer){
// use the interface of the first payload we find
if (frame->isBroadcast){
// find an interface that we haven't broadcast on yet
int i;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].observed>0)
if (!frame->broadcast_sent_via[i]){
overlay_init_packet(packet, i);
break;
}
}
if (!packet->buffer){
// oh dear, why is this broadcast still in the queue?
frame = overlay_queue_remove(queue, frame);
continue;
}
}else{
overlay_init_packet(packet, frame->nexthop_interface);
}
}else{
// make sure this payload can be sent via this interface
if (frame->isBroadcast){
if (frame->broadcast_sent_via[packet->i]){
goto skip;
}
}else if(packet->i != frame->nexthop_interface){
goto skip;
}
}
if (overlay_frame_package_fmt1(frame, packet->buffer))
// payload was not queued
goto skip;
// mark the payload as sent
int keep_payload = 0;
if (frame->isBroadcast){
int i;
frame->broadcast_sent_via[packet->i]=1;
// check if there is still a broadcast to be sent
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].observed>0)
if (!frame->broadcast_sent_via[i]){
keep_payload=1;
break;
}
}
}
if (!keep_payload){
frame = overlay_queue_remove(queue, frame);
continue;
}
skip:
frame = frame->next;
}
}
// fill a packet from our outgoing queues and send it
int overlay_fill_send_packet(struct outgoing_packet *packet, long long now){
int i;
IN();
for (i=0;i<OQ_MAX;i++){
overlay_txqueue *queue=&overlay_tx[i];
overlay_stuff_packet(packet, queue, now);
}
if(packet->buffer){
// send the packet
if (packet->buffer->length>=HEADERFIELDS_LEN){
if (debug&DEBUG_PACKETCONSTRUCTION)
dump("assembled packet",&packet->buffer->bytes[0],packet->buffer->length);
if (debug&DEBUG_OVERLAYINTERFACES)
DEBUGF("Sending %d byte packet",packet->buffer->length);
overlay_broadcast_ensemble(packet->i,NULL,packet->buffer->bytes,packet->buffer->length);
}
ob_free(packet->buffer);
RETURN(1);
}
RETURN(0);
}
void overlay_send_packet(){
struct outgoing_packet packet;
bzero(&packet, sizeof(struct outgoing_packet));
overlay_fill_send_packet(&packet, overlay_gettime_ms());
}
int overlay_tick_interface(int i, long long now)
{
int frame_pax=0;
overlay_buffer *e=NULL;
#define MAX_FRAME_PAX 1024
overlay_frame *pax[MAX_FRAME_PAX];
struct outgoing_packet packet;
IN();
if (overlay_interfaces[i].bits_per_second<1) {
/* An interface with no speed budget is for listening only, so doesn't get ticked */
return 0;
RETURN(0);
}
// initialise the packet buffer
bzero(&packet, sizeof(struct outgoing_packet));
overlay_init_packet(&packet, i);
if (debug&DEBUG_OVERLAYINTERFACES) DEBUGF("Ticking interface #%d",i);
/* Get a buffer ready, and limit it's size appropriately.
XXX size limit should be reduced from MTU.
XXX we should also take account of the volume of data likely to be in the TX buffer. */
e=ob_new(overlay_interfaces[i].mtu);
if (!e) return WHY("ob_new() failed");
ob_limitsize(e,overlay_interfaces[i].mtu/4);
/* 0. Setup Serval Mesh frame header. We do not use an explicit length field for these, as the various
component payloads are all self-authenticating, or at least that is the theory. */
unsigned char bytes[]={/* Magic */ 'O',0x10,
/* Version */ 0x00,0x01};
if (ob_append_bytes(e,bytes,4)) {
ob_free(e);
return WHY("ob_append_bytes() refused to append magic bytes.");
}
/* 1. Send announcement about ourselves, including one SID that we host if we host more than one SID
(the first SID we host becomes our own identity, saving a little bit of data here).
*/
overlay_add_selfannouncement(i,e);
overlay_add_selfannouncement(i,packet.buffer);
/* 2. Add any queued high-priority isochronous data (i.e. voice) to the frame. */
overlay_stuff_packet_from_queue(i,e,OQ_ISOCHRONOUS_VOICE,now,pax,&frame_pax,MAX_FRAME_PAX);
ob_limitsize(e,overlay_interfaces[i].mtu/2);
/* 3. Add some mesh reachability reports (unlike BATMAN we announce reachability to peers progressively).
Give priority to newly observed nodes so that good news travels quickly to help roaming.
XXX - Don't forget about PONGing reachability reports to allow use of monodirectional links.
*/
overlay_stuff_packet_from_queue(i,e,OQ_MESH_MANAGEMENT,now,pax,&frame_pax,MAX_FRAME_PAX);
/* We previously limited manifest space to 3/4 of MTU, but that causes problems for
MeshMS journal manifests, at least until we move to a compact binary format.
So for now, allow allow rest of packet to get used */
// TODO reduce to <= mtu*3/4 once we have compact binary canonical manifest format
ob_limitsize(e,overlay_interfaces[i].mtu*4/4);
/* Add advertisements for ROUTES not Rhizome bundles.
Rhizome bundle advertisements are lower priority */
overlay_route_add_advertisements(e);
/* Add advertisements for ROUTES */
overlay_route_add_advertisements(packet.buffer);
ob_limitsize(e,overlay_interfaces[i].mtu);
/* 4. XXX Add lower-priority queued data */
overlay_stuff_packet_from_queue(i,e,OQ_ISOCHRONOUS_VIDEO,now,pax,&frame_pax,MAX_FRAME_PAX);
overlay_stuff_packet_from_queue(i,e,OQ_ORDINARY,now,pax,&frame_pax,MAX_FRAME_PAX);
overlay_stuff_packet_from_queue(i,e,OQ_OPPORTUNISTIC,now,pax,&frame_pax,MAX_FRAME_PAX);
/* 5. XXX Fill the packet up to a suitable size with anything that seems a good idea */
if (rhizome_enabled() && rhizome_http_server_running())
overlay_rhizome_add_advertisements(i,e);
if (debug&DEBUG_PACKETCONSTRUCTION)
dump("assembled packet",&e->bytes[0],e->length);
/* Now send the frame. This takes the form of a special DNA packet with a different
service code, which we setup earlier. */
if (debug&DEBUG_OVERLAYINTERFACES)
DEBUGF("Sending %d byte tick packet",e->length);
if (overlay_broadcast_ensemble(i,NULL,e->bytes,e->length) != -1)
{
if (debug&DEBUG_OVERLAYINTERFACES)
DEBUGF("Successfully transmitted tick frame on interface #%d (%d bytes)",
i,e->length);
/* De-queue the passengers who were aboard.
One round of marking, and then one round of culling from the queue. */
int j,q;
/* Mark frames that can be dequeued */
for(j=0;j<frame_pax;j++)
{
overlay_frame *p=pax[j];
if (0)
DEBUGF("dequeue %p ?%s",p,p->isBroadcast?" (broadcast)":" (unicast)");
if (!p->isBroadcast)
{
if (0) DEBUG("yes");
p->dequeue=1;
}
else {
int i;
int workLeft=0;
for(i=0;i<OVERLAY_MAX_INTERFACES;i++)
{
if (overlay_interfaces[i].observed>0)
if (!p->broadcast_sent_via[i])
{
workLeft=1;
break;
}
}
if (!workLeft) p->dequeue=1;
}
}
/* Visit queues and dequeue all that we can */
for(q=0;q<OQ_MAX;q++)
{
overlay_frame **p=&overlay_tx[q].first;
overlay_frame *t;
while(p&&(*p))
{
if ((*p)->dequeue) {
{
if (debug&DEBUG_QUEUES)
DEBUGF("dequeuing %s* -> %s* NOW (queue length=%d)",
alloca_tohex((*p)->source, 7),
alloca_tohex((*p)->destination, 7),
overlay_tx[q].length);
t=*p;
*p=t->next;
if (overlay_tx[q].last==t) overlay_tx[q].last=t->prev;
if (overlay_tx[q].first==t) overlay_tx[q].first=t->next;
if (t->prev) t->prev->next=t->next;
if (t->next) t->next->prev=t->prev;
if (debug&DEBUG_QUEUES)
{
DEBUGF("** dequeued pax @ %p",t);
overlay_queue_dump(&overlay_tx[q]);
}
if (op_free(t)) {
overlay_queue_dump(&overlay_tx[q]);
WHY("op_free() failed");
if (debug&DEBUG_QUEUES) exit(WHY("Queue structures corrupt"));
}
overlay_tx[q].length--;
}
} else {
/* only skip ahead if we haven't dequeued something */
if (!(*p)) break;
p=&(*p)->next;
}
}
}
if (e) ob_free(e); e=NULL;
return 0;
}
else {
if (e) ob_free(e); e=NULL;
return WHY("overlay_broadcast_ensemble() failed");
}
overlay_rhizome_add_advertisements(i,packet.buffer);
/* Stuff more payloads from queues and send it */
overlay_fill_send_packet(&packet, now);
RETURN(0);
}
long long parse_quantity(char *q)

View File

@ -99,10 +99,8 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
int ofs;
overlay_frame f;
f.payload=NULL;
f.bytes=NULL;
f.bytecount=0;
f.prev=NULL; f.next=NULL;
bzero(&f,sizeof(overlay_frame));
if (recvaddr->sa_family==AF_INET)
f.recvaddr=recvaddr;
else {
@ -139,25 +137,15 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
f.modifiers=0;
ofs+=3;
break;
case OF_TYPE_EXTENDED12:
/* Eat the next byte and then skip over this reserved frame type */
f.type=OF_TYPE_FLAG_E12|(packet[ofs]&OF_MODIFIER_BITS)|(packet[ofs+1]<<4);
f.modifiers=0;
ofs+=2;
break;
case OF_TYPE_NODEANNOUNCE:
case OF_TYPE_IDENTITYENQUIRY:
case OF_TYPE_RESERVED_09:
case OF_TYPE_RESERVED_0a:
case OF_TYPE_RESERVED_0b:
case OF_TYPE_RESERVED_0c:
case OF_TYPE_RESERVED_0d:
case OF_TYPE_SELFANNOUNCE:
case OF_TYPE_SELFANNOUNCE_ACK:
case OF_TYPE_DATA:
case OF_TYPE_DATA_VOICE:
case OF_TYPE_RHIZOME_ADVERT:
case OF_TYPE_PLEASEEXPLAIN:
default:
/* No extra bytes to deal with here */
ofs++;
break;
@ -194,7 +182,7 @@ int packetOkOverlay(struct overlay_interface *interface,unsigned char *packet, s
f.bytecount=f.rfs-(offset-ofs);
if (f.bytecount<0) {
f.bytecount=0;
if (debug&DEBUG_PACKETFORMATS) DEBUGF("f.rfs=%d, offset=%d, ofs=%d", f.rfs, offset, ofs);
if (debug&DEBUG_PACKETFORMATS) DEBUGF("f.rfs=%02x, offset=%02x, ofs=%02x", f.rfs, offset, ofs);
return WHY("negative residual byte count after extracting addresses from frame header");
}

View File

@ -66,19 +66,19 @@ int overlay_frame_package_fmt1(overlay_frame *p,overlay_buffer *b)
Will pick a next hop if one has not been chosen.
*/
int nexthoplen=0;
overlay_buffer *headers=ob_new(256);
int i;
overlay_buffer *headers;
headers=ob_new(256);
if (!headers) return WHY("could not allocate overlay buffer for headers");
if (!p) return WHY("p is NULL");
if (!b) return WHY("b is NULL");
ob_checkpoint(b);
if (debug&DEBUG_PACKETCONSTRUCTION)
dump_payload(p,"package_fmt1 stuffing into packet");
/* Build header */
int fail=0;
if (p->nexthop_address_status!=OA_RESOLVED) {
if (0) WHYF("next hop is NOT resolved for packet to %s",
@ -86,16 +86,18 @@ int overlay_frame_package_fmt1(overlay_frame *p,overlay_buffer *b)
if (overlay_address_is_broadcast(p->destination)) {
/* Broadcast frames are broadcast rather than unicast to next hop.
Just check if the broadcast frame should be dropped first. */
if (overlay_broadcast_drop_check(p->destination))
return WHY("This broadcast packet ID has been seen recently");
int i;
if (overlay_broadcast_drop_check(p->destination)){
WHY("This broadcast packet ID has been seen recently");
goto cleanup;
}
/* Copy the broadcast address exactly so that we preserve the BPI */
for(i=0;i<SID_SIZE;i++) p->nexthop[i]=p->destination[i];
p->nexthop_address_status=OA_RESOLVED;
} else {
if (overlay_get_nexthop((unsigned char *)p->destination,p->nexthop,&nexthoplen,&p->nexthop_interface)) {
fail++;
return WHY("could not determine next hop address for payload");
if (overlay_get_nexthop((unsigned char *)p->destination,p->nexthop,&p->nexthop_interface)) {
WHY("could not determine next hop address for payload");
goto cleanup;
}
else p->nexthop_address_status=OA_RESOLVED;
}
@ -107,27 +109,29 @@ int overlay_frame_package_fmt1(overlay_frame *p,overlay_buffer *b)
if (p->source[0]<0x10) {
// Make sure that addresses do not overload the special address spaces of 0x00*-0x0f*
fail++;
return WHY("packet source address begins with reserved value 0x00-0x0f");
WHY("packet source address begins with reserved value 0x00-0x0f");
goto cleanup;
}
if (p->destination[0]<0x10) {
// Make sure that addresses do not overload the special address spaces of 0x00*-0x0f*
fail++;
return WHY("packet destination address begins with reserved value 0x00-0x0f");
WHY("packet destination address begins with reserved value 0x00-0x0f");
goto cleanup;
}
if (p->nexthop[0]<0x10) {
// Make sure that addresses do not overload the special address spaces of 0x00*-0x0f*
fail++;
return WHY("packet nexthop address begins with reserved value 0x00-0x0f");
WHY("packet nexthop address begins with reserved value 0x00-0x0f");
goto cleanup;
}
/* Write fields into binary structure in correct order */
/* Write out type field byte(s) */
if (!fail) if (op_append_type(headers,p)) fail++;
if (op_append_type(headers,p))
goto cleanup;
/* Write out TTL */
if (!fail) if (ob_append_byte(headers,p->ttl)) fail++;
if (ob_append_byte(headers,p->ttl))
goto cleanup;
/* Length. This is the fun part, because we cannot calculate how many bytes we need until
we have abbreviated the addresses, and the length encoding we use varies according to the
@ -135,55 +139,52 @@ int overlay_frame_package_fmt1(overlay_frame *p,overlay_buffer *b)
we rely on context for abbreviating the addresses. So we write it initially and then patch it
after.
*/
if (!fail) {
int max_len=((SID_SIZE+3)*3+headers->length+p->payload->length);
if (debug&DEBUG_PACKETCONSTRUCTION)
fprintf(stderr,"Appending RFS for max_len=%d\n",max_len);
ob_append_rfs(headers,max_len);
int addrs_start=headers->length;
/* Write out addresses as abbreviated as possible */
overlay_abbreviate_append_address(headers,p->nexthop);
overlay_abbreviate_set_most_recent_address(p->nexthop);
overlay_abbreviate_append_address(headers,p->destination);
overlay_abbreviate_set_most_recent_address(p->destination);
overlay_abbreviate_append_address(headers,p->source);
overlay_abbreviate_set_most_recent_address(p->source);
int addrs_len=headers->length-addrs_start;
int actual_len=addrs_len+p->payload->length;
if (debug&DEBUG_PACKETCONSTRUCTION)
fprintf(stderr,"Patching RFS for actual_len=%d\n",actual_len);
ob_patch_rfs(headers,actual_len);
}
if (fail) {
ob_free(headers);
return WHY("failure count was non-zero");
}
int max_len=((SID_SIZE+3)*3+headers->length+p->payload->length);
if (debug&DEBUG_PACKETCONSTRUCTION)
DEBUGF("Appending RFS for max_len=%d\n",max_len);
ob_append_rfs(headers,max_len);
int addrs_start=headers->length;
/* Write out addresses as abbreviated as possible */
overlay_abbreviate_append_address(headers,p->nexthop);
overlay_abbreviate_set_most_recent_address(p->nexthop);
overlay_abbreviate_append_address(headers,p->destination);
overlay_abbreviate_set_most_recent_address(p->destination);
overlay_abbreviate_append_address(headers,p->source);
overlay_abbreviate_set_most_recent_address(p->source);
int addrs_len=headers->length-addrs_start;
int actual_len=addrs_len+p->payload->length;
if (debug&DEBUG_PACKETCONSTRUCTION)
DEBUGF("Patching RFS for actual_len=%d\n",actual_len);
ob_patch_rfs(headers,actual_len);
/* Write payload format plus total length of header bits */
if (ob_makespace(b,2+headers->length+p->payload->length)) {
/* Not enough space free in output buffer */
ob_free(headers);
if (debug&DEBUG_PACKETFORMATS)
WHY("Could not make enough space free in output buffer");
return -1;
DEBUGF("Could not make enough space free in output buffer");
goto cleanup;
}
/* Package up headers and payload */
ob_checkpoint(b);
if (ob_append_bytes(b,headers->bytes,headers->length))
{ fail++; WHY("could not append header"); }
if (ob_append_bytes(b,p->payload->bytes,p->payload->length))
{ fail++; WHY("could not append payload"); }
if (ob_append_bytes(b,headers->bytes,headers->length)) {
WHY("could not append header");
goto cleanup;
}
if (ob_append_bytes(b,p->payload->bytes,p->payload->length)) {
WHY("could not append payload");
goto cleanup;
}
/* XXX SIGN &/or ENCRYPT */
ob_free(headers);
return 0;
if (fail) { ob_rewind(b); return WHY("failure count was non-zero"); } else return 0;
cleanup:
ob_free(headers);
ob_rewind(b);
return -1;
}
overlay_buffer *overlay_payload_unpackage(overlay_frame *b) {
@ -248,64 +249,6 @@ int overlay_payload_enqueue(int q,overlay_frame *p,int forceBroadcastP)
WHYF("Enqueuing packet for %s* (q[%d]length = %d)",
alloca_tohex(p->destination, 7),
q,overlay_tx[q].length);
if (q==OQ_ISOCHRONOUS_VOICE&&(!forceBroadcastP)) {
/* Dispatch voice data immediately.
Also tell Rhizome to back off a bit, so that voice traffic
can get through. */
int interface=-1;
int nexthoplen=SID_SIZE;
int broadcast=overlay_address_is_broadcast(p->destination);
rhizome_saw_voice_traffic();
overlay_abbreviate_clear_most_recent_address();
if (broadcast) {
bcopy(p->destination,p->nexthop,SID_SIZE);
interface=0;
} else {
if (overlay_get_nexthop(p->destination,p->nexthop,&nexthoplen,
&interface)) {
// (we don't need another log message here)
return -1;
}
}
overlay_buffer *b=ob_new(overlay_interfaces[interface].mtu);
unsigned char bytes[]={/* Magic */ 'O',0x10,
/* Version */ 0x00,0x01};
if (ob_append_bytes(b,bytes,4)) {
ob_free(b);
return WHY("ob_append_bytes() refused to append magic bytes.");
}
if (overlay_frame_package_fmt1(p,b)) {
ob_free(b);
return WHY("could not package voice frame for immediate dispatch");
}
if (debug&DEBUG_OVERLAYINTERFACES)
WHYF("Sending %d byte voice packet",b->length);
nextinterface:
if (overlay_broadcast_ensemble(interface,NULL,b->bytes,b->length) != -1)
{
if (debug&DEBUG_OVERLAYINTERFACES)
WHYF("Voice frame sent on interface #%d (%d bytes)",
interface,b->length);
ob_free(b);
return 0;
} else {
WHYF("Failed to send voice frame on interface #%d (%d bytes)",
interface,b->length);
ob_free(b);
return -1;
}
if (broadcast) {
interface++;
if (interface<overlay_interface_count) goto nextinterface;
}
}
if (q<0||q>=OQ_MAX) return WHY("Invalid queue specified");
if (!p) return WHY("Cannot queue NULL");
@ -336,9 +279,14 @@ int overlay_payload_enqueue(int q,overlay_frame *p,int forceBroadcastP)
overlay_tx[q].last=p;
if (!overlay_tx[q].first) overlay_tx[q].first=p;
overlay_tx[q].length++;
if (0) dump_queue("after",q);
if (q==OQ_ISOCHRONOUS_VOICE&&(!forceBroadcastP)) {
// Send a packet now
overlay_send_packet();
}
return 0;
}

View File

@ -363,8 +363,7 @@ int overlay_route_init(int mb_ram)
nodes that are only indirectly connected. Indeed, the two are somewhat interconnected as
an indirect route may be required to get a self-announce ack back to the sender.
*/
int overlay_get_nexthop(unsigned char *d,unsigned char *nexthop,int *nexthoplen,
int *interface)
int overlay_get_nexthop(unsigned char *d,unsigned char *nexthop,int *interface)
{
int i;
@ -376,32 +375,29 @@ int overlay_get_nexthop(unsigned char *d,unsigned char *nexthop,int *nexthoplen,
if (!overlay_neighbours) return WHY("I have no neighbours");
overlay_neighbour *neh=overlay_route_get_neighbour_structure(d,SID_SIZE,
overlay_neighbour *direct_neighbour=overlay_route_get_neighbour_structure(d,SID_SIZE,
0 /* don't create if
missing */);
if (neh) {
if (direct_neighbour) {
/* Is a direct neighbour.
So in the absence of any better indirect route, we pick the interface that
we can hear this neighbour on the most reliably, and then send the frame
via that interface and directly addressed to the recipient. */
bcopy(d,nexthop,SID_SIZE);
(*nexthoplen)=SID_SIZE;
*interface=0;
for(i=1;i<OVERLAY_MAX_INTERFACES;i++) {
if (neh->scores[i]>neh->scores[*interface]) *interface=i;
}
if (neh->scores[*interface]<1) {
if (1||debug&DEBUG_OVERLAYROUTING)
DEBUGF("No open path to %s, neighbour score <=0",alloca_tohex_sid(neh->node->sid));
return -1;
if (direct_neighbour->scores[i]>direct_neighbour->scores[*interface]) *interface=i;
}
if (direct_neighbour->scores[*interface]>0) {
if (0) DEBUGF("nexthop is %s",alloca_tohex_sid(nexthop));
return 0;
} else {
}
// otherwise fall through
}
{
/* Is not a direct neighbour.
XXX - Very simplistic for now. */
overlay_node *n=overlay_route_find_node(d,SID_SIZE,0 /* don't create if missing */ );
@ -411,13 +407,13 @@ int overlay_get_nexthop(unsigned char *d,unsigned char *nexthop,int *nexthoplen,
int best_o=-1;
for(o=0;o<OVERLAY_MAX_OBSERVATIONS;o++) {
int score=n->observations[o].observed_score;
overlay_neighbour *neh
overlay_neighbour *neighbour
=overlay_route_get_neighbour_structure
(n->observations[o].sender_prefix,OVERLAY_SENDER_PREFIX_LENGTH,0);
if (neh) {
if (neighbour && neighbour!=direct_neighbour) {
for(i=1;i<OVERLAY_MAX_INTERFACES;i++) {
if (neh->scores[i]*score>best_score) {
bcopy(&neh->node->sid[0],&nexthop[0],SID_SIZE);
if (neighbour->scores[i]*score>best_score) {
bcopy(&neighbour->node->sid[0],&nexthop[0],SID_SIZE);
*interface=i;
best_o=o;
best_score=score;
@ -570,15 +566,13 @@ int overlay_route_ack_selfannounce(overlay_frame *f,
out->nexthop_address_status=OA_UNINITIALISED;
{
unsigned char nexthop[SID_SIZE];
int len=0;
int next_hop_interface=-1;
int r=overlay_get_nexthop(out->destination,nexthop,&len,
&next_hop_interface);
int r=overlay_get_nexthop(out->destination,nexthop,&next_hop_interface);
if (r) {
/* no open path, so convert to broadcast */
int i;
for(i=0;i<(SID_SIZE-8);i++) out->nexthop[i]=0xff;
for(i=24;i<SID_SIZE;i++) out->nexthop[i]=random()&0xff;
for(i=(SID_SIZE-8);i<SID_SIZE;i++) out->nexthop[i]=random()&0xff;
out->nexthop_address_status=OA_RESOLVED;
out->ttl=2;
out->isBroadcast=1;

View File

@ -433,7 +433,6 @@ typedef struct overlay_frame {
unsigned int modifiers;
unsigned char ttl;
unsigned char dequeue;
/* Mark which interfaces the frame has been sent on,
so that we can ensure that broadcast frames get sent
@ -860,12 +859,12 @@ int overlay_rx_messages();
int overlay_add_selfannouncement();
int overlay_frame_package_fmt1(overlay_frame *p,overlay_buffer *b);
int overlay_interface_args(const char *arg);
int overlay_get_nexthop(unsigned char *d,unsigned char *nexthop,int *nexthoplen,
int *interface);
int overlay_get_nexthop(unsigned char *d,unsigned char *nexthop,int *interface);
int overlay_sendto(struct sockaddr_in *recipientaddr,unsigned char *bytes,int len);
int overlay_rhizome_add_advertisements(int interface_number,overlay_buffer *e);
int overlay_add_local_identity(unsigned char *s);
int overlay_address_is_local(unsigned char *s);
void overlay_send_packet();
extern int overlay_interface_count;