diff --git a/main.cpp b/main.cpp index b50422b..b3af634 100644 --- a/main.cpp +++ b/main.cpp @@ -478,41 +478,31 @@ void do_read_parallel(char *map, long long len, long long initial_offset, const } static ssize_t read_stream(json_pull *j, char *buffer, size_t n); -#define BUF 262144 struct STREAM { - FILE *fp; - - bool gz; - z_stream inflate_s; - Bytef buf[BUF]; - std::vector ahead; + FILE *fp = NULL; + gzFile gz = NULL; int fclose() { - if (gz) { - inflateEnd(&inflate_s); + int ret; + + if (gz != NULL) { + ret = gzclose(gz); + } else { + ret = ::fclose(fp); } - int ret = ::fclose(fp); delete this; return ret; } int peekc() { - if (gz) { - if (ahead.size() > 0) { - return ahead[ahead.size() - 1]; + if (gz != NULL) { + int c = gzgetc(gz); + if (c != EOF) { + gzungetc(c, gz); } - - char tmp[1]; - size_t nread = read(tmp, 1); - - if (nread == 0) { - return EOF; - } - - ahead.push_back(tmp[0]); - return (unsigned char) tmp[0]; + return c; } else { int c = getc(fp); if (c != EOF) { @@ -523,53 +513,13 @@ struct STREAM { } size_t read(char *out, size_t count) { - if (gz) { - if (ahead.size() > 0 && count > 0) { - out[0] = ahead[ahead.size() - 1]; - ahead.resize(ahead.size() - 1); - return 1; - } - - while (true) { - // Move any existing readahead to the start - memmove(buf, inflate_s.next_in, inflate_s.avail_in); - inflate_s.next_in = buf; - - // Read more compressed data if there is room and availability - bool eof = false; - if (inflate_s.avail_in < BUF) { - size_t nread = ::fread(inflate_s.next_in + inflate_s.avail_in, sizeof(char), BUF - inflate_s.avail_in, fp); - inflate_s.avail_in += nread; - - if (nread == 0) { - eof = true; - } - } - - inflate_s.avail_out = count; - inflate_s.next_out = (Bytef *) out; - - int ret = inflate(&inflate_s, Z_SYNC_FLUSH); - - if (ret == Z_STREAM_ERROR || ret == Z_MEM_ERROR || ret == Z_DATA_ERROR) { - fprintf(stderr, "%s: Decompression error %s\n", *av, inflate_s.msg); - exit(EXIT_FAILURE); - } - - if (inflate_s.next_out != (Bytef *) out) { - size_t n = inflate_s.next_out - (Bytef *) out; - return n; - } - - if (ret == Z_STREAM_END) { - return 0; - } - - if (eof) { - fprintf(stderr, "%s: Compressed data was truncated\n", *av); - exit(EXIT_FAILURE); - } + if (gz != NULL) { + int ret = gzread(gz, out, count); + if (ret < 0) { + fprintf(stderr, "%s: Error reading compressed data\n", *av); + exit(EXIT_FAILURE); } + return ret; } else { return ::fread(out, 1, count, fp); } @@ -585,30 +535,22 @@ static ssize_t read_stream(json_pull *j, char *buffer, size_t n) { } STREAM *streamfdopen(int fd, const char *mode, std::string const &fname) { - FILE *fp = fdopen(fd, mode); - if (fp == NULL) { - return NULL; - } - STREAM *s = new STREAM; - s->fp = fp; - s->gz = false; + s->fp = NULL; + s->gz = NULL; if (fname.size() > 3 && fname.substr(fname.size() - 3) == std::string(".gz")) { - s->inflate_s.zalloc = Z_NULL; - s->inflate_s.zfree = Z_NULL; - s->inflate_s.opaque = Z_NULL; - s->inflate_s.avail_in = 0; - s->inflate_s.next_in = Z_NULL; - - if (inflateInit2(&s->inflate_s, 32 + 15) != Z_OK) { - fprintf(stderr, "%s: %s: Decompression error %s\n", *av, fname.c_str(), s->inflate_s.msg); + s->gz = gzdopen(fd, mode); + if (s->gz == NULL) { + fprintf(stderr, "%s: %s: Decompression error\n", *av, fname.c_str()); + exit(EXIT_FAILURE); + } + } else { + s->fp = fdopen(fd, mode); + if (s->fp == NULL) { + perror(fname.c_str()); exit(EXIT_FAILURE); } - - s->inflate_s.next_in = s->buf; - s->inflate_s.avail_in = 0; - s->gz = true; } return s; @@ -617,7 +559,7 @@ STREAM *streamfdopen(int fd, const char *mode, std::string const &fname) { STREAM *streamfpopen(FILE *fp) { STREAM *s = new STREAM; s->fp = fp; - s->gz = false; + s->gz = NULL; return s; }