Put the pieces back together

This commit is contained in:
Eric Fischer 2017-08-23 11:43:48 -07:00
parent 6cea2d5db6
commit 6caf20b9c8
4 changed files with 78 additions and 102 deletions

View File

@ -326,6 +326,8 @@ int serialize_geometry(struct serialization_state *sst, json_object *geometry, j
}
int serialize_feature(struct serialization_state *sst, serial_feature &sf, bool want_dist, bool filters, int maxzoom, bool uses_gamma) {
struct reader *r = &(sst->readers[sst->segment]);
if (want_dist) {
std::vector<unsigned long long> locs;
for (size_t i = 0; i < sf.geometry.size(); i++) {
@ -426,39 +428,39 @@ int serialize_feature(struct serialization_state *sst, serial_feature &sf, bool
if (inline_meta) {
sf.metapos = -1;
for (size_t i = 0; i < sf.full_keys.size(); i++) {
sf.keys.push_back(addpool(sst->poolfile, sst->treefile, sf.full_keys[i].c_str(), mvt_string));
sf.values.push_back(addpool(sst->poolfile, sst->treefile, sf.full_values[i].s.c_str(), sf.full_values[i].type));
sf.keys.push_back(addpool(r->poolfile, r->treefile, sf.full_keys[i].c_str(), mvt_string));
sf.values.push_back(addpool(r->poolfile, r->treefile, sf.full_values[i].s.c_str(), sf.full_values[i].type));
}
} else {
sf.metapos = *(sst->metapos);
sf.metapos = r->metapos;
for (size_t i = 0; i < sf.full_keys.size(); i++) {
serialize_long_long(sst->metafile, addpool(sst->poolfile, sst->treefile, sf.full_keys[i].c_str(), mvt_string), sst->metapos, sst->fname);
serialize_long_long(sst->metafile, addpool(sst->poolfile, sst->treefile, sf.full_values[i].s.c_str(), sf.full_values[i].type), sst->metapos, sst->fname);
serialize_long_long(r->metafile, addpool(r->poolfile, r->treefile, sf.full_keys[i].c_str(), mvt_string), &r->metapos, sst->fname);
serialize_long_long(r->metafile, addpool(r->poolfile, r->treefile, sf.full_values[i].s.c_str(), sf.full_values[i].type), &r->metapos, sst->fname);
}
}
long long geomstart = *(sst->geompos);
serialize_feature(sst->geomfile, &sf, sst->geompos, sst->fname, *(sst->initial_x) >> geometry_scale, *(sst->initial_y) >> geometry_scale, false);
long long geomstart = r->geompos;
serialize_feature(r->geomfile, &sf, &r->geompos, sst->fname, *(sst->initial_x) >> geometry_scale, *(sst->initial_y) >> geometry_scale, false);
struct index index;
index.start = geomstart;
index.end = *(sst->geompos);
index.end = r->geompos;
index.segment = sst->segment;
index.seq = *(sst->layer_seq);
index.t = sf.t;
index.index = bbox_index;
fwrite_check(&index, sizeof(struct index), 1, sst->indexfile, sst->fname);
*(sst->indexpos) += sizeof(struct index);
fwrite_check(&index, sizeof(struct index), 1, r->indexfile, sst->fname);
r->indexpos += sizeof(struct index);
for (size_t i = 0; i < 2; i++) {
if (sf.bbox[i] < sst->file_bbox[i]) {
sst->file_bbox[i] = sf.bbox[i];
if (sf.bbox[i] < r->file_bbox[i]) {
r->file_bbox[i] = sf.bbox[i];
}
}
for (size_t i = 2; i < 4; i++) {
if (sf.bbox[i] > sst->file_bbox[i]) {
sst->file_bbox[i] = sf.bbox[i];
if (sf.bbox[i] > r->file_bbox[i]) {
r->file_bbox[i] = sf.bbox[i];
}
}
@ -616,7 +618,7 @@ void parse_json(struct serialization_state *sst, json_pull *jp, std::set<std::st
void *run_parse_json(void *v) {
struct parse_json_args *pja = (struct parse_json_args *) v;
parse_json(pja->jp, pja->reading, pja->layer_seq, pja->progress_seq, pja->metapos, pja->geompos, pja->indexpos, pja->exclude, pja->include, pja->exclude_all, pja->metafile, pja->geomfile, pja->indexfile, pja->poolfile, pja->treefile, pja->fname, pja->basezoom, pja->layer, pja->droprate, pja->file_bbox, pja->segment, pja->initialized, pja->initial_x, pja->initial_y, pja->readers, pja->maxzoom, pja->layermap, *pja->layername, pja->uses_gamma, pja->attribute_types, pja->dist_sum, pja->dist_count, pja->want_dist, pja->filters);
parse_json(pja->sst, pja->jp, pja->exclude, pja->include, pja->exclude_all, pja->basezoom, pja->layer, pja->droprate, pja->maxzoom, pja->layermap, *pja->layername, pja->uses_gamma, pja->attribute_types, pja->want_dist, pja->filters);
return NULL;
}

View File

@ -7,48 +7,31 @@
#include <string>
#include "mbtiles.hpp"
#include "jsonpull/jsonpull.h"
#include "serial.hpp"
struct parse_json_args {
json_pull *jp;
const char *reading;
volatile long long *layer_seq;
volatile long long *progress_seq;
long long *metapos;
long long *geompos;
long long *indexpos;
std::set<std::string> *exclude;
std::set<std::string> *include;
int exclude_all;
FILE *metafile;
FILE *geomfile;
FILE *indexfile;
struct memfile *poolfile;
struct memfile *treefile;
char *fname;
int basezoom;
int layer;
double droprate;
long long *file_bbox;
int segment;
int *initialized;
unsigned *initial_x;
unsigned *initial_y;
struct reader *readers;
int maxzoom;
std::map<std::string, layermap_entry> *layermap;
std::string *layername;
bool uses_gamma;
std::map<std::string, int> const *attribute_types;
double *dist_sum;
size_t *dist_count;
bool want_dist;
bool filters;
struct serialization_state *sst;
};
struct json_pull *json_begin_map(char *map, long long len);
void json_end_map(struct json_pull *jp);
void parse_json(json_pull *jp, const char *reading, volatile long long *layer_seq, volatile long long *progress_seq, long long *metapos, long long *geompos, long long *indexpos, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, FILE *metafile, FILE *geomfile, FILE *indexfile, struct memfile *poolfile, struct memfile *treefile, char *fname, int basezoom, int layer, double droprate, long long *file_bbox, int segment, int *initialized, unsigned *initial_x, unsigned *initial_y, struct reader *readers, int maxzoom, std::map<std::string, layermap_entry> *layermap, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types, double *dist_sum, size_t *dist_count, bool want_dist, bool filters);
void parse_json(struct serialization_state *sst, json_pull *jp, std::set<std::string> *exclude, std::set<std::string> *include, int exclude_all, int basezoom, int layer, double droprate, int maxzoom, std::map<std::string, layermap_entry> *layermap, std::string layername, bool uses_gamma, std::map<std::string, int> const *attribute_types, bool want_dist, bool filters);
void *run_parse_json(void *v);
#endif

View File

@ -79,31 +79,6 @@ size_t TEMP_FILES;
long long MAX_FILES;
static long long diskfree;
struct reader {
int metafd;
int poolfd;
int treefd;
int geomfd;
int indexfd;
FILE *metafile;
struct memfile *poolfile;
struct memfile *treefile;
FILE *geomfile;
FILE *indexfile;
long long metapos;
long long geompos;
long long indexpos;
long long file_bbox[4];
struct stat geomst;
struct stat metast;
char *geom_map;
};
void checkdisk(struct reader *r, int nreader) {
long long used = 0;
int i;
@ -399,6 +374,7 @@ void do_read_parallel(char *map, long long len, long long initial_offset, const
}
struct parse_json_args pja[CPUS];
struct serialization_state sst[CPUS];
pthread_t pthreads[CPUS];
std::vector<std::set<type_and_string> > file_subkeys;
@ -407,41 +383,35 @@ void do_read_parallel(char *map, long long len, long long initial_offset, const
}
for (size_t i = 0; i < CPUS; i++) {
sst[i].fname = reading;
sst[i].line = 0;
sst[i].layer_seq = &layer_seq[i];
sst[i].progress_seq = progress_seq;
sst[i].readers = reader;
sst[i].segment = i;
sst[i].initialized = &initialized[i];
sst[i].initial_x = &initial_x[i];
sst[i].initial_y = &initial_y[i];
sst[i].dist_sum = &(dist_sums[i]);
sst[i].dist_count = &(dist_counts[i]);
pja[i].jp = json_begin_map(map + segs[i], segs[i + 1] - segs[i]);
pja[i].reading = reading;
pja[i].layer_seq = &layer_seq[i];
pja[i].progress_seq = progress_seq;
pja[i].metapos = &reader[i].metapos;
pja[i].geompos = &reader[i].geompos;
pja[i].indexpos = &reader[i].indexpos;
pja[i].exclude = exclude;
pja[i].include = include;
pja[i].exclude_all = exclude_all;
pja[i].metafile = reader[i].metafile;
pja[i].geomfile = reader[i].geomfile;
pja[i].indexfile = reader[i].indexfile;
pja[i].poolfile = reader[i].poolfile;
pja[i].treefile = reader[i].treefile;
pja[i].fname = fname;
pja[i].basezoom = basezoom;
pja[i].layer = source;
pja[i].droprate = droprate;
pja[i].file_bbox = reader[i].file_bbox;
pja[i].segment = i;
pja[i].initialized = &initialized[i];
pja[i].initial_x = &initial_x[i];
pja[i].initial_y = &initial_y[i];
pja[i].readers = reader;
pja[i].maxzoom = maxzoom;
pja[i].layermap = &(*layermaps)[i];
pja[i].layername = &layername;
pja[i].uses_gamma = uses_gamma;
pja[i].attribute_types = attribute_types;
pja[i].dist_sum = &(dist_sums[i]);
pja[i].dist_count = &(dist_counts[i]);
pja[i].want_dist = want_dist;
pja[i].filters = filters;
pja[i].sst = &sst[i];
if (pthread_create(&pthreads[i], NULL, run_parse_json, &pja[i]) != 0) {
perror("pthread_create");
exit(EXIT_FAILURE);
@ -1386,20 +1356,29 @@ int read_input(std::vector<source> &sources, char *fname, int maxzoom, int minzo
overall_offset += ahead;
checkdisk(reader, CPUS);
}
} else if (c == '{' || c == 0xEF || c == ' ') { // XXX
} else {
// Plain serial reading
long long layer_seq = overall_offset;
json_pull *jp = json_begin_file(fp);
parse_json(jp, reading.c_str(), &layer_seq, &progress_seq, &reader[0].metapos, &reader[0].geompos, &reader[0].indexpos, exclude, include, exclude_all, reader[0].metafile, reader[0].geomfile, reader[0].indexfile, reader[0].poolfile, reader[0].treefile, fname, basezoom, layer, droprate, reader[0].file_bbox, 0, &initialized[0], &initial_x[0], &initial_y[0], reader, maxzoom, &layermaps[0], sources[layer].layer, uses_gamma, attribute_types, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL);
struct serialization_state sst;
sst.fname = reading.c_str();
sst.line = 0;
sst.layer_seq = &layer_seq;
sst.progress_seq = &progress_seq;
sst.readers = reader;
sst.segment = 0;
sst.initial_x = &initial_x[0];
sst.initial_y = &initial_y[0];
sst.initialized = &initialized[0];
sst.dist_sum = &dist_sum;
sst.dist_count = &dist_count;
parse_json(&sst, jp, exclude, include, exclude_all, basezoom, layer, droprate, maxzoom, &layermaps[0], sources[layer].layer, uses_gamma, attribute_types, guess_maxzoom, prefilter != NULL || postfilter != NULL);
json_end(jp);
overall_offset = layer_seq;
checkdisk(reader, CPUS);
} else {
long long layer_seq = overall_offset;
parse_geobuf(fp, reading.c_str(), &layer_seq, &progress_seq, &reader[0].metapos, &reader[0].geompos, &reader[0].indexpos, exclude, include, exclude_all, reader[0].metafile, reader[0].geomfile, reader[0].indexfile, reader[0].poolfile, reader[0].treefile, fname, basezoom, layer, droprate, reader[0].file_bbox, 0, &initialized[0], &initial_x[0], &initial_y[0], reader, maxzoom, &layermaps[0], sources[layer].layer, uses_gamma, attribute_types, &dist_sum, &dist_count, guess_maxzoom);
overall_offset = layer_seq;
checkdisk(reader, CPUS);
}
if (fclose(fp) != 0) {

View File

@ -4,6 +4,7 @@
#include <stddef.h>
#include <stdio.h>
#include <vector>
#include <sys/stat.h>
#include "geometry.hpp"
size_t fwrite_check(const void *ptr, size_t size, size_t nitems, FILE *stream, const char *fname);
@ -67,6 +68,31 @@ struct serial_feature {
void serialize_feature(FILE *geomfile, serial_feature *sf, long long *geompos, const char *fname, long long wx, long long wy, bool include_minzoom);
serial_feature deserialize_feature(FILE *geoms, long long *geompos_in, char *metabase, long long *meta_off, unsigned z, unsigned tx, unsigned ty, unsigned *initial_x, unsigned *initial_y);
struct reader {
int metafd;
int poolfd;
int treefd;
int geomfd;
int indexfd;
FILE *metafile;
struct memfile *poolfile;
struct memfile *treefile;
FILE *geomfile;
FILE *indexfile;
long long metapos;
long long geompos;
long long indexpos;
long long file_bbox[4];
struct stat geomst;
struct stat metast;
char *geom_map;
};
struct serialization_state {
const char *fname; // source file name
int line; // user-oriented location within source for error reports
@ -77,20 +103,6 @@ struct serialization_state {
struct reader *readers; // array of data for each input thread
int segment; // the current input thread
FILE *geomfile; // main feature serialization; references metafile or poolfile
long long *geompos;
FILE *metafile; // feature metadata; references poolfile
long long *metapos;
FILE *indexfile; // quadkey index into geomfile
long long *indexpos;
struct memfile *poolfile; // string pool for keys and values
struct memfile *treefile; // index into poolfile
long long file_bbox[4]; // global bounding box
unsigned *initial_x; // relative offset of all geometries
unsigned *initial_y;
int *initialized;