1
+ //! Statistics for work packets
2
+ use super :: work_counter:: { WorkCounter , WorkCounterBase , WorkDuration } ;
1
3
use std:: any:: TypeId ;
2
4
use std:: collections:: HashMap ;
3
5
use std:: sync:: atomic:: { AtomicBool , Ordering } ;
4
- use std:: time:: { Duration , SystemTime } ;
5
6
7
+ /// Merge and print the work-packet level statistics from all worker threads
6
8
#[ derive( Default ) ]
7
9
pub struct SchedulerStat {
10
+ /// Map work packet type IDs to work packet names
8
11
work_id_name_map : HashMap < TypeId , & ' static str > ,
12
+ /// Count the number of work packets executed for different types
9
13
work_counts : HashMap < TypeId , usize > ,
10
- work_durations : HashMap < TypeId , Vec < Duration > > ,
14
+ /// Collect work counters from work threads.
15
+ /// Two dimensional vectors are used, e.g.
16
+ /// `[[foo_0, ..., foo_n], ..., [bar_0, ..., bar_n]]`.
17
+ /// The first dimension is for different types of work counters,
18
+ /// (`foo` and `bar` in the above example).
19
+ /// The second dimension if for work counters of the same type but from
20
+ /// different threads (`foo_0` and `bar_0` are from the same thread).
21
+ /// The order of insertion is determined by when [`SchedulerStat::merge`] is
22
+ /// called for each [`WorkerLocalStat`].
23
+ /// We assume different threads have the same set of work counters
24
+ /// (in the same order).
25
+ work_counters : HashMap < TypeId , Vec < Vec < Box < dyn WorkCounter > > > > ,
11
26
}
12
27
13
28
impl SchedulerStat {
@@ -22,37 +37,7 @@ impl SchedulerStat {
22
37
}
23
38
}
24
39
25
- fn geomean ( & self , values : & [ f64 ] ) -> f64 {
26
- // Geomean(xs, N=xs.len()) = (PI(xs))^(1/N) = e^{log{PI(xs)^(1/N)}} = e^{ (1/N) * sum_{x \in xs}{ log(x) } }
27
- let logs = values. iter ( ) . map ( |v| v. ln ( ) ) ;
28
- let sum_logs = logs. sum :: < f64 > ( ) ;
29
- ( sum_logs / values. len ( ) as f64 ) . exp ( )
30
- }
31
-
32
- fn min ( & self , values : & [ f64 ] ) -> f64 {
33
- let mut min = values[ 0 ] ;
34
- for v in values {
35
- if * v < min {
36
- min = * v
37
- }
38
- }
39
- min
40
- }
41
-
42
- fn max ( & self , values : & [ f64 ] ) -> f64 {
43
- let mut max = values[ 0 ] ;
44
- for v in values {
45
- if * v > max {
46
- max = * v
47
- }
48
- }
49
- max
50
- }
51
-
52
- fn sum ( & self , values : & [ f64 ] ) -> f64 {
53
- values. iter ( ) . sum ( )
54
- }
55
-
40
+ /// Used during statistics printing at [`crate::memory_manager::harness_end`]
56
41
pub fn harness_stat ( & self ) -> HashMap < String , String > {
57
42
let mut stat = HashMap :: new ( ) ;
58
43
// Work counts
@@ -67,109 +52,124 @@ impl SchedulerStat {
67
52
}
68
53
stat. insert ( "total-work.count" . to_owned ( ) , format ! ( "{}" , total_count) ) ;
69
54
// Work execution times
70
- let mut total_durations = vec ! [ ] ;
71
- for ( t, durations) in & self . work_durations {
72
- for d in durations {
73
- total_durations. push ( * d) ;
74
- }
55
+ let mut duration_overall: WorkCounterBase = Default :: default ( ) ;
56
+ for ( t, vs) in & self . work_counters {
57
+ // Name of the work packet type
75
58
let n = self . work_id_name_map [ t] ;
76
- let geomean = self . geomean (
77
- & durations
78
- . iter ( )
79
- . map ( |d| d. as_nanos ( ) as f64 )
80
- . collect :: < Vec < _ > > ( ) ,
81
- ) ;
82
- stat. insert (
83
- format ! ( "work.{}.time.geomean" , self . work_name( n) ) ,
84
- format ! ( "{:.2}" , geomean) ,
85
- ) ;
86
- let sum = self . sum (
87
- & durations
59
+ // Iterate through different types of work counters
60
+ for v in vs. iter ( ) {
61
+ // Aggregate work counters of the same type but from different
62
+ // worker threads
63
+ let fold = v
88
64
. iter ( )
89
- . map ( |d| d. as_nanos ( ) as f64 )
90
- . collect :: < Vec < _ > > ( ) ,
91
- ) ;
92
- stat. insert (
93
- format ! ( "work.{}.time.sum" , self . work_name( n) ) ,
94
- format ! ( "{:.2}" , sum) ,
95
- ) ;
96
- }
97
- let durations = total_durations
98
- . iter ( )
99
- . map ( |d| d. as_nanos ( ) as f64 )
100
- . collect :: < Vec < _ > > ( ) ;
101
- if !durations. is_empty ( ) {
102
- stat. insert (
103
- "total-work.time.geomean" . to_owned ( ) ,
104
- format ! ( "{:.2}" , self . geomean( & durations) ) ,
105
- ) ;
106
- stat. insert (
107
- "total-work.time.min" . to_owned ( ) ,
108
- format ! ( "{:.2}" , self . min( & durations) ) ,
109
- ) ;
110
- stat. insert (
111
- "total-work.time.max" . to_owned ( ) ,
112
- format ! ( "{:.2}" , self . max( & durations) ) ,
113
- ) ;
65
+ . fold ( Default :: default ( ) , |acc : WorkCounterBase , x| {
66
+ acc. merge ( x. get_base ( ) )
67
+ } ) ;
68
+ // Update the overall execution time
69
+ duration_overall. merge_inplace ( & fold) ;
70
+ let name = v. first ( ) . unwrap ( ) . name ( ) ;
71
+ stat. insert (
72
+ format ! ( "work.{}.{}.total" , self . work_name( n) , name) ,
73
+ format ! ( "{:.2}" , fold. total) ,
74
+ ) ;
75
+ stat. insert (
76
+ format ! ( "work.{}.{}.min" , self . work_name( n) , name) ,
77
+ format ! ( "{:.2}" , fold. min) ,
78
+ ) ;
79
+ stat. insert (
80
+ format ! ( "work.{}.{}.max" , self . work_name( n) , name) ,
81
+ format ! ( "{:.2}" , fold. max) ,
82
+ ) ;
83
+ }
114
84
}
85
+ // Print out overall execution time
86
+ stat. insert (
87
+ "total-work.time.total" . to_owned ( ) ,
88
+ format ! ( "{:.2}" , duration_overall. total) ,
89
+ ) ;
90
+ stat. insert (
91
+ "total-work.time.min" . to_owned ( ) ,
92
+ format ! ( "{:.2}" , duration_overall. min) ,
93
+ ) ;
94
+ stat. insert (
95
+ "total-work.time.max" . to_owned ( ) ,
96
+ format ! ( "{:.2}" , duration_overall. max) ,
97
+ ) ;
115
98
116
99
stat
117
100
}
118
-
101
+ /// Merge work counters from different worker threads
119
102
pub fn merge ( & mut self , stat : & WorkerLocalStat ) {
103
+ // Merge work packet type ID to work packet name mapping
120
104
for ( id, name) in & stat. work_id_name_map {
121
105
self . work_id_name_map . insert ( * id, * name) ;
122
106
}
107
+ // Merge work count for different work packet types
123
108
for ( id, count) in & stat. work_counts {
124
109
if self . work_counts . contains_key ( id) {
125
110
* self . work_counts . get_mut ( id) . unwrap ( ) += * count;
126
111
} else {
127
112
self . work_counts . insert ( * id, * count) ;
128
113
}
129
114
}
130
- for ( id, durations) in & stat. work_durations {
131
- if self . work_durations . contains_key ( id) {
132
- let work_durations = self . work_durations . get_mut ( id) . unwrap ( ) ;
133
- for d in durations {
134
- work_durations. push ( * d) ;
135
- }
136
- } else {
137
- self . work_durations . insert ( * id, durations. clone ( ) ) ;
115
+ // Merge work counter for different work packet types
116
+ for ( id, counters) in & stat. work_counters {
117
+ // Initialize the two dimensional vector
118
+ // [
119
+ // [], // foo counter
120
+ // [], // bar counter
121
+ // ]
122
+ let vs = self
123
+ . work_counters
124
+ . entry ( * id)
125
+ . or_insert_with ( || vec ! [ vec![ ] ; counters. len( ) ] ) ;
126
+ // [
127
+ // [counters[0] of type foo],
128
+ // [counters[1] of type bar]
129
+ // ]
130
+ for ( v, c) in vs. iter_mut ( ) . zip ( counters. iter ( ) ) {
131
+ v. push ( c. clone ( ) ) ;
138
132
}
139
133
}
140
134
}
141
135
}
142
136
137
+ /// Describing a single work packet
143
138
pub struct WorkStat {
144
139
type_id : TypeId ,
145
140
type_name : & ' static str ,
146
- start_time : SystemTime ,
147
141
}
148
142
149
143
impl WorkStat {
144
+ /// Stop all work counters for the work packet type of the just executed
145
+ /// work packet
150
146
#[ inline( always) ]
151
147
pub fn end_of_work ( & self , worker_stat : & mut WorkerLocalStat ) {
152
148
if !worker_stat. is_enabled ( ) {
153
149
return ;
154
150
} ;
151
+ // Insert type ID, name pair
155
152
worker_stat
156
153
. work_id_name_map
157
154
. insert ( self . type_id , self . type_name ) ;
155
+ // Increment work count
158
156
* worker_stat. work_counts . entry ( self . type_id ) . or_insert ( 0 ) += 1 ;
159
- let duration = self . start_time . elapsed ( ) . unwrap ( ) ;
157
+ // Stop counters
160
158
worker_stat
161
- . work_durations
159
+ . work_counters
162
160
. entry ( self . type_id )
163
- . or_insert_with ( Vec :: new)
164
- . push ( duration) ;
161
+ . and_modify ( |v| {
162
+ v. iter_mut ( ) . for_each ( |c| c. stop ( ) ) ;
163
+ } ) ;
165
164
}
166
165
}
167
166
167
+ /// Worker thread local counterpart of [`SchedulerStat`]
168
168
#[ derive( Default ) ]
169
169
pub struct WorkerLocalStat {
170
170
work_id_name_map : HashMap < TypeId , & ' static str > ,
171
171
work_counts : HashMap < TypeId , usize > ,
172
- work_durations : HashMap < TypeId , Vec < Duration > > ,
172
+ work_counters : HashMap < TypeId , Vec < Box < dyn WorkCounter > > > ,
173
173
enabled : AtomicBool ,
174
174
}
175
175
@@ -182,12 +182,26 @@ impl WorkerLocalStat {
182
182
pub fn enable ( & self ) {
183
183
self . enabled . store ( true , Ordering :: SeqCst ) ;
184
184
}
185
+ /// Measure the execution of a work packet by starting all counters for that
186
+ /// type
185
187
#[ inline]
186
188
pub fn measure_work ( & mut self , work_id : TypeId , work_name : & ' static str ) -> WorkStat {
187
- WorkStat {
189
+ let stat = WorkStat {
188
190
type_id : work_id,
189
191
type_name : work_name,
190
- start_time : SystemTime :: now ( ) ,
192
+ } ;
193
+ if self . is_enabled ( ) {
194
+ self . work_counters
195
+ . entry ( work_id)
196
+ . or_insert_with ( WorkerLocalStat :: counter_set)
197
+ . iter_mut ( )
198
+ . for_each ( |c| c. start ( ) ) ;
191
199
}
200
+ stat
201
+ }
202
+
203
+ // The set of work counters for all work packet types
204
+ fn counter_set ( ) -> Vec < Box < dyn WorkCounter > > {
205
+ vec ! [ Box :: new( WorkDuration :: new( ) ) ]
192
206
}
193
207
}
0 commit comments