Don't let the input buffer for parallel streaming input get too big.

This commit is contained in:
Eric Fischer 2016-04-11 13:50:39 -07:00
parent 9d48d6a93d
commit e846b11ce7
3 changed files with 51 additions and 26 deletions

View File

@ -1,3 +1,7 @@
## 1.9.12
* Limit the size of the parallel parsing streaming input buffer
## 1.9.11
* Fix a line simplification crash when a segment degenerates to a single point

View File

@ -1711,6 +1711,8 @@ int read_json(int argc, struct source **sourcelist, char *fname, const char *lay
#define READ_BUF 2000
#define PARSE_MIN 10000000
#define PARSE_MAX (1LL * 1024 * 1024 * 1024)
char buf[READ_BUF];
int n;
@ -1718,34 +1720,53 @@ int read_json(int argc, struct source **sourcelist, char *fname, const char *lay
fwrite_check(buf, sizeof(char), n, readfp, reading);
ahead += n;
if (buf[n - 1] == '\n' && ahead > PARSE_MIN && is_parsing == 0) {
if (initial_offset != 0) {
if (pthread_join(parallel_parser, NULL) != 0) {
perror("pthread_join");
if (buf[n - 1] == '\n' && ahead > PARSE_MIN) {
int already_waited = 0;
// Don't let the streaming reader get too far ahead of the parsers.
// If the buffered input gets huge, block until the parsers are done.
while (ahead >= PARSE_MAX && is_parsing != 0) {
if (initial_offset != 0) {
if (pthread_join(parallel_parser, NULL) != 0) {
perror("pthread_join");
exit(EXIT_FAILURE);
}
}
already_waited = 1;
}
if (is_parsing == 0) {
if (!already_waited) {
if (initial_offset != 0) {
if (pthread_join(parallel_parser, NULL) != 0) {
perror("pthread_join");
exit(EXIT_FAILURE);
}
}
}
fflush(readfp);
start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, &parallel_parser, reading, reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, source, nlayers, droprate, initialized, initial_x, initial_y);
initial_offset += ahead;
overall_offset += ahead;
checkdisk(reader, CPUS);
ahead = 0;
sprintf(readname, "%s%s", tmpdir, "/read.XXXXXXXX");
readfd = mkstemp(readname);
if (readfd < 0) {
perror(readname);
exit(EXIT_FAILURE);
}
readfp = fdopen(readfd, "w");
if (readfp == NULL) {
perror(readname);
exit(EXIT_FAILURE);
}
unlink(readname);
}
fflush(readfp);
start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, &parallel_parser, reading, reader, &progress_seq, exclude, include, exclude_all, fname, basezoom, source, nlayers, droprate, initialized, initial_x, initial_y);
initial_offset += ahead;
overall_offset += ahead;
checkdisk(reader, CPUS);
ahead = 0;
sprintf(readname, "%s%s", tmpdir, "/read.XXXXXXXX");
readfd = mkstemp(readname);
if (readfd < 0) {
perror(readname);
exit(EXIT_FAILURE);
}
readfp = fdopen(readfd, "w");
if (readfp == NULL) {
perror(readname);
exit(EXIT_FAILURE);
}
unlink(readname);
}
}
if (n < 0) {

View File

@ -1 +1 @@
#define VERSION "tippecanoe v1.9.11\n"
#define VERSION "tippecanoe v1.9.12\n"