Refactor ply append to open/write/close semantics

This commit is contained in:
Jeremy Lakeman 2017-02-13 10:55:21 +10:30
parent 5ccff053c2
commit ecc77b4088
2 changed files with 82 additions and 26 deletions

View File

@ -65,18 +65,24 @@ static int message_ply_fill_manifest(const keyring_identity *id, const sid_t *re
return ret;
}
int message_ply_append(const keyring_identity *id, const char *service, const sid_t *recipient, struct message_ply *ply, struct overlay_buffer *b,
const char *name, unsigned nassignments, const struct rhizome_manifest_field_assignment *assignments)
int message_ply_write_open(
struct message_ply_write *ply_write,
const struct keyring_identity *id,
const char *service,
const sid_t *recipient,
struct message_ply *ply,
const char *name,
unsigned nassignments,
const struct rhizome_manifest_field_assignment *assignments,
uint64_t advance_by)
{
rhizome_manifest *mout = NULL;
rhizome_manifest *m = rhizome_new_manifest();
if (!m)
bzero(ply_write, sizeof(*ply_write));
ply_write->m = rhizome_new_manifest();
if (!ply_write->m)
return -1;
int ret=-1;
if (ply->known_bid){
switch(message_ply_load_manifest(id, ply, m)){
switch(message_ply_load_manifest(id, ply, ply_write->m)){
case 0:
ply->found = 1;
break;
@ -84,50 +90,80 @@ int message_ply_append(const keyring_identity *id, const char *service, const si
ply->found = 0;
break;
default:
goto end;
return -1;
}
}
// TODO add sender name?
// if recipient, actual sender & name should be encrypted...
if (name)
rhizome_manifest_set_name(m, name);
rhizome_manifest_set_name(ply_write->m, name);
struct rhizome_bundle_result result = rhizome_apply_assignments(m, nassignments, assignments);
struct rhizome_bundle_result result = rhizome_apply_assignments(ply_write->m, nassignments, assignments);
if (result.status != RHIZOME_BUNDLE_STATUS_NEW){
WARNF("Cannot create message ply manifest: %s", alloca_rhizome_bundle_result(result));
rhizome_bundle_result_free(&result);
goto end;
return -1;
}
rhizome_bundle_result_free(&result);
if (!ply->found){
rhizome_manifest_set_service(m, service);
rhizome_manifest_set_service(ply_write->m, service);
if (ply->known_bid)
rhizome_manifest_set_id(m, &ply->bundle_id);
if (message_ply_fill_manifest(id, recipient, ply, m)!=0)
goto end;
rhizome_manifest_set_id(ply_write->m, &ply->bundle_id);
if (message_ply_fill_manifest(id, recipient, ply, ply_write->m)!=0)
return -1;
}
enum rhizome_payload_status pstatus = rhizome_append_journal_buffer(m, 0, ob_ptr(b), ob_position(b));
if (pstatus != RHIZOME_PAYLOAD_STATUS_NEW)
goto end;
enum rhizome_payload_status status = rhizome_write_open_journal(&ply_write->write, ply_write->m, advance_by, RHIZOME_SIZE_UNSET);
if (status != RHIZOME_PAYLOAD_STATUS_NEW)
return -1;
result = rhizome_manifest_finalise(m, &mout, 1);
return 0;
}
int message_ply_write_finish(struct message_ply_write *ply_write)
{
enum rhizome_payload_status status = rhizome_finish_write(&ply_write->write);
status = rhizome_finish_store(&ply_write->write, ply_write->m, status);
if (status != RHIZOME_PAYLOAD_STATUS_NEW)
return -1;
rhizome_manifest *mout = NULL;
struct rhizome_bundle_result result = rhizome_manifest_finalise(ply_write->m, &mout, 1);
if (result.status != RHIZOME_BUNDLE_STATUS_NEW){
WARNF("Cannot create message ply manifest: %s", alloca_rhizome_bundle_result(result));
rhizome_bundle_result_free(&result);
goto end;
return -1;
}
rhizome_bundle_result_free(&result);
if (mout && mout!=ply_write->m){
rhizome_manifest_free(ply_write->m);
ply_write->m = mout;
}
return 0;
}
ret = 0;
void message_ply_write_close(struct message_ply_write *ply_write)
{
rhizome_fail_write(&ply_write->write);
if (ply_write->m)
rhizome_manifest_free(ply_write->m);
ply_write->m = NULL;
}
int message_ply_append(const keyring_identity *id, const char *service, const sid_t *recipient, struct message_ply *ply, struct overlay_buffer *b,
const char *name, unsigned nassignments, const struct rhizome_manifest_field_assignment *assignments)
{
struct message_ply_write write;
int ret=-1;
if (message_ply_write_open(&write, id, service, recipient, ply, name, nassignments, assignments, 0) == -1)
goto end;
if (rhizome_write_buffer(&write.write, ob_ptr(b), ob_position(b)) == -1)
goto end;
ret = message_ply_write_finish(&write);
end:
if (mout && mout!=m)
rhizome_manifest_free(mout);
if (m)
rhizome_manifest_free(m);
message_ply_write_close(&write);
return ret;
}

View File

@ -38,6 +38,12 @@ struct message_ply_read {
uint8_t *record;
};
struct message_ply_write{
rhizome_manifest *m;
struct rhizome_write write;
};
int message_ply_read_open(struct message_ply_read *ply, const rhizome_bid_t *bid);
void message_ply_read_close(struct message_ply_read *ply);
int message_ply_read_prev(struct message_ply_read *ply);
@ -45,6 +51,20 @@ int message_ply_find_prev(struct message_ply_read *ply, char type);
int message_ply_is_open(struct message_ply_read *ply);
void message_ply_read_rewind(struct message_ply_read *ply);
int message_ply_write_open(
struct message_ply_write *ply_write,
const struct keyring_identity *id,
const char *service,
const sid_t *recipient,
struct message_ply *ply,
const char *name,
unsigned nassignments,
const struct rhizome_manifest_field_assignment *assignments,
uint64_t advance_by);
int message_ply_write_finish(struct message_ply_write *write);
void message_ply_write_close(struct message_ply_write *write);
void message_ply_append_ack(struct overlay_buffer *b, uint64_t message_offset, uint64_t previous_ack_offset);
void message_ply_append_timestamp(struct overlay_buffer *b);
void message_ply_append_message(struct overlay_buffer *b, const char *message, size_t message_len);