126 MPI_Iprobe(MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
148 MPI_Test(req, &flag, MPI_STATUS_IGNORE);
179 MPI_Isend(((
char *)out_msg->
msg) + MSG_PADDING, MSG_META_SIZE + msg->
size, MPI_BYTE, dest, msg->receiver.
to_int,
msg_comm, &out_msg->
req);
225 MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG,
msg_comm, &pending, &mpi_msg, &status);
231 MPI_Get_count(&status, MPI_BYTE, &size);
234 set_gid(gid, status.MPI_TAG);
235 lp = find_lp_by_gid(gid);
238 msg = rsalloc(MSG_PADDING + size);
239 bzero(msg, MSG_PADDING);
245 MPI_Mrecv(((
char *)msg) + MSG_PADDING, size, MPI_BYTE, &mpi_msg, MPI_STATUS_IGNORE);
300 MPI_Recv(&tdata, 1, MPI_UNSIGNED, MPI_ANY_SOURCE,
MSG_FINI, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
303 rootsim_error(
true,
"MPI_Recv did not complete correctly");
331 for (i = 0; i <
n_ker; i++) {
353 for (i = 0; i < *len; ++i) {
354 inout[i].vec += in[i].vec;
355 inout[i].gvt_round_time += in[i].gvt_round_time;
356 inout[i].gvt_round_time_min = fmin(inout[i].gvt_round_time_min, in[i].gvt_round_time_min);
357 inout[i].gvt_round_time_max = fmax(inout[i].gvt_round_time_max, in[i].gvt_round_time_max);
358 inout[i].max_resident_set += in[i].max_resident_set;
365 #define MPI_TYPE_STAT_LEN (sizeof(struct stat_t)/sizeof(double)) 384 static_assert(offsetof(
struct stat_t, gvt_round_time_max) == (
sizeof(
double) * 19),
"The packing assumptions on struct stat_t are wrong or its definition has been modified");
395 type[i] = MPI_DOUBLE;
396 disp[i] = i *
sizeof(double);
397 block_lengths[i] = 1;
401 MPI_Type_create_struct(MPI_TYPE_STAT_LEN, block_lengths, disp, type, &
stats_mpi_t);
410 #undef MPI_TYPE_STAT_LEN 457 for (i = 0; i <
n_ker; i++) {
496 MPI_Comm_dup(MPI_COMM_WORLD, &comm);
498 MPI_Comm_free(&comm);
516 int mpi_thread_lvl_provided = 0;
517 MPI_Init_thread(argc, argv, MPI_THREAD_MULTIPLE, &mpi_thread_lvl_provided);
520 if (mpi_thread_lvl_provided < MPI_THREAD_MULTIPLE) {
522 if (mpi_thread_lvl_provided < MPI_THREAD_SERIALIZED) {
524 rootsim_error(
true,
"The MPI implementation does not support threads [current thread level support: %d]\n", mpi_thread_lvl_provided);
531 MPI_Comm_size(MPI_COMM_WORLD, (
int *)&
n_ker);
532 MPI_Comm_rank(MPI_COMM_WORLD, (
int *)&
kid);
535 MPI_Comm_dup(MPI_COMM_WORLD, &
msg_comm);
581 MPI_Barrier(MPI_COMM_WORLD);
585 rootsim_error(
true,
"MPI finalize has been invoked by a non master thread: T%u\n",
local_tid);
bool spin_trylock(spinlock_t *s)
bool is_request_completed(MPI_Request *req)
Check whether an MPI request has completed.
#define likely(exp)
Optimize the branch as likely taken.
The structure representing a node in the outgoing_queue list.
void dist_termination_finalize(void)
Cleanup routine of the distributed termination subsystem.
#define spinlock_init(s)
Spinlock initialization.
void gvt_comm_init(void)
Initialize the MPI-based distributed GVT reduction submodule.
void broadcast_termination(void)
Notify all the kernels about local termination.
Message queueing subsystem.
phase_colour * threads_phase_colour
static MPI_Request * termination_reqs
MPI Requests to handle termination detection collection asynchronously.
Core ROOT-Sim functionalities.
#define SLAB_MSG_SIZE
Slab allocator max message size.
void mpi_finalize(void)
Finalize MPI.
static MPI_Datatype stats_mpi_t
MPI Datatype to describe the content of a struct stat_t.
unsigned int to_int
The GID numerical value.
#define MPI_TYPE_STAT_LEN
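The number of double-precision fields in the statistics custom MPI Datatype (sizeof(struct stat_t) / sizeof(double)), i.e. an element count rather than a size in bytes. It assumes that stat_t contains only double ...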
static spinlock_t msgs_lock
A guard to ensure isolation in the message receiving routine.
static void stats_reduction_init(void)
Initialize MPI Datatype and Operation for statistics reduction.
unsigned int find_kernel_by_gid(GID_t gid)
void store_outgoing_msg(outgoing_msg *out_msg, unsigned int dest_kid)
Store an outgoing message.
One rank informs the others that the simulation has to be stopped.
void register_outgoing_msg(const msg_t *msg)
Register an outgoing message, if necessary.
bool mpi_support_multithread
Flag telling whether the MPI runtime supports multithreading.
void mpi_reduce_statistics(struct stat_t *global, struct stat_t *local)
Invoke statistics reduction.
bool pending_msgs(int tag)
Check if there are pending messages.
bool all_kernels_terminated(void)
Check if all kernels have reached the termination condition.
unsigned int size
Size of the message payload in bytes; it is added to MSG_META_SIZE to obtain the number of bytes sent over MPI.
void inter_kernel_comm_finalize(void)
Finalize inter-kernel communication.
MPI_Request req
The MPI Request used to keep track of the delivery operation.
#define unlock_mpi()
This macro releases a global lock if multithreaded support is not available from MPI.
outgoing_msg * allocate_outgoing_msg(void)
Allocate a buffer for an outgoing message node.
bool thread_barrier(barrier_t *b)
void validate_msg(msg_t *msg)
Perform some sanity checks on a message buffer.
barrier_t all_thread_barrier
Barrier for all worker threads.
void mpi_init(int *argc, char ***argv)
Initialize MPI subsystem.
void dist_termination_init(void)
Setup the distributed termination subsystem.
void outgoing_window_init(void)
Outgoing queue initialization.
#define master_thread()
This macro expands to true if the current KLT is the master thread for the local kernel.
void send_remote_msg(msg_t *msg)
Send a message to a remote LP.
msg_t * get_msg_from_slab(struct lp_struct *lp)
Get a buffer to keep a message.
GID_t gid
Global ID of the LP.
static MPI_Op reduce_stats_op
MPI Operation to reduce statistics.
void receive_remote_msgs(void)
Receive remote messages.
msg_t * msg
A pointer to the msg_t which MPI is delivering.
void insert_bottom_half(msg_t *msg)
void collect_termination(void)
Check if other kernels have reached the termination condition.
void gvt_comm_finalize(void)
Shut down the MPI-based distributed GVT reduction submodule.
static MPI_Comm msg_comm
MPI Communicator for event/control messages.
Message delivery support.
__thread unsigned int local_tid
#define lock_mpi()
This macro takes a global lock if multithreading support is not available from MPI.
void syncronize_all(void)
Synchronize all the kernels.
Distributed GVT Support module.
static unsigned int terminated
void inter_kernel_comm_init(void)
Initialize inter-kernel communication.
static void reduce_stat_vector(struct stat_t *in, struct stat_t *inout, int *len, MPI_Datatype *dptr)
Reduce operation for statistics.
unsigned int n_ker
Total number of simulation kernel instances running.
static spinlock_t msgs_fini
A guard to ensure isolation in collect_termination().
#define unlikely(exp)
Optimize the branch as likely not taken.
unsigned int kid
Identifier of the local kernel.
void spin_unlock(spinlock_t *s)