From c8a89150643cb64a13042c76275292a14fb4645c Mon Sep 17 00:00:00 2001
From: Eric Fischer <enf@pobox.com>
Date: Fri, 9 Dec 2016 14:01:07 -0800
Subject: [PATCH] Push prefilter writing into a thread (but something is
 crashing)

---
 tile.cpp | 166 ++++++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 134 insertions(+), 32 deletions(-)

diff --git a/tile.cpp b/tile.cpp
index 02e5e8e..d57003a 100644
--- a/tile.cpp
+++ b/tile.cpp
@@ -1294,6 +1294,82 @@ serial_feature next_feature(FILE *geoms, long long *geompos_in, char *metabase,
 	}
 }
 
+struct run_prefilter_args {
+	FILE *geoms;
+	long long *geompos_in;
+	char *metabase;
+	long long *meta_off;
+	int z;
+	unsigned tx;
+	unsigned ty;
+	unsigned *initial_x;
+	unsigned *initial_y;
+	long long *original_features;
+	long long *unclipped_features;
+	int nextzoom;
+	int maxzoom;
+	int minzoom;
+	int max_zoom_increment;
+	size_t pass;
+	size_t passes;
+	volatile long long *along;
+	long long alongminus;
+	int buffer;
+	int *within;
+	bool *first_time;
+	int line_detail;
+	FILE **geomfile;
+	long long *geompos;
+	volatile double *oprogress;
+	double todo;
+	const char *fname;
+	int child_shards;
+	std::vector<std::vector<std::string>> *layer_unmaps;
+	char *stringpool;
+	long long *pool_off;
+
+	FILE *prefilter_fp;
+};
+
+void *run_prefilter(void *v) {
+	run_prefilter_args *rpa = (run_prefilter_args *) v;
+
+	while (1) {
+		serial_feature sf = next_feature(rpa->geoms, rpa->geompos_in, rpa->metabase, rpa->meta_off, rpa->z, rpa->tx, rpa->ty, rpa->initial_x, rpa->initial_y, rpa->original_features, rpa->unclipped_features, rpa->nextzoom, rpa->maxzoom, rpa->minzoom, rpa->max_zoom_increment, rpa->pass, rpa->passes, rpa->along, rpa->alongminus, rpa->buffer, rpa->within, rpa->first_time, rpa->line_detail, rpa->geomfile, rpa->geompos, rpa->oprogress, rpa->todo, rpa->fname, rpa->child_shards);
+		if (sf.t < 0) {
+			break;
+		}
+
+		mvt_layer tmp_layer;
+		tmp_layer.extent = 1LL << 32;
+		tmp_layer.name = (*(rpa->layer_unmaps))[sf.segment][sf.layer];
+
+		mvt_feature tmp_feature;
+		tmp_feature.type = sf.t;
+		tmp_feature.geometry = to_feature(sf.geometry);
+		tmp_feature.id = sf.id;
+		tmp_feature.has_id = sf.has_id;
+
+		// Offset from tile coordinates back to world coordinates
+		unsigned sx = 0, sy = 0;
+		if (rpa->z != 0) {
+			sx = rpa->tx << (32 - rpa->z);
+			sy = rpa->ty << (32 - rpa->z);
+		}
+		for (size_t i = 0; i < tmp_feature.geometry.size(); i++) {
+			tmp_feature.geometry[i].x += sx;
+			tmp_feature.geometry[i].y += sy;
+		}
+
+		decode_meta(sf.m, sf.keys, sf.values, rpa->stringpool + rpa->pool_off[sf.segment], tmp_layer, tmp_feature);
+		tmp_layer.features.push_back(tmp_feature);
+
+		layer_to_geojson(rpa->prefilter_fp, tmp_layer, 0, 0, 0, false, true);
+	}
+
+	return NULL;
+}
+
 long long write_tile(FILE *geoms, long long *geompos_in, char *metabase, char *stringpool, int z, unsigned tx, unsigned ty, int detail, int min_detail, int basezoom, sqlite3 *outdb, double droprate, int buffer, const char *fname, FILE **geomfile, int minzoom, int maxzoom, double todo, volatile long long *along, long long alongminus, double gamma, int child_shards, long long *meta_off, long long *pool_off, unsigned *initial_x, unsigned *initial_y, volatile int *running, double simplification, std::vector<std::map<std::string, layermap_entry>> *layermaps, std::vector<std::vector<std::string>> *layer_unmaps, size_t pass, size_t passes, unsigned long long mingap, long long minextent, double fraction, const char *prefilter, const char *postfilter, write_tile_args *arg) {
 	int line_detail;
 	double merge_fraction = 1;
@@ -1367,49 +1443,70 @@ long long write_tile(FILE *geoms, long long *geompos_in, char *metabase, char *s
 
 		int prefilter_write = -1, prefilter_read = -1;
 		pid_t prefilter_pid = 0;
-		FILE *prefilter_fd = NULL;
+		FILE *prefilter_fp = NULL;
+		pthread_t prefilter_writer;
+		run_prefilter_args rpa;  // here so it stays in scope until joined
+
 		if (prefilter != NULL) {
 			setup_filter(prefilter, &prefilter_write, &prefilter_read, &prefilter_pid, z, tx, ty);
-			prefilter_fd = fdopen(prefilter_write, "w");
-			if (prefilter_fd == NULL) {
+			prefilter_fp = fdopen(prefilter_write, "w");
+			if (prefilter_fp == NULL) {
 				perror("freopen prefilter");
 				exit(EXIT_FAILURE);
 			}
+
+			rpa.geoms = geoms;
+			rpa.geompos_in = geompos_in;
+			rpa.metabase = metabase;
+			rpa.meta_off = meta_off;
+			rpa.z = z;
+			rpa.tx = tx;
+			rpa.ty = ty;
+			rpa.initial_x = initial_x;
+			rpa.initial_y = initial_y;
+			rpa.original_features = &original_features;
+			rpa.unclipped_features = &unclipped_features;
+			rpa.nextzoom = nextzoom;
+			rpa.maxzoom = maxzoom;
+			rpa.minzoom = minzoom;
+			rpa.max_zoom_increment = max_zoom_increment;
+			rpa.pass = pass;
+			rpa.passes = passes;
+			rpa.along = along;
+			rpa.alongminus = alongminus;
+			rpa.buffer = buffer;
+			rpa.within = within;
+			rpa.first_time = &first_time;
+			rpa.line_detail = line_detail;
+			rpa.geomfile = geomfile;
+			rpa.geompos = geompos;
+			rpa.oprogress = &oprogress;
+			rpa.todo = todo;
+			rpa.fname = fname;
+			rpa.child_shards = child_shards;
+			rpa.prefilter_fp = prefilter_fp;
+			rpa.layer_unmaps = layer_unmaps;
+			rpa.stringpool = stringpool;
+			rpa.pool_off = pool_off;
+
+			if (pthread_create(&prefilter_writer, NULL, run_prefilter, &rpa) != 0) {
+				perror("pthread_create (prefilter writer)");
+				exit(EXIT_FAILURE);
+			}
 		}
 
 		while (1) {
-			serial_feature sf = next_feature(geoms, geompos_in, metabase, meta_off, z, tx, ty, initial_x, initial_y, &original_features, &unclipped_features, nextzoom, maxzoom, minzoom, max_zoom_increment, pass, passes, along, alongminus, buffer, within, &first_time, line_detail, geomfile, geompos, &oprogress, todo, fname, child_shards);
+			serial_feature sf;
 
-			if (sf.t < 0) {
+			if (prefilter == NULL) {
+				sf = next_feature(geoms, geompos_in, metabase, meta_off, z, tx, ty, initial_x, initial_y, &original_features, &unclipped_features, nextzoom, maxzoom, minzoom, max_zoom_increment, pass, passes, along, alongminus, buffer, within, &first_time, line_detail, geomfile, geompos, &oprogress, todo, fname, child_shards);
+			} else {
+				// XXX parse prefilter
 				break;
 			}
 
-			if (prefilter != NULL) {
-				mvt_layer tmp_layer;
-				tmp_layer.extent = 1LL << 32;
-				tmp_layer.name = (*layer_unmaps)[sf.segment][sf.layer];
-
-				mvt_feature tmp_feature;
-				tmp_feature.type = sf.t;
-				tmp_feature.geometry = to_feature(sf.geometry);
-				tmp_feature.id = sf.id;
-				tmp_feature.has_id = sf.has_id;
-
-				// Offset from tile coordinates back to world coordinates
-				unsigned sx = 0, sy = 0;
-				if (z != 0) {
-					sx = tx << (32 - z);
-					sy = ty << (32 - z);
-				}
-				for (size_t i = 0; i < tmp_feature.geometry.size(); i++) {
-					tmp_feature.geometry[i].x += sx;
-					tmp_feature.geometry[i].y += sy;
-				}
-
-				decode_meta(sf.m, sf.keys, sf.values, stringpool + pool_off[sf.segment], tmp_layer, tmp_feature);
-				tmp_layer.features.push_back(tmp_feature);
-
-				layer_to_geojson(prefilter_fd, tmp_layer, 0, 0, 0, false, true);
+			if (sf.t < 0) {
+				break;
 			}
 
 			if (gamma > 0) {
@@ -1485,7 +1582,7 @@ long long write_tile(FILE *geoms, long long *geompos_in, char *metabase, char *s
 		}
 
 		if (prefilter != NULL) {
-			if (fclose(prefilter_fd) != 0) {
+			if (fclose(prefilter_fp) != 0) {
 				perror("fclose output to prefilter");
 				exit(EXIT_FAILURE);
 			}
@@ -1503,6 +1600,11 @@ long long write_tile(FILE *geoms, long long *geompos_in, char *metabase, char *s
 					break;
 				}
 			}
+			void *ret;
+			if (pthread_join(prefilter_writer, &ret) != 0) {
+				perror("pthread_join prefilter writer");
+				exit(EXIT_FAILURE);
+			}
 		}
 
 		first_time = false;