From 2b378ceb9ff86dfa3b14a46093ac7385565e3536 Mon Sep 17 00:00:00 2001
From: Eric Fischer
Date: Mon, 4 Jan 2016 13:39:34 -0800
Subject: [PATCH] Use multiple threads to sort the features

---
Review notes: the line structure and tab indentation of this patch were
reconstructed from a whitespace-collapsed copy, so the whitespace of the
context lines should be checked against the tree before applying (or use
--ignore-whitespace). The blob ids on the "index" line predate the review
changes: pthread errors are now reported from the returned error code,
and the out-of-order message uses %llu for its unsigned values.

 geojson.c | 137 ++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 105 insertions(+), 32 deletions(-)

diff --git a/geojson.c b/geojson.c
index 996fb18..0662e8b 100644
--- a/geojson.c
+++ b/geojson.c
@@ -844,6 +844,72 @@ struct reader {
 	char *geom_map;
 };
 
+// Work description for one sorting thread: thread `task` out of `cpus`
+// sorts every `cpus`-th `unit`-byte chunk of the file open on `indexfd`.
+struct sort_arg {
+	int task;  // number of this thread, 0 .. cpus - 1
+	int cpus;  // total number of sorting threads
+	long long indexpos;  // byte offset at which the data to sort ends
+	struct merge *merges;  // one entry per chunk, filled in by run_sort
+	int indexfd;  // file descriptor of the file being sorted
+	int nmerges;  // total number of chunks (used for the progress message)
+	long long unit;  // chunk size in bytes
+	int bytes;  // size of one record, as passed to qsort
+};
+
+// Thread entry point; `v` points to a struct sort_arg.
+// Sorts each of this thread's chunks with qsort/indexcmp, writes the
+// sorted data back to the file, and records the chunk's byte range in
+// merges[chunk number]. Exits the process if mmap fails; returns NULL.
+void *run_sort(void *v) {
+	struct sort_arg *a = v;
+
+	// Chunks are dealt out round-robin: this thread takes chunk `task`,
+	// then every `cpus`-th chunk after it.
+	long long start;
+	for (start = a->task * a->unit; start < a->indexpos; start += a->unit * a->cpus) {
+		long long end = start + a->unit;
+		if (end > a->indexpos) {
+			end = a->indexpos;
+		}
+
+		if (a->nmerges != 1) {
+			if (!quiet) {
+				fprintf(stderr, "Sorting part %lld of %d \r", start / a->unit + 1, a->nmerges);
+			}
+		}
+
+		a->merges[start / a->unit].start = start;
+		a->merges[start / a->unit].end = end;
+		a->merges[start / a->unit].next = NULL;
+
+		// MAP_PRIVATE to avoid disk writes if it fits in memory
+		void *map = mmap(NULL, end - start, PROT_READ | PROT_WRITE, MAP_PRIVATE, a->indexfd, start);
+		if (map == MAP_FAILED) {
+			perror("mmap");
+			exit(EXIT_FAILURE);
+		}
+
+		qsort(map, (end - start) / a->bytes, a->bytes, indexcmp);
+
+		// Sorting and then copying avoids disk access to
+		// write out intermediate stages of the sort.
+
+		void *map2 = mmap(NULL, end - start, PROT_READ | PROT_WRITE, MAP_SHARED, a->indexfd, start);
+		if (map2 == MAP_FAILED) {
+			perror("mmap (write)");
+			exit(EXIT_FAILURE);
+		}
+
+		memcpy(map2, map, end - start);
+
+		munmap(map, end - start);
+		munmap(map2, end - start);
+	}
+
+	return NULL;
+}
+
 int read_json(int argc, char **argv, char *fname, const char *layername, int maxzoom, int minzoom, int basezoom, double basezoom_marker_width, sqlite3 *outdb, struct pool *exclude, struct pool *include, int exclude_all, double droprate, int buffer, const char *tmpdir, double gamma, char *prevent, char *additional, int read_parallel) {
 	int ret = EXIT_SUCCESS;
 
@@ -1187,6 +1253,12 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
 			fprintf(stderr, "Sorting %lld features\n", (long long) indexpos / bytes);
 		}
 
