Skip to content

Commit 59a96e3

Browse files
Merge pull request #8 from LIHPC-Computational-Geometry/cpu100_fan2
Cpu100 fan2
2 parents c23396f + a74a07b commit 59a96e3

File tree

6 files changed

+108
-156
lines changed

6 files changed

+108
-156
lines changed

cmake/version.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#
44

55
set (TK_UTIL_MAJOR_VERSION "6")
6-
set (TK_UTIL_MINOR_VERSION "7")
6+
set (TK_UTIL_MINOR_VERSION "8")
77
set (TK_UTIL_RELEASE_VERSION "0")
88
set (TK_UTIL_VERSION ${TK_UTIL_MAJOR_VERSION}.${TK_UTIL_MINOR_VERSION}.${TK_UTIL_RELEASE_VERSION})
99

src/TkUtil/ThreadPool.cpp

Lines changed: 56 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,7 @@ void ThreadPool::TaskIfc::setStatus (STATUS status, const UTF8String& msg)
7676

7777
ThreadPool::WorkerThread::WorkerThread ( )
7878
: _thread (0), _task (0), _haltMutex ( ), _halted (false), _completed (false)
79-
{ // Rem : les bouléens _halted et _barrier ne sont pas initialisés ici pour éviter des accès concurrents détectés par intel inspector.
80-
// Ils seront initialisés dans execute.
79+
{
8180
} // WorkerThread::WorkerThread
8281

8382

@@ -125,17 +124,7 @@ void ThreadPool::WorkerThread::execute ( )
125124
} // if (true == _halted)
126125
}
127126

128-
if (0 == _task)
129-
{
130-
//cout << "Worker " << (unsigned long)this << " WAITS " << yieldDelay ( ) << " nanoseconds ..." << endl;
131-
const size_t delay = ThreadPool::yieldDelay ( );
132-
if (0 == delay)
133-
this_thread::yield ( );
134-
else
135-
this_thread::sleep_for (std::chrono::nanoseconds (delay)); // v 6.7.0
136-
//cout << "Worker " << (unsigned long)this << " HAS WAITED." << endl;
137-
} // if (0 == _task)
138-
else
127+
if (0 != _task)
139128
{
140129
try
141130
{
@@ -179,16 +168,25 @@ void ThreadPool::WorkerThread::execute ( )
179168

180169
// Informer le gestionnaire de l'accomplissement de la tache :
181170
ThreadPool::instance ( ).taskCompleted (*task);
182-
} // while (false == _halted)
171+
} // if (0 != _task)
183172

184173
{
185174
unique_lock<mutex> haltLock (_haltMutex);
186175
halted = _halted;
187176
}
188-
} // else if (0 == _task)
189-
190-
unique_lock<mutex> completedLock (_completedMutex);
191-
_completed = true;
177+
178+
{ // v 6.8.0. La tache est considérée achevée :
179+
unique_lock<mutex> completedLock (_completedMutex);
180+
_completed = true;
181+
}
182+
183+
// Attente d'une nouvelle tache à exécuter :
184+
if (false == halted)
185+
{ // v 6.8.0
186+
unique_lock<mutex> sleepingLock (ThreadPool::instance ( ).getWakeUpCondMutex ( ));
187+
ThreadPool::instance ( ).getWakeUpCondition ( ).wait (sleepingLock);
188+
} // if (false == halted)
189+
} // while (false == _halted)
192190
} // WorkerThread::execute
193191

194192

@@ -229,12 +227,13 @@ void ThreadPool::WorkerThread::join ( )
229227

230228
ThreadPool* ThreadPool::_instance = 0;
231229
bool ThreadPool::_completed = true; // !running
232-
size_t ThreadPool::_yieldDelay = 100000; // v 6.7.0 - 1 milliseconde
230+
233231

234232
ThreadPool::ThreadPool (size_t tasksNum)
235-
: _thread (0), _tasksNum (tasksNum),
233+
: _thread (0), _tasksNum (tasksNum), _halted (false), _barrier (false),
236234
_queuedTasks ( ), _runningTasks ( ), _deadTasks ( ), _workerThreads ( ),
237-
_tasksMutex ( ), _tasksCond ( ), _wakeUpCondMutex ( ), _barrierCondMutex ( )
235+
_tasksMutex ( ), _tasksCond ( ), _wakeUpCondMutex ( ), _wakeUpCond ( ),
236+
_barrierCondMutex ( ), _barrierCond ( ), _joinCond ( )
238237
{ // Rem : les bouléens _halted et _barrier ne sont pas initialisés ici pour éviter des accès concurrents détectés par intel inspector.
239238
// Ils seront initialisés dans init.
240239
} // ThreadPool::ThreadPool
@@ -267,23 +266,6 @@ void ThreadPool::initialize (size_t tasksNum)
267266
if (0 != _instance)
268267
throw Exception (UTF8String ("ThreadPool::initialize : API déjà initialisée.", charset));
269268

