mirror of
https://github.com/servalproject/serval-dna.git
synced 2025-01-29 15:43:56 +00:00
Throttle serial radio devices when tx buffer is in use
This commit is contained in:
parent
0de64d0363
commit
84ad4debfa
@ -684,7 +684,7 @@ overlay_broadcast_ensemble(int interface_number,
|
||||
}
|
||||
|
||||
if (interface->type==OVERLAY_INTERFACE_PACKETRADIO) {
|
||||
return overlay_packetradio_tx_packet(interface_number,recipientaddr,bytes,len);
|
||||
return overlay_packetradio_tx_packet(interface,recipientaddr,bytes,len);
|
||||
} else if (interface->fileP)
|
||||
{
|
||||
|
||||
|
@ -77,125 +77,123 @@ int overlay_rx_packet_append_byte(overlay_interface *interface,unsigned char byt
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void write_buffer(overlay_interface *interface){
|
||||
if (interface->tx_bytes_pending>0) {
|
||||
int written=write(interface->alarm.poll.fd,interface->txbuffer,
|
||||
interface->tx_bytes_pending);
|
||||
if (config.debug.packetradio) DEBUGF("Trying to write %d bytes",
|
||||
interface->tx_bytes_pending);
|
||||
if (written>0) {
|
||||
interface->tx_bytes_pending-=written;
|
||||
bcopy(&interface->txbuffer[written],&interface->txbuffer[0],
|
||||
interface->tx_bytes_pending);
|
||||
if (config.debug.packetradio) DEBUGF("Wrote %d bytes (%d left pending)",
|
||||
written,interface->tx_bytes_pending);
|
||||
} else {
|
||||
if (config.debug.packetradio) DEBUGF("Failed to write any data");
|
||||
}
|
||||
}
|
||||
|
||||
if (interface->tx_bytes_pending>0) {
|
||||
// more to write, so keep POLLOUT flag
|
||||
interface->alarm.poll.events|=POLLOUT;
|
||||
} else {
|
||||
// nothing more to write, so clear POLLOUT flag
|
||||
interface->alarm.poll.events&=~POLLOUT;
|
||||
// try to empty another packet from the queue ASAP
|
||||
overlay_queue_schedule_next(gettime_ms());
|
||||
}
|
||||
watch(&interface->alarm);
|
||||
}
|
||||
|
||||
void overlay_packetradio_poll(struct sched_ent *alarm)
|
||||
{
|
||||
overlay_interface *interface = (overlay_interface *)alarm;
|
||||
|
||||
{
|
||||
if (interface->tx_bytes_pending>0) {
|
||||
int written=write(alarm->poll.fd,interface->txbuffer,
|
||||
interface->tx_bytes_pending);
|
||||
if (config.debug.packetradio) DEBUGF("Trying to write %d bytes",
|
||||
interface->tx_bytes_pending);
|
||||
if (written>0) {
|
||||
interface->tx_bytes_pending-=written;
|
||||
bcopy(&interface->txbuffer[written],&interface->txbuffer[0],
|
||||
interface->tx_bytes_pending);
|
||||
if (config.debug.packetradio) DEBUGF("Wrote %d bytes (%d left pending)",
|
||||
written,interface->tx_bytes_pending);
|
||||
} else {
|
||||
if (config.debug.packetradio) DEBUGF("Failed to write any data");
|
||||
}
|
||||
alarm->poll.revents&=~POLLOUT;
|
||||
watch(alarm);
|
||||
if (interface->tx_bytes_pending>0) {
|
||||
// more to write, so keep POLLOUT flag
|
||||
} else {
|
||||
// nothing more to write, so clear POLLOUT flag
|
||||
alarm->poll.events&=~POLLOUT;
|
||||
watch(alarm);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
time_ms_t now = gettime_ms();
|
||||
|
||||
if (alarm->poll.revents==0){
|
||||
|
||||
if (interface->state==INTERFACE_STATE_UP && interface->tick_ms>0){
|
||||
if (interface->state==INTERFACE_STATE_UP &&
|
||||
(interface->last_tick_ms==-1 || interface->last_tick_ms + interface->tick_ms<now)){
|
||||
// tick the interface
|
||||
time_ms_t now = gettime_ms();
|
||||
overlay_route_queue_advertisements(interface);
|
||||
alarm->alarm=now+interface->tick_ms;
|
||||
alarm->deadline=alarm->alarm+interface->tick_ms/2;
|
||||
unschedule(alarm);
|
||||
schedule(alarm);
|
||||
interface->last_tick_ms=now;
|
||||
}
|
||||
|
||||
alarm->alarm=interface->last_tick_ms + interface->tick_ms;
|
||||
alarm->deadline=alarm->alarm + interface->tick_ms/2;
|
||||
unschedule(alarm);
|
||||
schedule(alarm);
|
||||
return;
|
||||
}
|
||||
|
||||
if (alarm->poll.revents&POLLOUT){
|
||||
write_buffer(interface);
|
||||
}
|
||||
|
||||
// Read data from the serial port
|
||||
// We will almost certainly support more than one type of packet radio
|
||||
// so lets parameterise this.
|
||||
switch(1) {
|
||||
case 1:
|
||||
{
|
||||
unsigned char buffer[OVERLAY_INTERFACE_RX_BUFFER_SIZE];
|
||||
ssize_t nread = read(alarm->poll.fd, buffer, OVERLAY_INTERFACE_RX_BUFFER_SIZE);
|
||||
if (nread == -1){
|
||||
// WHY_perror("read");
|
||||
return;
|
||||
}
|
||||
if (nread>0) {
|
||||
/*
|
||||
Examine received bytes for end of packet marker.
|
||||
The challenge is that we need to make sure that the packet encapsulation
|
||||
is self-synchronising in the event that a data error occurs (including
|
||||
failure to receive an arbitrary number of bytes).
|
||||
For now we will reuse the functional but sub-optimal method described in
|
||||
RFC1055 for SLIP.
|
||||
*/
|
||||
int i;
|
||||
for(i=0;i<nread;i++)
|
||||
{
|
||||
switch (interface->decoder_state) {
|
||||
case DC_ESC:
|
||||
// escaped character
|
||||
interface->decoder_state=DC_NORMAL;
|
||||
switch(buffer[i]) {
|
||||
case SLIP_ESC_END: // escaped END byte
|
||||
overlay_rx_packet_append_byte(interface,SLIP_END);
|
||||
break;
|
||||
case SLIP_ESC_ESC: // escaped escape character
|
||||
overlay_rx_packet_append_byte(interface,SLIP_ESC);
|
||||
break;
|
||||
default: /* Unknown escape character. This is an error. */
|
||||
if (config.debug.packetradio)
|
||||
WARNF("Packet radio stream contained illegal escaped byte 0x%02x -- ignoring packet.",buffer[i]);
|
||||
// interface->recv_offset=0;
|
||||
break;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
// non-escape character
|
||||
switch(buffer[i]) {
|
||||
case SLIP_ESC:
|
||||
interface->decoder_state=DC_ESC;
|
||||
break;
|
||||
case SLIP_END:
|
||||
overlay_rx_packet_complete(interface);
|
||||
break;
|
||||
default:
|
||||
overlay_rx_packet_append_byte(interface,buffer[i]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
if (alarm->poll.revents&POLLIN){
|
||||
unsigned char buffer[OVERLAY_INTERFACE_RX_BUFFER_SIZE];
|
||||
ssize_t nread = read(alarm->poll.fd, buffer, OVERLAY_INTERFACE_RX_BUFFER_SIZE);
|
||||
if (nread == -1){
|
||||
// WHY_perror("read");
|
||||
return;
|
||||
}
|
||||
if (nread>0) {
|
||||
/*
|
||||
Examine received bytes for end of packet marker.
|
||||
The challenge is that we need to make sure that the packet encapsulation
|
||||
is self-synchronising in the event that a data error occurs (including
|
||||
failure to receive an arbitrary number of bytes).
|
||||
For now we will reuse the functional but sub-optimal method described in
|
||||
RFC1055 for SLIP.
|
||||
*/
|
||||
int i;
|
||||
for(i=0;i<nread;i++)
|
||||
{
|
||||
switch (interface->decoder_state) {
|
||||
case DC_ESC:
|
||||
// escaped character
|
||||
interface->decoder_state=DC_NORMAL;
|
||||
switch(buffer[i]) {
|
||||
case SLIP_ESC_END: // escaped END byte
|
||||
overlay_rx_packet_append_byte(interface,SLIP_END);
|
||||
break;
|
||||
case SLIP_ESC_ESC: // escaped escape character
|
||||
overlay_rx_packet_append_byte(interface,SLIP_ESC);
|
||||
break;
|
||||
default: /* Unknown escape character. This is an error. */
|
||||
if (config.debug.packetradio)
|
||||
WARNF("Packet radio stream contained illegal escaped byte 0x%02x -- ignoring packet.",buffer[i]);
|
||||
// interface->recv_offset=0;
|
||||
break;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
// non-escape character
|
||||
switch(buffer[i]) {
|
||||
case SLIP_ESC:
|
||||
interface->decoder_state=DC_ESC;
|
||||
break;
|
||||
case SLIP_END:
|
||||
overlay_rx_packet_complete(interface);
|
||||
break;
|
||||
default:
|
||||
overlay_rx_packet_append_byte(interface,buffer[i]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
watch(alarm);
|
||||
|
||||
return ;
|
||||
}
|
||||
|
||||
int overlay_packetradio_tx_packet(int interface_number,
|
||||
int overlay_packetradio_tx_packet(overlay_interface *interface,
|
||||
struct sockaddr_in *recipientaddr,
|
||||
unsigned char *bytes,int len)
|
||||
{
|
||||
if (config.debug.packetradio) DEBUGF("Sending packet of %d bytes",len);
|
||||
|
||||
/*
|
||||
This is a bit interesting, because we have to deal with RTS/CTS potentially
|
||||
blocking our writing of the packet.
|
||||
@ -206,19 +204,14 @@ int overlay_packetradio_tx_packet(int interface_number,
|
||||
deal with such truncation in a fairly sane manner.
|
||||
*/
|
||||
|
||||
if (len>OVERLAY_INTERFACE_RX_BUFFER_SIZE) {
|
||||
if (config.debug.packetradio) WHYF("Not sending over-size packet");
|
||||
return -1;
|
||||
}
|
||||
if (overlay_interfaces[interface_number].tx_bytes_pending) {
|
||||
if (config.debug.packetradio)
|
||||
WHYF("Dropping packet due to congestion");
|
||||
return -1;
|
||||
}
|
||||
if (len>OVERLAY_INTERFACE_RX_BUFFER_SIZE)
|
||||
return WHYF("Not sending over-size packet");
|
||||
if (interface->tx_bytes_pending>0)
|
||||
return WHYF("Cannot send two packets at the same time");
|
||||
|
||||
/* Encode packet with SLIP escaping.
|
||||
XXX - Add error correction here also */
|
||||
char buffer[len*2+4];
|
||||
unsigned char *buffer = interface->txbuffer;
|
||||
int out_len=0;
|
||||
int i;
|
||||
|
||||
@ -242,24 +235,9 @@ int overlay_packetradio_tx_packet(int interface_number,
|
||||
|
||||
if (config.debug.packetradio) DEBUGF("Encoded length is %d",out_len);
|
||||
|
||||
int written=write(overlay_interfaces[interface_number].alarm.poll.fd,
|
||||
buffer,out_len);
|
||||
|
||||
if (config.debug.packetradio)
|
||||
DEBUGF("Wrote %d of %d bytes",written,out_len);
|
||||
interface->tx_bytes_pending=out_len;
|
||||
write_buffer(interface);
|
||||
|
||||
if (written<out_len) {
|
||||
int deferred=out_len-written;
|
||||
if (deferred>OVERLAY_INTERFACE_TX_BUFFER_SIZE)
|
||||
deferred=OVERLAY_INTERFACE_TX_BUFFER_SIZE;
|
||||
bcopy(&buffer[written],overlay_interfaces[interface_number].txbuffer,deferred);
|
||||
overlay_interfaces[interface_number].tx_bytes_pending=deferred;
|
||||
overlay_interfaces[interface_number].alarm.poll.events|=POLLOUT;
|
||||
watch(&overlay_interfaces[interface_number].alarm);
|
||||
if (config.debug.packetradio)
|
||||
DEBUGF("Buffered %d bytes for transmission",deferred);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -249,6 +249,23 @@ overlay_init_packet(struct outgoing_packet *packet, struct subscriber *destinati
|
||||
packet->header_length = ob_position(packet->buffer);
|
||||
}
|
||||
|
||||
int overlay_queue_schedule_next(time_ms_t next_allowed_packet){
|
||||
if (next_packet.alarm==0 || next_allowed_packet < next_packet.alarm){
|
||||
|
||||
if (!next_packet.function){
|
||||
next_packet.function=overlay_send_packet;
|
||||
send_packet.name="overlay_send_packet";
|
||||
next_packet.stats=&send_packet;
|
||||
}
|
||||
unschedule(&next_packet);
|
||||
next_packet.alarm=next_allowed_packet;
|
||||
// small grace period, we want to read incoming IO first
|
||||
next_packet.deadline=next_allowed_packet+15;
|
||||
schedule(&next_packet);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// update the alarm time and return 1 if changed
|
||||
static int
|
||||
overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){
|
||||
@ -269,6 +286,9 @@ overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){
|
||||
|
||||
time_ms_t next_allowed_packet=0;
|
||||
if (frame->interface){
|
||||
// don't include interfaces which are currently transmitting using a serial buffer
|
||||
if (frame->interface->tx_bytes_pending>0)
|
||||
return 0;
|
||||
next_allowed_packet = limit_next_allowed(&frame->interface->transfer_limit);
|
||||
}else{
|
||||
// check all interfaces
|
||||
@ -285,18 +305,7 @@ overlay_calc_queue_time(overlay_txqueue *queue, struct overlay_frame *frame){
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (next_packet.alarm==0 || next_allowed_packet < next_packet.alarm){
|
||||
if (!next_packet.function){
|
||||
next_packet.function=overlay_send_packet;
|
||||
send_packet.name="overlay_send_packet";
|
||||
next_packet.stats=&send_packet;
|
||||
}
|
||||
unschedule(&next_packet);
|
||||
next_packet.alarm=next_allowed_packet;
|
||||
// small grace period, we want to read incoming IO first
|
||||
next_packet.deadline=next_allowed_packet+15;
|
||||
schedule(&next_packet);
|
||||
}
|
||||
overlay_queue_schedule_next(next_allowed_packet);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -410,6 +419,10 @@ overlay_stuff_packet(struct outgoing_packet *packet, overlay_txqueue *queue, tim
|
||||
}
|
||||
|
||||
if (!packet->buffer){
|
||||
// don't transmit on an interface that uses the serial tx buffer
|
||||
if (frame->interface->tx_bytes_pending>0)
|
||||
goto skip;
|
||||
|
||||
// can we send a packet on this interface now?
|
||||
if (limit_is_allowed(&frame->interface->transfer_limit))
|
||||
goto skip;
|
||||
|
@ -213,8 +213,6 @@ int rhizome_direct_continue_sync_request(rhizome_direct_sync_request *r)
|
||||
int count=rhizome_direct_bundle_iterator_fill(r->cursor,-1);
|
||||
|
||||
DEBUGF("Got %d BARs",count);
|
||||
dump("BARs",r->cursor->buffer,
|
||||
r->cursor->buffer_used+r->cursor->buffer_offset_bytes);
|
||||
|
||||
r->dispatch_function(r);
|
||||
|
||||
|
@ -829,7 +829,6 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
|
||||
goto end;
|
||||
}
|
||||
close(sock);
|
||||
dump("response", actionlist, content_length);
|
||||
|
||||
/* We now have the list of (1+RHIZOME_BAR_PREFIX_BYTES)-byte records that indicate
|
||||
the list of BAR prefixes that differ between the two nodes. We can now action
|
||||
@ -846,9 +845,6 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
|
||||
for(i=10;i<content_length;i+=(1+RHIZOME_BAR_PREFIX_BYTES))
|
||||
{
|
||||
int type=actionlist[i];
|
||||
unsigned long long
|
||||
bid_prefix_ll=rhizome_bar_bidprefix_ll((unsigned char *)&actionlist[i+1]);
|
||||
DEBUGF("%s %016llx* @ 0x%x",type==1?"push":"pull",bid_prefix_ll,i);
|
||||
if (type==2&&r->pullP) {
|
||||
/* Need to fetch manifest. Once we have the manifest, then we can
|
||||
use our normal bundle fetch routines from rhizome_fetch.c
|
||||
@ -859,6 +855,7 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
|
||||
Then as noted above, we can use that to pull the file down using
|
||||
existing routines.
|
||||
*/
|
||||
DEBUGF("Fetching manifest %s* @ 0x%x",alloca_tohex(&actionlist[i], 1+RHIZOME_BAR_PREFIX_BYTES),i);
|
||||
if (!rhizome_fetch_request_manifest_by_prefix(&addr,zerosid,&actionlist[i+1],RHIZOME_BAR_PREFIX_BYTES))
|
||||
{
|
||||
/* Fetching the manifest, and then using it to see if we want to
|
||||
@ -1003,7 +1000,6 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
|
||||
if (m) rhizome_manifest_free(m);
|
||||
}
|
||||
next_item:
|
||||
DEBUGF("Next item.");
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -514,7 +514,7 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
|
||||
/* TODO We should stream file straight into the database */
|
||||
slot->start_time=gettime_ms();
|
||||
if (create_rhizome_import_dir() == -1)
|
||||
return -1;
|
||||
return WHY("Unable to create import directory");
|
||||
if (slot->manifest) {
|
||||
slot->file_len=slot->manifest->fileLength;
|
||||
slot->rowid=
|
||||
@ -569,8 +569,10 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
|
||||
if (config.debug.rhizome_rx)
|
||||
DEBUGF("RHIZOME HTTP REQUEST family=%u addr=%s sid=%s port=%u %s",
|
||||
slot->peer_ipandport.sin_family,
|
||||
buf,
|
||||
alloca_tohex_sid(slot->peer_sid),
|
||||
buf, ntohs(slot->peer_ipandport.sin_port), alloca_str_toprint(slot->request)
|
||||
ntohs(slot->peer_ipandport.sin_port),
|
||||
alloca_str_toprint(slot->request)
|
||||
);
|
||||
slot->alarm.poll.fd = sock;
|
||||
/* Watch for activity on the socket */
|
||||
|
3
serval.h
3
serval.h
@ -533,6 +533,7 @@ overlay_node *overlay_route_find_node(const unsigned char *sid,int prefixLen,int
|
||||
int overlayServerMode();
|
||||
int overlay_payload_enqueue(struct overlay_frame *p);
|
||||
int overlay_queue_remaining(int queue);
|
||||
int overlay_queue_schedule_next(time_ms_t next_allowed_packet);
|
||||
int overlay_route_record_link( time_ms_t now, struct subscriber *to,
|
||||
struct subscriber *via,int sender_interface,
|
||||
unsigned int s1,unsigned int s2,int score,int gateways_en_route);
|
||||
@ -758,7 +759,7 @@ int fd_poll();
|
||||
void overlay_interface_discover(struct sched_ent *alarm);
|
||||
void overlay_packetradio_poll(struct sched_ent *alarm);
|
||||
int overlay_packetradio_setup_port(overlay_interface *interface);
|
||||
int overlay_packetradio_tx_packet(int interface_number,
|
||||
int overlay_packetradio_tx_packet(overlay_interface *interface,
|
||||
struct sockaddr_in *recipientaddr,
|
||||
unsigned char *bytes,int len);
|
||||
void overlay_dummy_poll(struct sched_ent *alarm);
|
||||
|
Loading…
x
Reference in New Issue
Block a user