New enum rhizome_payload_status

Refactor a lot of Rhizome bundle storage code to use the new "enum
rhizome_payload_status" instead of mysterious int values to represent
the outcome of the operation.
This commit is contained in:
Andrew Bettison 2013-12-28 05:26:22 +10:30
parent b37e27f5da
commit 72040517e1
13 changed files with 564 additions and 366 deletions

View File

@ -1511,17 +1511,34 @@ int app_rhizome_add_file(const struct cli_parsed *parsed, struct cli_context *co
}
enum rhizome_bundle_status status = RHIZOME_BUNDLE_STATUS_NEW;
enum rhizome_payload_status pstatus;
if (journal){
if (rhizome_append_journal_file(m, 0, filepath))
status = -1;
pstatus = rhizome_append_journal_file(m, 0, filepath);
} else {
int n = rhizome_stat_payload_file(m, filepath);
if (n == 0 && m->filesize)
n = rhizome_store_payload_file(m, filepath);
if (n == -1)
status = -1;
else if (n)
pstatus = rhizome_stat_payload_file(m, filepath);
assert(m->filesize != RHIZOME_SIZE_UNSET);
if (pstatus == RHIZOME_PAYLOAD_STATUS_NEW) {
assert(m->filesize > 0);
pstatus = rhizome_store_payload_file(m, filepath);
}
}
switch (pstatus) {
case RHIZOME_PAYLOAD_STATUS_EMPTY:
case RHIZOME_PAYLOAD_STATUS_STORED:
case RHIZOME_PAYLOAD_STATUS_NEW:
break;
case RHIZOME_PAYLOAD_STATUS_ERROR:
status = RHIZOME_BUNDLE_STATUS_ERROR;
break;
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
status = RHIZOME_BUNDLE_STATUS_INCONSISTENT;
break;
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
status = RHIZOME_BUNDLE_STATUS_FAKE;
break;
default:
FATALF("pstatus = %d", pstatus);
}
rhizome_manifest *mout = m;
if (status == RHIZOME_BUNDLE_STATUS_NEW) {
@ -1553,6 +1570,7 @@ int app_rhizome_add_file(const struct cli_parsed *parsed, struct cli_context *co
case RHIZOME_BUNDLE_STATUS_INCONSISTENT:
case RHIZOME_BUNDLE_STATUS_ERROR:
case RHIZOME_BUNDLE_STATUS_INVALID:
case RHIZOME_BUNDLE_STATUS_FAKE:
break;
default:
FATALF("status=%d", status);
@ -1793,20 +1811,19 @@ int app_rhizome_extract(const struct cli_parsed *parsed, struct cli_context *con
cli_put_manifest(context, m);
}
int retfile=0;
enum rhizome_payload_status pstatus = RHIZOME_PAYLOAD_STATUS_EMPTY;
if (ret==0 && m->filesize != 0 && filepath && *filepath){
if (extract){
// Save the file, implicitly decrypting if required.
retfile = rhizome_extract_file(m, filepath);
if (retfile)
WHYF("rhizome_extract_file() returned %d", retfile);
pstatus = rhizome_extract_file(m, filepath);
if (pstatus != RHIZOME_PAYLOAD_STATUS_EMPTY && pstatus != RHIZOME_PAYLOAD_STATUS_STORED)
WHYF("rhizome_extract_file() returned %d", pstatus);
}else{
// Save the file without attempting to decrypt
uint64_t length;
retfile = rhizome_dump_file(&m->filehash, filepath, &length);
if (retfile)
WHYF("rhizome_dump_file() returned %d", retfile);
pstatus = rhizome_dump_file(&m->filehash, filepath, &length);
if (pstatus != RHIZOME_PAYLOAD_STATUS_EMPTY && pstatus != RHIZOME_PAYLOAD_STATUS_STORED)
WHYF("rhizome_dump_file() returned %d", pstatus);
}
}
@ -1819,14 +1836,28 @@ int app_rhizome_extract(const struct cli_parsed *parsed, struct cli_context *con
} else {
int append = (strcmp(manifestpath, filepath)==0)?1:0;
// don't write out the manifest if we were asked to append it and writing the file failed.
if ((!append) || retfile==0){
if (!append || (pstatus == RHIZOME_PAYLOAD_STATUS_EMPTY || pstatus == RHIZOME_PAYLOAD_STATUS_STORED)) {
if (rhizome_write_manifest_file(m, manifestpath, append) == -1)
ret = -1;
}
}
}
if (retfile)
ret = retfile == -1 ? -1 : 1;
switch (pstatus) {
case RHIZOME_PAYLOAD_STATUS_EMPTY:
case RHIZOME_PAYLOAD_STATUS_STORED:
break;
case RHIZOME_PAYLOAD_STATUS_NEW:
ret = 1; // payload not found
break;
case RHIZOME_PAYLOAD_STATUS_ERROR:
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
ret = -1;
break;
default:
FATALF("pstatus = %d", pstatus);
}
if (m)
rhizome_manifest_free(m);
keyring_free(keyring);
@ -1851,9 +1882,21 @@ int app_rhizome_export_file(const struct cli_parsed *parsed, struct cli_context
if (!rhizome_exists(&hash))
return 1;
uint64_t length;
int ret = rhizome_dump_file(&hash, filepath, &length);
if (ret)
return ret == -1 ? -1 : 1;
enum rhizome_payload_status pstatus = rhizome_dump_file(&hash, filepath, &length);
switch (pstatus) {
case RHIZOME_PAYLOAD_STATUS_EMPTY:
case RHIZOME_PAYLOAD_STATUS_STORED:
break;
case RHIZOME_PAYLOAD_STATUS_NEW:
return 1; // payload not found
case RHIZOME_PAYLOAD_STATUS_ERROR:
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
return -1;
default:
FATALF("pstatus = %d", pstatus);
}
cli_field_name(context, "filehash", ":");
cli_put_string(context, alloca_tohex_rhizome_filehash_t(hash), "\n");
cli_field_name(context, "filesize", ":");

View File

@ -2083,9 +2083,7 @@ struct nm_record nm_cache[NM_CACHE_SLOTS];
unsigned char *keyring_get_nm_bytes(const sid_t *known_sidp, const sid_t *unknown_sidp)
{
IN();
if (!known_sidp) { RETURNNULL(WHYNULL("known pub key is null")); }
if (!unknown_sidp) { RETURNNULL(WHYNULL("unknown pub key is null")); }
if (!keyring) { RETURNNULL(WHYNULL("keyring is null")); }
assert(keyring != NULL);
/* See if we have it cached already */
unsigned i;

126
meshms.c
View File

@ -262,17 +262,17 @@ static int ply_read_open(struct ply_read *ply, const rhizome_bid_t *bid, rhizome
DEBUGF("Opening ply %s", alloca_tohex_rhizome_bid_t(*bid));
if (rhizome_retrieve_manifest(bid, m))
return -1;
int ret = rhizome_open_decrypt_read(m, &ply->read);
if (ret == 1)
enum rhizome_payload_status pstatus = rhizome_open_decrypt_read(m, &ply->read);
if (pstatus == RHIZOME_PAYLOAD_STATUS_NEW)
WARNF("Payload was not found for manifest %s, %"PRIu64, alloca_tohex_rhizome_bid_t(m->cryptoSignPublic), m->version);
if (ret != 0)
return ret;
if (pstatus != RHIZOME_PAYLOAD_STATUS_STORED && pstatus != RHIZOME_PAYLOAD_STATUS_EMPTY)
return -1;
assert(m->filesize != RHIZOME_SIZE_UNSET);
ply->read.offset = ply->read.length = m->filesize;
return 0;
}
static int ply_read_close(struct ply_read *ply)
static void ply_read_close(struct ply_read *ply)
{
if (ply->buffer){
free(ply->buffer);
@ -280,7 +280,7 @@ static int ply_read_close(struct ply_read *ply)
}
ply->buffer_size=0;
ply->buff.len=0;
return rhizome_read_close(&ply->read);
rhizome_read_close(&ply->read);
}
// read the next record from the ply (backwards)
@ -367,14 +367,36 @@ static int append_meshms_buffer(const sid_t *my_sid, struct conversations *conv,
assert(m->haveSecret);
assert(m->authorship == AUTHOR_AUTHENTIC);
if (rhizome_append_journal_buffer(m, 0, buffer, len))
enum rhizome_payload_status pstatus = rhizome_append_journal_buffer(m, 0, buffer, len);
if (pstatus != RHIZOME_PAYLOAD_STATUS_NEW)
goto end;
if (rhizome_manifest_finalise(m, &mout, 1) == -1)
goto end;
ret=0;
enum rhizome_bundle_status status = rhizome_manifest_finalise(m, &mout, 1);
switch (status) {
case RHIZOME_BUNDLE_STATUS_ERROR:
// error is already logged
break;
case RHIZOME_BUNDLE_STATUS_NEW:
ret = 0;
break;
case RHIZOME_BUNDLE_STATUS_SAME:
case RHIZOME_BUNDLE_STATUS_DUPLICATE:
case RHIZOME_BUNDLE_STATUS_OLD:
WHYF("MeshMS ply manifest (version=%"PRIu64") gazumped by Rhizome store (version=%"PRIu64")",
m->version, mout->version);
break;
case RHIZOME_BUNDLE_STATUS_INCONSISTENT:
WHYF("MeshMS ply manifest not consistent with payload");
break;
case RHIZOME_BUNDLE_STATUS_FAKE:
WHYF("MeshMS ply manifest is not signed");
break;
case RHIZOME_BUNDLE_STATUS_INVALID:
WHYF("MeshMS ply manifest is invalid");
break;
default:
FATALF("status=%d", status);
}
end:
if (mout && mout!=m)
rhizome_manifest_free(mout);
@ -385,7 +407,8 @@ end:
// update if any conversations are unread or need to be acked.
// return -1 for failure, 1 if the conversation index needs to be saved.
static int update_conversation(const sid_t *my_sid, struct conversations *conv){
static int update_conversation(const sid_t *my_sid, struct conversations *conv)
{
if (config.debug.meshms)
DEBUG("Checking if conversation needs to be acked");
@ -489,7 +512,8 @@ end:
}
// update conversations, and return 1 if the conversation index should be saved
static int update_conversations(const sid_t *my_sid, struct conversations *conv){
static int update_conversations(const sid_t *my_sid, struct conversations *conv)
{
if (!conv)
return 0;
int ret = 0;
@ -519,13 +543,13 @@ static int read_known_conversations(rhizome_manifest *m, const sid_t *their_sid,
struct rhizome_read_buffer buff;
bzero(&buff, sizeof(buff));
int ret = rhizome_open_decrypt_read(m, &read);
if (ret == -1)
int ret = -1;
enum rhizome_payload_status pstatus = rhizome_open_decrypt_read(m, &read);
if (pstatus != RHIZOME_PAYLOAD_STATUS_STORED)
goto end;
unsigned char version=0xFF;
ssize_t r = rhizome_read_buffered(&read, &buff, &version, 1);
ret = -1;
if (r == -1)
goto end;
if (version != 1) {
@ -593,12 +617,13 @@ static ssize_t write_conversation(struct rhizome_write *write, struct conversati
len+=measure_packed_uint(conv->read_offset);
len+=measure_packed_uint(conv->their_size);
}
DEBUGF("len %s, %"PRId64", %"PRId64", %"PRId64" = %zu",
alloca_tohex_sid_t(conv->them),
conv->their_last_message,
conv->read_offset,
conv->their_size,
len);
if (config.debug.meshms)
DEBUGF("len %s, %"PRId64", %"PRId64", %"PRId64" = %zu",
alloca_tohex_sid_t(conv->them),
conv->their_last_message,
conv->read_offset,
conv->their_size,
len);
}
// write the two child nodes
ssize_t ret = write_conversation(write, conv->_left);
@ -630,21 +655,46 @@ static int write_known_conversations(rhizome_manifest *m, struct conversations *
// then write it
rhizome_manifest_set_version(m, m->version + 1);
rhizome_manifest_set_filesize(m, (size_t)len + 1);
if (rhizome_write_open_manifest(&write, m) == -1)
goto end;
unsigned char version=1;
if (rhizome_write_buffer(&write, &version, 1) == -1)
goto end;
if (write_conversation(&write, conv) == -1)
goto end;
if (rhizome_finish_write(&write))
goto end;
rhizome_manifest_set_filehash(m, &write.id);
if (rhizome_manifest_finalise(m, &mout, 1) == -1)
goto end;
ret=0;
rhizome_manifest_set_filehash(m, NULL);
enum rhizome_payload_status pstatus = rhizome_write_open_manifest(&write, m);
if (pstatus == RHIZOME_PAYLOAD_STATUS_NEW) {
unsigned char version=1;
if (rhizome_write_buffer(&write, &version, 1) == -1)
goto end;
if (write_conversation(&write, conv) == -1)
goto end;
pstatus = rhizome_finish_write(&write);
if (pstatus != RHIZOME_PAYLOAD_STATUS_NEW)
goto end;
rhizome_manifest_set_filehash(m, &write.id);
}
enum rhizome_bundle_status status = rhizome_manifest_finalise(m, &mout, 1);
switch (status) {
case RHIZOME_BUNDLE_STATUS_ERROR:
// error is already logged
break;
case RHIZOME_BUNDLE_STATUS_NEW:
ret = 0;
break;
case RHIZOME_BUNDLE_STATUS_SAME:
case RHIZOME_BUNDLE_STATUS_DUPLICATE:
case RHIZOME_BUNDLE_STATUS_OLD:
WHYF("MeshMS conversation manifest (version=%"PRIu64") gazumped by Rhizome store (version=%"PRIu64")",
m->version, mout->version);
break;
case RHIZOME_BUNDLE_STATUS_INCONSISTENT:
WHY("MeshMS conversation manifest not consistent with payload");
break;
case RHIZOME_BUNDLE_STATUS_FAKE:
WHY("MeshMS conversation manifest is not signed");
break;
case RHIZOME_BUNDLE_STATUS_INVALID:
WHY("MeshMS conversation manifest is invalid");
break;
default:
FATALF("status=%d", status);
}
end:
if (ret)
rhizome_fail_write(&write);

View File

@ -90,14 +90,14 @@ int rhizome_mdp_send_block(struct subscriber *dest, const rhizome_bid_t *bid, ui
write_uint64(&reply.out.payload[1+16+8], offset);
int bytes_read = rhizome_read_cached(bid, version, gettime_ms()+5000, offset, &reply.out.payload[1+16+8+8], blockLength);
ssize_t bytes_read = rhizome_read_cached(bid, version, gettime_ms()+5000, offset, &reply.out.payload[1+16+8+8], blockLength);
if (bytes_read<=0)
break;
reply.out.payload_length=1+16+8+8+bytes_read;
reply.out.payload_length=1+16+8+8+(size_t)bytes_read;
// Mark the last block of the file, if required
if (bytes_read < blockLength)
if ((size_t)bytes_read < blockLength)
reply.out.payload[0]='T';
// send packet

View File

@ -158,13 +158,24 @@ enum rhizome_bundle_status rhizome_bundle_import_files(rhizome_manifest *m, rhiz
return RHIZOME_BUNDLE_STATUS_INVALID;
enum rhizome_bundle_status status = rhizome_manifest_check_stored(m, mout);
if (status == RHIZOME_BUNDLE_STATUS_NEW) {
int n = rhizome_import_payload_from_file(m, filepath);
if (n == -1)
return -1;
if (n != 0)
status = RHIZOME_BUNDLE_STATUS_INCONSISTENT;
else if (rhizome_store_manifest(m) == -1)
return -1;
enum rhizome_payload_status pstatus = rhizome_import_payload_from_file(m, filepath);
switch (pstatus) {
case RHIZOME_PAYLOAD_STATUS_EMPTY:
case RHIZOME_PAYLOAD_STATUS_STORED:
case RHIZOME_PAYLOAD_STATUS_NEW:
if (rhizome_store_manifest(m) == -1)
return -1;
break;
case RHIZOME_PAYLOAD_STATUS_ERROR:
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
return -1;
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
status = RHIZOME_BUNDLE_STATUS_INCONSISTENT;
break;
default:
FATALF("pstatus = %d", pstatus);
}
}
return status;
}

View File

@ -463,6 +463,16 @@ enum rhizome_bundle_status {
RHIZOME_BUNDLE_STATUS_INCONSISTENT = 6, // manifest filesize/filehash does not match supplied payload
};
enum rhizome_payload_status {
RHIZOME_PAYLOAD_STATUS_ERROR = -1,
RHIZOME_PAYLOAD_STATUS_EMPTY = 0, // payload is empty (zero length)
RHIZOME_PAYLOAD_STATUS_NEW = 1, // payload is not yet in store
RHIZOME_PAYLOAD_STATUS_STORED = 2, // payload is already in store
RHIZOME_PAYLOAD_STATUS_WRONG_SIZE = 3, // payload's size does not match manifest
RHIZOME_PAYLOAD_STATUS_WRONG_HASH = 4, // payload's hash does not match manifest
RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL = 5, // cannot encrypt/decrypt (payload key unknown)
};
int rhizome_write_manifest_file(rhizome_manifest *m, const char *filename, char append);
int rhizome_manifest_selfsign(rhizome_manifest *m);
int rhizome_drop_stored_file(const rhizome_filehash_t *hashp, int maximum_priority);
@ -936,34 +946,34 @@ int unpack_http_response(char *response, struct http_response_parts *parts);
/* rhizome storage methods */
int rhizome_exists(const rhizome_filehash_t *hashp);
int rhizome_open_write(struct rhizome_write *write, const rhizome_filehash_t *expectedHashp, uint64_t file_length, int priority);
enum rhizome_payload_status rhizome_open_write(struct rhizome_write *write, const rhizome_filehash_t *expectedHashp, uint64_t file_length, int priority);
int rhizome_write_buffer(struct rhizome_write *write_state, unsigned char *buffer, size_t data_size);
int rhizome_random_write(struct rhizome_write *write_state, uint64_t offset, unsigned char *buffer, size_t data_size);
int rhizome_write_open_manifest(struct rhizome_write *write, rhizome_manifest *m);
enum rhizome_payload_status rhizome_write_open_manifest(struct rhizome_write *write, rhizome_manifest *m);
int rhizome_write_file(struct rhizome_write *write, const char *filename);
int rhizome_fail_write(struct rhizome_write *write);
int rhizome_finish_write(struct rhizome_write *write);
int rhizome_import_payload_from_file(rhizome_manifest *m, const char *filepath);
int rhizome_import_buffer(rhizome_manifest *m, unsigned char *buffer, size_t length);
int rhizome_stat_payload_file(rhizome_manifest *m, const char *filepath);
int rhizome_store_payload_file(rhizome_manifest *m, const char *filepath);
void rhizome_fail_write(struct rhizome_write *write);
enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write);
enum rhizome_payload_status rhizome_import_payload_from_file(rhizome_manifest *m, const char *filepath);
enum rhizome_payload_status rhizome_import_buffer(rhizome_manifest *m, unsigned char *buffer, size_t length);
enum rhizome_payload_status rhizome_stat_payload_file(rhizome_manifest *m, const char *filepath);
enum rhizome_payload_status rhizome_store_payload_file(rhizome_manifest *m, const char *filepath);
int rhizome_derive_payload_key(rhizome_manifest *m);
int rhizome_append_journal_buffer(rhizome_manifest *m, uint64_t advance_by, unsigned char *buffer, size_t len);
int rhizome_append_journal_file(rhizome_manifest *m, uint64_t advance_by, const char *filename);
int rhizome_journal_pipe(struct rhizome_write *write, const rhizome_filehash_t *hashp, uint64_t start_offset, uint64_t length);
enum rhizome_payload_status rhizome_append_journal_buffer(rhizome_manifest *m, uint64_t advance_by, unsigned char *buffer, size_t len);
enum rhizome_payload_status rhizome_append_journal_file(rhizome_manifest *m, uint64_t advance_by, const char *filename);
enum rhizome_payload_status rhizome_journal_pipe(struct rhizome_write *write, const rhizome_filehash_t *hashp, uint64_t start_offset, uint64_t length);
int rhizome_crypt_xor_block(unsigned char *buffer, size_t buffer_size, uint64_t stream_offset,
const unsigned char *key, const unsigned char *nonce);
int rhizome_open_read(struct rhizome_read *read, const rhizome_filehash_t *hashp);
enum rhizome_payload_status rhizome_open_read(struct rhizome_read *read, const rhizome_filehash_t *hashp);
ssize_t rhizome_read(struct rhizome_read *read, unsigned char *buffer, size_t buffer_length);
ssize_t rhizome_read_buffered(struct rhizome_read *read, struct rhizome_read_buffer *buffer, unsigned char *data, size_t len);
int rhizome_read_close(struct rhizome_read *read);
int rhizome_open_decrypt_read(rhizome_manifest *m, struct rhizome_read *read_state);
int rhizome_extract_file(rhizome_manifest *m, const char *filepath);
int rhizome_dump_file(const rhizome_filehash_t *hashp, const char *filepath, uint64_t *lengthp);
int rhizome_read_cached(const rhizome_bid_t *bid, uint64_t version, time_ms_t timeout,
uint64_t fileOffset, unsigned char *buffer, size_t length);
void rhizome_read_close(struct rhizome_read *read);
enum rhizome_payload_status rhizome_open_decrypt_read(rhizome_manifest *m, struct rhizome_read *read_state);
enum rhizome_payload_status rhizome_extract_file(rhizome_manifest *m, const char *filepath);
enum rhizome_payload_status rhizome_dump_file(const rhizome_filehash_t *hashp, const char *filepath, uint64_t *lengthp);
ssize_t rhizome_read_cached(const rhizome_bid_t *bid, uint64_t version, time_ms_t timeout,
uint64_t fileOffset, unsigned char *buffer, size_t length);
int rhizome_cache_close();
int rhizome_database_filehash_from_id(const rhizome_bid_t *bidp, uint64_t version, rhizome_filehash_t *hashp);

View File

@ -116,9 +116,8 @@ void _rhizome_manifest_set_id(struct __sourceloc __whence, rhizome_manifest *m,
{
const char *v = rhizome_manifest_set(m, "id", alloca_tohex_rhizome_bid_t(*bidp));
assert(v); // TODO: remove known manifest fields from vars[]
// If the BID is changed, the secret key and bundle key are no longer valid.
if (m->has_id && bidp != &m->cryptoSignPublic && cmp_rhizome_bid_t(&m->cryptoSignPublic, bidp) != 0) {
m->cryptoSignPublic = *bidp;
// The BID just changed, so the secret key and bundle key are no longer valid.
if (m->haveSecret) {
m->haveSecret = SECRET_UNKNOWN;
bzero(m->cryptoSignSecret, sizeof m->cryptoSignSecret); // not strictly necessary but aids debugging
@ -131,6 +130,7 @@ void _rhizome_manifest_set_id(struct __sourceloc __whence, rhizome_manifest *m,
if (m->authorship == AUTHOR_AUTHENTIC)
m->authorship = AUTHOR_LOCAL;
}
m->cryptoSignPublic = *bidp;
m->has_id = 1;
m->finalised = 0;
}
@ -1057,6 +1057,7 @@ int rhizome_manifest_selfsign(rhizome_manifest *m)
);
bcopy(sig.signature, m->manifestdata + m->manifest_body_bytes, sig.signatureLength);
m->manifest_all_bytes = m->manifest_body_bytes + sig.signatureLength;
m->selfSigned = 1;
return 0;
}
@ -1126,8 +1127,10 @@ enum rhizome_bundle_status rhizome_manifest_finalise(rhizome_manifest *m, rhizom
RETURN(WHY("Could not convert manifest to wire format"));
/* Sign it */
assert(!m->selfSigned);
if (rhizome_manifest_selfsign(m))
RETURN(WHY("Could not sign manifest"));
assert(m->selfSigned);
/* mark manifest as finalised */
enum rhizome_bundle_status status = rhizome_add_manifest(m, mout);
@ -1157,7 +1160,7 @@ int rhizome_fill_manifest(rhizome_manifest *m, const char *filepath, const sid_t
/* Set the bundle ID (public key) and secret key.
*/
if (!m->haveSecret && rhizome_bid_t_is_zero(m->cryptoSignPublic)) {
if (!m->haveSecret && !m->has_id) {
if (config.debug.rhizome)
DEBUG("creating new bundle");
if (rhizome_manifest_createid(m) == -1)

View File

@ -450,7 +450,7 @@ typedef struct manifest_signature_block_cache {
#define SIG_CACHE_SIZE 1024
manifest_signature_block_cache sig_cache[SIG_CACHE_SIZE];
int rhizome_manifest_lookup_signature_validity(const unsigned char *hash, const unsigned char *sig, int sig_len)
static int rhizome_manifest_lookup_signature_validity(const unsigned char *hash, const unsigned char *sig, int sig_len)
{
IN();
unsigned int slot=0;
@ -594,36 +594,39 @@ int rhizome_crypt_xor_block(unsigned char *buffer, size_t buffer_size, uint64_t
return 0;
}
/* If payload key is known, sets m->payloadKey and m->payloadNonce and returns 1.
* Otherwise, returns 0;
*/
int rhizome_derive_payload_key(rhizome_manifest *m)
{
// don't do anything if the manifest isn't flagged as being encrypted
if (m->payloadEncryption != PAYLOAD_ENCRYPTED)
return 0;
assert(m->payloadEncryption == PAYLOAD_ENCRYPTED);
if (m->has_sender && m->has_recipient){
unsigned char *nm_bytes=NULL;
unsigned cn=0, in=0, kp=0;
if (!keyring_find_sid(keyring, &cn, &in, &kp, &m->sender)){
cn=in=kp=0;
if (!keyring_find_sid(keyring, &cn, &in, &kp, &m->recipient)){
return WHYF("Neither the sender %s nor the recipient %s appears in our keyring",
WARNF("Neither sender=%s nor recipient=%s is in keyring",
alloca_tohex_sid_t(m->sender),
alloca_tohex_sid_t(m->recipient));
return 0;
}
nm_bytes=keyring_get_nm_bytes(&m->recipient, &m->sender);
}else{
nm_bytes=keyring_get_nm_bytes(&m->sender, &m->recipient);
}
if (!nm_bytes)
return -1;
assert(nm_bytes != NULL);
unsigned char hash[crypto_hash_sha512_BYTES];
crypto_hash_sha512(hash, nm_bytes, crypto_box_curve25519xsalsa20poly1305_BEFORENMBYTES);
bcopy(hash, m->payloadKey, RHIZOME_CRYPT_KEY_BYTES);
}else{
if (!m->haveSecret)
return WHY("Cannot derive payload key because bundle secret is unknown");
if (!m->haveSecret) {
WHY("Cannot derive payload key because bundle secret is unknown");
return 0;
}
unsigned char raw_key[9+crypto_sign_edwards25519sha512batch_SECRETKEYBYTES]="sasquatch";
bcopy(m->cryptoSignSecret, &raw_key[9], crypto_sign_edwards25519sha512batch_SECRETKEYBYTES);
@ -645,6 +648,6 @@ int rhizome_derive_payload_key(rhizome_manifest *m)
crypto_hash_sha512(hash, raw_nonce, sizeof(raw_nonce));
bcopy(hash, m->payloadNonce, sizeof(m->payloadNonce));
return 0;
return 1;
}

View File

@ -229,7 +229,7 @@ static int rhizome_direct_addfile_end(struct http_request *hr)
http_request_simple_response(&r->http, 500, "Internal Error: Malformed manifest template");
return 0;
}
if (rhizome_stat_payload_file(m, payload_path)) {
if (rhizome_stat_payload_file(m, payload_path) != RHIZOME_PAYLOAD_STATUS_NEW) {
WHY("Payload file stat failed");
rhizome_manifest_free(m);
rhizome_direct_clear_temporary_files(r);
@ -253,7 +253,7 @@ static int rhizome_direct_addfile_end(struct http_request *hr)
// TODO, stream file into database
assert(m->filesize != RHIZOME_SIZE_UNSET);
if (m->filesize > 0) {
if (rhizome_store_payload_file(m, payload_path)) {
if (rhizome_store_payload_file(m, payload_path) != RHIZOME_PAYLOAD_STATUS_NEW) {
rhizome_manifest_free(m);
rhizome_direct_clear_temporary_files(r);
http_request_simple_response(&r->http, 500, "Internal Error: Could not store file");
@ -726,8 +726,20 @@ void rhizome_direct_http_dispatch(rhizome_direct_sync_request *r)
struct rhizome_read read;
bzero(&read, sizeof read);
if (rhizome_open_read(&read, &filehash))
goto closeit;
enum rhizome_bundle_status pstatus = rhizome_open_read(&read, &filehash);
switch (pstatus) {
case RHIZOME_PAYLOAD_STATUS_EMPTY:
case RHIZOME_PAYLOAD_STATUS_STORED:
break;
case RHIZOME_PAYLOAD_STATUS_NEW:
case RHIZOME_PAYLOAD_STATUS_ERROR:
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
goto closeit;
default:
FATALF("pstatus = %d", pstatus);
}
uint64_t read_ofs;
for(read_ofs=0;read_ofs<m->filesize;){

View File

@ -89,7 +89,7 @@ struct rhizome_fetch_slot {
unsigned char mdpRXWindow[32*200];
};
static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot);
static enum rhizome_start_fetch_result rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot);
static int rhizome_fetch_mdp_requestblocks(struct rhizome_fetch_slot *slot);
/* Represents a queue of fetch candidates and a single active fetch for bundle payloads whose size
@ -498,8 +498,12 @@ static int rhizome_import_received_bundle(struct rhizome_manifest *m)
}
}
// begin fetching a bundle
static int schedule_fetch(struct rhizome_fetch_slot *slot)
/* Returns STARTED (0) if the fetch was started.
* Returns IMPORTED if the payload is already in the store.
* Returns -1 on error.
*/
static enum rhizome_start_fetch_result
schedule_fetch(struct rhizome_fetch_slot *slot)
{
IN();
int sock = -1;
@ -550,9 +554,27 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
if (strbuf_overrun(r))
RETURN(WHY("request overrun"));
slot->request_len = strbuf_len(r);
if (rhizome_open_write(&slot->write_state, &slot->manifest->filehash, slot->manifest->filesize, RHIZOME_PRIORITY_DEFAULT))
RETURN(-1);
enum rhizome_payload_status status = rhizome_open_write(&slot->write_state,
&slot->manifest->filehash,
slot->manifest->filesize,
RHIZOME_PRIORITY_DEFAULT);
switch (status) {
case RHIZOME_PAYLOAD_STATUS_EMPTY:
case RHIZOME_PAYLOAD_STATUS_STORED:
RETURN(IMPORTED);
case RHIZOME_PAYLOAD_STATUS_NEW:
break;
case RHIZOME_PAYLOAD_STATUS_ERROR:
RETURN(WHY("error writing new payload"));
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
RETURN(WHY("payload size does not match"));
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
RETURN(WHY("payload hash does not match"));
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
RETURN(WHY("payload cannot be encrypted"));
default:
FATALF("status = %d", status);
}
} else {
strbuf r = strbuf_local(slot->request, sizeof slot->request);
strbuf_sprintf(r, "GET /rhizome/manifestbyprefix/%s HTTP/1.0\r\n\r\n", alloca_tohex(slot->bid.binary, slot->prefix_length));
@ -610,14 +632,15 @@ static int schedule_fetch(struct rhizome_fetch_slot *slot)
slot->alarm.alarm = gettime_ms() + config.rhizome.idle_timeout;
slot->alarm.deadline = slot->alarm.alarm + config.rhizome.idle_timeout;
schedule(&slot->alarm);
RETURN(0);
RETURN(STARTED);
}
enum rhizome_start_fetch_result result;
bail_http:
/* Fetch via overlay, either because no IP address was provided, or because
the connection/attempt to fetch via HTTP failed. */
rhizome_fetch_switch_to_mdp(slot);
RETURN(0);
result = rhizome_fetch_switch_to_mdp(slot);
RETURN(result);
OUT();
}
@ -743,23 +766,20 @@ rhizome_fetch(struct rhizome_fetch_slot *slot, rhizome_manifest *m, const struct
if (config.debug.rhizome_rx)
DEBUGF(" is new");
// If the payload is already available, no need to fetch, so import now.
if (rhizome_exists(&m->filehash)){
if (config.debug.rhizome_rx)
DEBUGF(" fetch not started - payload already present, so importing instead");
if (rhizome_add_manifest(m, NULL) == -1)
RETURN(WHY("add manifest failed"));
RETURN(IMPORTED);
}
/* Prepare for fetching */
slot->peer_ipandport = *peerip;
slot->peer_sid = *peersidp;
slot->manifest = m;
if (schedule_fetch(slot) == -1)
RETURN(-1);
RETURN(STARTED);
enum rhizome_start_fetch_result result = schedule_fetch(slot);
// If the payload is already available, no need to fetch, so import now.
if (result == IMPORTED) {
if (config.debug.rhizome_rx)
DEBUGF(" fetch not started - payload already present, so importing instead");
if (rhizome_add_manifest(m, NULL) == -1)
RETURN(WHY("add manifest failed"));
}
RETURN(result);
}
/* Returns STARTED (0) if the fetch was started.
@ -788,10 +808,7 @@ rhizome_fetch_request_manifest_by_prefix(const struct sockaddr_in *peerip,
for inserting into the database, but we can avoid the temporary file in
the process. */
if (schedule_fetch(slot) == -1) {
return -1;
}
return STARTED;
return schedule_fetch(slot);
}
/* Activate the next fetch for the given slot. This takes the next job from the head of the slot's
@ -1017,7 +1034,7 @@ int rhizome_suggest_queue_manifest_import(rhizome_manifest *m, const struct sock
OUT();
}
static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
static void rhizome_fetch_close(struct rhizome_fetch_slot *slot)
{
if (config.debug.rhizome_rx)
DEBUGF("close Rhizome fetch slot=%d", slotno(slot));
@ -1049,8 +1066,6 @@ static int rhizome_fetch_close(struct rhizome_fetch_slot *slot)
// Activate the next queued fetch that is eligible for this slot. Try starting candidates from
// all queues with the same or smaller size thresholds until the slot is taken.
rhizome_start_next_queued_fetch(slot);
return 0;
}
static void rhizome_fetch_mdp_slot_callback(struct sched_ent *alarm)
@ -1188,7 +1203,7 @@ static int pipe_journal(struct rhizome_fetch_slot *slot){
return 0;
}
static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
static enum rhizome_start_fetch_result rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
{
/* In Rhizome Direct we use the same fetch slot system, but we aren't actually
a running servald instance, so we cannot fall back to MDP. This is detected
@ -1201,11 +1216,13 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
*/
IN();
if (!is_rhizome_mdp_enabled()){
RETURN(rhizome_fetch_close(slot));
rhizome_fetch_close(slot);
RETURN(-1);
}
if (!my_subscriber) {
DEBUGF("I don't have an identity, so we cannot fall back to MDP");
RETURN(rhizome_fetch_close(slot));
rhizome_fetch_close(slot);
RETURN(-1);
}
if (config.debug.rhizome_rx)
@ -1255,7 +1272,7 @@ static int rhizome_fetch_switch_to_mdp(struct rhizome_fetch_slot *slot)
slot->mdpRXBlockLength = config.rhizome.rhizome_mdp_block_size; // Rhizome over MDP block size
rhizome_fetch_mdp_requestblocks(slot);
RETURN(0);
RETURN(STARTED);
OUT();
}
@ -1291,7 +1308,7 @@ void rhizome_fetch_write(struct rhizome_fetch_slot *slot)
return;
}
int rhizome_write_complete(struct rhizome_fetch_slot *slot)
static int rhizome_write_complete(struct rhizome_fetch_slot *slot)
{
IN();
@ -1303,7 +1320,8 @@ int rhizome_write_complete(struct rhizome_fetch_slot *slot)
if (config.debug.rhizome_rx)
DEBUGF("Received all of file via rhizome -- now to import it");
if (rhizome_finish_write(&slot->write_state)){
enum rhizome_payload_status status = rhizome_finish_write(&slot->write_state);
if (status != RHIZOME_PAYLOAD_STATUS_EMPTY && status != RHIZOME_PAYLOAD_STATUS_NEW) {
rhizome_fetch_close(slot);
RETURN(-1);
}
@ -1461,7 +1479,7 @@ int rhizome_received_content(const unsigned char *bidprefix,
}
if (m){
if (rhizome_import_buffer(m, bytes, count) >= 0){
if (rhizome_import_buffer(m, bytes, count) == RHIZOME_PAYLOAD_STATUS_NEW) {
INFOF("Completed MDP transfer in one hit for file %s",
alloca_tohex_rhizome_filehash_t(m->filehash));
rhizome_import_received_bundle(m);

View File

@ -745,22 +745,42 @@ static int rhizome_response_content_init_read_state(rhizome_http_request *r)
static int rhizome_response_content_init_filehash(rhizome_http_request *r, const rhizome_filehash_t *hash)
{
bzero(&r->u.read_state, sizeof r->u.read_state);
int n = rhizome_open_read(&r->u.read_state, hash);
if (n == -1)
return -1;
if (n != 0)
return 404;
enum rhizome_payload_status status = rhizome_open_read(&r->u.read_state, hash);
switch (status) {
case RHIZOME_PAYLOAD_STATUS_EMPTY:
case RHIZOME_PAYLOAD_STATUS_STORED:
break;
case RHIZOME_PAYLOAD_STATUS_NEW:
return 404;
case RHIZOME_PAYLOAD_STATUS_ERROR:
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
return -1;
default:
FATALF("status = %d", status);
}
return rhizome_response_content_init_read_state(r);
}
static int rhizome_response_content_init_payload(rhizome_http_request *r, rhizome_manifest *m)
{
bzero(&r->u.read_state, sizeof r->u.read_state);
int n = rhizome_open_decrypt_read(m, &r->u.read_state);
if (n == -1)
return -1;
if (n != 0)
return 404;
enum rhizome_payload_status status = rhizome_open_decrypt_read(m, &r->u.read_state);
switch (status) {
case RHIZOME_PAYLOAD_STATUS_EMPTY:
case RHIZOME_PAYLOAD_STATUS_STORED:
break;
case RHIZOME_PAYLOAD_STATUS_NEW:
return 404;
case RHIZOME_PAYLOAD_STATUS_ERROR:
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
return -1;
default:
FATALF("status = %d", status);
}
return rhizome_response_content_init_read_state(r);
}

View File

@ -33,13 +33,16 @@ int rhizome_exists(const rhizome_filehash_t *hashp)
return gotfile;
}
int rhizome_open_write(struct rhizome_write *write, const rhizome_filehash_t *expectedHashp, uint64_t file_length, int priority)
enum rhizome_payload_status rhizome_open_write(struct rhizome_write *write, const rhizome_filehash_t *expectedHashp, uint64_t file_length, int priority)
{
assert(file_length != RHIZOME_SIZE_UNSET);
assert(file_length != 0);
write->blob_fd=-1;
if (expectedHashp){
if (rhizome_exists(expectedHashp))
return 1;
return RHIZOME_PAYLOAD_STATUS_STORED;
write->id = *expectedHashp;
write->id_known=1;
}else{
@ -54,8 +57,10 @@ int rhizome_open_write(struct rhizome_write *write, const rhizome_filehash_t *ex
sqlite_retry_state retry = SQLITE_RETRY_STATE_DEFAULT;
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1)
return WHY("Failed to begin transaction");
if (sqlite_exec_void_retry(&retry, "BEGIN TRANSACTION;", END) == -1) {
WHY("Failed to begin transaction");
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
/*
we have to write incrementally so that we can handle blobs larger than available memory.
@ -116,9 +121,10 @@ int rhizome_open_write(struct rhizome_write *write, const rhizome_filehash_t *ex
if (!sqlite_code_ok(stepcode)){
insert_row_fail:
WHYF("Failed to insert row for id='%"PRId64"'", write->temp_id);
if (statement) sqlite3_finalize(statement);
if (statement)
sqlite3_finalize(statement);
sqlite_exec_void_retry(&retry, "ROLLBACK;", END);
return -1;
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
sqlite3_finalize(statement);
statement=NULL;
@ -138,7 +144,7 @@ int rhizome_open_write(struct rhizome_write *write, const rhizome_filehash_t *ex
write->blob_fd=-1;
unlink(blob_path);
}
return -1;
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
write->file_length = file_length;
@ -147,7 +153,7 @@ int rhizome_open_write(struct rhizome_write *write, const rhizome_filehash_t *ex
SHA512_Init(&write->sha512_context);
return 0;
return RHIZOME_PAYLOAD_STATUS_NEW;
}
/* blob_open / close will lock the database, this is bad for other processes that might attempt to
@ -259,7 +265,8 @@ static int write_data(struct rhizome_write *write_state, uint64_t file_offset, u
}
// close database locks
static int write_release_lock(struct rhizome_write *write_state){
static int write_release_lock(struct rhizome_write *write_state)
{
int ret=0;
if (write_state->blob_fd != -1)
return 0;
@ -284,7 +291,7 @@ int rhizome_random_write(struct rhizome_write *write_state, uint64_t offset, uns
{
if (config.debug.rhizome) {
DEBUGF("write_state->file_length=%"PRIu64" offset=%"PRIu64, write_state->file_length, offset);
//dump("buffer", buffer, data_size);
dump("buffer", buffer, data_size);
}
if (offset + data_size > write_state->file_length)
data_size = write_state->file_length - offset;
@ -292,10 +299,11 @@ int rhizome_random_write(struct rhizome_write *write_state, uint64_t offset, uns
struct rhizome_write_buffer **ptr = &write_state->buffer_list;
int ret=0;
int should_write = 0;
// if we are writing to a file, or already have the sql blob open, write as much as we can.
if (write_state->blob_fd != -1 || write_state->sql_blob){
// if we are writing to a file, or already have the sql blob open, or are finishing, write as much
// as we can.
if (write_state->blob_fd != -1 || write_state->sql_blob || buffer == NULL)
should_write = 1;
}else{
else {
// cache up to RHIZOME_BUFFER_MAXIMUM_SIZE or file length before attempting to write everything in one go.
// (Not perfect if the range overlaps)
uint64_t new_size = write_state->written_offset + write_state->buffer_size + data_size;
@ -416,7 +424,8 @@ int rhizome_write_buffer(struct rhizome_write *write_state, unsigned char *buffe
}
/* Expects file to be at least file_length in size, ignoring anything longer than that */
int rhizome_write_file(struct rhizome_write *write, const char *filename){
int rhizome_write_file(struct rhizome_write *write, const char *filename)
{
FILE *f = fopen(filename, "r");
if (!f)
return WHY_perror("fopen");
@ -447,7 +456,7 @@ end:
return ret;
}
int rhizome_fail_write(struct rhizome_write *write)
void rhizome_fail_write(struct rhizome_write *write)
{
if (write->blob_fd != -1){
if (config.debug.externalblobs)
@ -462,36 +471,42 @@ int rhizome_fail_write(struct rhizome_write *write)
free(n);
}
rhizome_delete_file(&write->id);
return 0;
}
int rhizome_finish_write(struct rhizome_write *write)
enum rhizome_payload_status rhizome_finish_write(struct rhizome_write *write)
{
int ret = -1;
if (write->blob_rowid==0 && write->blob_fd == -1)
return WHY("Can't finish a write that has already been closed");
assert(write->blob_rowid != 0 || write->blob_fd != -1);
enum rhizome_payload_status status = RHIZOME_PAYLOAD_STATUS_NEW;
if (write->buffer_list){
if (rhizome_random_write(write, 0, NULL, 0))
if (rhizome_random_write(write, 0, NULL, 0)) {
status = RHIZOME_PAYLOAD_STATUS_ERROR;
goto failure;
if (write->buffer_list){
}
if (write->buffer_list) {
WHYF("Buffer was not cleared");
status = RHIZOME_PAYLOAD_STATUS_ERROR;
goto failure;
}
}
if (write->file_offset < write->file_length){
WHYF("Only processed %"PRIu64" bytes, expected %"PRIu64, write->file_offset, write->file_length);
}
int fd = write->blob_fd;
if (fd>=0){
if (config.debug.externalblobs)
DEBUGF("Closing fd %d", fd);
close(fd);
write->blob_fd=-1;
}
if (write_release_lock(write))
assert(write->file_offset <= write->file_length);
if (write->file_offset < write->file_length) {
WHYF("Only wrote %"PRIu64" bytes, expected %"PRIu64, write->file_offset, write->file_length);
status = RHIZOME_PAYLOAD_STATUS_WRONG_SIZE;
goto failure;
}
int external = 0;
if (write->blob_fd != -1){
external = 1;
if (config.debug.externalblobs)
DEBUGF("Closing fd=%d", write->blob_fd);
close(write->blob_fd);
write->blob_fd = -1;
}
if (write_release_lock(write)) {
status = RHIZOME_PAYLOAD_STATUS_ERROR;
goto failure;
}
rhizome_filehash_t hash_out;
SHA512_Final(hash_out.binary, &write->sha512_context);
@ -500,7 +515,7 @@ int rhizome_finish_write(struct rhizome_write *write)
if (write->id_known) {
if (cmp_rhizome_filehash_t(&write->id, &hash_out) != 0) {
WARNF("expected filehash=%s, got %s", alloca_tohex_rhizome_filehash_t(write->id), alloca_tohex_rhizome_filehash_t(hash_out));
ret = 1;
status = RHIZOME_PAYLOAD_STATUS_WRONG_HASH;
goto failure;
}
} else {
@ -534,7 +549,7 @@ int rhizome_finish_write(struct rhizome_write *write)
)
goto dbfailure;
if (fd>=0){
if (external) {
char blob_path[1024];
char dest_path[1024];
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, "%"PRId64, write->temp_id)){
@ -545,14 +560,12 @@ int rhizome_finish_write(struct rhizome_write *write)
WHYF("Failed to generate file path");
goto dbfailure;
}
if (rename(blob_path, dest_path) == -1) {
WHYF_perror("rename(%s, %s)", blob_path, dest_path);
goto dbfailure;
}
if (config.debug.externalblobs)
DEBUGF("Renamed %s to %s", blob_path, dest_path);
}else{
if (sqlite_exec_void_retry(
&retry,
@ -570,13 +583,14 @@ int rhizome_finish_write(struct rhizome_write *write)
DEBUGF("Stored file %s", alloca_tohex_rhizome_filehash_t(write->id));
}
write->blob_rowid = 0;
return 0;
return status;
dbfailure:
sqlite_exec_void_retry(&retry, "ROLLBACK;", END);
status = RHIZOME_PAYLOAD_STATUS_ERROR;
failure:
rhizome_fail_write(write);
return ret;
return status;
}
/* Import the payload for an existing manifest with a known file size and hash. Compute the hash of
@ -585,51 +599,53 @@ failure:
* match, return 1. If there is an error reading the payload file or writing to the database,
* return -1.
*/
int rhizome_import_payload_from_file(rhizome_manifest *m, const char *filepath)
enum rhizome_payload_status rhizome_import_payload_from_file(rhizome_manifest *m, const char *filepath)
{
assert(m->filesize != RHIZOME_SIZE_UNSET);
if (m->filesize == 0)
return 0;
return RHIZOME_PAYLOAD_STATUS_EMPTY;
/* Import the file first, checking the hash as we go */
struct rhizome_write write;
bzero(&write, sizeof(write));
int ret=rhizome_open_write(&write, &m->filehash, m->filesize, RHIZOME_PRIORITY_DEFAULT);
if (ret!=0)
return ret;
enum rhizome_payload_status status = rhizome_open_write(&write, &m->filehash, m->filesize, RHIZOME_PRIORITY_DEFAULT);
if (status != RHIZOME_PAYLOAD_STATUS_NEW)
return status;
// file payload is not in the store yet
if (rhizome_write_file(&write, filepath)){
rhizome_fail_write(&write);
return -1;
return RHIZOME_BUNDLE_STATUS_ERROR;
}
return rhizome_finish_write(&write);
}
// store a whole payload from a single buffer
int rhizome_import_buffer(rhizome_manifest *m, unsigned char *buffer, size_t length)
enum rhizome_payload_status rhizome_import_buffer(rhizome_manifest *m, unsigned char *buffer, size_t length)
{
assert(m->filesize != RHIZOME_SIZE_UNSET);
if (m->filesize == 0)
return 0;
return RHIZOME_PAYLOAD_STATUS_EMPTY;
if (length != m->filesize)
return WHYF("Expected %"PRIu64" bytes, got %zu", m->filesize, length);
if (length != m->filesize) {
WHYF("Expected %"PRIu64" bytes, got %zu", m->filesize, length);
return RHIZOME_PAYLOAD_STATUS_WRONG_SIZE;
}
/* Import the file first, checking the hash as we go */
struct rhizome_write write;
bzero(&write, sizeof(write));
int ret=rhizome_open_write(&write, &m->filehash, m->filesize, RHIZOME_PRIORITY_DEFAULT);
if (ret!=0)
return ret;
enum rhizome_payload_status status = rhizome_open_write(&write, &m->filehash, m->filesize, RHIZOME_PRIORITY_DEFAULT);
if (status != RHIZOME_PAYLOAD_STATUS_NEW)
return status;
// file payload is not in the store yet
if (rhizome_write_buffer(&write, buffer, length)){
rhizome_fail_write(&write);
return -1;
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
return rhizome_finish_write(&write);
@ -642,13 +658,15 @@ int rhizome_import_buffer(rhizome_manifest *m, unsigned char *buffer, size_t len
* the file size does not match the manifest's 'filesize', returns 1. If there is an error calling
* stat(2) on the payload file (eg, file does not exist), returns -1.
*/
int rhizome_stat_payload_file(rhizome_manifest *m, const char *filepath)
enum rhizome_payload_status rhizome_stat_payload_file(rhizome_manifest *m, const char *filepath)
{
uint64_t size = 0;
if (filepath[0]) {
struct stat stat;
if (lstat(filepath, &stat))
return WHYF_perror("lstat(%s)", alloca_str_toprint(filepath));
if (lstat(filepath, &stat)) {
WHYF_perror("lstat(%s)", alloca_str_toprint(filepath));
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
size = stat.st_size;
}
if (m->filesize == RHIZOME_SIZE_UNSET)
@ -657,19 +675,20 @@ int rhizome_stat_payload_file(rhizome_manifest *m, const char *filepath)
if (config.debug.rhizome)
DEBUGF("payload file %s (size=%"PRIu64") does not match manifest[%d].filesize=%"PRIu64,
alloca_str_toprint(filepath), size, m->manifest_record_number, m->filesize);
return 1;
return RHIZOME_PAYLOAD_STATUS_WRONG_SIZE;
}
return 0;
return size ? RHIZOME_PAYLOAD_STATUS_NEW : RHIZOME_PAYLOAD_STATUS_EMPTY;
}
static int rhizome_write_derive_key(rhizome_manifest *m, struct rhizome_write *write)
static enum rhizome_bundle_status rhizome_write_derive_key(rhizome_manifest *m, struct rhizome_write *write)
{
if (m->payloadEncryption != PAYLOAD_ENCRYPTED)
return 0;
// if the manifest specifies encryption, make sure we can generate the payload key and encrypt the contents as we go
if (rhizome_derive_payload_key(m))
return -1;
return RHIZOME_PAYLOAD_STATUS_NEW;
// if the manifest specifies encryption, make sure we can generate the payload key and encrypt the
// contents as we go
if (!rhizome_derive_payload_key(m))
return RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL;
if (config.debug.rhizome)
DEBUGF("Encrypting payload contents for %s, %"PRIu64, alloca_tohex_rhizome_bid_t(m->cryptoSignPublic), m->version);
@ -680,50 +699,75 @@ static int rhizome_write_derive_key(rhizome_manifest *m, struct rhizome_write *w
bcopy(m->payloadKey, write->key, sizeof(write->key));
bcopy(m->payloadNonce, write->nonce, sizeof(write->nonce));
return 0;
return RHIZOME_PAYLOAD_STATUS_NEW;
}
int rhizome_write_open_manifest(struct rhizome_write *write, rhizome_manifest *m)
enum rhizome_payload_status rhizome_write_open_manifest(struct rhizome_write *write, rhizome_manifest *m)
{
if (rhizome_open_write(
enum rhizome_payload_status status = rhizome_open_write(
write,
m->has_filehash ? &m->filehash : NULL,
m->filesize,
RHIZOME_PRIORITY_DEFAULT
)
)
return -1;
if (rhizome_write_derive_key(m, write))
return -1;
return 0;
);
if (status == RHIZOME_PAYLOAD_STATUS_NEW)
status = rhizome_write_derive_key(m, write);
return status;
}
// import a file for a new bundle with an unknown file hash
// update the manifest with the details of the file
int rhizome_store_payload_file(rhizome_manifest *m, const char *filepath)
enum rhizome_payload_status rhizome_store_payload_file(rhizome_manifest *m, const char *filepath)
{
// Stream the file directly into the database, encrypting & hashing as we go.
struct rhizome_write write;
bzero(&write, sizeof(write));
if ( rhizome_write_open_manifest(&write, m)
|| rhizome_write_file(&write, filepath)
) {
enum rhizome_payload_status status = rhizome_write_open_manifest(&write, m);
switch (status) {
case RHIZOME_PAYLOAD_STATUS_EMPTY:
case RHIZOME_PAYLOAD_STATUS_NEW:
break;
case RHIZOME_PAYLOAD_STATUS_STORED:
case RHIZOME_PAYLOAD_STATUS_ERROR:
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
return status;
default:
FATALF("status = %d", status);
}
if (rhizome_write_file(&write, filepath) == -1) {
rhizome_fail_write(&write);
return -1;
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
int ret = rhizome_finish_write(&write);
if (ret == 0) {
if (m->has_filehash)
assert(cmp_rhizome_filehash_t(&m->filehash, &write.id) == 0);
else
rhizome_manifest_set_filehash(m, &write.id);
status = rhizome_finish_write(&write);
switch (status) {
case RHIZOME_PAYLOAD_STATUS_EMPTY:
assert(write.file_length == 0);
assert(m->filesize == 0);
break;
case RHIZOME_PAYLOAD_STATUS_NEW:
assert(m->filesize == write.file_length);
if (m->has_filehash)
assert(cmp_rhizome_filehash_t(&m->filehash, &write.id) == 0);
else
rhizome_manifest_set_filehash(m, &write.id);
break;
case RHIZOME_PAYLOAD_STATUS_ERROR:
case RHIZOME_PAYLOAD_STATUS_STORED:
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
break;
default:
FATALF("status = %d", status);
}
return ret;
return status;
}
/* Return -1 on error, 0 if file blob found, 1 if not found.
/* Return RHIZOME_PAYLOAD_STATUS_STORED if file blob found, RHIZOME_PAYLOAD_STATUS_NEW if not found.
*/
int rhizome_open_read(struct rhizome_read *read, const rhizome_filehash_t *hashp)
enum rhizome_payload_status rhizome_open_read(struct rhizome_read *read, const rhizome_filehash_t *hashp)
{
read->id = *hashp;
read->blob_rowid = 0;
@ -734,26 +778,29 @@ int rhizome_open_read(struct rhizome_read *read, const rhizome_filehash_t *hashp
"WHERE FILEBLOBS.id = FILES.id"
" AND FILES.id = ?"
" AND FILES.datavalid != 0", RHIZOME_FILEHASH_T, &read->id, END) == -1)
return -1;
return RHIZOME_PAYLOAD_STATUS_ERROR;
if (read->blob_rowid != 0) {
read->length = RHIZOME_SIZE_UNSET; // discover the length on opening the db BLOB
} else {
// No row in FILEBLOBS, look for an external blob file.
char blob_path[1024];
if (!FORM_RHIZOME_DATASTORE_PATH(blob_path, alloca_tohex_rhizome_filehash_t(read->id)))
return -1;
return RHIZOME_PAYLOAD_STATUS_ERROR;
read->blob_fd = open(blob_path, O_RDONLY);
if (read->blob_fd == -1) {
if (errno == ENOENT) {
if (config.debug.externalblobs)
DEBUGF("Stored file does not exist: %s", blob_path);
return 1; // file not available
return RHIZOME_PAYLOAD_STATUS_NEW;
}
return WHYF_perror("open(%s)", alloca_str_toprint(blob_path));
WHYF_perror("open(%s)", alloca_str_toprint(blob_path));
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
off64_t pos = lseek64(read->blob_fd, 0, SEEK_END);
if (pos == -1)
return WHYF_perror("lseek64(%s,0,SEEK_END)", alloca_str_toprint(blob_path));
if (pos == -1) {
WHYF_perror("lseek64(%s,0,SEEK_END)", alloca_str_toprint(blob_path));
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
read->length = pos;
if (config.debug.externalblobs)
DEBUGF("Opened stored file %s as fd %d, len %"PRIx64, blob_path, read->blob_fd, read->length);
@ -761,7 +808,7 @@ int rhizome_open_read(struct rhizome_read *read, const rhizome_filehash_t *hashp
read->offset = 0;
read->hash_offset = 0;
SHA512_Init(&read->sha512_context);
return 0; // file opened
return RHIZOME_PAYLOAD_STATUS_STORED;
}
static ssize_t rhizome_read_retry(sqlite_retry_state *retry, struct rhizome_read *read_state, unsigned char *buffer, size_t bufsz)
@ -929,19 +976,18 @@ ssize_t rhizome_read_buffered(struct rhizome_read *read, struct rhizome_read_buf
return bytes_copied;
}
int rhizome_read_close(struct rhizome_read *read)
void rhizome_read_close(struct rhizome_read *read)
{
if (read->blob_fd >=0){
if (read->blob_fd != -1) {
if (config.debug.externalblobs)
DEBUGF("Closing store fd %d", read->blob_fd);
close(read->blob_fd);
read->blob_fd = -1;
}
read->blob_fd = -1;
if (read->invalid){
if (read->invalid) {
// delete payload!
rhizome_delete_file(&read->id);
}
return 0;
}
struct cache_entry{
@ -1045,7 +1091,7 @@ int rhizome_cache_count()
}
// read a block of data, caching meta data for reuse
int rhizome_read_cached(const rhizome_bid_t *bidp, uint64_t version, time_ms_t timeout, uint64_t fileOffset, unsigned char *buffer, size_t length)
ssize_t rhizome_read_cached(const rhizome_bid_t *bidp, uint64_t version, time_ms_t timeout, uint64_t fileOffset, unsigned char *buffer, size_t length)
{
// look for a cached entry
struct cache_entry **ptr = find_entry_location(&root, bidp, version);
@ -1057,9 +1103,24 @@ int rhizome_read_cached(const rhizome_bid_t *bidp, uint64_t version, time_ms_t t
if (rhizome_database_filehash_from_id(bidp, version, &filehash) == -1)
return -1;
entry = emalloc_zero(sizeof(struct cache_entry));
if (rhizome_open_read(&entry->read_state, &filehash)){
free(entry);
return WHYF("Payload %s not found", alloca_tohex_rhizome_filehash_t(filehash));
if (entry == NULL)
return -1;
enum rhizome_payload_status status = rhizome_open_read(&entry->read_state, &filehash);
switch (status) {
case RHIZOME_PAYLOAD_STATUS_EMPTY:
case RHIZOME_PAYLOAD_STATUS_STORED:
break;
case RHIZOME_PAYLOAD_STATUS_NEW:
free(entry);
return WHYF("Payload %s not found", alloca_tohex_rhizome_filehash_t(filehash));
case RHIZOME_PAYLOAD_STATUS_ERROR:
case RHIZOME_PAYLOAD_STATUS_WRONG_SIZE:
case RHIZOME_PAYLOAD_STATUS_WRONG_HASH:
case RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL:
free(entry);
return WHYF("Error opening payload %s", alloca_tohex_rhizome_filehash_t(filehash));
default:
FATALF("status = %d", status);
}
entry->bundle_id = *bidp;
entry->version = version;
@ -1115,15 +1176,16 @@ static int write_file(struct rhizome_read *read, const char *filepath){
return ret;
}
static int read_derive_key(rhizome_manifest *m, struct rhizome_read *read_state)
static enum rhizome_payload_status read_derive_key(rhizome_manifest *m, struct rhizome_read *read_state)
{
read_state->crypt = m->payloadEncryption == PAYLOAD_ENCRYPTED;
if (read_state->crypt){
// if the manifest specifies encryption, make sure we can generate the payload key and encrypt
// the contents as we go
if (rhizome_derive_payload_key(m)) {
if (!rhizome_derive_payload_key(m)) {
rhizome_read_close(read_state);
return WHY("Unable to decrypt bundle, valid key not found");
WHY("Unable to decrypt bundle, valid key not found");
return RHIZOME_PAYLOAD_STATUS_CRYPTO_FAIL;
}
if (config.debug.rhizome)
DEBUGF("Decrypting payload contents for bid=%s version=%"PRIu64, alloca_tohex_rhizome_bid_t(m->cryptoSignPublic), m->version);
@ -1132,51 +1194,48 @@ static int read_derive_key(rhizome_manifest *m, struct rhizome_read *read_state)
bcopy(m->payloadKey, read_state->key, sizeof(read_state->key));
bcopy(m->payloadNonce, read_state->nonce, sizeof(read_state->nonce));
}
return 0;
return RHIZOME_PAYLOAD_STATUS_STORED;
}
int rhizome_open_decrypt_read(rhizome_manifest *m, struct rhizome_read *read_state)
enum rhizome_payload_status rhizome_open_decrypt_read(rhizome_manifest *m, struct rhizome_read *read_state)
{
int ret = rhizome_open_read(read_state, &m->filehash);
if (ret == 0)
ret = read_derive_key(m, read_state);
return ret;
enum rhizome_payload_status status = rhizome_open_read(read_state, &m->filehash);
if (status == RHIZOME_PAYLOAD_STATUS_STORED)
status = read_derive_key(m, read_state);
return status;
}
/* Extract the file related to a manifest to the file system. The file will be de-crypted and
* verified while reading. If filepath is not supplied, the file will still be checked.
*
* Returns -1 on error, 0 if extracted successfully, 1 if not found.
*/
int rhizome_extract_file(rhizome_manifest *m, const char *filepath)
enum rhizome_payload_status rhizome_extract_file(rhizome_manifest *m, const char *filepath)
{
struct rhizome_read read_state;
bzero(&read_state, sizeof read_state);
int ret = rhizome_open_decrypt_read(m, &read_state);
if (ret == 0)
ret = write_file(&read_state, filepath);
enum rhizome_payload_status status = rhizome_open_decrypt_read(m, &read_state);
if (status == RHIZOME_PAYLOAD_STATUS_STORED) {
if (write_file(&read_state, filepath) == -1)
status = RHIZOME_PAYLOAD_STATUS_ERROR;
}
rhizome_read_close(&read_state);
return ret;
return status;
}
/* dump the raw contents of a file
*
* Returns -1 on error, 0 if dumped successfully, 1 if not found.
*/
int rhizome_dump_file(const rhizome_filehash_t *hashp, const char *filepath, uint64_t *lengthp)
enum rhizome_payload_status rhizome_dump_file(const rhizome_filehash_t *hashp, const char *filepath, uint64_t *lengthp)
{
struct rhizome_read read_state;
bzero(&read_state, sizeof read_state);
int ret = rhizome_open_read(&read_state, hashp);
if (ret == 0) {
ret = write_file(&read_state, filepath);
if (lengthp)
enum rhizome_payload_status status = rhizome_open_read(&read_state, hashp);
if (status == RHIZOME_PAYLOAD_STATUS_STORED) {
if (write_file(&read_state, filepath) == -1)
status = RHIZOME_PAYLOAD_STATUS_ERROR;
else if (lengthp)
*lengthp = read_state.length;
}
rhizome_read_close(&read_state);
return ret;
return status;
}
// pipe data from one payload to another
@ -1205,116 +1264,85 @@ static int rhizome_pipe(struct rhizome_read *read, struct rhizome_write *write,
return 0;
}
int rhizome_journal_pipe(struct rhizome_write *write, const rhizome_filehash_t *hashp, uint64_t start_offset, uint64_t length)
enum rhizome_payload_status rhizome_journal_pipe(struct rhizome_write *write, const rhizome_filehash_t *hashp, uint64_t start_offset, uint64_t length)
{
struct rhizome_read read_state;
bzero(&read_state, sizeof read_state);
if (rhizome_open_read(&read_state, hashp))
return -1;
read_state.offset = start_offset;
int ret = rhizome_pipe(&read_state, write, length);
enum rhizome_payload_status status = rhizome_open_read(&read_state, hashp);
if (status == RHIZOME_PAYLOAD_STATUS_STORED) {
read_state.offset = start_offset;
if (rhizome_pipe(&read_state, write, length) == -1)
status = RHIZOME_PAYLOAD_STATUS_ERROR;
}
rhizome_read_close(&read_state);
return ret;
return status;
}
// open an existing journal bundle, advance the head pointer, duplicate the existing content and get ready to add more.
int rhizome_write_open_journal(struct rhizome_write *write, rhizome_manifest *m, uint64_t advance_by, uint64_t new_size)
enum rhizome_payload_status rhizome_write_open_journal(struct rhizome_write *write, rhizome_manifest *m, uint64_t advance_by, uint64_t new_size)
{
int ret = 0;
assert(m->filesize != RHIZOME_SIZE_UNSET);
assert(m->filesize + new_size > 0);
assert(m->is_journal);
if (advance_by > m->filesize)
return WHY("Cannot advance past the existing content");
assert(advance_by <= m->filesize);
uint64_t copy_length = m->filesize - advance_by;
rhizome_manifest_set_filesize(m, m->filesize + new_size - advance_by);
if (advance_by > 0)
rhizome_manifest_set_tail(m, m->tail + advance_by);
rhizome_manifest_set_version(m, m->filesize);
ret = rhizome_open_write(write, NULL, m->filesize, RHIZOME_PRIORITY_DEFAULT);
if (ret)
goto failure;
if (copy_length>0){
enum rhizome_payload_status status = rhizome_open_write(write, NULL, m->filesize, RHIZOME_PRIORITY_DEFAULT);
if (status == RHIZOME_PAYLOAD_STATUS_NEW && copy_length > 0) {
// note that we don't need to bother decrypting the existing journal payload
ret = rhizome_journal_pipe(write, &m->filehash, advance_by, copy_length);
if (ret)
goto failure;
enum rhizome_payload_status rstatus = rhizome_journal_pipe(write, &m->filehash, advance_by, copy_length);
if (rstatus != RHIZOME_PAYLOAD_STATUS_STORED)
status = RHIZOME_PAYLOAD_STATUS_ERROR;
}
ret = rhizome_write_derive_key(m, write);
if (ret)
goto failure;
return 0;
failure:
if (ret)
if (status == RHIZOME_PAYLOAD_STATUS_NEW)
status = rhizome_write_derive_key(m, write);
if (status != RHIZOME_PAYLOAD_STATUS_NEW)
rhizome_fail_write(write);
return ret;
return status;
}
int rhizome_append_journal_buffer(rhizome_manifest *m, uint64_t advance_by, unsigned char *buffer, size_t len)
enum rhizome_payload_status rhizome_append_journal_buffer(rhizome_manifest *m, uint64_t advance_by, unsigned char *buffer, size_t len)
{
struct rhizome_write write;
bzero(&write, sizeof write);
int ret = rhizome_write_open_journal(&write, m, advance_by, (uint64_t) len);
if (ret)
return -1;
if (buffer && len){
ret = rhizome_write_buffer(&write, buffer, len);
if (ret)
goto failure;
}
ret = rhizome_finish_write(&write);
if (ret)
goto failure;
rhizome_manifest_set_filehash(m, &write.id);
return 0;
failure:
if (ret)
enum rhizome_payload_status status = rhizome_write_open_journal(&write, m, advance_by, (uint64_t) len);
if (status != RHIZOME_PAYLOAD_STATUS_NEW)
return status;
if (buffer && len && rhizome_write_buffer(&write, buffer, len) == -1) {
rhizome_fail_write(&write);
return ret;
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
status = rhizome_finish_write(&write);
if (status != RHIZOME_PAYLOAD_STATUS_NEW) {
rhizome_fail_write(&write);
return status;
}
rhizome_manifest_set_filehash(m, &write.id);
return status;
}
int rhizome_append_journal_file(rhizome_manifest *m, uint64_t advance_by, const char *filename)
enum rhizome_payload_status rhizome_append_journal_file(rhizome_manifest *m, uint64_t advance_by, const char *filename)
{
struct stat stat;
if (lstat(filename,&stat))
return WHYF_perror("stat(%s)", alloca_str_toprint(filename));
struct rhizome_write write;
bzero(&write, sizeof write);
int ret = rhizome_write_open_journal(&write, m, advance_by, stat.st_size);
if (ret)
return -1;
if (stat.st_size){
ret = rhizome_write_file(&write, filename);
if (ret)
goto failure;
}
ret = rhizome_finish_write(&write);
if (ret)
goto failure;
rhizome_manifest_set_filehash(m, &write.id);
return 0;
failure:
if (ret)
enum rhizome_payload_status status = rhizome_write_open_journal(&write, m, advance_by, stat.st_size);
if (status != RHIZOME_PAYLOAD_STATUS_NEW)
return status;
if (stat.st_size != 0 && rhizome_write_file(&write, filename) == -1) {
rhizome_fail_write(&write);
return ret;
return RHIZOME_PAYLOAD_STATUS_ERROR;
}
status = rhizome_finish_write(&write);
if (status != RHIZOME_PAYLOAD_STATUS_NEW) {
rhizome_fail_write(&write);
return status;
}
rhizome_manifest_set_filehash(m, &write.id);
return status;
}

View File

@ -33,6 +33,7 @@ setup_logging() {
set debug.meshms on \
set debug.rhizome on \
set debug.rhizome_manifest on \
set debug.externalblobs on \
set debug.rejecteddata on \
set log.console.level debug \
set log.console.show_time on
@ -72,6 +73,7 @@ test_MessageDelivery() {
tfw_log "CONV_BID=$CONV_BID CONV_SECRET=$CONV_SECRET"
# 5. mark the first message as read
executeOk_servald meshms read messages $SIDA2 $SIDA1 5
tfw_cat --stderr
check_meshms_bundles
executeOk_servald meshms list messages $SIDA2 $SIDA1
assertStdoutGrep --stdout --matches=1 "^0:19:<:How are you\$"
@ -99,14 +101,14 @@ check_meshms_bundles() {
# The only "file" bundle should be the conversation list
executeOk_servald rhizome list file
rhizome_list_unpack X
assert [ $XNROWS -eq 1 ]
assert [ ${XBID[0]} = $CONV_BID ]
assert --stdout --stderr [ $XNROWS -eq 1 ]
assert --stdout --stderr [ ${XBID[0]} = $CONV_BID ]
executeOk_servald rhizome extract bundle $CONV_BID manifest.conv payload.conv $CONV_SECRET
tfw_cat -v manifest.conv --hexdump payload.conv
# The only "MeshMS2" bundles should be the two ply bundles
executeOk_servald rhizome list MeshMS2
rhizome_list_unpack X
assert [ $XNROWS -eq 2 ]
assert --stdout [ $XNROWS -eq 2 ]
local bid
for bid in ${XBID[*]}; do
executeOk_servald rhizome extract bundle $bid manifest.$bid payload.$bid