mirror of
https://github.com/mapbox/tippecanoe.git
synced 2025-02-01 16:58:05 +00:00
Multithreaded geobuf feature parsing
This commit is contained in:
parent
12d744e961
commit
223d837736
93
geobuf.cpp
93
geobuf.cpp
@ -1,11 +1,13 @@
|
||||
#include <stdio.h>
|
||||
#include <string>
|
||||
#include <limits.h>
|
||||
#include <pthread.h>
|
||||
#include "mvt.hpp"
|
||||
#include "serial.hpp"
|
||||
#include "geobuf.hpp"
|
||||
#include "geojson.hpp"
|
||||
#include "projection.hpp"
|
||||
#include "main.hpp"
|
||||
#include "protozero/varint.hpp"
|
||||
#include "protozero/pbf_reader.hpp"
|
||||
#include "protozero/pbf_writer.hpp"
|
||||
@ -19,6 +21,18 @@
|
||||
#define POLYGON 4
|
||||
#define MULTIPOLYGON 5
|
||||
|
||||
struct queued_feature {
|
||||
protozero::pbf_reader pbf;
|
||||
size_t dim;
|
||||
double e;
|
||||
std::vector<std::string> *keys;
|
||||
struct serialization_state *sst;
|
||||
int layer;
|
||||
std::string layername;
|
||||
};
|
||||
|
||||
static std::vector<queued_feature> feature_queue;
|
||||
|
||||
void ensureDim(size_t dim) {
|
||||
if (dim < 2) {
|
||||
fprintf(stderr, "Geometry has fewer than 2 dimensions: %zu\n", dim);
|
||||
@ -359,13 +373,13 @@ void readFeature(protozero::pbf_reader &pbf, size_t dim, double e, std::vector<s
|
||||
|
||||
sf.layer = layer;
|
||||
sf.layername = layername;
|
||||
sf.segment = 0; // single thread
|
||||
sf.segment = sst->segment;
|
||||
sf.has_id = has_id;
|
||||
sf.id = id;
|
||||
sf.has_tippecanoe_minzoom = false;
|
||||
sf.has_tippecanoe_maxzoom = false;
|
||||
sf.feature_minzoom = false;
|
||||
sf.seq = (*sst->layer_seq);
|
||||
sf.seq = *(sst->layer_seq);
|
||||
sf.geometry = dv[i].dv;
|
||||
sf.t = dv[i].type;
|
||||
sf.full_keys = full_keys;
|
||||
@ -404,12 +418,79 @@ void readFeature(protozero::pbf_reader &pbf, size_t dim, double e, std::vector<s
|
||||
}
|
||||
}
|
||||
|
||||
struct queue_run_arg {
|
||||
size_t start;
|
||||
size_t end;
|
||||
size_t segment;
|
||||
};
|
||||
|
||||
void *run_parse_feature(void *v) {
|
||||
struct queue_run_arg *qra = (struct queue_run_arg *) v;
|
||||
|
||||
for (size_t i = qra->start; i < qra->end; i++) {
|
||||
struct queued_feature &qf = feature_queue[i];
|
||||
readFeature(qf.pbf, qf.dim, qf.e, *qf.keys, &qf.sst[qra->segment], qf.layer, qf.layername);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void runQueue() {
|
||||
if (feature_queue.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
struct queue_run_arg qra[CPUS];
|
||||
pthread_t pthreads[CPUS];
|
||||
|
||||
for (size_t i = 0; i < CPUS; i++) {
|
||||
*(feature_queue[0].sst[i].layer_seq) = *(feature_queue[0].sst[0].layer_seq) + feature_queue.size() * i / CPUS;
|
||||
|
||||
qra[i].start = feature_queue.size() * i / CPUS;
|
||||
qra[i].end = feature_queue.size() * (i + 1) / CPUS;
|
||||
qra[i].segment = i;
|
||||
|
||||
if (pthread_create(&pthreads[i], NULL, run_parse_feature, &qra[i]) != 0) {
|
||||
perror("pthread_create");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < CPUS; i++) {
|
||||
void *retval;
|
||||
|
||||
if (pthread_join(pthreads[i], &retval) != 0) {
|
||||
perror("pthread_join");
|
||||
}
|
||||
}
|
||||
|
||||
*(feature_queue[0].sst[0].layer_seq) = *(feature_queue[0].sst[CPUS - 1].layer_seq);
|
||||
feature_queue.clear();
|
||||
}
|
||||
|
||||
void queueFeature(protozero::pbf_reader &pbf, size_t dim, double e, std::vector<std::string> &keys, struct serialization_state *sst, int layer, std::string layername) {
|
||||
struct queued_feature qf;
|
||||
qf.pbf = pbf;
|
||||
qf.dim = dim;
|
||||
qf.e = e;
|
||||
qf.keys = &keys;
|
||||
qf.sst = sst;
|
||||
qf.layer = layer;
|
||||
qf.layername = layername;
|
||||
|
||||
feature_queue.push_back(qf);
|
||||
|
||||
if (feature_queue.size() > CPUS * 500) {
|
||||
runQueue();
|
||||
}
|
||||
}
|
||||
|
||||
void outBareGeometry(drawvec const &dv, int type, size_t dim, double e, std::vector<std::string> &keys, struct serialization_state *sst, int layer, std::string layername) {
|
||||
serial_feature sf;
|
||||
|
||||
sf.layer = layer;
|
||||
sf.layername = layername;
|
||||
sf.segment = 0; // single thread
|
||||
sf.segment = sst->segment;
|
||||
sf.has_id = false;
|
||||
sf.has_tippecanoe_minzoom = false;
|
||||
sf.has_tippecanoe_maxzoom = false;
|
||||
@ -427,7 +508,7 @@ void readFeatureCollection(protozero::pbf_reader &pbf, size_t dim, double e, std
|
||||
switch (pbf.tag()) {
|
||||
case 1: {
|
||||
protozero::pbf_reader feature_reader(pbf.get_message());
|
||||
readFeature(feature_reader, dim, e, keys, sst, layer, layername);
|
||||
queueFeature(feature_reader, dim, e, keys, sst, layer, layername);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -466,7 +547,7 @@ void parse_geobuf(struct serialization_state *sst, const char *src, size_t len,
|
||||
|
||||
case 5: {
|
||||
protozero::pbf_reader feature_reader(pbf.get_message());
|
||||
readFeature(feature_reader, dim, e, keys, sst, layer, layername);
|
||||
queueFeature(feature_reader, dim, e, keys, sst, layer, layername);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -483,4 +564,6 @@ void parse_geobuf(struct serialization_state *sst, const char *src, size_t len,
|
||||
pbf.skip();
|
||||
}
|
||||
}
|
||||
|
||||
runQueue();
|
||||
}
|
||||
|
167
main.cpp
167
main.cpp
@ -349,7 +349,7 @@ void *run_sort(void *v) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void do_read_parallel(char *map, long long len, long long initial_offset, const char *reading, struct reader *reader, volatile long long *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector<std::map<std::string, layermap_entry> > *layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters) {
|
||||
void do_read_parallel(char *map, long long len, long long initial_offset, const char *reading, struct reader *readers, volatile long long *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector<std::map<std::string, layermap_entry> > *layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters) {
|
||||
long long segs[CPUS + 1];
|
||||
segs[0] = 0;
|
||||
segs[CPUS] = len;
|
||||
@ -387,7 +387,7 @@ void do_read_parallel(char *map, long long len, long long initial_offset, const
|
||||
sst[i].line = 0;
|
||||
sst[i].layer_seq = &layer_seq[i];
|
||||
sst[i].progress_seq = progress_seq;
|
||||
sst[i].readers = reader;
|
||||
sst[i].readers = readers;
|
||||
sst[i].segment = i;
|
||||
sst[i].initialized = &initialized[i];
|
||||
sst[i].initial_x = &initial_x[i];
|
||||
@ -440,7 +440,7 @@ struct read_parallel_arg {
|
||||
int separator;
|
||||
|
||||
const char *reading;
|
||||
struct reader *reader;
|
||||
struct reader *readers;
|
||||
volatile long long *progress_seq;
|
||||
std::set<std::string> *exclude;
|
||||
std::set<std::string> *include;
|
||||
@ -483,7 +483,7 @@ void *run_read_parallel(void *v) {
|
||||
}
|
||||
madvise(map, rpa->len, MADV_RANDOM); // sequential, but from several pointers at once
|
||||
|
||||
do_read_parallel(map, rpa->len, rpa->offset, rpa->reading, rpa->reader, rpa->progress_seq, rpa->exclude, rpa->include, rpa->exclude_all, rpa->fname, rpa->basezoom, rpa->source, rpa->nlayers, rpa->layermaps, rpa->droprate, rpa->initialized, rpa->initial_x, rpa->initial_y, rpa->maxzoom, rpa->layername, rpa->uses_gamma, rpa->attribute_types, rpa->separator, rpa->dist_sum, rpa->dist_count, rpa->want_dist, rpa->filters);
|
||||
do_read_parallel(map, rpa->len, rpa->offset, rpa->reading, rpa->readers, rpa->progress_seq, rpa->exclude, rpa->include, rpa->exclude_all, rpa->fname, rpa->basezoom, rpa->source, rpa->nlayers, rpa->layermaps, rpa->droprate, rpa->initialized, rpa->initial_x, rpa->initial_y, rpa->maxzoom, rpa->layername, rpa->uses_gamma, rpa->attribute_types, rpa->separator, rpa->dist_sum, rpa->dist_count, rpa->want_dist, rpa->filters);
|
||||
|
||||
madvise(map, rpa->len, MADV_DONTNEED);
|
||||
if (munmap(map, rpa->len) != 0) {
|
||||
@ -500,7 +500,7 @@ void *run_read_parallel(void *v) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void start_parsing(int fd, FILE *fp, long long offset, long long len, volatile int *is_parsing, pthread_t *parallel_parser, bool &parser_created, const char *reading, struct reader *reader, volatile long long *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector<std::map<std::string, layermap_entry> > &layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters) {
|
||||
void start_parsing(int fd, FILE *fp, long long offset, long long len, volatile int *is_parsing, pthread_t *parallel_parser, bool &parser_created, const char *reading, struct reader *readers, volatile long long *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector<std::map<std::string, layermap_entry> > &layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters) {
|
||||
// This has to kick off an intermediate thread to start the parser threads,
|
||||
// so the main thread can get back to reading the next input stage while
|
||||
// the intermediate thread waits for the completion of the parser threads.
|
||||
@ -521,7 +521,7 @@ void start_parsing(int fd, FILE *fp, long long offset, long long len, volatile i
|
||||
rpa->separator = separator;
|
||||
|
||||
rpa->reading = reading;
|
||||
rpa->reader = reader;
|
||||
rpa->readers = readers;
|
||||
rpa->progress_seq = progress_seq;
|
||||
rpa->exclude = exclude;
|
||||
rpa->include = include;
|
||||
@ -873,7 +873,7 @@ void prep_drop_states(struct drop_state *ds, int maxzoom, int basezoom, double d
|
||||
}
|
||||
}
|
||||
|
||||
void radix(struct reader *reader, int nreaders, FILE *geomfile, int geomfd, FILE *indexfile, int indexfd, const char *tmpdir, long long *geompos, int maxzoom, int basezoom, double droprate, double gamma) {
|
||||
void radix(struct reader *readers, int nreaders, FILE *geomfile, int geomfd, FILE *indexfile, int indexfd, const char *tmpdir, long long *geompos, int maxzoom, int basezoom, double droprate, double gamma) {
|
||||
// Run through the index and geometry for each reader,
|
||||
// splitting the contents out by index into as many
|
||||
// sub-files as we can write to simultaneously.
|
||||
@ -926,11 +926,11 @@ void radix(struct reader *reader, int nreaders, FILE *geomfile, int geomfd, FILE
|
||||
int geomfds[nreaders];
|
||||
int indexfds[nreaders];
|
||||
for (int i = 0; i < nreaders; i++) {
|
||||
geomfds[i] = reader[i].geomfd;
|
||||
indexfds[i] = reader[i].indexfd;
|
||||
geomfds[i] = readers[i].geomfd;
|
||||
indexfds[i] = readers[i].indexfd;
|
||||
|
||||
struct stat geomst;
|
||||
if (fstat(reader[i].geomfd, &geomst) < 0) {
|
||||
if (fstat(readers[i].geomfd, &geomst) < 0) {
|
||||
perror("stat geom");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
@ -950,19 +950,19 @@ void radix(struct reader *reader, int nreaders, FILE *geomfile, int geomfd, FILE
|
||||
}
|
||||
}
|
||||
|
||||
void choose_first_zoom(long long *file_bbox, struct reader *reader, unsigned *iz, unsigned *ix, unsigned *iy, int minzoom, int buffer) {
|
||||
void choose_first_zoom(long long *file_bbox, struct reader *readers, unsigned *iz, unsigned *ix, unsigned *iy, int minzoom, int buffer) {
|
||||
for (size_t i = 0; i < CPUS; i++) {
|
||||
if (reader[i].file_bbox[0] < file_bbox[0]) {
|
||||
file_bbox[0] = reader[i].file_bbox[0];
|
||||
if (readers[i].file_bbox[0] < file_bbox[0]) {
|
||||
file_bbox[0] = readers[i].file_bbox[0];
|
||||
}
|
||||
if (reader[i].file_bbox[1] < file_bbox[1]) {
|
||||
file_bbox[1] = reader[i].file_bbox[1];
|
||||
if (readers[i].file_bbox[1] < file_bbox[1]) {
|
||||
file_bbox[1] = readers[i].file_bbox[1];
|
||||
}
|
||||
if (reader[i].file_bbox[2] > file_bbox[2]) {
|
||||
file_bbox[2] = reader[i].file_bbox[2];
|
||||
if (readers[i].file_bbox[2] > file_bbox[2]) {
|
||||
file_bbox[2] = readers[i].file_bbox[2];
|
||||
}
|
||||
if (reader[i].file_bbox[3] > file_bbox[3]) {
|
||||
file_bbox[3] = reader[i].file_bbox[3];
|
||||
if (readers[i].file_bbox[3] > file_bbox[3]) {
|
||||
file_bbox[3] = readers[i].file_bbox[3];
|
||||
}
|
||||
}
|
||||
|
||||
@ -1004,9 +1004,9 @@ void choose_first_zoom(long long *file_bbox, struct reader *reader, unsigned *iz
|
||||
int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzoom, int basezoom, double basezoom_marker_width, sqlite3 *outdb, const char *outdir, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, double droprate, int buffer, const char *tmpdir, double gamma, int read_parallel, int forcetable, const char *attribution, bool uses_gamma, long long *file_bbox, const char *prefilter, const char *postfilter, const char *description, bool guess_maxzoom, std::map<std::string, int> const *attribute_types, const char *pgm) {
|
||||
int ret = EXIT_SUCCESS;
|
||||
|
||||
struct reader reader[CPUS];
|
||||
struct reader readers[CPUS];
|
||||
for (size_t i = 0; i < CPUS; i++) {
|
||||
struct reader *r = reader + i;
|
||||
struct reader *r = &readers[i];
|
||||
|
||||
char metaname[strlen(tmpdir) + strlen("/meta.XXXXXXXX") + 1];
|
||||
char poolname[strlen(tmpdir) + strlen("/pool.XXXXXXXX") + 1];
|
||||
@ -1094,7 +1094,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
}
|
||||
|
||||
struct statfs fsstat;
|
||||
if (fstatfs(reader[0].geomfd, &fsstat) != 0) {
|
||||
if (fstatfs(readers[0].geomfd, &fsstat) != 0) {
|
||||
perror("fstatfs");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
@ -1208,32 +1208,45 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
long long layer_seq = overall_offset;
|
||||
long long layer_seq[CPUS];
|
||||
double dist_sums[CPUS];
|
||||
size_t dist_counts[CPUS];
|
||||
struct serialization_state sst[CPUS];
|
||||
|
||||
struct serialization_state sst;
|
||||
sst.fname = reading.c_str();
|
||||
sst.line = 0;
|
||||
sst.layer_seq = &layer_seq;
|
||||
sst.progress_seq = &progress_seq;
|
||||
sst.readers = reader;
|
||||
sst.segment = 0;
|
||||
sst.initial_x = &initial_x[0];
|
||||
sst.initial_y = &initial_y[0];
|
||||
sst.initialized = &initialized[0];
|
||||
sst.dist_sum = &dist_sum;
|
||||
sst.dist_count = &dist_count;
|
||||
sst.want_dist = guess_maxzoom;
|
||||
sst.maxzoom = maxzoom;
|
||||
sst.filters = prefilter != NULL || postfilter != NULL;
|
||||
sst.uses_gamma = uses_gamma;
|
||||
sst.layermap = &layermaps[0];
|
||||
sst.exclude = exclude;
|
||||
sst.include = include;
|
||||
sst.exclude_all = exclude_all;
|
||||
sst.basezoom = basezoom;
|
||||
sst.attribute_types = attribute_types;
|
||||
for (size_t i = 0; i < CPUS; i++) {
|
||||
layer_seq[i] = overall_offset;
|
||||
dist_sums[i] = 0;
|
||||
dist_counts[i] = 0;
|
||||
|
||||
parse_geobuf(&sst, map, st.st_size, layer, sources[layer].layer);
|
||||
sst[i].fname = reading.c_str();
|
||||
sst[i].line = 0;
|
||||
sst[i].layer_seq = &layer_seq[i];
|
||||
sst[i].progress_seq = &progress_seq;
|
||||
sst[i].readers = readers;
|
||||
sst[i].segment = i;
|
||||
sst[i].initial_x = &initial_x[i];
|
||||
sst[i].initial_y = &initial_y[i];
|
||||
sst[i].initialized = &initialized[i];
|
||||
sst[i].dist_sum = &dist_sums[i];
|
||||
sst[i].dist_count = &dist_counts[i];
|
||||
sst[i].want_dist = guess_maxzoom;
|
||||
sst[i].maxzoom = maxzoom;
|
||||
sst[i].filters = prefilter != NULL || postfilter != NULL;
|
||||
sst[i].uses_gamma = uses_gamma;
|
||||
sst[i].layermap = &layermaps[i];
|
||||
sst[i].exclude = exclude;
|
||||
sst[i].include = include;
|
||||
sst[i].exclude_all = exclude_all;
|
||||
sst[i].basezoom = basezoom;
|
||||
sst[i].attribute_types = attribute_types;
|
||||
}
|
||||
|
||||
parse_geobuf(sst, map, st.st_size, layer, sources[layer].layer);
|
||||
|
||||
for (size_t i = 0; i < CPUS; i++) {
|
||||
dist_sum += dist_sums[i];
|
||||
dist_count += dist_counts[i];
|
||||
}
|
||||
|
||||
if (munmap(map, st.st_size) != 0) {
|
||||
perror("munmap source file");
|
||||
@ -1244,8 +1257,8 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
overall_offset = layer_seq;
|
||||
checkdisk(reader, CPUS);
|
||||
overall_offset = layer_seq[0];
|
||||
checkdisk(readers, CPUS);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -1286,9 +1299,9 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
}
|
||||
|
||||
if (map != NULL && map != MAP_FAILED && read_parallel_this) {
|
||||
do_read_parallel(map, st.st_size - off, overall_offset, reading.c_str(), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, &layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, uses_gamma, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL);
|
||||
do_read_parallel(map, st.st_size - off, overall_offset, reading.c_str(), readers, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, &layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, uses_gamma, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL);
|
||||
overall_offset += st.st_size - off;
|
||||
checkdisk(reader, CPUS);
|
||||
checkdisk(readers, CPUS);
|
||||
|
||||
if (munmap(map, st.st_size - off) != 0) {
|
||||
perror("munmap source file");
|
||||
@ -1362,11 +1375,11 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
}
|
||||
|
||||
fflush(readfp);
|
||||
start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str(), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL);
|
||||
start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str(), readers, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL);
|
||||
|
||||
initial_offset += ahead;
|
||||
overall_offset += ahead;
|
||||
checkdisk(reader, CPUS);
|
||||
checkdisk(readers, CPUS);
|
||||
ahead = 0;
|
||||
|
||||
sprintf(readname, "%s%s", tmpdir, "/read.XXXXXXXX");
|
||||
@ -1399,7 +1412,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
fflush(readfp);
|
||||
|
||||
if (ahead > 0) {
|
||||
start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str(), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL);
|
||||
start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str(), readers, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL);
|
||||
|
||||
if (parser_created) {
|
||||
if (pthread_join(parallel_parser, NULL) != 0) {
|
||||
@ -1409,7 +1422,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
}
|
||||
|
||||
overall_offset += ahead;
|
||||
checkdisk(reader, CPUS);
|
||||
checkdisk(readers, CPUS);
|
||||
}
|
||||
} else {
|
||||
// Plain serial reading
|
||||
@ -1422,7 +1435,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
sst.line = 0;
|
||||
sst.layer_seq = &layer_seq;
|
||||
sst.progress_seq = &progress_seq;
|
||||
sst.readers = reader;
|
||||
sst.readers = readers;
|
||||
sst.segment = 0;
|
||||
sst.initial_x = &initial_x[0];
|
||||
sst.initial_y = &initial_y[0];
|
||||
@ -1443,7 +1456,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
parse_json(&sst, jp, layer, sources[layer].layer);
|
||||
json_end(jp);
|
||||
overall_offset = layer_seq;
|
||||
checkdisk(reader, CPUS);
|
||||
checkdisk(readers, CPUS);
|
||||
}
|
||||
|
||||
if (fclose(fp) != 0) {
|
||||
@ -1459,25 +1472,25 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < CPUS; i++) {
|
||||
if (fclose(reader[i].metafile) != 0) {
|
||||
if (fclose(readers[i].metafile) != 0) {
|
||||
perror("fclose meta");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
if (fclose(reader[i].geomfile) != 0) {
|
||||
if (fclose(readers[i].geomfile) != 0) {
|
||||
perror("fclose geom");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
if (fclose(reader[i].indexfile) != 0) {
|
||||
if (fclose(readers[i].indexfile) != 0) {
|
||||
perror("fclose index");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
memfile_close(reader[i].treefile);
|
||||
memfile_close(readers[i].treefile);
|
||||
|
||||
if (fstat(reader[i].geomfd, &reader[i].geomst) != 0) {
|
||||
if (fstat(readers[i].geomfd, &readers[i].geomst) != 0) {
|
||||
perror("stat geom\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
if (fstat(reader[i].metafd, &reader[i].metast) != 0) {
|
||||
if (fstat(readers[i].metafd, &readers[i].metast) != 0) {
|
||||
perror("stat meta\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
@ -1532,40 +1545,40 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
long long poolpos = 0;
|
||||
|
||||
for (size_t i = 0; i < CPUS; i++) {
|
||||
if (reader[i].metapos > 0) {
|
||||
void *map = mmap(NULL, reader[i].metapos, PROT_READ, MAP_PRIVATE, reader[i].metafd, 0);
|
||||
if (readers[i].metapos > 0) {
|
||||
void *map = mmap(NULL, readers[i].metapos, PROT_READ, MAP_PRIVATE, readers[i].metafd, 0);
|
||||
if (map == MAP_FAILED) {
|
||||
perror("mmap unmerged meta");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
madvise(map, reader[i].metapos, MADV_SEQUENTIAL);
|
||||
madvise(map, reader[i].metapos, MADV_WILLNEED);
|
||||
if (fwrite(map, reader[i].metapos, 1, metafile) != 1) {
|
||||
madvise(map, readers[i].metapos, MADV_SEQUENTIAL);
|
||||
madvise(map, readers[i].metapos, MADV_WILLNEED);
|
||||
if (fwrite(map, readers[i].metapos, 1, metafile) != 1) {
|
||||
perror("Reunify meta");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
madvise(map, reader[i].metapos, MADV_DONTNEED);
|
||||
if (munmap(map, reader[i].metapos) != 0) {
|
||||
madvise(map, readers[i].metapos, MADV_DONTNEED);
|
||||
if (munmap(map, readers[i].metapos) != 0) {
|
||||
perror("unmap unmerged meta");
|
||||
}
|
||||
}
|
||||
|
||||
meta_off[i] = metapos;
|
||||
metapos += reader[i].metapos;
|
||||
if (close(reader[i].metafd) != 0) {
|
||||
metapos += readers[i].metapos;
|
||||
if (close(readers[i].metafd) != 0) {
|
||||
perror("close unmerged meta");
|
||||
}
|
||||
|
||||
if (reader[i].poolfile->off > 0) {
|
||||
if (fwrite(reader[i].poolfile->map, reader[i].poolfile->off, 1, poolfile) != 1) {
|
||||
if (readers[i].poolfile->off > 0) {
|
||||
if (fwrite(readers[i].poolfile->map, readers[i].poolfile->off, 1, poolfile) != 1) {
|
||||
perror("Reunify string pool");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
|
||||
pool_off[i] = poolpos;
|
||||
poolpos += reader[i].poolfile->off;
|
||||
memfile_close(reader[i].poolfile);
|
||||
poolpos += readers[i].poolfile->off;
|
||||
memfile_close(readers[i].poolfile);
|
||||
}
|
||||
|
||||
if (fclose(poolfile) != 0) {
|
||||
@ -1626,7 +1639,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
unlink(geomname);
|
||||
|
||||
unsigned iz = 0, ix = 0, iy = 0;
|
||||
choose_first_zoom(file_bbox, reader, &iz, &ix, &iy, minzoom, buffer);
|
||||
choose_first_zoom(file_bbox, readers, &iz, &ix, &iy, minzoom, buffer);
|
||||
|
||||
long long geompos = 0;
|
||||
|
||||
@ -1635,7 +1648,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
serialize_uint(geomfile, ix, &geompos, fname);
|
||||
serialize_uint(geomfile, iy, &geompos, fname);
|
||||
|
||||
radix(reader, CPUS, geomfile, geomfd, indexfile, indexfd, tmpdir, &geompos, maxzoom, basezoom, droprate, gamma);
|
||||
radix(readers, CPUS, geomfile, geomfd, indexfile, indexfd, tmpdir, &geompos, maxzoom, basezoom, droprate, gamma);
|
||||
|
||||
/* end of tile */
|
||||
serialize_byte(geomfile, -2, &geompos, fname);
|
||||
|
Loading…
x
Reference in New Issue
Block a user