The ROme OpTimistic Simulator  2.0.0
A General-Purpose Multithreaded Parallel/Distributed Simulation Platform
communication.c
Go to the documentation of this file.
1 
32 #include <stdlib.h>
33 #include <float.h>
34 
35 #include <core/core.h>
36 #include <gvt/gvt.h>
37 #include <queues/queues.h>
39 #include <statistics/statistics.h>
40 #include <scheduler/scheduler.h>
41 #include <scheduler/process.h>
42 #include <datatypes/list.h>
43 #include <mm/mm.h>
44 #include <arch/atomic.h>
45 #ifdef HAVE_MPI
46 #include <communication/mpi.h>
47 #endif
48 
50 void (*ScheduleNewEvent)(unsigned int gid_receiver, simtime_t timestamp, unsigned int event_type, void *event_content, unsigned int event_size);
51 
60 {
61 #ifdef HAVE_MPI
63 #endif
64 }
65 
66 
76 {
77 #ifdef HAVE_MPI
79  mpi_finalize();
80 #endif
81 
82  // Release memory used for remaining input/output queues
83  foreach_lp(lp) {
84  while (!list_empty(lp->queue_in)) {
85  list_pop(lp->queue_in);
86  }
87  while (!list_empty(lp->queue_out)) {
88  list_pop(lp->queue_out);
89  }
90  }
91 }
92 
93 
112 static inline struct lp_struct *which_slab_to_use(GID_t sender, GID_t receiver)
113 {
114  if (find_kernel_by_gid(receiver) == kid)
115  return find_lp_by_gid(receiver);
116  return find_lp_by_gid(sender);
117 }
118 
119 
131 {
132  struct lp_struct *lp;
133 
134  lp = find_lp_by_gid(msg->sender);
135  slab_free(lp->mm->slab, msg);
136 }
137 
138 
161 {
162  // TODO: The magnitude of this hack compares to that of the national debt.
163  // We are wasting a lot of memory from the LP buddy just to keep antimessages!
164  msg_hdr_t *msg = (msg_hdr_t *) get_msg_from_slab(lp);
165  bzero(msg, SLAB_MSG_SIZE);
166  return msg;
167 }
168 
169 
193 {
194  msg_t *msg = (msg_t *) slab_alloc(lp->mm->slab);
195  bzero(msg, SLAB_MSG_SIZE);
196  return msg;
197 }
198 
199 
224 void msg_release(msg_t *msg)
225 {
226  struct lp_struct *lp;
227 
228  if (likely(sizeof(msg_t) + msg->size <= SLAB_MSG_SIZE)) {
229  lp = which_slab_to_use(msg->sender, msg->receiver);
230  slab_free(lp->mm->slab, msg);
231  } else {
232  rsfree(msg);
233  }
234 }
235 
236 
263 void ParallelScheduleNewEvent(unsigned int gid_receiver, simtime_t timestamp, unsigned int event_type, void *event_content, unsigned int event_size)
264 {
265  msg_t *event;
266  GID_t receiver;
267 
268  switch_to_platform_mode();
269 
270  // Internally to the platform, the receiver is a GID, while models
271  // have no difference across GIDs and LIDs. We convert here the passed
272  // id to a GID.
273  set_gid(receiver, gid_receiver);
274 
275  // In Silent execution, we do not send again already sent messages
276  if (unlikely(current->state == LP_STATE_SILENT_EXEC)) {
277  return;
278  }
279  // Check whether the destination LP is out of range
280  if (unlikely(gid_receiver > n_prc_tot - 1)) { // It's unsigned, so no need to check whether it's < 0
281  rootsim_error(false, "Warning: the destination LP %u is out of range. The event has been ignored\n", gid_receiver);
282  goto out;
283  }
284  // Check if the associated timestamp is negative
285  if (unlikely(timestamp < lvt(current))) {
286  rootsim_error(true, "LP %u is trying to generate an event (type %d) to %u in the past! (Current LVT = %f, generated event's timestamp = %f) Aborting...\n",
287  current->gid, event_type, gid_receiver,
288  lvt(current), timestamp);
289  }
290  // Check if the event type is mapped to an internal control message
291  if (unlikely(event_type >= RESERVED_MSG_CODE)) {
292  rootsim_error(true, "LP %u is generating an event with type %d which is a reserved type. Switch event type to a value less than %d. Aborting...\n",
293  current->gid, event_type, MIN_VALUE_CONTROL);
294  }
295 
296  // Copy all the information into the event structure
297  pack_msg(&event, current->gid, receiver, event_type, timestamp, lvt(current), event_size, event_content);
298  event->mark = generate_mark(current);
299 
300  if (unlikely(event->type == RENDEZVOUS_START)) {
301  event->rendezvous_mark = current_evt->rendezvous_mark;
302  printf("rendezvous_start mark=%llu\n", event->rendezvous_mark);
303  fflush(stdout);
304  }
305 
306  insert_outgoing_msg(event);
307 
308  out:
309  switch_to_application_mode();
310 }
311 
312 
327 void send_antimessages(struct lp_struct *lp, simtime_t after_simtime)
328 {
329  msg_hdr_t *anti_msg, *anti_msg_prev;
330  msg_t *msg;
331 
332  if (unlikely(list_empty(lp->queue_out)))
333  return;
334 
335  // Scan the output queue backwards, sending all required antimessages
336  anti_msg = list_tail(lp->queue_out);
337  while (anti_msg != NULL && anti_msg->send_time > after_simtime) {
338  msg = get_msg_from_slab(which_slab_to_use(anti_msg->sender, anti_msg->receiver));
339  hdr_to_msg(anti_msg, msg);
340  msg->message_kind = negative;
341 
342  Send(msg);
343 
344  // Remove the already-sent antimessage from the output queue
345  anti_msg_prev = list_prev(anti_msg);
346  list_delete_by_content(lp->queue_out, anti_msg);
347  msg_hdr_release(anti_msg);
348  anti_msg = anti_msg_prev;
349  }
350 }
351 
352 
353 
367 void Send(msg_t *msg)
368 {
369  validate_msg(msg);
370 
371 #ifdef HAVE_MPI
372  // Check whether the message recepient kernel is remote
373  if (find_kernel_by_gid(msg->receiver) != kid) {
374  send_remote_msg(msg);
375  return;
376  }
377 #endif
378  insert_bottom_half(msg);
379 }
380 
381 
397 {
398 
399  // if the model is generating many events at the same time, reallocate the outgoing buffer
403  }
404 
406 
407  // Store the minimum timestamp of outgoing messages
408  // TODO: check whether this is still used by preemptive Time Warp or not
409  if (msg->timestamp < current->outgoing_buffer.min_in_transit[current->worker_thread]) {
411  }
412 }
413 
414 
432 {
433  register unsigned int i = 0;
434  msg_t *msg;
435  msg_hdr_t *msg_hdr;
436 
437  for (i = 0; i < lp->outgoing_buffer.size; i++) {
438  msg_hdr = get_msg_hdr_from_slab(lp);
439  msg = lp->outgoing_buffer.outgoing_msgs[i];
440  msg_to_hdr(msg_hdr, msg);
441 
442  Send(msg);
443 
444  // register the message in the sender's output queue, for antimessage management
445  list_insert(lp->queue_out, send_time, msg_hdr);
446  }
447 
448  lp->outgoing_buffer.size = 0;
449 }
450 
451 
494 void pack_msg(msg_t **msg, GID_t sender, GID_t receiver, int type, simtime_t timestamp, simtime_t send_time, size_t size, void *payload)
495 {
496  // Check if we can rely on a slab to initialize the message
497  if (likely(sizeof(msg_t) + size <= SLAB_MSG_SIZE)) {
498  *msg = get_msg_from_slab(which_slab_to_use(sender, receiver));
499  } else {
500  *msg = rsalloc(sizeof(msg_t) + size);
501  bzero(*msg, sizeof(msg_t) + size);
502  }
503 
504  (*msg)->sender = sender;
505  (*msg)->receiver = receiver;
506  (*msg)->type = type;
507  (*msg)->message_kind = positive;
508  (*msg)->timestamp = timestamp;
509  (*msg)->send_time = send_time;
510  (*msg)->size = size;
511  // TODO: si può generare qua dentro la marca, perchĂ© si usa sempre il sender. Occhio al gid/lid!!!!
512 
513  if (payload != NULL && size > 0)
514  memcpy((*msg)->event_content, payload, size);
515 }
516 
517 
532 void msg_to_hdr(msg_hdr_t *hdr, msg_t *msg)
533 {
534  validate_msg(msg);
535 
536  hdr->sender = msg->sender;
537  hdr->receiver = msg->receiver;
538  hdr->type = msg->type;
539  hdr->rendezvous_mark = msg->rendezvous_mark;
540  hdr->timestamp = msg->timestamp;
541  hdr->send_time = msg->send_time;
542  hdr->mark = msg->mark;
543 }
544 
545 
568 void hdr_to_msg(msg_hdr_t *hdr, msg_t *msg)
569 {
570  msg->sender = hdr->sender;
571  msg->receiver = hdr->receiver;
572  msg->type = hdr->type;
573  msg->rendezvous_mark = hdr->rendezvous_mark;
574  msg->timestamp = hdr->timestamp;
575  msg->send_time = hdr->send_time;
576  msg->mark = hdr->mark;
577 }
578 
579 
590 {
591  printf("\tsender: %u\n", msg->sender.to_int);
592  printf("\treceiver: %u\n", msg->sender.to_int);
593 #ifdef HAVE_MPI
594  printf("\tcolour: %d\n", msg->colour);
595 #endif
596  printf("\ttype: %d\n", msg->type);
597  printf("\tmessage_kind: %d\n", msg->message_kind);
598  printf("\ttimestamp: %f\n", msg->timestamp);
599  printf("\tsend_time: %f\n", msg->send_time);
600  printf("\tmark: %llu\n", msg->mark);
601  printf("\trendezvous_mark: %llu\n", msg->rendezvous_mark);
602  printf("\tsize: %d\n", msg->size);
603 }
604 
605 
606 
607 #ifndef NDEBUG
608 
/**
 * Tell whether the w-th triangular number, w * (w + 1) / 2, is strictly
 * greater than a given limit. A division is used in place of the final
 * product, so that no intermediate value can overflow.
 */