270-
#ifdef __INTEL_COMPILER
271-
if (__INTEL_COMPILER < 1500)
272-
{
273-
ConsoleOutput::cerr ( )
274-
<< "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"
275-
<< co_endl
276-
<< "ATTENTION : LE MULTITHREADING AVEC L'UTILISATION DE L'API "
277-
<< "ThreadPool RISQUE DE PROVOQUER DES PLANTAGES." << co_endl
278-
<< "Cette API ThreadPool est incompatible avec les versions du "
279-
<< "compilateur Intel antérieures à la version 15.0 (version "
280-
<< "courante : " << (unsigned long)__INTEL_COMPILER << ")."
281-
<< co_endl
282-
<< "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"
283-
<< co_endl;
284-
} // if (__INTEL_COMPILER < 1500)
285-
#endif // __INTEL_COMPILER
286-
287269
_instance = new ThreadPool (0 == tasksNum ? MachineData::instance ( ).getProcessorsNum ( ) : tasksNum);
288270
CHECK_NULL_PTR_ERROR (_instance)
289271

@@ -317,18 +299,6 @@ ThreadPool& ThreadPool::instance ( )
317299
} // ThreadPool::instance
318300

319301

320-
size_t ThreadPool::yieldDelay ( ) // v 6.7.0
321-
{
322-
return _yieldDelay;
323-
} // ThreadPool::yieldDelay
324-
325-
326-
void ThreadPool::setYieldDelay (size_t delay) // v 6.7.0
327-
{
328-
_yieldDelay = delay;
329-
} // ThreadPool::setYieldDelay
330-
331-
332302
void ThreadPool::stop ( )
333303
{
334304
unique_lock<mutex> haltLock (_haltMutex);
@@ -338,35 +308,17 @@ void ThreadPool::stop ( )
338308

339309
void ThreadPool::stopWorkers ( )
340310
{
341-
// On demande l'arrêt des travailleurs. */
311+
// On demande l'arrêt des travailleurs.
342312
for (vector<WorkerThread*>::iterator itw1 = _workerThreads.begin ( ); _workerThreads.end ( ) != itw1; itw1++)
343313
(*itw1)->stop ( );
344314

345-
// On attend l'arrêt des travailleurs. */
346-
// On réveille les travailleurs s'ils étaient au chômage pour qu'ils
347-
// puissent être joignables :
315+
// On attend l'arrêt des travailleurs.
316+
// On réveille les travailleurs s'ils étaient au chômage pour qu'ils puissent être joignables :
348317
{
349318
unique_lock<mutex> wakeUpCondLock (_wakeUpCondMutex);
350319
_wakeUpCond.notify_all ( );
351320
}
352321

353-
bool completed = false;
354-
while (false == completed)
355-
{
356-
// Pour une raison inconnue le réveil ci-dessus ne fonctionne pas toujours bien, on remet donc ici une couche tant que tous les threads
357-
// ne sont pas achevés.
358-
// 12/2017 - gcc 4.4.6
359-
{
360-
unique_lock<mutex> wakeUpCondLock (_wakeUpCondMutex);
361-
_wakeUpCond.notify_all ( );
362-
}
363-
this_thread::yield ( );
364-
completed = true;
365-
for (vector<WorkerThread*>::iterator itw3 = _workerThreads.begin ( ); _workerThreads.end ( ) != itw3; itw3++)
366-
if (false == (*itw3)->completed ( ))
367-
completed = false;
368-
} // while (false == completed)
369-
370322
for (vector<WorkerThread*>::iterator itw2 = _workerThreads.begin ( ); _workerThreads.end ( ) != itw2; itw2++)
371323
(*itw2)->join ( );
372324
} // ThreadPool::stopWorkers
@@ -428,10 +380,9 @@ void ThreadPool::barrier ( )
428380
_barrier = true;
429381
}
430382
unique_lock<mutex> barrierCondLock (_barrierCondMutex);
431-
// Patch du 09/04/18 : Faut il se mettre en situation d'attente ?
432383
// ATTENTION : on peut avoir _queuedTasks et _runningTasks vides.
433-
unique_lock<mutex> tasksLock (_tasksMutex); // 09/04/18
434-
if ((0 != _queuedTasks.size( )) || (0 != _runningTasks.size( )))// 09/04/18
384+
unique_lock<mutex> tasksLock (_tasksMutex);
385+
if ((0 != _queuedTasks.size( )) || (0 != _runningTasks.size( )))
435386
{
436387
tasksLock.unlock ( ); // 09/04/18
437388
_barrierCond.wait (barrierCondLock);
@@ -488,8 +439,12 @@ void ThreadPool::execute ( )
488439
unique_lock<mutex> completedLock (_tasksMutex);
489440
_completed = false;
490441
}
491-
492-
// On met les travailleurs en marche. */
442+
{ // v 6.8.0
443+
unique_lock<mutex> haltLock (_barrierMutex);
444+
_barrier = false;
445+
}
446+
447+
// On met les travailleurs en marche.
493448
for (vector<WorkerThread*>::iterator itw1 = _workerThreads.begin ( ); _workerThreads.end ( ) != itw1; itw1++)
494449
(*itw1)->start ( );
495450

