Replace macros with functions

SET_NONBLOCKING(), SET_BLOCKING(), WRITE_STR() are now set_nonblock(),
set_block() and write_str() respectively, all of which log an error before
returning -1.  There are other useful methods: write_all() treats anything less
than all bytes written as an error; write_nonblock() treats EAGAIN and EINTR as
zero bytes written, and a combination: write_all_nonblock().
This commit is contained in:
Andrew Bettison 2012-07-10 16:33:39 +09:30
parent 2758af36ee
commit 86eb482ed9
10 changed files with 167 additions and 89 deletions

View File

@ -18,6 +18,7 @@ SERVALD_SRC_FILES = \
serval-dna/dataformats.c \
serval-dna/dna.c \
serval-dna/log.c \
serval-dna/net.c \
serval-dna/mkdir.c \
serval-dna/strbuf.c \
serval-dna/gateway.c \

View File

@ -7,6 +7,7 @@ SRCS= main.c \
dna.c \
conf.c \
log.c \
net.c \
mkdir.c \
strbuf.c \
dna_identity.c \

View File

@ -171,7 +171,7 @@ dna_helper_enqueue(char *did, unsigned char *requestorSid) {
return WHY("Command to helper is too long");
sigPipeFlag = 0;
WRITE_STR(dna_helper_stdin, buffer);
write_str(dna_helper_stdin, buffer);
if (sigPipeFlag) {
/* Assume broken pipe due to dead helper.

View File

@ -177,12 +177,11 @@ int fd_poll()
if (r>0) {
for(i=0;i<fdcount;i++)
if (fds[i].revents) {
/* Make socket non-blocking */
SET_NONBLOCKING(fds[i].fd);
/* Call the alarm callback with the socket in non-blocking mode */
set_nonblock(fds[i].fd);
call_alarm(fd_callbacks[i], fds[i].revents);
/* Make socket blocking again */
SET_BLOCKING(fds[i].fd);
if (fds[i].fd != -1)
set_block(fds[i].fd);
}
}

View File

@ -106,13 +106,13 @@ int app_monitor_cli(int argc, const char *const *argv, struct command_line_optio
fdcount++;
}
WRITE_STR(fd, "monitor vomp\n");
WRITE_STR(fd, "monitor rhizome\n");
write_str(fd, "monitor vomp\n");
write_str(fd, "monitor rhizome\n");
if (sid!=NULL&&sid[0]) {
char msg[1024];
snprintf(msg,1024,"call %s 5551 5552\n",argv[1]);
WRITE_STR(fd, msg);
write_str(fd, msg);
}
char line[1024];
@ -132,9 +132,9 @@ int app_monitor_cli(int argc, const char *const *argv, struct command_line_optio
if (audev&&audev->poll_fds) fdcount+=audev->poll_fds(&fds[fdcount],128-fdcount);
poll(fds,fdcount,1000);
SET_NONBLOCKING(fd);
set_nonblock(fd);
if (interactiveP)
SET_NONBLOCKING(STDIN_FILENO);
set_nonblock(STDIN_FILENO);
int bytes;
int i;
@ -182,8 +182,9 @@ int app_monitor_cli(int argc, const char *const *argv, struct command_line_optio
audioRecordBufferBytes-=audioRecordBufferOffset;
}
SET_BLOCKING(fd);
SET_BLOCKING(STDIN_FILENO);
set_block(fd);
if (interactiveP)
set_block(STDIN_FILENO);
}
return 0;
@ -228,12 +229,12 @@ int processLine(char *cmd,unsigned char *data,int dataLen)
if (l_state<5&&l_id&&pipeAudio) {
// Take control of audio for this call, and let the java side know
snprintf(msg,1024,"FASTAUDIO:%x:1\n",l_id);
WRITE_STR(fd, msg);
write_str(fd, msg);
}
if (l_state==4&&autoAnswerP) {
// We are ringing, so pickup
sprintf(msg,"pickup %x\n",l_id);
WRITE_STR(fd, msg);
write_str(fd, msg);
}
if (l_state==5) {
if (fast_audio) {
@ -261,7 +262,7 @@ int processLine(char *cmd,unsigned char *data,int dataLen)
"qwertyuiopasdfghjklzxcvbnm123456"
"qwertyuiopasdfghjklzxcvbnm123456"
"qwertyuiopasdfghjklzxcvbnm123456",l_id,counter++);
WRITE_STR(fd, buffer);
write_str(fd, buffer);
printf("< *320:AUDIO:%x:8\\n<320 bytes>\n",l_id);
}
}

View File

