Merge pull request #150 from mapbox/stream-parallel

Chunked parallel reading of input streams
This commit is contained in:
Eric Fischer 2016-01-12 14:29:39 -08:00
commit 45043922e7
5 changed files with 280 additions and 83 deletions

View File

@ -1,3 +1,8 @@
## 1.7.0
* Parallel processing of input with -P works with streamed input too
* Error handling if unsupported options are given to -p or -a
## 1.6.4
* Fix crashing bug when layers are being merged with -l

View File

@ -68,7 +68,7 @@ Options
* -P: Use multiple threads to read different parts of each input file at once.
This will only work if the input is line-delimited JSON with each Feature on its
own line, because it knows nothing of the top-level structure around the Features.
In addition, it only works if the input is a named file that can be mapped into memory
Performance will be better if the input is a named file that can be mapped into memory
rather than a stream that can only be read sequentially.
### Zoom levels and resolution

352
geojson.c
View File

@ -69,6 +69,11 @@ void init_cpus() {
CPUS = 1;
}
// Guard against short struct index.segment
if (CPUS > 32767) {
CPUS = 32767;
}
// Round down to a power of 2
CPUS = 1 << (int) (log(CPUS) / log(2));
@ -274,7 +279,8 @@ struct index {
long long start;
long long end;
unsigned long long index;
int segment;
short segment;
unsigned long long seq : (64 - 16); // pack with segment to stay in 32 bytes
};
int indexcmp(const void *v1, const void *v2) {
@ -287,6 +293,12 @@ int indexcmp(const void *v1, const void *v2) {
return 1;
}
if (i1->seq < i2->seq) {
return -1;
} else if (i1->seq > i2->seq) {
return 1;
}
return 0;
}
@ -599,6 +611,7 @@ int serialize_geometry(json_object *geometry, json_object *properties, const cha
index.start = geomstart;
index.end = *geompos;
index.segment = segment;
index.seq = *layer_seq;
// Calculate the center even if off the edge of the plane,
// and then mask to bring it back into the addressable area
@ -741,11 +754,6 @@ void parse_json(json_pull *jp, const char *reading, long long *layer_seq, volati
/* XXX check for any non-features in the outer object */
}
if (!quiet) {
fprintf(stderr, " \r");
// (stderr, "Read 10000.00 million features\r", *progress_seq / 1000000.0);
}
}
struct parse_json_args {
@ -902,6 +910,168 @@ void *run_sort(void *v) {
return NULL;
}
void do_read_parallel(char *map, long long len, long long initial_offset, const char *reading, struct reader *reader, volatile long long *progress_seq, struct pool *exclude, struct pool *include, int exclude_all, char *fname, int maxzoom, int basezoom, int source, int nlayers, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y) {
long long segs[CPUS + 1];
segs[0] = 0;
segs[CPUS] = len;
int i;
for (i = 1; i < CPUS; i++) {
segs[i] = len * i / CPUS;
while (segs[i] < len && map[segs[i]] != '\n') {
segs[i]++;
}
}
long long layer_seq[CPUS];
for (i = 0; i < CPUS; i++) {
// To preserve feature ordering, unique id for each segment
// begins with that segment's offset into the input
layer_seq[i] = segs[i] + initial_offset;
}
struct parse_json_args pja[CPUS];
pthread_t pthreads[CPUS];
for (i = 0; i < CPUS; i++) {
pja[i].jp = json_begin_map(map + segs[i], segs[i + 1] - segs[i]);
pja[i].reading = reading;
pja[i].layer_seq = &layer_seq[i];
pja[i].progress_seq = progress_seq;
pja[i].metapos = &reader[i].metapos;
pja[i].geompos = &reader[i].geompos;
pja[i].indexpos = &reader[i].indexpos;
pja[i].exclude = exclude;
pja[i].include = include;
pja[i].exclude_all = exclude_all;
pja[i].metafile = reader[i].metafile;
pja[i].geomfile = reader[i].geomfile;
pja[i].indexfile = reader[i].indexfile;
pja[i].poolfile = reader[i].poolfile;
pja[i].treefile = reader[i].treefile;
pja[i].fname = fname;
pja[i].maxzoom = maxzoom;
pja[i].basezoom = basezoom;
pja[i].layer = source < nlayers ? source : 0;
pja[i].droprate = droprate;
pja[i].file_bbox = reader[i].file_bbox;
pja[i].segment = i;
pja[i].initialized = &initialized[i];
pja[i].initial_x = &initial_x[i];
pja[i].initial_y = &initial_y[i];
if (pthread_create(&pthreads[i], NULL, run_parse_json, &pja[i]) != 0) {
perror("pthread_create");
exit(EXIT_FAILURE);
}
}
for (i = 0; i < CPUS; i++) {
void *retval;
if (pthread_join(pthreads[i], &retval) != 0) {
perror("pthread_join");
}
free(pja[i].jp->source);
json_end(pja[i].jp);
}
}
// Argument bundle handed to run_read_parallel. start_parsing allocates and
// fills one of these; run_read_parallel frees it when it is done.
struct read_parallel_arg {
// Descriptor that run_read_parallel passes to fstat and mmap
int fd;
// Stream that run_read_parallel closes once parsing is finished
FILE *fp;
// Passed to do_read_parallel as initial_offset
long long offset;
// Expected size in bytes; run_read_parallel compares it with st_size
// and then overwrites it with st_size
long long len;
// Set to 1 by start_parsing, cleared to 0 by run_read_parallel on completion
volatile int *is_parsing;
// Everything below is forwarded unchanged to do_read_parallel
const char *reading;
struct reader *reader;
volatile long long *progress_seq;
struct pool *exclude;
struct pool *include;
int exclude_all;
char *fname;
int maxzoom;
int basezoom;
int source;
int nlayers;
double droprate;
int *initialized;
unsigned *initial_x;
unsigned *initial_y;
};
/*
 * Thread entry point: map one chunk file and parse it in parallel.
 *
 * v points to a struct read_parallel_arg, which this function frees before
 * returning. The file behind a->fd is mapped read-only, handed to
 * do_read_parallel, and unmapped; a->fp is then closed and *a->is_parsing
 * is cleared to 0.
 *
 * Always returns NULL. Exits the process if the file cannot be mapped.
 */
void *run_read_parallel(void *v) {
	struct read_parallel_arg *a = v;

	struct stat st;
	if (fstat(a->fd, &st) != 0) {
		// After a failed fstat the contents of st are indeterminate and
		// must not be read, so keep the byte count the caller supplied.
		perror("stat read temp");
	} else {
		if (a->len != st.st_size) {
			fprintf(stderr, "wrong number of bytes in temporary: %lld vs %lld\n", a->len, (long long) st.st_size);
		}
		// Trust the size the file actually has over the expected one
		a->len = st.st_size;
	}

	char *map = mmap(NULL, a->len, PROT_READ, MAP_PRIVATE, a->fd, 0);
	if (map == NULL || map == MAP_FAILED) {
		perror("map intermediate input");
		exit(EXIT_FAILURE);
	}

	do_read_parallel(map, a->len, a->offset, a->reading, a->reader, a->progress_seq, a->exclude, a->include, a->exclude_all, a->fname, a->maxzoom, a->basezoom, a->source, a->nlayers, a->droprate, a->initialized, a->initial_x, a->initial_y);

	if (munmap(map, a->len) != 0) {
		perror("munmap source file");
	}
	if (fclose(a->fp) != 0) {
		perror("close source file");
	}

	// Clear the flag that start_parsing set, before releasing the arguments
	*(a->is_parsing) = 0;
	free(a);

	return NULL;
}
/*
 * Start parsing one chunk of input in the background.
 *
 * Sets *is_parsing to 1, packs all the arguments into a heap-allocated
 * struct read_parallel_arg, and starts run_read_parallel on a new thread
 * whose id is stored in *parallel_parser. run_read_parallel owns and frees
 * the argument structure.
 *
 * Exits the process if the argument structure cannot be allocated or the
 * thread cannot be created.
 */
void start_parsing(int fd, FILE *fp, long long offset, long long len, volatile int *is_parsing, pthread_t *parallel_parser, const char *reading, struct reader *reader, volatile long long *progress_seq, struct pool *exclude, struct pool *include, int exclude_all, char *fname, int maxzoom, int basezoom, int source, int nlayers, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y) {
	// This has to kick off an intermediate thread to start the parser threads,
	// so the main thread can get back to reading the next input stage while
	// the intermediate thread waits for the completion of the parser threads.

	*is_parsing = 1;

	struct read_parallel_arg *rpa = malloc(sizeof *rpa);
	if (rpa == NULL) {
		// Without the argument block there is nothing to hand to the thread
		perror("malloc");
		exit(EXIT_FAILURE);
	}

	rpa->fd = fd;
	rpa->fp = fp;
	rpa->offset = offset;
	rpa->len = len;
	rpa->is_parsing = is_parsing;

	rpa->reading = reading;
	rpa->reader = reader;
	rpa->progress_seq = progress_seq;
	rpa->exclude = exclude;
	rpa->include = include;
	rpa->exclude_all = exclude_all;
	rpa->fname = fname;
	rpa->maxzoom = maxzoom;
	rpa->basezoom = basezoom;
	rpa->source = source;
	rpa->nlayers = nlayers;
	rpa->droprate = droprate;
	rpa->initialized = initialized;
	rpa->initial_x = initial_x;
	rpa->initial_y = initial_y;

	if (pthread_create(parallel_parser, NULL, run_read_parallel, rpa) != 0) {
		perror("pthread_create");
		exit(EXIT_FAILURE);
	}
}
int read_json(int argc, char **argv, char *fname, const char *layername, int maxzoom, int minzoom, int basezoom, double basezoom_marker_width, sqlite3 *outdb, struct pool *exclude, struct pool *include, int exclude_all, double droprate, int buffer, const char *tmpdir, double gamma, char *prevent, char *additional, int read_parallel) {
int ret = EXIT_SUCCESS;
@ -1048,71 +1218,10 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
}
if (map != NULL && map != MAP_FAILED) {
long long segs[CPUS + 1];
segs[0] = 0;
segs[CPUS] = st.st_size - off;
do_read_parallel(map, st.st_size - off, 0, reading, reader, &progress_seq, exclude, include, exclude_all, fname, maxzoom, basezoom, source, nlayers, droprate, initialized, initial_x, initial_y);
int i;
for (i = 1; i < CPUS; i++) {
segs[i] = off + (st.st_size - off) * i / CPUS;
while (segs[i] < st.st_size && map[segs[i]] != '\n') {
segs[i]++;
}
}
long long layer_seq[CPUS];
for (i = 0; i < CPUS; i++) {
// To preserve feature ordering, unique id for each segment
// begins with that segment's offset into the input
layer_seq[i] = segs[i];
}
struct parse_json_args pja[CPUS];
pthread_t pthreads[CPUS];
for (i = 0; i < CPUS; i++) {
pja[i].jp = json_begin_map(map + segs[i], segs[i + 1] - segs[i]);
pja[i].reading = reading;
pja[i].layer_seq = &layer_seq[i];
pja[i].progress_seq = &progress_seq;
pja[i].metapos = &reader[i].metapos;
pja[i].geompos = &reader[i].geompos;
pja[i].indexpos = &reader[i].indexpos;
pja[i].exclude = exclude;
pja[i].include = include;
pja[i].exclude_all = exclude_all;
pja[i].metafile = reader[i].metafile;
pja[i].geomfile = reader[i].geomfile;
pja[i].indexfile = reader[i].indexfile;
pja[i].poolfile = reader[i].poolfile;
pja[i].treefile = reader[i].treefile;
pja[i].fname = fname;
pja[i].maxzoom = maxzoom;
pja[i].basezoom = basezoom;
pja[i].layer = source < nlayers ? source : 0;
pja[i].droprate = droprate;
pja[i].file_bbox = reader[i].file_bbox;
pja[i].segment = i;
pja[i].initialized = &initialized[i];
pja[i].initial_x = &initial_x[i];
pja[i].initial_y = &initial_y[i];
if (pthread_create(&pthreads[i], NULL, run_parse_json, &pja[i]) != 0) {
perror("pthread_create");
exit(EXIT_FAILURE);
}
}
for (i = 0; i < CPUS; i++) {
void *retval;
if (pthread_join(pthreads[i], &retval) != 0) {
perror("pthread_join");
}
free(pja[i].jp->source);
json_end(pja[i].jp);
if (munmap(map, st.st_size - off) != 0) {
perror("munmap source file");
}
} else {
FILE *fp = fdopen(fd, "r");
@ -1122,14 +1231,103 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
continue;
}
long long layer_seq = 0;
json_pull *jp = json_begin_file(fp);
parse_json(jp, reading, &layer_seq, &progress_seq, &reader[0].metapos, &reader[0].geompos, &reader[0].indexpos, exclude, include, exclude_all, reader[0].metafile, reader[0].geomfile, reader[0].indexfile, reader[0].poolfile, reader[0].treefile, fname, maxzoom, basezoom, source < nlayers ? source : 0, droprate, reader[0].file_bbox, 0, &initialized[0], &initial_x[0], &initial_y[0]);
json_end(jp);
if (read_parallel) {
// Serial reading of chunks that are then parsed in parallel
char readname[strlen(tmpdir) + strlen("/read.XXXXXXXX") + 1];
sprintf(readname, "%s%s", tmpdir, "/read.XXXXXXXX");
int readfd = mkstemp(readname);
if (readfd < 0) {
perror(readname);
exit(EXIT_FAILURE);
}
FILE *readfp = fdopen(readfd, "w");
if (readfp == NULL) {
perror(readname);
exit(EXIT_FAILURE);
}
unlink(readname);
volatile int is_parsing = 0;
long long ahead = 0;
long long initial_offset = 0;
pthread_t parallel_parser;
#define READ_BUF 2000
#define PARSE_MIN 10000000
char buf[READ_BUF];
int n;
while ((n = fread(buf, sizeof(char), READ_BUF, fp)) > 0) {
fwrite_check(buf, sizeof(char), n, readfp, reading);
ahead += n;
if (buf[n - 1] == '\n' && ahead > PARSE_MIN && is_parsing == 0) {
if (initial_offset != 0) {
if (pthread_join(parallel_parser, NULL) != 0) {
perror("pthread_join");
exit(EXIT_FAILURE);
}
}
fflush(readfp);
start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, &parallel_parser, reading, reader, &progress_seq, exclude, include, exclude_all, fname, maxzoom, basezoom, source, nlayers, droprate, initialized, initial_x, initial_y);
initial_offset += ahead;
ahead = 0;
sprintf(readname, "%s%s", tmpdir, "/read.XXXXXXXX");
readfd = mkstemp(readname);
if (readfd < 0) {
perror(readname);
exit(EXIT_FAILURE);
}
readfp = fdopen(readfd, "w");
if (readfp == NULL) {
perror(readname);
exit(EXIT_FAILURE);
}
unlink(readname);
}
}
if (n < 0) {
perror(reading);
}
if (initial_offset != 0) {
if (pthread_join(parallel_parser, NULL) != 0) {
perror("pthread_join");
exit(EXIT_FAILURE);
}
}
fflush(readfp);
if (ahead > 0) {
start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, &parallel_parser, reading, reader, &progress_seq, exclude, include, exclude_all, fname, maxzoom, basezoom, source, nlayers, droprate, initialized, initial_x, initial_y);
if (pthread_join(parallel_parser, NULL) != 0) {
perror("pthread_join");
}
}
} else {
// Plain serial reading
long long layer_seq = 0;
json_pull *jp = json_begin_file(fp);
parse_json(jp, reading, &layer_seq, &progress_seq, &reader[0].metapos, &reader[0].geompos, &reader[0].indexpos, exclude, include, exclude_all, reader[0].metafile, reader[0].geomfile, reader[0].indexfile, reader[0].poolfile, reader[0].treefile, fname, maxzoom, basezoom, source < nlayers ? source : 0, droprate, reader[0].file_bbox, 0, &initialized[0], &initial_x[0], &initial_y[0]);
json_end(jp);
}
fclose(fp);
}
}
if (!quiet) {
fprintf(stderr, " \r");
// (stderr, "Read 10000.00 million features\r", *progress_seq / 1000000.0);
}
for (i = 0; i < CPUS; i++) {
fclose(reader[i].metafile);
fclose(reader[i].geomfile);
@ -1245,12 +1443,6 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
fprintf(stderr, "Sorting %lld features\n", (long long) indexpos / bytes);
}
// XXX On machines with different page sizes, doing the sorting
// in different-sized chunks can cause features with the same
// index (i.e., the same bbox) to appear in different orders
// because the sort is unstable. This doesn't seem worth spending
// more memory to fix, but could make tests unstable.
int page = sysconf(_SC_PAGESIZE);
long long unit = (50 * 1024 * 1024 / bytes) * bytes;
while (unit % page != 0) {

View File

@ -74,7 +74,7 @@ specified, the files are all merged into the single named layer.
\-P: Use multiple threads to read different parts of each input file at once.
This will only work if the input is line\-delimited JSON with each Feature on its
own line, because it knows nothing of the top\-level structure around the Features.
In addition, it only works if the input is a named file that can be mapped into memory
Performance will be better if the input is a named file that can be mapped into memory
rather than a stream that can only be read sequentially.
.RE
.SS Zoom levels and resolution

View File

@ -1 +1 @@
#define VERSION "tippecanoe v1.6.4\n"
#define VERSION "tippecanoe v1.7.0\n"