@@ -276,7 +276,7 @@ void MainApp::queuePublishStatsOnDollarTopic()
276
276
*/
277
277
void MainApp::saveStateInThread ()
278
278
{
279
- std::list<BridgeInfoForSerializing> bridgeInfos = BridgeInfoForSerializing::getBridgeInfosForSerializing (this ->bridges );
279
+ std::list<BridgeInfoForSerializing> bridgeInfos = BridgeInfoForSerializing::getBridgeInfosForSerializing (this ->bridgeConfigs );
280
280
281
281
auto f = std::bind (&MainApp::saveState, this ->settings , bridgeInfos, true );
282
282
this ->bgWorker .addTask (f);
@@ -334,12 +334,54 @@ void MainApp::queueRetainedMessageExpiration()
334
334
}
335
335
}
336
336
337
- void MainApp::createBridge (std::shared_ptr<ThreadData> &thread, const std::shared_ptr<BridgeConfig> &bridgeConfig )
337
+ void MainApp::sendBridgesToThreads ( )
338
338
{
339
- std::shared_ptr<BridgeState> bridgeState = std::make_shared<BridgeState>(*bridgeConfig);
339
+ if (threads.empty ())
340
+ return ;
341
+
342
+ int i = 0 ;
343
+ auto bridge_pos = this ->bridgeConfigs .begin ();
344
+ while (bridge_pos != this ->bridgeConfigs .end ())
345
+ {
346
+ auto cur = bridge_pos;
347
+ bridge_pos++;
348
+
349
+ std::shared_ptr<BridgeConfig> bridge = cur->second ;
350
+
351
+ if (!bridge)
352
+ continue ;
353
+
354
+ std::shared_ptr<ThreadData> owner = bridge->owner .lock ();
340
355
341
- bridgeState->threadData = thread;
342
- thread->giveBridge (bridgeState);
356
+ if (!owner)
357
+ {
358
+ owner = threads.at (i++ % threads.size ());
359
+ bridge->owner = owner;
360
+ }
361
+
362
+ if (bridge->queueForDelete )
363
+ {
364
+ owner->removeBridgeQueued (bridge, " Bridge disappeared from config" );
365
+ this ->bridgeConfigs .erase (cur);
366
+ }
367
+ else
368
+ {
369
+ std::shared_ptr<BridgeState> bridgeState = std::make_shared<BridgeState>(*bridge);
370
+ bridgeState->threadData = owner;
371
+ owner->giveBridge (bridgeState);
372
+ }
373
+ }
374
+ }
375
+
376
+ void MainApp::queueSendBridgesToThreads ()
377
+ {
378
+ {
379
+ std::lock_guard<std::mutex> locker (eventMutex);
380
+ auto f = std::bind (&MainApp::sendBridgesToThreads, this );
381
+ taskQueue.push_back (f);
382
+ }
383
+
384
+ wakeUpThread ();
343
385
}
344
386
345
387
void MainApp::queueBridgeReconnectAllThreads (bool alsoQueueNexts)
@@ -409,12 +451,13 @@ void MainApp::saveBridgeInfo(const std::string &filePath, const std::list<Bridge
409
451
bridgeInfoDb.saveInfo (bridgeInfos);
410
452
}
411
453
412
- void MainApp::loadBridgeInfo ()
454
+ std::list<std::shared_ptr<BridgeConfig>> MainApp::loadBridgeInfo (Settings &settings )
413
455
{
414
- this ->bridges = settings.stealBridges ();
456
+ Logger *logger = Logger::getInstance ();
457
+ std::list<std::shared_ptr<BridgeConfig>> bridges = settings.stealBridges ();
415
458
416
459
if (settings.storageDir .empty ())
417
- return ;
460
+ return bridges ;
418
461
419
462
const std::string filePath = settings.getBridgeNamesDBFile ();
420
463
@@ -428,7 +471,7 @@ void MainApp::loadBridgeInfo()
428
471
429
472
for (const BridgeInfoForSerializing &info : bridgeInfos)
430
473
{
431
- for (std::shared_ptr<BridgeConfig> &bridgeConfig : this -> bridges )
474
+ for (std::shared_ptr<BridgeConfig> &bridgeConfig : bridges)
432
475
{
433
476
if (!bridgeConfig->useSavedClientId )
434
477
continue ;
@@ -447,7 +490,7 @@ void MainApp::loadBridgeInfo()
447
490
logger->logf (LOG_WARNING, " File '%s' is not there (yet)" , filePath.c_str ());
448
491
}
449
492
450
-
493
+ return bridges;
451
494
}
452
495
453
496
void MainApp::initMainApp (int argc, char *argv[])
@@ -671,17 +714,10 @@ void MainApp::start()
671
714
672
715
timer.start ();
673
716
674
- uint next_thread_index = 0 ;
717
+ sendBridgesToThreads ();
718
+ queueBridgeReconnectAllThreads (true );
675
719
676
- {
677
- for (std::shared_ptr<BridgeConfig> &bridge : this ->bridges )
678
- {
679
- std::shared_ptr<ThreadData> &thread = threads[next_thread_index++ % threads.size ()];
680
- createBridge (thread, bridge);
681
- }
682
-
683
- queueBridgeReconnectAllThreads (true );
684
- }
720
+ uint next_thread_index = 0 ;
685
721
686
722
this ->bgWorker .start ();
687
723
@@ -858,7 +894,7 @@ void MainApp::start()
858
894
859
895
this ->bgWorker .waitForStop ();
860
896
861
- std::list<BridgeInfoForSerializing> bridgeInfos = BridgeInfoForSerializing::getBridgeInfosForSerializing (this ->bridges );
897
+ std::list<BridgeInfoForSerializing> bridgeInfos = BridgeInfoForSerializing::getBridgeInfosForSerializing (this ->bridgeConfigs );
862
898
saveState (this ->settings , bridgeInfos, false );
863
899
}
864
900
@@ -966,20 +1002,55 @@ void MainApp::loadConfig(bool reload)
966
1002
}
967
1003
}
968
1004
969
- // We are not reloading bridges, because it's hard to figure out what to do. This does mean we need to
970
- // validate the config of existing ones, because they may have their certificate paths changed.
971
- if (this ->bridges .empty ())
972
1005
{
973
- if (!reload)
974
- loadBridgeInfo ();
975
- }
976
- else
977
- {
978
- // Note that this checks the certificate paths of how the briges came from the config file, not the actual bridges. But, at least
979
- // for now, they must be the same, because bridges aren't changed after they are created.
980
- for (auto &bridge : this ->bridges )
1006
+ for (auto &pair : bridgeConfigs)
1007
+ {
1008
+ pair.second ->queueForDelete = true ;
1009
+ }
1010
+
1011
+ std::list<std::shared_ptr<BridgeConfig>> bridges = loadBridgeInfo (this ->settings );
1012
+
1013
+ for (std::shared_ptr<BridgeConfig> &bridge : bridges)
1014
+ {
1015
+ if (!bridge)
1016
+ continue ;
1017
+
1018
+ auto pos = this ->bridgeConfigs .find (bridge->clientidPrefix );
1019
+ if (pos != this ->bridgeConfigs .end ())
1020
+ {
1021
+ logger->log (LOG_NOTICE) << " Assing new config to bridge '" << bridge->clientidPrefix << " ' and reconnect if needed." ;
1022
+
1023
+ std::shared_ptr<BridgeConfig> &cur = pos->second ;
1024
+
1025
+ if (!cur)
1026
+ continue ;
1027
+
1028
+ std::shared_ptr<ThreadData> owner = cur->owner .lock ();
1029
+ std::string clientid = cur->getClientid ();
1030
+ cur = bridge;
1031
+ cur->owner = owner;
1032
+ cur->setClientId (cur->clientidPrefix , clientid);
1033
+ }
1034
+ else
1035
+ {
1036
+ logger->log (LOG_NOTICE) << " Adding bridge '" << bridge->clientidPrefix << " '." ;
1037
+ this ->bridgeConfigs [bridge->clientidPrefix ] = bridge;
1038
+ }
1039
+ }
1040
+
1041
+ for (auto &pair : bridgeConfigs)
1042
+ {
1043
+ if (pair.second ->queueForDelete )
1044
+ {
1045
+ logger->log (LOG_NOTICE) << " Queueing bridge '" << pair.first << " ' for removal, because it disappeared from config." ;
1046
+ }
1047
+ }
1048
+
1049
+ // On first load, the start() function will take care of it.
1050
+ if (reload)
981
1051
{
982
- bridge->isValid ();
1052
+ sendBridgesToThreads ();
1053
+ queueBridgeReconnectAllThreads (false );
983
1054
}
984
1055
}
985
1056
0 commit comments