Make file positions and lengths thread-safe

This commit is contained in:
Eric Fischer 2018-05-07 13:17:00 -07:00
parent 2cf0524037
commit 59dd095607
10 changed files with 110 additions and 60 deletions

View File

@ -1,3 +1,7 @@
## 1.27.16
* Fix thread safety issues related to the out-of-disk-space checker
## 1.27.15
* --extend-zooms-if-still-dropping now also extends zooms if features

View File

@ -23,7 +23,7 @@
static int pnpoly(drawvec &vert, size_t start, size_t nvert, long long testx, long long testy);
static int clip(double *x0, double *y0, double *x1, double *y1, double xmin, double ymin, double xmax, double ymax);
drawvec decode_geometry(FILE *meta, long long *geompos, int z, unsigned tx, unsigned ty, long long *bbox, unsigned initial_x, unsigned initial_y) {
drawvec decode_geometry(FILE *meta, std::atomic<long long> *geompos, int z, unsigned tx, unsigned ty, long long *bbox, unsigned initial_x, unsigned initial_y) {
drawvec out;
bbox[0] = LLONG_MAX;

View File

@ -2,6 +2,7 @@
#define GEOMETRY_HPP
#include <vector>
#include <atomic>
#include <sqlite3.h>
#define VT_POINT 1
@ -55,7 +56,7 @@ struct draw {
typedef std::vector<draw> drawvec;
drawvec decode_geometry(FILE *meta, long long *geompos, int z, unsigned tx, unsigned ty, long long *bbox, unsigned initial_x, unsigned initial_y);
drawvec decode_geometry(FILE *meta, std::atomic<long long> *geompos, int z, unsigned tx, unsigned ty, long long *bbox, unsigned initial_x, unsigned initial_y);
void to_tile_scale(drawvec &geom, int z, int detail);
drawvec remove_noop(drawvec geom, int type, int shift);
drawvec clip_point(drawvec &geom, int z, long long buffer);

View File

@ -287,7 +287,7 @@ int calc_feature_minzoom(struct index *ix, struct drop_state *ds, int maxzoom, d
return feature_minzoom;
}
static void merge(struct mergelist *merges, size_t nmerges, unsigned char *map, FILE *indexfile, int bytes, char *geom_map, FILE *geom_out, long long *geompos, long long *progress, long long *progress_max, long long *progress_reported, int maxzoom, double gamma, struct drop_state *ds) {
static void merge(struct mergelist *merges, size_t nmerges, unsigned char *map, FILE *indexfile, int bytes, char *geom_map, FILE *geom_out, std::atomic<long long> *geompos, long long *progress, long long *progress_max, long long *progress_reported, int maxzoom, double gamma, struct drop_state *ds) {
struct mergelist *head = NULL;
for (size_t i = 0; i < nmerges; i++) {
@ -592,7 +592,7 @@ void start_parsing(int fd, FILE *fp, long long offset, long long len, std::atomi
parser_created = true;
}
void radix1(int *geomfds_in, int *indexfds_in, int inputs, int prefix, int splits, long long mem, const char *tmpdir, long long *availfiles, FILE *geomfile, FILE *indexfile, long long *geompos_out, long long *progress, long long *progress_max, long long *progress_reported, int maxzoom, int basezoom, double droprate, double gamma, struct drop_state *ds) {
void radix1(int *geomfds_in, int *indexfds_in, int inputs, int prefix, int splits, long long mem, const char *tmpdir, long long *availfiles, FILE *geomfile, FILE *indexfile, std::atomic<long long> *geompos_out, long long *progress, long long *progress_max, long long *progress_reported, int maxzoom, int basezoom, double droprate, double gamma, struct drop_state *ds) {
// Arranged as bits to facilitate subdividing again if a subdivided file is still huge
int splitbits = log(splits) / log(2);
splits = 1 << splitbits;
@ -601,7 +601,7 @@ void radix1(int *geomfds_in, int *indexfds_in, int inputs, int prefix, int split
FILE *indexfiles[splits];
int geomfds[splits];
int indexfds[splits];
long long sub_geompos[splits];
std::atomic<long long> sub_geompos[splits];
int i;
for (i = 0; i < splits; i++) {
@ -741,7 +741,7 @@ void radix1(int *geomfds_in, int *indexfds_in, int inputs, int prefix, int split
if (indexst.st_size > 0) {
if (indexst.st_size + geomst.st_size < mem) {
long long indexpos = indexst.st_size;
std::atomic<long long> indexpos(indexst.st_size);
int bytes = sizeof(struct index);
int page = sysconf(_SC_PAGESIZE);
@ -917,7 +917,7 @@ void prep_drop_states(struct drop_state *ds, int maxzoom, int basezoom, double d
}
}
void radix(std::vector<struct reader> &readers, int nreaders, FILE *geomfile, FILE *indexfile, const char *tmpdir, long long *geompos, int maxzoom, int basezoom, double droprate, double gamma) {
void radix(std::vector<struct reader> &readers, int nreaders, FILE *geomfile, FILE *indexfile, const char *tmpdir, std::atomic<long long> *geompos, int maxzoom, int basezoom, double droprate, double gamma) {
// Run through the index and geometry for each reader,
// splitting the contents out by index into as many
// sub-files as we can write to simultaneously.
@ -1689,8 +1689,8 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
unlink(metaname);
long long metapos = 0;
long long poolpos = 0;
std::atomic<long long> metapos(0);
std::atomic<long long> poolpos(0);
for (size_t i = 0; i < CPUS; i++) {
if (readers[i].metapos > 0) {
@ -1795,7 +1795,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
iy = justy;
}
long long geompos = 0;
std::atomic<long long> geompos(0);
/* initial tile is 0/0/0 */
serialize_int(geomfile, iz, &geompos, fname);
@ -1821,13 +1821,16 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
perror("stat index");
exit(EXIT_FAILURE);
}
long long indexpos = indexst.st_size;
std::atomic<long long> indexpos(indexst.st_size);
progress_seq = indexpos / sizeof(struct index);
last_progress = 0;
if (!quiet) {
long long s = progress_seq;
fprintf(stderr, "%lld features, %lld bytes of geometry, %lld bytes of separate metadata, %lld bytes of string pool\n", s, geompos, metapos, poolpos);
long long geompos_print = geompos;
long long metapos_print = metapos;
long long poolpos_print = poolpos;
fprintf(stderr, "%lld features, %lld bytes of geometry, %lld bytes of separate metadata, %lld bytes of string pool\n", s, geompos_print, metapos_print, poolpos_print);
}
if (indexpos == 0) {

View File

@ -1,12 +1,18 @@
#ifndef MEMFILE_HPP
#define MEMFILE_HPP
#include <atomic>
// Plain record describing a memory-backed file: a descriptor, a mapping
// pointer, a length, an offset and a `tree` value. What each field means is
// defined by the memfile_* functions, which are not visible in this view.
struct memfile {
int fd = 0;
char *map = NULL;
long long len = 0;
// `len` is declared as std::atomic<long long> so that it can be read and
// updated without a data race.
// NOTE(review): presumably this is for the out-of-disk-space checker named in
// the changelog entry — confirm which threads touch it.
std::atomic<long long> len;
long long off = 0;
unsigned long tree = 0;
// The atomic member carries no default member initializer, so the
// constructor sets it to zero explicitly.
memfile()
: len(0) {
}
};
struct memfile *memfile_open(int fd);

View File

@ -30,17 +30,17 @@ size_t fwrite_check(const void *ptr, size_t size, size_t nitems, FILE *stream, c
return w;
}
// Writes the int `n` to `out` by forwarding to serialize_long_long.
// `fpos` and `fname` are passed through unchanged; this function does not
// touch the position counter itself.
// Two signature lines appear here: one takes `long long *fpos`, the other
// `std::atomic<long long> *fpos`.
void serialize_int(FILE *out, int n, long long *fpos, const char *fname) {
void serialize_int(FILE *out, int n, std::atomic<long long> *fpos, const char *fname) {
serialize_long_long(out, n, fpos, fname);
}
// Writes the signed value `n` to `out`: it is first zigzag-encoded with
// protozero::encode_zigzag64, and the resulting unsigned value is handed to
// serialize_ulong_long together with `fpos` and `fname`.
// Two signature lines appear here: one takes `long long *fpos`, the other
// `std::atomic<long long> *fpos`.
void serialize_long_long(FILE *out, long long n, long long *fpos, const char *fname) {
void serialize_long_long(FILE *out, long long n, std::atomic<long long> *fpos, const char *fname) {
unsigned long long zigzag = protozero::encode_zigzag64(n);
serialize_ulong_long(out, zigzag, fpos, fname);
}
void serialize_ulong_long(FILE *out, unsigned long long zigzag, long long *fpos, const char *fname) {
void serialize_ulong_long(FILE *out, unsigned long long zigzag, std::atomic<long long> *fpos, const char *fname) {
while (1) {
unsigned char b = zigzag & 0x7F;
if ((zigzag >> 7) != 0) {
@ -62,12 +62,12 @@ void serialize_ulong_long(FILE *out, unsigned long long zigzag, long long *fpos,
}
}
// Writes the single byte `n` to `out` through fwrite_check, then advances
// `*fpos` by sizeof(signed char).
// Two signature lines appear here: one takes `long long *fpos`, the other
// `std::atomic<long long> *fpos`.
void serialize_byte(FILE *out, signed char n, long long *fpos, const char *fname) {
void serialize_byte(FILE *out, signed char n, std::atomic<long long> *fpos, const char *fname) {
fwrite_check(&n, sizeof(signed char), 1, out, fname);
// With the std::atomic signature this += is an atomic read-modify-write.
*fpos += sizeof(signed char);
}
// Writes the in-memory bytes of the unsigned value `n` (sizeof(unsigned) of
// them) to `out` through fwrite_check, then advances `*fpos` by the same
// amount. No varint or zigzag encoding is applied here.
// Two signature lines appear here: one takes `long long *fpos`, the other
// `std::atomic<long long> *fpos`.
void serialize_uint(FILE *out, unsigned n, long long *fpos, const char *fname) {
void serialize_uint(FILE *out, unsigned n, std::atomic<long long> *fpos, const char *fname) {
fwrite_check(&n, sizeof(unsigned), 1, out, fname);
// With the std::atomic signature this += is an atomic read-modify-write.
*fpos += sizeof(unsigned);
}
@ -112,14 +112,14 @@ void deserialize_byte(char **f, signed char *n) {
*f += sizeof(signed char);
}
// Reads one signed value from `f`: an unsigned value is fetched with
// deserialize_ulong_long_io (which also receives `geompos`), then decoded
// with protozero::decode_zigzag64 into `*n`.
// Returns whatever deserialize_ulong_long_io returned. `*n` is assigned
// regardless of that return value, so callers should check the result before
// trusting `*n`.
// Two signature lines appear here: one takes `long long *geompos`, the other
// `std::atomic<long long> *geompos`.
int deserialize_long_long_io(FILE *f, long long *n, long long *geompos) {
int deserialize_long_long_io(FILE *f, long long *n, std::atomic<long long> *geompos) {
unsigned long long zigzag = 0;
int ret = deserialize_ulong_long_io(f, &zigzag, geompos);
*n = protozero::decode_zigzag64(zigzag);
return ret;
}
int deserialize_ulong_long_io(FILE *f, unsigned long long *zigzag, long long *geompos) {
int deserialize_ulong_long_io(FILE *f, unsigned long long *zigzag, std::atomic<long long> *geompos) {
*zigzag = 0;
int shift = 0;
@ -143,14 +143,14 @@ int deserialize_ulong_long_io(FILE *f, unsigned long long *zigzag, long long *ge
return 1;
}
// Reads one value from `f` with deserialize_long_long_io and stores it in
// `*n`, implicitly narrowing from long long to int.
// Returns the helper's result; `*n` is written regardless of that result.
// Two signature lines appear here: one takes `long long *geompos`, the other
// `std::atomic<long long> *geompos`.
int deserialize_int_io(FILE *f, int *n, long long *geompos) {
int deserialize_int_io(FILE *f, int *n, std::atomic<long long> *geompos) {
long long ll = 0;
int ret = deserialize_long_long_io(f, &ll, geompos);
*n = ll;
return ret;
}
int deserialize_uint_io(FILE *f, unsigned *n, long long *geompos) {
int deserialize_uint_io(FILE *f, unsigned *n, std::atomic<long long> *geompos) {
if (fread(n, sizeof(unsigned), 1, f) != 1) {
return 0;
}
@ -158,7 +158,7 @@ int deserialize_uint_io(FILE *f, unsigned *n, long long *geompos) {
return 1;
}
int deserialize_byte_io(FILE *f, signed char *n, long long *geompos) {
int deserialize_byte_io(FILE *f, signed char *n, std::atomic<long long> *geompos) {
int c = getc(f);
if (c == EOF) {
return 0;
@ -168,7 +168,7 @@ int deserialize_byte_io(FILE *f, signed char *n, long long *geompos) {
return 1;
}
static void write_geometry(drawvec const &dv, long long *fpos, FILE *out, const char *fname, long long wx, long long wy) {
static void write_geometry(drawvec const &dv, std::atomic<long long> *fpos, FILE *out, const char *fname, long long wx, long long wy) {
for (size_t i = 0; i < dv.size(); i++) {
if (dv[i].op == VT_MOVETO || dv[i].op == VT_LINETO) {
serialize_byte(out, dv[i].op, fpos, fname);
@ -182,7 +182,7 @@ static void write_geometry(drawvec const &dv, long long *fpos, FILE *out, const
}
}
void serialize_feature(FILE *geomfile, serial_feature *sf, long long *geompos, const char *fname, long long wx, long long wy, bool include_minzoom) {
void serialize_feature(FILE *geomfile, serial_feature *sf, std::atomic<long long> *geompos, const char *fname, long long wx, long long wy, bool include_minzoom) {
serialize_byte(geomfile, sf->t, geompos, fname);
long long layer = 0;
@ -235,7 +235,7 @@ void serialize_feature(FILE *geomfile, serial_feature *sf, long long *geompos, c
}
}
serial_feature deserialize_feature(FILE *geoms, long long *geompos_in, char *metabase, long long *meta_off, unsigned z, unsigned tx, unsigned ty, unsigned *initial_x, unsigned *initial_y) {
serial_feature deserialize_feature(FILE *geoms, std::atomic<long long> *geompos_in, char *metabase, long long *meta_off, unsigned z, unsigned tx, unsigned ty, unsigned *initial_x, unsigned *initial_y) {
serial_feature sf;
deserialize_byte_io(geoms, &sf.t, geompos_in);

View File

@ -12,12 +12,12 @@
size_t fwrite_check(const void *ptr, size_t size, size_t nitems, FILE *stream, const char *fname);
void serialize_int(FILE *out, int n, long long *fpos, const char *fname);
void serialize_long_long(FILE *out, long long n, long long *fpos, const char *fname);
void serialize_ulong_long(FILE *out, unsigned long long n, long long *fpos, const char *fname);
void serialize_byte(FILE *out, signed char n, long long *fpos, const char *fname);
void serialize_uint(FILE *out, unsigned n, long long *fpos, const char *fname);
void serialize_string(FILE *out, const char *s, long long *fpos, const char *fname);
void serialize_int(FILE *out, int n, std::atomic<long long> *fpos, const char *fname);
void serialize_long_long(FILE *out, long long n, std::atomic<long long> *fpos, const char *fname);
void serialize_ulong_long(FILE *out, unsigned long long n, std::atomic<long long> *fpos, const char *fname);
void serialize_byte(FILE *out, signed char n, std::atomic<long long> *fpos, const char *fname);
void serialize_uint(FILE *out, unsigned n, std::atomic<long long> *fpos, const char *fname);
void serialize_string(FILE *out, const char *s, std::atomic<long long> *fpos, const char *fname);
void deserialize_int(char **f, int *n);
void deserialize_long_long(char **f, long long *n);
@ -25,11 +25,11 @@ void deserialize_ulong_long(char **f, unsigned long long *n);
void deserialize_uint(char **f, unsigned *n);
void deserialize_byte(char **f, signed char *n);
int deserialize_int_io(FILE *f, int *n, long long *geompos);
int deserialize_long_long_io(FILE *f, long long *n, long long *geompos);
int deserialize_ulong_long_io(FILE *f, unsigned long long *n, long long *geompos);
int deserialize_uint_io(FILE *f, unsigned *n, long long *geompos);
int deserialize_byte_io(FILE *f, signed char *n, long long *geompos);
int deserialize_int_io(FILE *f, int *n, std::atomic<long long> *geompos);
int deserialize_long_long_io(FILE *f, long long *n, std::atomic<long long> *geompos);
int deserialize_ulong_long_io(FILE *f, unsigned long long *n, std::atomic<long long> *geompos);
int deserialize_uint_io(FILE *f, unsigned *n, std::atomic<long long> *geompos);
int deserialize_byte_io(FILE *f, signed char *n, std::atomic<long long> *geompos);
struct serial_val {
int type = 0;
@ -70,8 +70,8 @@ struct serial_feature {
bool dropped = false;
};
void serialize_feature(FILE *geomfile, serial_feature *sf, long long *geompos, const char *fname, long long wx, long long wy, bool include_minzoom);
serial_feature deserialize_feature(FILE *geoms, long long *geompos_in, char *metabase, long long *meta_off, unsigned z, unsigned tx, unsigned ty, unsigned *initial_x, unsigned *initial_y);
void serialize_feature(FILE *geomfile, serial_feature *sf, std::atomic<long long> *geompos, const char *fname, long long wx, long long wy, bool include_minzoom);
serial_feature deserialize_feature(FILE *geoms, std::atomic<long long> *geompos_in, char *metabase, long long *meta_off, unsigned z, unsigned tx, unsigned ty, unsigned *initial_x, unsigned *initial_y);
struct reader {
int metafd = -1;
@ -86,9 +86,9 @@ struct reader {
FILE *geomfile = NULL;
FILE *indexfile = NULL;
long long metapos = 0;
long long geompos = 0;
long long indexpos = 0;
std::atomic<long long> metapos;
std::atomic<long long> geompos;
std::atomic<long long> indexpos;
long long file_bbox[4] = {0, 0, 0, 0};
@ -96,6 +96,40 @@ struct reader {
struct stat metast {};
char *geom_map = NULL;
// Default constructor: the three atomic position counters have no default
// member initializers, so each one is started at zero here.
reader()
: metapos{0},
  geompos{0},
  indexpos{0} {
}
// Copy constructor. std::atomic members cannot be copied implicitly, so the
// copy is written out by hand: plain members are assigned directly and each
// atomic counter is copied as a load from the source followed by a store.
reader(reader const &r) {
	// File descriptors.
	metafd = r.metafd;
	poolfd = r.poolfd;
	treefd = r.treefd;
	geomfd = r.geomfd;
	indexfd = r.indexfd;

	// Stream handles.
	metafile = r.metafile;
	poolfile = r.poolfile;
	treefile = r.treefile;
	geomfile = r.geomfile;
	indexfile = r.indexfile;

	// Atomic position counters: snapshot the source value, then store it.
	metapos.store(r.metapos.load());
	geompos.store(r.geompos.load());
	indexpos.store(r.indexpos.load());

	// Bounding box, copied element by element.
	for (size_t i = 0; i < sizeof(file_bbox) / sizeof(file_bbox[0]); i++) {
		file_bbox[i] = r.file_bbox[i];
	}

	geomst = r.geomst;
	metast = r.metast;
	geom_map = r.geom_map;
}
};
struct serialization_state {

View File

@ -244,7 +244,7 @@ static int metacmp(const std::vector<long long> &keys1, const std::vector<long l
}
}
void rewrite(drawvec &geom, int z, int nextzoom, int maxzoom, long long *bbox, unsigned tx, unsigned ty, int buffer, int *within, long long *geompos, FILE **geomfile, const char *fname, signed char t, int layer, long long metastart, signed char feature_minzoom, int child_shards, int max_zoom_increment, long long seq, int tippecanoe_minzoom, int tippecanoe_maxzoom, int segment, unsigned *initial_x, unsigned *initial_y, std::vector<long long> &metakeys, std::vector<long long> &metavals, bool has_id, unsigned long long id, unsigned long long index, long long extent) {
void rewrite(drawvec &geom, int z, int nextzoom, int maxzoom, long long *bbox, unsigned tx, unsigned ty, int buffer, int *within, std::atomic<long long> *geompos, FILE **geomfile, const char *fname, signed char t, int layer, long long metastart, signed char feature_minzoom, int child_shards, int max_zoom_increment, long long seq, int tippecanoe_minzoom, int tippecanoe_maxzoom, int segment, unsigned *initial_x, unsigned *initial_y, std::vector<long long> &metakeys, std::vector<long long> &metavals, bool has_id, unsigned long long id, unsigned long long index, long long extent) {
if (geom.size() > 0 && (nextzoom <= maxzoom || additional[A_EXTEND_ZOOMS])) {
int xo, yo;
int span = 1 << (nextzoom - z);
@ -1288,7 +1288,7 @@ bool clip_to_tile(serial_feature &sf, int z, long long buffer) {
return false;
}
serial_feature next_feature(FILE *geoms, long long *geompos_in, char *metabase, long long *meta_off, int z, unsigned tx, unsigned ty, unsigned *initial_x, unsigned *initial_y, long long *original_features, long long *unclipped_features, int nextzoom, int maxzoom, int minzoom, int max_zoom_increment, size_t pass, size_t passes, std::atomic<long long> *along, long long alongminus, int buffer, int *within, bool *first_time, FILE **geomfile, long long *geompos, std::atomic<double> *oprogress, double todo, const char *fname, int child_shards, struct json_object *filter, const char *stringpool, long long *pool_off, std::vector<std::vector<std::string>> *layer_unmaps) {
serial_feature next_feature(FILE *geoms, std::atomic<long long> *geompos_in, char *metabase, long long *meta_off, int z, unsigned tx, unsigned ty, unsigned *initial_x, unsigned *initial_y, long long *original_features, long long *unclipped_features, int nextzoom, int maxzoom, int minzoom, int max_zoom_increment, size_t pass, size_t passes, std::atomic<long long> *along, long long alongminus, int buffer, int *within, bool *first_time, FILE **geomfile, std::atomic<long long> *geompos, std::atomic<double> *oprogress, double todo, const char *fname, int child_shards, struct json_object *filter, const char *stringpool, long long *pool_off, std::vector<std::vector<std::string>> *layer_unmaps) {
while (1) {
serial_feature sf = deserialize_feature(geoms, geompos_in, metabase, meta_off, z, tx, ty, initial_x, initial_y);
if (sf.t < 0) {
@ -1412,7 +1412,7 @@ serial_feature next_feature(FILE *geoms, long long *geompos_in, char *metabase,
struct run_prefilter_args {
FILE *geoms = NULL;
long long *geompos_in = NULL;
std::atomic<long long> *geompos_in = NULL;
char *metabase = NULL;
long long *meta_off = NULL;
int z = 0;
@ -1434,7 +1434,7 @@ struct run_prefilter_args {
int *within = NULL;
bool *first_time = NULL;
FILE **geomfile = NULL;
long long *geompos = NULL;
std::atomic<long long> *geompos = NULL;
std::atomic<double> *oprogress = NULL;
double todo = 0;
const char *fname = 0;
@ -1665,7 +1665,7 @@ bool find_partial(std::vector<partial> &partials, serial_feature &sf, ssize_t &o
return false;
}
long long write_tile(FILE *geoms, long long *geompos_in, char *metabase, char *stringpool, int z, unsigned tx, unsigned ty, int detail, int min_detail, sqlite3 *outdb, const char *outdir, int buffer, const char *fname, FILE **geomfile, int minzoom, int maxzoom, double todo, std::atomic<long long> *along, long long alongminus, double gamma, int child_shards, long long *meta_off, long long *pool_off, unsigned *initial_x, unsigned *initial_y, std::atomic<int> *running, double simplification, std::vector<std::map<std::string, layermap_entry>> *layermaps, std::vector<std::vector<std::string>> *layer_unmaps, size_t tiling_seg, size_t pass, size_t passes, unsigned long long mingap, long long minextent, double fraction, const char *prefilter, const char *postfilter, struct json_object *filter, write_tile_args *arg) {
long long write_tile(FILE *geoms, std::atomic<long long> *geompos_in, char *metabase, char *stringpool, int z, unsigned tx, unsigned ty, int detail, int min_detail, sqlite3 *outdb, const char *outdir, int buffer, const char *fname, FILE **geomfile, int minzoom, int maxzoom, double todo, std::atomic<long long> *along, long long alongminus, double gamma, int child_shards, long long *meta_off, long long *pool_off, unsigned *initial_x, unsigned *initial_y, std::atomic<int> *running, double simplification, std::vector<std::map<std::string, layermap_entry>> *layermaps, std::vector<std::vector<std::string>> *layer_unmaps, size_t tiling_seg, size_t pass, size_t passes, unsigned long long mingap, long long minextent, double fraction, const char *prefilter, const char *postfilter, struct json_object *filter, write_tile_args *arg) {
int line_detail;
double merge_fraction = 1;
double mingap_fraction = 1;
@ -1721,9 +1721,11 @@ long long write_tile(FILE *geoms, long long *geompos_in, char *metabase, char *s
double coalesced_area = 0;
int within[child_shards];
long long geompos[child_shards];
memset(within, '\0', child_shards * sizeof(int));
memset(geompos, '\0', child_shards * sizeof(long long));
std::atomic<long long> geompos[child_shards];
for (size_t i = 0; i < (size_t) child_shards; i++) {
geompos[i] = 0;
within[i] = 0;
}
if (*geompos_in != og) {
if (fseek(geoms, og, SEEK_SET) != 0) {
@ -2469,7 +2471,7 @@ void *run_thread(void *vargs) {
exit(EXIT_FAILURE);
}
long long geompos = 0;
std::atomic<long long> geompos(0);
long long prevgeom = 0;
while (1) {

View File

@ -1,6 +1,6 @@
#ifndef VERSION_HPP
#define VERSION_HPP
#define VERSION "tippecanoe v1.27.15\n"
#define VERSION "tippecanoe v1.27.16\n"
#endif