Factor out filter setup from the reading and writing loops

This commit is contained in:
Eric Fischer 2016-12-08 15:43:52 -08:00
parent d1dc310bbc
commit d940eb1cef
2 changed files with 42 additions and 27 deletions

View File

@ -27,7 +27,7 @@ extern "C" {
#include "read_json.hpp" #include "read_json.hpp"
struct writer_arg { struct writer_arg {
int *pipe_orig; int write_to;
mvt_layer *layer; mvt_layer *layer;
unsigned z; unsigned z;
unsigned x; unsigned x;
@ -39,7 +39,7 @@ void *run_writer(void *a) {
// XXX worry about SIGPIPE? // XXX worry about SIGPIPE?
FILE *fp = fdopen(wa->pipe_orig[1], "w"); FILE *fp = fdopen(wa->write_to, "w");
if (fp == NULL) { if (fp == NULL) {
perror("fdopen (pipe writer)"); perror("fdopen (pipe writer)");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
@ -208,7 +208,7 @@ mvt_layer parse_layer(int fd, unsigned z, unsigned x, unsigned y, mvt_layer cons
static pthread_mutex_t pipe_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t pipe_lock = PTHREAD_MUTEX_INITIALIZER;
mvt_layer filter_layer(const char *filter, mvt_layer &layer, unsigned z, unsigned x, unsigned y) { void setup_filter(const char *filter, int *write_to, int *read_from, pid_t *pid, unsigned z, unsigned x, unsigned y) {
// This will create two pipes, a new thread, and a new process. // This will create two pipes, a new thread, and a new process.
// //
// The new process will read from one pipe and write to the other, and execute the filter. // The new process will read from one pipe and write to the other, and execute the filter.
@ -234,11 +234,11 @@ mvt_layer filter_layer(const char *filter, mvt_layer &layer, unsigned z, unsigne
std::string x_str = std::to_string(x); std::string x_str = std::to_string(x);
std::string y_str = std::to_string(y); std::string y_str = std::to_string(y);
pid_t pid = fork(); *pid = fork();
if (pid < 0) { if (*pid < 0) {
perror("fork"); perror("fork");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} else if (pid == 0) { } else if (*pid == 0) {
// child // child
if (dup2(pipe_orig[0], 0) < 0) { if (dup2(pipe_orig[0], 0) < 0) {
@ -297,37 +297,51 @@ mvt_layer filter_layer(const char *filter, mvt_layer &layer, unsigned z, unsigne
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
writer_arg wa; *write_to = pipe_orig[1];
wa.pipe_orig = pipe_orig; *read_from = pipe_filtered[0];
wa.layer = &layer; }
wa.z = z; }
wa.x = x;
wa.y = y;
pthread_t writer; mvt_layer filter_layer(const char *filter, mvt_layer &layer, unsigned z, unsigned x, unsigned y) {
if (pthread_create(&writer, NULL, run_writer, &wa) != 0) { int write_to, read_from;
perror("pthread_create (filter writer)"); pid_t pid;
exit(EXIT_FAILURE); setup_filter(filter, &write_to, &read_from, &pid, z, x, y);
}
layer = parse_layer(pipe_filtered[0], z, x, y, layer); writer_arg wa;
wa.write_to = write_to;
wa.layer = &layer;
wa.z = z;
wa.x = x;
wa.y = y;
pthread_t writer;
if (pthread_create(&writer, NULL, run_writer, &wa) != 0) {
perror("pthread_create (filter writer)");
exit(EXIT_FAILURE);
}
layer = parse_layer(read_from, z, x, y, layer);
if (close(read_from) != 0) {
perror("close output from filter");
exit(EXIT_FAILURE);
}
while (1) {
int stat_loc; int stat_loc;
if (waitpid(pid, &stat_loc, 0) < 0) { if (waitpid(pid, &stat_loc, 0) < 0) {
perror("waitpid for filter\n"); perror("waitpid for filter\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (WIFEXITED(stat_loc) || WIFSIGNALED(stat_loc)) {
if (close(pipe_filtered[0]) != 0) { break;
perror("close output from filter");
exit(EXIT_FAILURE);
} }
}
void *ret; void *ret;
if (pthread_join(writer, &ret) != 0) { if (pthread_join(writer, &ret) != 0) {
perror("pthread_join filter writer"); perror("pthread_join filter writer");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
}
} }
return layer; return layer;

View File

@ -1 +1,2 @@
mvt_layer filter_layer(const char *filter, mvt_layer &layer, unsigned z, unsigned x, unsigned y); mvt_layer filter_layer(const char *filter, mvt_layer &layer, unsigned z, unsigned x, unsigned y);
void setup_filter(const char *filter, int *write_to, int *read_from, pid_t *pid, unsigned z, unsigned x, unsigned y);