26
26
27
27
#include < chrono>
28
28
using std::chrono::high_resolution_clock;
29
+ #include < numeric>
29
30
30
31
#include < esl/agent.hpp>
31
32
#include < esl/computation/environment.hpp>
32
33
#include < esl/data/log.hpp>
34
+ #include < esl/quantity.hpp>
33
35
34
36
35
37
namespace esl ::simulation {
@@ -44,6 +46,7 @@ namespace esl::simulation {
44
46
, sample(parameters.get<std::uint64_t >(" sample" ))
45
47
, agents(e)
46
48
, verbosity(parameters.get<std::uint64_t >(" verbosity" ))
49
+ , threads( std::max(1u , parameters.get<unsigned int >(" threads" )))
47
50
{
48
51
49
52
}
@@ -65,35 +68,41 @@ namespace esl::simulation {
65
68
environment_.before_step ();
66
69
67
70
// read the sample index from the parameter collection
68
- auto first_event_ = step.upper ;
71
+
72
+ std::mutex mutex_first_event_;
73
+ time_point first_event_ = step.upper ;
69
74
unsigned int round_ = 0 ;
70
75
do {
71
76
72
77
if (verbosity > 0 && 0 == (rounds_ % verbosity)){
73
78
LOG (notice) << " time " << step << " round " << round_ << std::endl;
74
79
}
75
80
first_event_ = step.upper ;
76
- for (auto &[i, a] : agents.local_agents_ ) {
77
-
78
- // double agent_cb_end_;
79
- // timings_.emplace(i, 0.);
80
- // timings_cb_.emplace(i, 0.);
81
- // timings_act_.emplace(i, 0.);
82
- // auto agent_start_ = high_resolution_clock::now();
83
- // auto agent_act_ = high_resolution_clock::now();
81
+
82
+
83
+ auto job_ = [&](std::shared_ptr<agent> a){
84
+ // double agent_cb_end_;
85
+ // timings_.emplace(i, 0.);
86
+ // timings_cb_.emplace(i, 0.);
87
+ // timings_act_.emplace(i, 0.);
88
+ // auto agent_start_ = high_resolution_clock::now();
89
+ // auto agent_act_ = high_resolution_clock::now();
84
90
// The seed is deterministic in the following variables:
85
91
std::seed_seq seed_ {
86
- std::uint64_t (std::hash<identity<agent>>()(i)),
87
- std::uint64_t (step.lower ),
88
- std::uint64_t (round_),
89
- sample
90
- };
91
-
92
- // try {
93
- first_event_ = std::min (first_event_, a->process_messages (step, seed_));
94
- // agent_cb_end_ = double((high_resolution_clock::now() - agent_start_).count());
95
- // agent_act_ = high_resolution_clock::now();
92
+ std::uint64_t (std::hash<identity<agent>>()(a->identifier )),
93
+ std::uint64_t (step.lower ), std::uint64_t (round_),
94
+ sample};
95
+
96
+ // try {
97
+
98
+ {
99
+ std::unique_lock lock_ (mutex_first_event_);
100
+ first_event_ = std::min (first_event_,
101
+ a->process_messages (step, seed_));
102
+ // agent_cb_end_ = double((high_resolution_clock::now() - agent_start_).count()); agent_act_ = high_resolution_clock::now();
96
103
first_event_ = std::min (first_event_, a->act (step, seed_));
104
+ }
105
+
97
106
// } catch(const std::runtime_error &e) {
98
107
// LOG(errorlog) << e.what() << std::endl;
99
108
// throw e;
@@ -103,17 +112,45 @@ namespace esl::simulation {
103
112
// } catch(...) {
104
113
// throw;
105
114
// }
106
- a->inbox .clear ();
107
- // auto agent_end_ = high_resolution_clock::now() - agent_start_;
108
-
109
-
110
- // timings_cb_[a->identifier] += agent_cb_end_;
111
- // timings_act_[a->identifier] += double( (high_resolution_clock::now() - agent_act_).count());
112
- // timings_[a->identifier] += (double(agent_end_.count()) );
113
- // LOG(notice) << "a" << a->identifier << " tcb " << step << " " << std::setprecision(8) << timings_cb_[a->identifier]/ 1e+9/step.lower << " s" << std::endl;
114
- // LOG(notice) << "a" << a->identifier << " tac " << step << " " << std::setprecision(8) << timings_act_[a->identifier]/ 1e+9/step.lower << " s" << std::endl;
115
+ a->inbox .clear ();
116
+ // auto agent_end_ = high_resolution_clock::now() - agent_start_;
117
+
118
+
119
+ // timings_cb_[a->identifier] += agent_cb_end_;
120
+ // timings_act_[a->identifier] += double( (high_resolution_clock::now() - agent_act_).count());
121
+ // timings_[a->identifier] += (double(agent_end_.count()) );
122
+ // LOG(notice) << "a" << a->identifier << " tcb " << step << " " << std::setprecision(8) << timings_cb_[a->identifier]/ 1e+9/step.lower << " s" << std::endl; LOG(notice) << "a" << a->identifier << " tac " << step << " " << std::setprecision(8) << timings_act_[a->identifier]/ 1e+9/step.lower << " s" << std::endl;
123
+
124
+ };
125
+
126
+ // important: if using a single thread, run everything in main
127
+ if (threads <= 1 ) {
128
+ for (auto &[i, a] : agents.local_agents_ ) {
129
+ job_ (a);
130
+ }
131
+ }else {
132
+ std::vector<std::thread> threads_;
133
+ auto iterator_ = agents.local_agents_ .begin ();
134
+ for (const auto & tasks_: quantity (agents.local_agents_ .size ()) / threads){
135
+ std::vector<std::shared_ptr<agent>> task_split_;
136
+ for (auto i = quantity (0 ); i < tasks_; ++i){
137
+ task_split_.push_back (iterator_->second );
138
+ std::advance (iterator_, 1 );
139
+ }
140
+
141
+ threads_.emplace_back ([&](std::vector<std::shared_ptr<agent>> ts){
142
+ for (const auto & a: ts){
143
+ job_ (a);
144
+ }
145
+ }, task_split_);
146
+ }
147
+
148
+ for (auto &t: threads_){
149
+ t.join ();
150
+ }
115
151
116
152
}
153
+
117
154
environment_.send_messages (*this );
118
155
++round_;
119
156
++rounds_;
0 commit comments