A Discrete-Event Network Simulator
API
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
distributed-simulator-impl.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  *
18  * Some updates to this code were developed under a research contract
19  * sponsored by the Army Research Laboratory. (April 30, 2013)
20  */
21 
23 #include "mpi-interface.h"
24 
25 #include "ns3/simulator.h"
26 #include "ns3/scheduler.h"
27 #include "ns3/event-impl.h"
28 #include "ns3/channel.h"
29 #include "ns3/node-container.h"
30 #include "ns3/ptr.h"
31 #include "ns3/pointer.h"
32 #include "ns3/assert.h"
33 #include "ns3/log.h"
34 
35 #include <cmath>
36 
37 #ifdef NS3_MPI
38 #include <mpi.h>
39 #endif
40 
41 NS_LOG_COMPONENT_DEFINE ("DistributedSimulatorImpl");
42 
43 namespace ns3 {
44 
45 NS_OBJECT_ENSURE_REGISTERED (DistributedSimulatorImpl);
46 
// Empty body: no statements are executed here.
// NOTE(review): the declarator for this body (listing line 47) is not present
// in this listing; presumably a destructor -- confirm against the header.
48 {
49 }
50 
// Accessor: returns the stored m_smallestTime value as a Time.
// NOTE(review): the declarator (listing line 52) is missing here; this looks
// like the GetSmallestTime () called on the LBTS messages in the main loop.
51 Time
53 {
54  return m_smallestTime;
55 }
56 
// Accessor: returns the stored m_txCount value.
// NOTE(review): the declarator (listing line 58) is missing from this listing.
57 uint32_t
59 {
60  return m_txCount;
61 }
62 
// Accessor: returns the stored m_rxCount value.
// NOTE(review): the declarator (listing line 64) is missing from this listing.
63 uint32_t
65 {
66  return m_rxCount;
67 }
// Accessor: returns the stored m_myId value.
// NOTE(review): the declarator (listing line 69) is missing from this listing.
68 uint32_t
70 {
71  return m_myId;
72 }
73 
// Accessor: returns the stored m_isFinished flag.
// NOTE(review): the declarator (listing line 75) is missing from this listing.
74 bool
76 {
77  return m_isFinished;
78 }
79 
81 
// Returns the TypeId named "ns3::DistributedSimulatorImpl".
// The TypeId is held in a function-local static, so the builder chain below
// (parent type Object, constructor for DistributedSimulatorImpl) is evaluated
// only on the first call.
// NOTE(review): the declarator (listing line 83) is missing from this listing.
82 TypeId
84 {
85  static TypeId tid = TypeId ("ns3::DistributedSimulatorImpl")
86  .SetParent<Object> ()
87  .AddConstructor<DistributedSimulatorImpl> ()
88  ;
89  return tid;
90 }
91 
// Sets the initial simulator state.
// With NS3_MPI defined, m_grantedTime starts at zero seconds; without it the
// body reports a fatal error because this implementation requires MPI.
// Afterwards the stop flag, uid counters, current timestamp/context and the
// event list pointer are reset to their starting values.
// NOTE(review): the declarator (listing line 92) and listing lines 95, 96, 99
// and 115 are missing from this listing, so some initialisation is not visible.
93 {
94 #ifdef NS3_MPI
97 
98  // Allocate the LBTS message buffer
// NOTE(review): the allocation statement the comment above refers to is not
// present in this listing (listing line 99).
100  m_grantedTime = Seconds (0);
101 #else
102  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
103 #endif
104 
105  m_stop = false;
106  // uids are allocated from 4.
107  // uid 0 is "invalid" events
108  // uid 1 is "now" events
109  // uid 2 is "destroy" events
110  m_uid = 4;
111  // before ::Run is entered, the m_currentUid will be zero
112  m_currentUid = 0;
113  m_currentTs = 0;
// 0xffffffff is also the context stored in destroy-event ids in this file;
// presumably it stands for "no context" -- confirm.
114  m_currentContext = 0xffffffff;
// No scheduler is installed yet; m_events stays null until one is assigned.
116  m_events = 0;
117 }
118 
// Empty body: no statements are executed here.
// NOTE(review): the declarator (listing line 119) is not present in this
// listing; presumably the destructor -- confirm.
120 {
121 }
122 
// Tears down the event list: loops until m_events is empty, dropping one
// reference on each event implementation, then clears m_events and frees the
// m_pLBTS array.
// NOTE(review): the declarator (listing line 124) and listing lines 128 and
// 133 are missing. `next` is declared on the missing line 128; presumably it
// is the event removed from m_events on each iteration -- confirm.
123 void
125 {
126  while (!m_events->IsEmpty ())
127  {
129  next.impl->Unref ();
130  }
131  m_events = 0;
132  delete [] m_pLBTS;
134 }
135 
// Runs the queued destroy events in FIFO order: each id is taken from the
// front of m_destroyEvents, removed from the list, and its event is invoked
// unless it has been cancelled.
// NOTE(review): the declarator (listing line 137) and listing line 150 are
// missing from this listing.
136 void
138 {
139  while (!m_destroyEvents.empty ())
140  {
// Take a counted pointer before popping the id off the list.
141  Ptr<EventImpl> ev = m_destroyEvents.front ().PeekEventImpl ();
142  m_destroyEvents.pop_front ();
143  NS_LOG_LOGIC ("handle destroy " << ev);
144  if (!ev->IsCancelled ())
145  {
146  ev->Invoke ();
147  }
148  }
149 
151 }
152 
153 
// Derives the lookahead / initial granted time from the topology.
//  - With at most one MPI rank, m_grantedTime is set to zero seconds.
//  - Otherwise every node owned by this rank is scanned; for each
//    point-to-point device whose channel peer lives on another rank, the
//    channel's "Delay" attribute is read and compared (see inline comments).
//  - All ranks then take part in an MPI_Allreduce (MPI_MAX) so that ranks
//    without an inter-rank link can adopt the maximum value found elsewhere.
// Without NS3_MPI the body reports a fatal error.
// NOTE(review): the declarator (listing line 155) and listing lines 160, 165,
// 166, 168, 213, 215, 241 and 261 are missing from this listing; the node
// container `c` and two `if` conditions are declared on those lines.
154 void
156 {
157 #ifdef NS3_MPI
158  if (MpiInterface::GetSize () <= 1)
159  {
161  m_grantedTime = Seconds (0);
162  }
163  else
164  {
167 
169  for (NodeContainer::Iterator iter = c.Begin (); iter != c.End (); ++iter)
170  {
// Skip nodes that belong to another rank.
171  if ((*iter)->GetSystemId () != MpiInterface::GetSystemId ())
172  {
173  continue;
174  }
175 
176  for (uint32_t i = 0; i < (*iter)->GetNDevices (); ++i)
177  {
178  Ptr<NetDevice> localNetDevice = (*iter)->GetDevice (i);
179  // only works for p2p links currently
180  if (!localNetDevice->IsPointToPoint ())
181  {
182  continue;
183  }
184  Ptr<Channel> channel = localNetDevice->GetChannel ();
185  if (channel == 0)
186  {
187  continue;
188  }
189 
190  // grab the adjacent node
// The channel has two ends; pick the device that is not the local one.
191  Ptr<Node> remoteNode;
192  if (channel->GetDevice (0) == localNetDevice)
193  {
194  remoteNode = (channel->GetDevice (1))->GetNode ();
195  }
196  else
197  {
198  remoteNode = (channel->GetDevice (0))->GetNode ();
199  }
200 
201  // if it's not remote, don't consider it
202  if (remoteNode->GetSystemId () == MpiInterface::GetSystemId ())
203  {
204  continue;
205  }
206 
207  // compare delay on the channel with current value of
208  // m_lookAhead. if delay on channel is smaller, make
209  // it the new lookAhead.
210  TimeValue delay;
211  channel->GetAttribute ("Delay", delay);
212 
// NOTE(review): the comparison guarding this branch (listing line 213) and the
// first statement inside it (listing line 215) are missing from this listing.
214  {
216  m_grantedTime = delay.Get ();
217  }
218  }
219  }
220  }
221 
222  /*
223  * Compute the maximum inter-task latency and use that value
224  * for tasks with no inter-task links.
225  *
226  * Special processing for edge cases. For tasks that have no
227  * nodes need to determine a reasonable lookAhead value. Infinity
228  * would work correctly but introduces a performance issue; tasks
229  * with an infinite lookAhead would execute all their events
230  * before doing an AllGather resulting in very bad load balance
231  * during the first time window. Since all tasks participate in
232  * the AllGather it is desirable to have all the tasks advance in
233  * simulation time at a similar rate assuming roughly equal events
234  * per unit of simulation time in order to equalize the amount of
235  * work per time window.
236  */
// NOTE(review): the buffers are `long` to match MPI_LONG below. If
// m_lookAhead.GetInteger () yields a 64-bit value this narrows on platforms
// where long is 32 bits -- confirm.
237  long sendbuf;
238  long recvbuf;
239 
240  /* Tasks with no inter-task links do not contribute to max */
// NOTE(review): the condition for this branch (listing line 241) is missing.
242  {
243  sendbuf = 0;
244  }
245  else
246  {
247  sendbuf = m_lookAhead.GetInteger ();
248  }
249 
// Collective call: every rank contributes sendbuf and receives the maximum.
250  MPI_Allreduce (&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MPI_COMM_WORLD);
251 
252  /* For nodes that did not compute a lookahead use max from ranks
253  * that did compute a value. An edge case occurs if all nodes have
254  * no inter-task links (max will be 0 in this case). Use infinity so all tasks
255  * will proceed without synchronization until a single AllGather
256  * occurs when all tasks have finished.
257  */
258  if (m_lookAhead == GetMaximumSimulationTime () && recvbuf != 0)
259  {
260  m_lookAhead = Time (recvbuf);
262  }
263 
264 #else
265  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
266 #endif
267 }
268 
// Replaces the event scheduler.
// A new Scheduler is created from `schedulerFactory`; if a scheduler is already
// installed, its pending events are moved into the new one before m_events is
// pointed at the replacement.
// NOTE(review): the declarator (listing line 270) and listing line 278 are
// missing. `next` is declared on line 278; presumably it is the event removed
// from the old scheduler -- confirm.
269 void
271 {
272  Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler> ();
273 
274  if (m_events != 0)
275  {
276  while (!m_events->IsEmpty ())
277  {
279  scheduler->Insert (next);
280  }
281  }
282  m_events = scheduler;
283 }
284 
// Executes a single event `next`.
// Asserts that the event's timestamp is not earlier than the current time,
// advances m_currentTs and m_currentUid to the event's key, invokes the event
// and then drops the reference that the event list held on it.
// NOTE(review): the declarator (listing line 286) and listing lines 288, 291
// and 295 are missing; `next` is declared on one of them, presumably as the
// event removed from m_events -- confirm.
285 void
287 {
289 
290  NS_ASSERT (next.key.m_ts >= m_currentTs);
292 
293  NS_LOG_LOGIC ("handle " << next.key.m_ts);
294  m_currentTs = next.key.m_ts;
296  m_currentUid = next.key.m_uid;
297  next.impl->Invoke ();
298  next.impl->Unref ();
299 }
300 
// Returns the m_globalFinished flag, the same flag that controls the main
// event loop in this file.
// NOTE(review): the declarator (listing line 302) is missing from this listing.
301 bool
303 {
304  return m_globalFinished;
305 }
306 
// Returns true when the local event list is empty or a stop has been
// requested (m_stop is set).
// NOTE(review): the declarator (listing line 308) is missing from this listing.
307 bool
309 {
310  return m_events->IsEmpty () || m_stop;
311 }
312 
// Returns the timestamp of the next local event as a raw 64-bit value.
// NOTE(review): the declarator (listing line 314) and listing lines 319 and
// 321 are missing; the value returned in the "finished" branch and the
// declaration of `ev` are on those lines.
313 uint64_t
315 {
316  // If the local MPI task has no more events or stop was called,
317  // the next event time is infinity.
318  if (IsLocalFinished ()) {
320  } else {
322  return ev.key.m_ts;
323  }
324 }
325 
// Returns the next local event time as a Time, by converting the raw
// timestamp from NextTs () with TimeStep ().
// NOTE(review): the declarator (listing line 327) is missing from this listing.
326 Time
328 {
329  return TimeStep (NextTs ());
330 }
331 
// Main event loop.
// Repeats until m_globalFinished is set. Each iteration looks at the next
// local event time; if it lies beyond m_grantedTime, or the local rank has
// nothing left to do, the rank joins an MPI_Allgather of LbtsMessage records
// and may compute a new granted time. An event is executed only when it falls
// inside the granted window and the local rank is not finished.
// Without NS3_MPI the body reports a fatal error.
// NOTE(review): the declarator (listing line 333) and listing lines 336, 352,
// 356, 358, 368, 378, 385, 387 and 407 are missing from this listing. Among
// other things, the construction of `lMsg` and whatever updates
// m_globalFinished are not visible here.
332 void
334 {
335 #ifdef NS3_MPI
337  m_stop = false;
338  while (!m_globalFinished)
339  {
340  Time nextTime = Next ();
341 
342  // If local event is beyond grantedTime then need to synchronize
343  // with other tasks to determine new time window. If local task
344  // is finished then continue to participate in allgather
345  // synchronizations with other tasks until all tasks have
346  // completed.
347  if (nextTime > m_grantedTime || IsLocalFinished () )
348  {
349 
350  // Can't process next event, calculate a new LBTS
351  // First receive any pending messages
353  // reset next time
354  nextTime = Next ();
355  // And check for send completes
357  // Finally calculate the lbts
359  m_pLBTS[m_myId] = lMsg;
// Every rank contributes one LbtsMessage, sent as raw bytes, and receives the
// full array of messages in m_pLBTS.
360  MPI_Allgather (&lMsg, sizeof (LbtsMessage), MPI_BYTE, m_pLBTS,
361  sizeof (LbtsMessage), MPI_BYTE, MPI_COMM_WORLD);
362  Time smallestTime = m_pLBTS[0].GetSmallestTime ();
363  // The totRx and totTx counts ensure there are no transient
364  // messages; if totRx != totTx, there are transients,
365  // so we don't update the granted time.
366  uint32_t totRx = m_pLBTS[0].GetRxCount ();
367  uint32_t totTx = m_pLBTS[0].GetTxCount ();
369 
// Fold the remaining ranks' messages into the minimum time and the totals.
370  for (uint32_t i = 1; i < m_systemCount; ++i)
371  {
372  if (m_pLBTS[i].GetSmallestTime () < smallestTime)
373  {
374  smallestTime = m_pLBTS[i].GetSmallestTime ();
375  }
376  totRx += m_pLBTS[i].GetRxCount ();
377  totTx += m_pLBTS[i].GetTxCount ();
379  }
380  if (totRx == totTx)
381  {
382  // If lookahead is infinite then granted time should be as well.
383  // Covers the edge case if all the tasks have no inter tasks
384  // links, prevents overflow of granted time.
// NOTE(review): the condition and the statement of this first branch (listing
// lines 385 and 387) are missing from this listing.
386  {
388  }
389  else
390  {
391  // Overflow is possible here if near end of representable time.
392  m_grantedTime = smallestTime + m_lookAhead;
393  }
394  }
395  }
396 
397  // Execute next event if it is within the current time window.
398  // Local task may be completed.
399  if ( (nextTime <= m_grantedTime) && (!IsLocalFinished ()) )
400  { // Safe to process
401  ProcessOneEvent ();
402  }
403  }
404 
405  // If the simulator stopped naturally by lack of events, make a
406  // consistency test to check that we didn't lose any events along the way.
// NOTE(review): the consistency check itself (listing line 407) is missing.
408 #else
409  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
410 #endif
411 }
412 
// Returns the stored m_myId value, the same value used to index m_pLBTS in
// the main loop.
// NOTE(review): the return type and declarator (listing line 413) are missing
// from this listing.
414 {
415  return m_myId;
416 }
417 
// Requests a stop by setting m_stop. IsLocalFinished () reads this flag, so
// the local rank then reports itself as finished.
// NOTE(review): the declarator (listing line 419) is missing from this listing.
418 void
420 {
421  m_stop = true;
422 }
423 
// NOTE(review): both the declarator (listing line 425) and the only statement
// of this body (listing line 427) are missing from this listing, so its
// behaviour cannot be described from the visible text. Presumably this is the
// timed Stop overload -- confirm against the original file.
424 void
426 {
428 }
429 
430 //
431 // Schedule an event for a _relative_ time in the future.
432 //
// The delay `time` is added to the current simulation time to obtain an
// absolute timestamp, which must be positive and not earlier than the current
// time. The event is inserted into m_events under a fresh uid and the current
// context; the returned EventId carries the same key.
// NOTE(review): the declarator (listing line 434) and listing line 446 are
// missing from this listing.
433 EventId
435 {
436  Time tAbsolute = time + TimeStep (m_currentTs);
437 
438  NS_ASSERT (tAbsolute.IsPositive ());
439  NS_ASSERT (tAbsolute >= TimeStep (m_currentTs));
440  Scheduler::Event ev;
441  ev.impl = event;
442  ev.key.m_ts = static_cast<uint64_t> (tAbsolute.GetTimeStep ());
443  ev.key.m_context = GetContext ();
444  ev.key.m_uid = m_uid;
445  m_uid++;
447  m_events->Insert (ev);
448  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
449 }
450 
// Schedules `event` to run `time` after the current simulation time, tagged
// with the caller-supplied `context` instead of the current context.
// The event receives the next uid and is inserted into m_events. No EventId
// is returned.
// NOTE(review): listing line 462 is missing from this listing.
451 void
452 DistributedSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &time, EventImpl *event)
453 {
454  NS_LOG_FUNCTION (this << context << time.GetTimeStep () << m_currentTs << event);
455 
456  Scheduler::Event ev;
457  ev.impl = event;
// Absolute timestamp = current time + relative delay, in time-step units.
458  ev.key.m_ts = m_currentTs + time.GetTimeStep ();
459  ev.key.m_context = context;
460  ev.key.m_uid = m_uid;
461  m_uid++;
463  m_events->Insert (ev);
464 }
465 
// Schedules `event` at the current simulation time (m_currentTs) under the
// current context and a fresh uid, and returns an EventId carrying that key.
// NOTE(review): the declarator (listing line 467) and listing line 475 are
// missing from this listing.
466 EventId
468 {
469  Scheduler::Event ev;
470  ev.impl = event;
471  ev.key.m_ts = m_currentTs;
472  ev.key.m_context = GetContext ();
473  ev.key.m_uid = m_uid;
474  m_uid++;
476  m_events->Insert (ev);
477  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
478 }
479 
// Registers `event` as a destroy event: it is appended to m_destroyEvents
// rather than inserted into the scheduler, and the EventId is returned.
// The id uses uid 2 (the "destroy" uid) and context 0xffffffff.
// NOTE(review): the declarator (listing line 481) is missing from this listing.
480 EventId
482 {
483  EventId id (Ptr<EventImpl> (event, false), m_currentTs, 0xffffffff, 2);
484  m_destroyEvents.push_back (id);
// m_uid is advanced even though the id above uses the fixed uid 2.
485  m_uid++;
486  return id;
487 }
488 
// Returns the current simulation time, i.e. m_currentTs converted to a Time.
// NOTE(review): the declarator (listing line 490) is missing from this listing.
489 Time
491 {
492  return TimeStep (m_currentTs);
493 }
494 
// Returns how long remains until the event identified by `id` is due:
// zero if the event has already expired, otherwise its timestamp minus the
// current time.
// NOTE(review): the declarator (listing line 496) is missing from this listing.
495 Time
497 {
498  if (IsExpired (id))
499  {
500  return TimeStep (0);
501  }
502  else
503  {
504  return TimeStep (id.GetTs () - m_currentTs);
505  }
506 }
507 
// Removes the event identified by `id`.
// Destroy events (uid 2) are erased from m_destroyEvents if present.
// For any other event that has not expired, the matching entry is removed
// from the scheduler, the event is cancelled and its reference is dropped.
// NOTE(review): the declarator (listing line 509) and listing line 538 are
// missing from this listing.
508 void
510 {
511  if (id.GetUid () == 2)
512  {
513  // destroy events.
514  for (DestroyEvents::iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
515  {
516  if (*i == id)
517  {
518  m_destroyEvents.erase (i);
519  break;
520  }
521  }
522  return;
523  }
524  if (IsExpired (id))
525  {
526  return;
527  }
// Rebuild the scheduler key from the id so the entry can be located.
528  Scheduler::Event event;
529  event.impl = id.PeekEventImpl ();
530  event.key.m_ts = id.GetTs ();
531  event.key.m_context = id.GetContext ();
532  event.key.m_uid = id.GetUid ();
533  m_events->Remove (event);
534  event.impl->Cancel ();
535  // whenever we remove an event from the event list, we have to unref it.
536  event.impl->Unref ();
537 
539 }
540 
// Marks the event identified by `id` as cancelled, unless it has already
// expired. The event is not removed from any list here.
// NOTE(review): the declarator (listing line 542) is missing from this listing.
541 void
543 {
544  if (!IsExpired (id))
545  {
546  id.PeekEventImpl ()->Cancel ();
547  }
548 }
549 
// Reports whether the event identified by `ev` is expired.
// Destroy events (uid 2) are expired when they have no implementation, are
// cancelled, or are no longer in m_destroyEvents.
// Other events are expired when they have no implementation, lie in the past,
// share the current timestamp with a uid not greater than the event being
// executed, or are cancelled.
// NOTE(review): the declarator (listing line 551) is missing from this listing.
550 bool
552 {
553  if (ev.GetUid () == 2)
554  {
555  if (ev.PeekEventImpl () == 0
556  || ev.PeekEventImpl ()->IsCancelled ())
557  {
558  return true;
559  }
560  // destroy events.
561  for (DestroyEvents::const_iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
562  {
563  if (*i == ev)
564  {
565  return false;
566  }
567  }
568  return true;
569  }
570  if (ev.PeekEventImpl () == 0
571  || ev.GetTs () < m_currentTs
572  || (ev.GetTs () == m_currentTs
573  && ev.GetUid () <= m_currentUid)
574  || ev.PeekEventImpl ()->IsCancelled ())
575  {
576  return true;
577  }
578  else
579  {
580  return false;
581  }
582 }
583 
// Returns the largest time value used by this implementation:
// 0x7fffffffffffffff time steps, the maximum positive signed 64-bit value.
// NOTE(review): the declarator (listing line 585) and listing lines 587-588
// are missing from this listing.
584 Time
586 {
589  return TimeStep (0x7fffffffffffffffLL);
590 }
591 
// Returns the stored m_currentContext value.
// NOTE(review): the declarator (listing line 593) is missing from this listing.
592 uint32_t
594 {
595  return m_currentContext;
596 }
597 
598 } // namespace ns3