7
7
#include " communicator.h"
8
8
#include " recorders.h"
9
9
10
- namespace celerity ::detail {
11
- struct runtime_testspy ;
12
- }
13
-
14
10
namespace celerity ::detail::divergence_checker_detail {
15
11
// Hash value identifying a task; plain alias for size_t.
using task_hash = size_t ;
16
12
// Maps a task hash to a list of node ids.
// NOTE(review): presumably the nodes that reported that hash for a given task — confirm against the implementation.
using divergence_map = std::unordered_map<task_hash, std::vector<node_id>>;
@@ -72,29 +68,32 @@ class divergence_block_chain {
72
68
// Mutex member; judging by its name it guards the stored task records — TODO confirm against the implementation.
std::mutex m_task_records_mutex;
73
69
74
70
// Steady-clock time point, initialized to the moment this object is constructed.
std::chrono::time_point<std::chrono::steady_clock> m_last_cleared = std::chrono::steady_clock::now();
71
// Added in this change; starts out as zero seconds.
// NOTE(review): presumably written by check_for_deadlock() to rate-limit warnings — confirm in the definition.
+ std::chrono::seconds m_time_of_last_warning = std::chrono::seconds(0 );
75
72
76
73
std::unique_ptr<communicator> m_communicator;
77
74
78
// Renamed in this change from divergence_out to reprot_divergence; parameters and return type are unchanged.
// NOTE(review): "reprot" looks like a typo for "report" — if so, rename it together with the definition and all call sites.
- void divergence_out (const divergence_map& check_map, const int task_num);
75
+ void reprot_divergence (const divergence_map& check_map, const int task_num);
79
76
80
77
// Non-const member taking no arguments. NOTE(review): presumably hashes newly recorded tasks — confirm in the definition.
void add_new_hashes ();
81
78
// Non-const member taking a minimum progress value; the meaning of min_progress is not visible in this header.
void clear (const int min_progress);
82
79
// Returns a pair of ints. NOTE(review): which count is first/second is not visible here — document once confirmed.
std::pair<int , int > collect_hash_counts ();
83
80
// Const member returning per-node task hashes by value, parameterized by a minimum hash count.
per_node_task_hashes collect_hashes (const int min_hash_count) const ;
84
// Renamed in this change from create_check_map to create_divergence_map; the signature is otherwise identical.
// Const member that returns a divergence_map by value, built from the given per-node hashes and a task number.
- divergence_map create_check_map (const per_node_task_hashes& task_hashes, const int task_num) const ;
81
+ divergence_map create_divergence_map (const per_node_task_hashes& task_hashes, const int task_num) const ;
85
82
86
// This change drops the const qualifier, so the function may now modify the object.
// NOTE(review): presumably it updates m_time_of_last_warning, which is added in the same change — confirm in the definition.
- void check_for_deadlock () const ;
83
+ void check_for_deadlock ();
87
84
88
// Static helper; this change only renames the second parameter from task_num to task_id.
// NOTE(review): the sibling declarations in this class still call the same kind of argument task_num — confirm which name is intended and make them consistent.
- static void log_node_divergences (const divergence_map& check_map, const int task_num );
85
+ static void log_node_divergences (const divergence_map& check_map, const int task_id );
89
86
// Static helper taking the divergence map, one task record and that record's hash.
static void log_task_record (const divergence_map& check_map, const task_record& task, const task_hash hash);
90
87
// Non-static counterpart taking a task number instead of a record.
// NOTE(review): the "_once" suffix suggests each record is logged a single time — confirm in the definition.
void log_task_record_once (const divergence_map& check_map, const int task_num);
91
88
92
89
// Takes a task record by const reference. NOTE(review): presumably stores it in this object — confirm in the definition.
void add_new_task (const task_record& task);
93
90
// Returns a task record by value for the given task number.
// NOTE(review): "thread_save" is probably meant to read "thread_safe"; presumably the lookup locks m_task_records_mutex — confirm both.
task_record thread_save_get_task_record (const size_t task_num);
94
91
};
92
+ }; // namespace celerity::detail::divergence_checker_detail
95
93
94
+ namespace celerity ::detail {
96
95
class divergence_checker {
97
// Grants the test spy access to private members.
// The new unqualified form needs no prior forward declaration: a friend declaration with an elaborated type specifier
// names the struct in the innermost enclosing namespace, which is celerity::detail now that the class lives there.
- friend struct ::celerity::detail:: runtime_testspy;
96
+ friend struct runtime_testspy ;
98
97
99
98
public:
100
99
divergence_checker (task_recorder& task_recorder, std::unique_ptr<communicator> comm, bool test_mode = false )
@@ -111,6 +110,10 @@ class divergence_checker {
111
110
// Destructor: calls stop() so the checker is shut down when the object is destroyed.
// NOTE(review): stop() is not visible in this excerpt; presumably it joins m_thread — confirm.
~divergence_checker () { stop (); }
112
111
113
112
private:
113
// Worker thread; start() assigns it a thread running divergence_checker::run.
+ std::thread m_thread;
114
// Set to true by start() after the worker thread has been created.
// NOTE(review): this is a plain bool; if run() reads it on the worker thread, that is an unsynchronized access — consider std::atomic<bool>. Confirm against run()/stop().
+ bool m_is_running = false ;
115
// Now qualified with divergence_checker_detail:: because this class sits in celerity::detail rather than in the detail sub-namespace.
+ divergence_checker_detail::divergence_block_chain m_block_chain;
116
+
114
117
void start () {
115
118
m_thread = std::thread (&divergence_checker::run, this );
116
119
m_is_running = true ;
@@ -129,9 +132,5 @@ class divergence_checker {
129
132
std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
130
133
}
131
134
}
132
-
133
- std::thread m_thread;
134
- bool m_is_running = false ;
135
- divergence_block_chain m_block_chain;
136
135
};
137
- }; // namespace celerity::detail::divergence_checker_detail
136
+ }; // namespace celerity::detail
0 commit comments