Merge pull request #182 from mapbox/thrashing

Reorder geometry as part of index merging, to reduce thrashing in low memory
This commit is contained in:
Eric Fischer 2016-03-08 13:41:47 -08:00
commit c387af48cc

218
geojson.c
View File

@ -308,7 +308,7 @@ static void insert(struct merge *m, struct merge **head, unsigned char *map, int
*head = m;
}
static void merge(struct merge *merges, int nmerges, unsigned char *map, FILE *f, int bytes, long long nrec) {
static void merge(struct merge *merges, int nmerges, unsigned char *map, FILE *f, int bytes, long long nrec, char *geom_map, FILE *geom_out, long long *geompos) {
int i;
struct merge *head = NULL;
long long along = 0;
@ -321,6 +321,10 @@ static void merge(struct merge *merges, int nmerges, unsigned char *map, FILE *f
}
while (head != NULL) {
struct index *ix = (struct index *) (map + head->start);
fwrite_check(geom_map + ix->start, 1, ix->end - ix->start, geom_out, "merge geometry");
*geompos += ix->end - ix->start;
fwrite_check(map + head->start, bytes, 1, f, "merge temporary");
head->start += bytes;
@ -1440,6 +1444,26 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
}
fclose(indexfile);
char geomname[strlen(tmpdir) + strlen("/geom.XXXXXXXX") + 1];
FILE *geomfile;
int geomfd;
long long geompos = 0;
struct stat geomst;
sprintf(geomname, "%s%s", tmpdir, "/geom.XXXXXXXX");
geomfd = mkstemp(geomname);
if (geomfd < 0) {
perror(geomname);
exit(EXIT_FAILURE);
}
geomfile = fopen(geomname, "wb");
if (geomfile == NULL) {
perror(geomname);
exit(EXIT_FAILURE);
}
int geomfd2;
/* Sort the index by geometry */
{
@ -1456,6 +1480,9 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
int nmerges = (indexpos + unit - 1) / unit;
struct merge merges[nmerges];
for (i = 0; i < nmerges; i++) {
merges[i].start = merges[i].end = 0;
}
pthread_t pthreads[CPUS];
struct sort_arg args[CPUS];
@ -1491,23 +1518,115 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
}
}
void *map = mmap(NULL, indexpos, PROT_READ, MAP_PRIVATE, indexfd, 0);
char *map = mmap(NULL, indexpos, PROT_READ | PROT_WRITE, MAP_SHARED, indexfd, 0);
if (map == MAP_FAILED) {
perror("mmap");
exit(EXIT_FAILURE);
}
FILE *f = fopen(indexname, "w");
/*
* This is the last opportunity to access the geometry in
* close to the original order, so that we can reorder it
* without thrashing.
*
* Each of the sorted index chunks originally had contiguous
* geography, so it can be copied relatively cheaply in sorted order
* into the temporary files that are then merged together to produce
* the final geometry.
*/
for (i = 0; i < CPUS; i++) {
reader[i].geom_map = NULL;
if (reader[i].geomst.st_size > 0) {
reader[i].geom_map = mmap(NULL, reader[i].geomst.st_size, PROT_READ, MAP_PRIVATE, reader[i].geomfd, 0);
if (reader[i].geom_map == MAP_FAILED) {
perror("mmap unsorted geometry");
exit(EXIT_FAILURE);
}
}
}
for (i = 0; i < nmerges; i++) {
if (!quiet && nmerges > 1) {
fprintf(stderr, "Reordering geometry: part %d of %d \r", i + 1, nmerges);
}
long long j;
for (j = merges[i].start; j < merges[i].end; j += sizeof(struct index)) {
struct index *ix = (struct index *) (map + j);
long long start = geompos;
fwrite_check(reader[ix->segment].geom_map + ix->start, sizeof(char), ix->end - ix->start, geomfile, fname);
geompos += ix->end - ix->start;
// Repoint the index to where we just copied the geometry
ix->start = start;
ix->end = geompos;
}
}
if (!quiet && nmerges > 1) {
fprintf(stderr, "\n");
}
fclose(geomfile);
long long pre_merged_geompos = geompos;
char *geom_map = mmap(NULL, geompos, PROT_READ, MAP_PRIVATE, geomfd, 0);
if (geom_map == MAP_FAILED) {
perror("mmap");
exit(EXIT_FAILURE);
}
FILE *f = fopen(indexname, "wb");
if (f == NULL) {
perror(indexname);
exit(EXIT_FAILURE);
}
merge(merges, nmerges, (unsigned char *) map, f, bytes, indexpos / bytes);
sprintf(geomname, "%s%s", tmpdir, "/geom.XXXXXXXX");
geomfd2 = mkstemp(geomname);
if (geomfd2 < 0) {
perror(geomname);
exit(EXIT_FAILURE);
}
geomfile = fopen(geomname, "wb");
if (geomfile == NULL) {
perror(geomname);
exit(EXIT_FAILURE);
}
for (i = 0; i < CPUS; i++) {
if (reader[i].geomst.st_size > 0) {
if (munmap(reader[i].geom_map, reader[i].geomst.st_size) != 0) {
perror("unmap unsorted geometry");
}
}
if (close(reader[i].geomfd) != 0) {
perror("close unsorted geometry");
}
}
geompos = 0;
/* initial tile is 0/0/0 */
serialize_int(geomfile, 0, &geompos, fname);
serialize_uint(geomfile, 0, &geompos, fname);
serialize_uint(geomfile, 0, &geompos, fname);
merge(merges, nmerges, (unsigned char *) map, f, bytes, indexpos / bytes, geom_map, geomfile, &geompos);
munmap(map, indexpos);
fclose(f);
close(indexfd);
munmap(geom_map, pre_merged_geompos);
close(geomfd);
/* end of tile */
serialize_byte(geomfile, -2, &geompos, fname);
fclose(geomfile);
}
indexfd = open(indexname, O_RDONLY);
@ -1697,104 +1816,17 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
}
unlink(indexname);
for (i = 0; i < CPUS; i++) {
reader[i].geom_map = NULL;
if (reader[i].geomst.st_size > 0) {
reader[i].geom_map = mmap(NULL, reader[i].geomst.st_size, PROT_READ, MAP_PRIVATE, reader[i].geomfd, 0);
if (reader[i].geom_map == MAP_FAILED) {
perror("mmap unsorted geometry");
exit(EXIT_FAILURE);
}
}
}
char geomname[strlen(tmpdir) + strlen("/geom.XXXXXXXX") + 1];
FILE *geomfile;
int geomfd;
long long geompos = 0;
struct stat geomst;
sprintf(geomname, "%s%s", tmpdir, "/geom.XXXXXXXX");
geomfd = mkstemp(geomname);
if (geomfd < 0) {
perror(geomname);
exit(EXIT_FAILURE);
}
geomfile = fopen(geomname, "wb");
if (geomfile == NULL) {
perror(geomname);
exit(EXIT_FAILURE);
}
{
geompos = 0;
/* initial tile is 0/0/0 */
serialize_int(geomfile, 0, &geompos, fname);
serialize_uint(geomfile, 0, &geompos, fname);
serialize_uint(geomfile, 0, &geompos, fname);
long long i;
long long sum = 0;
long long progress = 0;
unsigned long long prev = 0;
for (i = 0; i < indexpos / sizeof(struct index); i++) {
if (index_map[i].index < prev) {
unsigned x, y;
decode(index_map[i].index, &x, &y);
double lat, lon;
tile2latlon(x, y, 32, &lat, &lon);
fprintf(stderr, "Internal error: index out of order. %lld vs %lld: %lf,%lf\n", index_map[i].index, prev, lat, lon);
}
prev = index_map[i].index;
fwrite_check(reader[index_map[i].segment].geom_map + index_map[i].start, sizeof(char), index_map[i].end - index_map[i].start, geomfile, fname);
sum += index_map[i].end - index_map[i].start;
geompos += index_map[i].end - index_map[i].start;
long long p = 1000 * i / (indexpos / sizeof(struct index));
if (p != progress) {
if (!quiet) {
fprintf(stderr, "Reordering geometry: %3.1f%%\r", p / 10.0);
}
progress = p;
}
}
/* end of tile */
serialize_byte(geomfile, -2, &geompos, fname);
fclose(geomfile);
}
if (munmap(index_map, indexpos) != 0) {
perror("unmap sorted index");
}
for (i = 0; i < CPUS; i++) {
if (reader[i].geomst.st_size > 0) {
if (munmap(reader[i].geom_map, reader[i].geomst.st_size) != 0) {
perror("unmap unsorted geometry");
}
}
if (close(reader[i].geomfd) != 0) {
perror("close unsorted geometry");
}
}
if (close(indexfd) != 0) {
perror("close sorted index");
}
/* Traverse and split the geometries for each zoom level */
geomfd = open(geomname, O_RDONLY);
if (geomfd < 0) {
perror("reopen sorted geometry");
exit(EXIT_FAILURE);
}
unlink(geomname);
if (fstat(geomfd, &geomst) != 0) {
if (fstat(geomfd2, &geomst) != 0) {
perror("stat sorted geom\n");
exit(EXIT_FAILURE);
}
@ -1802,7 +1834,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
int fd[TEMP_FILES];
off_t size[TEMP_FILES];
fd[0] = geomfd;
fd[0] = geomfd2;
size[0] = geomst.st_size;
int j;