7
7
#include " communicator.h"
8
8
#include " recorders.h"
9
9
10
- namespace celerity ::detail {
11
- struct runtime_testspy ;
12
- }
13
-
14
10
namespace celerity ::detail::divergence_checker_detail {
15
11
using task_hash = size_t ;
16
12
using divergence_map = std::unordered_map<task_hash, std::vector<node_id>>;
@@ -67,34 +63,38 @@ class divergence_block_chain {
67
63
std::vector<task_record> m_task_records;
68
64
size_t m_tasks_checked = 0 ;
69
65
size_t m_hashes_added = 0 ;
66
+ task_hash m_last_hash = 0 ;
70
67
71
68
std::vector<int > m_per_node_hash_counts;
72
69
std::mutex m_task_records_mutex;
73
70
74
71
std::chrono::time_point<std::chrono::steady_clock> m_last_cleared = std::chrono::steady_clock::now();
72
+ std::chrono::seconds m_time_of_last_warning = std::chrono::seconds(0 );
75
73
76
74
std::unique_ptr<communicator> m_communicator;
77
75
78
- void divergence_out (const divergence_map& check_map, const int task_num);
76
+ void reprot_divergence (const divergence_map& check_map, const int task_num);
79
77
80
78
void add_new_hashes ();
81
79
void clear (const int min_progress);
82
80
std::pair<int , int > collect_hash_counts ();
83
81
per_node_task_hashes collect_hashes (const int min_hash_count) const ;
84
- divergence_map create_check_map (const per_node_task_hashes& task_hashes, const int task_num) const ;
82
+ divergence_map create_divergence_map (const per_node_task_hashes& task_hashes, const int task_num) const ;
85
83
86
- void check_for_deadlock () const ;
84
+ void check_for_deadlock ();
87
85
88
- static void log_node_divergences (const divergence_map& check_map, const int task_num );
86
+ static void log_node_divergences (const divergence_map& check_map, const int task_id );
89
87
static void log_task_record (const divergence_map& check_map, const task_record& task, const task_hash hash);
90
88
void log_task_record_once (const divergence_map& check_map, const int task_num);
91
89
92
90
void add_new_task (const task_record& task);
93
91
task_record thread_save_get_task_record (const size_t task_num);
94
92
};
93
+ }; // namespace celerity::detail::divergence_checker_detail
95
94
95
+ namespace celerity ::detail {
96
96
class divergence_checker {
97
- friend struct ::celerity::detail:: runtime_testspy;
97
+ friend struct runtime_testspy ;
98
98
99
99
public:
100
100
divergence_checker (task_recorder& task_recorder, std::unique_ptr<communicator> comm, bool test_mode = false )
@@ -111,6 +111,10 @@ class divergence_checker {
111
111
~divergence_checker () { stop (); }
112
112
113
113
private:
114
+ std::thread m_thread;
115
+ bool m_is_running = false ;
116
+ divergence_checker_detail::divergence_block_chain m_block_chain;
117
+
114
118
void start () {
115
119
m_thread = std::thread (&divergence_checker::run, this );
116
120
m_is_running = true ;
@@ -129,9 +133,5 @@ class divergence_checker {
129
133
std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
130
134
}
131
135
}
132
-
133
- std::thread m_thread;
134
- bool m_is_running = false ;
135
- divergence_block_chain m_block_chain;
136
136
};
137
- }; // namespace celerity::detail::divergence_checker_detail
137
+ };
0 commit comments