Actually create threads and hand the tiling tasks off to them

This commit is contained in:
Eric Fischer 2015-10-19 12:32:40 -07:00
parent 10bd608b9e
commit 5d014e040f

99
tile.cc
View File

@ -15,6 +15,7 @@
#include <sys/mman.h>
#include <math.h>
#include <sqlite3.h>
#include <pthread.h>
#include "vector_tile.pb.h"
#include "geometry.hh"
@ -31,6 +32,8 @@ extern "C" {
#define XSTRINGIFY(s) STRINGIFY(s)
#define STRINGIFY(s) #s
pthread_mutex_t db_lock = PTHREAD_MUTEX_INITIALIZER;
// https://github.com/mapbox/mapnik-vector-tile/blob/master/src/vector_tile_compression.hpp
static inline int compress(std::string const &input, std::string &output) {
z_stream deflate_s;
@ -841,7 +844,18 @@ long long write_tile(char **geoms, char *metabase, char *stringpool, unsigned *f
line_detail++; // to keep it the same when the loop decrements it
}
} else {
if (pthread_mutex_lock(&db_lock) != 0) {
perror("pthread_mutex_lock");
exit(EXIT_FAILURE);
}
mbtiles_write_tile(outdb, z, tx, ty, compressed.data(), compressed.size());
if (pthread_mutex_unlock(&db_lock) != 0) {
perror("pthread_mutex_unlock");
exit(EXIT_FAILURE);
}
return count;
}
} else {
@ -898,7 +912,8 @@ struct write_tile_args {
long long *most;
};
void run_thread(write_tile_args *arg) {
void *run_thread(void *vargs) {
write_tile_args *arg = (write_tile_args *) vargs;
struct task *task;
for (task = arg->tasks; task != NULL; task = task->next) {
@ -936,7 +951,7 @@ void run_thread(write_tile_args *arg) {
long long len = write_tile(&geom, arg->metabase, arg->stringpool, arg->file_bbox, z, x, y, z == arg->maxzoom ? arg->full_detail : arg->low_detail, arg->min_detail, arg->maxzoom, arg->file_keys, arg->layernames, arg->outdb, arg->droprate, arg->buffer, arg->fname, arg->geomfile, arg->minzoom, arg->maxzoom, arg->todo, geomstart, *arg->along, arg->gamma, arg->nlayers, arg->prevent, arg->additional, arg->child_shards);
if (len < 0) {
return; // XXX how to report errors from threads?
return NULL; // XXX how to report errors from threads?
// return z - 1;
}
@ -952,6 +967,8 @@ void run_thread(write_tile_args *arg) {
}
*arg->along += arg->geom_size[j];
}
return NULL;
}
int traverse_zooms(int *geomfd, off_t *geom_size, char *metabase, char *stringpool, unsigned *file_bbox, struct pool **file_keys, unsigned *midx, unsigned *midy, char **layernames, int maxzoom, int minzoom, sqlite3 *outdb, double droprate, int buffer, const char *fname, const char *tmpdir, double gamma, int nlayers, char *prevent, char *additional, int full_detail, int low_detail, int min_detail) {
@ -1048,45 +1065,57 @@ int traverse_zooms(int *geomfd, off_t *geom_size, char *metabase, char *stringpo
*d = here;
}
pthread_t pthreads[threads];
write_tile_args args[threads];
int thread;
for (thread = 0; thread < threads; thread++) {
write_tile_args args;
args[thread].metabase = metabase;
args[thread].stringpool = stringpool;
args[thread].file_bbox = file_bbox; // XXX locking
args[thread].min_detail = min_detail;
args[thread].basezoom = maxzoom; // XXX rename?
args[thread].file_keys = file_keys; // XXX locking
args[thread].layernames = layernames;
args[thread].outdb = outdb; // XXX locking
args[thread].droprate = droprate;
args[thread].buffer = buffer;
args[thread].fname = fname;
args[thread].geomfile = sub + thread * (TEMP_FILES / threads);
args[thread].file_minzoom = minzoom;
args[thread].file_maxzoom = maxzoom;
args[thread].todo = todo;
args[thread].along = &along; // XXX locking
args[thread].gamma = gamma;
args[thread].nlayers = nlayers;
args[thread].prevent = prevent;
args[thread].additional = additional;
args[thread].child_shards = TEMP_FILES / threads;
args.metabase = metabase;
args.stringpool = stringpool;
args.file_bbox = file_bbox; // XXX locking
args.min_detail = min_detail;
args.basezoom = maxzoom; // XXX rename?
args.file_keys = file_keys; // XXX locking
args.layernames = layernames;
args.outdb = outdb; // XXX locking
args.droprate = droprate;
args.buffer = buffer;
args.fname = fname;
args.geomfile = sub + thread * (TEMP_FILES / threads);
args.file_minzoom = minzoom;
args.file_maxzoom = maxzoom;
args.todo = todo;
args.along = &along; // XXX locking
args.gamma = gamma;
args.nlayers = nlayers;
args.prevent = prevent;
args.additional = additional;
args.child_shards = TEMP_FILES / threads;
args[thread].geomfd = geomfd;
args[thread].geom_size = geom_size;
args[thread].midx = midx; // XXX locking
args[thread].midy = midy; // XXX locking
args[thread].maxzoom = maxzoom;
args[thread].minzoom = minzoom;
args[thread].full_detail = full_detail;
args[thread].low_detail = low_detail;
args[thread].most = &most; // XXX locking
args.geomfd = geomfd;
args.geom_size = geom_size;
args.midx = midx; // XXX locking
args.midy = midy; // XXX locking
args.maxzoom = maxzoom;
args.minzoom = minzoom;
args.full_detail = full_detail;
args.low_detail = low_detail;
args.most = &most; // XXX locking
args[thread].tasks = dispatches[thread].tasks;
args.tasks = dispatches[thread].tasks;
if (pthread_create(&pthreads[thread], NULL, run_thread, &args[thread]) != 0) {
perror("pthread_create");
exit(EXIT_FAILURE);
}
}
run_thread(&args);
for (thread = 0; thread < threads; thread++) {
void *retval;
if (pthread_join(pthreads[thread], &retval) != 0) {
perror("pthread_join");
}
}
for (j = 0; j < TEMP_FILES; j++) {