A Discrete-Event Network Simulator
API
distributed-simulator-impl.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  *
18  */
19 
22 #include "mpi-interface.h"
23 
24 #include "ns3/simulator.h"
25 #include "ns3/scheduler.h"
26 #include "ns3/event-impl.h"
27 #include "ns3/channel.h"
28 #include "ns3/node-container.h"
29 #include "ns3/ptr.h"
30 #include "ns3/pointer.h"
31 #include "ns3/assert.h"
32 #include "ns3/log.h"
33 
34 #include <cmath>
35 
36 #ifdef NS3_MPI
37 #include <mpi.h>
38 #endif
39 
40 namespace ns3 {
41 
42 NS_LOG_COMPONENT_DEFINE ("DistributedSimulatorImpl");
43 
44 NS_OBJECT_ENSURE_REGISTERED (DistributedSimulatorImpl);
45 
// NOTE(review): these bodies appear to be the out-of-line LbtsMessage members
// (destructor plus field accessors); their signature lines are not visible
// here — confirm. Each accessor just returns one stored field.
// Empty body: nothing to release.
47 {
48 }
49 
// Returns the stored m_smallestTime (read by Run() via GetSmallestTime()).
50 Time
52 {
53  return m_smallestTime;
54 }
55 
// Returns the stored m_txCount (summed across ranks in Run()).
56 uint32_t
58 {
59  return m_txCount;
60 }
61 
// Returns the stored m_rxCount (summed across ranks in Run()).
62 uint32_t
64 {
65  return m_rxCount;
66 }
// Returns the stored m_myId.
67 uint32_t
69 {
70  return m_myId;
71 }
72 
// Returns the stored m_isFinished flag.
73 bool
75 {
76  return m_isFinished;
77 }
78 
80 
// Returns the TypeId "ns3::DistributedSimulatorImpl" (group "Mpi") and
// registers a default constructor for it. The TypeId is built once, on the
// first call, through a function-local static.
81 TypeId
83 {
84  static TypeId tid = TypeId ("ns3::DistributedSimulatorImpl")
86  .SetGroupName ("Mpi")
87  .AddConstructor<DistributedSimulatorImpl> ()
88  ;
89  return tid;
90 }
91 
// Constructor: initialises the simulator state. Without NS3_MPI it raises a
// fatal error, because this implementation cannot run without MPI.
93 {
94  NS_LOG_FUNCTION (this);
95 
96 #ifdef NS3_MPI
99 
100  // Allocate the LBTS message buffer
// The first time window is [0, 0]; Run() widens it after each LBTS round.
102  m_grantedTime = Seconds (0);
103 #else
105  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
106 #endif
107 
108  m_stop = false;
109  m_globalFinished = false;
110  // uids are allocated from 4.
111  // uid 0 is "invalid" events
112  // uid 1 is "now" events
113  // uid 2 is "destroy" events
114  m_uid = 4;
115  // before ::Run is entered, the m_currentUid will be zero
116  m_currentUid = 0;
117  m_currentTs = 0;
120  m_eventCount = 0;
// No event list yet; SetScheduler() installs one (it checks for null).
121  m_events = 0;
122 }
123 
// Destructor: only logs. The event list and the LBTS buffer are released in
// DoDispose().
125 {
126  NS_LOG_FUNCTION (this);
127 }
128 
// Releases simulator resources. It drains the pending-event list, dropping
// the reference each queued event holds, then frees the LBTS message buffer.
// NOTE(review): m_events is dereferenced unconditionally, which assumes
// SetScheduler() was called (the constructor leaves m_events null).
129 void
131 {
132  NS_LOG_FUNCTION (this);
133 
134  while (!m_events->IsEmpty ())
135  {
136  Scheduler::Event next = m_events->RemoveNext ();
// Queued events hold a reference owned by the list; drop it.
137  next.impl->Unref ();
138  }
139  m_events = 0;
// NOTE(review): m_pLBTS is not reset to null after delete[]. If DoDispose
// can run more than once, this is a double free — consider nulling it.
140  delete [] m_pLBTS;
142 }
143 
// Runs every event registered through ScheduleDestroy() that has not been
// cancelled, in FIFO order (ScheduleDestroy push_backs, this pops the
// front). Each event is removed from the list before it is invoked.
144 void
146 {
147  NS_LOG_FUNCTION (this);
148 
149  while (!m_destroyEvents.empty ())
150  {
// Take a reference-holding Ptr before pop_front() so the event object
// stays valid after its EventId leaves the list.
151  Ptr<EventImpl> ev = m_destroyEvents.front ().PeekEventImpl ();
152  m_destroyEvents.pop_front ();
153  NS_LOG_LOGIC ("handle destroy " << ev);
154  if (!ev->IsCancelled ())
155  {
156  ev->Invoke ();
157  }
158  }
159 
161 }
162 
163 
// Computes m_lookAhead for this MPI rank. With more than one rank, it takes
// the smallest "Delay" attribute among point-to-point channels that connect
// a local node to a node owned by another rank. It then exchanges values
// with MPI_Allreduce (MPI_MAX) so that ranks without inter-task links can
// adopt the maximum lookahead computed elsewhere. With a single rank the
// lookahead is zero. Without NS3_MPI this is a fatal error.
164 void
166 {
167  NS_LOG_FUNCTION (this);
168 
169 #ifdef NS3_MPI
170  if (MpiInterface::GetSize () <= 1)
171  {
172  m_lookAhead = Seconds (0);
173  }
174  else
175  {
// Seconds (-1) is used as the "not set by the user" marker.
176  if (m_lookAhead == Seconds (-1))
177  {
179  }
180  // else it was already set by SetMaximumLookAhead
181 
183  for (NodeContainer::Iterator iter = c.Begin (); iter != c.End (); ++iter)
184  {
// Only nodes owned by this rank contribute.
185  if ((*iter)->GetSystemId () != MpiInterface::GetSystemId ())
186  {
187  continue;
188  }
189 
190  for (uint32_t i = 0; i < (*iter)->GetNDevices (); ++i)
191  {
192  Ptr<NetDevice> localNetDevice = (*iter)->GetDevice (i);
193  // only works for p2p links currently
194  if (!localNetDevice->IsPointToPoint ())
195  {
196  continue;
197  }
198  Ptr<Channel> channel = localNetDevice->GetChannel ();
199  if (channel == 0)
200  {
201  continue;
202  }
203 
204  // grab the adjacent node
// Indexing devices 0 and 1 assumes the channel joins exactly two
// devices, which is the case for the point-to-point devices kept above.
205  Ptr<Node> remoteNode;
206  if (channel->GetDevice (0) == localNetDevice)
207  {
208  remoteNode = (channel->GetDevice (1))->GetNode ();
209  }
210  else
211  {
212  remoteNode = (channel->GetDevice (0))->GetNode ();
213  }
214 
215  // if it's not remote, don't consider it
216  if (remoteNode->GetSystemId () == MpiInterface::GetSystemId ())
217  {
218  continue;
219  }
220 
221  // compare delay on the channel with current value of
222  // m_lookAhead. if delay on channel is smaller, make
223  // it the new lookAhead.
// NOTE(review): this assumes the channel type exposes a "Delay"
// Time attribute — confirm for any non-standard p2p channel.
224  TimeValue delay;
225  channel->GetAttribute ("Delay", delay);
226 
227  if (delay.Get () < m_lookAhead)
228  {
229  m_lookAhead = delay.Get ();
230  }
231  }
232  }
233  }
234 
235  // m_lookAhead is now set
237 
238  /*
239  * Compute the maximum inter-task latency and use that value
240  * for tasks with no inter-task links.
241  *
242  * Special processing for edge cases. For tasks that have no
243  * nodes need to determine a reasonable lookAhead value. Infinity
244  * would work correctly but introduces a performance issue; tasks
245  * with an infinite lookAhead would execute all their events
246  * before doing an AllGather resulting in very bad load balance
247  * during the first time window. Since all tasks participate in
248  * the AllGather it is desirable to have all the tasks advance in
249  * simulation time at a similar rate assuming roughly equal events
250  * per unit of simulation time in order to equalize the amount of
251  * work per time window.
252  */
// NOTE(review): GetInteger() returns int64_t, but `long` is only 32 bits
// on LLP64 platforms (e.g. 64-bit Windows). Using int64_t with MPI_INT64_T
// would avoid truncating large lookahead values.
253  long sendbuf;
254  long recvbuf;
255 
256  /* Tasks with no inter-task links do not contribute to max */
258  {
259  sendbuf = 0;
260  }
261  else
262  {
263  sendbuf = m_lookAhead.GetInteger ();
264  }
265 
// Collective call: every rank must reach this point.
266  MPI_Allreduce (&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MPI_COMM_WORLD);
267 
268  /* For nodes that did not compute a lookahead use max from ranks
269  * that did compute a value. An edge case occurs if all nodes have
270  * no inter-task links (max will be 0 in this case). Use infinity so all tasks
271  * will proceed without synchronization until a single AllGather
272  * occurs when all tasks have finished.
273  */
274  if (m_lookAhead == GetMaximumSimulationTime () && recvbuf != 0)
275  {
276  m_lookAhead = Time (recvbuf);
278  }
279 
280 #else
281  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
282 #endif
283 }
284 
// Overrides m_lookAhead with a caller-supplied value. Only strictly positive
// values are accepted; anything else is ignored and a warning is logged.
// NOTE(review): the warning text says "negative", but a zero lookahead is
// rejected as well.
285 void
287 {
288  if (lookAhead > Time (0))
289  {
290  NS_LOG_FUNCTION (this << lookAhead);
291  m_lookAhead = lookAhead;
292  }
293  else
294  {
295  NS_LOG_WARN ("attempted to set look ahead negative: " << lookAhead);
296  }
297 }
298 
// Replaces the event list with a scheduler created from schedulerFactory.
// Events already queued in the old scheduler are moved into the new one, so
// switching schedulers mid-setup does not lose events.
299 void
301 {
302  NS_LOG_FUNCTION (this << schedulerFactory);
303 
304  Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler> ();
305 
// m_events is null until the first scheduler is installed.
306  if (m_events != 0)
307  {
308  while (!m_events->IsEmpty ())
309  {
310  Scheduler::Event next = m_events->RemoveNext ();
311  scheduler->Insert (next);
312  }
313  }
314  m_events = scheduler;
315 }
316 
// Removes the earliest pending event, advances the clock to its timestamp,
// invokes it, and then drops the event list's reference to it.
317 void
319 {
320  NS_LOG_FUNCTION (this);
321 
322  Scheduler::Event next = m_events->RemoveNext ();
323 
// Time must never run backwards.
324  NS_ASSERT (next.key.m_ts >= m_currentTs);
326  m_eventCount++;
327 
328  NS_LOG_LOGIC ("handle " << next.key.m_ts);
329  m_currentTs = next.key.m_ts;
// m_currentUid is updated before Invoke() so that IsExpired() already
// reports this event as expired while it is executing.
331  m_currentUid = next.key.m_uid;
332  next.impl->Invoke ();
333  next.impl->Unref ();
334 }
335 
// Returns m_globalFinished, the flag whose value ends Run()'s main loop.
336 bool
338 {
339  return m_globalFinished;
340 }
341 
// True when this rank has no queued events left or Stop() has been called.
342 bool
344 {
345  return m_events->IsEmpty () || m_stop;
346 }
347 
// Returns the timestamp (in time steps) of the next local event, without
// removing it from the event list.
348 uint64_t
350 {
351  // If the local MPI task has no more events or stop was called,
352  // the next event time is infinity.
353  if (IsLocalFinished ())
354  {
356  }
357  else
358  {
359  Scheduler::Event ev = m_events->PeekNext ();
360  return ev.key.m_ts;
361  }
362 }
363 
// NextTs() wrapped as a Time value.
364 Time
366 {
367  return TimeStep (NextTs ());
368 }
369 
// Main conservative-synchronization loop. Local events are processed while
// their timestamp lies within [now, m_grantedTime]. When the next event is
// beyond that window, or this rank has run out of work, all ranks exchange
// LBTS messages with MPI_Allgather. If no messages are in flight
// (total rx == total tx), the window is extended to the global smallest
// time plus m_lookAhead. The loop ends once m_globalFinished is set.
// Without NS3_MPI this is a fatal error.
370 void
372 {
373  NS_LOG_FUNCTION (this);
374 
375 #ifdef NS3_MPI
377  m_stop = false;
378  m_globalFinished = false;
379  while (!m_globalFinished)
380  {
381  Time nextTime = Next ();
382 
383  // If local event is beyond grantedTime then need to synchronize
384  // with other tasks to determine new time window. If local task
385  // is finished then continue to participate in allgather
386  // synchronizations with other tasks until all tasks have
387  // completed.
388  if (nextTime > m_grantedTime || IsLocalFinished () )
389  {
390  // Can't process next event, calculate a new LBTS
391  // First receive any pending messages
393  // reset next time
394  nextTime = Next ();
395  // And check for send completes
397  // Finally calculate the lbts
399  m_myId, IsLocalFinished (), nextTime);
400  m_pLBTS[m_myId] = lMsg;
// Collective call: every rank contributes its LbtsMessage and receives
// all of them, indexed by rank, into m_pLBTS.
401  MPI_Allgather (&lMsg, sizeof (LbtsMessage), MPI_BYTE, m_pLBTS,
402  sizeof (LbtsMessage), MPI_BYTE, MPI_COMM_WORLD);
403  Time smallestTime = m_pLBTS[0].GetSmallestTime ();
404  // The totRx and totTx counts ensure there are no transient
405  // messages; If totRx != totTx, there are transients,
406  // so we don't update the granted time.
407  uint32_t totRx = m_pLBTS[0].GetRxCount ();
408  uint32_t totTx = m_pLBTS[0].GetTxCount ();
410 
411  for (uint32_t i = 1; i < m_systemCount; ++i)
412  {
413  if (m_pLBTS[i].GetSmallestTime () < smallestTime)
414  {
415  smallestTime = m_pLBTS[i].GetSmallestTime ();
416  }
417  totRx += m_pLBTS[i].GetRxCount ();
418  totTx += m_pLBTS[i].GetTxCount ();
420  }
421  if (totRx == totTx)
422  {
423  // If lookahead is infinite then granted time should be as well.
424  // Covers the edge case if all the tasks have no inter tasks
425  // links, prevents overflow of granted time.
427  {
429  }
430  else
431  {
432  // Overflow is possible here if near end of representable time.
433  m_grantedTime = smallestTime + m_lookAhead;
434  }
435  }
436  }
437 
438  // Execute next event if it is within the current time window.
439  // Local task may be completed.
440  if ( (nextTime <= m_grantedTime) && (!IsLocalFinished ()) )
441  { // Safe to process
442  ProcessOneEvent ();
443  }
444  }
445 
446  // If the simulator stopped naturally by lack of events, make a
447  // consistency test to check that we didn't lose any events along the way.
448  NS_ASSERT (!m_events->IsEmpty () || m_unscheduledEvents == 0);
449 #else
450  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
451 #endif
452 }
453 
// Returns m_myId, this simulator's system (MPI rank) id.
455 {
456  return m_myId;
457 }
458 
// Sets m_stop. IsLocalFinished() then reports true, so this rank stops
// processing local events, although Run() keeps joining LBTS rounds until
// m_globalFinished is set.
459 void
461 {
462  NS_LOG_FUNCTION (this);
463 
464  m_stop = true;
465 }
466 
// Stop variant that takes a relative `delay`.
// NOTE(review): presumably it schedules Stop() after `delay`; the scheduling
// statement is not visible in this listing — confirm.
467 void
469 {
470  NS_LOG_FUNCTION (this << delay.GetTimeStep ());
471 
473 }
474 
475 //
476 // Schedule an event for a _relative_ time in the future.
477 //
// The event is tagged with the current context and a fresh uid, inserted
// into the event list, and identified by the returned EventId.
478 EventId
480 {
481  NS_LOG_FUNCTION (this << delay.GetTimeStep () << event);
482 
483  Time tAbsolute = delay + TimeStep (m_currentTs);
484 
// The second assertion rejects a negative delay (an event in the past).
485  NS_ASSERT (tAbsolute.IsPositive ());
486  NS_ASSERT (tAbsolute >= TimeStep (m_currentTs));
487  Scheduler::Event ev;
488  ev.impl = event;
489  ev.key.m_ts = static_cast<uint64_t> (tAbsolute.GetTimeStep ());
490  ev.key.m_context = GetContext ();
491  ev.key.m_uid = m_uid;
492  m_uid++;
494  m_events->Insert (ev);
495  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
496 }
497 
// Schedules `event` at now + delay under an explicit `context`, assigning a
// fresh uid. No EventId is returned.
498 void
499 DistributedSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &delay, EventImpl *event)
500 {
501  NS_LOG_FUNCTION (this << context << delay.GetTimeStep () << m_currentTs << event);
502 
503  Scheduler::Event ev;
504  ev.impl = event;
// NOTE(review): unlike Schedule(), nothing asserts that `delay` is
// non-negative. GetTimeStep() is signed and m_ts is uint64_t, so a negative
// delay would produce a wrapped or past timestamp. Consider adding the same
// NS_ASSERT as Schedule().
505  ev.key.m_ts = m_currentTs + delay.GetTimeStep ();
506  ev.key.m_context = context;
507  ev.key.m_uid = m_uid;
508  m_uid++;
510  m_events->Insert (ev);
511 }
512 
// Schedules `event` at the current time in the current context. The fresh
// uid is larger than the running event's, so the new event runs after it
// within the same timestamp.
513 EventId
515 {
516  NS_LOG_FUNCTION (this << event);
517 
518  Scheduler::Event ev;
519  ev.impl = event;
520  ev.key.m_ts = m_currentTs;
521  ev.key.m_context = GetContext ();
522  ev.key.m_uid = m_uid;
523  m_uid++;
525  m_events->Insert (ev);
526  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
527 }
528 
// Appends `event` to the destroy list, which Destroy() runs in FIFO order.
// Destroy events always carry uid 2 and context 0xffffffff; Remove() and
// IsExpired() use uid 2 to recognise them.
529 EventId
531 {
532  NS_LOG_FUNCTION (this << event);
533 
// NOTE(review): the `false` argument presumably makes the Ptr adopt the
// caller's reference instead of adding one — confirm against ptr.h.
534  EventId id (Ptr<EventImpl> (event, false), m_currentTs, 0xffffffff, 2);
535  m_destroyEvents.push_back (id);
// The uid counter advances even though this event's uid is fixed at 2.
536  m_uid++;
537  return id;
538 }
539 
// Returns the current simulation time (m_currentTs as a Time).
540 Time
542 {
543  return TimeStep (m_currentTs);
544 }
545 
// Returns the time remaining until event `id` fires, or zero if it has
// already run, was cancelled, or was removed (see IsExpired()).
546 Time
548 {
549  if (IsExpired (id))
550  {
551  return TimeStep (0);
552  }
553  else
554  {
555  return TimeStep (id.GetTs () - m_currentTs);
556  }
557 }
558 
// Removes event `id` so it never runs.
// - Destroy events (uid 2) are erased from the destroy list.
// - Other events that have not expired are removed from the event list,
//   marked cancelled, and lose the list's reference.
559 void
561 {
562  if (id.GetUid () == 2)
563  {
564  // destroy events.
565  for (DestroyEvents::iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
566  {
567  if (*i == id)
568  {
// Erase then break immediately, so the invalidated iterator is never used.
569  m_destroyEvents.erase (i);
570  break;
571  }
572  }
573  return;
574  }
575  if (IsExpired (id))
576  {
577  return;
578  }
// Rebuild the scheduler key from the EventId so the scheduler can find it.
579  Scheduler::Event event;
580  event.impl = id.PeekEventImpl ();
581  event.key.m_ts = id.GetTs ();
582  event.key.m_context = id.GetContext ();
583  event.key.m_uid = id.GetUid ();
584  m_events->Remove (event);
585  event.impl->Cancel ();
586  // whenever we remove an event from the event list, we have to unref it.
587  event.impl->Unref ();
588 
590 }
591 
// Lazily cancels event `id`: it stays in the event list, but its cancel bit
// is set, so it will not be invoked. Expired events are left untouched.
592 void
594 {
595  if (!IsExpired (id))
596  {
597  id.PeekEventImpl ()->Cancel ();
598  }
599 }
600 
// Returns true when event `id` will no longer run.
// - Destroy events (uid 2): the event is null, cancelled, or no longer in
//   the destroy list.
// - Other events: the event is null or cancelled, or it is ordered at or
//   before the currently executing event (earlier timestamp, or the same
//   timestamp with uid <= m_currentUid).
601 bool
603 {
604  if (id.GetUid () == 2)
605  {
606  if (id.PeekEventImpl () == 0
607  || id.PeekEventImpl ()->IsCancelled ())
608  {
609  return true;
610  }
611  // destroy events.
612  for (DestroyEvents::const_iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
613  {
614  if (*i == id)
615  {
616  return false;
617  }
618  }
619  return true;
620  }
621  if (id.PeekEventImpl () == 0
622  || id.GetTs () < m_currentTs
623  || (id.GetTs () == m_currentTs
624  && id.GetUid () <= m_currentUid)
625  || id.PeekEventImpl ()->IsCancelled ())
626  {
627  return true;
628  }
629  else
630  {
631  return false;
632  }
633 }
634 
// Returns the largest time step representable as a signed 64-bit value
// (INT64_MAX). CalculateLookAhead() and Run() treat it as "infinite" time.
635 Time
637 {
640  return TimeStep (0x7fffffffffffffffLL);
641 }
642 
// Returns the context of the currently executing event (m_currentContext).
643 uint32_t
645 {
646  return m_currentContext;
647 }
648 
// Returns the number of events executed so far (ProcessOneEvent()
// increments this counter).
649 uint64_t
651 {
652  return m_eventCount;
653 }
654 
655 } // namespace ns3
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:102
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:73
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by "...
virtual void SetScheduler(ObjectFactory schedulerFactory)
Set the Scheduler to be used to manage the event list.
virtual uint32_t GetContext(void) const
Get the current simulation context.
void Unref(void) const
Decrement the reference count.
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:45
int64_t GetInteger(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:399
std::vector< Ptr< Node > >::const_iterator Iterator
Node container iterator.
uint64_t m_ts
Event time stamp.
Definition: scheduler.h:81
virtual EventId Schedule(Time const &delay, EventImpl *event)
Schedule a future event execution (in the same context).
EventImpl * impl
Pointer to the event implementation.
Definition: scheduler.h:94
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file...
Definition: assert.h:67
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:204
virtual EventId ScheduleDestroy(EventImpl *event)
Schedule an event to run at the end of the simulation, after the Stop() time or condition has been re...
#define NS_UNUSED(x)
Mark a local variable as unused.
Definition: unused.h:36
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:162
void(* Time)(Time oldValue, Time newValue)
TracedValue callback signature for Time.
Definition: nstime.h:743
virtual void DoDispose(void)
Destructor implementation.
Definition: object.cc:346
virtual void SetMaximumLookAhead(const Time lookAhead)
Iterator End(void) const
Get an iterator which indicates past-the-last Node in the container.
uint64_t m_eventCount
The event count.
virtual Time GetDelayLeft(const EventId &id) const
Get the remaining time until this event will execute.
virtual void DoDispose(void)
Destructor implementation.
channel
Definition: third.py:85
void Invoke(void)
Called by the simulation engine to notify the event that it is time to execute.
Definition: event-impl.cc:46
virtual Time GetMaximumSimulationTime(void) const
Get the maximum representable simulation time.
static void TestSendComplete()
Check for completed sends.
static EventId Schedule(Time const &delay, MEM mem_ptr, OBJ obj)
Schedule an event to expire after delay.
Definition: simulator.h:1389
EventKey key
Key for sorting and ordering Events.
Definition: scheduler.h:95
virtual bool IsFinished(void) const
Check if the simulation should finish.
static void Destroy()
Deletes storage used by the parallel environment.
AttributeValue implementation for Time.
Definition: nstime.h:1124
Ptr< Object > Create(void) const
Create an Object instance of the configured TypeId.
uint32_t m_uid
Event unique id.
Definition: scheduler.h:82
virtual void Cancel(const EventId &id)
Set the cancel bit on this event: the event's associated function will not be invoked when it expires...
virtual EventId ScheduleNow(EventImpl *event)
Schedule an event to run at the current virtual time.
virtual uint64_t GetEventCount(void) const
Get the number of events executed.
Maintain the event list.
Definition: scheduler.h:66
virtual bool IsExpired(const EventId &id) const
Check if an event has already run or been cancelled.
Scheduler event.
Definition: scheduler.h:92
Distributed simulator implementation using lookahead.
Every class exported by the ns3 library is enclosed in the ns3 namespace.
keep track of a set of node pointers.
virtual uint32_t GetSystemId(void) const
Get the system id of this simulator.
virtual void Destroy()
Execute the events scheduled with ScheduleDestroy().
uint32_t GetSystemId(void) const
Definition: node.cc:121
Time TimeStep(uint64_t ts)
Definition: nstime.h:1119
NS_LOG_LOGIC("Net device "<< nd<< " is not bridged")
Structure used for all-reduce LBTS computation.
static NodeContainer GetGlobal(void)
Create a NodeContainer that contains a list of all nodes created through NodeContainer::Create() and ...
virtual void Remove(const EventId &id)
Remove an event from the event list.
Instantiate subclasses of ns3::Object.
A simulation event.
Definition: event-impl.h:44
static void ReceiveMessages()
Check for received messages complete.
An identifier for simulation events.
Definition: event-id.h:53
static uint32_t GetSystemId()
#define NS_LOG_WARN(msg)
Use NS_LOG to output a message of level LOG_WARN.
Definition: log.h:264
static void Stop(void)
Tell the Simulator the calling event should be the last one executed.
Definition: simulator.cc:178
Time Get(void) const
Definition: time.cc:443
Time Seconds(double value)
Construct a Time in the indicated unit.
Definition: nstime.h:1062
Flag for events not associated with any particular context.
Definition: simulator.h:198
virtual void ScheduleWithContext(uint32_t context, Time const &delay, EventImpl *event)
Schedule a future event execution (in a different context).
virtual Time Now(void) const
Return the current simulation virtual time.
virtual void Run(void)
Run the simulation.
virtual void Stop(void)
Tell the Simulator the calling event should be the last one executed.
a unique identifier for an interface.
Definition: type-id.h:58
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:915
static uint32_t GetSize()
uint32_t m_context
Event context.
Definition: scheduler.h:83
Iterator Begin(void) const
Get an iterator which refers to the first Node in the container.
The SimulatorImpl base class.
int64_t GetTimeStep(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:391