@ -265,7 +265,8 @@ static void monitor_new_client(int s) {
uid_t otheruid;
struct monitor_context *c;
SET_NONBLOCKING(s);
if (set_nonblock(s) == -1)
goto error;
#ifdef linux
len = sizeof(ucred);
@ -288,12 +289,12 @@ static void monitor_new_client(int s) {
if (otheruid != getuid()) {
WHYF("monitor.socket client has wrong uid (%d versus %d)", otheruid,getuid());
WRITE_STR(s, "\nCLOSE:Incorrect UID\n");
write_str(s, "\nCLOSE:Incorrect UID\n");
goto error;
}
if (monitor_socket_count >= MAX_MONITOR_SOCKETS
||monitor_socket_count < 0) {
WRITE_STR(s, "\nCLOSE:All sockets busy\n");
write_str(s, "\nCLOSE:All sockets busy\n");
goto error;
}
@ -305,7 +306,7 @@ static void monitor_new_client(int s) {
c->alarm.poll.events=POLLIN;
c->line_length = 0;
c->state = MONITOR_STATE_COMMAND;
WRITE_STR(s,"\nMONITOR:You are talking to servald\n");
write_str(s,"\nMONITOR:You are talking to servald\n");
INFOF("Got %d clients", monitor_socket_count);
watch(&c->alarm);
@ -327,7 +328,7 @@ int monitor_process_command(struct monitor_context *c)
c->line_length=0;
if (strlen(cmd)>MONITOR_LINE_LENGTH) {
WRITE_STR(c->alarm.poll.fd,"\nERROR:Command too long\n");
write_str(c->alarm.poll.fd,"\nERROR:Command too long\n");
RETURN(-1);
}
@ -394,18 +395,18 @@ int monitor_process_command(struct monitor_context *c)
}
}
if (!gotSid)
WRITE_STR(c->alarm.poll.fd,"\nERROR:no known peers, so cannot place call\n");
write_str(c->alarm.poll.fd,"\nERROR:no known peers, so cannot place call\n");
} else {
// pack the binary representation of the sid into the same buffer.
if (stowSid((unsigned char*)sid, 0, sid) == -1)
WRITE_STR(c->alarm.poll.fd,"\nERROR:invalid SID, so cannot place call\n");
write_str(c->alarm.poll.fd,"\nERROR:invalid SID, so cannot place call\n");
else
gotSid = 1;
}
if (gotSid) {
int cn=0, in=0, kp=0;
if (!keyring_next_identity(keyring, &cn, &in, &kp))
WRITE_STR(c->alarm.poll.fd,"\nERROR:no local identity, so cannot place call\n");
write_str(c->alarm.poll.fd,"\nERROR:no local identity, so cannot place call\n");
else {
vomp_dial(keyring->contexts[cn]->identities[in]->keypairs[kp]->public_key, (unsigned char *)sid, localDid, remoteDid);
}
@ -433,7 +434,7 @@ int monitor_process_command(struct monitor_context *c)
int digit=vomp_parse_dtmf_digit(digits[i]);
if (digit<0) {
snprintf(msg,1024,"\nERROR: invalid DTMF digit 0x%02x\n",digit);
WRITE_STR(c->alarm.poll.fd,msg);
write_str(c->alarm.poll.fd,msg);
}
/* 80ms standard tone duration, so that it is a multiple
of the majority of codec time units (70ms is the nominal
@ -445,7 +446,7 @@ int monitor_process_command(struct monitor_context *c)
}
snprintf(msg,1024,"\nMONITORSTATUS:%d\n",c->flags);
WRITE_STR(c->alarm.poll.fd,msg);
write_str(c->alarm.poll.fd,msg);
RETURN(0);
}
@ -464,7 +465,7 @@ int monitor_process_data(struct monitor_context *c)
vomp_call_state *call=vomp_find_call_by_session(c->sample_call_session_token);
if (!call) {
WRITE_STR(c->alarm.poll.fd,"\nERROR:No such call\n");
write_str(c->alarm.poll.fd,"\nERROR:No such call\n");
RETURN(-1);
}
@ -489,24 +490,17 @@ int monitor_announce_bundle(rhizome_manifest *m)
sender,
recipient,
m->dataFileName?m->dataFileName:"");
for(i=monitor_socket_count -1;i>=0;i--)
{
if (!(monitor_sockets[i].flags&MONITOR_RHIZOME))
continue;
errno=0;
SET_NONBLOCKING(monitor_sockets[i].alarm.poll.fd);
WRITE_STR(monitor_sockets[i].alarm.poll.fd,msg);
SET_BLOCKING(monitor_sockets[i].alarm.poll.fd);
if (errno&&(errno!=EINTR)&&(errno!=EAGAIN)) {
/* error sending update, so kill monitor socket */
WHY_perror("write");
for(i=monitor_socket_count -1;i>=0;i--) {
if (monitor_sockets[i].flags & MONITOR_RHIZOME) {
if ( set_nonblock(monitor_sockets[i].alarm.poll.fd) == -1
|| write_str_nonblock(monitor_sockets[i].alarm.poll.fd, msg) == -1
|| set_block(monitor_sockets[i].alarm.poll.fd) == -1
) {
INFO("Tearing down monitor client");
monitor_client_close(&monitor_sockets[i]);
}
}
}
return 0;
}
@ -523,17 +517,13 @@ int monitor_call_status(vomp_call_state *call)
alloca_tohex_sid(call->remote.sid),
call->local.did,call->remote.did);
msg[1023]=0;
for(i=monitor_socket_count -1;i>=0;i--)
{
for(i=monitor_socket_count -1;i>=0;i--) {
if (!(monitor_sockets[i].flags&MONITOR_VOMP))
continue;
errno=0;
SET_NONBLOCKING(monitor_sockets[i].alarm.poll.fd);
WRITE_STR(monitor_sockets[i].alarm.poll.fd,msg);
SET_BLOCKING(monitor_sockets[i].alarm.poll.fd);
if (errno&&(errno!=EINTR)&&(errno!=EAGAIN)) {
/* error sending update, so kill monitor socket */
WHY_perror("write");
if ( set_nonblock(monitor_sockets[i].alarm.poll.fd) == -1
|| write_str_nonblock(monitor_sockets[i].alarm.poll.fd, msg) == -1
|| set_block(monitor_sockets[i].alarm.poll.fd) == -1
) {
INFOF("Tearing down monitor client #%d", i);
monitor_client_close(&monitor_sockets[i]);
}
@ -577,21 +567,17 @@ int monitor_tell_clients(char *msg, int msglen, int mask)
{
int i;
IN();
for(i=monitor_socket_count -1;i>=0;i--)
{
if (!(monitor_sockets[i].flags&mask))
continue;
errno=0;
SET_NONBLOCKING(monitor_sockets[i].alarm.poll.fd);
write(monitor_sockets[i].alarm.poll.fd, msg, msglen);
SET_BLOCKING(monitor_sockets[i].alarm.poll.fd);
// WHYF("Writing AUDIOPACKET to client");
if (errno&&(errno!=EINTR)&&(errno!=EAGAIN)) {
/* error sending update, so kill monitor socket */
WHY_perror("write");
for(i=monitor_socket_count -1;i>=0;i--) {
if (monitor_sockets[i].flags & mask) {
// DEBUG("Writing AUDIOPACKET to client");
if ( set_nonblock(monitor_sockets[i].alarm.poll.fd) == -1
|| write_all_nonblock(monitor_sockets[i].alarm.poll.fd, msg, msglen) == -1
|| set_block(monitor_sockets[i].alarm.poll.fd) == -1
) {
INFOF("Tearing down monitor client #%d", i);
monitor_client_close(&monitor_sockets[i]);
}
}
}
RETURN(0);
}

82
net.c Normal file
View File

@ -0,0 +1,82 @@
/*
Serval Distributed Numbering Architecture (DNA)
Copyright (C) 2012 Serval Project Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either version 2
of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
#include "serval.h"
int set_nonblock(int fd)
{
int flags;
if ((flags = fcntl(fd, F_GETFL, NULL)) == -1)
return WHY_perror("fcntl(F_GETFL)");
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
return WHY_perror("fcntl(F_SETFL)");
return 0;
}
int set_block(int fd)
{
int flags;
if ((flags = fcntl(fd, F_GETFL, NULL)) == -1)
return WHY_perror("fcntl(F_GETFL)");
if (fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == -1)
return WHY_perror("fcntl(F_SETFL)");
return 0;
}
int write_all(int fd, const char *buf, size_t len)
{
ssize_t written = write(fd, buf, len);
if (written == -1)
return WHY_perror("write");
if (written != len)
return WHYF("write(%u bytes) returned %d", len, written);
return written;
}
int write_nonblock(int fd, const char *buf, size_t len)
{
ssize_t written = write(fd, buf, len);
if (written == -1) {
switch (errno) {
case EAGAIN:
case EINTR:
return 0;
}
return WHY_perror("write");
}
return written;
}
int write_all_nonblock(int fd, const char *buf, size_t len)
{
ssize_t written = write_nonblock(fd, buf, len);
if (written != -1 && written != len)
return WHYF("write(%u bytes) returned %d", len, written);
return written;
}
int write_str(int fd, const char *str)
{
return write_all(fd, str, strlen(str));
}
int write_str_nonblock(int fd, const char *str)
{
return write_all_nonblock(fd, str, strlen(str));
}

View File

@ -1155,10 +1155,10 @@ int overlay_mdp_send(overlay_mdp_frame *mdp,int flags,int timeout_ms)
if (!FORM_SERVAL_INSTANCE_PATH(name.sun_path, "mdp.socket"))
return -1;
SET_NONBLOCKING(mdp_client_socket);
set_nonblock(mdp_client_socket);
int result=sendto(mdp_client_socket, mdp, len, 0,
(struct sockaddr *)&name, sizeof(struct sockaddr_un));
SET_BLOCKING(mdp_client_socket);
set_block(mdp_client_socket);
if (result<0) {
mdp->packetTypeAndFlags=MDP_ERROR;
mdp->error.error=1;
@ -1295,9 +1295,9 @@ int overlay_mdp_recv(overlay_mdp_frame *mdp,int *ttl)
mdp->packetTypeAndFlags=0;
/* Check if reply available */
SET_NONBLOCKING(mdp_client_socket);
set_nonblock(mdp_client_socket);
ssize_t len = recvwithttl(mdp_client_socket,(unsigned char *)mdp, sizeof(overlay_mdp_frame),ttl,recvaddr,&recvaddrlen);
SET_BLOCKING(mdp_client_socket);
set_block(mdp_client_socket);
recvaddr_un=(struct sockaddr_un *)recvaddr;
/* Null terminate received address so that the stat() call below can succeed */

View File

@ -635,22 +635,27 @@ int rhizome_queue_manifest_import(rhizome_manifest *m, struct sockaddr_in *peeri
if (peerip)
{
/* Transfer via HTTP over IPv4 */
int sock = socket(AF_INET,SOCK_STREAM,0);
SET_NONBLOCKING(sock);
struct sockaddr_in peeraddr;
bcopy(peerip,&peeraddr,sizeof(peeraddr));
//peeraddr.sin_port=htons(RHIZOME_HTTP_PORT);
DEBUG("Initiating HTTP connection for transfer");
int r=connect(sock,(struct sockaddr*)&peeraddr,sizeof(peeraddr));
if ((errno!=EINPROGRESS)&&(r!=0)) {
WHY("Failed to open socket to peer's rhizome web server");
WHY_perror("connect");
close (sock);
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == -1)
return WHY_perror("socket");
if (set_nonblock(sock) == -1) {
close(sock);
return -1;
}
rhizome_file_fetch_record
*q=&file_fetch_queue[rhizome_file_fetch_queue_count];
INFOF("HTTP CONNECT family=%u port=%u addr=%u.%u.%u.%u",
peerip->sin_family, peerip->sin_port,
((unsigned char*)&peerip->sin_addr.s_addr)[0],
((unsigned char*)&peerip->sin_addr.s_addr)[1],
((unsigned char*)&peerip->sin_addr.s_addr)[2],
((unsigned char*)&peerip->sin_addr.s_addr)[3]
);
if (connect(sock, (struct sockaddr*)peerip, sizeof *peerip) == -1 && errno != EINPROGRESS) {
WHY_perror("connect");
WHY("Failed to open socket to peer's rhizome web server");
close(sock);
return -1;
}
rhizome_file_fetch_record *q=&file_fetch_queue[rhizome_file_fetch_queue_count];
q->manifest = m;
*manifest_kept = 1;
q->alarm.poll.fd=sock;

View File

@ -1560,7 +1560,13 @@ void sigIoHandler(int signal);
#define DEFAULT_MONITOR_SOCKET_NAME "org.servalproject.servald.monitor.socket"
#define DEFAULT_MDP_SOCKET_NAME "org.servalproject.servald.mdp.socket"
#define WRITE_STR(fd, str) write(fd, str, strlen(str))
int set_nonblock(int fd);
int set_block(int fd);
int write_all(int fd, const char *buf, size_t len);
int write_nonblock(int fd, const char *buf, size_t len);
int write_all_nonblock(int fd, const char *buf, size_t len);
int write_str(int fd, const char *str);
int write_str_nonblock(int fd, const char *str);
int rhizome_http_server_start();
int overlay_mdp_setup_sockets();
@ -1598,6 +1604,3 @@ int fd_func_enter(struct call_stats *this_call);
#define IN() static struct profile_total _aggregate_stats={NULL,0,__FUNCTION__,0,0,0}; struct call_stats _this_call; fd_func_enter(&_this_call);
#define OUT() fd_func_exit(&_this_call, &_aggregate_stats);
#define RETURN(X) { OUT() return(X); }
#define SET_NONBLOCKING(X) fcntl(X,F_SETFL,fcntl(X, F_GETFL, NULL)|O_NONBLOCK);
#define SET_BLOCKING(X) fcntl(X,F_SETFL,fcntl(X, F_GETFL, NULL)&(~O_NONBLOCK));