work towards making rhizome direct http pull work. #9

This commit is contained in:
gardners 2012-10-02 23:26:21 +09:30
parent 7c544b25f2
commit a81dafa180
4 changed files with 124 additions and 8 deletions

View File

@ -511,3 +511,8 @@ extern unsigned char favicon_bytes[];
extern int favicon_len;
int rhizome_import_from_files(const char *manifestpath,const char *filepath);
int rhizome_fetch_request_manifest_by_prefix(struct sockaddr_in *peerip,
unsigned char *prefix,
int prefix_length,
int importP);
extern int rhizome_file_fetch_queue_count;

View File

@ -508,9 +508,9 @@ int rhizome_direct_parse_http_request(rhizome_http_request *r)
r->request_length = 0;
r->source_flags = 0;
/* Find the end of the headers and start of any body bytes that we have read so far.
Copy the bytes to a separate buffer, because r->request and r->request_length get used
internally in the parser.
/* Find the end of the headers and start of any body bytes that we have read
so far. Copy the bytes to a separate buffer, because r->request and
r->request_length get used internally in the parser.
*/
if (contentlen) {
char buffer[contentlen];
@ -736,10 +736,25 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
if (type==2&&r->pullP) {
WARN("XXX Rhizome direct http pull yet implemented");
/* Need to fetch manifest. Once we have the manifest, then we can
use our normal bundle fetch routines from rhizome_fetch.c
use our normal bundle fetch routines from rhizome_fetch.c
Generate a request like: GET /rhizome/manifestbybar/<hex of bar>
and add it to our list of HTTP fetch requests, then watch
until the request is finished. That will give us the manifest.
Then as noted above, we can use that to pull the file down using
existing routines.
*/
if (!rhizome_fetch_request_manifest_by_prefix
(&addr,(unsigned char *)&p[i+1],RHIZOME_BAR_PREFIX_BYTES,
1 /* import, getting file if needed */))
{
/* Fetching the manifest, and then using it to see if we want to
fetch the file for import is all handled asynchronously, so just
wait for it to finish. */
while(rhizome_file_fetch_queue_count) fd_poll();
}
} else if (type==1&&r->pushP) {
WARN("XXX rhizome direct http push not implemented");
/* Form up the POST request to submit the appropriate bundle. */
/* Start by getting the manifest, which is the main thing we need, and also

View File

@ -19,6 +19,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include <time.h>
#include <arpa/inet.h>
#include <assert.h>
#include "serval.h"
#include "rhizome.h"
@ -782,9 +783,15 @@ void rhizome_write_content(rhizome_file_fetch_record *q, char *buffer, int bytes
DEBUGF("Received all of file via rhizome -- now to import it");
fclose(q->file);
q->file = NULL;
rhizome_import_received_bundle(q->manifest);
rhizome_manifest_free(q->manifest);
q->manifest = NULL;
if (q->manifest) {
rhizome_import_received_bundle(q->manifest);
rhizome_manifest_free(q->manifest);
q->manifest = NULL;
} else {
/* This was to fetch the manifest, so now fetch the file if needed */
DEBUGF("Received a manifest in response to supplying a manifest prefix.");
DEBUGF("XXX Not implemented scheduling the fetching of the file itself.");
}
rhizome_fetch_close(q);
return;
}
@ -937,3 +944,91 @@ void rhizome_fetch_poll(struct sched_ent *alarm)
}
return;
}
int rhizome_fetch_request_manifest_by_prefix(struct sockaddr_in *peerip,
unsigned char *prefix,
int prefix_length,
int importP)
{
assert(peerip);
/* Transfer via HTTP over IPv4 */
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == -1)
return WHY_perror("socket");
if (set_nonblock(sock) == -1) {
close(sock);
return -1;
}
struct sockaddr_in addr = *peerip;
addr.sin_family = AF_INET;
INFOF("RHIZOME HTTP REQUEST, CONNECT family=%u port=%u addr=%u.%u.%u.%u",
addr.sin_family, ntohs(addr.sin_port),
((unsigned char*)&addr.sin_addr.s_addr)[0],
((unsigned char*)&addr.sin_addr.s_addr)[1],
((unsigned char*)&addr.sin_addr.s_addr)[2],
((unsigned char*)&addr.sin_addr.s_addr)[3]
);
if (connect(sock, (struct sockaddr*)&addr, sizeof addr) == -1) {
if (errno == EINPROGRESS) {
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("connect() returned EINPROGRESS");
} else {
WHY_perror("connect");
WHY("Failed to open socket to peer's rhizome web server");
close(sock);
return -1;
}
}
rhizome_file_fetch_record *q=&file_fetch_queue[rhizome_file_fetch_queue_count];
q->manifest = NULL;
q->alarm.poll.fd=sock;
bzero(q->fileid, sizeof(q->fileid));
q->request_len = snprintf(q->request, sizeof q->request, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(prefix,prefix_length));
q->request_ofs=0;
q->state=RHIZOME_FETCH_CONNECTING;
q->file_len=-1;
q->file_ofs=0;
/* XXX Don't forget to implement resume */
/* XXX We should stream file straight into the database */
const char *id = rhizome_manifest_get(q->manifest, "id", NULL, 0);
if (id == NULL) {
close(sock);
return WHY("Manifest missing ID");
}
if (create_rhizome_import_dir() == -1)
return -1;
char filename[1024];
if (!FORM_RHIZOME_IMPORT_PATH(filename, "file.%s", id)) {
close(sock);
return -1;
}
q->manifest->dataFileName = strdup(filename);
if ((q->file = fopen(q->manifest->dataFileName, "w")) == NULL) {
WHY_perror("fopen");
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Could not open '%s' to write received file", q->manifest->dataFileName);
close(sock);
return -1;
}
INFOF("RHIZOME HTTP REQUEST, GET \"/rhizome/file/%s\"", q->fileid);
/* Watch for activity on the socket */
q->alarm.function=rhizome_fetch_poll;
fetch_stats.name="rhizome_fetch_poll";
q->alarm.stats=&fetch_stats;
q->alarm.poll.events=POLLIN|POLLOUT;
watch(&q->alarm);
/* And schedule a timeout alarm */
q->alarm.alarm=gettime_ms() + RHIZOME_IDLE_TIMEOUT;
q->alarm.deadline = q->alarm.alarm + RHIZOME_IDLE_TIMEOUT;
schedule(&q->alarm);
rhizome_file_fetch_queue_count++;
if (debug & DEBUG_RHIZOME_RX)
DEBUGF("Queued file for fetching into %s (%d in queue)",
q->manifest->dataFileName, rhizome_file_fetch_queue_count);
return 0;
}

View File

@ -21,6 +21,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include "rhizome.h"
#include <assert.h>
#include <stdlib.h>
#include <math.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>