Add support for running and stopping servald from a JVM thread

fdpoll will now run a callback when about to sleep / woke up.
A new Java interface to indicate server started / sleeping / waking up.
An android implementation may allow the CPU to sleep.
If there's a java exception the server will try to shutdown.
Calling servald stop is currently undefined.
This commit is contained in:
Jeremy Lakeman 2015-05-25 11:46:37 +09:30
parent 106f5d04d9
commit 51ed6162dd
22 changed files with 343 additions and 67 deletions

3
cli.h
View File

@ -28,8 +28,9 @@
#ifdef UNUSED
# undef UNUSED
#endif
int Throw(JNIEnv *env, const char *class, const char *msg);
#endif
#include "xprintf.h"
#include "log.h"

View File

@ -129,6 +129,10 @@ int initJniTypes(JNIEnv *env)
IJniResults = (*env)->FindClass(env, "org/servalproject/servaldna/IJniResults");
if (IJniResults==NULL)
return Throw(env, "java/lang/IllegalStateException", "Unable to locate class org.servalproject.servaldna.IJniResults");
// make sure the interface class cannot be garbage collected between invocations in the same process
IJniResults = (jclass)(*env)->NewGlobalRef(env, IJniResults);
if (IJniResults==NULL)
return Throw(env, "java/lang/IllegalStateException", "Unable to create global ref to class org.servalproject.servaldna.IJniResults");
startResultSet = (*env)->GetMethodID(env, IJniResults, "startResultSet", "(I)V");
if (startResultSet==NULL)
return Throw(env, "java/lang/IllegalStateException", "Unable to locate method startResultSet");

View File

@ -40,7 +40,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "keyring.h"
#include "serval.h" // for overlay_send_frame()
struct subscriber *directory_service;
__thread struct subscriber *directory_service;
static void directory_update(struct sched_ent *alarm);

View File

