Use multiple threads to sort the features

This commit is contained in:
Eric Fischer 2016-01-04 13:39:34 -08:00
parent b39a1714c0
commit 2b378ceb9f

125
geojson.c

@ -844,6 +844,64 @@ struct reader {
char *geom_map;
};
// Per-thread argument block handed to run_sort() through pthread_create().
// Each sorting thread receives its own copy; run_sort() only reads these
// fields, writing solely through the `merges` pointer.
struct sort_arg {
// Number of this worker; run_sort() starts at chunk `task`.
int task;
// Total number of workers; run_sort() advances by `cpus` chunks per pass.
int cpus;
// Upper bound (as a byte offset into the index file) of the data to sort.
long long indexpos;
// Array with one entry per chunk; run_sort() fills in start/end/next for
// each chunk it sorts, indexed by chunk number (offset / unit).
struct merge *merges;
// File descriptor of the index file that gets mmap()ed and sorted.
int indexfd;
// Total number of chunks; used for the progress message, which is
// suppressed when there is only one chunk.
int nmerges;
// Size of one chunk in bytes; also used as the mmap() offset granularity.
long long unit;
// Size in bytes of one record, passed to qsort() as the element size.
int bytes;
};
void *run_sort(void *v) {
struct sort_arg *a = v;
long long start;
for (start = a->task * a->unit; start < a->indexpos; start += a->unit * a->cpus) {
long long end = start + a->unit;
if (end > a->indexpos) {
end = a->indexpos;
}
if (a->nmerges != 1) {
if (!quiet) {
fprintf(stderr, "Sorting part %lld of %d \r", start / a->unit + 1, a->nmerges);
}
}
a->merges[start / a->unit].start = start;
a->merges[start / a->unit].end = end;
a->merges[start / a->unit].next = NULL;
// MAP_PRIVATE to avoid disk writes if it fits in memory
void *map = mmap(NULL, end - start, PROT_READ | PROT_WRITE, MAP_PRIVATE, a->indexfd, start);
if (map == MAP_FAILED) {
perror("mmap");
exit(EXIT_FAILURE);
}
qsort(map, (end - start) / a->bytes, a->bytes, indexcmp);
// Sorting and then copying avoids disk access to
// write out intermediate stages of the sort.
void *map2 = mmap(NULL, end - start, PROT_READ | PROT_WRITE, MAP_SHARED, a->indexfd, start);
if (map2 == MAP_FAILED) {
perror("mmap (write)");
exit(EXIT_FAILURE);
}
memcpy(map2, map, end - start);
munmap(map, end - start);
munmap(map2, end - start);
}
return NULL;
}
int read_json(int argc, char **argv, char *fname, const char *layername, int maxzoom, int minzoom, int basezoom, double basezoom_marker_width, sqlite3 *outdb, struct pool *exclude, struct pool *include, int exclude_all, double droprate, int buffer, const char *tmpdir, double gamma, char *prevent, char *additional, int read_parallel) {
int ret = EXIT_SUCCESS;
@ -1187,6 +1245,12 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
fprintf(stderr, "Sorting %lld features\n", (long long) indexpos / bytes);
}
// XXX On machines with different page sizes, doing the sorting
// in different-sized chunks can cause features with the same
// index (i.e., the same bbox) to appear in different orders
// because the sort is unstable. This doesn't seem worth spending
// more memory to fix, but could make tests unstable.
int page = sysconf(_SC_PAGESIZE);
long long unit = (50 * 1024 * 1024 / bytes) * bytes;
while (unit % page != 0) {
@ -1196,45 +1260,32 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
int nmerges = (indexpos + unit - 1) / unit;
struct merge merges[nmerges];
long long start;
for (start = 0; start < indexpos; start += unit) {
long long end = start + unit;
if (end > indexpos) {
end = indexpos;
}
pthread_t pthreads[CPUS];
struct sort_arg args[CPUS];
if (nmerges != 1) {
if (!quiet) {
fprintf(stderr, "Sorting part %lld of %d\r", start / unit + 1, nmerges);
}
}
int i;
for (i = 0; i < CPUS; i++) {
args[i].task = i;
args[i].cpus = CPUS;
args[i].indexpos = indexpos;
args[i].merges = merges;
args[i].indexfd = indexfd;
args[i].nmerges = nmerges;
args[i].unit = unit;
args[i].bytes = bytes;
merges[start / unit].start = start;
merges[start / unit].end = end;
merges[start / unit].next = NULL;
// MAP_PRIVATE to avoid disk writes if it fits in memory
void *map = mmap(NULL, end - start, PROT_READ | PROT_WRITE, MAP_PRIVATE, indexfd, start);
if (map == MAP_FAILED) {
perror("mmap");
if (pthread_create(&pthreads[i], NULL, run_sort, &args[i]) != 0) {
perror("pthread_create");
exit(EXIT_FAILURE);
}
}
qsort(map, (end - start) / bytes, bytes, indexcmp);
for (i = 0; i < CPUS; i++) {
void *retval;
// Sorting and then copying avoids the need to
// write out intermediate stages of the sort.
void *map2 = mmap(NULL, end - start, PROT_READ | PROT_WRITE, MAP_SHARED, indexfd, start);
if (map2 == MAP_FAILED) {
perror("mmap (write)");
exit(EXIT_FAILURE);
if (pthread_join(pthreads[i], &retval) != 0) {
perror("pthread_join");
}
memcpy(map2, map, end - start);
munmap(map, end - start);
munmap(map2, end - start);
}
if (nmerges != 1) {
@ -1505,7 +1556,17 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
long long i;
long long sum = 0;
long long progress = 0;
unsigned long long prev = 0;
for (i = 0; i < indexpos / sizeof(struct index); i++) {
if (index_map[i].index < prev) {
unsigned x, y;
decode(index_map[i].index, &x, &y);
double lat, lon;
tile2latlon(x, y, 32, &lat, &lon);
fprintf(stderr, "Internal error: index out of order. %lld vs %lld: %lf,%lf\n", index_map[i].index, prev, lat, lon);
}
prev = index_map[i].index;
fwrite_check(reader[index_map[i].segment].geom_map + index_map[i].start, sizeof(char), index_map[i].end - index_map[i].start, geomfile, fname);
sum += index_map[i].end - index_map[i].start;
geompos += index_map[i].end - index_map[i].start;