The ROme OpTimistic Simulator  2.0.0
A General-Purpose Multithreaded Parallel/Distributed Simulation Platform
scheduler.c
Go to the documentation of this file.
1 
37 #include <stdlib.h>
38 #include <string.h>
39 #include <stdbool.h>
40 #include <datatypes/list.h>
41 #include <datatypes/msgchannel.h>
42 #include <core/core.h>
43 #include <core/timer.h>
44 #include <arch/atomic.h>
45 #include <arch/ult.h>
46 #include <arch/thread.h>
47 #include <core/init.h>
48 #include <scheduler/binding.h>
49 #include <scheduler/process.h>
50 #include <scheduler/scheduler.h>
51 #include <scheduler/stf.h>
52 #include <mm/state.h>
54 
55 #ifdef HAVE_CROSS_STATE
56 #include <mm/ecs.h>
57 #endif
58 
59 #include <mm/mm.h>
60 #include <statistics/statistics.h>
61 #include <arch/thread.h>
63 #include <gvt/gvt.h>
64 #include <statistics/statistics.h>
66 #include <queues/xxhash.h>
67 
69 __thread unsigned int n_prc_per_thread;
70 
72 __thread struct lp_struct *current;
73 
83 __thread msg_t *current_evt;
84 
85 /*
86 * This function initializes the scheduler. In particular, it relies on MPI to broadcast to every simulation kernel process
87 * which is the actual scheduling algorithm selected.
88 *
89 * @author Francesco Quaglia
90 *
91 * @param sched The scheduler selected initially, but master can decide to change it, so slaves must rely on what master send to them
92 */
93 void scheduler_init(void)
94 {
95 #ifdef HAVE_PREEMPTION
96  preempt_init();
97 #endif
98 }
99 
105 void scheduler_fini(void)
106 {
107 #ifdef HAVE_PREEMPTION
108  preempt_fini();
109 #endif
110 
111  foreach_lp(lp) {
112  rsfree(lp->queue_in);
113  rsfree(lp->queue_out);
114  rsfree(lp->queue_states);
115  rsfree(lp->bottom_halves);
116  rsfree(lp->rendezvous_queue);
117 
118  // Destroy stacks
119  rsfree(lp->stack);
120 
121  rsfree(lp);
122  }
123 
124  rsfree(lps_blocks);
125  rsfree(lps_bound_blocks);
126 }
127 
140 void LP_main_loop(void *args)
141 {
142 #ifdef EXTRA_CHECKS
143  unsigned long long hash1, hash2;
144  hash1 = hash2 = 0;
145 #endif
146 
147  (void)args; // this is to make the compiler stop complaining about unused args
148 
149  // Save a default context
150  context_save(&current->default_context);
151 
152  while (true) {
153 
154 #ifdef EXTRA_CHECKS
155  if (current->bound->size > 0) {
156  hash1 = XXH64(current_evt->event_content, current_evt->size, current->gid);
157  }
158 #endif
159 
160  timer event_timer;
161  timer_start(event_timer);
162 
163  // Process the event
164  if(&abm_settings){
165  ProcessEventABM();
166  }else if (&topology_settings){
167  ProcessEventTopology();
168  }else{
169  switch_to_application_mode();
170  current->ProcessEvent(current->gid.to_int,
171  current_evt->timestamp, current_evt->type,
172  current_evt->event_content,
173  current_evt->size,
174  current->current_base_pointer);
175  switch_to_platform_mode();
176  }
177  int delta_event_timer = timer_value_micro(event_timer);
178 
179 #ifdef EXTRA_CHECKS
180  if (current->bound->size > 0) {
181  hash2 =
182  XXH64(current_evt->event_content, current_evt->size,
183  current->gid);
184  }
185 
186  if (hash1 != hash2) {
187  rootsim_error(true,
188  "Error, LP %d has modified the payload of event %d during its processing. Aborting...\n",
189  current->gid, current->bound->type);
190  }
191 #endif
192 
193  statistics_post_data(current, STAT_EVENT, 1.0);
194  statistics_post_data(current, STAT_EVENT_TIME,
195  delta_event_timer);
196 
197  // Give back control to the simulation kernel's user-level thread
199  }
200 }
201 
202 void initialize_worker_thread(void)
203 {
204  msg_t *init_event;
205 
206  // Divide LPs among worker threads, for the first time here
207  rebind_LPs();
208  if (master_thread() && master_kernel()) {
209  printf("Initializing LPs... ");
210  fflush(stdout);
211  }
212  // Worker Threads synchronization barrier: they all should start working together
214 
215  if (master_thread() && master_kernel())
216  printf("done\n");
217 
218  // Schedule an INIT event to the newly instantiated LP
219  // We need two separate foreach_bound_lp here, because
220  // in this way we are sure that there is at least one
221  // event to be used as the bound and we do not have to make
222  // any check on null throughout the scheduler code.
223  foreach_bound_lp(lp) {
224  pack_msg(&init_event, lp->gid, lp->gid, INIT, 0.0, 0.0, 0, NULL);
225  init_event->mark = generate_mark(lp);
226  list_insert_head(lp->queue_in, init_event);
227  lp->state_log_forced = true;
228  }
229 
231 
232  foreach_bound_lp(lp) {
233  schedule_on_init(lp);
234  }
235 
236  // Worker Threads synchronization barrier: they all should start working together
238 
239 #ifdef HAVE_PREEMPTION
241  enable_preemption();
242 #endif
243 
244 }
245 
260 void activate_LP(struct lp_struct *next, msg_t * evt)
261 {
262 
263  // Notify the LP main execution loop of the information to be used for actual simulation
264  current = next;
265  current_evt = evt;
266 
267 // #ifdef HAVE_PREEMPTION
268 // if(!rootsim_config.disable_preemption)
269 // enable_preemption();
270 // #endif
271 
272 #ifdef HAVE_CROSS_STATE
273  // Activate memory view for the current LP
274  lp_alloc_schedule();
275 #endif
276 
277  if (unlikely(is_blocked_state(next->state))) {
278  rootsim_error(true, "Critical condition: LP %d has a wrong state: %d. Aborting...\n",
279  next->gid.to_int, next->state);
280  }
281 
283 
284 // #ifdef HAVE_PREEMPTION
285 // if(!rootsim_config.disable_preemption)
286 // disable_preemption();
287 // #endif
288 
289 #ifdef HAVE_CROSS_STATE
290  // Deactivate memory view for the current LP if no conflict has arisen
291  if (!is_blocked_state(next->state)) {
292 // printf("Deschedule %d\n",lp);
293  lp_alloc_deschedule();
294  }
295 #endif
296 
297  current = NULL;
298 }
299 
300 bool check_rendevouz_request(struct lp_struct *lp)
301 {
302  msg_t *temp_mess;
303 
304  if (lp->state != LP_STATE_WAIT_FOR_SYNCH)
305  return false;
306 
307  if (lp->bound != NULL && list_next(lp->bound) != NULL) {
308  temp_mess = list_next(lp->bound);
309  return temp_mess->type == RENDEZVOUS_START && lp->wait_on_rendezvous > temp_mess->rendezvous_mark;
310  }
311 
312  return false;
313 }
314 
321 void schedule(void)
322 {
323  struct lp_struct *next;
324  msg_t *event;
325 
326 #ifdef HAVE_CROSS_STATE
327  bool resume_execution = false;
328 #endif
329 
330  // Find the next LP to be scheduled
331  switch (rootsim_config.scheduler) {
332 
333  case SCHEDULER_STF:
334  next = smallest_timestamp_first();
335  break;
336 
337  default:
338  rootsim_error(true, "unrecognized scheduler!");
339  }
340 
341  // No logical process found with events to be processed
342  if (next == NULL) {
343  statistics_post_data(NULL, STAT_IDLE_CYCLES, 1.0);
344  return;
345  }
346  // If we have to rollback
347  if (next->state == LP_STATE_ROLLBACK) {
348  rollback(next);
349  next->state = LP_STATE_READY;
350  send_outgoing_msgs(next);
351  return;
352  }
353 
354  if (!is_blocked_state(next->state)
355  && next->state != LP_STATE_READY_FOR_SYNCH) {
356  event = advance_to_next_event(next);
357  } else {
358  event = next->bound;
359  }
360 
361  // Sanity check: if we get here, it means that lid is a LP which has
362  // at least one event to be executed. If advance_to_next_event() returns
363  // NULL, it means that lid has no events to be executed. This is
364  // a critical condition and we abort.
365  if (unlikely(event == NULL)) {
366  rootsim_error(true,
367  "Critical condition: LP %d seems to have events to be processed, but I cannot find them. Aborting...\n",
368  next->gid);
369  }
370 
371  if (unlikely(!process_control_msg(event))) {
372  return;
373  }
374 #ifdef HAVE_CROSS_STATE
375  // In case we are resuming an interrupted execution, we keep track of this.
376  // If at the end of the scheduling the LP is not blocked, we can unblock all the remote objects
377  if (is_blocked_state(next->state) || next->state == LP_STATE_READY_FOR_SYNCH) {
378  resume_execution = true;
379  }
380 #endif
381 
382  // Schedule the LP user-level thread
383  if (next->state == LP_STATE_READY_FOR_SYNCH)
384  next->state = LP_STATE_RUNNING_ECS;
385  else
386  next->state = LP_STATE_RUNNING;
387 
388  activate_LP(next, event);
389 
390  if (!is_blocked_state(next->state)) {
391  next->state = LP_STATE_READY;
392  send_outgoing_msgs(next);
393  }
394 #ifdef HAVE_CROSS_STATE
395  if (resume_execution && !is_blocked_state(next->state)) {
396  //printf("ECS event is finished mark %llu !!!\n", next->wait_on_rendezvous);
397  fflush(stdout);
398  unblock_synchronized_objects(next);
399 
400  // This is to avoid domino effect when relying on rendezvous messages
401  force_LP_checkpoint(next);
402  }
403 #endif
404 
405  // Log the state, if needed
406  LogState(next);
407 }
408 
409 void schedule_on_init(struct lp_struct *next)
410 {
411  msg_t *event;
412 
413 #ifdef HAVE_CROSS_STATE
414  bool resume_execution = false;
415 #endif
416 
417  event = list_head(next->queue_in);
418  next->bound = event;
419 
420 
421  // Sanity check: if we get here, it means that lid is a LP which has
422  // at least one event to be executed. If advance_to_next_event() returns
423  // NULL, it means that lid has no events to be executed. This is
424  // a critical condition and we abort.
425  if (unlikely(event == NULL) || event->type != INIT) {
426  rootsim_error(true,
427  "Critical condition: LP %d should have an INIT event but I cannot find it. Aborting...\n",
428  next->gid);
429  }
430 
431 #ifdef HAVE_CROSS_STATE
432  // In case we are resuming an interrupted execution, we keep track of this.
433  // If at the end of the scheduling the LP is not blocked, we can unblock all the remote objects
434  if (is_blocked_state(next->state) || next->state == LP_STATE_READY_FOR_SYNCH) {
435  resume_execution = true;
436  }
437 #endif
438 
439  next->state = LP_STATE_RUNNING;
440 
441  activate_LP(next, event);
442 
443  if (!is_blocked_state(next->state)) {
444  next->state = LP_STATE_READY;
445  send_outgoing_msgs(next);
446  }
447 #ifdef HAVE_CROSS_STATE
448  if (resume_execution && !is_blocked_state(next->state)) {
449  //printf("ECS event is finished mark %llu !!!\n", next->wait_on_rendezvous);
450  fflush(stdout);
451  unblock_synchronized_objects(next);
452 
453  // This is to avoid domino effect when relying on rendezvous messages
454  force_LP_checkpoint(next);
455  }
456 #endif
457 
458  // Log the state, if needed
459  LogState(next);
460 }
void scheduler_fini(void)
Definition: scheduler.c:105
Per-thread page table.
Communication Routines.
msg_t * bound
Pointer to the last correctly processed event.
Definition: process.h:106
unsigned long long generate_mark(struct lp_struct *lp)
Definition: queues.c:270
void force_LP_checkpoint(struct lp_struct *lp)
Definition: state.c:364
Initialization routines.
struct lp_struct * smallest_timestamp_first(void)
O(n) scheduler.
Definition: stf.c:53
#define context_switch(context_old, context_new)
Swicth machine context for userspace context switch. This is used to schedule a LP or return control ...
Definition: ult.h:75
void ProcessEventABM(void)
Definition: abm_layer.c:461
struct lp_struct ** lps_blocks
Maintain LPs&#39; simulation and execution states.
Definition: process.c:44
User-Level Threads Headers.
A (M, 1) channel for messages.
Core ROOT-Sim functionalities.
unsigned int to_int
The GID numerical value.
Definition: core.h:133
Timers.
Statistics module.
The ROOT-Sim scheduler main module header.
Load sharing rules across worker threads.
Generic thread management facilities.
__thread struct lp_struct * current
This is a per-thread variable pointing to the block state of the LP currently scheduled.
Definition: scheduler.c:72
void schedule(void)
Definition: scheduler.c:321
ECS protocol: start synchronizing two LPs for a page fault.
Definition: communication.h:65
int scheduler
Which scheduler to be used.
Definition: init.h:59
void LP_main_loop(void *args)
Definition: scheduler.c:140
simulation_configuration rootsim_config
This global variable holds the configuration for the current simulation.
Definition: core.c:70
__thread msg_t * current_evt
Definition: scheduler.c:83
Memory Manager main header.
Generic Lists.
Message Type definition.
Definition: core.h:164
unsigned int size
Unique identifier of the message, used for rendez-vous events.
Definition: core.h:189
void rollback(struct lp_struct *lp)
Definition: state.c:220
__thread struct lp_struct ** lps_bound_blocks
Definition: process.c:49
bool thread_barrier(barrier_t *b)
Definition: thread.c:200
__thread kernel_context_t kernel_context
This is the execution context of the simulation kernel.
Definition: ult.c:51
LP control blocks.
void activate_LP(struct lp_struct *next, msg_t *evt)
Definition: scheduler.c:260
Atomic operations.
unsigned long long rendezvous_mark
Unique identifier of the message, used for antimessages.
Definition: core.h:186
void pack_msg(msg_t **msg, GID_t sender, GID_t receiver, int type, simtime_t timestamp, simtime_t send_time, size_t size, void *payload)
Pack a message in a platform-level data structure.
barrier_t all_thread_barrier
Barrier for all worker threads.
Definition: core.c:49
#define list_head(list)
Definition: list.h:74
#define master_thread()
This macro expands to true if the current KLT is the master thread for the local kernel.
Definition: thread.h:155
GID_t gid
Global ID of the LP.
Definition: process.h:82
short unsigned int state
Current execution state of the LP.
Definition: process.h:88
LP state management.
#define master_kernel()
This macro expands to true if the local kernel is the master kernel.
Definition: core.h:47
LP_context_t context
LP execution state.
Definition: process.h:67
Global Virtual Time.
O(n) scheduling algorithm.
LP_context_t default_context
LP execution state when blocked during the execution of an event.
Definition: process.h:70
Fast Hash Algorithm.
void(* ProcessEvent)(unsigned int me, simtime_t now, int event_type, void *event_content, unsigned int size, void *state)
Definition: process.h:136
msg_t * advance_to_next_event(struct lp_struct *lp)
Definition: queues.c:93
__thread unsigned int n_prc_per_thread
This is used to keep track of how many LPs were bound to the current KLT.
Definition: scheduler.c:69
#define list_next(ptr)
Definition: list.h:88
#define INIT
This is the message code which is sent by the simulation kernel upon startup.
Definition: ROOT-Sim.h:52
#define context_save(context)
Save machine context for userspace context switch.
Definition: ult.h:69
void rebind_LPs(void)
Definition: binding.c:252
Event & Cross State Synchornization.
void * current_base_pointer
The current state base pointer (updated by SetState())
Definition: process.h:100
bool disable_preemption
If compiled for preemptive Time Warp, it can be disabled at runtime.
Definition: init.h:77
#define unlikely(exp)
Optimize the branch as likely not taken.
Definition: core.h:74
bool LogState(struct lp_struct *lp)
Definition: state.c:55
void send_outgoing_msgs(struct lp_struct *lp)
Send all pending outgoing messages.