@@ -86,12 +86,13 @@ protected function getCache_invalidation_eventsPartitionName(int $shardId, int $
8686 /**
8787 * Process cache invalidation events.
8888 *
89- * @param int $shardId The shard number to process
90- * @param int $priority The priority level
91- * @param int $limit Maximum number of events to fetch per batch
89+ * @param int $shardId The shard number to process
90+ * @param int $priority The priority level
91+ * @param int $limit Maximum number of events to fetch per batch
9292 * @param int $tagBatchSize Number of identifiers to process per batch
9393 *
9494 * @throws \Exception
95+ * @throws \Throwable
9596 */
9697 protected function processEvents (int $ shardId , int $ priority , int $ limit , int $ tagBatchSize , string $ connection_name ): void
9798 {
@@ -103,15 +104,15 @@ protected function processEvents(int $shardId, int $priority, int $limit, int $t
103104
104105 $ events = DB ::table (DB ::raw ("`cache_invalidation_events` PARTITION ( {$ partitionCache_invalidation_events }) " ))
105106 //->from(DB::raw("`{$this->from}` PARTITION ({$partitionsString})"))
106- ->where ('processed ' , '= ' , 0 )
107- ->where ('shard ' , '= ' , $ shardId )
108- ->where ('priority ' , '= ' , $ priority )
109- ->where ('event_time ' , '< ' , $ processingStartTime )
107+ ->where ('processed ' , '= ' , 0 )
108+ ->where ('shard ' , '= ' , $ shardId )
109+ ->where ('priority ' , '= ' , $ priority )
110+ ->where ('event_time ' , '< ' , $ processingStartTime )
110111 // Cerco tutte le chiavi/tag da invalidare per questo database redis
111- ->where ('connection_name ' , '= ' , $ connection_name )
112- ->orderBy ('event_time ' )
113- ->limit ($ limit )
114- ->get ()
112+ ->where ('connection_name ' , '= ' , $ connection_name )
113+ ->orderBy ('event_time ' )
114+ ->limit ($ limit )
115+ ->get ()
115116 ;
116117
117118 //ds($partitionCache_invalidation_events . ' -> Shard (' . $shardId . ') Priority (' . $priority . ') Record = ' . $events->count());
@@ -135,9 +136,9 @@ protected function processEvents(int $shardId, int $priority, int $limit, int $t
135136 //retrive associated identifiers related to fetched event id
136137 // Per le chiavi/tag associati non filtro per connection_name, potrebbero esserci associazioni anche in altri database
137138 $ associations = DB ::table ('cache_invalidation_event_associations ' )
138- ->whereIn ('event_id ' , $ eventIds )
139- ->get ()
140- ->groupBy ('event_id ' )
139+ ->whereIn ('event_id ' , $ eventIds )
140+ ->get ()
141+ ->groupBy ('event_id ' )
141142 ;
142143
143144 // Prepare list of all identifiers to fetch last invalidation times
@@ -321,10 +322,10 @@ protected function updateLastInvalidationTimes(array $identifiers): void
321322 foreach ($ identifiers as $ key ) {
322323 [$ type , $ identifier ] = explode (': ' , $ key , 2 );
323324 DB ::table ('cache_invalidation_timestamps ' )
324- ->updateOrInsert (
325- ['identifier_type ' => $ type , 'identifier ' => $ identifier ],
326- ['last_invalidated ' => $ now ]
327- )
325+ ->updateOrInsert (
326+ ['identifier_type ' => $ type , 'identifier ' => $ identifier ],
327+ ['last_invalidated ' => $ now ]
328+ )
328329 ;
329330 }
330331 }
@@ -335,72 +336,83 @@ protected function updateLastInvalidationTimes(array $identifiers): void
335336 * @param array $batchIdentifiers Array of identifiers to invalidate
336337 * @param array $eventsToUpdate Array of event IDs to mark as processed
337338 *
338- * @throws \Exception
339+ * @throws \Throwable
339340 */
340341 protected function processBatch (array $ batchIdentifiers , array $ eventsToUpdate ): void
341342 {
343+ $ maxAttempts = 5 ;
344+ $ attempts = 0 ;
345+ $ updatedOk = false ;
342346
343- // Begin transaction for the batch
344- // DB::beginTransaction();
347+ while ($ attempts < $ maxAttempts && !$ updatedOk ) {
348+ // Begin transaction for the batch
349+ DB ::beginTransaction ();
345350
346- try {
347- // Separate keys and tags
348- $ keys = [];
349- $ tags = [];
350-
351- foreach ($ batchIdentifiers as $ item ) {
352- switch ($ item ['type ' ]) {
353- case 'key ' :
354- $ keys [] = $ item ['identifier ' ] . '§ ' . $ item ['connection_name ' ];
355- break ;
356- case 'tag ' :
357- $ tags [] = $ item ['identifier ' ] . '§ ' . $ item ['connection_name ' ];
358- break ;
359- }
360-
361- if (empty ($ item ['associated ' ])) {
362- continue ;
363- }
351+ try {
352+ // Separate keys and tags
353+ $ keys = [];
354+ $ tags = [];
364355
365- // Include associated identifiers
366- foreach ($ item ['associated ' ] as $ assoc ) {
367- switch ($ assoc ['type ' ]) {
356+ foreach ($ batchIdentifiers as $ item ) {
357+ switch ($ item ['type ' ]) {
368358 case 'key ' :
369- $ keys [] = $ assoc ['identifier ' ] . '§ ' . $ assoc ['connection_name ' ];
359+ $ keys [] = $ item ['identifier ' ] . '§ ' . $ item ['connection_name ' ];
370360 break ;
371361 case 'tag ' :
372- $ tags [] = $ assoc ['identifier ' ] . '§ ' . $ assoc ['connection_name ' ];
362+ $ tags [] = $ item ['identifier ' ] . '§ ' . $ item ['connection_name ' ];
373363 break ;
374364 }
375- }
376- }
377365
378- // Remove duplicates
379- $ keys = array_unique ( $ keys ) ;
380- $ tags = array_unique ( $ tags );
366+ if ( empty ( $ item [ ' associated ' ])) {
367+ continue ;
368+ }
381369
382- // Invalidate cache for keys
383- if (!empty ($ keys )) {
384- $ this ->invalidateKeys ($ keys );
385- }
370+ // Include associated identifiers
371+ foreach ($ item ['associated ' ] as $ assoc ) {
372+ switch ($ assoc ['type ' ]) {
373+ case 'key ' :
374+ $ keys [] = $ assoc ['identifier ' ] . '§ ' . $ assoc ['connection_name ' ];
375+ break ;
376+ case 'tag ' :
377+ $ tags [] = $ assoc ['identifier ' ] . '§ ' . $ assoc ['connection_name ' ];
378+ break ;
379+ }
380+ }
381+ }
386382
387- // Invalidate cache for tags
388- if (!empty ($ tags )) {
389- $ this ->invalidateTags ($ tags );
390- }
383+ // Remove duplicates
384+ $ keys = array_unique ($ keys );
385+ $ tags = array_unique ($ tags );
391386
392- // Mark events as processed
393- DB ::table ('cache_invalidation_events ' )
394- ->whereIn ('id ' , $ eventsToUpdate )
395- ->update (['processed ' => 1 ])
396- ;
387+ // Invalidate cache for keys
388+ if (!empty ($ keys )) {
389+ $ this ->invalidateKeys ($ keys );
390+ }
391+
392+ // Invalidate cache for tags
393+ if (!empty ($ tags )) {
394+ $ this ->invalidateTags ($ tags );
395+ }
397396
398- // Commit transaction
399- //DB::commit();
400- } catch (\Exception $ e ) {
401- // Rollback transaction on error
402- // DB::rollBack();
403- throw $ e ;
397+ // Mark events as processed
398+ DB ::table ('cache_invalidation_events ' )
399+ ->whereIn ('id ' , $ eventsToUpdate )
400+ ->update (['processed ' => 1 ])
401+ ;
402+
403+ // Commit transaction
404+ DB ::commit ();
405+ $ updatedOk = true ;
406+ } catch (\Throwable $ e ) {
407+ // Rollback transaction on error
408+ DB ::rollBack ();
409+ $ attempts ++;
410+ // Logica per gestire i tentativi falliti
411+ if ($ attempts >= $ maxAttempts ) {
412+ // Salta il record dopo il numero massimo di tentativi
413+ throw $ e ;
414+ }
415+ }
404416 }
405417 }
406418
@@ -421,6 +433,7 @@ protected function invalidateKeys(array $keys): void
421433 // Metodo del progetto
422434 if (is_callable ($ callback )) {
423435 $ callback ($ key , $ connection_name );
436+
424437 return ;
425438 }
426439
@@ -458,6 +471,7 @@ protected function invalidateTags(array $tags): void
458471 foreach ($ groupByConnection as $ connection_name => $ arrTags ) {
459472 $ callback ($ arrTags , $ connection_name );
460473 }
474+
461475 return ;
462476 }
463477 foreach ($ groupByConnection as $ connection_name => $ arrTags ) {
@@ -489,18 +503,18 @@ public function handle(): void
489503 return;
490504 }
491505 */
492- $ lockValue = $ this ->helper ->acquireShardLock ($ shardId , $ lockTimeout , $ connection_name );
506+ $ lockValue = $ this ->helper ->acquireShardLock ($ shardId , $ priority , $ lockTimeout , $ connection_name );
493507
494508 if (!$ lockValue ) {
495509 return ;
496510 }
497511
498512 try {
499513 $ this ->processEvents ($ shardId , $ priority , $ limit , $ tagBatchSize , $ connection_name );
500- } catch (\Exception $ e ) {
501- $ this ->error ('Si è verificato un errore in ' . __METHOD__ . ': ' . $ e ->getMessage () . PHP_EOL . $ e -> getTraceAsString () );
514+ } catch (\Throwable $ e ) {
515+ $ this ->error ('Si è verificato un errore in ' . __METHOD__ . ': ' . $ e ->getMessage ());
502516 } finally {
503- $ this ->helper ->releaseShardLock ($ shardId , $ lockValue , $ connection_name );
517+ $ this ->helper ->releaseShardLock ($ shardId , $ priority , $ lockValue , $ connection_name );
504518 }
505519 }
506520}
0 commit comments