The ROme OpTimistic Simulator  2.0.0
A General-Purpose Multithreaded Parallel/Distributed Simulation Platform
wnd.c
Go to the documentation of this file.
1 
39 #ifdef HAVE_MPI
40 
42 #include <communication/wnd.h>
43 #include <communication/mpi.h>
44 
55 
57 static int n_queues = 0;
58 
59 
68 static size_t outgoing_queues_size(void)
69 {
70  int i;
71  size_t size = 0;
72  for (i = 0; i < n_queues; i++) {
73  size += list_sizeof(outgoing_queues[i].queue);
74  }
75  return size;
76 }
77 
78 
87 {
88  n_queues = n_ker;
89  outgoing_queues = rsalloc(n_queues * sizeof(outgoing_queue));
90  outgoing_queue *oq;
91  int i;
92  for (i = 0; i < n_queues; i++) {
93  oq = outgoing_queues + i;
94  spinlock_init(&(oq->lock));
95  oq->queue = new_list(outgoing_msg);
96  }
97 }
98 
99 
108 {
110  size_t pending_out_msgs = outgoing_queues_size();
111 
112  if (unlikely(pending_out_msgs > 0)) {
113  rootsim_error(true, "Outgoing queues not empty on exit: %zu\n", outgoing_queues_size());
114  }
115 
116  // TODO: release also lists allocated via new_list in outgoing_window_init()
117  rsfree(outgoing_queues);
118 }
119 
120 
131 {
132  return rsalloc(sizeof(outgoing_msg));
133 }
134 
135 
148 static inline bool is_msg_delivered(outgoing_msg *msg)
149 {
150  return is_request_completed(&(msg->req));
151 }
152 
153 
167 void store_outgoing_msg(outgoing_msg *out_msg, unsigned int dest_kid)
168 {
169  outgoing_queue *oq = &outgoing_queues[dest_kid];
170 
171  spin_lock(&(oq->lock));
172  list_insert_tail(oq->queue, out_msg);
173  spin_unlock(&(oq->lock));
174 }
175 
176 
191 {
192  int pruned = 0;
193 
194  spin_lock(&(oq->lock));
195 
196  outgoing_msg *msg = list_head(oq->queue);
197 
198  // check all the outgoing messages in the queue starting from the
199  // head (the entry with the minimum timestamp) and delete them
200  // if they have been already delivered
201  while (msg != NULL && is_msg_delivered(msg)) {
202  msg_release(msg->msg);
203 
204  list_delete_by_content(oq->queue, msg);
205  pruned++;
206  msg = list_head(oq->queue);
207  }
208 
209  spin_unlock(&(oq->lock));
210 
211  return pruned;
212 }
213 
214 
226 {
227  int i;
228  int pruned = 0;
229  for (i = 0; i < n_queues; i++) {
230  pruned += prune_outgoing_queue(outgoing_queues + i);
231  }
232  return pruned;
233 }
234 
235 #endif
static int n_queues
The number of outgoing queues which are managed by the submodule.
Definition: wnd.c:57
Communication Routines.
int prune_outgoing_queues(void)
Prune all outgoing queues.
Definition: wnd.c:225
bool is_request_completed(MPI_Request *req)
Check if an MPI request has been completed.
Definition: mpi.c:144
The structure representing a node in the outgoing_queue list.
Definition: wnd.h:41
#define spinlock_init(s)
Spinlock initialization.
Definition: atomic.h:72
static int prune_outgoing_queue(outgoing_queue *oq)
Prune an outgoing queue.
Definition: wnd.c:190
void store_outgoing_msg(outgoing_msg *out_msg, unsigned int dest_kid)
Store an outgoing message.
Definition: wnd.c:167
spinlock_t lock
A lock used to protect access to the actual queue.
Definition: wnd.h:51
static bool is_msg_delivered(outgoing_msg *msg)
Check if a message has been delivered.
Definition: wnd.c:148
#define new_list(type)
Definition: list.h:59
void spin_lock(spinlock_t *s)
Definition: x86.c:135
MPI_Request req
The MPI Request used to keep track of the delivery operation.
Definition: wnd.h:42
outgoing_msg * allocate_outgoing_msg(void)
Allocate a buffer for an outgoing message node.
Definition: wnd.c:130
#define list_head(list)
Definition: list.h:74
void outgoing_window_init(void)
Outgoing queue initialization.
Definition: wnd.c:86
void msg_release(msg_t *msg)
Release a message buffer.
MPI Support Module.
An outgoing queue, to keep track of pending MPI-based message delivery.
Definition: wnd.h:50
msg_t * msg
A pointer to the msg_t which MPI is delivering.
Definition: wnd.h:45
static size_t outgoing_queues_size(void)
Compute the size of all outgoing queues.
Definition: wnd.c:68
Message delivery support.
void outgoing_window_finalize(void)
Finalize the message delivery subsystem.
Definition: wnd.c:107
static outgoing_queue * outgoing_queues
Definition: wnd.c:54
unsigned int n_ker
Total number of simulation kernel instances running.
Definition: core.c:58
#define unlikely(exp)
Optimize the branch as likely not taken.
Definition: core.h:74
void spin_unlock(spinlock_t *s)
Definition: x86.c:161