From 223d8377362e2f4e87dbcf2d6fd0e16b64d82603 Mon Sep 17 00:00:00 2001 From: Eric Fischer Date: Thu, 31 Aug 2017 13:49:19 -0700 Subject: [PATCH] Multithreaded geobuf feature parsing --- geobuf.cpp | 93 +++++++++++++++++++++++++++-- main.cpp | 167 +++++++++++++++++++++++++++++------------------------ 2 files changed, 178 insertions(+), 82 deletions(-) diff --git a/geobuf.cpp b/geobuf.cpp index 33d3563..e5f0b66 100644 --- a/geobuf.cpp +++ b/geobuf.cpp @@ -1,11 +1,13 @@ #include #include #include +#include #include "mvt.hpp" #include "serial.hpp" #include "geobuf.hpp" #include "geojson.hpp" #include "projection.hpp" +#include "main.hpp" #include "protozero/varint.hpp" #include "protozero/pbf_reader.hpp" #include "protozero/pbf_writer.hpp" @@ -19,6 +21,18 @@ #define POLYGON 4 #define MULTIPOLYGON 5 +struct queued_feature { + protozero::pbf_reader pbf; + size_t dim; + double e; + std::vector *keys; + struct serialization_state *sst; + int layer; + std::string layername; +}; + +static std::vector feature_queue; + void ensureDim(size_t dim) { if (dim < 2) { fprintf(stderr, "Geometry has fewer than 2 dimensions: %zu\n", dim); @@ -359,13 +373,13 @@ void readFeature(protozero::pbf_reader &pbf, size_t dim, double e, std::vectorsegment; sf.has_id = has_id; sf.id = id; sf.has_tippecanoe_minzoom = false; sf.has_tippecanoe_maxzoom = false; sf.feature_minzoom = false; - sf.seq = (*sst->layer_seq); + sf.seq = *(sst->layer_seq); sf.geometry = dv[i].dv; sf.t = dv[i].type; sf.full_keys = full_keys; @@ -404,12 +418,79 @@ void readFeature(protozero::pbf_reader &pbf, size_t dim, double e, std::vectorstart; i < qra->end; i++) { + struct queued_feature &qf = feature_queue[i]; + readFeature(qf.pbf, qf.dim, qf.e, *qf.keys, &qf.sst[qra->segment], qf.layer, qf.layername); + } + + return NULL; +} + +void runQueue() { + if (feature_queue.size() == 0) { + return; + } + + struct queue_run_arg qra[CPUS]; + pthread_t pthreads[CPUS]; + + for (size_t i = 0; i < CPUS; i++) { + *(feature_queue[0].sst[i].layer_seq) = *(feature_queue[0].sst[0].layer_seq) + feature_queue.size() * i / CPUS; + + qra[i].start = feature_queue.size() * i / CPUS; + qra[i].end = feature_queue.size() * (i + 1) / CPUS; + qra[i].segment = i; + + if (pthread_create(&pthreads[i], NULL, run_parse_feature, &qra[i]) != 0) { + perror("pthread_create"); + exit(EXIT_FAILURE); + } + } + + for (size_t i = 0; i < CPUS; i++) { + void *retval; + + if (pthread_join(pthreads[i], &retval) != 0) { + perror("pthread_join"); + } + } + + *(feature_queue[0].sst[0].layer_seq) = *(feature_queue[0].sst[CPUS - 1].layer_seq); + feature_queue.clear(); +} + +void queueFeature(protozero::pbf_reader &pbf, size_t dim, double e, std::vector &keys, struct serialization_state *sst, int layer, std::string layername) { + struct queued_feature qf; + qf.pbf = pbf; + qf.dim = dim; + qf.e = e; + qf.keys = &keys; + qf.sst = sst; + qf.layer = layer; + qf.layername = layername; + + feature_queue.push_back(qf); + + if (feature_queue.size() > CPUS * 500) { + runQueue(); + } +} + void outBareGeometry(drawvec const &dv, int type, size_t dim, double e, std::vector &keys, struct serialization_state *sst, int layer, std::string layername) { serial_feature sf; sf.layer = layer; sf.layername = layername; - sf.segment = 0; // single thread + sf.segment = sst->segment; sf.has_id = false; sf.has_tippecanoe_minzoom = false; sf.has_tippecanoe_maxzoom = false; @@ -427,7 +508,7 @@ void readFeatureCollection(protozero::pbf_reader &pbf, size_t dim, double e, std switch (pbf.tag()) { case 1: { protozero::pbf_reader feature_reader(pbf.get_message()); - readFeature(feature_reader, dim, e, keys, sst, layer, layername); + queueFeature(feature_reader, dim, e, keys, sst, layer, layername); break; } @@ -466,7 +547,7 @@ void parse_geobuf(struct serialization_state *sst, const char *src, size_t len, case 5: { protozero::pbf_reader feature_reader(pbf.get_message()); - readFeature(feature_reader, dim, e, keys, sst, layer, layername); + queueFeature(feature_reader, dim, e, keys, sst, layer, layername); break; } @@ -483,4 +564,6 @@ void parse_geobuf(struct serialization_state *sst, const char *src, size_t len, pbf.skip(); } } + + runQueue(); } diff --git a/main.cpp b/main.cpp index c17a328..5ae5c19 100644 --- a/main.cpp +++ b/main.cpp @@ -349,7 +349,7 @@ void *run_sort(void *v) { return NULL; } -void do_read_parallel(char *map, long long len, long long initial_offset, const char *reading, struct reader *reader, volatile long long *progress_seq, std::set *exclude, std::set *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector > *layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters) { +void do_read_parallel(char *map, long long len, long long initial_offset, const char *reading, struct reader *readers, volatile long long *progress_seq, std::set *exclude, std::set *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector > *layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters) { long long segs[CPUS + 1]; segs[0] = 0; segs[CPUS] = len; @@ -387,7 +387,7 @@ void do_read_parallel(char *map, long long len, long long initial_offset, const sst[i].line = 0; sst[i].layer_seq = &layer_seq[i]; sst[i].progress_seq = progress_seq; - sst[i].readers = reader; + sst[i].readers = readers; sst[i].segment = i; sst[i].initialized = &initialized[i]; sst[i].initial_x = &initial_x[i]; @@ -440,7 +440,7 @@ struct read_parallel_arg { int separator; const char *reading; - struct reader *reader; + struct reader *readers; volatile long long *progress_seq; std::set *exclude; std::set *include; @@ -483,7 +483,7 @@ void *run_read_parallel(void *v) { } madvise(map, rpa->len, MADV_RANDOM); // sequential, but from several pointers at once - do_read_parallel(map, rpa->len, rpa->offset, rpa->reading, rpa->reader, rpa->progress_seq, rpa->exclude, rpa->include, rpa->exclude_all, rpa->fname, rpa->basezoom, rpa->source, rpa->nlayers, rpa->layermaps, rpa->droprate, rpa->initialized, rpa->initial_x, rpa->initial_y, rpa->maxzoom, rpa->layername, rpa->uses_gamma, rpa->attribute_types, rpa->separator, rpa->dist_sum, rpa->dist_count, rpa->want_dist, rpa->filters); + do_read_parallel(map, rpa->len, rpa->offset, rpa->reading, rpa->readers, rpa->progress_seq, rpa->exclude, rpa->include, rpa->exclude_all, rpa->fname, rpa->basezoom, rpa->source, rpa->nlayers, rpa->layermaps, rpa->droprate, rpa->initialized, rpa->initial_x, rpa->initial_y, rpa->maxzoom, rpa->layername, rpa->uses_gamma, rpa->attribute_types, rpa->separator, rpa->dist_sum, rpa->dist_count, rpa->want_dist, rpa->filters); madvise(map, rpa->len, MADV_DONTNEED); if (munmap(map, rpa->len) != 0) { @@ -500,7 +500,7 @@ void *run_read_parallel(void *v) { return NULL; } -void start_parsing(int fd, FILE *fp, long long offset, long long len, volatile int *is_parsing, pthread_t *parallel_parser, bool &parser_created, const char *reading, struct reader *reader, volatile long long *progress_seq, std::set *exclude, std::set *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector > &layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters) { +void start_parsing(int fd, FILE *fp, long long offset, long long len, volatile int *is_parsing, pthread_t *parallel_parser, bool &parser_created, const char *reading, struct reader *readers, volatile long long *progress_seq, std::set *exclude, std::set *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector > &layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters) { // This has to kick off an intermediate thread to start the parser threads, // so the main thread can get back to reading the next input stage while // the intermediate thread waits for the completion of the parser threads. @@ -521,7 +521,7 @@ void start_parsing(int fd, FILE *fp, long long offset, long long len, volatile i rpa->separator = separator; rpa->reading = reading; - rpa->reader = reader; + rpa->readers = readers; rpa->progress_seq = progress_seq; rpa->exclude = exclude; rpa->include = include; @@ -873,7 +873,7 @@ void prep_drop_states(struct drop_state *ds, int maxzoom, int basezoom, double d } } -void radix(struct reader *reader, int nreaders, FILE *geomfile, int geomfd, FILE *indexfile, int indexfd, const char *tmpdir, long long *geompos, int maxzoom, int basezoom, double droprate, double gamma) { +void radix(struct reader *readers, int nreaders, FILE *geomfile, int geomfd, FILE *indexfile, int indexfd, const char *tmpdir, long long *geompos, int maxzoom, int basezoom, double droprate, double gamma) { // Run through the index and geometry for each reader, // splitting the contents out by index into as many // sub-files as we can write to simultaneously. @@ -926,11 +926,11 @@ void radix(struct reader *reader, int nreaders, FILE *geomfile, int geomfd, FILE int geomfds[nreaders]; int indexfds[nreaders]; for (int i = 0; i < nreaders; i++) { - geomfds[i] = reader[i].geomfd; - indexfds[i] = reader[i].indexfd; + geomfds[i] = readers[i].geomfd; + indexfds[i] = readers[i].indexfd; struct stat geomst; - if (fstat(reader[i].geomfd, &geomst) < 0) { + if (fstat(readers[i].geomfd, &geomst) < 0) { perror("stat geom"); exit(EXIT_FAILURE); } @@ -950,19 +950,19 @@ void radix(struct reader *reader, int nreaders, FILE *geomfile, int geomfd, FILE } } -void choose_first_zoom(long long *file_bbox, struct reader *reader, unsigned *iz, unsigned *ix, unsigned *iy, int minzoom, int buffer) { +void choose_first_zoom(long long *file_bbox, struct reader *readers, unsigned *iz, unsigned *ix, unsigned *iy, int minzoom, int buffer) { for (size_t i = 0; i < CPUS; i++) { - if (reader[i].file_bbox[0] < file_bbox[0]) { - file_bbox[0] = reader[i].file_bbox[0]; + if (readers[i].file_bbox[0] < file_bbox[0]) { + file_bbox[0] = readers[i].file_bbox[0]; } - if (reader[i].file_bbox[1] < file_bbox[1]) { - file_bbox[1] = reader[i].file_bbox[1]; + if (readers[i].file_bbox[1] < file_bbox[1]) { + file_bbox[1] = readers[i].file_bbox[1]; } - if (reader[i].file_bbox[2] > file_bbox[2]) { - file_bbox[2] = reader[i].file_bbox[2]; + if (readers[i].file_bbox[2] > file_bbox[2]) { + file_bbox[2] = readers[i].file_bbox[2]; } - if (reader[i].file_bbox[3] > file_bbox[3]) { - file_bbox[3] = reader[i].file_bbox[3]; + if (readers[i].file_bbox[3] > file_bbox[3]) { + file_bbox[3] = readers[i].file_bbox[3]; } } @@ -1004,9 +1004,9 @@ void choose_first_zoom(long long *file_bbox, struct reader *reader, unsigned *iz int read_input(std::vector &sources, char *fname, int maxzoom, int minzoom, int basezoom, double basezoom_marker_width, sqlite3 *outdb, const char *outdir, std::set *exclude, std::set *include, int exclude_all, double droprate, int buffer, const char *tmpdir, double gamma, int read_parallel, int forcetable, const char *attribution, bool uses_gamma, long long *file_bbox, const char *prefilter, const char *postfilter, const char *description, bool guess_maxzoom, std::map const *attribute_types, const char *pgm) { int ret = EXIT_SUCCESS; - struct reader reader[CPUS]; + struct reader readers[CPUS]; for (size_t i = 0; i < CPUS; i++) { - struct reader *r = reader + i; + struct reader *r = &readers[i]; char metaname[strlen(tmpdir) + strlen("/meta.XXXXXXXX") + 1]; char poolname[strlen(tmpdir) + strlen("/pool.XXXXXXXX") + 1]; @@ -1094,7 +1094,7 @@ int read_input(std::vector &sources, char *fname, int maxzoom, int minzo } struct statfs fsstat; - if (fstatfs(reader[0].geomfd, &fsstat) != 0) { + if (fstatfs(readers[0].geomfd, &fsstat) != 0) { perror("fstatfs"); exit(EXIT_FAILURE); } @@ -1208,32 +1208,45 @@ int read_input(std::vector &sources, char *fname, int maxzoom, int minzo exit(EXIT_FAILURE); } - long long layer_seq = overall_offset; + long long layer_seq[CPUS]; + double dist_sums[CPUS]; + size_t dist_counts[CPUS]; + struct serialization_state sst[CPUS]; - struct serialization_state sst; - sst.fname = reading.c_str(); - sst.line = 0; - sst.layer_seq = &layer_seq; - sst.progress_seq = &progress_seq; - sst.readers = reader; - sst.segment = 0; - sst.initial_x = &initial_x[0]; - sst.initial_y = &initial_y[0]; - sst.initialized = &initialized[0]; - sst.dist_sum = &dist_sum; - sst.dist_count = &dist_count; - sst.want_dist = guess_maxzoom; - sst.maxzoom = maxzoom; - sst.filters = prefilter != NULL || postfilter != NULL; - sst.uses_gamma = uses_gamma; - sst.layermap = &layermaps[0]; - sst.exclude = exclude; - sst.include = include; - sst.exclude_all = exclude_all; - sst.basezoom = basezoom; - sst.attribute_types = attribute_types; + for (size_t i = 0; i < CPUS; i++) { + layer_seq[i] = overall_offset; + dist_sums[i] = 0; + dist_counts[i] = 0; - parse_geobuf(&sst, map, st.st_size, layer, sources[layer].layer); + sst[i].fname = reading.c_str(); + sst[i].line = 0; + sst[i].layer_seq = &layer_seq[i]; + sst[i].progress_seq = &progress_seq; + sst[i].readers = readers; + sst[i].segment = i; + sst[i].initial_x = &initial_x[i]; + sst[i].initial_y = &initial_y[i]; + sst[i].initialized = &initialized[i]; + sst[i].dist_sum = &dist_sums[i]; + sst[i].dist_count = &dist_counts[i]; + sst[i].want_dist = guess_maxzoom; + sst[i].maxzoom = maxzoom; + sst[i].filters = prefilter != NULL || postfilter != NULL; + sst[i].uses_gamma = uses_gamma; + sst[i].layermap = &layermaps[i]; + sst[i].exclude = exclude; + sst[i].include = include; + sst[i].exclude_all = exclude_all; + sst[i].basezoom = basezoom; + sst[i].attribute_types = attribute_types; + } + + parse_geobuf(sst, map, st.st_size, layer, sources[layer].layer); + + for (size_t i = 0; i < CPUS; i++) { + dist_sum += dist_sums[i]; + dist_count += dist_counts[i]; + } if (munmap(map, st.st_size) != 0) { perror("munmap source file"); @@ -1244,8 +1257,8 @@ int read_input(std::vector &sources, char *fname, int maxzoom, int minzo exit(EXIT_FAILURE); } - overall_offset = layer_seq; - checkdisk(reader, CPUS); + overall_offset = layer_seq[0]; + checkdisk(readers, CPUS); continue; } @@ -1286,9 +1299,9 @@ int read_input(std::vector &sources, char *fname, int maxzoom, int minzo } if (map != NULL && map != MAP_FAILED && read_parallel_this) { - do_read_parallel(map, st.st_size - off, overall_offset, reading.c_str(), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, &layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, uses_gamma, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL); + do_read_parallel(map, st.st_size - off, overall_offset, reading.c_str(), readers, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, &layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, uses_gamma, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL); overall_offset += st.st_size - off; - checkdisk(reader, CPUS); + checkdisk(readers, CPUS); if (munmap(map, st.st_size - off) != 0) { perror("munmap source file"); @@ -1362,11 +1375,11 @@ int read_input(std::vector &sources, char *fname, int maxzoom, int minzo } fflush(readfp); - start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str(), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL); + start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str(), readers, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL); initial_offset += ahead; overall_offset += ahead; - checkdisk(reader, CPUS); + checkdisk(readers, CPUS); ahead = 0; sprintf(readname, "%s%s", tmpdir, "/read.XXXXXXXX"); @@ -1399,7 +1412,7 @@ int read_input(std::vector &sources, char *fname, int maxzoom, int minzo fflush(readfp); if (ahead > 0) { - start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str(), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL); + start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str(), readers, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL); if (parser_created) { if (pthread_join(parallel_parser, NULL) != 0) { @@ -1409,7 +1422,7 @@ int read_input(std::vector &sources, char *fname, int maxzoom, int minzo } overall_offset += ahead; - checkdisk(reader, CPUS); + checkdisk(readers, CPUS); } } else { // Plain serial reading @@ -1422,7 +1435,7 @@ int read_input(std::vector &sources, char *fname, int maxzoom, int minzo sst.line = 0; sst.layer_seq = &layer_seq; sst.progress_seq = &progress_seq; - sst.readers = reader; + sst.readers = readers; sst.segment = 0; sst.initial_x = &initial_x[0]; sst.initial_y = &initial_y[0]; @@ -1443,7 +1456,7 @@ int read_input(std::vector &sources, char *fname, int maxzoom, int minzo parse_json(&sst, jp, layer, sources[layer].layer); json_end(jp); overall_offset = layer_seq; - checkdisk(reader, CPUS); + checkdisk(readers, CPUS); } if (fclose(fp) != 0) { @@ -1459,25 +1472,25 @@ int read_input(std::vector &sources, char *fname, int maxzoom, int minzo } for (size_t i = 0; i < CPUS; i++) { - if (fclose(reader[i].metafile) != 0) { + if (fclose(readers[i].metafile) != 0) { perror("fclose meta"); exit(EXIT_FAILURE); } - if (fclose(reader[i].geomfile) != 0) { + if (fclose(readers[i].geomfile) != 0) { perror("fclose geom"); exit(EXIT_FAILURE); } - if (fclose(reader[i].indexfile) != 0) { + if (fclose(readers[i].indexfile) != 0) { perror("fclose index"); exit(EXIT_FAILURE); } - memfile_close(reader[i].treefile); + memfile_close(readers[i].treefile); - if (fstat(reader[i].geomfd, &reader[i].geomst) != 0) { + if (fstat(readers[i].geomfd, &readers[i].geomst) != 0) { perror("stat geom\n"); exit(EXIT_FAILURE); } - if (fstat(reader[i].metafd, &reader[i].metast) != 0) { + if (fstat(readers[i].metafd, &readers[i].metast) != 0) { perror("stat meta\n"); exit(EXIT_FAILURE); } @@ -1532,40 +1545,40 @@ int read_input(std::vector &sources, char *fname, int maxzoom, int minzo long long poolpos = 0; for (size_t i = 0; i < CPUS; i++) { - if (reader[i].metapos > 0) { - void *map = mmap(NULL, reader[i].metapos, PROT_READ, MAP_PRIVATE, reader[i].metafd, 0); + if (readers[i].metapos > 0) { + void *map = mmap(NULL, readers[i].metapos, PROT_READ, MAP_PRIVATE, readers[i].metafd, 0); if (map == MAP_FAILED) { perror("mmap unmerged meta"); exit(EXIT_FAILURE); } - madvise(map, reader[i].metapos, MADV_SEQUENTIAL); - madvise(map, reader[i].metapos, MADV_WILLNEED); - if (fwrite(map, reader[i].metapos, 1, metafile) != 1) { + madvise(map, readers[i].metapos, MADV_SEQUENTIAL); + madvise(map, readers[i].metapos, MADV_WILLNEED); + if (fwrite(map, readers[i].metapos, 1, metafile) != 1) { perror("Reunify meta"); exit(EXIT_FAILURE); } - madvise(map, reader[i].metapos, MADV_DONTNEED); - if (munmap(map, reader[i].metapos) != 0) { + madvise(map, readers[i].metapos, MADV_DONTNEED); + if (munmap(map, readers[i].metapos) != 0) { perror("unmap unmerged meta"); } } meta_off[i] = metapos; - metapos += reader[i].metapos; - if (close(reader[i].metafd) != 0) { + metapos += readers[i].metapos; + if (close(readers[i].metafd) != 0) { perror("close unmerged meta"); } - if (reader[i].poolfile->off > 0) { - if (fwrite(reader[i].poolfile->map, reader[i].poolfile->off, 1, poolfile) != 1) { + if (readers[i].poolfile->off > 0) { + if (fwrite(readers[i].poolfile->map, readers[i].poolfile->off, 1, poolfile) != 1) { perror("Reunify string pool"); exit(EXIT_FAILURE); } } pool_off[i] = poolpos; - poolpos += reader[i].poolfile->off; - memfile_close(reader[i].poolfile); + poolpos += readers[i].poolfile->off; + memfile_close(readers[i].poolfile); } if (fclose(poolfile) != 0) { @@ -1626,7 +1639,7 @@ int read_input(std::vector &sources, char *fname, int maxzoom, int minzo unlink(geomname); unsigned iz = 0, ix = 0, iy = 0; - choose_first_zoom(file_bbox, reader, &iz, &ix, &iy, minzoom, buffer); + choose_first_zoom(file_bbox, readers, &iz, &ix, &iy, minzoom, buffer); long long geompos = 0; @@ -1635,7 +1648,7 @@ int read_input(std::vector &sources, char *fname, int maxzoom, int minzo serialize_uint(geomfile, ix, &geompos, fname); serialize_uint(geomfile, iy, &geompos, fname); - radix(reader, CPUS, geomfile, geomfd, indexfile, indexfd, tmpdir, &geompos, maxzoom, basezoom, droprate, gamma); + radix(readers, CPUS, geomfile, geomfd, indexfile, indexfd, tmpdir, &geompos, maxzoom, basezoom, droprate, gamma); /* end of tile */ serialize_byte(geomfile, -2, &geompos, fname);