Skip to content

Commit c23396f

Browse files
Merge pull request #7 from LIHPC-Computational-Geometry/cpu100_fan
Release 6.7.0. ThreadPool class fix : avoid consuming 100% of the CPU in the absence of a task to perform.
2 parents 25bac22 + ac3096d commit c23396f

File tree

5 files changed

+96
-98
lines changed

5 files changed

+96
-98
lines changed

cmake/version.cmake

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
#
44

55
set (TK_UTIL_MAJOR_VERSION "6")
6-
set (TK_UTIL_MINOR_VERSION "6")
7-
set (TK_UTIL_RELEASE_VERSION "1")
6+
set (TK_UTIL_MINOR_VERSION "7")
7+
set (TK_UTIL_RELEASE_VERSION "0")
88
set (TK_UTIL_VERSION ${TK_UTIL_MAJOR_VERSION}.${TK_UTIL_MINOR_VERSION}.${TK_UTIL_RELEASE_VERSION})
99

1010
set (TK_UTIL_SCRIPTING_MAJOR_VERSION ${TK_UTIL_MAJOR_VERSION})

src/TkUtil/ThreadPool.cpp

Lines changed: 57 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,18 @@ static const Charset charset ("àéèùô");
2222
// ============================================================================
2323

2424
ThreadPool::TaskIfc::TaskIfc ( )
25-
: _status (ThreadPool::TaskIfc::WAITING), _message ( ), _toDelete (false),
26-
_concurrencyFlag (0)
25+
: _status (ThreadPool::TaskIfc::WAITING), _message ( ), _toDelete (false), _concurrencyFlag (0)
2726
{
2827
} // TaskIfc::TaskIfc
2928

3029

3130
ThreadPool::TaskIfc::TaskIfc (const ThreadPool::TaskIfc& task)
32-
: _status (task._status), _message (task._message),
33-
_toDelete (task._toDelete), _concurrencyFlag (0)
31+
: _status (task._status), _message (task._message), _toDelete (task._toDelete), _concurrencyFlag (0)
3432
{
3533
} // TaskIfc::TaskIfc
3634

3735

