Skip to content

Commit

Permalink
Adds basic reorg logic
Browse files Browse the repository at this point in the history
- Replaces the height count by an enum that keeps track of the confirmation status.
- Gets rid of missing_confirmations.
- Replaces last_known_block_header by last_known_block_height in Gatekeeper and Watcher.
Removes last_known_block_header in the Responder.
- Fixes block order in Watcher::LocatorCache.

last_known_block_header was only being used to query the height when needed, so there's no real needed
to store the whole header. With respect to the Responder, the data was not being used, only updated.

LocatorCache's blocks where stored in reverse order. The bug was introduced in
df120e5 when trying to actually fix that. Tests were wrong.
  • Loading branch information
sr-gi committed Mar 11, 2022
1 parent d285905 commit 71747fd
Show file tree
Hide file tree
Showing 9 changed files with 1,153 additions and 619 deletions.
7 changes: 3 additions & 4 deletions teos/src/api/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -794,10 +794,9 @@ mod tests_methods {
// Add the appointment to the Responder so it counts as triggered
let appointment = generate_dummy_appointment(None).inner;
let signature = cryptography::sign(&appointment.serialize(), &user_sk).unwrap();
internal_api.get_watcher().add_random_tracker_to_responder(
UUID::new(appointment.locator, UserId(user_pk)),
Some(1),
);
internal_api
.get_watcher()
.add_random_tracker_to_responder(UUID::new(appointment.locator, UserId(user_pk)));

// Try to add it via the http API
assert_eq!(
Expand Down
6 changes: 3 additions & 3 deletions teos/src/api/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ mod tests_private_api {
let appointment = generate_dummy_appointment(None).inner;
internal_api
.watcher
.add_random_tracker_to_responder(UUID::new(appointment.locator, user_id), Some(1));
.add_random_tracker_to_responder(UUID::new(appointment.locator, user_id));

let response = internal_api
.get_all_appointments(Request::new(()))
Expand Down Expand Up @@ -442,7 +442,7 @@ mod tests_private_api {
let appointment = generate_dummy_appointment(None).inner;
internal_api
.watcher
.add_random_tracker_to_responder(UUID::new(appointment.locator, user_id), Some(1));
.add_random_tracker_to_responder(UUID::new(appointment.locator, user_id));
}

let response = internal_api
Expand Down Expand Up @@ -794,7 +794,7 @@ mod tests_public_api {
let user_signature = cryptography::sign(&appointment.serialize(), &user_sk).unwrap();
internal_api
.watcher
.add_random_tracker_to_responder(UUID::new(appointment.locator, user_id), Some(1));
.add_random_tracker_to_responder(UUID::new(appointment.locator, user_id));

match internal_api
.add_appointment(Request::new(msgs::AddAppointmentRequest {
Expand Down
258 changes: 162 additions & 96 deletions teos/src/carrier.rs

Large diffs are not rendered by default.

49 changes: 32 additions & 17 deletions teos/src/dbm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ use teos_common::UserId;

use crate::extended_appointment::{compute_appointment_slots, ExtendedAppointment, UUID};
use crate::gatekeeper::UserInfo;
use crate::responder::TransactionTracker;
use crate::responder::{ConfirmationStatus, TransactionTracker};

/// Packs the errors than can raise when interacting with the underlying database.
#[derive(Debug)]
pub enum Error {
AlreadyExists,
MissingForeignKey,
MissingField,
NotFound,
Unknown(SqliteError),
}
Expand Down Expand Up @@ -90,7 +91,8 @@ impl DBM {
UUID INT PRIMARY KEY,
dispute_tx BLOB NOT NULL,
penalty_tx BLOB NOT NULL,
height INT,
height INT NOT NULL,
confirmed BOOL NOT NULL,
FOREIGN KEY(UUID)
REFERENCES appointments(UUID)
ON DELETE CASCADE
Expand Down Expand Up @@ -455,15 +457,18 @@ impl DBM {
uuid: UUID,
tracker: &TransactionTracker,
) -> Result<(), Error> {
let (height, confirmed) = tracker.status.to_db_data().ok_or(Error::MissingField)?;

let query =
"INSERT INTO trackers (UUID, dispute_tx, penalty_tx, height) VALUES (?1, ?2, ?3, ?4)";
"INSERT INTO trackers (UUID, dispute_tx, penalty_tx, height, confirmed) VALUES (?1, ?2, ?3, ?4, ?5)";
match self.store_data(
query,
params![
uuid.serialize(),
tracker.dispute_tx.serialize(),
tracker.penalty_tx.serialize(),
tracker.height,
height,
confirmed,
],
) {
Ok(x) => {
Expand All @@ -488,14 +493,15 @@ impl DBM {
let dispute_tx = deserialize::<Transaction>(&raw_dispute_tx).unwrap();
let raw_penalty_tx: Vec<u8> = row.get(2).unwrap();
let penalty_tx = deserialize::<Transaction>(&raw_penalty_tx).unwrap();
let height: Option<u32> = row.get(3).unwrap();
let raw_userid: Vec<u8> = row.get(4).unwrap();
let height: u32 = row.get(3).unwrap();
let confirmed: bool = row.get(4).unwrap();
let raw_userid: Vec<u8> = row.get(5).unwrap();
let user_id = UserId::deserialize(&raw_userid).unwrap();

Ok(TransactionTracker {
dispute_tx,
penalty_tx,
height,
status: ConfirmationStatus::from_db_data(height, confirmed),
user_id,
})
})
Expand All @@ -518,16 +524,17 @@ impl DBM {
let dispute_tx = deserialize::<Transaction>(&raw_dispute_tx).unwrap();
let raw_penalty_tx: Vec<u8> = row.get(2).unwrap();
let penalty_tx = deserialize::<Transaction>(&raw_penalty_tx).unwrap();
let height: Option<u32> = row.get(3).unwrap();
let raw_userid: Vec<u8> = row.get(4).unwrap();
let height: u32 = row.get(3).unwrap();
let confirmed: bool = row.get(4).unwrap();
let raw_userid: Vec<u8> = row.get(5).unwrap();
let user_id = UserId::deserialize(&raw_userid).unwrap();

trackers.insert(
uuid,
TransactionTracker {
dispute_tx,
penalty_tx,
height,
status: ConfirmationStatus::from_db_data(height, confirmed),
user_id,
},
);
Expand Down Expand Up @@ -764,7 +771,8 @@ mod tests {
let mut dbm = DBM::in_memory().unwrap();
let uuid = generate_uuid();
let appointment = generate_dummy_appointment(None);
let tracker = get_random_tracker(appointment.user_id, None);
// The confirmation status doesn't really matter here, it can be any of {ConfirmedIn, InMempoolSince}.
let tracker = get_random_tracker(appointment.user_id, ConfirmationStatus::ConfirmedIn(100));

// Add the user and link an appointment (this is usually done once the appointment)
// is added after the user creation, but for the test purpose it can be done all at once.
Expand Down Expand Up @@ -915,7 +923,8 @@ mod tests {
let (uuid, appointment) = generate_dummy_appointment_with_user(user_id, None);
dbm.store_appointment(uuid, &appointment).unwrap();

let tracker = get_random_tracker(user_id, Some(100));
// The confirmation status doesn't really matter here, it can be any of {ConfirmedIn, InMempoolSince}.
let tracker = get_random_tracker(user_id, ConfirmationStatus::InMempoolSince(100));
dbm.store_tracker(uuid, &tracker).unwrap();

// We should get all the appointments back except from the triggered one
Expand Down Expand Up @@ -980,7 +989,8 @@ mod tests {
let mut dbm = DBM::in_memory().unwrap();
let uuid = generate_uuid();
let appointment = generate_dummy_appointment(None);
let tracker = get_random_tracker(appointment.user_id, None);
// The confirmation status doesn't really matter here, it can be any of {ConfirmedIn, InMempoolSince}.
let tracker = get_random_tracker(appointment.user_id, ConfirmationStatus::ConfirmedIn(21));

let info = UserInfo::new(21, 42);

Expand Down Expand Up @@ -1063,7 +1073,8 @@ mod tests {
let (uuid, appointment) = generate_dummy_appointment_with_user(user_id, None);
dbm.store_appointment(uuid, &appointment).unwrap();

let tracker = get_random_tracker(user_id, Some(21));
// The confirmation status doesn't really matter here, it can be any of {ConfirmedIn, InMempoolSince}.
let tracker = get_random_tracker(user_id, ConfirmationStatus::ConfirmedIn(21));
assert!(matches!(dbm.store_tracker(uuid, &tracker), Ok { .. }));
assert_eq!(dbm.load_tracker(uuid).unwrap(), tracker);
}
Expand All @@ -1079,7 +1090,8 @@ mod tests {
let (uuid, appointment) = generate_dummy_appointment_with_user(user_id, None);
dbm.store_appointment(uuid, &appointment).unwrap();

let tracker = get_random_tracker(user_id, Some(42));
// The confirmation status doesn't really matter here, it can be any of {ConfirmedIn, InMempoolSince}.
let tracker = get_random_tracker(user_id, ConfirmationStatus::InMempoolSince(42));
assert!(matches!(dbm.store_tracker(uuid, &tracker), Ok { .. }));

// Try to store it again, but it shouldn't go through
Expand All @@ -1095,7 +1107,9 @@ mod tests {

let uuid = generate_uuid();
let user_id = get_random_user_id();
let tracker = get_random_tracker(user_id, None);

// The confirmation status doesn't really matter here, it can be any of {ConfirmedIn, InMempoolSince}.
let tracker = get_random_tracker(user_id, ConfirmationStatus::InMempoolSince(42));

assert!(matches!(
dbm.store_tracker(uuid, &tracker),
Expand Down Expand Up @@ -1124,7 +1138,8 @@ mod tests {
let (uuid, appointment) = generate_dummy_appointment_with_user(user_id, None);
dbm.store_appointment(uuid, &appointment).unwrap();

let tracker = get_random_tracker(user_id, None);
// The confirmation status doesn't really matter here, it can be any of {ConfirmedIn, InMempoolSince}.
let tracker = get_random_tracker(user_id, ConfirmationStatus::InMempoolSince(42));
dbm.store_tracker(uuid, &tracker).unwrap();
trackers.insert(uuid, tracker);
}
Expand Down
78 changes: 50 additions & 28 deletions teos/src/gatekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
use std::collections::{HashMap, HashSet};
use std::iter::FromIterator;
use std::ops::Deref;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};

use lightning::chain;
use lightning_block_sync::{poll::ValidatedBlockHeader, BlockHeaderData};

use teos_common::constants::ENCRYPTED_BLOB_MAX_SIZE;
use teos_common::cryptography;
Expand Down Expand Up @@ -75,7 +74,7 @@ pub(crate) struct MaxSlotsReached;
#[derive(Debug)]
pub struct Gatekeeper {
/// last known block header by the [Gatekeeper].
last_known_block_header: Mutex<BlockHeaderData>,
last_known_block_height: AtomicU32,
/// Number of slots new subscriptions get by default.
subscription_slots: u32,
/// Expiry time new subscription get by default, in blocks (starting from the block the subscription is requested).
Expand All @@ -91,15 +90,15 @@ pub struct Gatekeeper {
impl Gatekeeper {
/// Creates a new [Gatekeeper] instance.
pub fn new(
last_known_block_header: ValidatedBlockHeader,
last_known_block_height: u32,
subscription_slots: u32,
subscription_duration: u32,
expiry_delta: u32,
dbm: Arc<Mutex<DBM>>,
) -> Self {
let registered_users = dbm.lock().unwrap().load_all_users();
Gatekeeper {
last_known_block_header: Mutex::new(*last_known_block_header.deref()),
last_known_block_height: AtomicU32::new(last_known_block_height),
subscription_slots,
subscription_duration,
expiry_delta,
Expand Down Expand Up @@ -159,7 +158,7 @@ impl Gatekeeper {
&self,
user_id: UserId,
) -> Result<RegistrationReceipt, MaxSlotsReached> {
let block_count = self.last_known_block_header.lock().unwrap().height;
let block_count = self.last_known_block_height.load(Ordering::Acquire);

// TODO: For now, new calls to `add_update_user` add subscription_slots to the current count and reset the expiry time
let mut registered_users = self.registered_users.lock().unwrap();
Expand Down Expand Up @@ -238,7 +237,7 @@ impl Gatekeeper {
Err(AuthenticationFailure("User not found.")),
|user_info| {
Ok((
self.last_known_block_header.lock().unwrap().height
self.last_known_block_height.load(Ordering::Acquire)
>= user_info.subscription_expiry,
user_info.subscription_expiry,
))
Expand Down Expand Up @@ -316,19 +315,17 @@ impl chain::Listen for Gatekeeper {
.retain(|id, _| !outdated_users.contains(id));
self.dbm.lock().unwrap().batch_remove_users(&outdated_users);

// Update last known block
*self.last_known_block_header.lock().unwrap() = BlockHeaderData {
header: block.header,
height,
chainwork: block.header.work(),
};
// Update last known block height
self.last_known_block_height
.store(height, Ordering::Release);
}

/// FIXME: To be implemented.
/// This will handle reorgs on the [Gatekeeper].
#[allow(unused_variables)]
/// Handles reorgs in the [Gatekeeper]. Simply updates the last_known_block_height.
fn block_disconnected(&self, header: &bitcoin::BlockHeader, height: u32) {
todo!()
log::warn!("Block disconnected: {}", header.block_hash());
// There's nothing to be done here but updating the last known block
self.last_known_block_height
.store(height - 1, Ordering::Release);
}
}

Expand All @@ -355,8 +352,8 @@ mod tests {
&& self.subscription_duration == other.subscription_duration
&& self.expiry_delta == other.expiry_delta
&& *self.registered_users.lock().unwrap() == *other.registered_users.lock().unwrap()
&& *self.last_known_block_header.lock().unwrap()
== *other.last_known_block_header.lock().unwrap()
&& self.last_known_block_height.load(Ordering::Relaxed)
== other.last_known_block_height.load(Ordering::Relaxed)
}
}
impl Eq for Gatekeeper {}
Expand Down Expand Up @@ -385,9 +382,8 @@ mod tests {
}

fn init_gatekeeper(chain: &Blockchain) -> Gatekeeper {
let tip = chain.tip();
let dbm = Arc::new(Mutex::new(DBM::in_memory().unwrap()));
Gatekeeper::new(tip, SLOTS, DURATION, EXPIRY_DELTA, dbm)
Gatekeeper::new(chain.get_block_count(), SLOTS, DURATION, EXPIRY_DELTA, dbm)
}

#[test]
Expand All @@ -396,7 +392,13 @@ mod tests {
let chain = Blockchain::default().with_height(START_HEIGHT);
let dbm = Arc::new(Mutex::new(DBM::in_memory().unwrap()));

let gatekeeper = Gatekeeper::new(chain.tip(), SLOTS, DURATION, EXPIRY_DELTA, dbm.clone());
let gatekeeper = Gatekeeper::new(
chain.get_block_count(),
SLOTS,
DURATION,
EXPIRY_DELTA,
dbm.clone(),
);
assert!(gatekeeper.is_fresh());

// If we add some users and appointments to the system and create a new Gatekeeper reusing the same db
Expand All @@ -419,7 +421,8 @@ mod tests {
}

// Create a new GK reusing the same DB and check that the data is loaded
let another_gk = Gatekeeper::new(chain.tip(), SLOTS, DURATION, EXPIRY_DELTA, dbm);
let another_gk =
Gatekeeper::new(chain.get_block_count(), SLOTS, DURATION, EXPIRY_DELTA, dbm);
assert!(!another_gk.is_fresh());
assert_eq!(gatekeeper, another_gk);
}
Expand Down Expand Up @@ -474,7 +477,9 @@ mod tests {

// Let generate a new block and add the user again to check that both the slots and expiry are updated.
chain.generate(None);
*gatekeeper.last_known_block_header.lock().unwrap() = *chain.tip().deref();
gatekeeper
.last_known_block_height
.store(chain.get_block_count(), Ordering::Relaxed);
let updated_receipt = gatekeeper.add_update_user(user_id).unwrap();

assert_eq!(
Expand Down Expand Up @@ -801,7 +806,7 @@ mod tests {
fn test_block_connected() {
// block_connected in the Gatekeeper is used to keep track of time in order to manage the users' subscription expiry.
// Remove users that get outdated at the new block's height from registered_users and the database.
let chain = Blockchain::default().with_height(START_HEIGHT);
let mut chain = Blockchain::default().with_height(START_HEIGHT);
let gatekeeper = init_gatekeeper(&chain);

// Check that users are outdated when the expected height if hit
Expand All @@ -814,7 +819,7 @@ mod tests {
}

// Connect a new block. Outdated users are deleted
gatekeeper.block_connected(chain.blocks.last().unwrap(), chain.tip().height + 1);
gatekeeper.block_connected(&chain.generate(None), chain.get_block_count());

// Check that users have been removed from registered_users and the database
for user_id in &[user1_id, user2_id, user3_id] {
Expand All @@ -831,8 +836,25 @@ mod tests {

// Check that the last_known_block_header has been properly updated
assert_eq!(
gatekeeper.last_known_block_header.lock().unwrap().header,
chain.tip().header
gatekeeper.last_known_block_height.load(Ordering::Relaxed),
chain.get_block_count()
);
}

#[test]
fn test_block_disconnected() {
// Block disconnected simply updates the last known block
let chain = Blockchain::default().with_height(START_HEIGHT);
let gatekeeper = init_gatekeeper(&chain);
let height = chain.get_block_count();

let last_known_block_header = chain.tip();
let prev_block_header = chain.at_height((height - 1) as usize);

gatekeeper.block_disconnected(&last_known_block_header.header, height);
assert_eq!(
gatekeeper.last_known_block_height.load(Ordering::Relaxed),
prev_block_header.height
);
}
}
Loading

0 comments on commit 71747fd

Please sign in to comment.