18
18
* All rights reserved.
19
19
* Copyright (c) 2014-2016 Research Organization for Information Science
20
20
* and Technology (RIST). All rights reserved.
21
- * Copyright (c) 2021-2023 Nanook Consulting. All rights reserved.
21
+ * Copyright (c) 2021-2025 Nanook Consulting All rights reserved.
22
22
* $COPYRIGHT$
23
23
*
24
24
* Additional copyrights may follow
40
40
#include "src/mca/errmgr/errmgr.h"
41
41
#include "src/rml/rml_contact.h"
42
42
#include "src/rml/rml.h"
43
- #include "src/runtime/prte_data_server.h"
43
+ #include "src/runtime/data_server/ prte_data_server.h"
44
44
#include "src/runtime/prte_globals.h"
45
45
#include "src/threads/pmix_threads.h"
46
46
#include "src/util/name_fns.h"
@@ -371,17 +371,51 @@ pmix_status_t pmix_server_unpublish_fn(const pmix_proc_t *proc, char **keys,
371
371
{
372
372
pmix_server_req_t * req ;
373
373
int ret ;
374
- uint8_t cmd = PRTE_PMIX_UNPUBLISH_CMD ;
374
+ uint8_t cmd ;
375
375
size_t m , n ;
376
376
pmix_status_t rc ;
377
377
378
+ // check for a "purge" command
379
+ if (NULL == keys ) {
380
+ /* create the caddy */
381
+ req = PMIX_NEW (pmix_server_req_t );
382
+ pmix_asprintf (& req -> operation , "PURGE: %s:%d" , __FILE__ , __LINE__ );
383
+ req -> opcbfunc = cbfunc ;
384
+ req -> cbdata = cbdata ;
385
+
386
+ /* load the command */
387
+ cmd = PRTE_PMIX_PURGE_PROC_CMD ;
388
+ if (PRTE_SUCCESS != (ret = PMIx_Data_pack (NULL , & req -> msg , & cmd , 1 , PMIX_UINT8 ))) {
389
+ PRTE_ERROR_LOG (ret );
390
+ PMIX_RELEASE (req );
391
+ return PMIX_ERR_PACK_FAILURE ;
392
+ }
393
+
394
+ /* pack the name of the requestor */
395
+ if (PMIX_SUCCESS
396
+ != (rc = PMIx_Data_pack (NULL , & req -> msg , (pmix_proc_t * ) proc , 1 , PMIX_PROC ))) {
397
+ PMIX_ERROR_LOG (rc );
398
+ PMIX_RELEASE (req );
399
+ return rc ;
400
+ }
401
+
402
+ /* thread-shift so we can store the tracker */
403
+ prte_event_set (prte_event_base , & (req -> ev ), -1 , PRTE_EV_WRITE , execute , req );
404
+ PMIX_POST_OBJECT (req );
405
+ prte_event_active (& (req -> ev ), PRTE_EV_WRITE , 1 );
406
+
407
+ return PRTE_SUCCESS ;
408
+ }
409
+
410
+
378
411
/* create the caddy */
379
412
req = PMIX_NEW (pmix_server_req_t );
380
413
pmix_asprintf (& req -> operation , "UNPUBLISH: %s:%d" , __FILE__ , __LINE__ );
381
414
req -> opcbfunc = cbfunc ;
382
415
req -> cbdata = cbdata ;
383
416
384
417
/* load the command */
418
+ cmd = PRTE_PMIX_UNPUBLISH_CMD ;
385
419
if (PRTE_SUCCESS != (ret = PMIx_Data_pack (NULL , & req -> msg , & cmd , 1 , PMIX_UINT8 ))) {
386
420
PRTE_ERROR_LOG (ret );
387
421
PMIX_RELEASE (req );
@@ -456,14 +490,14 @@ void pmix_server_keyval_client(int status, pmix_proc_t *sender,
456
490
pmix_server_req_t * req = NULL ;
457
491
pmix_byte_object_t bo ;
458
492
pmix_data_buffer_t pbkt ;
459
- pmix_status_t ret = PMIX_SUCCESS , rt = PMIX_SUCCESS ;
493
+ pmix_status_t ret = PMIX_SUCCESS ;
460
494
pmix_info_t info ;
461
495
pmix_pdata_t * pdata = NULL ;
462
496
size_t n , npdata = 0 ;
463
- PRTE_HIDE_UNUSED_PARAMS (sender , tg , cbdata );
497
+ PRTE_HIDE_UNUSED_PARAMS (status , sender , tg , cbdata );
464
498
465
499
pmix_output_verbose (1 , prte_pmix_server_globals .output ,
466
- "%s recvd lookup data return" ,
500
+ "%s recvd data server return" ,
467
501
PRTE_NAME_PRINT (PRTE_PROC_MY_NAME ));
468
502
469
503
/* unpack the room number of the request tracker */
@@ -485,23 +519,16 @@ void pmix_server_keyval_client(int status, pmix_proc_t *sender,
485
519
486
520
/* unpack the return status */
487
521
cnt = 1 ;
488
- rc = PMIx_Data_unpack (NULL , buffer , & status , & cnt , PMIX_INT );
522
+ rc = PMIx_Data_unpack (NULL , buffer , & ret , & cnt , PMIX_STATUS );
489
523
if (PMIX_SUCCESS != rc ) {
490
524
PMIX_ERROR_LOG (rc );
491
- ret = PMIX_ERR_UNPACK_FAILURE ;
525
+ ret = rc ;
492
526
goto release ;
493
527
}
494
528
495
- if (PRTE_ERR_NOT_FOUND == status ) {
496
- ret = PMIX_ERR_NOT_FOUND ;
497
- goto release ;
498
- } else if (PRTE_ERR_PARTIAL_SUCCESS == status ) {
499
- rt = PMIX_QUERY_PARTIAL_SUCCESS ;
500
- } else {
501
- ret = PMIX_SUCCESS ;
502
- }
503
- if (PRTE_PMIX_UNPUBLISH_CMD == command ) {
504
- /* nothing else will be included */
529
+ if (PMIX_ERR_NOT_FOUND == ret ||
530
+ PRTE_PMIX_UNPUBLISH_CMD == command ||
531
+ PRTE_PMIX_PUBLISH_CMD == command ) {
505
532
goto release ;
506
533
}
507
534
@@ -512,9 +539,12 @@ void pmix_server_keyval_client(int status, pmix_proc_t *sender,
512
539
* command will not return any data if no matching pending
513
540
* requests were found */
514
541
if (PMIX_SUCCESS != rc ) {
515
- if (PMIX_SUCCESS == ret ) {
516
- ret = rt ;
542
+ if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER == rc ) {
543
+ // not necessarily an error, so don't log it
544
+ goto release ;
517
545
}
546
+ PMIX_ERROR_LOG (rc );
547
+ ret = rc ;
518
548
goto release ;
519
549
}
520
550
@@ -523,12 +553,19 @@ void pmix_server_keyval_client(int status, pmix_proc_t *sender,
523
553
rc = PMIx_Data_load (& pbkt , & bo );
524
554
bo .bytes = NULL ;
525
555
PMIX_BYTE_OBJECT_DESTRUCT (& bo );
556
+ if (PMIX_SUCCESS != rc ) {
557
+ PMIX_ERROR_LOG (rc );
558
+ ret = rc ;
559
+ goto release ;
560
+ }
526
561
527
562
/* unpack the number of data items */
528
563
cnt = 1 ;
529
- if (PMIX_SUCCESS != (ret = PMIx_Data_unpack (NULL , & pbkt , & npdata , & cnt , PMIX_SIZE ))) {
530
- PMIX_ERROR_LOG (ret );
564
+ rc = PMIx_Data_unpack (NULL , & pbkt , & npdata , & cnt , PMIX_SIZE );
565
+ if (PMIX_SUCCESS != rc ) {
566
+ PMIX_ERROR_LOG (rc );
531
567
PMIX_DATA_BUFFER_DESTRUCT (& pbkt );
568
+ ret = rc ;
532
569
goto release ;
533
570
}
534
571
@@ -537,31 +574,32 @@ void pmix_server_keyval_client(int status, pmix_proc_t *sender,
537
574
for (n = 0 ; n < npdata ; n ++ ) {
538
575
PMIX_INFO_CONSTRUCT (& info );
539
576
cnt = 1 ;
540
- if ( PMIX_SUCCESS
541
- != ( ret = PMIx_Data_unpack ( NULL , & pbkt , & pdata [ n ]. proc , & cnt , PMIX_PROC )) ) {
542
- PMIX_ERROR_LOG (ret );
577
+ rc = PMIx_Data_unpack ( NULL , & pbkt , & pdata [ n ]. proc , & cnt , PMIX_PROC );
578
+ if ( PMIX_SUCCESS != rc ) {
579
+ PMIX_ERROR_LOG (rc );
543
580
PMIX_DATA_BUFFER_DESTRUCT (& pbkt );
581
+ ret = rc ;
544
582
goto release ;
545
583
}
546
584
cnt = 1 ;
547
- if (PMIX_SUCCESS != (ret = PMIx_Data_unpack (NULL , & pbkt , & info , & cnt , PMIX_INFO ))) {
548
- PMIX_ERROR_LOG (ret );
585
+ rc = PMIx_Data_unpack (NULL , & pbkt , & info , & cnt , PMIX_INFO );
586
+ if (PMIX_SUCCESS != rc ) {
587
+ PMIX_ERROR_LOG (rc );
549
588
PMIX_DATA_BUFFER_DESTRUCT (& pbkt );
589
+ ret = rc ;
550
590
goto release ;
551
591
}
552
592
PMIX_LOAD_KEY (pdata [n ].key , info .key );
553
- PMIX_VALUE_XFER_DIRECT (ret , & pdata [n ].value , & info .value );
554
- if (PMIX_SUCCESS != ret ) {
555
- PMIX_ERROR_LOG (ret );
593
+ PMIX_VALUE_XFER_DIRECT (rc , & pdata [n ].value , & info .value );
594
+ if (PMIX_SUCCESS != rc ) {
595
+ PMIX_ERROR_LOG (rc );
556
596
PMIX_DATA_BUFFER_DESTRUCT (& pbkt );
597
+ ret = rc ;
557
598
goto release ;
558
599
}
559
600
PMIX_INFO_DESTRUCT (& info );
560
601
}
561
602
}
562
- if (PMIX_SUCCESS == ret ) {
563
- ret = rt ;
564
- }
565
603
566
604
release :
567
605
if (0 <= room_num ) {
0 commit comments