A Discrete-Event Network Simulator
API
distributed-simulator-impl.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  *
18  */
19 
22 #include "mpi-interface.h"
23 
24 #include "ns3/simulator.h"
25 #include "ns3/scheduler.h"
26 #include "ns3/event-impl.h"
27 #include "ns3/channel.h"
28 #include "ns3/node-container.h"
29 #include "ns3/ptr.h"
30 #include "ns3/pointer.h"
31 #include "ns3/assert.h"
32 #include "ns3/log.h"
33 
34 #include <cmath>
35 
36 #ifdef NS3_MPI
37 #include <mpi.h>
38 #endif
39 
40 namespace ns3 {
41 
42 NS_LOG_COMPONENT_DEFINE ("DistributedSimulatorImpl");
43 
44 NS_OBJECT_ENSURE_REGISTERED (DistributedSimulatorImpl);
45 
// Intentionally empty function body.
// NOTE(review): the declarator line (listing line 46) is absent from this
// listing, so the owning function cannot be read here; presumably the
// destructor of the LBTS message structure -- confirm against the real file.
47 {
48 }
49 
// Accessor: returns the stored m_smallestTime value as a Time.
// NOTE(review): the declarator line (listing line 51) is absent; presumably
// LbtsMessage::GetSmallestTime, which is called on m_pLBTS entries later in
// this file -- confirm.
50 Time
52 {
53  return m_smallestTime;
54 }
55 
// Accessor: returns the stored m_txCount value.
// NOTE(review): the declarator line (listing line 57) is absent; presumably
// LbtsMessage::GetTxCount, which is summed over m_pLBTS entries later in this
// file -- confirm.
56 uint32_t
58 {
59  return m_txCount;
60 }
61 
// Accessor: returns the stored m_rxCount value.
// NOTE(review): the declarator line (listing line 63) is absent; presumably
// LbtsMessage::GetRxCount, which is summed over m_pLBTS entries later in this
// file -- confirm.
62 uint32_t
64 {
65  return m_rxCount;
66 }
// Accessor: returns the stored m_myId value.
// NOTE(review): the declarator line (listing line 68) is absent, so the
// owning class and function name are not visible here -- confirm.
67 uint32_t
69 {
70  return m_myId;
71 }
72 
// Accessor: returns the stored m_isFinished flag.
// NOTE(review): the declarator line (listing line 74) is absent, so the
// owning class and function name are not visible here -- confirm.
73 bool
75 {
76  return m_isFinished;
77 }
78 
80 
// Returns the TypeId named "ns3::DistributedSimulatorImpl". The TypeId is held
// in a function-local static, is given Object as its parent, and has a
// constructor for DistributedSimulatorImpl registered on it.
// NOTE(review): the declarator line (listing line 82) is absent; presumably
// DistributedSimulatorImpl::GetTypeId -- confirm.
81 TypeId
83 {
84  static TypeId tid = TypeId ("ns3::DistributedSimulatorImpl")
85  .SetParent<Object> ()
86  .AddConstructor<DistributedSimulatorImpl> ()
87  ;
88  return tid;
89 }
90 
// Sets the initial simulator state. When NS3_MPI is defined the granted time
// starts at zero seconds; otherwise NS_FATAL_ERROR is raised. After that the
// stop/finished flags are cleared, the uid counter starts at 4, the current
// uid and timestamp are zero, the current context is 0xffffffff and no
// scheduler is installed (m_events = 0).
// NOTE(review): listing lines 91, 96-97, 100, 103 and 118 are absent (the
// declarator and several statements, including whatever the "Allocate the
// LBTS message buffer" comment refers to). Presumably this is the
// DistributedSimulatorImpl constructor -- confirm against the real file.
92 {
93  NS_LOG_FUNCTION (this);
94 
95 #ifdef NS3_MPI
98 
99  // Allocate the LBTS message buffer
101  m_grantedTime = Seconds (0);
102 #else
104  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
105 #endif
106 
107  m_stop = false;
108  m_globalFinished = false;
109  // uids are allocated from 4.
110  // uid 0 is "invalid" events
111  // uid 1 is "now" events
112  // uid 2 is "destroy" events
113  m_uid = 4;
114  // before ::Run is entered, the m_currentUid will be zero
115  m_currentUid = 0;
116  m_currentTs = 0;
117  m_currentContext = 0xffffffff;
    // No scheduler yet; m_events stays null until one is installed.
119  m_events = 0;
120 }
121 
// Only logs the call; no cleanup is performed in this body.
// NOTE(review): the declarator line (listing line 122) is absent; presumably
// the DistributedSimulatorImpl destructor -- confirm.
123 {
124  NS_LOG_FUNCTION (this);
125 }
126 
// Releases the event list: every event still queued is removed and its
// implementation Unref'ed, then m_events is reset to 0 and the m_pLBTS array
// is deleted.
// NOTE(review): the declarator line (listing line 128) and listing line 139
// are absent; presumably DistributedSimulatorImpl::DoDispose -- confirm.
// NOTE(review): m_events is dereferenced without a null check, while the
// constructor leaves it at 0 until a scheduler is installed -- confirm that a
// scheduler is always set before disposal.
127 void
129 {
130  NS_LOG_FUNCTION (this);
131 
132  while (!m_events->IsEmpty ())
133  {
134  Scheduler::Event next = m_events->RemoveNext ();
    // Drop the reference held on behalf of the event list.
135  next.impl->Unref ();
136  }
137  m_events = 0;
138  delete [] m_pLBTS;
140 }
141 
// Drains m_destroyEvents front to back, invoking each event that has not been
// cancelled.
// NOTE(review): the declarator line (listing line 143) and listing line 158
// (a statement after the loop) are absent; presumably
// DistributedSimulatorImpl::Destroy -- confirm.
142 void
144 {
145  NS_LOG_FUNCTION (this);
146 
147  while (!m_destroyEvents.empty ())
148  {
149  Ptr<EventImpl> ev = m_destroyEvents.front ().PeekEventImpl ();
    // Pop before invoking, so the entry is gone by the time the handler runs.
150  m_destroyEvents.pop_front ();
151  NS_LOG_LOGIC ("handle destroy " << ev);
152  if (!ev->IsCancelled ())
153  {
154  ev->Invoke ();
155  }
156  }
157 
159 }
160 
161 
// Determines m_lookAhead for this MPI rank (fatal error when NS3_MPI is not
// defined).
//  - With at most one MPI rank the lookahead is zero seconds.
//  - Otherwise every node whose system id matches this rank is scanned. For
//    each point-to-point device with a channel whose other endpoint belongs to
//    a different system id, the channel's "Delay" attribute replaces
//    m_lookAhead when it is smaller.
//  - An MPI_Allreduce with MPI_MAX then combines one value per rank, and a
//    rank whose lookahead still equals GetMaximumSimulationTime() adopts the
//    reduced maximum when that maximum is non-zero.
// NOTE(review): listing lines 163, 176, 180, 234, 255 and 275 are absent (the
// declarator, the body of the "== Seconds (-1)" branch, the declaration of
// `c`, the condition guarding "sendbuf = 0", and two further statements).
// Presumably DistributedSimulatorImpl::CalculateLookAhead -- confirm.
// NOTE(review): sendbuf/recvbuf are `long` and reduced as MPI_LONG, while
// GetInteger() returns int64_t; where `long` is 32 bits the value would be
// narrowed -- confirm whether that platform matters.
162 void
164 {
165  NS_LOG_FUNCTION (this);
166 
167 #ifdef NS3_MPI
168  if (MpiInterface::GetSize () <= 1)
169  {
170  m_lookAhead = Seconds (0);
171  }
172  else
173  {
174  if (m_lookAhead == Seconds (-1))
175  {
177  }
178  // else it was already set by SetLookAhead
179 
181  for (NodeContainer::Iterator iter = c.Begin (); iter != c.End (); ++iter)
182  {
    // Skip nodes that belong to another rank.
183  if ((*iter)->GetSystemId () != MpiInterface::GetSystemId ())
184  {
185  continue;
186  }
187 
188  for (uint32_t i = 0; i < (*iter)->GetNDevices (); ++i)
189  {
190  Ptr<NetDevice> localNetDevice = (*iter)->GetDevice (i);
191  // only works for p2p links currently
192  if (!localNetDevice->IsPointToPoint ())
193  {
194  continue;
195  }
196  Ptr<Channel> channel = localNetDevice->GetChannel ();
197  if (channel == 0)
198  {
199  continue;
200  }
201 
202  // grab the adjacent node
    // The channel is treated as having exactly two devices (index 0 and 1);
    // the remote node is the owner of whichever device is not the local one.
203  Ptr<Node> remoteNode;
204  if (channel->GetDevice (0) == localNetDevice)
205  {
206  remoteNode = (channel->GetDevice (1))->GetNode ();
207  }
208  else
209  {
210  remoteNode = (channel->GetDevice (0))->GetNode ();
211  }
212 
213  // if it's not remote, don't consider it
214  if (remoteNode->GetSystemId () == MpiInterface::GetSystemId ())
215  {
216  continue;
217  }
218 
219  // compare delay on the channel with current value of
220  // m_lookAhead. if delay on channel is smaller, make
221  // it the new lookAhead.
222  TimeValue delay;
223  channel->GetAttribute ("Delay", delay);
224 
225  if (delay.Get () < m_lookAhead)
226  {
227  m_lookAhead = delay.Get ();
228  }
229  }
230  }
231  }
232 
233  // m_lookAhead is now set
235 
236  /*
237  * Compute the maximum inter-task latency and use that value
238  * for tasks with no inter-task links.
239  *
240  * Special processing for edge cases. For tasks that have no
241  * nodes need to determine a reasonable lookAhead value. Infinity
242  * would work correctly but introduces a performance issue; tasks
243  * with an infinite lookAhead would execute all their events
244  * before doing an AllGather resulting in very bad load balance
245  * during the first time window. Since all tasks participate in
246  * the AllGather it is desirable to have all the tasks advance in
247  * simulation time at a similar rate assuming roughly equal events
248  * per unit of simulation time in order to equalize the amount of
249  * work per time window.
250  */
251  long sendbuf;
252  long recvbuf;
253 
254  /* Tasks with no inter-task links do not contribute to max */
256  {
257  sendbuf = 0;
258  }
259  else
260  {
261  sendbuf = m_lookAhead.GetInteger ();
262  }
263 
    // Collective call: every rank in MPI_COMM_WORLD receives the maximum in
    // recvbuf.
264  MPI_Allreduce (&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MPI_COMM_WORLD);
265 
266  /* For nodes that did not compute a lookahead use max from ranks
267  * that did compute a value. An edge case occurs if all nodes have
268  * no inter-task links (max will be 0 in this case). Use infinity so all tasks
269  * will proceed without synchronization until a single AllGather
270  * occurs when all tasks have finished.
271  */
272  if (m_lookAhead == GetMaximumSimulationTime () && recvbuf != 0)
273  {
274  m_lookAhead = Time (recvbuf);
276  }
277 
278 #else
279  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
280 #endif
281 }
282 
// Stores `lookAhead` in m_lookAhead when it is strictly greater than zero;
// otherwise leaves m_lookAhead unchanged and emits a warning.
// NOTE(review): the declarator line (listing line 284) is absent; presumably
// DistributedSimulatorImpl::SetMaximumLookAhead (const Time lookAhead) --
// confirm.
// NOTE(review): a zero value is also rejected, although the warning text only
// mentions "negative" -- confirm whether zero is meant to be refused.
283 void
285 {
286  if (lookAhead > 0)
287  {
288  NS_LOG_FUNCTION (this << lookAhead);
289  m_lookAhead = lookAhead;
290  }
291  else
292  {
293  NS_LOG_WARN ("attempted to set look ahead negative: " << lookAhead);
294  }
295 }
296 
// Creates a new Scheduler from `schedulerFactory` and makes it the event list.
// If a scheduler was already installed, every event still queued in it is
// moved into the new one first, so pending events are carried over.
// NOTE(review): the declarator line (listing line 298) is absent; presumably
// DistributedSimulatorImpl::SetScheduler (ObjectFactory schedulerFactory) --
// confirm.
297 void
299 {
300  NS_LOG_FUNCTION (this << schedulerFactory);
301 
302  Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler> ();
303 
304  if (m_events != 0)
305  {
306  while (!m_events->IsEmpty ())
307  {
308  Scheduler::Event next = m_events->RemoveNext ();
309  scheduler->Insert (next);
310  }
311  }
312  m_events = scheduler;
313 }
314 
// Removes the next event from m_events, advances the current timestamp and
// uid to that event's key, invokes it and then drops the list's reference.
// The assertion checks that the timestamp never moves backwards.
// NOTE(review): the declarator line (listing line 316) and listing lines 323
// and 327 are absent; presumably DistributedSimulatorImpl::ProcessOneEvent,
// which the run loop calls -- confirm, and check what the missing statements
// update.
315 void
317 {
318  NS_LOG_FUNCTION (this);
319 
320  Scheduler::Event next = m_events->RemoveNext ();
321 
322  NS_ASSERT (next.key.m_ts >= m_currentTs);
324 
325  NS_LOG_LOGIC ("handle " << next.key.m_ts);
326  m_currentTs = next.key.m_ts;
328  m_currentUid = next.key.m_uid;
329  next.impl->Invoke ();
330  next.impl->Unref ();
331 }
332 
// Returns m_globalFinished, the flag the run loop waits on.
// NOTE(review): the declarator line (listing line 334) is absent; presumably
// DistributedSimulatorImpl::IsFinished -- confirm.
333 bool
335 {
336  return m_globalFinished;
337 }
338 
// Returns true when the local event list is empty or m_stop has been set.
// NOTE(review): the declarator line (listing line 340) is absent; presumably
// DistributedSimulatorImpl::IsLocalFinished, which is called elsewhere in this
// file -- confirm.
339 bool
341 {
342  return m_events->IsEmpty () || m_stop;
343 }
344 
// Returns the timestamp of the next local event without removing it from the
// event list (PeekNext).
// NOTE(review): the declarator line (listing line 346) and listing line 352
// (the statement inside the "local finished" branch) are absent; going by the
// comment below, that branch presumably returns the maximum timestamp.
// Presumably DistributedSimulatorImpl::NextTs -- confirm.
345 uint64_t
347 {
348  // If the local MPI task has no more events or stop was called,
349  // the next event time is infinity.
350  if (IsLocalFinished ())
351  {
353  }
354  else
355  {
356  Scheduler::Event ev = m_events->PeekNext ();
357  return ev.key.m_ts;
358  }
359 }
360 
// Returns the value of NextTs () converted to a Time via TimeStep.
// NOTE(review): the declarator line (listing line 362) is absent; presumably
// DistributedSimulatorImpl::Next, which the run loop calls -- confirm.
361 Time
363 {
364  return TimeStep (NextTs ());
365 }
366 
// Main simulation loop (fatal error when NS3_MPI is not defined).
// Loops until m_globalFinished is set. On each pass:
//  - if the next local event lies beyond m_grantedTime, or the local task is
//    finished, an MPI_Allgather exchanges one LbtsMessage per rank. The
//    smallest reported time and the total Rx/Tx counts are computed, and the
//    granted time is updated only when the two totals match;
//  - if the next local event is within m_grantedTime and the local task is
//    not finished, exactly one event is processed.
// NOTE(review): listing lines 368, 373, 388, 392, 394, 405, 415, 422 and 424
// are absent (the declarator, the calls the "receive pending messages" and
// "check for send completes" comments refer to, the start of the lMsg
// construction, two statements in the reduction, and the infinite-lookahead
// branch). Nothing visible here sets m_globalFinished, so that update is
// presumably on one of the missing lines -- confirm against the real file.
367 void
369 {
370  NS_LOG_FUNCTION (this);
371 
372 #ifdef NS3_MPI
374  m_stop = false;
375  while (!m_globalFinished)
376  {
377  Time nextTime = Next ();
378 
379  // If local event is beyond grantedTime then need to synchronize
380  // with other tasks to determine new time window. If local task
381  // is finished then continue to participate in allgather
382  // synchronizations with other tasks until all tasks have
383  // completed.
384  if (nextTime > m_grantedTime || IsLocalFinished () )
385  {
386  // Can't process next event, calculate a new LBTS
387  // First receive any pending messages
389  // reset next time
390  nextTime = Next ();
391  // And check for send completes
393  // Finally calculate the lbts
395  m_myId, IsLocalFinished (), nextTime);
396  m_pLBTS[m_myId] = lMsg;
    // Collective call: each rank contributes its LbtsMessage as raw bytes and
    // receives every rank's message in m_pLBTS.
397  MPI_Allgather (&lMsg, sizeof (LbtsMessage), MPI_BYTE, m_pLBTS,
398  sizeof (LbtsMessage), MPI_BYTE, MPI_COMM_WORLD);
399  Time smallestTime = m_pLBTS[0].GetSmallestTime ();
400  // The totRx and totTx counts ensure there are no transient
401  // messages; If totRx != totTx, there are transients,
402  // so we don't update the granted time.
403  uint32_t totRx = m_pLBTS[0].GetRxCount ();
404  uint32_t totTx = m_pLBTS[0].GetTxCount ();
406 
    // Entry 0 seeded the running values above, so the scan starts at 1.
407  for (uint32_t i = 1; i < m_systemCount; ++i)
408  {
409  if (m_pLBTS[i].GetSmallestTime () < smallestTime)
410  {
411  smallestTime = m_pLBTS[i].GetSmallestTime ();
412  }
413  totRx += m_pLBTS[i].GetRxCount ();
414  totTx += m_pLBTS[i].GetTxCount ();
416  }
417  if (totRx == totTx)
418  {
419  // If lookahead is infinite then granted time should be as well.
420  // Covers the edge case if all the tasks have no inter tasks
421  // links, prevents overflow of granted time.
423  {
425  }
426  else
427  {
428  // Overflow is possible here if near end of representable time.
429  m_grantedTime = smallestTime + m_lookAhead;
430  }
431  }
432  }
433 
434  // Execute next event if it is within the current time window.
435  // Local task may be completed.
436  if ( (nextTime <= m_grantedTime) && (!IsLocalFinished ()) )
437  { // Safe to process
438  ProcessOneEvent ();
439  }
440  }
441 
442  // If the simulator stopped naturally by lack of events, make a
443  // consistency test to check that we didn't lose any events along the way.
444  NS_ASSERT (!m_events->IsEmpty () || m_unscheduledEvents == 0);
445 #else
446  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
447 #endif
448 }
449 
// Returns the stored m_myId value.
// NOTE(review): the return-type and declarator lines (listing lines 449-450)
// are absent; presumably DistributedSimulatorImpl::GetSystemId -- confirm.
451 {
452  return m_myId;
453 }
454 
// Requests a stop by setting m_stop. IsLocalFinished reads this flag, so the
// local task reports itself finished from then on.
// NOTE(review): the declarator line (listing line 456) is absent; presumably
// the no-argument DistributedSimulatorImpl::Stop -- confirm.
455 void
457 {
458  NS_LOG_FUNCTION (this);
459 
460  m_stop = true;
461 }
462 
// Logs the requested time step; the visible part of the body does nothing
// else.
// NOTE(review): the declarator line (listing line 464) and listing line 468
// (the only non-logging statement) are absent; presumably the
// DistributedSimulatorImpl::Stop overload that takes a Time and schedules the
// stop -- confirm against the real file.
463 void
465 {
466  NS_LOG_FUNCTION (this << time.GetTimeStep ());
467 
469 }
470 
471 //
472 // Schedule an event for a _relative_ time in the future.
473 //
// The absolute time is `time` plus the current timestamp; it is asserted to
// be positive and not earlier than the current timestamp. The event is keyed
// with that time, the current context (GetContext ()) and the next uid from
// m_uid, inserted into m_events, and an EventId with the same key is
// returned.
// NOTE(review): the declarator line (listing line 475) and listing line 489
// (a statement between the uid increment and the Insert call) are absent;
// presumably DistributedSimulatorImpl::Schedule (Time const &time,
// EventImpl *event) -- confirm.
474 EventId
476 {
477  NS_LOG_FUNCTION (this << time.GetTimeStep () << event);
478 
479  Time tAbsolute = time + TimeStep (m_currentTs);
480 
481  NS_ASSERT (tAbsolute.IsPositive ());
482  NS_ASSERT (tAbsolute >= TimeStep (m_currentTs));
483  Scheduler::Event ev;
484  ev.impl = event;
485  ev.key.m_ts = static_cast<uint64_t> (tAbsolute.GetTimeStep ());
486  ev.key.m_context = GetContext ();
487  ev.key.m_uid = m_uid;
488  m_uid++;
490  m_events->Insert (ev);
491  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
492 }
493 
// Schedules `event` to run `time` after the current timestamp, keyed with the
// caller-supplied `context` instead of the current context. The event takes
// the next uid from m_uid and is inserted into m_events; nothing is returned.
// NOTE(review): listing line 505 (a statement between the uid increment and
// the Insert call) is absent from this listing -- confirm against the real
// file.
// NOTE(review): time.GetTimeStep () is a signed value and, unlike Schedule,
// no assertion here rejects a negative delay -- confirm callers never pass
// one.
494 void
495 DistributedSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &time, EventImpl *event)
496 {
497  NS_LOG_FUNCTION (this << context << time.GetTimeStep () << m_currentTs << event);
498 
499  Scheduler::Event ev;
500  ev.impl = event;
501  ev.key.m_ts = m_currentTs + time.GetTimeStep ();
502  ev.key.m_context = context;
503  ev.key.m_uid = m_uid;
504  m_uid++;
506  m_events->Insert (ev);
507 }
508 
// Schedules `event` at the current timestamp (m_currentTs) in the current
// context, with the next uid from m_uid, and returns an EventId carrying the
// same key.
// NOTE(review): the declarator line (listing line 510) and listing line 520
// (a statement between the uid increment and the Insert call) are absent;
// presumably DistributedSimulatorImpl::ScheduleNow (EventImpl *event) --
// confirm.
509 EventId
511 {
512  NS_LOG_FUNCTION (this << event);
513 
514  Scheduler::Event ev;
515  ev.impl = event;
516  ev.key.m_ts = m_currentTs;
517  ev.key.m_context = GetContext ();
518  ev.key.m_uid = m_uid;
519  m_uid++;
521  m_events->Insert (ev);
522  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
523 }
524 
// Registers `event` as a destroy event: builds an EventId with the current
// timestamp, context 0xffffffff and the fixed uid 2, appends it to
// m_destroyEvents and returns it. Destroy events never enter m_events.
// m_uid is still incremented even though the EventId does not use it.
// NOTE(review): the declarator line (listing line 526) is absent; presumably
// DistributedSimulatorImpl::ScheduleDestroy (EventImpl *event) -- confirm.
// NOTE(review): the `false` passed to the Ptr constructor presumably controls
// whether an extra reference is taken -- confirm in ptr.h.
525 EventId
527 {
528  NS_LOG_FUNCTION (this << event);
529 
530  EventId id (Ptr<EventImpl> (event, false), m_currentTs, 0xffffffff, 2);
531  m_destroyEvents.push_back (id);
532  m_uid++;
533  return id;
534 }
535 
// Returns the current timestamp (m_currentTs) converted to a Time.
// NOTE(review): the declarator line (listing line 537) is absent; presumably
// DistributedSimulatorImpl::Now -- confirm.
536 Time
538 {
539  return TimeStep (m_currentTs);
540 }
541 
// Returns the time remaining until event `id` fires: a zero time step when
// the event is expired, otherwise its timestamp minus the current timestamp.
// NOTE(review): the declarator line (listing line 543) is absent; presumably
// DistributedSimulatorImpl::GetDelayLeft (const EventId &id) -- confirm.
542 Time
544 {
545  if (IsExpired (id))
546  {
547  return TimeStep (0);
548  }
549  else
550  {
551  return TimeStep (id.GetTs () - m_currentTs);
552  }
553 }
554 
// Removes the event identified by `id`.
//  - uid 2 marks a destroy event: the first matching entry in m_destroyEvents
//    is erased and the function returns.
//  - An expired event is left alone.
//  - Otherwise the key is rebuilt from `id`, the event is removed from
//    m_events, marked cancelled and Unref'ed.
// NOTE(review): the declarator line (listing line 556) and listing line 585
// (a statement after the Unref) are absent; presumably
// DistributedSimulatorImpl::Remove (const EventId &id) -- confirm.
555 void
557 {
558  if (id.GetUid () == 2)
559  {
560  // destroy events.
561  for (DestroyEvents::iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
562  {
563  if (*i == id)
564  {
    // The loop exits right after the erase, so `i` is not reused.
565  m_destroyEvents.erase (i);
566  break;
567  }
568  }
569  return;
570  }
571  if (IsExpired (id))
572  {
573  return;
574  }
575  Scheduler::Event event;
576  event.impl = id.PeekEventImpl ();
577  event.key.m_ts = id.GetTs ();
578  event.key.m_context = id.GetContext ();
579  event.key.m_uid = id.GetUid ();
580  m_events->Remove (event);
581  event.impl->Cancel ();
582  // whenever we remove an event from the event list, we have to unref it.
583  event.impl->Unref ();
584 
586 }
587 
// Marks the event behind `id` as cancelled unless it is already expired. The
// event is not removed from any list here.
// NOTE(review): the declarator line (listing line 589) is absent; presumably
// DistributedSimulatorImpl::Cancel (const EventId &id) -- confirm.
588 void
590 {
591  if (!IsExpired (id))
592  {
593  id.PeekEventImpl ()->Cancel ();
594  }
595 }
596 
// Reports whether the event identified by `id` can no longer run.
//  - Destroy events (uid 2) are expired when they have no implementation, are
//    cancelled, or are no longer present in m_destroyEvents.
//  - Any other event is expired when it has no implementation, its timestamp
//    is before the current one, it shares the current timestamp with a uid
//    not greater than the current uid, or it is cancelled.
// NOTE(review): the declarator line (listing line 598) is absent; presumably
// DistributedSimulatorImpl::IsExpired (const EventId &id) const -- confirm.
597 bool
599 {
600  if (id.GetUid () == 2)
601  {
602  if (id.PeekEventImpl () == 0
603  || id.PeekEventImpl ()->IsCancelled ())
604  {
605  return true;
606  }
607  // destroy events.
608  for (DestroyEvents::const_iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
609  {
610  if (*i == id)
611  {
612  return false;
613  }
614  }
615  return true;
616  }
617  if (id.PeekEventImpl () == 0
618  || id.GetTs () < m_currentTs
619  || (id.GetTs () == m_currentTs
620  && id.GetUid () <= m_currentUid)
621  || id.PeekEventImpl ()->IsCancelled ())
622  {
623  return true;
624  }
625  else
626  {
627  return false;
628  }
629 }
630 
// Returns the time step 0x7fffffffffffffff, the largest positive value of a
// signed 64-bit integer, as a Time.
// NOTE(review): the declarator line (listing line 632) and listing lines
// 634-635 are absent; presumably
// DistributedSimulatorImpl::GetMaximumSimulationTime -- confirm.
631 Time
633 {
636  return TimeStep (0x7fffffffffffffffLL);
637 }
638 
// Returns the stored m_currentContext value.
// NOTE(review): the declarator line (listing line 640) is absent; presumably
// DistributedSimulatorImpl::GetContext, which the scheduling functions call --
// confirm.
639 uint32_t
641 {
642  return m_currentContext;
643 }
644 
645 } // namespace ns3
Time Get(void) const
Definition: time.cc:436
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:95
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:73
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by "...
virtual void SetScheduler(ObjectFactory schedulerFactory)
Set the Scheduler to be used to manage the event list.
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:44
std::vector< Ptr< Node > >::const_iterator Iterator
Node container iterator.
uint64_t m_ts
Event time stamp.
Definition: scheduler.h:77
EventImpl * impl
Pointer to the event implementation.
Definition: scheduler.h:90
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file...
Definition: assert.h:61
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:201
virtual EventId ScheduleDestroy(EventImpl *event)
Schedule an event to run at the end of the simulation, after the Stop() time or condition has been re...
virtual Time GetMaximumSimulationTime(void) const
Get the maximum representable simulation time.
Iterator End(void) const
Get an iterator which indicates past-the-last Node in the container.
virtual void DoDispose(void)
Destructor implementation.
Definition: object.cc:338
#define NS_FATAL_ERROR(msg)
Fatal error handling.
Definition: fatal-error.h:100
virtual void SetMaximumLookAhead(const Time lookAhead)
static EventId Schedule(Time const &time, MEM mem_ptr, OBJ obj)
Schedule an event to expire at the relative time "time" is reached.
Definition: simulator.h:819
uint32_t GetSystemId(void) const
Definition: node.cc:113
virtual Time Now(void) const
Return the current simulation virtual time.
virtual void DoDispose(void)
Destructor implementation.
void Invoke(void)
Called by the simulation engine to notify the event that it is time to execute.
Definition: event-impl.cc:46
static void TestSendComplete()
Check for completed sends.
virtual Time GetDelayLeft(const EventId &id) const
Get the remaining time until this event will execute.
EventKey key
Key for sorting and ordering Events.
Definition: scheduler.h:91
static void Destroy()
Deletes storage used by the parallel environment.
AttributeValue implementation for Time.
Definition: nstime.h:921
Ptr< Object > Create(void) const
Create an Object instance of the configured TypeId.
uint32_t m_uid
Event unique id.
Definition: scheduler.h:78
virtual void Cancel(const EventId &id)
Set the cancel bit on this event: the event's associated function will not be invoked when it expires...
virtual EventId ScheduleNow(EventImpl *event)
Schedule an event to run at the current virtual time.
void Unref(void) const
Decrement the reference count.
Maintain the event list.
Definition: scheduler.h:66
#define NS_LOG_LOGIC(msg)
Use NS_LOG to output a message of level LOG_LOGIC.
Definition: log.h:252
virtual bool IsExpired(const EventId &id) const
Check if an event has already run or been cancelled.
Scheduler event.
Definition: scheduler.h:88
Every class exported by the ns3 library is enclosed in the ns3 namespace.
keep track of a set of node pointers.
virtual void Destroy()
Execute the events scheduled with ScheduleDestroy().
Iterator Begin(void) const
Get an iterator which refers to the first Node in the container.
virtual void ScheduleWithContext(uint32_t context, Time const &time, EventImpl *event)
Schedule a future event execution (in a different context).
int64_t GetTimeStep(void) const
Definition: nstime.h:357
Time TimeStep(uint64_t ts)
Definition: nstime.h:916
int64_t GetInteger(void) const
Definition: nstime.h:365
virtual uint32_t GetSystemId(void) const
Get the system id of this simulator.
virtual uint32_t GetContext(void) const
Get the current simulation context.
Structure used for all-reduce LBTS computation.
static NodeContainer GetGlobal(void)
Create a NodeContainer that contains a list of all nodes created through NodeContainer::Create() and ...
virtual void Remove(const EventId &id)
Remove an event from the event list.
Instantiate subclasses of ns3::Object.
A simulation event.
Definition: event-impl.h:44
virtual EventId Schedule(Time const &time, EventImpl *event)
Schedule a future event execution (in the same context).
static void ReceiveMessages()
Check for received messages complete.
An identifier for simulation events.
Definition: event-id.h:53
static uint32_t GetSystemId()
#define NS_LOG_WARN(msg)
Use NS_LOG to output a message of level LOG_WARN.
Definition: log.h:228
static void Stop(void)
Tell the Simulator the calling event should be the last one executed.
Definition: simulator.cc:208
Time Seconds(double value)
Construct a Time in the indicated unit.
Definition: nstime.h:859
#define NS_UNUSED(x)
Definition: unused.h:5
A base class which provides memory management and object aggregation.
Definition: object.h:87
virtual void Run(void)
Run the simulation.
virtual void Stop(void)
Tell the Simulator the calling event should be the last one executed.
virtual bool IsFinished(void) const
Check if the simulation should finish.
a unique identifier for an interface.
Definition: type-id.h:51
TypeId SetParent(TypeId tid)
Definition: type-id.cc:631
static uint32_t GetSize()
uint32_t m_context
Event context.
Definition: scheduler.h:79