Skip to content

Commit

Permalink
feat(template): sync templates when syncing substates (#1242)
Browse files Browse the repository at this point in the history
Description
---
Synchronizing when a VN is just added to the network or syncing new
substates. We try to fetch template binary from its owning committee to
make our chance higher to get it.
There is also a logic put to periodically check for pending templates,
so if we know that we must have a template ready, just try to fetch
again from other VNs.

Motivation and Context
---
When new nodes are joining the network, they did not have the template
binaries as so we needed synchronization.

How Has This Been Tested?
---
1. Start a local swarm with X VNs
2. Publishing a new template (via CLI is simpler)
3. Adding a new VN (or more)
4. Mine to next epoch to make VN(s) available
5. See if new VN(s) have the new template already registered

What process can a PR reviewer use to test or verify this change?
---


Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify

---------

Co-authored-by: Stan Bondi <[email protected]>
  • Loading branch information
ksrichard and sdbondi authored Jan 20, 2025
1 parent b730be3 commit 6f3d0b6
Show file tree
Hide file tree
Showing 27 changed files with 890 additions and 59 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions applications/tari_dan_app_utilities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ tari_crypto = { workspace = true }
tari_consensus = { workspace = true }
tari_shutdown = { workspace = true }
tari_dan_common_types = { workspace = true }
tari_dan_p2p = { workspace = true }
tari_state_store_sqlite = { workspace = true }
tari_dan_engine = { workspace = true }
tari_dan_storage = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,21 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use tari_dan_common_types::NodeAddressable;
use std::time::Duration;

use tari_dan_common_types::{NodeAddressable, PeerAddress};
use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_shutdown::ShutdownSignal;
use tari_validator_node_rpc::client::TariValidatorNodeRpcClientFactory;
use tokio::{sync::mpsc, task::JoinHandle};

use super::{downloader::TemplateDownloadWorker, service::TemplateManagerService, TemplateManager};
use crate::template_manager::interface::TemplateManagerHandle;

pub fn spawn<TAddr: NodeAddressable + 'static>(
manager: TemplateManager<TAddr>,
epoch_manager: EpochManagerHandle<PeerAddress>,
client_factory: TariValidatorNodeRpcClientFactory,
shutdown: ShutdownSignal,
) -> (TemplateManagerHandle, JoinHandle<anyhow::Result<()>>) {
let (tx_request, rx_request) = mpsc::channel(1);
Expand All @@ -37,8 +43,16 @@ pub fn spawn<TAddr: NodeAddressable + 'static>(
let (tx_download_queue, rx_download_queue) = mpsc::channel(1);
let (tx_completed_downloads, rx_completed_downloads) = mpsc::channel(1);

let join_handle =
TemplateManagerService::spawn(rx_request, manager, tx_download_queue, rx_completed_downloads, shutdown);
let join_handle = TemplateManagerService::spawn(
rx_request,
manager,
epoch_manager,
tx_download_queue,
rx_completed_downloads,
client_factory,
Duration::from_secs(60),
shutdown,
);
TemplateDownloadWorker::new(rx_download_queue, tx_completed_downloads).spawn();
(handle, join_handle)
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,22 +125,73 @@ impl<TAddr: NodeAddressable> TemplateManager<TAddr> {
name: name.to_string(),
address,
binary_sha,
author_public_key: Default::default(),
},
executable: TemplateExecutable::CompiledWasm(compiled_code),
}
}

pub fn template_exists(&self, address: &TemplateAddress) -> Result<bool, TemplateManagerError> {
pub fn template_exists(
&self,
address: &TemplateAddress,
status: Option<TemplateStatus>,
) -> Result<bool, TemplateManagerError> {
if self.builtin_templates.contains_key(address) {
return Ok(true);
}
let mut tx = self.global_db.create_transaction()?;
self.global_db
.templates(&mut tx)
.template_exists(address)
.template_exists(address, status)
.map_err(|_| TemplateManagerError::TemplateNotFound { address: *address })
}

/// Deletes a template if exists.
pub fn delete_template(&self, address: &TemplateAddress) -> Result<(), TemplateManagerError> {
if !self.template_exists(address, None)? {
return Ok(());
}

let mut tx = self.global_db.create_transaction()?;
self.global_db
.templates(&mut tx)
.delete_template(address)
.map_err(|_| TemplateManagerError::TemplateDeleteFailed { address: *address })
}

/// Fetching all templates by addresses.
pub fn fetch_templates_by_addresses(
&self,
mut addresses: Vec<TemplateAddress>,
) -> Result<Vec<Template>, TemplateManagerError> {
let mut result = Vec::with_capacity(addresses.len());

// check in built-in templates first
let mut found_template_indexes = vec![];
for (i, address) in addresses.iter().enumerate() {
if let Some(template) = self.builtin_templates.get(address) {
result.push(template.clone());
found_template_indexes.push(i);
}
}
found_template_indexes.iter().for_each(|i| {
addresses.remove(*i);
});

// check the rest in DB
let mut tx = self.global_db.create_transaction()?;
self.global_db
.templates(&mut tx)
.get_templates_by_addresses(addresses.iter().map(|addr| addr.as_ref()).collect())
.map_err(|_| TemplateManagerError::TemplatesNotFound { addresses })?
.iter()
.for_each(|template| {
result.push(Template::from(template.clone()));
});

Ok(result)
}

pub fn fetch_template(&self, address: &TemplateAddress) -> Result<Template, TemplateManagerError> {
// first of all, check if the address is for a bulitin template
if let Some(template) = self.builtin_templates.get(address) {
Expand Down Expand Up @@ -190,6 +241,28 @@ impl<TAddr: NodeAddressable> TemplateManager<TAddr> {
Ok(templates)
}

pub fn add_pending_template(
&self,
template_address: tari_engine_types::TemplateAddress,
epoch: Epoch,
) -> Result<(), TemplateManagerError> {
let template = DbTemplate::empty_pending(template_address, epoch);

let mut tx = self.global_db.create_transaction()?;
let mut templates_db = self.global_db.templates(&mut tx);
match templates_db.get_template(&template.template_address)? {
Some(_) => templates_db.update_template(
&template.template_address,
DbTemplateUpdate::status(TemplateStatus::Pending),
)?,
None => templates_db.insert_template(template)?,
}

tx.commit()?;

Ok(())
}

pub(super) fn add_template(
&self,
author_public_key: PublicKey,
Expand Down
Loading

0 comments on commit 6f3d0b6

Please sign in to comment.