From 986719f2ff247f873a96af820df4cda4ca03406e Mon Sep 17 00:00:00 2001 From: Eric Fischer Date: Mon, 4 Apr 2016 10:53:53 -0700 Subject: [PATCH] Rework sorting/merging progress indicator --- geojson.c | 59 ++++++++++++++++++++++++++++--------------------------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/geojson.c b/geojson.c index 132c176..c2e8023 100644 --- a/geojson.c +++ b/geojson.c @@ -323,11 +323,9 @@ static void insert(struct merge *m, struct merge **head, unsigned char *map) { *head = m; } -static void merge(struct merge *merges, int nmerges, unsigned char *map, FILE *f, int bytes, long long nrec, char *geom_map, FILE *geom_out, long long *geompos) { +static void merge(struct merge *merges, int nmerges, unsigned char *map, FILE *f, int bytes, long long nrec, char *geom_map, FILE *geom_out, long long *geompos, long long *along, long long *reported, long long geom_total) { int i; struct merge *head = NULL; - long long along = 0; - long long reported = -1; for (i = 0; i < nmerges; i++) { if (merges[i].start < merges[i].end) { @@ -340,6 +338,13 @@ static void merge(struct merge *merges, int nmerges, unsigned char *map, FILE *f fwrite_check(geom_map + ix->start, 1, ix->end - ix->start, geom_out, "merge geometry"); *geompos += ix->end - ix->start; + // Count this as a half-accomplishment, since we already half-counted it + *along += (ix->end - ix->start) / 2; + if (!quiet && 100 * *along / geom_total != *reported) { + fprintf(stderr, "Reordering geometry: %lld%% \r", 100 * *along / geom_total); + *reported = 100 * *along / geom_total; + } + fwrite_check(map + head->start, bytes, 1, f, "merge temporary"); head->start += bytes; @@ -350,15 +355,6 @@ static void merge(struct merge *merges, int nmerges, unsigned char *map, FILE *f if (m->start < m->end) { insert(m, &head, map); } - - along++; - long long report = 100 * along / nrec; - if (report != reported) { - if (!quiet) { - fprintf(stderr, "Merging: %lld%%\r", report); - } - reported = report; - } } } @@ -893,12 +889,6 @@ void *run_sort(void *v) { end = a->indexpos; } - if (a->nmerges != 1) { - if (!quiet) { - fprintf(stderr, "Sorting part %lld of %d \r", start / a->unit + 1, a->nmerges); - } - } - a->merges[start / a->unit].start = start; a->merges[start / a->unit].end = end; a->merges[start / a->unit].next = NULL; @@ -1095,7 +1085,7 @@ void start_parsing(int fd, FILE *fp, long long offset, long long len, volatile i } } -void radix1(int *geomfds_in, int *indexfds_in, int inputs, int prefix, int splits, long long mem, const char *tmpdir, int availfiles, FILE *geomfile, FILE *indexfile, long long *geompos_out) { +void radix1(int *geomfds_in, int *indexfds_in, int inputs, int prefix, int splits, long long mem, const char *tmpdir, int availfiles, FILE *geomfile, FILE *indexfile, long long *geompos_out, long long geom_total, long long *reported) { // Arranged as bits to facilitate subdividing again if a subdivided file is still huge int splitbits = log(splits) / log(2); splits = 1 << splitbits; @@ -1106,8 +1096,6 @@ void radix1(int *geomfds_in, int *indexfds_in, int inputs, int prefix, int split int indexfds[splits]; long long geompos[splits]; - fprintf(stderr, "prefix %d, splits %d, splitbits %d\n", prefix, splits, splitbits); - int i; for (i = 0; i < splits; i++) { geompos[i] = 0; @@ -1145,6 +1133,8 @@ void radix1(int *geomfds_in, int *indexfds_in, int inputs, int prefix, int split unlink(indexname); } + long long along = *geompos_out; + for (i = 0; i < inputs; i++) { struct stat geomst, indexst; if (fstat(geomfds_in[i], &geomst) < 0) { @@ -1181,6 +1171,13 @@ void radix1(int *geomfds_in, int *indexfds_in, int inputs, int prefix, int split fwrite_check(geommap + ix.start, ix.end - ix.start, 1, geomfiles[which], "geom"); geompos[which] += ix.end - ix.start; + // Count this as a half-accomplishment, since we will copy again + along += (ix.end - ix.start) / 2; + if (!quiet && 100 * along / geom_total != *reported) { + fprintf(stderr, "Reordering geometry: %lld%% \r", 100 * along / geom_total); + *reported = 100 * along / geom_total; + } + ix.start = pos; ix.end = geompos[which]; @@ -1250,9 +1247,6 @@ void radix1(int *geomfds_in, int *indexfds_in, int inputs, int prefix, int split long long indexpos = indexst.st_size; int bytes = sizeof(struct index); - if (!quiet) { - fprintf(stderr, "Sorting %lld features\n", (long long) indexpos / bytes); - } int page = sysconf(_SC_PAGESIZE); long long unit = (50 * 1024 * 1024 / bytes) * bytes; @@ -1308,7 +1302,7 @@ void radix1(int *geomfds_in, int *indexfds_in, int inputs, int prefix, int split exit(EXIT_FAILURE); } - merge(merges, nmerges, (unsigned char *) indexmap, indexfile, bytes, indexpos / bytes, geommap, geomfile, geompos); + merge(merges, nmerges, (unsigned char *) indexmap, indexfile, bytes, indexpos / bytes, geommap, geomfile, geompos, &along, reported, geom_total); } else if (indexst.st_size == sizeof(struct index) || prefix + splitbits >= 64) { long long a; for (a = 0; a < indexst.st_size / sizeof(struct index); a++) { @@ -1323,7 +1317,7 @@ void radix1(int *geomfds_in, int *indexfds_in, int inputs, int prefix, int split fwrite_check(&ix, sizeof(struct index), 1, indexfile, "index"); } } else { - radix1(&geomfds[i], &indexfds[i], 1, prefix + splitbits, availfiles / 4, mem, tmpdir, availfiles, geomfile, indexfile, geompos_out); + radix1(&geomfds[i], &indexfds[i], 1, prefix + splitbits, availfiles / 4, mem, tmpdir, availfiles, geomfile, indexfile, geompos_out, geom_total, reported); already_closed = 1; } @@ -1390,8 +1384,6 @@ void radix(struct reader *reader, int nreaders, FILE *geomfile, int geomfd, FILE mem = (long long) pages * pagesize; #endif - printf("you have %lld files and %lld memory\n", (long long) rl.rlim_cur, mem); - long long availfiles = rl.rlim_cur - 2 * nreaders // each reader has a geom and an index - 4 // pool, meta, mbtiles, mbtiles journal - 4 // top-level geom and index output, both FILE and fd @@ -1404,15 +1396,24 @@ void radix(struct reader *reader, int nreaders, FILE *geomfile, int geomfd, FILE // is to keep from thrashing by working on chunks that will fit in memory. mem /= 2; + long long geom_total = 0; int geomfds[nreaders]; int indexfds[nreaders]; int i; for (i = 0; i < nreaders; i++) { geomfds[i] = reader[i].geomfd; indexfds[i] = reader[i].indexfd; + + struct stat geomst; + if (fstat(reader[i].geomfd, &geomst) < 0) { + perror("stat geom"); + exit(EXIT_FAILURE); + } + geom_total += geomst.st_size; } - radix1(geomfds, indexfds, nreaders, 0, splits, mem, tmpdir, availfiles, geomfile, indexfile, geompos); + long long reported = -1; + radix1(geomfds, indexfds, nreaders, 0, splits, mem, tmpdir, availfiles, geomfile, indexfile, geompos, geom_total, &reported); } int read_json(int argc, struct source **sourcelist, char *fname, const char *layername, int maxzoom, int minzoom, int basezoom, double basezoom_marker_width, sqlite3 *outdb, struct pool *exclude, struct pool *include, int exclude_all, double droprate, int buffer, const char *tmpdir, double gamma, int *prevent, int *additional, int read_parallel, int forcetable) {