From 11e737ff29657101173909cab84b6172913909b3 Mon Sep 17 00:00:00 2001 From: Eric Fischer Date: Mon, 7 Mar 2016 16:14:30 -0800 Subject: [PATCH 1/3] Reorder geometry as part of the merge --- geojson.c | 192 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 110 insertions(+), 82 deletions(-) diff --git a/geojson.c b/geojson.c index 85c4916..66fbf7a 100644 --- a/geojson.c +++ b/geojson.c @@ -308,7 +308,7 @@ static void insert(struct merge *m, struct merge **head, unsigned char *map, int *head = m; } -static void merge(struct merge *merges, int nmerges, unsigned char *map, FILE *f, int bytes, long long nrec) { +static void merge(struct merge *merges, int nmerges, unsigned char *map, FILE *f, int bytes, long long nrec, char *geom_map, FILE *geom_out, long long *geompos) { int i; struct merge *head = NULL; long long along = 0; @@ -321,6 +321,10 @@ static void merge(struct merge *merges, int nmerges, unsigned char *map, FILE *f } while (head != NULL) { + struct index *ix = (struct index *) (map + head->start); + fwrite_check(geom_map + ix->start, 1, ix->end - ix->start, geom_out, "merge geometry"); + *geompos += ix->end - ix->start; + fwrite_check(map + head->start, bytes, 1, f, "merge temporary"); head->start += bytes; @@ -1440,6 +1444,26 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max } fclose(indexfile); + char geomname[strlen(tmpdir) + strlen("/geom.XXXXXXXX") + 1]; + + FILE *geomfile; + int geomfd; + long long geompos = 0; + struct stat geomst; + + sprintf(geomname, "%s%s", tmpdir, "/geom.XXXXXXXX"); + geomfd = mkstemp(geomname); + if (geomfd < 0) { + perror(geomname); + exit(EXIT_FAILURE); + } + geomfile = fopen(geomname, "wb"); + if (geomfile == NULL) { + perror(geomname); + exit(EXIT_FAILURE); + } + int geomfd2; + /* Sort the index by geometry */ { @@ -1456,6 +1480,9 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max int nmerges = (indexpos + unit - 1) / unit; struct merge merges[nmerges]; + for (i = 0; i < nmerges; i++) { + merges[i].start = merges[i].end = 0; + } pthread_t pthreads[CPUS]; struct sort_arg args[CPUS]; @@ -1491,23 +1518,101 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max } } - void *map = mmap(NULL, indexpos, PROT_READ, MAP_PRIVATE, indexfd, 0); + char *map = mmap(NULL, indexpos, PROT_READ | PROT_WRITE, MAP_SHARED, indexfd, 0); if (map == MAP_FAILED) { perror("mmap"); exit(EXIT_FAILURE); } + /* + * This is the last opportunity to access the geometry in + * close to the original order, so that we can reorder it + * without thrashing. + * + * Each of the sorted index chunks originally had contiguous + * geography, so it can be copied relatively cheaply in sorted order + * into the temporary files that are then merged together to produce + * the final geometry. + */ + + for (i = 0; i < CPUS; i++) { + reader[i].geom_map = NULL; + + if (reader[i].geomst.st_size > 0) { + reader[i].geom_map = mmap(NULL, reader[i].geomst.st_size, PROT_READ, MAP_PRIVATE, reader[i].geomfd, 0); + if (reader[i].geom_map == MAP_FAILED) { + perror("mmap unsorted geometry"); + exit(EXIT_FAILURE); + } + } + } + + for (i = 0; i < nmerges; i++) { + if (!quiet) { + fprintf(stderr, "Reordering geometry: part %d of %d \r", i, nmerges); + } + + long long j; + for (j = merges[i].start; j < merges[i].end; j += sizeof(struct index)) { + struct index *ix = (struct index *) (map + j); + long long start = geompos; + + fwrite_check(reader[ix->segment].geom_map + ix->start, sizeof(char), ix->end - ix->start, geomfile, fname); + geompos += ix->end - ix->start; + + // Repoint the index to where we just copied the geometry + + ix->start = start; + ix->end = geompos; + } + } + + fclose(geomfile); + + long long pre_merged_geompos = geompos; + char *geom_map = mmap(NULL, geompos, PROT_READ, MAP_PRIVATE, geomfd, 0); + if (geom_map == MAP_FAILED) { + perror("mmap"); + exit(EXIT_FAILURE); + } + FILE *f = fopen(indexname, "w"); if (f == NULL) { perror(indexname); exit(EXIT_FAILURE); } - merge(merges, nmerges, (unsigned char *) map, f, bytes, indexpos / bytes); + sprintf(geomname, "%s%s", tmpdir, "/geom.XXXXXXXX"); + geomfd2 = mkstemp(geomname); + if (geomfd2 < 0) { + perror(geomname); + exit(EXIT_FAILURE); + } + geomfile = fopen(geomname, "wb"); + if (geomfile == NULL) { + perror(geomname); + exit(EXIT_FAILURE); + } + + geompos = 0; + + /* initial tile is 0/0/0 */ + serialize_int(geomfile, 0, &geompos, fname); + serialize_uint(geomfile, 0, &geompos, fname); + serialize_uint(geomfile, 0, &geompos, fname); + + merge(merges, nmerges, (unsigned char *) map, f, bytes, indexpos / bytes, geom_map, geomfile, &geompos); munmap(map, indexpos); fclose(f); close(indexfd); + + munmap(geom_map, pre_merged_geompos); + close(geomfd); + + /* end of tile */ + serialize_byte(geomfile, -2, &geompos, fname); + fclose(geomfile); } indexfd = open(indexname, O_RDONLY); @@ -1697,77 +1802,6 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max } unlink(indexname); - for (i = 0; i < CPUS; i++) { - reader[i].geom_map = NULL; - - if (reader[i].geomst.st_size > 0) { - reader[i].geom_map = mmap(NULL, reader[i].geomst.st_size, PROT_READ, MAP_PRIVATE, reader[i].geomfd, 0); - if (reader[i].geom_map == MAP_FAILED) { - perror("mmap unsorted geometry"); - exit(EXIT_FAILURE); - } - } - } - - char geomname[strlen(tmpdir) + strlen("/geom.XXXXXXXX") + 1]; - - FILE *geomfile; - int geomfd; - long long geompos = 0; - struct stat geomst; - - sprintf(geomname, "%s%s", tmpdir, "/geom.XXXXXXXX"); - geomfd = mkstemp(geomname); - if (geomfd < 0) { - perror(geomname); - exit(EXIT_FAILURE); - } - geomfile = fopen(geomname, "wb"); - if (geomfile == NULL) { - perror(geomname); - exit(EXIT_FAILURE); - } - - { - geompos = 0; - - /* initial tile is 0/0/0 */ - serialize_int(geomfile, 0, &geompos, fname); - serialize_uint(geomfile, 0, &geompos, fname); - serialize_uint(geomfile, 0, &geompos, fname); - - long long i; - long long sum = 0; - long long progress = 0; - unsigned long long prev = 0; - for (i = 0; i < indexpos / sizeof(struct index); i++) { - if (index_map[i].index < prev) { - unsigned x, y; - decode(index_map[i].index, &x, &y); - double lat, lon; - tile2latlon(x, y, 32, &lat, &lon); - fprintf(stderr, "Internal error: index out of order. %lld vs %lld: %lf,%lf\n", index_map[i].index, prev, lat, lon); - } - prev = index_map[i].index; - - fwrite_check(reader[index_map[i].segment].geom_map + index_map[i].start, sizeof(char), index_map[i].end - index_map[i].start, geomfile, fname); - sum += index_map[i].end - index_map[i].start; - geompos += index_map[i].end - index_map[i].start; - - long long p = 1000 * i / (indexpos / sizeof(struct index)); - if (p != progress) { - if (!quiet) { - fprintf(stderr, "Reordering geometry: %3.1f%%\r", p / 10.0); - } - progress = p; - } - } - - /* end of tile */ - serialize_byte(geomfile, -2, &geompos, fname); - fclose(geomfile); - } - if (munmap(index_map, indexpos) != 0) { perror("unmap sorted index"); } @@ -1788,13 +1822,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max /* Traverse and split the geometries for each zoom level */ - geomfd = open(geomname, O_RDONLY); - if (geomfd < 0) { - perror("reopen sorted geometry"); - exit(EXIT_FAILURE); - } - unlink(geomname); - if (fstat(geomfd, &geomst) != 0) { + if (fstat(geomfd2, &geomst) != 0) { perror("stat sorted geom\n"); exit(EXIT_FAILURE); } @@ -1802,7 +1830,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max int fd[TEMP_FILES]; off_t size[TEMP_FILES]; - fd[0] = geomfd; + fd[0] = geomfd2; size[0] = geomst.st_size; int j; From bf585a584971a2a145e7730a2a43256284fa8725 Mon Sep 17 00:00:00 2001 From: Eric Fischer Date: Mon, 7 Mar 2016 16:38:21 -0800 Subject: [PATCH 2/3] Add newline to clean up progress messages --- geojson.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/geojson.c b/geojson.c index 66fbf7a..462b2ad 100644 --- a/geojson.c +++ b/geojson.c @@ -1566,6 +1566,9 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max ix->end = geompos; } } + if (!quiet) { + fprintf(stderr, "\n"); + } fclose(geomfile); From cf5082122a60fd5c4bbf4b341d0f6b45faee8e93 Mon Sep 17 00:00:00 2001 From: Eric Fischer Date: Mon, 7 Mar 2016 16:43:10 -0800 Subject: [PATCH 3/3] Close original geometry temp files as soon as they are no longer needed --- geojson.c | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/geojson.c b/geojson.c index 462b2ad..d1f024e 100644 --- a/geojson.c +++ b/geojson.c @@ -1548,8 +1548,8 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max } for (i = 0; i < nmerges; i++) { - if (!quiet) { - fprintf(stderr, "Reordering geometry: part %d of %d \r", i, nmerges); + if (!quiet && nmerges > 1) { + fprintf(stderr, "Reordering geometry: part %d of %d \r", i + 1, nmerges); } long long j; @@ -1566,7 +1566,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max ix->end = geompos; } } - if (!quiet) { + if (!quiet && nmerges > 1) { fprintf(stderr, "\n"); } @@ -1579,7 +1579,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max exit(EXIT_FAILURE); } - FILE *f = fopen(indexname, "w"); + FILE *f = fopen(indexname, "wb"); if (f == NULL) { perror(indexname); exit(EXIT_FAILURE); @@ -1597,6 +1597,17 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max exit(EXIT_FAILURE); } + for (i = 0; i < CPUS; i++) { + if (reader[i].geomst.st_size > 0) { + if (munmap(reader[i].geom_map, reader[i].geomst.st_size) != 0) { + perror("unmap unsorted geometry"); + } + } + if (close(reader[i].geomfd) != 0) { + perror("close unsorted geometry"); + } + } + geompos = 0; /* initial tile is 0/0/0 */ @@ -1809,16 +1820,6 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max perror("unmap sorted index"); } - for (i = 0; i < CPUS; i++) { - if (reader[i].geomst.st_size > 0) { - if (munmap(reader[i].geom_map, reader[i].geomst.st_size) != 0) { - perror("unmap unsorted geometry"); - } - } - if (close(reader[i].geomfd) != 0) { - perror("close unsorted geometry"); - } - } if (close(indexfd) != 0) { perror("close sorted index"); }