Skip to content

keyhive_core: send doc initialization cgka ops to the listener #116

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions beelay/beelay-core/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ impl Driver {
}

pub(crate) fn step(&mut self, now: UnixTimestampMillis) -> EventResults {
if self.tx_commands.is_closed() {
let mut result = EventResults::default();
result.stopped = true;
return result;
}
*self.now.borrow_mut() = now;
self.executor.run_until_stalled();

Expand Down Expand Up @@ -146,6 +151,10 @@ impl Driver {
}
}

if self.tx_commands.is_closed() {
event_results.stopped = true;
}

event_results
}
}
Expand Down Expand Up @@ -303,6 +312,7 @@ async fn run_inner<R: rand::Rng + rand::CryptoRng + Clone + 'static>(
loops.reconcile(&ctx);
ctx.state().sessions().expire_sessions(now.borrow().clone());
}
tracing::trace!("driver loop completed");
}

#[derive(Debug, PartialEq, Eq)]
Expand Down
14 changes: 9 additions & 5 deletions beelay/beelay-core/src/loading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,16 @@ pub(crate) async fn load_keyhive<R: rand::Rng + rand::CryptoRng + Clone + 'stati
result
};
let events = crate::keyhive_storage::load_events(io).await;
tracing::trace!(num_events = events.len(), "loading keyhive events");
for event in events {
if let Err(e) = keyhive.receive_static_event(event) {
tracing::error!(err=?e, "failed to handle keyhive event");
}
tracing::trace!(num_events = events.len(), ?events, "loading keyhive events");
if let Err(e) = keyhive.ingest_unsorted_static_events(events) {
tracing::error!(err=?e, "failed to ingest keyhive events");
}
// for event in events {
// tracing::trace!(event=?event, "processing loaded event");
// if let Err(e) = keyhive.receive_static_event(event) {
// tracing::error!(err=?e, "failed to handle keyhive event");
// }
// }

(keyhive, rx)
}
Expand Down
57 changes: 57 additions & 0 deletions beelay/beelay-core/tests/keyhive_persistence.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::collections::HashMap;

use beelay_core::{Commit, CommitOrBundle};
use keyhive_core::debug_events::{terminal, Nicknames};
use network::Network;
use test_utils::init_logging;

mod network;

#[test]
fn decrypt_on_reload() {
    // Check that all the CGKA ops are persisted as they occur so we can reload
    // the beelay and still successfully decrypt
    init_logging();
    let mut network = Network::new();
    let peer1 = network.create_peer("peer1").build();

    let (doc_id, initial_commit) = network.beelay(&peer1).create_doc(vec![]).unwrap();
    // `commits` accumulates every commit we add, in order, so we can compare
    // against what `load_doc` returns after the reload.
    let mut commits = vec![CommitOrBundle::Commit(initial_commit.clone())];
    let mut last_commit = initial_commit;
    for i in 0..2 {
        let contents = format!("hello {}", i);
        let hash = blake3::hash(contents.as_bytes());
        let commit = Commit::new(
            vec![last_commit.hash()],
            "hello".into(),
            hash.as_bytes().into(),
        );
        network
            .beelay(&peer1)
            .add_commits(doc_id, vec![commit.clone()])
            .unwrap();
        // Record the commit we just added. (Pushing `last_commit` here would
        // duplicate the previous entry and drop the final commit from the
        // expected list.)
        commits.push(CommitOrBundle::Commit(commit.clone()));
        last_commit = commit;
    }

    keyhive_core::debug_events::terminal::print_event_table_verbose(
        network.beelay(&peer1).log_keyhive_events(
            Nicknames::default()
                .with_nickname(peer1.as_bytes(), "peer1")
                .with_nickname(doc_id.as_bytes(), "doc"),
        ),
    );

    network.reload_peer(&peer1);

    keyhive_core::debug_events::terminal::print_event_table_verbose(
        network.beelay(&peer1).log_keyhive_events(
            Nicknames::default()
                .with_nickname(peer1.as_bytes(), "peer1")
                .with_nickname(doc_id.as_bytes(), "doc"),
        ),
    );

    let doc = network.beelay(&peer1).load_doc(doc_id).unwrap();
    assert_eq!(doc, commits);
}
11 changes: 11 additions & 0 deletions beelay/beelay-core/tests/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,17 @@ impl Network {
peer_id
}

/// Shut a peer's beelay down, then start a fresh one against the same
/// storage and signing key — simulating a process restart.
pub fn reload_peer(&mut self, peer: &PeerId) {
    // Scope the mutable borrow taken by `beelay()` so it ends before we
    // remove the entry from the map below.
    {
        self.beelay(peer).shutdown();
    }
    let old = self.beelays.remove(peer).unwrap();
    let verifying_key = old.signing_key.verifying_key();
    let config = beelay_core::Config::new(rand::thread_rng(), verifying_key);
    self.load_peer(&old.nickname, config, old.storage, old.signing_key);
}

// Create a stream from left to right (i.e. the left peer will send the hello message)
#[allow(dead_code)]
pub fn connect_stream(&mut self, left: &PeerId, right: &PeerId) -> ConnectedPair {
Expand Down
6 changes: 5 additions & 1 deletion keyhive_core/src/keyhive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl<
}
}