@ -55,13 +55,13 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "strbuf_helpers.h"
#define MAX_WATCHED_FDS 128
struct pollfd fds[MAX_WATCHED_FDS];
int fdcount=0;
struct sched_ent *fd_callbacks[MAX_WATCHED_FDS];
__thread struct pollfd fds[MAX_WATCHED_FDS];
__thread int fdcount=0;
__thread struct sched_ent *fd_callbacks[MAX_WATCHED_FDS];
struct sched_ent *wake_list=NULL;
struct sched_ent *run_soon=NULL;
struct sched_ent *run_now=NULL;
__thread struct sched_ent *wake_list=NULL;
__thread struct sched_ent *run_soon=NULL;
__thread struct sched_ent *run_now=NULL;
struct profile_total poll_stats={NULL,0,"Idle (in poll)",0,0,0,0};
@ -324,7 +324,8 @@ static void call_alarm(struct sched_ent *alarm, int revents)
OUT();
}
int fd_poll()
int fd_poll2(time_ms_t (*waiting)(time_ms_t, time_ms_t, time_ms_t), void (*wokeup)())
{
IN();
@ -338,30 +339,48 @@ int fd_poll()
RETURN(1);
}
time_ms_t ms;
if (run_now){
ms=0;
}else if (wake_list){
ms = (wake_list->wake_at - gettime_ms());
if (ms<0)
ms = 0;
}else if(fdcount==0){
// nothing to do? we need to return instead of waiting forever.
if (!run_now && !wake_list && fdcount==0)
RETURN(0);
}else{
ms =-1;
time_ms_t wait;
if (run_now)
wait = 0;
else {
time_ms_t next_run=TIME_MS_NEVER_WILL;
if(run_soon)
next_run = run_soon->run_after;
time_ms_t next_wake=TIME_MS_NEVER_WILL;
if (wake_list)
next_wake = wake_list->wake_at;
time_ms_t wait_until;
time_ms_t now = gettime_ms();
if (waiting)
wait_until = waiting(now, next_run, next_wake);
else
wait_until = next_wake;
if (wait_until==TIME_MS_NEVER_WILL)
wait = -1;
else if (wait_until < now)
wait = 0;
else
wait = wait_until - now;
}
// check for IO and/or wait for the next wake_at
int r=0;
if (fdcount || ms>0){
if (fdcount || wait>0){
struct call_stats call_stats;
call_stats.totals=&poll_stats;
fd_func_enter(__HERE__, &call_stats);
if (fdcount==0){
sleep_ms(ms);
sleep_ms(wait);
}else{
r = poll(fds, fdcount, ms);
r = poll(fds, fdcount, wait);
if (config.debug.io) {
strbuf b = strbuf_alloca(1024);
int i;
@ -373,17 +392,20 @@ int fd_poll()
strbuf_puts(b, "->");
strbuf_append_poll_events(b, fds[i].revents);
}
DEBUGF("poll(fds=(%s), fdcount=%d, ms=%d) -> %d", strbuf_str(b), fdcount, ms, r);
DEBUGF("poll(fds=(%s), fdcount=%d, ms=%d) -> %d", strbuf_str(b), fdcount, wait, r);
}
}
fd_func_exit(__HERE__, &call_stats);
}
if (wokeup && !run_now)
wokeup();
move_run_list();
// We don't want a single alarm to be able to reschedule itself and starve all IO
// So we only check for new overdue alarms if we attempted to sleep
if (ms && run_now && run_now->run_before <= gettime_ms())
if (wait && run_now && run_now->run_before <= gettime_ms())
RETURN(1);
// process all watched IO handles once (we need to be fair)

View File

@ -147,7 +147,8 @@ int _unwatch(struct __sourceloc, struct sched_ent *alarm);
#define unschedule(alarm) _unschedule(__WHENCE__, alarm)
#define watch(alarm) _watch(__WHENCE__, alarm)
#define unwatch(alarm) _unwatch(__WHENCE__, alarm)
int fd_poll();
int fd_poll2(time_ms_t (*waiting)(time_ms_t, time_ms_t, time_ms_t), void (*wokeup)());
#define fd_poll() fd_poll2(NULL, NULL)
/* function timing routines */
int fd_clearstats();

15
httpd.c
View File

@ -215,13 +215,6 @@ error:
return WHY("Failed to start HTTP server");
success:
server_write_proc_state("http_port", "%d", port);
INFOF("HTTP SERVER START port=%"PRIu16" fd=%d services=RESTful%s%s",
port,
httpd_server_socket,
config.rhizome.http.enable ? ",Rhizome" : "",
config.rhizome.api.addfile.uri_path[0] ? ",RhizomeDirect" : ""
);
httpd_server_port = port;
/* Add Rhizome HTTPd server to list of file descriptors to watch */
server_alarm.function = httpd_server_poll;
@ -229,6 +222,14 @@ success:
server_alarm.poll.fd = httpd_server_socket;
server_alarm.poll.events = POLLIN;
watch(&server_alarm);
INFOF("HTTP SERVER START port=%"PRIu16" fd=%d services=RESTful%s%s",
httpd_server_port,
httpd_server_socket,
config.rhizome.http.enable ? ",Rhizome" : "",
config.rhizome.api.addfile.uri_path[0] ? ",RhizomeDirect" : ""
);
return 0;
}

View File

