Support RFC 8142 GeoJSON text sequences

This commit is contained in:
Eric Fischer 2017-04-28 10:41:20 -07:00
parent dc01d33402
commit 63e0c89c4b
8 changed files with 4177 additions and 123 deletions

View File

@ -1,3 +1,7 @@
## 1.17.3
* Support RFC 8142 GeoJSON text sequences
## 1.17.2 ## 1.17.2
* Organize usage output the same way as in the README * Organize usage output the same way as in the README

View File

@ -99,15 +99,18 @@ parallel-test:
./tippecanoe -z5 -f -pi -l test -n test -P -o tests/parallel/parallel-file.mbtiles tests/parallel/in[123].json tests/parallel/empty[12].json ./tippecanoe -z5 -f -pi -l test -n test -P -o tests/parallel/parallel-file.mbtiles tests/parallel/in[123].json tests/parallel/empty[12].json
cat tests/parallel/in[123].json | ./tippecanoe -z5 -f -pi -l test -n test -o tests/parallel/linear-pipe.mbtiles cat tests/parallel/in[123].json | ./tippecanoe -z5 -f -pi -l test -n test -o tests/parallel/linear-pipe.mbtiles
cat tests/parallel/in[123].json | ./tippecanoe -z5 -f -pi -l test -n test -P -o tests/parallel/parallel-pipe.mbtiles cat tests/parallel/in[123].json | ./tippecanoe -z5 -f -pi -l test -n test -P -o tests/parallel/parallel-pipe.mbtiles
cat tests/parallel/in[123].json | sed 's/^/@/' | tr '@' '\036' | ./tippecanoe -z5 -f -pi -l test -n test -o tests/parallel/implicit-pipe.mbtiles
./tippecanoe -z5 -f -pi -l test -n test -P -o tests/parallel/parallel-pipes.mbtiles <(cat tests/parallel/in1.json) <(cat tests/parallel/empty1.json) <(cat tests/parallel/empty2.json) <(cat tests/parallel/in2.json) /dev/null <(cat tests/parallel/in3.json) ./tippecanoe -z5 -f -pi -l test -n test -P -o tests/parallel/parallel-pipes.mbtiles <(cat tests/parallel/in1.json) <(cat tests/parallel/empty1.json) <(cat tests/parallel/empty2.json) <(cat tests/parallel/in2.json) /dev/null <(cat tests/parallel/in3.json)
./tippecanoe-decode tests/parallel/linear-file.mbtiles > tests/parallel/linear-file.json ./tippecanoe-decode tests/parallel/linear-file.mbtiles > tests/parallel/linear-file.json
./tippecanoe-decode tests/parallel/parallel-file.mbtiles > tests/parallel/parallel-file.json ./tippecanoe-decode tests/parallel/parallel-file.mbtiles > tests/parallel/parallel-file.json
./tippecanoe-decode tests/parallel/linear-pipe.mbtiles > tests/parallel/linear-pipe.json ./tippecanoe-decode tests/parallel/linear-pipe.mbtiles > tests/parallel/linear-pipe.json
./tippecanoe-decode tests/parallel/parallel-pipe.mbtiles > tests/parallel/parallel-pipe.json ./tippecanoe-decode tests/parallel/parallel-pipe.mbtiles > tests/parallel/parallel-pipe.json
./tippecanoe-decode tests/parallel/implicit-pipe.mbtiles > tests/parallel/implicit-pipe.json
./tippecanoe-decode tests/parallel/parallel-pipes.mbtiles > tests/parallel/parallel-pipes.json ./tippecanoe-decode tests/parallel/parallel-pipes.mbtiles > tests/parallel/parallel-pipes.json
cmp tests/parallel/linear-file.json tests/parallel/parallel-file.json cmp tests/parallel/linear-file.json tests/parallel/parallel-file.json
cmp tests/parallel/linear-file.json tests/parallel/linear-pipe.json cmp tests/parallel/linear-file.json tests/parallel/linear-pipe.json
cmp tests/parallel/linear-file.json tests/parallel/parallel-pipe.json cmp tests/parallel/linear-file.json tests/parallel/parallel-pipe.json
cmp tests/parallel/linear-file.json tests/parallel/implicit-pipe.json
cmp tests/parallel/linear-file.json tests/parallel/parallel-pipes.json cmp tests/parallel/linear-file.json tests/parallel/parallel-pipes.json
rm tests/parallel/*.mbtiles tests/parallel/*.json rm tests/parallel/*.mbtiles tests/parallel/*.json

View File

@ -104,6 +104,10 @@ If your input is formatted as newline-delimited GeoJSON, use `-P` to make input
Performance will be better if the input is a named file that can be mapped into memory Performance will be better if the input is a named file that can be mapped into memory
rather than a stream that can only be read sequentially. rather than a stream that can only be read sequentially.
If the input file begins with the [RFC 8142](https://tools.ietf.org/html/rfc8142) record separator,
parallel processing of input will be invoked automatically, splitting at record separators rather
than at all newlines.
### Projection of input ### Projection of input
* `-s` _projection_ or `--projection=`_projection_: Specify the projection of the input data. Currently supported are `EPSG:4326` (WGS84, the default) and `EPSG:3857` (Web Mercator). In general you should use WGS84 for your input files if at all possible. * `-s` _projection_ or `--projection=`_projection_: Specify the projection of the input data. Currently supported are `EPSG:4326` (WGS84, the default) and `EPSG:3857` (Web Mercator). In general you should use WGS84 for your input files if at all possible.

View File

@ -295,7 +295,7 @@ again:
return NULL; return NULL;
} }
} while (c == ' ' || c == '\t' || c == '\r' || c == '\n'); } while (c == ' ' || c == '\t' || c == '\r' || c == '\n' || c == 0x1E);
/////////////////////////// Arrays /////////////////////////// Arrays

View File

@ -372,7 +372,7 @@ void *run_sort(void *v) {
return NULL; return NULL;
} }
void do_read_parallel(char *map, long long len, long long initial_offset, const char *reading, struct reader *reader, volatile long long *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector<std::map<std::string, layermap_entry> > *layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types) { void do_read_parallel(char *map, long long len, long long initial_offset, const char *reading, struct reader *reader, volatile long long *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector<std::map<std::string, layermap_entry> > *layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types, int separator) {
long long segs[CPUS + 1]; long long segs[CPUS + 1];
segs[0] = 0; segs[0] = 0;
segs[CPUS] = len; segs[CPUS] = len;
@ -380,7 +380,7 @@ void do_read_parallel(char *map, long long len, long long initial_offset, const
for (size_t i = 1; i < CPUS; i++) { for (size_t i = 1; i < CPUS; i++) {
segs[i] = len * i / CPUS; segs[i] = len * i / CPUS;
while (segs[i] < len && map[segs[i]] != '\n') { while (segs[i] < len && map[segs[i]] != separator) {
segs[i]++; segs[i]++;
} }
} }
@ -455,6 +455,7 @@ struct read_parallel_arg {
long long offset; long long offset;
long long len; long long len;
volatile int *is_parsing; volatile int *is_parsing;
int separator;
const char *reading; const char *reading;
struct reader *reader; struct reader *reader;
@ -496,7 +497,7 @@ void *run_read_parallel(void *v) {
} }
madvise(map, rpa->len, MADV_RANDOM); // sequential, but from several pointers at once madvise(map, rpa->len, MADV_RANDOM); // sequential, but from several pointers at once
do_read_parallel(map, rpa->len, rpa->offset, rpa->reading, rpa->reader, rpa->progress_seq, rpa->exclude, rpa->include, rpa->exclude_all, rpa->fname, rpa->basezoom, rpa->source, rpa->nlayers, rpa->layermaps, rpa->droprate, rpa->initialized, rpa->initial_x, rpa->initial_y, rpa->maxzoom, rpa->layername, rpa->uses_gamma, rpa->attribute_types); do_read_parallel(map, rpa->len, rpa->offset, rpa->reading, rpa->reader, rpa->progress_seq, rpa->exclude, rpa->include, rpa->exclude_all, rpa->fname, rpa->basezoom, rpa->source, rpa->nlayers, rpa->layermaps, rpa->droprate, rpa->initialized, rpa->initial_x, rpa->initial_y, rpa->maxzoom, rpa->layername, rpa->uses_gamma, rpa->attribute_types, rpa->separator);
madvise(map, rpa->len, MADV_DONTNEED); madvise(map, rpa->len, MADV_DONTNEED);
if (munmap(map, rpa->len) != 0) { if (munmap(map, rpa->len) != 0) {
@ -513,7 +514,7 @@ void *run_read_parallel(void *v) {
return NULL; return NULL;
} }
void start_parsing(int fd, FILE *fp, long long offset, long long len, volatile int *is_parsing, pthread_t *parallel_parser, bool &parser_created, const char *reading, struct reader *reader, volatile long long *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector<std::map<std::string, layermap_entry> > &layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types) { void start_parsing(int fd, FILE *fp, long long offset, long long len, volatile int *is_parsing, pthread_t *parallel_parser, bool &parser_created, const char *reading, struct reader *reader, volatile long long *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, char *fname, int basezoom, int source, int nlayers, std::vector<std::map<std::string, layermap_entry> > &layermaps, double droprate, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types, int separator) {
// This has to kick off an intermediate thread to start the parser threads, // This has to kick off an intermediate thread to start the parser threads,
// so the main thread can get back to reading the next input stage while // so the main thread can get back to reading the next input stage while
// the intermediate thread waits for the completion of the parser threads. // the intermediate thread waits for the completion of the parser threads.
@ -531,6 +532,7 @@ void start_parsing(int fd, FILE *fp, long long offset, long long len, volatile i
rpa->offset = offset; rpa->offset = offset;
rpa->len = len; rpa->len = len;
rpa->is_parsing = is_parsing; rpa->is_parsing = is_parsing;
rpa->separator = separator;
rpa->reading = reading; rpa->reading = reading;
rpa->reader = reader; rpa->reader = reader;
@ -1200,7 +1202,9 @@ int read_input(std::vector<source> &sources, char *fname, int &maxzoom, int minz
char *map = NULL; char *map = NULL;
off_t off = 0; off_t off = 0;
// Separator used to split this input for parallel parsing. 0 means "no separator chosen":
// only -P ('\n') or a leading 0x1E record-separator byte (checked below) enables parallel
// parsing, so the default must be 0 for the serial-read fallback to be reachable.
if (read_parallel) { int read_parallel_this = read_parallel ? '\n' : 0;
if (1) {
if (fstat(fd, &st) == 0) { if (fstat(fd, &st) == 0) {
off = lseek(fd, 0, SEEK_CUR); off = lseek(fd, 0, SEEK_CUR);
if (off >= 0) { if (off >= 0) {
@ -1213,14 +1217,31 @@ int read_input(std::vector<source> &sources, char *fname, int &maxzoom, int minz
} }
} }
if (map != NULL && map != MAP_FAILED && st.st_size - off > 0) {
if (map[0] == 0x1E) {
read_parallel_this = 0x1E;
}
if (!read_parallel_this) {
// Not a GeoJSON text sequence, so unmap and read serially
if (munmap(map, st.st_size - off) != 0) {
perror("munmap source file");
exit(EXIT_FAILURE);
}
map = NULL;
}
}
if (map != NULL && map != MAP_FAILED) { if (map != NULL && map != MAP_FAILED) {
do_read_parallel(map, st.st_size - off, overall_offset, reading.c_str(), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, &layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, uses_gamma, attribute_types); do_read_parallel(map, st.st_size - off, overall_offset, reading.c_str(), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, &layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, uses_gamma, attribute_types, read_parallel_this);
overall_offset += st.st_size - off; overall_offset += st.st_size - off;
checkdisk(reader, CPUS); checkdisk(reader, CPUS);
if (munmap(map, st.st_size - off) != 0) { if (munmap(map, st.st_size - off) != 0) {
madvise(map, st.st_size, MADV_DONTNEED);
perror("munmap source file"); perror("munmap source file");
exit(EXIT_FAILURE);
} }
} else { } else {
FILE *fp = fdopen(fd, "r"); FILE *fp = fdopen(fd, "r");
@ -1233,7 +1254,15 @@ int read_input(std::vector<source> &sources, char *fname, int &maxzoom, int minz
continue; continue;
} }
if (read_parallel) { int c = getc(fp);
if (c != EOF) {
ungetc(c, fp);
}
if (c == 0x1E) {
read_parallel_this = 0x1E;
}
if (read_parallel_this) {
// Serial reading of chunks that are then parsed in parallel // Serial reading of chunks that are then parsed in parallel
char readname[strlen(tmpdir) + strlen("/read.XXXXXXXX") + 1]; char readname[strlen(tmpdir) + strlen("/read.XXXXXXXX") + 1];
@ -1267,7 +1296,7 @@ int read_input(std::vector<source> &sources, char *fname, int &maxzoom, int minz
fwrite_check(buf, sizeof(char), n, readfp, reading.c_str()); fwrite_check(buf, sizeof(char), n, readfp, reading.c_str());
ahead += n; ahead += n;
if (buf[n - 1] == '\n' && ahead > PARSE_MIN) { if (buf[n - 1] == read_parallel_this && ahead > PARSE_MIN) {
// Don't let the streaming reader get too far ahead of the parsers. // Don't let the streaming reader get too far ahead of the parsers.
// If the buffered input gets huge, even if the parsers are still running, // If the buffered input gets huge, even if the parsers are still running,
// wait for the parser thread instead of continuing to stream input. // wait for the parser thread instead of continuing to stream input.
@ -1282,7 +1311,7 @@ int read_input(std::vector<source> &sources, char *fname, int &maxzoom, int minz
} }
fflush(readfp); fflush(readfp);
start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, &parallel_parser, parser_created, reading.c_str(), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types); start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, &parallel_parser, parser_created, reading.c_str(), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types, read_parallel_this);
initial_offset += ahead; initial_offset += ahead;
overall_offset += ahead; overall_offset += ahead;
@ -1319,7 +1348,7 @@ int read_input(std::vector<source> &sources, char *fname, int &maxzoom, int minz
fflush(readfp); fflush(readfp);
if (ahead > 0) { if (ahead > 0) {
start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, &parallel_parser, parser_created, reading.c_str(), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types); start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, &parallel_parser, parser_created, reading.c_str(), reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, layer, nlayers, layermaps, droprate, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types, read_parallel_this);
if (parser_created) { if (parser_created) {
if (pthread_join(parallel_parser, NULL) != 0) { if (pthread_join(parallel_parser, NULL) != 0) {

View File

@ -109,6 +109,10 @@ messages may result otherwise.
Performance will be better if the input is a named file that can be mapped into memory Performance will be better if the input is a named file that can be mapped into memory
rather than a stream that can only be read sequentially. rather than a stream that can only be read sequentially.
.RE .RE
.PP
If the input file begins with the RFC 8142 \[la]https://tools.ietf.org/html/rfc8142\[ra] record separator,
parallel processing of input will be invoked automatically, splitting at record separators rather
than at all newlines.
.SS Projection of input .SS Projection of input
.RS .RS
.IP \(bu 2 .IP \(bu 2

File diff suppressed because it is too large Load Diff

View File

@ -1 +1 @@
#define VERSION "tippecanoe v1.17.2\n" #define VERSION "tippecanoe v1.17.3\n"