The ROme OpTimistic Simulator  2.0.0
A General-Purpose Multithreaded Parallel/Distributed Simulation Platform
gvt.c
Go to the documentation of this file.
1 
34 #include <ROOT-Sim.h>
35 #include <arch/thread.h>
36 #include <gvt/gvt.h>
37 #include <gvt/ccgs.h>
38 #include <core/core.h>
39 #include <core/init.h>
40 #include <core/timer.h>
41 #include <scheduler/process.h>
42 #include <scheduler/scheduler.h>
43 #include <statistics/statistics.h>
44 #include <mm/mm.h>
45 #include <communication/mpi.h>
46 #include <communication/gvt.h>
47 
/**
 * Phases of the kernel-wide GVT reduction state machine.
 *
 * The kernel moves through these states in order during each GVT round.
 * The MPI-only states handle the white message reduction and the
 * distributed reduction of the final GVT value across kernel instances.
 */
enum kernel_phases {
	kphase_start,		// A new GVT round has just been initiated
#ifdef HAVE_MPI
	kphase_white_msg_redux,	// Waiting for the white message reduction to complete
#endif
	kphase_kvt,		// Threads agree on the local Kernel Virtual Time
#ifdef HAVE_MPI
	kphase_gvt_redux,	// Distributed reduction of the final GVT value
#endif
	kphase_fossil,		// Adopt the new GVT and run fossil collection
	kphase_idle		// No GVT round currently in progress
};
60 
/**
 * Per-thread phases of the KVT computation carried out in GVT_phases().
 *
 * Each worker thread walks A -> send -> B -> aware within a round, then
 * returns to idle once the new GVT has been adopted.
 */
enum thread_phases {
	tphase_A,	// Reduce the local minimum (first pass)
	tphase_send,	// Schedule once so in-flight events are flushed
	tphase_B,	// Reduce the local minimum again (second pass)
	tphase_aware,	// This thread knows the KVT; waiting to adopt the GVT
	tphase_idle	// Not taking part in a GVT round
};
68 
69 // Timer to know when we have to start GVT computation.
70 // Each thread could start the GVT reduction phase, so this
71 // is a per-thread variable.
72 timer gvt_timer;
73 
74 timer gvt_round_timer;
75 
76 #ifdef HAVE_MPI
77 static unsigned int init_kvt_tkn;
78 static unsigned int commit_gvt_tkn;
79 #endif
80 
81 /* Data shared across threads */
82 
83 static volatile enum kernel_phases kernel_phase = kphase_idle;
84 
85 static unsigned int init_completed_tkn;
86 static unsigned int commit_kvt_tkn;
87 static unsigned int idle_tkn;
88 
89 static atomic_t counter_initialized;
90 static atomic_t counter_kvt;
91 static atomic_t counter_finalized;
92 
94 static volatile unsigned int current_GVT_round = 0;
95 
98 
101 
104 
113 static __thread simtime_t last_gvt = 0.0;
114 
115 // last agreed KVT
116 static volatile simtime_t new_gvt = 0.0;
117 
119 static __thread enum thread_phases thread_phase = tphase_idle;
120 
122 static __thread unsigned int my_GVT_round = 0;
123 
126 
127 static simtime_t *local_min_barrier;
128 
132 void gvt_init(void)
133 {
134  unsigned int i;
135 
136  // This allows the first GVT phase to start
137  atomic_set(&counter_finalized, 0);
138 
139  // Initialize the local minima
140  local_min = rsalloc(sizeof(simtime_t) * n_cores);
141  local_min_barrier = rsalloc(sizeof(simtime_t) * n_cores);
142  for (i = 0; i < n_cores; i++) {
143  local_min[i] = INFTY;
144  local_min_barrier[i] = INFTY;
145  }
146 
147  timer_start(gvt_timer);
148 
149  // Initialize the CCGS subsystem
150  ccgs_init();
151 }
152 
/**
 * Shut down the GVT subsystem.
 *
 * Finalizes CCGS and, under MPI, makes sure that any collective operation
 * belonging to an in-flight GVT round is still joined: otherwise the other
 * kernel instances would block forever waiting for this kernel.
 */
