From 40ec317c3626b560c97fee801702d9a29a7dea53 Mon Sep 17 00:00:00 2001 From: Eric Fischer Date: Tue, 22 Dec 2015 14:02:31 -0800 Subject: [PATCH] Launch a separate thread to read each segment --- geojson.c | 123 +++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 98 insertions(+), 25 deletions(-) diff --git a/geojson.c b/geojson.c index 37a12f8..8544027 100644 --- a/geojson.c +++ b/geojson.c @@ -18,6 +18,7 @@ #include #include #include +#include #include "jsonpull.h" #include "tile.h" @@ -745,6 +746,38 @@ void parse_json(json_pull *jp, const char *reading, long long *seq, long long *m } } +struct parse_json_args { + json_pull *jp; + const char *reading; + long long *seq; + long long *metapos; + long long *geompos; + long long *indexpos; + struct pool *exclude; + struct pool *include; + int exclude_all; + FILE *metafile; + FILE *geomfile; + FILE *indexfile; + struct memfile *poolfile; + struct memfile *treefile; + char *fname; + int maxzoom; + int basezoom; + int layer; + double droprate; + long long *file_bbox; + int segment; +}; + +void *run_parse_json(void *v) { + struct parse_json_args *pja = v; + + parse_json(pja->jp, pja->reading, pja->seq, pja->metapos, pja->geompos, pja->indexpos, pja->exclude, pja->include, pja->exclude_all, pja->metafile, pja->geomfile, pja->indexfile, pja->poolfile, pja->treefile, pja->fname, pja->maxzoom, pja->basezoom, pja->layer, pja->droprate, pja->file_bbox, pja->segment); + + return NULL; +} + struct jsonmap { char *map; long long off; @@ -808,10 +841,9 @@ struct reader { int read_json(int argc, char **argv, char *fname, const char *layername, int maxzoom, int minzoom, int basezoom, double basezoom_marker_width, sqlite3 *outdb, struct pool *exclude, struct pool *include, int exclude_all, double droprate, int buffer, const char *tmpdir, double gamma, char *prevent, char *additional) { int ret = EXIT_SUCCESS; -#define THREADS 10 - struct reader reader[THREADS]; + struct reader reader[CPUS]; int i; - for (i = 0; i < THREADS; i++) { + for (i = 0; i < CPUS; i++) { struct reader *r = reader + i; r->metaname = malloc(strlen(tmpdir) + strlen("/meta.XXXXXXXX") + 1); @@ -944,13 +976,13 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max } if (map != NULL && map != MAP_FAILED) { - long long segs[THREADS + 1]; + long long segs[CPUS + 1]; segs[0] = 0; - segs[THREADS] = st.st_size - off; + segs[CPUS] = st.st_size - off; int i; - for (i = 1; i < THREADS; i++) { - segs[i] = off + (st.st_size - off) * i / THREADS; + for (i = 1; i < CPUS; i++) { + segs[i] = off + (st.st_size - off) * i / CPUS; while (segs[i] < st.st_size && map[segs[i]] != '\n') { segs[i]++; @@ -959,11 +991,47 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max printf("%d %lld\n", i, segs[i]); } - for (i = 0; i < THREADS; i++) { - json_pull *jp = json_begin_map(map + segs[i], segs[i + 1] - segs[i]); - parse_json(jp, reading, &seq, &reader[i].metapos, &reader[i].geompos, &reader[i].indexpos, exclude, include, exclude_all, reader[i].metafile, reader[i].geomfile, reader[i].indexfile, reader[i].poolfile, reader[i].treefile, fname, maxzoom, basezoom, source, droprate, reader[i].file_bbox, i); - free(jp->source); - json_end(jp); + struct parse_json_args pja[CPUS]; + pthread_t pthreads[CPUS]; + + for (i = 0; i < CPUS; i++) { + pja[i].jp = json_begin_map(map + segs[i], segs[i + 1] - segs[i]); + pja[i].reading = reading; + pja[i].seq = &seq; + pja[i].metapos = &reader[i].metapos; + pja[i].geompos = &reader[i].geompos; + pja[i].indexpos = &reader[i].indexpos; + pja[i].exclude = exclude; + pja[i].include = include; + pja[i].exclude_all = exclude_all; + pja[i].metafile = reader[i].metafile; + pja[i].geomfile = reader[i].geomfile; + pja[i].indexfile = reader[i].indexfile; + pja[i].poolfile = reader[i].poolfile; + pja[i].treefile = reader[i].treefile; + pja[i].fname = fname; + pja[i].maxzoom = maxzoom; + pja[i].basezoom = basezoom; + pja[i].layer = source; + pja[i].droprate = droprate; + pja[i].file_bbox = reader[i].file_bbox; + pja[i].segment = i; + + if (pthread_create(&pthreads[i], NULL, run_parse_json, &pja[i]) != 0) { + perror("pthread_create"); + exit(EXIT_FAILURE); + } + } + + for (i = 0; i < CPUS; i++) { + void *retval; + + if (pthread_join(pthreads[i], &retval) != 0) { + perror("pthread_join"); + } + + free(pja[i].jp->source); + json_end(pja[i].jp); } } else { FILE *fp = fdopen(fd, "r"); @@ -980,7 +1048,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max } } - for (i = 0; i < THREADS; i++) { + for (i = 0; i < CPUS; i++) { fclose(reader[i].metafile); fclose(reader[i].geomfile); fclose(reader[i].indexfile); @@ -1070,7 +1138,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max unlink(indexname); long long indexpos = 0; - for (i = 0; i < THREADS; i++) { + for (i = 0; i < CPUS; i++) { if (reader[i].indexpos > 0) { void *map = mmap(NULL, reader[i].indexpos, PROT_READ, MAP_PRIVATE, reader[i].indexfd, 0); if (map == MAP_FAILED) { @@ -1371,12 +1439,17 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max } unlink(indexname); - for (i = 0; i < THREADS; i++) { - reader[i].geom_map = mmap(NULL, reader[i].geomst.st_size, PROT_READ, MAP_PRIVATE, reader[i].geomfd, 0); - if (reader[i].geom_map == MAP_FAILED) { - perror("mmap unsorted geometry"); - exit(EXIT_FAILURE); + for (i = 0; i < CPUS; i++) { + reader[i].geom_map = NULL; + + if (reader[i].geomst.st_size > 0) { + reader[i].geom_map = mmap(NULL, reader[i].geomst.st_size, PROT_READ, MAP_PRIVATE, reader[i].geomfd, 0); + if (reader[i].geom_map == MAP_FAILED) { + perror("mmap unsorted geometry"); + exit(EXIT_FAILURE); + } } + if (close(reader[i].geomfd) != 0) { perror("close unsorted geometry"); } @@ -1434,7 +1507,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max perror("unmap sorted index"); } - for (i = 0; i < THREADS; i++) { + for (i = 0; i < CPUS; i++) { if (munmap(reader[i].geom_map, reader[i].geomst.st_size) != 0) { perror("unmap unsorted geometry"); } @@ -1469,7 +1542,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max } if (!quiet) { - for (i = 0; i < THREADS; i++) { + for (i = 0; i < CPUS; i++) { fprintf(stderr, "%lld features, %lld bytes of geometry, %lld bytes of metadata, %lld bytes of string pool\n", seq, (long long) reader[i].geomst.st_size, (long long) reader[i].metast.st_size, reader[i].poolfile->off); } } @@ -1478,8 +1551,8 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max // but keep track of the offsets into it since we still need // segment+offset to find the data. - long long pool_off[THREADS]; - long long meta_off[THREADS]; + long long pool_off[CPUS]; + long long meta_off[CPUS]; char poolname[strlen(tmpdir) + strlen("/pool.XXXXXXXX") + 1]; sprintf(poolname, "%s%s", tmpdir, "/pool.XXXXXXXX"); @@ -1518,7 +1591,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max long long metapos = 0; long long poolpos = 0; - for (i = 0; i < THREADS; i++) { + for (i = 0; i < CPUS; i++) { if (reader[i].metapos > 0) { void *map = mmap(NULL, reader[i].metapos, PROT_READ, MAP_PRIVATE, reader[i].metafd, 0); if (map == MAP_FAILED) { @@ -1595,7 +1668,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max midlon = (maxlon + minlon) / 2; long long file_bbox[4] = { UINT_MAX, UINT_MAX, 0, 0 }; - for (i = 0; i < THREADS; i++) { + for (i = 0; i < CPUS; i++) { if (reader[i].file_bbox[0] < file_bbox[0]) { file_bbox[0] = reader[i].file_bbox[0]; }