let new_doc = Document::generate(
let (new_doc, ops) = Document::generate(
NonEmpty {
head: self.active.dupe().into(),
tail: coparents.into_iter().map(Into::into).collect(),
Expand All @@ -232,6 +232,9 @@ impl<
&mut self.csprng,
)
.await?;
for op in ops {
self.event_listener.on_cgka_op(&Rc::new(op)).await;
}

for head in new_doc.delegation_heads().values() {
self.delegations.insert(head.dupe());
Expand Down Expand Up @@ -1044,6 +1047,7 @@ impl<
if let CgkaOperation::Add { added_id, pk, .. } = signed_op.payload {
let active = self.active.borrow();
if active.id() == added_id {
tracing::info!("one of us!");
let sk = active
.prekey_pairs
.get(&pk)
Expand Down
4 changes: 4 additions & 0 deletions keyhive_core/src/principal/active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ impl<S: AsyncSigner, T: ContentRef, L: PrekeyListener> Active<S, T, L> {

/// Deserialize from storage.
pub fn from_archive(archive: &ActiveArchive, signer: S, listener: L) -> Self {
tracing::trace!(
num_prekey_pairs = archive.prekey_pairs.len(),
"loaded from archive"
);
Self {
prekey_pairs: archive.prekey_pairs.clone(),
individual: archive.individual.clone(),
Expand Down
22 changes: 12 additions & 10 deletions keyhive_core/src/principal/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl<S: AsyncSigner, T: ContentRef, L: MembershipListener<S, T>> Document<S, T,
listener: L,
signer: &S,
csprng: &mut R,
) -> Result<Self, GenerateDocError> {
) -> Result<(Self, Vec<Signed<CgkaOperation>>), GenerateDocError> {
let (group_result, group_vk) = EphemeralSigner::with_signer(csprng, |verifier, signer| {
Group::generate_after_content(
signer,
Expand Down Expand Up @@ -204,16 +204,18 @@ impl<S: AsyncSigner, T: ContentRef, L: MembershipListener<S, T>> Document<S, T,
let (_pcs_key, update_op) = cgka
.update(owner_share_key, owner_share_secret_key, signer, csprng)
.await?;
// FIXME: We don't currently do anything with these ops, but need to share them
// across the network.

ops.push(update_op);
Ok(Document {
group,
content_state: HashSet::new(),
content_heads: initial_content_heads.iter().cloned().collect(),
known_decryption_keys: HashMap::new(),
cgka: Some(cgka),
})
Ok((
Document {
group,
content_state: HashSet::new(),
content_heads: initial_content_heads.iter().cloned().collect(),
known_decryption_keys: HashMap::new(),
cgka: Some(cgka),
},
ops,
))
}

#[instrument(
Expand Down
57 changes: 55 additions & 2 deletions keyhive_core/tests/encrypt.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use keyhive_core::{
access::Access, crypto::signer::memory::MemorySigner, keyhive::Keyhive,
listener::no_listener::NoListener, store::ciphertext::memory::MemoryCiphertextStore,
access::Access,
archive::Archive,
crypto::signer::memory::MemorySigner,
event::static_event::StaticEvent,
keyhive::Keyhive,
listener::{log::Log, no_listener::NoListener},
store::ciphertext::memory::MemoryCiphertextStore,
};
use nonempty::nonempty;
use testresult::TestResult;
Expand Down Expand Up @@ -52,3 +57,51 @@ async fn test_encrypt_to_added_member() -> TestResult {
assert_eq!(decrypted, init_content);
Ok(())
}

#[tokio::test]
async fn test_decrypt_after_to_from_archive() {
    test_utils::init_logging();

    // Build a keyhive whose events are all captured by the `log` listener.
    let signing_key = MemorySigner::generate(&mut rand::thread_rng());
    let ciphertexts: MemoryCiphertextStore<[u8; 32], Vec<u8>> = MemoryCiphertextStore::new();
    let log = Log::new();
    let mut alice = Keyhive::generate(
        signing_key.clone(),
        ciphertexts,
        log.clone(),
        rand::thread_rng(),
    )
    .await
    .unwrap();

    // Snapshot the archive *before* the document exists; the logged events
    // must make up the difference after restoring from it.
    let archive = alice.into_archive();

    let init_content = b"hello world".to_vec();
    let init_hash = blake3::hash(&init_content);

    let doc = alice
        .generate_doc(vec![], nonempty![init_hash.into()])
        .await
        .unwrap();

    let encrypted = alice
        .try_encrypt_content(doc.clone(), &init_hash.into(), &vec![], &init_content)
        .await
        .unwrap();

    // Restore from the archive and replay everything the listener captured.
    let mut alice = Keyhive::try_from_archive(
        &archive,
        signing_key,
        MemoryCiphertextStore::new(),
        NoListener,
        rand::thread_rng(),
    )
    .unwrap();
    let events: Vec<_> = std::iter::from_fn(|| log.pop())
        .map(StaticEvent::from)
        .collect();
    alice.ingest_unsorted_static_events(events).unwrap();

    let doc = alice.get_document(doc.borrow().doc_id()).unwrap();

    let decrypted = alice
        .try_decrypt_content(doc.clone(), encrypted.encrypted_content())
        .unwrap();

    assert_eq!(decrypted, init_content);
}
Loading