@ -68,6 +68,8 @@ public class ServalDCommand
*/
private static native int rawCommand(IJniResults results, String[] args);
public static native int server(IJniServer callback, String keyringPin, String[] entryPins);
/**
* Common entry point into servald command line.
*

View File

@ -35,6 +35,17 @@ public class ServerControl {
return loopbackMdpPort;
}
public int getPid() {
return pid;
}
protected void setStatus(String instancePath, int pid, int mdpInetPort, int httpPort){
this.instancePath = instancePath;
this.pid = pid;
this.loopbackMdpPort = mdpInetPort;
this.httpPort = httpPort;
}
private void setStatus(ServalDCommand.Status result){
loopbackMdpPort = result.mdpInetPort;
pid = result.pid;
@ -42,7 +53,7 @@ public class ServerControl {
instancePath = result.instancePath;
}
private void clearStatus(){
protected void clearStatus(){
loopbackMdpPort = 0;
pid = 0;
httpPort = 0;

View File

@ -33,7 +33,7 @@ public abstract class MeshMSException extends Exception
public final URL url;
public MeshMSException(String message, URL url) {
super(message + "; " + url);
super(message + "; " + (url==null?"(null)":url));
this.url = url;
}

View File

@ -2,6 +2,7 @@ package org.servalproject.test;
import org.servalproject.servaldna.AsyncResult;
import org.servalproject.servaldna.ChannelSelector;
import org.servalproject.servaldna.IJniServer;
import org.servalproject.servaldna.MdpDnaLookup;
import org.servalproject.servaldna.MdpServiceLookup;
import org.servalproject.servaldna.ResultList;
@ -59,6 +60,58 @@ public class CommandLine {
lookup.close();
}
private static Runnable server = new Runnable() {
@Override
public void run() {
try {
ServalDCommand.server(new IJniServer() {
@Override
public long aboutToWait(long now, long nextRun, long nextWake) {
if (stopNow)
throw new ServerStopped();
return nextWake;
}
@Override
public void wokeUp() {
if (stopNow)
throw new ServerStopped();
}
@Override
public void started(String instancePath, int pid, int mdpPort, int httpPort) {
System.out.println("Started instance " + instancePath);
synchronized (server) {
server.notifyAll();
}
}
}, "", new String[]{""});
}catch (ServerStopped e){
}
}
};
private static class ServerStopped extends RuntimeException{}
private static boolean stopNow = false;
private static void server() throws InterruptedException, ServalDFailureException {
System.out.println("Starting server thread");
Thread serverThread = new Thread(server, "server");
serverThread.start();
synchronized (server){
server.wait();
}
Thread.sleep(500);
stopNow = true;
ServalDCommand.configSync();
serverThread.join();
}
public static void main(String... args){
if (args.length<1)
return;
@ -66,6 +119,8 @@ public class CommandLine {
try {
String methodName = args[0];
Object result=null;
if (methodName.equals("server"))
server();
if (methodName.equals("start"))
result=ServalDCommand.serverStart();
if (methodName.equals("stop"))

View File

@ -97,10 +97,11 @@ int keyring_release_identity(keyring_iterator *it);
#define KEYTYPE_PUBLIC_TAG 0x05
/* handle to keyring file for use in running instance */
extern keyring_file *keyring;
extern __thread keyring_file *keyring;
/* Public calls to keyring management */
keyring_file *keyring_create_instance();
keyring_file *keyring_open_instance(const char *pin);
keyring_file *keyring_open_instance_cli(const struct cli_parsed *parsed);
int keyring_enter_pin(keyring_file *k, const char *pin);
int keyring_set_did(keyring_identity *id, const char *did, const char *name);

View File

@ -24,8 +24,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "keyring.h"
#include "strbuf_helpers.h"
extern keyring_file *keyring;
#define keyring_TOKEN_STRLEN (BASE64_ENCODED_LEN(sizeof(rhizome_bid_t) + sizeof(uint64_t)))
#define alloca_keyring_token(bid, offset) keyring_ token_to_str(alloca(keyring_TOKEN_STRLEN + 1), (bid), (offset))

View File

