Bring over the multistage sort from datamaps to avoid Mac crashes

This commit is contained in:
Eric Fischer 2014-11-03 22:47:41 -08:00
parent 3f9f50258c
commit 3e3fdcbab4

169
geojson.c
View File

@ -65,9 +65,21 @@ int indexcmp(const void *v1, const void *v2) {
return -1;
} else if (i1->index > i2->index) {
return 1;
} else {
return 0;
}
if (i1->fpos < i2->fpos) {
return -1;
} else if (i1->fpos > i2->fpos) {
return 1;
}
if (i1->maxzoom < i2->maxzoom) {
return -1;
} else if (i1->maxzoom > i2->maxzoom) {
return 1;
}
return 0;
}
size_t fwrite_check(const void *ptr, size_t size, size_t nitems, FILE *stream, char *fname, json_pull *source) {
@ -180,6 +192,10 @@ void check(struct index *ix, long long n, char *metabase, unsigned *file_bbox, s
for (z = maxzoom; z >= minzoom; z--) {
struct index *i, *j = NULL;
for (i = ix; i < ix + n && i != NULL; i = j) {
if (i > ix && indexcmp(i - 1, i) > 0) {
fprintf(stderr, "index out of order\n");
exit(EXIT_FAILURE);
}
unsigned wx, wy;
decode(i->index, &wx, &wy);
@ -221,6 +237,55 @@ void check(struct index *ix, long long n, char *metabase, unsigned *file_bbox, s
fprintf(stderr, "\n");
}
struct merge {
long long start;
long long end;
struct merge *next;
};
static void insert(struct merge *m, struct merge **head, unsigned char *map, int bytes) {
while (*head != NULL && indexcmp(map + m->start, map + (*head)->start) > 0) {
head = &((*head)->next);
}
m->next = *head;
*head = m;
}
static void merge(struct merge *merges, int nmerges, unsigned char *map, FILE *f, int bytes, long long nrec) {
int i;
struct merge *head = NULL;
long long along = 0;
long long reported = -1;
for (i = 0; i < nmerges; i++) {
if (merges[i].start < merges[i].end) {
insert(&(merges[i]), &head, map, bytes);
}
}
while (head != NULL) {
fwrite(map + head->start, bytes, 1, f);
head->start += bytes;
struct merge *m = head;
head = m->next;
m->next = NULL;
if (m->start < m->end) {
insert(m, &head, map, bytes);
}
along++;
long long report = 100 * along / nrec;
if (report != reported) {
fprintf(stderr, "Merging: %lld%%\r", report);
reported = report;
}
}
}
void read_json(FILE *f, char *fname, char *layername, int maxzoom, int minzoom, sqlite3 *outdb, struct pool *exclude, int exclude_all, double droprate, int buffer) {
char metaname[] = "/tmp/meta.XXXXXXXX";
char indexname[] = "/tmp/index.XXXXXXXX";
@ -421,11 +486,6 @@ next_feature:
struct stat indexst;
fstat(indexfd, &indexst);
struct index *index = mmap(NULL, indexst.st_size, PROT_READ | PROT_WRITE, MAP_PRIVATE, indexfd, 0);
if (index == MAP_FAILED) {
perror("mmap index");
exit(EXIT_FAILURE);
}
struct stat metast;
fstat(metafd, &metast);
@ -468,7 +528,100 @@ next_feature:
printf("using layer name %s\n", trunc);
}
qsort(index, indexst.st_size / sizeof(struct index), sizeof(struct index), indexcmp);
{
int bytes = sizeof(struct index);
fprintf(stderr,
"Sorting %lld indices for %lld features\n",
(long long) indexst.st_size / bytes,
seq);
int page = sysconf(_SC_PAGESIZE);
long long unit = (50 * 1024 * 1024 / bytes) * bytes;
while (unit % page != 0) {
unit += bytes;
}
int nmerges = (indexst.st_size + unit - 1) / unit;
struct merge merges[nmerges];
long long start;
for (start = 0; start < indexst.st_size; start += unit) {
long long end = start + unit;
if (end > indexst.st_size) {
end = indexst.st_size;
}
if (nmerges != 1) {
fprintf(stderr, "Sorting part %lld of %d\r", start / unit + 1, nmerges);
}
merges[start / unit].start = start;
merges[start / unit].end = end;
merges[start / unit].next = NULL;
void *map = mmap(NULL, end - start, PROT_READ | PROT_WRITE, MAP_PRIVATE, indexfd, start);
if (map == MAP_FAILED) {
perror("mmap");
exit(EXIT_FAILURE);
}
qsort(map, (end - start) / bytes, bytes, indexcmp);
// Sorting and then copying avoids the need to
// write out intermediate stages of the sort.
void *map2 = mmap(NULL, end - start, PROT_READ | PROT_WRITE, MAP_SHARED, indexfd, start);
if (map2 == MAP_FAILED) {
perror("mmap (write)");
exit(EXIT_FAILURE);
}
memcpy(map2, map, end - start);
munmap(map, end - start);
munmap(map2, end - start);
}
if (nmerges != 1) {
fprintf(stderr, "\n");
}
void *map = mmap(NULL, indexst.st_size, PROT_READ | PROT_WRITE, MAP_PRIVATE, indexfd, 0);
if (map == MAP_FAILED) {
perror("mmap");
exit(EXIT_FAILURE);
}
FILE *f = fopen(indexname, "w");
if (f == NULL) {
perror(indexname);
exit(EXIT_FAILURE);
}
merge(merges, nmerges, map, f, bytes, indexst.st_size / bytes);
munmap(map, indexst.st_size);
fclose(f);
close(indexfd);
}
indexfd = open(indexname, O_RDONLY);
if (indexfd < 0) {
perror(indexname);
exit(EXIT_FAILURE);
}
if (unlink(indexname) != 0) {
perror(indexname);
exit(EXIT_FAILURE);
}
struct index *index = mmap(NULL, indexst.st_size, PROT_READ, MAP_PRIVATE, indexfd, 0);
if (index == MAP_FAILED) {
perror("mmap index");
exit(EXIT_FAILURE);
}
check(index, indexst.st_size / sizeof(struct index), meta, file_bbox, &file_keys, &midx, &midy, layername, maxzoom, minzoom, outdb, droprate, buffer);
munmap(index, indexst.st_size);