The ROme OpTimistic Simulator  2.0.0
A General-Purpose Multithreaded Parallel/Distributed Simulation Platform
queues.c
Go to the documentation of this file.
1 
33 #include <stdlib.h>
34 #include <string.h>
35 #include <assert.h>
36 
37 #include <scheduler/process.h>
38 #include <core/core.h>
39 #include <arch/atomic.h>
40 #include <arch/thread.h>
41 #include <datatypes/list.h>
42 #include <datatypes/msgchannel.h>
43 #include <queues/queues.h>
44 #include <mm/state.h>
45 #include <mm/mm.h>
46 #include <scheduler/scheduler.h>
48 #include <communication/gvt.h>
49 #include <statistics/statistics.h>
50 #include <gvt/gvt.h>
51 
63 {
64  msg_t *evt;
65 
66  // The bound can be NULL in the first execution or if it has gone back
67  if (unlikely(lp->bound == NULL && !list_empty(lp->queue_in))) {
68  return list_head(lp->queue_in)->timestamp;
69  } else {
70  evt = list_next(lp->bound);
71  if (likely(evt != NULL)) {
72  return evt->timestamp;
73  }
74  }
75 
76  return INFTY;
77 
78 }
79 
94 {
95  if (likely(list_next(lp->bound) != NULL)) {
96  lp->bound = list_next(lp->bound);
97  } else {
98  return NULL;
99  }
100 
101  return lp->bound;
102 }
103 
116 {
117  struct lp_struct *lp = find_lp_by_gid(msg->receiver);
118 
119  validate_msg(msg);
120 
121  insert_msg(lp->bottom_halves, msg);
122 #ifdef HAVE_PREEMPTION
123  update_min_in_transit(lp->worker_thread, msg->timestamp);
124 #endif
125 }
126 
133 {
134  struct lp_struct *receiver;
135 
136  msg_t *msg_to_process;
137  msg_t *matched_msg;
138 
139  foreach_bound_lp(lp) {
140 
141  while ((msg_to_process = get_msg(lp->bottom_halves)) != NULL) {
142  receiver = find_lp_by_gid(msg_to_process->receiver);
143 
144  // Sanity check
145  if (unlikely
146  (msg_to_process->timestamp < get_last_gvt()))
147  rootsim_error(true,
148  "The impossible happened: I'm receiving a message before the GVT\n");
149 
150  // Handle control messages
151  if (unlikely(!receive_control_msg(msg_to_process))) {
152  msg_release(msg_to_process);
153  continue;
154  }
155 
156  switch (msg_to_process->message_kind) {
157 
158  // It's an antimessage
159  case negative:
160 
161  statistics_post_data(receiver, STAT_ANTIMESSAGE, 1.0);
162 
163  // Find the message matching the antimessage
164  matched_msg = list_tail(receiver->queue_in);
165  while (matched_msg != NULL
166  && matched_msg->mark !=
167  msg_to_process->mark) {
168  matched_msg = list_prev(matched_msg);
169  }
170 
171  // Sanity check
172  if (unlikely(matched_msg == NULL)) {
173  rootsim_error(false,
174  "LP %d Received an antimessage, but no such mark has been found!\n",
175  receiver->gid.to_int);
176  dump_msg_content(msg_to_process);
177  rootsim_error(true, "Aborting...\n");
178  }
179  // If the matched message is in the past, we have to rollback
180  if (matched_msg->timestamp <= lvt(receiver)) {
181 
182  receiver->bound = list_prev(matched_msg);
183  while ((receiver->bound != NULL)
184  && D_EQUAL(receiver->bound->timestamp, msg_to_process->timestamp)) {
185  receiver->bound = list_prev(receiver->bound);
186  }
187 
188  receiver->state = LP_STATE_ROLLBACK;
189  }
190 #ifdef HAVE_MPI
191  register_incoming_msg(msg_to_process);
192 #endif
193 
194  // Delete the matched message
195  list_delete_by_content(receiver->queue_in,
196  matched_msg);
197  msg_release(matched_msg);
198 
199  break;
200 
201  // It's a positive message
202  case positive:
203 
204  // A positive message is directly placed in the queue
205  list_insert(receiver->queue_in, timestamp,
206  msg_to_process);
207 
208  // Check if we've just inserted an out-of-order event.
209  // Here we check for a strictly minor timestamp since
210  // the queue is FIFO for same-timestamp events. Therefore,
211  // A contemporaneous event does not cause a causal violation.
212  if (msg_to_process->timestamp < lvt(receiver)) {
213 
214  receiver->bound = list_prev(msg_to_process);
215  while ((receiver->bound != NULL)
216  && D_EQUAL(receiver->bound->timestamp, msg_to_process->timestamp)) {
217  receiver->bound = list_prev(receiver->bound);
218  }
219 
220  receiver->state = LP_STATE_ROLLBACK;
221  }
222 #ifdef HAVE_MPI
223  register_incoming_msg(msg_to_process);
224 #endif
225  break;
226 
227  // It's a control message
228  case control:
229 
230  // Check if it is an anti control message
231  if (!anti_control_message(msg_to_process)) {
232  msg_release(msg_to_process);
233  continue;
234  }
235 
236  break;
237 
238  default:
239  rootsim_error(true, "Received a message which is neither positive nor negative. Aborting...\n");
240  }
241  }
242  }
243 
244  // We have processed all in transit messages.
245  // Actually, during this operation, some new in transit messages could
246  // be placed by other threads. In this case, we loose their presence.
247  // This is not a correctness error. The only issue could be that the
248  // preemptive scheme will not detect this, and some events could
249  // be in fact executed out of order.
250 #ifdef HAVE_PREEMPTION
251  reset_min_in_transit(local_tid);
252 #endif
253 }
254 
270 unsigned long long generate_mark(struct lp_struct *lp)
271 {
272  unsigned long long k1 = (unsigned long long)lp->gid.to_int;
273  unsigned long long k2 = lp->mark++;
274 
275  return (unsigned long long)(((k1 + k2) * (k1 + k2 + 1) / 2) + k2);
276 }
void register_incoming_msg(const msg_t *msg)
Register an incoming message, if necessary.
Definition: gvt.c:623
Communication Routines.
#define lvt(lp)
Definition: process.h:168
msg_t * bound
Pointer to the last correctly processed event.
Definition: process.h:106
unsigned long long generate_mark(struct lp_struct *lp)
Definition: queues.c:270
void process_bottom_halves(void)
Definition: queues.c:132
#define likely(exp)
Optimize the branch as likely taken.
Definition: core.h:72
simtime_t next_event_timestamp(struct lp_struct *lp)
Definition: queues.c:62
Message queueing subsystem.
A (M, 1) channel for messages.
Core ROOT-Sim functionalities.
msg_channel * bottom_halves
Bottom halves.
Definition: process.h:115
simtime_t get_last_gvt(void)
Definition: gvt.c:179
unsigned int to_int
The GID numerical value.
Definition: core.h:133
Statistics module.
#define list_empty(list)
Definition: list.h:110
The ROOT-Sim scheduler main module header.
void dump_msg_content(msg_t *msg)
Dump the content of a message.
#define list_prev(ptr)
Definition: list.h:95
Generic thread management facilities.
#define list_insert(li, key_name, data)
Insert a new node in the list.
Definition: list.h:163
#define INFTY
Infinite timestamp: this is the highest timestamp in a simulation run.
Definition: ROOT-Sim.h:58
double simtime_t
This defines the type with whom timestamps are represented.
Definition: ROOT-Sim.h:55
Memory Manager main header.
unsigned long long mark
Unique identifier within the LP.
Definition: process.h:121
Generic Lists.
Message Type definition.
Definition: core.h:164
LP control blocks.
Atomic operations.
void validate_msg(msg_t *msg)
Perform some sanity checks on a message buffer.
#define list_head(list)
Definition: list.h:74
#define D_EQUAL(a, b)
Equality condition for doubles.
Definition: core.h:94
void msg_release(msg_t *msg)
Release a message buffer.
GID_t gid
Global ID of the LP.
Definition: process.h:82
short unsigned int state
Current execution state of the LP.
Definition: process.h:88
LP state management.
Global Virtual Time.
void insert_bottom_half(msg_t *msg)
Definition: queues.c:115
msg_t * advance_to_next_event(struct lp_struct *lp)
Definition: queues.c:93
#define list_next(ptr)
Definition: list.h:88
__thread unsigned int local_tid
Definition: thread.c:72
#define list_tail(list)
Definition: list.h:81
Distributed GVT Support module.
unsigned int worker_thread
ID of the worker thread towards which the LP is bound.
Definition: process.h:85
#define unlikely(exp)
Optimize the branch as likely not taken.
Definition: core.h:74