Mirror of https://github.com/mapbox/tippecanoe.git, synced 2025-03-25 13:17:38 +00:00
Factor out parallel reading; start to set up semi-parallel reading
This commit is contained in:
parent 29db0e8988
commit 132b7ecd12
geojson.c (227 lines changed)
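
The diff below adds a do_read_parallel function that splits an mmapped input buffer into one segment per CPU, nudging each interior split point forward to the next newline so that no feature straddles two segments. A minimal standalone sketch of that splitting strategy, not part of the commit (split_segments and the two-thread setup are hypothetical names):

#include <stdio.h>
#include <string.h>

/* Hypothetical helper illustrating the same splitting strategy:
 * divide a buffer into nthreads byte ranges, then advance each
 * interior boundary until it lands on a '\n', so a record is never
 * split across two segments. */
static void split_segments(const char *buf, long long len, long long *segs, int nthreads) {
	segs[0] = 0;
	segs[nthreads] = len;

	for (int i = 1; i < nthreads; i++) {
		segs[i] = len * i / nthreads;
		while (segs[i] < len && buf[segs[i]] != '\n') {
			segs[i]++;
		}
	}
}

int main(void) {
	const char *input = "{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n{\"d\":4}\n";
	long long segs[2 + 1];
	split_segments(input, (long long) strlen(input), segs, 2);

	/* Each interior boundary lands on a newline, so every record
	 * falls entirely inside exactly one segment. */
	for (int i = 0; i < 2; i++) {
		printf("segment %d: bytes %lld..%lld\n", i, segs[i], segs[i + 1]);
	}
	return 0;
}
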
@@ -902,6 +902,84 @@ void *run_sort(void *v) {
 	return NULL;
 }
 
+void do_read_parallel(char *map, long long len, long long initial_offset, const char *reading, struct reader *reader, volatile long long *progress_seq, struct pool *exclude, struct pool *include, int exclude_all, char *fname, int maxzoom, int basezoom, int source, int nlayers, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y) {
+	long long segs[CPUS + 1];
+	segs[0] = 0;
+	segs[CPUS] = len;
+
+	int i;
+	for (i = 1; i < CPUS; i++) {
+		segs[i] = len * i / CPUS;
+
+		while (segs[i] < len && map[segs[i]] != '\n') {
+			segs[i]++;
+		}
+	}
+
+	long long layer_seq[CPUS];
+	for (i = 0; i < CPUS; i++) {
+		// To preserve feature ordering, unique id for each segment
+		// begins with that segment's offset into the input
+		layer_seq[i] = segs[i] + initial_offset;
+	}
+
+	struct parse_json_args pja[CPUS];
+	pthread_t pthreads[CPUS];
+
+	for (i = 0; i < CPUS; i++) {
+		pja[i].jp = json_begin_map(map + segs[i], segs[i + 1] - segs[i]);
+		pja[i].reading = reading;
+		pja[i].layer_seq = &layer_seq[i];
+		pja[i].progress_seq = progress_seq;
+		pja[i].metapos = &reader[i].metapos;
+		pja[i].geompos = &reader[i].geompos;
+		pja[i].indexpos = &reader[i].indexpos;
+		pja[i].exclude = exclude;
+		pja[i].include = include;
+		pja[i].exclude_all = exclude_all;
+		pja[i].metafile = reader[i].metafile;
+		pja[i].geomfile = reader[i].geomfile;
+		pja[i].indexfile = reader[i].indexfile;
+		pja[i].poolfile = reader[i].poolfile;
+		pja[i].treefile = reader[i].treefile;
+		pja[i].fname = fname;
+		pja[i].maxzoom = maxzoom;
+		pja[i].basezoom = basezoom;
+		pja[i].layer = source < nlayers ? source : 0;
+		pja[i].droprate = droprate;
+		pja[i].file_bbox = reader[i].file_bbox;
+		pja[i].segment = i;
+		pja[i].initialized = &initialized[i];
+		pja[i].initial_x = &initial_x[i];
+		pja[i].initial_y = &initial_y[i];
+
+		if (pthread_create(&pthreads[i], NULL, run_parse_json, &pja[i]) != 0) {
+			perror("pthread_create");
+			exit(EXIT_FAILURE);
+		}
+	}
+
+	for (i = 0; i < CPUS; i++) {
+		void *retval;
+
+		if (pthread_join(pthreads[i], &retval) != 0) {
+			perror("pthread_join");
+		}
+
+		free(pja[i].jp->source);
+		json_end(pja[i].jp);
+	}
+}
+
+void start_parsing(int fd, long long offset, volatile int *is_parsing, pthread_t *previous_reader) {
+#if 0
+	if (pthread_create(previous_reader, NULL, run_start_parsing, start_parsing_args) != 0) {
+		perror("pthread_create");
+		exit(EXIT_FAILURE);
+	}
+#endif
+}
+
 int read_json(int argc, char **argv, char *fname, const char *layername, int maxzoom, int minzoom, int basezoom, double basezoom_marker_width, sqlite3 *outdb, struct pool *exclude, struct pool *include, int exclude_all, double droprate, int buffer, const char *tmpdir, double gamma, char *prevent, char *additional, int read_parallel) {
 	int ret = EXIT_SUCCESS;
 
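
The layer_seq initialization above is what preserves input ordering despite parallel parsing: each segment's feature ids start at that segment's byte offset, and since a segment of length L yields fewer than L features, its ids stay below the next segment's base. A toy illustration of that invariant, with hypothetical values (the real code assigns ids as features are parsed):

#include <stdio.h>

/* Toy model: segment i assigns ids base_i, base_i + 1, ... where
 * base_i is the segment's byte offset. A segment starting at byte s
 * with length L emits fewer than L ids, all below s + L, which is
 * the next segment's base, so merging output by id reproduces the
 * original input order even though segments parse concurrently. */
int main(void) {
	long long segs[] = {0, 23, 48};          /* hypothetical segment offsets */
	long long seq[2] = {segs[0], segs[1]};   /* per-segment id counters */

	for (int n = 0; n < 3; n++) printf("segment 0 feature id %lld\n", seq[0]++);
	for (int n = 0; n < 2; n++) printf("segment 1 feature id %lld\n", seq[1]++);
	return 0;
}
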
@@ -1048,71 +1126,10 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
 	}
 
 	if (map != NULL && map != MAP_FAILED) {
-		long long segs[CPUS + 1];
-		segs[0] = 0;
-		segs[CPUS] = st.st_size - off;
+		do_read_parallel(map, st.st_size - off, 0, reading, reader, &progress_seq, exclude, include, exclude_all, fname, maxzoom, basezoom, source, nlayers, droprate, initialized, initial_x, initial_y);
-
-		int i;
-		for (i = 1; i < CPUS; i++) {
-			segs[i] = off + (st.st_size - off) * i / CPUS;
-
-			while (segs[i] < st.st_size && map[segs[i]] != '\n') {
-				segs[i]++;
-			}
-		}
-
-		long long layer_seq[CPUS];
-		for (i = 0; i < CPUS; i++) {
-			// To preserve feature ordering, unique id for each segment
-			// begins with that segment's offset into the input
-			layer_seq[i] = segs[i];
-		}
-
-		struct parse_json_args pja[CPUS];
-		pthread_t pthreads[CPUS];
-
-		for (i = 0; i < CPUS; i++) {
-			pja[i].jp = json_begin_map(map + segs[i], segs[i + 1] - segs[i]);
-			pja[i].reading = reading;
-			pja[i].layer_seq = &layer_seq[i];
-			pja[i].progress_seq = &progress_seq;
-			pja[i].metapos = &reader[i].metapos;
-			pja[i].geompos = &reader[i].geompos;
-			pja[i].indexpos = &reader[i].indexpos;
-			pja[i].exclude = exclude;
-			pja[i].include = include;
-			pja[i].exclude_all = exclude_all;
-			pja[i].metafile = reader[i].metafile;
-			pja[i].geomfile = reader[i].geomfile;
-			pja[i].indexfile = reader[i].indexfile;
-			pja[i].poolfile = reader[i].poolfile;
-			pja[i].treefile = reader[i].treefile;
-			pja[i].fname = fname;
-			pja[i].maxzoom = maxzoom;
-			pja[i].basezoom = basezoom;
-			pja[i].layer = source < nlayers ? source : 0;
-			pja[i].droprate = droprate;
-			pja[i].file_bbox = reader[i].file_bbox;
-			pja[i].segment = i;
-			pja[i].initialized = &initialized[i];
-			pja[i].initial_x = &initial_x[i];
-			pja[i].initial_y = &initial_y[i];
-
-			if (pthread_create(&pthreads[i], NULL, run_parse_json, &pja[i]) != 0) {
-				perror("pthread_create");
-				exit(EXIT_FAILURE);
-			}
-		}
-
-		for (i = 0; i < CPUS; i++) {
-			void *retval;
-
-			if (pthread_join(pthreads[i], &retval) != 0) {
-				perror("pthread_join");
-			}
-
-			free(pja[i].jp->source);
-			json_end(pja[i].jp);
-		}
 
 		if (munmap(map, st.st_size - off) != 0) {
 			perror("munmap source file");
 		}
 	} else {
 		FILE *fp = fdopen(fd, "r");
@@ -1122,10 +1139,84 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
 				continue;
 			}
 
-			long long layer_seq = 0;
-			json_pull *jp = json_begin_file(fp);
-			parse_json(jp, reading, &layer_seq, &progress_seq, &reader[0].metapos, &reader[0].geompos, &reader[0].indexpos, exclude, include, exclude_all, reader[0].metafile, reader[0].geomfile, reader[0].indexfile, reader[0].poolfile, reader[0].treefile, fname, maxzoom, basezoom, source < nlayers ? source : 0, droprate, reader[0].file_bbox, 0, &initialized[0], &initial_x[0], &initial_y[0]);
-			json_end(jp);
+			if (read_parallel) {
+				// Serial reading of chunks that are then parsed in parallel
+
+				char readname[strlen(tmpdir) + strlen("/read.XXXXXXXX") + 1];
+				sprintf(readname, "%s%s", tmpdir, "/read.XXXXXXXX");
+				int readfd = mkstemp(readname);
+				if (readfd < 0) {
+					perror(readname);
+					exit(EXIT_FAILURE);
+				}
+				FILE *readfp = fdopen(readfd, "w");
+				if (readfp == NULL) {
+					perror(readname);
+					exit(EXIT_FAILURE);
+				}
+				unlink(readname);
+
+				int c;
+				volatile int is_parsing = 0;
+				long long ahead = 0;
+				long long initial_offset = 0;
+				pthread_t previous_reader;
+
+				while ((c = getc(fp)) != EOF) {
+					putc(c, readfp);
+					ahead++;
+
+					if (c == '\n' && ahead > 100000 && is_parsing == 0) {
+						if (initial_offset != 0) {
+							if (pthread_join(previous_reader, NULL) != 0) {
+								perror("pthread_join");
+								exit(EXIT_FAILURE);
+							}
+						}
+
+						fclose(readfp);
+						start_parsing(readfd, initial_offset, &is_parsing, &previous_reader);
+
+						initial_offset += ahead;
+						ahead = 0;
+
+						sprintf(readname, "%s%s", tmpdir, "/read.XXXXXXXX");
+						readfd = mkstemp(readname);
+						if (readfd < 0) {
+							perror(readname);
+							exit(EXIT_FAILURE);
+						}
+						readfp = fdopen(readfd, "w");
+						if (readfp == NULL) {
+							perror(readname);
+							exit(EXIT_FAILURE);
+						}
+						unlink(readname);
+					}
+				}
+
+				if (initial_offset != 0) {
+					if (pthread_join(previous_reader, NULL) != 0) {
+						perror("pthread_join");
+						exit(EXIT_FAILURE);
+					}
+				}
+
+				fclose(readfp);
+				start_parsing(readfd, initial_offset, &is_parsing, &previous_reader);
+
+				if (pthread_join(previous_reader, NULL) != 0) {
+					perror("pthread_join");
+				}
+			} else {
+				// Plain serial reading
+
+				long long layer_seq = 0;
+				json_pull *jp = json_begin_file(fp);
+				parse_json(jp, reading, &layer_seq, &progress_seq, &reader[0].metapos, &reader[0].geompos, &reader[0].indexpos, exclude, include, exclude_all, reader[0].metafile, reader[0].geomfile, reader[0].indexfile, reader[0].poolfile, reader[0].treefile, fname, maxzoom, basezoom, source < nlayers ? source : 0, droprate, reader[0].file_bbox, 0, &initialized[0], &initial_x[0], &initial_y[0]);
+				json_end(jp);
+			}
 
 			fclose(fp);
 		}
 	}
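
start_parsing is still stubbed out behind #if 0 in this commit, but the read_parallel branch above shows the intended semi-parallel scheme: copy the incoming stream into a temp file, and each time at least 100,000 bytes have accumulated and the stream is at a newline, hand the finished chunk to a background parser while the main thread keeps reading into a fresh temp file. A self-contained sketch of that hand-off pattern, buffering in memory instead of temp files to keep it short (parse_chunk and the buffer handling are hypothetical, not the commit's code):

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct chunk {
	char *data;        /* buffered input, ends on a record boundary */
	long long len;
};

/* Hypothetical stand-in for the parsing work that start_parsing()
 * is eventually meant to launch on a background thread. */
static void *parse_chunk(void *v) {
	struct chunk *ch = v;
	printf("parsing %lld bytes in background\n", ch->len);
	free(ch->data);
	free(ch);
	return NULL;
}

int main(void) {
	const long long threshold = 100000;  /* same cutoff the commit uses */
	pthread_t parser;
	int have_parser = 0;

	char *buf = NULL;
	size_t cap = 0;
	long long ahead = 0;
	int c;

	while ((c = getchar()) != EOF) {
		if ((size_t) ahead + 1 > cap) {
			cap = cap ? cap * 2 : 1 << 17;
			buf = realloc(buf, cap);
			if (buf == NULL) {
				perror("realloc");
				exit(EXIT_FAILURE);
			}
		}
		buf[ahead++] = (char) c;

		/* Cut only at a newline so each chunk holds whole records. */
		if (c == '\n' && ahead > threshold) {
			/* Wait for the previous chunk before starting the next,
			 * mirroring the pthread_join on previous_reader above. */
			if (have_parser && pthread_join(parser, NULL) != 0) {
				perror("pthread_join");
				exit(EXIT_FAILURE);
			}

			struct chunk *ch = malloc(sizeof *ch);
			ch->data = buf;
			ch->len = ahead;
			if (pthread_create(&parser, NULL, parse_chunk, ch) != 0) {
				perror("pthread_create");
				exit(EXIT_FAILURE);
			}
			have_parser = 1;

			buf = NULL;   /* next chunk gets a fresh buffer */
			cap = 0;
			ahead = 0;
		}
	}

	if (have_parser) pthread_join(parser, NULL);
	if (ahead > 0) {
		struct chunk *ch = malloc(sizeof *ch);
		ch->data = buf;
		ch->len = ahead;
		parse_chunk(ch);   /* final partial chunk, parsed inline */
	} else {
		free(buf);
	}
	return 0;
}
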