mirror of
https://github.com/mapbox/tippecanoe.git
synced 2025-02-01 16:58:05 +00:00
Start an abstraction layer for input streams
This commit is contained in:
parent
8e8b74b0ce
commit
33b0d5c15c
69
main.cpp
69
main.cpp
@ -476,9 +476,53 @@ void do_read_parallel(char *map, long long len, long long initial_offset, const
|
||||
}
|
||||
}
|
||||
|
||||
struct STREAM {
|
||||
FILE *fp;
|
||||
|
||||
int fclose() {
|
||||
int ret = ::fclose(fp);
|
||||
delete this;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int peekc() {
|
||||
int c = getc(fp);
|
||||
if (c != EOF) {
|
||||
ungetc(c, fp);
|
||||
}
|
||||
return c;
|
||||
}
|
||||
|
||||
int fread(char *buf, size_t unit, size_t count) {
|
||||
return ::fread(buf, unit, count, fp);
|
||||
}
|
||||
|
||||
json_pull *json_begin() {
|
||||
return json_begin_file(fp);
|
||||
}
|
||||
};
|
||||
|
||||
STREAM *streamfdopen(int fd, const char *mode) {
|
||||
FILE *fp = fdopen(fd, mode);
|
||||
|
||||
if (fp == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STREAM *s = new STREAM;
|
||||
s->fp = fp;
|
||||
return s;
|
||||
}
|
||||
|
||||
STREAM *streamfpopen(FILE *fp) {
|
||||
STREAM *s = new STREAM;
|
||||
s->fp = fp;
|
||||
return s;
|
||||
}
|
||||
|
||||
struct read_parallel_arg {
|
||||
int fd = 0;
|
||||
FILE *fp = NULL;
|
||||
STREAM *fp = NULL;
|
||||
long long offset = 0;
|
||||
long long len = 0;
|
||||
std::atomic<int> *is_parsing = NULL;
|
||||
@ -532,7 +576,7 @@ void *run_read_parallel(void *v) {
|
||||
if (munmap(map, rpa->len) != 0) {
|
||||
perror("munmap source file");
|
||||
}
|
||||
if (fclose(rpa->fp) != 0) {
|
||||
if (rpa->fp->fclose() != 0) {
|
||||
perror("close source file");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
@ -543,7 +587,7 @@ void *run_read_parallel(void *v) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void start_parsing(int fd, FILE *fp, long long offset, long long len, std::atomic<int> *is_parsing, pthread_t *parallel_parser, bool &parser_created, const char *reading, std::vector<struct reader> *readers, std::atomic<long long> *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, json_object *filter, int basezoom, int source, std::vector<std::map<std::string, layermap_entry> > &layermaps, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters) {
|
||||
void start_parsing(int fd, STREAM *fp, long long offset, long long len, std::atomic<int> *is_parsing, pthread_t *parallel_parser, bool &parser_created, const char *reading, std::vector<struct reader> *readers, std::atomic<long long> *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, json_object *filter, int basezoom, int source, std::vector<std::map<std::string, layermap_entry> > &layermaps, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters) {
|
||||
// This has to kick off an intermediate thread to start the parser threads,
|
||||
// so the main thread can get back to reading the next input stage while
|
||||
// the intermediate thread waits for the completion of the parser threads.
|
||||
@ -1394,7 +1438,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
|
||||
int read_parallel_this = read_parallel ? '\n' : 0;
|
||||
|
||||
if (1) {
|
||||
if (!(sources[source].file.size() > 3 && sources[source].file.substr(sources[source].file.size() - 3) == std::string(".gz"))) {
|
||||
if (fstat(fd, &st) == 0) {
|
||||
off = lseek(fd, 0, SEEK_CUR);
|
||||
if (off >= 0) {
|
||||
@ -1439,7 +1483,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
} else {
|
||||
FILE *fp = fdopen(fd, "r");
|
||||
STREAM *fp = streamfdopen(fd, "r");
|
||||
if (fp == NULL) {
|
||||
perror(sources[layer].file.c_str());
|
||||
if (close(fd) != 0) {
|
||||
@ -1449,10 +1493,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
continue;
|
||||
}
|
||||
|
||||
int c = getc(fp);
|
||||
if (c != EOF) {
|
||||
ungetc(c, fp);
|
||||
}
|
||||
int c = fp->peekc();
|
||||
if (c == 0x1E) {
|
||||
read_parallel_this = 0x1E;
|
||||
}
|
||||
@ -1487,7 +1528,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
char buf[READ_BUF];
|
||||
int n;
|
||||
|
||||
while ((n = fread(buf, sizeof(char), READ_BUF, fp)) > 0) {
|
||||
while ((n = fp->fread(buf, sizeof(char), READ_BUF)) > 0) {
|
||||
fwrite_check(buf, sizeof(char), n, readfp, reading.c_str());
|
||||
ahead += n;
|
||||
|
||||
@ -1506,7 +1547,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
}
|
||||
|
||||
fflush(readfp);
|
||||
start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str(), &readers, &progress_seq, exclude, include, exclude_all, filter, basezoom, layer, layermaps, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL);
|
||||
start_parsing(readfd, streamfpopen(readfp), initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str(), &readers, &progress_seq, exclude, include, exclude_all, filter, basezoom, layer, layermaps, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL);
|
||||
|
||||
initial_offset += ahead;
|
||||
overall_offset += ahead;
|
||||
@ -1543,7 +1584,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
fflush(readfp);
|
||||
|
||||
if (ahead > 0) {
|
||||
start_parsing(readfd, readfp, initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str(), &readers, &progress_seq, exclude, include, exclude_all, filter, basezoom, layer, layermaps, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL);
|
||||
start_parsing(readfd, streamfpopen(readfp), initial_offset, ahead, &is_parsing, ¶llel_parser, parser_created, reading.c_str(), &readers, &progress_seq, exclude, include, exclude_all, filter, basezoom, layer, layermaps, initialized, initial_x, initial_y, maxzoom, sources[layer].layer, gamma != 0, attribute_types, read_parallel_this, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL);
|
||||
|
||||
if (parser_created) {
|
||||
if (pthread_join(parallel_parser, NULL) != 0) {
|
||||
@ -1559,7 +1600,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
// Plain serial reading
|
||||
|
||||
std::atomic<long long> layer_seq(overall_offset);
|
||||
json_pull *jp = json_begin_file(fp);
|
||||
json_pull *jp = fp->json_begin();
|
||||
struct serialization_state sst;
|
||||
|
||||
sst.fname = reading.c_str();
|
||||
@ -1591,7 +1632,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
|
||||
checkdisk(&readers);
|
||||
}
|
||||
|
||||
if (fclose(fp) != 0) {
|
||||
if (fp->fclose() != 0) {
|
||||
perror("fclose input");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user