38-
ThreadPool::TaskIfc& ThreadPool::TaskIfc::operator = (
39-
const ThreadPool::TaskIfc& task)
36+
ThreadPool::TaskIfc& ThreadPool::TaskIfc::operator = (const ThreadPool::TaskIfc& task)
4037
{
4138
if (&task != this)
4239
{
@@ -78,10 +75,8 @@ void ThreadPool::TaskIfc::setStatus (STATUS status, const UTF8String& msg)
7875
// ============================================================================
7976

8077
ThreadPool::WorkerThread::WorkerThread ( )
81-
: _thread (0), _task (0), _haltMutex ( ), _halted (false),
82-
_completed (false)
83-
{ // Rem : les bouléens _halted et _barrier ne sont pas initialisés
84-
// ici pour éviter des accès concurrents détectés par intel inspector.
78+
: _thread (0), _task (0), _haltMutex ( ), _halted (false), _completed (false)
79+
{ // Rem : les bouléens _halted et _barrier ne sont pas initialisés ici pour éviter des accès concurrents détectés par intel inspector.
8580
// Ils seront initialisés dans execute.
8681
} // WorkerThread::WorkerThread
8782

@@ -93,8 +88,7 @@ ThreadPool::WorkerThread::WorkerThread (const ThreadPool::WorkerThread&)
9388
} // WorkerThread::WorkerThread
9489

9590

96-
ThreadPool::WorkerThread& ThreadPool::WorkerThread::operator = (
97-
const ThreadPool::WorkerThread&)
91+
ThreadPool::WorkerThread& ThreadPool::WorkerThread::operator = (const ThreadPool::WorkerThread&)
9892
{
9993
assert (0 && "WorkerThread assignment operator is not allowed.");
10094

@@ -133,7 +127,13 @@ void ThreadPool::WorkerThread::execute ( )
133127

134128
if (0 == _task)
135129
{
136-
this_thread::yield ( );
130+
//cout << "Worker " << (unsigned long)this << " WAITS " << yieldDelay ( ) << " nanoseconds ..." << endl;
131+
const size_t delay = ThreadPool::yieldDelay ( );
132+
if (0 == delay)
133+
this_thread::yield ( );
134+
else
135+
this_thread::sleep_for (std::chrono::nanoseconds (delay)); // v 6.7.0
136+
//cout << "Worker " << (unsigned long)this << " HAS WAITED." << endl;
137137
} // if (0 == _task)
138138
else
139139
{
@@ -229,21 +229,19 @@ void ThreadPool::WorkerThread::join ( )
229229

230230
ThreadPool* ThreadPool::_instance = 0;
231231
bool ThreadPool::_completed = true; // !running
232-
232+
size_t ThreadPool::_yieldDelay = 100000; // v 6.7.0 - 1 milliseconde
233233

234234
ThreadPool::ThreadPool (size_t tasksNum)
235235
: _thread (0), _tasksNum (tasksNum),
236236
_queuedTasks ( ), _runningTasks ( ), _deadTasks ( ), _workerThreads ( ),
237237
_tasksMutex ( ), _tasksCond ( ), _wakeUpCondMutex ( ), _barrierCondMutex ( )
238-
{ // Rem : les bouléens _halted et _barrier ne sont pas initialisés
239-
// ici pour éviter des accès concurrents détectés par intel inspector.
238+
{ // Rem : les bouléens _halted et _barrier ne sont pas initialisés ici pour éviter des accès concurrents détectés par intel inspector.
240239
// Ils seront initialisés dans init.
241240
} // ThreadPool::ThreadPool
242241

243242

244243
ThreadPool::ThreadPool (const ThreadPool&)
245-
: _thread (0), _tasksNum (0), _haltMutex ( ),
246-
_queuedTasks ( ), _runningTasks ( ), _deadTasks ( ), _workerThreads ( ),
244+
: _thread (0), _tasksNum (0), _haltMutex ( ), _queuedTasks ( ), _runningTasks ( ), _deadTasks ( ), _workerThreads ( ),
247245
_tasksMutex ( ), _tasksCond ( ), _wakeUpCondMutex ( ), _barrierCondMutex ( )
248246
{
249247
assert (0 && "ThreadPool copy constructor is not allowed.");
@@ -286,9 +284,7 @@ void ThreadPool::initialize (size_t tasksNum)
286284
} // if (__INTEL_COMPILER < 1500)
287285
#endif // __INTEL_COMPILER
288286

289-
_instance = new ThreadPool (
290-
0 == tasksNum ?
291-
MachineData::instance ( ).getProcessorsNum ( ) : tasksNum);
287+
_instance = new ThreadPool (0 == tasksNum ? MachineData::instance ( ).getProcessorsNum ( ) : tasksNum);
292288
CHECK_NULL_PTR_ERROR (_instance)
293289

294290
// Création/mise en service des travailleurs :
@@ -321,6 +317,18 @@ ThreadPool& ThreadPool::instance ( )
321317
} // ThreadPool::instance
322318

323319

320+
size_t ThreadPool::yieldDelay ( ) // v 6.7.0
321+
{
322+
return _yieldDelay;
323+
} // ThreadPool::yieldDelay
324+
325+
326+
void ThreadPool::setYieldDelay (size_t delay) // v 6.7.0
327+
{
328+
_yieldDelay = delay;
329+
} // ThreadPool::setYieldDelay
330+
331+
324332
void ThreadPool::stop ( )
325333
{
326334
unique_lock<mutex> haltLock (_haltMutex);
@@ -331,8 +339,7 @@ void ThreadPool::stop ( )
331339
void ThreadPool::stopWorkers ( )
332340
{
333341
// On demande l'arrêt des travailleurs. */
334-
for (vector<WorkerThread*>::iterator itw1 = _workerThreads.begin ( );
335-
_workerThreads.end ( ) != itw1; itw1++)
342+
for (vector<WorkerThread*>::iterator itw1 = _workerThreads.begin ( ); _workerThreads.end ( ) != itw1; itw1++)
336343
(*itw1)->stop ( );
337344

338345
// On attend l'arrêt des travailleurs. */
@@ -346,8 +353,7 @@ void ThreadPool::stopWorkers ( )
346353
bool completed = false;
347354
while (false == completed)
348355
{
349-
// Pour une raison inconnue le réveil ci-dessus ne fonctionne pas
350-
// toujours bien, on remet donc ici une couche tant que tous les threads
356+
// Pour une raison inconnue le réveil ci-dessus ne fonctionne pas toujours bien, on remet donc ici une couche tant que tous les threads
351357
// ne sont pas achevés.
352358
// 12/2017 - gcc 4.4.6
353359
{
@@ -356,14 +362,12 @@ void ThreadPool::stopWorkers ( )
356362
}
357363
this_thread::yield ( );
358364
completed = true;
359-
for (vector<WorkerThread*>::iterator itw3 = _workerThreads.begin ( );
360-
_workerThreads.end ( ) != itw3; itw3++)
365+
for (vector<WorkerThread*>::iterator itw3 = _workerThreads.begin ( ); _workerThreads.end ( ) != itw3; itw3++)
361366
if (false == (*itw3)->completed ( ))
362367
completed = false;
363368
} // while (false == completed)
364369

365-
for (vector<WorkerThread*>::iterator itw2 = _workerThreads.begin ( );
366-
_workerThreads.end ( ) != itw2; itw2++)
370+
for (vector<WorkerThread*>::iterator itw2 = _workerThreads.begin ( ); _workerThreads.end ( ) != itw2; itw2++)
367371
(*itw2)->join ( );
368372
} // ThreadPool::stopWorkers
369373

@@ -386,8 +390,7 @@ void ThreadPool::addTask (ThreadPool::TaskIfc& task, bool barrier)
386390
} // ThreadPool::addTask (ThreadIfc* thread)
387391

388392

389-
void ThreadPool::addTasks (
390-
const vector<ThreadPool::TaskIfc*>& tasks, bool barrier)
393+
void ThreadPool::addTasks (const vector<ThreadPool::TaskIfc*>& tasks, bool barrier)
391394
{
392395
unique_lock<mutex> tasksLock (_tasksMutex);
393396
unique_lock<mutex> barrierLock (_barrierMutex);
@@ -401,7 +404,10 @@ void ThreadPool::addTasks (
401404

402405
// On réveille les travailleurs s'ils étaient au chômage :
403406
unique_lock<mutex> wakeUpCondLock (_wakeUpCondMutex);
404-
_wakeUpCond.notify_all ( );
407+
if (1 == tasks.size ( ))
408+
_wakeUpCond.notify_one ( ); // v 6.7.0
409+
else
410+
_wakeUpCond.notify_all ( );
405411
} // ThreadPool::addTasks (ThreadIfc* thread)
406412

407413

@@ -484,24 +490,27 @@ void ThreadPool::execute ( )
484490
}
485491

486492
// On met les travailleurs en marche. */
487-
for (vector<WorkerThread*>::iterator itw1 = _workerThreads.begin ( );
488-
_workerThreads.end ( ) != itw1; itw1++)
493+
for (vector<WorkerThread*>::iterator itw1 = _workerThreads.begin ( ); _workerThreads.end ( ) != itw1; itw1++)
489494
(*itw1)->start ( );
490495

491496
// La boucle d'exécution du gestionnaire de travailleurs :
492497
while (false == halted)
493498
{
494499
deleteDeadTasks ( );
495500

496-
this_thread::yield ( );
501+
//cout << "Worker " << (unsigned long)this << " WAITS " << yieldDelay ( ) << " nanoseconds ..." << endl;
502+
if (0 == _yieldDelay)
503+
this_thread::yield ( );
504+
else
505+
this_thread::sleep_for (std::chrono::nanoseconds (_yieldDelay)); // v 6.7.0
506+
//cout << "Worker " << (unsigned long)this << " HAS WAITED." << endl;
497507

498508
checkBarrier ( );
499509

500510
{ // Mise en sommeil si absence de travail :
501511
unique_lock<mutex> sleepLock (_tasksMutex);
502512
// Pas de mutex sur _barrierCondMutex : volontaire
503-
if ((false == _barrier) && (0 == _queuedTasks.size ( )) &&
504-
(0 == _runningTasks.size ( )))
513+
if ((false == _barrier) && (0 == _queuedTasks.size ( )) && (0 == _runningTasks.size ( )))
505514
_tasksCond.wait (sleepLock);
506515
}
507516

@@ -527,8 +536,7 @@ void ThreadPool::taskCompleted (ThreadPool::TaskIfc& task)
527536
_deadTasks.push_back (&task);
528537

529538
bool found = false;
530-
for (vector<ThreadPool::TaskIfc*>::iterator it =
531-
_runningTasks.begin ( ); _runningTasks.end ( ) != it; it++)
539+
for (vector<ThreadPool::TaskIfc*>::iterator it = _runningTasks.begin ( ); _runningTasks.end ( ) != it; it++)
532540
{
533541
if (&task == *it)
534542
{
@@ -538,11 +546,11 @@ void ThreadPool::taskCompleted (ThreadPool::TaskIfc& task)
538546
} // if (&task == *it)
539547
} // for (vector<ThreadPool::TaskIfc*>::const_iterator it = ...
540548

541-
// On réveille les travailleurs s'ils étaient au chômage. Peut être y a t'il
542-
// maintenant une tache pouvant être lancée en concurrence avec celles
549+
// On réveille les travailleurs s'ils étaient au chômage. Peut être y a t'il maintenant une tache pouvant être lancée en concurrence avec celles
543550
// actives :
544551
unique_lock<mutex> wakeUpCondLock (_wakeUpCondMutex);
545-
_wakeUpCond.notify_all ( );
552+
// _wakeUpCond.notify_all ( );
553+
_wakeUpCond.notify_one ( ); // v 6.7.0
546554
} // ThreadPool::taskCompleted
547555

548556

@@ -552,8 +560,7 @@ void ThreadPool::deleteDeadTasks ( )
552560

553561
try
554562
{
555-
for (vector<ThreadPool::TaskIfc*>::iterator it = _deadTasks.begin ( );
556-
_deadTasks.end ( ) != it; it++)
563+
for (vector<ThreadPool::TaskIfc*>::iterator it = _deadTasks.begin ( ); _deadTasks.end ( ) != it; it++)
557564
delete *it;
558565
_deadTasks.clear ( );
559566
}
@@ -571,8 +578,7 @@ void ThreadPool::deleteWorkers ( )
571578
int step = 0;
572579
while (false == _workerThreads.empty ( ))
573580
{
574-
for (vector<WorkerThread*>::iterator it = _workerThreads.begin ( );
575-
_workerThreads.end ( ) != it; it++)
581+
for (vector<WorkerThread*>::iterator it = _workerThreads.begin ( ); _workerThreads.end ( ) != it; it++)
576582
{
577583
const bool completed = (*it)->completed ( );
578584
if (true == completed)
@@ -617,8 +623,7 @@ bool ThreadPool::validateConcurrency (size_t flag) const
617623
if (0 == flag)
618624
return true;
619625

620-
for (vector<ThreadPool::TaskIfc*>::const_iterator it =
621-
_runningTasks.begin ( ); _runningTasks.end ( ) != it; it++)
626+
for (vector<ThreadPool::TaskIfc*>::const_iterator it = _runningTasks.begin ( ); _runningTasks.end ( ) != it; it++)
622627
if (0 != ((*it)->getConcurrencyFlag ( ) & flag))
623628
return false;
624629

@@ -634,9 +639,7 @@ void ThreadPool::join ( )
634639
_barrier = true;
635640
}
636641

637-
// Idem ThreadPool::stopWorkers : pour une raison inconnue le réveil ne
638-
// fonctionne pas forcément du premier coup, on remet donc ici plusieurs
639-
// couches.
642+
// Idem ThreadPool::stopWorkers : pour une raison inconnue le réveil ne fonctionne pas forcément du premier coup, on remet donc ici plusieurs couches.
640643
// 12/2017 - gcc 4.4.6
641644
// for (int i = 0; i < 10; i++)
642645
{
@@ -669,8 +672,7 @@ ThreadPool::TaskIfc* ThreadPool::getTask ( )
669672

670673
if (0 != _queuedTasks.size ( ))
671674
{
672-
for (deque<ThreadPool::TaskIfc*>::iterator it =
673-
_queuedTasks.begin ( ); _queuedTasks.end ( ) != it; it++)
675+
for (deque<ThreadPool::TaskIfc*>::iterator it = _queuedTasks.begin ( ); _queuedTasks.end ( ) != it; it++)
674676
{
675677
if (true == validateConcurrency ((*it)->getConcurrencyFlag ( )))
676678
{
@@ -683,11 +685,11 @@ ThreadPool::TaskIfc* ThreadPool::getTask ( )
683685
} // if (0 != _queuedTasks.size ( ))
684686
}
685687

686-
if (0 == task)
688+
/* if (0 == task) v 6.7.0 : appel à ThreadPool::getTask ( ) non bloquant
687689
{
688690
unique_lock<mutex> wakeUpCondLock (_wakeUpCondMutex);
689691
_wakeUpCond.wait (wakeUpCondLock);
690-
}
692+
} */
691693

692694
return task;
693695
} // ThreadPool::getTask

0 commit comments

Comments
 (0)