A Discrete-Event Network Simulator
API
distributed-simulator-impl.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  *
18  */
19 
28 #include "mpi-interface.h"
29 
30 #include "ns3/simulator.h"
31 #include "ns3/scheduler.h"
32 #include "ns3/event-impl.h"
33 #include "ns3/channel.h"
34 #include "ns3/node-container.h"
35 #include "ns3/ptr.h"
36 #include "ns3/pointer.h"
37 #include "ns3/assert.h"
38 #include "ns3/log.h"
39 
40 #include <mpi.h>
41 #include <cmath>
42 
43 namespace ns3 {
44 
45 NS_LOG_COMPONENT_DEFINE ("DistributedSimulatorImpl");
46 
47 NS_OBJECT_ENSURE_REGISTERED (DistributedSimulatorImpl);
48 
// Empty function body.
// NOTE(review): the declarator line for this body is absent from this
// listing; presumably the LbtsMessage destructor -- confirm against the header.
50 {
51 }
52 
// Accessor: returns the stored m_smallestTime.
// NOTE(review): the declarator line is missing from this listing; Run() calls
// GetSmallestTime() on LbtsMessage entries, so this is presumably
// LbtsMessage::GetSmallestTime -- confirm.
53 Time
55 {
56  return m_smallestTime;
57 }
58 
// Accessor: returns the stored m_txCount.
// NOTE(review): declarator line missing from this listing; presumably
// LbtsMessage::GetTxCount (called from Run()) -- confirm.
59 uint32_t
61 {
62  return m_txCount;
63 }
64 
// Accessor: returns the stored m_rxCount.
// NOTE(review): declarator line missing from this listing; presumably
// LbtsMessage::GetRxCount (called from Run()) -- confirm.
65 uint32_t
67 {
68  return m_rxCount;
69 }
// Accessor: returns the stored m_myId.
// NOTE(review): declarator line missing from this listing; presumably an
// LbtsMessage getter for the sending rank's id -- confirm.
70 uint32_t
72 {
73  return m_myId;
74 }
75 
// Accessor: returns the stored m_isFinished flag.
// NOTE(review): declarator line missing from this listing; presumably
// LbtsMessage::IsFinished -- confirm.
76 bool
78 {
79  return m_isFinished;
80 }
81 
88 
// Builds (once, via a function-local static) and returns the TypeId named
// "ns3::DistributedSimulatorImpl", placed in group "Mpi" and registered with
// a default constructor.
// NOTE(review): the declarator line and one link of the builder chain are
// missing from this listing (the member index mentions SetParent, so that is
// presumably the missing call) -- confirm.
89 TypeId
91 {
92  static TypeId tid = TypeId ("ns3::DistributedSimulatorImpl")
94  .SetGroupName ("Mpi")
95  .AddConstructor<DistributedSimulatorImpl> ()
96  ;
97  return tid;
98 }
99 
// Initialises simulator state: granted time zero, stop/finished flags cleared,
// event uid counter at 4, current uid/timestamp zero, event count zero and no
// scheduler installed yet.
// NOTE(review): the declarator line and several statements are absent from
// this listing (including whatever followed the "Allocate the LBTS message
// buffer" comment); presumably the default constructor -- confirm.
101 {
102  NS_LOG_FUNCTION (this);
103 
106 
107  // Allocate the LBTS message buffer
109  m_grantedTime = Seconds (0);
110 
111  m_stop = false;
112  m_globalFinished = false;
113  // uids are allocated from 4.
114  // uid 0 is "invalid" events
115  // uid 1 is "now" events
116  // uid 2 is "destroy" events
117  m_uid = 4;
118  // before ::Run is entered, the m_currentUid will be zero
119  m_currentUid = 0;
120  m_currentTs = 0;
123  m_eventCount = 0;
       // Null scheduler pointer until one is installed (SetScheduler checks for 0).
124  m_events = 0;
125 }
126 
// Body only emits the function-entry log record.
// NOTE(review): declarator line missing from this listing; presumably the
// destructor -- confirm.
128 {
129  NS_LOG_FUNCTION (this);
130 }
131 
// Drains the event queue, dropping one reference on each pending event
// implementation, then clears the scheduler pointer and frees the m_pLBTS
// array.
// NOTE(review): the declarator line and the final statement are missing from
// this listing; presumably DoDispose ending with a base-class DoDispose call
// -- confirm.
132 void
134 {
135  NS_LOG_FUNCTION (this);
136 
137  while (!m_events->IsEmpty ())
138  {
139  Scheduler::Event next = m_events->RemoveNext ();
       // Removing an event from the list requires releasing its reference.
140  next.impl->Unref ();
141  }
142  m_events = 0;
143  delete [] m_pLBTS;
145 }
146 
// Pops every entry of m_destroyEvents in order and invokes it unless it has
// been cancelled.
// NOTE(review): the declarator line and the statement after the loop are
// missing from this listing -- confirm against the original source.
147 void
149 {
150  NS_LOG_FUNCTION (this);
151 
152  while (!m_destroyEvents.empty ())
153  {
       // Take a reference before popping so the event outlives its list entry.
154  Ptr<EventImpl> ev = m_destroyEvents.front ().PeekEventImpl ();
155  m_destroyEvents.pop_front ();
156  NS_LOG_LOGIC ("handle destroy " << ev);
157  if (!ev->IsCancelled ())
158  {
159  ev->Invoke ();
160  }
161  }
162 
164 }
165 
166 
// Derives m_lookAhead for this rank.
//  - With a single rank (MpiInterface::GetSize () <= 1) lookahead is zero.
//  - Otherwise, for every node in container `c` that lives on this rank, each
//    point-to-point device with a channel is inspected; when the peer node is
//    on a different rank the channel's "Delay" attribute lowers m_lookAhead
//    if it is smaller.
//  - Finally an MPI_Allreduce (MPI_MAX) shares the largest value; a rank whose
//    m_lookAhead still equals GetMaximumSimulationTime () adopts that maximum
//    when it is non-zero.
// NOTE(review): the declarator line, the declaration of `c`, the condition
// guarding `sendbuf = 0`, and a few other statements are missing from this
// listing -- confirm against the original source.
// NOTE(review): sendbuf/recvbuf are `long` sent as MPI_LONG, while
// GetInteger () returns int64_t; on platforms where long is 32 bits this
// would truncate -- confirm whether that matters for supported targets.
167 void
169 {
170  NS_LOG_FUNCTION (this);
171 
172  /* If running a sequential simulation, lookahead can be ignored */
173  if (MpiInterface::GetSize () <= 1)
174  {
175  m_lookAhead = Seconds (0);
176  }
177  else
178  {
180  for (NodeContainer::Iterator iter = c.Begin (); iter != c.End (); ++iter)
181  {
       // Skip nodes owned by other ranks.
182  if ((*iter)->GetSystemId () != MpiInterface::GetSystemId ())
183  {
184  continue;
185  }
186 
187  for (uint32_t i = 0; i < (*iter)->GetNDevices (); ++i)
188  {
189  Ptr<NetDevice> localNetDevice = (*iter)->GetDevice (i);
190  // only works for p2p links currently
191  if (!localNetDevice->IsPointToPoint ())
192  {
193  continue;
194  }
195  Ptr<Channel> channel = localNetDevice->GetChannel ();
196  if (channel == 0)
197  {
198  continue;
199  }
200 
201  // grab the adjacent node
       // (the channel end that is not our own device)
202  Ptr<Node> remoteNode;
203  if (channel->GetDevice (0) == localNetDevice)
204  {
205  remoteNode = (channel->GetDevice (1))->GetNode ();
206  }
207  else
208  {
209  remoteNode = (channel->GetDevice (0))->GetNode ();
210  }
211 
212  // if it's not remote, don't consider it
213  if (remoteNode->GetSystemId () == MpiInterface::GetSystemId ())
214  {
215  continue;
216  }
217 
218  // compare delay on the channel with current value of
219  // m_lookAhead. if delay on channel is smaller, make
220  // it the new lookAhead.
221  TimeValue delay;
222  channel->GetAttribute ("Delay", delay);
223 
224  if (delay.Get () < m_lookAhead)
225  {
226  m_lookAhead = delay.Get ();
227  }
228  }
229  }
230  }
231 
232  // m_lookAhead is now set
234 
235  /*
236  * Compute the maximum inter-task latency and use that value
237  * for tasks with no inter-task links.
238  *
239  * Special processing for edge cases. For tasks that have no
240  * nodes need to determine a reasonable lookAhead value. Infinity
241  * would work correctly but introduces a performance issue; tasks
242  * with an infinite lookAhead would execute all their events
243  * before doing an AllGather resulting in very bad load balance
244  * during the first time window. Since all tasks participate in
245  * the AllGather it is desirable to have all the tasks advance in
246  * simulation time at a similar rate assuming roughly equal events
247  * per unit of simulation time in order to equalize the amount of
248  * work per time window.
249  */
250  long sendbuf;
251  long recvbuf;
252 
253  /* Tasks with no inter-task links do not contribute to max */
255  {
256  sendbuf = 0;
257  }
258  else
259  {
260  sendbuf = m_lookAhead.GetInteger ();
261  }
262 
263  MPI_Allreduce (&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MpiInterface::GetCommunicator ());
264 
265  /* For nodes that did not compute a lookahead use max from ranks
266  * that did compute a value. An edge case occurs if all nodes have
267  * no inter-task links (max will be 0 in this case). Use infinity so all tasks
268  * will proceed without synchronization until a single AllGather
269  * occurs when all tasks have finished.
270  */
271  if (m_lookAhead == GetMaximumSimulationTime () && recvbuf != 0)
272  {
273  m_lookAhead = Time (recvbuf);
275  }
276 }
277 
// Tightens m_lookAhead to the smaller of its current value and `lookAhead`
// when `lookAhead` is strictly positive; otherwise only logs a warning and
// leaves m_lookAhead unchanged.
// NOTE(review): the guard also rejects a zero lookAhead although the warning
// text says "negative"; the function-entry log is emitted only on the accepted
// path.
// NOTE(review): declarator line missing from this listing; presumably
// BoundLookAhead (const Time lookAhead) per the member index -- confirm.
278 void
280 {
281  if (lookAhead > Time (0))
282  {
283  NS_LOG_FUNCTION (this << lookAhead);
284  m_lookAhead = Min(m_lookAhead, lookAhead);
285  }
286  else
287  {
288  NS_LOG_WARN ("attempted to set lookahead to a negative time: " << lookAhead);
289  }
290 }
291 
// Creates a new Scheduler from `schedulerFactory`, moves every event still
// pending in the current scheduler (if one is installed) into it, and then
// installs it as m_events.
// NOTE(review): declarator line missing from this listing; presumably
// SetScheduler (ObjectFactory schedulerFactory) per the member index -- confirm.
292 void
294 {
295  NS_LOG_FUNCTION (this << schedulerFactory);
296 
297  Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler> ();
298 
       // Migrate pending events so none are lost when the scheduler is swapped.
299  if (m_events != 0)
300  {
301  while (!m_events->IsEmpty ())
302  {
303  Scheduler::Event next = m_events->RemoveNext ();
304  scheduler->Insert (next);
305  }
306  }
307  m_events = scheduler;
308 }
309 
// Removes the next event from the scheduler, asserts that time does not move
// backwards, bumps m_eventCount, updates the current timestamp and uid from
// the event key, invokes the event and finally releases its reference.
// NOTE(review): the declarator line and two statements are missing from this
// listing (embedded numbering skips 318 and 323) -- confirm against the
// original source.
310 void
312 {
313  NS_LOG_FUNCTION (this);
314 
315  Scheduler::Event next = m_events->RemoveNext ();
316 
317  NS_ASSERT (next.key.m_ts >= m_currentTs);
319  m_eventCount++;
320 
321  NS_LOG_LOGIC ("handle " << next.key.m_ts);
322  m_currentTs = next.key.m_ts;
324  m_currentUid = next.key.m_uid;
325  next.impl->Invoke ();
       // The event has left the list, so drop the list's reference.
326  next.impl->Unref ();
327 }
328 
// Returns m_globalFinished.
// NOTE(review): declarator line missing from this listing; presumably
// DistributedSimulatorImpl::IsFinished -- confirm.
329 bool
331 {
332  return m_globalFinished;
333 }
334 
// Returns true when the local event queue is empty or m_stop has been set.
// NOTE(review): declarator line missing from this listing; presumably
// IsLocalFinished (called from NextTs and Run) -- confirm.
335 bool
337 {
338  return m_events->IsEmpty () || m_stop;
339 }
340 
// Returns the timestamp of the next pending event (peeked, not removed) when
// the local rank still has work.
// NOTE(review): the declarator line and the statement inside the
// IsLocalFinished () branch are missing from this listing, so that branch has
// no visible return; per the comment below it should yield the "infinity"
// timestamp -- confirm against the original source.
341 uint64_t
343 {
344  // If the local MPI task has no more events or stop was called,
345  // the next event time is infinity.
346  if (IsLocalFinished ())
347  {
349  }
350  else
351  {
352  Scheduler::Event ev = m_events->PeekNext ();
353  return ev.key.m_ts;
354  }
355 }
356 
// Returns NextTs () converted to a Time via TimeStep.
// NOTE(review): declarator line missing from this listing; presumably
// Next (void) const per the member index -- confirm.
357 Time
359 {
360  return TimeStep (NextTs ());
361 }
362 
// Main loop of the conservative (lookahead-based) parallel simulation.
// Until m_globalFinished is set, each iteration:
//  1. looks at the next local event time;
//  2. if that time lies beyond m_grantedTime, or this rank has nothing left
//     to do, exchanges LbtsMessage records with every rank via MPI_Allgather,
//     finds the smallest reported time and sums the rx/tx counters, and --
//     only when the counters match (no messages in flight) -- advances
//     m_grantedTime;
//  3. processes one event if it falls inside the granted window.
// NOTE(review): the declarator line and several statements are missing from
// this listing (embedded numbering skips 368, 384, 388, 390, 401, 411, 423 and
// 425), including the start of the `lMsg` declaration and the statements that
// would first make m_globalFinished true. The comments here describe only the
// visible code -- confirm the rest against the original source.
363 void
365 {
366  NS_LOG_FUNCTION (this);
367 
369  m_stop = false;
370  m_globalFinished = false;
371  while (!m_globalFinished)
372  {
373  Time nextTime = Next ();
374 
375  // If local event is beyond grantedTime then need to synchronize
376  // with other tasks to determine new time window. If local task
377  // is finished then continue to participate in allgather
378  // synchronizations with other tasks until all tasks have
379  // completed.
380  if (nextTime > m_grantedTime || IsLocalFinished () )
381  {
382  // Can't process next event, calculate a new LBTS
383  // First receive any pending messages
385  // reset next time
386  nextTime = Next ();
387  // And check for send completes
389  // Finally calculate the lbts
391  m_myId, IsLocalFinished (), nextTime);
392  m_pLBTS[m_myId] = lMsg;
       // Every rank contributes one raw LbtsMessage; m_pLBTS receives all of them.
393  MPI_Allgather (&lMsg, sizeof (LbtsMessage), MPI_BYTE, m_pLBTS,
394  sizeof (LbtsMessage), MPI_BYTE, MpiInterface::GetCommunicator ());
395  Time smallestTime = m_pLBTS[0].GetSmallestTime ();
396  // The totRx and totTx counts ensure there are no transient
397  // messages; If totRx != totTx, there are transients,
398  // so we don't update the granted time.
399  uint32_t totRx = m_pLBTS[0].GetRxCount ();
400  uint32_t totTx = m_pLBTS[0].GetTxCount ();
402 
403  for (uint32_t i = 1; i < m_systemCount; ++i)
404  {
405  if (m_pLBTS[i].GetSmallestTime () < smallestTime)
406  {
407  smallestTime = m_pLBTS[i].GetSmallestTime ();
408  }
409  totRx += m_pLBTS[i].GetRxCount ();
410  totTx += m_pLBTS[i].GetTxCount ();
412  }
413 
414  // Global halting condition is all nodes have empty queues and
415  // no messages are in-flight.
416  m_globalFinished &= totRx == totTx;
417 
418  if (totRx == totTx)
419  {
420  // If lookahead is infinite then granted time should be as well.
421  // Covers the edge case if all the tasks have no inter tasks
422  // links, prevents overflow of granted time.
424  {
426  }
427  else
428  {
429  // Overflow is possible here if near end of representable time.
430  m_grantedTime = smallestTime + m_lookAhead;
431  }
432  }
433  }
434 
435  // Execute next event if it is within the current time window.
436  // Local task may be completed.
437  if ( (nextTime <= m_grantedTime) && (!IsLocalFinished ()) )
438  { // Safe to process
439  ProcessOneEvent ();
440  }
441  }
442 
443  // If the simulator stopped naturally by lack of events, make a
444  // consistency test to check that we didn't lose any events along the way.
445  NS_ASSERT (!m_events->IsEmpty () || m_unscheduledEvents == 0);
446 }
447 
// Returns m_myId.
// NOTE(review): the return-type and declarator lines are missing from this
// listing; presumably DistributedSimulatorImpl::GetSystemId -- confirm.
449 {
450  return m_myId;
451 }
452 
// Sets m_stop, which makes IsLocalFinished () report true from then on.
// NOTE(review): declarator line missing from this listing; presumably the
// no-argument Stop overload -- confirm.
453 void
455 {
456  NS_LOG_FUNCTION (this);
457 
458  m_stop = true;
459 }
460 
// As shown, this body only logs `delay`.
// NOTE(review): the declarator line and the single statement of the body are
// missing from this listing; presumably the delayed Stop overload that
// schedules a stop after `delay` -- confirm against the original source.
461 void
463 {
464  NS_LOG_FUNCTION (this << delay.GetTimeStep ());
465 
467 }
468 
469 //
470 // Schedule an event for a _relative_ time in the future.
471 //
// Computes the absolute time as current time + `delay`, asserts it is positive
// and not in the past, then inserts the event with the current context and the
// next uid. Returns an EventId built from the event and its key.
// NOTE(review): the declarator line and one statement are missing from this
// listing (embedded numbering skips 473 and 487); presumably
// Schedule (Time const &delay, EventImpl *event) per the member index -- confirm.
472 EventId
474 {
475  NS_LOG_FUNCTION (this << delay.GetTimeStep () << event);
476 
477  Time tAbsolute = delay + TimeStep (m_currentTs);
478 
479  NS_ASSERT (tAbsolute.IsPositive ());
480  NS_ASSERT (tAbsolute >= TimeStep (m_currentTs));
481  Scheduler::Event ev;
482  ev.impl = event;
483  ev.key.m_ts = static_cast<uint64_t> (tAbsolute.GetTimeStep ());
484  ev.key.m_context = GetContext ();
485  ev.key.m_uid = m_uid;
486  m_uid++;
488  m_events->Insert (ev);
489  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
490 }
491 
// Schedules `event` to run `delay` after the current timestamp, tagged with
// the caller-supplied `context` instead of the current one. No EventId is
// returned.
//
// \param context  execution context stored in the event key
// \param delay    offset from m_currentTs, taken via GetTimeStep ()
// \param event    event implementation to insert into the scheduler
//
// NOTE(review): unlike Schedule (), no assertion guards against a negative
// `delay`; the signed time step is added directly to the unsigned m_currentTs.
// NOTE(review): one statement is missing from this listing (embedded numbering
// skips 503) -- confirm against the original source.
492 void
493 DistributedSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &delay, EventImpl *event)
494 {
495  NS_LOG_FUNCTION (this << context << delay.GetTimeStep () << m_currentTs << event);
496 
497  Scheduler::Event ev;
498  ev.impl = event;
499  ev.key.m_ts = m_currentTs + delay.GetTimeStep ();
500  ev.key.m_context = context;
501  ev.key.m_uid = m_uid;
502  m_uid++;
504  m_events->Insert (ev);
505 }
506 
// Inserts `event` at the current timestamp with the current context and the
// next uid, and returns the matching EventId.
// NOTE(review): the declarator line and one statement are missing from this
// listing (embedded numbering skips 508 and 518); presumably
// ScheduleNow (EventImpl *event) per the member index -- confirm.
507 EventId
509 {
510  NS_LOG_FUNCTION (this << event);
511 
512  Scheduler::Event ev;
513  ev.impl = event;
514  ev.key.m_ts = m_currentTs;
515  ev.key.m_context = GetContext ();
516  ev.key.m_uid = m_uid;
517  m_uid++;
519  m_events->Insert (ev);
520  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
521 }
522 
// Appends `event` to m_destroyEvents (the list drained by Destroy ()) and
// returns its EventId. The id uses the fixed uid 2 ("destroy" events) and
// context 0xffffffff; m_uid is still incremented afterwards.
// NOTE(review): declarator line missing from this listing; presumably
// ScheduleDestroy (EventImpl *event) per the member index -- confirm.
523 EventId
525 {
526  NS_LOG_FUNCTION (this << event);
527 
528  EventId id (Ptr<EventImpl> (event, false), m_currentTs, 0xffffffff, 2);
529  m_destroyEvents.push_back (id);
530  m_uid++;
531  return id;
532 }
533 
// Returns the current timestamp (m_currentTs) as a Time.
// NOTE(review): declarator line missing from this listing; presumably
// Now (void) const per the member index -- confirm.
534 Time
536 {
537  return TimeStep (m_currentTs);
538 }
539 
// Returns zero for an expired event, otherwise the difference between the
// event's timestamp and the current timestamp.
// NOTE(review): declarator line missing from this listing; presumably
// GetDelayLeft (const EventId &id) const per the member index -- confirm.
540 Time
542 {
543  if (IsExpired (id))
544  {
545  return TimeStep (0);
546  }
547  else
548  {
549  return TimeStep (id.GetTs () - m_currentTs);
550  }
551 }
552 
// Removes the event identified by `id`.
//  - uid 2 marks a destroy event: the first matching entry is erased from
//    m_destroyEvents and the function returns.
//  - An already expired event is ignored.
//  - Otherwise the event key is rebuilt from `id`, the event is removed from
//    the scheduler, cancelled, and its reference released.
// NOTE(review): the declarator line and the final statement are missing from
// this listing (embedded numbering skips 554 and 583); presumably
// Remove (const EventId &id) per the member index -- confirm.
553 void
555 {
556  if (id.GetUid () == 2)
557  {
558  // destroy events.
559  for (DestroyEvents::iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
560  {
561  if (*i == id)
562  {
       // Leave the loop right after erasing; `i` is not used again.
563  m_destroyEvents.erase (i);
564  break;
565  }
566  }
567  return;
568  }
569  if (IsExpired (id))
570  {
571  return;
572  }
573  Scheduler::Event event;
574  event.impl = id.PeekEventImpl ();
575  event.key.m_ts = id.GetTs ();
576  event.key.m_context = id.GetContext ();
577  event.key.m_uid = id.GetUid ();
578  m_events->Remove (event);
579  event.impl->Cancel ();
580  // whenever we remove an event from the event list, we have to unref it.
581  event.impl->Unref ();
582 
584 }
585 
// Marks the event behind `id` as cancelled unless it has already expired. The
// event stays in the queue.
// NOTE(review): declarator line missing from this listing; presumably
// Cancel (const EventId &id) per the member index -- confirm.
586 void
588 {
589  if (!IsExpired (id))
590  {
591  id.PeekEventImpl ()->Cancel ();
592  }
593 }
594 
// Reports whether the event identified by `id` can no longer run.
//  - Destroy events (uid 2): expired when the implementation pointer is null,
//    the event is cancelled, or the id is no longer in m_destroyEvents.
//  - Other events: expired when the implementation pointer is null, the
//    timestamp is in the past, the timestamp equals the current one and the
//    uid is not greater than the current uid, or the event is cancelled.
// NOTE(review): declarator line missing from this listing; presumably
// IsExpired (const EventId &id) const per the member index -- confirm.
595 bool
597 {
598  if (id.GetUid () == 2)
599  {
600  if (id.PeekEventImpl () == 0
601  || id.PeekEventImpl ()->IsCancelled ())
602  {
603  return true;
604  }
605  // destroy events.
606  for (DestroyEvents::const_iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
607  {
608  if (*i == id)
609  {
610  return false;
611  }
612  }
613  return true;
614  }
615  if (id.PeekEventImpl () == 0
616  || id.GetTs () < m_currentTs
617  || (id.GetTs () == m_currentTs
618  && id.GetUid () <= m_currentUid)
619  || id.PeekEventImpl ()->IsCancelled ())
620  {
621  return true;
622  }
623  else
624  {
625  return false;
626  }
627 }
628 
// Returns the Time for time step 0x7fffffffffffffff, the largest positive
// value of a signed 64-bit integer.
// NOTE(review): the declarator line and two further lines are missing from
// this listing (embedded numbering skips 630, 632 and 633); presumably
// GetMaximumSimulationTime (void) const per the member index -- confirm.
629 Time
631 {
634  return TimeStep (0x7fffffffffffffffLL);
635 }
636 
// Returns m_currentContext.
// NOTE(review): declarator line missing from this listing; presumably
// GetContext (void) const per the member index -- confirm.
637 uint32_t
639 {
640  return m_currentContext;
641 }
642 
// Returns m_eventCount, the counter incremented once per processed event.
// NOTE(review): declarator line missing from this listing; presumably
// GetEventCount (void) const per the member index -- confirm.
643 uint64_t
645 {
646  return m_eventCount;
647 }
648 
649 } // namespace ns3
static EventId Schedule(Time const &delay, FUNC f, Ts &&... args)
Schedule an event to expire after delay.
Definition: simulator.h:557
Time m_grantedTime
End of current window.
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:103
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:73
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by "...
virtual void SetScheduler(ObjectFactory schedulerFactory)
Set the Scheduler to be used to manage the event list.
uint32_t m_myId
System Id of the rank sending this LBTS.
virtual uint32_t GetContext(void) const
Get the current simulation context.
Ptr< Scheduler > m_events
The event priority queue.
void Unref(void) const
Decrement the reference count.
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:45
uint32_t m_systemCount
MPI communicator size.
static MPI_Comm GetCommunicator()
Return the communicator used to run ns-3.
int64_t GetInteger(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:424
std::vector< Ptr< Node > >::const_iterator Iterator
Node container iterator.
uint64_t m_ts
Event time stamp.
Definition: scheduler.h:170
virtual EventId Schedule(Time const &delay, EventImpl *event)
Schedule a future event execution (in the same context).
bool IsLocalFinished(void) const
Check if this rank is finished.
Time Next(void) const
Get the time of the next event, as returned by NextTs().
EventImpl * impl
Pointer to the event implementation.
Definition: scheduler.h:183
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file...
Definition: assert.h:67
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:205
virtual EventId ScheduleDestroy(EventImpl *event)
Schedule an event to run at the end of the simulation, after the Stop() time or condition has been re...
void(* Time)(Time oldValue, Time newValue)
TracedValue callback signature for Time.
Definition: nstime.h:813
virtual void DoDispose(void)
Destructor implementation.
Definition: object.cc:346
Iterator End(void) const
Get an iterator which indicates past-the-last Node in the container.
uint64_t m_eventCount
The event count.
uint32_t m_txCount
Count of transmitted messages.
virtual Time GetDelayLeft(const EventId &id) const
Get the remaining time until this event will execute.
virtual void DoDispose(void)
Destructor implementation.
channel
Definition: third.py:92
void Invoke(void)
Called by the simulation engine to notify the event that it is time to execute.
Definition: event-impl.cc:46
virtual Time GetMaximumSimulationTime(void) const
Get the maximum representable simulation time.
static Time Max()
Maximum representable Time. Not to be confused with Max(Time,Time).
Definition: nstime.h:283
static void TestSendComplete()
Check for completed sends.
int64x64_t Min(const int64x64_t &a, const int64x64_t &b)
Minimum.
Definition: int64x64.h:218
Declaration of class ns3::MpiInterface.
void ProcessOneEvent(void)
Process the next event.
EventKey key
Key for sorting and ordering Events.
Definition: scheduler.h:184
virtual bool IsFinished(void) const
Check if the simulation should finish.
uint32_t m_currentContext
Execution context of the current event.
static void Destroy()
Deletes storage used by the parallel environment.
AttributeValue implementation for Time.
Definition: nstime.h:1353
Ptr< Object > Create(void) const
Create an Object instance of the configured TypeId.
uint32_t m_uid
Event unique id.
Definition: scheduler.h:171
virtual void Cancel(const EventId &id)
Set the cancel bit on this event: the event's associated function will not be invoked when it expires...
DistributedSimulatorImpl()
Default constructor.
virtual EventId ScheduleNow(EventImpl *event)
Schedule an event to run at the current virtual time.
virtual uint64_t GetEventCount(void) const
Get the number of events executed.
DestroyEvents m_destroyEvents
The container of events to run at Destroy()
Maintain the event list.
Definition: scheduler.h:155
bool m_stop
Flag calling for the end of the simulation.
#define NS_LOG_LOGIC(msg)
Use NS_LOG to output a message of level LOG_LOGIC.
Definition: log.h:289
virtual bool IsExpired(const EventId &id) const
Check if an event has already run or been cancelled.
static TypeId GetTypeId(void)
Register this type.
Time m_smallestTime
Earliest next event timestamp.
Scheduler event.
Definition: scheduler.h:181
Distributed simulator implementation using lookahead.
Every class exported by the ns3 library is enclosed in the ns3 namespace.
keep track of a set of node pointers.
virtual uint32_t GetSystemId(void) const
Get the system id of this simulator.
virtual void Destroy()
Execute the events scheduled with ScheduleDestroy().
uint32_t GetSystemId(void) const
Definition: node.cc:123
static Time m_lookAhead
Current window size.
uint64_t m_currentTs
Timestamp of the current event.
int m_unscheduledEvents
Number of events that have been inserted but not yet scheduled, not counting the "destroy" events; th...
Structure used for all-reduce LBTS computation.
static NodeContainer GetGlobal(void)
Create a NodeContainer that contains a list of all nodes created through NodeContainer::Create() and ...
void CalculateLookAhead(void)
Calculate lookahead constraint based on network latency.
virtual void Remove(const EventId &id)
Remove an event from the event list.
Instantiate subclasses of ns3::Object.
uint32_t m_rxCount
Count of received messages.
A simulation event.
Definition: event-impl.h:44
static void ReceiveMessages()
Check for received messages complete.
An identifier for simulation events.
Definition: event-id.h:53
static uint32_t GetSystemId()
Get the id number of this rank.
#define NS_LOG_WARN(msg)
Use NS_LOG to output a message of level LOG_WARN.
Definition: log.h:265
static void Stop(void)
Tell the Simulator the calling event should be the last one executed.
Definition: simulator.cc:180
Time Get(void) const
Definition: time.cc:530
Time Seconds(double value)
Construct a Time in the indicated unit.
Definition: nstime.h:1289
uint32_t m_uid
Next event unique id.
Flag for events not associated with any particular context.
Definition: simulator.h:199
bool m_globalFinished
Are all parallel instances completed.
virtual void BoundLookAhead(const Time lookAhead)
Add additional bound to lookahead constraints.
Declaration of classes ns3::SentBuffer and ns3::GrantedTimeWindowMpiInterface.
virtual void ScheduleWithContext(uint32_t context, Time const &delay, EventImpl *event)
Schedule a future event execution (in a different context).
LbtsMessage * m_pLBTS
Container for Lbts messages, one per rank.
virtual Time Now(void) const
Return the current simulation virtual time.
virtual void Run(void)
Run the simulation.
uint32_t m_currentUid
Unique id of the current event.
virtual void Stop(void)
Tell the Simulator the calling event should be the last one executed.
a unique identifier for an interface.
Definition: type-id.h:58
uint64_t NextTs(void) const
Get the timestep of the next event.
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:923
static uint32_t GetSize()
Get the number of ranks used by ns-3.
uint32_t m_context
Event context.
Definition: scheduler.h:172
Iterator Begin(void) const
Get an iterator which refers to the first Node in the container.
The SimulatorImpl base class.
Declaration of classes ns3::LbtsMessage and ns3::DistributedSimulatorImpl.
bool m_isFinished
true when this rank has no more events.
int64_t GetTimeStep(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:416