11use {
22 crate :: retry:: retry_etcd,
33 futures:: StreamExt ,
4- std:: { sync :: Arc , time:: Duration } ,
4+ std:: time:: Duration ,
55 tokio:: {
6- sync:: { broadcast, oneshot , Mutex } ,
7- task:: JoinSet ,
6+ sync:: { broadcast, mpsc , oneshot } ,
7+ task:: { JoinHandle , JoinSet } ,
88 time:: Instant ,
99 } ,
1010 tracing:: { error, warn} ,
@@ -128,8 +128,7 @@ impl ManagedLease {
128128///
129129#[ derive( Clone ) ]
130130pub struct ManagedLeaseFactory {
131- etcd : etcd_client:: Client ,
132- js : Arc < Mutex < JoinSet < ( ) > > > ,
131+ cnc_tx : mpsc:: Sender < ManagedLeaseRuntimeCommand > ,
133132}
134133
135134///
@@ -156,58 +155,75 @@ impl Clone for LeaseExpiredNotify {
156155 }
157156}
158157
159- impl ManagedLeaseFactory {
160- pub fn new ( etcd : etcd_client:: Client ) -> Self {
161- let js = Arc :: new ( Mutex :: new ( JoinSet :: new ( ) ) ) ;
162- let js2 = Arc :: clone ( & js) ;
163-
164- tokio:: spawn ( async move {
165- loop {
166- tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
167- {
168- let mut lock = js2. lock ( ) . await ;
169- while let Some ( result) = lock. try_join_next ( ) {
170- if let Err ( e) = result {
171- error ! ( "detected managed lease thread failed with: {e:?}" ) ;
172- } else {
173- tracing:: info!( "detected managed lease thread finished" ) ;
174- }
175- }
176- }
177- }
178- } ) ;
158+ struct CreateLeaseCommand {
159+ ttl : Duration ,
160+ keepalive_interval : Option < Duration > ,
161+ auto_refresh_limit : Option < usize > ,
162+ callback : oneshot:: Sender < Result < ManagedLease , CreateLeaseError > > ,
163+ }
179164
180- Self { etcd, js }
181- }
165+ enum ManagedLeaseRuntimeCommand {
166+ CreateLease ( CreateLeaseCommand ) ,
167+ }
182168
169+ ///
170+ /// Managed lease factory runtime that will handle the lease creation and keep alive.
171+ /// This is a separate task that will run in the background.
172+ ///
173+ struct ManagedLeaseFactoryRuntime {
183174 ///
184- /// Shutdown the lease factory and revoke all leases .
175+ /// The etcd client to use .
185176 ///
186- /// Becareful calling this method as it will wait for all lease to be revoked.
177+ etcd : etcd_client:: Client ,
178+
187179 ///
188- pub async fn shutdown ( self , timeout : Duration ) {
189- let mut lock = self . js . lock ( ) . await ;
190- let _ = tokio:: time:: timeout ( timeout, lock. shutdown ( ) ) . await ;
191- }
180+ /// The runtime handle to spawn tasks on.
181+ ///
182+ rt : tokio:: runtime:: Handle ,
192183
193- pub async fn new_lease_with_auto_refresh_limit (
194- & self ,
195- ttl : Duration ,
196- keepalive_interval : Option < Duration > ,
197- auto_refresh_limit : Option < usize > ,
198- ) -> Result < ManagedLease , etcd_client:: Error > {
199- let ttl_secs: i64 = ttl. as_secs ( ) as i64 ;
200- assert ! ( ttl_secs >= 2 , "lease ttl must be at least two (2) seconds" ) ;
201- let lease_id = retry_etcd ( self . etcd . clone ( ) , ( ) , move |mut etcd, _| async move {
184+ ///
185+ /// The join set to manage the tasks.
186+ ///
187+ js : JoinSet < ( ) > ,
188+
189+ ///
190+ /// The channel to notify the runtime to shutdown.
191+ ///
192+ cnc_rx : mpsc:: Receiver < ManagedLeaseRuntimeCommand > ,
193+ }
194+
195+ #[ derive( Debug , thiserror:: Error ) ]
196+ pub enum CreateLeaseError {
197+ #[ error( "lease creation failed" ) ]
198+ EtcdError ( #[ from] etcd_client:: Error ) ,
199+ #[ error( "invalid lease ttl, must be at least 2 seconds" ) ]
200+ InvalidTTL ,
201+ }
202+
203+ impl ManagedLeaseFactoryRuntime {
204+ async fn handle_create_lease ( & mut self , cmd : CreateLeaseCommand ) {
205+ let CreateLeaseCommand {
206+ ttl,
207+ keepalive_interval,
208+ auto_refresh_limit,
209+ callback,
210+ } = cmd;
211+ let ttl_secs = ttl. as_secs ( ) as i64 ;
212+ let lease_result = retry_etcd ( self . etcd . clone ( ) , ( ) , move |mut etcd, _| async move {
202213 etcd. lease_grant ( ttl_secs, None ) . await
203214 } )
204- . await ?
205- . id ( ) ;
206- let ( stop_tx, mut stop_rx) = oneshot:: channel ( ) ;
215+ . await ;
216+ let lease_id = match lease_result {
217+ Ok ( lease) => lease. id ( ) ,
218+ Err ( e) => {
219+ let _ = callback. send ( Err ( e. into ( ) ) ) ;
220+ return ;
221+ }
222+ } ;
223+ let ( stop_tx, mut stop_rx) = oneshot:: channel :: < ( ) > ( ) ;
207224 let client = self . etcd . clone ( ) ;
208- let mut lock = self . js . lock ( ) . await ;
209225 let ( tx_expired, rx_expired) = broadcast:: channel ( 1 ) ;
210- let _ = lock . spawn ( async move {
226+ let _ah = self . js . spawn_on ( async move {
211227 let mut refresh_count = 0 ;
212228 ' outer: loop {
213229 let first_keep_alive = Instant :: now ( ) ;
@@ -305,14 +321,131 @@ impl ManagedLeaseFactory {
305321 }
306322 }
307323 let _ = tx_expired. send ( ( ) ) ;
308- } ) ;
309-
310- Ok ( ManagedLease {
324+ } , & self . rt ) ;
325+ let lease = ManagedLease {
311326 etcd : self . etcd . clone ( ) ,
312327 lease_id,
313328 _tx_terminate : stop_tx,
314329 rx_lease_expire : rx_expired,
315- } )
330+ } ;
331+ let _ = callback. send ( Ok ( lease) ) ;
332+ }
333+
334+ async fn handle_command ( & mut self , cmd : ManagedLeaseRuntimeCommand ) {
335+ match cmd {
336+ ManagedLeaseRuntimeCommand :: CreateLease ( cmd) => {
337+ self . handle_create_lease ( cmd) . await ;
338+ }
339+ }
340+ }
341+
342+ async fn run ( mut self ) {
343+ loop {
344+ // Loops ends when both the command channel and the join set are closed.
345+ // When command-and-control channel is closed, it means no `ManagedLease` exists anymore.
346+ // However, the join set may still have tasks running, we must wait for them to finish.
347+ tokio:: select! {
348+ Some ( cmd) = self . cnc_rx. recv( ) => {
349+ self . handle_command( cmd) . await ;
350+ }
351+ Some ( res) = self . js. join_next( ) => {
352+ match res {
353+ Ok ( _) => {
354+ // task completed successfully
355+ tracing:: trace!( "managed lease task completed" ) ;
356+ }
357+ Err ( e) => {
358+ tracing:: warn!( "task failed: {e:?}" ) ;
359+ }
360+ }
361+ }
362+ else => {
363+ break ;
364+ }
365+ }
366+ }
367+ tracing:: trace!( "managed lease factory runtime exiting" ) ;
368+ }
369+ }
370+
371+ impl ManagedLeaseFactory {
372+ ///
373+ /// Create a new managed lease factory.
374+ /// This will spawn a new task that will handle the lease creation and keep alive.
375+ ///
376+ pub fn spawn ( etcd : etcd_client:: Client ) -> ( Self , JoinHandle < ( ) > ) {
377+ Self :: spawn_on ( etcd, tokio:: runtime:: Handle :: current ( ) )
378+ }
379+
380+ ///
381+ /// Create a new managed lease factory.
382+ /// This will spawn a new task that will handle the lease creation and keep alive.
383+ ///
384+ /// Arguments:
385+ /// * `etcd` - The etcd client to use.
386+ /// * `rt` - The runtime handle to spawn tasks on.
387+ pub fn spawn_on (
388+ etcd : etcd_client:: Client ,
389+ rt : tokio:: runtime:: Handle ,
390+ ) -> ( Self , JoinHandle < ( ) > ) {
391+ let ( cnc_tx, cnc_rx) = mpsc:: channel ( 100 ) ;
392+ let lease_rt = ManagedLeaseFactoryRuntime {
393+ etcd,
394+ rt : rt. clone ( ) ,
395+ js : JoinSet :: new ( ) ,
396+ cnc_rx,
397+ } ;
398+ let jh = rt. spawn ( lease_rt. run ( ) ) ;
399+ (
400+ Self {
401+ cnc_tx : cnc_tx. clone ( ) ,
402+ } ,
403+ jh,
404+ )
405+ }
406+
407+ ///
408+ /// Create a new managed lease with the given time-to-live (TTL), keepalive interval and auto refresh limit.
409+ /// The lease will be kept alive until it is dropped OR until the lease has been refresh `auto_refresh_limit` times.
410+ ///
411+ /// Arguments:
412+ ///
413+ /// * `ttl` - The time-to-live for the lease.
414+ /// * `keepalive_interval` - The interval to keep the lease alive.
415+ /// * `auto_refresh_limit` - The number of times to auto refresh the lease.
416+ ///
417+ pub async fn new_lease_with_auto_refresh_limit (
418+ & self ,
419+ ttl : Duration ,
420+ keepalive_interval : Option < Duration > ,
421+ auto_refresh_limit : Option < usize > ,
422+ ) -> Result < ManagedLease , etcd_client:: Error > {
423+ let ttl_secs: i64 = ttl. as_secs ( ) as i64 ;
424+ assert ! ( ttl_secs >= 2 , "lease ttl must be at least two (2) seconds" ) ;
425+ let ( callback_tx, callback_rx) = oneshot:: channel ( ) ;
426+ let command = CreateLeaseCommand {
427+ ttl,
428+ keepalive_interval,
429+ auto_refresh_limit,
430+ callback : callback_tx,
431+ } ;
432+ self . cnc_tx
433+ . send ( ManagedLeaseRuntimeCommand :: CreateLease ( command) )
434+ . await
435+ . expect ( "failed to send command to managed lease factory" ) ;
436+
437+ let result = callback_rx
438+ . await
439+ . expect ( "failed to receive result from managed lease factory" ) ;
440+ match result {
441+ Ok ( lease) => Ok ( lease) ,
442+ Err ( e) => match e {
443+ CreateLeaseError :: EtcdError ( e) => Err ( e) ,
444+ CreateLeaseError :: InvalidTTL => {
445+ panic ! ( "lease ttl must be at least two (2) seconds" ) ;
446+ }
447+ } ,
448+ }
316449 }
317450
318451 ///
@@ -324,6 +457,11 @@ impl ManagedLeaseFactory {
324457 ///
325458 /// Keepalive interval is optional, if not provided it will be half of the ttl.
326459 ///
460+ /// Arguments:
461+ ///
462+ /// * `ttl` - The time-to-live for the lease.
463+ /// * `keepalive_interval` - The interval to keep the lease alive.
464+ ///
327465 pub async fn new_lease (
328466 & self ,
329467 ttl : Duration ,
0 commit comments