The ROme OpTimistic Simulator  2.0.0
A General-Purpose Multithreaded Parallel/Distributed Simulation Platform
gvt.c
Go to the documentation of this file.
1 
38 #ifdef HAVE_MPI
39 
41 #include <communication/mpi.h>
42 #include <communication/gvt.h>
43 
48 phase_colour *threads_phase_colour;
49 
52 
68 
75 
82 
88 
90 static int *white_msg_sent_buff;
91 
96 static int expected_white_msg;
97 
102 static MPI_Request white_count_req;
103 
109 static MPI_Comm white_count_comm;
110 
117 
122 static MPI_Request gvt_reduction_req;
123 
127 static MPI_Comm gvt_reduction_comm;
128 
134 
143 
149 
155 static MPI_Request *gvt_init_reqs;
156 
163 static unsigned int gvt_init_round;
164 
165 
174 void gvt_comm_init(void)
175 {
176  unsigned int i;
177 
178  min_outgoing_red_msg = rsalloc(n_cores * sizeof(simtime_t));
179  threads_phase_colour = rsalloc(n_cores * sizeof(phase_colour));
180  for (i = 0; i < n_cores; i++) {
183  }
184 
185  atomic_set(&white_0_msg_recv, 0);
186  atomic_set(&white_1_msg_recv, 0);
187  white_msg_recv = &white_1_msg_recv;
188  expected_white_msg = 0;
189 
190  white_msg_sent = rsalloc(n_ker * sizeof(atomic_t));
191  for (i = 0; i < n_ker; i++) {
192  atomic_set(&white_msg_sent[i], 0);
193  }
194 
195  white_msg_sent_buff = rsalloc(n_ker * sizeof(unsigned int));
196 
197  white_count_req = MPI_REQUEST_NULL;
198  MPI_Comm_dup(MPI_COMM_WORLD, &white_count_comm);
199  spinlock_init(&white_count_lock);
200 
201  gvt_init_reqs = rsalloc(n_ker * sizeof(MPI_Request));
202  for (i = 0; i < n_ker; i++) {
203  gvt_init_reqs[i] = MPI_REQUEST_NULL;
204  }
205 
206  gvt_reduction_req = MPI_REQUEST_NULL;
207  MPI_Comm_dup(MPI_COMM_WORLD, &gvt_reduction_comm);
208  spinlock_init(&gvt_reduction_lock);
209 }
210 
211 
222 {
223  rsfree(min_outgoing_red_msg);
224  rsfree(threads_phase_colour);
225  rsfree(white_msg_sent);
226  rsfree(white_msg_sent_buff);
227 
228  unsigned int i;
229  for (i = 0; i < n_ker; i++) {
230  if (gvt_init_reqs[i] != MPI_REQUEST_NULL) {
231  MPI_Cancel(&gvt_init_reqs[i]);
232  MPI_Request_free(&gvt_init_reqs[i]);
233  }
234  }
235 
236  if (gvt_init_pending()) {
237  gvt_init_clear();
238  }
239 
240  MPI_Wait(&white_count_req, MPI_STATUS_IGNORE);
241  MPI_Comm_free(&white_count_comm);
242  MPI_Wait(&gvt_reduction_req, MPI_STATUS_IGNORE);
243  MPI_Comm_free(&gvt_reduction_comm);
244  rsfree(gvt_init_reqs);
245 }
246 
247 
259 void enter_red_phase(void)
260 {
261  if (unlikely(in_red_phase())) {
262  rootsim_error(true, "Thread %u cannot enter in red phase because is already in red phase.\n", local_tid);
263  }
266 }
267 
268 
279 void exit_red_phase(void)
280 {
281  if (unlikely(!in_red_phase())) {
282  rootsim_error(true, "Thread %u cannot exit from red phase because it wasn't in red phase.\n", local_tid);
283  }
285 }
286 
287 
302 {
303  unsigned int i;
304  for (i = 0; i < n_ker; i++) {
305  white_msg_sent_buff[i] = atomic_read(&white_msg_sent[i]);
306  }
307 
308  lock_mpi();
309  MPI_Ireduce_scatter_block(white_msg_sent_buff, &expected_white_msg, 1, MPI_INT, MPI_SUM, white_count_comm, &white_count_req);
310  unlock_mpi();
311 }
312 
313 
329 {
330  if (!spin_trylock(&white_count_lock))
331  return false;
332 
333  bool compl = false;
335 
336  spin_unlock(&white_count_lock);
337  return compl;
338 }
339 
340 
350 {
351  MPI_Wait(&white_count_req, MPI_STATUS_IGNORE);
352 }
353 
354 
365 {
366  return (atomic_read(white_msg_recv) == expected_white_msg);
367 }
368 
369 
387 {
388  // santy check: Exactly the expected number of white
389  // messages should have been arrived in the last GVT round
390  if (atomic_read(white_msg_recv) != expected_white_msg) {
391  rootsim_error(true, "unexpected number of white messages received in the last GVT round: [expected: %d, received: %d]\n",
392  expected_white_msg, atomic_read(white_msg_recv));
393  }
394  // prepare the white msg counter for the next GVT round
395  atomic_set(white_msg_recv, 0);
396 
397  if (white_msg_recv == &white_0_msg_recv) {
398  white_msg_recv = &white_1_msg_recv;
399  } else {
400  white_msg_recv = &white_0_msg_recv;
401  }
402 }
403 
404 
418 {
419  unsigned int i;
420 
421  //sanity check
422 #ifndef NDEBUG
423  for (i = 0; i < n_cores; i++) {
425  rootsim_error(true, "flushing outgoing white message counter while some thread are not in red phase\n");
426  }
427 #endif
428 
429  for (i = 0; i < n_ker; i++) {
430  atomic_set(&white_msg_sent[i], 0);
431  }
432 }
433 
434 
447 void broadcast_gvt_init(unsigned int round)
448 {
449  unsigned int i;
450 
451  gvt_init_round = round;
452 
453  for (i = 0; i < n_ker; i++) {
454  if (i == kid)
455  continue;
456  if (!is_request_completed(&(gvt_init_reqs[i]))) {
457  rootsim_error(true, "Failed to send new GVT init round to kernel %u, because the old init request is still pending\n", i);
458  }
459  lock_mpi();
460  MPI_Isend((const void *)&gvt_init_round, 1, MPI_UNSIGNED, i, MSG_NEW_GVT, MPI_COMM_WORLD, &gvt_init_reqs[i]);
461  unlock_mpi();
462  }
463 }
464 
465 
478 {
479  return pending_msgs(MSG_NEW_GVT);
480 }
481 
482 
491 void gvt_init_clear(void)
492 {
493  unsigned int new_gvt_round;
494  lock_mpi();
495  MPI_Recv(&new_gvt_round, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, MSG_NEW_GVT, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
496  unlock_mpi();
497 }
498 
499 
515 void join_gvt_redux(simtime_t local_vt)
516 {
517  local_vt_buff = local_vt;
518  lock_mpi();
519  MPI_Iallreduce(&local_vt_buff, &reduced_gvt, 1, MPI_DOUBLE, MPI_MIN, gvt_reduction_comm, &gvt_reduction_req);
520  unlock_mpi();
521 }
522 
523 
540 {
541  if (!spin_trylock(&gvt_reduction_lock))
542  return false;
543 
544  bool compl = false;
546 
547  spin_unlock(&gvt_reduction_lock);
548  return compl;
549 }
550 
551 
564 {
565  return reduced_gvt;
566 }
567 
568 
569 
589 void register_outgoing_msg(const msg_t *msg)
590 {
591 #ifdef HAVE_CROSS_STATE
592  if (is_control_msg(msg->type))
593  return;
594 #endif
595 
596  unsigned int dst_kid = find_kernel_by_gid(msg->receiver);
597 
598  if (dst_kid == kid)
599  return;
600 
601  if (is_red_colour(msg->colour)) {
603  } else {
604  atomic_inc(&white_msg_sent[dst_kid]);
605  }
606 }
607 
608 
623 void register_incoming_msg(const msg_t * msg)
624 {
625 #ifdef HAVE_CROSS_STATE
626  if (is_control_msg(msg->type))
627  return;
628 #endif
629 
630  unsigned int src_kid = find_kernel_by_gid(msg->sender);
631 
632  if (src_kid == kid)
633  return;
634 
635  if (msg->colour == white_0) {
636  atomic_inc(&white_0_msg_recv);
637  } else if (msg->colour == white_1) {
638  atomic_inc(&white_1_msg_recv);
639  }
640 }
641 
642 #endif
static int expected_white_msg
Definition: gvt.c:96
static int * white_msg_sent_buff
Temporary structure used for the MPI collective primitives.
Definition: gvt.c:90
void register_incoming_msg(const msg_t *msg)
Register an incoming message, if necessary.
Definition: gvt.c:623
bool spin_trylock(spinlock_t *s)
Definition: x86.c:151
Communication Routines.
simtime_t last_reduced_gvt(void)
Return the last GVT value.
Definition: gvt.c:563
void exit_red_phase(void)
Make a thread exit from red phase.
Definition: gvt.c:279
bool all_white_msg_received(void)
Check if white messages are all received.
Definition: gvt.c:364
bool is_request_completed(MPI_Request *req)
check if an MPI request has been completed
Definition: mpi.c:144
static MPI_Request white_count_req
Definition: gvt.c:102
#define spinlock_init(s)
Spinlock initialization.
Definition: atomic.h:72
#define next_colour(c)
Tells what is the next colour, using simple arithmetics.
Definition: gvt.h:82
void gvt_comm_init(void)
Initialize the MPI-based distributed GVT reduction submodule.
Definition: gvt.c:174
#define min(a, b)
Macro to find the minimum among two values.
Definition: core.h:115
phase_colour * threads_phase_colour
Definition: gvt.c:48
static atomic_t white_0_msg_recv
Definition: gvt.c:74
void join_white_msg_redux(void)
Join the white message reduction collective operation.
Definition: gvt.c:301
void flush_white_msg_recv(void)
Reset received white messages.
Definition: gvt.c:386
Color is white (phase 2)
Definition: gvt.h:68
bool gvt_redux_completed(void)
Check if final GVT reduction is complete.
Definition: gvt.c:539
#define atomic_read(v)
Read operation on an atomic counter.
Definition: atomic.h:66
static simtime_t local_vt_buff
Definition: gvt.c:142
unsigned int n_cores
Total number of cores required for simulation.
Definition: core.c:61
static MPI_Comm gvt_reduction_comm
Definition: gvt.c:127
Master notifies the new GVT.
Definition: communication.h:87
static atomic_t * white_msg_sent
Definition: gvt.c:87
bool gvt_init_pending(void)
Check if there are pending GVT-init messages around.
Definition: gvt.c:477
unsigned int find_kernel_by_gid(GID_t gid)
Definition: core.c:164
void register_outgoing_msg(const msg_t *msg)
Register an outgoing message, if necessary.
Definition: gvt.c:589
#define atomic_set(v, i)
Set operation on an atomic counter.
Definition: atomic.h:69
bool pending_msgs(int tag)
Check if there are pending messages.
Definition: mpi.c:122
#define INFTY
Infinite timestamp: this is the highest timestamp in a simulation run.
Definition: ROOT-Sim.h:58
bool white_msg_redux_completed(void)
Test completion of white message reduction collective operation.
Definition: gvt.c:328
double simtime_t
This defines the type with whom timestamps are represented.
Definition: ROOT-Sim.h:55
static atomic_t white_1_msg_recv
Definition: gvt.c:81
Message Type definition.
Definition: core.h:164
simtime_t * min_outgoing_red_msg
Minimum time among all the outgoing red messages for each thread.
Definition: gvt.c:51
void wait_white_msg_redux(void)
Wait for the completion of wait message reduction.
Definition: gvt.c:349
#define unlock_mpi()
This macro releases a global lock if multithreaded support is not available from MPI.
Definition: mpi.h:44
static spinlock_t gvt_reduction_lock
Definition: gvt.c:133
void enter_red_phase(void)
Make a thread enter into red phase.
Definition: gvt.c:259
static spinlock_t white_count_lock
Definition: gvt.c:116
static MPI_Request * gvt_init_reqs
Definition: gvt.c:155
Color is white (phase 1)
Definition: gvt.h:66
static MPI_Request gvt_reduction_req
Definition: gvt.c:122
MPI Support Module.
#define in_red_phase()
Tells whether the current thread is in red phase.
Definition: gvt.h:79
static unsigned int gvt_init_round
Definition: gvt.c:163
#define is_control_msg(type)
This macro tells whether a message is a control message, by its type.
Definition: communication.h:76
void broadcast_gvt_init(unsigned int round)
Initiate a distributed GVT.
Definition: gvt.c:447
void gvt_init_clear(void)
Forcely extract GVT-init message from MPI.
Definition: gvt.c:491
volatile atomic_t * white_msg_recv
Definition: gvt.c:67
static simtime_t reduced_gvt
Definition: gvt.c:148
void flush_white_msg_sent(void)
Reset sent white messages.
Definition: gvt.c:417
void gvt_comm_finalize(void)
Shut down the MPI-based distributed GVT reduction submodule.
Definition: gvt.c:221
__thread unsigned int local_tid
Definition: thread.c:72
#define lock_mpi()
This macro takes a global lock if multithread support is not available from MPI.
Definition: mpi.h:41
static MPI_Comm white_count_comm
Definition: gvt.c:109
Distributed GVT Support module.
void join_gvt_redux(simtime_t local_vt)
Reduce the GVT value.
Definition: gvt.c:515
void atomic_inc(atomic_t *)
Definition: x86.c:90
unsigned int n_ker
Total number of simulation kernel instances running.
Definition: core.c:58
#define is_red_colour(c)
Definition: gvt.h:76
#define unlikely(exp)
Optimize the branch as likely not taken.
Definition: core.h:74
unsigned int kid
Identifier of the local kernel.
Definition: core.c:55
void spin_unlock(spinlock_t *s)
Definition: x86.c:161