All the other places where I used volatile but really wanted atomic

This commit is contained in:
Eric Fischer 2018-03-13 15:21:21 -07:00
parent f1eede1106
commit fac0ebbf52
6 changed files with 37 additions and 31 deletions

View File

@ -476,7 +476,9 @@ void runQueue() {
}
}
*((*(feature_queue[0].sst))[0].layer_seq) = *((*(feature_queue[0].sst))[CPUS - 1].layer_seq);
// Lack of atomicity is OK, since we are single-threaded again here
long long was = *((*(feature_queue[0].sst))[CPUS - 1].layer_seq);
*((*(feature_queue[0].sst))[0].layer_seq) = was;
feature_queue.clear();
}

View File

@ -388,7 +388,7 @@ void *run_sort(void *v) {
return NULL;
}
void do_read_parallel(char *map, long long len, long long initial_offset, const char *reading, std::vector<struct reader> *readers, volatile long long *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, json_object *filter, int basezoom, int source, std::vector<std::map<std::string, layermap_entry> > *layermaps, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters) {
void do_read_parallel(char *map, long long len, long long initial_offset, const char *reading, std::vector<struct reader> *readers, std::atomic<long long> *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, json_object *filter, int basezoom, int source, std::vector<std::map<std::string, layermap_entry> > *layermaps, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters) {
long long segs[CPUS + 1];
segs[0] = 0;
segs[CPUS] = len;
@ -404,7 +404,7 @@ void do_read_parallel(char *map, long long len, long long initial_offset, const
double dist_sums[CPUS];
size_t dist_counts[CPUS];
volatile long long layer_seq[CPUS];
std::atomic<long long> layer_seq[CPUS];
for (size_t i = 0; i < CPUS; i++) {
// To preserve feature ordering, unique id for each segment
// begins with that segment's offset into the input
@ -481,12 +481,12 @@ struct read_parallel_arg {
FILE *fp = NULL;
long long offset = 0;
long long len = 0;
volatile int *is_parsing = NULL;
std::atomic<int> *is_parsing = NULL;
int separator = 0;
const char *reading = NULL;
std::vector<struct reader> *readers = NULL;
volatile long long *progress_seq = NULL;
std::atomic<long long> *progress_seq = NULL;
std::set<std::string> *exclude = NULL;
std::set<std::string> *include = NULL;
int exclude_all = 0;
@ -543,7 +543,7 @@ void *run_read_parallel(void *v) {
return NULL;
}
void start_parsing(int fd, FILE *fp, long long offset, long long len, volatile int *is_parsing, pthread_t *parallel_parser, bool &parser_created, const char *reading, std::vector<struct reader> *readers, volatile long long *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, json_object *filter, int basezoom, int source, std::vector<std::map<std::string, layermap_entry> > &layermaps, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters) {
void start_parsing(int fd, FILE *fp, long long offset, long long len, std::atomic<int> *is_parsing, pthread_t *parallel_parser, bool &parser_created, const char *reading, std::vector<struct reader> *readers, std::atomic<long long> *progress_seq, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, json_object *filter, int basezoom, int source, std::vector<std::map<std::string, layermap_entry> > &layermaps, int *initialized, unsigned *initial_x, unsigned *initial_y, int maxzoom, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types, int separator, double *dist_sum, size_t *dist_count, bool want_dist, bool filters) {
// This has to kick off an intermediate thread to start the parser threads,
// so the main thread can get back to reading the next input stage while
// the intermediate thread waits for the completion of the parser threads.
@ -1145,7 +1145,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
}
diskfree = (long long) fsstat.f_bsize * fsstat.f_bavail;
volatile long long progress_seq = 0;
std::atomic<long long> progress_seq(0);
// 2 * CPUS: One per reader thread, one per tiling thread
int initialized[2 * CPUS];
@ -1282,7 +1282,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
exit(EXIT_FAILURE);
}
long long layer_seq[CPUS];
std::atomic<long long> layer_seq[CPUS];
double dist_sums[CPUS];
size_t dist_counts[CPUS];
std::vector<struct serialization_state> sst;
@ -1339,7 +1339,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
}
if (sources[source].file.size() > 4 && sources[source].file.substr(sources[source].file.size() - 4) == std::string(".csv")) {
long long layer_seq[CPUS];
std::atomic<long long> layer_seq[CPUS];
double dist_sums[CPUS];
size_t dist_counts[CPUS];
@ -1474,7 +1474,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
}
unlink(readname);
volatile int is_parsing = 0;
std::atomic<int> is_parsing(0);
long long ahead = 0;
long long initial_offset = overall_offset;
pthread_t parallel_parser;
@ -1558,7 +1558,7 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
} else {
// Plain serial reading
long long layer_seq = overall_offset;
std::atomic<long long> layer_seq(overall_offset);
json_pull *jp = json_begin_file(fp);
struct serialization_state sst;
@ -1826,7 +1826,8 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
last_progress = 0;
if (!quiet) {
fprintf(stderr, "%lld features, %lld bytes of geometry, %lld bytes of separate metadata, %lld bytes of string pool\n", progress_seq, geompos, metapos, poolpos);
long long s = progress_seq;
fprintf(stderr, "%lld features, %lld bytes of geometry, %lld bytes of separate metadata, %lld bytes of string pool\n", s, geompos, metapos, poolpos);
}
if (indexpos == 0) {
@ -2150,7 +2151,8 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
size[j] = 0;
}
unsigned midx = 0, midy = 0;
std::atomic<unsigned> midx(0);
std::atomic<unsigned> midy(0);
int written = traverse_zooms(fd, size, meta, stringpool, &midx, &midy, maxzoom, minzoom, outdb, outdir, buffer, fname, tmpdir, gamma, full_detail, low_detail, min_detail, meta_off, pool_off, initial_x, initial_y, simplification, layermaps, prefilter, postfilter, attribute_accum);
if (maxzoom != written) {

View File

@ -432,7 +432,7 @@ int serialize_feature(struct serialization_state *sst, serial_feature &sf) {
inline_meta = false;
if (prevent[P_CLIPPING]) {
static volatile long long warned = 0;
static std::atomic<long long> warned(0);
long long extent = ((sf.bbox[2] - sf.bbox[0]) / ((1LL << (32 - sst->maxzoom)) + 1)) * ((sf.bbox[3] - sf.bbox[1]) / ((1LL << (32 - sst->maxzoom)) + 1));
if (extent > warned) {
fprintf(stderr, "Warning: %s:%d: Large unclipped (-pc) feature may be duplicated across %lld tiles\n", sst->fname, sst->line, extent);

View File

@ -4,6 +4,7 @@
#include <stddef.h>
#include <stdio.h>
#include <vector>
#include <atomic>
#include <sys/stat.h>
#include "geometry.hpp"
#include "mbtiles.hpp"
@ -101,8 +102,8 @@ struct serialization_state {
const char *fname = NULL; // source file name
int line = 0; // user-oriented location within source for error reports
volatile long long *layer_seq = NULL; // sequence within current layer
volatile long long *progress_seq = NULL; // overall sequence for progress indicator
std::atomic<long long> *layer_seq = NULL; // sequence within current layer
std::atomic<long long> *progress_seq = NULL; // overall sequence for progress indicator
std::vector<struct reader> *readers = NULL; // array of data for each input thread
int segment = 0; // the current input thread

View File

@ -1175,25 +1175,25 @@ struct write_tile_args {
const char *fname = NULL;
FILE **geomfile = NULL;
double todo = 0;
volatile long long *along = NULL;
std::atomic<long long> *along = NULL;
double gamma = 0;
double gamma_out = 0;
int child_shards = 0;
int *geomfd = NULL;
off_t *geom_size = NULL;
volatile unsigned *midx = NULL;
volatile unsigned *midy = NULL;
std::atomic<unsigned> *midx = NULL;
std::atomic<unsigned> *midy = NULL;
int maxzoom = 0;
int minzoom = 0;
int full_detail = 0;
int low_detail = 0;
double simplification = 0;
volatile long long *most = NULL;
std::atomic<long long> *most = NULL;
long long *meta_off = NULL;
long long *pool_off = NULL;
unsigned *initial_x = NULL;
unsigned *initial_y = NULL;
volatile int *running = NULL;
std::atomic<int> *running = NULL;
int err = 0;
std::vector<std::map<std::string, layermap_entry>> *layermaps = NULL;
std::vector<std::vector<std::string>> *layer_unmaps = NULL;
@ -1289,7 +1289,7 @@ bool clip_to_tile(serial_feature &sf, int z, long long buffer) {
return false;
}
serial_feature next_feature(FILE *geoms, long long *geompos_in, char *metabase, long long *meta_off, int z, unsigned tx, unsigned ty, unsigned *initial_x, unsigned *initial_y, long long *original_features, long long *unclipped_features, int nextzoom, int maxzoom, int minzoom, int max_zoom_increment, size_t pass, size_t passes, volatile long long *along, long long alongminus, int buffer, int *within, bool *first_time, FILE **geomfile, long long *geompos, volatile double *oprogress, double todo, const char *fname, int child_shards) {
serial_feature next_feature(FILE *geoms, long long *geompos_in, char *metabase, long long *meta_off, int z, unsigned tx, unsigned ty, unsigned *initial_x, unsigned *initial_y, long long *original_features, long long *unclipped_features, int nextzoom, int maxzoom, int minzoom, int max_zoom_increment, size_t pass, size_t passes, std::atomic<long long> *along, long long alongminus, int buffer, int *within, bool *first_time, FILE **geomfile, long long *geompos, std::atomic<double> *oprogress, double todo, const char *fname, int child_shards) {
while (1) {
serial_feature sf = deserialize_feature(geoms, geompos_in, metabase, meta_off, z, tx, ty, initial_x, initial_y);
if (sf.t < 0) {
@ -1357,14 +1357,14 @@ struct run_prefilter_args {
int max_zoom_increment = 0;
size_t pass = 0;
size_t passes = 0;
volatile long long *along = 0;
std::atomic<long long> *along = 0;
long long alongminus = 0;
int buffer = 0;
int *within = NULL;
bool *first_time = NULL;
FILE **geomfile = NULL;
long long *geompos = NULL;
volatile double *oprogress = NULL;
std::atomic<double> *oprogress = NULL;
double todo = 0;
const char *fname = 0;
int child_shards = 0;
@ -1593,13 +1593,13 @@ bool find_partial(std::vector<partial> &partials, serial_feature &sf, ssize_t &o
return false;
}
long long write_tile(FILE *geoms, long long *geompos_in, char *metabase, char *stringpool, int z, unsigned tx, unsigned ty, int detail, int min_detail, sqlite3 *outdb, const char *outdir, int buffer, const char *fname, FILE **geomfile, int minzoom, int maxzoom, double todo, volatile long long *along, long long alongminus, double gamma, int child_shards, long long *meta_off, long long *pool_off, unsigned *initial_x, unsigned *initial_y, volatile int *running, double simplification, std::vector<std::map<std::string, layermap_entry>> *layermaps, std::vector<std::vector<std::string>> *layer_unmaps, size_t tiling_seg, size_t pass, size_t passes, unsigned long long mingap, long long minextent, double fraction, const char *prefilter, const char *postfilter, write_tile_args *arg) {
long long write_tile(FILE *geoms, long long *geompos_in, char *metabase, char *stringpool, int z, unsigned tx, unsigned ty, int detail, int min_detail, sqlite3 *outdb, const char *outdir, int buffer, const char *fname, FILE **geomfile, int minzoom, int maxzoom, double todo, std::atomic<long long> *along, long long alongminus, double gamma, int child_shards, long long *meta_off, long long *pool_off, unsigned *initial_x, unsigned *initial_y, std::atomic<int> *running, double simplification, std::vector<std::map<std::string, layermap_entry>> *layermaps, std::vector<std::vector<std::string>> *layer_unmaps, size_t tiling_seg, size_t pass, size_t passes, unsigned long long mingap, long long minextent, double fraction, const char *prefilter, const char *postfilter, write_tile_args *arg) {
int line_detail;
double merge_fraction = 1;
double mingap_fraction = 1;
double minextent_fraction = 1;
static volatile double oprogress = 0;
static std::atomic<double> oprogress(0);
long long og = *geompos_in;
// XXX is there a way to do this without floating point?
@ -2476,7 +2476,7 @@ void *run_thread(void *vargs) {
return NULL;
}
int traverse_zooms(int *geomfd, off_t *geom_size, char *metabase, char *stringpool, unsigned *midx, unsigned *midy, int &maxzoom, int minzoom, sqlite3 *outdb, const char *outdir, int buffer, const char *fname, const char *tmpdir, double gamma, int full_detail, int low_detail, int min_detail, long long *meta_off, long long *pool_off, unsigned *initial_x, unsigned *initial_y, double simplification, std::vector<std::map<std::string, layermap_entry>> &layermaps, const char *prefilter, const char *postfilter, std::map<std::string, attribute_op> const *attribute_accum) {
int traverse_zooms(int *geomfd, off_t *geom_size, char *metabase, char *stringpool, std::atomic<unsigned> *midx, std::atomic<unsigned> *midy, int &maxzoom, int minzoom, sqlite3 *outdb, const char *outdir, int buffer, const char *fname, const char *tmpdir, double gamma, int full_detail, int low_detail, int min_detail, long long *meta_off, long long *pool_off, unsigned *initial_x, unsigned *initial_y, double simplification, std::vector<std::map<std::string, layermap_entry>> &layermaps, const char *prefilter, const char *postfilter, std::map<std::string, attribute_op> const *attribute_accum) {
last_progress = 0;
// The existing layermaps are one table per input thread.
@ -2502,7 +2502,7 @@ int traverse_zooms(int *geomfd, off_t *geom_size, char *metabase, char *stringpo
int i;
for (i = 0; i <= maxzoom; i++) {
long long most = 0;
std::atomic<long long> most(0);
FILE *sub[TEMP_FILES];
int subfd[TEMP_FILES];
@ -2621,8 +2621,8 @@ int traverse_zooms(int *geomfd, off_t *geom_size, char *metabase, char *stringpo
pthread_t pthreads[threads];
std::vector<write_tile_args> args;
args.resize(threads);
int running = threads;
long long along = 0;
std::atomic<int> running(threads);
std::atomic<long long> along(0);
for (size_t thread = 0; thread < threads; thread++) {
args[thread].metabase = metabase;

View File

@ -4,6 +4,7 @@
#include <stdio.h>
#include <sqlite3.h>
#include <vector>
#include <atomic>
#include <map>
#include "mbtiles.hpp"
@ -19,7 +20,7 @@ enum attribute_op {
long long write_tile(char **geom, char *metabase, char *stringpool, unsigned *file_bbox, int z, unsigned x, unsigned y, int detail, int min_detail, int basezoom, sqlite3 *outdb, const char *outdir, double droprate, int buffer, const char *fname, FILE **geomfile, int file_minzoom, int file_maxzoom, double todo, char *geomstart, long long along, double gamma, int nlayers);
int traverse_zooms(int *geomfd, off_t *geom_size, char *metabase, char *stringpool, unsigned *midx, unsigned *midy, int &maxzoom, int minzoom, sqlite3 *outdb, const char *outdir, int buffer, const char *fname, const char *tmpdir, double gamma, int full_detail, int low_detail, int min_detail, long long *meta_off, long long *pool_off, unsigned *initial_x, unsigned *initial_y, double simplification, std::vector<std::map<std::string, layermap_entry> > &layermap, const char *prefilter, const char *postfilter, std::map<std::string, attribute_op> const *attribute_accum);
int traverse_zooms(int *geomfd, off_t *geom_size, char *metabase, char *stringpool, std::atomic<unsigned> *midx, std::atomic<unsigned> *midy, int &maxzoom, int minzoom, sqlite3 *outdb, const char *outdir, int buffer, const char *fname, const char *tmpdir, double gamma, int full_detail, int low_detail, int min_detail, long long *meta_off, long long *pool_off, unsigned *initial_x, unsigned *initial_y, double simplification, std::vector<std::map<std::string, layermap_entry> > &layermap, const char *prefilter, const char *postfilter, std::map<std::string, attribute_op> const *attribute_accum);
int manage_gap(unsigned long long index, unsigned long long *previndex, double scale, double gamma, double *gap);