Launch a separate thread to read each segment

This commit is contained in:
Eric Fischer 2015-12-22 14:02:31 -08:00
parent bd081c864e
commit 40ec317c36

123
geojson.c
View File

@ -18,6 +18,7 @@
#include <sqlite3.h>
#include <stdarg.h>
#include <sys/resource.h>
#include <pthread.h>
#include "jsonpull.h"
#include "tile.h"
@ -745,6 +746,38 @@ void parse_json(json_pull *jp, const char *reading, long long *seq, long long *m
}
}
// Bundles every argument of parse_json() into a single object so that the
// call can be made through the one void * parameter of a pthread start
// routine (see run_parse_json). read_json fills in one of these per thread.
struct parse_json_args {
// JSON parser handle for this thread's input; read_json creates one per
// segment with json_begin_map().
json_pull *jp;
const char *reading;
// read_json stores the address of the SAME seq variable in every thread's
// args, so all threads share this counter.
// NOTE(review): confirm parse_json accesses *seq safely when run concurrently.
long long *seq;
// read_json points these at the metapos/geompos/indexpos members of this
// thread's own struct reader.
long long *metapos;
long long *geompos;
long long *indexpos;
// Forwarded unchanged from read_json's parameters of the same names.
struct pool *exclude;
struct pool *include;
int exclude_all;
// Output handles taken from this thread's own struct reader.
FILE *metafile;
FILE *geomfile;
FILE *indexfile;
struct memfile *poolfile;
struct memfile *treefile;
// Forwarded unchanged from read_json's parameters of the same names.
char *fname;
int maxzoom;
int basezoom;
// read_json stores its `source` value here.
int layer;
double droprate;
// Points at the file_bbox array of this thread's own struct reader.
long long *file_bbox;
// Index of the input segment (and of the struct reader) this thread handles.
int segment;
};
void *run_parse_json(void *v) {
struct parse_json_args *pja = v;
parse_json(pja->jp, pja->reading, pja->seq, pja->metapos, pja->geompos, pja->indexpos, pja->exclude, pja->include, pja->exclude_all, pja->metafile, pja->geomfile, pja->indexfile, pja->poolfile, pja->treefile, pja->fname, pja->maxzoom, pja->basezoom, pja->layer, pja->droprate, pja->file_bbox, pja->segment);
return NULL;
}
struct jsonmap {
char *map;
long long off;
@ -808,10 +841,9 @@ struct reader {
int read_json(int argc, char **argv, char *fname, const char *layername, int maxzoom, int minzoom, int basezoom, double basezoom_marker_width, sqlite3 *outdb, struct pool *exclude, struct pool *include, int exclude_all, double droprate, int buffer, const char *tmpdir, double gamma, char *prevent, char *additional) {
int ret = EXIT_SUCCESS;
#define THREADS 10
struct reader reader[THREADS];
struct reader reader[CPUS];
int i;
for (i = 0; i < THREADS; i++) {
for (i = 0; i < CPUS; i++) {
struct reader *r = reader + i;
r->metaname = malloc(strlen(tmpdir) + strlen("/meta.XXXXXXXX") + 1);
@ -944,13 +976,13 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
}
if (map != NULL && map != MAP_FAILED) {
long long segs[THREADS + 1];
long long segs[CPUS + 1];
segs[0] = 0;
segs[THREADS] = st.st_size - off;
segs[CPUS] = st.st_size - off;
int i;
for (i = 1; i < THREADS; i++) {
segs[i] = off + (st.st_size - off) * i / THREADS;
for (i = 1; i < CPUS; i++) {
segs[i] = off + (st.st_size - off) * i / CPUS;
while (segs[i] < st.st_size && map[segs[i]] != '\n') {
segs[i]++;
@ -959,11 +991,47 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
printf("%d %lld\n", i, segs[i]);
}
for (i = 0; i < THREADS; i++) {
json_pull *jp = json_begin_map(map + segs[i], segs[i + 1] - segs[i]);
parse_json(jp, reading, &seq, &reader[i].metapos, &reader[i].geompos, &reader[i].indexpos, exclude, include, exclude_all, reader[i].metafile, reader[i].geomfile, reader[i].indexfile, reader[i].poolfile, reader[i].treefile, fname, maxzoom, basezoom, source, droprate, reader[i].file_bbox, i);
free(jp->source);
json_end(jp);
struct parse_json_args pja[CPUS];
pthread_t pthreads[CPUS];
for (i = 0; i < CPUS; i++) {
pja[i].jp = json_begin_map(map + segs[i], segs[i + 1] - segs[i]);
pja[i].reading = reading;
pja[i].seq = &seq;
pja[i].metapos = &reader[i].metapos;
pja[i].geompos = &reader[i].geompos;
pja[i].indexpos = &reader[i].indexpos;
pja[i].exclude = exclude;
pja[i].include = include;
pja[i].exclude_all = exclude_all;
pja[i].metafile = reader[i].metafile;
pja[i].geomfile = reader[i].geomfile;
pja[i].indexfile = reader[i].indexfile;
pja[i].poolfile = reader[i].poolfile;
pja[i].treefile = reader[i].treefile;
pja[i].fname = fname;
pja[i].maxzoom = maxzoom;
pja[i].basezoom = basezoom;
pja[i].layer = source;
pja[i].droprate = droprate;
pja[i].file_bbox = reader[i].file_bbox;
pja[i].segment = i;
if (pthread_create(&pthreads[i], NULL, run_parse_json, &pja[i]) != 0) {
perror("pthread_create");
exit(EXIT_FAILURE);
}
}
for (i = 0; i < CPUS; i++) {
void *retval;
if (pthread_join(pthreads[i], &retval) != 0) {
perror("pthread_join");
}
free(pja[i].jp->source);
json_end(pja[i].jp);
}
} else {
FILE *fp = fdopen(fd, "r");
@ -980,7 +1048,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
}
}
for (i = 0; i < THREADS; i++) {
for (i = 0; i < CPUS; i++) {
fclose(reader[i].metafile);
fclose(reader[i].geomfile);
fclose(reader[i].indexfile);
@ -1070,7 +1138,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
unlink(indexname);
long long indexpos = 0;
for (i = 0; i < THREADS; i++) {
for (i = 0; i < CPUS; i++) {
if (reader[i].indexpos > 0) {
void *map = mmap(NULL, reader[i].indexpos, PROT_READ, MAP_PRIVATE, reader[i].indexfd, 0);
if (map == MAP_FAILED) {
@ -1371,12 +1439,17 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
}
unlink(indexname);
for (i = 0; i < THREADS; i++) {
reader[i].geom_map = mmap(NULL, reader[i].geomst.st_size, PROT_READ, MAP_PRIVATE, reader[i].geomfd, 0);
if (reader[i].geom_map == MAP_FAILED) {
perror("mmap unsorted geometry");
exit(EXIT_FAILURE);
for (i = 0; i < CPUS; i++) {
reader[i].geom_map = NULL;
if (reader[i].geomst.st_size > 0) {
reader[i].geom_map = mmap(NULL, reader[i].geomst.st_size, PROT_READ, MAP_PRIVATE, reader[i].geomfd, 0);
if (reader[i].geom_map == MAP_FAILED) {
perror("mmap unsorted geometry");
exit(EXIT_FAILURE);
}
}
if (close(reader[i].geomfd) != 0) {
perror("close unsorted geometry");
}
@ -1434,7 +1507,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
perror("unmap sorted index");
}
for (i = 0; i < THREADS; i++) {
for (i = 0; i < CPUS; i++) {
if (munmap(reader[i].geom_map, reader[i].geomst.st_size) != 0) {
perror("unmap unsorted geometry");
}
@ -1469,7 +1542,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
}
if (!quiet) {
for (i = 0; i < THREADS; i++) {
for (i = 0; i < CPUS; i++) {
fprintf(stderr, "%lld features, %lld bytes of geometry, %lld bytes of metadata, %lld bytes of string pool\n", seq, (long long) reader[i].geomst.st_size, (long long) reader[i].metast.st_size, reader[i].poolfile->off);
}
}
@ -1478,8 +1551,8 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
// but keep track of the offsets into it since we still need
// segment+offset to find the data.
long long pool_off[THREADS];
long long meta_off[THREADS];
long long pool_off[CPUS];
long long meta_off[CPUS];
char poolname[strlen(tmpdir) + strlen("/pool.XXXXXXXX") + 1];
sprintf(poolname, "%s%s", tmpdir, "/pool.XXXXXXXX");
@ -1518,7 +1591,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
long long metapos = 0;
long long poolpos = 0;
for (i = 0; i < THREADS; i++) {
for (i = 0; i < CPUS; i++) {
if (reader[i].metapos > 0) {
void *map = mmap(NULL, reader[i].metapos, PROT_READ, MAP_PRIVATE, reader[i].metafd, 0);
if (map == MAP_FAILED) {
@ -1595,7 +1668,7 @@ int read_json(int argc, char **argv, char *fname, const char *layername, int max
midlon = (maxlon + minlon) / 2;
long long file_bbox[4] = { UINT_MAX, UINT_MAX, 0, 0 };
for (i = 0; i < THREADS; i++) {
for (i = 0; i < CPUS; i++) {
if (reader[i].file_bbox[0] < file_bbox[0]) {
file_bbox[0] = reader[i].file_bbox[0];
}