Essentials of multithreaded tile-joining

This commit is contained in:
Eric Fischer 2016-09-20 12:53:17 -07:00
parent 021d792d33
commit 87e4a338f6

View File

@ -10,6 +10,7 @@
#include <set>
#include <zlib.h>
#include <math.h>
#include <pthread.h>
#include "mvt.hpp"
#include "projection.hpp"
#include "pool.hpp"
@ -28,7 +29,7 @@ struct stats {
double minlat, minlon, maxlat, maxlon;
};
void handle(std::string message, int z, unsigned x, unsigned y, std::map<std::string, layermap_entry> &layermap, sqlite3 *outdb, std::vector<std::string> &header, std::map<std::string, std::vector<std::string> > &mapping, std::set<std::string> &exclude, int ifmatched, mvt_tile &outtile) {
void handle(std::string message, int z, unsigned x, unsigned y, std::map<std::string, layermap_entry> &layermap, std::vector<std::string> &header, std::map<std::string, std::vector<std::string>> &mapping, std::set<std::string> &exclude, int ifmatched, mvt_tile &outtile) {
mvt_tile tile;
int features_added = 0;
@ -132,7 +133,7 @@ void handle(std::string message, int z, unsigned x, unsigned y, std::map<std::st
}
if (header.size() > 0 && strcmp(key, header[0].c_str()) == 0) {
std::map<std::string, std::vector<std::string> >::iterator ii = mapping.find(value);
std::map<std::string, std::vector<std::string>>::iterator ii = mapping.find(value);
if (ii != mapping.end()) {
std::vector<std::string> fields = ii->second;
@ -299,42 +300,154 @@ struct reader *begin_reading(char *fname) {
return r;
}
void decode(struct reader *readers, char *map, std::map<std::string, layermap_entry> &layermap, sqlite3 *outdb, struct stats *st, std::vector<std::string> &header, std::map<std::string, std::vector<std::string> > &mapping, std::set<std::string> &exclude, int ifmatched, std::string &attribution) {
std::vector<std::map<std::string, layermap_entry> > layermaps;
// Tile address made of a zoom level and x/y coordinates.
// Ordered by z, then x, then y, so it can be used as a std::map key.
struct zxy {
	long long z;
	long long x;
	long long y;

	zxy(long long _z, long long _x, long long _y)
	    : z(_z), x(_x), y(_y) {
	}

	// Lexicographic "less than": the first differing component decides.
	bool operator<(zxy const &rhs) const {
		if (z != rhs.z) {
			return z < rhs.z;
		}
		if (x != rhs.x) {
			return x < rhs.x;
		}
		return y < rhs.y;
	}
};
// Work unit handed to one join_worker() thread: the tiles it must process,
// the encoded tiles it produces, and the values it forwards to handle().
struct arg {
	// Tiles to process, keyed by tile address. Each string in the vector is
	// passed to handle() in order and accumulated into one output tile.
	std::map<zxy, std::vector<std::string>> inputs;

	// Encoded tiles produced by join_worker(), keyed by tile address.
	std::map<zxy, std::string> outputs;

	// Values forwarded to handle() for every input. The pointers refer to
	// objects supplied by whoever fills in this struct; they are not owned here.
	// They default to null/zero so that a default-initialized `arg` never
	// carries indeterminate values.
	std::map<std::string, layermap_entry> *layermap = nullptr;
	std::vector<std::string> *header = nullptr;
	std::map<std::string, std::vector<std::string>> *mapping = nullptr;
	std::set<std::string> *exclude = nullptr;
	int ifmatched = 0;
};
void *join_worker(void *v) {
arg *a = (arg *) v;
for (auto ai = a->inputs.begin(); ai != a->inputs.end(); ++ai) {
mvt_tile tile;
for (size_t i = 0; i < ai->second.size(); i++) {
handle(ai->second[i], ai->first.z, ai->first.x, ai->first.y, *(a->layermap), *(a->header), *(a->mapping), *(a->exclude), a->ifmatched, tile);
}
bool anything = false;
for (size_t i = 0; i < tile.layers.size(); i++) {
if (tile.layers[i].features.size() > 0) {
anything = true;
break;
}
}
if (anything) {
std::string compressed = tile.encode();
if (!pk && compressed.size() > 500000) {
fprintf(stderr, "Tile %lld/%lld/%lld size is %lld, >500000. Skipping this tile\n.", ai->first.z, ai->first.x, ai->first.y, (long long) compressed.size());
} else {
a->outputs.insert(std::pair<zxy, std::string>(ai->first, compressed));
}
}
}
return NULL;
}
void handle_tasks(std::map<zxy, std::vector<std::string>> &tasks, std::vector<std::map<std::string, layermap_entry>> &layermaps, sqlite3 *outdb, std::vector<std::string> &header, std::map<std::string, std::vector<std::string>> &mapping, std::set<std::string> &exclude, int ifmatched) {
pthread_t pthreads[CPUS];
std::vector<arg> args;
for (size_t i = 0; i < CPUS; i++) {
args.push_back(arg());
args[i].layermap = &layermaps[i];
args[i].header = &header;
args[i].mapping = &mapping;
args[i].exclude = &exclude;
args[i].ifmatched = ifmatched;
}
size_t count = 0;
// XXX Be more careful to distribute tasks evenly across CPUs
for (auto ai = tasks.begin(); ai != tasks.end(); ++ai) {
args[count].inputs.insert(*ai);
count = (count + 1) % CPUS;
if (ai == tasks.begin()) {
fprintf(stderr, "%lld/%lld/%lld \r", ai->first.z, ai->first.x, ai->first.y);
}
}
for (size_t i = 0; i < CPUS; i++) {
if (pthread_create(&pthreads[i], NULL, join_worker, &args[i]) != 0) {
perror("pthread_create");
exit(EXIT_FAILURE);
}
}
for (int i = 0; i < CPUS; i++) {
void *retval;
if (pthread_join(pthreads[i], &retval) != 0) {
perror("pthread_join");
}
for (auto ai = args[i].outputs.begin(); ai != args[i].outputs.end(); ++ai) {
mbtiles_write_tile(outdb, ai->first.z, ai->first.x, ai->first.y, ai->second.data(), ai->second.size());
}
}
}
void decode(struct reader *readers, char *map, std::map<std::string, layermap_entry> &layermap, sqlite3 *outdb, struct stats *st, std::vector<std::string> &header, std::map<std::string, std::vector<std::string>> &mapping, std::set<std::string> &exclude, int ifmatched, std::string &attribution) {
std::vector<std::map<std::string, layermap_entry>> layermaps;
for (size_t i = 0; i < CPUS; i++) {
layermaps.push_back(std::map<std::string, layermap_entry>());
}
mvt_tile tile;
std::map<zxy, std::vector<std::string>> tasks;
while (readers != NULL && readers->zoom < 32) {
reader *r = readers;
readers = readers->next;
r->next = NULL;
fprintf(stderr, "%lld/%lld/%lld \r", r->zoom, r->x, r->y);
handle(r->data, r->zoom, r->x, r->y, layermaps[0], outdb, header, mapping, exclude, ifmatched, tile);
zxy tile = zxy(r->zoom, r->x, r->y);
if (tasks.count(tile) == 0) {
tasks.insert(std::pair<zxy, std::vector<std::string>>(tile, std::vector<std::string>()));
}
auto f = tasks.find(tile);
f->second.push_back(r->data);
if (readers == NULL || readers->zoom != r->zoom || readers->x != r->x || readers->y != r->y) {
bool anything = false;
for (size_t i = 0; i < tile.layers.size(); i++) {
if (tile.layers[i].features.size() > 0) {
anything = true;
break;
}
if (tasks.size() > 100 * CPUS) {
handle_tasks(tasks, layermaps, outdb, header, mapping, exclude, ifmatched);
tasks.clear();
}
if (anything) {
std::string compressed = tile.encode();
if (!pk && compressed.size() > 500000) {
fprintf(stderr, "Tile %lld/%lld/%lld size is %lld, >500000. Skipping this tile\n.", r->zoom, r->x, r->y, (long long) compressed.size());
} else {
mbtiles_write_tile(outdb, r->zoom, r->x, r->y, compressed.data(), compressed.size());
}
}
tile = mvt_tile();
}
if (sqlite3_step(r->stmt) == SQLITE_ROW) {
@ -363,6 +476,7 @@ void decode(struct reader *readers, char *map, std::map<std::string, layermap_en
*rr = r;
}
handle_tasks(tasks, layermaps, outdb, header, mapping, exclude, ifmatched);
layermap = merge_layermaps(layermaps);
struct reader *next;
@ -468,7 +582,7 @@ std::string dequote(std::string s) {
return out;
}
void readcsv(char *fn, std::vector<std::string> &header, std::map<std::string, std::vector<std::string> > &mapping) {
void readcsv(char *fn, std::vector<std::string> &header, std::map<std::string, std::vector<std::string>> &mapping) {
FILE *f = fopen(fn, "r");
if (f == NULL) {
perror(fn);
@ -491,7 +605,7 @@ void readcsv(char *fn, std::vector<std::string> &header, std::map<std::string, s
for (size_t i = 0; i < line.size() && i < header.size(); i++) {
// printf("putting %s\n", line[0].c_str());
mapping.insert(std::pair<std::string, std::vector<std::string> >(line[0], line));
mapping.insert(std::pair<std::string, std::vector<std::string>>(line[0], line));
}
}
@ -510,7 +624,7 @@ int main(int argc, char **argv) {
}
std::vector<std::string> header;
std::map<std::string, std::vector<std::string> > mapping;
std::map<std::string, std::vector<std::string>> mapping;
std::set<std::string> exclude;