From 6caf20b9c8193fe134346f48196b6a3c421ac1a7 Mon Sep 17 00:00:00 2001 From: Eric Fischer Date: Wed, 23 Aug 2017 11:43:48 -0700 Subject: [PATCH] Put the pieces back together --- geojson.cpp | 32 +++++++++++---------- geojson.hpp | 25 +++------------- main.cpp | 83 ++++++++++++++++++++--------------------------------- serial.hpp | 40 +++++++++++++++++--------- 4 files changed, 78 insertions(+), 102 deletions(-) diff --git a/geojson.cpp b/geojson.cpp index 355379d..cf32e50 100644 --- a/geojson.cpp +++ b/geojson.cpp @@ -326,6 +326,8 @@ int serialize_geometry(struct serialization_state *sst, json_object *geometry, j } int serialize_feature(struct serialization_state *sst, serial_feature &sf, bool want_dist, bool filters, int maxzoom, bool uses_gamma) { + struct reader *r = &(sst->readers[sst->segment]); + if (want_dist) { std::vector locs; for (size_t i = 0; i < sf.geometry.size(); i++) { @@ -426,39 +428,39 @@ int serialize_feature(struct serialization_state *sst, serial_feature &sf, bool if (inline_meta) { sf.metapos = -1; for (size_t i = 0; i < sf.full_keys.size(); i++) { - sf.keys.push_back(addpool(sst->poolfile, sst->treefile, sf.full_keys[i].c_str(), mvt_string)); - sf.values.push_back(addpool(sst->poolfile, sst->treefile, sf.full_values[i].s.c_str(), sf.full_values[i].type)); + sf.keys.push_back(addpool(r->poolfile, r->treefile, sf.full_keys[i].c_str(), mvt_string)); + sf.values.push_back(addpool(r->poolfile, r->treefile, sf.full_values[i].s.c_str(), sf.full_values[i].type)); } } else { - sf.metapos = *(sst->metapos); + sf.metapos = r->metapos; for (size_t i = 0; i < sf.full_keys.size(); i++) { - serialize_long_long(sst->metafile, addpool(sst->poolfile, sst->treefile, sf.full_keys[i].c_str(), mvt_string), sst->metapos, sst->fname); - serialize_long_long(sst->metafile, addpool(sst->poolfile, sst->treefile, sf.full_values[i].s.c_str(), sf.full_values[i].type), sst->metapos, sst->fname); + serialize_long_long(r->metafile, addpool(r->poolfile, r->treefile, sf.full_keys[i].c_str(), mvt_string), &r->metapos, sst->fname); + serialize_long_long(r->metafile, addpool(r->poolfile, r->treefile, sf.full_values[i].s.c_str(), sf.full_values[i].type), &r->metapos, sst->fname); } } - long long geomstart = *(sst->geompos); - serialize_feature(sst->geomfile, &sf, sst->geompos, sst->fname, *(sst->initial_x) >> geometry_scale, *(sst->initial_y) >> geometry_scale, false); + long long geomstart = r->geompos; + serialize_feature(r->geomfile, &sf, &r->geompos, sst->fname, *(sst->initial_x) >> geometry_scale, *(sst->initial_y) >> geometry_scale, false); struct index index; index.start = geomstart; - index.end = *(sst->geompos); + index.end = r->geompos; index.segment = sst->segment; index.seq = *(sst->layer_seq); index.t = sf.t; index.index = bbox_index; - fwrite_check(&index, sizeof(struct index), 1, sst->indexfile, sst->fname); - *(sst->indexpos) += sizeof(struct index); + fwrite_check(&index, sizeof(struct index), 1, r->indexfile, sst->fname); + r->indexpos += sizeof(struct index); for (size_t i = 0; i < 2; i++) { - if (sf.bbox[i] < sst->file_bbox[i]) { - sst->file_bbox[i] = sf.bbox[i]; + if (sf.bbox[i] < r->file_bbox[i]) { + r->file_bbox[i] = sf.bbox[i]; } } for (size_t i = 2; i < 4; i++) { - if (sf.bbox[i] > sst->file_bbox[i]) { - sst->file_bbox[i] = sf.bbox[i]; + if (sf.bbox[i] > r->file_bbox[i]) { + r->file_bbox[i] = sf.bbox[i]; } } @@ -616,7 +618,7 @@ void parse_json(struct serialization_state *sst, json_pull *jp, std::setjp, pja->reading, pja->layer_seq, pja->progress_seq, pja->metapos, pja->geompos, pja->indexpos, pja->exclude, pja->include, pja->exclude_all, pja->metafile, pja->geomfile, pja->indexfile, pja->poolfile, pja->treefile, pja->fname, pja->basezoom, pja->layer, pja->droprate, pja->file_bbox, pja->segment, pja->initialized, pja->initial_x, pja->initial_y, pja->readers, pja->maxzoom, pja->layermap, *pja->layername, pja->uses_gamma, pja->attribute_types, pja->dist_sum, pja->dist_count, pja->want_dist, pja->filters); + parse_json(pja->sst, pja->jp, pja->exclude, pja->include, pja->exclude_all, pja->basezoom, pja->layer, pja->droprate, pja->maxzoom, pja->layermap, *pja->layername, pja->uses_gamma, pja->attribute_types, pja->want_dist, pja->filters); return NULL; } diff --git a/geojson.hpp b/geojson.hpp index 7107b1d..c7e06db 100644 --- a/geojson.hpp +++ b/geojson.hpp @@ -7,48 +7,31 @@ #include #include "mbtiles.hpp" #include "jsonpull/jsonpull.h" +#include "serial.hpp" struct parse_json_args { json_pull *jp; - const char *reading; - volatile long long *layer_seq; - volatile long long *progress_seq; - long long *metapos; - long long *geompos; - long long *indexpos; std::set *exclude; std::set *include; int exclude_all; - FILE *metafile; - FILE *geomfile; - FILE *indexfile; - struct memfile *poolfile; - struct memfile *treefile; - char *fname; int basezoom; int layer; double droprate; - long long *file_bbox; - int segment; - int *initialized; - unsigned *initial_x; - unsigned *initial_y; - struct reader *readers; int maxzoom; std::map *layermap; std::string *layername; bool uses_gamma; std::map const *attribute_types; - double *dist_sum; - size_t *dist_count; bool want_dist; bool filters; + + struct serialization_state *sst; }; struct json_pull *json_begin_map(char *map, long long len); void json_end_map(struct json_pull *jp); -void parse_json(json_pull *jp, const char *reading, volatile long long *layer_seq, volatile long long *progress_seq, long long *metapos, long long *geompos, long long *indexpos, std::set *exclude, std::set *include, int exclude_all, FILE *metafile, FILE *geomfile, FILE *indexfile, struct memfile *poolfile, struct memfile *treefile, char *fname, int basezoom, int layer, double droprate, long long *file_bbox, int segment, int *initialized, unsigned *initial_x, unsigned *initial_y, struct reader *readers, int maxzoom, std::map *layermap, std::string layername, bool uses_gamma, std::map const *attribute_types, double *dist_sum, size_t *dist_count, bool want_dist, bool filters); +void parse_json(struct serialization_state *sst, json_pull *jp, std::set *exclude, std::set *include, int exclude_all, int basezoom, int layer, double droprate, int maxzoom, std::map *layermap, std::string layername, bool uses_gamma, std::map const *attribute_types, bool want_dist, bool filters); void *run_parse_json(void *v); #endif diff --git a/main.cpp b/main.cpp index 95fa40e..b8be5df 100644 --- a/main.cpp +++ b/main.cpp @@ -79,31 +79,6 @@ size_t TEMP_FILES; long long MAX_FILES; static long long diskfree; -struct reader { - int metafd; - int poolfd; - int treefd; - int geomfd; - int indexfd; - - FILE *metafile; - struct memfile *poolfile; - struct memfile *treefile; - FILE *geomfile; - FILE *indexfile; - - long long metapos; - long long geompos; - long long indexpos; - - long long file_bbox[4]; - - struct stat geomst; - struct stat metast; - - char *geom_map; -}; - void checkdisk(struct reader *r, int nreader) { long long used = 0; int i; @@ -399,6 +374,7 @@ void do_read_parallel(char *map, long long len, long long initial_offset, const } struct parse_json_args pja[CPUS]; + struct serialization_state sst[CPUS]; pthread_t pthreads[CPUS]; std::vector > file_subkeys; @@ -407,41 +383,35 @@ void do_read_parallel(char *map, long long len, long long initial_offset, const } for (size_t i = 0; i < CPUS; i++) { + sst[i].fname = reading; + sst[i].line = 0; + sst[i].layer_seq = &layer_seq[i]; + sst[i].progress_seq = progress_seq; + sst[i].readers = reader; + sst[i].segment = i; + sst[i].initialized = &initialized[i]; + sst[i].initial_x = &initial_x[i]; + sst[i].initial_y = &initial_y[i]; + sst[i].dist_sum = &(dist_sums[i]); + sst[i].dist_count = &(dist_counts[i]); + pja[i].jp = json_begin_map(map + segs[i], segs[i + 1] - segs[i]); - pja[i].reading = reading; - pja[i].layer_seq = &layer_seq[i]; - pja[i].progress_seq = progress_seq; - pja[i].metapos = &reader[i].metapos; - pja[i].geompos = &reader[i].geompos; - pja[i].indexpos = &reader[i].indexpos; pja[i].exclude = exclude; pja[i].include = include; pja[i].exclude_all = exclude_all; - pja[i].metafile = reader[i].metafile; - pja[i].geomfile = reader[i].geomfile; - pja[i].indexfile = reader[i].indexfile; - pja[i].poolfile = reader[i].poolfile; - pja[i].treefile = reader[i].treefile; - pja[i].fname = fname; pja[i].basezoom = basezoom; pja[i].layer = source; pja[i].droprate = droprate; - pja[i].file_bbox = reader[i].file_bbox; - pja[i].segment = i; - pja[i].initialized = &initialized[i]; - pja[i].initial_x = &initial_x[i]; - pja[i].initial_y = &initial_y[i]; - pja[i].readers = reader; pja[i].maxzoom = maxzoom; pja[i].layermap = &(*layermaps)[i]; pja[i].layername = &layername; pja[i].uses_gamma = uses_gamma; pja[i].attribute_types = attribute_types; - pja[i].dist_sum = &(dist_sums[i]); - pja[i].dist_count = &(dist_counts[i]); pja[i].want_dist = want_dist; pja[i].filters = filters; + pja[i].sst = &sst[i]; + if (pthread_create(&pthreads[i], NULL, run_parse_json, &pja[i]) != 0) { perror("pthread_create"); exit(EXIT_FAILURE); @@ -1386,20 +1356,29 @@ int read_input(std::vector &sources, char *fname, int maxzoom, int minzo overall_offset += ahead; checkdisk(reader, CPUS); } - } else if (c == '{' || c == 0xEF || c == ' ') { // XXX + } else { // Plain serial reading long long layer_seq = overall_offset; json_pull *jp = json_begin_file(fp); - parse_json(jp, reading.c_str(), &layer_seq, &progress_seq, &reader[0].metapos, &reader[0].geompos, &reader[0].indexpos, exclude, include, exclude_all, reader[0].metafile, reader[0].geomfile, reader[0].indexfile, reader[0].poolfile, reader[0].treefile, fname, basezoom, layer, droprate, reader[0].file_bbox, 0, &initialized[0], &initial_x[0], &initial_y[0], reader, maxzoom, &layermaps[0], sources[layer].layer, uses_gamma, attribute_types, &dist_sum, &dist_count, guess_maxzoom, prefilter != NULL || postfilter != NULL); + struct serialization_state sst; + + sst.fname = reading.c_str(); + sst.line = 0; + sst.layer_seq = &layer_seq; + sst.progress_seq = &progress_seq; + sst.readers = reader; + sst.segment = 0; + sst.initial_x = &initial_x[0]; + sst.initial_y = &initial_y[0]; + sst.initialized = &initialized[0]; + sst.dist_sum = &dist_sum; + sst.dist_count = &dist_count; + + parse_json(&sst, jp, exclude, include, exclude_all, basezoom, layer, droprate, maxzoom, &layermaps[0], sources[layer].layer, uses_gamma, attribute_types, guess_maxzoom, prefilter != NULL || postfilter != NULL); json_end(jp); overall_offset = layer_seq; checkdisk(reader, CPUS); - } else { - long long layer_seq = overall_offset; - parse_geobuf(fp, reading.c_str(), &layer_seq, &progress_seq, &reader[0].metapos, &reader[0].geompos, &reader[0].indexpos, exclude, include, exclude_all, reader[0].metafile, reader[0].geomfile, reader[0].indexfile, reader[0].poolfile, reader[0].treefile, fname, basezoom, layer, droprate, reader[0].file_bbox, 0, &initialized[0], &initial_x[0], &initial_y[0], reader, maxzoom, &layermaps[0], sources[layer].layer, uses_gamma, attribute_types, &dist_sum, &dist_count, guess_maxzoom); - overall_offset = layer_seq; - checkdisk(reader, CPUS); } if (fclose(fp) != 0) { diff --git a/serial.hpp b/serial.hpp index 83f95ce..3f0cf30 100644 --- a/serial.hpp +++ b/serial.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include "geometry.hpp" size_t fwrite_check(const void *ptr, size_t size, size_t nitems, FILE *stream, const char *fname); @@ -67,6 +68,31 @@ struct serial_feature { void serialize_feature(FILE *geomfile, serial_feature *sf, long long *geompos, const char *fname, long long wx, long long wy, bool include_minzoom); serial_feature deserialize_feature(FILE *geoms, long long *geompos_in, char *metabase, long long *meta_off, unsigned z, unsigned tx, unsigned ty, unsigned *initial_x, unsigned *initial_y); +struct reader { + int metafd; + int poolfd; + int treefd; + int geomfd; + int indexfd; + + FILE *metafile; + struct memfile *poolfile; + struct memfile *treefile; + FILE *geomfile; + FILE *indexfile; + + long long metapos; + long long geompos; + long long indexpos; + + long long file_bbox[4]; + + struct stat geomst; + struct stat metast; + + char *geom_map; +}; + struct serialization_state { const char *fname; // source file name int line; // user-oriented location within source for error reports @@ -77,20 +103,6 @@ struct serialization_state { struct reader *readers; // array of data for each input thread int segment; // the current input thread - FILE *geomfile; // main feature serialization; references metafile or poolfile - long long *geompos; - - FILE *metafile; // feature metadata; references poolfile - long long *metapos; - - FILE *indexfile; // quadkey index into geomfile - long long *indexpos; - - struct memfile *poolfile; // string pool for keys and values - struct memfile *treefile; // index into poolfile - - long long file_bbox[4]; // global bounding box - unsigned *initial_x; // relative offset of all geometries unsigned *initial_y; int *initialized;