Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions crates/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,37 @@ impl DriftClient {
})
}

/// Create a new `DriftClient` instance with explicit Ws PubSub URL
///
/// * `context` - devnet or mainnet
/// * `rpc_client` - an RpcClient instance
/// * `wallet` - wallet to use for tx signing convenience
/// * `ws_pubsub_url` - custom Ws PubSub URL
pub async fn new_with_ws_url(
context: Context,
rpc_client: RpcClient,
wallet: Wallet,
ws_pubsub_url: &str,
) -> SdkResult<Self> {
// check URL format here to fail early, otherwise happens at request time.
let _ = get_http_url(&rpc_client.url())?;
// validate ws url
let _ws_pubsub_url = get_ws_url(ws_pubsub_url)?;

Ok(Self {
backend: Box::leak(Box::new(
DriftClientBackend::new_with_explicit_ws_url(
context,
Arc::new(rpc_client),
ws_pubsub_url,
)
.await?,
)),
context,
wallet: wallet.into(),
})
}

pub async fn sync_user_accounts(&self, filters: Vec<RpcFilterType>) -> SdkResult<()> {
self.backend.account_map.sync_user_accounts(filters).await
}
Expand Down Expand Up @@ -1145,6 +1176,92 @@ impl DriftClientBackend {
})
}

pub async fn new_with_explicit_ws_url(
context: Context,
rpc_client: Arc<RpcClient>,
ws_pubsub_url: &str,
) -> SdkResult<Self> {
use std::time::Duration;

// Initialize PubsubClient with explicit URL
let pubsub_client = Arc::new(PubsubClient::new(ws_pubsub_url).await?);

let perp_market_map =
MarketMap::<PerpMarket>::new(Arc::clone(&pubsub_client), rpc_client.commitment());
let spot_market_map =
MarketMap::<SpotMarket>::new(Arc::clone(&pubsub_client), rpc_client.commitment());

let lut_pubkeys = context.luts();

let account_map = AccountMap::new(
Arc::clone(&pubsub_client),
Arc::clone(&rpc_client),
rpc_client.commitment(),
);

tokio::try_join!(
account_map.subscribe_account_polled(state_account(), Some(Duration::from_secs(180))),
account_map.subscribe_account_polled(
high_leverage_mode_account(),
Some(Duration::from_secs(180))
)
)?;

let (_, _, lut_accounts, state_account_data) = tokio::try_join!(
perp_market_map.sync(&rpc_client),
spot_market_map.sync(&rpc_client),
rpc_client
.get_multiple_accounts(lut_pubkeys)
.map_err(Into::into),
rpc_client
.get_account_data(state_account())
.map_err(Into::into),
)?;

let lookup_tables = lut_pubkeys
.iter()
.zip(lut_accounts.iter())
.map(|(pubkey, account_data)| {
utils::deserialize_alt(*pubkey, account_data.as_ref().unwrap())
.expect("LUT decodes")
})
.collect();

let mut all_oracles = Vec::<(MarketId, Pubkey, OracleSource)>::with_capacity(
perp_market_map.len() + spot_market_map.len(),
);
for market_oracle_info in perp_market_map
.oracles()
.iter()
.chain(spot_market_map.oracles().iter())
{
all_oracles.push(*market_oracle_info);
}

let oracle_map = OracleMap::new(
Arc::clone(&pubsub_client),
all_oracles.as_slice(),
rpc_client.commitment(),
);

Ok(Self {
rpc_client: Arc::clone(&rpc_client),
pubsub_client,
blockhash_subscriber: BlockhashSubscriber::new(Duration::from_secs(2), rpc_client),
program_data: ProgramData::new(
spot_market_map.values(),
perp_market_map.values(),
lookup_tables,
State::try_deserialize(&mut state_account_data.as_slice()).unwrap(),
),
account_map,
perp_market_map,
spot_market_map,
oracle_map,
grpc_unsub: RwLock::default(),
})
}

/// Returns true if `DriftClientBackend` is subscribed via gRPC
pub fn is_grpc_subscribed(&self) -> bool {
let unsub = self.grpc_unsub.read().unwrap();
Expand Down
Loading