The ROme OpTimistic Simulator  2.0.0
A General-Purpose Multithreaded Parallel/Distributed Simulation Platform
mpi.c
Go to the documentation of this file.
1 
34 #ifdef HAVE_MPI
35 
36 #include <stdbool.h>
37 
38 #include <communication/mpi.h>
39 #include <communication/wnd.h>
40 #include <communication/gvt.h>
42 #include <queues/queues.h>
43 #include <core/core.h>
44 #include <arch/atomic.h>
45 #include <statistics/statistics.h>
46 
49 
56 
59 
// Count of kernel instances (including this one) known to have reached the
// termination condition; bumped locally in broadcast_termination() and on
// every MSG_FINI received in collect_termination().
64 static unsigned int terminated = 0;
65 
// One MPI_Request per kernel, tracking the asynchronous MSG_FINI
// notifications posted by broadcast_termination(); drained with MPI_Waitall
// in dist_termination_finalize().
67 static MPI_Request *termination_reqs;
68 
// NOTE(review): original lines 69-72 are elided in this dump — presumably the
// declaration of the msgs_fini spinlock used by collect_termination(); confirm
// against the full source.
71 
// User-defined MPI reduction operation (wraps reduce_stat_vector) used by
// mpi_reduce_statistics().
73 static MPI_Op reduce_stats_op;
74 
// Custom MPI datatype describing struct stat_t, built in stats_reduction_init().
76 static MPI_Datatype stats_mpi_t;
77 
// Dedicated communicator for event/control messages, duplicated from
// MPI_COMM_WORLD in mpi_init() so event traffic cannot match collective or
// termination messages on MPI_COMM_WORLD.
100 static MPI_Comm msg_comm;
101 
102 
122 bool pending_msgs(int tag)
123 {
124  int flag = 0;
125  lock_mpi();
126  MPI_Iprobe(MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
127  unlock_mpi();
128  return (bool)flag;
129 }
130 
144 bool is_request_completed(MPI_Request *req)
145 {
146  int flag = 0;
147  lock_mpi();
148  MPI_Test(req, &flag, MPI_STATUS_IGNORE);
149  unlock_mpi();
150  return (bool)flag;
151 }
152 
170 {
171  outgoing_msg *out_msg = allocate_outgoing_msg();
172  out_msg->msg = msg;
173  out_msg->msg->colour = threads_phase_colour[local_tid];
174  unsigned int dest = find_kernel_by_gid(msg->receiver);
175 
176  register_outgoing_msg(out_msg->msg);
177 
178  lock_mpi();
179  MPI_Isend(((char *)out_msg->msg) + MSG_PADDING, MSG_META_SIZE + msg->size, MPI_BYTE, dest, msg->receiver.to_int, msg_comm, &out_msg->req);
180  unlock_mpi();
181 
182  // Keep the message in the outgoing queue until it will be delivered
183  store_outgoing_msg(out_msg, dest);
184 }
185 
209 {
210  int size;
211  msg_t *msg;
212  MPI_Status status;
213  MPI_Message mpi_msg;
214  int pending;
215  struct lp_struct *lp;
216  GID_t gid;
217 
218  // TODO: given the latest changes in the platform, this *might*
219  // be removed.
220  if (!spin_trylock(&msgs_lock))
221  return;
222 
223  while (true) {
224  lock_mpi();
225  MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, msg_comm, &pending, &mpi_msg, &status);
226  unlock_mpi();
227 
228  if (!pending)
229  goto out;
230 
231  MPI_Get_count(&status, MPI_BYTE, &size);
232 
233  if (likely(MSG_PADDING + size <= SLAB_MSG_SIZE)) {
234  set_gid(gid, status.MPI_TAG);
235  lp = find_lp_by_gid(gid);
236  msg = get_msg_from_slab(lp);
237  } else {
238  msg = rsalloc(MSG_PADDING + size);
239  bzero(msg, MSG_PADDING);
240  }
241 
242  // Receive the message. Use MPI_Mrecv to be sure that the very same message
243  // which was matched by the previous MPI_Improbe is extracted.
244  lock_mpi();
245  MPI_Mrecv(((char *)msg) + MSG_PADDING, size, MPI_BYTE, &mpi_msg, MPI_STATUS_IGNORE);
246  unlock_mpi();
247 
248  validate_msg(msg);
249  insert_bottom_half(msg);
250  }
251  out:
252  spin_unlock(&msgs_lock);
253 }
254 
255 
256 
271 {
272  return (terminated == n_ker);
273 }
274 
275 
276 
290 {
291  int res;
292  unsigned int tdata;
293 
294  if (terminated == 0 || !spin_trylock(&msgs_fini))
295  return;
296 
297  while (pending_msgs(MSG_FINI)) {
298  lock_mpi();
299  res =
300  MPI_Recv(&tdata, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, MSG_FINI, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
301  unlock_mpi();
302  if (unlikely(res != 0)) {
303  rootsim_error(true, "MPI_Recv did not complete correctly");
304  return;
305  }
306  terminated++;
307  }
308  spin_unlock(&msgs_fini);
309 }
310 
311 
328 {
329  unsigned int i;
330  lock_mpi();
331  for (i = 0; i < n_ker; i++) {
332  if (i == kid)
333  continue;
334  MPI_Isend(&i, 1, MPI_UNSIGNED, i, MSG_FINI, MPI_COMM_WORLD, &termination_reqs[i]);
335  }
336  terminated++;
337  unlock_mpi();
338 }
339 
340 
348 static void reduce_stat_vector(struct stat_t *in, struct stat_t *inout, int *len, MPI_Datatype *dptr)
349 {
350  (void)dptr;
351  int i = 0;
352 
353  for (i = 0; i < *len; ++i) {
354  inout[i].vec += in[i].vec;
355  inout[i].gvt_round_time += in[i].gvt_round_time;
356  inout[i].gvt_round_time_min = fmin(inout[i].gvt_round_time_min, in[i].gvt_round_time_min);
357  inout[i].gvt_round_time_max = fmax(inout[i].gvt_round_time_max, in[i].gvt_round_time_max);
358  inout[i].max_resident_set += in[i].max_resident_set;
359  }
360 }
361 
362 
363 
365 #define MPI_TYPE_STAT_LEN (sizeof(struct stat_t)/sizeof(double))
366 
381 static void stats_reduction_init(void)
382 {
383  // This is a compilation time fail-safe
384  static_assert(offsetof(struct stat_t, gvt_round_time_max) == (sizeof(double) * 19), "The packing assumptions on struct stat_t are wrong or its definition has been modified");
385 
386  unsigned i;
387 
388  // Boilerplate to create a new MPI data type
389  MPI_Datatype type[MPI_TYPE_STAT_LEN];
390  MPI_Aint disp[MPI_TYPE_STAT_LEN];
391  int block_lengths[MPI_TYPE_STAT_LEN];
392 
393  // Initialize those arrays (we asssume that struct stat_t is packed tightly)
394  for (i = 0; i < MPI_TYPE_STAT_LEN; ++i) {
395  type[i] = MPI_DOUBLE;
396  disp[i] = i * sizeof(double);
397  block_lengths[i] = 1;
398  }
399 
400  // Create the custom type and commit the changes
401  MPI_Type_create_struct(MPI_TYPE_STAT_LEN, block_lengths, disp, type, &stats_mpi_t);
402  MPI_Type_commit(&stats_mpi_t);
403 
404  // Create the MPI Operation used to reduce stats
405  if (master_thread()) {
406  MPI_Op_create((MPI_User_function *)reduce_stat_vector, true, &reduce_stats_op);
407  }
408 }
409 
410 #undef MPI_TYPE_STAT_LEN
411 
412 
428 void mpi_reduce_statistics(struct stat_t *global, struct stat_t *local)
429 {
430  MPI_Reduce(local, global, 1, stats_mpi_t, reduce_stats_op, 0, MPI_COMM_WORLD);
431 }
432 
433 
434 
453 {
454  /* init for collective termination */
455  termination_reqs = rsalloc(n_ker * sizeof(MPI_Request));
456  unsigned int i;
457  for (i = 0; i < n_ker; i++) {
458  termination_reqs[i] = MPI_REQUEST_NULL;
459  }
460  spinlock_init(&msgs_fini);
461 }
462 
463 
464 
472 {
473  MPI_Waitall(n_ker, termination_reqs, MPI_STATUSES_IGNORE);
474 }
475 
// Synchronize all kernel instances (and, per the cross-reference index, the
// local worker threads as well — see NOTE below).
492 void syncronize_all(void)
493 {
494  if (master_thread()) {
495  MPI_Comm comm;
// A duplicated communicator guarantees this barrier cannot interfere with
// any concurrent traffic on MPI_COMM_WORLD or msg_comm.
496  MPI_Comm_dup(MPI_COMM_WORLD, &comm);
497  MPI_Barrier(comm);
498  MPI_Comm_free(&comm);
499  }
// NOTE(review): original line 500 is elided in this dump; the symbol index
// references thread_barrier()/all_thread_barrier, so a thread barrier call
// presumably sits here — confirm against the full source.
501 }
502 
503 
// Initialize the MPI subsystem: start the MPI runtime requesting full thread
// support, record this kernel's rank and the total kernel count, and set up
// the dedicated event-message communicator.
//
// @param argc pointer to the program's argc, forwarded to MPI_Init_thread
// @param argv pointer to the program's argv, forwarded to MPI_Init_thread
514 void mpi_init(int *argc, char ***argv)
515 {
516  int mpi_thread_lvl_provided = 0;
517  MPI_Init_thread(argc, argv, MPI_THREAD_MULTIPLE, &mpi_thread_lvl_provided);
518 
// NOTE(review): original line 519 is elided in this dump; given the logic
// below it presumably initializes mpi_support_multithread to true — confirm
// against the full source.
520  if (mpi_thread_lvl_provided < MPI_THREAD_MULTIPLE) {
521  // The MPI runtime does not support fully thread-safe API calls
522  if (mpi_thread_lvl_provided < MPI_THREAD_SERIALIZED) {
523  // Not even serialized threaded calls are supported: we cannot continue
524  rootsim_error(true, "The MPI implementation does not support threads [current thread level support: %d]\n", mpi_thread_lvl_provided);
525  }
// Fall back to serializing MPI calls via lock_mpi()/unlock_mpi()
526  mpi_support_multithread = false;
527  }
528 
529  spinlock_init(&mpi_lock);
530 
531  MPI_Comm_size(MPI_COMM_WORLD, (int *)&n_ker);
532  MPI_Comm_rank(MPI_COMM_WORLD, (int *)&kid);
533 
534  // Create a separate communicator which we use to send event messages
535  MPI_Comm_dup(MPI_COMM_WORLD, &msg_comm);
536 }
537 
538 
546 {
// Guard for the message-draining loop in receive_remote_msgs()
547  spinlock_init(&msgs_lock);
548 
// NOTE(review): original lines 549 and 551-552 are elided in this dump; the
// symbol index references outgoing_window_init() and dist_termination_init(),
// which are presumably invoked here — confirm against the full source.
550  gvt_comm_init();
553 }
554 
555 
563 {
// NOTE(review): original lines 564 and 566 are elided in this dump; they
// presumably finalize the peer submodules (e.g. dist_termination_finalize(),
// gvt_comm_finalize()) — confirm against the full source.
565  //outgoing_window_finalize();
567 }
568 
569 
578 void mpi_finalize(void)
579 {
580  if (master_thread()) {
581  MPI_Barrier(MPI_COMM_WORLD);
582  MPI_Comm_free(&msg_comm);
583  MPI_Finalize();
584  } else {
585  rootsim_error(true, "MPI finalize has been invoked by a non master thread: T%u\n", local_tid);
586  }
587 }
588 
589 #endif /* HAVE_MPI */
bool spin_trylock(spinlock_t *s)
Definition: x86.c:151
Communication Routines.
bool is_request_completed(MPI_Request *req)
check if an MPI request has been completed
Definition: mpi.c:144
#define likely(exp)
Optimize the branch as likely taken.
Definition: core.h:72
The structure representing a node in the outgoing_queue list.
Definition: wnd.h:41
void dist_termination_finalize(void)
Cleanup routine of the distributed termination subsystem.
Definition: mpi.c:471
#define spinlock_init(s)
Spinlock initialization.
Definition: atomic.h:72
void gvt_comm_init(void)
Initialize the MPI-based distributed GVT reduction submodule.
Definition: gvt.c:174
void broadcast_termination(void)
Notify all the kernels about local termination.
Definition: mpi.c:327
Message queueing subsystem.
phase_colour * threads_phase_colour
Definition: gvt.c:48
static MPI_Request * termination_reqs
MPI Requests to handle termination detection collection asynchronously.
Definition: mpi.c:67
Core ROOT-Sim functionalities.
#define SLAB_MSG_SIZE
Slab allocator max message size.
Definition: communication.h:43
void mpi_finalize(void)
Finalize MPI.
Definition: mpi.c:578
static MPI_Datatype stats_mpi_t
MPI Datatype to describe the content of a struct stat_t.
Definition: mpi.c:76
unsigned int to_int
The GID numerical value.
Definition: core.h:133
Statistics module.
#define MPI_TYPE_STAT_LEN
The size in bytes of the statistics custom MPI Datatype. It assumes that stat_t contains only double ...
Definition: mpi.c:365
static spinlock_t msgs_lock
A guard to ensure isolation in the message receiving routine.
Definition: mpi.c:58
static void stats_reduction_init(void)
Initialize MPI Datatype and Operation for statistics reduction.
Definition: mpi.c:381
unsigned int find_kernel_by_gid(GID_t gid)
Definition: core.c:164
void store_outgoing_msg(outgoing_msg *out_msg, unsigned int dest_kid)
Store an outgoing message.
Definition: wnd.c:167
One rank informs the others that the simulation has to be stopped.
Definition: communication.h:88
void register_outgoing_msg(const msg_t *msg)
Register an outgoing message, if necessary.
Definition: gvt.c:589
bool mpi_support_multithread
Flag telling whether the MPI runtime supports multithreading.
Definition: mpi.c:48
void mpi_reduce_statistics(struct stat_t *global, struct stat_t *local)
Invoke statistics reduction.
Definition: mpi.c:428
bool pending_msgs(int tag)
Check if there are pending messages.
Definition: mpi.c:122
bool all_kernels_terminated(void)
Check if all kernels have reached the termination condition.
Definition: mpi.c:270
Message Type definition.
Definition: core.h:164
unsigned int size
Unique identifier of the message, used for rendez-vous events.
Definition: core.h:189
void inter_kernel_comm_finalize(void)
Finalize inter-kernel communication.
Definition: mpi.c:562
MPI_Request req
The MPI Request used to keep track of the delivery operation.
Definition: wnd.h:42
#define unlock_mpi()
This macro releases a global lock if multithreaded support is not available from MPI.
Definition: mpi.h:44
outgoing_msg * allocate_outgoing_msg(void)
Allocate a buffer for an outgoing message node.
Definition: wnd.c:130
bool thread_barrier(barrier_t *b)
Definition: thread.c:200
Atomic operations.
void validate_msg(msg_t *msg)
Perform some sanity checks on a message buffer.
barrier_t all_thread_barrier
Barrier for all worker threads.
Definition: core.c:49
void mpi_init(int *argc, char ***argv)
Initialize MPI subsystem.
Definition: mpi.c:514
spinlock_t mpi_lock
Definition: mpi.c:55
void dist_termination_init(void)
Setup the distributed termination subsystem.
Definition: mpi.c:452
void outgoing_window_init(void)
Outgoing queue initialization.
Definition: wnd.c:86
#define master_thread()
This macro expands to true if the current KLT is the master thread for the local kernel.
Definition: thread.h:155
Definition of a GID.
Definition: core.h:132
void send_remote_msg(msg_t *msg)
Send a message to a remote LP.
Definition: mpi.c:169
msg_t * get_msg_from_slab(struct lp_struct *lp)
Get a buffer to keep a message.
GID_t gid
Global ID of the LP.
Definition: process.h:82
MPI Support Module.
static MPI_Op reduce_stats_op
MPI Operation to reduce statistics.
Definition: mpi.c:73
void receive_remote_msgs(void)
Receive remote messages.
Definition: mpi.c:208
msg_t * msg
A pointer to the msg_t which MPI is delivering.
Definition: wnd.h:45
void insert_bottom_half(msg_t *msg)
Definition: queues.c:115
void collect_termination(void)
Check if other kernels have reached the termination condition.
Definition: mpi.c:289
void gvt_comm_finalize(void)
Shut down the MPI-based distributed GVT reduction submodule.
Definition: gvt.c:221
static MPI_Comm msg_comm
MPI Communicator for event/control messages.
Definition: mpi.c:100
Message delivery support.
__thread unsigned int local_tid
Definition: thread.c:72
#define lock_mpi()
This macro takes a global lock if multithread support is not available from MPI.
Definition: mpi.h:41
void syncronize_all(void)
Syncronize all the kernels.
Definition: mpi.c:492
Distributed GVT Support module.
static unsigned int terminated
Definition: mpi.c:64
void inter_kernel_comm_init(void)
Initialize inter-kernel communication.
Definition: mpi.c:545
static void reduce_stat_vector(struct stat_t *in, struct stat_t *inout, int *len, MPI_Datatype *dptr)
Reduce operation for statistics.
Definition: mpi.c:348
unsigned int n_ker
Total number of simulation kernel instances running.
Definition: core.c:58
static spinlock_t msgs_fini
A guard to ensure isolation in collect_termination()
Definition: mpi.c:70
#define unlikely(exp)
Optimize the branch as likely not taken.
Definition: core.h:74
unsigned int kid
Identifier of the local kernel.
Definition: core.c:55
void spin_unlock(spinlock_t *s)
Definition: x86.c:161