static int triangular_exceeds(unsigned long long w, unsigned long long limit)
{
	unsigned long long a, b;

	// Halve the even factor first: a * b is the triangular number
	if (w % 2 == 0) {
		a = w / 2;
		b = w + 1;
	} else {
		a = w;
		b = (w + 1) / 2;
	}

	// For positive integers, a * b > limit if and only if a > limit / b
	return a > limit / b;
}

/**
 * Tell the GID of the sender of a message, given its mark.
 *
 * The mark is treated as the value z of a pairing
 * z = (x + y) * (x + y + 1) / 2 + y, and x is returned. Writing w = x + y,
 * w is the largest integer whose triangular number does not exceed z.
 * The whole computation is carried out on integers, so the result is
 * exact also for marks which a double cannot represent.
 *
 * @param mark The mark to be inverted
 * @return the x component of the pair encoded by the mark
 */
unsigned int mark_to_gid(unsigned long long mark)
{
	unsigned long long w = 0;
	unsigned long long bit;
	unsigned long long candidate;
	unsigned long long t;
	unsigned long long y;

	// Bitwise search of the largest w with triangular(w) <= mark.
	// Triangular numbers fitting in 64 bits have an index below 2^33.
	for (bit = 1ULL << 32; bit != 0; bit >>= 1) {
		candidate = w | bit;
		if (!triangular_exceeds(candidate, mark))
			w = candidate;
	}

	// triangular(w) <= mark, thus the product below cannot overflow
	if (w % 2 == 0)
		t = (w / 2) * (w + 1);
	else
		t = w * ((w + 1) / 2);

	y = mark - t;

	return (unsigned int)(w - y);
}
638 
639 
664 void validate_msg(msg_t *msg)
665 {
666  assert(msg->sender.to_int <= n_prc_tot);
667  assert(msg->receiver.to_int <= n_prc_tot);
668  assert(msg->message_kind == positive || msg->message_kind == negative || msg->message_kind == control);
669  assert(mark_to_gid(msg->mark) <= n_prc_tot);
670  assert(mark_to_gid(msg->rendezvous_mark) <= n_prc_tot);
671  assert(msg->type < MAX_VALUE_CONTROL);
672 }
673 #endif
Communication Routines.
#define lvt(lp)
Definition: process.h:168
unsigned long long generate_mark(struct lp_struct *lp)
Definition: queues.c:270
#define likely(exp)
Optimize the branch as likely taken.
Definition: core.h:72
Message queueing subsystem.
Core ROOT-Sim functionalities.
#define SLAB_MSG_SIZE
Slab allocator max message size.
Definition: communication.h:43
void mpi_finalize(void)
Finalize MPI.
Definition: mpi.c:578
unsigned int mark_to_gid(unsigned long long mark)
Tell the GID of the sender of a message, given its mark.
unsigned int max_size
Total space in outgoing_msgs.
simtime_t timestamp
Unique identifier of the message, used for rendez-vous event.
Definition: core.h:205
Anything after this value is considered as an impossible message.
Definition: communication.h:72
void msg_to_hdr(msg_hdr_t *hdr, msg_t *msg)
Convert a message to a message header.
void ParallelScheduleNewEvent(unsigned int gid_receiver, simtime_t timestamp, unsigned int event_type, void *event_content, unsigned int event_size)
Schedule a new message to some LP.
unsigned int to_int
The GID numerical value.
Definition: core.h:133
Statistics module.
#define list_empty(list)
Definition: list.h:110
The ROOT-Sim scheduler main module header.
struct memory_map * mm
Memory map of the LP.
Definition: process.h:76
void dump_msg_content(msg_t *msg)
Dump the content of a message.
#define list_prev(ptr)
Definition: list.h:95
Separation value between model and platform messages.
Definition: communication.h:64
__thread struct lp_struct * current
This is a per-thread variable pointing to the block state of the LP currently scheduled.
Definition: scheduler.c:72
unsigned int find_kernel_by_gid(GID_t gid)
Definition: core.c:164
ECS protocol: start synchronizing two LPs for a page fault.
Definition: communication.h:65
#define list_insert(li, key_name, data)
Insert a new node in the list.
Definition: list.h:163
double simtime_t
This defines the type with whom timestamps are represented.
Definition: ROOT-Sim.h:55
__thread msg_t * current_evt
Definition: scheduler.c:83
Memory Manager main header.
unsigned long long mark
Unique identifier within the LP.
Definition: process.h:121
Generic Lists.
Message Type definition.
Definition: core.h:164
unsigned int size
Unique identifier of the message, used for rendez-vous events.
Definition: core.h:189
void inter_kernel_comm_finalize(void)
Finalize inter-kernel communication.
Definition: mpi.c:562
void insert_outgoing_msg(msg_t *msg)
Place a message in the temporary LP outgoing buffer.
void Send(msg_t *msg)
Send a message.
LP control blocks.
Atomic operations.
unsigned long long rendezvous_mark
Unique identifier of the message, used for antimessages.
Definition: core.h:186
void pack_msg(msg_t **msg, GID_t sender, GID_t receiver, int type, simtime_t timestamp, simtime_t send_time, size_t size, void *payload)
Pack a message in a platform-level data structure.
void validate_msg(msg_t *msg)
Perform some sanity checks on a message buffer.
static struct lp_struct * which_slab_to_use(GID_t sender, GID_t receiver)
Find a slab to allocate a message buffer.
void(* ScheduleNewEvent)(unsigned int gid_receiver, simtime_t timestamp, unsigned int event_type, void *event_content, unsigned int event_size)
This is the function pointer to correctly set ScheduleNewEvent API version, depending on whether we're running...
Definition: communication.c:50
Definition of a GID.
Definition: core.h:132
void msg_release(msg_t *msg)
Release a message buffer.
void send_remote_msg(msg_t *msg)
Send a message to a remote LP.
Definition: mpi.c:169
msg_t * get_msg_from_slab(struct lp_struct *lp)
Get a buffer to keep a message.
outgoing_t outgoing_buffer
Buffer used by KLTs for buffering outgoing messages during the execution of an event.
Definition: process.h:124
GID_t gid
Global ID of the LP.
Definition: process.h:82
MPI Support Module.
short unsigned int state
Current execution state of the LP.
Definition: process.h:88
Global Virtual Time.
void msg_hdr_release(msg_hdr_t *msg)
Release a message header.
void communication_fini(void)
Finalize the communication subsystem.
Definition: communication.c:75
void insert_bottom_half(msg_t *msg)
Definition: queues.c:115
void send_antimessages(struct lp_struct *lp, simtime_t after_simtime)
Send all antimessages for a certain LP.
unsigned int size
How many events is this currently keeping.
Message envelope definition. This is used to handle the output queue and stores information needed to...
Definition: core.h:194
msg_t ** outgoing_msgs
Resizable array of message pointers.
simtime_t * min_in_transit
Smallest timestamp of events kept here.
void communication_init(void)
Initialize the communication subsystem.
Definition: communication.c:59
#define list_tail(list)
Definition: list.h:81
unsigned int n_prc_tot
Total number of logical processes running in the simulation.
Definition: core.c:64
void hdr_to_msg(msg_hdr_t *hdr, msg_t *msg)
convert a message header into a message
void inter_kernel_comm_init(void)
Initialize inter-kernel communication.
Definition: mpi.c:545
unsigned int worker_thread
ID of the worker thread towards which the LP is bound.
Definition: process.h:85
msg_hdr_t * get_msg_hdr_from_slab(struct lp_struct *lp)
Get a buffer to keep a message header.
#define unlikely(exp)
Optimize the branch as likely not taken.
Definition: core.h:74
void send_outgoing_msgs(struct lp_struct *lp)
Send all pending outgoing messages.
unsigned int kid
Identifier of the local kernel.
Definition: core.c:55