@ -58,9 +58,9 @@ struct tree_node{
};
};
static struct tree_node root;
static __thread struct tree_node root;
struct subscriber *my_subscriber=NULL;
__thread struct subscriber *my_subscriber=NULL;
static unsigned char get_nibble(const unsigned char *sidp, int pos)
{

View File

@ -111,8 +111,8 @@ struct decode_context{
struct subscriber *point_to_point_device;
};
extern struct subscriber *my_subscriber;
extern struct subscriber *directory_service;
extern __thread struct subscriber *my_subscriber;
extern __thread struct subscriber *directory_service;
struct subscriber *_find_subscriber(struct __sourceloc, const unsigned char *sid, int len, int create);
#define find_subscriber(sid, len, create) _find_subscriber(__WHENCE__, sid, len, create)

View File

@ -65,6 +65,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "socket.h"
#include "server.h"
uint16_t mdp_loopback_port;
static void overlay_mdp_poll(struct sched_ent *alarm);
static void mdp_poll2(struct sched_ent *alarm);
static int overlay_mdp_releasebindings(struct socket_address *client);
@ -120,6 +122,7 @@ void overlay_mdp_clean_socket_files()
}
closedir(dir);
}
mdp_loopback_port=0;
}
static void overlay_mdp_fill_legacy(
@ -209,8 +212,10 @@ int overlay_mdp_setup_sockets()
fd = -1;
mdp_sock2_inet.poll.events = POLLIN;
watch(&mdp_sock2_inet);
server_write_proc_state("mdp_inet_port", "%d", port);
mdp_loopback_port = port;
INFOF("Socket mdp.2.inet: fd=%d %s", fd, alloca_socket_address(&addr));
break;
}

View File

@ -50,8 +50,8 @@
#include "fdqueue.h"
#include "conf.h"
struct profile_total *stats_head=NULL;
struct call_stats *current_call=NULL;
__thread struct profile_total *stats_head=NULL;
__thread struct call_stats *current_call=NULL;
void fd_clearstat(struct profile_total *s){
s->max_time = 0;

View File

@ -355,7 +355,7 @@ int rhizome_fetch_delay_ms();
#define RHIZOME_BLOB_SUBDIR "blob"
extern sqlite3 *rhizome_db;
extern __thread sqlite3 *rhizome_db;
serval_uuid_t rhizome_db_uuid;
int rhizome_opendb();

View File

@ -44,7 +44,7 @@ static int create_rhizome_store_dir()
return emkdirs_info(rdpath, 0700);
}
sqlite3 *rhizome_db = NULL;
__thread sqlite3 *rhizome_db = NULL;
serval_uuid_t rhizome_db_uuid;
static time_ms_t rhizomeRetryLimit = -1;

View File

@ -272,6 +272,7 @@ int dna_helper_shutdown();
int dna_helper_enqueue(struct subscriber *source, mdp_port_t source_port, const char *did);
int parseDnaReply(const char *buf, size_t len, char *token, char *did, char *name, char *uri, const char **bufp);
extern uint16_t mdp_loopback_port;
int overlay_mdp_setup_sockets();
int overlay_packetradio_setup_port(struct overlay_interface *interface);

201
server.c
View File

