diff --git a/geojson.c b/geojson.c
index 35f06c0..19ef9fa 100644
--- a/geojson.c
+++ b/geojson.c
@@ -902,6 +902,84 @@ void *run_sort(void *v) {
 	return NULL;
 }
 
+void do_read_parallel(char *map, long long len, long long initial_offset, const char *reading, struct reader *reader, volatile long long *progress_seq, struct pool *exclude, struct pool *include, int exclude_all, char *fname, int maxzoom, int basezoom, int source, int nlayers, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y) {
+	long long segs[CPUS + 1];
+	segs[0] = 0;
+	segs[CPUS] = len;
+
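+	// Split the mapped input into one roughly equal-sized segment per
+	// CPU, then push each interior boundary forward to the next newline
+	// so every thread starts parsing at the beginning of a line. This
+	// only keeps features intact if each feature lies entirely on one
+	// line of the input.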
+	int i;
+	for (i = 1; i < CPUS; i++) {
+		segs[i] = len * i / CPUS;
+
+		while (segs[i] < len && map[segs[i]] != '\n') {
+			segs[i]++;
+		}
+	}
+
+	long long layer_seq[CPUS];
+	for (i = 0; i < CPUS; i++) {
+		// To preserve feature ordering, unique id for each segment
+		// begins with that segment's offset into the input
+		layer_seq[i] = segs[i] + initial_offset;
+	}
+
+	struct parse_json_args pja[CPUS];
+	pthread_t pthreads[CPUS];
+
+	for (i = 0; i < CPUS; i++) {
+		pja[i].jp = json_begin_map(map + segs[i], segs[i + 1] - segs[i]);
+		pja[i].reading = reading;
+		pja[i].layer_seq = &layer_seq[i];
+		pja[i].progress_seq = progress_seq;
+		pja[i].metapos = &reader[i].metapos;
+		pja[i].geompos = &reader[i].geompos;
+		pja[i].indexpos = &reader[i].indexpos;
+		pja[i].exclude = exclude;
+		pja[i].include = include;
+		pja[i].exclude_all = exclude_all;
+		pja[i].metafile = reader[i].metafile;
+		pja[i].geomfile = reader[i].geomfile;
+		pja[i].indexfile = reader[i].indexfile;
+		pja[i].poolfile = reader[i].poolfile;
+		pja[i].treefile = reader[i].treefile;
+		pja[i].fname = fname;
+		pja[i].maxzoom = maxzoom;
+		pja[i].basezoom = basezoom;
+		pja[i].layer = source < nlayers ? source : 0;
+		pja[i].droprate = droprate;
+		pja[i].file_bbox = reader[i].file_bbox;
+		pja[i].segment = i;
+		pja[i].initialized = &initialized[i];
+		pja[i].initial_x = &initial_x[i];
+		pja[i].initial_y = &initial_y[i];
+
+		if (pthread_create(&pthreads[i], NULL, run_parse_json, &pja[i]) != 0) {
+			perror("pthread_create");
+			exit(EXIT_FAILURE);
+		}
+	}
+
+	for (i = 0; i < CPUS; i++) {
+		void *retval;
+
+		if (pthread_join(pthreads[i], &retval) != 0) {
+			perror("pthread_join");
+		}
+
+		free(pja[i].jp->source);
+		json_end(pja[i].jp);
+	}
+}
+
+// Stub for the chunked-reading pipeline below: it will eventually hand
+// the chunk in fd off to a parser thread through *previous_reader.
+// Disabled until run_start_parsing and its argument structure exist.
+// Note the error check: pthread_create fails with a nonzero return.
+void start_parsing(int fd, long long offset, volatile int *is_parsing, pthread_t *previous_reader) {
+#if 0
+	if (pthread_create(previous_reader, NULL, run_start_parsing, start_parsing_args) != 0) {
+		perror("pthread_create");
+		exit(EXIT_FAILURE);
+	}
+#endif
+}
+
 int read_json(int argc, char **argv, char *fname, const char *layername, int maxzoom, int minzoom, int basezoom, double basezoom_marker_width, sqlite3 *outdb, struct pool *exclude, struct pool *include, int exclude_all, double droprate, int buffer, const char *tmpdir, double gamma, char *prevent, char *additional, int read_parallel) {
 	int ret = EXIT_SUCCESS;
 
@@ -1048,71 +1126,10 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
 	}
 
 	if (map != NULL && map != MAP_FAILED) {
-		long long segs[CPUS + 1];
-		segs[0] = 0;
-		segs[CPUS] = st.st_size - off;
+		do_read_parallel(map, st.st_size - off, 0, reading, reader, &progress_seq, exclude, include, exclude_all, fname, maxzoom, basezoom, source, nlayers, droprate, initialized, initial_x, initial_y);
 
-		int i;
-		for (i = 1; i < CPUS; i++) {
-			segs[i] = off + (st.st_size - off) * i / CPUS;
-
-			while (segs[i] < st.st_size && map[segs[i]] != '\n') {
-				segs[i]++;
-			}
-		}
-
-		long long layer_seq[CPUS];
-		for (i = 0; i < CPUS; i++) {
-			// To preserve feature ordering, unique id for each segment
-			// begins with that segment's offset into the input
-			layer_seq[i] = segs[i];
-		}
-
-		struct parse_json_args pja[CPUS];
-		pthread_t pthreads[CPUS];
-
-		for (i = 0; i < CPUS; i++) {
-			pja[i].jp = json_begin_map(map + segs[i], segs[i + 1] - segs[i]);
-			pja[i].reading = reading;
-			pja[i].layer_seq = &layer_seq[i];
-			pja[i].progress_seq = &progress_seq;
-			pja[i].metapos = &reader[i].metapos;
-			pja[i].geompos = &reader[i].geompos;
-			pja[i].indexpos = &reader[i].indexpos;
-			pja[i].exclude = exclude;
-			pja[i].include = include;
-			pja[i].exclude_all = exclude_all;
-			pja[i].metafile = reader[i].metafile;
-			pja[i].geomfile = reader[i].geomfile;
-			pja[i].indexfile = reader[i].indexfile;
-			pja[i].poolfile = reader[i].poolfile;
-			pja[i].treefile = reader[i].treefile;
-			pja[i].fname = fname;
-			pja[i].maxzoom = maxzoom;
-			pja[i].basezoom = basezoom;
-			pja[i].layer = source < nlayers ? source : 0;
-			pja[i].droprate = droprate;
-			pja[i].file_bbox = reader[i].file_bbox;
-			pja[i].segment = i;
-			pja[i].initialized = &initialized[i];
-			pja[i].initial_x = &initial_x[i];
-			pja[i].initial_y = &initial_y[i];
-
-			if (pthread_create(&pthreads[i], NULL, run_parse_json, &pja[i]) != 0) {
-				perror("pthread_create");
-				exit(EXIT_FAILURE);
-			}
-		}
-
-		for (i = 0; i < CPUS; i++) {
-			void *retval;
-
-			if (pthread_join(pthreads[i], &retval) != 0) {
-				perror("pthread_join");
-			}
-
-			free(pja[i].jp->source);
-			json_end(pja[i].jp);
-		}
+		if (munmap(map, st.st_size - off) != 0) {
+			perror("munmap source file");
+		}
 	} else {
 		FILE *fp = fdopen(fd, "r");
@@ -1122,10 +1139,84 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
 			continue;
 		}
 
-		long long layer_seq = 0;
-		json_pull *jp = json_begin_file(fp);
-		parse_json(jp, reading, &layer_seq, &progress_seq, &reader[0].metapos, &reader[0].geompos, &reader[0].indexpos, exclude, include, exclude_all, reader[0].metafile, reader[0].geomfile, reader[0].indexfile, reader[0].poolfile, reader[0].treefile, fname, maxzoom, basezoom, source < nlayers ? source : 0, droprate, reader[0].file_bbox, 0, &initialized[0], &initial_x[0], &initial_y[0]);
-		json_end(jp);
+		if (read_parallel) {
+			// Serial reading of chunks that are then parsed in parallel
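+			//
+			// The reader copies the streamed input into a temporary
+			// file until it has buffered at least 100,000 bytes ending
+			// at a newline and no parse is already in flight, then
+			// hands that chunk to a parser thread and starts a fresh
+			// temp file, so reading and parsing can overlap.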
+
+			char readname[strlen(tmpdir) + strlen("/read.XXXXXXXX") + 1];
+			sprintf(readname, "%s%s", tmpdir, "/read.XXXXXXXX");
+			int readfd = mkstemp(readname);
+			if (readfd < 0) {
+				perror(readname);
+				exit(EXIT_FAILURE);
+			}
+			FILE *readfp = fdopen(readfd, "w");
+			if (readfp == NULL) {
+				perror(readname);
+				exit(EXIT_FAILURE);
+			}
+			unlink(readname);
+
+			int c;
+			volatile int is_parsing = 0;
+			long long ahead = 0;
+			long long initial_offset = 0;
+			pthread_t previous_reader;
+
+			while ((c = getc(fp)) != EOF) {
+				putc(c, readfp);
+				ahead++;
+
+				if (c == '\n' && ahead > 100000 && is_parsing == 0) {
+					if (initial_offset != 0) {
+						if (pthread_join(previous_reader, NULL) != 0) {
+							perror("pthread_join");
+							exit(EXIT_FAILURE);
+						}
+					}
+
+					// Flush buffered output but leave readfd open: the
+					// parser thread takes ownership of the descriptor,
+					// so fclose() would close it out from under it.
+					fflush(readfp);
+					start_parsing(readfd, initial_offset, &is_parsing, &previous_reader);
+
+					initial_offset += ahead;
+					ahead = 0;
+
+					// Reuse the outer readfd and readfp for the next
+					// chunk; redeclaring them here would shadow the
+					// outer variables and leave them stale.
+					sprintf(readname, "%s%s", tmpdir, "/read.XXXXXXXX");
+					readfd = mkstemp(readname);
+					if (readfd < 0) {
+						perror(readname);
+						exit(EXIT_FAILURE);
+					}
+					readfp = fdopen(readfd, "w");
+					if (readfp == NULL) {
+						perror(readname);
+						exit(EXIT_FAILURE);
+					}
+					unlink(readname);
+				}
+			}
+
+			if (initial_offset != 0) {
+				if (pthread_join(previous_reader, NULL) != 0) {
+					perror("pthread_join");
+					exit(EXIT_FAILURE);
+				}
+			}
+
+			fflush(readfp);
+			start_parsing(readfd, initial_offset, &is_parsing, &previous_reader);
+
+			if (pthread_join(previous_reader, NULL) != 0) {
+				perror("pthread_join");
+			}
+		} else {
+			// Plain serial reading
+
+			long long layer_seq = 0;
+			json_pull *jp = json_begin_file(fp);
+			parse_json(jp, reading, &layer_seq, &progress_seq, &reader[0].metapos, &reader[0].geompos, &reader[0].indexpos, exclude, include, exclude_all, reader[0].metafile, reader[0].geomfile, reader[0].indexfile, reader[0].poolfile, reader[0].treefile, fname, maxzoom, basezoom, source < nlayers ? source : 0, droprate, reader[0].file_bbox, 0, &initialized[0], &initial_x[0], &initial_y[0]);
+			json_end(jp);
+		}
+		fclose(fp);
 	}
 }
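For reference, one plausible shape for the parser-thread entry point that the disabled stub in start_parsing() anticipates. This is a sketch only: run_start_parsing and start_parsing_args are merely named inside the #if 0 block, so the argument structure, its field names, and the mmap-then-do_read_parallel flow below are assumptions about the intended design, not code from this patch. It assumes geojson.c's existing includes (sys/mman.h, sys/stat.h, unistd.h) and types.

// Hypothetical argument structure for the chunk-parsing thread; every
// field after is_parsing just forwards what do_read_parallel() needs.
// start_parsing() would heap-allocate one of these, set *is_parsing to 1,
// and pass it to pthread_create().
struct start_parsing_arg {
	int fd;                   // chunk temp file; this thread takes ownership
	long long offset;         // chunk's position within the overall input
	volatile int *is_parsing; // cleared once the chunk has been parsed
	const char *reading;
	struct reader *reader;
	volatile long long *progress_seq;
	struct pool *exclude;
	struct pool *include;
	int exclude_all;
	char *fname;
	int maxzoom;
	int basezoom;
	int source;
	int nlayers;
	double droprate;
	int *initialized;
	unsigned *initial_x;
	unsigned *initial_y;
};

void *run_start_parsing(void *v) {
	struct start_parsing_arg *a = v;

	struct stat st;
	if (fstat(a->fd, &st) != 0) {
		perror("fstat chunk");
		exit(EXIT_FAILURE);
	}

	char *map = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, a->fd, 0);
	if (map == MAP_FAILED) {
		perror("mmap chunk");
		exit(EXIT_FAILURE);
	}

	// Fan the chunk out across CPUs exactly as the fully-mapped case
	// does, passing the chunk's offset so feature ordering is preserved.
	do_read_parallel(map, st.st_size, a->offset, a->reading, a->reader, a->progress_seq, a->exclude, a->include, a->exclude_all, a->fname, a->maxzoom, a->basezoom, a->source, a->nlayers, a->droprate, a->initialized, a->initial_x, a->initial_y);

	if (munmap(map, st.st_size) != 0) {
		perror("munmap chunk");
	}
	if (close(a->fd) != 0) {
		perror("close chunk");
	}

	*(a->is_parsing) = 0;
	free(a);
	return NULL;
}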