/**
 * @file mpi.c
 * @brief MPI Support Module
 *
 * The ROme OpTimistic Simulator 2.0.0
 * A General-Purpose Multithreaded Parallel/Distributed Simulation Platform
 */
#ifdef HAVE_MPI

#include <stdbool.h>

#include <communication/mpi.h>
#include <communication/wnd.h>
#include <communication/gvt.h>
#include <communication/communication.h>	// MSG_FINI, MSG_PADDING, MSG_META_SIZE, SLAB_MSG_SIZE
#include <queues/queues.h>
#include <core/core.h>
#include <arch/atomic.h>
#include <statistics/statistics.h>

/// Flag telling whether the MPI runtime supports multithreading
bool mpi_support_multithread;

/// Global lock used to serialize MPI calls when multithread support is not available from MPI
spinlock_t mpi_lock;

/// A guard to ensure isolation in the message receiving routine
static spinlock_t msgs_lock;

/// Number of kernel instances that have reached the termination condition
static unsigned int terminated = 0;

/// MPI Requests to handle termination detection collection asynchronously
static MPI_Request *termination_reqs;

/// A guard to ensure isolation in collect_termination()
static spinlock_t msgs_fini;

/// MPI Operation to reduce statistics
static MPI_Op reduce_stats_op;

/// MPI Datatype to describe the content of a struct stat_t
static MPI_Datatype stats_mpi_t;

/// MPI Communicator for event/control messages
static MPI_Comm msg_comm;

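/**
 * @brief Check if there are pending messages
 *
 * Probes MPI_COMM_WORLD for an incoming message carrying the given tag,
 * without actually receiving it.
 */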
bool pending_msgs(int tag)
{
    int flag = 0;
    lock_mpi();
    MPI_Iprobe(MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
    unlock_mpi();
    return (bool)flag;
}

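/**
 * @brief Check if an MPI request has been completed
 *
 * The request is tested through MPI_Test, wrapped in lock_mpi()/unlock_mpi()
 * so that it is safe also when the MPI library offers no multithread support.
 */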
bool is_request_completed(MPI_Request *req)
{
    int flag = 0;
    lock_mpi();
    MPI_Test(req, &flag, MPI_STATUS_IGNORE);
    unlock_mpi();
    return (bool)flag;
}

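/**
 * @brief Send a message to a remote LP
 *
 * The message is stamped with the sender's current phase colour, registered
 * for GVT accounting, and shipped asynchronously with MPI_Isend over the
 * dedicated msg_comm communicator. The outgoing node is kept in the outgoing
 * queue until the delivery is completed.
 */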
void send_remote_msg(msg_t *msg)
{
    outgoing_msg *out_msg = allocate_outgoing_msg();
    out_msg->msg = msg;
    out_msg->msg->colour = threads_phase_colour[local_tid];
    unsigned int dest = find_kernel_by_gid(msg->receiver);

    validate_msg(msg);

    register_outgoing_msg(out_msg->msg);

    lock_mpi();
    MPI_Isend(((char *)out_msg->msg) + MSG_PADDING, MSG_META_SIZE + msg->size, MPI_BYTE, dest, msg->receiver.to_int, msg_comm, &out_msg->req);
    unlock_mpi();

    // Keep the message in the outgoing queue until it is delivered
    store_outgoing_msg(out_msg, dest);
}

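/**
 * @brief Receive remote messages
 *
 * Matched probe/receive loop: each incoming message is first matched with
 * MPI_Improbe, a buffer is taken from the destination LP's slab (or from the
 * heap for oversized payloads), and the matched message is then extracted
 * with MPI_Mrecv and handed to the bottom-half queue.
 */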
void receive_remote_msgs(void)
{
    int size;
    msg_t *msg;
    MPI_Status status;
    MPI_Message mpi_msg;
    int pending;
    struct lp_struct *lp;
    GID_t gid;

    // TODO: given the latest changes in the platform, this *might*
    // be removed.
    if (!spin_trylock(&msgs_lock))
        return;

    while (true) {
        lock_mpi();
        MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, msg_comm, &pending, &mpi_msg, &status);
        unlock_mpi();

        if (!pending)
            goto out;

        MPI_Get_count(&status, MPI_BYTE, &size);

        if (likely(MSG_PADDING + size <= SLAB_MSG_SIZE)) {
            set_gid(gid, status.MPI_TAG);
            lp = find_lp_by_gid(gid);
            msg = get_msg_from_slab(lp);
        } else {
            msg = rsalloc(MSG_PADDING + size);
            bzero(msg, MSG_PADDING);
        }

        // Receive the message. Use MPI_Mrecv to be sure that the very same
        // message which was matched by the previous MPI_Improbe is extracted.
        lock_mpi();
        MPI_Mrecv(((char *)msg) + MSG_PADDING, size, MPI_BYTE, &mpi_msg, MPI_STATUS_IGNORE);
        unlock_mpi();

        validate_msg(msg);
        insert_bottom_half(msg);
    }
 out:
    spin_unlock(&msgs_lock);
}

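/**
 * @brief Check if all kernels have reached the termination condition
 *
 * @return true if every simulation kernel instance has notified termination
 */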
bool all_kernels_terminated(void)
{
    return (terminated == n_ker);
}

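/**
 * @brief Check if other kernels have reached the termination condition
 *
 * Drains pending MSG_FINI notifications and increments the count of
 * terminated kernel instances accordingly.
 */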
void collect_termination(void)
{
    int res;
    unsigned int tdata;

    if (terminated == 0 || !spin_trylock(&msgs_fini))
        return;

    while (pending_msgs(MSG_FINI)) {
        lock_mpi();
        res = MPI_Recv(&tdata, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, MSG_FINI, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        unlock_mpi();
        if (unlikely(res != 0)) {
            rootsim_error(true, "MPI_Recv did not complete correctly");
            return;
        }
        terminated++;
    }
    spin_unlock(&msgs_fini);
}

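/**
 * @brief Notify all the kernels about local termination
 *
 * A MSG_FINI control message is sent asynchronously to every other kernel
 * instance; the matching requests are collected in termination_reqs.
 */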
void broadcast_termination(void)
{
    unsigned int i;
    lock_mpi();
    for (i = 0; i < n_ker; i++) {
        if (i == kid)
            continue;
        MPI_Isend(&i, 1, MPI_UNSIGNED, i, MSG_FINI, MPI_COMM_WORLD, &termination_reqs[i]);
    }
    terminated++;
    unlock_mpi();
}

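/**
 * @brief Reduce operation for statistics
 *
 * User-defined MPI reduction for struct stat_t: accumulated fields are
 * summed, except for the GVT round time bounds which are combined with
 * fmin()/fmax().
 */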
static void reduce_stat_vector(struct stat_t *in, struct stat_t *inout, int *len, MPI_Datatype *dptr)
{
    (void)dptr;
    int i = 0;

    for (i = 0; i < *len; ++i) {
        inout[i].vec += in[i].vec;
        inout[i].gvt_round_time += in[i].gvt_round_time;
        inout[i].gvt_round_time_min = fmin(inout[i].gvt_round_time_min, in[i].gvt_round_time_min);
        inout[i].gvt_round_time_max = fmax(inout[i].gvt_round_time_max, in[i].gvt_round_time_max);
        inout[i].max_resident_set += in[i].max_resident_set;
    }
}

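/// Number of fields in struct stat_t (assumed to contain only doubles), used to lay out the custom MPI datatype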
#define MPI_TYPE_STAT_LEN (sizeof(struct stat_t)/sizeof(double))

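/**
 * @brief Initialize MPI Datatype and Operation for statistics reduction
 */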
static void stats_reduction_init(void)
{
    // This is a compile-time fail-safe
    static_assert(offsetof(struct stat_t, gvt_round_time_max) == (sizeof(double) * 19), "The packing assumptions on struct stat_t are wrong or its definition has been modified");

    unsigned i;

    // Boilerplate to create a new MPI data type
    MPI_Datatype type[MPI_TYPE_STAT_LEN];
    MPI_Aint disp[MPI_TYPE_STAT_LEN];
    int block_lengths[MPI_TYPE_STAT_LEN];

    // Initialize those arrays (we assume that struct stat_t is packed tightly)
    for (i = 0; i < MPI_TYPE_STAT_LEN; ++i) {
        type[i] = MPI_DOUBLE;
        disp[i] = i * sizeof(double);
        block_lengths[i] = 1;
    }

    // Create the custom type and commit the changes
    MPI_Type_create_struct(MPI_TYPE_STAT_LEN, block_lengths, disp, type, &stats_mpi_t);
    MPI_Type_commit(&stats_mpi_t);

    // Create the MPI Operation used to reduce stats
    if (master_thread()) {
        MPI_Op_create((MPI_User_function *)reduce_stat_vector, true, &reduce_stats_op);
    }
}

#undef MPI_TYPE_STAT_LEN

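/**
 * @brief Invoke statistics reduction
 *
 * The local statistics of every kernel instance are reduced into @p global
 * on rank 0, using the custom datatype and operation built in
 * stats_reduction_init().
 */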
void mpi_reduce_statistics(struct stat_t *global, struct stat_t *local)
{
    MPI_Reduce(local, global, 1, stats_mpi_t, reduce_stats_op, 0, MPI_COMM_WORLD);
}

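/**
 * @brief Setup the distributed termination subsystem
 */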
void dist_termination_init(void)
{
    /* init for collective termination */
    termination_reqs = rsalloc(n_ker * sizeof(MPI_Request));
    unsigned int i;
    for (i = 0; i < n_ker; i++) {
        termination_reqs[i] = MPI_REQUEST_NULL;
    }
    spinlock_init(&msgs_fini);
}

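/**
 * @brief Cleanup routine of the distributed termination subsystem
 *
 * Waits for the completion of all outstanding termination notifications.
 */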
void dist_termination_finalize(void)
{
    MPI_Waitall(n_ker, termination_reqs, MPI_STATUSES_IGNORE);
}

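/**
 * @brief Synchronize all the kernels
 *
 * The master thread of each kernel instance enters a barrier on a private
 * duplicate of MPI_COMM_WORLD; all local worker threads then meet on the
 * thread barrier.
 */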
void syncronize_all(void)
{
    if (master_thread()) {
        MPI_Comm comm;
        MPI_Comm_dup(MPI_COMM_WORLD, &comm);
        MPI_Barrier(comm);
        MPI_Comm_free(&comm);
    }
    thread_barrier(&all_thread_barrier);
}

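/**
 * @brief Initialize MPI subsystem
 *
 * MPI is initialized requesting MPI_THREAD_MULTIPLE; if the library provides
 * a lower thread level, MPI calls are serialized through mpi_lock. A dedicated
 * communicator for event messages is also created here.
 */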
void mpi_init(int *argc, char ***argv)
{
    int mpi_thread_lvl_provided = 0;
    MPI_Init_thread(argc, argv, MPI_THREAD_MULTIPLE, &mpi_thread_lvl_provided);

    mpi_support_multithread = true;
    if (mpi_thread_lvl_provided < MPI_THREAD_MULTIPLE) {
        // MPI does not support thread-safe API calls
        if (mpi_thread_lvl_provided < MPI_THREAD_SERIALIZED) {
            // MPI does not even support serialized calls: we cannot continue
            rootsim_error(true, "The MPI implementation does not support threads [current thread level support: %d]\n", mpi_thread_lvl_provided);
        }
        mpi_support_multithread = false;
    }

    spinlock_init(&mpi_lock);

    MPI_Comm_size(MPI_COMM_WORLD, (int *)&n_ker);
    MPI_Comm_rank(MPI_COMM_WORLD, (int *)&kid);

    // Create a separate communicator which we use to send event messages
    MPI_Comm_dup(MPI_COMM_WORLD, &msg_comm);
}

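/**
 * @brief Initialize inter-kernel communication
 */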
void inter_kernel_comm_init(void)
{
    spinlock_init(&msgs_lock);

    outgoing_window_init();
    gvt_comm_init();
    dist_termination_init();
    stats_reduction_init();
}

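/**
 * @brief Finalize inter-kernel communication
 */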
void inter_kernel_comm_finalize(void)
{
    dist_termination_finalize();
    //outgoing_window_finalize();
    gvt_comm_finalize();
}

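/**
 * @brief Finalize MPI
 *
 * Must be invoked by the master thread only, after all ranks have completed.
 */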
void mpi_finalize(void)
{
    if (master_thread()) {
        MPI_Barrier(MPI_COMM_WORLD);
        MPI_Comm_free(&msg_comm);
        MPI_Finalize();
    } else {
        rootsim_error(true, "MPI finalize has been invoked by a non master thread: T%u\n", local_tid);
    }
}

#endif /* HAVE_MPI */