+		// XXX On machines with different page sizes, doing the sorting
+		// in different-sized chunks can cause features with the same
+		// index (i.e., the same bbox) to appear in different orders
+		// because the sort is unstable. This doesn't seem worth spending
+		// more memory to fix, but could make tests unstable.
+
 		int page = sysconf(_SC_PAGESIZE);
 		long long unit = (50 * 1024 * 1024 / bytes) * bytes;
 		while (unit % page != 0) {
@@ -1196,45 +1268,35 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
 		int nmerges = (indexpos + unit - 1) / unit;
 		struct merge merges[nmerges];
 
-		long long start;
-		for (start = 0; start < indexpos; start += unit) {
-			long long end = start + unit;
-			if (end > indexpos) {
-				end = indexpos;
-			}
+		pthread_t pthreads[CPUS];
+		struct sort_arg args[CPUS];
 
-			if (nmerges != 1) {
-				if (!quiet) {
-					fprintf(stderr, "Sorting part %lld of %d\r", start / unit + 1, nmerges);
-				}
-			}
+		int i;
+		for (i = 0; i < CPUS; i++) {
+			args[i].task = i;
+			args[i].cpus = CPUS;
+			args[i].indexpos = indexpos;
+			args[i].merges = merges;
+			args[i].indexfd = indexfd;
+			args[i].nmerges = nmerges;
+			args[i].unit = unit;
+			args[i].bytes = bytes;
 
-			merges[start / unit].start = start;
-			merges[start / unit].end = end;
-			merges[start / unit].next = NULL;
-
-			// MAP_PRIVATE to avoid disk writes if it fits in memory
-			void *map = mmap(NULL, end - start, PROT_READ | PROT_WRITE, MAP_PRIVATE, indexfd, start);
-			if (map == MAP_FAILED) {
-				perror("mmap");
+			int err = pthread_create(&pthreads[i], NULL, run_sort, &args[i]);
+			if (err != 0) {
+				// pthread functions return the error number and do not set errno
+				fprintf(stderr, "pthread_create: %s\n", strerror(err));
 				exit(EXIT_FAILURE);
 			}
+		}
 
-			qsort(map, (end - start) / bytes, bytes, indexcmp);
+		for (i = 0; i < CPUS; i++) {
+			void *retval;
 
-			// Sorting and then copying avoids the need to
-			// write out intermediate stages of the sort.
-
-			void *map2 = mmap(NULL, end - start, PROT_READ | PROT_WRITE, MAP_SHARED, indexfd, start);
-			if (map2 == MAP_FAILED) {
-				perror("mmap (write)");
-				exit(EXIT_FAILURE);
+			int err = pthread_join(pthreads[i], &retval);
+			if (err != 0) {
+				fprintf(stderr, "pthread_join: %s\n", strerror(err));
 			}
-
-			memcpy(map2, map, end - start);
-
-			munmap(map, end - start);
-			munmap(map2, end - start);
 		}
 
 		if (nmerges != 1) {
@@ -1505,7 +1567,18 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
 		long long i;
 		long long sum = 0;
 		long long progress = 0;
+		unsigned long long prev = 0;
 		for (i = 0; i < indexpos / sizeof(struct index); i++) {
+			// Sanity check: report any entry whose index is lower than the previous one
+			if (index_map[i].index < prev) {
+				unsigned x, y;
+				decode(index_map[i].index, &x, &y);
+				double lat, lon;
+				tile2latlon(x, y, 32, &lat, &lon);
+				fprintf(stderr, "Internal error: index out of order. %llu vs %llu: %lf,%lf\n", (unsigned long long) index_map[i].index, prev, lat, lon);
+			}
+			prev = index_map[i].index;
+
 			fwrite_check(reader[index_map[i].segment].geom_map + index_map[i].start, sizeof(char), index_map[i].end - index_map[i].start, geomfile, fname);
 			sum += index_map[i].end - index_map[i].start;
 			geompos += index_map[i].end - index_map[i].start;