Push prefilter writing into a thread (but something is crashing)

This commit is contained in:
Eric Fischer 2016-12-09 14:01:07 -08:00
parent 9f10f48bfb
commit c8a8915064

166
tile.cpp
View File

@ -1294,6 +1294,82 @@ serial_feature next_feature(FILE *geoms, long long *geompos_in, char *metabase,
}
}
struct run_prefilter_args {
FILE *geoms;
long long *geompos_in;
char *metabase;
long long *meta_off;
int z;
unsigned tx;
unsigned ty;
unsigned *initial_x;
unsigned *initial_y;
long long *original_features;
long long *unclipped_features;
int nextzoom;
int maxzoom;
int minzoom;
int max_zoom_increment;
size_t pass;
size_t passes;
volatile long long *along;
long long alongminus;
int buffer;
int *within;
bool *first_time;
int line_detail;
FILE **geomfile;
long long *geompos;
volatile double *oprogress;
double todo;
const char *fname;
int child_shards;
std::vector<std::vector<std::string>> *layer_unmaps;
char *stringpool;
long long *pool_off;
FILE *prefilter_fp;
};
void *run_prefilter(void *v) {
run_prefilter_args *rpa = (run_prefilter_args *) v;
while (1) {
serial_feature sf = next_feature(rpa->geoms, rpa->geompos_in, rpa->metabase, rpa->meta_off, rpa->z, rpa->tx, rpa->ty, rpa->initial_x, rpa->initial_y, rpa->original_features, rpa->unclipped_features, rpa->nextzoom, rpa->maxzoom, rpa->minzoom, rpa->max_zoom_increment, rpa->pass, rpa->passes, rpa->along, rpa->alongminus, rpa->buffer, rpa->within, rpa->first_time, rpa->line_detail, rpa->geomfile, rpa->geompos, rpa->oprogress, rpa->todo, rpa->fname, rpa->child_shards);
if (sf.t < 0) {
break;
}
mvt_layer tmp_layer;
tmp_layer.extent = 1LL << 32;
tmp_layer.name = (*(rpa->layer_unmaps))[sf.segment][sf.layer];
mvt_feature tmp_feature;
tmp_feature.type = sf.t;
tmp_feature.geometry = to_feature(sf.geometry);
tmp_feature.id = sf.id;
tmp_feature.has_id = sf.has_id;
// Offset from tile coordinates back to world coordinates
unsigned sx = 0, sy = 0;
if (rpa->z != 0) {
sx = rpa->tx << (32 - rpa->z);
sy = rpa->ty << (32 - rpa->z);
}
for (size_t i = 0; i < tmp_feature.geometry.size(); i++) {
tmp_feature.geometry[i].x += sx;
tmp_feature.geometry[i].y += sy;
}
decode_meta(sf.m, sf.keys, sf.values, rpa->stringpool + rpa->pool_off[sf.segment], tmp_layer, tmp_feature);
tmp_layer.features.push_back(tmp_feature);
layer_to_geojson(rpa->prefilter_fp, tmp_layer, 0, 0, 0, false, true);
}
return NULL;
}
long long write_tile(FILE *geoms, long long *geompos_in, char *metabase, char *stringpool, int z, unsigned tx, unsigned ty, int detail, int min_detail, int basezoom, sqlite3 *outdb, double droprate, int buffer, const char *fname, FILE **geomfile, int minzoom, int maxzoom, double todo, volatile long long *along, long long alongminus, double gamma, int child_shards, long long *meta_off, long long *pool_off, unsigned *initial_x, unsigned *initial_y, volatile int *running, double simplification, std::vector<std::map<std::string, layermap_entry>> *layermaps, std::vector<std::vector<std::string>> *layer_unmaps, size_t pass, size_t passes, unsigned long long mingap, long long minextent, double fraction, const char *prefilter, const char *postfilter, write_tile_args *arg) {
int line_detail;
double merge_fraction = 1;
@ -1367,49 +1443,70 @@ long long write_tile(FILE *geoms, long long *geompos_in, char *metabase, char *s
int prefilter_write = -1, prefilter_read = -1;
pid_t prefilter_pid = 0;
FILE *prefilter_fd = NULL;
FILE *prefilter_fp = NULL;
pthread_t prefilter_writer;
run_prefilter_args rpa; // here so it stays in scope until joined
if (prefilter != NULL) {
setup_filter(prefilter, &prefilter_write, &prefilter_read, &prefilter_pid, z, tx, ty);
prefilter_fd = fdopen(prefilter_write, "w");
if (prefilter_fd == NULL) {
prefilter_fp = fdopen(prefilter_write, "w");
if (prefilter_fp == NULL) {
perror("freopen prefilter");
exit(EXIT_FAILURE);
}
rpa.geoms = geoms;
rpa.geompos_in = geompos_in;
rpa.metabase = metabase;
rpa.meta_off = meta_off;
rpa.z = z;
rpa.tx = tx;
rpa.ty = ty;
rpa.initial_x = initial_x;
rpa.initial_y = initial_y;
rpa.original_features = &original_features;
rpa.unclipped_features = &unclipped_features;
rpa.nextzoom = nextzoom;
rpa.maxzoom = maxzoom;
rpa.minzoom = minzoom;
rpa.max_zoom_increment = max_zoom_increment;
rpa.pass = pass;
rpa.passes = passes;
rpa.along = along;
rpa.alongminus = alongminus;
rpa.buffer = buffer;
rpa.within = within;
rpa.first_time = &first_time;
rpa.line_detail = line_detail;
rpa.geomfile = geomfile;
rpa.geompos = geompos;
rpa.oprogress = &oprogress;
rpa.todo = todo;
rpa.fname = fname;
rpa.child_shards = child_shards;
rpa.prefilter_fp = prefilter_fp;
rpa.layer_unmaps = layer_unmaps;
rpa.stringpool = stringpool;
rpa.pool_off = pool_off;
if (pthread_create(&prefilter_writer, NULL, run_prefilter, &rpa) != 0) {
perror("pthread_create (prefilter writer)");
exit(EXIT_FAILURE);
}
}
while (1) {
serial_feature sf = next_feature(geoms, geompos_in, metabase, meta_off, z, tx, ty, initial_x, initial_y, &original_features, &unclipped_features, nextzoom, maxzoom, minzoom, max_zoom_increment, pass, passes, along, alongminus, buffer, within, &first_time, line_detail, geomfile, geompos, &oprogress, todo, fname, child_shards);
serial_feature sf;
if (sf.t < 0) {
if (prefilter == NULL) {
sf = next_feature(geoms, geompos_in, metabase, meta_off, z, tx, ty, initial_x, initial_y, &original_features, &unclipped_features, nextzoom, maxzoom, minzoom, max_zoom_increment, pass, passes, along, alongminus, buffer, within, &first_time, line_detail, geomfile, geompos, &oprogress, todo, fname, child_shards);
} else {
// XXX parse prefilter
break;
}
if (prefilter != NULL) {
mvt_layer tmp_layer;
tmp_layer.extent = 1LL << 32;
tmp_layer.name = (*layer_unmaps)[sf.segment][sf.layer];
mvt_feature tmp_feature;
tmp_feature.type = sf.t;
tmp_feature.geometry = to_feature(sf.geometry);
tmp_feature.id = sf.id;
tmp_feature.has_id = sf.has_id;
// Offset from tile coordinates back to world coordinates
unsigned sx = 0, sy = 0;
if (z != 0) {
sx = tx << (32 - z);
sy = ty << (32 - z);
}
for (size_t i = 0; i < tmp_feature.geometry.size(); i++) {
tmp_feature.geometry[i].x += sx;
tmp_feature.geometry[i].y += sy;
}
decode_meta(sf.m, sf.keys, sf.values, stringpool + pool_off[sf.segment], tmp_layer, tmp_feature);
tmp_layer.features.push_back(tmp_feature);
layer_to_geojson(prefilter_fd, tmp_layer, 0, 0, 0, false, true);
if (sf.t < 0) {
break;
}
if (gamma > 0) {
@ -1485,7 +1582,7 @@ long long write_tile(FILE *geoms, long long *geompos_in, char *metabase, char *s
}
if (prefilter != NULL) {
if (fclose(prefilter_fd) != 0) {
if (fclose(prefilter_fp) != 0) {
perror("fclose output to prefilter");
exit(EXIT_FAILURE);
}
@ -1503,6 +1600,11 @@ long long write_tile(FILE *geoms, long long *geompos_in, char *metabase, char *s
break;
}
}
void *ret;
if (pthread_join(prefilter_writer, &ret) != 0) {
perror("pthread_join prefilter writer");
exit(EXIT_FAILURE);
}
}
first_time = false;