@ -37,16 +37,20 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "server.h"
#include "keyring.h"
#include "commandline.h"
#include "mdp_client.h"
#define PROC_SUBDIR "proc"
#define PIDFILE_NAME "servald.pid"
#define STOPFILE_NAME "servald.stop"
int serverMode = 0;
keyring_file *keyring=NULL;
__thread int serverMode = 0;
__thread keyring_file *keyring=NULL;
static char pidfile_path[256];
static int server_getpid = 0;
static int server_bind();
static void server_loop();
static int server();
static int server_write_pid();
static int server_unlink_pid();
static void signal_handler(int signal);
@ -105,9 +109,142 @@ static const char *_server_pidfile_path(struct __sourceloc __whence)
return pidfile_path;
}
static int server()
#ifdef HAVE_JNI_H
JNIEnv *server_env=NULL;
jclass IJniServer= NULL;
jmethodID aboutToWait, wokeUp, started;
jobject JniCallback;
JNIEXPORT jint JNICALL Java_org_servalproject_servaldna_ServalDCommand_server(
JNIEnv *env, jobject UNUSED(this), jobject callback, jobject keyring_pin, jobjectArray entry_pins)
{
if (!IJniServer){
IJniServer = (*env)->FindClass(env, "org/servalproject/servaldna/IJniServer");
if (IJniServer==NULL)
return Throw(env, "java/lang/IllegalStateException", "Unable to locate class org.servalproject.servaldna.IJniServer");
// make sure the interface class cannot be garbage collected between invocations
IJniServer = (jclass)(*env)->NewGlobalRef(env, IJniServer);
if (IJniServer==NULL)
return Throw(env, "java/lang/IllegalStateException", "Unable to create global ref to class org.servalproject.servaldna.IJniServer");
aboutToWait = (*env)->GetMethodID(env, IJniServer, "aboutToWait", "(JJJ)J");
if (aboutToWait==NULL)
return Throw(env, "java/lang/IllegalStateException", "Unable to locate method aboutToWait");
wokeUp = (*env)->GetMethodID(env, IJniServer, "wokeUp", "()V");
if (wokeUp==NULL)
return Throw(env, "java/lang/IllegalStateException", "Unable to locate method wokeUp");
started = (*env)->GetMethodID(env, IJniServer, "started", "(Ljava/lang/String;III)V");
if (started==NULL)
return Throw(env, "java/lang/IllegalStateException", "Unable to locate method started");
}
int pid = server_pid();
if (pid < 0)
return -1;
if (pid>0)
return 1;
int ret = -1;
{
const char *cpin = keyring_pin?(*env)->GetStringUTFChars(env, keyring_pin, NULL):NULL;
if (cpin != NULL){
keyring = keyring_open_instance(cpin);
(*env)->ReleaseStringUTFChars(env, keyring_pin, cpin);
}else{
keyring = keyring_open_instance("");
}
}
// Always open all PIN-less entries.
keyring_enter_pin(keyring, "");
if (entry_pins){
jsize len = (*env)->GetArrayLength(env, entry_pins);
jsize i;
for (i = 0; i < len; ++i) {
const jstring pin = (jstring)(*env)->GetObjectArrayElement(env, entry_pins, i);
if ((*env)->ExceptionCheck(env))
goto end;
const char *cpin = (*env)->GetStringUTFChars(env, pin, NULL);
if (cpin != NULL){
keyring_enter_pin(keyring, cpin);
(*env)->ReleaseStringUTFChars(env, pin, cpin);
}
}
}
if (keyring_seed(keyring) == -1)
goto end;
if (server_env)
goto end;
server_env = env;
JniCallback = (*env)->NewGlobalRef(env, callback);
ret = server_bind();
if (ret==-1)
goto end;
{
jstring str = (jstring)(*env)->NewStringUTF(env, instance_path());
(*env)->CallVoidMethod(env, callback, started, str, getpid(), mdp_loopback_port, httpd_server_port);
(*env)->DeleteLocalRef(env, str);
}
server_loop();
end:
server_env=NULL;
if (JniCallback){
(*env)->DeleteGlobalRef(env, JniCallback);
JniCallback = NULL;
}
if (keyring)
keyring_free(keyring);
keyring = NULL;
return ret;
}
static time_ms_t waiting(time_ms_t now, time_ms_t next_run, time_ms_t next_wakeup)
{
if (server_env && JniCallback){
jlong r = (*server_env)->CallLongMethod(server_env, JniCallback, aboutToWait, (jlong)now, (jlong)next_run, (jlong)next_wakeup);
// stop the server if there are any issues
if ((*server_env)->ExceptionCheck(server_env)){
serverMode=SERVER_CLOSING;
INFO("Stopping server due to exception");
return now;
}
return r;
}
return next_wakeup;
}
static void wokeup()
{
if (server_env && JniCallback){
(*server_env)->CallVoidMethod(server_env, JniCallback, wokeUp);
// stop the server if there are any issues
if ((*server_env)->ExceptionCheck(server_env)){
INFO("Stopping server due to exception");
serverMode=SERVER_CLOSING;
}
}
}
#else
#define waiting NULL
#define wokeup NULL
#endif
static int server_bind()
{
IN();
serverMode = SERVER_RUNNING;
// Warn, not merely Info, if there is no configured log file.
@ -129,16 +266,22 @@ static int server()
to take very long.
Try to perform only minimal CPU or IO processing here.
*/
if (overlay_mdp_setup_sockets()==-1)
RETURN(-1);
if (overlay_mdp_setup_sockets()==-1){
serverMode = 0;
return -1;
}
if (monitor_setup_sockets()==-1)
RETURN(-1);
if (monitor_setup_sockets()==-1){
serverMode = 0;
return -1;
}
// start the HTTP server if enabled
if (httpd_server_start(HTTPD_PORT, HTTPD_PORT_MAX)==-1)
RETURN(-1);
if (httpd_server_start(HTTPD_PORT, HTTPD_PORT_MAX)==-1){
serverMode = 0;
return -1;
}
/* For testing, it can be very helpful to delay the start of the server process, for example to
* check that the start/stop logic is robust.
*/
@ -150,9 +293,11 @@ static int server()
}
/* record PID file so that servald start can return */
if (server_write_pid())
RETURN(-1);
if (server_write_pid()){
serverMode = 0;
return -1;
}
overlay_queue_init();
time_ms_t now = gettime_ms();
@ -167,13 +312,18 @@ static int server()
/* Calculate (and possibly show) CPU usage stats periodically */
RESCHEDULE(&ALARM_STRUCT(fd_periodicstats), now+3000, TIME_MS_NEVER_WILL, now+3500);
return 0;
}
static void server_loop()
{
cf_on_config_change();
// log message used by tests to wait for the server to start
INFO("Server initialised, entering main loop");
INFOF("Server initialised, entering main loop");
/* Check for activitiy and respond to it */
while((serverMode==SERVER_RUNNING) && fd_poll())
while((serverMode==SERVER_RUNNING) && fd_poll2(waiting, wokeup))
;
serverCleanUp();
@ -182,6 +332,16 @@ static int server()
* if the code reaches here, the check has been done recently.
*/
server_unlink_pid();
serverMode = 0;
}
static int server()
{
IN();
if (server_bind()==-1)
RETURN(-1);
server_loop();
// note that we haven't tried to free all types of allocated memory used by the server.
// so it's safer to force this process to close, instead of trying to release everything.
@ -191,16 +351,21 @@ static int server()
static int server_write_pid()
{
server_write_proc_state("http_port", "%d", httpd_server_port);
server_write_proc_state("mdp_inet_port", "%d", mdp_loopback_port);
/* Record PID to advertise that the server is now running */
const char *ppath = server_pidfile_path();
if (ppath == NULL)
return -1;
FILE *f = fopen(ppath, "w");
if (!f)
return WHYF_perror("fopen(%s,\"w\")", alloca_str_toprint(ppath));
server_getpid = getpid();
fprintf(f,"%d\n", server_getpid);
fclose(f);
return 0;
}
@ -503,7 +668,6 @@ static int app_server_start(const struct cli_parsed *parsed, struct cli_context
if (config.debug.verbose)
DEBUG_cli_parsed(parsed);
/* Process optional arguments */
int pid=-1;
int cpid=-1;
const char *execpath;
if (cli_arg(parsed, "exec", &execpath, cli_absolute_path, NULL) == -1)
@ -516,8 +680,7 @@ static int app_server_start(const struct cli_parsed *parsed, struct cli_context
network interfaces that we will take interest in. */
if (config.interfaces.ac == 0)
NOWHENCE(WARN("No network interfaces configured (empty 'interfaces' config option)"));
if (pid == -1)
pid = server_pid();
int pid = server_pid();
if (pid < 0)
RETURN(-1);
int ret = -1;

View File

@ -28,7 +28,7 @@ DECLARE_ALARM(server_config_reload);
DECLARE_ALARM(rhizome_sync_announce);
DECLARE_ALARM(fd_periodicstats);
extern int serverMode;
extern __thread int serverMode;
int server_pid();
int server_write_proc_state(const char *path, const char *fmt, ...);

View File

@ -161,4 +161,15 @@ teardown_serviceDiscovery() {
report_all_servald_servers
}
doc_servaldThread="Start servald in a JVM thread"
setup_servaldThread() {
setup
set_instance +A
create_single_identity
}
test_servaldThread() {
executeJavaOk --timeout=10 org.servalproject.test.CommandLine 'server'
tfw_cat --stdout --stderr
}
runTests "$@"