mirror of
https://github.com/mapbox/tippecanoe.git
synced 2025-04-08 11:34:12 +00:00
FlatGeobuf multithreading [#2]
This commit is contained in:
parent
245570feff
commit
6e6cd29399
255
flatgeobuf.cpp
255
flatgeobuf.cpp
@ -5,6 +5,7 @@
|
||||
#include "flatgeobuf/feature_generated.h"
|
||||
#include "flatgeobuf/header_generated.h"
|
||||
#include "milo/dtoa_milo.h"
|
||||
#include "main.hpp"
|
||||
|
||||
using namespace std;
|
||||
|
||||
@ -93,22 +94,8 @@ drawvec readGeometry(const FlatGeobuf::Geometry *geometry, FlatGeobuf::GeometryT
|
||||
}
|
||||
}
|
||||
|
||||
void parse_flatgeobuf(std::vector<struct serialization_state> *sst, const char *src, size_t len, int layer, std::string layername) {
|
||||
auto header_size = flatbuffers::GetPrefixedSize((const uint8_t *)src + 8);
|
||||
auto header = FlatGeobuf::GetSizePrefixedHeader(src + 8);
|
||||
auto features_count = header->features_count();
|
||||
auto node_size = header->index_node_size();
|
||||
|
||||
|
||||
vector<string> h_column_names;
|
||||
vector<FlatGeobuf::ColumnType> h_column_types;
|
||||
for (size_t i = 0; i < header->columns()->size(); i++) {
|
||||
h_column_names.push_back(header->columns()->Get(i)->name()->c_str());
|
||||
h_column_types.push_back(header->columns()->Get(i)->type());
|
||||
}
|
||||
|
||||
|
||||
auto h_geometry_type = header->geometry_type();
|
||||
void readFeature(const FlatGeobuf::Feature *feature, FlatGeobuf::GeometryType h_geometry_type, const vector<string> &h_column_names, const vector<FlatGeobuf::ColumnType> &h_column_types, struct serialization_state *sst, int layer, std::string layername) {
|
||||
drawvec dv = readGeometry(feature->geometry(), h_geometry_type);
|
||||
|
||||
int drawvec_type = -1;
|
||||
|
||||
@ -132,6 +119,175 @@ void parse_flatgeobuf(std::vector<struct serialization_state> *sst, const char *
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
|
||||
serial_feature sf;
|
||||
|
||||
sf.layer = layer;
|
||||
sf.layername = layername;
|
||||
sf.segment = sst->segment;
|
||||
sf.has_id = false;
|
||||
sf.has_tippecanoe_minzoom = false;
|
||||
sf.has_tippecanoe_maxzoom = false;
|
||||
sf.feature_minzoom = false;
|
||||
sf.seq = (*sst->layer_seq);
|
||||
sf.geometry = dv;
|
||||
sf.t = drawvec_type;
|
||||
|
||||
vector<string> full_keys;
|
||||
vector<serial_val> full_values;
|
||||
|
||||
// assume tabular schema with columns in header
|
||||
size_t p_pos = 0;
|
||||
while (p_pos < feature->properties()->size()) {
|
||||
uint16_t col_idx;
|
||||
memcpy(&col_idx, feature->properties()->data() + p_pos, sizeof(col_idx));
|
||||
|
||||
FlatGeobuf::ColumnType col_type = h_column_types[col_idx];
|
||||
|
||||
serial_val sv;
|
||||
if (col_type == FlatGeobuf::ColumnType::Int) {
|
||||
sv.type = mvt_double; // this is quirky
|
||||
int32_t int_val;
|
||||
memcpy(&int_val, feature->properties()->data() + p_pos + 2, sizeof(int_val));
|
||||
sv.s = to_string(int_val);
|
||||
p_pos += 2 + sizeof(int_val);
|
||||
} else if (col_type == FlatGeobuf::ColumnType::Long) {
|
||||
sv.type = mvt_double; // this is quirky
|
||||
uint64_t long_val;
|
||||
memcpy(&long_val, feature->properties()->data() + p_pos + 2, sizeof(long_val));
|
||||
sv.s = to_string(long_val);
|
||||
p_pos += 2 + sizeof(long_val);
|
||||
} else if (col_type == FlatGeobuf::ColumnType::Double) {
|
||||
sv.type = mvt_double;
|
||||
double double_val;
|
||||
memcpy(&double_val, feature->properties()->data() + p_pos + 2, sizeof(double_val));
|
||||
sv.s = milo::dtoa_milo(double_val);
|
||||
p_pos += 2 + sizeof(double_val);
|
||||
} else if (col_type == FlatGeobuf::ColumnType::String) {
|
||||
sv.type = mvt_string;
|
||||
uint32_t str_len;
|
||||
memcpy(&str_len, feature->properties()->data() + p_pos + 2, sizeof(str_len));
|
||||
string s{reinterpret_cast<const char*>(feature->properties()->data() + p_pos + 2 + 4), str_len};
|
||||
sv.s = s;
|
||||
p_pos += 2 + 4 + str_len;
|
||||
} else {
|
||||
fprintf(stderr, "flatgeobuf has unsupported column type %u\n", (unsigned int)col_type);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
full_keys.push_back(h_column_names[col_idx]);
|
||||
full_values.push_back(sv);
|
||||
}
|
||||
|
||||
sf.full_keys = full_keys;
|
||||
sf.full_values = full_values;
|
||||
|
||||
serialize_feature(sst, sf);
|
||||
}
|
||||
|
||||
struct queued_feature {
|
||||
const FlatGeobuf::Feature *feature = NULL;
|
||||
FlatGeobuf::GeometryType h_geometry_type = FlatGeobuf::GeometryType::Unknown;
|
||||
const vector<std::string> *h_column_names = NULL;
|
||||
const vector<FlatGeobuf::ColumnType> *h_column_types = NULL;
|
||||
vector<struct serialization_state> *sst = NULL;
|
||||
int layer = 0;
|
||||
std::string layername = "";
|
||||
};
|
||||
|
||||
static std::vector<queued_feature> feature_queue;
|
||||
|
||||
struct queue_run_arg {
|
||||
size_t start;
|
||||
size_t end;
|
||||
size_t segment;
|
||||
|
||||
queue_run_arg(size_t start1, size_t end1, size_t segment1)
|
||||
: start(start1), end(end1), segment(segment1) {
|
||||
}
|
||||
};
|
||||
|
||||
void *fgb_run_parse_feature(void *v) {
|
||||
struct queue_run_arg *qra = (struct queue_run_arg *) v;
|
||||
|
||||
for (size_t i = qra->start; i < qra->end; i++) {
|
||||
struct queued_feature &qf = feature_queue[i];
|
||||
readFeature(qf.feature, qf.h_geometry_type, *qf.h_column_names, *qf.h_column_types, &(*qf.sst)[qra->segment], qf.layer, qf.layername);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void fgbRunQueue() {
|
||||
if (feature_queue.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<struct queue_run_arg> qra;
|
||||
|
||||
std::vector<pthread_t> pthreads;
|
||||
pthreads.resize(CPUS);
|
||||
|
||||
for (size_t i = 0; i < CPUS; i++) {
|
||||
*((*(feature_queue[0].sst))[i].layer_seq) = *((*(feature_queue[0].sst))[0].layer_seq) + feature_queue.size() * i / CPUS;
|
||||
|
||||
qra.push_back(queue_run_arg(
|
||||
feature_queue.size() * i / CPUS,
|
||||
feature_queue.size() * (i + 1) / CPUS,
|
||||
i));
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < CPUS; i++) {
|
||||
if (pthread_create(&pthreads[i], NULL, fgb_run_parse_feature, &qra[i]) != 0) {
|
||||
perror("pthread_create");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < CPUS; i++) {
|
||||
void *retval;
|
||||
|
||||
if (pthread_join(pthreads[i], &retval) != 0) {
|
||||
perror("pthread_join");
|
||||
}
|
||||
}
|
||||
|
||||
// Lack of atomicity is OK, since we are single-threaded again here
|
||||
long long was = *((*(feature_queue[0].sst))[CPUS - 1].layer_seq);
|
||||
*((*(feature_queue[0].sst))[0].layer_seq) = was;
|
||||
feature_queue.clear();
|
||||
}
|
||||
|
||||
void queueFeature(const FlatGeobuf::Feature *feature, FlatGeobuf::GeometryType h_geometry_type, const vector<string> &h_column_names, const vector<FlatGeobuf::ColumnType> &h_column_types, std::vector<struct serialization_state> *sst, int layer, string layername) {
|
||||
struct queued_feature qf;
|
||||
qf.feature = feature;
|
||||
qf.h_geometry_type = h_geometry_type;
|
||||
qf.h_column_names = &h_column_names;
|
||||
qf.h_column_types = &h_column_types;
|
||||
qf.sst = sst;
|
||||
qf.layer = layer;
|
||||
qf.layername = layername;
|
||||
|
||||
feature_queue.push_back(qf);
|
||||
|
||||
if (feature_queue.size() > CPUS * 500) {
|
||||
fgbRunQueue();
|
||||
}
|
||||
}
|
||||
|
||||
void parse_flatgeobuf(std::vector<struct serialization_state> *sst, const char *src, size_t len, int layer, std::string layername) {
|
||||
auto header_size = flatbuffers::GetPrefixedSize((const uint8_t *)src + 8);
|
||||
auto header = FlatGeobuf::GetSizePrefixedHeader(src + 8);
|
||||
auto features_count = header->features_count();
|
||||
auto node_size = header->index_node_size();
|
||||
|
||||
vector<string> h_column_names;
|
||||
vector<FlatGeobuf::ColumnType> h_column_types;
|
||||
for (size_t i = 0; i < header->columns()->size(); i++) {
|
||||
h_column_names.push_back(header->columns()->Get(i)->name()->c_str());
|
||||
h_column_types.push_back(header->columns()->Get(i)->type());
|
||||
}
|
||||
|
||||
auto h_geometry_type = header->geometry_type();
|
||||
|
||||
int index_size = 0;
|
||||
if (node_size > 0) {
|
||||
index_size = PackedRTreeSize(features_count,node_size);
|
||||
@ -139,75 +295,10 @@ void parse_flatgeobuf(std::vector<struct serialization_state> *sst, const char *
|
||||
const char* start = src + 8 + 4 + header_size + index_size;
|
||||
|
||||
while (start < src + len) {
|
||||
serial_feature sf;
|
||||
|
||||
auto my_sst = &(*sst)[0];
|
||||
|
||||
auto feature_size = flatbuffers::GetPrefixedSize((const uint8_t *)start);
|
||||
auto feature = FlatGeobuf::GetSizePrefixedFeature(start);
|
||||
|
||||
drawvec dv = readGeometry(feature->geometry(), h_geometry_type);
|
||||
|
||||
sf.layer = layer;
|
||||
sf.layername = layername;
|
||||
sf.segment = my_sst->segment;
|
||||
sf.has_id = false;
|
||||
sf.has_tippecanoe_minzoom = false;
|
||||
sf.has_tippecanoe_maxzoom = false;
|
||||
sf.feature_minzoom = false;
|
||||
sf.seq = (*my_sst->layer_seq);
|
||||
sf.geometry = dv;
|
||||
sf.t = drawvec_type;
|
||||
|
||||
vector<string> full_keys;
|
||||
vector<serial_val> full_values;
|
||||
|
||||
// assume tabular schema with columns in header
|
||||
size_t p_pos = 0;
|
||||
while (p_pos < feature->properties()->size()) {
|
||||
uint16_t col_idx;
|
||||
memcpy(&col_idx, feature->properties()->data() + p_pos, sizeof(col_idx));
|
||||
|
||||
FlatGeobuf::ColumnType col_type = h_column_types[col_idx];
|
||||
|
||||
serial_val sv;
|
||||
if (col_type == FlatGeobuf::ColumnType::Int) {
|
||||
sv.type = mvt_double; // this is quirky
|
||||
int32_t int_val;
|
||||
memcpy(&int_val, feature->properties()->data() + p_pos + 2, sizeof(int_val));
|
||||
sv.s = to_string(int_val);
|
||||
p_pos += 2 + sizeof(int_val);
|
||||
} else if (col_type == FlatGeobuf::ColumnType::Long) {
|
||||
sv.type = mvt_double; // this is quirky
|
||||
uint64_t long_val;
|
||||
memcpy(&long_val, feature->properties()->data() + p_pos + 2, sizeof(long_val));
|
||||
sv.s = to_string(long_val);
|
||||
p_pos += 2 + sizeof(long_val);
|
||||
} else if (col_type == FlatGeobuf::ColumnType::Double) {
|
||||
sv.type = mvt_double;
|
||||
double double_val;
|
||||
memcpy(&double_val, feature->properties()->data() + p_pos + 2, sizeof(double_val));
|
||||
sv.s = milo::dtoa_milo(double_val);
|
||||
p_pos += 2 + sizeof(double_val);
|
||||
} else if (col_type == FlatGeobuf::ColumnType::String) {
|
||||
sv.type = mvt_string;
|
||||
uint32_t str_len;
|
||||
memcpy(&str_len, feature->properties()->data() + p_pos + 2, sizeof(str_len));
|
||||
string s{reinterpret_cast<const char*>(feature->properties()->data() + p_pos + 2 + 4), str_len};
|
||||
sv.s = s;
|
||||
p_pos += 2 + 4 + str_len;
|
||||
} else {
|
||||
fprintf(stderr, "flatgeobuf has unsupported column type %u\n", (unsigned int)col_type);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
full_keys.push_back(h_column_names[col_idx]);
|
||||
full_values.push_back(sv);
|
||||
}
|
||||
|
||||
sf.full_keys = full_keys;
|
||||
sf.full_values = full_values;
|
||||
|
||||
serialize_feature(my_sst, sf);
|
||||
queueFeature(feature, h_geometry_type, h_column_names, h_column_types, sst, layer, layername);
|
||||
|
||||
start += 4 + feature_size;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user