@@ -498,17 +453,25 @@ void ThreadPool::execute ( )
498453
{
499454
deleteDeadTasks ( );
500455

501-
//cout << "Worker " << (unsigned long)this << " WAITS " << yieldDelay ( ) << " nanoseconds ..." << endl;
502-
if (0 == _yieldDelay)
503-
this_thread::yield ( );
504-
else
505-
this_thread::sleep_for (std::chrono::nanoseconds (_yieldDelay)); // v 6.7.0
506-
//cout << "Worker " << (unsigned long)this << " HAS WAITED." << endl;
507-
456+
// Si nécessaire, relancer des taches :
457+
{ // v 6.8.0
458+
unique_lock<mutex> runLock (_tasksMutex);
459+
const size_t queued = _queuedTasks.size ( );
460+
if (0 != queued)
461+
{
462+
unique_lock<mutex> wakeUpLock (_wakeUpCondMutex);
463+
if (1 == queued)
464+
_wakeUpCond.notify_one ( );
465+
else
466+
_wakeUpCond.notify_all ( );
467+
} // if (0 != queued)
468+
}
469+
508470
checkBarrier ( );
509471

510472
{ // Mise en sommeil si absence de travail :
511473
unique_lock<mutex> sleepLock (_tasksMutex);
474+
512475
// Pas de mutex sur _barrierCondMutex : volontaire
513476
if ((false == _barrier) && (0 == _queuedTasks.size ( )) && (0 == _runningTasks.size ( )))
514477
_tasksCond.wait (sleepLock);
@@ -520,11 +483,12 @@ void ThreadPool::execute ( )
520483
}
521484
} // while (false == halted)
522485

523-
// stopWorkers ( );
524486
{
525487
unique_lock<mutex> completedLock (_tasksMutex);
526488
_completed = true;
527489
}
490+
491+
_joinCond.notify_one ( ); // v 6.8.0
528492
} // ThreadPool::execute
529493

530494

@@ -549,7 +513,6 @@ void ThreadPool::taskCompleted (ThreadPool::TaskIfc& task)
549513
// On réveille les travailleurs s'ils étaient au chômage. Peut être y a t'il maintenant une tache pouvant être lancée en concurrence avec celles
550514
// actives :
551515
unique_lock<mutex> wakeUpCondLock (_wakeUpCondMutex);
552-
// _wakeUpCond.notify_all ( );
553516
_wakeUpCond.notify_one ( ); // v 6.7.0
554517
} // ThreadPool::taskCompleted
555518

@@ -575,7 +538,6 @@ void ThreadPool::deleteWorkers ( )
575538
unique_lock<mutex> tasksLock (_tasksMutex);
576539

577540
vector<WorkerThread*> workers = _workerThreads;
578-
int step = 0;
579541
while (false == _workerThreads.empty ( ))
580542
{
581543
for (vector<WorkerThread*>::iterator it = _workerThreads.begin ( ); _workerThreads.end ( ) != it; it++)
@@ -639,21 +601,19 @@ void ThreadPool::join ( )
639601
_barrier = true;
640602
}
641603

642-
// Idem ThreadPool::stopWorkers : pour une raison inconnue le réveil ne fonctionne pas forcément du premier coup, on remet donc ici plusieurs couches.
643-
// 12/2017 - gcc 4.4.6
644-
// for (int i = 0; i < 10; i++)
604+
// Réveiller a minima l'instance de cette classe qui est éventuellement en sommeil dans la fonction execute ( ). Pour ce
605+
// on réveille les travailleurs afin qu'ils soient eux-mêmes joignables :
645606
{
646607
unique_lock<mutex> sleepLock (_tasksMutex);
647608
_tasksCond.notify_all ( );
648-
this_thread::yield ( );
649609
}
650610
bool completed = false;
651-
while (false == completed)
652-
{
653-
unique_lock<mutex> completionLock (_tasksMutex);
654-
completed = _completed;
655-
this_thread::yield ( );
656-
} // while (false == completed)
611+
612+
// v 6.8.0. Attendre si nécessaire la fin d'execute ( ) :
613+
std::mutex joinMutex;
614+
unique_lock<mutex> joinLock (joinMutex);
615+
_joinCond.wait (joinLock);
616+
657617
_thread->join ( );
658618
} // ThreadPool::join
659619

@@ -685,12 +645,6 @@ ThreadPool::TaskIfc* ThreadPool::getTask ( )
685645
} // if (0 != _queuedTasks.size ( ))
686646
}
687647

688-
/* if (0 == task) v 6.7.0 : appel à ThreadPool::getTask ( ) non bloquant
689-
{
690-
unique_lock<mutex> wakeUpCondLock (_wakeUpCondMutex);
691-
_wakeUpCond.wait (wakeUpCondLock);
692-
} */
693-
694648
return task;
695649
} // ThreadPool::getTask
696650

0 commit comments

Comments
 (0)