void gvt_fini(void)
{
	// Finalize the CCGS subsystem
	ccgs_fini();

#ifdef HAVE_MPI
	if ((kernel_phase == kphase_idle && !master_thread() && gvt_init_pending()) || kernel_phase == kphase_start) {
		// A round was initiated but we never joined the white message
		// reduction: join and complete it, then take part in the final
		// GVT reduction with a dummy value.
		// NOTE(review): these two calls were lost in extraction and
		// reconstructed from the distributed GVT API — confirm.
		join_white_msg_redux();
		wait_white_msg_redux();
		join_gvt_redux(-1.0);
	} else if (kernel_phase == kphase_white_msg_redux || kernel_phase == kphase_kvt) {
		// The white message reduction was already joined: just wait
		// for its completion before joining the final GVT reduction.
		// NOTE(review): reconstructed call — confirm.
		wait_white_msg_redux();
		join_gvt_redux(-1.0);
	}
#endif
}
172 
180 {
181  return last_gvt;
182 }
183 
184 static inline void reduce_local_gvt(void)
185 {
186  foreach_bound_lp(lp) {
187  // If no message has been processed, local estimate for
188  // GVT is forced to 0.0. This can happen, e.g., if
189  // GVT is computed very early in the run
190  if (unlikely(lp->bound == NULL)) {
191  local_min[local_tid] = 0.0;
192  break;
193  }
194 
195  // GVT inheritance: if the current LP has no scheduled
196  // events, we can safely assume that it should not
197  // participate to the computation of the GVT, because any
198  // event to it will appear *after* the GVT
199  if (lp->bound->next == NULL)
200  continue;
201 
203  min(local_min[local_tid], lp->bound->timestamp);
204  }
205 }
206 
207 simtime_t GVT_phases(void)
208 {
209  unsigned int i;
210 
211  if (thread_phase == tphase_A) {
212 #ifdef HAVE_MPI
213  // Check whether we have new ingoing messages sent by remote instances
215 #endif
217 
218  reduce_local_gvt();
219 
220  thread_phase = tphase_send; // Entering phase send
221  atomic_dec(&counter_A); // Notify finalization of phase A
222  return -1.0;
223  }
224 
225  if (thread_phase == tphase_send && atomic_read(&counter_A) == 0) {
226 #ifdef HAVE_MPI
227  // Check whether we have new ingoing messages sent by remote instances
229 #endif
231  schedule();
232  thread_phase = tphase_B;
233  atomic_dec(&counter_send);
234  return -1.0;
235  }
236 
237  if (thread_phase == tphase_B && atomic_read(&counter_send) == 0) {
238 #ifdef HAVE_MPI
239  // Check whether we have new ingoing messages sent by remote instances
241 #endif
243 
244  reduce_local_gvt();
245 
246 #ifdef HAVE_MPI
247  // WARNING: local thread cannot send any remote
248  // message between the two following calls
249  exit_red_phase();
252 #endif
253 
254  thread_phase = tphase_aware;
255  atomic_dec(&counter_B);
256 
257  if (atomic_read(&counter_B) == 0) {
258  simtime_t agreed_vt = INFTY;
259  for (i = 0; i < n_cores; i++) {
260  agreed_vt = min(local_min[i], agreed_vt);
261  }
262  return agreed_vt;
263  }
264  return -1.0;
265  }
266 
267  return -1.0;
268 }
269 
270 bool start_new_gvt(void)
271 {
272 #ifdef HAVE_MPI
273  if (!master_kernel()) {
274  //Check if we received a new GVT init msg
275  return gvt_init_pending();
276  }
277 #endif
278 
279  // Has enough time passed since the last GVT reduction?
280  return timer_value_milli(gvt_timer) >
282 }
283 
301 {
302 
303  // GVT reduction initialization.
304  // This is different from the paper's pseudocode to reduce
305  // slightly the number of clock reads
306  if (kernel_phase == kphase_idle) {
307 
308  if (start_new_gvt() &&
310 
311  timer_start(gvt_round_timer);
312 
313 #ifdef HAVE_MPI
314  //inform all the other kernels about the new gvt
315  if (master_kernel()) {
317  } else {
318  gvt_init_clear();
319  }
320 #endif
321 
322  // Reduce the current CCGS termination detection
323  ccgs_reduce_termination();
324 
325  /* kernel GVT round setup */
326 
327 #ifdef HAVE_MPI
329 
330  init_kvt_tkn = 1;
331  commit_gvt_tkn = 1;
332 #endif
333 
334  init_completed_tkn = 1;
335  commit_kvt_tkn = 1;
336  idle_tkn = 1;
337 
338  atomic_set(&counter_initialized, n_cores);
339  atomic_set(&counter_kvt, n_cores);
340  atomic_set(&counter_finalized, n_cores);
341 
342  atomic_set(&counter_A, n_cores);
343  atomic_set(&counter_send, n_cores);
344  atomic_set(&counter_B, n_cores);
345 
346  kernel_phase = kphase_start;
347 
348  timer_restart(gvt_timer);
349  }
350  }
351 
352  /* Thread setup phase:
353  * each thread needs to setup its own local context
354  * before to partecipate to the new GVT round */
355  if (kernel_phase == kphase_start && thread_phase == tphase_idle) {
356 
357  // Someone has modified the GVT round (possibly me).
358  // Keep track of this update
360 
361 #ifdef HAVE_MPI
362  enter_red_phase();
363 #endif
364 
366 
367  thread_phase = tphase_A;
368  atomic_dec(&counter_initialized);
369  if (atomic_read(&counter_initialized) == 0) {
370  if (iCAS(&init_completed_tkn, 1, 0)) {
371 #ifdef HAVE_MPI
373  kernel_phase = kphase_white_msg_redux;
374 #else
375  kernel_phase = kphase_kvt;
376 #endif
377  }
378  }
379  return -1.0;
380  }
381 
382 #ifdef HAVE_MPI
383  if (kernel_phase == kphase_white_msg_redux
385  if (iCAS(&init_kvt_tkn, 1, 0)) {
387  kernel_phase = kphase_kvt;
388  }
389  return -1.0;
390  }
391 #endif
392 
393  /* KVT phase:
394  * make all the threads agree on a common virtual time for this kernel */
395  if (kernel_phase == kphase_kvt && thread_phase != tphase_aware) {
396  simtime_t kvt = GVT_phases();
397  if (D_DIFFER(kvt, -1.0)) {
398  if (iCAS(&commit_kvt_tkn, 1, 0)) {
399 
400 #ifdef HAVE_MPI
401  join_gvt_redux(kvt);
402  kernel_phase = kphase_gvt_redux;
403 
404 #else
405  new_gvt = kvt;
406  kernel_phase = kphase_fossil;
407 
408 #endif
409  }
410  }
411  return -1.0;
412  }
413 
414 #ifdef HAVE_MPI
415  if (kernel_phase == kphase_gvt_redux && gvt_redux_completed()) {
416  if (iCAS(&commit_gvt_tkn, 1, 0)) {
417  int gvt_round_time = timer_value_micro(gvt_round_timer);
418  statistics_post_data(current, STAT_GVT_ROUND_TIME, gvt_round_time);
419 
420  new_gvt = last_reduced_gvt();
421  kernel_phase = kphase_fossil;
422  }
423  return -1.0;
424  }
425 #endif
426 
427  /* GVT adoption phase:
428  * the last agreed GVT needs to be adopted by every thread */
429  if (kernel_phase == kphase_fossil && thread_phase == tphase_aware) {
430 
431  // Execute fossil collection and termination detection
432  // Each thread stores the last computed value in last_gvt,
433  // while the return value is the gvt only for the master
434  // thread. To check for termination based on simulation time,
435  // this variable must be explicitly inspected using
436  // get_last_gvt()
437  adopt_new_gvt(new_gvt);
438 
439  // Dump statistics
440  statistics_on_gvt(new_gvt);
441 
442  last_gvt = new_gvt;
443 
444  thread_phase = tphase_idle;
445  atomic_dec(&counter_finalized);
446 
447  if (atomic_read(&counter_finalized) == 0) {
448  if (iCAS(&idle_tkn, 1, 0)) {
449  kernel_phase = kphase_idle;
450  }
451  }
452  return last_gvt;
453  }
454 
455  return -1.0;
456 }
static atomic_t counter_A
How many threads have left phase A?
Definition: gvt.c:97
simtime_t last_reduced_gvt(void)
Return the last GVT value.
Definition: gvt.c:563
void exit_red_phase(void)
Make a thread exit from red phase.
Definition: gvt.c:279
bool all_white_msg_received(void)
Check if white messages are all received.
Definition: gvt.c:364
void process_bottom_halves(void)
Definition: queues.c:132
Initialization routines.
#define min(a, b)
Macro to find the minimum among two values.
Definition: core.h:115
void join_white_msg_redux(void)
Join the white message reduction collective operation.
Definition: gvt.c:301
void flush_white_msg_recv(void)
Reset received white messages.
Definition: gvt.c:386
bool gvt_redux_completed(void)
Check if final GVT reduction is complete.
Definition: gvt.c:539
#define atomic_read(v)
Read operation on an atomic counter.
Definition: atomic.h:66
Core ROOT-Sim functionalities.
void gvt_init(void)
Definition: gvt.c:132
ROOT-Sim header for model development.
unsigned int n_cores
Total number of cores required for simulation.
Definition: core.c:61
static __thread enum thread_phases thread_phase
What is my phase? All threads start in the initial phase.
Definition: gvt.c:119
void atomic_dec(atomic_t *)
Definition: x86.c:103
simtime_t get_last_gvt(void)
Definition: gvt.c:179
static simtime_t * local_min
The local (per-thread) minimum. It&#39;s not TLS, rather an array, to allow reduction by master thread...
Definition: gvt.c:125
simtime_t gvt_operations(void)
Definition: gvt.c:300
Timers.
Statistics module.
The ROOT-Sim scheduler main module header.
bool gvt_init_pending(void)
Check if there are pending GVT-init messages around.
Definition: gvt.c:477
Generic thread management facilities.
__thread struct lp_struct * current
This is a per-thread variable pointing to the block state of the LP currently scheduled.
Definition: scheduler.c:72
Consistent and Committed Global State.
void schedule(void)
Definition: scheduler.c:321
#define atomic_set(v, i)
Set operation on an atomic counter.
Definition: atomic.h:69
#define INFTY
Infinite timestamp: this is the highest timestamp in a simulation run.
Definition: ROOT-Sim.h:58
bool white_msg_redux_completed(void)
Test completion of white message reduction collective operation.
Definition: gvt.c:328
double simtime_t
This defines the type with whom timestamps are represented.
Definition: ROOT-Sim.h:55
simulation_configuration rootsim_config
This global variable holds the configuration for the current simulation.
Definition: core.c:70
Memory Manager main header.
simtime_t * min_outgoing_red_msg
Minimum time among all the outgoing red messages for each thread.
Definition: gvt.c:51
void wait_white_msg_redux(void)
Wait for the completion of wait message reduction.
Definition: gvt.c:349
void gvt_fini(void)
Definition: gvt.c:156
LP control blocks.
void enter_red_phase(void)
Make a thread enter into red phase.
Definition: gvt.c:259
#define master_thread()
This macro expands to true if the current KLT is the master thread for the local kernel.
Definition: thread.h:155
bool iCAS(volatile uint32_t *ptr, uint32_t oldVal, uint32_t newVal)
Definition: x86.c:49
#define D_DIFFER(a, b)
Difference condition for doubles.
Definition: core.h:98
MPI Support Module.
static __thread simtime_t last_gvt
Definition: gvt.c:113
#define master_kernel()
This macro expands to true if the local kernel is the master kernel.
Definition: core.h:47
static atomic_t counter_B
How many threads have left phase B?
Definition: gvt.c:103
static atomic_t counter_send
How many threads have left phase send?
Definition: gvt.c:100
void broadcast_gvt_init(unsigned int round)
Initiate a distributed GVT.
Definition: gvt.c:447
Global Virtual Time.
void receive_remote_msgs(void)
Receive remote messages.
Definition: mpi.c:208
void gvt_init_clear(void)
Forcely extract GVT-init message from MPI.
Definition: gvt.c:491
void flush_white_msg_sent(void)
Reset sent white messages.
Definition: gvt.c:417
void adopt_new_gvt(simtime_t new_gvt)
Definition: fossil.c:96
static volatile unsigned int current_GVT_round
To be used with CAS to determine who is starting the next GVT reduction phase.
Definition: gvt.c:94
__thread unsigned int local_tid
Definition: thread.c:72
static __thread unsigned int my_GVT_round
Per-thread GVT round counter.
Definition: gvt.c:122
Distributed GVT Support module.
void join_gvt_redux(simtime_t local_vt)
Reduce the GVT value.
Definition: gvt.c:515
int gvt_time_period
Wall-Clock time to wait before executiong GVT operations.
Definition: init.h:60
#define unlikely(exp)
Optimize the branch as likely not taken.
Definition: core.h:74